#define cfs_page_index(p) ((p)->index)
+#define cfs_page_pin(page) page_cache_get(page)
+#define cfs_page_unpin(page) page_cache_release(page)
+
/*
* Memory allocator
* XXX Liang: move these declare to public file
#define cfs_get_page(p) __I_should_not_be_called__(at_all)
#define cfs_page_count(p) __I_should_not_be_called__(at_all)
#define cfs_page_index(p) ((p)->index)
+#define cfs_page_pin(page) do {} while (0)
+#define cfs_page_unpin(page) do {} while (0)
/*
* Memory allocator
#define OBD_BRW_NOCACHE 0x80 /* this page is a part of non-cached IO */
#define OBD_BRW_NOQUOTA 0x100
#define OBD_BRW_SRVLOCK 0x200 /* Client holds no lock over this page */
+#define OBD_BRW_ASYNC 0x400 /* Server may delay commit to disk */
#define OBD_BRW_MEMALLOC 0x800 /* Client runs in the "kswapd" context */
#define OBD_OBJECT_EOF 0xffffffffffffffffULL
int fo_fmd_max_num; /* per exp filter_mod_data */
int fo_fmd_max_age; /* jiffies to fmd expiry */
+ unsigned long fo_syncjournal:1, /* sync journal on writes */
+ fo_sync_lock_cancel:2;/* sync on lock cancel */
+
/* sptlrpc stuff */
cfs_rwlock_t fo_sptlrpc_lock;
#define OSC_MAX_DIRTY_MB_MAX 2048 /* arbitrary, but < MAX_LONG bytes */
#define OSC_DEFAULT_RESENDS 10
+/* possible values for fo_sync_lock_cancel */
+enum {
+ NEVER_SYNC_ON_CANCEL = 0,
+ BLOCKING_SYNC_ON_CANCEL = 1,
+ ALWAYS_SYNC_ON_CANCEL = 2,
+ NUM_SYNC_ON_CANCEL_STATES
+};
+
#define MDC_MAX_RIF_DEFAULT 8
#define MDC_MAX_RIF_MAX 512
/* KEY_SET_INFO in lustre_idl.h */
#define KEY_SPTLRPC_CONF "sptlrpc_conf"
#define KEY_CONNECT_FLAG "connect_flags"
+#define KEY_SYNC_LOCK_CANCEL "sync_lock_cancel"
struct lu_context;
#define OBD_RECOVERY_TIME_SOFT (obd_timeout * 3)
/* Change recovery-small 26b time if you change this */
#define PING_INTERVAL max(obd_timeout / 4, 1U)
+/* a bit more than maximal journal commit time in seconds */
+#define PING_INTERVAL_SHORT min(PING_INTERVAL, 7U)
/* Client may skip 1 ping; we must wait at least 2.5. But for multiple
* failover targets the client only pings one server at a time, and pings
* can be lost on a loaded network. Since eviction has serious consequences,
int rc;
int verify;
int gfp_mask;
+ int brw_flags = 0;
ENTRY;
verify = ((oa->o_id) != ECHO_PERSISTENT_OBJID &&
/* XXX think again with misaligned I/O */
npages = count >> CFS_PAGE_SHIFT;
+ if (rw == OBD_BRW_WRITE)
+ brw_flags = OBD_BRW_ASYNC;
+
OBD_ALLOC(pga, npages * sizeof(*pga));
if (pga == NULL)
RETURN(-ENOMEM);
pages[i] = pgp->pg;
pgp->count = CFS_PAGE_SIZE;
pgp->off = off;
- pgp->flag = 0;
+ pgp->flag = brw_flags;
if (verify)
echo_client_page_debug_setup(lsm, pgp->pg, rw,
filter->fo_readcache_max_filesize = FILTER_MAX_CACHE_SIZE;
filter->fo_fmd_max_num = FILTER_FMD_MAX_NUM_DEFAULT;
filter->fo_fmd_max_age = FILTER_FMD_MAX_AGE_DEFAULT;
+ filter->fo_syncjournal = 0; /* Don't sync journals on i/o by default */
+ filter_slc_set(filter); /* initialize sync on lock cancel */
rc = filter_prep(obd);
if (rc)
RETURN(rc);
}
-static int filter_create(struct obd_export *exp, struct obdo *oa,
- struct lov_stripe_md **ea, struct obd_trans_info *oti)
+int filter_create(struct obd_export *exp, struct obdo *oa,
+ struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
struct obd_device *obd = exp->exp_obd;
struct filter_export_data *fed;
if ((oa->o_valid & OBD_MD_FLFLAGS) &&
(oa->o_flags & OBD_FL_RECREATE_OBJS)) {
- if (oa->o_id > filter_last_id(filter, oa->o_seq)) {
+ if (!obd->obd_recovering ||
+ oa->o_id > filter_last_id(filter, oa->o_seq)) {
CERROR("recreate objid "LPU64" > last id "LPU64"\n",
oa->o_id, filter_last_id(filter, oa->o_seq));
rc = -EINVAL;
RETURN(rc);
}
+ if (KEY_IS(KEY_SYNC_LOCK_CANCEL)) {
+ *((__u32 *) val) = obd->u.filter.fo_sync_lock_cancel;
+ *vallen = sizeof(__u32);
+ RETURN(0);
+ }
+
CDEBUG(D_IOCTL, "invalid key\n");
RETURN(-EINVAL);
}
int filter_setattr(struct obd_export *exp, struct obd_info *oinfo,
struct obd_trans_info *oti);
-struct dentry *filter_create_object(struct obd_device *obd, struct obdo *oa);
+int filter_create(struct obd_export *exp, struct obdo *oa,
+ struct lov_stripe_md **ea, struct obd_trans_info *oti);
struct obd_llog_group *filter_find_olg(struct obd_device *obd, int seq);
void blacklist_del(uid_t uid);
int blacklist_display(char *buf, int bufsize);
+/* sync on lock cancel is useless when we force a journal flush,
+ * and if we enable async journal commit, we should also turn on
+ * sync on lock cancel if it is not enabled already. */
+static inline void filter_slc_set(struct filter_obd *filter)
+{
+        /* synchronous journal: data is on stable storage before the reply
+         * goes out, so an extra sync on lock cancel buys nothing */
+        if (filter->fo_syncjournal == 1)
+                filter->fo_sync_lock_cancel = NEVER_SYNC_ON_CANCEL;
+        /* async journal commit: force data out on lock cancel, but do not
+         * override a non-default policy the admin already chose */
+        else if (filter->fo_sync_lock_cancel == NEVER_SYNC_ON_CANCEL)
+                filter->fo_sync_lock_cancel = ALWAYS_SYNC_ON_CANCEL;
+}
+
#endif /* _FILTER_INTERNAL_H */
cleanup_phase = 2;
if (dentry->d_inode == NULL) {
- CERROR("%s: trying to BRW to non-existent file "LPU64"\n",
- obd->obd_name, obj->ioo_id);
- GOTO(cleanup, rc = -ENOENT);
+ if (exp->exp_obd->obd_recovering) {
+ struct obdo *noa = oa;
+
+ if (oa == NULL) {
+ OBDO_ALLOC(noa);
+ if (noa == NULL)
+ GOTO(recreate_out, rc = -ENOMEM);
+ noa->o_id = obj->ioo_id;
+ noa->o_valid = OBD_MD_FLID;
+ }
+
+ if (filter_create(exp, noa, NULL, oti) == 0) {
+ f_dput(dentry);
+ dentry = filter_fid2dentry(exp->exp_obd, NULL,
+ obj->ioo_seq,
+ obj->ioo_id);
+ }
+ if (oa == NULL)
+ OBDO_FREE(noa);
+ }
+ recreate_out:
+ if (IS_ERR(dentry) || dentry->d_inode == NULL) {
+ CERROR("%s: BRW to missing obj "LPU64"/"LPU64":rc %d\n",
+ exp->exp_obd->obd_name,
+ obj->ioo_id, obj->ioo_seq,
+ IS_ERR(dentry) ? (int)PTR_ERR(dentry) : -ENOENT);
+ if (IS_ERR(dentry))
+ cleanup_phase = 1;
+ GOTO(cleanup, rc = -ENOENT);
+ }
}
if (oa->o_valid & (OBD_MD_FLUID | OBD_MD_FLGID) &&
lnb[i].page = pga[i].pg;
rnb[i].offset = pga[i].off;
rnb[i].len = pga[i].count;
+ lnb[i].flags = rnb[i].flags = pga[i].flag;
}
obdo_to_ioobj(oinfo->oi_oa, &ioo);
rc = rc2;
}
- rc2 = fsfilt_commit_async(obd,inode,oti->oti_handle,
- wait_handle);
+ if (wait_handle)
+ rc2 = fsfilt_commit_async(obd,inode,oti->oti_handle,
+ wait_handle);
+ else
+ rc2 = fsfilt_commit(obd, inode, oti->oti_handle, 0);
if (rc == 0)
rc = rc2;
if (rc != 0)
int i, err, cleanup_phase = 0;
struct obd_device *obd = exp->exp_obd;
struct filter_obd *fo = &obd->u.filter;
- void *wait_handle;
+ void *wait_handle = NULL;
int total_size = 0;
unsigned int qcids[MAXQUOTAS] = { oa->o_uid, oa->o_gid };
int rec_pending[MAXQUOTAS] = { 0, 0 }, quota_pages = 0;
+ int sync_journal_commit = obd->u.filter.fo_syncjournal;
ENTRY;
LASSERT(oti != NULL);
(flags & (OBD_BRW_FROM_GRANT | OBD_BRW_SYNC)) ==
OBD_BRW_FROM_GRANT)
iobuf->dr_ignore_quota = 1;
+
+ if (!(lnb->flags & OBD_BRW_ASYNC)) {
+ sync_journal_commit = 1;
+ }
}
/* we try to get enough quota to write here, and let ldiskfs
/* filter_direct_io drops i_mutex */
rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, iobuf, exp, &iattr,
- oti, &wait_handle);
+ oti, sync_journal_commit ? &wait_handle : NULL);
obdo_from_inode(oa, inode, NULL, rc == 0 ? FILTER_VALID_FLAGS : 0 |
OBD_MD_FLUID |OBD_MD_FLGID);
fsfilt_check_slow(obd, now, "direct_io");
- err = fsfilt_commit_wait(obd, inode, wait_handle);
+ if (wait_handle)
+ err = fsfilt_commit_wait(obd, inode, wait_handle);
+ else
+ err = 0;
+
if (err) {
CERROR("Failure to commit OST transaction (%d)?\n", err);
- rc = err;
+ if (rc == 0)
+ rc = err;
}
- if (obd->obd_replayable && !rc)
+ if (obd->obd_replayable && !rc && wait_handle)
LASSERTF(oti->oti_transno <= obd->obd_last_committed,
"oti_transno "LPU64" last_committed "LPU64"\n",
oti->oti_transno, obd->obd_last_committed);
return count;
}
+/* /proc read handler: report whether the journal is committed synchronously
+ * on each bulk write (1) or async commit is allowed (0) */
+int lprocfs_filter_rd_syncjournal(char *page, char **start, off_t off,
+                                  int count, int *eof, void *data)
+{
+        struct obd_device *obd = data;
+        int rc;
+
+        rc = snprintf(page, count, "%u\n", obd->u.filter.fo_syncjournal);
+        return rc;
+}
+
+/* /proc write handler: enable (non-zero) or disable (0) synchronous journal
+ * commit on bulk writes, then rederive the coupled sync-on-lock-cancel
+ * policy via filter_slc_set() */
+int lprocfs_filter_wr_syncjournal(struct file *file, const char *buffer,
+                                  unsigned long count, void *data)
+{
+        struct obd_device *obd = data;
+        int val;
+        int rc;
+
+        rc = lprocfs_write_helper(buffer, count, &val);
+        if (rc)
+                return rc;
+
+        if (val < 0)
+                return -EINVAL;
+
+        /* normalize to 0/1 for the single-bit fo_syncjournal field */
+        obd->u.filter.fo_syncjournal = !!val;
+        filter_slc_set(&obd->u.filter);
+
+        return count;
+}
+
+/* human-readable names indexed by the fo_sync_lock_cancel enum values
+ * (NEVER/BLOCKING/ALWAYS_SYNC_ON_CANCEL) */
+static char *sync_on_cancel_states[] = {"never",
+                                        "blocking",
+                                        "always" };
+
+/* /proc read handler: report the current sync-on-lock-cancel policy
+ * by name rather than by number */
+int lprocfs_filter_rd_sync_lock_cancel(char *page, char **start, off_t off,
+                                       int count, int *eof, void *data)
+{
+        struct obd_device *obd = data;
+        int rc;
+
+        rc = snprintf(page, count, "%s\n",
+                      sync_on_cancel_states[obd->u.filter.fo_sync_lock_cancel]);
+        return rc;
+}
+
+/* /proc write handler: accept either a state name ("never", "blocking",
+ * "always") or its numeric equivalent and install it as the new
+ * sync-on-lock-cancel policy */
+int lprocfs_filter_wr_sync_lock_cancel(struct file *file, const char *buffer,
+                                       unsigned long count, void *data)
+{
+        struct obd_device *obd = data;
+        int val = -1;
+        int i;
+
+        for (i = 0 ; i < NUM_SYNC_ON_CANCEL_STATES; i++) {
+                /* only compare when the write is at least as long as the
+                 * state name, so we never read past the caller's buffer */
+                if (count >= strlen(sync_on_cancel_states[i]) &&
+                    memcmp(buffer, sync_on_cancel_states[i],
+                           strlen(sync_on_cancel_states[i])) == 0) {
+                        val = i;
+                        break;
+                }
+        }
+        if (val == -1) {
+                /* no name matched: fall back to a numeric value */
+                int rc;
+                rc = lprocfs_write_helper(buffer, count, &val);
+                if (rc)
+                        return rc;
+        }
+
+        /* reject values outside the defined enum range; using the enum
+         * count keeps this check valid if new states are added */
+        if (val < 0 || val >= NUM_SYNC_ON_CANCEL_STATES)
+                return -EINVAL;
+
+        obd->u.filter.fo_sync_lock_cancel = val;
+        return count;
+}
+
static struct lprocfs_vars lprocfs_filter_obd_vars[] = {
{ "uuid", lprocfs_rd_uuid, 0, 0 },
{ "blocksize", lprocfs_rd_blksize, 0, 0 },
{ "mds_sync", lprocfs_filter_rd_mds_sync, 0, 0},
{ "degraded", lprocfs_filter_rd_degraded,
lprocfs_filter_wr_degraded, 0 },
+ { "sync_journal", lprocfs_filter_rd_syncjournal,
+ lprocfs_filter_wr_syncjournal, 0 },
+ { "sync_on_lock_cancel", lprocfs_filter_rd_sync_lock_cancel,
+ lprocfs_filter_wr_sync_lock_cancel, 0 },
{ 0 }
};
struct osc_page *opg = cl2osc_page(slice);
struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
int result;
- int brw_flags;
+ /* All cacheable IO is async-capable */
+ int brw_flags = OBD_BRW_ASYNC;
int noquota = 0;
LINVRNT(osc_page_protected(env, opg, CLM_WRITE, 0));
ENTRY;
/* Set the OBD_BRW_SRVLOCK before the page is queued. */
- brw_flags = opg->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
+ brw_flags |= opg->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
if (!client_is_remote(osc_export(obj)) &&
cfs_capable(CFS_CAP_SYS_RESOURCE)) {
brw_flags |= OBD_BRW_NOQUOTA;
{
if (p1->flag != p2->flag) {
unsigned mask = ~(OBD_BRW_FROM_GRANT|
- OBD_BRW_NOCACHE|OBD_BRW_SYNC);
+ OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC);
/* warn if we try to combine flags that we don't know to be
* safe to combine */
}
OBDO_FREE(aa->aa_oa);
} else { /* from async_internal() */
- int i;
+ obd_count i;
for (i = 0; i < aa->aa_page_count; i++)
osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
if (!async)
cl_req_completion(env, aa->aa_clerq, rc);
osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
+
RETURN(rc);
}
if (desc)
ptlrpc_free_bulk(desc);
out:
- /* XXX: don't send reply if obd rdonly mode, this can cause data loss
- * on client, see bug 22190. Remove this when async bulk will be done.
- * Meanwhile, if this is umount then don't reply anything. */
- if (req->rq_export->exp_obd->obd_no_transno) {
- no_reply = req->rq_export->exp_obd->obd_stopping;
- rc = -EIO;
- }
-
if (rc == 0) {
oti_to_request(oti, req);
target_committed_to_req(req);
return rc;
}
+/* Ensure that data and metadata are synced to the disk when lock is cancelled
+ * (if requested) */
+int ost_blocking_ast(struct ldlm_lock *lock,
+                     struct ldlm_lock_desc *desc,
+                     void *data, int flag)
+{
+        __u32 sync_lock_cancel = 0;
+        __u32 len = sizeof(sync_lock_cancel);
+        int rc = 0;
+        ENTRY;
+
+        /* ask the target obd which sync-on-lock-cancel policy is in force */
+        rc = obd_get_info(lock->l_export, sizeof(KEY_SYNC_LOCK_CANCEL),
+                          KEY_SYNC_LOCK_CANCEL, &len, &sync_lock_cancel, NULL);
+
+        /* sync only write-capable locks, and only when the policy asks for
+         * it: ALWAYS, or BLOCKING when a client cancel is actually pending */
+        if (!rc && flag == LDLM_CB_CANCELING &&
+            (lock->l_granted_mode & (LCK_PW|LCK_GROUP)) &&
+            (sync_lock_cancel == ALWAYS_SYNC_ON_CANCEL ||
+             (sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL &&
+              lock->l_flags & LDLM_FL_CBPENDING))) {
+                struct obdo *oa;
+                int err;
+
+                OBDO_ALLOC(oa);
+                /* allocation can fail; without this check we would
+                 * dereference a NULL obdo below */
+                if (oa != NULL) {
+                        oa->o_id = lock->l_resource->lr_name.name[0];
+                        oa->o_seq = lock->l_resource->lr_name.name[1];
+                        oa->o_valid = OBD_MD_FLID|OBD_MD_FLGROUP;
+
+                        /* flush only the extent covered by this lock */
+                        err = obd_sync(lock->l_export, oa, NULL,
+                                       lock->l_policy_data.l_extent.start,
+                                       lock->l_policy_data.l_extent.end,
+                                       NULL);
+                        if (err)
+                                CERROR("Error %d syncing data on lock "
+                                       "cancel\n", err);
+
+                        OBDO_FREE(oa);
+                } else {
+                        CERROR("Not enough memory to sync data on lock "
+                               "cancel\n");
+                }
+        }
+
+        /* always fall through to the generic server blocking AST */
+        return ldlm_server_blocking_ast(lock, desc, data, flag);
+}
+
static int ost_filter_recovery_request(struct ptlrpc_request *req,
struct obd_device *obd, int *process)
{
if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE))
RETURN(0);
rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
- ldlm_server_blocking_ast,
+ ost_blocking_ast,
ldlm_server_glimpse_ast);
fail = OBD_FAIL_OST_LDLM_REPLY_NET;
break;
desc->bd_nob += len;
+ cfs_page_pin(page);
ptlrpc_add_bulk_page(desc, page, pageoffset, len);
}
*/
void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
{
+ int i;
ENTRY;
LASSERT(desc != NULL);
else
class_import_put(desc->bd_import);
+ for (i = 0; i < desc->bd_iov_count ; i++)
+ cfs_page_unpin(desc->bd_iov[i].kiov_page);
+
OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
bd_iov[desc->bd_max_iov]));
EXIT;
lustre_msg_get_last_committed(req->rq_repmsg);
}
ptlrpc_free_committed(imp);
+
+ if (req->rq_transno > imp->imp_peer_committed_transno)
+ ptlrpc_pinger_commit_expected(imp);
+
cfs_spin_unlock(&imp->imp_lock);
}
ENTRY;
LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
- /* Not handling automatic bulk replay yet (or ever?) */
- LASSERT(req->rq_bulk == NULL);
LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
aa = ptlrpc_req_async_args(req);
/* XXX Registering the same xid on retried bulk makes my head
* explode trying to understand how the original request's bulk
- * might interfere with the retried request -eeb */
- LASSERTF (!desc->bd_registered || req->rq_xid != desc->bd_last_xid,
- "registered: %d rq_xid: "LPU64" bd_last_xid: "LPU64"\n",
- desc->bd_registered, req->rq_xid, desc->bd_last_xid);
+ * might interfere with the retried request -eeb
+ * On the other hand replaying with the same xid is fine, since
+ * we are guaranteed old request have completed. -green */
+ LASSERTF(!(desc->bd_registered &&
+ req->rq_send_state != LUSTRE_IMP_REPLAY) ||
+ req->rq_xid != desc->bd_last_xid,
+ "registered: %d rq_xid: "LPU64" bd_last_xid: "LPU64"\n",
+ desc->bd_registered, req->rq_xid, desc->bd_last_xid);
desc->bd_registered = 1;
desc->bd_last_xid = req->rq_xid;
RETURN(0);
}
-void ptlrpc_update_next_ping(struct obd_import *imp)
+void ptlrpc_update_next_ping(struct obd_import *imp, int soon)
{
#ifdef ENABLE_PINGER
- int time = PING_INTERVAL;
+ int time = soon ? PING_INTERVAL_SHORT : PING_INTERVAL;
if (imp->imp_state == LUSTRE_IMP_DISCON) {
int dtime = max_t(int, CONNECTION_SWITCH_MIN,
AT_OFF ? 0 :
cfs_time_after(imp->imp_next_ping,
cfs_time_add(this_ping,
cfs_time_seconds(PING_INTERVAL))))
- ptlrpc_update_next_ping(imp);
+ ptlrpc_update_next_ping(imp, 0);
}
cfs_mutex_up(&pinger_sem);
/* update memory usage info */
 void ptlrpc_pinger_sending_on_import(struct obd_import *imp)
 {
-        ptlrpc_update_next_ping(imp);
+        ptlrpc_update_next_ping(imp, 0);
+}
+
+/* a server commit is expected soon: shorten this import's ping interval so
+ * the client learns the new last_committed transno quickly */
+void ptlrpc_pinger_commit_expected(struct obd_import *imp)
+{
+        ptlrpc_update_next_ping(imp, 1);
 }
int ptlrpc_pinger_add_import(struct obd_import *imp)
imp->imp_obd->obd_uuid.uuid, obd2cli_tgt(imp->imp_obd));
/* if we add to pinger we want recovery on this import */
imp->imp_obd->obd_no_recov = 0;
- ptlrpc_update_next_ping(imp);
+ ptlrpc_update_next_ping(imp, 0);
/* XXX sort, blah blah */
cfs_list_add_tail(&imp->imp_pinger_chain, &pinger_imports);
class_import_get(imp);
{
#ifdef ENABLE_PINGER
cfs_mutex_down(&pinger_sem);
- ptlrpc_update_next_ping(imp);
+ ptlrpc_update_next_ping(imp, 0);
if (pinger_args.pd_set == NULL &&
cfs_time_before(imp->imp_next_ping, pinger_args.pd_next_ping)) {
CDEBUG(D_HA, "set next ping to "CFS_TIME_T"(cur "CFS_TIME_T")\n",
#endif
}
+/* A server commit is expected shortly: move this import's next ping up to
+ * the short interval so the client discovers the updated last_committed
+ * (and can free replay state) without waiting a full ping period. */
+void ptlrpc_pinger_commit_expected(struct obd_import *imp)
+{
+#ifdef ENABLE_PINGER
+        cfs_mutex_down(&pinger_sem);
+        ptlrpc_update_next_ping(imp, 1);
+        /* wake the pinger earlier if it is currently scheduled to fire
+         * later than our new deadline */
+        if (pinger_args.pd_set == NULL &&
+            cfs_time_before(imp->imp_next_ping, pinger_args.pd_next_ping)) {
+                CDEBUG(D_HA,"set next ping to "CFS_TIME_T"(cur "CFS_TIME_T")\n",
+                       imp->imp_next_ping, cfs_time_current());
+                pinger_args.pd_next_ping = imp->imp_next_ping;
+        }
+        cfs_mutex_up(&pinger_sem);
+#endif
+}
+
int ptlrpc_add_timeout_client(int time, enum timeout_event event,
timeout_cb_t cb, void *data,
cfs_list_t *obd_list)
int ptlrpc_start_pinger(void);
int ptlrpc_stop_pinger(void);
void ptlrpc_pinger_sending_on_import(struct obd_import *imp);
+void ptlrpc_pinger_commit_expected(struct obd_import *imp);
void ptlrpc_pinger_wake_up(void);
void ptlrpc_ping_import_soon(struct obd_import *imp);
#ifdef __KERNEL__
}
run_test 84a "stale open during export disconnect"
-test_85() { # bug 22190
- local fail=0
- do_facet ost1 "lctl set_param -n obdfilter.${ost1_svc}.sync_journal 1"
-
- replay_barrier ost1
- lfs setstripe -i 0 -c 1 $DIR/$tfile
- dd oflag=dsync if=/dev/urandom of=$DIR/$tfile bs=4k count=100 || fail=1
- fail_abort ost1
- echo "FAIL $fail"
- [ $fail -ne 0 ] || error "Write was successful"
-}
-run_test 85 "ensure there is no reply on bulk write if obd is in rdonly mode"
-
test_86() {
local clients=${CLIENTS:-$HOSTNAME}
}
run_test 86 "umount server after clear nid_stats should not hit LBUG"
+# Verify that bulk data written with async journal commit (sync_journal=0)
+# survives an OST failover: the checksum taken before the failure must
+# match the checksum read back after recovery (write replay).
+test_87() {
+    do_facet ost1 "lctl set_param -n obdfilter.${ost1_svc}.sync_journal 0"
+
+    replay_barrier ost1
+    lfs setstripe -i 0 -c 1 $DIR/$tfile
+    dd if=/dev/urandom of=$DIR/$tfile bs=1024k count=8 || error "Cannot write"
+    cksum=`md5sum $DIR/$tfile | awk '{print $1}'`
+    cancel_lru_locks osc
+    fail ost1
+    dd if=$DIR/$tfile of=/dev/null bs=1024k count=8 || error "Cannot read"
+    cksum2=`md5sum $DIR/$tfile | awk '{print $1}'`
+    if [ $cksum != $cksum2 ] ; then
+        error "New checksum $cksum2 does not match original $cksum"
+    fi
+}
+run_test 87 "write replay"
+
+# Same as test_87 but the file is modified again after the initial write,
+# so the replayed bulk no longer matches the data the client first sent —
+# exercising the checksum-mismatch resend path during replay.
+test_87b() {
+    do_facet ost1 "lctl set_param -n obdfilter.${ost1_svc}.sync_journal 0"
+
+    replay_barrier ost1
+    lfs setstripe -i 0 -c 1 $DIR/$tfile
+    dd if=/dev/urandom of=$DIR/$tfile bs=1024k count=8 || error "Cannot write"
+    sleep 1 # Give it a chance to flush dirty data
+    echo TESTTEST | dd of=$DIR/$tfile bs=1 count=8 seek=64
+    cksum=`md5sum $DIR/$tfile | awk '{print $1}'`
+    cancel_lru_locks osc
+    fail ost1
+    dd if=$DIR/$tfile of=/dev/null bs=1024k count=8 || error "Cannot read"
+    cksum2=`md5sum $DIR/$tfile | awk '{print $1}'`
+    if [ $cksum != $cksum2 ] ; then
+        error "New checksum $cksum2 does not match original $cksum"
+    fi
+}
+run_test 87b "write replay with changed data (checksum resend)"
+
equals_msg `basename $0`: test complete, cleaning up
check_and_cleanup_lustre
[ -f "$TESTSUITELOG" ] && cat $TESTSUITELOG && grep -q FAIL $TESTSUITELOG && exit 1 || true