LU-744 osc: add lru pages management - new RPC
author     Jinshan Xiong <jinshan.xiong@whamcloud.com>
Wed, 16 May 2012 03:11:37 +0000 (20:11 -0700)
committer  Oleg Drokin <green@whamcloud.com>
Tue, 16 Oct 2012 20:59:26 +0000 (16:59 -0400)
Add cache management at the OSC layer, so that we can control how much
memory is used to cache Lustre pages and avoid a complex solution like
the one we used in b1_8.

With this patch, admins can set how much memory will be used for caching
Lustre pages per file system. A self-adaptive algorithm is used to
balance this budget among OSCs.
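
For context, the core of the scheme is a single slot counter shared by
every OSC on the mount: reserving a page takes a slot, and dropping a page
returns one. A minimal user-space sketch of the reservation step follows
(hypothetical names; C11 atomics stand in for cfs_atomic_t):

#include <stdatomic.h>
#include <stdio.h>

/* Hypothetical stand-in for cl_client_lru: one slot budget shared by all
 * OSCs of a mount. */
struct lru_budget {
	atomic_int page_left;	/* free LRU slots remaining (ccl_page_left) */
	int        page_max;	/* total slots configured (ccl_page_max) */
};

/* Reserve one slot the way osc_lru_reserve() does with
 * cfs_atomic_add_unless(cl_lru_left, -1, 0): decrement only if non-zero. */
static int reserve_slot(struct lru_budget *b)
{
	int old = atomic_load(&b->page_left);

	while (old > 0)
		if (atomic_compare_exchange_weak(&b->page_left, &old, old - 1))
			return 1;	/* got a slot */
	return 0;	/* exhausted: the real code shrinks or waits here */
}

int main(void)
{
	struct lru_budget b = { .page_max = 4 };
	int i;

	atomic_store(&b.page_left, b.page_max);
	for (i = 0; i < 6; i++)
		printf("reserve %d -> %s\n", i, reserve_slot(&b) ? "ok" : "full");
	return 0;
}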

Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Change-Id: I76c840aef5ca9a3a4619f06fcaee7de7f95b05f5
Reviewed-on: http://review.whamcloud.com/2514
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
19 files changed:
libcfs/include/libcfs/linux/linux-prim.h
libcfs/include/libcfs/posix/libcfs.h
libcfs/include/libcfs/user-lock.h
libcfs/include/libcfs/user-prim.h
lustre/include/lclient.h
lustre/include/lprocfs_status.h
lustre/include/obd.h
lustre/ldlm/ldlm_lib.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/lproc_llite.c
lustre/lov/lov_obd.c
lustre/obdclass/cl_page.c
lustre/obdclass/lprocfs_status.c
lustre/osc/lproc_osc.c
lustre/osc/osc_cl_internal.h
lustre/osc/osc_internal.h
lustre/osc/osc_page.c
lustre/osc/osc_request.c

libcfs/include/libcfs/linux/linux-prim.h
index 8e8f7e7..b7f8e06 100644
@@ -179,6 +179,8 @@ typedef wait_queue_t                        cfs_waitlink_t;
 typedef wait_queue_head_t              cfs_waitq_t;
 typedef long                            cfs_task_state_t;
 
+#define CFS_DECL_WAITQ(wq)             DECLARE_WAIT_QUEUE_HEAD(wq)
+
 #define cfs_kthread_run(fn, data, fmt, arg...) kthread_run(fn, data, fmt, ##arg)
 
 /* Kernel thread */
@@ -276,6 +278,7 @@ typedef atomic_t cfs_atomic_t;
 #define cfs_atomic_inc_and_test(atom)        atomic_inc_and_test(atom)
 #define cfs_atomic_inc_return(atom)          atomic_inc_return(atom)
 #define cfs_atomic_inc_not_zero(atom)        atomic_inc_not_zero(atom)
+#define cfs_atomic_add_unless(atom, a, u)    atomic_add_unless(atom, a, u)
 #define cfs_atomic_dec(atom)                 atomic_dec(atom)
 #define cfs_atomic_dec_and_test(atom)        atomic_dec_and_test(atom)
 #define cfs_atomic_dec_and_lock(atom, lock)  atomic_dec_and_lock(atom, lock)
@@ -286,6 +289,7 @@ typedef atomic_t cfs_atomic_t;
 #define cfs_atomic_sub(value, atom)          atomic_sub(value, atom)
 #define cfs_atomic_sub_and_test(value, atom) atomic_sub_and_test(value, atom)
 #define cfs_atomic_sub_return(value, atom)   atomic_sub_return(value, atom)
+#define cfs_atomic_cmpxchg(atom, old, nv)    atomic_cmpxchg(atom, old, nv)
 #define CFS_ATOMIC_INIT(i)                   ATOMIC_INIT(i)
 
 /*

libcfs/include/libcfs/posix/libcfs.h
index 17d37ce..33691dc 100644
@@ -64,6 +64,7 @@
 #include <sys/socket.h>
 #include <sys/utsname.h>
 #include <ctype.h>
+#include <stdbool.h>
 
 #ifdef HAVE_NETDB_H
 #include <netdb.h>

libcfs/include/libcfs/user-lock.h
index dbb97ff..cd4983b 100644
@@ -246,6 +246,8 @@ typedef struct { volatile int counter; } cfs_atomic_t;
 #define cfs_atomic_add_unless(v, a, u) \
         ((v)->counter != u ? (v)->counter += a : 0)
 #define cfs_atomic_inc_not_zero(v) cfs_atomic_add_unless((v), 1, 0)
+#define cfs_atomic_cmpxchg(v, ov, nv) \
+       ((v)->counter == ov ? ((v)->counter = nv, ov) : (v)->counter)
 
 #ifdef HAVE_LIBPTHREAD
 #include <pthread.h>
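
The userspace emulation above follows the kernel atomic_cmpxchg() contract:
it returns the value it observed, and the swap happened exactly when that
return value equals the expected old value. A single-threaded illustration
of the contract (the posix version provides no real atomicity, so this is
only a model):

#include <assert.h>

typedef struct { volatile int counter; } my_atomic_t;

/* Same expression as the cfs_atomic_cmpxchg() emulation added above. */
#define my_atomic_cmpxchg(v, ov, nv) \
	((v)->counter == (ov) ? ((v)->counter = (nv), (ov)) : (v)->counter)

int main(void)
{
	my_atomic_t a = { .counter = 5 };

	assert(my_atomic_cmpxchg(&a, 5, 7) == 5);	/* matched: swapped */
	assert(a.counter == 7);
	assert(my_atomic_cmpxchg(&a, 5, 9) == 7);	/* mismatched: unchanged */
	assert(a.counter == 7);
	return 0;
}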

libcfs/include/libcfs/user-prim.h
index 756062b..a884960 100644
@@ -89,6 +89,8 @@ typedef struct cfs_waitq {
         cfs_list_t sleepers;
 } cfs_waitq_t;
 
+#define CFS_DECL_WAITQ(wq) cfs_waitq_t wq
+
 /*
  * Task states
  */

lustre/include/lclient.h
index 8be6c5b..9c56770 100644
@@ -422,4 +422,16 @@ int lov_read_and_clear_async_rc(struct cl_object *clob);
 struct lov_stripe_md *ccc_inode_lsm_get(struct inode *inode);
 void ccc_inode_lsm_put(struct inode *inode, struct lov_stripe_md *lsm);
 
+/**
+ * Data structures for LRU management on a Lustre client mount.
+ */
+struct cl_client_lru {
+       cfs_atomic_t    ccl_users; /* how many users (OSCs) of this data */
+       cfs_atomic_t    ccl_page_left;
+       unsigned long   ccl_page_max;
+       cfs_list_t      ccl_list; /* entities for lru - actually osc list */
+       cfs_spinlock_t  ccl_lock; /* lock for list */
+       unsigned int    ccl_reclaim_count; /* statistics */
+};
+
 #endif /*LCLIENT_H */

lustre/include/lprocfs_status.h
index dc32c08..1e43ab2 100644
@@ -659,6 +659,8 @@ extern int lprocfs_write_u64_helper(const char *buffer, unsigned long count,
 extern int lprocfs_write_frac_u64_helper(const char *buffer,
                                          unsigned long count,
                                          __u64 *val, int mult);
+char *lprocfs_find_named_value(const char *buffer, const char *name,
+                               unsigned long *count);
 void lprocfs_oh_tally(struct obd_histogram *oh, unsigned int value);
 void lprocfs_oh_tally_log2(struct obd_histogram *oh, unsigned int value);
 void lprocfs_oh_clear(struct obd_histogram *oh);

lustre/include/obd.h
index 1113a74..c5fc083 100644
@@ -503,6 +503,16 @@ struct client_obd {
         struct obd_histogram     cl_read_offset_hist;
         struct obd_histogram     cl_write_offset_hist;
 
+       /* lru for osc caching pages */
+       struct cl_client_lru    *cl_lru;
+       cfs_list_t               cl_lru_osc; /* member of cl_lru->ccl_list */
+       cfs_atomic_t            *cl_lru_left;
+       cfs_atomic_t             cl_lru_busy;
+       cfs_atomic_t             cl_lru_shrinkers;
+       cfs_atomic_t             cl_lru_in_list;
+       cfs_list_t               cl_lru_list; /* lru page list */
+       client_obd_lock_t        cl_lru_list_lock; /* page list protector */
+
         /* number of in flight destroy rpcs is limited to max_rpcs_in_flight */
         cfs_atomic_t             cl_destroy_in_flight;
         cfs_waitq_t              cl_destroy_waitq;
@@ -755,6 +765,9 @@ struct lov_obd {
         cfs_list_t              lov_pool_list; /* used for sequential access */
         cfs_proc_dir_entry_t   *lov_pool_proc_entry;
         enum lustre_sec_part    lov_sp_me;
+
+       /* cached LRU data from upper layer */
+       void                   *lov_lru;
 };
 
 struct lmv_tgt_desc {
@@ -1240,6 +1253,9 @@ enum obd_cleanup_stage {
 #define KEY_CONNECT_FLAG        "connect_flags"
 #define KEY_SYNC_LOCK_CANCEL    "sync_lock_cancel"
 
+#define KEY_LRU_SET            "lru_set"
+#define KEY_LRU_SHRINK         "lru_shrink"
+
 struct lu_context;
 
 /* /!\ must be coherent with include/linux/namei.h on patched kernel */

lustre/ldlm/ldlm_lib.c
index a85eea8..3bf5de1 100644
@@ -359,6 +359,15 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
         cfs_spin_lock_init(&cli->cl_write_page_hist.oh_lock);
         cfs_spin_lock_init(&cli->cl_read_offset_hist.oh_lock);
         cfs_spin_lock_init(&cli->cl_write_offset_hist.oh_lock);
+
+       /* lru for osc. */
+       CFS_INIT_LIST_HEAD(&cli->cl_lru_osc);
+       cfs_atomic_set(&cli->cl_lru_shrinkers, 0);
+       cfs_atomic_set(&cli->cl_lru_busy, 0);
+       cfs_atomic_set(&cli->cl_lru_in_list, 0);
+       CFS_INIT_LIST_HEAD(&cli->cl_lru_list);
+       client_obd_list_lock_init(&cli->cl_lru_list_lock);
+
         cfs_waitq_init(&cli->cl_destroy_waitq);
         cfs_atomic_set(&cli->cl_destroy_in_flight, 0);
 #ifdef ENABLE_CHECKSUM

lustre/llite/llite_internal.h
index e5732de..385f387 100644
@@ -450,8 +450,7 @@ struct ll_sb_info {
 
         struct lprocfs_stats     *ll_stats; /* lprocfs stats counter */
 
-        unsigned long             ll_async_page_max;
-        unsigned long             ll_async_page_count;
+       struct cl_client_lru      ll_lru;
 
         struct lprocfs_stats     *ll_ra_stats;
 

lustre/llite/llite_lib.c
index 08936e2..d62ce20 100644
@@ -73,8 +73,9 @@ extern struct address_space_operations_ext ll_aops;
 
 static struct ll_sb_info *ll_init_sbi(void)
 {
-        struct ll_sb_info *sbi = NULL;
-        unsigned long pages;
+       struct ll_sb_info *sbi = NULL;
+       unsigned long pages;
+       unsigned long lru_page_max;
         struct sysinfo si;
         class_uuid_t uuid;
         int i;
@@ -94,13 +95,20 @@ static struct ll_sb_info *ll_init_sbi(void)
         pages = si.totalram - si.totalhigh;
         if (pages >> (20 - CFS_PAGE_SHIFT) < 512) {
 #ifdef HAVE_BGL_SUPPORT
-                sbi->ll_async_page_max = pages / 4;
+               lru_page_max = pages / 4;
 #else
-                sbi->ll_async_page_max = pages / 2;
+               lru_page_max = pages / 2;
 #endif
-        } else {
-                sbi->ll_async_page_max = (pages / 4) * 3;
-        }
+       } else {
+               lru_page_max = (pages / 4) * 3;
+       }
+
+       /* initialize lru data */
+       cfs_atomic_set(&sbi->ll_lru.ccl_users, 0);
+       sbi->ll_lru.ccl_page_max = lru_page_max;
+       cfs_atomic_set(&sbi->ll_lru.ccl_page_left, lru_page_max);
+       cfs_spin_lock_init(&sbi->ll_lru.ccl_lock);
+       CFS_INIT_LIST_HEAD(&sbi->ll_lru.ccl_list);
 
         sbi->ll_ra_info.ra_max_pages_per_file = min(pages / 32,
                                            SBI_DEFAULT_READAHEAD_MAX);
@@ -543,7 +551,11 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
                                  NULL);
         cl_sb_init(sb);
 
-        sb->s_root = d_alloc_root(root);
+       err = obd_set_info_async(NULL, sbi->ll_dt_exp, sizeof(KEY_LRU_SET),
+                                KEY_LRU_SET, sizeof(sbi->ll_lru),
+                                &sbi->ll_lru, NULL);
+
+       sb->s_root = d_alloc_root(root);
 #ifdef HAVE_DCACHE_LOCK
        sb->s_root->d_op = &ll_d_root_ops;
 #else

lustre/llite/lproc_llite.c
index b3a7fe1..239e3c5 100644
@@ -363,46 +363,109 @@ static int ll_wr_max_read_ahead_whole_mb(struct file *file, const char *buffer,
 static int ll_rd_max_cached_mb(char *page, char **start, off_t off,
                                int count, int *eof, void *data)
 {
-        struct super_block *sb = data;
-        struct ll_sb_info *sbi = ll_s2sbi(sb);
-        long pages_number;
-        int mult;
-
-        cfs_spin_lock(&sbi->ll_lock);
-        pages_number = sbi->ll_async_page_max;
-        cfs_spin_unlock(&sbi->ll_lock);
-
-        mult = 1 << (20 - CFS_PAGE_SHIFT);
-        return lprocfs_read_frac_helper(page, count, pages_number, mult);;
+       struct super_block *sb = data;
+       struct ll_sb_info *sbi = ll_s2sbi(sb);
+       struct cl_client_lru *lru = &sbi->ll_lru;
+       int shift = 20 - CFS_PAGE_SHIFT;
+       int max_cached_mb;
+       int unused_mb;
+
+       *eof = 1;
+       max_cached_mb = lru->ccl_page_max >> shift;
+       unused_mb = cfs_atomic_read(&lru->ccl_page_left) >> shift;
+       return snprintf(page, count,
+                       "users: %d\n"
+                       "max_cached_mb: %d\n"
+                       "used_mb: %d\n"
+                       "unused_mb: %d\n"
+                       "reclaim_count: %u\n",
+                       cfs_atomic_read(&lru->ccl_users),
+                       max_cached_mb,
+                       max_cached_mb - unused_mb,
+                       unused_mb,
+                       lru->ccl_reclaim_count);
 }
 
 static int ll_wr_max_cached_mb(struct file *file, const char *buffer,
                                unsigned long count, void *data)
 {
-        struct super_block *sb = data;
-        struct ll_sb_info *sbi = ll_s2sbi(sb);
-        int mult, rc, pages_number;
+       struct super_block *sb = data;
+       struct ll_sb_info *sbi = ll_s2sbi(sb);
+       struct cl_client_lru *lru = &sbi->ll_lru;
+       int mult, rc, pages_number;
+       int diff = 0;
+       int nrpages = 0;
+       ENTRY;
+
+       mult = 1 << (20 - CFS_PAGE_SHIFT);
+       buffer = lprocfs_find_named_value(buffer, "max_cached_mb:", &count);
+       rc = lprocfs_write_frac_helper(buffer, count, &pages_number, mult);
+       if (rc)
+               RETURN(rc);
+
+       if (pages_number < 0 || pages_number > cfs_num_physpages) {
+               CERROR("%s: can't set max cache more than %lu MB\n",
+                      ll_get_fsname(sb, NULL, 0),
+                      cfs_num_physpages >> (20 - CFS_PAGE_SHIFT));
+               RETURN(-ERANGE);
+       }
+
+       if (sbi->ll_dt_exp == NULL)
+               RETURN(-ENODEV);
+
+       cfs_spin_lock(&sbi->ll_lock);
+       diff = pages_number - lru->ccl_page_max;
+       cfs_spin_unlock(&sbi->ll_lock);
+
+       /* easy - add more LRU slots. */
+       if (diff >= 0) {
+               cfs_atomic_add(diff, &lru->ccl_page_left);
+               GOTO(out, rc = 0);
+       }
+
+       diff = -diff;
+       while (diff > 0) {
+               int tmp;
+
+               /* reduce LRU budget from free slots. */
+               do {
+                       int ov, nv;
+
+                       ov = cfs_atomic_read(&lru->ccl_page_left);
+                       if (ov == 0)
+                               break;
+
+                       nv = ov > diff ? ov - diff : 0;
+                       rc = cfs_atomic_cmpxchg(&lru->ccl_page_left, ov, nv);
+                       if (likely(ov == rc)) {
+                               diff -= ov - nv;
+                               nrpages += ov - nv;
+                               break;
+                       }
+               } while (1);
+
+               if (diff <= 0)
+                       break;
+
+               /* difficult - have to ask OSCs to drop LRU slots. */
+               tmp = diff << 1;
+               rc = obd_set_info_async(NULL, sbi->ll_dt_exp,
+                               sizeof(KEY_LRU_SHRINK), KEY_LRU_SHRINK,
+                               sizeof(tmp), &tmp, NULL);
+               if (rc < 0)
+                       break;
+       }
 
-        mult = 1 << (20 - CFS_PAGE_SHIFT);
-        rc = lprocfs_write_frac_helper(buffer, count, &pages_number, mult);
-        if (rc)
-                return rc;
-
-        if (pages_number < 0 || pages_number > cfs_num_physpages) {
-                CERROR("can't set max cache more than %lu MB\n",
-                        cfs_num_physpages >> (20 - CFS_PAGE_SHIFT));
-                return -ERANGE;
-        }
-
-        cfs_spin_lock(&sbi->ll_lock);
-        sbi->ll_async_page_max = pages_number ;
-        cfs_spin_unlock(&sbi->ll_lock);
-
-        if (!sbi->ll_dt_exp)
-                /* Not set up yet, don't call llap_shrink_cache */
-                return count;
-
-        return count;
+out:
+       if (rc >= 0) {
+               cfs_spin_lock(&sbi->ll_lock);
+               lru->ccl_page_max = pages_number;
+               cfs_spin_unlock(&sbi->ll_lock);
+               rc = count;
+       } else {
+               cfs_atomic_add(nrpages, &lru->ccl_page_left);
+       }
+       return rc;
 }
 
 static int ll_rd_checksum(char *page, char **start, off_t off,
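
The rewritten ll_wr_max_cached_mb() shrinks the budget in two phases: first
it claims free slots lock-free with cfs_atomic_cmpxchg(), then it asks the
OSCs via KEY_LRU_SHRINK to drop pages, whose slots flow back into the free
pool for the next pass. A compilable model of that control flow
(hypothetical names; C11 atomics):

#include <stdatomic.h>
#include <stdio.h>

static atomic_int page_left;	/* free slots (lru->ccl_page_left) */

/* Phase 1: claim up to 'want' slots from the free pool, mirroring the
 * cfs_atomic_cmpxchg() loop in ll_wr_max_cached_mb(). */
static int take_free_slots(int want)
{
	int ov = atomic_load(&page_left);

	while (ov > 0) {
		int nv = ov > want ? ov - want : 0;

		if (atomic_compare_exchange_weak(&page_left, &ov, nv))
			return ov - nv;	/* slots actually claimed */
	}
	return 0;
}

/* Phase 2 stand-in for obd_set_info_async(KEY_LRU_SHRINK): pretend each
 * request makes the OSCs drop up to 3 pages, whose slots return to the
 * free pool (as osc_lru_del() does with cl_lru_left). */
static void osc_shrink(int target)
{
	atomic_fetch_add(&page_left, target < 3 ? target : 3);
}

int main(void)
{
	int diff = 10;	/* budget reduction requested by the admin */

	atomic_store(&page_left, 2);
	while (diff > 0) {
		diff -= take_free_slots(diff);
		if (diff <= 0)
			break;
		osc_shrink(diff << 1);	/* over-ask, as the patch does */
	}
	printf("remaining deficit: %d\n", diff);
	return 0;
}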

lustre/lov/lov_obd.c
index 098bce5..207f39c 100644
@@ -61,6 +61,7 @@
 #include <lprocfs_status.h>
 #include <lustre_param.h>
 #include <cl_object.h>
+#include <lclient.h> /* for cl_client_lru */
 #include <lustre/ll_fiemap.h>
 #include <lustre_log.h>
 
@@ -636,6 +637,15 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
         if (!tgt->ltd_exp)
                 GOTO(out, rc = 0);
 
+       if (lov->lov_lru != NULL) {
+               rc = obd_set_info_async(NULL, tgt->ltd_exp,
+                               sizeof(KEY_LRU_SET), KEY_LRU_SET,
+                               sizeof(struct cl_client_lru), lov->lov_lru,
+                               NULL);
+               if (rc < 0)
+                       GOTO(out, rc);
+       }
+
         rc = lov_notify(obd, tgt->ltd_exp->exp_obd,
                         active ? OBD_NOTIFY_CONNECT : OBD_NOTIFY_INACTIVE,
                         (void *)&index);
@@ -2707,7 +2717,11 @@ static int lov_set_info_async(const struct lu_env *env, struct obd_export *exp,
                 mds_con = 1;
         } else if (KEY_IS(KEY_CAPA_KEY)) {
                 capa = 1;
-        }
+       } else if (KEY_IS(KEY_LRU_SET)) {
+               LASSERT(lov->lov_lru == NULL);
+               lov->lov_lru = val;
+               do_inactive = 1;
+       }
 
         for (i = 0; i < count; i++, val = (char *)val + incr) {
                 if (next_id) {

lustre/obdclass/cl_page.c
index ea160eb..ba12d8a 100644
@@ -634,10 +634,9 @@ static void cl_page_state_set(const struct lu_env *env,
  */
 void cl_page_get(struct cl_page *page)
 {
-        ENTRY;
-        LASSERT(page->cp_state != CPS_FREEING);
-        cl_page_get_trust(page);
-        EXIT;
+       ENTRY;
+       cl_page_get_trust(page);
+       EXIT;
 }
 EXPORT_SYMBOL(cl_page_get);
 

lustre/obdclass/lprocfs_status.c
index dc595f5..6b889b2 100644
@@ -2332,6 +2332,52 @@ int lprocfs_write_frac_u64_helper(const char *buffer, unsigned long count,
 }
 EXPORT_SYMBOL(lprocfs_write_frac_u64_helper);
 
+static char *lprocfs_strnstr(const char *s1, const char *s2, size_t len)
+{
+       size_t l2;
+
+       l2 = strlen(s2);
+       if (!l2)
+               return (char *)s1;
+       while (len >= l2) {
+               len--;
+               if (!memcmp(s1, s2, l2))
+                       return (char *)s1;
+               s1++;
+       }
+       return NULL;
+}
+
+/**
+ * Find the string \a name in the input \a buffer, and return a pointer to the
+ * value immediately following \a name, reducing \a count appropriately.
+ * If \a name is not found the original \a buffer is returned.
+ */
+char *lprocfs_find_named_value(const char *buffer, const char *name,
+                               unsigned long *count)
+{
+       char *val;
+       size_t buflen = *count;
+
+       /* there is no strnstr() in rhel5 and ubuntu kernels */
+       val = lprocfs_strnstr(buffer, name, buflen);
+       if (val == NULL)
+               return (char *)buffer;
+
+       val += strlen(name);                             /* skip prefix */
+       while (val < buffer + buflen && isspace(*val)) /* skip separator */
+               val++;
+
+       *count = 0;
+       while (val < buffer + buflen && isalnum(*val)) {
+               ++*count;
+               ++val;
+       }
+
+       return val - *count;
+}
+EXPORT_SYMBOL(lprocfs_find_named_value);
+
 int lprocfs_seq_create(cfs_proc_dir_entry_t *parent, char *name, mode_t mode,
                        struct file_operations *seq_fops, void *data)
 {

lustre/osc/lproc_osc.c
index 742733c..5bf6332 100644
@@ -160,6 +160,48 @@ static int osc_wr_max_dirty_mb(struct file *file, const char *buffer,
         return count;
 }
 
+static int osc_rd_cached_mb(char *page, char **start, off_t off, int count,
+                           int *eof, void *data)
+{
+       struct obd_device *dev = data;
+       struct client_obd *cli = &dev->u.cli;
+       int shift = 20 - CFS_PAGE_SHIFT;
+       int rc;
+
+       rc = snprintf(page, count,
+                     "used_mb: %d\n"
+                     "busy_cnt: %d\n",
+                     (cfs_atomic_read(&cli->cl_lru_in_list) +
+                       cfs_atomic_read(&cli->cl_lru_busy)) >> shift,
+                     cfs_atomic_read(&cli->cl_lru_busy));
+
+       return rc;
+}
+
+/* shrink the number of caching pages to a specific number */
+static int osc_wr_cached_mb(struct file *file, const char *buffer,
+                           unsigned long count, void *data)
+{
+       struct obd_device *dev = data;
+       struct client_obd *cli = &dev->u.cli;
+       int pages_number, mult, rc;
+
+       mult = 1 << (20 - CFS_PAGE_SHIFT);
+       buffer = lprocfs_find_named_value(buffer, "used_mb:", &count);
+       rc = lprocfs_write_frac_helper(buffer, count, &pages_number, mult);
+       if (rc)
+               return rc;
+
+       if (pages_number < 0)
+               return -ERANGE;
+
+       rc = cfs_atomic_read(&cli->cl_lru_in_list) - pages_number;
+       if (rc > 0)
+               (void)osc_lru_shrink(cli, rc);
+
+       return count;
+}
+
 static int osc_rd_cur_dirty_bytes(char *page, char **start, off_t off,
                                   int count, int *eof, void *data)
 {
@@ -615,6 +657,7 @@ static struct lprocfs_vars lprocfs_osc_obd_vars[] = {
                                 osc_wr_max_rpcs_in_flight, 0 },
         { "destroys_in_flight", osc_rd_destroys_in_flight, 0, 0 },
         { "max_dirty_mb",    osc_rd_max_dirty_mb, osc_wr_max_dirty_mb, 0 },
+       { "osc_cached_mb",   osc_rd_cached_mb,     osc_wr_cached_mb, 0 },
         { "cur_dirty_bytes", osc_rd_cur_dirty_bytes, 0, 0 },
         { "cur_grant_bytes", osc_rd_cur_grant_bytes,
                              osc_wr_cur_grant_bytes, 0 },
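
For reference, the arithmetic behind the new osc_cached_mb write path: the
value written in MB is scaled to pages, and the OSC LRU is shrunk down to
that size. A standalone sketch of the conversion, assuming 4K pages
(CFS_PAGE_SHIFT == 12):

#include <stdio.h>

int main(void)
{
	int page_shift = 12;			/* assumed CFS_PAGE_SHIFT */
	int mult = 1 << (20 - page_shift);	/* pages per MB: 256 */
	int used_mb = 4;			/* from "used_mb: 4" */
	int lru_in_list = 2000;			/* pages currently on the LRU */
	int target = lru_in_list - used_mb * mult;

	if (target > 0)				/* 2000 - 1024 = 976 pages */
		printf("osc_lru_shrink(cli, %d)\n", target);
	return 0;
}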

lustre/osc/osc_cl_internal.h
index 54ded82..79dcc8b 100644
@@ -56,6 +56,7 @@
 /* osc_build_res_name() */
 #include <obd_ost.h>
 #include <cl_object.h>
+#include <lclient.h>
 #include "osc_internal.h"
 
 /** \defgroup osc osc
@@ -101,6 +102,7 @@ struct osc_session {
         struct osc_io       os_io;
 };
 
+#define OTI_PVEC_SIZE 64
 struct osc_thread_info {
         struct ldlm_res_id      oti_resname;
         ldlm_policy_data_t      oti_policy;
@@ -108,7 +110,8 @@ struct osc_thread_info {
         struct cl_attr          oti_attr;
         struct lustre_handle    oti_handle;
         struct cl_page_list     oti_plist;
-       struct cl_io        oti_io;
+       struct cl_io            oti_io;
+       struct cl_page         *oti_pvec[OTI_PVEC_SIZE];
 };
 
 struct osc_object {
@@ -360,14 +363,25 @@ struct osc_page {
          */
                               ops_temp:1,
         /**
+        * in LRU?
+        */
+                             ops_in_lru:1,
+       /**
          * Set if the page must be transferred with OBD_BRW_SRVLOCK.
          */
                               ops_srvlock:1;
-        /**
-         * Linkage into a per-osc_object list of pages in flight. For
-         * debugging.
-         */
-        cfs_list_t            ops_inflight;
+       union {
+               /**
+                * lru page list. ops_inflight and ops_lru are exclusive so
+                * that they can share the same data.
+                */
+               cfs_list_t            ops_lru;
+               /**
+                * Linkage into a per-osc_object list of pages in flight. For
+                * debugging.
+                */
+               cfs_list_t            ops_inflight;
+       };
         /**
          * Thread that submitted this page for transfer. For debugging.
          */

lustre/osc/osc_internal.h
index 0c21792..99c30cc 100644
@@ -143,6 +143,7 @@ int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *cfg);
 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                  cfs_list_t *ext_list, int cmd, pdl_policy_t p);
+int osc_lru_shrink(struct client_obd *cli, int target);
 
 extern cfs_spinlock_t osc_ast_guard;
 

lustre/osc/osc_page.c
index bc234e2..c46db69 100644
 
 #include "osc_cl_internal.h"
 
-/** \addtogroup osc 
- *  @{ 
+static void osc_lru_del(struct client_obd *cli, struct osc_page *opg, bool del);
+static void osc_lru_add(struct client_obd *cli, struct osc_page *opg);
+static int osc_lru_reserve(const struct lu_env *env, struct osc_object *obj,
+                          struct osc_page *opg);
+
+/** \addtogroup osc
+ *  @{
  */
 
-/* 
+/*
  * Comment out osc_page_protected because it may sleep inside the
  * the client_obd_list_lock.
  * client_obd_list_lock -> osc_ap_completion -> osc_completion ->
@@ -196,9 +201,12 @@ static void osc_page_transfer_put(const struct lu_env *env,
 static void osc_page_transfer_add(const struct lu_env *env,
                                   struct osc_page *opg, enum cl_req_type crt)
 {
-        struct osc_object *obj;
+       struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
+
+       /* ops_lru and ops_inflight share the same field, so take it from LRU
+        * first and then use it as inflight. */
+       osc_lru_del(osc_cli(obj), opg, false);
 
-        obj = cl2osc(opg->ops_cl.cpl_obj);
         cfs_spin_lock(&obj->oo_seatbelt);
         cfs_list_add(&opg->ops_inflight, &obj->oo_inflight[crt]);
         opg->ops_submitter = cfs_current();
@@ -312,10 +320,22 @@ static void osc_page_completion_read(const struct lu_env *env,
                                      const struct cl_page_slice *slice,
                                      int ioret)
 {
-        struct osc_page *opg = cl2osc_page(slice);
+       struct osc_page   *opg = cl2osc_page(slice);
+       struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
 
-        if (likely(opg->ops_lock))
-                osc_page_putref_lock(env, opg);
+       if (likely(opg->ops_lock))
+               osc_page_putref_lock(env, opg);
+       osc_lru_add(osc_cli(obj), opg);
+}
+
+static void osc_page_completion_write(const struct lu_env *env,
+                                     const struct cl_page_slice *slice,
+                                     int ioret)
+{
+       struct osc_page   *opg = cl2osc_page(slice);
+       struct osc_object *obj = cl2osc(slice->cpl_obj);
+
+       osc_lru_add(osc_cli(obj), opg);
 }
 
 static int osc_page_fail(const struct lu_env *env,
@@ -411,10 +431,17 @@ static void osc_page_delete(const struct lu_env *env,
                               "Trying to teardown failed: %d\n", rc);
                 LASSERT(0);
         }
-        cfs_spin_lock(&obj->oo_seatbelt);
-        cfs_list_del_init(&opg->ops_inflight);
-        cfs_spin_unlock(&obj->oo_seatbelt);
-        EXIT;
+
+       cfs_spin_lock(&obj->oo_seatbelt);
+       if (opg->ops_submitter != NULL) {
+               LASSERT(!cfs_list_empty(&opg->ops_inflight));
+               cfs_list_del_init(&opg->ops_inflight);
+               opg->ops_submitter = NULL;
+       }
+       cfs_spin_unlock(&obj->oo_seatbelt);
+
+       osc_lru_del(osc_cli(obj), opg, true);
+       EXIT;
 }
 
 void osc_page_clip(const struct lu_env *env, const struct cl_page_slice *slice,
@@ -472,10 +499,11 @@ static const struct cl_page_operations osc_page_ops = {
                         .cpo_completion = osc_page_completion_read
                 },
                 [CRT_WRITE] = {
-                        .cpo_cache_add  = osc_page_cache_add
-                }
-        },
-        .cpo_clip           = osc_page_clip,
+                       .cpo_cache_add  = osc_page_cache_add,
+                       .cpo_completion = osc_page_completion_write
+               }
+       },
+       .cpo_clip           = osc_page_clip,
        .cpo_cancel         = osc_page_cancel,
        .cpo_flush          = osc_page_flush
 };
@@ -508,10 +536,18 @@ struct cl_page *osc_page_init(const struct lu_env *env,
 #ifdef INVARIANT_CHECK
                 opg->ops_temp = !osc_page_protected(env, opg, CLM_READ, 1);
 #endif
+               /* ops_inflight and ops_lru are the same field, but it doesn't
+                * hurt to initialize it twice :-) */
                 CFS_INIT_LIST_HEAD(&opg->ops_inflight);
-        } else
-                result = -ENOMEM;
-        return ERR_PTR(result);
+               CFS_INIT_LIST_HEAD(&opg->ops_lru);
+       } else
+               result = -ENOMEM;
+
+       /* reserve an LRU slot for this page */
+       if (page->cp_type == CPT_CACHEABLE && result == 0)
+               result = osc_lru_reserve(env, osc, opg);
+
+       return ERR_PTR(result);
 }
 
 /**
@@ -548,4 +584,346 @@ void osc_page_submit(const struct lu_env *env, struct osc_page *opg,
        osc_page_transfer_add(env, opg, crt);
 }
 
+/* --------------- LRU page management ------------------ */
+
+/* OSC is a natural place to manage LRU pages as applications tend to write
+ * OSC by OSC. Ideally, an OSC that is used more frequently should occupy
+ * more LRU slots. On the other hand, we should avoid using up all LRU slots
+ * (client_obd::cl_lru_left), otherwise a process has to be put to sleep
+ * waiting for free LRU slots - this would be very bad. So the algorithm
+ * requires each OSC to free slots voluntarily, to maintain a reasonable
+ * number of free slots at any time.
+ */
+
+static CFS_DECL_WAITQ(osc_lru_waitq);
+static cfs_atomic_t osc_lru_waiters = CFS_ATOMIC_INIT(0);
+/* LRU pages are freed in batch mode. OSC should free at least this
+ * number of pages to avoid running out of LRU budget, and... */
+static const int lru_shrink_min = 2 << (20 - CFS_PAGE_SHIFT);  /* 2M */
+/* ...at most this number, otherwise it would take too long to finish. */
+static const int lru_shrink_max = 32 << (20 - CFS_PAGE_SHIFT); /* 32M */
+
+/* Check if we can free LRU slots from this OSC. If there are LRU waiters,
+ * we should free slots aggressively; that way, slots are freed at a steady
+ * pace, which maintains fairness among OSCs.
+ *
+ * Return how many LRU pages should be freed. */
+static int osc_cache_too_much(struct client_obd *cli)
+{
+       struct cl_client_lru *lru = cli->cl_lru;
+       int pages = cfs_atomic_read(&cli->cl_lru_in_list) >> 1;
+
+       if (cfs_atomic_read(&osc_lru_waiters) > 0 &&
+           cfs_atomic_read(cli->cl_lru_left) < lru_shrink_max)
+               /* drop lru pages aggressively */
+               return min(pages, lru_shrink_max);
+
+       /* if it's about to run out of LRU slots, we should free some, but
+        * not too many, to maintain fairness among OSCs. */
+       if (cfs_atomic_read(cli->cl_lru_left) < lru->ccl_page_max >> 4) {
+               unsigned long budget;
+
+               budget = lru->ccl_page_max / cfs_atomic_read(&lru->ccl_users);
+               if (pages > budget)
+                       return min(pages, lru_shrink_max);
+
+               return pages > lru_shrink_min ? lru_shrink_min : 0;
+       }
+
+       return 0;
+}
+
+/* Return how many pages are not discarded in @pvec. */
+static int discard_pagevec(const struct lu_env *env, struct cl_io *io,
+                          struct cl_page **pvec, int max_index)
+{
+       int count;
+       int i;
+
+       for (count = 0, i = 0; i < max_index; i++) {
+               struct cl_page *page = pvec[i];
+               if (cl_page_own_try(env, io, page) == 0) {
+                       /* free an LRU page only if nobody is using it.
+                        * This check is necessary to avoid freeing pages
+                        * that have already been removed from the LRU and
+                        * pinned for IO. */
+                       if (cfs_atomic_read(&page->cp_ref) == 1) {
+                               cl_page_unmap(env, io, page);
+                               cl_page_discard(env, io, page);
+                               ++count;
+                       }
+                       cl_page_disown(env, io, page);
+               }
+               cl_page_put(env, page);
+               pvec[i] = NULL;
+       }
+       return max_index - count;
+}
+
+/**
+ * Drop at most @target pages from the LRU.
+ */
+int osc_lru_shrink(struct client_obd *cli, int target)
+{
+       struct cl_env_nest nest;
+       struct lu_env *env;
+       struct cl_io *io;
+       struct cl_object *clobj = NULL;
+       struct cl_page **pvec;
+       struct osc_page *opg;
+       int maxscan = 0;
+       int count = 0;
+       int index = 0;
+       int rc = 0;
+       ENTRY;
+
+       LASSERT(cfs_atomic_read(&cli->cl_lru_in_list) >= 0);
+       if (cfs_atomic_read(&cli->cl_lru_in_list) == 0 || target <= 0)
+               RETURN(0);
+
+       env = cl_env_nested_get(&nest);
+       if (IS_ERR(env))
+               RETURN(PTR_ERR(env));
+
+       pvec = osc_env_info(env)->oti_pvec;
+       io = &osc_env_info(env)->oti_io;
+
+       client_obd_list_lock(&cli->cl_lru_list_lock);
+       cfs_atomic_inc(&cli->cl_lru_shrinkers);
+       maxscan = min(target << 1, cfs_atomic_read(&cli->cl_lru_in_list));
+       while (!cfs_list_empty(&cli->cl_lru_list)) {
+               struct cl_page *page;
+
+               if (--maxscan < 0)
+                       break;
+
+               opg = cfs_list_entry(cli->cl_lru_list.next, struct osc_page,
+                                    ops_lru);
+               page = cl_page_top(opg->ops_cl.cpl_page);
+               if (page->cp_state == CPS_FREEING ||
+                   cfs_atomic_read(&page->cp_ref) > 0) {
+                       cfs_list_move_tail(&opg->ops_lru, &cli->cl_lru_list);
+                       continue;
+               }
+
+               LASSERT(page->cp_obj != NULL);
+               if (clobj != page->cp_obj) {
+                       struct cl_object *tmp = page->cp_obj;
+
+                       cl_object_get(tmp);
+                       client_obd_list_unlock(&cli->cl_lru_list_lock);
+
+                       if (clobj != NULL) {
+                               count -= discard_pagevec(env, io, pvec, index);
+                               index = 0;
+
+                               cl_io_fini(env, io);
+                               cl_object_put(env, clobj);
+                               clobj = NULL;
+                       }
+
+                       clobj = tmp;
+                       io->ci_obj = clobj;
+                       rc = cl_io_init(env, io, CIT_MISC, clobj);
+                       if (rc != 0)
+                               break;
+
+                       ++maxscan;
+                       client_obd_list_lock(&cli->cl_lru_list_lock);
+                       continue;
+               }
+
+               /* move this page to the end of the list as it will be
+                * discarded soon. The page will finally be removed from the
+                * LRU list in osc_page_delete(). */
+               cfs_list_move_tail(&opg->ops_lru, &cli->cl_lru_list);
+
+               /* it's okay to grab a refcount here w/o holding the lock,
+                * because anyone deleting the page must first grab
+                * cl_lru_list_lock. */
+               cl_page_get(page);
+               pvec[index++] = page;
+               if (++count >= target)
+                       break;
+
+               if (unlikely(index == OTI_PVEC_SIZE)) {
+                       client_obd_list_unlock(&cli->cl_lru_list_lock);
+                       count -= discard_pagevec(env, io, pvec, index);
+                       index = 0;
+
+                       client_obd_list_lock(&cli->cl_lru_list_lock);
+               }
+       }
+       client_obd_list_unlock(&cli->cl_lru_list_lock);
+
+       if (clobj != NULL) {
+               count -= discard_pagevec(env, io, pvec, index);
+
+               cl_io_fini(env, io);
+               cl_object_put(env, clobj);
+       }
+       cl_env_nested_put(&nest, env);
+
+       cfs_atomic_dec(&cli->cl_lru_shrinkers);
+       RETURN(count > 0 ? count : rc);
+}
+
+static void osc_lru_add(struct client_obd *cli, struct osc_page *opg)
+{
+       bool wakeup = false;
+
+       if (!opg->ops_in_lru)
+               return;
+
+       cfs_atomic_dec(&cli->cl_lru_busy);
+       client_obd_list_lock(&cli->cl_lru_list_lock);
+       if (cfs_list_empty(&opg->ops_lru)) {
+               cfs_list_move_tail(&opg->ops_lru, &cli->cl_lru_list);
+               cfs_atomic_inc_return(&cli->cl_lru_in_list);
+               wakeup = cfs_atomic_read(&osc_lru_waiters) > 0;
+       }
+       client_obd_list_unlock(&cli->cl_lru_list_lock);
+
+       if (wakeup)
+               cfs_waitq_broadcast(&osc_lru_waitq);
+}
+
+/* Delete a page from the LRU list. The page can be deleted from the LRU
+ * list for two reasons: it was redirtied, or it was removed from the page
+ * cache. */
+static void osc_lru_del(struct client_obd *cli, struct osc_page *opg, bool del)
+{
+       if (opg->ops_in_lru) {
+               client_obd_list_lock(&cli->cl_lru_list_lock);
+               if (!cfs_list_empty(&opg->ops_lru)) {
+                       LASSERT(cfs_atomic_read(&cli->cl_lru_in_list) > 0);
+                       cfs_list_del_init(&opg->ops_lru);
+                       cfs_atomic_dec(&cli->cl_lru_in_list);
+                       if (!del)
+                               cfs_atomic_inc(&cli->cl_lru_busy);
+               } else if (del) {
+                       LASSERT(cfs_atomic_read(&cli->cl_lru_busy) > 0);
+                       cfs_atomic_dec(&cli->cl_lru_busy);
+               }
+               client_obd_list_unlock(&cli->cl_lru_list_lock);
+               if (del) {
+                       cfs_atomic_inc(cli->cl_lru_left);
+                       /* this is a good place to release more LRU pages if
+                        * this osc occupies too many LRU pages and the kernel
+                        * is stealing one of them.
+                        * cl_lru_shrinkers avoids a recursive call in case
+                        * we are already in the context of osc_lru_shrink(). */
+                       if (cfs_atomic_read(&cli->cl_lru_shrinkers) == 0)
+                               osc_lru_shrink(cli, osc_cache_too_much(cli));
+                       cfs_waitq_signal(&osc_lru_waitq);
+               }
+       } else {
+               LASSERT(cfs_list_empty(&opg->ops_lru));
+       }
+}
+
+static int osc_lru_reclaim(struct client_obd *cli)
+{
+       struct cl_client_lru *lru = cli->cl_lru;
+       struct client_obd *victim;
+       struct client_obd *tmp;
+       int rc;
+
+       LASSERT(lru != NULL);
+       LASSERT(!cfs_list_empty(&lru->ccl_list));
+
+       rc = osc_lru_shrink(cli, lru_shrink_min);
+       if (rc > 0) {
+               CDEBUG(D_CACHE, "%s: Free %d pages from own LRU: %p.\n",
+                       cli->cl_import->imp_obd->obd_name, rc, cli);
+               return rc;
+       }
+
+       CDEBUG(D_CACHE, "%s: cli %p no free slots, pages: %d, busy: %d.\n",
+               cli->cl_import->imp_obd->obd_name, cli,
+               cfs_atomic_read(&cli->cl_lru_in_list),
+               cfs_atomic_read(&cli->cl_lru_busy));
+
+       /* Reclaim LRU slots from other client_obds, as this one can't free
+        * enough from its own LRU. This should rarely happen. */
+       cfs_spin_lock(&lru->ccl_lock);
+       lru->ccl_reclaim_count++;
+       cfs_list_move_tail(&cli->cl_lru_osc, &lru->ccl_list);
+       cfs_list_for_each_entry_safe(victim, tmp, &lru->ccl_list, cl_lru_osc) {
+               if (victim == cli)
+                       break;
+
+               CDEBUG(D_CACHE, "%s: cli %p LRU pages: %d, busy: %d.\n",
+                       victim->cl_import->imp_obd->obd_name, victim,
+                       cfs_atomic_read(&victim->cl_lru_in_list),
+                       cfs_atomic_read(&victim->cl_lru_busy));
+
+               cfs_list_move_tail(&victim->cl_lru_osc, &lru->ccl_list);
+               if (cfs_atomic_read(&victim->cl_lru_in_list) > 0)
+                       break;
+       }
+       cfs_spin_unlock(&lru->ccl_lock);
+       if (victim == cli) {
+               CDEBUG(D_CACHE, "%s: can't get any free LRU slots.\n",
+                       cli->cl_import->imp_obd->obd_name);
+               return 0;
+       }
+
+       rc = osc_lru_shrink(victim,
+                           min(cfs_atomic_read(&victim->cl_lru_in_list) >> 1,
+                               lru_shrink_max));
+
+       CDEBUG(D_CACHE, "%s: Free %d pages from other cli: %p.\n",
+               cli->cl_import->imp_obd->obd_name, rc, victim);
+
+       return rc;
+}
+
+static int osc_lru_reserve(const struct lu_env *env, struct osc_object *obj,
+                          struct osc_page *opg)
+{
+       struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+       struct client_obd *cli = osc_cli(obj);
+       int rc = 0;
+       ENTRY;
+
+       if (cli->cl_lru == NULL) /* shall not be in LRU */
+               RETURN(0);
+
+       LASSERT(cfs_atomic_read(cli->cl_lru_left) >= 0);
+       while (!cfs_atomic_add_unless(cli->cl_lru_left, -1, 0)) {
+               int gen;
+
+               /* ran out of LRU slots, try to free some ourselves */
+               rc = osc_lru_reclaim(cli);
+               if (rc < 0)
+                       break;
+               if (rc > 0)
+                       continue;
+
+               cfs_cond_resched();
+
+               /* slowest case: all cached pages are busy, so notify
+                * other OSCs that we are short of LRU slots. */
+               cfs_atomic_inc(&osc_lru_waiters);
+
+               gen = cfs_atomic_read(&cli->cl_lru_in_list);
+               rc = l_wait_event(osc_lru_waitq,
+                               cfs_atomic_read(cli->cl_lru_left) > 0 ||
+                               (cfs_atomic_read(&cli->cl_lru_in_list) > 0 &&
+                                gen != cfs_atomic_read(&cli->cl_lru_in_list)),
+                               &lwi);
+
+               cfs_atomic_dec(&osc_lru_waiters);
+               if (rc < 0)
+                       break;
+       }
+
+       if (rc >= 0) {
+               cfs_atomic_inc(&cli->cl_lru_busy);
+               opg->ops_in_lru = 1;
+               rc = 0;
+       }
+
+       RETURN(rc);
+}
+
 /** @} osc */
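
Putting the osc_page.c pieces together, a page's LRU slot follows a simple
lifecycle: it is reserved at page init (the page counted as "busy"), the
page moves onto the per-OSC LRU list at IO completion, moves back to busy
when re-submitted for transfer, and its slot returns to the shared pool on
delete. A toy model of the accounting for three of those transitions
(hypothetical names; C11 atomics):

#include <stdatomic.h>
#include <stdio.h>

enum pstate { P_BUSY, P_LRU, P_GONE };

struct model {
	atomic_int lru_left;	/* shared pool (ccl_page_left) */
	int busy, in_list;	/* cl_lru_busy / cl_lru_in_list */
};

static void page_init(struct model *m, enum pstate *p)
{
	atomic_fetch_sub(&m->lru_left, 1);	/* osc_lru_reserve() */
	m->busy++;
	*p = P_BUSY;
}

static void io_done(struct model *m, enum pstate *p)
{
	m->busy--;				/* osc_lru_add() */
	m->in_list++;
	*p = P_LRU;
}

static void page_delete(struct model *m, enum pstate *p)
{
	if (*p == P_LRU)			/* osc_lru_del(..., true) */
		m->in_list--;
	else
		m->busy--;
	atomic_fetch_add(&m->lru_left, 1);	/* slot back to the pool */
	*p = P_GONE;
}

int main(void)
{
	struct model m = { .busy = 0, .in_list = 0 };
	enum pstate p;

	atomic_store(&m.lru_left, 8);
	page_init(&m, &p);
	io_done(&m, &p);
	page_delete(&m, &p);
	printf("left=%d busy=%d in_list=%d\n",
	       atomic_load(&m.lru_left), m.busy, m.in_list);
	return 0;
}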

lustre/osc/osc_request.c
index 8f990cf..19d60f0 100644
@@ -3199,6 +3199,33 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                 RETURN(0);
         }
 
+       if (KEY_IS(KEY_LRU_SET)) {
+               struct client_obd *cli = &obd->u.cli;
+
+               LASSERT(cli->cl_lru == NULL); /* only once */
+               cli->cl_lru = (struct cl_client_lru *)val;
+               cfs_atomic_inc(&cli->cl_lru->ccl_users);
+               cli->cl_lru_left = &cli->cl_lru->ccl_page_left;
+
+               /* add this osc into entity list */
+               LASSERT(cfs_list_empty(&cli->cl_lru_osc));
+               cfs_spin_lock(&cli->cl_lru->ccl_lock);
+               cfs_list_add(&cli->cl_lru_osc, &cli->cl_lru->ccl_list);
+               cfs_spin_unlock(&cli->cl_lru->ccl_lock);
+
+               RETURN(0);
+       }
+
+       if (KEY_IS(KEY_LRU_SHRINK)) {
+               struct client_obd *cli = &obd->u.cli;
+               int nr = cfs_atomic_read(&cli->cl_lru_in_list) >> 1;
+               int target = *(int *)val;
+
+               nr = osc_lru_shrink(cli, min(nr, target));
+               *(int *)val -= nr;
+               RETURN(0);
+       }
+
         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                 RETURN(-EINVAL);
 
@@ -3594,9 +3621,21 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
 
 int osc_cleanup(struct obd_device *obd)
 {
-        int rc;
+       struct client_obd *cli = &obd->u.cli;
+       int rc;
 
-        ENTRY;
+       ENTRY;
+
+       /* lru cleanup */
+       if (cli->cl_lru != NULL) {
+               LASSERT(cfs_atomic_read(&cli->cl_lru->ccl_users) > 0);
+               cfs_spin_lock(&cli->cl_lru->ccl_lock);
+               cfs_list_del_init(&cli->cl_lru_osc);
+               cfs_spin_unlock(&cli->cl_lru->ccl_lock);
+               cli->cl_lru_left = NULL;
+               cfs_atomic_dec(&cli->cl_lru->ccl_users);
+               cli->cl_lru = NULL;
+       }
 
         /* free memory of osc quota cache */
         osc_quota_cleanup(obd);
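
Finally, the KEY_LRU_SHRINK fan-out seen above: the LOV hands the same
residual target to each OSC in turn, and each OSC shrinks at most half of
its own LRU list and subtracts what it freed. A small model of that
cooperative loop (numbers hypothetical):

#include <stdio.h>

static int osc_lru_counts[] = { 10, 2, 40 };	/* pages on each OSC's LRU */

/* Stand-in for the KEY_LRU_SHRINK branch in osc_set_info_async(). */
static void osc_handle_shrink(int idx, int *val)
{
	int nr = osc_lru_counts[idx] >> 1;	/* shrink at most half */

	if (nr > *val)
		nr = *val;
	osc_lru_counts[idx] -= nr;
	*val -= nr;				/* report progress upward */
}

int main(void)
{
	int target = 16;
	int i;

	for (i = 0; i < 3 && target > 0; i++)
		osc_handle_shrink(i, &target);
	printf("unsatisfied target: %d\n", target);
	return 0;
}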