#define OBD_CONNECT_JOBSTATS 0x20000000000ULL /* jobid in ptlrpc_body */
#define OBD_CONNECT_UMASK 0x40000000000ULL /* create uses client umask */
#define OBD_CONNECT_EINPROGRESS 0x80000000000ULL /* client handles -EINPROGRESS
- * write RPC error properly */
+ * RPC error properly */
#define OBD_CONNECT_GRANT_PARAM 0x100000000000ULL/* extra grant params used for
* finer space reservation */
#define OBD_CONNECT_FLOCK_OWNER 0x200000000000ULL /* for the fixed 1.8
OBD_CONNECT_NODEVOH | OBD_CONNECT_ATTRFID | \
OBD_CONNECT_CANCELSET | OBD_CONNECT_AT | \
LRU_RESIZE_CONNECT_FLAG | OBD_CONNECT_VBR |\
- OBD_CONNECT_LOV_V3)
+ OBD_CONNECT_LOV_V3 | OBD_CONNECT_EINPROGRESS)
#define OST_CONNECT_SUPPORTED (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_INDEX | \
OBD_CONNECT_CANCELSET | OBD_CONNECT_AT | \
LRU_RESIZE_CONNECT_FLAG | OBD_CONNECT_CKSUM | \
OBD_CONNECT_VBR | OBD_CONNECT_CHANGE_QS | \
- OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN)
+ OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN | \
+ OBD_CONNECT_EINPROGRESS)
#define ECHO_CONNECT_SUPPORTED (0)
#define MGS_CONNECT_SUPPORTED (OBD_CONNECT_VERSION | OBD_CONNECT_AT)
/* the request is queued to replay during recovery */
rq_copy_queued:1,
/* whether the "rq_set" is a valid one */
- rq_invalid_rqset:1;
+ rq_invalid_rqset:1,
+ rq_generation_set:1,
+ /* do not resend request on -EINPROGRESS */
+ rq_no_retry_einprogress:1;
+
+ unsigned int rq_nr_resend;
+
enum rq_phase rq_phase; /* one of RQ_PHASE_* */
enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */
atomic_t rq_refcount; /* client-side refcount for SENT race,
#define OBD_FAIL_MDS_LOV_PREP_CREATE 0x141
#define OBD_FAIL_MDS_SPLIT_OPEN 0x142
#define OBD_FAIL_MDS_READLINK_EPROTO 0x143
+#define OBD_FAIL_MDS_DQACQ_NET 0x187
#define OBD_FAIL_OST 0x200
#define OBD_FAIL_OST_CONNECT_NET 0x201
#define OBD_FAIL_OST_NOMEM 0x226
#define OBD_FAIL_OST_BRW_PAUSE_BULK2 0x227
#define OBD_FAIL_OST_MAPBLK_ENOSPC 0x228
+#define OBD_FAIL_OST_DQACQ_NET 0x230
+#define OBD_FAIL_OST_STATFS_EINPROGRESS 0x231
#define OBD_FAIL_LDLM 0x300
#define OBD_FAIL_LDLM_NAMESPACE_NEW 0x301
obd_register_lock_cancel_cb(obd, llu_extent_lock_cancel_cb);
ocd.ocd_connect_flags = OBD_CONNECT_SRVLOCK | OBD_CONNECT_REQPORTAL |
- OBD_CONNECT_VERSION | OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_AT;
+ OBD_CONNECT_VERSION | OBD_CONNECT_TRUNCLOCK |
+ OBD_CONNECT_AT | OBD_CONNECT_EINPROGRESS;
ocd.ocd_version = LUSTRE_VERSION_CODE;
err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid, &ocd, &sbi->ll_osc_exp);
if (err) {
OBD_CONNECT_NODEVOH | OBD_CONNECT_CANCELSET |
OBD_CONNECT_AT | OBD_CONNECT_FID |
OBD_CONNECT_VBR | OBD_CONNECT_LOV_V3 |
- OBD_CONNECT_64BITHASH;
+ OBD_CONNECT_64BITHASH |
+ OBD_CONNECT_EINPROGRESS;
#ifdef HAVE_LRU_RESIZE_SUPPORT
if (sbi->ll_flags & LL_SBI_LRU_RESIZE)
data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE |
OBD_CONNECT_SRVLOCK | OBD_CONNECT_CANCELSET|
OBD_CONNECT_AT | OBD_CONNECT_FID |
- OBD_CONNECT_VBR | OBD_CONNECT_TRUNCLOCK;
+ OBD_CONNECT_VBR | OBD_CONNECT_TRUNCLOCK|
+ OBD_CONNECT_EINPROGRESS;
if (!OBD_FAIL_CHECK(OBD_FAIL_OSC_CONNECT_CKSUM)) {
/* OBD_CONNECT_CKSUM should always be set, even if checksums are
struct obd_device *obddev = class_exp2obd(exp);
struct ldlm_res_id res_id;
ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
- int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
+ int flags;
+ int generation, resends = 0;
+ struct ldlm_reply *lockrep;
int rc;
ENTRY;
fid_build_reg_res_name((void *)&data->fid1, &res_id);
LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
+
+ generation = obddev->u.cli.cl_import->imp_generation;
+resend:
+ flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
if (!req)
RETURN(-ENOMEM);
+ if (it->it_op & IT_CREAT)
+ /* ask ptlrpc not to resend on EINPROGRESS since we have our own
+ * retry logic */
+ req->rq_no_retry_einprogress = 1;
+
+ if (resends) {
+ req->rq_generation_set = 1;
+ req->rq_import_generation = generation;
+ req->rq_sent = CURRENT_SECONDS + resends;
+ }
+
/* It is important to obtain rpc_lock first (if applicable), so that
* threads that are serialised with rpc_lock are not polluting our
* rpcs in flight counter */
ptlrpc_req_finished(req);
RETURN(rc);
}
+
+ lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
+ sizeof(*lockrep));
+ LASSERT(lockrep != NULL);
+
+ /* Retry the create infinitely when we get -EINPROGRESS from
+ * server. This is required by the new quota design. */
+ if (it->it_op & IT_CREAT &&
+ (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
+ mdc_clear_replay_flag(req, rc);
+ ptlrpc_req_finished(req);
+ resends++;
+
+ CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
+ obddev->obd_name, resends, it->it_op,
+ PFID((void *)&data->fid1),
+ PFID((void *)&data->fid2));
+
+ if (generation == obddev->u.cli.cl_import->imp_generation) {
+ goto resend;
+ } else {
+ CDEBUG(D_HA, "resend cross eviction\n");
+ RETURN(-EIO);
+ }
+ }
+
rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
RETURN(rc);
CFS_LIST_HEAD(cancels);
struct obd_device *obd = exp->exp_obd;
struct ptlrpc_request *req;
- int level, bufcount = 3, rc;
- __u32 size[6] = { sizeof(struct ptlrpc_body),
- sizeof(struct mds_rec_create),
- op_data->namelen + 1, 0, sizeof(struct ldlm_request) };
- int offset = REQ_REC_OFF + 3;
- int count;
+ int level, bufcount, rc;
+ __u32 size[6];
+ int offset;
+ int count, resends = 0;
+ struct obd_import *import = obd->u.cli.cl_import;
+ int generation = import->imp_generation;
ENTRY;
if (mdc_exp_is_2_0_server(exp)) {
+ struct client_obd *cli = &obd->u.cli;
+ rc = mdc_fid_alloc(cli->cl_seq, (void *)&op_data->fid2);
+ if (rc) {
+ CERROR("fid allocation result: %d\n", rc);
+ RETURN(rc);
+ }
+ }
+
+rebuild:
+ size[0] = sizeof(struct ptlrpc_body);
+ size[1] = sizeof(struct mds_rec_create);
+ size[2] = op_data->namelen + 1;
+ size[3] = 0;
+ size[4] = sizeof(struct ldlm_request);
+ offset = REQ_REC_OFF + 3;
+ bufcount = 3;
+
+ if (mdc_exp_is_2_0_server(exp)) {
size[REQ_REC_OFF] = sizeof(struct mdt_rec_create);
size[REQ_REC_OFF + 1] = 0; /* capa */
size[REQ_REC_OFF + 2] = op_data->namelen + 1;
}
}
- if (mdc_exp_is_2_0_server(exp)) {
- struct client_obd *cli = &obd->u.cli;
- rc = mdc_fid_alloc(cli->cl_seq, (void *)&op_data->fid2);
- if (rc) {
- CERROR("fid allocation result: %d\n", rc);
- RETURN(rc);
- }
- }
-
req = mdc_prep_elc_req(exp, bufcount, size,
offset, &cancels, count);
if (req == NULL)
size[REPLY_REC_OFF+1] = sizeof(struct ost_lvb);
ptlrpc_req_set_repsize(req, 3, size);
+ /* ask ptlrpc not to resend on EINPROGRESS since we have our own retry
+ * logic here */
+ req->rq_no_retry_einprogress = 1;
+
+ if (resends) {
+ req->rq_generation_set = 1;
+ req->rq_import_generation = generation;
+ req->rq_sent = cfs_time_current_sec() + resends;
+ }
level = LUSTRE_IMP_FULL;
- resend:
+resend:
rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, level);
/* Resend if we were told to. */
if (rc == -ERESTARTSYS) {
level = LUSTRE_IMP_RECOVER;
goto resend;
+ } else if (rc == -EINPROGRESS) {
+ /* Retry create infinitely until succeed or get other
+ * error code. */
+ ptlrpc_req_finished(req);
+ resends++;
+
+ CDEBUG(D_HA, "%s: resend:%d create on "DFID"/"DFID"\n",
+ obd->obd_name, resends,
+ PFID((void *)&op_data->fid1),
+ PFID((void *)&op_data->fid2));
+
+ if (generation == import->imp_generation)
+ goto rebuild;
+ CDEBUG(D_HA, "resend cross eviction\n");
+ RETURN(-EIO);
}
if (!rc)
NULL, NULL, 0);
ldlm_reply_set_disposition(rep, DISP_OPEN_CREATE);
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
+ GOTO(cleanup, rc = -EINPROGRESS);
+
handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_CREATE,
NULL);
if (IS_ERR(handle)) {
lquota_chkquota(mds_quota_interface_ref, req->rq_export, ids[0], ids[1],
1, quota_pending, NULL, NULL, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
+ GOTO(cleanup, rc = -EINPROGRESS);
+
switch (type) {
case S_IFREG:{
handle = fsfilt_start(obd, dir, FSFILT_OP_CREATE, NULL);
if (rc == -ENOTCONN)
GOTO(cleanup, rc);
+ if (OBD_FAIL_CHECK(OBD_FAIL_OST_DQACQ_NET))
+ GOTO(cleanup, rc = -EINPROGRESS);
+
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
cleanup_phase = 2;
static inline int osc_recoverable_error(int rc)
{
- return (rc == -EIO || rc == -EROFS || rc == -ENOMEM || rc == -EAGAIN);
+ /* -EINPROGRESS is treated as recoverable so BRW RPCs hitting it are
+  * resent by the caller instead of failed; the server returns it while
+  * quota acquisition is still in progress (see the retry logic in
+  * osc_brw_internal()/brw_interpret(), which retries -EINPROGRESS
+  * without counting it against the normal resend limit). */
+ return (rc == -EIO || rc == -EROFS || rc == -ENOMEM ||
+ rc == -EAGAIN || rc == -EINPROGRESS);
}
/* return 1 if osc should be resend request */
req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
ptlrpc_at_set_req_timeout(req);
+ /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
+ * retry logic */
+ req->rq_no_retry_einprogress = 1;
if (opc == OST_WRITE)
desc = ptlrpc_prep_bulk_imp (req, page_count,
struct ptlrpc_request *request;
int rc;
cfs_waitq_t waitq;
- int resends = 0;
+ int generation, resends = 0;
struct l_wait_info lwi;
ENTRY;
init_waitqueue_head(&waitq);
+ generation = exp->exp_obd->u.cli.cl_import->imp_generation;
restart_bulk:
rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
if (rc != 0)
return (rc);
+ if (resends) {
+ request->rq_generation_set = 1;
+ request->rq_import_generation = generation;
+ request->rq_sent = CURRENT_SECONDS + resends;
+ }
+
rc = ptlrpc_queue_wait(request);
if (rc == -ETIMEDOUT && request->rq_resend) {
rc = osc_brw_fini_request(request, rc);
ptlrpc_req_finished(request);
+ /* When server return -EINPROGRESS, client should always retry
+ * regardless of the number of times the bulk was resent already.*/
if (osc_recoverable_error(rc)) {
resends++;
- if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
- CERROR("too many resend retries, returning error\n");
- RETURN(-EIO);
+ if (rc != -EINPROGRESS &&
+ !osc_should_resend(resends, &exp->exp_obd->u.cli)) {
+ CERROR("%s: too many resend retries for object: "
+ ""LPU64", rc = %d.\n",
+ exp->exp_obd->obd_name, oa->o_id, rc);
+ goto out;
+ }
+ if (generation !=
+ exp->exp_obd->u.cli.cl_import->imp_generation) {
+ CDEBUG(D_HA, "%s: resend cross eviction for object: "
+ ""LPU64", rc = %d.\n",
+ exp->exp_obd->obd_name, oa->o_id, rc);
+ goto out;
}
- lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
+ lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
+ NULL);
l_wait_event(waitq, 0, &lwi);
goto restart_bulk;
}
- RETURN(rc);
+out:
+ if (rc == -EAGAIN || rc == -EINPROGRESS)
+ rc = -EIO;
+ RETURN (rc);
}
-int osc_brw_redo_request(struct ptlrpc_request *request,
- struct osc_brw_async_args *aa)
+static int osc_brw_redo_request(struct ptlrpc_request *request,
+ struct osc_brw_async_args *aa, int rc)
{
struct ptlrpc_request *new_req;
struct ptlrpc_request_set *set = request->rq_set;
struct osc_brw_async_args *new_aa;
struct osc_async_page *oap;
- int rc = 0;
ENTRY;
- if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
- CERROR("too many resent retries, returning error\n");
- RETURN(-EIO);
- }
-
- DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
+ DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
+ "redo for recoverable error %d", rc);
rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
aa->aa_resends++;
new_req->rq_interpret_reply = request->rq_interpret_reply;
new_req->rq_async_args = request->rq_async_args;
- new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
+ /* cap resend delay to the current request timeout, this is similar to
+ * what ptlrpc does (see after_reply()) */
+ if (aa->aa_resends > new_req->rq_timeout)
+ new_req->rq_sent = CURRENT_SECONDS + new_req->rq_timeout;
+ else
+ new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
+ new_req->rq_generation_set = 1;
+ new_req->rq_import_generation = request->rq_import_generation;
new_aa = ptlrpc_req_async_args(new_req);
rc = osc_brw_fini_request(request, rc);
CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
-
+ /* When server return -EINPROGRESS, client should always retry
+ * regardless of the number of times the bulk was resent already. */
if (osc_recoverable_error(rc)) {
/* Only retry once for mmaped files since the mmaped page
* might be modified at anytime. We have to retry at least
aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
aa->aa_oa->o_flags & OBD_FL_MMAP) {
rc = 0;
+ } else if (request->rq_import_generation !=
+ request->rq_import->imp_generation) {
+ CDEBUG(D_HA, "%s: resend cross eviction for object: "
+ ""LPU64", rc = %d.\n",
+ request->rq_import->imp_obd->obd_name,
+ aa->aa_oa->o_id, rc);
+ rc = -EIO;
+ } else if (rc == -EINPROGRESS ||
+ osc_should_resend(aa->aa_resends, aa->aa_cli)) {
+ rc = osc_brw_redo_request(request, aa, rc);
+ if (rc == 0)
+ RETURN(0);
} else {
- rc = osc_brw_redo_request(request, aa);
- if (rc == 0)
- RETURN(0);
+ CERROR("%s: too many resent retries for object: "
+ ""LPU64", rc = %d.\n",
+ request->rq_import->imp_obd->obd_name,
+ aa->aa_oa->o_id, rc);
+ rc = -EIO;
}
}
if (req->rq_status != 0)
CERROR("ost: statfs failed: rc %d\n", req->rq_status);
+ if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_EINPROGRESS))
+ req->rq_status = -EINPROGRESS;
+
RETURN(0);
}
LASSERT ((char *)req->rq_repmsg + req->rq_nob_received <=
(char *)req->rq_repbuf + req->rq_replen);
+
+ /* retry indefinitely on EINPROGRESS */
+ if (lustre_msg_get_status(req->rq_repmsg) == -EINPROGRESS &&
+ req->rq_no_resend == 0 && !req->rq_no_retry_einprogress) {
+ DEBUG_REQ(D_RPCTRACE, req, "Resending request on EINPROGRESS");
+ req->rq_resend = 1;
+ req->rq_nr_resend++;
+
+ /* allocate new xid to avoid reply reconstruction */
+ if (!req->rq_bulk) {
+ /* new xid is already allocated for bulk in
+ * ptlrpc_check_set() */
+ req->rq_xid = ptlrpc_next_xid();
+ DEBUG_REQ(D_RPCTRACE, req, "Allocating new xid for "
+ "resend on EINPROGRESS");
+ }
+
+ /* Readjust the timeout for current conditions */
+ ptlrpc_at_set_req_timeout(req);
+ /* delay resend to give a chance to the server to get ready.
+ * The delay is increased by 1s on every resend and is capped to
+ * the current request timeout (i.e. obd_timeout if AT is off,
+ * or AT service time x 125% + 5s, see at_est2timeout) */
+ if (req->rq_nr_resend > req->rq_timeout)
+ req->rq_sent = CURRENT_SECONDS + req->rq_timeout;
+ else
+ req->rq_sent = CURRENT_SECONDS + req->rq_nr_resend;
+ RETURN(0);
+ }
+
rc = unpack_reply(req);
if (rc)
RETURN(rc);
RETURN(rc);
}
+/**
+ * Helper function to send request \a req over the network for the first time
+ * Also adjusts request phase.
+ * Returns 0 on success or error code.
+ */
static int ptlrpc_send_new_req(struct ptlrpc_request *req)
{
- struct obd_import *imp;
+ struct obd_import *imp = req->rq_import;
int rc;
ENTRY;
LASSERT(req->rq_phase == RQ_PHASE_NEW);
- if (req->rq_sent && (req->rq_sent > CURRENT_SECONDS))
+ if (req->rq_sent && (req->rq_sent > cfs_time_current_sec()) &&
+ (!req->rq_generation_set ||
+ req->rq_import_generation == imp->imp_generation))
RETURN (0);
ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
- imp = req->rq_import;
spin_lock(&imp->imp_lock);
-
- req->rq_import_generation = imp->imp_generation;
+ if (!req->rq_generation_set)
+ req->rq_import_generation = imp->imp_generation;
if (ptlrpc_import_delay_req(imp, req, &rc)) {
spin_lock(&req->rq_lock);
/* delayed send - skip */
if (req->rq_phase == RQ_PHASE_NEW && req->rq_sent)
- continue;
+ continue;
+
+ /* delayed resend - skip */
+ if (req->rq_phase == RQ_PHASE_RPC && req->rq_resend &&
+ req->rq_sent > cfs_time_current_sec())
+ continue;
if (!(req->rq_phase == RQ_PHASE_RPC ||
req->rq_phase == RQ_PHASE_BULK ||
if (req->rq_phase == RQ_PHASE_NEW)
deadline = req->rq_sent; /* delayed send */
+ else if (req->rq_phase == RQ_PHASE_RPC && req->rq_resend)
+ deadline = req->rq_sent;
else
deadline = req->rq_deadline;
}
if (req->rq_status != -ENOSPC && req->rq_status != -EACCES &&
- req->rq_status != -EPERM)
+ req->rq_status != -EPERM && req->rq_status != -ENOENT &&
+ req->rq_status != -EINPROGRESS)
req->rq_type = PTL_RPC_MSG_ERR;
rc = ptlrpc_send_reply(req, may_be_difficult);
}
run_test 7 "Fail OST before obd_destroy"
+# Verify redo io on -EINPROGRESS: with fail_loc 0x230 the OST returns
+# -EINPROGRESS for the write, so a sync dd must keep retrying (still be
+# running after $TIMEOUT); after the fail_loc is cleared the write must
+# complete, the data must match, and no D_ERROR "redo for recoverable
+# error -115" messages may appear (EINPROGRESS resends are logged quietly).
+test_8a() {
+ [ -z "$(lctl get_param -n osc.${FSNAME}-*.connect_flags|grep einprogress)" \
+ ] && skip_env "OSTs don't support EINPROGRESS" && return
+ verify=$ROOT/tmp/verify-$$
+ dd if=/dev/urandom of=$verify bs=4096 count=1280 ||
+ error "Create verify file failed"
+#define OBD_FAIL_OST_DQACQ_NET 0x230
+ do_facet ost1 "lctl set_param fail_loc=0x230"
+ dd if=$verify of=$TDIR/$tfile bs=4096 count=1280 oflag=sync &
+ ddpid=$!
+ sleep $TIMEOUT # wait for the io to become redo io
+ # dd must still be retrying; an early exit means the redo logic gave up
+ if ! ps -p $ddpid > /dev/null 2>&1; then
+ error "redo io finished incorrectly"
+ return 1
+ fi
+ do_facet ost1 "lctl set_param fail_loc=0"
+ wait $ddpid || true
+ cancel_lru_locks osc
+ cmp $verify $TDIR/$tfile || return 2
+ rm -f $verify $TDIR/$tfile
+ # -115 == -EINPROGRESS; such resends must not be logged at D_ERROR
+ message=`dmesg | grep "redo for recoverable error -115"`
+ [ -z "$message" ] || error "redo error messages found in dmesg"
+}
+run_test 8a "Verify redo io: redo io when get -EINPROGRESS error"
+
+# Verify redo io survives OST failover: while fail_loc 0x230 forces
+# -EINPROGRESS on the write, fail/restart ost1; after recovery completes
+# and the fail_loc is cleared, the pending redo io must finish
+# successfully and the written data must match the source file.
+test_8b() {
+ [ -z "$(lctl get_param -n osc.${FSNAME}-*.connect_flags|grep einprogress)" \
+ ] && skip_env "OSTs don't support EINPROGRESS" && return
+ verify=$ROOT/tmp/verify-$$
+ dd if=/dev/urandom of=$verify bs=4096 count=1280 ||
+ error "Create verify file failed"
+#define OBD_FAIL_OST_DQACQ_NET 0x230
+ do_facet ost1 "lctl set_param fail_loc=0x230"
+ dd if=$verify of=$TDIR/$tfile bs=4096 count=1280 oflag=sync &
+ ddpid=$!
+ sleep $TIMEOUT # wait for the io to become redo io
+ fail ost1
+ do_facet ost1 "lctl set_param fail_loc=0"
+ wait $ddpid || return 1
+ cancel_lru_locks osc
+ cmp $verify $TDIR/$tfile || return 2
+ rm -f $verify $TDIR/$tfile
+}
+# NOTE(review): description reads "should success" - s/success/succeed/
+run_test 8b "Verify redo io: redo io should success after recovery"
+
+# Verify redo io is aborted by eviction: while fail_loc 0x230 forces
+# -EINPROGRESS, evict the client from the OST. The redo io must NOT be
+# replayed across the eviction (the resend-cross-eviction path returns
+# -EIO), so dd is expected to fail and the file must NOT match the
+# source - cmp succeeding here is the test failure.
+test_8c() {
+ [ -z "$(lctl get_param -n osc.${FSNAME}-*.connect_flags|grep einprogress)" \
+ ] && skip_env "OSTs don't support EINPROGRESS" && return
+ verify=$ROOT/tmp/verify-$$
+ dd if=/dev/urandom of=$verify bs=4096 count=1280 ||
+ error "Create verify file failed"
+#define OBD_FAIL_OST_DQACQ_NET 0x230
+ do_facet ost1 "lctl set_param fail_loc=0x230"
+ dd if=$verify of=$TDIR/$tfile bs=4096 count=1280 oflag=sync &
+ ddpid=$!
+ sleep $TIMEOUT # wait for the io to become redo io
+ ost_evict_client
+ # allow recovery to complete
+ sleep $((TIMEOUT + 2))
+ do_facet ost1 "lctl set_param fail_loc=0"
+ # dd is expected to fail after eviction; exit status intentionally ignored
+ wait $ddpid
+ cancel_lru_locks osc
+ cmp $verify $TDIR/$tfile && return 2
+ rm -f $verify $TDIR/$tfile
+}
+run_test 8c "Verify redo io: redo io should fail after eviction"
+
+# Verify MDS create retry on -EINPROGRESS: fail_loc 0x187
+# (OBD_FAIL_MDS_DQACQ_NET) makes the MDS return -EINPROGRESS on create.
+# Both the non-intent path (mcreate) and the intent path (openfile with
+# O_CREAT) must keep retrying (still be running after $TIMEOUT) and then
+# succeed once the fail_loc is cleared.
+test_8d() {
+ [ -z "$(lctl get_param -n mdc.${FSNAME}-*.connect_flags|grep einprogress)" \
+ ] && skip_env "MDS doesn't support EINPROGRESS" && return
+#define OBD_FAIL_MDS_DQACQ_NET 0x187
+ do_facet $SINGLEMDS "lctl set_param fail_loc=0x187"
+ # test the non-intent create path
+ mcreate $TDIR/$tfile &
+ cpid=$!
+ sleep $TIMEOUT
+ # mcreate must still be retrying; an early exit means retry logic gave up
+ if ! ps -p $cpid > /dev/null 2>&1; then
+ error "mknod finished incorrectly"
+ return 1
+ fi
+ do_facet $SINGLEMDS "lctl set_param fail_loc=0"
+ wait $cpid || return 2
+ stat $TDIR/$tfile || error "mknod failed"
+
+ rm $TDIR/$tfile
+
+#define OBD_FAIL_MDS_DQACQ_NET 0x187
+ do_facet $SINGLEMDS "lctl set_param fail_loc=0x187"
+ # test the intent create path
+ openfile -f O_RDWR:O_CREAT $TDIR/$tfile &
+ cpid=$!
+ sleep $TIMEOUT
+ # open must still be retrying at this point as well
+ if ! ps -p $cpid > /dev/null 2>&1; then
+ error "open finished incorrectly"
+ return 3
+ fi
+ do_facet $SINGLEMDS "lctl set_param fail_loc=0"
+ wait $cpid || return 4
+ stat $TDIR/$tfile || error "open failed"
+}
+run_test 8d "Verify redo creation on -EINPROGRESS"
+
+# Verify ptlrpc-level resend on -EINPROGRESS: fail_loc 0x231
+# (OBD_FAIL_OST_STATFS_EINPROGRESS) makes OST statfs replies carry
+# -EINPROGRESS, so df must block while ptlrpc keeps resending the
+# request; df completing before the fail_loc is cleared is a failure.
+test_8e() {
+ [ -z "$(lctl get_param -n osc.${FSNAME}-*.connect_flags|grep einprogress)" \
+ ] && skip_env "OSTs don't support EINPROGRESS" && return
+ sleep 1 # ensure we have a fresh statfs
+#define OBD_FAIL_OST_STATFS_EINPROGRESS 0x231
+ do_facet ost1 "lctl set_param fail_loc=0x231"
+ df $MOUNT &
+ dfpid=$!
+ sleep $TIMEOUT
+ # df must still be blocked in the resend loop at this point
+ if ! ps -p $dfpid > /dev/null 2>&1; then
+ do_facet ost1 "lctl set_param fail_loc=0"
+ error "df shouldn't have completed!"
+ return 1
+ fi
+ do_facet ost1 "lctl set_param fail_loc=0"
+}
+run_test 8e "Verify that ptlrpc resends request on -EINPROGRESS"
+
complete $(basename $0) $SECONDS
check_and_cleanup_lustre
exit_status