Whamcloud - gitweb
add support for new init schema
authortappro <tappro>
Tue, 11 Apr 2006 06:44:31 +0000 (06:44 +0000)
committertappro <tappro>
Tue, 11 Apr 2006 06:44:31 +0000 (06:44 +0000)
add .mdo_statfs(), .mdo_root_get() with calling the child device
add mdt_connect() which accept client connection
add check for req->rq_export in mdt_handle()
add initial proto for getattr()

lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h

index dcf6fe3..1b5cc54 100644 (file)
@@ -63,6 +63,56 @@ unsigned long mdt_num_threads;
 static int mdt_handle(struct ptlrpc_request *req);
 static struct ptlrpc_thread_key mdt_thread_key;
 
+static int mdt_mkdir(struct mdt_thread_info *info, struct mdt_device *d,
+                     struct lu_fid *pfid, const char *name, struct lu_fid *cfid)
+{
+        struct mdt_object      *o;
+        struct mdt_object      *child;
+        struct mdt_lock_handle *lh;
+
+        int result;
+
+        lh = &info->mti_lh[MDT_LH_PARENT];
+        lh->mlh_mode = LCK_PW;
+
+        o = mdt_object_find_lock(d, pfid, lh, MDS_INODELOCK_UPDATE);
+        if (IS_ERR(o))
+                return PTR_ERR(o);
+
+        child = mdt_object_find(d, cfid);
+        if (!IS_ERR(child)) {
+                result = mdt_child_ops(d)->mdo_mkdir(mdt_object_child(o), name,
+                                                     mdt_object_child(child));
+                mdt_object_put(child);
+        } else
+                result = PTR_ERR(child);
+        mdt_object_unlock(d->mdt_namespace, o, lh);
+        mdt_object_put(o);
+        return result;
+}
+#if 0
+static int mdt_md_getattr(struct mdt_thread_info *info, struct lu_fid *fid,
+                          struct md_object_attr *attr)
+{
+        struct mdt_device *d = info->mti_mdt;
+        struct mdt_object *o;
+        struct iattr
+        int               result;
+
+        o = mdt_object_find(d, fid);
+        if (IS_ERR(o))
+                return PTR_ERR(o);
+
+        result = mdt_child_ops(d)->mdo_attr_get(mdt_object_child(o), name,
+                                                     mdt_object_child(child));
+                mdt_object_put(child);
+        } else
+                result = PTR_ERR(child);
+        mdt_object_unlock(d->mdt_namespace, o, lh);
+        mdt_object_put(o);
+        return result;
+}
+#endif
 static int mdt_getstatus(struct mdt_thread_info *info,
                          struct ptlrpc_request *req, int offset)
 {
@@ -90,6 +140,66 @@ static int mdt_getstatus(struct mdt_thread_info *info,
         RETURN(result);
 }
 
+static int mdt_statfs(struct mdt_thread_info *info,
+                      struct ptlrpc_request *req, int offset)
+{
+        struct md_device  *child  = info->mti_mdt->mdt_child;
+        struct obd_statfs *osfs;
+        struct kstatfs    sfs;
+        int               result;
+        int               size = sizeof(struct obd_statfs);
+        
+        ENTRY;
+
+        result = lustre_pack_reply(req, 1, &size, NULL);
+        if (result)
+                CERROR(LUSTRE_MDT0_NAME" out of memory for statfs: size=%d\n",
+                       size);
+        else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
+                CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
+                result = -ENOMEM;
+        } else {
+                osfs = lustre_msg_buf(req->rq_repmsg, 0, size);
+                /* XXX max_age optimisation is needed here. See mds_statfs */
+                result = child->md_ops->mdo_statfs(child, &sfs);
+                statfs_pack(osfs, &sfs);
+        }
+out:
+        RETURN(result);
+}
+#if 0
+static int mdt_getattr(struct mdt_thread_info *info,
+                       struct ptlrpc_request *req, int offset)
+{
+        struct mdt_body        *body;
+        int                    size = sizeof (*body);
+        struct md_obj_attr     attr;
+        int result;
+        
+        ENTRY;
+        
+        result = lustre_pack_reply(req, 1, &size, NULL);
+        if (result)
+                CERROR(LUSTRE_MDT0_NAME" out of memory for statfs: size=%d\n",
+                       size);
+        else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
+                CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
+                result = -ENOMEM;
+        } else {
+                body = lustre_swab_reqbuf(req, offset, size,
+                                          lustre_swab_mdt_body);
+                result = mdt_md_getattr(info, body->fid1);
+        }
+out:
+        RETURN(result);
+}
+#else
+static int mdt_getattr(struct mdt_thread_info *info,
+                       struct ptlrpc_request *req, int offset)
+{
+         return -EOPNOTSUPP;
+}
+#endif
 static int mdt_connect(struct mdt_thread_info *info,
                        struct ptlrpc_request *req, int offset)
 {
@@ -102,12 +212,6 @@ static int mdt_disconnect(struct mdt_thread_info *info,
         return -EOPNOTSUPP;
 }
 
-static int mdt_getattr(struct mdt_thread_info *info,
-                       struct ptlrpc_request *req, int offset)
-{
-        return -EOPNOTSUPP;
-}
-
 static int mdt_getattr_name(struct mdt_thread_info *info,
                             struct ptlrpc_request *req, int offset)
 {
@@ -126,12 +230,6 @@ static int mdt_getxattr(struct mdt_thread_info *info,
         return -EOPNOTSUPP;
 }
 
-static int mdt_statfs(struct mdt_thread_info *info,
-                      struct ptlrpc_request *req, int offset)
-{
-        return -EOPNOTSUPP;
-}
-
 static int mdt_readpage(struct mdt_thread_info *info,
                        struct ptlrpc_request *req, int offset)
 {
@@ -798,11 +896,14 @@ static int mdt_handle(struct ptlrpc_request *req)
 
         struct mdt_thread_info *info = ptlrpc_thread_key_get(req->rq_svc_thread,
                                                              &mdt_thread_key);
+        ENTRY;
+
         mdt_thread_info_init(info);
-        info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
+        /* it can be NULL while CONNECT */
+        if (req->rq_export)
+                info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
 
         result = mdt_handle0(req, info);
-
         mdt_thread_info_fini(info);
         return result;
 }
@@ -844,6 +945,12 @@ static void mdt_fini(struct mdt_device *m)
                 ldlm_namespace_free(m->mdt_namespace, 0);
                 m->mdt_namespace = NULL;
         }
+        /* finish the stack */
+        if (m->mdt_child) {
+                struct lu_device *child = md2lu_dev(m->mdt_child);
+                                
+                child->ld_ops->ldo_device_fini(child);
+        }
 
         LASSERT(atomic_read(&d->ld_ref) == 0);
         md_device_fini(&m->mdt_md_dev);
@@ -855,6 +962,7 @@ static int mdt_init0(struct mdt_device *m,
         struct lu_site *s;
         char   ns_name[48];
         struct obd_device * obd = NULL;
+        char *top = lustre_cfg_string(cfg, 0);
         char *child = lustre_cfg_string(cfg, 1);
 
         ENTRY;
@@ -874,6 +982,7 @@ static int mdt_init0(struct mdt_device *m,
                 m->mdt_child = lu2md_dev(obd->obd_lu_dev);
         } else {
                 CDEBUG(D_INFO, "Child device %s is not found\n", child);
+                return -EINVAL;
         }
 
         m->mdt_service_conf.psc_nbufs            = MDS_NBUFS;
@@ -908,7 +1017,18 @@ static int mdt_init0(struct mdt_device *m,
                                      NULL);
         if (m->mdt_service == NULL)
                 return -ENOMEM;
-
+        
+        /* init the stack */
+        if (m->mdt_child) {
+                struct lu_device *child = md2lu_dev(m->mdt_child);
+                int err;
+                
+                if (child->ld_ops->ldo_device_init) {
+                        err = child->ld_ops->ldo_device_init(child, top);
+                        if (err) 
+                                return err;                        
+                }
+        }
         return ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
 }
 
@@ -971,46 +1091,69 @@ static struct lu_device_operations mdt_lu_ops = {
         .ldo_object_print   = mdt_object_print
 };
 
-static struct md_object *mdt_object_child(struct mdt_object *o)
+/* mds_connect copy */
+static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
+                           struct obd_uuid *cluuid,
+                           struct obd_connect_data *data)
 {
-        return lu2md(lu_object_next(&o->mot_obj.mo_lu));
-}
-
-static inline struct md_device_operations *mdt_child_ops(struct mdt_device *d)
-{
-        return d->mdt_child->md_ops;
-}
-
-static int mdt_mkdir(struct mdt_thread_info *info, struct mdt_device *d,
-                     struct lu_fid *pfid, const char *name, struct lu_fid *cfid)
-{
-        struct mdt_object      *o;
-        struct mdt_object      *child;
-        struct mdt_lock_handle *lh;
+        struct obd_export *exp;
+        int rc, abort_recovery;
+        struct mds_export_data *med;
+        struct mds_client_data *mcd = NULL;
+        
+        ENTRY;
 
-        int result;
+        if (!conn || !obd || !cluuid)
+                RETURN(-EINVAL);
 
-        lh = &info->mti_lh[MDT_LH_PARENT];
-        lh->mlh_mode = LCK_PW;
+        /* Check for aborted recovery. */
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        abort_recovery = obd->obd_abort_recovery;
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+        if (abort_recovery)
+                target_abort_recovery(obd);
 
-        o = mdt_object_find_lock(d, pfid, lh, MDS_INODELOCK_UPDATE);
-        if (IS_ERR(o))
-                return PTR_ERR(o);
+        /* XXX There is a small race between checking the list and adding a
+         * new connection for the same UUID, but the real threat (list
+         * corruption when multiple different clients connect) is solved.
+         *
+         * There is a second race between adding the export to the list,
+         * and filling in the client data below.  Hence skipping the case
+         * of NULL mcd above.  We should already be controlling multiple
+         * connects at the client, and we can't hold the spinlock over
+         * memory allocations without risk of deadlocking.
+         */
+        rc = class_connect(conn, obd, cluuid);
+        if (rc)
+                RETURN(rc);
+        exp = class_conn2export(conn);
+        LASSERT(exp);
+        med = &exp->exp_mds_data;
+        
+        OBD_ALLOC(mcd, sizeof(*mcd));
+        if (!mcd)
+                GOTO(out, rc = -ENOMEM);
+
+        memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
+        med->med_mcd = mcd;
+        
+out:
+        if (rc) {
+                if (mcd) {
+                        OBD_FREE(mcd, sizeof(*mcd));
+                        med->med_mcd = NULL;
+                }
+                class_disconnect(exp);
+        } else {
+                class_export_put(exp);
+        }
 
-        child = mdt_object_find(d, cfid);
-        if (!IS_ERR(child)) {
-                result = mdt_child_ops(d)->mdo_mkdir(mdt_object_child(o), name,
-                                                     mdt_object_child(child));
-                mdt_object_put(child);
-        } else
-                result = PTR_ERR(child);
-        mdt_object_unlock(d->mdt_namespace, o, lh);
-        mdt_object_put(o);
-        return result;
+        RETURN(rc);
 }
 
 static struct obd_ops mdt_obd_device_ops = {
-        .o_owner = THIS_MODULE
+        .o_owner = THIS_MODULE,
+        .o_connect = mdt_obd_connect
 };
 
 static struct lu_device *mdt_device_alloc(struct lu_device_type *t,
index 42eaed8..7ab6489 100644 (file)
@@ -77,6 +77,12 @@ struct mdt_device {
         unsigned long              mdt_flags;
 };
 
+static inline struct md_device_operations *mdt_child_ops(struct mdt_device * m)
+{
+        LASSERT(m->mdt_child);
+        return m->mdt_child->md_ops;
+}
+
 enum mdt_flags {
         /*
          * This mdt works with legacy clients with different resource name
@@ -90,6 +96,11 @@ struct mdt_object {
         struct md_object        mot_obj;
 };
 
+static inline struct md_object *mdt_object_child(struct mdt_object *o)
+{
+        return lu2md(lu_object_next(&o->mot_obj.mo_lu));
+}
+
 struct mdt_lock_handle {
         struct lustre_handle    mlh_lh;
         ldlm_mode_t             mlh_mode;