Whamcloud - gitweb
LU-12142 clio: fix hang on urgent cached pages 37/40237/12
authorWang Shilong <wshilong@ddn.com>
Wed, 14 Oct 2020 02:49:49 +0000 (10:49 +0800)
committerOleg Drokin <green@whamcloud.com>
Tue, 6 Apr 2021 03:01:51 +0000 (03:01 +0000)
Few problems addressed by this patch:

1) We try to reserve cl_pages in batch, but we don't do
that for append IO, there is no reason to skip that.

2) IO might be not page aligned, calculate reserved pages
correctly for this case.

3) If we issue one large IO block size which is larger
than max_cached_mb, IO will never be finished, because
we don't have enough cl pages to finish it, split IO
in this case.

4) Readahead should fail if we are short of LRU page
slots to avoid deadlock.

After above adjustment, LRU slots are guranteed for normal
buffer write before IO starts, if block size is too large
for max LRU slots, IO will be split.

For extra readahead, don't try hard and quit if we
are short of LRU pages, since readahead could tolerate
errors, applications won't be aware of it.

besides newly added tests, following command with 64M
max_cached_mb setting and don't see client hang any more.

/usr/lib64/openmpi/bin/mpirun --allow-run-as-root -np 12
-wd /mnt/lustre ior -g -e -w -r -b 1g -T 10 -F -C -t 64m

Todo:
Performance benchmark for readahead

Signed-off-by: Wang Shilong <wshilong@ddn.com>
Change-Id: I5c85454a40daeefb4fb97609d6aa28df2eafb99c
Reviewed-on: https://review.whamcloud.com/40237
Tested-by: jenkins <devops@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Bobi Jam <bobijam@hotmail.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/cl_object.h
lustre/include/lustre_osc.h
lustre/llite/file.c
lustre/llite/rw.c
lustre/llite/vvp_io.c
lustre/lov/lov_io.c
lustre/mdc/mdc_dev.c
lustre/obdclass/cl_io.c
lustre/osc/osc_io.c
lustre/osc/osc_page.c
lustre/tests/sanity.sh

index 486e695..9849ea2 100644 (file)
@@ -1482,9 +1482,13 @@ struct cl_read_ahead {
        unsigned long   cra_rpc_pages;
        /* Release callback. If readahead holds resources underneath, this
         * function should be called to release it. */
-       void            (*cra_release)(const struct lu_env *env, void *cbdata);
+       void            (*cra_release)(const struct lu_env *env,
+                                      struct cl_read_ahead *ra);
+
        /* Callback data for cra_release routine */
-       void            *cra_cbdata;
+       void            *cra_dlmlock;
+       void            *cra_oio;
+
        /* whether lock is in contention */
        bool            cra_contention;
 };
@@ -1493,7 +1497,7 @@ static inline void cl_read_ahead_release(const struct lu_env *env,
                                         struct cl_read_ahead *ra)
 {
        if (ra->cra_release != NULL)
-               ra->cra_release(env, ra->cra_cbdata);
+               ra->cra_release(env, ra);
        memset(ra, 0, sizeof(*ra));
 }
 
@@ -1616,6 +1620,13 @@ struct cl_io_operations {
        int (*cio_read_ahead)(const struct lu_env *env,
                              const struct cl_io_slice *slice,
                              pgoff_t start, struct cl_read_ahead *ra);
+       /**
+        *
+        * Reserve LRU slots before IO.
+        */
+       int (*cio_lru_reserve) (const struct lu_env *env,
+                               const struct cl_io_slice *slice,
+                               loff_t pos, size_t bytes);
         /**
          * Optional debugging helper. Print given io slice.
          */
@@ -2421,6 +2432,8 @@ int   cl_io_commit_async (const struct lu_env *env, struct cl_io *io,
                          struct cl_page_list *queue, int from, int to,
                          cl_commit_cbt cb);
 void  cl_io_extent_release (const struct lu_env *env, struct cl_io *io);
+int cl_io_lru_reserve(const struct lu_env *env, struct cl_io *io,
+                     loff_t pos, size_t bytes);
 int   cl_io_read_ahead   (const struct lu_env *env, struct cl_io *io,
                          pgoff_t start, struct cl_read_ahead *ra);
 void  cl_io_rw_advance   (const struct lu_env *env, struct cl_io *io,
index ebce916..8e18d61 100644 (file)
@@ -129,7 +129,9 @@ struct osc_io {
        /** true if this io is counted as active IO */
                           oi_is_active:1,
        /** true if this io has CAP_SYS_RESOURCE */
-                          oi_cap_sys_resource:1;
+                          oi_cap_sys_resource:1,
+       /** true if this io issued by readahead */
+                          oi_is_readahead:1;
        /** how many LRU pages are reserved for this IO */
        unsigned long      oi_lru_reserved;
 
@@ -684,8 +686,6 @@ void osc_io_extent_release(const struct lu_env *env,
 int osc_io_iter_init(const struct lu_env *env, const struct cl_io_slice *ios);
 void osc_io_iter_fini(const struct lu_env *env,
                      const struct cl_io_slice *ios);
-int osc_io_rw_iter_init(const struct lu_env *env,
-                       const struct cl_io_slice *ios);
 void osc_io_rw_iter_fini(const struct lu_env *env,
                            const struct cl_io_slice *ios);
 int osc_io_fault_start(const struct lu_env *env, const struct cl_io_slice *ios);
@@ -700,11 +700,13 @@ int osc_fsync_ost(const struct lu_env *env, struct osc_object *obj,
                  struct cl_fsync_io *fio);
 void osc_io_fsync_end(const struct lu_env *env,
                      const struct cl_io_slice *slice);
-void osc_read_ahead_release(const struct lu_env *env, void *cbdata);
+void osc_read_ahead_release(const struct lu_env *env, struct cl_read_ahead *ra);
 int osc_io_lseek_start(const struct lu_env *env,
                       const struct cl_io_slice *slice);
 void osc_io_lseek_end(const struct lu_env *env,
                      const struct cl_io_slice *slice);
+int osc_io_lru_reserve(const struct lu_env *env, const struct cl_io_slice *ios,
+                      loff_t pos, size_t count);
 
 /* osc_lock.c */
 void osc_lock_to_lockless(const struct lu_env *env, struct osc_lock *ols,
index 76350c0..e0a902a 100644 (file)
@@ -1542,6 +1542,7 @@ ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
        struct vvp_io *vio = vvp_env_io(env);
        struct inode *inode = file_inode(file);
        struct ll_inode_info *lli = ll_i2info(inode);
+       struct ll_sb_info *sbi = ll_i2sbi(inode);
        struct ll_file_data *fd  = file->private_data;
        struct range_lock range;
        struct cl_io *io;
@@ -1550,6 +1551,9 @@ ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
        unsigned int retried = 0, dio_lock = 0;
        bool is_aio = false;
        struct cl_dio_aio *ci_aio = NULL;
+       size_t per_bytes;
+       bool partial_io = false;
+       size_t max_io_pages, max_cached_pages;
 
        ENTRY;
 
@@ -1557,6 +1561,11 @@ ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
                file_dentry(file)->d_name.name,
                iot == CIT_READ ? "read" : "write", *ppos, count);
 
+       max_io_pages = PTLRPC_MAX_BRW_PAGES * OBD_MAX_RIF_DEFAULT;
+       max_cached_pages = sbi->ll_cache->ccc_lru_max;
+       if (max_io_pages > (max_cached_pages >> 2))
+               max_io_pages = max_cached_pages >> 2;
+
        io = vvp_env_thread_io(env);
        if (file->f_flags & O_DIRECT) {
                if (!is_sync_kiocb(args->u.normal.via_iocb))
@@ -1567,19 +1576,29 @@ ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
        }
 
 restart:
+       /**
+        * IO block size need be aware of cached page limit, otherwise
+        * if we have small max_cached_mb but large block IO issued, io
+        * could not be finished and blocked whole client.
+        */
+       if (file->f_flags & O_DIRECT)
+               per_bytes = count;
+       else
+               per_bytes = min(max_io_pages << PAGE_SHIFT, count);
+       partial_io = per_bytes < count;
        io = vvp_env_thread_io(env);
        ll_io_init(io, file, iot, args);
        io->ci_aio = ci_aio;
        io->ci_dio_lock = dio_lock;
        io->ci_ndelay_tried = retried;
 
-       if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
+       if (cl_io_rw_init(env, io, iot, *ppos, per_bytes) == 0) {
                bool range_locked = false;
 
                if (file->f_flags & O_APPEND)
                        range_lock_init(&range, 0, LUSTRE_EOF);
                else
-                       range_lock_init(&range, *ppos, *ppos + count - 1);
+                       range_lock_init(&range, *ppos, *ppos + per_bytes - 1);
 
                vio->vui_fd  = file->private_data;
                vio->vui_iter = args->u.normal.via_iter;
@@ -1631,6 +1650,16 @@ restart:
                /* prepare IO restart */
                if (count > 0)
                        args->u.normal.via_iter = vio->vui_iter;
+
+               if (partial_io) {
+                       /**
+                        * Reexpand iov count because it was zero
+                        * after IO finish.
+                        */
+                       iov_iter_reexpand(vio->vui_iter, count);
+                       if (per_bytes == io->ci_nob)
+                               io->ci_need_restart = 1;
+               }
        }
 out:
        cl_io_fini(env, io);
index 743e2c1..c1563a9 100644 (file)
@@ -85,17 +85,26 @@ static unsigned long ll_ra_count_get(struct ll_sb_info *sbi,
                                     unsigned long pages,
                                     unsigned long pages_min)
 {
-        struct ll_ra_info *ra = &sbi->ll_ra_info;
-        long ret;
+       struct ll_ra_info *ra = &sbi->ll_ra_info;
+       long ret;
+
         ENTRY;
 
-        /* If read-ahead pages left are less than 1M, do not do read-ahead,
-         * otherwise it will form small read RPC(< 1M), which hurt server
-         * performance a lot. */
+       /**
+        * Don't try readahead agreesively if we are limited
+        * LRU pages, otherwise, it could cause deadlock.
+        */
+       pages = min(sbi->ll_cache->ccc_lru_max >> 2, pages);
+
+       /*
+        * If read-ahead pages left are less than 1M, do not do read-ahead,
+        * otherwise it will form small read RPC(< 1M), which hurt server
+        * performance a lot.
+        */
        ret = min(ra->ra_max_pages - atomic_read(&ra->ra_cur_pages),
                  pages);
-        if (ret < 0 || ret < min_t(long, PTLRPC_MAX_BRW_PAGES, pages))
-                GOTO(out, ret = 0);
+       if (ret < 0 || ret < min_t(long, PTLRPC_MAX_BRW_PAGES, pages))
+               GOTO(out, ret = 0);
 
        if (atomic_add_return(ret, &ra->ra_cur_pages) > ra->ra_max_pages) {
                atomic_sub(ret, &ra->ra_cur_pages);
@@ -691,10 +700,26 @@ static int ll_readahead(const struct lu_env *env, struct cl_io *io,
        struct cl_object *clob;
        int ret = 0;
        __u64 kms;
+       struct ll_sb_info *sbi;
+       struct ll_ra_info *ra;
+
        ENTRY;
 
+        ENTRY;
+
        clob = io->ci_obj;
        inode = vvp_object_inode(clob);
+       sbi = ll_i2sbi(inode);
+       ra = &sbi->ll_ra_info;
+
+       /**
+        * In case we have a limited max_cached_mb, readahead
+        * should be stopped if it have run out of all LRU slots.
+        */
+       if (atomic_read(&ra->ra_cur_pages) >= sbi->ll_cache->ccc_lru_max) {
+               ll_ra_stats_inc(inode, RA_STAT_MAX_IN_FLIGHT);
+               RETURN(0);
+       }
 
        memset(ria, 0, sizeof(*ria));
        ret = ll_readahead_file_kms(env, io, &kms);
@@ -1700,6 +1725,15 @@ static int kickoff_async_readahead(struct file *file, unsigned long pages)
        pgoff_t start_idx = ras_align(ras, ras->ras_next_readahead_idx);
        pgoff_t end_idx = start_idx + pages - 1;
 
+       /**
+        * In case we have a limited max_cached_mb, readahead
+        * should be stopped if it have run out of all LRU slots.
+        */
+       if (atomic_read(&ra->ra_cur_pages) >= sbi->ll_cache->ccc_lru_max) {
+               ll_ra_stats_inc(inode, RA_STAT_MAX_IN_FLIGHT);
+               return 0;
+       }
+
        throttle = min(ra->ra_async_pages_per_file_threshold,
                       ra->ra_max_pages_per_file);
        /*
index f36d036..e4faf2c 100644 (file)
@@ -818,6 +818,12 @@ static int vvp_io_read_start(const struct lu_env *env,
        if (!can_populate_pages(env, io, inode))
                RETURN(0);
 
+       if (!(file->f_flags & O_DIRECT)) {
+               result = cl_io_lru_reserve(env, io, pos, cnt);
+               if (result)
+                       RETURN(result);
+       }
+
        /* Unless this is reading a sparse file, otherwise the lock has already
         * been acquired so vvp_prep_size() is an empty op. */
        result = vvp_prep_size(env, obj, io, pos, cnt, &exceed);
@@ -1236,6 +1242,12 @@ static int vvp_io_write_start(const struct lu_env *env,
        if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_IMUTEX_NOSEC) && lock_inode)
                RETURN(-EINVAL);
 
+       if (!(file->f_flags & O_DIRECT)) {
+               result = cl_io_lru_reserve(env, io, pos, cnt);
+               if (result)
+                       RETURN(result);
+       }
+
        if (vio->vui_iter == NULL) {
                /* from a temp io in ll_cl_init(). */
                result = 0;
index 4cd93fa..5cfb773 100644 (file)
@@ -1206,6 +1206,60 @@ static int lov_io_read_ahead(const struct lu_env *env,
        RETURN(0);
 }
 
+int lov_io_lru_reserve(const struct lu_env *env,
+                      const struct cl_io_slice *ios, loff_t pos, size_t bytes)
+{
+       struct lov_io *lio = cl2lov_io(env, ios);
+       struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
+       struct lov_io_sub *sub;
+       struct lu_extent ext;
+       int index;
+       int rc = 0;
+
+       ENTRY;
+
+       ext.e_start = pos;
+       ext.e_end = pos + bytes;
+       lov_foreach_io_layout(index, lio, &ext) {
+               struct lov_layout_entry *le = lov_entry(lio->lis_object, index);
+               struct lov_layout_raid0 *r0 = &le->lle_raid0;
+               u64 start;
+               u64 end;
+               int stripe;
+
+               if (!lsm_entry_inited(lsm, index))
+                       continue;
+
+               if (!le->lle_valid && !ios->cis_io->ci_designated_mirror) {
+                       CERROR(DFID": I/O to invalid component: %d, mirror: %d\n",
+                              PFID(lu_object_fid(lov2lu(lio->lis_object))),
+                              index, lio->lis_mirror_index);
+                       RETURN(-EIO);
+               }
+
+               for (stripe = 0; stripe < r0->lo_nr; stripe++) {
+                       if (!lov_stripe_intersects(lsm, index, stripe,
+                                                  &ext, &start, &end))
+                               continue;
+
+                       if (unlikely(!r0->lo_sub[stripe]))
+                               RETURN(-EIO);
+
+                       sub = lov_sub_get(env, lio,
+                                         lov_comp_index(index, stripe));
+                       if (IS_ERR(sub))
+                               return PTR_ERR(sub);
+
+                       rc = cl_io_lru_reserve(sub->sub_env, &sub->sub_io, start,
+                                              end - start + 1);
+                       if (rc != 0)
+                               RETURN(rc);
+               }
+       }
+
+       RETURN(0);
+}
+
 /**
  * lov implementation of cl_operations::cio_submit() method. It takes a list
  * of pages in \a queue, splits it into per-stripe sub-lists, invokes
@@ -1618,6 +1672,7 @@ static const struct cl_io_operations lov_io_ops = {
                }
        },
        .cio_read_ahead                = lov_io_read_ahead,
+       .cio_lru_reserve               = lov_io_lru_reserve,
        .cio_submit                    = lov_io_submit,
        .cio_commit_async              = lov_io_commit_async,
 };
index f179e6f..db9e83b 100644 (file)
@@ -1135,6 +1135,7 @@ static int mdc_io_read_ahead(const struct lu_env *env,
                             pgoff_t start, struct cl_read_ahead *ra)
 {
        struct osc_object *osc = cl2osc(ios->cis_obj);
+       struct osc_io *oio = cl2osc_io(env, ios);
        struct ldlm_lock *dlmlock;
 
        ENTRY;
@@ -1143,6 +1144,7 @@ static int mdc_io_read_ahead(const struct lu_env *env,
        if (dlmlock == NULL)
                RETURN(-ENODATA);
 
+       oio->oi_is_readahead = 1;
        if (dlmlock->l_req_mode != LCK_PR) {
                struct lustre_handle lockh;
 
@@ -1154,7 +1156,8 @@ static int mdc_io_read_ahead(const struct lu_env *env,
        ra->cra_rpc_pages = osc_cli(osc)->cl_max_pages_per_rpc;
        ra->cra_end_idx = CL_PAGE_EOF;
        ra->cra_release = osc_read_ahead_release;
-       ra->cra_cbdata = dlmlock;
+       ra->cra_dlmlock = dlmlock;
+       ra->cra_oio = oio;
 
        RETURN(0);
 }
@@ -1317,12 +1320,12 @@ static void mdc_io_data_version_end(const struct lu_env *env,
 static const struct cl_io_operations mdc_io_ops = {
        .op = {
                [CIT_READ] = {
-                       .cio_iter_init = osc_io_rw_iter_init,
+                       .cio_iter_init = osc_io_iter_init,
                        .cio_iter_fini = osc_io_rw_iter_fini,
                        .cio_start     = osc_io_read_start,
                },
                [CIT_WRITE] = {
-                       .cio_iter_init = osc_io_rw_iter_init,
+                       .cio_iter_init = osc_io_iter_init,
                        .cio_iter_fini = osc_io_rw_iter_fini,
                        .cio_start     = osc_io_write_start,
                        .cio_end       = osc_io_end,
@@ -1353,6 +1356,7 @@ static const struct cl_io_operations mdc_io_ops = {
                },
        },
        .cio_read_ahead   = mdc_io_read_ahead,
+       .cio_lru_reserve  = osc_io_lru_reserve,
        .cio_submit       = osc_io_submit,
        .cio_commit_async = osc_io_commit_async,
        .cio_extent_release = osc_io_extent_release,
index 425567f..84f2109 100644 (file)
@@ -584,6 +584,35 @@ int cl_io_read_ahead(const struct lu_env *env, struct cl_io *io,
 EXPORT_SYMBOL(cl_io_read_ahead);
 
 /**
+ * Called before io start, to reserve enough LRU slots to avoid
+ * deadlock.
+ *
+ * \see cl_io_operations::cio_lru_reserve()
+ */
+int cl_io_lru_reserve(const struct lu_env *env, struct cl_io *io,
+                     loff_t pos, size_t bytes)
+{
+       const struct cl_io_slice *scan;
+       int result = 0;
+
+       LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
+       LINVRNT(cl_io_invariant(io));
+       ENTRY;
+
+       list_for_each_entry(scan, &io->ci_layers, cis_linkage) {
+               if (scan->cis_iop->cio_lru_reserve) {
+                       result = scan->cis_iop->cio_lru_reserve(env, scan,
+                                                               pos, bytes);
+                       if (result)
+                               break;
+               }
+       }
+
+       RETURN(result);
+}
+EXPORT_SYMBOL(cl_io_lru_reserve);
+
+/**
  * Commit a list of contiguous pages into writeback cache.
  *
  * \returns 0 if all pages committed, or errcode if error occurred.
index 062fbe4..b2ec8e7 100644 (file)
@@ -58,11 +58,13 @@ static void osc_io_fini(const struct lu_env *env, const struct cl_io_slice *io)
 {
 }
 
-void osc_read_ahead_release(const struct lu_env *env, void *cbdata)
+void osc_read_ahead_release(const struct lu_env *env, struct cl_read_ahead *ra)
 {
-       struct ldlm_lock *dlmlock = cbdata;
+       struct ldlm_lock *dlmlock = ra->cra_dlmlock;
+       struct osc_io *oio = ra->cra_oio;
        struct lustre_handle lockh;
 
+       oio->oi_is_readahead = 0;
        ldlm_lock2handle(dlmlock, &lockh);
        ldlm_lock_decref(&lockh, LCK_PR);
        LDLM_LOCK_PUT(dlmlock);
@@ -73,11 +75,14 @@ static int osc_io_read_ahead(const struct lu_env *env,
                             const struct cl_io_slice *ios,
                             pgoff_t start, struct cl_read_ahead *ra)
 {
-       struct osc_object       *osc = cl2osc(ios->cis_obj);
-       struct ldlm_lock        *dlmlock;
-       int                     result = -ENODATA;
+       struct osc_object *osc = cl2osc(ios->cis_obj);
+       struct osc_io *oio = cl2osc_io(env, ios);
+       struct ldlm_lock *dlmlock;
+       int result = -ENODATA;
+
        ENTRY;
 
+       oio->oi_is_readahead = true;
        dlmlock = osc_dlmlock_at_pgoff(env, osc, start, 0);
        if (dlmlock != NULL) {
                LASSERT(dlmlock->l_ast_data == osc);
@@ -92,7 +97,8 @@ static int osc_io_read_ahead(const struct lu_env *env,
                ra->cra_end_idx = cl_index(osc2cl(osc),
                                           dlmlock->l_policy_data.l_extent.end);
                ra->cra_release = osc_read_ahead_release;
-               ra->cra_cbdata = dlmlock;
+               ra->cra_dlmlock = dlmlock;
+               ra->cra_oio = oio;
                if (ra->cra_end_idx != CL_PAGE_EOF)
                        ra->cra_contention = true;
                result = 0;
@@ -398,6 +404,7 @@ int osc_io_iter_init(const struct lu_env *env, const struct cl_io_slice *ios)
        struct obd_import *imp = osc_cli(osc)->cl_import;
        struct osc_io *oio = osc_env_io(env);
        int rc = -EIO;
+
        ENTRY;
 
        spin_lock(&imp->imp_lock);
@@ -422,28 +429,6 @@ int osc_io_iter_init(const struct lu_env *env, const struct cl_io_slice *ios)
 }
 EXPORT_SYMBOL(osc_io_iter_init);
 
-int osc_io_rw_iter_init(const struct lu_env *env,
-                       const struct cl_io_slice *ios)
-{
-       struct cl_io *io = ios->cis_io;
-       struct osc_io *oio = osc_env_io(env);
-       struct osc_object *osc = cl2osc(ios->cis_obj);
-       unsigned long npages;
-       ENTRY;
-
-       if (cl_io_is_append(io))
-               RETURN(osc_io_iter_init(env, ios));
-
-       npages = io->u.ci_rw.crw_count >> PAGE_SHIFT;
-       if (io->u.ci_rw.crw_pos & ~PAGE_MASK)
-               ++npages;
-
-       oio->oi_lru_reserved = osc_lru_reserve(osc_cli(osc), npages);
-
-       RETURN(osc_io_iter_init(env, ios));
-}
-EXPORT_SYMBOL(osc_io_rw_iter_init);
-
 void osc_io_iter_fini(const struct lu_env *env,
                      const struct cl_io_slice *ios)
 {
@@ -1190,16 +1175,42 @@ void osc_io_lseek_end(const struct lu_env *env,
 }
 EXPORT_SYMBOL(osc_io_lseek_end);
 
+int osc_io_lru_reserve(const struct lu_env *env,
+                      const struct cl_io_slice *ios,
+                      loff_t pos, size_t bytes)
+{
+       struct osc_object *osc = cl2osc(ios->cis_obj);
+       struct osc_io *oio = osc_env_io(env);
+       unsigned long npages = 0;
+       size_t page_offset;
+
+       ENTRY;
+
+       page_offset = pos & ~PAGE_MASK;
+       if (page_offset) {
+               ++npages;
+               if (bytes > PAGE_SIZE - page_offset)
+                       bytes -= (PAGE_SIZE - page_offset);
+               else
+                       bytes = 0;
+       }
+       npages += (bytes + PAGE_SIZE - 1) >> PAGE_SHIFT;
+       oio->oi_lru_reserved = osc_lru_reserve(osc_cli(osc), npages);
+
+       RETURN(0);
+}
+EXPORT_SYMBOL(osc_io_lru_reserve);
+
 static const struct cl_io_operations osc_io_ops = {
        .op = {
                [CIT_READ] = {
-                       .cio_iter_init = osc_io_rw_iter_init,
+                       .cio_iter_init = osc_io_iter_init,
                        .cio_iter_fini = osc_io_rw_iter_fini,
                        .cio_start  = osc_io_read_start,
                        .cio_fini   = osc_io_fini
                },
                [CIT_WRITE] = {
-                       .cio_iter_init = osc_io_rw_iter_init,
+                       .cio_iter_init = osc_io_iter_init,
                        .cio_iter_fini = osc_io_rw_iter_fini,
                        .cio_start  = osc_io_write_start,
                        .cio_end    = osc_io_end,
@@ -1242,6 +1253,7 @@ static const struct cl_io_operations osc_io_ops = {
                }
        },
        .cio_read_ahead             = osc_io_read_ahead,
+       .cio_lru_reserve            = osc_io_lru_reserve,
        .cio_submit                 = osc_io_submit,
        .cio_commit_async           = osc_io_commit_async,
        .cio_extent_release         = osc_io_extent_release
index 0f009b3..065bebe 100644 (file)
@@ -794,6 +794,13 @@ static int osc_lru_alloc(const struct lu_env *env, struct client_obd *cli,
                        break;
                if (rc > 0)
                        continue;
+               /* IO issued by readahead, don't try hard */
+               if (oio->oi_is_readahead) {
+                       if (atomic_long_read(cli->cl_lru_left) > 0)
+                               continue;
+                       rc = -EBUSY;
+                       break;
+               }
 
                cond_resched();
                rc = l_wait_event_abortable(
@@ -826,16 +833,23 @@ unsigned long osc_lru_reserve(struct client_obd *cli, unsigned long npages)
        unsigned long reserved = 0;
        unsigned long max_pages;
        unsigned long c;
+       int rc;
 
-       /* reserve a full RPC window at most to avoid that a thread accidentally
-        * consumes too many LRU slots */
-       max_pages = cli->cl_max_pages_per_rpc * cli->cl_max_rpcs_in_flight;
-       if (npages > max_pages)
-               npages = max_pages;
-
+again:
        c = atomic_long_read(cli->cl_lru_left);
        if (c < npages && osc_lru_reclaim(cli, npages) > 0)
                c = atomic_long_read(cli->cl_lru_left);
+
+       if (c < npages) {
+               /*
+                * Trigger writeback in the hope some LRU slot could
+                * be freed.
+                */
+               rc = ptlrpcd_queue_work(cli->cl_writeback_work);
+               if (rc)
+                       return 0;
+       }
+
        while (c >= npages) {
                if (c == atomic_long_cmpxchg(cli->cl_lru_left, c, c - npages)) {
                        reserved = npages;
@@ -843,6 +857,16 @@ unsigned long osc_lru_reserve(struct client_obd *cli, unsigned long npages)
                }
                c = atomic_long_read(cli->cl_lru_left);
        }
+
+       if (reserved != npages) {
+               cond_resched();
+               rc = l_wait_event_abortable(
+                       osc_lru_waitq,
+                       atomic_long_read(cli->cl_lru_left) > 0);
+               goto again;
+       }
+
+       max_pages = cli->cl_max_pages_per_rpc * cli->cl_max_rpcs_in_flight;
        if (atomic_long_read(cli->cl_lru_left) < max_pages) {
                /* If there aren't enough pages in the per-OSC LRU then
                 * wake up the LRU thread to try and clear out space, so
index dba21e5..dbd3dab 100755 (executable)
@@ -24270,6 +24270,32 @@ test_427() {
 }
 run_test 427 "Failed DNE2 update request shouldn't corrupt updatelog"
 
+test_428() {
+       [ $PARALLEL == "yes" ] && skip "skip parallel run"
+       local cache_limit=$CACHE_MAX
+
+       stack_trap "$LCTL set_param -n llite.*.max_cached_mb=$cache_limit"
+       $LCTL set_param -n llite.*.max_cached_mb=64
+
+       mkdir $DIR/$tdir
+       $LFS setstripe -c 1 $DIR/$tdir
+       eval touch $DIR/$tdir/$tfile.{1..$OSTCOUNT}
+       stack_trap "rm -f $DIR/$tdir/$tfile.*"
+       #test write
+       for f in $(seq 4); do
+               dd if=/dev/zero of=$DIR/$tdir/$tfile.$f bs=128M count=1 &
+       done
+       wait
+
+       cancel_lru_locks osc
+       # Test read
+       for f in $(seq 4); do
+               dd if=$DIR/$tdir/$tfile.$f of=/dev/null bs=128M count=1 &
+       done
+       wait
+}
+run_test 428 "large block size IO should not hang"
+
 lseek_test_430() {
        local offset
        local file=$1