From: Jinshan Xiong
Date: Tue, 12 Jul 2011 17:55:19 +0000 (-0700)
Subject: LU-394: LND failure caused by discontiguous KIOV
X-Git-Tag: 2.0.66.0~1
X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=419016ac3e53e798453106ec04412a4843620916

LU-394: LND failure caused by discontiguous KIOV

This issue was introduced by bug 18881, where I moved the urgent pages
to the front of lop_pending to fix a deadlock. This patch reverts bug
18881 and uses a new solution instead: cl_page_gang_lookup() blocks
only on the first page. This also avoids deadlock, since we should
never take multiple page locks without a try method.

Change-Id: I5dce35e3929e4f79a350e56ddc9e752269db060e
Signed-off-by: Jinshan Xiong
Signed-off-by: Oleg Drokin
Reviewed-on: http://review.whamcloud.com/911
---
diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h index 76ee194..7398ca3 100644 --- a/lustre/include/cl_object.h +++ b/lustre/include/cl_object.h @@ -2654,15 +2654,19 @@ static inline int cl_object_same(struct cl_object *o0, struct cl_object *o1) /** \defgroup cl_page cl_page * @{ */ -struct cl_page *cl_page_lookup(struct cl_object_header *hdr, +enum { + CLP_GANG_OKAY = 0, + CLP_GANG_AGAIN, + CLP_GANG_RESCHED +}; + +int cl_page_gang_lookup (const struct lu_env *env, + struct cl_object *obj, + struct cl_io *io, + pgoff_t start, pgoff_t end, + struct cl_page_list *plist); +struct cl_page *cl_page_lookup (struct cl_object_header *hdr, pgoff_t index); -void cl_page_gang_lookup(const struct lu_env *env, - struct cl_object *obj, - struct cl_io *io, - pgoff_t start, pgoff_t end, - struct cl_page_list *plist, - int nonblock, - int *resched); struct cl_page *cl_page_find (const struct lu_env *env, struct cl_object *obj, pgoff_t idx, struct page *vmpage, @@ -2964,6 +2968,15 @@ do { \ * @{ */ /** + * Last page in the page list. + */ +static inline struct cl_page *cl_page_list_last(struct cl_page_list *plist) +{ + LASSERT(plist->pl_nr > 0); + return cfs_list_entry(plist->pl_pages.prev, struct cl_page, cp_batch); +} + +/** + * Iterate over pages in a page list. 
*/ #define cl_page_list_for_each(page, list) \ diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index 7fa0686..5cdebfd 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -326,6 +326,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type, #define OBD_FAIL_OSC_OBJECT_CONTENTION 0x40e #define OBD_FAIL_OSC_CP_CANCEL_RACE 0x40f #define OBD_FAIL_OSC_CP_ENQ_RACE 0x410 +#define OBD_FAIL_OSC_NO_GRANT 0x411 #define OBD_FAIL_PTLRPC 0x500 #define OBD_FAIL_PTLRPC_ACK 0x501 diff --git a/lustre/llite/vvp_io.c b/lustre/llite/vvp_io.c index 78e1da7..62b0072 100644 --- a/lustre/llite/vvp_io.c +++ b/lustre/llite/vvp_io.c @@ -807,7 +807,7 @@ static int vvp_io_read_page(const struct lu_env *env, static int vvp_page_sync_io(const struct lu_env *env, struct cl_io *io, struct cl_page *page, struct ccc_page *cp, - int to, enum cl_req_type crt) + enum cl_req_type crt) { struct cl_2queue *queue; int result; @@ -815,13 +815,10 @@ static int vvp_page_sync_io(const struct lu_env *env, struct cl_io *io, LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE); queue = &io->ci_queue; - cl_2queue_init_page(queue, page); - cl_page_clip(env, page, 0, to); result = cl_io_submit_sync(env, io, crt, queue, CRP_NORMAL, 0); LASSERT(cl_page_is_owned(page, io)); - cl_page_clip(env, page, 0, CFS_PAGE_SIZE); if (crt == CRT_READ) /* @@ -863,8 +860,7 @@ static int vvp_io_prepare_partial(const struct lu_env *env, struct cl_io *io, } else if (cp->cpg_defer_uptodate) cp->cpg_ra_used = 1; else - result = vvp_page_sync_io(env, io, pg, cp, - CFS_PAGE_SIZE, CRT_READ); + result = vvp_page_sync_io(env, io, pg, cp, CRT_READ); /* * In older implementations, obdo_refresh_inode is called here * to update the inode because the write might modify the @@ -968,7 +964,10 @@ static int vvp_io_commit_write(const struct lu_env *env, * it will not soon. */ vvp_write_pending(cl2ccc(obj), cp); result = cl_page_cache_add(env, io, pg, CRT_WRITE); - if (result == -EDQUOT) + if (result == -EDQUOT) { + pgoff_t last_index = i_size_read(inode) >> CFS_PAGE_SHIFT; + bool need_clip = true; + /* * Client ran out of disk space grant. Possible * strategies are: @@ -983,11 +982,21 @@ static int vvp_io_commit_write(const struct lu_env *env, * what the new code continues to do for the time * being. */ - result = vvp_page_sync_io(env, io, pg, cp, - to, CRT_WRITE); + if (last_index > pg->cp_index) { + to = CFS_PAGE_SIZE; + need_clip = false; + } else if (last_index == pg->cp_index) { + int size_to = i_size_read(inode) & ~CFS_PAGE_MASK; + if (to < size_to) + to = size_to; + } + if (need_clip) + cl_page_clip(env, pg, 0, to); + result = vvp_page_sync_io(env, io, pg, cp, CRT_WRITE); if (result) CERROR("Write page %lu of inode %p failed %d\n", pg->cp_index, inode, result); + } } else { tallyop = LPROC_LL_DIRTY_HITS; result = 0; @@ -1010,7 +1019,6 @@ static int vvp_io_commit_write(const struct lu_env *env, cl_page_discard(env, io, pg); } ll_inode_size_unlock(inode, 0); - RETURN(result); } diff --git a/lustre/obdclass/cl_lock.c b/lustre/obdclass/cl_lock.c index 21919b3..9c62e89 100644 --- a/lustre/obdclass/cl_lock.c +++ b/lustre/obdclass/cl_lock.c @@ -1902,17 +1902,14 @@ void cl_lock_page_list_fixup(const struct lu_env *env, LINVRNT(cl_lock_invariant(env, lock)); ENTRY; - /* Now, we have a list of cl_pages under the \a lock, we need - * to check if some of pages are covered by other ldlm lock. - * If this is the case, they aren't needed to be written out this time. 
- * - * For example, we have A:[0,200] & B:[100,300] PW locks on client, now - * the latter is to be canceled, this means other client is - * reading/writing [200,300] since A won't canceled. Actually - * we just need to write the pages covered by [200,300]. This is safe, - * since [100,200] is also protected lock A. - */ + /* No need to fix for WRITE lock because it is exclusive. */ + if (lock->cll_descr.cld_mode >= CLM_WRITE) + RETURN_EXIT; + /* For those pages who are still covered by other PR locks, we should + * not discard them otherwise a [0, EOF) PR lock will discard all + * pages. + */ cl_page_list_init(plist); cl_page_list_for_each_safe(page, temp, queue) { pgoff_t idx = page->cp_index; @@ -1978,8 +1975,10 @@ int cl_lock_page_out(const struct lu_env *env, struct cl_lock *lock, struct cl_io *io = &info->clt_io; struct cl_2queue *queue = &info->clt_queue; struct cl_lock_descr *descr = &lock->cll_descr; + struct lu_device_type *dtype; long page_count; - int nonblock = 1, resched; + pgoff_t next_index; + int res; int result; LINVRNT(cl_lock_invariant(env, lock)); @@ -1990,36 +1989,49 @@ int cl_lock_page_out(const struct lu_env *env, struct cl_lock *lock, if (result != 0) GOTO(out, result); + dtype = descr->cld_obj->co_lu.lo_dev->ld_type; + next_index = descr->cld_start; do { + const struct cl_page_slice *slice; + cl_2queue_init(queue); - cl_page_gang_lookup(env, descr->cld_obj, io, descr->cld_start, - descr->cld_end, &queue->c2_qin, nonblock, - &resched); + res = cl_page_gang_lookup(env, descr->cld_obj, io, + next_index, descr->cld_end, + &queue->c2_qin); page_count = queue->c2_qin.pl_nr; - if (page_count > 0) { - result = cl_page_list_unmap(env, io, &queue->c2_qin); - if (!discard) { - long timeout = 600; /* 10 minutes. */ - /* for debug purpose, if this request can't be - * finished in 10 minutes, we hope it can - * notify us. - */ - result = cl_io_submit_sync(env, io, CRT_WRITE, - queue, CRP_CANCEL, - timeout); - if (result) - CWARN("Writing %lu pages error: %d\n", - page_count, result); - } - cl_lock_page_list_fixup(env, io, lock, &queue->c2_qout); - cl_2queue_discard(env, io, queue); - cl_2queue_disown(env, io, queue); + if (page_count == 0) + break; + + /* cl_page_gang_lookup() uses subobj and sublock to look for + * covered pages, but @queue->c2_qin contains the list of top + * pages. We have to turn the page back to subpage so as to + * get `correct' next index. -jay */ + slice = cl_page_at(cl_page_list_last(&queue->c2_qin), dtype); + next_index = slice->cpl_page->cp_index + 1; + + result = cl_page_list_unmap(env, io, &queue->c2_qin); + if (!discard) { + long timeout = 600; /* 10 minutes. */ + /* for debug purpose, if this request can't be + * finished in 10 minutes, we hope it can notify us. 
+ */ + result = cl_io_submit_sync(env, io, CRT_WRITE, queue, + CRP_CANCEL, timeout); + if (result) + CWARN("Writing %lu pages error: %d\n", + page_count, result); } + cl_lock_page_list_fixup(env, io, lock, &queue->c2_qout); + cl_2queue_discard(env, io, queue); + cl_2queue_disown(env, io, queue); cl_2queue_fini(env, queue); - if (resched) + if (next_index > descr->cld_end) + break; + + if (res == CLP_GANG_RESCHED) cfs_cond_resched(); - } while (resched || nonblock--); + } while (res != CLP_GANG_OKAY); out: cl_io_fini(env, io); RETURN(result); diff --git a/lustre/obdclass/cl_page.c b/lustre/obdclass/cl_page.c index 0a466e9..ff67ce0 100644 --- a/lustre/obdclass/cl_page.c +++ b/lustre/obdclass/cl_page.c @@ -184,11 +184,12 @@ EXPORT_SYMBOL(cl_page_lookup); * * Gang tree lookup (radix_tree_gang_lookup()) optimization is absolutely * crucial in the face of [offset, EOF] locks. + * + * Return at least one page in @queue unless there is no covered page. */ -void cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj, - struct cl_io *io, pgoff_t start, pgoff_t end, - struct cl_page_list *queue, int nonblock, - int *resched) +int cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj, + struct cl_io *io, pgoff_t start, pgoff_t end, + struct cl_page_list *queue) { struct cl_object_header *hdr; struct cl_page *page; @@ -199,15 +200,10 @@ void cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj, unsigned int nr; unsigned int i; unsigned int j; - int (*page_own)(const struct lu_env *env, - struct cl_io *io, - struct cl_page *pg); + int res = CLP_GANG_OKAY; + int tree_lock = 1; ENTRY; - if (resched != NULL) - *resched = 0; - page_own = nonblock ? cl_page_own_try : cl_page_own; - idx = start; hdr = cl_object_header(obj); pvec = cl_env_info(env)->clt_pvec; @@ -215,14 +211,17 @@ void cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj, cfs_spin_lock(&hdr->coh_page_guard); while ((nr = radix_tree_gang_lookup(&hdr->coh_tree, (void **)pvec, idx, CLT_PVEC_SIZE)) > 0) { + int end_of_region = 0; idx = pvec[nr - 1]->cp_index + 1; for (i = 0, j = 0; i < nr; ++i) { page = pvec[i]; pvec[i] = NULL; LASSERT(page->cp_type == CPT_CACHEABLE); - if (page->cp_index > end) + if (page->cp_index > end) { + end_of_region = 1; break; + } if (page->cp_state == CPS_FREEING) continue; @@ -256,24 +255,44 @@ void cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj, * error in the latter case). */ cfs_spin_unlock(&hdr->coh_page_guard); + tree_lock = 0; + for (i = 0; i < j; ++i) { page = pvec[i]; - if (page_own(env, io, page) == 0) - cl_page_list_add(queue, page); + if (res == CLP_GANG_OKAY) { + typeof(cl_page_own) *page_own; + + page_own = queue->pl_nr ? + cl_page_own_try : cl_page_own; + if (page_own(env, io, page) == 0) { + cl_page_list_add(queue, page); + } else if (page->cp_state != CPS_FREEING) { + /* cl_page_own() won't fail unless + * the page is being freed. */ + LASSERT(queue->pl_nr != 0); + res = CLP_GANG_AGAIN; + } + } lu_ref_del(&page->cp_reference, "page_list", cfs_current()); cl_page_put(env, page); } - cfs_spin_lock(&hdr->coh_page_guard); - if (nr < CLT_PVEC_SIZE) + if (nr < CLT_PVEC_SIZE || end_of_region) break; - if (resched != NULL && cfs_need_resched()) { - *resched = 1; + + /* if the number of pages is zero, this will mislead the caller + * that there is no page any more. 
*/ + if (queue->pl_nr && cfs_need_resched()) + res = CLP_GANG_RESCHED; + if (res != CLP_GANG_OKAY) break; - } + + cfs_spin_lock(&hdr->coh_page_guard); + tree_lock = 1; } - cfs_spin_unlock(&hdr->coh_page_guard); - EXIT; + if (tree_lock) + cfs_spin_unlock(&hdr->coh_page_guard); + RETURN(res); } EXPORT_SYMBOL(cl_page_gang_lookup); @@ -960,7 +979,7 @@ static int cl_page_own0(const struct lu_env *env, struct cl_io *io, io = cl_io_top(io); if (pg->cp_state == CPS_FREEING) { - result = -EAGAIN; + result = -ENOENT; } else { result = CL_PAGE_INVOKE(env, pg, CL_PAGE_OP(cpo_own), (const struct lu_env *, @@ -977,7 +996,7 @@ static int cl_page_own0(const struct lu_env *env, struct cl_io *io, cl_page_state_set(env, pg, CPS_OWNED); } else { cl_page_disown0(env, io, pg); - result = -EAGAIN; + result = -ENOENT; } } } @@ -1465,7 +1484,6 @@ int cl_pages_prune(const struct lu_env *env, struct cl_object *clobj) struct cl_object *obj = cl_object_top(clobj); struct cl_io *io; struct cl_page_list *plist; - int resched; int result; ENTRY; @@ -1486,8 +1504,8 @@ int cl_pages_prune(const struct lu_env *env, struct cl_object *clobj) do { cl_page_list_init(plist); - cl_page_gang_lookup(env, obj, io, 0, CL_PAGE_EOF, plist, 0, - &resched); + result = cl_page_gang_lookup(env, obj, io, 0, CL_PAGE_EOF, + plist); /* * Since we're purging the pages of an object, we don't care * the possible outcomes of the following functions. @@ -1497,9 +1515,9 @@ int cl_pages_prune(const struct lu_env *env, struct cl_object *clobj) cl_page_list_disown(env, io, plist); cl_page_list_fini(env, plist); - if (resched) + if (result == CLP_GANG_RESCHED) cfs_cond_resched(); - } while (resched); + } while (result != CLP_GANG_OKAY); cl_io_fini(env, io); RETURN(result); diff --git a/lustre/osc/osc_io.c b/lustre/osc/osc_io.c index 4cae4a2..5a1fc4b 100644 --- a/lustre/osc/osc_io.c +++ b/lustre/osc/osc_io.c @@ -411,7 +411,7 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io, * XXX this is quite expensive check. 
*/ cl_page_list_init(list); - cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF, list, 0); + cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF, list); cl_page_list_for_each(page, list) CL_PAGE_DEBUG(D_ERROR, env, page, "exists %lu\n", start); diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c index 0b03ea7..6991275 100644 --- a/lustre/osc/osc_lock.c +++ b/lustre/osc/osc_lock.c @@ -1313,7 +1313,7 @@ static void osc_lock_cancel(const struct lu_env *env, if (dlmlock != NULL) { int do_cancel; - discard = dlmlock->l_flags & LDLM_FL_DISCARD_DATA; + discard = !!(dlmlock->l_flags & LDLM_FL_DISCARD_DATA); result = osc_lock_flush(olck, discard); osc_lock_unhold(olck); @@ -1374,8 +1374,7 @@ static int osc_lock_has_pages(struct osc_lock *olck) io->ci_obj = cl_object_top(obj); cl_io_init(env, io, CIT_MISC, io->ci_obj); cl_page_gang_lookup(env, obj, io, - descr->cld_start, descr->cld_end, plist, 0, - NULL); + descr->cld_start, descr->cld_end, plist); cl_lock_page_list_fixup(env, io, lock, plist); if (plist->pl_nr > 0) { CL_LOCK_DEBUG(D_ERROR, env, lock, "still has pages\n"); diff --git a/lustre/osc/osc_object.c b/lustre/osc/osc_object.c index 28be07e..d10a372 100644 --- a/lustre/osc/osc_object.c +++ b/lustre/osc/osc_object.c @@ -151,8 +151,11 @@ int osc_attr_set(const struct lu_env *env, struct cl_object *obj, lvb->lvb_ctime = attr->cat_ctime; if (valid & CAT_BLOCKS) lvb->lvb_blocks = attr->cat_blocks; - if (valid & CAT_KMS) + if (valid & CAT_KMS) { + CDEBUG(D_CACHE, "set kms from "LPU64"to "LPU64"\n", + oinfo->loi_kms, (__u64)attr->cat_kms); loi_kms_set(oinfo, attr->cat_kms); + } return 0; } diff --git a/lustre/osc/osc_page.c b/lustre/osc/osc_page.c index d5e24e2..5dd2640 100644 --- a/lustre/osc/osc_page.c +++ b/lustre/osc/osc_page.c @@ -588,7 +588,7 @@ static int osc_completion(const struct lu_env *env, if (rc == 0 && oap->oap_brw_flags & OBD_BRW_SRVLOCK) { struct lu_device *ld = opg->ops_cl.cpl_obj->co_lu.lo_dev; struct osc_stats *stats = &lu2osc_dev(ld)->od_stats; - int bytes = opg->ops_to - opg->ops_from; + int bytes = oap->oap_count; if (crt == CRT_READ) stats->os_lockless_reads += bytes; diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index b4abf57..964c5bc 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -1349,11 +1349,17 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, pg_prev = pga[0]; for (requested_nob = i = 0; i < page_count; i++, niobuf++) { struct brw_page *pg = pga[i]; + int poff = pg->off & ~CFS_PAGE_MASK; LASSERT(pg->count > 0); - LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE, - "i: %d pg: %p off: "LPU64", count: %u\n", i, pg, - pg->off, pg->count); + /* make sure there is no gap in the middle of page array */ + LASSERTF(page_count == 1 || + (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) && + ergo(i > 0 && i < page_count - 1, + poff == 0 && pg->count == CFS_PAGE_SIZE) && + ergo(i == page_count - 1, poff == 0)), + "i: %d/%d pg: %p off: "LPU64", count: %u\n", + i, page_count, pg, pg->off, pg->count); #ifdef __linux__ LASSERTF(i == 0 || pg->off > pg_prev->off, "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64 @@ -1369,8 +1375,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) == (pg->flag & OBD_BRW_SRVLOCK)); - ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK, - pg->count); + ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count); requested_nob += pg->count; if (i > 0 && 
can_merge_pages(pg_prev, pg)) { @@ -2434,12 +2439,11 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, struct osc_brw_async_args *aa; const struct obd_async_page_ops *ops; CFS_LIST_HEAD(rpc_list); - CFS_LIST_HEAD(tmp_list); - unsigned int ending_offset; - obd_off starting_offset = OBD_OBJECT_EOF; - int starting_page_off = 0; int srvlock = 0, mem_tight = 0; struct cl_object *clob = NULL; + obd_off starting_offset = OBD_OBJECT_EOF; + unsigned int ending_offset; + int starting_page_off = 0; ENTRY; /* ASYNC_HP pages first. At present, when the lock the pages is @@ -2447,14 +2451,10 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, * with ASYNC_HP. We have to send out them as soon as possible. */ cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) { if (oap->oap_async_flags & ASYNC_HP) - cfs_list_move(&oap->oap_pending_item, &tmp_list); - else - cfs_list_move_tail(&oap->oap_pending_item, &tmp_list); + cfs_list_move(&oap->oap_pending_item, &lop->lop_pending); if (++page_count >= cli->cl_max_pages_per_rpc) break; } - - cfs_list_splice(&tmp_list, &lop->lop_pending); page_count = 0; /* first we find the pages we're allowed to work with */ @@ -2584,20 +2584,25 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, /* now put the page back in our accounting */ cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list); + if (page_count++ == 0) { + srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK); + starting_offset = (oap->oap_obj_off+oap->oap_page_off) & + (PTLRPC_MAX_BRW_SIZE - 1); + } + if (oap->oap_brw_flags & OBD_BRW_MEMALLOC) mem_tight = 1; - if (page_count == 0) - srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK); - if (++page_count >= cli->cl_max_pages_per_rpc) - break; /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads * have the same alignment as the initial writes that allocated * extents on the server. */ - ending_offset = (oap->oap_obj_off + oap->oap_page_off + - oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1); - if (ending_offset == 0) + ending_offset = oap->oap_obj_off + oap->oap_page_off + + oap->oap_count; + if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1))) + break; + + if (page_count >= cli->cl_max_pages_per_rpc) break; /* If there is a gap at the end of this page, it can't merge @@ -2792,7 +2797,7 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) if (rc > 0) race_counter = 0; - else + else if (rc == 0) race_counter++; } if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) { @@ -2803,7 +2808,7 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) if (rc > 0) race_counter = 0; - else + else if (rc == 0) race_counter++; } @@ -2889,8 +2894,9 @@ static int osc_enter_cache(const struct lu_env *env, /* force the caller to try sync io. 
this can jump the list * of queued writes and create a discontiguous rpc stream */ - if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync || - loi->loi_ar.ar_force_sync) + if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) || + cli->cl_dirty_max < CFS_PAGE_SIZE || + cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync) RETURN(-EDQUOT); /* Hopefully normal case - cache space and write credits available */ diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index f337115..b1e0583 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -8077,6 +8077,21 @@ test_218() { } run_test 218 "parallel read and truncate should not deadlock =======================" +test_219() { + # write one partial page + dd if=/dev/zero of=$DIR/$tfile bs=1024 count=1 + # set no grant so vvp_io_commit_write will do sync write + $LCTL set_param fail_loc=0x411 + # write a full page at the end of file + dd if=/dev/zero of=$DIR/$tfile bs=4096 count=1 seek=1 conv=notrunc + + $LCTL set_param fail_loc=0 + dd if=/dev/zero of=$DIR/$tfile bs=4096 count=1 seek=3 + $LCTL set_param fail_loc=0x411 + dd if=/dev/zero of=$DIR/$tfile bs=1024 count=1 seek=2 conv=notrunc +} +run_test 219 "LU-394: Write partial won't cause uncontiguous pages vec at LND" + # # tests that do cleanup/setup should be run at the end #
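
Illustrative note (not part of the patch): under the new contract, cl_page_gang_lookup() blocks only while taking ownership of the first page it adds to the queue and uses cl_page_own_try() for the rest, returning CLP_GANG_AGAIN when a try-lock fails and CLP_GANG_RESCHED when the caller should yield the CPU before retrying. The minimal sketch below, modeled on the cl_pages_prune() loop in this patch, shows how a caller is expected to drive the interface; the function name purge_covered_pages() is hypothetical and the env/obj/io arguments are assumed to be set up as cl_pages_prune() does.

/* Hypothetical caller of the new interface -- a minimal sketch only. */
static void purge_covered_pages(const struct lu_env *env, struct cl_object *obj,
                                struct cl_io *io, pgoff_t start, pgoff_t end,
                                struct cl_page_list *plist)
{
        int res;

        do {
                cl_page_list_init(plist);
                /* gathers owned pages in [start, end]; blocks on the first
                 * page only and try-locks the rest */
                res = cl_page_gang_lookup(env, obj, io, start, end, plist);

                /* ... discard the gathered pages, as cl_pages_prune() does,
                 * so they are not found again on the next pass ... */

                cl_page_list_disown(env, io, plist);
                cl_page_list_fini(env, plist);

                if (res == CLP_GANG_RESCHED)
                        cfs_cond_resched();
        } while (res != CLP_GANG_OKAY);
}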
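Illustrative note (not part of the patch): the strengthened LASSERTF in osc_brw_prep_request() spells out why a partial page in the middle of a bulk transfer breaks the LND. Within one BRW page array, only the first page may start at a non-zero in-page offset (and it must then extend to the end of its page), every middle page must be a full page, and the last page must start at offset 0, so that the resulting KIOV is contiguous. The helper below restates that ergo() chain in plain C; the function name is hypothetical, while struct brw_page, CFS_PAGE_SIZE and CFS_PAGE_MASK are the existing Lustre definitions.

/* Hypothetical restatement of the contiguity invariant checked by the
 * new LASSERTF in osc_brw_prep_request() -- a sketch, not patch code. */
static int brw_page_array_is_contiguous(struct brw_page **pga, int page_count)
{
        int i;

        if (page_count == 1)
                return 1;               /* a single page may be partial */

        for (i = 0; i < page_count; i++) {
                int poff = pga[i]->off & ~CFS_PAGE_MASK;

                if (i == 0 && poff + pga[i]->count != CFS_PAGE_SIZE)
                        return 0;       /* first page must reach its page end */
                if (i > 0 && i < page_count - 1 &&
                    (poff != 0 || pga[i]->count != CFS_PAGE_SIZE))
                        return 0;       /* middle pages must be full pages */
                if (i == page_count - 1 && poff != 0)
                        return 0;       /* last page must start at offset 0 */
        }
        return 1;
}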