RETURN(delay);
}
-static int ptlrpc_check_reply(struct ptlrpc_request *req)
-{
- int rc = 0;
- ENTRY;
-
- /* serialise with network callback */
- spin_lock(&req->rq_lock);
-
- if (ptlrpc_client_replied(req))
- GOTO(out, rc = 1);
-
- if (req->rq_net_err && !req->rq_timedout) {
- spin_unlock(&req->rq_lock);
- rc = ptlrpc_expire_one_request(req, 0);
- spin_lock(&req->rq_lock);
- GOTO(out, rc);
- }
-
- if (req->rq_err)
- GOTO(out, rc = 1);
-
- if (req->rq_resend)
- GOTO(out, rc = 1);
-
- if (req->rq_restart)
- GOTO(out, rc = 1);
-
- if (ptlrpc_client_early(req)) {
- ptlrpc_at_recv_early_reply(req);
- GOTO(out, rc = 0); /* keep waiting */
- }
-
- EXIT;
- out:
- spin_unlock(&req->rq_lock);
- DEBUG_REQ(D_NET, req, "rc = %d for", rc);
- return rc;
-}
/* Conditionally suppress specific console messages */
static int ptlrpc_console_allow(struct ptlrpc_request *req)
return 1;
}
-
static int ptlrpc_check_status(struct ptlrpc_request *req)
{
int err;
GOTO(interpret, req->rq_status);
}
- /* ptlrpc_queue_wait->l_wait_event guarantees that rq_intr
- * will only be set after rq_timedout, but the synchronous IO
- * waiting path sets rq_intr irrespective of whether ptlrpcd
- * has seen a timeout. our policy is to only interpret
- * interrupted rpcs after they have timed out */
+ /* ptlrpc_set_wait->l_wait_event sets lwi_allow_intr
+ * so it sets rq_intr regardless of individual rpc
+ * timeouts. The synchronous IO waiting path sets
+ * rq_intr irrespective of whether ptlrpcd
+ * has seen a timeout. Our policy is to only interpret
+ * interrupted rpcs after they have timed out, so we
+ * need to enforce that here.
+ */
+
if (req->rq_intr && (req->rq_timedout || req->rq_waiting ||
req->rq_wait_ctx)) {
req->rq_status = -EINTR;
* req times out */
CDEBUG(D_RPCTRACE, "set %p going to sleep for %d seconds\n",
set, timeout);
- lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout ? timeout : 1),
- ptlrpc_expired_set,
- ptlrpc_interrupted_set, set);
- rc = l_wait_event(set->set_waitq,
- ptlrpc_check_set(NULL, set), &lwi);
+
+ if (timeout == 0 && !cfs_signal_pending())
+ /*
+			 * No requests are in-flight (either timed out
+ * or delayed), so we can allow interrupts.
+ * We still want to block for a limited time,
+ * so we allow interrupts during the timeout.
+ */
+ lwi = LWI_TIMEOUT_INTR_ALL(cfs_time_seconds(1),
+ ptlrpc_expired_set,
+ ptlrpc_interrupted_set, set);
+ else
+ /*
+ * At least one request is in flight, so no
+ * interrupts are allowed. Wait until all
+ * complete, or an in-flight req times out.
+ */
+ lwi = LWI_TIMEOUT(cfs_time_seconds(timeout? timeout : 1),
+ ptlrpc_expired_set, set);
+
+ rc = l_wait_event(set->set_waitq, ptlrpc_check_set(NULL, set), &lwi);
LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
spin_unlock(&req->rq_lock);
}
-static int expired_request(void *data)
-{
- struct ptlrpc_request *req = data;
- ENTRY;
-
- /*
- * Some failure can suspend regular timeouts.
- */
- if (ptlrpc_check_suspend())
- RETURN(1);
-
- /*
- * Deadline may have changed with an early reply.
- */
- if (req->rq_deadline > cfs_time_current_sec())
- RETURN(1);
-
- RETURN(ptlrpc_expire_one_request(req, 0));
-}
-
-static void interrupted_request(void *data)
-{
- struct ptlrpc_request *req = data;
- DEBUG_REQ(D_HA, req, "request interrupted");
- spin_lock(&req->rq_lock);
- req->rq_intr = 1;
- spin_unlock(&req->rq_lock);
-}
-
struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
{
ENTRY;
int ptlrpc_queue_wait(struct ptlrpc_request *req)
{
- int rc = 0;
- int brc;
- struct l_wait_info lwi;
- struct obd_import *imp = req->rq_import;
- cfs_duration_t timeout = CFS_TICK;
- long timeoutl;
+ struct ptlrpc_request_set *set;
+ int rc;
ENTRY;
LASSERT(req->rq_set == NULL);
LASSERT(!req->rq_receiving_reply);
- /* for distributed debugging */
- lustre_msg_set_status(req->rq_reqmsg, cfs_curproc_pid());
- LASSERT(imp->imp_obd != NULL);
- CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:nid:opc "
- "%s:%s:%d:"LPU64":%s:%d\n", cfs_curproc_comm(),
- imp->imp_obd->obd_uuid.uuid,
- lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
- libcfs_nid2str(imp->imp_connection->c_peer.nid),
- lustre_msg_get_opc(req->rq_reqmsg));
-
- /* Mark phase here for a little debug help */
- ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
-
- spin_lock(&imp->imp_lock);
- req->rq_import_generation = imp->imp_generation;
-restart:
- if (ptlrpc_import_delay_req(imp, req, &rc)) {
- list_del_init(&req->rq_list);
- list_add_tail(&req->rq_list, &imp->imp_delayed_list);
- atomic_inc(&imp->imp_inflight);
- spin_unlock(&imp->imp_lock);
-
- DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
- cfs_curproc_comm(),
- ptlrpc_import_state_name(req->rq_send_state),
- ptlrpc_import_state_name(imp->imp_state));
- lwi = LWI_INTR(interrupted_request, req);
- rc = l_wait_event(req->rq_reply_waitq,
- (req->rq_send_state == imp->imp_state ||
- req->rq_err || req->rq_intr),
- &lwi);
- DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d/%d == 1)",
- cfs_curproc_comm(),
- ptlrpc_import_state_name(imp->imp_state),
- ptlrpc_import_state_name(req->rq_send_state),
- req->rq_err, req->rq_intr);
-
- spin_lock(&imp->imp_lock);
- list_del_init(&req->rq_list);
- atomic_dec(&imp->imp_inflight);
-
- if (req->rq_err) {
- /* rq_status was set locally */
- rc = req->rq_status ? req->rq_status : -EIO;
- }
- else if (req->rq_intr) {
- rc = -EINTR;
- }
- else if (req->rq_no_resend) {
- rc = -ETIMEDOUT;
- } else {
- GOTO(restart, rc);
- }
- }
-
- if (rc != 0) {
- spin_unlock(&imp->imp_lock);
- req->rq_status = rc; // XXX this ok?
- GOTO(out, rc);
- }
-
- if (req->rq_resend) {
- if (req->rq_bulk != NULL) {
- ptlrpc_unregister_bulk(req, 0);
-
- /* bulk requests are supposed to be
- * idempotent, so we are free to bump the xid
- * here, which we need to do before
- * registering the bulk again (bug 6371).
- * print the old xid first for sanity.
- */
- DEBUG_REQ(D_HA, req, "bumping xid for bulk: ");
- req->rq_xid = ptlrpc_next_xid();
- }
-
- DEBUG_REQ(D_HA, req, "resending: ");
- }
-
- /* XXX this is the same as ptlrpc_set_wait */
- LASSERT(list_empty(&req->rq_list));
- list_add_tail(&req->rq_list, &imp->imp_sending_list);
- atomic_inc(&imp->imp_inflight);
- spin_unlock(&imp->imp_lock);
-
- rc = sptlrpc_req_refresh_ctx(req, 0);
- if (rc) {
- if (req->rq_err) {
- /* we got fatal ctx refresh error, directly jump out
- * thus we can pass back the actual error code.
- */
- spin_lock(&imp->imp_lock);
- list_del_init(&req->rq_list);
- atomic_dec(&imp->imp_inflight);
- spin_unlock(&imp->imp_lock);
-
- CERROR("Failed to refresh ctx of req %p: %d\n",
- req, rc);
- GOTO(out, rc);
- }
- /* simulating we got error during send rpc */
- goto after_send;
- }
-
- rc = ptl_send_rpc(req, 0);
- if (rc)
- DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc);
-repeat:
- timeoutl = req->rq_deadline - cfs_time_current_sec();
- timeout = (timeoutl <= 0 || rc) ? CFS_TICK :
- cfs_time_seconds(timeoutl);
- DEBUG_REQ(D_NET, req,
- "-- sleeping for "CFS_DURATION_T" ticks", timeout);
- lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request,
- req);
- brc = l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
- if (brc == -ETIMEDOUT && ((req->rq_deadline > cfs_time_current_sec()) ||
- ptlrpc_check_and_wait_suspend(req)))
- goto repeat;
-
-after_send:
- CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:nid:opc "
- "%s:%s:%d:"LPU64":%s:%d\n", cfs_curproc_comm(),
- imp->imp_obd->obd_uuid.uuid,
- lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
- libcfs_nid2str(imp->imp_connection->c_peer.nid),
- lustre_msg_get_opc(req->rq_reqmsg));
-
- /* If the reply was received normally, this just grabs the spinlock
- * (ensuring the reply callback has returned), sees that
- * req->rq_receiving_reply is clear and returns. */
- ptlrpc_unregister_reply(req, 0);
-
- spin_lock(&imp->imp_lock);
- list_del_init(&req->rq_list);
- atomic_dec(&imp->imp_inflight);
- spin_unlock(&imp->imp_lock);
-
- if (req->rq_err) {
- DEBUG_REQ(D_RPCTRACE, req, "err rc=%d status=%d",
- rc, req->rq_status);
- rc = rc ? rc : req->rq_status;
- GOTO(out, rc = rc ? rc : -EIO);
- }
-
- if (req->rq_intr) {
- /* Should only be interrupted if we timed out. */
- if (!req->rq_timedout)
- DEBUG_REQ(D_ERROR, req,
- "rq_intr set but rq_timedout not");
- GOTO(out, rc = -EINTR);
- }
-
- /* Resend if we need to */
- if (req->rq_resend||req->rq_timedout) {
- /* ...unless we were specifically told otherwise. */
- if (req->rq_no_resend)
- GOTO(out, rc = -ETIMEDOUT);
- spin_lock(&imp->imp_lock);
- /* we can have rq_timeout on dlm fake import which not support
- * recovery - but me need resend request on this import instead
- * of return error */
- req->rq_resend = 1;
- goto restart;
- }
-
- if (!ptlrpc_client_replied(req)) {
- /* How can this be? -eeb */
- DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
- LBUG();
- GOTO(out, rc = req->rq_status);
+ set = ptlrpc_prep_set();
+ if (set == NULL) {
+ CERROR("Unable to allocate ptlrpc set.");
+ RETURN(-ENOMEM);
}
- rc = after_reply(req);
- /* NB may return +ve success rc */
- if (req->rq_resend) {
- spin_lock(&imp->imp_lock);
- goto restart;
- }
+ /* for distributed debugging */
+ lustre_msg_set_status(req->rq_reqmsg, cfs_curproc_pid());
- out:
- if (req->rq_bulk != NULL) {
- if (rc >= 0) {
- /* success so far. Note that anything going wrong
- * with bulk now, is EXTREMELY strange, since the
- * server must have believed that the bulk
- * transferred OK before she replied with success to
- * me. */
- lwi = LWI_TIMEOUT(timeout, NULL, NULL);
- brc = l_wait_event(req->rq_reply_waitq,
- !ptlrpc_client_bulk_active(req),
- &lwi);
- LASSERT(brc == 0 || brc == -ETIMEDOUT);
- if (brc != 0) {
- LASSERT(brc == -ETIMEDOUT);
- DEBUG_REQ(D_ERROR, req, "bulk timed out");
- rc = brc;
- } else if (!req->rq_bulk->bd_success) {
- DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
- rc = -EIO;
- }
- }
- if (rc < 0)
- ptlrpc_unregister_bulk(req, 0);
- }
+ /* add a ref for the set (see comment in ptlrpc_set_add_req) */
+ ptlrpc_request_addref(req);
+ ptlrpc_set_add_req(set, req);
+ rc = ptlrpc_set_wait(set);
+ ptlrpc_set_destroy(set);
- LASSERT(!req->rq_receiving_reply);
- ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
- cfs_waitq_broadcast(&imp->imp_recovery_waitq);
RETURN(rc);
}