typedef int (*set_interpreter_func)(struct ptlrpc_request_set *, void *, int);
struct ptlrpc_request_set {
- int set_remaining; /* # uncompleted requests */
+ atomic_t set_remaining; /* # uncompleted requests */
cfs_waitq_t set_waitq;
cfs_waitq_t *set_wakeup_ptr;
struct list_head set_requests;
rq_reply_truncate:1, /* reply is truncated */
rq_fake:1, /* fake request - just for timeout only */
/* the request is queued to replay during recovery */
- rq_copy_queued:1;
+ rq_copy_queued:1,
+ /* whether the "rq_set" is a valid one */
+ rq_invalid_rqset:1;
enum rq_phase rq_phase; /* one of RQ_PHASE_* */
enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */
atomic_t rq_refcount; /* client-side refcount for SENT race,
                         server-side refcount for multiple replies */
/* Multi-rpc bits */
struct list_head rq_set_chain;
+ cfs_waitq_t rq_set_waitq; /* signaled when "rq_set" is cleared */
struct ptlrpc_request_set *rq_set;
int (*rq_interpret_reply)(struct ptlrpc_request *req, void *data,
int rc); /* async interpret handler */
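
The hunks above convert set_remaining from a plain int into an atomic_t, so the count of uncompleted requests can be updated from several contexts without relying on the set owner's serialization. A minimal standalone sketch of the same pattern using C11 stdatomic and pthreads (an illustration only, not the kernel atomic_t API or Lustre code) shows why the plain remaining-- is unsafe once multiple threads complete requests concurrently:

#include <stdatomic.h>
#include <pthread.h>
#include <stdio.h>

/* Stand-in for set_remaining: one counter, several completers. */
static atomic_int remaining;

static void *completer(void *arg)
{
        /* Each thread "completes" 1000 requests. With a plain int and
         * remaining-- this would race; atomic_fetch_sub is safe. */
        for (int i = 0; i < 1000; i++)
                atomic_fetch_sub(&remaining, 1);
        return NULL;
}

int main(void)
{
        pthread_t t[4];

        atomic_store(&remaining, 4000);
        for (int i = 0; i < 4; i++)
                pthread_create(&t[i], NULL, completer, NULL);
        for (int i = 0; i < 4; i++)
                pthread_join(t[i], NULL);

        /* Prints 0; the non-atomic version typically does not. */
        printf("remaining = %d\n", atomic_load(&remaining));
        return 0;
}

Compile with -pthread; the atomic_set/atomic_inc/atomic_dec/atomic_read calls introduced throughout the patch are the in-kernel equivalent of the operations shown here.
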
CFS_INIT_LIST_HEAD(&request->rq_history_list);
CFS_INIT_LIST_HEAD(&request->rq_exp_list);
cfs_waitq_init(&request->rq_reply_waitq);
+ cfs_waitq_init(&request->rq_set_waitq);
request->rq_xid = ptlrpc_next_xid();
atomic_set(&request->rq_refcount, 1);
CFS_INIT_LIST_HEAD(&request->rq_history_list);
CFS_INIT_LIST_HEAD(&request->rq_exp_list);
cfs_waitq_init(&request->rq_reply_waitq);
+ cfs_waitq_init(&request->rq_set_waitq);
request->rq_xid = ptlrpc_next_xid();
atomic_set(&request->rq_refcount, 1);
struct ptlrpc_request_set *set = req->rq_set;
if (set)
- set->set_remaining --;
+ atomic_dec(&set->set_remaining);
}
ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
RETURN(NULL);
CFS_INIT_LIST_HEAD(&set->set_requests);
cfs_waitq_init(&set->set_waitq);
- set->set_remaining = 0;
+ atomic_set(&set->set_remaining, 0);
spin_lock_init(&set->set_new_req_lock);
CFS_INIT_LIST_HEAD(&set->set_new_requests);
CFS_INIT_LIST_HEAD(&set->set_cblist);
ENTRY;
/* Requests on the set should either all be completed, or all be new */
- expected_phase = (set->set_remaining == 0) ?
+ expected_phase = (atomic_read(&set->set_remaining) == 0) ?
RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
list_for_each (tmp, &set->set_requests) {
struct ptlrpc_request *req =
n++;
}
- LASSERTF(set->set_remaining == 0 || set->set_remaining == n, "%d / %d\n",
- set->set_remaining, n);
+ LASSERTF(atomic_read(&set->set_remaining) == 0 ||
+ atomic_read(&set->set_remaining) == n, "%d / %d\n",
+ atomic_read(&set->set_remaining), n);
list_for_each_safe(tmp, next, &set->set_requests) {
struct ptlrpc_request *req =
interpreter(req, &req->rq_async_args,
req->rq_status);
}
- set->set_remaining--;
+ atomic_dec(&set->set_remaining);
}
+ spin_lock(&req->rq_lock);
req->rq_set = NULL;
+ req->rq_invalid_rqset = 0;
+ spin_unlock(&req->rq_lock);
+
+ cfs_waitq_signal(&req->rq_set_waitq);
ptlrpc_req_finished (req);
}
- LASSERT(set->set_remaining == 0);
+ LASSERT(atomic_read(&set->set_remaining) == 0);
OBD_FREE(set, sizeof(*set));
EXIT;
struct ptlrpc_request *req)
{
/* The set takes over the caller's request reference */
+ LASSERT(list_empty(&req->rq_set_chain));
list_add_tail(&req->rq_set_chain, &set->set_requests);
req->rq_set = set;
- set->set_remaining++;
+ atomic_inc(&set->set_remaining);
}
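
Because ptlrpc_set_add_req() takes over the caller's reference, the usual client pattern is to hand requests to the set and let ptlrpc_set_destroy() drop them; the caller does not call ptlrpc_req_finished() on requests it has added. A rough usage sketch against the set API touched by this patch, assuming the 1.8-era signatures and a hypothetical prep_my_request() helper that returns a ready-to-send request:

#include <lustre_net.h>

/* Hypothetical caller built on the request-set API in this patch.
 * prep_my_request() is a placeholder for whatever prepares an RPC. */
static int send_batch(int count)
{
        struct ptlrpc_request_set *set;
        struct ptlrpc_request *req;
        int i, rc;

        set = ptlrpc_prep_set();
        if (set == NULL)
                return -ENOMEM;

        for (i = 0; i < count; i++) {
                req = prep_my_request(i);       /* placeholder */
                if (req == NULL)
                        break;
                /* The set now owns the caller's reference on req. */
                ptlrpc_set_add_req(set, req);
        }

        /* Sends everything and waits until set_remaining drops to 0. */
        rc = ptlrpc_set_wait(set);

        /* Drops the set's references; do not ptlrpc_req_finished() here. */
        ptlrpc_set_destroy(set);
        return rc;
}
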
/**
/*
* The set takes over the caller's request reference.
*/
+ LASSERT(list_empty(&req->rq_set_chain));
list_add_tail(&req->rq_set_chain, &set->set_new_requests);
req->rq_set = set;
spin_unlock(&set->set_new_req_lock);
int force_timer_recalc = 0;
ENTRY;
- if (set->set_remaining == 0)
+ if (atomic_read(&set->set_remaining) == 0)
RETURN(1);
list_for_each(tmp, &set->set_requests) {
}
spin_unlock(&imp->imp_lock);
- set->set_remaining--;
+ atomic_dec(&set->set_remaining);
cfs_waitq_broadcast(&imp->imp_recovery_waitq);
}
/* If we hit an error, we want to recover promptly. */
- RETURN(set->set_remaining == 0 || force_timer_recalc);
+ RETURN(atomic_read(&set->set_remaining) == 0 || force_timer_recalc);
}
/* Return 1 if we should give up, else 0 */
* EINTR.
* I don't really care if we go once more round the loop in
* the error cases -eeb. */
- } while (rc != 0 || set->set_remaining != 0);
+ if (rc == 0 && atomic_read(&set->set_remaining) == 0) {
+ list_for_each(tmp, &set->set_requests) {
+ req = list_entry(tmp, struct ptlrpc_request,
+ rq_set_chain);
+ spin_lock(&req->rq_lock);
+ req->rq_invalid_rqset = 1;
+ spin_unlock(&req->rq_lock);
+ }
+ }
+ } while (rc != 0 || atomic_read(&set->set_remaining) != 0);
- LASSERT(set->set_remaining == 0);
+ LASSERT(atomic_read(&set->set_remaining) == 0);
rc = 0;
list_for_each(tmp, &set->set_requests) {
list_del_init(&req->rq_set_chain);
req->rq_set = NULL;
ptlrpcd_add_req(req);
- set->set_remaining--;
+ atomic_dec(&set->set_remaining);
}
- LASSERT(set->set_remaining == 0);
+ LASSERT(atomic_read(&set->set_remaining) == 0);
}
EXPORT_SYMBOL(ptlrpcd_add_rqset);
struct ptlrpcd_ctl *pc;
int rc;
+ spin_lock(&req->rq_lock);
+ if (req->rq_invalid_rqset) {
+ cfs_duration_t timeout;
+ struct l_wait_info lwi;
+
+ req->rq_invalid_rqset = 0;
+ spin_unlock(&req->rq_lock);
+
+ timeout = cfs_time_seconds(5);
+ lwi = LWI_TIMEOUT(timeout, back_to_sleep, NULL);
+ l_wait_event(req->rq_set_waitq, (req->rq_set == NULL), &lwi);
+ } else if (req->rq_set) {
+ LASSERT(req->rq_phase == RQ_PHASE_NEW);
+ LASSERT(req->rq_send_state == LUSTRE_IMP_REPLAY);
+
+ /* ptlrpc_check_set will decrease the count */
+ atomic_inc(&req->rq_set->set_remaining);
+ spin_unlock(&req->rq_lock);
+
+ cfs_waitq_signal(&req->rq_set->set_waitq);
+ return 0;
+ } else {
+ spin_unlock(&req->rq_lock);
+ }
+
if (req->rq_send_state == LUSTRE_IMP_FULL)
pc = &ptlrpcd_pc;
else
}
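
The new ptlrpcd_add_req() logic above covers three cases: the request was left on a finished set and flagged rq_invalid_rqset by ptlrpc_set_wait(), so we wait (bounded to about 5 seconds by LWI_TIMEOUT) until ptlrpc_set_destroy() detaches it and signals rq_set_waitq; the request already sits on a valid set during replay, so we only bump set_remaining and wake that set; otherwise it is queued to a ptlrpcd set as before. The following standalone pthread model of the first handshake is an illustration only, not Lustre code; all names are invented stand-ins, and the 5-second timeout of the real code is omitted:

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

/* Toy request: just the fields involved in the handshake. */
struct toy_req {
        pthread_mutex_t lock;        /* stands in for rq_lock */
        pthread_cond_t  set_waitq;   /* stands in for rq_set_waitq */
        void           *set;         /* stands in for rq_set */
        int             invalid_set; /* stands in for rq_invalid_rqset */
};

/* Plays the role of ptlrpc_set_destroy(): detach the request, then wake. */
static void *destroy_side(void *arg)
{
        struct toy_req *req = arg;

        sleep(1);                       /* pretend the set outlives the mark */
        pthread_mutex_lock(&req->lock);
        req->set = NULL;
        req->invalid_set = 0;
        pthread_mutex_unlock(&req->lock);
        pthread_cond_broadcast(&req->set_waitq);
        return NULL;
}

/* Plays the role of ptlrpcd_add_req(): if flagged, wait for the detach. */
static void add_side(struct toy_req *req)
{
        pthread_mutex_lock(&req->lock);
        if (req->invalid_set) {
                req->invalid_set = 0;
                while (req->set != NULL)
                        pthread_cond_wait(&req->set_waitq, &req->lock);
        }
        pthread_mutex_unlock(&req->lock);
        printf("request detached, safe to queue to ptlrpcd\n");
}

int main(void)
{
        static int dummy_set;
        struct toy_req req = {
                .lock = PTHREAD_MUTEX_INITIALIZER,
                .set_waitq = PTHREAD_COND_INITIALIZER,
                .set = &dummy_set,
                .invalid_set = 1,       /* as ptlrpc_set_wait() would mark it */
        };
        pthread_t t;

        pthread_create(&t, NULL, destroy_side, &req);
        add_side(&req);
        pthread_join(t, NULL);
        return 0;
}

The key property mirrored from the patch is that both the flag check and the pointer clearing happen under the same per-request lock, so a wakeup cannot be lost between testing rq_invalid_rqset and going to sleep.
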
spin_unlock(&pc->pc_set->set_new_req_lock);
- if (pc->pc_set->set_remaining) {
+ if (atomic_read(&pc->pc_set->set_remaining)) {
rc = rc | ptlrpc_check_set(pc->pc_set);
/*
struct ptlrpcd_ctl *pc = arg;
return (list_empty(&pc->pc_set->set_new_requests) &&
- pc->pc_set->set_remaining == 0);
+ atomic_read(&pc->pc_set->set_remaining) == 0);
}
#endif