typedef int (*set_interpreter_func)(struct ptlrpc_request_set *, void *, int);
struct ptlrpc_request_set {
- int set_remaining; /* # uncompleted requests */
+ cfs_atomic_t set_remaining; /* # uncompleted requests */
cfs_waitq_t set_waitq;
cfs_waitq_t *set_wakeup_ptr;
cfs_list_t set_requests;
rq_hp:1, /* high priority RPC */
rq_at_linked:1, /* link into service's srv_at_array */
rq_reply_truncate:1,
- rq_committed:1;
+ rq_committed:1,
+ /* set when "rq_set" is no longer valid and must not be reused */
+ rq_invalid_rqset:1;
enum rq_phase rq_phase; /* one of RQ_PHASE_* */
enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */
int rq_timeout; /* service time estimate (secs) */
/* Multi-rpc bits */
- cfs_list_t rq_set_chain;
+ cfs_list_t rq_set_chain;
+ cfs_waitq_t rq_set_waitq;
struct ptlrpc_request_set *rq_set;
/** Async completion handler */
ptlrpc_interpterer_t rq_interpret_reply;
CFS_INIT_LIST_HEAD(&request->rq_history_list);
CFS_INIT_LIST_HEAD(&request->rq_exp_list);
cfs_waitq_init(&request->rq_reply_waitq);
+ cfs_waitq_init(&request->rq_set_waitq);
request->rq_xid = ptlrpc_next_xid();
cfs_atomic_set(&request->rq_refcount, 1);
CFS_INIT_LIST_HEAD(&request->rq_history_list);
CFS_INIT_LIST_HEAD(&request->rq_exp_list);
cfs_waitq_init(&request->rq_reply_waitq);
+ cfs_waitq_init(&request->rq_set_waitq);
request->rq_xid = ptlrpc_next_xid();
cfs_atomic_set(&request->rq_refcount, 1);
struct ptlrpc_request_set *set = req->rq_set;
if (set)
- set->set_remaining --;
+ cfs_atomic_dec(&set->set_remaining);
}
ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
RETURN(NULL);
CFS_INIT_LIST_HEAD(&set->set_requests);
cfs_waitq_init(&set->set_waitq);
- set->set_remaining = 0;
+ cfs_atomic_set(&set->set_remaining, 0);
cfs_spin_lock_init(&set->set_new_req_lock);
CFS_INIT_LIST_HEAD(&set->set_new_requests);
CFS_INIT_LIST_HEAD(&set->set_cblist);
ENTRY;
/* Requests on the set should either all be completed, or all be new */
- expected_phase = (set->set_remaining == 0) ?
+ expected_phase = (cfs_atomic_read(&set->set_remaining) == 0) ?
RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
cfs_list_for_each (tmp, &set->set_requests) {
struct ptlrpc_request *req =
n++;
}
- LASSERTF(set->set_remaining == 0 || set->set_remaining == n, "%d / %d\n",
- set->set_remaining, n);
+ LASSERTF(cfs_atomic_read(&set->set_remaining) == 0 ||
+ cfs_atomic_read(&set->set_remaining) == n, "%d / %d\n",
+ cfs_atomic_read(&set->set_remaining), n);
cfs_list_for_each_safe(tmp, next, &set->set_requests) {
struct ptlrpc_request *req =
if (req->rq_phase == RQ_PHASE_NEW) {
ptlrpc_req_interpret(NULL, req, -EBADR);
- set->set_remaining--;
+ cfs_atomic_dec(&set->set_remaining);
}
req->rq_set = NULL;
ptlrpc_req_finished (req);
}
- LASSERT(set->set_remaining == 0);
+ LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
OBD_FREE(set, sizeof(*set));
EXIT;
/* The set takes over the caller's request reference */
cfs_list_add_tail(&req->rq_set_chain, &set->set_requests);
req->rq_set = set;
- set->set_remaining++;
+ cfs_atomic_inc(&set->set_remaining);
}
/**
int force_timer_recalc = 0;
ENTRY;
- if (set->set_remaining == 0)
+ if (cfs_atomic_read(&set->set_remaining) == 0)
RETURN(1);
cfs_list_for_each(tmp, &set->set_requests) {
}
cfs_spin_unlock(&imp->imp_lock);
- set->set_remaining--;
+ cfs_atomic_dec(&set->set_remaining);
cfs_waitq_broadcast(&imp->imp_recovery_waitq);
}
/* If we hit an error, we want to recover promptly. */
- RETURN(set->set_remaining == 0 || force_timer_recalc);
+ RETURN(cfs_atomic_read(&set->set_remaining) == 0 || force_timer_recalc);
}
/* Return 1 if we should give up, else 0 */
* EINTR.
* I don't really care if we go once more round the loop in
* the error cases -eeb. */
- } while (rc != 0 || set->set_remaining != 0);
+ if (rc == 0 && cfs_atomic_read(&set->set_remaining) == 0) {
+ cfs_list_for_each(tmp, &set->set_requests) {
+ req = cfs_list_entry(tmp, struct ptlrpc_request,
+ rq_set_chain);
+ cfs_spin_lock(&req->rq_lock);
+ req->rq_invalid_rqset = 1;
+ cfs_spin_unlock(&req->rq_lock);
+ }
+ }
+ } while (rc != 0 || cfs_atomic_read(&set->set_remaining) != 0);
- LASSERT(set->set_remaining == 0);
+ LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
rc = 0;
cfs_list_for_each(tmp, &set->set_requests) {
cfs_list_del_init(&req->rq_set_chain);
req->rq_set = NULL;
ptlrpcd_add_req(req, PSCOPE_OTHER);
- set->set_remaining--;
+ cfs_atomic_dec(&set->set_remaining);
}
- LASSERT(set->set_remaining == 0);
+ LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
}
EXPORT_SYMBOL(ptlrpcd_add_rqset);
int rc;
LASSERT(scope < PSCOPE_NR);
+
+ cfs_spin_lock(&req->rq_lock);
+ if (req->rq_invalid_rqset) {
+ cfs_duration_t timeout;
+ struct l_wait_info lwi;
+
+ req->rq_invalid_rqset = 0;
+ cfs_spin_unlock(&req->rq_lock);
+
+ timeout = cfs_time_seconds(5);
+ lwi = LWI_TIMEOUT(timeout, back_to_sleep, NULL);
+ l_wait_event(req->rq_reply_waitq, (req->rq_set == NULL), &lwi);
+ } else if (req->rq_set) {
+ LASSERT(req->rq_phase == RQ_PHASE_NEW);
+ LASSERT(req->rq_send_state == LUSTRE_IMP_REPLAY);
+
+ /* ptlrpc_check_set will decrease the count */
+ cfs_atomic_inc(&req->rq_set->set_remaining);
+ cfs_spin_unlock(&req->rq_lock);
+
+ cfs_waitq_signal(&req->rq_set->set_waitq);
+ } else {
+ cfs_spin_unlock(&req->rq_lock);
+ }
+
pt = req->rq_send_state == LUSTRE_IMP_FULL ? PT_NORMAL : PT_RECOVERY;
pc = &ptlrpcd_scopes[scope].pscope_thread[pt].pt_ctl;
rc = ptlrpc_set_add_new_req(pc, req);
}
cfs_spin_unlock(&pc->pc_set->set_new_req_lock);
- if (pc->pc_set->set_remaining) {
+ if (cfs_atomic_read(&pc->pc_set->set_remaining)) {
rc = rc | ptlrpc_check_set(env, pc->pc_set);
/*
struct ptlrpcd_ctl *pc = arg;
return (cfs_list_empty(&pc->pc_set->set_new_requests) &&
- pc->pc_set->set_remaining == 0);
+ cfs_atomic_read(&pc->pc_set->set_remaining) == 0);
}
#endif