LU-394: LND failure caused by discontiguous KIOV
author Jinshan Xiong <jay@whamcloud.com>
Tue, 12 Jul 2011 17:55:19 +0000 (10:55 -0700)
committer Oleg Drokin <green@whamcloud.com>
Tue, 26 Jul 2011 15:47:48 +0000 (11:47 -0400)
This issue was introduced by bug 18881, where I moved the urgent
pages to the front of lop_pending to fix a deadlock issue.
This patch reverts bug 18881 and takes a new approach:
cl_page_gang_lookup() blocks only on the first page. This also
avoids deadlock, because we must never grab locks on multiple
pages without a try method.
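
The deadlock-avoidance rule (sleep only for the first lock, try-lock
every later one, and bail out on contention) is the standard way to take
several locks whose ordering cannot be guaranteed. A minimal userspace
sketch of the idea, using pthread mutexes as stand-ins for cl_page
ownership; grab_batch() and everything else here is hypothetical:

    #include <pthread.h>
    #include <stddef.h>

    /* Take as many of the n locks as possible without risking deadlock:
     * sleep only on the first one; try-lock the rest and stop at the
     * first contended lock so the caller can retry from that point
     * (compare CLP_GANG_AGAIN in the patch below). */
    static size_t grab_batch(pthread_mutex_t *locks, size_t n)
    {
            size_t i;

            if (n == 0)
                    return 0;
            pthread_mutex_lock(&locks[0]);   /* blocking: guarantees progress */
            for (i = 1; i < n; i++)
                    if (pthread_mutex_trylock(&locks[i]) != 0)
                            break;           /* contended: stop, do not sleep */
            return i;                        /* locks [0, i) are now held */
    }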

Change-Id: I5dce35e3929e4f79a350e56ddc9e752269db060e
Signed-off-by: Jinshan Xiong <jay@whamcloud.com>
Signed-off-by: Oleg Drokin <green@whamcloud.com>
Reviewed-on: http://review.whamcloud.com/911

lustre/include/cl_object.h
lustre/include/obd_support.h
lustre/llite/vvp_io.c
lustre/obdclass/cl_lock.c
lustre/obdclass/cl_page.c
lustre/osc/osc_io.c
lustre/osc/osc_lock.c
lustre/osc/osc_object.c
lustre/osc/osc_page.c
lustre/osc/osc_request.c
lustre/tests/sanity.sh

index 76ee194..7398ca3 100644 (file)
@@ -2654,15 +2654,19 @@ static inline int cl_object_same(struct cl_object *o0, struct cl_object *o1)
 
 /** \defgroup cl_page cl_page
  * @{ */
-struct cl_page       *cl_page_lookup(struct cl_object_header *hdr,
+enum {
+        CLP_GANG_OKAY = 0,
+        CLP_GANG_AGAIN,
+        CLP_GANG_RESCHED
+};
+
+int             cl_page_gang_lookup (const struct lu_env *env,
+                                     struct cl_object *obj,
+                                     struct cl_io *io,
+                                     pgoff_t start, pgoff_t end,
+                                     struct cl_page_list *plist);
+struct cl_page *cl_page_lookup      (struct cl_object_header *hdr,
                                      pgoff_t index);
-void                  cl_page_gang_lookup(const struct lu_env *env,
-                                          struct cl_object *obj,
-                                          struct cl_io *io,
-                                          pgoff_t start, pgoff_t end,
-                                          struct cl_page_list *plist,
-                                          int nonblock,
-                                          int *resched);
 struct cl_page *cl_page_find        (const struct lu_env *env,
                                      struct cl_object *obj,
                                      pgoff_t idx, struct page *vmpage,
@@ -2964,6 +2968,15 @@ do {                                                                    \
  * @{ */
 
 /**
+ * Last page in the page list.
+ */
+static inline struct cl_page *cl_page_list_last(struct cl_page_list *plist)
+{
+        LASSERT(plist->pl_nr > 0);
+        return cfs_list_entry(plist->pl_pages.prev, struct cl_page, cp_batch);
+}
+
+/**
  * Iterate over pages in a page list.
  */
 #define cl_page_list_for_each(page, list)                               \
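
For reference, a hedged sketch of the retry protocol the three new
return codes imply for callers; it mirrors the loops this patch adds to
cl_lock_page_out() and cl_pages_prune() below, with process_pages() as a
hypothetical placeholder and the surrounding declarations elided:

    /* Drive cl_page_gang_lookup() until the whole range has been scanned.
     * CLP_GANG_OKAY:    the scan reached the end of the range - stop.
     * CLP_GANG_AGAIN:   a try-own failed on a busy page - retry at once.
     * CLP_GANG_RESCHED: the scan stopped early to yield the CPU. */
    do {
            cl_page_list_init(plist);
            res = cl_page_gang_lookup(env, obj, io, start, end, plist);
            process_pages(env, io, plist);          /* hypothetical helper */
            cl_page_list_disown(env, io, plist);
            cl_page_list_fini(env, plist);
            if (res == CLP_GANG_RESCHED)
                    cfs_cond_resched();
    } while (res != CLP_GANG_OKAY);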
index 7fa0686..5cdebfd 100644 (file)
@@ -326,6 +326,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_OSC_OBJECT_CONTENTION   0x40e
 #define OBD_FAIL_OSC_CP_CANCEL_RACE      0x40f
 #define OBD_FAIL_OSC_CP_ENQ_RACE         0x410
+#define OBD_FAIL_OSC_NO_GRANT            0x411
 
 #define OBD_FAIL_PTLRPC                  0x500
 #define OBD_FAIL_PTLRPC_ACK              0x501
index 78e1da7..62b0072 100644 (file)
@@ -807,7 +807,7 @@ static int vvp_io_read_page(const struct lu_env *env,
 
 static int vvp_page_sync_io(const struct lu_env *env, struct cl_io *io,
                             struct cl_page *page, struct ccc_page *cp,
-                            int to, enum cl_req_type crt)
+                            enum cl_req_type crt)
 {
         struct cl_2queue  *queue;
         int result;
@@ -815,13 +815,10 @@ static int vvp_page_sync_io(const struct lu_env *env, struct cl_io *io,
         LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
 
         queue = &io->ci_queue;
-
         cl_2queue_init_page(queue, page);
-        cl_page_clip(env, page, 0, to);
 
         result = cl_io_submit_sync(env, io, crt, queue, CRP_NORMAL, 0);
         LASSERT(cl_page_is_owned(page, io));
-        cl_page_clip(env, page, 0, CFS_PAGE_SIZE);
 
         if (crt == CRT_READ)
                 /*
@@ -863,8 +860,7 @@ static int vvp_io_prepare_partial(const struct lu_env *env, struct cl_io *io,
                 } else if (cp->cpg_defer_uptodate)
                         cp->cpg_ra_used = 1;
                 else
-                        result = vvp_page_sync_io(env, io, pg, cp,
-                                                  CFS_PAGE_SIZE, CRT_READ);
+                        result = vvp_page_sync_io(env, io, pg, cp, CRT_READ);
                 /*
                  * In older implementations, obdo_refresh_inode is called here
                  * to update the inode because the write might modify the
@@ -968,7 +964,10 @@ static int vvp_io_commit_write(const struct lu_env *env,
                  * it will not soon. */
                 vvp_write_pending(cl2ccc(obj), cp);
                 result = cl_page_cache_add(env, io, pg, CRT_WRITE);
-                if (result == -EDQUOT)
+                if (result == -EDQUOT) {
+                        pgoff_t last_index = i_size_read(inode) >> CFS_PAGE_SHIFT;
+                        bool need_clip = true;
+
                         /*
                          * Client ran out of disk space grant. Possible
                          * strategies are:
@@ -983,11 +982,21 @@ static int vvp_io_commit_write(const struct lu_env *env,
                          * what the new code continues to do for the time
                          * being.
                          */
-                        result = vvp_page_sync_io(env, io, pg, cp,
-                                                  to, CRT_WRITE);
+                        if (last_index > pg->cp_index) {
+                                to = CFS_PAGE_SIZE;
+                                need_clip = false;
+                        } else if (last_index == pg->cp_index) {
+                                int size_to = i_size_read(inode) & ~CFS_PAGE_MASK;
+                                if (to < size_to)
+                                        to = size_to;
+                        }
+                        if (need_clip)
+                                cl_page_clip(env, pg, 0, to);
+                        result = vvp_page_sync_io(env, io, pg, cp, CRT_WRITE);
                         if (result)
                                 CERROR("Write page %lu of inode %p failed %d\n",
                                        pg->cp_index, inode, result);
+                }
         } else {
                 tallyop = LPROC_LL_DIRTY_HITS;
                 result = 0;
@@ -1010,7 +1019,6 @@ static int vvp_io_commit_write(const struct lu_env *env,
                         cl_page_discard(env, io, pg);
         }
         ll_inode_size_unlock(inode, 0);
-        
         RETURN(result);
 }
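
The last_index/need_clip logic above decides how much of the page the
forced sync write must carry: a page lying wholly below i_size goes out
in full, and the page containing i_size goes out at least up to i_size.
That is what keeps a short page out of the middle of a bulk transfer,
i.e. the discontiguous KIOV of the title. A hedged, self-contained
restatement of the rule (all names are local to this sketch and
CFS_PAGE_SIZE is assumed to be 4096):

    /* How many bytes of page 'index' must a forced sync write send?
     * 'isize' is the current file size and 'to' is how far into the
     * page the caller actually wrote. */
    static unsigned int sync_write_bytes(unsigned long long isize,
                                         unsigned long index,
                                         unsigned int to)
    {
            const unsigned int page_size = 4096;  /* assumed CFS_PAGE_SIZE */
            unsigned long long last_index = isize / page_size;

            if (index < last_index)
                    return page_size;         /* interior page: send whole */
            if (index == last_index) {
                    unsigned int size_to = isize % page_size;
                    if (to < size_to)
                            to = size_to;     /* cover everything below i_size */
            }
            return to;                        /* tail page: may be short */
    }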
 
index 21919b3..9c62e89 100644 (file)
@@ -1902,17 +1902,14 @@ void cl_lock_page_list_fixup(const struct lu_env *env,
         LINVRNT(cl_lock_invariant(env, lock));
         ENTRY;
 
-        /* Now, we have a list of cl_pages under the \a lock, we need
-         * to check if some of pages are covered by other ldlm lock.
-         * If this is the case, they aren't needed to be written out this time.
-         *
-         * For example, we have A:[0,200] & B:[100,300] PW locks on client, now
-         * the latter is to be canceled, this means other client is
-         * reading/writing [200,300] since A won't canceled. Actually
-         * we just need to write the pages covered by [200,300]. This is safe,
-         * since [100,200] is also protected lock A.
-         */
+        /* No fixup is needed for a WRITE lock because it is exclusive. */
+        if (lock->cll_descr.cld_mode >= CLM_WRITE)
+                RETURN_EXIT;
 
+        /* Pages that are still covered by other PR locks must not be
+         * discarded; otherwise a [0, EOF) PR lock would discard all
+         * pages.
+         */
         cl_page_list_init(plist);
         cl_page_list_for_each_safe(page, temp, queue) {
                 pgoff_t                idx = page->cp_index;
@@ -1978,8 +1975,10 @@ int cl_lock_page_out(const struct lu_env *env, struct cl_lock *lock,
         struct cl_io          *io    = &info->clt_io;
         struct cl_2queue      *queue = &info->clt_queue;
         struct cl_lock_descr  *descr = &lock->cll_descr;
+        struct lu_device_type *dtype;
         long page_count;
-        int nonblock = 1, resched;
+        pgoff_t next_index;
+        int res;
         int result;
 
         LINVRNT(cl_lock_invariant(env, lock));
@@ -1990,36 +1989,49 @@ int cl_lock_page_out(const struct lu_env *env, struct cl_lock *lock,
         if (result != 0)
                 GOTO(out, result);
 
+        dtype = descr->cld_obj->co_lu.lo_dev->ld_type;
+        next_index = descr->cld_start;
         do {
+                const struct cl_page_slice *slice;
+
                 cl_2queue_init(queue);
-                cl_page_gang_lookup(env, descr->cld_obj, io, descr->cld_start,
-                                    descr->cld_end, &queue->c2_qin, nonblock,
-                                    &resched);
+                res = cl_page_gang_lookup(env, descr->cld_obj, io,
+                                          next_index, descr->cld_end,
+                                          &queue->c2_qin);
                 page_count = queue->c2_qin.pl_nr;
-                if (page_count > 0) {
-                        result = cl_page_list_unmap(env, io, &queue->c2_qin);
-                        if (!discard) {
-                                long timeout = 600; /* 10 minutes. */
-                                /* for debug purpose, if this request can't be
-                                 * finished in 10 minutes, we hope it can
-                                 * notify us.
-                                 */
-                                result = cl_io_submit_sync(env, io, CRT_WRITE,
-                                                           queue, CRP_CANCEL,
-                                                           timeout);
-                                if (result)
-                                        CWARN("Writing %lu pages error: %d\n",
-                                              page_count, result);
-                        }
-                        cl_lock_page_list_fixup(env, io, lock, &queue->c2_qout);
-                        cl_2queue_discard(env, io, queue);
-                        cl_2queue_disown(env, io, queue);
+                if (page_count == 0)
+                        break;
+
+                /* cl_page_gang_lookup() uses the subobject and sublock to
+                 * look for covered pages, but @queue->c2_qin holds top
+                 * pages.  We have to turn a top page back into its subpage
+                 * to get the correct next index. -jay */
+                slice = cl_page_at(cl_page_list_last(&queue->c2_qin), dtype);
+                next_index = slice->cpl_page->cp_index + 1;
+
+                result = cl_page_list_unmap(env, io, &queue->c2_qin);
+                if (!discard) {
+                        long timeout = 600; /* 10 minutes. */
+                        /* for debug purpose, if this request can't be
+                         * finished in 10 minutes, we hope it can notify us.
+                         */
+                        result = cl_io_submit_sync(env, io, CRT_WRITE, queue,
+                                                   CRP_CANCEL, timeout);
+                        if (result)
+                                CWARN("Writing %lu pages error: %d\n",
+                                      page_count, result);
                 }
+                cl_lock_page_list_fixup(env, io, lock, &queue->c2_qout);
+                cl_2queue_discard(env, io, queue);
+                cl_2queue_disown(env, io, queue);
                 cl_2queue_fini(env, queue);
 
-                if (resched)
+                if (next_index > descr->cld_end)
+                        break;
+
+                if (res == CLP_GANG_RESCHED)
                         cfs_cond_resched();
-        } while (resched || nonblock--);
+        } while (res != CLP_GANG_OKAY);
 out:
         cl_io_fini(env, io);
         RETURN(result);
index 0a466e9..ff67ce0 100644 (file)
@@ -184,11 +184,12 @@ EXPORT_SYMBOL(cl_page_lookup);
  *
  * Gang tree lookup (radix_tree_gang_lookup()) optimization is absolutely
  * crucial in the face of [offset, EOF] locks.
+ *
+ * Returns at least one page in @queue unless there are no covered pages.
  */
-void cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
-                         struct cl_io *io, pgoff_t start, pgoff_t end,
-                         struct cl_page_list *queue, int nonblock,
-                         int *resched)
+int cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
+                        struct cl_io *io, pgoff_t start, pgoff_t end,
+                        struct cl_page_list *queue)
 {
         struct cl_object_header *hdr;
         struct cl_page          *page;
@@ -199,15 +200,10 @@ void cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
         unsigned int             nr;
         unsigned int             i;
         unsigned int             j;
-        int                    (*page_own)(const struct lu_env *env,
-                                           struct cl_io *io,
-                                           struct cl_page *pg);
+        int                      res = CLP_GANG_OKAY;
+        int                      tree_lock = 1;
         ENTRY;
 
-        if (resched != NULL)
-                *resched = 0;
-        page_own = nonblock ? cl_page_own_try : cl_page_own;
-
         idx = start;
         hdr = cl_object_header(obj);
         pvec = cl_env_info(env)->clt_pvec;
@@ -215,14 +211,17 @@ void cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
         cfs_spin_lock(&hdr->coh_page_guard);
         while ((nr = radix_tree_gang_lookup(&hdr->coh_tree, (void **)pvec,
                                             idx, CLT_PVEC_SIZE)) > 0) {
+                int end_of_region = 0;
                 idx = pvec[nr - 1]->cp_index + 1;
                 for (i = 0, j = 0; i < nr; ++i) {
                         page = pvec[i];
                         pvec[i] = NULL;
 
                         LASSERT(page->cp_type == CPT_CACHEABLE);
-                        if (page->cp_index > end)
+                        if (page->cp_index > end) {
+                                end_of_region = 1;
                                 break;
+                        }
                         if (page->cp_state == CPS_FREEING)
                                 continue;
 
@@ -256,24 +255,44 @@ void cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
                  * error in the latter case).
                  */
                 cfs_spin_unlock(&hdr->coh_page_guard);
+                tree_lock = 0;
+
                 for (i = 0; i < j; ++i) {
                         page = pvec[i];
-                        if (page_own(env, io, page) == 0)
-                                cl_page_list_add(queue, page);
+                        if (res == CLP_GANG_OKAY) {
+                                typeof(cl_page_own) *page_own;
+
+                                page_own = queue->pl_nr ?
+                                           cl_page_own_try : cl_page_own;
+                                if (page_own(env, io, page) == 0) {
+                                        cl_page_list_add(queue, page);
+                                } else if (page->cp_state != CPS_FREEING) {
+                                        /* cl_page_own() won't fail unless
+                                         * the page is being freed. */
+                                        LASSERT(queue->pl_nr != 0);
+                                        res = CLP_GANG_AGAIN;
+                                }
+                        }
                         lu_ref_del(&page->cp_reference,
                                    "page_list", cfs_current());
                         cl_page_put(env, page);
                 }
-                cfs_spin_lock(&hdr->coh_page_guard);
-                if (nr < CLT_PVEC_SIZE)
+                if (nr < CLT_PVEC_SIZE || end_of_region)
                         break;
-                if (resched != NULL && cfs_need_resched()) {
-                        *resched = 1;
+
+                /* returning RESCHED while owning zero pages would mislead
+                 * the caller into thinking that no pages remain. */
+                if (queue->pl_nr && cfs_need_resched())
+                        res = CLP_GANG_RESCHED;
+                if (res != CLP_GANG_OKAY)
                         break;
-                }
+
+                cfs_spin_lock(&hdr->coh_page_guard);
+                tree_lock = 1;
         }
-        cfs_spin_unlock(&hdr->coh_page_guard);
-        EXIT;
+        if (tree_lock)
+                cfs_spin_unlock(&hdr->coh_page_guard);
+        RETURN(res);
 }
 EXPORT_SYMBOL(cl_page_gang_lookup);
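
The tree_lock flag above implements a common scan shape: hold the
spinlock only while pulling one batch of pointers out of the radix tree,
drop it for the blocking own/put work, and retake it for the next batch.
A sketch of that shape as a fragment (pull_batch() and process() are
hypothetical stand-ins for radix_tree_gang_lookup() and the own/put
loop, and spin_lock()/spin_unlock() stand in for cfs_spin_lock()/
cfs_spin_unlock()):

    /* Scan a lock-protected index in batches; blocking work is only
     * allowed while the lock is dropped.  Exits with the lock released
     * on every path. */
    spin_lock(&guard);
    locked = 1;
    while ((nr = pull_batch(pvec, BATCH)) > 0) {    /* runs under the lock */
            spin_unlock(&guard);
            locked = 0;
            process(pvec, nr);                      /* may sleep */
            if (nr < BATCH)
                    break;                          /* short batch: range done */
            spin_lock(&guard);
            locked = 1;
    }
    if (locked)
            spin_unlock(&guard);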
 
@@ -960,7 +979,7 @@ static int cl_page_own0(const struct lu_env *env, struct cl_io *io,
         io = cl_io_top(io);
 
         if (pg->cp_state == CPS_FREEING) {
-                result = -EAGAIN;
+                result = -ENOENT;
         } else {
                 result = CL_PAGE_INVOKE(env, pg, CL_PAGE_OP(cpo_own),
                                         (const struct lu_env *,
@@ -977,7 +996,7 @@ static int cl_page_own0(const struct lu_env *env, struct cl_io *io,
                                 cl_page_state_set(env, pg, CPS_OWNED);
                         } else {
                                 cl_page_disown0(env, io, pg);
-                                result = -EAGAIN;
+                                result = -ENOENT;
                         }
                 }
         }
@@ -1465,7 +1484,6 @@ int cl_pages_prune(const struct lu_env *env, struct cl_object *clobj)
         struct cl_object        *obj = cl_object_top(clobj);
         struct cl_io            *io;
         struct cl_page_list     *plist;
-        int                      resched;
         int                      result;
 
         ENTRY;
@@ -1486,8 +1504,8 @@ int cl_pages_prune(const struct lu_env *env, struct cl_object *clobj)
 
         do {
                 cl_page_list_init(plist);
-                cl_page_gang_lookup(env, obj, io, 0, CL_PAGE_EOF, plist, 0,
-                                    &resched);
+                result = cl_page_gang_lookup(env, obj, io, 0, CL_PAGE_EOF,
+                                             plist);
                 /*
                  * Since we're purging the pages of an object, we don't care
                  * the possible outcomes of the following functions.
@@ -1497,9 +1515,9 @@ int cl_pages_prune(const struct lu_env *env, struct cl_object *clobj)
                 cl_page_list_disown(env, io, plist);
                 cl_page_list_fini(env, plist);
 
-                if (resched)
+                if (result == CLP_GANG_RESCHED)
                         cfs_cond_resched();
-        } while (resched);
+        } while (result != CLP_GANG_OKAY);
 
         cl_io_fini(env, io);
         RETURN(result);
index 4cae4a2..5a1fc4b 100644 (file)
@@ -411,7 +411,7 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
          * XXX this is quite expensive check.
          */
         cl_page_list_init(list);
-        cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF, list, 0);
+        cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF, list);
 
         cl_page_list_for_each(page, list)
                 CL_PAGE_DEBUG(D_ERROR, env, page, "exists %lu\n", start);
index 0b03ea7..6991275 100644 (file)
@@ -1313,7 +1313,7 @@ static void osc_lock_cancel(const struct lu_env *env,
         if (dlmlock != NULL) {
                 int do_cancel;
 
-                discard = dlmlock->l_flags & LDLM_FL_DISCARD_DATA;
+                discard = !!(dlmlock->l_flags & LDLM_FL_DISCARD_DATA);
                 result = osc_lock_flush(olck, discard);
                 osc_lock_unhold(olck);
 
@@ -1374,8 +1374,7 @@ static int osc_lock_has_pages(struct osc_lock *olck)
                 io->ci_obj = cl_object_top(obj);
                 cl_io_init(env, io, CIT_MISC, io->ci_obj);
                 cl_page_gang_lookup(env, obj, io,
-                                    descr->cld_start, descr->cld_end, plist, 0,
-                                    NULL);
+                                    descr->cld_start, descr->cld_end, plist);
                 cl_lock_page_list_fixup(env, io, lock, plist);
                 if (plist->pl_nr > 0) {
                         CL_LOCK_DEBUG(D_ERROR, env, lock, "still has pages\n");
index 28be07e..d10a372 100644 (file)
@@ -151,8 +151,11 @@ int osc_attr_set(const struct lu_env *env, struct cl_object *obj,
                 lvb->lvb_ctime = attr->cat_ctime;
         if (valid & CAT_BLOCKS)
                 lvb->lvb_blocks = attr->cat_blocks;
-        if (valid & CAT_KMS)
+        if (valid & CAT_KMS) {
+                CDEBUG(D_CACHE, "set kms from "LPU64" to "LPU64"\n",
+                       oinfo->loi_kms, (__u64)attr->cat_kms);
                 loi_kms_set(oinfo, attr->cat_kms);
+        }
         return 0;
 }
 
index d5e24e2..5dd2640 100644 (file)
@@ -588,7 +588,7 @@ static int osc_completion(const struct lu_env *env,
         if (rc == 0 && oap->oap_brw_flags & OBD_BRW_SRVLOCK) {
                 struct lu_device *ld    = opg->ops_cl.cpl_obj->co_lu.lo_dev;
                 struct osc_stats *stats = &lu2osc_dev(ld)->od_stats;
-                int bytes = opg->ops_to - opg->ops_from;
+                int bytes = oap->oap_count;
 
                 if (crt == CRT_READ)
                         stats->os_lockless_reads += bytes;
index b4abf57..964c5bc 100644 (file)
@@ -1349,11 +1349,17 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
         pg_prev = pga[0];
         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                 struct brw_page *pg = pga[i];
+                int poff = pg->off & ~CFS_PAGE_MASK;
 
                 LASSERT(pg->count > 0);
-                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
-                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
-                         pg->off, pg->count);
+                /* make sure there is no gap in the middle of the page array */
+                LASSERTF(page_count == 1 ||
+                         (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
+                          ergo(i > 0 && i < page_count - 1,
+                               poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
+                          ergo(i == page_count - 1, poff == 0)),
+                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
+                         i, page_count, pg, pg->off, pg->count);
 #ifdef __linux__
                 LASSERTF(i == 0 || pg->off > pg_prev->off,
                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
@@ -1369,8 +1375,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                         (pg->flag & OBD_BRW_SRVLOCK));
 
-                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
-                                      pg->count);
+                ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
                 requested_nob += pg->count;
 
                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
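
The strengthened assertion above states the invariant behind the whole
patch: within one RPC only the first fragment may start mid-page and
only the last may end mid-page; anything else hands the LND a
discontiguous KIOV. A hedged, self-contained check of the same condition
(struct frag mirrors the brw_page fields used here, and CFS_PAGE_SIZE is
assumed to be 4096):

    #include <stdbool.h>
    #include <stddef.h>

    struct frag {                      /* stand-in for struct brw_page */
            unsigned long long off;    /* byte offset within the object */
            unsigned int count;        /* bytes carried in this page */
    };

    /* True iff the fragments can form a single contiguous bulk: only
     * f[0] may begin mid-page, only f[n - 1] may end mid-page. */
    static bool kiov_contiguous(const struct frag *f, size_t n)
    {
            const unsigned int page_size = 4096;  /* assumed CFS_PAGE_SIZE */
            size_t i;

            for (i = 0; i < n; i++) {
                    unsigned int poff = f[i].off % page_size;

                    if (i > 0 && poff != 0)
                            return false;          /* gap before this page */
                    if (i < n - 1 && poff + f[i].count != page_size)
                            return false;          /* gap after this page */
            }
            return true;
    }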
@@ -2434,12 +2439,11 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
         struct osc_brw_async_args *aa;
         const struct obd_async_page_ops *ops;
         CFS_LIST_HEAD(rpc_list);
-        CFS_LIST_HEAD(tmp_list);
-        unsigned int ending_offset;
-        obd_off starting_offset = OBD_OBJECT_EOF;
-        int starting_page_off = 0;
         int srvlock = 0, mem_tight = 0;
         struct cl_object *clob = NULL;
+        obd_off starting_offset = OBD_OBJECT_EOF;
+        unsigned int ending_offset;
+        int starting_page_off = 0;
         ENTRY;
 
         /* ASYNC_HP pages first. At present, when the lock on the pages is
@@ -2447,14 +2451,10 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
          * with ASYNC_HP. We have to send them out as soon as possible. */
         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
                 if (oap->oap_async_flags & ASYNC_HP)
-                        cfs_list_move(&oap->oap_pending_item, &tmp_list);
-                else
-                        cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
+                        cfs_list_move(&oap->oap_pending_item, &lop->lop_pending);
                 if (++page_count >= cli->cl_max_pages_per_rpc)
                         break;
         }
-
-        cfs_list_splice(&tmp_list, &lop->lop_pending);
         page_count = 0;
 
         /* first we find the pages we're allowed to work with */
@@ -2584,20 +2584,25 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
 
                 /* now put the page back in our accounting */
                 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
+                if (page_count++ == 0) {
+                        srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
+                        starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
+                                          (PTLRPC_MAX_BRW_SIZE - 1);
+                }
+
                 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
                         mem_tight = 1;
-                if (page_count == 0)
-                        srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
-                if (++page_count >= cli->cl_max_pages_per_rpc)
-                        break;
 
                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                  * have the same alignment as the initial writes that allocated
                  * extents on the server. */
-                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
-                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
-                if (ending_offset == 0)
+                ending_offset = oap->oap_obj_off + oap->oap_page_off +
+                                oap->oap_count;
+                if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
+                        break;
+
+                if (page_count >= cli->cl_max_pages_per_rpc)
                         break;
 
                 /* If there is a gap at the end of this page, it can't merge
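
The rewritten loop above computes ending_offset before either break and
tests the PTLRPC_MAX_BRW_SIZE boundary with a bitmask, which is only
valid because the BRW size is a power of two. A minimal illustration of
the boundary test (1 MiB is assumed for PTLRPC_MAX_BRW_SIZE here):

    #define BRW_SIZE (1ULL << 20)       /* assumed PTLRPC_MAX_BRW_SIZE */

    /* An offset sits on a BRW boundary iff its low bits are all zero,
     * e.g. on_brw_boundary(2097152) != 0 but
     * on_brw_boundary(2097152 + 4096) == 0. */
    static int on_brw_boundary(unsigned long long ending_offset)
    {
            return (ending_offset & (BRW_SIZE - 1)) == 0;
    }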
@@ -2792,7 +2797,7 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
 
                         if (rc > 0)
                                 race_counter = 0;
-                        else
+                        else if (rc == 0)
                                 race_counter++;
                 }
                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
@@ -2803,7 +2808,7 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
 
                         if (rc > 0)
                                 race_counter = 0;
-                        else
+                        else if (rc == 0)
                                 race_counter++;
                 }
 
@@ -2889,8 +2894,9 @@ static int osc_enter_cache(const struct lu_env *env,
 
         /* force the caller to try sync io.  this can jump the list
          * of queued writes and create a discontiguous rpc stream */
-        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
-            loi->loi_ar.ar_force_sync)
+        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
+            cli->cl_dirty_max < CFS_PAGE_SIZE     ||
+            cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
                 RETURN(-EDQUOT);
 
         /* Hopefully normal case - cache space and write credits available */
index f337115..b1e0583 100644 (file)
@@ -8077,6 +8077,21 @@ test_218() {
 }
 run_test 218 "parallel read and truncate should not deadlock ======================="
 
+test_219() {
+        # write one partial page
+        dd if=/dev/zero of=$DIR/$tfile bs=1024 count=1
+        # set no grant so vvp_io_commit_write will do sync write
+        $LCTL set_param fail_loc=0x411
+        # write a full page at the end of file
+        dd if=/dev/zero of=$DIR/$tfile bs=4096 count=1 seek=1 conv=notrunc
+
+        $LCTL set_param fail_loc=0
+        dd if=/dev/zero of=$DIR/$tfile bs=4096 count=1 seek=3
+        $LCTL set_param fail_loc=0x411
+        dd if=/dev/zero of=$DIR/$tfile bs=1024 count=1 seek=2 conv=notrunc
+}
run_test 219 "LU-394: partial writes must not cause a discontiguous page vector at the LND"
+
 #
 # tests that do cleanup/setup should be run at the end
 #