From 2f8dcb7b8da13c8621a982be730a335bb189df08 Mon Sep 17 00:00:00 2001 From: "John L. Hammond" Date: Thu, 28 Feb 2013 03:14:30 -0600 Subject: [PATCH] LU-2675 cleanup: remove unused recov_thread.c and llog code Remove recov_thread.c and the following exported functions: llog_cat_process_thread llog_obd_origin_add llog_catalog_list llog_receptor_accept llog_origin_connect llog_handle_connect llog_obd_repl_cancel llog_obd_repl_sync llog_obd_repl_connect llog_get_cat_list llog_put_cat_list llog_recov_thread_start llog_recov_thread_stop Signed-off-by: John L. Hammond Change-Id: I620c641646294fcf697f1e2d4d1f389d71bb3ddd Reviewed-on: http://review.whamcloud.com/5190 Tested-by: Hudson Tested-by: Maloo Reviewed-by: Liu Xuezhao Reviewed-by: Andreas Dilger Reviewed-by: Mike Pershin --- lustre/include/lustre_log.h | 172 +-------- lustre/include/obd.h | 10 - lustre/include/obd_class.h | 12 - lustre/obdclass/llog_cat.c | 66 ---- lustre/obdclass/llog_internal.h | 7 - lustre/obdclass/llog_ioctl.c | 48 --- lustre/obdclass/llog_lvfs.c | 117 ------ lustre/obdclass/llog_obd.c | 22 -- lustre/ptlrpc/Makefile.in | 2 +- lustre/ptlrpc/autoMakefile.am | 3 +- lustre/ptlrpc/llog_net.c | 123 ------- lustre/ptlrpc/pack_generic.c | 10 - lustre/ptlrpc/ptlrpc_internal.h | 4 - lustre/ptlrpc/ptlrpc_module.c | 8 - lustre/ptlrpc/recov_thread.c | 776 ---------------------------------------- 15 files changed, 3 insertions(+), 1377 deletions(-) delete mode 100644 lustre/ptlrpc/recov_thread.c diff --git a/lustre/include/lustre_log.h b/lustre/include/lustre_log.h index 17c8434..2419161 100644 --- a/lustre/include/lustre_log.h +++ b/lustre/include/lustre_log.h @@ -169,21 +169,6 @@ struct llog_process_cat_data { int lpcd_last_idx; }; -struct llog_process_cat_args { - /** - * Llog context used in recovery thread on OST (recov_thread.c) - */ - struct llog_ctxt *lpca_ctxt; - /** - * Llog callback used in recovery thread on OST (recov_thread.c) - */ - void *lpca_cb; - /** - * Data pointer for llog callback. - */ - void *lpca_arg; -}; - int llog_cat_close(const struct lu_env *env, struct llog_handle *cathandle); int llog_cat_add_rec(const struct lu_env *env, struct llog_handle *cathandle, struct llog_rec_hdr *rec, struct llog_cookie *reccookie, @@ -202,7 +187,6 @@ int llog_cat_process_or_fork(const struct lu_env *env, void *data, int startcat, int startidx, bool fork); int llog_cat_process(const struct lu_env *env, struct llog_handle *cat_llh, llog_cb_t cb, void *data, int startcat, int startidx); -int llog_cat_process_thread(void *data); int llog_cat_reverse_process(const struct lu_env *env, struct llog_handle *cat_llh, llog_cb_t cb, void *data); @@ -222,9 +206,6 @@ int llog_obd_add(const struct lu_env *env, struct llog_ctxt *ctxt, int llog_cancel(const struct lu_env *env, struct llog_ctxt *ctxt, struct lov_stripe_md *lsm, int count, struct llog_cookie *cookies, int flags); -int llog_obd_origin_add(const struct lu_env *env, struct llog_ctxt *ctxt, - struct llog_rec_hdr *rec, struct lov_stripe_md *lsm, - struct llog_cookie *logcookies, int numcookies); int obd_llog_init(struct obd_device *obd, struct obd_llog_group *olg, struct obd_device *disk_obd, int *idx); @@ -234,26 +215,9 @@ int obd_llog_finish(struct obd_device *obd, int count); /* llog_ioctl.c */ int llog_ioctl(const struct lu_env *env, struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data); -int llog_catalog_list(struct obd_device *obd, int count, - struct obd_ioctl_data *data); /* llog_net.c */ int llog_initiator_connect(struct llog_ctxt *ctxt); -int llog_receptor_accept(struct llog_ctxt *ctxt, struct obd_import *imp); -int llog_origin_connect(struct llog_ctxt *ctxt, - struct llog_logid *logid, struct llog_gen *gen, - struct obd_uuid *uuid); -int llog_handle_connect(struct ptlrpc_request *req); - -/* recov_thread.c */ -int llog_obd_repl_cancel(const struct lu_env *env, struct llog_ctxt *ctxt, - struct lov_stripe_md *lsm, int count, - struct llog_cookie *cookies, int flags); -int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp, - int flags); -int llog_obd_repl_connect(struct llog_ctxt *ctxt, - struct llog_logid *logid, struct llog_gen *gen, - struct obd_uuid *uuid); struct llog_operations { int (*lop_destroy)(const struct lu_env *env, @@ -357,12 +321,6 @@ struct llog_handle { /* llog_lvfs.c */ extern struct llog_operations llog_lvfs_ops; -int llog_get_cat_list(struct obd_device *disk_obd, - char *name, int idx, int count, - struct llog_catid *idarray); - -int llog_put_cat_list(struct obd_device *disk_obd, - char *name, int idx, int count, struct llog_catid *idarray); /* llog_osd.c */ extern struct llog_operations llog_osd_ops; @@ -378,7 +336,6 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d, struct llog_ctxt { int loc_idx; /* my index the obd array of ctxt's */ - struct llog_gen loc_gen; struct obd_device *loc_obd; /* points back to the containing obd*/ struct obd_llog_group *loc_olg; /* group containing that ctxt */ struct obd_export *loc_exp; /* parent "disk" export (e.g. MDS) */ @@ -386,125 +343,12 @@ struct llog_ctxt { pointing import */ struct llog_operations *loc_logops; struct llog_handle *loc_handle; - struct llog_commit_master *loc_lcm; - struct llog_canceld_ctxt *loc_llcd; - struct mutex loc_mutex; /* protect loc_llcd and loc_imp */ + struct mutex loc_mutex; /* protect loc_imp */ cfs_atomic_t loc_refcount; - void *llog_proc_cb; long loc_flags; /* flags, see above defines */ struct dt_object *loc_dir; }; -#define LCM_NAME_SIZE 64 - -struct llog_commit_master { - /** - * Thread control flags (start, stop, etc.) - */ - long lcm_flags; - /** - * Number of llcds onthis lcm. - */ - cfs_atomic_t lcm_count; - /** - * The refcount for lcm - */ - cfs_atomic_t lcm_refcount; - /** - * Thread control structure. Used for control commit thread. - */ - struct ptlrpcd_ctl lcm_pc; - /** - * Lock protecting list of llcds. - */ - spinlock_t lcm_lock; - /** - * Llcds in flight for debugging purposes. - */ - cfs_list_t lcm_llcds; - /** - * Commit thread name buffer. Only used for thread start. - */ - char lcm_name[LCM_NAME_SIZE]; -}; - -static inline struct llog_commit_master -*lcm_get(struct llog_commit_master *lcm) -{ - cfs_atomic_inc(&lcm->lcm_refcount); - return lcm; -} - -static inline void -lcm_put(struct llog_commit_master *lcm) -{ - LASSERT_ATOMIC_POS(&lcm->lcm_refcount); - if (cfs_atomic_dec_and_test(&lcm->lcm_refcount)) - OBD_FREE_PTR(lcm); -} - -struct llog_canceld_ctxt { - /** - * Llog context this llcd is attached to. Used for accessing - * ->loc_import and others in process of canceling cookies - * gathered in this llcd. - */ - struct llog_ctxt *llcd_ctxt; - /** - * Cancel thread control stucture pointer. Used for accessing - * it to see if should stop processing and other needs. - */ - struct llog_commit_master *llcd_lcm; - /** - * Maximal llcd size. Used in calculations on how much of room - * left in llcd to cookie comming cookies. - */ - int llcd_size; - /** - * Link to lcm llcds list. - */ - cfs_list_t llcd_list; - /** - * Current llcd size while gathering cookies. This should not be - * more than ->llcd_size. Used for determining if we need to - * send this llcd (if full) and allocate new one. This is also - * used for copying new cookie at the end of buffer. - */ - int llcd_cookiebytes; - /** - * Pointer to the start of cookies buffer. - */ - struct llog_cookie llcd_cookies[0]; -}; - -/* ptlrpc/recov_thread.c */ -extern struct llog_commit_master *llog_recov_thread_init(char *name); -extern void llog_recov_thread_fini(struct llog_commit_master *lcm, - int force); -extern int llog_recov_thread_start(struct llog_commit_master *lcm); -extern void llog_recov_thread_stop(struct llog_commit_master *lcm, - int force); - -static inline void llog_gen_init(struct llog_ctxt *ctxt) -{ - struct obd_device *obd = ctxt->loc_exp->exp_obd; - - LASSERTF(obd->u.obt.obt_magic == OBT_MAGIC, - "%s: wrong obt magic %#x\n", - obd->obd_name, obd->u.obt.obt_magic); - ctxt->loc_gen.mnt_cnt = obd->u.obt.obt_mount_count; - ctxt->loc_gen.conn_cnt++; -} - -static inline int llog_gen_lt(struct llog_gen a, struct llog_gen b) -{ - if (a.mnt_cnt < b.mnt_cnt) - return 1; - if (a.mnt_cnt > b.mnt_cnt) - return 0; - return(a.conn_cnt < b.conn_cnt ? 1 : 0); -} - #define LLOG_PROC_BREAK 0x0001 #define LLOG_DEL_RECORD 0x0002 @@ -562,20 +406,6 @@ static inline void llog_group_init(struct obd_llog_group *olg, int group) olg->olg_seq = group; } -static inline void llog_group_set_export(struct obd_llog_group *olg, - struct obd_export *exp) -{ - LASSERT(exp != NULL); - - spin_lock(&olg->olg_lock); - if (olg->olg_exp != NULL && olg->olg_exp != exp) - CWARN("%s: export for group %d is changed: 0x%p -> 0x%p\n", - exp->exp_obd->obd_name, olg->olg_seq, - olg->olg_exp, exp); - olg->olg_exp = exp; - spin_unlock(&olg->olg_lock); -} - static inline int llog_group_set_ctxt(struct obd_llog_group *olg, struct llog_ctxt *ctxt, int index) { diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 60a7e57..70b0555 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -370,9 +370,6 @@ struct filter_obd { */ struct cfs_hash *fo_iobuf_hash; - cfs_list_t fo_llog_list; - spinlock_t fo_llog_list_lock; - struct brw_stats fo_filter_stats; int fo_fmd_max_num; /* per exp filter_mod_data */ @@ -389,7 +386,6 @@ struct filter_obd { unsigned int fo_fl_oss_capa; cfs_list_t fo_capa_keys; cfs_hlist_head_t *fo_capa_hash; - struct llog_commit_master *fo_lcm; int fo_sec_level; }; @@ -437,9 +433,6 @@ struct client_obd { enum lustre_sec_part cl_sp_to; struct sptlrpc_flavor cl_flvr_mgc; /* fixed flavor of mgc->mgs */ - //struct llog_canceld_ctxt *cl_llcd; /* it's included by obd_llog_ctxt */ - void *cl_llcd_offset; - /* the grant values are protected by loi_list_lock below */ long cl_dirty; /* all _dirty_ in bytes */ long cl_dirty_max; /* allowed w/o rpc */ @@ -959,13 +952,10 @@ struct target_recovery_data { }; struct obd_llog_group { - cfs_list_t olg_list; int olg_seq; struct llog_ctxt *olg_ctxts[LLOG_MAX_CTXTS]; cfs_waitq_t olg_waitq; spinlock_t olg_lock; - struct obd_export *olg_exp; - int olg_initializing; struct mutex olg_cat_processing; }; diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index f710646..575955d 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -1213,18 +1213,6 @@ obd_lvfs_fid2dentry(struct obd_export *exp, __u64 id_ino, __u32 gen, __u64 gr) return ctxt->cb_ops.l_fid2dentry(id_ino, gen, gr, exp->exp_obd); } -static inline int -obd_lvfs_open_llog(struct obd_export *exp, __u64 id_ino, struct dentry *dentry) -{ - LASSERT(exp->exp_obd); - CERROR("FIXME what's the story here? This needs to be an obd fn?\n"); -#if 0 - return lvfs_open_llog(&exp->exp_obd->obd_lvfs_ctxt, id_ino, - dentry, exp->exp_obd); -#endif - return 0; -} - /* @max_age is the oldest time in jiffies that we accept using a cached data. * If the cache is older than @max_age we will get a new value from the * target. Use a value of "cfs_time_current() + HZ" to guarantee freshness. */ diff --git a/lustre/obdclass/llog_cat.c b/lustre/obdclass/llog_cat.c index fbf2766..ced4c76 100644 --- a/lustre/obdclass/llog_cat.c +++ b/lustre/obdclass/llog_cat.c @@ -635,72 +635,6 @@ int llog_cat_process(const struct lu_env *env, struct llog_handle *cat_llh, } EXPORT_SYMBOL(llog_cat_process); -#ifdef __KERNEL__ -int llog_cat_process_thread(void *data) -{ - struct llog_process_cat_args *args = data; - struct llog_ctxt *ctxt = args->lpca_ctxt; - struct llog_handle *llh = NULL; - llog_cb_t cb = args->lpca_cb; - struct llog_thread_info *lgi; - struct lu_env env; - int rc; - ENTRY; - - cfs_daemonize_ctxt("ll_log_process"); - - rc = lu_env_init(&env, LCT_LOCAL); - if (rc) - GOTO(out, rc); - lgi = llog_info(&env); - LASSERT(lgi); - - lgi->lgi_logid = *(struct llog_logid *)(args->lpca_arg); - rc = llog_open(&env, ctxt, &llh, &lgi->lgi_logid, NULL, - LLOG_OPEN_EXISTS); - if (rc) { - CERROR("%s: cannot open llog "LPX64":%x: rc = %d\n", - ctxt->loc_obd->obd_name, lgi->lgi_logid.lgl_oid, - lgi->lgi_logid.lgl_ogen, rc); - GOTO(out_env, rc); - } - rc = llog_init_handle(&env, llh, LLOG_F_IS_CAT, NULL); - if (rc) { - CERROR("%s: llog_init_handle failed: rc = %d\n", - llh->lgh_ctxt->loc_obd->obd_name, rc); - GOTO(release_llh, rc); - } - - if (cb) { - rc = llog_cat_process(&env, llh, cb, NULL, 0, 0); - if (rc != LLOG_PROC_BREAK && rc != 0) - CERROR("%s: llog_cat_process() failed: rc = %d\n", - llh->lgh_ctxt->loc_obd->obd_name, rc); - cb(&env, llh, NULL, NULL); - } else { - CWARN("No callback function for recovery\n"); - } - - /* - * Make sure that all cached data is sent. - */ - llog_sync(ctxt, NULL, 0); - GOTO(release_llh, rc); -release_llh: - rc = llog_cat_close(&env, llh); - if (rc) - CERROR("%s: llog_cat_close() failed: rc = %d\n", - llh->lgh_ctxt->loc_obd->obd_name, rc); -out_env: - lu_env_fini(&env); -out: - llog_ctxt_put(ctxt); - OBD_FREE_PTR(args); - return rc; -} -EXPORT_SYMBOL(llog_cat_process_thread); -#endif - static int llog_cat_reverse_process_cb(const struct lu_env *env, struct llog_handle *cat_llh, struct llog_rec_hdr *rec, void *data) diff --git a/lustre/obdclass/llog_internal.h b/lustre/obdclass/llog_internal.h index 9d51c6a..32f3f96 100644 --- a/lustre/obdclass/llog_internal.h +++ b/lustre/obdclass/llog_internal.h @@ -45,7 +45,6 @@ struct llog_process_info { void *lpi_cbdata; void *lpi_catdata; int lpi_rc; - int lpi_flags; struct completion lpi_completion; const struct lu_env *lpi_env; @@ -54,17 +53,11 @@ struct llog_process_info { struct llog_thread_info { struct lu_attr lgi_attr; struct lu_fid lgi_fid; - struct llog_logid lgi_logid; struct dt_object_format lgi_dof; - struct llog_process_data lgi_lpd; - struct lustre_mdt_attrs lgi_lma_attr; - struct lu_buf lgi_buf; loff_t lgi_off; - struct llog_rec_hdr lgi_lrh; struct llog_rec_tail lgi_tail; - struct llog_logid_rec lgi_lid; }; extern struct lu_context_key llog_thread_key; diff --git a/lustre/obdclass/llog_ioctl.c b/lustre/obdclass/llog_ioctl.c index f377397..e210d95 100644 --- a/lustre/obdclass/llog_ioctl.c +++ b/lustre/obdclass/llog_ioctl.c @@ -422,51 +422,3 @@ out_close: RETURN(rc); } EXPORT_SYMBOL(llog_ioctl); - -#ifdef HAVE_LDISKFS_OSD -int llog_catalog_list(struct obd_device *obd, int count, - struct obd_ioctl_data *data) -{ - int size, i; - struct llog_catid *idarray; - struct llog_logid *id; - char name[32] = CATLIST; - char *out; - int l, remains, rc = 0; - - ENTRY; - size = sizeof(*idarray) * count; - - OBD_ALLOC_LARGE(idarray, size); - if (!idarray) - RETURN(-ENOMEM); - - mutex_lock(&obd->obd_olg.olg_cat_processing); - rc = llog_get_cat_list(obd, name, 0, count, idarray); - if (rc) - GOTO(out, rc); - - out = data->ioc_bulk; - remains = data->ioc_inllen1; - for (i = 0; i < count; i++) { - id = &idarray[i].lci_logid; - l = snprintf(out, remains, - "catalog log: #"LPX64"#"LPX64"#%08x\n", - id->lgl_oid, id->lgl_oseq, id->lgl_ogen); - out += l; - remains -= l; - if (remains <= 0) { - CWARN("not enough memory for catlog list\n"); - break; - } - } -out: - /* release semaphore */ - mutex_unlock(&obd->obd_olg.olg_cat_processing); - - OBD_FREE_LARGE(idarray, size); - RETURN(rc); - -} -EXPORT_SYMBOL(llog_catalog_list); -#endif diff --git a/lustre/obdclass/llog_lvfs.c b/lustre/obdclass/llog_lvfs.c index 9373474..8d9deb1 100644 --- a/lustre/obdclass/llog_lvfs.c +++ b/lustre/obdclass/llog_lvfs.c @@ -839,107 +839,6 @@ static int llog_lvfs_destroy(const struct lu_env *env, RETURN(rc); } -/* reads the catalog list */ -int llog_get_cat_list(struct obd_device *disk_obd, - char *name, int idx, int count, struct llog_catid *idarray) -{ - struct lvfs_run_ctxt saved; - struct l_file *file; - int rc, rc1 = 0; - int size = sizeof(*idarray) * count; - loff_t off = idx * sizeof(*idarray); - ENTRY; - - if (!count) - RETURN(0); - - push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL); - file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700); - if (!file || IS_ERR(file)) { - rc = PTR_ERR(file); - CERROR("OBD filter: cannot open/create %s: rc = %d\n", - name, rc); - GOTO(out, rc); - } - - if (!S_ISREG(file->f_dentry->d_inode->i_mode)) { - CERROR("%s is not a regular file!: mode = %o\n", name, - file->f_dentry->d_inode->i_mode); - GOTO(out, rc = -ENOENT); - } - - CDEBUG(D_CONFIG, "cat list: disk size=%d, read=%d\n", - (int)i_size_read(file->f_dentry->d_inode), size); - - /* read for new ost index or for empty file */ - memset(idarray, 0, size); - if (i_size_read(file->f_dentry->d_inode) < off) - GOTO(out, rc = 0); - - rc = fsfilt_read_record(disk_obd, file, idarray, size, &off); - if (rc) { - CERROR("OBD filter: error reading %s: rc %d\n", name, rc); - GOTO(out, rc); - } - - EXIT; - out: - pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL); - if (file && !IS_ERR(file)) - rc1 = filp_close(file, 0); - if (rc == 0) - rc = rc1; - return rc; -} -EXPORT_SYMBOL(llog_get_cat_list); - -/* writes the cat list */ -int llog_put_cat_list(struct obd_device *disk_obd, - char *name, int idx, int count, struct llog_catid *idarray) -{ - struct lvfs_run_ctxt saved; - struct l_file *file; - int rc, rc1 = 0; - int size = sizeof(*idarray) * count; - loff_t off = idx * sizeof(*idarray); - - if (!count) - GOTO(out1, rc = 0); - - push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL); - file = filp_open(name, O_RDWR | O_CREAT | O_LARGEFILE, 0700); - if (!file || IS_ERR(file)) { - rc = PTR_ERR(file); - CERROR("OBD filter: cannot open/create %s: rc = %d\n", - name, rc); - GOTO(out, rc); - } - - if (!S_ISREG(file->f_dentry->d_inode->i_mode)) { - CERROR("%s is not a regular file!: mode = %o\n", name, - file->f_dentry->d_inode->i_mode); - GOTO(out, rc = -ENOENT); - } - - rc = fsfilt_write_record(disk_obd, file, idarray, size, &off, 1); - if (rc) { - CDEBUG(D_INODE,"OBD filter: error writeing %s: rc %d\n", - name, rc); - GOTO(out, rc); - } - -out: - pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL); - if (file && !IS_ERR(file)) - rc1 = filp_close(file, 0); - - if (rc == 0) - rc = rc1; -out1: - RETURN(rc); -} -EXPORT_SYMBOL(llog_put_cat_list); - static int llog_lvfs_declare_create(const struct lu_env *env, struct llog_handle *res, struct thandle *th) @@ -970,21 +869,5 @@ struct llog_operations llog_lvfs_ops = { }; EXPORT_SYMBOL(llog_lvfs_ops); #else /* !__KERNEL__ */ -int llog_get_cat_list(struct obd_device *disk_obd, - char *name, int idx, int count, - struct llog_catid *idarray) -{ - LBUG(); - return 0; -} - -int llog_put_cat_list(struct obd_device *disk_obd, - char *name, int idx, int count, - struct llog_catid *idarray) -{ - LBUG(); - return 0; -} - struct llog_operations llog_lvfs_ops = {}; #endif diff --git a/lustre/obdclass/llog_obd.c b/lustre/obdclass/llog_obd.c index da54ff1..775dc11 100644 --- a/lustre/obdclass/llog_obd.c +++ b/lustre/obdclass/llog_obd.c @@ -69,7 +69,6 @@ static void llog_ctxt_destroy(struct llog_ctxt *ctxt) class_import_put(ctxt->loc_imp); ctxt->loc_imp = NULL; } - LASSERT(ctxt->loc_llcd == NULL); OBD_FREE_PTR(ctxt); } @@ -87,9 +86,6 @@ int __llog_ctxt_put(const struct lu_env *env, struct llog_ctxt *ctxt) olg->olg_ctxts[ctxt->loc_idx] = NULL; spin_unlock(&olg->olg_lock); - if (ctxt->loc_lcm) - lcm_put(ctxt->loc_lcm); - obd = ctxt->loc_obd; spin_lock(&obd->obd_dev_lock); /* sync with llog ctxt user thread */ @@ -281,24 +277,6 @@ int llog_cancel(const struct lu_env *env, struct llog_ctxt *ctxt, } EXPORT_SYMBOL(llog_cancel); -/* add for obdfilter/sz and mds/unlink */ -int llog_obd_origin_add(const struct lu_env *env, struct llog_ctxt *ctxt, - struct llog_rec_hdr *rec, struct lov_stripe_md *lsm, - struct llog_cookie *logcookies, int numcookies) -{ - struct llog_handle *cathandle; - int rc; - ENTRY; - - cathandle = ctxt->loc_handle; - LASSERT(cathandle != NULL); - rc = llog_cat_add(env, cathandle, rec, logcookies, NULL); - if (rc != 0 && rc != 1) - CERROR("write one catalog record failed: %d\n", rc); - RETURN(rc); -} -EXPORT_SYMBOL(llog_obd_origin_add); - int obd_llog_init(struct obd_device *obd, struct obd_llog_group *olg, struct obd_device *disk_obd, int *index) { diff --git a/lustre/ptlrpc/Makefile.in b/lustre/ptlrpc/Makefile.in index 0ea5fbf..fbfaa85 100644 --- a/lustre/ptlrpc/Makefile.in +++ b/lustre/ptlrpc/Makefile.in @@ -10,7 +10,7 @@ ldlm_objs += $(LDLM)ldlm_flock.o $(LDLM)ldlm_inodebits.o ldlm_objs += $(LDLM)ldlm_pool.o ldlm_objs += $(LDLM)interval_tree.o ptlrpc_objs := client.o recover.o connection.o niobuf.o pack_generic.o -ptlrpc_objs += events.o ptlrpc_module.o service.o pinger.o recov_thread.o +ptlrpc_objs += events.o ptlrpc_module.o service.o pinger.o ptlrpc_objs += llog_net.o llog_client.o llog_server.o import.o ptlrpcd.o ptlrpc_objs += pers.o lproc_ptlrpc.o wiretest.o layout.o ptlrpc_objs += sec.o sec_bulk.o sec_gc.o sec_config.o sec_lproc.o diff --git a/lustre/ptlrpc/autoMakefile.am b/lustre/ptlrpc/autoMakefile.am index 2b1d88a..77ad215 100644 --- a/lustre/ptlrpc/autoMakefile.am +++ b/lustre/ptlrpc/autoMakefile.am @@ -51,7 +51,7 @@ LDLM_COMM_SOURCES= $(top_srcdir)/lustre/ldlm/l_lock.c \ $(top_srcdir)/lustre/ldlm/ldlm_pool.c COMMON_SOURCES = client.c recover.c connection.c niobuf.c pack_generic.c \ - events.c ptlrpc_module.c service.c pinger.c recov_thread.c llog_net.c \ + events.c ptlrpc_module.c service.c pinger.c llog_net.c \ llog_client.c llog_server.c import.c ptlrpcd.c pers.c wiretest.c \ ptlrpc_internal.h layout.c sec.c sec_bulk.c sec_gc.c sec_config.c \ sec_lproc.c sec_null.c sec_plain.c lproc_ptlrpc.c nrs.c nrs_fifo.c \ @@ -91,7 +91,6 @@ ptlrpc_SOURCES = \ pinger.c \ ptlrpcd.c \ recover.c \ - recov_thread.c \ service.c \ nrs.c \ nrs_fifo.c \ diff --git a/lustre/ptlrpc/llog_net.c b/lustre/ptlrpc/llog_net.c index ffc378b..3245055 100644 --- a/lustre/ptlrpc/llog_net.c +++ b/lustre/ptlrpc/llog_net.c @@ -58,129 +58,6 @@ #include #include -#ifdef __KERNEL__ -int llog_origin_connect(struct llog_ctxt *ctxt, - struct llog_logid *logid, struct llog_gen *gen, - struct obd_uuid *uuid) -{ - struct llog_gen_rec *lgr; - struct ptlrpc_request *req; - struct llogd_conn_body *req_body; - struct inode* inode = ctxt->loc_handle->lgh_file->f_dentry->d_inode; - void *handle; - int rc, rc1; - - ENTRY; - - if (cfs_list_empty(&ctxt->loc_handle->u.chd.chd_head)) { - CDEBUG(D_HA, "there is no record related to ctxt %p\n", ctxt); - RETURN(0); - } - - /* FIXME what value for gen->conn_cnt */ - llog_gen_init(ctxt); - - /* first add llog_gen_rec */ - OBD_ALLOC_PTR(lgr); - if (!lgr) - RETURN(-ENOMEM); - lgr->lgr_hdr.lrh_len = lgr->lgr_tail.lrt_len = sizeof(*lgr); - lgr->lgr_hdr.lrh_type = LLOG_GEN_REC; - - handle = fsfilt_start_log(ctxt->loc_exp->exp_obd, inode, - FSFILT_OP_CANCEL_UNLINK, NULL, 1); - if (IS_ERR(handle)) { - CERROR("fsfilt_start failed: %ld\n", PTR_ERR(handle)); - OBD_FREE(lgr, sizeof(*lgr)); - rc = PTR_ERR(handle); - RETURN(rc); - } - - lgr->lgr_gen = ctxt->loc_gen; - rc = llog_obd_add(NULL, ctxt, &lgr->lgr_hdr, NULL, NULL, 1); - OBD_FREE_PTR(lgr); - rc1 = fsfilt_commit(ctxt->loc_exp->exp_obd, inode, handle, 0); - if (rc != 1 || rc1 != 0) { - rc = (rc != 1) ? rc : rc1; - RETURN(rc); - } - - LASSERT(ctxt->loc_imp); - req = ptlrpc_request_alloc_pack(ctxt->loc_imp, &RQF_LLOG_ORIGIN_CONNECT, - LUSTRE_LOG_VERSION, - LLOG_ORIGIN_CONNECT); - if (req == NULL) - RETURN(-ENOMEM); - - CDEBUG(D_OTHER, "%s mount_count "LPU64", connection count "LPU64"\n", - ctxt->loc_exp->exp_obd->obd_type->typ_name, - ctxt->loc_gen.mnt_cnt, ctxt->loc_gen.conn_cnt); - - req_body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_CONN_BODY); - req_body->lgdc_gen = ctxt->loc_gen; - req_body->lgdc_logid = ctxt->loc_handle->lgh_id; - req_body->lgdc_ctxt_idx = ctxt->loc_idx + 1; - ptlrpc_request_set_replen(req); - - req->rq_no_resend = req->rq_no_delay = 1; - rc = ptlrpc_queue_wait(req); - ptlrpc_req_finished(req); - - RETURN(rc); -} -EXPORT_SYMBOL(llog_origin_connect); - -int llog_handle_connect(struct ptlrpc_request *req) -{ - struct obd_device *obd = req->rq_export->exp_obd; - struct llogd_conn_body *req_body; - struct llog_ctxt *ctxt; - int rc; - ENTRY; - - req_body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_CONN_BODY); - - ctxt = llog_get_context(obd, req_body->lgdc_ctxt_idx); - rc = llog_connect(ctxt, &req_body->lgdc_logid, - &req_body->lgdc_gen, NULL); - - llog_ctxt_put(ctxt); - if (rc != 0) - CERROR("failed at llog_relp_connect\n"); - - RETURN(rc); -} -EXPORT_SYMBOL(llog_handle_connect); - -int llog_receptor_accept(struct llog_ctxt *ctxt, struct obd_import *imp) -{ - ENTRY; - - LASSERT(ctxt); - mutex_lock(&ctxt->loc_mutex); - if (ctxt->loc_imp != imp) { - if (ctxt->loc_imp) { - CWARN("changing the import %p - %p\n", - ctxt->loc_imp, imp); - class_import_put(ctxt->loc_imp); - } - ctxt->loc_imp = class_import_get(imp); - } - mutex_unlock(&ctxt->loc_mutex); - RETURN(0); -} -EXPORT_SYMBOL(llog_receptor_accept); - -#else /* !__KERNEL__ */ - -int llog_origin_connect(struct llog_ctxt *ctxt, - struct llog_logid *logid, struct llog_gen *gen, - struct obd_uuid *uuid) -{ - return 0; -} -#endif - int llog_initiator_connect(struct llog_ctxt *ctxt) { struct obd_import *new_imp; diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index e72ee14..b24185a 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -2294,16 +2294,6 @@ void lustre_swab_ldlm_reply (struct ldlm_reply *r) } EXPORT_SYMBOL(lustre_swab_ldlm_reply); -/* no one calls this */ -int llog_log_swabbed(struct llog_log_hdr *hdr) -{ - if (hdr->llh_hdr.lrh_type == __swab32(LLOG_HDR_MAGIC)) - return 1; - if (hdr->llh_hdr.lrh_type == LLOG_HDR_MAGIC) - return 0; - return -1; -} - void lustre_swab_quota_body(struct quota_body *b) { lustre_swab_lu_fid(&b->qb_fid); diff --git a/lustre/ptlrpc/ptlrpc_internal.h b/lustre/ptlrpc/ptlrpc_internal.h index c2d5050..5c24498 100644 --- a/lustre/ptlrpc/ptlrpc_internal.h +++ b/lustre/ptlrpc/ptlrpc_internal.h @@ -268,10 +268,6 @@ void sptlrpc_conf_fini(void); int sptlrpc_init(void); void sptlrpc_fini(void); -/* recov_thread.c */ -int llog_recov_init(void); -void llog_recov_fini(void); - static inline int ll_rpc_recoverable_error(int rc) { return (rc == -ENOTCONN || rc == -ENODEV); diff --git a/lustre/ptlrpc/ptlrpc_module.c b/lustre/ptlrpc/ptlrpc_module.c index 51869db..dd689b7 100644 --- a/lustre/ptlrpc/ptlrpc_module.c +++ b/lustre/ptlrpc/ptlrpc_module.c @@ -104,11 +104,6 @@ __init int ptlrpc_init(void) if (rc) GOTO(cleanup, rc); - cleanup_phase = 6; - rc = llog_recov_init(); - if (rc) - GOTO(cleanup, rc); - cleanup_phase = 7; rc = ptlrpc_nrs_init(); if (rc) @@ -129,8 +124,6 @@ cleanup: ptlrpc_nrs_fini(); #endif case 7: - llog_recov_fini(); - case 6: sptlrpc_fini(); case 5: ldlm_exit(); @@ -154,7 +147,6 @@ static void __exit ptlrpc_exit(void) { tgt_mod_exit(); ptlrpc_nrs_fini(); - llog_recov_fini(); sptlrpc_fini(); ldlm_exit(); ptlrpc_stop_pinger(); diff --git a/lustre/ptlrpc/recov_thread.c b/lustre/ptlrpc/recov_thread.c deleted file mode 100644 index dee77cd..0000000 --- a/lustre/ptlrpc/recov_thread.c +++ /dev/null @@ -1,776 +0,0 @@ -/* - * GPL HEADER START - * - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 only, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License version 2 for more details (a copy is included - * in the LICENSE file that accompanied this code). - * - * You should have received a copy of the GNU General Public License - * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. - * - * GPL HEADER END - */ -/* - * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. - * Use is subject to license terms. - * - * Copyright (c) 2011, 2012, Intel Corporation. - */ -/* - * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. - * - * lustre/ptlrpc/recov_thread.c - * - * OST<->MDS recovery logging thread. - * Invariants in implementation: - * - we do not share logs among different OST<->MDS connections, so that - * if an OST or MDS fails it need only look at log(s) relevant to itself - * - * Author: Andreas Dilger - * Yury Umanets - * Alexey Lyashkov - */ - -#define DEBUG_SUBSYSTEM S_LOG - -#ifdef __KERNEL__ -# include -#else -# include -# include -#endif - -#include -#include -#include -#include -#include -#include -#include -#include "ptlrpc_internal.h" - -static cfs_atomic_t llcd_count = CFS_ATOMIC_INIT(0); -static cfs_mem_cache_t *llcd_cache = NULL; - -#ifdef __KERNEL__ -enum { - LLOG_LCM_FL_START = 1 << 0, - LLOG_LCM_FL_EXIT = 1 << 1 -}; - -struct llcd_async_args { - struct llog_canceld_ctxt *la_ctxt; -}; - -static void llcd_print(struct llog_canceld_ctxt *llcd, - const char *func, int line) -{ - CDEBUG(D_RPCTRACE, "Llcd (%p) at %s:%d:\n", llcd, func, line); - CDEBUG(D_RPCTRACE, " size: %d\n", llcd->llcd_size); - CDEBUG(D_RPCTRACE, " ctxt: %p\n", llcd->llcd_ctxt); - CDEBUG(D_RPCTRACE, " lcm : %p\n", llcd->llcd_lcm); - CDEBUG(D_RPCTRACE, " cookiebytes : %d\n", llcd->llcd_cookiebytes); -} - -/** - * Allocate new llcd from cache, init it and return to caller. - * Bumps number of objects allocated. - */ -static struct llog_canceld_ctxt *llcd_alloc(struct llog_commit_master *lcm) -{ - struct llog_canceld_ctxt *llcd; - int size, overhead; - - LASSERT(lcm != NULL); - - /* - * We want to send one page of cookies with rpc header. This buffer - * will be assigned later to the rpc, this is why we preserve the - * space for rpc header. - */ - size = CFS_PAGE_SIZE - lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL); - overhead = offsetof(struct llog_canceld_ctxt, llcd_cookies); - OBD_SLAB_ALLOC_GFP(llcd, llcd_cache, size + overhead, CFS_ALLOC_STD); - if (!llcd) - return NULL; - - CFS_INIT_LIST_HEAD(&llcd->llcd_list); - llcd->llcd_cookiebytes = 0; - llcd->llcd_size = size; - - spin_lock(&lcm->lcm_lock); - llcd->llcd_lcm = lcm; - cfs_atomic_inc(&lcm->lcm_count); - cfs_list_add_tail(&llcd->llcd_list, &lcm->lcm_llcds); - spin_unlock(&lcm->lcm_lock); - cfs_atomic_inc(&llcd_count); - - CDEBUG(D_RPCTRACE, "Alloc llcd %p on lcm %p (%d)\n", - llcd, lcm, cfs_atomic_read(&lcm->lcm_count)); - - return llcd; -} - -/** - * Returns passed llcd to cache. - */ -static void llcd_free(struct llog_canceld_ctxt *llcd) -{ - struct llog_commit_master *lcm = llcd->llcd_lcm; - int size; - - if (lcm) { - if (cfs_atomic_read(&lcm->lcm_count) == 0) { - CERROR("Invalid llcd free %p\n", llcd); - llcd_print(llcd, __FUNCTION__, __LINE__); - LBUG(); - } - spin_lock(&lcm->lcm_lock); - LASSERT(!cfs_list_empty(&llcd->llcd_list)); - cfs_list_del_init(&llcd->llcd_list); - cfs_atomic_dec(&lcm->lcm_count); - spin_unlock(&lcm->lcm_lock); - - CDEBUG(D_RPCTRACE, "Free llcd %p on lcm %p (%d)\n", - llcd, lcm, cfs_atomic_read(&lcm->lcm_count)); - } - - LASSERT(cfs_atomic_read(&llcd_count) > 0); - cfs_atomic_dec(&llcd_count); - - size = offsetof(struct llog_canceld_ctxt, llcd_cookies) + - llcd->llcd_size; - OBD_SLAB_FREE(llcd, llcd_cache, size); -} - -/** - * Checks if passed cookie fits into llcd free space buffer. Returns - * 1 if yes and 0 otherwise. - */ -static inline int -llcd_fit(struct llog_canceld_ctxt *llcd, struct llog_cookie *cookies) -{ - return (llcd->llcd_size - llcd->llcd_cookiebytes >= sizeof(*cookies)); -} - -/** - * Copy passed @cookies to @llcd. - */ -static inline void -llcd_copy(struct llog_canceld_ctxt *llcd, struct llog_cookie *cookies) -{ - LASSERT(llcd_fit(llcd, cookies)); - memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes, - cookies, sizeof(*cookies)); - llcd->llcd_cookiebytes += sizeof(*cookies); -} - -/** - * Llcd completion function. Called uppon llcd send finish regardless - * sending result. Error is passed in @rc. Note, that this will be called - * in cleanup time when all inflight rpcs aborted. - */ -static int -llcd_interpret(const struct lu_env *env, - struct ptlrpc_request *req, void *args, int rc) -{ - struct llcd_async_args *la = args; - struct llog_canceld_ctxt *llcd = la->la_ctxt; - - CDEBUG(D_RPCTRACE, "Sent llcd %p (%d) - killing it\n", llcd, rc); - llcd_free(llcd); - return 0; -} - -/** - * Send @llcd to remote node. Free llcd uppon completion or error. Sending - * is performed in async style so this function will return asap without - * blocking. - */ -static int llcd_send(struct llog_canceld_ctxt *llcd) -{ - char *bufs[2] = { NULL, (char *)llcd->llcd_cookies }; - struct obd_import *import = NULL; - struct llog_commit_master *lcm; - struct llcd_async_args *la; - struct ptlrpc_request *req; - struct llog_ctxt *ctxt; - int rc; - ENTRY; - - ctxt = llcd->llcd_ctxt; - if (!ctxt) { - CERROR("Invalid llcd with NULL ctxt found (%p)\n", - llcd); - llcd_print(llcd, __FUNCTION__, __LINE__); - LBUG(); - } - LASSERT_MUTEX_LOCKED(&ctxt->loc_mutex); - - if (llcd->llcd_cookiebytes == 0) - GOTO(exit, rc = 0); - - lcm = llcd->llcd_lcm; - - /* - * Check if we're in exit stage. Do not send llcd in - * this case. - */ - if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) - GOTO(exit, rc = -ENODEV); - - CDEBUG(D_RPCTRACE, "Sending llcd %p\n", llcd); - - import = llcd->llcd_ctxt->loc_imp; - if (!import || (import == LP_POISON) || - (import->imp_client == LP_POISON)) { - CERROR("Invalid import %p for llcd %p\n", - import, llcd); - GOTO(exit, rc = -ENODEV); - } - - OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_RECOV, 10); - - /* - * No need to get import here as it is already done in - * llog_receptor_accept(). - */ - req = ptlrpc_request_alloc(import, &RQF_LOG_CANCEL); - if (req == NULL) { - CERROR("Can't allocate request for sending llcd %p\n", - llcd); - GOTO(exit, rc = -ENOMEM); - } - req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, - RCL_CLIENT, llcd->llcd_cookiebytes); - - rc = ptlrpc_request_bufs_pack(req, LUSTRE_LOG_VERSION, - OBD_LOG_CANCEL, bufs, NULL); - if (rc) { - ptlrpc_request_free(req); - GOTO(exit, rc); - } - - ptlrpc_at_set_req_timeout(req); - ptlrpc_request_set_replen(req); - - /* bug 5515 */ - req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL; - req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL; - - req->rq_interpret_reply = (ptlrpc_interpterer_t)llcd_interpret; - - CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args)); - la = ptlrpc_req_async_args(req); - la->la_ctxt = llcd; - - /* llog cancels will be replayed after reconnect so this will do twice - * first from replay llog, second for resended rpc */ - req->rq_no_delay = req->rq_no_resend = 1; - - ptlrpc_set_add_new_req(&lcm->lcm_pc, req); - RETURN(0); -exit: - CDEBUG(D_RPCTRACE, "Refused llcd %p\n", llcd); - llcd_free(llcd); - return rc; -} - -/** - * Attach @llcd to @ctxt. Establish llcd vs. ctxt reserve connection - * so hat they can refer each other. - */ -static int -llcd_attach(struct llog_ctxt *ctxt, struct llog_canceld_ctxt *llcd) -{ - LASSERT(ctxt != NULL && llcd != NULL); - LASSERT_MUTEX_LOCKED(&ctxt->loc_mutex); - LASSERT(ctxt->loc_llcd == NULL); - llcd->llcd_ctxt = llog_ctxt_get(ctxt); - ctxt->loc_llcd = llcd; - - CDEBUG(D_RPCTRACE, "Attach llcd %p to ctxt %p\n", - llcd, ctxt); - - return 0; -} - -/** - * Opposite to llcd_attach(). Detaches llcd from its @ctxt. This makes - * sure that this llcd will not be found another time we try to cancel. - */ -static struct llog_canceld_ctxt *llcd_detach(struct llog_ctxt *ctxt) -{ - struct llog_canceld_ctxt *llcd; - - LASSERT(ctxt != NULL); - LASSERT_MUTEX_LOCKED(&ctxt->loc_mutex); - - llcd = ctxt->loc_llcd; - if (!llcd) - return NULL; - - CDEBUG(D_RPCTRACE, "Detach llcd %p from ctxt %p\n", - llcd, ctxt); - - ctxt->loc_llcd = NULL; - llog_ctxt_put(ctxt); - return llcd; -} - -/** - * Return @llcd cached in @ctxt. Allocate new one if required. Attach it - * to ctxt so that it may be used for gathering cookies and sending. - */ -static struct llog_canceld_ctxt *llcd_get(struct llog_ctxt *ctxt) -{ - struct llog_canceld_ctxt *llcd; - LASSERT(ctxt); - llcd = llcd_alloc(ctxt->loc_lcm); - if (!llcd) { - CERROR("Can't alloc an llcd for ctxt %p\n", ctxt); - return NULL; - } - llcd_attach(ctxt, llcd); - return llcd; -} - -/** - * Deatch llcd from its @ctxt. Free llcd. - */ -static void llcd_put(struct llog_ctxt *ctxt) -{ - struct llog_canceld_ctxt *llcd; - - llcd = llcd_detach(ctxt); - if (llcd) - llcd_free(llcd); -} - -/** - * Detach llcd from its @ctxt so that nobody will find it with try to - * re-use. Send llcd to remote node. - */ -static int llcd_push(struct llog_ctxt *ctxt) -{ - struct llog_canceld_ctxt *llcd; - int rc; - - /* - * Make sure that this llcd will not be sent again as we detach - * it from ctxt. - */ - llcd = llcd_detach(ctxt); - if (!llcd) { - CERROR("Invalid detached llcd found %p\n", llcd); - llcd_print(llcd, __FUNCTION__, __LINE__); - LBUG(); - } - - rc = llcd_send(llcd); - if (rc) - CERROR("Couldn't send llcd %p (%d)\n", llcd, rc); - return rc; -} - -/** - * Start recovery thread which actually deals llcd sending. This - * is all ptlrpc standard thread based so there is not much of work - * to do. - */ -int llog_recov_thread_start(struct llog_commit_master *lcm) -{ - int rc; - ENTRY; - - rc = ptlrpcd_start(-1, 1, lcm->lcm_name, &lcm->lcm_pc); - if (rc) { - CERROR("Error %d while starting recovery thread %s\n", - rc, lcm->lcm_name); - RETURN(rc); - } - RETURN(rc); -} -EXPORT_SYMBOL(llog_recov_thread_start); - -/** - * Stop recovery thread. Complement to llog_recov_thread_start(). - */ -void llog_recov_thread_stop(struct llog_commit_master *lcm, int force) -{ - ENTRY; - - /* - * Let all know that we're stopping. This will also make - * llcd_send() refuse any new llcds. - */ - set_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags); - - /* - * Stop processing thread. No new rpcs will be accepted for - * for processing now. - */ - ptlrpcd_stop(&lcm->lcm_pc, force); - ptlrpcd_free(&lcm->lcm_pc); - - /* - * By this point no alive inflight llcds should be left. Only - * those forgotten in sync may still be attached to ctxt. Let's - * print them. - */ - if (cfs_atomic_read(&lcm->lcm_count) != 0) { - struct llog_canceld_ctxt *llcd; - cfs_list_t *tmp; - - CERROR("Busy llcds found (%d) on lcm %p\n", - cfs_atomic_read(&lcm->lcm_count), lcm); - - spin_lock(&lcm->lcm_lock); - cfs_list_for_each(tmp, &lcm->lcm_llcds) { - llcd = cfs_list_entry(tmp, struct llog_canceld_ctxt, - llcd_list); - llcd_print(llcd, __func__, __LINE__); - } - spin_unlock(&lcm->lcm_lock); - - /* - * No point to go further with busy llcds at this point - * as this is clear bug. It might mean we got hanging - * rpc which holds import ref and this means we will not - * be able to cleanup anyways. - * - * Or we just missed to kill them when they were not - * attached to ctxt. In this case our slab will remind - * us about this a bit later. - */ - LBUG(); - } - EXIT; -} -EXPORT_SYMBOL(llog_recov_thread_stop); - -/** - * Initialize commit master structure and start recovery thread on it. - */ -struct llog_commit_master *llog_recov_thread_init(char *name) -{ - struct llog_commit_master *lcm; - int rc; - ENTRY; - - OBD_ALLOC_PTR(lcm); - if (!lcm) - RETURN(NULL); - - /* - * Try to create threads with unique names. - */ - snprintf(lcm->lcm_name, sizeof(lcm->lcm_name), - "lcm_%s", name); - - cfs_atomic_set(&lcm->lcm_count, 0); - cfs_atomic_set(&lcm->lcm_refcount, 1); - spin_lock_init(&lcm->lcm_lock); - CFS_INIT_LIST_HEAD(&lcm->lcm_llcds); - rc = llog_recov_thread_start(lcm); - if (rc) { - CERROR("Can't start commit thread, rc %d\n", rc); - GOTO(out, rc); - } - RETURN(lcm); -out: - OBD_FREE_PTR(lcm); - return NULL; -} -EXPORT_SYMBOL(llog_recov_thread_init); - -/** - * Finalize commit master and its recovery thread. - */ -void llog_recov_thread_fini(struct llog_commit_master *lcm, int force) -{ - ENTRY; - llog_recov_thread_stop(lcm, force); - lcm_put(lcm); - EXIT; -} -EXPORT_SYMBOL(llog_recov_thread_fini); - -static int llog_recov_thread_replay(struct llog_ctxt *ctxt, - void *cb, void *arg) -{ - struct obd_device *obd = ctxt->loc_obd; - struct llog_process_cat_args *lpca; - int rc; - ENTRY; - - if (obd->obd_stopping) - RETURN(-ENODEV); - - /* - * This will be balanced in llog_cat_process_thread() - */ - OBD_ALLOC_PTR(lpca); - if (!lpca) - RETURN(-ENOMEM); - - lpca->lpca_cb = cb; - lpca->lpca_arg = arg; - - /* - * This will be balanced in llog_cat_process_thread() - */ - lpca->lpca_ctxt = llog_ctxt_get(ctxt); - if (!lpca->lpca_ctxt) { - OBD_FREE_PTR(lpca); - RETURN(-ENODEV); - } - rc = cfs_create_thread(llog_cat_process_thread, lpca, CFS_DAEMON_FLAGS); - if (rc < 0) { - CERROR("Error starting llog_cat_process_thread(): %d\n", rc); - OBD_FREE_PTR(lpca); - llog_ctxt_put(ctxt); - } else { - CDEBUG(D_HA, "Started llog_cat_process_thread(): %d\n", rc); - rc = 0; - } - - RETURN(rc); -} - -int llog_obd_repl_connect(struct llog_ctxt *ctxt, - struct llog_logid *logid, struct llog_gen *gen, - struct obd_uuid *uuid) -{ - int rc; - ENTRY; - - /* - * Send back cached llcd from llog before recovery if we have any. - * This is void is nothing cached is found there. - */ - llog_sync(ctxt, NULL, 0); - - /* - * Start recovery in separate thread. - */ - mutex_lock(&ctxt->loc_mutex); - ctxt->loc_gen = *gen; - rc = llog_recov_thread_replay(ctxt, ctxt->llog_proc_cb, logid); - mutex_unlock(&ctxt->loc_mutex); - - RETURN(rc); -} -EXPORT_SYMBOL(llog_obd_repl_connect); - -/** - * Deleted objects have a commit callback that cancels the MDS - * log record for the deletion. The commit callback calls this - * function. - */ -int llog_obd_repl_cancel(const struct lu_env *env, struct llog_ctxt *ctxt, - struct lov_stripe_md *lsm, int count, - struct llog_cookie *cookies, int flags) -{ - struct llog_commit_master *lcm; - struct llog_canceld_ctxt *llcd; - int rc = 0; - ENTRY; - - LASSERT(ctxt != NULL); - - mutex_lock(&ctxt->loc_mutex); - if (!ctxt->loc_lcm) { - CDEBUG(D_RPCTRACE, "No lcm for ctxt %p\n", ctxt); - GOTO(out, rc = -ENODEV); - } - lcm = ctxt->loc_lcm; - CDEBUG(D_INFO, "cancel on lsm %p\n", lcm); - - /* - * Let's check if we have all structures alive. We also check for - * possible shutdown. Do nothing if we're stopping. - */ - if (ctxt->loc_flags & LLOG_CTXT_FLAG_STOP) { - CDEBUG(D_RPCTRACE, "Last sync was done for ctxt %p\n", ctxt); - GOTO(out, rc = -ENODEV); - } - - if (ctxt->loc_imp == NULL) { - CDEBUG(D_RPCTRACE, "No import for ctxt %p\n", ctxt); - GOTO(out, rc = -ENODEV); - } - - if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) { - CDEBUG(D_RPCTRACE, "Commit thread is stopping for ctxt %p\n", - ctxt); - GOTO(out, rc = -ENODEV); - } - - llcd = ctxt->loc_llcd; - - if (count > 0 && cookies != NULL) { - /* - * Get new llcd from ctxt if required. - */ - if (!llcd) { - llcd = llcd_get(ctxt); - if (!llcd) - GOTO(out, rc = -ENOMEM); - /* - * Allocation is successful, let's check for stop - * flag again to fall back as soon as possible. - */ - if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) - GOTO(out, rc = -ENODEV); - } - - /* - * Llcd does not have enough room for @cookies. Let's push - * it out and allocate new one. - */ - if (!llcd_fit(llcd, cookies)) { - rc = llcd_push(ctxt); - if (rc) - GOTO(out, rc); - llcd = llcd_get(ctxt); - if (!llcd) - GOTO(out, rc = -ENOMEM); - /* - * Allocation is successful, let's check for stop - * flag again to fall back as soon as possible. - */ - if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) - GOTO(out, rc = -ENODEV); - } - - /* - * Copy cookies to @llcd, no matter old or new allocated - * one. - */ - llcd_copy(llcd, cookies); - } - - /* - * Let's check if we need to send copied @cookies asap. If yes - * then do it. - */ - if (llcd && (flags & OBD_LLOG_FL_SENDNOW)) { - CDEBUG(D_RPCTRACE, "Sync llcd %p\n", llcd); - rc = llcd_push(ctxt); - if (rc) - GOTO(out, rc); - } - EXIT; -out: - if (rc) - llcd_put(ctxt); - - if (flags & OBD_LLOG_FL_EXIT) - ctxt->loc_flags = LLOG_CTXT_FLAG_STOP; - - mutex_unlock(&ctxt->loc_mutex); - return rc; -} -EXPORT_SYMBOL(llog_obd_repl_cancel); - -int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp, - int flags) -{ - int rc = 0; - ENTRY; - - /* - * Flush any remaining llcd. - */ - mutex_lock(&ctxt->loc_mutex); - if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) { - /* - * This is ost->mds connection, we can't be sure that mds - * can still receive cookies, let's killed the cached llcd. - */ - CDEBUG(D_RPCTRACE, "Kill cached llcd\n"); - llcd_put(ctxt); - - if (flags & OBD_LLOG_FL_EXIT) - ctxt->loc_flags = LLOG_CTXT_FLAG_STOP; - - mutex_unlock(&ctxt->loc_mutex); - } else { - /* - * This is either llog_sync() from generic llog code or sync - * on client disconnect. In either way let's do it and send - * llcds to the target with waiting for completion. - */ - CDEBUG(D_RPCTRACE, "Sync cached llcd\n"); - mutex_unlock(&ctxt->loc_mutex); - rc = llog_cancel(NULL, ctxt, NULL, 0, NULL, - OBD_LLOG_FL_SENDNOW | flags); - } - RETURN(rc); -} -EXPORT_SYMBOL(llog_obd_repl_sync); - -#else /* !__KERNEL__ */ - -int llog_obd_repl_cancel(const struct lu_env *env, struct llog_ctxt *ctxt, - struct lov_stripe_md *lsm, int count, - struct llog_cookie *cookies, int flags) -{ - return 0; -} -#endif - -/** - * Module init time fucntion. Initializes slab for llcd objects. - */ -int llog_recov_init(void) -{ - int llcd_size; - - llcd_size = CFS_PAGE_SIZE - - lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL); - llcd_size += offsetof(struct llog_canceld_ctxt, llcd_cookies); - llcd_cache = cfs_mem_cache_create("llcd_cache", llcd_size, 0, 0); - if (!llcd_cache) { - CERROR("Error allocating llcd cache\n"); - return -ENOMEM; - } - return 0; -} - -/** - * Module fini time fucntion. Releases slab for llcd objects. - */ -void llog_recov_fini(void) -{ - /* - * Kill llcd cache when thread is stopped and we're sure no - * llcd in use left. - */ - if (llcd_cache) { - /* - * In 2.6.22 cfs_mem_cache_destroy() will not return error - * for busy resources. Let's check it another way. - */ - LASSERTF(cfs_atomic_read(&llcd_count) == 0, - "Can't destroy llcd cache! Number of " - "busy llcds: %d\n", cfs_atomic_read(&llcd_count)); - cfs_mem_cache_destroy(llcd_cache); - llcd_cache = NULL; - } -} -- 1.8.3.1