From 364c4cdb58ed75996017d35dcbbc7e7acc6d076e Mon Sep 17 00:00:00 2001 From: Jinshan Xiong Date: Mon, 20 Feb 2012 11:01:00 -0800 Subject: [PATCH] LU-1030 osc: move io data from lov_oinfo into osc_object Cleanup the code and move io related data structure into osc_object so that it will be easier to manage pages with red-black tree. Signed-off-by: Jinshan Xiong Change-Id: I7cd8d6278bb70b221351d9db4193c679f93aeb21 Reviewed-on: http://review.whamcloud.com/2009 Tested-by: Hudson Reviewed-by: Johann Lombardi Tested-by: Maloo Reviewed-by: Oleg Drokin --- lustre/include/obd.h | 29 +- lustre/osc/Makefile.in | 2 +- lustre/osc/autoMakefile.am | 5 +- lustre/osc/osc_cache.c | 1236 ++++++++++++++++++++++++++++++++++++++++ lustre/osc/osc_cl_internal.h | 42 ++ lustre/osc/osc_internal.h | 42 +- lustre/osc/osc_io.c | 36 +- lustre/osc/osc_object.c | 10 +- lustre/osc/osc_page.c | 248 ++------- lustre/osc/osc_request.c | 1267 ++++-------------------------------------- 10 files changed, 1460 insertions(+), 1457 deletions(-) create mode 100644 lustre/osc/osc_cache.c diff --git a/lustre/include/obd.h b/lustre/include/obd.h index da354e2..66b14f0 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -72,14 +72,6 @@ #define MAX_OBD_DEVICES 8192 -/* this is really local to the OSC */ -struct loi_oap_pages { - cfs_list_t lop_pending; - cfs_list_t lop_urgent; - cfs_list_t lop_pending_group; - int lop_num_pending; -}; - struct osc_async_rc { int ar_rc; int ar_force_sync; @@ -91,14 +83,6 @@ struct lov_oinfo { /* per-stripe data structure */ int loi_ost_idx; /* OST stripe index in lov_tgt_desc->tgts */ int loi_ost_gen; /* generation of this loi_ost_idx */ - /* used by the osc to keep track of what objects to build into rpcs */ - struct loi_oap_pages loi_read_lop; - struct loi_oap_pages loi_write_lop; - cfs_list_t loi_ready_item; - cfs_list_t loi_hp_ready_item; - cfs_list_t loi_write_item; - cfs_list_t loi_read_item; - unsigned long loi_kms_valid:1; __u64 loi_kms; /* known minimum size */ struct ost_lvb loi_lvb; @@ -115,16 +99,6 @@ static inline void loi_kms_set(struct lov_oinfo *oinfo, __u64 kms) static inline void loi_init(struct lov_oinfo *loi) { - CFS_INIT_LIST_HEAD(&loi->loi_read_lop.lop_pending); - CFS_INIT_LIST_HEAD(&loi->loi_read_lop.lop_urgent); - CFS_INIT_LIST_HEAD(&loi->loi_read_lop.lop_pending_group); - CFS_INIT_LIST_HEAD(&loi->loi_write_lop.lop_pending); - CFS_INIT_LIST_HEAD(&loi->loi_write_lop.lop_urgent); - CFS_INIT_LIST_HEAD(&loi->loi_write_lop.lop_pending_group); - CFS_INIT_LIST_HEAD(&loi->loi_ready_item); - CFS_INIT_LIST_HEAD(&loi->loi_hp_ready_item); - CFS_INIT_LIST_HEAD(&loi->loi_write_item); - CFS_INIT_LIST_HEAD(&loi->loi_read_item); } struct lov_stripe_md { @@ -501,6 +475,9 @@ struct client_obd { * Exact type of ->cl_loi_list_lock is defined in arch/obd.h together * with client_obd_list_{un,}lock() and * client_obd_list_lock_{init,done}() functions. + * + * NB by Jinshan: though field names are still _loi_, but actually + * osc_object{}s are in the list. 
*/ client_obd_lock_t cl_loi_list_lock; cfs_list_t cl_loi_ready_list; diff --git a/lustre/osc/Makefile.in b/lustre/osc/Makefile.in index 34d6c6e..13a8517 100644 --- a/lustre/osc/Makefile.in +++ b/lustre/osc/Makefile.in @@ -1,5 +1,5 @@ MODULES := osc -osc-objs := osc_request.o lproc_osc.o osc_create.o osc_dev.o osc_object.o osc_page.o osc_lock.o osc_io.o osc_quota.o +osc-objs := osc_request.o lproc_osc.o osc_create.o osc_dev.o osc_object.o osc_page.o osc_lock.o osc_io.o osc_quota.o osc_cache.o EXTRA_DIST = $(osc-objs:%.o=%.c) osc_internal.h osc_cl_internal.h diff --git a/lustre/osc/autoMakefile.am b/lustre/osc/autoMakefile.am index 1a94992..e93e90d 100644 --- a/lustre/osc/autoMakefile.am +++ b/lustre/osc/autoMakefile.am @@ -38,7 +38,7 @@ if LIBLUSTRE noinst_LIBRARIES = libosc.a -libosc_a_SOURCES = osc_request.c osc_create.c osc_internal.h osc_cl_internal.h osc_dev.c osc_object.c osc_page.c osc_lock.c osc_io.c osc_quota.c +libosc_a_SOURCES = osc_request.c osc_create.c osc_internal.h osc_cl_internal.h osc_dev.c osc_object.c osc_page.c osc_lock.c osc_io.c osc_quota.c osc_cache.c libosc_a_CPPFLAGS = $(LLCPPFLAGS) libosc_a_CFLAGS = $(LLCFLAGS) @@ -61,7 +61,8 @@ osc_SOURCES := \ osc_lock.c \ osc_io.c \ osc_request.c \ - osc_quota.c + osc_quota.c \ + osc_cache.c osc_CFLAGS := $(EXTRA_KCFLAGS) osc_LDFLAGS := $(EXTRA_KLDFLAGS) diff --git a/lustre/osc/osc_cache.c b/lustre/osc/osc_cache.c new file mode 100644 index 0000000..9f12ad6 --- /dev/null +++ b/lustre/osc/osc_cache.c @@ -0,0 +1,1236 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. + * Use is subject to license terms. + * + * Copyright (c) 2011 Whamcloud, Inc. + * + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. + * + * osc cache management. + * + * Author: Jinshan Xiong + */ + +#define DEBUG_SUBSYSTEM S_OSC + +#include "osc_cl_internal.h" +#include "osc_internal.h" + +static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli, + struct osc_async_page *oap); +static int osc_enter_cache_try(const struct lu_env *env, struct client_obd *cli, + struct osc_async_page *oap, int transient); +static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap, + int sent); + +/** \addtogroup osc + * @{ + */ + +#define OSC_IO_DEBUG(OSC, STR, args...) 
\ + CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \ + !cfs_list_empty(&(OSC)->oo_ready_item) || \ + !cfs_list_empty(&(OSC)->oo_hp_ready_item), \ + (OSC)->oo_write_pages.oop_num_pending, \ + !cfs_list_empty(&(OSC)->oo_write_pages.oop_urgent), \ + (OSC)->oo_read_pages.oop_num_pending, \ + !cfs_list_empty(&(OSC)->oo_read_pages.oop_urgent), \ + args) + +static inline struct osc_page *oap2osc_page(struct osc_async_page *oap) +{ + return (struct osc_page *)container_of(oap, struct osc_page, ops_oap); +} + +static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap, + int cmd) +{ + struct osc_page *opg = oap2osc_page(oap); + struct cl_page *page = cl_page_top(opg->ops_cl.cpl_page); + int result; + + LASSERT(cmd == OBD_BRW_WRITE); /* no cached reads */ + + ENTRY; + result = cl_page_make_ready(env, page, CRT_WRITE); + if (result == 0) + opg->ops_submit_time = cfs_time_current(); + RETURN(result); +} + +static int osc_refresh_count(const struct lu_env *env, + struct osc_async_page *oap, int cmd) +{ + struct osc_page *opg = oap2osc_page(oap); + struct cl_page *page; + struct cl_object *obj; + struct cl_attr *attr = &osc_env_info(env)->oti_attr; + + int result; + loff_t kms; + + /* readpage queues with _COUNT_STABLE, shouldn't get here. */ + LASSERT(!(cmd & OBD_BRW_READ)); + LASSERT(opg != NULL); + page = opg->ops_cl.cpl_page; + obj = opg->ops_cl.cpl_obj; + + cl_object_attr_lock(obj); + result = cl_object_attr_get(env, obj, attr); + cl_object_attr_unlock(obj); + if (result < 0) + return result; + kms = attr->cat_kms; + if (cl_offset(obj, page->cp_index) >= kms) + /* catch race with truncate */ + return 0; + else if (cl_offset(obj, page->cp_index + 1) > kms) + /* catch sub-page write at end of file */ + return kms % CFS_PAGE_SIZE; + else + return CFS_PAGE_SIZE; +} + +static int osc_completion(const struct lu_env *env, struct osc_async_page *oap, + int cmd, struct obdo *oa, int rc) +{ + struct osc_page *opg = oap2osc_page(oap); + struct cl_page *page = cl_page_top(opg->ops_cl.cpl_page); + struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj); + enum cl_req_type crt; + int srvlock; + + ENTRY; + + cmd &= ~OBD_BRW_NOQUOTA; + LASSERT(equi(page->cp_state == CPS_PAGEIN, cmd == OBD_BRW_READ)); + LASSERT(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE)); + LASSERT(opg->ops_transfer_pinned); + + /* + * page->cp_req can be NULL if io submission failed before + * cl_req was allocated. + */ + if (page->cp_req != NULL) + cl_req_page_done(env, page); + LASSERT(page->cp_req == NULL); + + /* As the transfer for this page is being done, clear the flags */ + cfs_spin_lock(&oap->oap_lock); + oap->oap_async_flags = 0; + cfs_spin_unlock(&oap->oap_lock); + + crt = cmd == OBD_BRW_READ ? CRT_READ : CRT_WRITE; + /* Clear opg->ops_transfer_pinned before VM lock is released. 
*/ + opg->ops_transfer_pinned = 0; + + cfs_spin_lock(&obj->oo_seatbelt); + LASSERT(opg->ops_submitter != NULL); + LASSERT(!cfs_list_empty(&opg->ops_inflight)); + cfs_list_del_init(&opg->ops_inflight); + cfs_spin_unlock(&obj->oo_seatbelt); + + opg->ops_submit_time = 0; + srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK; + + cl_page_completion(env, page, crt, rc); + + /* statistic */ + if (rc == 0 && srvlock) { + struct lu_device *ld = opg->ops_cl.cpl_obj->co_lu.lo_dev; + struct osc_stats *stats = &lu2osc_dev(ld)->od_stats; + int bytes = oap->oap_count; + + if (crt == CRT_READ) + stats->os_lockless_reads += bytes; + else + stats->os_lockless_writes += bytes; + } + + /* + * This has to be the last operation with the page, as locks are + * released in cl_page_completion() and nothing except for the + * reference counter protects page from concurrent reclaim. + */ + lu_ref_del(&page->cp_reference, "transfer", page); + /* + * As page->cp_obj is pinned by a reference from page->cp_req, it is + * safe to call cl_page_put() without risking object destruction in a + * non-blocking context. + */ + cl_page_put(env, page); + RETURN(0); +} + +/* caller must hold loi_list_lock */ +static void osc_consume_write_grant(struct client_obd *cli, + struct brw_page *pga) +{ + LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock); + LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT)); + cfs_atomic_inc(&obd_dirty_pages); + cli->cl_dirty += CFS_PAGE_SIZE; + cli->cl_avail_grant -= CFS_PAGE_SIZE; + pga->flag |= OBD_BRW_FROM_GRANT; + CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n", + CFS_PAGE_SIZE, pga, pga->pg); + LASSERT(cli->cl_avail_grant >= 0); + osc_update_next_shrink(cli); +} + +/* the companion to osc_consume_write_grant, called when a brw has completed. + * must be called with the loi lock held. */ +static void osc_release_write_grant(struct client_obd *cli, + struct brw_page *pga, int sent) +{ + int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096; + ENTRY; + + LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock); + if (!(pga->flag & OBD_BRW_FROM_GRANT)) { + EXIT; + return; + } + + pga->flag &= ~OBD_BRW_FROM_GRANT; + cfs_atomic_dec(&obd_dirty_pages); + cli->cl_dirty -= CFS_PAGE_SIZE; + if (pga->flag & OBD_BRW_NOCACHE) { + pga->flag &= ~OBD_BRW_NOCACHE; + cfs_atomic_dec(&obd_dirty_transit_pages); + cli->cl_dirty_transit -= CFS_PAGE_SIZE; + } + if (!sent) { + /* Reclaim grant from truncated pages. This is used to solve + * write-truncate and grant all gone(to lost_grant) problem. + * For a vfs write this problem can be easily solved by a sync + * write, however, this is not an option for page_mkwrite() + * because grant has to be allocated before a page becomes + * dirty. */ + if (cli->cl_avail_grant < PTLRPC_MAX_BRW_SIZE) + cli->cl_avail_grant += CFS_PAGE_SIZE; + else + cli->cl_lost_grant += CFS_PAGE_SIZE; + CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n", + cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty); + } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) { + /* For short writes we shouldn't count parts of pages that + * span a whole block on the OST side, or our accounting goes + * wrong. Should match the code in filter_grant_check. 
*/ + int offset = pga->off & ~CFS_PAGE_MASK; + int count = pga->count + (offset & (blocksize - 1)); + int end = (offset + pga->count) & (blocksize - 1); + if (end) + count += blocksize - end; + + cli->cl_lost_grant += CFS_PAGE_SIZE - count; + CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n", + CFS_PAGE_SIZE - count, cli->cl_lost_grant, + cli->cl_avail_grant, cli->cl_dirty); + } + + EXIT; +} + +/* The companion to osc_enter_cache(), called when @oap is no longer part of + * the dirty accounting. Writeback completes or truncate happens before + * writing starts. Must be called with the loi lock held. */ +static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap, + int sent) +{ + osc_release_write_grant(cli, &oap->oap_brw_page, sent); +} + +/** + * Non-blocking version of osc_enter_cache() that consumes grant only when it + * is available. + */ +static int osc_enter_cache_try(const struct lu_env *env, struct client_obd *cli, + struct osc_async_page *oap, int transient) +{ + int has_grant; + + has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE; + if (has_grant) { + osc_consume_write_grant(cli, &oap->oap_brw_page); + if (transient) { + cli->cl_dirty_transit += CFS_PAGE_SIZE; + cfs_atomic_inc(&obd_dirty_transit_pages); + oap->oap_brw_flags |= OBD_BRW_NOCACHE; + } + } + return has_grant; +} + +/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for + * grant or cache space. */ +static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli, + struct osc_async_page *oap) +{ + struct osc_object *osc = oap->oap_obj; + struct lov_oinfo *loi = osc->oo_oinfo; + struct osc_cache_waiter ocw; + struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL); + int rc = -EDQUOT; + ENTRY; + + CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu " + "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages), + cli->cl_dirty_max, obd_max_dirty_pages, + cli->cl_lost_grant, cli->cl_avail_grant); + + /* force the caller to try sync io. this can jump the list + * of queued writes and create a discontiguous rpc stream */ + if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) || + cli->cl_dirty_max < CFS_PAGE_SIZE || + cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync) + RETURN(-EDQUOT); + + /* Hopefully normal case - cache space and write credits available */ + if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max && + cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages && + osc_enter_cache_try(env, cli, oap, 0)) + RETURN(0); + + /* We can get here for two reasons: too many dirty pages in cache, or + * run out of grants. In both cases we should write dirty pages out. + * Adding a cache waiter will trigger urgent write-out no matter what + * RPC size will be. + * The exiting condition is no avail grants and no dirty pages caching, + * that really means there is no space on the OST. 
*/ + cfs_waitq_init(&ocw.ocw_waitq); + ocw.ocw_oap = oap; + while (cli->cl_dirty > 0) { + cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters); + ocw.ocw_rc = 0; + + osc_io_unplug(env, cli, osc, PDL_POLICY_ROUND); + client_obd_list_unlock(&cli->cl_loi_list_lock); + + CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n", + cli->cl_import->imp_obd->obd_name, &ocw, oap); + + rc = l_wait_event(ocw.ocw_waitq, cfs_list_empty(&ocw.ocw_entry), + &lwi); + + client_obd_list_lock(&cli->cl_loi_list_lock); + cfs_list_del_init(&ocw.ocw_entry); + if (rc < 0) + break; + + rc = ocw.ocw_rc; + if (rc != -EDQUOT) + break; + } + + RETURN(rc); +} + +/* caller must hold loi_list_lock */ +void osc_wake_cache_waiters(struct client_obd *cli) +{ + cfs_list_t *l, *tmp; + struct osc_cache_waiter *ocw; + + ENTRY; + cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) { + /* if we can't dirty more, we must wait until some is written */ + if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) || + (cfs_atomic_read(&obd_dirty_pages) + 1 > + obd_max_dirty_pages)) { + CDEBUG(D_CACHE, "no dirty room: dirty: %ld " + "osc max %ld, sys max %d\n", cli->cl_dirty, + cli->cl_dirty_max, obd_max_dirty_pages); + return; + } + + /* if still dirty cache but no grant wait for pending RPCs that + * may yet return us some grant before doing sync writes */ + if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) { + CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n", + cli->cl_w_in_flight); + return; + } + + ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry); + cfs_list_del_init(&ocw->ocw_entry); + if (cli->cl_avail_grant < CFS_PAGE_SIZE) { + /* no more RPCs in flight to return grant, do sync IO */ + ocw->ocw_rc = -EDQUOT; + CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap); + } else { + osc_consume_write_grant(cli, + &ocw->ocw_oap->oap_brw_page); + } + + CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld\n", + ocw, ocw->ocw_oap, cli->cl_avail_grant); + + cfs_waitq_signal(&ocw->ocw_waitq); + } + + EXIT; +} + +static int osc_max_rpc_in_flight(struct client_obd *cli, struct osc_object *osc) +{ + struct osc_async_page *oap; + int hprpc = 0; + + if (!cfs_list_empty(&osc->oo_write_pages.oop_urgent)) { + oap = cfs_list_entry(osc->oo_write_pages.oop_urgent.next, + struct osc_async_page, oap_urgent_item); + hprpc = !!(oap->oap_async_flags & ASYNC_HP); + } + + if (!hprpc && !cfs_list_empty(&osc->oo_read_pages.oop_urgent)) { + oap = cfs_list_entry(osc->oo_read_pages.oop_urgent.next, + struct osc_async_page, oap_urgent_item); + hprpc = !!(oap->oap_async_flags & ASYNC_HP); + } + + return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc; +} + +/* This maintains the lists of pending pages to read/write for a given object + * (lop). This is used by osc_check_rpcs->osc_next_obj() and osc_list_maint() + * to quickly find objects that are ready to send an RPC. */ +static int osc_makes_rpc(struct client_obd *cli, struct osc_object *osc, + int cmd) +{ + struct osc_oap_pages *lop; + ENTRY; + + if (cmd & OBD_BRW_WRITE) { + lop = &osc->oo_write_pages; + } else { + lop = &osc->oo_read_pages; + } + + if (lop->oop_num_pending == 0) + RETURN(0); + + /* if we have an invalid import we want to drain the queued pages + * by forcing them through rpcs that immediately fail and complete + * the pages. 
recovery relies on this to empty the queued pages + * before canceling the locks and evicting down the llite pages */ + if (cli->cl_import == NULL || cli->cl_import->imp_invalid) + RETURN(1); + + /* stream rpcs in queue order as long as as there is an urgent page + * queued. this is our cheap solution for good batching in the case + * where writepage marks some random page in the middle of the file + * as urgent because of, say, memory pressure */ + if (!cfs_list_empty(&lop->oop_urgent)) { + CDEBUG(D_CACHE, "urgent request forcing RPC\n"); + RETURN(1); + } + + if (cmd & OBD_BRW_WRITE) { + /* trigger a write rpc stream as long as there are dirtiers + * waiting for space. as they're waiting, they're not going to + * create more pages to coalesce with what's waiting.. */ + if (!cfs_list_empty(&cli->cl_cache_waiters)) { + CDEBUG(D_CACHE, "cache waiters forcing RPC\n"); + RETURN(1); + } + } + if (lop->oop_num_pending >= cli->cl_max_pages_per_rpc) + RETURN(1); + + RETURN(0); +} + +static void lop_update_pending(struct client_obd *cli, + struct osc_oap_pages *lop, int cmd, int delta) +{ + lop->oop_num_pending += delta; + if (cmd & OBD_BRW_WRITE) + cli->cl_pending_w_pages += delta; + else + cli->cl_pending_r_pages += delta; +} + +static int osc_makes_hprpc(struct osc_oap_pages *lop) +{ + struct osc_async_page *oap; + ENTRY; + + if (cfs_list_empty(&lop->oop_urgent)) + RETURN(0); + + oap = cfs_list_entry(lop->oop_urgent.next, + struct osc_async_page, oap_urgent_item); + + if (oap->oap_async_flags & ASYNC_HP) { + CDEBUG(D_CACHE, "hp request forcing RPC\n"); + RETURN(1); + } + + RETURN(0); +} + +static void on_list(cfs_list_t *item, cfs_list_t *list, int should_be_on) +{ + if (cfs_list_empty(item) && should_be_on) + cfs_list_add_tail(item, list); + else if (!cfs_list_empty(item) && !should_be_on) + cfs_list_del_init(item); +} + +/* maintain the osc's cli list membership invariants so that osc_send_oap_rpc + * can find pages to build into rpcs quickly */ +static void osc_list_maint(struct client_obd *cli, struct osc_object *osc) +{ + if (osc_makes_hprpc(&osc->oo_write_pages) || + osc_makes_hprpc(&osc->oo_read_pages)) { + /* HP rpc */ + on_list(&osc->oo_ready_item, &cli->cl_loi_ready_list, 0); + on_list(&osc->oo_hp_ready_item, &cli->cl_loi_hp_ready_list, 1); + } else { + on_list(&osc->oo_hp_ready_item, &cli->cl_loi_hp_ready_list, 0); + on_list(&osc->oo_ready_item, &cli->cl_loi_ready_list, + osc_makes_rpc(cli, osc, OBD_BRW_WRITE) || + osc_makes_rpc(cli, osc, OBD_BRW_READ)); + } + + on_list(&osc->oo_write_item, &cli->cl_loi_write_list, + osc->oo_write_pages.oop_num_pending); + + on_list(&osc->oo_read_item, &cli->cl_loi_read_list, + osc->oo_read_pages.oop_num_pending); +} + +/* this is trying to propogate async writeback errors back up to the + * application. As an async write fails we record the error code for later if + * the app does an fsync. As long as errors persist we force future rpcs to be + * sync so that the app can get a sync error and break the cycle of queueing + * pages for which writeback will fail. 
*/ +static void osc_process_ar(struct osc_async_rc *ar, __u64 xid, + int rc) +{ + if (rc) { + if (!ar->ar_rc) + ar->ar_rc = rc; + + ar->ar_force_sync = 1; + ar->ar_min_xid = ptlrpc_sample_next_xid(); + return; + + } + + if (ar->ar_force_sync && (xid >= ar->ar_min_xid)) + ar->ar_force_sync = 0; +} + +static void osc_oap_to_pending(struct osc_async_page *oap) +{ + struct osc_object *osc = oap->oap_obj; + struct osc_oap_pages *lop; + + if (oap->oap_cmd & OBD_BRW_WRITE) + lop = &osc->oo_write_pages; + else + lop = &osc->oo_read_pages; + + if (oap->oap_async_flags & ASYNC_HP) + cfs_list_add(&oap->oap_urgent_item, &lop->oop_urgent); + else if (oap->oap_async_flags & ASYNC_URGENT) + cfs_list_add_tail(&oap->oap_urgent_item, &lop->oop_urgent); + cfs_list_add_tail(&oap->oap_pending_item, &lop->oop_pending); + lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1); +} + +/* this must be called holding the loi list lock to give coverage to exit_cache, + * async_flag maintenance, and oap_request */ +void osc_ap_completion(const struct lu_env *env, struct client_obd *cli, + struct obdo *oa, struct osc_async_page *oap, + int sent, int rc) +{ + struct osc_object *osc = oap->oap_obj; + struct lov_oinfo *loi = osc->oo_oinfo; + __u64 xid = 0; + + ENTRY; + if (oap->oap_request != NULL) { + xid = ptlrpc_req_xid(oap->oap_request); + ptlrpc_req_finished(oap->oap_request); + oap->oap_request = NULL; + } + + cfs_spin_lock(&oap->oap_lock); + oap->oap_async_flags = 0; + cfs_spin_unlock(&oap->oap_lock); + oap->oap_interrupted = 0; + + if (oap->oap_cmd & OBD_BRW_WRITE) { + osc_process_ar(&cli->cl_ar, xid, rc); + osc_process_ar(&loi->loi_ar, xid, rc); + } + + if (rc == 0 && oa != NULL) { + if (oa->o_valid & OBD_MD_FLBLOCKS) + loi->loi_lvb.lvb_blocks = oa->o_blocks; + if (oa->o_valid & OBD_MD_FLMTIME) + loi->loi_lvb.lvb_mtime = oa->o_mtime; + if (oa->o_valid & OBD_MD_FLATIME) + loi->loi_lvb.lvb_atime = oa->o_atime; + if (oa->o_valid & OBD_MD_FLCTIME) + loi->loi_lvb.lvb_ctime = oa->o_ctime; + } + + rc = osc_completion(env, oap, oap->oap_cmd, oa, rc); + + /* cl_page_completion() drops PG_locked. so, a new I/O on the page could + * start, but OSC calls it under lock and thus we can add oap back to + * pending safely */ + if (rc) + /* upper layer wants to leave the page on pending queue */ + osc_oap_to_pending(oap); + else + osc_exit_cache(cli, oap, sent); + EXIT; +} + +/** + * prepare pages for ASYNC io and put pages in send queue. + * + * \param cmd OBD_BRW_* macroses + * \param lop pending pages + * + * \return zero if no page added to send queue. + * \return 1 if pages successfully added to send queue. + * \return negative on errors. + */ +static int +osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, + struct osc_object *osc, int cmd, + struct osc_oap_pages *lop, pdl_policy_t pol) +{ + obd_count page_count = 0; + struct osc_async_page *oap = NULL, *tmp; + CFS_LIST_HEAD(rpc_list); + int srvlock = 0, mem_tight = 0; + obd_off starting_offset = OBD_OBJECT_EOF; + unsigned int ending_offset; + int starting_page_off = 0; + int rc; + ENTRY; + + /* ASYNC_HP pages first. At present, when the lock the pages is + * to be canceled, the pages covered by the lock will be sent out + * with ASYNC_HP. We have to send out them as soon as possible. */ + cfs_list_for_each_entry_safe(oap, tmp, &lop->oop_urgent, oap_urgent_item) { + if (oap->oap_async_flags & ASYNC_HP) + cfs_list_move(&oap->oap_pending_item, &rpc_list); + else if (!(oap->oap_brw_flags & OBD_BRW_SYNC)) + /* only do this for writeback pages. 
*/ + cfs_list_move_tail(&oap->oap_pending_item, &rpc_list); + if (++page_count >= cli->cl_max_pages_per_rpc) + break; + } + cfs_list_splice_init(&rpc_list, &lop->oop_pending); + page_count = 0; + + /* first we find the pages we're allowed to work with */ + cfs_list_for_each_entry_safe(oap, tmp, &lop->oop_pending, + oap_pending_item) { + LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, " + "magic 0x%x\n", oap, oap->oap_magic); + + if (page_count != 0 && + srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) { + CDEBUG(D_PAGE, "SRVLOCK flag mismatch," + " oap %p, page %p, srvlock %u\n", + oap, oap->oap_brw_page.pg, (unsigned)!srvlock); + break; + } + + /* If there is a gap at the start of this page, it can't merge + * with any previous page, so we'll hand the network a + * "fragmented" page array that it can't transfer in 1 RDMA */ + if (oap->oap_obj_off < starting_offset) { + if (starting_page_off != 0) + break; + + starting_page_off = oap->oap_page_off; + starting_offset = oap->oap_obj_off + starting_page_off; + } else if (oap->oap_page_off != 0) + break; + + /* in llite being 'ready' equates to the page being locked + * until completion unlocks it. commit_write submits a page + * as not ready because its unlock will happen unconditionally + * as the call returns. if we race with commit_write giving + * us that page we don't want to create a hole in the page + * stream, so we stop and leave the rpc to be fired by + * another dirtier or kupdated interval (the not ready page + * will still be on the dirty list). we could call in + * at the end of ll_file_write to process the queue again. */ + if (!(oap->oap_async_flags & ASYNC_READY)) { + int rc = osc_make_ready(env, oap, cmd); + if (rc < 0) + CDEBUG(D_INODE, "oap %p page %p returned %d " + "instead of ready\n", oap, + oap->oap_page, rc); + switch (rc) { + case -EAGAIN: + /* llite is telling us that the page is still + * in commit_write and that we should try + * and put it in an rpc again later. we + * break out of the loop so we don't create + * a hole in the sequence of pages in the rpc + * stream.*/ + oap = NULL; + break; + case -EINTR: + /* the io isn't needed.. tell the checks + * below to complete the rpc with EINTR */ + cfs_spin_lock(&oap->oap_lock); + oap->oap_async_flags |= ASYNC_COUNT_STABLE; + cfs_spin_unlock(&oap->oap_lock); + oap->oap_count = -EINTR; + break; + case 0: + cfs_spin_lock(&oap->oap_lock); + oap->oap_async_flags |= ASYNC_READY; + cfs_spin_unlock(&oap->oap_lock); + break; + default: + LASSERTF(0, "oap %p page %p returned %d " + "from make_ready\n", oap, + oap->oap_page, rc); + break; + } + } + if (oap == NULL) + break; + + /* take the page out of our book-keeping */ + cfs_list_del_init(&oap->oap_pending_item); + lop_update_pending(cli, lop, cmd, -1); + cfs_list_del_init(&oap->oap_urgent_item); + + /* ask the caller for the size of the io as the rpc leaves. */ + if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) { + oap->oap_count = osc_refresh_count(env, oap, cmd); + LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE); + } + if (oap->oap_count <= 0) { + CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap, + oap->oap_count); + osc_ap_completion(env, cli, NULL, + oap, 0, oap->oap_count); + continue; + } + + /* now put the page back in our accounting */ + cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list); + if (page_count++ == 0) + srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK); + + if (oap->oap_brw_flags & OBD_BRW_MEMALLOC) + mem_tight = 1; + + /* End on a PTLRPC_MAX_BRW_SIZE boundary. 
We want full-sized + * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads + * have the same alignment as the initial writes that allocated + * extents on the server. */ + ending_offset = oap->oap_obj_off + oap->oap_page_off + + oap->oap_count; + if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1))) + break; + + if (page_count >= cli->cl_max_pages_per_rpc) + break; + + /* If there is a gap at the end of this page, it can't merge + * with any subsequent pages, so we'll hand the network a + * "fragmented" page array that it can't transfer in 1 RDMA */ + if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE) + break; + } + + osc_list_maint(cli, osc); + + client_obd_list_unlock(&cli->cl_loi_list_lock); + + if (page_count == 0) { + client_obd_list_lock(&cli->cl_loi_list_lock); + RETURN(0); + } + + if (mem_tight) + cmd |= OBD_BRW_MEMALLOC; + rc = osc_build_rpc(env, cli, &rpc_list, page_count, cmd, pol); + if (rc != 0) { + LASSERT(cfs_list_empty(&rpc_list)); + osc_list_maint(cli, osc); + RETURN(rc); + } + + starting_offset &= PTLRPC_MAX_BRW_SIZE - 1; + if (cmd == OBD_BRW_READ) { + cli->cl_r_in_flight++; + lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count); + lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight); + lprocfs_oh_tally_log2(&cli->cl_read_offset_hist, + (starting_offset >> CFS_PAGE_SHIFT) + 1); + } else { + cli->cl_w_in_flight++; + lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count); + lprocfs_oh_tally(&cli->cl_write_rpc_hist, + cli->cl_w_in_flight); + lprocfs_oh_tally_log2(&cli->cl_write_offset_hist, + (starting_offset >> CFS_PAGE_SHIFT) + 1); + } + + RETURN(1); +} + +#define list_to_obj(list, item) \ + cfs_list_entry((list)->next, struct osc_object, oo_##item) + +/* This is called by osc_check_rpcs() to find which objects have pages that + * we could be sending. These lists are maintained by osc_makes_rpc(). */ +static struct osc_object *osc_next_obj(struct client_obd *cli) +{ + ENTRY; + + /* First return objects that have blocked locks so that they + * will be flushed quickly and other clients can get the lock, + * then objects which have pages ready to be stuffed into RPCs */ + if (!cfs_list_empty(&cli->cl_loi_hp_ready_list)) + RETURN(list_to_obj(&cli->cl_loi_hp_ready_list, hp_ready_item)); + if (!cfs_list_empty(&cli->cl_loi_ready_list)) + RETURN(list_to_obj(&cli->cl_loi_ready_list, ready_item)); + + /* then if we have cache waiters, return all objects with queued + * writes. 
This is especially important when many small files + * have filled up the cache and not been fired into rpcs because + * they don't pass the nr_pending/object threshhold */ + if (!cfs_list_empty(&cli->cl_cache_waiters) && + !cfs_list_empty(&cli->cl_loi_write_list)) + RETURN(list_to_obj(&cli->cl_loi_write_list, write_item)); + + /* then return all queued objects when we have an invalid import + * so that they get flushed */ + if (cli->cl_import == NULL || cli->cl_import->imp_invalid) { + if (!cfs_list_empty(&cli->cl_loi_write_list)) + RETURN(list_to_obj(&cli->cl_loi_write_list, + write_item)); + if (!cfs_list_empty(&cli->cl_loi_read_list)) + RETURN(list_to_obj(&cli->cl_loi_read_list, + read_item)); + } + RETURN(NULL); +} + +/* called with the loi list lock held */ +static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli, + pdl_policy_t pol) +{ + struct osc_object *osc; + int rc = 0, race_counter = 0; + ENTRY; + + while ((osc = osc_next_obj(cli)) != NULL) { + OSC_IO_DEBUG(osc, "%lu in flight\n", rpcs_in_flight(cli)); + + if (osc_max_rpc_in_flight(cli, osc)) + break; + + /* attempt some read/write balancing by alternating between + * reads and writes in an object. The makes_rpc checks here + * would be redundant if we were getting read/write work items + * instead of objects. we don't want send_oap_rpc to drain a + * partial read pending queue when we're given this object to + * do io on writes while there are cache waiters */ + if (osc_makes_rpc(cli, osc, OBD_BRW_WRITE)) { + rc = osc_send_oap_rpc(env, cli, osc, OBD_BRW_WRITE, + &osc->oo_write_pages, pol); + if (rc < 0) { + CERROR("Write request failed with %d\n", rc); + + /* osc_send_oap_rpc failed, mostly because of + * memory pressure. + * + * It can't break here, because if: + * - a page was submitted by osc_io_submit, so + * page locked; + * - no request in flight + * - no subsequent request + * The system will be in live-lock state, + * because there is no chance to call + * osc_io_unplug() and osc_check_rpcs() any + * more. pdflush can't help in this case, + * because it might be blocked at grabbing + * the page lock as we mentioned. + * + * Anyway, continue to drain pages. */ + /* break; */ + } + + if (rc > 0) + race_counter = 0; + else if (rc == 0) + race_counter++; + } + if (osc_makes_rpc(cli, osc, OBD_BRW_READ)) { + rc = osc_send_oap_rpc(env, cli, osc, OBD_BRW_READ, + &osc->oo_read_pages, pol); + if (rc < 0) + CERROR("Read request failed with %d\n", rc); + + if (rc > 0) + race_counter = 0; + else if (rc == 0) + race_counter++; + } + + /* attempt some inter-object balancing by issuing rpcs + * for each object in turn */ + if (!cfs_list_empty(&osc->oo_hp_ready_item)) + cfs_list_del_init(&osc->oo_hp_ready_item); + if (!cfs_list_empty(&osc->oo_ready_item)) + cfs_list_del_init(&osc->oo_ready_item); + if (!cfs_list_empty(&osc->oo_write_item)) + cfs_list_del_init(&osc->oo_write_item); + if (!cfs_list_empty(&osc->oo_read_item)) + cfs_list_del_init(&osc->oo_read_item); + + osc_list_maint(cli, osc); + + /* send_oap_rpc fails with 0 when make_ready tells it to + * back off. llite's make_ready does this when it tries + * to lock a page queued for write that is already locked. + * we want to try sending rpcs from many objects, but we + * don't want to spin failing with 0. 
*/ + if (race_counter == 10) + break; + } +} + +void osc_io_unplug(const struct lu_env *env, struct client_obd *cli, + struct osc_object *osc, pdl_policy_t pol) +{ + if (osc) + osc_list_maint(cli, osc); + osc_check_rpcs(env, cli, pol); +} + +int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops, + cfs_page_t *page, loff_t offset) +{ + struct obd_export *exp = osc_export(osc); + struct osc_async_page *oap = &ops->ops_oap; + ENTRY; + + if (!page) + return cfs_size_round(sizeof(*oap)); + + oap->oap_magic = OAP_MAGIC; + oap->oap_cli = &exp->exp_obd->u.cli; + oap->oap_obj = osc; + + oap->oap_page = page; + oap->oap_obj_off = offset; + LASSERT(!(offset & ~CFS_PAGE_MASK)); + + if (!client_is_remote(exp) && cfs_capable(CFS_CAP_SYS_RESOURCE)) + oap->oap_brw_flags = OBD_BRW_NOQUOTA; + + CFS_INIT_LIST_HEAD(&oap->oap_pending_item); + CFS_INIT_LIST_HEAD(&oap->oap_urgent_item); + CFS_INIT_LIST_HEAD(&oap->oap_rpc_item); + + cfs_spin_lock_init(&oap->oap_lock); + CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", + oap, page, oap->oap_obj_off); + RETURN(0); +} + +int osc_queue_async_io(const struct lu_env *env, struct osc_page *ops) +{ + struct osc_async_page *oap = &ops->ops_oap; + struct client_obd *cli = oap->oap_cli; + struct osc_object *osc = oap->oap_obj; + struct obd_export *exp = osc_export(osc); + int brw_flags = OBD_BRW_ASYNC; + int cmd = OBD_BRW_WRITE; + int rc = 0; + ENTRY; + + if (oap->oap_magic != OAP_MAGIC) + RETURN(-EINVAL); + + if (cli->cl_import == NULL || cli->cl_import->imp_invalid) + RETURN(-EIO); + + if (!cfs_list_empty(&oap->oap_pending_item) || + !cfs_list_empty(&oap->oap_urgent_item) || + !cfs_list_empty(&oap->oap_rpc_item)) + RETURN(-EBUSY); + + /* Set the OBD_BRW_SRVLOCK before the page is queued. */ + brw_flags |= ops->ops_srvlock ? 
OBD_BRW_SRVLOCK : 0; + if (!client_is_remote(exp) && cfs_capable(CFS_CAP_SYS_RESOURCE)) { + brw_flags |= OBD_BRW_NOQUOTA; + cmd |= OBD_BRW_NOQUOTA; + } + + /* check if the file's owner/group is over quota */ + if (!(cmd & OBD_BRW_NOQUOTA)) { + struct cl_object *obj; + struct cl_attr *attr; + unsigned int qid[MAXQUOTAS]; + + obj = cl_object_top(&osc->oo_cl); + attr = &osc_env_info(env)->oti_attr; + + cl_object_attr_lock(obj); + rc = cl_object_attr_get(env, obj, attr); + cl_object_attr_unlock(obj); + + qid[USRQUOTA] = attr->cat_uid; + qid[GRPQUOTA] = attr->cat_gid; + if (rc == 0 && + osc_quota_chkdq(cli, qid) == NO_QUOTA) + rc = -EDQUOT; + if (rc) + RETURN(rc); + } + + client_obd_list_lock(&cli->cl_loi_list_lock); + + oap->oap_cmd = cmd; + oap->oap_page_off = ops->ops_from; + oap->oap_count = ops->ops_to - ops->ops_from; + oap->oap_async_flags = 0; + oap->oap_brw_flags = brw_flags; + /* Give a hint to OST that requests are coming from kswapd - bug19529 */ + if (cfs_memory_pressure_get()) + oap->oap_brw_flags |= OBD_BRW_MEMALLOC; + + rc = osc_enter_cache(env, cli, oap); + if (rc) { + client_obd_list_unlock(&cli->cl_loi_list_lock); + RETURN(rc); + } + + OSC_IO_DEBUG(osc, "oap %p page %p added for cmd %d\n", + oap, oap->oap_page, cmd); + + osc_oap_to_pending(oap); + osc_list_maint(cli, osc); + if (!osc_max_rpc_in_flight(cli, osc) && + osc_makes_rpc(cli, osc, OBD_BRW_WRITE)) { + LASSERT(cli->cl_writeback_work != NULL); + rc = ptlrpcd_queue_work(cli->cl_writeback_work); + + CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n", + cli, rc); + } + client_obd_list_unlock(&cli->cl_loi_list_lock); + + RETURN(0); +} + +int osc_teardown_async_page(struct osc_object *obj, struct osc_page *ops) +{ + struct osc_async_page *oap = &ops->ops_oap; + struct client_obd *cli = oap->oap_cli; + struct osc_oap_pages *lop; + int rc = 0; + ENTRY; + + if (oap->oap_magic != OAP_MAGIC) + RETURN(-EINVAL); + + if (oap->oap_cmd & OBD_BRW_WRITE) { + lop = &obj->oo_write_pages; + } else { + lop = &obj->oo_read_pages; + } + + client_obd_list_lock(&cli->cl_loi_list_lock); + + if (!cfs_list_empty(&oap->oap_rpc_item)) + GOTO(out, rc = -EBUSY); + + osc_exit_cache(cli, oap, 0); + osc_wake_cache_waiters(cli); + + if (!cfs_list_empty(&oap->oap_urgent_item)) { + cfs_list_del_init(&oap->oap_urgent_item); + cfs_spin_lock(&oap->oap_lock); + oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP); + cfs_spin_unlock(&oap->oap_lock); + } + if (!cfs_list_empty(&oap->oap_pending_item)) { + cfs_list_del_init(&oap->oap_pending_item); + lop_update_pending(cli, lop, oap->oap_cmd, -1); + } + osc_list_maint(cli, obj); + OSC_IO_DEBUG(obj, "oap %p page %p torn down\n", oap, oap->oap_page); +out: + client_obd_list_unlock(&cli->cl_loi_list_lock); + RETURN(rc); +} + +/* aka (~was & now & flag), but this is more clear :) */ +#define SETTING(was, now, flag) (!(was & flag) && (now & flag)) + +int osc_set_async_flags(struct osc_object *obj, struct osc_page *opg, + obd_flag async_flags) +{ + struct osc_async_page *oap = &opg->ops_oap; + struct osc_oap_pages *lop; + int flags = 0; + ENTRY; + + LASSERT(!cfs_list_empty(&oap->oap_pending_item)); + + if (oap->oap_cmd & OBD_BRW_WRITE) { + lop = &obj->oo_write_pages; + } else { + lop = &obj->oo_read_pages; + } + + if ((oap->oap_async_flags & async_flags) == async_flags) + RETURN(0); + + if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY)) + flags |= ASYNC_READY; + + if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) && + cfs_list_empty(&oap->oap_rpc_item)) { + if (oap->oap_async_flags & 
ASYNC_HP) + cfs_list_add(&oap->oap_urgent_item, &lop->oop_urgent); + else + cfs_list_add_tail(&oap->oap_urgent_item, + &lop->oop_urgent); + flags |= ASYNC_URGENT; + osc_list_maint(oap->oap_cli, obj); + } + cfs_spin_lock(&oap->oap_lock); + oap->oap_async_flags |= flags; + cfs_spin_unlock(&oap->oap_lock); + + OSC_IO_DEBUG(obj, "oap %p page %p has flags %x\n", oap, + oap->oap_page, oap->oap_async_flags); + RETURN(0); +} + +/** + * this is called when a sync waiter receives an interruption. Its job is to + * get the caller woken as soon as possible. If its page hasn't been put in an + * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as + * desiring interruption which will forcefully complete the rpc once the rpc + * has timed out. + */ +int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops) +{ + struct osc_async_page *oap = &ops->ops_oap; + int rc = -EBUSY; + ENTRY; + + LASSERT(!oap->oap_interrupted); + oap->oap_interrupted = 1; + + /* ok, it's been put in an rpc. only one oap gets a request reference */ + if (oap->oap_request != NULL) { + ptlrpc_mark_interrupted(oap->oap_request); + ptlrpcd_wake(oap->oap_request); + ptlrpc_req_finished(oap->oap_request); + oap->oap_request = NULL; + } + + /* + * page completion may be called only if ->cpo_prep() method was + * executed by osc_io_submit(), that also adds page the to pending list + */ + if (!cfs_list_empty(&oap->oap_pending_item)) { + struct osc_oap_pages *lop; + struct osc_object *osc = oap->oap_obj; + + cfs_list_del_init(&oap->oap_pending_item); + cfs_list_del_init(&oap->oap_urgent_item); + + lop = (oap->oap_cmd & OBD_BRW_WRITE) ? + &osc->oo_write_pages : &osc->oo_read_pages; + lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1); + osc_list_maint(oap->oap_cli, osc); + rc = osc_completion(env, oap, oap->oap_cmd, NULL, -EINTR); + } + + RETURN(rc); +} + +int osc_queue_sync_page(const struct lu_env *env, struct osc_page *opg, + int cmd, int brw_flags) +{ + struct osc_async_page *oap = &opg->ops_oap; + struct client_obd *cli = oap->oap_cli; + int flags = 0; + ENTRY; + + oap->oap_cmd = cmd; + oap->oap_page_off = opg->ops_from; + oap->oap_count = opg->ops_to - opg->ops_from; + oap->oap_brw_flags = OBD_BRW_SYNC | brw_flags; + + /* Give a hint to OST that requests are coming from kswapd - bug19529 */ + if (cfs_memory_pressure_get()) + oap->oap_brw_flags |= OBD_BRW_MEMALLOC; + + if (!client_is_remote(osc_export(cl2osc(opg->ops_cl.cpl_obj))) && + cfs_capable(CFS_CAP_SYS_RESOURCE)) { + oap->oap_brw_flags |= OBD_BRW_NOQUOTA; + oap->oap_cmd |= OBD_BRW_NOQUOTA; + } + + if (oap->oap_cmd & OBD_BRW_READ) + flags = ASYNC_COUNT_STABLE; + else if (!(oap->oap_brw_page.flag & OBD_BRW_FROM_GRANT)) + osc_enter_cache_try(env, cli, oap, 1); + + cfs_spin_lock(&oap->oap_lock); + oap->oap_async_flags |= OSC_FLAGS | flags; + cfs_spin_unlock(&oap->oap_lock); + + osc_oap_to_pending(oap); + RETURN(0); +} + +/** @} osc */ diff --git a/lustre/osc/osc_cl_internal.h b/lustre/osc/osc_cl_internal.h index 39f866c..57350ea 100644 --- a/lustre/osc/osc_cl_internal.h +++ b/lustre/osc/osc_cl_internal.h @@ -100,6 +100,22 @@ struct osc_thread_info { struct cl_page_list oti_plist; }; +/** + * Manage osc_async_page + */ +struct osc_oap_pages { + cfs_list_t oop_pending; + cfs_list_t oop_urgent; + int oop_num_pending; +}; + +static inline void osc_oap_pages_init(struct osc_oap_pages *list) +{ + CFS_INIT_LIST_HEAD(&list->oop_pending); + CFS_INIT_LIST_HEAD(&list->oop_urgent); + list->oop_num_pending = 0; +} + struct osc_object { struct cl_object 
oo_cl; struct lov_oinfo *oo_oinfo; @@ -125,6 +141,16 @@ struct osc_object { * locked during take-off and landing. */ cfs_spinlock_t oo_seatbelt; + + /** + * used by the osc to keep track of what objects to build into rpcs + */ + struct osc_oap_pages oo_read_pages; + struct osc_oap_pages oo_write_pages; + cfs_list_t oo_ready_item; + cfs_list_t oo_hp_ready_item; + cfs_list_t oo_write_item; + cfs_list_t oo_read_item; }; /* @@ -361,9 +387,25 @@ void osc_index2policy (ldlm_policy_data_t *policy, const struct cl_object *obj, pgoff_t start, pgoff_t end); int osc_lvb_print (const struct lu_env *env, void *cookie, lu_printer_t p, const struct ost_lvb *lvb); + void osc_io_submit_page(const struct lu_env *env, struct osc_io *oio, struct osc_page *opg, enum cl_req_type crt); +void osc_ap_completion(const struct lu_env *env, struct client_obd *cli, + struct obdo *oa, struct osc_async_page *oap, + int sent, int rc); +int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops); +int osc_set_async_flags(struct osc_object *obj, struct osc_page *opg, + obd_flag async_flags); +int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops, + cfs_page_t *page, loff_t offset); +int osc_queue_async_io(const struct lu_env *env, struct osc_page *ops); +int osc_teardown_async_page(struct osc_object *obj, + struct osc_page *ops); +int osc_queue_sync_page(const struct lu_env *env, struct osc_page *ops, + int cmd, int brw_flags); +void osc_io_unplug(const struct lu_env *env, struct client_obd *cli, + struct osc_object *osc, pdl_policy_t pol); void osc_object_set_contended (struct osc_object *obj); void osc_object_clear_contended(struct osc_object *obj); diff --git a/lustre/osc/osc_internal.h b/lustre/osc/osc_internal.h index 6890547..7293627 100644 --- a/lustre/osc/osc_internal.h +++ b/lustre/osc/osc_internal.h @@ -51,13 +51,6 @@ enum async_flags { ASYNC_HP = 0x10, }; -struct obd_async_page_ops { - int (*ap_make_ready)(const struct lu_env *env, void *data, int cmd); - int (*ap_refresh_count)(const struct lu_env *env, void *data, int cmd); - int (*ap_completion)(const struct lu_env *env, - void *data, int cmd, struct obdo *oa, int rc); -}; - struct osc_async_page { int oap_magic; unsigned short oap_cmd; @@ -75,11 +68,8 @@ struct osc_async_page { struct ptlrpc_request *oap_request; struct client_obd *oap_cli; - struct lov_oinfo *oap_loi; + struct osc_object *oap_obj; - const struct obd_async_page_ops *oap_caller_ops; - void *oap_caller_data; - cfs_list_t oap_page_list; struct ldlm_lock *oap_ldlm_lock; cfs_spinlock_t oap_lock; }; @@ -116,6 +106,7 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, void oscc_init(struct obd_device *obd); void osc_wake_cache_waiters(struct client_obd *cli); int osc_shrink_grant_to_target(struct client_obd *cli, long target); +void osc_update_next_shrink(struct client_obd *cli); /* * cl integration. 
@@ -146,28 +137,10 @@ int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo, obd_enqueue_update_f upcall, void *cookie, struct ptlrpc_request_set *rqset); -int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, - struct lov_oinfo *loi, cfs_page_t *page, - obd_off offset, const struct obd_async_page_ops *ops, - void *data, void **res, int nocache, - struct lustre_handle *lockh); -void osc_oap_to_pending(struct osc_async_page *oap); -int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap); -void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi); -void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli); -int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp, - struct lov_stripe_md *lsm, struct lov_oinfo *loi, - struct osc_async_page *oap, int cmd, int off, - int count, obd_flag brw_flags, enum async_flags async_flags); -int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, - struct lov_oinfo *loi, struct osc_async_page *oap); int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *cfg); -int osc_set_async_flags_base(struct client_obd *cli, - struct lov_oinfo *loi, struct osc_async_page *oap, - obd_flag async_flags); -int osc_enter_cache_try(const struct lu_env *env, - struct client_obd *cli, struct lov_oinfo *loi, - struct osc_async_page *oap, int transient); +int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, + cfs_list_t *rpc_list, int page_count, int cmd, + pdl_policy_t p); struct cl_page *osc_oap2cl_page(struct osc_async_page *oap); extern cfs_spinlock_t osc_ast_guard; @@ -194,6 +167,11 @@ static inline int osc_recoverable_error(int rc) rc == -EAGAIN || rc == -EINPROGRESS); } +static inline unsigned long rpcs_in_flight(struct client_obd *cli) +{ + return cli->cl_r_in_flight + cli->cl_w_in_flight; +} + #ifndef min_t #define min_t(type,x,y) \ ({ type __x = (x); type __y = (y); __x < __y ? __x: __y; }) diff --git a/lustre/osc/osc_io.c b/lustre/osc/osc_io.c index e7aeed8..32044fa 100644 --- a/lustre/osc/osc_io.c +++ b/lustre/osc/osc_io.c @@ -36,6 +36,7 @@ * Implementation of cl_io for OSC layer. * * Author: Nikita Danilov + * Author: Jinshan Xiong */ #define DEBUG_SUBSYSTEM S_OSC @@ -92,19 +93,12 @@ struct cl_page *osc_oap2cl_page(struct osc_async_page *oap) return container_of(oap, struct osc_page, ops_oap)->ops_cl.cpl_page; } -static void osc_io_unplug(const struct lu_env *env, struct osc_object *osc, - struct client_obd *cli) -{ - loi_list_maint(cli, osc->oo_oinfo); - osc_check_rpcs(env, cli); -} - /** * An implementation of cl_io_operations::cio_io_submit() method for osc * layer. Iterates over pages in the in-queue, prepares each for io by calling * cl_page_prep() and then either submits them through osc_io_submit_page() * or, if page is already submitted, changes osc flags through - * osc_set_async_flags_base(). + * osc_set_async_flags(). */ static int osc_io_submit(const struct lu_env *env, const struct cl_io_slice *ios, @@ -172,19 +166,17 @@ static int osc_io_submit(const struct lu_env *env, osc_io_submit_page(env, cl2osc_io(env, ios), opg, crt); } else { - result = osc_set_async_flags_base(cli, - osc->oo_oinfo, - oap, - OSC_FLAGS); - /* - * bug 18881: we can't just break out here when - * error occurs after cl_page_prep has been - * called against the page. The correct - * way is to call page's completion routine, - * as in osc_oap_interrupted. For simplicity, - * we just force osc_set_async_flags_base() to - * not return error. 
- */ + result = osc_set_async_flags(osc, opg, + OSC_FLAGS); + /* + * bug 18881: we can't just break out here when + * error occurs after cl_page_prep has been + * called against the page. The correct + * way is to call page's completion routine, + * as in osc_oap_interrupted. For simplicity, + * we just force osc_set_async_flags() to + * not return error. + */ LASSERT(result == 0); } opg->ops_submit_time = cfs_time_current(); @@ -217,7 +209,7 @@ static int osc_io_submit(const struct lu_env *env, LASSERT(ergo(result == 0, osc == osc0)); if (queued > 0) - osc_io_unplug(env, osc, cli); + osc_io_unplug(env, cli, osc, PDL_POLICY_ROUND); if (osc0) client_obd_list_unlock(&cli->cl_loi_list_lock); CDEBUG(D_INFO, "%d/%d %d\n", qin->pl_nr, qout->pl_nr, result); diff --git a/lustre/osc/osc_object.c b/lustre/osc/osc_object.c index c4f78aa..47ebe5f 100644 --- a/lustre/osc/osc_object.c +++ b/lustre/osc/osc_object.c @@ -83,7 +83,15 @@ static int osc_object_init(const struct lu_env *env, struct lu_object *obj, cfs_spin_lock_init(&osc->oo_seatbelt); for (i = 0; i < CRT_NR; ++i) CFS_INIT_LIST_HEAD(&osc->oo_inflight[i]); - return 0; + + osc_oap_pages_init(&osc->oo_read_pages); + osc_oap_pages_init(&osc->oo_write_pages); + CFS_INIT_LIST_HEAD(&osc->oo_ready_item); + CFS_INIT_LIST_HEAD(&osc->oo_hp_ready_item); + CFS_INIT_LIST_HEAD(&osc->oo_write_item); + CFS_INIT_LIST_HEAD(&osc->oo_read_item); + + return 0; } static void osc_object_free(const struct lu_env *env, struct lu_object *obj) diff --git a/lustre/osc/osc_page.c b/lustre/osc/osc_page.c index da58fc41..32fb5f4 100644 --- a/lustre/osc/osc_page.c +++ b/lustre/osc/osc_page.c @@ -209,34 +209,19 @@ static int osc_page_cache_add(const struct lu_env *env, const struct cl_page_slice *slice, struct cl_io *unused) { - struct osc_page *opg = cl2osc_page(slice); - struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj); - int result; - /* All cacheable IO is async-capable */ - int brw_flags = OBD_BRW_ASYNC; - int noquota = 0; - - LINVRNT(osc_page_protected(env, opg, CLM_WRITE, 0)); - ENTRY; - - /* Set the OBD_BRW_SRVLOCK before the page is queued. */ - brw_flags |= opg->ops_srvlock ? 
OBD_BRW_SRVLOCK : 0; - if (!client_is_remote(osc_export(obj)) && - cfs_capable(CFS_CAP_SYS_RESOURCE)) { - brw_flags |= OBD_BRW_NOQUOTA; - noquota = OBD_BRW_NOQUOTA; - } - - osc_page_transfer_get(opg, "transfer\0cache"); - result = osc_queue_async_io(env, osc_export(obj), NULL, obj->oo_oinfo, - &opg->ops_oap, OBD_BRW_WRITE | noquota, - opg->ops_from, opg->ops_to - opg->ops_from, - brw_flags, 0); - if (result != 0) - osc_page_transfer_put(env, opg); - else - osc_page_transfer_add(env, opg, CRT_WRITE); - RETURN(result); + struct osc_page *opg = cl2osc_page(slice); + int result; + ENTRY; + + LINVRNT(osc_page_protected(env, opg, CLM_WRITE, 0)); + + osc_page_transfer_get(opg, "transfer\0cache"); + result = osc_queue_async_io(env, opg); + if (result != 0) + osc_page_transfer_put(env, opg); + else + osc_page_transfer_add(env, opg, CRT_WRITE); + RETURN(result); } void osc_index2policy(ldlm_policy_data_t *policy, const struct cl_object *obj, @@ -354,11 +339,10 @@ static int osc_page_print(const struct lu_env *env, struct osc_async_page *oap = &opg->ops_oap; struct osc_object *obj = cl2osc(slice->cpl_obj); struct client_obd *cli = &osc_export(obj)->exp_obd->u.cli; - struct lov_oinfo *loi = obj->oo_oinfo; return (*printer)(env, cookie, LUSTRE_OSC_NAME"-page@%p: " "1< %#x %d %u %s %s %s > " - "2< "LPU64" %u %u %#x %#x | %p %p %p %p %p > " + "2< "LPU64" %u %u %#x %#x | %p %p %p > " "3< %s %p %d %lu %d > " "4< %d %d %d %lu %s | %s %s %s %s > " "5< %s %s %s %s | %d %s %s | %d %s %s>\n", @@ -372,9 +356,7 @@ static int osc_page_print(const struct lu_env *env, /* 2 */ oap->oap_obj_off, oap->oap_page_off, oap->oap_count, oap->oap_async_flags, oap->oap_brw_flags, - oap->oap_request, - oap->oap_cli, oap->oap_loi, oap->oap_caller_ops, - oap->oap_caller_data, + oap->oap_request, oap->oap_cli, obj, /* 3 */ osc_list(&opg->ops_inflight), opg->ops_submitter, opg->ops_transfer_pinned, @@ -389,24 +371,23 @@ static int osc_page_print(const struct lu_env *env, osc_list(&cli->cl_loi_write_list), osc_list(&cli->cl_loi_read_list), /* 5 */ - osc_list(&loi->loi_ready_item), - osc_list(&loi->loi_hp_ready_item), - osc_list(&loi->loi_write_item), - osc_list(&loi->loi_read_item), - loi->loi_read_lop.lop_num_pending, - osc_list(&loi->loi_read_lop.lop_pending), - osc_list(&loi->loi_read_lop.lop_urgent), - loi->loi_write_lop.lop_num_pending, - osc_list(&loi->loi_write_lop.lop_pending), - osc_list(&loi->loi_write_lop.lop_urgent)); + osc_list(&obj->oo_ready_item), + osc_list(&obj->oo_hp_ready_item), + osc_list(&obj->oo_write_item), + osc_list(&obj->oo_read_item), + obj->oo_read_pages.oop_num_pending, + osc_list(&obj->oo_read_pages.oop_pending), + osc_list(&obj->oo_read_pages.oop_urgent), + obj->oo_write_pages.oop_num_pending, + osc_list(&obj->oo_write_pages.oop_pending), + osc_list(&obj->oo_write_pages.oop_urgent)); } static void osc_page_delete(const struct lu_env *env, const struct cl_page_slice *slice) { - struct osc_page *opg = cl2osc_page(slice); - struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj); - struct osc_async_page *oap = &opg->ops_oap; + struct osc_page *opg = cl2osc_page(slice); + struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj); int rc; LINVRNT(opg->ops_temp || osc_page_protected(env, opg, CLM_READ, 1)); @@ -414,7 +395,7 @@ static void osc_page_delete(const struct lu_env *env, ENTRY; CDEBUG(D_TRACE, "%p\n", opg); osc_page_transfer_put(env, opg); - rc = osc_teardown_async_page(osc_export(obj), NULL, obj->oo_oinfo, oap); + rc = osc_teardown_async_page(obj, opg); if (rc) { CL_PAGE_DEBUG(D_ERROR, env, 
cl_page_top(slice->cpl_page), "Trying to teardown failed: %d\n", rc); @@ -455,7 +436,7 @@ static int osc_page_cancel(const struct lu_env *env, * is completed, or not even queued. */ if (opg->ops_transfer_pinned) /* FIXME: may not be interrupted.. */ - rc = osc_oap_interrupted(env, oap); + rc = osc_cancel_async_page(env, opg); LASSERT(ergo(rc == 0, opg->ops_transfer_pinned == 0)); client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock); return rc; @@ -480,136 +461,6 @@ static const struct cl_page_operations osc_page_ops = { .cpo_cancel = osc_page_cancel }; -static int osc_make_ready(const struct lu_env *env, void *data, int cmd) -{ - struct osc_page *opg = data; - struct cl_page *page = cl_page_top(opg->ops_cl.cpl_page); - int result; - - LASSERT(cmd == OBD_BRW_WRITE); /* no cached reads */ - LINVRNT(osc_page_protected(env, opg, CLM_WRITE, 1)); - - ENTRY; - result = cl_page_make_ready(env, page, CRT_WRITE); - if (result == 0) - opg->ops_submit_time = cfs_time_current(); - RETURN(result); -} - -static int osc_refresh_count(const struct lu_env *env, void *data, int cmd) -{ - struct cl_page *page; - struct osc_page *osc = data; - struct cl_object *obj; - struct cl_attr *attr = &osc_env_info(env)->oti_attr; - - int result; - loff_t kms; - - LINVRNT(osc_page_protected(env, osc, CLM_READ, 1)); - - /* readpage queues with _COUNT_STABLE, shouldn't get here. */ - LASSERT(!(cmd & OBD_BRW_READ)); - LASSERT(osc != NULL); - page = osc->ops_cl.cpl_page; - obj = osc->ops_cl.cpl_obj; - - cl_object_attr_lock(obj); - result = cl_object_attr_get(env, obj, attr); - cl_object_attr_unlock(obj); - if (result < 0) - return result; - kms = attr->cat_kms; - if (cl_offset(obj, page->cp_index) >= kms) - /* catch race with truncate */ - return 0; - else if (cl_offset(obj, page->cp_index + 1) > kms) - /* catch sub-page write at end of file */ - return kms % CFS_PAGE_SIZE; - else - return CFS_PAGE_SIZE; -} - -static int osc_completion(const struct lu_env *env, - void *data, int cmd, struct obdo *oa, int rc) -{ - struct osc_page *opg = data; - struct osc_async_page *oap = &opg->ops_oap; - struct cl_page *page = cl_page_top(opg->ops_cl.cpl_page); - struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj); - enum cl_req_type crt; - int srvlock; - - LINVRNT(osc_page_protected(env, opg, CLM_READ, 1)); - - ENTRY; - - cmd &= ~OBD_BRW_NOQUOTA; - LASSERT(equi(page->cp_state == CPS_PAGEIN, cmd == OBD_BRW_READ)); - LASSERT(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE)); - LASSERT(opg->ops_transfer_pinned); - - /* - * page->cp_req can be NULL if io submission failed before - * cl_req was allocated. - */ - if (page->cp_req != NULL) - cl_req_page_done(env, page); - LASSERT(page->cp_req == NULL); - - /* As the transfer for this page is being done, clear the flags */ - cfs_spin_lock(&oap->oap_lock); - oap->oap_async_flags = 0; - cfs_spin_unlock(&oap->oap_lock); - - crt = cmd == OBD_BRW_READ ? CRT_READ : CRT_WRITE; - /* Clear opg->ops_transfer_pinned before VM lock is released. 
*/ - opg->ops_transfer_pinned = 0; - - cfs_spin_lock(&obj->oo_seatbelt); - LASSERT(opg->ops_submitter != NULL); - LASSERT(!cfs_list_empty(&opg->ops_inflight)); - cfs_list_del_init(&opg->ops_inflight); - cfs_spin_unlock(&obj->oo_seatbelt); - - opg->ops_submit_time = 0; - srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK; - - cl_page_completion(env, page, crt, rc); - - /* statistic */ - if (rc == 0 && srvlock) { - struct lu_device *ld = opg->ops_cl.cpl_obj->co_lu.lo_dev; - struct osc_stats *stats = &lu2osc_dev(ld)->od_stats; - int bytes = oap->oap_count; - - if (crt == CRT_READ) - stats->os_lockless_reads += bytes; - else - stats->os_lockless_writes += bytes; - } - - /* - * This has to be the last operation with the page, as locks are - * released in cl_page_completion() and nothing except for the - * reference counter protects page from concurrent reclaim. - */ - lu_ref_del(&page->cp_reference, "transfer", page); - /* - * As page->cp_obj is pinned by a reference from page->cp_req, it is - * safe to call cl_page_put() without risking object destruction in a - * non-blocking context. - */ - cl_page_put(env, page); - RETURN(0); -} - -const static struct obd_async_page_ops osc_async_page_ops = { - .ap_make_ready = osc_make_ready, - .ap_refresh_count = osc_refresh_count, - .ap_completion = osc_completion -}; - struct cl_page *osc_page_init(const struct lu_env *env, struct cl_object *obj, struct cl_page *page, cfs_page_t *vmpage) @@ -620,16 +471,11 @@ struct cl_page *osc_page_init(const struct lu_env *env, OBD_SLAB_ALLOC_PTR_GFP(opg, osc_page_kmem, CFS_ALLOC_IO); if (opg != NULL) { - void *oap = &opg->ops_oap; - opg->ops_from = 0; opg->ops_to = CFS_PAGE_SIZE; - result = osc_prep_async_page(osc_export(osc), - NULL, osc->oo_oinfo, vmpage, - cl_offset(obj, page->cp_index), - &osc_async_page_ops, - opg, (void **)&oap, 1, NULL); + result = osc_prep_async_page(osc, opg, vmpage, + cl_offset(obj, page->cp_index)); if (result == 0) { struct osc_io *oio = osc_env_io(env); opg->ops_srvlock = osc_io_srvlock(oio); @@ -657,39 +503,13 @@ void osc_io_submit_page(const struct lu_env *env, struct osc_io *oio, struct osc_page *opg, enum cl_req_type crt) { - struct osc_async_page *oap = &opg->ops_oap; - struct client_obd *cli = oap->oap_cli; - int flags = 0; - LINVRNT(osc_page_protected(env, opg, crt == CRT_WRITE ? CLM_WRITE : CLM_READ, 1)); - oap->oap_page_off = opg->ops_from; - oap->oap_count = opg->ops_to - opg->ops_from; - /* Give a hint to OST that requests are coming from kswapd - bug19529 */ - if (cfs_memory_pressure_get()) - oap->oap_brw_flags |= OBD_BRW_MEMALLOC; - oap->oap_brw_flags |= OBD_BRW_SYNC; - if (osc_io_srvlock(oio)) - oap->oap_brw_flags |= OBD_BRW_SRVLOCK; - - oap->oap_cmd = crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ; - if (!client_is_remote(osc_export(cl2osc(opg->ops_cl.cpl_obj))) && - cfs_capable(CFS_CAP_SYS_RESOURCE)) { - oap->oap_brw_flags |= OBD_BRW_NOQUOTA; - oap->oap_cmd |= OBD_BRW_NOQUOTA; - } - - if (oap->oap_cmd & OBD_BRW_READ) - flags = ASYNC_COUNT_STABLE; - else if (!(oap->oap_brw_page.flag & OBD_BRW_FROM_GRANT)) - osc_enter_cache_try(env, cli, oap->oap_loi, oap, 1); - - cfs_spin_lock(&oap->oap_lock); - oap->oap_async_flags |= OSC_FLAGS | flags; - cfs_spin_unlock(&oap->oap_lock); + osc_queue_sync_page(env, opg, + crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ, + osc_io_srvlock(oio) ? 
OBD_BRW_SRVLOCK : 0); - osc_oap_to_pending(oap); osc_page_transfer_get(opg, "transfer\0imm"); osc_page_transfer_add(env, opg, crt); } diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index ba5946b..0de44b7 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -59,12 +59,11 @@ #include #include #include "osc_internal.h" +#include "osc_cl_internal.h" static void osc_release_ppga(struct brw_page **ppga, obd_count count); static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *data, int rc); -static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli, - int ptlrpc); int osc_cleanup(struct obd_device *obd); /* Pack OSC object metadata for disk storage (LE byte order). */ @@ -809,7 +808,7 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, } -static void osc_update_next_shrink(struct client_obd *cli) +void osc_update_next_shrink(struct client_obd *cli) { cli->cl_next_shrink_grant = cfs_time_shift(cli->cl_grant_shrink_interval); @@ -817,127 +816,6 @@ static void osc_update_next_shrink(struct client_obd *cli) cli->cl_next_shrink_grant); } -/* caller must hold loi_list_lock */ -static void osc_consume_write_grant(struct client_obd *cli, - struct brw_page *pga) -{ - LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock); - LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT)); - cfs_atomic_inc(&obd_dirty_pages); - cli->cl_dirty += CFS_PAGE_SIZE; - cli->cl_avail_grant -= CFS_PAGE_SIZE; - pga->flag |= OBD_BRW_FROM_GRANT; - CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n", - CFS_PAGE_SIZE, pga, pga->pg); - LASSERT(cli->cl_avail_grant >= 0); - osc_update_next_shrink(cli); -} - -/* the companion to osc_consume_write_grant, called when a brw has completed. - * must be called with the loi lock held. */ -static void osc_release_write_grant(struct client_obd *cli, - struct brw_page *pga, int sent) -{ - int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096; - ENTRY; - - LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock); - if (!(pga->flag & OBD_BRW_FROM_GRANT)) { - EXIT; - return; - } - - pga->flag &= ~OBD_BRW_FROM_GRANT; - cfs_atomic_dec(&obd_dirty_pages); - cli->cl_dirty -= CFS_PAGE_SIZE; - if (pga->flag & OBD_BRW_NOCACHE) { - pga->flag &= ~OBD_BRW_NOCACHE; - cfs_atomic_dec(&obd_dirty_transit_pages); - cli->cl_dirty_transit -= CFS_PAGE_SIZE; - } - if (!sent) { - /* Reclaim grant from truncated pages. This is used to solve - * write-truncate and grant all gone(to lost_grant) problem. - * For a vfs write this problem can be easily solved by a sync - * write, however, this is not an option for page_mkwrite() - * because grant has to be allocated before a page becomes - * dirty. */ - if (cli->cl_avail_grant < PTLRPC_MAX_BRW_SIZE) - cli->cl_avail_grant += CFS_PAGE_SIZE; - else - cli->cl_lost_grant += CFS_PAGE_SIZE; - CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n", - cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty); - } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) { - /* For short writes we shouldn't count parts of pages that - * span a whole block on the OST side, or our accounting goes - * wrong. Should match the code in filter_grant_check. 
*/ - int offset = pga->off & ~CFS_PAGE_MASK; - int count = pga->count + (offset & (blocksize - 1)); - int end = (offset + pga->count) & (blocksize - 1); - if (end) - count += blocksize - end; - - cli->cl_lost_grant += CFS_PAGE_SIZE - count; - CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n", - CFS_PAGE_SIZE - count, cli->cl_lost_grant, - cli->cl_avail_grant, cli->cl_dirty); - } - - EXIT; -} - -static unsigned long rpcs_in_flight(struct client_obd *cli) -{ - return cli->cl_r_in_flight + cli->cl_w_in_flight; -} - -/* caller must hold loi_list_lock */ -void osc_wake_cache_waiters(struct client_obd *cli) -{ - cfs_list_t *l, *tmp; - struct osc_cache_waiter *ocw; - - ENTRY; - cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) { - /* if we can't dirty more, we must wait until some is written */ - if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) || - (cfs_atomic_read(&obd_dirty_pages) + 1 > - obd_max_dirty_pages)) { - CDEBUG(D_CACHE, "no dirty room: dirty: %ld " - "osc max %ld, sys max %d\n", cli->cl_dirty, - cli->cl_dirty_max, obd_max_dirty_pages); - return; - } - - /* if still dirty cache but no grant wait for pending RPCs that - * may yet return us some grant before doing sync writes */ - if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) { - CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n", - cli->cl_w_in_flight); - return; - } - - ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry); - cfs_list_del_init(&ocw->ocw_entry); - if (cli->cl_avail_grant < CFS_PAGE_SIZE) { - /* no more RPCs in flight to return grant, do sync IO */ - ocw->ocw_rc = -EDQUOT; - CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap); - } else { - osc_consume_write_grant(cli, - &ocw->ocw_oap->oap_brw_page); - } - - CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld\n", - ocw, ocw->ocw_oap, cli->cl_avail_grant); - - cfs_waitq_signal(&ocw->ocw_waitq); - } - - EXIT; -} - static void __osc_update_grant(struct client_obd *cli, obd_size grant) { client_obd_list_lock(&cli->cl_loi_list_lock); @@ -1977,272 +1855,12 @@ out: RETURN(rc); } -/* The companion to osc_enter_cache(), called when @oap is no longer part of - * the dirty accounting. Writeback completes or truncate happens before - * writing starts. Must be called with the loi lock held. */ -static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap, - int sent) -{ - osc_release_write_grant(cli, &oap->oap_brw_page, sent); -} - - -/* This maintains the lists of pending pages to read/write for a given object - * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint() - * to quickly find objects that are ready to send an RPC. */ -static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop, - int cmd) -{ - ENTRY; - - if (lop->lop_num_pending == 0) - RETURN(0); - - /* if we have an invalid import we want to drain the queued pages - * by forcing them through rpcs that immediately fail and complete - * the pages. recovery relies on this to empty the queued pages - * before canceling the locks and evicting down the llite pages */ - if (cli->cl_import == NULL || cli->cl_import->imp_invalid) - RETURN(1); - - /* stream rpcs in queue order as long as as there is an urgent page - * queued. 
this is our cheap solution for good batching in the case - * where writepage marks some random page in the middle of the file - * as urgent because of, say, memory pressure */ - if (!cfs_list_empty(&lop->lop_urgent)) { - CDEBUG(D_CACHE, "urgent request forcing RPC\n"); - RETURN(1); - } - - if (cmd & OBD_BRW_WRITE) { - /* trigger a write rpc stream as long as there are dirtiers - * waiting for space. as they're waiting, they're not going to - * create more pages to coalesce with what's waiting.. */ - if (!cfs_list_empty(&cli->cl_cache_waiters)) { - CDEBUG(D_CACHE, "cache waiters forcing RPC\n"); - RETURN(1); - } - } - if (lop->lop_num_pending >= cli->cl_max_pages_per_rpc) - RETURN(1); - - RETURN(0); -} - -static int lop_makes_hprpc(struct loi_oap_pages *lop) -{ - struct osc_async_page *oap; - ENTRY; - - if (cfs_list_empty(&lop->lop_urgent)) - RETURN(0); - - oap = cfs_list_entry(lop->lop_urgent.next, - struct osc_async_page, oap_urgent_item); - - if (oap->oap_async_flags & ASYNC_HP) { - CDEBUG(D_CACHE, "hp request forcing RPC\n"); - RETURN(1); - } - - RETURN(0); -} - -static void on_list(cfs_list_t *item, cfs_list_t *list, - int should_be_on) -{ - if (cfs_list_empty(item) && should_be_on) - cfs_list_add_tail(item, list); - else if (!cfs_list_empty(item) && !should_be_on) - cfs_list_del_init(item); -} - -/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc - * can find pages to build into rpcs quickly */ -void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi) -{ - if (lop_makes_hprpc(&loi->loi_write_lop) || - lop_makes_hprpc(&loi->loi_read_lop)) { - /* HP rpc */ - on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0); - on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1); - } else { - on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0); - on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, - lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)|| - lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)); - } - - on_list(&loi->loi_write_item, &cli->cl_loi_write_list, - loi->loi_write_lop.lop_num_pending); - - on_list(&loi->loi_read_item, &cli->cl_loi_read_list, - loi->loi_read_lop.lop_num_pending); -} - -static void lop_update_pending(struct client_obd *cli, - struct loi_oap_pages *lop, int cmd, int delta) -{ - lop->lop_num_pending += delta; - if (cmd & OBD_BRW_WRITE) - cli->cl_pending_w_pages += delta; - else - cli->cl_pending_r_pages += delta; -} - -/** - * this is called when a sync waiter receives an interruption. Its job is to - * get the caller woken as soon as possible. If its page hasn't been put in an - * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as - * desiring interruption which will forcefully complete the rpc once the rpc - * has timed out. - */ -int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap) -{ - struct loi_oap_pages *lop; - struct lov_oinfo *loi; - int rc = -EBUSY; - ENTRY; - - LASSERT(!oap->oap_interrupted); - oap->oap_interrupted = 1; - - /* ok, it's been put in an rpc. 
only one oap gets a request reference */ - if (oap->oap_request != NULL) { - ptlrpc_mark_interrupted(oap->oap_request); - ptlrpcd_wake(oap->oap_request); - ptlrpc_req_finished(oap->oap_request); - oap->oap_request = NULL; - } - - /* - * page completion may be called only if ->cpo_prep() method was - * executed by osc_io_submit(), that also adds page the to pending list - */ - if (!cfs_list_empty(&oap->oap_pending_item)) { - cfs_list_del_init(&oap->oap_pending_item); - cfs_list_del_init(&oap->oap_urgent_item); - - loi = oap->oap_loi; - lop = (oap->oap_cmd & OBD_BRW_WRITE) ? - &loi->loi_write_lop : &loi->loi_read_lop; - lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1); - loi_list_maint(oap->oap_cli, oap->oap_loi); - rc = oap->oap_caller_ops->ap_completion(env, - oap->oap_caller_data, - oap->oap_cmd, NULL, -EINTR); - } - - RETURN(rc); -} - -/* this is trying to propogate async writeback errors back up to the - * application. As an async write fails we record the error code for later if - * the app does an fsync. As long as errors persist we force future rpcs to be - * sync so that the app can get a sync error and break the cycle of queueing - * pages for which writeback will fail. */ -static void osc_process_ar(struct osc_async_rc *ar, __u64 xid, - int rc) -{ - if (rc) { - if (!ar->ar_rc) - ar->ar_rc = rc; - - ar->ar_force_sync = 1; - ar->ar_min_xid = ptlrpc_sample_next_xid(); - return; - - } - - if (ar->ar_force_sync && (xid >= ar->ar_min_xid)) - ar->ar_force_sync = 0; -} - -void osc_oap_to_pending(struct osc_async_page *oap) -{ - struct loi_oap_pages *lop; - - if (oap->oap_cmd & OBD_BRW_WRITE) - lop = &oap->oap_loi->loi_write_lop; - else - lop = &oap->oap_loi->loi_read_lop; - - if (oap->oap_async_flags & ASYNC_HP) - cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent); - else if (oap->oap_async_flags & ASYNC_URGENT) - cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent); - cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending); - lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1); -} - -/* this must be called holding the loi list lock to give coverage to exit_cache, - * async_flag maintenance, and oap_request */ -static void osc_ap_completion(const struct lu_env *env, - struct client_obd *cli, struct obdo *oa, - struct osc_async_page *oap, int sent, int rc) -{ - __u64 xid = 0; - - ENTRY; - if (oap->oap_request != NULL) { - xid = ptlrpc_req_xid(oap->oap_request); - ptlrpc_req_finished(oap->oap_request); - oap->oap_request = NULL; - } - - cfs_spin_lock(&oap->oap_lock); - oap->oap_async_flags = 0; - cfs_spin_unlock(&oap->oap_lock); - oap->oap_interrupted = 0; - - if (oap->oap_cmd & OBD_BRW_WRITE) { - osc_process_ar(&cli->cl_ar, xid, rc); - osc_process_ar(&oap->oap_loi->loi_ar, xid, rc); - } - - if (rc == 0 && oa != NULL) { - if (oa->o_valid & OBD_MD_FLBLOCKS) - oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks; - if (oa->o_valid & OBD_MD_FLMTIME) - oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime; - if (oa->o_valid & OBD_MD_FLATIME) - oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime; - if (oa->o_valid & OBD_MD_FLCTIME) - oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime; - } - - rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data, - oap->oap_cmd, oa, rc); - - /* cl_page_completion() drops PG_locked. 
so, a new I/O on the page could - * start, but OSC calls it under lock and thus we can add oap back to - * pending safely */ - if (rc) - /* upper layer wants to leave the page on pending queue */ - osc_oap_to_pending(oap); - else - osc_exit_cache(cli, oap, sent); - EXIT; -} - -static int brw_queue_work(const struct lu_env *env, void *data) -{ - struct client_obd *cli = data; - - CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli); - - client_obd_list_lock(&cli->cl_loi_list_lock); - osc_check_rpcs0(env, cli, 1); - client_obd_list_unlock(&cli->cl_loi_list_lock); - RETURN(0); -} - static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *data, int rc) { struct osc_brw_async_args *aa = data; + struct osc_async_page *oap, *tmp; struct client_obd *cli; - int async; ENTRY; rc = osc_brw_fini_request(req, rc); @@ -2288,45 +1906,38 @@ static int brw_interpret(const struct lu_env *env, else cli->cl_r_in_flight--; - async = cfs_list_empty(&aa->aa_oaps); - if (!async) { /* from osc_send_oap_rpc() */ - struct osc_async_page *oap, *tmp; - /* the caller may re-use the oap after the completion call so - * we need to clean it up a little */ - cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, - oap_rpc_item) { - cfs_list_del_init(&oap->oap_rpc_item); - osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc); - } - OBDO_FREE(aa->aa_oa); - } else { /* from async_internal() */ - obd_count i; - for (i = 0; i < aa->aa_page_count; i++) - osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1); - } - osc_wake_cache_waiters(cli); - osc_check_rpcs0(env, cli, 1); - client_obd_list_unlock(&cli->cl_loi_list_lock); + /* the caller may re-use the oap after the completion call so + * we need to clean it up a little */ + cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, + oap_rpc_item) { + cfs_list_del_init(&oap->oap_rpc_item); + osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc); + } + OBDO_FREE(aa->aa_oa); - if (!async) - cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc : - req->rq_bulk->bd_nob_transferred); - osc_release_ppga(aa->aa_ppga, aa->aa_page_count); - ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred); + osc_wake_cache_waiters(cli); + osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME); + client_obd_list_unlock(&cli->cl_loi_list_lock); - RETURN(rc); + cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc : + req->rq_bulk->bd_nob_transferred); + osc_release_ppga(aa->aa_ppga, aa->aa_page_count); + ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred); + + RETURN(rc); } -static struct ptlrpc_request *osc_build_req(const struct lu_env *env, - struct client_obd *cli, - cfs_list_t *rpc_list, - int page_count, int cmd) +/* The most tricky part of this function is that it will return with + * cli->cli_loi_list_lock held. 
+ */ +int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, + cfs_list_t *rpc_list, int page_count, int cmd, + pdl_policy_t pol) { - struct ptlrpc_request *req; - struct brw_page **pga = NULL; - struct osc_brw_async_args *aa; + struct ptlrpc_request *req = NULL; + struct brw_page **pga = NULL; + struct osc_brw_async_args *aa = NULL; struct obdo *oa = NULL; - const struct obd_async_page_ops *ops = NULL; struct osc_async_page *oap; struct osc_async_page *tmp; struct cl_req *clerq = NULL; @@ -2344,23 +1955,21 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env, memset(&crattr, 0, sizeof crattr); OBD_ALLOC(pga, sizeof(*pga) * page_count); if (pga == NULL) - GOTO(out, req = ERR_PTR(-ENOMEM)); - - OBDO_ALLOC(oa); - if (oa == NULL) - GOTO(out, req = ERR_PTR(-ENOMEM)); - - i = 0; - cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) { - struct cl_page *page = osc_oap2cl_page(oap); - if (ops == NULL) { - ops = oap->oap_caller_ops; - - clerq = cl_req_alloc(env, page, crt, - 1 /* only 1-object rpcs for - * now */); - if (IS_ERR(clerq)) - GOTO(out, req = (void *)clerq); + GOTO(out, rc = -ENOMEM); + + OBDO_ALLOC(oa); + if (oa == NULL) + GOTO(out, rc = -ENOMEM); + + i = 0; + cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) { + struct cl_page *page = osc_oap2cl_page(oap); + if (clerq == NULL) { + clerq = cl_req_alloc(env, page, crt, + 1 /* only 1-object rpcs for + * now */); + if (IS_ERR(clerq)) + GOTO(out, rc = PTR_ERR(clerq)); lock = oap->oap_ldlm_lock; } pga[i] = &oap->oap_brw_page; @@ -2372,7 +1981,7 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env, } /* always get the data for the obdo for the rpc */ - LASSERT(ops != NULL); + LASSERT(clerq != NULL); crattr.cra_oa = oa; crattr.cra_capa = NULL; memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE); @@ -2385,7 +1994,7 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env, rc = cl_req_prep(env, clerq); if (rc != 0) { CERROR("cl_req_prep failed: %d\n", rc); - GOTO(out, req = ERR_PTR(rc)); + GOTO(out, rc); } sort_brw_pages(pga, page_count); @@ -2393,9 +2002,10 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env, pga, &req, crattr.cra_capa, 1, 0); if (rc != 0) { CERROR("prep_req failed: %d\n", rc); - GOTO(out, req = ERR_PTR(rc)); - } + GOTO(out, rc); + } + req->rq_interpret_reply = brw_interpret; if (cmd & OBD_BRW_MEMALLOC) req->rq_memalloc = 1; @@ -2420,7 +2030,9 @@ out: cfs_memory_pressure_restore(mpflag); capa_put(crattr.cra_capa); - if (IS_ERR(req)) { + if (rc != 0) { + LASSERT(req == NULL); + if (oa) OBDO_FREE(oa); if (pga) @@ -2439,724 +2051,49 @@ out: oap->oap_count); continue; } - osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req)); - } - if (clerq && !IS_ERR(clerq)) - cl_req_completion(env, clerq, PTR_ERR(req)); - } - RETURN(req); -} - -/** - * prepare pages for ASYNC io and put pages in send queue. - * - * \param cmd OBD_BRW_* macroses - * \param lop pending pages - * - * \return zero if no page added to send queue. - * \return 1 if pages successfully added to send queue. - * \return negative on errors. 
- */ -static int -osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, - struct lov_oinfo *loi, int cmd, - struct loi_oap_pages *lop, pdl_policy_t pol) -{ - struct ptlrpc_request *req; - obd_count page_count = 0; - struct osc_async_page *oap = NULL, *tmp; - struct osc_brw_async_args *aa; - const struct obd_async_page_ops *ops; - CFS_LIST_HEAD(rpc_list); - int srvlock = 0, mem_tight = 0; - struct cl_object *clob = NULL; - obd_off starting_offset = OBD_OBJECT_EOF; - unsigned int ending_offset; - int starting_page_off = 0; - ENTRY; - - /* ASYNC_HP pages first. At present, when the lock the pages is - * to be canceled, the pages covered by the lock will be sent out - * with ASYNC_HP. We have to send out them as soon as possible. */ - cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) { - if (oap->oap_async_flags & ASYNC_HP) - cfs_list_move(&oap->oap_pending_item, &rpc_list); - else if (!(oap->oap_brw_flags & OBD_BRW_SYNC)) - /* only do this for writeback pages. */ - cfs_list_move_tail(&oap->oap_pending_item, &rpc_list); - if (++page_count >= cli->cl_max_pages_per_rpc) - break; - } - cfs_list_splice_init(&rpc_list, &lop->lop_pending); - page_count = 0; - - /* first we find the pages we're allowed to work with */ - cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending, - oap_pending_item) { - ops = oap->oap_caller_ops; - - LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, " - "magic 0x%x\n", oap, oap->oap_magic); - - if (clob == NULL) { - /* pin object in memory, so that completion call-backs - * can be safely called under client_obd_list lock. */ - clob = osc_oap2cl_page(oap)->cp_obj; - cl_object_get(clob); - } - - if (page_count != 0 && - srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) { - CDEBUG(D_PAGE, "SRVLOCK flag mismatch," - " oap %p, page %p, srvlock %u\n", - oap, oap->oap_brw_page.pg, (unsigned)!srvlock); - break; - } - - /* If there is a gap at the start of this page, it can't merge - * with any previous page, so we'll hand the network a - * "fragmented" page array that it can't transfer in 1 RDMA */ - if (oap->oap_obj_off < starting_offset) { - if (starting_page_off != 0) - break; - - starting_page_off = oap->oap_page_off; - starting_offset = oap->oap_obj_off + starting_page_off; - } else if (oap->oap_page_off != 0) - break; - - /* in llite being 'ready' equates to the page being locked - * until completion unlocks it. commit_write submits a page - * as not ready because its unlock will happen unconditionally - * as the call returns. if we race with commit_write giving - * us that page we don't want to create a hole in the page - * stream, so we stop and leave the rpc to be fired by - * another dirtier or kupdated interval (the not ready page - * will still be on the dirty list). we could call in - * at the end of ll_file_write to process the queue again. */ - if (!(oap->oap_async_flags & ASYNC_READY)) { - int rc = ops->ap_make_ready(env, oap->oap_caller_data, - cmd); - if (rc < 0) - CDEBUG(D_INODE, "oap %p page %p returned %d " - "instead of ready\n", oap, - oap->oap_page, rc); - switch (rc) { - case -EAGAIN: - /* llite is telling us that the page is still - * in commit_write and that we should try - * and put it in an rpc again later. we - * break out of the loop so we don't create - * a hole in the sequence of pages in the rpc - * stream.*/ - oap = NULL; - break; - case -EINTR: - /* the io isn't needed.. 
tell the checks - * below to complete the rpc with EINTR */ - cfs_spin_lock(&oap->oap_lock); - oap->oap_async_flags |= ASYNC_COUNT_STABLE; - cfs_spin_unlock(&oap->oap_lock); - oap->oap_count = -EINTR; - break; - case 0: - cfs_spin_lock(&oap->oap_lock); - oap->oap_async_flags |= ASYNC_READY; - cfs_spin_unlock(&oap->oap_lock); - break; - default: - LASSERTF(0, "oap %p page %p returned %d " - "from make_ready\n", oap, - oap->oap_page, rc); - break; - } - } - if (oap == NULL) - break; - - /* take the page out of our book-keeping */ - cfs_list_del_init(&oap->oap_pending_item); - lop_update_pending(cli, lop, cmd, -1); - cfs_list_del_init(&oap->oap_urgent_item); - - /* ask the caller for the size of the io as the rpc leaves. */ - if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) { - oap->oap_count = - ops->ap_refresh_count(env, oap->oap_caller_data, - cmd); - LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE); - } - if (oap->oap_count <= 0) { - CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap, - oap->oap_count); - osc_ap_completion(env, cli, NULL, - oap, 0, oap->oap_count); - continue; - } - - /* now put the page back in our accounting */ - cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list); - if (page_count++ == 0) - srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK); - - if (oap->oap_brw_flags & OBD_BRW_MEMALLOC) - mem_tight = 1; - - /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized - * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads - * have the same alignment as the initial writes that allocated - * extents on the server. */ - ending_offset = oap->oap_obj_off + oap->oap_page_off + - oap->oap_count; - if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1))) - break; - - if (page_count >= cli->cl_max_pages_per_rpc) - break; - - /* If there is a gap at the end of this page, it can't merge - * with any subsequent pages, so we'll hand the network a - * "fragmented" page array that it can't transfer in 1 RDMA */ - if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE) - break; - } - - loi_list_maint(cli, loi); - - client_obd_list_unlock(&cli->cl_loi_list_lock); - - if (clob != NULL) - cl_object_put(env, clob); - - if (page_count == 0) { - client_obd_list_lock(&cli->cl_loi_list_lock); - RETURN(0); - } - - req = osc_build_req(env, cli, &rpc_list, page_count, - mem_tight ? 
(cmd | OBD_BRW_MEMALLOC) : cmd); - if (IS_ERR(req)) { - LASSERT(cfs_list_empty(&rpc_list)); - loi_list_maint(cli, loi); - RETURN(PTR_ERR(req)); - } - - aa = ptlrpc_req_async_args(req); - - starting_offset &= PTLRPC_MAX_BRW_SIZE - 1; - if (cmd == OBD_BRW_READ) { - lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count); - lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight); - lprocfs_oh_tally_log2(&cli->cl_read_offset_hist, - (starting_offset >> CFS_PAGE_SHIFT) + 1); - } else { - lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count); - lprocfs_oh_tally(&cli->cl_write_rpc_hist, - cli->cl_w_in_flight); - lprocfs_oh_tally_log2(&cli->cl_write_offset_hist, - (starting_offset >> CFS_PAGE_SHIFT) + 1); - } - - client_obd_list_lock(&cli->cl_loi_list_lock); - - if (cmd == OBD_BRW_READ) - cli->cl_r_in_flight++; - else - cli->cl_w_in_flight++; - - /* queued sync pages can be torn down while the pages - * were between the pending list and the rpc */ - tmp = NULL; - cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) { - /* only one oap gets a request reference */ - if (tmp == NULL) - tmp = oap; - if (oap->oap_interrupted && !req->rq_intr) { - CDEBUG(D_INODE, "oap %p in req %p interrupted\n", - oap, req); - ptlrpc_mark_interrupted(req); - } - } - if (tmp != NULL) - tmp->oap_request = ptlrpc_request_addref(req); - - DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight", - page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight); - - req->rq_interpret_reply = brw_interpret; - - /* XXX: Maybe the caller can check the RPC bulk descriptor to see which - * CPU/NUMA node the majority of pages were allocated on, and try - * to assign the async RPC to the CPU core (PDL_POLICY_PREFERRED) - * to reduce cross-CPU memory traffic. - * - * But on the other hand, we expect that multiple ptlrpcd threads - * and the initial write sponsor can run in parallel, especially - * when data checksum is enabled, which is CPU-bound operation and - * single ptlrpcd thread cannot process in time. So more ptlrpcd - * threads sharing BRW load (with PDL_POLICY_ROUND) seems better. - */ - ptlrpcd_add_req(req, pol, -1); - RETURN(1); -} - -#define LOI_DEBUG(LOI, STR, args...) \ - CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \ - !cfs_list_empty(&(LOI)->loi_ready_item) || \ - !cfs_list_empty(&(LOI)->loi_hp_ready_item), \ - (LOI)->loi_write_lop.lop_num_pending, \ - !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent), \ - (LOI)->loi_read_lop.lop_num_pending, \ - !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent), \ - args) \ - -/* This is called by osc_check_rpcs() to find which objects have pages that - * we could be sending. These lists are maintained by lop_makes_rpc(). */ -struct lov_oinfo *osc_next_loi(struct client_obd *cli) -{ - ENTRY; - - /* First return objects that have blocked locks so that they - * will be flushed quickly and other clients can get the lock, - * then objects which have pages ready to be stuffed into RPCs */ - if (!cfs_list_empty(&cli->cl_loi_hp_ready_list)) - RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next, - struct lov_oinfo, loi_hp_ready_item)); - if (!cfs_list_empty(&cli->cl_loi_ready_list)) - RETURN(cfs_list_entry(cli->cl_loi_ready_list.next, - struct lov_oinfo, loi_ready_item)); - - /* then if we have cache waiters, return all objects with queued - * writes. 
This is especially important when many small files - * have filled up the cache and not been fired into rpcs because - * they don't pass the nr_pending/object threshhold */ - if (!cfs_list_empty(&cli->cl_cache_waiters) && - !cfs_list_empty(&cli->cl_loi_write_list)) - RETURN(cfs_list_entry(cli->cl_loi_write_list.next, - struct lov_oinfo, loi_write_item)); - - /* then return all queued objects when we have an invalid import - * so that they get flushed */ - if (cli->cl_import == NULL || cli->cl_import->imp_invalid) { - if (!cfs_list_empty(&cli->cl_loi_write_list)) - RETURN(cfs_list_entry(cli->cl_loi_write_list.next, - struct lov_oinfo, - loi_write_item)); - if (!cfs_list_empty(&cli->cl_loi_read_list)) - RETURN(cfs_list_entry(cli->cl_loi_read_list.next, - struct lov_oinfo, loi_read_item)); - } - RETURN(NULL); -} - -static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi) -{ - struct osc_async_page *oap; - int hprpc = 0; - - if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) { - oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next, - struct osc_async_page, oap_urgent_item); - hprpc = !!(oap->oap_async_flags & ASYNC_HP); - } - - if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) { - oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next, - struct osc_async_page, oap_urgent_item); - hprpc = !!(oap->oap_async_flags & ASYNC_HP); - } - - return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc; -} - -/* called with the loi list lock held */ -static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli, int ptlrpc) -{ - struct lov_oinfo *loi; - int rc = 0, race_counter = 0; - pdl_policy_t pol; - ENTRY; - - pol = ptlrpc ? PDL_POLICY_SAME : PDL_POLICY_ROUND; - - while ((loi = osc_next_loi(cli)) != NULL) { - LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli)); - - if (osc_max_rpc_in_flight(cli, loi)) - break; - - /* attempt some read/write balancing by alternating between - * reads and writes in an object. The makes_rpc checks here - * would be redundant if we were getting read/write work items - * instead of objects. we don't want send_oap_rpc to drain a - * partial read pending queue when we're given this object to - * do io on writes while there are cache waiters */ - if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) { - rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE, - &loi->loi_write_lop, pol); - if (rc < 0) { - CERROR("Write request failed with %d\n", rc); - - /* osc_send_oap_rpc failed, mostly because of - * memory pressure. - * - * It can't break here, because if: - * - a page was submitted by osc_io_submit, so - * page locked; - * - no request in flight - * - no subsequent request - * The system will be in live-lock state, - * because there is no chance to call - * osc_io_unplug() and osc_check_rpcs() any - * more. pdflush can't help in this case, - * because it might be blocked at grabbing - * the page lock as we mentioned. - * - * Anyway, continue to drain pages. 
*/ - /* break; */ - } - - if (rc > 0) - race_counter = 0; - else if (rc == 0) - race_counter++; - } - if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) { - rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ, - &loi->loi_read_lop, pol); - if (rc < 0) - CERROR("Read request failed with %d\n", rc); - - if (rc > 0) - race_counter = 0; - else if (rc == 0) - race_counter++; - } - - /* attempt some inter-object balancing by issuing rpcs - * for each object in turn */ - if (!cfs_list_empty(&loi->loi_hp_ready_item)) - cfs_list_del_init(&loi->loi_hp_ready_item); - if (!cfs_list_empty(&loi->loi_ready_item)) - cfs_list_del_init(&loi->loi_ready_item); - if (!cfs_list_empty(&loi->loi_write_item)) - cfs_list_del_init(&loi->loi_write_item); - if (!cfs_list_empty(&loi->loi_read_item)) - cfs_list_del_init(&loi->loi_read_item); - - loi_list_maint(cli, loi); - - /* send_oap_rpc fails with 0 when make_ready tells it to - * back off. llite's make_ready does this when it tries - * to lock a page queued for write that is already locked. - * we want to try sending rpcs from many objects, but we - * don't want to spin failing with 0. */ - if (race_counter == 10) - break; - } -} - -void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) -{ - osc_check_rpcs0(env, cli, 0); -} - -/** - * Non-blocking version of osc_enter_cache() that consumes grant only when it - * is available. - */ -int osc_enter_cache_try(const struct lu_env *env, - struct client_obd *cli, struct lov_oinfo *loi, - struct osc_async_page *oap, int transient) -{ - int has_grant; - - has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE; - if (has_grant) { - osc_consume_write_grant(cli, &oap->oap_brw_page); - if (transient) { - cli->cl_dirty_transit += CFS_PAGE_SIZE; - cfs_atomic_inc(&obd_dirty_transit_pages); - oap->oap_brw_flags |= OBD_BRW_NOCACHE; - } - } - return has_grant; -} - -/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for - * grant or cache space. */ -static int osc_enter_cache(const struct lu_env *env, - struct client_obd *cli, struct lov_oinfo *loi, - struct osc_async_page *oap) -{ - struct osc_cache_waiter ocw; - struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL); - int rc = -EDQUOT; - ENTRY; - - CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu " - "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages), - cli->cl_dirty_max, obd_max_dirty_pages, - cli->cl_lost_grant, cli->cl_avail_grant); - - /* force the caller to try sync io. this can jump the list - * of queued writes and create a discontiguous rpc stream */ - if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) || - cli->cl_dirty_max < CFS_PAGE_SIZE || - cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync) - RETURN(-EDQUOT); - - /* Hopefully normal case - cache space and write credits available */ - if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max && - cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages && - osc_enter_cache_try(env, cli, loi, oap, 0)) - RETURN(0); - - /* We can get here for two reasons: too many dirty pages in cache, or - * run out of grants. In both cases we should write dirty pages out. - * Adding a cache waiter will trigger urgent write-out no matter what - * RPC size will be. - * The exiting condition is no avail grants and no dirty pages caching, - * that really means there is no space on the OST. 
*/ - cfs_waitq_init(&ocw.ocw_waitq); - ocw.ocw_oap = oap; - while (cli->cl_dirty > 0) { - cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters); - ocw.ocw_rc = 0; - - loi_list_maint(cli, loi); - osc_check_rpcs(env, cli); - client_obd_list_unlock(&cli->cl_loi_list_lock); - - CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n", - cli->cl_import->imp_obd->obd_name, &ocw, oap); - - rc = l_wait_event(ocw.ocw_waitq, cfs_list_empty(&ocw.ocw_entry), &lwi); - - client_obd_list_lock(&cli->cl_loi_list_lock); - cfs_list_del_init(&ocw.ocw_entry); - if (rc < 0) - break; - - rc = ocw.ocw_rc; - if (rc != -EDQUOT) - break; - } - - RETURN(rc); -} - - -int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, - struct lov_oinfo *loi, cfs_page_t *page, - obd_off offset, const struct obd_async_page_ops *ops, - void *data, void **res, int nocache, - struct lustre_handle *lockh) -{ - struct osc_async_page *oap; - - ENTRY; - - if (!page) - return cfs_size_round(sizeof(*oap)); - - oap = *res; - oap->oap_magic = OAP_MAGIC; - oap->oap_cli = &exp->exp_obd->u.cli; - oap->oap_loi = loi; - - oap->oap_caller_ops = ops; - oap->oap_caller_data = data; - - oap->oap_page = page; - oap->oap_obj_off = offset; - if (!client_is_remote(exp) && - cfs_capable(CFS_CAP_SYS_RESOURCE)) - oap->oap_brw_flags = OBD_BRW_NOQUOTA; - - LASSERT(!(offset & ~CFS_PAGE_MASK)); - - CFS_INIT_LIST_HEAD(&oap->oap_pending_item); - CFS_INIT_LIST_HEAD(&oap->oap_urgent_item); - CFS_INIT_LIST_HEAD(&oap->oap_rpc_item); - CFS_INIT_LIST_HEAD(&oap->oap_page_list); - - cfs_spin_lock_init(&oap->oap_lock); - CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset); - RETURN(0); -} - -int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp, - struct lov_stripe_md *lsm, struct lov_oinfo *loi, - struct osc_async_page *oap, int cmd, int off, - int count, obd_flag brw_flags, enum async_flags async_flags) -{ - struct client_obd *cli = &exp->exp_obd->u.cli; - int rc = 0; - ENTRY; - - if (oap->oap_magic != OAP_MAGIC) - RETURN(-EINVAL); - - if (cli->cl_import == NULL || cli->cl_import->imp_invalid) - RETURN(-EIO); - - if (!cfs_list_empty(&oap->oap_pending_item) || - !cfs_list_empty(&oap->oap_urgent_item) || - !cfs_list_empty(&oap->oap_rpc_item)) - RETURN(-EBUSY); - - /* check if the file's owner/group is over quota */ - if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) { - struct cl_object *obj; - struct cl_attr attr; /* XXX put attr into thread info */ - unsigned int qid[MAXQUOTAS]; - - obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj); - - cl_object_attr_lock(obj); - rc = cl_object_attr_get(env, obj, &attr); - cl_object_attr_unlock(obj); - - qid[USRQUOTA] = attr.cat_uid; - qid[GRPQUOTA] = attr.cat_gid; - if (rc == 0 && - osc_quota_chkdq(cli, qid) == NO_QUOTA) - rc = -EDQUOT; - if (rc) - RETURN(rc); - } - - if (loi == NULL) - loi = lsm->lsm_oinfo[0]; - - client_obd_list_lock(&cli->cl_loi_list_lock); - - LASSERT(off + count <= CFS_PAGE_SIZE); - oap->oap_cmd = cmd; - oap->oap_page_off = off; - oap->oap_count = count; - oap->oap_brw_flags = brw_flags; - /* Give a hint to OST that requests are coming from kswapd - bug19529 */ - if (cfs_memory_pressure_get()) - oap->oap_brw_flags |= OBD_BRW_MEMALLOC; - cfs_spin_lock(&oap->oap_lock); - oap->oap_async_flags = async_flags; - cfs_spin_unlock(&oap->oap_lock); - - if (cmd & OBD_BRW_WRITE) { - rc = osc_enter_cache(env, cli, loi, oap); - if (rc) { - client_obd_list_unlock(&cli->cl_loi_list_lock); - RETURN(rc); - } - } - - LOI_DEBUG(loi, "oap %p page %p added for cmd 
%d\n", oap, oap->oap_page, - cmd); - - osc_oap_to_pending(oap); - loi_list_maint(cli, loi); - if (!osc_max_rpc_in_flight(cli, loi) && - lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) { - LASSERT(cli->cl_writeback_work != NULL); - rc = ptlrpcd_queue_work(cli->cl_writeback_work); - - CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n", - cli, rc); - } - client_obd_list_unlock(&cli->cl_loi_list_lock); - - RETURN(0); -} - -/* aka (~was & now & flag), but this is more clear :) */ -#define SETTING(was, now, flag) (!(was & flag) && (now & flag)) - -int osc_set_async_flags_base(struct client_obd *cli, - struct lov_oinfo *loi, struct osc_async_page *oap, - obd_flag async_flags) -{ - struct loi_oap_pages *lop; - int flags = 0; - ENTRY; - - LASSERT(!cfs_list_empty(&oap->oap_pending_item)); - - if (oap->oap_cmd & OBD_BRW_WRITE) { - lop = &loi->loi_write_lop; - } else { - lop = &loi->loi_read_lop; - } - - if ((oap->oap_async_flags & async_flags) == async_flags) - RETURN(0); - - if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY)) - flags |= ASYNC_READY; - - if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) && - cfs_list_empty(&oap->oap_rpc_item)) { - if (oap->oap_async_flags & ASYNC_HP) - cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent); - else - cfs_list_add_tail(&oap->oap_urgent_item, - &lop->lop_urgent); - flags |= ASYNC_URGENT; - loi_list_maint(cli, loi); - } - cfs_spin_lock(&oap->oap_lock); - oap->oap_async_flags |= flags; - cfs_spin_unlock(&oap->oap_lock); - - LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page, - oap->oap_async_flags); - RETURN(0); -} - -int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, - struct lov_oinfo *loi, struct osc_async_page *oap) -{ - struct client_obd *cli = &exp->exp_obd->u.cli; - struct loi_oap_pages *lop; - int rc = 0; - ENTRY; - - if (oap->oap_magic != OAP_MAGIC) - RETURN(-EINVAL); - - if (loi == NULL) - loi = lsm->lsm_oinfo[0]; - - if (oap->oap_cmd & OBD_BRW_WRITE) { - lop = &loi->loi_write_lop; - } else { - lop = &loi->loi_read_lop; - } - - client_obd_list_lock(&cli->cl_loi_list_lock); - - if (!cfs_list_empty(&oap->oap_rpc_item)) - GOTO(out, rc = -EBUSY); - - osc_exit_cache(cli, oap, 0); - osc_wake_cache_waiters(cli); - - if (!cfs_list_empty(&oap->oap_urgent_item)) { - cfs_list_del_init(&oap->oap_urgent_item); - cfs_spin_lock(&oap->oap_lock); - oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP); - cfs_spin_unlock(&oap->oap_lock); - } - if (!cfs_list_empty(&oap->oap_pending_item)) { - cfs_list_del_init(&oap->oap_pending_item); - lop_update_pending(cli, lop, oap->oap_cmd, -1); - } - loi_list_maint(cli, loi); - LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page); -out: - client_obd_list_unlock(&cli->cl_loi_list_lock); - RETURN(rc); + osc_ap_completion(env, cli, NULL, oap, 0, rc); + } + if (clerq && !IS_ERR(clerq)) + cl_req_completion(env, clerq, rc); + } else { + struct osc_async_page *tmp = NULL; + + /* queued sync pages can be torn down while the pages + * were between the pending list and the rpc */ + LASSERT(aa != NULL); + client_obd_list_lock(&cli->cl_loi_list_lock); + cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) { + /* only one oap gets a request reference */ + if (tmp == NULL) + tmp = oap; + if (oap->oap_interrupted && !req->rq_intr) { + CDEBUG(D_INODE, "oap %p in req %p interrupted\n", + oap, req); + ptlrpc_mark_interrupted(req); + } + } + if (tmp != NULL) + tmp->oap_request = ptlrpc_request_addref(req); + + DEBUG_REQ(D_INODE,req, "%d pages, aa 
%p. now %dr/%dw in flight", + page_count, aa, cli->cl_r_in_flight, + cli->cl_w_in_flight); + + /* XXX: Maybe the caller can check the RPC bulk descriptor to + * see which CPU/NUMA node the majority of pages were allocated + * on, and try to assign the async RPC to the CPU core + * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic. + * + * But on the other hand, we expect that multiple ptlrpcd + * threads and the initial write sponsor can run in parallel, + * especially when data checksum is enabled, which is CPU-bound + * operation and single ptlrpcd thread cannot process in time. + * So more ptlrpcd threads sharing BRW load + * (with PDL_POLICY_ROUND) seems better. + */ + ptlrpcd_add_req(req, pol, -1); + } + RETURN(rc); } static int osc_set_lock_data_with_check(struct ldlm_lock *lock, @@ -4424,7 +3361,7 @@ static int osc_import_event(struct obd_device *obd, client_obd_list_lock(&cli->cl_loi_list_lock); /* all pages go to failing rpcs due to the invalid * import */ - osc_check_rpcs(env, cli); + osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND); client_obd_list_unlock(&cli->cl_loi_list_lock); ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); @@ -4500,6 +3437,18 @@ static int osc_cancel_for_recovery(struct ldlm_lock *lock) RETURN(0); } +static int brw_queue_work(const struct lu_env *env, void *data) +{ + struct client_obd *cli = data; + + CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli); + + client_obd_list_lock(&cli->cl_loi_list_lock); + osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME); + client_obd_list_unlock(&cli->cl_loi_list_lock); + RETURN(0); +} + int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) { struct client_obd *cli = &obd->u.cli; -- 1.8.3.1
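The sketches that follow model, in freestanding C, parts of the cache and RPC machinery that this patch removes from osc_request.c and osc_page.c and rebuilds around osc_object in osc_cache.c. They are simplified illustrations under stated assumptions, not the Lustre implementation: any identifier that does not appear in the diff above (cache_state, MODEL_PAGE_SIZE, and so on) is invented for the example.

The first sketch covers the write grant accounting behind the removed osc_consume_write_grant()/osc_release_write_grant() pair and the wake-up decision in osc_wake_cache_waiters(): each cached dirty page consumes one page of server grant and of the client dirty budget; a page dropped before it is sent (for instance by truncate) gives the grant back or records it as lost; and after a write RPC completes, each blocked writer is either handed grant, left waiting while in-flight writes may still return grant, or told to fall back to synchronous I/O. The page size and the reclaim threshold below are arbitrary stand-ins for the real tunables.

#include <stdio.h>
#include <stdbool.h>

#define MODEL_PAGE_SIZE 4096L

/* invented stand-in for the per-client dirty/grant counters that
 * osc_consume_write_grant()/osc_release_write_grant() manipulate */
struct cache_state {
        long dirty;          /* bytes of cached dirty data            */
        long dirty_max;      /* per-client dirty limit                */
        long avail_grant;    /* space the server has promised us      */
        long lost_grant;     /* grant consumed but never written back */
};

/* consume one page of grant and dirty budget for a cached write;
 * folds the osc_enter_cache_try() availability check into one step */
static bool consume_write_grant(struct cache_state *cs)
{
        if (cs->avail_grant < MODEL_PAGE_SIZE ||
            cs->dirty + MODEL_PAGE_SIZE > cs->dirty_max)
                return false;            /* caller must wait or go sync */
        cs->avail_grant -= MODEL_PAGE_SIZE;
        cs->dirty       += MODEL_PAGE_SIZE;
        return true;
}

/* give the page's grant back when it leaves the cache; if it was
 * dropped without being sent (truncate), reclaim the grant locally
 * or account it as lost so the server can be told later */
static void release_write_grant(struct cache_state *cs, bool sent)
{
        cs->dirty -= MODEL_PAGE_SIZE;
        if (!sent) {
                if (cs->avail_grant < 4 * MODEL_PAGE_SIZE)
                        cs->avail_grant += MODEL_PAGE_SIZE;
                else
                        cs->lost_grant  += MODEL_PAGE_SIZE;
        }
}

/* outcome for one queued cache waiter, mirroring ocw_rc in the patch */
enum waiter_action {
        WAITER_KEEP_WAITING,  /* leave it queued                      */
        WAITER_GRANTED,       /* grant consumed on its behalf, wake   */
        WAITER_GO_SYNC,       /* wake with -EDQUOT: fall back to sync */
};

/* the decision osc_wake_cache_waiters() makes for each queued waiter
 * once a write RPC has completed */
static enum waiter_action
wake_one_waiter(struct cache_state *cs, int writes_in_flight)
{
        if (cs->dirty + MODEL_PAGE_SIZE > cs->dirty_max)
                return WAITER_KEEP_WAITING;   /* no dirty room at all   */
        if (cs->avail_grant < MODEL_PAGE_SIZE && writes_in_flight > 0)
                return WAITER_KEEP_WAITING;   /* grant may still return */
        if (cs->avail_grant < MODEL_PAGE_SIZE)
                return WAITER_GO_SYNC;        /* nothing will return it */
        consume_write_grant(cs);
        return WAITER_GRANTED;
}

int main(void)
{
        struct cache_state cs = { 0, 8 * MODEL_PAGE_SIZE,
                                  2 * MODEL_PAGE_SIZE, 0 };
        enum waiter_action act;

        consume_write_grant(&cs);            /* a page becomes dirty     */
        release_write_grant(&cs, false);     /* truncated before sending */
        act = wake_one_waiter(&cs, 0);       /* a blocked writer retries */
        printf("dirty=%ld avail=%ld lost=%ld waiter=%d\n",
               cs.dirty, cs.avail_grant, cs.lost_grant, (int)act);
        return 0;
}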
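The removed lop_makes_rpc() decides when a pending-page list is worth turning into an RPC: always when the import is invalid (to drain the queue through failing RPCs), when an urgent page is queued, when writers are blocked on the dirty budget, or when a full RPC's worth of pages has accumulated. A minimal restatement of that predicate, with an invented pending_state summary struct and an arbitrary page budget:

#include <stdbool.h>
#include <stdio.h>

#define MODEL_MAX_PAGES_PER_RPC 256

/* invented summary of one pending-page list plus client state */
struct pending_state {
        int  num_pending;        /* pages queued for this object/cmd  */
        bool has_urgent;         /* an urgent page is queued          */
        bool import_invalid;     /* connection being torn down        */
        bool cache_waiters;      /* writers blocked waiting for room  */
        bool is_write;
};

/* mirrors the removed lop_makes_rpc(): is an RPC worth building now? */
static bool makes_rpc(const struct pending_state *ps)
{
        if (ps->num_pending == 0)
                return false;
        if (ps->import_invalid)
                return true;     /* drain everything via failing RPCs  */
        if (ps->has_urgent)
                return true;     /* e.g. lock cancel or memory pressure */
        if (ps->is_write && ps->cache_waiters)
                return true;     /* free dirty budget for the waiters  */
        return ps->num_pending >= MODEL_MAX_PAGES_PER_RPC;
}

int main(void)
{
        struct pending_state few    = { 3, false, false, false, true };
        struct pending_state urgent = { 3, true,  false, false, true };

        printf("%d %d\n", makes_rpc(&few), makes_rpc(&urgent));
        return 0;
}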
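loi_list_maint() keeps an object on exactly those per-client lists whose condition currently holds, using the small on_list() helper: link the item if it should be on the list and is not, unlink it if it should not be and is. The same idiom, shown with a hand-rolled intrusive circular list; the list_* names are local to the example, not the cfs_list_t API:

#include <stdio.h>
#include <stdbool.h>

/* minimal intrusive, circular, doubly linked list */
struct node { struct node *prev, *next; };

static void list_init(struct node *n)        { n->prev = n->next = n; }
static bool list_empty(const struct node *n) { return n->next == n; }

static void list_add_tail(struct node *item, struct node *head)
{
        item->prev = head->prev;
        item->next = head;
        head->prev->next = item;
        head->prev = item;
}

static void list_del_init(struct node *item)
{
        item->prev->next = item->next;
        item->next->prev = item->prev;
        list_init(item);
}

/* the on_list() idiom: list membership simply follows the predicate */
static void on_list(struct node *item, struct node *list, bool should_be_on)
{
        if (list_empty(item) && should_be_on)
                list_add_tail(item, list);
        else if (!list_empty(item) && !should_be_on)
                list_del_init(item);
}

int main(void)
{
        struct node ready_list, obj_ready_item;
        int pending_writes = 3;

        list_init(&ready_list);
        list_init(&obj_ready_item);

        on_list(&obj_ready_item, &ready_list, pending_writes > 0);
        printf("on ready list: %d\n", !list_empty(&obj_ready_item));

        pending_writes = 0;
        on_list(&obj_ready_item, &ready_list, pending_writes > 0);
        printf("on ready list: %d\n", !list_empty(&obj_ready_item));
        return 0;
}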
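The removed osc_next_loi() encodes the service order for ready objects: objects with blocked (high-priority) locks first, then objects with full or urgent queues, then any object with queued writes while cache waiters exist, and finally everything once the import has become invalid. A schematic of just that priority chain, with the list heads reduced to non-empty flags:

#include <stdbool.h>
#include <stdio.h>

/* which per-client list to service next, following the removed
 * osc_next_loi() priority order */
enum pick { PICK_NONE, PICK_HP_READY, PICK_READY, PICK_WRITE, PICK_READ };

struct client_lists {            /* "non-empty" flags for each list */
        bool hp_ready, ready, write, read;
        bool cache_waiters, import_invalid;
};

static enum pick next_object(const struct client_lists *c)
{
        if (c->hp_ready)
                return PICK_HP_READY;    /* blocked locks flush first    */
        if (c->ready)
                return PICK_READY;       /* full or urgent queues next   */
        if (c->cache_waiters && c->write)
                return PICK_WRITE;       /* free dirty budget            */
        if (c->import_invalid) {         /* drain everything on teardown */
                if (c->write)
                        return PICK_WRITE;
                if (c->read)
                        return PICK_READ;
        }
        return PICK_NONE;
}

int main(void)
{
        struct client_lists c = { false, false, true, false, true, false };

        printf("%d\n", (int)next_object(&c));   /* 3 == PICK_WRITE */
        return 0;
}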
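Inside the removed osc_send_oap_rpc() loop, an RPC stops growing when the per-RPC page budget is reached, when the transfer ends exactly on a PTLRPC_MAX_BRW_SIZE boundary (so later reads line up with the extents the server allocated), or when a page leaves a trailing gap that would fragment the bulk descriptor. A sketch of that cut-off test only; the gap-at-start handling is omitted and the constants are simplified stand-ins:

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define MODEL_PAGE_SIZE    4096u
#define MODEL_MAX_BRW_SIZE (1u << 20)   /* pretend 1 MiB RPCs          */
#define MODEL_MAX_PAGES    256u         /* pretend per-RPC page limit  */

/* one candidate page: page-aligned offset in the object plus the
 * byte range actually dirty within the page */
struct candidate {
        uint64_t obj_off;    /* offset of the page in the object  */
        unsigned page_off;   /* first dirty byte within the page  */
        unsigned count;      /* number of dirty bytes             */
};

/* should the RPC be closed after taking this page? */
static bool rpc_full_after(const struct candidate *c, unsigned pages_so_far)
{
        uint64_t end = c->obj_off + c->page_off + c->count;

        if (pages_so_far + 1 >= MODEL_MAX_PAGES)
                return true;                    /* page budget reached  */
        if ((end & (MODEL_MAX_BRW_SIZE - 1)) == 0)
                return true;                    /* ends on a BRW bound  */
        if (c->page_off + c->count < MODEL_PAGE_SIZE)
                return true;                    /* trailing gap: stop   */
        return false;
}

int main(void)
{
        struct candidate full = { 0, 0, MODEL_PAGE_SIZE };
        struct candidate tail = { MODEL_PAGE_SIZE, 0, 100 };

        printf("%d %d\n", rpc_full_after(&full, 0),
                          rpc_full_after(&tail, 1));
        return 0;
}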
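The removed osc_refresh_count() callback recomputes how many bytes of a page to write by comparing the page against the known minimum size (kms): a page wholly past kms transfers nothing (it raced with truncate), the page straddling kms transfers only the bytes below it, and earlier pages transfer a full page. The same arithmetic, assuming a 4096-byte page:

#include <stdio.h>
#include <stdint.h>

#define MODEL_PAGE_SIZE 4096u

/* bytes of page 'index' to include in a write when the file's known
 * minimum size is 'kms' (mirrors the removed osc_refresh_count()) */
static unsigned write_count_for_page(uint64_t index, uint64_t kms)
{
        uint64_t page_start = index * MODEL_PAGE_SIZE;

        if (page_start >= kms)
                return 0;                                 /* raced with truncate */
        if (page_start + MODEL_PAGE_SIZE > kms)
                return (unsigned)(kms % MODEL_PAGE_SIZE); /* tail page           */
        return MODEL_PAGE_SIZE;                           /* fully below kms     */
}

int main(void)
{
        uint64_t kms = 10000;    /* file size as known to the client */

        printf("%u %u %u\n",
               write_count_for_page(0, kms),    /* 4096                */
               write_count_for_page(2, kms),    /* 10000 - 8192 = 1808 */
               write_count_for_page(5, kms));   /* 0                   */
        return 0;
}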
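osc_process_ar(), whose callers move into osc_cache.c, is how async writeback errors reach a later fsync(): the first error is recorded, subsequent writes are forced synchronous, and sync mode is lifted only once an RPC whose xid is at or past the recorded cut-off completes cleanly. A small self-contained model with xids reduced to a counter:

#include <stdio.h>
#include <stdint.h>
#include <stdbool.h>

/* invented stand-in for struct osc_async_rc */
struct async_rc {
        int      rc;          /* first recorded writeback error       */
        bool     force_sync;  /* future writes must be synchronous    */
        uint64_t min_xid;     /* first xid that may clear force_sync  */
};

static uint64_t next_xid = 1;
static uint64_t sample_next_xid(void) { return next_xid; }

/* mirrors the osc_process_ar() logic kept by the patch */
static void process_ar(struct async_rc *ar, uint64_t xid, int rc)
{
        if (rc) {
                if (!ar->rc)
                        ar->rc = rc;          /* remember first error  */
                ar->force_sync = true;
                ar->min_xid = sample_next_xid();
                return;
        }
        /* a clean completion at or past the cut-off lifts sync mode */
        if (ar->force_sync && xid >= ar->min_xid)
                ar->force_sync = false;
}

int main(void)
{
        struct async_rc ar = { 0, false, 0 };

        process_ar(&ar, next_xid++, -5);      /* async write failed    */
        process_ar(&ar, next_xid++,  0);      /* later write succeeds  */
        printf("saved rc=%d force_sync=%d\n", ar.rc, ar.force_sync);
        return 0;
}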