Whamcloud - gitweb
LU-7434 ptlrpc: lost bulk leads to a hang 53/19953/4
authorVitaly Fertman <vitaly.fertman@seagate.com>
Tue, 1 Mar 2016 23:46:31 +0000 (02:46 +0300)
committerOleg Drokin <oleg.drokin@intel.com>
Thu, 16 Jun 2016 22:16:12 +0000 (22:16 +0000)
This is a combination of two commits. The original commit from:
http://review.whamcloud.com/17221
and a fix to that commit from:
http://review.whamcloud.com/19758

Description of the original patch:
The reverse order of request_out_callback() and reply_in_callback()
puts the RPC into UNREGISTERING state, which is waiting for RPC &
bulk md unlink, whereas only RPC md unlink has been called so far.
If bulk is lost, even expired_set does not check for UNREGISTERING
state.

The same for write if server returns an error.

This phase is ambiguous, split to UNREG_RPC and UNREG_BULK.

The fix to the original commit was originally pushed against LU-8062.
That fix is described thusly:
LU-8062 test: fix fail_val in recovery-small/115b

The fail_loc OBD_FAIL_OST_ENOSPC is caught in 2 places:
1. ofd_statfs() - where it is checked against the fail_val
and if fail_val matches the OST ID it is caught here
itself and hence test fails.
2. tgt_brw_write() - the actual place where the fail_loc should
be caught.

The patch makes the fail_loc to be caught at the appropriate place
by setting the fail_val to $OSTCOUNT. So even if fail_loc is set
in the statfs part it is not caught there and therefore caught in
tgt_brw_write().

Please note the author of the original patch was Vitaly Fertman
<vitaly.fertman@seagate.com>, and the author of the test fix was
Bhagyesh Dudhediya <bhagyesh.dudhediya@seagate.com>

Test-Parameters: testlist=recovery-small,recovery-small,recovery-small,recovery-small,recovery-small,recovery-small
Signed-off-by: Vitaly Fertman <vitaly.fertman@seagate.com>
Signed-off-by: Bhagyesh Dudhediya <bhagyesh.dudhediya@seagate.com>
Signed-off-by: Chris Horn <hornc@cray.com>
Seagate-bug-id:  MRP-2953, MRP-3206, MRP-3150
Reviewed-by: Andriy Skulysh <andriy.skulysh@seagate.com>
Reviewed-by: Alexey Leonidovich Lyashkov <alexey.lyashkov@seagate.com>
Tested-by: Elena V. Gryaznova <elena.gryaznova@seagate.com>
Change-Id: I17319d40881c41f247c102aafc3a1b0db82d0b4a
Reviewed-on: http://review.whamcloud.com/19953
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Ann Koehler <amk@cray.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lustre_net.h
lustre/include/obd_support.h
lustre/ptlrpc/client.c
lustre/ptlrpc/import.c
lustre/ptlrpc/niobuf.c
lustre/target/tgt_handler.c
lustre/tests/conf-sanity.sh
lustre/tests/recovery-small.sh
lustre/tests/test-framework.sh

index 8858384..019aec0 100644 (file)
@@ -714,13 +714,14 @@ struct ptlrpc_thread;
 
 /** RPC stages */
 enum rq_phase {
-        RQ_PHASE_NEW            = 0xebc0de00,
-        RQ_PHASE_RPC            = 0xebc0de01,
-        RQ_PHASE_BULK           = 0xebc0de02,
-        RQ_PHASE_INTERPRET      = 0xebc0de03,
-        RQ_PHASE_COMPLETE       = 0xebc0de04,
-        RQ_PHASE_UNREGISTERING  = 0xebc0de05,
-        RQ_PHASE_UNDEFINED      = 0xebc0de06
+       RQ_PHASE_NEW            = 0xebc0de00,
+       RQ_PHASE_RPC            = 0xebc0de01,
+       RQ_PHASE_BULK           = 0xebc0de02,
+       RQ_PHASE_INTERPRET      = 0xebc0de03,
+       RQ_PHASE_COMPLETE       = 0xebc0de04,
+       RQ_PHASE_UNREG_RPC      = 0xebc0de05,
+       RQ_PHASE_UNREG_BULK     = 0xebc0de06,
+       RQ_PHASE_UNDEFINED      = 0xebc0de07
 };
 
 /** Type of request interpreter call-back */
@@ -795,6 +796,8 @@ struct ptlrpc_cli_req {
        time_t                           cr_reply_deadline;
        /** when req bulk unlink must finish. */
        time_t                           cr_bulk_deadline;
+       /** when req unlink must finish. */
+       time_t                           cr_req_deadline;
        /** Portal to which this request would be sent */
        short                            cr_req_ptl;
        /** Portal where to wait for reply and where reply would be sent */
@@ -853,6 +856,7 @@ struct ptlrpc_cli_req {
 #define rq_real_sent           rq_cli.cr_sent_out
 #define rq_reply_deadline      rq_cli.cr_reply_deadline
 #define rq_bulk_deadline       rq_cli.cr_bulk_deadline
+#define rq_req_deadline                rq_cli.cr_req_deadline
 #define rq_nr_resend           rq_cli.cr_resend_nr
 #define rq_request_portal      rq_cli.cr_req_ptl
 #define rq_reply_portal                rq_cli.cr_rep_ptl
@@ -1228,22 +1232,24 @@ static inline void lustre_set_rep_swabbed(struct ptlrpc_request *req,
 static inline const char *
 ptlrpc_phase2str(enum rq_phase phase)
 {
-        switch (phase) {
-        case RQ_PHASE_NEW:
-                return "New";
-        case RQ_PHASE_RPC:
-                return "Rpc";
-        case RQ_PHASE_BULK:
-                return "Bulk";
-        case RQ_PHASE_INTERPRET:
-                return "Interpret";
-        case RQ_PHASE_COMPLETE:
-                return "Complete";
-        case RQ_PHASE_UNREGISTERING:
-                return "Unregistering";
-        default:
-                return "?Phase?";
-        }
+       switch (phase) {
+       case RQ_PHASE_NEW:
+               return "New";
+       case RQ_PHASE_RPC:
+               return "Rpc";
+       case RQ_PHASE_BULK:
+               return "Bulk";
+       case RQ_PHASE_INTERPRET:
+               return "Interpret";
+       case RQ_PHASE_COMPLETE:
+               return "Complete";
+       case RQ_PHASE_UNREG_RPC:
+               return "UnregRPC";
+       case RQ_PHASE_UNREG_BULK:
+               return "UnregBULK";
+       default:
+               return "?Phase?";
+       }
 }
 
 /**
@@ -1264,18 +1270,18 @@ ptlrpc_rqphase2str(struct ptlrpc_request *req)
 #define FLAG(field, str) (field ? str : "")
 
 /** Convert bit flags into a string */
-#define DEBUG_REQ_FLAGS(req)                                                    \
-        ptlrpc_rqphase2str(req),                                                \
-        FLAG(req->rq_intr, "I"), FLAG(req->rq_replied, "R"),                    \
-        FLAG(req->rq_err, "E"),                                                 \
-        FLAG(req->rq_timedout, "X") /* eXpired */, FLAG(req->rq_resend, "S"),   \
-        FLAG(req->rq_restart, "T"), FLAG(req->rq_replay, "P"),                  \
-        FLAG(req->rq_no_resend, "N"),                                           \
-        FLAG(req->rq_waiting, "W"),                                             \
-        FLAG(req->rq_wait_ctx, "C"), FLAG(req->rq_hp, "H"),                     \
-        FLAG(req->rq_committed, "M")
-
-#define REQ_FLAGS_FMT "%s:%s%s%s%s%s%s%s%s%s%s%s%s"
+#define DEBUG_REQ_FLAGS(req)                                                   \
+       ptlrpc_rqphase2str(req),                                               \
+       FLAG(req->rq_intr, "I"), FLAG(req->rq_replied, "R"),                   \
+       FLAG(req->rq_err, "E"), FLAG(req->rq_net_err, "e"),                    \
+       FLAG(req->rq_timedout, "X") /* eXpired */, FLAG(req->rq_resend, "S"),  \
+       FLAG(req->rq_restart, "T"), FLAG(req->rq_replay, "P"),                 \
+       FLAG(req->rq_no_resend, "N"),                                          \
+       FLAG(req->rq_waiting, "W"),                                            \
+       FLAG(req->rq_wait_ctx, "C"), FLAG(req->rq_hp, "H"),                    \
+       FLAG(req->rq_committed, "M")
+
+#define REQ_FLAGS_FMT "%s:%s%s%s%s%s%s%s%s%s%s%s%s%s"
 
 void _debug_req(struct ptlrpc_request *req,
                 struct libcfs_debug_msg_data *data, const char *fmt, ...)
@@ -2044,17 +2050,16 @@ int ptlrpc_unregister_bulk(struct ptlrpc_request *req, int async);
 static inline int ptlrpc_client_bulk_active(struct ptlrpc_request *req)
 {
        struct ptlrpc_bulk_desc *desc;
-        int                      rc;
+       int rc;
 
-        LASSERT(req != NULL);
+       LASSERT(req != NULL);
        desc = req->rq_bulk;
 
-        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK) &&
-            req->rq_bulk_deadline > cfs_time_current_sec())
-                return 1;
+       if (req->rq_bulk_deadline > cfs_time_current_sec())
+               return 1;
 
-        if (!desc)
-                return 0;
+       if (!desc)
+               return 0;
 
        spin_lock(&desc->bd_lock);
        rc = desc->bd_md_count;
@@ -2430,13 +2435,20 @@ ptlrpc_rqphase_move(struct ptlrpc_request *req, enum rq_phase new_phase)
        if (req->rq_phase == new_phase)
                return;
 
-       if (new_phase == RQ_PHASE_UNREGISTERING) {
+       if (new_phase == RQ_PHASE_UNREG_RPC ||
+           new_phase == RQ_PHASE_UNREG_BULK) {
+               /* No embedded unregistering phases */
+               if (req->rq_phase == RQ_PHASE_UNREG_RPC ||
+                   req->rq_phase == RQ_PHASE_UNREG_BULK)
+                       return;
+
                req->rq_next_phase = req->rq_phase;
                if (req->rq_import)
                        atomic_inc(&req->rq_import->imp_unregistering);
        }
 
-       if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
+       if (req->rq_phase == RQ_PHASE_UNREG_RPC ||
+           req->rq_phase == RQ_PHASE_UNREG_BULK) {
                if (req->rq_import)
                        atomic_dec(&req->rq_import->imp_unregistering);
        }
@@ -2453,9 +2465,6 @@ ptlrpc_rqphase_move(struct ptlrpc_request *req, enum rq_phase new_phase)
 static inline int
 ptlrpc_client_early(struct ptlrpc_request *req)
 {
-        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
-            req->rq_reply_deadline > cfs_time_current_sec())
-                return 0;
         return req->rq_early;
 }
 
@@ -2465,20 +2474,18 @@ ptlrpc_client_early(struct ptlrpc_request *req)
 static inline int
 ptlrpc_client_replied(struct ptlrpc_request *req)
 {
-        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
-            req->rq_reply_deadline > cfs_time_current_sec())
-                return 0;
-        return req->rq_replied;
+       if (req->rq_reply_deadline > cfs_time_current_sec())
+               return 0;
+       return req->rq_replied;
 }
 
 /** Returns true if request \a req is in process of receiving server reply */
 static inline int
 ptlrpc_client_recv(struct ptlrpc_request *req)
 {
-        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
-            req->rq_reply_deadline > cfs_time_current_sec())
-                return 1;
-        return req->rq_receiving_reply;
+       if (req->rq_reply_deadline > cfs_time_current_sec())
+               return 1;
+       return req->rq_receiving_reply;
 }
 
 static inline int
@@ -2487,11 +2494,15 @@ ptlrpc_client_recv_or_unlink(struct ptlrpc_request *req)
        int rc;
 
        spin_lock(&req->rq_lock);
-       if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
-           req->rq_reply_deadline > cfs_time_current_sec()) {
+       if (req->rq_reply_deadline > cfs_time_current_sec()) {
+               spin_unlock(&req->rq_lock);
+               return 1;
+       }
+       if (req->rq_req_deadline > cfs_time_current_sec()) {
                spin_unlock(&req->rq_lock);
                return 1;
        }
+
        rc = !req->rq_req_unlinked || !req->rq_reply_unlinked ||
             req->rq_receiving_reply;
        spin_unlock(&req->rq_lock);
index a6a83d9..87dcb30 100644 (file)
@@ -415,6 +415,9 @@ extern char obd_jobid_var[];
 #define OBD_FAIL_PTLRPC_CLIENT_BULK_CB2  0x515
 #define OBD_FAIL_PTLRPC_DELAY_IMP_FULL   0x516
 #define OBD_FAIL_PTLRPC_CANCEL_RESEND    0x517
+#define OBD_FAIL_PTLRPC_DROP_BULK       0x51a
+#define OBD_FAIL_PTLRPC_LONG_REQ_UNLINK  0x51b
+#define OBD_FAIL_PTLRPC_LONG_BOTH_UNLINK 0x51c
 #define OBD_FAIL_PTLRPC_CLIENT_BULK_CB3  0x520
 
 #define OBD_FAIL_OBD_PING_NET            0x600
index ecb2384..f550adf 100644 (file)
@@ -731,6 +731,8 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
        request->rq_reply_cbid.cbid_arg = request;
 
        request->rq_reply_deadline = 0;
+       request->rq_bulk_deadline = 0;
+       request->rq_req_deadline = 0;
        request->rq_phase = RQ_PHASE_NEW;
        request->rq_next_phase = RQ_PHASE_UNDEFINED;
 
@@ -742,6 +744,35 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
        lustre_msg_set_opc(request->rq_reqmsg, opcode);
        ptlrpc_assign_next_xid(request);
 
+       /* Let's setup deadline for req/reply/bulk unlink for opcode. */
+       if (cfs_fail_val == opcode) {
+               time_t *fail_t = NULL, *fail2_t = NULL;
+
+               if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK))
+                       fail_t = &request->rq_bulk_deadline;
+               else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK))
+                       fail_t = &request->rq_reply_deadline;
+               else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REQ_UNLINK))
+                       fail_t = &request->rq_req_deadline;
+               else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BOTH_UNLINK)) {
+                       fail_t = &request->rq_reply_deadline;
+                       fail2_t = &request->rq_bulk_deadline;
+               }
+
+               if (fail_t) {
+                       *fail_t = cfs_time_current_sec() + LONG_UNLINK;
+
+                       if (fail2_t)
+                               *fail2_t = cfs_time_current_sec() + LONG_UNLINK;
+
+                       /* The RPC is infected, let the test to change the
+                        * fail_loc */
+                       set_current_state(TASK_UNINTERRUPTIBLE);
+                       schedule_timeout(cfs_time_seconds(2));
+                       set_current_state(TASK_RUNNING);
+               }
+       }
+
        RETURN(0);
 
 out_ctx:
@@ -1704,25 +1735,40 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                if (!(req->rq_phase == RQ_PHASE_RPC ||
                      req->rq_phase == RQ_PHASE_BULK ||
                      req->rq_phase == RQ_PHASE_INTERPRET ||
-                     req->rq_phase == RQ_PHASE_UNREGISTERING)) {
-                        DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
-                        LBUG();
-                }
+                     req->rq_phase == RQ_PHASE_UNREG_RPC ||
+                     req->rq_phase == RQ_PHASE_UNREG_BULK)) {
+                       DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
+                       LBUG();
+               }
 
-                if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
-                        LASSERT(req->rq_next_phase != req->rq_phase);
-                        LASSERT(req->rq_next_phase != RQ_PHASE_UNDEFINED);
+               if (req->rq_phase == RQ_PHASE_UNREG_RPC ||
+                   req->rq_phase == RQ_PHASE_UNREG_BULK) {
+                       LASSERT(req->rq_next_phase != req->rq_phase);
+                       LASSERT(req->rq_next_phase != RQ_PHASE_UNDEFINED);
+
+                       if (req->rq_req_deadline &&
+                           !OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REQ_UNLINK))
+                               req->rq_req_deadline = 0;
+                       if (req->rq_reply_deadline &&
+                           !OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK))
+                               req->rq_reply_deadline = 0;
+                       if (req->rq_bulk_deadline &&
+                           !OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK))
+                               req->rq_bulk_deadline = 0;
 
-                        /*
-                         * Skip processing until reply is unlinked. We
-                         * can't return to pool before that and we can't
-                         * call interpret before that. We need to make
-                         * sure that all rdma transfers finished and will
-                         * not corrupt any data.
-                         */
-                        if (ptlrpc_client_recv_or_unlink(req) ||
-                            ptlrpc_client_bulk_active(req))
-                                continue;
+                       /*
+                        * Skip processing until reply is unlinked. We
+                        * can't return to pool before that and we can't
+                        * call interpret before that. We need to make
+                        * sure that all rdma transfers finished and will
+                        * not corrupt any data.
+                        */
+                       if (req->rq_phase == RQ_PHASE_UNREG_RPC &&
+                           ptlrpc_client_recv_or_unlink(req))
+                               continue;
+                       if (req->rq_phase == RQ_PHASE_UNREG_BULK &&
+                           ptlrpc_client_bulk_active(req))
+                               continue;
 
                         /*
                          * Turn fail_loc off to prevent it from looping
@@ -2193,7 +2239,7 @@ static void ptlrpc_interrupted_set(void *data)
                        continue;
 
                if (req->rq_phase != RQ_PHASE_RPC &&
-                   req->rq_phase != RQ_PHASE_UNREGISTERING &&
+                   req->rq_phase != RQ_PHASE_UNREG_RPC &&
                    !req->rq_allow_intr)
                        continue;
 
@@ -2519,12 +2565,11 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
         */
        LASSERT(!in_interrupt());
 
-       /*
-        * Let's setup deadline for reply unlink.
-        */
-        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
-            async && request->rq_reply_deadline == 0)
-                request->rq_reply_deadline = cfs_time_current_sec()+LONG_UNLINK;
+       /* Let's setup deadline for reply unlink. */
+       if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
+           async && request->rq_reply_deadline == 0 && cfs_fail_val == 0)
+               request->rq_reply_deadline =
+                       cfs_time_current_sec() + LONG_UNLINK;
 
         /*
          * Nothing left to do.
@@ -2540,10 +2585,8 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
         if (!ptlrpc_client_recv_or_unlink(request))
                 RETURN(1);
 
-        /*
-         * Move to "Unregistering" phase as reply was not unlinked yet.
-         */
-        ptlrpc_rqphase_move(request, RQ_PHASE_UNREGISTERING);
+       /* Move to "Unregistering" phase as reply was not unlinked yet. */
+       ptlrpc_rqphase_move(request, RQ_PHASE_UNREG_RPC);
 
         /*
          * Do not wait for unlink to finish.
@@ -3277,7 +3320,6 @@ static void ptlrpcd_add_work_req(struct ptlrpc_request *req)
        req->rq_timeout         = obd_timeout;
        req->rq_sent            = cfs_time_current_sec();
        req->rq_deadline        = req->rq_sent + req->rq_timeout;
-       req->rq_reply_deadline  = req->rq_deadline;
        req->rq_phase           = RQ_PHASE_INTERPRET;
        req->rq_next_phase      = RQ_PHASE_COMPLETE;
        req->rq_xid             = ptlrpc_next_xid();
index 5ac3837..2b839e5 100644 (file)
@@ -372,10 +372,9 @@ void ptlrpc_invalidate_import(struct obd_import *imp)
                                                  "still on delayed list");
                                }
 
-                               CERROR("%s: RPCs in \"%s\" phase found (%d). "
+                               CERROR("%s: Unregistering RPCs found (%d). "
                                       "Network is sluggish? Waiting them "
                                       "to error out.\n", cli_tgt,
-                                      ptlrpc_phase2str(RQ_PHASE_UNREGISTERING),
                                       atomic_read(&imp->imp_unregistering));
                        }
                        spin_unlock(&imp->imp_lock);
index e7a694a..349638b 100644 (file)
@@ -428,7 +428,7 @@ int ptlrpc_unregister_bulk(struct ptlrpc_request *req, int async)
 
        /* Let's setup deadline for reply unlink. */
        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK) &&
-           async && req->rq_bulk_deadline == 0)
+           async && req->rq_bulk_deadline == 0 && cfs_fail_val == 0)
                req->rq_bulk_deadline = cfs_time_current_sec() + LONG_UNLINK;
 
        if (ptlrpc_client_bulk_active(req) == 0)        /* completed or */
@@ -445,14 +445,14 @@ int ptlrpc_unregister_bulk(struct ptlrpc_request *req, int async)
        if (ptlrpc_client_bulk_active(req) == 0)        /* completed or */
                RETURN(1);                              /* never registered */
 
-        /* Move to "Unregistering" phase as bulk was not unlinked yet. */
-        ptlrpc_rqphase_move(req, RQ_PHASE_UNREGISTERING);
+       /* Move to "Unregistering" phase as bulk was not unlinked yet. */
+       ptlrpc_rqphase_move(req, RQ_PHASE_UNREG_BULK);
 
-        /* Do not wait for unlink to finish. */
-        if (async)
-                RETURN(0);
+       /* Do not wait for unlink to finish. */
+       if (async)
+               RETURN(0);
 
-        for (;;) {
+       for (;;) {
                /* The wq argument is ignored by user-space wait_event macros */
                wait_queue_head_t *wq = (req->rq_set != NULL) ?
                                        &req->rq_set->set_waitq :
index c7157cc..69ef827 100644 (file)
@@ -1980,7 +1980,8 @@ int tgt_brw_read(struct tgt_session_info *tsi)
        /* Check if client was evicted while we were doing i/o before touching
         * network */
        if (likely(rc == 0 &&
-                  !CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))) {
+                  !CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2) &&
+                  !CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_DROP_BULK))) {
                rc = target_bulk_io(exp, desc, &lwi);
                no_reply = rc != 0;
        }
index ae35999..ba15cbe 100755 (executable)
@@ -3199,8 +3199,8 @@ test_45() { #17310
        df -h $MOUNT &
        log "sleep 60 sec"
        sleep 60
-       #define OBD_FAIL_PTLRPC_LONG_UNLINK   0x50f
-       do_facet client "$LCTL set_param fail_loc=0x50f"
+#define OBD_FAIL_PTLRPC_LONG_REPL_UNLINK   0x50f
+       do_facet client "$LCTL set_param fail_loc=0x50f fail_val=0"
        log "sleep 10 sec"
        sleep 10
        manual_umount_client --force || error "manual_umount_client failed"
index e833e58..1089c6d 100755 (executable)
@@ -2118,6 +2118,123 @@ test_112a() {
 }
 run_test 112a "bulk resend while orignal request is in progress"
 
+test_115_read() {
+       local fail1=$1
+       local fail2=$2
+
+       df $DIR
+       dd if=/dev/zero of=$DIR/$tfile bs=4096 count=1
+       cancel_lru_locks osc
+
+       # OST_READ       =  3,
+       $LCTL set_param fail_loc=$fail1 fail_val=3
+       dd of=/dev/null if=$DIR/$tfile bs=4096 count=1 &
+       pid=$!
+       sleep 1
+
+       set_nodes_failloc "$(osts_nodes)" $fail2
+
+       wait $pid || error "dd failed"
+       return 0
+}
+
+test_115_write() {
+       local fail1=$1
+       local fail2=$2
+       local error=$3
+       local fail_val2=${4:-0}
+
+       df $DIR
+       touch $DIR/$tfile
+
+       # OST_WRITE      =  4,
+       $LCTL set_param fail_loc=$fail1 fail_val=4
+       dd if=/dev/zero of=$DIR/$tfile bs=4096 count=1 oflag=dsync &
+       pid=$!
+       sleep 1
+
+       df $MOUNT
+       set_nodes_failloc "$(osts_nodes)" $fail2 $fail_val2
+
+       wait $pid
+       rc=$?
+       [ $error -eq 0 ] && [ $rc -ne 0 ] && error "dd error ($rc)"
+       [ $error -ne 0 ] && [ $rc -eq 0 ] && error "dd success"
+       return 0
+}
+
+test_115a() {
+       [ $(lustre_version_code ost1) -lt $(version_code 2.8.50) ] &&
+               skip "need at least 2.8.50 on OST" && return 0
+
+       #define OBD_FAIL_PTLRPC_LONG_REQ_UNLINK  0x51b
+       #define OBD_FAIL_PTLRPC_DROP_BULK        0x51a
+       test_115_read 0x8000051b 0x8000051a
+}
+run_test 115a "read: late REQ MDunlink and no bulk"
+
+test_115b() {
+       [ $(lustre_version_code ost1) -lt $(version_code 2.8.50) ] &&
+               skip "need at least 2.8.50 on OST" && return 0
+
+       #define OBD_FAIL_PTLRPC_LONG_REQ_UNLINK  0x51b
+       #define OBD_FAIL_OST_ENOSPC              0x215
+
+       # pass $OSTCOUNT for the fail_loc to be caught
+       # appropriately by the IO thread
+       test_115_write 0x8000051b 0x80000215 1 $OSTCOUNT
+}
+run_test 115b "write: late REQ MDunlink and no bulk"
+
+test_115c() {
+       [ $(lustre_version_code ost1) -lt $(version_code 2.8.50) ] &&
+               skip "need at least 2.8.50 on OST" && return 0
+
+       #define OBD_FAIL_PTLRPC_LONG_REPL_UNLINK 0x50f
+       #define OBD_FAIL_PTLRPC_DROP_BULK        0x51a
+       test_115_read 0x8000050f 0x8000051a
+}
+run_test 115c "read: late Reply MDunlink and no bulk"
+
+test_115d() {
+       [ $(lustre_version_code ost1) -lt $(version_code 2.8.50) ] &&
+               skip "need at least 2.8.50 on OST" && return 0
+
+       #define OBD_FAIL_PTLRPC_LONG_REPL_UNLINK 0x50f
+       #define OBD_FAIL_OST_ENOSPC              0x215
+       test_115_write 0x8000050f 0x80000215 0
+}
+run_test 115d "write: late Reply MDunlink and no bulk"
+
+test_115e() {
+       [ $(lustre_version_code ost1) -lt $(version_code 2.8.50) ] &&
+               skip "need at least 2.8.50 on OST" && return 0
+
+       #define OBD_FAIL_PTLRPC_LONG_BULK_UNLINK 0x510
+       #define OBD_FAIL_OST_ALL_REPLY_NET       0x211
+       test_115_read 0x80000510 0x80000211
+}
+run_test 115e "read: late Bulk MDunlink and no reply"
+
+test_115f() {
+       [ $(lustre_version_code ost1) -lt $(version_code 2.8.50) ] &&
+               skip "need at least 2.8.50 on OST" && return 0
+
+       #define OBD_FAIL_PTLRPC_LONG_REQ_UNLINK  0x51b
+       #define OBD_FAIL_OST_ALL_REPLY_NET       0x211
+       test_115_read 0x8000051b 0x80000211
+}
+run_test 115f "read: late REQ MDunlink and no reply"
+
+test_115g() {
+       [ $(lustre_version_code ost1) -lt $(version_code 2.8.50) ] &&
+               skip "need at least 2.8.50 on OST" && return 0
+
+       #define OBD_FAIL_PTLRPC_LONG_BOTH_UNLINK 0x51c
+       test_115_read 0x8000051c 0
+}
+run_test 115g "read: late REQ MDunlink and Reply MDunlink"
+
 # parameters: fail_loc CMD RC
 test_120_reply() {
        local PID
index cc573be..352cd3f 100755 (executable)
@@ -4631,7 +4631,8 @@ clear_failloc() {
 }
 
 set_nodes_failloc () {
-       do_nodes $(comma_list $1)  lctl set_param fail_val=0 fail_loc=$2
+       local fv=${3:-0}
+       do_nodes $(comma_list $1)  lctl set_param fail_val=$fv fail_loc=$2
 }
 
 cancel_lru_locks() {