X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flod%2Flod_object.c;h=070b2b6ca86a6c073c25847cfedb55269bf6d554;hp=676eb986dd4b65ca7a2d2248f8e8c55d8d084f62;hb=0607e01af74a81d0fe12ceec79bd22810a5dfe92;hpb=163dc2bd1e2f7654c2d32d65c0eb326717fce67d diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index 676eb98..070b2b6 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -23,7 +23,7 @@ * Copyright 2009 Sun Microsystems, Inc. All rights reserved * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Intel, Inc. + * Copyright (c) 2012, Intel Corporation. */ /* * lustre/lod/lod_object.c @@ -101,19 +101,138 @@ static struct dt_it *lod_it_init(const struct lu_env *env, struct dt_object *dt, __u32 attr, struct lustre_capa *capa) { - struct dt_object *next = dt_object_child(dt); + struct dt_object *next = dt_object_child(dt); + struct lod_it *it = &lod_env_info(env)->lti_it; + struct dt_it *it_next; + + + it_next = next->do_index_ops->dio_it.init(env, next, attr, capa); + if (IS_ERR(it_next)) + return it_next; + + /* currently we do not use more than one iterator per thread + * so we store it in thread info. if at some point we need + * more active iterators in a single thread, we can allocate + * additional ones */ + LASSERT(it->lit_obj == NULL); + + it->lit_it = it_next; + it->lit_obj = next; + + return (struct dt_it *)it; +} + +#define LOD_CHECK_IT(env, it) \ +{ \ + LASSERT((it)->lit_obj != NULL); \ + LASSERT((it)->lit_it != NULL); \ +} while(0) + +void lod_it_fini(const struct lu_env *env, struct dt_it *di) +{ + struct lod_it *it = (struct lod_it *)di; + + LOD_CHECK_IT(env, it); + it->lit_obj->do_index_ops->dio_it.fini(env, it->lit_it); + + /* the iterator not in use any more */ + it->lit_obj = NULL; + it->lit_it = NULL; +} + +int lod_it_get(const struct lu_env *env, struct dt_it *di, + const struct dt_key *key) +{ + const struct lod_it *it = (const struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.get(env, it->lit_it, key); +} + +void lod_it_put(const struct lu_env *env, struct dt_it *di) +{ + struct lod_it *it = (struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.put(env, it->lit_it); +} + +int lod_it_next(const struct lu_env *env, struct dt_it *di) +{ + struct lod_it *it = (struct lod_it *)di; - return next->do_index_ops->dio_it.init(env, next, attr, capa); + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.next(env, it->lit_it); +} + +struct dt_key *lod_it_key(const struct lu_env *env, const struct dt_it *di) +{ + const struct lod_it *it = (const struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.key(env, it->lit_it); +} + +int lod_it_key_size(const struct lu_env *env, const struct dt_it *di) +{ + struct lod_it *it = (struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.key_size(env, it->lit_it); +} + +int lod_it_rec(const struct lu_env *env, const struct dt_it *di, + struct dt_rec *rec, __u32 attr) +{ + const struct lod_it *it = (const struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr); +} + +__u64 lod_it_store(const struct lu_env *env, const struct dt_it *di) +{ + const struct lod_it *it = (const struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.store(env, it->lit_it); +} + +int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash) +{ + const struct lod_it *it = (const struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.load(env, it->lit_it, hash); +} + +int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di, + void* key_rec) +{ + const struct lod_it *it = (const struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it, key_rec); } static struct dt_index_operations lod_index_ops = { - .dio_lookup = lod_index_lookup, - .dio_declare_insert = lod_declare_index_insert, - .dio_insert = lod_index_insert, - .dio_declare_delete = lod_declare_index_delete, - .dio_delete = lod_index_delete, - .dio_it = { - .init = lod_it_init, + .dio_lookup = lod_index_lookup, + .dio_declare_insert = lod_declare_index_insert, + .dio_insert = lod_index_insert, + .dio_declare_delete = lod_declare_index_delete, + .dio_delete = lod_index_delete, + .dio_it = { + .init = lod_it_init, + .fini = lod_it_fini, + .get = lod_it_get, + .put = lod_it_put, + .next = lod_it_next, + .key = lod_it_key, + .key_size = lod_it_key_size, + .rec = lod_it_rec, + .store = lod_it_store, + .load = lod_it_load, + .key_rec = lod_it_key_rec, } }; @@ -261,7 +380,7 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, rc = sizeof(struct lov_user_md_v1); } else if (buf->lb_len >= sizeof(struct lov_user_md_v1)) { lum->lmm_magic = LOV_USER_MAGIC_V1; - lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT; + lum->lmm_oi.oi_seq = FID_SEQ_LOV_DEFAULT; lum->lmm_pattern = desc->ld_pattern; lum->lmm_stripe_size = desc->ld_default_stripe_size; lum->lmm_stripe_count = desc->ld_default_stripe_count; @@ -301,9 +420,11 @@ static int lod_declare_xattr_set(const struct lu_env *env, * allow to declare predefined striping on a new (!mode) object * which is supposed to be replay of regular file creation * (when LOV setting is declared) + * LU_XATTR_REPLACE is set to indicate a layout swap */ mode = dt->do_lu.lo_header->loh_attr & S_IFMT; - if ((S_ISREG(mode) || !mode) && !strcmp(name, XATTR_NAME_LOV)) { + if ((S_ISREG(mode) || !mode) && !strcmp(name, XATTR_NAME_LOV) && + !(fl & LU_XATTR_REPLACE)) { /* * this is a request to manipulate object's striping */ @@ -312,7 +433,7 @@ static int lod_declare_xattr_set(const struct lu_env *env, if (rc) RETURN(rc); } else { - memset(attr, 0, sizeof(attr)); + memset(attr, 0, sizeof(*attr)); attr->la_valid = LA_TYPE | LA_MODE; attr->la_mode = S_IFREG; } @@ -388,9 +509,9 @@ static int lod_xattr_set(const struct lu_env *env, const char *name, int fl, struct thandle *th, struct lustre_capa *capa) { - struct dt_object *next = dt_object_child(dt); - __u32 attr; - int rc; + struct dt_object *next = dt_object_child(dt); + __u32 attr; + int rc; ENTRY; attr = dt->do_lu.lo_header->loh_attr & S_IFMT; @@ -402,12 +523,15 @@ static int lod_xattr_set(const struct lu_env *env, rc = dt_xattr_set(env, next, buf, name, fl, th, capa); } else if (S_ISREG(attr) && !strcmp(name, XATTR_NAME_LOV)) { - /* - * XXX: check striping match what we already have - * during req replay, declare_xattr_set() defines striping, - * then create() does the work - */ - rc = lod_striping_create(env, dt, NULL, NULL, th); + /* in case of lov EA swap, just set it + * if not, it is a replay so check striping match what we + * already have during req replay, declare_xattr_set() + * defines striping, then create() does the work + */ + if (fl & LU_XATTR_REPLACE) + rc = dt_xattr_set(env, next, buf, name, fl, th, capa); + else + rc = lod_striping_create(env, dt, NULL, NULL, th); RETURN(rc); } else { /* @@ -563,7 +687,7 @@ static void lod_ah_init(const struct lu_env *env, * in case of late striping creation, ->ah_init() * can be called with local object existing */ - if (!dt_object_exists(nextc)) + if (!dt_object_exists(nextc) || dt_object_remote(nextc)) nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode); if (S_ISDIR(child_mode)) { @@ -581,7 +705,10 @@ static void lod_ah_init(const struct lu_env *env, lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset; lc->ldo_striping_cached = 1; lc->ldo_def_striping_set = 1; - CDEBUG(D_OTHER, "inherite striping defaults\n"); + CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n", + (int)lc->ldo_def_stripenr, + (int)lc->ldo_def_stripe_size, + (int)lc->ldo_def_stripe_offset); } return; } @@ -747,7 +874,6 @@ static int lod_declare_object_create(const struct lu_env *env, LASSERT(dof); LASSERT(attr); LASSERT(th); - LASSERT(!dt_object_exists(next)); /* * first of all, we declare creation of local object @@ -775,11 +901,35 @@ static int lod_declare_object_create(const struct lu_env *env, } else if (dof->dof_type == DFT_DIR && lo->ldo_striping_cached) { struct lod_thread_info *info = lod_env_info(env); - info->lti_buf.lb_buf = NULL; - info->lti_buf.lb_len = sizeof(struct lov_user_md_v3); + struct lov_user_md_v3 *v3; + + if (LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size, + lo->ldo_def_stripenr, + lo->ldo_def_stripe_offset)) + RETURN(0); + + OBD_ALLOC_PTR(v3); + if (v3 == NULL) + RETURN(-ENOMEM); + + v3->lmm_magic = cpu_to_le32(LOV_MAGIC_V3); + v3->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0); + fid_ostid_pack(lu_object_fid(&dt->do_lu), &v3->lmm_oi); + ostid_cpu_to_le(&v3->lmm_oi, &v3->lmm_oi); + v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size); + v3->lmm_stripe_count = cpu_to_le32(lo->ldo_def_stripenr); + v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset); + if (lo->ldo_pool) + strncpy(v3->lmm_pool_name, lo->ldo_pool, + LOV_MAXPOOLNAME); + + info->lti_buf.lb_buf = v3; + info->lti_buf.lb_len = sizeof(*v3); + /* to transfer default striping from the parent */ rc = dt_declare_xattr_set(env, next, &info->lti_buf, XATTR_NAME_LOV, 0, th); + OBD_FREE_PTR(v3); } out: @@ -795,7 +945,7 @@ int lod_striping_create(const struct lu_env *env, struct dt_object *dt, ENTRY; LASSERT(lo->ldo_stripe); - LASSERT(lo->ldo_stripe > 0); + LASSERT(lo->ldo_stripenr > 0); LASSERT(lo->ldo_striping_cached == 0); /* create all underlying objects */ @@ -907,15 +1057,8 @@ static int lod_index_try(const struct lu_env *env, struct dt_object *dt, LASSERT(next->do_ops->do_index_try); rc = next->do_ops->do_index_try(env, next, feat); - if (next->do_index_ops && dt->do_index_ops == NULL) { + if (next->do_index_ops && dt->do_index_ops == NULL) dt->do_index_ops = &lod_index_ops; - /* XXX: iterators don't accept device, so bypass LOD */ - /* will be fixed with DNE */ - if (lod_index_ops.dio_it.fini == NULL) { - lod_index_ops.dio_it = next->do_index_ops->dio_it; - lod_index_ops.dio_it.init = lod_it_init; - } - } RETURN(rc); } @@ -985,6 +1128,26 @@ struct dt_object_operations lod_obj_ops = { .do_object_sync = lod_object_sync, }; +static int lod_object_lock(const struct lu_env *env, + struct dt_object *dt, struct lustre_handle *lh, + struct ldlm_enqueue_info *einfo, + void *policy) +{ + struct dt_object *next = dt_object_child(dt); + int rc; + ENTRY; + + /* + * declare setattr on the local object + */ + rc = dt_object_lock(env, next, lh, einfo, policy); + + RETURN(rc); +} + +struct dt_lock_operations lod_lock_ops = { + .do_object_lock = lod_object_lock, +}; static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt, struct lu_buf *buf, loff_t *pos, struct lustre_capa *capa) @@ -1106,3 +1269,51 @@ struct lu_object_operations lod_lu_obj_ops = { .loo_object_release = lod_object_release, .loo_object_print = lod_object_print, }; + +/** + * Init remote lod object + */ +static int lod_robject_init(const struct lu_env *env, struct lu_object *lo, + const struct lu_object_conf *conf) +{ + struct lod_device *lod = lu2lod_dev(lo->lo_dev); + struct lod_tgt_descs *ltd = &lod->lod_mdt_descs; + struct lu_device *c_dev = NULL; + struct lu_object *c_obj; + int i; + ENTRY; + + lod_getref(ltd); + if (ltd->ltd_tgts_size > 0) { + cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) { + struct lod_tgt_desc *tgt; + tgt = LTD_TGT(ltd, i); + LASSERT(tgt && tgt->ltd_tgt); + if (tgt->ltd_index == + lu2lod_obj(lo)->ldo_mds_num) { + c_dev = &(tgt->ltd_tgt->dd_lu_dev); + break; + } + } + } + lod_putref(lod, ltd); + + if (unlikely(c_dev == NULL)) + RETURN(-ENOENT); + + c_obj = c_dev->ld_ops->ldo_object_alloc(env, lo->lo_header, c_dev); + if (unlikely(c_obj == NULL)) + RETURN(-ENOMEM); + + lu_object_add(lo, c_obj); + + RETURN(0); +} + +struct lu_object_operations lod_lu_robj_ops = { + .loo_object_init = lod_robject_init, + .loo_object_start = lod_object_start, + .loo_object_free = lod_object_free, + .loo_object_release = lod_object_release, + .loo_object_print = lod_object_print, +};