LU-948 clio: add a callback to cl_page_gang_lookup()
author     Jinshan Xiong <jinshan.xiong@whamcloud.com>
           Thu, 12 Jan 2012 00:03:41 +0000 (16:03 -0800)
committer  Oleg Drokin <green@whamcloud.com>
           Fri, 20 Jan 2012 19:22:11 +0000 (14:22 -0500)
Add a callback to cl_page_gang_lookup() to make it easier to fix the
issue below and to better serve the new IO engine.

When a read lock is being canceled, we used to grab the lock on each
covered page and then check whether the page was covered by another
lock; if not, the page was discarded. This is unnecessary because the
check can be done without grabbing the page lock.

With the above fix, when a read-ahead page is in IO during recovery
and one of its covering locks is canceled by early cancellation for
recovery, the callback detects that the page is still covered by
another lock, and the page is skipped without trying to grab the page
lock.
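
As a minimal sketch only (not part of this patch; the helper name is
made up), a caller of the new interface loops until the whole range
has been visited, rescheduling when cl_page_gang_lookup() asks for it:

/* Hypothetical helper: apply @cb to every cached page of @obj in
 * [start, end].  Callbacks that keep pages owned must track their own
 * resume index (as cl_lock_page_out() does with clt_next_index). */
static int gang_walk(const struct lu_env *env, struct cl_object *obj,
                     struct cl_io *io, pgoff_t start, pgoff_t end,
                     cl_page_gang_cb_t cb, void *cbdata)
{
        int res;

        do {
                res = cl_page_gang_lookup(env, obj, io, start, end,
                                          cb, cbdata);
                if (res == CLP_GANG_ABORT)
                        break;
                if (res == CLP_GANG_RESCHED)
                        cfs_cond_resched();
        } while (res != CLP_GANG_OKAY);
        return res;
}

This mirrors the loops in cl_pages_prune() and osc_lock_has_pages()
in the diff below.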

Signed-off-by: Jinshan Xiong <jinshan.xiong@whamcloud.com>
Change-Id: I22a3ea0790f5c0e01c12c29208b6d60c38058f12
Reviewed-on: http://review.whamcloud.com/1955
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/cl_object.h
lustre/obdclass/cl_internal.h
lustre/obdclass/cl_lock.c
lustre/obdclass/cl_page.c
lustre/osc/osc_lock.c

lustre/include/cl_object.h
index b4c9bd8..25e518b 100644
@@ -2673,15 +2673,19 @@ static inline int cl_object_same(struct cl_object *o0, struct cl_object *o1)
  * @{ */
 enum {
         CLP_GANG_OKAY = 0,
+        CLP_GANG_RESCHED,
         CLP_GANG_AGAIN,
-        CLP_GANG_RESCHED
+        CLP_GANG_ABORT
 };
 
+/* callback of cl_page_gang_lookup() */
+typedef int   (*cl_page_gang_cb_t)  (const struct lu_env *, struct cl_io *,
+                                     struct cl_page *, void *);
 int             cl_page_gang_lookup (const struct lu_env *env,
                                      struct cl_object *obj,
                                      struct cl_io *io,
                                      pgoff_t start, pgoff_t end,
-                                     struct cl_page_list *plist);
+                                     cl_page_gang_cb_t cb, void *cbdata);
 struct cl_page *cl_page_lookup      (struct cl_object_header *hdr,
                                      pgoff_t index);
 struct cl_page *cl_page_find        (const struct lu_env *env,
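
For illustration only, a trivial callback obeying the contract above
(the name and the counter are hypothetical). A callback returns
CLP_GANG_OKAY to keep scanning; CLP_GANG_AGAIN and CLP_GANG_ABORT stop
the scan and are passed back to the caller, while CLP_GANG_RESCHED is
generated by cl_page_gang_lookup() itself when rescheduling is needed.

/* Hypothetical callback: count the cached pages in the range. */
static int count_pages_cb(const struct lu_env *env, struct cl_io *io,
                          struct cl_page *page, void *cbdata)
{
        unsigned long *count = cbdata;

        (*count)++;
        return CLP_GANG_OKAY;
}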
lustre/obdclass/cl_internal.h
index c1d543b..b1c360b 100644
@@ -109,6 +109,11 @@ struct cl_thread_info {
          * Used for submitting a sync io.
          */
         struct cl_sync_io    clt_anchor;
+        /**
+         * Fields used by cl_lock_page_out().
+         */
+        pgoff_t              clt_next_index;
+        pgoff_t              clt_fn_index; /* first non-overlapped index */
 };
 
 struct cl_thread_info *cl_env_info(const struct lu_env *env);
lustre/obdclass/cl_lock.c
index 7d8fc62..70d94a9 100644
@@ -1887,65 +1887,79 @@ struct cl_lock *cl_lock_at_page(const struct lu_env *env, struct cl_object *obj,
 EXPORT_SYMBOL(cl_lock_at_page);
 
 /**
- * Returns a list of pages protected (only) by a given lock.
- *
- * Scans an extent of page radix tree, corresponding to the \a lock and queues
- * all pages that are not protected by locks other than \a lock into \a queue.
+ * Calculate the page offset at the layer of @lock.
+ * At the time of this writing, @page is top page and @lock is sub lock.
  */
-void cl_lock_page_list_fixup(const struct lu_env *env,
-                             struct cl_io *io, struct cl_lock *lock,
-                             struct cl_page_list *queue)
+static pgoff_t pgoff_at_lock(struct cl_page *page, struct cl_lock *lock)
 {
-        struct cl_page        *page;
-        struct cl_page        *temp;
-        struct cl_page_list   *plist = &cl_env_info(env)->clt_list;
-
-        LINVRNT(cl_lock_invariant(env, lock));
-        ENTRY;
+        struct lu_device_type *dtype;
+        const struct cl_page_slice *slice;
 
-        /* No need to fix for WRITE lock because it is exclusive. */
-        if (lock->cll_descr.cld_mode >= CLM_WRITE)
-                RETURN_EXIT;
+        dtype = lock->cll_descr.cld_obj->co_lu.lo_dev->ld_type;
+        slice = cl_page_at(page, dtype);
+        LASSERT(slice != NULL);
+        return slice->cpl_page->cp_index;
+}
 
-        /* For those pages who are still covered by other PR locks, we should
-         * not discard them otherwise a [0, EOF) PR lock will discard all
-         * pages.
-         */
-        cl_page_list_init(plist);
-        cl_page_list_for_each_safe(page, temp, queue) {
-                pgoff_t                idx = page->cp_index;
-                struct cl_lock        *found;
-                struct cl_lock_descr  *descr;
-
-                /* The algorithm counts on the index-ascending page index. */
-                LASSERT(ergo(&temp->cp_batch != &queue->pl_pages,
-                        page->cp_index < temp->cp_index));
-
-                found = cl_lock_at_page(env, lock->cll_descr.cld_obj,
-                                        page, lock, 1, 0);
-                if (found == NULL)
-                        continue;
-
-                descr = &found->cll_descr;
-                cfs_list_for_each_entry_safe_from(page, temp, &queue->pl_pages,
-                                                  cp_batch) {
-                        idx = page->cp_index;
-                        if (descr->cld_start > idx || descr->cld_end < idx)
-                                break;
-                        cl_page_list_move(plist, queue, page);
+/**
+ * Check if page @page is covered by an extra lock or discard it.
+ */
+static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
+                                struct cl_page *page, void *cbdata)
+{
+        struct cl_thread_info *info = cl_env_info(env);
+        struct cl_lock *lock = cbdata;
+        pgoff_t index = pgoff_at_lock(page, lock);
+
+        if (index >= info->clt_fn_index) {
+                struct cl_lock *tmp;
+
+                /* refresh non-overlapped index */
+                tmp = cl_lock_at_page(env, lock->cll_descr.cld_obj, page, lock,
+                                      1, 0);
+                if (tmp != NULL) {
+                        /* Cache the first-non-overlapped index so as to skip
+                         * all pages within [index, clt_fn_index). This
+                         * is safe because if tmp lock is canceled, it will
+                         * discard these pages. */
+                        info->clt_fn_index = tmp->cll_descr.cld_end + 1;
+                        if (tmp->cll_descr.cld_end == CL_PAGE_EOF)
+                                info->clt_fn_index = CL_PAGE_EOF;
+                        cl_lock_put(env, tmp);
+                } else { /* discard the page */
+                        cl_page_own(env, io, page);
+                        cl_page_unmap(env, io, page);
+                        cl_page_discard(env, io, page);
+                        cl_page_disown(env, io, page);
                 }
-                cl_lock_put(env, found);
         }
 
-        /* The pages in plist are covered by other locks, don't handle them
-         * this time.
-         */
-        if (io != NULL)
-                cl_page_list_disown(env, io, plist);
-        cl_page_list_fini(env, plist);
-        EXIT;
+        info->clt_next_index = index + 1;
+        return CLP_GANG_OKAY;
+}
+
+static int pageout_cb(const struct lu_env *env, struct cl_io *io,
+                      struct cl_page *page, void *cbdata)
+{
+        struct cl_thread_info *info  = cl_env_info(env);
+        struct cl_page_list   *queue = &info->clt_queue.c2_qin;
+        struct cl_lock        *lock  = cbdata;
+        typeof(cl_page_own)   *page_own;
+        int rc = CLP_GANG_OKAY;
+
+        page_own = queue->pl_nr ? cl_page_own_try : cl_page_own;
+        if (page_own(env, io, page) == 0) {
+                cl_page_list_add(queue, page);
+                info->clt_next_index = pgoff_at_lock(page, lock) + 1;
+        } else if (page->cp_state != CPS_FREEING) {
+                /* cl_page_own() won't fail unless
+                 * the page is being freed. */
+                LASSERT(queue->pl_nr != 0);
+                rc = CLP_GANG_AGAIN;
+        }
+
+        return rc;
 }
-EXPORT_SYMBOL(cl_lock_page_list_fixup);
 
 /**
  * Invalidate pages protected by the given lock, sending them out to the
@@ -1976,9 +1990,8 @@ int cl_lock_page_out(const struct lu_env *env, struct cl_lock *lock,
         struct cl_io          *io    = &info->clt_io;
         struct cl_2queue      *queue = &info->clt_queue;
         struct cl_lock_descr  *descr = &lock->cll_descr;
-        struct lu_device_type *dtype;
+        cl_page_gang_cb_t      cb;
         long page_count;
-        pgoff_t next_index;
         int res;
         int result;
 
@@ -1990,44 +2003,39 @@ int cl_lock_page_out(const struct lu_env *env, struct cl_lock *lock,
         if (result != 0)
                 GOTO(out, result);
 
-        dtype = descr->cld_obj->co_lu.lo_dev->ld_type;
-        next_index = descr->cld_start;
+        cb = descr->cld_mode == CLM_READ ? check_and_discard_cb : pageout_cb;
+        info->clt_fn_index = info->clt_next_index = descr->cld_start;
         do {
-                const struct cl_page_slice *slice;
-
                 cl_2queue_init(queue);
                 res = cl_page_gang_lookup(env, descr->cld_obj, io,
-                                          next_index, descr->cld_end,
-                                          &queue->c2_qin);
+                                          info->clt_next_index, descr->cld_end,
+                                          cb, (void *)lock);
                 page_count = queue->c2_qin.pl_nr;
-                if (page_count == 0)
-                        break;
-
-                /* cl_page_gang_lookup() uses subobj and sublock to look for
-                 * covered pages, but @queue->c2_qin contains the list of top
-                 * pages. We have to turn the page back to subpage so as to
-                 * get `correct' next index. -jay */
-                slice = cl_page_at(cl_page_list_last(&queue->c2_qin), dtype);
-                next_index = slice->cpl_page->cp_index + 1;
-
-                result = cl_page_list_unmap(env, io, &queue->c2_qin);
-                if (!discard) {
-                        long timeout = 600; /* 10 minutes. */
-                        /* for debug purpose, if this request can't be
-                         * finished in 10 minutes, we hope it can notify us.
-                         */
-                        result = cl_io_submit_sync(env, io, CRT_WRITE, queue,
-                                                   CRP_CANCEL, timeout);
-                        if (result)
-                                CWARN("Writing %lu pages error: %d\n",
-                                      page_count, result);
+                if (page_count > 0) {
+                        /* must be writeback case */
+                        LASSERTF(descr->cld_mode >= CLM_WRITE, "lock mode %s\n",
+                                 cl_lock_mode_name(descr->cld_mode));
+
+                        result = cl_page_list_unmap(env, io, &queue->c2_qin);
+                        if (!discard) {
+                                long timeout = 600; /* 10 minutes. */
+                                /* for debug purpose, if this request can't be
+                                 * finished in 10 minutes, we hope it can
+                                 * notify us.
+                                 */
+                                result = cl_io_submit_sync(env, io, CRT_WRITE,
+                                                           queue, CRP_CANCEL,
+                                                           timeout);
+                                if (result)
+                                        CWARN("Writing %lu pages error: %d\n",
+                                              page_count, result);
+                        }
+                        cl_2queue_discard(env, io, queue);
+                        cl_2queue_disown(env, io, queue);
+                        cl_2queue_fini(env, queue);
                 }
-                cl_lock_page_list_fixup(env, io, lock, &queue->c2_qout);
-                cl_2queue_discard(env, io, queue);
-                cl_2queue_disown(env, io, queue);
-                cl_2queue_fini(env, queue);
 
-                if (next_index > descr->cld_end)
+                if (info->clt_next_index > descr->cld_end)
                         break;
 
                 if (res == CLP_GANG_RESCHED)
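
A worked example, with invented lock extents, of the clt_fn_index
caching in check_and_discard_cb() above:

/* Suppose a CLM_READ lock on [0, 99] is being canceled while another
 * read lock still covers [50, 199].  At page index 50 the callback
 * finds the covering lock and caches clt_fn_index = 200, so pages
 * 51..99 fall inside [index, clt_fn_index) and are skipped without
 * calling cl_lock_at_page() or taking any page lock.  This is safe
 * because if the covering lock is canceled later, its own cancellation
 * will discard those pages. */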
lustre/obdclass/cl_page.c
index 5c8973f..002bc08 100644
@@ -192,7 +192,7 @@ EXPORT_SYMBOL(cl_page_lookup);
  */
 int cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
                         struct cl_io *io, pgoff_t start, pgoff_t end,
-                        struct cl_page_list *queue)
+                        cl_page_gang_cb_t cb, void *cbdata)
 {
         struct cl_object_header *hdr;
         struct cl_page          *page;
@@ -245,7 +245,7 @@ int cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
                          */
                         cl_page_get_trust(page);
                         lu_ref_add_atomic(&page->cp_reference,
-                                          "page_list", cfs_current());
+                                          "gang_lookup", cfs_current());
                         pvec[j++] = page;
                 }
 
@@ -262,30 +262,16 @@ int cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
 
                 for (i = 0; i < j; ++i) {
                         page = pvec[i];
-                        if (res == CLP_GANG_OKAY) {
-                                typeof(cl_page_own) *page_own;
-
-                                page_own = queue->pl_nr ?
-                                           cl_page_own_try : cl_page_own;
-                                if (page_own(env, io, page) == 0) {
-                                        cl_page_list_add(queue, page);
-                                } else if (page->cp_state != CPS_FREEING) {
-                                        /* cl_page_own() won't fail unless
-                                         * the page is being freed. */
-                                        LASSERT(queue->pl_nr != 0);
-                                        res = CLP_GANG_AGAIN;
-                                }
-                        }
+                        if (res == CLP_GANG_OKAY)
+                                res = (*cb)(env, io, page, cbdata);
                         lu_ref_del(&page->cp_reference,
-                                   "page_list", cfs_current());
+                                   "gang_lookup", cfs_current());
                         cl_page_put(env, page);
                 }
                 if (nr < CLT_PVEC_SIZE || end_of_region)
                         break;
 
-                /* if the number of pages is zero, this will mislead the caller
-                 * that there is no page any more. */
-                if (queue->pl_nr && cfs_need_resched())
+                if (res == CLP_GANG_OKAY && cfs_need_resched())
                         res = CLP_GANG_RESCHED;
                 if (res != CLP_GANG_OKAY)
                         break;
@@ -1476,6 +1462,16 @@ int cl_page_is_under_lock(const struct lu_env *env, struct cl_io *io,
 }
 EXPORT_SYMBOL(cl_page_is_under_lock);
 
+static int page_prune_cb(const struct lu_env *env, struct cl_io *io,
+                         struct cl_page *page, void *cbdata)
+{
+        cl_page_own(env, io, page);
+        cl_page_unmap(env, io, page);
+        cl_page_discard(env, io, page);
+        cl_page_disown(env, io, page);
+        return CLP_GANG_OKAY;
+}
+
 /**
  * Purges all cached pages belonging to the object \a obj.
  */
@@ -1484,12 +1480,10 @@ int cl_pages_prune(const struct lu_env *env, struct cl_object *clobj)
         struct cl_thread_info   *info;
         struct cl_object        *obj = cl_object_top(clobj);
         struct cl_io            *io;
-        struct cl_page_list     *plist;
         int                      result;
 
         ENTRY;
         info  = cl_env_info(env);
-        plist = &info->clt_list;
         io    = &info->clt_io;
 
         /*
@@ -1504,18 +1498,8 @@ int cl_pages_prune(const struct lu_env *env, struct cl_object *clobj)
         }
 
         do {
-                cl_page_list_init(plist);
                 result = cl_page_gang_lookup(env, obj, io, 0, CL_PAGE_EOF,
-                                             plist);
-                /*
-                 * Since we're purging the pages of an object, we don't care
-                 * the possible outcomes of the following functions.
-                 */
-                cl_page_list_unmap(env, io, plist);
-                cl_page_list_discard(env, io, plist);
-                cl_page_list_disown(env, io, plist);
-                cl_page_list_fini(env, plist);
-
+                                             page_prune_cb, NULL);
                 if (result == CLP_GANG_RESCHED)
                         cfs_cond_resched();
         } while (result != CLP_GANG_OKAY);
lustre/osc/osc_lock.c
index 9aa7f59..f239956 100644
@@ -1390,11 +1390,27 @@ static void osc_lock_cancel(const struct lu_env *env,
         osc_lock_detach(env, olck);
 }
 
-void cl_lock_page_list_fixup(const struct lu_env *env,
-                             struct cl_io *io, struct cl_lock *lock,
-                             struct cl_page_list *queue);
-
 #ifdef INVARIANT_CHECK
+static int check_cb(const struct lu_env *env, struct cl_io *io,
+                    struct cl_page *page, void *cbdata)
+{
+        struct cl_lock *lock = cbdata;
+
+        if (lock->cll_descr.cld_mode == CLM_READ) {
+                struct cl_lock *tmp;
+                tmp = cl_lock_at_page(env, lock->cll_descr.cld_obj,
+                                     page, lock, 1, 0);
+                if (tmp != NULL) {
+                        cl_lock_put(env, tmp);
+                        return CLP_GANG_OKAY;
+                }
+        }
+
+        CL_LOCK_DEBUG(D_ERROR, env, lock, "still has pages\n");
+        CL_PAGE_DEBUG(D_ERROR, env, page, "\n");
+        return CLP_GANG_ABORT;
+}
+
 /**
  * Returns true iff there are pages under \a olck not protected by other
  * locks.
@@ -1405,44 +1421,39 @@ static int osc_lock_has_pages(struct osc_lock *olck)
         struct cl_lock_descr *descr;
         struct cl_object     *obj;
         struct osc_object    *oob;
-        struct cl_page_list  *plist;
-        struct cl_page       *page;
         struct cl_env_nest    nest;
         struct cl_io         *io;
         struct lu_env        *env;
         int                   result;
 
         env = cl_env_nested_get(&nest);
-        if (!IS_ERR(env)) {
-                obj   = olck->ols_cl.cls_obj;
-                oob   = cl2osc(obj);
-                io    = &oob->oo_debug_io;
-                lock  = olck->ols_cl.cls_lock;
-                descr = &lock->cll_descr;
-                plist = &osc_env_info(env)->oti_plist;
-                cl_page_list_init(plist);
-
-                cfs_mutex_lock(&oob->oo_debug_mutex);
-
-                io->ci_obj = cl_object_top(obj);
-                cl_io_init(env, io, CIT_MISC, io->ci_obj);
-                cl_page_gang_lookup(env, obj, io,
-                                    descr->cld_start, descr->cld_end, plist);
-                cl_lock_page_list_fixup(env, io, lock, plist);
-                if (plist->pl_nr > 0) {
-                        CL_LOCK_DEBUG(D_ERROR, env, lock, "still has pages\n");
-                        cl_page_list_for_each(page, plist)
-                                CL_PAGE_DEBUG(D_ERROR, env, page, "\n");
-                }
-                result = plist->pl_nr > 0;
-                cl_page_list_disown(env, io, plist);
-                cl_page_list_fini(env, plist);
-                cl_io_fini(env, io);
-                cfs_mutex_unlock(&oob->oo_debug_mutex);
-                cl_env_nested_put(&nest, env);
-        } else
-                result = 0;
-        return result;
+        if (IS_ERR(env))
+                return 0;
+
+        obj   = olck->ols_cl.cls_obj;
+        oob   = cl2osc(obj);
+        io    = &oob->oo_debug_io;
+        lock  = olck->ols_cl.cls_lock;
+        descr = &lock->cll_descr;
+
+        cfs_mutex_lock(&oob->oo_debug_mutex);
+
+        io->ci_obj = cl_object_top(obj);
+        cl_io_init(env, io, CIT_MISC, io->ci_obj);
+        do {
+                result = cl_page_gang_lookup(env, obj, io,
+                                             descr->cld_start, descr->cld_end,
+                                             check_cb, (void *)lock);
+                if (result == CLP_GANG_ABORT)
+                        break;
+                if (result == CLP_GANG_RESCHED)
+                        cfs_cond_resched();
+        } while (result != CLP_GANG_OKAY);
+        cl_io_fini(env, io);
+        cfs_mutex_unlock(&oob->oo_debug_mutex);
+        cl_env_nested_put(&nest, env);
+
+        return (result == CLP_GANG_ABORT);
 }
 #else
 static int osc_lock_has_pages(struct osc_lock *olck)