From e0ba1a08ac7886be5aaca26953c54c7793e738d7 Mon Sep 17 00:00:00 2001 From: adilger Date: Thu, 5 Feb 2004 23:47:08 +0000 Subject: [PATCH] Update b_eq from HEAD (20040205_1502) Most of the changes are from the landing of b_orphan onto HEAD. --- lustre/include/linux/lustre_commit_confd.h | 3 +- lustre/include/linux/lustre_log.h | 107 ++++++++++++----------- lustre/llite/llite_lib.c | 2 +- lustre/obdfilter/filter_internal.h | 2 + lustre/obdfilter/filter_io.c | 17 +++- lustre/obdfilter/filter_log.c | 25 ++++-- lustre/ptlrpc/Makefile.mk | 7 +- lustre/ptlrpc/recov_thread.c | 133 ++++++++++++++++------------- 8 files changed, 172 insertions(+), 124 deletions(-) diff --git a/lustre/include/linux/lustre_commit_confd.h b/lustre/include/linux/lustre_commit_confd.h index a749911..6183596 100644 --- a/lustre/include/linux/lustre_commit_confd.h +++ b/lustre/include/linux/lustre_commit_confd.h @@ -28,10 +28,9 @@ struct llog_canceld_ctxt { struct list_head llcd_list; /* free or pending struct list */ - struct obd_import *llcd_import; + struct llog_ctxt *llcd_ctxt; struct llog_commit_master *llcd_lcm; int llcd_tries; /* number of tries to send */ - struct llog_ctxt_gen llcd_gen; int llcd_cookiebytes; struct llog_cookie llcd_cookies[0]; }; diff --git a/lustre/include/linux/lustre_log.h b/lustre/include/linux/lustre_log.h index c3ff249..2b62378 100644 --- a/lustre/include/linux/lustre_log.h +++ b/lustre/include/linux/lustre_log.h @@ -42,10 +42,10 @@ snprintf(logname, sizeof(logname), "LOGS/%s", name) struct plain_handle_data { - struct list_head phd_entry; - struct llog_handle *phd_cat_handle; - struct llog_cookie phd_cookie; /* cookie of this log in its cat */ - int phd_last_idx; + struct list_head phd_entry; + struct llog_handle *phd_cat_handle; + struct llog_cookie phd_cookie; /* cookie of this log in its cat */ + int phd_last_idx; }; struct cat_handle_data { @@ -71,9 +71,10 @@ struct llog_handle { /* llog.c - general API */ typedef int (*llog_cb_t)(struct llog_handle *, struct llog_rec_hdr *, void *); -int llog_init_handle(struct llog_handle *handle, int flags, +int llog_init_handle(struct llog_handle *handle, int flags, struct obd_uuid *uuid); -int llog_process(struct llog_handle *loghandle, llog_cb_t cb, void *data); +int llog_process(struct llog_handle *loghandle, llog_cb_t cb, + void *data, void *catdata); extern struct llog_handle *llog_alloc_handle(void); extern void llog_free_handle(struct llog_handle *handle); extern int llog_close(struct llog_handle *cathandle); @@ -84,26 +85,34 @@ struct llog_process_data { void *lpd_data; llog_cb_t lpd_cb; }; + +struct llog_process_cat_data { + int first_idx; + int last_idx; + /* to process catlog across zero record */ +}; + int llog_cat_put(struct llog_handle *cathandle); int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec, struct llog_cookie *reccookie, void *buf); int llog_cat_cancel_records(struct llog_handle *cathandle, int count, struct llog_cookie *cookies); int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data); +int llog_cat_set_first_idx(struct llog_handle *cathandle, int index); /* llog_obd.c */ int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd, - int count, struct llog_logid *logid, struct llog_operations *op); + int count, struct llog_logid *logid,struct llog_operations *op); int llog_cleanup(struct llog_ctxt *); int llog_sync(struct llog_ctxt *ctxt, struct obd_export *exp); -int llog_add(struct llog_ctxt *ctxt, - struct llog_rec_hdr *rec, struct lov_stripe_md *lsm, - struct llog_cookie *logcookies, int numcookies); +int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, + struct lov_stripe_md *lsm, struct llog_cookie *logcookies, + int numcookies); int llog_cancel(struct llog_ctxt *, struct lov_stripe_md *lsm, - int count, struct llog_cookie *cookies, int flags); + int count, struct llog_cookie *cookies, int flags); -int llog_obd_origin_setup(struct obd_device *obd, int index, - struct obd_device *disk_obd, int count, +int llog_obd_origin_setup(struct obd_device *obd, int index, + struct obd_device *disk_obd, int count, struct llog_logid *logid); int llog_obd_origin_cleanup(struct llog_ctxt *ctxt); int llog_obd_origin_add(struct llog_ctxt *ctxt, @@ -118,14 +127,14 @@ int obd_llog_finish(struct obd_device *obd, int count); /* llog_ioctl.c */ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data); -int llog_catlog_list(struct obd_device *obd, int count, +int llog_catlog_list(struct obd_device *obd, int count, struct obd_ioctl_data *data); /* llog_net.c */ int llog_initiator_connect(struct llog_ctxt *ctxt); int llog_receptor_accept(struct llog_ctxt *ctxt, struct obd_import *imp); int llog_origin_connect(struct llog_ctxt *ctxt, int count, - struct llog_logid *logid, struct llog_ctxt_gen *gen); + struct llog_logid *logid, struct llog_gen *gen); int llog_handle_connect(struct ptlrpc_request *req); /* recov_thread.c */ @@ -133,40 +142,34 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm, int count, struct llog_cookie *cookies, int flags); int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp); -int llog_repl_connect(struct llog_ctxt *ctxt, int count, - struct llog_logid *logid, struct llog_ctxt_gen *gen); +int llog_repl_connect(struct llog_ctxt *ctxt, int count, + struct llog_logid *logid, struct llog_gen *gen); struct llog_operations { int (*lop_write_rec)(struct llog_handle *loghandle, - struct llog_rec_hdr *rec, - struct llog_cookie *logcookies, - int numcookies, - void *, - int idx); + struct llog_rec_hdr *rec, + struct llog_cookie *logcookies, int numcookies, + void *, int idx); int (*lop_destroy)(struct llog_handle *handle); - int (*lop_next_block)(struct llog_handle *h, - int *curr_idx, - int next_idx, - __u64 *offset, - void *buf, - int len); + int (*lop_next_block)(struct llog_handle *h, int *curr_idx, + int next_idx, __u64 *offset, void *buf, int len); int (*lop_create)(struct llog_ctxt *ctxt, struct llog_handle **, struct llog_logid *logid, char *name); int (*lop_close)(struct llog_handle *handle); int (*lop_read_header)(struct llog_handle *handle); - int (*lop_setup)(struct obd_device *obd, int ctxt_idx, - struct obd_device *disk_obd, int count, + int (*lop_setup)(struct obd_device *obd, int ctxt_idx, + struct obd_device *disk_obd, int count, struct llog_logid *logid); int (*lop_sync)(struct llog_ctxt *ctxt, struct obd_export *exp); int (*lop_cleanup)(struct llog_ctxt *ctxt); - int (*lop_add)(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, - struct lov_stripe_md *lsm, + int (*lop_add)(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, + struct lov_stripe_md *lsm, struct llog_cookie *logcookies, int numcookies); int (*lop_cancel)(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm, int count, struct llog_cookie *cookies, int flags); int (*lop_connect)(struct llog_ctxt *ctxt, int count, - struct llog_logid *logid, struct llog_ctxt_gen *gen); + struct llog_logid *logid, struct llog_gen *gen); /* XXX add 2 more: commit callbacks and llog recovery functions */ }; @@ -178,10 +181,10 @@ extern struct llog_operations llog_lvfs_ops; struct llog_ctxt { int loc_idx; /* my index the obd array of ctxt's */ - struct llog_ctxt_gen loc_gen; + struct llog_gen loc_gen; struct obd_device *loc_obd; /* points back to the containing obd*/ struct obd_export *loc_exp; - struct obd_import *loc_imp; /* to use in RPC's: can be backward + struct obd_import *loc_imp; /* to use in RPC's: can be backward pointing import */ struct llog_operations *loc_logops; struct llog_handle *loc_handle; @@ -190,20 +193,19 @@ struct llog_ctxt { void *llog_proc_cb; }; -static inline void log_gen_init(struct llog_ctxt *ctxt) +static inline void llog_gen_init(struct llog_ctxt *ctxt) { struct obd_device *obd = ctxt->loc_exp->exp_obd; if (!strcmp(obd->obd_type->typ_name, "mds")) ctxt->loc_gen.mnt_cnt = obd->u.mds.mds_mount_count; - else if (!strstr(obd->obd_type->typ_name, "filter")) { - ctxt->loc_gen.mnt_cnt = obd->u.filter.fo_mount_count; - } + else if (!strstr(obd->obd_type->typ_name, "filter")) + ctxt->loc_gen.mnt_cnt = obd->u.filter.fo_mount_count; else - ctxt->loc_gen.mnt_cnt = 0; + ctxt->loc_gen.mnt_cnt = 0; } -static inline int log_gen_lt(struct llog_ctxt_gen a, struct llog_ctxt_gen b) +static inline int llog_gen_lt(struct llog_gen a, struct llog_gen b) { if (a.mnt_cnt < b.mnt_cnt) return 1; @@ -212,15 +214,19 @@ static inline int log_gen_lt(struct llog_ctxt_gen a, struct llog_ctxt_gen b) return(a.conn_cnt < b.conn_cnt ? 1 : 0); } +#define LLOG_GEN_INC(gen) ((gen).conn_cnt) ++ +#define LLOG_PROC_BREAK 0x0001 static inline int llog_obd2ops(struct llog_ctxt *ctxt, struct llog_operations **lop) { if (ctxt == NULL) return -ENOTCONN; + *lop = ctxt->loc_logops; if (*lop == NULL) return -EOPNOTSUPP; + return 0; } @@ -229,6 +235,7 @@ static inline int llog_handle2ops(struct llog_handle *loghandle, { if (loghandle == NULL) return -EINVAL; + return llog_obd2ops(loghandle->lgh_ctxt, lop); } @@ -242,8 +249,8 @@ static inline struct llog_ctxt *llog_get_context(struct obd_device *obd, { if (index < 0 || index >= LLOG_MAX_CTXTS) return NULL; - else - return obd->obd_llog_ctxt[index]; + + return obd->obd_llog_ctxt[index]; } static inline int llog_write_rec(struct llog_handle *handle, @@ -254,7 +261,7 @@ static inline int llog_write_rec(struct llog_handle *handle, struct llog_operations *lop; int rc, buflen; ENTRY; - + rc = llog_handle2ops(handle, &lop); if (rc) RETURN(rc); @@ -277,7 +284,7 @@ static inline int llog_read_header(struct llog_handle *handle) struct llog_operations *lop; int rc; ENTRY; - + rc = llog_handle2ops(handle, &lop); if (rc) RETURN(rc); @@ -293,7 +300,7 @@ static inline int llog_destroy(struct llog_handle *handle) struct llog_operations *lop; int rc; ENTRY; - + rc = llog_handle2ops(handle, &lop); if (rc) RETURN(rc); @@ -322,7 +329,7 @@ static inline int llog_cancel(struct obd_export *exp, rc = lop->lop_cancel(exp, lsm, count, cookies, flags); RETURN(rc); } -#endif +#endif static inline int llog_next_block(struct llog_handle *loghandle, int *cur_idx, int next_idx, __u64 *cur_offset, void *buf, @@ -343,8 +350,7 @@ static inline int llog_next_block(struct llog_handle *loghandle, int *cur_idx, RETURN(rc); } -static inline int llog_create(struct llog_ctxt *ctxt, - struct llog_handle **res, +static inline int llog_create(struct llog_ctxt *ctxt, struct llog_handle **res, struct llog_logid *logid, char *name) { struct llog_operations *lop; @@ -362,8 +368,7 @@ static inline int llog_create(struct llog_ctxt *ctxt, } static inline int llog_connect(struct llog_ctxt *ctxt, int count, - struct llog_logid *logid, - struct llog_ctxt_gen *gen) + struct llog_logid *logid, struct llog_gen *gen) { struct llog_operations *lop; int rc; diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 5ca5873..267c61c 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -403,7 +403,7 @@ int lustre_process_log(struct lustre_mount_data *lmd, char * profile, pcfg.pcfg_nid = lmd->lmd_server_nid; pcfg.pcfg_id = lmd->lmd_server_ipaddr; pcfg.pcfg_misc = lmd->lmd_port; - pcfg.pcfg_size = 0; + pcfg.pcfg_size = 8388608; pcfg.pcfg_flags = 0x4; /*share*/ err = kportal_nal_cmd(&pcfg); if (err <0) diff --git a/lustre/obdfilter/filter_internal.h b/lustre/obdfilter/filter_internal.h index 610d969..ce7b4a3 100644 --- a/lustre/obdfilter/filter_internal.h +++ b/lustre/obdfilter/filter_internal.h @@ -90,6 +90,8 @@ enum { LPROC_FILTER_LAST, }; +#define FILTER_MAX_CACHE_SIZE OBD_OBJECT_EOF + /* filter.c */ struct dentry *filter_parent(struct obd_device *, obd_gr group, obd_id objid); struct dentry *filter_parent_lock(struct obd_device *, obd_gr, obd_id, diff --git a/lustre/obdfilter/filter_io.c b/lustre/obdfilter/filter_io.c index 774e669..afb49db 100644 --- a/lustre/obdfilter/filter_io.c +++ b/lustre/obdfilter/filter_io.c @@ -360,13 +360,24 @@ static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa, { struct obd_ioobj *o; struct niobuf_local *lnb; - int i, j; + int i, j, drop = 0; ENTRY; + if (res->dentry != NULL) + drop = (res->dentry->d_inode->i_size > + exp->exp_obd->u.filter.fo_readcache_max_filesize); + for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) { for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) { - if (lnb->page != NULL) - page_cache_release(lnb->page); + if (lnb->page == NULL) + continue; + /* drop from cache like truncate_list_pages() */ + if (drop && !TryLockPage(lnb->page)) { + if (lnb->page->mapping) + truncate_complete_page(lnb->page); + unlock_page(lnb->page); + } + page_cache_release(lnb->page); } } if (res->dentry != NULL) diff --git a/lustre/obdfilter/filter_log.c b/lustre/obdfilter/filter_log.c index b63c4c2..686fd30 100644 --- a/lustre/obdfilter/filter_log.c +++ b/lustre/obdfilter/filter_log.c @@ -113,18 +113,19 @@ int filter_recov_log_unlink_cb(struct llog_handle *llh, struct obd_device *obd = ctxt->loc_obd; struct obd_export *exp = obd->obd_self_export; struct llog_cookie cookie; + struct llog_gen_rec *lgr; struct llog_unlink_rec *lur; struct obdo *oa; - struct obd_trans_info oti = { 0 }; obd_id oid; int rc = 0; ENTRY; - if (!le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN) { + if (!(le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN)) { CERROR("log is not plain\n"); RETURN(-EINVAL); } - if (rec->lrh_type != MDS_UNLINK_REC) { + if (rec->lrh_type != MDS_UNLINK_REC && + rec->lrh_type != LLOG_GEN_REC) { CERROR("log record type error\n"); RETURN(-EINVAL); } @@ -132,9 +133,19 @@ int filter_recov_log_unlink_cb(struct llog_handle *llh, cookie.lgc_lgl = llh->lgh_id; cookie.lgc_subsys = LLOG_UNLINK_ORIG_CTXT; cookie.lgc_index = le32_to_cpu(rec->lrh_index); - + + if (rec->lrh_type == LLOG_GEN_REC) { + lgr = (struct llog_gen_rec *)rec; + if (llog_gen_lt(lgr->lgr_gen, ctxt->loc_gen)) + rc = 0; + else + rc = LLOG_PROC_BREAK; + CWARN("fetch generation log, send cookie\n"); + llog_cancel(ctxt, NULL, 1, &cookie, 0); + RETURN(rc); + } + lur = (struct llog_unlink_rec *)rec; - oa = obdo_alloc(); if (oa == NULL) RETURN(-ENOMEM); @@ -144,10 +155,10 @@ int filter_recov_log_unlink_cb(struct llog_handle *llh, memcpy(obdo_logcookie(oa), &cookie, sizeof(cookie)); oid = oa->o_id; - rc = obd_destroy(exp, oa, NULL, &oti); + rc = obd_destroy(exp, oa, NULL, NULL); obdo_free(oa); if (rc == -ENOENT) { - CWARN("object already removed: send cookie\n"); + CWARN("object already removed, send cookie\n"); llog_cancel(ctxt, NULL, 1, &cookie, 0); RETURN(0); } diff --git a/lustre/ptlrpc/Makefile.mk b/lustre/ptlrpc/Makefile.mk index 888e1bc..4154582 100644 --- a/lustre/ptlrpc/Makefile.mk +++ b/lustre/ptlrpc/Makefile.mk @@ -6,7 +6,12 @@ include $(src)/../portals/Kernelenv obj-y += ptlrpc.o + ptlrpc-objs := recover.o connection.o ptlrpc_module.o events.o service.o \ client.o niobuf.o pack_generic.o lproc_ptlrpc.o pinger.o \ recov_thread.o import.o llog_net.o llog_client.o \ - llog_server.o ptlrpcd.o + llog_server.o ptlrpcd.o ../ldlm/l_lock.o ../ldlm/ldlm_lock.o \ + ../ldlm/ldlm_resource.o ../ldlm/ldlm_extent.o \ + ../ldlm/ldlm_request.o ../ldlm/ldlm_lockd.o \ + ../ldlm/ldlm_lib.o ../ldlm/ldlm_flock.o ../ldlm/ldlm_plain.o + diff --git a/lustre/ptlrpc/recov_thread.c b/lustre/ptlrpc/recov_thread.c index 74aa72b..d28b0b6 100644 --- a/lustre/ptlrpc/recov_thread.c +++ b/lustre/ptlrpc/recov_thread.c @@ -50,6 +50,8 @@ #include #include "ptlrpc_internal.h" +#define LLCD_SIZE 4096 + #ifdef __KERNEL__ static struct llog_commit_master lustre_lcm; @@ -61,7 +63,7 @@ static int llcd_alloc(void) struct llog_canceld_ctxt *llcd; int offset = offsetof(struct llog_canceld_ctxt, llcd_cookies); - OBD_ALLOC(llcd, PAGE_SIZE + offset); + OBD_ALLOC(llcd, LLCD_SIZE + offset); if (llcd == NULL) return -ENOMEM; @@ -107,7 +109,7 @@ static void llcd_put(struct llog_canceld_ctxt *llcd) int offset = offsetof(struct llog_canceld_ctxt, llcd_cookies); if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) { - OBD_FREE(llcd, PAGE_SIZE + offset); + OBD_FREE(llcd, LLCD_SIZE + offset); } else { spin_lock(&lcm->lcm_llcd_lock); list_add(&llcd->llcd_list, &lcm->lcm_llcd_free); @@ -128,8 +130,8 @@ void llcd_send(struct llog_canceld_ctxt *llcd) EXPORT_SYMBOL(llcd_send); /* deleted objects have a commit callback that cancels the MDS - * log record for the deletion. The commit callback calls this - * function + * log record for the deletion. The commit callback calls this + * function */ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm, int count, @@ -160,8 +162,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, cookies->lgc_lgl.lgl_ogen, cookies->lgc_index); GOTO(out, rc = -ENOMEM); } - llcd->llcd_import = ctxt->loc_imp; - llcd->llcd_gen = ctxt->loc_gen; + llcd->llcd_ctxt = ctxt; ctxt->loc_llcd = llcd; } @@ -170,9 +171,9 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, llcd->llcd_cookiebytes += sizeof(*cookies); send_now: - if ((PAGE_SIZE - llcd->llcd_cookiebytes < sizeof(*cookies) || + if ((LLCD_SIZE - llcd->llcd_cookiebytes < sizeof(*cookies) || flags & OBD_LLOG_FL_SENDNOW)) { - CDEBUG(D_HA, "send llcd: %p\n", llcd); + CDEBUG(D_HA, "send llcd %p:%p\n", llcd, llcd->llcd_ctxt); ctxt->loc_llcd = NULL; llcd_send(llcd); } @@ -187,16 +188,17 @@ int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp) int rc = 0; ENTRY; - LASSERT(ctxt->loc_llcd); - if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) { - CWARN("import will be destroyed, put llcd %p\n", - ctxt->loc_llcd); - llcd_put(ctxt->loc_llcd); - ctxt->loc_llcd = NULL; + down(&ctxt->loc_sem); + if (ctxt->loc_llcd != NULL) { + CWARN("import will be destroyed, put " + "llcd %p:%p\n", ctxt->loc_llcd, ctxt); + llcd_put(ctxt->loc_llcd); + ctxt->loc_llcd = NULL; + ctxt->loc_imp = NULL; + } up(&ctxt->loc_sem); } else { - up(&ctxt->loc_sem); rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW); } @@ -294,11 +296,11 @@ static int log_commit_thread(void *arg) llcd = list_entry(lcd->lcd_llcd_list.next, typeof(*llcd), llcd_list); LASSERT(llcd->llcd_lcm == lcm); - import = llcd->llcd_import; + import = llcd->llcd_ctxt->loc_imp; } list_for_each_entry_safe(llcd, n, sending_list, llcd_list) { LASSERT(llcd->llcd_lcm == lcm); - if (import == llcd->llcd_import) + if (import == llcd->llcd_ctxt->loc_imp) list_move_tail(&llcd->llcd_list, &lcd->lcd_llcd_list); } @@ -306,7 +308,7 @@ static int log_commit_thread(void *arg) list_for_each_entry_safe(llcd, n, &lcm->lcm_llcd_resend, llcd_list) { LASSERT(llcd->llcd_lcm == lcm); - if (import == llcd->llcd_import) + if (import == llcd->llcd_ctxt->loc_imp) list_move_tail(&llcd->llcd_list, &lcd->lcd_llcd_list); } @@ -316,31 +318,29 @@ static int log_commit_thread(void *arg) /* We are the only one manipulating our local list - no lock */ list_for_each_entry_safe(llcd,n, &lcd->lcd_llcd_list,llcd_list){ char *bufs[1] = {(char *)llcd->llcd_cookies}; - struct obd_device *obd = import->imp_obd; - struct llog_ctxt *ctxt; list_del(&llcd->llcd_list); if (llcd->llcd_cookiebytes == 0) { - CDEBUG(D_HA, "just put empty llcd %p\n", llcd); + CDEBUG(D_HA, "put empty llcd %p:%p\n", + llcd, llcd->llcd_ctxt); llcd_put(llcd); continue; } - /* check whether the cookies are new. if new then send, otherwise - * just put llcd */ - ctxt = llog_get_context(obd, llcd->llcd_cookies[0].lgc_subsys + 1); - LASSERT(ctxt != NULL); - down(&ctxt->loc_sem); - if (log_gen_lt(llcd->llcd_gen, ctxt->loc_gen)) { - up(&ctxt->loc_sem); - CDEBUG(D_HA, "just put stale llcd %p\n", llcd); + + down(&llcd->llcd_ctxt->loc_sem); + if (llcd->llcd_ctxt->loc_imp == NULL) { + up(&llcd->llcd_ctxt->loc_sem); + CWARN("import will be destroyed, put " + "llcd %p:%p\n", llcd, llcd->llcd_ctxt); llcd_put(llcd); continue; } - up(&ctxt->loc_sem); request = ptlrpc_prep_req(import, OBD_LOG_CANCEL, 1, &llcd->llcd_cookiebytes, bufs); + up(&llcd->llcd_ctxt->loc_sem); + if (request == NULL) { rc = -ENOMEM; CERROR("error preparing commit: rc %d\n", rc); @@ -354,8 +354,18 @@ static int log_commit_thread(void *arg) } request->rq_replen = lustre_msg_size(0, NULL); + down(&llcd->llcd_ctxt->loc_sem); + if (llcd->llcd_ctxt->loc_imp == NULL) { + up(&llcd->llcd_ctxt->loc_sem); + CWARN("import will be destroyed, put " + "llcd %p:%p\n", llcd, llcd->llcd_ctxt); + llcd_put(llcd); + ptlrpc_req_finished(request); + continue; + } rc = ptlrpc_queue_wait(request); ptlrpc_req_finished(request); + up(&llcd->llcd_ctxt->loc_sem); /* If the RPC failed, we put this and the remaining * messages onto the resend list for another time. */ @@ -364,7 +374,7 @@ static int log_commit_thread(void *arg) continue; } -#if 0 /* FIXME just put llcd, not send it again */ +#if 0 /* FIXME just put llcd, not put it on resend list */ spin_lock(&lcm->lcm_llcd_lock); list_splice(&lcd->lcd_llcd_list, &lcm->lcm_llcd_resend); if (++llcd->llcd_tries < 5) { @@ -377,13 +387,15 @@ static int log_commit_thread(void *arg) } else { spin_unlock(&lcm->lcm_llcd_lock); #endif - CERROR("commit %p dropped %d cookies: rc %d\n", - llcd, (int)(llcd->llcd_cookiebytes / - sizeof(*llcd->llcd_cookies)), - rc); + CERROR("commit %p:%p drop %d cookies: rc %d\n", + llcd, llcd->llcd_ctxt, + (int)(llcd->llcd_cookiebytes / + sizeof(*llcd->llcd_cookies)), rc); llcd_put(llcd); -// } +#if 0 + } break; +#endif } if (rc == 0) { @@ -439,11 +451,12 @@ int llog_start_commit_thread(void) EXPORT_SYMBOL(llog_start_commit_thread); static struct llog_process_args { - struct semaphore llpa_sem; + struct semaphore llpa_sem; struct llog_ctxt *llpa_ctxt; void *llpa_cb; void *llpa_arg; } llpa; + int llog_init_commit_master(void) { INIT_LIST_HEAD(&lcm->lcm_thread_busy); @@ -475,7 +488,6 @@ int llog_cleanup_commit_master(int force) return 0; } - static int log_process_thread(void *args) { struct llog_process_args *data = args; @@ -486,17 +498,17 @@ static int log_process_thread(void *args) unsigned long flags; int rc; ENTRY; - + up(&data->llpa_sem); lock_kernel(); ptlrpc_daemonize(); /* thread never needs to do IO */ - + SIGNAL_MASK_LOCK(current, flags); sigfillset(¤t->blocked); RECALC_SIGPENDING; SIGNAL_MASK_UNLOCK(current, flags); unlock_kernel(); - + rc = llog_create(ctxt, &llh, &logid, NULL); if (rc) { CERROR("llog_create failed %d\n", rc); @@ -507,26 +519,27 @@ static int log_process_thread(void *args) CERROR("llog_init_handle failed %d\n", rc); GOTO(out, rc); } - + if (cb) { rc = llog_cat_process(llh, (llog_cb_t)cb, NULL); - if (rc) + if (rc != LLOG_PROC_BREAK) CERROR("llog_cat_process failed %d\n", rc); - } else + } else { CWARN("no callback function for recovery\n"); + } - CDEBUG(D_HA, "send to llcd :%p forcibly\n", ctxt->loc_llcd); + CDEBUG(D_HA, "send llcd %p:%p forcibly after recovery\n", + ctxt->loc_llcd, ctxt); llog_sync(ctxt, NULL); out: rc = llog_cat_put(llh); if (rc) CERROR("llog_cat_put failed %d\n", rc); - + RETURN(rc); } -static int llog_recovery_generic(struct llog_ctxt *ctxt, - void *handle, - void *arg) + +static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle,void *arg) { int rc; ENTRY; @@ -546,31 +559,33 @@ static int llog_recovery_generic(struct llog_ctxt *ctxt, RETURN(rc); } + int llog_repl_connect(struct llog_ctxt *ctxt, int count, - struct llog_logid *logid, struct llog_ctxt_gen *gen) + struct llog_logid *logid, struct llog_gen *gen) { struct llog_canceld_ctxt *llcd; int rc; ENTRY; - + + /* send back llcd before recovery from llog */ + if (ctxt->loc_llcd != NULL) { + CWARN("llcd %p:%p not empty\n", ctxt->loc_llcd, ctxt); + llog_sync(ctxt, NULL); + } + down(&ctxt->loc_sem); ctxt->loc_gen = *gen; - llcd = ctxt->loc_llcd; - if (llcd) { - CDEBUG(D_HA, "put current llcd when new connection arrives\n"); - llcd_put(llcd); - } llcd = llcd_grab(); if (llcd == NULL) { CERROR("couldn't get an llcd\n"); + up(&ctxt->loc_sem); RETURN(-ENOMEM); } - llcd->llcd_import = ctxt->loc_imp; - llcd->llcd_gen = ctxt->loc_gen; + llcd->llcd_ctxt = ctxt; ctxt->loc_llcd = llcd; up(&ctxt->loc_sem); - rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid); + rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid); if (rc != 0) CERROR("error recovery process: %d\n", rc); -- 1.8.3.1