Whamcloud - gitweb
LU-14739 quota: nodemap squashed root cannot bypass quota
[fs/lustre-release.git] / lustre / osc / osc_cache.c
index f1be465..658e8f4 100644 (file)
@@ -28,7 +28,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * osc cache management.
  *
@@ -531,12 +530,21 @@ static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur,
        if (victim == NULL)
                return -EINVAL;
 
-       if (victim->oe_state != OES_CACHE || victim->oe_fsync_wait)
+       if (victim->oe_state != OES_INV &&
+           (victim->oe_state != OES_CACHE || victim->oe_fsync_wait))
                return -EBUSY;
 
        if (cur->oe_max_end != victim->oe_max_end)
                return -ERANGE;
 
+       /*
+        * In the rare case max_pages_per_rpc (mppr) is changed, don't
+        * merge extents until after old ones have been sent, or the
+        * "extents are aligned to RPCs" checks are unhappy.
+        */
+       if (cur->oe_mppr != victim->oe_mppr)
+               return -ERANGE;
+
        LASSERT(cur->oe_dlmlock == victim->oe_dlmlock);
        ppc_bits = osc_cli(obj)->cl_chunkbits - PAGE_SHIFT;
        chunk_start = cur->oe_start >> ppc_bits;
@@ -562,7 +570,6 @@ static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur,
        cur->oe_urgent   |= victim->oe_urgent;
        cur->oe_memalloc |= victim->oe_memalloc;
        list_splice_init(&victim->oe_pages, &cur->oe_pages);
-       list_del_init(&victim->oe_link);
        victim->oe_nr_pages = 0;
 
        osc_extent_get(victim);
@@ -690,7 +697,7 @@ static struct osc_extent *osc_extent_find(const struct lu_env *env,
                cur->oe_start = descr->cld_start;
        if (cur->oe_end > max_end)
                cur->oe_end = max_end;
-       cur->oe_grants  = 0;
+       cur->oe_grants  = chunksize + cli->cl_grant_extent_tax;
        cur->oe_mppr    = max_pages;
        if (olck->ols_dlmlock != NULL) {
                LASSERT(olck->ols_hold);
@@ -758,54 +765,21 @@ restart:
                         * flushed, try next one. */
                        continue;
 
-               /* check if they belong to the same rpc slot before trying to
-                * merge. the extents are not overlapped and contiguous at
-                * chunk level to get here. */
-               if (ext->oe_max_end != max_end)
-                       /* if they don't belong to the same RPC slot or
-                        * max_pages_per_rpc has ever changed, do not merge. */
-                       continue;
-
-               /* check whether maximum extent size will be hit */
-               if ((ext_chk_end - ext_chk_start + 1 + 1) << ppc_bits >
-                   cli->cl_max_extent_pages)
-                       continue;
-
-               /* it's required that an extent must be contiguous at chunk
-                * level so that we know the whole extent is covered by grant
-                * (the pages in the extent are NOT required to be contiguous).
-                * Otherwise, it will be too much difficult to know which
-                * chunks have grants allocated. */
-
-               /* try to do front merge - extend ext's start */
-               if (chunk + 1 == ext_chk_start) {
-                       /* ext must be chunk size aligned */
-                       EASSERT((ext->oe_start & ~chunk_mask) == 0, ext);
-
-                       /* pull ext's start back to cover cur */
-                       ext->oe_start   = cur->oe_start;
-                       ext->oe_grants += chunksize;
-                       LASSERT(*grants >= chunksize);
-                       *grants -= chunksize;
-
-                       found = osc_extent_hold(ext);
-               } else if (chunk == ext_chk_end + 1) {
-                       /* rear merge */
-                       ext->oe_end     = cur->oe_end;
-                       ext->oe_grants += chunksize;
+               if (osc_extent_merge(env, ext, cur) == 0) {
                        LASSERT(*grants >= chunksize);
                        *grants -= chunksize;
 
-                       /* try to merge with the next one because we just fill
-                        * in a gap */
+                       /*
+                        * Try to merge with the next one too because we
+                        * might have just filled in a gap.
+                        */
                        if (osc_extent_merge(env, ext, next_extent(ext)) == 0)
                                /* we can save extent tax from next extent */
                                *grants += cli->cl_grant_extent_tax;
 
                        found = osc_extent_hold(ext);
-               }
-               if (found != NULL)
                        break;
+               }
        }
 
        osc_extent_tree_dump(D_CACHE, obj);
@@ -819,7 +793,6 @@ restart:
        } else if (conflict == NULL) {
                /* create a new extent */
                EASSERT(osc_extent_is_overlapped(obj, cur) == 0, cur);
-               cur->oe_grants = chunksize + cli->cl_grant_extent_tax;
                LASSERT(*grants >= cur->oe_grants);
                *grants -= cur->oe_grants;
 
@@ -1397,7 +1370,6 @@ static void osc_consume_write_grant(struct client_obd *cli,
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               PAGE_SIZE, pga, pga->pg);
-       osc_update_next_shrink(cli);
 }
 
 /* the companion to osc_consume_write_grant, called when a brw has completed.
@@ -2265,10 +2237,11 @@ int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
 EXPORT_SYMBOL(osc_io_unplug0);
 
 int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
-                       struct page *page, loff_t offset)
+                       struct cl_page *page, loff_t offset)
 {
        struct obd_export     *exp = osc_export(osc);
        struct osc_async_page *oap = &ops->ops_oap;
+       struct page           *vmpage = page->cp_vmpage;
        ENTRY;
 
        if (!page)
@@ -2278,16 +2251,24 @@ int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
        oap->oap_cli = &exp->exp_obd->u.cli;
        oap->oap_obj = osc;
 
-       oap->oap_page = page;
+       oap->oap_page = vmpage;
        oap->oap_obj_off = offset;
        LASSERT(!(offset & ~PAGE_MASK));
 
+       /* Count of transient (direct i/o) pages is always stable by the time
+        * they're submitted.  Setting this here lets us avoid calling
+        * cl_page_clip later to set this.
+        */
+       if (page->cp_type == CPT_TRANSIENT)
+               oap->oap_async_flags |= ASYNC_COUNT_STABLE|ASYNC_URGENT|
+                                       ASYNC_READY;
+
        INIT_LIST_HEAD(&oap->oap_pending_item);
        INIT_LIST_HEAD(&oap->oap_rpc_item);
 
        spin_lock_init(&oap->oap_lock);
-       CDEBUG(D_INFO, "oap %p page %p obj off %llu\n",
-              oap, page, oap->oap_obj_off);
+       CDEBUG(D_INFO, "oap %p vmpage %p obj off %llu\n",
+              oap, vmpage, oap->oap_obj_off);
        RETURN(0);
 }
 EXPORT_SYMBOL(osc_prep_async_page);
@@ -2328,7 +2309,7 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
        }
 
        /* check if the file's owner/group is over quota */
-       if (!(cmd & OBD_BRW_NOQUOTA)) {
+       if (!io->ci_noquota) {
                struct cl_object *obj;
                struct cl_attr   *attr;
                unsigned int qid[LL_MAXQUOTAS];
@@ -2587,7 +2568,7 @@ out:
        return rc;
 }
 
-int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
+int osc_queue_sync_pages(const struct lu_env *env, struct cl_io *io,
                         struct osc_object *obj, struct list_head *list,
                         int brw_flags)
 {
@@ -2647,6 +2628,7 @@ int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
                grants += (1 << cli->cl_chunkbits) *
                        ((page_count + ppc - 1) / ppc);
 
+               CDEBUG(D_CACHE, "requesting %d bytes grant\n", grants);
                spin_lock(&cli->cl_loi_list_lock);
                if (osc_reserve_grant(cli, grants) == 0) {
                        list_for_each_entry(oap, list, oap_pending_item) {
@@ -2656,6 +2638,15 @@ int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
                        }
                        osc_unreserve_grant_nolock(cli, grants, 0);
                        ext->oe_grants = grants;
+               } else {
+                       /* We cannot report ENOSPC correctly if we do parallel
+                        * DIO (async RPC submission), so turn off parallel dio
+                        * if there is not sufficient grant available.  This
+                        * makes individual RPCs synchronous.
+                        */
+                       io->ci_parallel_dio = false;
+                       CDEBUG(D_CACHE,
+                       "not enough grant available, switching to sync for this i/o\n");
                }
                spin_unlock(&cli->cl_loi_list_lock);
        }
@@ -3109,11 +3100,10 @@ bool osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
                spin_unlock(&osc->oo_tree_lock);
                tree_lock = false;
 
+               res = (*cb)(env, io, pvec, j, cbdata);
+
                for (i = 0; i < j; ++i) {
                        ops = pvec[i];
-                       if (res)
-                               res = (*cb)(env, io, ops, cbdata);
-
                        page = ops->ops_cl.cpl_page;
                        lu_ref_del(&page->cp_reference, "gang_lookup", current);
                        cl_pagevec_put(env, page, pagevec);
@@ -3125,6 +3115,15 @@ bool osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
 
                if (!res)
                        break;
+
+               if (io->ci_type == CIT_MISC &&
+                   io->u.ci_misc.lm_next_rpc_time &&
+                   ktime_get_seconds() > io->u.ci_misc.lm_next_rpc_time) {
+                       osc_send_empty_rpc(osc, idx << PAGE_SHIFT);
+                       io->u.ci_misc.lm_next_rpc_time = ktime_get_seconds() +
+                                                        5 * obd_timeout / 16;
+               }
+
                if (need_resched())
                        cond_resched();
 
@@ -3141,85 +3140,101 @@ EXPORT_SYMBOL(osc_page_gang_lookup);
  * Check if page @page is covered by an extra lock or discard it.
  */
 static bool check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
-                               struct osc_page *ops, void *cbdata)
+                                void **pvec, int count, void *cbdata)
 {
        struct osc_thread_info *info = osc_env_info(env);
        struct osc_object *osc = cbdata;
-       struct cl_page *page = ops->ops_cl.cpl_page;
-       pgoff_t index;
-       bool discard = false;
-
-       index = osc_index(ops);
-
-       /* negative lock caching */
-       if (index < info->oti_ng_index) {
-               discard = true;
-       } else if (index >= info->oti_fn_index) {
-               struct ldlm_lock *tmp;
-               /* refresh non-overlapped index */
-               tmp = osc_dlmlock_at_pgoff(env, osc, index,
-                                          OSC_DAP_FL_TEST_LOCK |
-                                          OSC_DAP_FL_AST | OSC_DAP_FL_RIGHT);
-               if (tmp != NULL) {
-                       __u64 end = tmp->l_policy_data.l_extent.end;
-                       __u64 start = tmp->l_policy_data.l_extent.start;
-
-                       /* no lock covering this page */
-                       if (index < cl_index(osc2cl(osc), start)) {
-                               /* no lock at @index, first lock at @start */
-                               info->oti_ng_index = cl_index(osc2cl(osc),
-                                                    start);
-                               discard = true;
+       int i;
+
+       for (i = 0; i < count; i++) {
+               struct osc_page *ops = pvec[i];
+               struct cl_page *page = ops->ops_cl.cpl_page;
+               pgoff_t index = osc_index(ops);
+               bool discard = false;
+
+               /* negative lock caching */
+               if (index < info->oti_ng_index) {
+                       discard = true;
+               } else if (index >= info->oti_fn_index) {
+                       struct ldlm_lock *tmp;
+                       /* refresh non-overlapped index */
+                       tmp = osc_dlmlock_at_pgoff(env, osc, index,
+                                       OSC_DAP_FL_TEST_LOCK |
+                                       OSC_DAP_FL_AST |
+                                       OSC_DAP_FL_RIGHT);
+                       if (tmp != NULL) {
+                               __u64 end =
+                                       tmp->l_policy_data.l_extent.end;
+                               __u64 start =
+                                       tmp->l_policy_data.l_extent.start;
+
+                               /* no lock covering this page */
+                               if (index < cl_index(osc2cl(osc), start)) {
+                                       /* no lock at @index,
+                                        * first lock at @start
+                                        */
+                                       info->oti_ng_index =
+                                               cl_index(osc2cl(osc), start);
+                                       discard = true;
+                               } else {
+                                       /* Cache the first-non-overlapped
+                                        * index so as to skip all pages
+                                        * within [index, oti_fn_index).
+                                        * This is safe because if tmp lock
+                                        * is canceled, it will discard these
+                                        * pages.
+                                        */
+                                       info->oti_fn_index =
+                                               cl_index(osc2cl(osc), end + 1);
+                                       if (end == OBD_OBJECT_EOF)
+                                               info->oti_fn_index =
+                                                       CL_PAGE_EOF;
+                               }
+                               LDLM_LOCK_PUT(tmp);
                        } else {
-                               /* Cache the first-non-overlapped index so as to
-                                * skip all pages within [index, oti_fn_index).
-                                * This is safe because if tmp lock is canceled,
-                                * it will discard these pages.
-                                */
-                               info->oti_fn_index = cl_index(osc2cl(osc),
-                                                    end + 1);
-                               if (end == OBD_OBJECT_EOF)
-                                       info->oti_fn_index = CL_PAGE_EOF;
+                               info->oti_ng_index = CL_PAGE_EOF;
+                               discard = true;
                        }
-                       LDLM_LOCK_PUT(tmp);
-               } else {
-                       info->oti_ng_index = CL_PAGE_EOF;
-                       discard = true;
                }
-       }
 
-       if (discard) {
-               if (cl_page_own(env, io, page) == 0) {
-                       cl_page_discard(env, io, page);
-                       cl_page_disown(env, io, page);
-               } else {
-                       LASSERT(page->cp_state == CPS_FREEING);
+               if (discard) {
+                       if (cl_page_own(env, io, page) == 0) {
+                               cl_page_discard(env, io, page);
+                               cl_page_disown(env, io, page);
+                       } else {
+                               LASSERT(page->cp_state == CPS_FREEING);
+                       }
                }
-       }
 
-       info->oti_next_index = index + 1;
+               info->oti_next_index = index + 1;
+       }
        return true;
 }
 
 bool osc_discard_cb(const struct lu_env *env, struct cl_io *io,
-                  struct osc_page *ops, void *cbdata)
+                   void **pvec, int count, void *cbdata)
 {
        struct osc_thread_info *info = osc_env_info(env);
-       struct cl_page *page = ops->ops_cl.cpl_page;
-
-       /* page is top page. */
-       info->oti_next_index = osc_index(ops) + 1;
-       if (cl_page_own(env, io, page) == 0) {
-               if (!ergo(page->cp_type == CPT_CACHEABLE,
-                         !PageDirty(cl_page_vmpage(page))))
-                       CL_PAGE_DEBUG(D_ERROR, env, page,
-                                       "discard dirty page?\n");
-
-               /* discard the page */
-               cl_page_discard(env, io, page);
-               cl_page_disown(env, io, page);
-       } else {
-               LASSERT(page->cp_state == CPS_FREEING);
+       int i;
+
+       for (i = 0; i < count; i++) {
+               struct osc_page *ops = pvec[i];
+               struct cl_page *page = ops->ops_cl.cpl_page;
+
+               /* page is top page. */
+               info->oti_next_index = osc_index(ops) + 1;
+               if (cl_page_own(env, io, page) == 0) {
+                       if (!ergo(page->cp_type == CPT_CACHEABLE,
+                                 !PageDirty(cl_page_vmpage(page))))
+                               CL_PAGE_DEBUG(D_ERROR, env, page,
+                                             "discard dirty page?\n");
+
+                       /* discard the page */
+                       cl_page_discard(env, io, page);
+                       cl_page_disown(env, io, page);
+               } else {
+                       LASSERT(page->cp_state == CPS_FREEING);
+               }
        }
 
        return true;
@@ -3246,6 +3261,8 @@ int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
 
        io->ci_obj = cl_object_top(osc2cl(osc));
        io->ci_ignore_layout = 1;
+       io->u.ci_misc.lm_next_rpc_time = ktime_get_seconds() +
+                                        5 * obd_timeout / 16;
        result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
        if (result != 0)
                GOTO(out, result);