From 6ecb6b0ac0b43c006f5f660b7217a47cd50b5423 Mon Sep 17 00:00:00 2001 From: ericm Date: Thu, 13 Nov 2003 07:54:41 +0000 Subject: [PATCH] again merge b_devel to b_eq 20031113 yesterday's merge brought in some nasty bugs. --- lustre/include/linux/lustre_commit_confd.h | 11 +- lustre/include/linux/lustre_log.h | 131 +++++++++++++++------- lustre/llite/llite_lib.c | 4 +- lustre/mds/commit_confd.c | 4 +- lustre/obdfilter/filter_internal.h | 5 +- lustre/obdfilter/filter_log.c | 60 +++++++++- lustre/ptlrpc/recov_thread.c | 171 ++++++++++++++++++++++++++--- 7 files changed, 322 insertions(+), 64 deletions(-) diff --git a/lustre/include/linux/lustre_commit_confd.h b/lustre/include/linux/lustre_commit_confd.h index 980e6ce..a749911 100644 --- a/lustre/include/linux/lustre_commit_confd.h +++ b/lustre/include/linux/lustre_commit_confd.h @@ -26,11 +26,12 @@ #include -struct llog_commit_data { +struct llog_canceld_ctxt { struct list_head llcd_list; /* free or pending struct list */ struct obd_import *llcd_import; struct llog_commit_master *llcd_lcm; int llcd_tries; /* number of tries to send */ + struct llog_ctxt_gen llcd_gen; int llcd_cookiebytes; struct llog_cookie llcd_cookies[0]; }; @@ -46,9 +47,9 @@ struct llog_commit_master { int lcm_flags; wait_queue_head_t lcm_waitq; - struct list_head lcm_llcd_pending; /* llog_commit_data to send */ + struct list_head lcm_llcd_pending; /* llog_canceld_ctxt to send */ struct list_head lcm_llcd_resend; /* try to resend this data */ - struct list_head lcm_llcd_free; /* free llog_commit_data */ + struct list_head lcm_llcd_free; /* free llog_canceld_ctxt */ spinlock_t lcm_llcd_lock; /* protects llcd_free */ atomic_t lcm_llcd_numfree; /* items on llcd_free */ int lcm_llcd_minfree; /* min free on llcd_free */ @@ -67,7 +68,7 @@ struct llog_commit_daemon { /* ptlrpc/recov_thread.c */ int llog_start_commit_thread(void); -struct llog_commit_data *llcd_grab(void); -void llcd_send(struct llog_commit_data *llcd); +struct llog_canceld_ctxt *llcd_grab(void); +void llcd_send(struct llog_canceld_ctxt *llcd); #endif /* _LUSTRE_COMMIT_CONFD_H */ diff --git a/lustre/include/linux/lustre_log.h b/lustre/include/linux/lustre_log.h index 6e2ce07..c6cbd64 100644 --- a/lustre/include/linux/lustre_log.h +++ b/lustre/include/linux/lustre_log.h @@ -38,6 +38,9 @@ #include #include +#define LOG_NAME_LIMIT(logname, name) \ + snprintf(logname, sizeof(logname), "LOGS/%s", name) + struct plain_handle_data { struct list_head phd_entry; struct llog_handle *phd_cat_handle; @@ -54,11 +57,10 @@ struct cat_handle_data { struct llog_handle { struct semaphore lgh_lock; struct llog_logid lgh_id; /* id of this log */ - struct obd_device *lgh_obd; struct llog_log_hdr *lgh_hdr; struct file *lgh_file; int lgh_last_idx; - struct llog_obd_ctxt *lgh_ctxt; + struct llog_ctxt *lgh_ctxt; union { struct plain_handle_data phd; struct cat_handle_data chd; @@ -92,18 +94,19 @@ int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data); /* llog_obd.c */ int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd, int count, struct llog_logid *logid, struct llog_operations *op); -int llog_cleanup(struct llog_obd_ctxt *); -int llog_add(struct llog_obd_ctxt *ctxt, +int llog_cleanup(struct llog_ctxt *); +int llog_precleanup(struct llog_ctxt *); +int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, struct lov_stripe_md *lsm, struct llog_cookie *logcookies, int numcookies); -int llog_cancel(struct llog_obd_ctxt *, struct lov_stripe_md *lsm, +int llog_cancel(struct llog_ctxt *, struct lov_stripe_md *lsm, int count, struct llog_cookie *cookies, int flags); int llog_obd_origin_setup(struct obd_device *obd, int index, struct obd_device *disk_obd, int count, struct llog_logid *logid); -int llog_obd_origin_cleanup(struct llog_obd_ctxt *ctxt); -int llog_obd_origin_add(struct llog_obd_ctxt *ctxt, +int llog_obd_origin_cleanup(struct llog_ctxt *ctxt); +int llog_obd_origin_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, struct lov_stripe_md *lsm, struct llog_cookie *logcookies, int numcookies); @@ -114,15 +117,19 @@ int obd_llog_init(struct obd_device *obd, struct obd_device *disk_obd, int obd_llog_finish(struct obd_device *obd, int count); /* llog_net.c */ -int llog_initiator_connect(struct llog_obd_ctxt *ctxt); -int llog_receptor_accept(struct llog_obd_ctxt *ctxt, struct obd_import *imp); -int llog_origin_handle_cancel(struct llog_obd_ctxt *ctxt, - struct ptlrpc_request *req); +int llog_initiator_connect(struct llog_ctxt *ctxt); +int llog_receptor_accept(struct llog_ctxt *ctxt, struct obd_import *imp); +int llog_origin_handle_cancel(struct ptlrpc_request *req); +int llog_origin_connect(struct llog_ctxt *ctxt, int count, + struct llog_logid *logid, struct llog_ctxt_gen *gen); +int llog_handle_connect(struct ptlrpc_request *req); /* recov_thread.c */ -int llog_obd_repl_cancel(struct llog_obd_ctxt *ctxt, +int llog_obd_repl_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm, int count, struct llog_cookie *cookies, int flags); +int llog_repl_connect(struct llog_ctxt *ctxt, int count, + struct llog_logid *logid, struct llog_ctxt_gen *gen); struct llog_operations { int (*lop_write_rec)(struct llog_handle *loghandle, @@ -138,7 +145,7 @@ struct llog_operations { __u64 *offset, void *buf, int len); - int (*lop_create)(struct llog_obd_ctxt *ctxt, struct llog_handle **, + int (*lop_create)(struct llog_ctxt *ctxt, struct llog_handle **, struct llog_logid *logid, char *name); int (*lop_close)(struct llog_handle *handle); int (*lop_read_header)(struct llog_handle *handle); @@ -146,51 +153,59 @@ struct llog_operations { int (*lop_setup)(struct obd_device *obd, int ctxt_idx, struct obd_device *disk_obd, int count, struct llog_logid *logid); - int (*lop_cleanup)(struct llog_obd_ctxt *ctxt); - int (*lop_add)(struct llog_obd_ctxt *ctxt, struct llog_rec_hdr *rec, + int (*lop_precleanup)(struct llog_ctxt *ctxt); + int (*lop_cleanup)(struct llog_ctxt *ctxt); + int (*lop_add)(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, struct lov_stripe_md *lsm, struct llog_cookie *logcookies, int numcookies); - int (*lop_cancel)(struct llog_obd_ctxt *ctxt, struct lov_stripe_md *lsm, + int (*lop_cancel)(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm, int count, struct llog_cookie *cookies, int flags); + int (*lop_connect)(struct llog_ctxt *ctxt, int count, + struct llog_logid *logid, struct llog_ctxt_gen *gen); /* XXX add 2 more: commit callbacks and llog recovery functions */ }; extern struct llog_operations llog_lvfs_ops; -/* MDS stored handles in OSC */ -#define LLOG_OBD_DEL_LOG_HANDLE 0 - -/* OBDFILTER stored handles in OBDFILTER */ -#define LLOG_OBD_SZ_LOG_HANDLE 0 -#define LLOG_OBD_RD1_LOG_HANDLE 1 -struct llog_obd_ctxt { +struct llog_ctxt { int loc_idx; /* my index the obd array of ctxt's */ + struct llog_ctxt_gen loc_gen; struct obd_device *loc_obd; /* points back to the containing obd*/ struct obd_export *loc_exp; struct obd_import *loc_imp; /* to use in RPC's: can be backward pointing import */ struct llog_operations *loc_logops; struct llog_handle *loc_handle; - struct llog_commit_data *loc_llcd; + struct llog_canceld_ctxt *loc_llcd; struct semaphore loc_sem; /* protects loc_llcd */ + void *llog_proc_cb; }; -#if 0 -int obd_log_cancel(struct obd_export *exp, struct llog_handle *cathandle, - void *buf, int count, struct llog_cookie *cookies, - int flags); +static inline void log_gen_init(struct llog_ctxt *ctxt) +{ + struct obd_device *obd = ctxt->loc_exp->exp_obd; + if (!strcmp(obd->obd_type->typ_name, "mds")) + ctxt->loc_gen.mnt_cnt = obd->u.mds.mds_mount_count; + else if (!strstr(obd->obd_type->typ_name, "filter")) { + ctxt->loc_gen.mnt_cnt = obd->u.filter.fo_mount_count; + } + else + ctxt->loc_gen.mnt_cnt = 0; +} + +static inline int log_gen_lt(struct llog_ctxt_gen a, struct llog_ctxt_gen b) +{ + if (a.mnt_cnt < b.mnt_cnt) + return 1; + if (a.mnt_cnt > b.mnt_cnt) + return 0; + return(a.conn_cnt < b.conn_cnt ? 1 : 0); +} -int llog_originator_setup(struct obd_device *, int); -int llog_originator_cleanup(struct obd_device *); -int llog_originator_open(struct obd_device *originator, - struct obd_device *disk_obd, - int index, int named, int flags, - struct obd_uuid *log_uuid); -#endif -static inline int llog_obd2ops(struct llog_obd_ctxt *ctxt, +static inline int llog_obd2ops(struct llog_ctxt *ctxt, struct llog_operations **lop) { if (ctxt == NULL) @@ -219,6 +234,15 @@ static inline int llog_data_len(int len) remains : (((len + mask) & (~mask)) + remains); } +static inline struct llog_ctxt *llog_get_context(struct obd_device *obd, + int index) +{ + if (index < 0 || index >= LLOG_MAX_CTXTS) + return NULL; + else + return obd->obd_llog_ctxt[index]; +} + static inline int llog_write_rec(struct llog_handle *handle, struct llog_rec_hdr *rec, struct llog_cookie *logcookies, @@ -316,7 +340,7 @@ static inline int llog_next_block(struct llog_handle *loghandle, int *cur_idx, RETURN(rc); } -static inline int llog_create(struct llog_obd_ctxt *ctxt, +static inline int llog_create(struct llog_ctxt *ctxt, struct llog_handle **res, struct llog_logid *logid, char *name) { @@ -333,4 +357,37 @@ static inline int llog_create(struct llog_obd_ctxt *ctxt, rc = lop->lop_create(ctxt, res, logid, name); RETURN(rc); } + +static inline int llog_connect(struct llog_ctxt *ctxt, int count, + struct llog_logid *logid, + struct llog_ctxt_gen *gen) +{ + struct llog_operations *lop; + int rc; + ENTRY; + + rc = llog_obd2ops(ctxt, &lop); + if (rc) + RETURN(rc); + if (lop->lop_connect == NULL) + RETURN(-EOPNOTSUPP); + + rc = lop->lop_connect(ctxt, count, logid, gen); + RETURN(rc); +} + +static inline int cathandle_print_cb(struct llog_handle *llh, + struct llog_rec_hdr *rec, void *data) +{ + struct llog_logid_rec *lir = (struct llog_logid_rec *)rec; + + if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) { + CERROR("invalid record in catalog\n"); + RETURN(-EINVAL); + } + + CDEBUG(D_HA, "seeing record at index %d in log "LPX64"\n", + le32_to_cpu(rec->lrh_index), lir->lid_id.lgl_oid); + RETURN(0); +} #endif diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index f141fee..4e3210d 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -358,7 +358,7 @@ int lustre_process_log(struct lustre_mount_data *lmd, char * profile, char * name = "mdc_dev"; class_uuid_t uuid; struct obd_uuid mdc_uuid; - struct llog_obd_ctxt *ctxt; + struct llog_ctxt *ctxt; int rc = 0; int err; ENTRY; @@ -427,7 +427,7 @@ int lustre_process_log(struct lustre_mount_data *lmd, char * profile, exp = class_conn2export(&mdc_conn); - ctxt = exp->exp_obd->obd_llog_ctxt[LLOG_CONFIG_REPL_CTXT]; + ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT); rc = class_config_parse_llog(ctxt, profile, cfg); if (rc) { CERROR("class_config_parse_llog failed: rc = %d\n", rc); diff --git a/lustre/mds/commit_confd.c b/lustre/mds/commit_confd.c index 557dc55..d08bd4b 100644 --- a/lustre/mds/commit_confd.c +++ b/lustre/mds/commit_confd.c @@ -24,13 +24,13 @@ void commit_confd_conf_import(struct obd_import *import, list_for_each_safe(&import->import_cc_list, tmp, save) { - struct llog_commit_data *cd; + struct llog_canceld_ctxt *cd; if (atomic_read(import->import_cc_count) <= lccd->llcconf_lowwater) break; - cd = list_entry(tmp, struct llog_commit_data *, llcconf_entry); + cd = list_entry(tmp, struct llog_canceld_ctxt *, llcconf_entry); atomic_dec(&import->import_cc_count); commit_confd_add_and_fire(cd); } diff --git a/lustre/obdfilter/filter_internal.h b/lustre/obdfilter/filter_internal.h index 32fa3fb..d5bb1ea 100644 --- a/lustre/obdfilter/filter_internal.h +++ b/lustre/obdfilter/filter_internal.h @@ -141,10 +141,11 @@ int filter_log_sz_change(struct llog_handle *cathandle, __u32 io_epoch, struct llog_cookie *logcookie, struct inode *inode); -int filter_get_catalog(struct obd_device *); +//int filter_get_catalog(struct obd_device *); void filter_cancel_cookies_cb(struct obd_device *obd, __u64 transno, void *cb_data, int error); - +int filter_recov_log_unlink_cb(struct llog_handle *llh, + struct llog_rec_hdr *rec, void *data); /* filter_san.c */ int filter_san_setup(struct obd_device *obd, obd_count len, void *buf); diff --git a/lustre/obdfilter/filter_log.c b/lustre/obdfilter/filter_log.c index 8f526b6..341ad6c 100644 --- a/lustre/obdfilter/filter_log.c +++ b/lustre/obdfilter/filter_log.c @@ -97,7 +97,63 @@ void filter_cancel_cookies_cb(struct obd_device *obd, __u64 transno, void *cb_data, int error) { struct llog_cookie *cookie = cb_data; - llog_obd_repl_cancel(obd->obd_llog_ctxt[LLOG_UNLINK_REPL_CTXT], - NULL, 1, cookie, OBD_LLOG_FL_SENDNOW); + llog_cancel(llog_get_context(obd, cookie->lgc_subsys + 1), + NULL, 1, cookie, 0); + //NULL, 1, cookie, OBD_LLOG_FL_SENDNOW); OBD_FREE(cb_data, sizeof(struct llog_cookie)); } + +/* Callback for processing the unlink log record received from MDS by + * llog_client_api. + */ +int filter_recov_log_unlink_cb(struct llog_handle *llh, + struct llog_rec_hdr *rec, void *data) +{ + struct llog_ctxt *ctxt = llh->lgh_ctxt; + struct obd_device *obd = ctxt->loc_obd; + struct obd_export *exp = obd->obd_self_export; + struct llog_cookie cookie; + struct llog_unlink_rec *lur; + struct obdo *oa; + struct obd_trans_info oti = { 0 }; + obd_id oid; + int rc = 0; + ENTRY; + + if (!le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN) { + CERROR("log is not plain\n"); + RETURN(-EINVAL); + } + if (rec->lrh_type != MDS_UNLINK_REC) { + CERROR("log record type error\n"); + RETURN(-EINVAL); + } + + cookie.lgc_lgl = llh->lgh_id; + cookie.lgc_subsys = LLOG_UNLINK_ORIG_CTXT; + cookie.lgc_index = le32_to_cpu(rec->lrh_index); + + lur = (struct llog_unlink_rec *)rec; + + oa = obdo_alloc(); + if (oa == NULL) + RETURN(-ENOMEM); + oa->o_valid |= OBD_MD_FLCOOKIE; + oa->o_id = lur->lur_oid; + oa->o_gr = lur->lur_ogen; + memcpy(obdo_logcookie(oa), &cookie, sizeof(cookie)); + oid = oa->o_id; + + rc = obd_destroy(exp, oa, NULL, &oti); + obdo_free(oa); + if (rc == -ENOENT) { + CERROR("object already removed: send cookie\n"); + llog_cancel(ctxt, NULL, 1, &cookie, 0); + RETURN(0); + } + + if (rc == 0) + CERROR("object: "LPU64" in record destroyed successful\n", oid); + + RETURN(rc); +} diff --git a/lustre/ptlrpc/recov_thread.c b/lustre/ptlrpc/recov_thread.c index 2a8bfbb..4772481 100644 --- a/lustre/ptlrpc/recov_thread.c +++ b/lustre/ptlrpc/recov_thread.c @@ -58,9 +58,10 @@ static struct llog_commit_master *lcm = &lustre_lcm; /* Allocate new commit structs in case we do not have enough */ static int llcd_alloc(void) { - struct llog_commit_data *llcd; + struct llog_canceld_ctxt *llcd; + int offset = offsetof(struct llog_canceld_ctxt, llcd_cookies); - OBD_ALLOC(llcd, PAGE_SIZE); + OBD_ALLOC(llcd, PAGE_SIZE + offset); if (llcd == NULL) return -ENOMEM; @@ -75,9 +76,9 @@ static int llcd_alloc(void) } /* Get a free cookie struct from the list */ -struct llog_commit_data *llcd_grab(void) +struct llog_canceld_ctxt *llcd_grab(void) { - struct llog_commit_data *llcd; + struct llog_canceld_ctxt *llcd; spin_lock(&lcm->lcm_llcd_lock); if (list_empty(&lcm->lcm_llcd_free)) { @@ -101,10 +102,12 @@ struct llog_commit_data *llcd_grab(void) } EXPORT_SYMBOL(llcd_grab); -static void llcd_put(struct llog_commit_data *llcd) +static void llcd_put(struct llog_canceld_ctxt *llcd) { + int offset = offsetof(struct llog_canceld_ctxt, llcd_cookies); + if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) { - OBD_FREE(llcd, PAGE_SIZE); + OBD_FREE(llcd, PAGE_SIZE + offset); } else { spin_lock(&lcm->lcm_llcd_lock); list_add(&llcd->llcd_list, &lcm->lcm_llcd_free); @@ -114,7 +117,7 @@ static void llcd_put(struct llog_commit_data *llcd) } /* Send some cookies to the appropriate target */ -void llcd_send(struct llog_commit_data *llcd) +void llcd_send(struct llog_canceld_ctxt *llcd) { spin_lock(&llcd->llcd_lcm->lcm_llcd_lock); list_add_tail(&llcd->llcd_list, &llcd->llcd_lcm->lcm_llcd_pending); @@ -128,11 +131,11 @@ EXPORT_SYMBOL(llcd_send); * log record for the deletion. The commit callback calls this * function */ -int llog_obd_repl_cancel(struct llog_obd_ctxt *ctxt, +int llog_obd_repl_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm, int count, struct llog_cookie *cookies, int flags) { - struct llog_commit_data *llcd; + struct llog_canceld_ctxt *llcd; int rc = 0; ENTRY; @@ -158,17 +161,18 @@ int llog_obd_repl_cancel(struct llog_obd_ctxt *ctxt, GOTO(out, rc = -ENOMEM); } llcd->llcd_import = ctxt->loc_imp; + llcd->llcd_gen = ctxt->loc_gen; ctxt->loc_llcd = llcd; } - memcpy(llcd->llcd_cookies + llcd->llcd_cookiebytes, cookies, + memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes, cookies, sizeof(*cookies)); llcd->llcd_cookiebytes += sizeof(*cookies); - GOTO(send_now, rc); send_now: if ((PAGE_SIZE - llcd->llcd_cookiebytes < sizeof(*cookies) || flags & OBD_LLOG_FL_SENDNOW)) { + CDEBUG(D_HA, "send llcd: %p\n", llcd); ctxt->loc_llcd = NULL; llcd_send(llcd); } @@ -182,7 +186,7 @@ static int log_commit_thread(void *arg) { struct llog_commit_master *lcm = arg; struct llog_commit_daemon *lcd; - struct llog_commit_data *llcd, *n; + struct llog_canceld_ctxt *llcd, *n; unsigned long flags; ENTRY; @@ -290,7 +294,27 @@ static int log_commit_thread(void *arg) /* We are the only one manipulating our local list - no lock */ list_for_each_entry_safe(llcd,n, &lcd->lcd_llcd_list,llcd_list){ char *bufs[1] = {(char *)llcd->llcd_cookies}; + struct obd_device *obd = import->imp_obd; + struct llog_ctxt *ctxt; + list_del(&llcd->llcd_list); + if (llcd->llcd_cookiebytes == 0) { + CDEBUG(D_HA, "just put empty llcd %p\n", llcd); + llcd_put(llcd); + continue; + } + /* check whether the cookies are new. if new then send, otherwise + * just put llcd */ + ctxt = llog_get_context(obd, llcd->llcd_cookies[0].lgc_subsys + 1); + LASSERT(ctxt != NULL); + down(&ctxt->loc_sem); + if (log_gen_lt(llcd->llcd_gen, ctxt->loc_gen)) { + up(&ctxt->loc_sem); + CDEBUG(D_HA, "just put stale llcd %p\n", llcd); + llcd_put(llcd); + continue; + } + up(&ctxt->loc_sem); request = ptlrpc_prep_req(import, OBD_LOG_CANCEL, 1, &llcd->llcd_cookiebytes, @@ -318,6 +342,7 @@ static int log_commit_thread(void *arg) continue; } +#if 0 /* FIXME just put llcd, not send it again */ spin_lock(&lcm->lcm_llcd_lock); list_splice(&lcd->lcd_llcd_list, &lcm->lcm_llcd_resend); if (++llcd->llcd_tries < 5) { @@ -329,12 +354,13 @@ static int log_commit_thread(void *arg) spin_unlock(&lcm->lcm_llcd_lock); } else { spin_unlock(&lcm->lcm_llcd_lock); +#endif CERROR("commit %p dropped %d cookies: rc %d\n", llcd, (int)(llcd->llcd_cookiebytes / sizeof(*llcd->llcd_cookies)), rc); llcd_put(llcd); - } +// } break; } @@ -390,6 +416,12 @@ int llog_start_commit_thread(void) } EXPORT_SYMBOL(llog_start_commit_thread); +static struct llog_process_args { + struct semaphore llpa_sem; + struct llog_ctxt *llpa_ctxt; + void *llpa_cb; + void *llpa_arg; +} llpa; int llog_init_commit_master(void) { INIT_LIST_HEAD(&lcm->lcm_thread_busy); @@ -404,6 +436,8 @@ int llog_init_commit_master(void) atomic_set(&lcm->lcm_llcd_numfree, 0); lcm->lcm_llcd_minfree = 0; lcm->lcm_thread_max = 5; + /* FIXME initialize semaphore for llog_process_args */ + sema_init(&llpa.llpa_sem, 1); return 0; } @@ -419,9 +453,118 @@ int llog_cleanup_commit_master(int force) return 0; } + +static int log_process_thread(void *args) +{ + struct llog_process_args *data = args; + struct llog_ctxt *ctxt = data->llpa_ctxt; + void *cb = data->llpa_cb; + struct llog_logid logid = *(struct llog_logid *)(data->llpa_arg); + struct llog_handle *llh = NULL; + unsigned long flags; + int rc; + ENTRY; + + up(&data->llpa_sem); + lock_kernel(); + ptlrpc_daemonize(); /* thread never needs to do IO */ + + SIGNAL_MASK_LOCK(current, flags); + sigfillset(¤t->blocked); + RECALC_SIGPENDING; + SIGNAL_MASK_UNLOCK(current, flags); + unlock_kernel(); + + rc = llog_create(ctxt, &llh, &logid, NULL); + if (rc) { + CERROR("llog_create failed %d\n", rc); + RETURN(rc); + } + rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL); + if (rc) { + CERROR("llog_init_handle failed %d\n", rc); + GOTO(out, rc); + } + + rc = llog_process(llh, cathandle_print_cb, NULL); + if (rc) { + CERROR("llog_process with cathandle_print_cb failed %d\n", rc); + GOTO(out, rc); + } + + if (cb) { + rc = llog_cat_process(llh, (llog_cb_t)cb, NULL); + if (rc) + CERROR("llog_cat_process failed %d\n", rc); + } else + CERROR("no cb func for recovery\n"); + + CDEBUG(D_HA, "send to llcd :%p forcibly\n", ctxt->loc_llcd); + llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW); +out: + rc = llog_cat_put(llh); + if (rc) + CERROR("llog_cat_put failed %d\n", rc); + + RETURN(rc); +} +static int llog_recovery_generic(struct llog_ctxt *ctxt, + void *handle, + void *arg) +{ + int rc; + ENTRY; + + down(&llpa.llpa_sem); + llpa.llpa_ctxt = ctxt; + llpa.llpa_cb = handle; + llpa.llpa_arg = arg; + + rc = kernel_thread(log_process_thread, &llpa, CLONE_VM | CLONE_FILES); + if (rc < 0) + CERROR("error starting log_process_thread: %d\n", rc); + else { + CDEBUG(D_HA, "log_process_thread: %d\n", rc); + rc = 0; + } + + RETURN(rc); +} +int llog_repl_connect(struct llog_ctxt *ctxt, int count, + struct llog_logid *logid, struct llog_ctxt_gen *gen) +{ + struct llog_canceld_ctxt *llcd; + int rc; + ENTRY; + + down(&ctxt->loc_sem); + ctxt->loc_gen = *gen; + llcd = ctxt->loc_llcd; + if (llcd) { + CDEBUG(D_HA, "put current llcd when new connection arrives\n"); + llcd_put(llcd); + } + llcd = llcd_grab(); + if (llcd == NULL) { + CERROR("couldn't get an llcd\n"); + RETURN(-ENOMEM); + } + llcd->llcd_import = ctxt->loc_imp; + llcd->llcd_gen = ctxt->loc_gen; + ctxt->loc_llcd = llcd; + up(&ctxt->loc_sem); + + rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid); + if (rc != 0) + CERROR("error recovery process: %d\n", rc); + + RETURN(rc); +} +EXPORT_SYMBOL(llog_repl_connect); + #else /* !__KERNEL__ */ -int llog_obd_repl_cancel(struct llog_obd_ctxt *ctxt, +int llog_obd_repl_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm, int count, struct llog_cookie *cookies, int flags) { -- 1.8.3.1