Whamcloud - gitweb
LU-1329 ptlrpc: handle -EINPROGRESS for create
authorNiu Yawei <niu@whamcloud.com>
Wed, 18 Apr 2012 07:35:38 +0000 (00:35 -0700)
committerOleg Drokin <green@whamcloud.com>
Mon, 7 May 2012 19:36:31 +0000 (15:36 -0400)
If a metadata operation gets -EINPROGRESS from the server, the client
should retry the operation indefinitely. This is required by
the new quota design in the DNE environment.

For now we handle -EINPROGRESS only for the create operation; if
necessary, this can be extended to other operations later.

Signed-off-by: Niu Yawei <niu@whamcloud.com>
Change-Id: Iad568afeff0af1d4df840d3acf2df161df6f7690
Reviewed-on: http://review.whamcloud.com/2572
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Johann Lombardi <johann@whamcloud.com>
Reviewed-by: Fan Yong <yong.fan@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre/lustre_idl.h
lustre/include/obd_support.h
lustre/llite/llite_lib.c
lustre/mdc/mdc_locks.c
lustre/mdc/mdc_reint.c
lustre/mdd/mdd_dir.c
lustre/osc/osc_request.c
lustre/tests/replay-ost-single.sh

index e0e22db..9b7ddc8 100644 (file)
@@ -1158,7 +1158,8 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
                                 OBD_CONNECT_FID | LRU_RESIZE_CONNECT_FLAG | \
                                 OBD_CONNECT_VBR | OBD_CONNECT_LOV_V3 | \
                                 OBD_CONNECT_SOM | OBD_CONNECT_FULL20 | \
-                                OBD_CONNECT_64BITHASH)
+                                OBD_CONNECT_64BITHASH | \
+                                OBD_CONNECT_EINPROGRESS)
 #define OST_CONNECT_SUPPORTED  (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
                                 OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
                                 OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_INDEX | \
index 33cc91e..ad2d222 100644 (file)
@@ -245,6 +245,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_MDS_WRITEPAGE_PACK      0x184
 #define OBD_FAIL_MDS_RECOVERY_ACCEPTS_GAPS 0x185
 #define OBD_FAIL_MDS_GET_INFO_NET        0x186
+#define OBD_FAIL_MDS_DQACQ_NET           0x187
 
 #define OBD_FAIL_OST                     0x200
 #define OBD_FAIL_OST_CONNECT_NET         0x201
index efaa76f..8f03c8e 100644 (file)
@@ -215,7 +215,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
                                   OBD_CONNECT_CANCELSET | OBD_CONNECT_FID     |
                                   OBD_CONNECT_AT       | OBD_CONNECT_LOV_V3   |
                                   OBD_CONNECT_RMT_CLIENT | OBD_CONNECT_VBR    |
-                                  OBD_CONNECT_FULL20   | OBD_CONNECT_64BITHASH;
+                                  OBD_CONNECT_FULL20   | OBD_CONNECT_64BITHASH|
+                                  OBD_CONNECT_EINPROGRESS;
 
         if (sbi->ll_flags & LL_SBI_SOM_PREVIEW)
                 data->ocd_connect_flags |= OBD_CONNECT_SOM;
index 759c3f5..df77acf 100644 (file)
@@ -635,7 +635,7 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
 {
         struct obd_device     *obddev = class_exp2obd(exp);
         struct ptlrpc_request *req = NULL;
-        int                    flags = extra_lock_flags;
+        int                    flags, saved_flags = extra_lock_flags;
         int                    rc;
         struct ldlm_res_id res_id;
         static const ldlm_policy_data_t lookup_policy =
@@ -643,6 +643,8 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
         static const ldlm_policy_data_t update_policy =
                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
         ldlm_policy_data_t const *policy = &lookup_policy;
+        int                    generation, resends = 0;
+        struct ldlm_reply     *lockrep;
         ENTRY;
 
         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
@@ -651,13 +653,15 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
 
         if (it)
-                flags |= LDLM_FL_HAS_INTENT;
+                saved_flags |= LDLM_FL_HAS_INTENT;
         if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
                 policy = &update_policy;
 
-        if (reqp)
-                req = *reqp;
+        LASSERT(reqp == NULL);
 
+        generation = obddev->u.cli.cl_import->imp_generation;
+resend:
+        flags = saved_flags;
         if (!it) {
                 /* The only way right now is FLOCK, in this case we hide flock
                    policy as lmm, but lmmsize is 0 */
@@ -686,6 +690,12 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
         if (IS_ERR(req))
                 RETURN(PTR_ERR(req));
 
+        if (resends) {
+                req->rq_generation_set = 1;
+                req->rq_import_generation = generation;
+                req->rq_sent = cfs_time_current_sec() + resends;
+        }
+
         /* It is important to obtain rpc_lock first (if applicable), so that
          * threads that are serialised with rpc_lock are not polluting our
          * rpcs in flight counter. We do not do flock request limiting, though*/
@@ -702,13 +712,6 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
 
         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
                               0, lockh, 0);
-        if (reqp)
-                *reqp = req;
-
-        if (it) {
-                mdc_exit_request(&obddev->u.cli);
-                mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
-        }
         if (!it) {
                 /* For flock requests we immediatelly return without further
                    delay and let caller deal with the rest, since rest of
@@ -717,12 +720,39 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                 RETURN(rc);
         }
 
+        mdc_exit_request(&obddev->u.cli);
+        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
+
         if (rc < 0) {
                 CERROR("ldlm_cli_enqueue: %d\n", rc);
                 mdc_clear_replay_flag(req, rc);
                 ptlrpc_req_finished(req);
                 RETURN(rc);
         }
+
+        lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+        LASSERT(lockrep != NULL);
+
+        /* Retry the create infinitely when we get -EINPROGRESS from
+         * server. This is required by the new quota design. */
+        if (it && it->it_op & IT_CREAT &&
+            (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
+                mdc_clear_replay_flag(req, rc);
+                ptlrpc_req_finished(req);
+                resends++;
+
+                CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
+                       obddev->obd_name, resends, it->it_op,
+                       PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
+
+                if (generation == obddev->u.cli.cl_import->imp_generation) {
+                        goto resend;
+                } else {
+                        CDEBUG(D_HA, "resned cross eviction\n");
+                        RETURN(-EIO);
+                }
+        }
+
         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
 
         RETURN(rc);
index f8f6f16..034569c 100644 (file)
@@ -222,7 +222,9 @@ int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
 {
         struct ptlrpc_request *req;
         int level, rc;
-        int count = 0;
+        int count, resends = 0;
+        struct obd_import *import = exp->exp_obd->u.cli.cl_import;
+        int generation = import->imp_generation;
         CFS_LIST_HEAD(cancels);
         ENTRY;
 
@@ -239,6 +241,8 @@ int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
                 }
         }
 
+rebuild:
+        count = 0;
         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
             (fid_is_sane(&op_data->op_fid1)))
                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
@@ -272,6 +276,11 @@ int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
 
         ptlrpc_request_set_replen(req);
 
+        if (resends) {
+                req->rq_generation_set = 1;
+                req->rq_import_generation = generation;
+                req->rq_sent = cfs_time_current_sec() + resends;
+        }
         level = LUSTRE_IMP_FULL;
  resend:
         rc = mdc_reint(req, exp->exp_obd->u.cli.cl_rpc_lock, level);
@@ -280,6 +289,22 @@ int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
         if (rc == -ERESTARTSYS) {
                 level = LUSTRE_IMP_RECOVER;
                 goto resend;
+        } else if (rc == -EINPROGRESS) {
+                /* Retry create infinitely until succeed or get other
+                 * error code. */
+                ptlrpc_req_finished(req);
+                resends++;
+
+                CDEBUG(D_HA, "%s: resend:%d create on "DFID"/"DFID"\n",
+                       exp->exp_obd->obd_name, resends,
+                       PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
+
+                if (generation == import->imp_generation) {
+                        goto rebuild;
+                } else {
+                        CDEBUG(D_HA, "resend cross eviction\n");
+                        RETURN(-EIO);
+                }
         } else if (rc == 0) {
                 struct mdt_body *body;
                 struct lustre_capa *capa;
index 099fcc3..ebc061a 100644 (file)
@@ -1998,6 +1998,9 @@ static int mdd_create(const struct lu_env *env,
         }
 #endif
 
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
+                GOTO(out_pending, rc = -EINPROGRESS);
+
         /*
          * No RPC inside the transaction, so OST objects should be created at
          * first.
@@ -2192,6 +2195,10 @@ out_pending:
                               quota_opc);
         }
 #endif
+        /* The child object shouldn't be cached anymore */
+        if (rc)
+                cfs_set_bit(LU_OBJECT_HEARD_BANSHEE,
+                            &child->mo_lu.lo_header->loh_flags);
         return rc;
 }
 
index 995391d..85b02e7 100644 (file)
@@ -1713,6 +1713,7 @@ restart_bulk:
         if (resends) {
                 req->rq_generation_set = 1;
                 req->rq_import_generation = generation;
+                req->rq_sent = cfs_time_current_sec() + resends;
         }
 
         rc = ptlrpc_queue_wait(req);
index 036750b..46e4f02 100755 (executable)
@@ -305,6 +305,39 @@ test_8c() {
 run_test 8c "Verify redo io: redo io should fail after eviction"
 
 
+test_9d() {
+#define OBD_FAIL_MDS_DQACQ_NET           0x187
+    do_facet $SINGLEMDS "lctl set_param fail_loc=0x187"
+    # Non-intent create path: mcreate must keep running while fail_loc is set
+    mcreate $TDIR/$tfile &
+    cpid=$!
+    sleep $TIMEOUT
+    if ! ps -p $cpid  > /dev/null 2>&1; then
+            error "mknod finished incorrectly"
+            return 1
+    fi
+    do_facet $SINGLEMDS "lctl set_param fail_loc=0"
+    wait $cpid || return 2
+    stat $TDIR/$tfile || error "mknod failed"
+
+    rm $TDIR/$tfile
+
+#define OBD_FAIL_MDS_DQACQ_NET           0x187
+    do_facet $SINGLEMDS "lctl set_param fail_loc=0x187"
+    # Intent create path: the O_CREAT open must keep running while fail_loc is set
+    openfile -f O_RDWR:O_CREAT $TDIR/$tfile &
+    cpid=$!
+    sleep $TIMEOUT
+    if ! ps -p $cpid > /dev/null 2>&1; then
+            error "open finished incorrectly"
+            return 3
+    fi
+    do_facet $SINGLEMDS "lctl set_param fail_loc=0"
+    wait $cpid || return 4
+    stat $TDIR/$tfile || error "open failed"
+}
+run_test 9d "Verify redo creation on -EINPROGRESS"
+
 complete $(basename $0) $SECONDS
 check_and_cleanup_lustre
 exit_status