Whamcloud - gitweb
For the patchless client, LNET needs to come from it's own branch too.
[fs/lustre-release.git] / lustre / ptlrpc / recov_thread.c
index 3d66eae..8b386ee 100644 (file)
 #ifdef __KERNEL__
 #include <linux/fs.h>
 #else
-# include <portals/list.h>
+# include <libcfs/list.h>
 # include <liblustre.h>
 #endif
 
-#include <linux/kp30.h>
+#include <libcfs/kp30.h>
 #include <linux/obd_class.h>
 #include <linux/lustre_commit_confd.h>
 #include <linux/obd_support.h>
 #include <linux/obd_class.h>
 #include <linux/lustre_net.h>
 #include <portals/types.h>
-#include <portals/list.h>
+#include <libcfs/list.h>
 #include <linux/lustre_log.h>
 #include "ptlrpc_internal.h"
 
@@ -133,49 +133,41 @@ EXPORT_SYMBOL(llcd_send);
  * log record for the deletion.  The commit callback calls this
  * function
  */
-int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
-                         struct lov_stripe_md *lsm, int count,
-                         struct llog_cookie *cookies, int flags)
+int llog_obd_repl_cancel(struct llog_ctxt *ctxt, int count,
+                         struct llog_cookie *cookies, int flags, void *data)
 {
         struct llog_canceld_ctxt *llcd;
         int rc = 0;
         ENTRY;
 
         LASSERT(ctxt);
-
+        down(&ctxt->loc_sem);
         if (ctxt->loc_imp == NULL) {
                 CWARN("no import for ctxt %p\n", ctxt);
-                RETURN(0);
-        }
-
-        if (count == 0 || cookies == NULL) {
-                down(&ctxt->loc_sem);
-                if (ctxt->loc_llcd == NULL || !(flags & OBD_LLOG_FL_SENDNOW))
-                        GOTO(out, rc);
-
-                llcd = ctxt->loc_llcd;
-                GOTO(send_now, rc);
+                GOTO(out, rc = 0);
         }
 
-        down(&ctxt->loc_sem);
         llcd = ctxt->loc_llcd;
-        if (llcd == NULL) {
-                llcd = llcd_grab();
-                if (llcd == NULL) {
-                        CERROR("couldn't get an llcd - dropped "LPX64":%x+%u\n",
-                               cookies->lgc_lgl.lgl_oid,
-                               cookies->lgc_lgl.lgl_ogen, cookies->lgc_index);
-                        GOTO(out, rc = -ENOMEM);
+        if (count > 0 && cookies != NULL) {
+                if (llcd == NULL) {      
+                        llcd = llcd_grab();
+                        if (llcd == NULL) {
+                                CERROR("couldn't get an llcd - dropped "LPX64":%x+%u\n",
+                                       cookies->lgc_lgl.lgl_oid,
+                                       cookies->lgc_lgl.lgl_ogen, cookies->lgc_index);
+                                GOTO(out, rc = -ENOMEM);
+                        }
+                        llcd->llcd_ctxt = ctxt;
+                        ctxt->loc_llcd = llcd;
                 }
-                llcd->llcd_ctxt = ctxt;
-                ctxt->loc_llcd = llcd;
+                memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes,
+                       cookies, sizeof(*cookies));
+                llcd->llcd_cookiebytes += sizeof(*cookies);
+        } else {
+                if (llcd == NULL || !(flags & OBD_LLOG_FL_SENDNOW))
+                        GOTO(out, rc);
         }
 
-        memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes, cookies,
-               sizeof(*cookies));
-        llcd->llcd_cookiebytes += sizeof(*cookies);
-
-send_now:
         if ((LLCD_SIZE - llcd->llcd_cookiebytes < sizeof(*cookies) ||
              flags & OBD_LLOG_FL_SENDNOW)) {
                 CDEBUG(D_HA, "send llcd %p:%p\n", llcd, llcd->llcd_ctxt);
@@ -196,15 +188,15 @@ int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
         if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
                 down(&ctxt->loc_sem);
                 if (ctxt->loc_llcd != NULL) {
-                        CWARN("import will be destroyed, put "
-                              "llcd %p:%p\n", ctxt->loc_llcd, ctxt);
                         llcd_put(ctxt->loc_llcd);
                         ctxt->loc_llcd = NULL;
-                        ctxt->loc_imp = NULL;
                 }
+                CWARN("reverse import disconnected, put "
+                      "llcd %p:%p\n", ctxt->loc_llcd, ctxt);
+                ctxt->loc_imp = NULL;
                 up(&ctxt->loc_sem);
         } else {
-                rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);
+                rc = llog_cancel(ctxt, 0, NULL, OBD_LLOG_FL_SENDNOW, NULL);
         }
 
         RETURN(rc);
@@ -232,8 +224,9 @@ static int log_commit_thread(void *arg)
         SIGNAL_MASK_UNLOCK(current, flags);
 
         spin_lock(&lcm->lcm_thread_lock);
-        THREAD_NAME(current->comm, "ll_log_commit_%d",
-                    atomic_read(&lcm->lcm_thread_total));
+
+        THREAD_NAME(current->comm, sizeof(current->comm) - 1,
+                    "ll_log_comt_%02d", atomic_read(&lcm->lcm_thread_total));
         atomic_inc(&lcm->lcm_thread_total);
         spin_unlock(&lcm->lcm_thread_lock);
         unlock_kernel();
@@ -341,10 +334,11 @@ static int log_commit_thread(void *arg)
                                 continue;
                         }
 
-                        request = ptlrpc_prep_req(import, OBD_LOG_CANCEL, 1,
+                        up(&llcd->llcd_ctxt->loc_sem);
+                        request = ptlrpc_prep_req(import, LUSTRE_LOG_VERSION,
+                                                  OBD_LOG_CANCEL, 1,
                                                   &llcd->llcd_cookiebytes,
                                                   bufs);
-                        up(&llcd->llcd_ctxt->loc_sem);
 
                         if (request == NULL) {
                                 rc = -ENOMEM;
@@ -368,9 +362,9 @@ static int log_commit_thread(void *arg)
                                 ptlrpc_req_finished(request);
                                 continue;
                         }
+                        up(&llcd->llcd_ctxt->loc_sem);
                         rc = ptlrpc_queue_wait(request);
                         ptlrpc_req_finished(request);
-                        up(&llcd->llcd_ctxt->loc_sem);
 
                         /* If the RPC failed, we put this and the remaining
                          * messages onto the resend list for another time. */
@@ -492,6 +486,7 @@ int llog_cleanup_commit_master(int force)
                                  atomic_read(&lcm->lcm_thread_total) == 0);
         return 0;
 }
+EXPORT_SYMBOL(llog_cleanup_commit_master);
 
 static int log_process_thread(void *args)
 {
@@ -508,13 +503,14 @@ static int log_process_thread(void *args)
         lock_kernel();
         ptlrpc_daemonize(); /* thread never needs to do IO */
 
+        THREAD_NAME(current->comm, sizeof(current->comm) - 1, "llog_process");
         SIGNAL_MASK_LOCK(current, flags);
         sigfillset(&current->blocked);
         RECALC_SIGPENDING;
         SIGNAL_MASK_UNLOCK(current, flags);
         unlock_kernel();
 
-        rc = llog_create(ctxt, &llh, &logid, NULL);
+        rc = llog_open(ctxt, &llh, &logid, NULL, 0);
         if (rc) {
                 CERROR("llog_create failed %d\n", rc);
                 RETURN(rc);
@@ -601,9 +597,8 @@ EXPORT_SYMBOL(llog_repl_connect);
 
 #else /* !__KERNEL__ */
 
-int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
-                         struct lov_stripe_md *lsm, int count,
-                         struct llog_cookie *cookies, int flags)
+int llog_obd_repl_cancel(struct llog_ctxt *ctxt, int count,
+                         struct llog_cookie *cookies, int flags, void *data)
 {
         return 0;
 }