Whamcloud - gitweb
b=17310
authoryury <yury>
Thu, 6 Nov 2008 07:32:43 +0000 (07:32 +0000)
committeryury <yury>
Thu, 6 Nov 2008 07:32:43 +0000 (07:32 +0000)
r=johann,shadow

- fixes ptlrpcd blocking on very long reply unlink waiting. To do so new rpc phase introduced
RQ_PHASE_UNREGISTERING in which request stay until we have reply_in_callback() called by lnet
signaling that reply is unlinked. All requests in this state are skipped in processing by prlrcd
instead of waiting n * 300s on each of them. This allows ptlrpcd to process other rpcs in the set;

- make sure that inflight count is coherent with being present on sending or delay list. That is,
if we see inflight != 0, rpc must be on one of these lists. This is very helpful in
ptlrpc_invalidate_import() to show all rpcs still waiting after invalidating import;

- in ptlrpc_invalidate_import() wait maximal rq_deadline - now from all inflight rpcs instead of
obd_timeout which may be much longer. If calculated timeout is 0, obd_timeout is used. This fixes
the issue that rq_deadline - now > obd_timeout (very easy to see in logs) which led to inflight !=
0 assert because inflight rpcs timed out later than our wait period is finished;

- in ptlrpc_invalidate_import() wait forever for rpcs in UNREGISTERING phase. Check in assert for
inflight == 0 for wait timed out case if no rpcs in UNREGISTERING phase. Only those in
UNREGISTERING phase are allowed to stay longer than obd_timeout;

- added ptlrpc_move_rqphase() function. All phase changes go through it. Add debug_req() there to
track down all phase changes;

- conf_sanity.sh test_45 added to emulate very long reply unlink and also situation when
rq_deadline - now > obd_timeout;

- fixed using rq_timedout in debug_req();

- do not wait forever in ptlrpc_unregister_reply() for async case (using it from sets). Sync case
left unchanged.

lustre/ptlrpc/client.c

index 3aa63ff..70c852b 100644 (file)
@@ -578,7 +578,9 @@ ptlrpc_prep_req_pool(struct obd_import *imp, __u32 version, int opcode,
         request->rq_reply_cbid.cbid_fn  = reply_in_callback;
         request->rq_reply_cbid.cbid_arg = request;
 
+        request->rq_reply_deadline = 0;
         request->rq_phase = RQ_PHASE_NEW;
+        request->rq_next_phase = RQ_PHASE_UNDEFINED;
 
         request->rq_request_portal = imp->imp_client->cli_request_portal;
         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
@@ -810,7 +812,7 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req)
         /* serialise with network callback */
         spin_lock(&req->rq_lock);
 
-        if (req->rq_replied) {
+        if (ptlrpc_client_replied(req)) {
                 what = "REPLIED: ";
                 GOTO(out, rc = 1);
         }
@@ -818,7 +820,7 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req)
         if (req->rq_net_err && !req->rq_timedout) {
                 what = "NETERR: ";
                 spin_unlock(&req->rq_lock);
-                rc = ptlrpc_expire_one_request(req);
+                rc = ptlrpc_expire_one_request(req, 0);
                 spin_lock(&req->rq_lock);
                 GOTO(out, rc);
         }
@@ -838,7 +840,7 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req)
                 GOTO(out, rc = 1);
         }
 
-        if (req->rq_early) {
+        if (ptlrpc_client_early(req)) {
                 what = "EARLYREP: ";
                 ptlrpc_at_recv_early_reply(req);
                 GOTO(out, rc = 0); /* keep waiting */
@@ -999,7 +1001,7 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
         if (req->rq_sent && (req->rq_sent > CURRENT_SECONDS))
                 RETURN (0);
 
-        req->rq_phase = RQ_PHASE_RPC;
+        ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
 
         imp = req->rq_import;
         spin_lock(&imp->imp_lock);
@@ -1026,7 +1028,7 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
         if (rc != 0) {
                 spin_unlock(&imp->imp_lock);
                 req->rq_status = rc;
-                req->rq_phase = RQ_PHASE_INTERPRET;
+                ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
                 RETURN(rc);
         }
 
@@ -1072,6 +1074,7 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                     ptlrpc_send_new_req(req)) {
                         force_timer_recalc = 1;
                 }
+
                 /* delayed send - skip */
                 if (req->rq_phase == RQ_PHASE_NEW && req->rq_sent)
                         continue;
@@ -1079,30 +1082,53 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                 if (!(req->rq_phase == RQ_PHASE_RPC ||
                       req->rq_phase == RQ_PHASE_BULK ||
                       req->rq_phase == RQ_PHASE_INTERPRET ||
+                      req->rq_phase == RQ_PHASE_UNREGISTERING ||
                       req->rq_phase == RQ_PHASE_COMPLETE)) {
                         DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
                         LBUG();
                 }
 
+                if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
+                        LASSERT(req->rq_next_phase != req->rq_phase);
+                        LASSERT(req->rq_next_phase != RQ_PHASE_UNDEFINED);
+
+                        /* Skip processing until reply is unlinked. We
+                         * can't return to pool before that and we can't
+                         * call interpret before that. We need to make
+                         * sure that all rdma transfers finished and will
+                         * not corrupt any data. */
+                        if (ptlrpc_client_recv_or_unlink(req))
+                                continue;
+                        
+                        /* Turn fail_loc off to prevent it from looping
+                         * forever. */
+                        OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK | 
+                                       OBD_FAIL_ONCE);
+
+                        /* Move to next phase if reply was successfully 
+                         * unlinked. */
+                        ptlrpc_rqphase_move(req, req->rq_next_phase);
+                }
+
                 if (req->rq_phase == RQ_PHASE_COMPLETE)
                         continue;
 
                 if (req->rq_phase == RQ_PHASE_INTERPRET)
                         GOTO(interpret, req->rq_status);
 
-                if (req->rq_net_err && !req->rq_timedout)
-                        ptlrpc_expire_one_request(req);
+                /* Note that this also will start async reply unlink */
+                if (req->rq_net_err && !req->rq_timedout) {
+                        ptlrpc_expire_one_request(req, 1);
+                        
+                        /* Check if we still need to wait for unlink. */
+                        if (ptlrpc_client_recv_or_unlink(req))
+                                continue;
+                }
 
                 if (req->rq_err) {
-                        ptlrpc_unregister_reply(req);
                         if (req->rq_status == 0)
                                 req->rq_status = -EIO;
-                        req->rq_phase = RQ_PHASE_INTERPRET;
-
-                        spin_lock(&imp->imp_lock);
-                        list_del_init(&req->rq_list);
-                        spin_unlock(&imp->imp_lock);
-
+                        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
                         GOTO(interpret, req->rq_status);
                 }
 
@@ -1112,15 +1138,8 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                  * seen a timeout.  our policy is to only interpret
                  * interrupted rpcs after they have timed out */
                 if (req->rq_intr && (req->rq_timedout || req->rq_waiting)) {
-                        /* NB could be on delayed list */
-                        ptlrpc_unregister_reply(req);
                         req->rq_status = -EINTR;
-                        req->rq_phase = RQ_PHASE_INTERPRET;
-
-                        spin_lock(&imp->imp_lock);
-                        list_del_init(&req->rq_list);
-                        spin_unlock(&imp->imp_lock);
-
+                        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
                         GOTO(interpret, req->rq_status);
                 }
 
@@ -1128,7 +1147,8 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                         if (req->rq_timedout||req->rq_waiting||req->rq_resend) {
                                 int status;
 
-                                ptlrpc_unregister_reply(req);
+                                if (!ptlrpc_unregister_reply(req, 1))
+                                        continue;
 
                                 spin_lock(&imp->imp_lock);
 
@@ -1137,19 +1157,22 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                                         continue;
                                 }
 
-                                list_del_init(&req->rq_list);
                                 if (status != 0)  {
                                         req->rq_status = status;
-                                        req->rq_phase = RQ_PHASE_INTERPRET;
+                                        ptlrpc_rqphase_move(req, 
+                                                RQ_PHASE_INTERPRET);
                                         spin_unlock(&imp->imp_lock);
                                         GOTO(interpret, req->rq_status);
                                 }
                                 if (req->rq_no_resend) {
                                         req->rq_status = -ENOTCONN;
-                                        req->rq_phase = RQ_PHASE_INTERPRET;
+                                        ptlrpc_rqphase_move(req, 
+                                                RQ_PHASE_INTERPRET);
                                         spin_unlock(&imp->imp_lock);
                                         GOTO(interpret, req->rq_status);
                                 }
+
+                                list_del_init(&req->rq_list);
                                 list_add_tail(&req->rq_list,
                                               &imp->imp_sending_list);
 
@@ -1166,7 +1189,7 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                                         if (req->rq_bulk) {
                                                 __u64 old_xid = req->rq_xid;
 
-                                                ptlrpc_unregister_bulk (req);
+                                                ptlrpc_unregister_bulk(req);
 
                                                 /* ensure previous bulk fails */
                                                 req->rq_xid = ptlrpc_next_xid();
@@ -1190,36 +1213,33 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
 
                         spin_lock(&req->rq_lock);
 
-                        if (req->rq_early) {
+                        if (ptlrpc_client_early(req)) {
                                 ptlrpc_at_recv_early_reply(req);
                                 spin_unlock(&req->rq_lock);
                                 continue;
                         }
 
                         /* Still waiting for a reply? */
-                        if (req->rq_receiving_reply) {
+                        if (ptlrpc_client_recv(req)) {
                                 spin_unlock(&req->rq_lock);
                                 continue;
                         }
 
                         /* Did we actually receive a reply? */
-                        if (!req->rq_replied) {
+                        if (!ptlrpc_client_replied(req)) {
                                 spin_unlock(&req->rq_lock);
                                 continue;
                         }
 
                         spin_unlock(&req->rq_lock);
 
-                        spin_lock(&imp->imp_lock);
-                        list_del_init(&req->rq_list);
-                        spin_unlock(&imp->imp_lock);
-
                         req->rq_status = after_reply(req);
                         if (req->rq_resend) {
                                 /* Add this req to the delayed list so
                                    it can be errored if the import is
                                    evicted after recovery. */
                                 spin_lock(&imp->imp_lock);
+                                list_del_init(&req->rq_list);
                                 list_add_tail(&req->rq_list,
                                               &imp->imp_delayed_list);
                                 spin_unlock(&imp->imp_lock);
@@ -1228,15 +1248,15 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
 
                         /* If there is no bulk associated with this request,
                          * then we're done and should let the interpreter
-                         * process the reply.  Similarly if the RPC returned
+                         * process the reply. Similarly if the RPC returned
                          * an error, and therefore the bulk will never arrive.
                          */
                         if (req->rq_bulk == NULL || req->rq_status != 0) {
-                                req->rq_phase = RQ_PHASE_INTERPRET;
+                                ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
                                 GOTO(interpret, req->rq_status);
                         }
 
-                        req->rq_phase = RQ_PHASE_BULK;
+                        ptlrpc_rqphase_move(req, RQ_PHASE_BULK);
                 }
 
                 LASSERT(req->rq_phase == RQ_PHASE_BULK);
@@ -1250,19 +1270,26 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                          * the ACK for her PUT. */
                         DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
                         req->rq_status = -EIO;
-                        req->rq_phase = RQ_PHASE_INTERPRET;
+                        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
                         GOTO(interpret, req->rq_status);
                 }
 
-                req->rq_phase = RQ_PHASE_INTERPRET;
+                ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
 
         interpret:
                 LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
-                LASSERT(!req->rq_receiving_reply);
 
-                ptlrpc_unregister_reply(req);
+                /* This moves to "unregistering" phase we need to wait for
+                 * reply unlink. */
+                if (!ptlrpc_unregister_reply(req, 1))
+                        continue;
+
                 if (req->rq_bulk != NULL)
-                        ptlrpc_unregister_bulk (req);
+                        ptlrpc_unregister_bulk(req);
+
+                /* When calling interpret receiving already should be
+                 * finished. */
+                LASSERT(!req->rq_receiving_reply);
 
                 if (req->rq_interpret_reply != NULL) {
                         int (*interpreter)(struct ptlrpc_request *,void *,int) =
@@ -1270,7 +1297,7 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                         req->rq_status = interpreter(req, &req->rq_async_args,
                                                      req->rq_status);
                 }
-                req->rq_phase = RQ_PHASE_COMPLETE;
+                ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
 
                 CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:nid:"
                        "opc %s:%s:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
@@ -1279,9 +1306,13 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                        libcfs_nid2str(imp->imp_connection->c_peer.nid),
                        lustre_msg_get_opc(req->rq_reqmsg));
 
-                set->set_remaining--;
-
+                spin_lock(&imp->imp_lock);
+                if (!list_empty(&req->rq_list))
+                        list_del_init(&req->rq_list);
                 atomic_dec(&imp->imp_inflight);
+                spin_unlock(&imp->imp_lock);
+
+                set->set_remaining--;
                 cfs_waitq_signal(&imp->imp_recovery_waitq);
         }
 
@@ -1290,7 +1321,7 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
 }
 
 /* Return 1 if we should give up, else 0 */
-int ptlrpc_expire_one_request(struct ptlrpc_request *req)
+int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
 {
         struct obd_import *imp = req->rq_import;
         int rc = 0;
@@ -1316,7 +1347,7 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req)
         req->rq_timedout = 1;
         spin_unlock(&req->rq_lock);
 
-        ptlrpc_unregister_reply (req);
+        ptlrpc_unregister_reply(req, async_unlink);
 
         if (obd_dump_on_timeout)
                 libcfs_debug_dumplog();
@@ -1373,24 +1404,24 @@ int ptlrpc_expired_set(void *data)
                 struct ptlrpc_request *req =
                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
 
-                /* request in-flight? */
-                if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting &&
-                       !req->rq_resend) ||
+                /* Request in-flight? */
+                if (!((req->rq_phase == RQ_PHASE_RPC &&
+                       !req->rq_waiting && !req->rq_resend) ||
                       (req->rq_phase == RQ_PHASE_BULK)))
                         continue;
 
-                if (req->rq_timedout ||           /* already dealt with */
+                if (req->rq_timedout ||     /* already dealt with */
                     req->rq_deadline > now) /* not expired */
                         continue;
 
-                /* deal with this guy */
-                ptlrpc_expire_one_request (req);
+                /* Deal with this guy. Do it asynchronously to not block
+                 * ptlrpcd thread. */
+                ptlrpc_expire_one_request(req, 1);
         }
 
         /* When waiting for a whole set, we always to break out of the
          * sleep so we can recalculate the timeout, or enable interrupts
-         * iff everyone's timed out.
-         */
+         * if everyone's timed out. */
         RETURN(1);
 }
 
@@ -1436,7 +1467,9 @@ int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
 
                 /* request in-flight? */
-                if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
+                if (!(((req->rq_phase & (RQ_PHASE_RPC | 
+                                         RQ_PHASE_UNREGISTERING)) && 
+                      !req->rq_waiting) ||
                       (req->rq_phase == RQ_PHASE_BULK) ||
                       (req->rq_phase == RQ_PHASE_NEW)))
                         continue;
@@ -1453,9 +1486,8 @@ int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
                         timeout = 1;    /* ASAP */
                         break;
                 }
-                if ((timeout == 0) || (timeout > (deadline - now))) {
+                if ((timeout == 0) || (timeout > (deadline - now)))
                         timeout = deadline - now;
-                }
         }
         RETURN(timeout);
 }
@@ -1538,6 +1570,8 @@ static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request)
         struct ptlrpc_request_pool *pool = request->rq_pool;
 
         spin_lock(&pool->prp_lock);
+        LASSERT(list_empty(&request->rq_list));
+        LASSERT(!request->rq_receiving_reply);
         list_add_tail(&request->rq_list, &pool->prp_req_list);
         spin_unlock(&pool->prp_lock);
 }
@@ -1648,24 +1682,41 @@ EXPORT_SYMBOL(ptlrpc_req_xid);
  * IDEMPOTENT, but _not_ safe against concurrent callers.
  * The request owner (i.e. the thread doing the I/O) must call...
  */
-void ptlrpc_unregister_reply (struct ptlrpc_request *request)
+int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
 {
         int                rc;
         cfs_waitq_t       *wq;
         struct l_wait_info lwi;
         ENTRY;
 
-        LASSERT(!in_interrupt ());             /* might sleep */
+        /* Might sleep. */
+        LASSERT(!in_interrupt());
+
+        /* Let's setup deadline for reply unlink. */
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) && 
+            request->rq_reply_deadline == 0)
+                request->rq_reply_deadline = cfs_time_current_sec()+LONG_UNLINK;
+
+        /* Nothing left to do. */
         if (!ptlrpc_client_recv_or_unlink(request))
-                /* Nothing left to do */
-                return;
+                RETURN(1);
+
+        LNetMDUnlink(request->rq_reply_md_h);
+
+        /* Let's check it once again. */        
+        if (!ptlrpc_client_recv_or_unlink(request))
+                RETURN(1);
+
+        /* Move to "Unregistering" phase as reply was not unlinked yet. */
+        ptlrpc_rqphase_move(request, RQ_PHASE_UNREGISTERING);
 
-        LNetMDUnlink (request->rq_reply_md_h);
+        /* Do not wait for unlink to finish. */
+        if (async)
+                RETURN(0);
 
         /* We have to l_wait_event() whatever the result, to give liblustre
          * a chance to run reply_in_callback(), and to make sure we've
          * unlinked before returning a req to the pool */
-
         if (request->rq_set != NULL)
                 wq = &request->rq_set->set_waitq;
         else
@@ -1675,17 +1726,19 @@ void ptlrpc_unregister_reply (struct ptlrpc_request *request)
                 /* Network access will complete in finite time but the HUGE
                  * timeout lets us CWARN for visibility of sluggish NALs */
                 lwi = LWI_TIMEOUT(cfs_time_seconds(LONG_UNLINK), NULL, NULL);
-                rc = l_wait_event (*wq, !ptlrpc_client_recv_or_unlink(request),
-                                   &lwi);
-                if (rc == 0)
-                        return;
+                rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
+                                  &lwi);
+                if (rc == 0) {
+                        ptlrpc_rqphase_move(request, request->rq_next_phase);
+                        RETURN(1);
+                }
 
-                LASSERT (rc == -ETIMEDOUT);
+                LASSERT(rc == -ETIMEDOUT);
                 DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout "
                           "rvcng=%d unlnk=%d", request->rq_receiving_reply,
                           request->rq_must_unlink);
         }
-        EXIT;
+        RETURN(0);
 }
 
 /* caller must hold imp->imp_lock */
@@ -1779,7 +1832,7 @@ void ptlrpc_resend_req(struct ptlrpc_request *req)
                 CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
                        old_xid, req->rq_xid);
         }
-        ptlrpc_wake_client_req(req);
+        ptlrpc_client_wake_req(req);
         spin_unlock(&req->rq_lock);
 }
 
@@ -1792,7 +1845,7 @@ void ptlrpc_restart_req(struct ptlrpc_request *req)
         spin_lock(&req->rq_lock);
         req->rq_restart = 1;
         req->rq_timedout = 0;
-        ptlrpc_wake_client_req(req);
+        ptlrpc_client_wake_req(req);
         spin_unlock(&req->rq_lock);
 }
 
@@ -1883,7 +1936,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                lustre_msg_get_opc(req->rq_reqmsg));
 
         /* Mark phase here for a little debug help */
-        req->rq_phase = RQ_PHASE_RPC;
+        ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
 
         spin_lock(&imp->imp_lock);
         req->rq_import_generation = imp->imp_generation;
@@ -1975,7 +2028,7 @@ restart:
         } while ((brc == -ETIMEDOUT) &&
                  (req->rq_deadline > cfs_time_current_sec()));
 
-        if ((brc == -ETIMEDOUT) && !ptlrpc_expire_one_request(req)) {
+        if ((brc == -ETIMEDOUT) && !ptlrpc_expire_one_request(req, 0)) {
                 /* Wait forever for reconnect / replay or failure */
                 lwi = LWI_INTR(interrupted_request, req);
                 brc = l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req),
@@ -1995,8 +2048,7 @@ restart:
         /* If the reply was received normally, this just grabs the spinlock
          * (ensuring the reply callback has returned), sees that
          * req->rq_receiving_reply is clear and returns. */
-        ptlrpc_unregister_reply (req);
-
+        ptlrpc_unregister_reply(req, 0);
 
         if (req->rq_err) {
                 DEBUG_REQ(D_RPCTRACE, req, "err rc=%d status=%d",
@@ -2025,7 +2077,7 @@ restart:
                 GOTO(out, rc = -ETIMEDOUT);
         }
 
-        if (!req->rq_replied) {
+        if (!ptlrpc_client_replied(req)) {
                 /* How can this be? -eeb */
                 DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
                 LBUG();
@@ -2066,7 +2118,7 @@ restart:
         }
 
         LASSERT(!req->rq_receiving_reply);
-        req->rq_phase = RQ_PHASE_INTERPRET;
+        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
 
         atomic_dec(&imp->imp_inflight);
         cfs_waitq_signal(&imp->imp_recovery_waitq);
@@ -2087,7 +2139,7 @@ static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
         ENTRY;
         atomic_dec(&imp->imp_replay_inflight);
 
-        if (!req->rq_replied) {
+        if (!ptlrpc_client_replied(req)) {
                 CERROR("request replay timed out, restarting recovery\n");
                 GOTO(out, rc = -ETIMEDOUT);
         }
@@ -2126,7 +2178,7 @@ static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
         if (req->rq_replay_cb)
                 req->rq_replay_cb(req);
 
-        if (req->rq_replied &&
+        if (ptlrpc_client_replied(req) &&
             lustre_msg_get_status(req->rq_repmsg) != aa->praa_old_status) {
                 DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
                           lustre_msg_get_status(req->rq_repmsg),
@@ -2166,6 +2218,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         aa->praa_old_state = req->rq_send_state;
         req->rq_send_state = LUSTRE_IMP_REPLAY;
         req->rq_phase = RQ_PHASE_NEW;
+        req->rq_next_phase = RQ_PHASE_UNDEFINED;
         if (req->rq_repmsg)
                 aa->praa_old_status = lustre_msg_get_status(req->rq_repmsg);
         req->rq_status = 0;
@@ -2207,7 +2260,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
                 if (req->rq_import_generation < imp->imp_generation) {
                         req->rq_err = 1;
                         req->rq_status = -EINTR;
-                        ptlrpc_wake_client_req(req);
+                        ptlrpc_client_wake_req(req);
                 }
                 spin_unlock (&req->rq_lock);
         }
@@ -2222,7 +2275,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
                 if (req->rq_import_generation < imp->imp_generation) {
                         req->rq_err = 1;
                         req->rq_status = -EINTR;
-                        ptlrpc_wake_client_req(req);
+                        ptlrpc_client_wake_req(req);
                 }
                 spin_unlock (&req->rq_lock);
         }
@@ -2255,7 +2308,7 @@ void ptlrpc_abort_set(struct ptlrpc_request_set *set)
 
                 req->rq_err = 1;
                 req->rq_status = -EINTR;
-                ptlrpc_wake_client_req(req);
+                ptlrpc_client_wake_req(req);
                 spin_unlock (&req->rq_lock);
         }
 }