X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flod%2Flod_object.c;h=070b2b6ca86a6c073c25847cfedb55269bf6d554;hp=e45e1959a522b3aac60edf426d810f0e75a4ecfc;hb=0607e01af74a81d0fe12ceec79bd22810a5dfe92;hpb=f7e40c602d4e5f449dcdade8f48f090b41a7743b diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index e45e195..070b2b6 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -23,7 +23,7 @@ * Copyright 2009 Sun Microsystems, Inc. All rights reserved * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Intel, Inc. + * Copyright (c) 2012, Intel Corporation. */ /* * lustre/lod/lod_object.c @@ -101,19 +101,138 @@ static struct dt_it *lod_it_init(const struct lu_env *env, struct dt_object *dt, __u32 attr, struct lustre_capa *capa) { - struct dt_object *next = dt_object_child(dt); + struct dt_object *next = dt_object_child(dt); + struct lod_it *it = &lod_env_info(env)->lti_it; + struct dt_it *it_next; + + + it_next = next->do_index_ops->dio_it.init(env, next, attr, capa); + if (IS_ERR(it_next)) + return it_next; + + /* currently we do not use more than one iterator per thread + * so we store it in thread info. if at some point we need + * more active iterators in a single thread, we can allocate + * additional ones */ + LASSERT(it->lit_obj == NULL); + + it->lit_it = it_next; + it->lit_obj = next; + + return (struct dt_it *)it; +} + +#define LOD_CHECK_IT(env, it) \ +{ \ + LASSERT((it)->lit_obj != NULL); \ + LASSERT((it)->lit_it != NULL); \ +} while(0) + +void lod_it_fini(const struct lu_env *env, struct dt_it *di) +{ + struct lod_it *it = (struct lod_it *)di; + + LOD_CHECK_IT(env, it); + it->lit_obj->do_index_ops->dio_it.fini(env, it->lit_it); + + /* the iterator not in use any more */ + it->lit_obj = NULL; + it->lit_it = NULL; +} + +int lod_it_get(const struct lu_env *env, struct dt_it *di, + const struct dt_key *key) +{ + const struct lod_it *it = (const struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.get(env, it->lit_it, key); +} + +void lod_it_put(const struct lu_env *env, struct dt_it *di) +{ + struct lod_it *it = (struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.put(env, it->lit_it); +} + +int lod_it_next(const struct lu_env *env, struct dt_it *di) +{ + struct lod_it *it = (struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.next(env, it->lit_it); +} + +struct dt_key *lod_it_key(const struct lu_env *env, const struct dt_it *di) +{ + const struct lod_it *it = (const struct lod_it *)di; - return next->do_index_ops->dio_it.init(env, next, attr, capa); + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.key(env, it->lit_it); +} + +int lod_it_key_size(const struct lu_env *env, const struct dt_it *di) +{ + struct lod_it *it = (struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.key_size(env, it->lit_it); +} + +int lod_it_rec(const struct lu_env *env, const struct dt_it *di, + struct dt_rec *rec, __u32 attr) +{ + const struct lod_it *it = (const struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr); +} + +__u64 lod_it_store(const struct lu_env *env, const struct dt_it *di) +{ + const struct lod_it *it = (const struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.store(env, it->lit_it); +} + +int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash) +{ + const struct lod_it *it = (const struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.load(env, it->lit_it, hash); +} + +int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di, + void* key_rec) +{ + const struct lod_it *it = (const struct lod_it *)di; + + LOD_CHECK_IT(env, it); + return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it, key_rec); } static struct dt_index_operations lod_index_ops = { - .dio_lookup = lod_index_lookup, - .dio_declare_insert = lod_declare_index_insert, - .dio_insert = lod_index_insert, - .dio_declare_delete = lod_declare_index_delete, - .dio_delete = lod_index_delete, - .dio_it = { - .init = lod_it_init, + .dio_lookup = lod_index_lookup, + .dio_declare_insert = lod_declare_index_insert, + .dio_insert = lod_index_insert, + .dio_declare_delete = lod_declare_index_delete, + .dio_delete = lod_index_delete, + .dio_it = { + .init = lod_it_init, + .fini = lod_it_fini, + .get = lod_it_get, + .put = lod_it_put, + .next = lod_it_next, + .key = lod_it_key, + .key_size = lod_it_key_size, + .rec = lod_it_rec, + .store = lod_it_store, + .load = lod_it_load, + .key_rec = lod_it_key_rec, } }; @@ -235,7 +354,44 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, struct lu_buf *buf, const char *name, struct lustre_capa *capa) { - return dt_xattr_get(env, dt_object_child(dt), buf, name, capa); + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *dev = lu2lod_dev(dt->do_lu.lo_dev); + int rc, is_root; + ENTRY; + + rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa); + if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT)) + RETURN(rc); + + /* + * lod returns default striping on the real root of the device + * this is like the root stores default striping for the whole + * filesystem. historically we've been using a different approach + * and store it in the config. + */ + dt_root_get(env, dev->lod_child, &info->lti_fid); + is_root = lu_fid_eq(&info->lti_fid, lu_object_fid(&dt->do_lu)); + + if (is_root && strcmp(XATTR_NAME_LOV, name) == 0) { + struct lov_user_md *lum = buf->lb_buf; + struct lov_desc *desc = &dev->lod_desc; + + if (buf->lb_buf == NULL) { + rc = sizeof(struct lov_user_md_v1); + } else if (buf->lb_len >= sizeof(struct lov_user_md_v1)) { + lum->lmm_magic = LOV_USER_MAGIC_V1; + lum->lmm_oi.oi_seq = FID_SEQ_LOV_DEFAULT; + lum->lmm_pattern = desc->ld_pattern; + lum->lmm_stripe_size = desc->ld_default_stripe_size; + lum->lmm_stripe_count = desc->ld_default_stripe_count; + lum->lmm_stripe_offset = desc->ld_default_stripe_offset; + rc = sizeof(struct lov_user_md_v1); + } else { + rc = -ERANGE; + } + } + + RETURN(rc); } /* @@ -255,27 +411,134 @@ static int lod_declare_xattr_set(const struct lu_env *env, struct thandle *th) { struct dt_object *next = dt_object_child(dt); + struct lu_attr *attr = &lod_env_info(env)->lti_attr; + __u32 mode; int rc; ENTRY; + /* + * allow to declare predefined striping on a new (!mode) object + * which is supposed to be replay of regular file creation + * (when LOV setting is declared) + * LU_XATTR_REPLACE is set to indicate a layout swap + */ + mode = dt->do_lu.lo_header->loh_attr & S_IFMT; + if ((S_ISREG(mode) || !mode) && !strcmp(name, XATTR_NAME_LOV) && + !(fl & LU_XATTR_REPLACE)) { + /* + * this is a request to manipulate object's striping + */ + if (dt_object_exists(dt)) { + rc = dt_attr_get(env, next, attr, BYPASS_CAPA); + if (rc) + RETURN(rc); + } else { + memset(attr, 0, sizeof(*attr)); + attr->la_valid = LA_TYPE | LA_MODE; + attr->la_mode = S_IFREG; + } + rc = lod_declare_striped_object(env, dt, attr, buf, th); + if (rc) + RETURN(rc); + } + rc = dt_declare_xattr_set(env, next, buf, name, fl, th); RETURN(rc); } +static int lod_xattr_set_lov_on_dir(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + const char *name, int fl, + struct thandle *th, + struct lustre_capa *capa) +{ + struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); + struct dt_object *next = dt_object_child(dt); + struct lod_object *l = lod_dt_obj(dt); + struct lov_user_md_v1 *lum; + struct lov_user_md_v3 *v3 = NULL; + int rc; + ENTRY; + + LASSERT(l->ldo_stripe == NULL); + l->ldo_striping_cached = 0; + l->ldo_def_striping_set = 0; + lod_object_set_pool(l, NULL); + l->ldo_def_stripe_size = 0; + l->ldo_def_stripenr = 0; + + LASSERT(buf); + LASSERT(buf->lb_buf); + lum = buf->lb_buf; + + rc = lod_verify_striping(d, buf, 0); + if (rc) + RETURN(rc); + + if (lum->lmm_magic == LOV_USER_MAGIC_V3) + v3 = buf->lb_buf; + + /* if { size, offset, count } = { 0, -1, 0 } and no pool + * (i.e. all default values specified) then delete default + * striping from dir. */ + CDEBUG(D_OTHER, + "set default striping: sz %u # %u offset %d %s %s\n", + (unsigned)lum->lmm_stripe_size, + (unsigned)lum->lmm_stripe_count, + (int)lum->lmm_stripe_offset, + v3 ? "from" : "", v3 ? v3->lmm_pool_name : ""); + + if (LOVEA_DELETE_VALUES((lum->lmm_stripe_size), + (lum->lmm_stripe_count), + (lum->lmm_stripe_offset)) && + lum->lmm_magic == LOV_USER_MAGIC_V1) { + rc = dt_xattr_del(env, next, name, th, capa); + if (rc == -ENODATA) + rc = 0; + } else { + rc = dt_xattr_set(env, next, buf, name, fl, th, capa); + } + + RETURN(rc); +} + static int lod_xattr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int fl, struct thandle *th, struct lustre_capa *capa) { - struct dt_object *next = dt_object_child(dt); - int rc; + struct dt_object *next = dt_object_child(dt); + __u32 attr; + int rc; ENTRY; - /* - * behave transparantly for all other EAs - */ - rc = dt_xattr_set(env, next, buf, name, fl, th, capa); + attr = dt->do_lu.lo_header->loh_attr & S_IFMT; + if (S_ISDIR(attr)) { + if (strncmp(name, XATTR_NAME_LOV, strlen(XATTR_NAME_LOV)) == 0) + rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, + fl, th, capa); + else + rc = dt_xattr_set(env, next, buf, name, fl, th, capa); + + } else if (S_ISREG(attr) && !strcmp(name, XATTR_NAME_LOV)) { + /* in case of lov EA swap, just set it + * if not, it is a replay so check striping match what we + * already have during req replay, declare_xattr_set() + * defines striping, then create() does the work + */ + if (fl & LU_XATTR_REPLACE) + rc = dt_xattr_set(env, next, buf, name, fl, th, capa); + else + rc = lod_striping_create(env, dt, NULL, NULL, th); + RETURN(rc); + } else { + /* + * behave transparantly for all other EAs + */ + rc = dt_xattr_set(env, next, buf, name, fl, th, capa); + } RETURN(rc); } @@ -320,6 +583,75 @@ int lod_object_set_pool(struct lod_object *o, char *pool) return 0; } +static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid) +{ + return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE); +} + +static int lod_cache_parent_striping(const struct lu_env *env, + struct lod_object *lp) +{ + struct lov_user_md_v1 *v1 = NULL; + struct lov_user_md_v3 *v3 = NULL; + int rc; + ENTRY; + + /* dt_ah_init() is called from MDD without parent being write locked + * lock it here */ + dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0); + if (lp->ldo_striping_cached) + GOTO(unlock, rc = 0); + + rc = lod_get_lov_ea(env, lp); + if (rc < 0) + GOTO(unlock, rc); + + if (rc < sizeof(struct lov_user_md)) { + /* don't lookup for non-existing or invalid striping */ + lp->ldo_def_striping_set = 0; + lp->ldo_striping_cached = 1; + lp->ldo_def_stripe_size = 0; + lp->ldo_def_stripenr = 0; + lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1); + GOTO(unlock, rc = 0); + } + + v1 = (struct lov_user_md_v1 *)lod_env_info(env)->lti_ea_store; + if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) + lustre_swab_lov_user_md_v1(v1); + else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) + lustre_swab_lov_user_md_v3(v3); + + if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1) + GOTO(unlock, rc = 0); + + if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0) + GOTO(unlock, rc = 0); + + lp->ldo_def_stripenr = v1->lmm_stripe_count; + lp->ldo_def_stripe_size = v1->lmm_stripe_size; + lp->ldo_def_stripe_offset = v1->lmm_stripe_offset; + lp->ldo_striping_cached = 1; + lp->ldo_def_striping_set = 1; + + if (v1->lmm_magic == LOV_USER_MAGIC_V3) { + /* XXX: sanity check here */ + v3 = (struct lov_user_md_v3 *) v1; + if (v3->lmm_pool_name[0]) + lod_object_set_pool(lp, v3->lmm_pool_name); + } + + CDEBUG(D_OTHER, "def. striping: # %d, sz %d, off %d %s%s on "DFID"\n", + lp->ldo_def_stripenr, lp->ldo_def_stripe_size, + lp->ldo_def_stripe_offset, v3 ? "from " : "", + v3 ? lp->ldo_pool : "", PFID(lu_object_fid(&lp->ldo_obj.do_lu))); + + EXIT; +unlock: + dt_write_unlock(env, dt_object_child(&lp->ldo_obj)); + return rc; +} + /** * used to transfer default striping data to the object being created */ @@ -329,15 +661,20 @@ static void lod_ah_init(const struct lu_env *env, struct dt_object *child, cfs_umode_t child_mode) { - struct dt_object *nextc; + struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev); struct dt_object *nextp = NULL; + struct dt_object *nextc; + struct lod_object *lp = NULL; struct lod_object *lc; + struct lov_desc *desc; ENTRY; LASSERT(child); - if (likely(parent)) + if (likely(parent)) { nextp = dt_object_child(parent); + lp = lod_dt_obj(parent); + } nextc = dt_object_child(child); lc = lod_dt_obj(child); @@ -350,12 +687,178 @@ static void lod_ah_init(const struct lu_env *env, * in case of late striping creation, ->ah_init() * can be called with local object existing */ - if (!dt_object_exists(nextc)) + if (!dt_object_exists(nextc) || dt_object_remote(nextc)) nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode); + if (S_ISDIR(child_mode)) { + if (lp->ldo_striping_cached == 0) { + /* we haven't tried to get default striping for + * the directory yet, let's cache it in the object */ + lod_cache_parent_striping(env, lp); + } + /* transfer defaults to new directory */ + if (lp->ldo_striping_cached) { + if (lp->ldo_pool) + lod_object_set_pool(lc, lp->ldo_pool); + lc->ldo_def_stripenr = lp->ldo_def_stripenr; + lc->ldo_def_stripe_size = lp->ldo_def_stripe_size; + lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset; + lc->ldo_striping_cached = 1; + lc->ldo_def_striping_set = 1; + CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n", + (int)lc->ldo_def_stripenr, + (int)lc->ldo_def_stripe_size, + (int)lc->ldo_def_stripe_offset); + } + return; + } + + /* + * if object is going to be striped over OSTs, transfer default + * striping information to the child, so that we can use it + * during declaration and creation + */ + if (!lod_object_will_be_striped(S_ISREG(child_mode), + lu_object_fid(&child->do_lu))) + return; + + /* + * try from the parent + */ + if (likely(parent)) { + if (lp->ldo_striping_cached == 0) { + /* we haven't tried to get default striping for + * the directory yet, let's cache it in the object */ + lod_cache_parent_striping(env, lp); + } + + lc->ldo_def_stripe_offset = (__u16) -1; + + if (lp->ldo_def_striping_set) { + if (lp->ldo_pool) + lod_object_set_pool(lc, lp->ldo_pool); + lc->ldo_stripenr = lp->ldo_def_stripenr; + lc->ldo_stripe_size = lp->ldo_def_stripe_size; + lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset; + CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n", + lc->ldo_stripenr, lc->ldo_stripe_size, + lp->ldo_pool ? lp->ldo_pool : ""); + } + } + + /* + * if the parent doesn't provide with specific pattern, grab fs-wide one + */ + desc = &d->lod_desc; + if (lc->ldo_stripenr == 0) + lc->ldo_stripenr = desc->ld_default_stripe_count; + if (lc->ldo_stripe_size == 0) + lc->ldo_stripe_size = desc->ld_default_stripe_size; + CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n", + lc->ldo_stripenr, lc->ldo_stripe_size, + lc->ldo_pool ? lc->ldo_pool : ""); + EXIT; } +#define ll_do_div64(aaa,bbb) do_div((aaa), (bbb)) +/* + * this function handles a special case when truncate was done + * on a stripeless object and now striping is being created + * we can't lose that size, so we have to propagate it to newly + * created object + */ +static int lod_declare_init_size(const struct lu_env *env, + struct dt_object *dt, struct thandle *th) +{ + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); + struct lu_attr *attr = &lod_env_info(env)->lti_attr; + uint64_t size, offs; + int rc, stripe; + ENTRY; + + /* XXX: we support the simplest (RAID0) striping so far */ + LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0); + LASSERT(lo->ldo_stripe_size > 0); + + rc = dt_attr_get(env, next, attr, BYPASS_CAPA); + LASSERT(attr->la_valid & LA_SIZE); + if (rc) + RETURN(rc); + + size = attr->la_size; + if (size == 0) + RETURN(0); + + /* ll_do_div64(a, b) returns a % b, and a = a / b */ + ll_do_div64(size, (__u64) lo->ldo_stripe_size); + stripe = ll_do_div64(size, (__u64) lo->ldo_stripenr); + + size = size * lo->ldo_stripe_size; + offs = attr->la_size; + size += ll_do_div64(offs, lo->ldo_stripe_size); + + attr->la_valid = LA_SIZE; + attr->la_size = size; + + rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th); + + RETURN(rc); +} + + +/** + * Create declaration of striped object + */ +int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *attr, + const struct lu_buf *lovea, struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); + int rc; + ENTRY; + + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) { + /* failed to create striping, let's reset + * config so that others don't get confused */ + lod_object_free_striping(env, lo); + GOTO(out, rc = -ENOMEM); + } + + /* choose OST and generate appropriate objects */ + rc = lod_qos_prep_create(env, lo, attr, lovea, th); + if (rc) { + /* failed to create striping, let's reset + * config so that others don't get confused */ + lod_object_free_striping(env, lo); + GOTO(out, rc); + } + + /* + * declare storage for striping data + */ + info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr, + lo->ldo_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1); + rc = dt_declare_xattr_set(env, next, &info->lti_buf, XATTR_NAME_LOV, + 0, th); + if (rc) + GOTO(out, rc); + + /* + * if striping is created with local object's size > 0, + * we have to propagate this size to specific object + * the case is possible only when local object was created previously + */ + if (dt_object_exists(next)) + rc = lod_declare_init_size(env, dt, th); + +out: + RETURN(rc); +} + static int lod_declare_object_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, @@ -364,13 +867,13 @@ static int lod_declare_object_create(const struct lu_env *env, struct thandle *th) { struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); int rc; ENTRY; LASSERT(dof); LASSERT(attr); LASSERT(th); - LASSERT(!dt_object_exists(next)); /* * first of all, we declare creation of local object @@ -382,22 +885,103 @@ static int lod_declare_object_create(const struct lu_env *env, if (dof->dof_type == DFT_SYM) dt->do_body_ops = &lod_body_lnk_ops; + /* + * it's lod_ah_init() who has decided the object will striped + */ + if (dof->dof_type == DFT_REGULAR) { + /* callers don't want stripes */ + /* XXX: all tricky interactions with ->ah_make_hint() decided + * to use striping, then ->declare_create() behaving differently + * should be cleaned */ + if (dof->u.dof_reg.striped == 0) + lo->ldo_stripenr = 0; + if (lo->ldo_stripenr > 0) + rc = lod_declare_striped_object(env, dt, attr, + NULL, th); + } else if (dof->dof_type == DFT_DIR && lo->ldo_striping_cached) { + struct lod_thread_info *info = lod_env_info(env); + + struct lov_user_md_v3 *v3; + + if (LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size, + lo->ldo_def_stripenr, + lo->ldo_def_stripe_offset)) + RETURN(0); + + OBD_ALLOC_PTR(v3); + if (v3 == NULL) + RETURN(-ENOMEM); + + v3->lmm_magic = cpu_to_le32(LOV_MAGIC_V3); + v3->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0); + fid_ostid_pack(lu_object_fid(&dt->do_lu), &v3->lmm_oi); + ostid_cpu_to_le(&v3->lmm_oi, &v3->lmm_oi); + v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size); + v3->lmm_stripe_count = cpu_to_le32(lo->ldo_def_stripenr); + v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset); + if (lo->ldo_pool) + strncpy(v3->lmm_pool_name, lo->ldo_pool, + LOV_MAXPOOLNAME); + + info->lti_buf.lb_buf = v3; + info->lti_buf.lb_len = sizeof(*v3); + + /* to transfer default striping from the parent */ + rc = dt_declare_xattr_set(env, next, &info->lti_buf, + XATTR_NAME_LOV, 0, th); + OBD_FREE_PTR(v3); + } + out: RETURN(rc); } +int lod_striping_create(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *attr, struct dt_object_format *dof, + struct thandle *th) +{ + struct lod_object *lo = lod_dt_obj(dt); + int rc = 0, i; + ENTRY; + + LASSERT(lo->ldo_stripe); + LASSERT(lo->ldo_stripenr > 0); + LASSERT(lo->ldo_striping_cached == 0); + + /* create all underlying objects */ + for (i = 0; i < lo->ldo_stripenr; i++) { + LASSERT(lo->ldo_stripe[i]); + rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th); + + if (rc) + break; + } + if (rc == 0) + rc = lod_generate_and_set_lovea(env, lo, th); + + RETURN(rc); +} + static int lod_object_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, struct dt_allocation_hint *hint, struct dt_object_format *dof, struct thandle *th) { struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); int rc; ENTRY; /* create local object */ rc = dt_create(env, next, attr, hint, dof, th); + if (rc == 0) { + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) + rc = lod_store_def_striping(env, dt, th); + else if (lo->ldo_stripe) + rc = lod_striping_create(env, dt, attr, dof, th); + } + RETURN(rc); } @@ -473,15 +1057,8 @@ static int lod_index_try(const struct lu_env *env, struct dt_object *dt, LASSERT(next->do_ops->do_index_try); rc = next->do_ops->do_index_try(env, next, feat); - if (next->do_index_ops && dt->do_index_ops == NULL) { + if (next->do_index_ops && dt->do_index_ops == NULL) dt->do_index_ops = &lod_index_ops; - /* XXX: iterators don't accept device, so bypass LOD */ - /* will be fixed with DNE */ - if (lod_index_ops.dio_it.fini == NULL) { - lod_index_ops.dio_it = next->do_index_ops->dio_it; - lod_index_ops.dio_it.init = lod_it_init; - } - } RETURN(rc); } @@ -551,6 +1128,26 @@ struct dt_object_operations lod_obj_ops = { .do_object_sync = lod_object_sync, }; +static int lod_object_lock(const struct lu_env *env, + struct dt_object *dt, struct lustre_handle *lh, + struct ldlm_enqueue_info *einfo, + void *policy) +{ + struct dt_object *next = dt_object_child(dt); + int rc; + ENTRY; + + /* + * declare setattr on the local object + */ + rc = dt_object_lock(env, next, lh, einfo, policy); + + RETURN(rc); +} + +struct dt_lock_operations lod_lock_ops = { + .do_object_lock = lod_object_lock, +}; static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt, struct lu_buf *buf, loff_t *pos, struct lustre_capa *capa) @@ -672,3 +1269,51 @@ struct lu_object_operations lod_lu_obj_ops = { .loo_object_release = lod_object_release, .loo_object_print = lod_object_print, }; + +/** + * Init remote lod object + */ +static int lod_robject_init(const struct lu_env *env, struct lu_object *lo, + const struct lu_object_conf *conf) +{ + struct lod_device *lod = lu2lod_dev(lo->lo_dev); + struct lod_tgt_descs *ltd = &lod->lod_mdt_descs; + struct lu_device *c_dev = NULL; + struct lu_object *c_obj; + int i; + ENTRY; + + lod_getref(ltd); + if (ltd->ltd_tgts_size > 0) { + cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) { + struct lod_tgt_desc *tgt; + tgt = LTD_TGT(ltd, i); + LASSERT(tgt && tgt->ltd_tgt); + if (tgt->ltd_index == + lu2lod_obj(lo)->ldo_mds_num) { + c_dev = &(tgt->ltd_tgt->dd_lu_dev); + break; + } + } + } + lod_putref(lod, ltd); + + if (unlikely(c_dev == NULL)) + RETURN(-ENOENT); + + c_obj = c_dev->ld_ops->ldo_object_alloc(env, lo->lo_header, c_dev); + if (unlikely(c_obj == NULL)) + RETURN(-ENOMEM); + + lu_object_add(lo, c_obj); + + RETURN(0); +} + +struct lu_object_operations lod_lu_robj_ops = { + .loo_object_init = lod_robject_init, + .loo_object_start = lod_object_start, + .loo_object_free = lod_object_free, + .loo_object_release = lod_object_release, + .loo_object_print = lod_object_print, +};