Whamcloud - gitweb
LU-2675 cleanup: remove unused recov_thread.c and llog code
authorJohn L. Hammond <john.hammond@intel.com>
Thu, 28 Feb 2013 09:14:30 +0000 (03:14 -0600)
committerOleg Drokin <oleg.drokin@intel.com>
Fri, 22 Mar 2013 18:26:37 +0000 (14:26 -0400)
Remove recov_thread.c and the following exported functions:
  llog_cat_process_thread
  llog_obd_origin_add
  llog_catalog_list
  llog_receptor_accept
  llog_origin_connect
  llog_handle_connect
  llog_obd_repl_cancel
  llog_obd_repl_sync
  llog_obd_repl_connect
  llog_get_cat_list
  llog_put_cat_list
  llog_recov_thread_start
  llog_recov_thread_stop

Signed-off-by: John L. Hammond <john.hammond@intel.com>
Change-Id: I620c641646294fcf697f1e2d4d1f389d71bb3ddd
Reviewed-on: http://review.whamcloud.com/5190
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Liu Xuezhao <xuezhao.liu@emc.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Mike Pershin <mike.pershin@intel.com>
15 files changed:
lustre/include/lustre_log.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_internal.h
lustre/obdclass/llog_ioctl.c
lustre/obdclass/llog_lvfs.c
lustre/obdclass/llog_obd.c
lustre/ptlrpc/Makefile.in
lustre/ptlrpc/autoMakefile.am
lustre/ptlrpc/llog_net.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/ptlrpc_internal.h
lustre/ptlrpc/ptlrpc_module.c
lustre/ptlrpc/recov_thread.c [deleted file]

index 17c8434..2419161 100644 (file)
@@ -169,21 +169,6 @@ struct llog_process_cat_data {
         int                  lpcd_last_idx;
 };
 
-struct llog_process_cat_args {
-        /**
-         * Llog context used in recovery thread on OST (recov_thread.c)
-         */
-        struct llog_ctxt    *lpca_ctxt;
-        /**
-         * Llog callback used in recovery thread on OST (recov_thread.c)
-         */
-        void                *lpca_cb;
-        /**
-         * Data pointer for llog callback.
-         */
-        void                *lpca_arg;
-};
-
 int llog_cat_close(const struct lu_env *env, struct llog_handle *cathandle);
 int llog_cat_add_rec(const struct lu_env *env, struct llog_handle *cathandle,
                     struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
@@ -202,7 +187,6 @@ int llog_cat_process_or_fork(const struct lu_env *env,
                             void *data, int startcat, int startidx, bool fork);
 int llog_cat_process(const struct lu_env *env, struct llog_handle *cat_llh,
                     llog_cb_t cb, void *data, int startcat, int startidx);
-int llog_cat_process_thread(void *data);
 int llog_cat_reverse_process(const struct lu_env *env,
                             struct llog_handle *cat_llh, llog_cb_t cb,
                             void *data);
@@ -222,9 +206,6 @@ int llog_obd_add(const struct lu_env *env, struct llog_ctxt *ctxt,
 int llog_cancel(const struct lu_env *env, struct llog_ctxt *ctxt,
                struct lov_stripe_md *lsm, int count,
                struct llog_cookie *cookies, int flags);
-int llog_obd_origin_add(const struct lu_env *env, struct llog_ctxt *ctxt,
-                       struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
-                       struct llog_cookie *logcookies, int numcookies);
 
 int obd_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                   struct obd_device *disk_obd, int *idx);
@@ -234,26 +215,9 @@ int obd_llog_finish(struct obd_device *obd, int count);
 /* llog_ioctl.c */
 int llog_ioctl(const struct lu_env *env, struct llog_ctxt *ctxt, int cmd,
               struct obd_ioctl_data *data);
-int llog_catalog_list(struct obd_device *obd, int count,
-                      struct obd_ioctl_data *data);
 
 /* llog_net.c */
 int llog_initiator_connect(struct llog_ctxt *ctxt);
-int llog_receptor_accept(struct llog_ctxt *ctxt, struct obd_import *imp);
-int llog_origin_connect(struct llog_ctxt *ctxt,
-                        struct llog_logid *logid, struct llog_gen *gen,
-                        struct obd_uuid *uuid);
-int llog_handle_connect(struct ptlrpc_request *req);
-
-/* recov_thread.c */
-int llog_obd_repl_cancel(const struct lu_env *env, struct llog_ctxt *ctxt,
-                        struct lov_stripe_md *lsm, int count,
-                        struct llog_cookie *cookies, int flags);
-int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp,
-                      int flags);
-int llog_obd_repl_connect(struct llog_ctxt *ctxt,
-                          struct llog_logid *logid, struct llog_gen *gen,
-                          struct obd_uuid *uuid);
 
 struct llog_operations {
        int (*lop_destroy)(const struct lu_env *env,
@@ -357,12 +321,6 @@ struct llog_handle {
 
 /* llog_lvfs.c */
 extern struct llog_operations llog_lvfs_ops;
-int llog_get_cat_list(struct obd_device *disk_obd,
-                      char *name, int idx, int count,
-                      struct llog_catid *idarray);
-
-int llog_put_cat_list(struct obd_device *disk_obd,
-                      char *name, int idx, int count, struct llog_catid *idarray);
 
 /* llog_osd.c */
 extern struct llog_operations llog_osd_ops;
@@ -378,7 +336,6 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d,
 
 struct llog_ctxt {
         int                      loc_idx; /* my index the obd array of ctxt's */
-        struct llog_gen          loc_gen;
         struct obd_device       *loc_obd; /* points back to the containing obd*/
         struct obd_llog_group   *loc_olg; /* group containing that ctxt */
         struct obd_export       *loc_exp; /* parent "disk" export (e.g. MDS) */
@@ -386,125 +343,12 @@ struct llog_ctxt {
                                              pointing import */
         struct llog_operations  *loc_logops;
         struct llog_handle      *loc_handle;
-        struct llog_commit_master *loc_lcm;
-        struct llog_canceld_ctxt *loc_llcd;
-       struct mutex             loc_mutex; /* protect loc_llcd and loc_imp */
+       struct mutex             loc_mutex; /* protect loc_imp */
         cfs_atomic_t             loc_refcount;
-        void                    *llog_proc_cb;
         long                     loc_flags; /* flags, see above defines */
        struct dt_object        *loc_dir;
 };
 
-#define LCM_NAME_SIZE 64
-
-struct llog_commit_master {
-        /**
-         * Thread control flags (start, stop, etc.)
-         */
-        long                       lcm_flags;
-        /**
-         * Number of llcds onthis lcm.
-         */
-        cfs_atomic_t               lcm_count;
-        /**
-         * The refcount for lcm
-         */
-         cfs_atomic_t              lcm_refcount;
-        /**
-         * Thread control structure. Used for control commit thread.
-         */
-        struct ptlrpcd_ctl         lcm_pc;
-        /**
-         * Lock protecting list of llcds.
-         */
-       spinlock_t                 lcm_lock;
-        /**
-         * Llcds in flight for debugging purposes.
-         */
-        cfs_list_t                 lcm_llcds;
-        /**
-         * Commit thread name buffer. Only used for thread start.
-         */
-        char                       lcm_name[LCM_NAME_SIZE];
-};
-
-static inline struct llog_commit_master
-*lcm_get(struct llog_commit_master *lcm)
-{
-        cfs_atomic_inc(&lcm->lcm_refcount);
-        return lcm;
-}
-
-static inline void
-lcm_put(struct llog_commit_master *lcm)
-{
-        LASSERT_ATOMIC_POS(&lcm->lcm_refcount);
-        if (cfs_atomic_dec_and_test(&lcm->lcm_refcount))
-                OBD_FREE_PTR(lcm);
-}
-
-struct llog_canceld_ctxt {
-        /**
-         * Llog context this llcd is attached to. Used for accessing
-         * ->loc_import and others in process of canceling cookies
-         * gathered in this llcd.
-         */
-        struct llog_ctxt          *llcd_ctxt;
-        /**
-         * Cancel thread control stucture pointer. Used for accessing
-         * it to see if should stop processing and other needs.
-         */
-        struct llog_commit_master *llcd_lcm;
-        /**
-         * Maximal llcd size. Used in calculations on how much of room
-         * left in llcd to cookie comming cookies.
-         */
-        int                        llcd_size;
-        /**
-         * Link to lcm llcds list.
-         */
-        cfs_list_t                 llcd_list;
-        /**
-         * Current llcd size while gathering cookies. This should not be
-         * more than ->llcd_size. Used for determining if we need to
-         * send this llcd (if full) and allocate new one. This is also
-         * used for copying new cookie at the end of buffer.
-         */
-        int                        llcd_cookiebytes;
-        /**
-         * Pointer to the start of cookies buffer.
-         */
-        struct llog_cookie         llcd_cookies[0];
-};
-
-/* ptlrpc/recov_thread.c */
-extern struct llog_commit_master *llog_recov_thread_init(char *name);
-extern void llog_recov_thread_fini(struct llog_commit_master *lcm,
-                                   int force);
-extern int llog_recov_thread_start(struct llog_commit_master *lcm);
-extern void llog_recov_thread_stop(struct llog_commit_master *lcm,
-                                    int force);
-
-static inline void llog_gen_init(struct llog_ctxt *ctxt)
-{
-        struct obd_device *obd = ctxt->loc_exp->exp_obd;
-
-        LASSERTF(obd->u.obt.obt_magic == OBT_MAGIC,
-                 "%s: wrong obt magic %#x\n",
-                 obd->obd_name, obd->u.obt.obt_magic);
-        ctxt->loc_gen.mnt_cnt = obd->u.obt.obt_mount_count;
-        ctxt->loc_gen.conn_cnt++;
-}
-
-static inline int llog_gen_lt(struct llog_gen a, struct llog_gen b)
-{
-        if (a.mnt_cnt < b.mnt_cnt)
-                return 1;
-        if (a.mnt_cnt > b.mnt_cnt)
-                return 0;
-        return(a.conn_cnt < b.conn_cnt ? 1 : 0);
-}
-
 #define LLOG_PROC_BREAK 0x0001
 #define LLOG_DEL_RECORD 0x0002
 
@@ -562,20 +406,6 @@ static inline void llog_group_init(struct obd_llog_group *olg, int group)
        olg->olg_seq = group;
 }
 
-static inline void llog_group_set_export(struct obd_llog_group *olg,
-                                         struct obd_export *exp)
-{
-       LASSERT(exp != NULL);
-
-       spin_lock(&olg->olg_lock);
-       if (olg->olg_exp != NULL && olg->olg_exp != exp)
-               CWARN("%s: export for group %d is changed: 0x%p -> 0x%p\n",
-                     exp->exp_obd->obd_name, olg->olg_seq,
-                     olg->olg_exp, exp);
-       olg->olg_exp = exp;
-       spin_unlock(&olg->olg_lock);
-}
-
 static inline int llog_group_set_ctxt(struct obd_llog_group *olg,
                                       struct llog_ctxt *ctxt, int index)
 {
index 60a7e57..70b0555 100644 (file)
@@ -370,9 +370,6 @@ struct filter_obd {
         */
        struct cfs_hash         *fo_iobuf_hash;
 
-       cfs_list_t              fo_llog_list;
-       spinlock_t              fo_llog_list_lock;
-
         struct brw_stats         fo_filter_stats;
 
         int                      fo_fmd_max_num; /* per exp filter_mod_data */
@@ -389,7 +386,6 @@ struct filter_obd {
         unsigned int             fo_fl_oss_capa;
         cfs_list_t               fo_capa_keys;
         cfs_hlist_head_t        *fo_capa_hash;
-        struct llog_commit_master *fo_lcm;
         int                      fo_sec_level;
 };
 
@@ -437,9 +433,6 @@ struct client_obd {
         enum lustre_sec_part     cl_sp_to;
         struct sptlrpc_flavor    cl_flvr_mgc;   /* fixed flavor of mgc->mgs */
 
-        //struct llog_canceld_ctxt *cl_llcd; /* it's included by obd_llog_ctxt */
-        void                    *cl_llcd_offset;
-
         /* the grant values are protected by loi_list_lock below */
         long                     cl_dirty;         /* all _dirty_ in bytes */
         long                     cl_dirty_max;     /* allowed w/o rpc */
@@ -959,13 +952,10 @@ struct target_recovery_data {
 };
 
 struct obd_llog_group {
-        cfs_list_t         olg_list;
         int                olg_seq;
         struct llog_ctxt  *olg_ctxts[LLOG_MAX_CTXTS];
         cfs_waitq_t        olg_waitq;
        spinlock_t         olg_lock;
-       struct obd_export *olg_exp;
-       int                olg_initializing;
        struct mutex       olg_cat_processing;
 };
 
index f710646..575955d 100644 (file)
@@ -1213,18 +1213,6 @@ obd_lvfs_fid2dentry(struct obd_export *exp, __u64 id_ino, __u32 gen, __u64 gr)
        return ctxt->cb_ops.l_fid2dentry(id_ino, gen, gr, exp->exp_obd);
 }
 
-static inline int
-obd_lvfs_open_llog(struct obd_export *exp, __u64 id_ino, struct dentry *dentry)
-{
-        LASSERT(exp->exp_obd);
-        CERROR("FIXME what's the story here?  This needs to be an obd fn?\n");
-#if 0
-        return lvfs_open_llog(&exp->exp_obd->obd_lvfs_ctxt, id_ino,
-                              dentry, exp->exp_obd);
-#endif
-        return 0;
-}
-
 /* @max_age is the oldest time in jiffies that we accept using a cached data.
  * If the cache is older than @max_age we will get a new value from the
  * target.  Use a value of "cfs_time_current() + HZ" to guarantee freshness. */
index fbf2766..ced4c76 100644 (file)
@@ -635,72 +635,6 @@ int llog_cat_process(const struct lu_env *env, struct llog_handle *cat_llh,
 }
 EXPORT_SYMBOL(llog_cat_process);
 
-#ifdef __KERNEL__
-int llog_cat_process_thread(void *data)
-{
-        struct llog_process_cat_args *args = data;
-        struct llog_ctxt *ctxt = args->lpca_ctxt;
-        struct llog_handle *llh = NULL;
-        llog_cb_t cb = args->lpca_cb;
-       struct llog_thread_info *lgi;
-       struct lu_env            env;
-        int rc;
-        ENTRY;
-
-        cfs_daemonize_ctxt("ll_log_process");
-
-       rc = lu_env_init(&env, LCT_LOCAL);
-       if (rc)
-               GOTO(out, rc);
-       lgi = llog_info(&env);
-       LASSERT(lgi);
-
-       lgi->lgi_logid = *(struct llog_logid *)(args->lpca_arg);
-       rc = llog_open(&env, ctxt, &llh, &lgi->lgi_logid, NULL,
-                      LLOG_OPEN_EXISTS);
-       if (rc) {
-               CERROR("%s: cannot open llog "LPX64":%x: rc = %d\n",
-                      ctxt->loc_obd->obd_name, lgi->lgi_logid.lgl_oid,
-                      lgi->lgi_logid.lgl_ogen, rc);
-               GOTO(out_env, rc);
-       }
-       rc = llog_init_handle(&env, llh, LLOG_F_IS_CAT, NULL);
-       if (rc) {
-               CERROR("%s: llog_init_handle failed: rc = %d\n",
-                      llh->lgh_ctxt->loc_obd->obd_name, rc);
-               GOTO(release_llh, rc);
-       }
-
-       if (cb) {
-               rc = llog_cat_process(&env, llh, cb, NULL, 0, 0);
-               if (rc != LLOG_PROC_BREAK && rc != 0)
-                       CERROR("%s: llog_cat_process() failed: rc = %d\n",
-                              llh->lgh_ctxt->loc_obd->obd_name, rc);
-               cb(&env, llh, NULL, NULL);
-       } else {
-               CWARN("No callback function for recovery\n");
-       }
-
-       /*
-        * Make sure that all cached data is sent.
-        */
-       llog_sync(ctxt, NULL, 0);
-       GOTO(release_llh, rc);
-release_llh:
-       rc = llog_cat_close(&env, llh);
-       if (rc)
-               CERROR("%s: llog_cat_close() failed: rc = %d\n",
-                      llh->lgh_ctxt->loc_obd->obd_name, rc);
-out_env:
-       lu_env_fini(&env);
-out:
-       llog_ctxt_put(ctxt);
-       OBD_FREE_PTR(args);
-       return rc;
-}
-EXPORT_SYMBOL(llog_cat_process_thread);
-#endif
-
 static int llog_cat_reverse_process_cb(const struct lu_env *env,
                                       struct llog_handle *cat_llh,
                                       struct llog_rec_hdr *rec, void *data)
index 9d51c6a..32f3f96 100644 (file)
@@ -45,7 +45,6 @@ struct llog_process_info {
         void               *lpi_cbdata;
         void               *lpi_catdata;
         int                 lpi_rc;
-        int                 lpi_flags;
        struct completion       lpi_completion;
        const struct lu_env     *lpi_env;
 
@@ -54,17 +53,11 @@ struct llog_process_info {
 struct llog_thread_info {
        struct lu_attr                   lgi_attr;
        struct lu_fid                    lgi_fid;
-       struct llog_logid                lgi_logid;
        struct dt_object_format          lgi_dof;
-       struct llog_process_data         lgi_lpd;
-       struct lustre_mdt_attrs          lgi_lma_attr;
-
        struct lu_buf                    lgi_buf;
        loff_t                           lgi_off;
-
        struct llog_rec_hdr              lgi_lrh;
        struct llog_rec_tail             lgi_tail;
-       struct llog_logid_rec            lgi_lid;
 };
 
 extern struct lu_context_key llog_thread_key;
index f377397..e210d95 100644 (file)
@@ -422,51 +422,3 @@ out_close:
        RETURN(rc);
 }
 EXPORT_SYMBOL(llog_ioctl);
-
-#ifdef HAVE_LDISKFS_OSD
-int llog_catalog_list(struct obd_device *obd, int count,
-                      struct obd_ioctl_data *data)
-{
-        int size, i;
-        struct llog_catid *idarray;
-        struct llog_logid *id;
-        char name[32] = CATLIST;
-        char *out;
-        int l, remains, rc = 0;
-
-        ENTRY;
-        size = sizeof(*idarray) * count;
-
-        OBD_ALLOC_LARGE(idarray, size);
-        if (!idarray)
-                RETURN(-ENOMEM);
-
-       mutex_lock(&obd->obd_olg.olg_cat_processing);
-        rc = llog_get_cat_list(obd, name, 0, count, idarray);
-        if (rc)
-                GOTO(out, rc);
-
-        out = data->ioc_bulk;
-        remains = data->ioc_inllen1;
-        for (i = 0; i < count; i++) {
-                id = &idarray[i].lci_logid;
-                l = snprintf(out, remains,
-                             "catalog log: #"LPX64"#"LPX64"#%08x\n",
-                             id->lgl_oid, id->lgl_oseq, id->lgl_ogen);
-                out += l;
-                remains -= l;
-                if (remains <= 0) {
-                        CWARN("not enough memory for catlog list\n");
-                        break;
-                }
-        }
-out:
-        /* release semaphore */
-       mutex_unlock(&obd->obd_olg.olg_cat_processing);
-
-        OBD_FREE_LARGE(idarray, size);
-        RETURN(rc);
-
-}
-EXPORT_SYMBOL(llog_catalog_list);
-#endif
index 9373474..8d9deb1 100644 (file)
@@ -839,107 +839,6 @@ static int llog_lvfs_destroy(const struct lu_env *env,
         RETURN(rc);
 }
 
-/* reads the catalog list */
-int llog_get_cat_list(struct obd_device *disk_obd,
-                      char *name, int idx, int count, struct llog_catid *idarray)
-{
-        struct lvfs_run_ctxt saved;
-        struct l_file *file;
-        int rc, rc1 = 0;
-        int size = sizeof(*idarray) * count;
-        loff_t off = idx *  sizeof(*idarray);
-        ENTRY;
-
-        if (!count)
-                RETURN(0);
-
-        push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
-        file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
-        if (!file || IS_ERR(file)) {
-                rc = PTR_ERR(file);
-                CERROR("OBD filter: cannot open/create %s: rc = %d\n",
-                       name, rc);
-                GOTO(out, rc);
-        }
-
-        if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
-                CERROR("%s is not a regular file!: mode = %o\n", name,
-                       file->f_dentry->d_inode->i_mode);
-                GOTO(out, rc = -ENOENT);
-        }
-
-        CDEBUG(D_CONFIG, "cat list: disk size=%d, read=%d\n",
-               (int)i_size_read(file->f_dentry->d_inode), size);
-
-        /* read for new ost index or for empty file */
-        memset(idarray, 0, size);
-        if (i_size_read(file->f_dentry->d_inode) < off)
-                GOTO(out, rc = 0);
-
-        rc = fsfilt_read_record(disk_obd, file, idarray, size, &off);
-        if (rc) {
-                CERROR("OBD filter: error reading %s: rc %d\n", name, rc);
-                GOTO(out, rc);
-        }
-
-        EXIT;
- out:
-        pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
-        if (file && !IS_ERR(file))
-                rc1 = filp_close(file, 0);
-        if (rc == 0)
-                rc = rc1;
-        return rc;
-}
-EXPORT_SYMBOL(llog_get_cat_list);
-
-/* writes the cat list */
-int llog_put_cat_list(struct obd_device *disk_obd,
-                      char *name, int idx, int count, struct llog_catid *idarray)
-{
-        struct lvfs_run_ctxt saved;
-        struct l_file *file;
-        int rc, rc1 = 0;
-        int size = sizeof(*idarray) * count;
-        loff_t off = idx * sizeof(*idarray);
-
-        if (!count)
-                GOTO(out1, rc = 0);
-
-        push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
-        file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
-        if (!file || IS_ERR(file)) {
-                rc = PTR_ERR(file);
-                CERROR("OBD filter: cannot open/create %s: rc = %d\n",
-                       name, rc);
-                GOTO(out, rc);
-        }
-
-        if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
-                CERROR("%s is not a regular file!: mode = %o\n", name,
-                       file->f_dentry->d_inode->i_mode);
-                GOTO(out, rc = -ENOENT);
-        }
-
-        rc = fsfilt_write_record(disk_obd, file, idarray, size, &off, 1);
-        if (rc) {
-                CDEBUG(D_INODE,"OBD filter: error writeing %s: rc %d\n",
-                       name, rc);
-                GOTO(out, rc);
-        }
-
-out:
-        pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
-        if (file && !IS_ERR(file))
-                rc1 = filp_close(file, 0);
-
-        if (rc == 0)
-                rc = rc1;
-out1:
-        RETURN(rc);
-}
-EXPORT_SYMBOL(llog_put_cat_list);
-
 static int llog_lvfs_declare_create(const struct lu_env *env,
                                    struct llog_handle *res,
                                    struct thandle *th)
@@ -970,21 +869,5 @@ struct llog_operations llog_lvfs_ops = {
 };
 EXPORT_SYMBOL(llog_lvfs_ops);
 #else /* !__KERNEL__ */
-int llog_get_cat_list(struct obd_device *disk_obd,
-                     char *name, int idx, int count,
-                     struct llog_catid *idarray)
-{
-       LBUG();
-       return 0;
-}
-
-int llog_put_cat_list(struct obd_device *disk_obd,
-                     char *name, int idx, int count,
-                     struct llog_catid *idarray)
-{
-       LBUG();
-       return 0;
-}
-
 struct llog_operations llog_lvfs_ops = {};
 #endif
index da54ff1..775dc11 100644 (file)
@@ -69,7 +69,6 @@ static void llog_ctxt_destroy(struct llog_ctxt *ctxt)
                 class_import_put(ctxt->loc_imp);
                 ctxt->loc_imp = NULL;
         }
-        LASSERT(ctxt->loc_llcd == NULL);
         OBD_FREE_PTR(ctxt);
 }
 
@@ -87,9 +86,6 @@ int __llog_ctxt_put(const struct lu_env *env, struct llog_ctxt *ctxt)
        olg->olg_ctxts[ctxt->loc_idx] = NULL;
        spin_unlock(&olg->olg_lock);
 
-       if (ctxt->loc_lcm)
-               lcm_put(ctxt->loc_lcm);
-
        obd = ctxt->loc_obd;
        spin_lock(&obd->obd_dev_lock);
        /* sync with llog ctxt user thread */
@@ -281,24 +277,6 @@ int llog_cancel(const struct lu_env *env, struct llog_ctxt *ctxt,
 }
 EXPORT_SYMBOL(llog_cancel);
 
-/* add for obdfilter/sz and mds/unlink */
-int llog_obd_origin_add(const struct lu_env *env, struct llog_ctxt *ctxt,
-                       struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
-                       struct llog_cookie *logcookies, int numcookies)
-{
-        struct llog_handle *cathandle;
-        int rc;
-        ENTRY;
-
-        cathandle = ctxt->loc_handle;
-        LASSERT(cathandle != NULL);
-       rc = llog_cat_add(env, cathandle, rec, logcookies, NULL);
-        if (rc != 0 && rc != 1)
-                CERROR("write one catalog record failed: %d\n", rc);
-        RETURN(rc);
-}
-EXPORT_SYMBOL(llog_obd_origin_add);
-
 int obd_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                   struct obd_device *disk_obd, int *index)
 {
index 0ea5fbf..fbfaa85 100644 (file)
@@ -10,7 +10,7 @@ ldlm_objs += $(LDLM)ldlm_flock.o $(LDLM)ldlm_inodebits.o
 ldlm_objs += $(LDLM)ldlm_pool.o
 ldlm_objs += $(LDLM)interval_tree.o
 ptlrpc_objs := client.o recover.o connection.o niobuf.o pack_generic.o
-ptlrpc_objs += events.o ptlrpc_module.o service.o pinger.o recov_thread.o
+ptlrpc_objs += events.o ptlrpc_module.o service.o pinger.o
 ptlrpc_objs += llog_net.o llog_client.o llog_server.o import.o ptlrpcd.o
 ptlrpc_objs += pers.o lproc_ptlrpc.o wiretest.o layout.o
 ptlrpc_objs += sec.o sec_bulk.o sec_gc.o sec_config.o sec_lproc.o
index 2b1d88a..77ad215 100644 (file)
@@ -51,7 +51,7 @@ LDLM_COMM_SOURCES= $(top_srcdir)/lustre/ldlm/l_lock.c \
        $(top_srcdir)/lustre/ldlm/ldlm_pool.c
 
 COMMON_SOURCES = client.c recover.c connection.c niobuf.c pack_generic.c       \
-       events.c ptlrpc_module.c service.c pinger.c recov_thread.c llog_net.c  \
+       events.c ptlrpc_module.c service.c pinger.c llog_net.c                 \
        llog_client.c llog_server.c import.c ptlrpcd.c pers.c wiretest.c       \
        ptlrpc_internal.h layout.c sec.c sec_bulk.c sec_gc.c sec_config.c      \
        sec_lproc.c sec_null.c sec_plain.c lproc_ptlrpc.c nrs.c nrs_fifo.c     \
@@ -91,7 +91,6 @@ ptlrpc_SOURCES =      \
        pinger.c        \
        ptlrpcd.c       \
        recover.c       \
-       recov_thread.c  \
        service.c       \
        nrs.c           \
        nrs_fifo.c      \
index ffc378b..3245055 100644 (file)
 #include <lvfs.h>
 #include <lustre_fsfilt.h>
 
-#ifdef __KERNEL__
-int llog_origin_connect(struct llog_ctxt *ctxt,
-                        struct llog_logid *logid, struct llog_gen *gen,
-                        struct obd_uuid *uuid)
-{
-        struct llog_gen_rec    *lgr;
-        struct ptlrpc_request  *req;
-        struct llogd_conn_body *req_body;
-        struct inode* inode = ctxt->loc_handle->lgh_file->f_dentry->d_inode;
-        void *handle;
-        int rc, rc1;
-
-        ENTRY;
-
-        if (cfs_list_empty(&ctxt->loc_handle->u.chd.chd_head)) {
-                CDEBUG(D_HA, "there is no record related to ctxt %p\n", ctxt);
-                RETURN(0);
-        }
-
-        /* FIXME what value for gen->conn_cnt */
-        llog_gen_init(ctxt);
-
-        /* first add llog_gen_rec */
-        OBD_ALLOC_PTR(lgr);
-        if (!lgr)
-                RETURN(-ENOMEM);
-        lgr->lgr_hdr.lrh_len = lgr->lgr_tail.lrt_len = sizeof(*lgr);
-        lgr->lgr_hdr.lrh_type = LLOG_GEN_REC;
-
-        handle = fsfilt_start_log(ctxt->loc_exp->exp_obd, inode, 
-                                  FSFILT_OP_CANCEL_UNLINK, NULL, 1);
-        if (IS_ERR(handle)) {
-               CERROR("fsfilt_start failed: %ld\n", PTR_ERR(handle));
-               OBD_FREE(lgr, sizeof(*lgr));
-               rc = PTR_ERR(handle);
-               RETURN(rc);
-        }
-
-        lgr->lgr_gen = ctxt->loc_gen;
-       rc = llog_obd_add(NULL, ctxt, &lgr->lgr_hdr, NULL, NULL, 1);
-        OBD_FREE_PTR(lgr);
-        rc1 = fsfilt_commit(ctxt->loc_exp->exp_obd, inode, handle, 0);
-        if (rc != 1 || rc1 != 0) {
-                rc = (rc != 1) ? rc : rc1;
-                RETURN(rc);
-        }
-
-        LASSERT(ctxt->loc_imp);
-        req = ptlrpc_request_alloc_pack(ctxt->loc_imp, &RQF_LLOG_ORIGIN_CONNECT,
-                                        LUSTRE_LOG_VERSION,
-                                        LLOG_ORIGIN_CONNECT);
-        if (req == NULL)
-                RETURN(-ENOMEM);
-
-        CDEBUG(D_OTHER, "%s mount_count "LPU64", connection count "LPU64"\n",
-               ctxt->loc_exp->exp_obd->obd_type->typ_name,
-               ctxt->loc_gen.mnt_cnt, ctxt->loc_gen.conn_cnt);
-
-        req_body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_CONN_BODY);
-        req_body->lgdc_gen = ctxt->loc_gen;
-        req_body->lgdc_logid = ctxt->loc_handle->lgh_id;
-        req_body->lgdc_ctxt_idx = ctxt->loc_idx + 1;
-        ptlrpc_request_set_replen(req);
-
-        req->rq_no_resend = req->rq_no_delay = 1;
-        rc = ptlrpc_queue_wait(req);
-        ptlrpc_req_finished(req);
-
-        RETURN(rc);
-}
-EXPORT_SYMBOL(llog_origin_connect);
-
-int llog_handle_connect(struct ptlrpc_request *req)
-{
-        struct obd_device *obd = req->rq_export->exp_obd;
-        struct llogd_conn_body *req_body;
-        struct llog_ctxt *ctxt;
-        int rc;
-        ENTRY;
-
-        req_body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_CONN_BODY);
-
-        ctxt = llog_get_context(obd, req_body->lgdc_ctxt_idx);
-        rc = llog_connect(ctxt, &req_body->lgdc_logid,
-                          &req_body->lgdc_gen, NULL);
-
-        llog_ctxt_put(ctxt);
-        if (rc != 0)
-                CERROR("failed at llog_relp_connect\n");
-
-        RETURN(rc);
-}
-EXPORT_SYMBOL(llog_handle_connect);
-
-int llog_receptor_accept(struct llog_ctxt *ctxt, struct obd_import *imp)
-{
-        ENTRY;
-
-        LASSERT(ctxt);
-       mutex_lock(&ctxt->loc_mutex);
-        if (ctxt->loc_imp != imp) {
-                if (ctxt->loc_imp) {
-                        CWARN("changing the import %p - %p\n",
-                              ctxt->loc_imp, imp);
-                        class_import_put(ctxt->loc_imp);
-                }
-                ctxt->loc_imp = class_import_get(imp);
-        }
-       mutex_unlock(&ctxt->loc_mutex);
-        RETURN(0);
-}
-EXPORT_SYMBOL(llog_receptor_accept);
-
-#else /* !__KERNEL__ */
-
-int llog_origin_connect(struct llog_ctxt *ctxt,
-                        struct llog_logid *logid, struct llog_gen *gen,
-                        struct obd_uuid *uuid)
-{
-        return 0;
-}
-#endif
-
 int llog_initiator_connect(struct llog_ctxt *ctxt)
 {
         struct obd_import *new_imp;
index e72ee14..b24185a 100644 (file)
@@ -2294,16 +2294,6 @@ void lustre_swab_ldlm_reply (struct ldlm_reply *r)
 }
 EXPORT_SYMBOL(lustre_swab_ldlm_reply);
 
-/* no one calls this */
-int llog_log_swabbed(struct llog_log_hdr *hdr)
-{
-        if (hdr->llh_hdr.lrh_type == __swab32(LLOG_HDR_MAGIC))
-                return 1;
-        if (hdr->llh_hdr.lrh_type == LLOG_HDR_MAGIC)
-                return 0;
-        return -1;
-}
-
 void lustre_swab_quota_body(struct quota_body *b)
 {
        lustre_swab_lu_fid(&b->qb_fid);
index c2d5050..5c24498 100644 (file)
@@ -268,10 +268,6 @@ void sptlrpc_conf_fini(void);
 int  sptlrpc_init(void);
 void sptlrpc_fini(void);
 
-/* recov_thread.c */
-int llog_recov_init(void);
-void llog_recov_fini(void);
-
 static inline int ll_rpc_recoverable_error(int rc)
 {
         return (rc == -ENOTCONN || rc == -ENODEV);
index 51869db..dd689b7 100644 (file)
@@ -104,11 +104,6 @@ __init int ptlrpc_init(void)
         if (rc)
                 GOTO(cleanup, rc);
 
-        cleanup_phase = 6;
-        rc = llog_recov_init();
-        if (rc)
-                GOTO(cleanup, rc);
-
        cleanup_phase = 7;
        rc = ptlrpc_nrs_init();
        if (rc)
@@ -129,8 +124,6 @@ cleanup:
                ptlrpc_nrs_fini();
 #endif
        case 7:
-               llog_recov_fini();
-        case 6:
                 sptlrpc_fini();
         case 5:
                 ldlm_exit();
@@ -154,7 +147,6 @@ static void __exit ptlrpc_exit(void)
 {
        tgt_mod_exit();
        ptlrpc_nrs_fini();
-        llog_recov_fini();
         sptlrpc_fini();
         ldlm_exit();
         ptlrpc_stop_pinger();
diff --git a/lustre/ptlrpc/recov_thread.c b/lustre/ptlrpc/recov_thread.c
deleted file mode 100644 (file)
index dee77cd..0000000
+++ /dev/null
@@ -1,776 +0,0 @@
-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- *
- * Copyright (c) 2011, 2012, Intel Corporation.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- *
- * lustre/ptlrpc/recov_thread.c
- *
- * OST<->MDS recovery logging thread.
- * Invariants in implementation:
- * - we do not share logs among different OST<->MDS connections, so that
- *   if an OST or MDS fails it need only look at log(s) relevant to itself
- *
- * Author: Andreas Dilger   <adilger@clusterfs.com>
- *         Yury Umanets     <yury.umanets@sun.com>
- *         Alexey Lyashkov  <alexey.lyashkov@sun.com>
- */
-
-#define DEBUG_SUBSYSTEM S_LOG
-
-#ifdef __KERNEL__
-# include <libcfs/libcfs.h>
-#else
-# include <libcfs/list.h>
-# include <liblustre.h>
-#endif
-
-#include <obd_class.h>
-#include <obd_support.h>
-#include <obd_class.h>
-#include <lustre_net.h>
-#include <lnet/types.h>
-#include <libcfs/list.h>
-#include <lustre_log.h>
-#include "ptlrpc_internal.h"
-
-static cfs_atomic_t               llcd_count = CFS_ATOMIC_INIT(0);
-static cfs_mem_cache_t           *llcd_cache = NULL;
-
-#ifdef __KERNEL__
-enum {
-        LLOG_LCM_FL_START       = 1 << 0,
-        LLOG_LCM_FL_EXIT        = 1 << 1
-};
-
-struct llcd_async_args {
-        struct llog_canceld_ctxt *la_ctxt;
-};
-
-static void llcd_print(struct llog_canceld_ctxt *llcd,
-                       const char *func, int line)
-{
-        CDEBUG(D_RPCTRACE, "Llcd (%p) at %s:%d:\n", llcd, func, line);
-        CDEBUG(D_RPCTRACE, "  size: %d\n", llcd->llcd_size);
-        CDEBUG(D_RPCTRACE, "  ctxt: %p\n", llcd->llcd_ctxt);
-        CDEBUG(D_RPCTRACE, "  lcm : %p\n", llcd->llcd_lcm);
-        CDEBUG(D_RPCTRACE, "  cookiebytes : %d\n", llcd->llcd_cookiebytes);
-}
-
-/**
- * Allocate new llcd from cache, init it and return to caller.
- * Bumps number of objects allocated.
- */
-static struct llog_canceld_ctxt *llcd_alloc(struct llog_commit_master *lcm)
-{
-        struct llog_canceld_ctxt *llcd;
-        int size, overhead;
-
-        LASSERT(lcm != NULL);
-
-        /*
-         * We want to send one page of cookies with rpc header. This buffer
-         * will be assigned later to the rpc, this is why we preserve the
-         * space for rpc header.
-         */
-        size = CFS_PAGE_SIZE - lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL);
-        overhead =  offsetof(struct llog_canceld_ctxt, llcd_cookies);
-       OBD_SLAB_ALLOC_GFP(llcd, llcd_cache, size + overhead, CFS_ALLOC_STD);
-        if (!llcd)
-                return NULL;
-
-        CFS_INIT_LIST_HEAD(&llcd->llcd_list);
-        llcd->llcd_cookiebytes = 0;
-        llcd->llcd_size = size;
-
-       spin_lock(&lcm->lcm_lock);
-       llcd->llcd_lcm = lcm;
-       cfs_atomic_inc(&lcm->lcm_count);
-       cfs_list_add_tail(&llcd->llcd_list, &lcm->lcm_llcds);
-       spin_unlock(&lcm->lcm_lock);
-        cfs_atomic_inc(&llcd_count);
-
-        CDEBUG(D_RPCTRACE, "Alloc llcd %p on lcm %p (%d)\n",
-               llcd, lcm, cfs_atomic_read(&lcm->lcm_count));
-
-        return llcd;
-}
-
-/**
- * Returns passed llcd to cache.
- */
-static void llcd_free(struct llog_canceld_ctxt *llcd)
-{
-        struct llog_commit_master *lcm = llcd->llcd_lcm;
-        int size;
-
-        if (lcm) {
-                if (cfs_atomic_read(&lcm->lcm_count) == 0) {
-                        CERROR("Invalid llcd free %p\n", llcd);
-                        llcd_print(llcd, __FUNCTION__, __LINE__);
-                        LBUG();
-                }
-               spin_lock(&lcm->lcm_lock);
-               LASSERT(!cfs_list_empty(&llcd->llcd_list));
-               cfs_list_del_init(&llcd->llcd_list);
-               cfs_atomic_dec(&lcm->lcm_count);
-               spin_unlock(&lcm->lcm_lock);
-
-                CDEBUG(D_RPCTRACE, "Free llcd %p on lcm %p (%d)\n",
-                       llcd, lcm, cfs_atomic_read(&lcm->lcm_count));
-        }
-
-        LASSERT(cfs_atomic_read(&llcd_count) > 0);
-        cfs_atomic_dec(&llcd_count);
-
-        size = offsetof(struct llog_canceld_ctxt, llcd_cookies) +
-            llcd->llcd_size;
-        OBD_SLAB_FREE(llcd, llcd_cache, size);
-}
-
-/**
- * Checks if passed cookie fits into llcd free space buffer. Returns
- * 1 if yes and 0 otherwise.
- */
-static inline int
-llcd_fit(struct llog_canceld_ctxt *llcd, struct llog_cookie *cookies)
-{
-        return (llcd->llcd_size - llcd->llcd_cookiebytes >= sizeof(*cookies));
-}
-
-/**
- * Copy passed @cookies to @llcd.
- */
-static inline void
-llcd_copy(struct llog_canceld_ctxt *llcd, struct llog_cookie *cookies)
-{
-        LASSERT(llcd_fit(llcd, cookies));
-        memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes,
-              cookies, sizeof(*cookies));
-        llcd->llcd_cookiebytes += sizeof(*cookies);
-}
-
-/**
- * Llcd completion function. Called uppon llcd send finish regardless
- * sending result. Error is passed in @rc. Note, that this will be called
- * in cleanup time when all inflight rpcs aborted.
- */
-static int
-llcd_interpret(const struct lu_env *env,
-               struct ptlrpc_request *req, void *args, int rc)
-{
-        struct llcd_async_args *la = args;
-        struct llog_canceld_ctxt *llcd = la->la_ctxt;
-
-        CDEBUG(D_RPCTRACE, "Sent llcd %p (%d) - killing it\n", llcd, rc);
-        llcd_free(llcd);
-        return 0;
-}
-
-/**
- * Send @llcd to remote node. Free llcd uppon completion or error. Sending
- * is performed in async style so this function will return asap without
- * blocking.
- */
-static int llcd_send(struct llog_canceld_ctxt *llcd)
-{
-        char *bufs[2] = { NULL, (char *)llcd->llcd_cookies };
-        struct obd_import *import = NULL;
-        struct llog_commit_master *lcm;
-        struct llcd_async_args *la;
-        struct ptlrpc_request *req;
-        struct llog_ctxt *ctxt;
-        int rc;
-        ENTRY;
-
-        ctxt = llcd->llcd_ctxt;
-        if (!ctxt) {
-                CERROR("Invalid llcd with NULL ctxt found (%p)\n",
-                       llcd);
-                llcd_print(llcd, __FUNCTION__, __LINE__);
-                LBUG();
-        }
-        LASSERT_MUTEX_LOCKED(&ctxt->loc_mutex);
-
-        if (llcd->llcd_cookiebytes == 0)
-                GOTO(exit, rc = 0);
-
-        lcm = llcd->llcd_lcm;
-
-        /*
-         * Check if we're in exit stage. Do not send llcd in
-         * this case.
-         */
-       if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags))
-                GOTO(exit, rc = -ENODEV);
-
-        CDEBUG(D_RPCTRACE, "Sending llcd %p\n", llcd);
-
-        import = llcd->llcd_ctxt->loc_imp;
-        if (!import || (import == LP_POISON) ||
-            (import->imp_client == LP_POISON)) {
-                CERROR("Invalid import %p for llcd %p\n",
-                       import, llcd);
-                GOTO(exit, rc = -ENODEV);
-        }
-
-        OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_RECOV, 10);
-
-        /*
-         * No need to get import here as it is already done in
-         * llog_receptor_accept().
-         */
-        req = ptlrpc_request_alloc(import, &RQF_LOG_CANCEL);
-        if (req == NULL) {
-                CERROR("Can't allocate request for sending llcd %p\n",
-                       llcd);
-                GOTO(exit, rc = -ENOMEM);
-        }
-        req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES,
-                             RCL_CLIENT, llcd->llcd_cookiebytes);
-
-        rc = ptlrpc_request_bufs_pack(req, LUSTRE_LOG_VERSION,
-                                      OBD_LOG_CANCEL, bufs, NULL);
-        if (rc) {
-                ptlrpc_request_free(req);
-                GOTO(exit, rc);
-        }
-
-        ptlrpc_at_set_req_timeout(req);
-        ptlrpc_request_set_replen(req);
-
-        /* bug 5515 */
-        req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
-        req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
-
-        req->rq_interpret_reply = (ptlrpc_interpterer_t)llcd_interpret;
-
-        CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
-        la = ptlrpc_req_async_args(req);
-        la->la_ctxt = llcd;
-
-        /* llog cancels will be replayed after reconnect so this will do twice
-         * first from replay llog, second for resended rpc */
-        req->rq_no_delay = req->rq_no_resend = 1;
-
-        ptlrpc_set_add_new_req(&lcm->lcm_pc, req);
-        RETURN(0);
-exit:
-        CDEBUG(D_RPCTRACE, "Refused llcd %p\n", llcd);
-        llcd_free(llcd);
-        return rc;
-}
-
-/**
- * Attach @llcd to @ctxt. Establish llcd vs. ctxt reserve connection
- * so hat they can refer each other.
- */
-static int
-llcd_attach(struct llog_ctxt *ctxt, struct llog_canceld_ctxt *llcd)
-{
-        LASSERT(ctxt != NULL && llcd != NULL);
-        LASSERT_MUTEX_LOCKED(&ctxt->loc_mutex);
-        LASSERT(ctxt->loc_llcd == NULL);
-        llcd->llcd_ctxt = llog_ctxt_get(ctxt);
-        ctxt->loc_llcd = llcd;
-
-        CDEBUG(D_RPCTRACE, "Attach llcd %p to ctxt %p\n",
-               llcd, ctxt);
-
-        return 0;
-}
-
-/**
- * Opposite to llcd_attach(). Detaches llcd from its @ctxt. This makes
- * sure that this llcd will not be found another time we try to cancel.
- */
-static struct llog_canceld_ctxt *llcd_detach(struct llog_ctxt *ctxt)
-{
-        struct llog_canceld_ctxt *llcd;
-
-        LASSERT(ctxt != NULL);
-        LASSERT_MUTEX_LOCKED(&ctxt->loc_mutex);
-
-        llcd = ctxt->loc_llcd;
-        if (!llcd)
-                return NULL;
-
-        CDEBUG(D_RPCTRACE, "Detach llcd %p from ctxt %p\n",
-               llcd, ctxt);
-
-        ctxt->loc_llcd = NULL;
-        llog_ctxt_put(ctxt);
-        return llcd;
-}
-
-/**
- * Return @llcd cached in @ctxt. Allocate new one if required. Attach it
- * to ctxt so that it may be used for gathering cookies and sending.
- */
-static struct llog_canceld_ctxt *llcd_get(struct llog_ctxt *ctxt)
-{
-        struct llog_canceld_ctxt *llcd;
-        LASSERT(ctxt);
-        llcd = llcd_alloc(ctxt->loc_lcm);
-        if (!llcd) {
-                CERROR("Can't alloc an llcd for ctxt %p\n", ctxt);
-                return NULL;
-        }
-        llcd_attach(ctxt, llcd);
-        return llcd;
-}
-
-/**
- * Deatch llcd from its @ctxt. Free llcd.
- */
-static void llcd_put(struct llog_ctxt *ctxt)
-{
-        struct llog_canceld_ctxt *llcd;
-
-        llcd = llcd_detach(ctxt);
-        if (llcd)
-                llcd_free(llcd);
-}
-
-/**
- * Detach llcd from its @ctxt so that nobody will find it with try to
- * re-use. Send llcd to remote node.
- */
-static int llcd_push(struct llog_ctxt *ctxt)
-{
-        struct llog_canceld_ctxt *llcd;
-        int rc;
-
-        /*
-         * Make sure that this llcd will not be sent again as we detach
-         * it from ctxt.
-         */
-        llcd = llcd_detach(ctxt);
-        if (!llcd) {
-                CERROR("Invalid detached llcd found %p\n", llcd);
-                llcd_print(llcd, __FUNCTION__, __LINE__);
-                LBUG();
-        }
-
-        rc = llcd_send(llcd);
-        if (rc)
-                CERROR("Couldn't send llcd %p (%d)\n", llcd, rc);
-        return rc;
-}
-
-/**
- * Start recovery thread which actually deals llcd sending. This
- * is all ptlrpc standard thread based so there is not much of work
- * to do.
- */
-int llog_recov_thread_start(struct llog_commit_master *lcm)
-{
-        int rc;
-        ENTRY;
-
-        rc = ptlrpcd_start(-1, 1, lcm->lcm_name, &lcm->lcm_pc);
-        if (rc) {
-                CERROR("Error %d while starting recovery thread %s\n",
-                       rc, lcm->lcm_name);
-                RETURN(rc);
-        }
-        RETURN(rc);
-}
-EXPORT_SYMBOL(llog_recov_thread_start);
-
-/**
- * Stop recovery thread. Complement to llog_recov_thread_start().
- */
-void llog_recov_thread_stop(struct llog_commit_master *lcm, int force)
-{
-        ENTRY;
-
-        /*
-         * Let all know that we're stopping. This will also make
-         * llcd_send() refuse any new llcds.
-         */
-       set_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags);
-
-        /*
-         * Stop processing thread. No new rpcs will be accepted for
-         * for processing now.
-         */
-        ptlrpcd_stop(&lcm->lcm_pc, force);
-       ptlrpcd_free(&lcm->lcm_pc);
-
-        /*
-         * By this point no alive inflight llcds should be left. Only
-         * those forgotten in sync may still be attached to ctxt. Let's
-         * print them.
-         */
-        if (cfs_atomic_read(&lcm->lcm_count) != 0) {
-                struct llog_canceld_ctxt *llcd;
-                cfs_list_t               *tmp;
-
-                CERROR("Busy llcds found (%d) on lcm %p\n",
-                       cfs_atomic_read(&lcm->lcm_count), lcm);
-
-               spin_lock(&lcm->lcm_lock);
-               cfs_list_for_each(tmp, &lcm->lcm_llcds) {
-                       llcd = cfs_list_entry(tmp, struct llog_canceld_ctxt,
-                                             llcd_list);
-                       llcd_print(llcd, __func__, __LINE__);
-               }
-               spin_unlock(&lcm->lcm_lock);
-
-                /*
-                 * No point to go further with busy llcds at this point
-                 * as this is clear bug. It might mean we got hanging
-                 * rpc which holds import ref and this means we will not
-                 * be able to cleanup anyways.
-                 *
-                 * Or we just missed to kill them when they were not
-                 * attached to ctxt. In this case our slab will remind
-                 * us about this a bit later.
-                 */
-                LBUG();
-        }
-        EXIT;
-}
-EXPORT_SYMBOL(llog_recov_thread_stop);
-
-/**
- * Initialize commit master structure and start recovery thread on it.
- */
-struct llog_commit_master *llog_recov_thread_init(char *name)
-{
-        struct llog_commit_master *lcm;
-        int rc;
-        ENTRY;
-
-        OBD_ALLOC_PTR(lcm);
-        if (!lcm)
-                RETURN(NULL);
-
-        /*
-         * Try to create threads with unique names.
-         */
-        snprintf(lcm->lcm_name, sizeof(lcm->lcm_name),
-                 "lcm_%s", name);
-
-        cfs_atomic_set(&lcm->lcm_count, 0);
-        cfs_atomic_set(&lcm->lcm_refcount, 1);
-       spin_lock_init(&lcm->lcm_lock);
-        CFS_INIT_LIST_HEAD(&lcm->lcm_llcds);
-        rc = llog_recov_thread_start(lcm);
-        if (rc) {
-                CERROR("Can't start commit thread, rc %d\n", rc);
-                GOTO(out, rc);
-        }
-        RETURN(lcm);
-out:
-        OBD_FREE_PTR(lcm);
-        return NULL;
-}
-EXPORT_SYMBOL(llog_recov_thread_init);
-
-/**
- * Finalize commit master and its recovery thread.
- */
-void llog_recov_thread_fini(struct llog_commit_master *lcm, int force)
-{
-        ENTRY;
-        llog_recov_thread_stop(lcm, force);
-        lcm_put(lcm);
-        EXIT;
-}
-EXPORT_SYMBOL(llog_recov_thread_fini);
-
-static int llog_recov_thread_replay(struct llog_ctxt *ctxt,
-                                    void *cb, void *arg)
-{
-        struct obd_device *obd = ctxt->loc_obd;
-        struct llog_process_cat_args *lpca;
-        int rc;
-        ENTRY;
-
-        if (obd->obd_stopping)
-                RETURN(-ENODEV);
-
-        /*
-         * This will be balanced in llog_cat_process_thread()
-         */
-        OBD_ALLOC_PTR(lpca);
-        if (!lpca)
-                RETURN(-ENOMEM);
-
-        lpca->lpca_cb = cb;
-        lpca->lpca_arg = arg;
-
-        /*
-         * This will be balanced in llog_cat_process_thread()
-         */
-        lpca->lpca_ctxt = llog_ctxt_get(ctxt);
-        if (!lpca->lpca_ctxt) {
-                OBD_FREE_PTR(lpca);
-                RETURN(-ENODEV);
-        }
-        rc = cfs_create_thread(llog_cat_process_thread, lpca, CFS_DAEMON_FLAGS);
-        if (rc < 0) {
-                CERROR("Error starting llog_cat_process_thread(): %d\n", rc);
-                OBD_FREE_PTR(lpca);
-                llog_ctxt_put(ctxt);
-        } else {
-                CDEBUG(D_HA, "Started llog_cat_process_thread(): %d\n", rc);
-                rc = 0;
-        }
-
-        RETURN(rc);
-}
-
-int llog_obd_repl_connect(struct llog_ctxt *ctxt,
-                          struct llog_logid *logid, struct llog_gen *gen,
-                          struct obd_uuid *uuid)
-{
-        int rc;
-        ENTRY;
-
-        /*
-         * Send back cached llcd from llog before recovery if we have any.
-         * This is void is nothing cached is found there.
-         */
-       llog_sync(ctxt, NULL, 0);
-
-        /*
-         * Start recovery in separate thread.
-         */
-       mutex_lock(&ctxt->loc_mutex);
-        ctxt->loc_gen = *gen;
-        rc = llog_recov_thread_replay(ctxt, ctxt->llog_proc_cb, logid);
-       mutex_unlock(&ctxt->loc_mutex);
-
-        RETURN(rc);
-}
-EXPORT_SYMBOL(llog_obd_repl_connect);
-
-/**
- * Deleted objects have a commit callback that cancels the MDS
- * log record for the deletion. The commit callback calls this
- * function.
- */
-int llog_obd_repl_cancel(const struct lu_env *env, struct llog_ctxt *ctxt,
-                        struct lov_stripe_md *lsm, int count,
-                        struct llog_cookie *cookies, int flags)
-{
-        struct llog_commit_master *lcm;
-        struct llog_canceld_ctxt *llcd;
-        int rc = 0;
-        ENTRY;
-
-        LASSERT(ctxt != NULL);
-
-       mutex_lock(&ctxt->loc_mutex);
-        if (!ctxt->loc_lcm) {
-                CDEBUG(D_RPCTRACE, "No lcm for ctxt %p\n", ctxt);
-                GOTO(out, rc = -ENODEV);
-        }
-        lcm = ctxt->loc_lcm;
-        CDEBUG(D_INFO, "cancel on lsm %p\n", lcm);
-
-        /*
-         * Let's check if we have all structures alive. We also check for
-         * possible shutdown. Do nothing if we're stopping.
-         */
-       if (ctxt->loc_flags & LLOG_CTXT_FLAG_STOP) {
-               CDEBUG(D_RPCTRACE, "Last sync was done for ctxt %p\n", ctxt);
-               GOTO(out, rc = -ENODEV);
-       }
-
-       if (ctxt->loc_imp == NULL) {
-                CDEBUG(D_RPCTRACE, "No import for ctxt %p\n", ctxt);
-                GOTO(out, rc = -ENODEV);
-        }
-
-       if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) {
-                CDEBUG(D_RPCTRACE, "Commit thread is stopping for ctxt %p\n",
-                       ctxt);
-                GOTO(out, rc = -ENODEV);
-        }
-
-        llcd = ctxt->loc_llcd;
-
-        if (count > 0 && cookies != NULL) {
-                /*
-                 * Get new llcd from ctxt if required.
-                 */
-                if (!llcd) {
-                        llcd = llcd_get(ctxt);
-                        if (!llcd)
-                                GOTO(out, rc = -ENOMEM);
-                        /*
-                         * Allocation is successful, let's check for stop
-                         * flag again to fall back as soon as possible.
-                         */
-                       if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags))
-                                GOTO(out, rc = -ENODEV);
-                }
-
-                /*
-                 * Llcd does not have enough room for @cookies. Let's push
-                 * it out and allocate new one.
-                 */
-                if (!llcd_fit(llcd, cookies)) {
-                        rc = llcd_push(ctxt);
-                        if (rc)
-                                GOTO(out, rc);
-                        llcd = llcd_get(ctxt);
-                        if (!llcd)
-                                GOTO(out, rc = -ENOMEM);
-                        /*
-                         * Allocation is successful, let's check for stop
-                         * flag again to fall back as soon as possible.
-                         */
-                       if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags))
-                                GOTO(out, rc = -ENODEV);
-                }
-
-                /*
-                 * Copy cookies to @llcd, no matter old or new allocated
-                 * one.
-                 */
-                llcd_copy(llcd, cookies);
-        }
-
-        /*
-         * Let's check if we need to send copied @cookies asap. If yes
-         * then do it.
-         */
-        if (llcd && (flags & OBD_LLOG_FL_SENDNOW)) {
-                CDEBUG(D_RPCTRACE, "Sync llcd %p\n", llcd);
-                rc = llcd_push(ctxt);
-                if (rc)
-                        GOTO(out, rc);
-        }
-        EXIT;
-out:
-        if (rc)
-                llcd_put(ctxt);
-
-       if (flags & OBD_LLOG_FL_EXIT)
-               ctxt->loc_flags = LLOG_CTXT_FLAG_STOP;
-
-       mutex_unlock(&ctxt->loc_mutex);
-        return rc;
-}
-EXPORT_SYMBOL(llog_obd_repl_cancel);
-
-int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp,
-                      int flags)
-{
-        int rc = 0;
-        ENTRY;
-
-        /*
-         * Flush any remaining llcd.
-         */
-       mutex_lock(&ctxt->loc_mutex);
-        if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
-                /*
-                 * This is ost->mds connection, we can't be sure that mds
-                 * can still receive cookies, let's killed the cached llcd.
-                 */
-                CDEBUG(D_RPCTRACE, "Kill cached llcd\n");
-                llcd_put(ctxt);
-
-               if (flags & OBD_LLOG_FL_EXIT)
-                       ctxt->loc_flags = LLOG_CTXT_FLAG_STOP;
-
-               mutex_unlock(&ctxt->loc_mutex);
-        } else {
-                /*
-                 * This is either llog_sync() from generic llog code or sync
-                 * on client disconnect. In either way let's do it and send
-                 * llcds to the target with waiting for completion.
-                 */
-                CDEBUG(D_RPCTRACE, "Sync cached llcd\n");
-               mutex_unlock(&ctxt->loc_mutex);
-               rc = llog_cancel(NULL, ctxt, NULL, 0, NULL,
-                                OBD_LLOG_FL_SENDNOW | flags);
-        }
-        RETURN(rc);
-}
-EXPORT_SYMBOL(llog_obd_repl_sync);
-
-#else /* !__KERNEL__ */
-
-int llog_obd_repl_cancel(const struct lu_env *env, struct llog_ctxt *ctxt,
-                         struct lov_stripe_md *lsm, int count,
-                         struct llog_cookie *cookies, int flags)
-{
-        return 0;
-}
-#endif
-
-/**
- * Module init time fucntion. Initializes slab for llcd objects.
- */
-int llog_recov_init(void)
-{
-        int llcd_size;
-
-        llcd_size = CFS_PAGE_SIZE -
-                lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL);
-        llcd_size += offsetof(struct llog_canceld_ctxt, llcd_cookies);
-        llcd_cache = cfs_mem_cache_create("llcd_cache", llcd_size, 0, 0);
-        if (!llcd_cache) {
-                CERROR("Error allocating llcd cache\n");
-                return -ENOMEM;
-        }
-        return 0;
-}
-
-/**
- * Module fini time fucntion. Releases slab for llcd objects.
- */
-void llog_recov_fini(void)
-{
-        /*
-         * Kill llcd cache when thread is stopped and we're sure no
-         * llcd in use left.
-         */
-        if (llcd_cache) {
-                /*
-                 * In 2.6.22 cfs_mem_cache_destroy() will not return error
-                 * for busy resources. Let's check it another way.
-                 */
-                LASSERTF(cfs_atomic_read(&llcd_count) == 0,
-                         "Can't destroy llcd cache! Number of "
-                         "busy llcds: %d\n", cfs_atomic_read(&llcd_count));
-                cfs_mem_cache_destroy(llcd_cache);
-                llcd_cache = NULL;
-        }
-}