X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flod%2Flod_object.c;h=d7fdad2ff9646e92c6a018d979486a43a53e020b;hb=622a94d5e27ed3e596918863c08b304a6be9a646;hp=31caad5b8c1e475647474a38380a249fa2f72a8c;hpb=8733d5d764531621dc3e064dfb922e49373f7946;p=fs%2Flustre-release.git diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index 31caad5..d7fdad2 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -98,9 +98,9 @@ static int lod_declare_insert(const struct lu_env *env, struct dt_object *dt, */ static int lod_insert(const struct lu_env *env, struct dt_object *dt, const struct dt_rec *rec, const struct dt_key *key, - struct thandle *th, int ign) + struct thandle *th) { - return lod_sub_insert(env, dt_object_child(dt), rec, key, th, ign); + return lod_sub_insert(env, dt_object_child(dt), rec, key, th); } /** @@ -776,14 +776,9 @@ int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo, int rc; ENTRY; - /* If it is not a striped directory, then load nothing. */ if (magic != LMV_MAGIC_V1) RETURN(0); - /* If it is in migration (or failure), then load nothing. */ - if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION) - RETURN(0); - stripes = le32_to_cpu(lmv1->lmv_stripe_count); if (stripes < 1) RETURN(0); @@ -1085,6 +1080,18 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, data->locd_comp_skip_cb(env, lo, i, data)) continue; + if (data->locd_comp_cb) { + rc = data->locd_comp_cb(env, lo, i, data); + if (rc) + RETURN(rc); + } + + /* could used just to do sth about component, not each + * stripes + */ + if (!data->locd_stripe_cb) + continue; + LASSERT(lod_comp->llc_stripe_count > 0); for (j = 0; j < lod_comp->llc_stripe_count; j++) { struct dt_object *dt = lod_comp->llc_stripe[j]; @@ -1603,9 +1610,15 @@ static int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt, } lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store; + memset(lmm1, 0, sizeof(*lmm1)); lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC); lmm1->lmv_stripe_count = cpu_to_le32(stripe_count); lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type); + if (lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) { + lmm1->lmv_migrate_hash = cpu_to_le32(lo->ldo_dir_migrate_hash); + lmm1->lmv_migrate_offset = + cpu_to_le32(lo->ldo_dir_migrate_offset); + } rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu), &mdtidx, &type); if (rc != 0) @@ -1649,9 +1662,6 @@ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo, LASSERT(mutex_is_locked(&lo->ldo_layout_mutex)); - if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION) - RETURN(0); - if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE) { lo->ldo_dir_slave_stripe = 1; RETURN(0); @@ -1865,6 +1875,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env, struct dt_object_format *dof, struct thandle *th) { + struct lod_thread_info *info = lod_env_info(env); struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); struct lod_tgt_descs *ltd = &lod->lod_mdt_descs; struct lod_object *lo = lod_dt_obj(dt); @@ -1881,9 +1892,8 @@ static int lod_prep_md_striped_create(const struct lu_env *env, /* The lum has been verifed in lod_verify_md_striping */ LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC || le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC); - LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0); - stripe_count = le32_to_cpu(lum->lum_stripe_count); + stripe_count = lo->ldo_dir_stripe_count; OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count); if (idx_array == NULL) @@ -1957,7 +1967,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env, continue; tgt_dt = tgt->ltd_tgt; - rc = dt_statfs(env, tgt_dt, NULL); + rc = dt_statfs(env, tgt_dt, &info->lti_osfs); if (rc) { /* this OSP doesn't feel well */ rc = 0; @@ -2092,6 +2102,284 @@ out: } /** + * Append source stripes after target stripes for migrating directory. NB, we + * only need to declare this, the append is done inside lod_xattr_set_lmv(). + * + * \param[in] env execution environment + * \param[in] dt target object + * \param[in] buf LMV buf which contains source stripe fids + * \param[in] th transaction handle + * + * \retval 0 on success + * \retval negative if failed + */ +static int lod_dir_declare_layout_add(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_tgt_descs *ltd = &lod->lod_mdt_descs; + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + struct dt_object_format *dof = &info->lti_format; + struct lmv_mds_md_v1 *lmv = buf->lb_buf; + struct dt_object **stripe; + __u32 stripe_count = le32_to_cpu(lmv->lmv_stripe_count); + struct lu_fid *fid = &info->lti_fid; + struct lod_tgt_desc *tgt; + struct dt_object *dto; + struct dt_device *tgt_dt; + int type = LU_SEQ_RANGE_ANY; + struct dt_insert_rec *rec = &info->lti_dt_rec; + char *stripe_name = info->lti_key; + struct lu_name *sname; + struct linkea_data ldata = { NULL }; + struct lu_buf linkea_buf; + __u32 idx; + int i; + int rc; + + ENTRY; + + if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1) + RETURN(-EINVAL); + + if (stripe_count == 0) + RETURN(-EINVAL); + + dof->dof_type = DFT_DIR; + + OBD_ALLOC(stripe, + sizeof(*stripe) * (lo->ldo_dir_stripe_count + stripe_count)); + if (stripe == NULL) + RETURN(-ENOMEM); + + for (i = 0; i < lo->ldo_dir_stripe_count; i++) + stripe[i] = lo->ldo_stripe[i]; + + for (i = 0; i < stripe_count; i++) { + fid_le_to_cpu(fid, + &lmv->lmv_stripe_fids[i]); + if (!fid_is_sane(fid)) + GOTO(out, rc = -ESTALE); + + rc = lod_fld_lookup(env, lod, fid, &idx, &type); + if (rc) + GOTO(out, rc); + + if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) { + tgt_dt = lod->lod_child; + } else { + tgt = LTD_TGT(ltd, idx); + if (tgt == NULL) + GOTO(out, rc = -ESTALE); + tgt_dt = tgt->ltd_tgt; + } + + dto = dt_locate_at(env, tgt_dt, fid, + lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev, + NULL); + if (IS_ERR(dto)) + GOTO(out, rc = PTR_ERR(dto)); + + stripe[i + lo->ldo_dir_stripe_count] = dto; + + if (!dt_try_as_dir(env, dto)) + GOTO(out, rc = -ENOTDIR); + + rc = lod_sub_declare_ref_add(env, dto, th); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dot, th); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dotdot, th); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_xattr_set(env, dto, buf, + XATTR_NAME_LMV, 0, th); + if (rc) + GOTO(out, rc); + + snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", + PFID(lu_object_fid(&dto->do_lu)), + i + lo->ldo_dir_stripe_count); + + sname = lod_name_get(env, stripe_name, strlen(stripe_name)); + rc = linkea_links_new(&ldata, &info->lti_linkea_buf, + sname, lu_object_fid(&dt->do_lu)); + if (rc) + GOTO(out, rc); + + linkea_buf.lb_buf = ldata.ld_buf->lb_buf; + linkea_buf.lb_len = ldata.ld_leh->leh_len; + rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf, + XATTR_NAME_LINK, 0, th); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_insert(env, next, + (const struct dt_rec *)rec, + (const struct dt_key *)stripe_name, + th); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_ref_add(env, next, th); + if (rc) + GOTO(out, rc); + } + + if (lo->ldo_stripe) + OBD_FREE(lo->ldo_stripe, + sizeof(*stripe) * lo->ldo_dir_stripes_allocated); + lo->ldo_stripe = stripe; + lo->ldo_dir_migrate_offset = lo->ldo_dir_stripe_count; + lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_hash_type); + lo->ldo_dir_stripe_count += stripe_count; + lo->ldo_dir_stripes_allocated += stripe_count; + lo->ldo_dir_hash_type |= LMV_HASH_FLAG_MIGRATION; + + RETURN(0); +out: + i = lo->ldo_dir_stripe_count; + while (i < lo->ldo_dir_stripe_count + stripe_count && stripe[i]) + dt_object_put(env, stripe[i++]); + + OBD_FREE(stripe, + sizeof(*stripe) * (stripe_count + lo->ldo_dir_stripe_count)); + RETURN(rc); +} + +static int lod_dir_declare_layout_delete(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + struct lmv_user_md *lmu = buf->lb_buf; + __u32 final_stripe_count; + char *stripe_name = info->lti_key; + struct dt_object *dto; + int i; + int rc = 0; + + if (!lmu) + return -EINVAL; + + final_stripe_count = le32_to_cpu(lmu->lum_stripe_count); + if (final_stripe_count >= lo->ldo_dir_stripe_count) + return -EINVAL; + + for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) { + dto = lo->ldo_stripe[i]; + LASSERT(dto); + + if (!dt_try_as_dir(env, dto)) + return -ENOTDIR; + + rc = lod_sub_declare_delete(env, dto, + (const struct dt_key *)dot, th); + if (rc) + return rc; + + rc = lod_sub_declare_ref_del(env, dto, th); + if (rc) + return rc; + + rc = lod_sub_declare_delete(env, dto, + (const struct dt_key *)dotdot, th); + if (rc) + return rc; + + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&dto->do_lu)), i); + + rc = lod_sub_declare_delete(env, next, + (const struct dt_key *)stripe_name, th); + if (rc) + return rc; + + rc = lod_sub_declare_ref_del(env, next, th); + if (rc) + return rc; + } + + return 0; +} + +/* + * delete stripes from dir master object, the lum_stripe_count in argument is + * the final stripe count, the stripes after that will be deleted, NB, they + * are not destroyed, but deleted from it's parent namespace, this function + * will be called in two places: + * 1. mdd_migrate_create() delete stripes from source, and append them to + * target. + * 2. mdd_dir_layout_shrink() delete stripes from source, and destroy them. + */ +static int lod_dir_layout_delete(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + struct lmv_user_md *lmu = buf->lb_buf; + __u32 final_stripe_count; + char *stripe_name = info->lti_key; + struct dt_object *dto; + int i; + int rc = 0; + + ENTRY; + + if (!lmu) + RETURN(-EINVAL); + + final_stripe_count = le32_to_cpu(lmu->lum_stripe_count); + if (final_stripe_count >= lo->ldo_dir_stripe_count) + RETURN(-EINVAL); + + for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) { + dto = lo->ldo_stripe[i]; + LASSERT(dto); + + rc = lod_sub_delete(env, dto, + (const struct dt_key *)dotdot, th); + if (rc) + break; + + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&dto->do_lu)), i); + + rc = lod_sub_delete(env, next, + (const struct dt_key *)stripe_name, th); + if (rc) + break; + + rc = lod_sub_ref_del(env, next, th); + if (rc) + break; + } + + lod_striping_free(env, lod_dt_obj(dt)); + + RETURN(rc); +} + +/** * Implementation of dt_object_operations::do_declare_xattr_set. * * Used with regular (non-striped) objects. Basically it @@ -2181,14 +2469,25 @@ lod_obj_stripe_replace_parent_fid_cb(const struct lu_env *env, return rc; } - filter_fid_le_to_cpu(ff, ff, sizeof(*ff)); - if (lu_fid_eq(lu_object_fid(&lo->ldo_obj.do_lu), &ff->ff_parent) && - ff->ff_layout.ol_comp_id == comp->llc_id) - return 0; + /* + * locd_buf is set if it's called by dir migration, which doesn't check + * pfid and comp id. + */ + if (data->locd_buf) { + memset(ff, 0, sizeof(*ff)); + ff->ff_parent = *(struct lu_fid *)data->locd_buf->lb_buf; + } else { + filter_fid_le_to_cpu(ff, ff, sizeof(*ff)); + + if (lu_fid_eq(lod_object_fid(lo), &ff->ff_parent) && + ff->ff_layout.ol_comp_id == comp->llc_id) + return 0; + + memset(ff, 0, sizeof(*ff)); + ff->ff_parent = *lu_object_fid(&lo->ldo_obj.do_lu); + } /* rewrite filter_fid */ - memset(ff, 0, sizeof(*ff)); - ff->ff_parent = *lu_object_fid(&lo->ldo_obj.do_lu); ff->ff_parent.f_ver = stripe_idx; ff->ff_layout.ol_stripe_size = comp->llc_stripe_size; ff->ff_layout.ol_stripe_count = comp->llc_stripe_count; @@ -2224,11 +2523,11 @@ lod_obj_stripe_replace_parent_fid_cb(const struct lu_env *env, */ static int lod_replace_parent_fid(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf, struct thandle *th, bool declare) { struct lod_object *lo = lod_dt_obj(dt); struct lod_thread_info *info = lod_env_info(env); - struct lu_buf *buf = &info->lti_buf; struct filter_fid *ff; struct lod_obj_stripe_cb_data data = { { 0 } }; int rc; @@ -2250,11 +2549,9 @@ static int lod_replace_parent_fid(const struct lu_env *env, RETURN(rc); } - buf->lb_buf = info->lti_ea_store; - buf->lb_len = info->lti_ea_store_size; - data.locd_declare = declare; data.locd_stripe_cb = lod_obj_stripe_replace_parent_fid_cb; + data.locd_buf = buf; rc = lod_obj_for_each_stripe(env, lo, th, &data); RETURN(rc); @@ -2495,6 +2792,8 @@ static int lod_declare_layout_set(const struct lu_env *env, for (i = 0; i < comp_v1->lcm_entry_count; i++) { __u32 id = comp_v1->lcm_entries[i].lcme_id; __u32 flags = comp_v1->lcm_entries[i].lcme_flags; + __u32 mirror_flag = flags & LCME_MIRROR_FLAGS; + bool neg = flags & LCME_FL_NEG; if (flags & LCME_FL_INIT) { if (changed) @@ -2502,16 +2801,30 @@ static int lod_declare_layout_set(const struct lu_env *env, RETURN(-EINVAL); } + flags &= ~(LCME_MIRROR_FLAGS | LCME_FL_NEG); for (j = 0; j < lo->ldo_comp_cnt; j++) { lod_comp = &lo->ldo_comp_entries[j]; - if (id != lod_comp->llc_id) + + /* lfs only put one flag in each entry */ + if ((flags && id != lod_comp->llc_id) || + (mirror_flag && mirror_id_of(id) != + mirror_id_of(lod_comp->llc_id))) continue; - if (flags & LCME_FL_NEG) { - flags &= ~LCME_FL_NEG; - lod_comp->llc_flags &= ~flags; + if (neg) { + if (flags) + lod_comp->llc_flags &= ~flags; + if (mirror_flag) + lod_comp->llc_flags &= ~mirror_flag; } else { - lod_comp->llc_flags |= flags; + if (flags) + lod_comp->llc_flags |= flags; + if (mirror_flag) { + lod_comp->llc_flags |= mirror_flag; + if (mirror_flag & LCME_FL_NOSYNC) + lod_comp->llc_timestamp = + ktime_get_real_seconds(); + } } changed = true; } @@ -2886,8 +3199,12 @@ static int lod_declare_layout_merge(const struct lu_env *env, offset += le32_to_cpu(lcme->lcme_size); - if (mirror_count == 1) { - /* new mirrored file, create new mirror ID */ + if (mirror_count == 1 && + mirror_id_of(le32_to_cpu(lcme->lcme_id)) == 0) { + /* Add mirror from a non-flr file, create new mirror ID. + * Otherwise, keep existing mirror's component ID, used + * for mirror extension. + */ id = pflr_id(1, i + 1); lcme->lcme_id = cpu_to_le32(id); } @@ -3026,10 +3343,24 @@ static int lod_declare_xattr_set(const struct lu_env *env, RETURN(-ENOENT); rc = lod_declare_modify_layout(env, dt, name, buf, th); + } else if (strncmp(name, XATTR_NAME_LMV, strlen(XATTR_NAME_LMV)) == 0 && + strlen(name) > strlen(XATTR_NAME_LMV) + 1) { + const char *op = name + strlen(XATTR_NAME_LMV) + 1; + + rc = -ENOTSUPP; + if (strcmp(op, "add") == 0) + rc = lod_dir_declare_layout_add(env, dt, buf, th); + else if (strcmp(op, "del") == 0) + rc = lod_dir_declare_layout_delete(env, dt, buf, th); + else if (strcmp(op, "set") == 0) + rc = lod_sub_declare_xattr_set(env, next, buf, + XATTR_NAME_LMV, fl, th); + + RETURN(rc); } else if (S_ISDIR(mode)) { rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th); } else if (strcmp(name, XATTR_NAME_FID) == 0) { - rc = lod_replace_parent_fid(env, dt, th, true); + rc = lod_replace_parent_fid(env, dt, buf, th, true); } else { rc = lod_sub_declare_xattr_set(env, next, buf, name, fl, th); } @@ -3163,6 +3494,7 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, lum = buf->lb_buf; switch (lum->lmm_magic) { + case LOV_USER_MAGIC_SPECIFIC: case LOV_USER_MAGIC_V3: v3 = buf->lb_buf; if (v3->lmm_pool_name[0] != '\0') @@ -3320,35 +3652,38 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, rec->rec_type = S_IFDIR; for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - struct dt_object *dto; - char *stripe_name = info->lti_key; - struct lu_name *sname; - struct linkea_data ldata = { NULL }; - struct lu_buf linkea_buf; - - dto = lo->ldo_stripe[i]; + struct dt_object *dto = lo->ldo_stripe[i]; + char *stripe_name = info->lti_key; + struct lu_name *sname; + struct linkea_data ldata = { NULL }; + struct lu_buf linkea_buf; + + /* if it's source stripe of migrating directory, don't create */ + if (!((lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) && + i >= lo->ldo_dir_migrate_offset)) { + dt_write_lock(env, dto, MOR_TGT_CHILD); + rc = lod_sub_create(env, dto, attr, NULL, dof, th); + if (rc != 0) { + dt_write_unlock(env, dto); + GOTO(out, rc); + } - dt_write_lock(env, dto, MOR_TGT_CHILD); - rc = lod_sub_create(env, dto, attr, NULL, dof, th); - if (rc != 0) { + rc = lod_sub_ref_add(env, dto, th); dt_write_unlock(env, dto); - GOTO(out, rc); - } - - rc = lod_sub_ref_add(env, dto, th); - dt_write_unlock(env, dto); - if (rc != 0) - GOTO(out, rc); + if (rc != 0) + GOTO(out, rc); - rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = lod_sub_insert(env, dto, (const struct dt_rec *)rec, - (const struct dt_key *)dot, th, 0); - if (rc != 0) - GOTO(out, rc); + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = lod_sub_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dot, th); + if (rc != 0) + GOTO(out, rc); + } rec->rec_fid = lu_object_fid(&dt->do_lu); rc = lod_sub_insert(env, dto, (struct dt_rec *)rec, - (const struct dt_key *)dotdot, th, 0); + (const struct dt_key *)dotdot, th); if (rc != 0) GOTO(out, rc); @@ -3392,7 +3727,7 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, rec->rec_fid = lu_object_fid(&dto->do_lu); rc = lod_sub_insert(env, dt_object_child(dt), (const struct dt_rec *)rec, - (const struct dt_key *)stripe_name, th, 0); + (const struct dt_key *)stripe_name, th); if (rc != 0) GOTO(out, rc); @@ -3769,18 +4104,26 @@ static int lod_xattr_set(const struct lu_env *env, if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && strcmp(name, XATTR_NAME_LMV) == 0) { - struct lmv_mds_md_v1 *lmm = buf->lb_buf; + rc = lod_dir_striping_create(env, dt, NULL, NULL, th); + RETURN(rc); + } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && + strncmp(name, XATTR_NAME_LMV, strlen(XATTR_NAME_LMV)) == 0 && + strlen(name) > strlen(XATTR_NAME_LMV) + 1) { + const char *op = name + strlen(XATTR_NAME_LMV) + 1; - if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) & - LMV_HASH_FLAG_MIGRATION) - rc = lod_sub_xattr_set(env, next, buf, name, fl, th); - else - rc = lod_dir_striping_create(env, dt, NULL, NULL, th); + rc = -ENOTSUPP; + /* + * XATTR_NAME_LMV".add" is never called, but only declared, + * because lod_xattr_set_lmv() will do the addition. + */ + if (strcmp(op, "del") == 0) + rc = lod_dir_layout_delete(env, dt, buf, th); + else if (strcmp(op, "set") == 0) + rc = lod_sub_xattr_set(env, next, buf, XATTR_NAME_LMV, + fl, th); RETURN(rc); - } - - if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && + } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && strcmp(name, XATTR_NAME_LOV) == 0) { struct lod_thread_info *info = lod_env_info(env); struct lod_default_striping *lds = &info->lti_def_striping; @@ -3886,7 +4229,7 @@ static int lod_xattr_set(const struct lu_env *env, } RETURN(rc); } else if (strcmp(name, XATTR_NAME_FID) == 0) { - rc = lod_replace_parent_fid(env, dt, th, false); + rc = lod_replace_parent_fid(env, dt, buf, th, false); RETURN(rc); } @@ -3907,12 +4250,13 @@ static int lod_declare_xattr_del(const struct lu_env *env, struct dt_object *dt, const char *name, struct thandle *th) { - struct lod_object *lo = lod_dt_obj(dt); - int rc; - int i; + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + int i; + int rc; ENTRY; - rc = lod_sub_declare_xattr_del(env, dt_object_child(dt), name, th); + rc = lod_sub_declare_xattr_del(env, next, name, th); if (rc != 0) RETURN(rc); @@ -3928,9 +4272,10 @@ static int lod_declare_xattr_del(const struct lu_env *env, RETURN(0); for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - LASSERT(lo->ldo_stripe[i]); - rc = lod_sub_declare_xattr_del(env, lo->ldo_stripe[i], - name, th); + struct dt_object *dto = lo->ldo_stripe[i]; + + LASSERT(dto); + rc = lod_sub_declare_xattr_del(env, dto, name, th); if (rc != 0) break; } @@ -3955,7 +4300,7 @@ static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt, int i; ENTRY; - if (!strcmp(name, XATTR_NAME_LOV)) + if (!strcmp(name, XATTR_NAME_LOV) || !strcmp(name, XATTR_NAME_LMV)) lod_striping_free(env, lod_dt_obj(dt)); rc = lod_sub_xattr_del(env, next, name, th); @@ -3966,9 +4311,11 @@ static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt, RETURN(0); for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - LASSERT(lo->ldo_stripe[i]); + struct dt_object *dto = lo->ldo_stripe[i]; - rc = lod_sub_xattr_del(env, lo->ldo_stripe[i], name, th); + LASSERT(dto); + + rc = lod_sub_xattr_del(env, dto, name, th); if (rc != 0) break; } @@ -3993,6 +4340,52 @@ static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fi return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE); } +/** + * Copy OST list from layout provided by user. + * + * \param[in] lod_comp layout_component to be filled + * \param[in] v3 LOV EA V3 user data + * + * \retval 0 on success + * \retval negative if failed + */ +int lod_comp_copy_ost_lists(struct lod_layout_component *lod_comp, + struct lov_user_md_v3 *v3) +{ + int j; + + ENTRY; + + if (v3->lmm_stripe_offset == LOV_OFFSET_DEFAULT) + v3->lmm_stripe_offset = v3->lmm_objects[0].l_ost_idx; + + if (lod_comp->llc_ostlist.op_array) { + if (lod_comp->llc_ostlist.op_size >= + v3->lmm_stripe_count * sizeof(__u32)) { + lod_comp->llc_ostlist.op_count = + v3->lmm_stripe_count; + goto skip; + } + OBD_FREE(lod_comp->llc_ostlist.op_array, + lod_comp->llc_ostlist.op_size); + } + + /* copy ost list from lmm */ + lod_comp->llc_ostlist.op_count = v3->lmm_stripe_count; + lod_comp->llc_ostlist.op_size = v3->lmm_stripe_count * sizeof(__u32); + OBD_ALLOC(lod_comp->llc_ostlist.op_array, + lod_comp->llc_ostlist.op_size); + if (!lod_comp->llc_ostlist.op_array) + RETURN(-ENOMEM); +skip: + for (j = 0; j < v3->lmm_stripe_count; j++) { + lod_comp->llc_ostlist.op_array[j] = + v3->lmm_objects[j].l_ost_idx; + } + + RETURN(0); +} + /** * Get default striping. @@ -4015,7 +4408,7 @@ static int lod_get_default_lov_striping(const struct lu_env *env, __u16 comp_cnt; __u16 mirror_cnt; bool composite; - int rc, i; + int rc, i, j; ENTRY; lds->lds_def_striping_set = 0; @@ -4033,13 +4426,19 @@ static int lod_get_default_lov_striping(const struct lu_env *env, } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) { v3 = (struct lov_user_md_v3 *)v1; lustre_swab_lov_user_md_v3(v3); + } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_SPECIFIC)) { + v3 = (struct lov_user_md_v3 *)v1; + lustre_swab_lov_user_md_v3(v3); + lustre_swab_lov_user_md_objects(v3->lmm_objects, + v3->lmm_stripe_count); } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_COMP_V1)) { comp_v1 = (struct lov_comp_md_v1 *)v1; lustre_swab_lov_comp_md_v1(comp_v1); } if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1 && - v1->lmm_magic != LOV_MAGIC_COMP_V1) + v1->lmm_magic != LOV_MAGIC_COMP_V1 && + v1->lmm_magic != LOV_USER_MAGIC_SPECIFIC) RETURN(-ENOTSUPP); if (v1->lmm_magic == LOV_MAGIC_COMP_V1) { @@ -4066,7 +4465,6 @@ static int lod_get_default_lov_striping(const struct lu_env *env, for (i = 0; i < comp_cnt; i++) { struct lod_layout_component *lod_comp; - struct lu_extent *ext; char *pool; lod_comp = &lds->lds_def_comp_entries[i]; @@ -4080,8 +4478,12 @@ static int lod_get_default_lov_striping(const struct lu_env *env, if (composite) { v1 = (struct lov_user_md *)((char *)comp_v1 + comp_v1->lcm_entries[i].lcme_offset); - ext = &comp_v1->lcm_entries[i].lcme_extent; - lod_comp->llc_extent = *ext; + lod_comp->llc_extent = + comp_v1->lcm_entries[i].lcme_extent; + /* We only inherit certain flags from the layout */ + lod_comp->llc_flags = + comp_v1->lcm_entries[i].lcme_flags & + LCME_TEMPLATE_FLAGS; } if (v1->lmm_pattern != LOV_PATTERN_RAID0 && @@ -4110,6 +4512,17 @@ static int lod_get_default_lov_striping(const struct lu_env *env, pool = v3->lmm_pool_name; } lod_set_def_pool(lds, i, pool); + if (v1->lmm_magic == LOV_USER_MAGIC_SPECIFIC) { + v3 = (struct lov_user_md_v3 *)v1; + rc = lod_comp_copy_ost_lists(lod_comp, v3); + if (rc) + RETURN(rc); + } else if (lod_comp->llc_ostlist.op_array && + lod_comp->llc_ostlist.op_count) { + for (j = 0; j < lod_comp->llc_ostlist.op_count; j++) + lod_comp->llc_ostlist.op_array[j] = -1; + lod_comp->llc_ostlist.op_count = 0; + } } lds->lds_def_striping_set = 1; @@ -4213,8 +4626,9 @@ static void lod_striping_from_default(struct lod_object *lo, struct lod_layout_component *def_comp = &lds->lds_def_comp_entries[i]; - CDEBUG(D_LAYOUT, "Inherite from default: size:%hu " - "nr:%u offset:%u pattern %#x %s\n", + CDEBUG(D_LAYOUT, "Inherit from default: flags=%#x " + "size=%hu nr=%u offset=%u pattern=%#x pool=%s\n", + def_comp->llc_flags, def_comp->llc_stripe_size, def_comp->llc_stripe_count, def_comp->llc_stripe_offset, @@ -4228,6 +4642,20 @@ static void lod_striping_from_default(struct lod_object *lo, lod_obj_set_pool(lo, i, def_comp->llc_pool); } + /* copy ost list */ + if (def_comp->llc_ostlist.op_array && + def_comp->llc_ostlist.op_count) { + OBD_ALLOC(obj_comp->llc_ostlist.op_array, + obj_comp->llc_ostlist.op_size); + if (!obj_comp->llc_ostlist.op_array) + return; + memcpy(obj_comp->llc_ostlist.op_array, + def_comp->llc_ostlist.op_array, + obj_comp->llc_ostlist.op_size); + } else if (def_comp->llc_ostlist.op_array) { + obj_comp->llc_ostlist.op_array = NULL; + } + /* * Don't initialize these fields for plain layout * (v1/v3) here, they are inherited in the order of @@ -4365,19 +4793,19 @@ static void lod_ah_init(const struct lu_env *env, } else { /* transfer defaults LMV to new directory */ lod_striping_from_default(lc, lds, child_mode); + + /* set count 0 to create normal directory */ + if (lc->ldo_dir_stripe_count == 1) + lc->ldo_dir_stripe_count = 0; } /* shrink the stripe_count to the avaible MDT count */ if (lc->ldo_dir_stripe_count > d->lod_remote_mdt_count + 1 && - !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)) + !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)) { lc->ldo_dir_stripe_count = d->lod_remote_mdt_count + 1; - - /* Directory will be striped only if stripe_count > 1, if - * stripe_count == 1, let's reset stripe_count = 0 to avoid - * create single master stripe and also help to unify the - * stripe handling of directories and files */ - if (lc->ldo_dir_stripe_count == 1) - lc->ldo_dir_stripe_count = 0; + if (lc->ldo_dir_stripe_count == 1) + lc->ldo_dir_stripe_count = 0; + } CDEBUG(D_INFO, "final dir stripe [%hu %d %u]\n", lc->ldo_dir_stripe_count, @@ -4827,16 +5255,28 @@ int lod_striped_create(const struct lu_env *env, struct dt_object *dt, LASSERT(lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL); - mirror_id = lo->ldo_mirror_count > 1 ? 1 : 0; + mirror_id = 0; /* non-flr file's mirror_id is 0 */ + if (lo->ldo_mirror_count > 1) { + for (i = 0; i < lo->ldo_comp_cnt; i++) { + lod_comp = &lo->ldo_comp_entries[i]; + if (lod_comp->llc_id != LCME_ID_INVAL && + mirror_id_of(lod_comp->llc_id) > mirror_id) + mirror_id = mirror_id_of(lod_comp->llc_id); + } + } /* create all underlying objects */ for (i = 0; i < lo->ldo_comp_cnt; i++) { lod_comp = &lo->ldo_comp_entries[i]; - if (lod_comp->llc_extent.e_start == 0 && i > 0) /* new mirror */ - ++mirror_id; - if (lod_comp->llc_id == LCME_ID_INVAL) { + /* only the component of FLR layout with more than 1 + * mirror has mirror ID in its component ID. + */ + if (lod_comp->llc_extent.e_start == 0 && + lo->ldo_mirror_count > 1) + ++mirror_id; + lod_comp->llc_id = lod_gen_component_id(lo, mirror_id, i); if (lod_comp->llc_id == LCME_ID_INVAL) @@ -4882,6 +5322,23 @@ out: RETURN(rc); } +static inline bool lod_obj_is_dom(struct dt_object *dt) +{ + struct lod_object *lo = lod_dt_obj(dt); + + if (!dt_object_exists(dt_object_child(dt))) + return false; + + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) + return false; + + if (!lo->ldo_comp_cnt) + return false; + + return (lov_pattern(lo->ldo_comp_entries[0].llc_pattern) == + LOV_PATTERN_MDT); +} + /** * Implementation of dt_object_operations::do_create. * @@ -4904,7 +5361,8 @@ static int lod_create(const struct lu_env *env, struct dt_object *dt, RETURN(rc); if (S_ISREG(dt->do_lu.lo_header->loh_attr) && - lod_obj_is_striped(dt) && dof->u.dof_reg.striped != 0) { + (lod_obj_is_striped(dt) || lod_obj_is_dom(dt)) && + dof->u.dof_reg.striped != 0) { LASSERT(lod_dt_obj(dt)->ldo_comp_cached == 0); rc = lod_striped_create(env, dt, attr, dof, th); } @@ -5188,7 +5646,6 @@ static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt, RETURN(0); LASSERT(S_ISDIR(dt->do_lu.lo_header->loh_attr)); - LASSERT(lo->ldo_dir_stripe_count > 1); /* Note: for remote lock for single stripe dir, MDT will cancel * the lock by lockh directly */ LASSERT(!dt_object_remote(dt_object_child(dt))); @@ -5285,7 +5742,7 @@ static int lod_object_lock(const struct lu_env *env, dlmflags |= LDLM_FL_COS_INCOMPAT; LASSERT(ns != NULL); - rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, + rc = ldlm_cli_enqueue_local(env, ns, res_id, LDLM_IBITS, policy, einfo->ei_mode, &dlmflags, blocking, completion, NULL, @@ -5346,22 +5803,15 @@ static int lod_declare_instantiate_components(const struct lu_env *env, struct lod_object *lo, struct thandle *th) { struct lod_thread_info *info = lod_env_info(env); - struct ost_pool *inuse = &info->lti_inuse_osts; int i; int rc = 0; ENTRY; LASSERT(info->lti_count < lo->ldo_comp_cnt); - if (info->lti_count > 0) { - /* Prepare inuse array for composite file */ - rc = lod_prepare_inuse(env, lo); - if (rc) - RETURN(rc); - } for (i = 0; i < info->lti_count; i++) { rc = lod_qos_prep_create(env, lo, NULL, th, - info->lti_comp_idx[i], inuse); + info->lti_comp_idx[i]); if (rc) break; } @@ -5410,6 +5860,7 @@ static int lod_declare_update_plain(const struct lu_env *env, rc = lod_use_defined_striping(env, lo, buf); if (rc) GOTO(out, rc); + lo->ldo_comp_cached = 1; rc = lod_get_lov_ea(env, lo); if (rc <= 0) @@ -5684,6 +6135,30 @@ static int lod_primary_pick(const struct lu_env *env, struct lod_object *lo, RETURN(picked); } +static int lod_prepare_resync_mirror(const struct lu_env *env, + struct lod_object *lo, + __u16 mirror_id) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_layout_component *lod_comp; + int i; + + for (i = 0; i < lo->ldo_mirror_count; i++) { + if (lo->ldo_mirrors[i].lme_id != mirror_id) + continue; + + lod_foreach_mirror_comp(lod_comp, lo, i) { + if (lod_comp_inited(lod_comp)) + continue; + + info->lti_comp_idx[info->lti_count++] = + lod_comp_index(lo, lod_comp); + } + } + + return 0; +} + /** * figure out the components should be instantiated for resync. */ @@ -5799,23 +6274,33 @@ static int lod_declare_update_rdonly(const struct lu_env *env, * prep uninited all components assuming any non-stale mirror * could be picked as the primary mirror. */ - for (i = 0; i < lo->ldo_mirror_count; i++) { - if (lo->ldo_mirrors[i].lme_stale) - continue; + if (mlc->mlc_mirror_id == 0) { + /* normal resync */ + for (i = 0; i < lo->ldo_mirror_count; i++) { + if (lo->ldo_mirrors[i].lme_stale) + continue; - lod_foreach_mirror_comp(lod_comp, lo, i) { - if (!lod_comp_inited(lod_comp)) - break; + lod_foreach_mirror_comp(lod_comp, lo, i) { + if (!lod_comp_inited(lod_comp)) + break; - if (extent.e_end < lod_comp->llc_extent.e_end) - extent.e_end = - lod_comp->llc_extent.e_end; + if (extent.e_end < + lod_comp->llc_extent.e_end) + extent.e_end = + lod_comp->llc_extent.e_end; + } } + rc = lod_prepare_resync(env, lo, &extent); + if (rc) + GOTO(out, rc); + } else { + /* mirror write, try to init its all components */ + rc = lod_prepare_resync_mirror(env, lo, + mlc->mlc_mirror_id); + if (rc) + GOTO(out, rc); } - rc = lod_prepare_resync(env, lo, &extent); - if (rc) - GOTO(out, rc); /* change the file state to SYNC_PENDING */ lo->ldo_flr_state = LCM_FL_SYNC_PENDING; } @@ -5939,16 +6424,26 @@ static int lod_declare_update_write_pending(const struct lu_env *env, lod_comp_index(lo, lod_comp); } } else { /* MD_LAYOUT_RESYNC */ - lod_foreach_mirror_comp(lod_comp, lo, primary) { - if (!lod_comp_inited(lod_comp)) - break; + if (mlc->mlc_mirror_id == 0) { + /* normal resync */ + lod_foreach_mirror_comp(lod_comp, lo, primary) { + if (!lod_comp_inited(lod_comp)) + break; + + extent.e_end = lod_comp->llc_extent.e_end; + } - extent.e_end = lod_comp->llc_extent.e_end; + rc = lod_prepare_resync(env, lo, &extent); + if (rc) + GOTO(out, rc); + } else { + /* mirror write, try to init its all components */ + rc = lod_prepare_resync_mirror(env, lo, + mlc->mlc_mirror_id); + if (rc) + GOTO(out, rc); } - rc = lod_prepare_resync(env, lo, &extent); - if (rc) - GOTO(out, rc); /* change the file state to SYNC_PENDING */ lo->ldo_flr_state = LCM_FL_SYNC_PENDING; } @@ -6199,11 +6694,11 @@ static ssize_t lod_declare_write(const struct lu_env *env, */ static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, loff_t *pos, - struct thandle *th, int iq) + struct thandle *th) { LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr) || S_ISLNK(dt->do_lu.lo_header->loh_attr)); - return lod_sub_write(env, dt_object_child(dt), buf, pos, th, iq); + return lod_sub_write(env, dt_object_child(dt), buf, pos, th); } static int lod_declare_punch(const struct lu_env *env, struct dt_object *dt,