Whamcloud - gitweb
LU-5147 doc: design docs in documentation dir
[fs/lustre-release.git] / lustre / lod / lod_object.c
index 948a675..aab61a3 100644 (file)
@@ -23,7 +23,7 @@
  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2015, Intel Corporation.
  */
 /*
  * lustre/lod/lod_object.c
@@ -33,7 +33,7 @@
  * local OSD object interface to the MDD layer, and abstracts the
  * addressing of local (OSD) and remote (OSP) objects. The API is
  * described in the file lustre/include/dt_object.h and in
- * lustre/doc/osd-api.txt.
+ * Documentation/osd-api.txt.
  *
  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
  */
 
 #include <obd.h>
 #include <obd_class.h>
-#include <lustre_ver.h>
 #include <obd_support.h>
-#include <lprocfs_status.h>
 
 #include <lustre_fid.h>
-#include <lustre_param.h>
-#include <lustre_fid.h>
+#include <lustre_linkea.h>
 #include <lustre_lmv.h>
+#include <lustre_param.h>
+#include <lustre_swab.h>
+#include <lustre_ver.h>
+#include <lprocfs_status.h>
 #include <md_object.h>
-#include <lustre_linkea.h>
 
 #include "lod_internal.h"
 
@@ -571,12 +571,12 @@ again:
 
        next = lo->ldo_stripe[it->lit_stripe_index];
        LASSERT(next != NULL);
-       LASSERT(next->do_index_ops != NULL);
-
        rc = next->do_ops->do_index_try(env, next, &dt_directory_features);
        if (rc != 0)
                RETURN(rc);
 
+       LASSERT(next->do_index_ops != NULL);
+
        it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr);
        if (!IS_ERR(it_next)) {
                it->lit_it = it_next;
@@ -1799,11 +1799,6 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
 
        stripe_count = le32_to_cpu(lum->lum_stripe_count);
 
-       /* shrink the stripe_count to the avaible MDT count */
-       if (stripe_count > lod->lod_remote_mdt_count + 1 &&
-           !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))
-               stripe_count = lod->lod_remote_mdt_count + 1;
-
        OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count);
        if (stripe == NULL)
                RETURN(-ENOMEM);
@@ -1893,7 +1888,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
                       idx, i, PFID(&fid));
                idx_array[i] = idx;
                /* Set the start index for next stripe allocation */
-               if (i < stripe_count)
+               if (i < stripe_count - 1)
                        idx_array[i + 1] = (idx + 1) %
                                           (lod->lod_remote_mdt_count + 1);
                /* tgt_dt and fid must be ready after search avaible OSP
@@ -1997,7 +1992,6 @@ out:
        RETURN(rc);
 }
 
-
 /**
  * Implementation of dt_object_operations::do_declare_xattr_set.
  *
@@ -2035,16 +2029,19 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env,
        if (rc != 0)
                RETURN(rc);
 
+       /* Note: Do not set LinkEA on sub-stripes, otherwise
+        * it will confuse the fid2path process (see mdt_path_current()).
+        * The linkEA between the master and the sub-stripes is set in
+        * lod_xattr_set_lmv(). */
+       if (strcmp(name, XATTR_NAME_LINK) == 0)
+               RETURN(0);
+
        /* set xattr to each stripes, if needed */
        rc = lod_load_striping(env, lo);
        if (rc != 0)
                RETURN(rc);
 
-       /* Note: Do not set LinkEA on sub-stripes, otherwise
-        * it will confuse the fid2path process(see mdt_path_current()).
-        * The linkEA between master and sub-stripes is set in
-        * lod_xattr_set_lmv(). */
-       if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0)
+       if (lo->ldo_stripenr == 0)
                RETURN(0);
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
@@ -2060,6 +2057,85 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env,
 }
 
 /**
+ * Reset parent FID on OST object
+ *
+ * Rewrite the XATTR_NAME_FID xattr of each stripe of @dt so that the f_seq and
+ * f_oid of its parent FID match @dt's FID.  Only called during migration, after
+ * the MDT object has been migrated to the new MDT, i.e. its FID has changed.
+ *
+ * \param[in] env execution environment
+ * \param[in] dt dt_object whose stripes' parent FID will be reset
+ * \param[in] th thandle
+ * \param[in] declare true to declare the xattr update, false to execute it
+ *
+ * \retval     0 if reset succeeds
+ * \retval     negative errno if reset fails
+ */
+static int lod_object_replace_parent_fid(const struct lu_env *env,
+                                        struct dt_object *dt,
+                                        struct thandle *th, bool declare)
+{
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct lod_thread_info  *info = lod_env_info(env);
+       struct lu_buf *buf = &info->lti_buf;
+       struct filter_fid *ff;
+       int i, rc;
+       ENTRY;
+
+       LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr));
+
+       /* load the striping before walking the stripes below */
+       rc = lod_load_striping(env, lo);
+       if (rc != 0)
+               RETURN(rc);
+
+       if (lo->ldo_stripenr == 0)
+               RETURN(0);
+
+       if (info->lti_ea_store_size < sizeof(*ff)) {
+               rc = lod_ea_store_resize(info, sizeof(*ff));
+               if (rc != 0)
+                       RETURN(rc);
+       }
+
+       buf->lb_buf = info->lti_ea_store;
+       buf->lb_len = info->lti_ea_store_size;
+
+       for (i = 0; i < lo->ldo_stripenr; i++) {
+               if (lo->ldo_stripe[i] == NULL)
+                       continue;
+
+               rc = dt_xattr_get(env, lo->ldo_stripe[i], buf,
+                                 XATTR_NAME_FID);
+               if (rc < 0) {
+                       rc = 0; /* skip stripes whose FID xattr can't be read */
+                       continue;
+               }
+
+               ff = buf->lb_buf;
+               fid_le_to_cpu(&ff->ff_parent, &ff->ff_parent);
+               ff->ff_parent.f_seq = lu_object_fid(&dt->do_lu)->f_seq;
+               ff->ff_parent.f_oid = lu_object_fid(&dt->do_lu)->f_oid;
+               fid_cpu_to_le(&ff->ff_parent, &ff->ff_parent);
+
+               if (declare) {
+                       rc = lod_sub_object_declare_xattr_set(env,
+                                               lo->ldo_stripe[i], buf,
+                                               XATTR_NAME_FID,
+                                               LU_XATTR_REPLACE, th);
+               } else {
+                       rc = lod_sub_object_xattr_set(env, lo->ldo_stripe[i],
+                                                     buf, XATTR_NAME_FID,
+                                                     LU_XATTR_REPLACE, th);
+               }
+               if (rc < 0)
+                       break;
+       }
+
+       RETURN(rc);
+}
+
+/**
  * Implementation of dt_object_operations::do_declare_xattr_set.
  *
  * \see dt_object_operations::do_declare_xattr_set() in the API description
@@ -2105,6 +2181,8 @@ static int lod_declare_xattr_set(const struct lu_env *env,
                rc = lod_declare_striped_object(env, dt, attr, buf, th);
        } else if (S_ISDIR(mode)) {
                rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
+       } else if (strcmp(name, XATTR_NAME_FID) == 0) {
+               rc = lod_object_replace_parent_fid(env, dt, th, true);
        } else {
                rc = lod_sub_object_declare_xattr_set(env, next, buf, name,
                                                      fl, th);
@@ -2757,6 +2835,10 @@ static int lod_xattr_set(const struct lu_env *env,
                        rc = lod_striping_create(env, dt, NULL, NULL, th);
                }
                RETURN(rc);
+       } else if (strcmp(name, XATTR_NAME_FID) == 0) {
+               rc = lod_object_replace_parent_fid(env, dt, th, false);
+
+               RETURN(rc);
        }
 
        /* then all other xattr */
@@ -3108,17 +3190,8 @@ static void lod_ah_init(const struct lu_env *env,
        LASSERT(lc->ldo_stripenr == 0);
        LASSERT(lc->ldo_stripe == NULL);
 
-       /*
-        * local object may want some hints
-        * in case of late striping creation, ->ah_init()
-        * can be called with local object existing
-        */
-       if (!dt_object_exists(nextc) || dt_object_remote(nextc)) {
-               struct dt_object *obj;
-
-               obj = (nextp != NULL && dt_object_remote(nextp)) ? NULL : nextp;
-               nextc->do_ops->do_ah_init(env, ah, obj, nextc, child_mode);
-       }
+       if (!dt_object_exists(nextc))
+               nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode);
 
        if (S_ISDIR(child_mode)) {
                if (lc->ldo_dir_stripe == NULL) {
@@ -3175,8 +3248,6 @@ static void lod_ah_init(const struct lu_env *env,
                        rc = lod_verify_md_striping(d, lum1);
                        if (rc == 0 &&
                                le32_to_cpu(lum1->lum_stripe_count) > 1) {
-                               /* Directory will be striped only if
-                                * stripe_count > 1 */
                                lc->ldo_stripenr =
                                        le32_to_cpu(lum1->lum_stripe_count);
                                lc->ldo_dir_stripe_offset =
@@ -3204,6 +3275,18 @@ static void lod_ah_init(const struct lu_env *env,
                        lc->ldo_dir_stripe_offset = -1;
                }
 
+               /* shrink the stripe_count to the available MDT count */
+               if (lc->ldo_stripenr > d->lod_remote_mdt_count + 1 &&
+                   !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))
+                       lc->ldo_stripenr = d->lod_remote_mdt_count + 1;
+
+               /* Directory will be striped only if stripe_count > 1; if
+                * stripe_count == 1, reset stripenr to 0 to avoid creating
+                * a single master stripe, which also helps to unify the
+                * stripe handling of directories and files */
+               if (lc->ldo_stripenr == 1)
+                       lc->ldo_stripenr = 0;
+
                CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
                       lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
 
@@ -3290,6 +3373,9 @@ static int lod_declare_init_size(const struct lu_env *env,
        LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
        LASSERT(lo->ldo_stripe_size > 0);
 
+       if (lo->ldo_stripenr == 0)
+               RETURN(0);
+
        rc = dt_attr_get(env, next, attr);
        LASSERT(attr->la_valid & LA_SIZE);
        if (rc)
@@ -3460,10 +3546,44 @@ static int lod_declare_object_create(const struct lu_env *env,
                 * Note: if dah_eadata != NULL, it means creating the
                 * striped directory with specified stripeEA, then it
                 * should ignore the default stripeEA */
-               if ((hint == NULL || hint->dah_eadata == NULL) &&
-                   lo->ldo_dir_stripe_offset != -1 &&
-                   lo->ldo_dir_stripe_offset != ss->ss_node_id)
-                       GOTO(out, rc = -EREMOTE);
+               if (hint != NULL && hint->dah_eadata == NULL) {
+                       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STALE_DIR_LAYOUT))
+                               GOTO(out, rc = -EREMOTE);
+
+                       if (lo->ldo_dir_stripe_offset == -1) {
+                               /* child and parent should be in the same MDT */
+                               if (hint->dah_parent != NULL &&
+                                   dt_object_remote(hint->dah_parent))
+                                       GOTO(out, rc = -EREMOTE);
+                       } else if (lo->ldo_dir_stripe_offset !=
+                                  ss->ss_node_id) {
+                               struct lod_device *lod;
+                               struct lod_tgt_descs *ltd;
+                               struct lod_tgt_desc *tgt = NULL;
+                               bool found_mdt = false;
+                               int i;
+
+                               lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
+                               ltd = &lod->lod_mdt_descs;
+                               cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
+                                       tgt = LTD_TGT(ltd, i);
+                                       if (tgt->ltd_index ==
+                                               lo->ldo_dir_stripe_offset) {
+                                               found_mdt = true;
+                                               break;
+                                       }
+                               }
+
+                               /* If the MDT indicated by stripe_offset can be
+                                * found, then tell client to resend the create
+                                * request to the correct MDT, otherwise return
+                                * error to client */
+                               if (found_mdt)
+                                       GOTO(out, rc = -EREMOTE);
+                               else
+                                       GOTO(out, rc = -EINVAL);
+                       }
+               }
 
                /* Orphan object (like migrating object) does not have
                 * lod_dir_stripe, see lod_ah_init */
@@ -3770,11 +3890,6 @@ static int lod_object_sync(const struct lu_env *env, struct dt_object *dt,
        return dt_object_sync(env, dt_object_child(dt), start, end);
 }
 
-struct lod_slave_locks {
-       int                     lsl_lock_count;
-       struct lustre_handle    lsl_handle[0];
-};
-
 /**
  * Release LDLM locks on the stripes of a striped directory.
  *
@@ -3792,9 +3907,9 @@ struct lod_slave_locks    {
 static int lod_object_unlock_internal(const struct lu_env *env,
                                      struct dt_object *dt,
                                      struct ldlm_enqueue_info *einfo,
-                                     ldlm_policy_data_t *policy)
+                                     union ldlm_policy_data *policy)
 {
-       struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
+       struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
        int                     rc = 0;
        int                     i;
        ENTRY;
@@ -3802,9 +3917,9 @@ static int lod_object_unlock_internal(const struct lu_env *env,
        if (slave_locks == NULL)
                RETURN(0);
 
-       for (i = 1; i < slave_locks->lsl_lock_count; i++) {
-               if (lustre_handle_is_used(&slave_locks->lsl_handle[i]))
-                       ldlm_lock_decref(&slave_locks->lsl_handle[i],
+       for (i = 1; i < slave_locks->count; i++) {
+               if (lustre_handle_is_used(&slave_locks->handles[i]))
+                       ldlm_lock_decref(&slave_locks->handles[i],
                                         einfo->ei_mode);
        }
 
@@ -3823,32 +3938,31 @@ static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
                             struct ldlm_enqueue_info *einfo,
                             union ldlm_policy_data *policy)
 {
-       struct lod_object       *lo = lod_dt_obj(dt);
-       struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
-       int                     slave_locks_size;
-       int                     rc;
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
+       int slave_locks_size;
+       int i;
        ENTRY;
 
        if (slave_locks == NULL)
                RETURN(0);
 
-       if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
-               RETURN(-ENOTDIR);
-
+       LASSERT(S_ISDIR(dt->do_lu.lo_header->loh_attr));
+       LASSERT(lo->ldo_stripenr > 1);
        /* Note: for remote lock for single stripe dir, MDT will cancel
         * the lock by lockh directly */
-       if (lo->ldo_stripenr <= 1 && dt_object_remote(dt_object_child(dt)))
-               RETURN(0);
+       LASSERT(!dt_object_remote(dt_object_child(dt)));
 
-       /* Only cancel slave lock for striped dir */
-       rc = lod_object_unlock_internal(env, dt, einfo, policy);
+       /* locks were unlocked in MDT layer */
+       for (i = 1; i < slave_locks->count; i++)
+               LASSERT(!lustre_handle_is_used(&slave_locks->handles[i]));
 
-       slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
-                          sizeof(slave_locks->lsl_handle[0]);
+       slave_locks_size = sizeof(*slave_locks) + slave_locks->count *
+                          sizeof(slave_locks->handles[0]);
        OBD_FREE(slave_locks, slave_locks_size);
        einfo->ei_cbdata = NULL;
 
-       RETURN(rc);
+       RETURN(0);
 }
 
 /**
@@ -3869,7 +3983,7 @@ static int lod_object_lock(const struct lu_env *env,
        int                     rc = 0;
        int                     i;
        int                     slave_locks_size;
-       struct lod_slave_locks  *slave_locks = NULL;
+       struct lustre_handle_array *slave_locks = NULL;
        ENTRY;
 
        /* remote object lock */
@@ -3891,12 +4005,12 @@ static int lod_object_lock(const struct lu_env *env,
                RETURN(0);
 
        slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
-                          sizeof(slave_locks->lsl_handle[0]);
+                          sizeof(slave_locks->handles[0]);
        /* Freed in lod_object_unlock */
        OBD_ALLOC(slave_locks, slave_locks_size);
        if (slave_locks == NULL)
                RETURN(-ENOMEM);
-       slave_locks->lsl_lock_count = lo->ldo_stripenr;
+       slave_locks->count = lo->ldo_stripenr;
 
        /* striped directory lock */
        for (i = 1; i < lo->ldo_stripenr; i++) {
@@ -3918,6 +4032,10 @@ static int lod_object_lock(const struct lu_env *env,
                        ldlm_completion_callback completion = einfo->ei_cb_cp;
                        __u64   dlmflags = LDLM_FL_ATOMIC_CB;
 
+                       if (einfo->ei_mode == LCK_PW ||
+                           einfo->ei_mode == LCK_EX)
+                               dlmflags |= LDLM_FL_COS_INCOMPAT;
+
                        /* This only happens if there are mulitple stripes
                         * on the master MDT, i.e. except stripe0, there are
                         * other stripes on the Master MDT as well, Only
@@ -3932,7 +4050,7 @@ static int lod_object_lock(const struct lu_env *env,
                }
                if (rc != 0)
                        GOTO(out, rc);
-               slave_locks->lsl_handle[i] = lockh;
+               slave_locks->handles[i] = lockh;
        }
 
        einfo->ei_cbdata = slave_locks;