Whamcloud - gitweb
LU-12828 ldlm: FLOCK request can be processed twice 40/36340/4
authorAndriy Skulysh <c17819@cray.com>
Thu, 9 Aug 2018 14:58:17 +0000 (17:58 +0300)
committerOleg Drokin <green@whamcloud.com>
Sat, 14 Dec 2019 05:56:32 +0000 (05:56 +0000)
Original request can be processed after resend
request, so it can create a lock on MDT without
client lock or unlock other lock.

Make flock enqueue to use modify RPC slot.

Change-Id: Icfee202fe2e389beda1116f78f8b933c7ea182fb
Cray-bug-id: LUS-5739
Signed-off-by: Andriy Skulysh <c17819@cray.com>
Signed-off-by: Vitaly Fertman <c17818@cray.com>
Reviewed-by: Alexander Boyko <c17825@cray.com>
Reviewed-by: Andrew Perepechko <c17827@cray.com>
Reviewed-on: https://review.whamcloud.com/36340
Tested-by: jenkins <devops@whamcloud.com>
Reviewed-by: Alexandr Boyko <c17825@cray.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
14 files changed:
lustre/include/lu_target.h
lustre/include/lustre_dlm.h
lustre/include/lustre_mdc.h
lustre/include/lustre_net.h
lustre/include/obd_class.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/mdc/mdc_locks.c
lustre/mdc/mdc_reint.c
lustre/mdc/mdc_request.c
lustre/obdclass/genops.c
lustre/ptlrpc/client.c
lustre/target/tgt_lastrcvd.c

index 6a8afdd..c3fe060 100644 (file)
@@ -220,6 +220,8 @@ struct lu_target {
 #define LUT_REPLY_SLOTS_PER_CHUNK (1<<20)
 #define LUT_REPLY_SLOTS_MAX_CHUNKS 16
 
+#define TRD_INDEX_MEMORY -1
+
 /**
  * Target reply data
  */
@@ -494,10 +496,10 @@ int tgt_server_data_update(const struct lu_env *env, struct lu_target *tg,
                           int sync);
 int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt);
 int tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd);
-int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
-                      struct tg_export_data *ted, struct tg_reply_data *trd,
-                      struct ptlrpc_request *req,
-                      struct thandle *th, bool update_lrd_file);
+int tgt_mk_reply_data(const struct lu_env *env, struct lu_target *tgt,
+                     struct tg_export_data *ted, struct ptlrpc_request *req,
+                     __u64 opdata, struct thandle *th, bool write_update,
+                     __u64 transno);
 struct tg_reply_data *tgt_lookup_reply_by_xid(struct tg_export_data *ted,
                                               __u64 xid);
 int tgt_tunables_init(struct lu_target *lut);
index 08d9093..181518d 100644 (file)
@@ -1200,6 +1200,7 @@ struct ldlm_enqueue_info {
        void            *ei_namespace;  /** lock namespace **/
        u64             ei_inodebits;   /** lock inode bits **/
        unsigned int    ei_enq_slave:1; /** whether enqueue slave stripes */
+       unsigned int    ei_enq_slot:1;  /** whether acquire rpc slot */
 };
 
 #define ei_res_id      ei_cb_gl
index fef3fa0..3883993 100644 (file)
@@ -63,32 +63,6 @@ struct obd_export;
 struct ptlrpc_request;
 struct obd_device;
 
-static inline void mdc_get_mod_rpc_slot(struct ptlrpc_request *req,
-                                       struct lookup_intent *it)
-{
-       struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
-       __u32 opc;
-       __u16 tag;
-
-       opc = lustre_msg_get_opc(req->rq_reqmsg);
-       tag = obd_get_mod_rpc_slot(cli, opc, it);
-       lustre_msg_set_tag(req->rq_reqmsg, tag);
-       ptlrpc_reassign_next_xid(req);
-}
-
-static inline void mdc_put_mod_rpc_slot(struct ptlrpc_request *req,
-                                       struct lookup_intent *it)
-{
-       struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
-       __u32 opc;
-       __u16 tag;
-
-       opc = lustre_msg_get_opc(req->rq_reqmsg);
-       tag = lustre_msg_get_tag(req->rq_reqmsg);
-       obd_put_mod_rpc_slot(cli, opc, it, tag);
-}
-
-
 /**
  * Update the maximum possible easize.
  *
index 8bee050..a758cb1 100644 (file)
@@ -2167,7 +2167,8 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
 __u64 ptlrpc_next_xid(void);
 __u64 ptlrpc_sample_next_xid(void);
 __u64 ptlrpc_req_xid(struct ptlrpc_request *request);
-void ptlrpc_reassign_next_xid(struct ptlrpc_request *req);
+void ptlrpc_get_mod_rpc_slot(struct ptlrpc_request *req);
+void ptlrpc_put_mod_rpc_slot(struct ptlrpc_request *req);
 
 /* Set of routines to run a function in ptlrpcd context */
 void *ptlrpcd_alloc_work(struct obd_import *imp,
index 11ddb0e..ad0f5e8 100644 (file)
@@ -129,10 +129,8 @@ __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli);
 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max);
 int obd_mod_rpc_stats_seq_show(struct client_obd *cli, struct seq_file *seq);
 
-__u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
-                          struct lookup_intent *it);
-void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
-                         struct lookup_intent *it, __u16 tag);
+__u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc);
+void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag);
 
 struct llog_handle;
 struct llog_rec_hdr;
index 66304f1..3bdf02b 100644 (file)
@@ -1759,6 +1759,9 @@ enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
        int local = ns_is_client(ldlm_res_to_ns(res));
        enum ldlm_error rc = ELDLM_OK;
        struct ldlm_interval *node = NULL;
+#ifdef HAVE_SERVER_SUPPORT
+       bool reconstruct = false;
+#endif
        ENTRY;
 
         /* policies are not executed on the client or during replay */
@@ -1814,6 +1817,19 @@ enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
        if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
                OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
 
+#ifdef HAVE_SERVER_SUPPORT
+       reconstruct = !local && res->lr_type == LDLM_FLOCK &&
+                     !(*flags & LDLM_FL_TEST_LOCK);
+       if (reconstruct) {
+               rc = req_can_reconstruct(cookie, NULL);
+               if (rc != 0) {
+                       if (rc == 1)
+                               rc = 0;
+                       RETURN(rc);
+               }
+       }
+#endif
+
        lock_res_and_lock(lock);
        if (local && ldlm_is_granted(lock)) {
                 /* The server returned a blocked lock, but it was granted
@@ -1887,6 +1903,16 @@ enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
 
 out:
         unlock_res_and_lock(lock);
+
+#ifdef HAVE_SERVER_SUPPORT
+       if (reconstruct) {
+               struct ptlrpc_request *req = cookie;
+
+               tgt_mk_reply_data(NULL, NULL,
+                                 &req->rq_export->exp_target_data,
+                                 req, 0, NULL, false, 0);
+       }
+#endif
         if (node)
                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
         return rc;
index b337009..16ad0e7 100644 (file)
@@ -1349,18 +1349,11 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
                lock->l_req_extent = lock->l_policy_data.l_extent;
 
 existing_lock:
-       if (flags & LDLM_FL_HAS_INTENT) {
-               /*
-                * In this case, the reply buffer is allocated deep in
-                * local_lock_enqueue by the policy function.
-                */
-               cookie = req;
-       } else {
-               /*
-                * based on the assumption that lvb size never changes during
+       cookie = req;
+       if (!(flags & LDLM_FL_HAS_INTENT)) {
+               /* based on the assumption that lvb size never changes during
                 * resource life time otherwise it need resource->lr_lock's
-                * protection
-                */
+                * protection */
                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
                                     RCL_SERVER, ldlm_lvbo_size(lock));
 
index fa05c94..ac40691 100644 (file)
@@ -583,6 +583,11 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
        }
 }
 
+static bool ldlm_request_slot_needed(enum ldlm_type type)
+{
+       return type == LDLM_FLOCK || type == LDLM_IBITS;
+}
+
 /**
  * Finishing portion of client lock enqueue code.
  *
@@ -603,6 +608,11 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
 
        ENTRY;
 
+       if (ldlm_request_slot_needed(type))
+               obd_put_request_slot(&req->rq_import->imp_obd->u.cli);
+
+       ptlrpc_put_mod_rpc_slot(req);
+
        if (req && req->rq_svc_thread)
                env = req->rq_svc_thread->t_env;
 
@@ -1072,6 +1082,24 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                                             LDLM_GLIMPSE_ENQUEUE);
        }
 
+       /* It is important to obtain modify RPC slot first (if applicable), so
+        * that threads that are waiting for a modify RPC slot are not polluting
+        * our rpcs in flight counter. */
+
+       if (einfo->ei_enq_slot)
+               ptlrpc_get_mod_rpc_slot(req);
+
+       if (ldlm_request_slot_needed(einfo->ei_type)) {
+               rc = obd_get_request_slot(&req->rq_import->imp_obd->u.cli);
+               if (rc) {
+                       if (einfo->ei_enq_slot)
+                               ptlrpc_put_mod_rpc_slot(req);
+                       failed_lock_cleanup(ns, lock, einfo->ei_mode);
+                       LDLM_LOCK_RELEASE(lock);
+                       GOTO(out, rc);
+               }
+       }
+
        if (async) {
                LASSERT(reqp != NULL);
                RETURN(0);
@@ -1094,6 +1122,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
        else
                rc = err;
 
+out:
        if (!req_passed_in && req != NULL) {
                ptlrpc_req_finished(req);
                if (reqp)
index 7923428..7bceb53 100644 (file)
@@ -860,6 +860,16 @@ out_lock:
        RETURN(rc);
 }
 
+static inline bool mdc_skip_mod_rpc_slot(const struct lookup_intent *it)
+{
+       if (it != NULL &&
+           (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
+            it->it_op == IT_READDIR ||
+            (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
+               return true;
+       return false;
+}
+
 /* We always reserve enough space in the reply packet for a stripe MD, because
  * we don't know in advance the file type. */
 static int mdc_enqueue_base(struct obd_export *exp,
@@ -871,7 +881,7 @@ static int mdc_enqueue_base(struct obd_export *exp,
                            __u64 extra_lock_flags)
 {
        struct obd_device *obddev = class_exp2obd(exp);
-       struct ptlrpc_request *req = NULL;
+       struct ptlrpc_request *req;
        __u64 flags, saved_flags = extra_lock_flags;
        struct ldlm_res_id res_id;
        static const union ldlm_policy_data lookup_policy = {
@@ -922,6 +932,7 @@ resend:
                LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
                         einfo->ei_type);
                res_id.name[3] = LDLM_FLOCK;
+               req = ldlm_enqueue_pack(exp, 0);
        } else if (it->it_op & IT_OPEN) {
                req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
        } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
@@ -949,20 +960,7 @@ resend:
                req->rq_sent = ktime_get_real_seconds() + resends;
         }
 
-       /* It is important to obtain modify RPC slot first (if applicable), so
-        * that threads that are waiting for a modify RPC slot are not polluting
-        * our rpcs in flight counter.
-        * We do not do flock request limiting, though */
-       if (it) {
-               mdc_get_mod_rpc_slot(req, it);
-               rc = obd_get_request_slot(&obddev->u.cli);
-               if (rc != 0) {
-                       mdc_put_mod_rpc_slot(req, it);
-                       mdc_clear_replay_flag(req, 0);
-                       ptlrpc_req_finished(req);
-                       RETURN(rc);
-               }
-       }
+       einfo->ei_enq_slot = !mdc_skip_mod_rpc_slot(it);
 
        /* With Data-on-MDT the glimpse callback is needed too.
         * It is set here in advance but not in mdc_finish_enqueue()
@@ -973,6 +971,7 @@ resend:
 
        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
                              0, lvb_type, lockh, 0);
+
        if (!it) {
                /* For flock requests we immediatelly return without further
                   delay and let caller deal with the rest, since rest of
@@ -986,12 +985,10 @@ resend:
                    (einfo->ei_type == LDLM_FLOCK) &&
                    (einfo->ei_mode == LCK_NL))
                        goto resend;
+               ptlrpc_req_finished(req);
                RETURN(rc);
        }
 
-       obd_put_request_slot(&obddev->u.cli);
-       mdc_put_mod_rpc_slot(req, it);
-
        if (rc < 0) {
                CDEBUG(D_INFO,
                      "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
@@ -1335,19 +1332,15 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
        struct obd_export *exp = ga->ga_exp;
        struct md_enqueue_info *minfo = ga->ga_minfo;
        struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
-       struct lookup_intent *it;
-       struct lustre_handle *lockh;
-       struct obd_device *obddev;
-       struct ldlm_reply *lockrep;
-       __u64 flags = LDLM_FL_HAS_INTENT;
+       struct lookup_intent     *it;
+       struct lustre_handle     *lockh;
+       struct ldlm_reply        *lockrep;
+       __u64                     flags = LDLM_FL_HAS_INTENT;
        ENTRY;
 
         it    = &minfo->mi_it;
         lockh = &minfo->mi_lockh;
 
-        obddev = class_exp2obd(exp);
-
-       obd_put_request_slot(&obddev->u.cli);
         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
                 rc = -ETIMEDOUT;
 
@@ -1384,7 +1377,6 @@ int mdc_intent_getattr_async(struct obd_export *exp,
        struct lookup_intent    *it = &minfo->mi_it;
        struct ptlrpc_request   *req;
        struct mdc_getattr_args *ga;
-       struct obd_device       *obddev = class_exp2obd(exp);
        struct ldlm_res_id       res_id;
        union ldlm_policy_data policy = {
                                .l_inodebits = { MDS_INODELOCK_LOOKUP |
@@ -1405,12 +1397,6 @@ int mdc_intent_getattr_async(struct obd_export *exp,
        if (IS_ERR(req))
                RETURN(PTR_ERR(req));
 
-       rc = obd_get_request_slot(&obddev->u.cli);
-       if (rc != 0) {
-               ptlrpc_req_finished(req);
-               RETURN(rc);
-       }
-
        /* With Data-on-MDT the glimpse callback is needed too.
         * It is set here in advance but not in mdc_finish_enqueue()
         * to avoid possible races. It is safe to have glimpse handler
@@ -1421,7 +1407,6 @@ int mdc_intent_getattr_async(struct obd_export *exp,
        rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
                              &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
        if (rc < 0) {
-               obd_put_request_slot(&obddev->u.cli);
                ptlrpc_req_finished(req);
                RETURN(rc);
        }
index cc48828..b733474 100644 (file)
@@ -46,9 +46,9 @@ static int mdc_reint(struct ptlrpc_request *request, int level)
 
         request->rq_send_state = level;
 
-       mdc_get_mod_rpc_slot(request, NULL);
+       ptlrpc_get_mod_rpc_slot(request);
        rc = ptlrpc_queue_wait(request);
-       mdc_put_mod_rpc_slot(request, NULL);
+       ptlrpc_put_mod_rpc_slot(request);
         if (rc)
                 CDEBUG(D_INFO, "error in handling %d\n", rc);
         else if (!req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY)) {
index 8b3eefc..a46b44b 100644 (file)
@@ -413,12 +413,12 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
 
         /* make rpc */
         if (opcode == MDS_REINT)
-               mdc_get_mod_rpc_slot(req, NULL);
+               ptlrpc_get_mod_rpc_slot(req);
 
         rc = ptlrpc_queue_wait(req);
 
        if (opcode == MDS_REINT)
-               mdc_put_mod_rpc_slot(req, NULL);
+               ptlrpc_put_mod_rpc_slot(req);
 
         if (rc)
                 ptlrpc_req_finished(req);
@@ -980,9 +980,9 @@ static int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
 
         ptlrpc_request_set_replen(req);
 
-       mdc_get_mod_rpc_slot(req, NULL);
+       ptlrpc_get_mod_rpc_slot(req);
        rc = ptlrpc_queue_wait(req);
-       mdc_put_mod_rpc_slot(req, NULL);
+       ptlrpc_put_mod_rpc_slot(req);
 
        if (req->rq_repmsg == NULL) {
                CDEBUG(D_RPCTRACE, "request %p failed to send: rc = %d\n", req,
@@ -1757,9 +1757,9 @@ static int mdc_ioc_hsm_progress(struct obd_export *exp,
 
        ptlrpc_request_set_replen(req);
 
-       mdc_get_mod_rpc_slot(req, NULL);
+       ptlrpc_get_mod_rpc_slot(req);
        rc = ptlrpc_queue_wait(req);
-       mdc_put_mod_rpc_slot(req, NULL);
+       ptlrpc_put_mod_rpc_slot(req);
 
        GOTO(out, rc);
 out:
@@ -1960,9 +1960,9 @@ static int mdc_ioc_hsm_state_set(struct obd_export *exp,
 
        ptlrpc_request_set_replen(req);
 
-       mdc_get_mod_rpc_slot(req, NULL);
+       ptlrpc_get_mod_rpc_slot(req);
        rc = ptlrpc_queue_wait(req);
-       mdc_put_mod_rpc_slot(req, NULL);
+       ptlrpc_put_mod_rpc_slot(req);
 
        GOTO(out, rc);
 out:
@@ -2020,9 +2020,9 @@ static int mdc_ioc_hsm_request(struct obd_export *exp,
 
        ptlrpc_request_set_replen(req);
 
-       mdc_get_mod_rpc_slot(req, NULL);
+       ptlrpc_get_mod_rpc_slot(req);
        rc = ptlrpc_queue_wait(req);
-       mdc_put_mod_rpc_slot(req, NULL);
+       ptlrpc_put_mod_rpc_slot(req);
 
        GOTO(out, rc);
 
index 436e5c7..af218a1 100644 (file)
@@ -2286,15 +2286,6 @@ static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
        return avail;
 }
 
-static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
-{
-       if (it != NULL &&
-           (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
-            it->it_op == IT_READDIR ||
-            (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
-                       return true;
-       return false;
-}
 
 /* Get a modify RPC slot from the obd client @cli according
  * to the kind of operation @opc that is going to be sent
@@ -2304,18 +2295,11 @@ static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
  * Returns the tag to be set in the request message. Tag 0
  * is reserved for non-modifying requests.
  */
-__u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
-                          struct lookup_intent *it)
+__u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
 {
        bool                    close_req = false;
        __u16                   i, max;
 
-       /* read-only metadata RPCs don't consume a slot on MDT
-        * for reply reconstruction
-        */
-       if (obd_skip_mod_rpc_slot(it))
-               return 0;
-
        if (opc == MDS_CLOSE)
                close_req = true;
 
@@ -2358,15 +2342,13 @@ __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
 
 /* Put a modify RPC slot from the obd client @cli according
- * to the kind of operation @opc that has been sent and the
- * intent @it of the operation if it applies.
+ * to the kind of operation @opc that has been sent.
  */
-void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
-                         struct lookup_intent *it, __u16 tag)
+void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
 {
        bool                    close_req = false;
 
-       if (obd_skip_mod_rpc_slot(it))
+       if (tag == 0)
                return;
 
        if (opc == MDS_CLOSE)
index fbe09e9..94bae92 100644 (file)
@@ -737,7 +737,7 @@ static inline void ptlrpc_assign_next_xid(struct ptlrpc_request *req)
 
 static atomic64_t ptlrpc_last_xid;
 
-void ptlrpc_reassign_next_xid(struct ptlrpc_request *req)
+static void ptlrpc_reassign_next_xid(struct ptlrpc_request *req)
 {
        spin_lock(&req->rq_import->imp_lock);
        list_del_init(&req->rq_unreplied_list);
@@ -745,7 +745,32 @@ void ptlrpc_reassign_next_xid(struct ptlrpc_request *req)
        spin_unlock(&req->rq_import->imp_lock);
        DEBUG_REQ(D_RPCTRACE, req, "reassign xid");
 }
-EXPORT_SYMBOL(ptlrpc_reassign_next_xid);
+
+void ptlrpc_get_mod_rpc_slot(struct ptlrpc_request *req)
+{
+       struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+       __u32 opc;
+       __u16 tag;
+
+       opc = lustre_msg_get_opc(req->rq_reqmsg);
+       tag = obd_get_mod_rpc_slot(cli, opc);
+       lustre_msg_set_tag(req->rq_reqmsg, tag);
+       ptlrpc_reassign_next_xid(req);
+}
+EXPORT_SYMBOL(ptlrpc_get_mod_rpc_slot);
+
+void ptlrpc_put_mod_rpc_slot(struct ptlrpc_request *req)
+{
+       __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+
+       if (tag != 0) {
+               struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+               __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
+
+               obd_put_mod_rpc_slot(cli, opc, tag);
+       }
+}
+EXPORT_SYMBOL(ptlrpc_put_mod_rpc_slot);
 
 int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
                             __u32 version, int opcode, char **bufs,
index 99a39c3..398a60a 100644 (file)
@@ -331,7 +331,7 @@ static void tgt_free_reply_data(struct lu_target *lut,
 
        list_del(&trd->trd_list);
        ted->ted_reply_cnt--;
-       if (lut != NULL)
+       if (lut != NULL && trd->trd_index != TRD_INDEX_MEMORY)
                tgt_clear_reply_slot(lut, trd->trd_index);
        OBD_FREE_PTR(trd);
 }
@@ -1166,7 +1166,7 @@ static void tgt_clean_by_tag(struct obd_export *exp, __u64 xid, __u16 tag)
        }
 }
 
-int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
+static int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
                       struct tg_export_data *ted, struct tg_reply_data *trd,
                       struct ptlrpc_request *req,
                       struct thandle *th, bool update_lrd_file)
@@ -1181,28 +1181,33 @@ int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
                ted->ted_lcd->lcd_last_transno = lrd->lrd_transno;
        mutex_unlock(&ted->ted_lcd_lock);
 
-       /* find a empty slot */
-       i = tgt_find_free_reply_slot(tgt);
-       if (unlikely(i < 0)) {
-               CERROR("%s: couldn't find a slot for reply data: "
-                      "rc = %d\n", tgt_name(tgt), i);
-               RETURN(i);
-       }
-       trd->trd_index = i;
+       if (tgt != NULL) {
+               /* find a empty slot */
+               i = tgt_find_free_reply_slot(tgt);
+               if (unlikely(i < 0)) {
+                       CERROR("%s: couldn't find a slot for reply data: "
+                              "rc = %d\n", tgt_name(tgt), i);
+                       RETURN(i);
+               }
+               trd->trd_index = i;
 
-       if (update_lrd_file) {
-               loff_t  off;
-               int     rc;
+               if (update_lrd_file) {
+                       loff_t  off;
+                       int     rc;
 
-               /* write reply data to disk */
-               off = sizeof(struct lsd_reply_header) + sizeof(*lrd) * i;
-               rc = tgt_reply_data_write(env, tgt, lrd, off, th);
-               if (unlikely(rc != 0)) {
-                       CERROR("%s: can't update %s file: rc = %d\n",
-                              tgt_name(tgt), REPLY_DATA, rc);
-                       RETURN(rc);
+                       /* write reply data to disk */
+                       off = sizeof(struct lsd_reply_header) + sizeof(*lrd) * i;
+                       rc = tgt_reply_data_write(env, tgt, lrd, off, th);
+                       if (unlikely(rc != 0)) {
+                               CERROR("%s: can't update %s file: rc = %d\n",
+                                      tgt_name(tgt), REPLY_DATA, rc);
+                               RETURN(rc);
+                       }
                }
+       } else {
+               trd->trd_index = TRD_INDEX_MEMORY;
        }
+
        /* add reply data to target export's reply list */
        mutex_lock(&ted->ted_lcd_lock);
        if (req != NULL) {
@@ -1231,7 +1236,64 @@ int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
 
        RETURN(0);
 }
-EXPORT_SYMBOL(tgt_add_reply_data);
+
+int tgt_mk_reply_data(const struct lu_env *env,
+                     struct lu_target *tgt,
+                     struct tg_export_data *ted,
+                     struct ptlrpc_request *req,
+                     __u64 opdata,
+                     struct thandle *th,
+                     bool write_update,
+                     __u64 transno)
+{
+       struct tg_reply_data    *trd;
+       struct lsd_reply_data   *lrd;
+       __u64                   *pre_versions = NULL;
+       int                     rc;
+
+       OBD_ALLOC_PTR(trd);
+       if (unlikely(trd == NULL))
+               RETURN(-ENOMEM);
+
+       /* fill reply data information */
+       lrd = &trd->trd_reply;
+       lrd->lrd_transno = transno;
+       if (req != NULL) {
+               lrd->lrd_xid = req->rq_xid;
+               trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
+               lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
+               if (write_update) {
+                       pre_versions = lustre_msg_get_versions(req->rq_repmsg);
+                       lrd->lrd_result = th->th_result;
+               }
+       } else {
+               struct tgt_session_info *tsi;
+
+               LASSERT(env != NULL);
+               tsi = tgt_ses_info(env);
+               LASSERT(tsi->tsi_xid != 0);
+
+               lrd->lrd_xid = tsi->tsi_xid;
+               lrd->lrd_result = tsi->tsi_result;
+               lrd->lrd_client_gen = tsi->tsi_client_gen;
+       }
+
+       lrd->lrd_data = opdata;
+       if (pre_versions) {
+               trd->trd_pre_versions[0] = pre_versions[0];
+               trd->trd_pre_versions[1] = pre_versions[1];
+               trd->trd_pre_versions[2] = pre_versions[2];
+               trd->trd_pre_versions[3] = pre_versions[3];
+       }
+
+       rc = tgt_add_reply_data(env, tgt, ted, trd, req,
+                               th, write_update);
+       if (rc < 0)
+               OBD_FREE_PTR(trd);
+       return rc;
+
+}
+EXPORT_SYMBOL(tgt_mk_reply_data);
 
 /*
  * last_rcvd & last_committed update callbacks
@@ -1322,48 +1384,8 @@ static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
 
        /* Target that supports multiple reply data */
        if (tgt_is_multimodrpcs_client(exp)) {
-               struct tg_reply_data    *trd;
-               struct lsd_reply_data   *lrd;
-               __u64                   *pre_versions;
-               bool                    write_update;
-
-               OBD_ALLOC_PTR(trd);
-               if (unlikely(trd == NULL))
-                       RETURN(-ENOMEM);
-
-               /* fill reply data information */
-               lrd = &trd->trd_reply;
-               lrd->lrd_transno = tti->tti_transno;
-               if (req != NULL) {
-                       lrd->lrd_xid = req->rq_xid;
-                       trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
-                       pre_versions = lustre_msg_get_versions(req->rq_repmsg);
-                       lrd->lrd_result = th->th_result;
-                       lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
-                       write_update = true;
-               } else {
-                       LASSERT(tsi->tsi_xid != 0);
-                       lrd->lrd_xid = tsi->tsi_xid;
-                       lrd->lrd_result = tsi->tsi_result;
-                       lrd->lrd_client_gen = tsi->tsi_client_gen;
-                       trd->trd_tag = 0;
-                       pre_versions = NULL;
-                       write_update = false;
-               }
-
-               lrd->lrd_data = opdata;
-               if (pre_versions) {
-                       trd->trd_pre_versions[0] = pre_versions[0];
-                       trd->trd_pre_versions[1] = pre_versions[1];
-                       trd->trd_pre_versions[2] = pre_versions[2];
-                       trd->trd_pre_versions[3] = pre_versions[3];
-               }
-
-               rc = tgt_add_reply_data(env, tgt, ted, trd, req,
-                                       th, write_update);
-               if (rc < 0)
-                       OBD_FREE_PTR(trd);
-               return rc;
+               return tgt_mk_reply_data(env, tgt, ted, req, opdata, th,
+                                        !!(req != NULL), tti->tti_transno);
        }
 
        /* Enough for update replay, let's return */