Whamcloud - gitweb
LU-9859 libcfs: move tgt_descs to standard Linux bitmaps.
[fs/lustre-release.git] / lustre / lod / lod_object.c
index e81de8c..0381a6e 100644 (file)
@@ -367,6 +367,50 @@ static struct dt_index_operations lod_index_ops = {
 };
 
 /**
+ * Implementation of dt_index_operations::dio_lookup
+ *
+ * Used with striped directories.
+ *
+ * \see dt_index_operations::dio_lookup() in the API description for details.
+ */
+static int lod_striped_lookup(const struct lu_env *env, struct dt_object *dt,
+                     struct dt_rec *rec, const struct dt_key *key)
+{
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct dt_object *next;
+       const char *name = (const char *)key;
+
+       LASSERT(lo->ldo_dir_stripe_count > 0);
+
+       if (strcmp(name, dot) == 0) {
+               struct lu_fid *fid = (struct lu_fid *)rec;
+
+               *fid = *lod_object_fid(lo);
+               return 1;
+       }
+
+       if (strcmp(name, dotdot) == 0) {
+               next = dt_object_child(dt);
+       } else {
+               int index;
+
+               index = __lmv_name_to_stripe_index(lo->ldo_dir_hash_type,
+                                                  lo->ldo_dir_stripe_count,
+                                                  lo->ldo_dir_migrate_hash,
+                                                  lo->ldo_dir_migrate_offset,
+                                                  name, strlen(name), true);
+               if (index < 0)
+                       return index;
+
+               next = lo->ldo_stripe[index];
+               if (!next || !dt_object_exists(next))
+                       return -ENODEV;
+       }
+
+       return next->do_index_ops->dio_lookup(env, next, rec, key);
+}
+
+/**
  * Implementation of dt_it_ops::init.
  *
  * Used with striped objects. Internally just initializes the iterator
@@ -744,7 +788,7 @@ static int lod_striped_it_load(const struct lu_env *env,
 }
 
 static struct dt_index_operations lod_striped_index_ops = {
-       .dio_lookup             = lod_lookup,
+       .dio_lookup             = lod_striped_lookup,
        .dio_declare_insert     = lod_declare_insert,
        .dio_insert             = lod_insert,
        .dio_declare_delete     = lod_declare_delete,
@@ -1075,6 +1119,37 @@ static int lod_attr_get(const struct lu_env *env,
        return dt_attr_get(env, dt_object_child(dt), attr);
 }
 
+void lod_adjust_stripe_size(struct lod_layout_component *comp,
+                           __u32 def_stripe_size)
+{
+       __u64 comp_end = comp->llc_extent.e_end;
+
+       /* Choose stripe size if not set. Note that default stripe size can't
+        * be used as is, because it must be multiplier of given component end.
+        *  - first check if default stripe size can be used
+        *  - if not than select the lowest set bit from component end and use
+        *    that value as stripe size
+        */
+       if (!comp->llc_stripe_size) {
+               if (comp_end == LUSTRE_EOF || !(comp_end % def_stripe_size))
+                       comp->llc_stripe_size = def_stripe_size;
+               else
+                       comp->llc_stripe_size = comp_end & ~(comp_end - 1);
+       } else {
+               /* check stripe size is multiplier of comp_end */
+               if (comp_end != LUSTRE_EOF &&
+                   comp_end % comp->llc_stripe_size) {
+                       /* fix that even for defined stripe size but warn
+                        * about the problem, that must not happen
+                        */
+                       CWARN("Component end %llu is not aligned by the stripe size %u\n",
+                             comp_end, comp->llc_stripe_size);
+                       dump_stack();
+                       comp->llc_stripe_size = comp_end & ~(comp_end - 1);
+               }
+       }
+}
+
 static inline void lod_adjust_stripe_info(struct lod_layout_component *comp,
                                          struct lov_desc *desc,
                                          int append_stripes)
@@ -1087,8 +1162,8 @@ static inline void lod_adjust_stripe_info(struct lod_layout_component *comp,
                                desc->ld_default_stripe_count;
                }
        }
-       if (comp->llc_stripe_size <= 0)
-               comp->llc_stripe_size = desc->ld_default_stripe_size;
+
+       lod_adjust_stripe_size(comp, desc->ld_default_stripe_size);
 }
 
 int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo,
@@ -1529,13 +1604,13 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
                        /* The on-disk LMV EA only contains header, but the
                         * returned LMV EA size should contain the space for
                         * the FIDs of all shards of the striped directory. */
-                       if (lmv_is_sane(lmv1))
+                       if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_V1)
                                rc = lmv_mds_md_size(
                                        le32_to_cpu(lmv1->lmv_stripe_count),
                                        le32_to_cpu(lmv1->lmv_magic));
                } else {
-                       lfm = buf->lb_buf;
-                       if (le32_to_cpu(lfm->lfm_magic) == LMV_MAGIC_FOREIGN)
+                       lmv1 = buf->lb_buf;
+                       if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
                                RETURN(rc);
 
                        if (rc != sizeof(*lmv1))
@@ -1681,7 +1756,8 @@ static int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
        lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
        lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
        lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
-       if (lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) {
+       lmm1->lmv_layout_version = cpu_to_le32(lo->ldo_dir_layout_version);
+       if (lod_is_layout_changing(lo)) {
                lmm1->lmv_migrate_hash = cpu_to_le32(lo->ldo_dir_migrate_hash);
                lmm1->lmv_migrate_offset =
                        cpu_to_le32(lo->ldo_dir_migrate_offset);
@@ -1742,8 +1818,7 @@ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
                RETURN(-EINVAL);
 
        LASSERT(lo->ldo_stripe == NULL);
-       OBD_ALLOC(stripe, sizeof(stripe[0]) *
-                 (le32_to_cpu(lmv1->lmv_stripe_count)));
+       OBD_ALLOC_PTR_ARRAY(stripe, le32_to_cpu(lmv1->lmv_stripe_count));
        if (stripe == NULL)
                RETURN(-ENOMEM);
 
@@ -1787,6 +1862,8 @@ out:
        lo->ldo_dir_stripe_count = le32_to_cpu(lmv1->lmv_stripe_count);
        lo->ldo_dir_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count);
        lo->ldo_dir_layout_version = le32_to_cpu(lmv1->lmv_layout_version);
+       lo->ldo_dir_migrate_offset = le32_to_cpu(lmv1->lmv_migrate_offset);
+       lo->ldo_dir_migrate_hash = le32_to_cpu(lmv1->lmv_migrate_hash);
        lo->ldo_dir_hash_type = le32_to_cpu(lmv1->lmv_hash_type);
        if (rc != 0)
                lod_striping_free_nolock(env, lo);
@@ -1858,31 +1935,74 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env,
                if (!dto)
                        continue;
 
-               rc = lod_sub_declare_create(env, dto, attr, NULL, dof, th);
-               if (rc != 0)
-                       GOTO(out, rc);
+               /* directory split skip create for existing stripes */
+               if (!(lod_is_splitting(lo) && i < lo->ldo_dir_split_offset)) {
+                       rc = lod_sub_declare_create(env, dto, attr, NULL, dof,
+                                                   th);
+                       if (rc != 0)
+                               GOTO(out, rc);
 
-               if (!dt_try_as_dir(env, dto))
-                       GOTO(out, rc = -EINVAL);
+                       if (!dt_try_as_dir(env, dto))
+                               GOTO(out, rc = -EINVAL);
 
-               rc = lod_sub_declare_ref_add(env, dto, th);
-               if (rc != 0)
-                       GOTO(out, rc);
+                       rc = lod_sub_declare_ref_add(env, dto, th);
+                       if (rc != 0)
+                               GOTO(out, rc);
 
-               rec->rec_fid = lu_object_fid(&dto->do_lu);
-               rc = lod_sub_declare_insert(env, dto,
-                                           (const struct dt_rec *)rec,
-                                           (const struct dt_key *)dot, th);
-               if (rc != 0)
-                       GOTO(out, rc);
+                       rec->rec_fid = lu_object_fid(&dto->do_lu);
+                       rc = lod_sub_declare_insert(env, dto,
+                                                   (const struct dt_rec *)rec,
+                                                   (const struct dt_key *)dot,
+                                                   th);
+                       if (rc != 0)
+                               GOTO(out, rc);
 
-               /* master stripe FID will be put to .. */
-               rec->rec_fid = lu_object_fid(&dt->do_lu);
-               rc = lod_sub_declare_insert(env, dto,
-                                           (const struct dt_rec *)rec,
-                                           (const struct dt_key *)dotdot, th);
-               if (rc != 0)
-                       GOTO(out, rc);
+                       /* master stripe FID will be put to .. */
+                       rec->rec_fid = lu_object_fid(&dt->do_lu);
+                       rc = lod_sub_declare_insert(env, dto,
+                                                 (const struct dt_rec *)rec,
+                                                 (const struct dt_key *)dotdot,
+                                                 th);
+                       if (rc != 0)
+                               GOTO(out, rc);
+
+                       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) &&
+                           cfs_fail_val == i)
+                               snprintf(stripe_name, sizeof(info->lti_key),
+                                        DFID":%u",
+                                        PFID(lu_object_fid(&dto->do_lu)),
+                                        i + 1);
+                       else
+                               snprintf(stripe_name, sizeof(info->lti_key),
+                                        DFID":%u",
+                                        PFID(lu_object_fid(&dto->do_lu)), i);
+
+                       sname = lod_name_get(env, stripe_name,
+                                            strlen(stripe_name));
+                       rc = linkea_links_new(&ldata, &info->lti_linkea_buf,
+                                             sname, lu_object_fid(&dt->do_lu));
+                       if (rc != 0)
+                               GOTO(out, rc);
+
+                       linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
+                       linkea_buf.lb_len = ldata.ld_leh->leh_len;
+                       rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf,
+                                                      XATTR_NAME_LINK, 0, th);
+                       if (rc != 0)
+                               GOTO(out, rc);
+
+                       rec->rec_fid = lu_object_fid(&dto->do_lu);
+                       rc = lod_sub_declare_insert(env, dt_object_child(dt),
+                                       (const struct dt_rec *)rec,
+                                       (const struct dt_key *)stripe_name, th);
+                       if (rc != 0)
+                               GOTO(out, rc);
+
+                       rc = lod_sub_declare_ref_add(env, dt_object_child(dt),
+                                                    th);
+                       if (rc != 0)
+                               GOTO(out, rc);
+               }
 
                if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) ||
                    cfs_fail_val != i) {
@@ -1898,39 +2018,6 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env,
                        if (rc != 0)
                                GOTO(out, rc);
                }
-
-               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) &&
-                   cfs_fail_val == i)
-                       snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
-                               PFID(lu_object_fid(&dto->do_lu)), i + 1);
-               else
-                       snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
-                               PFID(lu_object_fid(&dto->do_lu)), i);
-
-               sname = lod_name_get(env, stripe_name, strlen(stripe_name));
-               rc = linkea_links_new(&ldata, &info->lti_linkea_buf,
-                                     sname, lu_object_fid(&dt->do_lu));
-               if (rc != 0)
-                       GOTO(out, rc);
-
-               linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
-               linkea_buf.lb_len = ldata.ld_leh->leh_len;
-               rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf,
-                                              XATTR_NAME_LINK, 0, th);
-               if (rc != 0)
-                       GOTO(out, rc);
-
-               rec->rec_fid = lu_object_fid(&dto->do_lu);
-               rc = lod_sub_declare_insert(env, dt_object_child(dt),
-                                           (const struct dt_rec *)rec,
-                                           (const struct dt_key *)stripe_name,
-                                           th);
-               if (rc != 0)
-                       GOTO(out, rc);
-
-               rc = lod_sub_declare_ref_add(env, dt_object_child(dt), th);
-               if (rc != 0)
-                       GOTO(out, rc);
        }
 
        rc = lod_sub_declare_xattr_set(env, dt_object_child(dt),
@@ -1973,7 +2060,6 @@ static int lod_mdt_alloc_specific(const struct lu_env *env,
                                  struct dt_object **stripes,
                                  __u32 *mdt_indices, bool is_specific)
 {
-       struct lod_thread_info  *info = lod_env_info(env);
        struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
        struct lu_tgt_descs *ltd = &lod->lod_mdt_descs;
        struct lu_tgt_desc *tgt = NULL;
@@ -2022,17 +2108,17 @@ static int lod_mdt_alloc_specific(const struct lu_env *env,
 
                        /* Sigh, this index is not in the bitmap, let's check
                         * next available target */
-                       if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx) &&
+                       if (!test_bit(idx, ltd->ltd_tgt_bitmap) &&
                            idx != master_index)
                                continue;
 
                        if (idx == master_index) {
                                /* Allocate the FID locally */
-                               rc = obd_fid_alloc(env, lod->lod_child_exp,
-                                                  &fid, NULL);
+                               tgt_dt = lod->lod_child;
+                               rc = dt_fid_alloc(env, tgt_dt, &fid, NULL,
+                                                 NULL);
                                if (rc < 0)
                                        continue;
-                               tgt_dt = lod->lod_child;
                                break;
                        }
 
@@ -2042,12 +2128,11 @@ static int lod_mdt_alloc_specific(const struct lu_env *env,
                                continue;
 
                        tgt_dt = tgt->ltd_tgt;
-                       rc = dt_statfs(env, tgt_dt, &info->lti_osfs);
-                       if (rc)
+                       if (!tgt->ltd_active)
                                /* this OSP doesn't feel well */
                                continue;
 
-                       rc = obd_fid_alloc(env, tgt->ltd_exp, &fid, NULL);
+                       rc = dt_fid_alloc(env, tgt_dt, &fid, NULL, NULL);
                        if (rc < 0)
                                continue;
 
@@ -2132,12 +2217,12 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
 
        stripe_count = lo->ldo_dir_stripe_count;
 
-       OBD_ALLOC(stripes, sizeof(stripes[0]) * stripe_count);
+       OBD_ALLOC_PTR_ARRAY(stripes, stripe_count);
        if (!stripes)
                RETURN(-ENOMEM);
 
        /* Allocate the first stripe locally */
-       rc = obd_fid_alloc(env, lod->lod_child_exp, &fid, NULL);
+       rc = dt_fid_alloc(env, lod->lod_child, &fid, NULL, NULL);
        if (rc < 0)
                GOTO(out, rc);
 
@@ -2148,14 +2233,15 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
 
        if (lo->ldo_dir_stripe_offset == LMV_OFFSET_DEFAULT) {
                lod_qos_statfs_update(env, lod, &lod->lod_mdt_descs);
-               rc = lod_mdt_alloc_qos(env, lo, stripes);
+               rc = lod_mdt_alloc_qos(env, lo, stripes, 1, stripe_count);
                if (rc == -EAGAIN)
-                       rc = lod_mdt_alloc_rr(env, lo, stripes);
+                       rc = lod_mdt_alloc_rr(env, lo, stripes, 1,
+                                             stripe_count);
        } else {
                int *idx_array;
                bool is_specific = false;
 
-               OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count);
+               OBD_ALLOC_PTR_ARRAY(idx_array, stripe_count);
                if (!idx_array)
                        GOTO(out, rc = -ENOMEM);
 
@@ -2171,7 +2257,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
                        lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
                rc = lod_mdt_alloc_specific(env, lo, stripes, idx_array,
                                            is_specific);
-               OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count);
+               OBD_FREE_PTR_ARRAY(idx_array, stripe_count);
        }
 
        if (rc < 0)
@@ -2198,7 +2284,7 @@ out:
                dt_object_put(env, stripes[0]);
        for (i = 1; i < stripe_count; i++)
                LASSERT(!stripes[i]);
-       OBD_FREE(stripes, sizeof(stripes[0]) * stripe_count);
+       OBD_FREE_PTR_ARRAY(stripes, stripe_count);
 
        return rc;
 }
@@ -2305,6 +2391,7 @@ static int lod_dir_layout_set(const struct lu_env *env,
 {
        struct dt_object *next = dt_object_child(dt);
        struct lod_object *lo = lod_dt_obj(dt);
+       struct lod_device *lod = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
        struct lmv_mds_md_v1 *lmv = buf->lb_buf;
        struct lmv_mds_md_v1 *slave_lmv;
        struct lu_buf slave_buf;
@@ -2313,10 +2400,29 @@ static int lod_dir_layout_set(const struct lu_env *env,
 
        ENTRY;
 
+       if (!lmv_is_sane2(lmv))
+               RETURN(-EINVAL);
+
+       /* adjust hash for dir merge, which may not be set in user command */
+       if (lmv_is_merging(lmv) && !lmv->lmv_migrate_hash)
+               lmv->lmv_merge_hash =
+                       lod->lod_mdt_descs.ltd_lmv_desc.ld_pattern;
+
+       LMV_DEBUG(D_INFO, lmv, "set");
+
        rc = lod_sub_xattr_set(env, next, buf, XATTR_NAME_LMV, fl, th);
        if (rc)
                RETURN(rc);
 
+       /* directory restripe may update stripe LMV directly */
+       if (!lo->ldo_dir_stripe_count)
+               RETURN(0);
+
+       lo->ldo_dir_hash_type = le32_to_cpu(lmv->lmv_hash_type);
+       lo->ldo_dir_migrate_offset = le32_to_cpu(lmv->lmv_migrate_offset);
+       lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_migrate_hash);
+       lo->ldo_dir_layout_version = le32_to_cpu(lmv->lmv_layout_version);
+
        OBD_ALLOC_PTR(slave_lmv);
        if (!slave_lmv)
                RETURN(-ENOMEM);
@@ -2338,7 +2444,6 @@ static int lod_dir_layout_set(const struct lu_env *env,
                        break;
        }
 
-       lod_striping_free(env, lod_dt_obj(dt));
        OBD_FREE_PTR(slave_lmv);
 
        RETURN(rc);
@@ -2376,7 +2481,7 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env,
                if (rc != 0)
                        RETURN(rc);
        } else if (strcmp(name, XATTR_NAME_LOV) == 0) {
-               rc = lod_verify_striping(d, lo, buf, false);
+               rc = lod_verify_striping(env, d, lo, buf, false);
                if (rc != 0)
                        RETURN(rc);
        }
@@ -2623,7 +2728,7 @@ static int lod_declare_layout_add(const struct lu_env *env,
        if (lo->ldo_flr_state != LCM_FL_NONE)
                RETURN(-EBUSY);
 
-       rc = lod_verify_striping(d, lo, buf, false);
+       rc = lod_verify_striping(env, d, lo, buf, false);
        if (rc != 0)
                RETURN(rc);
 
@@ -2637,7 +2742,7 @@ static int lod_declare_layout_add(const struct lu_env *env,
                RETURN(-EINVAL);
 
        array_cnt = lo->ldo_comp_cnt + comp_v1->lcm_entry_count;
-       OBD_ALLOC(comp_array, sizeof(*comp_array) * array_cnt);
+       OBD_ALLOC_PTR_ARRAY(comp_array, array_cnt);
        if (comp_array == NULL)
                RETURN(-ENOMEM);
 
@@ -2691,7 +2796,7 @@ static int lod_declare_layout_add(const struct lu_env *env,
                GOTO(error, rc);
        }
 
-       OBD_FREE(old_array, sizeof(*lod_comp) * old_array_cnt);
+       OBD_FREE_PTR_ARRAY(old_array, old_array_cnt);
 
        LASSERT(lo->ldo_mirror_count == 1);
        lo->ldo_mirrors[0].lme_end = array_cnt - 1;
@@ -2707,7 +2812,7 @@ error:
                        lod_comp->llc_pool = NULL;
                }
        }
-       OBD_FREE(comp_array, sizeof(*comp_array) * array_cnt);
+       OBD_FREE_PTR_ARRAY(comp_array, array_cnt);
        RETURN(rc);
 }
 
@@ -3713,9 +3818,15 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
                if (i && OBD_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_CREATE))
                        continue;
 
-               /* if it's source stripe of migrating directory, don't create */
-               if (!((lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) &&
-                     i >= lo->ldo_dir_migrate_offset)) {
+               /* don't create stripe if:
+                * 1. it's source stripe of migrating directory
+                * 2. it's existed stripe of splitting directory
+                */
+               if ((lod_is_migrating(lo) && i >= lo->ldo_dir_migrate_offset) ||
+                   (lod_is_splitting(lo) && i < lo->ldo_dir_split_offset)) {
+                       if (!dt_object_exists(dto))
+                               GOTO(out, rc = -EINVAL);
+               } else {
                        dt_write_lock(env, dto, DT_TGT_CHILD);
                        rc = lod_sub_create(env, dto, attr, NULL, dof, th);
                        if (rc != 0) {
@@ -3736,12 +3847,6 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
                                GOTO(out, rc);
                }
 
-               rec->rec_fid = lu_object_fid(&dt->do_lu);
-               rc = lod_sub_insert(env, dto, (struct dt_rec *)rec,
-                                   (const struct dt_key *)dotdot, th);
-               if (rc != 0)
-                       GOTO(out, rc);
-
                if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) ||
                    cfs_fail_val != i) {
                        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) &&
@@ -3758,6 +3863,21 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
                                GOTO(out, rc);
                }
 
+               /* don't insert stripe if it's existed stripe of splitting
+                * directory (this directory is striped).
+                * NB, plain directory will insert itself as the first
+                * stripe in target.
+                */
+               if (lod_is_splitting(lo) && lo->ldo_dir_split_offset > 1 &&
+                   lo->ldo_dir_split_offset > i)
+                       continue;
+
+               rec->rec_fid = lu_object_fid(&dt->do_lu);
+               rc = lod_sub_insert(env, dto, (struct dt_rec *)rec,
+                                   (const struct dt_key *)dotdot, th);
+               if (rc != 0)
+                       GOTO(out, rc);
+
                if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) &&
                    cfs_fail_val == i)
                        snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
@@ -4081,7 +4201,7 @@ static int lod_layout_repeat_comp(const struct lu_env *env,
 
        CDEBUG(D_LAYOUT, "repeating component %d\n", index);
 
-       OBD_ALLOC(comp_array, sizeof(*comp_array) * new_cnt);
+       OBD_ALLOC_PTR_ARRAY(comp_array, new_cnt);
        if (comp_array == NULL)
                GOTO(out, rc = -ENOMEM);
 
@@ -4128,8 +4248,7 @@ static int lod_layout_repeat_comp(const struct lu_env *env,
                new_comp->llc_ostlist.op_array = op_array;
        }
 
-       OBD_FREE(lo->ldo_comp_entries,
-                sizeof(*comp_array) * lo->ldo_comp_cnt);
+       OBD_FREE_PTR_ARRAY(lo->ldo_comp_entries, lo->ldo_comp_cnt);
        lo->ldo_comp_entries = comp_array;
        lo->ldo_comp_cnt = new_cnt;
 
@@ -4143,7 +4262,7 @@ static int lod_layout_repeat_comp(const struct lu_env *env,
        EXIT;
 out:
        if (rc)
-               OBD_FREE(comp_array, sizeof(*comp_array) * new_cnt);
+               OBD_FREE_PTR_ARRAY(comp_array, new_cnt);
 
        return rc;
 }
@@ -4160,12 +4279,11 @@ static int lod_layout_data_init(struct lod_thread_info *info, __u32 comp_cnt)
                RETURN(0);
 
        if (info->lti_comp_size > 0) {
-               OBD_FREE(info->lti_comp_idx,
-                        info->lti_comp_size * sizeof(__u32));
+               OBD_FREE_PTR_ARRAY(info->lti_comp_idx, info->lti_comp_size);
                info->lti_comp_size = 0;
        }
 
-       OBD_ALLOC(info->lti_comp_idx, comp_cnt * sizeof(__u32));
+       OBD_ALLOC_PTR_ARRAY(info->lti_comp_idx, comp_cnt);
        if (!info->lti_comp_idx)
                RETURN(-ENOMEM);
 
@@ -4252,11 +4370,11 @@ static int lod_layout_del_prep_layout(const struct lu_env *env,
                        lu_object_put(env, &obj->do_lu);
                        lod_comp->llc_stripe[j] = NULL;
                }
-               OBD_FREE(lod_comp->llc_stripe, sizeof(*lod_comp->llc_stripe) *
-                                       lod_comp->llc_stripes_allocated);
+               OBD_FREE_PTR_ARRAY(lod_comp->llc_stripe,
+                                  lod_comp->llc_stripes_allocated);
                lod_comp->llc_stripe = NULL;
-               OBD_FREE(lod_comp->llc_ost_indices,
-                        sizeof(__u32) * lod_comp->llc_stripes_allocated);
+               OBD_FREE_PTR_ARRAY(lod_comp->llc_ost_indices,
+                                  lod_comp->llc_stripes_allocated);
                lod_comp->llc_ost_indices = NULL;
                lod_comp->llc_stripes_allocated = 0;
        }
@@ -4269,7 +4387,7 @@ static int lod_layout_del_prep_layout(const struct lu_env *env,
        if (info->lti_count > 0) {
                struct lod_layout_component *comp_array;
 
-               OBD_ALLOC(comp_array, sizeof(*comp_array) * info->lti_count);
+               OBD_ALLOC_PTR_ARRAY(comp_array, info->lti_count);
                if (comp_array == NULL)
                        GOTO(out, rc = -ENOMEM);
 
@@ -4279,8 +4397,7 @@ static int lod_layout_del_prep_layout(const struct lu_env *env,
                               sizeof(*comp_array));
                }
 
-               OBD_FREE(lo->ldo_comp_entries,
-                        sizeof(*comp_array) * lo->ldo_comp_cnt);
+               OBD_FREE_PTR_ARRAY(lo->ldo_comp_entries, lo->ldo_comp_cnt);
                lo->ldo_comp_entries = comp_array;
                lo->ldo_comp_cnt = info->lti_count;
        } else {
@@ -6277,7 +6394,7 @@ static bool lod_sel_osts_allowed(const struct lu_env *env,
                if (j < lod_comp->llc_stripe_count)
                        continue;
 
-               if (!cfs_bitmap_check(lod->lod_ost_bitmap, index)) {
+               if (!test_bit(index, lod->lod_ost_bitmap)) {
                        CDEBUG(D_LAYOUT, "ost %d no longer present\n", index);
                        ret = false;
                        break;
@@ -6291,9 +6408,9 @@ static bool lod_sel_osts_allowed(const struct lu_env *env,
                        break;
                }
 
-               if (sfs->os_state & OS_STATE_ENOSPC ||
-                   sfs->os_state & OS_STATE_READONLY ||
-                   sfs->os_state & OS_STATE_DEGRADED) {
+               if (sfs->os_state & OS_STATFS_ENOSPC ||
+                   sfs->os_state & OS_STATFS_READONLY ||
+                   sfs->os_state & OS_STATFS_DEGRADED) {
                        CDEBUG(D_LAYOUT, "ost %d is not availble for SEL "
                               "extension, state %u\n", index, sfs->os_state);
                        ret = false;
@@ -6980,8 +7097,8 @@ static inline int lod_check_ost_avail(const struct lu_env *env,
 
        ost = OST_TGT(lod, idx);
        if (ost->ltd_statfs.os_state &
-               (OS_STATE_READONLY | OS_STATE_ENOSPC | OS_STATE_ENOINO |
-                OS_STATE_NOPRECREATE) ||
+               (OS_STATFS_READONLY | OS_STATFS_ENOSPC | OS_STATFS_ENOINO |
+                OS_STATFS_NOPRECREATE) ||
            ost->ltd_active == 0) {
                CDEBUG(D_LAYOUT, DFID ": mirror %d OST%d unavail, rc = %d\n",
                       PFID(lod_object_fid(lo)), index, idx, rc);
@@ -7327,7 +7444,7 @@ static int lod_declare_update_write_pending(const struct lu_env *env,
                if (lo->ldo_mirrors[i].lme_stale)
                        continue;
 
-               LASSERTF(primary < 0, DFID " has multiple primary: %u / %u",
+               LASSERTF(primary < 0, DFID " has multiple primary: %u / %u\n",
                         PFID(lod_object_fid(lo)),
                         lo->ldo_mirrors[i].lme_id,
                         lo->ldo_mirrors[primary].lme_id);
@@ -7598,8 +7715,7 @@ static int lod_dir_declare_layout_attach(const struct lu_env *env,
 
        dof->dof_type = DFT_DIR;
 
-       OBD_ALLOC(stripes,
-                 sizeof(*stripes) * (lo->ldo_dir_stripe_count + stripe_count));
+       OBD_ALLOC_PTR_ARRAY(stripes, (lo->ldo_dir_stripe_count + stripe_count));
        if (!stripes)
                RETURN(-ENOMEM);
 
@@ -7690,14 +7806,19 @@ static int lod_dir_declare_layout_attach(const struct lu_env *env,
        }
 
        if (lo->ldo_stripe)
-               OBD_FREE(lo->ldo_stripe,
-                        sizeof(*stripes) * lo->ldo_dir_stripes_allocated);
+               OBD_FREE_PTR_ARRAY(lo->ldo_stripe,
+                                  lo->ldo_dir_stripes_allocated);
        lo->ldo_stripe = stripes;
        lo->ldo_dir_migrate_offset = lo->ldo_dir_stripe_count;
        lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_hash_type);
        lo->ldo_dir_stripe_count += stripe_count;
        lo->ldo_dir_stripes_allocated += stripe_count;
-       lo->ldo_dir_hash_type |= LMV_HASH_FLAG_MIGRATION;
+
+       /* plain directory split creates target as a plain directory, while
+        * after source attached as the first stripe, it becomes a striped
+        * directory, set correct do_index_ops, otherwise it can't be unlinked.
+        */
+       dt->do_index_ops = &lod_striped_index_ops;
 
        RETURN(0);
 out:
@@ -7705,8 +7826,7 @@ out:
        while (i < lo->ldo_dir_stripe_count + stripe_count && stripes[i])
                dt_object_put(env, stripes[i++]);
 
-       OBD_FREE(stripes,
-                sizeof(*stripes) * (stripe_count + lo->ldo_dir_stripe_count));
+       OBD_FREE_PTR_ARRAY(stripes, stripe_count + lo->ldo_dir_stripe_count);
        return rc;
 }
 
@@ -7876,6 +7996,86 @@ static int lod_dir_declare_layout_shrink(const struct lu_env *env,
        return rc;
 }
 
+/**
+ * Allocate stripes for split directory.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       target object
+ * \param[in] mlc      layout change data
+ * \param[in] th       transaction handle
+ *
+ * \retval             0 on success
+ * \retval             negative if failed
+ */
+static int lod_dir_declare_layout_split(const struct lu_env *env,
+                                       struct dt_object *dt,
+                                       const struct md_layout_change *mlc,
+                                       struct thandle *th)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct dt_object_format *dof = &info->lti_format;
+       struct lmv_user_md_v1 *lum = mlc->mlc_spec->u.sp_ea.eadata;
+       struct dt_object **stripes;
+       u32 stripe_count;
+       u32 saved_count;
+       int i;
+       int rc;
+
+       ENTRY;
+
+       LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC);
+       LASSERT(le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT);
+
+       saved_count = lo->ldo_dir_stripes_allocated;
+       stripe_count = le32_to_cpu(lum->lum_stripe_count);
+       if (stripe_count <= saved_count)
+               RETURN(-EINVAL);
+
+       dof->dof_type = DFT_DIR;
+
+       OBD_ALLOC(stripes, sizeof(*stripes) * stripe_count);
+       if (!stripes)
+               RETURN(-ENOMEM);
+
+       for (i = 0; i < lo->ldo_dir_stripes_allocated; i++)
+               stripes[i] = lo->ldo_stripe[i];
+
+       lod_qos_statfs_update(env, lod, &lod->lod_mdt_descs);
+       rc = lod_mdt_alloc_qos(env, lo, stripes, saved_count, stripe_count);
+       if (rc == -EAGAIN)
+               rc = lod_mdt_alloc_rr(env, lo, stripes, saved_count,
+                                     stripe_count);
+       if (rc < 0) {
+               OBD_FREE(stripes, sizeof(*stripes) * stripe_count);
+               RETURN(rc);
+       }
+
+       LASSERT(rc > saved_count);
+       OBD_FREE(lo->ldo_stripe,
+                sizeof(*stripes) * lo->ldo_dir_stripes_allocated);
+       lo->ldo_stripe = stripes;
+       lo->ldo_dir_striped = 1;
+       lo->ldo_dir_stripe_count = rc;
+       lo->ldo_dir_stripes_allocated = stripe_count;
+       lo->ldo_dir_split_hash = lo->ldo_dir_hash_type;
+       lo->ldo_dir_hash_type = le32_to_cpu(lum->lum_hash_type);
+       if (!lmv_is_known_hash_type(lo->ldo_dir_hash_type))
+               lo->ldo_dir_hash_type =
+                       lod->lod_mdt_descs.ltd_lmv_desc.ld_pattern;
+       lo->ldo_dir_hash_type |= LMV_HASH_FLAG_SPLIT | LMV_HASH_FLAG_MIGRATION;
+       lo->ldo_dir_split_offset = saved_count;
+       lo->ldo_dir_layout_version++;
+       lo->ldo_dir_stripe_loaded = 1;
+
+       rc = lod_dir_declare_create_stripes(env, dt, mlc->mlc_attr, dof, th);
+       if (rc)
+               lod_striping_free(env, lo);
+
+       RETURN(rc);
+}
+
 /*
  * detach all stripes from dir master object, NB, stripes are not destroyed, but
  * deleted from it's parent namespace, this function is called in two places:
@@ -7942,8 +8142,7 @@ static int lod_dir_layout_detach(const struct lu_env *env,
                if (dto)
                        dt_object_put(env, dto);
        }
-       OBD_FREE(lo->ldo_stripe,
-                sizeof(struct dt_object *) * lo->ldo_dir_stripes_allocated);
+       OBD_FREE_PTR_ARRAY(lo->ldo_stripe, lo->ldo_dir_stripes_allocated);
        lo->ldo_stripe = NULL;
        lo->ldo_dir_stripes_allocated = 0;
        lo->ldo_dir_stripe_count = 0;
@@ -8063,6 +8262,7 @@ static mlc_handler dir_mlc_declare_ops[MD_LAYOUT_MAX] = {
        [MD_LAYOUT_ATTACH] = lod_dir_declare_layout_attach,
        [MD_LAYOUT_DETACH] = lod_dir_declare_layout_detach,
        [MD_LAYOUT_SHRINK] = lod_dir_declare_layout_shrink,
+       [MD_LAYOUT_SPLIT]  = lod_dir_declare_layout_split,
 };
 
 static mlc_handler dir_mlc_ops[MD_LAYOUT_MAX] = {
@@ -8302,7 +8502,7 @@ static int lod_object_init(const struct lu_env *env, struct lu_object *lo,
 
        if (ltd != NULL) {
                if (ltd->ltd_tgts_size > idx &&
-                   cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) {
+                   test_bit(idx, ltd->ltd_tgt_bitmap)) {
                        tgt = LTD_TGT(ltd, idx);
 
                        LASSERT(tgt != NULL);
@@ -8426,13 +8626,11 @@ void lod_striping_free_nolock(const struct lu_env *env, struct lod_object *lo)
                                        lu_object_put(env,
                                               &lod_comp->llc_stripe[j]->do_lu);
                        }
-                       OBD_FREE(lod_comp->llc_stripe,
-                                sizeof(struct dt_object *) *
-                                lod_comp->llc_stripes_allocated);
+                       OBD_FREE_PTR_ARRAY(lod_comp->llc_stripe,
+                                          lod_comp->llc_stripes_allocated);
                        lod_comp->llc_stripe = NULL;
-                       OBD_FREE(lod_comp->llc_ost_indices,
-                                sizeof(__u32) *
-                                lod_comp->llc_stripes_allocated);
+                       OBD_FREE_PTR_ARRAY(lod_comp->llc_ost_indices,
+                                          lod_comp->llc_stripes_allocated);
                        lod_comp->llc_ost_indices = NULL;
                        lod_comp->llc_stripes_allocated = 0;
                }