X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flod%2Flod_object.c;h=6179e4db7db08ec6e52f13eb42e26ef0b6b86b46;hb=da12d3ba35bbb86c8e5860a5ed161a55f01b69d5;hp=30760cdb174e5a9a33940308b3f3fa2159cd09b2;hpb=135092984c44a0594194abd8267c3e2eaf7187c1;p=fs%2Flustre-release.git diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index 30760cd..6179e4d 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -23,7 +23,7 @@ * Copyright 2009 Sun Microsystems, Inc. All rights reserved * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Intel, Inc. + * Copyright (c) 2012, 2013, Intel Corporation. */ /* * lustre/lod/lod_object.c @@ -31,9 +31,6 @@ * Author: Alex Zhuravlev */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif #define DEBUG_SUBSYSTEM S_MDS #include @@ -49,7 +46,7 @@ #include "lod_internal.h" -extern cfs_mem_cache_t *lod_object_kmem; +extern struct kmem_cache *lod_object_kmem; static const struct dt_body_operations lod_body_lnk_ops; static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt, @@ -101,19 +98,138 @@ static struct dt_it *lod_it_init(const struct lu_env *env, struct dt_object *dt, __u32 attr, struct lustre_capa *capa) { - struct dt_object *next = dt_object_child(dt); + struct dt_object *next = dt_object_child(dt); + struct lod_it *it = &lod_env_info(env)->lti_it; + struct dt_it *it_next; + + + it_next = next->do_index_ops->dio_it.init(env, next, attr, capa); + if (IS_ERR(it_next)) + return it_next; + + /* currently we do not use more than one iterator per thread + * so we store it in thread info. if at some point we need + * more active iterators in a single thread, we can allocate + * additional ones */ + LASSERT(it->lit_obj == NULL); + + it->lit_it = it_next; + it->lit_obj = next; + + return (struct dt_it *)it; +} + +#define LOD_CHECK_IT(env, it) \ +{ \ + LASSERT((it)->lit_obj != NULL); \ + LASSERT((it)->lit_it != NULL); \ +} while(0) + +void lod_it_fini(const struct lu_env *env, struct dt_it *di) +{ + struct lod_it *it = (struct lod_it *)di; - return next->do_index_ops->dio_it.init(env, next, attr, capa); + LOD_CHECK_IT(env, it); + it->lit_obj->do_index_ops->dio_it.fini(env, it->lit_it); + + /* the iterator not in use any more */ + it->lit_obj = NULL; + it->lit_it = NULL; +} + +int lod_it_get(const struct lu_env *env, struct dt_it *di, + const struct dt_key *key) +{ + const struct lod_it *it = (const struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.get(env, it->lit_it, key); +} + +void lod_it_put(const struct lu_env *env, struct dt_it *di) +{ + struct lod_it *it = (struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.put(env, it->lit_it); +} + +int lod_it_next(const struct lu_env *env, struct dt_it *di) +{ + struct lod_it *it = (struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.next(env, it->lit_it); +} + +struct dt_key *lod_it_key(const struct lu_env *env, const struct dt_it *di) +{ + const struct lod_it *it = (const struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.key(env, it->lit_it); +} + +int lod_it_key_size(const struct lu_env *env, const struct dt_it *di) +{ + struct lod_it *it = (struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.key_size(env, it->lit_it); +} + +int lod_it_rec(const struct lu_env *env, const struct dt_it *di, + struct dt_rec *rec, __u32 attr) +{ + const struct lod_it *it = (const struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr); +} + +__u64 lod_it_store(const struct lu_env *env, const struct dt_it *di) +{ + const struct lod_it *it = (const struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.store(env, it->lit_it); +} + +int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash) +{ + const struct lod_it *it = (const struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.load(env, it->lit_it, hash); +} + +int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di, + void* key_rec) +{ + const struct lod_it *it = (const struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it, key_rec); } static struct dt_index_operations lod_index_ops = { - .dio_lookup = lod_index_lookup, - .dio_declare_insert = lod_declare_index_insert, - .dio_insert = lod_index_insert, - .dio_declare_delete = lod_declare_index_delete, - .dio_delete = lod_index_delete, - .dio_it = { - .init = lod_it_init, + .dio_lookup = lod_index_lookup, + .dio_declare_insert = lod_declare_index_insert, + .dio_insert = lod_index_insert, + .dio_declare_delete = lod_declare_index_delete, + .dio_delete = lod_index_delete, + .dio_it = { + .init = lod_it_init, + .fini = lod_it_fini, + .get = lod_it_get, + .put = lod_it_put, + .next = lod_it_next, + .key = lod_it_key, + .key_size = lod_it_key_size, + .rec = lod_it_rec, + .store = lod_it_store, + .load = lod_it_load, + .key_rec = lod_it_key_rec, } }; @@ -172,6 +288,16 @@ static int lod_declare_attr_set(const struct lu_env *env, if (rc) RETURN(rc); + /* osp_declare_attr_set() ignores all attributes other than + * UID, GID, and size, and osp_attr_set() ignores all but UID + * and GID. Declaration of size attr setting happens through + * lod_declare_init_size(), and not through this function. + * Therefore we need not load striping unless ownership is + * changing. This should save memory and (we hope) speed up + * rename(). */ + if (!(attr->la_valid & (LA_UID | LA_GID))) + RETURN(rc); + /* * load striping information, notice we don't do this when object * is being initialized as we don't need this information till @@ -215,6 +341,9 @@ static int lod_attr_set(const struct lu_env *env, if (rc) RETURN(rc); + if (!(attr->la_valid & (LA_UID | LA_GID))) + RETURN(rc); + /* * if object is striped, apply changes to all the stripes */ @@ -258,15 +387,20 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, struct lov_desc *desc = &dev->lod_desc; if (buf->lb_buf == NULL) { - rc = sizeof(struct lov_user_md_v1); - } else if (buf->lb_len >= sizeof(struct lov_user_md_v1)) { - lum->lmm_magic = LOV_USER_MAGIC_V1; - lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT; - lum->lmm_pattern = desc->ld_pattern; - lum->lmm_stripe_size = desc->ld_default_stripe_size; - lum->lmm_stripe_count = desc->ld_default_stripe_count; - lum->lmm_stripe_offset = desc->ld_default_stripe_offset; - rc = sizeof(struct lov_user_md_v1); + rc = sizeof(*lum); + } else if (buf->lb_len >= sizeof(*lum)) { + lum->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V1); + lmm_oi_set_seq(&lum->lmm_oi, FID_SEQ_LOV_DEFAULT); + lmm_oi_set_id(&lum->lmm_oi, 0); + lmm_oi_cpu_to_le(&lum->lmm_oi, &lum->lmm_oi); + lum->lmm_pattern = cpu_to_le32(desc->ld_pattern); + lum->lmm_stripe_size = cpu_to_le32( + desc->ld_default_stripe_size); + lum->lmm_stripe_count = cpu_to_le16( + desc->ld_default_stripe_count); + lum->lmm_stripe_offset = cpu_to_le16( + desc->ld_default_stripe_offset); + rc = sizeof(*lum); } else { rc = -ERANGE; } @@ -292,9 +426,37 @@ static int lod_declare_xattr_set(const struct lu_env *env, struct thandle *th) { struct dt_object *next = dt_object_child(dt); + struct lu_attr *attr = &lod_env_info(env)->lti_attr; + __u32 mode; int rc; ENTRY; + /* + * allow to declare predefined striping on a new (!mode) object + * which is supposed to be replay of regular file creation + * (when LOV setting is declared) + * LU_XATTR_REPLACE is set to indicate a layout swap + */ + mode = dt->do_lu.lo_header->loh_attr & S_IFMT; + if ((S_ISREG(mode) || !mode) && !strcmp(name, XATTR_NAME_LOV) && + !(fl & LU_XATTR_REPLACE)) { + /* + * this is a request to manipulate object's striping + */ + if (dt_object_exists(dt)) { + rc = dt_attr_get(env, next, attr, BYPASS_CAPA); + if (rc) + RETURN(rc); + } else { + memset(attr, 0, sizeof(*attr)); + attr->la_valid = LA_TYPE | LA_MODE; + attr->la_mode = S_IFREG; + } + rc = lod_declare_striped_object(env, dt, attr, buf, th); + if (rc) + RETURN(rc); + } + rc = dt_declare_xattr_set(env, next, buf, name, fl, th); RETURN(rc); @@ -322,8 +484,7 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, l->ldo_def_stripe_size = 0; l->ldo_def_stripenr = 0; - LASSERT(buf); - LASSERT(buf->lb_buf); + LASSERT(buf != NULL && buf->lb_buf != NULL); lum = buf->lb_buf; rc = lod_verify_striping(d, buf, 0); @@ -362,9 +523,9 @@ static int lod_xattr_set(const struct lu_env *env, const char *name, int fl, struct thandle *th, struct lustre_capa *capa) { - struct dt_object *next = dt_object_child(dt); - __u32 attr; - int rc; + struct dt_object *next = dt_object_child(dt); + __u32 attr; + int rc; ENTRY; attr = dt->do_lu.lo_header->loh_attr & S_IFMT; @@ -375,6 +536,20 @@ static int lod_xattr_set(const struct lu_env *env, else rc = dt_xattr_set(env, next, buf, name, fl, th, capa); + } else if (S_ISREG(attr) && !strcmp(name, XATTR_NAME_LOV)) { + /* in case of lov EA swap, just set it + * if not, it is a replay so check striping match what we + * already have during req replay, declare_xattr_set() + * defines striping, then create() does the work + */ + if (fl & LU_XATTR_REPLACE) { + /* free stripes, then update disk */ + lod_object_free_striping(env, lod_dt_obj(dt)); + rc = dt_xattr_set(env, next, buf, name, fl, th, capa); + } else { + rc = lod_striping_create(env, dt, NULL, NULL, th); + } + RETURN(rc); } else { /* * behave transparantly for all other EAs @@ -396,6 +571,8 @@ static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt, const char *name, struct thandle *th, struct lustre_capa *capa) { + if (!strcmp(name, XATTR_NAME_LOV)) + lod_object_free_striping(env, lod_dt_obj(dt)); return dt_xattr_del(env, dt_object_child(dt), name, th, capa); } @@ -501,7 +678,7 @@ static void lod_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah, struct dt_object *parent, struct dt_object *child, - cfs_umode_t child_mode) + umode_t child_mode) { struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev); struct dt_object *nextp = NULL; @@ -529,8 +706,9 @@ static void lod_ah_init(const struct lu_env *env, * in case of late striping creation, ->ah_init() * can be called with local object existing */ - if (!dt_object_exists(nextc)) - nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode); + if (!dt_object_exists(nextc) || dt_object_remote(nextc)) + nextc->do_ops->do_ah_init(env, ah, dt_object_remote(nextp) ? + NULL : nextp, nextc, child_mode); if (S_ISDIR(child_mode)) { if (lp->ldo_striping_cached == 0) { @@ -547,7 +725,10 @@ static void lod_ah_init(const struct lu_env *env, lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset; lc->ldo_striping_cached = 1; lc->ldo_def_striping_set = 1; - CDEBUG(D_OTHER, "inherite striping defaults\n"); + CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n", + (int)lc->ldo_def_stripenr, + (int)lc->ldo_def_stripe_size, + (int)lc->ldo_def_stripe_offset); } return; } @@ -600,6 +781,104 @@ static void lod_ah_init(const struct lu_env *env, EXIT; } +#define ll_do_div64(aaa,bbb) do_div((aaa), (bbb)) +/* + * this function handles a special case when truncate was done + * on a stripeless object and now striping is being created + * we can't lose that size, so we have to propagate it to newly + * created object + */ +static int lod_declare_init_size(const struct lu_env *env, + struct dt_object *dt, struct thandle *th) +{ + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); + struct lu_attr *attr = &lod_env_info(env)->lti_attr; + uint64_t size, offs; + int rc, stripe; + ENTRY; + + /* XXX: we support the simplest (RAID0) striping so far */ + LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0); + LASSERT(lo->ldo_stripe_size > 0); + + rc = dt_attr_get(env, next, attr, BYPASS_CAPA); + LASSERT(attr->la_valid & LA_SIZE); + if (rc) + RETURN(rc); + + size = attr->la_size; + if (size == 0) + RETURN(0); + + /* ll_do_div64(a, b) returns a % b, and a = a / b */ + ll_do_div64(size, (__u64) lo->ldo_stripe_size); + stripe = ll_do_div64(size, (__u64) lo->ldo_stripenr); + + size = size * lo->ldo_stripe_size; + offs = attr->la_size; + size += ll_do_div64(offs, lo->ldo_stripe_size); + + attr->la_valid = LA_SIZE; + attr->la_size = size; + + rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th); + + RETURN(rc); +} + + +/** + * Create declaration of striped object + */ +int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *attr, + const struct lu_buf *lovea, struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); + int rc; + ENTRY; + + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) { + /* failed to create striping, let's reset + * config so that others don't get confused */ + lod_object_free_striping(env, lo); + GOTO(out, rc = -ENOMEM); + } + + /* choose OST and generate appropriate objects */ + rc = lod_qos_prep_create(env, lo, attr, lovea, th); + if (rc) { + /* failed to create striping, let's reset + * config so that others don't get confused */ + lod_object_free_striping(env, lo); + GOTO(out, rc); + } + + /* + * declare storage for striping data + */ + info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr, + lo->ldo_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1); + rc = dt_declare_xattr_set(env, next, &info->lti_buf, XATTR_NAME_LOV, + 0, th); + if (rc) + GOTO(out, rc); + + /* + * if striping is created with local object's size > 0, + * we have to propagate this size to specific object + * the case is possible only when local object was created previously + */ + if (dt_object_exists(next)) + rc = lod_declare_init_size(env, dt, th); + +out: + RETURN(rc); +} + static int lod_declare_object_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, @@ -615,7 +894,6 @@ static int lod_declare_object_create(const struct lu_env *env, LASSERT(dof); LASSERT(attr); LASSERT(th); - LASSERT(!dt_object_exists(next)); /* * first of all, we declare creation of local object @@ -627,26 +905,88 @@ static int lod_declare_object_create(const struct lu_env *env, if (dof->dof_type == DFT_SYM) dt->do_body_ops = &lod_body_lnk_ops; - if (dof->dof_type == DFT_DIR && lo->ldo_striping_cached) { + /* + * it's lod_ah_init() who has decided the object will striped + */ + if (dof->dof_type == DFT_REGULAR) { + /* callers don't want stripes */ + /* XXX: all tricky interactions with ->ah_make_hint() decided + * to use striping, then ->declare_create() behaving differently + * should be cleaned */ + if (dof->u.dof_reg.striped == 0) + lo->ldo_stripenr = 0; + if (lo->ldo_stripenr > 0) + rc = lod_declare_striped_object(env, dt, attr, + NULL, th); + } else if (dof->dof_type == DFT_DIR && lo->ldo_striping_cached) { struct lod_thread_info *info = lod_env_info(env); - info->lti_buf.lb_buf = NULL; - info->lti_buf.lb_len = sizeof(struct lov_user_md_v3); + struct lov_user_md_v3 *v3; + + if (LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size, + lo->ldo_def_stripenr, + lo->ldo_def_stripe_offset)) + RETURN(0); + + OBD_ALLOC_PTR(v3); + if (v3 == NULL) + RETURN(-ENOMEM); + + v3->lmm_magic = cpu_to_le32(LOV_MAGIC_V3); + v3->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0); + fid_to_lmm_oi(lu_object_fid(&dt->do_lu), &v3->lmm_oi); + lmm_oi_cpu_to_le(&v3->lmm_oi, &v3->lmm_oi); + v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size); + v3->lmm_stripe_count = cpu_to_le32(lo->ldo_def_stripenr); + v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset); + if (lo->ldo_pool) + strncpy(v3->lmm_pool_name, lo->ldo_pool, + LOV_MAXPOOLNAME); + + info->lti_buf.lb_buf = v3; + info->lti_buf.lb_len = sizeof(*v3); + /* to transfer default striping from the parent */ rc = dt_declare_xattr_set(env, next, &info->lti_buf, XATTR_NAME_LOV, 0, th); + OBD_FREE_PTR(v3); } out: RETURN(rc); } +int lod_striping_create(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *attr, struct dt_object_format *dof, + struct thandle *th) +{ + struct lod_object *lo = lod_dt_obj(dt); + int rc = 0, i; + ENTRY; + + LASSERT(lo->ldo_striping_cached == 0); + + /* create all underlying objects */ + for (i = 0; i < lo->ldo_stripenr; i++) { + LASSERT(lo->ldo_stripe[i]); + rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th); + + if (rc) + break; + } + if (rc == 0) + rc = lod_generate_and_set_lovea(env, lo, th); + + RETURN(rc); +} + static int lod_object_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, struct dt_allocation_hint *hint, struct dt_object_format *dof, struct thandle *th) { struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); int rc; ENTRY; @@ -656,6 +996,8 @@ static int lod_object_create(const struct lu_env *env, struct dt_object *dt, if (rc == 0) { if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) rc = lod_store_def_striping(env, dt, th); + else if (lo->ldo_stripe) + rc = lod_striping_create(env, dt, attr, dof, th); } RETURN(rc); @@ -733,15 +1075,8 @@ static int lod_index_try(const struct lu_env *env, struct dt_object *dt, LASSERT(next->do_ops->do_index_try); rc = next->do_ops->do_index_try(env, next, feat); - if (next->do_index_ops && dt->do_index_ops == NULL) { + if (next->do_index_ops && dt->do_index_ops == NULL) dt->do_index_ops = &lod_index_ops; - /* XXX: iterators don't accept device, so bypass LOD */ - /* will be fixed with DNE */ - if (lod_index_ops.dio_it.fini == NULL) { - lod_index_ops.dio_it = next->do_index_ops->dio_it; - lod_index_ops.dio_it.init = lod_it_init; - } - } RETURN(rc); } @@ -782,6 +1117,23 @@ static int lod_object_sync(const struct lu_env *env, struct dt_object *dt) return dt_object_sync(env, dt_object_child(dt)); } +static int lod_object_lock(const struct lu_env *env, + struct dt_object *dt, struct lustre_handle *lh, + struct ldlm_enqueue_info *einfo, + void *policy) +{ + struct dt_object *next = dt_object_child(dt); + int rc; + ENTRY; + + /* + * declare setattr on the local object + */ + rc = dt_object_lock(env, next, lh, einfo, policy); + + RETURN(rc); +} + struct dt_object_operations lod_obj_ops = { .do_read_lock = lod_object_read_lock, .do_write_lock = lod_object_write_lock, @@ -809,6 +1161,7 @@ struct dt_object_operations lod_obj_ops = { .do_ref_del = lod_ref_del, .do_capa_get = lod_capa_get, .do_object_sync = lod_object_sync, + .do_object_lock = lod_object_lock, }; static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt, @@ -882,6 +1235,7 @@ void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo) lo->ldo_stripes_allocated = 0; } lo->ldo_stripenr = 0; + lo->ldo_pattern = 0; } /* @@ -932,3 +1286,51 @@ struct lu_object_operations lod_lu_obj_ops = { .loo_object_release = lod_object_release, .loo_object_print = lod_object_print, }; + +/** + * Init remote lod object + */ +static int lod_robject_init(const struct lu_env *env, struct lu_object *lo, + const struct lu_object_conf *conf) +{ + struct lod_device *lod = lu2lod_dev(lo->lo_dev); + struct lod_tgt_descs *ltd = &lod->lod_mdt_descs; + struct lu_device *c_dev = NULL; + struct lu_object *c_obj; + int i; + ENTRY; + + lod_getref(ltd); + if (ltd->ltd_tgts_size > 0) { + cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) { + struct lod_tgt_desc *tgt; + tgt = LTD_TGT(ltd, i); + LASSERT(tgt && tgt->ltd_tgt); + if (tgt->ltd_index == + lu2lod_obj(lo)->ldo_mds_num) { + c_dev = &(tgt->ltd_tgt->dd_lu_dev); + break; + } + } + } + lod_putref(lod, ltd); + + if (unlikely(c_dev == NULL)) + RETURN(-ENOENT); + + c_obj = c_dev->ld_ops->ldo_object_alloc(env, lo->lo_header, c_dev); + if (unlikely(c_obj == NULL)) + RETURN(-ENOMEM); + + lu_object_add(lo, c_obj); + + RETURN(0); +} + +struct lu_object_operations lod_lu_robj_ops = { + .loo_object_init = lod_robject_init, + .loo_object_start = lod_object_start, + .loo_object_free = lod_object_free, + .loo_object_release = lod_object_release, + .loo_object_print = lod_object_print, +};