Whamcloud - gitweb
LU-5147 doc: design docs in documentation dir
[fs/lustre-release.git] / lustre / lod / lod_object.c
index 7e84a61..aab61a3 100644 (file)
@@ -23,7 +23,7 @@
  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2015, Intel Corporation.
  */
 /*
  * lustre/lod/lod_object.c
@@ -33,7 +33,7 @@
  * local OSD object interface to the MDD layer, and abstracts the
  * addressing of local (OSD) and remote (OSP) objects. The API is
  * described in the file lustre/include/dt_object.h and in
- * lustre/doc/osd-api.txt.
+ * Documentation/osd-api.txt.
  *
  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
  */
 
 #include <obd.h>
 #include <obd_class.h>
-#include <lustre_ver.h>
 #include <obd_support.h>
-#include <lprocfs_status.h>
 
 #include <lustre_fid.h>
-#include <lustre_param.h>
-#include <lustre_fid.h>
+#include <lustre_linkea.h>
 #include <lustre_lmv.h>
+#include <lustre_param.h>
+#include <lustre_swab.h>
+#include <lustre_ver.h>
+#include <lprocfs_status.h>
 #include <md_object.h>
-#include <lustre_linkea.h>
 
 #include "lod_internal.h"
 
@@ -59,6 +59,7 @@ static const char dot[] = ".";
 static const char dotdot[] = "..";
 
 static const struct dt_body_operations lod_body_lnk_ops;
+static const struct dt_body_operations lod_body_ops;
 
 /**
  * Implementation of dt_index_operations::dio_lookup
@@ -439,13 +440,17 @@ static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di)
        struct lod_object       *lo = lod_dt_obj(it->lit_obj);
        struct dt_object        *next;
 
-       LOD_CHECK_STRIPED_IT(env, it, lo);
+       /* If lit_it == NULL, then it means the sub_it has been finished,
+        * which only happens in failure cases, see lod_striped_it_next() */
+       if (it->lit_it != NULL) {
+               LOD_CHECK_STRIPED_IT(env, it, lo);
 
-       next = lo->ldo_stripe[it->lit_stripe_index];
-       LASSERT(next != NULL);
-       LASSERT(next->do_index_ops != NULL);
+               next = lo->ldo_stripe[it->lit_stripe_index];
+               LASSERT(next != NULL);
+               LASSERT(next->do_index_ops != NULL);
 
-       next->do_index_ops->dio_it.fini(env, it->lit_it);
+               next->do_index_ops->dio_it.fini(env, it->lit_it);
+       }
 
        /* the iterator not in use any more */
        it->lit_obj = NULL;
@@ -562,13 +567,14 @@ again:
 
        next->do_index_ops->dio_it.put(env, it->lit_it);
        next->do_index_ops->dio_it.fini(env, it->lit_it);
+       it->lit_it = NULL;
 
+       next = lo->ldo_stripe[it->lit_stripe_index];
+       LASSERT(next != NULL);
        rc = next->do_ops->do_index_try(env, next, &dt_directory_features);
        if (rc != 0)
                RETURN(rc);
 
-       next = lo->ldo_stripe[it->lit_stripe_index];
-       LASSERT(next != NULL);
        LASSERT(next->do_index_ops != NULL);
 
        it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr);
@@ -853,7 +859,8 @@ int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo,
                                goto next;
                }
 
-               len = snprintf(name, FID_LEN + 1, DFID":", PFID(&ent->lde_fid));
+               len = snprintf(name, sizeof(name),
+                              DFID":", PFID(&ent->lde_fid));
                /* The ent->lde_name is composed of ${FID}:${index} */
                if (ent->lde_namelen < len + 1 ||
                    memcmp(ent->lde_name, name, len) != 0) {
@@ -1176,7 +1183,8 @@ static int lod_declare_attr_set(const struct lu_env *env,
                        RETURN(0);
        } else {
                if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
-                                       LA_ATIME | LA_MTIME | LA_CTIME)))
+                                       LA_ATIME | LA_MTIME | LA_CTIME |
+                                       LA_FLAGS)))
                        RETURN(rc);
        }
        /*
@@ -1267,7 +1275,8 @@ static int lod_attr_set(const struct lu_env *env,
                        RETURN(0);
        } else {
                if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
-                                       LA_ATIME | LA_MTIME | LA_CTIME)))
+                                       LA_ATIME | LA_MTIME | LA_CTIME |
+                                       LA_FLAGS)))
                        RETURN(rc);
        }
 
@@ -1614,19 +1623,18 @@ out:
 }
 
 /**
- * Create a striped directory.
+ * Declare the creation of a striped directory.
  *
- * Create a striped directory with a given stripe pattern on the specified MDTs.
- * A striped directory is represented as a regular directory - an index listing
- * all the stripes. The stripes point back to the master object with ".." and
- * LinkEA. The master object gets LMV EA which identifies it as a striped
- * directory. The function allocates FIDs for all the stripes.
+ * Declare creating a striped directory with a given stripe pattern on the
+ * specified MDTs. A striped directory is represented as a regular directory
+ * - an index listing all the stripes. The stripes point back to the master
+ * object with ".." and LinkEA. The master object gets LMV EA which
+ * identifies it as a striped directory. The function allocates FIDs
+ * for all stripes.
  *
  * \param[in] env      execution environment
  * \param[in] dt       object
  * \param[in] attr     attributes to initialize the objects with
- * \param[in] lum      a pattern specifying the number of stripes and
- *                     MDT to start from
  * \param[in] dof      type of objects to be created
  * \param[in] th       transaction handle
  *
@@ -1779,6 +1787,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
        struct dt_object        **stripe;
        __u32                   stripe_count;
        int                     *idx_array;
+       __u32                   master_index;
        int                     rc = 0;
        __u32                   i;
        __u32                   j;
@@ -1790,10 +1799,6 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
 
        stripe_count = le32_to_cpu(lum->lum_stripe_count);
 
-       /* shrink the stripe_count to the avaible MDT count */
-       if (stripe_count > lod->lod_remote_mdt_count + 1)
-               stripe_count = lod->lod_remote_mdt_count + 1;
-
        OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count);
        if (stripe == NULL)
                RETURN(-ENOMEM);
@@ -1802,6 +1807,9 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
        if (idx_array == NULL)
                GOTO(out_free, rc = -ENOMEM);
 
+       /* Start index will be the master MDT */
+       master_index = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
+       idx_array[0] = master_index;
        for (i = 0; i < stripe_count; i++) {
                struct lod_tgt_desc     *tgt = NULL;
                struct dt_object        *dto;
@@ -1810,44 +1818,42 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
                struct lu_object_conf   conf = { 0 };
                struct dt_device        *tgt_dt = NULL;
 
-               if (i == 0) {
-                       /* Right now, master stripe and master object are
-                        * on the same MDT */
-                       idx = le32_to_cpu(lum->lum_stripe_offset);
-                       rc = obd_fid_alloc(env, lod->lod_child_exp, &fid,
-                                          NULL);
-                       if (rc < 0)
-                               GOTO(out_put, rc);
-                       tgt_dt = lod->lod_child;
-                       goto next;
-               }
-
-               idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1);
-
+               /* Try to find next available target */
+               idx = idx_array[i];
                for (j = 0; j < lod->lod_remote_mdt_count;
                     j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) {
                        bool already_allocated = false;
                        __u32 k;
 
-                       CDEBUG(D_INFO, "try idx %d, mdt cnt %u,"
-                              " allocated %u, last allocated %d\n", idx,
-                              lod->lod_remote_mdt_count, i, idx_array[i - 1]);
+                       CDEBUG(D_INFO, "try idx %d, mdt cnt %u, allocated %u\n",
+                              idx, lod->lod_remote_mdt_count + 1, i);
+                       if (idx == master_index) {
+                               /* Allocate the FID locally */
+                               rc = obd_fid_alloc(env, lod->lod_child_exp,
+                                                  &fid, NULL);
+                               if (rc < 0)
+                                       GOTO(out_put, rc);
+                               tgt_dt = lod->lod_child;
+                               break;
+                       }
 
                        /* Find next available target */
                        if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx))
                                continue;
 
-                       /* check whether the idx already exists
-                        * in current allocated array */
-                       for (k = 0; k < i; k++) {
-                               if (idx_array[k] == idx) {
-                                       already_allocated = true;
-                                       break;
+                       if (likely(!OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) {
+                               /* check whether the idx already exists
+                                * in current allocated array */
+                               for (k = 0; k < i; k++) {
+                                       if (idx_array[k] == idx) {
+                                               already_allocated = true;
+                                               break;
+                                       }
                                }
-                       }
 
-                       if (already_allocated)
-                               continue;
+                               if (already_allocated)
+                                       continue;
+                       }
 
                        /* check the status of the OSP */
                        tgt = LTD_TGT(ltd, idx);
@@ -1878,11 +1884,13 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
                        break;
                }
 
-               CDEBUG(D_INFO, "idx %d, mdt cnt %u,"
-                      " allocated %u, last allocated %d\n", idx,
-                      lod->lod_remote_mdt_count, i, idx_array[i - 1]);
-
-next:
+               CDEBUG(D_INFO, "Get idx %d, for stripe %d "DFID"\n",
+                      idx, i, PFID(&fid));
+               idx_array[i] = idx;
+               /* Set the start index for next stripe allocation */
+               if (i < stripe_count - 1)
+                       idx_array[i + 1] = (idx + 1) %
+                                          (lod->lod_remote_mdt_count + 1);
                /* tgt_dt and fid must be ready after search avaible OSP
                 * in the above loop */
                LASSERT(tgt_dt != NULL);
@@ -1894,7 +1902,6 @@ next:
                if (IS_ERR(dto))
                        GOTO(out_put, rc = PTR_ERR(dto));
                stripe[i] = dto;
-               idx_array[i] = idx;
        }
 
        lo->ldo_dir_striped = 1;
@@ -1985,7 +1992,6 @@ out:
        RETURN(rc);
 }
 
-
 /**
  * Implementation of dt_object_operations::do_declare_xattr_set.
  *
@@ -2023,16 +2029,19 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env,
        if (rc != 0)
                RETURN(rc);
 
+       /* Note: Do not set LinkEA on sub-stripes, otherwise
+        * it will confuse the fid2path process(see mdt_path_current()).
+        * The linkEA between master and sub-stripes is set in
+        * lod_xattr_set_lmv(). */
+       if (strcmp(name, XATTR_NAME_LINK) == 0)
+               RETURN(0);
+
        /* set xattr to each stripes, if needed */
        rc = lod_load_striping(env, lo);
        if (rc != 0)
                RETURN(rc);
 
-       /* Note: Do not set LinkEA on sub-stripes, otherwise
-        * it will confuse the fid2path process(see mdt_path_current()).
-        * The linkEA between master and sub-stripes is set in
-        * lod_xattr_set_lmv(). */
-       if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0)
+       if (lo->ldo_stripenr == 0)
                RETURN(0);
 
        for (i = 0; i < lo->ldo_stripenr; i++) {
@@ -2048,6 +2057,85 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env,
 }
 
 /**
+ * Reset parent FID on OST object
+ *
+ * Replace parent FID with @dt object FID, which is only called during migration
+ * to reset the parent FID after the MDT object is migrated to the new MDT, i.e.
+ * the FID is changed.
+ *
+ * \param[in] env execution environment
+ * \param[in] dt dt_object whose stripes' parent FID will be reset
+ * \param[in] th transaction handle
+ * \param[in] declare true for the declare phase, false for execution
+ *
+ * \retval     0 if reset succeeds
+ * \retval     negative errno if reset fails
+ */
+static int lod_object_replace_parent_fid(const struct lu_env *env,
+                                        struct dt_object *dt,
+                                        struct thandle *th, bool declare)
+{
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct lod_thread_info  *info = lod_env_info(env);
+       struct lu_buf *buf = &info->lti_buf;
+       struct filter_fid *ff;
+       int i, rc;
+       ENTRY;
+
+       LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr));
+
+       /* set xattr on each stripe, if needed */
+       rc = lod_load_striping(env, lo);
+       if (rc != 0)
+               RETURN(rc);
+
+       if (lo->ldo_stripenr == 0)
+               RETURN(0);
+
+       if (info->lti_ea_store_size < sizeof(*ff)) {
+               rc = lod_ea_store_resize(info, sizeof(*ff));
+               if (rc != 0)
+                       RETURN(rc);
+       }
+
+       buf->lb_buf = info->lti_ea_store;
+       buf->lb_len = info->lti_ea_store_size;
+
+       for (i = 0; i < lo->ldo_stripenr; i++) {
+               if (lo->ldo_stripe[i] == NULL)
+                       continue;
+
+               rc = dt_xattr_get(env, lo->ldo_stripe[i], buf,
+                                 XATTR_NAME_FID);
+               if (rc < 0) {
+                       rc = 0;
+                       continue;
+               }
+
+               ff = buf->lb_buf;
+               fid_le_to_cpu(&ff->ff_parent, &ff->ff_parent);
+               ff->ff_parent.f_seq = lu_object_fid(&dt->do_lu)->f_seq;
+               ff->ff_parent.f_oid = lu_object_fid(&dt->do_lu)->f_oid;
+               fid_cpu_to_le(&ff->ff_parent, &ff->ff_parent);
+
+               if (declare) {
+                       rc = lod_sub_object_declare_xattr_set(env,
+                                               lo->ldo_stripe[i], buf,
+                                               XATTR_NAME_FID,
+                                               LU_XATTR_REPLACE, th);
+               } else {
+                       rc = lod_sub_object_xattr_set(env, lo->ldo_stripe[i],
+                                                     buf, XATTR_NAME_FID,
+                                                     LU_XATTR_REPLACE, th);
+               }
+               if (rc < 0)
+                       break;
+       }
+
+       RETURN(rc);
+}
+
+/**
  * Implementation of dt_object_operations::do_declare_xattr_set.
  *
  * \see dt_object_operations::do_declare_xattr_set() in the API description
@@ -2093,6 +2181,8 @@ static int lod_declare_xattr_set(const struct lu_env *env,
                rc = lod_declare_striped_object(env, dt, attr, buf, th);
        } else if (S_ISDIR(mode)) {
                rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
+       } else if (strcmp(name, XATTR_NAME_FID) == 0) {
+               rc = lod_object_replace_parent_fid(env, dt, th, true);
        } else {
                rc = lod_sub_object_declare_xattr_set(env, next, buf, name,
                                                      fl, th);
@@ -2135,7 +2225,8 @@ static void lod_lov_stripe_cache_clear(struct lod_object *lo)
 static int lod_xattr_set_internal(const struct lu_env *env,
                                  struct dt_object *dt,
                                  const struct lu_buf *buf,
-                                 const char *name, int fl, struct thandle *th)
+                                 const char *name, int fl,
+                                 struct thandle *th)
 {
        struct dt_object        *next = dt_object_child(dt);
        struct lod_object       *lo = lod_dt_obj(dt);
@@ -2744,6 +2835,10 @@ static int lod_xattr_set(const struct lu_env *env,
                        rc = lod_striping_create(env, dt, NULL, NULL, th);
                }
                RETURN(rc);
+       } else if (strcmp(name, XATTR_NAME_FID) == 0) {
+               rc = lod_object_replace_parent_fid(env, dt, th, false);
+
+               RETURN(rc);
        }
 
        /* then all other xattr */
@@ -3095,17 +3190,8 @@ static void lod_ah_init(const struct lu_env *env,
        LASSERT(lc->ldo_stripenr == 0);
        LASSERT(lc->ldo_stripe == NULL);
 
-       /*
-        * local object may want some hints
-        * in case of late striping creation, ->ah_init()
-        * can be called with local object existing
-        */
-       if (!dt_object_exists(nextc) || dt_object_remote(nextc)) {
-               struct dt_object *obj;
-
-               obj = (nextp != NULL && dt_object_remote(nextp)) ? NULL : nextp;
-               nextc->do_ops->do_ah_init(env, ah, obj, nextc, child_mode);
-       }
+       if (!dt_object_exists(nextc))
+               nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode);
 
        if (S_ISDIR(child_mode)) {
                if (lc->ldo_dir_stripe == NULL) {
@@ -3162,8 +3248,6 @@ static void lod_ah_init(const struct lu_env *env,
                        rc = lod_verify_md_striping(d, lum1);
                        if (rc == 0 &&
                                le32_to_cpu(lum1->lum_stripe_count) > 1) {
-                               /* Directory will be striped only if
-                                * stripe_count > 1 */
                                lc->ldo_stripenr =
                                        le32_to_cpu(lum1->lum_stripe_count);
                                lc->ldo_dir_stripe_offset =
@@ -3191,6 +3275,18 @@ static void lod_ah_init(const struct lu_env *env,
                        lc->ldo_dir_stripe_offset = -1;
                }
 
+               /* shrink the stripe_count to the available MDT count */
+               if (lc->ldo_stripenr > d->lod_remote_mdt_count + 1 &&
+                   !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))
+                       lc->ldo_stripenr = d->lod_remote_mdt_count + 1;
+
+               /* Directory will be striped only if stripe_count > 1; if
+                * stripe_count == 1, reset stripenr to 0 to avoid
+                * creating a single master stripe, which also helps unify
+                * the stripe handling of directories and files */
+               if (lc->ldo_stripenr == 1)
+                       lc->ldo_stripenr = 0;
+
                CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
                       lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
 
@@ -3277,6 +3373,9 @@ static int lod_declare_init_size(const struct lu_env *env,
        LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
        LASSERT(lo->ldo_stripe_size > 0);
 
+       if (lo->ldo_stripenr == 0)
+               RETURN(0);
+
        rc = dt_attr_get(env, next, attr);
        LASSERT(attr->la_valid & LA_SIZE);
        if (rc)
@@ -3417,6 +3516,8 @@ static int lod_declare_object_create(const struct lu_env *env,
 
        if (dof->dof_type == DFT_SYM)
                dt->do_body_ops = &lod_body_lnk_ops;
+       else if (dof->dof_type == DFT_REGULAR)
+               dt->do_body_ops = &lod_body_ops;
 
        /*
         * it's lod_ah_init() that has decided the object will be striped
@@ -3445,10 +3546,44 @@ static int lod_declare_object_create(const struct lu_env *env,
                 * Note: if dah_eadata != NULL, it means creating the
                 * striped directory with specified stripeEA, then it
                 * should ignore the default stripeEA */
-               if ((hint == NULL || hint->dah_eadata == NULL) &&
-                   lo->ldo_dir_stripe_offset != -1 &&
-                   lo->ldo_dir_stripe_offset != ss->ss_node_id)
-                       GOTO(out, rc = -EREMOTE);
+               if (hint != NULL && hint->dah_eadata == NULL) {
+                       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STALE_DIR_LAYOUT))
+                               GOTO(out, rc = -EREMOTE);
+
+                       if (lo->ldo_dir_stripe_offset == -1) {
+                               /* child and parent should be in the same MDT */
+                               if (hint->dah_parent != NULL &&
+                                   dt_object_remote(hint->dah_parent))
+                                       GOTO(out, rc = -EREMOTE);
+                       } else if (lo->ldo_dir_stripe_offset !=
+                                  ss->ss_node_id) {
+                               struct lod_device *lod;
+                               struct lod_tgt_descs *ltd;
+                               struct lod_tgt_desc *tgt = NULL;
+                               bool found_mdt = false;
+                               int i;
+
+                               lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
+                               ltd = &lod->lod_mdt_descs;
+                               cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
+                                       tgt = LTD_TGT(ltd, i);
+                                       if (tgt->ltd_index ==
+                                               lo->ldo_dir_stripe_offset) {
+                                               found_mdt = true;
+                                               break;
+                                       }
+                               }
+
+                               /* If the MDT indicated by stripe_offset can be
+                                * found, then tell client to resend the create
+                                * request to the correct MDT, otherwise return
+                                * error to client */
+                               if (found_mdt)
+                                       GOTO(out, rc = -EREMOTE);
+                               else
+                                       GOTO(out, rc = -EINVAL);
+                       }
+               }
 
                /* Orphan object (like migrating object) does not have
                 * lod_dir_stripe, see lod_ah_init */
@@ -3755,11 +3890,6 @@ static int lod_object_sync(const struct lu_env *env, struct dt_object *dt,
        return dt_object_sync(env, dt_object_child(dt), start, end);
 }
 
-struct lod_slave_locks {
-       int                     lsl_lock_count;
-       struct lustre_handle    lsl_handle[0];
-};
-
 /**
  * Release LDLM locks on the stripes of a striped directory.
  *
@@ -3777,10 +3907,9 @@ struct lod_slave_locks   {
 static int lod_object_unlock_internal(const struct lu_env *env,
                                      struct dt_object *dt,
                                      struct ldlm_enqueue_info *einfo,
-                                     ldlm_policy_data_t *policy)
+                                     union ldlm_policy_data *policy)
 {
-       struct lod_object       *lo = lod_dt_obj(dt);
-       struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
+       struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
        int                     rc = 0;
        int                     i;
        ENTRY;
@@ -3788,16 +3917,10 @@ static int lod_object_unlock_internal(const struct lu_env *env,
        if (slave_locks == NULL)
                RETURN(0);
 
-       for (i = 1; i < slave_locks->lsl_lock_count; i++) {
-               if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) {
-                       int     rc1;
-
-                       einfo->ei_cbdata = &slave_locks->lsl_handle[i];
-                       rc1 = dt_object_unlock(env, lo->ldo_stripe[i], einfo,
-                                              policy);
-                       if (rc1 < 0)
-                               rc = rc == 0 ? rc1 : rc;
-               }
+       for (i = 1; i < slave_locks->count; i++) {
+               if (lustre_handle_is_used(&slave_locks->handles[i]))
+                       ldlm_lock_decref(&slave_locks->handles[i],
+                                        einfo->ei_mode);
        }
 
        RETURN(rc);
@@ -3815,36 +3938,31 @@ static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
                             struct ldlm_enqueue_info *einfo,
                             union ldlm_policy_data *policy)
 {
-       struct lod_object       *lo = lod_dt_obj(dt);
-       struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
-       int                     slave_locks_size;
-       int                     rc;
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
+       int slave_locks_size;
+       int i;
        ENTRY;
 
        if (slave_locks == NULL)
                RETURN(0);
 
-       if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
-               RETURN(-ENOTDIR);
-
-       rc = lod_load_striping(env, lo);
-       if (rc != 0)
-               RETURN(rc);
-
+       LASSERT(S_ISDIR(dt->do_lu.lo_header->loh_attr));
+       LASSERT(lo->ldo_stripenr > 1);
        /* Note: for remote lock for single stripe dir, MDT will cancel
         * the lock by lockh directly */
-       if (lo->ldo_stripenr <= 1 && dt_object_remote(dt_object_child(dt)))
-               RETURN(0);
+       LASSERT(!dt_object_remote(dt_object_child(dt)));
 
-       /* Only cancel slave lock for striped dir */
-       rc = lod_object_unlock_internal(env, dt, einfo, policy);
+       /* locks were unlocked in MDT layer */
+       for (i = 1; i < slave_locks->count; i++)
+               LASSERT(!lustre_handle_is_used(&slave_locks->handles[i]));
 
-       slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
-                          sizeof(slave_locks->lsl_handle[0]);
+       slave_locks_size = sizeof(*slave_locks) + slave_locks->count *
+                          sizeof(slave_locks->handles[0]);
        OBD_FREE(slave_locks, slave_locks_size);
        einfo->ei_cbdata = NULL;
 
-       RETURN(rc);
+       RETURN(0);
 }
 
 /**
@@ -3865,7 +3983,7 @@ static int lod_object_lock(const struct lu_env *env,
        int                     rc = 0;
        int                     i;
        int                     slave_locks_size;
-       struct lod_slave_locks  *slave_locks = NULL;
+       struct lustre_handle_array *slave_locks = NULL;
        ENTRY;
 
        /* remote object lock */
@@ -3887,12 +4005,12 @@ static int lod_object_lock(const struct lu_env *env,
                RETURN(0);
 
        slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
-                          sizeof(slave_locks->lsl_handle[0]);
+                          sizeof(slave_locks->handles[0]);
        /* Freed in lod_object_unlock */
        OBD_ALLOC(slave_locks, slave_locks_size);
        if (slave_locks == NULL)
                RETURN(-ENOMEM);
-       slave_locks->lsl_lock_count = lo->ldo_stripenr;
+       slave_locks->count = lo->ldo_stripenr;
 
        /* striped directory lock */
        for (i = 1; i < lo->ldo_stripenr; i++) {
@@ -3904,12 +4022,35 @@ static int lod_object_lock(const struct lu_env *env,
                                       res_id);
                einfo->ei_res_id = res_id;
 
-               LASSERT(lo->ldo_stripe[i]);
-               rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
-                                   policy);
+               LASSERT(lo->ldo_stripe[i] != NULL);
+               if (likely(dt_object_remote(lo->ldo_stripe[i]))) {
+                       rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh,
+                                           einfo, policy);
+               } else {
+                       struct ldlm_namespace *ns = einfo->ei_namespace;
+                       ldlm_blocking_callback blocking = einfo->ei_cb_local_bl;
+                       ldlm_completion_callback completion = einfo->ei_cb_cp;
+                       __u64   dlmflags = LDLM_FL_ATOMIC_CB;
+
+                       if (einfo->ei_mode == LCK_PW ||
+                           einfo->ei_mode == LCK_EX)
+                               dlmflags |= LDLM_FL_COS_INCOMPAT;
+
+                       /* This only happens if there are multiple stripes
+                        * on the master MDT, i.e. besides stripe0 there are
+                        * other stripes on the master MDT as well. It only
+                        * happens in the test case right now. */
+                       LASSERT(ns != NULL);
+                       rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS,
+                                                   policy, einfo->ei_mode,
+                                                   &dlmflags, blocking,
+                                                   completion, NULL,
+                                                   NULL, 0, LVB_T_NONE,
+                                                   NULL, &lockh);
+               }
                if (rc != 0)
                        GOTO(out, rc);
-               slave_locks->lsl_handle[i] = lockh;
+               slave_locks->handles[i] = lockh;
        }
 
        einfo->ei_cbdata = slave_locks;
@@ -3994,12 +4135,39 @@ static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
        return lod_sub_object_write(env, dt_object_child(dt), buf, pos, th, iq);
 }
 
+static int lod_declare_punch(const struct lu_env *env, struct dt_object *dt,
+                            __u64 start, __u64 end, struct thandle *th)
+{
+       if (dt_object_remote(dt))
+               return -ENOTSUPP;
+
+       return lod_sub_object_declare_punch(env, dt_object_child(dt), start,
+                                           end, th);
+}
+
+static int lod_punch(const struct lu_env *env, struct dt_object *dt,
+                    __u64 start, __u64 end, struct thandle *th)
+{
+       if (dt_object_remote(dt))
+               return -ENOTSUPP;
+
+       return lod_sub_object_punch(env, dt_object_child(dt), start, end, th);
+}
+
 static const struct dt_body_operations lod_body_lnk_ops = {
        .dbo_read               = lod_read,
        .dbo_declare_write      = lod_declare_write,
        .dbo_write              = lod_write
 };
 
+static const struct dt_body_operations lod_body_ops = {
+       .dbo_read               = lod_read,
+       .dbo_declare_write      = lod_declare_write,
+       .dbo_write              = lod_write,
+       .dbo_declare_punch      = lod_declare_punch,
+       .dbo_punch              = lod_punch,
+};
+
 /**
  * Implementation of lu_object_operations::loo_object_init.
  *
@@ -4119,8 +4287,16 @@ void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
  */
 static int lod_object_start(const struct lu_env *env, struct lu_object *o)
 {
-       if (S_ISLNK(o->lo_header->loh_attr & S_IFMT))
+       if (S_ISLNK(o->lo_header->loh_attr & S_IFMT)) {
                lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
+       } else if (S_ISREG(o->lo_header->loh_attr & S_IFMT) ||
+                  fid_is_local_file(lu_object_fid(o))) {
+               /* Note: some local files (like last rcvd) are created
+                * through the bottom layer (OSD), so when the object
+                * initialization reaches LOD, loh_attr is not set yet;
+                * set do_body_ops for local files anyway */
+               lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_ops;
+       }
        return 0;
 }