b=21938 use the same set during replay
author     hongchao.zhang <Hongchao.Zhang@sun.com>
           Wed, 21 Apr 2010 00:54:53 +0000 (08:54 +0800)
committer  Robert Read <robert.read@oracle.com>
           Mon, 26 Apr 2010 18:28:06 +0000 (11:28 -0700)
Some requests are processed through their own ptlrpc_request_set, but during recovery Lustre
forces all requests onto a dedicated ptlrpc_request_set. This patch fixes that by allowing a
request to keep using its own set during replay if it has one.

i=johann@sun.com
i=tappro@sun.com
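
For reference, the request-set API touched by this patch (ptlrpc_prep_set(), ptlrpc_set_add_req(),
ptlrpc_set_wait(), ptlrpc_set_destroy()) is normally driven by the caller roughly as in the sketch
below. This is an illustrative sketch only, not code from the patch: it assumes a struct
ptlrpc_request *req that has already been prepared and packed, and it ignores Lustre's
ENTRY/RETURN debugging macros.

        /* Illustrative sketch only -- not part of this patch.
         * Sends one already-prepared request through a private set
         * and waits for it synchronously. */
        static int send_one_req_sync(struct ptlrpc_request *req)
        {
                struct ptlrpc_request_set *set;
                int rc;

                set = ptlrpc_prep_set();
                if (set == NULL)
                        return -ENOMEM;

                /* The set takes over the caller's reference on req. */
                ptlrpc_set_add_req(set, req);

                /* Send and wait until set_remaining (an atomic counter
                 * after this patch) drops to zero. */
                rc = ptlrpc_set_wait(set);

                /* Drop the set's request references and free the set. */
                ptlrpc_set_destroy(set);
                return rc;
        }

The point of the patch is that a request sitting on such a caller-owned set is no longer forced
onto the dedicated recovery set when it is replayed through ptlrpcd_add_req().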

lustre/include/lustre_lib.h
lustre/include/lustre_net.h
lustre/ptlrpc/client.c
lustre/ptlrpc/import.c
lustre/ptlrpc/pinger.c
lustre/ptlrpc/ptlrpcd.c
lustre/ptlrpc/sec.c

lustre/include/lustre_lib.h
index d15277c..5e2627b 100644 (file)
@@ -607,6 +607,10 @@ static inline void obd_ioctl_freedata(char *buf, int len)
  * XXX nikita: some ptlrpc daemon threads have races of that sort.
  *
  */
+static inline int back_to_sleep(void *arg)
+{
+        return 0;
+}
 
 #define LWI_ON_SIGNAL_NOOP ((void (*)(void *))(-1))
 
lustre/include/lustre_net.h
index 8c3f7d6..d507cb8 100644 (file)
@@ -222,7 +222,7 @@ struct ptlrpc_request_set;
 typedef int (*set_interpreter_func)(struct ptlrpc_request_set *, void *, int);
 
 struct ptlrpc_request_set {
-        int                   set_remaining; /* # uncompleted requests */
+        cfs_atomic_t          set_remaining; /* # uncompleted requests */
         cfs_waitq_t           set_waitq;
         cfs_waitq_t          *set_wakeup_ptr;
         cfs_list_t            set_requests;
@@ -375,7 +375,9 @@ struct ptlrpc_request {
                 rq_hp:1,            /* high priority RPC */
                 rq_at_linked:1,     /* link into service's srv_at_array */
                 rq_reply_truncate:1,
-                rq_committed:1;
+                rq_committed:1,
+                /* set when the request's "rq_set" is no longer valid */
+                rq_invalid_rqset:1;
 
         enum rq_phase rq_phase; /* one of RQ_PHASE_* */
         enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */
@@ -492,7 +494,8 @@ struct ptlrpc_request {
         int    rq_timeout;               /* service time estimate (secs) */
 
         /* Multi-rpc bits */
-        cfs_list_t rq_set_chain;
+        cfs_list_t  rq_set_chain;
+        cfs_waitq_t rq_set_waitq;
         struct ptlrpc_request_set *rq_set;
         /** Async completion handler */
         ptlrpc_interpterer_t rq_interpret_reply;
lustre/ptlrpc/client.c
index eb69133..b04fccb 100644 (file)
@@ -533,6 +533,7 @@ static int __ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
         CFS_INIT_LIST_HEAD(&request->rq_history_list);
         CFS_INIT_LIST_HEAD(&request->rq_exp_list);
         cfs_waitq_init(&request->rq_reply_waitq);
+        cfs_waitq_init(&request->rq_set_waitq);
         request->rq_xid = ptlrpc_next_xid();
         cfs_atomic_set(&request->rq_refcount, 1);
 
@@ -715,6 +716,7 @@ struct ptlrpc_request *ptlrpc_prep_fakereq(struct obd_import *imp,
         CFS_INIT_LIST_HEAD(&request->rq_history_list);
         CFS_INIT_LIST_HEAD(&request->rq_exp_list);
         cfs_waitq_init(&request->rq_reply_waitq);
+        cfs_waitq_init(&request->rq_set_waitq);
 
         request->rq_xid = ptlrpc_next_xid();
         cfs_atomic_set(&request->rq_refcount, 1);
@@ -729,7 +731,7 @@ void ptlrpc_fakereq_finished(struct ptlrpc_request *req)
                 struct ptlrpc_request_set *set = req->rq_set;
 
                 if (set)
-                        set->set_remaining --;
+                        cfs_atomic_dec(&set->set_remaining);
         }
 
         ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
@@ -747,7 +749,7 @@ struct ptlrpc_request_set *ptlrpc_prep_set(void)
                 RETURN(NULL);
         CFS_INIT_LIST_HEAD(&set->set_requests);
         cfs_waitq_init(&set->set_waitq);
-        set->set_remaining = 0;
+        cfs_atomic_set(&set->set_remaining, 0);
         cfs_spin_lock_init(&set->set_new_req_lock);
         CFS_INIT_LIST_HEAD(&set->set_new_requests);
         CFS_INIT_LIST_HEAD(&set->set_cblist);
@@ -765,7 +767,7 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
         ENTRY;
 
         /* Requests on the set should either all be completed, or all be new */
-        expected_phase = (set->set_remaining == 0) ?
+        expected_phase = (cfs_atomic_read(&set->set_remaining) == 0) ?
                          RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
         cfs_list_for_each (tmp, &set->set_requests) {
                 struct ptlrpc_request *req =
@@ -776,8 +778,9 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
                 n++;
         }
 
-        LASSERTF(set->set_remaining == 0 || set->set_remaining == n, "%d / %d\n",
-                 set->set_remaining, n);
+        LASSERTF(cfs_atomic_read(&set->set_remaining) == 0 || 
+                 cfs_atomic_read(&set->set_remaining) == n, "%d / %d\n",
+                 cfs_atomic_read(&set->set_remaining), n);
 
         cfs_list_for_each_safe(tmp, next, &set->set_requests) {
                 struct ptlrpc_request *req =
@@ -789,14 +792,14 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
 
                 if (req->rq_phase == RQ_PHASE_NEW) {
                         ptlrpc_req_interpret(NULL, req, -EBADR);
-                        set->set_remaining--;
+                        cfs_atomic_dec(&set->set_remaining);
                 }
 
                 req->rq_set = NULL;
                 ptlrpc_req_finished (req);
         }
 
-        LASSERT(set->set_remaining == 0);
+        LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
 
         OBD_FREE(set, sizeof(*set));
         EXIT;
@@ -824,7 +827,7 @@ void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
         /* The set takes over the caller's request reference */
         cfs_list_add_tail(&req->rq_set_chain, &set->set_requests);
         req->rq_set = set;
-        set->set_remaining++;
+        cfs_atomic_inc(&set->set_remaining);
 }
 
 /**
@@ -1227,7 +1230,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
         int force_timer_recalc = 0;
         ENTRY;
 
-        if (set->set_remaining == 0)
+        if (cfs_atomic_read(&set->set_remaining) == 0)
                 RETURN(1);
 
         cfs_list_for_each(tmp, &set->set_requests) {
@@ -1539,12 +1542,12 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                 }
                 cfs_spin_unlock(&imp->imp_lock);
 
-                set->set_remaining--;
+                cfs_atomic_dec(&set->set_remaining);
                 cfs_waitq_broadcast(&imp->imp_recovery_waitq);
         }
 
         /* If we hit an error, we want to recover promptly. */
-        RETURN(set->set_remaining == 0 || force_timer_recalc);
+        RETURN(cfs_atomic_read(&set->set_remaining) == 0 || force_timer_recalc);
 }
 
 /* Return 1 if we should give up, else 0 */
@@ -1794,9 +1797,18 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
                  * EINTR.
                  * I don't really care if we go once more round the loop in
                  * the error cases -eeb. */
-        } while (rc != 0 || set->set_remaining != 0);
+                if (rc == 0 && cfs_atomic_read(&set->set_remaining) == 0) {
+                        cfs_list_for_each(tmp, &set->set_requests) {
+                                req = cfs_list_entry(tmp, struct ptlrpc_request,
+                                                     rq_set_chain);
+                                cfs_spin_lock(&req->rq_lock);
+                                req->rq_invalid_rqset = 1;
+                                cfs_spin_unlock(&req->rq_lock);
+                        }
+                }
+        } while (rc != 0 || cfs_atomic_read(&set->set_remaining) != 0);
 
-        LASSERT(set->set_remaining == 0);
+        LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
 
         rc = 0;
         cfs_list_for_each(tmp, &set->set_requests) {
lustre/ptlrpc/import.c
index d2a9ad4..95eb543 100644 (file)
@@ -1371,11 +1371,6 @@ out:
         RETURN(rc);
 }
 
-static int back_to_sleep(void *unused)
-{
-        return 0;
-}
-
 int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
 {
         struct ptlrpc_request *req;
lustre/ptlrpc/pinger.c
index fdc172f..8584adb 100644 (file)
@@ -809,7 +809,7 @@ static int pinger_check_rpcs(void *arg)
         cfs_mutex_up(&pinger_sem);
 
         /* Might be empty, that's OK. */
-        if (set->set_remaining == 0)
+        if (cfs_atomic_read(&set->set_remaining) == 0)
                 CDEBUG(D_RPCTRACE, "nothing to ping\n");
 
         cfs_list_for_each(iter, &set->set_requests) {
@@ -858,7 +858,7 @@ do_check_set:
                         cfs_atomic_dec(&imp->imp_inflight);
                 }
                 cfs_spin_unlock(&imp->imp_lock);
-                set->set_remaining--;
+                cfs_atomic_dec(&set->set_remaining);
         }
         cfs_mutex_up(&pinger_sem);
 
lustre/ptlrpc/ptlrpcd.c
index e6bbf18..225db05 100644 (file)
@@ -119,9 +119,9 @@ void ptlrpcd_add_rqset(struct ptlrpc_request_set *set)
                 cfs_list_del_init(&req->rq_set_chain);
                 req->rq_set = NULL;
                 ptlrpcd_add_req(req, PSCOPE_OTHER);
-                set->set_remaining--;
+                cfs_atomic_dec(&set->set_remaining);
         }
-        LASSERT(set->set_remaining == 0);
+        LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
 }
 EXPORT_SYMBOL(ptlrpcd_add_rqset);
 
@@ -136,6 +136,31 @@ int ptlrpcd_add_req(struct ptlrpc_request *req, enum ptlrpcd_scope scope)
         int rc;
 
         LASSERT(scope < PSCOPE_NR);
+        
+        cfs_spin_lock(&req->rq_lock);
+        if (req->rq_invalid_rqset) {
+                cfs_duration_t timeout;
+                struct l_wait_info lwi;
+
+                req->rq_invalid_rqset = 0;
+                cfs_spin_unlock(&req->rq_lock);
+
+                timeout = cfs_time_seconds(5);
+                lwi = LWI_TIMEOUT(timeout, back_to_sleep, NULL);
+                l_wait_event(req->rq_reply_waitq, (req->rq_set == NULL), &lwi);
+        } else if (req->rq_set) {
+                LASSERT(req->rq_phase == RQ_PHASE_NEW);
+                LASSERT(req->rq_send_state == LUSTRE_IMP_REPLAY);
+
+                /* ptlrpc_check_set will decrease the count */
+                cfs_atomic_inc(&req->rq_set->set_remaining);
+                cfs_spin_unlock(&req->rq_lock);
+
+                cfs_waitq_signal(&req->rq_set->set_waitq);
+        } else {
+                cfs_spin_unlock(&req->rq_lock);
+        }
+
         pt = req->rq_send_state == LUSTRE_IMP_FULL ? PT_NORMAL : PT_RECOVERY;
         pc = &ptlrpcd_scopes[scope].pscope_thread[pt].pt_ctl;
         rc = ptlrpc_set_add_new_req(pc, req);
@@ -184,7 +209,7 @@ static int ptlrpcd_check(const struct lu_env *env, struct ptlrpcd_ctl *pc)
         }
         cfs_spin_unlock(&pc->pc_set->set_new_req_lock);
 
-        if (pc->pc_set->set_remaining) {
+        if (cfs_atomic_read(&pc->pc_set->set_remaining)) {
                 rc = rc | ptlrpc_check_set(env, pc->pc_set);
 
                 /*
@@ -346,7 +371,7 @@ int ptlrpcd_idle(void *arg)
         struct ptlrpcd_ctl *pc = arg;
 
         return (cfs_list_empty(&pc->pc_set->set_new_requests) &&
-                pc->pc_set->set_remaining == 0);
+                cfs_atomic_read(&pc->pc_set->set_remaining) == 0);
 }
 
 #endif
lustre/ptlrpc/sec.c
index defe085..fce57c7 100644 (file)
@@ -896,6 +896,7 @@ int sptlrpc_import_check_ctx(struct obd_import *imp)
         cfs_atomic_set(&req->rq_refcount, 10000);
         CFS_INIT_LIST_HEAD(&req->rq_ctx_chain);
         cfs_waitq_init(&req->rq_reply_waitq);
+        cfs_waitq_init(&req->rq_set_waitq);
         req->rq_import = imp;
         req->rq_flvr = sec->ps_flvr;
         req->rq_cli_ctx = ctx;