X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flod%2Flod_object.c;h=cb15551fdcec1c15cbaadf829372dcbbcf15524a;hp=fc9802ca271e8d22f52906180db9579509540894;hb=b78fb445555916e380b1661546c821df14098596;hpb=34e7d46234e5c957e1e815c5267b13fe610a9d8d;ds=sidebyside diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index fc9802c..cb15551 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -23,7 +23,7 @@ * Copyright 2009 Sun Microsystems, Inc. All rights reserved * Use is subject to license terms. * - * Copyright (c) 2012, 2014, Intel Corporation. + * Copyright (c) 2012, 2016, Intel Corporation. */ /* * lustre/lod/lod_object.c @@ -33,7 +33,7 @@ * local OSD object interface to the MDD layer, and abstracts the * addressing of local (OSD) and remote (OSP) objects. The API is * described in the file lustre/include/dt_object.h and in - * lustre/doc/osd-api.txt. + * Documentation/osd-api.txt. * * Author: Alex Zhuravlev */ @@ -42,16 +42,16 @@ #include #include -#include #include -#include #include -#include -#include +#include #include +#include +#include +#include +#include #include -#include #include "lod_internal.h" @@ -59,6 +59,7 @@ static const char dot[] = "."; static const char dotdot[] = ".."; static const struct dt_body_operations lod_body_lnk_ops; +static const struct dt_body_operations lod_body_ops; /** * Implementation of dt_index_operations::dio_lookup @@ -68,11 +69,10 @@ static const struct dt_body_operations lod_body_lnk_ops; * \see dt_index_operations::dio_lookup() in the API description for details. */ static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key, - struct lustre_capa *capa) + struct dt_rec *rec, const struct dt_key *key) { struct dt_object *next = dt_object_child(dt); - return next->do_index_ops->dio_lookup(env, next, rec, key, capa); + return next->do_index_ops->dio_lookup(env, next, rec, key); } /** @@ -87,9 +87,10 @@ static int lod_declare_index_insert(const struct lu_env *env, struct dt_object *dt, const struct dt_rec *rec, const struct dt_key *key, - struct thandle *handle) + struct thandle *th) { - return dt_declare_insert(env, dt_object_child(dt), rec, key, handle); + return lod_sub_object_declare_insert(env, dt_object_child(dt), + rec, key, th); } /** @@ -104,10 +105,10 @@ static int lod_index_insert(const struct lu_env *env, const struct dt_rec *rec, const struct dt_key *key, struct thandle *th, - struct lustre_capa *capa, int ign) { - return dt_insert(env, dt_object_child(dt), rec, key, th, capa, ign); + return lod_sub_object_index_insert(env, dt_object_child(dt), rec, key, + th, ign); } /** @@ -123,7 +124,8 @@ static int lod_declare_index_delete(const struct lu_env *env, const struct dt_key *key, struct thandle *th) { - return dt_declare_delete(env, dt_object_child(dt), key, th); + return lod_sub_object_declare_delete(env, dt_object_child(dt), key, + th); } /** @@ -136,10 +138,9 @@ static int lod_declare_index_delete(const struct lu_env *env, static int lod_index_delete(const struct lu_env *env, struct dt_object *dt, const struct dt_key *key, - struct thandle *th, - struct lustre_capa *capa) + struct thandle *th) { - return dt_delete(env, dt_object_child(dt), key, th, capa); + return lod_sub_object_delete(env, dt_object_child(dt), key, th); } /** @@ -150,15 +151,13 @@ static int lod_index_delete(const struct lu_env *env, * \see dt_it_ops::init() in the API description for details. */ static struct dt_it *lod_it_init(const struct lu_env *env, - struct dt_object *dt, __u32 attr, - struct lustre_capa *capa) + struct dt_object *dt, __u32 attr) { struct dt_object *next = dt_object_child(dt); struct lod_it *it = &lod_env_info(env)->lti_it; struct dt_it *it_next; - - it_next = next->do_index_ops->dio_it.init(env, next, attr, capa); + it_next = next->do_index_ops->dio_it.init(env, next, attr); if (IS_ERR(it_next)) return it_next; @@ -389,8 +388,7 @@ static struct dt_index_operations lod_index_ops = { * \see dt_it_ops::init() in the API description for details. */ static struct dt_it *lod_striped_it_init(const struct lu_env *env, - struct dt_object *dt, __u32 attr, - struct lustre_capa *capa) + struct dt_object *dt, __u32 attr) { struct lod_object *lo = lod_dt_obj(dt); struct dt_object *next; @@ -403,7 +401,7 @@ static struct dt_it *lod_striped_it_init(const struct lu_env *env, LASSERT(next != NULL); LASSERT(next->do_index_ops != NULL); - it_next = next->do_index_ops->dio_it.init(env, next, attr, capa); + it_next = next->do_index_ops->dio_it.init(env, next, attr); if (IS_ERR(it_next)) return it_next; @@ -442,13 +440,17 @@ static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di) struct lod_object *lo = lod_dt_obj(it->lit_obj); struct dt_object *next; - LOD_CHECK_STRIPED_IT(env, it, lo); + /* If lit_it == NULL, then it means the sub_it has been finished, + * which only happens in failure cases, see lod_striped_it_next() */ + if (it->lit_it != NULL) { + LOD_CHECK_STRIPED_IT(env, it, lo); - next = lo->ldo_stripe[it->lit_stripe_index]; - LASSERT(next != NULL); - LASSERT(next->do_index_ops != NULL); + next = lo->ldo_stripe[it->lit_stripe_index]; + LASSERT(next != NULL); + LASSERT(next->do_index_ops != NULL); - next->do_index_ops->dio_it.fini(env, it->lit_it); + next->do_index_ops->dio_it.fini(env, it->lit_it); + } /* the iterator not in use any more */ it->lit_obj = NULL; @@ -565,17 +567,17 @@ again: next->do_index_ops->dio_it.put(env, it->lit_it); next->do_index_ops->dio_it.fini(env, it->lit_it); + it->lit_it = NULL; + next = lo->ldo_stripe[it->lit_stripe_index]; + LASSERT(next != NULL); rc = next->do_ops->do_index_try(env, next, &dt_directory_features); if (rc != 0) RETURN(rc); - next = lo->ldo_stripe[it->lit_stripe_index]; - LASSERT(next != NULL); LASSERT(next->do_index_ops != NULL); - it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr, - BYPASS_CAPA); + it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr); if (!IS_ERR(it_next)) { it->lit_it = it_next; goto again; @@ -825,7 +827,7 @@ int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo, memset(&lmv1->lmv_stripe_fids[0], 0, stripes * sizeof(struct lu_fid)); iops = &obj->do_index_ops->dio_it; - it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA); + it = iops->init(env, obj, LUDA_64BITHASH); if (IS_ERR(it)) RETURN(PTR_ERR(it)); @@ -857,7 +859,8 @@ int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo, goto next; } - len = snprintf(name, FID_LEN + 1, DFID":", PFID(&ent->lde_fid)); + len = snprintf(name, sizeof(name), + DFID":", PFID(&ent->lde_fid)); /* The ent->lde_name is composed of ${FID}:${index} */ if (ent->lde_namelen < len + 1 || memcmp(ent->lde_name, name, len) != 0) { @@ -1052,85 +1055,13 @@ static int lod_object_write_locked(const struct lu_env *env, */ static int lod_attr_get(const struct lu_env *env, struct dt_object *dt, - struct lu_attr *attr, - struct lustre_capa *capa) + struct lu_attr *attr) { /* Note: for striped directory, client will merge attributes * from all of the sub-stripes see lmv_merge_attr(), and there * no MDD logic depend on directory nlink/size/time, so we can * always use master inode nlink and size for now. */ - return dt_attr_get(env, dt_object_child(dt), attr, capa); -} - -/** - * Mark all of the striped directory sub-stripes dead. - * - * When a striped object is a subject to removal, we have - * to mark all the stripes to prevent further access to - * them (e.g. create a new file in those). So we mark - * all the stripes with LMV_HASH_FLAG_DEAD. The function - * can be used to declare the changes and to apply them. - * If the object isn't striped, then just return success. - * - * \param[in] env execution environment - * \param[in] dt the striped object - * \param[in] handle transaction handle - * \param[in] declare whether to declare the change or apply - * - * \retval 0 on success - * \retval negative if failed - **/ -static int lod_mark_dead_object(const struct lu_env *env, - struct dt_object *dt, - struct thandle *handle, - bool declare) -{ - struct lod_object *lo = lod_dt_obj(dt); - struct lmv_mds_md_v1 *lmv; - __u32 dead_hash_type; - int rc; - int i; - - ENTRY; - - if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) - RETURN(0); - - rc = lod_load_striping_locked(env, lo); - if (rc != 0) - RETURN(rc); - - if (lo->ldo_stripenr == 0) - RETURN(0); - - rc = lod_get_lmv_ea(env, lo); - if (rc <= 0) - RETURN(rc); - - lmv = lod_env_info(env)->lti_ea_store; - lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE); - dead_hash_type = le32_to_cpu(lmv->lmv_hash_type) | LMV_HASH_FLAG_DEAD; - lmv->lmv_hash_type = cpu_to_le32(dead_hash_type); - for (i = 0; i < lo->ldo_stripenr; i++) { - struct lu_buf buf; - - lmv->lmv_master_mdt_index = i; - buf.lb_buf = lmv; - buf.lb_len = sizeof(*lmv); - if (declare) { - rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], &buf, - XATTR_NAME_LMV, - LU_XATTR_REPLACE, handle); - } else { - rc = dt_xattr_set(env, lo->ldo_stripe[i], &buf, - XATTR_NAME_LMV, LU_XATTR_REPLACE, - handle, BYPASS_CAPA); - } - if (rc != 0) - break; - } - - RETURN(rc); + return dt_attr_get(env, dt_object_child(dt), attr); } /** @@ -1144,24 +1075,17 @@ static int lod_mark_dead_object(const struct lu_env *env, static int lod_declare_attr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_attr *attr, - struct thandle *handle) + struct thandle *th) { struct dt_object *next = dt_object_child(dt); struct lod_object *lo = lod_dt_obj(dt); int rc, i; ENTRY; - /* Set dead object on all other stripes */ - if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) && - attr->la_flags & LUSTRE_SLAVE_DEAD_FL) { - rc = lod_mark_dead_object(env, dt, handle, true); - RETURN(rc); - } - /* * declare setattr on the local object */ - rc = dt_declare_attr_set(env, next, attr, handle); + rc = lod_sub_object_declare_attr_set(env, next, attr, th); if (rc) RETURN(rc); @@ -1180,7 +1104,8 @@ static int lod_declare_attr_set(const struct lu_env *env, RETURN(0); } else { if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE | - LA_ATIME | LA_MTIME | LA_CTIME))) + LA_ATIME | LA_MTIME | LA_CTIME | + LA_FLAGS))) RETURN(rc); } /* @@ -1200,20 +1125,20 @@ static int lod_declare_attr_set(const struct lu_env *env, */ LASSERT(lo->ldo_stripe); for (i = 0; i < lo->ldo_stripenr; i++) { - if (likely(lo->ldo_stripe[i] != NULL)) { - rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr, - handle); - if (rc != 0) { - CERROR("failed declaration: %d\n", rc); - break; - } - } + if (lo->ldo_stripe[i] == NULL) + continue; + rc = lod_sub_object_declare_attr_set(env, + lo->ldo_stripe[i], attr, + th); + if (rc != 0) + RETURN(rc); } if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) && dt_object_exists(next) != 0 && dt_object_remote(next) == 0) - dt_declare_xattr_del(env, next, XATTR_NAME_LOV, handle); + lod_sub_object_declare_xattr_del(env, next, + XATTR_NAME_LOV, th); if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) && dt_object_exists(next) && @@ -1223,8 +1148,9 @@ static int lod_declare_attr_set(const struct lu_env *env, buf->lb_buf = info->lti_ea_store; buf->lb_len = info->lti_ea_store_size; - dt_declare_xattr_set(env, next, buf, XATTR_NAME_LOV, - LU_XATTR_REPLACE, handle); + lod_sub_object_declare_xattr_set(env, next, buf, + XATTR_NAME_LOV, + LU_XATTR_REPLACE, th); } RETURN(rc); @@ -1241,25 +1167,17 @@ static int lod_declare_attr_set(const struct lu_env *env, static int lod_attr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_attr *attr, - struct thandle *handle, - struct lustre_capa *capa) + struct thandle *th) { struct dt_object *next = dt_object_child(dt); struct lod_object *lo = lod_dt_obj(dt); int rc, i; ENTRY; - /* Set dead object on all other stripes */ - if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) && - attr->la_flags & LUSTRE_SLAVE_DEAD_FL) { - rc = lod_mark_dead_object(env, dt, handle, false); - RETURN(rc); - } - /* * apply changes to the local object */ - rc = dt_attr_set(env, next, attr, handle, capa); + rc = lod_sub_object_attr_set(env, next, attr, th); if (rc) RETURN(rc); @@ -1271,7 +1189,8 @@ static int lod_attr_set(const struct lu_env *env, RETURN(0); } else { if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE | - LA_ATIME | LA_MTIME | LA_CTIME))) + LA_ATIME | LA_MTIME | LA_CTIME | + LA_FLAGS))) RETURN(rc); } @@ -1285,21 +1204,20 @@ static int lod_attr_set(const struct lu_env *env, for (i = 0; i < lo->ldo_stripenr; i++) { if (unlikely(lo->ldo_stripe[i] == NULL)) continue; + if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && (dt_object_exists(lo->ldo_stripe[i]) == 0)) continue; - rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle, capa); - if (rc != 0) { - CERROR("failed declaration: %d\n", rc); + rc = lod_sub_object_attr_set(env, lo->ldo_stripe[i], attr, th); + if (rc != 0) break; - } } if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) && dt_object_exists(next) != 0 && dt_object_remote(next) == 0) - dt_xattr_del(env, next, XATTR_NAME_LOV, handle, BYPASS_CAPA); + rc = lod_sub_object_xattr_del(env, next, XATTR_NAME_LOV, th); if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) && dt_object_exists(next) && @@ -1330,8 +1248,9 @@ static int lod_attr_set(const struct lu_env *env, fid->f_oid--; fid_to_ostid(fid, oi); ostid_cpu_to_le(oi, &objs->l_ost_oi); - dt_xattr_set(env, next, buf, XATTR_NAME_LOV, - LU_XATTR_REPLACE, handle, BYPASS_CAPA); + + rc = lod_sub_object_xattr_set(env, next, buf, XATTR_NAME_LOV, + LU_XATTR_REPLACE, th); } RETURN(rc); @@ -1346,15 +1265,14 @@ static int lod_attr_set(const struct lu_env *env, * \see dt_object_operations::do_xattr_get() in the API description for details. */ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, - struct lu_buf *buf, const char *name, - struct lustre_capa *capa) + struct lu_buf *buf, const char *name) { struct lod_thread_info *info = lod_env_info(env); struct lod_device *dev = lu2lod_dev(dt->do_lu.lo_dev); int rc, is_root; ENTRY; - rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa); + rc = dt_xattr_get(env, dt_object_child(dt), buf, name); if (strcmp(name, XATTR_NAME_LMV) == 0) { struct lmv_mds_md_v1 *lmv1; int rc1 = 0; @@ -1371,7 +1289,7 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, info->lti_buf.lb_buf = info->lti_key; info->lti_buf.lb_len = sizeof(*lmv1); rc = dt_xattr_get(env, dt_object_child(dt), - &info->lti_buf, name, capa); + &info->lti_buf, name); if (unlikely(rc != sizeof(*lmv1))) RETURN(rc = rc > 0 ? -EINVAL : rc); @@ -1619,25 +1537,154 @@ out: } /** - * Create a striped directory. + * Declare create a striped directory. * - * Create a striped directory with a given stripe pattern on the specified MDTs. - * A striped directory is represented as a regular directory - an index listing - * all the stripes. The stripes point back to the master object with ".." and - * LinkEA. The master object gets LMV EA which identifies it as a striped - * directory. The function allocates FIDs for all the stripes. + * Declare creating a striped directory with a given stripe pattern on the + * specified MDTs. A striped directory is represented as a regular directory + * - an index listing all the stripes. The stripes point back to the master + * object with ".." and LinkEA. The master object gets LMV EA which + * identifies it as a striped directory. The function allocates FIDs + * for all stripes. * * \param[in] env execution environment * \param[in] dt object * \param[in] attr attributes to initialize the objects with - * \param[in] lum a pattern specifying the number of stripes and - * MDT to start from * \param[in] dof type of objects to be created * \param[in] th transaction handle * * \retval 0 on success * \retval negative if failed */ +static int lod_dir_declare_create_stripes(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct dt_object_format *dof, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lu_buf lmv_buf; + struct lu_buf slave_lmv_buf; + struct lmv_mds_md_v1 *lmm; + struct lmv_mds_md_v1 *slave_lmm = NULL; + struct dt_insert_rec *rec = &info->lti_dt_rec; + struct lod_object *lo = lod_dt_obj(dt); + int rc; + __u32 i; + ENTRY; + + rc = lod_prep_lmv_md(env, dt, &lmv_buf); + if (rc != 0) + GOTO(out, rc); + lmm = lmv_buf.lb_buf; + + OBD_ALLOC_PTR(slave_lmm); + if (slave_lmm == NULL) + GOTO(out, rc = -ENOMEM); + + lod_prep_slave_lmv_md(slave_lmm, lmm); + slave_lmv_buf.lb_buf = slave_lmm; + slave_lmv_buf.lb_len = sizeof(*slave_lmm); + + if (!dt_try_as_dir(env, dt_object_child(dt))) + GOTO(out, rc = -EINVAL); + + rec->rec_type = S_IFDIR; + for (i = 0; i < lo->ldo_stripenr; i++) { + struct dt_object *dto = lo->ldo_stripe[i]; + char *stripe_name = info->lti_key; + struct lu_name *sname; + struct linkea_data ldata = { NULL }; + struct lu_buf linkea_buf; + + rc = lod_sub_object_declare_create(env, dto, attr, NULL, + dof, th); + if (rc != 0) + GOTO(out, rc); + + if (!dt_try_as_dir(env, dto)) + GOTO(out, rc = -EINVAL); + + rc = lod_sub_object_declare_ref_add(env, dto, th); + if (rc != 0) + GOTO(out, rc); + + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = lod_sub_object_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dot, th); + if (rc != 0) + GOTO(out, rc); + + /* master stripe FID will be put to .. */ + rec->rec_fid = lu_object_fid(&dt->do_lu); + rc = lod_sub_object_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dotdot, + th); + if (rc != 0) + GOTO(out, rc); + + if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || + cfs_fail_val != i) { + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) && + cfs_fail_val == i) + slave_lmm->lmv_master_mdt_index = + cpu_to_le32(i + 1); + else + slave_lmm->lmv_master_mdt_index = + cpu_to_le32(i); + rc = lod_sub_object_declare_xattr_set(env, dto, + &slave_lmv_buf, XATTR_NAME_LMV, 0, th); + if (rc != 0) + GOTO(out, rc); + } + + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && + cfs_fail_val == i) + snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", + PFID(lu_object_fid(&dto->do_lu)), i + 1); + else + snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", + PFID(lu_object_fid(&dto->do_lu)), i); + + sname = lod_name_get(env, stripe_name, strlen(stripe_name)); + rc = linkea_links_new(&ldata, &info->lti_linkea_buf, + sname, lu_object_fid(&dt->do_lu)); + if (rc != 0) + GOTO(out, rc); + + linkea_buf.lb_buf = ldata.ld_buf->lb_buf; + linkea_buf.lb_len = ldata.ld_leh->leh_len; + rc = lod_sub_object_declare_xattr_set(env, dto, &linkea_buf, + XATTR_NAME_LINK, 0, th); + if (rc != 0) + GOTO(out, rc); + + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = lod_sub_object_declare_insert(env, dt_object_child(dt), + (const struct dt_rec *)rec, + (const struct dt_key *)stripe_name, + th); + if (rc != 0) + GOTO(out, rc); + + rc = lod_sub_object_declare_ref_add(env, dt_object_child(dt), + th); + if (rc != 0) + GOTO(out, rc); + } + + rc = lod_sub_object_declare_xattr_set(env, dt_object_child(dt), + &lmv_buf, XATTR_NAME_LMV, 0, th); + if (rc != 0) + GOTO(out, rc); +out: + if (slave_lmm != NULL) + OBD_FREE_PTR(slave_lmm); + + RETURN(rc); +} + static int lod_prep_md_striped_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, @@ -1648,15 +1695,10 @@ static int lod_prep_md_striped_create(const struct lu_env *env, struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); struct lod_tgt_descs *ltd = &lod->lod_mdt_descs; struct lod_object *lo = lod_dt_obj(dt); - struct lod_thread_info *info = lod_env_info(env); struct dt_object **stripe; - struct lu_buf lmv_buf; - struct lu_buf slave_lmv_buf; - struct lmv_mds_md_v1 *lmm; - struct lmv_mds_md_v1 *slave_lmm = NULL; - struct dt_insert_rec *rec = &info->lti_dt_rec; __u32 stripe_count; int *idx_array; + __u32 master_index; int rc = 0; __u32 i; __u32 j; @@ -1668,18 +1710,17 @@ static int lod_prep_md_striped_create(const struct lu_env *env, stripe_count = le32_to_cpu(lum->lum_stripe_count); - /* shrink the stripe_count to the avaible MDT count */ - if (stripe_count > lod->lod_remote_mdt_count + 1) - stripe_count = lod->lod_remote_mdt_count + 1; + OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count); + if (idx_array == NULL) + RETURN(-ENOMEM); OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count); if (stripe == NULL) - RETURN(-ENOMEM); - - OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count); - if (idx_array == NULL) GOTO(out_free, rc = -ENOMEM); + /* Start index will be the master MDT */ + master_index = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id; + idx_array[0] = master_index; for (i = 0; i < stripe_count; i++) { struct lod_tgt_desc *tgt = NULL; struct dt_object *dto; @@ -1688,44 +1729,42 @@ static int lod_prep_md_striped_create(const struct lu_env *env, struct lu_object_conf conf = { 0 }; struct dt_device *tgt_dt = NULL; - if (i == 0) { - /* Right now, master stripe and master object are - * on the same MDT */ - idx = le32_to_cpu(lum->lum_stripe_offset); - rc = obd_fid_alloc(env, lod->lod_child_exp, &fid, - NULL); - if (rc < 0) - GOTO(out_put, rc); - tgt_dt = lod->lod_child; - goto next; - } - - idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1); - + /* Try to find next avaible target */ + idx = idx_array[i]; for (j = 0; j < lod->lod_remote_mdt_count; j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) { bool already_allocated = false; __u32 k; - CDEBUG(D_INFO, "try idx %d, mdt cnt %u," - " allocated %u, last allocated %d\n", idx, - lod->lod_remote_mdt_count, i, idx_array[i - 1]); + CDEBUG(D_INFO, "try idx %d, mdt cnt %u, allocated %u\n", + idx, lod->lod_remote_mdt_count + 1, i); + if (idx == master_index) { + /* Allocate the FID locally */ + rc = obd_fid_alloc(env, lod->lod_child_exp, + &fid, NULL); + if (rc < 0) + GOTO(out_put, rc); + tgt_dt = lod->lod_child; + break; + } /* Find next available target */ if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) continue; - /* check whether the idx already exists - * in current allocated array */ - for (k = 0; k < i; k++) { - if (idx_array[k] == idx) { - already_allocated = true; - break; + if (likely(!OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) { + /* check whether the idx already exists + * in current allocated array */ + for (k = 0; k < i; k++) { + if (idx_array[k] == idx) { + already_allocated = true; + break; + } } - } - if (already_allocated) - continue; + if (already_allocated) + continue; + } /* check the status of the OSP */ tgt = LTD_TGT(ltd, idx); @@ -1756,11 +1795,13 @@ static int lod_prep_md_striped_create(const struct lu_env *env, break; } - CDEBUG(D_INFO, "idx %d, mdt cnt %u," - " allocated %u, last allocated %d\n", idx, - lod->lod_remote_mdt_count, i, idx_array[i - 1]); - -next: + CDEBUG(D_INFO, "Get idx %d, for stripe %d "DFID"\n", + idx, i, PFID(&fid)); + idx_array[i] = idx; + /* Set the start index for next stripe allocation */ + if (i < stripe_count - 1) + idx_array[i + 1] = (idx + 1) % + (lod->lod_remote_mdt_count + 1); /* tgt_dt and fid must be ready after search avaible OSP * in the above loop */ LASSERT(tgt_dt != NULL); @@ -1772,7 +1813,6 @@ next: if (IS_ERR(dto)) GOTO(out_put, rc = PTR_ERR(dto)); stripe[i] = dto; - idx_array[i] = idx; } lo->ldo_dir_striped = 1; @@ -1783,167 +1823,26 @@ next: if (lo->ldo_stripenr == 0) GOTO(out_put, rc = -ENOSPC); - rc = lod_prep_lmv_md(env, dt, &lmv_buf); + rc = lod_dir_declare_create_stripes(env, dt, attr, dof, th); if (rc != 0) GOTO(out_put, rc); - lmm = lmv_buf.lb_buf; - - OBD_ALLOC_PTR(slave_lmm); - if (slave_lmm == NULL) - GOTO(out_put, rc = -ENOMEM); - - lod_prep_slave_lmv_md(slave_lmm, lmm); - slave_lmv_buf.lb_buf = slave_lmm; - slave_lmv_buf.lb_len = sizeof(*slave_lmm); - - if (!dt_try_as_dir(env, dt_object_child(dt))) - GOTO(out_put, rc = -EINVAL); - - rec->rec_type = S_IFDIR; - for (i = 0; i < lo->ldo_stripenr; i++) { - struct dt_object *dto = stripe[i]; - char *stripe_name = info->lti_key; - struct lu_name *sname; - struct linkea_data ldata = { NULL }; - struct lu_buf linkea_buf; - - rc = dt_declare_create(env, dto, attr, NULL, dof, th); - if (rc != 0) - GOTO(out_put, rc); - if (!dt_try_as_dir(env, dto)) - GOTO(out_put, rc = -EINVAL); - - rc = dt_declare_ref_add(env, dto, th); - if (rc != 0) - GOTO(out_put, rc); +out_put: + if (rc < 0) { + for (i = 0; i < stripe_count; i++) + if (stripe[i] != NULL) + lu_object_put(env, &stripe[i]->do_lu); + OBD_FREE(stripe, sizeof(stripe[0]) * stripe_count); + lo->ldo_stripenr = 0; + lo->ldo_stripes_allocated = 0; + lo->ldo_stripe = NULL; + } - rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec, - (const struct dt_key *)dot, th); - if (rc != 0) - GOTO(out_put, rc); +out_free: + OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count); - /* master stripe FID will be put to .. */ - rec->rec_fid = lu_object_fid(&dt->do_lu); - rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec, - (const struct dt_key *)dotdot, th); - if (rc != 0) - GOTO(out_put, rc); - - /* probably nothing to inherite */ - if (lo->ldo_def_striping_set && - !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size, - lo->ldo_def_stripenr, - lo->ldo_def_stripe_offset, - lo->ldo_pool)) { - struct lov_user_md_v3 *v3; - - /* sigh, lti_ea_store has been used for lmv_buf, - * so we have to allocate buffer for default - * stripe EA */ - OBD_ALLOC_PTR(v3); - if (v3 == NULL) - GOTO(out_put, rc = -ENOMEM); - - memset(v3, 0, sizeof(*v3)); - v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); - v3->lmm_stripe_count = - cpu_to_le16(lo->ldo_def_stripenr); - v3->lmm_stripe_offset = - cpu_to_le16(lo->ldo_def_stripe_offset); - v3->lmm_stripe_size = - cpu_to_le32(lo->ldo_def_stripe_size); - if (lo->ldo_pool != NULL) - strlcpy(v3->lmm_pool_name, lo->ldo_pool, - sizeof(v3->lmm_pool_name)); - - info->lti_buf.lb_buf = v3; - info->lti_buf.lb_len = sizeof(*v3); - rc = dt_declare_xattr_set(env, dto, - &info->lti_buf, - XATTR_NAME_LOV, - 0, th); - OBD_FREE_PTR(v3); - if (rc != 0) - GOTO(out_put, rc); - } - - if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || - cfs_fail_val != i) { - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) && - cfs_fail_val == i) - slave_lmm->lmv_master_mdt_index = - cpu_to_le32(i + 1); - else - slave_lmm->lmv_master_mdt_index = - cpu_to_le32(i); - rc = dt_declare_xattr_set(env, dto, &slave_lmv_buf, - XATTR_NAME_LMV, 0, th); - if (rc != 0) - GOTO(out_put, rc); - } - - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && - cfs_fail_val == i) - snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", - PFID(lu_object_fid(&dto->do_lu)), i + 1); - else - snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", - PFID(lu_object_fid(&dto->do_lu)), i); - - sname = lod_name_get(env, stripe_name, strlen(stripe_name)); - rc = linkea_data_new(&ldata, &info->lti_linkea_buf); - if (rc != 0) - GOTO(out_put, rc); - - rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu)); - if (rc != 0) - GOTO(out_put, rc); - - linkea_buf.lb_buf = ldata.ld_buf->lb_buf; - linkea_buf.lb_len = ldata.ld_leh->leh_len; - rc = dt_declare_xattr_set(env, dto, &linkea_buf, - XATTR_NAME_LINK, 0, th); - if (rc != 0) - GOTO(out_put, rc); - - rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = dt_declare_insert(env, dt_object_child(dt), - (const struct dt_rec *)rec, - (const struct dt_key *)stripe_name, th); - if (rc != 0) - GOTO(out_put, rc); - - rc = dt_declare_ref_add(env, dt_object_child(dt), th); - if (rc != 0) - GOTO(out_put, rc); - } - - rc = dt_declare_xattr_set(env, dt_object_child(dt), &lmv_buf, - XATTR_NAME_LMV, 0, th); - if (rc != 0) - GOTO(out_put, rc); - -out_put: - if (rc < 0) { - for (i = 0; i < stripe_count; i++) - if (stripe[i] != NULL) - lu_object_put(env, &stripe[i]->do_lu); - OBD_FREE(stripe, sizeof(stripe[0]) * stripe_count); - lo->ldo_stripenr = 0; - lo->ldo_stripes_allocated = 0; - lo->ldo_stripe = NULL; - } - -out_free: - if (idx_array != NULL) - OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count); - if (slave_lmm != NULL) - OBD_FREE_PTR(slave_lmm); - - RETURN(rc); -} + RETURN(rc); +} /** * Declare create striped md object. @@ -2003,7 +1902,6 @@ out: RETURN(rc); } - /** * Implementation of dt_object_operations::do_declare_xattr_set. * @@ -2037,26 +1935,30 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env, RETURN(rc); } - rc = dt_declare_xattr_set(env, next, buf, name, fl, th); + rc = lod_sub_object_declare_xattr_set(env, next, buf, name, fl, th); if (rc != 0) RETURN(rc); + /* Note: Do not set LinkEA on sub-stripes, otherwise + * it will confuse the fid2path process(see mdt_path_current()). + * The linkEA between master and sub-stripes is set in + * lod_xattr_set_lmv(). */ + if (strcmp(name, XATTR_NAME_LINK) == 0) + RETURN(0); + /* set xattr to each stripes, if needed */ rc = lod_load_striping(env, lo); if (rc != 0) RETURN(rc); - /* Note: Do not set LinkEA on sub-stripes, otherwise - * it will confuse the fid2path process(see mdt_path_current()). - * The linkEA between master and sub-stripes is set in - * lod_xattr_set_lmv(). */ - if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0) + if (lo->ldo_stripenr == 0) RETURN(0); for (i = 0; i < lo->ldo_stripenr; i++) { LASSERT(lo->ldo_stripe[i]); - rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], buf, - name, fl, th); + + rc = lod_sub_object_declare_xattr_set(env, lo->ldo_stripe[i], + buf, name, fl, th); if (rc != 0) break; } @@ -2065,6 +1967,85 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env, } /** + * Reset parent FID on OST object + * + * Replace parent FID with @dt object FID, which is only called during migration + * to reset the parent FID after the MDT object is migrated to the new MDT, i.e. + * the FID is changed. + * + * \param[in] env execution environment + * \param[in] dt dt_object whose stripes's parent FID will be reset + * \parem[in] th thandle + * \param[in] declare if it is declare + * + * \retval 0 if reset succeeds + * \retval negative errno if reset fais + */ +static int lod_object_replace_parent_fid(const struct lu_env *env, + struct dt_object *dt, + struct thandle *th, bool declare) +{ + struct lod_object *lo = lod_dt_obj(dt); + struct lod_thread_info *info = lod_env_info(env); + struct lu_buf *buf = &info->lti_buf; + struct filter_fid *ff; + int i, rc; + ENTRY; + + LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr)); + + /* set xattr to each stripes, if needed */ + rc = lod_load_striping(env, lo); + if (rc != 0) + RETURN(rc); + + if (lo->ldo_stripenr == 0) + RETURN(0); + + if (info->lti_ea_store_size < sizeof(*ff)) { + rc = lod_ea_store_resize(info, sizeof(*ff)); + if (rc != 0) + RETURN(rc); + } + + buf->lb_buf = info->lti_ea_store; + buf->lb_len = info->lti_ea_store_size; + + for (i = 0; i < lo->ldo_stripenr; i++) { + if (lo->ldo_stripe[i] == NULL) + continue; + + rc = dt_xattr_get(env, lo->ldo_stripe[i], buf, + XATTR_NAME_FID); + if (rc < 0) { + rc = 0; + continue; + } + + ff = buf->lb_buf; + fid_le_to_cpu(&ff->ff_parent, &ff->ff_parent); + ff->ff_parent.f_seq = lu_object_fid(&dt->do_lu)->f_seq; + ff->ff_parent.f_oid = lu_object_fid(&dt->do_lu)->f_oid; + fid_cpu_to_le(&ff->ff_parent, &ff->ff_parent); + + if (declare) { + rc = lod_sub_object_declare_xattr_set(env, + lo->ldo_stripe[i], buf, + XATTR_NAME_FID, + LU_XATTR_REPLACE, th); + } else { + rc = lod_sub_object_xattr_set(env, lo->ldo_stripe[i], + buf, XATTR_NAME_FID, + LU_XATTR_REPLACE, th); + } + if (rc < 0) + break; + } + + RETURN(rc); +} + +/** * Implementation of dt_object_operations::do_declare_xattr_set. * * \see dt_object_operations::do_declare_xattr_set() in the API description @@ -2099,7 +2080,7 @@ static int lod_declare_xattr_set(const struct lu_env *env, * this is a request to manipulate object's striping */ if (dt_object_exists(dt)) { - rc = dt_attr_get(env, next, attr, BYPASS_CAPA); + rc = dt_attr_get(env, next, attr); if (rc) RETURN(rc); } else { @@ -2110,30 +2091,17 @@ static int lod_declare_xattr_set(const struct lu_env *env, rc = lod_declare_striped_object(env, dt, attr, buf, th); } else if (S_ISDIR(mode)) { rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th); + } else if (strcmp(name, XATTR_NAME_FID) == 0) { + rc = lod_object_replace_parent_fid(env, dt, th, true); } else { - rc = dt_declare_xattr_set(env, next, buf, name, fl, th); + rc = lod_sub_object_declare_xattr_set(env, next, buf, name, + fl, th); } RETURN(rc); } /** - * Resets cached default striping in the object. - * - * \param[in] lo object - */ -static void lod_lov_stripe_cache_clear(struct lod_object *lo) -{ - lo->ldo_def_striping_set = 0; - lo->ldo_def_striping_cached = 0; - lod_object_set_pool(lo, NULL); - lo->ldo_def_stripe_size = 0; - lo->ldo_def_stripenr = 0; - if (lo->ldo_dir_stripe != NULL) - lo->ldo_dir_def_striping_cached = 0; -} - -/** * Apply xattr changes to the object. * * Applies xattr changes to the object and the stripes if the latter exist. @@ -2144,7 +2112,6 @@ static void lod_lov_stripe_cache_clear(struct lod_object *lo) * \param[in] name name of xattr * \param[in] fl flags * \param[in] th transaction handle - * \param[in] capa not used currently * * \retval 0 on success * \retval negative if failed @@ -2152,8 +2119,8 @@ static void lod_lov_stripe_cache_clear(struct lod_object *lo) static int lod_xattr_set_internal(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, - const char *name, int fl, struct thandle *th, - struct lustre_capa *capa) + const char *name, int fl, + struct thandle *th) { struct dt_object *next = dt_object_child(dt); struct lod_object *lo = lod_dt_obj(dt); @@ -2161,7 +2128,7 @@ static int lod_xattr_set_internal(const struct lu_env *env, int i; ENTRY; - rc = dt_xattr_set(env, next, buf, name, fl, th, capa); + rc = lod_sub_object_xattr_set(env, next, buf, name, fl, th); if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) RETURN(rc); @@ -2174,8 +2141,9 @@ static int lod_xattr_set_internal(const struct lu_env *env, for (i = 0; i < lo->ldo_stripenr; i++) { LASSERT(lo->ldo_stripe[i]); - rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th, - capa); + + rc = lod_sub_object_xattr_set(env, lo->ldo_stripe[i], buf, name, + fl, th); if (rc != 0) break; } @@ -2192,15 +2160,13 @@ static int lod_xattr_set_internal(const struct lu_env *env, * \param[in] dt object * \param[in] name name of xattr * \param[in] th transaction handle - * \param[in] capa not used currently * * \retval 0 on success * \retval negative if failed */ static int lod_xattr_del_internal(const struct lu_env *env, struct dt_object *dt, - const char *name, struct thandle *th, - struct lustre_capa *capa) + const char *name, struct thandle *th) { struct dt_object *next = dt_object_child(dt); struct lod_object *lo = lod_dt_obj(dt); @@ -2208,7 +2174,7 @@ static int lod_xattr_del_internal(const struct lu_env *env, int i; ENTRY; - rc = dt_xattr_del(env, next, name, th, capa); + rc = lod_sub_object_xattr_del(env, next, name, th); if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) RETURN(rc); @@ -2217,8 +2183,9 @@ static int lod_xattr_del_internal(const struct lu_env *env, for (i = 0; i < lo->ldo_stripenr; i++) { LASSERT(lo->ldo_stripe[i]); - rc = dt_xattr_del(env, lo->ldo_stripe[i], name, th, - capa); + + rc = lod_sub_object_xattr_del(env, lo->ldo_stripe[i], name, + th); if (rc != 0) break; } @@ -2240,7 +2207,6 @@ static int lod_xattr_del_internal(const struct lu_env *env, * \param[in] name name of EA * \param[in] fl xattr flag (see OSD API description) * \param[in] th transaction handle - * \param[in] capa not used * * \retval 0 on success * \retval negative if failed @@ -2249,22 +2215,15 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int fl, - struct thandle *th, - struct lustre_capa *capa) + struct thandle *th) { struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); - struct lod_object *l = lod_dt_obj(dt); struct lov_user_md_v1 *lum; struct lov_user_md_v3 *v3 = NULL; const char *pool_name = NULL; int rc; ENTRY; - /* If it is striped dir, we should clear the stripe cache for - * slave stripe as well, but there are no effective way to - * notify the LOD on the slave MDT, so we do not cache stripe - * information for slave stripe for now. XXX*/ - lod_lov_stripe_cache_clear(l); LASSERT(buf != NULL && buf->lb_buf != NULL); lum = buf->lb_buf; @@ -2290,11 +2249,11 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, if (LOVEA_DELETE_VALUES(lum->lmm_stripe_size, lum->lmm_stripe_count, lum->lmm_stripe_offset, pool_name)) { - rc = lod_xattr_del_internal(env, dt, name, th, capa); + rc = lod_xattr_del_internal(env, dt, name, th); if (rc == -ENODATA) rc = 0; } else { - rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa); + rc = lod_xattr_set_internal(env, dt, buf, name, fl, th); } RETURN(rc); @@ -2314,7 +2273,6 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, * \param[in] name name of EA * \param[in] fl xattr flag (see OSD API description) * \param[in] th transaction handle - * \param[in] capa not used * * \retval 0 on success * \retval negative if failed @@ -2323,10 +2281,8 @@ static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int fl, - struct thandle *th, - struct lustre_capa *capa) + struct thandle *th) { - struct lod_object *l = lod_dt_obj(dt); struct lmv_user_md_v1 *lum; int rc; ENTRY; @@ -2341,23 +2297,15 @@ static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env, if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)), le32_to_cpu(lum->lum_stripe_offset)) && le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) { - rc = lod_xattr_del_internal(env, dt, name, th, capa); + rc = lod_xattr_del_internal(env, dt, name, th); if (rc == -ENODATA) rc = 0; } else { - rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa); + rc = lod_xattr_set_internal(env, dt, buf, name, fl, th); if (rc != 0) RETURN(rc); } - /* Update default stripe cache */ - if (l->ldo_dir_stripe == NULL) { - OBD_ALLOC_PTR(l->ldo_dir_stripe); - if (l->ldo_dir_stripe == NULL) - RETURN(-ENOMEM); - } - - l->ldo_dir_def_striping_cached = 0; RETURN(rc); } @@ -2368,7 +2316,7 @@ static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env, * failure, then the layer above LOD sends this defined striping * using ->do_xattr_set(), so LOD uses this method to replay creation * of the stripes. Notice the original information for the striping - * (#stripes, FIDs, etc) was transfered in declare path. + * (#stripes, FIDs, etc) was transferred in declare path. * * \param[in] env execution environment * \param[in] dt the striped object @@ -2376,15 +2324,13 @@ static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env, * \param[in] name not used currently * \param[in] fl xattr flag (see OSD API description) * \param[in] th transaction handle - * \param[in] capa not used * * \retval 0 on success * \retval negative if failed */ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, - int fl, struct thandle *th, - struct lustre_capa *capa) + int fl, struct thandle *th) { struct lod_object *lo = lod_dt_obj(dt); struct lod_thread_info *info = lod_env_info(env); @@ -2407,7 +2353,7 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, if (lo->ldo_stripenr == 0) RETURN(0); - rc = dt_attr_get(env, dt_object_child(dt), attr, BYPASS_CAPA); + rc = dt_attr_get(env, dt_object_child(dt), attr); if (rc != 0) RETURN(rc); @@ -2430,71 +2376,39 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, rec->rec_type = S_IFDIR; for (i = 0; i < lo->ldo_stripenr; i++) { - struct dt_object *dto; - char *stripe_name = info->lti_key; + struct dt_object *dto; + char *stripe_name = info->lti_key; struct lu_name *sname; struct linkea_data ldata = { NULL }; struct lu_buf linkea_buf; dto = lo->ldo_stripe[i]; + dt_write_lock(env, dto, MOR_TGT_CHILD); - rc = dt_create(env, dto, attr, NULL, dof, th); + rc = lod_sub_object_create(env, dto, attr, NULL, dof, + th); if (rc != 0) { dt_write_unlock(env, dto); - RETURN(rc); + GOTO(out, rc); } - rc = dt_ref_add(env, dto, th); + rc = lod_sub_object_ref_add(env, dto, th); dt_write_unlock(env, dto); if (rc != 0) - RETURN(rc); + GOTO(out, rc); rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = dt_insert(env, dto, (const struct dt_rec *)rec, - (const struct dt_key *)dot, th, capa, 0); + rc = lod_sub_object_index_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dot, th, 0); if (rc != 0) - RETURN(rc); + GOTO(out, rc); rec->rec_fid = lu_object_fid(&dt->do_lu); - rc = dt_insert(env, dto, (struct dt_rec *)rec, - (const struct dt_key *)dotdot, th, capa, 0); + rc = lod_sub_object_index_insert(env, dto, (struct dt_rec *)rec, + (const struct dt_key *)dotdot, th, 0); if (rc != 0) - RETURN(rc); - - if (lo->ldo_def_striping_set && - !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size, - lo->ldo_def_stripenr, - lo->ldo_def_stripe_offset, - lo->ldo_pool)) { - struct lov_user_md_v3 *v3; - - /* sigh, lti_ea_store has been used for lmv_buf, - * so we have to allocate buffer for default - * stripe EA */ - OBD_ALLOC_PTR(v3); - if (v3 == NULL) - GOTO(out, rc); - - memset(v3, 0, sizeof(*v3)); - v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); - v3->lmm_stripe_count = - cpu_to_le16(lo->ldo_def_stripenr); - v3->lmm_stripe_offset = - cpu_to_le16(lo->ldo_def_stripe_offset); - v3->lmm_stripe_size = - cpu_to_le32(lo->ldo_def_stripe_size); - if (lo->ldo_pool != NULL) - strlcpy(v3->lmm_pool_name, lo->ldo_pool, - sizeof(v3->lmm_pool_name)); - - info->lti_buf.lb_buf = v3; - info->lti_buf.lb_len = sizeof(*v3); - rc = dt_xattr_set(env, dto, &info->lti_buf, - XATTR_NAME_LOV, 0, th, capa); - OBD_FREE_PTR(v3); - if (rc != 0) - GOTO(out, rc); - } + GOTO(out, rc); if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || cfs_fail_val != i) { @@ -2505,8 +2419,9 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, else slave_lmm->lmv_master_mdt_index = cpu_to_le32(i); - rc = dt_xattr_set(env, dto, &slave_lmv_buf, - XATTR_NAME_LMV, fl, th, capa); + + rc = lod_sub_object_xattr_set(env, dto, &slave_lmv_buf, + XATTR_NAME_LMV, fl, th); if (rc != 0) GOTO(out, rc); } @@ -2520,37 +2435,33 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, PFID(lu_object_fid(&dto->do_lu)), i); sname = lod_name_get(env, stripe_name, strlen(stripe_name)); - rc = linkea_data_new(&ldata, &info->lti_linkea_buf); - if (rc != 0) - GOTO(out, rc); - - rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu)); + rc = linkea_links_new(&ldata, &info->lti_linkea_buf, + sname, lu_object_fid(&dt->do_lu)); if (rc != 0) GOTO(out, rc); linkea_buf.lb_buf = ldata.ld_buf->lb_buf; linkea_buf.lb_len = ldata.ld_leh->leh_len; - rc = dt_xattr_set(env, dto, &linkea_buf, XATTR_NAME_LINK, - 0, th, BYPASS_CAPA); + rc = lod_sub_object_xattr_set(env, dto, &linkea_buf, + XATTR_NAME_LINK, 0, th); if (rc != 0) GOTO(out, rc); rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = dt_insert(env, dt_object_child(dt), + rc = lod_sub_object_index_insert(env, dt_object_child(dt), (const struct dt_rec *)rec, - (const struct dt_key *)stripe_name, th, capa, 0); + (const struct dt_key *)stripe_name, th, 0); if (rc != 0) GOTO(out, rc); - rc = dt_ref_add(env, dt_object_child(dt), th); + rc = lod_sub_object_ref_add(env, dt_object_child(dt), th); if (rc != 0) GOTO(out, rc); } if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MASTER_LMV)) - rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf, - XATTR_NAME_LMV, fl, th, capa); - + rc = lod_sub_object_xattr_set(env, dt_object_child(dt), + &lmv_buf, XATTR_NAME_LMV, fl, th); out: if (slave_lmm != NULL) OBD_FREE_PTR(slave_lmm); @@ -2587,11 +2498,17 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, struct thandle *th, bool declare) { - struct lod_thread_info *info = lod_env_info(env); - struct lod_object *lo = lod_dt_obj(dt); - int rc; + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + const struct lod_default_striping *lds = lo->ldo_def_striping; + const char *poolname = NULL; + int rc; ENTRY; + LASSERT(ergo(lds != NULL, + lds->lds_def_striping_set || + lds->lds_dir_def_striping_set)); + if (!LMVEA_DELETE_VALUES(lo->ldo_stripenr, lo->ldo_dir_stripe_offset)) { struct lmv_user_md_v1 *v1 = info->lti_ea_store; @@ -2618,18 +2535,16 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, &info->lti_buf, dof, th); else rc = lod_xattr_set_lmv(env, dt, &info->lti_buf, - XATTR_NAME_LMV, 0, th, - BYPASS_CAPA); + XATTR_NAME_LMV, 0, th); if (rc != 0) RETURN(rc); } /* Transfer default LMV striping from the parent */ - if (lo->ldo_dir_def_striping_set && - !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr, - lo->ldo_dir_def_stripe_offset)) { + if (lds != NULL && lds->lds_dir_def_striping_set && + !LMVEA_DELETE_VALUES(lds->lds_dir_def_stripenr, + lds->lds_dir_def_stripe_offset)) { struct lmv_user_md_v1 *v1 = info->lti_ea_store; - int def_stripe_count = lo->ldo_dir_def_stripenr; if (info->lti_ea_store_size < sizeof(*v1)) { rc = lod_ea_store_resize(info, sizeof(*v1)); @@ -2640,11 +2555,11 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, memset(v1, 0, sizeof(*v1)); v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC); - v1->lum_stripe_count = cpu_to_le32(def_stripe_count); + v1->lum_stripe_count = cpu_to_le32(lds->lds_dir_def_stripenr); v1->lum_stripe_offset = - cpu_to_le32(lo->ldo_dir_def_stripe_offset); + cpu_to_le32(lds->lds_dir_def_stripe_offset); v1->lum_hash_type = - cpu_to_le32(lo->ldo_dir_def_hash_type); + cpu_to_le32(lds->lds_dir_def_hash_type); info->lti_buf.lb_buf = v1; info->lti_buf.lb_len = sizeof(*v1); @@ -2656,17 +2571,20 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, rc = lod_xattr_set_default_lmv_on_dir(env, dt, &info->lti_buf, XATTR_NAME_DEFAULT_LMV, 0, - th, BYPASS_CAPA); + th); if (rc != 0) RETURN(rc); } + if (lds != NULL && lds->lds_def_pool[0] != '\0') + poolname = lds->lds_def_pool; + /* Transfer default LOV striping from the parent */ - if (lo->ldo_def_striping_set && - !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size, - lo->ldo_def_stripenr, - lo->ldo_def_stripe_offset, - lo->ldo_pool)) { + if (lds != NULL && lds->lds_def_striping_set && + !LOVEA_DELETE_VALUES(lds->lds_def_stripe_size, + lds->lds_def_stripenr, + lds->lds_def_stripe_offset, + poolname)) { struct lov_user_md_v3 *v3 = info->lti_ea_store; if (info->lti_ea_store_size < sizeof(*v3)) { @@ -2678,11 +2596,11 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, memset(v3, 0, sizeof(*v3)); v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); - v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr); - v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset); - v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size); - if (lo->ldo_pool != NULL) - strlcpy(v3->lmm_pool_name, lo->ldo_pool, + v3->lmm_stripe_count = cpu_to_le16(lds->lds_def_stripenr); + v3->lmm_stripe_offset = cpu_to_le16(lds->lds_def_stripe_offset); + v3->lmm_stripe_size = cpu_to_le32(lds->lds_def_stripe_size); + if (poolname != NULL) + strlcpy(v3->lmm_pool_name, poolname, sizeof(v3->lmm_pool_name)); info->lti_buf.lb_buf = v3; @@ -2693,8 +2611,7 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, XATTR_NAME_LOV, 0, th); else rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf, - XATTR_NAME_LOV, 0, th, - BYPASS_CAPA); + XATTR_NAME_LOV, 0, th); if (rc != 0) RETURN(rc); } @@ -2717,14 +2634,7 @@ static int lod_dir_striping_create(const struct lu_env *env, struct dt_object_format *dof, struct thandle *th) { - struct lod_object *lo = lod_dt_obj(dt); - int rc; - - rc = lod_dir_striping_create_internal(env, dt, attr, dof, th, false); - if (rc == 0) - lo->ldo_striping_cached = 1; - - return rc; + return lod_dir_striping_create_internal(env, dt, attr, dof, th, false); } /** @@ -2747,8 +2657,7 @@ static int lod_dir_striping_create(const struct lu_env *env, */ static int lod_xattr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, - const char *name, int fl, struct thandle *th, - struct lustre_capa *capa) + const char *name, int fl, struct thandle *th) { struct dt_object *next = dt_object_child(dt); int rc; @@ -2760,7 +2669,8 @@ static int lod_xattr_set(const struct lu_env *env, if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION) - rc = dt_xattr_set(env, next, buf, name, fl, th, capa); + rc = lod_sub_object_xattr_set(env, next, buf, name, fl, + th); else rc = lod_dir_striping_create(env, dt, NULL, NULL, th); @@ -2770,33 +2680,46 @@ static int lod_xattr_set(const struct lu_env *env, if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && strcmp(name, XATTR_NAME_LOV) == 0) { /* default LOVEA */ - rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th, capa); + rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th); RETURN(rc); } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { /* default LMVEA */ rc = lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl, - th, capa); + th); RETURN(rc); } else if (S_ISREG(dt->do_lu.lo_header->loh_attr) && !strcmp(name, XATTR_NAME_LOV)) { /* in case of lov EA swap, just set it * if not, it is a replay so check striping match what we * already have during req replay, declare_xattr_set() - * defines striping, then create() does the work - */ + * defines striping, then create() does the work */ if (fl & LU_XATTR_REPLACE) { /* free stripes, then update disk */ lod_object_free_striping(env, lod_dt_obj(dt)); - rc = dt_xattr_set(env, next, buf, name, fl, th, capa); + + rc = lod_sub_object_xattr_set(env, next, buf, name, + fl, th); + } else if (dt_object_remote(dt)) { + /* This only happens during migration, see + * mdd_migrate_create(), in which Master MDT will + * create a remote target object, and only set + * (migrating) stripe EA on the remote object, + * and does not need creating each stripes. */ + rc = lod_sub_object_xattr_set(env, next, buf, name, + fl, th); } else { rc = lod_striping_create(env, dt, NULL, NULL, th); } RETURN(rc); + } else if (strcmp(name, XATTR_NAME_FID) == 0) { + rc = lod_object_replace_parent_fid(env, dt, th, false); + + RETURN(rc); } /* then all other xattr */ - rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa); + rc = lod_xattr_set_internal(env, dt, buf, name, fl, th); RETURN(rc); } @@ -2811,7 +2734,36 @@ static int lod_declare_xattr_del(const struct lu_env *env, struct dt_object *dt, const char *name, struct thandle *th) { - return dt_declare_xattr_del(env, dt_object_child(dt), name, th); + struct lod_object *lo = lod_dt_obj(dt); + int rc; + int i; + ENTRY; + + rc = lod_sub_object_declare_xattr_del(env, dt_object_child(dt), + name, th); + if (rc != 0) + RETURN(rc); + + if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) + RETURN(0); + + /* set xattr to each stripes, if needed */ + rc = lod_load_striping(env, lo); + if (rc != 0) + RETURN(rc); + + if (lo->ldo_stripenr == 0) + RETURN(0); + + for (i = 0; i < lo->ldo_stripenr; i++) { + LASSERT(lo->ldo_stripe[i]); + rc = lod_sub_object_declare_xattr_del(env, lo->ldo_stripe[i], + name, th); + if (rc != 0) + break; + } + + RETURN(rc); } /** @@ -2823,12 +2775,33 @@ static int lod_declare_xattr_del(const struct lu_env *env, * \see dt_object_operations::do_xattr_del() in the API description for details. */ static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt, - const char *name, struct thandle *th, - struct lustre_capa *capa) + const char *name, struct thandle *th) { + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); + int rc; + int i; + ENTRY; + if (!strcmp(name, XATTR_NAME_LOV)) lod_object_free_striping(env, lod_dt_obj(dt)); - return dt_xattr_del(env, dt_object_child(dt), name, th, capa); + + rc = lod_sub_object_xattr_del(env, next, name, th); + if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) + RETURN(rc); + + if (lo->ldo_stripenr == 0) + RETURN(0); + + for (i = 0; i < lo->ldo_stripenr; i++) { + LASSERT(lo->ldo_stripe[i]); + + rc = lod_sub_object_xattr_del(env, lo->ldo_stripe[i], name, th); + if (rc != 0) + break; + } + + RETURN(rc); } /** @@ -2838,42 +2811,9 @@ static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt, * for details. */ static int lod_xattr_list(const struct lu_env *env, - struct dt_object *dt, const struct lu_buf *buf, - struct lustre_capa *capa) + struct dt_object *dt, const struct lu_buf *buf) { - return dt_xattr_list(env, dt_object_child(dt), buf, capa); -} - -/** - * Initialize a pool the object belongs to. - * - * When a striped object is being created, striping configuration - * may demand the stripes are allocated on a limited set of the - * targets. These limited sets are known as "pools". So we copy - * a pool name into the object and later actual creation methods - * (like lod_object_create()) will use this information to allocate - * the stripes properly. - * - * \param[in] o object - * \param[in] pool pool name - */ -int lod_object_set_pool(struct lod_object *o, char *pool) -{ - int len; - - if (o->ldo_pool) { - len = strlen(o->ldo_pool); - OBD_FREE(o->ldo_pool, len + 1); - o->ldo_pool = NULL; - } - if (pool) { - len = strlen(pool); - OBD_ALLOC(o->ldo_pool, len + 1); - if (o->ldo_pool == NULL) - return -ENOMEM; - strcpy(o->ldo_pool, pool); - } - return 0; + return dt_xattr_list(env, dt_object_child(dt), buf); } static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid) @@ -2883,44 +2823,31 @@ static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fi /** - * Cache default regular striping in the object. - * - * To improve performance of striped regular object creation we cache - * default LOV striping (if it exists) in the parent directory object. + * Get default striping. * * \param[in] env execution environment - * \param[in] lp object + * \param[in] lo object + * \param[out] lds default striping * * \retval 0 on success * \retval negative if failed */ -static int lod_cache_parent_lov_striping(const struct lu_env *env, - struct lod_object *lp) +static int lod_get_default_lov_striping(const struct lu_env *env, + struct lod_object *lo, + struct lod_default_striping *lds) { - struct lod_thread_info *info = lod_env_info(env); - struct lov_user_md_v1 *v1 = NULL; - struct lov_user_md_v3 *v3 = NULL; - int rc; - ENTRY; + struct lod_thread_info *info = lod_env_info(env); + struct lov_user_md_v1 *v1 = NULL; + struct lov_user_md_v3 *v3 = NULL; + int rc; - /* called from MDD without parent being write locked, - * lock it here */ - dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0); - rc = lod_get_lov_ea(env, lp); + rc = lod_get_lov_ea(env, lo); if (rc < 0) - GOTO(unlock, rc); - - if (rc < (typeof(rc))sizeof(struct lov_user_md)) { - /* don't lookup for non-existing or invalid striping */ - lp->ldo_def_striping_set = 0; - lp->ldo_def_striping_cached = 1; - lp->ldo_def_stripe_size = 0; - lp->ldo_def_stripenr = 0; - lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1); - GOTO(unlock, rc = 0); - } + return rc; + + if (rc < (typeof(rc))sizeof(struct lov_user_md)) + return 0; - rc = 0; v1 = info->lti_ea_store; if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) { lustre_swab_lov_user_md_v1(v1); @@ -2930,127 +2857,126 @@ static int lod_cache_parent_lov_striping(const struct lu_env *env, } if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1) - GOTO(unlock, rc = 0); + return 0; if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0) - GOTO(unlock, rc = 0); - - CDEBUG(D_INFO, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d\n", - PFID(lu_object_fid(&lp->ldo_obj.do_lu)), - (int)v1->lmm_stripe_count, - (int)v1->lmm_stripe_size, (int)v1->lmm_stripe_offset); - - lp->ldo_def_stripenr = v1->lmm_stripe_count; - lp->ldo_def_stripe_size = v1->lmm_stripe_size; - lp->ldo_def_stripe_offset = v1->lmm_stripe_offset; - lp->ldo_def_striping_cached = 1; - lp->ldo_def_striping_set = 1; + return 0; + + lds->lds_def_stripenr = v1->lmm_stripe_count; + lds->lds_def_stripe_size = v1->lmm_stripe_size; + lds->lds_def_stripe_offset = v1->lmm_stripe_offset; + lds->lds_def_striping_set = 1; if (v1->lmm_magic == LOV_USER_MAGIC_V3) { - /* XXX: sanity check here */ - v3 = (struct lov_user_md_v3 *) v1; - if (v3->lmm_pool_name[0]) - lod_object_set_pool(lp, v3->lmm_pool_name); + v3 = (struct lov_user_md_v3 *)v1; + if (v3->lmm_pool_name[0] != '\0') + strlcpy(lds->lds_def_pool, v3->lmm_pool_name, + sizeof(lds->lds_def_pool)); } - EXIT; -unlock: - dt_write_unlock(env, dt_object_child(&lp->ldo_obj)); - return rc; -} + return 0; +} /** - * Cache default directory striping in the object. - * - * To improve performance of striped directory creation we cache default - * directory striping (if it exists) in the parent directory object. + * Get default directory striping. * * \param[in] env execution environment - * \param[in] lp object + * \param[in] lo object + * \param[out] lds default striping * * \retval 0 on success * \retval negative if failed */ -static int lod_cache_parent_lmv_striping(const struct lu_env *env, - struct lod_object *lp) +static int lod_get_default_lmv_striping(const struct lu_env *env, + struct lod_object *lo, + struct lod_default_striping *lds) { struct lod_thread_info *info = lod_env_info(env); struct lmv_user_md_v1 *v1 = NULL; int rc; - ENTRY; - /* called from MDD without parent being write locked, - * lock it here */ - dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0); - rc = lod_get_default_lmv_ea(env, lp); + rc = lod_get_default_lmv_ea(env, lo); if (rc < 0) - GOTO(unlock, rc); - - if (rc < (typeof(rc))sizeof(struct lmv_user_md)) { - /* don't lookup for non-existing or invalid striping */ - lp->ldo_dir_def_striping_set = 0; - lp->ldo_dir_def_striping_cached = 1; - lp->ldo_dir_def_stripenr = 0; - lp->ldo_dir_def_stripe_offset = - (typeof(v1->lum_stripe_offset))(-1); - lp->ldo_dir_def_hash_type = LMV_HASH_TYPE_FNV_1A_64; - GOTO(unlock, rc = 0); - } + return rc; + + if (rc < (typeof(rc))sizeof(struct lmv_user_md)) + return 0; - rc = 0; v1 = info->lti_ea_store; - lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count); - lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset); - lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type); - lp->ldo_dir_def_striping_set = 1; - lp->ldo_dir_def_striping_cached = 1; + lds->lds_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count); + lds->lds_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset); + lds->lds_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type); + lds->lds_dir_def_striping_set = 1; - EXIT; -unlock: - dt_write_unlock(env, dt_object_child(&lp->ldo_obj)); - return rc; + return 0; } /** - * Cache default striping in the object. + * Get default striping in the object. * - * To improve performance of striped object creation we cache default striping - * (if it exists) in the parent directory object. We always cache default - * striping for the regular files (stored in LOV EA) and we cache default - * striping for the directories if requested by \a child_mode (when a new - * directory is being created). + * Get object default striping and default directory striping. * * \param[in] env execution environment - * \param[in] lp object - * \param[in] child_mode new object's mode + * \param[in] lo object + * \param[out] lds default striping * * \retval 0 on success * \retval negative if failed */ -static int lod_cache_parent_striping(const struct lu_env *env, - struct lod_object *lp, - umode_t child_mode) +static int lod_get_default_striping(const struct lu_env *env, + struct lod_object *lo, + struct lod_default_striping *lds) { - int rc = 0; - ENTRY; - - if (!lp->ldo_def_striping_cached) { - /* we haven't tried to get default striping for - * the directory yet, let's cache it in the object */ - rc = lod_cache_parent_lov_striping(env, lp); - if (rc != 0) - RETURN(rc); - } + int rc; - /* If the parent is on the remote MDT, we should always - * try to refresh the default stripeEA cache, because we - * do not cache default striping information for remote - * object. */ - if (S_ISDIR(child_mode) && (!lp->ldo_dir_def_striping_cached || - dt_object_remote(&lp->ldo_obj))) - rc = lod_cache_parent_lmv_striping(env, lp); + rc = lod_get_default_lov_striping(env, lo, lds); + if (rc == 0) + rc = lod_get_default_lmv_striping(env, lo, lds); + return rc; +} - RETURN(rc); +/** + * Apply default striping on object. + * + * If object striping pattern is not set, set to the one in default striping. + * The default striping is from parent or fs. + * + * \param[in] lo new object + * \param[in] lds default striping + * \param[in] mode new object's mode + */ +static void lod_striping_from_default(struct lod_object *lo, + const struct lod_default_striping *lds, + umode_t mode) +{ + if (lds->lds_def_striping_set && S_ISREG(mode)) { + if (lo->ldo_stripenr == 0) + lo->ldo_stripenr = lds->lds_def_stripenr; + if (lo->ldo_stripe_size == 0) + lo->ldo_stripe_size = lds->lds_def_stripe_size; + if (lo->ldo_stripe_offset == LOV_OFFSET_DEFAULT) + lo->ldo_stripe_offset = lds->lds_def_stripe_offset; + if (lo->ldo_pool == NULL && lds->lds_def_pool[0] != '\0') + lod_object_set_pool(lo, lds->lds_def_pool); + + CDEBUG(D_INFO, "striping from default: count %hu, size %u, " + "offset %d, pool %s\n", + lo->ldo_stripenr, lo->ldo_stripe_size, + (int)lo->ldo_stripe_offset, lo->ldo_pool ?: ""); + } else if (lds->lds_dir_def_striping_set && S_ISDIR(mode)) { + if (lo->ldo_stripenr == 0) + lo->ldo_stripenr = lds->lds_dir_def_stripenr; + if (lo->ldo_dir_stripe_offset == -1) + lo->ldo_dir_stripe_offset = + lds->lds_dir_def_stripe_offset; + if (lo->ldo_dir_hash_type == 0) + lo->ldo_dir_hash_type = lds->lds_dir_def_hash_type; + + CDEBUG(D_INFO, "striping from default: count %hu, offset %d, " + "hash_type %u\n", + lo->ldo_stripenr, (int)lo->ldo_dir_stripe_offset, + lo->ldo_dir_hash_type); + } } /** @@ -3071,12 +2997,13 @@ static void lod_ah_init(const struct lu_env *env, umode_t child_mode) { struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev); - struct dt_object *nextp = NULL; - struct dt_object *nextc; + struct lod_thread_info *info = lod_env_info(env); + struct lod_default_striping *lds = &info->lti_def_striping; + struct dt_object *nextp = NULL; + struct dt_object *nextc; struct lod_object *lp = NULL; struct lod_object *lc; - struct lov_desc *desc; - int rc; + struct lov_desc *desc; ENTRY; LASSERT(child); @@ -3084,9 +3011,6 @@ static void lod_ah_init(const struct lu_env *env, if (likely(parent)) { nextp = dt_object_child(parent); lp = lod_dt_obj(parent); - rc = lod_load_striping(env, lp); - if (rc != 0) - return; } nextc = dt_object_child(child); @@ -3095,106 +3019,56 @@ static void lod_ah_init(const struct lu_env *env, LASSERT(lc->ldo_stripenr == 0); LASSERT(lc->ldo_stripe == NULL); - /* - * local object may want some hints - * in case of late striping creation, ->ah_init() - * can be called with local object existing - */ - if (!dt_object_exists(nextc) || dt_object_remote(nextc)) { - struct dt_object *obj; - - obj = (nextp != NULL && dt_object_remote(nextp)) ? NULL : nextp; - nextc->do_ops->do_ah_init(env, ah, obj, nextc, child_mode); - } + if (!dt_object_exists(nextc)) + nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode); if (S_ISDIR(child_mode)) { - if (lc->ldo_dir_stripe == NULL) { - OBD_ALLOC_PTR(lc->ldo_dir_stripe); - if (lc->ldo_dir_stripe == NULL) - return; - } - - LASSERT(lp != NULL); - if (lp->ldo_dir_stripe == NULL) { - OBD_ALLOC_PTR(lp->ldo_dir_stripe); - if (lp->ldo_dir_stripe == NULL) - return; - } + /* other default values are 0 */ + lc->ldo_dir_stripe_offset = -1; - rc = lod_cache_parent_striping(env, lp, child_mode); - if (rc != 0) - return; - - /* transfer defaults to new directory */ - if (lp->ldo_def_striping_set) { - if (lp->ldo_pool) - lod_object_set_pool(lc, lp->ldo_pool); - lc->ldo_def_stripenr = lp->ldo_def_stripenr; - lc->ldo_def_stripe_size = lp->ldo_def_stripe_size; - lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset; - lc->ldo_def_striping_set = 1; - lc->ldo_def_striping_cached = 1; - CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n", - (int)lc->ldo_def_stripe_size, - (int)lc->ldo_def_stripe_offset, - (int)lc->ldo_def_stripenr); - } + memset(lds, 0, sizeof(*lds)); + lod_get_default_striping(env, lp, lds); - /* transfer dir defaults to new directory */ - if (lp->ldo_dir_def_striping_set) { - lc->ldo_dir_def_stripenr = lp->ldo_dir_def_stripenr; - lc->ldo_dir_def_stripe_offset = - lp->ldo_dir_def_stripe_offset; - lc->ldo_dir_def_hash_type = - lp->ldo_dir_def_hash_type; - lc->ldo_dir_def_striping_set = 1; - lc->ldo_dir_def_striping_cached = 1; - CDEBUG(D_INFO, "inherit default EA nr:%d off:%d t%u\n", - (int)lc->ldo_dir_def_stripenr, - (int)lc->ldo_dir_def_stripe_offset, - lc->ldo_dir_def_hash_type); - } + /* inherit parent default striping */ + if (lds->lds_def_striping_set || lds->lds_dir_def_striping_set) + lc->ldo_def_striping = lds; /* It should always honour the specified stripes */ - if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) { + if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0 && + lod_verify_md_striping(d, ah->dah_eadata) == 0) { const struct lmv_user_md_v1 *lum1 = ah->dah_eadata; - rc = lod_verify_md_striping(d, lum1); - if (rc == 0 && - le32_to_cpu(lum1->lum_stripe_count) > 1) { - /* Directory will be striped only if - * stripe_count > 1 */ - lc->ldo_stripenr = - le32_to_cpu(lum1->lum_stripe_count); - lc->ldo_dir_stripe_offset = - le32_to_cpu(lum1->lum_stripe_offset); - lc->ldo_dir_hash_type = - le32_to_cpu(lum1->lum_hash_type); - CDEBUG(D_INFO, "set stripe EA nr:%hu off:%d\n", - lc->ldo_stripenr, - (int)lc->ldo_dir_stripe_offset); - } - /* then check whether there is default stripes from parent */ - } else if (lp->ldo_dir_def_striping_set) { - /* If there are default dir stripe from parent */ - lc->ldo_stripenr = lp->ldo_dir_def_stripenr; + lc->ldo_stripenr = le32_to_cpu(lum1->lum_stripe_count); lc->ldo_dir_stripe_offset = - lp->ldo_dir_def_stripe_offset; + le32_to_cpu(lum1->lum_stripe_offset); lc->ldo_dir_hash_type = - lp->ldo_dir_def_hash_type; - CDEBUG(D_INFO, "inherit EA nr:%hu off:%d\n", - lc->ldo_stripenr, - (int)lc->ldo_dir_stripe_offset); + le32_to_cpu(lum1->lum_hash_type); + CDEBUG(D_INFO, "set dir stripe: count %hu, offset %d, " + "hash_type %u\n", + lc->ldo_stripenr, + (int)lc->ldo_dir_stripe_offset, + lc->ldo_dir_hash_type); } else { - /* set default stripe for this directory */ - lc->ldo_stripenr = 0; - lc->ldo_dir_stripe_offset = -1; + lod_striping_from_default(lc, lds, child_mode); } - CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n", - lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset); + /* shrink the stripe_count to the avaible MDT count */ + if (lc->ldo_stripenr > d->lod_remote_mdt_count + 1 && + !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)) + lc->ldo_stripenr = d->lod_remote_mdt_count + 1; - goto out; + /* Directory will be striped only if stripe_count > 1, if + * stripe_count == 1, let's reset stripenr = 0 to avoid + * create single master stripe and also help to unify the + * stripe handling of directories and files */ + if (lc->ldo_stripenr == 1) + lc->ldo_stripenr = 0; + + CDEBUG(D_INFO, "final dir stripe [%hu %d %u]\n", + lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset, + lc->ldo_dir_hash_type); + + RETURN_EXIT; } /* @@ -3204,45 +3078,60 @@ static void lod_ah_init(const struct lu_env *env, */ if (!lod_object_will_be_striped(S_ISREG(child_mode), lu_object_fid(&child->do_lu))) - goto out; - /* - * try from the parent - */ + RETURN_EXIT; + + /* other default values are 0 */ + lc->ldo_stripe_offset = LOV_OFFSET_DEFAULT; + + /* striping from parent default */ if (likely(parent)) { - lod_cache_parent_striping(env, lp, child_mode); - - lc->ldo_def_stripe_offset = LOV_OFFSET_DEFAULT; - - if (lp->ldo_def_striping_set) { - if (lp->ldo_pool) - lod_object_set_pool(lc, lp->ldo_pool); - lc->ldo_stripenr = lp->ldo_def_stripenr; - lc->ldo_stripe_size = lp->ldo_def_stripe_size; - lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset; - CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n", - lc->ldo_stripenr, lc->ldo_stripe_size, - lp->ldo_pool ? lp->ldo_pool : ""); + memset(lds, 0, sizeof(*lds)); + lod_get_default_lov_striping(env, lp, lds); + lod_striping_from_default(lc, lds, child_mode); + } + + if (d->lod_md_root == NULL) { + struct dt_object *root; + struct lod_object *lroot; + + lu_root_fid(&info->lti_fid); + root = dt_locate(env, &d->lod_dt_dev, &info->lti_fid); + if (!IS_ERR(root)) { + lroot = lod_dt_obj(root); + + spin_lock(&d->lod_lock); + if (d->lod_md_root != NULL) + lu_object_put(env, + &d->lod_md_root->ldo_obj.do_lu); + d->lod_md_root = lroot; + spin_unlock(&d->lod_lock); } } + /* if parent doesn't provide all defaults, striping from fs default */ + if (d->lod_md_root != NULL && + (lc->ldo_stripenr == 0 || + lc->ldo_stripe_size == 0 || + lc->ldo_stripe_offset == LOV_OFFSET_DEFAULT || + lc->ldo_pool == NULL)) { + memset(lds, 0, sizeof(*lds)); + lod_get_default_lov_striping(env, d->lod_md_root, lds); + lod_striping_from_default(lc, lds, child_mode); + } + /* - * if the parent doesn't provide with specific pattern, grab fs-wide one + * fs default striping may not be explicitly set, or historically set + * in config log, check striping sanity here and fix to sane values. */ desc = &d->lod_desc; if (lc->ldo_stripenr == 0) lc->ldo_stripenr = desc->ld_default_stripe_count; if (lc->ldo_stripe_size == 0) lc->ldo_stripe_size = desc->ld_default_stripe_size; - CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n", - lc->ldo_stripenr, lc->ldo_stripe_size, - lc->ldo_pool ? lc->ldo_pool : ""); - -out: - /* we do not cache stripe information for slave stripe, see - * lod_xattr_set_lov_on_dir */ - if (lp != NULL && lp->ldo_dir_slave_stripe) - lod_lov_stripe_cache_clear(lp); + CDEBUG(D_INFO, "final striping [%hu %u %d %s]\n", + lc->ldo_stripenr, lc->ldo_stripe_size, + (int)lc->ldo_stripe_offset, lc->ldo_pool ?: ""); EXIT; } @@ -3277,7 +3166,10 @@ static int lod_declare_init_size(const struct lu_env *env, LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0); LASSERT(lo->ldo_stripe_size > 0); - rc = dt_attr_get(env, next, attr, BYPASS_CAPA); + if (lo->ldo_stripenr == 0) + RETURN(0); + + rc = dt_attr_get(env, next, attr); LASSERT(attr->la_valid & LA_SIZE); if (rc) RETURN(rc); @@ -3297,7 +3189,8 @@ static int lod_declare_init_size(const struct lu_env *env, attr->la_valid = LA_SIZE; attr->la_size = size; - rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th); + rc = lod_sub_object_declare_attr_set(env, lo->ldo_stripe[stripe], attr, + th); RETURN(rc); } @@ -3331,22 +3224,14 @@ int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt, int rc; ENTRY; - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) { - /* failed to create striping, let's reset - * config so that others don't get confused */ - lod_object_free_striping(env, lo); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) GOTO(out, rc = -ENOMEM); - } if (!dt_object_remote(next)) { /* choose OST and generate appropriate objects */ rc = lod_qos_prep_create(env, lo, attr, lovea, th); - if (rc) { - /* failed to create striping, let's reset - * config so that others don't get confused */ - lod_object_free_striping(env, lo); + if (rc) GOTO(out, rc); - } /* * declare storage for striping data @@ -3363,8 +3248,8 @@ int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt, info->lti_buf = *lovea; } - rc = dt_declare_xattr_set(env, next, &info->lti_buf, - XATTR_NAME_LOV, 0, th); + rc = lod_sub_object_declare_xattr_set(env, next, &info->lti_buf, + XATTR_NAME_LOV, 0, th); if (rc) GOTO(out, rc); @@ -3377,6 +3262,11 @@ int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt, rc = lod_declare_init_size(env, dt, th); out: + /* failed to create striping or to set initial size, let's reset + * config so that others don't get confused */ + if (rc) + lod_object_free_striping(env, lo); + RETURN(rc); } @@ -3410,15 +3300,17 @@ static int lod_declare_object_create(const struct lu_env *env, /* * first of all, we declare creation of local object */ - rc = dt_declare_create(env, next, attr, hint, dof, th); - if (rc) + rc = lod_sub_object_declare_create(env, next, attr, hint, dof, th); + if (rc != 0) GOTO(out, rc); if (dof->dof_type == DFT_SYM) dt->do_body_ops = &lod_body_lnk_ops; + else if (dof->dof_type == DFT_REGULAR) + dt->do_body_ops = &lod_body_ops; /* - * it's lod_ah_init() who has decided the object will striped + * it's lod_ah_init() that has decided the object will be striped */ if (dof->dof_type == DFT_REGULAR) { /* callers don't want stripes */ @@ -3444,16 +3336,46 @@ static int lod_declare_object_create(const struct lu_env *env, * Note: if dah_eadata != NULL, it means creating the * striped directory with specified stripeEA, then it * should ignore the default stripeEA */ - if ((hint == NULL || hint->dah_eadata == NULL) && - lo->ldo_dir_stripe_offset != -1 && - lo->ldo_dir_stripe_offset != ss->ss_node_id) - GOTO(out, rc = -EREMOTE); - - /* Orphan object (like migrating object) does not have - * lod_dir_stripe, see lod_ah_init */ - if (lo->ldo_dir_stripe != NULL) - rc = lod_declare_dir_striping_create(env, dt, attr, - dof, th); + if (hint != NULL && hint->dah_eadata == NULL) { + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STALE_DIR_LAYOUT)) + GOTO(out, rc = -EREMOTE); + + if (lo->ldo_dir_stripe_offset == -1) { + /* child and parent should be in the same MDT */ + if (hint->dah_parent != NULL && + dt_object_remote(hint->dah_parent)) + GOTO(out, rc = -EREMOTE); + } else if (lo->ldo_dir_stripe_offset != + ss->ss_node_id) { + struct lod_device *lod; + struct lod_tgt_descs *ltd; + struct lod_tgt_desc *tgt = NULL; + bool found_mdt = false; + int i; + + lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); + ltd = &lod->lod_mdt_descs; + cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) { + tgt = LTD_TGT(ltd, i); + if (tgt->ltd_index == + lo->ldo_dir_stripe_offset) { + found_mdt = true; + break; + } + } + + /* If the MDT indicated by stripe_offset can be + * found, then tell client to resend the create + * request to the correct MDT, otherwise return + * error to client */ + if (found_mdt) + GOTO(out, rc = -EREMOTE); + else + GOTO(out, rc = -EINVAL); + } + } + + rc = lod_declare_dir_striping_create(env, dt, attr, dof, th); } out: RETURN(rc); @@ -3486,22 +3408,17 @@ int lod_striping_create(const struct lu_env *env, struct dt_object *dt, int rc = 0, i; ENTRY; - LASSERT(lo->ldo_striping_cached == 0); - /* create all underlying objects */ for (i = 0; i < lo->ldo_stripenr; i++) { LASSERT(lo->ldo_stripe[i]); - rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th); - + rc = lod_sub_object_create(env, lo->ldo_stripe[i], attr, NULL, + dof, th); if (rc) break; } - if (rc == 0) { + if (rc == 0) rc = lod_generate_and_set_lovea(env, lo, th); - if (rc == 0) - lo->ldo_striping_cached = 1; - } RETURN(rc); } @@ -3520,13 +3437,13 @@ static int lod_object_create(const struct lu_env *env, struct dt_object *dt, struct dt_allocation_hint *hint, struct dt_object_format *dof, struct thandle *th) { - struct dt_object *next = dt_object_child(dt); struct lod_object *lo = lod_dt_obj(dt); int rc; ENTRY; /* create local object */ - rc = dt_create(env, next, attr, hint, dof, th); + rc = lod_sub_object_create(env, dt_object_child(dt), attr, hint, dof, + th); if (rc != 0) RETURN(rc); @@ -3576,42 +3493,44 @@ static int lod_declare_object_destroy(const struct lu_env *env, RETURN(rc); for (i = 0; i < lo->ldo_stripenr; i++) { - rc = dt_declare_ref_del(env, next, th); + rc = lod_sub_object_declare_ref_del(env, next, th); if (rc != 0) RETURN(rc); + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)), i); - rc = dt_declare_delete(env, next, + rc = lod_sub_object_declare_delete(env, next, (const struct dt_key *)stripe_name, th); if (rc != 0) RETURN(rc); } } + /* * we declare destroy for the local object */ - rc = dt_declare_destroy(env, next, th); + rc = lod_sub_object_declare_destroy(env, next, th); if (rc) RETURN(rc); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ)) + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ) || + OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ2)) RETURN(0); /* declare destroy all striped objects */ for (i = 0; i < lo->ldo_stripenr; i++) { - if (likely(lo->ldo_stripe[i] != NULL)) { - if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { - rc = dt_declare_ref_del(env, lo->ldo_stripe[i], - th); - if (rc != 0) - RETURN(rc); - } + if (lo->ldo_stripe[i] == NULL) + continue; - rc = dt_declare_destroy(env, lo->ldo_stripe[i], th); - if (rc != 0) - break; - } + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) + rc = lod_sub_object_declare_ref_del(env, + lo->ldo_stripe[i], th); + + rc = lod_sub_object_declare_destroy(env, lo->ldo_stripe[i], + th); + if (rc != 0) + break; } RETURN(rc); @@ -3645,7 +3564,7 @@ static int lod_object_destroy(const struct lu_env *env, RETURN(rc); for (i = 0; i < lo->ldo_stripenr; i++) { - rc = dt_ref_del(env, next, th); + rc = lod_sub_object_ref_del(env, next, th); if (rc != 0) RETURN(rc); @@ -3657,18 +3576,19 @@ static int lod_object_destroy(const struct lu_env *env, PFID(lu_object_fid(&dt->do_lu)), stripe_name, PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu))); - rc = dt_delete(env, next, - (const struct dt_key *)stripe_name, - th, BYPASS_CAPA); + rc = lod_sub_object_delete(env, next, + (const struct dt_key *)stripe_name, th); if (rc != 0) RETURN(rc); } } - rc = dt_destroy(env, next, th); + + rc = lod_sub_object_destroy(env, next, th); if (rc != 0) RETURN(rc); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ)) + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ) || + OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ2)) RETURN(0); /* destroy all striped objects */ @@ -3679,13 +3599,14 @@ static int lod_object_destroy(const struct lu_env *env, if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { dt_write_lock(env, lo->ldo_stripe[i], MOR_TGT_CHILD); - rc = dt_ref_del(env, lo->ldo_stripe[i], th); + rc = lod_sub_object_ref_del(env, + lo->ldo_stripe[i], th); dt_write_unlock(env, lo->ldo_stripe[i]); if (rc != 0) break; } - rc = dt_destroy(env, lo->ldo_stripe[i], th); + rc = lod_sub_object_destroy(env, lo->ldo_stripe[i], th); if (rc != 0) break; } @@ -3703,7 +3624,7 @@ static int lod_object_destroy(const struct lu_env *env, static int lod_declare_ref_add(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - return dt_declare_ref_add(env, dt_object_child(dt), th); + return lod_sub_object_declare_ref_add(env, dt_object_child(dt), th); } /** @@ -3714,7 +3635,7 @@ static int lod_declare_ref_add(const struct lu_env *env, static int lod_ref_add(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - return dt_ref_add(env, dt_object_child(dt), th); + return lod_sub_object_ref_add(env, dt_object_child(dt), th); } /** @@ -3726,7 +3647,7 @@ static int lod_ref_add(const struct lu_env *env, static int lod_declare_ref_del(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - return dt_declare_ref_del(env, dt_object_child(dt), th); + return lod_sub_object_declare_ref_del(env, dt_object_child(dt), th); } /** @@ -3737,19 +3658,7 @@ static int lod_declare_ref_del(const struct lu_env *env, static int lod_ref_del(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - return dt_ref_del(env, dt_object_child(dt), th); -} - -/** - * Implementation of dt_object_operations::do_capa_get. - * - * \see dt_object_operations::do_capa_get() in the API description for details. - */ -static struct obd_capa *lod_capa_get(const struct lu_env *env, - struct dt_object *dt, - struct lustre_capa *old, __u64 opc) -{ - return dt_capa_get(env, dt_object_child(dt), old, opc); + return lod_sub_object_ref_del(env, dt_object_child(dt), th); } /** @@ -3764,16 +3673,11 @@ static int lod_object_sync(const struct lu_env *env, struct dt_object *dt, return dt_object_sync(env, dt_object_child(dt), start, end); } -struct lod_slave_locks { - int lsl_lock_count; - struct lustre_handle lsl_handle[0]; -}; - /** * Release LDLM locks on the stripes of a striped directory. * * Iterates over all the locks taken on the stripe objects and - * release them using ->do_object_unlock() method. + * cancel them. * * \param[in] env execution environment * \param[in] dt striped object @@ -3786,10 +3690,9 @@ struct lod_slave_locks { static int lod_object_unlock_internal(const struct lu_env *env, struct dt_object *dt, struct ldlm_enqueue_info *einfo, - ldlm_policy_data_t *policy) + union ldlm_policy_data *policy) { - struct lod_object *lo = lod_dt_obj(dt); - struct lod_slave_locks *slave_locks = einfo->ei_cbdata; + struct lustre_handle_array *slave_locks = einfo->ei_cbdata; int rc = 0; int i; ENTRY; @@ -3797,16 +3700,10 @@ static int lod_object_unlock_internal(const struct lu_env *env, if (slave_locks == NULL) RETURN(0); - for (i = 1; i < slave_locks->lsl_lock_count; i++) { - if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) { - int rc1; - - einfo->ei_cbdata = &slave_locks->lsl_handle[i]; - rc1 = dt_object_unlock(env, lo->ldo_stripe[i], einfo, - policy); - if (rc1 < 0) - rc = rc == 0 ? rc1 : rc; - } + for (i = 1; i < slave_locks->count; i++) { + if (lustre_handle_is_used(&slave_locks->handles[i])) + ldlm_lock_decref_and_cancel(&slave_locks->handles[i], + einfo->ei_mode); } RETURN(rc); @@ -3824,36 +3721,33 @@ static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt, struct ldlm_enqueue_info *einfo, union ldlm_policy_data *policy) { - struct lod_object *lo = lod_dt_obj(dt); - struct lod_slave_locks *slave_locks = einfo->ei_cbdata; - int slave_locks_size; - int rc; + struct lod_object *lo = lod_dt_obj(dt); + struct lustre_handle_array *slave_locks = einfo->ei_cbdata; + int slave_locks_size; + int i; ENTRY; if (slave_locks == NULL) RETURN(0); - if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) - RETURN(-ENOTDIR); - - rc = lod_load_striping(env, lo); - if (rc != 0) - RETURN(rc); - + LASSERT(S_ISDIR(dt->do_lu.lo_header->loh_attr)); + LASSERT(lo->ldo_stripenr > 1); /* Note: for remote lock for single stripe dir, MDT will cancel * the lock by lockh directly */ - if (lo->ldo_stripenr <= 1 && dt_object_remote(dt_object_child(dt))) - RETURN(0); + LASSERT(!dt_object_remote(dt_object_child(dt))); - /* Only cancel slave lock for striped dir */ - rc = lod_object_unlock_internal(env, dt, einfo, policy); + /* locks were unlocked in MDT layer */ + for (i = 1; i < slave_locks->count; i++) { + LASSERT(!lustre_handle_is_used(&slave_locks->handles[i])); + dt_invalidate(env, lo->ldo_stripe[i]); + } - slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count * - sizeof(slave_locks->lsl_handle[0]); + slave_locks_size = sizeof(*slave_locks) + slave_locks->count * + sizeof(slave_locks->handles[0]); OBD_FREE(slave_locks, slave_locks_size); einfo->ei_cbdata = NULL; - RETURN(rc); + RETURN(0); } /** @@ -3874,7 +3768,7 @@ static int lod_object_lock(const struct lu_env *env, int rc = 0; int i; int slave_locks_size; - struct lod_slave_locks *slave_locks = NULL; + struct lustre_handle_array *slave_locks = NULL; ENTRY; /* remote object lock */ @@ -3885,23 +3779,30 @@ static int lod_object_lock(const struct lu_env *env, } if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) - RETURN(-ENOTDIR); + GOTO(out, rc = -ENOTDIR); rc = lod_load_striping(env, lo); if (rc != 0) - RETURN(rc); + GOTO(out, rc); /* No stripes */ - if (lo->ldo_stripenr <= 1) - RETURN(0); + if (lo->ldo_stripenr <= 1) { + /* + * NB, ei_cbdata stores pointer to slave locks, if no locks + * taken, make sure it's set to NULL, otherwise MDT will try to + * unlock them. + */ + einfo->ei_cbdata = NULL; + GOTO(out, rc = 0); + } slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr * - sizeof(slave_locks->lsl_handle[0]); + sizeof(slave_locks->handles[0]); /* Freed in lod_object_unlock */ OBD_ALLOC(slave_locks, slave_locks_size); if (slave_locks == NULL) - RETURN(-ENOMEM); - slave_locks->lsl_lock_count = lo->ldo_stripenr; + GOTO(out, rc = -ENOMEM); + slave_locks->count = lo->ldo_stripenr; /* striped directory lock */ for (i = 1; i < lo->ldo_stripenr; i++) { @@ -3913,27 +3814,59 @@ static int lod_object_lock(const struct lu_env *env, res_id); einfo->ei_res_id = res_id; - LASSERT(lo->ldo_stripe[i]); - rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo, - policy); + LASSERT(lo->ldo_stripe[i] != NULL); + if (likely(dt_object_remote(lo->ldo_stripe[i]))) { + rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, + einfo, policy); + } else { + struct ldlm_namespace *ns = einfo->ei_namespace; + ldlm_blocking_callback blocking = einfo->ei_cb_local_bl; + ldlm_completion_callback completion = einfo->ei_cb_cp; + __u64 dlmflags = LDLM_FL_ATOMIC_CB; + + if (einfo->ei_mode == LCK_PW || + einfo->ei_mode == LCK_EX) + dlmflags |= LDLM_FL_COS_INCOMPAT; + + /* This only happens if there are mulitple stripes + * on the master MDT, i.e. except stripe0, there are + * other stripes on the Master MDT as well, Only + * happens in the test case right now. */ + LASSERT(ns != NULL); + rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, + policy, einfo->ei_mode, + &dlmflags, blocking, + completion, NULL, + NULL, 0, LVB_T_NONE, + NULL, &lockh); + } if (rc != 0) - GOTO(out, rc); - slave_locks->lsl_handle[i] = lockh; + break; + slave_locks->handles[i] = lockh; } - einfo->ei_cbdata = slave_locks; -out: if (rc != 0 && slave_locks != NULL) { - einfo->ei_cbdata = slave_locks; lod_object_unlock_internal(env, dt, einfo, policy); OBD_FREE(slave_locks, slave_locks_size); - einfo->ei_cbdata = NULL; } - + EXIT; +out: + if (rc != 0) + einfo->ei_cbdata = NULL; RETURN(rc); } +/** + * Implementation of dt_object_operations::do_invalidate. + * + * \see dt_object_operations::do_invalidate() in the API description for details + */ +static int lod_invalidate(const struct lu_env *env, struct dt_object *dt) +{ + return dt_invalidate(env, dt_object_child(dt)); +} + struct dt_object_operations lod_obj_ops = { .do_read_lock = lod_object_read_lock, .do_write_lock = lod_object_write_lock, @@ -3959,10 +3892,10 @@ struct dt_object_operations lod_obj_ops = { .do_ref_add = lod_ref_add, .do_declare_ref_del = lod_declare_ref_del, .do_ref_del = lod_ref_del, - .do_capa_get = lod_capa_get, .do_object_sync = lod_object_sync, .do_object_lock = lod_object_lock, .do_object_unlock = lod_object_unlock, + .do_invalidate = lod_invalidate, }; /** @@ -3971,11 +3904,10 @@ struct dt_object_operations lod_obj_ops = { * \see dt_body_operations::dbo_read() in the API description for details. */ static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt, - struct lu_buf *buf, loff_t *pos, - struct lustre_capa *capa) + struct lu_buf *buf, loff_t *pos) { struct dt_object *next = dt_object_child(dt); - return next->do_body_ops->dbo_read(env, next, buf, pos, capa); + return next->do_body_ops->dbo_read(env, next, buf, pos); } /** @@ -3989,8 +3921,8 @@ static ssize_t lod_declare_write(const struct lu_env *env, const struct lu_buf *buf, loff_t pos, struct thandle *th) { - return dt_declare_record_write(env, dt_object_child(dt), - buf, pos, th); + return lod_sub_object_declare_write(env, dt_object_child(dt), buf, pos, + th); } /** @@ -4000,11 +3932,28 @@ static ssize_t lod_declare_write(const struct lu_env *env, */ static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, loff_t *pos, - struct thandle *th, struct lustre_capa *capa, int iq) + struct thandle *th, int iq) { - struct dt_object *next = dt_object_child(dt); - LASSERT(next); - return next->do_body_ops->dbo_write(env, next, buf, pos, th, capa, iq); + return lod_sub_object_write(env, dt_object_child(dt), buf, pos, th, iq); +} + +static int lod_declare_punch(const struct lu_env *env, struct dt_object *dt, + __u64 start, __u64 end, struct thandle *th) +{ + if (dt_object_remote(dt)) + return -ENOTSUPP; + + return lod_sub_object_declare_punch(env, dt_object_child(dt), start, + end, th); +} + +static int lod_punch(const struct lu_env *env, struct dt_object *dt, + __u64 start, __u64 end, struct thandle *th) +{ + if (dt_object_remote(dt)) + return -ENOTSUPP; + + return lod_sub_object_punch(env, dt_object_child(dt), start, end, th); } static const struct dt_body_operations lod_body_lnk_ops = { @@ -4013,6 +3962,14 @@ static const struct dt_body_operations lod_body_lnk_ops = { .dbo_write = lod_write }; +static const struct dt_body_operations lod_body_ops = { + .dbo_read = lod_read, + .dbo_declare_write = lod_declare_write, + .dbo_write = lod_write, + .dbo_declare_punch = lod_declare_punch, + .dbo_punch = lod_punch, +}; + /** * Implementation of lu_object_operations::loo_object_init. * @@ -4099,14 +4056,11 @@ static int lod_object_init(const struct lu_env *env, struct lu_object *lo, */ void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo) { - int i; + int len; - if (lo->ldo_dir_stripe != NULL) { - OBD_FREE_PTR(lo->ldo_dir_stripe); - lo->ldo_dir_stripe = NULL; - } + if (lo->ldo_stripe != NULL) { + int i; - if (lo->ldo_stripe) { LASSERT(lo->ldo_stripes_allocated > 0); for (i = 0; i < lo->ldo_stripenr; i++) { @@ -4114,14 +4068,12 @@ void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo) lu_object_put(env, &lo->ldo_stripe[i]->do_lu); } - i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated; - OBD_FREE(lo->ldo_stripe, i); + len = sizeof(struct dt_object *) * lo->ldo_stripes_allocated; + OBD_FREE(lo->ldo_stripe, len); lo->ldo_stripe = NULL; lo->ldo_stripes_allocated = 0; } - lo->ldo_striping_cached = 0; lo->ldo_stripenr = 0; - lo->ldo_pattern = 0; } /** @@ -4132,8 +4084,16 @@ void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo) */ static int lod_object_start(const struct lu_env *env, struct lu_object *o) { - if (S_ISLNK(o->lo_header->loh_attr & S_IFMT)) + if (S_ISLNK(o->lo_header->loh_attr & S_IFMT)) { lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops; + } else if (S_ISREG(o->lo_header->loh_attr & S_IFMT) || + fid_is_local_file(lu_object_fid(o))) { + /* Note: some local file (like last rcvd) is created + * through bottom layer (OSD), so the object initialization + * comes to lod, it does not set loh_attr yet, so + * set do_body_ops for local file anyway */ + lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_ops; + } return 0; } @@ -4145,18 +4105,13 @@ static int lod_object_start(const struct lu_env *env, struct lu_object *o) */ static void lod_object_free(const struct lu_env *env, struct lu_object *o) { - struct lod_object *mo = lu2lod_obj(o); - - /* - * release all underlying object pinned - */ - - lod_object_free_striping(env, mo); - - lod_object_set_pool(mo, NULL); + struct lod_object *lo = lu2lod_obj(o); + lod_object_set_pool(lo, NULL); + /* release all underlying object pinned */ + lod_object_free_striping(env, lo); lu_object_fini(o); - OBD_SLAB_FREE_PTR(mo, lod_object_kmem); + OBD_SLAB_FREE_PTR(lo, lod_object_kmem); } /**