From: yury
Date: Thu, 7 Aug 2008 08:47:11 +0000 (+0000)
Subject: b=14608
X-Git-Tag: v1_9_50~56
X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=112a9a630f3f89b10f77a67df35233cfc4e23686

b=14608
r=shadow,wangdi

- re-implements recov_thread.c in a more elegant way (kills lots of code,
  the dedicated commit thread, etc.) and fixes a couple of issues along
  the way. See the bug for detailed info.
---
diff --git a/lustre/include/Makefile.am b/lustre/include/Makefile.am
index 150d348..bc7a500 100644
--- a/lustre/include/Makefile.am
+++ b/lustre/include/Makefile.am
@@ -37,12 +37,11 @@ SUBDIRS = linux lustre
 EXTRA_DIST = ioctl.h liblustre.h lprocfs_status.h lustre_cfg.h \
-       lustre_commit_confd.h lustre_debug.h lustre_disk.h \
-       lustre_dlm.h lustre_export.h lustre_fsfilt.h lustre_ha.h \
-       lustre_handles.h lustre_import.h lustre_lib.h lustre_sec.h \
-       lustre_lite.h lustre_log.h lustre_mds.h lustre_mdc.h \
-       lustre_net.h lustre_quota.h lustre_ucache.h lvfs.h class_hash.h \
-       obd_cache.h obd_class.h obd_echo.h obd.h obd_lov.h \
+       lustre_debug.h lustre_disk.h lustre_dlm.h lustre_export.h \
+       lustre_fsfilt.h lustre_ha.h lustre_handles.h lustre_import.h \
+       lustre_lib.h lustre_sec.h lustre_lite.h lustre_log.h lustre_mds.h \
+       lustre_mdc.h lustre_net.h lustre_quota.h lustre_ucache.h lvfs.h \
+       class_hash.h obd_cache.h obd_class.h obd_echo.h obd.h obd_lov.h \
        obd_ost.h obd_support.h lustre_ver.h lu_object.h lu_time.h \
        md_object.h dt_object.h lustre_param.h lustre_mdt.h \
        lustre_fid.h lustre_fld.h lustre_req_layout.h lustre_capa.h \
diff --git a/lustre/include/lustre_log.h b/lustre/include/lustre_log.h
index 94199d4..9e5d3ba 100644
--- a/lustre/include/lustre_log.h
+++ b/lustre/include/lustre_log.h
@@ -112,16 +112,44 @@ extern int llog_cancel_rec(struct llog_handle *loghandle, int index);
 extern int llog_close(struct llog_handle *cathandle);
 extern int llog_get_size(struct llog_handle *loghandle);
 
-/* llog_cat.c - catalog api */
+/* llog_cat.c - catalog api */
 struct llog_process_data {
-        void *lpd_data;
-        llog_cb_t lpd_cb;
+        /**
+         * Any useful data needed while processing the catalog. This is
+         * passed later to the process callback.
+         */
+        void *lpd_data;
+        /**
+         * Catalog process callback function, called for each record
+         * in the catalog.
+         */
+        llog_cb_t lpd_cb;
 };
 
 struct llog_process_cat_data {
-        int first_idx;
-        int last_idx;
-        /* to process catalog across zero record */
+        /**
+         * First index, stored temporarily while scanning the log.
+         */
+        int lpcd_first_idx;
+        /**
+         * Last index, stored temporarily while scanning the log.
+         */
+        int lpcd_last_idx;
+};
+
+struct llog_process_cat_args {
+        /**
+         * Llog context used in recovery thread on OST (recov_thread.c)
+         */
+        struct llog_ctxt *lpca_ctxt;
+        /**
+         * Llog callback used in recovery thread on OST (recov_thread.c)
+         */
+        void *lpca_cb;
+        /**
+         * Data pointer for the llog callback.
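+         * In this patch the only in-tree caller is llog_obd_repl_connect(),
+         * which passes the catalog's llog_logid here; the spawned
+         * llog_cat_process_thread() copies it out before use:
+         *
+         *   logid = *(struct llog_logid *)(args->lpca_arg);
+         *
+         * (editorial usage note, derived from the code in this patch).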
+         */
+        void *lpca_arg;
+};
 
 int llog_cat_put(struct llog_handle *cathandle);
@@ -130,6 +158,7 @@ int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
 int llog_cat_cancel_records(struct llog_handle *cathandle, int count,
                             struct llog_cookie *cookies);
 int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data);
+int llog_cat_process_thread(void *data);
 int llog_cat_reverse_process(struct llog_handle *cat_llh, llog_cb_t cb,
                              void *data);
 int llog_cat_set_first_idx(struct llog_handle *cathandle, int index);
@@ -180,9 +209,9 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                          struct lov_stripe_md *lsm, int count,
                          struct llog_cookie *cookies, int flags);
 int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp);
-int llog_repl_connect(struct llog_ctxt *ctxt, int count,
-                      struct llog_logid *logid, struct llog_gen *gen,
-                      struct obd_uuid *uuid);
+int llog_obd_repl_connect(struct llog_ctxt *ctxt, int count,
+                          struct llog_logid *logid, struct llog_gen *gen,
+                          struct obd_uuid *uuid);
 
 struct llog_operations {
         int (*lop_write_rec)(struct llog_handle *loghandle,
@@ -224,19 +253,82 @@ struct llog_ctxt {
         int loc_idx; /* my index the obd array of ctxt's */
         struct llog_gen loc_gen;
         struct obd_device *loc_obd; /* points back to the containing obd*/
-        struct obd_llog_group *loc_olg; /* group containing that ctxt */
+        struct obd_llog_group *loc_olg; /* group containing that ctxt */
         struct obd_export *loc_exp; /* parent "disk" export (e.g. MDS) */
         struct obd_import *loc_imp; /* to use in RPC's: can be backward pointing import */
         struct llog_operations *loc_logops;
         struct llog_handle *loc_handle;
+        struct llog_commit_master *loc_lcm;
         struct llog_canceld_ctxt *loc_llcd;
         struct semaphore loc_sem; /* protects loc_llcd and loc_imp */
-        atomic_t loc_refcount;
-        struct llog_commit_master *loc_lcm;
+        atomic_t loc_refcount;
         void *llog_proc_cb;
 };
 
+#define LCM_NAME_SIZE 64
+
+struct llog_commit_master {
+        /**
+         * Thread control flags (start, stop, etc.)
+         */
+        long lcm_flags;
+        /**
+         * Number of llcds on this lcm.
+         */
+        atomic_t lcm_count;
+        /**
+         * Ptlrpc requests set. All cancel rpcs go via this request set.
+         */
+        struct ptlrpc_request_set *lcm_set;
+        /**
+         * Thread control structure. Used to control the commit thread.
+         */
+        struct ptlrpcd_ctl lcm_pc;
+        /**
+         * Commit thread name buffer. Only used for thread start.
+         */
+        char lcm_name[LCM_NAME_SIZE];
+};
+
+struct llog_canceld_ctxt {
+        /**
+         * Llog context this llcd is attached to. Used for accessing
+         * ->loc_imp and others while canceling the cookies gathered
+         * in this llcd.
+         */
+        struct llog_ctxt *llcd_ctxt;
+        /**
+         * Cancel thread control structure pointer. Used to check
+         * whether processing should stop, among other things.
+         */
+        struct llog_commit_master *llcd_lcm;
+        /**
+         * Maximum llcd size. Used when calculating how much room is
+         * left in the llcd for incoming cookies.
+         */
+        int llcd_size;
+        /**
+         * Current llcd size while gathering cookies. This should not be
+         * more than ->llcd_size. Used for determining if we need to
+         * send this llcd (if full) and allocate a new one. This is also
+         * used for copying a new cookie to the end of the buffer.
+         */
+        int llcd_cookiebytes;
+        /**
+         * Pointer to the start of cookies buffer.
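+         * This is a zero-length array: the cookies live inline right
+         * after the llcd header, and llcd_copy() (recov_thread.c, later
+         * in this patch) appends each new cookie at the current offset:
+         *
+         *   memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes,
+         *          cookies, sizeof(*cookies));
+         *   llcd->llcd_cookiebytes += sizeof(*cookies);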
+ */ + struct llog_cookie llcd_cookies[0]; +}; + +/* ptlrpc/recov_thread.c */ +extern struct llog_commit_master *llog_recov_thread_init(char *name); +extern void llog_recov_thread_fini(struct llog_commit_master *lcm, + int force); +extern int llog_recov_thread_start(struct llog_commit_master *lcm); +extern void llog_recov_thread_stop(struct llog_commit_master *lcm, + int force); + #ifndef __KERNEL__ #define cap_raise(c, flag) do {} while(0) @@ -310,10 +402,10 @@ static inline void llog_ctxt_put(struct llog_ctxt *ctxt) { if (ctxt == NULL) return; - CDEBUG(D_INFO, "PUTting ctxt %p : new refcount %d\n", ctxt, - atomic_read(&ctxt->loc_refcount) - 1); LASSERT(atomic_read(&ctxt->loc_refcount) > 0); LASSERT(atomic_read(&ctxt->loc_refcount) < 0x5a5a5a); + CDEBUG(D_INFO, "PUTting ctxt %p : new refcount %d\n", ctxt, + atomic_read(&ctxt->loc_refcount) - 1); __llog_ctxt_put(ctxt); } diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 334405c..153e1eb 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -714,6 +714,69 @@ struct ptlrpc_service { //struct ptlrpc_srv_ni srv_interfaces[0]; }; +struct ptlrpcd_ctl { + /** + * Ptlrpc thread control flags (LIOD_START, LIOD_STOP, LIOD_STOP_FORCE) + */ + unsigned long pc_flags; + /** + * Thread lock protecting structure fields. + */ + spinlock_t pc_lock; + /** + * Start completion. + */ + struct completion pc_starting; + /** + * Stop completion. + */ + struct completion pc_finishing; + /** + * Thread requests set. + */ + struct ptlrpc_request_set *pc_set; + /** + * Thread name used in cfs_daemonize() + */ + char pc_name[16]; +#ifndef __KERNEL__ + /** + * Async rpcs flag to make sure that ptlrpcd_check() is called only + * once. + */ + int pc_recurred; + /** + * Currently not used. + */ + void *pc_callback; + /** + * User-space async rpcs callback. + */ + void *pc_wait_callback; + /** + * User-space check idle rpcs callback. + */ + void *pc_idle_callback; +#endif +}; + +/* Bits for pc_flags */ +enum ptlrpcd_ctl_flags { + /** + * Ptlrpc thread start flag. + */ + LIOD_START = 1 << 0, + /** + * Ptlrpc thread stop flag. + */ + LIOD_STOP = 1 << 1, + /** + * Ptlrpc thread stop force flag. This will cause also + * aborting any inflight rpcs handled by thread. 
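+         * In other words, a forced stop also aborts any rpcs the thread
+         * still has in flight: ptlrpcd_stop(pc, 1) (ptlrpcd.c in this
+         * patch) sets both LIOD_STOP and LIOD_STOP_FORCE, and the thread
+         * then calls ptlrpc_abort_set() on its request set before exiting.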
+ */ + LIOD_STOP_FORCE = 1 << 2 +}; + /* ptlrpc/events.c */ extern lnet_handle_eq_t ptlrpc_eq_h; extern int ptlrpc_uuid_to_peer(struct obd_uuid *uuid, @@ -794,6 +857,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req); void ptlrpc_unregister_reply(struct ptlrpc_request *req); void ptlrpc_restart_req(struct ptlrpc_request *req); void ptlrpc_abort_inflight(struct obd_import *imp); +void ptlrpc_abort_set(struct ptlrpc_request_set *set); struct ptlrpc_request_set *ptlrpc_prep_set(void); int ptlrpc_set_add_cb(struct ptlrpc_request_set *set, @@ -806,13 +870,16 @@ void ptlrpc_interrupted_set(void *data); void ptlrpc_mark_interrupted(struct ptlrpc_request *req); void ptlrpc_set_destroy(struct ptlrpc_request_set *); void ptlrpc_set_add_req(struct ptlrpc_request_set *, struct ptlrpc_request *); -void ptlrpc_set_add_new_req(struct ptlrpc_request_set *, - struct ptlrpc_request *); +int ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc, + struct ptlrpc_request *req); void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool); void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq); -struct ptlrpc_request_pool *ptlrpc_init_rq_pool(int, int, - void (*populate_pool)(struct ptlrpc_request_pool *, int)); + +struct ptlrpc_request_pool * +ptlrpc_init_rq_pool(int, int, + void (*populate_pool)(struct ptlrpc_request_pool *, int)); + void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req); struct ptlrpc_request *ptlrpc_request_alloc(struct obd_import *imp, const struct req_format *format); @@ -1065,6 +1132,8 @@ void ping_evictor_stop(void); int ptlrpc_check_and_wait_suspend(struct ptlrpc_request *req); /* ptlrpc/ptlrpcd.c */ +int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc); +void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force); void ptlrpcd_wake(struct ptlrpc_request *req); void ptlrpcd_add_req(struct ptlrpc_request *req); int ptlrpcd_addref(void); diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 3778715..bb3f482 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -401,8 +401,7 @@ struct filter_obd { unsigned int fo_fl_oss_capa; struct list_head fo_capa_keys; struct hlist_head *fo_capa_hash; - - void *fo_lcm; + struct llog_commit_master *fo_lcm; }; #define OSC_MAX_RIF_DEFAULT 8 diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 0074d7d..beda0d3 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -60,7 +60,6 @@ #include #include #include -#include #include #include #include diff --git a/lustre/mds/mds_log.c b/lustre/mds/mds_log.c index 8d1ef17..ff7c307 100644 --- a/lustre/mds/mds_log.c +++ b/lustre/mds/mds_log.c @@ -52,9 +52,7 @@ #include #include #include -#include #include - #include "mds_internal.h" static int mds_llog_origin_add(struct llog_ctxt *ctxt, diff --git a/lustre/mgs/mgs_handler.c b/lustre/mgs/mgs_handler.c index 819a8b0..303e890 100644 --- a/lustre/mgs/mgs_handler.c +++ b/lustre/mgs/mgs_handler.c @@ -57,7 +57,6 @@ #include #include #include -#include #include #include "mgs_internal.h" diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index 98c15fe..fc42536 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -822,13 +822,13 @@ void class_import_put(struct obd_import *import) { ENTRY; - CDEBUG(D_INFO, "import %p refcount=%d\n", import, - atomic_read(&import->imp_refcount) - 1); - LASSERT(atomic_read(&import->imp_refcount) > 0); LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a); LASSERT(list_empty(&import->imp_zombie_chain)); + CDEBUG(D_INFO, "import %p refcount=%d\n", import, + 
atomic_read(&import->imp_refcount) - 1); + if (atomic_dec_and_test(&import->imp_refcount)) { CDEBUG(D_INFO, "final put import %p\n", import); diff --git a/lustre/obdclass/llog.c b/lustre/obdclass/llog.c index de2b350..aec0202 100644 --- a/lustre/obdclass/llog.c +++ b/lustre/obdclass/llog.c @@ -245,11 +245,11 @@ static int llog_process_thread(void *arg) cfs_daemonize_ctxt("llog_process_thread"); if (cd != NULL) { - last_called_index = cd->first_idx; - index = cd->first_idx + 1; + last_called_index = cd->lpcd_first_idx; + index = cd->lpcd_first_idx + 1; } - if (cd != NULL && cd->last_idx) - last_index = cd->last_idx; + if (cd != NULL && cd->lpcd_last_idx) + last_index = cd->lpcd_last_idx; else last_index = LLOG_BITMAP_BYTES * 8 - 1; @@ -348,7 +348,7 @@ static int llog_process_thread(void *arg) out: if (cd != NULL) - cd->last_idx = last_called_index; + cd->lpcd_last_idx = last_called_index; if (buf) OBD_FREE(buf, LLOG_CHUNK_SIZE); lpi->lpi_rc = rc; @@ -415,9 +415,9 @@ int llog_reverse_process(struct llog_handle *loghandle, llog_cb_t cb, RETURN(-ENOMEM); if (cd != NULL) - first_index = cd->first_idx + 1; - if (cd != NULL && cd->last_idx) - index = cd->last_idx; + first_index = cd->lpcd_first_idx + 1; + if (cd != NULL && cd->lpcd_last_idx) + index = cd->lpcd_last_idx; else index = LLOG_BITMAP_BYTES * 8 - 1; diff --git a/lustre/obdclass/llog_cat.c b/lustre/obdclass/llog_cat.c index dda79a0..c142dfe 100644 --- a/lustre/obdclass/llog_cat.c +++ b/lustre/obdclass/llog_cat.c @@ -395,14 +395,14 @@ int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data) CWARN("catlog "LPX64" crosses index zero\n", cat_llh->lgh_id.lgl_oid); - cd.first_idx = llh->llh_cat_idx; - cd.last_idx = 0; + cd.lpcd_first_idx = llh->llh_cat_idx; + cd.lpcd_last_idx = 0; rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd); if (rc != 0) RETURN(rc); - cd.first_idx = 0; - cd.last_idx = cat_llh->lgh_last_idx; + cd.lpcd_first_idx = 0; + cd.lpcd_last_idx = cat_llh->lgh_last_idx; rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd); } else { rc = llog_process(cat_llh, llog_cat_process_cb, &d, NULL); @@ -412,6 +412,56 @@ int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data) } EXPORT_SYMBOL(llog_cat_process); +#ifdef __KERNEL__ +int llog_cat_process_thread(void *data) +{ + struct llog_process_cat_args *args = data; + struct llog_ctxt *ctxt = args->lpca_ctxt; + struct llog_handle *llh = NULL; + void *cb = args->lpca_cb; + struct llog_logid logid; + int rc; + ENTRY; + + cfs_daemonize_ctxt("ll_log_process"); + + logid = *(struct llog_logid *)(args->lpca_arg); + rc = llog_create(ctxt, &llh, &logid, NULL); + if (rc) { + CERROR("llog_create() failed %d\n", rc); + GOTO(out, rc); + } + rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL); + if (rc) { + CERROR("llog_init_handle failed %d\n", rc); + GOTO(release_llh, rc); + } + + if (cb) { + rc = llog_cat_process(llh, (llog_cb_t)cb, NULL); + if (rc != LLOG_PROC_BREAK) + CERROR("llog_cat_process() failed %d\n", rc); + } else { + CWARN("No callback function for recovery\n"); + } + + /* + * Make sure that all cached data is sent. 
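+         * llog_sync(ctxt, NULL) ends up in llog_obd_repl_sync(), which
+         * issues a cancel with OBD_LLOG_FL_SENDNOW and thereby pushes any
+         * partially filled llcd out to the network (see recov_thread.c
+         * in this patch).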
+ */ + llog_sync(ctxt, NULL); + EXIT; +release_llh: + rc = llog_cat_put(llh); + if (rc) + CERROR("llog_cat_put() failed %d\n", rc); +out: + llog_ctxt_put(ctxt); + OBD_FREE(args, sizeof(*args)); + return rc; +} +EXPORT_SYMBOL(llog_cat_process_thread); +#endif + static int llog_cat_reverse_process_cb(struct llog_handle *cat_llh, struct llog_rec_hdr *rec, void *data) { @@ -456,15 +506,15 @@ int llog_cat_reverse_process(struct llog_handle *cat_llh, CWARN("catalog "LPX64" crosses index zero\n", cat_llh->lgh_id.lgl_oid); - cd.first_idx = 0; - cd.last_idx = cat_llh->lgh_last_idx; + cd.lpcd_first_idx = 0; + cd.lpcd_last_idx = cat_llh->lgh_last_idx; rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb, &d, &cd); if (rc != 0) RETURN(rc); - cd.first_idx = le32_to_cpu(llh->llh_cat_idx); - cd.last_idx = 0; + cd.lpcd_first_idx = le32_to_cpu(llh->llh_cat_idx); + cd.lpcd_last_idx = 0; rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb, &d, &cd); } else { diff --git a/lustre/obdclass/llog_obd.c b/lustre/obdclass/llog_obd.c index 7256232..945224c 100644 --- a/lustre/obdclass/llog_obd.c +++ b/lustre/obdclass/llog_obd.c @@ -126,6 +126,9 @@ int llog_cleanup(struct llog_ctxt *ctxt) /* try to free the ctxt */ rc = __llog_ctxt_put(ctxt); + if (rc) + CERROR("Error %d while cleaning up ctxt %p\n", + rc, ctxt); l_wait_event(olg->olg_waitq, llog_group_ctxt_null(olg, idx), &lwi); diff --git a/lustre/obdclass/obd_config.c b/lustre/obdclass/obd_config.c index 4811205..5113166 100644 --- a/lustre/obdclass/obd_config.c +++ b/lustre/obdclass/obd_config.c @@ -473,7 +473,6 @@ int class_cleanup(struct obd_device *obd, struct lustre_cfg *lcfg) if (err) CERROR("Precleanup %s returned %d\n", obd->obd_name, err); - class_decref(obd); obd->obd_set_up = 0; RETURN(0); @@ -1083,16 +1082,16 @@ int class_config_parse_llog(struct llog_ctxt *ctxt, char *name, /* continue processing from where we last stopped to end-of-log */ if (cfg) - cd.first_idx = cfg->cfg_last_idx; - cd.last_idx = 0; + cd.lpcd_first_idx = cfg->cfg_last_idx; + cd.lpcd_last_idx = 0; rc = llog_process(llh, class_config_llog_handler, cfg, &cd); CDEBUG(D_CONFIG, "Processed log %s gen %d-%d (rc=%d)\n", name, - cd.first_idx + 1, cd.last_idx, rc); + cd.lpcd_first_idx + 1, cd.lpcd_last_idx, rc); if (cfg) - cfg->cfg_last_idx = cd.last_idx; + cfg->cfg_last_idx = cd.lpcd_last_idx; parse_out: rc2 = llog_close(llh); diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 80e5c47..adbf725 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -71,7 +71,6 @@ #include #include #include -#include #include #include #include @@ -2171,7 +2170,15 @@ static int filter_setup(struct obd_device *obd, struct lustre_cfg* lcfg) return rc; } -static int filter_group_llog_finish(struct obd_llog_group *olg) +static struct llog_operations filter_mds_ost_repl_logops; + +static struct llog_operations filter_size_orig_logops = { + .lop_setup = llog_obd_origin_setup, + .lop_cleanup = llog_obd_origin_cleanup, + .lop_add = llog_obd_origin_add +}; + +static int filter_olg_fini(struct obd_llog_group *olg) { struct llog_ctxt *ctxt; int rc = 0, rc2 = 0; @@ -2190,20 +2197,12 @@ static int filter_group_llog_finish(struct obd_llog_group *olg) RETURN(rc); } -static struct llog_operations filter_mds_ost_repl_logops /* initialized below*/; -static struct llog_operations filter_size_orig_logops = { - lop_setup: llog_obd_origin_setup, - lop_cleanup: llog_obd_origin_cleanup, - lop_add: llog_obd_origin_add -}; - -/** - * Init obd_llog_group of the filter - */ 
-static int filter_olg_init(struct obd_device *obd, struct obd_llog_group *olg, - struct obd_device *tgt) +static int +filter_olg_init(struct obd_device *obd, struct obd_llog_group *olg, + struct obd_device *tgt) { int rc; + ENTRY; rc = llog_setup(obd, olg, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL, &filter_mds_ost_repl_logops); @@ -2214,10 +2213,11 @@ static int filter_olg_init(struct obd_device *obd, struct obd_llog_group *olg, &filter_size_orig_logops); if (rc) GOTO(cleanup, rc); + EXIT; cleanup: if (rc) - filter_group_llog_finish(olg); - RETURN(rc); + filter_olg_fini(olg); + return rc; } /** @@ -2230,48 +2230,44 @@ filter_default_olg_init(struct obd_device *obd, struct obd_llog_group *olg, struct filter_obd *filter = &obd->u.filter; struct llog_ctxt *ctxt; int rc; + ENTRY; - LASSERT(filter->fo_lcm == NULL); - OBD_ALLOC(filter->fo_lcm, sizeof(struct llog_commit_master)); + filter->fo_lcm = llog_recov_thread_init(obd->obd_name); if (!filter->fo_lcm) RETURN(-ENOMEM); - rc = llog_init_commit_master((struct llog_commit_master *) - filter->fo_lcm); - if (rc) - GOTO(cleanup, rc); - filter_mds_ost_repl_logops = llog_client_ops; filter_mds_ost_repl_logops.lop_cancel = llog_obd_repl_cancel; - filter_mds_ost_repl_logops.lop_connect = llog_repl_connect; + filter_mds_ost_repl_logops.lop_connect = llog_obd_repl_connect; filter_mds_ost_repl_logops.lop_sync = llog_obd_repl_sync; rc = filter_olg_init(obd, olg, tgt); if (rc) - GOTO(cleanup, rc); + GOTO(cleanup_lcm, rc); ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT); - LASSERT(ctxt != NULL); + if (!ctxt) { + CERROR("Can't get ctxt for %p:%x\n", olg, + LLOG_MDS_OST_REPL_CTXT); + GOTO(cleanup_olg, rc = -ENODEV); + } + ctxt->loc_lcm = filter->fo_lcm; ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb; - ctxt->loc_lcm = obd->u.filter.fo_lcm; - /* Only start log commit thread for default llog ctxt*/ - rc = llog_start_commit_thread(ctxt->loc_lcm); llog_ctxt_put(ctxt); - if (rc) - GOTO(cleanup, rc); -cleanup: - if (rc) { - llog_cleanup_commit_master(filter->fo_lcm, 0); - OBD_FREE(filter->fo_lcm, sizeof(struct llog_commit_master)); - filter->fo_lcm = NULL; - } - RETURN(rc); + + RETURN(0); +cleanup_olg: + filter_olg_fini(olg); +cleanup_lcm: + llog_recov_thread_fini(filter->fo_lcm, 1); + filter->fo_lcm = NULL; + return rc; } -static int filter_llog_init(struct obd_device *obd, struct obd_llog_group *olg, - struct obd_device *tgt, int count, - struct llog_catid *catid, - struct obd_uuid *uuid) +static int +filter_llog_init(struct obd_device *obd, struct obd_llog_group *olg, + struct obd_device *tgt, int count, struct llog_catid *catid, + struct obd_uuid *uuid) { struct filter_obd *filter = &obd->u.filter; struct llog_ctxt *ctxt; @@ -2282,38 +2278,49 @@ static int filter_llog_init(struct obd_device *obd, struct obd_llog_group *olg, if (olg == &obd->obd_olg) return filter_default_olg_init(obd, olg, tgt); - /* For other group llog, which will be initialized in - * llog_connect(after default_olg_init, so fo_lcm must - * already be initialized */ LASSERT(filter->fo_lcm != NULL); rc = filter_olg_init(obd, olg, tgt); if (rc) RETURN(rc); ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT); - LASSERT(ctxt != NULL); + if (!ctxt) { + CERROR("Can't get ctxt for %p:%x\n", olg, + LLOG_MDS_OST_REPL_CTXT); + filter_olg_fini(olg); + RETURN(-ENODEV); + } ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb; - ctxt->loc_lcm = obd->u.filter.fo_lcm; + ctxt->loc_lcm = filter->fo_lcm; llog_ctxt_put(ctxt); - RETURN(rc); } static int filter_llog_finish(struct obd_device *obd, int count) 
{
-        int rc;
+        struct filter_obd *filter = &obd->u.filter;
+        struct llog_ctxt *ctxt;
         ENTRY;
-
-        if (obd->u.filter.fo_lcm) {
-                llog_cleanup_commit_master((struct llog_commit_master *)
-                                           obd->u.filter.fo_lcm, 1);
-                OBD_FREE(obd->u.filter.fo_lcm,
-                         sizeof(struct llog_commit_master));
-                obd->u.filter.fo_lcm = NULL;
+
+        ctxt = llog_group_get_ctxt(&obd->obd_olg, LLOG_MDS_OST_REPL_CTXT);
+        LASSERT(ctxt != NULL);
+        mutex_down(&ctxt->loc_sem);
+        if (ctxt->loc_imp) {
+                /*
+                 * Balance class_import_get() in llog_receptor_accept().
+                 * This is safe to do here, as the llog is already
+                 * synchronized and its import may go away.
+                 */
+                class_import_put(ctxt->loc_imp);
+                ctxt->loc_imp = NULL;
         }
-        /* finish obd llog group */
-        rc = filter_group_llog_finish(&obd->obd_olg);
+        mutex_up(&ctxt->loc_sem);
+        llog_ctxt_put(ctxt);
 
-        RETURN(rc);
+        if (filter->fo_lcm) {
+                llog_recov_thread_fini(filter->fo_lcm, obd->obd_force);
+                filter->fo_lcm = NULL;
+        }
+        RETURN(filter_olg_fini(&obd->obd_olg));
 }
 
 /**
@@ -2441,7 +2448,7 @@ static int filter_llog_connect(struct obd_export *exp,
         RETURN(rc);
 }
 
-static int filter_llog_preclean (struct obd_device *obd)
+static int filter_llog_preclean(struct obd_device *obd)
 {
         struct obd_llog_group *olg, *tmp;
         struct filter_obd *filter;
@@ -2467,7 +2474,7 @@ static int filter_llog_preclean (struct obd_device *obd)
         list_for_each_entry_safe(olg, tmp, &remove_list, olg_list) {
                 list_del_init(&olg->olg_list);
-                rc = filter_group_llog_finish(olg);
+                rc = filter_olg_fini(olg);
                 if (rc)
                         CERROR("failed to cleanup llogging subsystem for %u\n",
                                olg->olg_group);
diff --git a/lustre/obdfilter/filter_log.c b/lustre/obdfilter/filter_log.c
index a6aedf0..e82d5e1 100644
--- a/lustre/obdfilter/filter_log.c
+++ b/lustre/obdfilter/filter_log.c
@@ -50,9 +50,8 @@
 #include
 #include
+#include
 #include
-#include
-
 #include "filter_internal.h"
 
 int filter_log_sz_change(struct llog_handle *cathandle,
diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c
index 3596108..bc414e6 100644
--- a/lustre/ost/ost_handler.c
+++ b/lustre/ost/ost_handler.c
@@ -53,7 +53,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include "ost_internal.h"
diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index 8b133e4..693e28d 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -473,6 +473,7 @@ static struct ptlrpc_request *ptlrpc_prep_req_from_pool(struct ptlrpc_request_po
         request->rq_reqbuf = reqbuf;
         request->rq_reqbuf_len = pool->prp_rq_size;
         request->rq_pool = pool;
+
         return request;
 }
 
@@ -781,16 +782,36 @@ void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
         atomic_inc(&req->rq_import->imp_inflight);
 }
 
-/* lock so many callers can add things, the context that owns the set
- * is supposed to notice these and move them into the set proper. */
-void ptlrpc_set_add_new_req(struct ptlrpc_request_set *set,
-                            struct ptlrpc_request *req)
+/**
+ * Lock so many callers can add things, the context that owns the set
+ * is supposed to notice these and move them into the set proper.
+ */
+int ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
+                           struct ptlrpc_request *req)
 {
+        struct ptlrpc_request_set *set = pc->pc_set;
+
+        /*
+         * Let the caller know that we stopped and will not handle this
+         * request. The caller must then take care of the request itself.
+         */
+        if (test_bit(LIOD_STOP, &pc->pc_flags))
+                return -EALREADY;
+
         spin_lock(&set->set_new_req_lock);
-        /* The set takes over the caller's request reference */
+        /*
+         * The set takes over the caller's request reference.
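+         * On -EALREADY above, ownership stays with the caller, which must
+         * complete the request itself; ptlrpcd_add_req() in this patch does
+         * exactly that by running rq_interpret_reply() with -EBADR and then
+         * dropping its reference via ptlrpc_req_finished().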
+         */
         list_add_tail(&req->rq_set_chain, &set->set_new_requests);
         req->rq_set = set;
         spin_unlock(&set->set_new_req_lock);
+
+        /*
+         * Let the thread know that we added something, so that it wakes
+         * up and processes it.
+         */
+        cfs_waitq_signal(&set->set_waitq);
+        return 0;
 }
 
 /*
@@ -2331,6 +2352,29 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
         EXIT;
 }
 
+void ptlrpc_abort_set(struct ptlrpc_request_set *set)
+{
+        struct list_head *tmp, *n;
+
+        LASSERT(set != NULL);
+
+        list_for_each_safe(tmp, n, &set->set_requests) {
+                struct ptlrpc_request *req =
+                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
+
+                spin_lock(&req->rq_lock);
+                if (req->rq_phase != RQ_PHASE_RPC) {
+                        spin_unlock(&req->rq_lock);
+                        continue;
+                }
+
+                req->rq_err = 1;
+                req->rq_status = -EINTR;
+                ptlrpc_wake_client_req(req);
+                spin_unlock(&req->rq_lock);
+        }
+}
+
 static __u64 ptlrpc_last_xid = 0;
 spinlock_t ptlrpc_last_xid_lock;
diff --git a/lustre/ptlrpc/ptlrpc_internal.h b/lustre/ptlrpc/ptlrpc_internal.h
index 1a62924..6a00fad 100644
--- a/lustre/ptlrpc/ptlrpc_internal.h
+++ b/lustre/ptlrpc/ptlrpc_internal.h
@@ -124,6 +124,10 @@ void sptlrpc_gc_stop_thread(void);
 int __init sptlrpc_init(void);
 void __exit sptlrpc_fini(void);
 
+/* recov_thread.c */
+int llog_recov_init(void);
+void llog_recov_fini(void);
+
 static inline int ll_rpc_recoverable_error(int rc)
 {
         return (rc == -ENOTCONN || rc == -ENODEV);
 }
diff --git a/lustre/ptlrpc/ptlrpc_module.c b/lustre/ptlrpc/ptlrpc_module.c
index 94e3df4..508b432 100644
--- a/lustre/ptlrpc/ptlrpc_module.c
+++ b/lustre/ptlrpc/ptlrpc_module.c
@@ -101,10 +101,17 @@ __init int ptlrpc_init(void)
         if (rc)
                 GOTO(cleanup, rc);
 
+        cleanup_phase = 6;
+        rc = llog_recov_init();
+        if (rc)
+                GOTO(cleanup, rc);
+
         RETURN(0);
 
 cleanup:
         switch(cleanup_phase) {
+        case 6:
+                sptlrpc_fini();
         case 5:
                 ldlm_exit();
         case 4:
@@ -124,6 +131,7 @@ cleanup:
 #ifdef __KERNEL__
 static void __exit ptlrpc_exit(void)
 {
+        llog_recov_fini();
         sptlrpc_fini();
         ldlm_exit();
         ptlrpc_stop_pinger();
diff --git a/lustre/ptlrpc/ptlrpcd.c b/lustre/ptlrpc/ptlrpcd.c
index c09663a..8699e7d 100644
--- a/lustre/ptlrpc/ptlrpcd.c
+++ b/lustre/ptlrpc/ptlrpcd.c
@@ -53,22 +53,6 @@
 #include  /* for OBD_FAIL_CHECK */
 #include
 
-#define LIOD_STOP 0
-struct ptlrpcd_ctl {
-        unsigned long pc_flags;
-        spinlock_t pc_lock;
-        struct completion pc_starting;
-        struct completion pc_finishing;
-        struct ptlrpc_request_set *pc_set;
-        char pc_name[16];
-#ifndef __KERNEL__
-        int pc_recurred;
-        void *pc_callback;
-        void *pc_wait_callback;
-        void *pc_idle_callback;
-#endif
-};
-
 static struct ptlrpcd_ctl ptlrpcd_pc;
 static struct ptlrpcd_ctl ptlrpcd_recovery_pc;
 
@@ -84,19 +68,40 @@ void ptlrpcd_wake(struct ptlrpc_request *req)
         cfs_waitq_signal(&rq_set->set_waitq);
 }
 
-/* requests that are added to the ptlrpcd queue are sent via
- * ptlrpcd_check->ptlrpc_check_set() */
+/*
+ * Requests that are added to the ptlrpcd queue are sent via
+ * ptlrpcd_check->ptlrpc_check_set().
+ */
 void ptlrpcd_add_req(struct ptlrpc_request *req)
 {
         struct ptlrpcd_ctl *pc;
+        int rc;
 
         if (req->rq_send_state == LUSTRE_IMP_FULL)
                 pc = &ptlrpcd_pc;
         else
                 pc = &ptlrpcd_recovery_pc;
 
-        ptlrpc_set_add_new_req(pc->pc_set, req);
-        cfs_waitq_signal(&pc->pc_set->set_waitq);
+        rc = ptlrpc_set_add_new_req(pc, req);
+        if (rc) {
+                int (*interpreter)(struct ptlrpc_request *,
+                                   void *, int);
+
+                interpreter = req->rq_interpret_reply;
+
+                /*
+                 * The thread is probably stopping now, so we need to kill
+                 * this rpc as it was not added. Call its interpret callback
+                 * to announce that we are killing it, so that higher levels
+                 * can free associated resources.
+                 */
+                req->rq_status = -EBADR;
+                interpreter(req, &req->rq_async_args,
+                            req->rq_status);
+                req->rq_set = NULL;
+                ptlrpc_req_finished(req);
+        }
 }
 
 static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
@@ -114,15 +119,20 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
                 req = list_entry(pos, struct ptlrpc_request, rq_set_chain);
                 list_del_init(&req->rq_set_chain);
                 ptlrpc_set_add_req(pc->pc_set, req);
-                rc = 1; /* need to calculate its timeout */
+                /*
+                 * Need to calculate its timeout.
+                 */
+                rc = 1;
         }
         spin_unlock(&pc->pc_set->set_new_req_lock);
 
         if (pc->pc_set->set_remaining) {
                 rc = rc | ptlrpc_check_set(pc->pc_set);
 
-                /* XXX our set never completes, so we prune the completed
-                 * reqs after each iteration. boy could this be smarter. */
+                /*
+                 * XXX: our set never completes, so we prune the completed
+                 * reqs after each iteration. boy could this be smarter.
+                 */
                 list_for_each_safe(pos, tmp, &pc->pc_set->set_requests) {
                         req = list_entry(pos, struct ptlrpc_request,
                                          rq_set_chain);
@@ -136,7 +146,9 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
         }
 
         if (rc == 0) {
-                /* If new requests have been added, make sure to wake up */
+                /*
+                 * If new requests have been added, make sure to wake up.
+                 */
                 spin_lock(&pc->pc_set->set_new_req_lock);
                 rc = !list_empty(&pc->pc_set->set_new_requests);
                 spin_unlock(&pc->pc_set->set_new_req_lock);
@@ -146,9 +158,11 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
 }
 
 #ifdef __KERNEL__
-/* ptlrpc's code paths like to execute in process context, so we have this
- * thread which spins on a set which contains the io rpcs. llite specifies
- * ptlrpcd's set when it pushes pages down into the oscs */
+/*
+ * ptlrpc's code paths like to execute in process context, so we have this
+ * thread which spins on a set which contains the io rpcs. llite specifies
+ * ptlrpcd's set when it pushes pages down into the oscs.
+ */
 static int ptlrpcd(void *arg)
 {
         struct ptlrpcd_ctl *pc = arg;
@@ -157,16 +171,17 @@ static int ptlrpcd(void *arg)
 
         if ((rc = cfs_daemonize_ctxt(pc->pc_name))) {
                 complete(&pc->pc_starting);
-                return rc;
+                goto out;
         }
 
         complete(&pc->pc_starting);
 
-        /* this mainloop strongly resembles ptlrpc_set_wait except
-         * that our set never completes. ptlrpcd_check calls ptlrpc_check_set
-         * when there are requests in the set. new requests come in
-         * on the set's new_req_list and ptlrpcd_check moves them into
-         * the set. */
+        /*
+         * This mainloop strongly resembles ptlrpc_set_wait() except that our
+         * set never completes. ptlrpcd_check() calls ptlrpc_check_set() when
+         * there are requests in the set. New requests come in on the set's
+         * new_req_list and ptlrpcd_check() moves them into the set.
+         */
         while (1) {
                 struct l_wait_info lwi;
                 cfs_duration_t timeout;
@@ -176,13 +191,26 @@ static int ptlrpcd(void *arg)
                 l_wait_event(pc->pc_set->set_waitq,
                              ptlrpcd_check(pc), &lwi);
 
+                /*
+                 * Abort inflight rpcs for the forced stop case.
+                 */
+                if (test_bit(LIOD_STOP_FORCE, &pc->pc_flags))
+                        ptlrpc_abort_set(pc->pc_set);
+
                 if (test_bit(LIOD_STOP, &pc->pc_flags))
                         break;
         }
-        /* wait for inflight requests to drain */
+
+        /*
+         * Wait for inflight requests to drain.
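+         * ptlrpc_set_wait() returns only once set_remaining reaches zero;
+         * after a forced stop the rpcs were already failed with -EINTR by
+         * ptlrpc_abort_set() above, so this drain completes quickly.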
+ */ if (!list_empty(&pc->pc_set->set_requests)) ptlrpc_set_wait(pc->pc_set); + complete(&pc->pc_finishing); +out: + clear_bit(LIOD_START, &pc->pc_flags); + clear_bit(LIOD_STOP, &pc->pc_flags); return 0; } @@ -193,14 +221,18 @@ int ptlrpcd_check_async_rpcs(void *arg) struct ptlrpcd_ctl *pc = arg; int rc = 0; - /* single threaded!! */ + /* + * Single threaded!! + */ pc->pc_recurred++; if (pc->pc_recurred == 1) { rc = ptlrpcd_check(pc); if (!rc) ptlrpc_expired_set(pc->pc_set); - /*XXX send replay requests */ + /* + * XXX: send replay requests. + */ if (pc == &ptlrpcd_recovery_pc) rc = ptlrpcd_check(pc); } @@ -219,29 +251,36 @@ int ptlrpcd_idle(void *arg) #endif -static int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc) +int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc) { - int rc; - + int rc = 0; ENTRY; - memset(pc, 0, sizeof(*pc)); + + /* + * Do not allow start second thread for one pc. + */ + if (test_and_set_bit(LIOD_START, &pc->pc_flags)) { + CERROR("Starting second thread (%s) for same pc %p\n", + name, pc); + RETURN(-EALREADY); + } + init_completion(&pc->pc_starting); init_completion(&pc->pc_finishing); - pc->pc_flags = 0; spin_lock_init(&pc->pc_lock); snprintf (pc->pc_name, sizeof (pc->pc_name), name); pc->pc_set = ptlrpc_prep_set(); if (pc->pc_set == NULL) - RETURN(-ENOMEM); + GOTO(out, rc = -ENOMEM); #ifdef __KERNEL__ rc = cfs_kernel_thread(ptlrpcd, pc, 0); if (rc < 0) { ptlrpc_set_destroy(pc->pc_set); - RETURN(rc); + GOTO(out, rc); } - + rc = 0; wait_for_completion(&pc->pc_starting); #else pc->pc_wait_callback = @@ -250,14 +289,23 @@ static int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc) pc->pc_idle_callback = liblustre_register_idle_callback("ptlrpcd_check_idle_rpcs", &ptlrpcd_idle, pc); - (void)rc; #endif - RETURN(0); +out: + if (rc) + clear_bit(LIOD_START, &pc->pc_flags); + RETURN(rc); } -static void ptlrpcd_stop(struct ptlrpcd_ctl *pc) +void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force) { + if (!test_bit(LIOD_START, &pc->pc_flags)) { + CERROR("Thread for pc %p was not started\n", pc); + return; + } + set_bit(LIOD_STOP, &pc->pc_flags); + if (force) + set_bit(LIOD_STOP_FORCE, &pc->pc_flags); cfs_waitq_signal(&pc->pc_set->set_waitq); #ifdef __KERNEL__ wait_for_completion(&pc->pc_finishing); @@ -285,7 +333,7 @@ int ptlrpcd_addref(void) rc = ptlrpcd_start("ptlrpcd-recov", &ptlrpcd_recovery_pc); if (rc) { - ptlrpcd_stop(&ptlrpcd_pc); + ptlrpcd_stop(&ptlrpcd_pc, 0); --ptlrpcd_users; GOTO(out, rc); } @@ -298,8 +346,8 @@ void ptlrpcd_decref(void) { mutex_down(&ptlrpcd_sem); if (--ptlrpcd_users == 0) { - ptlrpcd_stop(&ptlrpcd_pc); - ptlrpcd_stop(&ptlrpcd_recovery_pc); + ptlrpcd_stop(&ptlrpcd_pc, 0); + ptlrpcd_stop(&ptlrpcd_recovery_pc, 0); } mutex_up(&ptlrpcd_sem); } diff --git a/lustre/ptlrpc/recov_thread.c b/lustre/ptlrpc/recov_thread.c index f2a5ba0..b882d03 100644 --- a/lustre/ptlrpc/recov_thread.c +++ b/lustre/ptlrpc/recov_thread.c @@ -40,7 +40,9 @@ * - we do not share logs among different OST<->MDS connections, so that * if an OST or MDS fails it need only look at log(s) relevant to itself * - * Author: Andreas Dilger + * Author: Andreas Dilger + * Yury Umanets + * Alexey Lyashkov */ #define DEBUG_SUBSYSTEM S_LOG @@ -57,7 +59,6 @@ #endif #include -#include #include #include #include @@ -66,616 +67,575 @@ #include #include "ptlrpc_internal.h" -#ifdef __KERNEL__ +static atomic_t llcd_count = ATOMIC_INIT(0); +static cfs_mem_cache_t *llcd_cache = NULL; -/* Allocate new commit structs in case we do not have enough. 
- * Make the llcd size small enough that it fits into a single page when we - * are sending/receiving it. */ -static int llcd_alloc(struct llog_commit_master *lcm) +#ifdef __KERNEL__ +enum { + LLOG_LCM_FL_START = 1 << 0, + LLOG_LCM_FL_EXIT = 1 << 1 +}; + +/** + * Allocate new llcd from cache, init it and return to caller. + * Bumps number of objects allocated. + */ +static struct llog_canceld_ctxt *llcd_alloc(void) { struct llog_canceld_ctxt *llcd; int llcd_size; - /* payload of lustre_msg V2 is bigger */ - llcd_size = 4096 - lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL); - OBD_ALLOC(llcd, - llcd_size + offsetof(struct llog_canceld_ctxt, llcd_cookies)); - if (llcd == NULL) - return -ENOMEM; + /* + * Payload of lustre_msg V2 is bigger. + */ + llcd_size = CFS_PAGE_SIZE - + lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL); + llcd_size += offsetof(struct llog_canceld_ctxt, llcd_cookies); + OBD_SLAB_ALLOC(llcd, llcd_cache, CFS_ALLOC_STD, llcd_size); + if (!llcd) + return NULL; llcd->llcd_size = llcd_size; - llcd->llcd_lcm = lcm; - - spin_lock(&lcm->lcm_llcd_lock); - list_add(&llcd->llcd_list, &lcm->lcm_llcd_free); - atomic_inc(&lcm->lcm_llcd_numfree); - spin_unlock(&lcm->lcm_llcd_lock); - - return 0; -} - -/* Get a free cookie struct from the list */ -static struct llog_canceld_ctxt *llcd_grab(struct llog_commit_master *lcm) -{ - struct llog_canceld_ctxt *llcd; - -repeat: - spin_lock(&lcm->lcm_llcd_lock); - if (list_empty(&lcm->lcm_llcd_free)) { - spin_unlock(&lcm->lcm_llcd_lock); - if (llcd_alloc(lcm) < 0) { - CERROR("unable to allocate log commit data!\n"); - return NULL; - } - /* check new llcd wasn't grabbed while lock dropped, b=7407 */ - goto repeat; - } - - llcd = list_entry(lcm->lcm_llcd_free.next, typeof(*llcd), llcd_list); - list_del(&llcd->llcd_list); - atomic_dec(&lcm->lcm_llcd_numfree); - spin_unlock(&lcm->lcm_llcd_lock); - llcd->llcd_cookiebytes = 0; - + atomic_inc(&llcd_count); return llcd; } -static void llcd_put(struct llog_canceld_ctxt *llcd) +/** + * Returns passed llcd to cache. + */ +static void llcd_free(struct llog_canceld_ctxt *llcd) { - struct llog_commit_master *lcm = llcd->llcd_lcm; - - llog_ctxt_put(llcd->llcd_ctxt); - if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) { - int llcd_size = llcd->llcd_size + - offsetof(struct llog_canceld_ctxt, llcd_cookies); - OBD_FREE(llcd, llcd_size); - } else { - spin_lock(&lcm->lcm_llcd_lock); - list_add(&llcd->llcd_list, &lcm->lcm_llcd_free); - atomic_inc(&lcm->lcm_llcd_numfree); - spin_unlock(&lcm->lcm_llcd_lock); - } + OBD_SLAB_FREE(llcd, llcd_cache, llcd->llcd_size); + atomic_dec(&llcd_count); } -/* Send some cookies to the appropriate target */ -static void llcd_send(struct llog_canceld_ctxt *llcd) +/** + * Copy passed @cookies to @llcd. + */ +static void llcd_copy(struct llog_canceld_ctxt *llcd, + struct llog_cookie *cookies) { - if (!(llcd->llcd_lcm->lcm_flags & LLOG_LCM_FL_EXIT)) { - spin_lock(&llcd->llcd_lcm->lcm_llcd_lock); - list_add_tail(&llcd->llcd_list, - &llcd->llcd_lcm->lcm_llcd_pending); - spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock); - } - cfs_waitq_signal_nr(&llcd->llcd_lcm->lcm_waitq, 1); + memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes, + cookies, sizeof(*cookies)); + llcd->llcd_cookiebytes += sizeof(*cookies); } /** - * Grab llcd and assign it to passed @ctxt. Also set up backward link - * and get ref on @ctxt. + * Checks if passed cookie fits into llcd free space buffer. Returns + * 1 if yes and 0 otherwise. 
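+ * Typical caller pattern (as used by llog_obd_repl_cancel() below):
+ *
+ *   if (!llcd_fit(llcd, cookies)) {
+ *           rc = llcd_push(ctxt);   /* send the full llcd... */
+ *           ...
+ *           llcd = llcd_get(ctxt);  /* ...and start a fresh one */
+ *   }
+ *   llcd_copy(llcd, cookies);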
 */
-static struct llog_canceld_ctxt *ctxt_llcd_grab(struct llog_ctxt *ctxt)
+static int llcd_fit(struct llog_canceld_ctxt *llcd,
+                    struct llog_cookie *cookies)
 {
-        struct llog_canceld_ctxt *llcd;
-
-        LASSERT_SEM_LOCKED(&ctxt->loc_sem);
-        llcd = llcd_grab(ctxt->loc_lcm);
-        if (llcd == NULL)
-                return NULL;
-
-        llcd->llcd_ctxt = llog_ctxt_get(ctxt);
-        ctxt->loc_llcd = llcd;
+        return (llcd->llcd_size -
+                llcd->llcd_cookiebytes) >= sizeof(*cookies);
+}
 
-        CDEBUG(D_RPCTRACE,"grab llcd %p:%p\n", ctxt->loc_llcd, ctxt);
-        return llcd;
+static void llcd_print(struct llog_canceld_ctxt *llcd,
+                       const char *func, int line)
+{
+        CDEBUG(D_RPCTRACE, "Llcd (%p) at %s:%d:\n", llcd, func, line);
+        CDEBUG(D_RPCTRACE, "  size: %d\n", llcd->llcd_size);
+        CDEBUG(D_RPCTRACE, "  ctxt: %p\n", llcd->llcd_ctxt);
+        CDEBUG(D_RPCTRACE, "  lcm : %p\n", llcd->llcd_lcm);
+        CDEBUG(D_RPCTRACE, "  cookiebytes : %d\n", llcd->llcd_cookiebytes);
 }
 
 /**
- * Put llcd in passed @ctxt. Set ->loc_llcd to NULL.
+ * Llcd completion function. Called upon llcd send completion regardless
+ * of the sending result. The error is passed in @rc. Note that this will
+ * also be called at cleanup time when all inflight rpcs are aborted.
  */
-static void ctxt_llcd_put(struct llog_ctxt *ctxt)
+static int
+llcd_interpret(struct ptlrpc_request *req, void *noused, int rc)
 {
-        mutex_down(&ctxt->loc_sem);
-        if (ctxt->loc_llcd != NULL) {
-                CDEBUG(D_RPCTRACE,"put llcd %p:%p\n", ctxt->loc_llcd, ctxt);
-                llcd_put(ctxt->loc_llcd);
-                ctxt->loc_llcd = NULL;
-        }
-        if (ctxt->loc_imp) {
-                class_import_put(ctxt->loc_imp);
-                ctxt->loc_imp = NULL;
-        }
-        mutex_up(&ctxt->loc_sem);
+        struct llog_canceld_ctxt *llcd = req->rq_async_args.pointer_arg[0];
+        CDEBUG(D_RPCTRACE, "Sent llcd %p (%d)\n", llcd, rc);
+        llcd_free(llcd);
+        return 0;
 }
-
-/* deleted objects have a commit callback that cancels the MDS
- * log record for the deletion. The commit callback calls this
- * function
+
+/**
+ * Send @llcd to the remote node. Free the llcd upon completion or error.
+ * Sending is performed in async style, so this function will return asap
+ * without blocking.
  */
-int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
-                         struct lov_stripe_md *lsm, int count,
-                         struct llog_cookie *cookies, int flags)
+static int llcd_send(struct llog_canceld_ctxt *llcd)
 {
-        struct llog_canceld_ctxt *llcd;
-        int rc = 0;
+        char *bufs[2] = { NULL, (char *)llcd->llcd_cookies };
+        struct obd_import *import = NULL;
+        struct llog_commit_master *lcm;
+        struct ptlrpc_request *req;
+        struct llog_ctxt *ctxt;
+        int rc;
         ENTRY;
 
-        LASSERT(ctxt);
-
-        mutex_down(&ctxt->loc_sem);
-        llcd = ctxt->loc_llcd;
-
-        if (ctxt->loc_imp == NULL) {
-                CDEBUG(D_RPCTRACE, "no import for ctxt %p\n", ctxt);
-                GOTO(out, rc = 0);
+        ctxt = llcd->llcd_ctxt;
+        if (!ctxt) {
+                CERROR("Invalid llcd with NULL ctxt found (%p)\n",
+                       llcd);
+                llcd_print(llcd, __FUNCTION__, __LINE__);
+                LBUG();
         }
+        LASSERT_SEM_LOCKED(&ctxt->loc_sem);
 
-        if (count > 0 && cookies != NULL) {
-                if (llcd == NULL) {
-                        llcd = ctxt_llcd_grab(ctxt);
-                        if (llcd == NULL) {
-                                CERROR("couldn't get an llcd - dropped "LPX64
-                                       ":%x+%u\n",
-                                       cookies->lgc_lgl.lgl_oid,
-                                       cookies->lgc_lgl.lgl_ogen,
-                                       cookies->lgc_index);
-                                GOTO(out, rc = -ENOMEM);
-                        }
-                }
+        if (llcd->llcd_cookiebytes == 0)
+                GOTO(exit, rc = 0);
+
+        lcm = llcd->llcd_lcm;
+
+        /*
+         * Check if we're in exit stage. Do not send llcd in
+         * this case.
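+         * Note that the exit path below frees the llcd in every refusal
+         * case, so the caller must not touch the llcd after llcd_send()
+         * returns, whatever the return code.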
+         */
+        if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags))
+                GOTO(exit, rc = -ENODEV);
+
+        CDEBUG(D_RPCTRACE, "Sending llcd %p\n", llcd);
+
+        import = llcd->llcd_ctxt->loc_imp;
+        if (!import || (import == LP_POISON) ||
+            (import->imp_client == LP_POISON)) {
+                CERROR("Invalid import %p for llcd %p\n",
+                       import, llcd);
+                GOTO(exit, rc = -ENODEV);
+        }
 
-                memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes,
-                       cookies, sizeof(*cookies));
-                llcd->llcd_cookiebytes += sizeof(*cookies);
-        } else {
-                if (llcd == NULL || !(flags & OBD_LLOG_FL_SENDNOW))
-                        GOTO(out, rc);
+        OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_RECOV, 10);
+
+        /*
+         * No need to get import here as it is already done in
+         * llog_receptor_accept().
+         */
+        req = ptlrpc_request_alloc(import, &RQF_LOG_CANCEL);
+        if (req == NULL) {
+                CERROR("Can't allocate request for sending llcd %p\n",
+                       llcd);
+                GOTO(exit, rc = -ENOMEM);
         }
+        req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES,
+                             RCL_CLIENT, llcd->llcd_cookiebytes);
 
-        if ((llcd->llcd_size - llcd->llcd_cookiebytes) < sizeof(*cookies) ||
-            (flags & OBD_LLOG_FL_SENDNOW)) {
-                CDEBUG(D_RPCTRACE, "send llcd %p:%p\n", llcd, llcd->llcd_ctxt);
-                ctxt->loc_llcd = NULL;
-                llcd_send(llcd);
+        rc = ptlrpc_request_bufs_pack(req, LUSTRE_LOG_VERSION,
+                                      OBD_LOG_CANCEL, bufs, NULL);
+        if (rc) {
+                ptlrpc_request_free(req);
+                GOTO(exit, rc);
         }
-out:
-        mutex_up(&ctxt->loc_sem);
-        return rc;
-}
-EXPORT_SYMBOL(llog_obd_repl_cancel);
 
-int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
-{
-        int rc = 0;
-        ENTRY;
+        ptlrpc_at_set_req_timeout(req);
+        ptlrpc_request_set_replen(req);
 
-        if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
-                CDEBUG(D_RPCTRACE,"reverse import disconnect\n");
-                /*
-                 * We put llcd because it is not going to sending list and
-                 * thus, its refc will not be handled. We will handle it here.
-                 */
-                ctxt_llcd_put(ctxt);
-        } else {
-                /*
-                 * Sending cancel. This means that ctxt->loc_llcd wil be
-                 * put on sending list in llog_obd_repl_cancel() and in
-                 * this case recovery thread will take care of it refc.
-                 */
-                rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);
-        }
-        RETURN(rc);
+        /* bug 5515 */
+        req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
+        req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
+        req->rq_interpret_reply = llcd_interpret;
+        req->rq_async_args.pointer_arg[0] = llcd;
+        rc = ptlrpc_set_add_new_req(&lcm->lcm_pc, req);
+        if (rc)
+                GOTO(exit, rc);
+
+        RETURN(0);
+exit:
+        CDEBUG(D_RPCTRACE, "Refused llcd %p\n", llcd);
+        llcd_free(llcd);
+        return rc;
 }
-EXPORT_SYMBOL(llog_obd_repl_sync);
 
+/**
+ * Attach @llcd to @ctxt. Establish mutual llcd/ctxt references
+ * so that they can refer to each other.
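+ * Reference balance in this patch: llcd_attach() takes llog_ctxt_get(ctxt)
+ * and bumps lcm_count; llcd_detach() reverses both. An attached llcd is
+ * therefore always pinned to a live context.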
+ */ +static int +llcd_attach(struct llog_ctxt *ctxt, struct llog_canceld_ctxt *llcd) { - CERROR("error preparing commit: rc %d\n", rc); + struct llog_commit_master *lcm; - spin_lock(&lcm->lcm_llcd_lock); - list_splice_init(&lcd->lcd_llcd_list, &lcm->lcm_llcd_resend); - spin_unlock(&lcm->lcm_llcd_lock); + LASSERT(ctxt != NULL && llcd != NULL); + LASSERT_SEM_LOCKED(&ctxt->loc_sem); + LASSERT(ctxt->loc_llcd == NULL); + lcm = ctxt->loc_lcm; + atomic_inc(&lcm->lcm_count); + CDEBUG(D_RPCTRACE, "Attach llcd %p to ctxt %p (%d)\n", + llcd, ctxt, atomic_read(&lcm->lcm_count)); + llcd->llcd_ctxt = llog_ctxt_get(ctxt); + llcd->llcd_lcm = ctxt->loc_lcm; + ctxt->loc_llcd = llcd; + return 0; } -static int log_commit_thread(void *arg) +/** + * Opposite to llcd_attach(). Detaches llcd from its @ctxt. This makes + * sure that this llcd will not be found another time we try to cancel. + */ +static struct llog_canceld_ctxt *llcd_detach(struct llog_ctxt *ctxt) { - struct llog_commit_master *lcm = arg; - struct llog_commit_daemon *lcd; - struct llog_canceld_ctxt *llcd, *n; - struct obd_import *import = NULL; - ENTRY; - - OBD_ALLOC(lcd, sizeof(*lcd)); - if (lcd == NULL) - RETURN(-ENOMEM); - - spin_lock(&lcm->lcm_thread_lock); - THREAD_NAME(cfs_curproc_comm(), CFS_CURPROC_COMM_MAX - 1, - "ll_log_comt_%02d", atomic_read(&lcm->lcm_thread_total)); - atomic_inc(&lcm->lcm_thread_total); - spin_unlock(&lcm->lcm_thread_lock); - - ptlrpc_daemonize(cfs_curproc_comm()); /* thread never needs to do IO */ - - CFS_INIT_LIST_HEAD(&lcd->lcd_lcm_list); - CFS_INIT_LIST_HEAD(&lcd->lcd_llcd_list); - lcd->lcd_lcm = lcm; - - CDEBUG(D_HA, "%s started\n", cfs_curproc_comm()); - do { - struct ptlrpc_request *request; - struct list_head *sending_list; - int rc = 0; - - if (import) - class_import_put(import); - import = NULL; - - /* If we do not have enough pages available, allocate some */ - while (atomic_read(&lcm->lcm_llcd_numfree) < - lcm->lcm_llcd_minfree) { - if (llcd_alloc(lcm) < 0) - break; - } - - spin_lock(&lcm->lcm_thread_lock); - atomic_inc(&lcm->lcm_thread_numidle); - list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_idle); - spin_unlock(&lcm->lcm_thread_lock); - - wait_event_interruptible(lcm->lcm_waitq, - !list_empty(&lcm->lcm_llcd_pending) || - lcm->lcm_flags & LLOG_LCM_FL_EXIT); - - /* If we are the last available thread, start a new one in case - * we get blocked on an RPC (nobody else will start a new one)*/ - spin_lock(&lcm->lcm_thread_lock); - atomic_dec(&lcm->lcm_thread_numidle); - list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_busy); - spin_unlock(&lcm->lcm_thread_lock); - - sending_list = &lcm->lcm_llcd_pending; - resend: - if (import) - class_import_put(import); - import = NULL; - if (lcm->lcm_flags & LLOG_LCM_FL_EXIT) { - lcm->lcm_llcd_maxfree = 0; - lcm->lcm_llcd_minfree = 0; - lcm->lcm_thread_max = 0; - - if (list_empty(&lcm->lcm_llcd_pending) || - lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE) - break; - } + struct llog_commit_master *lcm; + struct llog_canceld_ctxt *llcd; - if (atomic_read(&lcm->lcm_thread_numidle) <= 1 && - atomic_read(&lcm->lcm_thread_total) < lcm->lcm_thread_max) { - rc = llog_start_commit_thread(lcm); - if (rc < 0) - CERROR("error starting thread: rc %d\n", rc); - } + LASSERT(ctxt != NULL); + LASSERT_SEM_LOCKED(&ctxt->loc_sem); - /* Move all of the pending cancels from the same OST off of - * the list, so we don't get multiple threads blocked and/or - * doing upcalls on the same OST in case of failure. 
*/ - spin_lock(&lcm->lcm_llcd_lock); - if (!list_empty(sending_list)) { - list_move_tail(sending_list->next, - &lcd->lcd_llcd_list); - llcd = list_entry(lcd->lcd_llcd_list.next, - typeof(*llcd), llcd_list); - LASSERT(llcd->llcd_lcm == lcm); - import = llcd->llcd_ctxt->loc_imp; - if (import) - class_import_get(import); - } - list_for_each_entry_safe(llcd, n, sending_list, llcd_list) { - LASSERT(llcd->llcd_lcm == lcm); - if (import == llcd->llcd_ctxt->loc_imp) - list_move_tail(&llcd->llcd_list, - &lcd->lcd_llcd_list); - } - if (sending_list != &lcm->lcm_llcd_resend) { - list_for_each_entry_safe(llcd, n, &lcm->lcm_llcd_resend, - llcd_list) { - LASSERT(llcd->llcd_lcm == lcm); - if (import == llcd->llcd_ctxt->loc_imp) - list_move_tail(&llcd->llcd_list, - &lcd->lcd_llcd_list); - } - } - spin_unlock(&lcm->lcm_llcd_lock); - - /* We are the only one manipulating our local list - no lock */ - list_for_each_entry_safe(llcd,n, &lcd->lcd_llcd_list,llcd_list){ - char *bufs[2] = { NULL, (char *)llcd->llcd_cookies }; - - list_del(&llcd->llcd_list); - if (llcd->llcd_cookiebytes == 0) { - CDEBUG(D_RPCTRACE, "put empty llcd %p:%p\n", - llcd, llcd->llcd_ctxt); - llcd_put(llcd); - continue; - } - - mutex_down(&llcd->llcd_ctxt->loc_sem); - if (llcd->llcd_ctxt->loc_imp == NULL) { - mutex_up(&llcd->llcd_ctxt->loc_sem); - CWARN("import will be destroyed, put " - "llcd %p:%p\n", llcd, llcd->llcd_ctxt); - llcd_put(llcd); - continue; - } - mutex_up(&llcd->llcd_ctxt->loc_sem); - - if (!import || (import == LP_POISON) || - (import->imp_client == LP_POISON)) { - CERROR("No import %p (llcd=%p, ctxt=%p)\n", - import, llcd, llcd->llcd_ctxt); - llcd_put(llcd); - continue; - } - - OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_RECOV, 10); - - request = ptlrpc_request_alloc(import, &RQF_LOG_CANCEL); - if (request == NULL) { - rc = -ENOMEM; - stop_log_commit(lcm, lcd, rc); - break; - } - - req_capsule_set_size(&request->rq_pill, &RMF_LOGCOOKIES, - RCL_CLIENT,llcd->llcd_cookiebytes); - - rc = ptlrpc_request_bufs_pack(request, - LUSTRE_LOG_VERSION, - OBD_LOG_CANCEL, bufs, - NULL); - if (rc) { - ptlrpc_request_free(request); - stop_log_commit(lcm, lcd, rc); - break; - } - - /* bug 5515 */ - request->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL; - request->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL; - ptlrpc_at_set_req_timeout(request); - - ptlrpc_request_set_replen(request); - mutex_down(&llcd->llcd_ctxt->loc_sem); - if (llcd->llcd_ctxt->loc_imp == NULL) { - mutex_up(&llcd->llcd_ctxt->loc_sem); - CWARN("import will be destroyed, put " - "llcd %p:%p\n", llcd, llcd->llcd_ctxt); - llcd_put(llcd); - ptlrpc_req_finished(request); - continue; - } - mutex_up(&llcd->llcd_ctxt->loc_sem); - rc = ptlrpc_queue_wait(request); - ptlrpc_req_finished(request); - - /* If the RPC failed, we put this and the remaining - * messages onto the resend list for another time. 
 */
-                if (rc == 0) {
-                        sending_list = &lcm->lcm_llcd_resend;
-                        if (!list_empty(sending_list))
-                                goto resend;
-                }
-        } while(1);
+        llcd = ctxt->loc_llcd;
+        if (!llcd)
+                return NULL;
 
-        if (import)
-                class_import_put(import);
+        lcm = ctxt->loc_lcm;
+        if (atomic_read(&lcm->lcm_count) == 0) {
+                CERROR("Invalid detach occurred %p:%p\n", ctxt, llcd);
+                llcd_print(llcd, __FUNCTION__, __LINE__);
+                LBUG();
+        }
+        atomic_dec(&lcm->lcm_count);
+        ctxt->loc_llcd = NULL;
+
+        CDEBUG(D_RPCTRACE, "Detach llcd %p from ctxt %p (%d)\n",
+               llcd, ctxt, atomic_read(&lcm->lcm_count));
 
-        /* If we are force exiting, just drop all of the cookies. */
-        if (lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE) {
-                spin_lock(&lcm->lcm_llcd_lock);
-                list_splice_init(&lcm->lcm_llcd_pending, &lcd->lcd_llcd_list);
-                list_splice_init(&lcm->lcm_llcd_resend, &lcd->lcd_llcd_list);
-                list_splice_init(&lcm->lcm_llcd_free, &lcd->lcd_llcd_list);
-                spin_unlock(&lcm->lcm_llcd_lock);
+        llog_ctxt_put(ctxt);
+        return llcd;
+}
 
-                list_for_each_entry_safe(llcd, n, &lcd->lcd_llcd_list,llcd_list)
-                        llcd_put(llcd);
-        }
+/**
+ * Return @llcd cached in @ctxt. Allocate a new one if required. Attach it
+ * to the ctxt so that it may be used for gathering cookies and sending.
+ */
+static struct llog_canceld_ctxt *llcd_get(struct llog_ctxt *ctxt)
+{
+        struct llog_canceld_ctxt *llcd;
 
-        spin_lock(&lcm->lcm_thread_lock);
-        list_del(&lcd->lcd_lcm_list);
-        spin_unlock(&lcm->lcm_thread_lock);
-        OBD_FREE(lcd, sizeof(*lcd));
+        llcd = llcd_alloc();
+        if (!llcd) {
+                CERROR("Couldn't alloc an llcd for ctxt %p\n", ctxt);
+                return NULL;
+        }
+        llcd_attach(ctxt, llcd);
+        return llcd;
+}
 
-        CDEBUG(D_HA, "%s exiting\n", cfs_curproc_comm());
+/**
+ * Detach llcd from its @ctxt. Free the llcd.
+ */
+static void llcd_put(struct llog_ctxt *ctxt)
+{
+        struct llog_canceld_ctxt *llcd;
 
-        spin_lock(&lcm->lcm_thread_lock);
-        atomic_dec(&lcm->lcm_thread_total);
-        spin_unlock(&lcm->lcm_thread_lock);
-        cfs_waitq_signal(&lcm->lcm_waitq);
+        llcd = llcd_detach(ctxt);
+        if (llcd)
+                llcd_free(llcd);
+}
 
-        return 0;
+/**
+ * Detach llcd from its @ctxt so that nobody will find and try to re-use
+ * it. Send the llcd to the remote node.
+ */
+static int llcd_push(struct llog_ctxt *ctxt)
+{
+        struct llog_canceld_ctxt *llcd;
+        int rc;
+
+        /*
+         * Make sure that this llcd will not be sent again as we detach
+         * it from ctxt.
+         */
+        llcd = llcd_detach(ctxt);
+        if (!llcd) {
+                CERROR("Invalid detached llcd found %p\n", llcd);
+                llcd_print(llcd, __FUNCTION__, __LINE__);
+                LBUG();
+        }
+
+        rc = llcd_send(llcd);
+        if (rc)
+                CERROR("Couldn't send llcd %p (%d)\n", llcd, rc);
+        return rc;
 }
 
-int llog_start_commit_thread(struct llog_commit_master *lcm)
+static atomic_t llog_tcount = ATOMIC_INIT(0);
+
+/**
+ * Start the recovery thread which actually handles llcd sending. This is
+ * all based on standard ptlrpc threading, so there is not much work
+ * to do.
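+ * It simply wraps ptlrpcd_start() with the lcm's private name and
+ * ptlrpcd_ctl; llcd_interpret() completions then run in the context of
+ * that thread.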
+ */
+int llog_recov_thread_start(struct llog_commit_master *lcm)
 {
         int rc;
         ENTRY;
 
-        if (atomic_read(&lcm->lcm_thread_total) >= lcm->lcm_thread_max)
-                RETURN(0);
-
-        rc = cfs_kernel_thread(log_commit_thread, lcm, CLONE_VM | CLONE_FILES);
-        if (rc < 0) {
-                CERROR("error starting thread #%d: %d\n",
-                       atomic_read(&lcm->lcm_thread_total), rc);
+        rc = ptlrpcd_start(lcm->lcm_name, &lcm->lcm_pc);
+        if (rc) {
+                CERROR("Error %d while starting recovery thread %s\n",
+                       rc, lcm->lcm_name);
                 RETURN(rc);
         }
+        lcm->lcm_set = lcm->lcm_pc.pc_set;
+        atomic_inc(&llog_tcount);
 
-        RETURN(0);
-}
-EXPORT_SYMBOL(llog_start_commit_thread);
-
-static struct llog_process_args {
-        struct semaphore llpa_sem;
-        struct llog_ctxt *llpa_ctxt;
-        void *llpa_cb;
-        void *llpa_arg;
-} llpa;
-
-int llog_init_commit_master(struct llog_commit_master *lcm)
-{
-        CFS_INIT_LIST_HEAD(&lcm->lcm_thread_busy);
-        CFS_INIT_LIST_HEAD(&lcm->lcm_thread_idle);
-        spin_lock_init(&lcm->lcm_thread_lock);
-        atomic_set(&lcm->lcm_thread_numidle, 0);
-        cfs_waitq_init(&lcm->lcm_waitq);
-        CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_pending);
-        CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_resend);
-        CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_free);
-        spin_lock_init(&lcm->lcm_llcd_lock);
-        atomic_set(&lcm->lcm_llcd_numfree, 0);
-        lcm->lcm_llcd_minfree = 0;
-        lcm->lcm_thread_max = 5;
-        /* FIXME initialize semaphore for llog_process_args */
-        sema_init(&llpa.llpa_sem, 1);
-        return 0;
+        RETURN(rc);
 }
-EXPORT_SYMBOL(llog_init_commit_master);
+EXPORT_SYMBOL(llog_recov_thread_start);
 
-int llog_cleanup_commit_master(struct llog_commit_master *lcm,
-                               int force)
+/**
+ * Stop recovery thread. Complement to llog_recov_thread_start().
+ */
+void llog_recov_thread_stop(struct llog_commit_master *lcm, int force)
 {
-        lcm->lcm_flags |= LLOG_LCM_FL_EXIT;
-        if (force)
-                lcm->lcm_flags |= LLOG_LCM_FL_EXIT_FORCE;
-        cfs_waitq_signal(&lcm->lcm_waitq);
-
-        wait_event_interruptible(lcm->lcm_waitq,
-                                 atomic_read(&lcm->lcm_thread_total) == 0);
-        return 0;
+        ENTRY;
+
+        /**
+         * Let everyone know that we're stopping. This will also make
+         * llcd_send() refuse any new llcds.
+         */
+        set_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags);
+
+        /**
+         * Stop the processing thread. No new rpcs will be accepted
+         * for processing now.
+         */
+        ptlrpcd_stop(&lcm->lcm_pc, force);
+
+        /*
+         * No llcds should be left on this @lcm.
+         */
+        LASSERTF(atomic_read(&lcm->lcm_count) == 0,
+                 "Busy llcds found on lcm %p - (%d)\n",
+                 lcm, atomic_read(&lcm->lcm_count));
+        EXIT;
 }
-EXPORT_SYMBOL(llog_cleanup_commit_master);
+EXPORT_SYMBOL(llog_recov_thread_stop);
 
-static int log_process_thread(void *args)
+/**
+ * Initialize commit master structure and start the recovery thread on it.
+ */
+struct llog_commit_master *llog_recov_thread_init(char *name)
 {
-        struct llog_process_args *data = args;
-        struct llog_ctxt *ctxt = data->llpa_ctxt;
-        void *cb = data->llpa_cb;
-        struct llog_logid logid = *(struct llog_logid *)(data->llpa_arg);
-        struct llog_handle *llh = NULL;
+        struct llog_commit_master *lcm;
         int rc;
         ENTRY;
 
-        mutex_up(&data->llpa_sem);
-        ptlrpc_daemonize("llog_process"); /* thread does IO to log files */
+        OBD_ALLOC_PTR(lcm);
+        if (!lcm)
+                RETURN(NULL);
 
-        rc = llog_create(ctxt, &llh, &logid, NULL);
+        /*
+         * Try to create threads with unique names, using the global
+         * thread counter as a suffix.
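+         * For instance, an obd named "lustre-OST0000" would get a commit
+         * thread named "ll_log_commit_lustre-OST0000_00" (hypothetical
+         * name, for illustration only).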
 
-static int log_process_thread(void *args)
+/**
+ * Initialize commit master structure and start recovery thread on it.
+ */
+struct llog_commit_master *llog_recov_thread_init(char *name)
 {
-        struct llog_process_args *data = args;
-        struct llog_ctxt *ctxt = data->llpa_ctxt;
-        void *cb = data->llpa_cb;
-        struct llog_logid logid = *(struct llog_logid *)(data->llpa_arg);
-        struct llog_handle *llh = NULL;
+        struct llog_commit_master *lcm;
         int rc;
         ENTRY;
 
-        mutex_up(&data->llpa_sem);
-        ptlrpc_daemonize("llog_process"); /* thread does IO to log files */
+        OBD_ALLOC_PTR(lcm);
+        if (!lcm)
+                RETURN(NULL);
 
-        rc = llog_create(ctxt, &llh, &logid, NULL);
+        /*
+         * Try to create thread with unique name, built from passed
+         * @name and current llog_tcount.
+         */
+        snprintf(lcm->lcm_name, sizeof(lcm->lcm_name),
+                 "ll_log_commit_%s_%02d", name,
+                 atomic_read(&llog_tcount));
+
+        atomic_set(&lcm->lcm_count, 0);
+        rc = llog_recov_thread_start(lcm);
         if (rc) {
-                CERROR("llog_create failed %d\n", rc);
+                CERROR("Can't start commit thread, rc %d\n", rc);
                 GOTO(out, rc);
         }
-
-        rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL);
-        if (rc) {
-                CERROR("llog_init_handle failed %d\n", rc);
-                GOTO(release_llh, rc);
-        }
-
-        if (cb) {
-                rc = llog_cat_process(llh, (llog_cb_t)cb, NULL);
-                if (rc != LLOG_PROC_BREAK)
-                        CERROR("llog_cat_process failed %d\n", rc);
-        } else {
-                CWARN("no callback function for recovery\n");
-        }
-
-        CDEBUG(D_HA, "send llcd %p:%p forcibly after recovery\n",
-               ctxt->loc_llcd, ctxt);
-        llog_sync(ctxt, NULL);
-
-release_llh:
-        rc = llog_cat_put(llh);
-        if (rc)
-                CERROR("llog_cat_put failed %d\n", rc);
+        RETURN(lcm);
 out:
-        llog_ctxt_put(ctxt);
-        RETURN(rc);
+        OBD_FREE_PTR(lcm);
+        return NULL;
+}
+EXPORT_SYMBOL(llog_recov_thread_init);
+
+/**
+ * Finalize commit master and its recovery thread.
+ */
+void llog_recov_thread_fini(struct llog_commit_master *lcm, int force)
+{
+        ENTRY;
+        llog_recov_thread_stop(lcm, force);
+        OBD_FREE_PTR(lcm);
+        EXIT;
 }
+EXPORT_SYMBOL(llog_recov_thread_fini);
 
-static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle,void *arg)
+static int llog_obd_repl_generic(struct llog_ctxt *ctxt,
+                                 void *handle, void *arg)
 {
         struct obd_device *obd = ctxt->loc_obd;
+        struct llog_process_cat_args *lpca;
         int rc;
         ENTRY;
 
         if (obd->obd_stopping)
                 RETURN(-ENODEV);
 
-        mutex_down(&llpa.llpa_sem);
-        llpa.llpa_cb = handle;
-        llpa.llpa_arg = arg;
-        llpa.llpa_ctxt = llog_ctxt_get(ctxt);
-        if (!llpa.llpa_ctxt) {
-                up(&llpa.llpa_sem);
+        /*
+         * This allocation will be balanced (freed) in
+         * llog_cat_process_thread().
+         */
+        OBD_ALLOC_PTR(lpca);
+        if (!lpca)
+                RETURN(-ENOMEM);
+
+        lpca->lpca_cb = handle;
+        lpca->lpca_arg = arg;
+
+        /*
+         * This reference will be balanced (put) in
+         * llog_cat_process_thread().
+         */
+        lpca->lpca_ctxt = llog_ctxt_get(ctxt);
+        if (!lpca->lpca_ctxt) {
+                OBD_FREE_PTR(lpca);
                 RETURN(-ENODEV);
         }
 
-        rc = cfs_kernel_thread(log_process_thread, &llpa, CLONE_VM | CLONE_FILES);
+        rc = cfs_kernel_thread(llog_cat_process_thread, lpca,
+                               CLONE_VM | CLONE_FILES);
         if (rc < 0) {
+                CERROR("Error starting llog_cat_process_thread(): %d\n", rc);
+                OBD_FREE_PTR(lpca);
                 llog_ctxt_put(ctxt);
-                CERROR("error starting log_process_thread: %d\n", rc);
         } else {
-                CDEBUG(D_HA, "log_process_thread: %d\n", rc);
+                CDEBUG(D_HA, "Started llog_cat_process_thread(): %d\n", rc);
                 rc = 0;
         }
         RETURN(rc);
 }
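[Editorial sketch, not part of the patch] llog_obd_repl_generic() above
follows a common spawn discipline: heap-allocate an argument block and take a
context reference before starting the thread, and undo both if the spawn
fails; on success the new thread is responsible for releasing them. A small
userspace model of the same error-path rules (hypothetical spawn() and
thread_args names):

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct thread_args {            /* cf. llog_process_cat_args */
        void (*cb)(void *);
        void *arg;
};

static void *thread_main(void *data)
{
        struct thread_args *a = data;

        a->cb(a->arg);
        free(a);                /* balances the allocation in spawn() */
        return NULL;
}

static int spawn(void (*cb)(void *), void *arg)
{
        struct thread_args *a;
        pthread_t t;

        a = malloc(sizeof(*a));
        if (a == NULL)
                return -1;
        a->cb = cb;
        a->arg = arg;
        if (pthread_create(&t, NULL, thread_main, a) != 0) {
                free(a);        /* spawn failed: undo the allocation */
                return -1;
        }
        return pthread_join(t, NULL);
}

static void process(void *arg)
{
        printf("processing %s\n", (const char *)arg);
}

int main(void)
{
        return spawn(process, "catalog");
}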
 
-int llog_repl_connect(struct llog_ctxt *ctxt, int count,
-                      struct llog_logid *logid, struct llog_gen *gen,
-                      struct obd_uuid *uuid)
+int llog_obd_repl_connect(struct llog_ctxt *ctxt, int count,
+                          struct llog_logid *logid, struct llog_gen *gen,
+                          struct obd_uuid *uuid)
 {
         struct llog_canceld_ctxt *llcd;
         int rc;
         ENTRY;
 
-        /* send back llcd before recovery from llog */
-        if (ctxt->loc_llcd != NULL) {
-                CWARN("llcd %p:%p not empty\n", ctxt->loc_llcd, ctxt);
+        mutex_down(&ctxt->loc_sem);
+
+        /*
+         * Send back cached llcd before recovery from llog if we have any.
+         */
+        if (ctxt->loc_llcd) {
+                CWARN("Llcd %p:%p is not empty\n", ctxt->loc_llcd, ctxt);
+                mutex_up(&ctxt->loc_sem);
                 llog_sync(ctxt, NULL);
+                mutex_down(&ctxt->loc_sem);
         }
 
-        mutex_down(&ctxt->loc_sem);
-        ctxt->loc_gen = *gen;
-        llcd = ctxt_llcd_grab(ctxt);
-        if (llcd == NULL) {
-                CERROR("couldn't get an llcd\n");
+        llcd = llcd_get(ctxt);
+        if (!llcd) {
                 mutex_up(&ctxt->loc_sem);
                 RETURN(-ENOMEM);
         }
-        mutex_up(&ctxt->loc_sem);
 
-        rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid);
+        ctxt->loc_gen = *gen;
+
+        rc = llog_obd_repl_generic(ctxt, ctxt->llog_proc_cb, logid);
         if (rc != 0) {
-                ctxt_llcd_put(ctxt);
-                CERROR("error recovery process: %d\n", rc);
+                llcd_put(ctxt);
+                CERROR("Error in recovery process: %d\n", rc);
+        }
+        mutex_up(&ctxt->loc_sem);
+        RETURN(rc);
+}
+EXPORT_SYMBOL(llog_obd_repl_connect);
+
+/**
+ * Deleted objects have a commit callback that cancels the MDS
+ * log record for the deletion. The commit callback calls this
+ * function.
+ */
+int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
+                         struct lov_stripe_md *lsm, int count,
+                         struct llog_cookie *cookies, int flags)
+{
+        struct llog_canceld_ctxt *llcd;
+        int rc = 0;
+        ENTRY;
+
+        LASSERT(ctxt != NULL);
+
+        mutex_down(&ctxt->loc_sem);
+
+        /*
+         * Check that all needed structures are still alive and that no
+         * shutdown is in progress. Do nothing if we're stopping.
+         */
+        if (ctxt->loc_imp == NULL) {
+                CDEBUG(D_RPCTRACE, "No import for ctxt %p\n", ctxt);
+                GOTO(out, rc = -ENODEV);
+        }
+
+        if (ctxt->loc_obd->obd_stopping) {
+                CDEBUG(D_RPCTRACE, "Obd is stopping for ctxt %p\n", ctxt);
+                GOTO(out, rc = -ENODEV);
+        }
+
+        if (test_bit(LLOG_LCM_FL_EXIT, &ctxt->loc_lcm->lcm_flags)) {
+                CDEBUG(D_RPCTRACE, "Commit thread is stopping for ctxt %p\n",
+                       ctxt);
+                GOTO(out, rc = -ENODEV);
+        }
+
+        llcd = ctxt->loc_llcd;
+
+        if (count > 0 && cookies != NULL) {
+                /*
+                 * Get new llcd from ctxt if required.
+                 */
+                if (!llcd) {
+                        llcd = llcd_get(ctxt);
+                        if (!llcd)
+                                GOTO(out, rc = -ENOMEM);
+                }
+
+                /*
+                 * Llcd does not have enough room for @cookies. Let's push
+                 * it out and allocate a new one.
+                 */
+                if (!llcd_fit(llcd, cookies)) {
+                        rc = llcd_push(ctxt);
+                        if (rc)
+                                GOTO(out, rc);
+                        llcd = llcd_get(ctxt);
+                        if (!llcd)
+                                GOTO(out, rc = -ENOMEM);
+                }
+
+                /*
+                 * Copy cookies to @llcd, whether it is the old one or a
+                 * newly allocated one.
+                 */
+                llcd_copy(llcd, cookies);
+        }
+
+        /*
+         * Check if the copied @cookies should be sent asap. If yes, do it.
+         */
+        if (llcd && (flags & OBD_LLOG_FL_SENDNOW)) {
+                rc = llcd_push(ctxt);
+                if (rc)
+                        GOTO(out, rc);
+        }
+        EXIT;
+out:
+        mutex_up(&ctxt->loc_sem);
+        return rc;
+}
+EXPORT_SYMBOL(llog_obd_repl_cancel);
+
+int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
+{
+        int rc = 0;
+        ENTRY;
+
+        if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
+                CDEBUG(D_RPCTRACE, "Reverse import disconnect\n");
+                /*
+                 * An llcd might still be attached to @ctxt. Kill it.
+                 */
+                mutex_down(&ctxt->loc_sem);
+                llcd_put(ctxt);
+                mutex_up(&ctxt->loc_sem);
+        } else {
+                rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);
         }
         RETURN(rc);
 }
-EXPORT_SYMBOL(llog_repl_connect);
+EXPORT_SYMBOL(llog_obd_repl_sync);
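[Editorial sketch, not part of the patch] The cancel path above is a
batch-until-full loop: gather cookies into the current llcd, push it and
allocate a fresh one when the next cookie would not fit, and push immediately
when OBD_LLOG_FL_SENDNOW is set. A minimal version of that buffering logic
(hypothetical batch type, fixed-size records, no locking):

#include <stdio.h>
#include <string.h>

#define BATCH_SIZE 4096                 /* stand-in for llcd_size */

struct batch {
        size_t used;                    /* cf. llcd_cookiebytes */
        char data[BATCH_SIZE];          /* cf. llcd_cookies */
};

/* Pretend to send the gathered records (cf. llcd_push/llcd_send). */
static int batch_flush(struct batch *b)
{
        printf("sending %zu bytes\n", b->used);
        b->used = 0;
        return 0;
}

/* Append one record, flushing first if it would not fit
 * (cf. the llcd_fit()/llcd_push()/llcd_copy() sequence above). */
static int batch_add(struct batch *b, const void *rec, size_t len)
{
        if (b->used + len > sizeof(b->data)) {
                int rc = batch_flush(b);
                if (rc != 0)
                        return rc;
        }
        memcpy(b->data + b->used, rec, len);
        b->used += len;
        return 0;
}

int main(void)
{
        struct batch b = { 0 };
        char cookie[64] = "cookie";
        int i;

        for (i = 0; i < 100; i++)
                if (batch_add(&b, cookie, sizeof(cookie)) != 0)
                        return 1;
        return batch_flush(&b);         /* cf. OBD_LLOG_FL_SENDNOW */
}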
 
 #else /* !__KERNEL__ */
 
@@ -686,3 +646,46 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
         return 0;
 }
 #endif
+
+/**
+ * Module init time function. Initializes slab for llcd objects.
+ */
+int llog_recov_init(void)
+{
+        int llcd_size;
+
+        llcd_size = CFS_PAGE_SIZE -
+                lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL);
+        llcd_size += offsetof(struct llog_canceld_ctxt, llcd_cookies);
+        llcd_cache = cfs_mem_cache_create("llcd_cache", llcd_size, 0, 0);
+        if (!llcd_cache) {
+                CERROR("Error allocating llcd cache\n");
+                return -ENOMEM;
+        }
+        return 0;
+}
+
+/**
+ * Module fini time function. Releases slab for llcd objects.
+ */
+void llog_recov_fini(void)
+{
+        int count;
+
+        /*
+         * Kill llcd cache when thread is stopped and we're sure that
+         * no llcds are left in use.
+         */
+        if (llcd_cache) {
+                /*
+                 * In 2.6.22 cfs_mem_cache_destroy() will not return an
+                 * error for busy resources. Let's check it another way.
+                 */
+                count = atomic_read(&llcd_count);
+                LASSERTF(count == 0, "Can't destroy llcd cache! Number of "
+                         "busy llcds: %d\n", count);
+                cfs_mem_cache_destroy(llcd_cache);
+                llcd_cache = NULL;
+        }
+}
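[Editorial sketch, not part of the patch] llog_recov_init() sizes the slab
object so that a completely filled llcd, together with the RPC message
envelope, still fits into one page: cookie space is CFS_PAGE_SIZE minus the
message overhead, plus the llcd header up to the flexible cookie array. The
same arithmetic with made-up stand-in values:

#include <stdio.h>
#include <stddef.h>

#define PAGE_SIZE    4096       /* stand-in for CFS_PAGE_SIZE */
#define MSG_OVERHEAD  256       /* stand-in for lustre_msg_size(...) */

/* Simplified header plus flexible payload (cf. llog_canceld_ctxt). */
struct canceld {
        int cookiebytes;
        char cookies[];
};

int main(void)
{
        size_t payload = PAGE_SIZE - MSG_OVERHEAD;
        size_t objsize = payload + offsetof(struct canceld, cookies);

        printf("slab object size: %zu bytes\n", objsize);
        return 0;
}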