From: Jinshan Xiong Date: Tue, 12 Jun 2012 00:03:56 +0000 (-0700) Subject: LU-1030 osc: new IO engine implementation X-Git-Tag: 2.2.57~7 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=9fe4b52ad2ffadf125d9b5c78bb2ff9a01725707 LU-1030 osc: new IO engine implementation New IO engine to manage dirty pages with osc_extent. Osc_extent is a data structure to manage a series of contiguous blocks; however, the pages in an extent are not required to be contiguous. An extent must be written out in one RPC. The purposes of introducing extents are: 1. make grants work for extent-based OSD; 2. form better IO by picking up contiguous pages to compose an RPC; 3. reimplement ll_writepages() with CIT_FSYNC. Signed-off-by: Jinshan Xiong Change-Id: I3ef619c1f07eefd201236ab55e5fd858791d41e0 Reviewed-on: http://review.whamcloud.com/2270 Reviewed-by: Johann Lombardi Tested-by: Hudson Tested-by: Maloo Reviewed-by: Andreas Dilger --- diff --git a/libcfs/include/libcfs/linux/libcfs.h b/libcfs/include/libcfs/linux/libcfs.h index a6c8edd..3635a7f 100644 --- a/libcfs/include/libcfs/linux/libcfs.h +++ b/libcfs/include/libcfs/linux/libcfs.h @@ -62,6 +62,7 @@ #include #include #include /* THREAD_SIZE */ +#include #define CFS_THREAD_SIZE THREAD_SIZE #define LUSTRE_TRACE_SIZE (THREAD_SIZE >> 5) diff --git a/libcfs/include/libcfs/posix/libcfs.h b/libcfs/include/libcfs/posix/libcfs.h index 57a6239..17d37ce 100644 --- a/libcfs/include/libcfs/posix/libcfs.h +++ b/libcfs/include/libcfs/posix/libcfs.h @@ -415,6 +415,78 @@ static inline void radix_tree_preload_end(void) { } +/*************************************************************************** + * + * Linux kernel red black tree emulation. + * + ***************************************************************************/ +struct rb_node { + unsigned long rb_parent_color; +#define RB_RED 0 +#define RB_BLACK 1 + struct rb_node *rb_right; + struct rb_node *rb_left; +}; + +struct rb_root { + struct rb_node *rb_node; +}; + + +#define rb_parent(r) ((struct rb_node *)((r)->rb_parent_color & ~3)) +#define rb_color(r) ((r)->rb_parent_color & 1) +#define rb_is_red(r) (!rb_color(r)) +#define rb_is_black(r) rb_color(r) +#define rb_set_red(r) do { (r)->rb_parent_color &= ~1; } while (0) +#define rb_set_black(r) do { (r)->rb_parent_color |= 1; } while (0) + +static inline void rb_set_parent(struct rb_node *rb, struct rb_node *p) +{ + rb->rb_parent_color = (rb->rb_parent_color & 3) | (unsigned long)p; +} +static inline void rb_set_color(struct rb_node *rb, int color) +{ + rb->rb_parent_color = (rb->rb_parent_color & ~1) | color; +} + +#define RB_ROOT ((struct rb_root) { NULL, }) +#define rb_entry(ptr, type, member) container_of(ptr, type, member) + +#define RB_EMPTY_ROOT(root) ((root)->rb_node == NULL) +#define RB_EMPTY_NODE(node) (rb_parent(node) == node) +#define RB_CLEAR_NODE(node) (rb_set_parent(node, node)) + +static inline void rb_init_node(struct rb_node *rb) +{ + rb->rb_parent_color = 0; + rb->rb_right = NULL; + rb->rb_left = NULL; + RB_CLEAR_NODE(rb); +} + +extern void rb_insert_color(struct rb_node *, struct rb_root *); +extern void rb_erase(struct rb_node *, struct rb_root *); + +/* Find logical next and previous nodes in a tree */ +extern struct rb_node *rb_next(const struct rb_node *); +extern struct rb_node *rb_prev(const struct rb_node *); +extern struct rb_node *rb_first(const struct rb_root *); +extern struct rb_node *rb_last(const struct rb_root *); +static inline void rb_link_node(struct rb_node *node, struct rb_node *parent, + struct 
rb_node **rb_link) +{ + node->rb_parent_color = (unsigned long)parent; + node->rb_left = node->rb_right = NULL; + + *rb_link = node; +} + +/*************************************************************************** + * + * End of Linux kernel red black tree emulation. + * + ***************************************************************************/ + typedef ssize_t (*read_actor_t)(); #define CFS_IFSHIFT 12 diff --git a/libcfs/libcfs/autoMakefile.am b/libcfs/libcfs/autoMakefile.am index 7ce8133..3b143e7 100644 --- a/libcfs/libcfs/autoMakefile.am +++ b/libcfs/libcfs/autoMakefile.am @@ -44,7 +44,8 @@ if LIBLUSTRE noinst_LIBRARIES= libcfs.a libcfs_a_SOURCES= posix/posix-debug.c user-prim.c user-lock.c user-tcpip.c \ prng.c user-bitops.c user-mem.c hash.c kernel_user_comm.c \ - workitem.c fail.c libcfs_cpu.c libcfs_mem.c libcfs_lock.c + workitem.c fail.c libcfs_cpu.c libcfs_mem.c libcfs_lock.c \ + posix/rbtree.c libcfs_a_CPPFLAGS = $(LLCPPFLAGS) libcfs_a_CFLAGS = $(LLCFLAGS) endif @@ -70,7 +71,7 @@ nodist_libcfs_SOURCES := darwin/darwin-sync.c darwin/darwin-mem.c \ darwin/darwin-debug.c darwin/darwin-proc.c \ darwin/darwin-tracefile.c darwin/darwin-module.c \ posix/posix-debug.c module.c tracefile.c nidstrings.c watchdog.c \ - kernel_user_comm.c hash.c + kernel_user_comm.c hash.c posix/rbtree.c libcfs_CFLAGS := $(EXTRA_KCFLAGS) libcfs_LDFLAGS := $(EXTRA_KLDFLAGS) diff --git a/libcfs/libcfs/posix/rbtree.c b/libcfs/libcfs/posix/rbtree.c new file mode 100644 index 0000000..9d0e5bb --- /dev/null +++ b/libcfs/libcfs/posix/rbtree.c @@ -0,0 +1,333 @@ +/* + Red Black Trees + (C) 1999 Andrea Arcangeli + (C) 2002 David Woodhouse + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. 
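For readers who have not used the kernel rbtree interface that the posix emulation above reproduces, a minimal insertion sketch follows. It assumes only the declarations from the posix libcfs.h hunk; the sample_extent structure, the se_start key and the sample_insert() helper are hypothetical names invented for illustration and are not part of this patch.

struct sample_extent {
        struct rb_node  se_node;   /* linkage into an rb_root */
        unsigned long   se_start;  /* sort key */
};

/* Insert @ext keyed by se_start; the caller serializes access to @root. */
static int sample_insert(struct rb_root *root, struct sample_extent *ext)
{
        struct rb_node **link = &root->rb_node;
        struct rb_node  *parent = NULL;

        while (*link != NULL) {
                struct sample_extent *cur;

                parent = *link;
                cur = rb_entry(parent, struct sample_extent, se_node);
                if (ext->se_start < cur->se_start)
                        link = &parent->rb_left;
                else if (ext->se_start > cur->se_start)
                        link = &parent->rb_right;
                else
                        return -EEXIST;         /* duplicate key */
        }
        rb_link_node(&ext->se_node, parent, link);  /* attach as a leaf */
        rb_insert_color(&ext->se_node, root);       /* rebalance and recolor */
        return 0;
}

The new osc_extent code presumably relies on the same link-then-rebalance pattern to keep extents ordered by offset within an object, which is why the emulation is needed for the liblustre/posix build.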
+ + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + linux/lib/rbtree.c +*/ + +#include + +static void __rb_rotate_left(struct rb_node *node, struct rb_root *root) +{ + struct rb_node *right = node->rb_right; + struct rb_node *parent = rb_parent(node); + + node->rb_right = right->rb_left; + if (node->rb_right != NULL) + rb_set_parent(right->rb_left, node); + right->rb_left = node; + + rb_set_parent(right, parent); + + if (parent) { + if (node == parent->rb_left) + parent->rb_left = right; + else + parent->rb_right = right; + } else + root->rb_node = right; + rb_set_parent(node, right); +} + +static void __rb_rotate_right(struct rb_node *node, struct rb_root *root) +{ + struct rb_node *left = node->rb_left; + struct rb_node *parent = rb_parent(node); + + node->rb_left = left->rb_right; + if (node->rb_left != NULL) + rb_set_parent(left->rb_right, node); + left->rb_right = node; + + rb_set_parent(left, parent); + + if (parent) { + if (node == parent->rb_right) + parent->rb_right = left; + else + parent->rb_left = left; + } else + root->rb_node = left; + rb_set_parent(node, left); +} + +void rb_insert_color(struct rb_node *node, struct rb_root *root) +{ + struct rb_node *parent, *gparent; + + while ((parent = rb_parent(node)) != NULL && rb_is_red(parent)) { + gparent = rb_parent(parent); + + if (parent == gparent->rb_left) { + register struct rb_node *uncle = gparent->rb_right; + if (uncle && rb_is_red(uncle)) { + rb_set_black(uncle); + rb_set_black(parent); + rb_set_red(gparent); + node = gparent; + continue; + } + + if (parent->rb_right == node) { + register struct rb_node *tmp; + __rb_rotate_left(parent, root); + tmp = parent; + parent = node; + node = tmp; + } + + rb_set_black(parent); + rb_set_red(gparent); + __rb_rotate_right(gparent, root); + } else { + register struct rb_node *uncle = gparent->rb_left; + if (uncle && rb_is_red(uncle)) { + rb_set_black(uncle); + rb_set_black(parent); + rb_set_red(gparent); + node = gparent; + continue; + } + + if (parent->rb_left == node) { + register struct rb_node *tmp; + __rb_rotate_right(parent, root); + tmp = parent; + parent = node; + node = tmp; + } + + rb_set_black(parent); + rb_set_red(gparent); + __rb_rotate_left(gparent, root); + } + } + + rb_set_black(root->rb_node); +} + +static void __rb_erase_color(struct rb_node *node, struct rb_node *parent, + struct rb_root *root) +{ + struct rb_node *ptr; + + while ((!node || rb_is_black(node)) && node != root->rb_node) { + if (parent->rb_left == node) { + ptr = parent->rb_right; + if (rb_is_red(ptr)) { + rb_set_black(ptr); + rb_set_red(parent); + __rb_rotate_left(parent, root); + ptr = parent->rb_right; + } + if ((!ptr->rb_left || rb_is_black(ptr->rb_left)) && + (!ptr->rb_right || rb_is_black(ptr->rb_right))) { + rb_set_red(ptr); + node = parent; + parent = rb_parent(node); + } else { + if (!ptr->rb_right || + rb_is_black(ptr->rb_right)) { + rb_set_black(ptr->rb_left); + rb_set_red(ptr); + __rb_rotate_right(ptr, root); + ptr = parent->rb_right; + } + rb_set_color(ptr, rb_color(parent)); + rb_set_black(parent); + rb_set_black(ptr->rb_right); + __rb_rotate_left(parent, root); + node = root->rb_node; + break; + } + } else { + ptr = parent->rb_left; + if (rb_is_red(ptr)) { + rb_set_black(ptr); + rb_set_red(parent); + __rb_rotate_right(parent, root); + ptr = parent->rb_left; + } + if ((!ptr->rb_left || rb_is_black(ptr->rb_left)) && + (!ptr->rb_right 
|| rb_is_black(ptr->rb_right))) { + rb_set_red(ptr); + node = parent; + parent = rb_parent(node); + } else { + if (!ptr->rb_left || + rb_is_black(ptr->rb_left)) { + rb_set_black(ptr->rb_right); + rb_set_red(ptr); + __rb_rotate_left(ptr, root); + ptr = parent->rb_left; + } + rb_set_color(ptr, rb_color(parent)); + rb_set_black(parent); + rb_set_black(ptr->rb_left); + __rb_rotate_right(parent, root); + node = root->rb_node; + break; + } + } + } + if (node) + rb_set_black(node); +} + +void rb_erase(struct rb_node *node, struct rb_root *root) +{ + struct rb_node *child, *parent; + int color; + + if (!node->rb_left) + child = node->rb_right; + else if (!node->rb_right) + child = node->rb_left; + else { + struct rb_node *old = node, *left; + + node = node->rb_right; + while ((left = node->rb_left) != NULL) + node = left; + + if (rb_parent(old)) { + if (rb_parent(old)->rb_left == old) + rb_parent(old)->rb_left = node; + else + rb_parent(old)->rb_right = node; + } else + root->rb_node = node; + + child = node->rb_right; + parent = rb_parent(node); + color = rb_color(node); + + if (parent == old) { + parent = node; + } else { + if (child) + rb_set_parent(child, parent); + parent->rb_left = child; + + node->rb_right = old->rb_right; + rb_set_parent(old->rb_right, node); + } + + node->rb_parent_color = old->rb_parent_color; + node->rb_left = old->rb_left; + rb_set_parent(old->rb_left, node); + + goto color; + } + + parent = rb_parent(node); + color = rb_color(node); + + if (child) + rb_set_parent(child, parent); + if (parent) { + if (parent->rb_left == node) + parent->rb_left = child; + else + parent->rb_right = child; + } else + root->rb_node = child; + + color: + if (color == RB_BLACK) + __rb_erase_color(child, parent, root); +} + +/* + * This function returns the first node (in sort order) of the tree. + */ +struct rb_node *rb_first(const struct rb_root *root) +{ + struct rb_node *n; + + n = root->rb_node; + if (!n) + return NULL; + while (n->rb_left) + n = n->rb_left; + return n; +} + +struct rb_node *rb_last(const struct rb_root *root) +{ + struct rb_node *n; + + n = root->rb_node; + if (!n) + return NULL; + while (n->rb_right) + n = n->rb_right; + return n; +} + +struct rb_node *rb_next(const struct rb_node *node) +{ + struct rb_node *parent; + + if (rb_parent(node) == node) + return NULL; + + /* If we have a right-hand child, go down and then left as far + as we can. */ + if (node->rb_right) { + node = node->rb_right; + while (node->rb_left) + node = node->rb_left; + return (struct rb_node *)node; + } + + /* No right-hand children. Everything down and left is + smaller than us, so any 'next' node must be in the general + direction of our parent. Go up the tree; any time the + ancestor is a right-hand child of its parent, keep going + up. First time it's a left-hand child of its parent, said + parent is our 'next' node. */ + while ((parent = rb_parent(node)) && node == parent->rb_right) + node = parent; + + return parent; +} + +struct rb_node *rb_prev(const struct rb_node *node) +{ + struct rb_node *parent; + + if (rb_parent(node) == node) + return NULL; + + /* If we have a left-hand child, go down and then right as far + as we can. */ + if (node->rb_left) { + node = node->rb_left; + while (node->rb_right) + node = node->rb_right; + return (struct rb_node *)node; + } + + /* No left-hand children. 
Go up till we find an ancestor which + is a right-hand child of its parent */ + while ((parent = rb_parent(node)) && node == parent->rb_left) + node = parent; + + return parent; +} diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h index 1acd1ba..53bb5ca 100644 --- a/lustre/include/cl_object.h +++ b/lustre/include/cl_object.h @@ -1054,6 +1054,15 @@ struct cl_page_operations { */ int (*cpo_cancel)(const struct lu_env *env, const struct cl_page_slice *slice); + /** + * Write out a page by kernel. This is only called by ll_writepage + * right now. + * + * \see cl_page_flush() + */ + int (*cpo_flush)(const struct lu_env *env, + const struct cl_page_slice *slice, + struct cl_io *io); /** @} transfer */ }; @@ -1960,11 +1969,6 @@ enum cl_io_state { CIS_FINI }; -enum cl_req_priority { - CRP_NORMAL, - CRP_CANCEL -}; - /** * IO state private for a layer. * @@ -2082,8 +2086,7 @@ struct cl_io_operations { int (*cio_submit)(const struct lu_env *env, const struct cl_io_slice *slice, enum cl_req_type crt, - struct cl_2queue *queue, - enum cl_req_priority priority); + struct cl_2queue *queue); } req_op[CRT_NR]; /** * Read missing page. @@ -2245,6 +2248,18 @@ enum cl_io_lock_dmd { CILR_PEEK }; +enum cl_fsync_mode { + /** start writeback, do not wait for them to finish */ + CL_FSYNC_NONE = 0, + /** start writeback and wait for them to finish */ + CL_FSYNC_LOCAL = 1, + /** discard all of dirty pages in a specific file range */ + CL_FSYNC_DISCARD = 2, + /** start writeback and make sure they have reached storage before + * return. OST_SYNC RPC must be issued and finished */ + CL_FSYNC_ALL = 3 +}; + struct cl_io_rw_common { loff_t crw_pos; size_t crw_count; @@ -2291,6 +2306,7 @@ struct cl_io { struct cl_wr_io { struct cl_io_rw_common wr; int wr_append; + int wr_sync; } ci_wr; struct cl_io_rw_common ci_rw; struct cl_setattr_io { @@ -2318,6 +2334,9 @@ struct cl_io { struct obd_capa *fi_capa; /** file system level fid */ struct lu_fid *fi_fid; + enum cl_fsync_mode fi_mode; + /* how many pages were written/discarded */ + unsigned int fi_nr_written; } ci_fsync; } u; struct cl_2queue ci_queue; @@ -2769,6 +2788,8 @@ int cl_page_cache_add (const struct lu_env *env, struct cl_io *io, void cl_page_clip (const struct lu_env *env, struct cl_page *pg, int from, int to); int cl_page_cancel (const struct lu_env *env, struct cl_page *page); +int cl_page_flush (const struct lu_env *env, struct cl_io *io, + struct cl_page *pg); /** @} transfer */ @@ -2815,9 +2836,19 @@ struct cl_lock *cl_lock_peek(const struct lu_env *env, const struct cl_io *io, struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io, const struct cl_lock_descr *need, const char *scope, const void *source); -struct cl_lock *cl_lock_at_page(const struct lu_env *env, struct cl_object *obj, - struct cl_page *page, struct cl_lock *except, - int pending, int canceld); +struct cl_lock *cl_lock_at_pgoff(const struct lu_env *env, + struct cl_object *obj, pgoff_t index, + struct cl_lock *except, int pending, + int canceld); +static inline struct cl_lock *cl_lock_at_page(const struct lu_env *env, + struct cl_object *obj, + struct cl_page *page, + struct cl_lock *except, + int pending, int canceld) +{ + return cl_lock_at_pgoff(env, obj, page->cp_index, except, + pending, canceld); +} const struct cl_lock_slice *cl_lock_at(const struct cl_lock *lock, const struct lu_device_type *dtype); @@ -2899,8 +2930,7 @@ int cl_lock_mutex_try (const struct lu_env *env, struct cl_lock *lock); void cl_lock_mutex_put (const struct lu_env *env, struct 
cl_lock *lock); int cl_lock_is_mutexed (struct cl_lock *lock); int cl_lock_nr_mutexed (const struct lu_env *env); -int cl_lock_page_out (const struct lu_env *env, struct cl_lock *lock, - int discard); +int cl_lock_discard_pages(const struct lu_env *env, struct cl_lock *lock); int cl_lock_ext_match (const struct cl_lock_descr *has, const struct cl_lock_descr *need); int cl_lock_descr_match(const struct cl_lock_descr *has, @@ -2958,11 +2988,10 @@ int cl_io_prepare_write(const struct lu_env *env, struct cl_io *io, int cl_io_commit_write (const struct lu_env *env, struct cl_io *io, struct cl_page *page, unsigned from, unsigned to); int cl_io_submit_rw (const struct lu_env *env, struct cl_io *io, - enum cl_req_type iot, struct cl_2queue *queue, - enum cl_req_priority priority); + enum cl_req_type iot, struct cl_2queue *queue); int cl_io_submit_sync (const struct lu_env *env, struct cl_io *io, - enum cl_req_type iot, struct cl_2queue *queue, - enum cl_req_priority priority, long timeout); + enum cl_req_type iot, struct cl_2queue *queue, + long timeout); void cl_io_rw_advance (const struct lu_env *env, struct cl_io *io, size_t nob); int cl_io_cancel (const struct lu_env *env, struct cl_io *io, @@ -2977,6 +3006,16 @@ static inline int cl_io_is_append(const struct cl_io *io) return io->ci_type == CIT_WRITE && io->u.ci_wr.wr_append; } +static inline int cl_io_is_sync_write(const struct cl_io *io) +{ + return io->ci_type == CIT_WRITE && io->u.ci_wr.wr_sync; +} + +static inline int cl_io_is_mkwrite(const struct cl_io *io) +{ + return io->ci_type == CIT_FAULT && io->u.ci_fault.ft_mkwrite; +} + /** * True, iff \a io is a truncate(2). */ diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h index 6b78f62..5c8024f 100644 --- a/lustre/include/linux/obd.h +++ b/lustre/include/linux/obd.h @@ -54,6 +54,7 @@ # include #endif +#define CLIENT_OBD_LIST_LOCK_DEBUG 1 typedef struct { cfs_spinlock_t lock; diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 0ab359f..ff325fb 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -244,7 +244,7 @@ union ptlrpc_async_args { * least big enough for that. */ void *pointer_arg[11]; - __u64 space[6]; + __u64 space[7]; }; struct ptlrpc_request_set; diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 61e6649..07ef9af 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -453,10 +453,22 @@ struct client_obd { long cl_dirty_transit; /* dirty synchronous */ long cl_avail_grant; /* bytes of credit for ost */ long cl_lost_grant; /* lost credits (trunc) */ - cfs_list_t cl_cache_waiters; /* waiting for cache/grant */ - cfs_time_t cl_next_shrink_grant; /* jiffies */ - cfs_list_t cl_grant_shrink_list; /* Timeout event list */ - int cl_grant_shrink_interval; /* seconds */ + + /* since we allocate grant by blocks, we don't know how many grant will + * be used to add a page into cache. As a solution, we reserve maximum + * grant before trying to dirty a page and unreserve the rest. + * See osc_{reserve|unreserve}_grant for details. */ + long cl_reserved_grant; + cfs_list_t cl_cache_waiters; /* waiting for cache/grant */ + cfs_time_t cl_next_shrink_grant; /* jiffies */ + cfs_list_t cl_grant_shrink_list; /* Timeout event list */ + int cl_grant_shrink_interval; /* seconds */ + + /* A chunk is an optimal size used by osc_extent to determine + * the extent size. 
A chunk is max(CFS_PAGE_SIZE, OST block size) */ + int cl_chunkbits; + int cl_chunk; + int cl_extent_tax; /* extent overhead, by bytes */ /* keep track of objects that have lois that contain pages which * have been queued for async brw. this lock also protects the @@ -478,7 +490,7 @@ struct client_obd { * * NB by Jinshan: though field names are still _loi_, but actually * osc_object{}s are in the list. - */ + */ client_obd_lock_t cl_loi_list_lock; cfs_list_t cl_loi_ready_list; cfs_list_t cl_loi_hp_ready_list; @@ -487,9 +499,9 @@ struct client_obd { int cl_r_in_flight; int cl_w_in_flight; /* just a sum of the loi/lop pending numbers to be exported by /proc */ - int cl_pending_w_pages; - int cl_pending_r_pages; - int cl_max_pages_per_rpc; + cfs_atomic_t cl_pending_w_pages; + cfs_atomic_t cl_pending_r_pages; + int cl_max_pages_per_rpc; int cl_max_rpcs_in_flight; struct obd_histogram cl_read_rpc_hist; struct obd_histogram cl_write_rpc_hist; diff --git a/lustre/include/obd_ost.h b/lustre/include/obd_ost.h index 17d9c03..761d780 100644 --- a/lustre/include/obd_ost.h +++ b/lustre/include/obd_ost.h @@ -54,6 +54,7 @@ struct osc_brw_async_args { struct brw_page **aa_ppga; struct client_obd *aa_cli; cfs_list_t aa_oaps; + cfs_list_t aa_exts; struct obd_capa *aa_ocapa; struct cl_req *aa_clerq; }; diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 213e94d..31d55f4 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -313,6 +313,8 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg) CFS_INIT_LIST_HEAD(&cli->cl_loi_write_list); CFS_INIT_LIST_HEAD(&cli->cl_loi_read_list); client_obd_list_lock_init(&cli->cl_loi_list_lock); + cfs_atomic_set(&cli->cl_pending_w_pages, 0); + cfs_atomic_set(&cli->cl_pending_r_pages, 0); cli->cl_r_in_flight = 0; cli->cl_w_in_flight = 0; diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index 3fc1345..c484953 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -460,7 +460,6 @@ static void ldlm_res_hop_get_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode) res = cfs_hlist_entry(hnode, struct ldlm_resource, lr_hash); ldlm_resource_getref(res); - LDLM_RESOURCE_ADDREF(res); } static void ldlm_res_hop_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode) @@ -469,7 +468,6 @@ static void ldlm_res_hop_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode) res = cfs_hlist_entry(hnode, struct ldlm_resource, lr_hash); /* cfs_hash_for_each_nolock is the only chance we call it */ - LDLM_RESOURCE_DELREF(res); ldlm_resource_putref_locked(res); } @@ -478,7 +476,6 @@ static void ldlm_res_hop_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode) struct ldlm_resource *res; res = cfs_hlist_entry(hnode, struct ldlm_resource, lr_hash); - LDLM_RESOURCE_DELREF(res); ldlm_resource_putref(res); } diff --git a/lustre/liblustre/llite_cl.c b/lustre/liblustre/llite_cl.c index d6fbf12..d6fc898 100644 --- a/lustre/liblustre/llite_cl.c +++ b/lustre/liblustre/llite_cl.c @@ -558,7 +558,7 @@ static int llu_queue_pio(const struct lu_env *env, struct cl_io *io, if (rc == 0) { enum cl_req_type iot; iot = io->ci_type == CIT_READ ? 
CRT_READ : CRT_WRITE; - rc = cl_io_submit_sync(env, io, iot, queue, CRP_NORMAL, 0); + rc = cl_io_submit_sync(env, io, iot, queue, 0); } group->lig_rc = rc; diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 109d3b9..cc6214c 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -809,8 +809,10 @@ void ll_io_init(struct cl_io *io, const struct file *file, int write) struct inode *inode = file->f_dentry->d_inode; io->u.ci_rw.crw_nonblock = file->f_flags & O_NONBLOCK; - if (write) - io->u.ci_wr.wr_append = !!(file->f_flags & O_APPEND); + if (write) { + io->u.ci_wr.wr_append = !!(file->f_flags & O_APPEND); + io->u.ci_wr.wr_sync = file->f_flags & O_SYNC || IS_SYNC(inode); + } io->ci_obj = ll_i2info(inode)->lli_clob; io->ci_lockreq = CILR_MAYBE; if (ll_file_nolock(file)) { @@ -1985,8 +1987,11 @@ int ll_flush(struct file *file) /** * Called to make sure a portion of file has been written out. * if @local_only is not true, it will send OST_SYNC RPCs to ost. + * + * Return how many pages have been written. */ -int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end) +int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end, + enum cl_fsync_mode mode) { struct cl_env_nest nest; struct lu_env *env; @@ -1996,6 +2001,10 @@ int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end) int result; ENTRY; + if (mode != CL_FSYNC_NONE && mode != CL_FSYNC_LOCAL && + mode != CL_FSYNC_DISCARD && mode != CL_FSYNC_ALL) + RETURN(-EINVAL); + env = cl_env_nested_get(&nest); if (IS_ERR(env)) RETURN(PTR_ERR(env)); @@ -2011,11 +2020,15 @@ int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end) fio->fi_start = start; fio->fi_end = end; fio->fi_fid = ll_inode2fid(inode); + fio->fi_mode = mode; + fio->fi_nr_written = 0; if (cl_io_init(env, io, CIT_FSYNC, io->ci_obj) == 0) result = cl_io_loop(env, io); else result = io->ci_result; + if (result == 0) + result = fio->fi_nr_written; cl_io_fini(env, io); cl_env_nested_put(&nest, env); @@ -2071,8 +2084,9 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data) ptlrpc_req_finished(req); if (data && lsm) { - err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF); - if (!rc) + err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF, + CL_FSYNC_ALL); + if (rc == 0 && err < 0) rc = err; lli->lli_write_rc = rc < 0 ? 
rc : 0; } diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 2ecdf02..cdf30ed 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -679,6 +679,7 @@ struct dentry *ll_splice_alias(struct inode *inode, struct dentry *de); int ll_prepare_write(struct file *, struct page *, unsigned from, unsigned to); int ll_commit_write(struct file *, struct page *, unsigned from, unsigned to); int ll_writepage(struct page *page, struct writeback_control *wbc); +int ll_writepages(struct address_space *, struct writeback_control *wbc); void ll_removepage(struct page *page); int ll_readpage(struct file *file, struct page *page); void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras); @@ -1407,7 +1408,8 @@ static inline int cl_merge_lvb(struct inode *inode) struct obd_capa *cl_capa_lookup(struct inode *inode, enum cl_req_type crt); -int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end); +int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end, + enum cl_fsync_mode mode); /** direct write pages */ struct ll_dio_pages { diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 78ba6bb..050a9a3 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -1839,13 +1839,19 @@ void ll_read_inode2(struct inode *inode, void *opaque) void ll_delete_inode(struct inode *inode) { - struct ll_sb_info *sbi = ll_i2sbi(inode); - int rc; - ENTRY; - - rc = obd_fid_delete(sbi->ll_md_exp, ll_inode2fid(inode)); - if (rc) - CERROR("fid_delete() failed, rc %d\n", rc); + struct cl_inode_info *lli = cl_i2info(inode); + struct ll_sb_info *sbi = ll_i2sbi(inode); + int rc; + ENTRY; + + rc = obd_fid_delete(sbi->ll_md_exp, ll_inode2fid(inode)); + if (rc) + CERROR("fid_delete() failed, rc %d\n", rc); + + if (S_ISREG(inode->i_mode) && lli->lli_clob != NULL) + /* discard all dirty pages before truncating them, required by + * osc_extent implementation at LU-1030. */ + cl_sync_file_range(inode, 0, OBD_OBJECT_EOF, CL_FSYNC_DISCARD); truncate_inode_pages(&inode->i_data, 0); diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index e0909f5..db4d451 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -141,7 +141,21 @@ static struct ll_cl_context *ll_cl_init(struct file *file, cio = ccc_env_io(env); io = cio->cui_cl.cis_io; if (io == NULL && create) { - loff_t pos; + struct inode *inode = vmpage->mapping->host; + loff_t pos; + + if (TRYLOCK_INODE_MUTEX(inode)) { + UNLOCK_INODE_MUTEX(inode); + + /* this is too bad. Someone is trying to write the + * page w/o holding inode mutex. 
This means we can + * add dirty pages into cache during truncate */ + CERROR("Proc %s is dirting page w/o inode lock, this" + "will break truncate.\n", cfs_current()->comm); + libcfs_debug_dumpstack(NULL); + LBUG(); + return ERR_PTR(-EIO); + } /* * Loop-back driver calls ->prepare_write() and ->sendfile() @@ -1152,8 +1166,8 @@ int ll_writepage(struct page *vmpage, struct writeback_control *wbc) struct cl_io *io; struct cl_page *page; struct cl_object *clob; - struct cl_2queue *queue; struct cl_env_nest nest; + int redirtied = 0; int result; ENTRY; @@ -1167,7 +1181,6 @@ int ll_writepage(struct page *vmpage, struct writeback_control *wbc) if (IS_ERR(env)) RETURN(PTR_ERR(env)); - queue = &vvp_env_info(env)->vti_queue; clob = ll_i2info(inode)->lli_clob; LASSERT(clob != NULL); @@ -1181,41 +1194,90 @@ int ll_writepage(struct page *vmpage, struct writeback_control *wbc) lu_ref_add(&page->cp_reference, "writepage", cfs_current()); cl_page_assume(env, io, page); - /* - * Mark page dirty, because this is what - * ->vio_submit()->cpo_prep_write() assumes. - * - * XXX better solution is to detect this from within - * cl_io_submit_rw() somehow. - */ - set_page_dirty(vmpage); - cl_2queue_init_page(queue, page); - result = cl_io_submit_rw(env, io, CRT_WRITE, - queue, CRP_NORMAL); - if (result != 0) { - /* - * Re-dirty page on error so it retries write, - * but not in case when IO has actually - * occurred and completed with an error. - */ - if (!PageError(vmpage)) { - redirty_page_for_writepage(wbc, vmpage); - result = 0; - } - } - cl_page_list_disown(env, io, &queue->c2_qin); - LASSERT(!cl_page_is_owned(page, io)); + result = cl_page_flush(env, io, page); + if (result != 0) { + /* + * Re-dirty page on error so it retries write, + * but not in case when IO has actually + * occurred and completed with an error. + */ + if (!PageError(vmpage)) { + redirty_page_for_writepage(wbc, vmpage); + result = 0; + redirtied = 1; + } + } + cl_page_disown(env, io, page); lu_ref_del(&page->cp_reference, "writepage", cfs_current()); cl_page_put(env, page); - cl_2queue_fini(env, queue); } } cl_io_fini(env, io); + + if (redirtied && wbc->sync_mode == WB_SYNC_ALL) { + loff_t offset = cl_offset(clob, vmpage->index); + + /* Flush page failed because the extent is being written out. + * Wait for the write of extent to be finished to avoid + * breaking kernel which assumes ->writepage should mark + * PageWriteback or clean the page. */ + result = cl_sync_file_range(inode, offset, + offset + CFS_PAGE_SIZE - 1, + CL_FSYNC_LOCAL); + if (result > 0) { + /* actually we may have written more than one page. + * decreasing this page because the caller will count + * it. 
*/ + wbc->nr_to_write -= result - 1; + result = 0; + } + } + cl_env_nested_put(&nest, env); RETURN(result); } +int ll_writepages(struct address_space *mapping, struct writeback_control *wbc) +{ + struct inode *inode = mapping->host; + loff_t start; + loff_t end; + enum cl_fsync_mode mode; + int range_whole = 0; + int result; + ENTRY; + + if (wbc->range_cyclic) { + start = mapping->writeback_index << CFS_PAGE_SHIFT; + end = OBD_OBJECT_EOF; + } else { + start = wbc->range_start; + end = wbc->range_end; + if (end == LLONG_MAX) { + end = OBD_OBJECT_EOF; + range_whole = start == 0; + } + } + + mode = CL_FSYNC_NONE; + if (wbc->sync_mode == WB_SYNC_ALL) + mode = CL_FSYNC_LOCAL; + + result = cl_sync_file_range(inode, start, end, mode); + if (result > 0) { + wbc->nr_to_write -= result; + result = 0; + } + + if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0)) { + if (end == OBD_OBJECT_EOF) + end = i_size_read(inode); + mapping->writeback_index = (end >> CFS_PAGE_SHIFT) + 1; + } + RETURN(result); +} + int ll_readpage(struct file *file, struct page *vmpage) { struct ll_cl_context *lcc; diff --git a/lustre/llite/rw26.c b/lustre/llite/rw26.c index 661ae5b..6335b7f 100644 --- a/lustre/llite/rw26.c +++ b/lustre/llite/rw26.c @@ -332,7 +332,7 @@ ssize_t ll_direct_rw_pages(const struct lu_env *env, struct cl_io *io, if (rc == 0 && io_pages) { rc = cl_io_submit_sync(env, io, rw == READ ? CRT_READ : CRT_WRITE, - queue, CRP_NORMAL, 0); + queue, 0); } if (rc == 0) rc = pv->ldp_size; @@ -553,7 +553,7 @@ struct address_space_operations ll_aops = { // .readpages = ll_readpages, .direct_IO = ll_direct_IO_26, .writepage = ll_writepage, - .writepages = generic_writepages, + .writepages = ll_writepages, .set_page_dirty = ll_set_page_dirty, #ifdef HAVE_KERNEL_WRITE_BEGIN_END .write_begin = ll_write_begin, @@ -575,7 +575,7 @@ struct address_space_operations_ext ll_aops = { // .orig_aops.readpages = ll_readpages, .orig_aops.direct_IO = ll_direct_IO_26, .orig_aops.writepage = ll_writepage, - .orig_aops.writepages = generic_writepages, + .orig_aops.writepages = ll_writepages, .orig_aops.set_page_dirty = ll_set_page_dirty, .orig_aops.prepare_write = ll_prepare_write, .orig_aops.commit_write = ll_commit_write, diff --git a/lustre/llite/vvp_io.c b/lustre/llite/vvp_io.c index 4a65b9e..f534184 100644 --- a/lustre/llite/vvp_io.c +++ b/lustre/llite/vvp_io.c @@ -36,6 +36,7 @@ * Implementation of cl_io for VVP layer. * * Author: Nikita Danilov + * Author: Jinshan Xiong */ #define DEBUG_SUBSYSTEM S_LLITE @@ -347,44 +348,8 @@ static int vvp_io_setattr_trunc(const struct lu_env *env, const struct cl_io_slice *ios, struct inode *inode, loff_t size) { - struct vvp_io *vio = cl2vvp_io(env, ios); - struct cl_io *io = ios->cis_io; - struct cl_object *obj = ios->cis_obj; - pgoff_t start = cl_index(obj, size); - int result; - - DOWN_WRITE_I_ALLOC_SEM(inode); - - result = vvp_do_vmtruncate(inode, size); - - /* - * If a page is partially truncated, keep it owned across truncate to - * prevent... races. - * - * XXX this properly belongs to osc, because races in question are OST - * specific. 
- */ - if (cl_offset(obj, start) != size) { - struct cl_object_header *hdr; - - hdr = cl_object_header(obj); - cfs_spin_lock(&hdr->coh_page_guard); - vio->cui_partpage = cl_page_lookup(hdr, start); - cfs_spin_unlock(&hdr->coh_page_guard); - - if (vio->cui_partpage != NULL) - /* - * Wait for the transfer completion for a partially - * truncated page to avoid dead-locking an OST with - * the concurrent page-wise overlapping WRITE and - * PUNCH requests. BUG:17397. - * - * Partial page is disowned in vvp_io_trunc_end(). - */ - cl_page_own(env, io, vio->cui_partpage); - } else - vio->cui_partpage = NULL; - return result; + DOWN_WRITE_I_ALLOC_SEM(inode); + return 0; } static int vvp_io_setattr_time(const struct lu_env *env, @@ -434,23 +399,15 @@ static int vvp_io_setattr_start(const struct lu_env *env, static void vvp_io_setattr_end(const struct lu_env *env, const struct cl_io_slice *ios) { - struct vvp_io *vio = cl2vvp_io(env, ios); struct cl_io *io = ios->cis_io; struct inode *inode = ccc_object_inode(io->ci_obj); if (!cl_io_is_trunc(io)) return; - if (vio->cui_partpage != NULL) { - cl_page_disown(env, ios->cis_io, vio->cui_partpage); - cl_page_put(env, vio->cui_partpage); - vio->cui_partpage = NULL; - } - /* - * Do vmtruncate again, to remove possible stale pages populated by - * competing read threads. bz20645. - */ - vvp_do_vmtruncate(inode, io->u.ci_setattr.sa_attr.lvb_size); + /* Truncate in memory pages - they must be clean pages because osc + * has already notified to destroy osc_extents. */ + vvp_do_vmtruncate(inode, io->u.ci_setattr.sa_attr.lvb_size); } static void vvp_io_setattr_fini(const struct lu_env *env, @@ -721,6 +678,11 @@ static int vvp_io_fault_start(const struct lu_env *env, /* must return locked page */ if (fio->ft_mkwrite) { + /* we grab alloc_sem to exclude truncate case. + * Otherwise, we could add dirty pages into osc cache + * while truncate is on-going. */ + DOWN_READ_I_ALLOC_SEM(inode); + LASSERT(cfio->ft_vmpage != NULL); lock_page(cfio->ft_vmpage); } else { @@ -766,15 +728,22 @@ static int vvp_io_fault_start(const struct lu_env *env, * started before the page is really made dirty, we * still have chance to detect it. */ result = cl_page_cache_add(env, io, page, CRT_WRITE); - if (result < 0) { - cl_page_unassume(env, io, page); - cl_page_put(env, page); - - /* we're in big trouble, what can we do now? */ - if (result == -EDQUOT) - result = -ENOSPC; - GOTO(out, result); - } + LASSERT(cl_page_is_owned(page, io)); + + vmpage = NULL; + if (result < 0) { + cl_page_unmap(env, io, page); + cl_page_discard(env, io, page); + cl_page_disown(env, io, page); + + cl_page_put(env, page); + + /* we're in big trouble, what can we do now? */ + if (result == -EDQUOT) + result = -ENOSPC; + GOTO(out, result); + } else + cl_page_disown(env, io, page); } } @@ -795,18 +764,23 @@ static int vvp_io_fault_start(const struct lu_env *env, out: /* return unlocked vmpage to avoid deadlocking */ - unlock_page(vmpage); + if (vmpage != NULL) + unlock_page(vmpage); + if (fio->ft_mkwrite) + UP_READ_I_ALLOC_SEM(inode); #ifdef HAVE_VM_OP_FAULT - cfio->fault.ft_flags &= ~VM_FAULT_LOCKED; + cfio->fault.ft_flags &= ~VM_FAULT_LOCKED; #endif - return result; + return result; } -static void vvp_io_fsync_end(const struct lu_env *env, - const struct cl_io_slice *ios) +static int vvp_io_fsync_start(const struct lu_env *env, + const struct cl_io_slice *ios) { - /* never try to verify there is no dirty pages in sync range - * because page_mkwrite() can generate new dirty pages any time. 
*/ + /* we should mark TOWRITE bit to each dirty page in radix tree to + * verify pages have been written, but this is difficult because of + * race. */ + return 0; } static int vvp_io_read_page(const struct lu_env *env, @@ -874,7 +848,7 @@ static int vvp_page_sync_io(const struct lu_env *env, struct cl_io *io, queue = &io->ci_queue; cl_2queue_init_page(queue, page); - result = cl_io_submit_sync(env, io, crt, queue, CRP_NORMAL, 0); + result = cl_io_submit_sync(env, io, crt, queue, 0); LASSERT(cl_page_is_owned(page, io)); if (crt == CRT_READ) @@ -1049,6 +1023,7 @@ static int vvp_io_commit_write(const struct lu_env *env, } if (need_clip) cl_page_clip(env, pg, 0, to); + clear_page_dirty_for_io(vmpage); result = vvp_page_sync_io(env, io, pg, cp, CRT_WRITE); if (result) CERROR("Write page %lu of inode %p failed %d\n", @@ -1108,7 +1083,7 @@ static const struct cl_io_operations vvp_io_ops = { .cio_end = ccc_io_end }, [CIT_FSYNC] = { - .cio_end = vvp_io_fsync_end, + .cio_start = vvp_io_fsync_start, .cio_fini = vvp_io_fini }, [CIT_MISC] = { diff --git a/lustre/llite/vvp_page.c b/lustre/llite/vvp_page.c index bc0e2b1..990d59b 100644 --- a/lustre/llite/vvp_page.c +++ b/lustre/llite/vvp_page.c @@ -36,6 +36,7 @@ * Implementation of cl_page for VVP layer. * * Author: Nikita Danilov + * Author: Jinshan Xiong */ #define DEBUG_SUBSYSTEM S_LLITE @@ -225,21 +226,15 @@ static int vvp_page_prep_write(const struct lu_env *env, const struct cl_page_slice *slice, struct cl_io *unused) { - struct cl_page *cp = slice->cpl_page; - cfs_page_t *vmpage = cl2vm_page(slice); - int result; + cfs_page_t *vmpage = cl2vm_page(slice); - if (clear_page_dirty_for_io(vmpage)) { - set_page_writeback(vmpage); - vvp_write_pending(cl2ccc(slice->cpl_obj), cl2ccc_page(slice)); - result = 0; + LASSERT(PageLocked(vmpage)); + LASSERT(!PageDirty(vmpage)); - /* only turn on writeback for async write. */ - if (cp->cp_sync_io == NULL) - unlock_page(vmpage); - } else - result = -EALREADY; - return result; + set_page_writeback(vmpage); + vvp_write_pending(cl2ccc(slice->cpl_obj), cl2ccc_page(slice)); + + return 0; } /** @@ -342,37 +337,32 @@ static void vvp_page_completion_write(const struct lu_env *env, * truncated. Skip it. */ static int vvp_page_make_ready(const struct lu_env *env, - const struct cl_page_slice *slice) + const struct cl_page_slice *slice) { - cfs_page_t *vmpage = cl2vm_page(slice); - struct cl_page *pg = slice->cpl_page; - int result; - - result = -EAGAIN; - /* we're trying to write, but the page is locked.. come back later */ - if (!TestSetPageLocked(vmpage)) { - if (pg->cp_state == CPS_CACHED) { - /* - * We can cancel IO if page wasn't dirty after all. - */ - clear_page_dirty_for_io(vmpage); - /* - * This actually clears the dirty bit in the radix - * tree. - */ - set_page_writeback(vmpage); - vvp_write_pending(cl2ccc(slice->cpl_obj), - cl2ccc_page(slice)); - CL_PAGE_HEADER(D_PAGE, env, pg, "readied\n"); - result = 0; - } else - /* - * Page was concurrently truncated. - */ - LASSERT(pg->cp_state == CPS_FREEING); - unlock_page(vmpage); - } - RETURN(result); + cfs_page_t *vmpage = cl2vm_page(slice); + struct cl_page *pg = slice->cpl_page; + int result = 0; + + lock_page(vmpage); + if (clear_page_dirty_for_io(vmpage)) { + LASSERT(pg->cp_state == CPS_CACHED); + /* This actually clears the dirty bit in the radix + * tree. 
*/ + set_page_writeback(vmpage); + vvp_write_pending(cl2ccc(slice->cpl_obj), + cl2ccc_page(slice)); + CL_PAGE_HEADER(D_PAGE, env, pg, "readied\n"); + } else if (pg->cp_state == CPS_PAGEOUT) { + /* is it possible for osc_flush_async_page() to already + * make it ready? */ + result = -EALREADY; + } else { + CL_PAGE_DEBUG(D_ERROR, env, pg, "Unexpecting page state %d.\n", + pg->cp_state); + LBUG(); + } + unlock_page(vmpage); + RETURN(result); } static int vvp_page_print(const struct lu_env *env, diff --git a/lustre/lov/lov_io.c b/lustre/lov/lov_io.c index 3cf97dd..ab0e41b 100644 --- a/lustre/lov/lov_io.c +++ b/lustre/lov/lov_io.c @@ -114,10 +114,12 @@ static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio, io->u.ci_fsync.fi_end = end; io->u.ci_fsync.fi_capa = parent->u.ci_fsync.fi_capa; io->u.ci_fsync.fi_fid = parent->u.ci_fsync.fi_fid; + io->u.ci_fsync.fi_mode = parent->u.ci_fsync.fi_mode; break; } - case CIT_READ: - case CIT_WRITE: { + case CIT_READ: + case CIT_WRITE: { + io->u.ci_wr.wr_sync = cl_io_is_sync_write(parent); if (cl_io_is_append(parent)) { io->u.ci_wr.wr_append = 1; } else { @@ -573,8 +575,7 @@ static struct cl_page_list *lov_io_submit_qin(struct lov_device *ld, */ static int lov_io_submit(const struct lu_env *env, const struct cl_io_slice *ios, - enum cl_req_type crt, struct cl_2queue *queue, - enum cl_req_priority priority) + enum cl_req_type crt, struct cl_2queue *queue) { struct lov_io *lio = cl2lov_io(env, ios); struct lov_object *obj = lio->lis_object; @@ -605,7 +606,7 @@ static int lov_io_submit(const struct lu_env *env, LASSERT(!IS_ERR(sub)); LASSERT(sub->sub_io == &lio->lis_single_subio); rc = cl_io_submit_rw(sub->sub_env, sub->sub_io, - crt, queue, priority); + crt, queue); lov_sub_put(sub); RETURN(rc); } @@ -646,7 +647,7 @@ static int lov_io_submit(const struct lu_env *env, sub = lov_sub_get(env, lio, stripe); if (!IS_ERR(sub)) { rc = cl_io_submit_rw(sub->sub_env, sub->sub_io, - crt, cl2q, priority); + crt, cl2q); lov_sub_put(sub); } else rc = PTR_ERR(sub); @@ -743,6 +744,28 @@ static int lov_io_fault_start(const struct lu_env *env, RETURN(lov_io_start(env, ios)); } +static void lov_io_fsync_end(const struct lu_env *env, + const struct cl_io_slice *ios) +{ + struct lov_io *lio = cl2lov_io(env, ios); + struct lov_io_sub *sub; + unsigned int *written = &ios->cis_io->u.ci_fsync.fi_nr_written; + ENTRY; + + *written = 0; + cfs_list_for_each_entry(sub, &lio->lis_active, sub_linkage) { + struct cl_io *subio = sub->sub_io; + + lov_sub_enter(sub); + lov_io_end_wrapper(sub->sub_env, subio); + lov_sub_exit(sub); + + if (subio->ci_result == 0) + *written += subio->u.ci_fsync.fi_nr_written; + } + RETURN_EXIT; +} + static const struct cl_io_operations lov_io_ops = { .op = { [CIT_READ] = { @@ -788,7 +811,7 @@ static const struct cl_io_operations lov_io_ops = { .cio_lock = lov_io_lock, .cio_unlock = lov_io_unlock, .cio_start = lov_io_start, - .cio_end = lov_io_end + .cio_end = lov_io_fsync_end }, [CIT_MISC] = { .cio_fini = lov_io_fini diff --git a/lustre/lov/lov_page.c b/lustre/lov/lov_page.c index ca3548f..03335ff 100644 --- a/lustre/lov/lov_page.c +++ b/lustre/lov/lov_page.c @@ -109,6 +109,30 @@ static void lov_page_assume(const struct lu_env *env, lov_page_own(env, slice, io, 0); } +static int lov_page_cache_add(const struct lu_env *env, + const struct cl_page_slice *slice, + struct cl_io *io) +{ + struct lov_io *lio = lov_env_io(env); + struct lov_io_sub *sub; + int rc = 0; + + LINVRNT(lov_page_invariant(slice)); + LINVRNT(!cl2lov_page(slice)->lps_invalid); + ENTRY; 
+ + sub = lov_page_subio(env, lio, slice); + if (!IS_ERR(sub)) { + rc = cl_page_cache_add(sub->sub_env, sub->sub_io, + slice->cpl_page->cp_child, CRT_WRITE); + lov_sub_put(sub); + } else { + rc = PTR_ERR(sub); + CL_PAGE_DEBUG(D_ERROR, env, slice->cpl_page, "rc = %d\n", rc); + } + RETURN(rc); +} + static int lov_page_print(const struct lu_env *env, const struct cl_page_slice *slice, void *cookie, lu_printer_t printer) @@ -122,6 +146,11 @@ static const struct cl_page_operations lov_page_ops = { .cpo_fini = lov_page_fini, .cpo_own = lov_page_own, .cpo_assume = lov_page_assume, + .io = { + [CRT_WRITE] = { + .cpo_cache_add = lov_page_cache_add + } + }, .cpo_print = lov_page_print }; diff --git a/lustre/obdclass/cl_internal.h b/lustre/obdclass/cl_internal.h index b17ec42..48ba8da 100644 --- a/lustre/obdclass/cl_internal.h +++ b/lustre/obdclass/cl_internal.h @@ -110,7 +110,7 @@ struct cl_thread_info { */ struct cl_sync_io clt_anchor; /** - * Fields used by cl_lock_page_out(). + * Fields used by cl_lock_discard_pages(). */ pgoff_t clt_next_index; pgoff_t clt_fn_index; /* first non-overlapped index */ diff --git a/lustre/obdclass/cl_io.c b/lustre/obdclass/cl_io.c index 4ac5087..2e8619a 100644 --- a/lustre/obdclass/cl_io.c +++ b/lustre/obdclass/cl_io.c @@ -790,7 +790,7 @@ int cl_io_read_page(const struct lu_env *env, struct cl_io *io, } } if (result == 0) - result = cl_io_submit_rw(env, io, CRT_READ, queue, CRP_NORMAL); + result = cl_io_submit_rw(env, io, CRT_READ, queue); /* * Unlock unsent pages in case of error. */ @@ -886,8 +886,7 @@ EXPORT_SYMBOL(cl_io_commit_write); * \see cl_io_operations::cio_submit() */ int cl_io_submit_rw(const struct lu_env *env, struct cl_io *io, - enum cl_req_type crt, struct cl_2queue *queue, - enum cl_req_priority priority) + enum cl_req_type crt, struct cl_2queue *queue) { const struct cl_io_slice *scan; int result = 0; @@ -899,7 +898,7 @@ int cl_io_submit_rw(const struct lu_env *env, struct cl_io *io, if (scan->cis_iop->req_op[crt].cio_submit == NULL) continue; result = scan->cis_iop->req_op[crt].cio_submit(env, scan, crt, - queue, priority); + queue); if (result != 0) break; } @@ -917,21 +916,19 @@ EXPORT_SYMBOL(cl_io_submit_rw); */ int cl_io_submit_sync(const struct lu_env *env, struct cl_io *io, enum cl_req_type iot, struct cl_2queue *queue, - enum cl_req_priority prio, long timeout) + long timeout) { struct cl_sync_io *anchor = &cl_env_info(env)->clt_anchor; struct cl_page *pg; int rc; - LASSERT(prio == CRP_NORMAL || prio == CRP_CANCEL); - cl_page_list_for_each(pg, &queue->c2_qin) { LASSERT(pg->cp_sync_io == NULL); pg->cp_sync_io = anchor; } cl_sync_io_init(anchor, queue->c2_qin.pl_nr); - rc = cl_io_submit_rw(env, io, iot, queue, prio); + rc = cl_io_submit_rw(env, io, iot, queue); if (rc == 0) { /* * If some pages weren't sent for any reason (e.g., diff --git a/lustre/obdclass/cl_lock.c b/lustre/obdclass/cl_lock.c index 677d652..fdfeed8 100644 --- a/lustre/obdclass/cl_lock.c +++ b/lustre/obdclass/cl_lock.c @@ -1830,12 +1830,13 @@ void cl_lock_cancel(const struct lu_env *env, struct cl_lock *lock) EXPORT_SYMBOL(cl_lock_cancel); /** - * Finds an existing lock covering given page and optionally different from a + * Finds an existing lock covering given index and optionally different from a * given \a except lock. 
*/ -struct cl_lock *cl_lock_at_page(const struct lu_env *env, struct cl_object *obj, - struct cl_page *page, struct cl_lock *except, - int pending, int canceld) +struct cl_lock *cl_lock_at_pgoff(const struct lu_env *env, + struct cl_object *obj, pgoff_t index, + struct cl_lock *except, + int pending, int canceld) { struct cl_object_header *head; struct cl_lock *scan; @@ -1850,7 +1851,7 @@ struct cl_lock *cl_lock_at_page(const struct lu_env *env, struct cl_object *obj, need->cld_mode = CLM_READ; /* CLM_READ matches both READ & WRITE, but * not PHANTOM */ - need->cld_start = need->cld_end = page->cp_index; + need->cld_start = need->cld_end = index; need->cld_enq_flags = 0; cfs_spin_lock(&head->coh_lock_guard); @@ -1879,7 +1880,7 @@ struct cl_lock *cl_lock_at_page(const struct lu_env *env, struct cl_object *obj, cfs_spin_unlock(&head->coh_lock_guard); RETURN(lock); } -EXPORT_SYMBOL(cl_lock_at_page); +EXPORT_SYMBOL(cl_lock_at_pgoff); /** * Calculate the page offset at the layer of @lock. @@ -1935,60 +1936,45 @@ static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io, return CLP_GANG_OKAY; } -static int pageout_cb(const struct lu_env *env, struct cl_io *io, +static int discard_cb(const struct lu_env *env, struct cl_io *io, struct cl_page *page, void *cbdata) { - struct cl_thread_info *info = cl_env_info(env); - struct cl_page_list *queue = &info->clt_queue.c2_qin; - struct cl_lock *lock = cbdata; - typeof(cl_page_own) *page_own; - int rc = CLP_GANG_OKAY; - - page_own = queue->pl_nr ? cl_page_own_try : cl_page_own; - if (page_own(env, io, page) == 0) { - cl_page_list_add(queue, page); - info->clt_next_index = pgoff_at_lock(page, lock) + 1; - } else if (page->cp_state != CPS_FREEING) { - /* cl_page_own() won't fail unless - * the page is being freed. */ - LASSERT(queue->pl_nr != 0); - rc = CLP_GANG_AGAIN; - } + struct cl_thread_info *info = cl_env_info(env); + struct cl_lock *lock = cbdata; + + LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE); + KLASSERT(ergo(page->cp_type == CPT_CACHEABLE, + !PageWriteback(cl_page_vmpage(env, page)))); + KLASSERT(ergo(page->cp_type == CPT_CACHEABLE, + !PageDirty(cl_page_vmpage(env, page)))); + + info->clt_next_index = pgoff_at_lock(page, lock) + 1; + if (cl_page_own(env, io, page) == 0) { + /* discard the page */ + cl_page_unmap(env, io, page); + cl_page_discard(env, io, page); + cl_page_disown(env, io, page); + } else { + LASSERT(page->cp_state == CPS_FREEING); + } - return rc; + return CLP_GANG_OKAY; } /** - * Invalidate pages protected by the given lock, sending them out to the - * server first, if necessary. - * - * This function does the following: - * - * - collects a list of pages to be invalidated, - * - * - unmaps them from the user virtual memory, - * - * - sends dirty pages to the server, - * - * - waits for transfer completion, - * - * - discards pages, and throws them out of memory. - * - * If \a discard is set, pages are discarded without sending them to the - * server. + * Discard pages protected by the given lock. This function traverses radix + * tree to find all covering pages and discard them. If a page is being covered + * by other locks, it should remain in cache. * * If error happens on any step, the process continues anyway (the reasoning * behind this being that lock cancellation cannot be delayed indefinitely). 
*/ -int cl_lock_page_out(const struct lu_env *env, struct cl_lock *lock, - int discard) +int cl_lock_discard_pages(const struct lu_env *env, struct cl_lock *lock) { struct cl_thread_info *info = cl_env_info(env); struct cl_io *io = &info->clt_io; - struct cl_2queue *queue = &info->clt_queue; struct cl_lock_descr *descr = &lock->cll_descr; cl_page_gang_cb_t cb; - long page_count; int res; int result; @@ -2000,38 +1986,12 @@ int cl_lock_page_out(const struct lu_env *env, struct cl_lock *lock, if (result != 0) GOTO(out, result); - cb = descr->cld_mode == CLM_READ ? check_and_discard_cb : pageout_cb; + cb = descr->cld_mode == CLM_READ ? check_and_discard_cb : discard_cb; info->clt_fn_index = info->clt_next_index = descr->cld_start; do { - cl_2queue_init(queue); res = cl_page_gang_lookup(env, descr->cld_obj, io, info->clt_next_index, descr->cld_end, cb, (void *)lock); - page_count = queue->c2_qin.pl_nr; - if (page_count > 0) { - /* must be writeback case */ - LASSERTF(descr->cld_mode >= CLM_WRITE, "lock mode %s\n", - cl_lock_mode_name(descr->cld_mode)); - - result = cl_page_list_unmap(env, io, &queue->c2_qin); - if (!discard) { - long timeout = 600; /* 10 minutes. */ - /* for debug purpose, if this request can't be - * finished in 10 minutes, we hope it can - * notify us. - */ - result = cl_io_submit_sync(env, io, CRT_WRITE, - queue, CRP_CANCEL, - timeout); - if (result) - CWARN("Writing %lu pages error: %d\n", - page_count, result); - } - cl_2queue_discard(env, io, queue); - cl_2queue_disown(env, io, queue); - cl_2queue_fini(env, queue); - } - if (info->clt_next_index > descr->cld_end) break; @@ -2042,7 +2002,7 @@ out: cl_io_fini(env, io); RETURN(result); } -EXPORT_SYMBOL(cl_lock_page_out); +EXPORT_SYMBOL(cl_lock_discard_pages); /** * Eliminate all locks for a given object. diff --git a/lustre/obdclass/cl_page.c b/lustre/obdclass/cl_page.c index 217ee00..e2f6f93 100644 --- a/lustre/obdclass/cl_page.c +++ b/lustre/obdclass/cl_page.c @@ -1406,32 +1406,61 @@ EXPORT_SYMBOL(cl_page_make_ready); * its queues. * * \pre cl_page_is_owned(pg, io) - * \post ergo(result == 0, - * pg->cp_state == CPS_PAGEIN || pg->cp_state == CPS_PAGEOUT) + * \post cl_page_is_owned(pg, io) * * \see cl_page_operations::cpo_cache_add() */ int cl_page_cache_add(const struct lu_env *env, struct cl_io *io, struct cl_page *pg, enum cl_req_type crt) { - int result; + const struct cl_page_slice *scan; + int result = 0; - PINVRNT(env, pg, crt < CRT_NR); - PINVRNT(env, pg, cl_page_is_owned(pg, io)); - PINVRNT(env, pg, cl_page_invariant(pg)); + PINVRNT(env, pg, crt < CRT_NR); + PINVRNT(env, pg, cl_page_is_owned(pg, io)); + PINVRNT(env, pg, cl_page_invariant(pg)); - ENTRY; - result = cl_page_invoke(env, io, pg, CL_PAGE_OP(io[crt].cpo_cache_add)); - if (result == 0) { - cl_page_owner_clear(pg); - cl_page_state_set(env, pg, CPS_CACHED); - } - CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, result); - RETURN(result); + ENTRY; + + cfs_list_for_each_entry(scan, &pg->cp_layers, cpl_linkage) { + if (scan->cpl_ops->io[crt].cpo_cache_add == NULL) + continue; + + result = scan->cpl_ops->io[crt].cpo_cache_add(env, scan, io); + if (result != 0) + break; + } + CL_PAGE_HEADER(D_TRACE, env, pg, "%d %d\n", crt, result); + RETURN(result); } EXPORT_SYMBOL(cl_page_cache_add); /** + * Called if a pge is being written back by kernel's intention. 
+ * + * \pre cl_page_is_owned(pg, io) + * \post ergo(result == 0, pg->cp_state == CPS_PAGEOUT) + * + * \see cl_page_operations::cpo_flush() + */ +int cl_page_flush(const struct lu_env *env, struct cl_io *io, + struct cl_page *pg) +{ + int result; + + PINVRNT(env, pg, cl_page_is_owned(pg, io)); + PINVRNT(env, pg, cl_page_invariant(pg)); + + ENTRY; + + result = cl_page_invoke(env, io, pg, CL_PAGE_OP(cpo_flush)); + + CL_PAGE_HEADER(D_TRACE, env, pg, "%d\n", result); + RETURN(result); +} +EXPORT_SYMBOL(cl_page_flush); + +/** * Checks whether page is protected by any extent lock is at least required * mode. * diff --git a/lustre/obdclass/lprocfs_status.c b/lustre/obdclass/lprocfs_status.c index 4b957f7..b8b6a38 100644 --- a/lustre/obdclass/lprocfs_status.c +++ b/lustre/obdclass/lprocfs_status.c @@ -2457,32 +2457,6 @@ int lprocfs_obd_rd_max_pages_per_rpc(char *page, char **start, off_t off, } EXPORT_SYMBOL(lprocfs_obd_rd_max_pages_per_rpc); -int lprocfs_obd_wr_max_pages_per_rpc(struct file *file, const char *buffer, - unsigned long count, void *data) -{ - struct obd_device *dev = data; - struct client_obd *cli = &dev->u.cli; - struct obd_connect_data *ocd = &cli->cl_import->imp_connect_data; - int val, rc; - - rc = lprocfs_write_helper(buffer, count, &val); - if (rc) - return rc; - - LPROCFS_CLIMP_CHECK(dev); - if (val < 1 || val > ocd->ocd_brw_size >> CFS_PAGE_SHIFT) { - LPROCFS_CLIMP_EXIT(dev); - return -ERANGE; - } - client_obd_list_lock(&cli->cl_loi_list_lock); - cli->cl_max_pages_per_rpc = val; - client_obd_list_unlock(&cli->cl_loi_list_lock); - - LPROCFS_CLIMP_EXIT(dev); - return count; -} -EXPORT_SYMBOL(lprocfs_obd_wr_max_pages_per_rpc); - int lprocfs_target_rd_instance(char *page, char **start, off_t off, int count, int *eof, void *data) { diff --git a/lustre/obdclass/lu_ref.c b/lustre/obdclass/lu_ref.c index 84afc67..3471bd3 100644 --- a/lustre/obdclass/lu_ref.c +++ b/lustre/obdclass/lu_ref.c @@ -57,18 +57,17 @@ * Asserts a condition for a given lu_ref. Must be called with * lu_ref::lf_guard held. */ -#define REFASSERT(ref, expr) \ - do { \ - struct lu_ref *__ref = (ref); \ - \ - if (unlikely(!(expr))) { \ - lu_ref_print(__ref); \ - cfs_spin_unlock(&__ref->lf_guard); \ - lu_ref_print_all(); \ - LASSERT(0); \ - cfs_spin_lock(&__ref->lf_guard); \ - } \ - } while (0) +#define REFASSERT(ref, expr) do { \ + struct lu_ref *__tmp = (ref); \ + \ + if (unlikely(!(expr))) { \ + lu_ref_print(__tmp); \ + cfs_spin_unlock(&__tmp->lf_guard); \ + lu_ref_print_all(); \ + LASSERT(0); \ + cfs_spin_lock(&__tmp->lf_guard); \ + } \ +} while (0) struct lu_ref_link { struct lu_ref *ll_ref; diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index 0cc6d1d..65d7429e 100644 --- a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -1336,8 +1336,7 @@ static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset, if (async) rc = cl_echo_async_brw(env, io, typ, queue); else - rc = cl_io_submit_sync(env, io, typ, queue, - CRP_NORMAL, 0); + rc = cl_io_submit_sync(env, io, typ, queue, 0); CDEBUG(D_INFO, "echo_client %s write returns %d\n", async ? 
"async" : "sync", rc); } diff --git a/lustre/osc/lproc_osc.c b/lustre/osc/lproc_osc.c index 4018132..742733c 100644 --- a/lustre/osc/lproc_osc.c +++ b/lustre/osc/lproc_osc.c @@ -565,6 +565,35 @@ static int osc_rd_destroys_in_flight(char *page, char **start, off_t off, cfs_atomic_read(&obd->u.cli.cl_destroy_in_flight)); } +static int lprocfs_osc_wr_max_pages_per_rpc(struct file *file, + const char *buffer, unsigned long count, void *data) +{ + struct obd_device *dev = data; + struct client_obd *cli = &dev->u.cli; + struct obd_connect_data *ocd = &cli->cl_import->imp_connect_data; + int chunk_mask, val, rc; + + rc = lprocfs_write_helper(buffer, count, &val); + if (rc) + return rc; + + LPROCFS_CLIMP_CHECK(dev); + + chunk_mask = ~((1 << (cli->cl_chunkbits - CFS_PAGE_SHIFT)) - 1); + /* max_pages_per_rpc must be chunk aligned */ + val = (val + ~chunk_mask) & chunk_mask; + if (val == 0 || val > ocd->ocd_brw_size >> CFS_PAGE_SHIFT) { + LPROCFS_CLIMP_EXIT(dev); + return -ERANGE; + } + client_obd_list_lock(&cli->cl_loi_list_lock); + cli->cl_max_pages_per_rpc = val; + client_obd_list_unlock(&cli->cl_loi_list_lock); + + LPROCFS_CLIMP_EXIT(dev); + return count; +} + static struct lprocfs_vars lprocfs_osc_obd_vars[] = { { "uuid", lprocfs_rd_uuid, 0, 0 }, { "ping", 0, lprocfs_wr_ping, 0, 0, 0222 }, @@ -581,7 +610,7 @@ static struct lprocfs_vars lprocfs_osc_obd_vars[] = { { "active", osc_rd_active, osc_wr_active, 0 }, { "max_pages_per_rpc", lprocfs_obd_rd_max_pages_per_rpc, - lprocfs_obd_wr_max_pages_per_rpc, 0 }, + lprocfs_osc_wr_max_pages_per_rpc, 0 }, { "max_rpcs_in_flight", osc_rd_max_rpcs_in_flight, osc_wr_max_rpcs_in_flight, 0 }, { "destroys_in_flight", osc_rd_destroys_in_flight, 0, 0 }, @@ -638,9 +667,9 @@ static int osc_rpc_stats_seq_show(struct seq_file *seq, void *v) seq_printf(seq, "write RPCs in flight: %d\n", cli->cl_w_in_flight); seq_printf(seq, "pending write pages: %d\n", - cli->cl_pending_w_pages); + cfs_atomic_read(&cli->cl_pending_w_pages)); seq_printf(seq, "pending read pages: %d\n", - cli->cl_pending_r_pages); + cfs_atomic_read(&cli->cl_pending_r_pages)); seq_printf(seq, "\n\t\t\tread\t\t\twrite\n"); seq_printf(seq, "pages per rpc rpcs %% cum %% |"); diff --git a/lustre/osc/osc_cache.c b/lustre/osc/osc_cache.c index 9f12ad6..b67911b 100644 --- a/lustre/osc/osc_cache.c +++ b/lustre/osc/osc_cache.c @@ -1,6 +1,4 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * +/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 
@@ -46,37 +44,1137 @@ #include "osc_cl_internal.h" #include "osc_internal.h" -static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli, - struct osc_async_page *oap); -static int osc_enter_cache_try(const struct lu_env *env, struct client_obd *cli, - struct osc_async_page *oap, int transient); -static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap, - int sent); +static int extent_debug; /* set it to be true for more debug */ + +static void osc_update_pending(struct osc_object *obj, int cmd, int delta); +static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext, + int state); +static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli, + struct osc_async_page *oap, int sent, int rc); +static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap, + int cmd); +static int osc_refresh_count(const struct lu_env *env, + struct osc_async_page *oap, int cmd); +static int osc_io_unplug_async(const struct lu_env *env, + struct client_obd *cli, struct osc_object *osc); +static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages, + unsigned int lost_grant); + +static void osc_extent_tree_dump0(int level, struct osc_object *obj, + const char *func, int line); +#define osc_extent_tree_dump(lvl, obj) \ + osc_extent_tree_dump0(lvl, obj, __func__, __LINE__) /** \addtogroup osc * @{ */ -#define OSC_IO_DEBUG(OSC, STR, args...) \ - CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \ - !cfs_list_empty(&(OSC)->oo_ready_item) || \ - !cfs_list_empty(&(OSC)->oo_hp_ready_item), \ - (OSC)->oo_write_pages.oop_num_pending, \ - !cfs_list_empty(&(OSC)->oo_write_pages.oop_urgent), \ - (OSC)->oo_read_pages.oop_num_pending, \ - !cfs_list_empty(&(OSC)->oo_read_pages.oop_urgent), \ - args) +/* ------------------ osc extent ------------------ */ +static inline char *ext_flags(struct osc_extent *ext, char *flags) +{ + char *buf = flags; + *buf++ = ext->oe_rw ? 'r' : 'w'; + if (ext->oe_intree) + *buf++ = 'i'; + if (ext->oe_srvlock) + *buf++ = 's'; + if (ext->oe_hp) + *buf++ = 'h'; + if (ext->oe_urgent) + *buf++ = 'u'; + if (ext->oe_memalloc) + *buf++ = 'm'; + if (ext->oe_trunc_pending) + *buf++ = 't'; + if (ext->oe_fsync_wait) + *buf++ = 'Y'; + *buf = 0; + return flags; +} + +static inline char list_empty_marker(cfs_list_t *list) +{ + return cfs_list_empty(list) ? '-' : '+'; +} + +#define EXTSTR "[%lu -> %lu/%lu]" +#define EXTPARA(ext) (ext)->oe_start, (ext)->oe_end, (ext)->oe_max_end + +#define OSC_EXTENT_DUMP(lvl, extent, fmt, ...) do { \ + struct osc_extent *__ext = (extent); \ + const char *__str[] = OES_STRINGS; \ + char __buf[16]; \ + \ + CDEBUG(lvl, \ + "extent %p@{" EXTSTR ", " \ + "[%d|%d|%c|%s|%s|%p], [%d|%d|%c|%c|%p|%u|%p]} " fmt, \ + /* ----- extent part 0 ----- */ \ + __ext, EXTPARA(__ext), \ + /* ----- part 1 ----- */ \ + cfs_atomic_read(&__ext->oe_refc), \ + cfs_atomic_read(&__ext->oe_users), \ + list_empty_marker(&__ext->oe_link), \ + __str[__ext->oe_state], ext_flags(__ext, __buf), \ + __ext->oe_obj, \ + /* ----- part 2 ----- */ \ + __ext->oe_grants, __ext->oe_nr_pages, \ + list_empty_marker(&__ext->oe_pages), \ + cfs_waitq_active(&__ext->oe_waitq) ? '+' : '-', \ + __ext->oe_osclock, __ext->oe_mppr, __ext->oe_owner, \ + /* ----- part 4 ----- */ \ + ## __VA_ARGS__); \ +} while (0) + +#undef EASSERTF +#define EASSERTF(expr, ext, fmt, args...) 
do { \ + if (!(expr)) { \ + OSC_EXTENT_DUMP(D_ERROR, (ext), fmt, ##args); \ + osc_extent_tree_dump(D_ERROR, (ext)->oe_obj); \ + LASSERT(expr); \ + } \ +} while (0) + +#undef EASSERT +#define EASSERT(expr, ext) EASSERTF(expr, ext, "\n") + +static inline struct osc_extent *rb_extent(struct rb_node *n) +{ + if (n == NULL) + return NULL; + + return container_of(n, struct osc_extent, oe_node); +} + +static inline struct osc_extent *next_extent(struct osc_extent *ext) +{ + if (ext == NULL) + return NULL; + + LASSERT(ext->oe_intree); + return rb_extent(rb_next(&ext->oe_node)); +} + +static inline struct osc_extent *prev_extent(struct osc_extent *ext) +{ + if (ext == NULL) + return NULL; + + LASSERT(ext->oe_intree); + return rb_extent(rb_prev(&ext->oe_node)); +} + +static inline struct osc_extent *first_extent(struct osc_object *obj) +{ + return rb_extent(rb_first(&obj->oo_root)); +} + +/* object must be locked by caller. */ +static int osc_extent_sanity_check0(struct osc_extent *ext, + const char *func, const int line) +{ + struct osc_object *obj = ext->oe_obj; + struct osc_async_page *oap; + int page_count; + int rc = 0; + + if (!osc_object_is_locked(obj)) + GOTO(out, rc = 9); + + if (ext->oe_state >= OES_STATE_MAX) + GOTO(out, rc = 10); + + if (cfs_atomic_read(&ext->oe_refc) <= 0) + GOTO(out, rc = 20); + + if (cfs_atomic_read(&ext->oe_refc) < cfs_atomic_read(&ext->oe_users)) + GOTO(out, rc = 30); + + switch (ext->oe_state) { + case OES_INV: + if (ext->oe_nr_pages > 0 || !cfs_list_empty(&ext->oe_pages)) + GOTO(out, rc = 35); + GOTO(out, rc = 0); + break; + case OES_ACTIVE: + if (cfs_atomic_read(&ext->oe_users) == 0) + GOTO(out, rc = 40); + if (ext->oe_hp) + GOTO(out, rc = 50); + if (ext->oe_fsync_wait && !ext->oe_urgent) + GOTO(out, rc = 55); + break; + case OES_CACHE: + if (ext->oe_grants == 0) + GOTO(out, rc = 60); + if (ext->oe_fsync_wait && !ext->oe_urgent && !ext->oe_hp) + GOTO(out, rc = 65); + default: + if (cfs_atomic_read(&ext->oe_users) > 0) + GOTO(out, rc = 70); + } + + if (ext->oe_max_end < ext->oe_end || ext->oe_end < ext->oe_start) + GOTO(out, rc = 80); + + if (ext->oe_osclock == NULL && ext->oe_grants > 0) + GOTO(out, rc = 90); + + if (ext->oe_osclock) { + struct cl_lock_descr *descr; + descr = &ext->oe_osclock->cll_descr; + if (!(descr->cld_start <= ext->oe_start && + descr->cld_end >= ext->oe_max_end)) + GOTO(out, rc = 100); + } + + if (ext->oe_nr_pages > ext->oe_mppr) + GOTO(out, rc = 105); + + /* Do not verify page list if extent is in RPC. This is because an + * in-RPC extent is supposed to be exclusively accessible w/o lock. */ + if (ext->oe_state > OES_CACHE) + GOTO(out, rc = 0); + + if (!extent_debug) + GOTO(out, rc = 0); + + page_count = 0; + cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) { + pgoff_t index = oap2cl_page(oap)->cp_index; + ++page_count; + if (index > ext->oe_end || index < ext->oe_start) + GOTO(out, rc = 110); + } + if (page_count != ext->oe_nr_pages) + GOTO(out, rc = 120); + +out: + if (rc != 0) + OSC_EXTENT_DUMP(D_ERROR, ext, + "%s:%d sanity check %p failed with rc = %d\n", + func, line, ext, rc); + return rc; +} + +#define sanity_check_nolock(ext) \ + osc_extent_sanity_check0(ext, __func__, __LINE__) + +#define sanity_check(ext) ({ \ + int __res; \ + osc_object_lock((ext)->oe_obj); \ + __res = sanity_check_nolock(ext); \ + osc_object_unlock((ext)->oe_obj); \ + __res; \ +}) + + +/** + * sanity check - to make sure there is no overlapped extent in the tree. 
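The sanity_check() wrapper above relies on a GCC/clang statement expression so that the object lock is taken and dropped around the unlocked check while the whole expression still yields the check's return code. The same pattern in self-contained form, with a pthread mutex and a made-up check function:

#include <stdio.h>
#include <pthread.h>

static pthread_mutex_t obj_lock = PTHREAD_MUTEX_INITIALIZER;

static int check_nolock(int nr_pages)
{
        return nr_pages >= 0 ? 0 : 120;          /* 0 means healthy */
}

#define checked(nr)  ({                          \
        int __res;                               \
        pthread_mutex_lock(&obj_lock);           \
        __res = check_nolock(nr);                \
        pthread_mutex_unlock(&obj_lock);         \
        __res;                                   \
})

int main(void)
{
        printf("rc = %d\n", checked(4));         /* prints 0   */
        printf("rc = %d\n", checked(-1));        /* prints 120 */
        return 0;
}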
+ */ +static int osc_extent_is_overlapped(struct osc_object *obj, + struct osc_extent *ext) +{ + struct osc_extent *tmp; + + LASSERT(osc_object_is_locked(obj)); + + if (!extent_debug) + return 0; + + for (tmp = first_extent(obj); tmp != NULL; tmp = next_extent(tmp)) { + if (tmp == ext) + continue; + if (tmp->oe_end >= ext->oe_start && + tmp->oe_start <= ext->oe_end) + return 1; + } + return 0; +} + +static void osc_extent_state_set(struct osc_extent *ext, int state) +{ + LASSERT(osc_object_is_locked(ext->oe_obj)); + LASSERT(state >= OES_INV && state < OES_STATE_MAX); + + /* Never try to sanity check a state changing extent :-) */ + /* LASSERT(sanity_check_nolock(ext) == 0); */ + + /* TODO: validate the state machine */ + ext->oe_state = state; + cfs_waitq_broadcast(&ext->oe_waitq); +} + +static struct osc_extent *osc_extent_alloc(struct osc_object *obj) +{ + struct osc_extent *ext; + + OBD_SLAB_ALLOC_PTR_GFP(ext, osc_extent_kmem, CFS_ALLOC_STD); + if (ext == NULL) + return NULL; + + RB_CLEAR_NODE(&ext->oe_node); + ext->oe_obj = obj; + cfs_atomic_set(&ext->oe_refc, 1); + cfs_atomic_set(&ext->oe_users, 0); + CFS_INIT_LIST_HEAD(&ext->oe_link); + ext->oe_state = OES_INV; + CFS_INIT_LIST_HEAD(&ext->oe_pages); + cfs_waitq_init(&ext->oe_waitq); + ext->oe_osclock = NULL; + + return ext; +} + +static void osc_extent_free(struct osc_extent *ext) +{ + OBD_SLAB_FREE_PTR(ext, osc_extent_kmem); +} + +static struct osc_extent *osc_extent_get(struct osc_extent *ext) +{ + LASSERT(cfs_atomic_read(&ext->oe_refc) >= 0); + cfs_atomic_inc(&ext->oe_refc); + return ext; +} + +static void osc_extent_put(const struct lu_env *env, struct osc_extent *ext) +{ + LASSERT(cfs_atomic_read(&ext->oe_refc) > 0); + if (cfs_atomic_dec_and_test(&ext->oe_refc)) { + LASSERT(cfs_list_empty(&ext->oe_link)); + LASSERT(cfs_atomic_read(&ext->oe_users) == 0); + LASSERT(ext->oe_state == OES_INV); + LASSERT(!ext->oe_intree); + + if (ext->oe_osclock) { + cl_lock_put(env, ext->oe_osclock); + ext->oe_osclock = NULL; + } + osc_extent_free(ext); + } +} + +/** + * osc_extent_put_trust() is a special version of osc_extent_put() when + * it's known that the caller is not the last user. This is to address the + * problem of lacking of lu_env ;-). + */ +static void osc_extent_put_trust(struct osc_extent *ext) +{ + LASSERT(cfs_atomic_read(&ext->oe_refc) > 1); + LASSERT(osc_object_is_locked(ext->oe_obj)); + cfs_atomic_dec(&ext->oe_refc); +} + +/** + * Return the extent which includes pgoff @index, or return the greatest + * previous extent in the tree. + */ +static struct osc_extent *osc_extent_search(struct osc_object *obj, + pgoff_t index) +{ + struct rb_node *n = obj->oo_root.rb_node; + struct osc_extent *tmp, *p = NULL; + + LASSERT(osc_object_is_locked(obj)); + while (n != NULL) { + tmp = rb_extent(n); + if (index < tmp->oe_start) { + n = n->rb_left; + } else if (index > tmp->oe_end) { + p = rb_extent(n); + n = n->rb_right; + } else { + return tmp; + } + } + return p; +} + +/* + * Return the extent covering @index, otherwise return NULL. + * caller must have held object lock. + */ +static struct osc_extent *osc_extent_lookup(struct osc_object *obj, + pgoff_t index) +{ + struct osc_extent *ext; + + ext = osc_extent_search(obj, index); + if (ext != NULL && ext->oe_start <= index && index <= ext->oe_end) + return osc_extent_get(ext); + return NULL; +} + +/* caller must have held object lock. 
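osc_extent_search() descends the red-black tree and returns either the extent that covers @index or the closest extent that starts below it, which the lookup and insert paths then use as a merge candidate. The same lookup expressed over a sorted array of non-overlapping ranges, standing in for the tree:

#include <stdio.h>

struct range { unsigned long start, end; };

static const struct range *range_search(const struct range *r, int nr,
                                        unsigned long index)
{
        const struct range *prev = NULL;
        int lo = 0, hi = nr - 1;

        while (lo <= hi) {
                int mid = (lo + hi) / 2;

                if (index < r[mid].start) {
                        hi = mid - 1;
                } else if (index > r[mid].end) {
                        prev = &r[mid];          /* best "previous" so far */
                        lo = mid + 1;
                } else {
                        return &r[mid];          /* covering range */
                }
        }
        return prev;                             /* may be NULL */
}

int main(void)
{
        struct range tree[] = { { 0, 15 }, { 32, 47 }, { 64, 79 } };
        const struct range *hit = range_search(tree, 3, 50);

        if (hit != NULL)
                printf("covering or previous: [%lu -> %lu]\n",
                       hit->start, hit->end);    /* [32 -> 47] */
        return 0;
}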
*/ +static void osc_extent_insert(struct osc_object *obj, struct osc_extent *ext) +{ + struct rb_node **n = &obj->oo_root.rb_node; + struct rb_node *parent = NULL; + struct osc_extent *tmp; + + LASSERT(ext->oe_intree == 0); + LASSERT(ext->oe_obj == obj); + LASSERT(osc_object_is_locked(obj)); + while (*n != NULL) { + tmp = rb_extent(*n); + parent = *n; + + if (ext->oe_end < tmp->oe_start) + n = &(*n)->rb_left; + else if (ext->oe_start > tmp->oe_end) + n = &(*n)->rb_right; + else + EASSERTF(0, tmp, EXTSTR, EXTPARA(ext)); + } + rb_link_node(&ext->oe_node, parent, n); + rb_insert_color(&ext->oe_node, &obj->oo_root); + osc_extent_get(ext); + ext->oe_intree = 1; +} + +/* caller must have held object lock. */ +static void osc_extent_erase(struct osc_extent *ext) +{ + struct osc_object *obj = ext->oe_obj; + LASSERT(osc_object_is_locked(obj)); + if (ext->oe_intree) { + rb_erase(&ext->oe_node, &obj->oo_root); + ext->oe_intree = 0; + /* rbtree held a refcount */ + osc_extent_put_trust(ext); + } +} + +static struct osc_extent *osc_extent_hold(struct osc_extent *ext) +{ + struct osc_object *obj = ext->oe_obj; + + LASSERT(osc_object_is_locked(obj)); + LASSERT(ext->oe_state == OES_ACTIVE || ext->oe_state == OES_CACHE); + if (ext->oe_state == OES_CACHE) { + osc_extent_state_set(ext, OES_ACTIVE); + osc_update_pending(obj, OBD_BRW_WRITE, -ext->oe_nr_pages); + } + cfs_atomic_inc(&ext->oe_users); + cfs_list_del_init(&ext->oe_link); + return osc_extent_get(ext); +} + +static void __osc_extent_remove(struct osc_extent *ext) +{ + LASSERT(osc_object_is_locked(ext->oe_obj)); + LASSERT(cfs_list_empty(&ext->oe_pages)); + osc_extent_erase(ext); + cfs_list_del_init(&ext->oe_link); + osc_extent_state_set(ext, OES_INV); + OSC_EXTENT_DUMP(D_CACHE, ext, "destroyed.\n"); +} + +static void osc_extent_remove(struct osc_extent *ext) +{ + struct osc_object *obj = ext->oe_obj; + + osc_object_lock(obj); + __osc_extent_remove(ext); + osc_object_unlock(obj); +} + +/** + * This function is used to merge extents to get better performance. It checks + * if @cur and @victim are contiguous at chunk level. 
+ */ +static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur, + struct osc_extent *victim) +{ + struct osc_object *obj = cur->oe_obj; + pgoff_t chunk_start; + pgoff_t chunk_end; + int ppc_bits; + + LASSERT(cur->oe_state == OES_CACHE); + LASSERT(osc_object_is_locked(obj)); + if (victim == NULL) + return -EINVAL; + + if (victim->oe_state != OES_CACHE || victim->oe_fsync_wait) + return -EBUSY; + + if (cur->oe_max_end != victim->oe_max_end) + return -ERANGE; + + LASSERT(cur->oe_osclock == victim->oe_osclock); + ppc_bits = osc_cli(obj)->cl_chunkbits - CFS_PAGE_SHIFT; + chunk_start = cur->oe_start >> ppc_bits; + chunk_end = cur->oe_end >> ppc_bits; + if (chunk_start != (victim->oe_end >> ppc_bits) + 1 && + chunk_end + 1 != victim->oe_start >> ppc_bits) + return -ERANGE; + + OSC_EXTENT_DUMP(D_CACHE, victim, "will be merged by %p.\n", cur); + + cur->oe_start = min(cur->oe_start, victim->oe_start); + cur->oe_end = max(cur->oe_end, victim->oe_end); + cur->oe_grants += victim->oe_grants; + cur->oe_nr_pages += victim->oe_nr_pages; + /* only the following bits are needed to merge */ + cur->oe_urgent |= victim->oe_urgent; + cur->oe_memalloc |= victim->oe_memalloc; + cfs_list_splice_init(&victim->oe_pages, &cur->oe_pages); + cfs_list_del_init(&victim->oe_link); + victim->oe_nr_pages = 0; + + osc_extent_get(victim); + __osc_extent_remove(victim); + osc_extent_put(env, victim); + + OSC_EXTENT_DUMP(D_CACHE, cur, "after merging %p.\n", victim); + return 0; +} + +/** + * Drop user count of osc_extent, and unplug IO asynchronously. + */ +int osc_extent_release(const struct lu_env *env, struct osc_extent *ext) +{ + struct osc_object *obj = ext->oe_obj; + int rc = 0; + ENTRY; + + LASSERT(cfs_atomic_read(&ext->oe_users) > 0); + LASSERT(sanity_check(ext) == 0); + LASSERT(ext->oe_grants > 0); + + if (cfs_atomic_dec_and_lock(&ext->oe_users, &obj->oo_lock)) { + LASSERT(ext->oe_state == OES_ACTIVE); + if (ext->oe_trunc_pending) { + /* a truncate process is waiting for this extent. + * This may happen due to a race, check + * osc_cache_truncate_start(). */ + osc_extent_state_set(ext, OES_TRUNC); + ext->oe_trunc_pending = 0; + } else { + osc_extent_state_set(ext, OES_CACHE); + osc_update_pending(obj, OBD_BRW_WRITE, + ext->oe_nr_pages); + + /* try to merge the previous and next extent. */ + osc_extent_merge(env, ext, prev_extent(ext)); + osc_extent_merge(env, ext, next_extent(ext)); + + if (ext->oe_urgent) + cfs_list_move_tail(&ext->oe_link, + &obj->oo_urgent_exts); + } + osc_object_unlock(obj); + + osc_io_unplug_async(env, osc_cli(obj), obj); + } + osc_extent_put(env, ext); + RETURN(rc); +} + +static inline int overlapped(struct osc_extent *ex1, struct osc_extent *ex2) +{ + return !(ex1->oe_end < ex2->oe_start || ex2->oe_end < ex1->oe_start); +} + +/** + * Find or create an extent which includes @index, core function to manage + * extent tree. 
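osc_extent_merge() above only coalesces two extents when they touch at chunk granularity, because grant is accounted per chunk and a gap would make that accounting ambiguous. The contiguity test in isolation, with an invented pages-per-chunk geometry:

#include <stdio.h>

static int chunks_contiguous(unsigned long a_start, unsigned long a_end,
                             unsigned long b_start, unsigned long b_end,
                             int ppc_bits)
{
        unsigned long a_chk_start = a_start >> ppc_bits;
        unsigned long a_chk_end   = a_end   >> ppc_bits;

        return a_chk_start == (b_end >> ppc_bits) + 1 ||
               a_chk_end + 1 == (b_start >> ppc_bits);
}

int main(void)
{
        /* 4 pages per chunk -> ppc_bits == 2 */
        printf("%d\n", chunks_contiguous(0, 3, 4, 7, 2));    /* 1: adjacent */
        printf("%d\n", chunks_contiguous(0, 3, 12, 15, 2));  /* 0: gap      */
        return 0;
}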
+ */ +struct osc_extent *osc_extent_find(const struct lu_env *env, + struct osc_object *obj, pgoff_t index, + int *grants) + +{ + struct client_obd *cli = osc_cli(obj); + struct cl_lock *lock; + struct osc_extent *cur; + struct osc_extent *ext; + struct osc_extent *conflict = NULL; + struct osc_extent *found = NULL; + pgoff_t chunk; + pgoff_t max_end; + int max_pages; /* max_pages_per_rpc */ + int chunksize; + int ppc_bits; /* pages per chunk bits */ + int chunk_mask; + int rc; + ENTRY; + + cur = osc_extent_alloc(obj); + if (cur == NULL) + RETURN(ERR_PTR(-ENOMEM)); + + lock = cl_lock_at_pgoff(env, osc2cl(obj), index, NULL, 1, 0); + LASSERT(lock != NULL); + LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE); + + LASSERT(cli->cl_chunkbits >= CFS_PAGE_SHIFT); + ppc_bits = cli->cl_chunkbits - CFS_PAGE_SHIFT; + chunk_mask = ~((1 << ppc_bits) - 1); + chunksize = 1 << cli->cl_chunkbits; + chunk = index >> ppc_bits; + + /* align end to rpc edge, rpc size may not be a power 2 integer. */ + max_pages = cli->cl_max_pages_per_rpc; + LASSERT((max_pages & ~chunk_mask) == 0); + max_end = index - (index % max_pages) + max_pages - 1; + max_end = min_t(pgoff_t, max_end, lock->cll_descr.cld_end); + + /* initialize new extent by parameters so far */ + cur->oe_max_end = max_end; + cur->oe_start = index & chunk_mask; + cur->oe_end = ((index + ~chunk_mask + 1) & chunk_mask) - 1; + if (cur->oe_start < lock->cll_descr.cld_start) + cur->oe_start = lock->cll_descr.cld_start; + if (cur->oe_end > max_end) + cur->oe_end = max_end; + cur->oe_osclock = lock; + cur->oe_grants = 0; + cur->oe_mppr = max_pages; + + /* grants has been allocated by caller */ + LASSERTF(*grants >= chunksize + cli->cl_extent_tax, + "%u/%u/%u.\n", *grants, chunksize, cli->cl_extent_tax); + LASSERTF((max_end - cur->oe_start) < max_pages, EXTSTR, EXTPARA(cur)); + +restart: + osc_object_lock(obj); + ext = osc_extent_search(obj, cur->oe_start); + if (ext == NULL) + ext = first_extent(obj); + while (ext != NULL) { + loff_t ext_chk_start = ext->oe_start >> ppc_bits; + loff_t ext_chk_end = ext->oe_end >> ppc_bits; + + LASSERT(sanity_check_nolock(ext) == 0); + if (chunk > ext_chk_end + 1) + break; + + /* if covering by different locks, no chance to match */ + if (lock != ext->oe_osclock) { + EASSERTF(!overlapped(ext, cur), ext, + EXTSTR, EXTPARA(cur)); + + ext = next_extent(ext); + continue; + } + + /* discontiguous chunks? */ + if (chunk + 1 < ext_chk_start) { + ext = next_extent(ext); + continue; + } + + /* ok, from now on, ext and cur have these attrs: + * 1. covered by the same lock + * 2. contiguous at chunk level or overlapping. */ + + if (overlapped(ext, cur)) { + /* cur is the minimum unit, so overlapping means + * full contain. */ + EASSERTF((ext->oe_start <= cur->oe_start && + ext->oe_end >= cur->oe_end), + ext, EXTSTR, EXTPARA(cur)); + + if (ext->oe_state > OES_CACHE || ext->oe_fsync_wait) { + /* for simplicity, we wait for this extent to + * finish before going forward. */ + conflict = osc_extent_get(ext); + break; + } + + found = osc_extent_hold(ext); + break; + } + + /* non-overlapped extent */ + if (ext->oe_state != OES_CACHE || ext->oe_fsync_wait) { + /* we can't do anything for a non OES_CACHE extent, or + * if there is someone waiting for this extent to be + * flushed, try next one. */ + ext = next_extent(ext); + continue; + } + + /* check if they belong to the same rpc slot before trying to + * merge. the extents are not overlapped and contiguous at + * chunk level to get here. 
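osc_extent_find() seeds a brand-new extent so that it covers exactly the chunk containing the faulting index, clips it against the lock extent, and records the RPC boundary (max_end) past which the extent may never grow. A worked example of that arithmetic with made-up parameters:

#include <stdio.h>

int main(void)
{
        unsigned long index      = 37;
        int           ppc_bits   = 2;             /* 4 pages per chunk */
        unsigned long chunk_mask = ~((1UL << ppc_bits) - 1);
        unsigned long max_pages  = 16;             /* max_pages_per_rpc */

        unsigned long start   = index & chunk_mask;
        unsigned long end     = ((index + ~chunk_mask + 1) & chunk_mask) - 1;
        unsigned long max_end = index - (index % max_pages) + max_pages - 1;

        printf("index %lu -> extent [%lu -> %lu], rpc boundary %lu\n",
               index, start, end, max_end);        /* [36 -> 39], 47 */
        return 0;
}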
*/ + if (ext->oe_max_end != max_end) { + /* if they don't belong to the same RPC slot or + * max_pages_per_rpc has ever changed, do not merge. */ + ext = next_extent(ext); + continue; + } + + /* it's required that an extent must be contiguous at chunk + * level so that we know the whole extent is covered by grant + * (the pages in the extent are NOT required to be contiguous). + * Otherwise, it will be too much difficult to know which + * chunks have grants allocated. */ + + /* try to do front merge - extend ext's start */ + if (chunk + 1 == ext_chk_start) { + /* ext must be chunk size aligned */ + EASSERT((ext->oe_start & ~chunk_mask) == 0, ext); + + /* pull ext's start back to cover cur */ + ext->oe_start = cur->oe_start; + ext->oe_grants += chunksize; + *grants -= chunksize; + + found = osc_extent_hold(ext); + } else if (chunk == ext_chk_end + 1) { + /* rear merge */ + ext->oe_end = cur->oe_end; + ext->oe_grants += chunksize; + *grants -= chunksize; + + /* try to merge with the next one because we just fill + * in a gap */ + if (osc_extent_merge(env, ext, next_extent(ext)) == 0) + /* we can save extent tax from next extent */ + *grants += cli->cl_extent_tax; + + found = osc_extent_hold(ext); + } + if (found != NULL) + break; + + ext = next_extent(ext); + } + + osc_extent_tree_dump(D_CACHE, obj); + if (found != NULL) { + LASSERT(conflict == NULL); + if (!IS_ERR(found)) { + LASSERT(found->oe_osclock == cur->oe_osclock); + OSC_EXTENT_DUMP(D_CACHE, found, + "found caching ext for %lu.\n", index); + } + } else if (conflict == NULL) { + /* create a new extent */ + EASSERT(osc_extent_is_overlapped(obj, cur) == 0, cur); + cur->oe_grants = chunksize + cli->cl_extent_tax; + *grants -= cur->oe_grants; + LASSERT(*grants >= 0); + + cur->oe_state = OES_CACHE; + found = osc_extent_hold(cur); + osc_extent_insert(obj, cur); + OSC_EXTENT_DUMP(D_CACHE, cur, "add into tree %lu/%lu.\n", + index, lock->cll_descr.cld_end); + } + osc_object_unlock(obj); + + if (conflict != NULL) { + LASSERT(found == NULL); + + /* waiting for IO to finish. Please notice that it's impossible + * to be an OES_TRUNC extent. */ + rc = osc_extent_wait(env, conflict, OES_INV); + osc_extent_put(env, conflict); + conflict = NULL; + if (rc < 0) + GOTO(out, found = ERR_PTR(rc)); + + goto restart; + } + EXIT; + +out: + osc_extent_put(env, cur); + LASSERT(*grants >= 0); + return found; +} + +/** + * Called when IO is finished to an extent. + */ +int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext, + int sent, int rc) +{ + struct client_obd *cli = osc_cli(ext->oe_obj); + struct osc_async_page *oap; + struct osc_async_page *tmp; + struct osc_async_page *last = NULL; + int nr_pages = ext->oe_nr_pages; + int lost_grant = 0; + int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? 
: 4096; + ENTRY; + + OSC_EXTENT_DUMP(D_CACHE, ext, "extent finished.\n"); + + ext->oe_rc = rc ?: ext->oe_nr_pages; + EASSERT(ergo(rc == 0, ext->oe_state == OES_RPC), ext); + cfs_list_for_each_entry_safe(oap, tmp, &ext->oe_pages, + oap_pending_item) { + cfs_list_del_init(&oap->oap_rpc_item); + cfs_list_del_init(&oap->oap_pending_item); + if (last == NULL || last->oap_obj_off < oap->oap_obj_off) + last = oap; + + --ext->oe_nr_pages; + osc_ap_completion(env, cli, oap, sent, rc); + } + EASSERT(ext->oe_nr_pages == 0, ext); + + if (!sent) { + lost_grant = ext->oe_grants; + } else if (blocksize < CFS_PAGE_SIZE && + last->oap_count != CFS_PAGE_SIZE) { + /* For short writes we shouldn't count parts of pages that + * span a whole chunk on the OST side, or our accounting goes + * wrong. Should match the code in filter_grant_check. */ + int offset = oap->oap_page_off & ~CFS_PAGE_MASK; + int count = oap->oap_count + (offset & (blocksize - 1)); + int end = (offset + oap->oap_count) & (blocksize - 1); + if (end) + count += blocksize - end; + + lost_grant = CFS_PAGE_SIZE - count; + } + if (ext->oe_grants > 0) + osc_free_grant(cli, nr_pages, lost_grant); + + osc_extent_remove(ext); + /* put the refcount for RPC */ + osc_extent_put(env, ext); + RETURN(0); +} + +/** + * Wait for the extent's state to become @state. + */ +static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext, + int state) +{ + struct osc_object *obj = ext->oe_obj; + struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL); + int rc = 0; + ENTRY; + + osc_object_lock(obj); + LASSERT(sanity_check_nolock(ext) == 0); + /* `Kick' this extent only if the caller is waiting for it to be + * written out. */ + if (state == OES_INV && !ext->oe_urgent && !ext->oe_hp) { + if (ext->oe_state == OES_ACTIVE) { + ext->oe_urgent = 1; + } else if (ext->oe_state == OES_CACHE) { + ext->oe_urgent = 1; + osc_extent_hold(ext); + rc = 1; + } + } + osc_object_unlock(obj); + if (rc == 1) + osc_extent_release(env, ext); + + /* wait for the extent until its state becomes @state */ + rc = l_wait_event(ext->oe_waitq, ext->oe_state == state, &lwi); + if (rc == 0 && ext->oe_rc < 0) + rc = ext->oe_rc; + RETURN(rc); +} + +/** + * Discard pages with index greater than @size. If @ext is overlapped with + * @size, then partial truncate happens. + */ +static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index) +{ + struct cl_env_nest nest; + struct lu_env *env; + struct cl_io *io; + struct osc_object *obj = ext->oe_obj; + struct client_obd *cli = osc_cli(obj); + struct osc_async_page *oap; + struct osc_async_page *tmp; + int pages_in_chunk = 0; + int ppc_bits = cli->cl_chunkbits - CFS_PAGE_SHIFT; + __u64 trunc_chunk = trunc_index >> ppc_bits; + int grants = 0; + int nr_pages = 0; + int rc = 0; + ENTRY; + + LASSERT(sanity_check(ext) == 0); + LASSERT(ext->oe_state == OES_TRUNC); + LASSERT(!ext->oe_urgent); + + /* Request new lu_env. + * We can't use that env from osc_cache_truncate_start() because + * it's from lov_io_sub and not fully initialized. 
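The lost-grant arithmetic in osc_extent_finish() covers an OST whose block size is smaller than the client page size: for a short write only whole OST blocks count against grant, and the remainder of the page is reported back as lost grant, mirroring the server-side check. A simplified version, with a 4 KiB page and 1 KiB blocks chosen only for the example:

#include <stdio.h>

int main(void)
{
        int page_size = 4096, blocksize = 1024;
        int page_off  = 0;                       /* write starts at offset 0 */
        int count     = 1500;                    /* and covers 1500 bytes    */

        int offset = page_off & (blocksize - 1);
        int used   = count + offset;
        int tail   = (offset + count) & (blocksize - 1);

        if (tail)
                used += blocksize - tail;        /* round up to a full block */

        printf("block-rounded bytes: %d, grant lost: %d\n",
               used, page_size - used);          /* 2048 and 2048 */
        return 0;
}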
*/ + env = cl_env_nested_get(&nest); + io = &osc_env_info(env)->oti_io; + io->ci_obj = cl_object_top(osc2cl(obj)); + rc = cl_io_init(env, io, CIT_MISC, io->ci_obj); + if (rc < 0) + GOTO(out, rc); + + /* discard all pages with index greater then trunc_index */ + cfs_list_for_each_entry_safe(oap, tmp, &ext->oe_pages, + oap_pending_item) { + struct cl_page *sub = oap2cl_page(oap); + struct cl_page *page = cl_page_top(sub); + + LASSERT(cfs_list_empty(&oap->oap_rpc_item)); + + /* only discard the pages with their index greater than + * trunc_index, and ... */ + if (sub->cp_index < trunc_index) { + /* accounting how many pages remaining in the chunk + * so that we can calculate grants correctly. */ + if (sub->cp_index >> ppc_bits == trunc_chunk) + ++pages_in_chunk; + continue; + } + + cfs_list_del_init(&oap->oap_pending_item); + + cl_page_get(page); + lu_ref_add(&page->cp_reference, "truncate", cfs_current()); + + if (cl_page_own(env, io, page) == 0) { + cl_page_unmap(env, io, page); + cl_page_discard(env, io, page); + cl_page_disown(env, io, page); + } else { + LASSERT(page->cp_state == CPS_FREEING); + LASSERT(0); + } + + lu_ref_del(&page->cp_reference, "truncate", cfs_current()); + cl_page_put(env, page); + + --ext->oe_nr_pages; + ++nr_pages; + } + EASSERTF(ergo(ext->oe_start >= trunc_index, ext->oe_nr_pages == 0), + ext, "trunc_index %lu\n", trunc_index); + + osc_object_lock(obj); + if (ext->oe_nr_pages == 0) { + LASSERT(pages_in_chunk == 0); + grants = ext->oe_grants; + ext->oe_grants = 0; + } else { /* calculate how many grants we can free */ + int chunks = (ext->oe_end >> ppc_bits) - trunc_chunk; + pgoff_t last_index; + + + /* if there is no pages in this chunk, we can also free grants + * for the last chunk */ + if (pages_in_chunk == 0) { + /* if this is the 1st chunk and no pages in this chunk, + * ext->oe_nr_pages must be zero, so we should be in + * the other if-clause. */ + LASSERT(trunc_chunk > 0); + --trunc_chunk; + ++chunks; + } + + /* this is what we can free from this extent */ + grants = chunks << cli->cl_chunkbits; + ext->oe_grants -= grants; + last_index = ((trunc_chunk + 1) << ppc_bits) - 1; + ext->oe_end = min(last_index, ext->oe_max_end); + LASSERT(ext->oe_end >= ext->oe_start); + LASSERT(ext->oe_grants > 0); + } + osc_object_unlock(obj); + + if (grants > 0 || nr_pages > 0) + osc_free_grant(cli, nr_pages, grants); + +out: + cl_io_fini(env, io); + cl_env_nested_put(&nest, env); + RETURN(rc); +} + +/** + * This function is used to make the extent prepared for transfer. + * A race with flusing page - ll_writepage() has to be handled cautiously. + */ +static int osc_extent_make_ready(const struct lu_env *env, + struct osc_extent *ext) +{ + struct osc_async_page *oap; + struct osc_async_page *last = NULL; + struct osc_object *obj = ext->oe_obj; + int page_count = 0; + int rc; + ENTRY; + + /* we're going to grab page lock, so object lock must not be taken. */ + LASSERT(sanity_check(ext) == 0); + /* in locking state, any process should not touch this extent. 
*/ + EASSERT(ext->oe_state == OES_LOCKING, ext); + EASSERT(ext->oe_owner != NULL, ext); + + OSC_EXTENT_DUMP(D_CACHE, ext, "make ready\n"); + + cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) { + ++page_count; + if (last == NULL || last->oap_obj_off < oap->oap_obj_off) + last = oap; + + /* checking ASYNC_READY is race safe */ + if ((oap->oap_async_flags & ASYNC_READY) != 0) + continue; + + rc = osc_make_ready(env, oap, OBD_BRW_WRITE); + switch (rc) { + case 0: + cfs_spin_lock(&oap->oap_lock); + oap->oap_async_flags |= ASYNC_READY; + cfs_spin_unlock(&oap->oap_lock); + break; + case -EALREADY: + LASSERT((oap->oap_async_flags & ASYNC_READY) != 0); + break; + default: + LASSERTF(0, "unknown return code: %d\n", rc); + } + } + + LASSERT(page_count == ext->oe_nr_pages); + LASSERT(last != NULL); + /* the last page is the only one we need to refresh its count by + * the size of file. */ + if (!(last->oap_async_flags & ASYNC_COUNT_STABLE)) { + last->oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE); + LASSERT(last->oap_count > 0); + LASSERT(last->oap_page_off + last->oap_count <= CFS_PAGE_SIZE); + last->oap_async_flags |= ASYNC_COUNT_STABLE; + } + + /* for the rest of pages, we don't need to call osf_refresh_count() + * because it's known they are not the last page */ + cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) { + if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) { + oap->oap_count = CFS_PAGE_SIZE - oap->oap_page_off; + oap->oap_async_flags |= ASYNC_COUNT_STABLE; + } + } + + osc_object_lock(obj); + osc_extent_state_set(ext, OES_RPC); + osc_object_unlock(obj); + /* get a refcount for RPC. */ + osc_extent_get(ext); + + RETURN(0); +} + +/** + * Quick and simple version of osc_extent_find(). This function is frequently + * called to expand the extent for the same IO. To expand the extent, the + * page index must be in the same or next chunk of ext->oe_end. 
+ */ +static int osc_extent_expand(struct osc_extent *ext, pgoff_t index, int *grants) +{ + struct osc_object *obj = ext->oe_obj; + struct client_obd *cli = osc_cli(obj); + struct osc_extent *next; + int ppc_bits = cli->cl_chunkbits - CFS_PAGE_SHIFT; + pgoff_t chunk = index >> ppc_bits; + pgoff_t end_chunk; + pgoff_t end_index; + int chunksize = 1 << cli->cl_chunkbits; + int rc = 0; + ENTRY; + + LASSERT(ext->oe_max_end >= index && ext->oe_start <= index); + osc_object_lock(obj); + LASSERT(sanity_check_nolock(ext) == 0); + end_chunk = ext->oe_end >> ppc_bits; + if (chunk > end_chunk + 1) + GOTO(out, rc = -ERANGE); + + if (end_chunk >= chunk) + GOTO(out, rc = 0); + + LASSERT(end_chunk + 1 == chunk); + /* try to expand this extent to cover @index */ + end_index = min(ext->oe_max_end, ((chunk + 1) << ppc_bits) - 1); + + next = next_extent(ext); + if (next != NULL && next->oe_start <= end_index) + /* complex mode - overlapped with the next extent, + * this case will be handled by osc_extent_find() */ + GOTO(out, rc = -EAGAIN); + + ext->oe_end = end_index; + ext->oe_grants += chunksize; + *grants -= chunksize; + LASSERT(*grants >= 0); + EASSERTF(osc_extent_is_overlapped(obj, ext) == 0, ext, + "overlapped after expanding for %lu.\n", index); + EXIT; + +out: + osc_object_unlock(obj); + RETURN(rc); +} + +static void osc_extent_tree_dump0(int level, struct osc_object *obj, + const char *func, int line) +{ + struct osc_extent *ext; + int cnt; + + CDEBUG(level, "Dump object %p extents at %s:%d, mppr: %u.\n", + obj, func, line, osc_cli(obj)->cl_max_pages_per_rpc); + + /* osc_object_lock(obj); */ + cnt = 1; + for (ext = first_extent(obj); ext != NULL; ext = next_extent(ext)) + OSC_EXTENT_DUMP(level, ext, "in tree %d.\n", cnt++); + + cnt = 1; + cfs_list_for_each_entry(ext, &obj->oo_hp_exts, oe_link) + OSC_EXTENT_DUMP(level, ext, "hp %d.\n", cnt++); + + cnt = 1; + cfs_list_for_each_entry(ext, &obj->oo_urgent_exts, oe_link) + OSC_EXTENT_DUMP(level, ext, "urgent %d.\n", cnt++); + + cnt = 1; + cfs_list_for_each_entry(ext, &obj->oo_reading_exts, oe_link) + OSC_EXTENT_DUMP(level, ext, "reading %d.\n", cnt++); + /* osc_object_unlock(obj); */ +} + +/* ------------------ osc extent end ------------------ */ -static inline struct osc_page *oap2osc_page(struct osc_async_page *oap) +static inline int osc_is_ready(struct osc_object *osc) { - return (struct osc_page *)container_of(oap, struct osc_page, ops_oap); + return !cfs_list_empty(&osc->oo_ready_item) || + !cfs_list_empty(&osc->oo_hp_ready_item); } +#define OSC_IO_DEBUG(OSC, STR, args...) 
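osc_extent_expand() is the cheap path for sequential writes: it grows an extent only when the new index lands in the same chunk as oe_end or in the one immediately after it, and otherwise hands the work back to osc_extent_find() (or returns -EAGAIN when the next extent is already in the way). The distance check on its own, with invented numbers:

#include <stdio.h>

static int can_expand(unsigned long ext_end, unsigned long index, int ppc_bits)
{
        unsigned long end_chunk = ext_end >> ppc_bits;
        unsigned long chunk     = index  >> ppc_bits;

        return chunk <= end_chunk + 1;           /* same or next chunk */
}

int main(void)
{
        printf("%d %d %d\n",
               can_expand(7, 5, 2),              /* same chunk      -> 1 */
               can_expand(7, 9, 2),              /* next chunk      -> 1 */
               can_expand(7, 20, 2));            /* two chunks away -> 0 */
        return 0;
}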
\ + CDEBUG(D_CACHE, "obj %p ready %d|%c|%c wr %d|%c|%c rd %d|%c " STR, \ + (OSC), osc_is_ready(OSC), \ + list_empty_marker(&(OSC)->oo_hp_ready_item), \ + list_empty_marker(&(OSC)->oo_ready_item), \ + cfs_atomic_read(&(OSC)->oo_nr_writes), \ + list_empty_marker(&(OSC)->oo_hp_exts), \ + list_empty_marker(&(OSC)->oo_urgent_exts), \ + cfs_atomic_read(&(OSC)->oo_nr_reads), \ + list_empty_marker(&(OSC)->oo_reading_exts), \ + ##args) + static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap, int cmd) { struct osc_page *opg = oap2osc_page(oap); - struct cl_page *page = cl_page_top(opg->ops_cl.cpl_page); + struct cl_page *page = cl_page_top(oap2cl_page(oap)); int result; LASSERT(cmd == OBD_BRW_WRITE); /* no cached reads */ @@ -92,7 +1190,7 @@ static int osc_refresh_count(const struct lu_env *env, struct osc_async_page *oap, int cmd) { struct osc_page *opg = oap2osc_page(oap); - struct cl_page *page; + struct cl_page *page = oap2cl_page(oap); struct cl_object *obj; struct cl_attr *attr = &osc_env_info(env)->oti_attr; @@ -102,7 +1200,6 @@ static int osc_refresh_count(const struct lu_env *env, /* readpage queues with _COUNT_STABLE, shouldn't get here. */ LASSERT(!(cmd & OBD_BRW_READ)); LASSERT(opg != NULL); - page = opg->ops_cl.cpl_page; obj = opg->ops_cl.cpl_obj; cl_object_attr_lock(obj); @@ -122,10 +1219,10 @@ static int osc_refresh_count(const struct lu_env *env, } static int osc_completion(const struct lu_env *env, struct osc_async_page *oap, - int cmd, struct obdo *oa, int rc) + int cmd, int rc) { struct osc_page *opg = oap2osc_page(oap); - struct cl_page *page = cl_page_top(opg->ops_cl.cpl_page); + struct cl_page *page = cl_page_top(oap2cl_page(oap)); struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj); enum cl_req_type crt; int srvlock; @@ -145,11 +1242,6 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap, cl_req_page_done(env, page); LASSERT(page->cp_req == NULL); - /* As the transfer for this page is being done, clear the flags */ - cfs_spin_lock(&oap->oap_lock); - oap->oap_async_flags = 0; - cfs_spin_unlock(&oap->oap_lock); - crt = cmd == OBD_BRW_READ ? CRT_READ : CRT_WRITE; /* Clear opg->ops_transfer_pinned before VM lock is released. */ opg->ops_transfer_pinned = 0; @@ -158,6 +1250,7 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap, LASSERT(opg->ops_submitter != NULL); LASSERT(!cfs_list_empty(&opg->ops_inflight)); cfs_list_del_init(&opg->ops_inflight); + opg->ops_submitter = NULL; cfs_spin_unlock(&obj->oo_seatbelt); opg->ops_submit_time = 0; @@ -192,6 +1285,17 @@ static int osc_completion(const struct lu_env *env, struct osc_async_page *oap, RETURN(0); } +#define OSC_DUMP_GRANT(cli, fmt, args...) 
do { \ + struct client_obd *__tmp = (cli); \ + CDEBUG(D_CACHE, "%s: { dirty: %ld/%ld dirty_pages: %d/%d " \ + "dropped: %ld avail: %ld, reserved: %ld, flight: %d } " fmt, \ + __tmp->cl_import->imp_obd->obd_name, \ + __tmp->cl_dirty, __tmp->cl_dirty_max, \ + cfs_atomic_read(&obd_dirty_pages), obd_max_dirty_pages, \ + __tmp->cl_lost_grant, __tmp->cl_avail_grant, \ + __tmp->cl_reserved_grant, __tmp->cl_w_in_flight, ##args); \ +} while (0) + /* caller must hold loi_list_lock */ static void osc_consume_write_grant(struct client_obd *cli, struct brw_page *pga) @@ -200,20 +1304,17 @@ static void osc_consume_write_grant(struct client_obd *cli, LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT)); cfs_atomic_inc(&obd_dirty_pages); cli->cl_dirty += CFS_PAGE_SIZE; - cli->cl_avail_grant -= CFS_PAGE_SIZE; pga->flag |= OBD_BRW_FROM_GRANT; CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n", CFS_PAGE_SIZE, pga, pga->pg); - LASSERT(cli->cl_avail_grant >= 0); osc_update_next_shrink(cli); } /* the companion to osc_consume_write_grant, called when a brw has completed. * must be called with the loi lock held. */ static void osc_release_write_grant(struct client_obd *cli, - struct brw_page *pga, int sent) + struct brw_page *pga) { - int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096; ENTRY; LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock); @@ -230,72 +1331,132 @@ static void osc_release_write_grant(struct client_obd *cli, cfs_atomic_dec(&obd_dirty_transit_pages); cli->cl_dirty_transit -= CFS_PAGE_SIZE; } - if (!sent) { - /* Reclaim grant from truncated pages. This is used to solve - * write-truncate and grant all gone(to lost_grant) problem. - * For a vfs write this problem can be easily solved by a sync - * write, however, this is not an option for page_mkwrite() - * because grant has to be allocated before a page becomes - * dirty. */ - if (cli->cl_avail_grant < PTLRPC_MAX_BRW_SIZE) - cli->cl_avail_grant += CFS_PAGE_SIZE; - else - cli->cl_lost_grant += CFS_PAGE_SIZE; - CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n", - cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty); - } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) { - /* For short writes we shouldn't count parts of pages that - * span a whole block on the OST side, or our accounting goes - * wrong. Should match the code in filter_grant_check. */ - int offset = pga->off & ~CFS_PAGE_MASK; - int count = pga->count + (offset & (blocksize - 1)); - int end = (offset + pga->count) & (blocksize - 1); - if (end) - count += blocksize - end; + EXIT; +} + +/** + * To avoid sleeping with object lock held, it's good for us allocate enough + * grants before entering into critical section. + * + * client_obd_list_lock held by caller + */ +static int osc_reserve_grant(struct client_obd *cli, unsigned int bytes) +{ + int rc = -EDQUOT; - cli->cl_lost_grant += CFS_PAGE_SIZE - count; - CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n", - CFS_PAGE_SIZE - count, cli->cl_lost_grant, - cli->cl_avail_grant, cli->cl_dirty); + if (cli->cl_avail_grant >= bytes) { + cli->cl_avail_grant -= bytes; + cli->cl_reserved_grant += bytes; + rc = 0; } + return rc; +} - EXIT; +static void __osc_unreserve_grant(struct client_obd *cli, + unsigned int reserved, unsigned int unused) +{ + /* it's quite normal for us to get more grant than reserved. + * Thinking about a case that two extents merged by adding a new + * chunk, we can save one extent tax. 
If extent tax is greater than + * one chunk, we can save more grant by adding a new chunk */ + cli->cl_reserved_grant -= reserved; + if (unused > reserved) { + cli->cl_avail_grant += reserved; + cli->cl_lost_grant += unused - reserved; + } else { + cli->cl_avail_grant += unused; + } + if (unused > 0) + osc_wake_cache_waiters(cli); +} + +void osc_unreserve_grant(struct client_obd *cli, + unsigned int reserved, unsigned int unused) +{ + client_obd_list_lock(&cli->cl_loi_list_lock); + __osc_unreserve_grant(cli, reserved, unused); + client_obd_list_unlock(&cli->cl_loi_list_lock); +} + +/** + * Free grant after IO is finished or canceled. + * + * @lost_grant is used to remember how many grants we have allocated but not + * used, we should return these grants to OST. There're two cases where grants + * can be lost: + * 1. truncate; + * 2. blocksize at OST is less than CFS_PAGE_SIZE and a partial page was + * written. In this case OST may use less chunks to serve this partial + * write. OSTs don't actually know the page size on the client side. so + * clients have to calculate lost grant by the blocksize on the OST. + * See filter_grant_check() for details. + */ +static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages, + unsigned int lost_grant) +{ + int grant = (1 << cli->cl_chunkbits) + cli->cl_extent_tax; + + client_obd_list_lock(&cli->cl_loi_list_lock); + cfs_atomic_sub(nr_pages, &obd_dirty_pages); + cli->cl_dirty -= nr_pages << CFS_PAGE_SHIFT; + cli->cl_lost_grant += lost_grant; + if (cli->cl_avail_grant < grant && cli->cl_lost_grant >= grant) { + /* borrow some grant from truncate to avoid the case that + * truncate uses up all avail grant */ + cli->cl_lost_grant -= grant; + cli->cl_avail_grant += grant; + } + osc_wake_cache_waiters(cli); + client_obd_list_unlock(&cli->cl_loi_list_lock); + CDEBUG(D_CACHE, "lost %u grant: %lu avail: %lu dirty: %lu\n", + lost_grant, cli->cl_lost_grant, + cli->cl_avail_grant, cli->cl_dirty); } /* The companion to osc_enter_cache(), called when @oap is no longer part of * the dirty accounting. Writeback completes or truncate happens before * writing starts. Must be called with the loi lock held. */ -static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap, - int sent) +static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap) { - osc_release_write_grant(cli, &oap->oap_brw_page, sent); + osc_release_write_grant(cli, &oap->oap_brw_page); } /** * Non-blocking version of osc_enter_cache() that consumes grant only when it * is available. */ -static int osc_enter_cache_try(const struct lu_env *env, struct client_obd *cli, - struct osc_async_page *oap, int transient) +static int osc_enter_cache_try(struct client_obd *cli, + struct osc_async_page *oap, + int bytes, int transient) { - int has_grant; + int rc; - has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE; - if (has_grant) { + OSC_DUMP_GRANT(cli, "need:%d.\n", bytes); + + rc = osc_reserve_grant(cli, bytes); + if (rc < 0) + return 0; + + if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max && + cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) { osc_consume_write_grant(cli, &oap->oap_brw_page); if (transient) { cli->cl_dirty_transit += CFS_PAGE_SIZE; cfs_atomic_inc(&obd_dirty_transit_pages); oap->oap_brw_flags |= OBD_BRW_NOCACHE; } + rc = 1; + } else { + __osc_unreserve_grant(cli, bytes, bytes); + rc = 0; } - return has_grant; + return rc; } /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for * grant or cache space. 
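Grant handling now works in two steps: osc_enter_cache_try() first reserves the worst-case amount an extent may need (osc_reserve_grant()) and later hands back whatever turned out to be unused through __osc_unreserve_grant(). A toy model of that accounting, with shortened field names and invented numbers:

#include <stdio.h>

struct cli {
        long avail;                              /* cl_avail_grant    */
        long reserved;                           /* cl_reserved_grant */
        long lost;                               /* cl_lost_grant     */
};

static int reserve(struct cli *c, long bytes)
{
        if (c->avail < bytes)
                return -1;                       /* -EDQUOT in the real code */
        c->avail    -= bytes;
        c->reserved += bytes;
        return 0;
}

static void unreserve(struct cli *c, long reserved, long unused)
{
        c->reserved -= reserved;
        if (unused > reserved) {                 /* gave back more than held */
                c->avail += reserved;
                c->lost  += unused - reserved;
        } else {
                c->avail += unused;
        }
}

int main(void)
{
        struct cli c = { .avail = 1 << 20 };

        if (reserve(&c, 65536) == 0)             /* say, chunk plus extent tax */
                unreserve(&c, 65536, 4096);      /* most of it was used */
        printf("avail %ld reserved %ld lost %ld\n",
               c.avail, c.reserved, c.lost);
        return 0;
}

In the real code, returning unused grant also calls osc_wake_cache_waiters(), since the freed grant may unblock a writer sleeping in osc_enter_cache().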
*/ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli, - struct osc_async_page *oap) + struct osc_async_page *oap, int bytes) { struct osc_object *osc = oap->oap_obj; struct lov_oinfo *loi = osc->oo_oinfo; @@ -304,23 +1465,20 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli, int rc = -EDQUOT; ENTRY; - CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu " - "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages), - cli->cl_dirty_max, obd_max_dirty_pages, - cli->cl_lost_grant, cli->cl_avail_grant); + OSC_DUMP_GRANT(cli, "need:%d.\n", bytes); + + client_obd_list_lock(&cli->cl_loi_list_lock); /* force the caller to try sync io. this can jump the list * of queued writes and create a discontiguous rpc stream */ if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) || cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync) - RETURN(-EDQUOT); + GOTO(out, rc = -EDQUOT); /* Hopefully normal case - cache space and write credits available */ - if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max && - cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages && - osc_enter_cache_try(env, cli, oap, 0)) - RETURN(0); + if (osc_enter_cache_try(cli, oap, bytes, 0)) + GOTO(out, rc = 0); /* We can get here for two reasons: too many dirty pages in cache, or * run out of grants. In both cases we should write dirty pages out. @@ -329,19 +1487,20 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli, * The exiting condition is no avail grants and no dirty pages caching, * that really means there is no space on the OST. */ cfs_waitq_init(&ocw.ocw_waitq); - ocw.ocw_oap = oap; - while (cli->cl_dirty > 0) { + ocw.ocw_oap = oap; + ocw.ocw_grant = bytes; + while (cli->cl_dirty > 0 || cli->cl_w_in_flight > 0) { cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters); ocw.ocw_rc = 0; + client_obd_list_unlock(&cli->cl_loi_list_lock); osc_io_unplug(env, cli, osc, PDL_POLICY_ROUND); - client_obd_list_unlock(&cli->cl_loi_list_lock); CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n", cli->cl_import->imp_obd->obd_name, &ocw, oap); - rc = l_wait_event(ocw.ocw_waitq, cfs_list_empty(&ocw.ocw_entry), - &lwi); + rc = l_wait_event(ocw.ocw_waitq, + cfs_list_empty(&ocw.ocw_entry), &lwi); client_obd_list_lock(&cli->cl_loi_list_lock); cfs_list_del_init(&ocw.ocw_entry); @@ -351,8 +1510,16 @@ static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli, rc = ocw.ocw_rc; if (rc != -EDQUOT) break; + if (osc_enter_cache_try(cli, oap, bytes, 0)) { + rc = 0; + break; + } } + EXIT; +out: + client_obd_list_unlock(&cli->cl_loi_list_lock); + OSC_DUMP_GRANT(cli, "returned %d.\n", rc); RETURN(rc); } @@ -366,8 +1533,8 @@ void osc_wake_cache_waiters(struct client_obd *cli) cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) { /* if we can't dirty more, we must wait until some is written */ if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) || - (cfs_atomic_read(&obd_dirty_pages) + 1 > - obd_max_dirty_pages)) { + (cfs_atomic_read(&obd_dirty_pages) + 1 > + obd_max_dirty_pages)) { CDEBUG(D_CACHE, "no dirty room: dirty: %ld " "osc max %ld, sys max %d\n", cli->cl_dirty, cli->cl_dirty_max, obd_max_dirty_pages); @@ -384,17 +1551,13 @@ void osc_wake_cache_waiters(struct client_obd *cli) ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry); cfs_list_del_init(&ocw->ocw_entry); - if (cli->cl_avail_grant < CFS_PAGE_SIZE) { - /* no more RPCs in flight to return grant, do sync IO */ + + 
ocw->ocw_rc = 0; + if (!osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant, 0)) ocw->ocw_rc = -EDQUOT; - CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap); - } else { - osc_consume_write_grant(cli, - &ocw->ocw_oap->oap_brw_page); - } - CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld\n", - ocw, ocw->ocw_oap, cli->cl_avail_grant); + CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld, %d\n", + ocw, ocw->ocw_oap, cli->cl_avail_grant, ocw->ocw_rc); cfs_waitq_signal(&ocw->ocw_waitq); } @@ -404,21 +1567,7 @@ void osc_wake_cache_waiters(struct client_obd *cli) static int osc_max_rpc_in_flight(struct client_obd *cli, struct osc_object *osc) { - struct osc_async_page *oap; - int hprpc = 0; - - if (!cfs_list_empty(&osc->oo_write_pages.oop_urgent)) { - oap = cfs_list_entry(osc->oo_write_pages.oop_urgent.next, - struct osc_async_page, oap_urgent_item); - hprpc = !!(oap->oap_async_flags & ASYNC_HP); - } - - if (!hprpc && !cfs_list_empty(&osc->oo_read_pages.oop_urgent)) { - oap = cfs_list_entry(osc->oo_read_pages.oop_urgent.next, - struct osc_async_page, oap_urgent_item); - hprpc = !!(oap->oap_async_flags & ASYNC_HP); - } - + int hprpc = !!cfs_list_empty(&osc->oo_hp_exts); return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc; } @@ -428,35 +1577,31 @@ static int osc_max_rpc_in_flight(struct client_obd *cli, struct osc_object *osc) static int osc_makes_rpc(struct client_obd *cli, struct osc_object *osc, int cmd) { - struct osc_oap_pages *lop; + int invalid_import = 0; ENTRY; - if (cmd & OBD_BRW_WRITE) { - lop = &osc->oo_write_pages; - } else { - lop = &osc->oo_read_pages; - } - - if (lop->oop_num_pending == 0) - RETURN(0); - /* if we have an invalid import we want to drain the queued pages * by forcing them through rpcs that immediately fail and complete * the pages. recovery relies on this to empty the queued pages * before canceling the locks and evicting down the llite pages */ - if (cli->cl_import == NULL || cli->cl_import->imp_invalid) - RETURN(1); - - /* stream rpcs in queue order as long as as there is an urgent page - * queued. this is our cheap solution for good batching in the case - * where writepage marks some random page in the middle of the file - * as urgent because of, say, memory pressure */ - if (!cfs_list_empty(&lop->oop_urgent)) { - CDEBUG(D_CACHE, "urgent request forcing RPC\n"); - RETURN(1); - } + if ((cli->cl_import == NULL || cli->cl_import->imp_invalid)) + invalid_import = 1; if (cmd & OBD_BRW_WRITE) { + if (cfs_atomic_read(&osc->oo_nr_writes) == 0) + RETURN(0); + if (invalid_import) { + CDEBUG(D_CACHE, "invalid import forcing RPC\n"); + RETURN(1); + } + if (!cfs_list_empty(&osc->oo_hp_exts)) { + CDEBUG(D_CACHE, "high prio request forcing RPC\n"); + RETURN(1); + } + if (!cfs_list_empty(&osc->oo_urgent_exts)) { + CDEBUG(D_CACHE, "urgent request forcing RPC\n"); + RETURN(1); + } /* trigger a write rpc stream as long as there are dirtiers * waiting for space. as they're waiting, they're not going to * create more pages to coalesce with what's waiting.. */ @@ -464,40 +1609,42 @@ static int osc_makes_rpc(struct client_obd *cli, struct osc_object *osc, CDEBUG(D_CACHE, "cache waiters forcing RPC\n"); RETURN(1); } + if (cfs_atomic_read(&osc->oo_nr_writes) >= + cli->cl_max_pages_per_rpc) + RETURN(1); + } else { + if (cfs_atomic_read(&osc->oo_nr_reads) == 0) + RETURN(0); + if (invalid_import) { + CDEBUG(D_CACHE, "invalid import forcing RPC\n"); + RETURN(1); + } + /* all read are urgent. 
*/ + if (!cfs_list_empty(&osc->oo_reading_exts)) + RETURN(1); } - if (lop->oop_num_pending >= cli->cl_max_pages_per_rpc) - RETURN(1); RETURN(0); } -static void lop_update_pending(struct client_obd *cli, - struct osc_oap_pages *lop, int cmd, int delta) +static void osc_update_pending(struct osc_object *obj, int cmd, int delta) { - lop->oop_num_pending += delta; - if (cmd & OBD_BRW_WRITE) - cli->cl_pending_w_pages += delta; - else - cli->cl_pending_r_pages += delta; + struct client_obd *cli = osc_cli(obj); + if (cmd & OBD_BRW_WRITE) { + cfs_atomic_add(delta, &obj->oo_nr_writes); + cfs_atomic_add(delta, &cli->cl_pending_w_pages); + LASSERT(cfs_atomic_read(&obj->oo_nr_writes) >= 0); + } else { + cfs_atomic_add(delta, &obj->oo_nr_reads); + cfs_atomic_add(delta, &cli->cl_pending_r_pages); + LASSERT(cfs_atomic_read(&obj->oo_nr_reads) >= 0); + } + OSC_IO_DEBUG(obj, "update pending cmd %d delta %d.\n", cmd, delta); } -static int osc_makes_hprpc(struct osc_oap_pages *lop) +static int osc_makes_hprpc(struct osc_object *obj) { - struct osc_async_page *oap; - ENTRY; - - if (cfs_list_empty(&lop->oop_urgent)) - RETURN(0); - - oap = cfs_list_entry(lop->oop_urgent.next, - struct osc_async_page, oap_urgent_item); - - if (oap->oap_async_flags & ASYNC_HP) { - CDEBUG(D_CACHE, "hp request forcing RPC\n"); - RETURN(1); - } - - RETURN(0); + return !cfs_list_empty(&obj->oo_hp_exts); } static void on_list(cfs_list_t *item, cfs_list_t *list, int should_be_on) @@ -510,10 +1657,9 @@ static void on_list(cfs_list_t *item, cfs_list_t *list, int should_be_on) /* maintain the osc's cli list membership invariants so that osc_send_oap_rpc * can find pages to build into rpcs quickly */ -static void osc_list_maint(struct client_obd *cli, struct osc_object *osc) +static int __osc_list_maint(struct client_obd *cli, struct osc_object *osc) { - if (osc_makes_hprpc(&osc->oo_write_pages) || - osc_makes_hprpc(&osc->oo_read_pages)) { + if (osc_makes_hprpc(osc)) { /* HP rpc */ on_list(&osc->oo_ready_item, &cli->cl_loi_ready_list, 0); on_list(&osc->oo_hp_ready_item, &cli->cl_loi_hp_ready_list, 1); @@ -525,10 +1671,23 @@ static void osc_list_maint(struct client_obd *cli, struct osc_object *osc) } on_list(&osc->oo_write_item, &cli->cl_loi_write_list, - osc->oo_write_pages.oop_num_pending); + cfs_atomic_read(&osc->oo_nr_writes) > 0); on_list(&osc->oo_read_item, &cli->cl_loi_read_list, - osc->oo_read_pages.oop_num_pending); + cfs_atomic_read(&osc->oo_nr_reads) > 0); + + return osc_is_ready(osc); +} + +static int osc_list_maint(struct client_obd *cli, struct osc_object *osc) +{ + int is_ready; + + client_obd_list_lock(&cli->cl_loi_list_lock); + is_ready = __osc_list_maint(cli, osc); + client_obd_list_unlock(&cli->cl_loi_list_lock); + + return is_ready; } /* this is trying to propogate async writeback errors back up to the @@ -553,29 +1712,10 @@ static void osc_process_ar(struct osc_async_rc *ar, __u64 xid, ar->ar_force_sync = 0; } -static void osc_oap_to_pending(struct osc_async_page *oap) -{ - struct osc_object *osc = oap->oap_obj; - struct osc_oap_pages *lop; - - if (oap->oap_cmd & OBD_BRW_WRITE) - lop = &osc->oo_write_pages; - else - lop = &osc->oo_read_pages; - - if (oap->oap_async_flags & ASYNC_HP) - cfs_list_add(&oap->oap_urgent_item, &lop->oop_urgent); - else if (oap->oap_async_flags & ASYNC_URGENT) - cfs_list_add_tail(&oap->oap_urgent_item, &lop->oop_urgent); - cfs_list_add_tail(&oap->oap_pending_item, &lop->oop_pending); - lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1); -} - /* this must be called holding the loi list lock 
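For writes, osc_makes_rpc() now fires when the import is invalid, when high-priority or urgent extents are queued, when cache waiters are blocked on grant, or when enough dirty pages have accumulated for a full RPC. The same decision expressed as a standalone predicate over a hypothetical state structure:

#include <stdbool.h>
#include <stdio.h>

struct osc_state {
        bool import_invalid;
        bool have_hp_exts;
        bool have_urgent_exts;
        bool have_cache_waiters;
        int  nr_writes;
        int  max_pages_per_rpc;
};

static bool write_makes_rpc(const struct osc_state *s)
{
        if (s->nr_writes == 0)
                return false;
        if (s->import_invalid)                   /* drain queued pages */
                return true;
        if (s->have_hp_exts || s->have_urgent_exts)
                return true;
        if (s->have_cache_waiters)               /* return grant quickly */
                return true;
        return s->nr_writes >= s->max_pages_per_rpc;
}

int main(void)
{
        struct osc_state s = { false, false, true, false, 3, 256 };

        printf("%s\n", write_makes_rpc(&s) ? "send RPC" : "keep caching");
        return 0;
}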
to give coverage to exit_cache, * async_flag maintenance, and oap_request */ -void osc_ap_completion(const struct lu_env *env, struct client_obd *cli, - struct obdo *oa, struct osc_async_page *oap, - int sent, int rc) +static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli, + struct osc_async_page *oap, int sent, int rc) { struct osc_object *osc = oap->oap_obj; struct lov_oinfo *loi = osc->oo_oinfo; @@ -588,38 +1728,205 @@ void osc_ap_completion(const struct lu_env *env, struct client_obd *cli, oap->oap_request = NULL; } - cfs_spin_lock(&oap->oap_lock); - oap->oap_async_flags = 0; - cfs_spin_unlock(&oap->oap_lock); - oap->oap_interrupted = 0; + /* As the transfer for this page is being done, clear the flags */ + cfs_spin_lock(&oap->oap_lock); + oap->oap_async_flags = 0; + cfs_spin_unlock(&oap->oap_lock); + oap->oap_interrupted = 0; + + if (oap->oap_cmd & OBD_BRW_WRITE && xid > 0) { + client_obd_list_lock(&cli->cl_loi_list_lock); + osc_process_ar(&cli->cl_ar, xid, rc); + osc_process_ar(&loi->loi_ar, xid, rc); + client_obd_list_unlock(&cli->cl_loi_list_lock); + } + + rc = osc_completion(env, oap, oap->oap_cmd, rc); + if (rc) + CERROR("completion on oap %p obj %p returns %d.\n", + oap, osc, rc); + + EXIT; +} + +/** + * Try to add extent to one RPC. We need to think about the following things: + * - # of pages must not be over max_pages_per_rpc + * - extent must be compatible with previous ones + */ +static int try_to_add_extent_for_io(struct client_obd *cli, + struct osc_extent *ext, cfs_list_t *rpclist, + int *pc, unsigned int *max_pages) +{ + struct osc_extent *tmp; + ENTRY; + + EASSERT((ext->oe_state == OES_CACHE || ext->oe_state == OES_LOCK_DONE), + ext); + + *max_pages = max(ext->oe_mppr, *max_pages); + if (*pc + ext->oe_nr_pages > *max_pages) + RETURN(0); + + cfs_list_for_each_entry(tmp, rpclist, oe_link) { + EASSERT(tmp->oe_owner == cfs_current(), tmp); +#if 0 + if (overlapped(tmp, ext)) { + OSC_EXTENT_DUMP(D_ERROR, tmp, "overlapped %p.\n", ext); + EASSERT(0, ext); + } +#endif + + if (tmp->oe_srvlock != ext->oe_srvlock || + !tmp->oe_grants != !ext->oe_grants) + RETURN(0); + + /* remove break for strict check */ + break; + } + + *pc += ext->oe_nr_pages; + cfs_list_move_tail(&ext->oe_link, rpclist); + ext->oe_owner = cfs_current(); + RETURN(1); +} + +/** + * In order to prevent multiple ptlrpcd from breaking contiguous extents, + * get_write_extent() takes all appropriate extents in atomic. + * + * The following policy is used to collect extents for IO: + * 1. Add as many HP extents as possible; + * 2. Add the first urgent extent in urgent extent list and take it out of + * urgent list; + * 3. Add subsequent extents of this urgent extent; + * 4. If urgent list is not empty, goto 2; + * 5. Traverse the extent tree from the 1st extent; + * 6. Above steps exit if there is no space in this RPC. 
+ */ +static int get_write_extents(struct osc_object *obj, cfs_list_t *rpclist) +{ + struct client_obd *cli = osc_cli(obj); + struct osc_extent *ext; + int page_count = 0; + unsigned int max_pages = cli->cl_max_pages_per_rpc; + + LASSERT(osc_object_is_locked(obj)); + while (!cfs_list_empty(&obj->oo_hp_exts)) { + ext = cfs_list_entry(obj->oo_hp_exts.next, struct osc_extent, + oe_link); + LASSERT(ext->oe_state == OES_CACHE); + if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count, + &max_pages)) + return page_count; + EASSERT(ext->oe_nr_pages <= max_pages, ext); + } + if (page_count == max_pages) + return page_count; + + while (!cfs_list_empty(&obj->oo_urgent_exts)) { + ext = cfs_list_entry(obj->oo_urgent_exts.next, + struct osc_extent, oe_link); + if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count, + &max_pages)) + return page_count; + + if (!ext->oe_intree) + continue; + + while ((ext = next_extent(ext)) != NULL) { + if ((ext->oe_state != OES_CACHE) || + (!cfs_list_empty(&ext->oe_link) && + ext->oe_owner != NULL)) + continue; + + if (!try_to_add_extent_for_io(cli, ext, rpclist, + &page_count, &max_pages)) + return page_count; + } + } + if (page_count == max_pages) + return page_count; + + ext = first_extent(obj); + while (ext != NULL) { + if ((ext->oe_state != OES_CACHE) || + /* this extent may be already in current rpclist */ + (!cfs_list_empty(&ext->oe_link) && ext->oe_owner != NULL)) { + ext = next_extent(ext); + continue; + } + + if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count, + &max_pages)) + return page_count; + + ext = next_extent(ext); + } + return page_count; +} + +static int +osc_send_write_rpc(const struct lu_env *env, struct client_obd *cli, + struct osc_object *osc, pdl_policy_t pol) +{ + CFS_LIST_HEAD(rpclist); + struct osc_extent *ext; + struct osc_extent *tmp; + struct osc_extent *first = NULL; + obd_count page_count = 0; + int srvlock = 0; + int rc = 0; + ENTRY; + + LASSERT(osc_object_is_locked(osc)); + + page_count = get_write_extents(osc, &rpclist); + LASSERT(equi(page_count == 0, cfs_list_empty(&rpclist))); - if (oap->oap_cmd & OBD_BRW_WRITE) { - osc_process_ar(&cli->cl_ar, xid, rc); - osc_process_ar(&loi->loi_ar, xid, rc); + if (cfs_list_empty(&rpclist)) + RETURN(0); + + osc_update_pending(osc, OBD_BRW_WRITE, -page_count); + + cfs_list_for_each_entry(ext, &rpclist, oe_link) { + LASSERT(ext->oe_state == OES_CACHE || + ext->oe_state == OES_LOCK_DONE); + if (ext->oe_state == OES_CACHE) + osc_extent_state_set(ext, OES_LOCKING); + else + osc_extent_state_set(ext, OES_RPC); } - if (rc == 0 && oa != NULL) { - if (oa->o_valid & OBD_MD_FLBLOCKS) - loi->loi_lvb.lvb_blocks = oa->o_blocks; - if (oa->o_valid & OBD_MD_FLMTIME) - loi->loi_lvb.lvb_mtime = oa->o_mtime; - if (oa->o_valid & OBD_MD_FLATIME) - loi->loi_lvb.lvb_atime = oa->o_atime; - if (oa->o_valid & OBD_MD_FLCTIME) - loi->loi_lvb.lvb_ctime = oa->o_ctime; + /* we're going to grab page lock, so release object lock because + * lock order is page lock -> object lock. 
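The state changes made while assembling the write RPC follow a small state machine. The sketch below is a standalone illustration of an assertion-style transition check; the OES_* names mirror the new enum, but the transition table here is a simplified example, not the authoritative set enforced by the patch.

#include <assert.h>
#include <stdio.h>

enum oes {
        OES_INV, OES_ACTIVE, OES_CACHE, OES_LOCKING,
        OES_LOCK_DONE, OES_RPC, OES_TRUNC, OES_MAX
};

/* simplified legal-transition table: allowed[from][to] */
static const int allowed[OES_MAX][OES_MAX] = {
        [OES_INV]       = { [OES_ACTIVE] = 1 },
        [OES_ACTIVE]    = { [OES_CACHE] = 1, [OES_TRUNC] = 1 },
        [OES_CACHE]     = { [OES_LOCKING] = 1, [OES_TRUNC] = 1 },
        [OES_LOCKING]   = { [OES_LOCK_DONE] = 1, [OES_INV] = 1 },
        [OES_LOCK_DONE] = { [OES_RPC] = 1 },
        [OES_RPC]       = { [OES_CACHE] = 1, [OES_INV] = 1 },
        [OES_TRUNC]     = { [OES_CACHE] = 1, [OES_INV] = 1 },
};

static void state_set(enum oes *cur, enum oes next)
{
        assert(allowed[*cur][next]);    /* catch illegal transitions early */
        *cur = next;
}

int main(void)
{
        enum oes st = OES_CACHE;

        state_set(&st, OES_LOCKING);    /* cache -> locking  */
        state_set(&st, OES_LOCK_DONE);  /* pages made ready  */
        state_set(&st, OES_RPC);        /* extent in flight  */
        printf("final state %d\n", st);
        return 0;
}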
*/ + osc_object_unlock(osc); + + cfs_list_for_each_entry_safe(ext, tmp, &rpclist, oe_link) { + if (ext->oe_state == OES_LOCKING) { + rc = osc_extent_make_ready(env, ext); + if (unlikely(rc < 0)) { + cfs_list_del_init(&ext->oe_link); + osc_extent_finish(env, ext, 0, rc); + continue; + } + } + if (first == NULL) { + first = ext; + srvlock = ext->oe_srvlock; + } else { + LASSERT(srvlock == ext->oe_srvlock); + } } - rc = osc_completion(env, oap, oap->oap_cmd, oa, rc); + if (!cfs_list_empty(&rpclist)) { + LASSERT(page_count > 0); + rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_WRITE, pol); + LASSERT(cfs_list_empty(&rpclist)); + } - /* cl_page_completion() drops PG_locked. so, a new I/O on the page could - * start, but OSC calls it under lock and thus we can add oap back to - * pending safely */ - if (rc) - /* upper layer wants to leave the page on pending queue */ - osc_oap_to_pending(oap); - else - osc_exit_cache(cli, oap, sent); - EXIT; + osc_object_lock(osc); + RETURN(rc); } /** @@ -633,193 +1940,48 @@ void osc_ap_completion(const struct lu_env *env, struct client_obd *cli, * \return negative on errors. */ static int -osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, - struct osc_object *osc, int cmd, - struct osc_oap_pages *lop, pdl_policy_t pol) +osc_send_read_rpc(const struct lu_env *env, struct client_obd *cli, + struct osc_object *osc, pdl_policy_t pol) { - obd_count page_count = 0; - struct osc_async_page *oap = NULL, *tmp; - CFS_LIST_HEAD(rpc_list); - int srvlock = 0, mem_tight = 0; - obd_off starting_offset = OBD_OBJECT_EOF; - unsigned int ending_offset; - int starting_page_off = 0; - int rc; + struct osc_extent *ext; + struct osc_extent *next; + CFS_LIST_HEAD(rpclist); + int page_count = 0; + unsigned int max_pages = cli->cl_max_pages_per_rpc; + int rc = 0; ENTRY; - /* ASYNC_HP pages first. At present, when the lock the pages is - * to be canceled, the pages covered by the lock will be sent out - * with ASYNC_HP. We have to send out them as soon as possible. */ - cfs_list_for_each_entry_safe(oap, tmp, &lop->oop_urgent, oap_urgent_item) { - if (oap->oap_async_flags & ASYNC_HP) - cfs_list_move(&oap->oap_pending_item, &rpc_list); - else if (!(oap->oap_brw_flags & OBD_BRW_SYNC)) - /* only do this for writeback pages. */ - cfs_list_move_tail(&oap->oap_pending_item, &rpc_list); - if (++page_count >= cli->cl_max_pages_per_rpc) - break; - } - cfs_list_splice_init(&rpc_list, &lop->oop_pending); - page_count = 0; - - /* first we find the pages we're allowed to work with */ - cfs_list_for_each_entry_safe(oap, tmp, &lop->oop_pending, - oap_pending_item) { - LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, " - "magic 0x%x\n", oap, oap->oap_magic); - - if (page_count != 0 && - srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) { - CDEBUG(D_PAGE, "SRVLOCK flag mismatch," - " oap %p, page %p, srvlock %u\n", - oap, oap->oap_brw_page.pg, (unsigned)!srvlock); - break; - } - - /* If there is a gap at the start of this page, it can't merge - * with any previous page, so we'll hand the network a - * "fragmented" page array that it can't transfer in 1 RDMA */ - if (oap->oap_obj_off < starting_offset) { - if (starting_page_off != 0) - break; - - starting_page_off = oap->oap_page_off; - starting_offset = oap->oap_obj_off + starting_page_off; - } else if (oap->oap_page_off != 0) - break; - - /* in llite being 'ready' equates to the page being locked - * until completion unlocks it. 
commit_write submits a page - * as not ready because its unlock will happen unconditionally - * as the call returns. if we race with commit_write giving - * us that page we don't want to create a hole in the page - * stream, so we stop and leave the rpc to be fired by - * another dirtier or kupdated interval (the not ready page - * will still be on the dirty list). we could call in - * at the end of ll_file_write to process the queue again. */ - if (!(oap->oap_async_flags & ASYNC_READY)) { - int rc = osc_make_ready(env, oap, cmd); - if (rc < 0) - CDEBUG(D_INODE, "oap %p page %p returned %d " - "instead of ready\n", oap, - oap->oap_page, rc); - switch (rc) { - case -EAGAIN: - /* llite is telling us that the page is still - * in commit_write and that we should try - * and put it in an rpc again later. we - * break out of the loop so we don't create - * a hole in the sequence of pages in the rpc - * stream.*/ - oap = NULL; - break; - case -EINTR: - /* the io isn't needed.. tell the checks - * below to complete the rpc with EINTR */ - cfs_spin_lock(&oap->oap_lock); - oap->oap_async_flags |= ASYNC_COUNT_STABLE; - cfs_spin_unlock(&oap->oap_lock); - oap->oap_count = -EINTR; - break; - case 0: - cfs_spin_lock(&oap->oap_lock); - oap->oap_async_flags |= ASYNC_READY; - cfs_spin_unlock(&oap->oap_lock); - break; - default: - LASSERTF(0, "oap %p page %p returned %d " - "from make_ready\n", oap, - oap->oap_page, rc); - break; - } - } - if (oap == NULL) - break; - - /* take the page out of our book-keeping */ - cfs_list_del_init(&oap->oap_pending_item); - lop_update_pending(cli, lop, cmd, -1); - cfs_list_del_init(&oap->oap_urgent_item); - - /* ask the caller for the size of the io as the rpc leaves. */ - if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) { - oap->oap_count = osc_refresh_count(env, oap, cmd); - LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE); - } - if (oap->oap_count <= 0) { - CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap, - oap->oap_count); - osc_ap_completion(env, cli, NULL, - oap, 0, oap->oap_count); - continue; - } - - /* now put the page back in our accounting */ - cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list); - if (page_count++ == 0) - srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK); - - if (oap->oap_brw_flags & OBD_BRW_MEMALLOC) - mem_tight = 1; - - /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized - * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads - * have the same alignment as the initial writes that allocated - * extents on the server. 
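The boundary test in the old code relies on PTLRPC_MAX_BRW_SIZE being a power of two, so masking with (size - 1) yields the offset inside the current RPC window. A minimal standalone illustration of that check, using an assumed 1 MB window size:

#include <stdio.h>

#define MAX_BRW_SIZE (1UL << 20)        /* example only: 1 MB RPC window */

/* true if 'end' lands exactly on an RPC window boundary */
static int on_rpc_boundary(unsigned long end)
{
        return (end & (MAX_BRW_SIZE - 1)) == 0;
}

int main(void)
{
        printf("%d %d\n", on_rpc_boundary(1UL << 20),          /* 1 */
                          on_rpc_boundary((1UL << 20) + 4096)); /* 0 */
        return 0;
}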
*/ - ending_offset = oap->oap_obj_off + oap->oap_page_off + - oap->oap_count; - if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1))) - break; - - if (page_count >= cli->cl_max_pages_per_rpc) - break; - - /* If there is a gap at the end of this page, it can't merge - * with any subsequent pages, so we'll hand the network a - * "fragmented" page array that it can't transfer in 1 RDMA */ - if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE) + LASSERT(osc_object_is_locked(osc)); + cfs_list_for_each_entry_safe(ext, next, + &osc->oo_reading_exts, oe_link) { + EASSERT(ext->oe_state == OES_LOCK_DONE, ext); + if (!try_to_add_extent_for_io(cli, ext, &rpclist, &page_count, + &max_pages)) break; + osc_extent_state_set(ext, OES_RPC); + EASSERT(ext->oe_nr_pages <= max_pages, ext); } + LASSERT(page_count <= max_pages); - osc_list_maint(cli, osc); - - client_obd_list_unlock(&cli->cl_loi_list_lock); + osc_update_pending(osc, OBD_BRW_READ, -page_count); - if (page_count == 0) { - client_obd_list_lock(&cli->cl_loi_list_lock); - RETURN(0); - } + if (!cfs_list_empty(&rpclist)) { + osc_object_unlock(osc); - if (mem_tight) - cmd |= OBD_BRW_MEMALLOC; - rc = osc_build_rpc(env, cli, &rpc_list, page_count, cmd, pol); - if (rc != 0) { - LASSERT(cfs_list_empty(&rpc_list)); - osc_list_maint(cli, osc); - RETURN(rc); - } + LASSERT(page_count > 0); + rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_READ, pol); + LASSERT(cfs_list_empty(&rpclist)); - starting_offset &= PTLRPC_MAX_BRW_SIZE - 1; - if (cmd == OBD_BRW_READ) { - cli->cl_r_in_flight++; - lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count); - lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight); - lprocfs_oh_tally_log2(&cli->cl_read_offset_hist, - (starting_offset >> CFS_PAGE_SHIFT) + 1); - } else { - cli->cl_w_in_flight++; - lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count); - lprocfs_oh_tally(&cli->cl_write_rpc_hist, - cli->cl_w_in_flight); - lprocfs_oh_tally_log2(&cli->cl_write_offset_hist, - (starting_offset >> CFS_PAGE_SHIFT) + 1); + osc_object_lock(osc); } - - RETURN(1); + RETURN(rc); } -#define list_to_obj(list, item) \ - cfs_list_entry((list)->next, struct osc_object, oo_##item) +#define list_to_obj(list, item) ({ \ + cfs_list_t *__tmp = (list)->next; \ + cfs_list_del_init(__tmp); \ + cfs_list_entry(__tmp, struct osc_object, oo_##item); \ +}) /* This is called by osc_check_rpcs() to find which objects have pages that * we could be sending. These lists are maintained by osc_makes_rpc(). */ @@ -861,14 +2023,23 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli, pdl_policy_t pol) { struct osc_object *osc; - int rc = 0, race_counter = 0; + int rc = 0; ENTRY; while ((osc = osc_next_obj(cli)) != NULL) { + struct cl_object *obj = osc2cl(osc); + struct lu_ref_link *link; + OSC_IO_DEBUG(osc, "%lu in flight\n", rpcs_in_flight(cli)); - if (osc_max_rpc_in_flight(cli, osc)) + if (osc_max_rpc_in_flight(cli, osc)) { + __osc_list_maint(cli, osc); break; + } + + cl_object_get(obj); + client_obd_list_unlock(&cli->cl_loi_list_lock); + link = lu_object_ref_add(&obj->co_lu, "check", cfs_current()); /* attempt some read/write balancing by alternating between * reads and writes in an object. The makes_rpc checks here @@ -876,13 +2047,13 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli, * instead of objects. 
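A reduced sketch of the servicing loop follows: like the reworked list_to_obj(), it removes each ready object from the list as it is picked, and gives the object a chance at a write RPC before a read RPC. The object structure and RPC calls are stand-ins, not the Lustre client types.

#include <stdio.h>
#include <stddef.h>

struct obj {
        const char *name;
        int dirty_pages;        /* pending writes */
        int read_pages;         /* pending reads  */
        struct obj *next;       /* ready-list linkage */
};

/* pop the first ready object; it is taken off the list as it is picked */
static struct obj *next_obj(struct obj **ready)
{
        struct obj *o = *ready;

        if (o != NULL)
                *ready = o->next;
        return o;
}

int main(void)
{
        struct obj b = { "obj-b", 0, 3, NULL };
        struct obj a = { "obj-a", 12, 0, &b };
        struct obj *ready = &a;
        struct obj *o;

        while ((o = next_obj(&ready)) != NULL) {
                if (o->dirty_pages)             /* writes are tried first */
                        printf("%s: write rpc for %d pages\n",
                               o->name, o->dirty_pages);
                if (o->read_pages)
                        printf("%s: read rpc for %d pages\n",
                               o->name, o->read_pages);
        }
        return 0;
}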
we don't want send_oap_rpc to drain a * partial read pending queue when we're given this object to * do io on writes while there are cache waiters */ + osc_object_lock(osc); if (osc_makes_rpc(cli, osc, OBD_BRW_WRITE)) { - rc = osc_send_oap_rpc(env, cli, osc, OBD_BRW_WRITE, - &osc->oo_write_pages, pol); + rc = osc_send_write_rpc(env, cli, osc, pol); if (rc < 0) { CERROR("Write request failed with %d\n", rc); - /* osc_send_oap_rpc failed, mostly because of + /* osc_send_write_rpc failed, mostly because of * memory pressure. * * It can't break here, because if: @@ -900,53 +2071,56 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli, * Anyway, continue to drain pages. */ /* break; */ } - - if (rc > 0) - race_counter = 0; - else if (rc == 0) - race_counter++; } if (osc_makes_rpc(cli, osc, OBD_BRW_READ)) { - rc = osc_send_oap_rpc(env, cli, osc, OBD_BRW_READ, - &osc->oo_read_pages, pol); + rc = osc_send_read_rpc(env, cli, osc, pol); if (rc < 0) CERROR("Read request failed with %d\n", rc); - - if (rc > 0) - race_counter = 0; - else if (rc == 0) - race_counter++; } - - /* attempt some inter-object balancing by issuing rpcs - * for each object in turn */ - if (!cfs_list_empty(&osc->oo_hp_ready_item)) - cfs_list_del_init(&osc->oo_hp_ready_item); - if (!cfs_list_empty(&osc->oo_ready_item)) - cfs_list_del_init(&osc->oo_ready_item); - if (!cfs_list_empty(&osc->oo_write_item)) - cfs_list_del_init(&osc->oo_write_item); - if (!cfs_list_empty(&osc->oo_read_item)) - cfs_list_del_init(&osc->oo_read_item); + osc_object_unlock(osc); osc_list_maint(cli, osc); + lu_object_ref_del_at(&obj->co_lu, link, "check", cfs_current()); + cl_object_put(env, obj); - /* send_oap_rpc fails with 0 when make_ready tells it to - * back off. llite's make_ready does this when it tries - * to lock a page queued for write that is already locked. - * we want to try sending rpcs from many objects, but we - * don't want to spin failing with 0. */ - if (race_counter == 10) - break; + client_obd_list_lock(&cli->cl_loi_list_lock); + } +} + +static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli, + struct osc_object *osc, pdl_policy_t pol, int async) +{ + int has_rpcs = 1; + int rc = 0; + + client_obd_list_lock(&cli->cl_loi_list_lock); + if (osc != NULL) + has_rpcs = __osc_list_maint(cli, osc); + if (has_rpcs) { + if (!async) { + osc_check_rpcs(env, cli, pol); + } else { + CDEBUG(D_CACHE, "Queue writeback work for client %p.\n", + cli); + LASSERT(cli->cl_writeback_work != NULL); + rc = ptlrpcd_queue_work(cli->cl_writeback_work); + } } + client_obd_list_unlock(&cli->cl_loi_list_lock); + return rc; +} + +static int osc_io_unplug_async(const struct lu_env *env, + struct client_obd *cli, struct osc_object *osc) +{ + /* XXX: policy is no use actually. 
*/ + return osc_io_unplug0(env, cli, osc, PDL_POLICY_ROUND, 1); } void osc_io_unplug(const struct lu_env *env, struct client_obd *cli, struct osc_object *osc, pdl_policy_t pol) { - if (osc) - osc_list_maint(cli, osc); - osc_check_rpcs(env, cli, pol); + (void)osc_io_unplug0(env, cli, osc, pol, 0); } int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops, @@ -971,24 +2145,28 @@ int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops, oap->oap_brw_flags = OBD_BRW_NOQUOTA; CFS_INIT_LIST_HEAD(&oap->oap_pending_item); - CFS_INIT_LIST_HEAD(&oap->oap_urgent_item); CFS_INIT_LIST_HEAD(&oap->oap_rpc_item); cfs_spin_lock_init(&oap->oap_lock); - CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", + CDEBUG(D_INFO, "oap %p page %p obj off "LPU64"\n", oap, page, oap->oap_obj_off); RETURN(0); } -int osc_queue_async_io(const struct lu_env *env, struct osc_page *ops) +int osc_queue_async_io(const struct lu_env *env, struct cl_io *io, + struct osc_page *ops) { + struct osc_io *oio = osc_env_io(env); + struct osc_extent *ext = NULL; struct osc_async_page *oap = &ops->ops_oap; struct client_obd *cli = oap->oap_cli; struct osc_object *osc = oap->oap_obj; - struct obd_export *exp = osc_export(osc); - int brw_flags = OBD_BRW_ASYNC; - int cmd = OBD_BRW_WRITE; - int rc = 0; + pgoff_t index; + int grants = 0; + int brw_flags = OBD_BRW_ASYNC; + int cmd = OBD_BRW_WRITE; + int need_release = 0; + int rc = 0; ENTRY; if (oap->oap_magic != OAP_MAGIC) @@ -998,13 +2176,13 @@ int osc_queue_async_io(const struct lu_env *env, struct osc_page *ops) RETURN(-EIO); if (!cfs_list_empty(&oap->oap_pending_item) || - !cfs_list_empty(&oap->oap_urgent_item) || !cfs_list_empty(&oap->oap_rpc_item)) RETURN(-EBUSY); /* Set the OBD_BRW_SRVLOCK before the page is queued. */ brw_flags |= ops->ops_srvlock ? OBD_BRW_SRVLOCK : 0; - if (!client_is_remote(exp) && cfs_capable(CFS_CAP_SYS_RESOURCE)) { + if (!client_is_remote(osc_export(osc)) && + cfs_capable(CFS_CAP_SYS_RESOURCE)) { brw_flags |= OBD_BRW_NOQUOTA; cmd |= OBD_BRW_NOQUOTA; } @@ -1024,132 +2202,228 @@ int osc_queue_async_io(const struct lu_env *env, struct osc_page *ops) qid[USRQUOTA] = attr->cat_uid; qid[GRPQUOTA] = attr->cat_gid; - if (rc == 0 && - osc_quota_chkdq(cli, qid) == NO_QUOTA) + if (rc == 0 && osc_quota_chkdq(cli, qid) == NO_QUOTA) rc = -EDQUOT; if (rc) RETURN(rc); } - client_obd_list_lock(&cli->cl_loi_list_lock); - oap->oap_cmd = cmd; oap->oap_page_off = ops->ops_from; oap->oap_count = ops->ops_to - ops->ops_from; oap->oap_async_flags = 0; oap->oap_brw_flags = brw_flags; - /* Give a hint to OST that requests are coming from kswapd - bug19529 */ - if (cfs_memory_pressure_get()) - oap->oap_brw_flags |= OBD_BRW_MEMALLOC; - rc = osc_enter_cache(env, cli, oap); - if (rc) { + OSC_IO_DEBUG(osc, "oap %p page %p added for cmd %d\n", + oap, oap->oap_page, oap->oap_cmd & OBD_BRW_RWMASK); + + index = oap2cl_page(oap)->cp_index; + + /* Add this page into extent by the following steps: + * 1. if there exists an active extent for this IO, mostly this page + * can be added to the active extent and sometimes we need to + * expand extent to accomodate this page; + * 2. otherwise, a new extent will be allocated. 
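A standalone sketch of the grant arithmetic implied by those two steps: one chunk of grant plus a fixed per-extent tax must be reserved unless the page already falls inside the written part of the active extent. The chunk size and tax below are arbitrary example values, not the client's tunables.

#include <stdio.h>

#define CHUNK_BITS   16                         /* example: 64 KiB chunks */
#define CHUNK_SIZE   (1U << CHUNK_BITS)
#define EXTENT_TAX   4096U                      /* example per-extent overhead */

struct extent { unsigned long start, end, max_end; };

/* Grant that must be reserved before 'index' can be dirtied. */
static unsigned int grant_needed(const struct extent *active,
                                 unsigned long index)
{
        if (active == NULL ||
            index < active->start || index > active->max_end)
                return CHUNK_SIZE + EXTENT_TAX; /* brand new extent       */
        if (index <= active->end)
                return 0;                       /* chunk already granted  */
        return CHUNK_SIZE + EXTENT_TAX;         /* must expand the extent */
}

int main(void)
{
        struct extent active = { .start = 0, .end = 15, .max_end = 255 };

        printf("inside:  %u\n", grant_needed(&active, 10));   /* 0 */
        printf("expand:  %u\n", grant_needed(&active, 100));  /* chunk+tax */
        printf("new ext: %u\n", grant_needed(&active, 1000)); /* chunk+tax */
        return 0;
}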
*/ + + ext = oio->oi_active; + if (ext != NULL && ext->oe_start <= index && ext->oe_max_end >= index) { + /* one chunk plus extent overhead must be enough to write this + * page */ + grants = (1 << cli->cl_chunkbits) + cli->cl_extent_tax; + if (ext->oe_end >= index) + grants = 0; + + /* it doesn't need any grant to dirty this page */ + client_obd_list_lock(&cli->cl_loi_list_lock); + rc = osc_enter_cache_try(cli, oap, grants, 0); client_obd_list_unlock(&cli->cl_loi_list_lock); - RETURN(rc); + if (rc == 0) { /* try failed */ + grants = 0; + need_release = 1; + } else if (ext->oe_end < index) { + int tmp = grants; + /* try to expand this extent */ + rc = osc_extent_expand(ext, index, &tmp); + if (rc < 0) { + need_release = 1; + /* don't free reserved grant */ + } else { + OSC_EXTENT_DUMP(D_CACHE, ext, + "expanded for %lu.\n", index); + osc_unreserve_grant(cli, grants, tmp); + grants = 0; + } + } + rc = 0; + } else if (ext != NULL) { + /* index is located outside of active extent */ + need_release = 1; + } + if (need_release) { + osc_extent_release(env, ext); + oio->oi_active = NULL; + ext = NULL; } - OSC_IO_DEBUG(osc, "oap %p page %p added for cmd %d\n", - oap, oap->oap_page, cmd); - - osc_oap_to_pending(oap); - osc_list_maint(cli, osc); - if (!osc_max_rpc_in_flight(cli, osc) && - osc_makes_rpc(cli, osc, OBD_BRW_WRITE)) { - LASSERT(cli->cl_writeback_work != NULL); - rc = ptlrpcd_queue_work(cli->cl_writeback_work); + if (ext == NULL) { + int tmp = (1 << cli->cl_chunkbits) + cli->cl_extent_tax; + + /* try to find new extent to cover this page */ + LASSERT(oio->oi_active == NULL); + /* we may have allocated grant for this page if we failed + * to expand the previous active extent. */ + LASSERT(ergo(grants > 0, grants >= tmp)); + + rc = 0; + if (grants == 0) { + /* we haven't allocated grant for this page. 
*/ + rc = osc_enter_cache(env, cli, oap, tmp); + if (rc == 0) + grants = tmp; + } - CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n", - cli, rc); + tmp = grants; + if (rc == 0) { + ext = osc_extent_find(env, osc, index, &tmp); + if (IS_ERR(ext)) { + LASSERT(tmp == grants); + osc_exit_cache(cli, oap); + rc = PTR_ERR(ext); + ext = NULL; + } else { + oio->oi_active = ext; + } + } + if (grants > 0) + osc_unreserve_grant(cli, grants, tmp); } - client_obd_list_unlock(&cli->cl_loi_list_lock); - RETURN(0); + LASSERT(ergo(rc == 0, ext != NULL)); + if (ext != NULL) { + EASSERTF(ext->oe_end >= index && ext->oe_start <= index, + ext, "index = %lu.\n", index); + LASSERT((oap->oap_brw_flags & OBD_BRW_FROM_GRANT) != 0); + + osc_object_lock(osc); + if (ext->oe_nr_pages == 0) + ext->oe_srvlock = ops->ops_srvlock; + else + LASSERT(ext->oe_srvlock == ops->ops_srvlock); + ++ext->oe_nr_pages; + cfs_list_add_tail(&oap->oap_pending_item, &ext->oe_pages); + osc_object_unlock(osc); + } + RETURN(rc); } -int osc_teardown_async_page(struct osc_object *obj, struct osc_page *ops) +int osc_teardown_async_page(const struct lu_env *env, + struct osc_object *obj, struct osc_page *ops) { struct osc_async_page *oap = &ops->ops_oap; - struct client_obd *cli = oap->oap_cli; - struct osc_oap_pages *lop; + struct osc_extent *ext = NULL; int rc = 0; ENTRY; - if (oap->oap_magic != OAP_MAGIC) - RETURN(-EINVAL); - - if (oap->oap_cmd & OBD_BRW_WRITE) { - lop = &obj->oo_write_pages; - } else { - lop = &obj->oo_read_pages; - } - - client_obd_list_lock(&cli->cl_loi_list_lock); - - if (!cfs_list_empty(&oap->oap_rpc_item)) - GOTO(out, rc = -EBUSY); - - osc_exit_cache(cli, oap, 0); - osc_wake_cache_waiters(cli); - - if (!cfs_list_empty(&oap->oap_urgent_item)) { - cfs_list_del_init(&oap->oap_urgent_item); - cfs_spin_lock(&oap->oap_lock); - oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP); - cfs_spin_unlock(&oap->oap_lock); - } - if (!cfs_list_empty(&oap->oap_pending_item)) { - cfs_list_del_init(&oap->oap_pending_item); - lop_update_pending(cli, lop, oap->oap_cmd, -1); + LASSERT(oap->oap_magic == OAP_MAGIC); + + CDEBUG(D_INFO, "teardown oap %p page %p at index %lu.\n", + oap, ops, oap2cl_page(oap)->cp_index); + + osc_object_lock(obj); + if (!cfs_list_empty(&oap->oap_rpc_item)) { + CDEBUG(D_CACHE, "oap %p is not in cache.\n", oap); + rc = -EBUSY; + } else if (!cfs_list_empty(&oap->oap_pending_item)) { + ext = osc_extent_lookup(obj, oap2cl_page(oap)->cp_index); + /* only truncated pages are allowed to be taken out. + * See osc_extent_truncate() and osc_cache_truncate_start() + * for details. */ + if (ext != NULL && ext->oe_state != OES_TRUNC) { + OSC_EXTENT_DUMP(D_ERROR, ext, "trunc at %lu.\n", + oap2cl_page(oap)->cp_index); + rc = -EBUSY; + } } - osc_list_maint(cli, obj); - OSC_IO_DEBUG(obj, "oap %p page %p torn down\n", oap, oap->oap_page); -out: - client_obd_list_unlock(&cli->cl_loi_list_lock); + osc_object_unlock(obj); + if (ext != NULL) + osc_extent_put(env, ext); RETURN(rc); } -/* aka (~was & now & flag), but this is more clear :) */ -#define SETTING(was, now, flag) (!(was & flag) && (now & flag)) - -int osc_set_async_flags(struct osc_object *obj, struct osc_page *opg, - obd_flag async_flags) +/** + * This is called when a page is picked up by kernel to write out. + * + * We should find out the corresponding extent and add the whole extent + * into urgent list. The extent may be being truncated or used, handle it + * carefully. 
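The care described above can be modelled as a switch on the covering extent's state: flushing a page that is already in an RPC is treated as a bug (the patch asserts; this sketch returns an error), a locking or truncating extent asks the caller to retry, and anything else is simply marked urgent. This is a standalone illustration, not the patched function.

#include <stdio.h>
#include <errno.h>

enum oes { OES_CACHE, OES_LOCKING, OES_LOCK_DONE, OES_RPC, OES_TRUNC };

struct extent {
        enum oes state;
        int      urgent;
};

/* Decide what to do when the kernel asks us to write out one cached page. */
static int flush_page(struct extent *covering)
{
        switch (covering->state) {
        case OES_RPC:
        case OES_LOCK_DONE:
                return -EINVAL;         /* flushing an in-RPC page is a bug  */
        case OES_LOCKING:
        case OES_TRUNC:
                return -EAGAIN;         /* writer/truncate owns it, redirty  */
        default:
                covering->urgent = 1;   /* push whole extent to urgent list  */
                return 0;
        }
}

int main(void)
{
        struct extent cached = { OES_CACHE, 0 };
        struct extent trunc  = { OES_TRUNC, 0 };

        printf("cache: %d urgent=%d\n", flush_page(&cached), cached.urgent);
        printf("trunc: %d\n", flush_page(&trunc));
        return 0;
}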
+ */ +int osc_flush_async_page(const struct lu_env *env, struct cl_io *io, + struct osc_page *ops) { - struct osc_async_page *oap = &opg->ops_oap; - struct osc_oap_pages *lop; - int flags = 0; + struct osc_extent *ext = NULL; + struct osc_object *obj = cl2osc(ops->ops_cl.cpl_obj); + struct cl_page *cp = ops->ops_cl.cpl_page; + pgoff_t index = cp->cp_index; + struct osc_async_page *oap = &ops->ops_oap; + int unplug = 0; + int rc = 0; ENTRY; - LASSERT(!cfs_list_empty(&oap->oap_pending_item)); - - if (oap->oap_cmd & OBD_BRW_WRITE) { - lop = &obj->oo_write_pages; - } else { - lop = &obj->oo_read_pages; + osc_object_lock(obj); + ext = osc_extent_lookup(obj, index); + if (ext == NULL) { + osc_extent_tree_dump(D_ERROR, obj); + LASSERTF(0, "page index %lu is NOT covered.\n", index); } - if ((oap->oap_async_flags & async_flags) == async_flags) - RETURN(0); + switch (ext->oe_state) { + case OES_RPC: + case OES_LOCK_DONE: + CL_PAGE_DEBUG(D_ERROR, env, cl_page_top(cp), + "flush an in-rpc page?\n"); + LASSERT(0); + break; + case OES_LOCKING: + /* If we know this extent is being written out, we should abort + * so that the writer can make this page ready. Otherwise, there + * exists a deadlock problem because other process can wait for + * page writeback bit holding page lock; and meanwhile in + * vvp_page_make_ready(), we need to grab page lock before + * really sending the RPC. */ + case OES_TRUNC: + /* race with truncate, page will be redirtied */ + GOTO(out, rc = -EAGAIN); + default: + break; + } - if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY)) - flags |= ASYNC_READY; + rc = cl_page_prep(env, io, cl_page_top(cp), CRT_WRITE); + if (rc) + GOTO(out, rc); - if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) && - cfs_list_empty(&oap->oap_rpc_item)) { - if (oap->oap_async_flags & ASYNC_HP) - cfs_list_add(&oap->oap_urgent_item, &lop->oop_urgent); - else - cfs_list_add_tail(&oap->oap_urgent_item, - &lop->oop_urgent); - flags |= ASYNC_URGENT; - osc_list_maint(oap->oap_cli, obj); - } cfs_spin_lock(&oap->oap_lock); - oap->oap_async_flags |= flags; + oap->oap_async_flags |= ASYNC_READY|ASYNC_URGENT; cfs_spin_unlock(&oap->oap_lock); - OSC_IO_DEBUG(obj, "oap %p page %p has flags %x\n", oap, - oap->oap_page, oap->oap_async_flags); - RETURN(0); + if (cfs_memory_pressure_get()) + ext->oe_memalloc = 1; + + ext->oe_urgent = 1; + if (ext->oe_state == OES_CACHE && cfs_list_empty(&ext->oe_link)) { + OSC_EXTENT_DUMP(D_CACHE, ext, + "flush page %p make it urgent.\n", oap); + cfs_list_add_tail(&ext->oe_link, &obj->oo_urgent_exts); + unplug = 1; + } + rc = 0; + EXIT; + +out: + osc_object_unlock(obj); + osc_extent_put(env, ext); + if (unplug) + osc_io_unplug_async(env, osc_cli(obj), obj); + return rc; } /** @@ -1162,75 +2436,422 @@ int osc_set_async_flags(struct osc_object *obj, struct osc_page *opg, int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops) { struct osc_async_page *oap = &ops->ops_oap; - int rc = -EBUSY; + struct osc_object *obj = oap->oap_obj; + struct client_obd *cli = osc_cli(obj); + struct osc_extent *ext; + struct osc_extent *found = NULL; + cfs_list_t *plist; + pgoff_t index = oap2cl_page(oap)->cp_index; + int rc = -EBUSY; + int cmd; ENTRY; LASSERT(!oap->oap_interrupted); oap->oap_interrupted = 1; - /* ok, it's been put in an rpc. 
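The cancel path that begins here locates the covering extent with a plain containment test over the urgent or reading list before falling back to interrupting the request, as the remainder of the function below shows. A standalone sketch of that lookup, with a simple singly-linked list standing in for the cfs_list machinery:

#include <stdio.h>
#include <stddef.h>

struct extent {
        unsigned long  start, end;      /* inclusive page index range */
        struct extent *next;
};

/* Return the extent covering 'index', or NULL if the page is already
 * owned by an RPC and can only be interrupted through its request. */
static struct extent *find_covering(struct extent *list, unsigned long index)
{
        struct extent *e;

        for (e = list; e != NULL; e = e->next)
                if (e->start <= index && index <= e->end)
                        return e;
        return NULL;
}

int main(void)
{
        struct extent b = { 64, 127, NULL };
        struct extent a = {  0,  15, &b };

        printf("%s\n", find_covering(&a, 70) ? "cancel whole extent"
                                             : "interrupt the rpc");
        printf("%s\n", find_covering(&a, 40) ? "cancel whole extent"
                                             : "interrupt the rpc");
        return 0;
}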
only one oap gets a request reference */ - if (oap->oap_request != NULL) { - ptlrpc_mark_interrupted(oap->oap_request); - ptlrpcd_wake(oap->oap_request); - ptlrpc_req_finished(oap->oap_request); - oap->oap_request = NULL; + /* Find out the caching extent */ + osc_object_lock(obj); + if (oap->oap_cmd & OBD_BRW_WRITE) { + plist = &obj->oo_urgent_exts; + cmd = OBD_BRW_WRITE; + } else { + plist = &obj->oo_reading_exts; + cmd = OBD_BRW_READ; + } + cfs_list_for_each_entry(ext, plist, oe_link) { + if (ext->oe_start <= index && ext->oe_end >= index) { + LASSERT(ext->oe_state == OES_LOCK_DONE); + /* For OES_LOCK_DONE state extent, it has already held + * a refcount for RPC. */ + found = osc_extent_get(ext); + break; + } + } + if (found != NULL) { + cfs_list_del_init(&found->oe_link); + osc_update_pending(obj, cmd, -found->oe_nr_pages); + osc_object_unlock(obj); + + osc_extent_finish(env, found, 0, -EINTR); + osc_extent_put(env, found); + rc = 0; + } else { + osc_object_unlock(obj); + /* ok, it's been put in an rpc. only one oap gets a request + * reference */ + if (oap->oap_request != NULL) { + ptlrpc_mark_interrupted(oap->oap_request); + ptlrpcd_wake(oap->oap_request); + ptlrpc_req_finished(oap->oap_request); + oap->oap_request = NULL; + } } - /* - * page completion may be called only if ->cpo_prep() method was - * executed by osc_io_submit(), that also adds page the to pending list - */ - if (!cfs_list_empty(&oap->oap_pending_item)) { - struct osc_oap_pages *lop; - struct osc_object *osc = oap->oap_obj; + osc_list_maint(cli, obj); + RETURN(rc); +} - cfs_list_del_init(&oap->oap_pending_item); - cfs_list_del_init(&oap->oap_urgent_item); +int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj, + cfs_list_t *list, int cmd, int brw_flags) +{ + struct client_obd *cli = osc_cli(obj); + struct osc_extent *ext; + struct osc_async_page *oap; + int page_count = 0; + int mppr = cli->cl_max_pages_per_rpc; + pgoff_t start = CL_PAGE_EOF; + pgoff_t end = 0; + ENTRY; - lop = (oap->oap_cmd & OBD_BRW_WRITE) ? 
- &osc->oo_write_pages : &osc->oo_read_pages; - lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1); - osc_list_maint(oap->oap_cli, osc); - rc = osc_completion(env, oap, oap->oap_cmd, NULL, -EINTR); + cfs_list_for_each_entry(oap, list, oap_pending_item) { + struct cl_page *cp = oap2cl_page(oap); + if (cp->cp_index > end) + end = cp->cp_index; + if (cp->cp_index < start) + start = cp->cp_index; + ++page_count; + mppr <<= (page_count > mppr); } - RETURN(rc); + ext = osc_extent_alloc(obj); + if (ext == NULL) { + cfs_list_for_each_entry(oap, list, oap_pending_item) { + cfs_list_del_init(&oap->oap_pending_item); + osc_ap_completion(env, cli, oap, 0, -ENOMEM); + } + RETURN(-ENOMEM); + } + + ext->oe_rw = !!(cmd & OBD_BRW_READ); + ext->oe_urgent = 1; + ext->oe_start = start; + ext->oe_end = ext->oe_max_end = end; + ext->oe_obj = obj; + ext->oe_srvlock = !!(brw_flags & OBD_BRW_SRVLOCK); + ext->oe_nr_pages = page_count; + ext->oe_mppr = mppr; + cfs_list_splice_init(list, &ext->oe_pages); + + osc_object_lock(obj); + /* Reuse the initial refcount for RPC, don't drop it */ + osc_extent_state_set(ext, OES_LOCK_DONE); + if (cmd & OBD_BRW_WRITE) { + cfs_list_add_tail(&ext->oe_link, &obj->oo_urgent_exts); + osc_update_pending(obj, OBD_BRW_WRITE, page_count); + } else { + cfs_list_add_tail(&ext->oe_link, &obj->oo_reading_exts); + osc_update_pending(obj, OBD_BRW_READ, page_count); + } + osc_object_unlock(obj); + + osc_io_unplug(env, cli, obj, PDL_POLICY_ROUND); + RETURN(0); } -int osc_queue_sync_page(const struct lu_env *env, struct osc_page *opg, - int cmd, int brw_flags) +/** + * Called by osc_io_setattr_start() to freeze and destroy covering extents. + */ +int osc_cache_truncate_start(const struct lu_env *env, struct osc_io *oio, + struct osc_object *obj, __u64 size) { - struct osc_async_page *oap = &opg->ops_oap; - struct client_obd *cli = oap->oap_cli; - int flags = 0; + struct client_obd *cli = osc_cli(obj); + struct osc_extent *ext; + struct osc_extent *waiting = NULL; + pgoff_t index; + CFS_LIST_HEAD(list); + int result = 0; ENTRY; - oap->oap_cmd = cmd; - oap->oap_page_off = opg->ops_from; - oap->oap_count = opg->ops_to - opg->ops_from; - oap->oap_brw_flags = OBD_BRW_SYNC | brw_flags; + /* pages with index greater or equal to index will be truncated. */ + index = cl_index(osc2cl(obj), size + CFS_PAGE_SIZE - 1); + +again: + osc_object_lock(obj); + ext = osc_extent_search(obj, index); + if (ext == NULL) + ext = first_extent(obj); + else if (ext->oe_end < index) + ext = next_extent(ext); + while (ext != NULL) { + EASSERT(ext->oe_state != OES_TRUNC, ext); + + if (ext->oe_state > OES_CACHE || ext->oe_urgent) { + /* if ext is in urgent state, it means there must exist + * a page already having been flushed by write_page(). + * We have to wait for this extent because we can't + * truncate that page. */ + LASSERT(!ext->oe_hp); + OSC_EXTENT_DUMP(D_CACHE, ext, + "waiting for busy extent\n"); + waiting = osc_extent_get(ext); + break; + } - /* Give a hint to OST that requests are coming from kswapd - bug19529 */ - if (cfs_memory_pressure_get()) - oap->oap_brw_flags |= OBD_BRW_MEMALLOC; + osc_extent_get(ext); + if (ext->oe_state == OES_ACTIVE) { + /* though we grab inode mutex for write path, but we + * release it before releasing extent(in osc_io_end()), + * so there is a race window that an extent is still + * in OES_ACTIVE when truncate starts. 
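A standalone sketch of how the first truncated page index partitions the extent set: extents entirely below it are untouched, extents entirely at or above it lose all their pages, and an extent straddling it is the partially truncated case that forces the index to be advanced past its end. The index values are made-up examples.

#include <stdio.h>

struct extent { unsigned long start, end; };    /* inclusive page indexes */

/* Classify one extent against the first truncated page index. */
static const char *truncate_action(const struct extent *e, unsigned long index)
{
        if (e->end < index)
                return "untouched";             /* entirely below new size */
        if (e->start >= index)
                return "all pages discarded";   /* extent removed          */
        return "partially truncated";           /* keep [start, index - 1] */
}

int main(void)
{
        struct extent exts[] = { { 0, 99 }, { 100, 163 }, { 164, 300 } };
        unsigned long trunc_index = 150;        /* e.g. new size / page size */
        int i;

        for (i = 0; i < 3; i++)
                printf("[%lu, %lu]: %s\n", exts[i].start, exts[i].end,
                       truncate_action(&exts[i], trunc_index));
        return 0;
}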
*/ + LASSERT(!ext->oe_trunc_pending); + ext->oe_trunc_pending = 1; + } else { + EASSERT(ext->oe_state == OES_CACHE, ext); + osc_extent_state_set(ext, OES_TRUNC); + osc_update_pending(obj, OBD_BRW_WRITE, + -ext->oe_nr_pages); + } + EASSERT(cfs_list_empty(&ext->oe_link), ext); + cfs_list_add_tail(&ext->oe_link, &list); - if (!client_is_remote(osc_export(cl2osc(opg->ops_cl.cpl_obj))) && - cfs_capable(CFS_CAP_SYS_RESOURCE)) { - oap->oap_brw_flags |= OBD_BRW_NOQUOTA; - oap->oap_cmd |= OBD_BRW_NOQUOTA; + ext = next_extent(ext); } + osc_object_unlock(obj); + + osc_list_maint(cli, obj); - if (oap->oap_cmd & OBD_BRW_READ) - flags = ASYNC_COUNT_STABLE; - else if (!(oap->oap_brw_page.flag & OBD_BRW_FROM_GRANT)) - osc_enter_cache_try(env, cli, oap, 1); + while (!cfs_list_empty(&list)) { + int rc; - cfs_spin_lock(&oap->oap_lock); - oap->oap_async_flags |= OSC_FLAGS | flags; - cfs_spin_unlock(&oap->oap_lock); + ext = cfs_list_entry(list.next, struct osc_extent, oe_link); + cfs_list_del_init(&ext->oe_link); - osc_oap_to_pending(oap); - RETURN(0); + /* extent may be in OES_ACTIVE state because inode mutex + * is released before osc_io_end() in file write case */ + if (ext->oe_state != OES_TRUNC) + osc_extent_wait(env, ext, OES_TRUNC); + + rc = osc_extent_truncate(ext, index); + if (rc < 0) { + if (result == 0) + result = rc; + + OSC_EXTENT_DUMP(D_ERROR, ext, + "truncate error %d\n", rc); + } else if (ext->oe_nr_pages == 0) { + osc_extent_remove(ext); + } else { + /* this must be an overlapped extent which means only + * part of pages in this extent have been truncated. + */ + EASSERTF(ext->oe_start < index, ext, + "trunc index = %lu.\n", index); + /* fix index to skip this partially truncated extent */ + index = ext->oe_end + 1; + + /* we need to hold this extent in OES_TRUNC state so + * that no writeback will happen. This is to avoid + * BUG 17397. */ + LASSERT(oio->oi_trunc == NULL); + oio->oi_trunc = osc_extent_get(ext); + OSC_EXTENT_DUMP(D_CACHE, ext, + "trunc at "LPU64"\n", size); + } + osc_extent_put(env, ext); + } + if (waiting != NULL) { + if (result == 0) + result = osc_extent_wait(env, waiting, OES_INV); + + osc_extent_put(env, waiting); + waiting = NULL; + if (result == 0) + goto again; + } + RETURN(result); +} + +/** + * Called after osc_io_setattr_end to add oio->oi_trunc back to cache. + */ +void osc_cache_truncate_end(const struct lu_env *env, struct osc_io *oio, + struct osc_object *obj) +{ + struct osc_extent *ext = oio->oi_trunc; + + oio->oi_trunc = NULL; + if (ext != NULL) { + EASSERT(ext->oe_nr_pages > 0, ext); + EASSERT(ext->oe_state == OES_TRUNC, ext); + EASSERT(!ext->oe_urgent, ext); + + OSC_EXTENT_DUMP(D_CACHE, ext, "trunc -> cache.\n"); + osc_object_lock(obj); + osc_extent_state_set(ext, OES_CACHE); + osc_update_pending(obj, OBD_BRW_WRITE, ext->oe_nr_pages); + osc_object_unlock(obj); + osc_extent_put(env, ext); + + osc_list_maint(osc_cli(obj), obj); + } +} + +/** + * Wait for extents in a specific range to be written out. + * The caller must have called osc_cache_writeback_range() to issue IO + * otherwise it will take a long time for this function to finish. + * + * Caller must hold inode_mutex and i_alloc_sem, or cancel exclusive + * dlm lock so that nobody else can dirty this range of file while we're + * waiting for extents to be written. 
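The waiting loop that follows reduces to the restart pattern sketched below: find the next extent in range that still has fsync work, remember where to resume, wait for it, and rescan from the top because the tree may have changed while sleeping. Waiting is stubbed out; this is an illustration of the control flow only, not the Lustre code.

#include <stdio.h>

struct extent {
        unsigned long start, end;
        int           fsync_wait;       /* still has dirty/in-flight pages */
};

static void wait_for_write_out(struct extent *e)
{
        e->fsync_wait = 0;              /* stand-in for waiting on oe_waitq */
}

/* Wait one extent at a time, restarting the scan after every wait so
 * that extents created or split while we slept are not missed. */
static void wait_range(struct extent *exts, int n,
                       unsigned long start, unsigned long end)
{
        unsigned long index = start;
        int i;

again:
        for (i = 0; i < n; i++) {
                struct extent *e = &exts[i];

                if (e->end < index || e->start > end || !e->fsync_wait)
                        continue;
                index = e->end + 1;     /* resume after this extent       */
                wait_for_write_out(e);
                goto again;             /* re-scan: the set may have changed */
        }
}

int main(void)
{
        struct extent exts[] = { { 0, 63, 1 }, { 64, 127, 0 }, { 128, 200, 1 } };

        wait_range(exts, 3, 0, 200);
        printf("all extents in [0, 200] written out\n");
        return 0;
}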
+ */ +int osc_cache_wait_range(const struct lu_env *env, struct osc_object *obj, + pgoff_t start, pgoff_t end) +{ + struct osc_extent *ext; + pgoff_t index = start; + int result = 0; + ENTRY; + +again: + osc_object_lock(obj); + ext = osc_extent_search(obj, index); + if (ext == NULL) + ext = first_extent(obj); + else if (ext->oe_end < index) + ext = next_extent(ext); + while (ext != NULL) { + int rc; + + if (ext->oe_start > end) + break; + + if (!ext->oe_fsync_wait) { + ext = next_extent(ext); + continue; + } + + EASSERT(ergo(ext->oe_state == OES_CACHE, + ext->oe_hp || ext->oe_urgent), ext); + EASSERT(ergo(ext->oe_state == OES_ACTIVE, + !ext->oe_hp && ext->oe_urgent), ext); + + index = ext->oe_end + 1; + osc_extent_get(ext); + osc_object_unlock(obj); + + rc = osc_extent_wait(env, ext, OES_INV); + if (result == 0) + result = rc; + osc_extent_put(env, ext); + goto again; + } + osc_object_unlock(obj); + + OSC_IO_DEBUG(obj, "sync file range.\n"); + RETURN(result); +} + +/** + * Called to write out a range of osc object. + * + * @hp : should be set this is caused by lock cancel; + * @discard: is set if dirty pages should be dropped - file will be deleted or + * truncated, this implies there is no partially discarding extents. + * + * Return how many pages will be issued, or error code if error occurred. + */ +int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj, + pgoff_t start, pgoff_t end, int hp, int discard) +{ + struct osc_extent *ext; + CFS_LIST_HEAD(discard_list); + int unplug = 0; + int result = 0; + ENTRY; + + osc_object_lock(obj); + ext = osc_extent_search(obj, start); + if (ext == NULL) + ext = first_extent(obj); + else if (ext->oe_end < start) + ext = next_extent(ext); + while (ext != NULL) { + if (ext->oe_start > end) + break; + + ext->oe_fsync_wait = 1; + switch (ext->oe_state) { + case OES_CACHE: + result += ext->oe_nr_pages; + if (!discard) { + cfs_list_t *list = NULL; + if (hp) { + EASSERT(!ext->oe_hp, ext); + ext->oe_hp = 1; + list = &obj->oo_hp_exts; + } else if (!ext->oe_urgent) { + ext->oe_urgent = 1; + list = &obj->oo_urgent_exts; + } + if (list != NULL) { + cfs_list_move_tail(&ext->oe_link, list); + unplug = 1; + } + } else { + /* the only discarder is lock cancelling, so + * [start, end] must contain this extent */ + EASSERT(ext->oe_start >= start && + ext->oe_max_end <= end, ext); + osc_extent_state_set(ext, OES_LOCKING); + ext->oe_owner = cfs_current(); + cfs_list_move_tail(&ext->oe_link, + &discard_list); + osc_update_pending(obj, OBD_BRW_WRITE, + -ext->oe_nr_pages); + } + break; + case OES_ACTIVE: + /* It's pretty bad to wait for ACTIVE extents, because + * we don't know how long we will wait for it to be + * flushed since it may be blocked at awaiting more + * grants. We do this for the correctness of fsync. */ + LASSERT(hp == 0 && discard == 0); + ext->oe_urgent = 1; + default: + break; + } + ext = next_extent(ext); + } + osc_object_unlock(obj); + + LASSERT(ergo(!discard, cfs_list_empty(&discard_list))); + if (!cfs_list_empty(&discard_list)) { + struct osc_extent *tmp; + int rc; + + osc_list_maint(osc_cli(obj), obj); + cfs_list_for_each_entry_safe(ext, tmp, &discard_list, oe_link) { + cfs_list_del_init(&ext->oe_link); + EASSERT(ext->oe_state == OES_LOCKING, ext); + + /* Discard caching pages. We don't actually write this + * extent out but we complete it as if we did. 
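The per-extent decision in the flush pass can be summarised by the standalone sketch below: cached extents are discarded, promoted to the hp list, or marked urgent depending on the hp/discard flags, while active extents are only flagged urgent so they flush once released. The field names here are simplified stand-ins for the oe_* flags.

#include <stdio.h>

enum oes { OES_CACHE, OES_ACTIVE, OES_RPC };

struct extent {
        enum oes state;
        int      hp, urgent, discard;
};

/* Mirror the per-extent decision in a writeback-range pass. */
static void flag_for_writeback(struct extent *e, int hp, int discard)
{
        switch (e->state) {
        case OES_CACHE:
                if (discard)
                        e->discard = 1;         /* lock cancel: drop pages   */
                else if (hp)
                        e->hp = 1;              /* goes to the hp list       */
                else
                        e->urgent = 1;          /* goes to the urgent list   */
                break;
        case OES_ACTIVE:
                e->urgent = 1;                  /* flush once it is released */
                break;
        default:
                break;                          /* already on its way out    */
        }
}

int main(void)
{
        struct extent cached = { OES_CACHE,  0, 0, 0 };
        struct extent active = { OES_ACTIVE, 0, 0, 0 };

        flag_for_writeback(&cached, /*hp*/1, /*discard*/0);
        flag_for_writeback(&active, 0, 0);
        printf("cached: hp=%d urgent=%d\n", cached.hp, cached.urgent);
        printf("active: urgent=%d\n", active.urgent);
        return 0;
}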
*/ + rc = osc_extent_make_ready(env, ext); + if (unlikely(rc < 0)) { + OSC_EXTENT_DUMP(D_ERROR, ext, + "make_ready returned %d\n", rc); + if (result >= 0) + result = rc; + } + + /* finish the extent as if the pages were sent */ + osc_extent_finish(env, ext, 0, 0); + } + } + + if (unplug) + osc_io_unplug(env, osc_cli(obj), obj, PDL_POLICY_ROUND); + + if (hp || discard) { + int rc; + rc = osc_cache_wait_range(env, obj, start, end); + if (result >= 0 && rc < 0) + result = rc; + } + + OSC_IO_DEBUG(obj, "cache page out.\n"); + RETURN(result); } /** @} osc */ diff --git a/lustre/osc/osc_cl_internal.h b/lustre/osc/osc_cl_internal.h index 3094ca2..54ded82 100644 --- a/lustre/osc/osc_cl_internal.h +++ b/lustre/osc/osc_cl_internal.h @@ -40,6 +40,7 @@ * Internal interfaces of OSC layer. * * Author: Nikita Danilov + * Author: Jinshan Xiong */ #ifndef OSC_CL_INTERNAL_H @@ -61,6 +62,8 @@ * @{ */ +struct osc_extent; + /** * State maintained by osc layer for each IO context. */ @@ -69,6 +72,12 @@ struct osc_io { struct cl_io_slice oi_cl; /** true if this io is lockless. */ int oi_lockless; + /** active extents, we know how many bytes is going to be written, + * so having an active extent will prevent it from being fragmented */ + struct osc_extent *oi_active; + /** partially truncated extent, we need to hold this extent to prevent + * page writeback from happening. */ + struct osc_extent *oi_trunc; struct obd_info oi_info; struct obdo oi_oa; @@ -99,24 +108,9 @@ struct osc_thread_info { struct cl_attr oti_attr; struct lustre_handle oti_handle; struct cl_page_list oti_plist; + struct cl_io oti_io; }; -/** - * Manage osc_async_page - */ -struct osc_oap_pages { - cfs_list_t oop_pending; - cfs_list_t oop_urgent; - int oop_num_pending; -}; - -static inline void osc_oap_pages_init(struct osc_oap_pages *list) -{ - CFS_INIT_LIST_HEAD(&list->oop_pending); - CFS_INIT_LIST_HEAD(&list->oop_urgent); - list->oop_num_pending = 0; -} - struct osc_object { struct cl_object oo_cl; struct lov_oinfo *oo_oinfo; @@ -144,16 +138,55 @@ struct osc_object { cfs_spinlock_t oo_seatbelt; /** - * used by the osc to keep track of what objects to build into rpcs + * used by the osc to keep track of what objects to build into rpcs. + * Protected by client_obd->cli_loi_list_lock. + */ + cfs_list_t oo_ready_item; + cfs_list_t oo_hp_ready_item; + cfs_list_t oo_write_item; + cfs_list_t oo_read_item; + + /** + * extent is a red black tree to manage (async) dirty pages. */ - struct osc_oap_pages oo_read_pages; - struct osc_oap_pages oo_write_pages; - cfs_list_t oo_ready_item; - cfs_list_t oo_hp_ready_item; - cfs_list_t oo_write_item; - cfs_list_t oo_read_item; + struct rb_root oo_root; + /** + * Manage write(dirty) extents. + */ + cfs_list_t oo_hp_exts; /* list of hp extents */ + cfs_list_t oo_urgent_exts; /* list of writeback extents */ + cfs_list_t oo_rpc_exts; + + cfs_list_t oo_reading_exts; + + cfs_atomic_t oo_nr_reads; + cfs_atomic_t oo_nr_writes; + + /** Protect extent tree. Will be used to protect + * oo_{read|write}_pages soon. */ + cfs_spinlock_t oo_lock; }; +static inline void osc_object_lock(struct osc_object *obj) +{ + cfs_spin_lock(&obj->oo_lock); +} + +static inline int osc_object_trylock(struct osc_object *obj) +{ + return cfs_spin_trylock(&obj->oo_lock); +} + +static inline void osc_object_unlock(struct osc_object *obj) +{ + cfs_spin_unlock(&obj->oo_lock); +} + +static inline int osc_object_is_locked(struct osc_object *obj) +{ + return cfs_spin_is_locked(&obj->oo_lock); +} + /* * Lock "micro-states" for osc layer. 
*/ @@ -361,6 +394,7 @@ extern cfs_mem_cache_t *osc_object_kmem; extern cfs_mem_cache_t *osc_thread_kmem; extern cfs_mem_cache_t *osc_session_kmem; extern cfs_mem_cache_t *osc_req_kmem; +extern cfs_mem_cache_t *osc_extent_kmem; extern struct lu_device_type osc_device_type; extern struct lu_context_key osc_key; @@ -389,22 +423,29 @@ void osc_index2policy (ldlm_policy_data_t *policy, const struct cl_object *obj, int osc_lvb_print (const struct lu_env *env, void *cookie, lu_printer_t p, const struct ost_lvb *lvb); -void osc_io_submit_page(const struct lu_env *env, - struct osc_io *oio, struct osc_page *opg, - enum cl_req_type crt); -void osc_ap_completion(const struct lu_env *env, struct client_obd *cli, - struct obdo *oa, struct osc_async_page *oap, - int sent, int rc); +void osc_page_submit(const struct lu_env *env, struct osc_page *opg, + enum cl_req_type crt, int brw_flags); int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops); int osc_set_async_flags(struct osc_object *obj, struct osc_page *opg, obd_flag async_flags); int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops, cfs_page_t *page, loff_t offset); -int osc_queue_async_io(const struct lu_env *env, struct osc_page *ops); -int osc_teardown_async_page(struct osc_object *obj, +int osc_queue_async_io(const struct lu_env *env, struct cl_io *io, + struct osc_page *ops); +int osc_teardown_async_page(const struct lu_env *env, struct osc_object *obj, struct osc_page *ops); -int osc_queue_sync_page(const struct lu_env *env, struct osc_page *ops, - int cmd, int brw_flags); +int osc_flush_async_page(const struct lu_env *env, struct cl_io *io, + struct osc_page *ops); +int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj, + cfs_list_t *list, int cmd, int brw_flags); +int osc_cache_truncate_start(const struct lu_env *env, struct osc_io *oio, + struct osc_object *obj, __u64 size); +void osc_cache_truncate_end(const struct lu_env *env, struct osc_io *oio, + struct osc_object *obj); +int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj, + pgoff_t start, pgoff_t end, int hp, int discard); +int osc_cache_wait_range(const struct lu_env *env, struct osc_object *obj, + pgoff_t start, pgoff_t end); void osc_io_unplug(const struct lu_env *env, struct client_obd *cli, struct osc_object *osc, pdl_policy_t pol); @@ -459,12 +500,22 @@ static inline struct obd_export *osc_export(const struct osc_object *obj) return lu2osc_dev(obj->oo_cl.co_lu.lo_dev)->od_exp; } +static inline struct client_obd *osc_cli(const struct osc_object *obj) +{ + return &osc_export(obj)->exp_obd->u.cli; +} + static inline struct osc_object *cl2osc(const struct cl_object *obj) { LINVRNT(osc_is_object(&obj->co_lu)); return container_of0(obj, struct osc_object, oo_cl); } +static inline struct cl_object *osc2cl(const struct osc_object *obj) +{ + return (struct cl_object *)&obj->oo_cl; +} + static inline ldlm_mode_t osc_cl_lock2ldlm(enum cl_lock_mode mode) { LASSERT(mode == CLM_READ || mode == CLM_WRITE || mode == CLM_GROUP); @@ -493,6 +544,21 @@ static inline struct osc_page *cl2osc_page(const struct cl_page_slice *slice) return container_of0(slice, struct osc_page, ops_cl); } +static inline struct osc_page *oap2osc(struct osc_async_page *oap) +{ + return container_of0(oap, struct osc_page, ops_oap); +} + +static inline struct cl_page *oap2cl_page(struct osc_async_page *oap) +{ + return oap2osc(oap)->ops_cl.cpl_page; +} + +static inline struct osc_page *oap2osc_page(struct osc_async_page *oap) +{ + return 
(struct osc_page *)container_of(oap, struct osc_page, ops_oap); +} + static inline struct osc_lock *cl2osc_lock(const struct cl_lock_slice *slice) { LINVRNT(osc_is_object(&slice->cls_obj->co_lu)); @@ -509,6 +575,106 @@ static inline int osc_io_srvlock(struct osc_io *oio) return (oio->oi_lockless && !oio->oi_cl.cis_io->ci_no_srvlock); } +enum osc_extent_state { + OES_INV = 0, /** extent is just initialized or destroyed */ + OES_ACTIVE = 1, /** process is using this extent */ + OES_CACHE = 2, /** extent is ready for IO */ + OES_LOCKING = 3, /** locking page to prepare IO */ + OES_LOCK_DONE = 4, /** locking finished, ready to send */ + OES_RPC = 5, /** in RPC */ + OES_TRUNC = 6, /** being truncated */ + OES_STATE_MAX +}; +#define OES_STRINGS { "inv", "active", "cache", "locking", "lockdone", "rpc", \ + "trunc", NULL } + +/** + * osc_extent data to manage dirty pages. + * osc_extent has the following attributes: + * 1. all pages in the same must be in one RPC in write back; + * 2. # of pages must be less than max_pages_per_rpc - implied by 1; + * 3. must be covered by only 1 osc_lock; + * 4. exclusive. It's impossible to have overlapped osc_extent. + * + * The lifetime of an extent is from when the 1st page is dirtied to when + * all pages inside it are written out. + * + * LOCKING ORDER + * ============= + * page lock -> client_obd_list_lock -> object lock(osc_object::oo_lock) + */ +struct osc_extent { + /** red-black tree node */ + struct rb_node oe_node; + /** osc_object of this extent */ + struct osc_object *oe_obj; + /** refcount, removed from red-black tree if reaches zero. */ + cfs_atomic_t oe_refc; + /** busy if non-zero */ + cfs_atomic_t oe_users; + /** link list of osc_object's oo_{hp|urgent|locking}_exts. */ + cfs_list_t oe_link; + /** state of this extent */ + unsigned int oe_state; + /** flags for this extent. */ + unsigned int oe_intree:1, + /** 0 is write, 1 is read */ + oe_rw:1, + oe_srvlock:1, + oe_memalloc:1, + /** an ACTIVE extent is going to be truncated, so when this extent + * is released, it will turn into TRUNC state instead of CACHE. */ + oe_trunc_pending:1, + /** this extent should be written asap and someone may wait for the + * write to finish. This bit is usually set along with urgent if + * the extent was CACHE state. + * fsync_wait extent can't be merged because new extent region may + * exceed fsync range. */ + oe_fsync_wait:1, + /** covering lock is being canceled */ + oe_hp:1, + /** this extent should be written back asap. set if one of pages is + * called by page WB daemon, or sync write or reading requests. */ + oe_urgent:1; + /** how many grants allocated for this extent. + * Grant allocated for this extent. There is no grant allocated + * for reading extents and sync write extents. */ + unsigned int oe_grants; + /** # of dirty pages in this extent */ + unsigned int oe_nr_pages; + /** list of pending oap pages. Pages in this list are NOT sorted. */ + cfs_list_t oe_pages; + /** Since an extent has to be written out in atomic, this is used to + * remember the next page need to be locked to write this extent out. + * Not used right now. + */ + struct osc_page *oe_next_page; + /** start and end index of this extent, include start and end + * themselves. Page offset here is the page index of osc_pages. + * oe_start is used as keyword for red-black tree. */ + pgoff_t oe_start; + pgoff_t oe_end; + /** maximum ending index of this extent, this is limited by + * max_pages_per_rpc, lock extent and chunk size. 
*/ + pgoff_t oe_max_end; + /** waitqueue - for those who want to be notified if this extent's + * state has changed. */ + cfs_waitq_t oe_waitq; + /** lock covering this extent */ + struct cl_lock *oe_osclock; + /** terminator of this extent. Must be true if this extent is in IO. */ + cfs_task_t *oe_owner; + /** return value of writeback. If somebody is waiting for this extent, + * this value can be known by outside world. */ + int oe_rc; + /** max pages per rpc when this extent was created */ + unsigned int oe_mppr; +}; + +int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext, + int sent, int rc); +int osc_extent_release(const struct lu_env *env, struct osc_extent *ext); + /** @} osc */ #endif /* OSC_CL_INTERNAL_H */ diff --git a/lustre/osc/osc_dev.c b/lustre/osc/osc_dev.c index 7f5174a..281ea61 100644 --- a/lustre/osc/osc_dev.c +++ b/lustre/osc/osc_dev.c @@ -53,6 +53,7 @@ cfs_mem_cache_t *osc_object_kmem; cfs_mem_cache_t *osc_thread_kmem; cfs_mem_cache_t *osc_session_kmem; cfs_mem_cache_t *osc_req_kmem; +cfs_mem_cache_t *osc_extent_kmem; struct lu_kmem_descr osc_caches[] = { { @@ -86,6 +87,11 @@ struct lu_kmem_descr osc_caches[] = { .ckd_size = sizeof (struct osc_req) }, { + .ckd_cache = &osc_extent_kmem, + .ckd_name = "osc_extent_kmem", + .ckd_size = sizeof (struct osc_extent) + }, + { .ckd_cache = NULL } }; diff --git a/lustre/osc/osc_internal.h b/lustre/osc/osc_internal.h index 3f81c7b..d3ebac1 100644 --- a/lustre/osc/osc_internal.h +++ b/lustre/osc/osc_internal.h @@ -57,7 +57,6 @@ struct osc_async_page { unsigned short oap_interrupted:1; cfs_list_t oap_pending_item; - cfs_list_t oap_urgent_item; cfs_list_t oap_rpc_item; obd_off oap_obj_off; @@ -82,6 +81,7 @@ struct osc_cache_waiter { cfs_list_t ocw_entry; cfs_waitq_t ocw_waitq; struct osc_async_page *ocw_oap; + int ocw_grant; int ocw_rc; }; @@ -142,10 +142,8 @@ int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo, int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *cfg); int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, - cfs_list_t *rpc_list, int page_count, int cmd, - pdl_policy_t p); + cfs_list_t *ext_list, int cmd, pdl_policy_t p); -struct cl_page *osc_oap2cl_page(struct osc_async_page *oap); extern cfs_spinlock_t osc_ast_guard; int osc_cleanup(struct obd_device *obd); diff --git a/lustre/osc/osc_io.c b/lustre/osc/osc_io.c index 5818b8d..a94ac61 100644 --- a/lustre/osc/osc_io.c +++ b/lustre/osc/osc_io.c @@ -88,11 +88,6 @@ static void osc_io_fini(const struct lu_env *env, const struct cl_io_slice *io) { } -struct cl_page *osc_oap2cl_page(struct osc_async_page *oap) -{ - return container_of(oap, struct osc_page, ops_oap)->ops_cl.cpl_page; -} - /** * An implementation of cl_io_operations::cio_io_submit() method for osc * layer. 
Iterates over pages in the in-queue, prepares each for io by calling @@ -102,32 +97,43 @@ struct cl_page *osc_oap2cl_page(struct osc_async_page *oap) */ static int osc_io_submit(const struct lu_env *env, const struct cl_io_slice *ios, - enum cl_req_type crt, struct cl_2queue *queue, - enum cl_req_priority priority) + enum cl_req_type crt, struct cl_2queue *queue) { + struct obd_export *exp; struct cl_page *page; struct cl_page *tmp; - struct osc_object *osc0 = NULL; struct client_obd *cli = NULL; struct osc_object *osc = NULL; /* to keep gcc happy */ struct osc_page *opg; struct cl_io *io; + CFS_LIST_HEAD (list); - struct cl_page_list *qin = &queue->c2_qin; - struct cl_page_list *qout = &queue->c2_qout; - int queued = 0; - int result = 0; + struct cl_page_list *qin = &queue->c2_qin; + struct cl_page_list *qout = &queue->c2_qout; + int queued = 0; + int result = 0; + int cmd; + int brw_flags; + int max_pages; - LASSERT(qin->pl_nr > 0); + LASSERT(qin->pl_nr > 0); + + CDEBUG(D_CACHE, "%d %d\n", qin->pl_nr, crt); + + osc = cl2osc(ios->cis_obj); + exp = osc_export(osc); + cli = osc_cli(osc); + max_pages = cli->cl_max_pages_per_rpc; + + cmd = crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ; + brw_flags = osc_io_srvlock(cl2osc_io(env, ios)) ? OBD_BRW_SRVLOCK : 0; - CDEBUG(D_INFO, "%d %d\n", qin->pl_nr, crt); /* * NOTE: here @page is a top-level page. This is done to avoid * creation of sub-page-list. */ cl_page_list_for_each_safe(page, tmp, qin) { struct osc_async_page *oap; - struct obd_export *exp; /* Top level IO. */ io = page->cp_owner; @@ -135,52 +141,18 @@ static int osc_io_submit(const struct lu_env *env, opg = osc_cl_page_osc(page); oap = &opg->ops_oap; - osc = cl2osc(opg->ops_cl.cpl_obj); - exp = osc_export(osc); + LASSERT(osc == oap->oap_obj); - if (priority > CRP_NORMAL) { - cfs_spin_lock(&oap->oap_lock); - oap->oap_async_flags |= ASYNC_HP; - cfs_spin_unlock(&oap->oap_lock); - } - - if (osc0 == NULL) { /* first iteration */ - cli = &exp->exp_obd->u.cli; - osc0 = osc; - client_obd_list_lock(&cli->cl_loi_list_lock); - } else /* check that all pages are against the same object - * (for now) */ - LASSERT(osc == osc0); - - if (!cfs_list_empty(&oap->oap_urgent_item) || - !cfs_list_empty(&oap->oap_rpc_item)) { + if (!cfs_list_empty(&oap->oap_pending_item) || + !cfs_list_empty(&oap->oap_rpc_item)) { + CDEBUG(D_CACHE, "Busy oap %p page %p for submit.\n", + oap, opg); result = -EBUSY; break; } result = cl_page_prep(env, io, page, crt); - if (result == 0) { - ++queued; - cl_page_list_move(qout, qin, page); - if (cfs_list_empty(&oap->oap_pending_item)) { - osc_io_submit_page(env, cl2osc_io(env, ios), - opg, crt); - } else { - result = osc_set_async_flags(osc, opg, - OSC_FLAGS); - /* - * bug 18881: we can't just break out here when - * error occurs after cl_page_prep has been - * called against the page. The correct - * way is to call page's completion routine, - * as in osc_oap_interrupted. For simplicity, - * we just force osc_set_async_flags() to - * not return error. - */ - LASSERT(result == 0); - } - opg->ops_submit_time = cfs_time_current(); - } else { + if (result != 0) { LASSERT(result < 0); if (result != -EALREADY) break; @@ -190,30 +162,29 @@ static int osc_io_submit(const struct lu_env *env, * is not dirty. */ result = 0; + continue; } - /* - * We might hold client_obd_list_lock() for too long and cause - * soft-lockups (see bug 16651). 
But on the other hand, pages - * are queued here with ASYNC_URGENT flag, thus will be sent - * out immediately once osc_io_unplug() be called, possibly - * resulting sub-optimal RPCs. - * - * We think creating optimal-sized RPCs is more important than - * avoiding the transient soft-lockups, plus I believe the - * soft-locks only happen in full debug testing. - */ - } - - LASSERT(ergo(result == 0, cli != NULL)); - LASSERT(ergo(result == 0, osc == osc0)); - - if (queued > 0) - osc_io_unplug(env, cli, osc, PDL_POLICY_ROUND); - if (osc0) - client_obd_list_unlock(&cli->cl_loi_list_lock); - CDEBUG(D_INFO, "%d/%d %d\n", qin->pl_nr, qout->pl_nr, result); - return qout->pl_nr > 0 ? 0 : result; + cl_page_list_move(qout, qin, page); + oap->oap_async_flags = ASYNC_URGENT|ASYNC_READY; + oap->oap_async_flags |= ASYNC_COUNT_STABLE; + + osc_page_submit(env, opg, crt, brw_flags); + cfs_list_add_tail(&oap->oap_pending_item, &list); + if (++queued == max_pages) { + queued = 0; + result = osc_queue_sync_pages(env, osc, &list, cmd, + brw_flags); + if (result < 0) + break; + } + } + + if (queued > 0) + result = osc_queue_sync_pages(env, osc, &list, cmd, brw_flags); + + CDEBUG(D_INFO, "%d/%d %d\n", qin->pl_nr, qout->pl_nr, result); + return qout->pl_nr > 0 ? 0 : result; } static void osc_page_touch_at(const struct lu_env *env, @@ -372,64 +343,64 @@ static int osc_async_upcall(void *a, int rc) return 0; } -/* Disable osc_trunc_check() because it is naturally race between read and - * truncate. See bug 20645 for details. - */ -#if 0 && defined(__KERNEL__) +#if defined(__KERNEL__) /** * Checks that there are no pages being written in the extent being truncated. */ +static int trunc_check_cb(const struct lu_env *env, struct cl_io *io, + struct cl_page *page, void *cbdata) +{ + const struct cl_page_slice *slice; + struct osc_page *ops; + struct osc_async_page *oap; + __u64 start = *(__u64 *)cbdata; + + slice = cl_page_at(page, &osc_device_type); + LASSERT(slice != NULL); + ops = cl2osc_page(slice); + oap = &ops->ops_oap; + + if (oap->oap_cmd & OBD_BRW_WRITE && + !cfs_list_empty(&oap->oap_pending_item)) + CL_PAGE_DEBUG(D_ERROR, env, page, "exists " LPU64 "/%s.\n", + start, current->comm); + +#ifdef __linux__ + { + cfs_page_t *vmpage = cl_page_vmpage(env, page); + if (PageLocked(vmpage)) + CDEBUG(D_CACHE, "page %p index %lu locked for %d.\n", + ops, page->cp_index, + (oap->oap_cmd & OBD_BRW_RWMASK)); + } +#endif + + return CLP_GANG_OKAY; +} + static void osc_trunc_check(const struct lu_env *env, struct cl_io *io, - struct osc_io *oio, size_t size) + struct osc_io *oio, __u64 size) { - struct osc_page *cp; - struct osc_object *obj; - struct cl_object *clob; - struct cl_page *page; - struct cl_page_list *list; - int partial; - pgoff_t start; + struct cl_object *clob; + int partial; + pgoff_t start; clob = oio->oi_cl.cis_obj; - obj = cl2osc(clob); start = cl_index(clob, size); partial = cl_offset(clob, start) < size; - list = &osc_env_info(env)->oti_plist; /* * Complain if there are pages in the truncated region. - * - * XXX this is quite expensive check. 
*/ - cl_page_list_init(list); - cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF, list); - - cl_page_list_for_each(page, list) - CL_PAGE_DEBUG(D_ERROR, env, page, "exists %lu\n", start); - - cl_page_list_disown(env, io, list); - cl_page_list_fini(env, list); - - cfs_spin_lock(&obj->oo_seatbelt); - cfs_list_for_each_entry(cp, &obj->oo_inflight[CRT_WRITE], - ops_inflight) { - page = cp->ops_cl.cpl_page; - if (page->cp_index >= start + partial) { - cfs_task_t *submitter; - - submitter = cp->ops_submitter; - /* - * XXX Linux specific debugging stuff. - */ - CL_PAGE_DEBUG(D_ERROR, env, page, "%s/%d %lu\n", - submitter->comm, submitter->pid, start); - libcfs_debug_dumpstack(submitter); - } - } - cfs_spin_unlock(&obj->oo_seatbelt); + cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF, + trunc_check_cb, (void *)&size); } #else /* __KERNEL__ */ -# define osc_trunc_check(env, io, oio, size) do {;} while (0) +static void osc_trunc_check(const struct lu_env *env, struct cl_io *io, + struct osc_io *oio, __u64 size) +{ + return; +} #endif static int osc_io_setattr_start(const struct lu_env *env, @@ -442,34 +413,36 @@ static int osc_io_setattr_start(const struct lu_env *env, struct cl_attr *attr = &osc_env_info(env)->oti_attr; struct obdo *oa = &oio->oi_oa; struct osc_async_cbargs *cbargs = &oio->oi_cbarg; - loff_t size = io->u.ci_setattr.sa_attr.lvb_size; - unsigned int ia_valid = io->u.ci_setattr.sa_valid; - int result = 0; - struct obd_info oinfo = { { { 0 } } }; - - if (ia_valid & ATTR_SIZE) - osc_trunc_check(env, io, oio, size); - - if (oio->oi_lockless == 0) { - cl_object_attr_lock(obj); - result = cl_object_attr_get(env, obj, attr); - if (result == 0) { - unsigned int cl_valid = 0; - - if (ia_valid & ATTR_SIZE) { - attr->cat_size = attr->cat_kms = size; - cl_valid = (CAT_SIZE | CAT_KMS); - } - if (ia_valid & ATTR_MTIME_SET) { - attr->cat_mtime = io->u.ci_setattr.sa_attr.lvb_mtime; - cl_valid |= CAT_MTIME; - } - if (ia_valid & ATTR_ATIME_SET) { - attr->cat_atime = io->u.ci_setattr.sa_attr.lvb_atime; - cl_valid |= CAT_ATIME; - } - if (ia_valid & ATTR_CTIME_SET) { - attr->cat_ctime = io->u.ci_setattr.sa_attr.lvb_ctime; + __u64 size = io->u.ci_setattr.sa_attr.lvb_size; + unsigned int ia_valid = io->u.ci_setattr.sa_valid; + int result = 0; + struct obd_info oinfo = { { { 0 } } }; + + /* truncate cache dirty pages first */ + if (cl_io_is_trunc(io)) + result = osc_cache_truncate_start(env, oio, cl2osc(obj), size); + + if (result == 0 && oio->oi_lockless == 0) { + cl_object_attr_lock(obj); + result = cl_object_attr_get(env, obj, attr); + if (result == 0) { + struct ost_lvb *lvb = &io->u.ci_setattr.sa_attr; + unsigned int cl_valid = 0; + + if (ia_valid & ATTR_SIZE) { + attr->cat_size = attr->cat_kms = size; + cl_valid = (CAT_SIZE | CAT_KMS); + } + if (ia_valid & ATTR_MTIME_SET) { + attr->cat_mtime = lvb->lvb_mtime; + cl_valid |= CAT_MTIME; + } + if (ia_valid & ATTR_ATIME_SET) { + attr->cat_atime = lvb->lvb_atime; + cl_valid |= CAT_ATIME; + } + if (ia_valid & ATTR_CTIME_SET) { + attr->cat_ctime = lvb->lvb_ctime; cl_valid |= CAT_CTIME; } result = cl_object_attr_set(env, obj, attr, cl_valid); @@ -518,8 +491,9 @@ static int osc_io_setattr_start(const struct lu_env *env, static void osc_io_setattr_end(const struct lu_env *env, const struct cl_io_slice *slice) { - struct cl_io *io = slice->cis_io; - struct osc_io *oio = cl2osc_io(env, slice); + struct cl_io *io = slice->cis_io; + struct osc_io *oio = cl2osc_io(env, slice); + struct cl_object *obj = slice->cis_obj; struct 
osc_async_cbargs *cbargs = &oio->oi_cbarg; int result; @@ -527,7 +501,6 @@ static void osc_io_setattr_end(const struct lu_env *env, result = io->ci_result = cbargs->opc_rc; if (result == 0) { - struct cl_object *obj = slice->cis_obj; if (oio->oi_lockless) { /* lockless truncate */ struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev); @@ -537,6 +510,15 @@ static void osc_io_setattr_end(const struct lu_env *env, osd->od_stats.os_lockless_truncates++; } } + + if (cl_io_is_trunc(io)) { + __u64 size = io->u.ci_setattr.sa_attr.lvb_size; + osc_trunc_check(env, io, oio, size); + if (oio->oi_trunc != NULL) { + osc_cache_truncate_end(env, oio, cl2osc(obj)); + oio->oi_trunc = NULL; + } + } } static int osc_io_read_start(const struct lu_env *env, @@ -584,17 +566,15 @@ static int osc_io_write_start(const struct lu_env *env, RETURN(result); } -static int osc_io_fsync_start(const struct lu_env *env, - const struct cl_io_slice *slice) +static int osc_fsync_ost(const struct lu_env *env, struct osc_object *obj, + struct cl_fsync_io *fio) { - struct cl_io *io = slice->cis_io; - struct osc_io *oio = cl2osc_io(env, slice); + struct osc_io *oio = osc_env_io(env); struct obdo *oa = &oio->oi_oa; struct obd_info *oinfo = &oio->oi_info; + struct lov_oinfo *loi = obj->oo_oinfo; struct osc_async_cbargs *cbargs = &oio->oi_cbarg; - struct cl_object *obj = slice->cis_obj; - struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo; - int result = 0; + int rc = 0; ENTRY; memset(oa, 0, sizeof(*oa)); @@ -602,32 +582,94 @@ static int osc_io_fsync_start(const struct lu_env *env, oa->o_seq = loi->loi_seq; oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; - /* reload size and blocks for start and end of sync range */ - oa->o_size = io->u.ci_fsync.fi_start; - oa->o_blocks = io->u.ci_fsync.fi_end; + /* reload size abd blocks for start and end of sync range */ + oa->o_size = fio->fi_start; + oa->o_blocks = fio->fi_end; oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; - obdo_set_parent_fid(oa, io->u.ci_fsync.fi_fid); + obdo_set_parent_fid(oa, fio->fi_fid); memset(oinfo, 0, sizeof(*oinfo)); oinfo->oi_oa = oa; - oinfo->oi_capa = io->u.ci_fsync.fi_capa; + oinfo->oi_capa = fio->fi_capa; cfs_init_completion(&cbargs->opc_sync); - result = osc_sync_base(osc_export(cl2osc(obj)), oinfo, - osc_async_upcall, cbargs, PTLRPCD_SET); + rc = osc_sync_base(osc_export(obj), oinfo, osc_async_upcall, cbargs, + PTLRPCD_SET); + RETURN(rc); +} + +static int osc_io_fsync_start(const struct lu_env *env, + const struct cl_io_slice *slice) +{ + struct cl_io *io = slice->cis_io; + struct cl_fsync_io *fio = &io->u.ci_fsync; + struct cl_object *obj = slice->cis_obj; + struct osc_object *osc = cl2osc(obj); + pgoff_t start = cl_index(obj, fio->fi_start); + pgoff_t end = cl_index(obj, fio->fi_end); + int result = 0; + ENTRY; + + if (fio->fi_end == OBD_OBJECT_EOF) + end = CL_PAGE_EOF; + + result = osc_cache_writeback_range(env, osc, start, end, 0, + fio->fi_mode == CL_FSYNC_DISCARD); + if (result > 0) { + fio->fi_nr_written += result; + result = 0; + } + if (fio->fi_mode == CL_FSYNC_ALL) { + int rc; + + /* we have to wait for writeback to finish before we can + * send OST_SYNC RPC. This is bad because it causes extents + * to be written osc by osc. However, we usually start + * writeback before CL_FSYNC_ALL so this won't have any real + * problem. 
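osc_io_fsync_start() above stages the work: osc_cache_writeback_range() first pushes (or discards) the dirty pages in the range, its positive return value is folded into fi_nr_written, and only for CL_FSYNC_ALL does the code then wait for that writeback with osc_cache_wait_range() before sending the OST_SYNC RPC through osc_fsync_ost(). A small self-contained sketch of that three-step flow; the stub helpers below only stand in for the Lustre calls.

/*
 * Sketch of the staged fsync flow in osc_io_fsync_start(): write back the
 * range, then (for a full fsync) wait for it and ask the server to sync.
 * All helpers are stubs invented for the illustration.
 */
#include <stdio.h>

enum fsync_mode { FSYNC_LOCAL, FSYNC_DISCARD, FSYNC_ALL };

static int writeback_range(long start, long end, int discard)
{
        printf("writeback [%ld, %ld] discard=%d\n", start, end, discard);
        return 3;               /* pretend three pages were queued for write */
}

static int wait_range(long start, long end) { return 0; }
static int sync_ost(long start, long end)   { return 0; }

static int fsync_range(long start, long end, enum fsync_mode mode,
                       long *nr_written)
{
        int rc, rc2;

        rc = writeback_range(start, end, mode == FSYNC_DISCARD);
        if (rc > 0) {           /* >0 means "pages written", not an error */
                *nr_written += rc;
                rc = 0;
        }
        if (mode == FSYNC_ALL) {
                rc2 = wait_range(start, end);   /* wait for writeback first */
                if (rc == 0)
                        rc = rc2;
                rc2 = sync_ost(start, end);     /* then the OST_SYNC RPC */
                if (rc == 0)
                        rc = rc2;
        }
        return rc;
}

int main(void)
{
        long nr = 0;
        int rc = fsync_range(0, 4095, FSYNC_ALL, &nr);

        printf("rc=%d nr_written=%ld\n", rc, nr);
        return rc;
}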
*/ + rc = osc_cache_wait_range(env, osc, start, end); + if (result == 0) + result = rc; + rc = osc_fsync_ost(env, osc, fio); + if (result == 0) + result = rc; + } + RETURN(result); } static void osc_io_fsync_end(const struct lu_env *env, const struct cl_io_slice *slice) { - struct cl_io *io = slice->cis_io; + struct cl_fsync_io *fio = &slice->cis_io->u.ci_fsync; + struct cl_object *obj = slice->cis_obj; + pgoff_t start = cl_index(obj, fio->fi_start); + pgoff_t end = cl_index(obj, fio->fi_end); + int result = 0; + + if (fio->fi_mode == CL_FSYNC_LOCAL) { + result = osc_cache_wait_range(env, cl2osc(obj), start, end); + } else if (fio->fi_mode == CL_FSYNC_ALL) { + struct osc_io *oio = cl2osc_io(env, slice); + struct osc_async_cbargs *cbargs = &oio->oi_cbarg; + + cfs_wait_for_completion(&cbargs->opc_sync); + if (result == 0) + result = cbargs->opc_rc; + } + slice->cis_io->ci_result = result; +} + +static void osc_io_end(const struct lu_env *env, + const struct cl_io_slice *slice) +{ struct osc_io *oio = cl2osc_io(env, slice); - struct osc_async_cbargs *cbargs = &oio->oi_cbarg; - cfs_wait_for_completion(&cbargs->opc_sync); - io->ci_result = cbargs->opc_rc; + if (oio->oi_active) { + osc_extent_release(env, oio->oi_active); + oio->oi_active = NULL; + } } static const struct cl_io_operations osc_io_ops = { @@ -638,6 +680,7 @@ static const struct cl_io_operations osc_io_ops = { }, [CIT_WRITE] = { .cio_start = osc_io_write_start, + .cio_end = osc_io_end, .cio_fini = osc_io_fini }, [CIT_SETATTR] = { @@ -645,9 +688,10 @@ static const struct cl_io_operations osc_io_ops = { .cio_end = osc_io_setattr_end }, [CIT_FAULT] = { - .cio_fini = osc_io_fini, - .cio_start = osc_io_fault_start - }, + .cio_start = osc_io_fault_start, + .cio_end = osc_io_end, + .cio_fini = osc_io_fini + }, [CIT_FSYNC] = { .cio_start = osc_io_fsync_start, .cio_end = osc_io_fsync_end, diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c index de06556..c673e9f 100644 --- a/lustre/osc/osc_lock.c +++ b/lustre/osc/osc_lock.c @@ -1330,14 +1330,32 @@ static int osc_lock_use(const struct lu_env *env, static int osc_lock_flush(struct osc_lock *ols, int discard) { - struct cl_lock *lock = ols->ols_cl.cls_lock; - struct cl_env_nest nest; - struct lu_env *env; - int result = 0; + struct cl_lock *lock = ols->ols_cl.cls_lock; + struct cl_env_nest nest; + struct lu_env *env; + int result = 0; + ENTRY; + + env = cl_env_nested_get(&nest); + if (!IS_ERR(env)) { + struct osc_object *obj = cl2osc(ols->ols_cl.cls_obj); + struct cl_lock_descr *descr = &lock->cll_descr; + int rc = 0; + + if (descr->cld_mode >= CLM_WRITE) { + result = osc_cache_writeback_range(env, obj, + descr->cld_start, descr->cld_end, + 1, discard); + CDEBUG(D_DLMTRACE, "write out %d pages for lock %p.\n", + result, lock); + if (result > 0) + result = 0; + } + + rc = cl_lock_discard_pages(env, lock); + if (result == 0 && rc < 0) + result = rc; - env = cl_env_nested_get(&nest); - if (!IS_ERR(env)) { - result = cl_lock_page_out(env, lock, discard); cl_env_nested_put(&nest, env); } else result = PTR_ERR(env); @@ -1345,7 +1363,7 @@ static int osc_lock_flush(struct osc_lock *ols, int discard) ols->ols_flush = 1; LINVRNT(!osc_lock_has_pages(ols)); } - return result; + RETURN(result); } /** diff --git a/lustre/osc/osc_object.c b/lustre/osc/osc_object.c index 47ebe5f..5f577ea 100644 --- a/lustre/osc/osc_object.c +++ b/lustre/osc/osc_object.c @@ -84,26 +84,46 @@ static int osc_object_init(const struct lu_env *env, struct lu_object *obj, for (i = 0; i < CRT_NR; ++i) 
CFS_INIT_LIST_HEAD(&osc->oo_inflight[i]); - osc_oap_pages_init(&osc->oo_read_pages); - osc_oap_pages_init(&osc->oo_write_pages); CFS_INIT_LIST_HEAD(&osc->oo_ready_item); CFS_INIT_LIST_HEAD(&osc->oo_hp_ready_item); CFS_INIT_LIST_HEAD(&osc->oo_write_item); CFS_INIT_LIST_HEAD(&osc->oo_read_item); + osc->oo_root.rb_node = NULL; + CFS_INIT_LIST_HEAD(&osc->oo_hp_exts); + CFS_INIT_LIST_HEAD(&osc->oo_urgent_exts); + CFS_INIT_LIST_HEAD(&osc->oo_rpc_exts); + CFS_INIT_LIST_HEAD(&osc->oo_reading_exts); + cfs_atomic_set(&osc->oo_nr_reads, 0); + cfs_atomic_set(&osc->oo_nr_writes, 0); + cfs_spin_lock_init(&osc->oo_lock); + return 0; } static void osc_object_free(const struct lu_env *env, struct lu_object *obj) { - struct osc_object *osc = lu2osc(obj); - int i; - - for (i = 0; i < CRT_NR; ++i) - LASSERT(cfs_list_empty(&osc->oo_inflight[i])); - - lu_object_fini(obj); - OBD_SLAB_FREE_PTR(osc, osc_object_kmem); + struct osc_object *osc = lu2osc(obj); + int i; + + for (i = 0; i < CRT_NR; ++i) + LASSERT(cfs_list_empty(&osc->oo_inflight[i])); + + LASSERT(cfs_list_empty(&osc->oo_ready_item)); + LASSERT(cfs_list_empty(&osc->oo_hp_ready_item)); + LASSERT(cfs_list_empty(&osc->oo_write_item)); + LASSERT(cfs_list_empty(&osc->oo_read_item)); + + LASSERT(osc->oo_root.rb_node == NULL); + LASSERT(cfs_list_empty(&osc->oo_hp_exts)); + LASSERT(cfs_list_empty(&osc->oo_urgent_exts)); + LASSERT(cfs_list_empty(&osc->oo_rpc_exts)); + LASSERT(cfs_list_empty(&osc->oo_reading_exts)); + LASSERT(cfs_atomic_read(&osc->oo_nr_reads) == 0); + LASSERT(cfs_atomic_read(&osc->oo_nr_writes) == 0); + + lu_object_fini(obj); + OBD_SLAB_FREE_PTR(osc, osc_object_kmem); } int osc_lvb_print(const struct lu_env *env, void *cookie, diff --git a/lustre/osc/osc_page.c b/lustre/osc/osc_page.c index 32fb5f4..bc234e2 100644 --- a/lustre/osc/osc_page.c +++ b/lustre/osc/osc_page.c @@ -206,9 +206,10 @@ static void osc_page_transfer_add(const struct lu_env *env, } static int osc_page_cache_add(const struct lu_env *env, - const struct cl_page_slice *slice, - struct cl_io *unused) + const struct cl_page_slice *slice, + struct cl_io *io) { + struct osc_io *oio = osc_env_io(env); struct osc_page *opg = cl2osc_page(slice); int result; ENTRY; @@ -216,11 +217,22 @@ static int osc_page_cache_add(const struct lu_env *env, LINVRNT(osc_page_protected(env, opg, CLM_WRITE, 0)); osc_page_transfer_get(opg, "transfer\0cache"); - result = osc_queue_async_io(env, opg); + result = osc_queue_async_io(env, io, opg); if (result != 0) osc_page_transfer_put(env, opg); else osc_page_transfer_add(env, opg, CRT_WRITE); + + /* for sync write, kernel will wait for this page to be flushed before + * osc_io_end() is called, so release it earlier. + * for mkwrite(), it's known there is no further pages. 
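osc_object_init()/osc_object_free() above show the new per-object bookkeeping: an rbtree root (oo_root) plus the high-priority, urgent, RPC and reading extent lists, with matching emptiness assertions at teardown, and osc_page_cache_add() now drops the active extent early for sync writes and mkwrite. The osc_cache.c code that actually maintains oo_root is not part of these hunks, so the following is only an assumed, self-contained sketch of the underlying idea of finding the extent that covers a given offset in an ordered set of non-overlapping extents; a sorted array with binary search is used here purely to keep the sketch standalone.

/*
 * Illustrative only: look up the extent covering a chunk index in an ordered
 * set of non-overlapping [start, end] extents. The real code keeps an
 * osc_object's extents in the oo_root rbtree instead of an array.
 */
#include <stdio.h>

struct extent {
        unsigned long start;    /* first chunk covered */
        unsigned long end;      /* last chunk covered */
};

static const struct extent *extent_lookup(const struct extent *set, int nr,
                                          unsigned long index)
{
        int lo = 0, hi = nr - 1;

        while (lo <= hi) {
                int mid = (lo + hi) / 2;

                if (index < set[mid].start)
                        hi = mid - 1;
                else if (index > set[mid].end)
                        lo = mid + 1;
                else
                        return &set[mid];       /* start <= index <= end */
        }
        return NULL;
}

int main(void)
{
        const struct extent extents[] = { { 0, 3 }, { 8, 8 }, { 12, 20 } };
        const struct extent *ext = extent_lookup(extents, 3, 13);

        if (ext != NULL)
                printf("chunk 13 belongs to extent [%lu, %lu]\n",
                       ext->start, ext->end);
        return 0;
}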
*/ + if (cl_io_is_sync_write(io) || cl_io_is_mkwrite(io)) { + if (oio->oi_active != NULL) { + osc_extent_release(env, oio->oi_active); + oio->oi_active = NULL; + } + } + RETURN(result); } @@ -341,17 +353,16 @@ static int osc_page_print(const struct lu_env *env, struct client_obd *cli = &osc_export(obj)->exp_obd->u.cli; return (*printer)(env, cookie, LUSTRE_OSC_NAME"-page@%p: " - "1< %#x %d %u %s %s %s > " + "1< %#x %d %u %s %s > " "2< "LPU64" %u %u %#x %#x | %p %p %p > " - "3< %s %p %d %lu %d > " - "4< %d %d %d %lu %s | %s %s %s %s > " - "5< %s %s %s %s | %d %s %s | %d %s %s>\n", + "3< %s %p %d %lu %d > " + "4< %d %d %d %lu %s | %s %s %s %s > " + "5< %s %s %s %s | %d %s | %d %s %s>\n", opg, /* 1 */ oap->oap_magic, oap->oap_cmd, oap->oap_interrupted, osc_list(&oap->oap_pending_item), - osc_list(&oap->oap_urgent_item), osc_list(&oap->oap_rpc_item), /* 2 */ oap->oap_obj_off, oap->oap_page_off, oap->oap_count, @@ -375,12 +386,11 @@ static int osc_page_print(const struct lu_env *env, osc_list(&obj->oo_hp_ready_item), osc_list(&obj->oo_write_item), osc_list(&obj->oo_read_item), - obj->oo_read_pages.oop_num_pending, - osc_list(&obj->oo_read_pages.oop_pending), - osc_list(&obj->oo_read_pages.oop_urgent), - obj->oo_write_pages.oop_num_pending, - osc_list(&obj->oo_write_pages.oop_pending), - osc_list(&obj->oo_write_pages.oop_urgent)); + cfs_atomic_read(&obj->oo_nr_reads), + osc_list(&obj->oo_reading_exts), + cfs_atomic_read(&obj->oo_nr_writes), + osc_list(&obj->oo_hp_exts), + osc_list(&obj->oo_urgent_exts)); } static void osc_page_delete(const struct lu_env *env, @@ -395,7 +405,7 @@ static void osc_page_delete(const struct lu_env *env, ENTRY; CDEBUG(D_TRACE, "%p\n", opg); osc_page_transfer_put(env, opg); - rc = osc_teardown_async_page(obj, opg); + rc = osc_teardown_async_page(env, obj, opg); if (rc) { CL_PAGE_DEBUG(D_ERROR, env, cl_page_top(slice->cpl_page), "Trying to teardown failed: %d\n", rc); @@ -425,23 +435,31 @@ void osc_page_clip(const struct lu_env *env, const struct cl_page_slice *slice, static int osc_page_cancel(const struct lu_env *env, const struct cl_page_slice *slice) { - struct osc_page *opg = cl2osc_page(slice); - struct osc_async_page *oap = &opg->ops_oap; + struct osc_page *opg = cl2osc_page(slice); int rc = 0; LINVRNT(osc_page_protected(env, opg, CLM_READ, 0)); - client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock); /* Check if the transferring against this page * is completed, or not even queued. */ if (opg->ops_transfer_pinned) /* FIXME: may not be interrupted.. */ rc = osc_cancel_async_page(env, opg); LASSERT(ergo(rc == 0, opg->ops_transfer_pinned == 0)); - client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock); return rc; } +static int osc_page_flush(const struct lu_env *env, + const struct cl_page_slice *slice, + struct cl_io *io) +{ + struct osc_page *opg = cl2osc_page(slice); + int rc = 0; + ENTRY; + rc = osc_flush_async_page(env, io, opg); + RETURN(rc); +} + static const struct cl_page_operations osc_page_ops = { .cpo_fini = osc_page_fini, .cpo_print = osc_page_print, @@ -458,7 +476,8 @@ static const struct cl_page_operations osc_page_ops = { } }, .cpo_clip = osc_page_clip, - .cpo_cancel = osc_page_cancel + .cpo_cancel = osc_page_cancel, + .cpo_flush = osc_page_flush }; struct cl_page *osc_page_init(const struct lu_env *env, @@ -499,19 +518,34 @@ struct cl_page *osc_page_init(const struct lu_env *env, * Helper function called by osc_io_submit() for every page in an immediate * transfer (i.e., transferred synchronously). 
*/ -void osc_io_submit_page(const struct lu_env *env, - struct osc_io *oio, struct osc_page *opg, - enum cl_req_type crt) +void osc_page_submit(const struct lu_env *env, struct osc_page *opg, + enum cl_req_type crt, int brw_flags) { - LINVRNT(osc_page_protected(env, opg, - crt == CRT_WRITE ? CLM_WRITE : CLM_READ, 1)); - - osc_queue_sync_page(env, opg, - crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ, - osc_io_srvlock(oio) ? OBD_BRW_SRVLOCK : 0); - - osc_page_transfer_get(opg, "transfer\0imm"); - osc_page_transfer_add(env, opg, crt); + struct osc_async_page *oap = &opg->ops_oap; + struct osc_object *obj = oap->oap_obj; + + LINVRNT(osc_page_protected(env, opg, + crt == CRT_WRITE ? CLM_WRITE : CLM_READ, 1)); + + LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, " + "magic 0x%x\n", oap, oap->oap_magic); + LASSERT(oap->oap_async_flags & ASYNC_READY); + LASSERT(oap->oap_async_flags & ASYNC_COUNT_STABLE); + + oap->oap_cmd = crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ; + oap->oap_page_off = opg->ops_from; + oap->oap_count = opg->ops_to - opg->ops_from; + oap->oap_brw_flags = OBD_BRW_SYNC | brw_flags; + + if (!client_is_remote(osc_export(obj)) && + cfs_capable(CFS_CAP_SYS_RESOURCE)) { + oap->oap_brw_flags |= OBD_BRW_NOQUOTA; + oap->oap_cmd |= OBD_BRW_NOQUOTA; + } + + opg->ops_submit_time = cfs_time_current(); + osc_page_transfer_get(opg, "transfer\0imm"); + osc_page_transfer_add(env, opg, crt); } /** @} osc */ diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 3aaf175..8e86162 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -815,7 +815,7 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, (cli->cl_max_rpcs_in_flight + 1); oa->o_undirty = max(cli->cl_dirty_max, max_in_flight); } - oa->o_grant = cli->cl_avail_grant; + oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant; oa->o_dropped = cli->cl_lost_grant; cli->cl_lost_grant = 0; client_obd_list_unlock(&cli->cl_loi_list_lock); @@ -1030,15 +1030,17 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) cli->cl_avail_grant = ocd->ocd_grant; } - client_obd_list_unlock(&cli->cl_loi_list_lock); + /* determine the appropriate chunk size used by osc_extent. */ + cli->cl_chunkbits = max_t(int, CFS_PAGE_SHIFT, ocd->ocd_blocksize); + client_obd_list_unlock(&cli->cl_loi_list_lock); - CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n", - cli->cl_import->imp_obd->obd_name, - cli->cl_avail_grant, cli->cl_lost_grant); + CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld." 
+ "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name, + cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits); - if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK && - cfs_list_empty(&cli->cl_grant_shrink_list)) - osc_add_shrink_grant(cli); + if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK && + cfs_list_empty(&cli->cl_grant_shrink_list)) + osc_add_shrink_grant(cli); } /* We assume that the reason this OSC got a short read is because it read @@ -1652,7 +1654,6 @@ int osc_brw_redo_request(struct ptlrpc_request *request, struct osc_brw_async_args *aa) { struct ptlrpc_request *new_req; - struct ptlrpc_request_set *set = request->rq_set; struct osc_brw_async_args *new_aa; struct osc_async_page *oap; int rc = 0; @@ -1669,15 +1670,12 @@ int osc_brw_redo_request(struct ptlrpc_request *request, if (rc) RETURN(rc); - client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock); - cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) { if (oap->oap_request != NULL) { LASSERTF(request == oap->oap_request, "request %p != oap_request %p\n", request, oap->oap_request); if (oap->oap_interrupted) { - client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock); ptlrpc_req_finished(new_req); RETURN(-EINTR); } @@ -1695,8 +1693,9 @@ int osc_brw_redo_request(struct ptlrpc_request *request, new_aa = ptlrpc_req_async_args(new_req); CFS_INIT_LIST_HEAD(&new_aa->aa_oaps); - cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps); - CFS_INIT_LIST_HEAD(&aa->aa_oaps); + cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps); + CFS_INIT_LIST_HEAD(&new_aa->aa_exts); + cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts); cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) { if (oap->oap_request) { @@ -1708,16 +1707,14 @@ int osc_brw_redo_request(struct ptlrpc_request *request, new_aa->aa_ocapa = aa->aa_ocapa; aa->aa_ocapa = NULL; - /* use ptlrpc_set_add_req is safe because interpret functions work - * in check_set context. only one way exist with access to request - * from different thread got -EINTR - this way protected with - * cl_loi_list_lock */ - ptlrpc_set_add_req(set, new_req); + /* XXX: This code will run into problem if we're going to support + * to add a series of BRW RPCs into a self-defined ptlrpc_request_set + * and wait for all of them to be finished. We should inherit request + * set from old request. 
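A few hunks above, osc_init_grant() starts deriving cl_chunkbits as the maximum of CFS_PAGE_SHIFT and ocd_blocksize, so the client never accounts grant in units smaller than either its page size or the server's block size; the comparison against a page shift implies ocd_blocksize is carried as a log2 value. A trivial standalone illustration of that computation, with the concrete shift values invented for the example:

/*
 * Illustration of the cl_chunkbits computation in osc_init_grant() above.
 * The shift values are assumptions made for the example; ocd_blocksize is
 * treated as a log2 block size, as the max against the page shift suggests.
 */
#include <stdio.h>

int main(void)
{
        int page_shift = 12;    /* 4 KiB client pages (assumption) */
        int ocd_blocksize = 16; /* server advertises 64 KiB blocks (assumption) */
        int chunkbits = page_shift > ocd_blocksize ? page_shift : ocd_blocksize;

        printf("chunk size used by osc_extent: %d bytes\n", 1 << chunkbits);
        return 0;
}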
*/ + ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1); - client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock); - - DEBUG_REQ(D_INFO, new_req, "new request"); - RETURN(0); + DEBUG_REQ(D_INFO, new_req, "new request"); + RETURN(0); } /* @@ -1874,9 +1871,11 @@ out: static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *data, int rc) { - struct osc_brw_async_args *aa = data; - struct osc_async_page *oap, *tmp; - struct client_obd *cli; + struct osc_brw_async_args *aa = data; + struct osc_extent *ext; + struct osc_extent *tmp; + struct cl_object *obj = NULL; + struct client_obd *cli = aa->aa_cli; ENTRY; rc = osc_brw_fini_request(req, rc); @@ -1911,46 +1910,80 @@ static int brw_interpret(const struct lu_env *env, aa->aa_ocapa = NULL; } - cli = aa->aa_cli; - client_obd_list_lock(&cli->cl_loi_list_lock); + cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) { + if (obj == NULL && rc == 0) { + obj = osc2cl(ext->oe_obj); + cl_object_get(obj); + } - /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters - * is called so we know whether to go to sync BRWs or wait for more - * RPCs to complete */ - if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) - cli->cl_w_in_flight--; - else - cli->cl_r_in_flight--; - - /* the caller may re-use the oap after the completion call so - * we need to clean it up a little */ - cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, - oap_rpc_item) { - cfs_list_del_init(&oap->oap_rpc_item); - osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc); + cfs_list_del_init(&ext->oe_link); + osc_extent_finish(env, ext, 1, rc); + } + LASSERT(cfs_list_empty(&aa->aa_exts)); + LASSERT(cfs_list_empty(&aa->aa_oaps)); + + if (obj != NULL) { + struct obdo *oa = aa->aa_oa; + struct cl_attr *attr = &osc_env_info(env)->oti_attr; + unsigned long valid = 0; + + LASSERT(rc == 0); + if (oa->o_valid & OBD_MD_FLBLOCKS) { + attr->cat_blocks = oa->o_blocks; + valid |= CAT_BLOCKS; + } + if (oa->o_valid & OBD_MD_FLMTIME) { + attr->cat_mtime = oa->o_mtime; + valid |= CAT_MTIME; + } + if (oa->o_valid & OBD_MD_FLATIME) { + attr->cat_atime = oa->o_atime; + valid |= CAT_ATIME; + } + if (oa->o_valid & OBD_MD_FLCTIME) { + attr->cat_ctime = oa->o_ctime; + valid |= CAT_CTIME; + } + if (valid != 0) { + cl_object_attr_lock(obj); + cl_object_attr_set(env, obj, attr, valid); + cl_object_attr_unlock(obj); + } + cl_object_put(env, obj); } OBDO_FREE(aa->aa_oa); - osc_wake_cache_waiters(cli); - osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME); - client_obd_list_unlock(&cli->cl_loi_list_lock); - cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc : req->rq_bulk->bd_nob_transferred); osc_release_ppga(aa->aa_ppga, aa->aa_page_count); ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred); + client_obd_list_lock(&cli->cl_loi_list_lock); + /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters + * is called so we know whether to go to sync BRWs or wait for more + * RPCs to complete */ + if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) + cli->cl_w_in_flight--; + else + cli->cl_r_in_flight--; + osc_wake_cache_waiters(cli); + client_obd_list_unlock(&cli->cl_loi_list_lock); + + osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME); RETURN(rc); } -/* The most tricky part of this function is that it will return with - * cli->cli_loi_list_lock held. +/** + * Build an RPC by the list of extent @ext_list. The caller must ensure + * that the total pages in this list are NOT over max pages per RPC. + * Extents in the list must be in OES_RPC state. 
*/ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, - cfs_list_t *rpc_list, int page_count, int cmd, - pdl_policy_t pol) + cfs_list_t *ext_list, int cmd, pdl_policy_t pol) { struct ptlrpc_request *req = NULL; + struct osc_extent *ext; + CFS_LIST_HEAD(rpc_list); struct brw_page **pga = NULL; struct osc_brw_async_args *aa = NULL; struct obdo *oa = NULL; @@ -1960,17 +1993,39 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ; struct ldlm_lock *lock = NULL; struct cl_req_attr crattr; - int i, rc, mpflag = 0; + obd_off starting_offset = OBD_OBJECT_EOF; + obd_off ending_offset = 0; + int i, rc, mpflag = 0, mem_tight = 0, page_count = 0; - ENTRY; - LASSERT(!cfs_list_empty(rpc_list)); + ENTRY; + LASSERT(!cfs_list_empty(ext_list)); + + /* add pages into rpc_list to build BRW rpc */ + cfs_list_for_each_entry(ext, ext_list, oe_link) { + LASSERT(ext->oe_state == OES_RPC); + mem_tight |= ext->oe_memalloc; + cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) { + ++page_count; + cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list); + if (starting_offset > oap->oap_obj_off) + starting_offset = oap->oap_obj_off; + else + LASSERT(oap->oap_page_off == 0); + if (ending_offset < oap->oap_obj_off + oap->oap_count) + ending_offset = oap->oap_obj_off + + oap->oap_count; + else + LASSERT(oap->oap_page_off + oap->oap_count == + CFS_PAGE_SIZE); + } + } - if (cmd & OBD_BRW_MEMALLOC) - mpflag = cfs_memory_pressure_get_and_set(); + if (mem_tight) + mpflag = cfs_memory_pressure_get_and_set(); - memset(&crattr, 0, sizeof crattr); - OBD_ALLOC(pga, sizeof(*pga) * page_count); - if (pga == NULL) + memset(&crattr, 0, sizeof crattr); + OBD_ALLOC(pga, sizeof(*pga) * page_count); + if (pga == NULL) GOTO(out, rc = -ENOMEM); OBDO_ALLOC(oa); @@ -1978,16 +2033,18 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, GOTO(out, rc = -ENOMEM); i = 0; - cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) { - struct cl_page *page = osc_oap2cl_page(oap); + cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) { + struct cl_page *page = oap2cl_page(oap); if (clerq == NULL) { clerq = cl_req_alloc(env, page, crt, 1 /* only 1-object rpcs for * now */); if (IS_ERR(clerq)) GOTO(out, rc = PTR_ERR(clerq)); - lock = oap->oap_ldlm_lock; - } + lock = oap->oap_ldlm_lock; + } + if (mem_tight) + oap->oap_brw_flags |= OBD_BRW_MEMALLOC; pga[i] = &oap->oap_brw_page; pga[i]->off = oap->oap_obj_off + oap->oap_page_off; CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n", @@ -1998,8 +2055,8 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, /* always get the data for the obdo for the rpc */ LASSERT(clerq != NULL); - crattr.cra_oa = oa; - crattr.cra_capa = NULL; + crattr.cra_oa = oa; + crattr.cra_capa = NULL; memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE); cl_req_attr_set(env, clerq, &crattr, ~0ULL); if (lock) { @@ -2011,18 +2068,18 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, if (rc != 0) { CERROR("cl_req_prep failed: %d\n", rc); GOTO(out, rc); - } + } - sort_brw_pages(pga, page_count); - rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, - pga, &req, crattr.cra_capa, 1, 0); - if (rc != 0) { - CERROR("prep_req failed: %d\n", rc); + sort_brw_pages(pga, page_count); + rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, + pga, &req, crattr.cra_capa, 1, 0); + if (rc != 0) { + CERROR("prep_req failed: %d\n", rc); GOTO(out, rc); } req->rq_interpret_reply = 
brw_interpret; - if (cmd & OBD_BRW_MEMALLOC) + if (mem_tight != 0) req->rq_memalloc = 1; /* Need to update the timestamps after the request is built in case @@ -2035,17 +2092,72 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid); - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); - CFS_INIT_LIST_HEAD(&aa->aa_oaps); - cfs_list_splice(rpc_list, &aa->aa_oaps); - CFS_INIT_LIST_HEAD(rpc_list); - aa->aa_clerq = clerq; + CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); + aa = ptlrpc_req_async_args(req); + CFS_INIT_LIST_HEAD(&aa->aa_oaps); + cfs_list_splice_init(&rpc_list, &aa->aa_oaps); + CFS_INIT_LIST_HEAD(&aa->aa_exts); + cfs_list_splice_init(ext_list, &aa->aa_exts); + aa->aa_clerq = clerq; + + /* queued sync pages can be torn down while the pages + * were between the pending list and the rpc */ + tmp = NULL; + cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) { + /* only one oap gets a request reference */ + if (tmp == NULL) + tmp = oap; + if (oap->oap_interrupted && !req->rq_intr) { + CDEBUG(D_INODE, "oap %p in req %p interrupted\n", + oap, req); + ptlrpc_mark_interrupted(req); + } + } + if (tmp != NULL) + tmp->oap_request = ptlrpc_request_addref(req); + + client_obd_list_lock(&cli->cl_loi_list_lock); + starting_offset >>= CFS_PAGE_SHIFT; + if (cmd == OBD_BRW_READ) { + cli->cl_r_in_flight++; + lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count); + lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight); + lprocfs_oh_tally_log2(&cli->cl_read_offset_hist, + starting_offset + 1); + } else { + cli->cl_w_in_flight++; + lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count); + lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight); + lprocfs_oh_tally_log2(&cli->cl_write_offset_hist, + starting_offset + 1); + } + client_obd_list_unlock(&cli->cl_loi_list_lock); + + DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight", + page_count, aa, cli->cl_r_in_flight, + cli->cl_w_in_flight); + + /* XXX: Maybe the caller can check the RPC bulk descriptor to + * see which CPU/NUMA node the majority of pages were allocated + * on, and try to assign the async RPC to the CPU core + * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic. + * + * But on the other hand, we expect that multiple ptlrpcd + * threads and the initial write sponsor can run in parallel, + * especially when data checksum is enabled, which is CPU-bound + * operation and single ptlrpcd thread cannot process in time. + * So more ptlrpcd threads sharing BRW load + * (with PDL_POLICY_ROUND) seems better. 
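The page walk at the top of osc_build_rpc() above derives the byte range covered by the RPC and enforces that only the first fragment of the batch may start mid-page and only the last may end mid-page. A self-contained sketch of that accumulation and its checks; the page size and the sample fragments below are invented for the illustration.

/*
 * Standalone illustration of the starting/ending offset accumulation in
 * osc_build_rpc(): a fragment that does not extend the range boundaries must
 * cover its page completely. The fragment list is made up for the example.
 */
#include <assert.h>
#include <stdio.h>

#define PAGE_SIZE 4096UL

struct frag {
        unsigned long obj_off;  /* byte offset of the fragment in the object */
        unsigned long page_off; /* offset of the fragment within its page */
        unsigned long count;    /* bytes covered in that page */
};

int main(void)
{
        struct frag frags[] = {
                { 4096 + 512, 512, PAGE_SIZE - 512 },   /* partial first page */
                { 8192,       0,   PAGE_SIZE },         /* full middle page   */
                { 12288,      0,   1024 },              /* partial last page  */
        };
        unsigned long start = ~0UL, end = 0;

        for (unsigned i = 0; i < sizeof(frags) / sizeof(frags[0]); i++) {
                struct frag *f = &frags[i];

                if (start > f->obj_off)
                        start = f->obj_off;
                else
                        assert(f->page_off == 0);       /* not the first page */
                if (end < f->obj_off + f->count)
                        end = f->obj_off + f->count;
                else
                        assert(f->page_off + f->count == PAGE_SIZE);
        }
        printf("RPC covers bytes [%lu, %lu)\n", start, end);
        return 0;
}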
+ */ + ptlrpcd_add_req(req, pol, -1); + rc = 0; + EXIT; + out: - if (cmd & OBD_BRW_MEMALLOC) - cfs_memory_pressure_restore(mpflag); + if (mem_tight != 0) + cfs_memory_pressure_restore(mpflag); - capa_put(crattr.cra_capa); + capa_put(crattr.cra_capa); if (rc != 0) { LASSERT(req == NULL); @@ -2055,59 +2167,14 @@ out: OBD_FREE(pga, sizeof(*pga) * page_count); /* this should happen rarely and is pretty bad, it makes the * pending list not follow the dirty order */ - client_obd_list_lock(&cli->cl_loi_list_lock); - cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) { - cfs_list_del_init(&oap->oap_rpc_item); - - /* queued sync pages can be torn down while the pages - * were between the pending list and the rpc */ - if (oap->oap_interrupted) { - CDEBUG(D_INODE, "oap %p interrupted\n", oap); - osc_ap_completion(env, cli, NULL, oap, 0, - oap->oap_count); - continue; - } - osc_ap_completion(env, cli, NULL, oap, 0, rc); + while (!cfs_list_empty(ext_list)) { + ext = cfs_list_entry(ext_list->next, struct osc_extent, + oe_link); + cfs_list_del_init(&ext->oe_link); + osc_extent_finish(env, ext, 0, rc); } if (clerq && !IS_ERR(clerq)) cl_req_completion(env, clerq, rc); - } else { - struct osc_async_page *tmp = NULL; - - /* queued sync pages can be torn down while the pages - * were between the pending list and the rpc */ - LASSERT(aa != NULL); - client_obd_list_lock(&cli->cl_loi_list_lock); - cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) { - /* only one oap gets a request reference */ - if (tmp == NULL) - tmp = oap; - if (oap->oap_interrupted && !req->rq_intr) { - CDEBUG(D_INODE, "oap %p in req %p interrupted\n", - oap, req); - ptlrpc_mark_interrupted(req); - } - } - if (tmp != NULL) - tmp->oap_request = ptlrpc_request_addref(req); - - DEBUG_REQ(D_INODE,req, "%d pages, aa %p. now %dr/%dw in flight", - page_count, aa, cli->cl_r_in_flight, - cli->cl_w_in_flight); - - /* XXX: Maybe the caller can check the RPC bulk descriptor to - * see which CPU/NUMA node the majority of pages were allocated - * on, and try to assign the async RPC to the CPU core - * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic. - * - * But on the other hand, we expect that multiple ptlrpcd - * threads and the initial write sponsor can run in parallel, - * especially when data checksum is enabled, which is CPU-bound - * operation and single ptlrpcd thread cannot process in time. - * So more ptlrpcd threads sharing BRW load - * (with PDL_POLICY_ROUND) seems better. 
- */ - ptlrpcd_add_req(req, pol, -1); } RETURN(rc); } @@ -3281,15 +3348,12 @@ static int osc_reconnect(const struct lu_env *env, cli->cl_lost_grant = 0; client_obd_list_unlock(&cli->cl_loi_list_lock); - CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld " - "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant, - cli->cl_avail_grant, cli->cl_dirty, lost_grant); CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d" - " ocd_grant: %d\n", data->ocd_connect_flags, - data->ocd_version, data->ocd_grant); - } + " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags, + data->ocd_version, data->ocd_grant, lost_grant); + } - RETURN(0); + RETURN(0); } static int osc_disconnect(struct obd_export *exp) @@ -3374,11 +3438,9 @@ static int osc_import_event(struct obd_device *obd, if (!IS_ERR(env)) { /* Reset grants */ cli = &obd->u.cli; - client_obd_list_lock(&cli->cl_loi_list_lock); /* all pages go to failing rpcs due to the invalid * import */ osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND); - client_obd_list_unlock(&cli->cl_loi_list_lock); ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); cl_env_put(env, &refcheck); @@ -3459,9 +3521,7 @@ static int brw_queue_work(const struct lu_env *env, void *data) CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli); - client_obd_list_lock(&cli->cl_loi_list_lock); osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME); - client_obd_list_unlock(&cli->cl_loi_list_lock); RETURN(0); } diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 261e50d..140b342 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -2776,7 +2776,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp) cfs_spin_lock (&req->rq_lock); if (req->rq_import_generation < imp->imp_generation) { req->rq_err = 1; - req->rq_status = -EINTR; + req->rq_status = -EIO; ptlrpc_client_wake_req(req); } cfs_spin_unlock (&req->rq_lock); @@ -2791,7 +2791,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp) cfs_spin_lock (&req->rq_lock); if (req->rq_import_generation < imp->imp_generation) { req->rq_err = 1; - req->rq_status = -EINTR; + req->rq_status = -EIO; ptlrpc_client_wake_req(req); } cfs_spin_unlock (&req->rq_lock); diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index 0a667b7..a008b0b 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -3032,6 +3032,29 @@ test_48e() { # bug 4134 } run_test 48e "Access to recreated parent subdir (should return errors)" +test_49() { # LU-1030 + # get ost1 size - lustre-OST0000 + ost1_size=$(do_facet ost1 lfs df |grep ${ost1_svc} |awk '{print $4}') + # write 800M at maximum + [ $ost1_size -gt 819200 ] && ost1_size=819200 + + lfs setstripe -c 1 -i 0 $DIR/$tfile + dd if=/dev/zero of=$DIR/$tfile bs=4k count=$((ost1_size >> 2)) & + local dd_pid=$! + + # change max_pages_per_rpc while writing the file + local osc1_mppc=osc.$(get_osc_import_name client ost1).max_pages_per_rpc + local orig_mppc=`$LCTL get_param -n $osc1_mppc` + # loop until dd process exits + while ps ax -opid | grep -q $dd_pid; do + $LCTL set_param $osc1_mppc=$((RANDOM % 256 + 1)) + sleep $((RANDOM % 5 + 1)) + done + # restore original max_pages_per_rpc + $LCTL set_param $osc1_mppc=$orig_mppc +} +run_test 49 "Change max_pages_per_rpc won't break osc extent" + test_50() { # bug 1485 mkdir $DIR/d50