Whamcloud - gitweb
- start fld and seq services in MDT, thus, make MDT an universal network related...
authoryury <yury>
Wed, 6 Sep 2006 15:23:34 +0000 (15:23 +0000)
committeryury <yury>
Wed, 6 Sep 2006 15:23:34 +0000 (15:23 +0000)
13 files changed:
lustre/fid/fid_handler.c
lustre/fid/fid_internal.h
lustre/fid/fid_store.c
lustre/fld/fld_handler.c
lustre/fld/fld_index.c
lustre/fld/fld_internal.h
lustre/include/lustre_fid.h
lustre/include/lustre_fld.h
lustre/include/lustre_mdt.h [new file with mode: 0644]
lustre/include/obd_support.h
lustre/mds/handler.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h

index 7a04805..b7e123f 100644 (file)
@@ -339,37 +339,6 @@ struct lu_context_key seq_thread_key = {
         .lct_fini = seq_thread_fini
 };
 
-static int seq_handle0(struct ptlrpc_request *req,
-                       struct seq_thread_info *info)
-{
-        int rc = 0;
-        ENTRY;
-
-        OBD_FAIL_RETURN(OBD_FAIL_SEQ_ALL_REPLY_NET | OBD_FAIL_ONCE, 0);
-
-        if (lustre_msg_get_opc(req->rq_reqmsg) == SEQ_QUERY) {
-                if (req->rq_export != NULL) {
-                        /* 
-                         * No need to return error here and overwrite @rc, this
-                         * function should return 0 even if seq_req_handle0()
-                         * returns some error code.
-                         */
-                        seq_req_handle(req, info);
-                } else {
-                        CERROR("Unconnected request\n");
-                        req->rq_status = -ENOTCONN;
-                }
-        } else {
-                CERROR("Wrong SEQ opcode: %d\n", 
-                       lustre_msg_get_opc(req->rq_reqmsg));
-                req->rq_status = -ENOTSUPP;
-                RETURN(ptlrpc_error(req));
-        }
-
-        target_send_reply(req, rc, OBD_FAIL_SEQ_ALL_REPLY_NET);
-        RETURN(rc);
-}
-
 static void seq_thread_info_init(struct ptlrpc_request *req,
                                  struct seq_thread_info *info)
 {
@@ -405,12 +374,21 @@ static int seq_handle(struct ptlrpc_request *req)
         LASSERT(info != NULL);
 
         seq_thread_info_init(req, info);
-        rc = seq_handle0(req, info);
+        rc = seq_req_handle(req, info);
         seq_thread_info_fini(info);
 
         return rc;
 }
 
+/*
+ * Entry point for handling FLD RPCs called from MDT.
+ */
+int seq_query(struct com_thread_info *info)
+{
+        return seq_handle(info->cti_pill.rc_req);
+}
+EXPORT_SYMBOL(seq_query);
+
 static void seq_server_proc_fini(struct lu_server_seq *seq);
 
 #ifdef LPROCFS
@@ -427,14 +405,6 @@ static int seq_server_proc_init(struct lu_server_seq *seq)
                 RETURN(rc);
         }
 
-        seq->lss_proc_entry = lprocfs_register("services",
-                                               seq->lss_proc_dir,
-                                               NULL, NULL);
-        if (IS_ERR(seq->lss_proc_entry)) {
-                rc = PTR_ERR(seq->lss_proc_entry);
-                GOTO(out_cleanup, rc);
-        }
-
         rc = lprocfs_add_vars(seq->lss_proc_dir,
                               seq_server_proc_list, seq);
         if (rc) {
@@ -453,12 +423,6 @@ out_cleanup:
 static void seq_server_proc_fini(struct lu_server_seq *seq)
 {
         ENTRY;
-        if (seq->lss_proc_entry != NULL) {
-                if (!IS_ERR(seq->lss_proc_entry))
-                        lprocfs_remove(seq->lss_proc_entry);
-                seq->lss_proc_entry = NULL;
-        }
-
         if (seq->lss_proc_dir != NULL) {
                 if (!IS_ERR(seq->lss_proc_dir))
                         lprocfs_remove(seq->lss_proc_dir);
@@ -489,22 +453,6 @@ int seq_server_init(struct lu_server_seq *seq,
                     const struct lu_context *ctx)
 {
         int rc, is_srv = (type == LUSTRE_SEQ_SERVER);
-        
-        static struct ptlrpc_service_conf seq_conf;
-        seq_conf = (typeof(seq_conf)) {
-                .psc_nbufs = MDS_NBUFS,
-                .psc_bufsize = MDS_BUFSIZE,
-                .psc_max_req_size = SEQ_MAXREQSIZE,
-                .psc_max_reply_size = SEQ_MAXREPSIZE,
-                .psc_req_portal = (is_srv ?
-                                   SEQ_METADATA_PORTAL :
-                                   SEQ_CONTROLLER_PORTAL),
-                .psc_rep_portal = MDC_REPLY_PORTAL,
-                .psc_watchdog_timeout = SEQ_SERVICE_WATCHDOG_TIMEOUT,
-                .psc_num_threads = SEQ_NUM_THREADS,
-                .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
-        };
-
         ENTRY;
 
        LASSERT(dev != NULL);
@@ -545,46 +493,6 @@ int seq_server_init(struct lu_server_seq *seq,
         if (rc)
                GOTO(out, rc);
 
-        seq->lss_md_service = ptlrpc_init_svc_conf(&seq_conf,
-                                                   seq_handle,
-                                                   LUSTRE_SEQ_NAME"_md",
-                                                   seq->lss_proc_entry,
-                                                   NULL);
-       if (seq->lss_md_service != NULL)
-               rc = ptlrpc_start_threads(NULL, seq->lss_md_service,
-                                          is_srv ? LUSTRE_MD_SEQ_NAME :
-                                                   LUSTRE_CT_SEQ_NAME);
-       else
-                GOTO(out, rc = -ENOMEM);
-
-        /* 
-         * we want to have really cluster-wide sequences space. This is why we
-         * start only one sequence controller which manages space.
-         */
-        seq_conf = (typeof(seq_conf)) {
-                .psc_nbufs = MDS_NBUFS,
-                .psc_bufsize = MDS_BUFSIZE,
-                .psc_max_req_size = SEQ_MAXREQSIZE,
-                .psc_max_reply_size = SEQ_MAXREPSIZE,
-                .psc_req_portal = SEQ_DATA_PORTAL,
-                .psc_rep_portal = OSC_REPLY_PORTAL,
-                .psc_watchdog_timeout = SEQ_SERVICE_WATCHDOG_TIMEOUT,
-                .psc_num_threads = SEQ_NUM_THREADS,
-                .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
-        };
-        if (is_srv) {
-                seq->lss_dt_service =  ptlrpc_init_svc_conf(&seq_conf,
-                                                            seq_handle,
-                                                            LUSTRE_SEQ_NAME"_dt",
-                                                            seq->lss_proc_entry,
-                                                            NULL);
-                if (seq->lss_dt_service != NULL)
-                        rc = ptlrpc_start_threads(NULL, seq->lss_dt_service,
-                                                  LUSTRE_DT_SEQ_NAME);
-                else
-                        GOTO(out, rc = -ENOMEM);
-        }
-        
        EXIT;
 out:
        if (rc) {
@@ -602,16 +510,6 @@ void seq_server_fini(struct lu_server_seq *seq,
 {
         ENTRY;
 
-        if (seq->lss_md_service != NULL) {
-                ptlrpc_unregister_service(seq->lss_md_service);
-                seq->lss_md_service = NULL;
-        }
-
-        if (seq->lss_dt_service != NULL) {
-                ptlrpc_unregister_service(seq->lss_dt_service);
-                seq->lss_dt_service = NULL;
-        }
-
         seq_server_proc_fini(seq);
         seq_store_fini(seq, ctx);
 
index a18aa0e..79ebea6 100644 (file)
@@ -34,8 +34,6 @@
 
 #include <linux/types.h>
 
-#define SEQ_SERVICE_WATCHDOG_TIMEOUT (obd_timeout * 1000)
-
 #ifdef __KERNEL__
 struct seq_store_record {
         struct lu_range ssr_space;
index 7aa3006..57bd621 100644 (file)
@@ -151,9 +151,6 @@ int seq_store_init(struct lu_server_seq *seq,
         int rc;
         ENTRY;
 
-        LASSERT(seq->lss_md_service == NULL);
-        LASSERT(seq->lss_dt_service == NULL);
-
         name = seq->lss_type == LUSTRE_SEQ_SERVER ?
                 "seq_srv" : "seq_ctl";
         
index 27366fd..e10de82 100644 (file)
@@ -48,7 +48,7 @@
 #include <lprocfs_status.h>
 
 #include <md_object.h>
-
+#include <lustre_req_layout.h>
 #include "fld_internal.h"
 
 #ifdef __KERNEL__
@@ -146,16 +146,18 @@ static int fld_server_handle(struct lu_server_fld *fld,
 
 }
 
-static int fld_req_handle(struct lu_server_fld *fld,
-                          struct ptlrpc_request *req,
+static int fld_req_handle(struct ptlrpc_request *req,
                           struct fld_thread_info *info)
 {
+        struct lu_site *site;
         struct md_fld *in;
         struct md_fld *out;
         int rc = -EPROTO;
         __u32 *opc;
         ENTRY;
 
+        site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
+        
         rc = req_capsule_pack(&info->fti_pill);
         if (rc)
                 RETURN(rc);
@@ -169,47 +171,14 @@ static int fld_req_handle(struct lu_server_fld *fld,
                 if (out == NULL)
                         RETURN(-EPROTO);
                 *out = *in;
-                rc = fld_server_handle(fld, req->rq_svc_thread->t_ctx,
+                rc = fld_server_handle(site->ls_server_fld,
+                                       req->rq_svc_thread->t_ctx,
                                        *opc, out);
         }
 
         RETURN(rc);
 }
 
-static int fld_handle0(struct ptlrpc_request *req,
-                       struct fld_thread_info *info)
-{
-        struct lu_site *site;
-        int rc = 0;
-        ENTRY;
-
-        OBD_FAIL_RETURN(OBD_FAIL_FLD_ALL_REPLY_NET | OBD_FAIL_ONCE, 0);
-
-        if (lustre_msg_get_opc(req->rq_reqmsg) == FLD_QUERY) {
-                if (req->rq_export != NULL) {
-                        site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
-                        LASSERT(site != NULL);
-                        /* 
-                         * No need to return error here and overwrite @rc, this
-                         * function should return 0 even if fld_req_handle0()
-                         * returns some error code.
-                         */
-                        fld_req_handle(site->ls_server_fld, req, info);
-                } else {
-                        CERROR("Unconnected request\n");
-                        req->rq_status = -ENOTCONN;
-                }
-        } else {
-                CERROR("Wrong FLD opcode: %d\n", 
-                       lustre_msg_get_opc(req->rq_reqmsg));
-                req->rq_status = -ENOTSUPP;
-                RETURN(ptlrpc_error(req));
-        }
-
-        target_send_reply(req, rc, OBD_FAIL_FLD_ALL_REPLY_NET);
-        RETURN(rc);
-}
-
 static void fld_thread_info_init(struct ptlrpc_request *req,
                                  struct fld_thread_info *info)
 {
@@ -245,13 +214,22 @@ static int fld_handle(struct ptlrpc_request *req)
         LASSERT(info != NULL);
 
         fld_thread_info_init(req, info);
-        rc = fld_handle0(req, info);
+        rc = fld_req_handle(req, info);
         fld_thread_info_fini(info);
 
         return rc;
 }
 
 /*
+ * Entry point for handling FLD RPCs called from MDT.
+ */
+int fld_query(struct com_thread_info *info)
+{
+        return fld_handle(info->cti_pill.rc_req);
+}
+EXPORT_SYMBOL(fld_query);
+
+/*
  * Returns true, if fid is local to this server node.
  *
  * WARNING: this function is *not* guaranteed to return false if fid is
@@ -293,29 +271,12 @@ static int fld_server_proc_init(struct lu_server_fld *fld)
                 RETURN(rc);
         }
 
-        fld->lsf_proc_entry = lprocfs_register("services",
-                                               fld->lsf_proc_dir,
-                                               NULL, NULL);
-        if (IS_ERR(fld->lsf_proc_entry)) {
-                rc = PTR_ERR(fld->lsf_proc_entry);
-                GOTO(out_cleanup, rc);
-        }
         RETURN(rc);
-
-out_cleanup:
-        fld_server_proc_fini(fld);
-        return rc;
 }
 
 static void fld_server_proc_fini(struct lu_server_fld *fld)
 {
         ENTRY;
-        if (fld->lsf_proc_entry != NULL) {
-                if (!IS_ERR(fld->lsf_proc_entry))
-                        lprocfs_remove(fld->lsf_proc_entry);
-                fld->lsf_proc_entry = NULL;
-        }
-
         if (fld->lsf_proc_dir != NULL) {
                 if (!IS_ERR(fld->lsf_proc_dir))
                         lprocfs_remove(fld->lsf_proc_dir);
@@ -341,18 +302,6 @@ int fld_server_init(struct lu_server_fld *fld,
                     const char *uuid)
 {
         int rc;
-        static struct ptlrpc_service_conf fld_conf;
-        fld_conf = (typeof(fld_conf)) {
-                .psc_nbufs            = MDS_NBUFS,
-                .psc_bufsize          = MDS_BUFSIZE,
-                .psc_max_req_size     = FLD_MAXREQSIZE,
-                .psc_max_reply_size   = FLD_MAXREPSIZE,
-                .psc_req_portal       = FLD_REQUEST_PORTAL,
-                .psc_rep_portal       = MDC_REPLY_PORTAL,
-                .psc_watchdog_timeout = FLD_SERVICE_WATCHDOG_TIMEOUT,
-                .psc_num_threads      = FLD_NUM_THREADS,
-                .psc_ctx_tags         = LCT_DT_THREAD|LCT_MD_THREAD
-        };
         ENTRY;
 
         snprintf(fld->lsf_name, sizeof(fld->lsf_name),
@@ -366,16 +315,6 @@ int fld_server_init(struct lu_server_fld *fld,
         if (rc)
                 GOTO(out, rc);
 
-        fld->lsf_service =
-                ptlrpc_init_svc_conf(&fld_conf, fld_handle,
-                                     LUSTRE_FLD_NAME,
-                                     fld->lsf_proc_entry, NULL);
-        if (fld->lsf_service != NULL)
-                rc = ptlrpc_start_threads(NULL, fld->lsf_service,
-                                          LUSTRE_FLD_NAME);
-        else
-                rc = -ENOMEM;
-
         EXIT;
 out:
         if (rc)
@@ -391,11 +330,6 @@ void fld_server_fini(struct lu_server_fld *fld,
 {
         ENTRY;
 
-        if (fld->lsf_service != NULL) {
-                ptlrpc_unregister_service(fld->lsf_service);
-                fld->lsf_service = NULL;
-        }
-
         fld_server_proc_fini(fld);
         fld_index_fini(fld, ctx);
         
index 159b03f..31d8dfa 100644 (file)
@@ -172,12 +172,6 @@ int fld_index_init(struct lu_server_fld *fld,
         int rc;
         ENTRY;
 
-        /*
-         * lu_context_key has to be registered before threads are started,
-         * check this.
-         */
-        LASSERT(fld->lsf_service == NULL);
-
         dt_obj = dt_store_open(ctx, dt, fld_index_name, &fid);
         if (!IS_ERR(dt_obj)) {
                 fld->lsf_obj = dt_obj;
index 8734c5a..f4448db 100644 (file)
@@ -51,7 +51,6 @@ enum {
 extern struct lu_fld_hash fld_hash[];
 
 #ifdef __KERNEL__
-#define FLD_SERVICE_WATCHDOG_TIMEOUT (obd_timeout * 1000)
 struct fld_thread_info {
         struct req_capsule fti_pill;
         int                fti_rep_buf_size[4];
index c026364..2b0046c 100644 (file)
@@ -27,6 +27,8 @@
  * struct lu_fid
  */
 #include <lustre/lustre_idl.h>
+#include <lustre_req_layout.h>
+#include <lustre_mdt.h>
 
 #include <libcfs/list.h>
 #include <libcfs/kp30.h>
@@ -113,18 +115,11 @@ struct lu_server_seq {
         struct dt_object       *lss_obj;
 
         /* seq related proc */
-        cfs_proc_dir_entry_t   *lss_proc_entry;
         cfs_proc_dir_entry_t   *lss_proc_dir;
 
         /* LUSTRE_SEQ_SERVER or LUSTRE_SEQ_CONTROLLER */
         enum lu_mgr_type       lss_type;
 
-        /* server side seq service for metadata stack */
-        struct ptlrpc_service  *lss_md_service;
-
-        /* server side seq service for data stack */
-        struct ptlrpc_service  *lss_dt_service;
-
         /* client interafce to request controller */
         struct lu_client_seq   *lss_cli;
 
@@ -141,8 +136,9 @@ struct lu_server_seq {
         __u64                   lss_meta_width;
 };
 
-#ifdef __KERNEL__
+int seq_query(struct com_thread_info *info);
 
+/* Server methods */
 int seq_server_init(struct lu_server_seq *seq,
                     struct dt_device *dev,
                     const char *prefix,
@@ -163,8 +159,8 @@ int seq_server_alloc_meta(struct lu_server_seq *seq,
 int seq_server_set_cli(struct lu_server_seq *seq,
                        struct lu_client_seq *cli,
                        const struct lu_context *ctx);
-#endif
 
+/* Client methods */
 int seq_client_init(struct lu_client_seq *seq,
                     struct obd_export *exp,
                     enum lu_cli_type type,
index 4faf665..a9de9e1 100644 (file)
@@ -24,6 +24,7 @@
 #define __LINUX_FLD_H
 
 #include <lustre/lustre_idl.h>
+#include <lustre_mdt.h>
 #include <dt_object.h>
 
 #include <libcfs/list.h>
@@ -62,15 +63,9 @@ struct lu_fld_hash {
 };
 
 struct lu_server_fld {
-        /* service proc entry */
-        cfs_proc_dir_entry_t    *lsf_proc_entry;
-
         /* fld dir proc entry */
         cfs_proc_dir_entry_t    *lsf_proc_dir;
 
-        /* pointer to started server service */
-        struct ptlrpc_service   *lsf_service;
-
         /* /fld file object device */
         struct dt_object        *lsf_obj;
 
@@ -139,7 +134,9 @@ struct lu_client_fld {
         const struct lu_context       *lcf_ctx;
 };
 
-/* server methods */
+int fld_query(struct com_thread_info *info);
+
+/* Server methods */
 int fld_server_init(struct lu_server_fld *fld,
                     const struct lu_context *ctx,
                     struct dt_device *dt,
@@ -160,7 +157,7 @@ int fld_server_lookup(struct lu_server_fld *fld,
                       const struct lu_context *ctx,
                       seqno_t seq, mdsno_t *mds);
 
-/* client methods */
+/* Client methods */
 int fld_client_init(struct lu_client_fld *fld,
                     const char *prefix, int hash,
                     const struct lu_context *ctx);
@@ -182,7 +179,7 @@ int fld_client_add_target(struct lu_client_fld *fld,
 int fld_client_del_target(struct lu_client_fld *fld,
                           __u64 idx);
 
-/* cache methods */
+/* Cache methods */
 struct fld_cache_info *fld_cache_init(int hash_size,
                                       int cache_size,
                                       int cache_threshold);
diff --git a/lustre/include/lustre_mdt.h b/lustre/include/lustre_mdt.h
new file mode 100644 (file)
index 0000000..732d3a4
--- /dev/null
@@ -0,0 +1,43 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (C) 2006 Cluster File Systems, Inc.
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#ifndef __LINUX_MDT_H
+#define __LINUX_MDT_H
+
+#include <lustre/lustre_idl.h>
+#include <lustre_req_layout.h>
+#include <md_object.h> 
+#include <dt_object.h> 
+#include <libcfs/list.h>
+#include <libcfs/kp30.h>
+
+/*
+ * Common thread info for mdt, seq and fld
+ */
+struct com_thread_info {
+        /*
+         * for req-layout interface.
+         */
+        struct req_capsule cti_pill;
+};
+
+#endif
index e38e2b1..3808808 100644 (file)
@@ -176,12 +176,10 @@ extern int obd_race_state;
 #define OBD_FAIL_MGC_PROCESS_LOG         0x903
 
 #define OBD_FAIL_SEQ                     0x1000
-#define OBD_FAIL_SEQ_ALL_REQUEST_NET     0x1001
-#define OBD_FAIL_SEQ_ALL_REPLY_NET       0x1002
+#define OBD_FAIL_SEQ_QUERY_NET           0x1001
 
 #define OBD_FAIL_FLD                     0x1100
-#define OBD_FAIL_FLD_ALL_REQUEST_NET     0x1101
-#define OBD_FAIL_FLD_ALL_REPLY_NET       0x1102
+#define OBD_FAIL_FLD_QUERY_NET           0x1101
 
 /* preparation for a more advanced failure testbed (not functional yet) */
 #define OBD_FAIL_MASK_SYS    0x0000FF00
index 43dc952..f2ad741 100644 (file)
@@ -1369,6 +1369,8 @@ int mds_msg_check_version(struct lustre_msg *msg)
         case MDS_QUOTACTL:
         case QUOTA_DQACQ:
         case QUOTA_DQREL:
+        case SEQ_QUERY:
+        case FLD_QUERY:
                 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
                 if (rc)
                         CERROR("bad opc %u version %08x, expecting %08x\n",
index e183507..4f90d63 100644 (file)
@@ -54,6 +54,7 @@
 /* lu2dt_dev() */
 #include <dt_object.h>
 #include <lustre_mds.h>
+#include <lustre_mdt.h>
 #include "mdt_internal.h"
 #include <linux/lustre_acl.h>
 /*
@@ -118,6 +119,8 @@ struct mdt_opc_slice {
 
 static struct mdt_opc_slice mdt_regular_handlers[];
 static struct mdt_opc_slice mdt_readpage_handlers[];
+static struct mdt_opc_slice mdt_seq_handlers[];
+static struct mdt_opc_slice mdt_fld_handlers[];
 
 static struct mdt_device *mdt_dev(struct lu_device *d);
 static int mdt_regular_handle(struct ptlrpc_request *req);
@@ -1569,6 +1572,26 @@ static int mdt_readpage_handle(struct ptlrpc_request *req)
         return mdt_handle_common(req, mdt_readpage_handlers);
 }
 
+static int mdt_mdsc_handle(struct ptlrpc_request *req)
+{
+        return mdt_handle_common(req, mdt_seq_handlers);
+}
+
+static int mdt_mdss_handle(struct ptlrpc_request *req)
+{
+        return mdt_handle_common(req, mdt_seq_handlers);
+}
+
+static int mdt_dtss_handle(struct ptlrpc_request *req)
+{
+        return mdt_handle_common(req, mdt_seq_handlers);
+}
+
+static int mdt_fld_handle(struct ptlrpc_request *req)
+{
+        return mdt_handle_common(req, mdt_fld_handlers);
+}
+
 enum mdt_it_code {
         MDT_IT_OPEN,
         MDT_IT_OCREAT,
@@ -2203,9 +2226,9 @@ out_fld_fini:
 /* device init/fini methods */
 static void mdt_stop_ptlrpc_service(struct mdt_device *m)
 {
-        if (m->mdt_service != NULL) {
-                ptlrpc_unregister_service(m->mdt_service);
-                m->mdt_service = NULL;
+        if (m->mdt_regular_service != NULL) {
+                ptlrpc_unregister_service(m->mdt_regular_service);
+                m->mdt_regular_service = NULL;
         }
         if (m->mdt_readpage_service != NULL) {
                 ptlrpc_unregister_service(m->mdt_readpage_service);
@@ -2215,6 +2238,22 @@ static void mdt_stop_ptlrpc_service(struct mdt_device *m)
                 ptlrpc_unregister_service(m->mdt_setattr_service);
                 m->mdt_setattr_service = NULL;
         }
+        if (m->mdt_mdsc_service != NULL) {
+                ptlrpc_unregister_service(m->mdt_mdsc_service);
+                m->mdt_mdsc_service = NULL;
+        }
+        if (m->mdt_mdss_service != NULL) {
+                ptlrpc_unregister_service(m->mdt_mdss_service);
+                m->mdt_mdss_service = NULL;
+        }
+        if (m->mdt_dtss_service != NULL) {
+                ptlrpc_unregister_service(m->mdt_dtss_service);
+                m->mdt_dtss_service = NULL;
+        }
+        if (m->mdt_fld_service != NULL) {
+                ptlrpc_unregister_service(m->mdt_fld_service);
+                m->mdt_fld_service = NULL;
+        }
 }
 
 static int mdt_start_ptlrpc_service(struct mdt_device *m)
@@ -2244,14 +2283,14 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m)
         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                            "mdt_ldlm_client", m->mdt_ldlm_client);
 
-        m->mdt_service =
+        m->mdt_regular_service =
                 ptlrpc_init_svc_conf(&conf, mdt_regular_handle, LUSTRE_MDT_NAME,
                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
                                      NULL);
-        if (m->mdt_service == NULL)
+        if (m->mdt_regular_service == NULL)
                 RETURN(-ENOMEM);
 
-        rc = ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT_NAME);
+        rc = ptlrpc_start_threads(NULL, m->mdt_regular_service, LUSTRE_MDT_NAME);
         if (rc)
                 GOTO(err_mdt_svc, rc);
 
@@ -2315,6 +2354,123 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m)
         if (rc)
                 GOTO(err_mdt_svc, rc);
 
+        /*
+         * sequence controller service configuration
+         */
+        conf = (typeof(conf)) {
+                .psc_nbufs = MDS_NBUFS,
+                .psc_bufsize = MDS_BUFSIZE,
+                .psc_max_req_size = SEQ_MAXREQSIZE,
+                .psc_max_reply_size = SEQ_MAXREPSIZE,
+                .psc_req_portal = SEQ_CONTROLLER_PORTAL,
+                .psc_rep_portal = MDC_REPLY_PORTAL,
+                .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
+                .psc_num_threads = SEQ_NUM_THREADS,
+                .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
+        };
+
+        m->mdt_mdsc_service =
+                ptlrpc_init_svc_conf(&conf, mdt_mdsc_handle,
+                                     LUSTRE_MDT_NAME"_mdsc",
+                                     m->mdt_md_dev.md_lu_dev.ld_proc_entry,
+                                     NULL);
+        if (!m->mdt_mdsc_service) {
+                CERROR("failed to start seq controller service\n");
+                GOTO(err_mdt_svc, rc = -ENOMEM);
+        }
+
+        rc = ptlrpc_start_threads(NULL, m->mdt_mdsc_service, "mdt_mdsc");
+        if (rc)
+                GOTO(err_mdt_svc, rc);
+
+        /*
+         * metadata sequence server service configuration
+         */
+        conf = (typeof(conf)) {
+                .psc_nbufs = MDS_NBUFS,
+                .psc_bufsize = MDS_BUFSIZE,
+                .psc_max_req_size = SEQ_MAXREQSIZE,
+                .psc_max_reply_size = SEQ_MAXREPSIZE,
+                .psc_req_portal = SEQ_METADATA_PORTAL,
+                .psc_rep_portal = MDC_REPLY_PORTAL,
+                .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
+                .psc_num_threads = SEQ_NUM_THREADS,
+                .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
+        };
+
+        m->mdt_mdss_service =
+                ptlrpc_init_svc_conf(&conf, mdt_mdss_handle,
+                                     LUSTRE_MDT_NAME"_mdss",
+                                     m->mdt_md_dev.md_lu_dev.ld_proc_entry,
+                                     NULL);
+        if (!m->mdt_mdss_service) {
+                CERROR("failed to start metadata seq server service\n");
+                GOTO(err_mdt_svc, rc = -ENOMEM);
+        }
+
+        rc = ptlrpc_start_threads(NULL, m->mdt_mdss_service, "mdt_mdss");
+        if (rc)
+                GOTO(err_mdt_svc, rc);
+
+
+        /*
+         * Data sequence server service configuration. We want to have really
+         * cluster-wide sequences space. This is why we start only one sequence
+         * controller which manages space.
+         */
+        conf = (typeof(conf)) {
+                .psc_nbufs = MDS_NBUFS,
+                .psc_bufsize = MDS_BUFSIZE,
+                .psc_max_req_size = SEQ_MAXREQSIZE,
+                .psc_max_reply_size = SEQ_MAXREPSIZE,
+                .psc_req_portal = SEQ_DATA_PORTAL,
+                .psc_rep_portal = OSC_REPLY_PORTAL,
+                .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
+                .psc_num_threads = SEQ_NUM_THREADS,
+                .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
+        };
+
+        m->mdt_dtss_service =
+                ptlrpc_init_svc_conf(&conf, mdt_dtss_handle,
+                                     LUSTRE_MDT_NAME"_dtss",
+                                     m->mdt_md_dev.md_lu_dev.ld_proc_entry,
+                                     NULL);
+        if (!m->mdt_dtss_service) {
+                CERROR("failed to start data seq server service\n");
+                GOTO(err_mdt_svc, rc = -ENOMEM);
+        }
+
+        rc = ptlrpc_start_threads(NULL, m->mdt_dtss_service, "mdt_dtss");
+        if (rc)
+                GOTO(err_mdt_svc, rc);
+
+        /* FLD service start */
+        conf = (typeof(conf)) {
+                .psc_nbufs            = MDS_NBUFS,
+                .psc_bufsize          = MDS_BUFSIZE,
+                .psc_max_req_size     = FLD_MAXREQSIZE,
+                .psc_max_reply_size   = FLD_MAXREPSIZE,
+                .psc_req_portal       = FLD_REQUEST_PORTAL,
+                .psc_rep_portal       = MDC_REPLY_PORTAL,
+                .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
+                .psc_num_threads      = FLD_NUM_THREADS,
+                .psc_ctx_tags         = LCT_DT_THREAD|LCT_MD_THREAD
+        };
+
+        m->mdt_fld_service =
+                ptlrpc_init_svc_conf(&conf, mdt_fld_handle,
+                                     LUSTRE_MDT_NAME"_fld",
+                                     m->mdt_md_dev.md_lu_dev.ld_proc_entry,
+                                     NULL);
+        if (!m->mdt_fld_service) {
+                CERROR("failed to start fld service\n");
+                GOTO(err_mdt_svc, rc = -ENOMEM);
+        }
+
+        rc = ptlrpc_start_threads(NULL, m->mdt_fld_service, "mdt_fld");
+        if (rc)
+                GOTO(err_mdt_svc, rc);
+
         EXIT;
 err_mdt_svc:
         if (rc)
@@ -3135,11 +3291,23 @@ static void __exit mdt_mod_exit(void)
 
 #define DEF_MDT_HNDL(flags, name, fn, fmt)                                  \
         DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, fmt)
+
+#define DEF_SEQ_HNDL(flags, name, fn, fmt)                      \
+        DEF_HNDL(SEQ, QUERY, _NET, flags, name, fn, fmt)
+
+#define DEF_FLD_HNDL(flags, name, fn, fmt)                      \
+        DEF_HNDL(FLD, QUERY, _NET, flags, name, fn, fmt)
 /*
  * Request with a format known in advance
  */
 #define DEF_MDT_HNDL_F(flags, name, fn)                                 \
         DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, &RQF_MDS_ ## name)
+
+#define DEF_SEQ_HNDL_F(flags, name, fn)                                 \
+        DEF_HNDL(SEQ, QUERY, _NET, flags, name, fn, &RQF_SEQ_ ## name)
+
+#define DEF_FLD_HNDL_F(flags, name, fn)                                 \
+        DEF_HNDL(FLD, QUERY, _NET, flags, name, fn, &RQF_SEQ_ ## name)
 /*
  * Request with a format we do not yet know
  */
@@ -3225,7 +3393,7 @@ static struct mdt_handler mdt_readpage_ops[] = {
 
         /*
          * XXX: this is ugly and should be fixed one day, see mdc_close() for
-         * detailed comment. --umka
+         * detailed comments. --umka
          */
         DEF_MDT_HNDL_F(HABEO_CORPUS,              CLOSE,    mdt_close),
 };
@@ -3241,6 +3409,36 @@ static struct mdt_opc_slice mdt_readpage_handlers[] = {
         }
 };
 
+static struct mdt_handler mdt_seq_ops[] = {
+        DEF_SEQ_HNDL_F(0, QUERY, (int (*)(struct mdt_thread_info *))seq_query)
+};
+
+static struct mdt_opc_slice mdt_seq_handlers[] = {
+        {
+                .mos_opc_start = SEQ_QUERY,
+                .mos_opc_end   = SEQ_LAST_OPC,
+                .mos_hs        = mdt_seq_ops
+        },
+        {
+                .mos_hs        = NULL
+        }
+};
+
+static struct mdt_handler mdt_fld_ops[] = {
+        DEF_FLD_HNDL_F(0, QUERY, (int (*)(struct mdt_thread_info *))fld_query)
+};
+
+static struct mdt_opc_slice mdt_fld_handlers[] = {
+        {
+                .mos_opc_start = FLD_QUERY,
+                .mos_opc_end   = FLD_LAST_OPC,
+                .mos_hs        = mdt_fld_ops
+        },
+        {
+                .mos_hs        = NULL
+        }
+};
+
 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre Meta-data Target ("LUSTRE_MDT_NAME")");
 MODULE_LICENSE("GPL");
index 5d65123..d2c9064 100644 (file)
@@ -120,9 +120,13 @@ struct mdt_file_data {
 struct mdt_device {
         /* super-class */
         struct md_device           mdt_md_dev;
-        struct ptlrpc_service     *mdt_service;
+        struct ptlrpc_service     *mdt_regular_service;
         struct ptlrpc_service     *mdt_readpage_service;
         struct ptlrpc_service     *mdt_setattr_service;
+        struct ptlrpc_service     *mdt_mdsc_service;
+        struct ptlrpc_service     *mdt_mdss_service;
+        struct ptlrpc_service     *mdt_dtss_service;
+        struct ptlrpc_service     *mdt_fld_service;
         /* DLM name-space for meta-data locks maintained by this server */
         struct ldlm_namespace     *mdt_namespace;
         /* ptlrpc handle for MDS->client connections (for lock ASTs). */
@@ -218,6 +222,11 @@ enum {
  * reduce stack consumption.
  */
 struct mdt_thread_info {
+        /*
+         * for req-layout interface.
+         */
+        struct req_capsule         mti_pill;
+
         const struct lu_context   *mti_ctxt;
         struct mdt_device         *mti_mdt;
         /*
@@ -261,10 +270,7 @@ struct mdt_thread_info {
          * A couple of lock handles.
          */
         struct mdt_lock_handle     mti_lh[MDT_LH_NR];
-        /*
-         * for req-layout interface.
-         */
-        struct req_capsule         mti_pill;
+
         /* transaction number of current request */
         __u64                      mti_transno;
         __u32                      mti_trans_flags;