/** \defgroup cl_page cl_page
* @{ */
-struct cl_page *cl_page_lookup(struct cl_object_header *hdr,
+enum {
+ CLP_GANG_OKAY = 0,
+ CLP_GANG_AGAIN,
+ CLP_GANG_RESCHED
+};
+
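+/*
+ * Return codes of cl_page_gang_lookup():
+ *
+ *   CLP_GANG_OKAY    - the whole [start, end] range has been scanned;
+ *   CLP_GANG_AGAIN   - stopped early because a page could not be owned
+ *                      without blocking; process the pages gathered so
+ *                      far and call again;
+ *   CLP_GANG_RESCHED - stopped early because a reschedule is needed;
+ *                      call cfs_cond_resched() and then call again.
+ *
+ * A minimal caller sketch (assumed usage; it mirrors the caller loops
+ * reworked elsewhere in this change):
+ *
+ *	do {
+ *		res = cl_page_gang_lookup(env, obj, io, start, end, plist);
+ *		(process and disown the pages collected in plist)
+ *		if (res == CLP_GANG_RESCHED)
+ *			cfs_cond_resched();
+ *	} while (res != CLP_GANG_OKAY);
+ */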
+int cl_page_gang_lookup (const struct lu_env *env,
+ struct cl_object *obj,
+ struct cl_io *io,
+ pgoff_t start, pgoff_t end,
+ struct cl_page_list *plist);
+struct cl_page *cl_page_lookup (struct cl_object_header *hdr,
pgoff_t index);
-void cl_page_gang_lookup(const struct lu_env *env,
- struct cl_object *obj,
- struct cl_io *io,
- pgoff_t start, pgoff_t end,
- struct cl_page_list *plist,
- int nonblock,
- int *resched);
struct cl_page *cl_page_find (const struct lu_env *env,
struct cl_object *obj,
pgoff_t idx, struct page *vmpage,
* @{ */
/**
+ * Return the last page in a page list; the list must not be empty.
+ */
+static inline struct cl_page *cl_page_list_last(struct cl_page_list *plist)
+{
+ LASSERT(plist->pl_nr > 0);
+ return cfs_list_entry(plist->pl_pages.prev, struct cl_page, cp_batch);
+}
+
+/**
* Iterate over pages in a page list.
*/
#define cl_page_list_for_each(page, list) \
#define OBD_FAIL_OSC_OBJECT_CONTENTION 0x40e
#define OBD_FAIL_OSC_CP_CANCEL_RACE 0x40f
#define OBD_FAIL_OSC_CP_ENQ_RACE 0x410
+#define OBD_FAIL_OSC_NO_GRANT 0x411
#define OBD_FAIL_PTLRPC 0x500
#define OBD_FAIL_PTLRPC_ACK 0x501
static int vvp_page_sync_io(const struct lu_env *env, struct cl_io *io,
struct cl_page *page, struct ccc_page *cp,
- int to, enum cl_req_type crt)
+ enum cl_req_type crt)
{
struct cl_2queue *queue;
int result;
LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
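+ /* Note: the page is submitted as-is; clipping it to the valid byte
+ * range, when needed, is now done by the caller before calling in
+ * (see the cl_page_clip() call added in the -EDQUOT write path). */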
queue = &io->ci_queue;
-
cl_2queue_init_page(queue, page);
- cl_page_clip(env, page, 0, to);
result = cl_io_submit_sync(env, io, crt, queue, CRP_NORMAL, 0);
LASSERT(cl_page_is_owned(page, io));
- cl_page_clip(env, page, 0, CFS_PAGE_SIZE);
if (crt == CRT_READ)
/*
} else if (cp->cpg_defer_uptodate)
cp->cpg_ra_used = 1;
else
- result = vvp_page_sync_io(env, io, pg, cp,
- CFS_PAGE_SIZE, CRT_READ);
+ result = vvp_page_sync_io(env, io, pg, cp, CRT_READ);
/*
* In older implementations, obdo_refresh_inode is called here
* to update the inode because the write might modify the
* it will not soon. */
vvp_write_pending(cl2ccc(obj), cp);
result = cl_page_cache_add(env, io, pg, CRT_WRITE);
- if (result == -EDQUOT)
+ if (result == -EDQUOT) {
+ pgoff_t last_index = i_size_read(inode) >> CFS_PAGE_SHIFT;
+ bool need_clip = true;
+
/*
* Client ran out of disk space grant. Possible
* strategies are:
* what the new code continues to do for the time
* being.
*/
- result = vvp_page_sync_io(env, io, pg, cp,
- to, CRT_WRITE);
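+ /* Widen the sync write so that a partial page is never sent from
+ * the middle of the file: if i_size already extends past this page,
+ * write the whole page; if this is the last page, extend the write
+ * up to i_size. This keeps the BRW page vector contiguous (see the
+ * assertion added on the OSC side and sanity test_219). */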
+ if (last_index > pg->cp_index) {
+ to = CFS_PAGE_SIZE;
+ need_clip = false;
+ } else if (last_index == pg->cp_index) {
+ int size_to = i_size_read(inode) & ~CFS_PAGE_MASK;
+ if (to < size_to)
+ to = size_to;
+ }
+ if (need_clip)
+ cl_page_clip(env, pg, 0, to);
+ result = vvp_page_sync_io(env, io, pg, cp, CRT_WRITE);
if (result)
CERROR("Write page %lu of inode %p failed %d\n",
pg->cp_index, inode, result);
+ }
} else {
tallyop = LPROC_LL_DIRTY_HITS;
result = 0;
cl_page_discard(env, io, pg);
}
ll_inode_size_unlock(inode, 0);
-
RETURN(result);
}
LINVRNT(cl_lock_invariant(env, lock));
ENTRY;
- /* Now, we have a list of cl_pages under the \a lock, we need
- * to check if some of pages are covered by other ldlm lock.
- * If this is the case, they aren't needed to be written out this time.
- *
- * For example, we have A:[0,200] & B:[100,300] PW locks on client, now
- * the latter is to be canceled, this means other client is
- * reading/writing [200,300] since A won't canceled. Actually
- * we just need to write the pages covered by [200,300]. This is safe,
- * since [100,200] is also protected lock A.
- */
+ /* No need to fix for WRITE lock because it is exclusive. */
+ if (lock->cll_descr.cld_mode >= CLM_WRITE)
+ RETURN_EXIT;
+ /* Do not discard pages that are still covered by other PR locks;
+ * otherwise cancelling a [0, EOF) PR lock would discard all pages
+ * of the object.
+ */
cl_page_list_init(plist);
cl_page_list_for_each_safe(page, temp, queue) {
pgoff_t idx = page->cp_index;
struct cl_io *io = &info->clt_io;
struct cl_2queue *queue = &info->clt_queue;
struct cl_lock_descr *descr = &lock->cll_descr;
+ struct lu_device_type *dtype;
long page_count;
- int nonblock = 1, resched;
+ pgoff_t next_index;
+ int res;
int result;
LINVRNT(cl_lock_invariant(env, lock));
if (result != 0)
GOTO(out, result);
+ dtype = descr->cld_obj->co_lu.lo_dev->ld_type;
+ next_index = descr->cld_start;
do {
+ const struct cl_page_slice *slice;
+
cl_2queue_init(queue);
- cl_page_gang_lookup(env, descr->cld_obj, io, descr->cld_start,
- descr->cld_end, &queue->c2_qin, nonblock,
- &resched);
+ res = cl_page_gang_lookup(env, descr->cld_obj, io,
+ next_index, descr->cld_end,
+ &queue->c2_qin);
page_count = queue->c2_qin.pl_nr;
- if (page_count > 0) {
- result = cl_page_list_unmap(env, io, &queue->c2_qin);
- if (!discard) {
- long timeout = 600; /* 10 minutes. */
- /* for debug purpose, if this request can't be
- * finished in 10 minutes, we hope it can
- * notify us.
- */
- result = cl_io_submit_sync(env, io, CRT_WRITE,
- queue, CRP_CANCEL,
- timeout);
- if (result)
- CWARN("Writing %lu pages error: %d\n",
- page_count, result);
- }
- cl_lock_page_list_fixup(env, io, lock, &queue->c2_qout);
- cl_2queue_discard(env, io, queue);
- cl_2queue_disown(env, io, queue);
+ if (page_count == 0)
+ break;
+
+ /* cl_page_gang_lookup() uses the sub-object and sub-lock to look
+ * for covered pages, but @queue->c2_qin contains top-level pages.
+ * Map the last top page back to its sub-page to get the correct
+ * next index. -jay */
+ slice = cl_page_at(cl_page_list_last(&queue->c2_qin), dtype);
+ next_index = slice->cpl_page->cp_index + 1;
+
+ result = cl_page_list_unmap(env, io, &queue->c2_qin);
+ if (!discard) {
+ long timeout = 600; /* 10 minutes. */
+ /* For debugging: if this request cannot finish within
+ * 10 minutes, we want to be notified.
+ */
+ result = cl_io_submit_sync(env, io, CRT_WRITE, queue,
+ CRP_CANCEL, timeout);
+ if (result)
+ CWARN("Writing %lu pages error: %d\n",
+ page_count, result);
}
+ cl_lock_page_list_fixup(env, io, lock, &queue->c2_qout);
+ cl_2queue_discard(env, io, queue);
+ cl_2queue_disown(env, io, queue);
cl_2queue_fini(env, queue);
- if (resched)
+ if (next_index > descr->cld_end)
+ break;
+
+ if (res == CLP_GANG_RESCHED)
cfs_cond_resched();
- } while (resched || nonblock--);
+ } while (res != CLP_GANG_OKAY);
out:
cl_io_fini(env, io);
RETURN(result);
*
* Gang tree lookup (radix_tree_gang_lookup()) optimization is absolutely
* crucial in the face of [offset, EOF] locks.
+ *
+ * Return at least one page in @queue unless there are no covered pages.
*/
-void cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
- struct cl_io *io, pgoff_t start, pgoff_t end,
- struct cl_page_list *queue, int nonblock,
- int *resched)
+int cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
+ struct cl_io *io, pgoff_t start, pgoff_t end,
+ struct cl_page_list *queue)
{
struct cl_object_header *hdr;
struct cl_page *page;
unsigned int nr;
unsigned int i;
unsigned int j;
- int (*page_own)(const struct lu_env *env,
- struct cl_io *io,
- struct cl_page *pg);
+ int res = CLP_GANG_OKAY;
+ int tree_lock = 1;
ENTRY;
- if (resched != NULL)
- *resched = 0;
- page_own = nonblock ? cl_page_own_try : cl_page_own;
-
idx = start;
hdr = cl_object_header(obj);
pvec = cl_env_info(env)->clt_pvec;
cfs_spin_lock(&hdr->coh_page_guard);
while ((nr = radix_tree_gang_lookup(&hdr->coh_tree, (void **)pvec,
idx, CLT_PVEC_SIZE)) > 0) {
+ int end_of_region = 0;
idx = pvec[nr - 1]->cp_index + 1;
for (i = 0, j = 0; i < nr; ++i) {
page = pvec[i];
pvec[i] = NULL;
LASSERT(page->cp_type == CPT_CACHEABLE);
- if (page->cp_index > end)
+ if (page->cp_index > end) {
+ end_of_region = 1;
break;
+ }
if (page->cp_state == CPS_FREEING)
continue;
* error in the latter case).
*/
cfs_spin_unlock(&hdr->coh_page_guard);
+ tree_lock = 0;
+
for (i = 0; i < j; ++i) {
page = pvec[i];
- if (page_own(env, io, page) == 0)
- cl_page_list_add(queue, page);
+ if (res == CLP_GANG_OKAY) {
+ typeof(cl_page_own) *page_own;
+
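+ /* Block for ownership of the first page so that at least
+ * one page is returned per call; once @queue is non-empty,
+ * use the non-blocking cl_page_own_try() and report
+ * CLP_GANG_AGAIN instead of blocking. */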
+ page_own = queue->pl_nr ?
+ cl_page_own_try : cl_page_own;
+ if (page_own(env, io, page) == 0) {
+ cl_page_list_add(queue, page);
+ } else if (page->cp_state != CPS_FREEING) {
+ /* cl_page_own() won't fail unless
+ * the page is being freed. */
+ LASSERT(queue->pl_nr != 0);
+ res = CLP_GANG_AGAIN;
+ }
+ }
lu_ref_del(&page->cp_reference,
"page_list", cfs_current());
cl_page_put(env, page);
}
- cfs_spin_lock(&hdr->coh_page_guard);
- if (nr < CLT_PVEC_SIZE)
+ if (nr < CLT_PVEC_SIZE || end_of_region)
break;
- if (resched != NULL && cfs_need_resched()) {
- *resched = 1;
+
+ /* Only return CLP_GANG_RESCHED when some pages have been
+ * collected; an empty queue would mislead the caller into
+ * thinking there are no pages left. */
+ if (queue->pl_nr && cfs_need_resched())
+ res = CLP_GANG_RESCHED;
+ if (res != CLP_GANG_OKAY)
break;
- }
+
+ cfs_spin_lock(&hdr->coh_page_guard);
+ tree_lock = 1;
}
- cfs_spin_unlock(&hdr->coh_page_guard);
- EXIT;
+ if (tree_lock)
+ cfs_spin_unlock(&hdr->coh_page_guard);
+ RETURN(res);
}
EXPORT_SYMBOL(cl_page_gang_lookup);
io = cl_io_top(io);
if (pg->cp_state == CPS_FREEING) {
- result = -EAGAIN;
+ result = -ENOENT;
} else {
result = CL_PAGE_INVOKE(env, pg, CL_PAGE_OP(cpo_own),
(const struct lu_env *,
cl_page_state_set(env, pg, CPS_OWNED);
} else {
cl_page_disown0(env, io, pg);
- result = -EAGAIN;
+ result = -ENOENT;
}
}
}
struct cl_object *obj = cl_object_top(clobj);
struct cl_io *io;
struct cl_page_list *plist;
- int resched;
int result;
ENTRY;
do {
cl_page_list_init(plist);
- cl_page_gang_lookup(env, obj, io, 0, CL_PAGE_EOF, plist, 0,
- &resched);
+ result = cl_page_gang_lookup(env, obj, io, 0, CL_PAGE_EOF,
+ plist);
/*
* Since we're purging the pages of an object, we don't care
* the possible outcomes of the following functions.
cl_page_list_disown(env, io, plist);
cl_page_list_fini(env, plist);
- if (resched)
+ if (result == CLP_GANG_RESCHED)
cfs_cond_resched();
- } while (resched);
+ } while (result != CLP_GANG_OKAY);
cl_io_fini(env, io);
RETURN(result);
* XXX this is quite expensive check.
*/
cl_page_list_init(list);
- cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF, list, 0);
+ cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF, list);
cl_page_list_for_each(page, list)
CL_PAGE_DEBUG(D_ERROR, env, page, "exists %lu\n", start);
if (dlmlock != NULL) {
int do_cancel;
- discard = dlmlock->l_flags & LDLM_FL_DISCARD_DATA;
+ discard = !!(dlmlock->l_flags & LDLM_FL_DISCARD_DATA);
result = osc_lock_flush(olck, discard);
osc_lock_unhold(olck);
io->ci_obj = cl_object_top(obj);
cl_io_init(env, io, CIT_MISC, io->ci_obj);
cl_page_gang_lookup(env, obj, io,
- descr->cld_start, descr->cld_end, plist, 0,
- NULL);
+ descr->cld_start, descr->cld_end, plist);
cl_lock_page_list_fixup(env, io, lock, plist);
if (plist->pl_nr > 0) {
CL_LOCK_DEBUG(D_ERROR, env, lock, "still has pages\n");
lvb->lvb_ctime = attr->cat_ctime;
if (valid & CAT_BLOCKS)
lvb->lvb_blocks = attr->cat_blocks;
- if (valid & CAT_KMS)
+ if (valid & CAT_KMS) {
+ CDEBUG(D_CACHE, "set kms from "LPU64" to "LPU64"\n",
+ oinfo->loi_kms, (__u64)attr->cat_kms);
loi_kms_set(oinfo, attr->cat_kms);
+ }
return 0;
}
if (rc == 0 && oap->oap_brw_flags & OBD_BRW_SRVLOCK) {
struct lu_device *ld = opg->ops_cl.cpl_obj->co_lu.lo_dev;
struct osc_stats *stats = &lu2osc_dev(ld)->od_stats;
- int bytes = opg->ops_to - opg->ops_from;
+ int bytes = oap->oap_count;
if (crt == CRT_READ)
stats->os_lockless_reads += bytes;
pg_prev = pga[0];
for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
struct brw_page *pg = pga[i];
+ int poff = pg->off & ~CFS_PAGE_MASK;
LASSERT(pg->count > 0);
- LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
- "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
- pg->off, pg->count);
+ /* make sure there is no gap in the middle of the page array */
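+ /* (the first page may start mid-page but must end on a page
+ * boundary, interior pages must be full, and the last page must
+ * start on a page boundary; a single-page RPC may be partial at
+ * both ends) */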
+ LASSERTF(page_count == 1 ||
+ (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
+ ergo(i > 0 && i < page_count - 1,
+ poff == 0 && pg->count == CFS_PAGE_SIZE) &&
+ ergo(i == page_count - 1, poff == 0)),
+ "i: %d/%d pg: %p off: "LPU64", count: %u\n",
+ i, page_count, pg, pg->off, pg->count);
#ifdef __linux__
LASSERTF(i == 0 || pg->off > pg_prev->off,
"i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
(pg->flag & OBD_BRW_SRVLOCK));
- ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
- pg->count);
+ ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
requested_nob += pg->count;
if (i > 0 && can_merge_pages(pg_prev, pg)) {
struct osc_brw_async_args *aa;
const struct obd_async_page_ops *ops;
CFS_LIST_HEAD(rpc_list);
- CFS_LIST_HEAD(tmp_list);
- unsigned int ending_offset;
- obd_off starting_offset = OBD_OBJECT_EOF;
- int starting_page_off = 0;
int srvlock = 0, mem_tight = 0;
struct cl_object *clob = NULL;
+ obd_off starting_offset = OBD_OBJECT_EOF;
+ unsigned int ending_offset;
+ int starting_page_off = 0;
ENTRY;
/* ASYNC_HP pages first. At present, when the lock covering the pages is
 * cancelled, the pages are marked ASYNC_HP and we have to send them out
 * as soon as possible. */
cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
if (oap->oap_async_flags & ASYNC_HP)
- cfs_list_move(&oap->oap_pending_item, &tmp_list);
- else
- cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
+ cfs_list_move(&oap->oap_pending_item, &lop->lop_pending);
if (++page_count >= cli->cl_max_pages_per_rpc)
break;
}
-
- cfs_list_splice(&tmp_list, &lop->lop_pending);
page_count = 0;
/* first we find the pages we're allowed to work with */
/* now put the page back in our accounting */
cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
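+ /* the first page added to the RPC determines the server-lock flag
+ * and the starting offset within the PTLRPC_MAX_BRW_SIZE window */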
+ if (page_count++ == 0) {
+ srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
+ starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
+ (PTLRPC_MAX_BRW_SIZE - 1);
+ }
+
if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
mem_tight = 1;
- if (page_count == 0)
- srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
- if (++page_count >= cli->cl_max_pages_per_rpc)
- break;
/* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
* RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
* have the same alignment as the initial writes that allocated
* extents on the server. */
- ending_offset = (oap->oap_obj_off + oap->oap_page_off +
- oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
- if (ending_offset == 0)
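+ /* stop once the RPC would end exactly on a PTLRPC_MAX_BRW_SIZE
+ * boundary; this is checked before the per-RPC page limit below */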
+ ending_offset = oap->oap_obj_off + oap->oap_page_off +
+ oap->oap_count;
+ if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
+ break;
+
+ if (page_count >= cli->cl_max_pages_per_rpc)
break;
/* If there is a gap at the end of this page, it can't merge
if (rc > 0)
race_counter = 0;
- else
+ else if (rc == 0)
race_counter++;
}
if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
if (rc > 0)
race_counter = 0;
- else
+ else if (rc == 0)
race_counter++;
}
/* force the caller to try sync io. this can jump the list
* of queued writes and create a discontiguous rpc stream */
- if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
- loi->loi_ar.ar_force_sync)
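+ /* OBD_FAIL_OSC_NO_GRANT (0x411) lets tests such as sanity test_219
+ * simulate grant exhaustion to force the sync write path */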
+ if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
+ cli->cl_dirty_max < CFS_PAGE_SIZE ||
+ cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
RETURN(-EDQUOT);
/* Hopefully normal case - cache space and write credits available */
}
run_test 218 "parallel read and truncate should not deadlock ======================="
+test_219() {
+ # write one partial page
+ dd if=/dev/zero of=$DIR/$tfile bs=1024 count=1
+ # set no grant so vvp_io_commit_write will do sync write
+ $LCTL set_param fail_loc=0x411
+ # write a full page at the end of the file
+ dd if=/dev/zero of=$DIR/$tfile bs=4096 count=1 seek=1 conv=notrunc
+
+ $LCTL set_param fail_loc=0
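+ # extend the file with a full page further out, leaving a hole before it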
+ dd if=/dev/zero of=$DIR/$tfile bs=4096 count=1 seek=3
+ $LCTL set_param fail_loc=0x411
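+ # with no grant, the partial write of an interior page is flushed as a
+ # sync write and must not create a gap in the BRW page vector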
+ dd if=/dev/zero of=$DIR/$tfile bs=1024 count=1 seek=2 conv=notrunc
+}
+run_test 219 "LU-394: partial write should not create a non-contiguous page vector at the LND"
+
#
# tests that do cleanup/setup should be run at the end
#