Whamcloud - gitweb
Branch HEAD
author bobijam <bobijam>
Tue, 20 May 2008 01:57:51 +0000 (01:57 +0000)
committer bobijam <bobijam>
Tue, 20 May 2008 01:57:51 +0000 (01:57 +0000)
b=15575
i=wangdi, johann, green, shadow

Description: Stack overflow during MDS log replay
Details    : ease stack pressure by running llog_process in a dedicated thread.

lustre/ChangeLog
lustre/obdclass/llog.c
lustre/obdclass/llog_internal.h

index fb897c0..3476fff 100644 (file)
@@ -12,6 +12,11 @@ tbd  Sun Microsystems, Inc.
        * RHEL 4 and RHEL 5/SLES 10 clients behaves differently on 'cd' to a
         removed cwd "./" (refer to Bugzilla 14399).
 
+Severity   : major
+Bugzilla   : 15575
+Description: Stack overflow during MDS log replay
+Details    : ease stack pressure by running llog_process in a dedicated thread.
+
 Severity   : normal
 Bugzilla   : 15443
 Description: wait until IO finished before start new when do lock cancel.
index 6cb4272..8eeb587 100644 (file)
@@ -42,6 +42,7 @@
 #include <obd_class.h>
 #include <lustre_log.h>
 #include <libcfs/list.h>
+#include "llog_internal.h"
 
 /* Allocate a new log or catalog handle */
 struct llog_handle *llog_alloc_handle(void)
@@ -204,22 +205,30 @@ int llog_close(struct llog_handle *loghandle)
 }
 EXPORT_SYMBOL(llog_close);
 
-int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
-                 void *data, void *catdata)
+static int llog_process_thread(void *arg)
 {
-        struct llog_log_hdr *llh = loghandle->lgh_hdr;
-        struct llog_process_cat_data *cd = catdata;
-        char *buf;
-        __u64 cur_offset = LLOG_CHUNK_SIZE, last_offset;
-        int rc = 0, index = 1, last_index;
-        int saved_index = 0, last_called_index = 0;
-        ENTRY;
+        struct llog_process_info     *lpi = (struct llog_process_info *)arg;
+        struct llog_handle           *loghandle = lpi->lpi_loghandle;
+        struct llog_log_hdr          *llh = loghandle->lgh_hdr;
+        struct llog_process_cat_data *cd  = lpi->lpi_catdata;
+        char                         *buf;
+        __u64                         cur_offset = LLOG_CHUNK_SIZE;
+        __u64                         last_offset;
+        int                           rc = 0, index = 1, last_index;
+        int                           saved_index = 0, last_called_index = 0;
 
         LASSERT(llh);
 
         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
-        if (!buf)
-                RETURN(-ENOMEM);
+        if (!buf) {
+                lpi->lpi_rc = -ENOMEM;
+#ifdef __KERNEL__
+                complete(&lpi->lpi_completion);
+#endif
+                return 0;
+        }
+
+        cfs_daemonize_ctxt("llog_process_thread");
 
         if (cd != NULL) {
                 last_called_index = cd->first_idx;
@@ -268,7 +277,7 @@ int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
 
                         CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
                                rec->lrh_type, rec->lrh_index);
+
                         if (rec->lrh_index == 0)
                                 GOTO(out, 0); /* no more records */
 
@@ -292,11 +301,12 @@ int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
 
                         loghandle->lgh_cur_idx = rec->lrh_index;
                         loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
-                                last_offset;
+                                                    last_offset;
 
                         /* if set, process the callback on this record */
                         if (ext2_test_bit(index, llh->llh_bitmap)) {
-                                rc = cb(loghandle, rec, data);
+                                rc = lpi->lpi_cb(loghandle, rec,
+                                                 lpi->lpi_cbdata);
                                 last_called_index = index;
                                 if (rc == LLOG_PROC_BREAK) {
                                         CDEBUG(D_HA, "recovery from log: "LPX64
@@ -305,7 +315,8 @@ int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
                                                loghandle->lgh_id.lgl_ogen);
                                         GOTO(out, rc);
                                 } else if (rc == LLOG_DEL_RECORD) {
-                                        llog_cancel_rec(loghandle, rec->lrh_index);
+                                        llog_cancel_rec(loghandle,
+                                                        rec->lrh_index);
                                         rc = 0;
                                 }
                                 if (rc)
@@ -326,6 +337,44 @@ int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
                 cd->last_idx = last_called_index;
         if (buf)
                 OBD_FREE(buf, LLOG_CHUNK_SIZE);
+        lpi->lpi_rc = rc;
+#ifdef __KERNEL__
+        complete(&lpi->lpi_completion);
+#endif
+        return 0;
+}
+
+int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
+                 void *data, void *catdata)
+{       /* Pack the arguments, run llog_process_thread(), return its lpi_rc. */
+        struct llog_process_info *lpi;  /* argument block handed to the thread */
+        int                      rc;
+        ENTRY;
+
+        OBD_ALLOC_PTR(lpi);
+        if (lpi == NULL) {
+                CERROR("cannot alloc pointer\n");
+                RETURN(-ENOMEM);
+        }
+        lpi->lpi_loghandle = loghandle;
+        lpi->lpi_cb        = cb;
+        lpi->lpi_cbdata    = data;      /* forwarded to cb as its third argument */
+        lpi->lpi_catdata   = catdata;
+
+#ifdef __KERNEL__
+        init_completion(&lpi->lpi_completion);  /* completed by the thread */
+        rc = cfs_kernel_thread(llog_process_thread, lpi, CLONE_VM | CLONE_FILES);
+        if (rc < 0) {   /* negative return is reported as a failed thread start */
+                CERROR("cannot start thread: %d\n", rc);
+                OBD_FREE_PTR(lpi);
+                RETURN(rc);
+        }
+        wait_for_completion(&lpi->lpi_completion);  /* wait until lpi_rc is stored */
+#else
+        llog_process_thread(lpi);       /* non-kernel build: direct call, no thread */
+#endif
+        rc = lpi->lpi_rc;       /* status written by llog_process_thread() */
+        OBD_FREE_PTR(lpi);
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_process);
 EXPORT_SYMBOL(llog_process);
index d4be19c..82bb2e3 100644 (file)
@@ -1,6 +1,17 @@
 #ifndef __LLOG_INTERNAL_H__
 #define __LLOG_INTERNAL_H__
 
+#include <lustre_log.h>
+
+struct llog_process_info { /* arguments and result for llog_process_thread() */
+        struct llog_handle *lpi_loghandle;  /* log handle whose records are processed */
+        llog_cb_t           lpi_cb;         /* invoked as lpi_cb(handle, rec, lpi_cbdata) */
+        void               *lpi_cbdata;     /* opaque third argument passed to lpi_cb */
+        void               *lpi_catdata;    /* read as struct llog_process_cat_data *; may be NULL */
+        int                 lpi_rc;         /* result code stored by llog_process_thread() */
+        struct completion   lpi_completion; /* completed after lpi_rc is set. NOTE(review): used only under __KERNEL__ in llog.c; confirm the type exists in non-kernel builds */
+};
+
 int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
                       char *name, int count, struct llog_catid *idarray);
 int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,