Whamcloud - gitweb
LU-7230 llite: clear dir stripe md in ll_iget 77/16677/15
authorDi Wang <di.wang@intel.com>
Thu, 8 Oct 2015 07:51:16 +0000 (00:51 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Tue, 3 Nov 2015 20:01:55 +0000 (20:01 +0000)
If ll_iget() fails during inode initialization, especially
during a striped directory lookup after creation has failed,
it should clear the stripe MD before calling make_bad_inode(),
because make_bad_inode() resets i_mode, which can cause
ll_clear_inode() to skip freeing the stripe MD.

Remove the name entry from the directory if creation
fails. Note: this does not roll back all of the local
operations; LFSCK will take care of the orphan object.

Add sanity.sh 300p to verify the case.

Also enable "lfs rm_entry" for local objects, because
it is quite possible to end up with a corrupted local
striped directory, and we might need to use
"lfs rm_entry" to delete it.

Signed-off-by: wang di <di.wang@intel.com>
Change-Id: I892c52117b83c8348aa0ceb888e73c84e79ffe46
Reviewed-on: http://review.whamcloud.com/16677
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: James Simmons <uja.ornl@yahoo.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
lustre/include/obd_support.h
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/namei.c
lustre/mdd/mdd_dir.c
lustre/mdt/mdt_reint.c
lustre/target/out_lib.c
lustre/target/update_trans.c
lustre/tests/sanity.sh

index 0f58fb0..a35309b 100644 (file)
@@ -562,6 +562,7 @@ extern char obd_jobid_var[];
 #define OBD_FAIL_OUT_UPDATE_NET_REP    0x1701
 #define OBD_FAIL_SPLIT_UPDATE_REC      0x1702
 #define OBD_FAIL_LARGE_STRIPE          0x1703
+#define OBD_FAIL_OUT_ENOSPC             0x1704
 
 /* MIGRATE */
 #define OBD_FAIL_MIGRATE_NET_REP               0x1800
index 9054f77..d1a8e9b 100644 (file)
@@ -896,6 +896,7 @@ int ll_fill_super(struct super_block *sb, struct vfsmount *mnt);
 void ll_put_super(struct super_block *sb);
 void ll_kill_super(struct super_block *sb);
 struct inode *ll_inode_from_resource_lock(struct ldlm_lock *lock);
+void ll_dir_clear_lsm_md(struct inode *inode);
 void ll_clear_inode(struct inode *inode);
 int ll_setattr_raw(struct dentry *dentry, struct iattr *attr, bool hsm_import);
 int ll_setattr(struct dentry *de, struct iattr *attr);
index 72b178e..8dd092b 100644 (file)
@@ -1187,7 +1187,7 @@ struct inode *ll_inode_from_resource_lock(struct ldlm_lock *lock)
        return inode;
 }
 
-static void ll_dir_clear_lsm_md(struct inode *inode)
+void ll_dir_clear_lsm_md(struct inode *inode)
 {
        struct ll_inode_info *lli = ll_i2info(inode);
 
index 29e379e..9b92265 100644 (file)
@@ -128,6 +128,12 @@ struct inode *ll_iget(struct super_block *sb, ino_t hash,
                        rc = cl_file_inode_init(inode, md);
 
                if (rc != 0) {
+                       /* Let's clear directory lsm here, otherwise
+                        * make_bad_inode() will reset the inode mode
+                        * to regular, then ll_clear_inode will not
+                        * be able to clear lsm_md */
+                       if (S_ISDIR(inode->i_mode))
+                               ll_dir_clear_lsm_md(inode);
                        make_bad_inode(inode);
                        unlock_new_inode(inode);
                        iput(inode);
@@ -140,6 +146,8 @@ struct inode *ll_iget(struct super_block *sb, ino_t hash,
                CDEBUG(D_VFSTRACE, "got inode: "DFID"(%p): rc = %d\n",
                       PFID(&md->body->mbo_fid1), inode, rc);
                if (rc != 0) {
+                       if (S_ISDIR(inode->i_mode))
+                               ll_dir_clear_lsm_md(inode);
                        iput(inode);
                        inode = ERR_PTR(rc);
                }
index 185dad9..1d89027 100644 (file)
@@ -2293,6 +2293,47 @@ unlock:
        RETURN(rc);
 }
 
+static int mdd_index_delete(const struct lu_env *env,
+                           struct mdd_object *mdd_pobj,
+                           struct lu_attr *cattr,
+                           const struct lu_name *lname)
+{
+       struct mdd_device *mdd = mdo2mdd(&mdd_pobj->mod_obj);
+       struct thandle *handle;
+       int rc;
+       ENTRY;
+
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               RETURN(PTR_ERR(handle));
+
+       rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name,
+                                     handle);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       if (S_ISDIR(cattr->la_mode)) {
+               rc = mdo_declare_ref_del(env, mdd_pobj, handle);
+               if (rc != 0)
+                       GOTO(stop, rc);
+       }
+
+       /* Since this will only be used in the error handler path,
+        * Let's set the thandle to be local and not mess the transno */
+       handle->th_local = 1;
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = __mdd_index_delete(env, mdd_pobj, lname->ln_name,
+                               S_ISDIR(cattr->la_mode), handle);
+       if (rc)
+               GOTO(stop, rc);
+stop:
+       mdd_trans_stop(env, mdd, rc, handle);
+       RETURN(rc);
+}
+
 /*
  * Create object and insert it into namespace.
  */
@@ -2480,8 +2521,16 @@ out_volatile:
                                NULL, handle);
 out_stop:
        rc2 = mdd_trans_stop(env, mdd, rc, handle);
-       if (rc == 0)
+       if (rc == 0) {
+               /* If creation fails, it is most likely due to the remote update
+                * failure, because local transaction will mostly succeed at
+                * this stage. There is no easy way to rollback all of previous
+                * updates, so let's remove the object from namespace, and
+                * LFSCK should handle the orphan object. */
+               if (rc2 < 0 && !mdd_object_remote(mdd_pobj))
+                       mdd_index_delete(env, mdd_pobj, attr, lname);
                rc = rc2;
+       }
 out_free:
        if (is_vmalloc_addr(ldata->ld_buf))
                /* if we vmalloced a large buffer drop it */
index 430900f..fda9a8c 100644 (file)
@@ -866,6 +866,23 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
 
         child_lh = &info->mti_lh[MDT_LH_CHILD];
         mdt_lock_reg_init(child_lh, LCK_EX);
+       if (info->mti_spec.sp_rm_entry) {
+               struct lu_ucred *uc  = mdt_ucred(info);
+
+               if (!mdt_is_dne_client(req->rq_export))
+                       /* Return -ENOTSUPP for old client */
+                       GOTO(put_child, rc = -ENOTSUPP);
+
+               if (!md_capable(uc, CFS_CAP_SYS_ADMIN))
+                       GOTO(put_child, rc = -EPERM);
+
+               ma->ma_need = MA_INODE;
+               ma->ma_valid = 0;
+               rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
+                               NULL, &rr->rr_name, ma, no_name);
+               GOTO(put_child, rc);
+       }
+
        if (mdt_object_remote(mc)) {
                struct mdt_body  *repbody;
 
@@ -883,23 +900,6 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                        /* Return -ENOTSUPP for old client */
                        GOTO(put_child, rc = -ENOTSUPP);
 
-               if (info->mti_spec.sp_rm_entry) {
-                       struct lu_ucred *uc  = mdt_ucred(info);
-
-                       if (!md_capable(uc, CFS_CAP_SYS_ADMIN)) {
-                               CERROR("%s: unlink remote entry is only "
-                                      "permitted for administrator: rc = %d\n",
-                                       mdt_obd_name(info->mti_mdt),
-                                       -EPERM);
-                               GOTO(put_child, rc = -EPERM);
-                       }
-
-                       ma->ma_need = MA_INODE;
-                       ma->ma_valid = 0;
-                       rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
-                                       NULL, &rr->rr_name, ma, no_name);
-                       GOTO(put_child, rc);
-               }
                /* Revoke the LOOKUP lock of the remote object granted by
                 * this MDT. Since the unlink will happen on another MDT,
                 * it will release the LOOKUP lock right away. Then What
@@ -911,14 +911,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                repbody->mbo_fid1 = *mdt_object_fid(mc);
                repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
                GOTO(unlock_child, rc = -EREMOTE);
-       } else if (info->mti_spec.sp_rm_entry) {
-               rc = -EPERM;
-               CDEBUG(D_INFO, "%s: no rm_entry on local dir '"DNAME"': "
-                      "rc = %d\n",
-                      mdt_obd_name(info->mti_mdt), PNAME(&rr->rr_name), rc);
-               GOTO(put_child, rc);
        }
-
        /* We used to acquire MDS_INODELOCK_FULL here but we can't do
         * this now because a running HSM restore on the child (unlink
         * victim) will hold the layout lock. See LU-4002. */
index e6a0879..aa6c209 100644 (file)
@@ -665,13 +665,17 @@ static int out_tx_write_exec(const struct lu_env *env, struct thandle *th,
               PFID(lu_object_fid(&dt_obj->do_lu)), arg->u.write.pos,
               arg->u.write.buf.lb_buf, (unsigned long)arg->u.write.buf.lb_len);
 
-       dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
-       rc = dt_record_write(env, dt_obj, &arg->u.write.buf,
-                            &arg->u.write.pos, th);
-       dt_write_unlock(env, dt_obj);
-
-       if (rc == 0)
-               rc = arg->u.write.buf.lb_len;
+       if (OBD_FAIL_CHECK(OBD_FAIL_OUT_ENOSPC)) {
+               rc = -ENOSPC;
+       } else {
+               dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+               rc = dt_record_write(env, dt_obj, &arg->u.write.buf,
+                                    &arg->u.write.pos, th);
+               dt_write_unlock(env, dt_obj);
+
+               if (rc == 0)
+                       rc = arg->u.write.buf.lb_len;
+       }
 
        if (arg->reply != NULL)
                object_update_result_insert(arg->reply, NULL, 0, arg->index,
index d3fd4c1..db92713 100644 (file)
@@ -773,6 +773,11 @@ int top_trans_start(const struct lu_env *env, struct dt_device *master_dev,
        ENTRY;
 
        if (tmt == NULL) {
+               if (th->th_sync)
+                       top_th->tt_master_sub_thandle->th_sync = th->th_sync;
+               if (th->th_local)
+                       top_th->tt_master_sub_thandle->th_local = th->th_local;
+               top_th->tt_master_sub_thandle->th_tags = th->th_tags;
                rc = dt_trans_start(env, top_th->tt_master_sub_thandle->th_dev,
                                    top_th->tt_master_sub_thandle);
                RETURN(rc);
@@ -788,7 +793,8 @@ int top_trans_start(const struct lu_env *env, struct dt_device *master_dev,
                        continue;
                if (th->th_sync)
                        st->st_sub_th->th_sync = th->th_sync;
-               st->st_sub_th->th_local = th->th_local;
+               if (th->th_local)
+                       st->st_sub_th->th_local = th->th_local;
                st->st_sub_th->th_tags = th->th_tags;
                rc = dt_trans_start(env, st->st_sub_th->th_dev,
                                    st->st_sub_th);
@@ -922,6 +928,12 @@ int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev,
 
        if (likely(top_th->tt_multiple_thandle == NULL)) {
                LASSERT(master_dev != NULL);
+
+               if (th->th_sync)
+                       top_th->tt_master_sub_thandle->th_sync = th->th_sync;
+               if (th->th_local)
+                       top_th->tt_master_sub_thandle->th_local = th->th_local;
+               top_th->tt_master_sub_thandle->th_tags = th->th_tags;
                rc = dt_trans_stop(env, master_dev,
                                   top_th->tt_master_sub_thandle);
                OBD_FREE_PTR(top_th);
@@ -975,7 +987,8 @@ stop_master_trans:
        /* Step 2: Stop the transaction on the master MDT, and fill the
         * master transno in the update logs to other MDT. */
        if (master_st != NULL && master_st->st_sub_th != NULL) {
-               master_st->st_sub_th->th_local = th->th_local;
+               if (th->th_local)
+                       master_st->st_sub_th->th_local = th->th_local;
                if (th->th_sync)
                        master_st->st_sub_th->th_sync = th->th_sync;
                master_st->st_sub_th->th_tags = th->th_tags;
@@ -1033,7 +1046,8 @@ stop_other_trans:
 
                if (th->th_sync)
                        st->st_sub_th->th_sync = th->th_sync;
-               st->st_sub_th->th_local = th->th_local;
+               if (th->th_local)
+                       st->st_sub_th->th_local = th->th_local;
                st->st_sub_th->th_tags = th->th_tags;
                st->st_sub_th->th_result = th->th_result;
                rc = dt_trans_stop(env, st->st_sub_th->th_dev,
index f43e434..fae421c 100755 (executable)
@@ -14077,6 +14077,22 @@ test_300o() {
 }
 run_test 300o "unlink big sub stripe(> 65000 subdirs)"
 
+test_300p() {
+       [ $PARALLEL == "yes" ] && skip "skip parallel run" && return
+       [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
+
+       mkdir -p $DIR/$tdir
+
+       #define OBD_FAIL_OUT_ENOSPC     0x1704
+       do_facet mds2 lctl set_param fail_loc=0x80001704
+       $LFS setdirstripe -c2 $DIR/$tdir/bad_striped_dir > /dev/null 2>&1 &&
+                       error "create striped directory should fail"
+
+       [ -e $DIR/$tdir/bad_striped_dir ] && error "striped dir exists"
+       true
+}
+run_test 300p "create striped directory without space"
+
 prepare_remote_file() {
        mkdir $DIR/$tdir/src_dir ||
                error "create remote source failed"