X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flod%2Flod_object.c;h=f082c9198244526fd924a06524f88fed79f0930b;hb=e39a2a90f9f3a4c12bbfdb00bad59841909ef2a5;hp=816e6cd0b09bde1662801606cc4e8183d8b8f0c5;hpb=df70a5c8938888686188cfc63748011f8c7557e0;p=fs%2Flustre-release.git

diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c
index 816e6cd..f082c91 100644
--- a/lustre/lod/lod_object.c
+++ b/lustre/lod/lod_object.c
@@ -23,7 +23,7 @@
  * Copyright 2009 Sun Microsystems, Inc. All rights reserved
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Intel, Inc.
+ * Copyright (c) 2012, Intel Corporation.
  */
 /*
  * lustre/lod/lod_object.c
@@ -101,19 +101,153 @@ static struct dt_it *lod_it_init(const struct lu_env *env,
                                  struct dt_object *dt, __u32 attr,
                                  struct lustre_capa *capa)
 {
-        struct dt_object *next = dt_object_child(dt);
+	struct dt_object	*next = dt_object_child(dt);
+	struct lod_it		*it = &lod_env_info(env)->lti_it;
+	struct dt_it		*it_next;
+
+
+	it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
+	if (IS_ERR(it_next))
+		return it_next;
+
+	/* currently we do not use more than one iterator per thread
+	 * so we store it in thread info. if at some point we need
+	 * more active iterators in a single thread, we can allocate
+	 * additional ones */
+	LASSERT(it->lit_obj == NULL);
+
+	it->lit_it = it_next;
+	it->lit_obj = next;
+
+	return (struct dt_it *)it;
+}
+
+/*
+ * Sanity-check a LOD iterator: it must be the one kept in thread info
+ * and must currently be bound to an underlying object and iterator,
+ * i.e. be in use between lod_it_init() and lod_it_fini().
+ */
+#define LOD_CHECK_IT(env, it)					\
+do {								\
+	/* IT is supposed to be in thread info always */	\
+	LASSERT((it) == &lod_env_info(env)->lti_it);		\
+	LASSERT((it)->lit_obj != NULL);				\
+	LASSERT((it)->lit_it != NULL);				\
+} while (0)
+
+/*
+ * Release the underlying iterator and mark the per-thread LOD iterator
+ * free, so that the LASSERT in lod_it_init() holds for the next user.
+ */
+void lod_it_fini(const struct lu_env *env, struct dt_it *di)
+{
+	struct lod_it *it = (struct lod_it *)di;
+
+	LOD_CHECK_IT(env, it);
+	it->lit_obj->do_index_ops->dio_it.fini(env, it->lit_it);
+
+	/* the iterator is not in use any more */
+	it->lit_obj = NULL;
+	it->lit_it = NULL;
+}
+
+/*
+ * The methods below only validate the LOD iterator and forward the call
+ * to the same-named method of the underlying object's index iterator.
+ */
+int lod_it_get(const struct lu_env *env, struct dt_it *di,
+	       const struct dt_key *key)
+{
+	const struct lod_it *it = (const struct lod_it *)di;
+
+	LOD_CHECK_IT(env, it);
+	return it->lit_obj->do_index_ops->dio_it.get(env, it->lit_it, key);
+}
+
+void lod_it_put(const struct lu_env *env, struct dt_it *di)
+{
+	struct lod_it *it = (struct lod_it *)di;
+
+	LOD_CHECK_IT(env, it);
+	it->lit_obj->do_index_ops->dio_it.put(env, it->lit_it);
+}
+
+int lod_it_next(const struct lu_env *env, struct dt_it *di)
+{
+	struct lod_it *it = (struct lod_it *)di;
+
+	LOD_CHECK_IT(env, it);
+	return it->lit_obj->do_index_ops->dio_it.next(env, it->lit_it);
+}
+
+struct dt_key *lod_it_key(const struct lu_env *env, const struct dt_it *di)
+{
+	const struct lod_it *it = (const struct lod_it *)di;
 
-        return next->do_index_ops->dio_it.init(env, next, attr, capa);
+	LOD_CHECK_IT(env, it);
+	return it->lit_obj->do_index_ops->dio_it.key(env, it->lit_it);
+}
+
+int lod_it_key_size(const struct lu_env *env, const struct dt_it *di)
+{
+	const struct lod_it *it = (const struct lod_it *)di;
+
+	LOD_CHECK_IT(env, it);
+	return it->lit_obj->do_index_ops->dio_it.key_size(env, it->lit_it);
+}
+
+int lod_it_rec(const struct lu_env *env, const struct dt_it *di,
+	       struct dt_rec *rec, __u32 attr)
+{
+	const struct lod_it *it = (const struct lod_it *)di;
+
+	LOD_CHECK_IT(env, it);
+	return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr);
+}
+
+__u64 lod_it_store(const struct lu_env *env, const struct dt_it *di)
+{
+	const struct lod_it *it = (const struct lod_it *)di;
+
+	LOD_CHECK_IT(env, it);
+	return it->lit_obj->do_index_ops->dio_it.store(env, it->lit_it);
+}
+
+int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash)
+{
+	const struct lod_it *it = (const struct lod_it *)di;
+
+	LOD_CHECK_IT(env, it);
+	return it->lit_obj->do_index_ops->dio_it.load(env, it->lit_it, hash);
+}
+
+int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di,
+		   void *key_rec)
+{
+	const struct lod_it *it = (const struct lod_it *)di;
+
+	LOD_CHECK_IT(env, it);
+	return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it, key_rec);
 }
 
 static struct dt_index_operations lod_index_ops = {
-        .dio_lookup         = lod_index_lookup,
-        .dio_declare_insert = lod_declare_index_insert,
-        .dio_insert         = lod_index_insert,
-        .dio_declare_delete = lod_declare_index_delete,
-        .dio_delete         = lod_index_delete,
-        .dio_it             = {
-                .init       = lod_it_init,
+	.dio_lookup		= lod_index_lookup,
+	.dio_declare_insert	= lod_declare_index_insert,
+	.dio_insert		= lod_index_insert,
+	.dio_declare_delete	= lod_declare_index_delete,
+	.dio_delete		= lod_index_delete,
+	.dio_it			= {
+		.init		= lod_it_init,
+		.fini		= lod_it_fini,
+		.get		= lod_it_get,
+		.put		= lod_it_put,
+		.next		= lod_it_next,
+		.key		= lod_it_key,
+		.key_size	= lod_it_key_size,
+		.rec		= lod_it_rec,
+		.store		= lod_it_store,
+		.load		= lod_it_load,
+		.key_rec	= lod_it_key_rec,
         }
 };
 
@@ -301,9 +435,11 @@ static int lod_declare_xattr_set(const struct lu_env *env,
 	 * allow to declare predefined striping on a new (!mode) object
 	 * which is supposed to be replay of regular file creation
 	 * (when LOV setting is declared)
+	 * LU_XATTR_REPLACE is set to indicate a layout swap
 	 */
 	mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
-	if ((S_ISREG(mode) || !mode) && !strcmp(name, XATTR_NAME_LOV)) {
+	if ((S_ISREG(mode) || !mode) && !strcmp(name, XATTR_NAME_LOV) &&
+	     !(fl & LU_XATTR_REPLACE)) {
 		/*
 		 * this is a request to manipulate object's striping
 		 */
@@ -388,9 +524,9 @@ static int lod_xattr_set(const struct lu_env *env,
                          const char *name, int fl, struct thandle *th,
                          struct lustre_capa *capa)
 {
-        struct dt_object *next = dt_object_child(dt);
-        __u32             attr;
-        int               rc;
+	struct dt_object	*next = dt_object_child(dt);
+	__u32			 attr;
+	int			 rc;
         ENTRY;
 
         attr = dt->do_lu.lo_header->loh_attr & S_IFMT;
@@ -402,12 +538,15 @@ static int lod_xattr_set(const struct lu_env *env,
 			rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
 
 	} else if (S_ISREG(attr) && !strcmp(name, XATTR_NAME_LOV)) {
-		/*
-		 * XXX: check striping match what we already have
-		 * during req replay, declare_xattr_set() defines striping,
-		 * then create() does the work
-		 */
-		rc = lod_striping_create(env, dt, NULL, NULL, th);
+		/* in case of lov EA swap, just set it
+		 * if not, it is a replay so check striping match what we
+		 * already have during req replay, declare_xattr_set()
+		 * defines striping, then create() does the work
+		 */
+		if (fl & LU_XATTR_REPLACE)
+			rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
+		else
+			rc = lod_striping_create(env, dt, NULL, NULL, th);
 		RETURN(rc);
 	} else {
 		/*
@@ -581,7 +720,10 @@ static void lod_ah_init(const struct lu_env *env,
 			lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
 			lc->ldo_striping_cached = 1;
 			lc->ldo_def_striping_set = 1;
-			CDEBUG(D_OTHER, "inherite striping defaults\n");
+			CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n",
+			       (int)lc->ldo_def_stripe_size,
+			       (int)lc->ldo_def_stripe_offset,
+			       (int)lc->ldo_def_stripenr);
 		}
 		return;
 	}
@@ -634,6 +776,54 @@ static void lod_ah_init(const struct lu_env *env,
 	EXIT;
 }
 
+#define ll_do_div64(aaa,bbb)    do_div((aaa), (bbb))
+/*
+ * this function handles a special case when truncate was done
+ * on a stripeless object and now striping is being created
+ * we can't lose that size, so we have to propagate it to newly
+ * created object
+ */
+static int lod_declare_init_size(const struct lu_env *env,
+				 struct dt_object *dt, struct thandle *th)
+{
+	struct dt_object *next = dt_object_child(dt);
+	struct lod_object *lo = lod_dt_obj(dt);
+	struct lu_attr *attr = &lod_env_info(env)->lti_attr;
+	uint64_t size, offs;
+	int rc, stripe;
+	ENTRY;
+
+	/* XXX: we support the simplest (RAID0) striping so far */
+	LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
+	LASSERT(lo->ldo_stripe_size > 0);
+
+	rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
+	if (rc)
+		RETURN(rc);
+	/* attr is meaningful only once dt_attr_get() has succeeded */
+	LASSERT(attr->la_valid & LA_SIZE);
+
+	size = attr->la_size;
+	if (size == 0)
+		RETURN(0);
+
+	/* ll_do_div64(a, b) returns a % b, and a = a / b */
+	ll_do_div64(size, (__u64) lo->ldo_stripe_size);
+	stripe = ll_do_div64(size, (__u64) lo->ldo_stripenr);
+
+	size = size * lo->ldo_stripe_size;
+	offs = attr->la_size;
+	size += ll_do_div64(offs, lo->ldo_stripe_size);
+
+	attr->la_valid = LA_SIZE;
+	attr->la_size = size;
+
+	rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th);
+
+	RETURN(rc);
+}
+
+
 /**
  * Create declaration of striped object
  */
@@ -654,8 +844,14 @@ int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
 		GOTO(out, rc = -ENOMEM);
 	}
 
-	/* XXX: there will be a call to QoS here */
-	RETURN(0);
+	/* choose OST and generate appropriate objects */
+	rc = lod_qos_prep_create(env, lo, attr, lovea, th);
+	if (rc) {
+		/* failed to create striping, let's reset
+		 * config so that others don't get confused */
+		lod_object_free_striping(env, lo);
+		GOTO(out, rc);
+	}
 
 	/*
 	 * declare storage for striping data
@@ -667,6 +863,14 @@ int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
 	if (rc)
 		GOTO(out, rc);
 
+	/*
+	 * if striping is created with local object's size > 0,
+	 * we have to propagate this size to specific object
+	 * the case is possible only when local object was created previously
+	 */
+	if (dt_object_exists(next))
+		rc = lod_declare_init_size(env, dt, th);
+
 out:
 	RETURN(rc);
 }
@@ -714,11 +918,35 @@ static int lod_declare_object_create(const struct lu_env *env,
 	} else if (dof->dof_type == DFT_DIR && lo->ldo_striping_cached) {
 		struct lod_thread_info *info = lod_env_info(env);
 
-		info->lti_buf.lb_buf = NULL;
-		info->lti_buf.lb_len = sizeof(struct lov_user_md_v3);
+		struct lov_user_md_v3 *v3;
+
+		if (LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
+					lo->ldo_def_stripenr,
+					lo->ldo_def_stripe_offset))
+			RETURN(0);
+
+		OBD_ALLOC_PTR(v3);
+		if (v3 == NULL)
+			RETURN(-ENOMEM);
+
+		v3->lmm_magic = cpu_to_le32(LOV_MAGIC_V3);
+		v3->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0);
+		v3->lmm_object_id = fid_oid(lu_object_fid(&dt->do_lu));
+		v3->lmm_object_seq = fid_seq(lu_object_fid(&dt->do_lu));
+		v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
+		v3->lmm_stripe_count = cpu_to_le32(lo->ldo_def_stripenr);
+		v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
+		if (lo->ldo_pool)
+			strncpy(v3->lmm_pool_name, lo->ldo_pool,
+				LOV_MAXPOOLNAME);
+
+		info->lti_buf.lb_buf = v3;
+		info->lti_buf.lb_len = sizeof(*v3);
+
 		/* to transfer default striping from the parent */
 		rc = dt_declare_xattr_set(env, next, &info->lti_buf,
 					  XATTR_NAME_LOV, 0, th);
+		OBD_FREE_PTR(v3);
 	}
 
 out:
@@ -846,15 +1074,8 @@ static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
 	LASSERT(next->do_ops->do_index_try);
 
 	rc = next->do_ops->do_index_try(env, next, feat);
-	if (next->do_index_ops && dt->do_index_ops == NULL) {
+	if (next->do_index_ops && dt->do_index_ops == NULL)
 		dt->do_index_ops = &lod_index_ops;
-		/* XXX: iterators don't accept device, so bypass LOD */
-		/* will be fixed with DNE */
-		if (lod_index_ops.dio_it.fini == NULL) {
-			lod_index_ops.dio_it = next->do_index_ops->dio_it;
-			lod_index_ops.dio_it.init = lod_it_init;
-		}
-	}
 
 	RETURN(rc);
 }
@@ -1045,3 +1266,51 @@ struct lu_object_operations lod_lu_obj_ops = {
 	.loo_object_release	= lod_object_release,
 	.loo_object_print	= lod_object_print,
 };
+
+/**
+ * Init remote lod object
+ */
+static int lod_robject_init(const struct lu_env *env, struct lu_object *lo,
+			    const struct lu_object_conf *conf)
+{
+	struct lod_device *lod = lu2lod_dev(lo->lo_dev);
+	struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
+	struct lu_device *c_dev = NULL;
+	struct lu_object *c_obj;
+	int i;
+	ENTRY;
+
+	lod_getref(ltd);
+	if (ltd->ltd_tgts_size > 0) {
+		cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
+			struct lod_tgt_desc *tgt;
+			tgt = LTD_TGT(ltd, i);
+			LASSERT(tgt && tgt->ltd_tgt);
+			if (tgt->ltd_index ==
+			    lu2lod_obj(lo)->ldo_mds_num) {
+				c_dev = &(tgt->ltd_tgt->dd_lu_dev);
+				break;
+			}
+		}
+	}
+	lod_putref(lod, ltd);
+
+	if (unlikely(c_dev == NULL))
+		RETURN(-ENOENT);
+
+	c_obj = c_dev->ld_ops->ldo_object_alloc(env, lo->lo_header, c_dev);
+	if (unlikely(c_obj == NULL))
+		RETURN(-ENOMEM);
+
+	lu_object_add(lo, c_obj);
+
+	RETURN(0);
+}
+
+struct lu_object_operations lod_lu_robj_ops = {
+	.loo_object_init	= lod_robject_init,
+	.loo_object_start	= lod_object_start,
+	.loo_object_free	= lod_object_free,
+	.loo_object_release	= lod_object_release,
+	.loo_object_print	= lod_object_print,
+};