Whamcloud - gitweb
LU-14470 dne: striped mkdir replay by client request
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
index 701863e..9579e9b 100644 (file)
@@ -498,6 +498,7 @@ static int mdt_create(struct mdt_thread_info *info)
        struct md_op_spec *spec = &info->mti_spec;
        struct lu_ucred *uc = mdt_ucred(info);
        bool restripe = false;
+       bool recreate_obj = false;
        int rc;
 
        ENTRY;
@@ -573,28 +574,56 @@ static int mdt_create(struct mdt_thread_info *info)
            parent->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
                GOTO(put_parent, rc = -EPERM);
 
+       info->mti_spec.sp_replay = req_is_replay(mdt_info_req(info));
+
        /*
         * LU-10235: check if name exists locklessly first to avoid massive
         * lock recalls on existing directories.
         */
-       rc = mdt_lookup_version_check(info, parent, &rr->rr_name,
-                                     &info->mti_tmp_fid1, 1);
+       rc = mdo_lookup(info->mti_env, mdt_object_child(parent), &rr->rr_name,
+                       &info->mti_tmp_fid1, &info->mti_spec);
        if (rc == 0) {
-               if (!restripe)
+               /* mkdir may be partially executed: name entry was successfully
+                * inserted into parent diretory on remote MDT, while target not
+                * created on local MDT. This happens when update log recovery
+                * is aborted, and mkdir is replayed by client request.
+                */
+               if (unlikely(!(info->mti_spec.sp_replay &&
+                              mdt_object_remote(parent)) &&
+                            !restripe))
                        GOTO(put_parent, rc = -EEXIST);
 
-               rc = mdt_restripe(info, parent, &rr->rr_name, rr->rr_fid2, spec,
-                                 ma);
-       }
+               child = mdt_object_find(info->mti_env, info->mti_mdt,
+                                       &info->mti_tmp_fid1);
+               if (unlikely(IS_ERR(child)))
+                       GOTO(put_parent, rc = PTR_ERR(child));
 
-       /* -ENOENT is expected here */
-       if (rc != -ENOENT)
+               if (mdt_object_exists(child)) {
+                       mdt_object_put(info->mti_env, child);
+                       rc = -EEXIST;
+                       if (restripe)
+                               rc = mdt_restripe(info, parent, &rr->rr_name,
+                                                 rr->rr_fid2, spec, ma);
+                       GOTO(put_parent, rc);
+               }
+               mdt_object_put(info->mti_env, child);
+               recreate_obj = true;
+       } else if (rc != -ENOENT) {
                GOTO(put_parent, rc);
+       }
 
-       OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PAUSE_CREATE_AFTER_LOOKUP, cfs_fail_val);
+       if (unlikely(info->mti_spec.sp_replay)) {
+               /* check version only during replay */
+               rc = mdt_version_check(mdt_info_req(info), ENOENT_VERSION, 1);
+               if (rc)
+                       GOTO(put_parent, rc);
+       } else {
+               CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_PAUSE_CREATE_AFTER_LOOKUP,
+                                cfs_fail_val);
 
-       /* save version of file name for replay, it must be ENOENT here */
-       mdt_enoent_version_save(info, 1);
+               /* save version of file name for replay, must be ENOENT here */
+               mdt_enoent_version_save(info, 1);
+       }
 
        CFS_RACE(OBD_FAIL_MDS_CREATE_RACE);
 
@@ -617,7 +646,7 @@ static int mdt_create(struct mdt_thread_info *info)
         */
        rc = mdo_lookup(info->mti_env, mdt_object_child(parent), &rr->rr_name,
                        &info->mti_tmp_fid1, &info->mti_spec);
-       if (unlikely(rc == 0))
+       if (unlikely(rc == 0 && !recreate_obj))
                GOTO(unlock_parent, rc = -EEXIST);
 
        child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
@@ -657,12 +686,23 @@ static int mdt_create(struct mdt_thread_info *info)
 
        rc = mdo_create(info->mti_env, mdt_object_child(parent), &rr->rr_name,
                        mdt_object_child(child), &info->mti_spec, ma);
-       if (rc == 0)
-               rc = mdt_attr_get_complex(info, child, ma);
+       if (rc < 0)
+               GOTO(put_child, rc);
 
+       if (S_ISDIR(ma->ma_attr.la_mode) &&
+           (info->mti_spec.sp_cr_flags & MDS_MKDIR_LMV))
+               mdt_prep_ma_buf_from_rep(info, child, ma, 0);
+
+       rc = mdt_attr_get_complex(info, child, ma);
        if (rc < 0)
                GOTO(put_child, rc);
 
+       if (ma->ma_valid & MA_LMV) {
+               mdt_dump_lmv(D_INFO, ma->ma_lmv);
+               repbody->mbo_eadatasize = ma->ma_lmv_size;
+               repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
+       }
+
        /* save child locks to eliminate dependey between 'mkdir a' and
         * 'mkdir a/b' if b is a remote directory
         */