Whamcloud - gitweb
branch: HEAD
[fs/lustre-release.git] / lustre / ptlrpc / recov_thread.c
index 6fe00c8..eac5128 100644 (file)
@@ -42,7 +42,6 @@
 # include <liblustre.h>
 #endif
 
-#include <libcfs/kp30.h>
 #include <obd_class.h>
 #include <lustre_commit_confd.h>
 #include <obd_support.h>
 
 #ifdef __KERNEL__
 
-static struct llog_commit_master lustre_lcm;
-static struct llog_commit_master *lcm = &lustre_lcm;
-
 /* Allocate new commit structs in case we do not have enough.
  * Make the llcd size small enough that it fits into a single page when we
  * are sending/receiving it. */
-static int llcd_alloc(void)
+static int llcd_alloc(struct llog_commit_master *lcm)
 {
         struct llog_canceld_ctxt *llcd;
         int llcd_size;
@@ -85,7 +81,7 @@ static int llcd_alloc(void)
 }
 
 /* Get a free cookie struct from the list */
-struct llog_canceld_ctxt *llcd_grab(void)
+static struct llog_canceld_ctxt *llcd_grab(struct llog_commit_master *lcm)
 {
         struct llog_canceld_ctxt *llcd;
 
@@ -93,7 +89,7 @@ repeat:
         spin_lock(&lcm->lcm_llcd_lock);
         if (list_empty(&lcm->lcm_llcd_free)) {
                 spin_unlock(&lcm->lcm_llcd_lock);
-                if (llcd_alloc() < 0) {
+                if (llcd_alloc(lcm) < 0) {
                         CERROR("unable to allocate log commit data!\n");
                         return NULL;
                 }
@@ -110,10 +106,12 @@ repeat:
 
         return llcd;
 }
-EXPORT_SYMBOL(llcd_grab);
 
 static void llcd_put(struct llog_canceld_ctxt *llcd)
 {
+        struct llog_commit_master *lcm = llcd->llcd_lcm;
+
+        llog_ctxt_put(llcd->llcd_ctxt);
         if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) {
                 int llcd_size = llcd->llcd_size +
                          offsetof(struct llog_canceld_ctxt, llcd_cookies);
@@ -127,15 +125,54 @@ static void llcd_put(struct llog_canceld_ctxt *llcd)
 }
 
 /* Send some cookies to the appropriate target */
-void llcd_send(struct llog_canceld_ctxt *llcd)
+static void llcd_send(struct llog_canceld_ctxt *llcd)
 {
-        spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
-        list_add_tail(&llcd->llcd_list, &llcd->llcd_lcm->lcm_llcd_pending);
-        spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);
-
+        if (!(llcd->llcd_lcm->lcm_flags & LLOG_LCM_FL_EXIT)) {
+                spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
+                list_add_tail(&llcd->llcd_list,
+                              &llcd->llcd_lcm->lcm_llcd_pending);
+                spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);
+        }
         cfs_waitq_signal_nr(&llcd->llcd_lcm->lcm_waitq, 1);
 }
-EXPORT_SYMBOL(llcd_send);
+
+/**
+ * Grab llcd and assign it to passed @ctxt. Also set up backward link
+ * and get ref on @ctxt.
+ */
+static struct llog_canceld_ctxt *ctxt_llcd_grab(struct llog_ctxt *ctxt)
+{
+        struct llog_canceld_ctxt *llcd;
+
+        LASSERT_SEM_LOCKED(&ctxt->loc_sem);
+        llcd = llcd_grab(ctxt->loc_lcm);
+        if (llcd == NULL)
+                return NULL;
+
+        llcd->llcd_ctxt = llog_ctxt_get(ctxt);
+        ctxt->loc_llcd = llcd;
+
+        CDEBUG(D_RPCTRACE,"grab llcd %p:%p\n", ctxt->loc_llcd, ctxt);
+        return llcd;
+}
+
+/**
+ * Put llcd in passed @ctxt. Set ->loc_llcd to NULL.
+ */
+static void ctxt_llcd_put(struct llog_ctxt *ctxt)
+{
+        mutex_down(&ctxt->loc_sem);
+        if (ctxt->loc_llcd != NULL) {
+                CDEBUG(D_RPCTRACE,"put llcd %p:%p\n", ctxt->loc_llcd, ctxt);
+                llcd_put(ctxt->loc_llcd);
+                ctxt->loc_llcd = NULL;
+        }
+        if (ctxt->loc_imp) {
+                class_import_put(ctxt->loc_imp);
+                ctxt->loc_imp = NULL;
+        }
+        mutex_up(&ctxt->loc_sem);
+}
 
 /* deleted objects have a commit callback that cancels the MDS
  * log record for the deletion.  The commit callback calls this
@@ -152,16 +189,16 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
         LASSERT(ctxt);
 
         mutex_down(&ctxt->loc_sem);
+        llcd = ctxt->loc_llcd;
+
         if (ctxt->loc_imp == NULL) {
-                CDEBUG(D_HA, "no import for ctxt %p\n", ctxt);
+                CDEBUG(D_RPCTRACE, "no import for ctxt %p\n", ctxt);
                 GOTO(out, rc = 0);
         }
 
-        llcd = ctxt->loc_llcd;
-
         if (count > 0 && cookies != NULL) {
                 if (llcd == NULL) {
-                        llcd = llcd_grab();
+                        llcd = ctxt_llcd_grab(ctxt);
                         if (llcd == NULL) {
                                 CERROR("couldn't get an llcd - dropped "LPX64
                                        ":%x+%u\n",
@@ -170,8 +207,6 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                                        cookies->lgc_index);
                                 GOTO(out, rc = -ENOMEM);
                         }
-                        llcd->llcd_ctxt = ctxt;
-                        ctxt->loc_llcd = llcd;
                 }
 
                 memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes, 
@@ -184,7 +219,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
 
         if ((llcd->llcd_size - llcd->llcd_cookiebytes) < sizeof(*cookies) ||
             (flags & OBD_LLOG_FL_SENDNOW)) {
-                CDEBUG(D_HA, "send llcd %p:%p\n", llcd, llcd->llcd_ctxt);
+                CDEBUG(D_RPCTRACE, "send llcd %p:%p\n", llcd, llcd->llcd_ctxt);
                 ctxt->loc_llcd = NULL;
                 llcd_send(llcd);
         }
@@ -200,23 +235,35 @@ int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
         ENTRY;
 
         if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
-                CDEBUG(D_HA, "reverse import disconnected, put llcd %p:%p\n",
-                       ctxt->loc_llcd, ctxt);
-                mutex_down(&ctxt->loc_sem);
-                if (ctxt->loc_llcd != NULL) {
-                        llcd_put(ctxt->loc_llcd);
-                        ctxt->loc_llcd = NULL;
-                }
-                ctxt->loc_imp = NULL;
-                mutex_up(&ctxt->loc_sem);
+                CDEBUG(D_RPCTRACE,"reverse import disconnect\n");
+                /* 
+                 * We put llcd because it is not going to sending list and
+                 * thus, its refc will not be handled. We will handle it here.
+                 */
+                ctxt_llcd_put(ctxt);
         } else {
+                /* 
+                 * Sending cancel. This means that ctxt->loc_llcd wil be
+                 * put on sending list in llog_obd_repl_cancel() and in
+                 * this case recovery thread will take care of it refc.
+                 */
                 rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);
         }
-
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_obd_repl_sync);
 
+static inline void stop_log_commit(struct llog_commit_master *lcm,
+                                   struct llog_commit_daemon *lcd,
+                                   int rc)
+{
+        CERROR("error preparing commit: rc %d\n", rc);
+
+        spin_lock(&lcm->lcm_llcd_lock);
+        list_splice_init(&lcd->lcd_llcd_list, &lcm->lcm_llcd_resend);
+        spin_unlock(&lcm->lcm_llcd_lock);
+}
+
 static int log_commit_thread(void *arg)
 {
         struct llog_commit_master *lcm = arg;
@@ -254,7 +301,7 @@ static int log_commit_thread(void *arg)
                 /* If we do not have enough pages available, allocate some */
                 while (atomic_read(&lcm->lcm_llcd_numfree) <
                        lcm->lcm_llcd_minfree) {
-                        if (llcd_alloc() < 0)
+                        if (llcd_alloc(lcm) < 0)
                                 break;
                 }
 
@@ -291,7 +338,7 @@ static int log_commit_thread(void *arg)
 
                 if (atomic_read(&lcm->lcm_thread_numidle) <= 1 &&
                     atomic_read(&lcm->lcm_thread_total) < lcm->lcm_thread_max) {
-                        rc = llog_start_commit_thread();
+                        rc = llog_start_commit_thread(lcm);
                         if (rc < 0)
                                 CERROR("error starting thread: rc %d\n", rc);
                 }
@@ -329,13 +376,11 @@ static int log_commit_thread(void *arg)
 
                 /* We are the only one manipulating our local list - no lock */
                 list_for_each_entry_safe(llcd,n, &lcd->lcd_llcd_list,llcd_list){
-                        int size[2] = { sizeof(struct ptlrpc_body),
-                                        llcd->llcd_cookiebytes };
                         char *bufs[2] = { NULL, (char *)llcd->llcd_cookies };
 
                         list_del(&llcd->llcd_list);
                         if (llcd->llcd_cookiebytes == 0) {
-                                CDEBUG(D_HA, "put empty llcd %p:%p\n",
+                                CDEBUG(D_RPCTRACE, "put empty llcd %p:%p\n",
                                        llcd, llcd->llcd_ctxt);
                                 llcd_put(llcd);
                                 continue;
@@ -361,25 +406,32 @@ static int log_commit_thread(void *arg)
 
                         OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_RECOV, 10);
 
-                        request = ptlrpc_prep_req(import, LUSTRE_LOG_VERSION,
-                                                  OBD_LOG_CANCEL, 2, size,bufs);
+                        request = ptlrpc_request_alloc(import, &RQF_LOG_CANCEL);
                         if (request == NULL) {
                                 rc = -ENOMEM;
-                                CERROR("error preparing commit: rc %d\n", rc);
+                                stop_log_commit(lcm, lcd, rc);
+                                break;
+                        }
+
+                        req_capsule_set_size(&request->rq_pill, &RMF_LOGCOOKIES,
+                                             RCL_CLIENT,llcd->llcd_cookiebytes);
 
-                                spin_lock(&lcm->lcm_llcd_lock);
-                                list_splice(&lcd->lcd_llcd_list,
-                                            &lcm->lcm_llcd_resend);
-                                CFS_INIT_LIST_HEAD(&lcd->lcd_llcd_list);
-                                spin_unlock(&lcm->lcm_llcd_lock);
+                        rc = ptlrpc_request_bufs_pack(request,
+                                                      LUSTRE_LOG_VERSION,
+                                                      OBD_LOG_CANCEL, bufs,
+                                                      NULL);
+                        if (rc) {
+                                ptlrpc_request_free(request);
+                                stop_log_commit(lcm, lcd, rc);
                                 break;
                         }
 
-                        /* XXX FIXME bug 249, 5515 */
+                        /* bug 5515 */
                         request->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
                         request->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
+                        ptlrpc_at_set_req_timeout(request);
 
-                        ptlrpc_req_set_repsize(request, 1, NULL);
+                        ptlrpc_request_set_replen(request);
                         mutex_down(&llcd->llcd_ctxt->loc_sem);
                         if (llcd->llcd_ctxt->loc_imp == NULL) {
                                 mutex_up(&llcd->llcd_ctxt->loc_sem);
@@ -420,9 +472,9 @@ static int log_commit_thread(void *arg)
         /* If we are force exiting, just drop all of the cookies. */
         if (lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE) {
                 spin_lock(&lcm->lcm_llcd_lock);
-                list_splice(&lcm->lcm_llcd_pending, &lcd->lcd_llcd_list);
-                list_splice(&lcm->lcm_llcd_resend, &lcd->lcd_llcd_list);
-                list_splice(&lcm->lcm_llcd_free, &lcd->lcd_llcd_list);
+                list_splice_init(&lcm->lcm_llcd_pending, &lcd->lcd_llcd_list);
+                list_splice_init(&lcm->lcm_llcd_resend, &lcd->lcd_llcd_list);
+                list_splice_init(&lcm->lcm_llcd_free, &lcd->lcd_llcd_list);
                 spin_unlock(&lcm->lcm_llcd_lock);
 
                 list_for_each_entry_safe(llcd, n, &lcd->lcd_llcd_list,llcd_list)
@@ -444,7 +496,7 @@ static int log_commit_thread(void *arg)
         return 0;
 }
 
-int llog_start_commit_thread(void)
+int llog_start_commit_thread(struct llog_commit_master *lcm)
 {
         int rc;
         ENTRY;
@@ -470,7 +522,7 @@ static struct llog_process_args {
         void                    *llpa_arg;
 } llpa;
 
-int llog_init_commit_master(void)
+int llog_init_commit_master(struct llog_commit_master *lcm)
 {
         CFS_INIT_LIST_HEAD(&lcm->lcm_thread_busy);
         CFS_INIT_LIST_HEAD(&lcm->lcm_thread_idle);
@@ -488,8 +540,10 @@ int llog_init_commit_master(void)
         sema_init(&llpa.llpa_sem, 1);
         return 0;
 }
+EXPORT_SYMBOL(llog_init_commit_master);
 
-int llog_cleanup_commit_master(int force)
+int llog_cleanup_commit_master(struct llog_commit_master *lcm,
+                               int force)
 {
         lcm->lcm_flags |= LLOG_LCM_FL_EXIT;
         if (force)
@@ -500,6 +554,7 @@ int llog_cleanup_commit_master(int force)
                                  atomic_read(&lcm->lcm_thread_total) == 0);
         return 0;
 }
+EXPORT_SYMBOL(llog_cleanup_commit_master);
 
 static int log_process_thread(void *args)
 {
@@ -517,12 +572,12 @@ static int log_process_thread(void *args)
         rc = llog_create(ctxt, &llh, &logid, NULL);
         if (rc) {
                 CERROR("llog_create failed %d\n", rc);
-                RETURN(rc);
+                GOTO(out, rc);
         }
         rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL);
         if (rc) {
                 CERROR("llog_init_handle failed %d\n", rc);
-                GOTO(out, rc);
+                GOTO(release_llh, rc);
         }
 
         if (cb) {
@@ -536,28 +591,38 @@ static int log_process_thread(void *args)
         CDEBUG(D_HA, "send llcd %p:%p forcibly after recovery\n",
                ctxt->loc_llcd, ctxt);
         llog_sync(ctxt, NULL);
-out:
+
+release_llh:
         rc = llog_cat_put(llh);
         if (rc)
                 CERROR("llog_cat_put failed %d\n", rc);
-
+out:
+        llog_ctxt_put(ctxt);
         RETURN(rc);
 }
 
 static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle,void *arg)
 {
+        struct obd_device *obd = ctxt->loc_obd;
         int rc;
         ENTRY;
 
+        if (obd->obd_stopping)
+                RETURN(-ENODEV);
+
         mutex_down(&llpa.llpa_sem);
-        llpa.llpa_ctxt = ctxt;
         llpa.llpa_cb = handle;
         llpa.llpa_arg = arg;
-
+        llpa.llpa_ctxt = llog_ctxt_get(ctxt);
+        if (!llpa.llpa_ctxt) {
+                up(&llpa.llpa_sem);
+                RETURN(-ENODEV);
+        }
         rc = cfs_kernel_thread(log_process_thread, &llpa, CLONE_VM | CLONE_FILES);
-        if (rc < 0)
+        if (rc < 0) {
+                llog_ctxt_put(ctxt);
                 CERROR("error starting log_process_thread: %d\n", rc);
-        else {
+        else {
                 CDEBUG(D_HA, "log_process_thread: %d\n", rc);
                 rc = 0;
         }
@@ -581,20 +646,19 @@ int llog_repl_connect(struct llog_ctxt *ctxt, int count,
 
         mutex_down(&ctxt->loc_sem);
         ctxt->loc_gen = *gen;
-        llcd = llcd_grab();
+        llcd = ctxt_llcd_grab(ctxt);
         if (llcd == NULL) {
                 CERROR("couldn't get an llcd\n");
                 mutex_up(&ctxt->loc_sem);
                 RETURN(-ENOMEM);
         }
-        llcd->llcd_ctxt = ctxt;
-        ctxt->loc_llcd = llcd;
         mutex_up(&ctxt->loc_sem);
 
         rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid);
-        if (rc != 0)
+        if (rc != 0) {
+                ctxt_llcd_put(ctxt);
                 CERROR("error recovery process: %d\n", rc);
-
+        }
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_repl_connect);