Whamcloud - gitweb
Branch b1_8
[fs/lustre-release.git] / lustre / ptlrpc / ptlrpcd.c
index 9dce7f7..5b82f57 100644 (file)
@@ -38,6 +38,7 @@
 # include <lustre_lib.h>
 
 #include <lustre_ha.h>
+#include <obd_class.h>   /* for obd_zombie */
 #include <obd_support.h> /* for OBD_FAIL_CHECK */
 #include <lprocfs_status.h>
 
@@ -47,13 +48,12 @@ struct ptlrpcd_ctl {
         spinlock_t                pc_lock;
         struct completion         pc_starting;
         struct completion         pc_finishing;
-        struct list_head          pc_req_list;
-        cfs_waitq_t               pc_waitq;
         struct ptlrpc_request_set *pc_set;
         char                      pc_name[16];
 #ifndef __KERNEL__
         int                       pc_recurred;
-        void                     *pc_callback;
+        void                     *pc_wait_callback;
+        void                     *pc_idle_callback;
 #endif
 };
 
@@ -65,11 +65,11 @@ static int ptlrpcd_users = 0;
 
 void ptlrpcd_wake(struct ptlrpc_request *req)
 {
-        struct ptlrpcd_ctl *pc = req->rq_ptlrpcd_data;
+        struct ptlrpc_request_set *rq_set = req->rq_set;
 
-        LASSERT(pc != NULL);
+        LASSERT(rq_set != NULL);
 
-        cfs_waitq_signal(&pc->pc_waitq);
+        cfs_waitq_signal(&rq_set->set_waitq);
 }
 
 /* requests that are added to the ptlrpcd queue are sent via
@@ -83,9 +83,8 @@ void ptlrpcd_add_req(struct ptlrpc_request *req)
         else
                 pc = &ptlrpcd_recovery_pc;
 
-        req->rq_ptlrpcd_data = pc;
         ptlrpc_set_add_new_req(pc->pc_set, req);
-        wake_up(&pc->pc_waitq);
+        cfs_waitq_signal(&pc->pc_set->set_waitq);
 }
 
 static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
@@ -141,9 +140,13 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
 static int ptlrpcd(void *arg)
 {
         struct ptlrpcd_ctl *pc = arg;
+        int rc;
         ENTRY;
 
-        cfs_daemonize(pc->pc_name);
+        if ((rc = cfs_daemonize_ctxt(pc->pc_name))) {
+                complete(&pc->pc_starting);
+                return rc;
+        }
 
         complete(&pc->pc_starting);
 
@@ -153,19 +156,13 @@ static int ptlrpcd(void *arg)
          * on the set's new_req_list and ptlrpcd_check moves them into
          * the set. */
         while (1) {
-                cfs_waitlink_t set_wait;
                 struct l_wait_info lwi;
                 cfs_duration_t timeout;
 
                 timeout = cfs_time_seconds(ptlrpc_set_next_timeout(pc->pc_set));
                 lwi = LWI_TIMEOUT(timeout, ptlrpc_expired_set, pc->pc_set);
 
-                /* ala the pinger, wait on pc's waitqueue and the set's */
-                cfs_waitlink_init(&set_wait);
-                cfs_waitq_add(&pc->pc_set->set_waitq, &set_wait);
-                cfs_waitq_forward(&set_wait, &pc->pc_waitq);
-                l_wait_event(pc->pc_waitq, ptlrpcd_check(pc), &lwi);
-                cfs_waitq_del(&pc->pc_set->set_waitq, &set_wait);
+                l_wait_event(pc->pc_set->set_waitq, ptlrpcd_check(pc), &lwi);
 
                 if (test_bit(LIOD_STOP, &pc->pc_flags))
                         break;
@@ -176,6 +173,7 @@ static int ptlrpcd(void *arg)
         complete(&pc->pc_finishing);
         return 0;
 }
+
 #else
 
 int ptlrpcd_check_async_rpcs(void *arg)
@@ -190,11 +188,23 @@ int ptlrpcd_check_async_rpcs(void *arg)
                 rc = ptlrpcd_check(pc);
                 if (!rc)
                         ptlrpc_expired_set(pc->pc_set);
+                /*XXX send replay requests */
+                if (pc == &ptlrpcd_recovery_pc)
+                        rc = ptlrpcd_check(pc);
         }
 
         pc->pc_recurred--;
         return rc;
 }
+
+int ptlrpcd_idle(void *arg)
+{
+        struct ptlrpcd_ctl *pc = arg;
+
+        return (list_empty(&pc->pc_set->set_new_requests) &&
+                pc->pc_set->set_remaining == 0);
+}
+
 #endif
 
 static int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc)
@@ -205,10 +215,8 @@ static int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc)
         memset(pc, 0, sizeof(*pc));
         init_completion(&pc->pc_starting);
         init_completion(&pc->pc_finishing);
-        cfs_waitq_init(&pc->pc_waitq);
         pc->pc_flags = 0;
         spin_lock_init(&pc->pc_lock);
-        CFS_INIT_LIST_HEAD(&pc->pc_req_list);
         snprintf (pc->pc_name, sizeof (pc->pc_name), name);
 
         pc->pc_set = ptlrpc_prep_set();
@@ -224,8 +232,12 @@ static int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc)
 
         wait_for_completion(&pc->pc_starting);
 #else
-        pc->pc_callback =
-                liblustre_register_wait_callback(&ptlrpcd_check_async_rpcs, pc);
+        pc->pc_wait_callback =
+                liblustre_register_wait_callback("ptlrpcd_check_async_rpcs",
+                                                 &ptlrpcd_check_async_rpcs, pc);
+        pc->pc_idle_callback =
+                liblustre_register_idle_callback("ptlrpcd_check_idle_rpcs",
+                                                 &ptlrpcd_idle, pc);
         (void)rc;
 #endif
         RETURN(0);
@@ -234,11 +246,12 @@ static int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc)
 static void ptlrpcd_stop(struct ptlrpcd_ctl *pc)
 {
         set_bit(LIOD_STOP, &pc->pc_flags);
-        cfs_waitq_signal(&pc->pc_waitq);
+        cfs_waitq_signal(&pc->pc_set->set_waitq);
 #ifdef __KERNEL__
         wait_for_completion(&pc->pc_finishing);
 #else
-        liblustre_deregister_wait_callback(pc->pc_callback);
+        liblustre_deregister_wait_callback(pc->pc_wait_callback);
+        liblustre_deregister_idle_callback(pc->pc_idle_callback);
 #endif
         ptlrpc_set_destroy(pc->pc_set);
 }