Whamcloud - gitweb
LU-14521 flr: delete mirror without volatile file
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
index b839eaa..040d261 100644 (file)
@@ -2038,7 +2038,7 @@ int mdt_close_handle_layouts(struct mdt_thread_info *info,
        struct ldlm_lock        *lease;
        struct mdt_object       *o1 = o, *o2 = NULL;
        bool                     lease_broken;
-       bool                     swap_objects;
+       bool                     swap_objects = false;
        int                      rc;
        ENTRY;
 
@@ -2056,38 +2056,52 @@ int mdt_close_handle_layouts(struct mdt_thread_info *info,
                RETURN(-EINVAL);
 
        rc = lu_fid_cmp(&data->cd_fid, mdt_object_fid(o));
-       if (unlikely(rc == 0))
-               RETURN(-EINVAL);
+       if (rc == 0) {
+               /**
+                * only MDS_CLOSE_LAYOUT_SPLIT use the same fid to indicate
+                * mirror deletion, so we'd zero cd_fid, and keeps o2 be NULL.
+                */
+               if (!(ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT))
+                       RETURN(-EINVAL);
 
-       /* Exchange o1 and o2, to enforce locking order */
-       swap_objects = (rc < 0);
+               /* zero cd_fid to keeps o2 be NULL */
+               fid_zero(&data->cd_fid);
+       } else if (rc < 0) {
+               /* Exchange o1 and o2, to enforce locking order */
+               swap_objects = true;
+       }
 
        lease = ldlm_handle2lock(&data->cd_handle);
        if (lease == NULL)
                RETURN(-ESTALE);
 
-       o2 = mdt_object_find(info->mti_env, info->mti_mdt, &data->cd_fid);
-       if (IS_ERR(o2))
-               GOTO(out_lease, rc = PTR_ERR(o2));
+       if (!fid_is_zero(&data->cd_fid)) {
+               o2 = mdt_object_find(info->mti_env, info->mti_mdt,
+                                    &data->cd_fid);
+               if (IS_ERR(o2))
+                       GOTO(out_lease, rc = PTR_ERR(o2));
 
-       if (!mdt_object_exists(o2))
-               GOTO(out_obj, rc = -ENOENT);
+               if (!mdt_object_exists(o2))
+                       GOTO(out_obj, rc = -ENOENT);
 
-       if (!S_ISREG(lu_object_attr(&o2->mot_obj)))
-               GOTO(out_obj, rc = -EINVAL);
+               if (!S_ISREG(lu_object_attr(&o2->mot_obj)))
+                       GOTO(out_obj, rc = -EINVAL);
 
-       if (swap_objects)
-               swap(o1, o2);
+               if (swap_objects)
+                       swap(o1, o2);
+       }
 
        rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
                           MAY_WRITE);
        if (rc < 0)
                GOTO(out_obj, rc);
 
-       rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL,
-                          MAY_WRITE);
-       if (rc < 0)
-               GOTO(out_obj, rc);
+       if (o2) {
+               rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2),
+                                  NULL, MAY_WRITE);
+               if (rc < 0)
+                       GOTO(out_obj, rc);
+       }
 
        /* try to hold open_sem so that nobody else can open the file */
        if (!down_write_trylock(&o->mot_open_sem)) {
@@ -2117,11 +2131,13 @@ int mdt_close_handle_layouts(struct mdt_thread_info *info,
        if (rc < 0)
                GOTO(out_unlock_sem, rc);
 
-       mdt_lock_reg_init(lh2, LCK_EX);
-       rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
-                            MDS_INODELOCK_XATTR);
-       if (rc < 0)
-               GOTO(out_unlock1, rc);
+       if (o2) {
+               mdt_lock_reg_init(lh2, LCK_EX);
+               rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
+                                    MDS_INODELOCK_XATTR);
+               if (rc < 0)
+                       GOTO(out_unlock1, rc);
+       }
 
        /* Swap layout with orphan object */
        if (ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SWAP) {
@@ -2132,7 +2148,21 @@ int mdt_close_handle_layouts(struct mdt_thread_info *info,
                struct lu_buf *buf = &info->mti_buf;
                struct md_rejig_data mrd;
 
-               mrd.mrd_obj = mdt_object_child(o == o1 ? o2 : o1);
+               if (o2) {
+                       mrd.mrd_obj = mdt_object_child(o == o1 ? o2 : o1);
+               } else {
+                       if (!(ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT)) {
+                               /* paranoid check again */
+                               CERROR(DFID
+                                 ":only mirror split support NULL o2 object\n",
+                                       PFID(mdt_object_fid(o)));
+                               GOTO(out_unlock1, rc = -EINVAL);
+                       }
+
+                       /* set NULL mrd_obj for deleting mirror objects */
+                       mrd.mrd_obj = NULL;
+               }
+
                if (ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT)
                        mrd.mrd_mirror_id = data->cd_mirror_id;
 
@@ -2164,7 +2194,8 @@ int mdt_close_handle_layouts(struct mdt_thread_info *info,
 
 out_unlock2:
        /* Release exclusive LL */
-       mdt_object_unlock(info, o2, lh2, 1);
+       if (o2)
+               mdt_object_unlock(info, o2, lh2, 1);
 
 out_unlock1:
        mdt_object_unlock(info, o1, lh1, 1);
@@ -2182,9 +2213,12 @@ out_unlock_sem:
        }
 
 out_obj:
-       /* Callee takes care of o, we must put the other one. We know
-        * that o1 != o2 from check of lu_fid_cmp() above. */
-       mdt_object_put(info->mti_env, o1 != o ? o1 : o2);
+       if (o1 != o)
+               /* the 2nd object has been used, and swapped to o1 */
+               mdt_object_put(info->mti_env, o1);
+       else if (o2)
+               /* the 2nd object has been used, and not swapped */
+               mdt_object_put(info->mti_env, o2);
 
        ldlm_reprocess_all(lease->l_resource, lease);