ptlrpc_deactivate_and_unlock_import(imp);
}
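+/* Return the number of seconds left before @req's deadline expires,
+ * or 0 if the request has already timed out or is in a phase that
+ * cannot time out. */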
+static unsigned int
+ptlrpc_inflight_deadline(struct ptlrpc_request *req, time_t now)
+{
+ long dl;
+
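+ /* Only requests actually on the wire can have a meaningful
+ * deadline: rpcs being sent or unregistered (and not merely
+ * queued), bulk transfers, and not-yet-sent requests. Note that
+ * rq_phase values are enum constants, so they are compared with
+ * ==, not tested as bit flags. */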
+ if (!(((req->rq_phase == RQ_PHASE_RPC ||
+ req->rq_phase == RQ_PHASE_UNREGISTERING) &&
+ !req->rq_waiting) ||
+ (req->rq_phase == RQ_PHASE_BULK) ||
+ (req->rq_phase == RQ_PHASE_NEW)))
+ return 0;
+
+ if (req->rq_timedout)
+ return 0;
+
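+ /* A NEW request has not been sent yet; rq_sent holds the time at
+ * which it is scheduled to go out, so treat that as its deadline.
+ * For everything else use the reply deadline. */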
+ if (req->rq_phase == RQ_PHASE_NEW)
+ dl = req->rq_sent;
+ else
+ dl = req->rq_deadline;
+
+ if (dl <= now)
+ return 0;
+
+ return dl - now;
+}
+
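+/* Walk the import's sending list and return the longest remaining
+ * deadline among inflight requests, in seconds; 0 if nothing on the
+ * list still has time left. */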
+static unsigned int ptlrpc_inflight_timeout(struct obd_import *imp)
+{
+ time_t now = cfs_time_current_sec();
+ struct list_head *tmp, *n;
+ struct ptlrpc_request *req;
+ unsigned int timeout = 0;
+
+ spin_lock(&imp->imp_lock);
+ list_for_each_safe(tmp, n, &imp->imp_sending_list) {
+ req = list_entry(tmp, struct ptlrpc_request, rq_list);
+ timeout = max(ptlrpc_inflight_deadline(req, now), timeout);
+ }
+ spin_unlock(&imp->imp_lock);
+ return timeout;
+}
+
/*
* This function will invalidate the import, if necessary, then block
* for all the RPC completions, and finally notify the obd to
struct list_head *tmp, *n;
struct ptlrpc_request *req;
struct l_wait_info lwi;
+ unsigned int timeout;
int rc;
atomic_inc(&imp->imp_inval_count);
LASSERT(imp->imp_invalid);
- /* wait for all requests to error out and call completion callbacks.
- Cap it at obd_timeout -- these should all have been locally
- cancelled by ptlrpc_abort_inflight. */
- lwi = LWI_TIMEOUT_INTERVAL(
- cfs_timeout_cap(cfs_time_seconds(obd_timeout)),
- cfs_time_seconds(1), NULL, NULL);
- rc = l_wait_event(imp->imp_recovery_waitq,
- (atomic_read(&imp->imp_inflight) == 0), &lwi);
+ /* Wait forever until inflight == 0. There is no other way: in some
+ * cases we must wait for a very long reply unlink, and until then we
+ * cannot do anything else, since there is no guarantee that an rdma
+ * transfer is not still in progress. */
+ do {
+ /* Calculate the maximum time to wait for inflight rpcs to
+ * error out, plus a one-third safety margin. Fall back to
+ * obd_timeout if no inflight request has a deadline left. */
+ timeout = ptlrpc_inflight_timeout(imp);
+ timeout += timeout / 3;
+
+ if (timeout == 0)
+ timeout = obd_timeout;
+
+ CDEBUG(D_RPCTRACE, "Sleeping %u sec for inflight to error out\n",
+ timeout);
+
+ /* Wait for all requests to error out and call completion
+ * callbacks, sleeping at most the timeout computed above --
+ * these should all have been locally cancelled by
+ * ptlrpc_abort_inflight. */
+ lwi = LWI_TIMEOUT_INTERVAL(
+ cfs_timeout_cap(cfs_time_seconds(timeout)),
+ cfs_time_seconds(1), NULL, NULL);
+ rc = l_wait_event(imp->imp_recovery_waitq,
+ (atomic_read(&imp->imp_inflight) == 0), &lwi);
+ if (rc) {
+ const char *cli_tgt = obd2cli_tgt(imp->imp_obd);
- if (rc) {
- CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
- obd2cli_tgt(imp->imp_obd), rc,
- atomic_read(&imp->imp_inflight));
- spin_lock(&imp->imp_lock);
- list_for_each_safe(tmp, n, &imp->imp_sending_list) {
- req = list_entry(tmp, struct ptlrpc_request, rq_list);
- DEBUG_REQ(D_ERROR, req, "still on sending list");
- }
- list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
- req = list_entry(tmp, struct ptlrpc_request, rq_list);
- DEBUG_REQ(D_ERROR, req, "still on delayed list");
- }
- spin_unlock(&imp->imp_lock);
- LASSERT(atomic_read(&imp->imp_inflight) == 0);
- }
+ CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
+ cli_tgt, rc, atomic_read(&imp->imp_inflight));
+ spin_lock(&imp->imp_lock);
+ list_for_each_safe(tmp, n, &imp->imp_sending_list) {
+ req = list_entry(tmp, struct ptlrpc_request,
+ rq_list);
+ DEBUG_REQ(D_ERROR, req, "still on sending list");
+ }
+ list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
+ req = list_entry(tmp, struct ptlrpc_request,
+ rq_list);
+ DEBUG_REQ(D_ERROR, req, "still on delayed list");
+ }
+
+ if (atomic_read(&imp->imp_unregistering) == 0) {
+ /* Only "unregistering" rpcs may still survive
+ * on the sending or delayed lists (they are
+ * waiting for a long reply unlink on sluggish
+ * nets). Check that: if nothing is
+ * unregistering but inflight != 0, it is a
+ * bug. */
+ LASSERT(atomic_read(&imp->imp_inflight) == 0);
+
+ /* Save one loop iteration once inflight has
+ * dropped to zero. No new inflight requests
+ * are possible at this point. */
+ rc = 0;
+ } else {
+ CERROR("%s: RPCs in \"%s\" phase found (%d). "
+ "Network is sluggish? Waiting them "
+ "to error out.\n", cli_tgt,
+ ptlrpc_phase2str(RQ_PHASE_UNREGISTERING),
+ atomic_read(&imp->imp_unregistering));
+ }
+ spin_unlock(&imp->imp_lock);
+ }
+ } while (rc != 0);
+
+ /*
+ * Additionally, check that no new rpcs were added to the import
+ * while it was in the "invalidate" state.
+ */
+ LASSERT(atomic_read(&imp->imp_inflight) == 0);
out:
obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
sptlrpc_import_flush_all_ctx(imp);
#endif
lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_NEXT_VER);
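+ /* Never delay or resend connect rpcs: if one cannot go out or
+ * times out, fail it and let the import state machine schedule
+ * the next connect attempt. */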
+ request->rq_no_resend = request->rq_no_delay = 1;
request->rq_send_state = LUSTRE_IMP_CONNECTING;
/* Allow a slightly larger reply for future growth compatibility */
req_capsule_set_size(&request->rq_pill, &RMF_CONNECT_DATA, RCL_SERVER,