Whamcloud - gitweb
again merge b_devel to b_eq 20031113
authorericm <ericm>
Thu, 13 Nov 2003 07:54:41 +0000 (07:54 +0000)
committerericm <ericm>
Thu, 13 Nov 2003 07:54:41 +0000 (07:54 +0000)
yesterday's merge brought in some nasty bugs.

lustre/include/linux/lustre_commit_confd.h
lustre/include/linux/lustre_log.h
lustre/llite/llite_lib.c
lustre/mds/commit_confd.c
lustre/obdfilter/filter_internal.h
lustre/obdfilter/filter_log.c
lustre/ptlrpc/recov_thread.c

index 980e6ce..a749911 100644 (file)
 
 #include <linux/lustre_log.h>
 
-struct llog_commit_data {
+struct llog_canceld_ctxt {
         struct list_head           llcd_list;  /* free or pending struct list */
         struct obd_import         *llcd_import;
         struct llog_commit_master *llcd_lcm;
         int                        llcd_tries; /* number of tries to send */
+        struct llog_ctxt_gen       llcd_gen; 
         int                        llcd_cookiebytes;
         struct llog_cookie         llcd_cookies[0];
 };
@@ -46,9 +47,9 @@ struct llog_commit_master {
         int                     lcm_flags;
         wait_queue_head_t       lcm_waitq;
 
-        struct list_head        lcm_llcd_pending; /* llog_commit_data to send */
+        struct list_head        lcm_llcd_pending; /* llog_canceld_ctxt to send */
         struct list_head        lcm_llcd_resend;  /* try to resend this data */
-        struct list_head        lcm_llcd_free;    /* free llog_commit_data */
+        struct list_head        lcm_llcd_free;    /* free llog_canceld_ctxt */
         spinlock_t              lcm_llcd_lock;    /* protects llcd_free */
         atomic_t                lcm_llcd_numfree; /* items on llcd_free */
         int                     lcm_llcd_minfree; /* min free on llcd_free */
@@ -67,7 +68,7 @@ struct llog_commit_daemon {
 
 /* ptlrpc/recov_thread.c */
 int llog_start_commit_thread(void);
-struct llog_commit_data *llcd_grab(void);
-void llcd_send(struct llog_commit_data *llcd);
+struct llog_canceld_ctxt *llcd_grab(void);
+void llcd_send(struct llog_canceld_ctxt *llcd);
 
 #endif /* _LUSTRE_COMMIT_CONFD_H */
index 6e2ce07..c6cbd64 100644 (file)
@@ -38,6 +38,9 @@
 #include <linux/obd.h>
 #include <linux/lustre_idl.h>
 
+#define LOG_NAME_LIMIT(logname, name)                   \
+        snprintf(logname, sizeof(logname), "LOGS/%s", name)
+
 struct plain_handle_data {
         struct list_head   phd_entry;
         struct llog_handle     *phd_cat_handle; 
@@ -54,11 +57,10 @@ struct cat_handle_data {
 struct llog_handle {
         struct semaphore        lgh_lock;
         struct llog_logid       lgh_id;              /* id of this log */
-        struct obd_device      *lgh_obd;
         struct llog_log_hdr    *lgh_hdr;
         struct file            *lgh_file;
         int                     lgh_last_idx;
-        struct llog_obd_ctxt   *lgh_ctxt;
+        struct llog_ctxt       *lgh_ctxt;
         union {
                 struct plain_handle_data phd;
                 struct cat_handle_data   chd;
@@ -92,18 +94,19 @@ int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data);
 /* llog_obd.c */
 int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd,
                int count,  struct llog_logid *logid, struct llog_operations *op);
-int llog_cleanup(struct llog_obd_ctxt *);
-int llog_add(struct llog_obd_ctxt *ctxt,
+int llog_cleanup(struct llog_ctxt *);
+int llog_precleanup(struct llog_ctxt *);
+int llog_add(struct llog_ctxt *ctxt,
                         struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
                         struct llog_cookie *logcookies, int numcookies);
-int llog_cancel(struct llog_obd_ctxt *, struct lov_stripe_md *lsm,
+int llog_cancel(struct llog_ctxt *, struct lov_stripe_md *lsm,
                     int count, struct llog_cookie *cookies, int flags);
 
 int llog_obd_origin_setup(struct obd_device *obd, int index, 
                           struct obd_device *disk_obd, int count, 
                           struct llog_logid *logid);
-int llog_obd_origin_cleanup(struct llog_obd_ctxt *ctxt);
-int llog_obd_origin_add(struct llog_obd_ctxt *ctxt,
+int llog_obd_origin_cleanup(struct llog_ctxt *ctxt);
+int llog_obd_origin_add(struct llog_ctxt *ctxt,
                         struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
                         struct llog_cookie *logcookies, int numcookies);
 
@@ -114,15 +117,19 @@ int obd_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
 int obd_llog_finish(struct obd_device *obd, int count);
 
 /* llog_net.c */
-int llog_initiator_connect(struct llog_obd_ctxt *ctxt);
-int llog_receptor_accept(struct llog_obd_ctxt *ctxt, struct obd_import *imp);
-int llog_origin_handle_cancel(struct llog_obd_ctxt *ctxt, 
-                              struct ptlrpc_request *req);
+int llog_initiator_connect(struct llog_ctxt *ctxt);
+int llog_receptor_accept(struct llog_ctxt *ctxt, struct obd_import *imp);
+int llog_origin_handle_cancel(struct ptlrpc_request *req);
+int llog_origin_connect(struct llog_ctxt *ctxt, int count,
+                        struct llog_logid *logid, struct llog_ctxt_gen *gen);
+int llog_handle_connect(struct ptlrpc_request *req);
 
 /* recov_thread.c */
-int llog_obd_repl_cancel(struct llog_obd_ctxt *ctxt,
+int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                          struct lov_stripe_md *lsm, int count,
                          struct llog_cookie *cookies, int flags);
+int llog_repl_connect(struct llog_ctxt *ctxt, int count, 
+                      struct llog_logid *logid, struct llog_ctxt_gen *gen);
 
 struct llog_operations {
         int (*lop_write_rec)(struct llog_handle *loghandle,
@@ -138,7 +145,7 @@ struct llog_operations {
                               __u64 *offset, 
                               void *buf, 
                               int len);
-        int (*lop_create)(struct llog_obd_ctxt *ctxt, struct llog_handle **,
+        int (*lop_create)(struct llog_ctxt *ctxt, struct llog_handle **,
                           struct llog_logid *logid, char *name);
         int (*lop_close)(struct llog_handle *handle);
         int (*lop_read_header)(struct llog_handle *handle);
@@ -146,51 +153,59 @@ struct llog_operations {
         int (*lop_setup)(struct obd_device *obd, int ctxt_idx, 
                          struct obd_device *disk_obd, int count, 
                          struct llog_logid *logid);
-        int (*lop_cleanup)(struct llog_obd_ctxt *ctxt);
-        int (*lop_add)(struct llog_obd_ctxt *ctxt, struct llog_rec_hdr *rec, 
+        int (*lop_precleanup)(struct llog_ctxt *ctxt);
+        int (*lop_cleanup)(struct llog_ctxt *ctxt);
+        int (*lop_add)(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, 
                        struct lov_stripe_md *lsm, 
                        struct llog_cookie *logcookies, int numcookies);
-        int (*lop_cancel)(struct llog_obd_ctxt *ctxt, struct lov_stripe_md *lsm,
+        int (*lop_cancel)(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm,
                           int count, struct llog_cookie *cookies, int flags);
+        int (*lop_connect)(struct llog_ctxt *ctxt, int count,
+                           struct llog_logid *logid, struct llog_ctxt_gen *gen);
         /* XXX add 2 more: commit callbacks and llog recovery functions */
 };
 
 extern struct llog_operations llog_lvfs_ops;
 
-/* MDS stored handles in OSC */
-#define LLOG_OBD_DEL_LOG_HANDLE 0
-
-/* OBDFILTER stored handles in OBDFILTER */
-#define LLOG_OBD_SZ_LOG_HANDLE  0
-#define LLOG_OBD_RD1_LOG_HANDLE 1
 
-struct llog_obd_ctxt {
+struct llog_ctxt {
         int                      loc_idx; /* my index the obd array of ctxt's */
+        struct llog_ctxt_gen     loc_gen; 
         struct obd_device       *loc_obd; /* points back to the containing obd*/
         struct obd_export       *loc_exp;
         struct obd_import       *loc_imp; /* to use in RPC's: can be backward 
                                              pointing import */
         struct llog_operations  *loc_logops;
         struct llog_handle      *loc_handle;
-        struct llog_commit_data *loc_llcd;
+        struct llog_canceld_ctxt *loc_llcd;
         struct semaphore         loc_sem; /* protects loc_llcd */
+        void                    *llog_proc_cb;
 };
 
-#if 0
-int obd_log_cancel(struct obd_export *exp, struct llog_handle *cathandle, 
-                   void *buf, int count, struct llog_cookie *cookies, 
-                   int flags);
+static inline void log_gen_init(struct llog_ctxt *ctxt)
+{
+        struct obd_device *obd = ctxt->loc_exp->exp_obd;
 
+        if (!strcmp(obd->obd_type->typ_name, "mds"))
+                ctxt->loc_gen.mnt_cnt = obd->u.mds.mds_mount_count;
+        else if (!strstr(obd->obd_type->typ_name, "filter")) {
+                ctxt->loc_gen.mnt_cnt = obd->u.filter.fo_mount_count; 
+        }
+        else
+                ctxt->loc_gen.mnt_cnt = 0; 
+}
+
+static inline int log_gen_lt(struct llog_ctxt_gen a, struct llog_ctxt_gen b)
+{
+        if (a.mnt_cnt < b.mnt_cnt)
+                return 1;
+        if (a.mnt_cnt > b.mnt_cnt)
+                return 0;
+        return(a.conn_cnt < b.conn_cnt ? 1 : 0);
+}
 
-int llog_originator_setup(struct obd_device *, int);
-int llog_originator_cleanup(struct obd_device *);
-int llog_originator_open(struct obd_device *originator, 
-                         struct obd_device *disk_obd,
-                         int index, int named, int flags, 
-                         struct obd_uuid *log_uuid);
-#endif
 
-static inline int llog_obd2ops(struct llog_obd_ctxt *ctxt,
+static inline int llog_obd2ops(struct llog_ctxt *ctxt,
                                struct llog_operations **lop)
 {
         if (ctxt == NULL)
@@ -219,6 +234,15 @@ static inline int llog_data_len(int len)
                 remains : (((len + mask) & (~mask)) + remains);
 }
 
+static inline struct llog_ctxt *llog_get_context(struct obd_device *obd,
+                                                 int index)
+{
+        if (index < 0 || index >= LLOG_MAX_CTXTS)
+                return NULL;
+        else
+                return obd->obd_llog_ctxt[index];
+}
+
 static inline int llog_write_rec(struct llog_handle *handle,
                                  struct llog_rec_hdr *rec,
                                  struct llog_cookie *logcookies,
@@ -316,7 +340,7 @@ static inline int llog_next_block(struct llog_handle *loghandle, int *cur_idx,
         RETURN(rc);
 }
 
-static inline int llog_create(struct llog_obd_ctxt *ctxt, 
+static inline int llog_create(struct llog_ctxt *ctxt, 
                               struct llog_handle **res,
                               struct llog_logid *logid, char *name)
 {
@@ -333,4 +357,37 @@ static inline int llog_create(struct llog_obd_ctxt *ctxt,
         rc = lop->lop_create(ctxt, res, logid, name);
         RETURN(rc);
 }
+
+static inline int llog_connect(struct llog_ctxt *ctxt, int count,
+                               struct llog_logid *logid,
+                               struct llog_ctxt_gen *gen)
+{
+        struct llog_operations *lop;
+        int rc;
+        ENTRY;
+
+        rc = llog_obd2ops(ctxt, &lop);
+        if (rc)
+                RETURN(rc);
+        if (lop->lop_connect == NULL)
+                RETURN(-EOPNOTSUPP);
+
+        rc = lop->lop_connect(ctxt, count, logid, gen);
+        RETURN(rc);
+}
+
+static inline int cathandle_print_cb(struct llog_handle *llh, 
+                                     struct llog_rec_hdr *rec, void *data)
+{
+        struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
+
+        if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
+                CERROR("invalid record in catalog\n");
+                RETURN(-EINVAL);
+        }
+
+        CDEBUG(D_HA, "seeing record at index %d in log "LPX64"\n", 
+               le32_to_cpu(rec->lrh_index), lir->lid_id.lgl_oid);
+        RETURN(0);
+}
 #endif
index f141fee..4e3210d 100644 (file)
@@ -358,7 +358,7 @@ int lustre_process_log(struct lustre_mount_data *lmd, char * profile,
         char * name = "mdc_dev";
         class_uuid_t uuid;
         struct obd_uuid mdc_uuid;
-        struct llog_obd_ctxt *ctxt;
+        struct llog_ctxt *ctxt;
         int rc = 0;
         int err;
         ENTRY;
@@ -427,7 +427,7 @@ int lustre_process_log(struct lustre_mount_data *lmd, char * profile,
         
         exp = class_conn2export(&mdc_conn);
         
-        ctxt = exp->exp_obd->obd_llog_ctxt[LLOG_CONFIG_REPL_CTXT];
+        ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT);
         rc = class_config_parse_llog(ctxt, profile, cfg);
         if (rc) {
                 CERROR("class_config_parse_llog failed: rc = %d\n", rc);
index 557dc55..d08bd4b 100644 (file)
@@ -24,13 +24,13 @@ void commit_confd_conf_import(struct obd_import *import,
 
 
         list_for_each_safe(&import->import_cc_list, tmp, save) {
-                struct llog_commit_data *cd;
+                struct llog_canceld_ctxt *cd;
 
                 if (atomic_read(import->import_cc_count) <=
                     lccd->llcconf_lowwater)
                         break;
 
-                cd = list_entry(tmp, struct llog_commit_data *, llcconf_entry);
+                cd = list_entry(tmp, struct llog_canceld_ctxt *, llcconf_entry);
                 atomic_dec(&import->import_cc_count);
                 commit_confd_add_and_fire(cd);
         }
index 32fa3fb..d5bb1ea 100644 (file)
@@ -141,10 +141,11 @@ int filter_log_sz_change(struct llog_handle *cathandle,
                          __u32 io_epoch,
                          struct llog_cookie *logcookie, 
                          struct inode *inode);
-int filter_get_catalog(struct obd_device *);
+//int filter_get_catalog(struct obd_device *);
 void filter_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
                               void *cb_data, int error);
-
+int filter_recov_log_unlink_cb(struct llog_handle *llh,
+                               struct llog_rec_hdr *rec, void *data);
 
 /* filter_san.c */
 int filter_san_setup(struct obd_device *obd, obd_count len, void *buf);
index 8f526b6..341ad6c 100644 (file)
@@ -97,7 +97,63 @@ void filter_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
                                      void *cb_data, int error)
 {
         struct llog_cookie *cookie = cb_data;
-        llog_obd_repl_cancel(obd->obd_llog_ctxt[LLOG_UNLINK_REPL_CTXT],
-                             NULL, 1, cookie, OBD_LLOG_FL_SENDNOW);
+        llog_cancel(llog_get_context(obd, cookie->lgc_subsys + 1),
+                             NULL, 1, cookie, 0);
+                             //NULL, 1, cookie, OBD_LLOG_FL_SENDNOW);
         OBD_FREE(cb_data, sizeof(struct llog_cookie));
 }
+
+/* Callback for processing the unlink log record received from MDS by 
+ * llog_client_api.
+ */
+int filter_recov_log_unlink_cb(struct llog_handle *llh, 
+                               struct llog_rec_hdr *rec, void *data)
+{
+        struct llog_ctxt *ctxt = llh->lgh_ctxt;
+        struct obd_device *obd = ctxt->loc_obd;
+        struct obd_export *exp = obd->obd_self_export;
+        struct llog_cookie cookie;
+        struct llog_unlink_rec *lur;
+        struct obdo *oa;
+        struct obd_trans_info oti = { 0 };
+        obd_id oid;
+        int rc = 0;
+        ENTRY;
+                                                                                                                             
+        if (!le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN) {
+                CERROR("log is not plain\n");
+                RETURN(-EINVAL);
+        }
+        if (rec->lrh_type != MDS_UNLINK_REC) {
+                CERROR("log record type error\n");
+                RETURN(-EINVAL);
+        }
+        cookie.lgc_lgl = llh->lgh_id;
+        cookie.lgc_subsys = LLOG_UNLINK_ORIG_CTXT;
+        cookie.lgc_index = le32_to_cpu(rec->lrh_index);
+                                                                                                                             
+        lur = (struct llog_unlink_rec *)rec;
+        
+        oa = obdo_alloc();
+        if (oa == NULL) 
+                RETURN(-ENOMEM);
+        oa->o_valid |= OBD_MD_FLCOOKIE;
+        oa->o_id = lur->lur_oid;
+        oa->o_gr = lur->lur_ogen;
+        memcpy(obdo_logcookie(oa), &cookie, sizeof(cookie));
+        oid = oa->o_id;
+
+        rc = obd_destroy(exp, oa, NULL, &oti);
+        obdo_free(oa);
+        if (rc == -ENOENT) {
+                CERROR("object already removed: send cookie\n");
+                llog_cancel(ctxt, NULL, 1, &cookie, 0);
+                RETURN(0);
+        }
+
+        if (rc == 0)
+                CERROR("object: "LPU64" in record destroyed successful\n", oid);
+
+        RETURN(rc);
+}
index 2a8bfbb..4772481 100644 (file)
@@ -58,9 +58,10 @@ static struct llog_commit_master *lcm = &lustre_lcm;
 /* Allocate new commit structs in case we do not have enough */
 static int llcd_alloc(void)
 {
-        struct llog_commit_data *llcd;
+        struct llog_canceld_ctxt *llcd;
+        int offset = offsetof(struct llog_canceld_ctxt, llcd_cookies);
 
-        OBD_ALLOC(llcd, PAGE_SIZE);
+        OBD_ALLOC(llcd, PAGE_SIZE + offset);
         if (llcd == NULL)
                 return -ENOMEM;
 
@@ -75,9 +76,9 @@ static int llcd_alloc(void)
 }
 
 /* Get a free cookie struct from the list */
-struct llog_commit_data *llcd_grab(void)
+struct llog_canceld_ctxt *llcd_grab(void)
 {
-        struct llog_commit_data *llcd;
+        struct llog_canceld_ctxt *llcd;
 
         spin_lock(&lcm->lcm_llcd_lock);
         if (list_empty(&lcm->lcm_llcd_free)) {
@@ -101,10 +102,12 @@ struct llog_commit_data *llcd_grab(void)
 }
 EXPORT_SYMBOL(llcd_grab);
 
-static void llcd_put(struct llog_commit_data *llcd)
+static void llcd_put(struct llog_canceld_ctxt *llcd)
 {
+        int offset = offsetof(struct llog_canceld_ctxt, llcd_cookies);
+
         if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) {
-                OBD_FREE(llcd, PAGE_SIZE);
+                OBD_FREE(llcd, PAGE_SIZE + offset);
         } else {
                 spin_lock(&lcm->lcm_llcd_lock);
                 list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
@@ -114,7 +117,7 @@ static void llcd_put(struct llog_commit_data *llcd)
 }
 
 /* Send some cookies to the appropriate target */
-void llcd_send(struct llog_commit_data *llcd)
+void llcd_send(struct llog_canceld_ctxt *llcd)
 {
         spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
         list_add_tail(&llcd->llcd_list, &llcd->llcd_lcm->lcm_llcd_pending);
@@ -128,11 +131,11 @@ EXPORT_SYMBOL(llcd_send);
  * log record for the deletion.  The commit callback calls this 
  * function 
  */
-int llog_obd_repl_cancel(struct llog_obd_ctxt *ctxt,
+int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                          struct lov_stripe_md *lsm, int count,
                          struct llog_cookie *cookies, int flags)
 {
-        struct llog_commit_data *llcd;
+        struct llog_canceld_ctxt *llcd;
         int rc = 0;
         ENTRY;
 
@@ -158,17 +161,18 @@ int llog_obd_repl_cancel(struct llog_obd_ctxt *ctxt,
                         GOTO(out, rc = -ENOMEM);
                 }
                 llcd->llcd_import = ctxt->loc_imp;
+                llcd->llcd_gen = ctxt->loc_gen;
                 ctxt->loc_llcd = llcd;
         }
 
-        memcpy(llcd->llcd_cookies + llcd->llcd_cookiebytes, cookies,
+        memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes, cookies,
                sizeof(*cookies));
         llcd->llcd_cookiebytes += sizeof(*cookies);
 
-        GOTO(send_now, rc);
 send_now:
         if ((PAGE_SIZE - llcd->llcd_cookiebytes < sizeof(*cookies) ||
              flags & OBD_LLOG_FL_SENDNOW)) {
+                CDEBUG(D_HA, "send llcd: %p\n", llcd);
                 ctxt->loc_llcd = NULL;
                 llcd_send(llcd);
         }
@@ -182,7 +186,7 @@ static int log_commit_thread(void *arg)
 {
         struct llog_commit_master *lcm = arg;
         struct llog_commit_daemon *lcd;
-        struct llog_commit_data *llcd, *n;
+        struct llog_canceld_ctxt *llcd, *n;
         unsigned long flags;
         ENTRY;
 
@@ -290,7 +294,27 @@ static int log_commit_thread(void *arg)
                 /* We are the only one manipulating our local list - no lock */
                 list_for_each_entry_safe(llcd,n, &lcd->lcd_llcd_list,llcd_list){
                         char *bufs[1] = {(char *)llcd->llcd_cookies};
+                        struct obd_device *obd = import->imp_obd;
+                        struct llog_ctxt *ctxt;
+
                         list_del(&llcd->llcd_list);
+                        if (llcd->llcd_cookiebytes == 0) {
+                                CDEBUG(D_HA, "just put empty llcd %p\n", llcd);
+                                llcd_put(llcd);
+                                continue;
+                        }
+                        /* check whether the cookies are new. if new then send, otherwise
+                         * just put llcd */
+                        ctxt = llog_get_context(obd, llcd->llcd_cookies[0].lgc_subsys + 1);
+                        LASSERT(ctxt != NULL);
+                        down(&ctxt->loc_sem);
+                        if (log_gen_lt(llcd->llcd_gen, ctxt->loc_gen)) {
+                                up(&ctxt->loc_sem); 
+                                CDEBUG(D_HA, "just put stale llcd %p\n", llcd);
+                                llcd_put(llcd);
+                                continue;
+                        }
+                        up(&ctxt->loc_sem); 
 
                         request = ptlrpc_prep_req(import, OBD_LOG_CANCEL, 1,
                                                   &llcd->llcd_cookiebytes,
@@ -318,6 +342,7 @@ static int log_commit_thread(void *arg)
                                 continue;
                         }
 
+#if 0                   /* FIXME just put llcd, not send it again */
                         spin_lock(&lcm->lcm_llcd_lock);
                         list_splice(&lcd->lcd_llcd_list, &lcm->lcm_llcd_resend);
                         if (++llcd->llcd_tries < 5) {
@@ -329,12 +354,13 @@ static int log_commit_thread(void *arg)
                                 spin_unlock(&lcm->lcm_llcd_lock);
                         } else {
                                 spin_unlock(&lcm->lcm_llcd_lock);
+#endif
                                 CERROR("commit %p dropped %d cookies: rc %d\n",
                                        llcd, (int)(llcd->llcd_cookiebytes /
                                                    sizeof(*llcd->llcd_cookies)),
                                        rc);
                                 llcd_put(llcd);
-                        }
+//                        }
                         break;
                 }
 
@@ -390,6 +416,12 @@ int llog_start_commit_thread(void)
 }
 EXPORT_SYMBOL(llog_start_commit_thread);
 
+static struct llog_process_args {
+        struct semaphore         llpa_sem; 
+        struct llog_ctxt        *llpa_ctxt;
+        void                    *llpa_cb;
+        void                    *llpa_arg;
+} llpa;
 int llog_init_commit_master(void)
 {
         INIT_LIST_HEAD(&lcm->lcm_thread_busy);
@@ -404,6 +436,8 @@ int llog_init_commit_master(void)
         atomic_set(&lcm->lcm_llcd_numfree, 0);
         lcm->lcm_llcd_minfree = 0;
         lcm->lcm_thread_max = 5;
+        /* FIXME initialize semaphore for llog_process_args */
+        sema_init(&llpa.llpa_sem, 1);
         return 0;
 }
 
@@ -419,9 +453,118 @@ int llog_cleanup_commit_master(int force)
         return 0;
 }
 
+
+static int log_process_thread(void *args)
+{
+        struct llog_process_args *data = args;
+        struct llog_ctxt *ctxt = data->llpa_ctxt;
+        void   *cb = data->llpa_cb;
+        struct llog_logid logid = *(struct llog_logid *)(data->llpa_arg);
+        struct llog_handle *llh = NULL;
+        unsigned long flags;
+        int rc;
+        ENTRY;
+                                                                                                                             
+        up(&data->llpa_sem);
+        lock_kernel();
+        ptlrpc_daemonize(); /* thread never needs to do IO */
+                                                                                                                             
+        SIGNAL_MASK_LOCK(current, flags);
+        sigfillset(&current->blocked);
+        RECALC_SIGPENDING;
+        SIGNAL_MASK_UNLOCK(current, flags);
+        unlock_kernel();
+                                                                                                                             
+        rc = llog_create(ctxt, &llh, &logid, NULL);
+        if (rc) {
+                CERROR("llog_create failed %d\n", rc);
+                RETURN(rc);
+        }
+        rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL);
+        if (rc) {
+                CERROR("llog_init_handle failed %d\n", rc);
+                GOTO(out, rc);
+        }
+                                                                                                                             
+        rc = llog_process(llh, cathandle_print_cb, NULL);
+        if (rc) {
+                CERROR("llog_process with cathandle_print_cb failed %d\n", rc);
+                GOTO(out, rc);
+        }
+                                                                                                                             
+        if (cb) {
+                rc = llog_cat_process(llh, (llog_cb_t)cb, NULL);
+                if (rc)
+                        CERROR("llog_cat_process failed %d\n", rc);
+        } else
+                CERROR("no cb func for recovery\n");
+
+        CDEBUG(D_HA, "send to llcd :%p forcibly\n", ctxt->loc_llcd);
+        llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);
+out:
+        rc = llog_cat_put(llh);
+        if (rc)
+                CERROR("llog_cat_put failed %d\n", rc);
+                                                                                                                             
+        RETURN(rc);
+}
+static int llog_recovery_generic(struct llog_ctxt *ctxt,
+                                 void *handle,
+                                 void *arg)
+{
+        int rc;
+        ENTRY;
+
+        down(&llpa.llpa_sem);
+        llpa.llpa_ctxt = ctxt;
+        llpa.llpa_cb = handle;
+        llpa.llpa_arg = arg;
+
+        rc = kernel_thread(log_process_thread, &llpa, CLONE_VM | CLONE_FILES);
+        if (rc < 0)
+                CERROR("error starting log_process_thread: %d\n", rc);
+        else {
+                CDEBUG(D_HA, "log_process_thread: %d\n", rc);
+                rc = 0;
+        }
+
+        RETURN(rc);
+}
+int llog_repl_connect(struct llog_ctxt *ctxt, int count,
+                      struct llog_logid *logid, struct llog_ctxt_gen *gen)
+{
+        struct llog_canceld_ctxt *llcd;
+        int rc;
+        ENTRY;
+                                                                                                                             
+        down(&ctxt->loc_sem);
+        ctxt->loc_gen = *gen;
+        llcd = ctxt->loc_llcd;
+        if (llcd) {
+                CDEBUG(D_HA, "put current llcd when new connection arrives\n");
+                llcd_put(llcd);
+        }
+        llcd = llcd_grab();
+        if (llcd == NULL) {
+                CERROR("couldn't get an llcd\n");
+                RETURN(-ENOMEM);
+        }
+        llcd->llcd_import = ctxt->loc_imp;
+        llcd->llcd_gen = ctxt->loc_gen;
+        ctxt->loc_llcd = llcd;
+        up(&ctxt->loc_sem);
+
+        rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid); 
+        if (rc != 0)
+                CERROR("error recovery process: %d\n", rc);
+
+        RETURN(rc);
+}
+EXPORT_SYMBOL(llog_repl_connect);
+
 #else /* !__KERNEL__ */
 
-int llog_obd_repl_cancel(struct llog_obd_ctxt *ctxt,
+int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                          struct lov_stripe_md *lsm, int count,
                          struct llog_cookie *cookies, int flags)
 {