#include <linux/lustre_net.h>
#ifndef __CYGWIN__
-# include <linux/ctype.h>
# include <linux/init.h>
#else
# include <ctype.h>
#include <linux/lprocfs_status.h>
#define LIOD_STOP 0
-static struct ptlrpcd_ctl {
+struct ptlrpcd_ctl {
unsigned long pc_flags;
spinlock_t pc_lock;
struct completion pc_starting;
struct list_head pc_req_list;
wait_queue_head_t pc_waitq;
struct ptlrpc_request_set *pc_set;
-} ptlrpcd_pc;
+ char pc_name[16];
+#ifndef __KERNEL__
+ int pc_recurred;
+ void *pc_callback;
+#endif
+};
+
+static struct ptlrpcd_ctl ptlrpcd_pc;
+static struct ptlrpcd_ctl ptlrpcd_recovery_pc;
static DECLARE_MUTEX(ptlrpcd_sem);
static int ptlrpcd_users = 0;
+void ptlrpcd_wake(struct ptlrpc_request *req)
+{
+ struct ptlrpcd_ctl *pc = req->rq_ptlrpcd_data;
+
+ LASSERT(pc != NULL);
+
+ wake_up(&pc->pc_waitq);
+}
+
void ptlrpcd_add_req(struct ptlrpc_request *req)
{
- struct ptlrpcd_ctl *pc = &ptlrpcd_pc;
+ struct ptlrpcd_ctl *pc;
+
+ if (req->rq_send_state == LUSTRE_IMP_FULL)
+ pc = &ptlrpcd_pc;
+ else
+ pc = &ptlrpcd_recovery_pc;
ptlrpc_set_add_new_req(pc->pc_set, req);
- wake_up(&pc->pc_waitq);
+ req->rq_ptlrpcd_data = pc;
+
+ ptlrpcd_wake(req);
}
static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
unsigned long flags;
ENTRY;
- kportal_daemonize("ptlrpcd");
+ kportal_daemonize(pc->pc_name);
SIGNAL_MASK_LOCK(current, flags);
sigfillset(¤t->blocked);
complete(&pc->pc_starting);
- /* like kswapd */
- current->flags |= PF_MEMALLOC;
-
/* this mainloop strongly resembles ptlrpc_set_wait except
* that our set never completes. ptlrpcd_check calls ptlrpc_check_set
* when there are requests in the set. new requests come in
if (test_bit(LIOD_STOP, &pc->pc_flags))
break;
}
- /* XXX should be making sure we don't have anything in flight */
+ /* wait for inflight requests to drain */
+ if (!list_empty(&pc->pc_set->set_requests))
+ ptlrpc_set_wait(pc->pc_set);
complete(&pc->pc_finishing);
return 0;
}
#else
-static int ptlrpcd_recurred = 0;
-static void *ptlrpcd_callback;
int ptlrpcd_check_async_rpcs(void *arg)
{
int rc = 0;
/* single threaded!! */
- ptlrpcd_recurred++;
+ pc->pc_recurred++;
- if (ptlrpcd_recurred == 1)
+ if (pc->pc_recurred == 1)
rc = ptlrpcd_check(pc);
- ptlrpcd_recurred--;
+ pc->pc_recurred--;
return rc;
}
#endif
-int ptlrpcd_addref(void)
+static int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc)
{
- struct ptlrpcd_ctl *pc = &ptlrpcd_pc;
int rc = 0;
- ENTRY;
-
- down(&ptlrpcd_sem);
- if (++ptlrpcd_users != 1)
- GOTO(out, rc);
memset(pc, 0, sizeof(*pc));
init_completion(&pc->pc_starting);
pc->pc_flags = 0;
spin_lock_init(&pc->pc_lock);
INIT_LIST_HEAD(&pc->pc_req_list);
+ snprintf (pc->pc_name, sizeof (pc->pc_name), name);
pc->pc_set = ptlrpc_prep_set();
if (pc->pc_set == NULL)
wait_for_completion(&pc->pc_starting);
#else
- ptlrpcd_callback =
+ pc->pc_callback =
liblustre_register_wait_callback(&ptlrpcd_check_async_rpcs, pc);
#endif
out:
+ RETURN(rc);
+}
+
+static void ptlrpcd_stop(struct ptlrpcd_ctl *pc)
+{
+ set_bit(LIOD_STOP, &pc->pc_flags);
+ wake_up(&pc->pc_waitq);
+#ifdef __KERNEL__
+ wait_for_completion(&pc->pc_finishing);
+#else
+ liblustre_deregister_wait_callback(pc->pc_callback);
+#endif
+ ptlrpc_set_destroy(pc->pc_set);
+}
+
+int ptlrpcd_addref(void)
+{
+ int rc = 0;
+ ENTRY;
+
+ down(&ptlrpcd_sem);
+ if (++ptlrpcd_users != 1)
+ GOTO(out, rc);
+
+ rc = ptlrpcd_start("ptlrpcd", &ptlrpcd_pc);
+ if (rc) {
+ --ptlrpcd_users;
+ GOTO(out, rc);
+ }
+
+ rc = ptlrpcd_start("ptlrpcd-recov", &ptlrpcd_recovery_pc);
+ if (rc) {
+ ptlrpcd_stop(&ptlrpcd_pc);
+ --ptlrpcd_users;
+ GOTO(out, rc);
+ }
+out:
up(&ptlrpcd_sem);
RETURN(rc);
}
void ptlrpcd_decref(void)
{
- struct ptlrpcd_ctl *pc = &ptlrpcd_pc;
-
down(&ptlrpcd_sem);
if (--ptlrpcd_users == 0) {
- set_bit(LIOD_STOP, &pc->pc_flags);
- wake_up(&pc->pc_waitq);
-#ifdef __KERNEL__
- wait_for_completion(&pc->pc_finishing);
-#else
- liblustre_deregister_wait_callback(ptlrpcd_callback);
-#endif
- ptlrpc_set_destroy(pc->pc_set);
+ ptlrpcd_stop(&ptlrpcd_pc);
+ ptlrpcd_stop(&ptlrpcd_recovery_pc);
}
up(&ptlrpcd_sem);
}