Whamcloud - gitweb
b=14608
authoryury <yury>
Sat, 23 Aug 2008 13:58:38 +0000 (13:58 +0000)
committeryury <yury>
Sat, 23 Aug 2008 13:58:38 +0000 (13:58 +0000)
r=shadow,wangdi

- fixes reply_dual test_17 after landing new recov_thread stuff;
- some cleanups.

lustre/include/lustre_log.h
lustre/ptlrpc/recov_thread.c

index b81e34d..66ebc17 100644 (file)
@@ -286,6 +286,10 @@ struct llog_commit_master {
          */
         struct ptlrpcd_ctl         lcm_pc;
         /**
+         * Busy resources waitq
+         */
+        cfs_waitq_t                lcm_waitq;
+        /**
          * Commit thread name buffer. Only used for thread start.
          */
         char                       lcm_name[LCM_NAME_SIZE];
index 188574b..60a4aaf 100644 (file)
@@ -106,6 +106,7 @@ static struct llog_canceld_ctxt *llcd_alloc(void)
  */
 static void llcd_free(struct llog_canceld_ctxt *llcd)
 {
+        LASSERT(atomic_read(&llcd_count) > 0);
         OBD_SLAB_FREE(llcd, llcd_cache, llcd->llcd_size);
         atomic_dec(&llcd_count);
 }
@@ -319,11 +320,16 @@ static struct llog_canceld_ctxt *llcd_get(struct llog_ctxt *ctxt)
  */
 static void llcd_put(struct llog_ctxt *ctxt)
 {
+        struct llog_commit_master *lcm;
         struct llog_canceld_ctxt *llcd;
 
+        lcm = ctxt->loc_lcm;
         llcd = llcd_detach(ctxt);
         if (llcd)
                 llcd_free(llcd);
+
+        if (atomic_read(&lcm->lcm_count) == 0)
+                cfs_waitq_signal(&lcm->lcm_waitq);
 }
 
 /**
@@ -352,8 +358,6 @@ static int llcd_push(struct llog_ctxt *ctxt)
         return rc;
 }
 
-static atomic_t llog_tcount = ATOMIC_INIT(0);
-
 /**
  * Start recovery thread which actually deals llcd sending. This
  * is all ptlrpc standard thread based so there is not much of work
@@ -371,8 +375,6 @@ int llog_recov_thread_start(struct llog_commit_master *lcm)
                 RETURN(rc);
         }
         lcm->lcm_set = lcm->lcm_pc.pc_set;
-        atomic_inc(&llog_tcount);
-
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_recov_thread_start);
@@ -382,8 +384,9 @@ EXPORT_SYMBOL(llog_recov_thread_start);
  */
 void llog_recov_thread_stop(struct llog_commit_master *lcm, int force)
 {
+        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
         ENTRY;
-        
+
         /**
          * Let all know that we're stopping. This will also make 
          * llcd_send() refuse any new llcds.
@@ -395,13 +398,13 @@ void llog_recov_thread_stop(struct llog_commit_master *lcm, int force)
          * for processing now.
          */
         ptlrpcd_stop(&lcm->lcm_pc, force);
-        
+
         /*
-         * No llcds on this @lcm should left.
+         * Wait for llcd number == 0. Note, this is infinite wait.
+         * All other parts should make sure that no lost llcd is left.
          */
-        LASSERTF(atomic_read(&lcm->lcm_count) == 0, 
-                 "Busy llcds found on lcm %p - (%d)\n", 
-                 lcm, atomic_read(&lcm->lcm_count));
+        l_wait_event(lcm->lcm_waitq,
+                     atomic_read(&lcm->lcm_count) == 0, &lwi);
         EXIT;
 }
 EXPORT_SYMBOL(llog_recov_thread_stop);
@@ -420,13 +423,13 @@ struct llog_commit_master *llog_recov_thread_init(char *name)
                 RETURN(NULL);
 
         /*
-         * Try to create threads with unique names and user id.
+         * Try to create threads with unique names.
          */
         snprintf(lcm->lcm_name, sizeof(lcm->lcm_name), 
-                 "ll_log_commit_%s_%02d", name, 
-                 atomic_read(&llog_tcount));
+                 "ll_log_commit_%s", name);
 
         strncpy(lcm->lcm_name, name, sizeof(lcm->lcm_name));
+        cfs_waitq_init(&lcm->lcm_waitq);
         atomic_set(&lcm->lcm_count, 0);
         rc = llog_recov_thread_start(lcm);
         if (rc) {
@@ -452,8 +455,8 @@ void llog_recov_thread_fini(struct llog_commit_master *lcm, int force)
 }
 EXPORT_SYMBOL(llog_recov_thread_fini);
 
-static int llog_obd_repl_generic(struct llog_ctxt *ctxt, 
-                                 void *handle, void *arg)
+static int llog_recov_thread_replay(struct llog_ctxt *ctxt, 
+                                    void *cb, void *arg)
 {
         struct obd_device *obd = ctxt->loc_obd;
         struct llog_process_cat_args *lpca;
@@ -470,7 +473,7 @@ static int llog_obd_repl_generic(struct llog_ctxt *ctxt,
         if (!lpca)
                 RETURN(-ENOMEM);
 
-        lpca->lpca_cb = handle;
+        lpca->lpca_cb = cb;
         lpca->lpca_arg = arg;
 
         /*
@@ -495,40 +498,27 @@ static int llog_obd_repl_generic(struct llog_ctxt *ctxt,
         RETURN(rc);
 }
 
-int llog_obd_repl_connect(struct llog_ctxt *ctxt, int count,
+int llog_obd_repl_connect(struct llog_ctxt *ctxt, int count, 
                           struct llog_logid *logid, struct llog_gen *gen,
                           struct obd_uuid *uuid)
 {
-        struct llog_canceld_ctxt *llcd;
         int rc;
         ENTRY;
 
-        mutex_down(&ctxt->loc_sem);
-
         /* 
-         * Send back cached llcd before recovery from llog if we have any. 
+         * Send back cached llcd from llog before recovery if we have any.
+         * This is void is nothing cached is found there.
          */
-        if (ctxt->loc_llcd) {
-                CWARN("Llcd %p:%p is not empty\n", ctxt->loc_llcd, ctxt);
-                mutex_up(&ctxt->loc_sem);
-                llog_sync(ctxt, NULL);
-                mutex_down(&ctxt->loc_sem);
-        }
-
-        llcd = llcd_get(ctxt);
-        if (!llcd) {
-                mutex_up(&ctxt->loc_sem);
-                RETURN(-ENOMEM);
-        }
+        llog_sync(ctxt, NULL);
 
+        /* 
+         * Start recovery in separate thread. 
+         */
+        mutex_down(&ctxt->loc_sem);
         ctxt->loc_gen = *gen;
-
-        rc = llog_obd_repl_generic(ctxt, ctxt->llog_proc_cb, logid);
-        if (rc != 0) {
-                llcd_put(ctxt);
-                CERROR("Error recovery process: %d\n", rc);
-        }
+        rc = llog_recov_thread_replay(ctxt, ctxt->llog_proc_cb, logid);
         mutex_up(&ctxt->loc_sem);
+
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_obd_repl_connect);
@@ -542,6 +532,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                          struct lov_stripe_md *lsm, int count,
                          struct llog_cookie *cookies, int flags)
 {
+        struct llog_commit_master *lcm;
         struct llog_canceld_ctxt *llcd;
         int rc = 0;
         ENTRY;
@@ -549,6 +540,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
         LASSERT(ctxt != NULL);
 
         mutex_down(&ctxt->loc_sem);
+        lcm = ctxt->loc_lcm;
         
         /*
          * Let's check if we have all structures alive. We also check for
@@ -564,7 +556,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                 GOTO(out, rc = -ENODEV);
         }
 
-        if (test_bit(LLOG_LCM_FL_EXIT, &ctxt->loc_lcm->lcm_flags)) {
+        if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) {
                 CDEBUG(D_RPCTRACE, "Commit thread is stopping for ctxt %p\n", 
                        ctxt);
                 GOTO(out, rc = -ENODEV);
@@ -580,6 +572,12 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                         llcd = llcd_get(ctxt);
                         if (!llcd)
                                 GOTO(out, rc = -ENOMEM);
+                        /*
+                         * Allocation is successful, let's check for stop
+                         * flag again to fall back as soon as possible.
+                         */
+                        if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags))
+                                GOTO(out, rc = -ENODEV);
                 }
 
                 /*
@@ -593,6 +591,12 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                         llcd = llcd_get(ctxt);
                         if (!llcd)
                                 GOTO(out, rc = -ENOMEM);
+                        /*
+                         * Allocation is successful, let's check for stop
+                         * flag again to fall back as soon as possible.
+                         */
+                        if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags))
+                                GOTO(out, rc = -ENODEV);
                 }
 
                 /*
@@ -611,6 +615,8 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
         }
         EXIT;
 out:
+        if (rc)
+                llcd_put(ctxt);
         mutex_up(&ctxt->loc_sem);
         return rc;
 }
@@ -621,16 +627,17 @@ int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
         int rc = 0;
         ENTRY;
 
+        mutex_down(&ctxt->loc_sem);
         if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
                 CDEBUG(D_RPCTRACE, "Reverse import disconnect\n");
                 /*
                  * Check for llcd which might be left attached to @ctxt.
                  * Let's kill it.
                  */
-                mutex_down(&ctxt->loc_sem);
                 llcd_put(ctxt);
                 mutex_up(&ctxt->loc_sem);
         } else {
+                mutex_up(&ctxt->loc_sem);
                 rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);
         }
         RETURN(rc);
@@ -670,8 +677,6 @@ int llog_recov_init(void)
  */
 void llog_recov_fini(void)
 {
-        int count;
-
         /*
          * Kill llcd cache when thread is stopped and we're sure no 
          * llcd in use left.
@@ -681,9 +686,9 @@ void llog_recov_fini(void)
                  * In 2.6.22 cfs_mem_cache_destroy() will not return error
                  * for busy resources. Let's check it another way.
                  */
-                count = atomic_read(&llcd_count);
-                LASSERTF(count == 0, "Can't destroy llcd cache! Number of "
-                         "busy llcds: %d\n", count);
+                LASSERTF(atomic_read(&llcd_count) == 0, 
+                         "Can't destroy llcd cache! Number of "
+                         "busy llcds: %d\n", atomic_read(&llcd_count));
                 cfs_mem_cache_destroy(llcd_cache);
                 llcd_cache = NULL;
         }