Whamcloud - gitweb
LU-6475 mdt: race between open and migrate
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
index 67b95a6..17b654d 100644 (file)
@@ -3205,6 +3205,7 @@ static int mdd_migrate_xattrs(const struct lu_env *env,
        int                     list_xsize;
        struct lu_buf           list_xbuf;
        int                     rc;
+       int                     rc1;
 
        /* retrieve xattr list from the old object */
        list_xsize = mdo_xattr_list(env, mdd_sobj, &LU_BUF_NULL);
@@ -3279,7 +3280,9 @@ static int mdd_migrate_xattrs(const struct lu_env *env,
                if (rc != 0)
                        GOTO(stop_trans, rc);
 stop_trans:
-               mdd_trans_stop(env, mdd, rc, handle);
+               rc1 = mdd_trans_stop(env, mdd, rc, handle);
+               if (rc == 0)
+                       rc = rc1;
                if (rc != 0)
                        GOTO(out, rc);
 next:
@@ -3397,7 +3400,7 @@ static int mdd_migrate_create(const struct lu_env *env,
                        RETURN(rc);
                }
                spec->u.sp_symname = link_buf.lb_buf;
-       } else if S_ISREG(la->la_mode) {
+       } else if (S_ISREG(la->la_mode)) {
                /* retrieve lov of the old object */
                rc = mdd_get_lov_ea(env, mdd_sobj, &lmm_buf);
                if (rc != 0 && rc != -ENODATA)
@@ -3473,8 +3476,13 @@ static int mdd_migrate_create(const struct lu_env *env,
        la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
        rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
 stop_trans:
-       if (handle != NULL)
-               mdd_trans_stop(env, mdd, rc, handle);
+       if (handle != NULL) {
+               int rc1;
+
+               rc1 = mdd_trans_stop(env, mdd, rc, handle);
+               if (rc == 0)
+                       rc = rc1;
+       }
 out_free:
        if (lmm_buf.lb_buf != NULL)
                OBD_FREE(lmm_buf.lb_buf, lmm_buf.lb_len);
@@ -3529,6 +3537,7 @@ static int mdd_migrate_entries(const struct lu_env *env,
                int                     recsize;
                int                     is_dir;
                bool                    target_exist = false;
+               int                     rc1;
 
                len = iops->key_size(env, it);
                if (len == 0)
@@ -3556,6 +3565,7 @@ static int mdd_migrate_entries(const struct lu_env *env,
                if (IS_ERR(child))
                        GOTO(out, rc = PTR_ERR(child));
 
+               mdd_write_lock(env, child, MOR_SRC_CHILD);
                is_dir = S_ISDIR(mdd_object_type(child));
 
                snprintf(name, ent->lde_namelen + 1, "%s", ent->lde_name);
@@ -3658,8 +3668,12 @@ static int mdd_migrate_entries(const struct lu_env *env,
                                             strlen(name), handle);
 
 out_put:
+               mdd_write_unlock(env, child);
                mdd_object_put(env, child);
-               mdd_trans_stop(env, mdd, rc, handle);
+               rc1 = mdd_trans_stop(env, mdd, rc, handle);
+               if (rc == 0)
+                       rc = rc1;
+
                if (rc != 0)
                        GOTO(out, rc);
 next:
@@ -3734,6 +3748,13 @@ static int mdd_declare_migrate_update_name(const struct lu_env *env,
                                           handle);
                if (rc != 0)
                        return rc;
+
+               handle->th_complex = 1;
+               rc = mdo_declare_xattr_set(env, mdd_tobj, NULL,
+                                          XATTR_NAME_FID,
+                                          LU_XATTR_REPLACE, handle);
+               if (rc < 0)
+                       return rc;
        }
 
        if (S_ISDIR(mdd_object_type(mdd_sobj))) {
@@ -3857,6 +3878,12 @@ static int mdd_migrate_update_name(const struct lu_env *env,
                                           handle);
                        if (rc != 0 && rc != -ENODATA)
                                GOTO(stop_trans, rc);
+
+                       rc = mdo_xattr_set(env, mdd_tobj, NULL,
+                                          XATTR_NAME_FID,
+                                          LU_XATTR_REPLACE, handle);
+                       if (rc < 0)
+                               GOTO(stop_trans, rc);
                }
        }
 
@@ -3871,7 +3898,18 @@ static int mdd_migrate_update_name(const struct lu_env *env,
        if (rc != 0)
                GOTO(stop_trans, rc);
 
-       mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
+       mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
+
+       /* Increase mod_count to add the source object to the orphan list,
+        * so if other clients still send RPC to the old object, then these
+        * objects can help the request to find the new object, see
+        * mdt_reint_open() */
+       mdd_sobj->mod_count++;
+       rc = mdd_finish_unlink(env, mdd_sobj, ma, mdd_pobj, lname, handle);
+       mdd_sobj->mod_count--;
+       if (rc != 0)
+               GOTO(out_unlock, rc);
+
        mdo_ref_del(env, mdd_sobj, handle);
        if (is_dir)
                mdo_ref_del(env, mdd_sobj, handle);
@@ -3883,9 +3921,6 @@ static int mdd_migrate_update_name(const struct lu_env *env,
 
        ma->ma_attr = *so_attr;
        ma->ma_valid |= MA_INODE;
-       rc = mdd_finish_unlink(env, mdd_sobj, ma, mdd_pobj, lname, handle);
-       if (rc != 0)
-               GOTO(out_unlock, rc);
 
        rc = mdd_attr_set_internal(env, mdd_pobj, p_la, handle, 0);
        if (rc != 0)
@@ -3962,15 +3997,15 @@ static int mdd_migrate_sanity_check(const struct lu_env *env,
        if (rc != 0) {
                /* For multiple links files, if there are no linkEA data at all,
                 * means the file might be created before linkEA is enabled, and
-                * all all of its links should not be migrated yet, otherwise
-                * it should have some linkEA there */
+                * all of its links should not be migrated yet, otherwise it
+                * should have some linkEA there */
                if (rc == -ENOENT || rc == -ENODATA)
                        RETURN(1);
                RETURN(rc);
        }
 
-       /* If it is mulitple links file, we need update the name entry for
-        * all parent */
+       /* If there are still links locally, then the file will not be
+        * migrated. */
        LASSERT(ldata->ld_leh != NULL);
        ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
        for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
@@ -4025,7 +4060,7 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *pobj,
        /* If the file will being migrated, it will check whether
         * the file is being opened by someone else right now */
        mdd_read_lock(env, mdd_sobj, MOR_SRC_CHILD);
-       if (mdd_sobj->mod_count >= 1) {
+       if (mdd_sobj->mod_count > 0) {
                CERROR("%s: "DFID"%s is already opened count %d: rc = %d\n",
                       mdd2obd_dev(mdd)->obd_name,
                       PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
@@ -4080,6 +4115,7 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *pobj,
                        GOTO(put, rc);
        }
 
+       LASSERT(mdd_object_exists(mdd_tobj));
        /* step 2: migrate xattr */
        rc = mdd_migrate_xattrs(env, mdd_sobj, mdd_tobj);
        if (rc != 0)
@@ -4097,6 +4133,7 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *pobj,
                OBD_FAIL_TIMEOUT(OBD_FAIL_MIGRATE_DELAY, cfs_fail_val);
        }
 
+       LASSERT(mdd_object_exists(mdd_tobj));
        /* step 4: update name entry to the new object */
        rc = mdd_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj, lname,
                                     ma);