int i;
int size = 1;
- while (size < pool->prp_rq_size + SPTLRPC_MAX_PAYLOAD)
+ while (size < pool->prp_rq_size)
size <<= 1;
LASSERTF(cfs_list_empty(&pool->prp_req_list) ||
cfs_spin_lock_init(&pool->prp_lock);
CFS_INIT_LIST_HEAD(&pool->prp_req_list);
- pool->prp_rq_size = msgsize;
+ pool->prp_rq_size = msgsize + SPTLRPC_MAX_PAYLOAD;
pool->prp_populate = populate_pool;
populate_pool(pool, num_rq);
CFS_INIT_LIST_HEAD(&request->rq_history_list);
CFS_INIT_LIST_HEAD(&request->rq_exp_list);
cfs_waitq_init(&request->rq_reply_waitq);
+ cfs_waitq_init(&request->rq_set_waitq);
request->rq_xid = ptlrpc_next_xid();
cfs_atomic_set(&request->rq_refcount, 1);
CFS_INIT_LIST_HEAD(&request->rq_history_list);
CFS_INIT_LIST_HEAD(&request->rq_exp_list);
cfs_waitq_init(&request->rq_reply_waitq);
+ cfs_waitq_init(&request->rq_set_waitq);
request->rq_xid = ptlrpc_next_xid();
cfs_atomic_set(&request->rq_refcount, 1);
struct ptlrpc_request_set *set = req->rq_set;
if (set)
- set->set_remaining --;
+ cfs_atomic_dec(&set->set_remaining);
}
ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
RETURN(NULL);
CFS_INIT_LIST_HEAD(&set->set_requests);
cfs_waitq_init(&set->set_waitq);
- set->set_remaining = 0;
+ cfs_atomic_set(&set->set_remaining, 0);
cfs_spin_lock_init(&set->set_new_req_lock);
CFS_INIT_LIST_HEAD(&set->set_new_requests);
CFS_INIT_LIST_HEAD(&set->set_cblist);
ENTRY;
/* Requests on the set should either all be completed, or all be new */
- expected_phase = (set->set_remaining == 0) ?
+ expected_phase = (cfs_atomic_read(&set->set_remaining) == 0) ?
RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
cfs_list_for_each (tmp, &set->set_requests) {
struct ptlrpc_request *req =
n++;
}
- LASSERTF(set->set_remaining == 0 || set->set_remaining == n, "%d / %d\n",
- set->set_remaining, n);
+ LASSERTF(cfs_atomic_read(&set->set_remaining) == 0 ||
+ cfs_atomic_read(&set->set_remaining) == n, "%d / %d\n",
+ cfs_atomic_read(&set->set_remaining), n);
cfs_list_for_each_safe(tmp, next, &set->set_requests) {
struct ptlrpc_request *req =
if (req->rq_phase == RQ_PHASE_NEW) {
ptlrpc_req_interpret(NULL, req, -EBADR);
- set->set_remaining--;
+ cfs_atomic_dec(&set->set_remaining);
}
req->rq_set = NULL;
ptlrpc_req_finished (req);
}
- LASSERT(set->set_remaining == 0);
+ LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
OBD_FREE(set, sizeof(*set));
EXIT;
/* The set takes over the caller's request reference */
cfs_list_add_tail(&req->rq_set_chain, &set->set_requests);
req->rq_set = set;
- set->set_remaining++;
+ cfs_atomic_inc(&set->set_remaining);
+ req->rq_queued_time = cfs_time_current(); /* Where is the best place to set this? */
}
/**
} else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
*status = -EIO;
+ } else if (imp->imp_obd->obd_no_recov) {
+ *status = -ESHUTDOWN;
+ } else if (ptlrpc_send_limit_expired(req)) {
+ /* probably doesn't need to be a D_ERROR after initial testing */
+ DEBUG_REQ(D_ERROR, req, "send limit expired ");
+ *status = -EIO;
} else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
imp->imp_state == LUSTRE_IMP_CONNECTING) {
/* allow CONNECT even if import is invalid */ ;
DEBUG_REQ(D_ERROR, req, "invalidate in flight");
*status = -EIO;
}
- } else if ((imp->imp_invalid && (!imp->imp_recon_bk)) ||
- imp->imp_obd->obd_no_recov) {
- /* If the import has been invalidated (such as by an OST
- * failure), and if the import(MGC) tried all of its connection
- * list (Bug 13464), the request must fail with -ESHUTDOWN.
- * This indicates the requests should be discarded; an -EIO
- * may result in a resend of the request. */
+ } else if (imp->imp_invalid) {
if (!imp->imp_deactive)
DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
*status = -ESHUTDOWN; /* bz 12940 */
LASSERT(!req->rq_receiving_reply && !req->rq_must_unlink);
if (req->rq_reply_truncate) {
- if (req->rq_no_resend) {
+ if (ptlrpc_no_resend(req)) {
DEBUG_REQ(D_ERROR, req, "reply buffer overflow,"
" expected: %d, actual size: %d",
req->rq_nob_received, req->rq_repbuf_len);
int force_timer_recalc = 0;
ENTRY;
- if (set->set_remaining == 0)
+ if (cfs_atomic_read(&set->set_remaining) == 0)
RETURN(1);
cfs_list_for_each(tmp, &set->set_requests) {
}
if (req->rq_err) {
+ cfs_spin_lock(&req->rq_lock);
req->rq_replied = 0;
+ cfs_spin_unlock(&req->rq_lock);
if (req->rq_status == 0)
req->rq_status = -EIO;
ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
cfs_spin_unlock(&imp->imp_lock);
GOTO(interpret, req->rq_status);
}
- if (req->rq_no_resend && !req->rq_wait_ctx) {
+ if (ptlrpc_no_resend(req) && !req->rq_wait_ctx) {
req->rq_status = -ENOTCONN;
ptlrpc_rqphase_move(req,
RQ_PHASE_INTERPRET);
cfs_spin_unlock(&imp->imp_lock);
+ cfs_spin_lock(&req->rq_lock);
req->rq_waiting = 0;
+ cfs_spin_unlock(&req->rq_lock);
- if (req->rq_timedout||req->rq_resend) {
+ if (req->rq_timedout || req->rq_resend) {
/* This is re-sending anyways,
* let's mark req as resend. */
+ cfs_spin_lock(&req->rq_lock);
req->rq_resend = 1;
+ cfs_spin_unlock(&req->rq_lock);
if (req->rq_bulk) {
__u64 old_xid;
if (status) {
if (req->rq_err) {
req->rq_status = status;
+ cfs_spin_lock(&req->rq_lock);
req->rq_wait_ctx = 0;
+ cfs_spin_unlock(&req->rq_lock);
force_timer_recalc = 1;
} else {
+ cfs_spin_lock(&req->rq_lock);
req->rq_wait_ctx = 1;
+ cfs_spin_unlock(&req->rq_lock);
}
continue;
} else {
+ cfs_spin_lock(&req->rq_lock);
req->rq_wait_ctx = 0;
+ cfs_spin_unlock(&req->rq_lock);
}
rc = ptl_send_rpc(req, 0);
DEBUG_REQ(D_HA, req, "send failed (%d)",
rc);
force_timer_recalc = 1;
+ cfs_spin_lock(&req->rq_lock);
req->rq_net_err = 1;
+ cfs_spin_unlock(&req->rq_lock);
}
/* need to reset the timeout */
force_timer_recalc = 1;
}
cfs_spin_unlock(&imp->imp_lock);
- set->set_remaining--;
+ cfs_atomic_dec(&set->set_remaining);
cfs_waitq_broadcast(&imp->imp_recovery_waitq);
}
/* If we hit an error, we want to recover promptly. */
- RETURN(set->set_remaining == 0 || force_timer_recalc);
+ RETURN(cfs_atomic_read(&set->set_remaining) == 0 || force_timer_recalc);
}
/* Return 1 if we should give up, else 0 */
/* if a request can't be resent we can't wait for an answer after
the timeout */
- if (req->rq_no_resend) {
+ if (ptlrpc_no_resend(req)) {
DEBUG_REQ(D_RPCTRACE, req, "TIMEOUT-NORESEND:");
rc = 1;
}
* EINTR.
* I don't really care if we go once more round the loop in
* the error cases -eeb. */
- } while (rc != 0 || set->set_remaining != 0);
+ if (rc == 0 && cfs_atomic_read(&set->set_remaining) == 0) {
+ cfs_list_for_each(tmp, &set->set_requests) {
+ req = cfs_list_entry(tmp, struct ptlrpc_request,
+ rq_set_chain);
+ cfs_spin_lock(&req->rq_lock);
+ req->rq_invalid_rqset = 1;
+ cfs_spin_unlock(&req->rq_lock);
+ }
+ }
+ } while (rc != 0 || cfs_atomic_read(&set->set_remaining) != 0);
- LASSERT(set->set_remaining == 0);
+ LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
rc = 0;
cfs_list_for_each(tmp, &set->set_requests) {