#include <linux/lprocfs_status.h>
#define LIOD_STOP 0
-static struct ptlrpcd_ctl {
+struct ptlrpcd_ctl {
unsigned long pc_flags;
spinlock_t pc_lock;
struct completion pc_starting;
struct list_head pc_req_list;
wait_queue_head_t pc_waitq;
struct ptlrpc_request_set *pc_set;
-} ptlrpcd_pc;
+#ifndef __KERNEL__
+ int pc_recurred;
+ void *pc_callback;
+#endif
+};
+
+static struct ptlrpcd_ctl ptlrpcd_pc;
+static struct ptlrpcd_ctl ptlrpcd_recovery_pc;
static DECLARE_MUTEX(ptlrpcd_sem);
static int ptlrpcd_users = 0;
-void ptlrpcd_wake(void)
+void ptlrpcd_wake(struct ptlrpc_request *req)
{
- struct ptlrpcd_ctl *pc = &ptlrpcd_pc;
+ struct ptlrpcd_ctl *pc = req->rq_ptlrpcd_data;
+
+ LASSERT(pc != NULL);
+
wake_up(&pc->pc_waitq);
}
void ptlrpcd_add_req(struct ptlrpc_request *req)
{
- struct ptlrpcd_ctl *pc = &ptlrpcd_pc;
+ struct ptlrpcd_ctl *pc;
+
+ if (req->rq_send_state == LUSTRE_IMP_FULL)
+ pc = &ptlrpcd_pc;
+ else
+ pc = &ptlrpcd_recovery_pc;
ptlrpc_set_add_new_req(pc->pc_set, req);
- ptlrpcd_wake();
+ req->rq_ptlrpcd_data = pc;
+
+ ptlrpcd_wake(req);
}
static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
if (test_bit(LIOD_STOP, &pc->pc_flags))
break;
}
- /* XXX should be making sure we don't have anything in flight */
+ /* wait for inflight requests to drain */
+ if (!list_empty(&pc->pc_set->set_requests))
+ ptlrpc_set_wait(pc->pc_set);
complete(&pc->pc_finishing);
return 0;
}
#else
-static int ptlrpcd_recurred = 0;
-static void *ptlrpcd_callback;
int ptlrpcd_check_async_rpcs(void *arg)
{
int rc = 0;
/* single threaded!! */
- ptlrpcd_recurred++;
+ pc->pc_recurred++;
- if (ptlrpcd_recurred == 1)
+ if (pc->pc_recurred == 1)
rc = ptlrpcd_check(pc);
- ptlrpcd_recurred--;
+ pc->pc_recurred--;
return rc;
}
#endif
-int ptlrpcd_addref(void)
+static int ptlrpcd_start(struct ptlrpcd_ctl *pc)
{
- struct ptlrpcd_ctl *pc = &ptlrpcd_pc;
int rc = 0;
- ENTRY;
-
- down(&ptlrpcd_sem);
- if (++ptlrpcd_users != 1)
- GOTO(out, rc);
memset(pc, 0, sizeof(*pc));
init_completion(&pc->pc_starting);
wait_for_completion(&pc->pc_starting);
#else
- ptlrpcd_callback =
+ pc->pc_callback =
liblustre_register_wait_callback(&ptlrpcd_check_async_rpcs, pc);
#endif
out:
+ RETURN(rc);
+}
+
+static void ptlrpcd_stop(struct ptlrpcd_ctl *pc)
+{
+ set_bit(LIOD_STOP, &pc->pc_flags);
+ wake_up(&pc->pc_waitq);
+#ifdef __KERNEL__
+ wait_for_completion(&pc->pc_finishing);
+#else
+ liblustre_deregister_wait_callback(pc->pc_callback);
+#endif
+ ptlrpc_set_destroy(pc->pc_set);
+}
+
+int ptlrpcd_addref(void)
+{
+ int rc = 0;
+ ENTRY;
+
+ down(&ptlrpcd_sem);
+ if (++ptlrpcd_users != 1)
+ GOTO(out, rc);
+
+ rc = ptlrpcd_start(&ptlrpcd_pc);
+ if (rc) {
+ --ptlrpcd_users;
+ GOTO(out, rc);
+ }
+
+ rc = ptlrpcd_start(&ptlrpcd_recovery_pc);
+ if (rc) {
+ ptlrpcd_stop(&ptlrpcd_pc);
+ --ptlrpcd_users;
+ GOTO(out, rc);
+ }
+out:
up(&ptlrpcd_sem);
RETURN(rc);
}
void ptlrpcd_decref(void)
{
- struct ptlrpcd_ctl *pc = &ptlrpcd_pc;
-
down(&ptlrpcd_sem);
if (--ptlrpcd_users == 0) {
- set_bit(LIOD_STOP, &pc->pc_flags);
- wake_up(&pc->pc_waitq);
-#ifdef __KERNEL__
- wait_for_completion(&pc->pc_finishing);
-#else
- liblustre_deregister_wait_callback(ptlrpcd_callback);
-#endif
- ptlrpc_set_destroy(pc->pc_set);
+ ptlrpcd_stop(&ptlrpcd_pc);
+ ptlrpcd_stop(&ptlrpcd_recovery_pc);
}
up(&ptlrpcd_sem);
}