Whamcloud - gitweb
LU-3321 osc: add weight function for DLM lock 94/7894/16
authorJinshan Xiong <jinshan.xiong@intel.com>
Wed, 6 Nov 2013 04:42:40 +0000 (20:42 -0800)
committerOleg Drokin <oleg.drokin@intel.com>
Mon, 2 Dec 2013 15:49:09 +0000 (15:49 +0000)
Use weigh_ast to decide if a lock covers any pages.
In recovery, weigh_ast will be used to decide whether a DLM read lock
covers any locked pages; if it does not, the lock will be canceled
instead of being recovered.

The problem with the original implementation is that it attached
each osc_page to an osc_lock, and also changed the lock state to add
every page for readahead.

Bugzilla-bug-Id: b=16774

Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Change-Id: I34efb0db52574c8cf5a7165d80ed527c80ba4031
Reviewed-on: http://review.whamcloud.com/7894
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Bobi Jam <bobijam@gmail.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/ldlm/ldlm_request.c
lustre/osc/osc_cl_internal.h
lustre/osc/osc_internal.h
lustre/osc/osc_lock.c
lustre/osc/osc_page.c
lustre/osc/osc_request.c

index 74087e0..369b8df 100644 (file)
@@ -1448,25 +1448,27 @@ static ldlm_policy_res_t ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns,
                                                     int unused, int added,
                                                     int count)
 {
-        ldlm_policy_res_t result = LDLM_POLICY_CANCEL_LOCK;
-        ldlm_cancel_for_recovery cb = ns->ns_cancel_for_recovery;
-        lock_res_and_lock(lock);
-
-        /* don't check added & count since we want to process all locks
-         * from unused list */
-        switch (lock->l_resource->lr_type) {
-                case LDLM_EXTENT:
-                case LDLM_IBITS:
-                        if (cb && cb(lock))
-                                break;
-                default:
-                        result = LDLM_POLICY_SKIP_LOCK;
+       ldlm_policy_res_t result = LDLM_POLICY_CANCEL_LOCK;
+       ldlm_cancel_for_recovery cb = ns->ns_cancel_for_recovery;
+
+       /* don't check added & count since we want to process all locks
+        * from unused list.
+        * It's fine to not take lock to access lock->l_resource since
+        * the lock has already been granted so it won't change. */
+       switch (lock->l_resource->lr_type) {
+               case LDLM_EXTENT:
+               case LDLM_IBITS:
+                       if (cb && cb(lock))
+                               break;
+               default:
+                       result = LDLM_POLICY_SKIP_LOCK;
+                       lock_res_and_lock(lock);
                        ldlm_set_skipped(lock);
-                        break;
-        }
+                       unlock_res_and_lock(lock);
+                       break;
+       }
 
-        unlock_res_and_lock(lock);
-        RETURN(result);
+       RETURN(result);
 }
 
 /**
index f23f927..406cafe 100644 (file)
@@ -276,16 +276,6 @@ struct osc_lock {
         enum osc_lock_state      ols_state;
 
         /**
-         * How many pages are using this lock for io, currently only used by
-         * read-ahead. If non-zero, the underlying dlm lock won't be cancelled
-         * during recovery to avoid deadlock. see bz16774.
-         *
-         * \see osc_page::ops_lock
-         * \see osc_page_addref_lock(), osc_page_putref_lock()
-         */
-        cfs_atomic_t             ols_pageref;
-
-        /**
          * true, if ldlm_lock_addref() was called against
          * osc_lock::ols_lock. This is used for sanity checking.
          *
@@ -402,16 +392,6 @@ struct osc_page {
         * Submit time - the time when the page is starting RPC. For debugging.
         */
        cfs_time_t            ops_submit_time;
-
-        /**
-         * A lock of which we hold a reference covers this page. Only used by
-         * read-ahead: for a readahead page, we hold it's covering lock to
-         * prevent it from being canceled during recovery.
-         *
-         * \see osc_lock::ols_pageref
-         * \see osc_page_addref_lock(), osc_page_putref_lock().
-         */
-        struct cl_lock       *ops_lock;
 };
 
 extern struct kmem_cache *osc_lock_kmem;
index 5c4e785..0772c2b 100644 (file)
@@ -140,6 +140,7 @@ int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
 int osc_lru_reclaim(struct client_obd *cli);
 
 extern spinlock_t osc_ast_guard;
+unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock);
 
 int osc_cleanup(struct obd_device *obd);
 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg);
@@ -194,8 +195,6 @@ static inline struct osc_device *obd2osc_dev(const struct obd_device *d)
         return container_of0(d->obd_lu_dev, struct osc_device, od_cl.cd_lu_dev);
 }
 
-int osc_dlm_lock_pageref(struct ldlm_lock *dlm);
-
 extern struct kmem_cache *osc_quota_kmem;
 struct osc_quota_info {
         /** linkage for quota hash table */
index 78f3ceb..e06a900 100644 (file)
@@ -55,8 +55,6 @@
  *  @{ 
  */
 
-#define _PAGEREF_MAGIC  (-10000000)
-
 /*****************************************************************************
  *
  * Type conversions.
@@ -253,9 +251,6 @@ static void osc_lock_fini(const struct lu_env *env,
          */
         osc_lock_unhold(ols);
         LASSERT(ols->ols_lock == NULL);
-        LASSERT(cfs_atomic_read(&ols->ols_pageref) == 0 ||
-                cfs_atomic_read(&ols->ols_pageref) == _PAGEREF_MAGIC);
-
         OBD_SLAB_FREE_PTR(ols, osc_lock_kmem);
 }
 
@@ -908,11 +903,90 @@ static int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
         return result;
 }
 
-static unsigned long osc_lock_weigh(const struct lu_env *env,
-                                    const struct cl_lock_slice *slice)
+static int weigh_cb(const struct lu_env *env, struct cl_io *io,
+                   struct osc_page *ops, void *cbdata)
+{
+       struct cl_page *page = ops->ops_cl.cpl_page;
+
+       if (cl_page_is_vmlocked(env, page)) {
+               (*(unsigned long *)cbdata)++;
+               return CLP_GANG_ABORT;
+       }
+
+       return CLP_GANG_OKAY;
+}
+
+static unsigned long osc_lock_weight(const struct lu_env *env,
+                                    const struct osc_lock *ols)
+{
+       struct cl_io *io = &osc_env_info(env)->oti_io;
+       struct cl_lock_descr *descr = &ols->ols_cl.cls_lock->cll_descr;
+       struct cl_object *obj = ols->ols_cl.cls_obj;
+       unsigned long npages = 0;
+       int result;
+       ENTRY;
+
+       io->ci_obj = cl_object_top(obj);
+       io->ci_ignore_layout = 1;
+       result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
+       if (result != 0)
+               RETURN(result);
+
+       do {
+               result = osc_page_gang_lookup(env, io, cl2osc(obj),
+                                             descr->cld_start, descr->cld_end,
+                                             weigh_cb, (void *)&npages);
+               if (result == CLP_GANG_ABORT)
+                       break;
+               if (result == CLP_GANG_RESCHED)
+                       cond_resched();
+       } while (result != CLP_GANG_OKAY);
+       cl_io_fini(env, io);
+
+       return npages;
+}
+
+/**
+ * Get the weight of dlm lock for early cancellation.
+ */
+unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
 {
-       /* TODO: check how many pages are covered by this lock */
-       return cl2osc(slice->cls_obj)->oo_npages;
+       struct cl_env_nest       nest;
+       struct lu_env           *env;
+       struct osc_lock         *lock;
+       unsigned long            weight;
+       ENTRY;
+
+       might_sleep();
+       /*
+        * osc_ldlm_weigh_ast has a complex context since it might be called
+        * because of lock canceling, or from user's input. We have to make
+        * a new environment for it. Probably it is implementation safe to use
+        * the upper context because cl_lock_put don't modify environment
+        * variables. But just in case ..
+        */
+       env = cl_env_nested_get(&nest);
+       if (IS_ERR(env))
+               /* Mostly because lack of memory, do not eliminate this lock */
+               RETURN(1);
+
+       LASSERT(dlmlock->l_resource->lr_type == LDLM_EXTENT);
+       lock = osc_ast_data_get(dlmlock);
+       if (lock == NULL) {
+               /* cl_lock was destroyed because of memory pressure.
+                * It is much reasonable to assign this type of lock
+                * a lower cost.
+                */
+               GOTO(out, weight = 0);
+       }
+
+       weight = osc_lock_weight(env, lock);
+       osc_ast_data_put(env, lock);
+       EXIT;
+
+out:
+       cl_env_nested_put(&nest, env);
+       return weight;
 }
 
 static void osc_lock_build_einfo(const struct lu_env *env,
@@ -1548,7 +1622,6 @@ static const struct cl_lock_operations osc_lock_ops = {
         .clo_delete  = osc_lock_delete,
         .clo_state   = osc_lock_state,
         .clo_cancel  = osc_lock_cancel,
-        .clo_weigh   = osc_lock_weigh,
         .clo_print   = osc_lock_print,
         .clo_fits_into = osc_lock_fits_into,
 };
@@ -1649,7 +1722,6 @@ int osc_lock_init(const struct lu_env *env,
                __u32 enqflags = lock->cll_descr.cld_enq_flags;
 
                osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo);
-               cfs_atomic_set(&clk->ols_pageref, 0);
                clk->ols_state = OLS_NEW;
 
                clk->ols_flags = osc_enq2ldlm_flags(enqflags);
@@ -1676,26 +1748,4 @@ int osc_lock_init(const struct lu_env *env,
        return result;
 }
 
-int osc_dlm_lock_pageref(struct ldlm_lock *dlm)
-{
-       struct osc_lock *olock;
-       int              rc = 0;
-
-       spin_lock(&osc_ast_guard);
-        olock = dlm->l_ast_data;
-        /*
-         * there's a very rare race with osc_page_addref_lock(), but that
-         * doesn't matter because in the worst case we don't cancel a lock
-         * which we actually can, that's no harm.
-         */
-        if (olock != NULL &&
-            cfs_atomic_add_return(_PAGEREF_MAGIC,
-                                  &olock->ols_pageref) != _PAGEREF_MAGIC) {
-                cfs_atomic_sub(_PAGEREF_MAGIC, &olock->ols_pageref);
-                rc = 1;
-        }
-       spin_unlock(&osc_ast_guard);
-       return rc;
-}
-
 /** @} osc */
index 2b405f4..a4dc541 100644 (file)
@@ -166,9 +166,6 @@ static int osc_page_protected(const struct lu_env *env,
 static void osc_page_fini(const struct lu_env *env,
                           struct cl_page_slice *slice)
 {
-        struct osc_page *opg = cl2osc_page(slice);
-        CDEBUG(D_TRACE, "%p\n", opg);
-        LASSERT(opg->ops_lock == NULL);
 }
 
 static void osc_page_transfer_get(struct osc_page *opg, const char *label)
@@ -240,42 +237,6 @@ void osc_index2policy(ldlm_policy_data_t *policy, const struct cl_object *obj,
         policy->l_extent.end   = cl_offset(obj, end + 1) - 1;
 }
 
-static int osc_page_addref_lock(const struct lu_env *env,
-                                struct osc_page *opg,
-                                struct cl_lock *lock)
-{
-        struct osc_lock *olock;
-        int              rc;
-
-        LASSERT(opg->ops_lock == NULL);
-
-        olock = osc_lock_at(lock);
-        if (cfs_atomic_inc_return(&olock->ols_pageref) <= 0) {
-                cfs_atomic_dec(&olock->ols_pageref);
-                rc = -ENODATA;
-        } else {
-               cl_lock_get(lock);
-                opg->ops_lock = lock;
-                rc = 0;
-        }
-        return rc;
-}
-
-static void osc_page_putref_lock(const struct lu_env *env,
-                                 struct osc_page *opg)
-{
-        struct cl_lock  *lock = opg->ops_lock;
-        struct osc_lock *olock;
-
-        LASSERT(lock != NULL);
-        olock = osc_lock_at(lock);
-
-        cfs_atomic_dec(&olock->ols_pageref);
-        opg->ops_lock = NULL;
-
-        cl_lock_put(env, lock);
-}
-
 static int osc_page_is_under_lock(const struct lu_env *env,
                                   const struct cl_page_slice *slice,
                                   struct cl_io *unused)
@@ -287,39 +248,12 @@ static int osc_page_is_under_lock(const struct lu_env *env,
         lock = cl_lock_at_page(env, slice->cpl_obj, slice->cpl_page,
                                NULL, 1, 0);
         if (lock != NULL) {
-               if (osc_page_addref_lock(env, cl2osc_page(slice), lock) == 0)
-                       result = -EBUSY;
                cl_lock_put(env, lock);
+               result = -EBUSY;
        }
         RETURN(result);
 }
 
-static void osc_page_disown(const struct lu_env *env,
-                            const struct cl_page_slice *slice,
-                            struct cl_io *io)
-{
-        struct osc_page *opg = cl2osc_page(slice);
-
-        if (unlikely(opg->ops_lock))
-                osc_page_putref_lock(env, opg);
-}
-
-static void osc_page_completion_read(const struct lu_env *env,
-                                     const struct cl_page_slice *slice,
-                                     int ioret)
-{
-       struct osc_page   *opg = cl2osc_page(slice);
-
-       if (likely(opg->ops_lock))
-               osc_page_putref_lock(env, opg);
-}
-
-static void osc_page_completion_write(const struct lu_env *env,
-                                     const struct cl_page_slice *slice,
-                                     int ioret)
-{
-}
-
 static const char *osc_list(cfs_list_t *head)
 {
         return cfs_list_empty(head) ? "-" : "+";
@@ -475,15 +409,6 @@ static const struct cl_page_operations osc_page_ops = {
        .cpo_print         = osc_page_print,
        .cpo_delete        = osc_page_delete,
        .cpo_is_under_lock = osc_page_is_under_lock,
-       .cpo_disown        = osc_page_disown,
-       .io = {
-               [CRT_READ] = {
-                       .cpo_completion = osc_page_completion_read
-               },
-               [CRT_WRITE] = {
-                       .cpo_completion = osc_page_completion_write
-               }
-       },
        .cpo_clip           = osc_page_clip,
        .cpo_cancel         = osc_page_cancel,
        .cpo_flush          = osc_page_flush
index b40c456..4cb6a05 100644 (file)
@@ -3493,21 +3493,18 @@ static int osc_import_event(struct obd_device *obd,
  */
 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
 {
-        check_res_locked(lock->l_resource);
-
-        /*
-         * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
-         *
-         * XXX as a future improvement, we can also cancel unused write lock
-         * if it doesn't have dirty data and active mmaps.
-         */
-        if (lock->l_resource->lr_type == LDLM_EXTENT &&
-            (lock->l_granted_mode == LCK_PR ||
-             lock->l_granted_mode == LCK_CR) &&
-            (osc_dlm_lock_pageref(lock) == 0))
-                RETURN(1);
+       /*
+        * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
+        *
+        * XXX as a future improvement, we can also cancel unused write lock
+        * if it doesn't have dirty data and active mmaps.
+        */
+       if (lock->l_resource->lr_type == LDLM_EXTENT &&
+           (lock->l_granted_mode == LCK_PR || lock->l_granted_mode == LCK_CR)&&
+           osc_ldlm_weigh_ast(lock) == 0)
+               RETURN(1);
 
-        RETURN(0);
+       RETURN(0);
 }
 
 static int brw_queue_work(const struct lu_env *env, void *data)