for ((i=0; i < $ndevs; i++)); do
host=${host_names[$i]}
obd=$(remote_shell $host $lctl device_list | awk "{if (\$2 == \"UP\" &&
- \$3 == \"osc\") { print \$4 } }")
+ (\$3 == \"osc\" || \$3 == \"osp\")) { print \$4 } }")
if [ -z "$obd" ]; then
echo "Need obdfilter to test stripe_count"
exit 1
}
};
-static int mdd_device_init(const struct lu_env *env, struct lu_device *d,
- const char *name, struct lu_device *next)
+static int mdd_connect_to_next(const struct lu_env *env, struct mdd_device *m,
+ const char *nextdev)
{
- struct mdd_device *mdd = lu2mdd_dev(d);
- int rc;
- ENTRY;
+ struct obd_connect_data *data = NULL;
+ struct lu_device *lud = mdd2lu_dev(m);
+ struct obd_device *obd;
+ int rc;
+ ENTRY;
+ /*
+ * Connect this mdd to the device named by nextdev via obd_connect().
+ * On success the export is kept in m->mdd_child_exp, m->mdd_child is
+ * taken from the lu_device behind that export, and this device adopts
+ * that device's ld_site and is linked into it.
+ * Returns 0 on success, -ENOMEM if the connect data cannot be
+ * allocated, -ENOTCONN if nextdev is not a known obd, or the value
+ * returned by obd_connect().
+ */
+ LASSERT(m->mdd_child_exp == NULL); /* must not be connected already */
+
+ OBD_ALLOC(data, sizeof(*data));
+ if (data == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ obd = class_name2obd(nextdev);
+ if (obd == NULL) {
+ CERROR("can't locate next device: %s\n", nextdev);
+ GOTO(out, rc = -ENOTCONN);
+ }
+
+ data->ocd_connect_flags = OBD_CONNECT_VERSION; /* only the version flag is set */
+ data->ocd_version = LUSTRE_VERSION_CODE;
+
+ rc = obd_connect(NULL, &m->mdd_child_exp, obd, &obd->obd_uuid, data, NULL);
+ if (rc) {
+ CERROR("cannot connect to next dev %s (%d)\n", nextdev, rc);
+ GOTO(out, rc);
+ }
+
+ lud->ld_site = m->mdd_child_exp->exp_obd->obd_lu_dev->ld_site; /* share the child's site */
+ LASSERT(lud->ld_site);
+ m->mdd_child = lu2dt_dev(m->mdd_child_exp->exp_obd->obd_lu_dev);
+ lu_dev_add_linkage(lud->ld_site, lud);
- mdd->mdd_child = lu2dt_dev(next);
+out:
+ if (data) /* connect data is only needed for the call above; freed on every path */
+ OBD_FREE(data, sizeof(*data));
+ RETURN(rc);
+}
+/*
+ * Initialize a newly allocated mdd device: install the lu and md
+ * operation tables, connect to the next device named in lustre_cfg
+ * buffer 3, set the atime/permission defaults, fetch the child's dt
+ * configuration and register procfs under the name from lustre_cfg
+ * buffer 2. Returns 0 or the error of the step that failed.
+ */
+static int mdd_init0(const struct lu_env *env, struct mdd_device *mdd,
+ struct lu_device_type *t, struct lustre_cfg *lcfg)
+{
+ int rc;
+ ENTRY;
+
+ mdd->mdd_md_dev.md_lu_dev.ld_ops = &mdd_lu_ops;
+ mdd->mdd_md_dev.md_ops = &mdd_ops;
- /* Prepare transactions callbacks. */
- mdd->mdd_txn_cb.dtc_txn_start = NULL;
- mdd->mdd_txn_cb.dtc_txn_stop = mdd_txn_stop_cb;
- mdd->mdd_txn_cb.dtc_txn_commit = NULL;
- mdd->mdd_txn_cb.dtc_cookie = mdd;
- mdd->mdd_txn_cb.dtc_tag = LCT_MD_THREAD;
- CFS_INIT_LIST_HEAD(&mdd->mdd_txn_cb.dtc_linkage);
- mdd->mdd_atime_diff = MAX_ATIME_DIFF;
+ rc = mdd_connect_to_next(env, mdd, lustre_cfg_string(lcfg, 3));
+ if (rc)
+ RETURN(rc);
+
+ mdd->mdd_atime_diff = MAX_ATIME_DIFF;
/* sync permission changes */
mdd->mdd_sync_permission = 1;
- rc = mdd_procfs_init(mdd, name);
+ dt_conf_get(env, mdd->mdd_child, &mdd->mdd_dt_conf);
+
+ /* We use the service name rather than the mdd obd name
+ * for compatibility reasons.
+ * It is passed from MDT in the lustre_cfg[2] buffer. */
+ rc = mdd_procfs_init(mdd, lustre_cfg_string(lcfg, 2));
+ if (rc < 0)
+ obd_disconnect(mdd->mdd_child_exp); /* drop the connection made above */
+ /*
+ * NOTE(review): on this failure path mdd_child_exp still holds the
+ * pointer that was just passed to obd_disconnect(), and the
+ * lu_dev_add_linkage() done in mdd_connect_to_next() is not undone --
+ * confirm nothing relies on either after a failed init.
+ */
RETURN(rc);
}
struct mdd_device *mdd = lu2mdd_dev(d);
int rc;
+ if (d->ld_site)
+ lu_dev_del_linkage(d->ld_site, d);
+
rc = mdd_procfs_fini(mdd);
if (rc) {
CERROR("proc fini error %d \n", rc);
ENTRY;
mdd_lfsck_cleanup(env, m);
mdd_changelog_fini(env, m);
- dt_txn_callback_del(m->mdd_child, &m->mdd_txn_cb);
if (m->mdd_dot_lustre_objs.mdd_obf)
mdd_object_put(env, m->mdd_dot_lustre_objs.mdd_obf);
if (m->mdd_dot_lustre)
mdd_object_put(env, m->mdd_dot_lustre);
- if (m->mdd_obd_dev)
- mdd_fini_obd(env, m, cfg);
orph_index_fini(env, m);
if (m->mdd_capa != NULL) {
lu_object_put(env, &m->mdd_capa->do_lu);
m->mdd_capa = NULL;
}
+ lu_site_purge(env, m->mdd_md_dev.md_lu_dev.ld_site, -1);
/* remove upcall device*/
md_upcall_fini(&m->mdd_md_dev);
+
+ if (m->mdd_child_exp)
+ obd_disconnect(m->mdd_child_exp);
+
EXIT;
}
struct md_object *obj, struct lu_buf *buf,
const char *name)
{
+ struct mdd_device *mdd = mdo2mdd(obj);
+ struct mdd_object *root;
+ struct lu_fid rootfid;
int rc = 0;
- /* XXX: a temp. solution till LOD/OSP is landed */
+ /*
+ * .lustre returns default striping which is 'stored'
+ * in the root
+ */
if (strcmp(name, XATTR_NAME_LOV) == 0) {
- if (buf->lb_buf == NULL) {
- rc = sizeof(struct lov_user_md);
- } else if (buf->lb_len >= sizeof(struct lov_user_md)) {
- rc = mdd_get_default_md(md2mdd_obj(obj), buf->lb_buf);
- } else {
- rc = -ERANGE;
- }
+ rc = dt_root_get(env, mdd->mdd_child, &rootfid);
+ if (rc)
+ return rc;
+ root = mdd_object_find(env, mdd, &rootfid);
+ if (IS_ERR(root))
+ return PTR_ERR(root);
+ rc = mdo_xattr_get(env, root, buf, name,
+ mdd_object_capa(env, md2mdd_obj(obj)));
+ mdd_object_put(env, root);
}
return rc;
sscanf(name, SFID, RFID(f));
if (!fid_is_sane(f)) {
CWARN("%s: bad FID format [%s], should be "DFID"\n",
- mdd->mdd_obd_dev->obd_name, lname->ln_name,
+ mdd2obd_dev(mdd)->obd_name, lname->ln_name,
(__u64)FID_SEQ_NORMAL, 1, 0);
GOTO(out, rc = -EINVAL);
}
if (!fid_is_norm(f)) {
CWARN("%s: "DFID" is invalid, sequence should be "
- ">= "LPX64"\n", mdd->mdd_obd_dev->obd_name, PFID(f),
+ ">= "LPX64"\n", mdd2obd_dev(mdd)->obd_name, PFID(f),
(__u64)FID_SEQ_NORMAL);
GOTO(out, rc = -EINVAL);
}
GOTO(out, rc);
dt->dd_ops->dt_conf_get(env, dt, &m->mdd_dt_conf);
- rc = mdd_init_obd(env, m, cfg);
- if (rc) {
- CERROR("lov init error %d\n", rc);
- GOTO(out, rc);
- }
-
mdd_changelog_init(env, m);
break;
case LCFG_CLEANUP:
+ rc = next->ld_ops->ldo_process_config(env, next, cfg);
lu_dev_del_linkage(d->ld_site, d);
mdd_device_shutdown(env, m, cfg);
+ break;
default:
rc = next->ld_ops->ldo_process_config(env, next, cfg);
break;
RETURN(rc);
}
-#if 0
-static int mdd_lov_set_nextid(const struct lu_env *env,
- struct mdd_device *mdd)
-{
- struct mds_obd *mds = &mdd->mdd_obd_dev->u.mds;
- int rc;
- ENTRY;
-
- LASSERT(mds->mds_lov_objids != NULL);
- rc = obd_set_info_async(mds->mds_lov_exp, strlen(KEY_NEXT_ID),
- KEY_NEXT_ID, mds->mds_lov_desc.ld_tgt_count,
- mds->mds_lov_objids, NULL);
-
- RETURN(rc);
-}
-
-static int mdd_cleanup_unlink_llog(const struct lu_env *env,
- struct mdd_device *mdd)
-{
- /* XXX: to be implemented! */
- return 0;
-}
-#endif
-
static int mdd_recovery_complete(const struct lu_env *env,
struct lu_device *d)
{
struct mdd_device *mdd = lu2mdd_dev(d);
struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
- struct obd_device *obd = mdd2obd_dev(mdd);
int rc;
ENTRY;
LASSERT(mdd != NULL);
- LASSERT(obd != NULL);
-#if 0
- /* XXX: Do we need this in new stack? */
- rc = mdd_lov_set_nextid(env, mdd);
- if (rc) {
- CERROR("mdd_lov_set_nextid() failed %d\n",
- rc);
- RETURN(rc);
- }
-
- /* XXX: cleanup unlink. */
- rc = mdd_cleanup_unlink_llog(env, mdd);
- if (rc) {
- CERROR("mdd_cleanup_unlink_llog() failed %d\n",
- rc);
- RETURN(rc);
- }
-#endif
- /* Call that with obd_recovering = 1 just to update objids */
- obd_notify(obd->u.mds.mds_lov_obd, NULL, (obd->obd_async_recov ?
- OBD_NOTIFY_SYNC_NONBLOCK : OBD_NOTIFY_SYNC), NULL);
-
- /* Drop obd_recovering to 0 and call o_postrecov to recover mds_lov */
- cfs_spin_lock(&obd->obd_dev_lock);
- obd->obd_recovering = 0;
- cfs_spin_unlock(&obd->obd_dev_lock);
- obd->obd_type->typ_dt_ops->o_postrecov(obd);
/* XXX: orphans handling. */
__mdd_orphan_cleanup(env, mdd);
if (rc)
GOTO(out, rc);
- dt_txn_callback_add(mdd->mdd_child, &mdd->mdd_txn_cb);
root = dt_store_open(env, mdd->mdd_child, "", mdd_root_dir_name,
&mdd->mdd_root_fid);
if (!IS_ERR(root)) {
struct md_device *m,
struct lustre_capa_key *key)
{
- struct mds_capa_info info = { .uuid = NULL, .capa = key };
- struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
- struct obd_export *lov_exp = mdd2obd_dev(mdd)->u.mds.mds_lov_exp;
- int rc;
- ENTRY;
-
- rc = obd_set_info_async(env, lov_exp, sizeof(KEY_CAPA_KEY),
- KEY_CAPA_KEY, sizeof(info), &info, NULL);
- RETURN(rc);
+ /* we do not support capabilities ... */
+ return -EINVAL;
}
static int mdd_llog_ctxt_get(const struct lu_env *env, struct md_device *m,
return (*h == NULL ? -ENOENT : 0);
}
+static struct lu_device *mdd_device_free(const struct lu_env *env,
+ struct lu_device *lu)
+{
+ struct mdd_device *m = lu2mdd_dev(lu);
+ ENTRY;
+ /*
+ * Finalize the embedded md device and free the mdd device that
+ * contains lu. The lu_device must have no references left (ld_ref is
+ * asserted to be 0). Always returns NULL.
+ * Defined ahead of mdd_device_alloc(), which calls it when mdd_init0()
+ * fails.
+ */
+ LASSERT(cfs_atomic_read(&lu->ld_ref) == 0);
+ md_device_fini(&m->mdd_md_dev);
+ OBD_FREE_PTR(m);
+ RETURN(NULL);
+}
+
static struct lu_device *mdd_device_alloc(const struct lu_env *env,
struct lu_device_type *t,
struct lustre_cfg *lcfg)
if (m == NULL) {
l = ERR_PTR(-ENOMEM);
} else {
- md_device_init(&m->mdd_md_dev, t);
+ int rc;
+
l = mdd2lu_dev(m);
- l->ld_ops = &mdd_lu_ops;
- m->mdd_md_dev.md_ops = &mdd_ops;
- md_upcall_init(&m->mdd_md_dev, NULL);
+ md_device_init(&m->mdd_md_dev, t);
+ rc = mdd_init0(env, m, t, lcfg);
+ if (rc != 0) {
+ mdd_device_free(env, l);
+ l = ERR_PTR(rc);
+ }
}
return l;
}
-static struct lu_device *mdd_device_free(const struct lu_env *env,
- struct lu_device *lu)
+/*
+ * we use exports to track all mdd users
+ *
+ * o_connect handler: obtains a connection handle with class_connect(),
+ * returns the matching export through *exp and counts the connection
+ * in mdd_connects. env, data and localdata are not used here.
+ * Only a single connection at a time is expected (asserted below).
+ * Returns 0 on success or the class_connect() error.
+ */
+static int mdd_obd_connect(const struct lu_env *env, struct obd_export **exp,
+ struct obd_device *obd, struct obd_uuid *cluuid,
+ struct obd_connect_data *data, void *localdata)
{
- struct mdd_device *m = lu2mdd_dev(lu);
- ENTRY;
+ struct mdd_device *mdd = lu2mdd_dev(obd->obd_lu_dev);
+ struct lustre_handle conn;
+ int rc;
+ ENTRY;
- LASSERT(cfs_atomic_read(&lu->ld_ref) == 0);
- md_device_fini(&m->mdd_md_dev);
- OBD_FREE_PTR(m);
- RETURN(NULL);
+ CDEBUG(D_CONFIG, "connect #%d\n", mdd->mdd_connects);
+
+ rc = class_connect(&conn, obd, cluuid);
+ if (rc)
+ RETURN(rc);
+
+ *exp = class_conn2export(&conn);
+
+ /* Why should there ever be more than 1 connect? */
+ LASSERT(mdd->mdd_connects == 0);
+ mdd->mdd_connects++;
+
+ RETURN(0);
+}
+
+/*
+ * Once the last export (we don't count the self-export) has
+ * disappeared, mdd can be released.
+ *
+ * o_disconnect handler: decrements mdd_connects, disconnects the export
+ * with class_disconnect() and, if that was the last connection and the
+ * disconnect succeeded, calls class_manual_cleanup() on the obd.
+ * Returns the class_disconnect() result.
+ */
+static int mdd_obd_disconnect(struct obd_export *exp)
+{
+ struct obd_device *obd = exp->exp_obd; /* looked up before exp is disconnected */
+ struct mdd_device *mdd = lu2mdd_dev(obd->obd_lu_dev);
+ int rc, release = 0;
+ ENTRY;
+
+ mdd->mdd_connects--;
+ if (mdd->mdd_connects == 0)
+ release = 1;
+
+ rc = class_disconnect(exp);
+
+ if (rc == 0 && release) /* cleanup is skipped if the disconnect itself failed */
+ class_manual_cleanup(obd);
+ RETURN(rc);
+}
+/*
+ * o_health_check handler: forwards the check to the obd behind
+ * mdd_child_exp and returns its result.
+ * NOTE(review): mdd_child_exp is dereferenced without a NULL check --
+ * presumably this is only reachable while the device is connected;
+ * confirm.
+ */
+static int mdd_obd_health_check(const struct lu_env *env,
+ struct obd_device *obd)
+{
+ struct mdd_device *mdd = lu2mdd_dev(obd->obd_lu_dev);
+ int rc;
+ ENTRY;
+
+ LASSERT(mdd);
+ rc = obd_health_check(env, mdd->mdd_child_exp->exp_obd);
+ RETURN(rc);
}
static struct obd_ops mdd_obd_device_ops = {
- .o_owner = THIS_MODULE
+ .o_owner = THIS_MODULE,
+ .o_connect = mdd_obd_connect, /* creates an export and counts the connection */
+ .o_disconnect = mdd_obd_disconnect, /* last disconnect triggers class_manual_cleanup() */
+ .o_health_check = mdd_obd_health_check /* forwarded to the child device's obd */
};
/* context key constructor/destructor: mdd_ucred_key_init, mdd_ucred_key_fini */
.ldto_device_alloc = mdd_device_alloc,
.ldto_device_free = mdd_device_free,
- .ldto_device_init = mdd_device_init,
.ldto_device_fini = mdd_device_fini
};
if (rc)
return rc;
- return mdd_declare_object_kill(env, obj, ma, handle);
+ return mdo_declare_destroy(env, obj, handle);
}
/* caller should take a lock before calling */
struct thandle *th)
{
int rc = 0;
- int reset = 1;
int is_dir = S_ISDIR(ma->ma_attr.la_mode);
ENTRY;
PFID(mdd_object_fid(obj)),
obj->mod_count);
} else {
- rc = mdd_object_kill(env, obj, ma, th);
- if (rc == 0)
- reset = 0;
+ rc = mdo_destroy(env, obj, th);
}
}
- if (reset)
- ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
RETURN(rc);
}
stop:
mdd_trans_stop(env, mdd, rc, handle);
-#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 55, 0)
- if (rc == 0 && ma->ma_valid & MA_COOKIE && ma->ma_valid & MA_LOV &&
- ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_UNLINK_DESTROY)
- /* Since echo client is incapable of destorying ost object,
- * it will destory the object here. */
- rc = mdd_lovobj_unlink(env, mdd, mdd_cobj, la, ma, 1);
-#else
-#warning "please remove this after 2.4 (LOD/OSP)."
-#endif
return rc;
}
}
-static int mdd_declare_create_data(const struct lu_env *env,
- struct mdd_device *mdd,
- struct mdd_object *obj,
- int lmm_size,
- struct thandle *handle)
-{
- struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
- int rc;
-
- buf->lb_buf = NULL;
- buf->lb_len = lmm_size;
- rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
- 0, handle);
- if (rc)
- return rc;
-
- rc = mdd_declare_lov_objid_update(env, mdd, handle);
-
- return rc;
-}
-
static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
struct md_object *cobj, const struct md_op_spec *spec,
struct md_attr *ma)
struct mdd_device *mdd = mdo2mdd(cobj);
struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
struct mdd_object *son = md2mdd_obj(cobj);
- struct lov_mds_md *lmm = NULL;
- int lmm_size = 0;
struct thandle *handle;
- struct lu_attr *attr = &mdd_env_info(env)->mti_la_for_fix;
+ const struct lu_buf *buf;
+ struct lu_attr *attr = &mdd_env_info(env)->mti_cattr;
int rc;
ENTRY;
+ /* do not let users create stripes via .lustre/;
+ * mdd_obf_setup() sets IMMUTE_OBJ on this directory */
+ if (pobj && mdd_pobj->mod_flags & IMMUTE_OBJ)
+ RETURN(-ENOENT);
+
rc = mdd_cd_sanity_check(env, son);
if (rc)
RETURN(rc);
if (!md_should_create(spec->sp_cr_flags))
RETURN(0);
- lmm_size = ma->ma_lmm_size;
-
- rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec, ma);
- if (rc)
- RETURN(rc);
+ /*
+ * The following use cases exist for this function:
+ * 1) late striping - the file was created with MDS_OPEN_DELAY_CREATE;
+ * striping may or may not be specified
+ * 2) CMD?
+ */
rc = mdd_la_get(env, son, attr, mdd_object_capa(env, son));
if (rc)
RETURN(rc);
if (IS_ERR(handle))
GOTO(out_free, rc = PTR_ERR(handle));
- rc = mdd_declare_create_data(env, mdd, son, lmm_size, handle);
- if (rc)
- GOTO(stop, rc);
-
- rc = mdd_trans_start(env, mdd, handle);
- if (rc)
- GOTO(stop, rc);
-
/*
* XXX: Setting the lov ea is not locked but setting the attr is locked?
* Should this be fixed?
*/
+ CDEBUG(D_OTHER, "ea %p/%u, cr_flags %Lo, no_create %u\n",
+ spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
+ spec->sp_cr_flags, spec->no_create);
+
+ if (spec->no_create) {
+ /* replay case */
+ buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
+ spec->u.sp_ea.eadatalen);
+ } else if (!(spec->sp_cr_flags & MDS_OPEN_HAS_OBJS)) {
+ if (spec->sp_cr_flags & MDS_OPEN_HAS_EA) {
+ /* lfs setstripe */
+ buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
+ spec->u.sp_ea.eadatalen);
+ } else {
+ buf = &LU_BUF_NULL;
+ }
+ } else {
+ /* MDS_OPEN_HAS_OBJS is not used anymore ? */
+ LBUG();
+ }
- /* Replay creates has objects already */
-#if 0
- if (spec->no_create) {
- CDEBUG(D_INFO, "we already have lov ea\n");
- rc = mdd_lov_set_md(env, mdd_pobj, son,
- (struct lov_mds_md *)spec->u.sp_ea.eadata,
- spec->u.sp_ea.eadatalen, handle, 0);
- } else
-#endif
- /* No need mdd_lsm_sanity_check here */
- rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
- lmm_size, handle, 0);
+ rc = dt_declare_xattr_set(env, mdd_object_child(son), buf,
+ XATTR_NAME_LOV, 0, handle);
+ if (rc)
+ GOTO(stop, rc);
- /* update lov_objid data, must be before transaction stop! */
- if (rc == 0)
- mdd_lov_objid_update(mdd, lmm);
+ rc = mdd_trans_start(env, mdd, handle);
+ if (rc)
+ GOTO(stop, rc);
+ rc = dt_xattr_set(env, mdd_object_child(son), buf, XATTR_NAME_LOV,
+ 0, handle, mdd_object_capa(env, son));
stop:
- mdd_trans_stop(env, mdd, rc, handle);
+ mdd_trans_stop(env, mdd, rc, handle);
out_free:
- /* Finish mdd_lov_create() stuff. */
- /* if no_create == 0 (not replay), we free lmm allocated by
- * mdd_lov_create() */
- mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
- RETURN(rc);
+ RETURN(rc);
}
/* Get fid from name and parent */
static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
struct mdd_object *p, struct mdd_object *c,
const struct lu_name *name,
- struct lu_attr *attr, int lmm_size,
+ struct lu_attr *attr,
int got_def_acl,
struct thandle *handle,
const struct md_op_spec *spec)
{
struct mdd_thread_info *info = mdd_env_info(env);
- struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
int rc = 0;
rc = mdd_declare_object_create_internal(env, p, c, attr, handle, spec);
if (rc)
GOTO(out, rc);
- rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV,
- 0, handle);
- if (rc)
- GOTO(out, rc);
+ /* replay case, create LOV EA from client data */
+ if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
+ const struct lu_buf *buf;
+
+ buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
+ spec->u.sp_ea.eadatalen);
+ rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV,
+ 0, handle);
+ if (rc)
+ GOTO(out, rc);
+ }
if (S_ISLNK(attr->la_mode)) {
rc = dt_declare_record_write(env, mdd_object_child(c),
if (rc)
return rc;
- rc = mdd_declare_lov_objid_update(env, mdd, handle);
-
out:
return rc;
}
struct mdd_object *son = md2mdd_obj(child);
struct mdd_device *mdd = mdo2mdd(pobj);
struct lu_attr *attr = &ma->ma_attr;
- struct lov_mds_md *lmm = NULL;
struct thandle *handle;
struct lu_attr *pattr = &info->mti_pattr;
struct dynlock_handle *dlh;
const char *name = lname->ln_name;
- int rc, created = 0, initialized = 0, inserted = 0, lmm_size = 0;
+ int rc, created = 0, initialized = 0, inserted = 0;
int got_def_acl = 0;
ENTRY;
RETURN(rc);
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
- GOTO(out_pending, rc = -EINPROGRESS);
-
- /*
- * No RPC inside the transaction, so OST objects should be created at
- * first.
- */
- if (S_ISREG(attr->la_mode)) {
- lmm_size = ma->ma_lmm_size;
- rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
- spec, ma);
- if (rc)
- GOTO(out_pending, rc);
- }
+ GOTO(out_free, rc = -EINPROGRESS);
if (!S_ISLNK(attr->la_mode)) {
struct lu_buf *acl_buf;
GOTO(out_free, rc = PTR_ERR(handle));
rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
- got_def_acl, lmm_size, handle, spec);
+ got_def_acl, handle, spec);
if (rc)
GOTO(out_stop, rc);
rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), lname,
son, attr, handle, spec);
+
+ /*
+ * In case of replay, or when the client supplied an EA
+ * (MDS_OPEN_HAS_EA), we just set the LOV EA provided by the client.
+ * XXX: I think it would be interesting to try the "old" way where
+ * MDT calls this xattr_set(LOV) in a different transaction.
+ * Probably the code can be made better that way.
+ */
+ if (rc == 0 &&
+ (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA))) {
+ const struct lu_buf *buf;
+
+ buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
+ spec->u.sp_ea.eadatalen);
+ rc = mdo_xattr_set(env, son, buf, XATTR_NAME_LOV, 0, handle,
+ BYPASS_CAPA);
+ }
mdd_write_unlock(env, son);
if (rc)
/*
inserted = 1;
- /* No need mdd_lsm_sanity_check here */
- rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
- if (rc) {
- CERROR("error on stripe info copy %d \n", rc);
- GOTO(cleanup, rc);
- }
-
if (S_ISLNK(attr->la_mode)) {
struct md_ucred *uc = md_ucred(env);
struct dt_object *dt = mdd_object_child(son);
mdd_write_unlock(env, son);
}
- /* update lov_objid data, must be before transaction stop! */
- if (rc == 0)
- mdd_lov_objid_update(mdd, lmm);
-
mdd_pdo_write_unlock(env, mdd_pobj, dlh);
out_trans:
if (rc == 0)
out_stop:
mdd_trans_stop(env, mdd, rc, handle);
out_free:
- /* finish lov_create stuff, free all temporary data */
- mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
-out_pending:
/* The child object shouldn't be cached anymore */
if (rc)
cfs_set_bit(LU_OBJECT_HEARD_BANSHEE,
struct mdd_device {
struct md_device mdd_md_dev;
+ struct obd_export *mdd_child_exp; /* export from obd_connect() to the next device */
struct dt_device *mdd_child;
- struct obd_device *mdd_obd_dev;
struct lu_fid mdd_root_fid;
struct dt_device_param mdd_dt_conf;
struct dt_object *mdd_orphans; /* PENDING directory */
struct dt_object *mdd_capa;
- struct dt_txn_callback mdd_txn_cb;
cfs_proc_dir_entry_t *mdd_proc_entry;
struct lprocfs_stats *mdd_stats;
struct mdd_changelog mdd_cl;
struct mdd_dot_lustre_objs mdd_dot_lustre_objs;
struct md_lfsck mdd_lfsck;
unsigned int mdd_sync_permission;
+ int mdd_connects; /* incremented in mdd_obd_connect(), decremented in mdd_obd_disconnect() */
};
enum mod_flags {
static inline struct obd_device *mdd2obd_dev(struct mdd_device *mdd)
{
- return mdd->mdd_obd_dev;
+ return (mdd->mdd_md_dev.md_lu_dev.ld_obd); /* obd now comes from the embedded lu_device */
}
static inline struct mdd_device *mdd_obj2mdd_dev(struct mdd_object *obj)
struct mdd_device *mdd;
mdd = container_of0(lfsck, struct mdd_device, mdd_lfsck);
- return mdd->mdd_obd_dev->obd_name;
+ return mdd2obd_dev(mdd)->obd_name;
}
void mdd_lfsck_set_speed(struct md_lfsck *lfsck, __u32 limit)
lu_object_put(env, &obj->do_lu);
if (rc == -ENOTSUPP) {
CERROR("%s: Lustre LFSCK unsupported on this device.\n",
- mdd->mdd_obd_dev->obd_name);
+ mdd2obd_dev(mdd)->obd_name);
rc = 0;
}
return rc;
#include "mdd_internal.h"
-static int mdd_notify(struct obd_device *host, struct obd_device *watched,
- enum obd_notify_event ev, void *owner, void *data)
-{
- struct mdd_device *mdd = owner;
- int rc = 0;
- ENTRY;
-
- LASSERT(owner != NULL);
- switch (ev)
- {
- case OBD_NOTIFY_ACTIVE:
- case OBD_NOTIFY_SYNC:
- case OBD_NOTIFY_SYNC_NONBLOCK:
- rc = md_do_upcall(NULL, &mdd->mdd_md_dev,
- MD_LOV_SYNC, data);
- break;
- case OBD_NOTIFY_CONFIG:
- rc = md_do_upcall(NULL, &mdd->mdd_md_dev,
- MD_LOV_CONFIG, data);
- break;
- default:
- CDEBUG(D_INFO, "Unhandled notification %#x\n", ev);
- }
-
- RETURN(rc);
-}
-
-/* The obd is created for handling data stack for mdd */
-int mdd_init_obd(const struct lu_env *env, struct mdd_device *mdd,
- struct lustre_cfg *cfg)
-{
- char *dev = lustre_cfg_string(cfg, 0);
- int rc, name_size, uuid_size;
- char *name, *uuid;
- __u32 mds_id;
- struct lustre_cfg_bufs *bufs;
- struct lustre_cfg *lcfg;
- struct obd_device *obd;
- ENTRY;
-
- mds_id = lu_site2md(mdd2lu_dev(mdd)->ld_site)->ms_node_id;
- name_size = strlen(MDD_OBD_NAME) + 35;
- uuid_size = strlen(MDD_OBD_UUID) + 35;
-
- OBD_ALLOC(name, name_size);
- OBD_ALLOC(uuid, uuid_size);
- if (name == NULL || uuid == NULL)
- GOTO(cleanup_mem, rc = -ENOMEM);
-
- OBD_ALLOC_PTR(bufs);
- if (!bufs)
- GOTO(cleanup_mem, rc = -ENOMEM);
-
- snprintf(name, strlen(MDD_OBD_NAME) + 35, "%s-%s",
- MDD_OBD_NAME, dev);
-
- snprintf(uuid, strlen(MDD_OBD_UUID) + 35, "%s-%s",
- MDD_OBD_UUID, dev);
-
- lustre_cfg_bufs_reset(bufs, name);
- lustre_cfg_bufs_set_string(bufs, 1, MDD_OBD_TYPE);
- lustre_cfg_bufs_set_string(bufs, 2, uuid);
- lustre_cfg_bufs_set_string(bufs, 3, (char*)dev/* MDD_OBD_PROFILE */);
- lustre_cfg_bufs_set_string(bufs, 4, (char*)dev);
-
- lcfg = lustre_cfg_new(LCFG_ATTACH, bufs);
- OBD_FREE_PTR(bufs);
- if (!lcfg)
- GOTO(cleanup_mem, rc = -ENOMEM);
-
- rc = class_attach(lcfg);
- if (rc)
- GOTO(lcfg_cleanup, rc);
-
- obd = class_name2obd(name);
- if (!obd) {
- CERROR("Can not find obd %s\n", MDD_OBD_NAME);
- LBUG();
- }
-
- cfs_spin_lock(&obd->obd_dev_lock);
- obd->obd_recovering = 1;
- cfs_spin_unlock(&obd->obd_dev_lock);
- obd->u.mds.mds_id = mds_id;
- obd->u.obt.obt_osd_properties.osd_max_ea_size =
- mdd->mdd_dt_conf.ddp_max_ea_size;
-
- rc = class_setup(obd, lcfg);
- if (rc)
- GOTO(class_detach, rc);
-
- /*
- * Add here for obd notify mechanism, when adding a new ost, the mds
- * will notify this mdd.
- */
- obd->obd_upcall.onu_upcall = mdd_notify;
- obd->obd_upcall.onu_owner = mdd;
- mdd->mdd_obd_dev = obd;
-
- EXIT;
-class_detach:
- if (rc)
- class_detach(obd, lcfg);
-lcfg_cleanup:
- lustre_cfg_free(lcfg);
-cleanup_mem:
- if (name)
- OBD_FREE(name, name_size);
- if (uuid)
- OBD_FREE(uuid, uuid_size);
- return rc;
-}
-
-int mdd_fini_obd(const struct lu_env *env, struct mdd_device *mdd,
- struct lustre_cfg *lcfg)
-{
- struct obd_device *obd;
- int rc;
- ENTRY;
-
- obd = mdd2obd_dev(mdd);
- LASSERT(obd);
-
- rc = class_cleanup(obd, lcfg);
- if (rc)
- GOTO(lcfg_cleanup, rc);
-
- obd->obd_upcall.onu_upcall = NULL;
- obd->obd_upcall.onu_owner = NULL;
- rc = class_detach(obd, lcfg);
- if (rc)
- GOTO(lcfg_cleanup, rc);
- mdd->mdd_obd_dev = NULL;
-
- EXIT;
-lcfg_cleanup:
- return rc;
-}
-
int mdd_get_md(const struct lu_env *env, struct mdd_object *obj,
void *md, int *md_size, const char *name)
{
RETURN(rc);
}
-int mdd_get_md_locked(const struct lu_env *env, struct mdd_object *obj,
- void *md, int *md_size, const char *name)
-{
- int rc = 0;
- mdd_read_lock(env, obj, MOR_TGT_CHILD);
- rc = mdd_get_md(env, obj, md, md_size, name);
- mdd_read_unlock(env, obj);
- return rc;
-}
-
-static int mdd_lov_set_stripe_md(const struct lu_env *env,
- struct mdd_object *obj, struct lu_buf *buf,
- struct thandle *handle)
-{
- struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
- struct obd_device *obd = mdd2obd_dev(mdd);
- struct obd_export *lov_exp = obd->u.mds.mds_lov_exp;
- struct lov_stripe_md *lsm = NULL;
- int rc;
- ENTRY;
-
- LASSERT(S_ISDIR(mdd_object_type(obj)) || S_ISREG(mdd_object_type(obj)));
- rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, lov_exp, 0,
- &lsm, buf->lb_buf);
- if (rc)
- RETURN(rc);
- obd_free_memmd(lov_exp, &lsm);
-
- rc = mdd_xattr_set_txn(env, obj, buf, XATTR_NAME_LOV, 0, handle);
-
- CDEBUG(D_INFO, "set lov ea of "DFID" rc %d \n", PFID(mdo2fid(obj)), rc);
- RETURN(rc);
-}
-
-/*
- * Permission check is done before call it,
- * no need check again.
- */
-static int mdd_lov_set_dir_md(const struct lu_env *env,
- struct mdd_object *obj, struct lu_buf *buf,
- struct thandle *handle)
-{
- struct lov_user_md *lum = NULL;
- int rc = 0;
- ENTRY;
-
- LASSERT(S_ISDIR(mdd_object_type(obj)));
- lum = (struct lov_user_md*)buf->lb_buf;
-
- /* if { size, offset, count } = { 0, -1, 0 } and no pool
- * (i.e. all default values specified) then delete default
- * striping from dir. */
- if (LOVEA_DELETE_VALUES(lum->lmm_stripe_size, lum->lmm_stripe_count,
- lum->lmm_stripe_offset) &&
- lum->lmm_magic != LOV_USER_MAGIC_V3) {
- rc = mdd_xattr_set_txn(env, obj, &LU_BUF_NULL,
- XATTR_NAME_LOV, 0, handle);
- if (rc == -ENODATA)
- rc = 0;
- CDEBUG(D_INFO, "delete lov ea of "DFID" rc %d \n",
- PFID(mdo2fid(obj)), rc);
- } else {
- rc = mdd_lov_set_stripe_md(env, obj, buf, handle);
- }
- RETURN(rc);
-}
-
-int mdd_lsm_sanity_check(const struct lu_env *env, struct mdd_object *obj)
-{
- struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
- struct md_ucred *uc = md_ucred(env);
- int rc;
- ENTRY;
-
- rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
- if (rc)
- RETURN(rc);
-
- if ((uc->mu_fsuid != tmp_la->la_uid) &&
- !mdd_capable(uc, CFS_CAP_FOWNER))
- rc = mdd_permission_internal_locked(env, obj, tmp_la,
- MAY_WRITE, MOR_TGT_CHILD);
-
- RETURN(rc);
-}
-
-int mdd_lov_set_md(const struct lu_env *env, struct mdd_object *pobj,
- struct mdd_object *child, struct lov_mds_md *lmmp,
- int lmm_size, struct thandle *handle, int set_stripe)
-{
- struct lu_buf *buf;
- cfs_umode_t mode;
- int rc = 0;
- ENTRY;
-
- buf = mdd_buf_get(env, lmmp, lmm_size);
- mode = mdd_object_type(child);
- if (S_ISREG(mode) && lmm_size > 0) {
- if (set_stripe) {
- rc = mdd_lov_set_stripe_md(env, child, buf, handle);
- } else {
- rc = mdd_xattr_set_txn(env, child, buf,
- XATTR_NAME_LOV, 0, handle);
- }
- } else if (S_ISDIR(mode)) {
- if (lmmp == NULL && lmm_size == 0) {
- struct mdd_device *mdd = mdd_obj2mdd_dev(child);
- struct lov_mds_md *lmm = mdd_max_lmm_get(env, mdd);
- int size = sizeof(struct lov_mds_md_v3);
-
- /* Get parent dir stripe and set */
- if (pobj != NULL)
- rc = mdd_get_md_locked(env, pobj, lmm, &size,
- XATTR_NAME_LOV);
- if (rc > 0) {
- buf = mdd_buf_get(env, lmm, size);
- rc = mdd_xattr_set_txn(env, child, buf,
- XATTR_NAME_LOV, 0,
- handle);
- if (rc)
- CERROR("error on copy stripe info: rc "
- "= %d\n", rc);
- }
- } else {
- LASSERT(lmmp != NULL && lmm_size > 0);
- rc = mdd_lov_set_dir_md(env, child, buf, handle);
- }
- }
- CDEBUG(D_INFO, "Set lov md %p size %d for fid "DFID" rc %d\n",
- lmmp, lmm_size, PFID(mdo2fid(child)), rc);
- RETURN(rc);
-}
-
-int mdd_lov_objid_prepare(struct mdd_device *mdd, struct lov_mds_md *lmm)
-{
- /* copy mds_lov code is using wrong layer */
- return mds_lov_prepare_objids(mdd->mdd_obd_dev, lmm);
-}
-
-int mdd_declare_lov_objid_update(const struct lu_env *env,
- struct mdd_device *mdd,
- struct thandle *handle)
-{
- struct obd_device *obd = mdd2obd_dev(mdd);
- int size;
-
- /* in prepare we create local files */
- if (unlikely(mdd->mdd_capa == NULL))
- return 0;
-
- /* XXX: this is a temporary solution to declare llog changes
- * will be fixed in 2.3 with new llog implementation */
-
- size = obd->u.mds.mds_lov_desc.ld_tgt_count * sizeof(obd_id);
- return dt_declare_record_write(env, mdd->mdd_capa, size, 0, handle);
-}
-
-void mdd_lov_objid_update(struct mdd_device *mdd, struct lov_mds_md *lmm)
-{
- /* copy mds_lov code is using wrong layer */
- mds_lov_update_objids(mdd->mdd_obd_dev, lmm);
-}
-
-void mdd_lov_create_finish(const struct lu_env *env, struct mdd_device *mdd,
- struct lov_mds_md *lmm, int lmm_size,
- const struct md_op_spec *spec)
-{
- if (lmm && !spec->no_create)
- OBD_FREE_LARGE(lmm, lmm_size);
-}
-
-int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd,
- struct mdd_object *parent, struct mdd_object *child,
- struct lov_mds_md **lmm, int *lmm_size,
- const struct md_op_spec *spec, struct md_attr *ma)
-{
- struct obd_device *obd = mdd2obd_dev(mdd);
- struct obd_export *lov_exp = obd->u.mds.mds_lov_exp;
- struct lu_site *site = mdd2lu_dev(mdd)->ld_site;
- struct obdo *oa;
- struct lov_stripe_md *lsm = NULL;
- const void *eadata = spec->u.sp_ea.eadata;
- __u64 create_flags = spec->sp_cr_flags;
- struct obd_trans_info *oti = &mdd_env_info(env)->mti_oti;
- struct lu_attr *la = &ma->ma_attr;
- int rc = 0;
- ENTRY;
-
- if (!md_should_create(create_flags)) {
- *lmm_size = 0;
- RETURN(0);
- }
- oti_init(oti, NULL);
-
- /* replay case, has objects already, only get lov from eadata */
- if (spec->no_create != 0) {
- *lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata;
- *lmm_size = spec->u.sp_ea.eadatalen;
- if (*lmm_size == lov_mds_md_size((*lmm)->lmm_stripe_count,
- (*lmm)->lmm_magic)) {
- RETURN(0);
- } else {
- CERROR("incorrect lsm received during recovery\n");
- RETURN(-EPROTO);
- }
- }
-
- if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO))
- GOTO(out_ids, rc = -ENOMEM);
-
- LASSERT(lov_exp != NULL);
- oa = &mdd_env_info(env)->mti_oa;
-
- oa->o_uid = 0; /* must have 0 uid / gid on OST */
- oa->o_gid = 0;
- oa->o_seq = mdt_to_obd_objseq(lu_site2md(site)->ms_node_id);
- oa->o_mode = S_IFREG | 0600;
- oa->o_id = fid_ver_oid(mdd_object_fid(child));
- oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLFLAGS |
- OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLGROUP;
- oa->o_size = 0;
-
- if (!(create_flags & MDS_OPEN_HAS_OBJS)) {
- if (create_flags & MDS_OPEN_HAS_EA) {
- LASSERT(eadata != NULL);
- rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, lov_exp,
- 0, &lsm, (void*)eadata);
- if (rc)
- GOTO(out_oti, rc);
- } else {
- /* get lov ea from parent and set to lov */
- struct lov_mds_md *_lmm;
- int _lmm_size = mdd_lov_mdsize(env, mdd);
-
- LASSERT(parent != NULL);
-
- /*
- * can not create child's lov_mds_md by access it
- * thru .lustre path
- */
- if (mdd_object_obf(parent))
- GOTO(out_oti, rc = -EBADFD);
-
- _lmm = mdd_max_lmm_get(env, mdd);
- if (_lmm == NULL)
- GOTO(out_oti, rc = -ENOMEM);
-
- rc = mdd_get_md_locked(env, parent, _lmm,
- &_lmm_size,
- XATTR_NAME_LOV);
- if (rc > 0) {
- _lmm_size = mdd_lov_mdsize(env, mdd);
- rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE,
- lov_exp, _lmm_size,
- &lsm, _lmm);
- }
- if (rc)
- GOTO(out_oti, rc);
- }
-
- OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_OPEN_WAIT_CREATE, 10);
- rc = obd_create(env, lov_exp, oa, &lsm, oti);
- if (rc) {
- if (rc > 0) {
- CERROR("Create error for "DFID": %d\n",
- PFID(mdo2fid(child)), rc);
- rc = -EIO;
- }
- GOTO(out_oti, rc);
- }
-
- if (ma->ma_valid & MA_LAY_GEN)
- /* If we already have a lsm, the file is not new and we
- * are about to change the layout, so we have to bump
- * the generation. It is worth noting that old versions
- * will be confused by a non-zero gen, that's why
- * OBD_INCOMPAT_LMM_VER has been introduced */
- lsm->lsm_layout_gen = ma->ma_layout_gen + 1;
- else
- /* Start with a null generation for backward
- * compatiblity with old versions */
- lsm->lsm_layout_gen = 0;
-
- LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
- } else {
- LASSERT(eadata != NULL);
- rc = obd_iocontrol(OBD_IOC_LOV_SETEA, lov_exp, 0, &lsm,
- (void*)eadata);
- if (rc)
- GOTO(out_oti, rc);
-
- if (ma->ma_valid & MA_LAY_GEN)
- lsm->lsm_layout_gen = ma->ma_layout_gen;
- else
- lsm->lsm_layout_gen = 0;
- }
-
- lsm->lsm_object_id = fid_ver_oid(mdd_object_fid(child));
- lsm->lsm_object_seq = fid_seq(mdd_object_fid(child));
- /*
- * Sometimes, we may truncate some object(without lsm) then open it
- * (with write flags), so creating lsm above. The Nonzero(truncated)
- * size should tell ost, since size attr is in charge by OST.
- */
- if (la->la_size && la->la_valid & LA_SIZE) {
- struct obd_info *oinfo = &mdd_env_info(env)->mti_oi;
-
- memset(oinfo, 0, sizeof(*oinfo));
-
- /* When setting attr to ost, FLBKSZ is not needed. */
- oa->o_valid &= ~OBD_MD_FLBLKSZ;
- obdo_from_la(oa, la, LA_TYPE | LA_ATIME | LA_MTIME |
- LA_CTIME | LA_SIZE);
- /*
- * XXX: Pack lustre id to OST, in OST, it will be packed by
- * filter_fid, but can not see what is the usages. So just pack
- * o_seq o_ver here, maybe fix it after this cycle.
- */
- obdo_set_parent_fid(oa, mdd_object_fid(child));
- oinfo->oi_oa = oa;
- oinfo->oi_md = lsm;
- oinfo->oi_capa = NULL;
- oinfo->oi_policy.l_extent.start = la->la_size;
- oinfo->oi_policy.l_extent.end = OBD_OBJECT_EOF;
-
- rc = obd_punch_rqset(lov_exp, oinfo, oti);
- if (rc) {
- CERROR("Error setting attrs for "DFID": rc %d\n",
- PFID(mdo2fid(child)), rc);
- if (rc > 0) {
- CERROR("obd_setattr for "DFID" rc %d\n",
- PFID(mdo2fid(child)), rc);
- rc = -EIO;
- }
- GOTO(out_oti, rc);
- }
- }
- /* blksize should be changed after create data object */
- la->la_valid |= LA_BLKSIZE;
- la->la_blksize = oa->o_blksize;
- *lmm = NULL;
- rc = obd_packmd(lov_exp, lmm, lsm);
- if (rc < 0) {
- CERROR("Cannot pack lsm, err = %d\n", rc);
- GOTO(out_oti, rc);
- }
- if (mdd_lov_objid_prepare(mdd, *lmm) != 0) {
- CERROR("Not have memory for update objid\n");
- OBD_FREE(*lmm, rc);
- *lmm = NULL;
- GOTO(out_oti, rc = -ENOMEM);
- }
- *lmm_size = rc;
- rc = 0;
- EXIT;
-out_oti:
- oti_free_cookies(oti);
-out_ids:
- if (lsm)
- obd_free_memmd(lov_exp, &lsm);
-
- return rc;
-}
-
-/*
- * used when destroying orphans and from mds_reint_unlink() when MDS wants to
- * destroy objects on OSS.
- */
-int mdd_lovobj_unlink(const struct lu_env *env, struct mdd_device *mdd,
- struct mdd_object *obj, struct lu_attr *la,
- struct md_attr *ma, int log_unlink)
-{
- struct obd_device *obd = mdd2obd_dev(mdd);
- struct obd_export *lov_exp = obd->u.mds.mds_lov_exp;
- struct lov_stripe_md *lsm = NULL;
- struct obd_trans_info *oti = &mdd_env_info(env)->mti_oti;
- struct obdo *oa = &mdd_env_info(env)->mti_oa;
- struct lu_site *site = mdd2lu_dev(mdd)->ld_site;
- struct lov_mds_md *lmm = ma->ma_lmm;
- int lmm_size = ma->ma_lmm_size;
- struct llog_cookie *logcookies = ma->ma_cookie;
- int rc;
- ENTRY;
-
- if (lmm_size == 0)
- RETURN(0);
-
- rc = obd_unpackmd(lov_exp, &lsm, lmm, lmm_size);
- if (rc < 0) {
- CERROR("Error unpack md %p\n", lmm);
- RETURN(rc);
- } else {
- LASSERT(rc >= sizeof(*lsm));
- rc = 0;
- }
-
- oa->o_id = lsm->lsm_object_id;
- oa->o_seq = mdt_to_obd_objseq(lu_site2md(site)->ms_node_id);
- oa->o_mode = la->la_mode & S_IFMT;
- oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP;
-
- oti_init(oti, NULL);
- if (log_unlink && logcookies) {
- oa->o_valid |= OBD_MD_FLCOOKIE;
- oti->oti_logcookies = logcookies;
- }
-
- if (!(ma->ma_attr_flags & MDS_UNLINK_DESTROY))
- oa->o_flags = OBD_FL_DELORPHAN;
-
- CDEBUG(D_INFO, "destroying OSS object "LPU64":"LPU64"\n", oa->o_seq,
- oa->o_id);
-
- rc = obd_destroy(env, lov_exp, oa, lsm, oti, NULL, NULL);
-
- obd_free_memmd(lov_exp, &lsm);
- RETURN(rc);
-}
-
-/*
- * called with obj locked.
- */
-int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
- struct mdd_object *obj, struct lu_attr *la)
-{
- struct md_attr *ma = &mdd_env_info(env)->mti_ma;
- int rc;
- ENTRY;
-
- LASSERT(mdd_write_locked(env, obj) != 0);
-
- if (unlikely(!S_ISREG(mdd_object_type(obj))))
- RETURN(0);
-
- if (unlikely(la->la_nlink != 0)) {
- CWARN("Attempt to destroy OSS object when nlink == %d\n",
- la->la_nlink);
- RETURN(0);
- }
-
- ma->ma_lmm_size = mdd_lov_mdsize(env, mdd);
- ma->ma_lmm = mdd_max_lmm_get(env, mdd);
- ma->ma_cookie_size = mdd_lov_cookiesize(env, mdd);
- ma->ma_cookie = mdd_max_cookie_get(env, mdd);
- if (ma->ma_lmm == NULL || ma->ma_cookie == NULL)
- RETURN(rc = -ENOMEM);
-
- /* get lov ea */
-
- rc = mdd_get_md(env, obj, ma->ma_lmm, &ma->ma_lmm_size,
- XATTR_NAME_LOV);
-
- if (rc <= 0) {
- CWARN("Get lov ea failed for "DFID" rc = %d\n",
- PFID(mdo2fid(obj)), rc);
- if (rc == 0)
- rc = -ENOENT;
- RETURN(rc);
- }
-
- ma->ma_valid = MA_LOV;
-
- rc = mdd_unlink_log(env, mdd, obj, ma);
- if (rc) {
- CWARN("mds unlink log for "DFID" failed: %d\n",
- PFID(mdo2fid(obj)), rc);
- RETURN(rc);
- }
-
- if (ma->ma_valid & MA_COOKIE)
- rc = mdd_lovobj_unlink(env, mdd, obj, la, ma, 1);
-
- RETURN(rc);
-}
-
-int mdd_declare_unlink_log(const struct lu_env *env, struct mdd_object *obj,
- struct md_attr *ma, struct thandle *handle)
-{
- struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
- int rc, i;
- __u16 stripe;
-
- LASSERT(obj);
- LASSERT(ma);
-
- if (!S_ISREG(lu_object_attr(&obj->mod_obj.mo_lu)))
- return 0;
-
- rc = mdd_lmm_get_locked(env, obj, ma);
- if (rc || !(ma->ma_valid & MA_LOV))
- return rc;
-
- LASSERT(ma->ma_lmm);
- if (le32_to_cpu(ma->ma_lmm->lmm_magic) != LOV_MAGIC_V1 &&
- le32_to_cpu(ma->ma_lmm->lmm_magic) != LOV_MAGIC_V3) {
- CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
- mdd->mdd_obd_dev->obd_name,
- le32_to_cpu(ma->ma_lmm->lmm_magic),
- PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
- return -EINVAL;
- }
-
- stripe = le16_to_cpu(ma->ma_lmm->lmm_stripe_count);
- if (stripe == LOV_ALL_STRIPES);
- stripe = mdd2obd_dev(mdd)->u.mds.mds_lov_desc.ld_tgt_count;
-
- for (i = 0; i < stripe; i++) {
- rc = mdd_declare_llog_record(env, mdd,
- sizeof(struct llog_unlink_rec),
- handle);
- if (rc)
- return rc;
- }
-
- return rc;
-}
-
-int mdd_unlink_log(const struct lu_env *env, struct mdd_device *mdd,
- struct mdd_object *mdd_cobj, struct md_attr *ma)
-{
- LASSERT(ma->ma_valid & MA_LOV);
-
- if ((ma->ma_cookie_size > 0) &&
- (mds_log_op_unlink(mdd2obd_dev(mdd), ma->ma_lmm, ma->ma_lmm_size,
- ma->ma_cookie, ma->ma_cookie_size) > 0)) {
- CDEBUG(D_HA, "DEBUG: unlink log is added for object "DFID"\n",
- PFID(mdd_object_fid(mdd_cobj)));
- ma->ma_valid |= MA_COOKIE;
- }
- return 0;
-}
-
-int mdd_log_op_setattr(struct obd_device *obd, __u32 uid, __u32 gid,
- struct lov_mds_md *lmm, int lmm_size,
- struct llog_cookie *logcookies, int cookies_size)
-{
- struct mds_obd *mds = &obd->u.mds;
- struct lov_stripe_md *lsm = NULL;
- struct llog_setattr64_rec *lsr;
- struct llog_ctxt *ctxt;
- int rc;
- ENTRY;
-
- if (IS_ERR(mds->mds_lov_obd))
- RETURN(PTR_ERR(mds->mds_lov_obd));
-
- rc = obd_unpackmd(mds->mds_lov_exp, &lsm, lmm, lmm_size);
- if (rc < 0)
- RETURN(rc);
-
- OBD_ALLOC(lsr, sizeof(*lsr));
- if (!lsr)
- GOTO(out, rc = -ENOMEM);
-
- /* prepare setattr log record */
- lsr->lsr_hdr.lrh_len = lsr->lsr_tail.lrt_len = sizeof(*lsr);
- lsr->lsr_hdr.lrh_type = MDS_SETATTR64_REC;
- lsr->lsr_uid = uid;
- lsr->lsr_gid = gid;
-
- /* write setattr log */
- ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
- rc = llog_obd_add(NULL, ctxt, &lsr->lsr_hdr, lsm, logcookies,
- cookies_size / sizeof(struct llog_cookie));
-
- llog_ctxt_put(ctxt);
-
- OBD_FREE(lsr, sizeof(*lsr));
- out:
- obd_free_memmd(mds->mds_lov_exp, &lsm);
- RETURN(rc);
-}
-
-int mdd_setattr_log(const struct lu_env *env, struct mdd_device *mdd,
- const struct md_attr *ma,
- struct lov_mds_md *lmm, int lmm_size,
- struct llog_cookie *logcookies, int cookies_size)
-{
- struct obd_device *obd = mdd2obd_dev(mdd);
-
- /* journal chown/chgrp in llog, just like unlink */
- if (lmm_size > 0) {
- CDEBUG(D_INFO, "setattr llog for uid/gid=%lu/%lu\n",
- (unsigned long)ma->ma_attr.la_uid,
- (unsigned long)ma->ma_attr.la_gid);
- return mdd_log_op_setattr(obd, ma->ma_attr.la_uid,
- ma->ma_attr.la_gid, lmm,
- lmm_size, logcookies,
- cookies_size);
- } else
- return 0;
-}
-
-static int mdd_osc_setattr_async(struct obd_device *obd, __u32 uid, __u32 gid,
- struct lov_mds_md *lmm, int lmm_size,
- struct llog_cookie *logcookies, const struct lu_fid *parent,
- struct obd_capa *oc)
-{
- struct mds_obd *mds = &obd->u.mds;
- struct obd_trans_info oti = { 0 };
- struct obd_info oinfo = { { { 0 } } };
- int rc;
- ENTRY;
-
- if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OST_SETATTR))
- RETURN(0);
-
- /* first get memory EA */
- OBDO_ALLOC(oinfo.oi_oa);
- if (!oinfo.oi_oa)
- RETURN(-ENOMEM);
-
- LASSERT(lmm);
-
- rc = obd_unpackmd(mds->mds_lov_exp, &oinfo.oi_md, lmm, lmm_size);
- if (rc < 0) {
- CERROR("Error unpack md %p for obj "DFID"\n", lmm,
- PFID(parent));
- GOTO(out, rc);
- }
-
- /* then fill oa */
- oinfo.oi_oa->o_uid = uid;
- oinfo.oi_oa->o_gid = gid;
- oinfo.oi_oa->o_id = oinfo.oi_md->lsm_object_id;
- oinfo.oi_oa->o_seq = oinfo.oi_md->lsm_object_seq;
- oinfo.oi_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP |
- OBD_MD_FLUID | OBD_MD_FLGID;
- if (logcookies) {
- oinfo.oi_oa->o_valid |= OBD_MD_FLCOOKIE;
- oti.oti_logcookies = logcookies;
- }
-
- obdo_set_parent_fid(oinfo.oi_oa, parent);
- oinfo.oi_capa = oc;
-
- /* do async setattr from mds to ost not waiting for responses. */
- rc = obd_setattr_async(mds->mds_lov_exp, &oinfo, &oti, NULL);
- if (rc)
- CDEBUG(D_INODE, "mds to ost setattr objid 0x"LPX64
- " on ost error %d\n", oinfo.oi_md->lsm_object_id, rc);
-out:
- if (oinfo.oi_md)
- obd_free_memmd(mds->mds_lov_exp, &oinfo.oi_md);
- OBDO_FREE(oinfo.oi_oa);
- RETURN(rc);
-}
-
-int mdd_lov_setattr_async(const struct lu_env *env, struct mdd_object *obj,
- struct lov_mds_md *lmm, int lmm_size,
- struct llog_cookie *logcookies)
-{
- struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
- struct obd_device *obd = mdd2obd_dev(mdd);
- struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
- const struct lu_fid *fid = mdd_object_fid(obj);
- int rc = 0;
- ENTRY;
-
- mdd_read_lock(env, obj, MOR_TGT_CHILD);
- rc = mdo_attr_get(env, obj, tmp_la, mdd_object_capa(env, obj));
- mdd_read_unlock(env, obj);
- if (rc)
- RETURN(rc);
-
- rc = mdd_osc_setattr_async(obd, tmp_la->la_uid, tmp_la->la_gid, lmm,
- lmm_size, logcookies, fid, NULL);
- RETURN(rc);
-}
-
int mdd_procfs_init(struct mdd_device *mdd, const char *name)
{
struct lprocfs_static_vars lvars;
- struct lu_device *ld = &mdd->mdd_md_dev.md_lu_dev;
struct obd_type *type;
int rc;
ENTRY;
- type = ld->ld_type->ldt_obd_type;
+ /* at the moment there is no linkage between lu_type
+ * and obd_type, so we lookup obd_type this way */
+ type = class_search_type(LUSTRE_MDD_NAME);
LASSERT(name != NULL);
LASSERT(type != NULL);
return 0;
}
-struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env,
- struct mdd_device *mdd)
-{
- struct mdd_thread_info *mti = mdd_env_info(env);
- int max_cookie_size;
-
- max_cookie_size = mdd_lov_cookiesize(env, mdd);
- if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
- if (mti->mti_max_cookie)
- OBD_FREE_LARGE(mti->mti_max_cookie,
- mti->mti_max_cookie_size);
- mti->mti_max_cookie = NULL;
- mti->mti_max_cookie_size = 0;
- }
- if (unlikely(mti->mti_max_cookie == NULL)) {
- OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
- if (likely(mti->mti_max_cookie != NULL))
- mti->mti_max_cookie_size = max_cookie_size;
- }
- if (likely(mti->mti_max_cookie != NULL))
- memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size);
- return mti->mti_max_cookie;
-}
-
-struct lov_mds_md *mdd_max_lmm_buffer(const struct lu_env *env, int size)
-{
- struct mdd_thread_info *mti = mdd_env_info(env);
-
- if (unlikely(mti->mti_max_lmm_size < size)) {
- int rsize = size_roundup_power2(size);
-
- if (mti->mti_max_lmm_size > 0) {
- LASSERT(mti->mti_max_lmm);
- OBD_FREE_LARGE(mti->mti_max_lmm,
- mti->mti_max_lmm_size);
- mti->mti_max_lmm = NULL;
- mti->mti_max_lmm_size = 0;
- }
-
- OBD_ALLOC_LARGE(mti->mti_max_lmm, rsize);
- if (likely(mti->mti_max_lmm != NULL))
- mti->mti_max_lmm_size = rsize;
- }
- return mti->mti_max_lmm;
-}
-
-struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
- struct mdd_device *mdd)
-{
- int max_lmm_size;
-
- max_lmm_size = mdd_lov_mdsize(env, mdd);
- return mdd_max_lmm_buffer(env, max_lmm_size);
-}
-
struct lu_object *mdd_object_alloc(const struct lu_env *env,
const struct lu_object_header *hdr,
struct lu_device *d)
RETURN(rc);
}
-/* get only inode attributes */
-int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
- struct md_attr *ma)
-{
- int rc = 0;
- ENTRY;
-
- if (ma->ma_valid & MA_INODE)
- RETURN(0);
-
- rc = mdd_la_get(env, mdd_obj, &ma->ma_attr,
- mdd_object_capa(env, mdd_obj));
- if (rc == 0)
- ma->ma_valid |= MA_INODE;
- RETURN(rc);
-}
-
-int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
-{
- struct lov_desc *ldesc;
- struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
- struct lov_user_md *lum = (struct lov_user_md*)lmm;
- ENTRY;
-
- if (!lum)
- RETURN(0);
-
- ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
- LASSERT(ldesc != NULL);
-
- lum->lmm_magic = LOV_MAGIC_V1;
- lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
- lum->lmm_pattern = ldesc->ld_pattern;
- lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
- lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
- lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
-
- RETURN(sizeof(*lum));
-}
-
-static int is_rootdir(struct mdd_object *mdd_obj)
-{
- const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
- const struct lu_fid *fid = mdo2fid(mdd_obj);
-
- return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
-}
-
-int mdd_big_lmm_get(const struct lu_env *env, struct mdd_object *obj,
- struct md_attr *ma)
-{
- struct mdd_thread_info *info = mdd_env_info(env);
- int size;
- int rc = -EINVAL;
- ENTRY;
-
- LASSERT(info != NULL);
- LASSERT(ma->ma_big_lmm_used == 0);
-
- if (ma->ma_lmm_size == 0) {
- CERROR("No buffer to hold %s xattr of object "DFID"\n",
- XATTR_NAME_LOV, PFID(mdd_object_fid(obj)));
- RETURN(rc);
- }
-
- rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
- mdd_object_capa(env, obj));
- if (rc < 0)
- RETURN(rc);
-
- /* big_lmm may need to grow */
- size = rc;
- mdd_max_lmm_buffer(env, size);
- if (info->mti_max_lmm == NULL)
- RETURN(-ENOMEM);
-
- LASSERT(info->mti_max_lmm_size >= size);
- rc = mdd_get_md(env, obj, info->mti_max_lmm, &size,
- XATTR_NAME_LOV);
- if (rc < 0)
- RETURN(rc);
-
- ma->ma_big_lmm_used = 1;
- ma->ma_valid |= MA_LOV;
- ma->ma_lmm = info->mti_max_lmm;
- ma->ma_lmm_size = size;
- LASSERT(size == rc);
- RETURN(rc);
-}
-
-/* get lov EA only */
-static int __mdd_lmm_get(const struct lu_env *env,
- struct mdd_object *mdd_obj, struct md_attr *ma)
-{
- int rc;
- ENTRY;
-
- if (ma->ma_valid & MA_LOV)
- RETURN(0);
-
- rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
- XATTR_NAME_LOV);
- if (rc == -ERANGE)
- rc = mdd_big_lmm_get(env, mdd_obj, ma);
- else if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
- rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
-
- if (rc > 0) {
- ma->ma_lmm_size = rc;
- ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
- ma->ma_valid |= MA_LOV | MA_LAY_GEN;
- rc = 0;
- }
- RETURN(rc);
-}
-
-int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
- struct md_attr *ma)
-{
- int rc;
- ENTRY;
-
- mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
- rc = __mdd_lmm_get(env, mdd_obj, ma);
- mdd_read_unlock(env, mdd_obj);
- RETURN(rc);
-}
-
/*
* No permission check is needed.
*/
int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
struct md_attr *ma)
{
- struct mdd_object *mdd_obj = md2mdd_obj(obj);
- int rc;
-
+ int rc;
ENTRY;
- rc = mdd_iattr_get(env, mdd_obj, ma);
+
+ /*
+ * Read the attributes into ma->ma_attr via mdd_la_get(). The result
+ * is stored in rc instead of being returned directly so that the
+ * RETURN() below is reachable, pairs with ENTRY, and never reads an
+ * uninitialized rc.
+ *
+ * NOTE(review): the removed mdd_iattr_get() skipped the lookup when
+ * MA_INODE was already set in ma->ma_valid and set MA_INODE on
+ * success; this path does neither - confirm that callers maintain
+ * ma->ma_valid themselves.
+ */
+ rc = mdd_la_get(env, md2mdd_obj(obj), &ma->ma_attr,
+ mdd_object_capa(env, md2mdd_obj(obj)));
RETURN(rc);
}
const char *name)
{
struct mdd_object *mdd_obj = md2mdd_obj(obj);
- struct mdd_device *mdd = mdo2mdd(obj);
- struct lu_fid rootfid;
- int is_root;
int rc;
ENTRY;
mdd_object_capa(env, mdd_obj));
mdd_read_unlock(env, mdd_obj);
- dt_root_get(env, mdd->mdd_child, &rootfid);
- is_root = lu_fid_eq(mdd_object_fid(mdd_obj), &rootfid);
-
- /* XXX: a temp. solution till LOD/OSP is landed */
- if (rc == -ENODATA && strcmp(name, XATTR_NAME_LOV) == 0 && is_root) {
- if (buf->lb_buf == NULL) {
- rc = sizeof(struct lov_user_md);
- } else if (buf->lb_len >= sizeof(struct lov_user_md)) {
- rc = mdd_get_default_md(mdd_obj, buf->lb_buf);
- } else {
- rc = -ERANGE;
- }
- }
-
RETURN(rc);
}
int rc;
ENTRY;
- if (feat != &dt_directory_features && feat != NULL)
+ if (feat != &dt_directory_features && feat != NULL) {
dof->dof_type = DFT_INDEX;
- else
- dof->dof_type = dt_mode_to_dft(attr->la_mode);
+ dof->u.dof_idx.di_feat = feat;
- dof->u.dof_idx.di_feat = feat;
+ } else {
+ dof->dof_type = dt_mode_to_dft(attr->la_mode);
+ if (dof->dof_type == DFT_REGULAR) {
+ dof->u.dof_reg.striped =
+ md_should_create(spec->sp_cr_flags);
+ if (spec->sp_cr_flags & MDS_OPEN_HAS_EA)
+ dof->u.dof_reg.striped = 0;
+ /* is this replay? */
+ if (spec->no_create)
+ dof->u.dof_reg.striped = 0;
+ }
+ }
rc = mdo_declare_create_obj(env, c, attr, NULL, dof, handle);
{
struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
- const struct dt_index_features *feat = spec->sp_feat;
int rc;
ENTRY;
- if (!mdd_object_exists(c)) {
- struct dt_object *next = mdd_object_child(c);
- LASSERT(next);
-
- if (feat != &dt_directory_features && feat != NULL)
- dof->dof_type = DFT_INDEX;
- else
- dof->dof_type = dt_mode_to_dft(attr->la_mode);
+ LASSERT(!mdd_object_exists(c));
- dof->u.dof_idx.di_feat = feat;
+ rc = mdo_create_obj(env, c, attr, hint, dof, handle);
- rc = mdo_create_obj(env, c, attr, hint, dof, handle);
- LASSERT(ergo(rc == 0, mdd_object_exists(c)));
- } else
- rc = -EEXIST;
+ LASSERT(ergo(rc == 0, mdd_object_exists(c)));
- RETURN(rc);
+ RETURN(rc);
}
/**
RETURN(rc);
}
-int mdd_attr_check_set_internal_locked(const struct lu_env *env,
- struct mdd_object *obj,
- struct lu_attr *attr,
- struct thandle *handle,
- int needacl)
-{
- int rc;
- ENTRY;
-
- needacl = needacl && (attr->la_valid & LA_MODE);
- if (needacl)
- mdd_write_lock(env, obj, MOR_TGT_CHILD);
- rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl);
- if (needacl)
- mdd_write_unlock(env, obj);
- RETURN(rc);
-}
-
-int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj,
- const struct lu_buf *buf, const char *name,
- int fl, struct thandle *handle)
-{
- struct lustre_capa *capa = mdd_object_capa(env, obj);
- int rc = -EINVAL;
- ENTRY;
-
- if (buf->lb_buf && buf->lb_len > 0)
- rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa);
- else if (buf->lb_buf == NULL && buf->lb_len == 0)
- rc = mdo_xattr_del(env, obj, name, handle, capa);
-
- RETURN(rc);
-}
-
/*
* This gives the same functionality as the code between
* sys_chmod and inode_setattr
}
-/**
- * Should be called with write lock held.
- *
- * \see mdd_lma_set_locked().
- */
-static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
- const struct md_attr *ma, struct thandle *handle)
-{
- struct mdd_thread_info *info = mdd_env_info(env);
- struct lu_buf *buf;
- struct lustre_mdt_attrs *lma =
- (struct lustre_mdt_attrs *) info->mti_xattr_buf;
- int lmasize = sizeof(struct lustre_mdt_attrs);
- int rc = 0;
-
- ENTRY;
-
- /* Either HSM or SOM part is not valid, we need to read it before */
- if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) {
- rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
- if (rc <= 0)
- RETURN(rc);
-
- lustre_lma_swab(lma);
- } else {
- memset(lma, 0, lmasize);
- }
-
- /* Copy HSM data */
- if (ma->ma_valid & MA_HSM) {
- lma->lma_flags |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
- lma->lma_compat |= LMAC_HSM;
- }
-
- /* Copy SOM data */
- if (ma->ma_valid & MA_SOM) {
- LASSERT(ma->ma_som != NULL);
- if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
- lma->lma_compat &= ~LMAC_SOM;
- } else {
- lma->lma_compat |= LMAC_SOM;
- lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
- lma->lma_som_size = ma->ma_som->msd_size;
- lma->lma_som_blocks = ma->ma_som->msd_blocks;
- lma->lma_som_mountid = ma->ma_som->msd_mountid;
- }
- }
-
- /* Copy FID */
- memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
-
- lustre_lma_swab(lma);
- buf = mdd_buf_get(env, lma, lmasize);
- rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
-
- RETURN(rc);
-}
-
-/**
- * Save LMA extended attributes with data from \a ma.
- *
- * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if
- * not, LMA EA will be first read from disk, modified and write back.
- *
- */
-static int mdd_lma_set_locked(const struct lu_env *env,
- struct mdd_object *mdd_obj,
- const struct md_attr *ma, struct thandle *handle)
-{
- int rc;
-
- mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
- rc = __mdd_lma_set(env, mdd_obj, ma, handle);
- mdd_write_unlock(env, mdd_obj);
- return rc;
-}
-
/* Precedence for choosing record type when multiple
* attributes change: setattr > mtime > ctime > atime
* (ctime changes when mtime does, plus chmod/chown.
static int mdd_declare_attr_set(const struct lu_env *env,
struct mdd_device *mdd,
struct mdd_object *obj,
- const struct md_attr *ma,
- struct lov_mds_md *lmm,
+ const struct lu_attr *attr,
struct thandle *handle)
{
- struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
- struct lu_attr *attr = (struct lu_attr *) &ma->ma_attr;
- int rc, i;
+ int rc;
rc = mdo_declare_attr_set(env, obj, attr, handle);
if (rc)
if (rc)
return rc;
- if (ma->ma_valid & MA_LOV) {
- buf->lb_buf = NULL;
- buf->lb_len = ma->ma_lmm_size;
- rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
- 0, handle);
- if (rc)
- return rc;
- }
-
- if (ma->ma_valid & (MA_HSM | MA_SOM)) {
- buf->lb_buf = NULL;
- buf->lb_len = sizeof(struct lustre_mdt_attrs);
- rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
- 0, handle);
- if (rc)
- return rc;
- }
-
#ifdef CONFIG_FS_POSIX_ACL
if (attr->la_valid & LA_MODE) {
mdd_read_lock(env, obj, MOR_TGT_CHILD);
}
#endif
- /* basically the log is the same as in unlink case */
- if (lmm) {
- __u16 stripe;
-
- if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
- le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
- CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
- mdd->mdd_obd_dev->obd_name,
- le32_to_cpu(lmm->lmm_magic),
- PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
- return -EINVAL;
- }
-
- stripe = le16_to_cpu(lmm->lmm_stripe_count);
- if (stripe == LOV_ALL_STRIPES) {
- struct lov_desc *ldesc;
-
- ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
- LASSERT(ldesc != NULL);
- stripe = ldesc->ld_tgt_count;
- }
-
- for (i = 0; i < stripe; i++) {
- rc = mdd_declare_llog_record(env, mdd,
- sizeof(struct llog_unlink_rec),
- handle);
- if (rc)
- return rc;
- }
- }
-
return rc;
}
struct mdd_object *mdd_obj = md2mdd_obj(obj);
struct mdd_device *mdd = mdo2mdd(obj);
struct thandle *handle;
- struct lov_mds_md *lmm = NULL;
- struct llog_cookie *logcookies = NULL;
- int rc, lmm_size = 0, cookie_size = 0;
struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
const struct lu_attr *la = &ma->ma_attr;
+ int rc;
ENTRY;
+ /* we do not use ->attr_set() for LOV/SOM/HSM EA any more */
+ LASSERT((ma->ma_valid & MA_LOV) == 0);
+ LASSERT((ma->ma_valid & MA_HSM) == 0);
+ LASSERT((ma->ma_valid & MA_SOM) == 0);
+
*la_copy = ma->ma_attr;
rc = mdd_fix_attr(env, mdd_obj, la_copy, ma->ma_attr_flags);
if (rc)
if (la->la_valid == LA_ATIME && la_copy->la_valid == 0)
RETURN(0);
- if (S_ISREG(mdd_object_type(mdd_obj)) &&
- ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
- lmm_size = mdd_lov_mdsize(env, mdd);
- lmm = mdd_max_lmm_get(env, mdd);
- if (lmm == NULL)
- RETURN(-ENOMEM);
-
- rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
- XATTR_NAME_LOV);
-
- if (rc < 0)
- RETURN(rc);
- }
-
handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
RETURN(PTR_ERR(handle));
- rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
- lmm_size > 0 ? lmm : NULL, handle);
+ rc = mdd_declare_attr_set(env, mdd, mdd_obj, la, handle);
if (rc)
GOTO(stop, rc);
mdd_flags_xlate(mdd_obj, la_copy->la_flags);
} else if (la_copy->la_valid) { /* setattr */
rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
- /* journal chown/chgrp in llog, just like unlink */
- if (rc == 0 && lmm_size){
- cookie_size = mdd_lov_cookiesize(env, mdd);
- logcookies = mdd_max_cookie_get(env, mdd);
- if (logcookies == NULL)
- GOTO(cleanup, rc = -ENOMEM);
-
- if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size,
- logcookies, cookie_size) <= 0)
- logcookies = NULL;
- }
}
- if (rc == 0 && ma->ma_valid & MA_LOV) {
- cfs_umode_t mode;
-
- mode = mdd_object_type(mdd_obj);
- if (S_ISREG(mode) || S_ISDIR(mode)) {
- rc = mdd_lsm_sanity_check(env, mdd_obj);
- if (rc)
- GOTO(cleanup, rc);
-
- rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm,
- ma->ma_lmm_size, handle, 1);
- }
-
- }
- if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
- cfs_umode_t mode;
-
- mode = mdd_object_type(mdd_obj);
- if (S_ISREG(mode))
- rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
-
- }
-cleanup:
if (rc == 0)
rc = mdd_attr_set_changelog(env, obj, handle,
- ma->ma_attr.la_valid);
+ la->la_valid);
stop:
mdd_trans_stop(env, mdd, rc, handle);
- if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
- /*set obd attr, if needed*/
- rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
- logcookies);
- }
- RETURN(rc);
-}
-
-int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj,
- const struct lu_buf *buf, const char *name, int fl,
- struct thandle *handle)
-{
- int rc;
- ENTRY;
-
- mdd_write_lock(env, obj, MOR_TGT_CHILD);
- rc = __mdd_xattr_set(env, obj, buf, name, fl, handle);
- mdd_write_unlock(env, obj);
-
RETURN(rc);
}
int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
struct md_attr *ma, struct thandle *handle)
{
- int rc;
-
- rc = mdd_declare_unlink_log(env, obj, ma, handle);
- if (rc)
- return rc;
-
return mdo_declare_destroy(env, obj, handle);
}
int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
struct md_attr *ma, struct thandle *handle)
{
- int rc = 0;
+ int rc;
ENTRY;
- if (S_ISREG(mdd_object_type(obj))) {
- /* Return LOV & COOKIES unconditionally here. We clean evth up.
- * Caller must be ready for that. */
- rc = __mdd_lmm_get(env, obj, ma);
- if ((ma->ma_valid & MA_LOV))
- rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
- obj, ma);
- }
-
- if (rc == 0)
- rc = mdo_destroy(env, obj, handle);
+ rc = mdo_destroy(env, obj, handle);
RETURN(rc);
}
if (rc)
return rc;
- return mdd_declare_object_kill(env, obj, ma, handle);
+ return mdo_declare_destroy(env, obj, handle);
}
/*
struct mdd_object *mdd_obj = md2mdd_obj(obj);
struct mdd_device *mdd = mdo2mdd(obj);
struct thandle *handle = NULL;
- int rc;
- int is_orphan = 0, reset = 1;
+ int rc, is_orphan = 0;
ENTRY;
if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
/* Object maybe not in orphan list originally, it is rare case for
* mdd_finish_unlink() failure. */
if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
- /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
- if (ma->ma_valid & MA_FLAGS &&
- ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
- rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
- } else {
- if (handle == NULL) {
- handle = mdd_trans_create(env, mdo2mdd(obj));
- if (IS_ERR(handle))
- GOTO(out, rc = PTR_ERR(handle));
-
- rc = mdd_declare_object_kill(env, mdd_obj, ma,
- handle);
- if (rc)
- GOTO(out, rc);
-
- rc = mdd_declare_changelog_store(env, mdd,
- NULL, handle);
- if (rc)
- GOTO(stop, rc);
-
- rc = mdd_trans_start(env, mdo2mdd(obj), handle);
- if (rc)
- GOTO(out, rc);
- }
+ if (handle == NULL) {
+ handle = mdd_trans_create(env, mdo2mdd(obj));
+ if (IS_ERR(handle))
+ GOTO(out, rc = PTR_ERR(handle));
+
+ rc = mdo_declare_destroy(env, mdd_obj, handle);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = mdd_declare_changelog_store(env, mdd,
+ NULL, handle);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = mdd_trans_start(env, mdo2mdd(obj), handle);
+ if (rc)
+ GOTO(out, rc);
+ }
- rc = mdd_object_kill(env, mdd_obj, ma, handle);
- if (rc == 0)
- reset = 0;
- }
+ rc = mdo_destroy(env, mdd_obj, handle);
if (rc != 0)
CERROR("Error when prepare to delete Object "DFID" , "
EXIT;
out:
- if (reset)
- ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
mdd_write_unlock(env, mdd_obj);
struct thandle *th)
{
struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
+ struct dt_key *key;
int rc;
- rc = dt_declare_insert(env, mdd->mdd_orphans, NULL, NULL, th);
+ key = orph_key_fill(env, mdo2fid(obj), ORPH_OP_UNLINK);
+
+ rc = dt_declare_insert(env, mdd->mdd_orphans, NULL, key, th);
if (rc)
return rc;
RETURN(rc);
}
-/**
- * Destroy OSD object on MDD and associated OST objects.
- *
- * \param obj orphan object
- * \param mdd used for sending llog msg to osts
- *
- * \retval 0 success
- * \retval -ve error
- */
-static int orphan_object_kill(const struct lu_env *env,
- struct mdd_object *obj,
- struct mdd_device *mdd,
- struct thandle *th)
-{
- struct lu_attr *la = &mdd_env_info(env)->mti_la;
- int rc = 0;
- ENTRY;
-
- /* No need to lock this object as its recovery phase, and
- * no other thread can access it. But we need to lock it
- * as its precondition for osd api we using. */
-
- mdo_ref_del(env, obj, th);
- if (S_ISDIR(mdd_object_type(obj))) {
- mdo_ref_del(env, obj, th);
- mdd_orphan_ref_del(env, mdd, th);
- } else {
- /* regular file , cleanup linked ost objects */
- rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
- if (rc == 0)
- rc = mdd_lov_destroy(env, mdd, obj, la);
- }
- mdo_destroy(env, obj, th);
- RETURN(rc);
-}
-
int orph_declare_index_delete(const struct lu_env *env,
struct mdd_object *obj,
+
struct thandle *th)
{
struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
+ struct dt_key *key;
int rc;
- rc = dt_declare_delete(env, mdd->mdd_orphans, NULL, th);
+ key = orph_key_fill(env, mdo2fid(obj), ORPH_OP_UNLINK);
+
+ rc = dt_declare_delete(env, mdd->mdd_orphans, key, th);
if (rc)
return rc;
{
struct thandle *th = NULL;
struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
- struct md_attr *ma = &mdd_env_info(env)->mti_ma;
int rc = 0;
ENTRY;
- /* init ma */
- ma->ma_lmm_size = mdd_lov_mdsize(env, mdd);
- ma->ma_lmm = mdd_max_lmm_get(env, mdd);
- ma->ma_cookie_size = mdd_lov_cookiesize(env, mdd);
- ma->ma_cookie = mdd_max_cookie_get(env, mdd);
- ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
- ma->ma_valid = 0;
-
th = mdd_trans_create(env, mdd);
if (IS_ERR(th)) {
CERROR("Cannot get thandle\n");
if (rc)
GOTO(stop, rc);
- rc = mdd_declare_object_kill(env, obj, ma, th);
+ rc = mdo_declare_destroy(env, obj, th);
if (rc)
GOTO(stop, rc);
if (likely(obj->mod_count == 0)) {
mdd_orphan_write_lock(env, mdd);
rc = mdd_orphan_delete_obj(env, mdd, key, th);
- if (rc == 0)
- orphan_object_kill(env, obj, mdd, th);
- else
+ if (rc == 0) {
+ mdo_ref_del(env, obj, th);
+ if (S_ISDIR(mdd_object_type(obj))) {
+ mdo_ref_del(env, obj, th);
+ mdd_orphan_ref_del(env, mdd, th);
+ }
+ rc = mdo_destroy(env, obj, th);
+ } else
CERROR("could not delete object: rc = %d\n",rc);
mdd_orphan_write_unlock(env, mdd);
}
if (rc) /* so replay-single.sh test_37 works */
CERROR("%s: error unlinking orphan "DFID" from "
"PENDING: rc = %d\n",
- mdd->mdd_obd_dev->obd_name, PFID(lf), rc);
+ mdd2obd_dev(mdd)->obd_name, PFID(lf), rc);
} else {
mdd_write_lock(env, mdo, MOR_TGT_CHILD);
if (likely(mdo->mod_count > 0)) {
if (IS_ERR(it)) {
rc = PTR_ERR(it);
CERROR("%s: cannot clean PENDING: rc = %d\n",
- mdd->mdd_obd_dev->obd_name, rc);
+ mdd2obd_dev(mdd)->obd_name, rc);
GOTO(out, rc);
}
GOTO(out_put, rc);
if (rc == 0) {
CERROR("%s: error loading iterator to clean PENDING\n",
- mdd->mdd_obd_dev->obd_name);
+ mdd2obd_dev(mdd)->obd_name);
/* Index contains no zero key? */
GOTO(out_put, rc = -EIO);
}
rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
if (rc != 0) {
CERROR("%s: fail to get FID for orphan it: rc = %d\n",
- mdd->mdd_obd_dev->obd_name, rc);
+ mdd2obd_dev(mdd)->obd_name, rc);
goto next;
}
fid_le_to_cpu(&fid, &ent->lde_fid);
if (!fid_is_sane(&fid)) {
CERROR("%s: bad FID "DFID" cleaning PENDING\n",
- mdd->mdd_obd_dev->obd_name, PFID(&fid));
+ mdd2obd_dev(mdd)->obd_name, PFID(&fid));
goto next;
}
#include "mdd_internal.h"
-int mdd_txn_stop_cb(const struct lu_env *env, struct thandle *txn,
- void *cookie)
-{
- struct mdd_device *mdd = cookie;
- struct obd_device *obd = mdd2obd_dev(mdd);
-
- LASSERT(obd);
- return mds_lov_write_objids(obd);
-}
-
struct thandle *mdd_trans_create(const struct lu_env *env,
struct mdd_device *mdd)
{
}
if (reqbody->valid & OBD_MD_FLMODEASIZE) {
- repbody->max_cookiesize = info->mti_mdt->mdt_max_cookiesize;
+ repbody->max_cookiesize = 0;
repbody->max_mdsize = info->mti_mdt->mdt_max_mdsize;
repbody->valid |= OBD_MD_FLMODEASIZE;
CDEBUG(D_INODE, "I am going to change the MAX_MD_SIZE & "
req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
info->mti_rr.rr_eadatalen);
+ /* llog cookies are always 0, the field is kept for compatibility */
if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
- req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
- info->mti_mdt->mdt_max_cookiesize);
+ req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0);
rc = req_capsule_server_pack(pill);
if (rc != 0) {
rc = next->md_ops->mdo_llog_ctxt_get(env, next, idx, (void **)&ctxt);
if (rc || ctxt == NULL) {
- CERROR("Can't get mdd ctxt %d\n", rc);
- return rc;
+ /* XXX: no support for changelogs yet - in another patch */
+ /*CERROR("Can't get mdd ctxt %d\n", rc);*/
+#if LUSTRE_VERSION_CODE >= OBD_OCD_VERSION(2, 3, 90, 0)
+#error "do not forget about changelogs"
+#endif
+ return 0;
}
rc = llog_group_set_ctxt(&mdt2obd_dev(mdt)->obd_olg, ctxt, idx);
req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
info->mti_body->eadatasize);
if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
- req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
- info->mti_mdt->mdt_max_cookiesize);
+ req_capsule_set_size(pill, &RMF_LOGCOOKIES,
+ RCL_SERVER, 0);
rc = req_capsule_server_pack(pill);
}
lu_dev_del_linkage(top->ld_site, top);
+ lu_site_purge(env, top->ld_site, -1);
+
bufs = &info->mti_u.bufs;
/* process cleanup, pass mdt obd name to get obd umount flags */
+ /* another purpose is to let all layers release their objects */
lustre_cfg_bufs_reset(bufs, obd->obd_name);
if (obd->obd_force)
strcat(flags, "F");
top->ld_ops->ldo_process_config(env, top, lcfg);
lustre_cfg_free(lcfg);
- lu_stack_fini(env, top);
+ lu_site_purge(env, top->ld_site, -1);
+
m->mdt_child = NULL;
m->mdt_bottom = NULL;
- obd_disconnect(m->mdt_bottom_exp);
-}
-
-static struct lu_device *mdt_layer_setup(struct lu_env *env,
- const char *typename,
- struct lu_device *child,
- struct lustre_cfg *cfg)
-{
- const char *dev = lustre_cfg_string(cfg, 0);
- struct obd_type *type;
- struct lu_device_type *ldt;
- struct lu_device *d;
- int rc;
- ENTRY;
-
- /* find the type */
- type = class_get_type(typename);
- if (!type) {
- CERROR("Unknown type: '%s'\n", typename);
- GOTO(out, rc = -ENODEV);
- }
-
- rc = lu_env_refill((struct lu_env *)env);
- if (rc != 0) {
- CERROR("Failure to refill session: '%d'\n", rc);
- GOTO(out_type, rc);
- }
-
- ldt = type->typ_lu;
- if (ldt == NULL) {
- CERROR("type: '%s'\n", typename);
- GOTO(out_type, rc = -EINVAL);
- }
-
- ldt->ldt_obd_type = type;
- d = ldt->ldt_ops->ldto_device_alloc(env, ldt, cfg);
- if (IS_ERR(d)) {
- CERROR("Cannot allocate device: '%s'\n", typename);
- GOTO(out_type, rc = -ENODEV);
- }
-
- LASSERT(child->ld_site);
- d->ld_site = child->ld_site;
+ obd_disconnect(m->mdt_child_exp);
+ m->mdt_child_exp = NULL;
- type->typ_refcnt++;
- rc = ldt->ldt_ops->ldto_device_init(env, d, dev, child);
- if (rc) {
- CERROR("can't init device '%s', rc %d\n", typename, rc);
- GOTO(out_alloc, rc);
- }
- lu_device_get(d);
- lu_ref_add(&d->ld_reference, "lu-stack", &lu_site_init);
-
- lu_dev_add_linkage(d->ld_site, d);
- RETURN(d);
-out_alloc:
- ldt->ldt_ops->ldto_device_free(env, d);
- type->typ_refcnt--;
-out_type:
- class_put_type(type);
-out:
- return ERR_PTR(rc);
+ /* drop the export to the bottom OSD and forget that very export;
+ * clearing mdt_child_exp here again would leave mdt_bottom_exp
+ * pointing at a disconnected export */
+ obd_disconnect(m->mdt_bottom_exp);
+ m->mdt_bottom_exp = NULL;
}
static int mdt_connect_to_next(const struct lu_env *env, struct mdt_device *m,
RETURN(rc);
}
-static int mdt_stack_init(struct lu_env *env,
- struct mdt_device *m,
- struct lustre_cfg *cfg,
- struct lustre_mount_info *lmi)
+/**
+ * Assemble the MDT device stack: attach and set up the MDD device, then
+ * connect the MDT to it and to the bottom OSD.
+ *
+ * The MDD name is derived from the MDT name stored in buffer 0 of \a cfg
+ * (the 'T' of "-MDT" is replaced with 'D'); the device below MDD is taken
+ * from the profile registered under the MDT name.
+ *
+ * \param[in] env execution environment
+ * \param[in] mdt MDT device being initialized
+ * \param[in] cfg setup record of the MDT; buffer 0 holds its name
+ *
+ * \retval 0 on success
+ * \retval negative errno on failure; every temporary buffer allocated
+ * here (name, uuid, bufs, lcfg) is released on all paths
+ */
+static int mdt_stack_init(const struct lu_env *env, struct mdt_device *mdt,
+ struct lustre_cfg *cfg)
{
- struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
- struct lu_device *tmp;
- struct md_device *md;
- struct lu_device *child_lu_dev;
- char *osdname;
- int rc;
+ char *dev = lustre_cfg_string(cfg, 0);
+ int rc, name_size, uuid_size;
+ char *name, *uuid, *p;
+ struct lustre_cfg_bufs *bufs;
+ struct lustre_cfg *lcfg;
+ struct obd_device *obd;
+ struct lustre_profile *lprof;
+ struct lu_site *site;
ENTRY;
- /* find bottom osd */
- OBD_ALLOC(osdname, MTI_NAME_MAXLEN);
- if (osdname == NULL)
- RETURN(-ENOMEM);
+ /* in 1.8 we had the only device in the stack - MDS.
+ * 2.0 introduces MDT, MDD, OSD; MDT starts others internally.
+ * in 2.3 OSD is instantiated by obd_mount.c, so we need
+ * to generate names and setup MDT, MDD. MDT will be using
+ * generated name to connect to MDD. for MDD the next device
+ * will be LOD with name taken from so called "profile" which
+ * is generated by mount_option line
+ *
+ * 1.8 MGS generates config. commands like this:
+ * #06 (104)mount_option 0: 1:lustre-MDT0000 2:lustre-mdtlov
+ * #08 (120)setup 0:lustre-MDT0000 1:dev 2:type 3:lustre-MDT0000
+ * 2.0 MGS generates config. commands like this:
+ * #07 (112)mount_option 0: 1:lustre-MDT0000 2:lustre-MDT0000-mdtlov
+ * #08 (160)setup 0:lustre-MDT0000 1:lustre-MDT0000_UUID 2:0
+ * 3:lustre-MDT0000-mdtlov 4:f
+ *
+ * we generate MDD name from MDT one, just replacing T with D
+ *
+ * after all the preparations, the logical equivalent will be
+ * #01 (160)setup 0:lustre-MDD0000 1:lustre-MDD0000_UUID 2:0
+ * 3:lustre-MDT0000-mdtlov 4:f
+ * #02 (160)setup 0:lustre-MDT0000 1:lustre-MDT0000_UUID 2:0
+ * 3:lustre-MDD0000 4:f
+ *
+ * notice we build the stack from down to top: MDD first, then MDT */
+
+ name_size = MAX_OBD_NAME;
+ uuid_size = MAX_OBD_NAME;
+
+ OBD_ALLOC(name, name_size);
+ OBD_ALLOC(uuid, uuid_size);
+ if (name == NULL || uuid == NULL)
+ GOTO(cleanup_mem, rc = -ENOMEM);
+
+ OBD_ALLOC_PTR(bufs);
+ if (!bufs)
+ GOTO(cleanup_mem, rc = -ENOMEM);
+
+ /* NOTE(review): dev is copied unbounded into a MAX_OBD_NAME buffer;
+ * presumably the obd name length was validated earlier - confirm */
+ strcpy(name, dev);
+ p = strstr(name, "-MDT");
+ if (p == NULL)
+ /* malformed MDT name, not an allocation failure; bufs is
+ * already allocated at this point and must be released */
+ GOTO(free_bufs, rc = -EINVAL);
+ p[3] = 'D';
+
+ snprintf(uuid, MAX_OBD_NAME, "%s_UUID", name);
+
+ lprof = class_get_profile(lustre_cfg_string(cfg, 0));
+ if (lprof == NULL || lprof->lp_dt == NULL) {
+ CERROR("can't find the profile: %s\n",
+ lustre_cfg_string(cfg, 0));
+ GOTO(free_bufs, rc = -EINVAL);
+ }
+
+ /* step 1: attach the MDD device under the generated name */
+ lustre_cfg_bufs_reset(bufs, name);
+ lustre_cfg_bufs_set_string(bufs, 1, LUSTRE_MDD_NAME);
+ lustre_cfg_bufs_set_string(bufs, 2, uuid);
+ lustre_cfg_bufs_set_string(bufs, 3, lprof->lp_dt);
+
+ lcfg = lustre_cfg_new(LCFG_ATTACH, bufs);
+ if (!lcfg)
+ GOTO(free_bufs, rc = -ENOMEM);
- snprintf(osdname, MTI_NAME_MAXLEN, "%s-osd", lustre_cfg_string(cfg, 0));
- rc = mdt_connect_to_next(env, m, osdname, &m->mdt_bottom_exp);
- OBD_FREE(osdname, MTI_NAME_MAXLEN);
+ rc = class_attach(lcfg);
if (rc)
- RETURN(rc);
+ GOTO(lcfg_cleanup, rc);
- tmp = m->mdt_bottom_exp->exp_obd->obd_lu_dev;
- LASSERT(tmp);
- m->mdt_bottom = lu2dt_dev(tmp);
+ obd = class_name2obd(name);
+ if (!obd) {
+ CERROR("Can not find obd %s (%s in config)\n",
+ name, lustre_cfg_string(cfg, 0));
+ /* there is no obd to detach: only release the config */
+ GOTO(lcfg_cleanup, rc = -EINVAL);
+ }
- /* initialize site's pointers: md_site, top device */
- d->ld_site = tmp->ld_site;
- d->ld_site->ls_top_dev = d;
- m->mdt_mite.ms_lu = tmp->ld_site;
- tmp->ld_site->ld_md_site = &m->mdt_mite;
- LASSERT(d->ld_site);
- d = tmp;
+ lustre_cfg_free(lcfg);
- tmp = mdt_layer_setup(env, LUSTRE_MDD_NAME, d, cfg);
- if (IS_ERR(tmp)) {
- GOTO(out, rc = PTR_ERR(tmp));
- }
- d = tmp;
- md = lu2md_dev(d);
+ /* step 2: set up the MDD device just attached */
+ lustre_cfg_bufs_reset(bufs, name);
+ lustre_cfg_bufs_set_string(bufs, 1, uuid);
+ lustre_cfg_bufs_set_string(bufs, 2, dev);
+ lustre_cfg_bufs_set_string(bufs, 3, lprof->lp_dt);
- tmp = mdt_layer_setup(env, LUSTRE_CMM_NAME, d, cfg);
- if (IS_ERR(tmp)) {
- GOTO(out, rc = PTR_ERR(tmp));
- }
- d = tmp;
- /*set mdd upcall device*/
- md_upcall_dev_set(md, lu2md_dev(d));
+ /* NOTE(review): unlike the LCFG_ATTACH case above, the result of
+ * lustre_cfg_new() is not checked for NULL here - confirm */
+ lcfg = lustre_cfg_new(LCFG_SETUP, bufs);
- md = lu2md_dev(d);
- /*set cmm upcall device*/
- md_upcall_dev_set(md, &m->mdt_md_dev);
+ rc = class_setup(obd, lcfg);
+ if (rc)
+ GOTO(class_detach, rc);
- m->mdt_child = lu2md_dev(d);
+ /* connect to MDD we just setup */
+ rc = mdt_connect_to_next(env, mdt, name, &mdt->mdt_child_exp);
+ if (rc)
+ /* go through the common exit path so that name, uuid,
+ * bufs and lcfg are not leaked */
+ GOTO(class_detach, rc);
- /* process setup config */
- tmp = &m->mdt_md_dev.md_lu_dev;
- rc = tmp->ld_ops->ldo_process_config(env, tmp, cfg);
- if (rc)
- GOTO(out, rc);
+ site = mdt->mdt_child_exp->exp_obd->obd_lu_dev->ld_site;
+ LASSERT(site);
+ LASSERT(mdt->mdt_md_dev.md_lu_dev.ld_site == NULL);
+ mdt->mdt_md_dev.md_lu_dev.ld_site = site;
+ site->ls_top_dev = &mdt->mdt_md_dev.md_lu_dev;
+ mdt->mdt_child = lu2md_dev(mdt->mdt_child_exp->exp_obd->obd_lu_dev);
- /* initialize local objects */
- child_lu_dev = &m->mdt_child->md_lu_dev;
- rc = child_lu_dev->ld_ops->ldo_prepare(env,
- &m->mdt_md_dev.md_lu_dev,
- child_lu_dev);
+ /* now connect to bottom OSD */
+ snprintf(name, MAX_OBD_NAME, "%s-osd", dev);
+ rc = mdt_connect_to_next(env, mdt, name, &mdt->mdt_bottom_exp);
if (rc)
- GOTO(out, rc);
+ GOTO(class_detach, rc);
+ mdt->mdt_bottom =
+ lu2dt_dev(mdt->mdt_bottom_exp->exp_obd->obd_lu_dev);
- rc = m->mdt_child->md_ops->mdo_root_get(env, m->mdt_child,
- &m->mdt_md_root_fid);
-out:
- /* fini from last known good lu_device */
- if (rc)
- mdt_stack_fini(env, m, d);
+ rc = lu_env_refill((struct lu_env *)env);
+ if (rc != 0)
+ CERROR("Failure to refill session: '%d'\n", rc);
- return rc;
+ lu_dev_add_linkage(site, &mdt->mdt_md_dev.md_lu_dev);
+
+ EXIT;
+ /* common exit path, shared by success and failure */
+class_detach:
+ if (rc)
+ class_detach(obd, lcfg);
+lcfg_cleanup:
+ lustre_cfg_free(lcfg);
+free_bufs:
+ OBD_FREE_PTR(bufs);
+cleanup_mem:
+ if (name)
+ OBD_FREE(name, name_size);
+ if (uuid)
+ OBD_FREE(uuid, uuid_size);
+ RETURN(rc);
}
/**
LASSERT(obd != NULL);
m->mdt_max_mdsize = MAX_MD_SIZE; /* 4 stripes */
- m->mdt_max_cookiesize = sizeof(struct llog_cookie);
m->mdt_som_conf = 0;
obd->obd_lu_dev = &m->mdt_md_dev.md_lu_dev;
/* init the stack */
- rc = mdt_stack_init((struct lu_env *)env, m, cfg, lmi);
+ rc = mdt_stack_init((struct lu_env *)env, m, cfg);
if (rc) {
CERROR("Can't init device stack, rc %d\n", rc);
RETURN(rc);
s = m->mdt_md_dev.md_lu_dev.ld_site;
mite = &m->mdt_mite;
+ s->ld_md_site = mite;
/* set server index */
mite->ms_node_id = node_id;
if (rc)
GOTO(err_fini_stack, rc);
- rc = mdt_fld_init(env, obd->obd_name, m);
- if (rc)
- GOTO(err_lut, rc);
-
- rc = mdt_seq_init(env, obd->obd_name, m);
- if (rc)
- GOTO(err_fini_fld, rc);
-
snprintf(info->mti_u.ns_name, sizeof info->mti_u.ns_name,
LUSTRE_MDT_NAME"-%p", m);
m->mdt_namespace = ldlm_namespace_new(obd, info->mti_u.ns_name,
GOTO(err_llog_cleanup, rc);
}
- target_recovery_init(&m->mdt_lut, mdt_recovery_handle);
-
rc = mdt_procfs_init(m, dev);
if (rc) {
CERROR("Can't init MDT lprocfs, rc %d\n", rc);
ping_evictor_start();
- if (obd->obd_recovering == 0)
- mdt_postrecov(env, m);
+ /* recovery will be started upon mdt_prepare()
+ * when the whole stack is complete and ready
+ * to serve the requests */
mdt_init_capa_ctxt(env, m);
obd->obd_namespace = m->mdt_namespace = NULL;
err_fini_seq:
mdt_seq_fini(env, m);
-err_fini_fld:
mdt_fld_fini(env, m);
-err_lut:
lut_fini(env, &m->mdt_lut);
err_fini_stack:
mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
mdto->mot_ioepoch_count, mdto->mot_writecount);
}
+/*
+ * ldo_prepare() method of the MDT device.
+ *
+ * Prepares the child device, initializes FLD and SEQ, fetches the root
+ * FID, starts recovery and finally opens the device for connections.
+ * Returns 0 on success or the error of the first step that failed.
+ */
+static int mdt_prepare(const struct lu_env *env,
+ struct lu_device *pdev,
+ struct lu_device *cdev)
+{
+ struct mdt_device *mdt = mdt_dev(cdev);
+ struct lu_device *child = &mdt->mdt_child->md_lu_dev;
+ struct obd_device *obd = cdev->ld_obd;
+ int rc;
+
+ ENTRY;
+
+ LASSERT(obd);
+
+ /* each step below runs only if all the previous ones succeeded */
+ rc = child->ld_ops->ldo_prepare(env, cdev, child);
+ if (rc == 0)
+ rc = mdt_fld_init(env, obd->obd_name, mdt);
+ if (rc == 0)
+ rc = mdt_seq_init(env, obd->obd_name, mdt);
+ if (rc == 0)
+ rc = mdt->mdt_child->md_ops->mdo_root_get(env, mdt->mdt_child,
+ &mdt->mdt_md_root_fid);
+ if (rc != 0)
+ RETURN(rc);
+
+ /* the config log must not have been marked as processed yet */
+ LASSERT(!cfs_test_bit(MDT_FL_CFGLOG, &mdt->mdt_state));
+ target_recovery_init(&mdt->mdt_lut, mdt_recovery_handle);
+ cfs_set_bit(MDT_FL_CFGLOG, &mdt->mdt_state);
+
+ /* connections were refused so far; allow them from now on */
+ LASSERT(obd->obd_no_conn);
+ cfs_spin_lock(&obd->obd_dev_lock);
+ obd->obd_no_conn = 0;
+ cfs_spin_unlock(&obd->obd_dev_lock);
+
+ /* nothing to recover: run the post-recovery step right away */
+ if (obd->obd_recovering == 0)
+ mdt_postrecov(env, mdt);
+
+ RETURN(0);
+}
+
static const struct lu_device_operations mdt_lu_ops = {
.ldo_object_alloc = mdt_object_alloc,
.ldo_process_config = mdt_process_config,
+ .ldo_prepare = mdt_prepare,
};
static const struct lu_object_operations mdt_obj_ops = {
req = info->mti_pill->rc_req;
mdt = mdt_dev(obd->obd_lu_dev);
+ /*
+ * first, check whether the stack is ready to handle requests
+ * XXX: probably not very appropriate method is used now
+ * at some point we should find a better one
+ */
+ if (!cfs_test_bit(MDT_FL_SYNCED, &mdt->mdt_state)) {
+ rc = obd_health_check(env, mdt->mdt_child_exp->exp_obd);
+ if (rc)
+ RETURN(-EAGAIN);
+ cfs_set_bit(MDT_FL_SYNCED, &mdt->mdt_state);
+ }
+
rc = class_connect(&conn, obd, cluuid);
if (rc)
RETURN(rc);
if (!cfs_list_empty(&closing_list)) {
struct md_attr *ma = &info->mti_attr;
- int lmm_size;
- int cookie_size;
-
- lmm_size = mdt->mdt_max_mdsize;
- OBD_ALLOC_LARGE(ma->ma_lmm, lmm_size);
- if (ma->ma_lmm == NULL)
- GOTO(out_lmm, rc = -ENOMEM);
-
- cookie_size = mdt->mdt_max_cookiesize;
- OBD_ALLOC_LARGE(ma->ma_cookie, cookie_size);
- if (ma->ma_cookie == NULL)
- GOTO(out_cookie, rc = -ENOMEM);
/* Close any open files (which may also cause orphan unlinking). */
cfs_list_for_each_entry_safe(mfd, n, &closing_list, mfd_list) {
cfs_list_del_init(&mfd->mfd_list);
- memset(&ma->ma_attr, 0, sizeof(ma->ma_attr));
- ma->ma_lmm_size = lmm_size;
- ma->ma_cookie_size = cookie_size;
- ma->ma_need = 0;
- /* It is not for setattr, just tell MDD to send
- * DESTROY RPC to OSS if needed */
- ma->ma_valid = MA_FLAGS;
- ma->ma_attr_flags = MDS_CLOSE_CLEANUP;
- /* Don't unlink orphan on failover umount, LU-184 */
- if (exp->exp_flags & OBD_OPT_FAILOVER)
- ma->ma_attr_flags |= MDS_KEEP_ORPHAN;
+ ma->ma_need = ma->ma_valid = 0;
+ /* Don't unlink orphan on failover umount, LU-184 */
+ if (exp->exp_flags & OBD_OPT_FAILOVER) {
+ ma->ma_valid = MA_FLAGS;
+ ma->ma_attr_flags |= MDS_KEEP_ORPHAN;
+ }
mdt_mfd_close(info, mfd);
}
- OBD_FREE_LARGE(ma->ma_cookie, cookie_size);
- ma->ma_cookie = NULL;
-out_cookie:
- OBD_FREE_LARGE(ma->ma_lmm, lmm_size);
- ma->ma_lmm = NULL;
}
-out_lmm:
info->mti_mdt = NULL;
/* cleanup client slot early */
/* Do not erase record for recoverable client. */
RETURN(0);
}
-static void mdt_allow_cli(struct mdt_device *m, unsigned int flag)
-{
- if (flag & CONFIG_LOG)
- cfs_set_bit(MDT_FL_CFGLOG, &m->mdt_state);
-
- /* also notify active event */
- if (flag & CONFIG_SYNC)
- cfs_set_bit(MDT_FL_SYNCED, &m->mdt_state);
-
- if (cfs_test_bit(MDT_FL_CFGLOG, &m->mdt_state) &&
- cfs_test_bit(MDT_FL_SYNCED, &m->mdt_state)) {
- struct obd_device *obd = m->mdt_md_dev.md_lu_dev.ld_obd;
-
- /* Open for clients */
- if (obd->obd_no_conn) {
- cfs_spin_lock(&obd->obd_dev_lock);
- obd->obd_no_conn = 0;
- cfs_spin_unlock(&obd->obd_dev_lock);
- }
- }
-}
-
-static int mdt_upcall(const struct lu_env *env, struct md_device *md,
- enum md_upcall_event ev, void *data)
-{
- struct mdt_device *m = mdt_dev(&md->md_lu_dev);
- struct md_device *next = m->mdt_child;
- struct mdt_thread_info *mti;
- int rc = 0;
- ENTRY;
-
- switch (ev) {
- case MD_LOV_SYNC:
- rc = next->md_ops->mdo_maxsize_get(env, next,
- &m->mdt_max_mdsize,
- &m->mdt_max_cookiesize);
- CDEBUG(D_INFO, "get max mdsize %d max cookiesize %d\n",
- m->mdt_max_mdsize, m->mdt_max_cookiesize);
- mdt_allow_cli(m, CONFIG_SYNC);
- if (data)
- (*(__u64 *)data) =
- m->mdt_lut.lut_obd->u.obt.obt_mount_count;
- break;
- case MD_NO_TRANS:
- mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
- mti->mti_no_need_trans = 1;
- CDEBUG(D_INFO, "disable mdt trans for this thread\n");
- break;
- case MD_LOV_CONFIG:
- /* Check that MDT is not yet configured */
- LASSERT(!cfs_test_bit(MDT_FL_CFGLOG, &m->mdt_state));
- break;
- default:
- CERROR("invalid event\n");
- rc = -EINVAL;
- break;
- }
- RETURN(rc);
-}
-
-static int mdt_obd_notify(struct obd_device *obd,
- struct obd_device *watched,
- enum obd_notify_event ev, void *data)
-{
- struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
- ENTRY;
-
- switch (ev) {
- case OBD_NOTIFY_CONFIG:
- mdt_allow_cli(mdt, (unsigned long)data);
- break;
- default:
- CDEBUG(D_INFO, "Unhandled notification %#x\n", ev);
- }
- RETURN(0);
-}
-
static int mdt_rpc_fid2path(struct mdt_thread_info *info, void *key,
void *val, int vallen)
{
.o_destroy_export = mdt_destroy_export,
.o_iocontrol = mdt_iocontrol,
.o_postrecov = mdt_obd_postrecov,
- .o_notify = mdt_obd_notify
};
static struct lu_device* mdt_device_fini(const struct lu_env *env,
l = ERR_PTR(rc);
return l;
}
- md_upcall_init(&m->mdt_md_dev, mdt_upcall);
} else
l = ERR_PTR(-ENOMEM);
return l;
mdt_pack_attr2body(info, repbody, la, mdt_object_fid(mo));
if (ma->ma_valid & MA_LOV) {
- __u32 mode;
-
- if (mdt_object_exists(mo) < 0)
- /* If it is a remote object, and we do not retrieve
- * EA back unlink reg file*/
- mode = S_IFREG;
- else
- mode = lu_object_attr(&mo->mot_obj.mo_lu);
-
- LASSERT(ma->ma_lmm_size);
- mdt_dump_lmm(D_INFO, ma->ma_lmm);
- repbody->eadatasize = ma->ma_lmm_size;
- if (S_ISREG(mode))
- repbody->valid |= OBD_MD_FLEASIZE;
- else if (S_ISDIR(mode))
- repbody->valid |= OBD_MD_FLDIREA;
- else
- LBUG();
+ CERROR("No need in LOV EA upon unlink\n");
+ dump_stack();
}
+ repbody->eadatasize = 0;
if (ma->ma_cookie_size && (ma->ma_valid & MA_COOKIE)) {
repbody->aclsize = ma->ma_cookie_size;
}
}
- ma->ma_cookie_size = req_capsule_get_size(pill, &RMF_LOGCOOKIES,
- RCL_CLIENT);
- if (ma->ma_cookie_size) {
- ma->ma_cookie = req_capsule_client_get(pill, &RMF_LOGCOOKIES);
- ma->ma_valid |= MA_COOKIE;
- }
-
rc = mdt_dlmreq_unpack(info);
RETURN(rc);
}
ma->ma_attr_flags &= ~MDS_VTX_BYPASS;
info->mti_spec.no_create = !!req_is_replay(mdt_info_req(info));
- /* last unlink need LOV EA sent back */
- rr->rr_eadatalen = info->mti_mdt->mdt_max_mdsize;
rc = mdt_dlmreq_unpack(info);
RETURN(rc);
ma->ma_attr_flags &= ~MDS_VTX_BYPASS;
info->mti_spec.no_create = !!req_is_replay(mdt_info_req(info));
- /* rename may contain unlink so we might need LOV EA sent back */
- rr->rr_eadatalen = info->mti_mdt->mdt_max_mdsize;
rc = mdt_dlmreq_unpack(info);
RETURN(rc);
}
+/*
+ * For replayed requests, rewrite a user LOV magic found in the request
+ * EA buffer to the matching *_DEF magic, in either byte order; requests
+ * that are not replays are left untouched.
+ * Please see comment above LOV_MAGIC_V1_DEF.
+ */
+static void mdt_fix_lov_magic(struct mdt_thread_info *info)
+{
+ struct lov_user_md_v1 *lum = (void *)info->mti_rr.rr_eadata;
+
+ LASSERT(lum);
+
+ /* only replays may need the magic to be fixed */
+ if (likely(!req_is_replay(mdt_info_req(info))))
+ return;
+
+ if (lum->lmm_magic == LOV_USER_MAGIC_V1)
+ lum->lmm_magic = LOV_MAGIC_V1_DEF;
+ else if (lum->lmm_magic == __swab32(LOV_USER_MAGIC_V1))
+ lum->lmm_magic = __swab32(LOV_MAGIC_V1_DEF);
+ else if (lum->lmm_magic == LOV_USER_MAGIC_V3)
+ lum->lmm_magic = LOV_MAGIC_V3_DEF;
+ else if (lum->lmm_magic == __swab32(LOV_USER_MAGIC_V3))
+ lum->lmm_magic = __swab32(LOV_MAGIC_V3_DEF);
+}
+
static int mdt_open_unpack(struct mdt_thread_info *info)
{
struct md_ucred *uc = mdt_ucred(info);
sp->u.sp_ea.eadatalen = rr->rr_eadatalen;
sp->u.sp_ea.eadata = rr->rr_eadata;
sp->no_create = !!req_is_replay(req);
+ mdt_fix_lov_magic(info);
}
/*
+/*
+ * Store Size-on-MDS data for \a obj in its LMA xattr.
+ *
+ * Reads the current XATTR_NAME_LMA value (or starts from a zeroed one when
+ * the xattr does not exist), fills in the object FID, \a ioepoch and the
+ * size/blocks taken from info->mti_attr.ma_attr, sets or clears LMAC_SOM
+ * according to \a enable, and writes the xattr back.
+ *
+ * Returns the result of mo_xattr_set(), or the mo_xattr_get() result when
+ * it is neither positive nor -ENODATA.
+ */
static int mdt_som_attr_set(struct mdt_thread_info *info,
struct mdt_object *obj, __u64 ioepoch, int enable)
{
- struct md_attr *ma = &info->mti_attr;
- int rc;
+ struct lustre_mdt_attrs *lma;
+ struct md_attr *ma = &info->mti_attr;
+ struct lu_buf *buf = &info->mti_buf;
+ struct md_object *next = mdt_object_child(obj);
+ struct mdt_device *mdt = info->mti_mdt;
+ struct lu_attr *la = &ma->ma_attr;
+ int rc;
ENTRY;
CDEBUG(D_INODE, "Size-on-MDS attribute %s for epoch "LPU64
" on "DFID".\n", enable ? "update" : "disabling",
ioepoch, PFID(mdt_object_fid(obj)));
- ma->ma_valid |= MA_SOM;
- ma->ma_som = &info->mti_u.som.data;
- if (enable) {
- struct mdt_device *mdt = info->mti_mdt;
- struct lu_attr *la = &ma->ma_attr;
-
- ma->ma_som->msd_ioepoch = ioepoch;
- ma->ma_som->msd_size = la->la_valid & LA_SIZE ? la->la_size : 0;
- ma->ma_som->msd_blocks = la->la_valid & LA_BLOCKS ?
- la->la_blocks : 0;
- ma->ma_som->msd_mountid = mdt->mdt_lut.lut_obd->u.obt.obt_mount_count;
- ma->ma_attr.la_valid &= LA_ATIME | LA_MTIME | LA_CTIME;
- } else {
- ma->ma_som->msd_ioepoch = IOEPOCH_INVAL;
- ma->ma_attr.la_valid &= LA_ATIME;
- }
+ /* the per-thread xattr buffer is used as LMA storage */
+ lma = (struct lustre_mdt_attrs *) info->mti_xattr_buf;
+ CLASSERT(sizeof(info->mti_xattr_buf) >= sizeof(*lma));
+
+ /* fetch the current LMA; a missing xattr means "start from zeroes" */
+ buf->lb_buf = lma;
+ buf->lb_len = sizeof(info->mti_xattr_buf);
+ rc = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_LMA);
+ if (rc > 0) {
+ lustre_lma_swab(lma);
+ } else if (rc == -ENODATA) {
+ memset(lma, 0, sizeof(*lma));
+ } else {
+ /* NOTE(review): rc == 0 also ends up here and returns 0
+ * without updating the xattr - confirm this is intended */
+ RETURN(rc);
+ }
+
+ /* Copy FID */
+ memcpy(&lma->lma_self_fid, mdt_object_fid(obj), sizeof(lma->lma_self_fid));
+
+ /* Copy SOM data */
+ /* the SOM fields are stored regardless of \a enable; only the
+ * LMAC_SOM compat flag tells whether they are valid */
+ lma->lma_ioepoch = ioepoch;
+ lma->lma_som_size = la->la_valid & LA_SIZE ? la->la_size : 0;
+ lma->lma_som_blocks = la->la_valid & LA_BLOCKS ? la->la_blocks : 0;
+ lma->lma_som_mountid = mdt->mdt_lut.lut_obd->u.obt.obt_mount_count;
+ if (enable)
+ lma->lma_compat |= LMAC_SOM;
+ else
+ lma->lma_compat &= ~LMAC_SOM;
- /* Since we have opened the file, it is unnecessary
- * to check permission when close it. Between the "open"
- * and "close", maybe someone has changed the file mode
- * or flags, or the file created mode do not permit wirte,
- * and so on. Just set MDS_PERM_BYPASS for all the cases. */
- ma->ma_attr_flags |= MDS_PERM_BYPASS | MDS_SOM;
+ /* NOTE(review): buf->lb_len is still sizeof(info->mti_xattr_buf) here
+ * and the LMA is not swabbed back before being written - confirm */
+ rc = mo_xattr_set(info->mti_env, next, buf, XATTR_NAME_LMA, 0);
- rc = mdt_attr_set(info, obj, ma, 0);
RETURN(rc);
}
struct obd_device *obd,
struct lustre_sb_info *lsi)
{
- struct lu_fid fid;
- struct dt_object *o;
int rc = 0;
ENTRY;
dt_txn_callback_add(mdt->mdt_bottom, &mdt->mdt_txn_cb);
rc = mdt_server_data_init(env, mdt, lsi);
- if (rc)
- RETURN(rc);
- o = dt_store_open(env, mdt->mdt_bottom, "", CAPA_KEYS, &fid);
- if (!IS_ERR(o)) {
- mdt->mdt_ck_obj = o;
- rc = mdt_capa_keys_init(env, mdt);
- if (rc)
- GOTO(put_ck_object, rc);
- } else {
- rc = PTR_ERR(o);
- CERROR("cannot open %s: rc = %d\n", CAPA_KEYS, rc);
- GOTO(disconnect_exports, rc);
- }
- RETURN(0);
-
-put_ck_object:
- lu_object_put(env, &o->do_lu);
- mdt->mdt_ck_obj = NULL;
-disconnect_exports:
- class_disconnect_exports(obd);
- return rc;
+ RETURN(rc);
}
void mdt_fs_cleanup(const struct lu_env *env, struct mdt_device *mdt)
static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
struct md_attr *ma)
{
- ma->ma_lmm = req_capsule_server_get(info->mti_pill, &RMF_MDT_MD);
- ma->ma_lmm_size = req_capsule_get_size(info->mti_pill,
- &RMF_MDT_MD, RCL_SERVER);
-
- ma->ma_cookie = req_capsule_server_get(info->mti_pill,
- &RMF_LOGCOOKIES);
- ma->ma_cookie_size = req_capsule_get_size(info->mti_pill,
- &RMF_LOGCOOKIES,
- RCL_SERVER);
-
- ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
+ ma->ma_need = MA_INODE;
ma->ma_valid = 0;
}
cfs_list_del_init(&mfd->mfd_list);
cfs_spin_unlock(&med->med_open_lock);
- /* Close the found mfd, update attributes. */
- ma->ma_lmm_size = info->mti_mdt->mdt_max_mdsize;
- OBD_ALLOC_LARGE(ma->ma_lmm, info->mti_mdt->mdt_max_mdsize);
- if (ma->ma_lmm == NULL)
- GOTO(out_put, rc = -ENOMEM);
-
mdt_mfd_close(info, mfd);
-
- OBD_FREE_LARGE(ma->ma_lmm, info->mti_mdt->mdt_max_mdsize);
- } else {
+ } else if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
+ LASSERT((ma->ma_valid & MA_LOV) == 0);
rc = mdt_attr_set(info, mo, ma, rr->rr_flags);
if (rc)
GOTO(out_put, rc);
- }
+ } else if ((ma->ma_valid & MA_LOV) && (ma->ma_valid & MA_INODE)) {
+ struct lu_buf *buf = &info->mti_buf;
+ LASSERT(ma->ma_attr.la_valid == 0);
+ buf->lb_buf = ma->ma_lmm;
+ buf->lb_len = ma->ma_lmm_size;
+ rc = mo_xattr_set(info->mti_env, mdt_object_child(mo),
+ buf, XATTR_NAME_LOV, 0);
+ if (rc)
+ GOTO(out_put, rc);
+ } else
+ LBUG();
ma->ma_need = MA_INODE;
ma->ma_valid = 0;
GOTO(out_unlock_parent, rc);
mdt_reint_init_ma(info, ma);
- if (!ma->ma_lmm || !ma->ma_cookie)
- GOTO(out_unlock_parent, rc = -EINVAL);
if (info->mti_cross_ref) {
/*
GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt));
mdt_reint_init_ma(info, ma);
- if (!ma->ma_lmm || !ma->ma_cookie)
- GOTO(out_unlock_tgt, rc = -EINVAL);
rc = mdo_rename_tgt(info->mti_env, mdt_object_child(mtgtdir),
mdt_object_child(mtgt), rr->rr_fid2,
mdt_handle_last_unlink(info, mtgt, ma);
EXIT;
-out_unlock_tgt:
+
if (mtgt)
mdt_object_unlock_put(info, mtgt, lh_tgt, rc);
out_unlock_tgtdir:
/* step 5: rename it */
mdt_reint_init_ma(info, ma);
- if (!ma->ma_lmm || !ma->ma_cookie)
- GOTO(out_unlock_new, rc = -EINVAL);
mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
OBD_FAIL_MDS_REINT_RENAME_WRITE);
RETURN(rc);
}
+EXPORT_SYMBOL(cat_cancel_cb);
/* helper to initialize catalog llog and process it to cancel */
int llog_cat_init_and_process(const struct lu_env *env,
}
if (strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) &&
strcmp(obd->obd_type->typ_name, LUSTRE_OSC_NAME) &&
+ strcmp(obd->obd_type->typ_name, LUSTRE_OSP_NAME) &&
strcmp(obd->obd_type->typ_name, LUSTRE_MGC_NAME)) {
CERROR("can't add connection on non-client dev\n");
RETURN(-EINVAL);
}
}
+#if defined(HAVE_SERVER_SUPPORT) && defined(__KERNEL__)
+ /* newer MDS replaces LOV/OSC with LOD/OSP */
+ {
+ char *typename = lustre_cfg_string(lcfg, 1);
+
+ if ((lcfg->lcfg_command == LCFG_ATTACH && typename &&
+ strcmp(typename, LUSTRE_LOV_NAME) == 0) &&
+ IS_MDT(s2lsi(clli->cfg_sb))) {
+ CDEBUG(D_CONFIG,
+ "For 2.x interoperability, rename obd "
+ "type from lov to lod (%s)\n",
+ s2lsi(clli->cfg_sb)->lsi_svname);
+ strcpy(typename, LUSTRE_LOD_NAME);
+ }
+ if ((lcfg->lcfg_command == LCFG_ATTACH && typename &&
+ strcmp(typename, LUSTRE_OSC_NAME) == 0) &&
+ IS_MDT(s2lsi(clli->cfg_sb))) {
+ CDEBUG(D_CONFIG,
+ "For 2.x interoperability, rename obd "
+ "type from osc to osp (%s)\n",
+ s2lsi(clli->cfg_sb)->lsi_svname);
+ strcpy(typename, LUSTRE_OSP_NAME);
+ }
+ }
+#endif
+
if ((clli->cfg_flags & CFG_F_EXCLUDE) &&
(lcfg->lcfg_command == LCFG_LOV_ADD_OBD))
/* Add inactive instead */
struct obd_device *obd;
struct lustre_sb_info *lsi = s2lsi(sb);
struct config_llog_instance cfg;
+ struct lu_env env;
+ struct lu_device *dev;
int rc;
ENTRY;
RETURN(-ENXIO);
}
- if ((lsi->lsi_lmd->lmd_flags & LMD_FLG_ABORT_RECOV) &&
- (OBP(obd, iocontrol))) {
- obd_iocontrol(OBD_IOC_ABORT_RECOVERY,
- obd->obd_self_export, 0, NULL, NULL);
- }
-
server_notify_target(sb, obd);
/* calculate recovery timeout, do it after lustre_process_log */
/* log has been fully processed */
obd_notify(obd, NULL, OBD_NOTIFY_CONFIG, (void *)CONFIG_LOG);
+
+ /* log has been fully processed, let clients connect */
+ dev = obd->obd_lu_dev;
+ if (dev && dev->ld_ops->ldo_prepare) {
+ rc = lu_env_init(&env, dev->ld_type->ldt_ctx_tags);
+ if (rc == 0) {
+ struct lu_context session_ctx;
+
+ lu_context_init(&session_ctx, LCT_SESSION);
+ session_ctx.lc_thread = NULL;
+ lu_context_enter(&session_ctx);
+ env.le_ses = &session_ctx;
+
+ dev->ld_ops->ldo_prepare(&env, NULL, dev);
+
+ lu_env_fini(&env);
+ lu_context_exit(&session_ctx);
+ lu_context_fini(&session_ctx);
+ }
+ }
+
+ /* abort recovery only on the complete stack:
+ * many devices can be involved */
+ if ((lsi->lsi_lmd->lmd_flags & LMD_FLG_ABORT_RECOV) &&
+ (OBP(obd, iocontrol))) {
+ obd_iocontrol(OBD_IOC_ABORT_RECOVERY,
+ obd->obd_self_export, 0, NULL, NULL);
+ }
}
RETURN(rc);
lum->lmm_stripe_offset = stripe_offset;
lum->lmm_pattern = 0;
spec->u.sp_ea.eadata = lum;
+ spec->u.sp_ea.eadatalen = sizeof(*lum);
spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
}
}
{ FID_SEQ_SRV_OID, "" /* "seq_srv" */ },
{ MDD_ROOT_INDEX_OID, "" /* "ROOT" */ },
{ MDD_ORPHAN_OID, "" /* "PENDING" */ },
- { MDD_LOV_OBJ_OID, "" /* LOV_OBJID */ },
+ { MDD_LOV_OBJ_OID, LOV_OBJID },
{ MDD_CAPA_KEYS_OID, "" /* CAPA_KEYS */ },
{ MDT_LAST_RECV_OID, LAST_RCVD },
{ LFSCK_BOOKMARK_OID, "" /* "lfsck_bookmark" */ },
{ OTABLE_IT_OID, "" /* "otable iterator" */},
{ OFD_LAST_RECV_OID, "" /* LAST_RCVD */ },
{ OFD_LAST_GROUP_OID, "LAST_GROUP" },
- { LLOG_CATALOGS_OID, "" /* "CATALOGS" */ },
+ { LLOG_CATALOGS_OID, "CATALOGS" },
{ MGS_CONFIGS_OID, "" /* MOUNT_CONFIGS_DIR */ },
{ OFD_HEALTH_CHECK_OID, HEALTH_CHECK },
{ 0, NULL }
int result = 0;
ENTRY;
-#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 55, 0)
- /* Unfortunately, the current MDD implementation relies on some specific
- * code to be executed in the OSD layer. Since OFD now also uses the OSD
- * module, we need a way to skip the metadata-specific code when running
- * with OFD.
- * The hack here is to check the type of the parent device which is
- * either MD (i.e. MDD device) with the current MDT stack or DT (i.e.
- * OFD device) on an OST. As a reminder, obdfilter does not use the OSD
- * layer and still relies on lvfs. This hack won't work any more when
- * LOD is landed since LOD is of DT type.
- * This code should be removed once the orion MDT changes (LOD/OSP, ...)
- * have been landed */
- osd->od_is_md = lu_device_is_md(pdev);
-#else
-#warning "all is_md checks must be removed from osd-ldiskfs"
-#endif
-
- if (osd->od_is_md) {
- /* 1. setup local objects */
+ if (dev->ld_site && lu_device_is_md(dev->ld_site->ls_top_dev)) {
+ /* MDT/MDD still use old infrastructure to create
+ * special files */
result = llo_local_objects_setup(env, lu2md_dev(pdev),
lu2dt_dev(dev));
if (result)
lprocfs_osp_init_vars(&lvars);
LASSERT(d->opd_obd);
- rc = class_process_proc_param(PARAM_OSP, lvars.obd_vars,
+ rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
lcfg, d->opd_obd);
if (rc > 0)
rc = 0;
*
* 2) send synchronous truncate RPC with just assigned id
*/
- LASSERT(attr);
+
+ /* there are few places in MDD code still passing NULL
+ * XXX: to be fixed soon */
+ if (attr == NULL)
+ RETURN(0);
+
if (attr->la_valid & LA_SIZE && attr->la_size > 0) {
LASSERT(!dt_object_exists(dt));
osp_object_assign_id(env, d, o);
/*
* There can be gaps in precreated ids and record to unlink llog
+ * XXX: gaps are not handled yet: the solution implemented earlier
+ * was found to be racy, so it has been disabled. There is no
+ * point in making a useless but expensive llog declaration.
*/
- rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th);
+ /* rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th); */
if (unlikely(!fid_is_zero(fid))) {
/* replay case: caller knows fid */
RETURN(rc);
}
+
+/**
+ * Ask the OST for its last object id and remember it.
+ *
+ * Sends an OST_GET_INFO request with the KEY_LAST_ID key over the import
+ * of \a d and stores the returned id in d->opd_last_used_id. The request
+ * is sent with no delay and no resend; if it fails, the import is
+ * deactivated.
+ *
+ * \param[in] d OSP device to query through
+ *
+ * \retval 0 on success
+ * \retval -ENOMEM if the request cannot be allocated
+ * \retval -EPROTO if the reply carries no id
+ * \retval negative other errno from packing or sending the request
+ */
+static int osp_get_lastid_from_ost(struct osp_device *d)
+{
+ struct ptlrpc_request *req;
+ struct obd_import *imp;
+ obd_id *reply;
+ char *tmp;
+ int rc;
+ ENTRY;
+
+ imp = d->opd_obd->u.cli.cl_import;
+ LASSERT(imp);
+
+ req = ptlrpc_request_alloc(imp, &RQF_OST_GET_INFO_LAST_ID);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
+ RCL_CLIENT, sizeof(KEY_LAST_ID));
+ rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
+ if (rc) {
+ /* a request that failed to pack is freed, not "finished" */
+ ptlrpc_request_free(req);
+ RETURN(rc);
+ }
+
+ tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
+ memcpy(tmp, KEY_LAST_ID, sizeof(KEY_LAST_ID));
+
+ req->rq_no_delay = req->rq_no_resend = 1;
+ ptlrpc_request_set_replen(req);
+ rc = ptlrpc_queue_wait(req);
+ if (rc) {
+ /* bad-bad OST.. let sysadm sort this out */
+ ptlrpc_set_import_active(imp, 0);
+ GOTO(out, rc);
+ }
+
+ reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
+ if (reply == NULL)
+ GOTO(out, rc = -EPROTO);
+
+ d->opd_last_used_id = *reply;
+ CDEBUG(D_HA, "%s: got last_id "LPU64" from OST\n",
+ d->opd_obd->obd_name, d->opd_last_used_id);
+
+out:
+ ptlrpc_req_finished(req);
+ RETURN(rc);
+}
+
/**
* asks OST to clean precreate orphans
* and gets next id for new objects
LASSERT(d->opd_recovery_completed);
LASSERT(d->opd_pre_reserved == 0);
+ CDEBUG(D_HA, "%s: going to cleanup orphans since "LPU64"\n",
+ d->opd_obd->obd_name, d->opd_last_used_id);
+
+ if (d->opd_last_used_id < 2) {
+ /* lastid looks strange... ask OST */
+ rc = osp_get_lastid_from_ost(d);
+ if (rc)
+ GOTO(out, rc);
+ }
+
imp = d->opd_obd->u.cli.cl_import;
LASSERT(imp);
req = ptlrpc_request_alloc(imp, &RQF_OST_CREATE);
if (req == NULL)
- RETURN(-ENOMEM);
+ GOTO(out, rc = -ENOMEM);
rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
if (rc) {
ptlrpc_request_free(req);
- RETURN(rc);
+ /* the request is gone: clear the pointer so that the common
+ * exit path does not call ptlrpc_req_finished() on it */
+ req = NULL;
+ GOTO(out, rc);
}
body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
if (body == NULL)
- GOTO(out_req, rc = -EPROTO);
+ GOTO(out, rc = -EPROTO);
body->oa.o_flags = OBD_FL_DELORPHAN;
body->oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
req->rq_no_resend = req->rq_no_delay = 1;
rc = ptlrpc_queue_wait(req);
- if (rc) {
- ptlrpc_set_import_active(imp, 0);
- GOTO(out_req, rc);
- }
+ if (rc)
+ GOTO(out, rc);
body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
if (body == NULL)
- GOTO(out_req, rc = -EPROTO);
+ GOTO(out, rc = -EPROTO);
/*
* OST provides us with id new pool starts from in body->oa.o_id
d->opd_pre_grow_slow = 0;
cfs_spin_unlock(&d->opd_pre_lock);
- /* now we can wakeup all users awaiting for objects */
- osp_pre_update_status(d, rc);
- cfs_waitq_signal(&d->opd_pre_user_waitq);
-
CDEBUG(D_HA, "Got last_id "LPU64" from OST, last_used is "LPU64
", next "LPU64"\n", body->oa.o_id,
le64_to_cpu(d->opd_last_used_id), d->opd_pre_next);
-out_req:
- ptlrpc_req_finished(req);
+out:
+ if (req)
+ ptlrpc_req_finished(req);
+
RETURN(rc);
}
if (rc) {
CERROR("%s: cannot cleanup orphans: rc = %d\n",
d->opd_obd->obd_name, rc);
+ /* we can't proceed from here, OST seem to
+ * be in a bad shape, better to wait for
+ * a new instance of the server and repeat
+ * from the beginning. notify possible waiters
+ * this OSP isn't quite functional yet */
+ osp_pre_update_status(d, rc);
+ cfs_waitq_signal(&d->opd_pre_user_waitq);
+ l_wait_event(d->opd_pre_waitq,
+ !osp_precreate_running(d) ||
+ d->opd_new_connection, &lwi);
+ continue;
+
}
}
struct l_wait_info lwi;
cfs_time_t expire = cfs_time_shift(obd_timeout);
int precreated, rc;
+ int count = 0;
ENTRY;
break;
}
+#if LUSTRE_VERSION_CODE >= OBD_OCD_VERSION(2, 3, 90, 0)
+#error "remove this before the release"
+#endif
+ /*
+ * to address Andreas's concern on possible busy-loop
+ * between this thread and osp_precreate_send()
+ */
+ LASSERT(count++ < 1000);
+
/*
* increase number of precreations
*/
[ "$ALWAYS_EXCEPT$EXCEPT" ] && \
echo "Skipping tests: `echo $ALWAYS_EXCEPT $EXCEPT`"
+# disable till changelogs from orion landed (LU-2034)
+ALWAYS_EXCEPT="1 2 3 4 5 6 7 8 9"
+
KILL=/bin/kill
TMP=${TMP:-/tmp}
set -e
-# bug 5494 5493
-ALWAYS_EXCEPT="24 52 $RECOVERY_SMALL_EXCEPT"
+# bug 5494 5493 LU2034
+ALWAYS_EXCEPT="24 52 60 $RECOVERY_SMALL_EXCEPT"
export MULTIOP=${MULTIOP:-multiop}
PTLDEBUG=${PTLDEBUG:--1}
ALWAYS_EXCEPT=" 27u 42a 42b 42c 42d 45 51d 68b $SANITY_EXCEPT"
# UPDATE THE COMMENT ABOVE WITH BUG NUMBERS WHEN CHANGING ALWAYS_EXCEPT!
+# with LOD/OSP landing
+# bug number for skipped tests: LU2036 LU2034
+ALWAYS_EXCEPT=" 76 160 $ALWAYS_EXCEPT"
+
+
# Tests that fail on uml
CPU=`awk '/model/ {print $4}' /proc/cpuinfo`
# buffer i/o errs sock spc runas
load_module osd-ldiskfs/osd_ldiskfs
fi
load_module mdt/mdt
- load_module cmm/cmm
load_module ost/ost
load_module lod/lod
+ load_module osp/osp
if [ "$USE_OFD" == yes ]; then
load_module ofd/ofd
else