return;
}
-struct ptlrpc_request_pool *ptlrpc_init_rq_pool(int num_rq, int msgsize,
- void (*populate_pool)(struct ptlrpc_request_pool *, int))
+struct ptlrpc_request_pool *
+ptlrpc_init_rq_pool(int num_rq, int msgsize,
+ void (*populate_pool)(struct ptlrpc_request_pool *, int))
{
struct ptlrpc_request_pool *pool;
return pool;
}
-static struct ptlrpc_request *ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
+static struct ptlrpc_request *
+ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
{
struct ptlrpc_request *request;
struct lustre_msg *reqbuf;
spin_lock(&pool->prp_lock);
LASSERT(list_empty(&request->rq_list));
+ LASSERT(!request->rq_receiving_reply);
list_add_tail(&request->rq_list, &pool->prp_req_list);
spin_unlock(&pool->prp_lock);
}
request->rq_reply_cbid.cbid_fn = reply_in_callback;
request->rq_reply_cbid.cbid_arg = request;
+ request->rq_reply_deadline = 0;
request->rq_phase = RQ_PHASE_NEW;
+ request->rq_next_phase = RQ_PHASE_UNDEFINED;
request->rq_request_portal = imp->imp_client->cli_request_portal;
request->rq_reply_portal = imp->imp_client->cli_reply_portal;
/* serialise with network callback */
spin_lock(&req->rq_lock);
- if (req->rq_replied)
+ if (ptlrpc_client_replied(req))
GOTO(out, rc = 1);
if (req->rq_net_err && !req->rq_timedout) {
spin_unlock(&req->rq_lock);
- rc = ptlrpc_expire_one_request(req);
+ rc = ptlrpc_expire_one_request(req, 0);
spin_lock(&req->rq_lock);
GOTO(out, rc);
}
if (req->rq_restart)
GOTO(out, rc = 1);
- if (req->rq_early) {
+ if (ptlrpc_client_early(req)) {
ptlrpc_at_recv_early_reply(req);
GOTO(out, rc = 0); /* keep waiting */
}
if (req->rq_sent && (req->rq_sent > cfs_time_current_sec()))
RETURN (0);
- req->rq_phase = RQ_PHASE_RPC;
+ ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
imp = req->rq_import;
spin_lock(&imp->imp_lock);
if (rc != 0) {
spin_unlock(&imp->imp_lock);
req->rq_status = rc;
- req->rq_phase = RQ_PHASE_INTERPRET;
+ ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
RETURN(rc);
}
ptlrpc_send_new_req(req)) {
force_timer_recalc = 1;
}
+
/* delayed send - skip */
if (req->rq_phase == RQ_PHASE_NEW && req->rq_sent)
continue;
if (!(req->rq_phase == RQ_PHASE_RPC ||
req->rq_phase == RQ_PHASE_BULK ||
req->rq_phase == RQ_PHASE_INTERPRET ||
+ req->rq_phase == RQ_PHASE_UNREGISTERING ||
req->rq_phase == RQ_PHASE_COMPLETE)) {
DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
LBUG();
}
+ if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
+ LASSERT(req->rq_next_phase != req->rq_phase);
+ LASSERT(req->rq_next_phase != RQ_PHASE_UNDEFINED);
+
+ /*
+ * Skip processing until reply is unlinked. We
+ * can't return to pool before that and we can't
+ * call interpret before that. We need to make
+ * sure that all rdma transfers finished and will
+ * not corrupt any data.
+ */
+ if (ptlrpc_client_recv_or_unlink(req))
+ continue;
+
+ /*
+ * Turn fail_loc off to prevent it from looping
+ * forever.
+ */
+ OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK |
+ OBD_FAIL_ONCE);
+
+ /*
+ * Move to next phase if reply was successfully
+ * unlinked.
+ */
+ ptlrpc_rqphase_move(req, req->rq_next_phase);
+ }
+
if (req->rq_phase == RQ_PHASE_COMPLETE)
continue;
if (req->rq_phase == RQ_PHASE_INTERPRET)
GOTO(interpret, req->rq_status);
- if (req->rq_net_err && !req->rq_timedout)
- ptlrpc_expire_one_request(req);
+ /*
+ * Note that this also will start async reply unlink.
+ */
+ if (req->rq_net_err && !req->rq_timedout) {
+ ptlrpc_expire_one_request(req, 1);
+
+ /*
+ * Check if we still need to wait for unlink.
+ */
+ if (ptlrpc_client_recv_or_unlink(req))
+ continue;
+ }
if (req->rq_err) {
- ptlrpc_unregister_reply(req);
req->rq_replied = 0;
if (req->rq_status == 0)
req->rq_status = -EIO;
- req->rq_phase = RQ_PHASE_INTERPRET;
-
- spin_lock(&imp->imp_lock);
- list_del_init(&req->rq_list);
- spin_unlock(&imp->imp_lock);
-
+ ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
GOTO(interpret, req->rq_status);
}
* interrupted rpcs after they have timed out */
if (req->rq_intr && (req->rq_timedout || req->rq_waiting ||
req->rq_wait_ctx)) {
- /* NB could be on delayed list */
- ptlrpc_unregister_reply(req);
req->rq_status = -EINTR;
- req->rq_phase = RQ_PHASE_INTERPRET;
-
- spin_lock(&imp->imp_lock);
- list_del_init(&req->rq_list);
- spin_unlock(&imp->imp_lock);
-
+ ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
GOTO(interpret, req->rq_status);
}
req->rq_waiting || req->rq_wait_ctx) {
int status;
- ptlrpc_unregister_reply(req);
+ if (!ptlrpc_unregister_reply(req, 1))
+ continue;
spin_lock(&imp->imp_lock);
continue;
}
- list_del_init(&req->rq_list);
if (status != 0) {
req->rq_status = status;
- req->rq_phase = RQ_PHASE_INTERPRET;
+ ptlrpc_rqphase_move(req,
+ RQ_PHASE_INTERPRET);
spin_unlock(&imp->imp_lock);
GOTO(interpret, req->rq_status);
}
if (req->rq_no_resend && !req->rq_wait_ctx) {
req->rq_status = -ENOTCONN;
- req->rq_phase = RQ_PHASE_INTERPRET;
+ ptlrpc_rqphase_move(req,
+ RQ_PHASE_INTERPRET);
spin_unlock(&imp->imp_lock);
GOTO(interpret, req->rq_status);
}
+
+ list_del_init(&req->rq_list);
list_add_tail(&req->rq_list,
&imp->imp_sending_list);
if (req->rq_bulk) {
__u64 old_xid = req->rq_xid;
- ptlrpc_unregister_bulk (req);
+ ptlrpc_unregister_bulk(req);
/* ensure previous bulk fails */
req->rq_xid = ptlrpc_next_xid();
spin_lock(&req->rq_lock);
- if (req->rq_early) {
+ if (ptlrpc_client_early(req)) {
ptlrpc_at_recv_early_reply(req);
spin_unlock(&req->rq_lock);
continue;
}
/* Still waiting for a reply? */
- if (req->rq_receiving_reply) {
+ if (ptlrpc_client_recv(req)) {
spin_unlock(&req->rq_lock);
continue;
}
/* Did we actually receive a reply? */
- if (!req->rq_replied) {
+ if (!ptlrpc_client_replied(req)) {
spin_unlock(&req->rq_lock);
continue;
}
spin_unlock(&req->rq_lock);
- spin_lock(&imp->imp_lock);
- list_del_init(&req->rq_list);
- spin_unlock(&imp->imp_lock);
-
req->rq_status = after_reply(req);
if (req->rq_resend) {
/* Add this req to the delayed list so
it can be errored if the import is
evicted after recovery. */
spin_lock(&imp->imp_lock);
+ list_del_init(&req->rq_list);
list_add_tail(&req->rq_list,
&imp->imp_delayed_list);
spin_unlock(&imp->imp_lock);
/* If there is no bulk associated with this request,
* then we're done and should let the interpreter
- * process the reply. Similarly if the RPC returned
+ * process the reply. Similarly if the RPC returned
* an error, and therefore the bulk will never arrive.
*/
if (req->rq_bulk == NULL || req->rq_status != 0) {
- req->rq_phase = RQ_PHASE_INTERPRET;
+ ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
GOTO(interpret, req->rq_status);
}
- req->rq_phase = RQ_PHASE_BULK;
+ ptlrpc_rqphase_move(req, RQ_PHASE_BULK);
}
LASSERT(req->rq_phase == RQ_PHASE_BULK);
LBUG();
}
- req->rq_phase = RQ_PHASE_INTERPRET;
+ ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
interpret:
LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
- LASSERT(!req->rq_receiving_reply);
- ptlrpc_unregister_reply(req);
+ /* This moves to "unregistering" phase as we need to wait for
+ * reply unlink. */
+ if (!ptlrpc_unregister_reply(req, 1))
+ continue;
+
if (req->rq_bulk != NULL)
- ptlrpc_unregister_bulk (req);
+ ptlrpc_unregister_bulk(req);
+
+ /* By the time we call interpret, receiving should already be
+ * finished. */
+ LASSERT(!req->rq_receiving_reply);
if (req->rq_interpret_reply != NULL) {
ptlrpc_interpterer_t interpreter =
&req->rq_async_args,
req->rq_status);
}
- req->rq_phase = RQ_PHASE_COMPLETE;
+ ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:nid:"
"opc %s:%s:%d:"LPU64":%s:%d\n", cfs_curproc_comm(),
libcfs_nid2str(imp->imp_connection->c_peer.nid),
lustre_msg_get_opc(req->rq_reqmsg));
+ spin_lock(&imp->imp_lock);
+ if (!list_empty(&req->rq_list))
+ list_del_init(&req->rq_list);
atomic_dec(&imp->imp_inflight);
+ spin_unlock(&imp->imp_lock);
+
set->set_remaining--;
cfs_waitq_signal(&imp->imp_recovery_waitq);
}
}
/* Return 1 if we should give up, else 0 */
-int ptlrpc_expire_one_request(struct ptlrpc_request *req)
+int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
{
struct obd_import *imp = req->rq_import;
int rc = 0;
req->rq_timedout = 1;
spin_unlock(&req->rq_lock);
- ptlrpc_unregister_reply (req);
+ ptlrpc_unregister_reply(req, async_unlink);
if (obd_dump_on_timeout)
libcfs_debug_dumplog();
LASSERT(set != NULL);
- /* A timeout expired; see which reqs it applies to... */
+ /*
+ * A timeout expired. See which reqs it applies to...
+ */
list_for_each (tmp, &set->set_requests) {
struct ptlrpc_request *req =
list_entry(tmp, struct ptlrpc_request, rq_set_chain);
- /* request in-flight? */
- if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting &&
- !req->rq_resend) ||
+ /* Request in-flight? */
+ if (!((req->rq_phase &
+ (RQ_PHASE_RPC | RQ_PHASE_UNREGISTERING) &&
+ !req->rq_waiting && !req->rq_resend) ||
(req->rq_phase == RQ_PHASE_BULK)))
continue;
-
- if (req->rq_timedout || /* already dealt with */
- req->rq_deadline > now) /* not expired */
+
+ if (req->rq_timedout || /* already dealt with */
+ req->rq_deadline > now) /* not expired */
continue;
- /* deal with this guy */
- ptlrpc_expire_one_request (req);
+ /* Deal with this guy. Do it asynchronously to not block
+ * ptlrpcd thread. */
+ ptlrpc_expire_one_request(req, 1);
}
- /* When waiting for a whole set, we always to break out of the
+ /*
+ * When waiting for a whole set, we always want to break out of the
* sleep so we can recalculate the timeout, or enable interrupts
- * iff everyone's timed out.
+ * if everyone's timed out.
*/
RETURN(1);
}
struct ptlrpc_request *req =
list_entry(tmp, struct ptlrpc_request, rq_set_chain);
- if (req->rq_phase != RQ_PHASE_RPC)
+ if (req->rq_phase != RQ_PHASE_RPC &&
+ req->rq_phase != RQ_PHASE_UNREGISTERING)
continue;
ptlrpc_mark_interrupted(req);
}
}
-/* get the smallest timeout in the set; this does NOT set a timeout. */
+/**
+ * Get the smallest timeout in the set; this does NOT set a timeout.
+ */
int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
{
struct list_head *tmp;
list_for_each(tmp, &set->set_requests) {
req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
- /* request in-flight? */
- if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
+ /*
+ * Request in-flight?
+ */
+ if (!(((req->rq_phase &
+ (RQ_PHASE_RPC | RQ_PHASE_UNREGISTERING)) &&
+ !req->rq_waiting) ||
(req->rq_phase == RQ_PHASE_BULK) ||
(req->rq_phase == RQ_PHASE_NEW)))
continue;
- if (req->rq_timedout) /* already timed out */
+ /*
+ * Check those waiting for long reply unlink every one
+ * second.
+ */
+ if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
+ timeout = 1;
+ break;
+ }
+
+ /*
+ * Already timed out.
+ */
+ if (req->rq_timedout)
continue;
- if (req->rq_wait_ctx) /* waiting for ctx */
+ /*
+ * Waiting for ctx.
+ */
+ if (req->rq_wait_ctx)
continue;
if (req->rq_phase == RQ_PHASE_NEW)
* IDEMPOTENT, but _not_ safe against concurrent callers.
* The request owner (i.e. the thread doing the I/O) must call...
*/
-void ptlrpc_unregister_reply (struct ptlrpc_request *request)
+int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
{
int rc;
cfs_waitq_t *wq;
struct l_wait_info lwi;
- LASSERT(!in_interrupt ()); /* might sleep */
+ /*
+ * Might sleep.
+ */
+ LASSERT(!in_interrupt());
+
+ /*
+ * Let's set up a deadline for reply unlink.
+ */
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+ async && request->rq_reply_deadline == 0)
+ request->rq_reply_deadline = cfs_time_current_sec()+LONG_UNLINK;
+
+ /*
+ * Nothing left to do.
+ */
if (!ptlrpc_client_recv_or_unlink(request))
- /* Nothing left to do */
- return;
+ RETURN(1);
- LNetMDUnlink (request->rq_reply_md_h);
+ LNetMDUnlink(request->rq_reply_md_h);
- /* We have to l_wait_event() whatever the result, to give liblustre
- * a chance to run reply_in_callback(), and to make sure we've
- * unlinked before returning a req to the pool */
+ /*
+ * Let's check it once again.
+ */
+ if (!ptlrpc_client_recv_or_unlink(request))
+ RETURN(1);
+
+ /*
+ * Move to "Unregistering" phase as reply was not unlinked yet.
+ */
+ ptlrpc_rqphase_move(request, RQ_PHASE_UNREGISTERING);
+ /*
+ * Do not wait for unlink to finish.
+ */
+ if (async)
+ RETURN(0);
+
+ /*
+ * We have to l_wait_event() whatever the result, to give liblustre
+ * a chance to run reply_in_callback(), and to make sure we've
+ * unlinked before returning a req to the pool.
+ */
if (request->rq_set != NULL)
wq = &request->rq_set->set_waitq;
else
/* Network access will complete in finite time but the HUGE
* timeout lets us CWARN for visibility of sluggish NALs */
lwi = LWI_TIMEOUT(cfs_time_seconds(LONG_UNLINK), NULL, NULL);
- rc = l_wait_event (*wq, !ptlrpc_client_recv_or_unlink(request),
- &lwi);
- if (rc == 0)
- return;
-
- LASSERT (rc == -ETIMEDOUT);
+ rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
+ &lwi);
+ if (rc == 0) {
+ ptlrpc_rqphase_move(request, request->rq_next_phase);
+ RETURN(1);
+ }
+
+ LASSERT(rc == -ETIMEDOUT);
DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout "
"rvcng=%d unlnk=%d", request->rq_receiving_reply,
request->rq_must_unlink);
}
+ RETURN(0);
}
/* caller must hold imp->imp_lock */
CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
old_xid, req->rq_xid);
}
- ptlrpc_wake_client_req(req);
+ ptlrpc_client_wake_req(req);
spin_unlock(&req->rq_lock);
}
spin_lock(&req->rq_lock);
req->rq_restart = 1;
req->rq_timedout = 0;
- ptlrpc_wake_client_req(req);
+ ptlrpc_client_wake_req(req);
spin_unlock(&req->rq_lock);
}
struct ptlrpc_request *req = data;
ENTRY;
- /* some failure can suspend regular timeouts */
+ /*
+ * Some failure can suspend regular timeouts.
+ */
if (ptlrpc_check_suspend())
RETURN(1);
- /* deadline may have changed with an early reply */
+ /*
+ * Deadline may have changed with an early reply.
+ */
if (req->rq_deadline > cfs_time_current_sec())
RETURN(1);
- RETURN(ptlrpc_expire_one_request(req));
+ RETURN(ptlrpc_expire_one_request(req, 0));
}
static void interrupted_request(void *data)
lustre_msg_get_opc(req->rq_reqmsg));
/* Mark phase here for a little debug help */
- req->rq_phase = RQ_PHASE_RPC;
+ ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
spin_lock(&imp->imp_lock);
req->rq_import_generation = imp->imp_generation;
/* If the reply was received normally, this just grabs the spinlock
* (ensuring the reply callback has returned), sees that
* req->rq_receiving_reply is clear and returns. */
- ptlrpc_unregister_reply (req);
-
+ ptlrpc_unregister_reply(req, 0);
if (req->rq_err) {
DEBUG_REQ(D_RPCTRACE, req, "err rc=%d status=%d",
GOTO(out, rc = -ETIMEDOUT);
}
- if (!req->rq_replied) {
+ if (!ptlrpc_client_replied(req)) {
/* How can this be? -eeb */
DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
LBUG();
}
LASSERT(!req->rq_receiving_reply);
- req->rq_phase = RQ_PHASE_INTERPRET;
+ ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
atomic_dec(&imp->imp_inflight);
cfs_waitq_signal(&imp->imp_recovery_waitq);
ENTRY;
atomic_dec(&imp->imp_replay_inflight);
- if (!req->rq_replied) {
+ if (!ptlrpc_client_replied(req)) {
CERROR("request replay timed out, restarting recovery\n");
GOTO(out, rc = -ETIMEDOUT);
}
if (req->rq_replay_cb)
req->rq_replay_cb(req);
- if (req->rq_replied &&
+ if (ptlrpc_client_replied(req) &&
lustre_msg_get_status(req->rq_repmsg) != aa->praa_old_status) {
DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
lustre_msg_get_status(req->rq_repmsg),
aa->praa_old_state = req->rq_send_state;
req->rq_send_state = LUSTRE_IMP_REPLAY;
req->rq_phase = RQ_PHASE_NEW;
+ req->rq_next_phase = RQ_PHASE_UNDEFINED;
if (req->rq_repmsg)
aa->praa_old_status = lustre_msg_get_status(req->rq_repmsg);
req->rq_status = 0;
if (req->rq_import_generation < imp->imp_generation) {
req->rq_err = 1;
req->rq_status = -EINTR;
- ptlrpc_wake_client_req(req);
+ ptlrpc_client_wake_req(req);
}
spin_unlock (&req->rq_lock);
}
if (req->rq_import_generation < imp->imp_generation) {
req->rq_err = 1;
req->rq_status = -EINTR;
- ptlrpc_wake_client_req(req);
+ ptlrpc_client_wake_req(req);
}
spin_unlock (&req->rq_lock);
}
req->rq_err = 1;
req->rq_status = -EINTR;
- ptlrpc_wake_client_req(req);
+ ptlrpc_client_wake_req(req);
spin_unlock (&req->rq_lock);
}
}