* Copyright 2009 Sun Microsystems, Inc. All rights reserved
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2012, Intel, Inc.
+ * Copyright (c) 2012, Intel Corporation.
*/
/*
* lustre/lod/lod_object.c
#define LOD_CHECK_IT(env, it) \
{ \
- /* IT is supposed to be in thread info always */ \
- LASSERT((it) == &lod_env_info(env)->lti_it); \
LASSERT((it)->lit_obj != NULL); \
LASSERT((it)->lit_it != NULL); \
} while(0)
* allow to declare predefined striping on a new (!mode) object
* which is supposed to be replay of regular file creation
* (when LOV setting is declared)
+ * LU_XATTR_REPLACE is set to indicate a layout swap
*/
mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
- if ((S_ISREG(mode) || !mode) && !strcmp(name, XATTR_NAME_LOV)) {
+ if ((S_ISREG(mode) || !mode) && !strcmp(name, XATTR_NAME_LOV) &&
+ !(fl & LU_XATTR_REPLACE)) {
/*
* this is a request to manipulate object's striping
*/
const char *name, int fl, struct thandle *th,
struct lustre_capa *capa)
{
- struct dt_object *next = dt_object_child(dt);
- __u32 attr;
- int rc;
+ struct dt_object *next = dt_object_child(dt);
+ __u32 attr;
+ int rc;
ENTRY;
attr = dt->do_lu.lo_header->loh_attr & S_IFMT;
rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
} else if (S_ISREG(attr) && !strcmp(name, XATTR_NAME_LOV)) {
- /*
- * XXX: check striping match what we already have
- * during req replay, declare_xattr_set() defines striping,
- * then create() does the work
- */
- rc = lod_striping_create(env, dt, NULL, NULL, th);
+ /* in case of lov EA swap, just set it
+ * if not, it is a replay so check striping match what we
+ * already have during req replay, declare_xattr_set()
+ * defines striping, then create() does the work
+ */
+ if (fl & LU_XATTR_REPLACE)
+ rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
+ else
+ rc = lod_striping_create(env, dt, NULL, NULL, th);
RETURN(rc);
} else {
/*
* in case of late striping creation, ->ah_init()
* can be called with local object existing
*/
- if (!dt_object_exists(nextc))
+ if (!dt_object_exists(nextc) || dt_object_remote(nextc))
nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode);
if (S_ISDIR(child_mode)) {
lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
lc->ldo_striping_cached = 1;
lc->ldo_def_striping_set = 1;
- CDEBUG(D_OTHER, "inherite striping defaults\n");
+ CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n",
+ (int)lc->ldo_def_stripenr,
+ (int)lc->ldo_def_stripe_size,
+ (int)lc->ldo_def_stripe_offset);
}
return;
}
LASSERT(dof);
LASSERT(attr);
LASSERT(th);
- LASSERT(!dt_object_exists(next));
/*
* first of all, we declare creation of local object
} else if (dof->dof_type == DFT_DIR && lo->ldo_striping_cached) {
struct lod_thread_info *info = lod_env_info(env);
- info->lti_buf.lb_buf = NULL;
- info->lti_buf.lb_len = sizeof(struct lov_user_md_v3);
+ struct lov_user_md_v3 *v3;
+
+ if (LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
+ lo->ldo_def_stripenr,
+ lo->ldo_def_stripe_offset))
+ RETURN(0);
+
+ OBD_ALLOC_PTR(v3);
+ if (v3 == NULL)
+ RETURN(-ENOMEM);
+
+ v3->lmm_magic = cpu_to_le32(LOV_MAGIC_V3);
+ v3->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0);
+ v3->lmm_object_id = fid_oid(lu_object_fid(&dt->do_lu));
+ v3->lmm_object_seq = fid_seq(lu_object_fid(&dt->do_lu));
+ v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
+ v3->lmm_stripe_count = cpu_to_le32(lo->ldo_def_stripenr);
+ v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
+ if (lo->ldo_pool)
+ strncpy(v3->lmm_pool_name, lo->ldo_pool,
+ LOV_MAXPOOLNAME);
+
+ info->lti_buf.lb_buf = v3;
+ info->lti_buf.lb_len = sizeof(*v3);
+
/* to transfer default striping from the parent */
rc = dt_declare_xattr_set(env, next, &info->lti_buf,
XATTR_NAME_LOV, 0, th);
+ OBD_FREE_PTR(v3);
}
out:
ENTRY;
LASSERT(lo->ldo_stripe);
- LASSERT(lo->ldo_stripe > 0);
+ LASSERT(lo->ldo_stripenr > 0);
LASSERT(lo->ldo_striping_cached == 0);
/* create all underlying objects */
.do_object_sync = lod_object_sync,
};
+static int lod_object_lock(const struct lu_env *env,
+ struct dt_object *dt, struct lustre_handle *lh,
+ struct ldlm_enqueue_info *einfo,
+ void *policy)
+{
+ struct dt_object *next = dt_object_child(dt);
+ int rc;
+ ENTRY;
+
+ /*
+ * declare setattr on the local object
+ */
+ rc = dt_object_lock(env, next, lh, einfo, policy);
+
+ RETURN(rc);
+}
+
+struct dt_lock_operations lod_lock_ops = {
+ .do_object_lock = lod_object_lock,
+};
static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
struct lu_buf *buf, loff_t *pos,
struct lustre_capa *capa)
.loo_object_release = lod_object_release,
.loo_object_print = lod_object_print,
};
+
+/**
+ * Init remote lod object
+ */
+static int lod_robject_init(const struct lu_env *env, struct lu_object *lo,
+ const struct lu_object_conf *conf)
+{
+ struct lod_device *lod = lu2lod_dev(lo->lo_dev);
+ struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
+ struct lu_device *c_dev = NULL;
+ struct lu_object *c_obj;
+ int i;
+ ENTRY;
+
+ lod_getref(ltd);
+ if (ltd->ltd_tgts_size > 0) {
+ cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
+ struct lod_tgt_desc *tgt;
+ tgt = LTD_TGT(ltd, i);
+ LASSERT(tgt && tgt->ltd_tgt);
+ if (tgt->ltd_index ==
+ lu2lod_obj(lo)->ldo_mds_num) {
+ c_dev = &(tgt->ltd_tgt->dd_lu_dev);
+ break;
+ }
+ }
+ }
+ lod_putref(lod, ltd);
+
+ if (unlikely(c_dev == NULL))
+ RETURN(-ENOENT);
+
+ c_obj = c_dev->ld_ops->ldo_object_alloc(env, lo->lo_header, c_dev);
+ if (unlikely(c_obj == NULL))
+ RETURN(-ENOMEM);
+
+ lu_object_add(lo, c_obj);
+
+ RETURN(0);
+}
+
+struct lu_object_operations lod_lu_robj_ops = {
+ .loo_object_init = lod_robject_init,
+ .loo_object_start = lod_object_start,
+ .loo_object_free = lod_object_free,
+ .loo_object_release = lod_object_release,
+ .loo_object_print = lod_object_print,
+};