idx = import_at_get_index(req->rq_import, req->rq_request_portal);
/* max service estimates are tracked on the server side,
so just keep minimal history here */
- oldse = at_add(&at->iat_service_estimate[idx], serv_est);
+ oldse = at_measured(&at->iat_service_estimate[idx], serv_est);
if (oldse != 0)
CDEBUG(D_ADAPTTO, "The RPC service estimate for %s ptl %d "
"has changed from %d to %d\n",
CFS_DURATION_T"\n", service_time,
cfs_time_sub(now, req->rq_sent));
- oldnl = at_add(&at->iat_net_latency, nl);
+ oldnl = at_measured(&at->iat_net_latency, nl);
if (oldnl != 0)
CDEBUG(D_ADAPTTO, "The network latency for %s (nid %s) "
"has changed from %d to %d\n",
int i;
int size = 1;
- while (size < pool->prp_rq_size + SPTLRPC_MAX_PAYLOAD)
+ while (size < pool->prp_rq_size)
size <<= 1;
LASSERTF(cfs_list_empty(&pool->prp_req_list) ||
cfs_spin_lock_init(&pool->prp_lock);
CFS_INIT_LIST_HEAD(&pool->prp_req_list);
- pool->prp_rq_size = msgsize;
+ pool->prp_rq_size = msgsize + SPTLRPC_MAX_PAYLOAD;
pool->prp_populate = populate_pool;
populate_pool(pool, num_rq);
CFS_INIT_LIST_HEAD(&request->rq_history_list);
CFS_INIT_LIST_HEAD(&request->rq_exp_list);
cfs_waitq_init(&request->rq_reply_waitq);
+ cfs_waitq_init(&request->rq_set_waitq);
request->rq_xid = ptlrpc_next_xid();
cfs_atomic_set(&request->rq_refcount, 1);
CFS_INIT_LIST_HEAD(&request->rq_history_list);
CFS_INIT_LIST_HEAD(&request->rq_exp_list);
cfs_waitq_init(&request->rq_reply_waitq);
+ cfs_waitq_init(&request->rq_set_waitq);
request->rq_xid = ptlrpc_next_xid();
cfs_atomic_set(&request->rq_refcount, 1);
struct ptlrpc_request_set *set = req->rq_set;
if (set)
- set->set_remaining --;
+ cfs_atomic_dec(&set->set_remaining);
}
ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
RETURN(NULL);
CFS_INIT_LIST_HEAD(&set->set_requests);
cfs_waitq_init(&set->set_waitq);
- set->set_remaining = 0;
+ cfs_atomic_set(&set->set_remaining, 0);
cfs_spin_lock_init(&set->set_new_req_lock);
CFS_INIT_LIST_HEAD(&set->set_new_requests);
CFS_INIT_LIST_HEAD(&set->set_cblist);
ENTRY;
/* Requests on the set should either all be completed, or all be new */
- expected_phase = (set->set_remaining == 0) ?
+ expected_phase = (cfs_atomic_read(&set->set_remaining) == 0) ?
RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
cfs_list_for_each (tmp, &set->set_requests) {
struct ptlrpc_request *req =
n++;
}
- LASSERTF(set->set_remaining == 0 || set->set_remaining == n, "%d / %d\n",
- set->set_remaining, n);
+ LASSERTF(cfs_atomic_read(&set->set_remaining) == 0 ||
+ cfs_atomic_read(&set->set_remaining) == n, "%d / %d\n",
+ cfs_atomic_read(&set->set_remaining), n);
cfs_list_for_each_safe(tmp, next, &set->set_requests) {
struct ptlrpc_request *req =
if (req->rq_phase == RQ_PHASE_NEW) {
ptlrpc_req_interpret(NULL, req, -EBADR);
- set->set_remaining--;
+ cfs_atomic_dec(&set->set_remaining);
}
req->rq_set = NULL;
ptlrpc_req_finished (req);
}
- LASSERT(set->set_remaining == 0);
+ LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
OBD_FREE(set, sizeof(*set));
EXIT;
/* The set takes over the caller's request reference */
cfs_list_add_tail(&req->rq_set_chain, &set->set_requests);
req->rq_set = set;
- set->set_remaining++;
+ cfs_atomic_inc(&set->set_remaining);
+ req->rq_queued_time = cfs_time_current(); /* Where is the best place to set this? */
}
/**
} else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
*status = -EIO;
+ } else if (imp->imp_obd->obd_no_recov) {
+ *status = -ESHUTDOWN;
+ } else if (ptlrpc_send_limit_expired(req)) {
+ /* probably doesn't need to be a D_ERROR after initial testing */
+ DEBUG_REQ(D_ERROR, req, "send limit expired ");
+ *status = -EIO;
} else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
imp->imp_state == LUSTRE_IMP_CONNECTING) {
/* allow CONNECT even if import is invalid */ ;
DEBUG_REQ(D_ERROR, req, "invalidate in flight");
*status = -EIO;
}
- } else if ((imp->imp_invalid && (!imp->imp_recon_bk)) ||
- imp->imp_obd->obd_no_recov) {
- /* If the import has been invalidated (such as by an OST
- * failure), and if the import(MGC) tried all of its connection
- * list (Bug 13464), the request must fail with -ESHUTDOWN.
- * This indicates the requests should be discarded; an -EIO
- * may result in a resend of the request. */
+ } else if (imp->imp_invalid) {
if (!imp->imp_deactive)
DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
*status = -ESHUTDOWN; /* bz 12940 */
long timediff;
ENTRY;
- LASSERT(!req->rq_receiving_reply);
- LASSERT(obd);
- LASSERT(req->rq_nob_received <= req->rq_repbuf_len);
+ LASSERT(obd != NULL);
+ /* repbuf must be unlinked */
+ LASSERT(!req->rq_receiving_reply && !req->rq_must_unlink);
+
+ if (req->rq_reply_truncate) {
+ if (ptlrpc_no_resend(req)) {
+ DEBUG_REQ(D_ERROR, req, "reply buffer overflow,"
+ " expected: %d, actual size: %d",
+ req->rq_nob_received, req->rq_repbuf_len);
+ RETURN(-EOVERFLOW);
+ }
- if (req->rq_reply_truncate && !req->rq_no_resend) {
- req->rq_resend = 1;
sptlrpc_cli_free_repbuf(req);
- req->rq_replen = req->rq_nob_received;
+ /* Pass the required reply buffer size (include
+ * space for early reply).
+ * NB: no need to roundup because alloc_repbuf
+ * will roundup it */
+ req->rq_replen = req->rq_nob_received;
+ req->rq_nob_received = 0;
+ req->rq_resend = 1;
RETURN(0);
}
* NB Until this point, the whole of the incoming message,
* including buflens, status etc is in the sender's byte order.
*/
-
rc = sptlrpc_cli_unwrap_reply(req);
if (rc) {
DEBUG_REQ(D_ERROR, req, "unwrap reply failed (%d):", rc);
int force_timer_recalc = 0;
ENTRY;
- if (set->set_remaining == 0)
+ if (cfs_atomic_read(&set->set_remaining) == 0)
RETURN(1);
cfs_list_for_each(tmp, &set->set_requests) {
cfs_list_entry(tmp, struct ptlrpc_request,
rq_set_chain);
struct obd_import *imp = req->rq_import;
+ int unregistered = 0;
int rc = 0;
if (req->rq_phase == RQ_PHASE_NEW &&
}
if (req->rq_err) {
+ cfs_spin_lock(&req->rq_lock);
req->rq_replied = 0;
+ cfs_spin_unlock(&req->rq_lock);
if (req->rq_status == 0)
req->rq_status = -EIO;
ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
cfs_spin_unlock(&imp->imp_lock);
GOTO(interpret, req->rq_status);
}
- if (req->rq_no_resend && !req->rq_wait_ctx) {
+ if (ptlrpc_no_resend(req) && !req->rq_wait_ctx) {
req->rq_status = -ENOTCONN;
ptlrpc_rqphase_move(req,
RQ_PHASE_INTERPRET);
cfs_spin_unlock(&imp->imp_lock);
+ cfs_spin_lock(&req->rq_lock);
req->rq_waiting = 0;
+ cfs_spin_unlock(&req->rq_lock);
- if (req->rq_timedout||req->rq_resend) {
+ if (req->rq_timedout || req->rq_resend) {
/* This is re-sending anyways,
* let's mark req as resend. */
+ cfs_spin_lock(&req->rq_lock);
req->rq_resend = 1;
+ cfs_spin_unlock(&req->rq_lock);
if (req->rq_bulk) {
__u64 old_xid;
if (status) {
if (req->rq_err) {
req->rq_status = status;
+ cfs_spin_lock(&req->rq_lock);
req->rq_wait_ctx = 0;
+ cfs_spin_unlock(&req->rq_lock);
force_timer_recalc = 1;
} else {
+ cfs_spin_lock(&req->rq_lock);
req->rq_wait_ctx = 1;
+ cfs_spin_unlock(&req->rq_lock);
}
continue;
} else {
+ cfs_spin_lock(&req->rq_lock);
req->rq_wait_ctx = 0;
+ cfs_spin_unlock(&req->rq_lock);
}
rc = ptl_send_rpc(req, 0);
DEBUG_REQ(D_HA, req, "send failed (%d)",
rc);
force_timer_recalc = 1;
+ cfs_spin_lock(&req->rq_lock);
req->rq_net_err = 1;
+ cfs_spin_unlock(&req->rq_lock);
}
/* need to reset the timeout */
force_timer_recalc = 1;
cfs_spin_unlock(&req->rq_lock);
+ /* unlink from net because we are going to
+ * swab in-place of reply buffer */
+ unregistered = ptlrpc_unregister_reply(req, 1);
+ if (!unregistered)
+ continue;
+
req->rq_status = after_reply(req);
if (req->rq_resend)
continue;
/* This moves to "unregistering" phase we need to wait for
* reply unlink. */
- if (!ptlrpc_unregister_reply(req, 1))
+ if (!unregistered && !ptlrpc_unregister_reply(req, 1))
continue;
if (!ptlrpc_unregister_bulk(req, 1))
}
cfs_spin_unlock(&imp->imp_lock);
- set->set_remaining--;
+ cfs_atomic_dec(&set->set_remaining);
cfs_waitq_broadcast(&imp->imp_recovery_waitq);
}
/* If we hit an error, we want to recover promptly. */
- RETURN(set->set_remaining == 0 || force_timer_recalc);
+ RETURN(cfs_atomic_read(&set->set_remaining) == 0 || force_timer_recalc);
}
/* Return 1 if we should give up, else 0 */
/* if a request can't be resent we can't wait for an answer after
the timeout */
- if (req->rq_no_resend) {
+ if (ptlrpc_no_resend(req)) {
DEBUG_REQ(D_RPCTRACE, req, "TIMEOUT-NORESEND:");
rc = 1;
}
* EINTR.
* I don't really care if we go once more round the loop in
* the error cases -eeb. */
- } while (rc != 0 || set->set_remaining != 0);
+ if (rc == 0 && cfs_atomic_read(&set->set_remaining) == 0) {
+ cfs_list_for_each(tmp, &set->set_requests) {
+ req = cfs_list_entry(tmp, struct ptlrpc_request,
+ rq_set_chain);
+ cfs_spin_lock(&req->rq_lock);
+ req->rq_invalid_rqset = 1;
+ cfs_spin_unlock(&req->rq_lock);
+ }
+ }
+ } while (rc != 0 || cfs_atomic_read(&set->set_remaining) != 0);
- LASSERT(set->set_remaining == 0);
+ LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
rc = 0;
cfs_list_for_each(tmp, &set->set_requests) {