*/
unsigned long mdt_num_threads;
-static int mdt_handle(struct ptlrpc_request *req);
-static struct ptlrpc_thread_key mdt_thread_key;
+static int mdt_handle (struct ptlrpc_request *req);
+static struct mdt_device *mdt_dev (struct lu_device *d);
+static struct lu_fid *mdt_object_fid(struct mdt_object *o);
-static int mdt_mkdir(struct mdt_thread_info *info, struct mdt_device *d,
- struct lu_fid *pfid, const char *name, struct lu_fid *cfid)
+static struct lu_context_key mdt_thread_key;
+
+/* object operations */
+static int mdt_md_mkdir(struct mdt_thread_info *info, struct mdt_device *d,
+ struct lu_fid *pfid, const char *name,
+ struct lu_fid *cfid)
{
struct mdt_object *o;
struct mdt_object *child;
lh = &info->mti_lh[MDT_LH_PARENT];
lh->mlh_mode = LCK_PW;
- o = mdt_object_find_lock(d, pfid, lh, MDS_INODELOCK_UPDATE);
+ o = mdt_object_find_lock(info->mti_ctxt,
+ d, pfid, lh, MDS_INODELOCK_UPDATE);
if (IS_ERR(o))
return PTR_ERR(o);
- child = mdt_object_find(d, cfid);
+ child = mdt_object_find(info->mti_ctxt, d, cfid);
if (!IS_ERR(child)) {
- result = mdt_child_ops(d)->mdo_mkdir(mdt_object_child(o), name,
- mdt_object_child(child));
- mdt_object_put(child);
- } else
- result = PTR_ERR(child);
- mdt_object_unlock(d->mdt_namespace, o, lh);
- mdt_object_put(o);
- return result;
-}
-#if 0
-static int mdt_md_getattr(struct mdt_thread_info *info, struct lu_fid *fid,
- struct md_object_attr *attr)
-{
- struct mdt_device *d = info->mti_mdt;
- struct mdt_object *o;
- struct iattr
- int result;
+ struct md_object *next = mdt_object_child(o);
- o = mdt_object_find(d, fid);
- if (IS_ERR(o))
- return PTR_ERR(o);
-
- result = mdt_child_ops(d)->mdo_attr_get(mdt_object_child(o), name,
- mdt_object_child(child));
- mdt_object_put(child);
+ result = next->mo_ops->moo_mkdir(info->mti_ctxt, next, name,
+ mdt_object_child(child));
+ mdt_object_put(info->mti_ctxt, child);
} else
result = PTR_ERR(child);
mdt_object_unlock(d->mdt_namespace, o, lh);
- mdt_object_put(o);
+ mdt_object_put(info->mti_ctxt, o);
return result;
}
-#endif
+
static int mdt_getstatus(struct mdt_thread_info *info,
struct ptlrpc_request *req, int offset)
{
- struct md_device *mdd = info->mti_mdt->mdt_child;
+ struct md_device *next = info->mti_mdt->mdt_child;
struct mdt_body *body;
int size = sizeof *body;
int result;
result = -ENOMEM;
else {
body = lustre_msg_buf(req->rq_repmsg, 0, sizeof *body);
- result = mdd->md_ops->mdo_root_get(mdd, &body->fid1);
+ result = next->md_ops->mdo_root_get(info->mti_ctxt,
+ next, &body->fid1);
}
/* the last_committed and last_xid fields are filled in for all
static int mdt_statfs(struct mdt_thread_info *info,
struct ptlrpc_request *req, int offset)
{
- struct md_device *child = info->mti_mdt->mdt_child;
+ struct md_device *next = info->mti_mdt->mdt_child;
struct obd_statfs *osfs;
struct kstatfs sfs;
int result;
int size = sizeof(struct obd_statfs);
-
+
ENTRY;
result = lustre_pack_reply(req, 1, &size, NULL);
} else {
osfs = lustre_msg_buf(req->rq_repmsg, 0, size);
/* XXX max_age optimisation is needed here. See mds_statfs */
- result = child->md_ops->mdo_statfs(child, &sfs);
+ result = next->md_ops->mdo_statfs(info->mti_ctxt, next, &sfs);
statfs_pack(osfs, &sfs);
}
-out:
+
RETURN(result);
}
-#if 0
+
+/*
+ * Copy the attributes in @attr into the reply body @b and mark the
+ * corresponding fields as valid.
+ */
+static void mdt_pack_attr2body(struct mdt_body *b, struct lu_attr *attr)
+{
+        /* plain attribute copy, one body field per lu_attr field */
+        b->mode   = attr->la_mode;
+        b->uid    = attr->la_uid;
+        b->gid    = attr->la_gid;
+        b->flags  = attr->la_flags;
+        b->nlink  = attr->la_nlink;
+        b->size   = attr->la_size;
+        b->blocks = attr->la_blocks;
+        b->atime  = attr->la_atime;
+        b->mtime  = attr->la_mtime;
+        b->ctime  = attr->la_ctime;
+
+        /* flags advertised for every object type */
+        b->valid |= OBD_MD_FLID | OBD_MD_FLCTIME | OBD_MD_FLUID |
+                    OBD_MD_FLGID | OBD_MD_FLFLAGS | OBD_MD_FLTYPE |
+                    OBD_MD_FLMODE | OBD_MD_FLNLINK | OBD_MD_FLGENER;
+
+        /* size, blocks, atime and mtime are not advertised for regular files */
+        if (S_ISREG(attr->la_mode))
+                return;
+
+        b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLATIME |
+                    OBD_MD_FLMTIME;
+}
+
static int mdt_getattr(struct mdt_thread_info *info,
struct ptlrpc_request *req, int offset)
{
- struct mdt_body *body;
- int size = sizeof (*body);
- struct md_obj_attr attr;
- int result;
-
+ struct mdt_body *body;
+ int size = sizeof (*body);
+ int result;
+ /* GETATTR handler: replies with the attributes of info->mti_object. */
+ LASSERT(info->mti_object != NULL);
+
ENTRY;
-
+ /* Pack a reply with a single buffer of sizeof(*body) bytes. */
result = lustre_pack_reply(req, 1, &size, NULL);
if (result)
- CERROR(LUSTRE_MDT0_NAME" out of memory for statfs: size=%d\n",
- size);
+ CERROR(LUSTRE_MDT0_NAME" cannot pack size=%d, rc=%d\n",
+ size, result);
else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n"); /* NOTE(review): message mentions statfs inside the getattr handler - confirm */
result = -ENOMEM;
} else {
- body = lustre_swab_reqbuf(req, offset, size,
- lustre_swab_mdt_body);
- result = mdt_md_getattr(info, body->fid1);
+ struct md_object *next = mdt_object_child(info->mti_object);
+ /* Ask the next layer to fill info->mti_ctxt->lc_attr. */
+ result = next->mo_ops->moo_attr_get(info->mti_ctxt, next,
+ &info->mti_ctxt->lc_attr);
+ if (result == 0) { /* on success copy attributes and fid into reply buffer 0 */
+ body = lustre_msg_buf(req->rq_repmsg, 0, size);
+ mdt_pack_attr2body(body, &info->mti_ctxt->lc_attr);
+ body->fid1 = *mdt_object_fid(info->mti_object);
+ }
}
-out:
RETURN(result);
}
-#else
-static int mdt_getattr(struct mdt_thread_info *info,
- struct ptlrpc_request *req, int offset)
+
+static struct lu_device_operations mdt_lu_ops;
+
+static int lu_device_is_mdt(struct lu_device *d)
{
- return -EOPNOTSUPP;
+ /*
+ * XXX for now. Tags in lu_device_type->ldt_something are needed.
+ *
+ * The device is recognised by comparing its operations vector with
+ * mdt_lu_ops; the comparison is only made when d->ld_ops is set.
+ */
+ return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
}
-#endif
+
+/*
+ * Map a generic lu_device to the mdt_device that embeds it as
+ * mdt_md_dev.md_lu_dev.  Asserts that @d really is an MDT device.
+ */
+static struct mdt_device *mdt_dev(struct lu_device *d)
+{
+        struct mdt_device *mdt;
+
+        LASSERT(lu_device_is_mdt(d));
+        mdt = container_of(d, struct mdt_device, mdt_md_dev.md_lu_dev);
+        return mdt;
+}
+
+/*
+ * CONNECT handler.  Runs the generic target connect code and, when it
+ * succeeds, binds the thread info to the MDT device of the new export
+ * and allocates a sequence into the connect data of the reply.
+ *
+ * Returns 0 on success, the error of target_handle_connect() or
+ * seq_mgr_alloc(), or -EFAULT when the reply carries no connect data.
+ */
static int mdt_connect(struct mdt_thread_info *info,
struct ptlrpc_request *req, int offset)
{
- return target_handle_connect(req, mdt_handle);
+ int result;
+
+ result = target_handle_connect(req, mdt_handle);
+ if (result == 0) {
+ struct obd_connect_data *data;
+
+ LASSERT(req->rq_export != NULL);
+ info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
+
+ data = lustre_msg_buf(req->rq_repmsg, 0, sizeof *data);
+        /* do not let seq_mgr_alloc() write through a NULL buffer */
+        if (data == NULL) {
+                CERROR("Can't find connect data in reply\n");
+                return -EFAULT;
+        }
+ result = seq_mgr_alloc(info->mti_ctxt,
+ info->mti_mdt->mdt_seq_mgr,
+ &data->ocd_seq);
+ }
+ return result;
}
static int mdt_disconnect(struct mdt_thread_info *info,
struct ptlrpc_request *req, int offset)
{
- return -EOPNOTSUPP;
+ /* DISCONNECT handler: delegates to the generic target disconnect code. */
+ return target_handle_disconnect(req);
}
static int mdt_getattr_name(struct mdt_thread_info *info,
}
static int mdt_readpage(struct mdt_thread_info *info,
- struct ptlrpc_request *req, int offset)
+ struct ptlrpc_request *req, int offset)
{
return -EOPNOTSUPP; /* stub: unconditionally reports "operation not supported" */
}
return -EOPNOTSUPP;
}
-static int mdt_set_info(struct mdt_thread_info *info,
- struct ptlrpc_request *req, int offset)
-{
- return -EOPNOTSUPP;
-}
-
static int mdt_handle_quotacheck(struct mdt_thread_info *info,
struct ptlrpc_request *req, int offset)
{
* info->mti_dlm_req already contains swapped and (if necessary)
* converted dlm request.
*/
- LASSERT(info->mti_dlm_req);
+ LASSERT(info->mti_dlm_req != NULL);
info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
return ldlm_handle_enqueue0(req, info->mti_dlm_req, &cbs);
EXIT;
}
-static struct lu_device_operations mdt_lu_ops;
-
-static int lu_device_is_mdt(struct lu_device *d)
-{
- /*
- * XXX for now. Tags in lu_device_type->ldt_something are needed.
- */
- return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
-}
-
/*
 * Convert a generic lu_object into the mdt_object that embeds it as
 * mot_obj.mo_lu.  Asserts that the object belongs to an MDT device.
 */
static struct mdt_object *mdt_obj(struct lu_object *o)
{
LASSERT(lu_device_is_mdt(o->lo_dev));
return container_of(o, struct mdt_object, mot_obj.mo_lu);
}
-static struct mdt_object *mdt_object_find(struct mdt_device *d,
- struct lu_fid *f)
+struct mdt_object *mdt_object_find(struct lu_context *ctxt,
+ struct mdt_device *d,
+ struct lu_fid *f)
{
struct lu_object *o; /* result of the lookup in the site of device d */
- o = lu_object_find(d->mdt_md_dev.md_lu_dev.ld_site, f);
+ o = lu_object_find(ctxt, d->mdt_md_dev.md_lu_dev.ld_site, f);
if (IS_ERR(o))
return (struct mdt_object *)o; /* hand the encoded error back unchanged */
else
return mdt_obj(o); /* success: return the enclosing mdt_object */
}
-static void mdt_object_put(struct mdt_object *o)
+void mdt_object_put(struct lu_context *ctxt, struct mdt_object *o)
{
- lu_object_put(&o->mot_obj.mo_lu);
+ lu_object_put(ctxt, &o->mot_obj.mo_lu); /* forwards to lu_object_put() on the embedded lu_object */
}
static struct lu_fid *mdt_object_fid(struct mdt_object *o)
return lu_object_fid(&o->mot_obj.mo_lu);
}
-static int mdt_object_lock(struct ldlm_namespace *ns, struct mdt_object *o,
- struct mdt_lock_handle *lh, __u64 ibits)
+int mdt_object_lock(struct ldlm_namespace *ns, struct mdt_object *o,
+ struct mdt_lock_handle *lh, __u64 ibits)
{
ldlm_policy_data_t p = {
.l_inodebits = {
return fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode, &p);
}
-static void mdt_object_unlock(struct ldlm_namespace *ns, struct mdt_object *o,
+void mdt_object_unlock(struct ldlm_namespace *ns, struct mdt_object *o,
struct mdt_lock_handle *lh)
{
if (lustre_handle_is_used(&lh->mlh_lh)) {
}
}
-static struct mdt_object *mdt_object_find_lock(struct mdt_device *d,
- struct lu_fid *f,
- struct mdt_lock_handle *lh,
- __u64 ibits)
+struct mdt_object *mdt_object_find_lock(struct lu_context *ctxt,
+ struct mdt_device *d,
+ struct lu_fid *f,
+ struct mdt_lock_handle *lh,
+ __u64 ibits)
{
struct mdt_object *o;
- o = mdt_object_find(d, f);
+ o = mdt_object_find(ctxt, d, f);
if (!IS_ERR(o)) {
int result;
result = mdt_object_lock(d->mdt_namespace, o, lh, ibits);
if (result != 0) {
- mdt_object_put(o);
+ mdt_object_put(ctxt, o);
o = ERR_PTR(result);
}
}
{
int result;
int off;
- int lock_conv;
ENTRY;
OBD_FAIL_RETURN(h->mh_fail_id, 0);
off = MDS_REQ_REC_OFF + shift;
- lock_conv =
- h->mh_flags & HABEO_CLAVIS &&
- info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME;
result = 0;
if (h->mh_flags & HABEO_CORPUS) {
lustre_swab_reqbuf(req, off, sizeof *info->mti_body,
lustre_swab_mdt_body);
if (body != NULL) {
- info->mti_object = mdt_object_find(info->mti_mdt,
+ info->mti_object = mdt_object_find(info->mti_ctxt,
+ info->mti_mdt,
&body->fid1);
- if (IS_ERR(info->mti_object))
+ if (IS_ERR(info->mti_object)) {
result = PTR_ERR(info->mti_object);
+ info->mti_object = NULL;
+ }
} else {
CERROR("Can't unpack body\n");
result = -EFAULT;
}
- } else if (lock_conv) {
+ } else if (h->mh_flags & HABEO_CLAVIS) {
struct ldlm_request *dlm;
LASSERT(shift == 0);
lustre_swab_reqbuf(req, MDS_REQ_INTENT_LOCKREQ_OFF,
sizeof *dlm,
lustre_swab_ldlm_request);
- if (dlm != NULL)
- result = mdt_lock_resname_compat(info->mti_mdt, dlm);
- else {
+ if (dlm != NULL) {
+ if (info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME)
+ result = mdt_lock_resname_compat(info->mti_mdt,
+ dlm);
+ } else {
CERROR("Can't unpack dlm request\n");
result = -EFAULT;
}
LASSERT(current->journal_info == NULL);
- if (lock_conv) {
+ if (h->mh_flags & HABEO_CLAVIS &&
+ info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME) {
struct ldlm_reply *rep;
rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof *rep);
{
int i;
- memset(info, 0, sizeof *info);
info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
/*
* Poison size array.
info->mti_rep_buf_nr = i;
for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
mdt_lock_handle_init(&info->mti_lh[i]);
+ lu_context_enter(info->mti_ctxt);
}
static void mdt_thread_info_fini(struct mdt_thread_info *info)
{
int i;
+ lu_context_exit(info->mti_ctxt);
if (info->mti_object != NULL) {
- mdt_object_put(info->mti_object);
+ mdt_object_put(info->mti_ctxt, info->mti_object);
info->mti_object = NULL;
}
for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
RETURN(result);
}
-static struct mdt_device *mdt_dev(struct lu_device *d)
-{
- LASSERT(lu_device_is_mdt(d));
- return container_of(d, struct mdt_device, mdt_md_dev.md_lu_dev);
-}
-
static int mdt_handle(struct ptlrpc_request *req)
{
int result;
-
- struct mdt_thread_info *info = ptlrpc_thread_key_get(req->rq_svc_thread,
- &mdt_thread_key);
+ struct lu_context *ctx;
+ struct mdt_thread_info *info;
ENTRY;
+ ctx = req->rq_svc_thread->t_ctx;
+ LASSERT(ctx != NULL);
+ LASSERT(ctx->lc_thread == req->rq_svc_thread);
+
+ info = lu_context_key_get(ctx, &mdt_thread_key);
+ LASSERT(info != NULL);
+
mdt_thread_info_init(info);
/* it can be NULL while CONNECT */
if (req->rq_export)
prntfn, c->psc_num_threads);
}
+/*
+ * Forward a configuration request (@name, @buf, @size, @mode) to the
+ * mdo_config() method of the device below @m and return its result.
+ */
+static int mdt_config(struct lu_context *ctx, struct mdt_device *m,
+                      const char *name, void *buf, int size, int mode)
+{
+        struct md_device *next = m->mdt_child;
+        int rc;
+        ENTRY;
+
+        rc = next->md_ops->mdo_config(ctx, next, name, buf, size, mode);
+        RETURN(rc);
+}
+
+/*
+ * Common helper of the sequence manager callbacks: passes @seq to
+ * mdt_config() under the LUSTRE_CONFIG_METASEQ key with the given @mode.
+ * @opaque is the mdt_device the manager was created for.
+ */
+static int mdt_seq_mgr_hpr(struct lu_context *ctx, void *opaque, __u64 *seq,
+                           int mode)
+{
+        struct mdt_device *mdt = opaque;
+        ENTRY;
+
+        RETURN(mdt_config(ctx, mdt, LUSTRE_CONFIG_METASEQ,
+                          seq, sizeof(*seq), mode));
+}
+
+/* smo_read callback: mdt_seq_mgr_hpr() in LUSTRE_CONFIG_GET mode. */
+static int mdt_seq_mgr_read(struct lu_context *ctx, void *opaque, __u64 *seq)
+{
+        int rc;
+        ENTRY;
+
+        rc = mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_GET);
+        RETURN(rc);
+}
+
+/* smo_write callback: mdt_seq_mgr_hpr() in LUSTRE_CONFIG_SET mode. */
+static int mdt_seq_mgr_write(struct lu_context *ctx, void *opaque, __u64 *seq)
+{
+        int rc;
+        ENTRY;
+
+        rc = mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_SET);
+        RETURN(rc);
+}
+
+struct lu_seq_mgr_ops seq_mgr_ops = { /* callbacks handed to seq_mgr_init() by mdt_init0() */
+ .smo_read = mdt_seq_mgr_read, /* mdt_config() in LUSTRE_CONFIG_GET mode */
+ .smo_write = mdt_seq_mgr_write /* mdt_config() in LUSTRE_CONFIG_SET mode */
+};
+
+/* device init/fini methods */
+
+/*
+ * FLD request handler.  Request buffer 0 holds a 32-bit operation code
+ * and buffer 1 a struct md_fld; both are unpacked (and swabbed when
+ * needed), passed to fld_handle() on the FLD of this site, and the
+ * resulting md_fld is copied into reply buffer 0.
+ *
+ * Returns 0 on success, -EFAULT when a request buffer cannot be
+ * unpacked, or the error of lustre_pack_reply() / fld_handle().
+ */
+static int mdt_fld(struct mdt_thread_info *info,
+                   struct ptlrpc_request *req, int offset)
+{
+        struct lu_site *ls = info->mti_mdt->mdt_md_dev.md_lu_dev.ld_site;
+        struct md_fld mf, *p, *reply;
+        int size = sizeof(*reply);
+        __u32 *opt;
+        int rc;
+        ENTRY;
+
+        rc = lustre_pack_reply(req, 1, &size, NULL);
+        if (rc)
+                RETURN(rc);
+
+        /* the request comes from the wire: never trust that it unpacks */
+        opt = lustre_swab_reqbuf(req, 0, sizeof(*opt), lustre_swab_generic_32s);
+        if (opt == NULL) {
+                CERROR("Can't unpack fld operation\n");
+                RETURN(-EFAULT);
+        }
+
+        p = lustre_swab_reqbuf(req, 1, sizeof(mf), lustre_swab_md_fld);
+        if (p == NULL) {
+                CERROR("Can't unpack fld record\n");
+                RETURN(-EFAULT);
+        }
+        /* work on a private copy, the request buffer is left untouched */
+        mf = *p;
+
+        rc = fld_handle(ls->ls_fld, *opt, &mf);
+        if (rc)
+                RETURN(rc);
+
+        reply = lustre_msg_buf(req->rq_repmsg, 0, size);
+        *reply = mf;
+        RETURN(rc);
+}
+
+struct dt_device *md2_bottom_dev(struct mdt_device *m)
+{
+ /* FIXME: get dt device here. Placeholder: @m is not used and NULL is always returned. */
+ RETURN (NULL);
+}
+
+/*
+ * Allocate the FLD of the site @m belongs to and initialize it with
+ * fld_server_init() on top of the device returned by md2_bottom_dev().
+ *
+ * Returns 0 on success, -ENOMEM when the allocation fails, or the error
+ * of fld_server_init().  On failure ls->ls_fld is released and reset to
+ * NULL, so the caller has nothing to undo.
+ */
+static int mdt_fld_init(struct mdt_device *m)
+{
+        struct dt_device *dt;
+        struct lu_site *ls;
+        int rc;
+        ENTRY;
+
+        dt = md2_bottom_dev(m);
+
+        ls = m->mdt_md_dev.md_lu_dev.ld_site;
+
+        OBD_ALLOC_PTR(ls->ls_fld);
+
+        if (!ls->ls_fld)
+                RETURN(-ENOMEM);
+
+        rc = fld_server_init(ls->ls_fld, dt);
+        if (rc) {
+                /* the caller does not call mdt_fld_fini() on this path */
+                OBD_FREE_PTR(ls->ls_fld);
+                ls->ls_fld = NULL;
+        }
+
+        RETURN(rc);
+}
+
+/*
+ * Shut down and free the FLD of the site @m belongs to, if there is one.
+ * The pointer is reset afterwards, so calling this twice is harmless.
+ * Always returns 0.
+ */
+static int mdt_fld_fini(struct mdt_device *m)
+{
+        struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
+        int rc = 0;
+        ENTRY;
+
+        if (ls && ls->ls_fld) {
+                fld_server_fini(ls->ls_fld);
+                OBD_FREE_PTR(ls->ls_fld);
+                /* do not leave a dangling pointer in the site */
+                ls->ls_fld = NULL;
+        }
+        RETURN(rc);
+}
+
+/*
+ * Unregister the MDT request service and then the FLD service of @m.
+ * Each one is skipped when it is not set and cleared once unregistered.
+ */
+static void mdt_stop_ptlrpc_service(struct mdt_device *m)
+{
+        struct ptlrpc_service *svc;
+
+        svc = m->mdt_service;
+        if (svc != NULL) {
+                ptlrpc_unregister_service(svc);
+                m->mdt_service = NULL;
+        }
+
+        svc = m->mdt_fld_service;
+        if (svc != NULL) {
+                ptlrpc_unregister_service(svc);
+                m->mdt_fld_service = NULL;
+        }
+}
+
+/*
+ * Fill in the service configuration of @m, initialize the LDLM client
+ * and start two ptlrpc services handled by mdt_handle(): the MDT request
+ * service and the FLD service (same configuration, MDS_FLD_PORTAL as the
+ * request portal).
+ *
+ * Returns 0 on success, -ENOMEM when a service cannot be created, or the
+ * error of ptlrpc_start_threads().  On any failure every service that
+ * was already registered is unregistered again.
+ */
+static int mdt_start_ptlrpc_service(struct mdt_device *m)
+{
+        int rc;
+        ENTRY;
+
+        m->mdt_service_conf.psc_nbufs            = MDS_NBUFS;
+        m->mdt_service_conf.psc_bufsize          = MDS_BUFSIZE;
+        m->mdt_service_conf.psc_max_req_size     = MDS_MAXREQSIZE;
+        m->mdt_service_conf.psc_max_reply_size   = MDS_MAXREPSIZE;
+        m->mdt_service_conf.psc_req_portal       = MDS_REQUEST_PORTAL;
+        m->mdt_service_conf.psc_rep_portal       = MDC_REPLY_PORTAL;
+        m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT;
+        /*
+         * We'd like to have a mechanism to set this on a per-device basis,
+         * but alas...
+         */
+        m->mdt_service_conf.psc_num_threads = min(max(mdt_num_threads,
+                                                      MDT_MIN_THREADS),
+                                                  MDT_MAX_THREADS);
+
+        ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
+                           "mdt_ldlm_client", &m->mdt_ldlm_client);
+
+        m->mdt_service =
+                ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
+                                     LUSTRE_MDT0_NAME,
+                                     m->mdt_md_dev.md_lu_dev.ld_proc_entry,
+                                     NULL);
+        if (m->mdt_service == NULL)
+                RETURN(-ENOMEM);
+
+        rc = ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
+        if (rc)
+                GOTO(err_mdt_svc, rc);
+
+        /* start mdt fld service */
+
+        m->mdt_service_conf.psc_req_portal = MDS_FLD_PORTAL;
+
+        m->mdt_fld_service =
+                ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
+                                     LUSTRE_FLD0_NAME,
+                                     m->mdt_md_dev.md_lu_dev.ld_proc_entry,
+                                     NULL);
+        /* the MDT service is already running here: it has to be undone */
+        if (m->mdt_fld_service == NULL)
+                GOTO(err_mdt_svc, rc = -ENOMEM);
+
+        rc = ptlrpc_start_threads(NULL, m->mdt_fld_service, LUSTRE_FLD0_NAME);
+        if (rc)
+                GOTO(err_fld_svc, rc);
+
+        RETURN(rc);
+err_fld_svc:
+        ptlrpc_unregister_service(m->mdt_fld_service);
+        m->mdt_fld_service = NULL;
+err_mdt_svc:
+        ptlrpc_unregister_service(m->mdt_service);
+        m->mdt_service = NULL;
+
+        RETURN(rc);
+}
+
+/*
+ * Tear down a chain of layered devices, starting with @d.  For every
+ * device: drop the reference taken at setup, call the type's
+ * ldto_device_fini() (which returns the next device of the chain) and
+ * ldto_device_free(), then release the obd type.  Stops at a NULL device.
+ */
+static void mdt_stack_fini_from(struct lu_device *d)
+{
+        /* goes through all stack */
+        while (d != NULL) {
+                struct lu_device_type *ldt = d->ld_type;
+                struct obd_type *type;
+                struct lu_device *n;
+
+                lu_device_put(d);
+
+                /* each fini() returns next device in stack of layers
+                 * so we can avoid the recursion */
+                n = ldt->ldt_ops->ldto_device_fini(d);
+                ldt->ldt_ops->ldto_device_free(d);
+
+                type = ldt->obd_type;
+                type->typ_refcnt--;
+                class_put_type(type);
+                /* switch to the next device in the layer */
+                d = n;
+        }
+}
+
+/*
+ * Tear down the device stack below @m.  Does nothing when there is no
+ * child; the child pointer is cleared afterwards so that a second call
+ * does not touch the freed devices.
+ */
+static void mdt_stack_fini(struct mdt_device *m)
+{
+        if (m->mdt_child == NULL)
+                return;
+
+        mdt_stack_fini_from(md2lu_dev(m->mdt_child));
+        m->mdt_child = NULL;
+}
+
+/*
+ * Set up one layer of the stack: look up obd type @typename, allocate a
+ * device of its lu_device_type with @cfg, make it share the site of
+ * @child, initialize it on top of @child and take a reference on it.
+ *
+ * Returns the new device, or ERR_PTR(): -ENODEV for an unknown type,
+ * -EINVAL for a type without lu_device_type, otherwise the error of the
+ * type's alloc or init method.
+ */
+static struct lu_device *mdt_layer_setup(const char *typename,
+                                         struct lu_device *child,
+                                         struct lustre_cfg *cfg)
+{
+        struct obd_type *type;
+        struct lu_device_type *ldt;
+        struct lu_device *d;
+        int rc;
+        ENTRY;
+
+        /* find the type */
+        type = class_get_type(typename);
+        if (!type) {
+                CERROR("Unknown type: '%s'\n", typename);
+                GOTO(out, rc = -ENODEV);
+        }
+
+        ldt = type->typ_lu;
+        if (ldt == NULL) {
+                CERROR("type: '%s'\n", typename);
+                GOTO(out_type, rc = -EINVAL);
+        }
+        /* only touch ldt once it is known to be valid */
+        ldt->obd_type = type;
+
+        d = ldt->ldt_ops->ldto_device_alloc(ldt, cfg);
+        if (IS_ERR(d)) {
+                CERROR("Cannot allocate device: '%s'\n", typename);
+                /* report the real reason instead of a fixed code */
+                GOTO(out_type, rc = PTR_ERR(d));
+        }
+
+        LASSERT(child->ld_site);
+        d->ld_site = child->ld_site;
+
+        type->typ_refcnt++;
+        rc = ldt->ldt_ops->ldto_device_init(d, child);
+        if (rc) {
+                CERROR("can't init device '%s', rc %d\n", typename, rc);
+                GOTO(out_alloc, rc);
+        }
+        lu_device_get(d);
+
+        RETURN(d);
+out_alloc:
+        ldt->ldt_ops->ldto_device_free(d);
+        type->typ_refcnt--;
+out_type:
+        class_put_type(type);
+out:
+        RETURN(ERR_PTR(rc));
+}
+
+/*
+ * Build the device stack below @m: an OSD layer on top of the MDT's own
+ * lu_device, then MDD, then CMM, which becomes m->mdt_child.  Finally
+ * @cfg is run through the MDT's ldo_process_config() method.
+ *
+ * Returns 0 on success or a negative errno.  On failure every layer set
+ * up so far is torn down again and m->mdt_child is NULL.
+ */
+static int mdt_stack_init(struct mdt_device *m, struct lustre_cfg *cfg)
+{
+        struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
+        struct lu_device *tmp;
+        int rc;
+        ENTRY;
+
+        /* init the stack */
+        tmp = mdt_layer_setup(LUSTRE_OSD0_NAME, d, cfg);
+        if (IS_ERR(tmp))
+                RETURN(PTR_ERR(tmp));
+        d = tmp;
+
+        tmp = mdt_layer_setup(LUSTRE_MDD0_NAME, d, cfg);
+        if (IS_ERR(tmp))
+                GOTO(out, rc = PTR_ERR(tmp));
+        d = tmp;
+
+        tmp = mdt_layer_setup(LUSTRE_CMM0_NAME, d, cfg);
+        if (IS_ERR(tmp))
+                GOTO(out, rc = PTR_ERR(tmp));
+        d = tmp;
+        m->mdt_child = lu2md_dev(d);
+
+        /* process setup config */
+        tmp = &m->mdt_md_dev.md_lu_dev;
+        rc = tmp->ld_ops->ldo_process_config(tmp, cfg);
+
+out:
+        /* fini from last known good lu_device */
+        if (rc) {
+                mdt_stack_fini_from(d);
+                m->mdt_child = NULL;
+        }
+
+        RETURN(rc);
+}
+
+/*
+ * Release everything mdt_init0() set up for @m: the ptlrpc services,
+ * the FLD, the device stack, the site, the LDLM namespace and the
+ * sequence manager.  Pointers are cleared as their targets go away.
+ */
static void mdt_fini(struct mdt_device *m)
{
struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
+ ENTRY;
+
+ mdt_stop_ptlrpc_service(m);
+
+        /* the FLD hangs off the site, so release it before the site is freed */
+        mdt_fld_fini(m);
+
+ /* finish the stack */
+ mdt_stack_fini(m);
+
if (d->ld_site != NULL) {
lu_site_fini(d->ld_site);
+ OBD_FREE_PTR(d->ld_site);
d->ld_site = NULL;
}
- if (m->mdt_service != NULL) {
- ptlrpc_unregister_service(m->mdt_service);
- m->mdt_service = NULL;
- }
if (m->mdt_namespace != NULL) {
ldlm_namespace_free(m->mdt_namespace, 0);
m->mdt_namespace = NULL;
}
- /* finish the stack */
- if (m->mdt_child) {
- struct lu_device *child = md2lu_dev(m->mdt_child);
-
- child->ld_ops->ldo_device_fini(child);
+
+ if (m->mdt_seq_mgr) {
+ seq_mgr_fini(m->mdt_seq_mgr);
+ m->mdt_seq_mgr = NULL;
}
LASSERT(atomic_read(&d->ld_ref) == 0);
md_device_fini(&m->mdt_md_dev);
+ EXIT;
}
+/*
+ * Initialize MDT device @m of type @t from @cfg: allocate and set up the
+ * site, build the device stack, create and set up the sequence manager,
+ * create the LDLM namespace, initialize FLD and start the ptlrpc
+ * services.
+ *
+ * Returns 0 on success or a negative errno.  On failure the steps that
+ * already succeeded are undone in reverse order and the pointers to the
+ * released objects are cleared.
+ */
static int mdt_init0(struct mdt_device *m,
struct lu_device_type *t, struct lustre_cfg *cfg)
{
+ int rc;
struct lu_site *s;
char ns_name[48];
- struct obd_device * obd = NULL;
- char *top = lustre_cfg_string(cfg, 0);
- char *child = lustre_cfg_string(cfg, 1);
+ struct lu_context ctx;
ENTRY;
OBD_ALLOC_PTR(s);
if (s == NULL)
- return -ENOMEM;
+ RETURN(-ENOMEM);
md_device_init(&m->mdt_md_dev, t);
-
m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
- /* get next layer */
- obd = class_name2obd(child);
- if (obd && obd->obd_lu_dev) {
- CDEBUG(D_INFO, "Child device is %s\n", child);
- m->mdt_child = lu2md_dev(obd->obd_lu_dev);
- } else {
- CDEBUG(D_INFO, "Child device %s is not found\n", child);
- return -EINVAL;
+ rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
+ if (rc) {
+ CERROR("can't init lu_site, rc %d\n", rc);
+ GOTO(err_fini_site, rc);
}
- m->mdt_service_conf.psc_nbufs = MDS_NBUFS;
- m->mdt_service_conf.psc_bufsize = MDS_BUFSIZE;
- m->mdt_service_conf.psc_max_req_size = MDS_MAXREQSIZE;
- m->mdt_service_conf.psc_max_reply_size = MDS_MAXREPSIZE;
- m->mdt_service_conf.psc_req_portal = MDS_REQUEST_PORTAL;
- m->mdt_service_conf.psc_rep_portal = MDC_REPLY_PORTAL;
- m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT;
- /*
- * We'd like to have a mechanism to set this on a per-device basis,
- * but alas...
- */
- m->mdt_service_conf.psc_num_threads = min(max(mdt_num_threads,
- MDT_MIN_THREADS),
- MDT_MAX_THREADS);
- lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
+ /* init the stack */
+ rc = mdt_stack_init(m, cfg);
+ if (rc) {
+ CERROR("can't init device stack, rc %d\n", rc);
+ GOTO(err_fini_site, rc);
+ }
+
+ m->mdt_seq_mgr = seq_mgr_init(&seq_mgr_ops, m);
+ if (!m->mdt_seq_mgr) {
+ CERROR("can't initialize sequence manager\n");
+        /* rc is still 0 from mdt_stack_init(): report a real error */
+        GOTO(err_fini_stack, rc = -ENOMEM);
+ }
+
+ rc = lu_context_init(&ctx);
+ if (rc != 0)
+ GOTO(err_fini_mgr, rc);
+
+ lu_context_enter(&ctx);
+ /* init sequence info after device stack is initialized. */
+ rc = seq_mgr_setup(&ctx, m->mdt_seq_mgr);
+ lu_context_exit(&ctx);
+        /* the context is only needed for the setup call: finalize it
+         * exactly once, whatever the outcome */
+        lu_context_fini(&ctx);
+ if (rc)
+        GOTO(err_fini_mgr, rc);
+
snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
if (m->mdt_namespace == NULL)
- return -ENOMEM;
+        GOTO(err_fini_mgr, rc = -ENOMEM);
+
ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
- ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
- "mdt_ldlm_client", &m->mdt_ldlm_client);
+ rc = mdt_fld_init(m);
+ if (rc)
+ GOTO(err_free_ns, rc);
- m->mdt_service =
- ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
- LUSTRE_MDT0_NAME,
- m->mdt_md_dev.md_lu_dev.ld_proc_entry,
- NULL);
- if (m->mdt_service == NULL)
- return -ENOMEM;
-
- /* init the stack */
- if (m->mdt_child) {
- struct lu_device *child = md2lu_dev(m->mdt_child);
- int err;
-
- if (child->ld_ops->ldo_device_init) {
- err = child->ld_ops->ldo_device_init(child, top);
- if (err)
- return err;
- }
+ rc = mdt_start_ptlrpc_service(m);
+ if (rc)
+ GOTO(err_free_fld, rc);
+ RETURN(0);
+
+err_free_fld:
+ mdt_fld_fini(m);
+err_free_ns:
+ ldlm_namespace_free(m->mdt_namespace, 0);
+ m->mdt_namespace = NULL;
+err_fini_mgr:
+ seq_mgr_fini(m->mdt_seq_mgr);
+ m->mdt_seq_mgr = NULL;
+err_fini_stack:
+ mdt_stack_fini(m);
+err_fini_site:
+ lu_site_fini(s);
+ OBD_FREE_PTR(s);
+        /* the site is gone: do not leave the device pointing at it */
+        m->mdt_md_dev.md_lu_dev.ld_site = NULL;
+ RETURN(rc);
+}
+/*
+ * used by MGS to process specific configurations
+ *
+ * No command is handled by the MDT itself so far: every @cfg is passed
+ * to the ldo_process_config() method of the device below @d, and its
+ * result is returned.
+ */
+static int mdt_process_config(struct lu_device *d, struct lustre_cfg *cfg)
+{
+ struct lu_device *next = md2lu_dev(mdt_dev(d)->mdt_child);
+ int err;
+ ENTRY;
+ switch(cfg->lcfg_command) {
+ /* all MDT specific commands should be here */
+ default:
+ /* others are passed further */
+ err = next->ld_ops->ldo_process_config(next, cfg);
+        break;
}
- return ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
+ RETURN(err);
}
-static struct lu_object *mdt_object_alloc(struct lu_device *d)
+static struct lu_object *mdt_object_alloc(struct lu_context *ctxt,
+ struct lu_device *d)
{
struct mdt_object *mo;
h = &mo->mot_header;
lu_object_header_init(h);
lu_object_init(o, h, d);
+ lu_object_add_top(h, o);
return o;
} else
return NULL;
}
-static int mdt_object_init(struct lu_object *o)
+static int mdt_object_init(struct lu_context *ctxt, struct lu_object *o)
{
struct mdt_device *d = mdt_dev(o->lo_dev);
struct lu_device *under;
struct lu_object *below;
under = &d->mdt_child->md_lu_dev;
- below = under->ld_ops->ldo_object_alloc(under);
+ below = under->ld_ops->ldo_object_alloc(ctxt, under);
if (below != NULL) {
lu_object_add(o, below);
return 0;
return -ENOMEM;
}
-static void mdt_object_free(struct lu_object *o)
+static void mdt_object_free(struct lu_context *ctxt, struct lu_object *o)
{
+ struct mdt_object *mo = mdt_obj(o); /* enclosing mdt_object, taken before @o is finalized */
struct lu_object_header *h;
h = o->lo_header; /* header pointer is read before lu_object_fini(o) */
lu_object_fini(o);
lu_object_header_fini(h);
+ OBD_FREE_PTR(mo); /* finally release the mdt_object itself */
}
-static void mdt_object_release(struct lu_object *o)
+static void mdt_object_release(struct lu_context *ctxt, struct lu_object *o)
{ /* ldo_object_release method: the MDT layer does no work here */
}
-static int mdt_object_print(struct seq_file *f, const struct lu_object *o)
+static int mdt_object_print(struct lu_context *ctxt,
+ struct seq_file *f, const struct lu_object *o)
{
return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o); /* prints the layer name and the object address; returns seq_printf()'s result */
}
.ldo_object_init = mdt_object_init,
.ldo_object_free = mdt_object_free,
.ldo_object_release = mdt_object_release,
- .ldo_object_print = mdt_object_print
+ .ldo_object_print = mdt_object_print,
+ .ldo_process_config = mdt_process_config
};
/* mds_connect copy */
struct obd_connect_data *data)
{
struct obd_export *exp;
- int rc, abort_recovery;
+ int rc;
+ struct mdt_device *mdt;
struct mds_export_data *med;
struct mds_client_data *mcd = NULL;
-
ENTRY;
if (!conn || !obd || !cluuid)
RETURN(-EINVAL);
- /* Check for aborted recovery. */
- spin_lock_bh(&obd->obd_processing_task_lock);
- abort_recovery = obd->obd_abort_recovery;
- spin_unlock_bh(&obd->obd_processing_task_lock);
- if (abort_recovery)
- target_abort_recovery(obd);
+ mdt = mdt_dev(obd->obd_lu_dev);
- /* XXX There is a small race between checking the list and adding a
- * new connection for the same UUID, but the real threat (list
- * corruption when multiple different clients connect) is solved.
- *
- * There is a second race between adding the export to the list,
- * and filling in the client data below. Hence skipping the case
- * of NULL mcd above. We should already be controlling multiple
- * connects at the client, and we can't hold the spinlock over
- * memory allocations without risk of deadlocking.
- */
rc = class_connect(conn, obd, cluuid);
if (rc)
RETURN(rc);
+
exp = class_conn2export(conn);
LASSERT(exp);
med = &exp->exp_mds_data;
-
- OBD_ALLOC(mcd, sizeof(*mcd));
+
+ OBD_ALLOC_PTR(mcd);
if (!mcd)
GOTO(out, rc = -ENOMEM);
memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
med->med_mcd = mcd;
-
+
out:
if (rc) {
- if (mcd) {
- OBD_FREE(mcd, sizeof(*mcd));
- med->med_mcd = NULL;
- }
class_disconnect(exp);
} else {
class_export_put(exp);
RETURN(rc);
}
+/*
+ * o_disconnect method: disconnect @exp, schedule all of its outstanding
+ * difficult replies and free the per-client data attached to the export.
+ * A reference on @exp is held for the duration of the call.
+ *
+ * Returns the result of class_disconnect().
+ */
+static int mdt_obd_disconnect(struct obd_export *exp)
+{
+        struct mds_export_data *med;
+        unsigned long irqflags;
+        int rc;
+        ENTRY;
+
+        /* check the export before anything is derived from it */
+        LASSERT(exp);
+        med = &exp->exp_mds_data;
+        class_export_get(exp);
+
+        /* Disconnect early so that clients can't keep using export */
+        rc = class_disconnect(exp);
+        /* NOTE(review): ldlm_cancel_locks_for_export(exp) is disabled here */
+
+        /* complete all outstanding replies */
+        spin_lock_irqsave(&exp->exp_lock, irqflags);
+        while (!list_empty(&exp->exp_outstanding_replies)) {
+                struct ptlrpc_reply_state *rs =
+                        list_entry(exp->exp_outstanding_replies.next,
+                                   struct ptlrpc_reply_state, rs_exp_list);
+                struct ptlrpc_service *svc = rs->rs_service;
+
+                spin_lock(&svc->srv_lock);
+                list_del_init(&rs->rs_exp_list);
+                ptlrpc_schedule_difficult_reply(rs);
+                spin_unlock(&svc->srv_lock);
+        }
+        spin_unlock_irqrestore(&exp->exp_lock, irqflags);
+
+        /* client data may be absent; never leave a stale pointer behind */
+        if (med->med_mcd != NULL) {
+                OBD_FREE_PTR(med->med_mcd);
+                med->med_mcd = NULL;
+        }
+
+        class_export_put(exp);
+        RETURN(rc);
+}
+
static struct obd_ops mdt_obd_device_ops = { /* obd methods passed to class_register_type() in mdt_mod_init() */
.o_owner = THIS_MODULE,
- .o_connect = mdt_obd_connect
+ .o_connect = mdt_obd_connect,
+ .o_disconnect = mdt_obd_disconnect,
};
static struct lu_device *mdt_device_alloc(struct lu_device_type *t,
mdt_fini(m);
return ERR_PTR(result);
}
-
+
} else
l = ERR_PTR(-ENOMEM);
return l;
OBD_FREE_PTR(m);
}
-static void *mdt_thread_init(struct ptlrpc_thread *t)
+static void *mdt_thread_init(struct lu_context *ctx)
{
struct mdt_thread_info *info;
- return OBD_ALLOC_PTR(info) ? : ERR_PTR(-ENOMEM);
+        /*
+         * lct_init method: allocate the per-context MDT thread info and
+         * remember the owning context in it.  Returns the new info or
+         * ERR_PTR(-ENOMEM).
+         */
+        OBD_ALLOC_PTR(info);
+        if (info == NULL)
+                return ERR_PTR(-ENOMEM);
+
+        info->mti_ctxt = ctx;
+        return info;
}
-static void mdt_thread_fini(struct ptlrpc_thread *t, void *data)
+static void mdt_thread_fini(struct lu_context *ctx, void *data)
{
struct mdt_thread_info *info = data; /* @data is the value produced by mdt_thread_init() */
OBD_FREE_PTR(info); /* lct_fini method: just release the thread info */
}
-static struct ptlrpc_thread_key mdt_thread_key = {
- .ptk_init = mdt_thread_init,
- .ptk_fini = mdt_thread_fini
+static struct lu_context_key mdt_thread_key = { /* key under which mdt_handle() looks up its mdt_thread_info */
+ .lct_init = mdt_thread_init,
+ .lct_fini = mdt_thread_fini
};
static int mdt_type_init(struct lu_device_type *t)
{
- return ptlrpc_thread_key_register(&mdt_thread_key);
+ return lu_context_key_register(&mdt_thread_key); /* type init: register the MDT context key and return the result */
}
static void mdt_type_fini(struct lu_device_type *t)
{
+ lu_context_key_degister(&mdt_thread_key); /* type fini: drop the key registered by mdt_type_init() */
}
static struct lu_device_type_operations mdt_device_type_ops = {
static int __init mdt_mod_init(void)
{
struct lprocfs_static_vars lvars;
- struct obd_type *type;
- int result;
mdt_num_threads = MDT_NUM_THREADS; /* module init: reset the thread count to its compile-time default */
lprocfs_init_vars(mdt, &lvars);
- result = class_register_type(&mdt_obd_device_ops,
- lvars.module_vars, LUSTRE_MDT0_NAME);
- if (result == 0) {
- type = class_get_type(LUSTRE_MDT0_NAME);
- LASSERT(type != NULL);
- type->typ_lu = &mdt_device_type;
- result = type->typ_lu->ldt_ops->ldto_init(type->typ_lu);
- if (result != 0)
- class_unregister_type(LUSTRE_MDT0_NAME);
- }
- return result;
+ return class_register_type(&mdt_obd_device_ops, lvars.module_vars, /* the lu_device_type is now handed to class_register_type() directly */
+ LUSTRE_MDT0_NAME, &mdt_device_type);
}
static void __exit mdt_mod_exit(void)
DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING, mdt_done_writing),
DEF_MDT_HNDL(0, PIN, mdt_pin),
DEF_MDT_HNDL(HABEO_CORPUS, SYNC, mdt_sync),
- DEF_MDT_HNDL(0, SET_INFO, mdt_set_info),
+ DEF_MDT_HNDL(0, FLD, mdt_fld),
DEF_MDT_HNDL(0, QUOTACHECK, mdt_handle_quotacheck),
DEF_MDT_HNDL(0, QUOTACTL, mdt_handle_quotactl)
};
CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
"number of mdt service threads to start");
-cfs_module(mdt, "0.0.3", mdt_mod_init, mdt_mod_exit);
+cfs_module(mdt, "0.0.4", mdt_mod_init, mdt_mod_exit);