Whamcloud - gitweb
LU-2139 osc: Use SOFT_SYNC to urge server commit
[fs/lustre-release.git] / lustre / osc / osc_page.c
index 4f467fe..affd0d2 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, Whamcloud, Inc.
+ * Copyright (c) 2011, 2013, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -63,23 +63,23 @@ static int osc_page_is_dlocked(const struct lu_env *env,
                                const struct osc_page *opg,
                                enum cl_lock_mode mode, int pending, int unref)
 {
-        struct cl_page         *page;
-        struct osc_object      *obj;
-        struct osc_thread_info *info;
-        struct ldlm_res_id     *resname;
-        struct lustre_handle   *lockh;
-        ldlm_policy_data_t     *policy;
-        ldlm_mode_t             dlmmode;
-        int                     flags;
-
-        cfs_might_sleep();
-
-        info = osc_env_info(env);
-        resname = &info->oti_resname;
-        policy = &info->oti_policy;
-        lockh = &info->oti_handle;
-        page = opg->ops_cl.cpl_page;
-        obj = cl2osc(opg->ops_cl.cpl_obj);
+       struct cl_page         *page;
+       struct osc_object      *obj;
+       struct osc_thread_info *info;
+       struct ldlm_res_id     *resname;
+       struct lustre_handle   *lockh;
+       ldlm_policy_data_t     *policy;
+       ldlm_mode_t             dlmmode;
+       __u64                   flags;
+
+       might_sleep();
+
+       info = osc_env_info(env);
+       resname = &info->oti_resname;
+       policy = &info->oti_policy;
+       lockh = &info->oti_handle;
+       page = opg->ops_cl.cpl_page;
+       obj = cl2osc(opg->ops_cl.cpl_obj);
 
         flags = LDLM_FL_TEST_LOCK | LDLM_FL_BLOCK_GRANTED;
         if (pending)
@@ -168,7 +168,6 @@ static void osc_page_fini(const struct lu_env *env,
         struct osc_page *opg = cl2osc_page(slice);
         CDEBUG(D_TRACE, "%p\n", opg);
         LASSERT(opg->ops_lock == NULL);
-        OBD_SLAB_FREE_PTR(opg, osc_page_kmem);
 }
 
 static void osc_page_transfer_get(struct osc_page *opg, const char *label)
@@ -209,7 +208,7 @@ static void osc_page_transfer_add(const struct lu_env *env,
 
        spin_lock(&obj->oo_seatbelt);
        cfs_list_add(&opg->ops_inflight, &obj->oo_inflight[crt]);
-       opg->ops_submitter = cfs_current();
+       opg->ops_submitter = current;
        spin_unlock(&obj->oo_seatbelt);
 }
 
@@ -508,46 +507,65 @@ static const struct cl_page_operations osc_page_ops = {
        .cpo_flush          = osc_page_flush
 };
 
-struct cl_page *osc_page_init(const struct lu_env *env,
-                              struct cl_object *obj,
-                              struct cl_page *page, cfs_page_t *vmpage)
+int osc_page_init(const struct lu_env *env, struct cl_object *obj,
+               struct cl_page *page, struct page *vmpage)
 {
-        struct osc_object *osc = cl2osc(obj);
-        struct osc_page   *opg;
-        int result;
+       struct osc_object *osc = cl2osc(obj);
+       struct osc_page   *opg = cl_object_page_slice(obj, page);
+       int result;
 
-        OBD_SLAB_ALLOC_PTR_GFP(opg, osc_page_kmem, CFS_ALLOC_IO);
-        if (opg != NULL) {
-                opg->ops_from = 0;
-                opg->ops_to   = CFS_PAGE_SIZE;
-
-               result = osc_prep_async_page(osc, opg, vmpage,
-                                            cl_offset(obj, page->cp_index));
-                if (result == 0) {
-                        struct osc_io *oio = osc_env_io(env);
-                        opg->ops_srvlock = osc_io_srvlock(oio);
-                        cl_page_slice_add(page, &opg->ops_cl, obj,
-                                          &osc_page_ops);
-                }
-                /*
-                 * Cannot assert osc_page_protected() here as read-ahead
-                 * creates temporary pages outside of a lock.
-                 */
-#ifdef INVARIANT_CHECK
-                opg->ops_temp = !osc_page_protected(env, opg, CLM_READ, 1);
+       opg->ops_from = 0;
+       opg->ops_to   = PAGE_CACHE_SIZE;
+
+       result = osc_prep_async_page(osc, opg, vmpage,
+                                       cl_offset(obj, page->cp_index));
+       if (result == 0) {
+               struct osc_io *oio = osc_env_io(env);
+               opg->ops_srvlock = osc_io_srvlock(oio);
+               cl_page_slice_add(page, &opg->ops_cl, obj,
+                               &osc_page_ops);
+       }
+       /*
+        * Cannot assert osc_page_protected() here as read-ahead
+        * creates temporary pages outside of a lock.
+        */
+#ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK
+       opg->ops_temp = !osc_page_protected(env, opg, CLM_READ, 1);
 #endif
-               /* ops_inflight and ops_lru are the same field, but it doesn't
-                * hurt to initialize it twice :-) */
-                CFS_INIT_LIST_HEAD(&opg->ops_inflight);
-               CFS_INIT_LIST_HEAD(&opg->ops_lru);
-       } else
-               result = -ENOMEM;
+       /* ops_inflight and ops_lru are the same field, but it doesn't
+        * hurt to initialize it twice :-) */
+       CFS_INIT_LIST_HEAD(&opg->ops_inflight);
+       CFS_INIT_LIST_HEAD(&opg->ops_lru);
 
        /* reserve an LRU space for this page */
        if (page->cp_type == CPT_CACHEABLE && result == 0)
                result = osc_lru_reserve(env, osc, opg);
 
-       return ERR_PTR(result);
+       return result;
+}
+
+int osc_over_unstable_soft_limit(struct client_obd *cli)
+{
+       long obd_upages, obd_dpages, osc_upages;
+
+       /* Can't check cli->cl_unstable_count, therefore, no soft limit */
+       if (cli == NULL)
+               return 0;
+
+       obd_upages = cfs_atomic_read(&obd_unstable_pages);
+       obd_dpages = cfs_atomic_read(&obd_dirty_pages);
+
+       osc_upages = cfs_atomic_read(&cli->cl_unstable_count);
+
+       /* obd_max_dirty_pages is the max number of (dirty + unstable)
+        * pages allowed at any given time. To simulate an unstable page
+        * only limit, we subtract the current number of dirty pages
+        * from this max. This difference is roughly the amount of pages
+        * currently available for unstable pages. Thus, the soft limit
+        * is half of that difference. Check osc_upages to ensure we don't
+        * set SOFT_SYNC for OSCs without any outstanding unstable pages. */
+       return osc_upages != 0 &&
+              obd_upages >= (obd_max_dirty_pages - obd_dpages) / 2;
 }
 
 /**
@@ -573,6 +591,9 @@ void osc_page_submit(const struct lu_env *env, struct osc_page *opg,
        oap->oap_count     = opg->ops_to - opg->ops_from;
        oap->oap_brw_flags = OBD_BRW_SYNC | brw_flags;
 
+       if (osc_over_unstable_soft_limit(oap->oap_cli))
+               oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
+
        if (!client_is_remote(osc_export(obj)) &&
                        cfs_capable(CFS_CAP_SYS_RESOURCE)) {
                oap->oap_brw_flags |= OBD_BRW_NOQUOTA;
@@ -599,9 +620,9 @@ static CFS_DECL_WAITQ(osc_lru_waitq);
 static cfs_atomic_t osc_lru_waiters = CFS_ATOMIC_INIT(0);
 /* LRU pages are freed in batch mode. OSC should at least free this
  * number of pages to avoid running out of LRU budget, and.. */
-static const int lru_shrink_min = 2 << (20 - CFS_PAGE_SHIFT);  /* 2M */
+static const int lru_shrink_min = 2 << (20 - PAGE_CACHE_SHIFT);  /* 2M */
 /* free this number at most otherwise it will take too long time to finsih. */
-static const int lru_shrink_max = 32 << (20 - CFS_PAGE_SHIFT); /* 32M */
+static const int lru_shrink_max = 32 << (20 - PAGE_CACHE_SHIFT); /* 32M */
 
 /* Check if we can free LRU slots from this OSC. If there exists LRU waiters,
  * we should free slots aggressively. In this way, slots are freed in a steady
@@ -723,12 +744,15 @@ int osc_lru_shrink(struct client_obd *cli, int target)
 
                        clobj = tmp;
                        io->ci_obj = clobj;
+                       io->ci_ignore_layout = 1;
                        rc = cl_io_init(env, io, CIT_MISC, clobj);
+
+                       client_obd_list_lock(&cli->cl_lru_list_lock);
+
                        if (rc != 0)
                                break;
 
                        ++maxscan;
-                       client_obd_list_lock(&cli->cl_lru_list_lock);
                        continue;
                }
 
@@ -782,8 +806,10 @@ static void osc_lru_add(struct client_obd *cli, struct osc_page *opg)
        }
        client_obd_list_unlock(&cli->cl_lru_list_lock);
 
-       if (wakeup)
-               cfs_waitq_broadcast(&osc_lru_waitq);
+       if (wakeup) {
+               osc_lru_shrink(cli, osc_cache_too_much(cli));
+               wake_up_all(&osc_lru_waitq);
+       }
 }
 
 /* delete page from LRUlist. The page can be deleted from LRUlist for two
@@ -810,27 +836,32 @@ static void osc_lru_del(struct client_obd *cli, struct osc_page *opg, bool del)
                         * stealing one of them.
                         * cl_lru_shrinkers is to avoid recursive call in case
                         * we're already in the context of osc_lru_shrink(). */
-                       if (cfs_atomic_read(&cli->cl_lru_shrinkers) == 0)
+                       if (cfs_atomic_read(&cli->cl_lru_shrinkers) == 0 &&
+                           !memory_pressure_get())
                                osc_lru_shrink(cli, osc_cache_too_much(cli));
-                       cfs_waitq_signal(&osc_lru_waitq);
+                       wake_up(&osc_lru_waitq);
                }
        } else {
                LASSERT(cfs_list_empty(&opg->ops_lru));
        }
 }
 
+static inline int max_to_shrink(struct client_obd *cli)
+{
+       return min(cfs_atomic_read(&cli->cl_lru_in_list) >> 1, lru_shrink_max);
+}
+
 static int osc_lru_reclaim(struct client_obd *cli)
 {
        struct cl_client_cache *cache = cli->cl_cache;
-       struct client_obd *victim;
-       struct client_obd *tmp;
+       int max_scans;
        int rc;
 
        LASSERT(cache != NULL);
        LASSERT(!cfs_list_empty(&cache->ccc_lru));
 
        rc = osc_lru_shrink(cli, lru_shrink_min);
-       if (rc > 0) {
+       if (rc != 0) {
                CDEBUG(D_CACHE, "%s: Free %d pages from own LRU: %p.\n",
                        cli->cl_import->imp_obd->obd_name, rc, cli);
                return rc;
@@ -846,33 +877,31 @@ static int osc_lru_reclaim(struct client_obd *cli)
        spin_lock(&cache->ccc_lru_lock);
        cache->ccc_lru_shrinkers++;
        cfs_list_move_tail(&cli->cl_lru_osc, &cache->ccc_lru);
-       cfs_list_for_each_entry_safe(victim, tmp, &cache->ccc_lru, cl_lru_osc) {
-               if (victim == cli)
-                       break;
+
+       max_scans = cfs_atomic_read(&cache->ccc_users);
+       while (--max_scans > 0 && !cfs_list_empty(&cache->ccc_lru)) {
+               cli = cfs_list_entry(cache->ccc_lru.next, struct client_obd,
+                                       cl_lru_osc);
 
                CDEBUG(D_CACHE, "%s: cli %p LRU pages: %d, busy: %d.\n",
-                       victim->cl_import->imp_obd->obd_name, victim,
-                       cfs_atomic_read(&victim->cl_lru_in_list),
-                       cfs_atomic_read(&victim->cl_lru_busy));
+                       cli->cl_import->imp_obd->obd_name, cli,
+                       cfs_atomic_read(&cli->cl_lru_in_list),
+                       cfs_atomic_read(&cli->cl_lru_busy));
 
-               cfs_list_move_tail(&victim->cl_lru_osc, &cache->ccc_lru);
-               if (cfs_atomic_read(&victim->cl_lru_in_list) > 0)
-                       break;
+               cfs_list_move_tail(&cli->cl_lru_osc, &cache->ccc_lru);
+               if (cfs_atomic_read(&cli->cl_lru_in_list) > 0) {
+                       spin_unlock(&cache->ccc_lru_lock);
+
+                       rc = osc_lru_shrink(cli, max_to_shrink(cli));
+                       spin_lock(&cache->ccc_lru_lock);
+                       if (rc != 0)
+                               break;
+               }
        }
        spin_unlock(&cache->ccc_lru_lock);
-       if (victim == cli) {
-               CDEBUG(D_CACHE, "%s: can't get any free LRU slots.\n",
-                       cli->cl_import->imp_obd->obd_name);
-               return 0;
-       }
-
-       rc = osc_lru_shrink(victim,
-                           min(cfs_atomic_read(&victim->cl_lru_in_list) >> 1,
-                               lru_shrink_max));
-
-       CDEBUG(D_CACHE, "%s: Free %d pages from other cli: %p.\n",
-               cli->cl_import->imp_obd->obd_name, rc, victim);
 
+       CDEBUG(D_CACHE, "%s: cli %p freed %d pages.\n",
+               cli->cl_import->imp_obd->obd_name, cli, rc);
        return rc;
 }
 
@@ -898,7 +927,7 @@ static int osc_lru_reserve(const struct lu_env *env, struct osc_object *obj,
                if (rc > 0)
                        continue;
 
-               cfs_cond_resched();
+               cond_resched();
 
                /* slowest case, all of caching pages are busy, notifying
                 * other OSCs that we're lack of LRU slots. */