b=14608
author    yury <yury>    Thu, 7 Aug 2008 08:47:11 +0000 (08:47 +0000)
committer yury <yury>    Thu, 7 Aug 2008 08:47:11 +0000 (08:47 +0000)
r=shadow,wangdi

- re-implements recov_thread.c in a more elegant way (kills lots of code, a thread, etc.) and fixes a couple of issues along the way. See the bug for detailed info.

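For orientation, the recovery-thread lifecycle implied by the new declarations in the lustre_log.h hunk (llog_recov_thread_init()/llog_recov_thread_fini()) and applied by the obdfilter hunks below is roughly: allocate one llog_commit_master per obd, point each LLOG_MDS_OST_REPL_CTXT at it through loc_lcm, and tear it down on cleanup. The following is a minimal sketch of that lifecycle only, not code from this patch; the helper names example_olg_recov_setup()/example_olg_recov_cleanup() are hypothetical and error handling is trimmed.

/* Sketch only: hypothetical helpers modeled on the
 * filter_default_olg_init()/filter_llog_finish() hunks below. */
static int example_olg_recov_setup(struct obd_device *obd,
                                   struct obd_llog_group *olg)
{
        struct filter_obd *filter = &obd->u.filter;
        struct llog_ctxt *ctxt;

        /* One llog_commit_master (and its commit thread) per obd. */
        filter->fo_lcm = llog_recov_thread_init(obd->obd_name);
        if (filter->fo_lcm == NULL)
                return -ENOMEM;

        /* Cancel cookies gathered for this group are sent through the
         * commit master attached to the replicator llog context. */
        ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
        if (ctxt == NULL) {
                llog_recov_thread_fini(filter->fo_lcm, 1);
                filter->fo_lcm = NULL;
                return -ENODEV;
        }
        ctxt->loc_lcm = filter->fo_lcm;
        llog_ctxt_put(ctxt);
        return 0;
}

static void example_olg_recov_cleanup(struct obd_device *obd)
{
        struct filter_obd *filter = &obd->u.filter;

        /* Passing obd_force as the "force" flag makes the thread abort
         * inflight cancel RPCs (LIOD_STOP_FORCE) before it exits. */
        if (filter->fo_lcm != NULL) {
                llog_recov_thread_fini(filter->fo_lcm, obd->obd_force);
                filter->fo_lcm = NULL;
        }
}

Under the hood the commit master embeds a ptlrpcd_ctl (lcm_pc), reusing the LIOD_START/LIOD_STOP/LIOD_STOP_FORCE thread control and the ptlrpcd_start()/ptlrpcd_stop() interfaces exported by this patch.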
20 files changed:
lustre/include/Makefile.am
lustre/include/lustre_log.h
lustre/include/lustre_net.h
lustre/include/obd.h
lustre/mds/handler.c
lustre/mds/mds_log.c
lustre/mgs/mgs_handler.c
lustre/obdclass/genops.c
lustre/obdclass/llog.c
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_obd.c
lustre/obdclass/obd_config.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_log.c
lustre/ost/ost_handler.c
lustre/ptlrpc/client.c
lustre/ptlrpc/ptlrpc_internal.h
lustre/ptlrpc/ptlrpc_module.c
lustre/ptlrpc/ptlrpcd.c
lustre/ptlrpc/recov_thread.c

diff --git a/lustre/include/Makefile.am b/lustre/include/Makefile.am
index 150d348..bc7a500 100644 (file)
 SUBDIRS = linux lustre
 
 EXTRA_DIST = ioctl.h liblustre.h lprocfs_status.h lustre_cfg.h         \
-            lustre_commit_confd.h lustre_debug.h lustre_disk.h \
-            lustre_dlm.h lustre_export.h lustre_fsfilt.h lustre_ha.h \
-            lustre_handles.h lustre_import.h lustre_lib.h lustre_sec.h \
-            lustre_lite.h lustre_log.h lustre_mds.h lustre_mdc.h \
-            lustre_net.h lustre_quota.h lustre_ucache.h lvfs.h class_hash.h \
-            obd_cache.h obd_class.h obd_echo.h obd.h obd_lov.h \
+            lustre_debug.h lustre_disk.h lustre_dlm.h lustre_export.h \
+             lustre_fsfilt.h lustre_ha.h lustre_handles.h lustre_import.h \
+             lustre_lib.h lustre_sec.h lustre_lite.h lustre_log.h lustre_mds.h \
+             lustre_mdc.h lustre_net.h lustre_quota.h lustre_ucache.h lvfs.h \
+             class_hash.h obd_cache.h obd_class.h obd_echo.h obd.h obd_lov.h \
             obd_ost.h obd_support.h lustre_ver.h lu_object.h lu_time.h  \
              md_object.h dt_object.h lustre_param.h lustre_mdt.h \
              lustre_fid.h lustre_fld.h lustre_req_layout.h lustre_capa.h \
diff --git a/lustre/include/lustre_log.h b/lustre/include/lustre_log.h
index 94199d4..9e5d3ba 100644 (file)
@@ -112,16 +112,44 @@ extern int llog_cancel_rec(struct llog_handle *loghandle, int index);
 extern int llog_close(struct llog_handle *cathandle);
 extern int llog_get_size(struct llog_handle *loghandle);
 
-/* llog_cat.c   -  catalog api */
+/* llog_cat.c - catalog api */
 struct llog_process_data {
-        void *lpd_data;
-        llog_cb_t lpd_cb;
+        /**
+         * Any useful data needed while processing catalog. This is
+         * passed later to process callback.
+         */
+        void                *lpd_data;
+        /**
+         * Catalog process callback function, called for each record
+         * in catalog.
+         */
+        llog_cb_t            lpd_cb;
 };
 
 struct llog_process_cat_data {
-        int     first_idx;
-        int     last_idx;
-        /* to process catalog across zero record */
+        /**
+         * Temporarily stored first_idx while scanning the log.
+         */
+        int                  lpcd_first_idx;
+        /**
+         * Temporarily stored last_idx while scanning the log.
+         */
+        int                  lpcd_last_idx;
+};
+
+struct llog_process_cat_args {
+        /**
+         * Llog context used in recovery thread on OST (recov_thread.c)
+         */
+        struct llog_ctxt    *lpca_ctxt;
+        /**
+         * Llog callback used in recovery thread on OST (recov_thread.c)
+         */
+        void                *lpca_cb;
+        /**
+         * Data pointer for llog callback.
+         */
+        void                *lpca_arg;
 };
 
 int llog_cat_put(struct llog_handle *cathandle);
@@ -130,6 +158,7 @@ int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
 int llog_cat_cancel_records(struct llog_handle *cathandle, int count,
                             struct llog_cookie *cookies);
 int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data);
+int llog_cat_process_thread(void *data);
 int llog_cat_reverse_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data);
 int llog_cat_set_first_idx(struct llog_handle *cathandle, int index);
 
@@ -180,9 +209,9 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                          struct lov_stripe_md *lsm, int count,
                          struct llog_cookie *cookies, int flags);
 int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp);
-int llog_repl_connect(struct llog_ctxt *ctxt, int count,
-                      struct llog_logid *logid, struct llog_gen *gen,
-                      struct obd_uuid *uuid);
+int llog_obd_repl_connect(struct llog_ctxt *ctxt, int count,
+                          struct llog_logid *logid, struct llog_gen *gen,
+                          struct obd_uuid *uuid);
 
 struct llog_operations {
         int (*lop_write_rec)(struct llog_handle *loghandle,
@@ -224,19 +253,82 @@ struct llog_ctxt {
         int                      loc_idx; /* my index the obd array of ctxt's */
         struct llog_gen          loc_gen;
         struct obd_device       *loc_obd; /* points back to the containing obd*/
-        struct obd_llog_group     *loc_olg; /* group containing that ctxt */
+        struct obd_llog_group   *loc_olg; /* group containing that ctxt */
         struct obd_export       *loc_exp; /* parent "disk" export (e.g. MDS) */
         struct obd_import       *loc_imp; /* to use in RPC's: can be backward
                                              pointing import */
         struct llog_operations  *loc_logops;
         struct llog_handle      *loc_handle;
+        struct llog_commit_master *loc_lcm;
         struct llog_canceld_ctxt *loc_llcd;
         struct semaphore         loc_sem; /* protects loc_llcd and loc_imp */
-        atomic_t                   loc_refcount;
-        struct llog_commit_master *loc_lcm;
+        atomic_t                 loc_refcount;
         void                    *llog_proc_cb;
 };
 
+#define LCM_NAME_SIZE 64
+
+struct llog_commit_master {
+        /**
+         * Thread control flags (start, stop, etc.)
+         */
+        long                       lcm_flags;
+        /**
+         * Number of llcds on this lcm.
+         */
+        atomic_t                   lcm_count;
+        /**
+         * Ptlrpc requests set. All cancel rpcs go via this request set.
+         */
+        struct ptlrpc_request_set *lcm_set;
+        /**
+         * Thread control structure. Used for controlling the commit thread.
+         */
+        struct ptlrpcd_ctl         lcm_pc;
+        /**
+         * Commit thread name buffer. Only used for thread start.
+         */
+        char                       lcm_name[LCM_NAME_SIZE];
+};
+
+struct llog_canceld_ctxt {
+        /**
+         * Llog context this llcd is attached to. Used for accessing
+         * ->loc_import and others in process of canceling cookies
+         * gathered in this llcd.
+         */
+        struct llog_ctxt          *llcd_ctxt;
+        /**
+         * Cancel thread control structure pointer. Used for accessing
+         * it to see if we should stop processing, and for other needs.
+         */
+        struct llog_commit_master *llcd_lcm;
+        /**
+         * Maximal llcd size. Used in calculations of how much room is
+         * left in the llcd for incoming cookies.
+         */
+        int                        llcd_size;
+        /**
+         * Current llcd size while gathering cookies. This should not be
+         * more than ->llcd_size. Used for determining if we need to
+         * send this llcd (if full) and allocate new one. This is also
+         * used for copying new cookie at the end of buffer.
+         */
+        int                        llcd_cookiebytes;
+        /**
+         * Pointer to the start of cookies buffer.
+         */
+        struct llog_cookie         llcd_cookies[0];
+};
+
+/* ptlrpc/recov_thread.c */
+extern struct llog_commit_master *llog_recov_thread_init(char *name);
+extern void llog_recov_thread_fini(struct llog_commit_master *lcm, 
+                                   int force);
+extern int llog_recov_thread_start(struct llog_commit_master *lcm);
+extern void llog_recov_thread_stop(struct llog_commit_master *lcm, 
+                                    int force);
+
 #ifndef __KERNEL__
 
 #define cap_raise(c, flag) do {} while(0)
@@ -310,10 +402,10 @@ static inline void llog_ctxt_put(struct llog_ctxt *ctxt)
 {
         if (ctxt == NULL)
                 return;
-        CDEBUG(D_INFO, "PUTting ctxt %p : new refcount %d\n", ctxt,
-               atomic_read(&ctxt->loc_refcount) - 1);
         LASSERT(atomic_read(&ctxt->loc_refcount) > 0);
         LASSERT(atomic_read(&ctxt->loc_refcount) < 0x5a5a5a);
+        CDEBUG(D_INFO, "PUTting ctxt %p : new refcount %d\n", ctxt,
+               atomic_read(&ctxt->loc_refcount) - 1);
         __llog_ctxt_put(ctxt);
 }
 
diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h
index 334405c..153e1eb 100644 (file)
@@ -714,6 +714,69 @@ struct ptlrpc_service {
         //struct ptlrpc_srv_ni srv_interfaces[0];
 };
 
+struct ptlrpcd_ctl {
+        /**
+         * Ptlrpc thread control flags (LIOD_START, LIOD_STOP, LIOD_STOP_FORCE)
+         */
+        unsigned long               pc_flags;
+        /**
+         * Thread lock protecting structure fields.
+         */
+        spinlock_t                  pc_lock;
+        /**
+         * Start completion.
+         */
+        struct completion           pc_starting;
+        /**
+         * Stop completion.
+         */
+        struct completion           pc_finishing;
+        /**
+         * Thread requests set.
+         */
+        struct ptlrpc_request_set  *pc_set;
+        /**
+         * Thread name used in cfs_daemonize()
+         */
+        char                        pc_name[16];
+#ifndef __KERNEL__
+        /**
+         * Async rpcs flag to make sure that ptlrpcd_check() is called only 
+         * once.
+         */
+        int                         pc_recurred;
+        /**
+         * Currently not used.
+         */
+        void                       *pc_callback;
+        /**
+         * User-space async rpcs callback.
+         */
+        void                       *pc_wait_callback;
+        /**
+         * User-space check idle rpcs callback.
+         */
+        void                       *pc_idle_callback;
+#endif
+};
+
+/* Bits for pc_flags */
+enum ptlrpcd_ctl_flags {
+        /**
+         * Ptlrpc thread start flag.
+         */
+        LIOD_START       = 1 << 0,
+        /**
+         * Ptlrpc thread stop flag.
+         */
+        LIOD_STOP        = 1 << 1,
+        /**
+         * Ptlrpc thread stop force flag. This will also cause
+         * aborting of any inflight rpcs handled by the thread.
+         */
+        LIOD_STOP_FORCE  = 1 << 2
+};
+
 /* ptlrpc/events.c */
 extern lnet_handle_eq_t ptlrpc_eq_h;
 extern int ptlrpc_uuid_to_peer(struct obd_uuid *uuid,
@@ -794,6 +857,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req);
 void ptlrpc_unregister_reply(struct ptlrpc_request *req);
 void ptlrpc_restart_req(struct ptlrpc_request *req);
 void ptlrpc_abort_inflight(struct obd_import *imp);
+void ptlrpc_abort_set(struct ptlrpc_request_set *set);
 
 struct ptlrpc_request_set *ptlrpc_prep_set(void);
 int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
@@ -806,13 +870,16 @@ void ptlrpc_interrupted_set(void *data);
 void ptlrpc_mark_interrupted(struct ptlrpc_request *req);
 void ptlrpc_set_destroy(struct ptlrpc_request_set *);
 void ptlrpc_set_add_req(struct ptlrpc_request_set *, struct ptlrpc_request *);
-void ptlrpc_set_add_new_req(struct ptlrpc_request_set *,
-                            struct ptlrpc_request *);
+int ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
+                           struct ptlrpc_request *req);
 
 void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool);
 void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq);
-struct ptlrpc_request_pool *ptlrpc_init_rq_pool(int, int,
-                                                void (*populate_pool)(struct ptlrpc_request_pool *, int));
+
+struct ptlrpc_request_pool *
+ptlrpc_init_rq_pool(int, int,
+                    void (*populate_pool)(struct ptlrpc_request_pool *, int));
+
 void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req);
 struct ptlrpc_request *ptlrpc_request_alloc(struct obd_import *imp,
                                             const struct req_format *format);
@@ -1065,6 +1132,8 @@ void ping_evictor_stop(void);
 int ptlrpc_check_and_wait_suspend(struct ptlrpc_request *req);
 
 /* ptlrpc/ptlrpcd.c */
+int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc);
+void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force);
 void ptlrpcd_wake(struct ptlrpc_request *req);
 void ptlrpcd_add_req(struct ptlrpc_request *req);
 int ptlrpcd_addref(void);
diff --git a/lustre/include/obd.h b/lustre/include/obd.h
index 3778715..bb3f482 100644 (file)
@@ -401,8 +401,7 @@ struct filter_obd {
         unsigned int             fo_fl_oss_capa;
         struct list_head         fo_capa_keys;
         struct hlist_head       *fo_capa_hash;
-
-        void                    *fo_lcm;
+        struct llog_commit_master *fo_lcm;
 };
 
 #define OSC_MAX_RIF_DEFAULT       8
diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c
index 0074d7d..beda0d3 100644 (file)
@@ -60,7 +60,6 @@
 #include <obd_lov.h>
 #include <lustre_fsfilt.h>
 #include <lprocfs_status.h>
-#include <lustre_commit_confd.h>
 #include <lustre_quota.h>
 #include <lustre_disk.h>
 #include <lustre_param.h>
diff --git a/lustre/mds/mds_log.c b/lustre/mds/mds_log.c
index 8d1ef17..ff7c307 100644 (file)
@@ -52,9 +52,7 @@
 #include <obd_class.h>
 #include <lustre_fsfilt.h>
 #include <lustre_mds.h>
-#include <lustre_commit_confd.h>
 #include <lustre_log.h>
-
 #include "mds_internal.h"
 
 static int mds_llog_origin_add(struct llog_ctxt *ctxt,
diff --git a/lustre/mgs/mgs_handler.c b/lustre/mgs/mgs_handler.c
index 819a8b0..303e890 100644 (file)
@@ -57,7 +57,6 @@
 #include <lustre_dlm.h>
 #include <lprocfs_status.h>
 #include <lustre_fsfilt.h>
-#include <lustre_commit_confd.h>
 #include <lustre_disk.h>
 #include "mgs_internal.h"
 
diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c
index 98c15fe..fc42536 100644 (file)
@@ -822,13 +822,13 @@ void class_import_put(struct obd_import *import)
 {
         ENTRY;
 
-        CDEBUG(D_INFO, "import %p refcount=%d\n", import,
-               atomic_read(&import->imp_refcount) - 1);
-
         LASSERT(atomic_read(&import->imp_refcount) > 0);
         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
         LASSERT(list_empty(&import->imp_zombie_chain));
 
+        CDEBUG(D_INFO, "import %p refcount=%d\n", import,
+               atomic_read(&import->imp_refcount) - 1);
+
         if (atomic_dec_and_test(&import->imp_refcount)) {
 
                 CDEBUG(D_INFO, "final put import %p\n", import);
diff --git a/lustre/obdclass/llog.c b/lustre/obdclass/llog.c
index de2b350..aec0202 100644 (file)
@@ -245,11 +245,11 @@ static int llog_process_thread(void *arg)
         cfs_daemonize_ctxt("llog_process_thread");
 
         if (cd != NULL) {
-                last_called_index = cd->first_idx;
-                index = cd->first_idx + 1;
+                last_called_index = cd->lpcd_first_idx;
+                index = cd->lpcd_first_idx + 1;
         }
-        if (cd != NULL && cd->last_idx)
-                last_index = cd->last_idx;
+        if (cd != NULL && cd->lpcd_last_idx)
+                last_index = cd->lpcd_last_idx;
         else
                 last_index = LLOG_BITMAP_BYTES * 8 - 1;
 
@@ -348,7 +348,7 @@ static int llog_process_thread(void *arg)
 
  out:
         if (cd != NULL)
-                cd->last_idx = last_called_index;
+                cd->lpcd_last_idx = last_called_index;
         if (buf)
                 OBD_FREE(buf, LLOG_CHUNK_SIZE);
         lpi->lpi_rc = rc;
@@ -415,9 +415,9 @@ int llog_reverse_process(struct llog_handle *loghandle, llog_cb_t cb,
                 RETURN(-ENOMEM);
 
         if (cd != NULL)
-                first_index = cd->first_idx + 1;
-        if (cd != NULL && cd->last_idx)
-                index = cd->last_idx;
+                first_index = cd->lpcd_first_idx + 1;
+        if (cd != NULL && cd->lpcd_last_idx)
+                index = cd->lpcd_last_idx;
         else
                 index = LLOG_BITMAP_BYTES * 8 - 1;
 
diff --git a/lustre/obdclass/llog_cat.c b/lustre/obdclass/llog_cat.c
index dda79a0..c142dfe 100644 (file)
@@ -395,14 +395,14 @@ int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data)
                 CWARN("catlog "LPX64" crosses index zero\n",
                       cat_llh->lgh_id.lgl_oid);
 
-                cd.first_idx = llh->llh_cat_idx;
-                cd.last_idx = 0;
+                cd.lpcd_first_idx = llh->llh_cat_idx;
+                cd.lpcd_last_idx = 0;
                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
                 if (rc != 0)
                         RETURN(rc);
 
-                cd.first_idx = 0;
-                cd.last_idx = cat_llh->lgh_last_idx;
+                cd.lpcd_first_idx = 0;
+                cd.lpcd_last_idx = cat_llh->lgh_last_idx;
                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
         } else {
                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, NULL);
@@ -412,6 +412,56 @@ int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data)
 }
 EXPORT_SYMBOL(llog_cat_process);
 
+#ifdef __KERNEL__
+int llog_cat_process_thread(void *data)
+{
+        struct llog_process_cat_args *args = data;
+        struct llog_ctxt *ctxt = args->lpca_ctxt;
+        struct llog_handle *llh = NULL;
+        void  *cb = args->lpca_cb;
+        struct llog_logid logid;
+        int rc;
+        ENTRY;
+
+        cfs_daemonize_ctxt("ll_log_process");
+
+        logid = *(struct llog_logid *)(args->lpca_arg);
+        rc = llog_create(ctxt, &llh, &logid, NULL);
+        if (rc) {
+                CERROR("llog_create() failed %d\n", rc);
+                GOTO(out, rc);
+        }
+        rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL);
+        if (rc) {
+                CERROR("llog_init_handle failed %d\n", rc);
+                GOTO(release_llh, rc);
+        }
+
+        if (cb) {
+                rc = llog_cat_process(llh, (llog_cb_t)cb, NULL);
+                if (rc != LLOG_PROC_BREAK)
+                        CERROR("llog_cat_process() failed %d\n", rc);
+        } else {
+                CWARN("No callback function for recovery\n");
+        }
+
+        /* 
+         * Make sure that all cached data is sent. 
+         */
+        llog_sync(ctxt, NULL);
+        EXIT;
+release_llh:
+        rc = llog_cat_put(llh);
+        if (rc)
+                CERROR("llog_cat_put() failed %d\n", rc);
+out:
+        llog_ctxt_put(ctxt);
+        OBD_FREE(args, sizeof(*args));
+        return rc;
+}
+EXPORT_SYMBOL(llog_cat_process_thread);
+#endif
+
 static int llog_cat_reverse_process_cb(struct llog_handle *cat_llh,
                                        struct llog_rec_hdr *rec, void *data)
 {
@@ -456,15 +506,15 @@ int llog_cat_reverse_process(struct llog_handle *cat_llh,
                 CWARN("catalog "LPX64" crosses index zero\n",
                       cat_llh->lgh_id.lgl_oid);
 
-                cd.first_idx = 0;
-                cd.last_idx = cat_llh->lgh_last_idx;
+                cd.lpcd_first_idx = 0;
+                cd.lpcd_last_idx = cat_llh->lgh_last_idx;
                 rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
                                           &d, &cd);
                 if (rc != 0)
                         RETURN(rc);
 
-                cd.first_idx = le32_to_cpu(llh->llh_cat_idx);
-                cd.last_idx = 0;
+                cd.lpcd_first_idx = le32_to_cpu(llh->llh_cat_idx);
+                cd.lpcd_last_idx = 0;
                 rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb,
                                           &d, &cd);
         } else {
diff --git a/lustre/obdclass/llog_obd.c b/lustre/obdclass/llog_obd.c
index 7256232..945224c 100644 (file)
@@ -126,6 +126,9 @@ int llog_cleanup(struct llog_ctxt *ctxt)
 
         /* try to free the ctxt */
         rc = __llog_ctxt_put(ctxt);
+        if (rc)
+                CERROR("Error %d while cleaning up ctxt %p\n", 
+                       rc, ctxt);
 
         l_wait_event(olg->olg_waitq,
                      llog_group_ctxt_null(olg, idx), &lwi);
diff --git a/lustre/obdclass/obd_config.c b/lustre/obdclass/obd_config.c
index 4811205..5113166 100644 (file)
@@ -473,7 +473,6 @@ int class_cleanup(struct obd_device *obd, struct lustre_cfg *lcfg)
         if (err)
                 CERROR("Precleanup %s returned %d\n",
                        obd->obd_name, err);
-
         class_decref(obd);
         obd->obd_set_up = 0;
         RETURN(0);
@@ -1083,16 +1082,16 @@ int class_config_parse_llog(struct llog_ctxt *ctxt, char *name,
 
         /* continue processing from where we last stopped to end-of-log */
         if (cfg)
-                cd.first_idx = cfg->cfg_last_idx;
-        cd.last_idx = 0;
+                cd.lpcd_first_idx = cfg->cfg_last_idx;
+        cd.lpcd_last_idx = 0;
 
         rc = llog_process(llh, class_config_llog_handler, cfg, &cd);
 
         CDEBUG(D_CONFIG, "Processed log %s gen %d-%d (rc=%d)\n", name, 
-               cd.first_idx + 1, cd.last_idx, rc);
+               cd.lpcd_first_idx + 1, cd.lpcd_last_idx, rc);
 
         if (cfg)
-                cfg->cfg_last_idx = cd.last_idx;
+                cfg->cfg_last_idx = cd.lpcd_last_idx;
 
 parse_out:
         rc2 = llog_close(llh);
diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c
index 80e5c47..adbf725 100644 (file)
@@ -71,7 +71,6 @@
 #include <lustre_fsfilt.h>
 #include <lprocfs_status.h>
 #include <lustre_log.h>
-#include <lustre_commit_confd.h>
 #include <libcfs/list.h>
 #include <lustre_disk.h>
 #include <lustre_quota.h>
@@ -2171,7 +2170,15 @@ static int filter_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
         return rc;
 }
 
-static int filter_group_llog_finish(struct obd_llog_group *olg)
+static struct llog_operations filter_mds_ost_repl_logops;
+
+static struct llog_operations filter_size_orig_logops = {
+        .lop_setup   = llog_obd_origin_setup,
+        .lop_cleanup = llog_obd_origin_cleanup,
+        .lop_add     = llog_obd_origin_add
+};
+
+static int filter_olg_fini(struct obd_llog_group *olg)
 {
         struct llog_ctxt *ctxt;
         int rc = 0, rc2 = 0;
@@ -2190,20 +2197,12 @@ static int filter_group_llog_finish(struct obd_llog_group *olg)
         RETURN(rc);
 }
 
-static struct llog_operations filter_mds_ost_repl_logops /* initialized below*/;
-static struct llog_operations filter_size_orig_logops = {
-        lop_setup: llog_obd_origin_setup,
-        lop_cleanup: llog_obd_origin_cleanup,
-        lop_add: llog_obd_origin_add
-};
-
-/**
- * Init obd_llog_group of the filter
- */
-static int filter_olg_init(struct obd_device *obd, struct obd_llog_group *olg,
-                           struct obd_device *tgt)
+static int 
+filter_olg_init(struct obd_device *obd, struct obd_llog_group *olg,
+                struct obd_device *tgt)
 {
         int rc;
+        ENTRY;
 
         rc = llog_setup(obd, olg, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL,
                         &filter_mds_ost_repl_logops);
@@ -2214,10 +2213,11 @@ static int filter_olg_init(struct obd_device *obd, struct obd_llog_group *olg,
                         &filter_size_orig_logops);
         if (rc)
                 GOTO(cleanup, rc);
+        EXIT;
 cleanup:
         if (rc)
-              filter_group_llog_finish(olg);
-        RETURN(rc);
+                filter_olg_fini(olg);
+        return rc;
 }
 
 /**
@@ -2230,48 +2230,44 @@ filter_default_olg_init(struct obd_device *obd, struct obd_llog_group *olg,
         struct filter_obd *filter = &obd->u.filter;
         struct llog_ctxt *ctxt;
         int rc;
+        ENTRY;
 
-        LASSERT(filter->fo_lcm == NULL);
-        OBD_ALLOC(filter->fo_lcm, sizeof(struct llog_commit_master));
+        filter->fo_lcm = llog_recov_thread_init(obd->obd_name);
         if (!filter->fo_lcm)
                 RETURN(-ENOMEM);
 
-        rc = llog_init_commit_master((struct llog_commit_master *)
-                                     filter->fo_lcm);
-        if (rc)
-                GOTO(cleanup, rc);
-
         filter_mds_ost_repl_logops = llog_client_ops;
         filter_mds_ost_repl_logops.lop_cancel = llog_obd_repl_cancel;
-        filter_mds_ost_repl_logops.lop_connect = llog_repl_connect;
+        filter_mds_ost_repl_logops.lop_connect = llog_obd_repl_connect;
         filter_mds_ost_repl_logops.lop_sync = llog_obd_repl_sync;
 
         rc = filter_olg_init(obd, olg, tgt);
         if (rc)
-                GOTO(cleanup, rc);
+                GOTO(cleanup_lcm, rc);
 
         ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
-        LASSERT(ctxt != NULL);
+        if (!ctxt) {
+                CERROR("Can't get ctxt for %p:%x\n", olg, 
+                       LLOG_MDS_OST_REPL_CTXT);
+                GOTO(cleanup_olg, rc = -ENODEV);
+        }
+        ctxt->loc_lcm = filter->fo_lcm;
         ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb;
-        ctxt->loc_lcm = obd->u.filter.fo_lcm;
-        /* Only start log commit thread for default llog ctxt*/
-        rc = llog_start_commit_thread(ctxt->loc_lcm);
         llog_ctxt_put(ctxt);
-        if (rc)
-                GOTO(cleanup, rc);
-cleanup:
-        if (rc) {
-                llog_cleanup_commit_master(filter->fo_lcm, 0);
-                OBD_FREE(filter->fo_lcm, sizeof(struct llog_commit_master));
-                filter->fo_lcm = NULL;
-        }
-        RETURN(rc);
+
+        RETURN(0);
+cleanup_olg:
+        filter_olg_fini(olg);
+cleanup_lcm:
+        llog_recov_thread_fini(filter->fo_lcm, 1);
+        filter->fo_lcm = NULL;
+        return rc;
 }
                      
-static int filter_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
-                            struct obd_device *tgt, int count,
-                            struct llog_catid *catid,
-                            struct obd_uuid *uuid)
+static int 
+filter_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
+                 struct obd_device *tgt, int count, struct llog_catid *catid,
+                 struct obd_uuid *uuid)
 {
         struct filter_obd *filter = &obd->u.filter;
         struct llog_ctxt *ctxt;
@@ -2282,38 +2278,49 @@ static int filter_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
         if (olg == &obd->obd_olg)
                 return filter_default_olg_init(obd, olg, tgt);
 
-        /* For other group llog, which will be initialized in
-         * llog_connect(after default_olg_init, so fo_lcm must
-         * already be initialized */ 
         LASSERT(filter->fo_lcm != NULL);
         rc = filter_olg_init(obd, olg, tgt);
         if (rc)
                 RETURN(rc);
         ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
-        LASSERT(ctxt != NULL);
+        if (!ctxt) {
+                CERROR("Can't get ctxt for %p:%x\n", olg, 
+                       LLOG_MDS_OST_REPL_CTXT);
+                filter_olg_fini(olg);
+                RETURN(-ENODEV);
+        }
         ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb;
-        ctxt->loc_lcm = obd->u.filter.fo_lcm;
+        ctxt->loc_lcm = filter->fo_lcm;
         llog_ctxt_put(ctxt);
-
         RETURN(rc);
 }
 
 static int filter_llog_finish(struct obd_device *obd, int count)
 {
-        int rc;
+        struct filter_obd *filter = &obd->u.filter;
+        struct llog_ctxt *ctxt;
         ENTRY;
-
-        if (obd->u.filter.fo_lcm) { 
-                llog_cleanup_commit_master((struct llog_commit_master *)
-                                           obd->u.filter.fo_lcm, 1);
-                OBD_FREE(obd->u.filter.fo_lcm, 
-                         sizeof(struct llog_commit_master));
-                obd->u.filter.fo_lcm = NULL;
+        
+        ctxt = llog_group_get_ctxt(&obd->obd_olg, LLOG_MDS_OST_REPL_CTXT);
+        LASSERT(ctxt != NULL);
+        mutex_down(&ctxt->loc_sem);
+        if (ctxt->loc_imp) {
+                /*
+                 * Balance class_import_get() in llog_receptor_accept(). This
+                 * is safe to do here, as llog is already synchronized and its
+                 * import may go.
+                 */
+                class_import_put(ctxt->loc_imp);
+                ctxt->loc_imp = NULL;
         }
-        /* finish obd llog group */
-        rc = filter_group_llog_finish(&obd->obd_olg);
+        mutex_up(&ctxt->loc_sem);
+        llog_ctxt_put(ctxt);
 
-        RETURN(rc);
+        if (filter->fo_lcm) {
+                llog_recov_thread_fini(filter->fo_lcm, obd->obd_force);
+                filter->fo_lcm = NULL;
+        }
+        RETURN(filter_olg_fini(&obd->obd_olg));
 }
 
 /**
@@ -2441,7 +2448,7 @@ static int filter_llog_connect(struct obd_export *exp,
         RETURN(rc);
 }
 
-static int filter_llog_preclean (struct obd_device *obd)
+static int filter_llog_preclean(struct obd_device *obd)
 {
         struct obd_llog_group *olg, *tmp;
         struct filter_obd *filter;
@@ -2467,7 +2474,7 @@ static int filter_llog_preclean (struct obd_device *obd)
         
         list_for_each_entry_safe(olg, tmp, &remove_list, olg_list) {  
                 list_del_init(&olg->olg_list);
-                rc = filter_group_llog_finish(olg);
+                rc = filter_olg_fini(olg);
                 if (rc)
                         CERROR("failed to cleanup llogging subsystem for %u\n",
                                olg->olg_group);
diff --git a/lustre/obdfilter/filter_log.c b/lustre/obdfilter/filter_log.c
index a6aedf0..e82d5e1 100644 (file)
@@ -50,9 +50,8 @@
 
 #include <libcfs/list.h>
 #include <obd_class.h>
+#include <lustre_log.h>
 #include <lustre_fsfilt.h>
-#include <lustre_commit_confd.h>
-
 #include "filter_internal.h"
 
 int filter_log_sz_change(struct llog_handle *cathandle,
diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c
index 3596108..bc414e6 100644 (file)
@@ -53,7 +53,6 @@
 #include <lustre_debug.h>
 #include <linux/init.h>
 #include <lprocfs_status.h>
-#include <lustre_commit_confd.h>
 #include <libcfs/list.h>
 #include <lustre_quota.h>
 #include "ost_internal.h"
diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index 8b133e4..693e28d 100644 (file)
@@ -473,6 +473,7 @@ static struct ptlrpc_request *ptlrpc_prep_req_from_pool(struct ptlrpc_request_po
         request->rq_reqbuf = reqbuf;
         request->rq_reqbuf_len = pool->prp_rq_size;
         request->rq_pool = pool;
+
         return request;
 }
 
@@ -781,16 +782,36 @@ void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
         atomic_inc(&req->rq_import->imp_inflight);
 }
 
-/* lock so many callers can add things, the context that owns the set
- * is supposed to notice these and move them into the set proper. */
-void ptlrpc_set_add_new_req(struct ptlrpc_request_set *set,
-                            struct ptlrpc_request *req)
+/** 
+ * Lock so many callers can add things, the context that owns the set
+ * is supposed to notice these and move them into the set proper. 
+ */
+int ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
+                           struct ptlrpc_request *req)
 {
+        struct ptlrpc_request_set *set = pc->pc_set;
+
+        /* 
+         * Let caller know that we stopped and will not handle this request.
+         * It needs to take care of the request itself.
+         */
+        if (test_bit(LIOD_STOP, &pc->pc_flags))
+                return -EALREADY;
+
         spin_lock(&set->set_new_req_lock);
-        /* The set takes over the caller's request reference */
+        /* 
+         * The set takes over the caller's request reference. 
+         */
         list_add_tail(&req->rq_set_chain, &set->set_new_requests);
         req->rq_set = set;
         spin_unlock(&set->set_new_req_lock);
+
+        /*
+         * Let the thread know that we added something and that it had
+         * better wake up and process it.
+         */
+        cfs_waitq_signal(&set->set_waitq);
+        return 0;
 }
 
 /*
@@ -2331,6 +2352,29 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
         EXIT;
 }
 
+void ptlrpc_abort_set(struct ptlrpc_request_set *set)
+{
+        struct list_head *tmp, *n;
+
+        LASSERT(set != NULL);
+
+        list_for_each_safe(tmp, n, &set->set_requests) {
+                struct ptlrpc_request *req =
+                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
+
+                spin_lock (&req->rq_lock);
+                if (req->rq_phase != RQ_PHASE_RPC) {
+                        spin_unlock (&req->rq_lock);
+                        continue;
+                }
+
+                req->rq_err = 1;
+                req->rq_status = -EINTR;
+                ptlrpc_wake_client_req(req);
+                spin_unlock (&req->rq_lock);
+        }
+}
+
 static __u64 ptlrpc_last_xid = 0;
 spinlock_t ptlrpc_last_xid_lock;
 
diff --git a/lustre/ptlrpc/ptlrpc_internal.h b/lustre/ptlrpc/ptlrpc_internal.h
index 1a62924..6a00fad 100644 (file)
@@ -124,6 +124,10 @@ void sptlrpc_gc_stop_thread(void);
 int  __init sptlrpc_init(void);
 void __exit sptlrpc_fini(void);
 
+/* recov_thread.c */
+int llog_recov_init(void);
+void llog_recov_fini(void);
+
 static inline int ll_rpc_recoverable_error(int rc)
 { 
         return (rc == -ENOTCONN || rc == -ENODEV);
diff --git a/lustre/ptlrpc/ptlrpc_module.c b/lustre/ptlrpc/ptlrpc_module.c
index 94e3df4..508b432 100644 (file)
@@ -101,10 +101,17 @@ __init int ptlrpc_init(void)
         if (rc)
                 GOTO(cleanup, rc);
 
+        cleanup_phase = 6;
+        rc = llog_recov_init();
+        if (rc)
+                GOTO(cleanup, rc);
+
         RETURN(0);
 
 cleanup:
         switch(cleanup_phase) {
+        case 6:
+                sptlrpc_fini();
         case 5:
                 ldlm_exit();
         case 4:
@@ -124,6 +131,7 @@ cleanup:
 #ifdef __KERNEL__
 static void __exit ptlrpc_exit(void)
 {
+        llog_recov_fini();
         sptlrpc_fini();
         ldlm_exit();
         ptlrpc_stop_pinger();
diff --git a/lustre/ptlrpc/ptlrpcd.c b/lustre/ptlrpc/ptlrpcd.c
index c09663a..8699e7d 100644 (file)
 #include <obd_support.h> /* for OBD_FAIL_CHECK */
 #include <lprocfs_status.h>
 
-#define LIOD_STOP 0
-struct ptlrpcd_ctl {
-        unsigned long             pc_flags;
-        spinlock_t                pc_lock;
-        struct completion         pc_starting;
-        struct completion         pc_finishing;
-        struct ptlrpc_request_set *pc_set;
-        char                      pc_name[16];
-#ifndef __KERNEL__
-        int                       pc_recurred;
-        void                     *pc_callback;
-        void                     *pc_wait_callback;
-        void                     *pc_idle_callback;
-#endif
-};
-
 static struct ptlrpcd_ctl ptlrpcd_pc;
 static struct ptlrpcd_ctl ptlrpcd_recovery_pc;
 
@@ -84,19 +68,40 @@ void ptlrpcd_wake(struct ptlrpc_request *req)
         cfs_waitq_signal(&rq_set->set_waitq);
 }
 
-/* requests that are added to the ptlrpcd queue are sent via
- * ptlrpcd_check->ptlrpc_check_set() */
+/* 
+ * Requests that are added to the ptlrpcd queue are sent via
+ * ptlrpcd_check->ptlrpc_check_set().
+ */
 void ptlrpcd_add_req(struct ptlrpc_request *req)
 {
         struct ptlrpcd_ctl *pc;
+        int rc;
 
         if (req->rq_send_state == LUSTRE_IMP_FULL)
                 pc = &ptlrpcd_pc;
         else
                 pc = &ptlrpcd_recovery_pc;
                 pc = &ptlrpcd_recovery_pc;
 
-        ptlrpc_set_add_new_req(pc->pc_set, req);
-        cfs_waitq_signal(&pc->pc_set->set_waitq);
+        rc = ptlrpc_set_add_new_req(pc, req);
+        if (rc) {
+                int (*interpreter)(struct ptlrpc_request *,
+                                   void *, int);
+                                   
+                interpreter = req->rq_interpret_reply;
+
+                /*
+                 * Thread is probably stopping now, so we need to
+                 * kill this rpc as it was not added. Let's call
+                 * interpret for it to let it know we're killing it
+                 * so that higher levels might free associated
+                 * resources.
+                 */
+                req->rq_status = -EBADR;
+                interpreter(req, &req->rq_async_args,
+                            req->rq_status);
+                req->rq_set = NULL;
+                ptlrpc_req_finished(req);
+        }
 }
 
 static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
 }
 
 static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
@@ -114,15 +119,20 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
                 req = list_entry(pos, struct ptlrpc_request, rq_set_chain);
                 list_del_init(&req->rq_set_chain);
                 ptlrpc_set_add_req(pc->pc_set, req);
                 req = list_entry(pos, struct ptlrpc_request, rq_set_chain);
                 list_del_init(&req->rq_set_chain);
                 ptlrpc_set_add_req(pc->pc_set, req);
-                rc = 1; /* need to calculate its timeout */
+                /* 
+                 * Need to calculate its timeout. 
+                 */
+                rc = 1;
         }
         spin_unlock(&pc->pc_set->set_new_req_lock);
 
         if (pc->pc_set->set_remaining) {
                 rc = rc | ptlrpc_check_set(pc->pc_set);
 
         }
         spin_unlock(&pc->pc_set->set_new_req_lock);
 
         if (pc->pc_set->set_remaining) {
                 rc = rc | ptlrpc_check_set(pc->pc_set);
 
-                /* XXX our set never completes, so we prune the completed
-                 * reqs after each iteration. boy could this be smarter. */
+                /* 
+                 * XXX: our set never completes, so we prune the completed
+                 * reqs after each iteration. boy could this be smarter. 
+                 */
                 list_for_each_safe(pos, tmp, &pc->pc_set->set_requests) {
                         req = list_entry(pos, struct ptlrpc_request,
                                          rq_set_chain);
                 list_for_each_safe(pos, tmp, &pc->pc_set->set_requests) {
                         req = list_entry(pos, struct ptlrpc_request,
                                          rq_set_chain);
@@ -136,7 +146,9 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
         }
 
         if (rc == 0) {
         }
 
         if (rc == 0) {
-                /* If new requests have been added, make sure to wake up */
+                /* 
+                 * If new requests have been added, make sure to wake up. 
+                 */
                 spin_lock(&pc->pc_set->set_new_req_lock);
                 rc = !list_empty(&pc->pc_set->set_new_requests);
                 spin_unlock(&pc->pc_set->set_new_req_lock);
                 spin_lock(&pc->pc_set->set_new_req_lock);
                 rc = !list_empty(&pc->pc_set->set_new_requests);
                 spin_unlock(&pc->pc_set->set_new_req_lock);
@@ -146,9 +158,11 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
 }
 
 #ifdef __KERNEL__
 }
 
 #ifdef __KERNEL__
-/* ptlrpc's code paths like to execute in process context, so we have this
- * thread which spins on a set which contains the io rpcs.  llite specifies
- * ptlrpcd's set when it pushes pages down into the oscs */
+/* 
+ * ptlrpc's code paths like to execute in process context, so we have this
+ * thread which spins on a set which contains the io rpcs. llite specifies
+ * ptlrpcd's set when it pushes pages down into the oscs.
+ */
 static int ptlrpcd(void *arg)
 {
         struct ptlrpcd_ctl *pc = arg;
 static int ptlrpcd(void *arg)
 {
         struct ptlrpcd_ctl *pc = arg;
@@ -157,16 +171,17 @@ static int ptlrpcd(void *arg)
 
         if ((rc = cfs_daemonize_ctxt(pc->pc_name))) {
                 complete(&pc->pc_starting);
-                return rc;
+                goto out;
         }
 
         complete(&pc->pc_starting);
 
         }
 
         complete(&pc->pc_starting);
 
-        /* this mainloop strongly resembles ptlrpc_set_wait except
-         * that our set never completes.  ptlrpcd_check calls ptlrpc_check_set
-         * when there are requests in the set.  new requests come in
-         * on the set's new_req_list and ptlrpcd_check moves them into
-         * the set. */
+        /* 
+         * This mainloop strongly resembles ptlrpc_set_wait() except that our
+         * set never completes.  ptlrpcd_check() calls ptlrpc_check_set() when
+         * there are requests in the set. New requests come in on the set's 
+         * new_req_list and ptlrpcd_check() moves them into the set. 
+         */
         while (1) {
                 struct l_wait_info lwi;
                 cfs_duration_t timeout;
         while (1) {
                 struct l_wait_info lwi;
                 cfs_duration_t timeout;
@@ -176,13 +191,26 @@ static int ptlrpcd(void *arg)
 
                 l_wait_event(pc->pc_set->set_waitq, ptlrpcd_check(pc), &lwi);
 
+                /*
+                 * Abort inflight rpcs for forced stop case.
+                 */
+                if (test_bit(LIOD_STOP_FORCE, &pc->pc_flags))
+                        ptlrpc_abort_set(pc->pc_set);
+
                 if (test_bit(LIOD_STOP, &pc->pc_flags))
                         break;
         }
                 if (test_bit(LIOD_STOP, &pc->pc_flags))
                         break;
         }
-        /* wait for inflight requests to drain */
+
+        /* 
+         * Wait for inflight requests to drain. 
+         */
         if (!list_empty(&pc->pc_set->set_requests))
                 ptlrpc_set_wait(pc->pc_set);
         if (!list_empty(&pc->pc_set->set_requests))
                 ptlrpc_set_wait(pc->pc_set);
+
         complete(&pc->pc_finishing);
+out:
+        clear_bit(LIOD_START, &pc->pc_flags);
+        clear_bit(LIOD_STOP, &pc->pc_flags);
         return 0;
 }
 
         return 0;
 }
 
@@ -193,14 +221,18 @@ int ptlrpcd_check_async_rpcs(void *arg)
         struct ptlrpcd_ctl *pc = arg;
         int                  rc = 0;
 
         struct ptlrpcd_ctl *pc = arg;
         int                  rc = 0;
 
-        /* single threaded!! */
+        /* 
+         * Single threaded!! 
+         */
         pc->pc_recurred++;
 
         if (pc->pc_recurred == 1) {
                 rc = ptlrpcd_check(pc);
                 if (!rc)
                         ptlrpc_expired_set(pc->pc_set);
         pc->pc_recurred++;
 
         if (pc->pc_recurred == 1) {
                 rc = ptlrpcd_check(pc);
                 if (!rc)
                         ptlrpc_expired_set(pc->pc_set);
-                /*XXX send replay requests */
+                /* 
+                 * XXX: send replay requests. 
+                 */
                 if (pc == &ptlrpcd_recovery_pc)
                         rc = ptlrpcd_check(pc);
         }
                 if (pc == &ptlrpcd_recovery_pc)
                         rc = ptlrpcd_check(pc);
         }
@@ -219,29 +251,36 @@ int ptlrpcd_idle(void *arg)
 
 #endif
 
-static int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc)
+int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc)
 {
 {
-        int rc;
-
+        int rc = 0;
         ENTRY;
         ENTRY;
-        memset(pc, 0, sizeof(*pc));
+        /* 
+         * Do not allow start second thread for one pc. 
+         */
+        if (test_and_set_bit(LIOD_START, &pc->pc_flags)) {
+                CERROR("Starting second thread (%s) for same pc %p\n",
+                       name, pc);
+                RETURN(-EALREADY);
+        }
+
         init_completion(&pc->pc_starting);
         init_completion(&pc->pc_finishing);
         init_completion(&pc->pc_starting);
         init_completion(&pc->pc_finishing);
-        pc->pc_flags = 0;
         spin_lock_init(&pc->pc_lock);
         snprintf (pc->pc_name, sizeof (pc->pc_name), name);
 
         pc->pc_set = ptlrpc_prep_set();
         if (pc->pc_set == NULL)
         spin_lock_init(&pc->pc_lock);
         snprintf (pc->pc_name, sizeof (pc->pc_name), name);
 
         pc->pc_set = ptlrpc_prep_set();
         if (pc->pc_set == NULL)
-                RETURN(-ENOMEM);
+                GOTO(out, rc = -ENOMEM);
 
 #ifdef __KERNEL__
         rc = cfs_kernel_thread(ptlrpcd, pc, 0);
         if (rc < 0)  {
                 ptlrpc_set_destroy(pc->pc_set);
-                RETURN(rc);
+                GOTO(out, rc);
         }
         }
-
+        rc = 0;
         wait_for_completion(&pc->pc_starting);
 #else
         pc->pc_wait_callback =
@@ -250,14 +289,23 @@ static int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc)
         pc->pc_idle_callback =
                 liblustre_register_idle_callback("ptlrpcd_check_idle_rpcs",
                                                  &ptlrpcd_idle, pc);
-        (void)rc;
 #endif
 #endif
-        RETURN(0);
+out:
+        if (rc)
+                clear_bit(LIOD_START, &pc->pc_flags);
+        RETURN(rc);
 }
 
 }
 
-static void ptlrpcd_stop(struct ptlrpcd_ctl *pc)
+void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force)
 {
 {
+        if (!test_bit(LIOD_START, &pc->pc_flags)) {
+                CERROR("Thread for pc %p was not started\n", pc);
+                return;
+        }
+
         set_bit(LIOD_STOP, &pc->pc_flags);
         set_bit(LIOD_STOP, &pc->pc_flags);
+        if (force)
+                set_bit(LIOD_STOP_FORCE, &pc->pc_flags);
         cfs_waitq_signal(&pc->pc_set->set_waitq);
 #ifdef __KERNEL__
         wait_for_completion(&pc->pc_finishing);
@@ -285,7 +333,7 @@ int ptlrpcd_addref(void)
 
         rc = ptlrpcd_start("ptlrpcd-recov", &ptlrpcd_recovery_pc);
         if (rc) {
-                ptlrpcd_stop(&ptlrpcd_pc);
+                ptlrpcd_stop(&ptlrpcd_pc, 0);
                 --ptlrpcd_users;
                 GOTO(out, rc);
         }
@@ -298,8 +346,8 @@ void ptlrpcd_decref(void)
 {
         mutex_down(&ptlrpcd_sem);
         if (--ptlrpcd_users == 0) {
-                ptlrpcd_stop(&ptlrpcd_pc);
-                ptlrpcd_stop(&ptlrpcd_recovery_pc);
+                ptlrpcd_stop(&ptlrpcd_pc, 0);
+                ptlrpcd_stop(&ptlrpcd_recovery_pc, 0);
         }
         mutex_up(&ptlrpcd_sem);
 }
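
The global ptlrpcd threads stay reference counted; a hedged sketch of the expected caller pairing (the two hooks below are illustrative, only ptlrpcd_addref() and ptlrpcd_decref() are real):

        /* Illustrative setup/teardown pairing around the shared ptlrpcd. */
        static int example_client_setup(void)
        {
                /* First user starts both "ptlrpcd" and "ptlrpcd-recov". */
                return ptlrpcd_addref();
        }

        static void example_client_cleanup(void)
        {
                /* Last user stops both threads with force == 0. */
                ptlrpcd_decref();
        }
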
index f2a5ba0..b882d03 100644 (file)
@@ -40,7 +40,9 @@
  * - we do not share logs among different OST<->MDS connections, so that
  *   if an OST or MDS fails it need only look at log(s) relevant to itself
  *
- * Author: Andreas Dilger <adilger@clusterfs.com>
+ * Author: Andreas Dilger   <adilger@clusterfs.com>
+ *         Yury Umanets     <yury.umanets@sun.com>
+ *         Alexey Lyashkov  <alexey.lyashkov@sun.com>
  */
 
 #define DEBUG_SUBSYSTEM S_LOG
@@ -57,7 +59,6 @@
 #endif
 
 #include <obd_class.h>
-#include <lustre_commit_confd.h>
 #include <obd_support.h>
 #include <obd_class.h>
 #include <lustre_net.h>
 #include <lustre_log.h>
 #include "ptlrpc_internal.h"
 
-#ifdef __KERNEL__
+static atomic_t                   llcd_count = ATOMIC_INIT(0);
+static cfs_mem_cache_t           *llcd_cache = NULL;
 
-/* Allocate new commit structs in case we do not have enough.
- * Make the llcd size small enough that it fits into a single page when we
- * are sending/receiving it. */
-static int llcd_alloc(struct llog_commit_master *lcm)
+#ifdef __KERNEL__
+enum {
+        LLOG_LCM_FL_START       = 1 << 0,
+        LLOG_LCM_FL_EXIT        = 1 << 1
+};
+
+/** 
+ * Allocate new llcd from cache, init it and return to caller.
+ * Bumps number of objects allocated.
+ */
+static struct llog_canceld_ctxt *llcd_alloc(void)
 {
         struct llog_canceld_ctxt *llcd;
         int llcd_size;
 
-        /* payload of lustre_msg V2 is bigger */
-        llcd_size = 4096 - lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL);
-        OBD_ALLOC(llcd,
-                  llcd_size + offsetof(struct llog_canceld_ctxt, llcd_cookies));
-        if (llcd == NULL)
-                return -ENOMEM;
+        /* 
+         * Payload of lustre_msg V2 is bigger.
+         */
+        llcd_size = CFS_PAGE_SIZE - 
+                lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL);
+        llcd_size += offsetof(struct llog_canceld_ctxt, llcd_cookies);
+        OBD_SLAB_ALLOC(llcd, llcd_cache, CFS_ALLOC_STD, llcd_size);
+        if (!llcd)
+                return NULL;
 
         llcd->llcd_size = llcd_size;
-        llcd->llcd_lcm = lcm;
-
-        spin_lock(&lcm->lcm_llcd_lock);
-        list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
-        atomic_inc(&lcm->lcm_llcd_numfree);
-        spin_unlock(&lcm->lcm_llcd_lock);
-
-        return 0;
-}
-
-/* Get a free cookie struct from the list */
-static struct llog_canceld_ctxt *llcd_grab(struct llog_commit_master *lcm)
-{
-        struct llog_canceld_ctxt *llcd;
-
-repeat:
-        spin_lock(&lcm->lcm_llcd_lock);
-        if (list_empty(&lcm->lcm_llcd_free)) {
-                spin_unlock(&lcm->lcm_llcd_lock);
-                if (llcd_alloc(lcm) < 0) {
-                        CERROR("unable to allocate log commit data!\n");
-                        return NULL;
-                }
-                /* check new llcd wasn't grabbed while lock dropped, b=7407 */
-                goto repeat;
-        }
-
-        llcd = list_entry(lcm->lcm_llcd_free.next, typeof(*llcd), llcd_list);
-        list_del(&llcd->llcd_list);
-        atomic_dec(&lcm->lcm_llcd_numfree);
-        spin_unlock(&lcm->lcm_llcd_lock);
-
         llcd->llcd_cookiebytes = 0;
-
+        atomic_inc(&llcd_count);
         return llcd;
 }
 
-static void llcd_put(struct llog_canceld_ctxt *llcd)
+/**
+ * Returns passed llcd to cache.
+ */
+static void llcd_free(struct llog_canceld_ctxt *llcd)
 {
-        struct llog_commit_master *lcm = llcd->llcd_lcm;
-
-        llog_ctxt_put(llcd->llcd_ctxt);
-        if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) {
-                int llcd_size = llcd->llcd_size +
-                         offsetof(struct llog_canceld_ctxt, llcd_cookies);
-                OBD_FREE(llcd, llcd_size);
-        } else {
-                spin_lock(&lcm->lcm_llcd_lock);
-                list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
-                atomic_inc(&lcm->lcm_llcd_numfree);
-                spin_unlock(&lcm->lcm_llcd_lock);
-        }
+        OBD_SLAB_FREE(llcd, llcd_cache, llcd->llcd_size);
+        atomic_dec(&llcd_count);
 }
 
-/* Send some cookies to the appropriate target */
-static void llcd_send(struct llog_canceld_ctxt *llcd)
+/**
+ * Copy passed @cookies to @llcd.
+ */
+static void llcd_copy(struct llog_canceld_ctxt *llcd, 
+                      struct llog_cookie *cookies)
 {
-        if (!(llcd->llcd_lcm->lcm_flags & LLOG_LCM_FL_EXIT)) {
-                spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
-                list_add_tail(&llcd->llcd_list,
-                              &llcd->llcd_lcm->lcm_llcd_pending);
-                spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);
-        }
-        cfs_waitq_signal_nr(&llcd->llcd_lcm->lcm_waitq, 1);
+        memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes, 
+              cookies, sizeof(*cookies));
+        llcd->llcd_cookiebytes += sizeof(*cookies);
 }
 
 /**
- * Grab llcd and assign it to passed @ctxt. Also set up backward link
- * and get ref on @ctxt.
+ * Checks if passed cookie fits into llcd free space buffer. Returns
+ * 1 if yes and 0 otherwise.
  */
-static struct llog_canceld_ctxt *ctxt_llcd_grab(struct llog_ctxt *ctxt)
+static int llcd_fit(struct llog_canceld_ctxt *llcd,
+                 struct llog_cookie *cookies)
 {
-        struct llog_canceld_ctxt *llcd;
-
-        LASSERT_SEM_LOCKED(&ctxt->loc_sem);
-        llcd = llcd_grab(ctxt->loc_lcm);
-        if (llcd == NULL)
-                return NULL;
-
-        llcd->llcd_ctxt = llog_ctxt_get(ctxt);
-        ctxt->loc_llcd = llcd;
+        return (llcd->llcd_size - 
+                llcd->llcd_cookiebytes) >= sizeof(*cookies);
+}
 
-        CDEBUG(D_RPCTRACE,"grab llcd %p:%p\n", ctxt->loc_llcd, ctxt);
-        return llcd;
+static void llcd_print(struct llog_canceld_ctxt *llcd, 
+                       const char *func, int line) 
+{
+        CDEBUG(D_RPCTRACE, "Llcd (%p) at %s:%d:\n", llcd, func, line);
+        CDEBUG(D_RPCTRACE, "  size: %d\n", llcd->llcd_size);
+        CDEBUG(D_RPCTRACE, "  ctxt: %p\n", llcd->llcd_ctxt);
+        CDEBUG(D_RPCTRACE, "  lcm : %p\n", llcd->llcd_lcm);
+        CDEBUG(D_RPCTRACE, "  cookiebytes : %d\n", llcd->llcd_cookiebytes);
 }
 
 /**
- * Put llcd in passed @ctxt. Set ->loc_llcd to NULL.
+ * Llcd completion function. Called upon llcd send finish, regardless of
+ * the send result. The error is passed in @rc. Note that this is also
+ * called at cleanup time when all in-flight RPCs are aborted.
  */
-static void ctxt_llcd_put(struct llog_ctxt *ctxt)
+static int 
+llcd_interpret(struct ptlrpc_request *req, void *noused, int rc)
 {
-        mutex_down(&ctxt->loc_sem);
-        if (ctxt->loc_llcd != NULL) {
-                CDEBUG(D_RPCTRACE,"put llcd %p:%p\n", ctxt->loc_llcd, ctxt);
-                llcd_put(ctxt->loc_llcd);
-                ctxt->loc_llcd = NULL;
-        }
-        if (ctxt->loc_imp) {
-                class_import_put(ctxt->loc_imp);
-                ctxt->loc_imp = NULL;
-        }
-        mutex_up(&ctxt->loc_sem);
+        struct llog_canceld_ctxt *llcd = req->rq_async_args.pointer_arg[0];
+        CDEBUG(D_RPCTRACE, "Sent llcd %p (%d)\n", llcd, rc);
+        llcd_free(llcd);
+        return 0;
 }
-
-/* deleted objects have a commit callback that cancels the MDS
- * log record for the deletion.  The commit callback calls this
- * function
+/**
+ * Send @llcd to the remote node. Free llcd upon completion or error. Sending
+ * is performed asynchronously, so this function returns right away without
+ * blocking.
  */
-int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
-                         struct lov_stripe_md *lsm, int count,
-                         struct llog_cookie *cookies, int flags)
+static int llcd_send(struct llog_canceld_ctxt *llcd)
 {
-        struct llog_canceld_ctxt *llcd;
-        int rc = 0;
+        char *bufs[2] = { NULL, (char *)llcd->llcd_cookies };
+        struct obd_import *import = NULL;
+        struct llog_commit_master *lcm;
+        struct ptlrpc_request *req;
+        struct llog_ctxt *ctxt;
+        int rc;
         ENTRY;
 
-        LASSERT(ctxt);
-
-        mutex_down(&ctxt->loc_sem);
-        llcd = ctxt->loc_llcd;
-
-        if (ctxt->loc_imp == NULL) {
-                CDEBUG(D_RPCTRACE, "no import for ctxt %p\n", ctxt);
-                GOTO(out, rc = 0);
+        ctxt = llcd->llcd_ctxt;
+        if (!ctxt) {
+                CERROR("Invalid llcd with NULL ctxt found (%p)\n", 
+                       llcd);
+                llcd_print(llcd, __FUNCTION__, __LINE__);
+                LBUG();
         }
+        LASSERT_SEM_LOCKED(&ctxt->loc_sem);
 
-        if (count > 0 && cookies != NULL) {
-                if (llcd == NULL) {
-                        llcd = ctxt_llcd_grab(ctxt);
-                        if (llcd == NULL) {
-                                CERROR("couldn't get an llcd - dropped "LPX64
-                                       ":%x+%u\n",
-                                       cookies->lgc_lgl.lgl_oid,
-                                       cookies->lgc_lgl.lgl_ogen, 
-                                       cookies->lgc_index);
-                                GOTO(out, rc = -ENOMEM);
-                        }
-                }
+        if (llcd->llcd_cookiebytes == 0)
+                GOTO(exit, rc = 0);
+
+        lcm = llcd->llcd_lcm;
+        
+        /* 
+         * Check if we're in exit stage. Do not send llcd in
+         * this case. 
+         */
+        if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags))
+                GOTO(exit, rc = -ENODEV);
+
+        CDEBUG(D_RPCTRACE, "Sending llcd %p\n", llcd);
+
+        import = llcd->llcd_ctxt->loc_imp;
+        if (!import || (import == LP_POISON) || 
+            (import->imp_client == LP_POISON)) {
+                CERROR("Invalid import %p for llcd %p\n", 
+                       import, llcd);
+                GOTO(exit, rc = -ENODEV);
+        }
 
-                memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes, 
-                       cookies, sizeof(*cookies));
-                llcd->llcd_cookiebytes += sizeof(*cookies);
-        } else {
-                if (llcd == NULL || !(flags & OBD_LLOG_FL_SENDNOW))
-                        GOTO(out, rc);
+        OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_RECOV, 10);
+
+        /*
+         * No need to get import here as it is already done in 
+         * llog_receptor_accept().
+         */
+        req = ptlrpc_request_alloc(import, &RQF_LOG_CANCEL);
+        if (req == NULL) {
+                CERROR("Can't allocate request for sending llcd %p\n", 
+                       llcd);
+                GOTO(exit, rc = -ENOMEM);
         }
+        req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES,
+                             RCL_CLIENT, llcd->llcd_cookiebytes);
 
-        if ((llcd->llcd_size - llcd->llcd_cookiebytes) < sizeof(*cookies) ||
-            (flags & OBD_LLOG_FL_SENDNOW)) {
-                CDEBUG(D_RPCTRACE, "send llcd %p:%p\n", llcd, llcd->llcd_ctxt);
-                ctxt->loc_llcd = NULL;
-                llcd_send(llcd);
+        rc = ptlrpc_request_bufs_pack(req, LUSTRE_LOG_VERSION,
+                                      OBD_LOG_CANCEL, bufs, NULL);
+        if (rc) {
+                ptlrpc_request_free(req);
+                GOTO(exit, rc);
         }
-out:
-        mutex_up(&ctxt->loc_sem);
-        return rc;
-}
-EXPORT_SYMBOL(llog_obd_repl_cancel);
 
-int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
-{
-        int rc = 0;
-        ENTRY;
+        ptlrpc_at_set_req_timeout(req);
+        ptlrpc_request_set_replen(req);
 
-        if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
-                CDEBUG(D_RPCTRACE,"reverse import disconnect\n");
-                /* 
-                 * We put llcd because it is not going to sending list and
-                 * thus, its refc will not be handled. We will handle it here.
-                 */
-                ctxt_llcd_put(ctxt);
-        } else {
-                /* 
-                 * Sending cancel. This means that ctxt->loc_llcd wil be
-                 * put on sending list in llog_obd_repl_cancel() and in
-                 * this case recovery thread will take care of it refc.
-                 */
-                rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);
-        }
-        RETURN(rc);
+        /* bug 5515 */
+        req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
+        req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
+        req->rq_interpret_reply = llcd_interpret;
+        req->rq_async_args.pointer_arg[0] = llcd;
+        rc = ptlrpc_set_add_new_req(&lcm->lcm_pc, req);
+        if (rc)
+                GOTO(exit, rc);
+
+        RETURN(0);
+exit:
+        CDEBUG(D_RPCTRACE, "Refused llcd %p\n", llcd);
+        llcd_free(llcd);
+        return rc;
 }
-EXPORT_SYMBOL(llog_obd_repl_sync);
 
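
For reference, llcd_send() above follows the standard ptlrpc async pattern: the payload rides in rq_async_args and is released only in the interpret callback, which runs exactly once whether the RPC completes, fails or is aborted at cleanup. A stripped-down sketch of that pattern (struct example_payload and the function names are hypothetical):

        struct example_payload {
                int ep_cookie_bytes;
        };

        /* Illustrative interpret callback mirroring llcd_interpret(). */
        static int example_interpret(struct ptlrpc_request *req,
                                     void *unused, int rc)
        {
                struct example_payload *ep = req->rq_async_args.pointer_arg[0];

                /* Single cleanup point for the payload. */
                OBD_FREE_PTR(ep);
                return 0;
        }
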
-static inline void stop_log_commit(struct llog_commit_master *lcm,
-                                   struct llog_commit_daemon *lcd,
-                                   int rc)
+/**
+ * Attach @llcd to @ctxt. Establish the llcd and ctxt links in both
+ * directions so that they can refer to each other.
+ */
+static int
+llcd_attach(struct llog_ctxt *ctxt, struct llog_canceld_ctxt *llcd)
 {
-        CERROR("error preparing commit: rc %d\n", rc);
+        struct llog_commit_master *lcm;
 
-        spin_lock(&lcm->lcm_llcd_lock);
-        list_splice_init(&lcd->lcd_llcd_list, &lcm->lcm_llcd_resend);
-        spin_unlock(&lcm->lcm_llcd_lock);
+        LASSERT(ctxt != NULL && llcd != NULL);
+        LASSERT_SEM_LOCKED(&ctxt->loc_sem);
+        LASSERT(ctxt->loc_llcd == NULL);
+        lcm = ctxt->loc_lcm;
+        atomic_inc(&lcm->lcm_count);
+        CDEBUG(D_RPCTRACE, "Attach llcd %p to ctxt %p (%d)\n",
+               llcd, ctxt, atomic_read(&lcm->lcm_count));
+        llcd->llcd_ctxt = llog_ctxt_get(ctxt);
+        llcd->llcd_lcm = ctxt->loc_lcm;
+        ctxt->loc_llcd = llcd;
+        return 0;
 }
 
-static int log_commit_thread(void *arg)
+/**
+ * Opposite of llcd_attach(). Detaches llcd from its @ctxt. This makes
+ * sure that this llcd will not be found the next time we try to cancel.
+ */
+static struct llog_canceld_ctxt *llcd_detach(struct llog_ctxt *ctxt)
 {
-        struct llog_commit_master *lcm = arg;
-        struct llog_commit_daemon *lcd;
-        struct llog_canceld_ctxt *llcd, *n;
-        struct obd_import *import = NULL;
-        ENTRY;
-
-        OBD_ALLOC(lcd, sizeof(*lcd));
-        if (lcd == NULL)
-                RETURN(-ENOMEM);
-
-        spin_lock(&lcm->lcm_thread_lock);
-        THREAD_NAME(cfs_curproc_comm(), CFS_CURPROC_COMM_MAX - 1,
-                    "ll_log_comt_%02d", atomic_read(&lcm->lcm_thread_total));
-        atomic_inc(&lcm->lcm_thread_total);
-        spin_unlock(&lcm->lcm_thread_lock);
-
-        ptlrpc_daemonize(cfs_curproc_comm()); /* thread never needs to do IO */
-
-        CFS_INIT_LIST_HEAD(&lcd->lcd_lcm_list);
-        CFS_INIT_LIST_HEAD(&lcd->lcd_llcd_list);
-        lcd->lcd_lcm = lcm;
-
-        CDEBUG(D_HA, "%s started\n", cfs_curproc_comm());
-        do {
-                struct ptlrpc_request *request;
-                struct list_head *sending_list;
-                int rc = 0;
-
-                if (import)
-                        class_import_put(import);
-                import = NULL;
-
-                /* If we do not have enough pages available, allocate some */
-                while (atomic_read(&lcm->lcm_llcd_numfree) <
-                       lcm->lcm_llcd_minfree) {
-                        if (llcd_alloc(lcm) < 0)
-                                break;
-                }
-
-                spin_lock(&lcm->lcm_thread_lock);
-                atomic_inc(&lcm->lcm_thread_numidle);
-                list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_idle);
-                spin_unlock(&lcm->lcm_thread_lock);
-
-                wait_event_interruptible(lcm->lcm_waitq,
-                                         !list_empty(&lcm->lcm_llcd_pending) ||
-                                         lcm->lcm_flags & LLOG_LCM_FL_EXIT);
-
-                /* If we are the last available thread, start a new one in case
-                 * we get blocked on an RPC (nobody else will start a new one)*/
-                spin_lock(&lcm->lcm_thread_lock);
-                atomic_dec(&lcm->lcm_thread_numidle);
-                list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_busy);
-                spin_unlock(&lcm->lcm_thread_lock);
-
-                sending_list = &lcm->lcm_llcd_pending;
-        resend:
-                if (import)
-                        class_import_put(import);
-                import = NULL;
-                if (lcm->lcm_flags & LLOG_LCM_FL_EXIT) {
-                        lcm->lcm_llcd_maxfree = 0;
-                        lcm->lcm_llcd_minfree = 0;
-                        lcm->lcm_thread_max = 0;
-
-                        if (list_empty(&lcm->lcm_llcd_pending) ||
-                            lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE)
-                                break;
-                }
+        struct llog_commit_master *lcm;
+        struct llog_canceld_ctxt *llcd;
 
-                if (atomic_read(&lcm->lcm_thread_numidle) <= 1 &&
-                    atomic_read(&lcm->lcm_thread_total) < lcm->lcm_thread_max) {
-                        rc = llog_start_commit_thread(lcm);
-                        if (rc < 0)
-                                CERROR("error starting thread: rc %d\n", rc);
-                }
+        LASSERT(ctxt != NULL);
+        LASSERT_SEM_LOCKED(&ctxt->loc_sem);
 
-                /* Move all of the pending cancels from the same OST off of
-                 * the list, so we don't get multiple threads blocked and/or
-                 * doing upcalls on the same OST in case of failure. */
-                spin_lock(&lcm->lcm_llcd_lock);
-                if (!list_empty(sending_list)) {
-                        list_move_tail(sending_list->next,
-                                       &lcd->lcd_llcd_list);
-                        llcd = list_entry(lcd->lcd_llcd_list.next,
-                                          typeof(*llcd), llcd_list);
-                        LASSERT(llcd->llcd_lcm == lcm);
-                        import = llcd->llcd_ctxt->loc_imp;
-                        if (import)
-                                class_import_get(import);
-                }
-                list_for_each_entry_safe(llcd, n, sending_list, llcd_list) {
-                        LASSERT(llcd->llcd_lcm == lcm);
-                        if (import == llcd->llcd_ctxt->loc_imp)
-                                list_move_tail(&llcd->llcd_list,
-                                               &lcd->lcd_llcd_list);
-                }
-                if (sending_list != &lcm->lcm_llcd_resend) {
-                        list_for_each_entry_safe(llcd, n, &lcm->lcm_llcd_resend,
-                                                 llcd_list) {
-                                LASSERT(llcd->llcd_lcm == lcm);
-                                if (import == llcd->llcd_ctxt->loc_imp)
-                                        list_move_tail(&llcd->llcd_list,
-                                                       &lcd->lcd_llcd_list);
-                        }
-                }
-                spin_unlock(&lcm->lcm_llcd_lock);
-
-                /* We are the only one manipulating our local list - no lock */
-                list_for_each_entry_safe(llcd,n, &lcd->lcd_llcd_list,llcd_list){
-                        char *bufs[2] = { NULL, (char *)llcd->llcd_cookies };
-
-                        list_del(&llcd->llcd_list);
-                        if (llcd->llcd_cookiebytes == 0) {
-                                CDEBUG(D_RPCTRACE, "put empty llcd %p:%p\n",
-                                       llcd, llcd->llcd_ctxt);
-                                llcd_put(llcd);
-                                continue;
-                        }
-
-                        mutex_down(&llcd->llcd_ctxt->loc_sem);
-                        if (llcd->llcd_ctxt->loc_imp == NULL) {
-                                mutex_up(&llcd->llcd_ctxt->loc_sem);
-                                CWARN("import will be destroyed, put "
-                                      "llcd %p:%p\n", llcd, llcd->llcd_ctxt);
-                                llcd_put(llcd);
-                                continue;
-                        }
-                        mutex_up(&llcd->llcd_ctxt->loc_sem);
-
-                        if (!import || (import == LP_POISON) ||
-                            (import->imp_client == LP_POISON)) {
-                                CERROR("No import %p (llcd=%p, ctxt=%p)\n",
-                                       import, llcd, llcd->llcd_ctxt);
-                                llcd_put(llcd);
-                                continue;
-                        }
-
-                        OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_RECOV, 10);
-
-                        request = ptlrpc_request_alloc(import, &RQF_LOG_CANCEL);
-                        if (request == NULL) {
-                                rc = -ENOMEM;
-                                stop_log_commit(lcm, lcd, rc);
-                                break;
-                        }
-
-                        req_capsule_set_size(&request->rq_pill, &RMF_LOGCOOKIES,
-                                             RCL_CLIENT,llcd->llcd_cookiebytes);
-
-                        rc = ptlrpc_request_bufs_pack(request,
-                                                      LUSTRE_LOG_VERSION,
-                                                      OBD_LOG_CANCEL, bufs,
-                                                      NULL);
-                        if (rc) {
-                                ptlrpc_request_free(request);
-                                stop_log_commit(lcm, lcd, rc);
-                                break;
-                        }
-
-                        /* bug 5515 */
-                        request->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
-                        request->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
-                        ptlrpc_at_set_req_timeout(request);
-
-                        ptlrpc_request_set_replen(request);
-                        mutex_down(&llcd->llcd_ctxt->loc_sem);
-                        if (llcd->llcd_ctxt->loc_imp == NULL) {
-                                mutex_up(&llcd->llcd_ctxt->loc_sem);
-                                CWARN("import will be destroyed, put "
-                                      "llcd %p:%p\n", llcd, llcd->llcd_ctxt);
-                                llcd_put(llcd);
-                                ptlrpc_req_finished(request);
-                                continue;
-                        }
-                        mutex_up(&llcd->llcd_ctxt->loc_sem);
-                        rc = ptlrpc_queue_wait(request);
-                        ptlrpc_req_finished(request);
-
-                        /* If the RPC failed, we put this and the remaining
-                         * messages onto the resend list for another time. */
-                        if (rc == 0) {
-                                llcd_put(llcd);
-                                continue;
-                        }
-
-                        CERROR("commit %p:%p drop %d cookies: rc %d\n",
-                               llcd, llcd->llcd_ctxt,
-                               (int)(llcd->llcd_cookiebytes /
-                                     sizeof(*llcd->llcd_cookies)), rc);
-                        llcd_put(llcd);
-                }
+        llcd = ctxt->loc_llcd;
+        if (!llcd)
+                return NULL;
 
-                if (rc == 0) {
-                        sending_list = &lcm->lcm_llcd_resend;
-                        if (!list_empty(sending_list))
-                                goto resend;
-                }
-        } while(1);
+        lcm = ctxt->loc_lcm;
+        if (atomic_read(&lcm->lcm_count) == 0) {
+                CERROR("Invalid detach occurred %p:%p\n", ctxt, llcd);
+                llcd_print(llcd, __FUNCTION__, __LINE__);
+                LBUG();
+        }
+        atomic_dec(&lcm->lcm_count);
+        ctxt->loc_llcd = NULL;
+        
+        CDEBUG(D_RPCTRACE, "Detach llcd %p from ctxt %p (%d)\n", 
+               llcd, ctxt, atomic_read(&lcm->lcm_count));
 
-        if (import)
-                class_import_put(import);
+        llog_ctxt_put(ctxt);
+        return llcd;
+}
 
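
Both helpers above assume ctxt->loc_sem is held and keep lcm_count balanced; a minimal sketch of that locking discipline (the caller below is hypothetical, the functions it uses are from this patch):

        /* Hypothetical helper: bind a fresh llcd to @ctxt, then unbind and
         * free it again, always under ctxt->loc_sem. */
        static void example_llcd_bind_cycle(struct llog_ctxt *ctxt)
        {
                struct llog_canceld_ctxt *llcd;

                mutex_down(&ctxt->loc_sem);
                llcd = llcd_alloc();
                if (llcd != NULL)
                        llcd_attach(ctxt, llcd);        /* lcm_count++ */
                mutex_up(&ctxt->loc_sem);

                /* ... gather cookies into the llcd via llcd_copy() ... */

                mutex_down(&ctxt->loc_sem);
                llcd = llcd_detach(ctxt);               /* lcm_count-- */
                if (llcd != NULL)
                        llcd_free(llcd);
                mutex_up(&ctxt->loc_sem);
        }
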
-        /* If we are force exiting, just drop all of the cookies. */
-        if (lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE) {
-                spin_lock(&lcm->lcm_llcd_lock);
-                list_splice_init(&lcm->lcm_llcd_pending, &lcd->lcd_llcd_list);
-                list_splice_init(&lcm->lcm_llcd_resend, &lcd->lcd_llcd_list);
-                list_splice_init(&lcm->lcm_llcd_free, &lcd->lcd_llcd_list);
-                spin_unlock(&lcm->lcm_llcd_lock);
+/**
+ * Return an llcd for @ctxt: allocate a new one and attach it to @ctxt
+ * so that it may be used for gathering cookies and sending.
+ */
+static struct llog_canceld_ctxt *llcd_get(struct llog_ctxt *ctxt)
+{
+        struct llog_canceld_ctxt *llcd;
 
-                list_for_each_entry_safe(llcd, n, &lcd->lcd_llcd_list,llcd_list)
-                        llcd_put(llcd);
+        llcd = llcd_alloc();
+        if (!llcd) {
+                CERROR("Couldn't alloc an llcd for ctxt %p\n", ctxt);
+                return NULL;
         }
+        llcd_attach(ctxt, llcd);
+        return llcd;
+}
 
-        spin_lock(&lcm->lcm_thread_lock);
-        list_del(&lcd->lcd_lcm_list);
-        spin_unlock(&lcm->lcm_thread_lock);
-        OBD_FREE(lcd, sizeof(*lcd));
+/**
+ * Detach llcd from its @ctxt. Free llcd.
+ */
+static void llcd_put(struct llog_ctxt *ctxt)
+{
+        struct llog_canceld_ctxt *llcd;
 
-        CDEBUG(D_HA, "%s exiting\n", cfs_curproc_comm());
+        llcd = llcd_detach(ctxt);
+        if (llcd)
+                llcd_free(llcd);
+}
 
-        spin_lock(&lcm->lcm_thread_lock);
-        atomic_dec(&lcm->lcm_thread_total);
-        spin_unlock(&lcm->lcm_thread_lock);
-        cfs_waitq_signal(&lcm->lcm_waitq);
+/**
+ * Detach llcd from its @ctxt so that nobody will find it and try to
+ * re-use it. Send llcd to the remote node.
+ */
+static int llcd_push(struct llog_ctxt *ctxt)
+{
+        struct llog_canceld_ctxt *llcd;
+        int rc;
 
-        return 0;
+        /*
+         * Make sure that this llcd will not be sent again as we detach 
+         * it from ctxt.
+         */
+        llcd = llcd_detach(ctxt);
+        if (!llcd) {
+                CERROR("Invalid detached llcd found %p\n", llcd);
+                llcd_print(llcd, __FUNCTION__, __LINE__);
+                LBUG();
+        }
+        
+        rc = llcd_send(llcd);
+        if (rc)
+                CERROR("Couldn't send llcd %p (%d)\n", llcd, rc);
+        return rc;
 }
 
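
Once llcd_push() (or llcd_send()) accepts an llcd it is always freed by the send path, either in llcd_interpret() on completion or in llcd_free() on refusal, so callers must not touch it afterwards. A hedged caller sketch (the helper name is hypothetical):

        /* Flush whatever is currently batched on @ctxt, if anything. */
        static int example_flush_ctxt(struct llog_ctxt *ctxt)
        {
                int rc = 0;

                mutex_down(&ctxt->loc_sem);
                if (ctxt->loc_llcd != NULL)
                        /* Detached here; ownership moves to the send path. */
                        rc = llcd_push(ctxt);
                mutex_up(&ctxt->loc_sem);
                return rc;
        }
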
-int llog_start_commit_thread(struct llog_commit_master *lcm)
+static atomic_t llog_tcount = ATOMIC_INIT(0);
+
+/**
+ * Start recovery thread which actually handles llcd sending. This is
+ * all based on the standard ptlrpcd thread, so there is not much work
+ * to do here.
+ */
+int llog_recov_thread_start(struct llog_commit_master *lcm)
 {
         int rc;
         ENTRY;
 
-        if (atomic_read(&lcm->lcm_thread_total) >= lcm->lcm_thread_max)
-                RETURN(0);
-
-        rc = cfs_kernel_thread(log_commit_thread, lcm, CLONE_VM | CLONE_FILES);
-        if (rc < 0) {
-                CERROR("error starting thread #%d: %d\n",
-                       atomic_read(&lcm->lcm_thread_total), rc);
+        rc = ptlrpcd_start(lcm->lcm_name, &lcm->lcm_pc);
+        if (rc) {
+                CERROR("Error %d while starting recovery thread %s\n", 
+                       rc, lcm->lcm_name);
                 RETURN(rc);
         }
+        lcm->lcm_set = lcm->lcm_pc.pc_set;
+        atomic_inc(&llog_tcount);
 
-        RETURN(0);
-}
-EXPORT_SYMBOL(llog_start_commit_thread);
-
-static struct llog_process_args {
-        struct semaphore         llpa_sem;
-        struct llog_ctxt        *llpa_ctxt;
-        void                    *llpa_cb;
-        void                    *llpa_arg;
-} llpa;
-
-int llog_init_commit_master(struct llog_commit_master *lcm)
-{
-        CFS_INIT_LIST_HEAD(&lcm->lcm_thread_busy);
-        CFS_INIT_LIST_HEAD(&lcm->lcm_thread_idle);
-        spin_lock_init(&lcm->lcm_thread_lock);
-        atomic_set(&lcm->lcm_thread_numidle, 0);
-        cfs_waitq_init(&lcm->lcm_waitq);
-        CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_pending);
-        CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_resend);
-        CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_free);
-        spin_lock_init(&lcm->lcm_llcd_lock);
-        atomic_set(&lcm->lcm_llcd_numfree, 0);
-        lcm->lcm_llcd_minfree = 0;
-        lcm->lcm_thread_max = 5;
-        /* FIXME initialize semaphore for llog_process_args */
-        sema_init(&llpa.llpa_sem, 1);
-        return 0;
+        RETURN(rc);
 }
-EXPORT_SYMBOL(llog_init_commit_master);
+EXPORT_SYMBOL(llog_recov_thread_start);
 
-int llog_cleanup_commit_master(struct llog_commit_master *lcm,
-                               int force)
+/**
+ * Stop recovery thread. Complement to llog_recov_thread_start().
+ */
+void llog_recov_thread_stop(struct llog_commit_master *lcm, int force)
 {
-        lcm->lcm_flags |= LLOG_LCM_FL_EXIT;
-        if (force)
-                lcm->lcm_flags |= LLOG_LCM_FL_EXIT_FORCE;
-        cfs_waitq_signal(&lcm->lcm_waitq);
-
-        wait_event_interruptible(lcm->lcm_waitq,
-                                 atomic_read(&lcm->lcm_thread_total) == 0);
-        return 0;
+        ENTRY;
+        
+        /**
+         * Let everyone know that we are stopping. This will also make
+         * llcd_send() refuse any new llcds.
+         */
+        set_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags);
+
+        /**
+         * Stop the processing thread. No new RPCs will be accepted
+         * for processing now.
+         */
+        ptlrpcd_stop(&lcm->lcm_pc, force);
+        
+        /*
+         * No llcds should be left on this @lcm.
+         */
+        LASSERTF(atomic_read(&lcm->lcm_count) == 0, 
+                 "Busy llcds found on lcm %p - (%d)\n", 
+                 lcm, atomic_read(&lcm->lcm_count));
+        EXIT;
 }
-EXPORT_SYMBOL(llog_cleanup_commit_master);
+EXPORT_SYMBOL(llog_recov_thread_stop);
 
-static int log_process_thread(void *args)
+/**
+ * Initialize commit master structure and start recovery thread on it.
+ */
+struct llog_commit_master *llog_recov_thread_init(char *name)
 {
-        struct llog_process_args *data = args;
-        struct llog_ctxt *ctxt = data->llpa_ctxt;
-        void   *cb = data->llpa_cb;
-        struct llog_logid logid = *(struct llog_logid *)(data->llpa_arg);
-        struct llog_handle *llh = NULL;
+        struct llog_commit_master *lcm;
         int rc;
         ENTRY;
 
-        mutex_up(&data->llpa_sem);
-        ptlrpc_daemonize("llog_process");     /* thread does IO to log files */
+        OBD_ALLOC_PTR(lcm);
+        if (!lcm)
+                RETURN(NULL);
 
-        rc = llog_create(ctxt, &llh, &logid, NULL);
+        /*
+         * Try to create threads with unique names.
+         */
+        snprintf(lcm->lcm_name, sizeof(lcm->lcm_name), 
+                 "ll_log_commit_%s_%02d", name, 
+                 atomic_read(&llog_tcount));
+
+        strncpy(lcm->lcm_name, name, sizeof(lcm->lcm_name));
+        atomic_set(&lcm->lcm_count, 0);
+        rc = llog_recov_thread_start(lcm);
         if (rc) {
-                CERROR("llog_create failed %d\n", rc);
+                CERROR("Can't start commit thread, rc %d\n", rc);
                 GOTO(out, rc);
         }
-        rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL);
-        if (rc) {
-                CERROR("llog_init_handle failed %d\n", rc);
-                GOTO(release_llh, rc);
-        }
-
-        if (cb) {
-                rc = llog_cat_process(llh, (llog_cb_t)cb, NULL);
-                if (rc != LLOG_PROC_BREAK)
-                        CERROR("llog_cat_process failed %d\n", rc);
-        } else {
-                CWARN("no callback function for recovery\n");
-        }
-
-        CDEBUG(D_HA, "send llcd %p:%p forcibly after recovery\n",
-               ctxt->loc_llcd, ctxt);
-        llog_sync(ctxt, NULL);
-
-release_llh:
-        rc = llog_cat_put(llh);
-        if (rc)
-                CERROR("llog_cat_put failed %d\n", rc);
+        RETURN(lcm);
 out:
-        llog_ctxt_put(ctxt);
-        RETURN(rc);
+        OBD_FREE_PTR(lcm);
+        return NULL;
+}
+EXPORT_SYMBOL(llog_recov_thread_init);
+
+/**
+ * Finalize commit master and its recovery thread.
+ */
+void llog_recov_thread_fini(struct llog_commit_master *lcm, int force)
+{
+        ENTRY;
+        llog_recov_thread_stop(lcm, force);
+        OBD_FREE_PTR(lcm);
+        EXIT;
 }
+EXPORT_SYMBOL(llog_recov_thread_fini);
 
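
Taken together, llog_recov_thread_init(), llog_recov_thread_stop() and llog_recov_thread_fini() give each obd a self-contained commit master lifecycle; a hedged usage sketch (struct example_obd and its fields are illustrative, not part of this patch):

        struct example_obd {
                struct llog_commit_master *eo_lcm;
                char                      *eo_name;
        };

        static int example_obd_llog_setup(struct example_obd *eobd)
        {
                eobd->eo_lcm = llog_recov_thread_init(eobd->eo_name);
                return eobd->eo_lcm != NULL ? 0 : -ENOMEM;
        }

        static void example_obd_llog_cleanup(struct example_obd *eobd)
        {
                /* force == 0 lets queued cancel RPCs drain before freeing. */
                llog_recov_thread_fini(eobd->eo_lcm, 0);
                eobd->eo_lcm = NULL;
        }
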
-static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle,void *arg)
+static int llog_obd_repl_generic(struct llog_ctxt *ctxt, 
+                                 void *handle, void *arg)
 {
         struct obd_device *obd = ctxt->loc_obd;
+        struct llog_process_cat_args *lpca;
         int rc;
         ENTRY;
 
         if (obd->obd_stopping)
                 RETURN(-ENODEV);
 
-        mutex_down(&llpa.llpa_sem);
-        llpa.llpa_cb = handle;
-        llpa.llpa_arg = arg;
-        llpa.llpa_ctxt = llog_ctxt_get(ctxt);
-        if (!llpa.llpa_ctxt) {
-                up(&llpa.llpa_sem);
+        /*
+         * This will be balanced in llog_cat_process_thread()
+         */
+        OBD_ALLOC_PTR(lpca);
+        if (!lpca)
+                RETURN(-ENOMEM);
+
+        lpca->lpca_cb = handle;
+        lpca->lpca_arg = arg;
+
+        /*
+         * This will be balanced in llog_cat_process_thread()
+         */
+        lpca->lpca_ctxt = llog_ctxt_get(ctxt);
+        if (!lpca->lpca_ctxt) {
+                OBD_FREE_PTR(lpca);
                 RETURN(-ENODEV);
         }
-        rc = cfs_kernel_thread(log_process_thread, &llpa, CLONE_VM | CLONE_FILES);
+        rc = cfs_kernel_thread(llog_cat_process_thread, lpca, 
+                               CLONE_VM | CLONE_FILES);
         if (rc < 0) {
+                CERROR("Error starting llog_cat_process_thread(): %d\n", rc);
+                OBD_FREE_PTR(lpca);
                 llog_ctxt_put(ctxt);
-                CERROR("error starting log_process_thread: %d\n", rc);
         } else {
-                CDEBUG(D_HA, "log_process_thread: %d\n", rc);
+                CDEBUG(D_HA, "Started llog_cat_process_thread(): %d\n", rc);
                 rc = 0;
         }
 
         RETURN(rc);
 }
 
-int llog_repl_connect(struct llog_ctxt *ctxt, int count,
-                      struct llog_logid *logid, struct llog_gen *gen,
-                      struct obd_uuid *uuid)
+int llog_obd_repl_connect(struct llog_ctxt *ctxt, int count,
+                          struct llog_logid *logid, struct llog_gen *gen,
+                          struct obd_uuid *uuid)
 {
         struct llog_canceld_ctxt *llcd;
         int rc;
         ENTRY;
 
-        /* send back llcd before recovery from llog */
-        if (ctxt->loc_llcd != NULL) {
-                CWARN("llcd %p:%p not empty\n", ctxt->loc_llcd, ctxt);
+        mutex_down(&ctxt->loc_sem);
+
+        /* 
+         * Send back cached llcd before recovery from llog if we have any. 
+         */
+        if (ctxt->loc_llcd) {
+                CWARN("Llcd %p:%p is not empty\n", ctxt->loc_llcd, ctxt);
+                mutex_up(&ctxt->loc_sem);
                 llog_sync(ctxt, NULL);
+                mutex_down(&ctxt->loc_sem);
         }
 
-        mutex_down(&ctxt->loc_sem);
-        ctxt->loc_gen = *gen;
-        llcd = ctxt_llcd_grab(ctxt);
-        if (llcd == NULL) {
-                CERROR("couldn't get an llcd\n");
+        llcd = llcd_get(ctxt);
+        if (!llcd) {
                 mutex_up(&ctxt->loc_sem);
                 RETURN(-ENOMEM);
         }
-        mutex_up(&ctxt->loc_sem);
 
-        rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid);
+        ctxt->loc_gen = *gen;
+
+        rc = llog_obd_repl_generic(ctxt, ctxt->llog_proc_cb, logid);
         if (rc != 0) {
-                ctxt_llcd_put(ctxt);
-                CERROR("error recovery process: %d\n", rc);
+                llcd_put(ctxt);
+                CERROR("Error recovery process: %d\n", rc);
+        }
+        mutex_up(&ctxt->loc_sem);
+        RETURN(rc);
+}
+EXPORT_SYMBOL(llog_obd_repl_connect);
+
+/** 
+ * Deleted objects have a commit callback that cancels the MDS
+ * log record for the deletion. The commit callback calls this
+ * function.
+ */
+int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
+                         struct lov_stripe_md *lsm, int count,
+                         struct llog_cookie *cookies, int flags)
+{
+        struct llog_canceld_ctxt *llcd;
+        int rc = 0;
+        ENTRY;
+
+        LASSERT(ctxt != NULL);
+
+        mutex_down(&ctxt->loc_sem);
+        
+        /*
+         * Let's check if we have all structures alive. We also check for
+         * possible shutdown. Do nothing if we're stopping.
+         */
+        if (ctxt->loc_imp == NULL) {
+                CDEBUG(D_RPCTRACE, "No import for ctxt %p\n", ctxt);
+                GOTO(out, rc = -ENODEV);
+        }
+
+        if (ctxt->loc_obd->obd_stopping) {
+                CDEBUG(D_RPCTRACE, "Obd is stopping for ctxt %p\n", ctxt);
+                GOTO(out, rc = -ENODEV);
+        }
+
+        if (test_bit(LLOG_LCM_FL_EXIT, &ctxt->loc_lcm->lcm_flags)) {
+                CDEBUG(D_RPCTRACE, "Commit thread is stopping for ctxt %p\n", 
+                       ctxt);
+                GOTO(out, rc = -ENODEV);
+        }
+
+        llcd = ctxt->loc_llcd;
+
+        if (count > 0 && cookies != NULL) {
+                /*
+                 * Get new llcd from ctxt if required. 
+                 */
+                if (!llcd) {
+                        llcd = llcd_get(ctxt);
+                        if (!llcd)
+                                GOTO(out, rc = -ENOMEM);
+                }
+
+                /*
+                 * Llcd does not have enough room for @cookies. Let's push 
+                 * it out and allocate new one. 
+                 */
+                if (!llcd_fit(llcd, cookies)) {
+                        rc = llcd_push(ctxt);
+                        if (rc)
+                                GOTO(out, rc);
+                        llcd = llcd_get(ctxt);
+                        if (!llcd)
+                                GOTO(out, rc = -ENOMEM);
+                }
+
+                /*
+                 * Copy cookies to @llcd, no matter old or new allocated one.
+                 */
+                llcd_copy(llcd, cookies);
+        }
+
+        /*
+         * Let's check if we need to send copied @cookies asap. If yes - do it.
+         */
+        if (llcd && (flags & OBD_LLOG_FL_SENDNOW)) {
+                rc = llcd_push(ctxt);
+                if (rc)
+                        GOTO(out, rc);
+        }
+        EXIT;
+out:
+        mutex_up(&ctxt->loc_sem);
+        return rc;
+}
+EXPORT_SYMBOL(llog_obd_repl_cancel);
+
+int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
+{
+        int rc = 0;
+        ENTRY;
+
+        if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
+                CDEBUG(D_RPCTRACE, "Reverse import disconnect\n");
+                /*
+                 * Check for llcd which might be left attached to @ctxt.
+                 * Let's kill it.
+                 */
+                mutex_down(&ctxt->loc_sem);
+                llcd_put(ctxt);
+                mutex_up(&ctxt->loc_sem);
+        } else {
+                rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);
         }
         RETURN(rc);
 }
-EXPORT_SYMBOL(llog_repl_connect);
+EXPORT_SYMBOL(llog_obd_repl_sync);
 
 #else /* !__KERNEL__ */
 
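
llog_obd_repl_cancel() above batches cookies per context until an llcd fills up or OBD_LLOG_FL_SENDNOW is passed; a hedged caller-side sketch of that flow (the loop and function name are illustrative, the API and flag are from this patch):

        /* Hypothetical commit callback: queue one cookie per committed
         * unlink, then flush whatever is still batched. */
        static int example_cancel_cookies(struct llog_ctxt *ctxt,
                                          struct llog_cookie *cookies,
                                          int count)
        {
                int i, rc = 0;

                for (i = 0; i < count && rc == 0; i++)
                        rc = llog_obd_repl_cancel(ctxt, NULL, 1,
                                                  &cookies[i], 0);
                if (rc == 0)
                        /* Push the partially filled llcd right away. */
                        rc = llog_obd_repl_cancel(ctxt, NULL, 0, NULL,
                                                  OBD_LLOG_FL_SENDNOW);
                return rc;
        }
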
@@ -686,3 +646,46 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
         return 0;
 }
 #endif
+
+/**
+ * Module init time function. Initializes the slab cache for llcd objects.
+ */
+int llog_recov_init(void)
+{
+        int llcd_size;
+
+        llcd_size = CFS_PAGE_SIZE - 
+                lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL);
+        llcd_size += offsetof(struct llog_canceld_ctxt, llcd_cookies);
+        llcd_cache = cfs_mem_cache_create("llcd_cache", llcd_size, 0, 0);
+        if (!llcd_cache) {
+                CERROR("Error allocating llcd cache\n");
+                return -ENOMEM;
+        }
+        return 0;
+}
+
+/**
+ * Module fini time function. Releases the slab cache for llcd objects.
+ */
+void llog_recov_fini(void)
+{
+        int count;
+
+        /*
+         * Kill the llcd cache when the thread is stopped and we are
+         * sure no llcd is left in use.
+         */
+        if (llcd_cache) {
+                /*
+                 * In 2.6.22 cfs_mem_cache_destroy() will not return error
+                 * for busy resources. Let's check it another way.
+                 */
+                count = atomic_read(&llcd_count);
+                LASSERTF(count == 0, "Can't destroy llcd cache! Number of "
+                         "busy llcds: %d\n", count);
+                cfs_mem_cache_destroy(llcd_cache);
+                llcd_cache = NULL;
+        }
+}
+
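
A hedged sketch of the module-level ordering implied by llog_recov_init() and llog_recov_fini() above; the hook names below are hypothetical, and the real call sites presumably live in the ptlrpc module setup code.

        /* Illustrative module hooks wiring the llcd slab lifetime. */
        static int example_module_init(void)
        {
                int rc;

                rc = llog_recov_init();         /* create llcd_cache first */
                if (rc)
                        return rc;
                /* ... start services that may allocate llcds ... */
                return 0;
        }

        static void example_module_exit(void)
        {
                /* All commit masters must be stopped by now so that
                 * llcd_count is back to zero. */
                llog_recov_fini();
        }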