Whamcloud - gitweb
Branch b1_6
[fs/lustre-release.git] / lustre / ptlrpc / recov_thread.c
index b74c72e..9ec7074 100644 (file)
 
 #ifdef __KERNEL__
 
-static struct llog_commit_master lustre_lcm;
-static struct llog_commit_master *lcm = &lustre_lcm;
-
 /* Allocate new commit structs in case we do not have enough.
  * Make the llcd size small enough that it fits into a single page when we
  * are sending/receiving it. */
-static int llcd_alloc(void)
+static int llcd_alloc(struct llog_commit_master *lcm)
 {
         struct llog_canceld_ctxt *llcd;
         int llcd_size;
@@ -85,7 +82,7 @@ static int llcd_alloc(void)
 }
 
 /* Get a free cookie struct from the list */
-struct llog_canceld_ctxt *llcd_grab(void)
+static struct llog_canceld_ctxt *llcd_grab(struct llog_commit_master *lcm)
 {
         struct llog_canceld_ctxt *llcd;
 
@@ -93,7 +90,7 @@ repeat:
         spin_lock(&lcm->lcm_llcd_lock);
         if (list_empty(&lcm->lcm_llcd_free)) {
                 spin_unlock(&lcm->lcm_llcd_lock);
-                if (llcd_alloc() < 0) {
+                if (llcd_alloc(lcm) < 0) {
                         CERROR("unable to allocate log commit data!\n");
                         return NULL;
                 }
@@ -110,10 +107,12 @@ repeat:
 
         return llcd;
 }
-EXPORT_SYMBOL(llcd_grab);
 
 static void llcd_put(struct llog_canceld_ctxt *llcd)
 {
+        struct llog_commit_master *lcm = llcd->llcd_lcm;
+
+        llog_ctxt_put(llcd->llcd_ctxt);
         if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) {
                 int llcd_size = llcd->llcd_size +
                          offsetof(struct llog_canceld_ctxt, llcd_cookies);
@@ -127,15 +126,16 @@ static void llcd_put(struct llog_canceld_ctxt *llcd)
 }
 
 /* Send some cookies to the appropriate target */
-void llcd_send(struct llog_canceld_ctxt *llcd)
+static void llcd_send(struct llog_canceld_ctxt *llcd)
 {
-        spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
-        list_add_tail(&llcd->llcd_list, &llcd->llcd_lcm->lcm_llcd_pending);
-        spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);
-
+        if (!(llcd->llcd_lcm->lcm_flags & LLOG_LCM_FL_EXIT)) {
+                spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
+                list_add_tail(&llcd->llcd_list,
+                              &llcd->llcd_lcm->lcm_llcd_pending);
+                spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);
+        }
         cfs_waitq_signal_nr(&llcd->llcd_lcm->lcm_waitq, 1);
 }
-EXPORT_SYMBOL(llcd_send);
 
 /* deleted objects have a commit callback that cancels the MDS
  * log record for the deletion.  The commit callback calls this
@@ -153,7 +153,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
 
         mutex_down(&ctxt->loc_sem);
         if (ctxt->loc_imp == NULL) {
-                CDEBUG(D_HA, "no import for ctxt %p\n", ctxt);
+                CDEBUG(D_RPCTRACE, "no import for ctxt %p\n", ctxt);
                 GOTO(out, rc = 0);
         }
 
@@ -161,7 +161,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
 
         if (count > 0 && cookies != NULL) {
                 if (llcd == NULL) {
-                        llcd = llcd_grab();
+                        llcd = llcd_grab(ctxt->loc_lcm);
                         if (llcd == NULL) {
                                 CERROR("couldn't get an llcd - dropped "LPX64
                                        ":%x+%u\n",
@@ -170,7 +170,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                                        cookies->lgc_index);
                                 GOTO(out, rc = -ENOMEM);
                         }
-                        llcd->llcd_ctxt = ctxt;
+                        llcd->llcd_ctxt = llog_ctxt_get(ctxt);
                         ctxt->loc_llcd = llcd;
                 }
 
@@ -184,7 +184,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
 
         if ((llcd->llcd_size - llcd->llcd_cookiebytes) < sizeof(*cookies) ||
             (flags & OBD_LLOG_FL_SENDNOW)) {
-                CDEBUG(D_HA, "send llcd %p:%p\n", llcd, llcd->llcd_ctxt);
+                CDEBUG(D_RPCTRACE, "send llcd %p:%p\n", llcd, llcd->llcd_ctxt);
                 ctxt->loc_llcd = NULL;
                 llcd_send(llcd);
         }
@@ -200,7 +200,7 @@ int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
         ENTRY;
 
         if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
-                CDEBUG(D_HA, "reverse import disconnected, put llcd %p:%p\n",
+                CDEBUG(D_RPCTRACE,"reverse import disconnect, put llcd %p:%p\n",
                        ctxt->loc_llcd, ctxt);
                 mutex_down(&ctxt->loc_sem);
                 if (ctxt->loc_llcd != NULL) {
@@ -217,31 +217,26 @@ int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
 }
 EXPORT_SYMBOL(llog_obd_repl_sync);
 
+static void llog_lcm_dec(struct llog_commit_master *lcm)
+{
+        atomic_dec(&lcm->lcm_thread_total);
+        cfs_waitq_signal(&lcm->lcm_waitq);
+}
+
 static int log_commit_thread(void *arg)
 {
-        struct llog_commit_master *lcm = arg;
-        struct llog_commit_daemon *lcd;
+        struct llog_commit_daemon *lcd = arg;
+        struct llog_commit_master *lcm = lcd->lcd_lcm;
         struct llog_canceld_ctxt *llcd, *n;
         struct obd_import *import = NULL;
         ENTRY;
 
-        OBD_ALLOC(lcd, sizeof(*lcd));
-        if (lcd == NULL)
-                RETURN(-ENOMEM);
-
-        spin_lock(&lcm->lcm_thread_lock);
         THREAD_NAME(cfs_curproc_comm(), CFS_CURPROC_COMM_MAX - 1,
-                    "ll_log_comt_%02d", atomic_read(&lcm->lcm_thread_total));
-        atomic_inc(&lcm->lcm_thread_total);
-        spin_unlock(&lcm->lcm_thread_lock);
-
+                    "ll_log_comt_%02d", lcd->lcd_index);
+        
         ptlrpc_daemonize(cfs_curproc_comm()); /* thread never needs to do IO */
-
-        CFS_INIT_LIST_HEAD(&lcd->lcd_lcm_list);
-        CFS_INIT_LIST_HEAD(&lcd->lcd_llcd_list);
-        lcd->lcd_lcm = lcm;
-
         CDEBUG(D_HA, "%s started\n", cfs_curproc_comm());
+        
         do {
                 struct ptlrpc_request *request;
                 struct list_head *sending_list;
@@ -254,7 +249,7 @@ static int log_commit_thread(void *arg)
                 /* If we do not have enough pages available, allocate some */
                 while (atomic_read(&lcm->lcm_llcd_numfree) <
                        lcm->lcm_llcd_minfree) {
-                        if (llcd_alloc() < 0)
+                        if (llcd_alloc(lcm) < 0)
                                 break;
                 }
 
@@ -291,7 +286,7 @@ static int log_commit_thread(void *arg)
 
                 if (atomic_read(&lcm->lcm_thread_numidle) <= 1 &&
                     atomic_read(&lcm->lcm_thread_total) < lcm->lcm_thread_max) {
-                        rc = llog_start_commit_thread();
+                        rc = llog_start_commit_thread(lcm);
                         if (rc < 0)
                                 CERROR("error starting thread: rc %d\n", rc);
                 }
@@ -335,7 +330,7 @@ static int log_commit_thread(void *arg)
 
                         list_del(&llcd->llcd_list);
                         if (llcd->llcd_cookiebytes == 0) {
-                                CDEBUG(D_HA, "put empty llcd %p:%p\n",
+                                CDEBUG(D_RPCTRACE, "put empty llcd %p:%p\n",
                                        llcd, llcd->llcd_ctxt);
                                 llcd_put(llcd);
                                 continue;
@@ -375,9 +370,10 @@ static int log_commit_thread(void *arg)
                                 break;
                         }
 
-                        /* XXX FIXME bug 249, 5515 */
+                        /* bug 5515 */
                         request->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
                         request->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
+                        ptlrpc_at_set_req_timeout(request);
 
                         ptlrpc_req_set_repsize(request, 1, NULL);
                         mutex_down(&llcd->llcd_ctxt->loc_sem);
@@ -429,36 +425,58 @@ static int log_commit_thread(void *arg)
                         llcd_put(llcd);
         }
 
-        spin_lock(&lcm->lcm_thread_lock);
-        list_del(&lcd->lcd_lcm_list);
-        spin_unlock(&lcm->lcm_thread_lock);
-        OBD_FREE(lcd, sizeof(*lcd));
 
         CDEBUG(D_HA, "%s exiting\n", cfs_curproc_comm());
 
         spin_lock(&lcm->lcm_thread_lock);
-        atomic_dec(&lcm->lcm_thread_total);
+        list_del(&lcd->lcd_lcm_list);
         spin_unlock(&lcm->lcm_thread_lock);
-        cfs_waitq_signal(&lcm->lcm_waitq);
+        OBD_FREE_PTR(lcd);
+        llog_lcm_dec(lcm);
 
-        return 0;
+        RETURN(0);
 }
 
-int llog_start_commit_thread(void)
+int llog_start_commit_thread(struct llog_commit_master *lcm)
 {
-        int rc;
+        struct llog_commit_daemon *lcd;
+        int rc, index; 
         ENTRY;
 
         if (atomic_read(&lcm->lcm_thread_total) >= lcm->lcm_thread_max)
                 RETURN(0);
 
-        rc = cfs_kernel_thread(log_commit_thread, lcm, CLONE_VM | CLONE_FILES);
+        /* Check whether it will be cleanup llog commit thread first,
+         * If not, increate the lcm_thread_total count to prevent the 
+         * lcm being freed when the log_commit_thread is started */
+        spin_lock(&lcm->lcm_thread_lock);
+        if (!lcm->lcm_flags & LLOG_LCM_FL_EXIT) { 
+                atomic_inc(&lcm->lcm_thread_total);
+                index = atomic_read(&lcm->lcm_thread_total);
+                spin_unlock(&lcm->lcm_thread_lock);
+        } else {
+                spin_unlock(&lcm->lcm_thread_lock);
+                RETURN(0);
+        }
+
+        OBD_ALLOC_PTR(lcd);
+        if (lcd == NULL)
+                GOTO(cleanup, rc = -ENOMEM);
+
+        CFS_INIT_LIST_HEAD(&lcd->lcd_lcm_list);
+        CFS_INIT_LIST_HEAD(&lcd->lcd_llcd_list);
+        lcd->lcd_index = index;
+        lcd->lcd_lcm = lcm;
+
+        rc = cfs_kernel_thread(log_commit_thread, lcd, CLONE_VM | CLONE_FILES);
+cleanup:
         if (rc < 0) {
-                CERROR("error starting thread #%d: %d\n",
-                       atomic_read(&lcm->lcm_thread_total), rc);
+                CERROR("error starting thread #%d: %d\n", lcd->lcd_index, rc);
+                llog_lcm_dec(lcm);
+                if (lcd) 
+                        OBD_FREE_PTR(lcd);
                 RETURN(rc);
         }
-
         RETURN(0);
 }
 EXPORT_SYMBOL(llog_start_commit_thread);
@@ -470,7 +488,7 @@ static struct llog_process_args {
         void                    *llpa_arg;
 } llpa;
 
-int llog_init_commit_master(void)
+int llog_init_commit_master(struct llog_commit_master *lcm)
 {
         CFS_INIT_LIST_HEAD(&lcm->lcm_thread_busy);
         CFS_INIT_LIST_HEAD(&lcm->lcm_thread_idle);
@@ -488,18 +506,25 @@ int llog_init_commit_master(void)
         sema_init(&llpa.llpa_sem, 1);
         return 0;
 }
+EXPORT_SYMBOL(llog_init_commit_master);
 
-int llog_cleanup_commit_master(int force)
+int llog_cleanup_commit_master(struct llog_commit_master *lcm,
+                               int force)
 {
+        spin_lock(&lcm->lcm_thread_lock);
         lcm->lcm_flags |= LLOG_LCM_FL_EXIT;
         if (force)
                 lcm->lcm_flags |= LLOG_LCM_FL_EXIT_FORCE;
+        
+        spin_unlock(&lcm->lcm_thread_lock);
+        
         cfs_waitq_signal(&lcm->lcm_waitq);
 
         wait_event_interruptible(lcm->lcm_waitq,
                                  atomic_read(&lcm->lcm_thread_total) == 0);
         return 0;
 }
+EXPORT_SYMBOL(llog_cleanup_commit_master);
 
 static int log_process_thread(void *args)
 {
@@ -517,12 +542,12 @@ static int log_process_thread(void *args)
         rc = llog_create(ctxt, &llh, &logid, NULL);
         if (rc) {
                 CERROR("llog_create failed %d\n", rc);
-                RETURN(rc);
+                GOTO(out, rc);
         }
         rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL);
         if (rc) {
                 CERROR("llog_init_handle failed %d\n", rc);
-                GOTO(out, rc);
+                GOTO(release_llh, rc);
         }
 
         if (cb) {
@@ -536,24 +561,33 @@ static int log_process_thread(void *args)
         CDEBUG(D_HA, "send llcd %p:%p forcibly after recovery\n",
                ctxt->loc_llcd, ctxt);
         llog_sync(ctxt, NULL);
-out:
+
+release_llh:
         rc = llog_cat_put(llh);
         if (rc)
                 CERROR("llog_cat_put failed %d\n", rc);
-
+out:
+        llog_ctxt_put(ctxt);
         RETURN(rc);
 }
 
 static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle,void *arg)
 {
+        struct obd_device *obd = ctxt->loc_obd;
         int rc;
         ENTRY;
 
+        if (obd->obd_stopping)
+                RETURN(-ENODEV);
+
         mutex_down(&llpa.llpa_sem);
-        llpa.llpa_ctxt = ctxt;
         llpa.llpa_cb = handle;
         llpa.llpa_arg = arg;
-
+        llpa.llpa_ctxt = llog_get_context(ctxt->loc_obd, ctxt->loc_idx);
+        if (!llpa.llpa_ctxt) {
+                mutex_up(&llpa.llpa_sem);
+                RETURN(-ENODEV);
+        }
         rc = cfs_kernel_thread(log_process_thread, &llpa, CLONE_VM | CLONE_FILES);
         if (rc < 0)
                 CERROR("error starting log_process_thread: %d\n", rc);
@@ -581,13 +615,13 @@ int llog_repl_connect(struct llog_ctxt *ctxt, int count,
 
         mutex_down(&ctxt->loc_sem);
         ctxt->loc_gen = *gen;
-        llcd = llcd_grab();
+        llcd = llcd_grab(ctxt->loc_lcm);
         if (llcd == NULL) {
                 CERROR("couldn't get an llcd\n");
                 mutex_up(&ctxt->loc_sem);
                 RETURN(-ENOMEM);
         }
-        llcd->llcd_ctxt = ctxt;
+        llcd->llcd_ctxt = llog_ctxt_get(ctxt);
         ctxt->loc_llcd = llcd;
         mutex_up(&ctxt->loc_sem);