Whamcloud - gitweb
LU-11444 ptlrpc: resend may corrupt the data 14/35114/15
author Andriy Skulysh <c17819@cray.com>
Sat, 8 Jun 2019 11:30:55 +0000 (14:30 +0300)
committer Oleg Drokin <green@whamcloud.com>
Mon, 30 Sep 2019 23:12:15 +0000 (23:12 +0000)
A late resend that arrives much later than another modification RPC
already handled on the same slot may still be applied and therefore
override the later change.

Send RPCs from the client in increasing XID order for each tag and
verify that order on the server to detect a late resend.
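
(Illustrative sketch, not part of the commit: a minimal version of the
per-tag ordering check, assuming a simplified per-export list of saved
reply descriptors. The struct and function names here are hypothetical;
the real implementation is tgt_check_lookup_req() in tgt_lastrcvd.c
below.)

	#include <linux/types.h>
	#include <linux/errno.h>

	/* Simplified stand-in for the saved reply data kept per export. */
	struct saved_reply {
		struct saved_reply *next;   /* singly linked reply list    */
		__u64               xid;    /* XID the reply was saved for */
		__u16               tag;    /* modify RPC slot tag         */
	};

	/* Return 0 if the incoming request may proceed, -EPROTO if a newer
	 * request was already handled on the same tag, i.e. this is a late
	 * resend that must not be applied again. */
	static int check_tag_xid_order(struct saved_reply *list,
				       __u64 req_xid, __u16 req_tag)
	{
		struct saved_reply *r;

		if (req_tag == 0)	/* tag 0: not a modify RPC slot */
			return 0;

		for (r = list; r != NULL; r = r->next) {
			if (r->tag == req_tag && r->xid > req_xid)
				return -EPROTO;	/* slot already moved past us */
		}
		return 0;
	}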

A slot can be reused by a client after a kill while the server
continues to rely on it.

Add a flag for such obsolete requests; here we trust the client and
perform the XID check for all in-progress requests.
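
(Illustrative sketch, not part of the commit: how the obsolete flag and
the per-export used-slot bitmap interact when a resend races with newer
requests on the same tag. The helper below is a simplified, hypothetical
stand-in for ptlrpc_server_mark_in_progress_obsolete() shown in the
ptlrpc/service.c hunk below.)

	#include <linux/types.h>
	#include <linux/bitops.h>
	#include <linux/list.h>

	/* Hypothetical, trimmed-down request descriptor; the real code uses
	 * struct ptlrpc_request linked on the export's exp_reg_rpcs and
	 * exp_hp_rpcs lists. */
	struct rpc_req {
		struct list_head link;       /* on the export's in-flight list  */
		__u64            xid;
		__u16            tag;
		unsigned int     obsolete:1; /* newer request exists on this tag */
	};

	/* Mark every in-flight request on @req->tag with a lower XID than
	 * @req as obsolete, so a late resend can no longer update the reply
	 * slot.  Only done when the bitmap says the slot is in use, i.e. the
	 * client was killed or is resending on a busy tag. */
	static void mark_in_progress_obsolete(struct list_head *in_flight,
					      unsigned long *used_slots,
					      struct rpc_req *req)
	{
		struct rpc_req *tmp;

		if (req->tag == 0 || !test_bit(req->tag - 1, used_slots))
			return;

		list_for_each_entry(tmp, in_flight, link)
			if (tmp->tag == req->tag && req->xid > tmp->xid)
				tmp->obsolete = 1;
	}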

Cray-bug-id: LUS-6272, LUS-7277, LUS-7339
Signed-off-by: Andriy Skulysh <c17819@cray.com>
Reviewed-by: Vitaly Fertman <c17818@cray.com>
Reviewed-by: Alexander Boyko <c17825@cray.com>
Reviewed-by: Andrew Perepechko <c17827@cray.com>
Change-Id: I97806577cf979f49a75379ffc55947cc3dcd02b1
Reviewed-on: https://review.whamcloud.com/35114
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Alexandr Boyko <c17825@cray.com>
Reviewed-by: Mike Pershin <mpershin@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
18 files changed:
lustre/include/lu_target.h
lustre/include/lustre_export.h
lustre/include/lustre_mdc.h
lustre/include/lustre_net.h
lustre/include/obd_support.h
lustre/include/uapi/linux/lustre/lustre_idl.h
lustre/llite/llite_lib.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_reint.c
lustre/obdclass/genops.c
lustre/ptlrpc/client.c
lustre/ptlrpc/service.c
lustre/target/tgt_handler.c
lustre/target/tgt_internal.h
lustre/target/tgt_lastrcvd.c
lustre/tests/recovery-small.sh
lustre/tests/sanity.sh

index caca284..6a8afdd 100644 (file)
@@ -455,7 +455,7 @@ void tgt_register_lfsck_query(int (*query)(const struct lu_env *,
                                           struct lfsck_request *,
                                           struct lfsck_reply *,
                                           struct lfsck_query *));
-bool req_can_reconstruct(struct ptlrpc_request *req, struct tg_reply_data *trd);
+int req_can_reconstruct(struct ptlrpc_request *req, struct tg_reply_data *trd);
 
 extern struct tgt_handler tgt_sec_ctx_handlers[];
 extern struct tgt_handler tgt_lfsck_handlers[];
@@ -493,9 +493,10 @@ int tgt_client_new(const struct lu_env *env, struct obd_export *exp);
 int tgt_server_data_update(const struct lu_env *env, struct lu_target *tg,
                           int sync);
 int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt);
-bool tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd);
+int tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd);
 int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
                       struct tg_export_data *ted, struct tg_reply_data *trd,
+                      struct ptlrpc_request *req,
                       struct thandle *th, bool update_lrd_file);
 struct tg_reply_data *tgt_lookup_reply_by_xid(struct tg_export_data *ted,
                                               __u64 xid);
index 83e3bbe..5862cb8 100644 (file)
@@ -307,7 +307,8 @@ struct obd_export {
        /** highest XID received by export client that has no
         * unreceived lower-numbered XID
         */
-       __u64                     exp_last_xid;
+       __u64                   exp_last_xid;
+       long                    *exp_used_slots;
 };
 
 #define exp_target_data u.eu_target_data
index 8c76386..fef3fa0 100644 (file)
@@ -73,6 +73,7 @@ static inline void mdc_get_mod_rpc_slot(struct ptlrpc_request *req,
        opc = lustre_msg_get_opc(req->rq_reqmsg);
        tag = obd_get_mod_rpc_slot(cli, opc, it);
        lustre_msg_set_tag(req->rq_reqmsg, tag);
+       ptlrpc_reassign_next_xid(req);
 }
 
 static inline void mdc_put_mod_rpc_slot(struct ptlrpc_request *req,
index fd1589a..c3b61ef 100644 (file)
@@ -999,7 +999,8 @@ struct ptlrpc_request {
        unsigned int
                rq_hp:1,                /**< high priority RPC */
                rq_at_linked:1,         /**< link into service's srv_at_array */
-               rq_packed_final:1;      /**< packed final reply */
+               rq_packed_final:1,      /**< packed final reply */
+               rq_obsolete:1;          /* aborted by a signal on a client */
        /** @} */
 
        /** one of RQ_PHASE_* */
@@ -2169,6 +2170,7 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
 __u64 ptlrpc_next_xid(void);
 __u64 ptlrpc_sample_next_xid(void);
 __u64 ptlrpc_req_xid(struct ptlrpc_request *request);
+void ptlrpc_reassign_next_xid(struct ptlrpc_request *req);
 
 /* Set of routines to run a function in ptlrpcd context */
 void *ptlrpcd_alloc_work(struct obd_import *imp,
index 8a120a5..b71fc45 100644 (file)
@@ -450,6 +450,7 @@ extern char obd_jobid_var[];
 #define OBD_FAIL_PTLRPC_LONG_BOTH_UNLINK 0x51c
 #define OBD_FAIL_PTLRPC_CLIENT_BULK_CB3  0x520
 #define OBD_FAIL_PTLRPC_BULK_ATTACH      0x521
+#define OBD_FAIL_PTLRPC_RESEND_RACE     0x525
 #define OBD_FAIL_PTLRPC_ROUND_XID       0x530
 #define OBD_FAIL_PTLRPC_CONNECT_RACE    0x531
 
index 36b5571..57dd526 100644 (file)
@@ -896,6 +896,7 @@ struct ptlrpc_body_v2 {
                                OBD_CONNECT2_FLR |\
                                OBD_CONNECT2_LOCK_CONVERT | \
                                OBD_CONNECT2_ARCHIVE_ID_ARRAY | \
+                               OBD_CONNECT2_INC_XID | \
                                OBD_CONNECT2_SELINUX_POLICY | \
                                OBD_CONNECT2_LSOM | \
                                OBD_CONNECT2_ASYNC_DISCARD | \
@@ -920,7 +921,7 @@ struct ptlrpc_body_v2 {
                                OBD_CONNECT_GRANT_PARAM | \
                                OBD_CONNECT_SHORTIO | OBD_CONNECT_FLAGS2)
 
-#define OST_CONNECT_SUPPORTED2 OBD_CONNECT2_LOCKAHEAD
+#define OST_CONNECT_SUPPORTED2 (OBD_CONNECT2_LOCKAHEAD | OBD_CONNECT2_INC_XID)
 
 #define ECHO_CONNECT_SUPPORTED (OBD_CONNECT_FID)
 #define ECHO_CONNECT_SUPPORTED2 0
index 38bd1d4..1edf572 100644 (file)
@@ -255,6 +255,7 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
                                   OBD_CONNECT2_FLR |
                                   OBD_CONNECT2_LOCK_CONVERT |
                                   OBD_CONNECT2_ARCHIVE_ID_ARRAY |
+                                  OBD_CONNECT2_INC_XID |
                                   OBD_CONNECT2_LSOM |
                                   OBD_CONNECT2_ASYNC_DISCARD |
                                   OBD_CONNECT2_PCC;
@@ -464,7 +465,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
        data->ocd_connect_flags |= OBD_CONNECT_LOCKAHEAD_OLD;
 #endif
 
-       data->ocd_connect_flags2 = OBD_CONNECT2_LOCKAHEAD;
+       data->ocd_connect_flags2 = OBD_CONNECT2_LOCKAHEAD |
+                                  OBD_CONNECT2_INC_XID;
 
        if (!OBD_FAIL_CHECK(OBD_FAIL_OSC_CONNECT_GRANT_PARAM))
                data->ocd_connect_flags |= OBD_CONNECT_GRANT_PARAM;
index 6db4735..f74b3fc 100644 (file)
@@ -3924,7 +3924,7 @@ void mdt_intent_fixup_resent(struct mdt_thread_info *info,
         * If the xid matches, then we know this is a resent request, and allow
         * it. (It's probably an OPEN, for which we don't send a lock.
         */
-       if (req_can_reconstruct(req, NULL))
+       if (req_can_reconstruct(req, NULL) == 1)
                return;
 
         /*
@@ -6314,6 +6314,11 @@ static int mdt_init_export(struct obd_export *exp)
        exp->exp_connecting = 1;
        spin_unlock(&exp->exp_lock);
 
+       OBD_ALLOC(exp->exp_used_slots,
+                 BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long));
+       if (exp->exp_used_slots == NULL)
+               RETURN(-ENOMEM);
+
         /* self-export doesn't need client data and ldlm initialization */
         if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
                                      &exp->exp_client_uuid)))
@@ -6332,6 +6337,10 @@ static int mdt_init_export(struct obd_export *exp)
 err_free:
        tgt_client_free(exp);
 err:
+       OBD_FREE(exp->exp_used_slots,
+                BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long));
+       exp->exp_used_slots = NULL;
+
        CERROR("%s: Failed to initialize export: rc = %d\n",
               exp->exp_obd->obd_name, rc);
        return rc;
@@ -6342,6 +6351,10 @@ static int mdt_destroy_export(struct obd_export *exp)
         ENTRY;
 
         target_destroy_export(exp);
+       if (exp->exp_used_slots)
+               OBD_FREE(exp->exp_used_slots,
+                        BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long));
+
         /* destroy can be called from failed obd_setup, so
          * checking uuid is safer than obd_self_export */
         if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
index c3900a9..7fe6b87 100644 (file)
@@ -1157,13 +1157,13 @@ static inline int mdt_check_resent(struct mdt_thread_info *info,
                if (info->mti_reply_data == NULL)
                        RETURN(-ENOMEM);
 
-               if (req_can_reconstruct(req, info->mti_reply_data)) {
+               rc = req_can_reconstruct(req, info->mti_reply_data);
+               if (rc == 1) {
                        reconstruct(info, lhc);
-                       rc = 1;
                } else {
                        DEBUG_REQ(D_HA, req,
-                                 "no reply data found for RESENT req");
-                       rc = 0;
+                                 "no reply data found for RESENT req, rc = %d",
+                                 rc);
                }
                OBD_FREE_PTR(info->mti_reply_data);
                info->mti_reply_data = NULL;
index 241bed1..0fa2b6b 100644 (file)
@@ -649,6 +649,8 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
        if (info->mti_dlm_req)
                ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
 
+       OBD_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
+
        repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
        mo = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
        if (IS_ERR(mo))
@@ -1129,6 +1131,11 @@ static int mdt_reint_link(struct mdt_thread_info *info,
        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
                RETURN(err_serious(-ENOENT));
 
+       if (OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE)) {
+               req->rq_no_reply = 1;
+               RETURN(err_serious(-ENOENT));
+       }
+
        if (info->mti_dlm_req)
                ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
 
index 32a4727..df1a31d 100644 (file)
@@ -2356,6 +2356,12 @@ __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
                        LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
                        spin_unlock(&cli->cl_mod_rpcs_lock);
                        /* tag 0 is reserved for non-modify RPCs */
+
+                       CDEBUG(D_RPCTRACE, "%s: modify RPC slot %u is allocated"
+                              "opc %u, max %hu\n",
+                              cli->cl_import->imp_obd->obd_name,
+                              i + 1, opc, max);
+
                        return i + 1;
                }
                spin_unlock(&cli->cl_mod_rpcs_lock);
index 48ade0f..fbe09e9 100644 (file)
@@ -737,6 +737,16 @@ static inline void ptlrpc_assign_next_xid(struct ptlrpc_request *req)
 
 static atomic64_t ptlrpc_last_xid;
 
+void ptlrpc_reassign_next_xid(struct ptlrpc_request *req)
+{
+       spin_lock(&req->rq_import->imp_lock);
+       list_del_init(&req->rq_unreplied_list);
+       ptlrpc_assign_next_xid_nolock(req);
+       spin_unlock(&req->rq_import->imp_lock);
+       DEBUG_REQ(D_RPCTRACE, req, "reassign xid");
+}
+EXPORT_SYMBOL(ptlrpc_reassign_next_xid);
+
 int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
                             __u32 version, int opcode, char **bufs,
                             struct ptlrpc_cli_ctx *ctx)
index 1d39dd0..974969b 100644 (file)
@@ -41,6 +41,7 @@
 #include <lu_object.h>
 #include <uapi/linux/lnet/lnet-types.h>
 #include "ptlrpc_internal.h"
+#include <linux/delay.h>
 
 /* The following are visible and mutable through /sys/module/ptlrpc */
 int test_req_buffer_pressure = 0;
@@ -1024,6 +1025,30 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
        }
 }
 
+static void ptlrpc_add_exp_list_nolock(struct ptlrpc_request *req,
+                                      struct obd_export *export, bool hp)
+{
+       __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+
+       if (hp)
+               list_add(&req->rq_exp_list, &export->exp_hp_rpcs);
+       else
+               list_add(&req->rq_exp_list, &export->exp_reg_rpcs);
+       if (tag && export->exp_used_slots)
+               set_bit(tag - 1, export->exp_used_slots);
+}
+
+static void ptlrpc_del_exp_list(struct ptlrpc_request *req)
+{
+       __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+
+       spin_lock(&req->rq_export->exp_rpc_lock);
+       list_del_init(&req->rq_exp_list);
+       if (tag && !req->rq_obsolete && req->rq_export->exp_used_slots)
+               clear_bit(tag - 1, req->rq_export->exp_used_slots);
+       spin_unlock(&req->rq_export->exp_rpc_lock);
+}
+
 /** Change request export and move hp request from old export to new */
 void ptlrpc_request_change_export(struct ptlrpc_request *req,
                                  struct obd_export *export)
@@ -1031,19 +1056,13 @@ void ptlrpc_request_change_export(struct ptlrpc_request *req,
        if (req->rq_export != NULL) {
                LASSERT(!list_empty(&req->rq_exp_list));
                /* remove rq_exp_list from last export */
-               spin_lock(&req->rq_export->exp_rpc_lock);
-               list_del_init(&req->rq_exp_list);
-               spin_unlock(&req->rq_export->exp_rpc_lock);
-               /*
-                * export has one reference already, so it`s safe to
+               ptlrpc_del_exp_list(req);
+               /* export has one reference already, so it's safe to
                 * add req to export queue here and get another
                 * reference for request later
                 */
                spin_lock(&export->exp_rpc_lock);
-               if (req->rq_ops != NULL) /* hp request */
-                       list_add(&req->rq_exp_list, &export->exp_hp_rpcs);
-               else
-                       list_add(&req->rq_exp_list, &export->exp_reg_rpcs);
+               ptlrpc_add_exp_list_nolock(req, export, req->rq_ops != NULL);
                spin_unlock(&export->exp_rpc_lock);
 
                class_export_rpc_dec(req->rq_export);
@@ -1664,6 +1683,47 @@ found:
        return tmp;
 }
 
+#ifdef HAVE_SERVER_SUPPORT
+static void ptlrpc_server_mark_obsolete(struct ptlrpc_request *req)
+{
+       req->rq_obsolete = 1;
+}
+
+static void
+ptlrpc_server_mark_in_progress_obsolete(struct ptlrpc_request *req)
+{
+       struct ptlrpc_request   *tmp = NULL;
+       __u16                   tag;
+
+       if (!tgt_is_increasing_xid_client(req->rq_export) ||
+           req->rq_export->exp_used_slots == NULL)
+               return;
+
+       tag = lustre_msg_get_tag(req->rq_reqmsg);
+       if (tag == 0)
+               return;
+
+       if (!test_bit(tag - 1, req->rq_export->exp_used_slots))
+               return;
+
+       /* This list should not be longer than max_requests in
+        * flights on the client, so it is not all that long.
+        * Also we only hit this codepath in case of a resent
+        * request which makes it even more rarely hit */
+       list_for_each_entry(tmp, &req->rq_export->exp_reg_rpcs, rq_exp_list) {
+               if (tag == lustre_msg_get_tag(tmp->rq_reqmsg) &&
+                   req->rq_xid > tmp->rq_xid)
+                       ptlrpc_server_mark_obsolete(tmp);
+
+       }
+       list_for_each_entry(tmp, &req->rq_export->exp_hp_rpcs, rq_exp_list) {
+               if (tag == lustre_msg_get_tag(tmp->rq_reqmsg) &&
+                   req->rq_xid > tmp->rq_xid)
+                       ptlrpc_server_mark_obsolete(tmp);
+       }
+}
+#endif
+
 /**
  * Check if a request should be assigned with a high priority.
  *
@@ -1721,9 +1781,7 @@ static void ptlrpc_server_hpreq_fini(struct ptlrpc_request *req)
                if (req->rq_ops && req->rq_ops->hpreq_fini)
                        req->rq_ops->hpreq_fini(req);
 
-               spin_lock(&req->rq_export->exp_rpc_lock);
-               list_del_init(&req->rq_exp_list);
-               spin_unlock(&req->rq_export->exp_rpc_lock);
+               ptlrpc_del_exp_list(req);
        }
        EXIT;
 }
@@ -1770,7 +1828,7 @@ static int ptlrpc_server_request_add(struct ptlrpc_service_part *svcpt,
        hp = rc > 0;
        ptlrpc_nrs_req_initialize(svcpt, req, hp);
 
-       if (req->rq_export != NULL) {
+       while (req->rq_export != NULL) {
                struct obd_export *exp = req->rq_export;
 
                /*
@@ -1778,7 +1836,18 @@ static int ptlrpc_server_request_add(struct ptlrpc_service_part *svcpt,
                 * atomically
                 */
                spin_lock_bh(&exp->exp_rpc_lock);
+#ifdef HAVE_SERVER_SUPPORT
+               ptlrpc_server_mark_in_progress_obsolete(req);
+#endif
                orig = ptlrpc_server_check_resend_in_progress(req);
+               if (orig && OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE)) {
+                       spin_unlock_bh(&exp->exp_rpc_lock);
+
+                       OBD_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
+                       msleep(4 * MSEC_PER_SEC);
+                       continue;
+               }
+
                if (orig && likely(atomic_inc_not_zero(&orig->rq_refcount))) {
                        bool linked;
 
@@ -1801,14 +1870,17 @@ static int ptlrpc_server_request_add(struct ptlrpc_service_part *svcpt,
                                ptlrpc_at_add_timed(orig);
                        ptlrpc_server_drop_request(orig);
                        ptlrpc_nrs_req_finalize(req);
+
+                       /* don't mark slot unused for resend in progress */
+                       req->rq_obsolete = 1;
+
                        RETURN(-EBUSY);
                }
 
-               if (hp || req->rq_ops != NULL)
-                       list_add(&req->rq_exp_list, &exp->exp_hp_rpcs);
-               else
-                       list_add(&req->rq_exp_list, &exp->exp_reg_rpcs);
+               ptlrpc_add_exp_list_nolock(req, exp, hp || req->rq_ops != NULL);
+
                spin_unlock_bh(&exp->exp_rpc_lock);
+               break;
        }
 
        /*
index d9d5a81..e41c59c 100644 (file)
@@ -552,7 +552,7 @@ static int tgt_handle_recovery(struct ptlrpc_request *req, int reply_fail_id)
 
        /* sanity check: if the xid matches, the request must be marked as a
         * resent or replayed */
-       if (req_can_reconstruct(req, NULL)) {
+       if (req_can_reconstruct(req, NULL) == 1) {
                if (!(lustre_msg_get_flags(req->rq_reqmsg) &
                      (MSG_RESENT | MSG_REPLAY))) {
                        DEBUG_REQ(D_WARNING, req,
@@ -629,18 +629,20 @@ static struct tgt_handler *tgt_handler_find_check(struct ptlrpc_request *req)
 static int process_req_last_xid(struct ptlrpc_request *req)
 {
        __u64   last_xid;
+       int rc = 0;
+       struct obd_export *exp = req->rq_export;
+       struct tg_export_data *ted = &exp->exp_target_data;
+       bool need_lock = tgt_is_multimodrpcs_client(exp);
        ENTRY;
 
+       if (need_lock)
+               mutex_lock(&ted->ted_lcd_lock);
        /* check request's xid is consistent with export's last_xid */
        last_xid = lustre_msg_get_last_xid(req->rq_reqmsg);
-       if (last_xid > req->rq_export->exp_last_xid)
-               req->rq_export->exp_last_xid = last_xid;
+       if (last_xid > exp->exp_last_xid)
+               exp->exp_last_xid = last_xid;
 
-       if (req->rq_xid == 0 ||
-           (req->rq_xid <= req->rq_export->exp_last_xid)) {
-               DEBUG_REQ(D_WARNING, req,
-                         "unexpected rq_xid=%llx != exp_last_xid=%llx",
-                         req->rq_xid, req->rq_export->exp_last_xid);
+       if (req->rq_xid == 0 || req->rq_xid <= exp->exp_last_xid) {
                /* Some request is allowed to be sent during replay,
                 * such as OUT update requests, FLD requests, so it
                 * is possible that replay requests has smaller XID
@@ -657,7 +659,13 @@ static int process_req_last_xid(struct ptlrpc_request *req)
                 * - The former RPC got chance to be processed;
                 */
                if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY))
-                       RETURN(-EPROTO);
+                       rc = -EPROTO;
+
+               DEBUG_REQ(D_WARNING, req,
+                         "unexpected xid=%llx != exp_last_xid=%llx, rc = %d",
+                         req->rq_xid, exp->exp_last_xid, rc);
+               if (rc)
+                       GOTO(out, rc);
        }
 
        /* The "last_xid" is the minimum xid among unreplied requests,
@@ -671,18 +679,19 @@ static int process_req_last_xid(struct ptlrpc_request *req)
         */
        if (req->rq_export->exp_conn_cnt >
            lustre_msg_get_conn_cnt(req->rq_reqmsg))
-               RETURN(-ESTALE);
+               GOTO(out, rc = -ESTALE);
 
        /* try to release in-memory reply data */
-       if (tgt_is_multimodrpcs_client(req->rq_export)) {
-               tgt_handle_received_xid(req->rq_export,
-                               lustre_msg_get_last_xid(req->rq_reqmsg));
-               if (!(lustre_msg_get_flags(req->rq_reqmsg) &
-                     (MSG_RESENT | MSG_REPLAY)))
-                       tgt_handle_tag(req->rq_export,
-                                      lustre_msg_get_tag(req->rq_reqmsg));
+       if (tgt_is_multimodrpcs_client(exp)) {
+               tgt_handle_received_xid(exp, last_xid);
+               rc = tgt_handle_tag(req);
        }
-       RETURN(0);
+
+out:
+       if (need_lock)
+               mutex_unlock(&ted->ted_lcd_lock);
+
+       RETURN(rc);
 }
 
 int tgt_request_handle(struct ptlrpc_request *req)
@@ -2729,12 +2738,12 @@ EXPORT_SYMBOL(tgt_brw_write);
 /* Check if request can be reconstructed from saved reply data
  * A copy of the reply data is returned in @trd if the pointer is not NULL
  */
-bool req_can_reconstruct(struct ptlrpc_request *req,
+int req_can_reconstruct(struct ptlrpc_request *req,
                         struct tg_reply_data *trd)
 {
        struct tg_export_data *ted = &req->rq_export->exp_target_data;
        struct lsd_client_data *lcd = ted->ted_lcd;
-       bool found;
+       int found;
 
        if (tgt_is_multimodrpcs_client(req->rq_export))
                return tgt_lookup_reply(req, trd);
index 91617b1..39fb410 100644 (file)
@@ -241,7 +241,7 @@ int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
 int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
                    void *cookie);
 int tgt_handle_received_xid(struct obd_export *exp, __u64 rcvd_xid);
-int tgt_handle_tag(struct obd_export *exp, __u16 tag);
+int tgt_handle_tag(struct ptlrpc_request *req);
 
 void update_records_dump(const struct update_records *records,
                         unsigned int mask, bool dump_updates);
index bcb4ff9..b36908b 100644 (file)
@@ -1145,8 +1145,30 @@ int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
 }
 EXPORT_SYMBOL(tgt_client_del);
 
+static void tgt_clean_by_tag(struct obd_export *exp, __u64 xid, __u16 tag)
+{
+       struct tg_export_data   *ted = &exp->exp_target_data;
+       struct lu_target        *lut = class_exp2tgt(exp);
+       struct tg_reply_data    *trd, *tmp;
+
+       if (tag == 0)
+               return;
+
+       list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
+               if (trd->trd_tag != tag)
+                       continue;
+
+               LASSERT(ergo(tgt_is_increasing_xid_client(exp),
+                            trd->trd_reply.lrd_xid <= xid));
+
+               ted->ted_release_tag++;
+               tgt_release_reply_data(lut, ted, trd);
+       }
+}
+
 int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
                       struct tg_export_data *ted, struct tg_reply_data *trd,
+                      struct ptlrpc_request *req,
                       struct thandle *th, bool update_lrd_file)
 {
        struct lsd_reply_data   *lrd;
@@ -1183,6 +1205,19 @@ int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
        }
        /* add reply data to target export's reply list */
        mutex_lock(&ted->ted_lcd_lock);
+       if (req != NULL) {
+               int exclude = tgt_is_increasing_xid_client(req->rq_export) ?
+                             MSG_REPLAY : MSG_REPLAY|MSG_RESENT;
+
+               if (req->rq_obsolete) {
+                       mutex_unlock(&ted->ted_lcd_lock);
+                       RETURN(-EALREADY);
+               }
+
+               if (!(lustre_msg_get_flags(req->rq_reqmsg) & exclude))
+                       tgt_clean_by_tag(req->rq_export, req->rq_xid,
+                                        trd->trd_tag);
+       }
        list_add(&trd->trd_list, &ted->ted_reply_list);
        ted->ted_reply_cnt++;
        if (ted->ted_reply_cnt > ted->ted_reply_max)
@@ -1192,7 +1227,8 @@ int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
        CDEBUG(D_TRACE, "add reply %p: xid %llu, transno %llu, "
               "tag %hu, client gen %u, slot idx %d\n",
               trd, lrd->lrd_xid, lrd->lrd_transno,
-              trd->trd_tag, lrd->lrd_client_gen, i);
+              trd->trd_tag, lrd->lrd_client_gen, trd->trd_index);
+
        RETURN(0);
 }
 EXPORT_SYMBOL(tgt_add_reply_data);
@@ -1323,7 +1359,8 @@ static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
                        trd->trd_pre_versions[3] = pre_versions[3];
                }
 
-               rc = tgt_add_reply_data(env, tgt, ted, trd, th, write_update);
+               rc = tgt_add_reply_data(env, tgt, ted, trd, req,
+                                       th, write_update);
                if (rc < 0)
                        OBD_FREE_PTR(trd);
                return rc;
@@ -2044,43 +2081,70 @@ out:
        return rc;
 }
 
-struct tg_reply_data *tgt_lookup_reply_by_xid(struct tg_export_data *ted,
-                                             __u64 xid)
+static int tgt_check_lookup_req(struct ptlrpc_request *req, int lookup,
+                               struct tg_reply_data *trd)
 {
-       struct tg_reply_data    *found = NULL;
-       struct tg_reply_data    *reply;
+       struct tg_export_data *ted = &req->rq_export->exp_target_data;
+       struct lu_target *lut = class_exp2tgt(req->rq_export);
+       __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+       int rc = 0;
+       struct tg_reply_data *reply;
+       bool check_increasing;
+
+       if (tag == 0)
+               return 0;
+
+       check_increasing = tgt_is_increasing_xid_client(req->rq_export) &&
+                          !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
+       if (!lookup && !check_increasing)
+               return 0;
 
-       mutex_lock(&ted->ted_lcd_lock);
        list_for_each_entry(reply, &ted->ted_reply_list, trd_list) {
-               if (reply->trd_reply.lrd_xid == xid) {
-                       found = reply;
+               if (lookup && reply->trd_reply.lrd_xid == req->rq_xid) {
+                       rc = 1;
+                       if (trd != NULL)
+                               *trd = *reply;
+                       break;
+               } else if (check_increasing && reply->trd_tag == tag &&
+                          reply->trd_reply.lrd_xid > req->rq_xid) {
+                       rc = -EPROTO;
+                       CERROR("%s: busy tag=%u req_xid=%llu, trd=%p: xid=%llu transno=%llu client_gen=%u slot_idx=%d: rc = %d\n",
+                              tgt_name(lut), tag, req->rq_xid, trd,
+                              reply->trd_reply.lrd_xid,
+                              reply->trd_reply.lrd_transno,
+                              reply->trd_reply.lrd_client_gen,
+                              reply->trd_index, rc);
                        break;
                }
        }
-       mutex_unlock(&ted->ted_lcd_lock);
-       return found;
+
+       return rc;
 }
-EXPORT_SYMBOL(tgt_lookup_reply_by_xid);
 
 /* Look for a reply data matching specified request @req
  * A copy is returned in @trd if the pointer is not NULL
  */
-bool tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd)
+int tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd)
 {
-       struct tg_export_data   *ted = &req->rq_export->exp_target_data;
-       struct tg_reply_data    *reply;
-       bool                     found = false;
-
-       reply = tgt_lookup_reply_by_xid(ted, req->rq_xid);
-       if (reply != NULL) {
-               found = true;
-               if (trd != NULL)
-                       *trd = *reply;
+       struct tg_export_data *ted = &req->rq_export->exp_target_data;
+       int found = 0;
+       bool not_replay = !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
+
+       mutex_lock(&ted->ted_lcd_lock);
+       if (not_replay && req->rq_xid <= req->rq_export->exp_last_xid) {
+               /* A check for the last_xid is needed here in case there is
+                * no reply data is left in the list. It may happen if another
+                * RPC on another slot increased the last_xid between our
+                * process_req_last_xid & tgt_lookup_reply calls */
+               found = -EPROTO;
+       } else {
+               found = tgt_check_lookup_req(req, 1, trd);
        }
+       mutex_unlock(&ted->ted_lcd_lock);
 
-       CDEBUG(D_TRACE, "%s: lookup reply xid %llu, found %d\n",
-              tgt_name(class_exp2tgt(req->rq_export)), req->rq_xid,
-              found ? 1 : 0);
+       CDEBUG(D_TRACE, "%s: lookup reply xid %llu, found %d last_xid %llu\n",
+              tgt_name(class_exp2tgt(req->rq_export)), req->rq_xid, found,
+              req->rq_export->exp_last_xid);
 
        return found;
 }
@@ -2092,37 +2156,19 @@ int tgt_handle_received_xid(struct obd_export *exp, __u64 rcvd_xid)
        struct lu_target        *lut = class_exp2tgt(exp);
        struct tg_reply_data    *trd, *tmp;
 
-       mutex_lock(&ted->ted_lcd_lock);
+
        list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
                if (trd->trd_reply.lrd_xid > rcvd_xid)
                        continue;
                ted->ted_release_xid++;
                tgt_release_reply_data(lut, ted, trd);
        }
-       mutex_unlock(&ted->ted_lcd_lock);
 
        return 0;
 }
 
-int tgt_handle_tag(struct obd_export *exp, __u16 tag)
+int tgt_handle_tag(struct ptlrpc_request *req)
 {
-       struct tg_export_data   *ted = &exp->exp_target_data;
-       struct lu_target        *lut = class_exp2tgt(exp);
-       struct tg_reply_data    *trd, *tmp;
-
-       if (tag == 0)
-               return 0;
-
-       mutex_lock(&ted->ted_lcd_lock);
-       list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
-               if (trd->trd_tag != tag)
-                       continue;
-               ted->ted_release_tag++;
-               tgt_release_reply_data(lut, ted, trd);
-               break;
-       }
-       mutex_unlock(&ted->ted_lcd_lock);
-
-       return 0;
+       return tgt_check_lookup_req(req, 0, NULL);
 }
 
index 8f7c48d..0ce0509 100755 (executable)
@@ -2847,6 +2847,34 @@ test_136() {
 }
 run_test 136 "changelog_deregister leaving pending records"
 
+test_137() {
+       df $DIR
+       mkdir -p $DIR/d1
+       mkdir -p $DIR/d2
+       dd if=/dev/zero of=$DIR/d1/$tfile bs=4096 count=1
+       dd if=/dev/zero of=$DIR/d2/$tfile bs=4096 count=1
+       cancel_lru_locks osc
+
+       #define OBD_FAIL_PTLRPC_RESEND_RACE      0x525
+       do_facet $SINGLEMDS "lctl set_param fail_loc=0x80000525"
+
+       # RPC1: any reply is to be delayed to disable last_xid logic
+       ln $DIR/d1/$tfile $DIR/d1/f2 &
+       sleep 1
+
+       # RPC2: setattr1 reply is delayed & resent
+       # original reply comes to client; the resend get asleep
+       chmod 666 $DIR/d2/$tfile
+
+       # RPC3: setattr2 on the same file; run ahead of RPC2 resend
+       chmod 777 $DIR/d2/$tfile
+
+       # RPC2 resend wakes up
+       sleep 5
+       [ $(stat -c "%a" $DIR/d2/$tfile) == 777 ] || error "resend got applied"
+}
+run_test 137 "late resend must be skipped if already applied"
+
 complete $SECONDS
 check_and_cleanup_lustre
 exit_status
index 4453d11..644563f 100644 (file)
@@ -21351,6 +21351,46 @@ test_421g() {
 }
 run_test 421g "rmfid to return errors properly"
 
+test_422() {
+       test_mkdir -i 0 -c 1 -p $DIR/$tdir/d1
+       test_mkdir -i 0 -c 1 -p $DIR/$tdir/d2
+       test_mkdir -i 0 -c 1 -p $DIR/$tdir/d3
+       dd if=/dev/zero of=$DIR/$tdir/d1/file1 bs=1k count=1
+       dd if=/dev/zero of=$DIR/$tdir/d2/file1 bs=1k count=1
+
+       local amc=$(at_max_get client)
+       local amo=$(at_max_get mds1)
+       local timeout=`lctl get_param -n timeout`
+
+       at_max_set 0 client
+       at_max_set 0 mds1
+
+#define OBD_FAIL_PTLRPC_PAUSE_REQ        0x50a
+       do_facet mds1 $LCTL set_param fail_loc=0x8000050a \
+                       fail_val=$(((2*timeout + 10)*1000))
+       touch $DIR/$tdir/d3/file &
+       sleep 2
+#define OBD_FAIL_TGT_REPLY_DATA_RACE    0x722
+       do_facet mds1 $LCTL set_param fail_loc=0x80000722 \
+                       fail_val=$((2*timeout + 5))
+       mv $DIR/$tdir/d1/file1 $DIR/$tdir/d1/file2 &
+       local pid=$!
+       sleep 1
+       kill -9 $pid
+       sleep $((2 * timeout))
+       echo kill $pid
+       kill -9 $pid
+       lctl mark touch
+       touch $DIR/$tdir/d2/file3
+       touch $DIR/$tdir/d2/file4
+       touch $DIR/$tdir/d2/file5
+
+       wait
+       at_max_set $amc client
+       at_max_set $amo mds1
+}
+run_test 422 "kill a process with RPC in progress"
+
 prep_801() {
        [[ $(lustre_version_code mds1) -lt $(version_code 2.9.55) ]] ||
        [[ $OST1_VERSION -lt $(version_code 2.9.55) ]] &&