Whamcloud - gitweb
LU-2145 server: use unified request handler for MGS 26/4826/15
authorMikhail Pershin <tappro@whamcloud.com>
Thu, 13 Dec 2012 18:07:52 +0000 (22:07 +0400)
committerOleg Drokin <oleg.drokin@intel.com>
Tue, 23 Jul 2013 04:24:27 +0000 (04:24 +0000)
- Unify request handler. It finds target for particular request and
  calls appropriate handler for it. Generic handlers are moved to
  the unified target code. The tgt_session_info is introduced to
  store all request-related data and passed to all handlers.
- Pack reply in llog server functions early and use err_serious()
- remove obsoleted llog_origin_handle_cancel(), it is not used
  anymore
- remove push_ctxt/pop_ctxt from llog server function, it is based
  on OSD now.

Change-Id: Idded90d8112bcab74de9a58e7885fa17dd3340b8
Signed-off-by: Mikhail Pershin <mike.pershin@intel.com>
Reviewed-on: http://review.whamcloud.com/4826
Reviewed-by: Fan Yong <fan.yong@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: James Simmons <uja.ornl@gmail.com>
22 files changed:
lustre/include/lu_target.h
lustre/include/lustre_mdt.h
lustre/include/lustre_net.h
lustre/include/lustre_req_layout.h
lustre/include/obd_support.h
lustre/ldlm/ldlm_lockd.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mgs/lproc_mgs.c
lustre/mgs/mgs_handler.c
lustre/mgs/mgs_internal.h
lustre/mgs/mgs_nids.c
lustre/ofd/ofd_dev.c
lustre/ost/ost_handler.c
lustre/ptlrpc/Makefile.in
lustre/ptlrpc/layout.c
lustre/ptlrpc/llog_server.c
lustre/ptlrpc/lproc_ptlrpc.c
lustre/target/Makefile.am
lustre/target/tgt_handler.c [new file with mode: 0644]
lustre/target/tgt_internal.h
lustre/target/tgt_main.c

index 3192adf..66e6f76 100644 (file)
 #define _LUSTRE_LU_TARGET_H
 
 #include <dt_object.h>
 #define _LUSTRE_LU_TARGET_H
 
 #include <dt_object.h>
+#include <lustre_export.h>
 #include <lustre_disk.h>
 
 struct lu_target {
 #include <lustre_disk.h>
 
 struct lu_target {
-        struct obd_device       *lut_obd;
-        struct dt_device        *lut_bottom;
-        /** last_rcvd file */
-        struct dt_object        *lut_last_rcvd;
-        /* transaction callbacks */
-        struct dt_txn_callback   lut_txn_cb;
-        /** server data in last_rcvd file */
-        struct lr_server_data    lut_lsd;
-        /** Server last transaction number */
-        __u64                    lut_last_transno;
-        /** Lock protecting last transaction number */
+       struct obd_device       *lut_obd;
+       struct dt_device        *lut_bottom;
+
+       /* supported opcodes and handlers for this target */
+       struct tgt_opc_slice    *lut_slice;
+       __u32                    lut_reply_fail_id;
+       __u32                    lut_request_fail_id;
+
+       /* sptlrpc rules */
+       rwlock_t                 lut_sptlrpc_lock;
+       struct sptlrpc_rule_set  lut_sptlrpc_rset;
+       int                      lut_sec_level;
+       unsigned int             lut_mds_capa:1,
+                                lut_oss_capa:1;
+
+       /* LAST_RCVD parameters */
+       /** last_rcvd file */
+       struct dt_object        *lut_last_rcvd;
+       /* transaction callbacks */
+       struct dt_txn_callback   lut_txn_cb;
+       /** server data in last_rcvd file */
+       struct lr_server_data    lut_lsd;
+       /** Server last transaction number */
+       __u64                    lut_last_transno;
+       /** Lock protecting last transaction number */
        spinlock_t               lut_translock;
        /** Lock protecting client bitmap */
        spinlock_t               lut_client_bitmap_lock;
        /** Bitmap of known clients */
        spinlock_t               lut_translock;
        /** Lock protecting client bitmap */
        spinlock_t               lut_client_bitmap_lock;
        /** Bitmap of known clients */
-       unsigned long           *lut_client_bitmap;
+       unsigned long           *lut_client_bitmap;
 };
 
 };
 
+extern struct lu_context_key tgt_session_key;
+
+struct tgt_session_info {
+       /*
+        * The following members will be filled explicitly
+        * with specific data in tgt_ses_init().
+        */
+       struct req_capsule      *tsi_pill;
+
+       /*
+        * Lock request for "habeo clavis" operations.
+        */
+       struct ldlm_request     *tsi_dlm_req;
+
+       /* although we have export in req, there are cases when it is not
+        * available, e.g. closing files upon export destroy */
+       struct obd_export       *tsi_exp;
+       const struct lu_env     *tsi_env;
+       struct lu_target        *tsi_tgt;
+       /*
+        * Additional fail id that can be set by handler.
+        */
+       int                      tsi_reply_fail_id;
+       int                      tsi_request_fail_id;
+};
+
+static inline struct tgt_session_info *tgt_ses_info(const struct lu_env *env)
+{
+       struct tgt_session_info *tsi;
+
+       LASSERT(env->le_ses != NULL);
+       tsi = lu_context_key_get(env->le_ses, &tgt_session_key);
+       LASSERT(tsi);
+       return tsi;
+}
+
+/*
+ * Generic unified target support.
+ */
+enum tgt_handler_flags {
+       /*
+        * struct *_body is passed in the incoming message, and object
+        * identified by this fid exists on disk.
+        *                            *
+        * "habeo corpus" == "I have a body"
+        */
+       HABEO_CORPUS = (1 << 0),
+       /*
+        * struct ldlm_request is passed in the incoming message.
+        *
+        * "habeo clavis" == "I have a key"
+        *                                     */
+       HABEO_CLAVIS = (1 << 1),
+       /*
+        * this request has fixed reply format, so that reply message can be
+        * packed by generic code.
+        *
+        * "habeo refero" == "I have a reply"
+        */
+       HABEO_REFERO = (1 << 2),
+       /*
+        * this request will modify something, so check whether the file system
+        * is readonly or not, then return -EROFS to client asap if necessary.
+        *
+        * "mutabor" == "I shall modify"
+        */
+       MUTABOR      = (1 << 3)
+};
+
+struct tgt_handler {
+       /* The name of this handler. */
+       const char              *th_name;
+       /* Fail id, check at the beginning */
+       int                      th_fail_id;
+       /* Operation code */
+       __u32                    th_opc;
+       /* Flags in enum tgt_handler_flags */
+       __u32                    th_flags;
+       /* Request version for this opcode */
+       int                      th_version;
+       /* Handler function */
+       int                     (*th_act)(struct tgt_session_info *tti);
+       /* Request format for this request */
+       const struct req_format *th_fmt;
+};
+
+struct tgt_opc_slice {
+       __u32                    tos_opc_start; /* First op code */
+       __u32                    tos_opc_end; /* Last op code */
+       struct tgt_handler      *tos_hs; /* Registered handler */
+};
+
+static inline struct ptlrpc_request *tgt_ses_req(struct tgt_session_info *tsi)
+{
+       return tsi->tsi_pill ? tsi->tsi_pill->rc_req : NULL;
+}
+
+static inline __u64 tgt_conn_flags(struct tgt_session_info *tsi)
+{
+       LASSERT(tsi->tsi_exp);
+       return exp_connect_flags(tsi->tsi_exp);
+}
+
+/* target/tgt_handler.c */
+int tgt_request_handle(struct ptlrpc_request *req);
+char *tgt_name(struct lu_target *tgt);
+void tgt_counter_incr(struct obd_export *exp, int opcode);
+int tgt_connect_check_sptlrpc(struct ptlrpc_request *req,
+                             struct obd_export *exp);
+int tgt_connect(struct tgt_session_info *tsi);
+int tgt_disconnect(struct tgt_session_info *uti);
+int tgt_obd_ping(struct tgt_session_info *tsi);
+int tgt_enqueue(struct tgt_session_info *tsi);
+int tgt_convert(struct tgt_session_info *tsi);
+int tgt_bl_callback(struct tgt_session_info *tsi);
+int tgt_cp_callback(struct tgt_session_info *tsi);
+int tgt_llog_open(struct tgt_session_info *tsi);
+int tgt_llog_close(struct tgt_session_info *tsi);
+int tgt_llog_destroy(struct tgt_session_info *tsi);
+int tgt_llog_read_header(struct tgt_session_info *tsi);
+int tgt_llog_next_block(struct tgt_session_info *tsi);
+int tgt_llog_prev_block(struct tgt_session_info *tsi);
+int tgt_sec_ctx_init(struct tgt_session_info *tsi);
+int tgt_sec_ctx_init_cont(struct tgt_session_info *tsi);
+int tgt_sec_ctx_fini(struct tgt_session_info *tsi);
+
 typedef void (*tgt_cb_t)(struct lu_target *lut, __u64 transno,
 typedef void (*tgt_cb_t)(struct lu_target *lut, __u64 transno,
-                         void *data, int err);
+                        void *data, int err);
 struct tgt_commit_cb {
 struct tgt_commit_cb {
-        tgt_cb_t  tgt_cb_func;
-        void     *tgt_cb_data;
+       tgt_cb_t  tgt_cb_func;
+       void     *tgt_cb_data;
 };
 
 };
 
+/* target/tgt_main.c */
 void tgt_boot_epoch_update(struct lu_target *lut);
 int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *lut,
                           struct obd_export *exp, __u64 transno);
 int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp);
 int tgt_init(const struct lu_env *env, struct lu_target *lut,
 void tgt_boot_epoch_update(struct lu_target *lut);
 int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *lut,
                           struct obd_export *exp, __u64 transno);
 int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp);
 int tgt_init(const struct lu_env *env, struct lu_target *lut,
-            struct obd_device *obd, struct dt_device *dt);
+            struct obd_device *obd, struct dt_device *dt,
+            struct tgt_opc_slice *slice,
+            int request_fail_id, int reply_fail_id);
 void tgt_fini(const struct lu_env *env, struct lu_target *lut);
 int tgt_client_alloc(struct obd_export *exp);
 void tgt_client_free(struct obd_export *exp);
 void tgt_fini(const struct lu_env *env, struct lu_target *lut);
 int tgt_client_alloc(struct obd_export *exp);
 void tgt_client_free(struct obd_export *exp);
@@ -88,4 +232,82 @@ int tgt_server_data_write(const struct lu_env *env, struct lu_target *tg,
 int tgt_server_data_update(const struct lu_env *env, struct lu_target *tg, int sync);
 int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg, loff_t off);
 
 int tgt_server_data_update(const struct lu_env *env, struct lu_target *tg, int sync);
 int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg, loff_t off);
 
+enum {
+       ESERIOUS = 0x0001000
+};
+
+static inline int err_serious(int rc)
+{
+       LASSERT(rc < 0);
+       return -(-rc | ESERIOUS);
+}
+
+static inline int clear_serious(int rc)
+{
+       if (rc < 0)
+               rc = -(-rc & ~ESERIOUS);
+       return rc;
+}
+
+static inline int is_serious(int rc)
+{
+       return (rc < 0 && -rc & ESERIOUS);
+}
+
+/*
+ * Unified target generic handers macros and generic functions.
+ */
+#define TGT_RPC_HANDLER(base, flags, opc, fn, fmt, version)            \
+[opc - base] = {                                                       \
+       .th_name        = #opc,                                         \
+       .th_fail_id     = OBD_FAIL_ ## opc ## _NET,                     \
+       .th_opc         = opc,                                          \
+       .th_flags       = flags,                                        \
+       .th_act         = fn,                                           \
+       .th_fmt         = fmt,                                          \
+       .th_version     = version                                       \
+}
+
+/* MGS request with a format known in advance */
+#define TGT_MGS_HDL(flags, name, fn)                                   \
+       TGT_RPC_HANDLER(MGS_FIRST_OPC, flags, name, fn, &RQF_ ## name,\
+                          LUSTRE_MGS_VERSION)
+#define TGT_MGS_HDL_VAR(flags, name, fn)                               \
+       TGT_RPC_HANDLER(MGS_FIRST_OPC, flags, name, fn, NULL,   \
+                          LUSTRE_MGS_VERSION)
+
+/*
+ * OBD handler macros and generic functions.
+ */
+#define TGT_OBD_HDL(flags, name, fn)                                   \
+       TGT_RPC_HANDLER(OBD_FIRST_OPC, flags, name, fn, &RQF_ ## name,\
+                          LUSTRE_OBD_VERSION)
+
+/*
+ * DLM handler macros and generic functions.
+ */
+#define TGT_DLM_HDL_VAR(flags, name, fn)                               \
+       TGT_RPC_HANDLER(LDLM_FIRST_OPC, flags, name, fn, NULL,  \
+                          LUSTRE_DLM_VERSION)
+#define TGT_DLM_HDL(flags, name, fn)                                   \
+       TGT_RPC_HANDLER(LDLM_FIRST_OPC, flags, name, fn, &RQF_ ## name,\
+                          LUSTRE_DLM_VERSION)
+
+/*
+ * LLOG handler macros and generic functions.
+ */
+#define TGT_LLOG_HDL_VAR(flags, name, fn)                              \
+       TGT_RPC_HANDLER(LLOG_FIRST_OPC, flags, name, fn, NULL,  \
+                          LUSTRE_LOG_VERSION)
+#define TGT_LLOG_HDL(flags, name, fn)                                  \
+       TGT_RPC_HANDLER(LLOG_FIRST_OPC, flags, name, fn, &RQF_ ## name,\
+                          LUSTRE_LOG_VERSION)
+
+/*
+ * Sec context handler macros and generic functions.
+ */
+#define TGT_SEC_HDL_VAR(flags, name, fn)                               \
+       TGT_RPC_HANDLER(SEC_FIRST_OPC, flags, name, fn, NULL,   \
+                          LUSTRE_OBD_VERSION)
+
 #endif /* __LUSTRE_LU_TARGET_H */
 #endif /* __LUSTRE_LU_TARGET_H */
index 81b69fc..a5f3352 100644 (file)
@@ -43,7 +43,6 @@
 #include <lustre/lustre_idl.h>
 #include <lustre_req_layout.h>
 #include <md_object.h>
 #include <lustre/lustre_idl.h>
 #include <lustre_req_layout.h>
 #include <md_object.h>
-#include <dt_object.h>
 #include <libcfs/libcfs.h>
 
 /*
 #include <libcfs/libcfs.h>
 
 /*
@@ -56,28 +55,6 @@ struct com_thread_info {
         struct req_capsule *cti_pill;
 };
 
         struct req_capsule *cti_pill;
 };
 
-enum {
-        ESERIOUS = 0x0001000
-};
-
-static inline int err_serious(int rc)
-{
-        LASSERT(rc < 0);
-        return -(-rc | ESERIOUS);
-}
-
-static inline int clear_serious(int rc)
-{
-        if (rc < 0)
-                rc = -(-rc & ~ESERIOUS);
-        return rc;
-}
-
-static inline int is_serious(int rc)
-{
-        return (rc < 0 && -rc & ESERIOUS);
-}
-
 /** @} mdt */
 
 #endif
 /** @} mdt */
 
 #endif
index 724fe0e..2ef7b9e 100644 (file)
@@ -3544,7 +3544,6 @@ int llog_origin_handle_prev_block(struct ptlrpc_request *req);
 int llog_origin_handle_next_block(struct ptlrpc_request *req);
 int llog_origin_handle_read_header(struct ptlrpc_request *req);
 int llog_origin_handle_close(struct ptlrpc_request *req);
 int llog_origin_handle_next_block(struct ptlrpc_request *req);
 int llog_origin_handle_read_header(struct ptlrpc_request *req);
 int llog_origin_handle_close(struct ptlrpc_request *req);
-int llog_origin_handle_cancel(struct ptlrpc_request *req);
 
 /* ptlrpc/llog_client.c */
 extern struct llog_operations llog_client_ops;
 
 /* ptlrpc/llog_client.c */
 extern struct llog_operations llog_client_ops;
index 1d35252..d795b03 100644 (file)
@@ -245,6 +245,8 @@ extern struct req_format RQF_LLOG_ORIGIN_HANDLE_PREV_BLOCK;
 extern struct req_format RQF_LLOG_ORIGIN_HANDLE_READ_HEADER;
 extern struct req_format RQF_LLOG_ORIGIN_CONNECT;
 
 extern struct req_format RQF_LLOG_ORIGIN_HANDLE_READ_HEADER;
 extern struct req_format RQF_LLOG_ORIGIN_CONNECT;
 
+extern struct req_format RQF_CONNECT;
+
 extern struct req_msg_field RMF_GENERIC_DATA;
 extern struct req_msg_field RMF_PTLRPC_BODY;
 extern struct req_msg_field RMF_MDT_BODY;
 extern struct req_msg_field RMF_GENERIC_DATA;
 extern struct req_msg_field RMF_PTLRPC_BODY;
 extern struct req_msg_field RMF_MDT_BODY;
index f572a63..75a5cd3 100644 (file)
@@ -424,6 +424,13 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_MGC_PAUSE_PROCESS_LOG   0x903
 #define OBD_FAIL_MGS_PAUSE_REQ           0x904
 #define OBD_FAIL_MGS_PAUSE_TARGET_REG    0x905
 #define OBD_FAIL_MGC_PAUSE_PROCESS_LOG   0x903
 #define OBD_FAIL_MGS_PAUSE_REQ           0x904
 #define OBD_FAIL_MGS_PAUSE_TARGET_REG    0x905
+#define OBD_FAIL_MGS_CONNECT_NET        0x906
+#define OBD_FAIL_MGS_DISCONNECT_NET     0x907
+#define OBD_FAIL_MGS_SET_INFO_NET       0x908
+#define OBD_FAIL_MGS_EXCEPTION_NET      0x909
+#define OBD_FAIL_MGS_TARGET_REG_NET     0x90a
+#define OBD_FAIL_MGS_TARGET_DEL_NET     0x90b
+#define OBD_FAIL_MGS_CONFIG_READ_NET    0x90c
 
 #define OBD_FAIL_QUOTA_DQACQ_NET                       0xA01
 #define OBD_FAIL_QUOTA_EDQUOT            0xA02
 
 #define OBD_FAIL_QUOTA_DQACQ_NET                       0xA01
 #define OBD_FAIL_QUOTA_EDQUOT            0xA02
index a2d284c..2f11b93 100644 (file)
@@ -2163,16 +2163,6 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 rc = ldlm_handle_setinfo(req);
                 ldlm_callback_reply(req, rc);
                 RETURN(0);
                 rc = ldlm_handle_setinfo(req);
                 ldlm_callback_reply(req, rc);
                 RETURN(0);
-        case OBD_LOG_CANCEL: /* remove this eventually - for 1.4.0 compat */
-                CERROR("shouldn't be handling OBD_LOG_CANCEL on DLM thread\n");
-                req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
-                        RETURN(0);
-                rc = llog_origin_handle_cancel(req);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_REP))
-                        RETURN(0);
-                ldlm_callback_reply(req, rc);
-                RETURN(0);
         case LLOG_ORIGIN_HANDLE_CREATE:
                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
         case LLOG_ORIGIN_HANDLE_CREATE:
                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
@@ -2374,15 +2364,6 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
                 if (rc)
                         break;
                 RETURN(0);
                 if (rc)
                         break;
                 RETURN(0);
-        case OBD_LOG_CANCEL:
-                req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
-                        RETURN(0);
-                rc = llog_origin_handle_cancel(req);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_REP))
-                        RETURN(0);
-                ldlm_callback_reply(req, rc);
-                RETURN(0);
         default:
                 CERROR("invalid opcode %d\n",
                        lustre_msg_get_opc(req->rq_reqmsg));
         default:
                 CERROR("invalid opcode %d\n",
                        lustre_msg_get_opc(req->rq_reqmsg));
index 3c558ae..1d9f96c 100644 (file)
@@ -4797,13 +4797,9 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                }
        }
 
                }
        }
 
-        rc = tgt_init(env, &m->mdt_lut, obd, m->mdt_bottom);
-        if (rc)
-                GOTO(err_fini_stack, rc);
-
        rc = mdt_fld_init(env, mdt_obd_name(m), m);
        if (rc)
        rc = mdt_fld_init(env, mdt_obd_name(m), m);
        if (rc)
-               GOTO(err_lut, rc);
+               GOTO(err_fini_stack, rc);
 
        rc = mdt_seq_init(env, mdt_obd_name(m), m);
        if (rc)
 
        rc = mdt_seq_init(env, mdt_obd_name(m), m);
        if (rc)
@@ -4831,9 +4827,15 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         if (rc)
                 GOTO(err_free_ns, rc);
 
         if (rc)
                 GOTO(err_free_ns, rc);
 
-        rc = mdt_fs_setup(env, m, obd, lsi);
-        if (rc)
-                GOTO(err_capa, rc);
+       rc = tgt_init(env, &m->mdt_lut, obd, m->mdt_bottom, NULL,
+                     OBD_FAIL_MDS_ALL_REQUEST_NET,
+                     OBD_FAIL_MDS_ALL_REPLY_NET);
+       if (rc)
+               GOTO(err_capa, rc);
+
+       rc = mdt_fs_setup(env, m, obd, lsi);
+       if (rc)
+               GOTO(err_tgt, rc);
 
         mdt_adapt_sptlrpc_conf(obd, 1);
 
 
         mdt_adapt_sptlrpc_conf(obd, 1);
 
@@ -4841,7 +4843,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         rc = next->md_ops->mdo_iocontrol(env, next, OBD_IOC_GET_MNTOPT, 0,
                                          &mntopts);
         if (rc)
         rc = next->md_ops->mdo_iocontrol(env, next, OBD_IOC_GET_MNTOPT, 0,
                                          &mntopts);
         if (rc)
-               GOTO(err_llog_cleanup, rc);
+               GOTO(err_fs_cleanup, rc);
 
         if (mntopts & MNTOPT_USERXATTR)
                 m->mdt_opts.mo_user_xattr = 1;
 
         if (mntopts & MNTOPT_USERXATTR)
                 m->mdt_opts.mo_user_xattr = 1;
@@ -4864,7 +4866,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        if (IS_ERR(m->mdt_identity_cache)) {
                rc = PTR_ERR(m->mdt_identity_cache);
                m->mdt_identity_cache = NULL;
        if (IS_ERR(m->mdt_identity_cache)) {
                rc = PTR_ERR(m->mdt_identity_cache);
                m->mdt_identity_cache = NULL;
-               GOTO(err_llog_cleanup, rc);
+               GOTO(err_fs_cleanup, rc);
        }
 
         rc = mdt_procfs_init(m, dev);
        }
 
         rc = mdt_procfs_init(m, dev);
@@ -4881,7 +4883,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                           "mdt_ldlm_client", m->mdt_ldlm_client);
 
        ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                           "mdt_ldlm_client", m->mdt_ldlm_client);
 
-        ping_evictor_start();
+       ping_evictor_start();
 
        /* recovery will be started upon mdt_prepare()
         * when the whole stack is complete and ready
 
        /* recovery will be started upon mdt_prepare()
         * when the whole stack is complete and ready
@@ -4903,10 +4905,10 @@ err_recovery:
        target_recovery_fini(obd);
        upcall_cache_cleanup(m->mdt_identity_cache);
        m->mdt_identity_cache = NULL;
        target_recovery_fini(obd);
        upcall_cache_cleanup(m->mdt_identity_cache);
        m->mdt_identity_cache = NULL;
-err_llog_cleanup:
-       mdt_llog_ctxt_unclone(env, m, LLOG_AGENT_ORIG_CTXT);
-       mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
+err_fs_cleanup:
        mdt_fs_cleanup(env, m);
        mdt_fs_cleanup(env, m);
+err_tgt:
+       tgt_fini(env, &m->mdt_lut);
 err_capa:
        cfs_timer_disarm(&m->mdt_ck_timer);
        mdt_ck_thread_stop(m);
 err_capa:
        cfs_timer_disarm(&m->mdt_ck_timer);
        mdt_ck_thread_stop(m);
@@ -4917,8 +4919,6 @@ err_fini_seq:
        mdt_seq_fini(env, m);
 err_fini_fld:
        mdt_fld_fini(env, m);
        mdt_seq_fini(env, m);
 err_fini_fld:
        mdt_fld_fini(env, m);
-err_lut:
-       tgt_fini(env, &m->mdt_lut);
 err_fini_stack:
        mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
 err_lmi:
 err_fini_stack:
        mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
 err_lmi:
index 1022e9a..90f002e 100644 (file)
@@ -53,6 +53,7 @@
 #include <obd_class.h>
 #include <lustre_net.h>
 #include <lustre/lustre_idl.h>
 #include <obd_class.h>
 #include <lustre_net.h>
 #include <lustre/lustre_idl.h>
+#include <obd_class.h>
 #include <lustre_disk.h>
 #include <lu_target.h>
 #include <md_object.h>
 #include <lustre_disk.h>
 #include <lu_target.h>
 #include <md_object.h>
@@ -61,7 +62,6 @@
 #include <lustre_mdt.h>
 #include <lustre_req_layout.h>
 #include <lustre_sec.h>
 #include <lustre_mdt.h>
 #include <lustre_req_layout.h>
 #include <lustre_sec.h>
-#include <lvfs.h>
 #include <lustre_idmap.h>
 #include <lustre_eacl.h>
 #include <lustre_quota.h>
 #include <lustre_idmap.h>
 #include <lustre_eacl.h>
 #include <lustre_quota.h>
@@ -554,36 +554,6 @@ struct mdt_handler {
        const struct req_format *mh_fmt;
 };
 
        const struct req_format *mh_fmt;
 };
 
-enum mdt_handler_flags {
-       /*
-        * struct mdt_body is passed in the incoming message, and object
-        * identified by this fid exists on disk.
-        *
-        * "habeo corpus" == "I have a body"
-        */
-       HABEO_CORPUS = (1 << 0),
-       /*
-        * struct ldlm_request is passed in the incoming message.
-        *
-        * "habeo clavis" == "I have a key"
-        */
-       HABEO_CLAVIS = (1 << 1),
-       /*
-        * this request has fixed reply format, so that reply message can be
-        * packed by generic code.
-        *
-        * "habeo refero" == "I have a reply"
-        */
-       HABEO_REFERO = (1 << 2),
-       /*
-        * this request will modify something, so check whether the filesystem
-        * is readonly or not, then return -EROFS to client asap if necessary.
-        *
-        * "mutabor" == "I shall modify"
-        */
-       MUTABOR      = (1 << 3)
-};
-
 struct mdt_opc_slice {
        __u32                   mos_opc_start;
        int                     mos_opc_end;
 struct mdt_opc_slice {
        __u32                   mos_opc_start;
        int                     mos_opc_end;
index 732e362..ffa2d0d 100644 (file)
@@ -116,30 +116,14 @@ static void seq_show_srpc_rules(struct seq_file *seq, const char *tgtname,
 
 static int mgsself_srpc_seq_show(struct seq_file *seq, void *v)
 {
 
 static int mgsself_srpc_seq_show(struct seq_file *seq, void *v)
 {
-        struct obd_device *obd = seq->private;
-       struct mgs_device *mgs;
-        struct fs_db      *fsdb;
-       struct lu_env      env;
-        int                rc;
-
-       LASSERT(obd != NULL);
-       LASSERT(obd->obd_lu_dev != NULL);
-       mgs = lu2mgs_dev(obd->obd_lu_dev);
-
-       rc = lu_env_init(&env, LCT_MG_THREAD);
-       if (rc)
-               return rc;
+       struct obd_device       *obd = seq->private;
+       struct mgs_device       *mgs = lu2mgs_dev(obd->obd_lu_dev);
+       struct lu_target        *tgt = &mgs->mgs_lut;
 
 
-       rc = mgs_find_or_make_fsdb(&env, mgs, MGSSELF_NAME, &fsdb);
-        if (rc)
-               goto out;
+       read_lock(&tgt->lut_sptlrpc_lock);
+       seq_show_srpc_rules(seq, MGSSELF_NAME, &tgt->lut_sptlrpc_rset);
+       read_unlock(&tgt->lut_sptlrpc_lock);
 
 
-       mutex_lock(&fsdb->fsdb_mutex);
-        seq_show_srpc_rules(seq, fsdb->fsdb_name, &fsdb->fsdb_srpc_gen);
-       mutex_unlock(&fsdb->fsdb_mutex);
-
-out:
-       lu_env_fini(&env);
         return 0;
 }
 
         return 0;
 }
 
index d83beee..045e94c 100644 (file)
 
 #include "mgs_internal.h"
 
 
 #include "mgs_internal.h"
 
-/* Establish a connection to the MGS.*/
-static int mgs_connect(const struct lu_env *env,
-                       struct obd_export **exp, struct obd_device *obd,
-                       struct obd_uuid *cluuid, struct obd_connect_data *data,
-                       void *localdata)
+/*
+ * Regular MGS handlers
+ */
+static int mgs_connect(struct tgt_session_info *tsi)
 {
 {
-        struct obd_export *lexp;
-        struct lustre_handle conn = { 0 };
-        int rc;
-        ENTRY;
+       struct ptlrpc_request   *req = tgt_ses_req(tsi);
+       int                      rc;
 
 
-        if (!exp || !obd || !cluuid)
-                RETURN(-EINVAL);
+       ENTRY;
 
 
-        rc = class_connect(&conn, obd, cluuid);
-        if (rc)
-                RETURN(rc);
+       rc = tgt_connect(tsi);
+       if (rc)
+               RETURN(rc);
 
 
-        lexp = class_conn2export(&conn);
-       if (lexp == NULL)
-               GOTO(out, rc = -EFAULT);
+       if (lustre_msg_get_conn_cnt(req->rq_reqmsg) > 1)
+               lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT);
 
 
-        mgs_counter_incr(lexp, LPROC_MGS_CONNECT);
+       RETURN(0);
+}
 
 
-       if (data != NULL) {
-               data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
-               data->ocd_version = LUSTRE_VERSION_CODE;
-               lexp->exp_connect_data = *data;
-       }
+static int mgs_disconnect(struct tgt_session_info *tsi)
+{
+       int rc;
 
 
-        rc = mgs_export_stats_init(obd, lexp, localdata);
+       ENTRY;
 
 
-out:
-        if (rc) {
-                class_disconnect(lexp);
-        } else {
-                *exp = lexp;
-        }
+       LASSERT(tsi->tsi_exp);
 
 
-        RETURN(rc);
+       rc = tgt_disconnect(tsi);
+       if (rc)
+               RETURN(err_serious(rc));
+       RETURN(0);
 }
 
 }
 
-static int mgs_reconnect(const struct lu_env *env,
-                        struct obd_export *exp, struct obd_device *obd,
-                        struct obd_uuid *cluuid, struct obd_connect_data *data,
-                        void *localdata)
+static int mgs_exception(struct tgt_session_info *tsi)
 {
        ENTRY;
 
 {
        ENTRY;
 
-       if (exp == NULL || obd == NULL || cluuid == NULL)
-               RETURN(-EINVAL);
-
-       mgs_counter_incr(exp, LPROC_MGS_CONNECT);
-
-       if (data != NULL) {
-               data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
-               data->ocd_version = LUSTRE_VERSION_CODE;
-               exp->exp_connect_data = *data;
-       }
+       tgt_counter_incr(tsi->tsi_exp, LPROC_MGS_EXCEPTION);
 
 
-       RETURN(mgs_export_stats_init(obd, exp, localdata));
+       RETURN(0);
 }
 
 }
 
-static int mgs_disconnect(struct obd_export *exp)
+static int mgs_set_info(struct tgt_session_info *tsi)
 {
 {
-        int rc;
-        ENTRY;
+       struct mgs_thread_info  *mgi;
+       struct mgs_send_param   *msp, *rep_msp;
+       struct lustre_cfg       *lcfg;
+       int                      rc;
 
 
-        LASSERT(exp);
+       ENTRY;
 
 
-        mgs_fsc_cleanup(exp);
+       mgi = mgs_env_info(tsi->tsi_env);
+       if (IS_ERR(mgi))
+               RETURN(err_serious(PTR_ERR(mgi)));
 
 
-        class_export_get(exp);
-        mgs_counter_incr(exp, LPROC_MGS_DISCONNECT);
+       msp = req_capsule_client_get(tsi->tsi_pill, &RMF_MGS_SEND_PARAM);
+       if (msp == NULL)
+               RETURN(err_serious(-EFAULT));
 
 
-        rc = server_disconnect_export(exp);
-        class_export_put(exp);
-        RETURN(rc);
-}
+       /* Construct lustre_cfg structure to pass to function mgs_setparam */
+       lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
+       lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, msp->mgs_param);
+       lcfg = lustre_cfg_new(LCFG_PARAM, &mgi->mgi_bufs);
+
+       rc = mgs_setparam(tsi->tsi_env, exp2mgs_dev(tsi->tsi_exp), lcfg,
+                         mgi->mgi_fsname);
+       if (rc) {
+               LCONSOLE_WARN("%s: Unable to set parameter %s for %s: %d\n",
+                             tgt_name(tsi->tsi_tgt), msp->mgs_param,
+                             mgi->mgi_fsname, rc);
+               GOTO(out_cfg, rc);
+       }
 
 
-static int mgs_handle(struct ptlrpc_request *req);
+       /* send back the whole msp in the reply */
+       rep_msp = req_capsule_server_get(tsi->tsi_pill, &RMF_MGS_SEND_PARAM);
+       *rep_msp = *msp;
+       EXIT;
+out_cfg:
+       lustre_cfg_free(lcfg);
+       return rc;
+}
 
 static int mgs_completion_ast_config(struct ldlm_lock *lock, __u64 flags,
                                     void *cbdata)
 
 static int mgs_completion_ast_config(struct ldlm_lock *lock, __u64 flags,
                                     void *cbdata)
@@ -295,22 +296,31 @@ static int mgs_check_failover_reg(struct mgs_target_info *mti)
 }
 
 /* Called whenever a target starts up.  Flags indicate first connect, etc. */
 }
 
 /* Called whenever a target starts up.  Flags indicate first connect, etc. */
-static int mgs_handle_target_reg(struct ptlrpc_request *req)
+static int mgs_target_reg(struct tgt_session_info *tsi)
 {
 {
-        struct obd_device *obd = req->rq_export->exp_obd;
-       struct mgs_device *mgs = exp2mgs_dev(req->rq_export);
-       struct lu_env     *env = req->rq_svc_thread->t_env;
-        struct mgs_target_info *mti, *rep_mti;
-        struct fs_db *fsdb;
-        int opc;
-        int rc = 0;
-        ENTRY;
+       struct obd_device       *obd = tsi->tsi_exp->exp_obd;
+       struct mgs_device       *mgs = exp2mgs_dev(tsi->tsi_exp);
+       struct mgs_target_info  *mti, *rep_mti;
+       struct fs_db            *fsdb;
+       int                      opc;
+       int                      rc = 0;
+
+       ENTRY;
+
+       rc = lu_env_refill((struct lu_env *)tsi->tsi_env);
+       if (rc)
+               return err_serious(rc);
 
 
-        mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_REG);
+       tgt_counter_incr(tsi->tsi_exp, LPROC_MGS_TARGET_REG);
 
 
-        mti = req_capsule_client_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
+       mti = req_capsule_client_get(tsi->tsi_pill, &RMF_MGS_TARGET_INFO);
+       if (mti == NULL) {
+               DEBUG_REQ(D_HA, tgt_ses_req(tsi), "no mgs_send_param");
+               RETURN(err_serious(-EFAULT));
+       }
 
 
-       if (OCD_HAS_FLAG(&req->rq_export->exp_connect_data, IMP_RECOV))
+       if (OCD_HAS_FLAG(&tgt_ses_req(tsi)->rq_export->exp_connect_data,
+                        IMP_RECOV))
                opc = mti->mti_flags & LDD_F_OPC_MASK;
        else
                opc = LDD_F_OPC_REG;
                opc = mti->mti_flags & LDD_F_OPC_MASK;
        else
                opc = LDD_F_OPC_REG;
@@ -318,7 +328,7 @@ static int mgs_handle_target_reg(struct ptlrpc_request *req)
         if (opc == LDD_F_OPC_READY) {
                 CDEBUG(D_MGS, "fs: %s index: %d is ready to reconnect.\n",
                        mti->mti_fsname, mti->mti_stripe_index);
         if (opc == LDD_F_OPC_READY) {
                 CDEBUG(D_MGS, "fs: %s index: %d is ready to reconnect.\n",
                        mti->mti_fsname, mti->mti_stripe_index);
-               rc = mgs_ir_update(env, mgs, mti);
+               rc = mgs_ir_update(tsi->tsi_env, mgs, mti);
                 if (rc) {
                         LASSERT(!(mti->mti_flags & LDD_F_IR_CAPABLE));
                         CERROR("Update IR return with %d(ignore and IR "
                 if (rc) {
                         LASSERT(!(mti->mti_flags & LDD_F_IR_CAPABLE));
                         CERROR("Update IR return with %d(ignore and IR "
@@ -341,8 +351,8 @@ static int mgs_handle_target_reg(struct ptlrpc_request *req)
                                 LDD_F_UPDATE))) {
                 /* We're just here as a startup ping. */
                 CDEBUG(D_MGS, "Server %s is running on %s\n",
                                 LDD_F_UPDATE))) {
                 /* We're just here as a startup ping. */
                 CDEBUG(D_MGS, "Server %s is running on %s\n",
-                       mti->mti_svname, obd_export_nid2str(req->rq_export));
-               rc = mgs_check_target(env, mgs, mti);
+                      mti->mti_svname, obd_export_nid2str(tsi->tsi_exp));
+               rc = mgs_check_target(tsi->tsi_env, mgs, mti);
                 /* above will set appropriate mti flags */
                 if (rc <= 0)
                         /* Nothing wrong, or fatal error */
                 /* above will set appropriate mti flags */
                 if (rc <= 0)
                         /* Nothing wrong, or fatal error */
@@ -358,14 +368,15 @@ static int mgs_handle_target_reg(struct ptlrpc_request *req)
         if (mti->mti_flags & LDD_F_WRITECONF) {
                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT &&
                     mti->mti_stripe_index == 0) {
         if (mti->mti_flags & LDD_F_WRITECONF) {
                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT &&
                     mti->mti_stripe_index == 0) {
-                       rc = mgs_erase_logs(env, mgs, mti->mti_fsname);
+                       rc = mgs_erase_logs(tsi->tsi_env, mgs,
+                                           mti->mti_fsname);
                         LCONSOLE_WARN("%s: Logs for fs %s were removed by user "
                                       "request.  All servers must be restarted "
                                       "in order to regenerate the logs."
                                       "\n", obd->obd_name, mti->mti_fsname);
                 } else if (mti->mti_flags &
                            (LDD_F_SV_TYPE_OST | LDD_F_SV_TYPE_MDT)) {
                         LCONSOLE_WARN("%s: Logs for fs %s were removed by user "
                                       "request.  All servers must be restarted "
                                       "in order to regenerate the logs."
                                       "\n", obd->obd_name, mti->mti_fsname);
                 } else if (mti->mti_flags &
                            (LDD_F_SV_TYPE_OST | LDD_F_SV_TYPE_MDT)) {
-                       rc = mgs_erase_log(env, mgs, mti->mti_svname);
+                       rc = mgs_erase_log(tsi->tsi_env, mgs, mti->mti_svname);
                         LCONSOLE_WARN("%s: Regenerating %s log by user "
                                       "request.\n",
                                       obd->obd_name, mti->mti_svname);
                         LCONSOLE_WARN("%s: Regenerating %s log by user "
                                       "request.\n",
                                       obd->obd_name, mti->mti_svname);
@@ -377,7 +388,7 @@ static int mgs_handle_target_reg(struct ptlrpc_request *req)
                        GOTO(out_nolock, rc);
         }
 
                        GOTO(out_nolock, rc);
         }
 
-       rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
+       rc = mgs_find_or_make_fsdb(tsi->tsi_env, mgs, mti->mti_fsname, &fsdb);
         if (rc) {
                 CERROR("Can't get db for %s: %d\n", mti->mti_fsname, rc);
                 GOTO(out_nolock, rc);
         if (rc) {
                 CERROR("Can't get db for %s: %d\n", mti->mti_fsname, rc);
                 GOTO(out_nolock, rc);
@@ -402,7 +413,7 @@ static int mgs_handle_target_reg(struct ptlrpc_request *req)
 
                 /* create or update the target log
                    and update the client/mdt logs */
 
                 /* create or update the target log
                    and update the client/mdt logs */
-               rc = mgs_write_log_target(env, mgs, mti, fsdb);
+               rc = mgs_write_log_target(tsi->tsi_env, mgs, mti, fsdb);
                 if (rc) {
                         CERROR("Failed to write %s log (%d)\n",
                                mti->mti_svname, rc);
                 if (rc) {
                         CERROR("Failed to write %s log (%d)\n",
                                mti->mti_svname, rc);
@@ -419,349 +430,133 @@ out:
        mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
 
 out_nolock:
        mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
 
 out_nolock:
-        CDEBUG(D_MGS, "replying with %s, index=%d, rc=%d\n", mti->mti_svname,
-               mti->mti_stripe_index, rc);
-        req->rq_status = rc;
-        if (rc)
-                /* we need an error flag to tell the target what's going on,
-                 * instead of just doing it by error code only. */
-                mti->mti_flags |= LDD_F_ERROR;
-
-        rc = req_capsule_server_pack(&req->rq_pill);
-        if (rc)
-                RETURN(rc);
-
-        /* send back the whole mti in the reply */
-        rep_mti = req_capsule_server_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
-        *rep_mti = *mti;
-
-        /* Flush logs to disk */
-       dt_sync(req->rq_svc_thread->t_env, mgs->mgs_bottom);
-        RETURN(rc);
-}
-
-static int mgs_set_info_rpc(struct ptlrpc_request *req)
-{
-       struct mgs_device *mgs = exp2mgs_dev(req->rq_export);
-       struct lu_env     *env = req->rq_svc_thread->t_env;
-        struct mgs_send_param *msp, *rep_msp;
-       struct mgs_thread_info *mgi = mgs_env_info(env);
-        int rc;
-        struct lustre_cfg *lcfg;
-        ENTRY;
-
-        msp = req_capsule_client_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
-        LASSERT(msp);
+       CDEBUG(D_MGS, "replying with %s, index=%d, rc=%d\n", mti->mti_svname,
+              mti->mti_stripe_index, rc);
+        /* An error flag is set in the mti reply rather than an error code */
+       if (rc)
+               mti->mti_flags |= LDD_F_ERROR;
 
 
-        /* Construct lustre_cfg structure to pass to function mgs_setparam */
-       lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
-       lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, msp->mgs_param);
-       lcfg = lustre_cfg_new(LCFG_PARAM, &mgi->mgi_bufs);
-       if (IS_ERR(lcfg))
-               GOTO(out, rc = PTR_ERR(lcfg));
-       rc = mgs_setparam(env, mgs, lcfg, mgi->mgi_fsname);
-        if (rc) {
-                CERROR("Error %d in setting the parameter %s for fs %s\n",
-                      rc, msp->mgs_param, mgi->mgi_fsname);
-               GOTO(out_cfg, rc);
-        }
+       /* send back the whole mti in the reply */
+       rep_mti = req_capsule_server_get(tsi->tsi_pill, &RMF_MGS_TARGET_INFO);
+       *rep_mti = *mti;
 
 
-        rc = req_capsule_server_pack(&req->rq_pill);
-        if (rc == 0) {
-                rep_msp = req_capsule_server_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
-                rep_msp = msp;
-        }
-out_cfg:
-       lustre_cfg_free(lcfg);
-out:
-        RETURN(rc);
+       /* Flush logs to disk */
+       dt_sync(tsi->tsi_env, mgs->mgs_bottom);
+       RETURN(rc);
 }
 
 }
 
-static int mgs_config_read(struct ptlrpc_request *req)
+/* Called whenever a target cleans up. */
+static int mgs_target_del(struct tgt_session_info *tsi)
 {
 {
-        struct mgs_config_body *body;
-        int rc;
-        ENTRY;
-
-        body = req_capsule_client_get(&req->rq_pill, &RMF_MGS_CONFIG_BODY);
-        if (body == NULL)
-                RETURN(-EINVAL);
-
-        switch (body->mcb_type) {
-        case CONFIG_T_RECOVER:
-                rc = mgs_get_ir_logs(req);
-                break;
-
-        case CONFIG_T_CONFIG:
-                rc = -ENOTSUPP;
-                break;
+       ENTRY;
 
 
-        default:
-                rc = -EINVAL;
-                break;
-        }
+       tgt_counter_incr(tsi->tsi_exp, LPROC_MGS_TARGET_DEL);
 
 
-        RETURN(rc);
+       RETURN(0);
 }
 
 }
 
-/*
- * similar as in ost_connect_check_sptlrpc()
- */
-static int mgs_connect_check_sptlrpc(struct ptlrpc_request *req)
+static int mgs_config_read(struct tgt_session_info *tsi)
 {
 {
-        struct obd_export     *exp = req->rq_export;
-       struct mgs_device     *mgs = exp2mgs_dev(req->rq_export);
-       struct lu_env         *env = req->rq_svc_thread->t_env;
-        struct fs_db          *fsdb;
-        struct sptlrpc_flavor  flvr;
-        int                    rc = 0;
-
-        if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
-               rc = mgs_find_or_make_fsdb(env, mgs, MGSSELF_NAME, &fsdb);
-                if (rc)
-                        return rc;
-
-               mutex_lock(&fsdb->fsdb_mutex);
-               if (sptlrpc_rule_set_choose(&fsdb->fsdb_srpc_gen,
-                                           LUSTRE_SP_MGC, LUSTRE_SP_MGS,
-                                           req->rq_peer.nid,
-                                           &flvr) == 0) {
-                       /* by defualt allow any flavors */
-                       flvr.sf_rpc = SPTLRPC_FLVR_ANY;
-               }
-               mutex_unlock(&fsdb->fsdb_mutex);
+       struct ptlrpc_request   *req = tgt_ses_req(tsi);
+       struct mgs_config_body  *body;
+       int                      rc;
 
 
-               spin_lock(&exp->exp_lock);
-
-                exp->exp_sp_peer = req->rq_sp_from;
-                exp->exp_flvr = flvr;
-
-                if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
-                    exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
-                        CERROR("invalid rpc flavor %x, expect %x, from %s\n",
-                               req->rq_flvr.sf_rpc, exp->exp_flvr.sf_rpc,
-                               libcfs_nid2str(req->rq_peer.nid));
-                        rc = -EACCES;
-                }
-
-               spin_unlock(&exp->exp_lock);
-        } else {
-                if (exp->exp_sp_peer != req->rq_sp_from) {
-                        CERROR("RPC source %s doesn't match %s\n",
-                               sptlrpc_part2name(req->rq_sp_from),
-                               sptlrpc_part2name(exp->exp_sp_peer));
-                        rc = -EACCES;
-                } else {
-                        rc = sptlrpc_target_export_check(exp, req);
-                }
-        }
+       ENTRY;
 
 
-        return rc;
-}
+       body = req_capsule_client_get(tsi->tsi_pill, &RMF_MGS_CONFIG_BODY);
+       if (body == NULL) {
+               DEBUG_REQ(D_HA, req, "no mgs_config_body");
+               RETURN(err_serious(-EFAULT));
+       }
 
 
-/* Called whenever a target cleans up. */
-/* XXX - Currently unused */
-static int mgs_handle_target_del(struct ptlrpc_request *req)
-{
-        ENTRY;
-        mgs_counter_incr(req->rq_export, LPROC_MGS_TARGET_DEL);
-        RETURN(0);
-}
+       switch (body->mcb_type) {
+       case CONFIG_T_RECOVER:
+               rc = mgs_get_ir_logs(req);
+               break;
+       case CONFIG_T_CONFIG:
+               rc = -EOPNOTSUPP;
+               break;
+       default:
+               rc = -EINVAL;
+               break;
+       }
 
 
-/* XXX - Currently unused */
-static int mgs_handle_exception(struct ptlrpc_request *req)
-{
-        ENTRY;
-        mgs_counter_incr(req->rq_export, LPROC_MGS_EXCEPTION);
-        RETURN(0);
+       RETURN(rc);
 }
 
 }
 
-/*
- * For old clients there is no direct way of knowing which filesystems
- * a client is operating at the MGS side. But we need to pick up those
- * clients so that the MGS can mark the corresponding filesystem as
- * non-IR capable because old clients are not ready to be notified.
- *
- * This is why we have this _hack_ function. We detect the filesystem's
- * name by hacking llog operation which is currently used by the clients
- * to fetch configuration logs. At present this is fine because this is
- * the ONLY llog operation between mgc and the MGS.
- *
- * If extra llog operation is going to be added, this function needs fixing.
- *
- * If releases prior than 2.0 are not supported, we can remove this function.
- */
-static int mgs_handle_fslog_hack(struct ptlrpc_request *req)
+static int mgs_llog_open(struct tgt_session_info *tsi)
 {
 {
-        char *logname;
-        char fsname[16];
-        char *ptr;
-        int rc;
+       struct mgs_thread_info  *mgi;
+       struct ptlrpc_request   *req = tgt_ses_req(tsi);
+       char                    *logname;
+       int                      rc;
 
 
-        /* XXX: We suppose that llog at mgs is only used for
-         * fetching file system log */
-        logname = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
-        if (logname == NULL) {
-                CERROR("No logname, is llog on MGS used for something else?\n");
-                return -EINVAL;
-        }
-
-        ptr = strchr(logname, '-');
-        rc = (int)(ptr - logname);
-        if (ptr == NULL || rc >= sizeof(fsname)) {
-                CERROR("Invalid logname received: %s\n", logname);
-                return -EINVAL;
-        }
+       ENTRY;
 
 
-        strncpy(fsname, logname, rc);
-        fsname[rc] = 0;
-       rc = mgs_fsc_attach(req->rq_svc_thread->t_env, req->rq_export, fsname);
-        if (rc < 0 && rc != -EEXIST)
-                CERROR("add fs client %s returns %d\n", fsname, rc);
+       rc = tgt_llog_open(tsi);
+       if (rc)
+               RETURN(rc);
 
 
-        return rc;
+       /*
+        * For old clients there is no direct way of knowing which file system
+        * a client is operating at the MGS side. But we need to pick up those
+        * clients so that the MGS can mark the corresponding file system as
+        * non-IR capable because old clients are not ready to be notified.
+        *
+        * Therefore we attempt to detect the file systems name by hacking the
+        * llog operation which is currently used by the clients to fetch
+        * configuration logs. At present this is fine because this is the
+        * ONLY llog operation between mgc and the MGS.
+        *
+        * If extra llog operation are going to be added, this function needs
+        * further work.
+        *
+        * When releases prior than 2.0 are not supported, the following code
+        * can be removed.
+        */
+       mgi = mgs_env_info(tsi->tsi_env);
+       if (IS_ERR(mgi))
+               RETURN(PTR_ERR(mgi));
+
+       logname = req_capsule_client_get(tsi->tsi_pill, &RMF_NAME);
+       if (logname) {
+               char *ptr = strchr(logname, '-');
+               int   len = (int)(ptr - logname);
+
+               if (ptr == NULL || len >= sizeof(mgi->mgi_fsname)) {
+                       LCONSOLE_WARN("%s: non-config logname received: %s\n",
+                                     tgt_name(tsi->tsi_tgt), logname);
+                       /* not error, this can be llog test name */
+               } else {
+                       strncpy(mgi->mgi_fsname, logname, len);
+                       mgi->mgi_fsname[len] = 0;
+
+                       rc = mgs_fsc_attach(tsi->tsi_env, tsi->tsi_exp,
+                                           mgi->mgi_fsname);
+                       if (rc && rc != -EEXIST) {
+                               LCONSOLE_WARN("%s: Unable to add client %s "
+                                             "to file system %s: %d\n",
+                                             tgt_name(tsi->tsi_tgt),
+                                             libcfs_nid2str(req->rq_peer.nid),
+                                             mgi->mgi_fsname, rc);
+                       } else {
+                               rc = 0;
+                       }
+               }
+       } else {
+               CERROR("%s: no logname in request\n", tgt_name(tsi->tsi_tgt));
+               RETURN(-EINVAL);
+       }
+       RETURN(rc);
 }
 
 }
 
-/* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
-int mgs_handle(struct ptlrpc_request *req)
+/*
+ * sec context handlers
+ */
+/* XXX: Implement based on mdt_sec_ctx_handle()? */
+static int mgs_sec_ctx_handle(struct tgt_session_info *tsi)
 {
 {
-        int fail = OBD_FAIL_MGS_ALL_REPLY_NET;
-        int opc, rc = 0;
-        ENTRY;
-
-        req_capsule_init(&req->rq_pill, req, RCL_SERVER);
-        CFS_FAIL_TIMEOUT_MS(OBD_FAIL_MGS_PAUSE_REQ, cfs_fail_val);
-        if (CFS_FAIL_CHECK(OBD_FAIL_MGS_ALL_REQUEST_NET))
-                RETURN(0);
-
-        LASSERT(current->journal_info == NULL);
-        opc = lustre_msg_get_opc(req->rq_reqmsg);
-
-        if (opc == SEC_CTX_INIT ||
-            opc == SEC_CTX_INIT_CONT ||
-            opc == SEC_CTX_FINI)
-                GOTO(out, rc = 0);
-
-        if (opc != MGS_CONNECT) {
-                if (!class_connected_export(req->rq_export)) {
-                        DEBUG_REQ(D_MGS, req, "operation on unconnected MGS\n");
-                        req->rq_status = -ENOTCONN;
-                        GOTO(out, rc = -ENOTCONN);
-                }
-        }
-
-        switch (opc) {
-        case MGS_CONNECT:
-                DEBUG_REQ(D_MGS, req, "connect");
-                /* MGS and MDS have same request format for connect */
-                req_capsule_set(&req->rq_pill, &RQF_MDS_CONNECT);
-                rc = target_handle_connect(req);
-                if (rc == 0)
-                        rc = mgs_connect_check_sptlrpc(req);
-
-                if (!rc && (lustre_msg_get_conn_cnt(req->rq_reqmsg) > 1))
-                        /* Make clients trying to reconnect after a MGS restart
-                           happy; also requires obd_replayable */
-                        lustre_msg_add_op_flags(req->rq_repmsg,
-                                                MSG_CONNECT_RECONNECT);
-                break;
-        case MGS_DISCONNECT:
-                DEBUG_REQ(D_MGS, req, "disconnect");
-                /* MGS and MDS have same request format for disconnect */
-                req_capsule_set(&req->rq_pill, &RQF_MDS_DISCONNECT);
-                rc = target_handle_disconnect(req);
-                req->rq_status = rc;            /* superfluous? */
-                break;
-        case MGS_EXCEPTION:
-                DEBUG_REQ(D_MGS, req, "exception");
-                rc = mgs_handle_exception(req);
-                break;
-        case MGS_TARGET_REG:
-                DEBUG_REQ(D_MGS, req, "target add");
-                req_capsule_set(&req->rq_pill, &RQF_MGS_TARGET_REG);
-                rc = mgs_handle_target_reg(req);
-                break;
-        case MGS_TARGET_DEL:
-                DEBUG_REQ(D_MGS, req, "target del");
-                rc = mgs_handle_target_del(req);
-                break;
-        case MGS_SET_INFO:
-                DEBUG_REQ(D_MGS, req, "set_info");
-                req_capsule_set(&req->rq_pill, &RQF_MGS_SET_INFO);
-                rc = mgs_set_info_rpc(req);
-                break;
-        case MGS_CONFIG_READ:
-                DEBUG_REQ(D_MGS, req, "read config");
-                req_capsule_set(&req->rq_pill, &RQF_MGS_CONFIG_READ);
-                rc = mgs_config_read(req);
-                break;
-        case LDLM_ENQUEUE:
-                DEBUG_REQ(D_MGS, req, "enqueue");
-                req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
-                rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
-                                         ldlm_server_blocking_ast, NULL);
-                break;
-        case LDLM_BL_CALLBACK:
-        case LDLM_CP_CALLBACK:
-                DEBUG_REQ(D_MGS, req, "callback");
-                CERROR("callbacks should not happen on MGS\n");
-                LBUG();
-                break;
-
-        case OBD_PING:
-                DEBUG_REQ(D_INFO, req, "ping");
-                req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
-                rc = target_handle_ping(req);
-                break;
-        case OBD_LOG_CANCEL:
-                DEBUG_REQ(D_MGS, req, "log cancel");
-                rc = -ENOTSUPP; /* la la la */
-                break;
-
-        case LLOG_ORIGIN_HANDLE_CREATE:
-               DEBUG_REQ(D_MGS, req, "llog_open");
-               req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
-               rc = llog_origin_handle_open(req);
-                if (rc == 0)
-                        (void)mgs_handle_fslog_hack(req);
-                break;
-        case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
-                DEBUG_REQ(D_MGS, req, "llog next block");
-                req_capsule_set(&req->rq_pill,
-                                &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
-                rc = llog_origin_handle_next_block(req);
-                break;
-       case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
-               DEBUG_REQ(D_MGS, req, "llog prev block");
-               req_capsule_set(&req->rq_pill,
-                               &RQF_LLOG_ORIGIN_HANDLE_PREV_BLOCK);
-               rc = llog_origin_handle_prev_block(req);
-               break;
-        case LLOG_ORIGIN_HANDLE_READ_HEADER:
-                DEBUG_REQ(D_MGS, req, "llog read header");
-                req_capsule_set(&req->rq_pill,
-                                &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
-                rc = llog_origin_handle_read_header(req);
-                break;
-        case LLOG_ORIGIN_HANDLE_CLOSE:
-                DEBUG_REQ(D_MGS, req, "llog close");
-                rc = llog_origin_handle_close(req);
-                break;
-       default:
-               rc = -EOPNOTSUPP;
-        }
-
-        LASSERT(current->journal_info == NULL);
-       if (rc) {
-               DEBUG_REQ(D_MGS, req, "MGS fail to handle opc = %d: rc = %d\n",
-                         opc, rc);
-               req->rq_status = rc;
-               rc = ptlrpc_error(req);
-               RETURN(rc);
-       }
-out:
-        target_send_reply(req, rc, fail);
-        RETURN(0);
+       return 0;
 }
 
 static inline int mgs_init_export(struct obd_export *exp)
 }
 
 static inline int mgs_init_export(struct obd_export *exp)
@@ -1058,15 +853,81 @@ out:
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
+static struct tgt_handler mgs_mgs_handlers[] = {
+TGT_RPC_HANDLER(MGS_FIRST_OPC,
+               0,                      MGS_CONNECT,     mgs_connect,
+               &RQF_CONNECT, LUSTRE_OBD_VERSION),
+TGT_RPC_HANDLER(MGS_FIRST_OPC,
+               0,                      MGS_DISCONNECT,  mgs_disconnect,
+               &RQF_MDS_DISCONNECT, LUSTRE_OBD_VERSION),
+TGT_MGS_HDL_VAR(0,                     MGS_EXCEPTION,   mgs_exception),
+TGT_MGS_HDL    (HABEO_REFERO | MUTABOR,        MGS_SET_INFO,    mgs_set_info),
+TGT_MGS_HDL    (HABEO_REFERO | MUTABOR,        MGS_TARGET_REG,  mgs_target_reg),
+TGT_MGS_HDL_VAR(0,                     MGS_TARGET_DEL,  mgs_target_del),
+TGT_MGS_HDL    (HABEO_REFERO,          MGS_CONFIG_READ, mgs_config_read),
+};
+
+static struct tgt_handler mgs_obd_handlers[] = {
+TGT_OBD_HDL(0, OBD_PING,       tgt_obd_ping),
+};
+
+static struct tgt_handler mgs_dlm_handlers[] = {
+TGT_DLM_HDL(HABEO_CLAVIS,      LDLM_ENQUEUE,   tgt_enqueue),
+};
+
+static struct tgt_handler mgs_llog_handlers[] = {
+TGT_LLOG_HDL    (0,    LLOG_ORIGIN_HANDLE_CREATE,      mgs_llog_open),
+TGT_LLOG_HDL    (0,    LLOG_ORIGIN_HANDLE_NEXT_BLOCK,  tgt_llog_next_block),
+TGT_LLOG_HDL    (0,    LLOG_ORIGIN_HANDLE_READ_HEADER, tgt_llog_read_header),
+TGT_LLOG_HDL_VAR(0,    LLOG_ORIGIN_HANDLE_CLOSE,       tgt_llog_close),
+TGT_LLOG_HDL    (0,    LLOG_ORIGIN_HANDLE_PREV_BLOCK,  tgt_llog_prev_block),
+};
+
+static struct tgt_handler mgs_sec_ctx_handlers[] = {
+TGT_SEC_HDL_VAR(0,     SEC_CTX_INIT,           mgs_sec_ctx_handle),
+TGT_SEC_HDL_VAR(0,     SEC_CTX_INIT_CONT,      mgs_sec_ctx_handle),
+TGT_SEC_HDL_VAR(0,     SEC_CTX_FINI,           mgs_sec_ctx_handle),
+};
+
+static struct tgt_opc_slice mgs_common_slice[] = {
+       {
+               .tos_opc_start = MGS_FIRST_OPC,
+               .tos_opc_end   = MGS_LAST_OPC,
+               .tos_hs        = mgs_mgs_handlers
+       },
+       {
+               .tos_opc_start = OBD_FIRST_OPC,
+               .tos_opc_end   = OBD_LAST_OPC,
+               .tos_hs        = mgs_obd_handlers
+       },
+       {
+               .tos_opc_start = LDLM_FIRST_OPC,
+               .tos_opc_end   = LDLM_LAST_OPC,
+               .tos_hs        = mgs_dlm_handlers
+       },
+       {
+               .tos_opc_start = LLOG_FIRST_OPC,
+               .tos_opc_end   = LLOG_LAST_OPC,
+               .tos_hs        = mgs_llog_handlers
+       },
+       {
+               .tos_opc_start = SEC_FIRST_OPC,
+               .tos_opc_end   = SEC_LAST_OPC,
+               .tos_hs        = mgs_sec_ctx_handlers
+       },
+       {
+               .tos_hs        = NULL
+       }
+};
 
 static int mgs_init0(const struct lu_env *env, struct mgs_device *mgs,
                     struct lu_device_type *ldt, struct lustre_cfg *lcfg)
 {
 
 static int mgs_init0(const struct lu_env *env, struct mgs_device *mgs,
                     struct lu_device_type *ldt, struct lustre_cfg *lcfg)
 {
-       static struct ptlrpc_service_conf        conf;
-       struct obd_device                       *obd;
-       struct lustre_mount_info                *lmi;
-       struct llog_ctxt                        *ctxt;
-       int                                      rc;
+       struct ptlrpc_service_conf       conf;
+       struct obd_device               *obd;
+       struct lustre_mount_info        *lmi;
+       struct llog_ctxt                *ctxt;
+       int                              rc;
 
        ENTRY;
 
 
        ENTRY;
 
@@ -1096,15 +957,20 @@ static int mgs_init0(const struct lu_env *env, struct mgs_device *mgs,
        if (obd->obd_namespace == NULL)
                GOTO(err_ops, rc = -ENOMEM);
 
        if (obd->obd_namespace == NULL)
                GOTO(err_ops, rc = -ENOMEM);
 
-       /* ldlm setup */
-       ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
-                          "mgs_ldlm_client", &obd->obd_ldlm_client);
+       /* No recovery for MGCs */
+       obd->obd_replayable = 0;
+
+       rc = tgt_init(env, &mgs->mgs_lut, obd, mgs->mgs_bottom,
+                     mgs_common_slice, OBD_FAIL_MGS_ALL_REQUEST_NET,
+                     OBD_FAIL_MGS_ALL_REPLY_NET);
+       if (rc)
+               GOTO(err_ns, rc);
 
        rc = mgs_fs_setup(env, mgs);
        if (rc) {
                CERROR("%s: MGS filesystem method init failed: rc = %d\n",
                       obd->obd_name, rc);
 
        rc = mgs_fs_setup(env, mgs);
        if (rc) {
                CERROR("%s: MGS filesystem method init failed: rc = %d\n",
                       obd->obd_name, rc);
-               GOTO(err_ns, rc);
+               GOTO(err_tgt, rc);
        }
 
        rc = llog_setup(env, obd, &obd->obd_olg, LLOG_CONFIG_ORIG_CTXT,
        }
 
        rc = llog_setup(env, obd, &obd->obd_olg, LLOG_CONFIG_ORIG_CTXT,
@@ -1119,9 +985,6 @@ static int mgs_init0(const struct lu_env *env, struct mgs_device *mgs,
        ctxt->loc_dir = mgs->mgs_configs_dir;
        llog_ctxt_put(ctxt);
 
        ctxt->loc_dir = mgs->mgs_configs_dir;
        llog_ctxt_put(ctxt);
 
-       /* No recovery for MGC's */
-       obd->obd_replayable = 0;
-
        /* Internal mgs setup */
        mgs_init_fsdb_list(mgs);
        mutex_init(&mgs->mgs_mutex);
        /* Internal mgs setup */
        mgs_init_fsdb_list(mgs);
        mutex_init(&mgs->mgs_mutex);
@@ -1135,6 +998,9 @@ static int mgs_init0(const struct lu_env *env, struct mgs_device *mgs,
                GOTO(err_llog, rc);
        }
 
                GOTO(err_llog, rc);
        }
 
+       ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
+                          "mgs_ldlm_client", &obd->obd_ldlm_client);
+
        conf = (typeof(conf)) {
                .psc_name               = LUSTRE_MGS_NAME,
                .psc_watchdog_factor    = MGS_SERVICE_WATCHDOG_FACTOR,
        conf = (typeof(conf)) {
                .psc_name               = LUSTRE_MGS_NAME,
                .psc_watchdog_factor    = MGS_SERVICE_WATCHDOG_FACTOR,
@@ -1153,15 +1019,17 @@ static int mgs_init0(const struct lu_env *env, struct mgs_device *mgs,
                        .tc_ctx_tags            = LCT_MG_THREAD,
                },
                .psc_ops                = {
                        .tc_ctx_tags            = LCT_MG_THREAD,
                },
                .psc_ops                = {
-                       .so_req_handler         = mgs_handle,
+                       .so_req_handler         = tgt_request_handle,
                        .so_req_printer         = target_print_req,
                },
        };
                        .so_req_printer         = target_print_req,
                },
        };
+
        /* Start the service threads */
        mgs->mgs_service = ptlrpc_register_service(&conf, obd->obd_proc_entry);
        if (IS_ERR(mgs->mgs_service)) {
                rc = PTR_ERR(mgs->mgs_service);
        /* Start the service threads */
        mgs->mgs_service = ptlrpc_register_service(&conf, obd->obd_proc_entry);
        if (IS_ERR(mgs->mgs_service)) {
                rc = PTR_ERR(mgs->mgs_service);
-               CERROR("failed to start service: %d\n", rc);
+               CERROR("failed to start mgs service: %d\n", rc);
+               mgs->mgs_service = NULL;
                GOTO(err_lproc, rc);
        }
 
                GOTO(err_lproc, rc);
        }
 
@@ -1172,7 +1040,6 @@ static int mgs_init0(const struct lu_env *env, struct mgs_device *mgs,
        /* device stack is not yet fully setup to keep no objects behind */
        lu_site_purge(env, mgs2lu_dev(mgs)->ld_site, ~0);
        RETURN(0);
        /* device stack is not yet fully setup to keep no objects behind */
        lu_site_purge(env, mgs2lu_dev(mgs)->ld_site, ~0);
        RETURN(0);
-
 err_lproc:
        lproc_mgs_cleanup(mgs);
 err_llog:
 err_lproc:
        lproc_mgs_cleanup(mgs);
 err_llog:
@@ -1181,6 +1048,8 @@ err_llog:
                ctxt->loc_dir = NULL;
                llog_cleanup(env, ctxt);
        }
                ctxt->loc_dir = NULL;
                llog_cleanup(env, ctxt);
        }
+err_tgt:
+       tgt_fini(env, &mgs->mgs_lut);
 err_fs:
        /* No extra cleanup needed for llog_init_commit_thread() */
        mgs_fs_cleanup(env, mgs);
 err_fs:
        /* No extra cleanup needed for llog_init_commit_thread() */
        mgs_fs_cleanup(env, mgs);
@@ -1335,6 +1204,8 @@ static struct lu_device *mgs_device_fini(const struct lu_env *env,
 
        LASSERT(mgs->mgs_bottom);
 
 
        LASSERT(mgs->mgs_bottom);
 
+       class_disconnect_exports(obd);
+
        ping_evictor_stop();
 
        ptlrpc_unregister_service(mgs->mgs_service);
        ping_evictor_stop();
 
        ptlrpc_unregister_service(mgs->mgs_service);
@@ -1342,6 +1213,7 @@ static struct lu_device *mgs_device_fini(const struct lu_env *env,
        obd_exports_barrier(obd);
        obd_zombie_barrier();
 
        obd_exports_barrier(obd);
        obd_zombie_barrier();
 
+       tgt_fini(env, &mgs->mgs_lut);
        mgs_cleanup_fsdb_list(mgs);
        lproc_mgs_cleanup(mgs);
 
        mgs_cleanup_fsdb_list(mgs);
        lproc_mgs_cleanup(mgs);
 
@@ -1397,24 +1269,99 @@ static struct lu_device_type mgs_device_type = {
        .ldt_ctx_tags   = LCT_MG_THREAD
 };
 
        .ldt_ctx_tags   = LCT_MG_THREAD
 };
 
+static int mgs_obd_connect(const struct lu_env *env, struct obd_export **exp,
+                          struct obd_device *obd, struct obd_uuid *cluuid,
+                          struct obd_connect_data *data, void *localdata)
+{
+       struct obd_export       *lexp;
+       struct lustre_handle     conn = { 0 };
+       int                      rc;
+
+       ENTRY;
+
+       if (exp == NULL || obd == NULL || cluuid == NULL)
+               RETURN(-EINVAL);
+
+       rc = class_connect(&conn, obd, cluuid);
+       if (rc)
+               RETURN(rc);
+
+       lexp = class_conn2export(&conn);
+       if (lexp == NULL)
+               RETURN(-EFAULT);
+
+       if (data != NULL) {
+               data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
+               data->ocd_version = LUSTRE_VERSION_CODE;
+               lexp->exp_connect_data = *data;
+       }
+
+       tgt_counter_incr(lexp, LPROC_MGS_CONNECT);
+
+       rc = mgs_export_stats_init(obd, lexp, localdata);
+       if (rc)
+               class_disconnect(lexp);
+       else
+               *exp = lexp;
+
+       RETURN(rc);
+}
+
+static int mgs_obd_reconnect(const struct lu_env *env, struct obd_export *exp,
+                            struct obd_device *obd, struct obd_uuid *cluuid,
+                            struct obd_connect_data *data, void *localdata)
+{
+       ENTRY;
+
+       if (exp == NULL || obd == NULL || cluuid == NULL)
+               RETURN(-EINVAL);
+
+       tgt_counter_incr(exp, LPROC_MGS_CONNECT);
+
+       if (data != NULL) {
+               data->ocd_connect_flags &= MGS_CONNECT_SUPPORTED;
+               data->ocd_version = LUSTRE_VERSION_CODE;
+               exp->exp_connect_data = *data;
+       }
+
+       RETURN(mgs_export_stats_init(obd, exp, localdata));
+}
+
+static int mgs_obd_disconnect(struct obd_export *exp)
+{
+       int rc;
+
+       ENTRY;
+
+       LASSERT(exp);
+
+       mgs_fsc_cleanup(exp);
+
+       class_export_get(exp);
+       tgt_counter_incr(exp, LPROC_MGS_DISCONNECT);
+
+       rc = server_disconnect_export(exp);
+       class_export_put(exp);
+       RETURN(rc);
+}
 
 /* use obd ops to offer management infrastructure */
 
 /* use obd ops to offer management infrastructure */
-static struct obd_ops mgs_obd_ops = {
-        .o_owner           = THIS_MODULE,
-        .o_connect         = mgs_connect,
-        .o_reconnect       = mgs_reconnect,
-        .o_disconnect      = mgs_disconnect,
-        .o_init_export     = mgs_init_export,
-        .o_destroy_export  = mgs_destroy_export,
-        .o_iocontrol       = mgs_iocontrol,
+static struct obd_ops mgs_obd_device_ops = {
+       .o_owner                = THIS_MODULE,
+       .o_connect              = mgs_obd_connect,
+       .o_reconnect            = mgs_obd_reconnect,
+       .o_disconnect           = mgs_obd_disconnect,
+       .o_init_export          = mgs_init_export,
+       .o_destroy_export       = mgs_destroy_export,
+       .o_iocontrol            = mgs_iocontrol,
 };
 
 static int __init mgs_init(void)
 {
 };
 
 static int __init mgs_init(void)
 {
-        struct lprocfs_static_vars lvars;
+       struct lprocfs_static_vars lvars;
 
 
-        lprocfs_mgs_init_vars(&lvars);
-       class_register_type(&mgs_obd_ops, NULL, lvars.module_vars,
+       lprocfs_mgs_init_vars(&lvars);
+       class_register_type(&mgs_obd_device_ops, NULL, lvars.module_vars,
                            LUSTRE_MGS_NAME, &mgs_device_type);
 
        return 0;
                            LUSTRE_MGS_NAME, &mgs_device_type);
 
        return 0;
@@ -1422,7 +1369,7 @@ static int __init mgs_init(void)
 
 static void /*__exit*/ mgs_exit(void)
 {
 
 static void /*__exit*/ mgs_exit(void)
 {
-        class_unregister_type(LUSTRE_MGS_NAME);
+       class_unregister_type(LUSTRE_MGS_NAME);
 }
 
 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
 }
 
 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
index fc3687c..fe5fcb0 100644 (file)
@@ -173,6 +173,7 @@ struct mgs_device {
        struct obd_device               *mgs_obd;
        struct local_oid_storage        *mgs_los;
        struct mutex                     mgs_mutex;
        struct obd_device               *mgs_obd;
        struct local_oid_storage        *mgs_los;
        struct mutex                     mgs_mutex;
+       struct lu_target                 mgs_lut;
 };
 
 /* this is a top object */
 };
 
 /* this is a top object */
@@ -290,9 +291,16 @@ extern struct lu_context_key mgs_thread_key;
 
 static inline struct mgs_thread_info *mgs_env_info(const struct lu_env *env)
 {
 
 static inline struct mgs_thread_info *mgs_env_info(const struct lu_env *env)
 {
-       struct mgs_thread_info *info;
+       struct mgs_thread_info  *info;
+       int                     rc;
 
        info = lu_context_key_get(&env->le_ctx, &mgs_thread_key);
 
        info = lu_context_key_get(&env->le_ctx, &mgs_thread_key);
+       if (info == NULL) {
+               rc = lu_env_refill((struct lu_env *)env);
+               if (rc != 0)
+                       return ERR_PTR(rc);
+               info = lu_context_key_get(&env->le_ctx, &mgs_thread_key);
+       }
        LASSERT(info != NULL);
        return info;
 }
        LASSERT(info != NULL);
        return info;
 }
index 4f46881..391f925 100644 (file)
@@ -658,10 +658,6 @@ int mgs_get_ir_logs(struct ptlrpc_request *req)
         if (pages == NULL)
                 RETURN(-ENOMEM);
 
         if (pages == NULL)
                 RETURN(-ENOMEM);
 
-        rc = req_capsule_server_pack(&req->rq_pill);
-        if (rc)
-                GOTO(out, rc);
-
         res = req_capsule_server_get(&req->rq_pill, &RMF_MGS_CONFIG_RES);
         if (res == NULL)
                 GOTO(out, rc = -EINVAL);
         res = req_capsule_server_get(&req->rq_pill, &RMF_MGS_CONFIG_RES);
         if (res == NULL)
                 GOTO(out, rc = -EINVAL);
index a9237f7..dfe363b 100644 (file)
@@ -728,7 +728,9 @@ static int ofd_init0(const struct lu_env *env, struct ofd_device *m,
        m->ofd_grant_ratio =
                ofd_grant_ratio_conv(m->ofd_dt_conf.ddp_grant_reserved);
 
        m->ofd_grant_ratio =
                ofd_grant_ratio_conv(m->ofd_dt_conf.ddp_grant_reserved);
 
-       rc = tgt_init(env, &m->ofd_lut, obd, m->ofd_osd);
+       rc = tgt_init(env, &m->ofd_lut, obd, m->ofd_osd, NULL,
+                     OBD_FAIL_OST_ALL_REQUEST_NET,
+                     OBD_FAIL_OST_ALL_REPLY_NET);
        if (rc)
                GOTO(err_free_ns, rc);
 
        if (rc)
                GOTO(err_free_ns, rc);
 
index fc7d74d..5b7e34e 100644 (file)
@@ -2494,19 +2494,6 @@ int ost_handle(struct ptlrpc_request *req)
                 if (rc)
                         RETURN(rc);
                 RETURN(ptlrpc_reply(req));
                 if (rc)
                         RETURN(rc);
                 RETURN(ptlrpc_reply(req));
-        case OBD_LOG_CANCEL:
-                CDEBUG(D_INODE, "log cancel\n");
-                req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
-                        RETURN(0);
-                rc = llog_origin_handle_cancel(req);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_REP))
-                        RETURN(0);
-                req->rq_status = rc;
-                rc = req_capsule_server_pack(&req->rq_pill);
-                if (rc)
-                        RETURN(rc);
-                RETURN(ptlrpc_reply(req));
        case LDLM_ENQUEUE:
                CDEBUG(D_INODE, "enqueue\n");
                req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
        case LDLM_ENQUEUE:
                CDEBUG(D_INODE, "enqueue\n");
                req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
index a4ccd7c..dd23c84 100644 (file)
@@ -18,6 +18,7 @@ ptlrpc_objs += sec_null.o sec_plain.o nrs.o nrs_fifo.o nrs_crr.o nrs_orr.o
 ptlrpc_objs += errno.o
 
 target_objs := $(TARGET)tgt_main.o $(TARGET)tgt_lastrcvd.o
 ptlrpc_objs += errno.o
 
 target_objs := $(TARGET)tgt_main.o $(TARGET)tgt_lastrcvd.o
+target_objs += $(TARGET)tgt_handler.o
 
 ptlrpc-objs := $(ldlm_objs) $(ptlrpc_objs)
 @SERVER_TRUE@ptlrpc-objs += $(target_objs)
 
 ptlrpc-objs := $(ldlm_objs) $(ptlrpc_objs)
 @SERVER_TRUE@ptlrpc-objs += $(target_objs)
index 82696c4..c16d05b 100644 (file)
@@ -742,7 +742,8 @@ static struct req_format *req_formats[] = {
         &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK,
         &RQF_LLOG_ORIGIN_HANDLE_PREV_BLOCK,
         &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER,
         &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK,
         &RQF_LLOG_ORIGIN_HANDLE_PREV_BLOCK,
         &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER,
-        &RQF_LLOG_ORIGIN_CONNECT
+       &RQF_LLOG_ORIGIN_CONNECT,
+       &RQF_CONNECT,
 };
 
 struct req_msg_field {
 };
 
 struct req_msg_field {
@@ -1508,6 +1509,10 @@ struct req_format RQF_LLOG_ORIGIN_CONNECT =
         DEFINE_REQ_FMT0("LLOG_ORIGIN_CONNECT", llogd_conn_body_only, empty);
 EXPORT_SYMBOL(RQF_LLOG_ORIGIN_CONNECT);
 
         DEFINE_REQ_FMT0("LLOG_ORIGIN_CONNECT", llogd_conn_body_only, empty);
 EXPORT_SYMBOL(RQF_LLOG_ORIGIN_CONNECT);
 
+struct req_format RQF_CONNECT =
+       DEFINE_REQ_FMT0("CONNECT", obd_connect_client, obd_connect_server);
+EXPORT_SYMBOL(RQF_CONNECT);
+
 struct req_format RQF_OST_CONNECT =
         DEFINE_REQ_FMT0("OST_CONNECT",
                         obd_connect_client, obd_connect_server);
 struct req_format RQF_OST_CONNECT =
         DEFINE_REQ_FMT0("OST_CONNECT",
                         obd_connect_client, obd_connect_server);
index 44a15a1..2b2488b 100644 (file)
@@ -47,9 +47,9 @@
 #endif
 
 #include <obd_class.h>
 #endif
 
 #include <obd_class.h>
+#include <lu_target.h>
 #include <lustre_log.h>
 #include <lustre_net.h>
 #include <lustre_log.h>
 #include <lustre_net.h>
-#include <lustre_fsfilt.h>
 
 #if defined(__KERNEL__) && defined(LUSTRE_LOG_SERVER)
 static int llog_origin_close(const struct lu_env *env, struct llog_handle *lgh)
 
 #if defined(__KERNEL__) && defined(LUSTRE_LOG_SERVER)
 static int llog_origin_close(const struct lu_env *env, struct llog_handle *lgh)
@@ -65,8 +65,6 @@ int llog_origin_handle_open(struct ptlrpc_request *req)
 {
        struct obd_export       *exp = req->rq_export;
        struct obd_device       *obd = exp->exp_obd;
 {
        struct obd_export       *exp = req->rq_export;
        struct obd_device       *obd = exp->exp_obd;
-       struct obd_device       *disk_obd;
-       struct lvfs_run_ctxt     saved;
        struct llog_handle      *loghandle;
        struct llogd_body       *body;
        struct llog_logid       *logid = NULL;
        struct llog_handle      *loghandle;
        struct llogd_body       *body;
        struct llog_logid       *logid = NULL;
@@ -76,46 +74,42 @@ int llog_origin_handle_open(struct ptlrpc_request *req)
 
        ENTRY;
 
 
        ENTRY;
 
-        body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
-        if (body == NULL)
-                RETURN(-EFAULT);
+       body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
+       if (body == NULL)
+               RETURN(err_serious(-EFAULT));
+
+       rc = req_capsule_server_pack(&req->rq_pill);
+       if (rc)
+               RETURN(err_serious(-ENOMEM));
 
        if (ostid_id(&body->lgd_logid.lgl_oi) > 0)
                logid = &body->lgd_logid;
 
 
        if (ostid_id(&body->lgd_logid.lgl_oi) > 0)
                logid = &body->lgd_logid;
 
-        if (req_capsule_field_present(&req->rq_pill, &RMF_NAME, RCL_CLIENT)) {
-                name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
-                if (name == NULL)
-                        RETURN(-EFAULT);
-                CDEBUG(D_INFO, "%s: opening log %s\n", obd->obd_name, name);
-        }
-
-        ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
-        if (ctxt == NULL) {
-                CDEBUG(D_WARNING, "%s: no ctxt. group=%p idx=%d name=%s\n",
-                       obd->obd_name, &obd->obd_olg, body->lgd_ctxt_idx, name);
-                RETURN(-ENODEV);
-        }
-        disk_obd = ctxt->loc_exp->exp_obd;
-        push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
+       if (req_capsule_field_present(&req->rq_pill, &RMF_NAME, RCL_CLIENT)) {
+               name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
+               if (name == NULL)
+                       RETURN(-EFAULT);
+               CDEBUG(D_INFO, "%s: opening log %s\n", obd->obd_name, name);
+       }
+
+       ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
+       if (ctxt == NULL) {
+               CDEBUG(D_WARNING, "%s: no ctxt. group=%p idx=%d name=%s\n",
+                      obd->obd_name, &obd->obd_olg, body->lgd_ctxt_idx, name);
+               RETURN(-ENODEV);
+       }
 
        rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle, logid,
                       name, LLOG_OPEN_EXISTS);
 
        rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle, logid,
                       name, LLOG_OPEN_EXISTS);
-        if (rc)
-                GOTO(out_pop, rc);
-
-        rc = req_capsule_server_pack(&req->rq_pill);
-        if (rc)
-                GOTO(out_close, rc = -ENOMEM);
+       if (rc)
+               GOTO(out_ctxt, rc);
 
 
-        body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
-        body->lgd_logid = loghandle->lgh_id;
+       body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
+       body->lgd_logid = loghandle->lgh_id;
 
 
-       EXIT;
-out_close:
        llog_origin_close(req->rq_svc_thread->t_env, loghandle);
        llog_origin_close(req->rq_svc_thread->t_env, loghandle);
-out_pop:
-       pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
+       EXIT;
+out_ctxt:
        llog_ctxt_put(ctxt);
        return rc;
 }
        llog_ctxt_put(ctxt);
        return rc;
 }
@@ -123,8 +117,6 @@ EXPORT_SYMBOL(llog_origin_handle_open);
 
 int llog_origin_handle_destroy(struct ptlrpc_request *req)
 {
 
 int llog_origin_handle_destroy(struct ptlrpc_request *req)
 {
-       struct obd_device       *disk_obd;
-       struct lvfs_run_ctxt     saved;
        struct llogd_body       *body;
        struct llog_logid       *logid = NULL;
        struct llog_ctxt        *ctxt;
        struct llogd_body       *body;
        struct llog_logid       *logid = NULL;
        struct llog_ctxt        *ctxt;
@@ -132,9 +124,13 @@ int llog_origin_handle_destroy(struct ptlrpc_request *req)
 
        ENTRY;
 
 
        ENTRY;
 
-        body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
-        if (body == NULL)
-                RETURN(-EFAULT);
+       body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
+       if (body == NULL)
+               RETURN(err_serious(-EFAULT));
+
+       rc = req_capsule_server_pack(&req->rq_pill);
+       if (rc < 0)
+               RETURN(err_serious(-ENOMEM));
 
        if (ostid_id(&body->lgd_logid.lgl_oi) > 0)
                logid = &body->lgd_logid;
 
        if (ostid_id(&body->lgd_logid.lgl_oi) > 0)
                logid = &body->lgd_logid;
@@ -147,14 +143,7 @@ int llog_origin_handle_destroy(struct ptlrpc_request *req)
        if (ctxt == NULL)
                RETURN(-ENODEV);
 
        if (ctxt == NULL)
                RETURN(-ENODEV);
 
-       disk_obd = ctxt->loc_exp->exp_obd;
-       push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
-
-       rc = req_capsule_server_pack(&req->rq_pill);
-       /* erase only if no error and logid is valid */
-       if (rc == 0)
-               rc = llog_erase(req->rq_svc_thread->t_env, ctxt, logid, NULL);
-       pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
+       rc = llog_erase(req->rq_svc_thread->t_env, ctxt, logid, NULL);
        llog_ctxt_put(ctxt);
        RETURN(rc);
 }
        llog_ctxt_put(ctxt);
        RETURN(rc);
 }
@@ -162,50 +151,45 @@ EXPORT_SYMBOL(llog_origin_handle_destroy);
 
 int llog_origin_handle_next_block(struct ptlrpc_request *req)
 {
 
 int llog_origin_handle_next_block(struct ptlrpc_request *req)
 {
-        struct obd_device   *disk_obd;
-        struct llog_handle  *loghandle;
-        struct llogd_body   *body;
-        struct llogd_body   *repbody;
-        struct lvfs_run_ctxt saved;
-        struct llog_ctxt    *ctxt;
-        __u32                flags;
-        void                *ptr;
-       int                  rc;
-
-        ENTRY;
-
-        body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
-        if (body == NULL)
-                RETURN(-EFAULT);
+       struct llog_handle      *loghandle;
+       struct llogd_body       *body;
+       struct llogd_body       *repbody;
+       struct llog_ctxt        *ctxt;
+       __u32                    flags;
+       void                    *ptr;
+       int                      rc;
+
+       ENTRY;
+
+       body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
+       if (body == NULL)
+               RETURN(err_serious(-EFAULT));
+
+       req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
+                            LLOG_CHUNK_SIZE);
+       rc = req_capsule_server_pack(&req->rq_pill);
+       if (rc)
+               RETURN(err_serious(-ENOMEM));
 
        ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
        if (ctxt == NULL)
                RETURN(-ENODEV);
 
 
        ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
        if (ctxt == NULL)
                RETURN(-ENODEV);
 
-       disk_obd = ctxt->loc_exp->exp_obd;
-       push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
-
        rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
                       &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
        if (rc)
        rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
                       &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
        if (rc)
-               GOTO(out_pop, rc);
+               GOTO(out_ctxt, rc);
 
        flags = body->lgd_llh_flags;
        rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
                              NULL);
 
        flags = body->lgd_llh_flags;
        rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
                              NULL);
-        if (rc)
-                GOTO(out_close, rc);
-
-        req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
-                             LLOG_CHUNK_SIZE);
-        rc = req_capsule_server_pack(&req->rq_pill);
-        if (rc)
-                GOTO(out_close, rc = -ENOMEM);
+       if (rc)
+               GOTO(out_close, rc);
 
 
-        repbody = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
-        *repbody = *body;
+       repbody = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
+       *repbody = *body;
 
 
-        ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
+       ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
        rc = llog_next_block(req->rq_svc_thread->t_env, loghandle,
                             &repbody->lgd_saved_index, repbody->lgd_index,
                             &repbody->lgd_cur_offset, ptr, LLOG_CHUNK_SIZE);
        rc = llog_next_block(req->rq_svc_thread->t_env, loghandle,
                             &repbody->lgd_saved_index, repbody->lgd_index,
                             &repbody->lgd_cur_offset, ptr, LLOG_CHUNK_SIZE);
@@ -214,8 +198,7 @@ int llog_origin_handle_next_block(struct ptlrpc_request *req)
        EXIT;
 out_close:
        llog_origin_close(req->rq_svc_thread->t_env, loghandle);
        EXIT;
 out_close:
        llog_origin_close(req->rq_svc_thread->t_env, loghandle);
-out_pop:
-       pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
+out_ctxt:
        llog_ctxt_put(ctxt);
        return rc;
 }
        llog_ctxt_put(ctxt);
        return rc;
 }
@@ -223,50 +206,45 @@ EXPORT_SYMBOL(llog_origin_handle_next_block);
 
 int llog_origin_handle_prev_block(struct ptlrpc_request *req)
 {
 
 int llog_origin_handle_prev_block(struct ptlrpc_request *req)
 {
-        struct llog_handle   *loghandle;
-        struct llogd_body    *body;
-        struct llogd_body    *repbody;
-        struct obd_device    *disk_obd;
-        struct lvfs_run_ctxt  saved;
-        struct llog_ctxt     *ctxt;
-        __u32                 flags;
-        void                 *ptr;
-       int                   rc;
-
-        ENTRY;
-
-        body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
-        if (body == NULL)
-                RETURN(-EFAULT);
+       struct llog_handle      *loghandle;
+       struct llogd_body       *body;
+       struct llogd_body       *repbody;
+       struct llog_ctxt        *ctxt;
+       __u32                    flags;
+       void                    *ptr;
+       int                      rc;
+
+       ENTRY;
+
+       body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
+       if (body == NULL)
+               RETURN(err_serious(-EFAULT));
+
+       req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
+                            LLOG_CHUNK_SIZE);
+       rc = req_capsule_server_pack(&req->rq_pill);
+       if (rc)
+               RETURN(err_serious(-ENOMEM));
 
        ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
        if (ctxt == NULL)
                RETURN(-ENODEV);
 
 
        ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
        if (ctxt == NULL)
                RETURN(-ENODEV);
 
-        disk_obd = ctxt->loc_exp->exp_obd;
-        push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
-
        rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
                         &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
        if (rc)
        rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
                         &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
        if (rc)
-               GOTO(out_pop, rc);
+               GOTO(out_ctxt, rc);
 
        flags = body->lgd_llh_flags;
        rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
                              NULL);
 
        flags = body->lgd_llh_flags;
        rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
                              NULL);
-        if (rc)
-                GOTO(out_close, rc);
-
-        req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
-                             LLOG_CHUNK_SIZE);
-        rc = req_capsule_server_pack(&req->rq_pill);
-        if (rc)
-                GOTO(out_close, rc = -ENOMEM);
+       if (rc)
+               GOTO(out_close, rc);
 
 
-        repbody = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
-        *repbody = *body;
+       repbody = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
+       *repbody = *body;
 
 
-        ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
+       ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
        rc = llog_prev_block(req->rq_svc_thread->t_env, loghandle,
                             body->lgd_index, ptr, LLOG_CHUNK_SIZE);
        if (rc)
        rc = llog_prev_block(req->rq_svc_thread->t_env, loghandle,
                             body->lgd_index, ptr, LLOG_CHUNK_SIZE);
        if (rc)
@@ -275,8 +253,7 @@ int llog_origin_handle_prev_block(struct ptlrpc_request *req)
        EXIT;
 out_close:
        llog_origin_close(req->rq_svc_thread->t_env, loghandle);
        EXIT;
 out_close:
        llog_origin_close(req->rq_svc_thread->t_env, loghandle);
-out_pop:
-       pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
+out_ctxt:
        llog_ctxt_put(ctxt);
        return rc;
 }
        llog_ctxt_put(ctxt);
        return rc;
 }
@@ -284,32 +261,31 @@ EXPORT_SYMBOL(llog_origin_handle_prev_block);
 
 int llog_origin_handle_read_header(struct ptlrpc_request *req)
 {
 
 int llog_origin_handle_read_header(struct ptlrpc_request *req)
 {
-        struct obd_device    *disk_obd;
-        struct llog_handle   *loghandle;
-        struct llogd_body    *body;
-        struct llog_log_hdr  *hdr;
-        struct lvfs_run_ctxt  saved;
-        struct llog_ctxt     *ctxt;
-        __u32                 flags;
-       int                   rc;
+       struct llog_handle      *loghandle;
+       struct llogd_body       *body;
+       struct llog_log_hdr     *hdr;
+       struct llog_ctxt        *ctxt;
+       __u32                    flags;
+       int                      rc;
+
+       ENTRY;
 
 
-        ENTRY;
+       body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
+       if (body == NULL)
+               RETURN(err_serious(-EFAULT));
 
 
-        body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
-        if (body == NULL)
-                RETURN(-EFAULT);
+       rc = req_capsule_server_pack(&req->rq_pill);
+       if (rc)
+               RETURN(err_serious(-ENOMEM));
 
        ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
        if (ctxt == NULL)
                RETURN(-ENODEV);
 
 
        ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
        if (ctxt == NULL)
                RETURN(-ENODEV);
 
-        disk_obd = ctxt->loc_exp->exp_obd;
-        push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
-
        rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
                       &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
        if (rc)
        rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
                       &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
        if (rc)
-               GOTO(out_pop, rc);
+               GOTO(out_ctxt, rc);
 
        /*
         * llog_init_handle() reads the llog header
 
        /*
         * llog_init_handle() reads the llog header
@@ -321,17 +297,12 @@ int llog_origin_handle_read_header(struct ptlrpc_request *req)
                GOTO(out_close, rc);
        flags = loghandle->lgh_hdr->llh_flags;
 
                GOTO(out_close, rc);
        flags = loghandle->lgh_hdr->llh_flags;
 
-        rc = req_capsule_server_pack(&req->rq_pill);
-        if (rc)
-                GOTO(out_close, rc = -ENOMEM);
-
-        hdr = req_capsule_server_get(&req->rq_pill, &RMF_LLOG_LOG_HDR);
-        *hdr = *loghandle->lgh_hdr;
-        EXIT;
+       hdr = req_capsule_server_get(&req->rq_pill, &RMF_LLOG_LOG_HDR);
+       *hdr = *loghandle->lgh_hdr;
+       EXIT;
 out_close:
        llog_origin_close(req->rq_svc_thread->t_env, loghandle);
 out_close:
        llog_origin_close(req->rq_svc_thread->t_env, loghandle);
-out_pop:
-       pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
+out_ctxt:
        llog_ctxt_put(ctxt);
        return rc;
 }
        llog_ctxt_put(ctxt);
        return rc;
 }
@@ -339,94 +310,16 @@ EXPORT_SYMBOL(llog_origin_handle_read_header);
 
 int llog_origin_handle_close(struct ptlrpc_request *req)
 {
 
 int llog_origin_handle_close(struct ptlrpc_request *req)
 {
-        ENTRY;
-        /* Nothing to do */
-        RETURN(0);
-}
-EXPORT_SYMBOL(llog_origin_handle_close);
+       int      rc;
 
 
-int llog_origin_handle_cancel(struct ptlrpc_request *req)
-{
-        int num_cookies, rc = 0, err, i, failed = 0;
-        struct obd_device *disk_obd;
-        struct llog_cookie *logcookies;
-        struct llog_ctxt *ctxt = NULL;
-        struct lvfs_run_ctxt saved;
-        struct llog_handle *cathandle;
-        struct inode *inode;
-        void *handle;
-        ENTRY;
-
-        logcookies = req_capsule_client_get(&req->rq_pill, &RMF_LOGCOOKIES);
-        num_cookies = req_capsule_get_size(&req->rq_pill, &RMF_LOGCOOKIES,
-                                           RCL_CLIENT) / sizeof(*logcookies);
-        if (logcookies == NULL || num_cookies == 0) {
-                DEBUG_REQ(D_HA, req, "No llog cookies sent");
-                RETURN(-EFAULT);
-        }
-
-       ctxt = llog_get_context(req->rq_export->exp_obd,
-                               logcookies->lgc_subsys);
-        if (ctxt == NULL)
-                RETURN(-ENODEV);
-
-        disk_obd = ctxt->loc_exp->exp_obd;
-        push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
-        for (i = 0; i < num_cookies; i++, logcookies++) {
-                cathandle = ctxt->loc_handle;
-                LASSERT(cathandle != NULL);
-                inode = cathandle->lgh_file->f_dentry->d_inode;
-
-                handle = fsfilt_start_log(disk_obd, inode,
-                                          FSFILT_OP_CANCEL_UNLINK, NULL, 1);
-                if (IS_ERR(handle)) {
-                        CERROR("fsfilt_start_log() failed: %ld\n",
-                               PTR_ERR(handle));
-                        GOTO(pop_ctxt, rc = PTR_ERR(handle));
-                }
-
-               rc = llog_cat_cancel_records(req->rq_svc_thread->t_env,
-                                            cathandle, 1, logcookies);
-
-                /*
-                 * Do not raise -ENOENT errors for resent rpcs. This rec already
-                 * might be killed.
-                 */
-                if (rc == -ENOENT &&
-                    (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) {
-                        /*
-                         * Do not change this message, reply-single.sh test_59b
-                         * expects to find this in log.
-                         */
-                        CDEBUG(D_RPCTRACE, "RESENT cancel req %p - ignored\n",
-                               req);
-                        rc = 0;
-                } else if (rc == 0) {
-                        CDEBUG(D_RPCTRACE, "Canceled %d llog-records\n",
-                               num_cookies);
-                }
-
-                err = fsfilt_commit(disk_obd, inode, handle, 0);
-                if (err) {
-                        CERROR("Error committing transaction: %d\n", err);
-                        if (!rc)
-                                rc = err;
-                        failed++;
-                        GOTO(pop_ctxt, rc);
-                } else if (rc)
-                        failed++;
-        }
-        GOTO(pop_ctxt, rc);
-pop_ctxt:
-        pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
-        if (rc)
-                CERROR("Cancel %d of %d llog-records failed: %d\n",
-                       failed, num_cookies, rc);
-
-        llog_ctxt_put(ctxt);
-        return rc;
+       ENTRY;
+
+       rc = req_capsule_server_pack(&req->rq_pill);
+       if (rc)
+               RETURN(err_serious(-ENOMEM));
+       RETURN(0);
 }
 }
-EXPORT_SYMBOL(llog_origin_handle_cancel);
+EXPORT_SYMBOL(llog_origin_handle_close);
 
 #else /* !__KERNEL__ */
 int llog_origin_handle_open(struct ptlrpc_request *req)
 
 #else /* !__KERNEL__ */
 int llog_origin_handle_open(struct ptlrpc_request *req)
@@ -461,9 +354,4 @@ int llog_origin_handle_close(struct ptlrpc_request *req)
         LBUG();
         return 0;
 }
         LBUG();
         return 0;
 }
-int llog_origin_handle_cancel(struct ptlrpc_request *req)
-{
-        LBUG();
-        return 0;
-}
 #endif
 #endif
index a3f16d8..47ed364 100644 (file)
@@ -117,10 +117,10 @@ struct ll_rpc_opcode {
         { MGS_SET_INFO,     "mgs_set_info" },
         { MGS_CONFIG_READ,  "mgs_config_read" },
         { OBD_PING,         "obd_ping" },
         { MGS_SET_INFO,     "mgs_set_info" },
         { MGS_CONFIG_READ,  "mgs_config_read" },
         { OBD_PING,         "obd_ping" },
-        { OBD_LOG_CANCEL,   "llog_origin_handle_cancel" },
+       { OBD_LOG_CANCEL,       "llog_cancel" },
         { OBD_QC_CALLBACK,  "obd_quota_callback" },
        { OBD_IDX_READ,     "dt_index_read" },
         { OBD_QC_CALLBACK,  "obd_quota_callback" },
        { OBD_IDX_READ,     "dt_index_read" },
-        { LLOG_ORIGIN_HANDLE_CREATE,     "llog_origin_handle_create" },
+       { LLOG_ORIGIN_HANDLE_CREATE,     "llog_origin_handle_open" },
         { LLOG_ORIGIN_HANDLE_NEXT_BLOCK, "llog_origin_handle_next_block" },
         { LLOG_ORIGIN_HANDLE_READ_HEADER,"llog_origin_handle_read_header" },
         { LLOG_ORIGIN_HANDLE_WRITE_REC,  "llog_origin_handle_write_rec" },
         { LLOG_ORIGIN_HANDLE_NEXT_BLOCK, "llog_origin_handle_next_block" },
         { LLOG_ORIGIN_HANDLE_READ_HEADER,"llog_origin_handle_read_header" },
         { LLOG_ORIGIN_HANDLE_WRITE_REC,  "llog_origin_handle_write_rec" },
index f6bcf83..5fa5155 100644 (file)
@@ -31,4 +31,4 @@
 #
 
 MOSTLYCLEANFILES := @MOSTLYCLEANFILES@
 #
 
 MOSTLYCLEANFILES := @MOSTLYCLEANFILES@
-EXTRA_DIST = tgt_main.c tgt_lastrcvd.c tgt_internal.h
+EXTRA_DIST = tgt_main.c tgt_lastrcvd.c tgt_handler.c tgt_internal.h
diff --git a/lustre/target/tgt_handler.c b/lustre/target/tgt_handler.c
new file mode 100644 (file)
index 0000000..8151fda
--- /dev/null
@@ -0,0 +1,772 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2011, 2012, Intel Corporation.
+ */
+/*
+ * lustre/target/tgt_handler.c
+ *
+ * Lustre Unified Target request handler code
+ *
+ * Author: Brian Behlendorf <behlendorf1@llnl.gov>
+ * Author: Mikhail Pershin <mike.pershin@intel.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_CLASS
+
+#include <obd.h>
+#include <obd_class.h>
+
+#include "tgt_internal.h"
+
+char *tgt_name(struct lu_target *tgt)
+{
+       LASSERT(tgt->lut_obd != NULL);
+       return tgt->lut_obd->obd_name;
+}
+EXPORT_SYMBOL(tgt_name);
+
+static int tgt_unpack_req_pack_rep(struct tgt_session_info *tsi, __u32 flags)
+{
+       struct req_capsule      *pill = tsi->tsi_pill;
+       const struct mdt_body   *body = NULL;
+       int                      rc = 0;
+
+       ENTRY;
+
+       if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) {
+               body = req_capsule_client_get(pill, &RMF_MDT_BODY);
+               if (body == NULL)
+                       RETURN(-EFAULT);
+       }
+
+       if (flags & HABEO_REFERO) {
+               /* Pack reply */
+               if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
+                       req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
+                                            body ? body->eadatasize : 0);
+               if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
+                       req_capsule_set_size(pill, &RMF_LOGCOOKIES,
+                                            RCL_SERVER, 0);
+
+               rc = req_capsule_server_pack(pill);
+       }
+       RETURN(rc);
+}
+
+/*
+ * Invoke handler for this request opc. Also do necessary preprocessing
+ * (according to handler ->th_flags), and post-processing (setting of
+ * ->last_{xid,committed}).
+ */
+static int tgt_handle_request0(struct tgt_session_info *tsi,
+                              struct tgt_handler *h,
+                              struct ptlrpc_request *req)
+{
+       int      serious = 0;
+       int      rc;
+       __u32    flags;
+
+       ENTRY;
+
+       LASSERT(h->th_act != NULL);
+       LASSERT(h->th_opc == lustre_msg_get_opc(req->rq_reqmsg));
+       LASSERT(current->journal_info == NULL);
+
+       rc = 0;
+       flags = h->th_flags;
+       LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO),
+                    h->th_fmt != NULL));
+       if (h->th_fmt != NULL) {
+               req_capsule_set(tsi->tsi_pill, h->th_fmt);
+               rc = tgt_unpack_req_pack_rep(tsi, flags);
+       }
+
+       if (rc == 0 && flags & MUTABOR &&
+           tgt_conn_flags(tsi) & OBD_CONNECT_RDONLY)
+               rc = -EROFS;
+
+       if (rc == 0 && flags & HABEO_CLAVIS) {
+               struct ldlm_request *dlm_req;
+
+               LASSERT(h->th_fmt != NULL);
+
+               dlm_req = req_capsule_client_get(tsi->tsi_pill, &RMF_DLM_REQ);
+               if (dlm_req != NULL) {
+                       if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
+                                    LDLM_IBITS &&
+                                    dlm_req->lock_desc.l_policy_data.\
+                                    l_inodebits.bits == 0)) {
+                               /*
+                                * Lock without inodebits makes no sense and
+                                * will oops later in ldlm. If client miss to
+                                * set such bits, do not trigger ASSERTION.
+                                *
+                                * For liblustre flock case, it maybe zero.
+                                */
+                               rc = -EPROTO;
+                       } else {
+                               tsi->tsi_dlm_req = dlm_req;
+                       }
+               } else {
+                       rc = -EFAULT;
+               }
+       }
+
+       if (likely(rc == 0)) {
+               /*
+                * Process request, there can be two types of rc:
+                * 1) errors with msg unpack/pack, other failures outside the
+                * operation itself. This is counted as serious errors;
+                * 2) errors during fs operation, should be placed in rq_status
+                * only
+                */
+               rc = h->th_act(tsi);
+               if (!is_serious(rc) &&
+                   !req->rq_no_reply && req->rq_reply_state == NULL) {
+                       DEBUG_REQ(D_ERROR, req, "%s \"handler\" %s did not "
+                                 "pack reply and returned 0 error\n",
+                                 tgt_name(tsi->tsi_tgt), h->th_name);
+                       LBUG();
+               }
+               serious = is_serious(rc);
+               rc = clear_serious(rc);
+       } else {
+               serious = 1;
+       }
+
+       req->rq_status = rc;
+
+       /*
+        * ELDLM_* codes which > 0 should be in rq_status only as well as
+        * all non-serious errors.
+        */
+       if (rc > 0 || !serious)
+               rc = 0;
+
+       LASSERT(current->journal_info == NULL);
+
+       /*
+        * If we're DISCONNECTing, the export_data is already freed
+        *
+        * WAS if (likely(... && h->mh_opc != MDS_DISCONNECT))
+        */
+       if (likely(rc == 0 && req->rq_export))
+               target_committed_to_req(req);
+
+       target_send_reply(req, rc, tsi->tsi_reply_fail_id);
+       RETURN(0);
+}
+
+static int tgt_filter_recovery_request(struct ptlrpc_request *req,
+                                      struct obd_device *obd, int *process)
+{
+       switch (lustre_msg_get_opc(req->rq_reqmsg)) {
+       case MDS_DISCONNECT:
+       case OST_DISCONNECT:
+               *process = 1;
+               RETURN(0);
+       case MDS_CLOSE:
+       case MDS_DONE_WRITING:
+       case MDS_SYNC: /* used in unmounting */
+       case OBD_PING:
+       case MDS_REINT:
+       case SEQ_QUERY:
+       case FLD_QUERY:
+       case LDLM_ENQUEUE:
+               *process = target_queue_recovery_request(req, obd);
+               RETURN(0);
+
+       default:
+               DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
+               *process = -EAGAIN;
+               RETURN(0);
+       }
+}
+
+/*
+ * Handle recovery. Return:
+ *        +1: continue request processing;
+ *       -ve: abort immediately with the given error code;
+ *         0: send reply with error code in req->rq_status;
+ */
+int tgt_handle_recovery(struct ptlrpc_request *req, int reply_fail_id)
+{
+       ENTRY;
+
+       switch (lustre_msg_get_opc(req->rq_reqmsg)) {
+       case SEC_CTX_INIT:
+       case SEC_CTX_INIT_CONT:
+       case SEC_CTX_FINI:
+               RETURN(+1);
+       }
+
+       if (unlikely(!class_connected_export(req->rq_export))) {
+               CERROR("%s: operation %d on unconnected export from %s\n",
+                      req->rq_export != NULL ?
+                      req->rq_export->exp_obd->obd_name : "?",
+                      lustre_msg_get_opc(req->rq_reqmsg),
+                      libcfs_id2str(req->rq_peer));
+               req->rq_status = -ENOTCONN;
+               target_send_reply(req, -ENOTCONN, reply_fail_id);
+               RETURN(0);
+       }
+
+       if (!req->rq_export->exp_obd->obd_replayable)
+               RETURN(+1);
+
+       /* sanity check: if the xid matches, the request must be marked as a
+        * resent or replayed */
+       if (req_xid_is_last(req)) {
+               if (!(lustre_msg_get_flags(req->rq_reqmsg) &
+                     (MSG_RESENT | MSG_REPLAY))) {
+                       DEBUG_REQ(D_WARNING, req, "rq_xid "LPU64" matches "
+                                 "last_xid, expected REPLAY or RESENT flag "
+                                 "(%x)", req->rq_xid,
+                                 lustre_msg_get_flags(req->rq_reqmsg));
+                       req->rq_status = -ENOTCONN;
+                       RETURN(-ENOTCONN);
+               }
+       }
+       /* else: note the opposite is not always true; a RESENT req after a
+        * failover will usually not match the last_xid, since it was likely
+        * never committed. A REPLAYed request will almost never match the
+        * last xid, however it could for a committed, but still retained,
+        * open. */
+
+       /* Check for aborted recovery... */
+       if (unlikely(req->rq_export->exp_obd->obd_recovering)) {
+               int rc;
+               int should_process;
+
+               DEBUG_REQ(D_INFO, req, "Got new replay");
+               rc = tgt_filter_recovery_request(req, req->rq_export->exp_obd,
+                                                &should_process);
+               if (rc != 0 || !should_process)
+                       RETURN(rc);
+               else if (should_process < 0) {
+                       req->rq_status = should_process;
+                       rc = ptlrpc_error(req);
+                       RETURN(rc);
+               }
+       }
+       RETURN(+1);
+}
+
+int tgt_request_handle(struct ptlrpc_request *req)
+{
+       struct tgt_session_info *tsi = tgt_ses_info(req->rq_svc_thread->t_env);
+       struct lustre_msg       *msg = req->rq_reqmsg;
+       struct tgt_handler      *h;
+       struct tgt_opc_slice    *s;
+       struct lu_target        *tgt;
+       int                      request_fail_id = 0;
+       __u32                    opc = lustre_msg_get_opc(msg);
+       int                      rc;
+
+       ENTRY;
+
+       req_capsule_init(&req->rq_pill, req, RCL_SERVER);
+       tsi->tsi_pill = &req->rq_pill;
+       tsi->tsi_env = req->rq_svc_thread->t_env;
+       tsi->tsi_dlm_req = NULL;
+
+       /* if request has export then get handlers slice from corresponding
+        * target, otherwise that should be connect operation */
+       if (opc == MDS_CONNECT || opc == OST_CONNECT ||
+           opc == MGS_CONNECT) {
+               req_capsule_set(&req->rq_pill, &RQF_CONNECT);
+               rc = target_handle_connect(req);
+               if (rc != 0) {
+                       rc = ptlrpc_error(req);
+                       GOTO(out, rc);
+               }
+       }
+
+       /* this should be assertion actually, but keep it reporting error
+        * for unified target development time */
+       if (req->rq_export == NULL) {
+               CERROR("Request with no export from %s, opcode %u\n",
+                      libcfs_nid2str(req->rq_peer.nid), opc);
+               req->rq_status = -EFAULT;
+               rc = ptlrpc_error(req);
+               GOTO(out, rc);
+       }
+
+
+       tsi->tsi_tgt = tgt = class_exp2tgt(req->rq_export);
+       tsi->tsi_exp = req->rq_export;
+
+       request_fail_id = tgt->lut_request_fail_id;
+       tsi->tsi_reply_fail_id = tgt->lut_reply_fail_id;
+
+       for (s = tgt->lut_slice; s->tos_hs != NULL; s++)
+               if (s->tos_opc_start <= opc && opc < s->tos_opc_end)
+                       break;
+
+       /* opcode was not found in slice */
+       if (unlikely(s->tos_hs == NULL)) {
+               CERROR("%s: no handlers for opcode 0x%x\n", tgt_name(tgt), opc);
+               req->rq_status = -ENOTSUPP;
+               rc = ptlrpc_error(req);
+               GOTO(out, rc);
+       }
+
+       if (CFS_FAIL_CHECK_ORSET(request_fail_id, CFS_FAIL_ONCE))
+               GOTO(out, rc = 0);
+
+       LASSERT(current->journal_info == NULL);
+
+       LASSERT(opc >= s->tos_opc_start && opc < s->tos_opc_end);
+       h = s->tos_hs + (opc - s->tos_opc_start);
+       if (unlikely(h->th_opc == 0)) {
+               CERROR("%s: unsupported opcode 0x%x\n", tgt_name(tgt), opc);
+               req->rq_status = -ENOTSUPP;
+               rc = ptlrpc_error(req);
+               GOTO(out, rc);
+       }
+
+       rc = lustre_msg_check_version(msg, h->th_version);
+       if (unlikely(rc)) {
+               DEBUG_REQ(D_ERROR, req, "%s: drop mal-formed request, version"
+                         " %08x, expecting %08x\n", tgt_name(tgt),
+                         lustre_msg_get_version(msg), h->th_version);
+               req->rq_status = -EINVAL;
+               rc = ptlrpc_error(req);
+               GOTO(out, rc);
+       }
+
+       rc = tgt_handle_recovery(req, tsi->tsi_reply_fail_id);
+       if (likely(rc == 1)) {
+               LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
+                        h->th_opc, opc);
+               rc = tgt_handle_request0(tsi, h, req);
+               if (rc)
+                       GOTO(out, rc);
+       }
+       EXIT;
+out:
+       req_capsule_fini(tsi->tsi_pill);
+       tsi->tsi_pill = NULL;
+       return rc;
+}
+EXPORT_SYMBOL(tgt_request_handle);
+
+void tgt_counter_incr(struct obd_export *exp, int opcode)
+{
+       lprocfs_counter_incr(exp->exp_obd->obd_stats, opcode);
+       if (exp->exp_nid_stats && exp->exp_nid_stats->nid_stats != NULL)
+               lprocfs_counter_incr(exp->exp_nid_stats->nid_stats, opcode);
+}
+EXPORT_SYMBOL(tgt_counter_incr);
+
+/*
+ * Unified target generic handlers.
+ */
+
+/*
+ * Security functions
+ */
+static inline void tgt_init_sec_none(struct obd_connect_data *reply)
+{
+       reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
+                                     OBD_CONNECT_RMT_CLIENT_FORCE |
+                                     OBD_CONNECT_MDS_CAPA |
+                                     OBD_CONNECT_OSS_CAPA);
+}
+
+static int tgt_init_sec_level(struct ptlrpc_request *req)
+{
+       struct lu_target        *tgt = class_exp2tgt(req->rq_export);
+       char                    *client = libcfs_nid2str(req->rq_peer.nid);
+       struct obd_connect_data *data, *reply;
+       int                      rc = 0;
+       bool                     remote;
+
+       ENTRY;
+
+       data = req_capsule_client_get(&req->rq_pill, &RMF_CONNECT_DATA);
+       reply = req_capsule_server_get(&req->rq_pill, &RMF_CONNECT_DATA);
+       if (data == NULL || reply == NULL)
+               RETURN(-EFAULT);
+
+       /* connection from MDT is always trusted */
+       if (req->rq_auth_usr_mdt) {
+               tgt_init_sec_none(reply);
+               RETURN(0);
+       }
+
+       /* no GSS support case */
+       if (!req->rq_auth_gss) {
+               if (tgt->lut_sec_level > LUSTRE_SEC_NONE) {
+                       CWARN("client %s -> target %s does not use GSS, "
+                             "can not run under security level %d.\n",
+                             client, tgt_name(tgt), tgt->lut_sec_level);
+                       RETURN(-EACCES);
+               } else {
+                       tgt_init_sec_none(reply);
+                       RETURN(0);
+               }
+       }
+
+       /* old version case */
+       if (unlikely(!(data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT) ||
+                    !(data->ocd_connect_flags & OBD_CONNECT_MDS_CAPA) ||
+                    !(data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA))) {
+               if (tgt->lut_sec_level > LUSTRE_SEC_NONE) {
+                       CWARN("client %s -> target %s uses old version, "
+                             "can not run under security level %d.\n",
+                             client, tgt_name(tgt), tgt->lut_sec_level);
+                       RETURN(-EACCES);
+               } else {
+                       CWARN("client %s -> target %s uses old version, "
+                             "run under security level %d.\n",
+                             client, tgt_name(tgt), tgt->lut_sec_level);
+                       tgt_init_sec_none(reply);
+                       RETURN(0);
+               }
+       }
+
+       remote = data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT_FORCE;
+       if (remote) {
+               if (!req->rq_auth_remote)
+                       CDEBUG(D_SEC, "client (local realm) %s -> target %s "
+                              "asked to be remote.\n", client, tgt_name(tgt));
+       } else if (req->rq_auth_remote) {
+               remote = true;
+               CDEBUG(D_SEC, "client (remote realm) %s -> target %s is set "
+                      "as remote by default.\n", client, tgt_name(tgt));
+       }
+
+       if (remote) {
+               if (!tgt->lut_oss_capa) {
+                       CDEBUG(D_SEC,
+                              "client %s -> target %s is set as remote,"
+                              " but OSS capabilities are not enabled: %d.\n",
+                              client, tgt_name(tgt), tgt->lut_oss_capa);
+                       RETURN(-EACCES);
+               }
+       } else {
+               if (req->rq_auth_uid == INVALID_UID) {
+                       CDEBUG(D_SEC, "client %s -> target %s: user is not "
+                              "authenticated!\n", client, tgt_name(tgt));
+                       RETURN(-EACCES);
+               }
+       }
+
+
+       switch (tgt->lut_sec_level) {
+       case LUSTRE_SEC_NONE:
+               if (remote) {
+                       CDEBUG(D_SEC,
+                              "client %s -> target %s is set as remote, "
+                              "can not run under security level %d.\n",
+                              client, tgt_name(tgt), tgt->lut_sec_level);
+                       RETURN(-EACCES);
+               }
+               tgt_init_sec_none(reply);
+               break;
+       case LUSTRE_SEC_REMOTE:
+               if (!remote)
+                       tgt_init_sec_none(reply);
+               break;
+       case LUSTRE_SEC_ALL:
+               if (remote)
+                       break;
+               reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
+                                             OBD_CONNECT_RMT_CLIENT_FORCE);
+               if (!tgt->lut_oss_capa)
+                       reply->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
+               if (!tgt->lut_mds_capa)
+                       reply->ocd_connect_flags &= ~OBD_CONNECT_MDS_CAPA;
+               break;
+       default:
+               RETURN(-EINVAL);
+       }
+
+       RETURN(rc);
+}
+
+int tgt_connect_check_sptlrpc(struct ptlrpc_request *req, struct obd_export *exp)
+{
+       struct lu_target        *tgt = class_exp2tgt(exp);
+       struct sptlrpc_flavor    flvr;
+       int                      rc = 0;
+
+       LASSERT(tgt);
+       LASSERT(tgt->lut_obd);
+       LASSERT(tgt->lut_slice);
+
+       /* always allow ECHO client */
+       if (unlikely(strcmp(exp->exp_obd->obd_type->typ_name,
+                           LUSTRE_ECHO_NAME) == 0)) {
+               exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_ANY;
+               return 0;
+       }
+
+       if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
+               read_lock(&tgt->lut_sptlrpc_lock);
+               sptlrpc_target_choose_flavor(&tgt->lut_sptlrpc_rset,
+                                            req->rq_sp_from,
+                                            req->rq_peer.nid,
+                                            &flvr);
+               read_unlock(&tgt->lut_sptlrpc_lock);
+
+               spin_lock(&exp->exp_lock);
+               exp->exp_sp_peer = req->rq_sp_from;
+               exp->exp_flvr = flvr;
+               if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
+                   exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
+                       CERROR("%s: unauthorized rpc flavor %x from %s, "
+                              "expect %x\n", tgt_name(tgt),
+                              req->rq_flvr.sf_rpc,
+                              libcfs_nid2str(req->rq_peer.nid),
+                              exp->exp_flvr.sf_rpc);
+                       rc = -EACCES;
+               }
+               spin_unlock(&exp->exp_lock);
+       } else {
+               if (exp->exp_sp_peer != req->rq_sp_from) {
+                       CERROR("%s: RPC source %s doesn't match %s\n",
+                              tgt_name(tgt),
+                              sptlrpc_part2name(req->rq_sp_from),
+                              sptlrpc_part2name(exp->exp_sp_peer));
+                       rc = -EACCES;
+               } else {
+                       rc = sptlrpc_target_export_check(exp, req);
+               }
+       }
+
+       return rc;
+}
+
+int tgt_connect(struct tgt_session_info *tsi)
+{
+       struct ptlrpc_request   *req = tgt_ses_req(tsi);
+       struct obd_connect_data *reply;
+       int                      rc;
+
+       ENTRY;
+
+       rc = tgt_init_sec_level(req);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       /* XXX: better to call this check right after getting new export but
+        * before last_rcvd slot allocation to avoid server load upon insecure
+        * connects. This is to be fixed after unifiyng all targets.
+        */
+       rc = tgt_connect_check_sptlrpc(req, tsi->tsi_exp);
+       if (rc)
+               GOTO(out, rc);
+
+       /* To avoid exposing partially initialized connection flags, changes up
+        * to this point have been staged in reply->ocd_connect_flags. Now that
+        * connection handling has completed successfully, atomically update
+        * the connect flags in the shared export data structure. LU-1623 */
+       reply = req_capsule_server_get(tsi->tsi_pill, &RMF_CONNECT_DATA);
+       spin_lock(&tsi->tsi_exp->exp_lock);
+       *exp_connect_flags_ptr(tsi->tsi_exp) = reply->ocd_connect_flags;
+       spin_unlock(&tsi->tsi_exp->exp_lock);
+
+       RETURN(0);
+out:
+       obd_disconnect(class_export_get(tsi->tsi_exp));
+       return rc;
+}
+EXPORT_SYMBOL(tgt_connect);
+
+int tgt_disconnect(struct tgt_session_info *tsi)
+{
+       int rc;
+
+       ENTRY;
+
+       rc = target_handle_disconnect(tgt_ses_req(tsi));
+       if (rc)
+               RETURN(err_serious(rc));
+
+       RETURN(rc);
+}
+EXPORT_SYMBOL(tgt_disconnect);
+
+/*
+ * Unified target OBD handlers
+ */
+int tgt_obd_ping(struct tgt_session_info *tsi)
+{
+       int rc;
+
+       ENTRY;
+
+       rc = target_handle_ping(tgt_ses_req(tsi));
+       if (rc)
+               RETURN(err_serious(rc));
+
+       RETURN(rc);
+}
+EXPORT_SYMBOL(tgt_obd_ping);
+
+int tgt_obd_log_cancel(struct tgt_session_info *tsi)
+{
+       return err_serious(-EOPNOTSUPP);
+}
+EXPORT_SYMBOL(tgt_obd_log_cancel);
+
+int tgt_obd_qc_callback(struct tgt_session_info *tsi)
+{
+       return err_serious(-EOPNOTSUPP);
+}
+EXPORT_SYMBOL(tgt_obd_qc_callback);
+
+/*
+ * Unified target DLM handlers.
+ */
+struct ldlm_callback_suite tgt_dlm_cbs = {
+       .lcs_completion = ldlm_server_completion_ast,
+       .lcs_blocking = ldlm_server_blocking_ast,
+};
+
+int tgt_enqueue(struct tgt_session_info *tsi)
+{
+       struct ptlrpc_request *req = tgt_ses_req(tsi);
+       int rc;
+
+       ENTRY;
+       /*
+        * tsi->tsi_dlm_req was already swapped and (if necessary) converted,
+        * tsi->tsi_dlm_cbs was set by the *_req_handle() function.
+        */
+       LASSERT(tsi->tsi_dlm_req != NULL);
+
+       rc = ldlm_handle_enqueue0(tsi->tsi_exp->exp_obd->obd_namespace, req,
+                                 tsi->tsi_dlm_req, &tgt_dlm_cbs);
+       if (rc)
+               RETURN(err_serious(rc));
+
+       RETURN(req->rq_status);
+}
+EXPORT_SYMBOL(tgt_enqueue);
+
+int tgt_convert(struct tgt_session_info *tsi)
+{
+       struct ptlrpc_request *req = tgt_ses_req(tsi);
+       int rc;
+
+       ENTRY;
+       LASSERT(tsi->tsi_dlm_req);
+       rc = ldlm_handle_convert0(req, tsi->tsi_dlm_req);
+       if (rc)
+               RETURN(err_serious(rc));
+
+       RETURN(req->rq_status);
+}
+EXPORT_SYMBOL(tgt_convert);
+
+int tgt_bl_callback(struct tgt_session_info *tsi)
+{
+       return err_serious(-EOPNOTSUPP);
+}
+EXPORT_SYMBOL(tgt_bl_callback);
+
+int tgt_cp_callback(struct tgt_session_info *tsi)
+{
+       return err_serious(-EOPNOTSUPP);
+}
+EXPORT_SYMBOL(tgt_cp_callback);
+
+/*
+ * Unified target LLOG handlers.
+ */
+int tgt_llog_open(struct tgt_session_info *tsi)
+{
+       int rc;
+
+       ENTRY;
+
+       rc = llog_origin_handle_open(tgt_ses_req(tsi));
+
+       RETURN(rc);
+}
+EXPORT_SYMBOL(tgt_llog_open);
+
+int tgt_llog_close(struct tgt_session_info *tsi)
+{
+       int rc;
+
+       ENTRY;
+
+       rc = llog_origin_handle_close(tgt_ses_req(tsi));
+
+       RETURN(rc);
+}
+EXPORT_SYMBOL(tgt_llog_close);
+
+
+int tgt_llog_destroy(struct tgt_session_info *tsi)
+{
+       int rc;
+
+       ENTRY;
+
+       rc = llog_origin_handle_destroy(tgt_ses_req(tsi));
+
+       RETURN(rc);
+}
+EXPORT_SYMBOL(tgt_llog_destroy);
+
+int tgt_llog_read_header(struct tgt_session_info *tsi)
+{
+       int rc;
+
+       ENTRY;
+
+       rc = llog_origin_handle_read_header(tgt_ses_req(tsi));
+
+       RETURN(rc);
+}
+EXPORT_SYMBOL(tgt_llog_read_header);
+
+int tgt_llog_next_block(struct tgt_session_info *tsi)
+{
+       int rc;
+
+       ENTRY;
+
+       rc = llog_origin_handle_next_block(tgt_ses_req(tsi));
+
+       RETURN(rc);
+}
+EXPORT_SYMBOL(tgt_llog_next_block);
+
+int tgt_llog_prev_block(struct tgt_session_info *tsi)
+{
+       int rc;
+
+       ENTRY;
+
+       rc = llog_origin_handle_prev_block(tgt_ses_req(tsi));
+
+       RETURN(rc);
+}
+EXPORT_SYMBOL(tgt_llog_prev_block);
index 3623bde..02cc499 100644 (file)
@@ -66,4 +66,18 @@ static inline struct tgt_thread_info *tgt_th_info(const struct lu_env *env)
        return tti;
 }
 
        return tti;
 }
 
+#define MGS_SERVICE_WATCHDOG_FACTOR      (2)
+
+int tgt_request_handle(struct ptlrpc_request *req);
+
+/* check if request's xid is equal to last one or not*/
+static inline int req_xid_is_last(struct ptlrpc_request *req)
+{
+       struct lsd_client_data *lcd = req->rq_export->exp_target_data.ted_lcd;
+
+       LASSERT(lcd != NULL);
+       return (req->rq_xid == lcd->lcd_last_xid ||
+               req->rq_xid == lcd->lcd_last_close_xid);
+}
+
 #endif /* _TG_INTERNAL_H */
 #endif /* _TG_INTERNAL_H */
index 876cef2..f7f9b6e 100644 (file)
 #define DEBUG_SUBSYSTEM S_CLASS
 
 #include <obd.h>
 #define DEBUG_SUBSYSTEM S_CLASS
 
 #include <obd.h>
-
 #include "tgt_internal.h"
 
 int tgt_init(const struct lu_env *env, struct lu_target *lut,
 #include "tgt_internal.h"
 
 int tgt_init(const struct lu_env *env, struct lu_target *lut,
-            struct obd_device *obd, struct dt_device *dt)
+            struct obd_device *obd, struct dt_device *dt,
+            struct tgt_opc_slice *slice, int request_fail_id,
+            int reply_fail_id)
 {
 {
-       struct dt_object_format dof;
-       struct lu_attr          attr;
-       struct lu_fid           fid;
-       struct dt_object       *o;
-       int                     rc = 0;
+       struct dt_object_format  dof;
+       struct lu_attr           attr;
+       struct lu_fid            fid;
+       struct dt_object        *o;
+       int                      rc = 0;
 
        ENTRY;
 
 
        ENTRY;
 
@@ -53,9 +54,25 @@ int tgt_init(const struct lu_env *env, struct lu_target *lut,
        lut->lut_obd = obd;
        lut->lut_bottom = dt;
        lut->lut_last_rcvd = NULL;
        lut->lut_obd = obd;
        lut->lut_bottom = dt;
        lut->lut_last_rcvd = NULL;
+       lut->lut_client_bitmap = NULL;
        obd->u.obt.obt_lut = lut;
        obd->u.obt.obt_magic = OBT_MAGIC;
 
        obd->u.obt.obt_lut = lut;
        obd->u.obt.obt_magic = OBT_MAGIC;
 
+       /* set request handler slice and parameters */
+       lut->lut_slice = slice;
+       lut->lut_reply_fail_id = reply_fail_id;
+       lut->lut_request_fail_id = request_fail_id;
+
+       /* sptlrcp variables init */
+       rwlock_init(&lut->lut_sptlrpc_lock);
+       sptlrpc_rule_set_init(&lut->lut_sptlrpc_rset);
+       lut->lut_mds_capa = 1;
+       lut->lut_oss_capa = 1;
+
+       /* last_rcvd initialization is needed by replayable targets only */
+       if (!obd->obd_replayable)
+               RETURN(0);
+
        spin_lock_init(&lut->lut_translock);
 
        OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
        spin_lock_init(&lut->lut_translock);
 
        OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
@@ -87,6 +104,8 @@ void tgt_fini(const struct lu_env *env, struct lu_target *lut)
 {
        ENTRY;
 
 {
        ENTRY;
 
+       sptlrpc_rule_set_free(&lut->lut_sptlrpc_rset);
+
        if (lut->lut_client_bitmap) {
                OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
                lut->lut_client_bitmap = NULL;
        if (lut->lut_client_bitmap) {
                OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
                lut->lut_client_bitmap = NULL;
@@ -106,17 +125,41 @@ LU_KEY_INIT_FINI(tgt, struct tgt_thread_info);
 LU_CONTEXT_KEY_DEFINE(tgt, LCT_MD_THREAD | LCT_DT_THREAD);
 EXPORT_SYMBOL(tgt_thread_key);
 
 LU_CONTEXT_KEY_DEFINE(tgt, LCT_MD_THREAD | LCT_DT_THREAD);
 EXPORT_SYMBOL(tgt_thread_key);
 
-LU_KEY_INIT_GENERIC(tg);
+LU_KEY_INIT_GENERIC(tgt);
+
+/* context key constructor/destructor: tgt_ses_key_init, tgt_ses_key_fini */
+LU_KEY_INIT_FINI(tgt_ses, struct tgt_session_info);
+
+/* context key: tgt_session_key */
+struct lu_context_key tgt_session_key = {
+       .lct_tags = LCT_SESSION,
+       .lct_init = tgt_ses_key_init,
+       .lct_fini = tgt_ses_key_fini,
+};
+EXPORT_SYMBOL(tgt_session_key);
+
+LU_KEY_INIT_GENERIC(tgt_ses);
+
+struct lprocfs_vars lprocfs_srv_module_vars[] = {
+       { 0 },
+};
 
 int tgt_mod_init(void)
 {
 
 int tgt_mod_init(void)
 {
-       tg_key_init_generic(&tgt_thread_key, NULL);
+       ENTRY;
+
+       tgt_key_init_generic(&tgt_thread_key, NULL);
        lu_context_key_register_many(&tgt_thread_key, NULL);
        lu_context_key_register_many(&tgt_thread_key, NULL);
-       return 0;
+
+       tgt_ses_key_init_generic(&tgt_session_key, NULL);
+       lu_context_key_register_many(&tgt_session_key, NULL);
+
+       RETURN(0);
 }
 
 void tgt_mod_exit(void)
 {
        lu_context_key_degister(&tgt_thread_key);
 }
 
 void tgt_mod_exit(void)
 {
        lu_context_key_degister(&tgt_thread_key);
+       lu_context_key_degister(&tgt_session_key);
 }
 
 }