Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / ptlrpc / recov_thread.c
index 1cb65fd..dc4b13b 100644 (file)
@@ -93,23 +93,24 @@ static void llcd_print(struct llog_canceld_ctxt *llcd,
 static struct llog_canceld_ctxt *llcd_alloc(struct llog_commit_master *lcm)
 {
         struct llog_canceld_ctxt *llcd;
-        int llcd_size;
+        int size, overhead;
 
         LASSERT(lcm != NULL);
 
         /*
-         * Payload of lustre_msg V2 is bigger.
+         * We want to send one page of cookies with rpc header. This buffer
+         * will be assigned later to the rpc, this is why we preserve the
+         * space for rpc header.
          */
-        llcd_size = CFS_PAGE_SIZE -
-                lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL);
-        llcd_size += offsetof(struct llog_canceld_ctxt, llcd_cookies);
-        OBD_SLAB_ALLOC(llcd, llcd_cache, CFS_ALLOC_STD, llcd_size);
+        size = CFS_PAGE_SIZE - lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL);
+        overhead =  offsetof(struct llog_canceld_ctxt, llcd_cookies);
+        OBD_SLAB_ALLOC(llcd, llcd_cache, CFS_ALLOC_STD, size + overhead);
         if (!llcd)
                 return NULL;
 
         CFS_INIT_LIST_HEAD(&llcd->llcd_list);
-        llcd->llcd_size = llcd_size;
         llcd->llcd_cookiebytes = 0;
+        llcd->llcd_size = size;
 
         spin_lock(&lcm->lcm_lock);
         llcd->llcd_lcm = lcm;
@@ -130,6 +131,7 @@ static struct llog_canceld_ctxt *llcd_alloc(struct llog_commit_master *lcm)
 static void llcd_free(struct llog_canceld_ctxt *llcd)
 {
         struct llog_commit_master *lcm = llcd->llcd_lcm;
+        int size;
 
         if (lcm) {
                 if (atomic_read(&lcm->lcm_count) == 0) {
@@ -142,36 +144,39 @@ static void llcd_free(struct llog_canceld_ctxt *llcd)
                 list_del_init(&llcd->llcd_list);
                 atomic_dec(&lcm->lcm_count);
                 spin_unlock(&lcm->lcm_lock);
-        }
 
-        CDEBUG(D_RPCTRACE, "Free llcd %p on lcm %p (%d)\n", 
-               llcd, lcm, atomic_read(&lcm->lcm_count));
+                CDEBUG(D_RPCTRACE, "Free llcd %p on lcm %p (%d)\n", 
+                       llcd, lcm, atomic_read(&lcm->lcm_count));
+        }
 
         LASSERT(atomic_read(&llcd_count) > 0);
         atomic_dec(&llcd_count);
-        OBD_SLAB_FREE(llcd, llcd_cache, llcd->llcd_size);
+
+        size = offsetof(struct llog_canceld_ctxt, llcd_cookies) + 
+            llcd->llcd_size;
+        OBD_SLAB_FREE(llcd, llcd_cache, size);
 }
 
 /**
- * Copy passed @cookies to @llcd.
+ * Checks if passed cookie fits into llcd free space buffer. Returns
+ * 1 if yes and 0 otherwise.
  */
-static void llcd_copy(struct llog_canceld_ctxt *llcd,
-                      struct llog_cookie *cookies)
+static inline int 
+llcd_fit(struct llog_canceld_ctxt *llcd, struct llog_cookie *cookies)
 {
-        memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes,
-              cookies, sizeof(*cookies));
-        llcd->llcd_cookiebytes += sizeof(*cookies);
+        return (llcd->llcd_size - llcd->llcd_cookiebytes >= sizeof(*cookies));
 }
 
 /**
- * Checks if passed cookie fits into llcd free space buffer. Returns
- * 1 if yes and 0 otherwise.
+ * Copy passed @cookies to @llcd.
  */
-static int llcd_fit(struct llog_canceld_ctxt *llcd,
-                    struct llog_cookie *cookies)
+static inline void 
+llcd_copy(struct llog_canceld_ctxt *llcd, struct llog_cookie *cookies)
 {
-        return (llcd->llcd_size -
-                llcd->llcd_cookiebytes) >= sizeof(*cookies);
+        LASSERT(llcd_fit(llcd, cookies));
+        memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes, 
+              cookies, sizeof(*cookies));
+        llcd->llcd_cookiebytes += sizeof(*cookies);
 }
 
 /**
@@ -265,6 +270,11 @@ static int llcd_send(struct llog_canceld_ctxt *llcd)
         req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
         req->rq_interpret_reply = (ptlrpc_interpterer_t)llcd_interpret;
         req->rq_async_args.pointer_arg[0] = llcd;
+
+        /* llog cancels will be replayed after reconnect so this will do twice
+         * first from replay llog, second for resended rpc */
+        req->rq_no_delay = req->rq_no_resend = 1;
+
         rc = ptlrpc_set_add_new_req(&lcm->lcm_pc, req);
         if (rc) {
                 ptlrpc_request_free(req);
@@ -468,7 +478,6 @@ struct llog_commit_master *llog_recov_thread_init(char *name)
         snprintf(lcm->lcm_name, sizeof(lcm->lcm_name),
                  "ll_log_commit_%s", name);
 
-        strncpy(lcm->lcm_name, name, sizeof(lcm->lcm_name));
         atomic_set(&lcm->lcm_count, 0);
         spin_lock_init(&lcm->lcm_lock);
         CFS_INIT_LIST_HEAD(&lcm->lcm_llcds);