Whamcloud - gitweb
LU-2791 ldlm: release reference against failed lock
authorFan Yong <yong.fan@whamcloud.com>
Fri, 8 Feb 2013 19:33:52 +0000 (03:33 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Wed, 13 Mar 2013 06:15:28 +0000 (02:15 -0400)
On client-side, when ldlm_cli_enqueue_fini() gets reply from
server, which contains unexpected LVB size, it will mark the
lock as failure, but it does not release one reference, then
the failed lock prevents the lock from being freed. And then
umount client will be blocked.

The ldlm_lvbo_fill() caller should handle failure cases.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: I197759a0b964e028627ecb6025820db9517fad7e
Reviewed-on: http://review.whamcloud.com/5634
Reviewed-by: Niu Yawei <yawei.niu@intel.com>
Tested-by: Hudson
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
lustre/include/obd_support.h
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/ofd/ofd_lvb.c
lustre/quota/qmt_lock.c
lustre/tests/sanity.sh

index eeef78b..6d3ccf8 100644 (file)
@@ -337,6 +337,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_LDLM_NEW_LOCK           0x319
 #define OBD_FAIL_LDLM_AGL_DELAY          0x31a
 #define OBD_FAIL_LDLM_AGL_NOLOCK         0x31b
 #define OBD_FAIL_LDLM_NEW_LOCK           0x319
 #define OBD_FAIL_LDLM_AGL_DELAY          0x31a
 #define OBD_FAIL_LDLM_AGL_NOLOCK         0x31b
+#define OBD_FAIL_LDLM_OST_LVB           0x31c
 
 /* LOCKLESS IO */
 #define OBD_FAIL_LDLM_SET_CONTENTION     0x385
 
 /* LOCKLESS IO */
 #define OBD_FAIL_LDLM_SET_CONTENTION     0x385
index c331276..18bbd28 100644 (file)
@@ -976,8 +976,20 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
                void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
 
                lvb_len = ldlm_lvbo_fill(lock, lvb, lvb_len);
                void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
 
                lvb_len = ldlm_lvbo_fill(lock, lvb, lvb_len);
-               req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB,
-                                  lvb_len, RCL_CLIENT);
+               if (lvb_len < 0) {
+                       /* We still need to send the RPC to wake up the blocked
+                        * enqueue thread on the client.
+                        *
+                        * Consider old client, there is no better way to notify
+                        * the failure, just zero-sized the LVB, then the client
+                        * will fail out as "-EPROTO". */
+                       req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, 0,
+                                          RCL_CLIENT);
+                       instant_cancel = 1;
+               } else {
+                       req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len,
+                                          RCL_CLIENT);
+               }
         }
 
         LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)",
         }
 
         LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)",
@@ -1038,7 +1050,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
 
        rc = ldlm_ast_fini(req, arg, lock, instant_cancel);
 
 
        rc = ldlm_ast_fini(req, arg, lock, instant_cancel);
 
-        RETURN(rc);
+       RETURN(lvb_len < 0 ? lvb_len : rc);
 }
 EXPORT_SYMBOL(ldlm_server_completion_ast);
 
 }
 EXPORT_SYMBOL(ldlm_server_completion_ast);
 
@@ -1427,8 +1439,12 @@ existing_lock:
                                buflen = req_capsule_get_size(&req->rq_pill,
                                                &RMF_DLM_LVB, RCL_SERVER);
                                buflen = ldlm_lvbo_fill(lock, buf, buflen);
                                buflen = req_capsule_get_size(&req->rq_pill,
                                                &RMF_DLM_LVB, RCL_SERVER);
                                buflen = ldlm_lvbo_fill(lock, buf, buflen);
-                               req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB,
-                                                  buflen, RCL_SERVER);
+                               if (buflen >= 0)
+                                       req_capsule_shrink(&req->rq_pill,
+                                                          &RMF_DLM_LVB,
+                                                          buflen, RCL_SERVER);
+                               else
+                                       rc = buflen;
                        }
                 } else {
                         lock_res_and_lock(lock);
                        }
                 } else {
                         lock_res_and_lock(lock);
index 8e1dcef..106241b 100644 (file)
@@ -678,8 +678,10 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                        rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
                                           lock->l_lvb_data, size);
                unlock_res_and_lock(lock);
                        rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
                                           lock->l_lvb_data, size);
                unlock_res_and_lock(lock);
-               if (rc < 0)
+               if (rc < 0) {
+                       cleanup_phase = 1;
                        GOTO(cleanup, rc);
                        GOTO(cleanup, rc);
+               }
         }
 
         if (!is_replay) {
         }
 
         if (!is_replay) {
index be22922..fa1e6f4 100644 (file)
@@ -72,13 +72,16 @@ static int ofd_lvbo_init(struct ldlm_resource *res)
        ofd = ldlm_res_to_ns(res)->ns_lvbp;
        LASSERT(ofd != NULL);
 
        ofd = ldlm_res_to_ns(res)->ns_lvbp;
        LASSERT(ofd != NULL);
 
+       if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_LVB))
+               RETURN(-ENOMEM);
+
        rc = lu_env_init(&env, LCT_DT_THREAD);
        if (rc)
                RETURN(rc);
 
        OBD_ALLOC_PTR(lvb);
        if (lvb == NULL)
        rc = lu_env_init(&env, LCT_DT_THREAD);
        if (rc)
                RETURN(rc);
 
        OBD_ALLOC_PTR(lvb);
        if (lvb == NULL)
-               RETURN(-ENOMEM);
+               GOTO(out, rc = -ENOMEM);
 
        res->lr_lvb_data = lvb;
        res->lr_lvb_len = sizeof(*lvb);
 
        res->lr_lvb_data = lvb;
        res->lr_lvb_len = sizeof(*lvb);
@@ -109,7 +112,7 @@ out_put:
        ofd_object_put(&env, fo);
 out:
        lu_env_fini(&env);
        ofd_object_put(&env, fo);
 out:
        lu_env_fini(&env);
-       if (rc)
+       if (rc && lvb != NULL)
                OST_LVB_SET_ERR(lvb->lvb_blocks, rc);
        /* Don't free lvb data on lookup error */
        return rc;
                OST_LVB_SET_ERR(lvb->lvb_blocks, rc);
        /* Don't free lvb data on lookup error */
        return rc;
@@ -283,6 +286,10 @@ static int ofd_lvbo_fill(struct ldlm_lock *lock, void *buf, int buflen)
        struct ldlm_resource *res = lock->l_resource;
        int lvb_len;
 
        struct ldlm_resource *res = lock->l_resource;
        int lvb_len;
 
+       /* Former lvbo_init not allocate the "LVB". */
+       if (unlikely(res->lr_lvb_len == 0))
+               return 0;
+
        lvb_len = ofd_lvbo_size(lock);
        LASSERT(lvb_len <= res->lr_lvb_len);
 
        lvb_len = ofd_lvbo_size(lock);
        LASSERT(lvb_len <= res->lr_lvb_len);
 
index 05c60b0..3236fb5 100644 (file)
@@ -137,6 +137,9 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
        lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
        lvb_len = ldlm_lvbo_size(*lockp);
        lvb_len = ldlm_lvbo_fill(*lockp, lvb, lvb_len);
        lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
        lvb_len = ldlm_lvbo_size(*lockp);
        lvb_len = ldlm_lvbo_fill(*lockp, lvb, lvb_len);
+       if (lvb_len < 0)
+               GOTO(out, rc = lvb_len);
+
        req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len, RCL_SERVER);
        EXIT;
 out:
        req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len, RCL_SERVER);
        EXIT;
 out:
index f16be85..8778301 100644 (file)
@@ -10864,6 +10864,20 @@ test_231b() {
 }
 run_test 231b "must not assert on fully utilized OST request buffer"
 
 }
 run_test 231b "must not assert on fully utilized OST request buffer"
 
+test_232() {
+       mkdir -p $DIR/$tdir
+       #define OBD_FAIL_LDLM_OST_LVB            0x31c
+       $LCTL set_param fail_loc=0x31c
+
+       # ignore dd failure
+       dd if=/dev/zero of=$DIR/$tdir/$tfile bs=1M count=1 || true
+
+       $LCTL set_param fail_loc=0
+       umount_client $MOUNT || error "umount failed"
+       mount_client $MOUNT || error "mount failed"
+}
+run_test 232 "failed lock should not block umount"
+
 #
 # tests that do cleanup/setup should be run at the end
 #
 #
 # tests that do cleanup/setup should be run at the end
 #