Whamcloud - gitweb
LU-3467 target: move OUT to the unified target code 63/6763/10
authorMikhail Pershin <mike.pershin@intel.com>
Thu, 13 Jun 2013 18:22:10 +0000 (22:22 +0400)
committerOleg Drokin <oleg.drokin@intel.com>
Thu, 1 Aug 2013 01:02:01 +0000 (01:02 +0000)
- Move OUT handler to the unified target code, so it can be
  used by both MDS and OST.
- Make OFD able to handle OUT requests.
- Rename MDS_MDS_PORTAL to the OUT_PORTAL and MDS_OUT_... defines
  just OUT_... since they are independent from MDS now.

Signed-off-by: Mikhail Pershin <mike.pershin@intel.com>
Change-Id: I3d595f18245f1db2619b7d15f94e4b61e882f114
Reviewed-on: http://review.whamcloud.com/6763
Tested-by: Hudson
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
22 files changed:
lustre/include/lu_target.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_net.h
lustre/include/obd_target.h
lustre/ldlm/ldlm_lib.c
lustre/lod/lod_lov.c
lustre/mdc/mdc_request.c
lustre/mdt/Makefile.in
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_mds.c
lustre/mdt/mdt_recovery.c
lustre/mgs/mgs_handler.c
lustre/obdecho/echo_client.c
lustre/ofd/ofd_dev.c
lustre/ost/ost_handler.c
lustre/ptlrpc/Makefile.in
lustre/target/Makefile.am
lustre/target/out_handler.c [moved from lustre/mdt/out_handler.c with 66% similarity]
lustre/target/tgt_handler.c
lustre/target/tgt_internal.h
lustre/target/tgt_lastrcvd.c

index 66e6f76..114073b 100644 (file)
@@ -39,6 +39,7 @@
 
 #include <dt_object.h>
 #include <lustre_export.h>
+#include <lustre_update.h>
 #include <lustre_disk.h>
 
 struct lu_target {
@@ -177,6 +178,12 @@ static inline __u64 tgt_conn_flags(struct tgt_session_info *tsi)
        return exp_connect_flags(tsi->tsi_exp);
 }
 
+static inline int req_is_replay(struct ptlrpc_request *req)
+{
+       LASSERT(req->rq_reqmsg);
+       return !!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
+}
+
 /* target/tgt_handler.c */
 int tgt_request_handle(struct ptlrpc_request *req);
 char *tgt_name(struct lu_target *tgt);
@@ -200,6 +207,12 @@ int tgt_sec_ctx_init(struct tgt_session_info *tsi);
 int tgt_sec_ctx_init_cont(struct tgt_session_info *tsi);
 int tgt_sec_ctx_fini(struct tgt_session_info *tsi);
 
+extern struct tgt_handler tgt_sec_ctx_handlers[];
+extern struct tgt_handler tgt_obd_handlers[];
+extern struct tgt_handler tgt_dlm_handlers[];
+extern struct tgt_handler tgt_llog_handlers[];
+extern struct tgt_handler tgt_out_handlers[];
+
 typedef void (*tgt_cb_t)(struct lu_target *lut, __u64 transno,
                         void *data, int err);
 struct tgt_commit_cb {
@@ -229,8 +242,13 @@ int tgt_client_data_write(const struct lu_env *env, struct lu_target *tg,
 int tgt_server_data_read(const struct lu_env *env, struct lu_target *tg);
 int tgt_server_data_write(const struct lu_env *env, struct lu_target *tg,
                          struct thandle *th);
-int tgt_server_data_update(const struct lu_env *env, struct lu_target *tg, int sync);
-int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg, loff_t off);
+int tgt_server_data_update(const struct lu_env *env, struct lu_target *tg,
+                          int sync);
+int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg,
+                          loff_t off);
+int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
+                        struct dt_object *obj, __u64 opdata,
+                        struct thandle *th, struct ptlrpc_request *req);
 
 enum {
        ESERIOUS = 0x0001000
@@ -268,46 +286,77 @@ static inline int is_serious(int rc)
        .th_version     = version                                       \
 }
 
+/* MDT Request with a format known in advance */
+#define TGT_MDT_HDL(flags, name, fn)                                   \
+       TGT_RPC_HANDLER(MDS_FIRST_OPC, flags, name, fn, &RQF_ ## name,  \
+                       LUSTRE_MDS_VERSION)
+/* Request with a format we do not yet know */
+#define TGT_MDT_HDL_VAR(flags, name, fn)                               \
+       TGT_RPC_HANDLER(MDS_FIRST_OPC, flags, name, fn, NULL,           \
+                       LUSTRE_MDS_VERSION)
+
 /* MGS request with a format known in advance */
 #define TGT_MGS_HDL(flags, name, fn)                                   \
-       TGT_RPC_HANDLER(MGS_FIRST_OPC, flags, name, fn, &RQF_ ## name,\
-                          LUSTRE_MGS_VERSION)
+       TGT_RPC_HANDLER(MGS_FIRST_OPC, flags, name, fn, &RQF_ ## name,  \
+                       LUSTRE_MGS_VERSION)
 #define TGT_MGS_HDL_VAR(flags, name, fn)                               \
-       TGT_RPC_HANDLER(MGS_FIRST_OPC, flags, name, fn, NULL,   \
-                          LUSTRE_MGS_VERSION)
+       TGT_RPC_HANDLER(MGS_FIRST_OPC, flags, name, fn, NULL,           \
+                       LUSTRE_MGS_VERSION)
 
 /*
  * OBD handler macros and generic functions.
  */
 #define TGT_OBD_HDL(flags, name, fn)                                   \
-       TGT_RPC_HANDLER(OBD_FIRST_OPC, flags, name, fn, &RQF_ ## name,\
-                          LUSTRE_OBD_VERSION)
+       TGT_RPC_HANDLER(OBD_FIRST_OPC, flags, name, fn, &RQF_ ## name,  \
+                       LUSTRE_OBD_VERSION)
+#define TGT_OBD_HDL_VAR(flags, name, fn)                               \
+       TGT_RPC_HANDLER(OBD_FIRST_OPC, flags, name, fn, NULL,           \
+                       LUSTRE_OBD_VERSION)
 
 /*
  * DLM handler macros and generic functions.
  */
 #define TGT_DLM_HDL_VAR(flags, name, fn)                               \
-       TGT_RPC_HANDLER(LDLM_FIRST_OPC, flags, name, fn, NULL,  \
-                          LUSTRE_DLM_VERSION)
+       TGT_RPC_HANDLER(LDLM_FIRST_OPC, flags, name, fn, NULL,          \
+                       LUSTRE_DLM_VERSION)
 #define TGT_DLM_HDL(flags, name, fn)                                   \
-       TGT_RPC_HANDLER(LDLM_FIRST_OPC, flags, name, fn, &RQF_ ## name,\
-                          LUSTRE_DLM_VERSION)
+       TGT_RPC_HANDLER(LDLM_FIRST_OPC, flags, name, fn, &RQF_ ## name, \
+                       LUSTRE_DLM_VERSION)
 
 /*
  * LLOG handler macros and generic functions.
  */
 #define TGT_LLOG_HDL_VAR(flags, name, fn)                              \
-       TGT_RPC_HANDLER(LLOG_FIRST_OPC, flags, name, fn, NULL,  \
-                          LUSTRE_LOG_VERSION)
+       TGT_RPC_HANDLER(LLOG_FIRST_OPC, flags, name, fn, NULL,          \
+                       LUSTRE_LOG_VERSION)
 #define TGT_LLOG_HDL(flags, name, fn)                                  \
-       TGT_RPC_HANDLER(LLOG_FIRST_OPC, flags, name, fn, &RQF_ ## name,\
-                          LUSTRE_LOG_VERSION)
+       TGT_RPC_HANDLER(LLOG_FIRST_OPC, flags, name, fn, &RQF_ ## name, \
+                       LUSTRE_LOG_VERSION)
 
 /*
  * Sec context handler macros and generic functions.
  */
 #define TGT_SEC_HDL_VAR(flags, name, fn)                               \
-       TGT_RPC_HANDLER(SEC_FIRST_OPC, flags, name, fn, NULL,   \
-                          LUSTRE_OBD_VERSION)
+       TGT_RPC_HANDLER(SEC_FIRST_OPC, flags, name, fn, NULL,           \
+                       LUSTRE_OBD_VERSION)
+
+#define TGT_QUOTA_HDL(flags, name, fn)                                 \
+       TGT_RPC_HANDLER(QUOTA_DQACQ, flags, name, fn, &RQF_ ## name,    \
+                       LUSTRE_MDS_VERSION)
+
+/* Sequence service handlers */
+#define TGT_SEQ_HDL(flags, name, fn)                                   \
+       TGT_RPC_HANDLER(SEQ_QUERY, flags, name, fn, &RQF_ ## name,      \
+                       LUSTRE_MDS_VERSION)
+
+/* FID Location Database handlers */
+#define TGT_FLD_HDL(flags, name, fn)                                   \
+       TGT_RPC_HANDLER(FLD_QUERY, flags, name, fn, &RQF_ ## name,      \
+                       LUSTRE_MDS_VERSION)
+
+/* Request with a format known in advance */
+#define TGT_UPDATE_HDL(flags, name, fn)                                        \
+       TGT_RPC_HANDLER(UPDATE_OBJ, flags, name, fn, &RQF_ ## name,     \
+                       LUSTRE_MDS_VERSION)
 
 #endif /* __LUSTRE_LU_TARGET_H */
index 593aded..1114c00 100644 (file)
 //#define PTLBD_BULK_PORTAL              21
 #define MDS_SETATTR_PORTAL             22
 #define MDS_READPAGE_PORTAL            23
-#define MDS_MDS_PORTAL                 24
-
+#define OUT_PORTAL                     24
 #define MGC_REPLY_PORTAL               25
 #define MGS_REQUEST_PORTAL             26
 #define MGS_REPLY_PORTAL               27
index d5a9731..e6a8c4c 100644 (file)
  * include linkea (4K maxim), together with other updates, we set it to 9K:
  * lustre_msg + ptlrpc_body + UPDATE_BUF_SIZE (8K)
  */
-#define MDS_OUT_MAXREQSIZE     (9 * 1024)
-#define MDS_OUT_MAXREPSIZE     MDS_MAXREPSIZE
+#define OUT_MAXREQSIZE (9 * 1024)
+#define OUT_MAXREPSIZE MDS_MAXREPSIZE
 
 /** MDS_BUFSIZE = max_reqsize (w/o LOV EA) + max sptlrpc payload size */
 #define MDS_BUFSIZE            max(MDS_MAXREQSIZE + SPTLRPC_MAX_PAYLOAD, \
                                    160 * 1024)
 
 /**
- * MDS_OUT_BUFSIZE = max_out_reqsize + max sptlrpc payload (~1K) which is
+ * OUT_BUFSIZE = max_out_reqsize + max sptlrpc payload (~1K) which is
  * about 10K, for the same reason as MDS_REG_BUFSIZE, we also give some
  * extra bytes to each request buffer to improve buffer utilization rate.
   */
-#define MDS_OUT_BUFSIZE                max(MDS_OUT_MAXREQSIZE + SPTLRPC_MAX_PAYLOAD, \
+#define OUT_BUFSIZE            max(OUT_MAXREQSIZE + SPTLRPC_MAX_PAYLOAD, \
                                    24 * 1024)
 
 /** FLD_MAXREQSIZE == lustre_msg + __u32 padding + ptlrpc_body + opc */
index 0d7ea23..d59282f 100644 (file)
@@ -73,11 +73,12 @@ struct echo_obd {
 };
 
 struct ost_obd {
-       struct ptlrpc_service          *ost_service;
-       struct ptlrpc_service          *ost_create_service;
-       struct ptlrpc_service          *ost_io_service;
-       struct ptlrpc_service          *ost_seq_service;
-       struct mutex                    ost_health_mutex;
+       struct ptlrpc_service   *ost_service;
+       struct ptlrpc_service   *ost_create_service;
+       struct ptlrpc_service   *ost_io_service;
+       struct ptlrpc_service   *ost_seq_service;
+       struct ptlrpc_service   *ost_out_service;
+       struct mutex             ost_health_mutex;
 };
 
 #endif /* __OBD_TARGET_H */
index eafe743..84d7660 100644 (file)
@@ -1062,6 +1062,9 @@ dont_check_exports:
                         rc = obd_connect(req->rq_svc_thread->t_env,
                                          &export, target, &cluuid, data,
                                          client_nid);
+                       if (mds_conn && OBD_FAIL_CHECK(OBD_FAIL_TGT_RCVG_FLAG))
+                               lustre_msg_add_op_flags(req->rq_repmsg,
+                                               MSG_CONNECT_RECOVERING);
                         if (rc == 0)
                                 conn.cookie = export->exp_handle.h_cookie;
                 }
index 03efa26..823f37c 100644 (file)
@@ -247,7 +247,7 @@ int lod_add_device(const struct lu_env *env, struct lod_device *lod,
                spin_lock(&imp->imp_lock);
                imp->imp_server_timeout = 1;
                spin_unlock(&imp->imp_lock);
-               imp->imp_client->cli_request_portal = MDS_MDS_PORTAL;
+               imp->imp_client->cli_request_portal = OUT_PORTAL;
                CDEBUG(D_OTHER, "%s: Set 'mds' portal and timeout\n",
                      obd->obd_name);
                ltd = &lod->lod_mdt_descs;
index aea0265..498a2fd 100644 (file)
@@ -2173,15 +2173,6 @@ int mdc_set_info_async(const struct lu_env *env,
                 sptlrpc_import_flush_my_ctx(imp);
                 RETURN(0);
         }
-        if (KEY_IS(KEY_MDS_CONN)) {
-                /* mds-mds import */
-               spin_lock(&imp->imp_lock);
-               imp->imp_server_timeout = 1;
-               spin_unlock(&imp->imp_lock);
-                imp->imp_client->cli_request_portal = MDS_MDS_PORTAL;
-                CDEBUG(D_OTHER, "%s: timeout / 2\n", exp->exp_obd->obd_name);
-                RETURN(0);
-        }
         if (KEY_IS(KEY_CHANGELOG_CLEAR)) {
                 rc = do_set_info_async(imp, MDS_SET_INFO, LUSTRE_MDS_VERSION,
                                        keylen, key, vallen, val, set);
@@ -2705,27 +2696,6 @@ static int mdc_renew_capa(struct obd_export *exp, struct obd_capa *oc,
         RETURN(0);
 }
 
-static int mdc_connect(const struct lu_env *env,
-                       struct obd_export **exp,
-                       struct obd_device *obd, struct obd_uuid *cluuid,
-                       struct obd_connect_data *data,
-                       void *localdata)
-{
-       struct obd_import *imp = obd->u.cli.cl_import;
-
-       /* mds-mds import features */
-       if (data && (data->ocd_connect_flags & OBD_CONNECT_MDS_MDS)) {
-               spin_lock(&imp->imp_lock);
-               imp->imp_server_timeout = 1;
-               spin_unlock(&imp->imp_lock);
-                imp->imp_client->cli_request_portal = MDS_MDS_PORTAL;
-                CDEBUG(D_OTHER, "%s: Set 'mds' portal and timeout\n",
-                       obd->obd_name);
-        }
-
-        return client_connect_import(env, exp, obd, cluuid, data, NULL);
-}
-
 struct obd_ops mdc_obd_ops = {
         .o_owner            = THIS_MODULE,
         .o_setup            = mdc_setup,
@@ -2733,7 +2703,7 @@ struct obd_ops mdc_obd_ops = {
         .o_cleanup          = mdc_cleanup,
         .o_add_conn         = client_import_add_conn,
         .o_del_conn         = client_import_del_conn,
-        .o_connect          = mdc_connect,
+        .o_connect          = client_connect_import,
         .o_disconnect       = client_disconnect_export,
         .o_iocontrol        = mdc_iocontrol,
         .o_set_info_async   = mdc_set_info_async,
index 8a077bb..a6b55e5 100644 (file)
@@ -1,7 +1,7 @@
 MODULES := mdt
 mdt-objs := mdt_handler.o mdt_lib.o mdt_reint.o mdt_xattr.o mdt_recovery.o
 mdt-objs += mdt_open.o mdt_idmap.o mdt_identity.o mdt_capa.o mdt_lproc.o mdt_fs.o
-mdt-objs += mdt_lvb.o mdt_hsm.o mdt_mds.o out_handler.o
+mdt-objs += mdt_lvb.o mdt_hsm.o mdt_mds.o
 mdt-objs += mdt_hsm_cdt_actions.o
 mdt-objs += mdt_hsm_cdt_requests.o
 mdt-objs += mdt_hsm_cdt_client.o
index 2d09345..4dc54ca 100644 (file)
@@ -1567,6 +1567,9 @@ int mdt_set_info(struct mdt_thread_info *info)
         RETURN(0);
 }
 
+int mdt_connect_check_sptlrpc(struct mdt_device *mdt, struct obd_export *exp,
+                             struct ptlrpc_request *req);
+
 /**
  * Top-level handler for MDT connection requests.
  */
@@ -1577,32 +1580,39 @@ int mdt_connect(struct mdt_thread_info *info)
        struct obd_export *exp;
        struct ptlrpc_request *req = mdt_info_req(info);
 
+       ENTRY;
+
        rc = target_handle_connect(req);
        if (rc != 0)
-               return err_serious(rc);
+               RETURN(err_serious(rc));
 
        LASSERT(req->rq_export != NULL);
-       info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
+       exp = req->rq_export;
+       info->mti_exp = exp;
+       info->mti_mdt = mdt_dev(exp->exp_obd->obd_lu_dev);
        rc = mdt_init_sec_level(info);
-       if (rc != 0) {
-               obd_disconnect(class_export_get(req->rq_export));
-               return rc;
-       }
+       if (rc != 0)
+               GOTO(err, rc);
+
+       rc = mdt_connect_check_sptlrpc(info->mti_mdt, exp, req);
+       if (rc)
+               GOTO(err, rc);
 
        /* To avoid exposing partially initialized connection flags, changes up
         * to this point have been staged in reply->ocd_connect_flags. Now that
         * connection handling has completed successfully, atomically update
         * the connect flags in the shared export data structure. LU-1623 */
        reply = req_capsule_server_get(info->mti_pill, &RMF_CONNECT_DATA);
-       exp = req->rq_export;
        spin_lock(&exp->exp_lock);
        *exp_connect_flags_ptr(exp) = reply->ocd_connect_flags;
        spin_unlock(&exp->exp_lock);
 
        rc = mdt_init_idmap(info);
        if (rc != 0)
-               obd_disconnect(class_export_get(req->rq_export));
-
+               GOTO(err, rc);
+       RETURN(0);
+err:
+       obd_disconnect(class_export_get(req->rq_export));
        return rc;
 }
 
@@ -3077,12 +3087,11 @@ void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
  * uninitialized state, because it's too expensive to zero out whole
  * mdt_thread_info (> 1K) on each request arrival.
  */
-static void mdt_thread_info_init(struct ptlrpc_request *req,
-                                 struct mdt_thread_info *info)
+void mdt_thread_info_init(struct ptlrpc_request *req,
+                         struct mdt_thread_info *info)
 {
         int i;
 
-        req_capsule_init(&req->rq_pill, req, RCL_SERVER);
         info->mti_pill = &req->rq_pill;
 
         /* lock handle */
@@ -3115,11 +3124,10 @@ static void mdt_thread_info_init(struct ptlrpc_request *req,
        info->mti_spec.sp_rm_entry = 0;
 }
 
-static void mdt_thread_info_fini(struct mdt_thread_info *info)
+void mdt_thread_info_fini(struct mdt_thread_info *info)
 {
        int i;
 
-       req_capsule_fini(info->mti_pill);
        if (info->mti_object != NULL) {
                mdt_object_put(info->mti_env, info->mti_object);
                info->mti_object = NULL;
@@ -3128,11 +3136,41 @@ static void mdt_thread_info_fini(struct mdt_thread_info *info)
        for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
                mdt_lock_handle_fini(&info->mti_lh[i]);
        info->mti_env = NULL;
+       info->mti_pill = NULL;
+       info->mti_exp = NULL;
 
        if (unlikely(info->mti_big_buf.lb_buf != NULL))
                lu_buf_free(&info->mti_big_buf);
 }
 
+int mdt_tgt_connect(struct tgt_session_info *tsi)
+{
+       struct ptlrpc_request   *req = tgt_ses_req(tsi);
+       struct mdt_thread_info  *mti;
+       int                      rc;
+
+       ENTRY;
+
+       rc = tgt_connect(tsi);
+       if (rc != 0)
+               RETURN(rc);
+
+       /* XXX: switch mdt_init_idmap() to use tgt_session_info */
+       lu_env_refill((void *)tsi->tsi_env);
+       mti = lu_context_key_get(&tsi->tsi_env->le_ctx, &mdt_thread_key);
+       LASSERT(mti != NULL);
+
+       mdt_thread_info_init(req, mti);
+       rc = mdt_init_idmap(mti);
+       mdt_thread_info_fini(mti);
+       if (rc != 0)
+               GOTO(err, rc);
+       RETURN(0);
+err:
+       obd_disconnect(class_export_get(req->rq_export));
+       return rc;
+}
+
 static int mdt_filter_recovery_request(struct ptlrpc_request *req,
                                        struct obd_device *obd, int *process)
 {
@@ -3407,11 +3445,13 @@ int mdt_handle_common(struct ptlrpc_request *req,
         info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
         LASSERT(info != NULL);
 
+       req_capsule_init(&req->rq_pill, req, RCL_SERVER);
         mdt_thread_info_init(req, info);
 
         rc = mdt_handle0(req, info, supported);
 
         mdt_thread_info_fini(info);
+       req_capsule_fini(&req->rq_pill);
         RETURN(rc);
 }
 
@@ -4640,6 +4680,46 @@ static void mdt_quota_fini(const struct lu_env *env, struct mdt_device *mdt)
        EXIT;
 }
 
+static struct tgt_handler mdt_tgt_handlers[] = {
+TGT_RPC_HANDLER(MDS_FIRST_OPC,
+               0,                      MDS_CONNECT,    mdt_tgt_connect,
+               &RQF_CONNECT, LUSTRE_OBD_VERSION),
+TGT_RPC_HANDLER(MDS_FIRST_OPC,
+               0,                      MDS_DISCONNECT, tgt_disconnect,
+               &RQF_MDS_DISCONNECT, LUSTRE_OBD_VERSION),
+};
+
+static struct tgt_opc_slice mdt_common_slice[] = {
+       {
+               .tos_opc_start = MDS_FIRST_OPC,
+               .tos_opc_end   = MDS_LAST_OPC,
+               .tos_hs        = mdt_tgt_handlers
+       },
+       {
+               .tos_opc_start = OBD_FIRST_OPC,
+               .tos_opc_end   = OBD_LAST_OPC,
+               .tos_hs        = tgt_obd_handlers
+       },
+       {
+               .tos_opc_start = LDLM_FIRST_OPC,
+               .tos_opc_end   = LDLM_LAST_OPC,
+               .tos_hs        = tgt_dlm_handlers
+       },
+       {
+               .tos_opc_start = SEC_FIRST_OPC,
+               .tos_opc_end   = SEC_LAST_OPC,
+               .tos_hs        = tgt_sec_ctx_handlers
+       },
+       {
+               .tos_opc_start = UPDATE_OBJ,
+               .tos_opc_end   = UPDATE_LAST_OPC,
+               .tos_hs        = tgt_out_handlers
+       },
+       {
+               .tos_hs        = NULL
+       }
+};
+
 static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
 {
        struct md_device  *next = m->mdt_child;
@@ -4793,7 +4873,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         }
 
        rwlock_init(&m->mdt_sptlrpc_lock);
-        sptlrpc_rule_set_init(&m->mdt_sptlrpc_rset);
+       sptlrpc_rule_set_init(&m->mdt_sptlrpc_rset);
 
        spin_lock_init(&m->mdt_ioepoch_lock);
         m->mdt_opts.mo_compat_resname = 0;
@@ -4885,7 +4965,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         if (rc)
                 GOTO(err_free_ns, rc);
 
-       rc = tgt_init(env, &m->mdt_lut, obd, m->mdt_bottom, NULL,
+       rc = tgt_init(env, &m->mdt_lut, obd, m->mdt_bottom, mdt_common_slice,
                      OBD_FAIL_MDS_ALL_REQUEST_NET,
                      OBD_FAIL_MDS_ALL_REPLY_NET);
        if (rc)
@@ -4960,7 +5040,6 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                 ldlm_timeout = MDS_LDLM_TIMEOUT_DEFAULT;
 
         RETURN(0);
-
 err_procfs:
        mdt_procfs_fini(m);
 err_recovery:
@@ -5347,9 +5426,8 @@ static int mdt_connect_internal(struct obd_export *exp,
        return 0;
 }
 
-static int mdt_connect_check_sptlrpc(struct mdt_device *mdt,
-                                    struct obd_export *exp,
-                                    struct ptlrpc_request *req)
+int mdt_connect_check_sptlrpc(struct mdt_device *mdt, struct obd_export *exp,
+                             struct ptlrpc_request *req)
 {
        struct sptlrpc_flavor   flvr;
        int                     rc = 0;
@@ -5398,11 +5476,9 @@ static int mdt_obd_connect(const struct lu_env *env,
                            struct obd_connect_data *data,
                            void *localdata)
 {
-        struct mdt_thread_info *info;
         struct obd_export      *lexp;
         struct lustre_handle    conn = { 0 };
         struct mdt_device      *mdt;
-        struct ptlrpc_request  *req;
         int                     rc;
         ENTRY;
 
@@ -5410,9 +5486,7 @@ static int mdt_obd_connect(const struct lu_env *env,
         if (!exp || !obd || !cluuid)
                 RETURN(-EINVAL);
 
-        info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
-        req = info->mti_pill->rc_req;
-        mdt = mdt_dev(obd->obd_lu_dev);
+       mdt = mdt_dev(obd->obd_lu_dev);
 
        /*
         * first, check whether the stack is ready to handle requests
@@ -5434,26 +5508,17 @@ static int mdt_obd_connect(const struct lu_env *env,
         lexp = class_conn2export(&conn);
         LASSERT(lexp != NULL);
 
-        rc = mdt_connect_check_sptlrpc(mdt, lexp, req);
-        if (rc)
-                GOTO(out, rc);
-
-        if (OBD_FAIL_CHECK(OBD_FAIL_TGT_RCVG_FLAG))
-                lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
-
         rc = mdt_connect_internal(lexp, mdt, data);
         if (rc == 0) {
                 struct lsd_client_data *lcd = lexp->exp_target_data.ted_lcd;
 
                 LASSERT(lcd);
-               info->mti_exp = lexp;
                memcpy(lcd->lcd_uuid, cluuid, sizeof lcd->lcd_uuid);
                rc = tgt_client_new(env, lexp);
                 if (rc == 0)
                         mdt_export_stats_init(obd, lexp, localdata);
         }
 
-out:
         if (rc != 0) {
                 class_disconnect(lexp);
                 *exp = NULL;
@@ -5470,23 +5535,12 @@ static int mdt_obd_reconnect(const struct lu_env *env,
                              struct obd_connect_data *data,
                              void *localdata)
 {
-        struct mdt_thread_info *info;
-        struct mdt_device      *mdt;
-        struct ptlrpc_request  *req;
         int                     rc;
         ENTRY;
 
         if (exp == NULL || obd == NULL || cluuid == NULL)
                 RETURN(-EINVAL);
 
-        info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
-        req = info->mti_pill->rc_req;
-        mdt = mdt_dev(obd->obd_lu_dev);
-
-        rc = mdt_connect_check_sptlrpc(mdt, exp, req);
-        if (rc)
-                RETURN(rc);
-
         rc = mdt_connect_internal(exp, mdt_dev(obd->obd_lu_dev), data);
         if (rc == 0)
                 mdt_export_stats_init(obd, exp, localdata);
index e8b5830..2283d99 100644 (file)
@@ -413,6 +413,9 @@ struct mdt_thread_info {
 
         struct mdt_device         *mti_mdt;
         const struct lu_env       *mti_env;
+       /* XXX: temporary flag to have healthy mti during OUT calls
+        * to be removed upon moving MDT to the unified target code */
+       bool                       mti_txn_compat;
 
         /*
          * Additional fail id that can be set by handler. Passed to
@@ -627,12 +630,6 @@ static inline struct ptlrpc_request *mdt_info_req(struct mdt_thread_info *info)
          return info->mti_pill ? info->mti_pill->rc_req : NULL;
 }
 
-static inline int req_is_replay(struct ptlrpc_request *req)
-{
-        LASSERT(req->rq_reqmsg);
-        return !!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
-}
-
 static inline __u64 mdt_conn_flags(struct mdt_thread_info *info)
 {
        LASSERT(info->mti_exp);
@@ -864,6 +861,10 @@ int mdt_llog_prev_block(struct mdt_thread_info *info);
 int mdt_sec_ctx_handle(struct mdt_thread_info *info);
 int mdt_readpage(struct mdt_thread_info *info);
 int mdt_obd_idx_read(struct mdt_thread_info *info);
+int mdt_tgt_connect(struct tgt_session_info *tsi);
+void mdt_thread_info_init(struct ptlrpc_request *req,
+                         struct mdt_thread_info *mti);
+void mdt_thread_info_fini(struct mdt_thread_info *mti);
 
 extern struct mdt_opc_slice mdt_regular_handlers[];
 extern struct mdt_opc_slice mdt_seq_handlers[];
@@ -1285,37 +1286,5 @@ static inline char *mdt_obd_name(struct mdt_device *mdt)
 int mds_mod_init(void);
 void mds_mod_exit(void);
 
-/* Update handlers */
-int out_handle(struct mdt_thread_info *info);
-
-#define out_tx_create(info, obj, attr, fid, dof, th, reply, idx) \
-       __out_tx_create(info, obj, attr, fid, dof, th, reply, idx, \
-                       __FILE__, __LINE__)
-
-#define out_tx_attr_set(info, obj, attr, th, reply, idx) \
-       __out_tx_attr_set(info, obj, attr, th, reply, idx, \
-                         __FILE__, __LINE__)
-
-#define out_tx_xattr_set(info, obj, buf, name, fl, th, reply, idx)     \
-       __out_tx_xattr_set(info, obj, buf, name, fl, th, reply, idx,    \
-                          __FILE__, __LINE__)
-
-#define out_tx_ref_add(info, obj, th, reply, idx) \
-       __out_tx_ref_add(info, obj, th, reply, idx, __FILE__, __LINE__)
-
-#define out_tx_ref_del(info, obj, th, reply, idx) \
-       __out_tx_ref_del(info, obj, th, reply, idx, __FILE__, __LINE__)
-
-#define out_tx_index_insert(info, obj, th, name, fid, reply, idx) \
-       __out_tx_index_insert(info, obj, th, name, fid, reply, idx, \
-                             __FILE__, __LINE__)
-
-#define out_tx_index_delete(info, obj, th, name, reply, idx) \
-       __out_tx_index_delete(info, obj, th, name, reply, idx, \
-                             __FILE__, __LINE__)
-
-#define out_tx_destroy(info, obj, th, reply, idx) \
-       __out_tx_destroy(info, obj, th, reply, idx, __FILE__, __LINE__)
-
 #endif /* __KERNEL__ */
 #endif /* _MDT_H */
index ab00870..02f5643 100644 (file)
@@ -307,46 +307,6 @@ struct mdt_opc_slice mdt_fld_handlers[] = {
        }
 };
 
-/* Request with a format known in advance */
-#define DEF_UPDATE_HDL(flags, name, fn)                                        \
-       DEFINE_RPC_HANDLER(UPDATE_OBJ, flags, name, fn, &RQF_ ## name)
-
-#define target_handler mdt_handler
-static struct target_handler out_ops[] = {
-       DEF_UPDATE_HDL(MUTABOR,         UPDATE_OBJ,     out_handle),
-};
-
-static struct mdt_opc_slice update_handlers[] = {
-       {
-               .mos_opc_start = MDS_GETATTR,
-               .mos_opc_end   = MDS_LAST_OPC,
-               .mos_hs        = mdt_mds_ops
-       },
-       {
-               .mos_opc_start = OBD_PING,
-               .mos_opc_end   = OBD_LAST_OPC,
-               .mos_hs        = mdt_obd_ops
-       },
-       {
-               .mos_opc_start = LDLM_ENQUEUE,
-               .mos_opc_end   = LDLM_LAST_OPC,
-               .mos_hs        = mdt_dlm_ops
-       },
-       {
-               .mos_opc_start = SEC_CTX_INIT,
-               .mos_opc_end   = SEC_LAST_OPC,
-               .mos_hs        = mdt_sec_ctx_ops
-       },
-       {
-               .mos_opc_start = UPDATE_OBJ,
-               .mos_opc_end   = UPDATE_LAST_OPC,
-               .mos_hs        = out_ops
-       },
-       {
-               .mos_hs        = NULL
-       }
-};
-
 static int mds_regular_handle(struct ptlrpc_request *req)
 {
        return mdt_handle_common(req, mdt_regular_handlers);
@@ -362,11 +322,6 @@ static int mds_mdsc_handle(struct ptlrpc_request *req)
        return mdt_handle_common(req, mdt_seq_handlers);
 }
 
-static int mdt_out_handle(struct ptlrpc_request *req)
-{
-       return mdt_handle_common(req, update_handlers);
-}
-
 static int mds_mdss_handle(struct ptlrpc_request *req)
 {
        return mdt_handle_common(req, mdt_seq_handlers);
@@ -562,10 +517,10 @@ static int mds_start_ptlrpc_service(struct mds_device *m)
                .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
                .psc_buf                = {
                        .bc_nbufs               = MDS_NBUFS,
-                       .bc_buf_size            = MDS_OUT_BUFSIZE,
-                       .bc_req_max_size        = MDS_OUT_MAXREQSIZE,
-                       .bc_rep_max_size        = MDS_OUT_MAXREPSIZE,
-                       .bc_req_portal          = MDS_MDS_PORTAL,
+                       .bc_buf_size            = OUT_BUFSIZE,
+                       .bc_req_max_size        = OUT_MAXREQSIZE,
+                       .bc_rep_max_size        = OUT_MAXREPSIZE,
+                       .bc_req_portal          = OUT_PORTAL,
                        .bc_rep_portal          = MDC_REPLY_PORTAL,
                },
                /*
@@ -586,7 +541,7 @@ static int mds_start_ptlrpc_service(struct mds_device *m)
                        .cc_pattern             = mds_num_cpts,
                },
                .psc_ops                = {
-                       .so_req_handler         = mdt_out_handle,
+                       .so_req_handler         = tgt_request_handle,
                        .so_req_printer         = target_print_req,
                        .so_hpreq_handler       = NULL,
                },
@@ -778,7 +733,6 @@ static struct lu_device *mds_device_alloc(const struct lu_env *env,
                l = ERR_PTR(rc);
                return l;
        }
-
        return l;
 }
 
index e7986c4..3a67cdf 100644 (file)
@@ -347,153 +347,35 @@ out:
 /*
  * last_rcvd & last_committed update callbacks
  */
-static int mdt_last_rcvd_update(struct mdt_thread_info *mti,
-                                struct thandle *th)
-{
-        struct mdt_device *mdt = mti->mti_mdt;
-        struct ptlrpc_request *req = mdt_info_req(mti);
-        struct tg_export_data *ted;
-        struct lsd_client_data *lcd;
-        loff_t off;
-        int err;
-        __s32 rc = th->th_result;
-
-        ENTRY;
-        LASSERT(req);
-        LASSERT(req->rq_export);
-        LASSERT(mdt);
-        ted = &req->rq_export->exp_target_data;
-        LASSERT(ted);
-
-       mutex_lock(&ted->ted_lcd_lock);
-       lcd = ted->ted_lcd;
-       /* if the export has already been disconnected, we have no last_rcvd
-        * slot, update server data with latest transno then */
-       if (lcd == NULL) {
-               mutex_unlock(&ted->ted_lcd_lock);
-                CWARN("commit transaction for disconnected client %s: rc %d\n",
-                      req->rq_export->exp_client_uuid.uuid, rc);
-               err = tgt_server_data_write(mti->mti_env, &mdt->mdt_lut, th);
-                RETURN(err);
-        }
-
-        off = ted->ted_lr_off;
-        LASSERT(ergo(mti->mti_transno == 0, rc != 0));
-        if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE ||
-            lustre_msg_get_opc(req->rq_reqmsg) == MDS_DONE_WRITING) {
-                if (mti->mti_transno != 0) {
-                        if (lcd->lcd_last_close_transno > mti->mti_transno) {
-                                CERROR("Trying to overwrite bigger transno:"
-                                       "on-disk: "LPU64", new: "LPU64" "
-                                       "replay: %d. see LU-617.\n",
-                                       lcd->lcd_last_close_transno,
-                                       mti->mti_transno, req_is_replay(req));
-                                if (req_is_replay(req)) {
-                                       spin_lock(&req->rq_export->exp_lock);
-                                       req->rq_export->exp_vbr_failed = 1;
-                                       spin_unlock(&req->rq_export->exp_lock);
-                               }
-                               mutex_unlock(&ted->ted_lcd_lock);
-                                RETURN(req_is_replay(req) ? -EOVERFLOW : 0);
-                        }
-                        lcd->lcd_last_close_transno = mti->mti_transno;
-                }
-                lcd->lcd_last_close_xid = req->rq_xid;
-                lcd->lcd_last_close_result = rc;
-        } else {
-                /* VBR: save versions in last_rcvd for reconstruct. */
-                __u64 *pre_versions = lustre_msg_get_versions(req->rq_repmsg);
-                if (pre_versions) {
-                        lcd->lcd_pre_versions[0] = pre_versions[0];
-                        lcd->lcd_pre_versions[1] = pre_versions[1];
-                        lcd->lcd_pre_versions[2] = pre_versions[2];
-                        lcd->lcd_pre_versions[3] = pre_versions[3];
-                }
-                if (mti->mti_transno != 0) {
-                        if (lcd->lcd_last_transno > mti->mti_transno) {
-                                CERROR("Trying to overwrite bigger transno:"
-                                       "on-disk: "LPU64", new: "LPU64" "
-                                       "replay: %d. see LU-617.\n",
-                                       lcd->lcd_last_transno,
-                                       mti->mti_transno, req_is_replay(req));
-                                if (req_is_replay(req)) {
-                                       spin_lock(&req->rq_export->exp_lock);
-                                       req->rq_export->exp_vbr_failed = 1;
-                                       spin_unlock(&req->rq_export->exp_lock);
-                               }
-                               mutex_unlock(&ted->ted_lcd_lock);
-                                RETURN(req_is_replay(req) ? -EOVERFLOW : 0);
-                        }
-                        lcd->lcd_last_transno = mti->mti_transno;
-                }
-                lcd->lcd_last_xid = req->rq_xid;
-                lcd->lcd_last_result = rc;
-                /*XXX: save intent_disposition in mdt_thread_info?
-                 * also there is bug - intent_dispostion is __u64,
-                 * see struct ldlm_reply->lock_policy_res1; */
-                lcd->lcd_last_data = mti->mti_opdata;
-        }
-
-       if (exp_connect_flags(mti->mti_exp) & OBD_CONNECT_LIGHTWEIGHT) {
-               /* Although lightweight (LW) connections have no slot in
-                * last_rcvd, we still want to maintain the in-memory
-                * lsd_client_data structure in order to properly handle reply
-                * reconstruction. */
-               struct lu_target        *tg = &mdt->mdt_lut;
-               bool                     update = false;
-
-               mutex_unlock(&ted->ted_lcd_lock);
-               err = 0;
-
-               /* All operations performed by LW clients are synchronous and
-                * we store the committed transno in the last_rcvd header */
-               spin_lock(&tg->lut_translock);
-               if (mti->mti_transno > tg->lut_lsd.lsd_last_transno) {
-                       tg->lut_lsd.lsd_last_transno = mti->mti_transno;
-                       update = true;
-               }
-               spin_unlock(&tg->lut_translock);
-
-               if (update)
-                       err = tgt_server_data_write(mti->mti_env, tg, th);
-       } else if (off <= 0) {
-               CERROR("%s: client idx %d has offset %lld\n",
-                      mdt_obd_name(mdt), ted->ted_lr_idx, off);
-               mutex_unlock(&ted->ted_lcd_lock);
-               err = -EINVAL;
-       } else {
-               err = tgt_client_data_write(mti->mti_env, &mdt->mdt_lut, lcd,
-                                           &off, th);
-               mutex_unlock(&ted->ted_lcd_lock);
-       }
-       RETURN(err);
-}
-
 extern struct lu_context_key mdt_thread_key;
 
 /* add credits for last_rcvd update */
 static int mdt_txn_start_cb(const struct lu_env *env,
                            struct thandle *th, void *cookie)
 {
-       struct mdt_device *mdt = cookie;
-       struct mdt_thread_info *mti;
-       int rc;
-       ENTRY;
+       struct lu_target        *tgt = cookie;
+       struct mdt_thread_info  *mti;
+       int                      rc;
+
+       /* if there is no session, then this transaction is not result of
+        * request processing but some local operation or echo client */
+       if (env->le_ses == NULL)
+               return 0;
 
        mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
 
-       LASSERT(mdt->mdt_lut.lut_last_rcvd);
+       LASSERT(tgt->lut_last_rcvd);
        if (mti->mti_exp == NULL)
-               RETURN(0);
+               return 0;
 
-       rc = dt_declare_record_write(env, mdt->mdt_lut.lut_last_rcvd,
+       rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
                                     sizeof(struct lsd_client_data),
                                     mti->mti_exp->exp_target_data.ted_lr_off,
                                     th);
        if (rc)
                return rc;
 
-       rc = dt_declare_record_write(env, mdt->mdt_lut.lut_last_rcvd,
+       rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
                                     sizeof(struct lr_server_data), 0, th);
        if (rc)
                return rc;
@@ -510,66 +392,34 @@ static int mdt_txn_start_cb(const struct lu_env *env,
 static int mdt_txn_stop_cb(const struct lu_env *env,
                            struct thandle *txn, void *cookie)
 {
-        struct mdt_device *mdt = cookie;
-        struct mdt_thread_info *mti;
-        struct ptlrpc_request *req;
-
-        mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
-        req = mdt_info_req(mti);
+       struct lu_target        *tgt = cookie;
+       struct mdt_thread_info  *mti;
+       struct dt_object        *obj = NULL;
+       int                      rc;
 
-       if (mti->mti_mdt == NULL || req == NULL)
+       if (env->le_ses == NULL)
                return 0;
 
-        if (mti->mti_has_trans) {
-                /* XXX: currently there are allowed cases, but the wrong cases
-                 * are also possible, so better check is needed here */
-                CDEBUG(D_INFO, "More than one transaction "LPU64"\n",
-                       mti->mti_transno);
-                return 0;
-        }
-
-        mti->mti_has_trans = 1;
-       spin_lock(&mdt->mdt_lut.lut_translock);
-        if (txn->th_result != 0) {
-                if (mti->mti_transno != 0) {
-                       CERROR("Replay transno "LPU64" failed: rc %d\n",
-                               mti->mti_transno, txn->th_result);
-                       spin_unlock(&mdt->mdt_lut.lut_translock);
-                       return 0;
-                }
-        } else if (mti->mti_transno == 0) {
-                mti->mti_transno = ++ mdt->mdt_lut.lut_last_transno;
-        } else {
-                /* should be replay */
-                if (mti->mti_transno > mdt->mdt_lut.lut_last_transno)
-                        mdt->mdt_lut.lut_last_transno = mti->mti_transno;
-        }
-       spin_unlock(&mdt->mdt_lut.lut_translock);
-        /* sometimes the reply message has not been successfully packed */
-        LASSERT(req != NULL && req->rq_repmsg != NULL);
+       mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+       LASSERT(mti);
+       if (mti->mti_has_trans) {
+               /* XXX: currently there are allowed cases, but the wrong cases
+                * are also possible, so better check is needed here */
+               CDEBUG(D_INFO, "More than one transaction "LPU64"\n",
+                      mti->mti_transno);
+               return 0;
+       }
 
-        /** VBR: set new versions */
-       /* we probably should not set local transno to the remote object
-        * on another storage, What about VBR on remote object? XXX */
-       if (txn->th_result == 0 && mti->mti_mos != NULL &&
-           !mdt_object_remote(mti->mti_mos)) {
+       mti->mti_has_trans = 1;
 
-                dt_version_set(env, mdt_obj2dt(mti->mti_mos),
-                               mti->mti_transno, txn);
-                mti->mti_mos = NULL;
-        }
+       if (mti->mti_mos != NULL &&
+           mdt_object_remote(mti->mti_mos)) {
+               obj = mdt_obj2dt(mti->mti_mos);
+       }
 
-        /* filling reply data */
-        CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
-               mti->mti_transno, req->rq_export->exp_obd->obd_last_committed);
-
-        req->rq_transno = mti->mti_transno;
-        lustre_msg_set_transno(req->rq_repmsg, mti->mti_transno);
-        /* if can't add callback, do sync write */
-        txn->th_sync |= !!tgt_last_commit_cb_add(txn, &mdt->mdt_lut,
-                                                 mti->mti_exp,
-                                                 mti->mti_transno);
-        return mdt_last_rcvd_update(mti, txn);
+       rc = tgt_last_rcvd_update(env, tgt, obj, mti->mti_opdata, txn,
+                                 mdt_info_req(mti));
+       return rc;
 }
 
 int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt,
@@ -586,13 +436,15 @@ int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt,
         mdt->mdt_txn_cb.dtc_txn_start = mdt_txn_start_cb;
         mdt->mdt_txn_cb.dtc_txn_stop = mdt_txn_stop_cb;
         mdt->mdt_txn_cb.dtc_txn_commit = NULL;
-        mdt->mdt_txn_cb.dtc_cookie = mdt;
+        mdt->mdt_txn_cb.dtc_cookie = &mdt->mdt_lut;
         mdt->mdt_txn_cb.dtc_tag = LCT_MD_THREAD;
         CFS_INIT_LIST_HEAD(&mdt->mdt_txn_cb.dtc_linkage);
 
-        dt_txn_callback_add(mdt->mdt_bottom, &mdt->mdt_txn_cb);
+       rc = mdt_server_data_init(env, mdt, lsi);
+       if (rc != 0)
+               RETURN(rc);
 
-        rc = mdt_server_data_init(env, mdt, lsi);
+       dt_txn_callback_add(mdt->mdt_bottom, &mdt->mdt_txn_cb);
 
        RETURN(rc);
 }
index 045e94c..782c6d5 100644 (file)
@@ -550,15 +550,6 @@ static int mgs_llog_open(struct tgt_session_info *tsi)
        RETURN(rc);
 }
 
-/*
- * sec context handlers
- */
-/* XXX: Implement based on mdt_sec_ctx_handle()? */
-static int mgs_sec_ctx_handle(struct tgt_session_info *tsi)
-{
-       return 0;
-}
-
 static inline int mgs_init_export(struct obd_export *exp)
 {
        struct mgs_export_data *data = &exp->u.eu_mgs_data;
@@ -883,12 +874,6 @@ TGT_LLOG_HDL_VAR(0,        LLOG_ORIGIN_HANDLE_CLOSE,       tgt_llog_close),
 TGT_LLOG_HDL    (0,    LLOG_ORIGIN_HANDLE_PREV_BLOCK,  tgt_llog_prev_block),
 };
 
-static struct tgt_handler mgs_sec_ctx_handlers[] = {
-TGT_SEC_HDL_VAR(0,     SEC_CTX_INIT,           mgs_sec_ctx_handle),
-TGT_SEC_HDL_VAR(0,     SEC_CTX_INIT_CONT,      mgs_sec_ctx_handle),
-TGT_SEC_HDL_VAR(0,     SEC_CTX_FINI,           mgs_sec_ctx_handle),
-};
-
 static struct tgt_opc_slice mgs_common_slice[] = {
        {
                .tos_opc_start = MGS_FIRST_OPC,
@@ -913,7 +898,7 @@ static struct tgt_opc_slice mgs_common_slice[] = {
        {
                .tos_opc_start = SEC_FIRST_OPC,
                .tos_opc_end   = SEC_LAST_OPC,
-               .tos_hs        = mgs_sec_ctx_handlers
+               .tos_hs        = tgt_sec_ctx_handlers
        },
        {
                .tos_hs        = NULL
index cdfe87b..6082c31 100644 (file)
@@ -2809,7 +2809,7 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         if (env == NULL)
                 RETURN(-ENOMEM);
 
-        rc = lu_env_init(env, LCT_DT_THREAD);
+        rc = lu_env_init(env, LCT_DT_THREAD | LCT_MD_THREAD);
         if (rc)
                 GOTO(out, rc = -ENOMEM);
 
index ab9b24c..6d7fd09 100644 (file)
@@ -610,6 +610,17 @@ out_free:
        return rc;
 }
 
+static struct tgt_opc_slice ofd_common_slice[] = {
+       {
+               .tos_opc_start = UPDATE_OBJ,
+               .tos_opc_end   = UPDATE_LAST_OPC,
+               .tos_hs        = tgt_out_handlers
+       },
+       {
+               .tos_hs         = NULL
+       }
+};
+
 static int ofd_init0(const struct lu_env *env, struct ofd_device *m,
                     struct lu_device_type *ldt, struct lustre_cfg *cfg)
 {
@@ -748,7 +759,7 @@ static int ofd_init0(const struct lu_env *env, struct ofd_device *m,
        m->ofd_grant_ratio =
                ofd_grant_ratio_conv(m->ofd_dt_conf.ddp_grant_reserved);
 
-       rc = tgt_init(env, &m->ofd_lut, obd, m->ofd_osd, NULL,
+       rc = tgt_init(env, &m->ofd_lut, obd, m->ofd_osd, ofd_common_slice,
                      OBD_FAIL_OST_ALL_REQUEST_NET,
                      OBD_FAIL_OST_ALL_REPLY_NET);
        if (rc)
index adc9304..acc1a1d 100644 (file)
@@ -2800,9 +2800,56 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
                GOTO(out_io, rc);
        }
 
-        ping_evictor_start();
+       /* Object update service */
+       memset(&svc_conf, 0, sizeof(svc_conf));
+       svc_conf = (typeof(svc_conf)) {
+               .psc_name               = "ost_out",
+               .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
+               .psc_buf                = {
+                       .bc_nbufs               = OST_NBUFS,
+                       .bc_buf_size            = OUT_BUFSIZE,
+                       .bc_req_max_size        = OUT_MAXREQSIZE,
+                       .bc_rep_max_size        = OUT_MAXREPSIZE,
+                       .bc_req_portal          = OUT_PORTAL,
+                       .bc_rep_portal          = OSC_REPLY_PORTAL,
+               },
+               /*
+                * We'd like to have a mechanism to set this on a per-device
+                * basis, but alas...
+                */
+               .psc_thr                = {
+                       .tc_thr_name            = "ll_ost_out",
+                       .tc_thr_factor          = OSS_CR_THR_FACTOR,
+                       .tc_nthrs_init          = OSS_CR_NTHRS_INIT,
+                       .tc_nthrs_base          = OSS_CR_NTHRS_BASE,
+                       .tc_nthrs_max           = OSS_CR_NTHRS_MAX,
+                       .tc_nthrs_user          = oss_num_create_threads,
+                       .tc_cpu_affinity        = 1,
+                       .tc_ctx_tags            = LCT_DT_THREAD,
+               },
+               .psc_cpt                = {
+                       .cc_pattern             = oss_cpts,
+               },
+               .psc_ops                = {
+                       .so_req_handler         = tgt_request_handle,
+                       .so_req_printer         = target_print_req,
+                       .so_hpreq_handler       = NULL,
+               },
+       };
+       ost->ost_out_service = ptlrpc_register_service(&svc_conf,
+                                                      obd->obd_proc_entry);
+       if (IS_ERR(ost->ost_out_service)) {
+               rc = PTR_ERR(ost->ost_out_service);
+               CERROR("failed to start out service: %d\n", rc);
+               ost->ost_out_service = NULL;
+               GOTO(out_seq, rc);
+       }
+       ping_evictor_start();
 
-        RETURN(0);
+       RETURN(0);
+out_seq:
+       ptlrpc_unregister_service(ost->ost_seq_service);
+       ost->ost_seq_service = NULL;
 out_io:
        ptlrpc_unregister_service(ost->ost_io_service);
        ost->ost_io_service = NULL;
@@ -2833,10 +2880,12 @@ static int ost_cleanup(struct obd_device *obd)
        ptlrpc_unregister_service(ost->ost_create_service);
        ptlrpc_unregister_service(ost->ost_io_service);
        ptlrpc_unregister_service(ost->ost_seq_service);
+       ptlrpc_unregister_service(ost->ost_out_service);
        ost->ost_service = NULL;
        ost->ost_create_service = NULL;
        ost->ost_io_service = NULL;
        ost->ost_seq_service = NULL;
+       ost->ost_out_service = NULL;
 
        mutex_unlock(&ost->ost_health_mutex);
 
index dd23c84..a28f497 100644 (file)
@@ -18,7 +18,7 @@ ptlrpc_objs += sec_null.o sec_plain.o nrs.o nrs_fifo.o nrs_crr.o nrs_orr.o
 ptlrpc_objs += errno.o
 
 target_objs := $(TARGET)tgt_main.o $(TARGET)tgt_lastrcvd.o
-target_objs += $(TARGET)tgt_handler.o
+target_objs += $(TARGET)tgt_handler.o $(TARGET)out_handler.o
 
 ptlrpc-objs := $(ldlm_objs) $(ptlrpc_objs)
 @SERVER_TRUE@ptlrpc-objs += $(target_objs)
index 5fa5155..59eee1d 100644 (file)
@@ -31,4 +31,4 @@
 #
 
 MOSTLYCLEANFILES := @MOSTLYCLEANFILES@
-EXTRA_DIST = tgt_main.c tgt_lastrcvd.c tgt_handler.c tgt_internal.h
+EXTRA_DIST = tgt_main.c tgt_lastrcvd.c tgt_handler.c tgt_internal.h out_handler.c
similarity index 66%
rename from lustre/mdt/out_handler.c
rename to lustre/target/out_handler.c
index 4e86c8e..ab75768 100644 (file)
  * Author: di.wang <di.wang@intel.com>
  */
 
-#define DEBUG_SUBSYSTEM S_MDS
+#define DEBUG_SUBSYSTEM S_CLASS
 
-#include "mdt_internal.h"
+#include <obd_class.h>
+#include <md_object.h>
+#include "tgt_internal.h"
 #include <lustre_update.h>
 
-static const char dot[] = ".";
-static const char dotdot[] = "..";
-
-/* Current out and mdt shared the same thread info, but in the future,
- * this should be decoupled with MDT XXX*/
-#define out_thread_info                mdt_thread_info
-#define out_thread_key         mdt_thread_key
-
-struct out_thread_info *out_env_info(const struct lu_env *env)
-{
-       struct out_thread_info *info;
-
-       info = lu_context_key_get(&env->le_ctx, &out_thread_key);
-       LASSERT(info != NULL);
-       return info;
-}
-
-static inline char *dt_obd_name(struct dt_device *dt)
-{
-       return dt->dd_lu_dev.ld_obd->obd_name;
-}
-
 struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
                           tx_exec_func_t undo, char *file, int line)
 {
@@ -78,89 +58,89 @@ struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
 }
 
 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
-                       struct thandle_exec_args *th)
+                       struct thandle_exec_args *ta)
 {
-       memset(th, 0, sizeof(*th));
-       th->ta_handle = dt_trans_create(env, dt);
-       if (IS_ERR(th->ta_handle)) {
+       memset(ta, 0, sizeof(*ta));
+       ta->ta_handle = dt_trans_create(env, dt);
+       if (IS_ERR(ta->ta_handle)) {
                CERROR("%s: start handle error: rc = %ld\n",
-                      dt_obd_name(dt), PTR_ERR(th->ta_handle));
-               return PTR_ERR(th->ta_handle);
+                      dt_obd_name(dt), PTR_ERR(ta->ta_handle));
+               return PTR_ERR(ta->ta_handle);
        }
-       th->ta_dev = dt;
+       ta->ta_dev = dt;
        /*For phase I, sync for cross-ref operation*/
-       th->ta_handle->th_sync = 1;
+       ta->ta_handle->th_sync = 1;
        return 0;
 }
 
 static int out_trans_start(const struct lu_env *env,
-                          struct thandle_exec_args *th)
+                          struct thandle_exec_args *ta)
 {
        /* Always do sync commit for Phase I */
-       LASSERT(th->ta_handle->th_sync != 0);
-       return dt_trans_start(env, th->ta_dev, th->ta_handle);
+       LASSERT(ta->ta_handle->th_sync != 0);
+       return dt_trans_start(env, ta->ta_dev, ta->ta_handle);
 }
 
 static int out_trans_stop(const struct lu_env *env,
-                         struct thandle_exec_args *th, int err)
+                         struct thandle_exec_args *ta, int err)
 {
        int i;
        int rc;
 
-       th->ta_handle->th_result = err;
-       LASSERT(th->ta_handle->th_sync != 0);
-       rc = dt_trans_stop(env, th->ta_dev, th->ta_handle);
-       for (i = 0; i < th->ta_argno; i++) {
-               if (th->ta_args[i].object != NULL) {
-                       lu_object_put(env, &th->ta_args[i].object->do_lu);
-                       th->ta_args[i].object = NULL;
+       ta->ta_handle->th_result = err;
+       LASSERT(ta->ta_handle->th_sync != 0);
+       rc = dt_trans_stop(env, ta->ta_dev, ta->ta_handle);
+       for (i = 0; i < ta->ta_argno; i++) {
+               if (ta->ta_args[i].object != NULL) {
+                       lu_object_put(env, &ta->ta_args[i].object->do_lu);
+                       ta->ta_args[i].object = NULL;
                }
        }
 
        return rc;
 }
 
-int out_tx_end(const struct lu_env *env, struct thandle_exec_args *th)
+int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta)
 {
-       struct out_thread_info *info = out_env_info(env);
+       struct tgt_session_info *tsi = tgt_ses_info(env);
        int i = 0, rc;
 
-       LASSERT(th->ta_dev);
-       LASSERT(th->ta_handle);
+       LASSERT(ta->ta_dev);
+       LASSERT(ta->ta_handle);
 
-       if (th->ta_err != 0 || th->ta_argno == 0)
-               GOTO(stop, rc = th->ta_err);
+       if (ta->ta_err != 0 || ta->ta_argno == 0)
+               GOTO(stop, rc = ta->ta_err);
 
-       rc = out_trans_start(env, th);
+       rc = out_trans_start(env, ta);
        if (unlikely(rc))
                GOTO(stop, rc);
 
-       for (i = 0; i < th->ta_argno; i++) {
-               rc = th->ta_args[i].exec_fn(env, th->ta_handle,
-                                           &th->ta_args[i]);
+       for (i = 0; i < ta->ta_argno; i++) {
+               rc = ta->ta_args[i].exec_fn(env, ta->ta_handle,
+                                           &ta->ta_args[i]);
                if (unlikely(rc)) {
                        CDEBUG(D_INFO, "error during execution of #%u from"
-                              " %s:%d: rc = %d\n", i, th->ta_args[i].file,
-                              th->ta_args[i].line, rc);
+                              " %s:%d: rc = %d\n", i, ta->ta_args[i].file,
+                              ta->ta_args[i].line, rc);
                        while (--i >= 0) {
-                               LASSERTF(th->ta_args[i].undo_fn != NULL,
+                               LASSERTF(ta->ta_args[i].undo_fn != NULL,
                                    "can't undo changes, hope for failover!\n");
-                               th->ta_args[i].undo_fn(env, th->ta_handle,
-                                                      &th->ta_args[i]);
+                               ta->ta_args[i].undo_fn(env, ta->ta_handle,
+                                                      &ta->ta_args[i]);
                        }
                        break;
                }
        }
 
        /* Only fail for real update */
-       info->mti_fail_id = OBD_FAIL_UPDATE_OBJ_NET_REP;
+       tsi->tsi_reply_fail_id = OBD_FAIL_UPDATE_OBJ_NET_REP;
 stop:
        CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
-              dt_obd_name(th->ta_dev), i, th->ta_argno, rc);
-       out_trans_stop(env, th, rc);
-       th->ta_handle = NULL;
-       th->ta_argno = 0;
-       th->ta_err = 0;
+              dt_obd_name(ta->ta_dev), i, ta->ta_argno, rc);
+       out_trans_stop(env, ta, rc);
+       ta->ta_handle = NULL;
+       ta->ta_argno = 0;
+       ta->ta_err = 0;
 
        RETURN(rc);
 }
@@ -239,8 +219,8 @@ int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
                       struct tx_arg *arg)
 {
-       struct dt_object *dt_obj = arg->object;
-       int rc;
+       struct dt_object        *dt_obj = arg->object;
+       int                      rc;
 
        CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
               dt_obd_name(th->th_dev),
@@ -265,19 +245,19 @@ int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
 static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
                           struct lu_attr *attr, struct lu_fid *parent_fid,
                           struct dt_object_format *dof,
-                          struct thandle_exec_args *th,
+                          struct thandle_exec_args *ta,
                           struct update_reply *reply,
                           int index, char *file, int line)
 {
        struct tx_arg *arg;
 
-       LASSERT(th->ta_handle != NULL);
-       th->ta_err = dt_declare_create(env, obj, attr, NULL, dof,
-                                      th->ta_handle);
-       if (th->ta_err != 0)
-               return th->ta_err;
+       LASSERT(ta->ta_handle != NULL);
+       ta->ta_err = dt_declare_create(env, obj, attr, NULL, dof,
+                                      ta->ta_handle);
+       if (ta->ta_err != 0)
+               return ta->ta_err;
 
-       arg = tx_add_exec(th, out_tx_create_exec, out_tx_create_undo, file,
+       arg = tx_add_exec(ta, out_tx_create_exec, out_tx_create_undo, file,
                          line);
        LASSERT(arg);
 
@@ -295,13 +275,14 @@ static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
        return 0;
 }
 
-static int out_create(struct out_thread_info *info)
+static int out_create(struct tgt_session_info *tsi)
 {
-       struct update           *update = info->mti_u.update.mti_update;
-       struct dt_object        *obj = info->mti_u.update.mti_dt_object;
-       struct dt_object_format *dof = &info->mti_u.update.mti_update_dof;
-       struct obdo             *lobdo = &info->mti_u.update.mti_obdo;
-       struct lu_attr          *attr = &info->mti_attr.ma_attr;
+       struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
+       struct update           *update = tti->tti_u.update.tti_update;
+       struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
+       struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof;
+       struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
+       struct lu_attr          *attr = &tti->tti_attr;
        struct lu_fid           *fid = NULL;
        struct obdo             *wobdo;
        int                     size;
@@ -312,7 +293,7 @@ static int out_create(struct out_thread_info *info)
        wobdo = update_param_buf(update, 0, &size);
        if (wobdo == NULL || size != sizeof(*wobdo)) {
                CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
-                      mdt_obd_name(info->mti_mdt), -EPROTO);
+                      tgt_name(tsi->tsi_tgt), -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
@@ -327,14 +308,13 @@ static int out_create(struct out_thread_info *info)
                fid = update_param_buf(update, 1, &size);
                if (fid == NULL || size != sizeof(*fid)) {
                        CERROR("%s: invalid fid: rc = %d\n",
-                              mdt_obd_name(info->mti_mdt), -EPROTO);
+                              tgt_name(tsi->tsi_tgt), -EPROTO);
                        RETURN(err_serious(-EPROTO));
                }
                fid_le_to_cpu(fid, fid);
                if (!fid_is_sane(fid)) {
                        CERROR("%s: invalid fid "DFID": rc = %d\n",
-                              mdt_obd_name(info->mti_mdt),
-                              PFID(fid), -EPROTO);
+                              tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
                        RETURN(err_serious(-EPROTO));
                }
        }
@@ -342,10 +322,10 @@ static int out_create(struct out_thread_info *info)
        if (lu_object_exists(&obj->do_lu))
                RETURN(-EEXIST);
 
-       rc = out_tx_create(info->mti_env, obj, attr, fid, dof,
-                          &info->mti_handle,
-                          info->mti_u.update.mti_update_reply,
-                          info->mti_u.update.mti_update_reply_index);
+       rc = out_tx_create(tsi->tsi_env, obj, attr, fid, dof,
+                          &tti->tti_tea,
+                          tti->tti_u.update.tti_update_reply,
+                          tti->tti_u.update.tti_update_reply_index);
 
        RETURN(rc);
 }
@@ -370,8 +350,7 @@ static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
               PFID(lu_object_fid(&dt_obj->do_lu)));
 
        dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
-       rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr,
-                        th, NULL);
+       rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr, th, NULL);
        dt_write_unlock(env, dt_obj);
 
        CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
@@ -407,22 +386,23 @@ static int __out_tx_attr_set(const struct lu_env *env,
        return 0;
 }
 
-static int out_attr_set(struct out_thread_info *info)
+static int out_attr_set(struct tgt_session_info *tsi)
 {
-       struct update           *update = info->mti_u.update.mti_update;
-       struct lu_attr          *attr = &info->mti_attr.ma_attr;
-       struct dt_object        *obj = info->mti_u.update.mti_dt_object;
-       struct obdo             *lobdo = &info->mti_u.update.mti_obdo;
+       struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
+       struct update           *update = tti->tti_u.update.tti_update;
+       struct lu_attr          *attr = &tti->tti_attr;
+       struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
+       struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
        struct obdo             *wobdo;
-       int                     size;
-       int                     rc;
+       int                      size;
+       int                      rc;
 
        ENTRY;
 
        wobdo = update_param_buf(update, 0, &size);
        if (wobdo == NULL || size != sizeof(*wobdo)) {
                CERROR("%s: empty obdo in the update: rc = %d\n",
-                      mdt_obd_name(info->mti_mdt), -EPROTO);
+                      tgt_name(tsi->tsi_tgt), -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
@@ -432,19 +412,20 @@ static int out_attr_set(struct out_thread_info *info)
        lustre_get_wire_obdo(NULL, lobdo, wobdo);
        la_from_obdo(attr, lobdo, lobdo->o_valid);
 
-       rc = out_tx_attr_set(info->mti_env, obj, attr, &info->mti_handle,
-                            info->mti_u.update.mti_update_reply,
-                            info->mti_u.update.mti_update_reply_index);
+       rc = out_tx_attr_set(tsi->tsi_env, obj, attr, &tti->tti_tea,
+                            tti->tti_u.update.tti_update_reply,
+                            tti->tti_u.update.tti_update_reply_index);
 
        RETURN(rc);
 }
 
-static int out_attr_get(struct out_thread_info *info)
+static int out_attr_get(struct tgt_session_info *tsi)
 {
-       struct obdo             *obdo = &info->mti_u.update.mti_obdo;
-       const struct lu_env     *env = info->mti_env;
-       struct lu_attr          *la = &info->mti_attr.ma_attr;
-       struct dt_object        *obj = info->mti_u.update.mti_dt_object;
+       const struct lu_env     *env = tsi->tsi_env;
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct obdo             *obdo = &tti->tti_u.update.tti_obdo;
+       struct lu_attr          *la = &tti->tti_attr;
+       struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
        int                     rc;
 
        ENTRY;
@@ -464,8 +445,8 @@ static int out_attr_get(struct out_thread_info *info)
         */
        la->la_flags = 0;
        if (S_ISDIR(la->la_mode)) {
-               struct dt_it     *it;
-               const struct dt_it_ops *iops;
+               struct dt_it            *it;
+               const struct dt_it_ops  *iops;
 
                if (!dt_try_as_dir(env, obj))
                        GOTO(out_unlock, rc = -ENOTDIR);
@@ -502,31 +483,32 @@ out_unlock:
        dt_read_unlock(env, obj);
 
        CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
-              mdt_obd_name(info->mti_mdt),
-              info->mti_u.update.mti_update_reply, 0, rc);
+              tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
+              0, rc);
 
-       update_insert_reply(info->mti_u.update.mti_update_reply, obdo,
+       update_insert_reply(tti->tti_u.update.tti_update_reply, obdo,
                            sizeof(*obdo), 0, rc);
        RETURN(rc);
 }
 
-static int out_xattr_get(struct out_thread_info *info)
+static int out_xattr_get(struct tgt_session_info *tsi)
 {
-       struct update           *update = info->mti_u.update.mti_update;
-       const struct lu_env     *env = info->mti_env;
-       struct lu_buf           *lbuf = &info->mti_buf;
-       struct update_reply     *reply = info->mti_u.update.mti_update_reply;
-       struct dt_object        *obj = info->mti_u.update.mti_dt_object;
+       const struct lu_env     *env = tsi->tsi_env;
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct update           *update = tti->tti_u.update.tti_update;
+       struct lu_buf           *lbuf = &tti->tti_buf;
+       struct update_reply     *reply = tti->tti_u.update.tti_update_reply;
+       struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
        char                    *name;
        void                    *ptr;
-       int                     rc;
+       int                      rc;
 
        ENTRY;
 
        name = (char *)update_param_buf(update, 0, NULL);
        if (name == NULL) {
                CERROR("%s: empty name for xattr get: rc = %d\n",
-                      mdt_obd_name(info->mti_mdt), -EPROTO);
+                      tgt_name(tsi->tsi_tgt), -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
@@ -550,7 +532,7 @@ static int out_xattr_get(struct out_thread_info *info)
        lbuf->lb_len = rc;
        rc = 0;
        CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
-              mdt_obd_name(info->mti_mdt), PFID(lu_object_fid(&obj->do_lu)),
+              tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
               name, (int)lbuf->lb_len);
 out:
        *(int *)ptr = rc;
@@ -558,13 +540,14 @@ out:
        RETURN(rc);
 }
 
-static int out_index_lookup(struct out_thread_info *info)
+static int out_index_lookup(struct tgt_session_info *tsi)
 {
-       struct update           *update = info->mti_u.update.mti_update;
-       const struct lu_env     *env = info->mti_env;
-       struct dt_object        *obj = info->mti_u.update.mti_dt_object;
+       const struct lu_env     *env = tsi->tsi_env;
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct update           *update = tti->tti_u.update.tti_update;
+       struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
        char                    *name;
-       int                     rc;
+       int                      rc;
 
        ENTRY;
 
@@ -574,7 +557,7 @@ static int out_index_lookup(struct out_thread_info *info)
        name = (char *)update_param_buf(update, 0, NULL);
        if (name == NULL) {
                CERROR("%s: empty name for lookup: rc = %d\n",
-                      mdt_obd_name(info->mti_mdt), -EPROTO);
+                      tgt_name(tsi->tsi_tgt), -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
@@ -582,7 +565,7 @@ static int out_index_lookup(struct out_thread_info *info)
        if (!dt_try_as_dir(env, obj))
                GOTO(out_unlock, rc = -ENOTDIR);
 
-       rc = dt_lookup(env, obj, (struct dt_rec *)&info->mti_tmp_fid1,
+       rc = dt_lookup(env, obj, (struct dt_rec *)&tti->tti_fid1,
                (struct dt_key *)name, NULL);
 
        if (rc < 0)
@@ -593,19 +576,18 @@ static int out_index_lookup(struct out_thread_info *info)
 
        CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
               PFID(lu_object_fid(&obj->do_lu)), name,
-              PFID(&info->mti_tmp_fid1), rc);
-       fid_cpu_to_le(&info->mti_tmp_fid1, &info->mti_tmp_fid1);
+              PFID(&tti->tti_fid1), rc);
+       fid_cpu_to_le(&tti->tti_fid1, &tti->tti_fid1);
 
 out_unlock:
        dt_read_unlock(env, obj);
 
        CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
-              mdt_obd_name(info->mti_mdt),
-              info->mti_u.update.mti_update_reply, 0, rc);
+              tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
+              0, rc);
 
-       update_insert_reply(info->mti_u.update.mti_update_reply,
-                           &info->mti_tmp_fid1, sizeof(info->mti_tmp_fid1),
-                           0, rc);
+       update_insert_reply(tti->tti_u.update.tti_update_reply,
+                           &tti->tti_fid1, sizeof(tti->tti_fid1), 0, rc);
        RETURN(rc);
 }
 
@@ -644,19 +626,19 @@ static int __out_tx_xattr_set(const struct lu_env *env,
                              struct dt_object *dt_obj,
                              const struct lu_buf *buf,
                              const char *name, int flags,
-                             struct thandle_exec_args *th,
+                             struct thandle_exec_args *ta,
                              struct update_reply *reply, int index,
                              char *file, int line)
 {
        struct tx_arg           *arg;
 
-       LASSERT(th->ta_handle != NULL);
-       th->ta_err = dt_declare_xattr_set(env, dt_obj, buf, name,
-                                         flags, th->ta_handle);
-       if (th->ta_err != 0)
-               return th->ta_err;
+       LASSERT(ta->ta_handle != NULL);
+       ta->ta_err = dt_declare_xattr_set(env, dt_obj, buf, name,
+                                         flags, ta->ta_handle);
+       if (ta->ta_err != 0)
+               return ta->ta_err;
 
-       arg = tx_add_exec(th, out_tx_xattr_set_exec, NULL, file, line);
+       arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
        LASSERT(arg);
        lu_object_get(&dt_obj->do_lu);
        arg->object = dt_obj;
@@ -669,30 +651,31 @@ static int __out_tx_xattr_set(const struct lu_env *env,
        return 0;
 }
 
-static int out_xattr_set(struct out_thread_info *info)
+static int out_xattr_set(struct tgt_session_info *tsi)
 {
-       struct update           *update = info->mti_u.update.mti_update;
-       struct dt_object        *obj = info->mti_u.update.mti_dt_object;
-       struct lu_buf           *lbuf = &info->mti_buf;
+       struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
+       struct update           *update = tti->tti_u.update.tti_update;
+       struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
+       struct lu_buf           *lbuf = &tti->tti_buf;
        char                    *name;
        char                    *buf;
        char                    *tmp;
-       int                     buf_len = 0;
-       int                     flag;
-       int                     rc;
+       int                      buf_len = 0;
+       int                      flag;
+       int                      rc;
        ENTRY;
 
        name = update_param_buf(update, 0, NULL);
        if (name == NULL) {
                CERROR("%s: empty name for xattr set: rc = %d\n",
-                      mdt_obd_name(info->mti_mdt), -EPROTO);
+                      tgt_name(tsi->tsi_tgt), -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
        buf = (char *)update_param_buf(update, 1, &buf_len);
        if (buf == NULL || buf_len == 0) {
                CERROR("%s: empty buf for xattr set: rc = %d\n",
-                      mdt_obd_name(info->mti_mdt), -EPROTO);
+                      tgt_name(tsi->tsi_tgt), -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
@@ -702,16 +685,16 @@ static int out_xattr_set(struct out_thread_info *info)
        tmp = (char *)update_param_buf(update, 2, NULL);
        if (tmp == NULL) {
                CERROR("%s: empty flag for xattr set: rc = %d\n",
-                      mdt_obd_name(info->mti_mdt), -EPROTO);
+                      tgt_name(tsi->tsi_tgt), -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
        flag = le32_to_cpu(*(int *)tmp);
 
-       rc = out_tx_xattr_set(info->mti_env, obj, lbuf, name, flag,
-                             &info->mti_handle,
-                             info->mti_u.update.mti_update_reply,
-                             info->mti_u.update.mti_update_reply_index);
+       rc = out_tx_xattr_set(tsi->tsi_env, obj, lbuf, name, flag,
+                             &tti->tti_tea,
+                             tti->tti_u.update.tti_update_reply,
+                             tti->tti_u.update.tti_update_reply_index);
        RETURN(rc);
 }
 
@@ -764,18 +747,18 @@ static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
 
 static int __out_tx_ref_add(const struct lu_env *env,
                            struct dt_object *dt_obj,
-                           struct thandle_exec_args *th,
+                           struct thandle_exec_args *ta,
                            struct update_reply *reply,
                            int index, char *file, int line)
 {
-       struct tx_arg           *arg;
+       struct tx_arg   *arg;
 
-       LASSERT(th->ta_handle != NULL);
-       th->ta_err = dt_declare_ref_add(env, dt_obj, th->ta_handle);
-       if (th->ta_err != 0)
-               return th->ta_err;
+       LASSERT(ta->ta_handle != NULL);
+       ta->ta_err = dt_declare_ref_add(env, dt_obj, ta->ta_handle);
+       if (ta->ta_err != 0)
+               return ta->ta_err;
 
-       arg = tx_add_exec(th, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
+       arg = tx_add_exec(ta, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
                          line);
        LASSERT(arg);
        lu_object_get(&dt_obj->do_lu);
@@ -788,24 +771,25 @@ static int __out_tx_ref_add(const struct lu_env *env,
 /**
  * increase ref of the object
  **/
-static int out_ref_add(struct out_thread_info *info)
+static int out_ref_add(struct tgt_session_info *tsi)
 {
-       struct dt_object  *obj = info->mti_u.update.mti_dt_object;
-       int               rc;
+       struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
+       struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
+       int                      rc;
 
        ENTRY;
 
-       rc = out_tx_ref_add(info->mti_env, obj, &info->mti_handle,
-                           info->mti_u.update.mti_update_reply,
-                           info->mti_u.update.mti_update_reply_index);
+       rc = out_tx_ref_add(tsi->tsi_env, obj, &tti->tti_tea,
+                           tti->tti_u.update.tti_update_reply,
+                           tti->tti_u.update.tti_update_reply_index);
        RETURN(rc);
 }
 
 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
                               struct tx_arg *arg)
 {
-       struct dt_object *dt_obj = arg->object;
-       int rc;
+       struct dt_object        *dt_obj = arg->object;
+       int                      rc;
 
        rc = out_obj_ref_del(env, dt_obj, th);
 
@@ -825,18 +809,18 @@ static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
 
 static int __out_tx_ref_del(const struct lu_env *env,
                            struct dt_object *dt_obj,
-                           struct thandle_exec_args *th,
+                           struct thandle_exec_args *ta,
                            struct update_reply *reply,
                            int index, char *file, int line)
 {
-       struct tx_arg           *arg;
+       struct tx_arg   *arg;
 
-       LASSERT(th->ta_handle != NULL);
-       th->ta_err = dt_declare_ref_del(env, dt_obj, th->ta_handle);
-       if (th->ta_err != 0)
-               return th->ta_err;
+       LASSERT(ta->ta_handle != NULL);
+       ta->ta_err = dt_declare_ref_del(env, dt_obj, ta->ta_handle);
+       if (ta->ta_err != 0)
+               return ta->ta_err;
 
-       arg = tx_add_exec(th, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
+       arg = tx_add_exec(ta, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
                          line);
        LASSERT(arg);
        lu_object_get(&dt_obj->do_lu);
@@ -846,19 +830,20 @@ static int __out_tx_ref_del(const struct lu_env *env,
        return 0;
 }
 
-static int out_ref_del(struct out_thread_info *info)
+static int out_ref_del(struct tgt_session_info *tsi)
 {
-       struct dt_object  *obj = info->mti_u.update.mti_dt_object;
-       int               rc;
+       struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
+       struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
+       int                      rc;
 
        ENTRY;
 
        if (!lu_object_exists(&obj->do_lu))
                RETURN(-ENOENT);
 
-       rc = out_tx_ref_del(info->mti_env, obj, &info->mti_handle,
-                           info->mti_u.update.mti_update_reply,
-                           info->mti_u.update.mti_update_reply_index);
+       rc = out_tx_ref_del(tsi->tsi_env, obj, &tti->tti_tea,
+                           tti->tti_u.update.tti_update_reply,
+                           tti->tti_u.update.tti_update_reply_index);
        RETURN(rc);
 }
 
@@ -931,29 +916,29 @@ static int out_tx_index_insert_undo(const struct lu_env *env,
 static int __out_tx_index_insert(const struct lu_env *env,
                                 struct dt_object *dt_obj,
                                 char *name, struct lu_fid *fid,
-                                struct thandle_exec_args *th,
+                                struct thandle_exec_args *ta,
                                 struct update_reply *reply,
                                 int index, char *file, int line)
 {
        struct tx_arg *arg;
 
-       LASSERT(th->ta_handle != NULL);
+       LASSERT(ta->ta_handle != NULL);
 
        if (lu_object_exists(&dt_obj->do_lu)) {
                if (dt_try_as_dir(env, dt_obj) == 0) {
-                       th->ta_err = -ENOTDIR;
-                       return th->ta_err;
+                       ta->ta_err = -ENOTDIR;
+                       return ta->ta_err;
                }
-               th->ta_err = dt_declare_insert(env, dt_obj,
+               ta->ta_err = dt_declare_insert(env, dt_obj,
                                               (struct dt_rec *)fid,
                                               (struct dt_key *)name,
-                                              th->ta_handle);
+                                              ta->ta_handle);
        }
 
-       if (th->ta_err != 0)
-               return th->ta_err;
+       if (ta->ta_err != 0)
+               return ta->ta_err;
 
-       arg = tx_add_exec(th, out_tx_index_insert_exec,
+       arg = tx_add_exec(ta, out_tx_index_insert_exec,
                          out_tx_index_insert_undo, file,
                          line);
        LASSERT(arg);
@@ -967,42 +952,43 @@ static int __out_tx_index_insert(const struct lu_env *env,
        return 0;
 }
 
-static int out_index_insert(struct out_thread_info *info)
+static int out_index_insert(struct tgt_session_info *tsi)
 {
-       struct update     *update = info->mti_u.update.mti_update;
-       struct dt_object  *obj = info->mti_u.update.mti_dt_object;
+       struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
+       struct update     *update = tti->tti_u.update.tti_update;
+       struct dt_object  *obj = tti->tti_u.update.tti_dt_object;
        struct lu_fid     *fid;
        char              *name;
-       int               rc = 0;
-       int               size;
+       int                rc = 0;
+       int                size;
+
        ENTRY;
 
        name = (char *)update_param_buf(update, 0, NULL);
        if (name == NULL) {
                CERROR("%s: empty name for index insert: rc = %d\n",
-                      mdt_obd_name(info->mti_mdt), -EPROTO);
+                      tgt_name(tsi->tsi_tgt), -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
        fid = (struct lu_fid *)update_param_buf(update, 1, &size);
        if (fid == NULL || size != sizeof(*fid)) {
                CERROR("%s: invalid fid: rc = %d\n",
-                      mdt_obd_name(info->mti_mdt), -EPROTO);
+                      tgt_name(tsi->tsi_tgt), -EPROTO);
                       RETURN(err_serious(-EPROTO));
        }
 
        fid_le_to_cpu(fid, fid);
        if (!fid_is_sane(fid)) {
                CERROR("%s: invalid FID "DFID": rc = %d\n",
-                      mdt_obd_name(info->mti_mdt), PFID(fid),
-                      -EPROTO);
+                      tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
-       rc = out_tx_index_insert(info->mti_env, obj, name, fid,
-                                &info->mti_handle,
-                                info->mti_u.update.mti_update_reply,
-                                info->mti_u.update.mti_update_reply_index);
+       rc = out_tx_index_insert(tsi->tsi_env, obj, name, fid,
+                                &tti->tti_tea,
+                                tti->tti_u.update.tti_update_reply,
+                                tti->tti_u.update.tti_update_reply_index);
        RETURN(rc);
 }
 
@@ -1033,25 +1019,25 @@ static int out_tx_index_delete_undo(const struct lu_env *env,
 
 static int __out_tx_index_delete(const struct lu_env *env,
                                 struct dt_object *dt_obj, char *name,
-                                struct thandle_exec_args *th,
+                                struct thandle_exec_args *ta,
                                 struct update_reply *reply,
                                 int index, char *file, int line)
 {
        struct tx_arg *arg;
 
        if (dt_try_as_dir(env, dt_obj) == 0) {
-               th->ta_err = -ENOTDIR;
-               return th->ta_err;
+               ta->ta_err = -ENOTDIR;
+               return ta->ta_err;
        }
 
-       LASSERT(th->ta_handle != NULL);
-       th->ta_err = dt_declare_delete(env, dt_obj,
+       LASSERT(ta->ta_handle != NULL);
+       ta->ta_err = dt_declare_delete(env, dt_obj,
                                       (struct dt_key *)name,
-                                      th->ta_handle);
-       if (th->ta_err != 0)
-               return th->ta_err;
+                                      ta->ta_handle);
+       if (ta->ta_err != 0)
+               return ta->ta_err;
 
-       arg = tx_add_exec(th, out_tx_index_delete_exec,
+       arg = tx_add_exec(ta, out_tx_index_delete_exec,
                          out_tx_index_delete_undo, file,
                          line);
        LASSERT(arg);
@@ -1063,25 +1049,27 @@ static int __out_tx_index_delete(const struct lu_env *env,
        return 0;
 }
 
-static int out_index_delete(struct out_thread_info *info)
+static int out_index_delete(struct tgt_session_info *tsi)
 {
-       struct update           *update = info->mti_u.update.mti_update;
-       struct dt_object        *obj = info->mti_u.update.mti_dt_object;
+       struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
+       struct update           *update = tti->tti_u.update.tti_update;
+       struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
        char                    *name;
-       int                     rc = 0;
+       int                      rc = 0;
 
        if (!lu_object_exists(&obj->do_lu))
                RETURN(-ENOENT);
+
        name = (char *)update_param_buf(update, 0, NULL);
        if (name == NULL) {
                CERROR("%s: empty name for index delete: rc = %d\n",
-                      mdt_obd_name(info->mti_mdt), -EPROTO);
+                      tgt_name(tsi->tsi_tgt), -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
-       rc = out_tx_index_delete(info->mti_env, obj, name, &info->mti_handle,
-                                info->mti_u.update.mti_update_reply,
-                                info->mti_u.update.mti_update_reply_index);
+       rc = out_tx_index_delete(tsi->tsi_env, obj, name, &tti->tti_tea,
+                                tti->tti_u.update.tti_update_reply,
+                                tti->tti_u.update.tti_update_reply_index);
        RETURN(rc);
 }
 
@@ -1110,18 +1098,18 @@ static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
 }
 
 static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
-                            struct thandle_exec_args *th,
+                            struct thandle_exec_args *ta,
                             struct update_reply *reply,
                             int index, char *file, int line)
 {
        struct tx_arg *arg;
 
-       LASSERT(th->ta_handle != NULL);
-       th->ta_err = dt_declare_destroy(env, dt_obj, th->ta_handle);
-       if (th->ta_err)
-               return th->ta_err;
+       LASSERT(ta->ta_handle != NULL);
+       ta->ta_err = dt_declare_destroy(env, dt_obj, ta->ta_handle);
+       if (ta->ta_err)
+               return ta->ta_err;
 
-       arg = tx_add_exec(th, out_tx_destroy_exec, out_tx_destroy_undo,
+       arg = tx_add_exec(ta, out_tx_destroy_exec, out_tx_destroy_undo,
                          file, line);
        LASSERT(arg);
        lu_object_get(&dt_obj->do_lu);
@@ -1131,126 +1119,134 @@ static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
        return 0;
 }
 
-static int out_destroy(struct out_thread_info *info)
+static int out_destroy(struct tgt_session_info *tsi)
 {
-       struct update           *update = info->mti_u.update.mti_update;
-       struct dt_object        *obj = info->mti_u.update.mti_dt_object;
+       struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
+       struct update           *update = tti->tti_u.update.tti_update;
+       struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
        struct lu_fid           *fid;
-       int                     rc;
+       int                      rc;
        ENTRY;
 
        fid = &update->u_fid;
        fid_le_to_cpu(fid, fid);
        if (!fid_is_sane(fid)) {
                CERROR("%s: invalid FID "DFID": rc = %d\n",
-                      mdt_obd_name(info->mti_mdt), PFID(fid), -EPROTO);
+                      tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
        if (!lu_object_exists(&obj->do_lu))
                RETURN(-ENOENT);
 
-       rc = out_tx_destroy(info->mti_env, obj, &info->mti_handle,
-                           info->mti_u.update.mti_update_reply,
-                           info->mti_u.update.mti_update_reply_index);
+       rc = out_tx_destroy(tsi->tsi_env, obj, &tti->tti_tea,
+                           tti->tti_u.update.tti_update_reply,
+                           tti->tti_u.update.tti_update_reply_index);
 
        RETURN(rc);
 }
 
-#define DEF_OUT_HNDL(opc, name, fail_id, flags, fn)     \
+#define DEF_OUT_HNDL(opc, name, flags, fn)     \
 [opc - OBJ_CREATE] = {                                 \
-       .mh_name    = name,                             \
-       .mh_fail_id = fail_id,                          \
-       .mh_opc     = opc,                              \
-       .mh_flags   = flags,                            \
-       .mh_act     = fn,                               \
-       .mh_fmt     = NULL                              \
+       .th_name    = name,                             \
+       .th_fail_id = 0,                                \
+       .th_opc     = opc,                              \
+       .th_flags   = flags,                            \
+       .th_act     = fn,                               \
+       .th_fmt     = NULL,                             \
+       .th_version = 0,                                \
 }
 
 #define out_handler mdt_handler
-static struct out_handler out_update_ops[] = {
-       DEF_OUT_HNDL(OBJ_CREATE, "obj_create", 0, MUTABOR | HABEO_REFERO,
+static struct tgt_handler out_update_ops[] = {
+       DEF_OUT_HNDL(OBJ_CREATE, "obj_create", MUTABOR | HABEO_REFERO,
                     out_create),
-       DEF_OUT_HNDL(OBJ_DESTROY, "obj_create", 0, MUTABOR | HABEO_REFERO,
+       DEF_OUT_HNDL(OBJ_DESTROY, "obj_create", MUTABOR | HABEO_REFERO,
                     out_destroy),
-       DEF_OUT_HNDL(OBJ_REF_ADD, "obj_ref_add", 0, MUTABOR | HABEO_REFERO,
+       DEF_OUT_HNDL(OBJ_REF_ADD, "obj_ref_add", MUTABOR | HABEO_REFERO,
                     out_ref_add),
-       DEF_OUT_HNDL(OBJ_REF_DEL, "obj_ref_del", 0, MUTABOR | HABEO_REFERO,
+       DEF_OUT_HNDL(OBJ_REF_DEL, "obj_ref_del", MUTABOR | HABEO_REFERO,
                     out_ref_del),
-       DEF_OUT_HNDL(OBJ_ATTR_SET, "obj_attr_set", 0,  MUTABOR | HABEO_REFERO,
+       DEF_OUT_HNDL(OBJ_ATTR_SET, "obj_attr_set",  MUTABOR | HABEO_REFERO,
                     out_attr_set),
-       DEF_OUT_HNDL(OBJ_ATTR_GET, "obj_attr_get", 0,  HABEO_REFERO,
+       DEF_OUT_HNDL(OBJ_ATTR_GET, "obj_attr_get",  HABEO_REFERO,
                     out_attr_get),
-       DEF_OUT_HNDL(OBJ_XATTR_SET, "obj_xattr_set", 0, MUTABOR | HABEO_REFERO,
+       DEF_OUT_HNDL(OBJ_XATTR_SET, "obj_xattr_set", MUTABOR | HABEO_REFERO,
                     out_xattr_set),
-       DEF_OUT_HNDL(OBJ_XATTR_GET, "obj_xattr_get", 0, HABEO_REFERO,
+       DEF_OUT_HNDL(OBJ_XATTR_GET, "obj_xattr_get", HABEO_REFERO,
                     out_xattr_get),
-       DEF_OUT_HNDL(OBJ_INDEX_LOOKUP, "obj_index_lookup", 0, HABEO_REFERO,
+       DEF_OUT_HNDL(OBJ_INDEX_LOOKUP, "obj_index_lookup", HABEO_REFERO,
                     out_index_lookup),
-       DEF_OUT_HNDL(OBJ_INDEX_INSERT, "obj_index_insert", 0,
+       DEF_OUT_HNDL(OBJ_INDEX_INSERT, "obj_index_insert",
                     MUTABOR | HABEO_REFERO, out_index_insert),
-       DEF_OUT_HNDL(OBJ_INDEX_DELETE, "obj_index_delete", 0,
+       DEF_OUT_HNDL(OBJ_INDEX_DELETE, "obj_index_delete",
                     MUTABOR | HABEO_REFERO, out_index_delete),
 };
 
-#define out_opc_slice mdt_opc_slice
-static struct out_opc_slice out_handlers[] = {
-       {
-               .mos_opc_start = OBJ_CREATE,
-               .mos_opc_end   = OBJ_LAST,
-               .mos_hs = out_update_ops
-       },
-};
+struct tgt_handler *out_handler_find(__u32 opc)
+{
+       struct tgt_handler *h;
+
+       h = NULL;
+       if (OBJ_CREATE <= opc && opc < OBJ_LAST) {
+               h = &out_update_ops[opc - OBJ_CREATE];
+               LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
+                        h->th_opc, opc);
+       } else {
+               h = NULL; /* unsupported opc */
+       }
+       return h;
+}
 
 /**
  * Object updates between Targets. Because all the updates has been
- * dis-assemblied into object updates in master MDD layer, so out
- * will skip MDD layer, and call OSD API directly to execute these
- * updates.
+ * dis-assemblied into object updates at sender side, so OUT will
+ * call OSD API directly to execute these updates.
  *
- * In phase I, all of the updates in the request need to be executed
+ * In DNE phase I all of the updates in the request need to be executed
  * in one transaction, and the transaction has to be synchronously.
  *
  * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
  * format.
  */
-int out_handle(struct out_thread_info *info)
+int out_handle(struct tgt_session_info *tsi)
 {
-       struct thandle_exec_args        *th = &info->mti_handle;
-       struct req_capsule              *pill = info->mti_pill;
-       struct mdt_device               *mdt = info->mti_mdt;
-       struct dt_device                *dt = mdt->mdt_bottom;
-       const struct lu_env             *env = info->mti_env;
+       const struct lu_env             *env = tsi->tsi_env;
+       struct tgt_thread_info          *tti = tgt_th_info(env);
+       struct thandle_exec_args        *ta = &tti->tti_tea;
+       struct req_capsule              *pill = tsi->tsi_pill;
+       struct dt_device                *dt = tsi->tsi_tgt->lut_bottom;
        struct update_buf               *ubuf;
        struct update                   *update;
        struct update_reply             *update_reply;
-       int                             bufsize;
-       int                             count;
-       int                             old_batchid = -1;
-       unsigned                        off;
-       int                             i;
-       int                             rc = 0;
-       int                             rc1 = 0;
+       int                              bufsize;
+       int                              count;
+       int                              old_batchid = -1;
+       unsigned                         off;
+       int                              i;
+       int                              rc = 0;
+       int                              rc1 = 0;
+
        ENTRY;
 
        req_capsule_set(pill, &RQF_UPDATE_OBJ);
        bufsize = req_capsule_get_size(pill, &RMF_UPDATE, RCL_CLIENT);
        if (bufsize != UPDATE_BUFFER_SIZE) {
                CERROR("%s: invalid bufsize %d: rc = %d\n",
-                      mdt_obd_name(mdt), bufsize, -EPROTO);
+                      tgt_name(tsi->tsi_tgt), bufsize, -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
        ubuf = req_capsule_client_get(pill, &RMF_UPDATE);
        if (ubuf == NULL) {
-               CERROR("%s: No buf!: rc = %d\n", mdt_obd_name(mdt),
+               CERROR("%s: No buf!: rc = %d\n", tgt_name(tsi->tsi_tgt),
                       -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
        if (le32_to_cpu(ubuf->ub_magic) != UPDATE_BUFFER_MAGIC) {
                CERROR("%s: invalid magic %x expect %x: rc = %d\n",
-                      mdt_obd_name(mdt), le32_to_cpu(ubuf->ub_magic),
+                      tgt_name(tsi->tsi_tgt), le32_to_cpu(ubuf->ub_magic),
                       UPDATE_BUFFER_MAGIC, -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
@@ -1258,7 +1254,7 @@ int out_handle(struct out_thread_info *info)
        count = le32_to_cpu(ubuf->ub_count);
        if (count <= 0) {
                CERROR("%s: No update!: rc = %d\n",
-                      mdt_obd_name(mdt), -EPROTO);
+                      tgt_name(tsi->tsi_tgt), -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
@@ -1267,24 +1263,24 @@ int out_handle(struct out_thread_info *info)
        rc = req_capsule_server_pack(pill);
        if (rc != 0) {
                CERROR("%s: Can't pack response: rc = %d\n",
-                      mdt_obd_name(mdt), rc);
+                      tgt_name(tsi->tsi_tgt), rc);
                RETURN(rc);
        }
 
        /* Prepare the update reply buffer */
        update_reply = req_capsule_server_get(pill, &RMF_UPDATE_REPLY);
        update_init_reply_buf(update_reply, count);
-       info->mti_u.update.mti_update_reply = update_reply;
+       tti->tti_u.update.tti_update_reply = update_reply;
 
-       rc = out_tx_start(env, dt, th);
+       rc = out_tx_start(env, dt, ta);
        if (rc != 0)
                RETURN(rc);
 
        /* Walk through updates in the request to execute them synchronously */
        off = cfs_size_round(offsetof(struct update_buf, ub_bufs[0]));
        for (i = 0; i < count; i++) {
-               struct out_handler *h;
-               struct dt_object   *dt_obj;
+               struct tgt_handler      *h;
+               struct dt_object        *dt_obj;
 
                update = (struct update *)((char *)ubuf + off);
                if (old_batchid == -1) {
@@ -1292,11 +1288,11 @@ int out_handle(struct out_thread_info *info)
                } else if (old_batchid != update->u_batchid) {
                        /* Stop the current update transaction,
                         * create a new one */
-                       rc = out_tx_end(env, th);
+                       rc = out_tx_end(env, ta);
                        if (rc != 0)
                                RETURN(rc);
 
-                       rc = out_tx_start(env, dt, th);
+                       rc = out_tx_start(env, dt, ta);
                        if (rc != 0)
                                RETURN(rc);
                        old_batchid = update->u_batchid;
@@ -1305,7 +1301,7 @@ int out_handle(struct out_thread_info *info)
                fid_le_to_cpu(&update->u_fid, &update->u_fid);
                if (!fid_is_sane(&update->u_fid)) {
                        CERROR("%s: invalid FID "DFID": rc = %d\n",
-                              mdt_obd_name(mdt), PFID(&update->u_fid),
+                              tgt_name(tsi->tsi_tgt), PFID(&update->u_fid),
                               -EPROTO);
                        GOTO(out, rc = err_serious(-EPROTO));
                }
@@ -1314,16 +1310,16 @@ int out_handle(struct out_thread_info *info)
                if (IS_ERR(dt_obj))
                        GOTO(out, rc = PTR_ERR(dt_obj));
 
-               info->mti_u.update.mti_dt_object = dt_obj;
-               info->mti_u.update.mti_update = update;
-               info->mti_u.update.mti_update_reply_index = i;
+               tti->tti_u.update.tti_dt_object = dt_obj;
+               tti->tti_u.update.tti_update = update;
+               tti->tti_u.update.tti_update_reply_index = i;
 
-               h = mdt_handler_find(update->u_type, out_handlers);
+               h = out_handler_find(update->u_type);
                if (likely(h != NULL)) {
                        /* For real modification RPC, check if the update
                         * has been executed */
-                       if (h->mh_flags & MUTABOR) {
-                               struct ptlrpc_request *req = mdt_info_req(info);
+                       if (h->th_flags & MUTABOR) {
+                               struct ptlrpc_request *req = tgt_ses_req(tsi);
 
                                if (out_check_resent(env, dt, dt_obj, req,
                                                     out_reconstruct,
@@ -1331,10 +1327,10 @@ int out_handle(struct out_thread_info *info)
                                        GOTO(next, rc);
                        }
 
-                       rc = h->mh_act(info);
+                       rc = h->th_act(tsi);
                } else {
                        CERROR("%s: The unsupported opc: 0x%x\n",
-                              mdt_obd_name(mdt), update->u_type);
+                              tgt_name(tsi->tsi_tgt), update->u_type);
                        lu_object_put(env, &dt_obj->do_lu);
                        GOTO(out, rc = -ENOTSUPP);
                }
@@ -1345,7 +1341,14 @@ next:
                off += cfs_size_round(update_size(update));
        }
 out:
-       rc1 = out_tx_end(env, th);
-       rc = rc == 0 ? rc1 : rc;
+       rc1 = out_tx_end(env, ta);
+       if (rc == 0)
+               rc = rc1;
        RETURN(rc);
 }
+
+struct tgt_handler tgt_out_handlers[] = {
+TGT_UPDATE_HDL(MUTABOR,        UPDATE_OBJ,     out_handle),
+};
+EXPORT_SYMBOL(tgt_out_handlers);
+
index 8151fda..0ebfbd0 100644 (file)
@@ -184,6 +184,7 @@ static int tgt_filter_recovery_request(struct ptlrpc_request *req,
        switch (lustre_msg_get_opc(req->rq_reqmsg)) {
        case MDS_DISCONNECT:
        case OST_DISCONNECT:
+       case OBD_IDX_READ:
                *process = 1;
                RETURN(0);
        case MDS_CLOSE:
@@ -191,6 +192,7 @@ static int tgt_filter_recovery_request(struct ptlrpc_request *req,
        case MDS_SYNC: /* used in unmounting */
        case OBD_PING:
        case MDS_REINT:
+       case UPDATE_OBJ:
        case SEQ_QUERY:
        case FLD_QUERY:
        case LDLM_ENQUEUE:
@@ -215,23 +217,15 @@ int tgt_handle_recovery(struct ptlrpc_request *req, int reply_fail_id)
        ENTRY;
 
        switch (lustre_msg_get_opc(req->rq_reqmsg)) {
+       case MDS_CONNECT:
+       case OST_CONNECT:
+       case MGS_CONNECT:
        case SEC_CTX_INIT:
        case SEC_CTX_INIT_CONT:
        case SEC_CTX_FINI:
                RETURN(+1);
        }
 
-       if (unlikely(!class_connected_export(req->rq_export))) {
-               CERROR("%s: operation %d on unconnected export from %s\n",
-                      req->rq_export != NULL ?
-                      req->rq_export->exp_obd->obd_name : "?",
-                      lustre_msg_get_opc(req->rq_reqmsg),
-                      libcfs_id2str(req->rq_peer));
-               req->rq_status = -ENOTCONN;
-               target_send_reply(req, -ENOTCONN, reply_fail_id);
-               RETURN(0);
-       }
-
        if (!req->rq_export->exp_obd->obd_replayable)
                RETURN(+1);
 
@@ -276,6 +270,7 @@ int tgt_handle_recovery(struct ptlrpc_request *req, int reply_fail_id)
 int tgt_request_handle(struct ptlrpc_request *req)
 {
        struct tgt_session_info *tsi = tgt_ses_info(req->rq_svc_thread->t_env);
+
        struct lustre_msg       *msg = req->rq_reqmsg;
        struct tgt_handler      *h;
        struct tgt_opc_slice    *s;
@@ -303,17 +298,14 @@ int tgt_request_handle(struct ptlrpc_request *req)
                }
        }
 
-       /* this should be assertion actually, but keep it reporting error
-        * for unified target development time */
-       if (req->rq_export == NULL) {
-               CERROR("Request with no export from %s, opcode %u\n",
-                      libcfs_nid2str(req->rq_peer.nid), opc);
-               req->rq_status = -EFAULT;
+       if (unlikely(!class_connected_export(req->rq_export))) {
+               CDEBUG(D_HA, "operation %d on unconnected OST from %s\n",
+                      opc, libcfs_id2str(req->rq_peer));
+               req->rq_status = -ENOTCONN;
                rc = ptlrpc_error(req);
                GOTO(out, rc);
        }
 
-
        tsi->tsi_tgt = tgt = class_exp2tgt(req->rq_export);
        tsi->tsi_exp = req->rq_export;
 
@@ -639,12 +631,145 @@ int tgt_obd_qc_callback(struct tgt_session_info *tsi)
 }
 EXPORT_SYMBOL(tgt_obd_qc_callback);
 
+static int tgt_sendpage(struct tgt_session_info *tsi, struct lu_rdpg *rdpg,
+                       int nob)
+{
+       struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
+       struct ptlrpc_request   *req = tgt_ses_req(tsi);
+       struct obd_export       *exp = req->rq_export;
+       struct ptlrpc_bulk_desc *desc;
+       struct l_wait_info      *lwi = &tti->tti_u.rdpg.tti_wait_info;
+       int                      tmpcount;
+       int                      tmpsize;
+       int                      i;
+       int                      rc;
+
+       ENTRY;
+
+       desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, 1, BULK_PUT_SOURCE,
+                                   MDS_BULK_PORTAL);
+       if (desc == NULL)
+               RETURN(-ENOMEM);
+
+       if (!(exp_connect_flags(exp) & OBD_CONNECT_BRW_SIZE))
+               /* old client requires reply size in it's PAGE_CACHE_SIZE,
+                * which is rdpg->rp_count */
+               nob = rdpg->rp_count;
+
+       for (i = 0, tmpcount = nob; i < rdpg->rp_npages && tmpcount > 0;
+            i++, tmpcount -= tmpsize) {
+               tmpsize = min_t(int, tmpcount, PAGE_CACHE_SIZE);
+               ptlrpc_prep_bulk_page_pin(desc, rdpg->rp_pages[i], 0, tmpsize);
+       }
+
+       LASSERT(desc->bd_nob == nob);
+       rc = target_bulk_io(exp, desc, lwi);
+       ptlrpc_free_bulk_pin(desc);
+       RETURN(rc);
+}
+EXPORT_SYMBOL(tgt_sendpage);
+
+/*
+ * OBD_IDX_READ handler
+ */
+int tgt_obd_idx_read(struct tgt_session_info *tsi)
+{
+       struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
+       struct lu_rdpg          *rdpg = &tti->tti_u.rdpg.tti_rdpg;
+       struct idx_info         *req_ii, *rep_ii;
+       int                      rc, i;
+
+       ENTRY;
+
+       memset(rdpg, 0, sizeof(*rdpg));
+       req_capsule_set(tsi->tsi_pill, &RQF_OBD_IDX_READ);
+
+       /* extract idx_info buffer from request & reply */
+       req_ii = req_capsule_client_get(tsi->tsi_pill, &RMF_IDX_INFO);
+       if (req_ii == NULL || req_ii->ii_magic != IDX_INFO_MAGIC)
+               RETURN(err_serious(-EPROTO));
+
+       rc = req_capsule_server_pack(tsi->tsi_pill);
+       if (rc)
+               RETURN(err_serious(rc));
+
+       rep_ii = req_capsule_server_get(tsi->tsi_pill, &RMF_IDX_INFO);
+       if (rep_ii == NULL)
+               RETURN(err_serious(-EFAULT));
+       rep_ii->ii_magic = IDX_INFO_MAGIC;
+
+       /* extract hash to start with */
+       rdpg->rp_hash = req_ii->ii_hash_start;
+
+       /* extract requested attributes */
+       rdpg->rp_attrs = req_ii->ii_attrs;
+
+       /* check that fid packed in request is valid and supported */
+       if (!fid_is_sane(&req_ii->ii_fid))
+               RETURN(-EINVAL);
+       rep_ii->ii_fid = req_ii->ii_fid;
+
+       /* copy flags */
+       rep_ii->ii_flags = req_ii->ii_flags;
+
+       /* compute number of pages to allocate, ii_count is the number of 4KB
+        * containers */
+       if (req_ii->ii_count <= 0)
+               GOTO(out, rc = -EFAULT);
+       rdpg->rp_count = min_t(unsigned int, req_ii->ii_count << LU_PAGE_SHIFT,
+                              exp_max_brw_size(tsi->tsi_exp));
+       rdpg->rp_npages = (rdpg->rp_count + PAGE_CACHE_SIZE -1) >> PAGE_CACHE_SHIFT;
+
+       /* allocate pages to store the containers */
+       OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof(rdpg->rp_pages[0]));
+       if (rdpg->rp_pages == NULL)
+               GOTO(out, rc = -ENOMEM);
+       for (i = 0; i < rdpg->rp_npages; i++) {
+               rdpg->rp_pages[i] = alloc_page(GFP_IOFS);
+               if (rdpg->rp_pages[i] == NULL)
+                       GOTO(out, rc = -ENOMEM);
+       }
+
+       /* populate pages with key/record pairs */
+       rc = dt_index_read(tsi->tsi_env, tsi->tsi_tgt->lut_bottom, rep_ii, rdpg);
+       if (rc < 0)
+               GOTO(out, rc);
+
+       LASSERTF(rc <= rdpg->rp_count, "dt_index_read() returned more than "
+                "asked %d > %d\n", rc, rdpg->rp_count);
+
+       /* send pages to client */
+       rc = tgt_sendpage(tsi, rdpg, rc);
+       if (rc)
+               GOTO(out, rc);
+       EXIT;
+out:
+       if (rdpg->rp_pages) {
+               for (i = 0; i < rdpg->rp_npages; i++)
+                       if (rdpg->rp_pages[i])
+                               __free_page(rdpg->rp_pages[i]);
+               OBD_FREE(rdpg->rp_pages,
+                        rdpg->rp_npages * sizeof(rdpg->rp_pages[0]));
+       }
+       return rc;
+}
+EXPORT_SYMBOL(tgt_obd_idx_read);
+
+struct tgt_handler tgt_obd_handlers[] = {
+TGT_OBD_HDL    (0,     OBD_PING,               tgt_obd_ping),
+TGT_OBD_HDL_VAR(0,     OBD_LOG_CANCEL,         tgt_obd_log_cancel),
+TGT_OBD_HDL_VAR(0,     OBD_QC_CALLBACK,        tgt_obd_qc_callback),
+TGT_OBD_HDL    (0,     OBD_IDX_READ,           tgt_obd_idx_read)
+};
+EXPORT_SYMBOL(tgt_obd_handlers);
+
 /*
  * Unified target DLM handlers.
  */
 struct ldlm_callback_suite tgt_dlm_cbs = {
-       .lcs_completion = ldlm_server_completion_ast,
-       .lcs_blocking = ldlm_server_blocking_ast,
+       .lcs_completion = ldlm_server_completion_ast,
+       .lcs_blocking   = ldlm_server_blocking_ast,
+       .lcs_glimpse    = ldlm_server_glimpse_ast
 };
 
 int tgt_enqueue(struct tgt_session_info *tsi)
@@ -695,6 +820,15 @@ int tgt_cp_callback(struct tgt_session_info *tsi)
 }
 EXPORT_SYMBOL(tgt_cp_callback);
 
+/* generic LDLM target handler */
+struct tgt_handler tgt_dlm_handlers[] = {
+TGT_DLM_HDL    (HABEO_CLAVIS,  LDLM_ENQUEUE,           tgt_enqueue),
+TGT_DLM_HDL_VAR(HABEO_CLAVIS,  LDLM_CONVERT,           tgt_convert),
+TGT_DLM_HDL_VAR(0,             LDLM_BL_CALLBACK,       tgt_bl_callback),
+TGT_DLM_HDL_VAR(0,             LDLM_CP_CALLBACK,       tgt_cp_callback)
+};
+EXPORT_SYMBOL(tgt_dlm_handlers);
+
 /*
  * Unified target LLOG handlers.
  */
@@ -770,3 +904,30 @@ int tgt_llog_prev_block(struct tgt_session_info *tsi)
        RETURN(rc);
 }
 EXPORT_SYMBOL(tgt_llog_prev_block);
+
+/* generic llog target handler */
+struct tgt_handler tgt_llog_handlers[] = {
+TGT_LLOG_HDL    (0,    LLOG_ORIGIN_HANDLE_CREATE,      tgt_llog_open),
+TGT_LLOG_HDL    (0,    LLOG_ORIGIN_HANDLE_NEXT_BLOCK,  tgt_llog_next_block),
+TGT_LLOG_HDL    (0,    LLOG_ORIGIN_HANDLE_READ_HEADER, tgt_llog_read_header),
+TGT_LLOG_HDL    (0,    LLOG_ORIGIN_HANDLE_PREV_BLOCK,  tgt_llog_prev_block),
+TGT_LLOG_HDL    (0,    LLOG_ORIGIN_HANDLE_DESTROY,     tgt_llog_destroy),
+TGT_LLOG_HDL_VAR(0,    LLOG_ORIGIN_HANDLE_CLOSE,       tgt_llog_close),
+};
+EXPORT_SYMBOL(tgt_llog_handlers);
+
+/*
+ * sec context handlers
+ */
+/* XXX: Implement based on mdt_sec_ctx_handle()? */
+int tgt_sec_ctx_handle(struct tgt_session_info *tsi)
+{
+       return 0;
+}
+
+struct tgt_handler tgt_sec_ctx_handlers[] = {
+TGT_SEC_HDL_VAR(0,     SEC_CTX_INIT,           tgt_sec_ctx_handle),
+TGT_SEC_HDL_VAR(0,     SEC_CTX_INIT_CONT,      tgt_sec_ctx_handle),
+TGT_SEC_HDL_VAR(0,     SEC_CTX_FINI,           tgt_sec_ctx_handle),
+};
+EXPORT_SYMBOL(tgt_sec_ctx_handlers);
index 02cc499..5f45ca1 100644 (file)
 #include <lustre_req_layout.h>
 #include <lustre_sec.h>
 
-extern struct lu_context_key tgt_thread_key;
+struct tx_arg;
+typedef int (*tx_exec_func_t)(const struct lu_env *env, struct thandle *th,
+                             struct tx_arg *ta);
+
+struct tx_arg {
+       tx_exec_func_t           exec_fn;
+       tx_exec_func_t           undo_fn;
+       struct dt_object        *object;
+       char                    *file;
+       struct update_reply     *reply;
+       int                      line;
+       int                      index;
+       union {
+               struct {
+                       const struct dt_rec     *rec;
+                       const struct dt_key     *key;
+               } insert;
+               struct {
+               } ref;
+               struct {
+                       struct lu_attr   attr;
+               } attr_set;
+               struct {
+                       struct lu_buf    buf;
+                       const char      *name;
+                       int              flags;
+                       __u32            csum;
+               } xattr_set;
+               struct {
+                       struct lu_attr                  attr;
+                       struct dt_allocation_hint       hint;
+                       struct dt_object_format         dof;
+                       struct lu_fid                   fid;
+               } create;
+               struct {
+                       struct lu_buf   buf;
+                       loff_t          pos;
+               } write;
+               struct {
+                       struct ost_body     *body;
+               } destroy;
+       } u;
+};
+
+#define TX_MAX_OPS       10
+struct thandle_exec_args {
+       struct thandle          *ta_handle;
+       struct dt_device        *ta_dev;
+       int                      ta_err;
+       struct tx_arg            ta_args[TX_MAX_OPS];
+       int                      ta_argno;   /* used args */
+};
 
 /**
  * Common data shared by tg-level handlers. This is allocated per-thread to
@@ -55,8 +106,35 @@ struct tgt_thread_info {
        struct lsd_client_data   tti_lcd;
        struct lu_buf            tti_buf;
        loff_t                   tti_off;
+
+       struct lu_attr           tti_attr;
+       struct lu_fid            tti_fid1;
+
+       /* transno storage during last_rcvd update */
+       __u64                    tti_transno;
+
+       /* Updates data for OUT target */
+       struct thandle_exec_args tti_tea;
+       union {
+               struct {
+                       /* for tgt_readpage()      */
+                       struct lu_rdpg     tti_rdpg;
+                       /* for tgt_sendpage()      */
+                       struct l_wait_info tti_wait_info;
+               } rdpg;
+               struct {
+                       struct dt_object_format  tti_update_dof;
+                       struct update_reply     *tti_update_reply;
+                       struct update           *tti_update;
+                       int                      tti_update_reply_index;
+                       struct obdo              tti_obdo;
+                       struct dt_object        *tti_dt_object;
+               } update;
+       } tti_u;
 };
 
+extern struct lu_context_key tgt_thread_key;
+
 static inline struct tgt_thread_info *tgt_th_info(const struct lu_env *env)
 {
        struct tgt_thread_info *tti;
@@ -80,4 +158,41 @@ static inline int req_xid_is_last(struct ptlrpc_request *req)
                req->rq_xid == lcd->lcd_last_close_xid);
 }
 
+static inline char *dt_obd_name(struct dt_device *dt)
+{
+       return dt->dd_lu_dev.ld_obd->obd_name;
+}
+
+/* Update handlers */
+int out_handle(struct tgt_session_info *tsi);
+
+#define out_tx_create(info, obj, attr, fid, dof, th, reply, idx) \
+       __out_tx_create(info, obj, attr, fid, dof, th, reply, idx, \
+                       __FILE__, __LINE__)
+
+#define out_tx_attr_set(info, obj, attr, th, reply, idx) \
+       __out_tx_attr_set(info, obj, attr, th, reply, idx, \
+                         __FILE__, __LINE__)
+
+#define out_tx_xattr_set(info, obj, buf, name, fl, th, reply, idx)     \
+       __out_tx_xattr_set(info, obj, buf, name, fl, th, reply, idx,    \
+                          __FILE__, __LINE__)
+
+#define out_tx_ref_add(info, obj, th, reply, idx) \
+       __out_tx_ref_add(info, obj, th, reply, idx, __FILE__, __LINE__)
+
+#define out_tx_ref_del(info, obj, th, reply, idx) \
+       __out_tx_ref_del(info, obj, th, reply, idx, __FILE__, __LINE__)
+
+#define out_tx_index_insert(info, obj, th, name, fid, reply, idx) \
+       __out_tx_index_insert(info, obj, th, name, fid, reply, idx, \
+                             __FILE__, __LINE__)
+
+#define out_tx_index_delete(info, obj, th, name, reply, idx) \
+       __out_tx_index_delete(info, obj, th, name, reply, idx, \
+                             __FILE__, __LINE__)
+
+#define out_tx_destroy(info, obj, th, reply, idx) \
+       __out_tx_destroy(info, obj, th, reply, idx, __FILE__, __LINE__)
+
 #endif /* _TG_INTERNAL_H */
index 5c221aa..4bf52f0 100644 (file)
@@ -684,3 +684,149 @@ int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
        RETURN(rc);
 }
 EXPORT_SYMBOL(tgt_client_del);
+
+/*
+ * last_rcvd & last_committed update callbacks
+ */
+int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
+                        struct dt_object *obj, __u64 opdata,
+                        struct thandle *th, struct ptlrpc_request *req)
+{
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct tg_export_data   *ted;
+       __u64                   *transno_p;
+       int                      rc = 0;
+       bool                     lw_client, update = false;
+
+       ENTRY;
+
+       /* that can be OUT target and we need tgt_session_info */
+       if (req == NULL) {
+               struct tgt_session_info *tsi = tgt_ses_info(env);
+
+               req = tgt_ses_req(tsi);
+               if (req == NULL) /* echo client case */
+                       RETURN(0);
+       }
+
+       ted = &req->rq_export->exp_target_data;
+
+       lw_client = exp_connect_flags(req->rq_export) & OBD_CONNECT_LIGHTWEIGHT;
+
+       tti->tti_transno = lustre_msg_get_transno(req->rq_reqmsg);
+       spin_lock(&tgt->lut_translock);
+       if (th->th_result != 0) {
+               if (tti->tti_transno != 0) {
+                       CERROR("%s: replay transno "LPU64" failed: rc = %d\n",
+                              tgt_name(tgt), tti->tti_transno, th->th_result);
+               }
+       } else if (tti->tti_transno == 0) {
+               tti->tti_transno = ++tgt->lut_last_transno;
+       } else {
+               /* should be replay */
+               if (tti->tti_transno > tgt->lut_last_transno)
+                       tgt->lut_last_transno = tti->tti_transno;
+       }
+       spin_unlock(&tgt->lut_translock);
+
+       /** VBR: set new versions */
+       if (th->th_result == 0 && obj != NULL)
+               dt_version_set(env, obj, tti->tti_transno, th);
+
+       /* filling reply data */
+       CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
+              tti->tti_transno, tgt->lut_obd->obd_last_committed);
+
+       req->rq_transno = tti->tti_transno;
+       lustre_msg_set_transno(req->rq_repmsg, tti->tti_transno);
+
+       /* if can't add callback, do sync write */
+       th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, req->rq_export,
+                                               tti->tti_transno);
+
+       if (lw_client) {
+               /* All operations performed by LW clients are synchronous and
+                * we store the committed transno in the last_rcvd header */
+               spin_lock(&tgt->lut_translock);
+               if (tti->tti_transno > tgt->lut_lsd.lsd_last_transno) {
+                       tgt->lut_lsd.lsd_last_transno = tti->tti_transno;
+                       update = true;
+               }
+               spin_unlock(&tgt->lut_translock);
+               /* Although lightweight (LW) connections have no slot in
+                * last_rcvd, we still want to maintain the in-memory
+                * lsd_client_data structure in order to properly handle reply
+                * reconstruction. */
+       } else if (ted->ted_lr_off <= 0) {
+               CERROR("%s: client idx %d has offset %lld\n",
+                      tgt_name(tgt), ted->ted_lr_idx, ted->ted_lr_off);
+               RETURN(-EINVAL);
+       }
+
+       /* if the export has already been disconnected, we have no last_rcvd
+        * slot, update server data with latest transno then */
+       if (ted->ted_lcd == NULL) {
+               CWARN("commit transaction for disconnected client %s: rc %d\n",
+                     req->rq_export->exp_client_uuid.uuid, rc);
+               GOTO(srv_update, rc = 0);
+       }
+
+       mutex_lock(&ted->ted_lcd_lock);
+       LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
+       if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE ||
+           lustre_msg_get_opc(req->rq_reqmsg) == MDS_DONE_WRITING) {
+               transno_p = &ted->ted_lcd->lcd_last_close_transno;
+               ted->ted_lcd->lcd_last_close_xid = req->rq_xid;
+               ted->ted_lcd->lcd_last_close_result = th->th_result;
+       } else {
+               /* VBR: save versions in last_rcvd for reconstruct. */
+               __u64 *pre_versions = lustre_msg_get_versions(req->rq_reqmsg);
+
+               if (pre_versions) {
+                       ted->ted_lcd->lcd_pre_versions[0] = pre_versions[0];
+                       ted->ted_lcd->lcd_pre_versions[1] = pre_versions[1];
+                       ted->ted_lcd->lcd_pre_versions[2] = pre_versions[2];
+                       ted->ted_lcd->lcd_pre_versions[3] = pre_versions[3];
+               }
+               transno_p = &ted->ted_lcd->lcd_last_transno;
+               ted->ted_lcd->lcd_last_xid = req->rq_xid;
+               ted->ted_lcd->lcd_last_result = th->th_result;
+               /* XXX: lcd_last_data is __u32 but intent_dispostion is __u64,
+                * see struct ldlm_reply->lock_policy_res1; */
+               ted->ted_lcd->lcd_last_data = opdata;
+       }
+
+       /* Update transno in slot only if non-zero number, i.e. no errors */
+       if (likely(tti->tti_transno != 0)) {
+               if (*transno_p > tti->tti_transno) {
+                       CERROR("%s: trying to overwrite bigger transno:"
+                              "on-disk: "LPU64", new: "LPU64" replay: %d. "
+                              "see LU-617.\n", tgt_name(tgt), *transno_p,
+                              tti->tti_transno, req_is_replay(req));
+                       if (req_is_replay(req)) {
+                               spin_lock(&req->rq_export->exp_lock);
+                               req->rq_export->exp_vbr_failed = 1;
+                               spin_unlock(&req->rq_export->exp_lock);
+                       }
+                       mutex_unlock(&ted->ted_lcd_lock);
+                       RETURN(req_is_replay(req) ? -EOVERFLOW : 0);
+               }
+               *transno_p = tti->tti_transno;
+       }
+
+       if (!lw_client) {
+               tti->tti_off = ted->ted_lr_off;
+               rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
+               if (rc < 0) {
+                       mutex_unlock(&ted->ted_lcd_lock);
+                       RETURN(rc);
+               }
+       }
+       mutex_unlock(&ted->ted_lcd_lock);
+       EXIT;
+srv_update:
+       if (update)
+               rc = tgt_server_data_write(env, tgt, th);
+       return rc;
+}
+EXPORT_SYMBOL(tgt_last_rcvd_update);