From: Hongchao.Zhang@Sun.COM Date: Fri, 18 Dec 2009 19:44:34 +0000 (+0800) Subject: b=21252 add reference on extent lock during readahead X-Git-Tag: v1_8_1_58~8 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=29309746c4049aa7da6cde4cb9a44ec0df2b1af3;p=fs%2Flustre-release.git b=21252 add reference on extent lock during readahead add an extra LCK_PR reference on the extent lock covering the readahead page to pin it during the readahead i=oleg.drokin@sun.com i=johann@sun.com i=tom.wang@sun.com --- diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 76e0dde..e91cb75 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -615,6 +615,7 @@ struct ll_async_page { struct list_head llap_pglist_item; /* checksum for paranoid I/O debugging */ __u32 llap_checksum; + struct lustre_handle llap_lockh_granted; }; /* diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index 5bf009c..0a9400d 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -787,6 +787,7 @@ static struct ll_async_page *llap_from_page_with_lockh(struct page *page, llap = llap_cast_private(page); if (llap != NULL) { +#if 0 /* disabled since we take lock ref in readahead, see bug 16774/21252 */ if (origin == LLAP_ORIGIN_READAHEAD && lockh) { /* the page could belong to another lock for which * we don't hold a reference. 
We need to check that @@ -803,6 +804,7 @@ static struct ll_async_page *llap_from_page_with_lockh(struct page *page, lockh, flags)) RETURN(ERR_PTR(-ENOLCK)); } +#endif /* move to end of LRU list, except when page is just about to * die */ if (origin != LLAP_ORIGIN_REMOVEPAGE) { @@ -1139,6 +1141,8 @@ int ll_ap_completion(void *data, int cmd, struct obdo *oa, int rc) { struct ll_async_page *llap; struct page *page; + struct obd_export *exp; + obd_off end; int ret = 0; ENTRY; @@ -1149,8 +1153,16 @@ int ll_ap_completion(void *data, int cmd, struct obdo *oa, int rc) LL_CDEBUG_PAGE(D_PAGE, page, "completing cmd %d with %d\n", cmd, rc); - if (cmd & OBD_BRW_READ && llap->llap_defer_uptodate) + if (cmd & OBD_BRW_READ && llap->llap_defer_uptodate) { ll_ra_count_put(ll_i2sbi(page->mapping->host), 1); + + LASSERT(lustre_handle_is_used(&llap->llap_lockh_granted)); + exp = ll_i2obdexp(page->mapping->host); + end = ((loff_t)page->index) << CFS_PAGE_SHIFT; + end += CFS_PAGE_SIZE - 1; + obd_cancel(exp, ll_i2info(page->mapping->host)->lli_smd, LCK_PR, + &llap->llap_lockh_granted, OBD_FAST_LOCK, end); + } if (rc == 0) { if (cmd & OBD_BRW_READ) { @@ -1421,8 +1433,8 @@ static int ll_read_ahead_page(struct obd_export *exp, struct obd_io_group *oig, struct page *page; unsigned int gfp_mask = 0; int rc = 0, flags = 0; - struct ll_thread_data *ltd; - struct lustre_handle *lockh = NULL; + struct lustre_handle lockh = { 0 }; + obd_off start, end; gfp_mask = GFP_HIGHUSER & ~__GFP_WAIT; #ifdef __GFP_NOWARN @@ -1442,6 +1454,11 @@ static int ll_read_ahead_page(struct obd_export *exp, struct obd_io_group *oig, GOTO(unlock_page, rc = 0); } +#if 0 /* the fast lock stored in ltd can't be guaranteed to be the lock used + * by the llap returned by "llap_from_page_with_lockh" if there is a + * ready llap, for lock check against readahead is disabled. 
+ * see bug 16774/21252 */ + ltd = ll_td_get(); if (ltd && ltd->lock_style > 0) { __u64 offset = ((loff_t)page->index) << CFS_PAGE_SHIFT; @@ -1450,12 +1467,14 @@ static int ll_read_ahead_page(struct obd_export *exp, struct obd_io_group *oig, if (ltd->lock_style == LL_LOCK_STYLE_FASTLOCK) flags = OBD_FAST_LOCK; } +#endif /* we do this first so that we can see the page in the /proc * accounting */ - llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_READAHEAD, lockh, + llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_READAHEAD, &lockh, flags); if (IS_ERR(llap) || llap->llap_defer_uptodate) { + /* bail out when we hit the end of the lock. */ if (PTR_ERR(llap) == -ENOLCK) { ll_ra_stats_inc(mapping, RA_STAT_FAILED_MATCH); CDEBUG(D_READA | D_PAGE, @@ -1472,13 +1491,27 @@ static int ll_read_ahead_page(struct obd_export *exp, struct obd_io_group *oig, if (Page_Uptodate(page)) GOTO(unlock_page, rc = 0); - /* bail out when we hit the end of the lock. */ rc = ll_issue_page_read(exp, llap, oig, 1); if (rc == 0) { LL_CDEBUG_PAGE(D_READA | D_PAGE, page, "started read-ahead\n"); rc = 1; + + if (!lustre_handle_is_used(&lockh)) { + start = ((loff_t)index) << CFS_PAGE_SHIFT; + end = start + CFS_PAGE_SIZE - 1; + rc = obd_get_lock(exp, + ll_i2info(mapping->host)->lli_smd, + &llap->llap_cookie, OBD_BRW_READ, + start, end, &lockh, OBD_FAST_LOCK); + LASSERT(rc); + } + + llap->llap_lockh_granted = lockh; } else { unlock_page: + if (lustre_handle_is_used(&lockh)) + ldlm_lock_decref(&lockh, LCK_PR); + unlock_page(page); LL_CDEBUG_PAGE(D_READA | D_PAGE, page, "skipping read-ahead\n"); } diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 28e2d94..738a9c3 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -1942,7 +1942,7 @@ int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, lap->lap_sub_cookie = (void *)lap + size_round(sizeof(*lap)); - if (lockh && !(flags & OBD_FAST_LOCK)) { + if (lockh && lustre_handle_is_used(lockh) && !(flags & 
OBD_FAST_LOCK)) { lov_lockh = lov_handle2llh(lockh); if (lov_lockh) { lockh = lov_lockh->llh_handles + lap->lap_stripe; @@ -3256,7 +3256,7 @@ static int lov_get_lock(struct obd_export *exp, struct lov_stripe_md *lsm, /* ensure we don't cross stripe boundaries */ lov_extent_calc(exp, lsm, OBD_CALC_STRIPE_END, &stripe_end); - if (stripe_end <= end) + if (stripe_end < end) GOTO(out, rc = 0); /* map the region limits to the object limits */ diff --git a/lustre/osc/cache.c b/lustre/osc/cache.c index 119a2ca..ae1a547 100644 --- a/lustre/osc/cache.c +++ b/lustre/osc/cache.c @@ -141,6 +141,16 @@ int cache_add_extent(struct lustre_cache *cache, struct ldlm_res_id *res, "under us\n"); RETURN(-ENOLCK); } + + /* XXX Note! if the caller passed a unused lock handle, + * it expects us to return the lockh of the lock we matched, + * reference(LCK_PR) of the lock is increased here to assure + * its validity, and the caller should decrease the reference + * when it isn't used any more. */ + if (lockh && !lustre_handle_is_used(lockh)) { + ldlm_lock_addref(&tmplockh, LCK_PR); + lustre_handle_copy(lockh, &tmplockh); + } } spin_lock(&lock->l_extents_list_lock);