b=16774 cancel unused osc locks before replay.
author Eric Mei <eric.mei@oracle.com>
Tue, 21 Sep 2010 18:20:49 +0000 (22:20 +0400)
committer Mikhail Pershin <tappro@sun.com>
Mon, 27 Sep 2010 13:31:01 +0000 (17:31 +0400)
r=oleg.drokin
r=di.wang

lustre/osc/osc_cl_internal.h
lustre/osc/osc_internal.h
lustre/osc/osc_lock.c
lustre/osc/osc_page.c
lustre/osc/osc_request.c
lustre/tests/replay-single.sh

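In short: read-ahead pages now pin the DLM lock that covers them. osc_page_is_under_lock() takes a reference on the covering lock through the new osc_lock::ols_pageref counter and records it in osc_page::ops_lock; the reference is dropped when the page is disowned or when its read completes. During recovery, osc_cancel_for_recovery(), registered with ns_register_cancel(), lets unused LCK_PR/LCK_CR extent locks be cancelled instead of replayed, but only if osc_dlm_lock_pageref() confirms that no pages still hold a reference, so a lock backing in-flight read-ahead is never cancelled out from under its pages.
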
lustre/osc/osc_cl_internal.h
index 16cd956..6478c26 100644 (file)
@@ -196,6 +196,17 @@ struct osc_lock {
         struct lustre_handle     ols_handle;
         struct ldlm_enqueue_info ols_einfo;
         enum osc_lock_state      ols_state;
+
+        /**
+         * How many pages are using this lock for I/O; currently only used
+         * by read-ahead. If non-zero, the underlying DLM lock won't be
+         * cancelled during recovery, to avoid a deadlock. See bug 16774.
+         *
+         * \see osc_page::ops_lock
+         * \see osc_page_addref_lock(), osc_page_putref_lock()
+         */
+        cfs_atomic_t             ols_pageref;
+
         /**
          * true, if ldlm_lock_addref() was called against
          * osc_lock::ols_lock. This is used for sanity checking.
@@ -301,6 +312,16 @@ struct osc_page {
          * Submit time - the time when the page is starting RPC. For debugging.
          */
         cfs_time_t            ops_submit_time;
+
+        /**
+         * The lock covering this page, on which we hold a reference. Only
+         * used by read-ahead: a read-ahead page pins its covering lock to
+         * prevent it from being cancelled during recovery.
+         *
+         * \see osc_lock::ols_pageref
+         * \see osc_page_addref_lock(), osc_page_putref_lock().
+         */
+        struct cl_lock       *ops_lock;
 };
 
 extern cfs_mem_cache_t *osc_page_kmem;
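
The two new fields form a pair: osc_lock::ols_pageref counts how many read-ahead pages currently depend on the lock, while osc_page::ops_lock remembers which lock a page pinned so that exactly that reference is dropped later. A standalone model of the counter protocol is sketched after the osc_lock.c hunks below.
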
lustre/osc/osc_internal.h
index 8bf8039..9928169 100644 (file)
@@ -227,5 +227,6 @@ static inline struct osc_device *obd2osc_dev(const struct obd_device *d)
         return container_of0(d->obd_lu_dev, struct osc_device, od_cl.cd_lu_dev);
 }
 
+int osc_dlm_lock_pageref(struct ldlm_lock *dlm);
 
 #endif /* OSC_INTERNAL_H */
lustre/osc/osc_lock.c
index 73f6083..42de3de 100644 (file)
@@ -54,6 +54,8 @@
  *  @{ 
  */
 
+#define _PAGEREF_MAGIC  (-10000000)
+
 /*****************************************************************************
  *
  * Type conversions.
@@ -223,6 +225,8 @@ static void osc_lock_fini(const struct lu_env *env,
          */
         osc_lock_unhold(ols);
         LASSERT(ols->ols_lock == NULL);
+        LASSERT(cfs_atomic_read(&ols->ols_pageref) == 0 ||
+                cfs_atomic_read(&ols->ols_pageref) == _PAGEREF_MAGIC);
 
         OBD_SLAB_FREE_PTR(ols, osc_lock_kmem);
 }
@@ -1599,6 +1603,7 @@ int osc_lock_init(const struct lu_env *env,
         OBD_SLAB_ALLOC_PTR_GFP(clk, osc_lock_kmem, CFS_ALLOC_IO);
         if (clk != NULL) {
                 osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo);
+                cfs_atomic_set(&clk->ols_pageref, 0);
                 clk->ols_state = OLS_NEW;
                 cl_lock_slice_add(lock, &clk->ols_cl, obj, &osc_lock_ops);
                 result = 0;
@@ -1607,5 +1612,26 @@ int osc_lock_init(const struct lu_env *env,
         return result;
 }
 
+int osc_dlm_lock_pageref(struct ldlm_lock *dlm)
+{
+        struct osc_lock *olock;
+        int              rc = 0;
+
+        cfs_spin_lock(&osc_ast_guard);
+        olock = dlm->l_ast_data;
+        /*
+         * There is a very rare race with osc_page_addref_lock(), but it is
+         * harmless: in the worst case we merely skip cancelling a lock that
+         * we actually could have cancelled.
+         */
+        if (olock != NULL &&
+            cfs_atomic_add_return(_PAGEREF_MAGIC,
+                                  &olock->ols_pageref) != _PAGEREF_MAGIC) {
+                cfs_atomic_sub(_PAGEREF_MAGIC, &olock->ols_pageref);
+                rc = 1;
+        }
+        cfs_spin_unlock(&osc_ast_guard);
+        return rc;
+}
 
 /** @} osc */
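
The _PAGEREF_MAGIC probe relies on the magic being far more negative than any realistic page count. With no page users, cfs_atomic_add_return(_PAGEREF_MAGIC, &ols_pageref) yields exactly -10000000, the counter is left poisoned so that any later osc_page_addref_lock() sees a non-positive result from its increment and backs off, and the lock may be cancelled; this is also why osc_lock_fini() accepts either 0 or _PAGEREF_MAGIC. With, say, three pinned pages the probe yields -9999997 instead, the addition is undone, and the lock is kept. Below is a minimal standalone model of this protocol in plain C11 atomics; the names (model_lock, try_pin, unpin, can_cancel) are illustrative only and not part of the Lustre API.

/*
 * Standalone model of the ols_pageref protocol (C11 atomics, hypothetical
 * names; not Lustre code).  PAGEREF_MAGIC mirrors _PAGEREF_MAGIC above.
 */
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define PAGEREF_MAGIC (-10000000)

struct model_lock {
        atomic_int pageref;             /* models osc_lock::ols_pageref */
};

/* Models osc_page_addref_lock(): a read-ahead page pins the lock. */
static bool try_pin(struct model_lock *lk)
{
        /* Once the counter is poisoned with PAGEREF_MAGIC, the result of
         * the increment stays negative and the pin is refused. */
        if (atomic_fetch_add(&lk->pageref, 1) + 1 <= 0) {
                atomic_fetch_sub(&lk->pageref, 1);
                return false;
        }
        return true;
}

/* Models osc_page_putref_lock(): the page drops its pin. */
static void unpin(struct model_lock *lk)
{
        atomic_fetch_sub(&lk->pageref, 1);
}

/* Models osc_dlm_lock_pageref(): may the lock be cancelled before replay? */
static bool can_cancel(struct model_lock *lk)
{
        /* Adding the magic yields exactly PAGEREF_MAGIC only if the counter
         * was zero; the poisoned value is then left in place so later pins
         * fail.  Otherwise the probe is undone and the lock is kept. */
        if (atomic_fetch_add(&lk->pageref, PAGEREF_MAGIC) + PAGEREF_MAGIC
            != PAGEREF_MAGIC) {
                atomic_fetch_sub(&lk->pageref, PAGEREF_MAGIC);
                return false;
        }
        return true;
}

int main(void)
{
        struct model_lock lk = { .pageref = 0 };

        assert(try_pin(&lk));           /* one read-ahead page pins the lock */
        assert(!can_cancel(&lk));       /* recovery must not cancel it       */
        unpin(&lk);                     /* read completes, pin dropped       */
        assert(can_cancel(&lk));        /* now cancellable, counter poisoned */
        assert(!try_pin(&lk));          /* late pin attempts are refused     */
        printf("pageref protocol OK\n");
        return 0;
}
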
lustre/osc/osc_page.c
index 167e3eb..d7f7772 100644 (file)
@@ -162,6 +162,7 @@ static void osc_page_fini(const struct lu_env *env,
 {
         struct osc_page *opg = cl2osc_page(slice);
         CDEBUG(D_TRACE, "%p\n", opg);
+        LASSERT(opg->ops_lock == NULL);
         OBD_SLAB_FREE_PTR(opg, osc_page_kmem);
 }
 
@@ -247,6 +248,48 @@ void osc_index2policy(ldlm_policy_data_t *policy, const struct cl_object *obj,
         policy->l_extent.end   = cl_offset(obj, end + 1) - 1;
 }
 
+static int osc_page_addref_lock(const struct lu_env *env,
+                                struct osc_page *opg,
+                                struct cl_lock *lock)
+{
+        struct osc_lock *olock;
+        int              rc;
+
+        LASSERT(opg->ops_lock == NULL);
+
+        olock = osc_lock_at(lock);
+        if (cfs_atomic_inc_return(&olock->ols_pageref) <= 0) {
+                cfs_atomic_dec(&olock->ols_pageref);
+                cl_lock_put(env, lock);
+                rc = 1;
+        } else {
+                opg->ops_lock = lock;
+                rc = 0;
+        }
+        return rc;
+}
+
+static void osc_page_putref_lock(const struct lu_env *env,
+                                 struct osc_page *opg)
+{
+        struct cl_lock  *lock = opg->ops_lock;
+        struct osc_lock *olock;
+
+        LASSERT(lock != NULL);
+        olock = osc_lock_at(lock);
+
+        cfs_atomic_dec(&olock->ols_pageref);
+        opg->ops_lock = NULL;
+
+        /*
+         * Note: usually this is not the last reference on the lock, but even
+         * if it is, all cl_lock_put() does in that case is free some memory,
+         * so it is safe even if the caller holds spinlocks.
+         */
+        LASSERT(cfs_atomic_read(&lock->cll_ref) > 1 || olock->ols_hold == 0);
+        cl_lock_put(env, lock);
+}
+
 static int osc_page_is_under_lock(const struct lu_env *env,
                                   const struct cl_page_slice *slice,
                                   struct cl_io *unused)
@@ -257,14 +300,34 @@ static int osc_page_is_under_lock(const struct lu_env *env,
         ENTRY;
         lock = cl_lock_at_page(env, slice->cpl_obj, slice->cpl_page,
                                NULL, 1, 0);
-        if (lock != NULL) {
-                cl_lock_put(env, lock);
+        if (lock != NULL &&
+            osc_page_addref_lock(env, cl2osc_page(slice), lock) == 0)
                 result = -EBUSY;
-        } else
+        else
                 result = -ENODATA;
         RETURN(result);
 }
 
+static void osc_page_disown(const struct lu_env *env,
+                            const struct cl_page_slice *slice,
+                            struct cl_io *io)
+{
+        struct osc_page *opg = cl2osc_page(slice);
+
+        if (unlikely(opg->ops_lock))
+                osc_page_putref_lock(env, opg);
+}
+
+static void osc_page_completion_read(const struct lu_env *env,
+                                     const struct cl_page_slice *slice,
+                                     int ioret)
+{
+        struct osc_page *opg = cl2osc_page(slice);
+
+        if (likely(opg->ops_lock))
+                osc_page_putref_lock(env, opg);
+}
+
 static int osc_page_fail(const struct lu_env *env,
                          const struct cl_page_slice *slice,
                          struct cl_io *unused)
@@ -410,12 +473,14 @@ static const struct cl_page_operations osc_page_ops = {
         .cpo_print         = osc_page_print,
         .cpo_delete        = osc_page_delete,
         .cpo_is_under_lock = osc_page_is_under_lock,
+        .cpo_disown        = osc_page_disown,
         .io = {
                 [CRT_READ] = {
-                        .cpo_cache_add = osc_page_fail
+                        .cpo_cache_add  = osc_page_fail,
+                        .cpo_completion = osc_page_completion_read
                 },
                 [CRT_WRITE] = {
-                        .cpo_cache_add = osc_page_cache_add
+                        .cpo_cache_add  = osc_page_cache_add
                 }
         },
         .cpo_clip           = osc_page_clip,
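
The reference taken in osc_page_is_under_lock() has two release paths: osc_page_completion_read() drops it when the read RPC for the page finishes (the common case, hence likely()), and osc_page_disown() drops it if the page is disowned without the read having consumed it (hence unlikely()). Either way a page releases at most the single pin recorded in ops_lock.
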
lustre/osc/osc_request.c
index 404bca2..7144f0b 100644 (file)
@@ -4362,6 +4362,32 @@ static int osc_import_event(struct obd_device *obd,
         RETURN(rc);
 }
 
+/**
+ * Determine whether a lock can be cancelled instead of being replayed
+ * during recovery; see bug 16774 for details.
+ *
+ * \retval zero  the lock cannot be cancelled
+ * \retval other OK to cancel
+ */
+static int osc_cancel_for_recovery(struct ldlm_lock *lock)
+{
+        check_res_locked(lock->l_resource);
+
+        /*
+         * Cancel unused extent locks granted in LCK_PR or LCK_CR mode.
+         *
+         * XXX As a future improvement, unused write locks could also be
+         * cancelled if they cover no dirty data and no active mmaps.
+         */
+        if (lock->l_resource->lr_type == LDLM_EXTENT &&
+            (lock->l_granted_mode == LCK_PR ||
+             lock->l_granted_mode == LCK_CR) &&
+            (osc_dlm_lock_pageref(lock) == 0))
+                RETURN(1);
+
+        RETURN(0);
+}
+
 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 {
         int rc;
@@ -4400,6 +4426,8 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 
                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
                 cfs_sema_init(&cli->cl_grant_sem, 1);
+
+                ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
         }
 
         RETURN(rc);
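
The callback registered with ns_register_cancel() decides, per lock, whether an unused lock is cancelled rather than replayed once recovery starts (the behaviour test_85b below enables through ldlm.cancel_unused_locks_before_replay). A minimal standalone sketch of that decision, with hypothetical types standing in for the LDLM structures (the real function additionally asserts that the resource lock is held):

/* Standalone sketch of the osc_cancel_for_recovery() decision; hypothetical
 * types, not Lustre code. */
#include <stdbool.h>
#include <stdio.h>

enum res_type  { TYPE_EXTENT, TYPE_IBITS, TYPE_FLOCK };
enum lock_mode { MODE_PR, MODE_CR, MODE_PW, MODE_EX };

struct sketch_lock {
        enum res_type  type;          /* models l_resource->lr_type         */
        enum lock_mode granted_mode;  /* models l_granted_mode              */
        int            page_users;    /* what osc_dlm_lock_pageref() would
                                       * report: non-zero if read-ahead
                                       * pages still pin the lock           */
};

/* Returns true if the unused lock may be cancelled instead of replayed. */
static bool cancel_for_recovery(const struct sketch_lock *lk)
{
        /* Only read-mode extent locks with no pinned read-ahead pages are
         * safe to drop; everything else is replayed. */
        return lk->type == TYPE_EXTENT &&
               (lk->granted_mode == MODE_PR || lk->granted_mode == MODE_CR) &&
               lk->page_users == 0;
}

int main(void)
{
        struct sketch_lock idle_read   = { TYPE_EXTENT, MODE_PR, 0 };
        struct sketch_lock pinned_read = { TYPE_EXTENT, MODE_PR, 1 };
        struct sketch_lock write_lock  = { TYPE_EXTENT, MODE_PW, 0 };

        printf("idle read lock:   %s\n", cancel_for_recovery(&idle_read) ? "cancel" : "replay");
        printf("pinned read lock: %s\n", cancel_for_recovery(&pinned_read) ? "cancel" : "replay");
        printf("write lock:       %s\n", cancel_for_recovery(&write_lock) ? "cancel" : "replay");
        return 0;
}
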
lustre/tests/replay-single.sh
index e8c790a..a7e3512 100755 (executable)
@@ -2097,6 +2097,37 @@ test_85a() { #bug 16774
 }
 run_test 85a "check the cancellation of unused locks during recovery(IBITS)"
 
+test_85b() { #bug 16774
+    lctl set_param -n ldlm.cancel_unused_locks_before_replay "1"
+
+    lfs setstripe -o 0 -c 1 $DIR
+
+    for i in `seq 100`; do
+        dd if=/dev/urandom of=$DIR/$tfile-$i bs=4096 count=32 >/dev/null 2>&1
+    done
+
+    cancel_lru_locks osc
+
+    for i in `seq 100`; do
+        dd if=$DIR/$tfile-$i of=/dev/null bs=4096 count=32 >/dev/null 2>&1
+    done
+
+    lov_id=`lctl dl | grep "clilov"`
+    addr=`echo $lov_id | awk '{print $4}' | awk -F '-' '{print $3}'`
+    count=`lctl get_param -n ldlm.namespaces.*OST0000*$addr.lock_unused_count`
+    echo "before recovery: unused locks count = $count"
+
+    fail ost1
+
+    count2=`lctl get_param -n ldlm.namespaces.*OST0000*$addr.lock_unused_count`
+    echo "after recovery: unused locks count = $count2"
+
+    if [ $count2 -ge $count ]; then
+        error "unused locks are not canceled"
+    fi
+}
+run_test 85b "check the cancellation of unused locks during recovery(EXTENT)"
+
 test_86() {
         local clients=${CLIENTS:-$HOSTNAME}
 