Whamcloud - gitweb
LU-718 mds: new MDS layer.
author wangdi <di.wang@whamcloud.com>
Thu, 26 Sep 2013 08:36:35 +0000 (01:36 -0700)
committer Oleg Drokin <green@whamcloud.com>
Mon, 7 Jan 2013 17:12:24 +0000 (12:12 -0500)
Pull out ptlrpc service from MDT and create new MDS layer.

Change-Id: Ib8a5d22c1f85d086f6ed37e4c46c7aa0e3c4b03a
Signed-off-by: Wang Di <di.wang@whamcloud.com>
Reviewed-on: http://review.whamcloud.com/4354
Tested-by: Hudson
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
Reviewed-by: Fan Yong <fan.yong@intel.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_mds.h
lustre/include/lustre_net.h
lustre/mdt/Makefile.in
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lproc.c
lustre/mdt/mdt_mds.c [new file with mode: 0644]
lustre/obdclass/genops.c
lustre/obdclass/lu_object.c
lustre/obdclass/obd_mount.c
lustre/tests/conf-sanity.sh

index a7ca7a5..0fa15cb 100644 (file)
@@ -66,7 +66,6 @@ struct mds_capa_info {
 
 #define MDD_OBD_NAME     "mdd_obd"
 #define MDD_OBD_UUID     "mdd_obd_uuid"
 
 #define MDD_OBD_NAME     "mdd_obd"
 #define MDD_OBD_UUID     "mdd_obd_uuid"
-#define MDD_OBD_TYPE     "mds"
 
 static inline int md_should_create(__u64 flags)
 {
 
 static inline int md_should_create(__u64 flags)
 {
index 25ee3e7..c02ecc2 100644 (file)
  *
  * Examples
  *
  *
  * Examples
  *
- * #define MDT_NTHRS_INIT      2
- * #define MDT_NTHRS_BASE      64
- * #define MDT_NTHRS_FACTOR    8
- * #define MDT_NTHRS_MAX       1024
+ * #define MDS_NTHRS_INIT      2
+ * #define MDS_NTHRS_BASE      64
+ * #define MDS_NTHRS_FACTOR    8
+ * #define MDS_NTHRS_MAX       1024
  *
  * Example 1):
  * ---------------------------------------------------------------------
  * Server(A) has 16 cores, user configured it to 4 partitions so each
  * partition has 4 cores, then actual number of service threads on each
  * partition is:
  *
  * Example 1):
  * ---------------------------------------------------------------------
  * Server(A) has 16 cores, user configured it to 4 partitions so each
  * partition has 4 cores, then actual number of service threads on each
  * partition is:
- *     MDT_NTHRS_BASE(64) + cores(4) * MDT_NTHRS_FACTOR(8) = 96
+ *     MDS_NTHRS_BASE(64) + cores(4) * MDS_NTHRS_FACTOR(8) = 96
  *
  * Total number of threads for the service is:
  *     96 * partitions(4) = 384
  *
  * Total number of threads for the service is:
  *     96 * partitions(4) = 384
  * Server(B) has 32 cores, user configured it to 4 partitions so each
  * partition has 8 cores, then actual number of service threads on each
  * partition is:
  * Server(B) has 32 cores, user configured it to 4 partitions so each
  * partition has 8 cores, then actual number of service threads on each
  * partition is:
- *     MDT_NTHRS_BASE(64) + cores(8) * MDT_NTHRS_FACTOR(8) = 128
+ *     MDS_NTHRS_BASE(64) + cores(8) * MDS_NTHRS_FACTOR(8) = 128
  *
  * Total number of threads for the service is:
  *     128 * partitions(4) = 512
  *
  * Total number of threads for the service is:
  *     128 * partitions(4) = 512
  * Server(B) has 96 cores, user configured it to 8 partitions so each
  * partition has 12 cores, then actual number of service threads on each
  * partition is:
  * Server(B) has 96 cores, user configured it to 8 partitions so each
  * partition has 12 cores, then actual number of service threads on each
  * partition is:
- *     MDT_NTHRS_BASE(64) + cores(12) * MDT_NTHRS_FACTOR(8) = 160
+ *     MDS_NTHRS_BASE(64) + cores(12) * MDS_NTHRS_FACTOR(8) = 160
  *
  * Total number of threads for the service is:
  *     160 * partitions(8) = 1280
  *
  *
  * Total number of threads for the service is:
  *     160 * partitions(8) = 1280
  *
- * However, it's above the soft limit MDT_NTHRS_MAX, so we choose this number
+ * However, it's above the soft limit MDS_NTHRS_MAX, so we choose this number
  * as upper limit of threads number for each partition:
  * as upper limit of threads number for each partition:
- *     MDT_NTHRS_MAX(1024) / partitions(8) = 128
+ *     MDS_NTHRS_MAX(1024) / partitions(8) = 128
  *
  * Example 4):
  * ---------------------------------------------------------------------
  * Server(C) have a thousand of cores and user configured it to 32 partitions
  *
  * Example 4):
  * ---------------------------------------------------------------------
  * Server(C) have a thousand of cores and user configured it to 32 partitions
- *     MDT_NTHRS_BASE(64) * 32 = 2048
+ *     MDS_NTHRS_BASE(64) * 32 = 2048
  *
  *
- * which is already above soft limit MDT_NTHRS_MAX(1024), but we still need
- * to guarantee that each partition has at least MDT_NTHRS_BASE(64) threads
+ * which is already above soft limit MDS_NTHRS_MAX(1024), but we still need
+ * to guarantee that each partition has at least MDS_NTHRS_BASE(64) threads
  * to keep service healthy, so total number of threads will just be 2048.
  *
  * NB: we don't suggest to choose server with that many cores because backend
  * to keep service healthy, so total number of threads will just be 2048.
  *
  * NB: we don't suggest to choose server with that many cores because backend
   * Please see examples in "Thread Constants", MDS threads number will be at
   * the comparable level of old versions, unless the server has many cores.
   */
   * Please see examples in "Thread Constants", MDS threads number will be at
   * the comparable level of old versions, unless the server has many cores.
   */
-#ifndef MDT_MAX_THREADS
-#define MDT_MAX_THREADS                1024
-#define MDT_MAX_OTHR_THREADS   256
-
-#else /* MDT_MAX_THREADS */
-#if MDT_MAX_THREADS < PTLRPC_NTHRS_INIT
-#undef MDT_MAX_THREADS
-#define MDT_MAX_THREADS        PTLRPC_NTHRS_INIT
+#ifndef MDS_MAX_THREADS
+#define MDS_MAX_THREADS                1024
+#define MDS_MAX_OTHR_THREADS   256
+
+#else /* MDS_MAX_THREADS */
+#if MDS_MAX_THREADS < PTLRPC_NTHRS_INIT
+#undef MDS_MAX_THREADS
+#define MDS_MAX_THREADS        PTLRPC_NTHRS_INIT
 #endif
 #endif
-#define MDT_MAX_OTHR_THREADS   max(PTLRPC_NTHRS_INIT, MDT_MAX_THREADS / 2)
+#define MDS_MAX_OTHR_THREADS   max(PTLRPC_NTHRS_INIT, MDS_MAX_THREADS / 2)
 #endif
 
 /* default service */
 #endif
 
 /* default service */
-#define MDT_THR_FACTOR         8
-#define MDT_NTHRS_INIT         PTLRPC_NTHRS_INIT
-#define MDT_NTHRS_MAX          MDT_MAX_THREADS
-#define MDT_NTHRS_BASE         min(64, MDT_NTHRS_MAX)
+#define MDS_THR_FACTOR         8
+#define MDS_NTHRS_INIT         PTLRPC_NTHRS_INIT
+#define MDS_NTHRS_MAX          MDS_MAX_THREADS
+#define MDS_NTHRS_BASE         min(64, MDS_NTHRS_MAX)
 
 /* read-page service */
 
 /* read-page service */
-#define MDT_RDPG_THR_FACTOR    4
-#define MDT_RDPG_NTHRS_INIT    PTLRPC_NTHRS_INIT
-#define MDT_RDPG_NTHRS_MAX     MDT_MAX_OTHR_THREADS
-#define MDT_RDPG_NTHRS_BASE    min(48, MDT_RDPG_NTHRS_MAX)
+#define MDS_RDPG_THR_FACTOR    4
+#define MDS_RDPG_NTHRS_INIT    PTLRPC_NTHRS_INIT
+#define MDS_RDPG_NTHRS_MAX     MDS_MAX_OTHR_THREADS
+#define MDS_RDPG_NTHRS_BASE    min(48, MDS_RDPG_NTHRS_MAX)
 
 /* these should be removed when we remove setattr service in the future */
 
 /* these should be removed when we remove setattr service in the future */
-#define MDT_SETA_THR_FACTOR    4
-#define MDT_SETA_NTHRS_INIT    PTLRPC_NTHRS_INIT
-#define MDT_SETA_NTHRS_MAX     MDT_MAX_OTHR_THREADS
-#define MDT_SETA_NTHRS_BASE    min(48, MDT_SETA_NTHRS_MAX)
+#define MDS_SETA_THR_FACTOR    4
+#define MDS_SETA_NTHRS_INIT    PTLRPC_NTHRS_INIT
+#define MDS_SETA_NTHRS_MAX     MDS_MAX_OTHR_THREADS
+#define MDS_SETA_NTHRS_BASE    min(48, MDS_SETA_NTHRS_MAX)
 
 /* non-affinity threads */
 
 /* non-affinity threads */
-#define MDT_OTHR_NTHRS_INIT    PTLRPC_NTHRS_INIT
-#define MDT_OTHR_NTHRS_MAX     MDT_MAX_OTHR_THREADS
+#define MDS_OTHR_NTHRS_INIT    PTLRPC_NTHRS_INIT
+#define MDS_OTHR_NTHRS_MAX     MDS_MAX_OTHR_THREADS
 
 #define MDS_NBUFS              (64 * cfs_num_online_cpus())
 /**
 
 #define MDS_NBUFS              (64 * cfs_num_online_cpus())
 /**
index 8573464..4865a02 100644 (file)
@@ -1,6 +1,6 @@
 MODULES := mdt
 mdt-objs := mdt_handler.o mdt_lib.o mdt_reint.o mdt_xattr.o mdt_recovery.o
 mdt-objs += mdt_open.o mdt_idmap.o mdt_identity.o mdt_capa.o mdt_lproc.o mdt_fs.o
 MODULES := mdt
 mdt-objs := mdt_handler.o mdt_lib.o mdt_reint.o mdt_xattr.o mdt_recovery.o
 mdt-objs += mdt_open.o mdt_idmap.o mdt_identity.o mdt_capa.o mdt_lproc.o mdt_fs.o
-mdt-objs += mdt_lvb.o mdt_hsm.o
+mdt-objs += mdt_lvb.o mdt_hsm.o mdt_mds.o
 
 @INCLUDE_RULES@
 
 @INCLUDE_RULES@
index 1afa76d..58ee664 100644 (file)
@@ -91,102 +91,8 @@ ldlm_mode_t mdt_dlm_lock_modes[] = {
         [MDL_GROUP]   = LCK_GROUP
 };
 
         [MDL_GROUP]   = LCK_GROUP
 };
 
-/*
- * Initialized in mdt_mod_init().
- */
-static unsigned long mdt_num_threads;
-CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
-               "number of MDS service threads to start "
-               "(deprecated in favor of mds_num_threads)");
-
-static unsigned long mds_num_threads;
-CFS_MODULE_PARM(mds_num_threads, "ul", ulong, 0444,
-               "number of MDS service threads to start");
-
-static char *mds_num_cpts;
-CFS_MODULE_PARM(mds_num_cpts, "c", charp, 0444,
-               "CPU partitions MDS threads should run on");
-
-static unsigned long mds_rdpg_num_threads;
-CFS_MODULE_PARM(mds_rdpg_num_threads, "ul", ulong, 0444,
-               "number of MDS readpage service threads to start");
-
-static char *mds_rdpg_num_cpts;
-CFS_MODULE_PARM(mds_rdpg_num_cpts, "c", charp, 0444,
-               "CPU partitions MDS readpage threads should run on");
-
-/* NB: these two should be removed along with setattr service in the future */
-static unsigned long mds_attr_num_threads;
-CFS_MODULE_PARM(mds_attr_num_threads, "ul", ulong, 0444,
-               "number of MDS setattr service threads to start");
-
-static char *mds_attr_num_cpts;
-CFS_MODULE_PARM(mds_attr_num_cpts, "c", charp, 0444,
-               "CPU partitions MDS setattr threads should run on");
-
-/* ptlrpc request handler for MDT. All handlers are
- * grouped into several slices - struct mdt_opc_slice,
- * and stored in an array - mdt_handlers[].
- */
-struct mdt_handler {
-        /* The name of this handler. */
-        const char *mh_name;
-        /* Fail id for this handler, checked at the beginning of this handler*/
-        int         mh_fail_id;
-        /* Operation code for this handler */
-        __u32       mh_opc;
-        /* flags are listed in enum mdt_handler_flags below. */
-        __u32       mh_flags;
-        /* The actual handler function to execute. */
-        int (*mh_act)(struct mdt_thread_info *info);
-        /* Request format for this request. */
-        const struct req_format *mh_fmt;
-};
-
-enum mdt_handler_flags {
-        /*
-         * struct mdt_body is passed in the incoming message, and object
-         * identified by this fid exists on disk.
-         *
-         * "habeo corpus" == "I have a body"
-         */
-        HABEO_CORPUS = (1 << 0),
-        /*
-         * struct ldlm_request is passed in the incoming message.
-         *
-         * "habeo clavis" == "I have a key"
-         */
-        HABEO_CLAVIS = (1 << 1),
-        /*
-         * this request has fixed reply format, so that reply message can be
-         * packed by generic code.
-         *
-         * "habeo refero" == "I have a reply"
-         */
-        HABEO_REFERO = (1 << 2),
-        /*
-         * this request will modify something, so check whether the filesystem
-         * is readonly or not, then return -EROFS to client asap if necessary.
-         *
-         * "mutabor" == "I shall modify"
-         */
-        MUTABOR      = (1 << 3)
-};
-
-struct mdt_opc_slice {
-        __u32               mos_opc_start;
-        int                 mos_opc_end;
-        struct mdt_handler *mos_hs;
-};
-
-static struct mdt_opc_slice mdt_regular_handlers[];
-static struct mdt_opc_slice mdt_readpage_handlers[];
-static struct mdt_opc_slice mdt_xmds_handlers[];
-static struct mdt_opc_slice mdt_seq_handlers[];
-static struct mdt_opc_slice mdt_fld_handlers[];
 
 static struct mdt_device *mdt_dev(struct lu_device *d);
 
 static struct mdt_device *mdt_dev(struct lu_device *d);
-static int mdt_regular_handle(struct ptlrpc_request *req);
 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
 static int mdt_fid2path(const struct lu_env *env, struct mdt_device *mdt,
                         struct getinfo_fid2path *fp);
 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
 static int mdt_fid2path(const struct lu_env *env, struct mdt_device *mdt,
                         struct getinfo_fid2path *fp);
@@ -329,7 +235,7 @@ static void mdt_lock_pdo_mode(struct mdt_thread_info *info, struct mdt_object *o
         EXIT;
 }
 
         EXIT;
 }
 
-static int mdt_getstatus(struct mdt_thread_info *info)
+int mdt_getstatus(struct mdt_thread_info *info)
 {
         struct mdt_device *mdt  = info->mti_mdt;
         struct md_device  *next = mdt->mdt_child;
 {
         struct mdt_device *mdt  = info->mti_mdt;
         struct md_device  *next = mdt->mdt_child;
@@ -374,7 +280,7 @@ static int mdt_getstatus(struct mdt_thread_info *info)
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
-static int mdt_statfs(struct mdt_thread_info *info)
+int mdt_statfs(struct mdt_thread_info *info)
 {
        struct ptlrpc_request           *req = mdt_info_req(info);
        struct md_device                *next = info->mti_mdt->mdt_child;
 {
        struct ptlrpc_request           *req = mdt_info_req(info);
        struct md_device                *next = info->mti_mdt->mdt_child;
@@ -1012,7 +918,7 @@ static int mdt_renew_capa(struct mdt_thread_info *info)
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
-static int mdt_getattr(struct mdt_thread_info *info)
+int mdt_getattr(struct mdt_thread_info *info)
 {
         struct mdt_object       *obj = info->mti_object;
         struct req_capsule      *pill = info->mti_pill;
 {
         struct mdt_object       *obj = info->mti_object;
         struct req_capsule      *pill = info->mti_pill;
@@ -1079,7 +985,7 @@ out_shrink:
         return rc;
 }
 
         return rc;
 }
 
-static int mdt_is_subdir(struct mdt_thread_info *info)
+int mdt_is_subdir(struct mdt_thread_info *info)
 {
         struct mdt_object     *o = info->mti_object;
         struct req_capsule    *pill = info->mti_pill;
 {
         struct mdt_object     *o = info->mti_object;
         struct req_capsule    *pill = info->mti_pill;
@@ -1427,7 +1333,7 @@ out_parent:
 }
 
 /* normal handler: should release the child lock */
 }
 
 /* normal handler: should release the child lock */
-static int mdt_getattr_name(struct mdt_thread_info *info)
+int mdt_getattr_name(struct mdt_thread_info *info)
 {
         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
         struct mdt_body        *reqbody;
 {
         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
         struct mdt_body        *reqbody;
@@ -1463,17 +1369,10 @@ out_shrink:
         return rc;
 }
 
         return rc;
 }
 
-static const struct lu_device_operations mdt_lu_ops;
-
-static int lu_device_is_mdt(struct lu_device *d)
-{
-        return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
-}
-
 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                          void *karg, void *uarg);
 
 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                          void *karg, void *uarg);
 
-static int mdt_set_info(struct mdt_thread_info *info)
+int mdt_set_info(struct mdt_thread_info *info)
 {
         struct ptlrpc_request *req = mdt_info_req(info);
         char *key;
 {
         struct ptlrpc_request *req = mdt_info_req(info);
         char *key;
@@ -1540,7 +1439,7 @@ static int mdt_set_info(struct mdt_thread_info *info)
 /**
  * Top-level handler for MDT connection requests.
  */
 /**
  * Top-level handler for MDT connection requests.
  */
-static int mdt_connect(struct mdt_thread_info *info)
+int mdt_connect(struct mdt_thread_info *info)
 {
        int rc;
        struct obd_connect_data *reply;
 {
        int rc;
        struct obd_connect_data *reply;
@@ -1576,7 +1475,7 @@ static int mdt_connect(struct mdt_thread_info *info)
        return rc;
 }
 
        return rc;
 }
 
-static int mdt_disconnect(struct mdt_thread_info *info)
+int mdt_disconnect(struct mdt_thread_info *info)
 {
         int rc;
         ENTRY;
 {
         int rc;
         ENTRY;
@@ -1622,7 +1521,7 @@ static int mdt_sendpage(struct mdt_thread_info *info,
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
-static int mdt_readpage(struct mdt_thread_info *info)
+int mdt_readpage(struct mdt_thread_info *info)
 {
         struct mdt_object *object = info->mti_object;
         struct lu_rdpg    *rdpg = &info->mti_u.rdpg.mti_rdpg;
 {
         struct mdt_object *object = info->mti_object;
         struct lu_rdpg    *rdpg = &info->mti_u.rdpg.mti_rdpg;
@@ -1781,7 +1680,7 @@ static long mdt_reint_opcode(struct mdt_thread_info *info,
         return opc;
 }
 
         return opc;
 }
 
-static int mdt_reint(struct mdt_thread_info *info)
+int mdt_reint(struct mdt_thread_info *info)
 {
         long opc;
         int  rc;
 {
         long opc;
         int  rc;
@@ -1842,7 +1741,7 @@ static int mdt_object_sync(struct mdt_thread_info *info)
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
-static int mdt_sync(struct mdt_thread_info *info)
+int mdt_sync(struct mdt_thread_info *info)
 {
         struct ptlrpc_request *req = mdt_info_req(info);
         struct req_capsule *pill = info->mti_pill;
 {
         struct ptlrpc_request *req = mdt_info_req(info);
         struct req_capsule *pill = info->mti_pill;
@@ -1900,7 +1799,7 @@ static int mdt_sync(struct mdt_thread_info *info)
  * Quotacheck handler.
  * in-kernel quotacheck isn't supported any more.
  */
  * Quotacheck handler.
  * in-kernel quotacheck isn't supported any more.
  */
-static int mdt_quotacheck(struct mdt_thread_info *info)
+int mdt_quotacheck(struct mdt_thread_info *info)
 {
        struct obd_quotactl     *oqctl;
        int                      rc;
 {
        struct obd_quotactl     *oqctl;
        int                      rc;
@@ -1922,7 +1821,7 @@ static int mdt_quotacheck(struct mdt_thread_info *info)
  * Handle quota control requests to consult current usage/limit, but also
  * to configure quota enforcement
  */
  * Handle quota control requests to consult current usage/limit, but also
  * to configure quota enforcement
  */
-static int mdt_quotactl(struct mdt_thread_info *info)
+int mdt_quotactl(struct mdt_thread_info *info)
 {
        struct obd_export       *exp  = info->mti_exp;
        struct req_capsule      *pill = info->mti_pill;
 {
        struct obd_export       *exp  = info->mti_exp;
        struct req_capsule      *pill = info->mti_pill;
@@ -2029,7 +1928,7 @@ static int mdt_quotactl(struct mdt_thread_info *info)
 /*
  * OBD PING and other handlers.
  */
 /*
  * OBD PING and other handlers.
  */
-static int mdt_obd_ping(struct mdt_thread_info *info)
+int mdt_obd_ping(struct mdt_thread_info *info)
 {
         int rc;
         ENTRY;
 {
         int rc;
         ENTRY;
@@ -2045,7 +1944,7 @@ static int mdt_obd_ping(struct mdt_thread_info *info)
 /*
  * OBD_IDX_READ handler
  */
 /*
  * OBD_IDX_READ handler
  */
-static int mdt_obd_idx_read(struct mdt_thread_info *info)
+int mdt_obd_idx_read(struct mdt_thread_info *info)
 {
        struct mdt_device       *mdt = info->mti_mdt;
        struct lu_rdpg          *rdpg = &info->mti_u.rdpg.mti_rdpg;
 {
        struct mdt_device       *mdt = info->mti_mdt;
        struct lu_rdpg          *rdpg = &info->mti_u.rdpg.mti_rdpg;
@@ -2125,17 +2024,16 @@ out:
        return rc;
 }
 
        return rc;
 }
 
-static int mdt_obd_log_cancel(struct mdt_thread_info *info)
+int mdt_obd_log_cancel(struct mdt_thread_info *info)
 {
         return err_serious(-EOPNOTSUPP);
 }
 
 {
         return err_serious(-EOPNOTSUPP);
 }
 
-static int mdt_obd_qc_callback(struct mdt_thread_info *info)
+int mdt_obd_qc_callback(struct mdt_thread_info *info)
 {
         return err_serious(-EOPNOTSUPP);
 }
 
 {
         return err_serious(-EOPNOTSUPP);
 }
 
-
 /*
  * LLOG handlers.
  */
 /*
  * LLOG handlers.
  */
@@ -2184,7 +2082,7 @@ static int mdt_llog_ctxt_unclone(const struct lu_env *env,
         return 0;
 }
 
         return 0;
 }
 
-static int mdt_llog_create(struct mdt_thread_info *info)
+int mdt_llog_create(struct mdt_thread_info *info)
 {
        int rc;
 
 {
        int rc;
 
@@ -2193,7 +2091,7 @@ static int mdt_llog_create(struct mdt_thread_info *info)
        return (rc < 0 ? err_serious(rc) : rc);
 }
 
        return (rc < 0 ? err_serious(rc) : rc);
 }
 
-static int mdt_llog_destroy(struct mdt_thread_info *info)
+int mdt_llog_destroy(struct mdt_thread_info *info)
 {
         int rc;
 
 {
         int rc;
 
@@ -2202,7 +2100,7 @@ static int mdt_llog_destroy(struct mdt_thread_info *info)
         return (rc < 0 ? err_serious(rc) : rc);
 }
 
         return (rc < 0 ? err_serious(rc) : rc);
 }
 
-static int mdt_llog_read_header(struct mdt_thread_info *info)
+int mdt_llog_read_header(struct mdt_thread_info *info)
 {
         int rc;
 
 {
         int rc;
 
@@ -2211,7 +2109,7 @@ static int mdt_llog_read_header(struct mdt_thread_info *info)
         return (rc < 0 ? err_serious(rc) : rc);
 }
 
         return (rc < 0 ? err_serious(rc) : rc);
 }
 
-static int mdt_llog_next_block(struct mdt_thread_info *info)
+int mdt_llog_next_block(struct mdt_thread_info *info)
 {
         int rc;
 
 {
         int rc;
 
@@ -2220,7 +2118,7 @@ static int mdt_llog_next_block(struct mdt_thread_info *info)
         return (rc < 0 ? err_serious(rc) : rc);
 }
 
         return (rc < 0 ? err_serious(rc) : rc);
 }
 
-static int mdt_llog_prev_block(struct mdt_thread_info *info)
+int mdt_llog_prev_block(struct mdt_thread_info *info)
 {
         int rc;
 
 {
         int rc;
 
@@ -2239,7 +2137,7 @@ static struct ldlm_callback_suite cbs = {
        .lcs_glimpse    = ldlm_server_glimpse_ast
 };
 
        .lcs_glimpse    = ldlm_server_glimpse_ast
 };
 
-static int mdt_enqueue(struct mdt_thread_info *info)
+int mdt_enqueue(struct mdt_thread_info *info)
 {
         struct ptlrpc_request *req;
         int rc;
 {
         struct ptlrpc_request *req;
         int rc;
@@ -2257,7 +2155,7 @@ static int mdt_enqueue(struct mdt_thread_info *info)
         return rc ? err_serious(rc) : req->rq_status;
 }
 
         return rc ? err_serious(rc) : req->rq_status;
 }
 
-static int mdt_convert(struct mdt_thread_info *info)
+int mdt_convert(struct mdt_thread_info *info)
 {
         int rc;
         struct ptlrpc_request *req;
 {
         int rc;
         struct ptlrpc_request *req;
@@ -2268,14 +2166,14 @@ static int mdt_convert(struct mdt_thread_info *info)
         return rc ? err_serious(rc) : req->rq_status;
 }
 
         return rc ? err_serious(rc) : req->rq_status;
 }
 
-static int mdt_bl_callback(struct mdt_thread_info *info)
+int mdt_bl_callback(struct mdt_thread_info *info)
 {
         CERROR("bl callbacks should not happen on MDS\n");
         LBUG();
         return err_serious(-EOPNOTSUPP);
 }
 
 {
         CERROR("bl callbacks should not happen on MDS\n");
         LBUG();
         return err_serious(-EOPNOTSUPP);
 }
 
-static int mdt_cp_callback(struct mdt_thread_info *info)
+int mdt_cp_callback(struct mdt_thread_info *info)
 {
         CERROR("cp callbacks should not happen on MDS\n");
         LBUG();
 {
         CERROR("cp callbacks should not happen on MDS\n");
         LBUG();
@@ -2285,7 +2183,7 @@ static int mdt_cp_callback(struct mdt_thread_info *info)
 /*
  * sec context handlers
  */
 /*
  * sec context handlers
  */
-static int mdt_sec_ctx_handle(struct mdt_thread_info *info)
+int mdt_sec_ctx_handle(struct mdt_thread_info *info)
 {
         int rc;
 
 {
         int rc;
 
@@ -2308,7 +2206,7 @@ static int mdt_sec_ctx_handle(struct mdt_thread_info *info)
 /*
  * quota request handlers
  */
 /*
  * quota request handlers
  */
-static int mdt_quota_dqacq(struct mdt_thread_info *info)
+int mdt_quota_dqacq(struct mdt_thread_info *info)
 {
        struct lu_device        *qmt = info->mti_mdt->mdt_qmt_dev;
        int                      rc;
 {
        struct lu_device        *qmt = info->mti_mdt->mdt_qmt_dev;
        int                      rc;
@@ -2961,7 +2859,6 @@ static void mdt_thread_info_init(struct ptlrpc_request *req,
                                  struct mdt_thread_info *info)
 {
         int i;
                                  struct mdt_thread_info *info)
 {
         int i;
-        struct md_capainfo *ci;
 
         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
         info->mti_pill = &req->rq_pill;
 
         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
         info->mti_pill = &req->rq_pill;
@@ -2977,18 +2874,6 @@ static void mdt_thread_info_init(struct ptlrpc_request *req,
         } else
                 info->mti_mdt = NULL;
         info->mti_env = req->rq_svc_thread->t_env;
         } else
                 info->mti_mdt = NULL;
         info->mti_env = req->rq_svc_thread->t_env;
-        ci = md_capainfo(info->mti_env);
-        memset(ci, 0, sizeof *ci);
-        if (req->rq_export) {
-                if (exp_connect_rmtclient(req->rq_export))
-                        ci->mc_auth = LC_ID_CONVERT;
-                else if (req->rq_export->exp_connect_flags &
-                         OBD_CONNECT_MDS_CAPA)
-                        ci->mc_auth = LC_ID_PLAIN;
-                else
-                        ci->mc_auth = LC_ID_NONE;
-        }
-
         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
         info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
         info->mti_mos = NULL;
         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
         info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
         info->mti_mos = NULL;
@@ -3259,8 +3144,8 @@ static int mdt_handle0(struct ptlrpc_request *req,
  * XXX common "target" functionality should be factored into separate module
  * shared by mdt, ost and stand-alone services like fld.
  */
  * XXX common "target" functionality should be factored into separate module
  * shared by mdt, ost and stand-alone services like fld.
  */
-static int mdt_handle_common(struct ptlrpc_request *req,
-                             struct mdt_opc_slice *supported)
+int mdt_handle_common(struct ptlrpc_request *req,
+                     struct mdt_opc_slice *supported)
 {
         struct lu_env          *env;
         struct mdt_thread_info *info;
 {
         struct lu_env          *env;
         struct mdt_thread_info *info;
@@ -3268,6 +3153,11 @@ static int mdt_handle_common(struct ptlrpc_request *req,
         ENTRY;
 
         env = req->rq_svc_thread->t_env;
         ENTRY;
 
         env = req->rq_svc_thread->t_env;
+       /* Refill (initialize) the context (mdt_thread_info) in case it is
+        * not initialized yet. This usually happens during startup: after
+        * the MDS (ptlrpc threads) has started, it may receive the first
+        * CONNECT request before mdt_thread_info is initialized. */
+       lu_env_refill(env);
         LASSERT(env != NULL);
         LASSERT(env->le_ses != NULL);
         LASSERT(env->le_ctx.lc_thread == req->rq_svc_thread);
         LASSERT(env != NULL);
         LASSERT(env->le_ses != NULL);
         LASSERT(env->le_ctx.lc_thread == req->rq_svc_thread);
@@ -3306,41 +3196,6 @@ int mdt_recovery_handle(struct ptlrpc_request *req)
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
-static int mdt_regular_handle(struct ptlrpc_request *req)
-{
-        return mdt_handle_common(req, mdt_regular_handlers);
-}
-
-static int mdt_readpage_handle(struct ptlrpc_request *req)
-{
-        return mdt_handle_common(req, mdt_readpage_handlers);
-}
-
-static int mdt_xmds_handle(struct ptlrpc_request *req)
-{
-        return mdt_handle_common(req, mdt_xmds_handlers);
-}
-
-static int mdt_mdsc_handle(struct ptlrpc_request *req)
-{
-        return mdt_handle_common(req, mdt_seq_handlers);
-}
-
-static int mdt_mdss_handle(struct ptlrpc_request *req)
-{
-        return mdt_handle_common(req, mdt_seq_handlers);
-}
-
-static int mdt_dtss_handle(struct ptlrpc_request *req)
-{
-        return mdt_handle_common(req, mdt_seq_handlers);
-}
-
-static int mdt_fld_handle(struct ptlrpc_request *req)
-{
-        return mdt_handle_common(req, mdt_fld_handlers);
-}
-
 enum mdt_it_code {
         MDT_IT_OPEN,
         MDT_IT_OCREAT,
 enum mdt_it_code {
         MDT_IT_OPEN,
         MDT_IT_OCREAT,
@@ -4156,375 +4011,6 @@ static int mdt_fld_init(const struct lu_env *env,
         RETURN(0);
 }
 
         RETURN(0);
 }
 
-/* device init/fini methods */
-static void mdt_stop_ptlrpc_service(struct mdt_device *m)
-{
-        ENTRY;
-        if (m->mdt_regular_service != NULL) {
-                ptlrpc_unregister_service(m->mdt_regular_service);
-                m->mdt_regular_service = NULL;
-        }
-        if (m->mdt_readpage_service != NULL) {
-                ptlrpc_unregister_service(m->mdt_readpage_service);
-                m->mdt_readpage_service = NULL;
-        }
-        if (m->mdt_xmds_service != NULL) {
-                ptlrpc_unregister_service(m->mdt_xmds_service);
-                m->mdt_xmds_service = NULL;
-        }
-        if (m->mdt_setattr_service != NULL) {
-                ptlrpc_unregister_service(m->mdt_setattr_service);
-                m->mdt_setattr_service = NULL;
-        }
-        if (m->mdt_mdsc_service != NULL) {
-                ptlrpc_unregister_service(m->mdt_mdsc_service);
-                m->mdt_mdsc_service = NULL;
-        }
-        if (m->mdt_mdss_service != NULL) {
-                ptlrpc_unregister_service(m->mdt_mdss_service);
-                m->mdt_mdss_service = NULL;
-        }
-        if (m->mdt_dtss_service != NULL) {
-                ptlrpc_unregister_service(m->mdt_dtss_service);
-                m->mdt_dtss_service = NULL;
-        }
-        if (m->mdt_fld_service != NULL) {
-                ptlrpc_unregister_service(m->mdt_fld_service);
-                m->mdt_fld_service = NULL;
-        }
-        EXIT;
-}
-
-static int mdt_start_ptlrpc_service(struct mdt_device *m)
-{
-        static struct ptlrpc_service_conf conf;
-        cfs_proc_dir_entry_t *procfs_entry;
-       int rc = 0;
-       ENTRY;
-
-       m->mdt_ldlm_client = &m->mdt_md_dev.md_lu_dev.ld_obd->obd_ldlm_client;
-       ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
-                          "mdt_ldlm_client", m->mdt_ldlm_client);
-
-       procfs_entry = m->mdt_md_dev.md_lu_dev.ld_obd->obd_proc_entry;
-
-       conf = (typeof(conf)) {
-               .psc_name               = LUSTRE_MDT_NAME,
-               .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
-               .psc_buf                = {
-                       .bc_nbufs               = MDS_NBUFS,
-                       .bc_buf_size            = MDS_BUFSIZE,
-                       .bc_req_max_size        = MDS_MAXREQSIZE,
-                       .bc_rep_max_size        = MDS_MAXREPSIZE,
-                       .bc_req_portal          = MDS_REQUEST_PORTAL,
-                       .bc_rep_portal          = MDC_REPLY_PORTAL,
-               },
-               /*
-                * We'd like to have a mechanism to set this on a per-device
-                * basis, but alas...
-                */
-               .psc_thr                = {
-                       .tc_thr_name            = LUSTRE_MDT_NAME,
-                       .tc_thr_factor          = MDT_THR_FACTOR,
-                       .tc_nthrs_init          = MDT_NTHRS_INIT,
-                       .tc_nthrs_base          = MDT_NTHRS_BASE,
-                       .tc_nthrs_max           = MDT_NTHRS_MAX,
-                       .tc_nthrs_user          = mds_num_threads,
-                       .tc_cpu_affinity        = 1,
-                       .tc_ctx_tags            = LCT_MD_THREAD,
-               },
-               .psc_cpt                = {
-                       .cc_pattern             = mds_num_cpts,
-               },
-               .psc_ops                = {
-                       .so_req_handler         = mdt_regular_handle,
-                       .so_req_printer         = target_print_req,
-                       .so_hpreq_handler       = ptlrpc_hpreq_handler,
-               },
-       };
-       m->mdt_regular_service = ptlrpc_register_service(&conf, procfs_entry);
-       if (IS_ERR(m->mdt_regular_service)) {
-               rc = PTR_ERR(m->mdt_regular_service);
-               CERROR("failed to start regular mdt service: %d\n", rc);
-               m->mdt_regular_service = NULL;
-
-               RETURN(rc);
-       }
-
-       /*
-        * readpage service configuration. Parameters have to be adjusted,
-        * ideally.
-        */
-       memset(&conf, 0, sizeof(conf));
-       conf = (typeof(conf)) {
-               .psc_name               = LUSTRE_MDT_NAME "_readpage",
-               .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
-               .psc_buf                = {
-                       .bc_nbufs               = MDS_NBUFS,
-                       .bc_buf_size            = MDS_BUFSIZE,
-                       .bc_req_max_size        = MDS_MAXREQSIZE,
-                       .bc_rep_max_size        = MDS_MAXREPSIZE,
-                       .bc_req_portal          = MDS_READPAGE_PORTAL,
-                       .bc_rep_portal          = MDC_REPLY_PORTAL,
-               },
-               .psc_thr                = {
-                       .tc_thr_name            = "mdt_rdpg",
-                       .tc_thr_factor          = MDT_RDPG_THR_FACTOR,
-                       .tc_nthrs_init          = MDT_RDPG_NTHRS_INIT,
-                       .tc_nthrs_base          = MDT_RDPG_NTHRS_BASE,
-                       .tc_nthrs_max           = MDT_RDPG_NTHRS_MAX,
-                       .tc_nthrs_user          = mds_rdpg_num_threads,
-                       .tc_cpu_affinity        = 1,
-                       .tc_ctx_tags            = LCT_MD_THREAD,
-               },
-               .psc_cpt                = {
-                       .cc_pattern             = mds_rdpg_num_cpts,
-               },
-               .psc_ops                = {
-                       .so_req_handler         = mdt_readpage_handle,
-                       .so_req_printer         = target_print_req,
-               },
-       };
-       m->mdt_readpage_service = ptlrpc_register_service(&conf, procfs_entry);
-       if (IS_ERR(m->mdt_readpage_service)) {
-               rc = PTR_ERR(m->mdt_readpage_service);
-               CERROR("failed to start readpage service: %d\n", rc);
-               m->mdt_readpage_service = NULL;
-
-               GOTO(err_mdt_svc, rc);
-        }
-
-        /*
-         * setattr service configuration.
-         *
-         * XXX To keep the compatibility with old client(< 2.2), we need to
-         * preserve this portal for a certain time, it should be removed
-         * eventually. LU-617.
-         */
-       memset(&conf, 0, sizeof(conf));
-       conf = (typeof(conf)) {
-               .psc_name               = LUSTRE_MDT_NAME "_setattr",
-               .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
-               .psc_buf                = {
-                       .bc_nbufs               = MDS_NBUFS,
-                       .bc_buf_size            = MDS_BUFSIZE,
-                       .bc_req_max_size        = MDS_MAXREQSIZE,
-                       .bc_rep_max_size        = MDS_MAXREPSIZE,
-                       .bc_req_portal          = MDS_SETATTR_PORTAL,
-                       .bc_rep_portal          = MDC_REPLY_PORTAL,
-               },
-               .psc_thr                = {
-                       .tc_thr_name            = "mdt_attr",
-                       .tc_thr_factor          = MDT_SETA_THR_FACTOR,
-                       .tc_nthrs_init          = MDT_SETA_NTHRS_INIT,
-                       .tc_nthrs_base          = MDT_SETA_NTHRS_BASE,
-                       .tc_nthrs_max           = MDT_SETA_NTHRS_MAX,
-                       .tc_nthrs_user          = mds_attr_num_threads,
-                       .tc_cpu_affinity        = 1,
-                       .tc_ctx_tags            = LCT_MD_THREAD,
-               },
-               .psc_cpt                = {
-                       .cc_pattern             = mds_attr_num_cpts,
-               },
-               .psc_ops                = {
-                       .so_req_handler         = mdt_regular_handle,
-                       .so_req_printer         = target_print_req,
-               },
-       };
-       m->mdt_setattr_service = ptlrpc_register_service(&conf, procfs_entry);
-       if (IS_ERR(m->mdt_setattr_service)) {
-               rc = PTR_ERR(m->mdt_setattr_service);
-               CERROR("failed to start setattr service: %d\n", rc);
-               m->mdt_setattr_service = NULL;
-
-               GOTO(err_mdt_svc, rc);
-       }
-
-       /*
-        * sequence controller service configuration
-        */
-       memset(&conf, 0, sizeof(conf));
-       conf = (typeof(conf)) {
-               .psc_name               = LUSTRE_MDT_NAME "_mdsc",
-               .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
-               .psc_buf                = {
-                       .bc_nbufs               = MDS_NBUFS,
-                       .bc_buf_size            = MDS_BUFSIZE,
-                       .bc_req_max_size        = SEQ_MAXREQSIZE,
-                       .bc_rep_max_size        = SEQ_MAXREPSIZE,
-                       .bc_req_portal          = SEQ_CONTROLLER_PORTAL,
-                       .bc_rep_portal          = MDC_REPLY_PORTAL,
-               },
-               .psc_thr                = {
-                       .tc_thr_name            = "mdt_mdsc",
-                       .tc_nthrs_init          = MDT_OTHR_NTHRS_INIT,
-                       .tc_nthrs_max           = MDT_OTHR_NTHRS_MAX,
-                       .tc_ctx_tags            = LCT_MD_THREAD,
-               },
-               .psc_ops                = {
-                       .so_req_handler         = mdt_mdsc_handle,
-                       .so_req_printer         = target_print_req,
-               },
-       };
-       m->mdt_mdsc_service = ptlrpc_register_service(&conf, procfs_entry);
-       if (IS_ERR(m->mdt_mdsc_service)) {
-               rc = PTR_ERR(m->mdt_mdsc_service);
-               CERROR("failed to start seq controller service: %d\n", rc);
-               m->mdt_mdsc_service = NULL;
-
-               GOTO(err_mdt_svc, rc);
-       }
-
-       /*
-        * metadata sequence server service configuration
-        */
-       memset(&conf, 0, sizeof(conf));
-       conf = (typeof(conf)) {
-               .psc_name               = LUSTRE_MDT_NAME "_mdss",
-               .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
-               .psc_buf                = {
-                       .bc_nbufs               = MDS_NBUFS,
-                       .bc_buf_size            = MDS_BUFSIZE,
-                       .bc_req_max_size        = SEQ_MAXREQSIZE,
-                       .bc_rep_max_size        = SEQ_MAXREPSIZE,
-                       .bc_req_portal          = SEQ_METADATA_PORTAL,
-                       .bc_rep_portal          = MDC_REPLY_PORTAL,
-               },
-               .psc_thr                = {
-                       .tc_thr_name            = "mdt_mdss",
-                       .tc_nthrs_init          = MDT_OTHR_NTHRS_INIT,
-                       .tc_nthrs_max           = MDT_OTHR_NTHRS_MAX,
-                       .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD
-               },
-               .psc_ops                = {
-                       .so_req_handler         = mdt_mdss_handle,
-                       .so_req_printer         = target_print_req,
-               },
-        };
-       m->mdt_mdss_service = ptlrpc_register_service(&conf, procfs_entry);
-       if (IS_ERR(m->mdt_mdss_service)) {
-               rc = PTR_ERR(m->mdt_mdss_service);
-               CERROR("failed to start metadata seq server service: %d\n", rc);
-               m->mdt_mdss_service = NULL;
-
-               GOTO(err_mdt_svc, rc);
-       }
-
-       /*
-        * Data sequence server service configuration. We want to have really
-        * cluster-wide sequences space. This is why we start only one sequence
-        * controller which manages space.
-        */
-       memset(&conf, 0, sizeof(conf));
-       conf = (typeof(conf)) {
-               .psc_name               = LUSTRE_MDT_NAME "_dtss",
-               .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
-               .psc_buf                = {
-                       .bc_nbufs               = MDS_NBUFS,
-                       .bc_buf_size            = MDS_BUFSIZE,
-                       .bc_req_max_size        = SEQ_MAXREQSIZE,
-                       .bc_rep_max_size        = SEQ_MAXREPSIZE,
-                       .bc_req_portal          = SEQ_DATA_PORTAL,
-                       .bc_rep_portal          = OSC_REPLY_PORTAL,
-               },
-               .psc_thr                = {
-                       .tc_thr_name            = "mdt_dtss",
-                       .tc_nthrs_init          = MDT_OTHR_NTHRS_INIT,
-                       .tc_nthrs_max           = MDT_OTHR_NTHRS_MAX,
-                       .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD
-               },
-               .psc_ops                = {
-                       .so_req_handler         = mdt_dtss_handle,
-                       .so_req_printer         = target_print_req,
-               },
-        };
-       m->mdt_dtss_service = ptlrpc_register_service(&conf, procfs_entry);
-       if (IS_ERR(m->mdt_dtss_service)) {
-               rc = PTR_ERR(m->mdt_dtss_service);
-               CERROR("failed to start data seq server service: %d\n", rc);
-               m->mdt_dtss_service = NULL;
-
-               GOTO(err_mdt_svc, rc);
-       }
-
-       /* FLD service start */
-       memset(&conf, 0, sizeof(conf));
-       conf = (typeof(conf)) {
-               .psc_name            = LUSTRE_MDT_NAME "_fld",
-                .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
-               .psc_buf                = {
-                       .bc_nbufs               = MDS_NBUFS,
-                       .bc_buf_size            = MDS_BUFSIZE,
-                       .bc_req_max_size        = FLD_MAXREQSIZE,
-                       .bc_rep_max_size        = FLD_MAXREPSIZE,
-                       .bc_req_portal          = FLD_REQUEST_PORTAL,
-                       .bc_rep_portal          = MDC_REPLY_PORTAL,
-               },
-               .psc_thr                = {
-                       .tc_thr_name            = "mdt_fld",
-                       .tc_nthrs_init          = MDT_OTHR_NTHRS_INIT,
-                       .tc_nthrs_max           = MDT_OTHR_NTHRS_MAX,
-                       .tc_ctx_tags            = LCT_DT_THREAD | LCT_MD_THREAD
-               },
-               .psc_ops                = {
-                       .so_req_handler         = mdt_fld_handle,
-                       .so_req_printer         = target_print_req,
-               },
-       };
-       m->mdt_fld_service = ptlrpc_register_service(&conf, procfs_entry);
-       if (IS_ERR(m->mdt_fld_service)) {
-               rc = PTR_ERR(m->mdt_fld_service);
-               CERROR("failed to start fld service: %d\n", rc);
-               m->mdt_fld_service = NULL;
-
-               GOTO(err_mdt_svc, rc);
-       }
-
-       /*
-        * mds-mds service configuration. Separate portal is used to allow
-        * mds-mds requests be not blocked during recovery.
-        */
-       memset(&conf, 0, sizeof(conf));
-       conf = (typeof(conf)) {
-               .psc_name               = LUSTRE_MDT_NAME "_mds",
-               .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
-               .psc_buf                = {
-                       .bc_nbufs               = MDS_NBUFS,
-                       .bc_buf_size            = MDS_BUFSIZE,
-                       .bc_req_max_size        = MDS_MAXREQSIZE,
-                       .bc_rep_max_size        = MDS_MAXREPSIZE,
-                       .bc_req_portal          = MDS_MDS_PORTAL,
-                       .bc_rep_portal          = MDC_REPLY_PORTAL,
-               },
-               .psc_thr                = {
-                       .tc_thr_name            = "mdt_mds",
-                       .tc_nthrs_init          = MDT_OTHR_NTHRS_INIT,
-                       .tc_nthrs_max           = MDT_OTHR_NTHRS_MAX,
-                       .tc_ctx_tags            = LCT_MD_THREAD,
-               },
-               .psc_ops                = {
-                       .so_req_handler         = mdt_xmds_handle,
-                       .so_req_printer         = target_print_req,
-                       .so_hpreq_handler       = ptlrpc_hpreq_handler,
-               },
-       };
-       m->mdt_xmds_service = ptlrpc_register_service(&conf, procfs_entry);
-       if (IS_ERR(m->mdt_xmds_service)) {
-               rc = PTR_ERR(m->mdt_xmds_service);
-               CERROR("failed to start xmds service: %d\n", rc);
-               m->mdt_xmds_service = NULL;
-
-               GOTO(err_mdt_svc, rc);
-        }
-
-        EXIT;
-err_mdt_svc:
-        if (rc)
-                mdt_stop_ptlrpc_service(m);
-
-        return rc;
-}
-
 static void mdt_stack_fini(const struct lu_env *env,
                            struct mdt_device *m, struct lu_device *top)
 {
 static void mdt_stack_fini(const struct lu_env *env,
                            struct mdt_device *m, struct lu_device *top)
 {
@@ -4909,7 +4395,6 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
 
         ping_evictor_stop();
 
 
         ping_evictor_stop();
 
-        mdt_stop_ptlrpc_service(m);
         mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
         obd_exports_barrier(obd);
         obd_zombie_barrier();
         mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
         obd_exports_barrier(obd);
         obd_zombie_barrier();
@@ -5163,9 +4648,9 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        if (rc)
                GOTO(err_procfs, rc);
 
        if (rc)
                GOTO(err_procfs, rc);
 
-        rc = mdt_start_ptlrpc_service(m);
-        if (rc)
-               GOTO(err_quota, rc);
+       m->mdt_ldlm_client = &mdt2obd_dev(m)->obd_ldlm_client;
+       ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
+                          "mdt_ldlm_client", m->mdt_ldlm_client);
 
         ping_evictor_start();
 
 
         ping_evictor_start();
 
@@ -5183,10 +4668,6 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
 
         RETURN(0);
 
 
         RETURN(0);
 
-        ping_evictor_stop();
-        mdt_stop_ptlrpc_service(m);
-err_quota:
-       mdt_quota_fini(env, m);
 err_procfs:
         mdt_procfs_fini(m);
 err_recovery:
 err_procfs:
         mdt_procfs_fini(m);
 err_recovery:
@@ -5427,7 +4908,7 @@ static int mdt_prepare(const struct lu_env *env,
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
-static const struct lu_device_operations mdt_lu_ops = {
+const struct lu_device_operations mdt_lu_ops = {
         .ldo_object_alloc   = mdt_object_alloc,
         .ldo_process_config = mdt_process_config,
        .ldo_prepare        = mdt_prepare,
         .ldo_object_alloc   = mdt_object_alloc,
         .ldo_process_config = mdt_process_config,
        .ldo_prepare        = mdt_prepare,
@@ -5906,7 +5387,7 @@ static int mdt_fid2path(const struct lu_env *env, struct mdt_device *mdt,
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
-static int mdt_get_info(struct mdt_thread_info *info)
+int mdt_get_info(struct mdt_thread_info *info)
 {
         struct ptlrpc_request *req = mdt_info_req(info);
         char *key;
 {
         struct ptlrpc_request *req = mdt_info_req(info);
         char *key;
@@ -6284,16 +5765,7 @@ int mdt_cos_is_enabled(struct mdt_device *mdt)
         return mdt->mdt_opts.mo_cos != 0;
 }
 
         return mdt->mdt_opts.mo_cos != 0;
 }
 
-/* type constructor/destructor: mdt_type_init, mdt_type_fini */
-LU_TYPE_INIT_FINI(mdt, &mdt_thread_key);
-
 static struct lu_device_type_operations mdt_device_type_ops = {
 static struct lu_device_type_operations mdt_device_type_ops = {
-        .ldto_init = mdt_type_init,
-        .ldto_fini = mdt_type_fini,
-
-        .ldto_start = mdt_type_start,
-        .ldto_stop  = mdt_type_stop,
-
         .ldto_device_alloc = mdt_device_alloc,
         .ldto_device_free  = mdt_device_free,
         .ldto_device_fini  = mdt_device_fini
         .ldto_device_alloc = mdt_device_alloc,
         .ldto_device_free  = mdt_device_free,
         .ldto_device_fini  = mdt_device_fini
@@ -6308,260 +5780,39 @@ static struct lu_device_type mdt_device_type = {
 
 static int __init mdt_mod_init(void)
 {
 
 static int __init mdt_mod_init(void)
 {
-        struct lprocfs_static_vars lvars;
-        int rc;
+       struct lprocfs_static_vars lvars;
+       int rc;
 
        rc = lu_kmem_init(mdt_caches);
        if (rc)
                return rc;
 
 
        rc = lu_kmem_init(mdt_caches);
        if (rc)
                return rc;
 
-       if (mdt_num_threads != 0 && mds_num_threads == 0) {
-               LCONSOLE_INFO("mdt_num_threads module parameter is deprecated,"
-                             "use mds_num_threads instead or unset both for"
-                             "dynamic thread startup\n");
-               mds_num_threads = mdt_num_threads;
-       }
-
-        lprocfs_mdt_init_vars(&lvars);
-        rc = class_register_type(&mdt_obd_device_ops, NULL,
-                                 lvars.module_vars, LUSTRE_MDT_NAME,
-                                 &mdt_device_type);
+       rc = mds_mod_init();
+       if (rc)
+               GOTO(lu_fini, rc);
 
 
+       lprocfs_mdt_init_vars(&lvars);
+       rc = class_register_type(&mdt_obd_device_ops, NULL,
+                                lvars.module_vars, LUSTRE_MDT_NAME,
+                                &mdt_device_type);
+       if (rc)
+               GOTO(mds_fini, rc);
+mds_fini:      /* reached only after mds_mod_init() succeeded: undo it on error */
+       if (rc)
+               mds_mod_exit();
+lu_fini:       /* undo lu_kmem_init() on error; rc is 0 on the success path */
        if (rc)
                lu_kmem_fini(mdt_caches);
        if (rc)
                lu_kmem_fini(mdt_caches);
-        return rc;
+       return rc;
 }
 
 static void __exit mdt_mod_exit(void)
 {
 }
 
 static void __exit mdt_mod_exit(void)
 {
-        class_unregister_type(LUSTRE_MDT_NAME);
+       class_unregister_type(LUSTRE_MDT_NAME);
+       mds_mod_exit();
        lu_kmem_fini(mdt_caches);
 }
 
        lu_kmem_fini(mdt_caches);
 }
 
-#define DEFINE_RPC_HANDLER(base, flags, opc, fn, fmt)                  \
-[opc - base] = {                                                       \
-       .mh_name        = #opc,                                         \
-       .mh_fail_id     = OBD_FAIL_ ## opc ## _NET,                     \
-       .mh_opc         = opc,                                          \
-       .mh_flags       = flags,                                        \
-       .mh_act         = fn,                                           \
-       .mh_fmt         = fmt                                           \
-}
-
-/* Request with a format known in advance */
-#define DEF_MDT_HDL(flags, name, fn)                                   \
-       DEFINE_RPC_HANDLER(MDS_GETATTR, flags, name, fn, &RQF_ ## name)
-
-/* Request with a format we do not yet know */
-#define DEF_MDT_HDL_VAR(flags, name, fn)                               \
-       DEFINE_RPC_HANDLER(MDS_GETATTR, flags, name, fn, NULL)
-
-/* Map one non-standard request format handler.  This should probably get
- * a common OBD_SET_INFO RPC opcode instead of this mismatch. */
-#define RQF_MDS_SET_INFO RQF_OBD_SET_INFO
-
-static struct mdt_handler mdt_mds_ops[] = {
-DEF_MDT_HDL(0,                         MDS_CONNECT,      mdt_connect),
-DEF_MDT_HDL(0,                         MDS_DISCONNECT,   mdt_disconnect),
-DEF_MDT_HDL(0,                         MDS_SET_INFO,     mdt_set_info),
-DEF_MDT_HDL(0,                         MDS_GET_INFO,     mdt_get_info),
-DEF_MDT_HDL(0          | HABEO_REFERO, MDS_GETSTATUS,    mdt_getstatus),
-DEF_MDT_HDL(HABEO_CORPUS,              MDS_GETATTR,      mdt_getattr),
-DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO,        MDS_GETATTR_NAME, mdt_getattr_name),
-DEF_MDT_HDL(HABEO_CORPUS,              MDS_GETXATTR,     mdt_getxattr),
-DEF_MDT_HDL(0          | HABEO_REFERO, MDS_STATFS,       mdt_statfs),
-DEF_MDT_HDL(0          | MUTABOR,      MDS_REINT,        mdt_reint),
-DEF_MDT_HDL(HABEO_CORPUS,              MDS_CLOSE,        mdt_close),
-DEF_MDT_HDL(HABEO_CORPUS,              MDS_DONE_WRITING, mdt_done_writing),
-DEF_MDT_HDL(0          | HABEO_REFERO, MDS_PIN,          mdt_pin),
-DEF_MDT_HDL_VAR(0,                     MDS_SYNC,         mdt_sync),
-DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO,        MDS_IS_SUBDIR,    mdt_is_subdir),
-DEF_MDT_HDL(0,                         MDS_QUOTACHECK,   mdt_quotacheck),
-DEF_MDT_HDL(0,                         MDS_QUOTACTL,     mdt_quotactl)
-};
-
-#define DEF_OBD_HDL(flags, name, fn)                                   \
-       DEFINE_RPC_HANDLER(OBD_PING, flags, name, fn, NULL)
-
-static struct mdt_handler mdt_obd_ops[] = {
-DEF_OBD_HDL(0,                         OBD_PING,         mdt_obd_ping),
-DEF_OBD_HDL(0,                         OBD_LOG_CANCEL,   mdt_obd_log_cancel),
-DEF_OBD_HDL(0,                         OBD_QC_CALLBACK,  mdt_obd_qc_callback),
-DEF_OBD_HDL(0,                         OBD_IDX_READ,     mdt_obd_idx_read)
-};
-
-#define DEF_DLM_HDL_VAR(flags, name, fn)                               \
-       DEFINE_RPC_HANDLER(LDLM_ENQUEUE, flags, name, fn, NULL)
-#define DEF_DLM_HDL(flags, name, fn)                                   \
-       DEFINE_RPC_HANDLER(LDLM_ENQUEUE, flags, name, fn, &RQF_ ## name)
-
-static struct mdt_handler mdt_dlm_ops[] = {
-DEF_DLM_HDL    (HABEO_CLAVIS,          LDLM_ENQUEUE,     mdt_enqueue),
-DEF_DLM_HDL_VAR(HABEO_CLAVIS,          LDLM_CONVERT,     mdt_convert),
-DEF_DLM_HDL_VAR(0,                     LDLM_BL_CALLBACK, mdt_bl_callback),
-DEF_DLM_HDL_VAR(0,                     LDLM_CP_CALLBACK, mdt_cp_callback)
-};
-
-#define DEF_LLOG_HDL(flags, name, fn)                                  \
-       DEFINE_RPC_HANDLER(LLOG_ORIGIN_HANDLE_CREATE, flags, name, fn, NULL)
-
-static struct mdt_handler mdt_llog_ops[] = {
-DEF_LLOG_HDL(0,                LLOG_ORIGIN_HANDLE_CREATE,        mdt_llog_create),
-DEF_LLOG_HDL(0,                LLOG_ORIGIN_HANDLE_NEXT_BLOCK,    mdt_llog_next_block),
-DEF_LLOG_HDL(0,                LLOG_ORIGIN_HANDLE_READ_HEADER,   mdt_llog_read_header),
-DEF_LLOG_HDL(0,                LLOG_ORIGIN_HANDLE_WRITE_REC,     NULL),
-DEF_LLOG_HDL(0,                LLOG_ORIGIN_HANDLE_CLOSE,         NULL),
-DEF_LLOG_HDL(0,                LLOG_ORIGIN_CONNECT,              NULL),
-DEF_LLOG_HDL(0,                LLOG_CATINFO,                     NULL),
-DEF_LLOG_HDL(0,                LLOG_ORIGIN_HANDLE_PREV_BLOCK,    mdt_llog_prev_block),
-DEF_LLOG_HDL(0,                LLOG_ORIGIN_HANDLE_DESTROY,       mdt_llog_destroy),
-};
-
-#define DEF_SEC_HDL(flags, name, fn)                                   \
-       DEFINE_RPC_HANDLER(SEC_CTX_INIT, flags, name, fn, NULL)
-
-static struct mdt_handler mdt_sec_ctx_ops[] = {
-DEF_SEC_HDL(0,                         SEC_CTX_INIT,     mdt_sec_ctx_handle),
-DEF_SEC_HDL(0,                         SEC_CTX_INIT_CONT,mdt_sec_ctx_handle),
-DEF_SEC_HDL(0,                         SEC_CTX_FINI,     mdt_sec_ctx_handle)
-};
-
-#define DEF_QUOTA_HDL(flags, name, fn)                         \
-       DEFINE_RPC_HANDLER(QUOTA_DQACQ, flags, name, fn, &RQF_ ## name)
-
-static struct mdt_handler mdt_quota_ops[] = {
-DEF_QUOTA_HDL(HABEO_REFERO,            QUOTA_DQACQ,      mdt_quota_dqacq),
-};
-
-static struct mdt_opc_slice mdt_regular_handlers[] = {
-        {
-                .mos_opc_start = MDS_GETATTR,
-                .mos_opc_end   = MDS_LAST_OPC,
-                .mos_hs        = mdt_mds_ops
-        },
-        {
-                .mos_opc_start = OBD_PING,
-                .mos_opc_end   = OBD_LAST_OPC,
-                .mos_hs        = mdt_obd_ops
-        },
-        {
-                .mos_opc_start = LDLM_ENQUEUE,
-                .mos_opc_end   = LDLM_LAST_OPC,
-                .mos_hs        = mdt_dlm_ops
-        },
-        {
-                .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
-                .mos_opc_end   = LLOG_LAST_OPC,
-                .mos_hs        = mdt_llog_ops
-        },
-        {
-                .mos_opc_start = SEC_CTX_INIT,
-                .mos_opc_end   = SEC_LAST_OPC,
-                .mos_hs        = mdt_sec_ctx_ops
-        },
-       {
-               .mos_opc_start = QUOTA_DQACQ,
-               .mos_opc_end   = QUOTA_LAST_OPC,
-               .mos_hs        = mdt_quota_ops
-       },
-        {
-                .mos_hs        = NULL
-        }
-};
-
-/* Readpage/readdir handlers */
-static struct mdt_handler mdt_readpage_ops[] = {
-DEF_MDT_HDL(0,                 MDS_CONNECT,  mdt_connect),
-DEF_MDT_HDL(HABEO_CORPUS | HABEO_REFERO, MDS_READPAGE, mdt_readpage),
-/* XXX: this is ugly and should be fixed one day, see mdc_close() for
- * detailed comments. --umka */
-DEF_MDT_HDL(HABEO_CORPUS,              MDS_CLOSE,        mdt_close),
-DEF_MDT_HDL(HABEO_CORPUS,              MDS_DONE_WRITING, mdt_done_writing),
-};
-
-static struct mdt_opc_slice mdt_readpage_handlers[] = {
-        {
-                .mos_opc_start = MDS_GETATTR,
-                .mos_opc_end   = MDS_LAST_OPC,
-                .mos_hs        = mdt_readpage_ops
-        },
-       {
-               .mos_opc_start = OBD_FIRST_OPC,
-               .mos_opc_end   = OBD_LAST_OPC,
-               .mos_hs        = mdt_obd_ops
-       },
-        {
-                .mos_hs        = NULL
-        }
-};
-
-/* Cross MDT operation handlers for DNE */
-static struct mdt_handler mdt_xmds_ops[] = {
-DEF_MDT_HDL(0,                         MDS_CONNECT,      mdt_connect),
-DEF_MDT_HDL(HABEO_CORPUS,              MDS_GETATTR,      mdt_getattr),
-DEF_MDT_HDL(0          | MUTABOR,      MDS_REINT,        mdt_reint),
-DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO,        MDS_IS_SUBDIR,    mdt_is_subdir),
-};
-
-static struct mdt_opc_slice mdt_xmds_handlers[] = {
-        {
-                .mos_opc_start = MDS_GETATTR,
-                .mos_opc_end   = MDS_LAST_OPC,
-                .mos_hs        = mdt_xmds_ops
-        },
-        {
-                .mos_opc_start = OBD_PING,
-                .mos_opc_end   = OBD_LAST_OPC,
-                .mos_hs        = mdt_obd_ops
-        },
-        {
-                .mos_opc_start = SEC_CTX_INIT,
-                .mos_opc_end   = SEC_LAST_OPC,
-                .mos_hs        = mdt_sec_ctx_ops
-        },
-        {
-                .mos_hs        = NULL
-        }
-};
-
-/* Sequence service handlers */
-#define DEF_SEQ_HDL(flags, name, fn)                                   \
-       DEFINE_RPC_HANDLER(SEQ_QUERY, flags, name, fn, &RQF_ ## name)
-
-static struct mdt_handler mdt_seq_ops[] = {
-DEF_SEQ_HDL(0,                         SEQ_QUERY,        (void *)seq_query),
-};
-
-static struct mdt_opc_slice mdt_seq_handlers[] = {
-        {
-                .mos_opc_start = SEQ_QUERY,
-                .mos_opc_end   = SEQ_LAST_OPC,
-                .mos_hs        = mdt_seq_ops
-        },
-        {
-                .mos_hs        = NULL
-        }
-};
-
-/* FID Location Database handlers */
-#define DEF_FLD_HDL(flags, name, fn)                                   \
-       DEFINE_RPC_HANDLER(FLD_QUERY, flags, name, fn, &RQF_ ## name)
-
-static struct mdt_handler mdt_fld_ops[] = {
-DEF_FLD_HDL(0,                         FLD_QUERY,        (void *)fld_query),
-};
-
-static struct mdt_opc_slice mdt_fld_handlers[] = {
-        {
-                .mos_opc_start = FLD_QUERY,
-                .mos_opc_end   = FLD_LAST_OPC,
-                .mos_hs        = mdt_fld_ops
-        },
-        {
-                .mos_hs        = NULL
-        }
-};
-
 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
 MODULE_DESCRIPTION("Lustre Metadata Target ("LUSTRE_MDT_NAME")");
 MODULE_LICENSE("GPL");
 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
 MODULE_DESCRIPTION("Lustre Metadata Target ("LUSTRE_MDT_NAME")");
 MODULE_LICENSE("GPL");
index 2d68060..b3e5a34 100644 (file)
@@ -415,6 +415,61 @@ struct mdt_thread_info {
        char                       mti_xattr_buf[128];
 };
 
        char                       mti_xattr_buf[128];
 };
 
+/* ptlrpc request handler for MDT. All handlers are
+ * grouped into several slices - struct mdt_opc_slice,
+ * and stored in per-service slice arrays such as mdt_regular_handlers[],
+ * mdt_seq_handlers[] and mdt_fld_handlers[].
+ */
+struct mdt_handler {
+       /* The name of this handler. */
+       const char *mh_name;
+       /* Fail id for this handler, checked at the beginning of this handler. */
+       int      mh_fail_id;
+       /* Operation code for this handler */
+       __u32       mh_opc;
+       /* Flags are listed in enum mdt_handler_flags below. */
+       __u32       mh_flags;
+       /* The actual handler function to execute. */
+       int (*mh_act)(struct mdt_thread_info *info);
+       /* Request format for this request; NULL when it is not known in
+        * advance. */
+       const struct req_format *mh_fmt;
+};
+
+/*
+ * Per-handler flags kept in mdt_handler::mh_flags.  Each value is a
+ * distinct bit, so several flags can be OR-ed together for one handler.
+ */
+enum mdt_handler_flags {
+       /*
+        * struct mdt_body is passed in the incoming message, and object
+        * identified by this fid exists on disk.
+        *
+        * "habeo corpus" == "I have a body"
+        */
+       HABEO_CORPUS = (1 << 0),
+       /*
+        * struct ldlm_request is passed in the incoming message.
+        *
+        * "habeo clavis" == "I have a key"
+        */
+       HABEO_CLAVIS = (1 << 1),
+       /*
+        * this request has fixed reply format, so that reply message can be
+        * packed by generic code.
+        *
+        * "habeo refero" == "I have a reply"
+        */
+       HABEO_REFERO = (1 << 2),
+       /*
+        * this request will modify something, so check whether the filesystem
+        * is readonly or not, then return -EROFS to client asap if necessary.
+        *
+        * "mutabor" == "I shall modify"
+        */
+       MUTABOR      = (1 << 3)
+};
+
+/*
+ * One range of opcodes together with the handler table serving it.
+ * Arrays of slices are terminated by an entry whose mos_hs is NULL.
+ */
+struct mdt_opc_slice {
+       /* First opcode of the range. */
+       __u32                   mos_opc_start;
+       /* End of the range; the slice tables set this to a *_LAST_OPC value.
+        * NOTE(review): presumably exclusive - confirm against the
+        * dispatcher. */
+       int                     mos_opc_end;
+       /* Handler table for this range. */
+       struct mdt_handler      *mos_hs;
+};
+
 static inline const struct md_device_operations *
 mdt_child_ops(struct mdt_device * m)
 {
 static inline const struct md_device_operations *
 mdt_child_ops(struct mdt_device * m)
 {
@@ -624,6 +679,45 @@ int mdt_version_get_check(struct mdt_thread_info *, struct mdt_object *, int);
 void mdt_version_get_save(struct mdt_thread_info *, struct mdt_object *, int);
 int mdt_version_get_check_save(struct mdt_thread_info *, struct mdt_object *,
                                int);
 void mdt_version_get_save(struct mdt_thread_info *, struct mdt_object *, int);
 int mdt_version_get_check_save(struct mdt_thread_info *, struct mdt_object *,
                                int);
+int mdt_handle_common(struct ptlrpc_request *req,
+                     struct mdt_opc_slice *supported);
+int mdt_connect(struct mdt_thread_info *info);
+int mdt_disconnect(struct mdt_thread_info *info);
+int mdt_set_info(struct mdt_thread_info *info);
+int mdt_get_info(struct mdt_thread_info *info);
+int mdt_getstatus(struct mdt_thread_info *info);
+int mdt_getattr(struct mdt_thread_info *info);
+int mdt_getattr_name(struct mdt_thread_info *info);
+int mdt_statfs(struct mdt_thread_info *info);
+int mdt_reint(struct mdt_thread_info *info);
+int mdt_sync(struct mdt_thread_info *info);
+int mdt_is_subdir(struct mdt_thread_info *info);
+int mdt_obd_ping(struct mdt_thread_info *info);
+int mdt_obd_log_cancel(struct mdt_thread_info *info);
+int mdt_obd_qc_callback(struct mdt_thread_info *info);
+int mdt_enqueue(struct mdt_thread_info *info);
+int mdt_convert(struct mdt_thread_info *info);
+int mdt_bl_callback(struct mdt_thread_info *info);
+int mdt_cp_callback(struct mdt_thread_info *info);
+int mdt_llog_create(struct mdt_thread_info *info);
+int mdt_llog_destroy(struct mdt_thread_info *info);
+int mdt_llog_read_header(struct mdt_thread_info *info);
+int mdt_llog_next_block(struct mdt_thread_info *info);
+int mdt_llog_prev_block(struct mdt_thread_info *info);
+int mdt_sec_ctx_handle(struct mdt_thread_info *info);
+int mdt_readpage(struct mdt_thread_info *info);
+int mdt_obd_idx_read(struct mdt_thread_info *info);
+
+extern struct mdt_opc_slice mdt_regular_handlers[];
+extern struct mdt_opc_slice mdt_seq_handlers[];
+extern struct mdt_opc_slice mdt_fld_handlers[];
+
+int mdt_quotacheck(struct mdt_thread_info *info);
+int mdt_quotactl(struct mdt_thread_info *info);
+int mdt_quota_dqacq(struct mdt_thread_info *info);
+
+extern struct lprocfs_vars lprocfs_mds_module_vars[];
+extern struct lprocfs_vars lprocfs_mds_obd_vars[];
 
 /* mdt_idmap.c */
 int mdt_init_sec_level(struct mdt_thread_info *);
 
 /* mdt_idmap.c */
 int mdt_init_sec_level(struct mdt_thread_info *);
@@ -818,6 +912,7 @@ enum {
 void mdt_counter_incr(struct ptlrpc_request *req, int opcode);
 void mdt_stats_counter_init(struct lprocfs_stats *stats);
 void lprocfs_mdt_init_vars(struct lprocfs_static_vars *lvars);
 void mdt_counter_incr(struct ptlrpc_request *req, int opcode);
 void mdt_stats_counter_init(struct lprocfs_stats *stats);
 void lprocfs_mdt_init_vars(struct lprocfs_static_vars *lvars);
+void lprocfs_mds_init_vars(struct lprocfs_static_vars *lvars);
 int mdt_procfs_init(struct mdt_device *mdt, const char *name);
 int mdt_procfs_fini(struct mdt_device *mdt);
 void mdt_rename_counter_tally(struct mdt_thread_info *info,
 int mdt_procfs_init(struct mdt_device *mdt, const char *name);
 int mdt_procfs_fini(struct mdt_device *mdt);
 void mdt_rename_counter_tally(struct mdt_thread_info *info,
@@ -877,5 +972,28 @@ static inline struct obd_device *mdt2obd_dev(const struct mdt_device *mdt)
 {
         return mdt->mdt_md_dev.md_lu_dev.ld_obd;
 }
 {
         return mdt->mdt_md_dev.md_lu_dev.ld_obd;
 }
+
+extern const struct lu_device_operations mdt_lu_ops;
+
+/*
+ * Check whether \a d is an MDT device by comparing its operations table
+ * with mdt_lu_ops.
+ *
+ * NOTE(review): assuming ergo(a, b) means "a implies b", this also returns
+ * non-zero when \a d or d->ld_ops is NULL - confirm that callers expect
+ * a NULL device to pass the check.
+ */
+static inline int lu_device_is_mdt(struct lu_device *d)
+{
+       return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
+}
+
+/*
+ * Convert a generic lu_device into the mdt_device that embeds it as
+ * mdt_md_dev.md_lu_dev.  Asserts that \a d passes lu_device_is_mdt().
+ */
+static inline struct mdt_device *lu2mdt_dev(struct lu_device *d)
+{
+       LASSERTF(lu_device_is_mdt(d), "It is %s instead of MDT %p %p\n",
+                d->ld_type->ldt_name, d->ld_ops, &mdt_lu_ops);
+       return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev);
+}
+
+/* Return the obd_name of the obd device backing \a mdt. */
+static inline char *mdt_obd_name(struct mdt_device *mdt)
+{
+       struct obd_device *obd = mdt2obd_dev(mdt);
+
+       return obd->obd_name;
+}
+
+int mds_mod_init(void);
+void mds_mod_exit(void);
+
 #endif /* __KERNEL__ */
 #endif /* _MDT_H */
 #endif /* __KERNEL__ */
 #endif /* _MDT_H */
index 75b7aff..0d10215 100644 (file)
@@ -1030,10 +1030,20 @@ static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
 
 void lprocfs_mdt_init_vars(struct lprocfs_static_vars *lvars)
 {
 
 void lprocfs_mdt_init_vars(struct lprocfs_static_vars *lvars)
 {
-    lvars->module_vars  = lprocfs_mdt_module_vars;
-    lvars->obd_vars     = lprocfs_mdt_obd_vars;
+       lvars->module_vars  = lprocfs_mdt_module_vars;
+       lvars->obd_vars     = lprocfs_mdt_obd_vars;
 }
 
 }
 
+/*
+ * Per-device procfs variables of the MDS layer: a single "uuid" entry
+ * bound to lprocfs_rd_uuid, followed by a zeroed terminator entry.
+ */
+struct lprocfs_vars lprocfs_mds_obd_vars[] = {
+       { "uuid",        lprocfs_rd_uuid,       0, 0 },
+       { 0 }
+};
+
+/*
+ * Module-level procfs variables of the MDS layer: a single "num_refs"
+ * entry bound to lprocfs_rd_numrefs, followed by a zeroed terminator entry.
+ */
+struct lprocfs_vars lprocfs_mds_module_vars[] = {
+       { "num_refs",     lprocfs_rd_numrefs,     0, 0 },
+       { 0 }
+};
+
 void mdt_counter_incr(struct ptlrpc_request *req, int opcode)
 {
        struct obd_export *exp = req->rq_export;
 void mdt_counter_incr(struct ptlrpc_request *req, int opcode)
 {
        struct obd_export *exp = req->rq_export;
diff --git a/lustre/mdt/mdt_mds.c b/lustre/mdt/mdt_mds.c
new file mode 100644 (file)
index 0000000..ac4894b
--- /dev/null
@@ -0,0 +1,729 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License version 2 for more details.  A copy is
+ * included in the COPYING file that accompanied this code.
+
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012 Intel Corporation
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ *
+ * lustre/mdt/mdt_mds.c
+ *
+ * Lustre Metadata Service Layer
+ *
+ * Author: Di Wang <di.wang@whamcloud.com>
+ **/
+
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include <linux/module.h>
+
+#include <obd_support.h>
+/* struct ptlrpc_request */
+#include <lustre_net.h>
+/* struct obd_export */
+#include <lustre_export.h>
+/* struct obd_device */
+#include <obd.h>
+/* lu2dt_dev() */
+#include <dt_object.h>
+#include <lustre_mds.h>
+#include <lustre_mdt.h>
+#include "mdt_internal.h"
+#ifdef HAVE_QUOTA_SUPPORT
+# include <lustre_quota.h>
+#endif
+#include <lustre_acl.h>
+#include <lustre_param.h>
+#include <lustre_fsfilt.h>
+
+/*
+ * MDS device.  Holds the ptlrpc services registered by
+ * mds_start_ptlrpc_service(); a NULL pointer means the service is not
+ * registered.
+ */
+struct mds_device {
+       /* super-class */
+       struct md_device           mds_md_dev;
+       /* regular requests, MDS_REQUEST_PORTAL */
+       struct ptlrpc_service     *mds_regular_service;
+       /* readpage requests, MDS_READPAGE_PORTAL */
+       struct ptlrpc_service     *mds_readpage_service;
+       /* setattr requests, MDS_SETATTR_PORTAL */
+       struct ptlrpc_service     *mds_setattr_service;
+       /* sequence controller requests, SEQ_CONTROLLER_PORTAL */
+       struct ptlrpc_service     *mds_mdsc_service;
+       /* metadata sequence server requests, SEQ_METADATA_PORTAL */
+       struct ptlrpc_service     *mds_mdss_service;
+       /* FLD requests, FLD_REQUEST_PORTAL */
+       struct ptlrpc_service     *mds_fld_service;
+};
+
+/*
+ * Initialized in mdt_mod_init().
+ */
+static unsigned long mdt_num_threads;
+CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
+               "number of MDS service threads to start "
+               "(deprecated in favor of mds_num_threads)");
+
+static unsigned long mds_num_threads;
+CFS_MODULE_PARM(mds_num_threads, "ul", ulong, 0444,
+               "number of MDS service threads to start");
+
+static char *mds_num_cpts;
+CFS_MODULE_PARM(mds_num_cpts, "c", charp, 0444,
+               "CPU partitions MDS threads should run on");
+
+static unsigned long mds_rdpg_num_threads;
+CFS_MODULE_PARM(mds_rdpg_num_threads, "ul", ulong, 0444,
+               "number of MDS readpage service threads to start");
+
+static char *mds_rdpg_num_cpts;
+CFS_MODULE_PARM(mds_rdpg_num_cpts, "c", charp, 0444,
+               "CPU partitions MDS readpage threads should run on");
+
+/* NB: these two should be removed along with setattr service in the future */
+static unsigned long mds_attr_num_threads;
+CFS_MODULE_PARM(mds_attr_num_threads, "ul", ulong, 0444,
+               "number of MDS setattr service threads to start");
+
+static char *mds_attr_num_cpts;
+CFS_MODULE_PARM(mds_attr_num_cpts, "c", charp, 0444,
+               "CPU partitions MDS setattr threads should run on");
+
+/*
+ * Build one struct mdt_handler table entry as a designated array
+ * initializer.  The entry lands at index (opc - base), i.e. \a base is the
+ * opcode that maps to index 0 of the table.  The handler name is the
+ * stringified opcode and the fail id is formed by token pasting as
+ * OBD_FAIL_<opc>_NET.
+ */
+#define DEFINE_RPC_HANDLER(base, flags, opc, fn, fmt)                  \
+[opc - base] = {                                                       \
+       .mh_name        = #opc,                                         \
+       .mh_fail_id     = OBD_FAIL_ ## opc ## _NET,                     \
+       .mh_opc         = opc,                                          \
+       .mh_flags       = flags,                                        \
+       .mh_act         = fn,                                           \
+       .mh_fmt         = fmt                                           \
+}
+
+/* Request with a format known in advance */
+#define DEF_MDT_HDL(flags, name, fn)                                   \
+       DEFINE_RPC_HANDLER(MDS_GETATTR, flags, name, fn, &RQF_ ## name)
+
+/* Request with a format we do not yet know */
+#define DEF_MDT_HDL_VAR(flags, name, fn)                               \
+       DEFINE_RPC_HANDLER(MDS_GETATTR, flags, name, fn, NULL)
+
+/* Map one non-standard request format handler.  This should probably get
+ * a common OBD_SET_INFO RPC opcode instead of this mismatch. */
+#define RQF_MDS_SET_INFO RQF_OBD_SET_INFO
+
+static struct mdt_handler mdt_mds_ops[] = {
+DEF_MDT_HDL(0,                         MDS_CONNECT,      mdt_connect),
+DEF_MDT_HDL(0,                         MDS_DISCONNECT,   mdt_disconnect),
+DEF_MDT_HDL(0,                         MDS_SET_INFO,     mdt_set_info),
+DEF_MDT_HDL(0,                         MDS_GET_INFO,     mdt_get_info),
+DEF_MDT_HDL(0          | HABEO_REFERO, MDS_GETSTATUS,    mdt_getstatus),
+DEF_MDT_HDL(HABEO_CORPUS,              MDS_GETATTR,      mdt_getattr),
+DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO,        MDS_GETATTR_NAME, mdt_getattr_name),
+DEF_MDT_HDL(HABEO_CORPUS,              MDS_GETXATTR,     mdt_getxattr),
+DEF_MDT_HDL(0          | HABEO_REFERO, MDS_STATFS,       mdt_statfs),
+DEF_MDT_HDL(0          | MUTABOR,      MDS_REINT,        mdt_reint),
+DEF_MDT_HDL(HABEO_CORPUS,              MDS_CLOSE,        mdt_close),
+DEF_MDT_HDL(HABEO_CORPUS,              MDS_DONE_WRITING, mdt_done_writing),
+DEF_MDT_HDL(0          | HABEO_REFERO, MDS_PIN,          mdt_pin),
+DEF_MDT_HDL_VAR(0,                     MDS_SYNC,         mdt_sync),
+DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO,        MDS_IS_SUBDIR,    mdt_is_subdir),
+DEF_MDT_HDL(0,                         MDS_QUOTACHECK,   mdt_quotacheck),
+DEF_MDT_HDL(0,                         MDS_QUOTACTL,     mdt_quotactl)
+};
+
+#define DEF_OBD_HDL(flags, name, fn)                                   \
+       DEFINE_RPC_HANDLER(OBD_PING, flags, name, fn, NULL)
+
+static struct mdt_handler mdt_obd_ops[] = {
+DEF_OBD_HDL(0,                         OBD_PING,         mdt_obd_ping),
+DEF_OBD_HDL(0,                         OBD_LOG_CANCEL,   mdt_obd_log_cancel),
+DEF_OBD_HDL(0,                         OBD_QC_CALLBACK,  mdt_obd_qc_callback),
+DEF_OBD_HDL(0,                         OBD_IDX_READ,     mdt_obd_idx_read)
+};
+
+#define DEF_DLM_HDL_VAR(flags, name, fn)                               \
+       DEFINE_RPC_HANDLER(LDLM_ENQUEUE, flags, name, fn, NULL)
+#define DEF_DLM_HDL(flags, name, fn)                                   \
+       DEFINE_RPC_HANDLER(LDLM_ENQUEUE, flags, name, fn, &RQF_ ## name)
+
+static struct mdt_handler mdt_dlm_ops[] = {
+DEF_DLM_HDL    (HABEO_CLAVIS,          LDLM_ENQUEUE,     mdt_enqueue),
+DEF_DLM_HDL_VAR(HABEO_CLAVIS,          LDLM_CONVERT,     mdt_convert),
+DEF_DLM_HDL_VAR(0,                     LDLM_BL_CALLBACK, mdt_bl_callback),
+DEF_DLM_HDL_VAR(0,                     LDLM_CP_CALLBACK, mdt_cp_callback)
+};
+
+#define DEF_LLOG_HDL(flags, name, fn)                                  \
+       DEFINE_RPC_HANDLER(LLOG_ORIGIN_HANDLE_CREATE, flags, name, fn, NULL)
+
+/*
+ * Llog handlers, indexed relative to LLOG_ORIGIN_HANDLE_CREATE.
+ * Several opcodes are listed with a NULL handler function.
+ * NOTE(review): confirm how the dispatcher treats an entry whose mh_act
+ * is NULL.
+ */
+static struct mdt_handler mdt_llog_ops[] = {
+DEF_LLOG_HDL(0,                LLOG_ORIGIN_HANDLE_CREATE,        mdt_llog_create),
+DEF_LLOG_HDL(0,                LLOG_ORIGIN_HANDLE_NEXT_BLOCK,    mdt_llog_next_block),
+DEF_LLOG_HDL(0,                LLOG_ORIGIN_HANDLE_READ_HEADER,   mdt_llog_read_header),
+DEF_LLOG_HDL(0,                LLOG_ORIGIN_HANDLE_WRITE_REC,     NULL),
+DEF_LLOG_HDL(0,                LLOG_ORIGIN_HANDLE_CLOSE,         NULL),
+DEF_LLOG_HDL(0,                LLOG_ORIGIN_CONNECT,              NULL),
+DEF_LLOG_HDL(0,                LLOG_CATINFO,                     NULL),
+DEF_LLOG_HDL(0,                LLOG_ORIGIN_HANDLE_PREV_BLOCK,    mdt_llog_prev_block),
+DEF_LLOG_HDL(0,                LLOG_ORIGIN_HANDLE_DESTROY,       mdt_llog_destroy),
+};
+
+#define DEF_SEC_HDL(flags, name, fn)                                   \
+       DEFINE_RPC_HANDLER(SEC_CTX_INIT, flags, name, fn, NULL)
+
+static struct mdt_handler mdt_sec_ctx_ops[] = {
+DEF_SEC_HDL(0,                         SEC_CTX_INIT,     mdt_sec_ctx_handle),
+DEF_SEC_HDL(0,                         SEC_CTX_INIT_CONT,mdt_sec_ctx_handle),
+DEF_SEC_HDL(0,                         SEC_CTX_FINI,     mdt_sec_ctx_handle)
+};
+
+#define DEF_QUOTA_HDL(flags, name, fn)                         \
+       DEFINE_RPC_HANDLER(QUOTA_DQACQ, flags, name, fn, &RQF_ ## name)
+
+static struct mdt_handler mdt_quota_ops[] = {
+DEF_QUOTA_HDL(HABEO_REFERO,            QUOTA_DQACQ,      mdt_quota_dqacq),
+};
+
+/*
+ * Opcode slices served by mds_regular_handle(): MDS, OBD, LDLM, llog,
+ * security context and quota requests.  The list ends with an entry whose
+ * mos_hs is NULL.
+ */
+struct mdt_opc_slice mdt_regular_handlers[] = {
+       {
+               .mos_opc_start  = MDS_GETATTR,
+               .mos_opc_end    = MDS_LAST_OPC,
+               .mos_hs         = mdt_mds_ops
+       },
+       {
+               .mos_opc_start  = OBD_PING,
+               .mos_opc_end    = OBD_LAST_OPC,
+               .mos_hs         = mdt_obd_ops
+       },
+       {
+               .mos_opc_start  = LDLM_ENQUEUE,
+               .mos_opc_end    = LDLM_LAST_OPC,
+               .mos_hs         = mdt_dlm_ops
+       },
+       {
+               .mos_opc_start  = LLOG_ORIGIN_HANDLE_CREATE,
+               .mos_opc_end    = LLOG_LAST_OPC,
+               .mos_hs         = mdt_llog_ops
+       },
+       {
+               .mos_opc_start  = SEC_CTX_INIT,
+               .mos_opc_end    = SEC_LAST_OPC,
+               .mos_hs         = mdt_sec_ctx_ops
+       },
+       {
+               .mos_opc_start  = QUOTA_DQACQ,
+               .mos_opc_end    = QUOTA_LAST_OPC,
+               .mos_hs         = mdt_quota_ops
+       },
+       {
+               .mos_hs         = NULL
+       }
+};
+
+/* Readpage/readdir handlers */
+static struct mdt_handler mdt_readpage_ops[] = {
+DEF_MDT_HDL(0,                 MDS_CONNECT,  mdt_connect),
+DEF_MDT_HDL(HABEO_CORPUS | HABEO_REFERO, MDS_READPAGE, mdt_readpage),
+/* XXX: this is ugly and should be fixed one day, see mdc_close() for
+ * detailed comments. --umka */
+DEF_MDT_HDL(HABEO_CORPUS,              MDS_CLOSE,        mdt_close),
+DEF_MDT_HDL(HABEO_CORPUS,              MDS_DONE_WRITING, mdt_done_writing),
+};
+
+/*
+ * Opcode slices served by mds_readpage_handle(): the readpage subset of
+ * MDS requests plus the OBD requests.  The list ends with an entry whose
+ * mos_hs is NULL.
+ */
+static struct mdt_opc_slice mdt_readpage_handlers[] = {
+       {
+               .mos_opc_start = MDS_GETATTR,
+               .mos_opc_end   = MDS_LAST_OPC,
+               .mos_hs = mdt_readpage_ops
+       },
+       {
+               /* NOTE(review): mdt_obd_ops is indexed relative to OBD_PING
+                * and mdt_regular_handlers starts this slice at OBD_PING;
+                * confirm that OBD_FIRST_OPC has the same value. */
+               .mos_opc_start = OBD_FIRST_OPC,
+               .mos_opc_end   = OBD_LAST_OPC,
+               .mos_hs = mdt_obd_ops
+       },
+       {
+               .mos_hs = NULL
+       }
+};
+
+/* Sequence service handlers */
+#define DEF_SEQ_HDL(flags, name, fn)                                   \
+       DEFINE_RPC_HANDLER(SEQ_QUERY, flags, name, fn, &RQF_ ## name)
+
+static struct mdt_handler mdt_seq_ops[] = {
+DEF_SEQ_HDL(0,                         SEQ_QUERY,        (void *)seq_query),
+};
+
+struct mdt_opc_slice mdt_seq_handlers[] = {
+       {
+               .mos_opc_start = SEQ_QUERY,
+               .mos_opc_end   = SEQ_LAST_OPC,
+               .mos_hs = mdt_seq_ops
+       },
+       {
+               .mos_hs = NULL
+       }
+};
+
+/* FID Location Database handlers */
+#define DEF_FLD_HDL(flags, name, fn)                                   \
+       DEFINE_RPC_HANDLER(FLD_QUERY, flags, name, fn, &RQF_ ## name)
+
+static struct mdt_handler mdt_fld_ops[] = {
+DEF_FLD_HDL(0,                         FLD_QUERY,        (void *)fld_query),
+};
+
+struct mdt_opc_slice mdt_fld_handlers[] = {
+       {
+               .mos_opc_start = FLD_QUERY,
+               .mos_opc_end   = FLD_LAST_OPC,
+               .mos_hs = mdt_fld_ops
+       },
+       {
+               .mos_hs = NULL
+       }
+};
+
+/*
+ * Request handler of the regular and setattr services: dispatches \a req
+ * through mdt_regular_handlers.
+ */
+static int mds_regular_handle(struct ptlrpc_request *req)
+{
+       return mdt_handle_common(req, mdt_regular_handlers);
+}
+
+/*
+ * Request handler of the readpage service: dispatches \a req through
+ * mdt_readpage_handlers.
+ */
+static int mds_readpage_handle(struct ptlrpc_request *req)
+{
+       return mdt_handle_common(req, mdt_readpage_handlers);
+}
+
+/*
+ * Request handler of the sequence controller service: dispatches \a req
+ * through mdt_seq_handlers.
+ */
+static int mds_mdsc_handle(struct ptlrpc_request *req)
+{
+       return mdt_handle_common(req, mdt_seq_handlers);
+}
+
+/*
+ * Request handler of the metadata sequence server service.  It uses the
+ * same mdt_seq_handlers table as mds_mdsc_handle().
+ */
+static int mds_mdss_handle(struct ptlrpc_request *req)
+{
+       return mdt_handle_common(req, mdt_seq_handlers);
+}
+
+/*
+ * Request handler of the FLD service: dispatches \a req through
+ * mdt_fld_handlers.
+ */
+static int mds_fld_handle(struct ptlrpc_request *req)
+{
+       return mdt_handle_common(req, mdt_fld_handlers);
+}
+
+/* device init/fini methods */
+/*
+ * Unregister every ptlrpc service of \a m that is currently registered and
+ * reset its pointer to NULL, so the function can be called again safely.
+ */
+static void mds_stop_ptlrpc_service(struct mds_device *m)
+{
+       /* service slots, listed in the order they are torn down */
+       struct ptlrpc_service **slots[] = {
+               &m->mds_regular_service,
+               &m->mds_readpage_service,
+               &m->mds_setattr_service,
+               &m->mds_mdsc_service,
+               &m->mds_mdss_service,
+               &m->mds_fld_service,
+       };
+       unsigned int idx;
+       ENTRY;
+
+       for (idx = 0; idx < sizeof(slots) / sizeof(slots[0]); idx++) {
+               struct ptlrpc_service **slot = slots[idx];
+
+               /* never started, or already stopped */
+               if (*slot == NULL)
+                       continue;
+
+               ptlrpc_unregister_service(*slot);
+               *slot = NULL;
+       }
+       EXIT;
+}
+
+/*
+ * Register all ptlrpc services of the MDS device \a m: regular, readpage,
+ * setattr, sequence controller, metadata sequence server and FLD.
+ *
+ * Returns 0 on success, or the error extracted from the failed
+ * ptlrpc_register_service() call.  When a service other than the first
+ * one fails, the services registered so far are unregistered again through
+ * mds_stop_ptlrpc_service().
+ */
+static int mds_start_ptlrpc_service(struct mds_device *m)
+{
+       /* NOTE(review): static, presumably to keep this structure off the
+        * stack; this makes the function non-reentrant - confirm that
+        * callers are serialized. */
+       static struct ptlrpc_service_conf conf;
+       struct obd_device *obd = m->mds_md_dev.md_lu_dev.ld_obd;
+       cfs_proc_dir_entry_t *procfs_entry;
+       int rc = 0;
+       ENTRY;
+
+       procfs_entry = obd->obd_proc_entry;
+       LASSERT(procfs_entry != NULL);
+
+       conf = (typeof(conf)) {
+               .psc_name               = LUSTRE_MDT_NAME,
+               .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
+               .psc_buf                = {
+                       .bc_nbufs               = MDS_NBUFS,
+                       .bc_buf_size            = MDS_BUFSIZE,
+                       .bc_req_max_size        = MDS_MAXREQSIZE,
+                       .bc_rep_max_size        = MDS_MAXREPSIZE,
+                       .bc_req_portal          = MDS_REQUEST_PORTAL,
+                       .bc_rep_portal          = MDC_REPLY_PORTAL,
+               },
+               /*
+                * We'd like to have a mechanism to set this on a per-device
+                * basis, but alas...
+                */
+               .psc_thr                = {
+                       .tc_thr_name            = LUSTRE_MDT_NAME,
+                       .tc_thr_factor          = MDS_THR_FACTOR,
+                       .tc_nthrs_init          = MDS_NTHRS_INIT,
+                       .tc_nthrs_base          = MDS_NTHRS_BASE,
+                       .tc_nthrs_max           = MDS_NTHRS_MAX,
+                       .tc_nthrs_user          = mds_num_threads,
+                       .tc_cpu_affinity        = 1,
+                       .tc_ctx_tags            = LCT_MD_THREAD,
+               },
+               .psc_cpt                = {
+                       .cc_pattern             = mds_num_cpts,
+               },
+               .psc_ops                = {
+                       .so_req_handler         = mds_regular_handle,
+                       .so_req_printer         = target_print_req,
+                       .so_hpreq_handler       = ptlrpc_hpreq_handler,
+               },
+       };
+       m->mds_regular_service = ptlrpc_register_service(&conf, procfs_entry);
+       if (IS_ERR(m->mds_regular_service)) {
+               rc = PTR_ERR(m->mds_regular_service);
+               CERROR("failed to start regular mdt service: %d\n", rc);
+               m->mds_regular_service = NULL;
+
+               /* nothing has been registered by this call yet, so there is
+                * nothing to unwind */
+               RETURN(rc);
+       }
+
+       /*
+        * readpage service configuration. Parameters have to be adjusted,
+        * ideally.
+        */
+       memset(&conf, 0, sizeof(conf));
+       conf = (typeof(conf)) {
+               .psc_name               = LUSTRE_MDT_NAME "_readpage",
+               .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
+               .psc_buf                = {
+                       .bc_nbufs               = MDS_NBUFS,
+                       .bc_buf_size            = MDS_BUFSIZE,
+                       .bc_req_max_size        = MDS_MAXREQSIZE,
+                       .bc_rep_max_size        = MDS_MAXREPSIZE,
+                       .bc_req_portal          = MDS_READPAGE_PORTAL,
+                       .bc_rep_portal          = MDC_REPLY_PORTAL,
+               },
+               .psc_thr                = {
+                       .tc_thr_name            = LUSTRE_MDT_NAME "_rdpg",
+                       .tc_thr_factor          = MDS_RDPG_THR_FACTOR,
+                       .tc_nthrs_init          = MDS_RDPG_NTHRS_INIT,
+                       .tc_nthrs_base          = MDS_RDPG_NTHRS_BASE,
+                       .tc_nthrs_max           = MDS_RDPG_NTHRS_MAX,
+                       .tc_nthrs_user          = mds_rdpg_num_threads,
+                       .tc_cpu_affinity        = 1,
+                       .tc_ctx_tags            = LCT_MD_THREAD,
+               },
+               .psc_cpt                = {
+                       .cc_pattern             = mds_rdpg_num_cpts,
+               },
+               .psc_ops                = {
+                       .so_req_handler         = mds_readpage_handle,
+                       .so_req_printer         = target_print_req,
+               },
+       };
+       m->mds_readpage_service = ptlrpc_register_service(&conf, procfs_entry);
+       if (IS_ERR(m->mds_readpage_service)) {
+               rc = PTR_ERR(m->mds_readpage_service);
+               CERROR("failed to start readpage service: %d\n", rc);
+               m->mds_readpage_service = NULL;
+
+               GOTO(err_mds_svc, rc);
+       }
+
+       /*
+        * setattr service configuration.
+        *
+        * XXX To keep the compatibility with old client(< 2.2), we need to
+        * preserve this portal for a certain time, it should be removed
+        * eventually. LU-617.
+        */
+       memset(&conf, 0, sizeof(conf));
+       conf = (typeof(conf)) {
+               .psc_name               = LUSTRE_MDT_NAME "_setattr",
+               .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
+               .psc_buf                = {
+                       .bc_nbufs               = MDS_NBUFS,
+                       .bc_buf_size            = MDS_BUFSIZE,
+                       .bc_req_max_size        = MDS_MAXREQSIZE,
+                       .bc_rep_max_size        = MDS_MAXREPSIZE,
+                       .bc_req_portal          = MDS_SETATTR_PORTAL,
+                       .bc_rep_portal          = MDC_REPLY_PORTAL,
+               },
+               .psc_thr                = {
+                       .tc_thr_name            = LUSTRE_MDT_NAME "_attr",
+                       .tc_thr_factor          = MDS_SETA_THR_FACTOR,
+                       .tc_nthrs_init          = MDS_SETA_NTHRS_INIT,
+                       .tc_nthrs_base          = MDS_SETA_NTHRS_BASE,
+                       .tc_nthrs_max           = MDS_SETA_NTHRS_MAX,
+                       .tc_nthrs_user          = mds_attr_num_threads,
+                       .tc_cpu_affinity        = 1,
+                       .tc_ctx_tags            = LCT_MD_THREAD,
+               },
+               .psc_cpt                = {
+                       .cc_pattern             = mds_attr_num_cpts,
+               },
+               .psc_ops                = {
+                       .so_req_handler         = mds_regular_handle,
+                       .so_req_printer         = target_print_req,
+                       .so_hpreq_handler       = NULL,
+               },
+       };
+       m->mds_setattr_service = ptlrpc_register_service(&conf, procfs_entry);
+       if (IS_ERR(m->mds_setattr_service)) {
+               rc = PTR_ERR(m->mds_setattr_service);
+               CERROR("failed to start setattr service: %d\n", rc);
+               m->mds_setattr_service = NULL;
+
+               GOTO(err_mds_svc, rc);
+       }
+
+       /*
+        * sequence controller service configuration
+        */
+       memset(&conf, 0, sizeof(conf));
+       conf = (typeof(conf)) {
+               .psc_name               = LUSTRE_MDT_NAME "_seqs",
+               .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
+               .psc_buf                = {
+                       .bc_nbufs               = MDS_NBUFS,
+                       .bc_buf_size            = MDS_BUFSIZE,
+                       .bc_req_max_size        = SEQ_MAXREQSIZE,
+                       .bc_rep_max_size        = SEQ_MAXREPSIZE,
+                       .bc_req_portal          = SEQ_CONTROLLER_PORTAL,
+                       .bc_rep_portal          = MDC_REPLY_PORTAL,
+               },
+               .psc_thr                = {
+                       .tc_thr_name            = LUSTRE_MDT_NAME "_seqs",
+                       .tc_nthrs_init          = MDS_OTHR_NTHRS_INIT,
+                       .tc_nthrs_max           = MDS_OTHR_NTHRS_MAX,
+                       .tc_ctx_tags            = LCT_MD_THREAD,
+               },
+               .psc_ops                = {
+                       .so_req_handler         = mds_mdsc_handle,
+                       .so_req_printer         = target_print_req,
+                       .so_hpreq_handler       = NULL,
+               },
+       };
+       m->mds_mdsc_service = ptlrpc_register_service(&conf, procfs_entry);
+       if (IS_ERR(m->mds_mdsc_service)) {
+               rc = PTR_ERR(m->mds_mdsc_service);
+               CERROR("failed to start seq controller service: %d\n", rc);
+               m->mds_mdsc_service = NULL;
+
+               GOTO(err_mds_svc, rc);
+       }
+
+       /*
+        * metadata sequence server service configuration
+        */
+       memset(&conf, 0, sizeof(conf));
+       conf = (typeof(conf)) {
+               .psc_name               = LUSTRE_MDT_NAME "_seqm",
+               .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
+               .psc_buf                = {
+                       .bc_nbufs               = MDS_NBUFS,
+                       .bc_buf_size            = MDS_BUFSIZE,
+                       .bc_req_max_size        = SEQ_MAXREQSIZE,
+                       .bc_rep_max_size        = SEQ_MAXREPSIZE,
+                       .bc_req_portal          = SEQ_METADATA_PORTAL,
+                       .bc_rep_portal          = MDC_REPLY_PORTAL,
+               },
+               .psc_thr                = {
+                       .tc_thr_name            = LUSTRE_MDT_NAME "_seqm",
+                       .tc_nthrs_init          = MDS_OTHR_NTHRS_INIT,
+                       .tc_nthrs_max           = MDS_OTHR_NTHRS_MAX,
+                       .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD
+               },
+               .psc_ops                = {
+                       .so_req_handler         = mds_mdss_handle,
+                       .so_req_printer         = target_print_req,
+                       .so_hpreq_handler       = NULL,
+               },
+       };
+       m->mds_mdss_service = ptlrpc_register_service(&conf, procfs_entry);
+       if (IS_ERR(m->mds_mdss_service)) {
+               rc = PTR_ERR(m->mds_mdss_service);
+               CERROR("failed to start metadata seq server service: %d\n", rc);
+               m->mds_mdss_service = NULL;
+
+               GOTO(err_mds_svc, rc);
+       }
+
+       /* FLD service start */
+       memset(&conf, 0, sizeof(conf));
+       conf = (typeof(conf)) {
+               .psc_name            = LUSTRE_MDT_NAME "_fld",
+               .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
+               .psc_buf                = {
+                       .bc_nbufs               = MDS_NBUFS,
+                       .bc_buf_size            = MDS_BUFSIZE,
+                       .bc_req_max_size        = FLD_MAXREQSIZE,
+                       .bc_rep_max_size        = FLD_MAXREPSIZE,
+                       .bc_req_portal          = FLD_REQUEST_PORTAL,
+                       .bc_rep_portal          = MDC_REPLY_PORTAL,
+               },
+               .psc_thr                = {
+                       .tc_thr_name            = LUSTRE_MDT_NAME "_fld",
+                       .tc_nthrs_init          = MDS_OTHR_NTHRS_INIT,
+                       .tc_nthrs_max           = MDS_OTHR_NTHRS_MAX,
+                       .tc_ctx_tags            = LCT_DT_THREAD | LCT_MD_THREAD
+               },
+               .psc_ops                = {
+                       .so_req_handler         = mds_fld_handle,
+                       .so_req_printer         = target_print_req,
+                       .so_hpreq_handler       = NULL,
+               },
+       };
+       m->mds_fld_service = ptlrpc_register_service(&conf, procfs_entry);
+       if (IS_ERR(m->mds_fld_service)) {
+               rc = PTR_ERR(m->mds_fld_service);
+               CERROR("failed to start fld service: %d\n", rc);
+               m->mds_fld_service = NULL;
+
+               GOTO(err_mds_svc, rc);
+       }
+
+       EXIT;
+       /* the success path falls through to the label with rc == 0, so the
+        * teardown below is skipped */
+err_mds_svc:
+       if (rc)
+               mds_stop_ptlrpc_service(m);
+
+       return rc;
+}
+
+/**
+ * Convert a generic lu_device pointer into the mds_device that embeds it
+ * (via mds_md_dev.md_lu_dev).
+ */
+static inline struct mds_device *mds_dev(struct lu_device *d)
+{
+       struct mds_device *mds;
+
+       mds = container_of0(d, struct mds_device, mds_md_dev.md_lu_dev);
+       return mds;
+}
+
+/**
+ * ldto_device_fini hook of the MDS device type.
+ *
+ * Calls mds_stop_ptlrpc_service() on the device and then
+ * lprocfs_obd_cleanup() on the obd it is attached to.
+ *
+ * \param env  execution environment (unused here)
+ * \param d    lu_device embedded in the mds_device being finalized
+ *
+ * \retval NULL always
+ */
+static struct lu_device *mds_device_fini(const struct lu_env *env,
+                                        struct lu_device *d)
+{
+       struct obd_device *owner = d->ld_obd;
+       struct mds_device *mds = mds_dev(d);
+       ENTRY;
+
+       mds_stop_ptlrpc_service(mds);
+       lprocfs_obd_cleanup(owner);
+
+       RETURN(NULL);
+}
+
+/**
+ * ldto_device_free hook of the MDS device type.
+ *
+ * Finalizes the embedded md_device and frees the enclosing mds_device.
+ *
+ * \param env  execution environment (unused here)
+ * \param d    lu_device embedded in the mds_device to release
+ *
+ * \retval NULL always
+ */
+static struct lu_device *mds_device_free(const struct lu_env *env,
+                                        struct lu_device *d)
+{
+       struct mds_device *mds = mds_dev(d);
+       ENTRY;
+
+       md_device_fini(&mds->mds_md_dev);
+       /* d points inside *mds, so it must not be used past this point */
+       OBD_FREE_PTR(mds);
+
+       RETURN(NULL);
+}
+
+/**
+ * ldto_device_alloc hook of the MDS device type.
+ *
+ * Allocates an mds_device, binds it to the obd named by the first string
+ * of \a cfg, sets up the obd's lprocfs entries and starts the MDS ptlrpc
+ * services.  On failure every step completed so far is undone in reverse
+ * order before the error is returned.
+ *
+ * \param env  execution environment, passed through to mds_device_free()
+ * \param t    device type the new device belongs to
+ * \param cfg  configuration record; string 0 is the obd name
+ *
+ * \retval pointer to the embedded lu_device on success
+ * \retval ERR_PTR(-ENOMEM) if the device cannot be allocated
+ * \retval ERR_PTR(rc) if lprocfs setup or service startup fails
+ */
+static struct lu_device *mds_device_alloc(const struct lu_env *env,
+                                         struct lu_device_type *t,
+                                         struct lustre_cfg *cfg)
+{
+       struct mds_device        *m;
+       struct obd_device        *obd;
+       struct lu_device          *l;
+       int rc;
+
+       OBD_ALLOC_PTR(m);
+       if (m == NULL)
+               return ERR_PTR(-ENOMEM);
+
+       md_device_init(&m->mds_md_dev, t);
+       l = &m->mds_md_dev.md_lu_dev;
+
+       obd = class_name2obd(lustre_cfg_string(cfg, 0));
+       LASSERT(obd != NULL);
+
+       l->ld_obd = obd;
+       /* set this lu_device to obd, because error handling needs it */
+       obd->obd_lu_dev = l;
+
+       rc = lprocfs_obd_setup(obd, lprocfs_mds_obd_vars);
+       if (rc != 0)
+               goto out_free;
+
+       rc = mds_start_ptlrpc_service(m);
+       if (rc != 0)
+               goto out_lprocfs;
+
+       return l;
+
+out_lprocfs:
+       /* mds_device_free() does not touch lprocfs, so the entries created
+        * by lprocfs_obd_setup() above must be released here explicitly */
+       lprocfs_obd_cleanup(obd);
+out_free:
+       /* NOTE(review): obd->obd_lu_dev still points at the device freed
+        * below - confirm callers never dereference it after a failed
+        * alloc */
+       mds_device_free(env, l);
+       return ERR_PTR(rc);
+}
+
+/* type constructor/destructor: mds_type_init, mds_type_fini (the other
+ * mds_type_* callbacks referenced by mds_device_type_ops are presumably
+ * generated by this macro as well - confirm against LU_TYPE_INIT_FINI) */
+LU_TYPE_INIT_FINI(mds, &mdt_thread_key);
+
+/*
+ * Device-type operations of the MDS layer: the type-level
+ * init/fini/start/stop callbacks plus the per-device alloc, free and fini
+ * hooks implemented in this file.
+ */
+static struct lu_device_type_operations mds_device_type_ops = {
+       .ldto_init = mds_type_init,
+       .ldto_fini = mds_type_fini,
+
+       .ldto_start = mds_type_start,
+       .ldto_stop  = mds_type_stop,
+
+       .ldto_device_alloc = mds_device_alloc,
+       .ldto_device_free  = mds_device_free,
+       .ldto_device_fini  = mds_device_fini
+};
+
+/*
+ * lu_device_type descriptor of the MDS layer; mds_mod_init() hands it to
+ * class_register_type() under the name LUSTRE_MDS_NAME.
+ */
+static struct lu_device_type mds_device_type = {
+       .ldt_tags     = LU_DEVICE_MD,
+       .ldt_name     = LUSTRE_MDS_NAME,
+       .ldt_ops      = &mds_device_type_ops,
+       .ldt_ctx_tags = LCT_MD_THREAD
+};
+
+/*
+ * obd_ops of the MDS type: only the owning module is recorded, no obd
+ * methods are provided here.
+ */
+static struct obd_ops mds_obd_device_ops = {
+       .o_owner           = THIS_MODULE,
+};
+
+/**
+ * Register the MDS obd/device type.
+ *
+ * If only the deprecated mdt_num_threads parameter was set, its value is
+ * copied into mds_num_threads and a console notice is printed.
+ *
+ * \retval result of class_register_type()
+ */
+int mds_mod_init(void)
+{
+       /* fall back to the deprecated knob only when the new one is unset */
+       if (mds_num_threads == 0 && mdt_num_threads != 0) {
+               LCONSOLE_INFO("mdt_num_threads module parameter is deprecated, "
+                             "use mds_num_threads instead or unset both for "
+                             "dynamic thread startup\n");
+               mds_num_threads = mdt_num_threads;
+       }
+
+       return class_register_type(&mds_obd_device_ops, NULL,
+                                  lprocfs_mds_module_vars, LUSTRE_MDS_NAME,
+                                  &mds_device_type);
+}
+
+/**
+ * Unregister the MDS obd/device type registered by mds_mod_init().
+ * Any result of the unregistration is deliberately ignored.
+ */
+void mds_mod_exit(void)
+{
+       (void)class_unregister_type(LUSTRE_MDS_NAME);
+}
index e22b7f8..75f19fd 100644 (file)
@@ -125,6 +125,9 @@ struct obd_type *class_get_type(const char *name)
                if (strcmp(modname, "obdfilter") == 0)
                        modname = "ofd";
 
                if (strcmp(modname, "obdfilter") == 0)
                        modname = "ofd";
 
+               if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
+                       modname = LUSTRE_MDT_NAME;
+
                 if (!cfs_request_module("%s", modname)) {
                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
                         type = class_search_type(name);
                 if (!cfs_request_module("%s", modname)) {
                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
                         type = class_search_type(name);
index dd46c1b..8443bc2 100644 (file)
@@ -718,20 +718,22 @@ static CFS_LIST_HEAD(lu_device_types);
 
 int lu_device_type_init(struct lu_device_type *ldt)
 {
 
 int lu_device_type_init(struct lu_device_type *ldt)
 {
-        int result;
+       int result = 0;
 
 
-        CFS_INIT_LIST_HEAD(&ldt->ldt_linkage);
-        result = ldt->ldt_ops->ldto_init(ldt);
-        if (result == 0)
-                cfs_list_add(&ldt->ldt_linkage, &lu_device_types);
-        return result;
+       CFS_INIT_LIST_HEAD(&ldt->ldt_linkage);
+       if (ldt->ldt_ops->ldto_init)
+               result = ldt->ldt_ops->ldto_init(ldt);
+       if (result == 0)
+               cfs_list_add(&ldt->ldt_linkage, &lu_device_types);
+       return result;
 }
 EXPORT_SYMBOL(lu_device_type_init);
 
 void lu_device_type_fini(struct lu_device_type *ldt)
 {
 }
 EXPORT_SYMBOL(lu_device_type_init);
 
 void lu_device_type_fini(struct lu_device_type *ldt)
 {
-        cfs_list_del_init(&ldt->ldt_linkage);
-        ldt->ldt_ops->ldto_fini(ldt);
+       cfs_list_del_init(&ldt->ldt_linkage);
+       if (ldt->ldt_ops->ldto_fini)
+               ldt->ldt_ops->ldto_fini(ldt);
 }
 EXPORT_SYMBOL(lu_device_type_fini);
 
 }
 EXPORT_SYMBOL(lu_device_type_fini);
 
@@ -739,10 +741,10 @@ void lu_types_stop(void)
 {
         struct lu_device_type *ldt;
 
 {
         struct lu_device_type *ldt;
 
-        cfs_list_for_each_entry(ldt, &lu_device_types, ldt_linkage) {
-                if (ldt->ldt_device_nr == 0)
-                        ldt->ldt_ops->ldto_stop(ldt);
-        }
+       cfs_list_for_each_entry(ldt, &lu_device_types, ldt_linkage) {
+               if (ldt->ldt_device_nr == 0 && ldt->ldt_ops->ldto_stop)
+                       ldt->ldt_ops->ldto_stop(ldt);
+       }
 }
 EXPORT_SYMBOL(lu_types_stop);
 
 }
 EXPORT_SYMBOL(lu_types_stop);
 
index fb270ea..6359717 100644 (file)
@@ -1445,11 +1445,10 @@ static int server_stop_servers(int lsiflags)
         /* Either an MDT or an OST or neither  */
         /* if this was an MDT, and there are no more MDT's, clean up the MDS */
        if ((lsiflags & LDD_F_SV_TYPE_MDT) &&
         /* Either an MDT or an OST or neither  */
         /* if this was an MDT, and there are no more MDT's, clean up the MDS */
        if ((lsiflags & LDD_F_SV_TYPE_MDT) &&
-            (obd = class_name2obd(LUSTRE_MDS_OBDNAME))) {
-                /*FIXME pre-rename, should eventually be LUSTRE_MDT_NAME*/
-                type = class_search_type(LUSTRE_MDS_NAME);
-        }
-        /* if this was an OST, and there are no more OST's, clean up the OSS */
+           (obd = class_name2obd(LUSTRE_MDS_OBDNAME))) {
+               type = class_search_type(LUSTRE_MDT_NAME);
+       }
+       /* if this was an OST, and there are no more OST's, clean up the OSS */
        if ((lsiflags & LDD_F_SV_TYPE_OST) &&
             (obd = class_name2obd(LUSTRE_OSS_OBDNAME))) {
                 type = class_search_type(LUSTRE_OST_NAME);
        if ((lsiflags & LDD_F_SV_TYPE_OST) &&
             (obd = class_name2obd(LUSTRE_OSS_OBDNAME))) {
                 type = class_search_type(LUSTRE_OST_NAME);
@@ -1784,27 +1783,23 @@ static int server_start_targets(struct super_block *sb, struct vfsmount *mnt)
 
        CDEBUG(D_MOUNT, "starting target %s\n", lsi->lsi_svname);
 
 
        CDEBUG(D_MOUNT, "starting target %s\n", lsi->lsi_svname);
 
-#if 0
-        /* If we're an MDT, make sure the global MDS is running */
-        if (lsi->lsi_ldd->ldd_flags & LDD_F_SV_TYPE_MDT) {
-                /* make sure the MDS is started */
+       if (IS_MDT(lsi)) {
+               /* make sure the MDS is started */
                mutex_lock(&server_start_lock);
                mutex_lock(&server_start_lock);
-                obd = class_name2obd(LUSTRE_MDS_OBDNAME);
-                if (!obd) {
-                        rc = lustre_start_simple(LUSTRE_MDS_OBDNAME,
-                    /* FIXME pre-rename, should eventually be LUSTRE_MDS_NAME */
-                                                 LUSTRE_MDT_NAME,
-                                                 LUSTRE_MDS_OBDNAME"_uuid",
-                                                 0, 0);
-                        if (rc) {
+               obd = class_name2obd(LUSTRE_MDS_OBDNAME);
+               if (!obd) {
+                       rc = lustre_start_simple(LUSTRE_MDS_OBDNAME,
+                                                LUSTRE_MDS_NAME,
+                                                LUSTRE_MDS_OBDNAME"_uuid",
+                                                0, 0, 0, 0);
+                       if (rc) {
                                mutex_unlock(&server_start_lock);
                                mutex_unlock(&server_start_lock);
-                                CERROR("failed to start MDS: %d\n", rc);
-                                RETURN(rc);
-                        }
-                }
+                               CERROR("failed to start MDS: %d\n", rc);
+                               RETURN(rc);
+                       }
+               }
                mutex_unlock(&server_start_lock);
                mutex_unlock(&server_start_lock);
-        }
-#endif
+       }
 
         /* If we're an OST, make sure the global OSS is running */
        if (IS_OST(lsi)) {
 
         /* If we're an OST, make sure the global OSS is running */
        if (IS_OST(lsi)) {
@@ -1830,7 +1825,7 @@ static int server_start_targets(struct super_block *sb, struct vfsmount *mnt)
        if (lsi->lsi_srv_mnt) {
                rc = server_mgc_set_fs(lsi->lsi_mgc, sb);
                if (rc)
        if (lsi->lsi_srv_mnt) {
                rc = server_mgc_set_fs(lsi->lsi_mgc, sb);
                if (rc)
-                       RETURN(rc);
+                       GOTO(out_stop_service, rc);
        }
 
         /* Register with MGS */
        }
 
         /* Register with MGS */
@@ -1913,6 +1908,10 @@ out_mgc:
        if (lsi->lsi_srv_mnt)
                server_mgc_clear_fs(lsi->lsi_mgc);
 
        if (lsi->lsi_srv_mnt)
                server_mgc_clear_fs(lsi->lsi_mgc);
 
+out_stop_service:
+       if (rc != 0)
+               server_stop_servers(lsi->lsi_flags);
+
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
@@ -2135,7 +2134,7 @@ static void server_put_super(struct super_block *sb)
                obd = class_name2obd(lsi->lsi_svname);
                 if (obd) {
                         CDEBUG(D_MOUNT, "stopping %s\n", obd->obd_name);
                obd = class_name2obd(lsi->lsi_svname);
                 if (obd) {
                         CDEBUG(D_MOUNT, "stopping %s\n", obd->obd_name);
-                        if (lsi->lsi_flags & LSI_UMOUNT_FAILOVER)
+                       if (lsiflags & LSI_UMOUNT_FAILOVER)
                                 obd->obd_fail = 1;
                         /* We can't seem to give an error return code
                          * to .put_super, so we better make sure we clean up! */
                                 obd->obd_fail = 1;
                         /* We can't seem to give an error return code
                          * to .put_super, so we better make sure we clean up! */
index c2b15d3..0f0e47d 100644 (file)
@@ -2966,7 +2966,6 @@ thread_sanity() {
        local nthrs
         shift 4
 
        local nthrs
         shift 4
 
-        setup
         check_mount || return 41
 
         # We need to expand $parampat, but it may match multiple parameters, so
         check_mount || return 41
 
         # We need to expand $parampat, but it may match multiple parameters, so
@@ -3040,18 +3039,28 @@ thread_sanity() {
 
         load_modules
         setup
 
         load_modules
         setup
-        cleanup
 }
 
 test_53a() {
 }
 
 test_53a() {
+       setup
        thread_sanity OST ost1 'ost.*.ost' 'oss_num_threads' '16'
        thread_sanity OST ost1 'ost.*.ost' 'oss_num_threads' '16'
+       cleanup
 }
 run_test 53a "check OSS thread count params"
 
 test_53b() {
 }
 run_test 53a "check OSS thread count params"
 
 test_53b() {
-       thread_sanity MDT $SINGLEMDS 'mdt.*.*.' 'mdt_num_threads' '16'
+       setup
+       local mds=$(do_facet $SINGLEMDS "lctl get_param -N mds.*.*.threads_max \
+                   2>/dev/null")
+       if [ -z "$mds" ]; then
+               #running this on an old MDT
+               thread_sanity MDT $SINGLEMDS 'mdt.*.*.' 'mdt_num_threads' 16
+       else
+               thread_sanity MDT $SINGLEMDS 'mds.*.*.' 'mds_num_threads' 16
+       fi
+       cleanup
 }
 }
-run_test 53b "check MDT thread count params"
+run_test 53b "check MDS thread count params"
 
 test_54a() {
        if [ $(facet_fstype $SINGLEMDS) != ldiskfs ]; then
 
 test_54a() {
        if [ $(facet_fstype $SINGLEMDS) != ldiskfs ]; then