Whamcloud - gitweb
LU-12960 lod: don't set index for 2nd stripe if specific
[fs/lustre-release.git] / lustre / lod / lod_object.c
index e8fcb30..fc9efb5 100644 (file)
@@ -367,6 +367,50 @@ static struct dt_index_operations lod_index_ops = {
 };
 
 /**
+ * Implementation of dt_index_operations::dio_lookup
+ *
+ * Used with striped directories.
+ *
+ * \see dt_index_operations::dio_lookup() in the API description for details.
+ */
+static int lod_striped_lookup(const struct lu_env *env, struct dt_object *dt,
+                     struct dt_rec *rec, const struct dt_key *key)
+{
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct dt_object *next;
+       const char *name = (const char *)key;
+
+       LASSERT(lo->ldo_dir_stripe_count > 0);
+
+       if (strcmp(name, dot) == 0) {
+               struct lu_fid *fid = (struct lu_fid *)rec;
+
+               *fid = *lod_object_fid(lo);
+               return 1;
+       }
+
+       if (strcmp(name, dotdot) == 0) {
+               next = dt_object_child(dt);
+       } else {
+               int index;
+
+               index = __lmv_name_to_stripe_index(lo->ldo_dir_hash_type,
+                                                  lo->ldo_dir_stripe_count,
+                                                  lo->ldo_dir_migrate_hash,
+                                                  lo->ldo_dir_migrate_offset,
+                                                  name, strlen(name), true);
+               if (index < 0)
+                       return index;
+
+               next = lo->ldo_stripe[index];
+               if (!next || !dt_object_exists(next))
+                       return -ENODEV;
+       }
+
+       return next->do_index_ops->dio_lookup(env, next, rec, key);
+}
+
+/**
  * Implementation of dt_it_ops::init.
  *
  * Used with striped objects. Internally just initializes the iterator
@@ -744,7 +788,7 @@ static int lod_striped_it_load(const struct lu_env *env,
 }
 
 static struct dt_index_operations lod_striped_index_ops = {
-       .dio_lookup             = lod_lookup,
+       .dio_lookup             = lod_striped_lookup,
        .dio_declare_insert     = lod_declare_insert,
        .dio_insert             = lod_insert,
        .dio_declare_delete     = lod_declare_delete,
@@ -1560,13 +1604,13 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
                        /* The on-disk LMV EA only contains header, but the
                         * returned LMV EA size should contain the space for
                         * the FIDs of all shards of the striped directory. */
-                       if (lmv_is_sane(lmv1))
+                       if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_V1)
                                rc = lmv_mds_md_size(
                                        le32_to_cpu(lmv1->lmv_stripe_count),
                                        le32_to_cpu(lmv1->lmv_magic));
                } else {
-                       lfm = buf->lb_buf;
-                       if (le32_to_cpu(lfm->lfm_magic) == LMV_MAGIC_FOREIGN)
+                       lmv1 = buf->lb_buf;
+                       if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
                                RETURN(rc);
 
                        if (rc != sizeof(*lmv1))
@@ -1712,7 +1756,8 @@ static int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
        lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
        lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
        lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
-       if (lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) {
+       lmm1->lmv_layout_version = cpu_to_le32(lo->ldo_dir_layout_version);
+       if (lod_is_layout_changing(lo)) {
                lmm1->lmv_migrate_hash = cpu_to_le32(lo->ldo_dir_migrate_hash);
                lmm1->lmv_migrate_offset =
                        cpu_to_le32(lo->ldo_dir_migrate_offset);
@@ -1773,8 +1818,7 @@ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
                RETURN(-EINVAL);
 
        LASSERT(lo->ldo_stripe == NULL);
-       OBD_ALLOC(stripe, sizeof(stripe[0]) *
-                 (le32_to_cpu(lmv1->lmv_stripe_count)));
+       OBD_ALLOC_PTR_ARRAY(stripe, le32_to_cpu(lmv1->lmv_stripe_count));
        if (stripe == NULL)
                RETURN(-ENOMEM);
 
@@ -1818,6 +1862,8 @@ out:
        lo->ldo_dir_stripe_count = le32_to_cpu(lmv1->lmv_stripe_count);
        lo->ldo_dir_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count);
        lo->ldo_dir_layout_version = le32_to_cpu(lmv1->lmv_layout_version);
+       lo->ldo_dir_migrate_offset = le32_to_cpu(lmv1->lmv_migrate_offset);
+       lo->ldo_dir_migrate_hash = le32_to_cpu(lmv1->lmv_migrate_hash);
        lo->ldo_dir_hash_type = le32_to_cpu(lmv1->lmv_hash_type);
        if (rc != 0)
                lod_striping_free_nolock(env, lo);
@@ -1889,31 +1935,74 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env,
                if (!dto)
                        continue;
 
-               rc = lod_sub_declare_create(env, dto, attr, NULL, dof, th);
-               if (rc != 0)
-                       GOTO(out, rc);
+               /* directory split skip create for existing stripes */
+               if (!(lod_is_splitting(lo) && i < lo->ldo_dir_split_offset)) {
+                       rc = lod_sub_declare_create(env, dto, attr, NULL, dof,
+                                                   th);
+                       if (rc != 0)
+                               GOTO(out, rc);
 
-               if (!dt_try_as_dir(env, dto))
-                       GOTO(out, rc = -EINVAL);
+                       if (!dt_try_as_dir(env, dto))
+                               GOTO(out, rc = -EINVAL);
 
-               rc = lod_sub_declare_ref_add(env, dto, th);
-               if (rc != 0)
-                       GOTO(out, rc);
+                       rc = lod_sub_declare_ref_add(env, dto, th);
+                       if (rc != 0)
+                               GOTO(out, rc);
 
-               rec->rec_fid = lu_object_fid(&dto->do_lu);
-               rc = lod_sub_declare_insert(env, dto,
-                                           (const struct dt_rec *)rec,
-                                           (const struct dt_key *)dot, th);
-               if (rc != 0)
-                       GOTO(out, rc);
+                       rec->rec_fid = lu_object_fid(&dto->do_lu);
+                       rc = lod_sub_declare_insert(env, dto,
+                                                   (const struct dt_rec *)rec,
+                                                   (const struct dt_key *)dot,
+                                                   th);
+                       if (rc != 0)
+                               GOTO(out, rc);
 
-               /* master stripe FID will be put to .. */
-               rec->rec_fid = lu_object_fid(&dt->do_lu);
-               rc = lod_sub_declare_insert(env, dto,
-                                           (const struct dt_rec *)rec,
-                                           (const struct dt_key *)dotdot, th);
-               if (rc != 0)
-                       GOTO(out, rc);
+                       /* master stripe FID will be put to .. */
+                       rec->rec_fid = lu_object_fid(&dt->do_lu);
+                       rc = lod_sub_declare_insert(env, dto,
+                                                 (const struct dt_rec *)rec,
+                                                 (const struct dt_key *)dotdot,
+                                                 th);
+                       if (rc != 0)
+                               GOTO(out, rc);
+
+                       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) &&
+                           cfs_fail_val == i)
+                               snprintf(stripe_name, sizeof(info->lti_key),
+                                        DFID":%u",
+                                        PFID(lu_object_fid(&dto->do_lu)),
+                                        i + 1);
+                       else
+                               snprintf(stripe_name, sizeof(info->lti_key),
+                                        DFID":%u",
+                                        PFID(lu_object_fid(&dto->do_lu)), i);
+
+                       sname = lod_name_get(env, stripe_name,
+                                            strlen(stripe_name));
+                       rc = linkea_links_new(&ldata, &info->lti_linkea_buf,
+                                             sname, lu_object_fid(&dt->do_lu));
+                       if (rc != 0)
+                               GOTO(out, rc);
+
+                       linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
+                       linkea_buf.lb_len = ldata.ld_leh->leh_len;
+                       rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf,
+                                                      XATTR_NAME_LINK, 0, th);
+                       if (rc != 0)
+                               GOTO(out, rc);
+
+                       rec->rec_fid = lu_object_fid(&dto->do_lu);
+                       rc = lod_sub_declare_insert(env, dt_object_child(dt),
+                                       (const struct dt_rec *)rec,
+                                       (const struct dt_key *)stripe_name, th);
+                       if (rc != 0)
+                               GOTO(out, rc);
+
+                       rc = lod_sub_declare_ref_add(env, dt_object_child(dt),
+                                                    th);
+                       if (rc != 0)
+                               GOTO(out, rc);
+               }
 
                if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) ||
                    cfs_fail_val != i) {
@@ -1929,39 +2018,6 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env,
                        if (rc != 0)
                                GOTO(out, rc);
                }
-
-               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) &&
-                   cfs_fail_val == i)
-                       snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
-                               PFID(lu_object_fid(&dto->do_lu)), i + 1);
-               else
-                       snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
-                               PFID(lu_object_fid(&dto->do_lu)), i);
-
-               sname = lod_name_get(env, stripe_name, strlen(stripe_name));
-               rc = linkea_links_new(&ldata, &info->lti_linkea_buf,
-                                     sname, lu_object_fid(&dt->do_lu));
-               if (rc != 0)
-                       GOTO(out, rc);
-
-               linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
-               linkea_buf.lb_len = ldata.ld_leh->leh_len;
-               rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf,
-                                              XATTR_NAME_LINK, 0, th);
-               if (rc != 0)
-                       GOTO(out, rc);
-
-               rec->rec_fid = lu_object_fid(&dto->do_lu);
-               rc = lod_sub_declare_insert(env, dt_object_child(dt),
-                                           (const struct dt_rec *)rec,
-                                           (const struct dt_key *)stripe_name,
-                                           th);
-               if (rc != 0)
-                       GOTO(out, rc);
-
-               rc = lod_sub_declare_ref_add(env, dt_object_child(dt), th);
-               if (rc != 0)
-                       GOTO(out, rc);
        }
 
        rc = lod_sub_declare_xattr_set(env, dt_object_child(dt),
@@ -2019,7 +2075,7 @@ static int lod_mdt_alloc_specific(const struct lu_env *env,
        int rc;
 
        master_index = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
-       if (stripe_count > 1)
+       if (!is_specific && stripe_count > 1)
                /* Set the start index for the 2nd stripe allocation */
                mdt_indices[1] = (mdt_indices[0] + 1) %
                                        (lod->lod_remote_mdt_count + 1);
@@ -2052,17 +2108,17 @@ static int lod_mdt_alloc_specific(const struct lu_env *env,
 
                        /* Sigh, this index is not in the bitmap, let's check
                         * next available target */
-                       if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx) &&
+                       if (!test_bit(idx, ltd->ltd_tgt_bitmap) &&
                            idx != master_index)
                                continue;
 
                        if (idx == master_index) {
                                /* Allocate the FID locally */
-                               rc = obd_fid_alloc(env, lod->lod_child_exp,
-                                                  &fid, NULL);
+                               tgt_dt = lod->lod_child;
+                               rc = dt_fid_alloc(env, tgt_dt, &fid, NULL,
+                                                 NULL);
                                if (rc < 0)
                                        continue;
-                               tgt_dt = lod->lod_child;
                                break;
                        }
 
@@ -2076,7 +2132,7 @@ static int lod_mdt_alloc_specific(const struct lu_env *env,
                                /* this OSP doesn't feel well */
                                continue;
 
-                       rc = obd_fid_alloc(env, tgt->ltd_exp, &fid, NULL);
+                       rc = dt_fid_alloc(env, tgt_dt, &fid, NULL, NULL);
                        if (rc < 0)
                                continue;
 
@@ -2161,12 +2217,12 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
 
        stripe_count = lo->ldo_dir_stripe_count;
 
-       OBD_ALLOC(stripes, sizeof(stripes[0]) * stripe_count);
+       OBD_ALLOC_PTR_ARRAY(stripes, stripe_count);
        if (!stripes)
                RETURN(-ENOMEM);
 
        /* Allocate the first stripe locally */
-       rc = obd_fid_alloc(env, lod->lod_child_exp, &fid, NULL);
+       rc = dt_fid_alloc(env, lod->lod_child, &fid, NULL, NULL);
        if (rc < 0)
                GOTO(out, rc);
 
@@ -2185,7 +2241,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
                int *idx_array;
                bool is_specific = false;
 
-               OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count);
+               OBD_ALLOC_PTR_ARRAY(idx_array, stripe_count);
                if (!idx_array)
                        GOTO(out, rc = -ENOMEM);
 
@@ -2201,7 +2257,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
                        lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
                rc = lod_mdt_alloc_specific(env, lo, stripes, idx_array,
                                            is_specific);
-               OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count);
+               OBD_FREE_PTR_ARRAY(idx_array, stripe_count);
        }
 
        if (rc < 0)
@@ -2228,7 +2284,7 @@ out:
                dt_object_put(env, stripes[0]);
        for (i = 1; i < stripe_count; i++)
                LASSERT(!stripes[i]);
-       OBD_FREE(stripes, sizeof(stripes[0]) * stripe_count);
+       OBD_FREE_PTR_ARRAY(stripes, stripe_count);
 
        return rc;
 }
@@ -2335,6 +2391,7 @@ static int lod_dir_layout_set(const struct lu_env *env,
 {
        struct dt_object *next = dt_object_child(dt);
        struct lod_object *lo = lod_dt_obj(dt);
+       struct lod_device *lod = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
        struct lmv_mds_md_v1 *lmv = buf->lb_buf;
        struct lmv_mds_md_v1 *slave_lmv;
        struct lu_buf slave_buf;
@@ -2343,10 +2400,29 @@ static int lod_dir_layout_set(const struct lu_env *env,
 
        ENTRY;
 
+       if (!lmv_is_sane2(lmv))
+               RETURN(-EINVAL);
+
+       /* adjust hash for dir merge, which may not be set in user command */
+       if (lmv_is_merging(lmv) && !lmv->lmv_migrate_hash)
+               lmv->lmv_merge_hash =
+                       lod->lod_mdt_descs.ltd_lmv_desc.ld_pattern;
+
+       LMV_DEBUG(D_INFO, lmv, "set");
+
        rc = lod_sub_xattr_set(env, next, buf, XATTR_NAME_LMV, fl, th);
        if (rc)
                RETURN(rc);
 
+       /* directory restripe may update stripe LMV directly */
+       if (!lo->ldo_dir_stripe_count)
+               RETURN(0);
+
+       lo->ldo_dir_hash_type = le32_to_cpu(lmv->lmv_hash_type);
+       lo->ldo_dir_migrate_offset = le32_to_cpu(lmv->lmv_migrate_offset);
+       lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_migrate_hash);
+       lo->ldo_dir_layout_version = le32_to_cpu(lmv->lmv_layout_version);
+
        OBD_ALLOC_PTR(slave_lmv);
        if (!slave_lmv)
                RETURN(-ENOMEM);
@@ -2368,7 +2444,6 @@ static int lod_dir_layout_set(const struct lu_env *env,
                        break;
        }
 
-       lod_striping_free(env, lod_dt_obj(dt));
        OBD_FREE_PTR(slave_lmv);
 
        RETURN(rc);
@@ -2406,7 +2481,7 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env,
                if (rc != 0)
                        RETURN(rc);
        } else if (strcmp(name, XATTR_NAME_LOV) == 0) {
-               rc = lod_verify_striping(d, lo, buf, false);
+               rc = lod_verify_striping(env, d, lo, buf, false);
                if (rc != 0)
                        RETURN(rc);
        }
@@ -2653,7 +2728,7 @@ static int lod_declare_layout_add(const struct lu_env *env,
        if (lo->ldo_flr_state != LCM_FL_NONE)
                RETURN(-EBUSY);
 
-       rc = lod_verify_striping(d, lo, buf, false);
+       rc = lod_verify_striping(env, d, lo, buf, false);
        if (rc != 0)
                RETURN(rc);
 
@@ -2667,7 +2742,7 @@ static int lod_declare_layout_add(const struct lu_env *env,
                RETURN(-EINVAL);
 
        array_cnt = lo->ldo_comp_cnt + comp_v1->lcm_entry_count;
-       OBD_ALLOC(comp_array, sizeof(*comp_array) * array_cnt);
+       OBD_ALLOC_PTR_ARRAY(comp_array, array_cnt);
        if (comp_array == NULL)
                RETURN(-ENOMEM);
 
@@ -2721,7 +2796,7 @@ static int lod_declare_layout_add(const struct lu_env *env,
                GOTO(error, rc);
        }
 
-       OBD_FREE(old_array, sizeof(*lod_comp) * old_array_cnt);
+       OBD_FREE_PTR_ARRAY(old_array, old_array_cnt);
 
        LASSERT(lo->ldo_mirror_count == 1);
        lo->ldo_mirrors[0].lme_end = array_cnt - 1;
@@ -2737,7 +2812,7 @@ error:
                        lod_comp->llc_pool = NULL;
                }
        }
-       OBD_FREE(comp_array, sizeof(*comp_array) * array_cnt);
+       OBD_FREE_PTR_ARRAY(comp_array, array_cnt);
        RETURN(rc);
 }
 
@@ -3131,13 +3206,13 @@ static int lod_layout_convert(struct lod_thread_info *info)
        }
 
        lcm = info->lti_ea_store;
+       memset(lcm, 0, sizeof(*lcm) + sizeof(*lcme));
        lcm->lcm_magic = cpu_to_le32(LOV_MAGIC_COMP_V1);
        lcm->lcm_size = cpu_to_le32(size);
        lcm->lcm_layout_gen = cpu_to_le32(le16_to_cpu(
                                                lmm_save->lmm_layout_gen));
        lcm->lcm_flags = cpu_to_le16(LCM_FL_NONE);
        lcm->lcm_entry_count = cpu_to_le16(1);
-       lcm->lcm_mirror_count = 0;
 
        lcme = &lcm->lcm_entries[0];
        lcme->lcme_flags = cpu_to_le32(LCME_FL_INIT);
@@ -3710,7 +3785,7 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
        if (rc != 0)
                RETURN(rc);
 
-       attr->la_valid = LA_ATIME | LA_MTIME | LA_CTIME |
+       attr->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_FLAGS |
                         LA_MODE | LA_UID | LA_GID | LA_TYPE | LA_PROJID;
        dof->dof_type = DFT_DIR;
 
@@ -3743,9 +3818,15 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
                if (i && OBD_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_CREATE))
                        continue;
 
-               /* if it's source stripe of migrating directory, don't create */
-               if (!((lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) &&
-                     i >= lo->ldo_dir_migrate_offset)) {
+               /* don't create stripe if:
+                * 1. it's source stripe of migrating directory
+                * 2. it's existed stripe of splitting directory
+                */
+               if ((lod_is_migrating(lo) && i >= lo->ldo_dir_migrate_offset) ||
+                   (lod_is_splitting(lo) && i < lo->ldo_dir_split_offset)) {
+                       if (!dt_object_exists(dto))
+                               GOTO(out, rc = -EINVAL);
+               } else {
                        dt_write_lock(env, dto, DT_TGT_CHILD);
                        rc = lod_sub_create(env, dto, attr, NULL, dof, th);
                        if (rc != 0) {
@@ -3766,12 +3847,6 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
                                GOTO(out, rc);
                }
 
-               rec->rec_fid = lu_object_fid(&dt->do_lu);
-               rc = lod_sub_insert(env, dto, (struct dt_rec *)rec,
-                                   (const struct dt_key *)dotdot, th);
-               if (rc != 0)
-                       GOTO(out, rc);
-
                if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) ||
                    cfs_fail_val != i) {
                        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) &&
@@ -3788,6 +3863,21 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
                                GOTO(out, rc);
                }
 
+               /* don't insert stripe if it's existed stripe of splitting
+                * directory (this directory is striped).
+                * NB, plain directory will insert itself as the first
+                * stripe in target.
+                */
+               if (lod_is_splitting(lo) && lo->ldo_dir_split_offset > 1 &&
+                   lo->ldo_dir_split_offset > i)
+                       continue;
+
+               rec->rec_fid = lu_object_fid(&dt->do_lu);
+               rc = lod_sub_insert(env, dto, (struct dt_rec *)rec,
+                                   (const struct dt_key *)dotdot, th);
+               if (rc != 0)
+                       GOTO(out, rc);
+
                if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) &&
                    cfs_fail_val == i)
                        snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
@@ -4111,7 +4201,7 @@ static int lod_layout_repeat_comp(const struct lu_env *env,
 
        CDEBUG(D_LAYOUT, "repeating component %d\n", index);
 
-       OBD_ALLOC(comp_array, sizeof(*comp_array) * new_cnt);
+       OBD_ALLOC_PTR_ARRAY(comp_array, new_cnt);
        if (comp_array == NULL)
                GOTO(out, rc = -ENOMEM);
 
@@ -4133,6 +4223,7 @@ static int lod_layout_repeat_comp(const struct lu_env *env,
        new_comp->llc_flags &= ~LCME_FL_INIT;
        new_comp->llc_stripe = NULL;
        new_comp->llc_stripes_allocated = 0;
+       new_comp->llc_ost_indices = NULL;
        new_comp->llc_stripe_offset = LOV_OFFSET_DEFAULT;
        /* for uninstantiated components, layout gen stores default stripe
         * offset */
@@ -4158,8 +4249,7 @@ static int lod_layout_repeat_comp(const struct lu_env *env,
                new_comp->llc_ostlist.op_array = op_array;
        }
 
-       OBD_FREE(lo->ldo_comp_entries,
-                sizeof(*comp_array) * lo->ldo_comp_cnt);
+       OBD_FREE_PTR_ARRAY(lo->ldo_comp_entries, lo->ldo_comp_cnt);
        lo->ldo_comp_entries = comp_array;
        lo->ldo_comp_cnt = new_cnt;
 
@@ -4173,7 +4263,7 @@ static int lod_layout_repeat_comp(const struct lu_env *env,
        EXIT;
 out:
        if (rc)
-               OBD_FREE(comp_array, sizeof(*comp_array) * new_cnt);
+               OBD_FREE_PTR_ARRAY(comp_array, new_cnt);
 
        return rc;
 }
@@ -4190,12 +4280,11 @@ static int lod_layout_data_init(struct lod_thread_info *info, __u32 comp_cnt)
                RETURN(0);
 
        if (info->lti_comp_size > 0) {
-               OBD_FREE(info->lti_comp_idx,
-                        info->lti_comp_size * sizeof(__u32));
+               OBD_FREE_PTR_ARRAY(info->lti_comp_idx, info->lti_comp_size);
                info->lti_comp_size = 0;
        }
 
-       OBD_ALLOC(info->lti_comp_idx, comp_cnt * sizeof(__u32));
+       OBD_ALLOC_PTR_ARRAY(info->lti_comp_idx, comp_cnt);
        if (!info->lti_comp_idx)
                RETURN(-ENOMEM);
 
@@ -4282,11 +4371,11 @@ static int lod_layout_del_prep_layout(const struct lu_env *env,
                        lu_object_put(env, &obj->do_lu);
                        lod_comp->llc_stripe[j] = NULL;
                }
-               OBD_FREE(lod_comp->llc_stripe, sizeof(*lod_comp->llc_stripe) *
-                                       lod_comp->llc_stripes_allocated);
+               OBD_FREE_PTR_ARRAY(lod_comp->llc_stripe,
+                                  lod_comp->llc_stripes_allocated);
                lod_comp->llc_stripe = NULL;
-               OBD_FREE(lod_comp->llc_ost_indices,
-                        sizeof(__u32) * lod_comp->llc_stripes_allocated);
+               OBD_FREE_PTR_ARRAY(lod_comp->llc_ost_indices,
+                                  lod_comp->llc_stripes_allocated);
                lod_comp->llc_ost_indices = NULL;
                lod_comp->llc_stripes_allocated = 0;
        }
@@ -4299,7 +4388,7 @@ static int lod_layout_del_prep_layout(const struct lu_env *env,
        if (info->lti_count > 0) {
                struct lod_layout_component *comp_array;
 
-               OBD_ALLOC(comp_array, sizeof(*comp_array) * info->lti_count);
+               OBD_ALLOC_PTR_ARRAY(comp_array, info->lti_count);
                if (comp_array == NULL)
                        GOTO(out, rc = -ENOMEM);
 
@@ -4309,8 +4398,7 @@ static int lod_layout_del_prep_layout(const struct lu_env *env,
                               sizeof(*comp_array));
                }
 
-               OBD_FREE(lo->ldo_comp_entries,
-                        sizeof(*comp_array) * lo->ldo_comp_cnt);
+               OBD_FREE_PTR_ARRAY(lo->ldo_comp_entries, lo->ldo_comp_cnt);
                lo->ldo_comp_entries = comp_array;
                lo->ldo_comp_cnt = info->lti_count;
        } else {
@@ -6203,7 +6291,9 @@ static int lod_invalidate(const struct lu_env *env, struct dt_object *dt)
 }
 
 static int lod_declare_instantiate_components(const struct lu_env *env,
-               struct lod_object *lo, struct thandle *th)
+                                             struct lod_object *lo,
+                                             struct thandle *th,
+                                             __u64 reserve)
 {
        struct lod_thread_info *info = lod_env_info(env);
        int i;
@@ -6214,7 +6304,7 @@ static int lod_declare_instantiate_components(const struct lu_env *env,
 
        for (i = 0; i < info->lti_count; i++) {
                rc = lod_qos_prep_create(env, lo, NULL, th,
-                                        info->lti_comp_idx[i]);
+                                        info->lti_comp_idx[i], reserve);
                if (rc)
                        break;
        }
@@ -6250,15 +6340,15 @@ static int lod_declare_instantiate_components(const struct lu_env *env,
  */
 static bool lod_sel_osts_allowed(const struct lu_env *env,
                                 struct lod_object *lo,
-                                int index, __u64 extension_size,
+                                int index, __u64 reserve,
                                 struct lu_extent *extent,
                                 struct lu_extent *comp_extent, int write)
 {
        struct lod_layout_component *lod_comp = &lo->ldo_comp_entries[index];
        struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
-       struct obd_statfs *sfs = &lod_env_info(env)->lti_osfs;
+       struct lod_thread_info *tinfo = lod_env_info(env);
+       struct obd_statfs *sfs = &tinfo->lti_osfs;
        __u64 available = 0;
-       __u64 size;
        bool ret = true;
        int i, rc;
 
@@ -6266,21 +6356,6 @@ static bool lod_sel_osts_allowed(const struct lu_env *env,
 
        LASSERT(lod_comp->llc_stripe_count != 0);
 
-       if (write == 0 ||
-           (extent->e_start == 0 && extent->e_end == OBD_OBJECT_EOF)) {
-               /* truncate or append */
-               size = extension_size;
-       } else {
-               /* In case of write op, check the real write extent,
-                * it may be larger than the extension_size */
-               size = roundup(min(extent->e_end, comp_extent->e_end) -
-                              max(extent->e_start, comp_extent->e_start),
-                              extension_size);
-       }
-       /* extension_size is file level, so we must divide by stripe count to
-        * compare it to available space on a single OST */
-       size /= lod_comp->llc_stripe_count;
-
        lod_getref(&lod->lod_ost_descs);
        for (i = 0; i < lod_comp->llc_stripe_count; i++) {
                int index = lod_comp->llc_ost_indices[i];
@@ -6307,7 +6382,7 @@ static bool lod_sel_osts_allowed(const struct lu_env *env,
                if (j < lod_comp->llc_stripe_count)
                        continue;
 
-               if (!cfs_bitmap_check(lod->lod_ost_bitmap, index)) {
+               if (!test_bit(index, lod->lod_ost_bitmap)) {
                        CDEBUG(D_LAYOUT, "ost %d no longer present\n", index);
                        ret = false;
                        break;
@@ -6321,9 +6396,9 @@ static bool lod_sel_osts_allowed(const struct lu_env *env,
                        break;
                }
 
-               if (sfs->os_state & OS_STATE_ENOSPC ||
-                   sfs->os_state & OS_STATE_READONLY ||
-                   sfs->os_state & OS_STATE_DEGRADED) {
+               if (sfs->os_state & OS_STATFS_ENOSPC ||
+                   sfs->os_state & OS_STATFS_READONLY ||
+                   sfs->os_state & OS_STATFS_DEGRADED) {
                        CDEBUG(D_LAYOUT, "ost %d is not availble for SEL "
                               "extension, state %u\n", index, sfs->os_state);
                        ret = false;
@@ -6341,11 +6416,11 @@ static bool lod_sel_osts_allowed(const struct lu_env *env,
                       (100ull * sfs->os_bavail) / sfs->os_blocks,
                       (100ull * sfs->os_bfree) / sfs->os_blocks);
 
-               if (size * repeated > available) {
+               if (reserve * repeated > available) {
                        ret = false;
                        CDEBUG(D_LAYOUT, "low space on ost %d, available %llu "
-                              "< extension size %llu\n", index, available,
-                              extension_size);
+                              "< extension size %llu repeated %d\n", index,
+                              available, reserve, repeated);
                        break;
                }
        }
@@ -6451,6 +6526,26 @@ static __u64 lod_extension_new_end(__u64 extension_size, __u64 extent_end,
        return new_end;
 }
 
+/**
+ * Compute the per-OST share of a component's extension size.
+ *
+ * The reservation is worked out ahead of instantiation and passed down,
+ * because lod_statfs_and_check() may accept an OST that is too low on space
+ * to hold a SEL extension, so an unsuitable OST gets used and ENOSPC follows.
+ *
+ * \param[in] lod_comp         lod component we are checking
+ * \retval     size to reserve on each OST of lod_comp's stripe.
+ */
+static __u64 lod_sel_stripe_reserved(struct lod_layout_component *lod_comp)
+{
+       __u64 per_ost;
+
+       /* extension size is file level, so split it across the stripes */
+       per_ost = lod_comp->llc_stripe_size * SEL_UNIT_SIZE /
+                 lod_comp->llc_stripe_count;
+       return per_ost;
+}
+
 /* As lod_sel_handler() could be re-entered for the same component several
  * times, this is the data for the next call. Fields could be changed to
  * component indexes when needed, (e.g. if there is no need to instantiate
@@ -6532,7 +6627,7 @@ static int lod_sel_handler(const struct lu_env *env,
        struct lod_layout_component *lod_comp;
        struct lod_layout_component *prev;
        struct lod_layout_component *next = NULL;
-       __u64 extension_size;
+       __u64 extension_size, reserve;
        __u64 new_end = 0;
        bool repeated;
        int change = 0;
@@ -6569,11 +6664,13 @@ static int lod_sel_handler(const struct lu_env *env,
                RETURN(-EINVAL);
        }
 
+       reserve = lod_sel_stripe_reserved(lod_comp);
+
        if (!prev->llc_stripe) {
                CDEBUG(D_LAYOUT, "Previous component not inited\n");
                info->lti_count = 1;
                info->lti_comp_idx[0] = index - 1;
-               rc = lod_declare_instantiate_components(env, lo, th);
+               rc = lod_declare_instantiate_components(env, lo, th, reserve);
                /* ENOSPC tells us we can't use this component.  If there is
                 * a next or we are repeating, we either spill over (next) or
                 * extend the original comp (repeat).  Otherwise, return the
@@ -6585,8 +6682,7 @@ static int lod_sel_handler(const struct lu_env *env,
        }
 
        if (sd->sd_force == 0 && rc == 0)
-               rc = !lod_sel_osts_allowed(env, lo, index - 1,
-                                          extension_size, extent,
+               rc = !lod_sel_osts_allowed(env, lo, index - 1, reserve, extent,
                                           &lod_comp->llc_extent, write);
 
        repeated = !!(sd->sd_repeat);
@@ -6900,7 +6996,7 @@ static int lod_declare_update_plain(const struct lu_env *env,
                RETURN(-EALREADY);
 
        lod_obj_inc_layout_gen(lo);
-       rc = lod_declare_instantiate_components(env, lo, th);
+       rc = lod_declare_instantiate_components(env, lo, th, 0);
        EXIT;
 out:
        if (rc)
@@ -7010,8 +7106,8 @@ static inline int lod_check_ost_avail(const struct lu_env *env,
 
        ost = OST_TGT(lod, idx);
        if (ost->ltd_statfs.os_state &
-               (OS_STATE_READONLY | OS_STATE_ENOSPC | OS_STATE_ENOINO |
-                OS_STATE_NOPRECREATE) ||
+               (OS_STATFS_READONLY | OS_STATFS_ENOSPC | OS_STATFS_ENOINO |
+                OS_STATFS_NOPRECREATE) ||
            ost->ltd_active == 0) {
                CDEBUG(D_LAYOUT, DFID ": mirror %d OST%d unavail, rc = %d\n",
                       PFID(lod_object_fid(lo)), index, idx, rc);
@@ -7317,7 +7413,7 @@ static int lod_declare_update_rdonly(const struct lu_env *env,
                lo->ldo_layout_gen = layout_version & 0xffff;
        }
 
-       rc = lod_declare_instantiate_components(env, lo, th);
+       rc = lod_declare_instantiate_components(env, lo, th, 0);
        if (rc)
                GOTO(out, rc);
 
@@ -7357,7 +7453,7 @@ static int lod_declare_update_write_pending(const struct lu_env *env,
                if (lo->ldo_mirrors[i].lme_stale)
                        continue;
 
-               LASSERTF(primary < 0, DFID " has multiple primary: %u / %u",
+               LASSERTF(primary < 0, DFID " has multiple primary: %u / %u\n",
                         PFID(lod_object_fid(lo)),
                         lo->ldo_mirrors[i].lme_id,
                         lo->ldo_mirrors[primary].lme_id);
@@ -7461,7 +7557,7 @@ static int lod_declare_update_write_pending(const struct lu_env *env,
                lo->ldo_flr_state = LCM_FL_SYNC_PENDING;
        }
 
-       rc = lod_declare_instantiate_components(env, lo, th);
+       rc = lod_declare_instantiate_components(env, lo, th, 0);
        if (rc)
                GOTO(out, rc);
 
@@ -7628,8 +7724,7 @@ static int lod_dir_declare_layout_attach(const struct lu_env *env,
 
        dof->dof_type = DFT_DIR;
 
-       OBD_ALLOC(stripes,
-                 sizeof(*stripes) * (lo->ldo_dir_stripe_count + stripe_count));
+       OBD_ALLOC_PTR_ARRAY(stripes, (lo->ldo_dir_stripe_count + stripe_count));
        if (!stripes)
                RETURN(-ENOMEM);
 
@@ -7720,14 +7815,19 @@ static int lod_dir_declare_layout_attach(const struct lu_env *env,
        }
 
        if (lo->ldo_stripe)
-               OBD_FREE(lo->ldo_stripe,
-                        sizeof(*stripes) * lo->ldo_dir_stripes_allocated);
+               OBD_FREE_PTR_ARRAY(lo->ldo_stripe,
+                                  lo->ldo_dir_stripes_allocated);
        lo->ldo_stripe = stripes;
        lo->ldo_dir_migrate_offset = lo->ldo_dir_stripe_count;
        lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_hash_type);
        lo->ldo_dir_stripe_count += stripe_count;
        lo->ldo_dir_stripes_allocated += stripe_count;
-       lo->ldo_dir_hash_type |= LMV_HASH_FLAG_MIGRATION;
+
+       /* plain directory split creates target as a plain directory, but once
+        * source is attached as the first stripe it becomes a striped
+        * directory; set correct do_index_ops, otherwise it can't be unlinked.
+        */
+       dt->do_index_ops = &lod_striped_index_ops;
 
        RETURN(0);
 out:
@@ -7735,8 +7835,7 @@ out:
        while (i < lo->ldo_dir_stripe_count + stripe_count && stripes[i])
                dt_object_put(env, stripes[i++]);
 
-       OBD_FREE(stripes,
-                sizeof(*stripes) * (stripe_count + lo->ldo_dir_stripe_count));
+       OBD_FREE_PTR_ARRAY(stripes, stripe_count + lo->ldo_dir_stripe_count);
        return rc;
 }
 
@@ -7906,6 +8005,86 @@ static int lod_dir_declare_layout_shrink(const struct lu_env *env,
        return rc;
 }
 
+/**
+ * Allocate stripes for split directory.
+ *
+ * A new stripe array sized for the stripe count requested in \a mlc is
+ * allocated, the stripes the directory already has are carried over at its
+ * head, the remaining slots are filled by the MDT allocators (QoS first,
+ * round-robin if QoS returns -EAGAIN) and creation of the stripes is declared.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       target object
+ * \param[in] mlc      layout change data
+ * \param[in] th       transaction handle
+ *
+ * \retval             0 on success
+ * \retval             -EINVAL if the requested stripe count does not exceed
+ *                     the number of stripes already allocated
+ * \retval             -ENOMEM if the new stripe array can't be allocated
+ * \retval             other negative errno if failed
+ */
+static int lod_dir_declare_layout_split(const struct lu_env *env,
+                                       struct dt_object *dt,
+                                       const struct md_layout_change *mlc,
+                                       struct thandle *th)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct dt_object_format *dof = &info->lti_format;
+       struct lmv_user_md_v1 *lum = mlc->mlc_spec->u.sp_ea.eadata;
+       struct dt_object **stripes;
+       u32 stripe_count;
+       u32 saved_count;
+       int i;
+       int rc;
+
+       ENTRY;
+
+       LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC);
+       LASSERT(le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT);
+
+       saved_count = lo->ldo_dir_stripes_allocated;
+       stripe_count = le32_to_cpu(lum->lum_stripe_count);
+       /* a split has to add stripes, anything else is a bad request */
+       if (stripe_count <= saved_count)
+               RETURN(-EINVAL);
+
+       dof->dof_type = DFT_DIR;
+
+       OBD_ALLOC_PTR_ARRAY(stripes, stripe_count);
+       if (!stripes)
+               RETURN(-ENOMEM);
+
+       /* existing stripes keep their positions at the head of the array */
+       for (i = 0; i < lo->ldo_dir_stripes_allocated; i++)
+               stripes[i] = lo->ldo_stripe[i];
+
+       lod_qos_statfs_update(env, lod, &lod->lod_mdt_descs);
+       rc = lod_mdt_alloc_qos(env, lo, stripes, saved_count, stripe_count);
+       if (rc == -EAGAIN)
+               rc = lod_mdt_alloc_rr(env, lo, stripes, saved_count,
+                                     stripe_count);
+       if (rc < 0) {
+               /* lo->ldo_stripe is untouched so far, drop the new array only */
+               OBD_FREE_PTR_ARRAY(stripes, stripe_count);
+               RETURN(rc);
+       }
+
+       LASSERT(rc > saved_count);
+       /* replace the old array with the new one; the old pointer is checked
+        * first, the same way lod_dir_declare_layout_attach() does it
+        */
+       if (lo->ldo_stripe)
+               OBD_FREE_PTR_ARRAY(lo->ldo_stripe,
+                                  lo->ldo_dir_stripes_allocated);
+       lo->ldo_stripe = stripes;
+       lo->ldo_dir_striped = 1;
+       /* rc is the stripe count returned by the allocator */
+       lo->ldo_dir_stripe_count = rc;
+       lo->ldo_dir_stripes_allocated = stripe_count;
+       /* remember the hash type that was in use before the split */
+       lo->ldo_dir_split_hash = lo->ldo_dir_hash_type;
+       lo->ldo_dir_hash_type = le32_to_cpu(lum->lum_hash_type);
+       /* unknown requested hash type: use the LMV descriptor's pattern */
+       if (!lmv_is_known_hash_type(lo->ldo_dir_hash_type))
+               lo->ldo_dir_hash_type =
+                       lod->lod_mdt_descs.ltd_lmv_desc.ld_pattern;
+       lo->ldo_dir_hash_type |= LMV_HASH_FLAG_SPLIT | LMV_HASH_FLAG_MIGRATION;
+       lo->ldo_dir_split_offset = saved_count;
+       lo->ldo_dir_layout_version++;
+       lo->ldo_dir_stripe_loaded = 1;
+
+       rc = lod_dir_declare_create_stripes(env, dt, mlc->mlc_attr, dof, th);
+       if (rc)
+               lod_striping_free(env, lo);
+
+       RETURN(rc);
+}
+
 /*
  * detach all stripes from dir master object, NB, stripes are not destroyed, but
  * deleted from it's parent namespace, this function is called in two places:
@@ -7972,11 +8151,11 @@ static int lod_dir_layout_detach(const struct lu_env *env,
                if (dto)
                        dt_object_put(env, dto);
        }
-       OBD_FREE(lo->ldo_stripe,
-                sizeof(struct dt_object *) * lo->ldo_dir_stripes_allocated);
+       OBD_FREE_PTR_ARRAY(lo->ldo_stripe, lo->ldo_dir_stripes_allocated);
        lo->ldo_stripe = NULL;
        lo->ldo_dir_stripes_allocated = 0;
        lo->ldo_dir_stripe_count = 0;
+       dt->do_index_ops = &lod_index_ops;
 
        RETURN(rc);
 }
@@ -8093,6 +8272,7 @@ static mlc_handler dir_mlc_declare_ops[MD_LAYOUT_MAX] = {
        [MD_LAYOUT_ATTACH] = lod_dir_declare_layout_attach,
        [MD_LAYOUT_DETACH] = lod_dir_declare_layout_detach,
        [MD_LAYOUT_SHRINK] = lod_dir_declare_layout_shrink,
+       [MD_LAYOUT_SPLIT]  = lod_dir_declare_layout_split,
 };
 
 static mlc_handler dir_mlc_ops[MD_LAYOUT_MAX] = {
@@ -8332,7 +8512,7 @@ static int lod_object_init(const struct lu_env *env, struct lu_object *lo,
 
        if (ltd != NULL) {
                if (ltd->ltd_tgts_size > idx &&
-                   cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) {
+                   test_bit(idx, ltd->ltd_tgt_bitmap)) {
                        tgt = LTD_TGT(ltd, idx);
 
                        LASSERT(tgt != NULL);
@@ -8456,13 +8636,11 @@ void lod_striping_free_nolock(const struct lu_env *env, struct lod_object *lo)
                                        lu_object_put(env,
                                               &lod_comp->llc_stripe[j]->do_lu);
                        }
-                       OBD_FREE(lod_comp->llc_stripe,
-                                sizeof(struct dt_object *) *
-                                lod_comp->llc_stripes_allocated);
+                       OBD_FREE_PTR_ARRAY(lod_comp->llc_stripe,
+                                          lod_comp->llc_stripes_allocated);
                        lod_comp->llc_stripe = NULL;
-                       OBD_FREE(lod_comp->llc_ost_indices,
-                                sizeof(__u32) *
-                                lod_comp->llc_stripes_allocated);
+                       OBD_FREE_PTR_ARRAY(lod_comp->llc_ost_indices,
+                                          lod_comp->llc_stripes_allocated);
                        lod_comp->llc_ost_indices = NULL;
                        lod_comp->llc_stripes_allocated = 0;
                }