From f10a4cfd1146989e3947fc8061f2769002b3f044 Mon Sep 17 00:00:00 2001 From: Oleg Drokin Date: Fri, 23 Jul 2010 14:03:02 +0400 Subject: [PATCH] b=16919 Async journal commit support Async journal commit for 2.0 branch. Also reverts patch from bug 22190 as no longer needed. Also included changes from b1_8 that were introduced in bug 22241. Async commit is ENABLED by default. i=adilger i=tappro --- libcfs/include/libcfs/linux/linux-mem.h | 3 ++ libcfs/include/libcfs/user-mem.h | 2 + lustre/include/lustre/lustre_idl.h | 1 + lustre/include/obd.h | 12 +++++ lustre/include/obd_support.h | 2 + lustre/obdecho/echo_client.c | 6 ++- lustre/obdfilter/filter.c | 15 +++++-- lustre/obdfilter/filter_internal.h | 14 +++++- lustre/obdfilter/filter_io.c | 34 +++++++++++++-- lustre/obdfilter/filter_io_26.c | 27 +++++++++--- lustre/obdfilter/lproc_obdfilter.c | 77 +++++++++++++++++++++++++++++++++ lustre/osc/osc_page.c | 5 ++- lustre/osc/osc_request.c | 5 ++- lustre/ost/ost_handler.c | 49 +++++++++++++++++---- lustre/ptlrpc/client.c | 11 ++++- lustre/ptlrpc/niobuf.c | 12 +++-- lustre/ptlrpc/pinger.c | 32 +++++++++++--- lustre/ptlrpc/ptlrpc_internal.h | 1 + lustre/tests/replay-single.sh | 49 +++++++++++++++------ 19 files changed, 304 insertions(+), 53 deletions(-) diff --git a/libcfs/include/libcfs/linux/linux-mem.h b/libcfs/include/libcfs/linux/linux-mem.h index 18c64e7..08b8e63 100644 --- a/libcfs/include/libcfs/linux/linux-mem.h +++ b/libcfs/include/libcfs/linux/linux-mem.h @@ -98,6 +98,9 @@ static inline int cfs_page_count(cfs_page_t *page) #define cfs_page_index(p) ((p)->index) +#define cfs_page_pin(page) page_cache_get(page) +#define cfs_page_unpin(page) page_cache_release(page) + /* * Memory allocator * XXX Liang: move these declare to public file diff --git a/libcfs/include/libcfs/user-mem.h b/libcfs/include/libcfs/user-mem.h index 3e72e20..df32135 100644 --- a/libcfs/include/libcfs/user-mem.h +++ b/libcfs/include/libcfs/user-mem.h @@ -49,6 +49,8 @@ void cfs_kunmap(cfs_page_t 
*pg); #define cfs_get_page(p) __I_should_not_be_called__(at_all) #define cfs_page_count(p) __I_should_not_be_called__(at_all) #define cfs_page_index(p) ((p)->index) +#define cfs_page_pin(page) do {} while (0) +#define cfs_page_unpin(page) do {} while (0) /* * Memory allocator diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 2eaa676..e8df3f5 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -1346,6 +1346,7 @@ extern void lustre_swab_obd_statfs (struct obd_statfs *os); #define OBD_BRW_NOCACHE 0x80 /* this page is a part of non-cached IO */ #define OBD_BRW_NOQUOTA 0x100 #define OBD_BRW_SRVLOCK 0x200 /* Client holds no lock over this page */ +#define OBD_BRW_ASYNC 0x400 /* Server may delay commit to disk */ #define OBD_BRW_MEMALLOC 0x800 /* Client runs in the "kswapd" context */ #define OBD_OBJECT_EOF 0xffffffffffffffffULL diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 79674c1..2c7da99 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -345,6 +345,9 @@ struct filter_obd { int fo_fmd_max_num; /* per exp filter_mod_data */ int fo_fmd_max_age; /* jiffies to fmd expiry */ + unsigned long fo_syncjournal:1, /* sync journal on writes */ + fo_sync_lock_cancel:2;/* sync on lock cancel */ + /* sptlrpc stuff */ cfs_rwlock_t fo_sptlrpc_lock; @@ -373,6 +376,14 @@ struct timeout_item { #define OSC_MAX_DIRTY_MB_MAX 2048 /* arbitrary, but < MAX_LONG bytes */ #define OSC_DEFAULT_RESENDS 10 +/* possible values for fo_sync_lock_cancel */ +enum { + NEVER_SYNC_ON_CANCEL = 0, + BLOCKING_SYNC_ON_CANCEL = 1, + ALWAYS_SYNC_ON_CANCEL = 2, + NUM_SYNC_ON_CANCEL_STATES +}; + #define MDC_MAX_RIF_DEFAULT 8 #define MDC_MAX_RIF_MAX 512 @@ -1140,6 +1151,7 @@ enum obd_cleanup_stage { /* KEY_SET_INFO in lustre_idl.h */ #define KEY_SPTLRPC_CONF "sptlrpc_conf" #define KEY_CONNECT_FLAG "connect_flags" +#define KEY_SYNC_LOCK_CANCEL "sync_lock_cancel" struct lu_context; diff --git 
a/lustre/include/obd_support.h b/lustre/include/obd_support.h index c944348..d9def8b 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -121,6 +121,8 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type, #define OBD_RECOVERY_TIME_SOFT (obd_timeout * 3) /* Change recovery-small 26b time if you change this */ #define PING_INTERVAL max(obd_timeout / 4, 1U) +/* a bit more than maximal journal commit time in seconds */ +#define PING_INTERVAL_SHORT min(PING_INTERVAL, 7U) /* Client may skip 1 ping; we must wait at least 2.5. But for multiple * failover targets the client only pings one server at a time, and pings * can be lost on a loaded network. Since eviction has serious consequences, diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index 912d8b6..b2c8815 100644 --- a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -1515,6 +1515,7 @@ static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa, int rc; int verify; int gfp_mask; + int brw_flags = 0; ENTRY; verify = ((oa->o_id) != ECHO_PERSISTENT_OBJID && @@ -1534,6 +1535,9 @@ static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa, /* XXX think again with misaligned I/O */ npages = count >> CFS_PAGE_SHIFT; + if (rw == OBD_BRW_WRITE) + brw_flags = OBD_BRW_ASYNC; + OBD_ALLOC(pga, npages * sizeof(*pga)); if (pga == NULL) RETURN(-ENOMEM); @@ -1558,7 +1562,7 @@ static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa, pages[i] = pgp->pg; pgp->count = CFS_PAGE_SIZE; pgp->off = off; - pgp->flag = 0; + pgp->flag = brw_flags; if (verify) echo_client_page_debug_setup(lsm, pgp->pg, rw, diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 82dfe24..8d11d00 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -2046,6 +2046,8 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, filter->fo_readcache_max_filesize = 
FILTER_MAX_CACHE_SIZE; filter->fo_fmd_max_num = FILTER_FMD_MAX_NUM_DEFAULT; filter->fo_fmd_max_age = FILTER_FMD_MAX_AGE_DEFAULT; + filter->fo_syncjournal = 0; /* Don't sync journals on i/o by default */ + filter_slc_set(filter); /* initialize sync on lock cancel */ rc = filter_prep(obd); if (rc) @@ -3921,8 +3923,8 @@ set_last_id: RETURN(rc); } -static int filter_create(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md **ea, struct obd_trans_info *oti) +int filter_create(struct obd_export *exp, struct obdo *oa, + struct lov_stripe_md **ea, struct obd_trans_info *oti) { struct obd_device *obd = exp->exp_obd; struct filter_export_data *fed; @@ -3961,7 +3963,8 @@ static int filter_create(struct obd_export *exp, struct obdo *oa, if ((oa->o_valid & OBD_MD_FLFLAGS) && (oa->o_flags & OBD_FL_RECREATE_OBJS)) { - if (oa->o_id > filter_last_id(filter, oa->o_seq)) { + if (!obd->obd_recovering || + oa->o_id > filter_last_id(filter, oa->o_seq)) { CERROR("recreate objid "LPU64" > last id "LPU64"\n", oa->o_id, filter_last_id(filter, oa->o_seq)); rc = -EINVAL; @@ -4337,6 +4340,12 @@ static int filter_get_info(struct obd_export *exp, __u32 keylen, RETURN(rc); } + if (KEY_IS(KEY_SYNC_LOCK_CANCEL)) { + *((__u32 *) val) = obd->u.filter.fo_sync_lock_cancel; + *vallen = sizeof(__u32); + RETURN(0); + } + CDEBUG(D_IOCTL, "invalid key\n"); RETURN(-EINVAL); } diff --git a/lustre/obdfilter/filter_internal.h b/lustre/obdfilter/filter_internal.h index 91d8ef9..7c73619 100644 --- a/lustre/obdfilter/filter_internal.h +++ b/lustre/obdfilter/filter_internal.h @@ -152,7 +152,8 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry, int filter_setattr(struct obd_export *exp, struct obd_info *oinfo, struct obd_trans_info *oti); -struct dentry *filter_create_object(struct obd_device *obd, struct obdo *oa); +int filter_create(struct obd_export *exp, struct obdo *oa, + struct lov_stripe_md **ea, struct obd_trans_info *oti); struct obd_llog_group *filter_find_olg(struct 
obd_device *obd, int seq); @@ -243,4 +244,15 @@ void blacklist_add(uid_t uid); void blacklist_del(uid_t uid); int blacklist_display(char *buf, int bufsize); +/* sync on lock cancel is useless when we force a journal flush, + * and if we enable async journal commit, we should also turn on + * sync on lock cancel if it is not enabled already. */ +static inline void filter_slc_set(struct filter_obd *filter) +{ + if (filter->fo_syncjournal == 1) + filter->fo_sync_lock_cancel = NEVER_SYNC_ON_CANCEL; + else if (filter->fo_sync_lock_cancel == NEVER_SYNC_ON_CANCEL) + filter->fo_sync_lock_cancel = ALWAYS_SYNC_ON_CANCEL; +} + #endif /* _FILTER_INTERNAL_H */ diff --git a/lustre/obdfilter/filter_io.c b/lustre/obdfilter/filter_io.c index 17d4685..c53c4359 100644 --- a/lustre/obdfilter/filter_io.c +++ b/lustre/obdfilter/filter_io.c @@ -681,9 +681,36 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, cleanup_phase = 2; if (dentry->d_inode == NULL) { - CERROR("%s: trying to BRW to non-existent file "LPU64"\n", - obd->obd_name, obj->ioo_id); - GOTO(cleanup, rc = -ENOENT); + if (exp->exp_obd->obd_recovering) { + struct obdo *noa = oa; + + if (oa == NULL) { + OBDO_ALLOC(noa); + if (noa == NULL) + GOTO(recreate_out, rc = -ENOMEM); + noa->o_id = obj->ioo_id; + noa->o_valid = OBD_MD_FLID; + } + + if (filter_create(exp, noa, NULL, oti) == 0) { + f_dput(dentry); + dentry = filter_fid2dentry(exp->exp_obd, NULL, + obj->ioo_seq, + obj->ioo_id); + } + if (oa == NULL) + OBDO_FREE(noa); + } + recreate_out: + if (IS_ERR(dentry) || dentry->d_inode == NULL) { + CERROR("%s: BRW to missing obj "LPU64"/"LPU64":rc %d\n", + exp->exp_obd->obd_name, + obj->ioo_id, obj->ioo_seq, + IS_ERR(dentry) ? 
(int)PTR_ERR(dentry) : -ENOENT); + if (IS_ERR(dentry)) + cleanup_phase = 1; + GOTO(cleanup, rc = -ENOENT); + } } if (oa->o_valid & (OBD_MD_FLUID | OBD_MD_FLGID) && @@ -999,6 +1026,7 @@ int filter_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo, lnb[i].page = pga[i].pg; rnb[i].offset = pga[i].off; rnb[i].len = pga[i].count; + lnb[i].flags = rnb[i].flags = pga[i].flag; } obdo_to_ioobj(oinfo->oi_oa, &ioo); diff --git a/lustre/obdfilter/filter_io_26.c b/lustre/obdfilter/filter_io_26.c index 7b39704..82cdfba 100644 --- a/lustre/obdfilter/filter_io_26.c +++ b/lustre/obdfilter/filter_io_26.c @@ -510,8 +510,11 @@ int filter_direct_io(int rw, struct dentry *dchild, struct filter_iobuf *iobuf, rc = rc2; } - rc2 = fsfilt_commit_async(obd,inode,oti->oti_handle, - wait_handle); + if (wait_handle) + rc2 = fsfilt_commit_async(obd,inode,oti->oti_handle, + wait_handle); + else + rc2 = fsfilt_commit(obd, inode, oti->oti_handle, 0); if (rc == 0) rc = rc2; if (rc != 0) @@ -572,10 +575,11 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int i, err, cleanup_phase = 0; struct obd_device *obd = exp->exp_obd; struct filter_obd *fo = &obd->u.filter; - void *wait_handle; + void *wait_handle = NULL; int total_size = 0; unsigned int qcids[MAXQUOTAS] = { oa->o_uid, oa->o_gid }; int rec_pending[MAXQUOTAS] = { 0, 0 }, quota_pages = 0; + int sync_journal_commit = obd->u.filter.fo_syncjournal; ENTRY; LASSERT(oti != NULL); @@ -643,6 +647,10 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, (flags & (OBD_BRW_FROM_GRANT | OBD_BRW_SYNC)) == OBD_BRW_FROM_GRANT) iobuf->dr_ignore_quota = 1; + + if (!(lnb->flags & OBD_BRW_ASYNC)) { + sync_journal_commit = 1; + } } /* we try to get enough quota to write here, and let ldiskfs @@ -716,7 +724,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, /* filter_direct_io drops i_mutex */ rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, iobuf, exp, &iattr, - oti, &wait_handle); + oti, 
sync_journal_commit ? &wait_handle : NULL); obdo_from_inode(oa, inode, NULL, rc == 0 ? FILTER_VALID_FLAGS : 0 | OBD_MD_FLUID |OBD_MD_FLGID); @@ -725,13 +733,18 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, fsfilt_check_slow(obd, now, "direct_io"); - err = fsfilt_commit_wait(obd, inode, wait_handle); + if (wait_handle) + err = fsfilt_commit_wait(obd, inode, wait_handle); + else + err = 0; + if (err) { CERROR("Failure to commit OST transaction (%d)?\n", err); - rc = err; + if (rc == 0) + rc = err; } - if (obd->obd_replayable && !rc) + if (obd->obd_replayable && !rc && wait_handle) LASSERTF(oti->oti_transno <= obd->obd_last_committed, "oti_transno "LPU64" last_committed "LPU64"\n", oti->oti_transno, obd->obd_last_committed); diff --git a/lustre/obdfilter/lproc_obdfilter.c b/lustre/obdfilter/lproc_obdfilter.c index 92effc9..99bc28c 100644 --- a/lustre/obdfilter/lproc_obdfilter.c +++ b/lustre/obdfilter/lproc_obdfilter.c @@ -354,6 +354,79 @@ int lprocfs_filter_wr_degraded(struct file *file, const char *buffer, return count; } +int lprocfs_filter_rd_syncjournal(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct obd_device *obd = data; + int rc; + + rc = snprintf(page, count, "%u\n", obd->u.filter.fo_syncjournal); + return rc; +} + +int lprocfs_filter_wr_syncjournal(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + struct obd_device *obd = data; + int val; + int rc; + + rc = lprocfs_write_helper(buffer, count, &val); + if (rc) + return rc; + + if (val < 0) + return -EINVAL; + + obd->u.filter.fo_syncjournal = !!val; + filter_slc_set(&obd->u.filter); + + return count; +} + +static char *sync_on_cancel_states[] = {"never", + "blocking", + "always" }; + +int lprocfs_filter_rd_sync_lock_cancel(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct obd_device *obd = data; + int rc; + + rc = snprintf(page, count, "%s\n", + 
sync_on_cancel_states[obd->u.filter.fo_sync_lock_cancel]); + return rc; +} +/* proc write handler: set fo_sync_lock_cancel from a state name listed in sync_on_cancel_states[], else from the value parsed by lprocfs_write_helper() */ +int lprocfs_filter_wr_sync_lock_cancel(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + struct obd_device *obd = data; + int val = -1; + int i; + /* try the state names first; compare only when at least that many bytes were written. NOTE(review): buffer is memcmp()ed directly - confirm it is not a user-space pointer here */ + for (i = 0 ; i < NUM_SYNC_ON_CANCEL_STATES; i++) { + if (count >= strlen(sync_on_cancel_states[i]) && + memcmp(buffer, sync_on_cancel_states[i], + strlen(sync_on_cancel_states[i])) == 0) { + val = i; + break; + } + } + if (val == -1) { + int rc = lprocfs_write_helper(buffer, count, &val); + if (rc) + return rc; + } + /* the stored value indexes sync_on_cancel_states[] on read, so bound it by the table size */ + if (val < 0 || val >= NUM_SYNC_ON_CANCEL_STATES) + return -EINVAL; + + obd->u.filter.fo_sync_lock_cancel = val; + return count; +} + static struct lprocfs_vars lprocfs_filter_obd_vars[] = { { "uuid", lprocfs_rd_uuid, 0, 0 }, { "blocksize", lprocfs_rd_blksize, 0, 0 }, @@ -400,6 +473,10 @@ static struct lprocfs_vars lprocfs_filter_obd_vars[] = { { "mds_sync", lprocfs_filter_rd_mds_sync, 0, 0}, { "degraded", lprocfs_filter_rd_degraded, lprocfs_filter_wr_degraded, 0 }, + { "sync_journal", lprocfs_filter_rd_syncjournal, + lprocfs_filter_wr_syncjournal, 0 }, + { "sync_on_lock_cancel", lprocfs_filter_rd_sync_lock_cancel, + lprocfs_filter_wr_sync_lock_cancel, 0 }, { 0 } }; diff --git a/lustre/osc/osc_page.c b/lustre/osc/osc_page.c index 29568a7..167e3eb 100644 --- a/lustre/osc/osc_page.c +++ b/lustre/osc/osc_page.c @@ -213,14 +213,15 @@ static int osc_page_cache_add(const struct lu_env *env, struct osc_page *opg = cl2osc_page(slice); struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj); int result; - int brw_flags; + /* All cacheable IO is async-capable */ + int brw_flags = OBD_BRW_ASYNC; int noquota = 0; LINVRNT(osc_page_protected(env, opg, CLM_WRITE, 0)); ENTRY; /* Set the OBD_BRW_SRVLOCK before the page is queued. */ - brw_flags = opg->ops_srvlock ? OBD_BRW_SRVLOCK : 0; + brw_flags |= opg->ops_srvlock ? 
OBD_BRW_SRVLOCK : 0; if (!client_is_remote(osc_export(obj)) && cfs_capable(CFS_CAP_SYS_RESOURCE)) { brw_flags |= OBD_BRW_NOQUOTA; diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 6a21fc2..b8914c3 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -1177,7 +1177,7 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2) { if (p1->flag != p2->flag) { unsigned mask = ~(OBD_BRW_FROM_GRANT| - OBD_BRW_NOCACHE|OBD_BRW_SYNC); + OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC); /* warn if we try to combine flags that we don't know to be * safe to combine */ @@ -2216,7 +2216,7 @@ static int brw_interpret(const struct lu_env *env, } OBDO_FREE(aa->aa_oa); } else { /* from async_internal() */ - int i; + obd_count i; for (i = 0; i < aa->aa_page_count; i++) osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1); @@ -2229,6 +2229,7 @@ static int brw_interpret(const struct lu_env *env, if (!async) cl_req_completion(env, aa->aa_clerq, rc); osc_release_ppga(aa->aa_ppga, aa->aa_page_count); + RETURN(rc); } diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 2d00451..e2cde0b 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -1323,14 +1323,6 @@ out_bulk: if (desc) ptlrpc_free_bulk(desc); out: - /* XXX: don't send reply if obd rdonly mode, this can cause data loss - * on client, see bug 22190. Remove this when async bulk will be done. - * Meanwhile, if this is umount then don't reply anything. 
*/ - if (req->rq_export->exp_obd->obd_no_transno) { - no_reply = req->rq_export->exp_obd->obd_stopping; - rc = -EIO; - } - if (rc == 0) { oti_to_request(oti, req); target_committed_to_req(req); @@ -1731,6 +1723,61 @@ static int ost_connect_check_sptlrpc(struct ptlrpc_request *req) return rc; } +/* + * Blocking AST for OST locks: ensure that data and metadata are synced to the + * disk when lock is cancelled (if requested), then run + * ldlm_server_blocking_ast(). + * + * The policy is fetched from the lock's export with KEY_SYNC_LOCK_CANCEL. + * obd_sync() over the lock extent is issued only for LDLM_CB_CANCELING on a + * lock granted in PW or GROUP mode, when the policy is ALWAYS_SYNC_ON_CANCEL, + * or BLOCKING_SYNC_ON_CANCEL with LDLM_FL_CBPENDING set on the lock. + * Sync and allocation failures are logged only; the return value is that of + * ldlm_server_blocking_ast(). + */ +int ost_blocking_ast(struct ldlm_lock *lock, + struct ldlm_lock_desc *desc, + void *data, int flag) +{ + __u32 sync_lock_cancel = 0; + __u32 len = sizeof(sync_lock_cancel); + int rc = 0; + ENTRY; + + rc = obd_get_info(lock->l_export, sizeof(KEY_SYNC_LOCK_CANCEL), + KEY_SYNC_LOCK_CANCEL, &len, &sync_lock_cancel, NULL); + + if (!rc && flag == LDLM_CB_CANCELING && + (lock->l_granted_mode & (LCK_PW|LCK_GROUP)) && + (sync_lock_cancel == ALWAYS_SYNC_ON_CANCEL || + (sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL && + lock->l_flags & LDLM_FL_CBPENDING))) { + struct obdo *oa; + + OBDO_ALLOC(oa); + if (oa == NULL) { + /* best effort: without an obdo the sync is skipped, but the + * regular blocking AST below must still run */ + CERROR("Error %d syncing data on lock cancel\n", -ENOMEM); + } else { + oa->o_id = lock->l_resource->lr_name.name[0]; + oa->o_seq = lock->l_resource->lr_name.name[1]; + oa->o_valid = OBD_MD_FLID|OBD_MD_FLGROUP; + + rc = obd_sync(lock->l_export, oa, NULL, + lock->l_policy_data.l_extent.start, + lock->l_policy_data.l_extent.end, NULL); + if (rc) + CERROR("Error %d syncing data on lock cancel\n", rc); + + OBDO_FREE(oa); + } + } + + rc = ldlm_server_blocking_ast(lock, desc, data, flag); + RETURN(rc); +} + static int ost_filter_recovery_request(struct ptlrpc_request *req, struct obd_device *obd, int *process) { @@ -2366,7 +2413,7 @@ int ost_handle(struct ptlrpc_request *req) if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE)) RETURN(0); rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast, - ldlm_server_blocking_ast, + ost_blocking_ast, ldlm_server_glimpse_ast); fail = OBD_FAIL_OST_LDLM_REPLY_NET; break; diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 4e1a4de..7a26e90 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -194,6 +194,7 @@ void 
ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc, desc->bd_nob += len; + cfs_page_pin(page); ptlrpc_add_bulk_page(desc, page, pageoffset, len); } @@ -203,6 +204,7 @@ void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc, */ void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc) { + int i; ENTRY; LASSERT(desc != NULL); @@ -217,6 +219,9 @@ void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc) else class_import_put(desc->bd_import); + for (i = 0; i < desc->bd_iov_count ; i++) + cfs_page_unpin(desc->bd_iov[i].kiov_page); + OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc, bd_iov[desc->bd_max_iov])); EXIT; @@ -1306,6 +1311,10 @@ static int after_reply(struct ptlrpc_request *req) lustre_msg_get_last_committed(req->rq_repmsg); } ptlrpc_free_committed(imp); + + if (req->rq_transno > imp->imp_peer_committed_transno) + ptlrpc_pinger_commit_expected(imp); + cfs_spin_unlock(&imp->imp_lock); } @@ -2576,8 +2585,6 @@ int ptlrpc_replay_req(struct ptlrpc_request *req) ENTRY; LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY); - /* Not handling automatic bulk replay yet (or ever?) */ - LASSERT(req->rq_bulk == NULL); LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args)); aa = ptlrpc_req_async_args(req); diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 537be4f..515014e 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -254,10 +254,14 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req) /* XXX Registering the same xid on retried bulk makes my head * explode trying to understand how the original request's bulk - * might interfere with the retried request -eeb */ - LASSERTF (!desc->bd_registered || req->rq_xid != desc->bd_last_xid, - "registered: %d rq_xid: "LPU64" bd_last_xid: "LPU64"\n", - desc->bd_registered, req->rq_xid, desc->bd_last_xid); + * might interfere with the retried request -eeb + * On the other hand replaying with the same xid is fine, since + * we are guaranteed old request have completed. 
-green */ + LASSERTF(!(desc->bd_registered && + req->rq_send_state != LUSTRE_IMP_REPLAY) || + req->rq_xid != desc->bd_last_xid, + "registered: %d rq_xid: "LPU64" bd_last_xid: "LPU64"\n", + desc->bd_registered, req->rq_xid, desc->bd_last_xid); desc->bd_registered = 1; desc->bd_last_xid = req->rq_xid; diff --git a/lustre/ptlrpc/pinger.c b/lustre/ptlrpc/pinger.c index d2f56f5..821f55c 100644 --- a/lustre/ptlrpc/pinger.c +++ b/lustre/ptlrpc/pinger.c @@ -105,10 +105,10 @@ int ptlrpc_ping(struct obd_import *imp) RETURN(0); } -void ptlrpc_update_next_ping(struct obd_import *imp) +void ptlrpc_update_next_ping(struct obd_import *imp, int soon) { #ifdef ENABLE_PINGER - int time = PING_INTERVAL; + int time = soon ? PING_INTERVAL_SHORT : PING_INTERVAL; if (imp->imp_state == LUSTRE_IMP_DISCON) { int dtime = max_t(int, CONNECTION_SWITCH_MIN, AT_OFF ? 0 : @@ -296,7 +296,7 @@ static int ptlrpc_pinger_main(void *arg) cfs_time_after(imp->imp_next_ping, cfs_time_add(this_ping, cfs_time_seconds(PING_INTERVAL)))) - ptlrpc_update_next_ping(imp); + ptlrpc_update_next_ping(imp, 0); } cfs_mutex_up(&pinger_sem); /* update memory usage info */ @@ -406,7 +406,12 @@ int ptlrpc_stop_pinger(void) void ptlrpc_pinger_sending_on_import(struct obd_import *imp) { - ptlrpc_update_next_ping(imp); + ptlrpc_update_next_ping(imp, 0); +} + +void ptlrpc_pinger_commit_expected(struct obd_import *imp) +{ + ptlrpc_update_next_ping(imp, 1); } int ptlrpc_pinger_add_import(struct obd_import *imp) @@ -420,7 +425,7 @@ int ptlrpc_pinger_add_import(struct obd_import *imp) imp->imp_obd->obd_uuid.uuid, obd2cli_tgt(imp->imp_obd)); /* if we add to pinger we want recovery on this import */ imp->imp_obd->obd_no_recov = 0; - ptlrpc_update_next_ping(imp); + ptlrpc_update_next_ping(imp, 0); /* XXX sort, blah blah */ cfs_list_add_tail(&imp->imp_pinger_chain, &pinger_imports); class_import_get(imp); @@ -902,7 +907,7 @@ void ptlrpc_pinger_sending_on_import(struct obd_import *imp) { #ifdef ENABLE_PINGER 
cfs_mutex_down(&pinger_sem); - ptlrpc_update_next_ping(imp); + ptlrpc_update_next_ping(imp, 0); if (pinger_args.pd_set == NULL && cfs_time_before(imp->imp_next_ping, pinger_args.pd_next_ping)) { CDEBUG(D_HA, "set next ping to "CFS_TIME_T"(cur "CFS_TIME_T")\n", @@ -913,6 +918,21 @@ void ptlrpc_pinger_sending_on_import(struct obd_import *imp) #endif } +void ptlrpc_pinger_commit_expected(struct obd_import *imp) +{ +#ifdef ENABLE_PINGER + cfs_mutex_down(&pinger_sem); + ptlrpc_update_next_ping(imp, 1); + if (pinger_args.pd_set == NULL && + cfs_time_before(imp->imp_next_ping, pinger_args.pd_next_ping)) { + CDEBUG(D_HA,"set next ping to "CFS_TIME_T"(cur "CFS_TIME_T")\n", + imp->imp_next_ping, cfs_time_current()); + pinger_args.pd_next_ping = imp->imp_next_ping; + } + cfs_mutex_up(&pinger_sem); +#endif +} + int ptlrpc_add_timeout_client(int time, enum timeout_event event, timeout_cb_t cb, void *data, cfs_list_t *obd_list) diff --git a/lustre/ptlrpc/ptlrpc_internal.h b/lustre/ptlrpc/ptlrpc_internal.h index 9186d5d..57bf5ba 100644 --- a/lustre/ptlrpc/ptlrpc_internal.h +++ b/lustre/ptlrpc/ptlrpc_internal.h @@ -96,6 +96,7 @@ void lustre_put_emerg_rs(struct ptlrpc_reply_state *rs); int ptlrpc_start_pinger(void); int ptlrpc_stop_pinger(void); void ptlrpc_pinger_sending_on_import(struct obd_import *imp); +void ptlrpc_pinger_commit_expected(struct obd_import *imp); void ptlrpc_pinger_wake_up(void); void ptlrpc_ping_import_soon(struct obd_import *imp); #ifdef __KERNEL__ diff --git a/lustre/tests/replay-single.sh b/lustre/tests/replay-single.sh index bdb9f40..1d073ec 100755 --- a/lustre/tests/replay-single.sh +++ b/lustre/tests/replay-single.sh @@ -2074,19 +2074,6 @@ test_84a() { } run_test 84a "stale open during export disconnect" -test_85() { # bug 22190 - local fail=0 - do_facet ost1 "lctl set_param -n obdfilter.${ost1_svc}.sync_journal 1" - - replay_barrier ost1 - lfs setstripe -i 0 -c 1 $DIR/$tfile - dd oflag=dsync if=/dev/urandom of=$DIR/$tfile bs=4k count=100 || fail=1 - 
fail_abort ost1 - echo "FAIL $fail" - [ $fail -ne 0 ] || error "Write was successful" -} -run_test 85 "ensure there is no reply on bulk write if obd is in rdonly mode" - test_86() { local clients=${CLIENTS:-$HOSTNAME} @@ -2097,6 +2084,42 @@ test_86() { } run_test 86 "umount server after clear nid_stats should not hit LBUG" +test_87() { + do_facet ost1 "lctl set_param -n obdfilter.${ost1_svc}.sync_journal 0" + + replay_barrier ost1 + lfs setstripe -i 0 -c 1 $DIR/$tfile + dd if=/dev/urandom of=$DIR/$tfile bs=1024k count=8 || error "Cannot write" + local cksum=$(md5sum $DIR/$tfile | awk '{print $1}') + cancel_lru_locks osc + fail ost1 + dd if=$DIR/$tfile of=/dev/null bs=1024k count=8 || error "Cannot read" + local cksum2=$(md5sum $DIR/$tfile | awk '{print $1}') + if [ "$cksum" != "$cksum2" ] ; then # quoted so an empty checksum compares as a mismatch instead of breaking [ + error "New checksum $cksum2 does not match original $cksum" + fi +} +run_test 87 "write replay" + +test_87b() { + do_facet ost1 "lctl set_param -n obdfilter.${ost1_svc}.sync_journal 0" + + replay_barrier ost1 + lfs setstripe -i 0 -c 1 $DIR/$tfile + dd if=/dev/urandom of=$DIR/$tfile bs=1024k count=8 || error "Cannot write" + sleep 1 # Give it a chance to flush dirty data + echo TESTTEST | dd of=$DIR/$tfile bs=1 count=8 seek=64 + local cksum=$(md5sum $DIR/$tfile | awk '{print $1}') + cancel_lru_locks osc + fail ost1 + dd if=$DIR/$tfile of=/dev/null bs=1024k count=8 || error "Cannot read" + local cksum2=$(md5sum $DIR/$tfile | awk '{print $1}') + if [ "$cksum" != "$cksum2" ] ; then # quoted so an empty checksum compares as a mismatch instead of breaking [ + error "New checksum $cksum2 does not match original $cksum" + fi +} +run_test 87b "write replay with changed data (checksum resend)" + equals_msg `basename $0`: test complete, cleaning up check_and_cleanup_lustre [ -f "$TESTSUITELOG" ] && cat $TESTSUITELOG && grep -q FAIL $TESTSUITELOG && exit 1 || true -- 1.8.3.1