Whamcloud - gitweb
add layers initialization here. MDT as top device will init/fini other devices.
authortappro <tappro>
Mon, 17 Apr 2006 09:57:41 +0000 (09:57 +0000)
committertappro <tappro>
Mon, 17 Apr 2006 09:57:41 +0000 (09:57 +0000)
add disconnect to put export
fix missed OBD_FREE

lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h

index 8b4754c..70ebd3c 100644 (file)
@@ -248,7 +248,8 @@ static int mdt_connect(struct mdt_thread_info *info,
 static int mdt_disconnect(struct mdt_thread_info *info,
                           struct ptlrpc_request *req, int offset)
 {
-        return -EOPNOTSUPP;
+        //return -EOPNOTSUPP;
+        return target_handle_disconnect(req);
 }
 
 static int mdt_getattr_name(struct mdt_thread_info *info,
@@ -995,6 +996,74 @@ struct lu_seq_mgr_ops seq_mgr_ops = {
         .smo_write = mdt_seq_mgr_write
 };
 
+/* device init/fini methods */
+
+static int mdt_fld(struct mdt_thread_info *info,
+                   struct ptlrpc_request *req, int offset)
+{
+        struct lu_site *ls  = info->mti_mdt->mdt_md_dev.md_lu_dev.ld_site;
+        struct md_fld mf, *p, *reply;
+        int size = sizeof(*reply);
+        __u32 *opt; 
+        int rc;
+        ENTRY;
+
+        rc = lustre_pack_reply(req, 1, &size, NULL);
+        if (rc)
+                RETURN(rc);
+
+        opt = lustre_swab_reqbuf(req, 0, sizeof(*opt), lustre_swab_generic_32s);
+        p = lustre_swab_reqbuf(req, 1, sizeof(mf), lustre_swab_md_fld);
+        mf = *p;
+        
+        rc = fld_handle(ls->ls_fld, *opt, &mf);
+        if (rc)
+                RETURN(rc);        
+
+        reply = lustre_msg_buf(req->rq_repmsg, 0, size);
+        *reply = mf;
+        RETURN(rc);
+}
+
+struct dt_device *md2_bottom_dev(struct mdt_device *m)
+{
+        /*FIXME: get dt device here*/
+        RETURN (NULL);
+}
+
+static int mdt_fld_init(struct mdt_device *m)
+{
+        struct dt_device *dt;
+        struct lu_site   *ls;
+        int rc;
+        ENTRY;
+
+        dt = md2_bottom_dev(m);
+       
+        ls = m->mdt_md_dev.md_lu_dev.ld_site;
+        
+        OBD_ALLOC_PTR(ls->ls_fld);
+        
+        if (!ls->ls_fld)
+             RETURN(-ENOMEM);
+        
+        rc = fld_server_init(ls->ls_fld, dt);
+        
+        RETURN(rc);
+}
+
+static int mdt_fld_fini(struct mdt_device *m)
+{
+        struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
+        int rc = 0;
+        if (ls && ls->ls_fld) {
+                fld_server_fini(ls->ls_fld);
+                OBD_FREE_PTR(ls->ls_fld);
+        }
+        RETURN(rc);
+}
+
 static void mdt_stop_ptlrpc_service(struct mdt_device *m)
 {
         if (m->mdt_service != NULL) {
@@ -1007,35 +1076,6 @@ static void mdt_stop_ptlrpc_service(struct mdt_device *m)
         }
 }
 
-static void mdt_fini(struct mdt_device *m)
-{
-        struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
-
-        mdt_stop_ptlrpc_service(m);
-        if (d->ld_site != NULL) {
-                lu_site_fini(d->ld_site);
-                OBD_FREE_PTR(d->ld_site);
-                d->ld_site = NULL;
-        }
-        if (m->mdt_namespace != NULL) {
-                ldlm_namespace_free(m->mdt_namespace, 0);
-                m->mdt_namespace = NULL;
-        }
-        /* finish the stack */
-        if (m->mdt_child) {
-                struct lu_device *child = md2lu_dev(m->mdt_child);
-                child->ld_type->ldt_ops->ldto_device_fini(child);
-        }
-
-        if (m->mdt_seq_mgr) {
-                seq_mgr_fini(m->mdt_seq_mgr);
-                m->mdt_seq_mgr = NULL;
-        }
-
-        LASSERT(atomic_read(&d->ld_ref) == 0);
-        md_device_fini(&m->mdt_md_dev);
-}
-
 static int mdt_start_ptlrpc_service(struct mdt_device *m)
 {
         int rc;
@@ -1098,68 +1138,137 @@ err_mdt_svc:
         RETURN(rc);
 }
 
-static int mdt_fld(struct mdt_thread_info *info,
-                   struct ptlrpc_request *req, int offset)
+static void mdt_stack_fini(struct mdt_device *m)
 {
-        struct lu_site *ls  = info->mti_mdt->mdt_md_dev.md_lu_dev.ld_site;
-        struct md_fld mf, *p, *reply;
-        int size = sizeof(*reply);
-        __u32 *opt;
-        int rc;
-        ENTRY;
-
-        rc = lustre_pack_reply(req, 1, &size, NULL);
-        if (rc)
-                RETURN(rc);
+        if (m->mdt_child) {
+                struct lu_device *d = md2lu_dev(m->mdt_child);
+                /* goes through all stack */
+                while (d != NULL) {
+                        struct lu_device *n;
+                        struct obd_type *type;
+                        struct lu_device_type *ldt = d->ld_type;
+
+                        lu_device_put(d);
+                        
+                        /* each fini() returns next device in stack of layers 
+                         * so we can avoid the recursion */
+                        n = d->ld_type->ldt_ops->ldto_device_fini(d);
+                        d->ld_type->ldt_ops->ldto_device_free(d);
+
+                        type = ldt->obd_type;
+                        type->typ_refcnt--;
+                        class_put_type(type);
+                        /* switch to the next device in the layer */
+                        d = n;
+                }
+        }
+        return;
+}
 
-        opt = lustre_swab_reqbuf(req, 0, sizeof(*opt), lustre_swab_generic_32s);
-        p = lustre_swab_reqbuf(req, 1, sizeof(mf), lustre_swab_md_fld);
-        mf = *p;
+static struct lu_device *mdt_layer_setup(const char *typename,
+                                         struct lu_device *child,
+                                         struct lustre_cfg *cfg)
+{
+        struct obd_type       *type;
+        struct lu_device_type *ldt;
+        struct lu_device      *d;
+        int rc;
 
-        rc = fld_handle(ls->ls_fld, *opt, &mf);
-        if (rc)
-                RETURN(rc);
+        /* find the type */
+        type = class_get_type(typename);
+        if (!type) {
+                CERROR("Unknown type: '%s'\n", typename);
+                GOTO(out, rc = -ENODEV);
+        }
 
-        reply = lustre_msg_buf(req->rq_repmsg, 0, size);
-        *reply = mf;
-        RETURN(rc);
-}
+        ldt = type->typ_lu;
+        ldt->obd_type = type;
+        if (ldt == NULL) {
+                CERROR("type: '%s'\n", typename);
+                GOTO(out_type, rc = -EINVAL);
+        }
+                
+        d = ldt->ldt_ops->ldto_device_alloc(ldt, cfg);
+        if (IS_ERR(d)) {
+                CERROR("Cannot allocate device: '%s'\n", typename);
+                GOTO(out_type, rc = -ENODEV);
+        }
+        
+        LASSERT(child->ld_site);
+        d->ld_site = child->ld_site;
 
-struct dt_device *md2_bottom_dev(struct mdt_device *m)
-{
-        /*FIXME: get dt device here*/
-        RETURN (NULL);
+        type->typ_refcnt++;
+        rc = ldt->ldt_ops->ldto_device_init(d, child);
+        if (rc) {
+                CERROR("can't init device '%s', rc %d\n", typename, rc);
+                GOTO(out_alloc, rc);
+        }
+        lu_device_get(d);
+
+        RETURN(d);
+out_alloc:
+        ldt->ldt_ops->ldto_device_free(d);
+        type->typ_refcnt--;
+out_type:
+        class_put_type(type);
+out:
+        RETURN(ERR_PTR(rc));
 }
 
-static int mdt_fld_init(struct mdt_device *m)
+static int mdt_stack_init(struct mdt_device *m, struct lustre_cfg *cfg) 
 {
-        struct dt_device *dt;
-        struct lu_site   *ls;
+        struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
         int rc;
-        ENTRY;
-
-        dt = md2_bottom_dev(m);
+        
+        /* init the stack */
+        d = mdt_layer_setup(LUSTRE_OSD0_NAME, d, cfg);
+        if (IS_ERR(d)) {
+                GOTO(out, rc = PTR_ERR(d));
+        }
 
-        ls = m->mdt_md_dev.md_lu_dev.ld_site;
+        d = mdt_layer_setup(LUSTRE_MDD0_NAME, d, cfg);
+        if (IS_ERR(d)) {
+                GOTO(out, rc = PTR_ERR(d));
+        }
 
-        OBD_ALLOC_PTR(ls->ls_fld);
+        d = mdt_layer_setup(LUSTRE_CMM0_NAME, d, cfg);
+        if (IS_ERR(d)) {
+                GOTO(out, rc = PTR_ERR(d));
+        }
 
-        if (!ls->ls_fld)
-                rc = -ENOMEM;
-        else
-                rc = fld_server_init(ls->ls_fld, dt);
+        m->mdt_child = lu2md_dev(d);
 
-        RETURN(rc);
+        RETURN(0);
+out:
+        mdt_stack_fini(m);
+        return rc;
 }
 
-static void mdt_fld_fini(struct mdt_device *m)
+static void mdt_fini(struct mdt_device *m)
 {
-        struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
+        struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
 
-        if (ls && ls->ls_fld) {
-                fld_server_fini(ls->ls_fld);
-                OBD_FREE_PTR(ls->ls_fld);
+        mdt_stop_ptlrpc_service(m);
+        if (d->ld_site != NULL) {
+                lu_site_fini(d->ld_site);
+                OBD_FREE_PTR(d->ld_site);
+                d->ld_site = NULL;
+        }
+        if (m->mdt_namespace != NULL) {
+                ldlm_namespace_free(m->mdt_namespace, 0);
+                m->mdt_namespace = NULL;
+        }
+
+        /* finish the stack */
+        mdt_stack_fini(m);
+
+        if (m->mdt_seq_mgr) {
+                seq_mgr_fini(m->mdt_seq_mgr);
+                m->mdt_seq_mgr = NULL;
         }
+
+        LASSERT(atomic_read(&d->ld_ref) == 0);
+        md_device_fini(&m->mdt_md_dev);
 }
 
 static int mdt_init0(struct mdt_device *m,
@@ -1168,36 +1277,23 @@ static int mdt_init0(struct mdt_device *m,
         int rc;
         struct lu_site *s;
         char   ns_name[48];
-        struct obd_device *obd;
-        struct lu_device  *mdt_child;
-        const char *top   = lustre_cfg_string(cfg, 0);
-        const char *child = lustre_cfg_string(cfg, 1);
         struct lu_context ctx;
-
         ENTRY;
 
-        /* get next layer */
-        obd = class_name2obd((char *)child);
-        if (obd && obd->obd_lu_dev) {
-                CDEBUG(D_INFO, "Child device is %s\n", child);
-                m->mdt_child = lu2md_dev(obd->obd_lu_dev);
-                mdt_child = md2lu_dev(m->mdt_child);
-        } else {
-                CDEBUG(D_INFO, "Child device %s is not found\n", child);
-                RETURN(-EINVAL);
-        }
-
         OBD_ALLOC_PTR(s);
         if (s == NULL)
                 RETURN(-ENOMEM);
 
         md_device_init(&m->mdt_md_dev, t);
         m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
-        lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
-
+        
+        rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev, cfg);
+        if (rc) {
+                CERROR("can't init lu_site, rc %d\n", rc);
+                GOTO(err_fini_site, rc);
+        }
         /* init the stack */
-        LASSERT(mdt_child->ld_type->ldt_ops->ldto_device_init != NULL);
-        rc = mdt_child->ld_type->ldt_ops->ldto_device_init(mdt_child, top);
+        rc = mdt_stack_init(m, cfg);
         if (rc) {
                 CERROR("can't init device stack, rc %d\n", rc);
                 GOTO(err_fini_site, rc);
@@ -1206,7 +1302,7 @@ static int mdt_init0(struct mdt_device *m,
         m->mdt_seq_mgr = seq_mgr_init(&seq_mgr_ops, m);
         if (!m->mdt_seq_mgr) {
                 CERROR("can't initialize sequence manager\n");
-                GOTO(err_fini_child, rc);
+                GOTO(err_fini_stack, rc);
         }
 
         rc = lu_context_init(&ctx);
@@ -1237,6 +1333,7 @@ static int mdt_init0(struct mdt_device *m,
         if (rc)
                 GOTO(err_free_fld, rc);
         RETURN(0);
+
 err_free_fld:
         mdt_fld_fini(m);
 err_free_ns:
@@ -1247,8 +1344,8 @@ err_fini_ctx:
 err_fini_mgr:
         seq_mgr_fini(m->mdt_seq_mgr);
         m->mdt_seq_mgr = NULL;
-err_fini_child:
-        mdt_child->ld_type->ldt_ops->ldto_device_fini(mdt_child);
+err_fini_stack:
+        mdt_stack_fini(m);
 
 err_fini_site:
         lu_site_fini(s);
@@ -1293,11 +1390,13 @@ static int mdt_object_init(struct lu_context *ctxt, struct lu_object *o)
 
 static void mdt_object_free(struct lu_context *ctxt, struct lu_object *o)
 {
+        struct mdt_object *mo = lu2mdt_obj(o);
         struct lu_object_header *h;
 
         h = o->lo_header;
         lu_object_fini(o);
         lu_object_header_fini(h);
+        OBD_FREE_PTR(mo);
 }
 
 static void mdt_object_release(struct lu_context *ctxt, struct lu_object *o)
@@ -1324,11 +1423,10 @@ static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
                            struct obd_connect_data *data)
 {
         struct obd_export *exp;
-        int rc, abort_recovery;
+        int rc;
         struct mdt_device *mdt;
         struct mds_export_data *med;
         struct mds_client_data *mcd = NULL;
-
         ENTRY;
 
         if (!conn || !obd || !cluuid)
@@ -1336,31 +1434,15 @@ static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
 
         mdt = mdt_dev(obd->obd_lu_dev);
 
-        /* Check for aborted recovery. */
-        spin_lock_bh(&obd->obd_processing_task_lock);
-        abort_recovery = obd->obd_abort_recovery;
-        spin_unlock_bh(&obd->obd_processing_task_lock);
-        if (abort_recovery)
-                target_abort_recovery(obd);
-
-        /* XXX There is a small race between checking the list and adding a
-         * new connection for the same UUID, but the real threat (list
-         * corruption when multiple different clients connect) is solved.
-         *
-         * There is a second race between adding the export to the list,
-         * and filling in the client data below.  Hence skipping the case
-         * of NULL mcd above.  We should already be controlling multiple
-         * connects at the client, and we can't hold the spinlock over
-         * memory allocations without risk of deadlocking.
-         */
         rc = class_connect(conn, obd, cluuid);
         if (rc)
                 RETURN(rc);
+
         exp = class_conn2export(conn);
         LASSERT(exp);
         med = &exp->exp_mds_data;
-
-        OBD_ALLOC(mcd, sizeof(*mcd));
+        
+        OBD_ALLOC_PTR(mcd);
         if (!mcd)
                 GOTO(out, rc = -ENOMEM);
 
@@ -1369,10 +1451,6 @@ static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
 
 out:
         if (rc) {
-                if (mcd) {
-                        OBD_FREE(mcd, sizeof(*mcd));
-                        med->med_mcd = NULL;
-                }
                 class_disconnect(exp);
         } else {
                 class_export_put(exp);
@@ -1381,9 +1459,45 @@ out:
         RETURN(rc);
 }
 
+static int mdt_obd_disconnect(struct obd_export *exp)
+{
+        struct mds_export_data *med = &exp->exp_mds_data;
+        unsigned long irqflags;
+        int rc;
+        ENTRY;
+
+        LASSERT(exp);
+        class_export_get(exp);
+
+        /* Disconnect early so that clients can't keep using export */
+        rc = class_disconnect(exp);
+        //ldlm_cancel_locks_for_export(exp);
+
+        /* complete all outstanding replies */
+        spin_lock_irqsave(&exp->exp_lock, irqflags);
+        while (!list_empty(&exp->exp_outstanding_replies)) {
+                struct ptlrpc_reply_state *rs =
+                        list_entry(exp->exp_outstanding_replies.next,
+                                   struct ptlrpc_reply_state, rs_exp_list);
+                struct ptlrpc_service *svc = rs->rs_service;
+
+                spin_lock(&svc->srv_lock);
+                list_del_init(&rs->rs_exp_list);
+                ptlrpc_schedule_difficult_reply(rs);
+                spin_unlock(&svc->srv_lock);
+        }
+        spin_unlock_irqrestore(&exp->exp_lock, irqflags);
+        
+        OBD_FREE_PTR(med->med_mcd);
+
+        class_export_put(exp);
+        RETURN(rc);
+}
+
 static struct obd_ops mdt_obd_device_ops = {
         .o_owner = THIS_MODULE,
-        .o_connect = mdt_obd_connect
+        .o_connect = mdt_obd_connect,
+        .o_disconnect = mdt_obd_disconnect,
 };
 
 static struct lu_device *mdt_device_alloc(struct lu_device_type *t,
index ce3da4f..f1d72ca 100644 (file)
@@ -105,6 +105,11 @@ static inline struct md_object *mdt_object_child(struct mdt_object *o)
         return lu2md(lu_object_next(&o->mot_obj.mo_lu));
 }
 
+static inline struct mdt_object *lu2mdt_obj(struct lu_object *o)
+{
+        return (container_of(lu2md(o),struct mdt_object, mot_obj));
+}
+
 struct mdt_lock_handle {
         struct lustre_handle    mlh_lh;
         ldlm_mode_t             mlh_mode;
@@ -113,10 +118,6 @@ struct mdt_lock_handle {
 void mdt_lock_handle_init(struct mdt_lock_handle *lh);
 void mdt_lock_handle_fini(struct mdt_lock_handle *lh);
 
-struct mdd_object {
-        struct md_object  mod_obj;
-};
-
 int md_device_init(struct md_device *md, struct lu_device_type *t);
 void md_device_fini(struct md_device *md);