- /* for distributed debugging */
- lustre_msg_set_status(req->rq_reqmsg, cfs_curproc_pid());
- LASSERT(imp->imp_obd != NULL);
- CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:nid:opc "
- "%s:%s:%d:"LPU64":%s:%d\n", cfs_curproc_comm(),
- imp->imp_obd->obd_uuid.uuid,
- lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
- libcfs_nid2str(imp->imp_connection->c_peer.nid),
- lustre_msg_get_opc(req->rq_reqmsg));
-
- /* Mark phase here for a little debug help */
- ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
-
- spin_lock(&imp->imp_lock);
- req->rq_import_generation = imp->imp_generation;
-restart:
- if (ptlrpc_import_delay_req(imp, req, &rc)) {
- list_del_init(&req->rq_list);
- list_add_tail(&req->rq_list, &imp->imp_delayed_list);
- atomic_inc(&imp->imp_inflight);
- spin_unlock(&imp->imp_lock);
-
- DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
- cfs_curproc_comm(),
- ptlrpc_import_state_name(req->rq_send_state),
- ptlrpc_import_state_name(imp->imp_state));
- lwi = LWI_INTR(interrupted_request, req);
- rc = l_wait_event(req->rq_reply_waitq,
- (req->rq_send_state == imp->imp_state ||
- req->rq_err || req->rq_intr),
- &lwi);
- DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d/%d == 1)",
- cfs_curproc_comm(),
- ptlrpc_import_state_name(imp->imp_state),
- ptlrpc_import_state_name(req->rq_send_state),
- req->rq_err, req->rq_intr);
-
- spin_lock(&imp->imp_lock);
- list_del_init(&req->rq_list);
- atomic_dec(&imp->imp_inflight);
-
- if (req->rq_err) {
- /* rq_status was set locally */
- rc = -EIO;
- }
- else if (req->rq_intr) {
- rc = -EINTR;
- }
- else if (req->rq_no_resend) {
- spin_unlock(&imp->imp_lock);
- GOTO(out, rc = -ETIMEDOUT);
- }
- else {
- GOTO(restart, rc);
- }
- }
-
- if (rc != 0) {
- spin_unlock(&imp->imp_lock);
- req->rq_status = rc; // XXX this ok?
- GOTO(out, rc);
- }
-
- if (req->rq_resend) {
- if (req->rq_bulk != NULL) {
- ptlrpc_unregister_bulk(req, 0);
-
- /* bulk requests are supposed to be
- * idempotent, so we are free to bump the xid
- * here, which we need to do before
- * registering the bulk again (bug 6371).
- * print the old xid first for sanity.
- */
- DEBUG_REQ(D_HA, req, "bumping xid for bulk: ");
- req->rq_xid = ptlrpc_next_xid();
- }
-
- DEBUG_REQ(D_HA, req, "resending: ");
- }
-
- /* XXX this is the same as ptlrpc_set_wait */
- LASSERT(list_empty(&req->rq_list));
- list_add_tail(&req->rq_list, &imp->imp_sending_list);
- atomic_inc(&imp->imp_inflight);
- spin_unlock(&imp->imp_lock);
-
- rc = sptlrpc_req_refresh_ctx(req, 0);
- if (rc) {
- if (req->rq_err) {
- /* we got fatal ctx refresh error, directly jump out
- * thus we can pass back the actual error code.
- */
- spin_lock(&imp->imp_lock);
- list_del_init(&req->rq_list);
- atomic_dec(&imp->imp_inflight);
- spin_unlock(&imp->imp_lock);
-
- CERROR("Failed to refresh ctx of req %p: %d\n",
- req, rc);
- GOTO(out, rc);
- }
- /* simulating we got error during send rpc */
- goto after_send;
- }
-
- rc = ptl_send_rpc(req, 0);
- if (rc)
- DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc);
-repeat:
- timeoutl = req->rq_deadline - cfs_time_current_sec();
- timeout = (timeoutl <= 0 || rc) ? CFS_TICK :
- cfs_time_seconds(timeoutl);
- DEBUG_REQ(D_NET, req,
- "-- sleeping for "CFS_DURATION_T" ticks", timeout);
- lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request,
- req);
- brc = l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
- if (brc == -ETIMEDOUT && ((req->rq_deadline > cfs_time_current_sec()) ||
- ptlrpc_check_and_wait_suspend(req)))
- goto repeat;
-
-after_send:
- CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:nid:opc "
- "%s:%s:%d:"LPU64":%s:%d\n", cfs_curproc_comm(),
- imp->imp_obd->obd_uuid.uuid,
- lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
- libcfs_nid2str(imp->imp_connection->c_peer.nid),
- lustre_msg_get_opc(req->rq_reqmsg));
-
- /* If the reply was received normally, this just grabs the spinlock
- * (ensuring the reply callback has returned), sees that
- * req->rq_receiving_reply is clear and returns. */
- ptlrpc_unregister_reply(req, 0);
-
- spin_lock(&imp->imp_lock);
- list_del_init(&req->rq_list);
- atomic_dec(&imp->imp_inflight);
- spin_unlock(&imp->imp_lock);
-
- if (req->rq_err) {
- DEBUG_REQ(D_RPCTRACE, req, "err rc=%d status=%d",
- rc, req->rq_status);
- GOTO(out, rc = rc ? rc : -EIO);
- }
-
- if (req->rq_intr) {
- /* Should only be interrupted if we timed out. */
- if (!req->rq_timedout)
- DEBUG_REQ(D_ERROR, req,
- "rq_intr set but rq_timedout not");
- GOTO(out, rc = -EINTR);
- }
-
- /* Resend if we need to */
- if (req->rq_resend||req->rq_timedout) {
- /* ...unless we were specifically told otherwise. */
- if (req->rq_no_resend)
- GOTO(out, rc = -ETIMEDOUT);
- spin_lock(&imp->imp_lock);
- /* we can have rq_timeout on dlm fake import which not support
- * recovery - but me need resend request on this import instead
- * of return error */
- req->rq_resend = 1;
- goto restart;
- }
-
- if (!ptlrpc_client_replied(req)) {
- /* How can this be? -eeb */
- DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
- LBUG();
- GOTO(out, rc = req->rq_status);