X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flod%2Flod_object.c;h=b6c9a4f7a9c8ca52792eb46e340ae29243dcf3c2;hp=dba607cc3d01a76c815c50042f9faf38c3636b01;hb=de8572645d287d17c409b99dabdf176822d91486;hpb=75f0f0675d0eadc634c098d664b7af78547999c9 diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index dba607c..b6c9a4f 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -6,13 +6,13 @@ * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License version 2 only, * as published by the Free Software Foundation. - + * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License version 2 for more details. A copy is * included in the COPYING file that accompanied this code. - + * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA @@ -23,7 +23,7 @@ * Copyright 2009 Sun Microsystems, Inc. All rights reserved * Use is subject to license terms. * - * Copyright (c) 2012, 2013, Intel Corporation. + * Copyright (c) 2012, 2014, Intel Corporation. */ /* * lustre/lod/lod_object.c @@ -58,7 +58,6 @@ static const char dot[] = "."; static const char dotdot[] = ".."; -extern struct kmem_cache *lod_object_kmem; static const struct dt_body_operations lod_body_lnk_ops; /** @@ -69,11 +68,10 @@ static const struct dt_body_operations lod_body_lnk_ops; * \see dt_index_operations::dio_lookup() in the API description for details. */ static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key, - struct lustre_capa *capa) + struct dt_rec *rec, const struct dt_key *key) { struct dt_object *next = dt_object_child(dt); - return next->do_index_ops->dio_lookup(env, next, rec, key, capa); + return next->do_index_ops->dio_lookup(env, next, rec, key); } /** @@ -88,9 +86,10 @@ static int lod_declare_index_insert(const struct lu_env *env, struct dt_object *dt, const struct dt_rec *rec, const struct dt_key *key, - struct thandle *handle) + struct thandle *th) { - return dt_declare_insert(env, dt_object_child(dt), rec, key, handle); + return lod_sub_object_declare_insert(env, dt_object_child(dt), + rec, key, th); } /** @@ -105,10 +104,10 @@ static int lod_index_insert(const struct lu_env *env, const struct dt_rec *rec, const struct dt_key *key, struct thandle *th, - struct lustre_capa *capa, int ign) { - return dt_insert(env, dt_object_child(dt), rec, key, th, capa, ign); + return lod_sub_object_index_insert(env, dt_object_child(dt), rec, key, + th, ign); } /** @@ -124,7 +123,8 @@ static int lod_declare_index_delete(const struct lu_env *env, const struct dt_key *key, struct thandle *th) { - return dt_declare_delete(env, dt_object_child(dt), key, th); + return lod_sub_object_declare_delete(env, dt_object_child(dt), key, + th); } /** @@ -137,10 +137,9 @@ static int lod_declare_index_delete(const struct lu_env *env, static int lod_index_delete(const struct lu_env *env, struct dt_object *dt, const struct dt_key *key, - struct thandle *th, - struct lustre_capa *capa) + struct thandle *th) { - return dt_delete(env, dt_object_child(dt), key, th, capa); + return lod_sub_object_delete(env, dt_object_child(dt), key, th); } /** @@ -151,15 +150,13 @@ static int lod_index_delete(const struct lu_env *env, * \see dt_it_ops::init() in the API description for details. */ static struct dt_it *lod_it_init(const struct lu_env *env, - struct dt_object *dt, __u32 attr, - struct lustre_capa *capa) + struct dt_object *dt, __u32 attr) { struct dt_object *next = dt_object_child(dt); struct lod_it *it = &lod_env_info(env)->lti_it; struct dt_it *it_next; - - it_next = next->do_index_ops->dio_it.init(env, next, attr, capa); + it_next = next->do_index_ops->dio_it.init(env, next, attr); if (IS_ERR(it_next)) return it_next; @@ -188,7 +185,7 @@ do { \ * * \see dt_index_operations::dio_it.fini() in the API description for details. */ -void lod_it_fini(const struct lu_env *env, struct dt_it *di) +static void lod_it_fini(const struct lu_env *env, struct dt_it *di) { struct lod_it *it = (struct lod_it *)di; @@ -207,8 +204,8 @@ void lod_it_fini(const struct lu_env *env, struct dt_it *di) * * \see dt_it_ops::get() in the API description for details. */ -int lod_it_get(const struct lu_env *env, struct dt_it *di, - const struct dt_key *key) +static int lod_it_get(const struct lu_env *env, struct dt_it *di, + const struct dt_key *key) { const struct lod_it *it = (const struct lod_it *)di; @@ -223,7 +220,7 @@ int lod_it_get(const struct lu_env *env, struct dt_it *di, * * \see dt_it_ops::put() in the API description for details. */ -void lod_it_put(const struct lu_env *env, struct dt_it *di) +static void lod_it_put(const struct lu_env *env, struct dt_it *di) { struct lod_it *it = (struct lod_it *)di; @@ -238,7 +235,7 @@ void lod_it_put(const struct lu_env *env, struct dt_it *di) * * \see dt_it_ops::next() in the API description for details. */ -int lod_it_next(const struct lu_env *env, struct dt_it *di) +static int lod_it_next(const struct lu_env *env, struct dt_it *di) { struct lod_it *it = (struct lod_it *)di; @@ -253,7 +250,8 @@ int lod_it_next(const struct lu_env *env, struct dt_it *di) * * \see dt_it_ops::key() in the API description for details. */ -struct dt_key *lod_it_key(const struct lu_env *env, const struct dt_it *di) +static struct dt_key *lod_it_key(const struct lu_env *env, + const struct dt_it *di) { const struct lod_it *it = (const struct lod_it *)di; @@ -268,7 +266,7 @@ struct dt_key *lod_it_key(const struct lu_env *env, const struct dt_it *di) * * \see dt_it_ops::key_size() in the API description for details. */ -int lod_it_key_size(const struct lu_env *env, const struct dt_it *di) +static int lod_it_key_size(const struct lu_env *env, const struct dt_it *di) { struct lod_it *it = (struct lod_it *)di; @@ -283,8 +281,8 @@ int lod_it_key_size(const struct lu_env *env, const struct dt_it *di) * * \see dt_it_ops::rec() in the API description for details. */ -int lod_it_rec(const struct lu_env *env, const struct dt_it *di, - struct dt_rec *rec, __u32 attr) +static int lod_it_rec(const struct lu_env *env, const struct dt_it *di, + struct dt_rec *rec, __u32 attr) { const struct lod_it *it = (const struct lod_it *)di; @@ -300,8 +298,8 @@ int lod_it_rec(const struct lu_env *env, const struct dt_it *di, * * \see dt_it_ops::rec_size() in the API description for details. */ -int lod_it_rec_size(const struct lu_env *env, const struct dt_it *di, - __u32 attr) +static int lod_it_rec_size(const struct lu_env *env, const struct dt_it *di, + __u32 attr) { const struct lod_it *it = (const struct lod_it *)di; @@ -317,7 +315,7 @@ int lod_it_rec_size(const struct lu_env *env, const struct dt_it *di, * * \see dt_it_ops::store() in the API description for details. */ -__u64 lod_it_store(const struct lu_env *env, const struct dt_it *di) +static __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di) { const struct lod_it *it = (const struct lod_it *)di; @@ -332,7 +330,8 @@ __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di) * * \see dt_it_ops::load() in the API description for details. */ -int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash) +static int lod_it_load(const struct lu_env *env, const struct dt_it *di, + __u64 hash) { const struct lod_it *it = (const struct lod_it *)di; @@ -347,8 +346,8 @@ int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash) * * \see dt_it_ops::rec() in the API description for details. */ -int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di, - void *key_rec) +static int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di, + void *key_rec) { const struct lod_it *it = (const struct lod_it *)di; @@ -388,8 +387,7 @@ static struct dt_index_operations lod_index_ops = { * \see dt_it_ops::init() in the API description for details. */ static struct dt_it *lod_striped_it_init(const struct lu_env *env, - struct dt_object *dt, __u32 attr, - struct lustre_capa *capa) + struct dt_object *dt, __u32 attr) { struct lod_object *lo = lod_dt_obj(dt); struct dt_object *next; @@ -402,7 +400,7 @@ static struct dt_it *lod_striped_it_init(const struct lu_env *env, LASSERT(next != NULL); LASSERT(next->do_index_ops != NULL); - it_next = next->do_index_ops->dio_it.init(env, next, attr, capa); + it_next = next->do_index_ops->dio_it.init(env, next, attr); if (IS_ERR(it_next)) return it_next; @@ -573,8 +571,7 @@ again: LASSERT(next != NULL); LASSERT(next->do_index_ops != NULL); - it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr, - BYPASS_CAPA); + it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr); if (!IS_ERR(it_next)) { it->lit_it = it_next; goto again; @@ -824,7 +821,7 @@ int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo, memset(&lmv1->lmv_stripe_fids[0], 0, stripes * sizeof(struct lu_fid)); iops = &obj->do_index_ops->dio_it; - it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA); + it = iops->init(env, obj, LUDA_64BITHASH); if (IS_ERR(it)) RETURN(PTR_ERR(it)); @@ -1051,14 +1048,13 @@ static int lod_object_write_locked(const struct lu_env *env, */ static int lod_attr_get(const struct lu_env *env, struct dt_object *dt, - struct lu_attr *attr, - struct lustre_capa *capa) + struct lu_attr *attr) { /* Note: for striped directory, client will merge attributes * from all of the sub-stripes see lmv_merge_attr(), and there * no MDD logic depend on directory nlink/size/time, so we can * always use master inode nlink and size for now. */ - return dt_attr_get(env, dt_object_child(dt), attr, capa); + return dt_attr_get(env, dt_object_child(dt), attr); } /** @@ -1081,7 +1077,7 @@ static int lod_attr_get(const struct lu_env *env, **/ static int lod_mark_dead_object(const struct lu_env *env, struct dt_object *dt, - struct thandle *handle, + struct thandle *th, bool declare) { struct lod_object *lo = lod_dt_obj(dt); @@ -1117,13 +1113,14 @@ static int lod_mark_dead_object(const struct lu_env *env, buf.lb_buf = lmv; buf.lb_len = sizeof(*lmv); if (declare) { - rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], &buf, - XATTR_NAME_LMV, - LU_XATTR_REPLACE, handle); + rc = lod_sub_object_declare_xattr_set(env, + lo->ldo_stripe[i], &buf, + XATTR_NAME_LMV, + LU_XATTR_REPLACE, th); } else { - rc = dt_xattr_set(env, lo->ldo_stripe[i], &buf, - XATTR_NAME_LMV, LU_XATTR_REPLACE, - handle, BYPASS_CAPA); + rc = lod_sub_object_xattr_set(env, lo->ldo_stripe[i], + &buf, XATTR_NAME_LMV, + LU_XATTR_REPLACE, th); } if (rc != 0) break; @@ -1143,7 +1140,7 @@ static int lod_mark_dead_object(const struct lu_env *env, static int lod_declare_attr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_attr *attr, - struct thandle *handle) + struct thandle *th) { struct dt_object *next = dt_object_child(dt); struct lod_object *lo = lod_dt_obj(dt); @@ -1153,14 +1150,14 @@ static int lod_declare_attr_set(const struct lu_env *env, /* Set dead object on all other stripes */ if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) && attr->la_flags & LUSTRE_SLAVE_DEAD_FL) { - rc = lod_mark_dead_object(env, dt, handle, true); + rc = lod_mark_dead_object(env, dt, th, true); RETURN(rc); } /* * declare setattr on the local object */ - rc = dt_declare_attr_set(env, next, attr, handle); + rc = lod_sub_object_declare_attr_set(env, next, attr, th); if (rc) RETURN(rc); @@ -1199,20 +1196,20 @@ static int lod_declare_attr_set(const struct lu_env *env, */ LASSERT(lo->ldo_stripe); for (i = 0; i < lo->ldo_stripenr; i++) { - if (likely(lo->ldo_stripe[i] != NULL)) { - rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr, - handle); - if (rc != 0) { - CERROR("failed declaration: %d\n", rc); - break; - } - } + if (lo->ldo_stripe[i] == NULL) + continue; + rc = lod_sub_object_declare_attr_set(env, + lo->ldo_stripe[i], attr, + th); + if (rc != 0) + RETURN(rc); } if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) && dt_object_exists(next) != 0 && dt_object_remote(next) == 0) - dt_declare_xattr_del(env, next, XATTR_NAME_LOV, handle); + lod_sub_object_declare_xattr_del(env, next, + XATTR_NAME_LOV, th); if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) && dt_object_exists(next) && @@ -1222,8 +1219,9 @@ static int lod_declare_attr_set(const struct lu_env *env, buf->lb_buf = info->lti_ea_store; buf->lb_len = info->lti_ea_store_size; - dt_declare_xattr_set(env, next, buf, XATTR_NAME_LOV, - LU_XATTR_REPLACE, handle); + lod_sub_object_declare_xattr_set(env, next, buf, + XATTR_NAME_LOV, + LU_XATTR_REPLACE, th); } RETURN(rc); @@ -1240,8 +1238,7 @@ static int lod_declare_attr_set(const struct lu_env *env, static int lod_attr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_attr *attr, - struct thandle *handle, - struct lustre_capa *capa) + struct thandle *th) { struct dt_object *next = dt_object_child(dt); struct lod_object *lo = lod_dt_obj(dt); @@ -1251,14 +1248,14 @@ static int lod_attr_set(const struct lu_env *env, /* Set dead object on all other stripes */ if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) && attr->la_flags & LUSTRE_SLAVE_DEAD_FL) { - rc = lod_mark_dead_object(env, dt, handle, false); + rc = lod_mark_dead_object(env, dt, th, false); RETURN(rc); } /* * apply changes to the local object */ - rc = dt_attr_set(env, next, attr, handle, capa); + rc = lod_sub_object_attr_set(env, next, attr, th); if (rc) RETURN(rc); @@ -1284,21 +1281,20 @@ static int lod_attr_set(const struct lu_env *env, for (i = 0; i < lo->ldo_stripenr; i++) { if (unlikely(lo->ldo_stripe[i] == NULL)) continue; + if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && (dt_object_exists(lo->ldo_stripe[i]) == 0)) continue; - rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle, capa); - if (rc != 0) { - CERROR("failed declaration: %d\n", rc); + rc = lod_sub_object_attr_set(env, lo->ldo_stripe[i], attr, th); + if (rc != 0) break; - } } if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) && dt_object_exists(next) != 0 && dt_object_remote(next) == 0) - dt_xattr_del(env, next, XATTR_NAME_LOV, handle, BYPASS_CAPA); + rc = lod_sub_object_xattr_del(env, next, XATTR_NAME_LOV, th); if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) && dt_object_exists(next) && @@ -1329,8 +1325,9 @@ static int lod_attr_set(const struct lu_env *env, fid->f_oid--; fid_to_ostid(fid, oi); ostid_cpu_to_le(oi, &objs->l_ost_oi); - dt_xattr_set(env, next, buf, XATTR_NAME_LOV, - LU_XATTR_REPLACE, handle, BYPASS_CAPA); + + rc = lod_sub_object_xattr_set(env, next, buf, XATTR_NAME_LOV, + LU_XATTR_REPLACE, th); } RETURN(rc); @@ -1345,15 +1342,14 @@ static int lod_attr_set(const struct lu_env *env, * \see dt_object_operations::do_xattr_get() in the API description for details. */ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, - struct lu_buf *buf, const char *name, - struct lustre_capa *capa) + struct lu_buf *buf, const char *name) { struct lod_thread_info *info = lod_env_info(env); struct lod_device *dev = lu2lod_dev(dt->do_lu.lo_dev); int rc, is_root; ENTRY; - rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa); + rc = dt_xattr_get(env, dt_object_child(dt), buf, name); if (strcmp(name, XATTR_NAME_LMV) == 0) { struct lmv_mds_md_v1 *lmv1; int rc1 = 0; @@ -1370,7 +1366,7 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, info->lti_buf.lb_buf = info->lti_key; info->lti_buf.lb_len = sizeof(*lmv1); rc = dt_xattr_get(env, dt_object_child(dt), - &info->lti_buf, name, capa); + &info->lti_buf, name); if (unlikely(rc != sizeof(*lmv1))) RETURN(rc = rc > 0 ? -EINVAL : rc); @@ -1432,7 +1428,7 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, /** * Verify LVM EA. * - * Checks that the magic and the number of the stripes are sane. + * Checks that the magic of the stripe is sane. * * \param[in] lod lod device * \param[in] lum a buffer storing LMV EA to verify @@ -1443,22 +1439,16 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, static int lod_verify_md_striping(struct lod_device *lod, const struct lmv_user_md_v1 *lum) { - int rc = 0; - ENTRY; - - if (unlikely(le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC)) - GOTO(out, rc = -EINVAL); - - if (unlikely(le32_to_cpu(lum->lum_stripe_count) == 0)) - GOTO(out, rc = -EINVAL); -out: - if (rc != 0) + if (unlikely(le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC)) { CERROR("%s: invalid lmv_user_md: magic = %x, " "stripe_offset = %d, stripe_count = %u: rc = %d\n", lod2obd(lod)->obd_name, le32_to_cpu(lum->lum_magic), (int)le32_to_cpu(lum->lum_stripe_offset), - le32_to_cpu(lum->lum_stripe_count), rc); - return rc; + le32_to_cpu(lum->lum_stripe_count), -EINVAL); + return -EINVAL; + } + + return 0; } /** @@ -1490,8 +1480,8 @@ static void lod_prep_slave_lmv_md(struct lmv_mds_md_v1 *slave_lmv, * \retval 0 on success * \retval negative if failed */ -int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt, - struct lu_buf *lmv_buf) +static int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt, + struct lu_buf *lmv_buf) { struct lod_thread_info *info = lod_env_info(env); struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); @@ -1527,7 +1517,6 @@ int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt, lmm1->lmv_master_mdt_index = cpu_to_le32(mdtidx); lmv_buf->lb_buf = info->lti_ea_store; lmv_buf->lb_len = sizeof(*lmm1); - lo->ldo_dir_striping_cached = 1; RETURN(rc); } @@ -1644,6 +1633,139 @@ out: * \retval 0 on success * \retval negative if failed */ +static int lod_dir_declare_create_stripes(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct dt_object_format *dof, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lu_buf lmv_buf; + struct lu_buf slave_lmv_buf; + struct lmv_mds_md_v1 *lmm; + struct lmv_mds_md_v1 *slave_lmm = NULL; + struct dt_insert_rec *rec = &info->lti_dt_rec; + struct lod_object *lo = lod_dt_obj(dt); + int rc; + __u32 i; + ENTRY; + + rc = lod_prep_lmv_md(env, dt, &lmv_buf); + if (rc != 0) + GOTO(out, rc); + lmm = lmv_buf.lb_buf; + + OBD_ALLOC_PTR(slave_lmm); + if (slave_lmm == NULL) + GOTO(out, rc = -ENOMEM); + + lod_prep_slave_lmv_md(slave_lmm, lmm); + slave_lmv_buf.lb_buf = slave_lmm; + slave_lmv_buf.lb_len = sizeof(*slave_lmm); + + if (!dt_try_as_dir(env, dt_object_child(dt))) + GOTO(out, rc = -EINVAL); + + rec->rec_type = S_IFDIR; + for (i = 0; i < lo->ldo_stripenr; i++) { + struct dt_object *dto = lo->ldo_stripe[i]; + char *stripe_name = info->lti_key; + struct lu_name *sname; + struct linkea_data ldata = { NULL }; + struct lu_buf linkea_buf; + + rc = lod_sub_object_declare_create(env, dto, attr, NULL, + dof, th); + if (rc != 0) + GOTO(out, rc); + + if (!dt_try_as_dir(env, dto)) + GOTO(out, rc = -EINVAL); + + rc = lod_sub_object_declare_ref_add(env, dto, th); + if (rc != 0) + GOTO(out, rc); + + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = lod_sub_object_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dot, th); + if (rc != 0) + GOTO(out, rc); + + /* master stripe FID will be put to .. */ + rec->rec_fid = lu_object_fid(&dt->do_lu); + rc = lod_sub_object_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dotdot, + th); + if (rc != 0) + GOTO(out, rc); + + if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || + cfs_fail_val != i) { + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) && + cfs_fail_val == i) + slave_lmm->lmv_master_mdt_index = + cpu_to_le32(i + 1); + else + slave_lmm->lmv_master_mdt_index = + cpu_to_le32(i); + rc = lod_sub_object_declare_xattr_set(env, dto, + &slave_lmv_buf, XATTR_NAME_LMV, 0, th); + if (rc != 0) + GOTO(out, rc); + } + + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && + cfs_fail_val == i) + snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", + PFID(lu_object_fid(&dto->do_lu)), i + 1); + else + snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", + PFID(lu_object_fid(&dto->do_lu)), i); + + sname = lod_name_get(env, stripe_name, strlen(stripe_name)); + rc = linkea_data_new(&ldata, &info->lti_linkea_buf); + if (rc != 0) + GOTO(out, rc); + + rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu)); + if (rc != 0) + GOTO(out, rc); + + linkea_buf.lb_buf = ldata.ld_buf->lb_buf; + linkea_buf.lb_len = ldata.ld_leh->leh_len; + rc = lod_sub_object_declare_xattr_set(env, dto, &linkea_buf, + XATTR_NAME_LINK, 0, th); + if (rc != 0) + GOTO(out, rc); + + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = lod_sub_object_declare_insert(env, dt_object_child(dt), + (const struct dt_rec *)rec, + (const struct dt_key *)stripe_name, + th); + if (rc != 0) + GOTO(out, rc); + + rc = lod_sub_object_declare_ref_add(env, dt_object_child(dt), + th); + if (rc != 0) + GOTO(out, rc); + } + + rc = lod_sub_object_declare_xattr_set(env, dt_object_child(dt), + &lmv_buf, XATTR_NAME_LMV, 0, th); + if (rc != 0) + GOTO(out, rc); +out: + if (slave_lmm != NULL) + OBD_FREE_PTR(slave_lmm); + + RETURN(rc); +} + static int lod_prep_md_striped_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, @@ -1654,13 +1776,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env, struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); struct lod_tgt_descs *ltd = &lod->lod_mdt_descs; struct lod_object *lo = lod_dt_obj(dt); - struct lod_thread_info *info = lod_env_info(env); struct dt_object **stripe; - struct lu_buf lmv_buf; - struct lu_buf slave_lmv_buf; - struct lmv_mds_md_v1 *lmm; - struct lmv_mds_md_v1 *slave_lmm = NULL; - struct dt_insert_rec *rec = &info->lti_dt_rec; __u32 stripe_count; int *idx_array; int rc = 0; @@ -1789,141 +1905,7 @@ next: if (lo->ldo_stripenr == 0) GOTO(out_put, rc = -ENOSPC); - rc = lod_prep_lmv_md(env, dt, &lmv_buf); - if (rc != 0) - GOTO(out_put, rc); - lmm = lmv_buf.lb_buf; - - OBD_ALLOC_PTR(slave_lmm); - if (slave_lmm == NULL) - GOTO(out_put, rc = -ENOMEM); - - lod_prep_slave_lmv_md(slave_lmm, lmm); - slave_lmv_buf.lb_buf = slave_lmm; - slave_lmv_buf.lb_len = sizeof(*slave_lmm); - - if (!dt_try_as_dir(env, dt_object_child(dt))) - GOTO(out_put, rc = -EINVAL); - - rec->rec_type = S_IFDIR; - for (i = 0; i < lo->ldo_stripenr; i++) { - struct dt_object *dto = stripe[i]; - char *stripe_name = info->lti_key; - struct lu_name *sname; - struct linkea_data ldata = { 0 }; - struct lu_buf linkea_buf; - - rc = dt_declare_create(env, dto, attr, NULL, dof, th); - if (rc != 0) - GOTO(out_put, rc); - - if (!dt_try_as_dir(env, dto)) - GOTO(out_put, rc = -EINVAL); - - rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec, - (const struct dt_key *)dot, th); - if (rc != 0) - GOTO(out_put, rc); - - /* master stripe FID will be put to .. */ - rec->rec_fid = lu_object_fid(&dt->do_lu); - rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec, - (const struct dt_key *)dotdot, th); - if (rc != 0) - GOTO(out_put, rc); - - /* probably nothing to inherite */ - if (lo->ldo_striping_cached && - !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size, - lo->ldo_def_stripenr, - lo->ldo_def_stripe_offset, - lo->ldo_pool)) { - struct lov_user_md_v3 *v3; - - /* sigh, lti_ea_store has been used for lmv_buf, - * so we have to allocate buffer for default - * stripe EA */ - OBD_ALLOC_PTR(v3); - if (v3 == NULL) - GOTO(out_put, rc = -ENOMEM); - - memset(v3, 0, sizeof(*v3)); - v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); - v3->lmm_stripe_count = - cpu_to_le16(lo->ldo_def_stripenr); - v3->lmm_stripe_offset = - cpu_to_le16(lo->ldo_def_stripe_offset); - v3->lmm_stripe_size = - cpu_to_le32(lo->ldo_def_stripe_size); - if (lo->ldo_pool != NULL) - strlcpy(v3->lmm_pool_name, lo->ldo_pool, - sizeof(v3->lmm_pool_name)); - - info->lti_buf.lb_buf = v3; - info->lti_buf.lb_len = sizeof(*v3); - rc = dt_declare_xattr_set(env, dto, - &info->lti_buf, - XATTR_NAME_LOV, - 0, th); - OBD_FREE_PTR(v3); - if (rc != 0) - GOTO(out_put, rc); - } - - if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || - cfs_fail_val != i) { - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) && - cfs_fail_val == i) - slave_lmm->lmv_master_mdt_index = - cpu_to_le32(i + 1); - else - slave_lmm->lmv_master_mdt_index = - cpu_to_le32(i); - rc = dt_declare_xattr_set(env, dto, &slave_lmv_buf, - XATTR_NAME_LMV, 0, th); - if (rc != 0) - GOTO(out_put, rc); - } - - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && - cfs_fail_val == i) - snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", - PFID(lu_object_fid(&dto->do_lu)), i + 1); - else - snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", - PFID(lu_object_fid(&dto->do_lu)), i); - - sname = lod_name_get(env, stripe_name, strlen(stripe_name)); - rc = linkea_data_new(&ldata, &info->lti_linkea_buf); - if (rc != 0) - GOTO(out_put, rc); - - rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu)); - if (rc != 0) - GOTO(out_put, rc); - - linkea_buf.lb_buf = ldata.ld_buf->lb_buf; - linkea_buf.lb_len = ldata.ld_leh->leh_len; - rc = dt_declare_xattr_set(env, dto, &linkea_buf, - XATTR_NAME_LINK, 0, th); - if (rc != 0) - GOTO(out_put, rc); - - rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = dt_declare_insert(env, dt_object_child(dt), - (const struct dt_rec *)rec, - (const struct dt_key *)stripe_name, th); - if (rc != 0) - GOTO(out_put, rc); - - rc = dt_declare_ref_add(env, dt_object_child(dt), th); - if (rc != 0) - GOTO(out_put, rc); - } - - rc = dt_declare_xattr_set(env, dt_object_child(dt), &lmv_buf, - XATTR_NAME_LMV, 0, th); + rc = lod_dir_declare_create_stripes(env, dt, attr, dof, th); if (rc != 0) GOTO(out_put, rc); @@ -1941,8 +1923,6 @@ out_put: out_free: if (idx_array != NULL) OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count); - if (slave_lmm != NULL) - OBD_FREE_PTR(slave_lmm); RETURN(rc); } @@ -2039,7 +2019,7 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env, RETURN(rc); } - rc = dt_declare_xattr_set(env, next, buf, name, fl, th); + rc = lod_sub_object_declare_xattr_set(env, next, buf, name, fl, th); if (rc != 0) RETURN(rc); @@ -2057,8 +2037,9 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env, for (i = 0; i < lo->ldo_stripenr; i++) { LASSERT(lo->ldo_stripe[i]); - rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], buf, - name, fl, th); + + rc = lod_sub_object_declare_xattr_set(env, lo->ldo_stripe[i], + buf, name, fl, th); if (rc != 0) break; } @@ -2101,7 +2082,7 @@ static int lod_declare_xattr_set(const struct lu_env *env, * this is a request to manipulate object's striping */ if (dt_object_exists(dt)) { - rc = dt_attr_get(env, next, attr, BYPASS_CAPA); + rc = dt_attr_get(env, next, attr); if (rc) RETURN(rc); } else { @@ -2113,7 +2094,8 @@ static int lod_declare_xattr_set(const struct lu_env *env, } else if (S_ISDIR(mode)) { rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th); } else { - rc = dt_declare_xattr_set(env, next, buf, name, fl, th); + rc = lod_sub_object_declare_xattr_set(env, next, buf, name, + fl, th); } RETURN(rc); @@ -2126,13 +2108,13 @@ static int lod_declare_xattr_set(const struct lu_env *env, */ static void lod_lov_stripe_cache_clear(struct lod_object *lo) { - lo->ldo_striping_cached = 0; lo->ldo_def_striping_set = 0; + lo->ldo_def_striping_cached = 0; lod_object_set_pool(lo, NULL); lo->ldo_def_stripe_size = 0; lo->ldo_def_stripenr = 0; if (lo->ldo_dir_stripe != NULL) - lo->ldo_dir_striping_cached = 0; + lo->ldo_dir_def_striping_cached = 0; } /** @@ -2146,7 +2128,6 @@ static void lod_lov_stripe_cache_clear(struct lod_object *lo) * \param[in] name name of xattr * \param[in] fl flags * \param[in] th transaction handle - * \param[in] capa not used currently * * \retval 0 on success * \retval negative if failed @@ -2154,8 +2135,7 @@ static void lod_lov_stripe_cache_clear(struct lod_object *lo) static int lod_xattr_set_internal(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, - const char *name, int fl, struct thandle *th, - struct lustre_capa *capa) + const char *name, int fl, struct thandle *th) { struct dt_object *next = dt_object_child(dt); struct lod_object *lo = lod_dt_obj(dt); @@ -2163,7 +2143,7 @@ static int lod_xattr_set_internal(const struct lu_env *env, int i; ENTRY; - rc = dt_xattr_set(env, next, buf, name, fl, th, capa); + rc = lod_sub_object_xattr_set(env, next, buf, name, fl, th); if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) RETURN(rc); @@ -2176,8 +2156,9 @@ static int lod_xattr_set_internal(const struct lu_env *env, for (i = 0; i < lo->ldo_stripenr; i++) { LASSERT(lo->ldo_stripe[i]); - rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th, - capa); + + rc = lod_sub_object_xattr_set(env, lo->ldo_stripe[i], buf, name, + fl, th); if (rc != 0) break; } @@ -2194,15 +2175,13 @@ static int lod_xattr_set_internal(const struct lu_env *env, * \param[in] dt object * \param[in] name name of xattr * \param[in] th transaction handle - * \param[in] capa not used currently * * \retval 0 on success * \retval negative if failed */ static int lod_xattr_del_internal(const struct lu_env *env, struct dt_object *dt, - const char *name, struct thandle *th, - struct lustre_capa *capa) + const char *name, struct thandle *th) { struct dt_object *next = dt_object_child(dt); struct lod_object *lo = lod_dt_obj(dt); @@ -2210,7 +2189,7 @@ static int lod_xattr_del_internal(const struct lu_env *env, int i; ENTRY; - rc = dt_xattr_del(env, next, name, th, capa); + rc = lod_sub_object_xattr_del(env, next, name, th); if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) RETURN(rc); @@ -2219,8 +2198,9 @@ static int lod_xattr_del_internal(const struct lu_env *env, for (i = 0; i < lo->ldo_stripenr; i++) { LASSERT(lo->ldo_stripe[i]); - rc = dt_xattr_del(env, lo->ldo_stripe[i], name, th, - capa); + + rc = lod_sub_object_xattr_del(env, lo->ldo_stripe[i], name, + th); if (rc != 0) break; } @@ -2242,7 +2222,6 @@ static int lod_xattr_del_internal(const struct lu_env *env, * \param[in] name name of EA * \param[in] fl xattr flag (see OSD API description) * \param[in] th transaction handle - * \param[in] capa not used * * \retval 0 on success * \retval negative if failed @@ -2251,8 +2230,7 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int fl, - struct thandle *th, - struct lustre_capa *capa) + struct thandle *th) { struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); struct lod_object *l = lod_dt_obj(dt); @@ -2292,11 +2270,11 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, if (LOVEA_DELETE_VALUES(lum->lmm_stripe_size, lum->lmm_stripe_count, lum->lmm_stripe_offset, pool_name)) { - rc = lod_xattr_del_internal(env, dt, name, th, capa); + rc = lod_xattr_del_internal(env, dt, name, th); if (rc == -ENODATA) rc = 0; } else { - rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa); + rc = lod_xattr_set_internal(env, dt, buf, name, fl, th); } RETURN(rc); @@ -2316,7 +2294,6 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, * \param[in] name name of EA * \param[in] fl xattr flag (see OSD API description) * \param[in] th transaction handle - * \param[in] capa not used * * \retval 0 on success * \retval negative if failed @@ -2325,8 +2302,7 @@ static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int fl, - struct thandle *th, - struct lustre_capa *capa) + struct thandle *th) { struct lod_object *l = lod_dt_obj(dt); struct lmv_user_md_v1 *lum; @@ -2343,11 +2319,11 @@ static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env, if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)), le32_to_cpu(lum->lum_stripe_offset)) && le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) { - rc = lod_xattr_del_internal(env, dt, name, th, capa); + rc = lod_xattr_del_internal(env, dt, name, th); if (rc == -ENODATA) rc = 0; } else { - rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa); + rc = lod_xattr_set_internal(env, dt, buf, name, fl, th); if (rc != 0) RETURN(rc); } @@ -2359,10 +2335,7 @@ static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env, RETURN(-ENOMEM); } - l->ldo_dir_striping_cached = 0; - l->ldo_dir_def_striping_set = 1; - l->ldo_dir_def_stripenr = le32_to_cpu(lum->lum_stripe_count); - + l->ldo_dir_def_striping_cached = 0; RETURN(rc); } @@ -2381,15 +2354,13 @@ static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env, * \param[in] name not used currently * \param[in] fl xattr flag (see OSD API description) * \param[in] th transaction handle - * \param[in] capa not used * * \retval 0 on success * \retval negative if failed */ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, - int fl, struct thandle *th, - struct lustre_capa *capa) + int fl, struct thandle *th) { struct lod_object *lo = lod_dt_obj(dt); struct lod_thread_info *info = lod_env_info(env); @@ -2412,11 +2383,12 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, if (lo->ldo_stripenr == 0) RETURN(0); - rc = dt_attr_get(env, dt_object_child(dt), attr, BYPASS_CAPA); + rc = dt_attr_get(env, dt_object_child(dt), attr); if (rc != 0) RETURN(rc); - attr->la_valid = LA_TYPE | LA_MODE; + attr->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | + LA_MODE | LA_UID | LA_GID | LA_TYPE; dof->dof_type = DFT_DIR; rc = lod_prep_lmv_md(env, dt, &lmv_buf); @@ -2434,65 +2406,39 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, rec->rec_type = S_IFDIR; for (i = 0; i < lo->ldo_stripenr; i++) { - struct dt_object *dto; - char *stripe_name = info->lti_key; + struct dt_object *dto; + char *stripe_name = info->lti_key; struct lu_name *sname; - struct linkea_data ldata = { 0 }; + struct linkea_data ldata = { NULL }; struct lu_buf linkea_buf; dto = lo->ldo_stripe[i]; + dt_write_lock(env, dto, MOR_TGT_CHILD); - rc = dt_create(env, dto, attr, NULL, dof, th); + rc = lod_sub_object_create(env, dto, attr, NULL, dof, + th); + if (rc != 0) { + dt_write_unlock(env, dto); + GOTO(out, rc); + } + + rc = lod_sub_object_ref_add(env, dto, th); dt_write_unlock(env, dto); if (rc != 0) - RETURN(rc); + GOTO(out, rc); rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = dt_insert(env, dto, (const struct dt_rec *)rec, - (const struct dt_key *)dot, th, capa, 0); + rc = lod_sub_object_index_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dot, th, 0); if (rc != 0) - RETURN(rc); + GOTO(out, rc); rec->rec_fid = lu_object_fid(&dt->do_lu); - rc = dt_insert(env, dto, (struct dt_rec *)rec, - (const struct dt_key *)dotdot, th, capa, 0); + rc = lod_sub_object_index_insert(env, dto, (struct dt_rec *)rec, + (const struct dt_key *)dotdot, th, 0); if (rc != 0) - RETURN(rc); - - if (lo->ldo_striping_cached && - !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size, - lo->ldo_def_stripenr, - lo->ldo_def_stripe_offset, - lo->ldo_pool)) { - struct lov_user_md_v3 *v3; - - /* sigh, lti_ea_store has been used for lmv_buf, - * so we have to allocate buffer for default - * stripe EA */ - OBD_ALLOC_PTR(v3); - if (v3 == NULL) - GOTO(out, rc); - - memset(v3, 0, sizeof(*v3)); - v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); - v3->lmm_stripe_count = - cpu_to_le16(lo->ldo_def_stripenr); - v3->lmm_stripe_offset = - cpu_to_le16(lo->ldo_def_stripe_offset); - v3->lmm_stripe_size = - cpu_to_le32(lo->ldo_def_stripe_size); - if (lo->ldo_pool != NULL) - strlcpy(v3->lmm_pool_name, lo->ldo_pool, - sizeof(v3->lmm_pool_name)); - - info->lti_buf.lb_buf = v3; - info->lti_buf.lb_len = sizeof(*v3); - rc = dt_xattr_set(env, dto, &info->lti_buf, - XATTR_NAME_LOV, 0, th, capa); - OBD_FREE_PTR(v3); - if (rc != 0) - GOTO(out, rc); - } + GOTO(out, rc); if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || cfs_fail_val != i) { @@ -2503,8 +2449,9 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, else slave_lmm->lmv_master_mdt_index = cpu_to_le32(i); - rc = dt_xattr_set(env, dto, &slave_lmv_buf, - XATTR_NAME_LMV, fl, th, capa); + + rc = lod_sub_object_xattr_set(env, dto, &slave_lmv_buf, + XATTR_NAME_LMV, fl, th); if (rc != 0) GOTO(out, rc); } @@ -2528,27 +2475,26 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, linkea_buf.lb_buf = ldata.ld_buf->lb_buf; linkea_buf.lb_len = ldata.ld_leh->leh_len; - rc = dt_xattr_set(env, dto, &linkea_buf, XATTR_NAME_LINK, - 0, th, BYPASS_CAPA); + rc = lod_sub_object_xattr_set(env, dto, &linkea_buf, + XATTR_NAME_LINK, 0, th); if (rc != 0) GOTO(out, rc); rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = dt_insert(env, dt_object_child(dt), + rc = lod_sub_object_index_insert(env, dt_object_child(dt), (const struct dt_rec *)rec, - (const struct dt_key *)stripe_name, th, capa, 0); + (const struct dt_key *)stripe_name, th, 0); if (rc != 0) GOTO(out, rc); - rc = dt_ref_add(env, dt_object_child(dt), th); + rc = lod_sub_object_ref_add(env, dt_object_child(dt), th); if (rc != 0) GOTO(out, rc); } if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MASTER_LMV)) - rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf, - XATTR_NAME_LMV, fl, th, capa); - + rc = lod_sub_object_xattr_set(env, dt_object_child(dt), + &lmv_buf, XATTR_NAME_LMV, fl, th); out: if (slave_lmm != NULL) OBD_FREE_PTR(slave_lmm); @@ -2578,12 +2524,12 @@ out: * \retval 0 on success * \retval negative if failed */ -int lod_dir_striping_create_internal(const struct lu_env *env, - struct dt_object *dt, - struct lu_attr *attr, - struct dt_object_format *dof, - struct thandle *th, - bool declare) +static int lod_dir_striping_create_internal(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct dt_object_format *dof, + struct thandle *th, + bool declare) { struct lod_thread_info *info = lod_env_info(env); struct lod_object *lo = lod_dt_obj(dt); @@ -2616,14 +2562,13 @@ int lod_dir_striping_create_internal(const struct lu_env *env, &info->lti_buf, dof, th); else rc = lod_xattr_set_lmv(env, dt, &info->lti_buf, - XATTR_NAME_LMV, 0, th, - BYPASS_CAPA); + XATTR_NAME_LMV, 0, th); if (rc != 0) RETURN(rc); } /* Transfer default LMV striping from the parent */ - if (lo->ldo_dir_striping_cached && + if (lo->ldo_dir_def_striping_set && !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr, lo->ldo_dir_def_stripe_offset)) { struct lmv_user_md_v1 *v1 = info->lti_ea_store; @@ -2654,13 +2599,13 @@ int lod_dir_striping_create_internal(const struct lu_env *env, rc = lod_xattr_set_default_lmv_on_dir(env, dt, &info->lti_buf, XATTR_NAME_DEFAULT_LMV, 0, - th, BYPASS_CAPA); + th); if (rc != 0) RETURN(rc); } /* Transfer default LOV striping from the parent */ - if (lo->ldo_striping_cached && + if (lo->ldo_def_striping_set && !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size, lo->ldo_def_stripenr, lo->ldo_def_stripe_offset, @@ -2691,8 +2636,7 @@ int lod_dir_striping_create_internal(const struct lu_env *env, XATTR_NAME_LOV, 0, th); else rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf, - XATTR_NAME_LOV, 0, th, - BYPASS_CAPA); + XATTR_NAME_LOV, 0, th); if (rc != 0) RETURN(rc); } @@ -2715,7 +2659,14 @@ static int lod_dir_striping_create(const struct lu_env *env, struct dt_object_format *dof, struct thandle *th) { - return lod_dir_striping_create_internal(env, dt, attr, dof, th, false); + struct lod_object *lo = lod_dt_obj(dt); + int rc; + + rc = lod_dir_striping_create_internal(env, dt, attr, dof, th, false); + if (rc == 0) + lo->ldo_striping_cached = 1; + + return rc; } /** @@ -2738,8 +2689,7 @@ static int lod_dir_striping_create(const struct lu_env *env, */ static int lod_xattr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, - const char *name, int fl, struct thandle *th, - struct lustre_capa *capa) + const char *name, int fl, struct thandle *th) { struct dt_object *next = dt_object_child(dt); int rc; @@ -2751,7 +2701,8 @@ static int lod_xattr_set(const struct lu_env *env, if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION) - rc = dt_xattr_set(env, next, buf, name, fl, th, capa); + rc = lod_sub_object_xattr_set(env, next, buf, name, fl, + th); else rc = lod_dir_striping_create(env, dt, NULL, NULL, th); @@ -2761,25 +2712,34 @@ static int lod_xattr_set(const struct lu_env *env, if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && strcmp(name, XATTR_NAME_LOV) == 0) { /* default LOVEA */ - rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th, capa); + rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th); RETURN(rc); } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { /* default LMVEA */ rc = lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl, - th, capa); + th); RETURN(rc); } else if (S_ISREG(dt->do_lu.lo_header->loh_attr) && !strcmp(name, XATTR_NAME_LOV)) { /* in case of lov EA swap, just set it * if not, it is a replay so check striping match what we * already have during req replay, declare_xattr_set() - * defines striping, then create() does the work - */ + * defines striping, then create() does the work */ if (fl & LU_XATTR_REPLACE) { /* free stripes, then update disk */ lod_object_free_striping(env, lod_dt_obj(dt)); - rc = dt_xattr_set(env, next, buf, name, fl, th, capa); + + rc = lod_sub_object_xattr_set(env, next, buf, name, + fl, th); + } else if (dt_object_remote(dt)) { + /* This only happens during migration, see + * mdd_migrate_create(), in which Master MDT will + * create a remote target object, and only set + * (migrating) stripe EA on the remote object, + * and does not need creating each stripes. */ + rc = lod_sub_object_xattr_set(env, next, buf, name, + fl, th); } else { rc = lod_striping_create(env, dt, NULL, NULL, th); } @@ -2787,7 +2747,7 @@ static int lod_xattr_set(const struct lu_env *env, } /* then all other xattr */ - rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa); + rc = lod_xattr_set_internal(env, dt, buf, name, fl, th); RETURN(rc); } @@ -2802,7 +2762,36 @@ static int lod_declare_xattr_del(const struct lu_env *env, struct dt_object *dt, const char *name, struct thandle *th) { - return dt_declare_xattr_del(env, dt_object_child(dt), name, th); + struct lod_object *lo = lod_dt_obj(dt); + int rc; + int i; + ENTRY; + + rc = lod_sub_object_declare_xattr_del(env, dt_object_child(dt), + name, th); + if (rc != 0) + RETURN(rc); + + if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) + RETURN(0); + + /* set xattr to each stripes, if needed */ + rc = lod_load_striping(env, lo); + if (rc != 0) + RETURN(rc); + + if (lo->ldo_stripenr == 0) + RETURN(0); + + for (i = 0; i < lo->ldo_stripenr; i++) { + LASSERT(lo->ldo_stripe[i]); + rc = lod_sub_object_declare_xattr_del(env, lo->ldo_stripe[i], + name, th); + if (rc != 0) + break; + } + + RETURN(rc); } /** @@ -2814,12 +2803,33 @@ static int lod_declare_xattr_del(const struct lu_env *env, * \see dt_object_operations::do_xattr_del() in the API description for details. */ static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt, - const char *name, struct thandle *th, - struct lustre_capa *capa) + const char *name, struct thandle *th) { + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); + int rc; + int i; + ENTRY; + if (!strcmp(name, XATTR_NAME_LOV)) lod_object_free_striping(env, lod_dt_obj(dt)); - return dt_xattr_del(env, dt_object_child(dt), name, th, capa); + + rc = lod_sub_object_xattr_del(env, next, name, th); + if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) + RETURN(rc); + + if (lo->ldo_stripenr == 0) + RETURN(0); + + for (i = 0; i < lo->ldo_stripenr; i++) { + LASSERT(lo->ldo_stripe[i]); + + rc = lod_sub_object_xattr_del(env, lo->ldo_stripe[i], name, th); + if (rc != 0) + break; + } + + RETURN(rc); } /** @@ -2829,10 +2839,9 @@ static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt, * for details. */ static int lod_xattr_list(const struct lu_env *env, - struct dt_object *dt, const struct lu_buf *buf, - struct lustre_capa *capa) + struct dt_object *dt, const struct lu_buf *buf) { - return dt_xattr_list(env, dt_object_child(dt), buf, capa); + return dt_xattr_list(env, dt_object_child(dt), buf); } /** @@ -2904,7 +2913,7 @@ static int lod_cache_parent_lov_striping(const struct lu_env *env, if (rc < (typeof(rc))sizeof(struct lov_user_md)) { /* don't lookup for non-existing or invalid striping */ lp->ldo_def_striping_set = 0; - lp->ldo_striping_cached = 1; + lp->ldo_def_striping_cached = 1; lp->ldo_def_stripe_size = 0; lp->ldo_def_stripenr = 0; lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1); @@ -2934,7 +2943,7 @@ static int lod_cache_parent_lov_striping(const struct lu_env *env, lp->ldo_def_stripenr = v1->lmm_stripe_count; lp->ldo_def_stripe_size = v1->lmm_stripe_size; lp->ldo_def_stripe_offset = v1->lmm_stripe_offset; - lp->ldo_striping_cached = 1; + lp->ldo_def_striping_cached = 1; lp->ldo_def_striping_set = 1; if (v1->lmm_magic == LOV_USER_MAGIC_V3) { /* XXX: sanity check here */ @@ -2979,7 +2988,7 @@ static int lod_cache_parent_lmv_striping(const struct lu_env *env, if (rc < (typeof(rc))sizeof(struct lmv_user_md)) { /* don't lookup for non-existing or invalid striping */ lp->ldo_dir_def_striping_set = 0; - lp->ldo_dir_striping_cached = 1; + lp->ldo_dir_def_striping_cached = 1; lp->ldo_dir_def_stripenr = 0; lp->ldo_dir_def_stripe_offset = (typeof(v1->lum_stripe_offset))(-1); @@ -2994,7 +3003,7 @@ static int lod_cache_parent_lmv_striping(const struct lu_env *env, lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset); lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type); lp->ldo_dir_def_striping_set = 1; - lp->ldo_dir_striping_cached = 1; + lp->ldo_dir_def_striping_cached = 1; EXIT; unlock: @@ -3025,11 +3034,7 @@ static int lod_cache_parent_striping(const struct lu_env *env, int rc = 0; ENTRY; - rc = lod_load_striping(env, lp); - if (rc != 0) - RETURN(rc); - - if (!lp->ldo_striping_cached) { + if (!lp->ldo_def_striping_cached) { /* we haven't tried to get default striping for * the directory yet, let's cache it in the object */ rc = lod_cache_parent_lov_striping(env, lp); @@ -3037,7 +3042,12 @@ static int lod_cache_parent_striping(const struct lu_env *env, RETURN(rc); } - if (S_ISDIR(child_mode) && !lp->ldo_dir_striping_cached) + /* If the parent is on the remote MDT, we should always + * try to refresh the default stripeEA cache, because we + * do not cache default striping information for remote + * object. */ + if (S_ISDIR(child_mode) && (!lp->ldo_dir_def_striping_cached || + dt_object_remote(&lp->ldo_obj))) rc = lod_cache_parent_lmv_striping(env, lp); RETURN(rc); @@ -3104,6 +3114,7 @@ static void lod_ah_init(const struct lu_env *env, return; } + LASSERT(lp != NULL); if (lp->ldo_dir_stripe == NULL) { OBD_ALLOC_PTR(lp->ldo_dir_stripe); if (lp->ldo_dir_stripe == NULL) @@ -3115,14 +3126,14 @@ static void lod_ah_init(const struct lu_env *env, return; /* transfer defaults to new directory */ - if (lp->ldo_striping_cached) { + if (lp->ldo_def_striping_set) { if (lp->ldo_pool) lod_object_set_pool(lc, lp->ldo_pool); lc->ldo_def_stripenr = lp->ldo_def_stripenr; lc->ldo_def_stripe_size = lp->ldo_def_stripe_size; lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset; - lc->ldo_striping_cached = 1; lc->ldo_def_striping_set = 1; + lc->ldo_def_striping_cached = 1; CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n", (int)lc->ldo_def_stripe_size, (int)lc->ldo_def_stripe_offset, @@ -3130,14 +3141,14 @@ static void lod_ah_init(const struct lu_env *env, } /* transfer dir defaults to new directory */ - if (lp->ldo_dir_striping_cached) { + if (lp->ldo_dir_def_striping_set) { lc->ldo_dir_def_stripenr = lp->ldo_dir_def_stripenr; lc->ldo_dir_def_stripe_offset = lp->ldo_dir_def_stripe_offset; lc->ldo_dir_def_hash_type = lp->ldo_dir_def_hash_type; - lc->ldo_dir_striping_cached = 1; lc->ldo_dir_def_striping_set = 1; + lc->ldo_dir_def_striping_cached = 1; CDEBUG(D_INFO, "inherit default EA nr:%d off:%d t%u\n", (int)lc->ldo_dir_def_stripenr, (int)lc->ldo_dir_def_stripe_offset, @@ -3266,7 +3277,7 @@ static int lod_declare_init_size(const struct lu_env *env, LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0); LASSERT(lo->ldo_stripe_size > 0); - rc = dt_attr_get(env, next, attr, BYPASS_CAPA); + rc = dt_attr_get(env, next, attr); LASSERT(attr->la_valid & LA_SIZE); if (rc) RETURN(rc); @@ -3286,7 +3297,8 @@ static int lod_declare_init_size(const struct lu_env *env, attr->la_valid = LA_SIZE; attr->la_size = size; - rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th); + rc = lod_sub_object_declare_attr_set(env, lo->ldo_stripe[stripe], attr, + th); RETURN(rc); } @@ -3352,8 +3364,8 @@ int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt, info->lti_buf = *lovea; } - rc = dt_declare_xattr_set(env, next, &info->lti_buf, - XATTR_NAME_LOV, 0, th); + rc = lod_sub_object_declare_xattr_set(env, next, &info->lti_buf, + XATTR_NAME_LOV, 0, th); if (rc) GOTO(out, rc); @@ -3399,15 +3411,15 @@ static int lod_declare_object_create(const struct lu_env *env, /* * first of all, we declare creation of local object */ - rc = dt_declare_create(env, next, attr, hint, dof, th); - if (rc) + rc = lod_sub_object_declare_create(env, next, attr, hint, dof, th); + if (rc != 0) GOTO(out, rc); if (dof->dof_type == DFT_SYM) dt->do_body_ops = &lod_body_lnk_ops; /* - * it's lod_ah_init() who has decided the object will striped + * it's lod_ah_init() that has decided the object will be striped */ if (dof->dof_type == DFT_REGULAR) { /* callers don't want stripes */ @@ -3420,6 +3432,24 @@ static int lod_declare_object_create(const struct lu_env *env, rc = lod_declare_striped_object(env, dt, attr, NULL, th); } else if (dof->dof_type == DFT_DIR) { + struct seq_server_site *ss; + + ss = lu_site2seq(dt->do_lu.lo_dev->ld_site); + + /* If the parent has default stripeEA, and client + * did not find it before sending create request, + * then MDT will return -EREMOTE, and client will + * retrieve the default stripeEA and re-create the + * sub directory. + * + * Note: if dah_eadata != NULL, it means creating the + * striped directory with specified stripeEA, then it + * should ignore the default stripeEA */ + if ((hint == NULL || hint->dah_eadata == NULL) && + lo->ldo_dir_stripe_offset != -1 && + lo->ldo_dir_stripe_offset != ss->ss_node_id) + GOTO(out, rc = -EREMOTE); + /* Orphan object (like migrating object) does not have * lod_dir_stripe, see lod_ah_init */ if (lo->ldo_dir_stripe != NULL) @@ -3462,13 +3492,17 @@ int lod_striping_create(const struct lu_env *env, struct dt_object *dt, /* create all underlying objects */ for (i = 0; i < lo->ldo_stripenr; i++) { LASSERT(lo->ldo_stripe[i]); - rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th); - + rc = lod_sub_object_create(env, lo->ldo_stripe[i], attr, NULL, + dof, th); if (rc) break; } - if (rc == 0) + + if (rc == 0) { rc = lod_generate_and_set_lovea(env, lo, th); + if (rc == 0) + lo->ldo_striping_cached = 1; + } RETURN(rc); } @@ -3487,13 +3521,13 @@ static int lod_object_create(const struct lu_env *env, struct dt_object *dt, struct dt_allocation_hint *hint, struct dt_object_format *dof, struct thandle *th) { - struct dt_object *next = dt_object_child(dt); struct lod_object *lo = lod_dt_obj(dt); int rc; ENTRY; /* create local object */ - rc = dt_create(env, next, attr, hint, dof, th); + rc = lod_sub_object_create(env, dt_object_child(dt), attr, hint, dof, + th); if (rc != 0) RETURN(rc); @@ -3543,22 +3577,24 @@ static int lod_declare_object_destroy(const struct lu_env *env, RETURN(rc); for (i = 0; i < lo->ldo_stripenr; i++) { - rc = dt_declare_ref_del(env, next, th); + rc = lod_sub_object_declare_ref_del(env, next, th); if (rc != 0) RETURN(rc); + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)), i); - rc = dt_declare_delete(env, next, + rc = lod_sub_object_declare_delete(env, next, (const struct dt_key *)stripe_name, th); if (rc != 0) RETURN(rc); } } + /* * we declare destroy for the local object */ - rc = dt_declare_destroy(env, next, th); + rc = lod_sub_object_declare_destroy(env, next, th); if (rc) RETURN(rc); @@ -3567,11 +3603,17 @@ static int lod_declare_object_destroy(const struct lu_env *env, /* declare destroy all striped objects */ for (i = 0; i < lo->ldo_stripenr; i++) { - if (likely(lo->ldo_stripe[i] != NULL)) { - rc = dt_declare_destroy(env, lo->ldo_stripe[i], th); - if (rc != 0) - break; - } + if (lo->ldo_stripe[i] == NULL) + continue; + + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) + rc = lod_sub_object_declare_ref_del(env, + lo->ldo_stripe[i], th); + + rc = lod_sub_object_declare_destroy(env, lo->ldo_stripe[i], + th); + if (rc != 0) + break; } RETURN(rc); @@ -3605,7 +3647,7 @@ static int lod_object_destroy(const struct lu_env *env, RETURN(rc); for (i = 0; i < lo->ldo_stripenr; i++) { - rc = dt_ref_del(env, next, th); + rc = lod_sub_object_ref_del(env, next, th); if (rc != 0) RETURN(rc); @@ -3617,14 +3659,14 @@ static int lod_object_destroy(const struct lu_env *env, PFID(lu_object_fid(&dt->do_lu)), stripe_name, PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu))); - rc = dt_delete(env, next, - (const struct dt_key *)stripe_name, - th, BYPASS_CAPA); + rc = lod_sub_object_delete(env, next, + (const struct dt_key *)stripe_name, th); if (rc != 0) RETURN(rc); } } - rc = dt_destroy(env, next, th); + + rc = lod_sub_object_destroy(env, next, th); if (rc != 0) RETURN(rc); @@ -3636,7 +3678,17 @@ static int lod_object_destroy(const struct lu_env *env, if (likely(lo->ldo_stripe[i] != NULL) && (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) || i == cfs_fail_val)) { - rc = dt_destroy(env, lo->ldo_stripe[i], th); + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { + dt_write_lock(env, lo->ldo_stripe[i], + MOR_TGT_CHILD); + rc = lod_sub_object_ref_del(env, + lo->ldo_stripe[i], th); + dt_write_unlock(env, lo->ldo_stripe[i]); + if (rc != 0) + break; + } + + rc = lod_sub_object_destroy(env, lo->ldo_stripe[i], th); if (rc != 0) break; } @@ -3654,7 +3706,7 @@ static int lod_object_destroy(const struct lu_env *env, static int lod_declare_ref_add(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - return dt_declare_ref_add(env, dt_object_child(dt), th); + return lod_sub_object_declare_ref_add(env, dt_object_child(dt), th); } /** @@ -3665,7 +3717,7 @@ static int lod_declare_ref_add(const struct lu_env *env, static int lod_ref_add(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - return dt_ref_add(env, dt_object_child(dt), th); + return lod_sub_object_ref_add(env, dt_object_child(dt), th); } /** @@ -3677,7 +3729,7 @@ static int lod_ref_add(const struct lu_env *env, static int lod_declare_ref_del(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - return dt_declare_ref_del(env, dt_object_child(dt), th); + return lod_sub_object_declare_ref_del(env, dt_object_child(dt), th); } /** @@ -3688,19 +3740,7 @@ static int lod_declare_ref_del(const struct lu_env *env, static int lod_ref_del(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - return dt_ref_del(env, dt_object_child(dt), th); -} - -/** - * Implementation of dt_object_operations::do_capa_get. - * - * \see dt_object_operations::do_capa_get() in the API description for details. - */ -static struct obd_capa *lod_capa_get(const struct lu_env *env, - struct dt_object *dt, - struct lustre_capa *old, __u64 opc) -{ - return dt_capa_get(env, dt_object_child(dt), old, opc); + return lod_sub_object_ref_del(env, dt_object_child(dt), th); } /** @@ -3910,7 +3950,6 @@ struct dt_object_operations lod_obj_ops = { .do_ref_add = lod_ref_add, .do_declare_ref_del = lod_declare_ref_del, .do_ref_del = lod_ref_del, - .do_capa_get = lod_capa_get, .do_object_sync = lod_object_sync, .do_object_lock = lod_object_lock, .do_object_unlock = lod_object_unlock, @@ -3922,11 +3961,10 @@ struct dt_object_operations lod_obj_ops = { * \see dt_body_operations::dbo_read() in the API description for details. */ static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt, - struct lu_buf *buf, loff_t *pos, - struct lustre_capa *capa) + struct lu_buf *buf, loff_t *pos) { struct dt_object *next = dt_object_child(dt); - return next->do_body_ops->dbo_read(env, next, buf, pos, capa); + return next->do_body_ops->dbo_read(env, next, buf, pos); } /** @@ -3940,8 +3978,8 @@ static ssize_t lod_declare_write(const struct lu_env *env, const struct lu_buf *buf, loff_t pos, struct thandle *th) { - return dt_declare_record_write(env, dt_object_child(dt), - buf, pos, th); + return lod_sub_object_declare_write(env, dt_object_child(dt), buf, pos, + th); } /** @@ -3951,11 +3989,9 @@ static ssize_t lod_declare_write(const struct lu_env *env, */ static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, loff_t *pos, - struct thandle *th, struct lustre_capa *capa, int iq) + struct thandle *th, int iq) { - struct dt_object *next = dt_object_child(dt); - LASSERT(next); - return next->do_body_ops->dbo_write(env, next, buf, pos, th, capa, iq); + return lod_sub_object_write(env, dt_object_child(dt), buf, pos, th, iq); } static const struct dt_body_operations lod_body_lnk_ops = { @@ -4070,6 +4106,7 @@ void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo) lo->ldo_stripe = NULL; lo->ldo_stripes_allocated = 0; } + lo->ldo_striping_cached = 0; lo->ldo_stripenr = 0; lo->ldo_pattern = 0; }