Whamcloud - gitweb
LU-12090 utils: lfs rmfid
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index 9167c83..5ae5c63 100644 (file)
@@ -1974,6 +1974,219 @@ out_shrink:
        return rc;
 }
 
+static int mdt_rmfid_unlink(struct mdt_thread_info *info,
+                           const struct lu_fid *pfid,
+                           const struct lu_name *name,
+                           struct mdt_object *obj, s64 ctime)
+{
+       struct lu_fid *child_fid = &info->mti_tmp_fid1;
+       struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
+       struct mdt_device *mdt = info->mti_mdt;
+       struct md_attr *ma = &info->mti_attr;
+       struct mdt_lock_handle *parent_lh;
+       struct mdt_lock_handle *child_lh;
+       struct mdt_object *pobj;
+       bool cos_incompat = false;
+       int rc;
+       ENTRY;
+
+       pobj = mdt_object_find(info->mti_env, mdt, pfid);
+       if (IS_ERR(pobj))
+               GOTO(out, rc = PTR_ERR(pobj));
+
+       parent_lh = &info->mti_lh[MDT_LH_PARENT];
+       mdt_lock_pdo_init(parent_lh, LCK_PW, name);
+       rc = mdt_object_lock(info, pobj, parent_lh, MDS_INODELOCK_UPDATE);
+       if (rc != 0)
+               GOTO(put_parent, rc);
+
+       if (mdt_object_remote(pobj))
+               cos_incompat = true;
+
+       rc = mdo_lookup(info->mti_env, mdt_object_child(pobj),
+                       name, child_fid, &info->mti_spec);
+       if (rc != 0)
+               GOTO(unlock_parent, rc);
+
+       if (!lu_fid_eq(child_fid, mdt_object_fid(obj)))
+               GOTO(unlock_parent, rc = -EREMCHG);
+
+       child_lh = &info->mti_lh[MDT_LH_CHILD];
+       mdt_lock_reg_init(child_lh, LCK_EX);
+       rc = mdt_reint_striped_lock(info, obj, child_lh,
+                                   MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE,
+                                   einfo, cos_incompat);
+       if (rc != 0)
+               GOTO(unlock_parent, rc);
+
+       if (atomic_read(&obj->mot_open_count)) {
+               CDEBUG(D_OTHER, "object "DFID" open, skip\n",
+                      PFID(mdt_object_fid(obj)));
+               GOTO(unlock_child, rc = -EBUSY);
+       }
+
+       ma->ma_need = 0;
+       ma->ma_valid = MA_INODE;
+       ma->ma_attr.la_valid = LA_CTIME;
+       ma->ma_attr.la_ctime = ctime;
+
+       mutex_lock(&obj->mot_lov_mutex);
+
+       rc = mdo_unlink(info->mti_env, mdt_object_child(pobj),
+                       mdt_object_child(obj), name, ma, 0);
+
+       mutex_unlock(&obj->mot_lov_mutex);
+
+unlock_child:
+       mdt_reint_striped_unlock(info, obj, child_lh, einfo, 1);
+unlock_parent:
+       mdt_object_unlock(info, pobj, parent_lh, 1);
+put_parent:
+       mdt_object_put(info->mti_env, pobj);
+out:
+       RETURN(rc);
+}
+
+static int mdt_rmfid_check_permission(struct mdt_thread_info *info,
+                                       struct mdt_object *obj)
+{
+       struct lu_ucred *uc = lu_ucred(info->mti_env);
+       struct md_attr *ma = &info->mti_attr;
+       struct lu_attr *la = &ma->ma_attr;
+       int rc = 0;
+       ENTRY;
+
+       ma->ma_need = MA_INODE;
+       rc = mo_attr_get(info->mti_env, mdt_object_child(obj), ma);
+       if (rc)
+               GOTO(out, rc);
+
+       if (la->la_flags & LUSTRE_IMMUTABLE_FL)
+                       rc = -EACCES;
+
+       if (md_capable(uc, CFS_CAP_DAC_OVERRIDE))
+               RETURN(0);
+       if (uc->uc_fsuid == la->la_uid) {
+               if ((la->la_mode & S_IWUSR) == 0)
+                       rc = -EACCES;
+       } else if (uc->uc_fsgid == la->la_gid) {
+               if ((la->la_mode & S_IWGRP) == 0)
+                       rc = -EACCES;
+       } else if ((la->la_mode & S_IWOTH) == 0) {
+                       rc = -EACCES;
+       }
+
+out:
+       RETURN(rc);
+}
+
+static int mdt_rmfid_one(struct mdt_thread_info *info, struct lu_fid *fid,
+                        s64 ctime)
+{
+       struct mdt_device *mdt = info->mti_mdt;
+       struct mdt_object *obj = NULL;
+       struct linkea_data ldata = { NULL };
+       struct lu_buf *buf = &info->mti_big_buf;
+       struct lu_name *name = &info->mti_name;
+       struct lu_fid *pfid = &info->mti_tmp_fid1;
+       struct link_ea_header *leh;
+       struct link_ea_entry *lee;
+       int reclen, count, rc = 0;
+       ENTRY;
+
+       if (!fid_is_sane(fid))
+               GOTO(out, rc = -EINVAL);
+
+       if (!fid_is_namespace_visible(fid))
+               GOTO(out, rc = -EINVAL);
+
+       obj = mdt_object_find(info->mti_env, mdt, fid);
+       if (IS_ERR(obj))
+               GOTO(out, rc = PTR_ERR(obj));
+
+       if (mdt_object_remote(obj))
+               GOTO(out, rc = -EREMOTE);
+       if (!mdt_object_exists(obj) || lu_object_is_dying(&obj->mot_header))
+               GOTO(out, rc = -ENOENT);
+
+       rc = mdt_rmfid_check_permission(info, obj);
+       if (rc)
+               GOTO(out, rc);
+
+       /* take LinkEA */
+       buf = lu_buf_check_and_alloc(buf, PATH_MAX);
+       if (!buf->lb_buf)
+               GOTO(out, rc = -ENOMEM);
+
+       ldata.ld_buf = buf;
+       rc = mdt_links_read(info, obj, &ldata);
+       if (rc)
+               GOTO(out, rc);
+
+       leh = buf->lb_buf;
+       lee = (struct link_ea_entry *)(leh + 1);
+       for (count = 0; count < leh->leh_reccount; count++) {
+               /* remove every hardlink */
+               linkea_entry_unpack(lee, &reclen, name, pfid);
+               lee = (struct link_ea_entry *) ((char *)lee + reclen);
+               rc = mdt_rmfid_unlink(info, pfid, name, obj, ctime);
+               if (rc)
+                       break;
+       }
+
+out:
+       if (obj && !IS_ERR(obj))
+               mdt_object_put(info->mti_env, obj);
+       if (info->mti_big_buf.lb_buf)
+               lu_buf_free(&info->mti_big_buf);
+
+       RETURN(rc);
+}
+
+static int mdt_rmfid(struct tgt_session_info *tsi)
+{
+       struct mdt_thread_info *mti = tsi2mdt_info(tsi);
+       struct mdt_body *reqbody;
+       struct lu_fid *fids, *rfids;
+       int bufsize, rc;
+       __u32 *rcs;
+       int i, nr;
+       ENTRY;
+
+       reqbody = req_capsule_client_get(tsi->tsi_pill, &RMF_MDT_BODY);
+       if (reqbody == NULL)
+               RETURN(-EPROTO);
+       bufsize = req_capsule_get_size(tsi->tsi_pill, &RMF_FID_ARRAY,
+                                      RCL_CLIENT);
+       nr = bufsize / sizeof(struct lu_fid);
+       if (nr * sizeof(struct lu_fid) != bufsize)
+               RETURN(-EINVAL);
+       req_capsule_set_size(tsi->tsi_pill, &RMF_RCS,
+                            RCL_SERVER, nr * sizeof(__u32));
+       req_capsule_set_size(tsi->tsi_pill, &RMF_FID_ARRAY,
+                            RCL_SERVER, nr * sizeof(struct lu_fid));
+       rc = req_capsule_server_pack(tsi->tsi_pill);
+       if (rc)
+               GOTO(out, rc = err_serious(rc));
+       fids = req_capsule_client_get(tsi->tsi_pill, &RMF_FID_ARRAY);
+       if (fids == NULL)
+               RETURN(-EPROTO);
+       rcs = req_capsule_server_get(tsi->tsi_pill, &RMF_RCS);
+       LASSERT(rcs);
+       rfids = req_capsule_server_get(tsi->tsi_pill, &RMF_FID_ARRAY);
+       LASSERT(rfids);
+
+       mdt_init_ucred(mti, reqbody);
+       for (i = 0; i < nr; i++) {
+               rfids[i] = fids[i];
+               rcs[i] = mdt_rmfid_one(mti, fids + i, reqbody->mbo_ctime);
+       }
+       mdt_exit_ucred(mti);
+
+out:
+       RETURN(rc);
+}
+
 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void __user *uarg);
 
@@ -4941,6 +5154,7 @@ TGT_MDT_HDL(HAS_BODY | HAS_REPLY, MDS_HSM_REQUEST,
 TGT_MDT_HDL(HAS_KEY | HAS_BODY | HAS_REPLY | IS_MUTABLE,
            MDS_SWAP_LAYOUTS,
            mdt_swap_layouts),
+TGT_MDT_HDL(IS_MUTABLE,                MDS_RMFID,      mdt_rmfid),
 };
 
 static struct tgt_handler mdt_io_ops[] = {