Whamcloud - gitweb
b=17686
authoryury <yury>
Wed, 19 Nov 2008 14:18:37 +0000 (14:18 +0000)
committeryury <yury>
Wed, 19 Nov 2008 14:18:37 +0000 (14:18 +0000)
r=shadow,panda

- fixes race in ptlrpcd caused busy obd and inability to cleanup;
- cleanups and debugs in llcd code.

lustre/include/lustre_net.h
lustre/obdclass/genops.c
lustre/ptlrpc/client.c
lustre/ptlrpc/ptlrpcd.c
lustre/ptlrpc/recov_thread.c

index 1bb9ac2..9d7f95b 100644 (file)
@@ -751,7 +751,7 @@ struct ptlrpc_service {
 
 struct ptlrpcd_ctl {
         /**
-         * Ptlrpc thread control flags (LIOD_START, LIOD_STOP, LIOD_STOP_FORCE)
+         * Ptlrpc thread control flags (LIOD_START, LIOD_STOP, LIOD_FORCE)
          */
         unsigned long               pc_flags;
         /**
@@ -810,10 +810,11 @@ enum ptlrpcd_ctl_flags {
          */
         LIOD_STOP        = 1 << 1,
         /**
-         * Ptlrpc thread stop force flag. This will cause also
-         * aborting any inflight rpcs handled by thread.
+         * Ptlrpc thread force flag (only stop force so far).
+         * This will cause aborting any inflight rpcs handled
+         * by thread if LIOD_STOP is specified.
          */
-        LIOD_STOP_FORCE  = 1 << 2,
+        LIOD_FORCE       = 1 << 2,
         /**
          * This is a recovery ptlrpc thread.
          */
index 854b5c6..edd6dfb 100644 (file)
@@ -813,8 +813,9 @@ struct obd_import *class_import_get(struct obd_import *import)
         LASSERT(atomic_read(&import->imp_refcount) >= 0);
         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
         atomic_inc(&import->imp_refcount);
-        CDEBUG(D_INFO, "import %p refcount=%d\n", import,
-               atomic_read(&import->imp_refcount));
+        CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
+               atomic_read(&import->imp_refcount), 
+               import->imp_obd->obd_name);
         return import;
 }
 EXPORT_SYMBOL(class_import_get);
@@ -827,13 +828,12 @@ void class_import_put(struct obd_import *import)
         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
         LASSERT(list_empty(&import->imp_zombie_chain));
 
-        CDEBUG(D_INFO, "import %p refcount=%d\n", import,
-               atomic_read(&import->imp_refcount) - 1);
+        CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
+               atomic_read(&import->imp_refcount) - 1, 
+               import->imp_obd->obd_name);
 
         if (atomic_dec_and_test(&import->imp_refcount)) {
-
                 CDEBUG(D_INFO, "final put import %p\n", import);
-
                 spin_lock(&obd_zombie_impexp_lock);
                 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
                 spin_unlock(&obd_zombie_impexp_lock);
index 165dd29..000904c 100644 (file)
@@ -2445,24 +2445,24 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
 
 void ptlrpc_abort_set(struct ptlrpc_request_set *set)
 {
-        struct list_head *tmp, *n;
+        struct list_head *tmp, *pos;
 
         LASSERT(set != NULL);
 
-        list_for_each_safe(tmp, n, &set->set_requests) {
+        list_for_each_safe(pos, tmp, &set->set_requests) {
                 struct ptlrpc_request *req =
-                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
+                        list_entry(pos, struct ptlrpc_request, rq_set_chain);
 
-                spin_lock (&req->rq_lock);
+                spin_lock(&req->rq_lock);
                 if (req->rq_phase != RQ_PHASE_RPC) {
-                        spin_unlock (&req->rq_lock);
+                        spin_unlock(&req->rq_lock);
                         continue;
                 }
 
                 req->rq_err = 1;
                 req->rq_status = -EINTR;
                 ptlrpc_client_wake_req(req);
-                spin_unlock (&req->rq_lock);
+                spin_unlock(&req->rq_lock);
         }
 }
 
index 30c2823..a52e62b 100644 (file)
@@ -146,9 +146,6 @@ static int ptlrpcd_check(const struct lu_env *env, struct ptlrpcd_ctl *pc)
         int rc = 0;
         ENTRY;
 
-        if (test_bit(LIOD_STOP, &pc->pc_flags))
-                RETURN(1);
-
         spin_lock(&pc->pc_set->set_new_req_lock);
         list_for_each_safe(pos, tmp, &pc->pc_set->set_new_requests) {
                 req = list_entry(pos, struct ptlrpc_request, rq_set_chain);
@@ -202,7 +199,7 @@ static int ptlrpcd(void *arg)
 {
         struct ptlrpcd_ctl *pc = arg;
         struct lu_env env = { .le_ses = NULL };
-        int rc;
+        int rc, exit = 0;
         ENTRY;
 
         rc = cfs_daemonize_ctxt(pc->pc_name);
@@ -221,13 +218,14 @@ static int ptlrpcd(void *arg)
         if (rc != 0)
                 RETURN(rc);
         env.le_ctx.lc_cookie = 0x7;
+
         /*
          * This mainloop strongly resembles ptlrpc_set_wait() except that our
          * set never completes.  ptlrpcd_check() calls ptlrpc_check_set() when
          * there are requests in the set. New requests come in on the set's
          * new_req_list and ptlrpcd_check() moves them into the set.
          */
-        while (1) {
+        do {
                 struct l_wait_info lwi;
                 int timeout;
 
@@ -259,12 +257,17 @@ static int ptlrpcd(void *arg)
                 /*
                  * Abort inflight rpcs for forced stop case.
                  */
-                if (test_bit(LIOD_STOP_FORCE, &pc->pc_flags))
-                        ptlrpc_abort_set(pc->pc_set);
+                if (test_bit(LIOD_STOP, &pc->pc_flags)) {
+                        if (test_bit(LIOD_FORCE, &pc->pc_flags))
+                                ptlrpc_abort_set(pc->pc_set);
+                        exit++;
+                }
 
-                if (test_bit(LIOD_STOP, &pc->pc_flags))
-                        break;
-        }
+                /* 
+                 * Let's make one more loop to make sure that ptlrpcd_check()
+                 * copied all raced new rpcs into the set so we can kill them.
+                 */
+        } while (exit < 2);
 
         /*
          * Wait for inflight requests to drain.
@@ -276,6 +279,7 @@ static int ptlrpcd(void *arg)
 
         clear_bit(LIOD_START, &pc->pc_flags);
         clear_bit(LIOD_STOP, &pc->pc_flags);
+        clear_bit(LIOD_FORCE, &pc->pc_flags);
         return 0;
 }
 
@@ -382,7 +386,7 @@ void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force)
 
         set_bit(LIOD_STOP, &pc->pc_flags);
         if (force)
-                set_bit(LIOD_STOP_FORCE, &pc->pc_flags);
+                set_bit(LIOD_FORCE, &pc->pc_flags);
         cfs_waitq_signal(&pc->pc_set->set_waitq);
 #ifdef __KERNEL__
         wait_for_completion(&pc->pc_finishing);
index 6fa95a7..1cb65fd 100644 (file)
@@ -76,15 +76,27 @@ enum {
         LLOG_LCM_FL_EXIT        = 1 << 1
 };
 
+static void llcd_print(struct llog_canceld_ctxt *llcd, 
+                       const char *func, int line) 
+{
+        CDEBUG(D_RPCTRACE, "Llcd (%p) at %s:%d:\n", llcd, func, line);
+        CDEBUG(D_RPCTRACE, "  size: %d\n", llcd->llcd_size);
+        CDEBUG(D_RPCTRACE, "  ctxt: %p\n", llcd->llcd_ctxt);
+        CDEBUG(D_RPCTRACE, "  lcm : %p\n", llcd->llcd_lcm);
+        CDEBUG(D_RPCTRACE, "  cookiebytes : %d\n", llcd->llcd_cookiebytes);
+}
+
 /**
  * Allocate new llcd from cache, init it and return to caller.
  * Bumps number of objects allocated.
  */
-static struct llog_canceld_ctxt *llcd_alloc(void)
+static struct llog_canceld_ctxt *llcd_alloc(struct llog_commit_master *lcm)
 {
         struct llog_canceld_ctxt *llcd;
         int llcd_size;
 
+        LASSERT(lcm != NULL);
+
         /*
          * Payload of lustre_msg V2 is bigger.
          */
@@ -98,7 +110,17 @@ static struct llog_canceld_ctxt *llcd_alloc(void)
         CFS_INIT_LIST_HEAD(&llcd->llcd_list);
         llcd->llcd_size = llcd_size;
         llcd->llcd_cookiebytes = 0;
+
+        spin_lock(&lcm->lcm_lock);
+        llcd->llcd_lcm = lcm;
+        atomic_inc(&lcm->lcm_count);
+        list_add_tail(&llcd->llcd_list, &lcm->lcm_llcds);
+        spin_unlock(&lcm->lcm_lock);
         atomic_inc(&llcd_count);
+
+        CDEBUG(D_RPCTRACE, "Alloc llcd %p on lcm %p (%d)\n",
+               llcd, lcm, atomic_read(&lcm->lcm_count));
+
         return llcd;
 }
 
@@ -107,9 +129,27 @@ static struct llog_canceld_ctxt *llcd_alloc(void)
  */
 static void llcd_free(struct llog_canceld_ctxt *llcd)
 {
+        struct llog_commit_master *lcm = llcd->llcd_lcm;
+
+        if (lcm) {
+                if (atomic_read(&lcm->lcm_count) == 0) {
+                        CERROR("Invalid llcd free %p\n", llcd);
+                        llcd_print(llcd, __FUNCTION__, __LINE__);
+                        LBUG();
+                }
+                spin_lock(&lcm->lcm_lock);
+                LASSERT(!list_empty(&llcd->llcd_list));
+                list_del_init(&llcd->llcd_list);
+                atomic_dec(&lcm->lcm_count);
+                spin_unlock(&lcm->lcm_lock);
+        }
+
+        CDEBUG(D_RPCTRACE, "Free llcd %p on lcm %p (%d)\n", 
+               llcd, lcm, atomic_read(&lcm->lcm_count));
+
         LASSERT(atomic_read(&llcd_count) > 0);
-        OBD_SLAB_FREE(llcd, llcd_cache, llcd->llcd_size);
         atomic_dec(&llcd_count);
+        OBD_SLAB_FREE(llcd, llcd_cache, llcd->llcd_size);
 }
 
 /**
@@ -128,22 +168,12 @@ static void llcd_copy(struct llog_canceld_ctxt *llcd,
  * 1 if yes and 0 otherwise.
  */
 static int llcd_fit(struct llog_canceld_ctxt *llcd,
-                 struct llog_cookie *cookies)
+                    struct llog_cookie *cookies)
 {
         return (llcd->llcd_size -
                 llcd->llcd_cookiebytes) >= sizeof(*cookies);
 }
 
-static void llcd_print(struct llog_canceld_ctxt *llcd,
-                       const char *func, int line)
-{
-        CDEBUG(D_RPCTRACE, "Llcd (%p) at %s:%d:\n", llcd, func, line);
-        CDEBUG(D_RPCTRACE, "  size: %d\n", llcd->llcd_size);
-        CDEBUG(D_RPCTRACE, "  ctxt: %p\n", llcd->llcd_ctxt);
-        CDEBUG(D_RPCTRACE, "  lcm : %p\n", llcd->llcd_lcm);
-        CDEBUG(D_RPCTRACE, "  cookiebytes : %d\n", llcd->llcd_cookiebytes);
-}
-
 /**
  * Llcd completion function. Called uppon llcd send finish regardless
  * sending result. Error is passed in @rc. Note, that this will be called
@@ -154,7 +184,7 @@ llcd_interpret(const struct lu_env *env,
                struct ptlrpc_request *req, void *noused, int rc)
 {
         struct llog_canceld_ctxt *llcd = req->rq_async_args.pointer_arg[0];
-        CDEBUG(D_RPCTRACE, "Sent llcd %p (%d)\n", llcd, rc);
+        CDEBUG(D_RPCTRACE, "Sent llcd %p (%d) - killing it\n", llcd, rc);
         llcd_free(llcd);
         return 0;
 }
@@ -254,21 +284,15 @@ exit:
 static int
 llcd_attach(struct llog_ctxt *ctxt, struct llog_canceld_ctxt *llcd)
 {
-        struct llog_commit_master *lcm;
-
         LASSERT(ctxt != NULL && llcd != NULL);
         LASSERT_SEM_LOCKED(&ctxt->loc_sem);
         LASSERT(ctxt->loc_llcd == NULL);
-        lcm = ctxt->loc_lcm;
-        spin_lock(&lcm->lcm_lock);
-        atomic_inc(&lcm->lcm_count);
-        list_add_tail(&llcd->llcd_list, &lcm->lcm_llcds);
-        spin_unlock(&lcm->lcm_lock);
-        CDEBUG(D_RPCTRACE, "Attach llcd %p to ctxt %p (%d)\n",
-               llcd, ctxt, atomic_read(&lcm->lcm_count));
         llcd->llcd_ctxt = llog_ctxt_get(ctxt);
-        llcd->llcd_lcm = ctxt->loc_lcm;
         ctxt->loc_llcd = llcd;
+
+        CDEBUG(D_RPCTRACE, "Attach llcd %p to ctxt %p\n",
+               llcd, ctxt);
+
         return 0;
 }
 
@@ -278,7 +302,6 @@ llcd_attach(struct llog_ctxt *ctxt, struct llog_canceld_ctxt *llcd)
  */
 static struct llog_canceld_ctxt *llcd_detach(struct llog_ctxt *ctxt)
 {
-        struct llog_commit_master *lcm;
         struct llog_canceld_ctxt *llcd;
 
         LASSERT(ctxt != NULL);
@@ -288,22 +311,10 @@ static struct llog_canceld_ctxt *llcd_detach(struct llog_ctxt *ctxt)
         if (!llcd)
                 return NULL;
 
-        lcm = ctxt->loc_lcm;
-        if (atomic_read(&lcm->lcm_count) == 0) {
-                CERROR("Invalid detach occured %p:%p\n", ctxt, llcd);
-                llcd_print(llcd, __FUNCTION__, __LINE__);
-                LBUG();
-        }
-        spin_lock(&lcm->lcm_lock);
-        LASSERT(!list_empty(&llcd->llcd_list));
-        list_del_init(&llcd->llcd_list);
-        atomic_dec(&lcm->lcm_count);
-        spin_unlock(&lcm->lcm_lock);
-        ctxt->loc_llcd = NULL;
-
-        CDEBUG(D_RPCTRACE, "Detach llcd %p from ctxt %p (%d)\n",
-               llcd, ctxt, atomic_read(&lcm->lcm_count));
+        CDEBUG(D_RPCTRACE, "Detach llcd %p from ctxt %p\n", 
+               llcd, ctxt);
 
+        ctxt->loc_llcd = NULL;
         llog_ctxt_put(ctxt);
         return llcd;
 }
@@ -316,9 +327,9 @@ static struct llog_canceld_ctxt *llcd_get(struct llog_ctxt *ctxt)
 {
         struct llog_canceld_ctxt *llcd;
 
-        llcd = llcd_alloc();
+        llcd = llcd_alloc(ctxt->loc_lcm);
         if (!llcd) {
-                CERROR("Couldn't alloc an llcd for ctxt %p\n", ctxt);
+                CERROR("Can't alloc an llcd for ctxt %p\n", ctxt);
                 return NULL;
         }
         llcd_attach(ctxt, llcd);
@@ -330,10 +341,8 @@ static struct llog_canceld_ctxt *llcd_get(struct llog_ctxt *ctxt)
  */
 static void llcd_put(struct llog_ctxt *ctxt)
 {
-        struct llog_commit_master *lcm;
         struct llog_canceld_ctxt *llcd;
 
-        lcm = ctxt->loc_lcm;
         llcd = llcd_detach(ctxt);
         if (llcd)
                 llcd_free(llcd);
@@ -423,6 +432,18 @@ void llog_recov_thread_stop(struct llog_commit_master *lcm, int force)
                         llcd_print(llcd, __FUNCTION__, __LINE__);
                 }
                 spin_unlock(&lcm->lcm_lock);
+                
+                /*
+                 * No point to go further with busy llcds at this point
+                 * as this is clear bug. It might mean we got hanging
+                 * rpc which holds import ref and this means we will not
+                 * be able to cleanup anyways.
+                 *
+                 * Or we just missed to kill them when they were not
+                 * attached to ctxt. In this case our slab will remind
+                 * us about this a bit later.
+                 */
+                LBUG();
         }
         EXIT;
 }
@@ -626,6 +647,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
          * then do it.
          */
         if (llcd && (flags & OBD_LLOG_FL_SENDNOW)) {
+                CDEBUG(D_RPCTRACE, "Sync llcd %p\n", llcd);
                 rc = llcd_push(ctxt);
                 if (rc)
                         GOTO(out, rc);