Whamcloud - gitweb
b=21252 add reference on extent lock during readahead
authorHongchao.Zhang@Sun.COM <Hongchao.Zhang@Sun.COM>
Fri, 18 Dec 2009 19:44:34 +0000 (03:44 +0800)
committerjohann <johann@granier.local>
Fri, 18 Dec 2009 10:50:03 +0000 (11:50 +0100)
add an extra LCK_PR reference on the extent lock
covering the readahead page to pin it during the readahead

i=oleg.drokin@sun.com
i=johann@sun.com
i=tom.wang@sun.com

lustre/llite/llite_internal.h
lustre/llite/rw.c
lustre/lov/lov_obd.c
lustre/osc/cache.c

index 76e0dde..e91cb75 100644 (file)
@@ -615,6 +615,7 @@ struct ll_async_page {
         struct list_head llap_pglist_item;
         /* checksum for paranoid I/O debugging */
         __u32 llap_checksum;
+        struct lustre_handle llap_lockh_granted;
 };
 
 /*
index 5bf009c..0a9400d 100644 (file)
@@ -787,6 +787,7 @@ static struct ll_async_page *llap_from_page_with_lockh(struct page *page,
 
         llap = llap_cast_private(page);
         if (llap != NULL) {
+#if 0 /* disabled since we take lock ref in readahead, see bug 16774/21252 */
                 if (origin == LLAP_ORIGIN_READAHEAD && lockh) {
                         /* the page could belong to another lock for which
                          * we don't hold a reference. We need to check that
@@ -803,6 +804,7 @@ static struct ll_async_page *llap_from_page_with_lockh(struct page *page,
                                           lockh, flags))
                                 RETURN(ERR_PTR(-ENOLCK));
                 }
+#endif
                 /* move to end of LRU list, except when page is just about to
                  * die */
                 if (origin != LLAP_ORIGIN_REMOVEPAGE) {
@@ -1139,6 +1141,8 @@ int ll_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
 {
         struct ll_async_page *llap;
         struct page *page;
+        struct obd_export *exp;
+        obd_off end;
         int ret = 0;
         ENTRY;
 
@@ -1149,8 +1153,16 @@ int ll_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
 
         LL_CDEBUG_PAGE(D_PAGE, page, "completing cmd %d with %d\n", cmd, rc);
 
-        if (cmd & OBD_BRW_READ && llap->llap_defer_uptodate)
+        if (cmd & OBD_BRW_READ && llap->llap_defer_uptodate) {
                 ll_ra_count_put(ll_i2sbi(page->mapping->host), 1);
+                LASSERT(lustre_handle_is_used(&llap->llap_lockh_granted));
+                exp = ll_i2obdexp(page->mapping->host);
+                end = ((loff_t)page->index) << CFS_PAGE_SHIFT;
+                end += CFS_PAGE_SIZE - 1;
+                obd_cancel(exp, ll_i2info(page->mapping->host)->lli_smd, LCK_PR,
+                           &llap->llap_lockh_granted, OBD_FAST_LOCK, end);
+        }
 
         if (rc == 0)  {
                 if (cmd & OBD_BRW_READ) {
@@ -1421,8 +1433,8 @@ static int ll_read_ahead_page(struct obd_export *exp, struct obd_io_group *oig,
         struct page *page;
         unsigned int gfp_mask = 0;
         int rc = 0, flags = 0;
-        struct ll_thread_data *ltd;
-        struct lustre_handle *lockh = NULL;
+        struct lustre_handle lockh = { 0 };
+        obd_off start, end;
 
         gfp_mask = GFP_HIGHUSER & ~__GFP_WAIT;
 #ifdef __GFP_NOWARN
@@ -1442,6 +1454,11 @@ static int ll_read_ahead_page(struct obd_export *exp, struct obd_io_group *oig,
                 GOTO(unlock_page, rc = 0);
         }
 
+#if 0 /* the fast lock stored in ltd can't be guaranteed to be the lock used
+       * by the llap returned by "llap_from_page_with_lockh" if there is a
+       * ready llap, because the lock check against readahead is disabled.
+       * see bug 16774/21252 */
+
         ltd = ll_td_get();
         if (ltd && ltd->lock_style > 0) {
                 __u64 offset = ((loff_t)page->index) << CFS_PAGE_SHIFT;
@@ -1450,12 +1467,14 @@ static int ll_read_ahead_page(struct obd_export *exp, struct obd_io_group *oig,
                 if (ltd->lock_style == LL_LOCK_STYLE_FASTLOCK)
                         flags = OBD_FAST_LOCK;
         }
+#endif
 
         /* we do this first so that we can see the page in the /proc
          * accounting */
-        llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_READAHEAD, lockh,
+        llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_READAHEAD, &lockh,
                                          flags);
         if (IS_ERR(llap) || llap->llap_defer_uptodate) {
+                /* bail out when we hit the end of the lock. */
                 if (PTR_ERR(llap) == -ENOLCK) {
                         ll_ra_stats_inc(mapping, RA_STAT_FAILED_MATCH);
                         CDEBUG(D_READA | D_PAGE,
@@ -1472,13 +1491,27 @@ static int ll_read_ahead_page(struct obd_export *exp, struct obd_io_group *oig,
         if (Page_Uptodate(page))
                 GOTO(unlock_page, rc = 0);
 
-        /* bail out when we hit the end of the lock. */
         rc = ll_issue_page_read(exp, llap, oig, 1);
         if (rc == 0) {
                 LL_CDEBUG_PAGE(D_READA | D_PAGE, page, "started read-ahead\n");
                 rc = 1;
+
+                if (!lustre_handle_is_used(&lockh)) {
+                        start = ((loff_t)index) << CFS_PAGE_SHIFT;
+                        end = start + CFS_PAGE_SIZE - 1;
+                        rc = obd_get_lock(exp,
+                                          ll_i2info(mapping->host)->lli_smd,
+                                          &llap->llap_cookie, OBD_BRW_READ, 
+                                          start, end, &lockh, OBD_FAST_LOCK);
+                        LASSERT(rc);
+                }
+
+                llap->llap_lockh_granted = lockh;
         } else {
 unlock_page:
+                if (lustre_handle_is_used(&lockh))
+                        ldlm_lock_decref(&lockh, LCK_PR);
+
                 unlock_page(page);
                 LL_CDEBUG_PAGE(D_READA | D_PAGE, page, "skipping read-ahead\n");
         }
index 28e2d94..738a9c3 100644 (file)
@@ -1942,7 +1942,7 @@ int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
 
         lap->lap_sub_cookie = (void *)lap + size_round(sizeof(*lap));
 
-        if (lockh && !(flags & OBD_FAST_LOCK)) {
+        if (lockh && lustre_handle_is_used(lockh) && !(flags & OBD_FAST_LOCK)) {
                 lov_lockh = lov_handle2llh(lockh);
                 if (lov_lockh) {
                         lockh = lov_lockh->llh_handles + lap->lap_stripe;
@@ -3256,7 +3256,7 @@ static int lov_get_lock(struct obd_export *exp, struct lov_stripe_md *lsm,
 
         /* ensure we don't cross stripe boundaries */
         lov_extent_calc(exp, lsm, OBD_CALC_STRIPE_END, &stripe_end);
-        if (stripe_end <= end)
+        if (stripe_end < end)
                 GOTO(out, rc = 0);
 
         /* map the region limits to the object limits */
index 119a2ca..ae1a547 100644 (file)
@@ -141,6 +141,16 @@ int cache_add_extent(struct lustre_cache *cache, struct ldlm_res_id *res,
                                "under us\n");
                         RETURN(-ENOLCK);
                 }
+               
+                /* XXX Note! if the caller passed an unused lock handle,
+                 * it expects us to return the lockh of the lock we matched,
+                 * reference(LCK_PR) of the lock is increased here to ensure
+                 * its validity, and the caller should decrease the reference
+                 * when it isn't used any more. */
+                if (lockh && !lustre_handle_is_used(lockh)) {
+                        ldlm_lock_addref(&tmplockh, LCK_PR);
+                        lustre_handle_copy(lockh, &tmplockh);
+                }
         }
 
         spin_lock(&lock->l_extents_list_lock);