static int mdt_disconnect(struct mdt_thread_info *info,
struct ptlrpc_request *req, int offset)
{
- return -EOPNOTSUPP;
+ //return -EOPNOTSUPP;
+ return target_handle_disconnect(req);
}
static int mdt_getattr_name(struct mdt_thread_info *info,
.smo_write = mdt_seq_mgr_write
};
+/* device init/fini methods */
+
+static int mdt_fld(struct mdt_thread_info *info,
+ struct ptlrpc_request *req, int offset)
+{
+ struct lu_site *ls = info->mti_mdt->mdt_md_dev.md_lu_dev.ld_site;
+ struct md_fld mf, *p, *reply;
+ int size = sizeof(*reply);
+ __u32 *opt;
+ int rc;
+ ENTRY;
+
+ rc = lustre_pack_reply(req, 1, &size, NULL);
+ if (rc)
+ RETURN(rc);
+
+ opt = lustre_swab_reqbuf(req, 0, sizeof(*opt), lustre_swab_generic_32s);
+ p = lustre_swab_reqbuf(req, 1, sizeof(mf), lustre_swab_md_fld);
+ mf = *p;
+
+ rc = fld_handle(ls->ls_fld, *opt, &mf);
+ if (rc)
+ RETURN(rc);
+
+ reply = lustre_msg_buf(req->rq_repmsg, 0, size);
+ *reply = mf;
+ RETURN(rc);
+}
+
+struct dt_device *md2_bottom_dev(struct mdt_device *m)
+{
+ /*FIXME: get dt device here*/
+ RETURN (NULL);
+}
+
+static int mdt_fld_init(struct mdt_device *m)
+{
+ struct dt_device *dt;
+ struct lu_site *ls;
+ int rc;
+ ENTRY;
+
+ dt = md2_bottom_dev(m);
+
+ ls = m->mdt_md_dev.md_lu_dev.ld_site;
+
+ OBD_ALLOC_PTR(ls->ls_fld);
+
+ if (!ls->ls_fld)
+ RETURN(-ENOMEM);
+
+ rc = fld_server_init(ls->ls_fld, dt);
+
+ RETURN(rc);
+}
+
+static int mdt_fld_fini(struct mdt_device *m)
+{
+ struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
+ int rc = 0;
+
+ if (ls && ls->ls_fld) {
+ fld_server_fini(ls->ls_fld);
+ OBD_FREE_PTR(ls->ls_fld);
+ }
+ RETURN(rc);
+}
+
static void mdt_stop_ptlrpc_service(struct mdt_device *m)
{
if (m->mdt_service != NULL) {
}
}
-static void mdt_fini(struct mdt_device *m)
-{
- struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
-
- mdt_stop_ptlrpc_service(m);
- if (d->ld_site != NULL) {
- lu_site_fini(d->ld_site);
- OBD_FREE_PTR(d->ld_site);
- d->ld_site = NULL;
- }
- if (m->mdt_namespace != NULL) {
- ldlm_namespace_free(m->mdt_namespace, 0);
- m->mdt_namespace = NULL;
- }
- /* finish the stack */
- if (m->mdt_child) {
- struct lu_device *child = md2lu_dev(m->mdt_child);
- child->ld_type->ldt_ops->ldto_device_fini(child);
- }
-
- if (m->mdt_seq_mgr) {
- seq_mgr_fini(m->mdt_seq_mgr);
- m->mdt_seq_mgr = NULL;
- }
-
- LASSERT(atomic_read(&d->ld_ref) == 0);
- md_device_fini(&m->mdt_md_dev);
-}
-
static int mdt_start_ptlrpc_service(struct mdt_device *m)
{
int rc;
RETURN(rc);
}
-static int mdt_fld(struct mdt_thread_info *info,
- struct ptlrpc_request *req, int offset)
+static void mdt_stack_fini(struct mdt_device *m)
{
- struct lu_site *ls = info->mti_mdt->mdt_md_dev.md_lu_dev.ld_site;
- struct md_fld mf, *p, *reply;
- int size = sizeof(*reply);
- __u32 *opt;
- int rc;
- ENTRY;
-
- rc = lustre_pack_reply(req, 1, &size, NULL);
- if (rc)
- RETURN(rc);
+ if (m->mdt_child) {
+ struct lu_device *d = md2lu_dev(m->mdt_child);
+ /* goes through all stack */
+ while (d != NULL) {
+ struct lu_device *n;
+ struct obd_type *type;
+ struct lu_device_type *ldt = d->ld_type;
+
+ lu_device_put(d);
+
+ /* each fini() returns next device in stack of layers
+ * so we can avoid the recursion */
+ n = d->ld_type->ldt_ops->ldto_device_fini(d);
+ d->ld_type->ldt_ops->ldto_device_free(d);
+
+ type = ldt->obd_type;
+ type->typ_refcnt--;
+ class_put_type(type);
+ /* switch to the next device in the layer */
+ d = n;
+ }
+ }
+ return;
+}
- opt = lustre_swab_reqbuf(req, 0, sizeof(*opt), lustre_swab_generic_32s);
- p = lustre_swab_reqbuf(req, 1, sizeof(mf), lustre_swab_md_fld);
- mf = *p;
+static struct lu_device *mdt_layer_setup(const char *typename,
+ struct lu_device *child,
+ struct lustre_cfg *cfg)
+{
+ struct obd_type *type;
+ struct lu_device_type *ldt;
+ struct lu_device *d;
+ int rc;
- rc = fld_handle(ls->ls_fld, *opt, &mf);
- if (rc)
- RETURN(rc);
+ /* find the type */
+ type = class_get_type(typename);
+ if (!type) {
+ CERROR("Unknown type: '%s'\n", typename);
+ GOTO(out, rc = -ENODEV);
+ }
- reply = lustre_msg_buf(req->rq_repmsg, 0, size);
- *reply = mf;
- RETURN(rc);
-}
+ ldt = type->typ_lu;
+ ldt->obd_type = type;
+ if (ldt == NULL) {
+ CERROR("type: '%s'\n", typename);
+ GOTO(out_type, rc = -EINVAL);
+ }
+
+ d = ldt->ldt_ops->ldto_device_alloc(ldt, cfg);
+ if (IS_ERR(d)) {
+ CERROR("Cannot allocate device: '%s'\n", typename);
+ GOTO(out_type, rc = -ENODEV);
+ }
+
+ LASSERT(child->ld_site);
+ d->ld_site = child->ld_site;
-struct dt_device *md2_bottom_dev(struct mdt_device *m)
-{
- /*FIXME: get dt device here*/
- RETURN (NULL);
+ type->typ_refcnt++;
+ rc = ldt->ldt_ops->ldto_device_init(d, child);
+ if (rc) {
+ CERROR("can't init device '%s', rc %d\n", typename, rc);
+ GOTO(out_alloc, rc);
+ }
+ lu_device_get(d);
+
+ RETURN(d);
+out_alloc:
+ ldt->ldt_ops->ldto_device_free(d);
+ type->typ_refcnt--;
+out_type:
+ class_put_type(type);
+out:
+ RETURN(ERR_PTR(rc));
}
-static int mdt_fld_init(struct mdt_device *m)
+static int mdt_stack_init(struct mdt_device *m, struct lustre_cfg *cfg)
{
- struct dt_device *dt;
- struct lu_site *ls;
+ struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
int rc;
- ENTRY;
-
- dt = md2_bottom_dev(m);
+
+ /* init the stack */
+ d = mdt_layer_setup(LUSTRE_OSD0_NAME, d, cfg);
+ if (IS_ERR(d)) {
+ GOTO(out, rc = PTR_ERR(d));
+ }
- ls = m->mdt_md_dev.md_lu_dev.ld_site;
+ d = mdt_layer_setup(LUSTRE_MDD0_NAME, d, cfg);
+ if (IS_ERR(d)) {
+ GOTO(out, rc = PTR_ERR(d));
+ }
- OBD_ALLOC_PTR(ls->ls_fld);
+ d = mdt_layer_setup(LUSTRE_CMM0_NAME, d, cfg);
+ if (IS_ERR(d)) {
+ GOTO(out, rc = PTR_ERR(d));
+ }
- if (!ls->ls_fld)
- rc = -ENOMEM;
- else
- rc = fld_server_init(ls->ls_fld, dt);
+ m->mdt_child = lu2md_dev(d);
- RETURN(rc);
+ RETURN(0);
+out:
+ mdt_stack_fini(m);
+ return rc;
}
-static void mdt_fld_fini(struct mdt_device *m)
+static void mdt_fini(struct mdt_device *m)
{
- struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
+ struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
- if (ls && ls->ls_fld) {
- fld_server_fini(ls->ls_fld);
- OBD_FREE_PTR(ls->ls_fld);
+ mdt_stop_ptlrpc_service(m);
+ if (d->ld_site != NULL) {
+ lu_site_fini(d->ld_site);
+ OBD_FREE_PTR(d->ld_site);
+ d->ld_site = NULL;
+ }
+ if (m->mdt_namespace != NULL) {
+ ldlm_namespace_free(m->mdt_namespace, 0);
+ m->mdt_namespace = NULL;
+ }
+
+ /* finish the stack */
+ mdt_stack_fini(m);
+
+ if (m->mdt_seq_mgr) {
+ seq_mgr_fini(m->mdt_seq_mgr);
+ m->mdt_seq_mgr = NULL;
}
+
+ LASSERT(atomic_read(&d->ld_ref) == 0);
+ md_device_fini(&m->mdt_md_dev);
}
static int mdt_init0(struct mdt_device *m,
int rc;
struct lu_site *s;
char ns_name[48];
- struct obd_device *obd;
- struct lu_device *mdt_child;
- const char *top = lustre_cfg_string(cfg, 0);
- const char *child = lustre_cfg_string(cfg, 1);
struct lu_context ctx;
-
ENTRY;
- /* get next layer */
- obd = class_name2obd((char *)child);
- if (obd && obd->obd_lu_dev) {
- CDEBUG(D_INFO, "Child device is %s\n", child);
- m->mdt_child = lu2md_dev(obd->obd_lu_dev);
- mdt_child = md2lu_dev(m->mdt_child);
- } else {
- CDEBUG(D_INFO, "Child device %s is not found\n", child);
- RETURN(-EINVAL);
- }
-
OBD_ALLOC_PTR(s);
if (s == NULL)
RETURN(-ENOMEM);
md_device_init(&m->mdt_md_dev, t);
m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
- lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
-
+
+ rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev, cfg);
+ if (rc) {
+ CERROR("can't init lu_site, rc %d\n", rc);
+ GOTO(err_fini_site, rc);
+ }
/* init the stack */
- LASSERT(mdt_child->ld_type->ldt_ops->ldto_device_init != NULL);
- rc = mdt_child->ld_type->ldt_ops->ldto_device_init(mdt_child, top);
+ rc = mdt_stack_init(m, cfg);
if (rc) {
CERROR("can't init device stack, rc %d\n", rc);
GOTO(err_fini_site, rc);
m->mdt_seq_mgr = seq_mgr_init(&seq_mgr_ops, m);
if (!m->mdt_seq_mgr) {
CERROR("can't initialize sequence manager\n");
- GOTO(err_fini_child, rc);
+ GOTO(err_fini_stack, rc);
}
rc = lu_context_init(&ctx);
if (rc)
GOTO(err_free_fld, rc);
RETURN(0);
+
err_free_fld:
mdt_fld_fini(m);
err_free_ns:
err_fini_mgr:
seq_mgr_fini(m->mdt_seq_mgr);
m->mdt_seq_mgr = NULL;
-err_fini_child:
- mdt_child->ld_type->ldt_ops->ldto_device_fini(mdt_child);
+err_fini_stack:
+ mdt_stack_fini(m);
err_fini_site:
lu_site_fini(s);
static void mdt_object_free(struct lu_context *ctxt, struct lu_object *o)
{
+ struct mdt_object *mo = lu2mdt_obj(o);
struct lu_object_header *h;
h = o->lo_header;
lu_object_fini(o);
lu_object_header_fini(h);
+ OBD_FREE_PTR(mo);
}
static void mdt_object_release(struct lu_context *ctxt, struct lu_object *o)
struct obd_connect_data *data)
{
struct obd_export *exp;
- int rc, abort_recovery;
+ int rc;
struct mdt_device *mdt;
struct mds_export_data *med;
struct mds_client_data *mcd = NULL;
-
ENTRY;
if (!conn || !obd || !cluuid)
mdt = mdt_dev(obd->obd_lu_dev);
- /* Check for aborted recovery. */
- spin_lock_bh(&obd->obd_processing_task_lock);
- abort_recovery = obd->obd_abort_recovery;
- spin_unlock_bh(&obd->obd_processing_task_lock);
- if (abort_recovery)
- target_abort_recovery(obd);
-
- /* XXX There is a small race between checking the list and adding a
- * new connection for the same UUID, but the real threat (list
- * corruption when multiple different clients connect) is solved.
- *
- * There is a second race between adding the export to the list,
- * and filling in the client data below. Hence skipping the case
- * of NULL mcd above. We should already be controlling multiple
- * connects at the client, and we can't hold the spinlock over
- * memory allocations without risk of deadlocking.
- */
rc = class_connect(conn, obd, cluuid);
if (rc)
RETURN(rc);
+
exp = class_conn2export(conn);
LASSERT(exp);
med = &exp->exp_mds_data;
-
- OBD_ALLOC(mcd, sizeof(*mcd));
+
+ OBD_ALLOC_PTR(mcd);
if (!mcd)
GOTO(out, rc = -ENOMEM);
out:
if (rc) {
- if (mcd) {
- OBD_FREE(mcd, sizeof(*mcd));
- med->med_mcd = NULL;
- }
class_disconnect(exp);
} else {
class_export_put(exp);
RETURN(rc);
}
+static int mdt_obd_disconnect(struct obd_export *exp)
+{
+ struct mds_export_data *med = &exp->exp_mds_data;
+ unsigned long irqflags;
+ int rc;
+ ENTRY;
+
+ LASSERT(exp);
+ class_export_get(exp);
+
+ /* Disconnect early so that clients can't keep using export */
+ rc = class_disconnect(exp);
+ //ldlm_cancel_locks_for_export(exp);
+
+ /* complete all outstanding replies */
+ spin_lock_irqsave(&exp->exp_lock, irqflags);
+ while (!list_empty(&exp->exp_outstanding_replies)) {
+ struct ptlrpc_reply_state *rs =
+ list_entry(exp->exp_outstanding_replies.next,
+ struct ptlrpc_reply_state, rs_exp_list);
+ struct ptlrpc_service *svc = rs->rs_service;
+
+ spin_lock(&svc->srv_lock);
+ list_del_init(&rs->rs_exp_list);
+ ptlrpc_schedule_difficult_reply(rs);
+ spin_unlock(&svc->srv_lock);
+ }
+ spin_unlock_irqrestore(&exp->exp_lock, irqflags);
+
+ OBD_FREE_PTR(med->med_mcd);
+
+ class_export_put(exp);
+ RETURN(rc);
+}
+
static struct obd_ops mdt_obd_device_ops = {
.o_owner = THIS_MODULE,
- .o_connect = mdt_obd_connect
+ .o_connect = mdt_obd_connect,
+ .o_disconnect = mdt_obd_disconnect,
};
static struct lu_device *mdt_device_alloc(struct lu_device_type *t,