Whamcloud - gitweb
LU-6977 lod: do_index_try should be called first
[fs/lustre-release.git] / lustre / lod / lod_object.c
index f191742..ff13bda 100644 (file)
@@ -571,12 +571,12 @@ again:
 
        next = lo->ldo_stripe[it->lit_stripe_index];
        LASSERT(next != NULL);
-       LASSERT(next->do_index_ops != NULL);
-
        rc = next->do_ops->do_index_try(env, next, &dt_directory_features);
        if (rc != 0)
                RETURN(rc);
 
+       LASSERT(next->do_index_ops != NULL);
+
        it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr);
        if (!IS_ERR(it_next)) {
                it->lit_it = it_next;
@@ -2035,16 +2035,19 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env,
        if (rc != 0)
                RETURN(rc);
 
+       /* Note: Do not set LinkEA on sub-stripes, otherwise
+        * it will confuse the fid2path process(see mdt_path_current()).
+        * The linkEA between master and sub-stripes is set in
+        * lod_xattr_set_lmv(). */
+       if (strcmp(name, XATTR_NAME_LINK) == 0)
+               RETURN(0);
+
        /* set xattr to each stripes, if needed */
        rc = lod_load_striping(env, lo);
        if (rc != 0)
                RETURN(rc);
 
-       /* Note: Do not set LinkEA on sub-stripes, otherwise
-        * it will confuse the fid2path process(see mdt_path_current()).
-        * The linkEA between master and sub-stripes is set in
-        * lod_xattr_set_lmv(). */
-       if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0)
+       if (lo->ldo_stripenr == 0)
                RETURN(0);
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
@@ -2060,6 +2063,85 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env,
 }
 
 /**
+ * Reset parent FID on OST object
+ *
+ * Replace parent FID with @dt object FID, which is only called during migration
+ * to reset the parent FID after the MDT object is migrated to the new MDT, i.e.
+ * the FID is changed.
+ *
+ * \param[in] env execution environment
+ * \param[in] dt dt_object whose stripes's parent FID will be reset
+ * \parem[in] th thandle
+ * \param[in] declare if it is declare
+ *
+ * \retval     0 if reset succeeds
+ * \retval     negative errno if reset fais
+ */
+static int lod_object_replace_parent_fid(const struct lu_env *env,
+                                        struct dt_object *dt,
+                                        struct thandle *th, bool declare)
+{
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct lod_thread_info  *info = lod_env_info(env);
+       struct lu_buf *buf = &info->lti_buf;
+       struct filter_fid *ff;
+       int i, rc;
+       ENTRY;
+
+       LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr));
+
+       /* set xattr to each stripes, if needed */
+       rc = lod_load_striping(env, lo);
+       if (rc != 0)
+               RETURN(rc);
+
+       if (lo->ldo_stripenr == 0)
+               RETURN(0);
+
+       if (info->lti_ea_store_size < sizeof(*ff)) {
+               rc = lod_ea_store_resize(info, sizeof(*ff));
+               if (rc != 0)
+                       RETURN(rc);
+       }
+
+       buf->lb_buf = info->lti_ea_store;
+       buf->lb_len = info->lti_ea_store_size;
+
+       for (i = 0; i < lo->ldo_stripenr; i++) {
+               if (lo->ldo_stripe[i] == NULL)
+                       continue;
+
+               rc = dt_xattr_get(env, lo->ldo_stripe[i], buf,
+                                 XATTR_NAME_FID);
+               if (rc < 0) {
+                       rc = 0;
+                       continue;
+               }
+
+               ff = buf->lb_buf;
+               fid_le_to_cpu(&ff->ff_parent, &ff->ff_parent);
+               ff->ff_parent.f_seq = lu_object_fid(&dt->do_lu)->f_seq;
+               ff->ff_parent.f_oid = lu_object_fid(&dt->do_lu)->f_oid;
+               fid_cpu_to_le(&ff->ff_parent, &ff->ff_parent);
+
+               if (declare) {
+                       rc = lod_sub_object_declare_xattr_set(env,
+                                               lo->ldo_stripe[i], buf,
+                                               XATTR_NAME_FID,
+                                               LU_XATTR_REPLACE, th);
+               } else {
+                       rc = lod_sub_object_xattr_set(env, lo->ldo_stripe[i],
+                                                     buf, XATTR_NAME_FID,
+                                                     LU_XATTR_REPLACE, th);
+               }
+               if (rc < 0)
+                       break;
+       }
+
+       RETURN(rc);
+}
+
+/**
  * Implementation of dt_object_operations::do_declare_xattr_set.
  *
  * \see dt_object_operations::do_declare_xattr_set() in the API description
@@ -2105,6 +2187,8 @@ static int lod_declare_xattr_set(const struct lu_env *env,
                rc = lod_declare_striped_object(env, dt, attr, buf, th);
        } else if (S_ISDIR(mode)) {
                rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
+       } else if (strcmp(name, XATTR_NAME_FID) == 0) {
+               rc = lod_object_replace_parent_fid(env, dt, th, true);
        } else {
                rc = lod_sub_object_declare_xattr_set(env, next, buf, name,
                                                      fl, th);
@@ -2757,6 +2841,10 @@ static int lod_xattr_set(const struct lu_env *env,
                        rc = lod_striping_create(env, dt, NULL, NULL, th);
                }
                RETURN(rc);
+       } else if (strcmp(name, XATTR_NAME_FID) == 0) {
+               rc = lod_object_replace_parent_fid(env, dt, th, false);
+
+               RETURN(rc);
        }
 
        /* then all other xattr */
@@ -3108,17 +3196,8 @@ static void lod_ah_init(const struct lu_env *env,
        LASSERT(lc->ldo_stripenr == 0);
        LASSERT(lc->ldo_stripe == NULL);
 
-       /*
-        * local object may want some hints
-        * in case of late striping creation, ->ah_init()
-        * can be called with local object existing
-        */
-       if (!dt_object_exists(nextc) || dt_object_remote(nextc)) {
-               struct dt_object *obj;
-
-               obj = (nextp != NULL && dt_object_remote(nextp)) ? NULL : nextp;
-               nextc->do_ops->do_ah_init(env, ah, obj, nextc, child_mode);
-       }
+       if (!dt_object_exists(nextc))
+               nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode);
 
        if (S_ISDIR(child_mode)) {
                if (lc->ldo_dir_stripe == NULL) {
@@ -3460,10 +3539,20 @@ static int lod_declare_object_create(const struct lu_env *env,
                 * Note: if dah_eadata != NULL, it means creating the
                 * striped directory with specified stripeEA, then it
                 * should ignore the default stripeEA */
-               if ((hint == NULL || hint->dah_eadata == NULL) &&
-                   lo->ldo_dir_stripe_offset != -1 &&
-                   lo->ldo_dir_stripe_offset != ss->ss_node_id)
-                       GOTO(out, rc = -EREMOTE);
+               if (hint != NULL && hint->dah_eadata == NULL) {
+                       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STALE_DIR_LAYOUT))
+                               GOTO(out, rc = -EREMOTE);
+
+                       if (lo->ldo_dir_stripe_offset == -1) {
+                               /* child and parent should be in the same MDT */
+                               if (hint->dah_parent != NULL &&
+                                   dt_object_remote(hint->dah_parent))
+                                       GOTO(out, rc = -EREMOTE);
+                       } else if (lo->ldo_dir_stripe_offset !=
+                                  ss->ss_node_id) {
+                               GOTO(out, rc = -EREMOTE);
+                       }
+               }
 
                /* Orphan object (like migrating object) does not have
                 * lod_dir_stripe, see lod_ah_init */