}
/*
+ * Move all requests from an existing request set to the ptlrpcd queue.
+ * All requests in the set must be in phase RQ_PHASE_NEW.
+ */
+void ptlrpcd_add_rqset(struct ptlrpc_request_set *set)
+{
+ struct list_head *tmp, *pos;
+
+ list_for_each_safe(pos, tmp, &set->set_requests) {
+ struct ptlrpc_request *req =
+ list_entry(pos, struct ptlrpc_request, rq_set_chain);
+
+ LASSERT(req->rq_phase == RQ_PHASE_NEW);
+ list_del_init(&req->rq_set_chain);
+ req->rq_set = NULL;
+ ptlrpcd_add_req(req, PSCOPE_OTHER);
+ set->set_remaining--;
+ }
+ LASSERT(set->set_remaining == 0);
+}
+EXPORT_SYMBOL(ptlrpcd_add_rqset);
+
+/*
* Requests that are added to the ptlrpcd queue are sent via
* ptlrpcd_check->ptlrpc_check_set().
*/
-void ptlrpcd_add_req(struct ptlrpc_request *req, enum ptlrpcd_scope scope)
+int ptlrpcd_add_req(struct ptlrpc_request *req, enum ptlrpcd_scope scope)
{
struct ptlrpcd_ctl *pc;
enum pscope_thread pt;
 * so that higher levels might free associated
* resources.
*/
- req->rq_status = -EBADR;
- interpreter(NULL, req, &req->rq_async_args,
- req->rq_status);
+ ptlrpc_req_interpret(NULL, req, -EBADR);
req->rq_set = NULL;
ptlrpc_req_finished(req);
}
+
+ return rc;
}
static int ptlrpcd_check(const struct lu_env *env, struct ptlrpcd_ctl *pc)
int rc = 0;
ENTRY;
- if (test_bit(LIOD_STOP, &pc->pc_flags))
- RETURN(1);
-
spin_lock(&pc->pc_set->set_new_req_lock);
list_for_each_safe(pos, tmp, &pc->pc_set->set_new_requests) {
req = list_entry(pos, struct ptlrpc_request, rq_set_chain);
{
struct ptlrpcd_ctl *pc = arg;
struct lu_env env = { .le_ses = NULL };
- int rc;
+ int rc, exit = 0;
ENTRY;
rc = cfs_daemonize_ctxt(pc->pc_name);
if (rc != 0)
RETURN(rc);
env.le_ctx.lc_cookie = 0x7;
+
/*
* This mainloop strongly resembles ptlrpc_set_wait() except that our
* set never completes. ptlrpcd_check() calls ptlrpc_check_set() when
* there are requests in the set. New requests come in on the set's
* new_req_list and ptlrpcd_check() moves them into the set.
*/
- while (1) {
+ do {
struct l_wait_info lwi;
int timeout;
/*
* Abort inflight rpcs for forced stop case.
*/
- if (test_bit(LIOD_STOP_FORCE, &pc->pc_flags))
- ptlrpc_abort_set(pc->pc_set);
+ if (test_bit(LIOD_STOP, &pc->pc_flags)) {
+ if (test_bit(LIOD_FORCE, &pc->pc_flags))
+ ptlrpc_abort_set(pc->pc_set);
+ exit++;
+ }
- if (test_bit(LIOD_STOP, &pc->pc_flags))
- break;
- }
+ /*
+ * Let's make one more loop to make sure that ptlrpcd_check()
+ * copied all raced new rpcs into the set so we can kill them.
+ */
+ } while (exit < 2);
/*
* Wait for inflight requests to drain.
clear_bit(LIOD_START, &pc->pc_flags);
clear_bit(LIOD_STOP, &pc->pc_flags);
+ clear_bit(LIOD_FORCE, &pc->pc_flags);
return 0;
}
pc->pc_recurred++;
if (pc->pc_recurred == 1) {
- lu_context_enter(&pc->pc_env.le_ctx);
- rc = ptlrpcd_check(&pc->pc_env, pc);
- lu_context_exit(&pc->pc_env.le_ctx);
- if (!rc)
- ptlrpc_expired_set(pc->pc_set);
- /*
- * XXX: send replay requests.
- */
- if (test_bit(LIOD_RECOVERY, &pc->pc_flags))
+ rc = lu_env_refill(&pc->pc_env);
+ if (rc == 0) {
+ lu_context_enter(&pc->pc_env.le_ctx);
rc = ptlrpcd_check(&pc->pc_env, pc);
+ lu_context_exit(&pc->pc_env.le_ctx);
+ if (!rc)
+ ptlrpc_expired_set(pc->pc_set);
+ /*
+ * XXX: send replay requests.
+ */
+ if (test_bit(LIOD_RECOVERY, &pc->pc_flags))
+ rc = ptlrpcd_check(&pc->pc_env, pc);
+ }
}
pc->pc_recurred--;
set_bit(LIOD_STOP, &pc->pc_flags);
if (force)
- set_bit(LIOD_STOP_FORCE, &pc->pc_flags);
+ set_bit(LIOD_FORCE, &pc->pc_flags);
cfs_waitq_signal(&pc->pc_set->set_waitq);
#ifdef __KERNEL__
wait_for_completion(&pc->pc_finishing);