request->rq_reply_cbid.cbid_fn = reply_in_callback;
request->rq_reply_cbid.cbid_arg = request;
+ request->rq_reply_deadline = 0;
request->rq_phase = RQ_PHASE_NEW;
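+ /* rq_next_phase remembers which phase to resume once the reply
+ * buffers are unlinked; see the RQ_PHASE_UNREGISTERING handling
+ * below. */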
+ request->rq_next_phase = RQ_PHASE_UNDEFINED;
request->rq_request_portal = imp->imp_client->cli_request_portal;
request->rq_reply_portal = imp->imp_client->cli_reply_portal;
/* serialise with network callback */
spin_lock(&req->rq_lock);
- if (req->rq_replied) {
+ if (ptlrpc_client_replied(req)) {
what = "REPLIED: ";
GOTO(out, rc = 1);
}
if (req->rq_net_err && !req->rq_timedout) {
what = "NETERR: ";
spin_unlock(&req->rq_lock);
- rc = ptlrpc_expire_one_request(req);
+ rc = ptlrpc_expire_one_request(req, 0);
spin_lock(&req->rq_lock);
GOTO(out, rc);
}
GOTO(out, rc = 1);
}
- if (req->rq_early) {
+ if (ptlrpc_client_early(req)) {
what = "EARLYREP: ";
ptlrpc_at_recv_early_reply(req);
GOTO(out, rc = 0); /* keep waiting */
if (req->rq_sent && (req->rq_sent > CURRENT_SECONDS))
RETURN (0);
- req->rq_phase = RQ_PHASE_RPC;
+ ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
imp = req->rq_import;
spin_lock(&imp->imp_lock);
if (rc != 0) {
spin_unlock(&imp->imp_lock);
req->rq_status = rc;
- req->rq_phase = RQ_PHASE_INTERPRET;
+ ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
RETURN(rc);
}
ptlrpc_send_new_req(req)) {
force_timer_recalc = 1;
}
+
/* delayed send - skip */
if (req->rq_phase == RQ_PHASE_NEW && req->rq_sent)
continue;
if (!(req->rq_phase == RQ_PHASE_RPC ||
req->rq_phase == RQ_PHASE_BULK ||
req->rq_phase == RQ_PHASE_INTERPRET ||
+ req->rq_phase == RQ_PHASE_UNREGISTERING ||
req->rq_phase == RQ_PHASE_COMPLETE)) {
DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
LBUG();
}
+ if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
+ LASSERT(req->rq_next_phase != req->rq_phase);
+ LASSERT(req->rq_next_phase != RQ_PHASE_UNDEFINED);
+
+ /* Skip processing until the reply is unlinked. We
+ * can't return the request to the pool or call
+ * interpret before that; we need to make sure that
+ * all RDMA transfers have finished and will not
+ * corrupt any data. */
+ if (ptlrpc_client_recv_or_unlink(req))
+ continue;
+
+ /* Turn fail_loc off to prevent it from looping
+ * forever. */
+ OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK |
+ OBD_FAIL_ONCE);
+
+ /* Move to next phase if reply was successfully
+ * unlinked. */
+ ptlrpc_rqphase_move(req, req->rq_next_phase);
+ }
+
if (req->rq_phase == RQ_PHASE_COMPLETE)
continue;
if (req->rq_phase == RQ_PHASE_INTERPRET)
GOTO(interpret, req->rq_status);
- if (req->rq_net_err && !req->rq_timedout)
- ptlrpc_expire_one_request(req);
+ /* Note that this will also start the async reply unlink. */
+ if (req->rq_net_err && !req->rq_timedout) {
+ ptlrpc_expire_one_request(req, 1);
+
+ /* Check if we still need to wait for unlink. */
+ if (ptlrpc_client_recv_or_unlink(req))
+ continue;
+ }
if (req->rq_err) {
- ptlrpc_unregister_reply(req);
if (req->rq_status == 0)
req->rq_status = -EIO;
- req->rq_phase = RQ_PHASE_INTERPRET;
-
- spin_lock(&imp->imp_lock);
- list_del_init(&req->rq_list);
- spin_unlock(&imp->imp_lock);
-
+ ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
GOTO(interpret, req->rq_status);
}
* seen a timeout. our policy is to only interpret
* interrupted rpcs after they have timed out */
if (req->rq_intr && (req->rq_timedout || req->rq_waiting)) {
- /* NB could be on delayed list */
- ptlrpc_unregister_reply(req);
req->rq_status = -EINTR;
- req->rq_phase = RQ_PHASE_INTERPRET;
-
- spin_lock(&imp->imp_lock);
- list_del_init(&req->rq_list);
- spin_unlock(&imp->imp_lock);
-
+ ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
GOTO(interpret, req->rq_status);
}
if (req->rq_timedout||req->rq_waiting||req->rq_resend) {
int status;
- ptlrpc_unregister_reply(req);
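+ /* Unregister the reply buffers asynchronously; if the
+ * unlink has not completed yet, skip this request and
+ * revisit it on the next pass over the set. */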
+ if (!ptlrpc_unregister_reply(req, 1))
+ continue;
spin_lock(&imp->imp_lock);
continue;
}
- list_del_init(&req->rq_list);
if (status != 0) {
req->rq_status = status;
- req->rq_phase = RQ_PHASE_INTERPRET;
+ ptlrpc_rqphase_move(req,
+ RQ_PHASE_INTERPRET);
spin_unlock(&imp->imp_lock);
GOTO(interpret, req->rq_status);
}
if (req->rq_no_resend) {
req->rq_status = -ENOTCONN;
- req->rq_phase = RQ_PHASE_INTERPRET;
+ ptlrpc_rqphase_move(req,
+ RQ_PHASE_INTERPRET);
spin_unlock(&imp->imp_lock);
GOTO(interpret, req->rq_status);
}
+
+ list_del_init(&req->rq_list);
list_add_tail(&req->rq_list,
&imp->imp_sending_list);
if (req->rq_bulk) {
__u64 old_xid = req->rq_xid;
- ptlrpc_unregister_bulk (req);
+ ptlrpc_unregister_bulk(req);
/* ensure previous bulk fails */
req->rq_xid = ptlrpc_next_xid();
spin_lock(&req->rq_lock);
- if (req->rq_early) {
+ if (ptlrpc_client_early(req)) {
ptlrpc_at_recv_early_reply(req);
spin_unlock(&req->rq_lock);
continue;
}
/* Still waiting for a reply? */
- if (req->rq_receiving_reply) {
+ if (ptlrpc_client_recv(req)) {
spin_unlock(&req->rq_lock);
continue;
}
/* Did we actually receive a reply? */
- if (!req->rq_replied) {
+ if (!ptlrpc_client_replied(req)) {
spin_unlock(&req->rq_lock);
continue;
}
spin_unlock(&req->rq_lock);
- spin_lock(&imp->imp_lock);
- list_del_init(&req->rq_list);
- spin_unlock(&imp->imp_lock);
-
req->rq_status = after_reply(req);
if (req->rq_resend) {
/* Add this req to the delayed list so
it can be errored if the import is
evicted after recovery. */
spin_lock(&imp->imp_lock);
+ list_del_init(&req->rq_list);
list_add_tail(&req->rq_list,
&imp->imp_delayed_list);
spin_unlock(&imp->imp_lock);
/* If there is no bulk associated with this request,
* then we're done and should let the interpreter
- * process the reply. Similarly if the RPC returned
+ * process the reply. Similarly if the RPC returned
* an error, and therefore the bulk will never arrive.
*/
if (req->rq_bulk == NULL || req->rq_status != 0) {
- req->rq_phase = RQ_PHASE_INTERPRET;
+ ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
GOTO(interpret, req->rq_status);
}
- req->rq_phase = RQ_PHASE_BULK;
+ ptlrpc_rqphase_move(req, RQ_PHASE_BULK);
}
LASSERT(req->rq_phase == RQ_PHASE_BULK);
* the ACK for her PUT. */
DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
req->rq_status = -EIO;
- req->rq_phase = RQ_PHASE_INTERPRET;
+ ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
GOTO(interpret, req->rq_status);
}
- req->rq_phase = RQ_PHASE_INTERPRET;
+ ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
interpret:
LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
- LASSERT(!req->rq_receiving_reply);
- ptlrpc_unregister_reply(req);
+ /* This moves the request to the "unregistering" phase; we
+ * need to wait for the reply to be unlinked. */
+ if (!ptlrpc_unregister_reply(req, 1))
+ continue;
+
if (req->rq_bulk != NULL)
- ptlrpc_unregister_bulk (req);
+ ptlrpc_unregister_bulk(req);
+
+ /* Receiving must already be finished by the time the
+ * interpret callback is called. */
+ LASSERT(!req->rq_receiving_reply);
if (req->rq_interpret_reply != NULL) {
int (*interpreter)(struct ptlrpc_request *,void *,int) =
req->rq_status = interpreter(req, &req->rq_async_args,
req->rq_status);
}
- req->rq_phase = RQ_PHASE_COMPLETE;
+ ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:nid:"
"opc %s:%s:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
libcfs_nid2str(imp->imp_connection->c_peer.nid),
lustre_msg_get_opc(req->rq_reqmsg));
- set->set_remaining--;
-
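+ /* The request is finished: unlink it from the import's request
+ * list and drop the in-flight count under imp_lock. */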
+ spin_lock(&imp->imp_lock);
+ if (!list_empty(&req->rq_list))
+ list_del_init(&req->rq_list);
atomic_dec(&imp->imp_inflight);
+ spin_unlock(&imp->imp_lock);
+
+ set->set_remaining--;
cfs_waitq_signal(&imp->imp_recovery_waitq);
}
}
/* Return 1 if we should give up, else 0 */
-int ptlrpc_expire_one_request(struct ptlrpc_request *req)
+int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
{
struct obd_import *imp = req->rq_import;
int rc = 0;
req->rq_timedout = 1;
spin_unlock(&req->rq_lock);
- ptlrpc_unregister_reply (req);
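+ /* Unlink the reply buffers; when async_unlink is set this does
+ * not wait for the unlink to complete. */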
+ ptlrpc_unregister_reply(req, async_unlink);
if (obd_dump_on_timeout)
libcfs_debug_dumplog();
struct ptlrpc_request *req =
list_entry(tmp, struct ptlrpc_request, rq_set_chain);
- /* request in-flight? */
- if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting &&
- !req->rq_resend) ||
+ /* Request in-flight? */
+ if (!((req->rq_phase == RQ_PHASE_RPC &&
+ !req->rq_waiting && !req->rq_resend) ||
(req->rq_phase == RQ_PHASE_BULK)))
continue;
- if (req->rq_timedout || /* already dealt with */
+ if (req->rq_timedout || /* already dealt with */
req->rq_deadline > now) /* not expired */
continue;
- /* deal with this guy */
- ptlrpc_expire_one_request (req);
+ /* Deal with this guy. Do it asynchronously to avoid
+ * blocking the ptlrpcd thread. */
+ ptlrpc_expire_one_request(req, 1);
}
/* When waiting for a whole set, we always break out of the
* sleep so we can recalculate the timeout, or enable interrupts
- * iff everyone's timed out.
- */
+ * if everyone's timed out. */
RETURN(1);
}
req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
/* request in-flight? */
- if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
+ if (!(((req->rq_phase & (RQ_PHASE_RPC |
+ RQ_PHASE_UNREGISTERING)) &&
+ !req->rq_waiting) ||
(req->rq_phase == RQ_PHASE_BULK) ||
(req->rq_phase == RQ_PHASE_NEW)))
continue;
timeout = 1; /* ASAP */
break;
}
- if ((timeout == 0) || (timeout > (deadline - now))) {
+ if ((timeout == 0) || (timeout > (deadline - now)))
timeout = deadline - now;
- }
}
RETURN(timeout);
}
struct ptlrpc_request_pool *pool = request->rq_pool;
spin_lock(&pool->prp_lock);
+ LASSERT(list_empty(&request->rq_list));
+ LASSERT(!request->rq_receiving_reply);
list_add_tail(&request->rq_list, &pool->prp_req_list);
spin_unlock(&pool->prp_lock);
}
* IDEMPOTENT, but _not_ safe against concurrent callers.
* The request owner (i.e. the thread doing the I/O) must call...
*/
-void ptlrpc_unregister_reply (struct ptlrpc_request *request)
+int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
{
int rc;
cfs_waitq_t *wq;
struct l_wait_info lwi;
ENTRY;
- LASSERT(!in_interrupt ()); /* might sleep */
+ /* Might sleep. */
+ LASSERT(!in_interrupt());
+
+ /* Let's set up the deadline for reply unlink. */
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+ request->rq_reply_deadline == 0)
+ request->rq_reply_deadline = cfs_time_current_sec()+LONG_UNLINK;
+
+ /* Nothing left to do. */
if (!ptlrpc_client_recv_or_unlink(request))
- /* Nothing left to do */
- return;
+ RETURN(1);
+
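+ /* Ask LNet to unlink the reply MD; the unlink may complete
+ * asynchronously, so re-check the state below. */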
+ LNetMDUnlink(request->rq_reply_md_h);
+
+ /* Let's check it once again. */
+ if (!ptlrpc_client_recv_or_unlink(request))
+ RETURN(1);
+
+ /* Move to "Unregistering" phase as the reply has not been unlinked yet. */
+ ptlrpc_rqphase_move(request, RQ_PHASE_UNREGISTERING);
- LNetMDUnlink (request->rq_reply_md_h);
+ /* Do not wait for unlink to finish. */
+ if (async)
+ RETURN(0);
/* We have to l_wait_event() whatever the result, to give liblustre
* a chance to run reply_in_callback(), and to make sure we've
* unlinked before returning a req to the pool */
-
if (request->rq_set != NULL)
wq = &request->rq_set->set_waitq;
else
/* Network access will complete in finite time but the HUGE
* timeout lets us CWARN for visibility of sluggish NALs */
lwi = LWI_TIMEOUT(cfs_time_seconds(LONG_UNLINK), NULL, NULL);
- rc = l_wait_event (*wq, !ptlrpc_client_recv_or_unlink(request),
- &lwi);
- if (rc == 0)
- return;
+ rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
+ &lwi);
+ if (rc == 0) {
+ ptlrpc_rqphase_move(request, request->rq_next_phase);
+ RETURN(1);
+ }
- LASSERT (rc == -ETIMEDOUT);
+ LASSERT(rc == -ETIMEDOUT);
DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout "
"rvcng=%d unlnk=%d", request->rq_receiving_reply,
request->rq_must_unlink);
}
- EXIT;
+ RETURN(0);
}
/* caller must hold imp->imp_lock */
CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
old_xid, req->rq_xid);
}
- ptlrpc_wake_client_req(req);
+ ptlrpc_client_wake_req(req);
spin_unlock(&req->rq_lock);
}
spin_lock(&req->rq_lock);
req->rq_restart = 1;
req->rq_timedout = 0;
- ptlrpc_wake_client_req(req);
+ ptlrpc_client_wake_req(req);
spin_unlock(&req->rq_lock);
}
lustre_msg_get_opc(req->rq_reqmsg));
/* Mark phase here for a little debug help */
- req->rq_phase = RQ_PHASE_RPC;
+ ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
spin_lock(&imp->imp_lock);
req->rq_import_generation = imp->imp_generation;
} while ((brc == -ETIMEDOUT) &&
(req->rq_deadline > cfs_time_current_sec()));
- if ((brc == -ETIMEDOUT) && !ptlrpc_expire_one_request(req)) {
+ if ((brc == -ETIMEDOUT) && !ptlrpc_expire_one_request(req, 0)) {
/* Wait forever for reconnect / replay or failure */
lwi = LWI_INTR(interrupted_request, req);
brc = l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req),
/* If the reply was received normally, this just grabs the spinlock
* (ensuring the reply callback has returned), sees that
* req->rq_receiving_reply is clear and returns. */
- ptlrpc_unregister_reply (req);
-
+ ptlrpc_unregister_reply(req, 0);
if (req->rq_err) {
DEBUG_REQ(D_RPCTRACE, req, "err rc=%d status=%d",
GOTO(out, rc = -ETIMEDOUT);
}
- if (!req->rq_replied) {
+ if (!ptlrpc_client_replied(req)) {
/* How can this be? -eeb */
DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
LBUG();
}
LASSERT(!req->rq_receiving_reply);
- req->rq_phase = RQ_PHASE_INTERPRET;
+ ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
atomic_dec(&imp->imp_inflight);
cfs_waitq_signal(&imp->imp_recovery_waitq);
ENTRY;
atomic_dec(&imp->imp_replay_inflight);
- if (!req->rq_replied) {
+ if (!ptlrpc_client_replied(req)) {
CERROR("request replay timed out, restarting recovery\n");
GOTO(out, rc = -ETIMEDOUT);
}
if (req->rq_replay_cb)
req->rq_replay_cb(req);
- if (req->rq_replied &&
+ if (ptlrpc_client_replied(req) &&
lustre_msg_get_status(req->rq_repmsg) != aa->praa_old_status) {
DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
lustre_msg_get_status(req->rq_repmsg),
aa->praa_old_state = req->rq_send_state;
req->rq_send_state = LUSTRE_IMP_REPLAY;
req->rq_phase = RQ_PHASE_NEW;
+ req->rq_next_phase = RQ_PHASE_UNDEFINED;
if (req->rq_repmsg)
aa->praa_old_status = lustre_msg_get_status(req->rq_repmsg);
req->rq_status = 0;
if (req->rq_import_generation < imp->imp_generation) {
req->rq_err = 1;
req->rq_status = -EINTR;
- ptlrpc_wake_client_req(req);
+ ptlrpc_client_wake_req(req);
}
spin_unlock (&req->rq_lock);
}
if (req->rq_import_generation < imp->imp_generation) {
req->rq_err = 1;
req->rq_status = -EINTR;
- ptlrpc_wake_client_req(req);
+ ptlrpc_client_wake_req(req);
}
spin_unlock (&req->rq_lock);
}
req->rq_err = 1;
req->rq_status = -EINTR;
- ptlrpc_wake_client_req(req);
+ ptlrpc_client_wake_req(req);
spin_unlock (&req->rq_lock);
}
}