#include <lu_object.h>
#include <lustre_req_layout.h>
+#include <obd_support.h>
+
/* MD flags we _always_ use */
#define PTLRPC_MD_OPTIONS 0
struct ptlrpc_thread;
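+/*
+ * A request normally moves New -> Rpc [-> Bulk] -> Interpret -> Complete.
+ * Unregistering is interposed whenever the reply buffer still has to be
+ * unlinked from the network before the request may proceed;
+ * rq_next_phase remembers the phase to resume once the unlink completes,
+ * and Undefined only marks rq_next_phase as "not yet set".
+ */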
enum rq_phase {
- RQ_PHASE_NEW = 0xebc0de00,
- RQ_PHASE_RPC = 0xebc0de01,
- RQ_PHASE_BULK = 0xebc0de02,
- RQ_PHASE_INTERPRET = 0xebc0de03,
- RQ_PHASE_COMPLETE = 0xebc0de04,
+ RQ_PHASE_NEW = 0xebc0de00,
+ RQ_PHASE_RPC = 0xebc0de01,
+ RQ_PHASE_BULK = 0xebc0de02,
+ RQ_PHASE_INTERPRET = 0xebc0de03,
+ RQ_PHASE_COMPLETE = 0xebc0de04,
+ RQ_PHASE_UNREGISTERING = 0xebc0de05,
+ RQ_PHASE_UNDEFINED = 0xebc0de06
};
/** Type of request interpreter call-back */
rq_sent_final:1; /* stop sending early replies */
enum rq_phase rq_phase; /* one of RQ_PHASE_* */
+ enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */
atomic_t rq_refcount; /* client-side refcount for SENT race,
server-side refcount for multiple replies */
volatile time_t rq_deadline; /* when request must finish. volatile
so that servers' early reply updates to the deadline aren't
kept in per-cpu cache */
+ time_t rq_reply_deadline; /* when req reply unlink must finish. */
int rq_timeout; /* service time estimate (secs) */
/* Multi-rpc bits */
}
static inline const char *
-ptlrpc_rqphase2str(const struct ptlrpc_request *req)
+ptlrpc_phase2str(enum rq_phase phase)
{
- switch (req->rq_phase) {
+ switch (phase) {
case RQ_PHASE_NEW:
return "New";
case RQ_PHASE_RPC:
return "Rpc";
case RQ_PHASE_BULK:
return "Bulk";
case RQ_PHASE_INTERPRET:
return "Interpret";
case RQ_PHASE_COMPLETE:
return "Complete";
+ case RQ_PHASE_UNREGISTERING:
+ return "Unregistering";
default:
return "?Phase?";
}
}
+static inline const char *
+ptlrpc_rqphase2str(struct ptlrpc_request *req)
+{
+ return ptlrpc_phase2str(req->rq_phase);
+}
+
/* Spare the preprocessor, spoil the bugs. */
#define FLAG(field, str) (field ? str : "")
void ptlrpc_cleanup_client(struct obd_import *imp);
struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid);
-static inline int
-ptlrpc_client_recv_or_unlink (struct ptlrpc_request *req)
-{
- int rc;
-
- spin_lock(&req->rq_lock);
- rc = req->rq_receiving_reply || req->rq_must_unlink;
- spin_unlock(&req->rq_lock);
- return (rc);
-}
-
-static inline void
-ptlrpc_wake_client_req (struct ptlrpc_request *req)
-{
- if (req->rq_set == NULL)
- cfs_waitq_signal(&req->rq_reply_waitq);
- else
- cfs_waitq_signal(&req->rq_set->set_waitq);
-}
-
int ptlrpc_queue_wait(struct ptlrpc_request *req);
int ptlrpc_replay_req(struct ptlrpc_request *req);
-void ptlrpc_unregister_reply(struct ptlrpc_request *req);
+int ptlrpc_unregister_reply(struct ptlrpc_request *req, int async);
void ptlrpc_restart_req(struct ptlrpc_request *req);
void ptlrpc_abort_inflight(struct obd_import *imp);
void ptlrpc_abort_set(struct ptlrpc_request_set *set);
}
static inline void
+ptlrpc_rqphase_move(struct ptlrpc_request *req, enum rq_phase new_phase)
+{
+ if (req->rq_phase == new_phase)
+ return;
+
+ if (new_phase == RQ_PHASE_UNREGISTERING) {
+ req->rq_next_phase = req->rq_phase;
+ if (req->rq_import)
+ atomic_inc(&req->rq_import->imp_unregistering);
+ }
+
+ if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
+ if (req->rq_import)
+ atomic_dec(&req->rq_import->imp_unregistering);
+ }
+
+ DEBUG_REQ(D_RPCTRACE, req, "move req \"%s\" -> \"%s\"",
+ ptlrpc_rqphase2str(req), ptlrpc_phase2str(new_phase));
+
+ req->rq_phase = new_phase;
+}
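+
+/*
+ * The helpers below pretend no reply has arrived while the
+ * OBD_FAIL_PTLRPC_LONG_UNLINK fail_loc is set and rq_reply_deadline has
+ * not yet passed. This is how tests simulate a reply unlink that takes
+ * a long time on a sluggish network.
+ */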
+
+static inline int
+ptlrpc_client_early(struct ptlrpc_request *req)
+{
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+ req->rq_reply_deadline > cfs_time_current_sec())
+ return 0;
+ return req->rq_early;
+}
+
+static inline int
+ptlrpc_client_replied(struct ptlrpc_request *req)
+{
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+ req->rq_reply_deadline > cfs_time_current_sec())
+ return 0;
+ return req->rq_replied;
+}
+
+static inline int
+ptlrpc_client_recv(struct ptlrpc_request *req)
+{
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+ req->rq_reply_deadline > cfs_time_current_sec())
+ return 1;
+ return req->rq_receiving_reply;
+}
+
+static inline int
+ptlrpc_client_recv_or_unlink(struct ptlrpc_request *req)
+{
+ int rc;
+
+ spin_lock(&req->rq_lock);
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+ req->rq_reply_deadline > cfs_time_current_sec()) {
+ spin_unlock(&req->rq_lock);
+ return 1;
+ }
+ rc = req->rq_receiving_reply || req->rq_must_unlink;
+ spin_unlock(&req->rq_lock);
+ return rc;
+}
+
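+/*
+ * Wake whoever is sleeping on this request: a request that belongs to a
+ * set is waited on through the set's waitq, a standalone one through
+ * its own rq_reply_waitq.
+ */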
+static inline void
+ptlrpc_client_wake_req(struct ptlrpc_request *req)
+{
+ if (req->rq_set == NULL)
+ cfs_waitq_signal(&req->rq_reply_waitq);
+ else
+ cfs_waitq_signal(&req->rq_set->set_waitq);
+}
+
+static inline void
ptlrpc_rs_addref(struct ptlrpc_reply_state *rs)
{
LASSERT(atomic_read(&rs->rs_refcount) > 0);
return;
}
-struct ptlrpc_request_pool *ptlrpc_init_rq_pool(int num_rq, int msgsize,
- void (*populate_pool)(struct ptlrpc_request_pool *, int))
+struct ptlrpc_request_pool *
+ptlrpc_init_rq_pool(int num_rq, int msgsize,
+ void (*populate_pool)(struct ptlrpc_request_pool *, int))
{
struct ptlrpc_request_pool *pool;
return pool;
}
-static struct ptlrpc_request *ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
+static struct ptlrpc_request *
+ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
{
struct ptlrpc_request *request;
struct lustre_msg *reqbuf;
spin_lock(&pool->prp_lock);
LASSERT(list_empty(&request->rq_list));
+ LASSERT(!request->rq_receiving_reply);
list_add_tail(&request->rq_list, &pool->prp_req_list);
spin_unlock(&pool->prp_lock);
}
request->rq_reply_cbid.cbid_fn = reply_in_callback;
request->rq_reply_cbid.cbid_arg = request;
+ request->rq_reply_deadline = 0;
request->rq_phase = RQ_PHASE_NEW;
+ request->rq_next_phase = RQ_PHASE_UNDEFINED;
request->rq_request_portal = imp->imp_client->cli_request_portal;
request->rq_reply_portal = imp->imp_client->cli_reply_portal;
/* serialise with network callback */
spin_lock(&req->rq_lock);
- if (req->rq_replied)
+ if (ptlrpc_client_replied(req))
GOTO(out, rc = 1);
if (req->rq_net_err && !req->rq_timedout) {
spin_unlock(&req->rq_lock);
- rc = ptlrpc_expire_one_request(req);
+ rc = ptlrpc_expire_one_request(req, 0);
spin_lock(&req->rq_lock);
GOTO(out, rc);
}
if (req->rq_restart)
GOTO(out, rc = 1);
- if (req->rq_early) {
+ if (ptlrpc_client_early(req)) {
ptlrpc_at_recv_early_reply(req);
GOTO(out, rc = 0); /* keep waiting */
}
if (req->rq_sent && (req->rq_sent > cfs_time_current_sec()))
RETURN (0);
- req->rq_phase = RQ_PHASE_RPC;
+ ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
imp = req->rq_import;
spin_lock(&imp->imp_lock);
if (rc != 0) {
spin_unlock(&imp->imp_lock);
req->rq_status = rc;
- req->rq_phase = RQ_PHASE_INTERPRET;
+ ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
RETURN(rc);
}
ptlrpc_send_new_req(req)) {
force_timer_recalc = 1;
}
+
/* delayed send - skip */
if (req->rq_phase == RQ_PHASE_NEW && req->rq_sent)
continue;
if (!(req->rq_phase == RQ_PHASE_RPC ||
req->rq_phase == RQ_PHASE_BULK ||
req->rq_phase == RQ_PHASE_INTERPRET ||
+ req->rq_phase == RQ_PHASE_UNREGISTERING ||
req->rq_phase == RQ_PHASE_COMPLETE)) {
DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
LBUG();
}
+ if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
+ LASSERT(req->rq_next_phase != req->rq_phase);
+ LASSERT(req->rq_next_phase != RQ_PHASE_UNDEFINED);
+
+ /*
+ * Skip processing until the reply is unlinked. We
+ * can't return the request to the pool or call
+ * interpret before that. We need to make sure
+ * that all RDMA transfers have finished and will
+ * not corrupt any data.
+ */
+ if (ptlrpc_client_recv_or_unlink(req))
+ continue;
+
+ /*
+ * Turn fail_loc off to prevent it from looping
+ * forever.
+ */
+ OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK |
+ OBD_FAIL_ONCE);
+
+ /*
+ * Move to next phase if reply was successfully
+ * unlinked.
+ */
+ ptlrpc_rqphase_move(req, req->rq_next_phase);
+ }
+
if (req->rq_phase == RQ_PHASE_COMPLETE)
continue;
if (req->rq_phase == RQ_PHASE_INTERPRET)
GOTO(interpret, req->rq_status);
- if (req->rq_net_err && !req->rq_timedout)
- ptlrpc_expire_one_request(req);
+ /*
+ * Note that this will also start the async reply unlink.
+ */
+ if (req->rq_net_err && !req->rq_timedout) {
+ ptlrpc_expire_one_request(req, 1);
+
+ /*
+ * Check if we still need to wait for unlink.
+ */
+ if (ptlrpc_client_recv_or_unlink(req))
+ continue;
+ }
if (req->rq_err) {
- ptlrpc_unregister_reply(req);
req->rq_replied = 0;
if (req->rq_status == 0)
req->rq_status = -EIO;
- req->rq_phase = RQ_PHASE_INTERPRET;
-
- spin_lock(&imp->imp_lock);
- list_del_init(&req->rq_list);
- spin_unlock(&imp->imp_lock);
-
+ ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
GOTO(interpret, req->rq_status);
}
* interrupted rpcs after they have timed out */
if (req->rq_intr && (req->rq_timedout || req->rq_waiting ||
req->rq_wait_ctx)) {
- /* NB could be on delayed list */
- ptlrpc_unregister_reply(req);
req->rq_status = -EINTR;
- req->rq_phase = RQ_PHASE_INTERPRET;
-
- spin_lock(&imp->imp_lock);
- list_del_init(&req->rq_list);
- spin_unlock(&imp->imp_lock);
-
+ ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
GOTO(interpret, req->rq_status);
}
req->rq_waiting || req->rq_wait_ctx) {
int status;
- ptlrpc_unregister_reply(req);
+ if (!ptlrpc_unregister_reply(req, 1))
+ continue;
spin_lock(&imp->imp_lock);
continue;
}
- list_del_init(&req->rq_list);
if (status != 0) {
req->rq_status = status;
- req->rq_phase = RQ_PHASE_INTERPRET;
+ ptlrpc_rqphase_move(req,
+ RQ_PHASE_INTERPRET);
spin_unlock(&imp->imp_lock);
GOTO(interpret, req->rq_status);
}
if (req->rq_no_resend && !req->rq_wait_ctx) {
req->rq_status = -ENOTCONN;
- req->rq_phase = RQ_PHASE_INTERPRET;
+ ptlrpc_rqphase_move(req,
+ RQ_PHASE_INTERPRET);
spin_unlock(&imp->imp_lock);
GOTO(interpret, req->rq_status);
}
+
+ list_del_init(&req->rq_list);
list_add_tail(&req->rq_list,
&imp->imp_sending_list);
if (req->rq_bulk) {
__u64 old_xid = req->rq_xid;
- ptlrpc_unregister_bulk (req);
+ ptlrpc_unregister_bulk(req);
/* ensure previous bulk fails */
req->rq_xid = ptlrpc_next_xid();
spin_lock(&req->rq_lock);
- if (req->rq_early) {
+ if (ptlrpc_client_early(req)) {
ptlrpc_at_recv_early_reply(req);
spin_unlock(&req->rq_lock);
continue;
}
/* Still waiting for a reply? */
- if (req->rq_receiving_reply) {
+ if (ptlrpc_client_recv(req)) {
spin_unlock(&req->rq_lock);
continue;
}
/* Did we actually receive a reply? */
- if (!req->rq_replied) {
+ if (!ptlrpc_client_replied(req)) {
spin_unlock(&req->rq_lock);
continue;
}
spin_unlock(&req->rq_lock);
- spin_lock(&imp->imp_lock);
- list_del_init(&req->rq_list);
- spin_unlock(&imp->imp_lock);
-
req->rq_status = after_reply(req);
if (req->rq_resend) {
/* Add this req to the delayed list so
it can be errored if the import is
evicted after recovery. */
spin_lock(&imp->imp_lock);
+ list_del_init(&req->rq_list);
list_add_tail(&req->rq_list,
&imp->imp_delayed_list);
spin_unlock(&imp->imp_lock);
/* If there is no bulk associated with this request,
* then we're done and should let the interpreter
- * process the reply. Similarly if the RPC returned
+ * process the reply. Similarly if the RPC returned
* an error, and therefore the bulk will never arrive.
*/
if (req->rq_bulk == NULL || req->rq_status != 0) {
- req->rq_phase = RQ_PHASE_INTERPRET;
+ ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
GOTO(interpret, req->rq_status);
}
- req->rq_phase = RQ_PHASE_BULK;
+ ptlrpc_rqphase_move(req, RQ_PHASE_BULK);
}
LASSERT(req->rq_phase == RQ_PHASE_BULK);
LBUG();
}
- req->rq_phase = RQ_PHASE_INTERPRET;
+ ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
interpret:
LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
- LASSERT(!req->rq_receiving_reply);
- ptlrpc_unregister_reply(req);
+ /* This moves the request to the "unregistering" phase when we
+ * need to wait for the reply unlink. */
+ if (!ptlrpc_unregister_reply(req, 1))
+ continue;
+
if (req->rq_bulk != NULL)
- ptlrpc_unregister_bulk (req);
+ ptlrpc_unregister_bulk(req);
+
+ /* By the time interpret is called, receiving should already
+ * be finished. */
+ LASSERT(!req->rq_receiving_reply);
if (req->rq_interpret_reply != NULL) {
ptlrpc_interpterer_t interpreter =
&req->rq_async_args,
req->rq_status);
}
- req->rq_phase = RQ_PHASE_COMPLETE;
+ ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:nid:"
"opc %s:%s:%d:"LPU64":%s:%d\n", cfs_curproc_comm(),
libcfs_nid2str(imp->imp_connection->c_peer.nid),
lustre_msg_get_opc(req->rq_reqmsg));
+ spin_lock(&imp->imp_lock);
+ if (!list_empty(&req->rq_list))
+ list_del_init(&req->rq_list);
atomic_dec(&imp->imp_inflight);
+ spin_unlock(&imp->imp_lock);
+
set->set_remaining--;
cfs_waitq_signal(&imp->imp_recovery_waitq);
}
}
/* Return 1 if we should give up, else 0 */
-int ptlrpc_expire_one_request(struct ptlrpc_request *req)
+int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
{
struct obd_import *imp = req->rq_import;
int rc = 0;
req->rq_timedout = 1;
spin_unlock(&req->rq_lock);
- ptlrpc_unregister_reply (req);
+ ptlrpc_unregister_reply(req, async_unlink);
if (obd_dump_on_timeout)
libcfs_debug_dumplog();
LASSERT(set != NULL);
- /* A timeout expired; see which reqs it applies to... */
+ /*
+ * A timeout expired. See which reqs it applies to...
+ */
list_for_each (tmp, &set->set_requests) {
struct ptlrpc_request *req =
list_entry(tmp, struct ptlrpc_request, rq_set_chain);
- /* request in-flight? */
- if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting &&
- !req->rq_resend) ||
+ /* Request in-flight? */
+ if (!(((req->rq_phase == RQ_PHASE_RPC ||
+ req->rq_phase == RQ_PHASE_UNREGISTERING) &&
+ !req->rq_waiting && !req->rq_resend) ||
(req->rq_phase == RQ_PHASE_BULK)))
continue;
-
- if (req->rq_timedout || /* already dealt with */
- req->rq_deadline > now) /* not expired */
+
+ if (req->rq_timedout || /* already dealt with */
+ req->rq_deadline > now) /* not expired */
continue;
- /* deal with this guy */
- ptlrpc_expire_one_request (req);
+ /* Deal with this guy. Do it asynchronously to not block
+ * ptlrpcd thread. */
+ ptlrpc_expire_one_request(req, 1);
}
- /* When waiting for a whole set, we always to break out of the
+ /*
+ * When waiting for a whole set, we always want to break out of the
* sleep so we can recalculate the timeout, or enable interrupts
- * iff everyone's timed out.
+ * if everyone's timed out.
*/
RETURN(1);
}
struct ptlrpc_request *req =
list_entry(tmp, struct ptlrpc_request, rq_set_chain);
- if (req->rq_phase != RQ_PHASE_RPC)
+ if (req->rq_phase != RQ_PHASE_RPC &&
+ req->rq_phase != RQ_PHASE_UNREGISTERING)
continue;
ptlrpc_mark_interrupted(req);
}
}
-/* get the smallest timeout in the set; this does NOT set a timeout. */
+/**
+ * Get the smallest timeout in the set; this does NOT set a timeout.
+ */
int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
{
struct list_head *tmp;
list_for_each(tmp, &set->set_requests) {
req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
- /* request in-flight? */
- if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
+ /*
+ * Request in-flight?
+ */
+ if (!(((req->rq_phase == RQ_PHASE_RPC ||
+ req->rq_phase == RQ_PHASE_UNREGISTERING) &&
+ !req->rq_waiting) ||
(req->rq_phase == RQ_PHASE_BULK) ||
(req->rq_phase == RQ_PHASE_NEW)))
continue;
- if (req->rq_timedout) /* already timed out */
+ /*
+ * Check requests waiting for a long reply unlink
+ * once per second.
+ */
+ if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
+ timeout = 1;
+ break;
+ }
+
+ /*
+ * Already timed out.
+ */
+ if (req->rq_timedout)
continue;
- if (req->rq_wait_ctx) /* waiting for ctx */
+ /*
+ * Waiting for ctx.
+ */
+ if (req->rq_wait_ctx)
continue;
if (req->rq_phase == RQ_PHASE_NEW)
* IDEMPOTENT, but _not_ safe against concurrent callers.
* The request owner (i.e. the thread doing the I/O) must call...
*/
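+/*
+ * Returns 1 once the reply is unlinked; with async set, returns 0 and
+ * leaves the request in the "Unregistering" phase so the caller can
+ * poll it again later, as ptlrpc_check_set() does.
+ */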
-void ptlrpc_unregister_reply (struct ptlrpc_request *request)
+int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
{
int rc;
cfs_waitq_t *wq;
struct l_wait_info lwi;
- LASSERT(!in_interrupt ()); /* might sleep */
+ /*
+ * Might sleep.
+ */
+ LASSERT(!in_interrupt());
+
+ /*
+ * Let's set up a deadline for the reply unlink.
+ */
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+ async && request->rq_reply_deadline == 0)
+ request->rq_reply_deadline = cfs_time_current_sec()+LONG_UNLINK;
+
+ /*
+ * Nothing left to do.
+ */
if (!ptlrpc_client_recv_or_unlink(request))
- /* Nothing left to do */
- return;
+ RETURN(1);
- LNetMDUnlink (request->rq_reply_md_h);
+ LNetMDUnlink(request->rq_reply_md_h);
- /* We have to l_wait_event() whatever the result, to give liblustre
- * a chance to run reply_in_callback(), and to make sure we've
- * unlinked before returning a req to the pool */
+ /*
+ * Let's check it once again.
+ */
+ if (!ptlrpc_client_recv_or_unlink(request))
+ RETURN(1);
+
+ /*
+ * Move to "Unregistering" phase as reply was not unlinked yet.
+ */
+ ptlrpc_rqphase_move(request, RQ_PHASE_UNREGISTERING);
+ /*
+ * Do not wait for unlink to finish.
+ */
+ if (async)
+ RETURN(0);
+
+ /*
+ * We have to l_wait_event() whatever the result, to give liblustre
+ * a chance to run reply_in_callback(), and to make sure we've
+ * unlinked before returning a req to the pool.
+ */
if (request->rq_set != NULL)
wq = &request->rq_set->set_waitq;
else
/* Network access will complete in finite time but the HUGE
* timeout lets us CWARN for visibility of sluggish NALs */
lwi = LWI_TIMEOUT(cfs_time_seconds(LONG_UNLINK), NULL, NULL);
- rc = l_wait_event (*wq, !ptlrpc_client_recv_or_unlink(request),
- &lwi);
- if (rc == 0)
- return;
-
- LASSERT (rc == -ETIMEDOUT);
+ rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
+ &lwi);
+ if (rc == 0) {
+ ptlrpc_rqphase_move(request, request->rq_next_phase);
+ RETURN(1);
+ }
+
+ LASSERT(rc == -ETIMEDOUT);
DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout "
"rvcng=%d unlnk=%d", request->rq_receiving_reply,
request->rq_must_unlink);
}
+ RETURN(0);
}
/* caller must hold imp->imp_lock */
CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
old_xid, req->rq_xid);
}
- ptlrpc_wake_client_req(req);
+ ptlrpc_client_wake_req(req);
spin_unlock(&req->rq_lock);
}
spin_lock(&req->rq_lock);
req->rq_restart = 1;
req->rq_timedout = 0;
- ptlrpc_wake_client_req(req);
+ ptlrpc_client_wake_req(req);
spin_unlock(&req->rq_lock);
}
struct ptlrpc_request *req = data;
ENTRY;
- /* some failure can suspend regular timeouts */
+ /*
+ * Some failure can suspend regular timeouts.
+ */
if (ptlrpc_check_suspend())
RETURN(1);
- /* deadline may have changed with an early reply */
+ /*
+ * Deadline may have changed with an early reply.
+ */
if (req->rq_deadline > cfs_time_current_sec())
RETURN(1);
- RETURN(ptlrpc_expire_one_request(req));
+ RETURN(ptlrpc_expire_one_request(req, 0));
}
static void interrupted_request(void *data)
lustre_msg_get_opc(req->rq_reqmsg));
/* Mark phase here for a little debug help */
- req->rq_phase = RQ_PHASE_RPC;
+ ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
spin_lock(&imp->imp_lock);
req->rq_import_generation = imp->imp_generation;
/* If the reply was received normally, this just grabs the spinlock
* (ensuring the reply callback has returned), sees that
* req->rq_receiving_reply is clear and returns. */
- ptlrpc_unregister_reply (req);
-
+ ptlrpc_unregister_reply(req, 0);
if (req->rq_err) {
DEBUG_REQ(D_RPCTRACE, req, "err rc=%d status=%d",
GOTO(out, rc = -ETIMEDOUT);
}
- if (!req->rq_replied) {
+ if (!ptlrpc_client_replied(req)) {
/* How can this be? -eeb */
DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
LBUG();
}
LASSERT(!req->rq_receiving_reply);
- req->rq_phase = RQ_PHASE_INTERPRET;
+ ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
atomic_dec(&imp->imp_inflight);
cfs_waitq_signal(&imp->imp_recovery_waitq);
ENTRY;
atomic_dec(&imp->imp_replay_inflight);
- if (!req->rq_replied) {
+ if (!ptlrpc_client_replied(req)) {
CERROR("request replay timed out, restarting recovery\n");
GOTO(out, rc = -ETIMEDOUT);
}
if (req->rq_replay_cb)
req->rq_replay_cb(req);
- if (req->rq_replied &&
+ if (ptlrpc_client_replied(req) &&
lustre_msg_get_status(req->rq_repmsg) != aa->praa_old_status) {
DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
lustre_msg_get_status(req->rq_repmsg),
aa->praa_old_state = req->rq_send_state;
req->rq_send_state = LUSTRE_IMP_REPLAY;
req->rq_phase = RQ_PHASE_NEW;
+ req->rq_next_phase = RQ_PHASE_UNDEFINED;
if (req->rq_repmsg)
aa->praa_old_status = lustre_msg_get_status(req->rq_repmsg);
req->rq_status = 0;
if (req->rq_import_generation < imp->imp_generation) {
req->rq_err = 1;
req->rq_status = -EINTR;
- ptlrpc_wake_client_req(req);
+ ptlrpc_client_wake_req(req);
}
spin_unlock (&req->rq_lock);
}
if (req->rq_import_generation < imp->imp_generation) {
req->rq_err = 1;
req->rq_status = -EINTR;
- ptlrpc_wake_client_req(req);
+ ptlrpc_client_wake_req(req);
}
spin_unlock (&req->rq_lock);
}
req->rq_err = 1;
req->rq_status = -EINTR;
- ptlrpc_wake_client_req(req);
+ ptlrpc_client_wake_req(req);
spin_unlock (&req->rq_lock);
}
}
ptlrpc_deactivate_and_unlock_import(imp);
}
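+
+/*
+ * Seconds @req may still take: 0 if the request is not in flight or
+ * has already timed out; otherwise the time remaining until
+ * rq_deadline (rq_sent for a not-yet-sent request).
+ */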
+static unsigned int
+ptlrpc_inflight_deadline(struct ptlrpc_request *req, time_t now)
+{
+ long dl;
+
+ if (!(((req->rq_phase == RQ_PHASE_RPC ||
+ req->rq_phase == RQ_PHASE_UNREGISTERING) &&
+ !req->rq_waiting) ||
+ (req->rq_phase == RQ_PHASE_BULK) ||
+ (req->rq_phase == RQ_PHASE_NEW)))
+ return 0;
+
+ if (req->rq_timedout)
+ return 0;
+
+ if (req->rq_phase == RQ_PHASE_NEW)
+ dl = req->rq_sent;
+ else
+ dl = req->rq_deadline;
+
+ if (dl <= now)
+ return 0;
+
+ return dl - now;
+}
+
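+/*
+ * The longest remaining deadline among the requests on the sending
+ * list; ptlrpc_invalidate_import() uses it below to size each wait.
+ */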
+static unsigned int ptlrpc_inflight_timeout(struct obd_import *imp)
+{
+ time_t now = cfs_time_current_sec();
+ struct list_head *tmp, *n;
+ struct ptlrpc_request *req;
+ unsigned int timeout = 0;
+
+ spin_lock(&imp->imp_lock);
+ list_for_each_safe(tmp, n, &imp->imp_sending_list) {
+ req = list_entry(tmp, struct ptlrpc_request, rq_list);
+ timeout = max(ptlrpc_inflight_deadline(req, now), timeout);
+ }
+ spin_unlock(&imp->imp_lock);
+ return timeout;
+}
+
/*
* This function will invalidate the import, if necessary, then block
* for all the RPC completions, and finally notify the obd to
struct list_head *tmp, *n;
struct ptlrpc_request *req;
struct l_wait_info lwi;
+ unsigned int timeout;
int rc;
atomic_inc(&imp->imp_inval_count);
LASSERT(imp->imp_invalid);
- /* wait for all requests to error out and call completion callbacks.
- Cap it at obd_timeout -- these should all have been locally
- cancelled by ptlrpc_abort_inflight. */
- lwi = LWI_TIMEOUT_INTERVAL(
- cfs_timeout_cap(cfs_time_seconds(obd_timeout)),
- cfs_time_seconds(1), NULL, NULL);
- rc = l_wait_event(imp->imp_recovery_waitq,
- (atomic_read(&imp->imp_inflight) == 0), &lwi);
+ /* Wait forever until inflight == 0. We really can't do it another
+ * way because in some cases we need to wait for a very long reply
+ * unlink. We can't do anything before that because there is really
+ * no guarantee that an RDMA transfer is not in progress right now. */
+ do {
+ /* Calculate the max timeout for waiting on RPCs to error
+ * out. Use obd_timeout if the calculated value is smaller
+ * than it. */
+ timeout = ptlrpc_inflight_timeout(imp);
+ timeout += timeout / 3;
+
+ if (timeout == 0)
+ timeout = obd_timeout;
+
+ CDEBUG(D_RPCTRACE, "Sleeping %d sec for inflight to error out\n",
+ timeout);
+
+ /* Wait for all requests to error out and call completion
+ * callbacks. Cap it at obd_timeout -- these should all
+ * have been locally cancelled by ptlrpc_abort_inflight. */
+ lwi = LWI_TIMEOUT_INTERVAL(
+ cfs_timeout_cap(cfs_time_seconds(timeout)),
+ cfs_time_seconds(1), NULL, NULL);
+ rc = l_wait_event(imp->imp_recovery_waitq,
+ (atomic_read(&imp->imp_inflight) == 0), &lwi);
+ if (rc) {
+ const char *cli_tgt = obd2cli_tgt(imp->imp_obd);
- if (rc) {
- CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
- obd2cli_tgt(imp->imp_obd), rc,
- atomic_read(&imp->imp_inflight));
- spin_lock(&imp->imp_lock);
- list_for_each_safe(tmp, n, &imp->imp_sending_list) {
- req = list_entry(tmp, struct ptlrpc_request, rq_list);
- DEBUG_REQ(D_ERROR, req, "still on sending list");
- }
- list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
- req = list_entry(tmp, struct ptlrpc_request, rq_list);
- DEBUG_REQ(D_ERROR, req, "still on delayed list");
- }
- spin_unlock(&imp->imp_lock);
- LASSERT(atomic_read(&imp->imp_inflight) == 0);
- }
+ CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
+ cli_tgt, rc, atomic_read(&imp->imp_inflight));
+ spin_lock(&imp->imp_lock);
+ list_for_each_safe(tmp, n, &imp->imp_sending_list) {
+ req = list_entry(tmp, struct ptlrpc_request,
+ rq_list);
+ DEBUG_REQ(D_ERROR, req, "still on sending list");
+ }
+ list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
+ req = list_entry(tmp, struct ptlrpc_request,
+ rq_list);
+ DEBUG_REQ(D_ERROR, req, "still on delayed list");
+ }
+
+ if (atomic_read(&imp->imp_unregistering) == 0) {
+ /* We know that only "unregistering" RPCs may
+ * still survive on the sending or delayed
+ * lists (they are waiting for a long reply
+ * unlink on sluggish nets). Let's check this.
+ * If there is no unregistering and inflight
+ * != 0, this is a bug. */
+ LASSERT(atomic_read(&imp->imp_inflight) == 0);
+
+ /* Let's save one loop as soon as inflight has
+ * dropped to zero. No new inflight requests are
+ * possible at this point. */
+ rc = 0;
+ } else {
+ CERROR("%s: RPCs in \"%s\" phase found (%d). "
+ "Network is sluggish? Waiting them "
+ "to error out.\n", cli_tgt,
+ ptlrpc_phase2str(RQ_PHASE_UNREGISTERING),
+ atomic_read(&imp->imp_unregistering));
+ }
+ spin_unlock(&imp->imp_lock);
+ }
+ } while (rc != 0);
+
+ /*
+ * Let's additionally check that no new RPCs were added to the
+ * import while it was in the "invalidate" state.
+ */
+ LASSERT(atomic_read(&imp->imp_inflight) == 0);
out:
obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
sptlrpc_import_flush_all_ctx(imp);
#endif
lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_NEXT_VER);
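+ /* A connect request is neither resent on timeout nor parked on
+ * the delayed list; it should fail outright so that connection
+ * problems surface promptly. */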
+ request->rq_no_resend = request->rq_no_delay = 1;
request->rq_send_state = LUSTRE_IMP_CONNECTING;
/* Allow a slightly larger reply for future growth compatibility */
req_capsule_set_size(&request->rq_pill, &RMF_CONNECT_DATA, RCL_SERVER,