Whamcloud - gitweb
LU-2371 ptlrpc: add support for -EINPROGRESS
authorNiu Yawei <niu@whamcloud.com>
Fri, 13 Jan 2012 08:33:22 +0000 (00:33 -0800)
committerJohann Lombardi <johann.lombardi@intel.com>
Tue, 11 Dec 2012 14:56:48 +0000 (09:56 -0500)
Backport patches from LU-904, LU-1329 and LU-1788 to introduce
support for -EINPROGRESS in lustre 1.8. This is needed for
quota interoperability with 2.4 servers.

Signed-off-by: Johann Lombardi <johann@whamcloud.com>
Change-Id: I9136112ca82dbf6caba41c2d41643ec646372852
Signed-off-by: Niu Yawei <niu@whamcloud.com>
Reviewed-on: http://review.whamcloud.com/4655
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Johann Lombardi <johann.lombardi@intel.com>
16 files changed:
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_net.h
lustre/include/obd_support.h
lustre/liblustre/super.c
lustre/llite/llite_lib.c
lustre/mdc/mdc_locks.c
lustre/mdc/mdc_reint.c
lustre/mds/mds_open.c
lustre/mds/mds_reint.c
lustre/obdfilter/filter_io_26.c
lustre/osc/osc_internal.h
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/client.c
lustre/ptlrpc/niobuf.c
lustre/tests/replay-ost-single.sh

index f0e794e..0ed459d 100644 (file)
@@ -368,7 +368,7 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb, int msgsize);
 #define OBD_CONNECT_JOBSTATS    0x20000000000ULL /* jobid in ptlrpc_body */
 #define OBD_CONNECT_UMASK       0x40000000000ULL /* create uses client umask */
 #define OBD_CONNECT_EINPROGRESS 0x80000000000ULL /* client handles -EINPROGRESS
-                                                  * write RPC error properly */
+                                                  * RPC error properly */
 #define OBD_CONNECT_GRANT_PARAM 0x100000000000ULL/* extra grant params used for
                                                   * finer space reservation */
 #define OBD_CONNECT_FLOCK_OWNER  0x200000000000ULL /* for the fixed 1.8
@@ -396,7 +396,7 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb, int msgsize);
                                 OBD_CONNECT_NODEVOH | OBD_CONNECT_ATTRFID | \
                                 OBD_CONNECT_CANCELSET | OBD_CONNECT_AT | \
                                 LRU_RESIZE_CONNECT_FLAG | OBD_CONNECT_VBR |\
-                                OBD_CONNECT_LOV_V3)
+                                OBD_CONNECT_LOV_V3 | OBD_CONNECT_EINPROGRESS)
 #define OST_CONNECT_SUPPORTED  (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
                                 OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
                                 OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_INDEX | \
@@ -404,7 +404,8 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb, int msgsize);
                                 OBD_CONNECT_CANCELSET | OBD_CONNECT_AT | \
                                 LRU_RESIZE_CONNECT_FLAG | OBD_CONNECT_CKSUM | \
                                 OBD_CONNECT_VBR | OBD_CONNECT_CHANGE_QS | \
-                                OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN)
+                                OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN | \
+                                OBD_CONNECT_EINPROGRESS)
 #define ECHO_CONNECT_SUPPORTED (0)
 #define MGS_CONNECT_SUPPORTED  (OBD_CONNECT_VERSION | OBD_CONNECT_AT)
 
index c1b9a10..de38452 100644 (file)
@@ -335,7 +335,13 @@ struct ptlrpc_request {
                 /* the request is queued to replay during recovery */
                 rq_copy_queued:1,
                 /* whether the "rq_set" is a valid one */
-                rq_invalid_rqset:1;
+                rq_invalid_rqset:1,
+                rq_generation_set:1,
+                /* do not resend request on -EINPROGRESS */
+                rq_no_retry_einprogress:1;
+
+        unsigned int rq_nr_resend;
+
         enum rq_phase rq_phase;     /* one of RQ_PHASE_* */
         enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */
         atomic_t rq_refcount;   /* client-side refcount for SENT race,
index ada8d35..3ca828b 100644 (file)
@@ -193,6 +193,7 @@ extern unsigned int obd_alloc_fail_rate;
 #define OBD_FAIL_MDS_LOV_PREP_CREATE     0x141
 #define OBD_FAIL_MDS_SPLIT_OPEN          0x142
 #define OBD_FAIL_MDS_READLINK_EPROTO     0x143
+#define OBD_FAIL_MDS_DQACQ_NET           0x187
 
 #define OBD_FAIL_OST                     0x200
 #define OBD_FAIL_OST_CONNECT_NET         0x201
@@ -235,6 +236,8 @@ extern unsigned int obd_alloc_fail_rate;
 #define OBD_FAIL_OST_NOMEM               0x226
 #define OBD_FAIL_OST_BRW_PAUSE_BULK2     0x227
 #define OBD_FAIL_OST_MAPBLK_ENOSPC       0x228
+#define OBD_FAIL_OST_DQACQ_NET           0x230
+#define OBD_FAIL_OST_STATFS_EINPROGRESS  0x231
 
 #define OBD_FAIL_LDLM                    0x300
 #define OBD_FAIL_LDLM_NAMESPACE_NEW      0x301
index 1da4955..b87a777 100644 (file)
@@ -2079,7 +2079,8 @@ llu_fsswop_mount(const char *source,
         obd_register_lock_cancel_cb(obd, llu_extent_lock_cancel_cb);
 
         ocd.ocd_connect_flags = OBD_CONNECT_SRVLOCK | OBD_CONNECT_REQPORTAL |
-                OBD_CONNECT_VERSION | OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_AT;
+                                OBD_CONNECT_VERSION | OBD_CONNECT_TRUNCLOCK |
+                                OBD_CONNECT_AT | OBD_CONNECT_EINPROGRESS;
         ocd.ocd_version = LUSTRE_VERSION_CODE;
         err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid, &ocd, &sbi->ll_osc_exp);
         if (err) {
index 0995701..891ee0e 100644 (file)
@@ -288,7 +288,8 @@ static int client_common_fill_super(struct super_block *sb,
                                   OBD_CONNECT_NODEVOH | OBD_CONNECT_CANCELSET  |
                                   OBD_CONNECT_AT      | OBD_CONNECT_FID        |
                                   OBD_CONNECT_VBR     | OBD_CONNECT_LOV_V3     |
-                                  OBD_CONNECT_64BITHASH;
+                                  OBD_CONNECT_64BITHASH |
+                                  OBD_CONNECT_EINPROGRESS;
 #ifdef HAVE_LRU_RESIZE_SUPPORT
         if (sbi->ll_flags & LL_SBI_LRU_RESIZE)
                 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
@@ -399,7 +400,8 @@ static int client_common_fill_super(struct super_block *sb,
                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE |
                                   OBD_CONNECT_SRVLOCK   | OBD_CONNECT_CANCELSET|
                                   OBD_CONNECT_AT        | OBD_CONNECT_FID      |
-                                  OBD_CONNECT_VBR       | OBD_CONNECT_TRUNCLOCK;
+                                  OBD_CONNECT_VBR       | OBD_CONNECT_TRUNCLOCK|
+                                  OBD_CONNECT_EINPROGRESS;
 
         if (!OBD_FAIL_CHECK(OBD_FAIL_OSC_CONNECT_CKSUM)) {
                 /* OBD_CONNECT_CKSUM should always be set, even if checksums are
index fceff9e..d91a85e 100644 (file)
@@ -603,12 +603,18 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
         struct obd_device *obddev = class_exp2obd(exp);
         struct ldlm_res_id res_id;
         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
-        int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
+        int flags;
+        int generation, resends = 0;
+        struct ldlm_reply *lockrep;
         int rc;
         ENTRY;
 
         fid_build_reg_res_name((void *)&data->fid1, &res_id);
         LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
+
+        generation = obddev->u.cli.cl_import->imp_generation;
+resend:
+        flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
         if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
 
@@ -640,6 +646,17 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
         if (!req)
                 RETURN(-ENOMEM);
 
+        if (it->it_op & IT_CREAT)
+                /* ask ptlrpc not to resend on EINPROGRESS since we have our own
+                 * retry logic */
+                req->rq_no_retry_einprogress = 1;
+
+        if (resends) {
+                req->rq_generation_set = 1;
+                req->rq_import_generation = generation;
+                req->rq_sent = CURRENT_SECONDS + resends;
+        }
+
          /* It is important to obtain rpc_lock first (if applicable), so that
           * threads that are serialised with rpc_lock are not polluting our
           * rpcs in flight counter */
@@ -658,6 +675,32 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                 ptlrpc_req_finished(req);
                 RETURN(rc);
         }
+
+        lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
+                                 sizeof(*lockrep));
+        LASSERT(lockrep != NULL);
+
+        /* Retry the create indefinitely when we get -EINPROGRESS from
+         * the server. This is required by the new quota design. */
+        if (it->it_op & IT_CREAT &&
+            (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
+                mdc_clear_replay_flag(req, rc);
+                ptlrpc_req_finished(req);
+                resends++;
+
+                CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
+                       obddev->obd_name, resends, it->it_op,
+                       PFID((void *)&data->fid1),
+                       PFID((void *)&data->fid2));
+
+                if (generation == obddev->u.cli.cl_import->imp_generation) {
+                        goto resend;
+                } else {
+                        CDEBUG(D_HA, "resend cross eviction\n");
+                        RETURN(-EIO);
+                }
+        }
+
         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
 
         RETURN(rc);
index 8eadea5..f010428 100644 (file)
@@ -201,15 +201,33 @@ int mdc_create(struct obd_export *exp, struct mdc_op_data *op_data,
         CFS_LIST_HEAD(cancels);
         struct obd_device *obd = exp->exp_obd;
         struct ptlrpc_request *req;
-        int level, bufcount = 3, rc;
-        __u32 size[6] = { sizeof(struct ptlrpc_body),
-                        sizeof(struct mds_rec_create),
-                        op_data->namelen + 1, 0, sizeof(struct ldlm_request) };
-        int offset = REQ_REC_OFF + 3;
-        int count;
+        int level, bufcount, rc;
+        __u32 size[6];
+        int offset;
+        int count, resends = 0;
+        struct obd_import *import = obd->u.cli.cl_import;
+        int generation = import->imp_generation;
         ENTRY;
 
         if (mdc_exp_is_2_0_server(exp)) {
+                struct client_obd *cli = &obd->u.cli;
+                rc = mdc_fid_alloc(cli->cl_seq, (void *)&op_data->fid2);
+                if (rc) {
+                        CERROR("fid allocation result: %d\n", rc);
+                        RETURN(rc);
+                }
+        }
+
+rebuild:
+        size[0] = sizeof(struct ptlrpc_body);
+        size[1] = sizeof(struct mds_rec_create);
+        size[2] = op_data->namelen + 1;
+        size[3] = 0;
+        size[4] = sizeof(struct ldlm_request);
+        offset = REQ_REC_OFF + 3;
+        bufcount = 3;
+
+        if (mdc_exp_is_2_0_server(exp)) {
                 size[REQ_REC_OFF] = sizeof(struct mdt_rec_create);
                 size[REQ_REC_OFF + 1] = 0; /* capa */
                 size[REQ_REC_OFF + 2] = op_data->namelen + 1;
@@ -232,15 +250,6 @@ int mdc_create(struct obd_export *exp, struct mdc_op_data *op_data,
                 }
         }
 
-        if (mdc_exp_is_2_0_server(exp)) {
-                struct client_obd *cli = &obd->u.cli;
-                rc = mdc_fid_alloc(cli->cl_seq, (void *)&op_data->fid2);
-                if (rc) {
-                        CERROR("fid allocation result: %d\n", rc);
-                        RETURN(rc);
-                }
-        }
-
         req = mdc_prep_elc_req(exp, bufcount, size,
                                offset, &cancels, count);
         if (req == NULL)
@@ -255,13 +264,37 @@ int mdc_create(struct obd_export *exp, struct mdc_op_data *op_data,
         size[REPLY_REC_OFF+1] = sizeof(struct ost_lvb);
         ptlrpc_req_set_repsize(req, 3, size);
 
+        /* ask ptlrpc not to resend on EINPROGRESS since we have our own retry
+         * logic here */
+        req->rq_no_retry_einprogress = 1;
+
+        if (resends) {
+                req->rq_generation_set = 1;
+                req->rq_import_generation = generation;
+                req->rq_sent = cfs_time_current_sec() + resends;
+        }
         level = LUSTRE_IMP_FULL;
- resend:
+resend:
         rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, level);
         /* Resend if we were told to. */
         if (rc == -ERESTARTSYS) {
                 level = LUSTRE_IMP_RECOVER;
                 goto resend;
+        } else if (rc == -EINPROGRESS) {
+                /* Retry the create indefinitely until it succeeds or
+                 * fails with a different error code. */
+                ptlrpc_req_finished(req);
+                resends++;
+
+                CDEBUG(D_HA, "%s: resend:%d create on "DFID"/"DFID"\n",
+                       obd->obd_name, resends,
+                       PFID((void *)&op_data->fid1),
+                       PFID((void *)&op_data->fid2));
+
+                if (generation == import->imp_generation)
+                        goto rebuild;
+                CDEBUG(D_HA, "resend cross eviction\n");
+                RETURN(-EIO);
         }
 
         if (!rc)
index a629110..2d67f65 100644 (file)
@@ -1209,6 +1209,10 @@ int mds_open(struct mds_update_record *rec, int offset,
                                 NULL, NULL, 0);
 
                 ldlm_reply_set_disposition(rep, DISP_OPEN_CREATE);
+
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
+                        GOTO(cleanup, rc = -EINPROGRESS);
+
                 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_CREATE,
                                       NULL);
                 if (IS_ERR(handle)) {
index a47e844..541b2d7 100644 (file)
@@ -1083,6 +1083,9 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         lquota_chkquota(mds_quota_interface_ref, req->rq_export, ids[0], ids[1],
                         1, quota_pending, NULL, NULL, 0);
 
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
+                GOTO(cleanup, rc = -EINPROGRESS);
+
         switch (type) {
         case S_IFREG:{
                 handle = fsfilt_start(obd, dir, FSFILT_OP_CREATE, NULL);
index dbd90d6..ad8cea5 100644 (file)
@@ -688,6 +688,9 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
         if (rc == -ENOTCONN)
                 GOTO(cleanup, rc);
 
+        if (OBD_FAIL_CHECK(OBD_FAIL_OST_DQACQ_NET))
+                GOTO(cleanup, rc = -EINPROGRESS);
+
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         cleanup_phase = 2;
 
index 1c62269..88233d7 100644 (file)
@@ -120,7 +120,8 @@ static inline void lprocfs_osc_init_vars(struct lprocfs_static_vars *lvars)
 
 static inline int osc_recoverable_error(int rc)
 {
-        return (rc == -EIO || rc == -EROFS || rc == -ENOMEM || rc == -EAGAIN);
+        return (rc == -EIO || rc == -EROFS || rc == -ENOMEM ||
+                rc == -EAGAIN || rc == -EINPROGRESS);
 }
 
 /* return 1 if osc should be resend request */
index 7113817..9be8ee1 100644 (file)
@@ -1222,6 +1222,9 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
 
         req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
         ptlrpc_at_set_req_timeout(req);
+       /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
+        * retry logic */
+       req->rq_no_retry_einprogress = 1;
 
         if (opc == OST_WRITE)
                 desc = ptlrpc_prep_bulk_imp (req, page_count,
@@ -1573,11 +1576,12 @@ static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
         struct ptlrpc_request *request;
         int                    rc;
         cfs_waitq_t            waitq;
-        int                    resends = 0;
+        int                    generation, resends = 0;
         struct l_wait_info     lwi;
 
         ENTRY;
         init_waitqueue_head(&waitq);
+        generation = exp->exp_obd->u.cli.cl_import->imp_generation;
 
 restart_bulk:
         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
@@ -1585,6 +1589,12 @@ restart_bulk:
         if (rc != 0)
                 return (rc);
 
+        if (resends) {
+                request->rq_generation_set = 1;
+                request->rq_import_generation = generation;
+                request->rq_sent = CURRENT_SECONDS + resends;
+        }
+
         rc = ptlrpc_queue_wait(request);
 
         if (rc == -ETIMEDOUT && request->rq_resend) {
@@ -1596,37 +1606,48 @@ restart_bulk:
         rc = osc_brw_fini_request(request, rc);
 
         ptlrpc_req_finished(request);
+        /* When the server returns -EINPROGRESS, the client should always retry
+         * regardless of the number of times the bulk was resent already. */
         if (osc_recoverable_error(rc)) {
                 resends++;
-                if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
-                        CERROR("too many resend retries, returning error\n");
-                        RETURN(-EIO);
+                if (rc != -EINPROGRESS &&
+                    !osc_should_resend(resends, &exp->exp_obd->u.cli)) {
+                        CERROR("%s: too many resend retries for object: "
+                               ""LPU64", rc = %d.\n",
+                               exp->exp_obd->obd_name, oa->o_id, rc);
+                        goto out;
+                }
+                if (generation !=
+                    exp->exp_obd->u.cli.cl_import->imp_generation) {
+                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
+                               ""LPU64", rc = %d.\n",
+                               exp->exp_obd->obd_name, oa->o_id, rc);
+                        goto out;
                 }
 
-                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
+                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
+                                       NULL);
                 l_wait_event(waitq, 0, &lwi);
 
                 goto restart_bulk;
         }
-        RETURN(rc);
+out:
+        if (rc == -EAGAIN || rc == -EINPROGRESS)
+                rc = -EIO;
+        RETURN (rc);
 }
 
-int osc_brw_redo_request(struct ptlrpc_request *request,
-                         struct osc_brw_async_args *aa)
+static int osc_brw_redo_request(struct ptlrpc_request *request,
+                               struct osc_brw_async_args *aa, int rc)
 {
         struct ptlrpc_request *new_req;
         struct ptlrpc_request_set *set = request->rq_set;
         struct osc_brw_async_args *new_aa;
         struct osc_async_page *oap;
-        int rc = 0;
         ENTRY;
 
-        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
-                CERROR("too many resent retries, returning error\n");
-                RETURN(-EIO);
-        }
-
-        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
+       DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
+                 "redo for recoverable error %d", rc);
 
         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
@@ -1656,7 +1677,14 @@ int osc_brw_redo_request(struct ptlrpc_request *request,
         aa->aa_resends++;
         new_req->rq_interpret_reply = request->rq_interpret_reply;
         new_req->rq_async_args = request->rq_async_args;
-        new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
+       /* cap resend delay to the current request timeout, this is similar to
+        * what ptlrpc does (see after_reply()) */
+       if (aa->aa_resends > new_req->rq_timeout)
+               new_req->rq_sent = CURRENT_SECONDS + new_req->rq_timeout;
+       else
+               new_req->rq_sent = CURRENT_SECONDS  + aa->aa_resends;
+        new_req->rq_generation_set = 1;
+        new_req->rq_import_generation = request->rq_import_generation;
 
         new_aa = ptlrpc_req_async_args(new_req);
 
@@ -2265,7 +2293,8 @@ static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
 
         rc = osc_brw_fini_request(request, rc);
         CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
-
+        /* When the server returns -EINPROGRESS, the client should always retry
+         * regardless of the number of times the bulk was resent already. */
         if (osc_recoverable_error(rc)) {
                 /* Only retry once for mmaped files since the mmaped page
                  * might be modified at anytime. We have to retry at least
@@ -2276,10 +2305,24 @@ static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
                     aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
                     aa->aa_oa->o_flags & OBD_FL_MMAP) {
                         rc = 0;
+                } else if (request->rq_import_generation !=
+                           request->rq_import->imp_generation) {
+                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
+                               ""LPU64", rc = %d.\n",
+                               request->rq_import->imp_obd->obd_name,
+                               aa->aa_oa->o_id, rc);
+                        rc = -EIO;
+                } else if (rc == -EINPROGRESS ||
+                           osc_should_resend(aa->aa_resends, aa->aa_cli)) {
+                        rc = osc_brw_redo_request(request, aa, rc);
+                        if (rc == 0)
+                                RETURN(0);
                 } else {
-                       rc = osc_brw_redo_request(request, aa);
-                       if (rc == 0)
-                               RETURN(0);
+                        CERROR("%s: too many resent retries for object: "
+                               ""LPU64", rc = %d.\n",
+                               request->rq_import->imp_obd->obd_name,
+                               aa->aa_oa->o_id, rc);
+                        rc = -EIO;
                }
         }
 
index c1f3b4a..90311f7 100644 (file)
@@ -183,6 +183,9 @@ static int ost_statfs(struct ptlrpc_request *req)
         if (req->rq_status != 0)
                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
 
+       if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_EINPROGRESS))
+               req->rq_status = -EINPROGRESS;
+
         RETURN(0);
 }
 
index 8ecbc5b..d2819fb 100644 (file)
@@ -1039,6 +1039,36 @@ static int after_reply(struct ptlrpc_request *req)
 
         LASSERT ((char *)req->rq_repmsg + req->rq_nob_received <=
                  (char *)req->rq_repbuf + req->rq_replen);
+
+        /* retry indefinitely on EINPROGRESS */
+        if (lustre_msg_get_status(req->rq_repmsg) == -EINPROGRESS &&
+            req->rq_no_resend == 0 && !req->rq_no_retry_einprogress) {
+                DEBUG_REQ(D_RPCTRACE, req, "Resending request on EINPROGRESS");
+                req->rq_resend = 1;
+                req->rq_nr_resend++;
+
+                /* allocate new xid to avoid reply reconstruction */
+                if (!req->rq_bulk) {
+                        /* new xid is already allocated for bulk in
+                         * ptlrpc_check_set() */
+                        req->rq_xid = ptlrpc_next_xid();
+                        DEBUG_REQ(D_RPCTRACE, req, "Allocating new xid for "
+                                  "resend on EINPROGRESS");
+                }
+
+                /* Readjust the timeout for current conditions */
+                ptlrpc_at_set_req_timeout(req);
+                /* delay resend to give a chance to the server to get ready.
+                 * The delay is increased by 1s on every resend and is capped to
+                 * the current request timeout (i.e. obd_timeout if AT is off,
+                 * or AT service time x 125% + 5s, see at_est2timeout) */
+                if (req->rq_nr_resend > req->rq_timeout)
+                        req->rq_sent = CURRENT_SECONDS + req->rq_timeout;
+                else
+                        req->rq_sent = CURRENT_SECONDS + req->rq_nr_resend;
+                RETURN(0);
+        }
+
         rc = unpack_reply(req);
         if (rc)
                 RETURN(rc);
@@ -1119,22 +1149,28 @@ static int after_reply(struct ptlrpc_request *req)
         RETURN(rc);
 }
 
+/**
+ * Helper function to send request \a req over the network for the first time
+ * Also adjusts request phase.
+ * Returns 0 on success or error code.
+ */
 static int ptlrpc_send_new_req(struct ptlrpc_request *req)
 {
-        struct obd_import     *imp;
+        struct obd_import     *imp = req->rq_import;
         int rc;
         ENTRY;
 
         LASSERT(req->rq_phase == RQ_PHASE_NEW);
-        if (req->rq_sent && (req->rq_sent > CURRENT_SECONDS))
+        if (req->rq_sent && (req->rq_sent > cfs_time_current_sec()) &&
+            (!req->rq_generation_set ||
+             req->rq_import_generation == imp->imp_generation))
                 RETURN (0);
 
         ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
 
-        imp = req->rq_import;
         spin_lock(&imp->imp_lock);
-
-        req->rq_import_generation = imp->imp_generation;
+        if (!req->rq_generation_set)
+                req->rq_import_generation = imp->imp_generation;
 
         if (ptlrpc_import_delay_req(imp, req, &rc)) {
                 spin_lock(&req->rq_lock);
@@ -1208,7 +1244,12 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
 
                 /* delayed send - skip */
                 if (req->rq_phase == RQ_PHASE_NEW && req->rq_sent)
-                        continue;
+                       continue;
+
+               /* delayed resend - skip */
+               if (req->rq_phase == RQ_PHASE_RPC && req->rq_resend &&
+                   req->rq_sent > cfs_time_current_sec())
+                       continue;
 
                 if (!(req->rq_phase == RQ_PHASE_RPC ||
                       req->rq_phase == RQ_PHASE_BULK ||
@@ -1627,6 +1668,8 @@ int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
 
                 if (req->rq_phase == RQ_PHASE_NEW)
                         deadline = req->rq_sent;    /* delayed send */
+               else if (req->rq_phase == RQ_PHASE_RPC && req->rq_resend)
+                       deadline = req->rq_sent;
                 else
                         deadline = req->rq_deadline;
 
index ff06df8..f9d9b8f 100644 (file)
@@ -474,7 +474,8 @@ int ptlrpc_send_error(struct ptlrpc_request *req, int may_be_difficult)
         }
 
         if (req->rq_status != -ENOSPC && req->rq_status != -EACCES &&
-            req->rq_status != -EPERM)
+           req->rq_status != -EPERM && req->rq_status != -ENOENT &&
+           req->rq_status != -EINPROGRESS)
                 req->rq_type = PTL_RPC_MSG_ERR;
 
         rc = ptlrpc_send_reply(req, may_be_difficult);
index 40afe70..1ac89c3 100755 (executable)
@@ -231,6 +231,126 @@ test_7() {
 }
 run_test 7 "Fail OST before obd_destroy"
 
+test_8a() {
+    [ -z "$(lctl get_param -n osc.${FSNAME}-*.connect_flags|grep einprogress)" \
+        ] && skip_env "OSTs don't support EINPROGRESS" && return
+    verify=$ROOT/tmp/verify-$$
+    dd if=/dev/urandom of=$verify bs=4096 count=1280 ||
+        error "Create verify file failed"
+#define OBD_FAIL_OST_DQACQ_NET           0x230
+    do_facet ost1 "lctl set_param fail_loc=0x230"
+    dd if=$verify of=$TDIR/$tfile bs=4096 count=1280 oflag=sync &
+    ddpid=$!
+    sleep $TIMEOUT  # wait for the io to become redo io
+    if ! ps -p $ddpid  > /dev/null 2>&1; then
+            error "redo io finished incorrectly"
+            return 1
+    fi
+    do_facet ost1 "lctl set_param fail_loc=0"
+    wait $ddpid || true
+    cancel_lru_locks osc
+    cmp $verify $TDIR/$tfile || return 2
+    rm -f $verify $TDIR/$tfile
+       message=`dmesg | grep "redo for recoverable error -115"`
+       [ -z "$message" ] || error "redo error messages found in dmesg"
+}
+run_test 8a "Verify redo io: redo io when get -EINPROGRESS error"
+
+test_8b() {
+    [ -z "$(lctl get_param -n osc.${FSNAME}-*.connect_flags|grep einprogress)" \
+        ] && skip_env "OSTs don't support EINPROGRESS" && return
+    verify=$ROOT/tmp/verify-$$
+    dd if=/dev/urandom of=$verify bs=4096 count=1280 ||
+        error "Create verify file failed"
+#define OBD_FAIL_OST_DQACQ_NET           0x230
+    do_facet ost1 "lctl set_param fail_loc=0x230"
+    dd if=$verify of=$TDIR/$tfile bs=4096 count=1280 oflag=sync &
+    ddpid=$!
+    sleep $TIMEOUT  # wait for the io to become redo io
+    fail ost1
+    do_facet ost1 "lctl set_param fail_loc=0"
+    wait $ddpid || return 1
+    cancel_lru_locks osc
+    cmp $verify $TDIR/$tfile || return 2
+    rm -f $verify $TDIR/$tfile
+}
+run_test 8b "Verify redo io: redo io should success after recovery"
+
+test_8c() {
+    [ -z "$(lctl get_param -n osc.${FSNAME}-*.connect_flags|grep einprogress)" \
+        ] && skip_env "OSTs don't support EINPROGRESS" && return
+    verify=$ROOT/tmp/verify-$$
+    dd if=/dev/urandom of=$verify bs=4096 count=1280 ||
+        error "Create verify file failed"
+#define OBD_FAIL_OST_DQACQ_NET           0x230
+    do_facet ost1 "lctl set_param fail_loc=0x230"
+    dd if=$verify of=$TDIR/$tfile bs=4096 count=1280 oflag=sync &
+    ddpid=$!
+    sleep $TIMEOUT  # wait for the io to become redo io
+    ost_evict_client
+    # allow recovery to complete
+    sleep $((TIMEOUT + 2))
+    do_facet ost1 "lctl set_param fail_loc=0"
+    wait $ddpid
+    cancel_lru_locks osc
+    cmp $verify $TDIR/$tfile && return 2
+    rm -f $verify $TDIR/$tfile
+}
+run_test 8c "Verify redo io: redo io should fail after eviction"
+
+test_8d() {
+    [ -z "$(lctl get_param -n mdc.${FSNAME}-*.connect_flags|grep einprogress)" \
+        ] && skip_env "MDS doesn't support EINPROGRESS" && return
+#define OBD_FAIL_MDS_DQACQ_NET           0x187
+    do_facet $SINGLEMDS "lctl set_param fail_loc=0x187"
+    # test the non-intent create path
+    mcreate $TDIR/$tfile &
+    cpid=$!
+    sleep $TIMEOUT
+    if ! ps -p $cpid  > /dev/null 2>&1; then
+            error "mknod finished incorrectly"
+            return 1
+    fi
+    do_facet $SINGLEMDS "lctl set_param fail_loc=0"
+    wait $cpid || return 2
+    stat $TDIR/$tfile || error "mknod failed"
+
+    rm $TDIR/$tfile
+
+#define OBD_FAIL_MDS_DQACQ_NET           0x187
+    do_facet $SINGLEMDS "lctl set_param fail_loc=0x187"
+    # test the intent create path
+    openfile -f O_RDWR:O_CREAT $TDIR/$tfile &
+    cpid=$!
+    sleep $TIMEOUT
+    if ! ps -p $cpid > /dev/null 2>&1; then
+            error "open finished incorrectly"
+            return 3
+    fi
+    do_facet $SINGLEMDS "lctl set_param fail_loc=0"
+    wait $cpid || return 4
+    stat $TDIR/$tfile || error "open failed"
+}
+run_test 8d "Verify redo creation on -EINPROGRESS"
+
+test_8e() {
+    [ -z "$(lctl get_param -n osc.${FSNAME}-*.connect_flags|grep einprogress)" \
+        ] && skip_env "OSTs don't support EINPROGRESS" && return
+    sleep 1 # ensure we have a fresh statfs
+#define OBD_FAIL_OST_STATFS_EINPROGRESS  0x231
+    do_facet ost1 "lctl set_param fail_loc=0x231"
+    df $MOUNT &
+    dfpid=$!
+    sleep $TIMEOUT
+    if ! ps -p $dfpid  > /dev/null 2>&1; then
+        do_facet ost1 "lctl set_param fail_loc=0"
+        error "df shouldn't have completed!"
+        return 1
+    fi
+    do_facet ost1 "lctl set_param fail_loc=0"
+}
+run_test 8e "Verify that ptlrpc resends request on -EINPROGRESS"
+
 complete $(basename $0) $SECONDS
 check_and_cleanup_lustre
 exit_status