Whamcloud - gitweb
Land from b_hd_pid to HEAD
[fs/lustre-release.git] / lustre / ptlrpc / ptlrpcd.c
index b7b9700..df8a760 100644 (file)
@@ -43,7 +43,6 @@
 #include <linux/lustre_net.h>
 
 #ifndef  __CYGWIN__
-# include <linux/ctype.h>
 # include <linux/init.h>
 #else
 # include <ctype.h>
@@ -54,7 +53,7 @@
 #include <linux/lprocfs_status.h>
 
 #define LIOD_STOP 0
-static struct ptlrpcd_ctl {
+struct ptlrpcd_ctl {
         unsigned long             pc_flags;
         spinlock_t                pc_lock;
         struct completion         pc_starting;
@@ -62,17 +61,41 @@ static struct ptlrpcd_ctl {
         struct list_head          pc_req_list;
         wait_queue_head_t         pc_waitq;
         struct ptlrpc_request_set *pc_set;
-} ptlrpcd_pc;
+        char                      pc_name[16];
+#ifndef __KERNEL__
+        int                       pc_recurred;
+        void                     *pc_callback;
+#endif
+};
+
+static struct ptlrpcd_ctl ptlrpcd_pc;
+static struct ptlrpcd_ctl ptlrpcd_recovery_pc;
 
 static DECLARE_MUTEX(ptlrpcd_sem);
 static int ptlrpcd_users = 0;
 
+void ptlrpcd_wake(struct ptlrpc_request *req)
+{
+        struct ptlrpcd_ctl *pc = req->rq_ptlrpcd_data;
+
+        LASSERT(pc != NULL);
+
+        wake_up(&pc->pc_waitq);
+}
+
 void ptlrpcd_add_req(struct ptlrpc_request *req)
 {
-        struct ptlrpcd_ctl *pc = &ptlrpcd_pc;
+        struct ptlrpcd_ctl *pc;
+
+        if (req->rq_send_state == LUSTRE_IMP_FULL)
+                pc = &ptlrpcd_pc;
+        else 
+                pc = &ptlrpcd_recovery_pc;
 
         ptlrpc_set_add_new_req(pc->pc_set, req);
-        wake_up(&pc->pc_waitq);
+        req->rq_ptlrpcd_data = pc;
+                
+        ptlrpcd_wake(req);
 }
 
 static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
@@ -125,7 +148,7 @@ static int ptlrpcd(void *arg)
         unsigned long flags;
         ENTRY;
 
-        kportal_daemonize("ptlrpcd");
+        kportal_daemonize(pc->pc_name);
 
         SIGNAL_MASK_LOCK(current, flags);
         sigfillset(&current->blocked);
@@ -134,9 +157,6 @@ static int ptlrpcd(void *arg)
 
         complete(&pc->pc_starting);
 
-        /* like kswapd */
-        current->flags |= PF_MEMALLOC;
-
         /* this mainloop strongly resembles ptlrpc_set_wait except
          * that our set never completes.  ptlrpcd_check calls ptlrpc_check_set
          * when there are requests in the set.  new requests come in
@@ -159,13 +179,13 @@ static int ptlrpcd(void *arg)
                 if (test_bit(LIOD_STOP, &pc->pc_flags))
                         break;
         }
-        /* XXX should be making sure we don't have anything in flight */
+        /* wait for inflight requests to drain */
+        if (!list_empty(&pc->pc_set->set_requests))
+                ptlrpc_set_wait(pc->pc_set);
         complete(&pc->pc_finishing);
         return 0;
 }
 #else
-static int ptlrpcd_recurred = 0;
-static void *ptlrpcd_callback;
 
 int ptlrpcd_check_async_rpcs(void *arg)
 {
@@ -173,25 +193,19 @@ int ptlrpcd_check_async_rpcs(void *arg)
         int                  rc = 0;
 
         /* single threaded!! */
-        ptlrpcd_recurred++;
+        pc->pc_recurred++;
 
-        if (ptlrpcd_recurred == 1)
+        if (pc->pc_recurred == 1)
                 rc = ptlrpcd_check(pc);
 
-        ptlrpcd_recurred--;
+        pc->pc_recurred--;
         return rc;
 }
 #endif
 
-int ptlrpcd_addref(void)
+static int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc)
 {
-        struct ptlrpcd_ctl *pc = &ptlrpcd_pc;
         int rc = 0;
-        ENTRY;
-
-        down(&ptlrpcd_sem);
-        if (++ptlrpcd_users != 1)
-                GOTO(out, rc);
 
         memset(pc, 0, sizeof(*pc));
         init_completion(&pc->pc_starting);
@@ -200,6 +214,7 @@ int ptlrpcd_addref(void)
         pc->pc_flags = 0;
         spin_lock_init(&pc->pc_lock);
         INIT_LIST_HEAD(&pc->pc_req_list);
+        snprintf (pc->pc_name, sizeof (pc->pc_name), name);
 
         pc->pc_set = ptlrpc_prep_set();
         if (pc->pc_set == NULL)
@@ -213,28 +228,57 @@ int ptlrpcd_addref(void)
 
         wait_for_completion(&pc->pc_starting);
 #else
-        ptlrpcd_callback =
+        pc->pc_callback =
                 liblustre_register_wait_callback(&ptlrpcd_check_async_rpcs, pc);
 #endif
 out:
+        RETURN(rc);
+}
+
+static void ptlrpcd_stop(struct ptlrpcd_ctl *pc)
+{
+        set_bit(LIOD_STOP, &pc->pc_flags);
+        wake_up(&pc->pc_waitq);
+#ifdef __KERNEL__
+        wait_for_completion(&pc->pc_finishing);
+#else
+        liblustre_deregister_wait_callback(pc->pc_callback);
+#endif
+        ptlrpc_set_destroy(pc->pc_set);
+}
+
+int ptlrpcd_addref(void)
+{
+        int rc = 0;
+        ENTRY;
+
+        down(&ptlrpcd_sem);
+        if (++ptlrpcd_users != 1)
+                GOTO(out, rc);
+
+        rc = ptlrpcd_start("ptlrpcd", &ptlrpcd_pc);
+        if (rc) {
+                --ptlrpcd_users;
+                GOTO(out, rc);
+        }
+
+        rc = ptlrpcd_start("ptlrpcd-recov", &ptlrpcd_recovery_pc);
+        if (rc) {
+                ptlrpcd_stop(&ptlrpcd_pc);
+                --ptlrpcd_users;
+                GOTO(out, rc);
+        }
+out:
         up(&ptlrpcd_sem);
         RETURN(rc);
 }
 
 void ptlrpcd_decref(void)
 {
-        struct ptlrpcd_ctl *pc = &ptlrpcd_pc;
-
         down(&ptlrpcd_sem);
         if (--ptlrpcd_users == 0) {
-                set_bit(LIOD_STOP, &pc->pc_flags);
-                wake_up(&pc->pc_waitq);
-#ifdef __KERNEL__
-                wait_for_completion(&pc->pc_finishing);
-#else
-                liblustre_deregister_wait_callback(ptlrpcd_callback);
-#endif
-                ptlrpc_set_destroy(pc->pc_set);
+                ptlrpcd_stop(&ptlrpcd_pc);
+                ptlrpcd_stop(&ptlrpcd_recovery_pc);
         }
         up(&ptlrpcd_sem);
 }