Whamcloud - gitweb
LU-1187 mdt: unlink remote directory
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
index b906337..171ee3e 100644 (file)
@@ -91,7 +91,6 @@ static void mdt_obj_version_get(struct mdt_thread_info *info,
                                 struct mdt_object *o, __u64 *version)
 {
         LASSERT(o);
-        LASSERT(mdt_object_exists(o) >= 0);
        if (mdt_object_exists(o) > 0 && !mdt_object_obf(o))
                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
         else
@@ -308,6 +307,14 @@ static int mdt_md_create(struct mdt_thread_info *info)
 
                if (mdt_object_exists(child) < 0) {
                        struct seq_server_site *ss;
+                       struct lu_ucred *uc  = mdt_ucred(info);
+
+                       if (!md_capable(uc, CFS_CAP_SYS_ADMIN)) {
+                               CERROR("%s: Creating remote dir is only "
+                                      "permitted for administrator: rc = %d\n",
+                                       mdt2obd_dev(mdt)->obd_name, -EPERM);
+                               GOTO(out_put_child, rc = -EPERM);
+                       }
 
                        ss = mdt_seq_site(mdt);
                        if (ss->ss_node_id != 0 &&
@@ -671,46 +678,115 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                 RETURN(err_serious(-ENOENT));
 
         /*
-        * step 1: lock the parent.
+        * step 1: Found the parent.
          */
-        parent_lh = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name,
-                         rr->rr_namelen);
+       mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
+       if (IS_ERR(mp)) {
+               rc = PTR_ERR(mp);
+               GOTO(out, rc);
+       }
 
-        mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
-                                  MDS_INODELOCK_UPDATE);
-       if (IS_ERR(mp))
-               GOTO(out, rc = PTR_ERR(mp));
+       if (mdt_object_obf(mp))
+               GOTO(put_parent, rc = -EPERM);
+
+       parent_lh = &info->mti_lh[MDT_LH_PARENT];
+       lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
+       if (mdt_object_exists(mp) < 0) {
+               mdt_lock_reg_init(parent_lh, LCK_EX);
+               rc = mdt_remote_object_lock(info, mp, &parent_lh->mlh_rreg_lh,
+                                           parent_lh->mlh_rreg_mode,
+                                           MDS_INODELOCK_UPDATE);
+               if (rc != ELDLM_OK)
+                       GOTO(put_parent, rc);
+
+       } else {
+               mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name,
+                                 rr->rr_namelen);
+               rc = mdt_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE,
+                                    MDT_LOCAL_LOCK);
+               if (rc)
+                       GOTO(put_parent, rc);
 
-        if (mdt_object_obf(mp))
-                GOTO(out_unlock_parent, rc = -EPERM);
+               rc = mdt_version_get_check_save(info, mp, 0);
+               if (rc)
+                       GOTO(unlock_parent, rc);
+       }
 
-        rc = mdt_version_get_check_save(info, mp, 0);
-        if (rc)
-                GOTO(out_unlock_parent, rc);
+       /* step 2: find & lock the child */
+       /* lookup child object along with version checking */
+       fid_zero(child_fid);
+       rc = mdt_lookup_version_check(info, mp, lname, child_fid, 1);
+       if (rc != 0)
+               GOTO(unlock_parent, rc);
 
         mdt_reint_init_ma(info, ma);
 
-        /* step 2: find & lock the child */
-        lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
-        /* lookup child object along with version checking */
-        fid_zero(child_fid);
-        rc = mdt_lookup_version_check(info, mp, lname, child_fid, 1);
-        if (rc != 0)
-                 GOTO(out_unlock_parent, rc);
+       /* We will lock the child regardless it is local or remote. No harm. */
+       mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
+       if (IS_ERR(mc))
+               GOTO(unlock_parent, rc = PTR_ERR(mc));
 
-        /* We will lock the child regardless it is local or remote. No harm. */
-        mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
-        if (IS_ERR(mc))
-                GOTO(out_unlock_parent, rc = PTR_ERR(mc));
         child_lh = &info->mti_lh[MDT_LH_CHILD];
         mdt_lock_reg_init(child_lh, LCK_EX);
+       if (mdt_object_exists(mc) < 0) {
+               struct mdt_body  *repbody;
+
+               if (!fid_is_zero(rr->rr_fid2)) {
+                       CDEBUG(D_INFO, "%s: name %s can not find "DFID"\n",
+                              mdt2obd_dev(info->mti_mdt)->obd_name,
+                              (char *)rr->rr_name, PFID(mdt_object_fid(mc)));
+                       GOTO(unlock_parent, rc = -ENOENT);
+               }
+               CDEBUG(D_INFO, "%s: name %s: "DFID" is another MDT\n",
+                      mdt2obd_dev(info->mti_mdt)->obd_name,
+                      (char *)rr->rr_name, PFID(mdt_object_fid(mc)));
+
+               if (info->mti_spec.sp_rm_entry) {
+                       struct lu_ucred *uc  = mdt_ucred(info);
+
+                       if (!md_capable(uc, CFS_CAP_SYS_ADMIN)) {
+                               CERROR("%s: unlink remote entry is only "
+                                      "permitted for administrator: rc = %d\n",
+                                       mdt2obd_dev(info->mti_mdt)->obd_name,
+                                       -EPERM);
+                               GOTO(unlock_parent, rc = -EPERM);
+                       }
+
+                       ma->ma_need = MA_INODE;
+                       ma->ma_valid = 0;
+                       mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
+                       rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
+                                       NULL, lname, ma);
+                       mdt_object_put(info->mti_env, mc);
+                       GOTO(unlock_parent, rc);
+               }
+               /* Revoke the LOOKUP lock of the remote object granted by
+                * this MDT. Since the unlink will happen on another MDT,
+                * it will release the LOOKUP lock right away. Then What
+                * would happen if another client try to grab the LOOKUP
+                * lock at the same time with unlink XXX */
+               mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_LOOKUP,
+                               MDT_CROSS_LOCK);
+               repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+               LASSERT(repbody != NULL);
+               repbody->fid1 = *mdt_object_fid(mc);
+               repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
+               mdt_object_unlock_put(info, mc, child_lh, rc);
+               GOTO(unlock_parent, rc = -EREMOTE);
+       } else if (info->mti_spec.sp_rm_entry) {
+               CERROR("%s: lfs rmdir should not be used on local dir %s\n",
+                      mdt2obd_dev(info->mti_mdt)->obd_name,
+                      (char *)rr->rr_name);
+               mdt_object_put(info->mti_env, mc);
+               GOTO(unlock_parent, rc = -EPERM);
+       }
+
         rc = mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_FULL,
                              MDT_CROSS_LOCK);
-        if (rc != 0) {
-                mdt_object_put(info->mti_env, mc);
-                GOTO(out_unlock_parent, rc);
-        }
+       if (rc != 0) {
+               mdt_object_put(info->mti_env, mc);
+               GOTO(unlock_parent, rc);
+       }
 
         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
@@ -752,8 +828,10 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
         EXIT;
 
         mdt_object_unlock_put(info, mc, child_lh, rc);
-out_unlock_parent:
-        mdt_object_unlock_put(info, mp, parent_lh, rc);
+unlock_parent:
+       mdt_object_unlock(info, mp, parent_lh, rc);
+put_parent:
+       mdt_object_put(info->mti_env, mp);
 out:
         return rc;
 }
@@ -1189,13 +1267,14 @@ typedef int (*mdt_reinter)(struct mdt_thread_info *info,
                            struct mdt_lock_handle *lhc);
 
 static mdt_reinter reinters[REINT_MAX] = {
-        [REINT_SETATTR]  = mdt_reint_setattr,
-        [REINT_CREATE]   = mdt_reint_create,
-        [REINT_LINK]     = mdt_reint_link,
-        [REINT_UNLINK]   = mdt_reint_unlink,
-        [REINT_RENAME]   = mdt_reint_rename,
-        [REINT_OPEN]     = mdt_reint_open,
-        [REINT_SETXATTR] = mdt_reint_setxattr
+       [REINT_SETATTR]  = mdt_reint_setattr,
+       [REINT_CREATE]   = mdt_reint_create,
+       [REINT_LINK]     = mdt_reint_link,
+       [REINT_UNLINK]   = mdt_reint_unlink,
+       [REINT_RENAME]   = mdt_reint_rename,
+       [REINT_OPEN]     = mdt_reint_open,
+       [REINT_SETXATTR] = mdt_reint_setxattr,
+       [REINT_RMENTRY]  = mdt_reint_unlink
 };
 
 int mdt_reint_rec(struct mdt_thread_info *info,