Whamcloud - gitweb
b=21938 use req->rq_set itself during recovery
author hongchao.zhang <Hongchao.Zhang@Sun.COM>
Sun, 7 Mar 2010 06:52:55 +0000 (14:52 +0800)
committer Johann Lombardi <johann@sun.com>
Wed, 10 Mar 2010 09:40:21 +0000 (10:40 +0100)
During recovery, use req->rq_set itself to replay the request
instead of ptlrpcd_recovery_pc.

i=tappro@sun.com
i=johann@sun.com

lustre/include/lustre_lib.h
lustre/include/lustre_net.h
lustre/ptlrpc/client.c
lustre/ptlrpc/import.c
lustre/ptlrpc/pinger.c
lustre/ptlrpc/ptlrpcd.c

index d8efcf1..a7f92ae 100644 (file)
@@ -595,6 +595,11 @@ static inline void obd_ioctl_freedata(char *buf, int len)
  *
  */
 
+static inline int back_to_sleep(void *arg) /* no-op LWI_TIMEOUT callback */
+{
+        return 0; /* always 0; arg is ignored */
+}
+
 #define LWI_ON_SIGNAL_NOOP ((void (*)(void *))(-1))
 
 struct l_wait_info {
index 4d90174..2d6abab 100644 (file)
@@ -194,7 +194,7 @@ struct ptlrpc_request_set;
 typedef int (*set_interpreter_func)(struct ptlrpc_request_set *, void *, int);
 
 struct ptlrpc_request_set {
-        int               set_remaining; /* # uncompleted requests */
+        atomic_t          set_remaining; /* # uncompleted requests */
         cfs_waitq_t       set_waitq;
         cfs_waitq_t      *set_wakeup_ptr;
         struct list_head  set_requests;
@@ -326,7 +326,9 @@ struct ptlrpc_request {
                 rq_reply_truncate:1, /* reply is truncated */
                 rq_fake:1,          /* fake request - just for timeout only */
                 /* the request is queued to replay during recovery */
-                rq_copy_queued:1;
+                rq_copy_queued:1,
+                /* set once "rq_set" has completed and must not be reused */
+                rq_invalid_rqset:1;
         enum rq_phase rq_phase;     /* one of RQ_PHASE_* */
         enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */
         atomic_t rq_refcount;   /* client-side refcount for SENT race,
@@ -397,6 +399,7 @@ struct ptlrpc_request {
 
         /* Multi-rpc bits */
         struct list_head rq_set_chain;
+        cfs_waitq_t      rq_set_waitq;
         struct ptlrpc_request_set *rq_set;
         int (*rq_interpret_reply)(struct ptlrpc_request *req, void *data,
                                   int rc); /* async interpret handler */
index ef872ec..73f174a 100644 (file)
@@ -594,6 +594,7 @@ ptlrpc_prep_req_pool(struct obd_import *imp, __u32 version, int opcode,
         CFS_INIT_LIST_HEAD(&request->rq_history_list);
         CFS_INIT_LIST_HEAD(&request->rq_exp_list);
         cfs_waitq_init(&request->rq_reply_waitq);
+        cfs_waitq_init(&request->rq_set_waitq);
         request->rq_xid = ptlrpc_next_xid();
         atomic_set(&request->rq_refcount, 1);
 
@@ -650,6 +651,7 @@ struct ptlrpc_request *ptlrpc_prep_fakereq(struct obd_import *imp,
         CFS_INIT_LIST_HEAD(&request->rq_history_list);
         CFS_INIT_LIST_HEAD(&request->rq_exp_list);
         cfs_waitq_init(&request->rq_reply_waitq);
+        cfs_waitq_init(&request->rq_set_waitq);
 
         request->rq_xid = ptlrpc_next_xid();
         atomic_set(&request->rq_refcount, 1);
@@ -664,7 +666,7 @@ void ptlrpc_fakereq_finished(struct ptlrpc_request *req)
                 struct ptlrpc_request_set *set = req->rq_set;
 
                 if (set)
-                        set->set_remaining --;
+                        atomic_dec(&set->set_remaining);
         }
 
         ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
@@ -682,7 +684,7 @@ struct ptlrpc_request_set *ptlrpc_prep_set(void)
                 RETURN(NULL);
         CFS_INIT_LIST_HEAD(&set->set_requests);
         cfs_waitq_init(&set->set_waitq);
-        set->set_remaining = 0;
+        atomic_set(&set->set_remaining, 0);
         spin_lock_init(&set->set_new_req_lock);
         CFS_INIT_LIST_HEAD(&set->set_new_requests);
         CFS_INIT_LIST_HEAD(&set->set_cblist);
@@ -700,7 +702,7 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
         ENTRY;
 
         /* Requests on the set should either all be completed, or all be new */
-        expected_phase = (set->set_remaining == 0) ?
+        expected_phase = (atomic_read(&set->set_remaining) == 0) ?
                          RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
         list_for_each (tmp, &set->set_requests) {
                 struct ptlrpc_request *req =
@@ -710,8 +712,9 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
                 n++;
         }
 
-        LASSERTF(set->set_remaining == 0 || set->set_remaining == n, "%d / %d\n",
-                 set->set_remaining, n);
+        LASSERTF(atomic_read(&set->set_remaining) == 0 || 
+                 atomic_read(&set->set_remaining) == n, "%d / %d\n",
+                 atomic_read(&set->set_remaining), n);
 
         list_for_each_safe(tmp, next, &set->set_requests) {
                 struct ptlrpc_request *req =
@@ -733,14 +736,19 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
                                 interpreter(req, &req->rq_async_args,
                                             req->rq_status);
                         }
-                        set->set_remaining--;
+                        atomic_dec(&set->set_remaining);
                 }
 
+                spin_lock(&req->rq_lock);
                 req->rq_set = NULL;
+                req->rq_invalid_rqset = 0;
+                spin_unlock(&req->rq_lock);
+
+                cfs_waitq_signal(&req->rq_set_waitq);
                 ptlrpc_req_finished (req);
         }
 
-        LASSERT(set->set_remaining == 0);
+        LASSERT(atomic_read(&set->set_remaining) == 0);
 
         OBD_FREE(set, sizeof(*set));
         EXIT;
@@ -767,9 +775,10 @@ void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
                         struct ptlrpc_request *req)
 {
         /* The set takes over the caller's request reference */
+        LASSERT(list_empty(&req->rq_set_chain));
         list_add_tail(&req->rq_set_chain, &set->set_requests);
         req->rq_set = set;
-        set->set_remaining++;
+        atomic_inc(&set->set_remaining);
 }
 
 /** 
@@ -792,6 +801,7 @@ int ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
         /* 
          * The set takes over the caller's request reference. 
          */
+        LASSERT(list_empty(&req->rq_set_chain));
         list_add_tail(&req->rq_set_chain, &set->set_new_requests);
         req->rq_set = set;
         spin_unlock(&set->set_new_req_lock);
@@ -1164,7 +1174,7 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
         int force_timer_recalc = 0;
         ENTRY;
 
-        if (set->set_remaining == 0)
+        if (atomic_read(&set->set_remaining) == 0)
                 RETURN(1);
 
         list_for_each(tmp, &set->set_requests) {
@@ -1409,12 +1419,12 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                 }
                 spin_unlock(&imp->imp_lock);
 
-                set->set_remaining--;
+                atomic_dec(&set->set_remaining);
                 cfs_waitq_broadcast(&imp->imp_recovery_waitq);
         }
 
         /* If we hit an error, we want to recover promptly. */
-        RETURN(set->set_remaining == 0 || force_timer_recalc);
+        RETURN(atomic_read(&set->set_remaining) == 0 || force_timer_recalc);
 }
 
 /* Return 1 if we should give up, else 0 */
@@ -1626,9 +1636,18 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
                  * EINTR.
                  * I don't really care if we go once more round the loop in
                  * the error cases -eeb. */
-        } while (rc != 0 || set->set_remaining != 0);
+                if (rc == 0 && atomic_read(&set->set_remaining) == 0) {
+                        list_for_each(tmp, &set->set_requests) {
+                                req = list_entry(tmp, struct ptlrpc_request, 
+                                                 rq_set_chain);
+                                spin_lock(&req->rq_lock);
+                                req->rq_invalid_rqset = 1;
+                                spin_unlock(&req->rq_lock);
+                        }
+                }
+        } while (rc != 0 || atomic_read(&set->set_remaining) != 0);
 
-        LASSERT(set->set_remaining == 0);
+        LASSERT(atomic_read(&set->set_remaining) == 0);
 
         rc = 0;
         list_for_each(tmp, &set->set_requests) {
index 06e4c3c..f7424e4 100644 (file)
@@ -1372,11 +1372,6 @@ int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
         RETURN(rc);
 }
 
-static int back_to_sleep(void *unused)
-{
-        return 0;
-}
-
 int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
 {
         struct ptlrpc_request *req;
index 130621f..547c2f6 100644 (file)
@@ -806,7 +806,7 @@ static int pinger_check_rpcs(void *arg)
         mutex_up(&pinger_sem);
 
         /* Might be empty, that's OK. */
-        if (set->set_remaining == 0)
+        if (atomic_read(&set->set_remaining) == 0)
                 CDEBUG(D_RPCTRACE, "nothing to ping\n");
 
         list_for_each(iter, &set->set_requests) {
@@ -855,7 +855,7 @@ do_check_set:
                         atomic_dec(&imp->imp_inflight);
                 }
                 spin_unlock(&imp->imp_lock);
-                set->set_remaining--;
+                atomic_dec(&set->set_remaining);
         }
         mutex_up(&pinger_sem);
 
index 0bcc528..f771b1e 100644 (file)
@@ -85,9 +85,9 @@ void ptlrpcd_add_rqset(struct ptlrpc_request_set *set)
                 list_del_init(&req->rq_set_chain);
                 req->rq_set = NULL;
                 ptlrpcd_add_req(req);
-                set->set_remaining--;
+                atomic_dec(&set->set_remaining);
         }
-        LASSERT(set->set_remaining == 0);
+        LASSERT(atomic_read(&set->set_remaining) == 0);
 }
 EXPORT_SYMBOL(ptlrpcd_add_rqset);
 
@@ -100,6 +100,31 @@ int ptlrpcd_add_req(struct ptlrpc_request *req)
         struct ptlrpcd_ctl *pc;
         int rc;
 
+        spin_lock(&req->rq_lock);
+        if (req->rq_invalid_rqset) {
+                cfs_duration_t timeout;
+                struct l_wait_info lwi;
+
+                req->rq_invalid_rqset = 0;
+                spin_unlock(&req->rq_lock);
+                /* sleep on the queue ptlrpc_set_destroy() signals */
+                timeout = cfs_time_seconds(5);
+                lwi = LWI_TIMEOUT(timeout, back_to_sleep, NULL);
+                l_wait_event(req->rq_set_waitq, (req->rq_set == NULL), &lwi);
+        } else if (req->rq_set) {
+                LASSERT(req->rq_phase == RQ_PHASE_NEW);
+                LASSERT(req->rq_send_state == LUSTRE_IMP_REPLAY);
+
+                /* ptlrpc_check_set will decrease the count */
+                atomic_inc(&req->rq_set->set_remaining);
+                spin_unlock(&req->rq_lock);
+
+                cfs_waitq_signal(&req->rq_set->set_waitq);
+                return 0;
+        } else {
+                spin_unlock(&req->rq_lock);
+        }
+
         if (req->rq_send_state == LUSTRE_IMP_FULL)
                 pc = &ptlrpcd_pc;
         else
@@ -146,7 +171,7 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
         }
         spin_unlock(&pc->pc_set->set_new_req_lock);
 
-        if (pc->pc_set->set_remaining) {
+        if (atomic_read(&pc->pc_set->set_remaining)) {
                 rc = rc | ptlrpc_check_set(pc->pc_set);
 
                 /*
@@ -273,7 +298,7 @@ int ptlrpcd_idle(void *arg)
         struct ptlrpcd_ctl *pc = arg;
 
         return (list_empty(&pc->pc_set->set_new_requests) &&
-                pc->pc_set->set_remaining == 0);
+                atomic_read(&pc->pc_set->set_remaining) == 0);
 }
 
 #endif