#define MAX_OBD_DEVICES 8192
-/* this is really local to the OSC */
-struct loi_oap_pages {
- cfs_list_t lop_pending;
- cfs_list_t lop_urgent;
- cfs_list_t lop_pending_group;
- int lop_num_pending;
-};
-
struct osc_async_rc {
int ar_rc;
int ar_force_sync;
int loi_ost_idx; /* OST stripe index in lov_tgt_desc->tgts */
int loi_ost_gen; /* generation of this loi_ost_idx */
- /* used by the osc to keep track of what objects to build into rpcs */
- struct loi_oap_pages loi_read_lop;
- struct loi_oap_pages loi_write_lop;
- cfs_list_t loi_ready_item;
- cfs_list_t loi_hp_ready_item;
- cfs_list_t loi_write_item;
- cfs_list_t loi_read_item;
-
unsigned long loi_kms_valid:1;
__u64 loi_kms; /* known minimum size */
struct ost_lvb loi_lvb;
static inline void loi_init(struct lov_oinfo *loi)
{
- CFS_INIT_LIST_HEAD(&loi->loi_read_lop.lop_pending);
- CFS_INIT_LIST_HEAD(&loi->loi_read_lop.lop_urgent);
- CFS_INIT_LIST_HEAD(&loi->loi_read_lop.lop_pending_group);
- CFS_INIT_LIST_HEAD(&loi->loi_write_lop.lop_pending);
- CFS_INIT_LIST_HEAD(&loi->loi_write_lop.lop_urgent);
- CFS_INIT_LIST_HEAD(&loi->loi_write_lop.lop_pending_group);
- CFS_INIT_LIST_HEAD(&loi->loi_ready_item);
- CFS_INIT_LIST_HEAD(&loi->loi_hp_ready_item);
- CFS_INIT_LIST_HEAD(&loi->loi_write_item);
- CFS_INIT_LIST_HEAD(&loi->loi_read_item);
}
struct lov_stripe_md {
* Exact type of ->cl_loi_list_lock is defined in arch/obd.h together
* with client_obd_list_{un,}lock() and
* client_obd_list_lock_{init,done}() functions.
+ *
+ * NB by Jinshan: though field names are still _loi_, but actually
+ * osc_object{}s are in the list.
*/
client_obd_lock_t cl_loi_list_lock;
cfs_list_t cl_loi_ready_list;
MODULES := osc
-osc-objs := osc_request.o lproc_osc.o osc_create.o osc_dev.o osc_object.o osc_page.o osc_lock.o osc_io.o osc_quota.o
+osc-objs := osc_request.o lproc_osc.o osc_create.o osc_dev.o osc_object.o osc_page.o osc_lock.o osc_io.o osc_quota.o osc_cache.o
EXTRA_DIST = $(osc-objs:%.o=%.c) osc_internal.h osc_cl_internal.h
if LIBLUSTRE
noinst_LIBRARIES = libosc.a
-libosc_a_SOURCES = osc_request.c osc_create.c osc_internal.h osc_cl_internal.h osc_dev.c osc_object.c osc_page.c osc_lock.c osc_io.c osc_quota.c
+libosc_a_SOURCES = osc_request.c osc_create.c osc_internal.h osc_cl_internal.h osc_dev.c osc_object.c osc_page.c osc_lock.c osc_io.c osc_quota.c osc_cache.c
libosc_a_CPPFLAGS = $(LLCPPFLAGS)
libosc_a_CFLAGS = $(LLCFLAGS)
osc_lock.c \
osc_io.c \
osc_request.c \
- osc_quota.c
+ osc_quota.c \
+ osc_cache.c
osc_CFLAGS := $(EXTRA_KCFLAGS)
osc_LDFLAGS := $(EXTRA_KLDFLAGS)
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
+ *
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * osc cache management.
+ *
+ * Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_OSC
+
+#include "osc_cl_internal.h"
+#include "osc_internal.h"
+
+static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
+ struct osc_async_page *oap);
+static int osc_enter_cache_try(const struct lu_env *env, struct client_obd *cli,
+ struct osc_async_page *oap, int transient);
+static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
+ int sent);
+
+/** \addtogroup osc
+ * @{
+ */
+
+#define OSC_IO_DEBUG(OSC, STR, args...) \
+ CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
+ !cfs_list_empty(&(OSC)->oo_ready_item) || \
+ !cfs_list_empty(&(OSC)->oo_hp_ready_item), \
+ (OSC)->oo_write_pages.oop_num_pending, \
+ !cfs_list_empty(&(OSC)->oo_write_pages.oop_urgent), \
+ (OSC)->oo_read_pages.oop_num_pending, \
+ !cfs_list_empty(&(OSC)->oo_read_pages.oop_urgent), \
+ args)
+
+static inline struct osc_page *oap2osc_page(struct osc_async_page *oap)
+{
+ return (struct osc_page *)container_of(oap, struct osc_page, ops_oap);
+}
+
+static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
+ int cmd)
+{
+ struct osc_page *opg = oap2osc_page(oap);
+ struct cl_page *page = cl_page_top(opg->ops_cl.cpl_page);
+ int result;
+
+ LASSERT(cmd == OBD_BRW_WRITE); /* no cached reads */
+
+ ENTRY;
+ result = cl_page_make_ready(env, page, CRT_WRITE);
+ if (result == 0)
+ opg->ops_submit_time = cfs_time_current();
+ RETURN(result);
+}
+
+static int osc_refresh_count(const struct lu_env *env,
+ struct osc_async_page *oap, int cmd)
+{
+ struct osc_page *opg = oap2osc_page(oap);
+ struct cl_page *page;
+ struct cl_object *obj;
+ struct cl_attr *attr = &osc_env_info(env)->oti_attr;
+
+ int result;
+ loff_t kms;
+
+ /* readpage queues with _COUNT_STABLE, shouldn't get here. */
+ LASSERT(!(cmd & OBD_BRW_READ));
+ LASSERT(opg != NULL);
+ page = opg->ops_cl.cpl_page;
+ obj = opg->ops_cl.cpl_obj;
+
+ cl_object_attr_lock(obj);
+ result = cl_object_attr_get(env, obj, attr);
+ cl_object_attr_unlock(obj);
+ if (result < 0)
+ return result;
+ kms = attr->cat_kms;
+ if (cl_offset(obj, page->cp_index) >= kms)
+ /* catch race with truncate */
+ return 0;
+ else if (cl_offset(obj, page->cp_index + 1) > kms)
+ /* catch sub-page write at end of file */
+ return kms % CFS_PAGE_SIZE;
+ else
+ return CFS_PAGE_SIZE;
+}
+
+static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
+ int cmd, struct obdo *oa, int rc)
+{
+ struct osc_page *opg = oap2osc_page(oap);
+ struct cl_page *page = cl_page_top(opg->ops_cl.cpl_page);
+ struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
+ enum cl_req_type crt;
+ int srvlock;
+
+ ENTRY;
+
+ cmd &= ~OBD_BRW_NOQUOTA;
+ LASSERT(equi(page->cp_state == CPS_PAGEIN, cmd == OBD_BRW_READ));
+ LASSERT(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE));
+ LASSERT(opg->ops_transfer_pinned);
+
+ /*
+ * page->cp_req can be NULL if io submission failed before
+ * cl_req was allocated.
+ */
+ if (page->cp_req != NULL)
+ cl_req_page_done(env, page);
+ LASSERT(page->cp_req == NULL);
+
+ /* As the transfer for this page is being done, clear the flags */
+ cfs_spin_lock(&oap->oap_lock);
+ oap->oap_async_flags = 0;
+ cfs_spin_unlock(&oap->oap_lock);
+
+ crt = cmd == OBD_BRW_READ ? CRT_READ : CRT_WRITE;
+ /* Clear opg->ops_transfer_pinned before VM lock is released. */
+ opg->ops_transfer_pinned = 0;
+
+ cfs_spin_lock(&obj->oo_seatbelt);
+ LASSERT(opg->ops_submitter != NULL);
+ LASSERT(!cfs_list_empty(&opg->ops_inflight));
+ cfs_list_del_init(&opg->ops_inflight);
+ cfs_spin_unlock(&obj->oo_seatbelt);
+
+ opg->ops_submit_time = 0;
+ srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;
+
+ cl_page_completion(env, page, crt, rc);
+
+ /* statistic */
+ if (rc == 0 && srvlock) {
+ struct lu_device *ld = opg->ops_cl.cpl_obj->co_lu.lo_dev;
+ struct osc_stats *stats = &lu2osc_dev(ld)->od_stats;
+ int bytes = oap->oap_count;
+
+ if (crt == CRT_READ)
+ stats->os_lockless_reads += bytes;
+ else
+ stats->os_lockless_writes += bytes;
+ }
+
+ /*
+ * This has to be the last operation with the page, as locks are
+ * released in cl_page_completion() and nothing except for the
+ * reference counter protects page from concurrent reclaim.
+ */
+ lu_ref_del(&page->cp_reference, "transfer", page);
+ /*
+ * As page->cp_obj is pinned by a reference from page->cp_req, it is
+ * safe to call cl_page_put() without risking object destruction in a
+ * non-blocking context.
+ */
+ cl_page_put(env, page);
+ RETURN(0);
+}
+
+/* caller must hold loi_list_lock */
+static void osc_consume_write_grant(struct client_obd *cli,
+ struct brw_page *pga)
+{
+ LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
+ LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
+ cfs_atomic_inc(&obd_dirty_pages);
+ cli->cl_dirty += CFS_PAGE_SIZE;
+ cli->cl_avail_grant -= CFS_PAGE_SIZE;
+ pga->flag |= OBD_BRW_FROM_GRANT;
+ CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
+ CFS_PAGE_SIZE, pga, pga->pg);
+ LASSERT(cli->cl_avail_grant >= 0);
+ osc_update_next_shrink(cli);
+}
+
+/* the companion to osc_consume_write_grant, called when a brw has completed.
+ * must be called with the loi lock held. */
+static void osc_release_write_grant(struct client_obd *cli,
+ struct brw_page *pga, int sent)
+{
+ int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
+ ENTRY;
+
+ LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
+ if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
+ EXIT;
+ return;
+ }
+
+ pga->flag &= ~OBD_BRW_FROM_GRANT;
+ cfs_atomic_dec(&obd_dirty_pages);
+ cli->cl_dirty -= CFS_PAGE_SIZE;
+ if (pga->flag & OBD_BRW_NOCACHE) {
+ pga->flag &= ~OBD_BRW_NOCACHE;
+ cfs_atomic_dec(&obd_dirty_transit_pages);
+ cli->cl_dirty_transit -= CFS_PAGE_SIZE;
+ }
+ if (!sent) {
+ /* Reclaim grant from truncated pages. This is used to solve
+ * write-truncate and grant all gone(to lost_grant) problem.
+ * For a vfs write this problem can be easily solved by a sync
+ * write, however, this is not an option for page_mkwrite()
+ * because grant has to be allocated before a page becomes
+ * dirty. */
+ if (cli->cl_avail_grant < PTLRPC_MAX_BRW_SIZE)
+ cli->cl_avail_grant += CFS_PAGE_SIZE;
+ else
+ cli->cl_lost_grant += CFS_PAGE_SIZE;
+ CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
+ cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
+ } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
+ /* For short writes we shouldn't count parts of pages that
+ * span a whole block on the OST side, or our accounting goes
+ * wrong. Should match the code in filter_grant_check. */
+ int offset = pga->off & ~CFS_PAGE_MASK;
+ int count = pga->count + (offset & (blocksize - 1));
+ int end = (offset + pga->count) & (blocksize - 1);
+ if (end)
+ count += blocksize - end;
+
+ cli->cl_lost_grant += CFS_PAGE_SIZE - count;
+ CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
+ CFS_PAGE_SIZE - count, cli->cl_lost_grant,
+ cli->cl_avail_grant, cli->cl_dirty);
+ }
+
+ EXIT;
+}
+
+/* The companion to osc_enter_cache(), called when @oap is no longer part of
+ * the dirty accounting. Writeback completes or truncate happens before
+ * writing starts. Must be called with the loi lock held. */
+static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
+ int sent)
+{
+ osc_release_write_grant(cli, &oap->oap_brw_page, sent);
+}
+
+/**
+ * Non-blocking version of osc_enter_cache() that consumes grant only when it
+ * is available.
+ */
+static int osc_enter_cache_try(const struct lu_env *env, struct client_obd *cli,
+ struct osc_async_page *oap, int transient)
+{
+ int has_grant;
+
+ has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
+ if (has_grant) {
+ osc_consume_write_grant(cli, &oap->oap_brw_page);
+ if (transient) {
+ cli->cl_dirty_transit += CFS_PAGE_SIZE;
+ cfs_atomic_inc(&obd_dirty_transit_pages);
+ oap->oap_brw_flags |= OBD_BRW_NOCACHE;
+ }
+ }
+ return has_grant;
+}
+
+/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
+ * grant or cache space. */
+static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
+ struct osc_async_page *oap)
+{
+ struct osc_object *osc = oap->oap_obj;
+ struct lov_oinfo *loi = osc->oo_oinfo;
+ struct osc_cache_waiter ocw;
+ struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+ int rc = -EDQUOT;
+ ENTRY;
+
+ CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
+ "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
+ cli->cl_dirty_max, obd_max_dirty_pages,
+ cli->cl_lost_grant, cli->cl_avail_grant);
+
+ /* force the caller to try sync io. this can jump the list
+ * of queued writes and create a discontiguous rpc stream */
+ if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
+ cli->cl_dirty_max < CFS_PAGE_SIZE ||
+ cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
+ RETURN(-EDQUOT);
+
+ /* Hopefully normal case - cache space and write credits available */
+ if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
+ cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
+ osc_enter_cache_try(env, cli, oap, 0))
+ RETURN(0);
+
+ /* We can get here for two reasons: too many dirty pages in cache, or
+ * run out of grants. In both cases we should write dirty pages out.
+ * Adding a cache waiter will trigger urgent write-out no matter what
+ * RPC size will be.
+ * The exiting condition is no avail grants and no dirty pages caching,
+ * that really means there is no space on the OST. */
+ cfs_waitq_init(&ocw.ocw_waitq);
+ ocw.ocw_oap = oap;
+ while (cli->cl_dirty > 0) {
+ cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
+ ocw.ocw_rc = 0;
+
+ osc_io_unplug(env, cli, osc, PDL_POLICY_ROUND);
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+ CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
+ cli->cl_import->imp_obd->obd_name, &ocw, oap);
+
+ rc = l_wait_event(ocw.ocw_waitq, cfs_list_empty(&ocw.ocw_entry),
+ &lwi);
+
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ cfs_list_del_init(&ocw.ocw_entry);
+ if (rc < 0)
+ break;
+
+ rc = ocw.ocw_rc;
+ if (rc != -EDQUOT)
+ break;
+ }
+
+ RETURN(rc);
+}
+
+/* caller must hold loi_list_lock */
+void osc_wake_cache_waiters(struct client_obd *cli)
+{
+ cfs_list_t *l, *tmp;
+ struct osc_cache_waiter *ocw;
+
+ ENTRY;
+ cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
+ /* if we can't dirty more, we must wait until some is written */
+ if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
+ (cfs_atomic_read(&obd_dirty_pages) + 1 >
+ obd_max_dirty_pages)) {
+ CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
+ "osc max %ld, sys max %d\n", cli->cl_dirty,
+ cli->cl_dirty_max, obd_max_dirty_pages);
+ return;
+ }
+
+ /* if still dirty cache but no grant wait for pending RPCs that
+ * may yet return us some grant before doing sync writes */
+ if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
+ CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
+ cli->cl_w_in_flight);
+ return;
+ }
+
+ ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
+ cfs_list_del_init(&ocw->ocw_entry);
+ if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
+ /* no more RPCs in flight to return grant, do sync IO */
+ ocw->ocw_rc = -EDQUOT;
+ CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
+ } else {
+ osc_consume_write_grant(cli,
+ &ocw->ocw_oap->oap_brw_page);
+ }
+
+ CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld\n",
+ ocw, ocw->ocw_oap, cli->cl_avail_grant);
+
+ cfs_waitq_signal(&ocw->ocw_waitq);
+ }
+
+ EXIT;
+}
+
+static int osc_max_rpc_in_flight(struct client_obd *cli, struct osc_object *osc)
+{
+ struct osc_async_page *oap;
+ int hprpc = 0;
+
+ if (!cfs_list_empty(&osc->oo_write_pages.oop_urgent)) {
+ oap = cfs_list_entry(osc->oo_write_pages.oop_urgent.next,
+ struct osc_async_page, oap_urgent_item);
+ hprpc = !!(oap->oap_async_flags & ASYNC_HP);
+ }
+
+ if (!hprpc && !cfs_list_empty(&osc->oo_read_pages.oop_urgent)) {
+ oap = cfs_list_entry(osc->oo_read_pages.oop_urgent.next,
+ struct osc_async_page, oap_urgent_item);
+ hprpc = !!(oap->oap_async_flags & ASYNC_HP);
+ }
+
+ return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
+}
+
+/* This maintains the lists of pending pages to read/write for a given object
+ * (lop). This is used by osc_check_rpcs->osc_next_obj() and osc_list_maint()
+ * to quickly find objects that are ready to send an RPC. */
+static int osc_makes_rpc(struct client_obd *cli, struct osc_object *osc,
+ int cmd)
+{
+ struct osc_oap_pages *lop;
+ ENTRY;
+
+ if (cmd & OBD_BRW_WRITE) {
+ lop = &osc->oo_write_pages;
+ } else {
+ lop = &osc->oo_read_pages;
+ }
+
+ if (lop->oop_num_pending == 0)
+ RETURN(0);
+
+ /* if we have an invalid import we want to drain the queued pages
+ * by forcing them through rpcs that immediately fail and complete
+ * the pages. recovery relies on this to empty the queued pages
+ * before canceling the locks and evicting down the llite pages */
+ if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
+ RETURN(1);
+
+ /* stream rpcs in queue order as long as as there is an urgent page
+ * queued. this is our cheap solution for good batching in the case
+ * where writepage marks some random page in the middle of the file
+ * as urgent because of, say, memory pressure */
+ if (!cfs_list_empty(&lop->oop_urgent)) {
+ CDEBUG(D_CACHE, "urgent request forcing RPC\n");
+ RETURN(1);
+ }
+
+ if (cmd & OBD_BRW_WRITE) {
+ /* trigger a write rpc stream as long as there are dirtiers
+ * waiting for space. as they're waiting, they're not going to
+ * create more pages to coalesce with what's waiting.. */
+ if (!cfs_list_empty(&cli->cl_cache_waiters)) {
+ CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
+ RETURN(1);
+ }
+ }
+ if (lop->oop_num_pending >= cli->cl_max_pages_per_rpc)
+ RETURN(1);
+
+ RETURN(0);
+}
+
+static void lop_update_pending(struct client_obd *cli,
+ struct osc_oap_pages *lop, int cmd, int delta)
+{
+ lop->oop_num_pending += delta;
+ if (cmd & OBD_BRW_WRITE)
+ cli->cl_pending_w_pages += delta;
+ else
+ cli->cl_pending_r_pages += delta;
+}
+
+static int osc_makes_hprpc(struct osc_oap_pages *lop)
+{
+ struct osc_async_page *oap;
+ ENTRY;
+
+ if (cfs_list_empty(&lop->oop_urgent))
+ RETURN(0);
+
+ oap = cfs_list_entry(lop->oop_urgent.next,
+ struct osc_async_page, oap_urgent_item);
+
+ if (oap->oap_async_flags & ASYNC_HP) {
+ CDEBUG(D_CACHE, "hp request forcing RPC\n");
+ RETURN(1);
+ }
+
+ RETURN(0);
+}
+
+static void on_list(cfs_list_t *item, cfs_list_t *list, int should_be_on)
+{
+ if (cfs_list_empty(item) && should_be_on)
+ cfs_list_add_tail(item, list);
+ else if (!cfs_list_empty(item) && !should_be_on)
+ cfs_list_del_init(item);
+}
+
+/* maintain the osc's cli list membership invariants so that osc_send_oap_rpc
+ * can find pages to build into rpcs quickly */
+static void osc_list_maint(struct client_obd *cli, struct osc_object *osc)
+{
+ if (osc_makes_hprpc(&osc->oo_write_pages) ||
+ osc_makes_hprpc(&osc->oo_read_pages)) {
+ /* HP rpc */
+ on_list(&osc->oo_ready_item, &cli->cl_loi_ready_list, 0);
+ on_list(&osc->oo_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
+ } else {
+ on_list(&osc->oo_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
+ on_list(&osc->oo_ready_item, &cli->cl_loi_ready_list,
+ osc_makes_rpc(cli, osc, OBD_BRW_WRITE) ||
+ osc_makes_rpc(cli, osc, OBD_BRW_READ));
+ }
+
+ on_list(&osc->oo_write_item, &cli->cl_loi_write_list,
+ osc->oo_write_pages.oop_num_pending);
+
+ on_list(&osc->oo_read_item, &cli->cl_loi_read_list,
+ osc->oo_read_pages.oop_num_pending);
+}
+
+/* this is trying to propogate async writeback errors back up to the
+ * application. As an async write fails we record the error code for later if
+ * the app does an fsync. As long as errors persist we force future rpcs to be
+ * sync so that the app can get a sync error and break the cycle of queueing
+ * pages for which writeback will fail. */
+static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
+ int rc)
+{
+ if (rc) {
+ if (!ar->ar_rc)
+ ar->ar_rc = rc;
+
+ ar->ar_force_sync = 1;
+ ar->ar_min_xid = ptlrpc_sample_next_xid();
+ return;
+
+ }
+
+ if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
+ ar->ar_force_sync = 0;
+}
+
+static void osc_oap_to_pending(struct osc_async_page *oap)
+{
+ struct osc_object *osc = oap->oap_obj;
+ struct osc_oap_pages *lop;
+
+ if (oap->oap_cmd & OBD_BRW_WRITE)
+ lop = &osc->oo_write_pages;
+ else
+ lop = &osc->oo_read_pages;
+
+ if (oap->oap_async_flags & ASYNC_HP)
+ cfs_list_add(&oap->oap_urgent_item, &lop->oop_urgent);
+ else if (oap->oap_async_flags & ASYNC_URGENT)
+ cfs_list_add_tail(&oap->oap_urgent_item, &lop->oop_urgent);
+ cfs_list_add_tail(&oap->oap_pending_item, &lop->oop_pending);
+ lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
+}
+
+/* this must be called holding the loi list lock to give coverage to exit_cache,
+ * async_flag maintenance, and oap_request */
+void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
+ struct obdo *oa, struct osc_async_page *oap,
+ int sent, int rc)
+{
+ struct osc_object *osc = oap->oap_obj;
+ struct lov_oinfo *loi = osc->oo_oinfo;
+ __u64 xid = 0;
+
+ ENTRY;
+ if (oap->oap_request != NULL) {
+ xid = ptlrpc_req_xid(oap->oap_request);
+ ptlrpc_req_finished(oap->oap_request);
+ oap->oap_request = NULL;
+ }
+
+ cfs_spin_lock(&oap->oap_lock);
+ oap->oap_async_flags = 0;
+ cfs_spin_unlock(&oap->oap_lock);
+ oap->oap_interrupted = 0;
+
+ if (oap->oap_cmd & OBD_BRW_WRITE) {
+ osc_process_ar(&cli->cl_ar, xid, rc);
+ osc_process_ar(&loi->loi_ar, xid, rc);
+ }
+
+ if (rc == 0 && oa != NULL) {
+ if (oa->o_valid & OBD_MD_FLBLOCKS)
+ loi->loi_lvb.lvb_blocks = oa->o_blocks;
+ if (oa->o_valid & OBD_MD_FLMTIME)
+ loi->loi_lvb.lvb_mtime = oa->o_mtime;
+ if (oa->o_valid & OBD_MD_FLATIME)
+ loi->loi_lvb.lvb_atime = oa->o_atime;
+ if (oa->o_valid & OBD_MD_FLCTIME)
+ loi->loi_lvb.lvb_ctime = oa->o_ctime;
+ }
+
+ rc = osc_completion(env, oap, oap->oap_cmd, oa, rc);
+
+ /* cl_page_completion() drops PG_locked. so, a new I/O on the page could
+ * start, but OSC calls it under lock and thus we can add oap back to
+ * pending safely */
+ if (rc)
+ /* upper layer wants to leave the page on pending queue */
+ osc_oap_to_pending(oap);
+ else
+ osc_exit_cache(cli, oap, sent);
+ EXIT;
+}
+
+/**
+ * prepare pages for ASYNC io and put pages in send queue.
+ *
+ * \param cmd OBD_BRW_* macroses
+ * \param lop pending pages
+ *
+ * \return zero if no page added to send queue.
+ * \return 1 if pages successfully added to send queue.
+ * \return negative on errors.
+ */
+static int
+osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
+ struct osc_object *osc, int cmd,
+ struct osc_oap_pages *lop, pdl_policy_t pol)
+{
+ obd_count page_count = 0;
+ struct osc_async_page *oap = NULL, *tmp;
+ CFS_LIST_HEAD(rpc_list);
+ int srvlock = 0, mem_tight = 0;
+ obd_off starting_offset = OBD_OBJECT_EOF;
+ unsigned int ending_offset;
+ int starting_page_off = 0;
+ int rc;
+ ENTRY;
+
+ /* ASYNC_HP pages first. At present, when the lock the pages is
+ * to be canceled, the pages covered by the lock will be sent out
+ * with ASYNC_HP. We have to send out them as soon as possible. */
+ cfs_list_for_each_entry_safe(oap, tmp, &lop->oop_urgent, oap_urgent_item) {
+ if (oap->oap_async_flags & ASYNC_HP)
+ cfs_list_move(&oap->oap_pending_item, &rpc_list);
+ else if (!(oap->oap_brw_flags & OBD_BRW_SYNC))
+ /* only do this for writeback pages. */
+ cfs_list_move_tail(&oap->oap_pending_item, &rpc_list);
+ if (++page_count >= cli->cl_max_pages_per_rpc)
+ break;
+ }
+ cfs_list_splice_init(&rpc_list, &lop->oop_pending);
+ page_count = 0;
+
+ /* first we find the pages we're allowed to work with */
+ cfs_list_for_each_entry_safe(oap, tmp, &lop->oop_pending,
+ oap_pending_item) {
+ LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
+ "magic 0x%x\n", oap, oap->oap_magic);
+
+ if (page_count != 0 &&
+ srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
+ CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
+ " oap %p, page %p, srvlock %u\n",
+ oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
+ break;
+ }
+
+ /* If there is a gap at the start of this page, it can't merge
+ * with any previous page, so we'll hand the network a
+ * "fragmented" page array that it can't transfer in 1 RDMA */
+ if (oap->oap_obj_off < starting_offset) {
+ if (starting_page_off != 0)
+ break;
+
+ starting_page_off = oap->oap_page_off;
+ starting_offset = oap->oap_obj_off + starting_page_off;
+ } else if (oap->oap_page_off != 0)
+ break;
+
+ /* in llite being 'ready' equates to the page being locked
+ * until completion unlocks it. commit_write submits a page
+ * as not ready because its unlock will happen unconditionally
+ * as the call returns. if we race with commit_write giving
+ * us that page we don't want to create a hole in the page
+ * stream, so we stop and leave the rpc to be fired by
+ * another dirtier or kupdated interval (the not ready page
+ * will still be on the dirty list). we could call in
+ * at the end of ll_file_write to process the queue again. */
+ if (!(oap->oap_async_flags & ASYNC_READY)) {
+ int rc = osc_make_ready(env, oap, cmd);
+ if (rc < 0)
+ CDEBUG(D_INODE, "oap %p page %p returned %d "
+ "instead of ready\n", oap,
+ oap->oap_page, rc);
+ switch (rc) {
+ case -EAGAIN:
+ /* llite is telling us that the page is still
+ * in commit_write and that we should try
+ * and put it in an rpc again later. we
+ * break out of the loop so we don't create
+ * a hole in the sequence of pages in the rpc
+ * stream.*/
+ oap = NULL;
+ break;
+ case -EINTR:
+ /* the io isn't needed.. tell the checks
+ * below to complete the rpc with EINTR */
+ cfs_spin_lock(&oap->oap_lock);
+ oap->oap_async_flags |= ASYNC_COUNT_STABLE;
+ cfs_spin_unlock(&oap->oap_lock);
+ oap->oap_count = -EINTR;
+ break;
+ case 0:
+ cfs_spin_lock(&oap->oap_lock);
+ oap->oap_async_flags |= ASYNC_READY;
+ cfs_spin_unlock(&oap->oap_lock);
+ break;
+ default:
+ LASSERTF(0, "oap %p page %p returned %d "
+ "from make_ready\n", oap,
+ oap->oap_page, rc);
+ break;
+ }
+ }
+ if (oap == NULL)
+ break;
+
+ /* take the page out of our book-keeping */
+ cfs_list_del_init(&oap->oap_pending_item);
+ lop_update_pending(cli, lop, cmd, -1);
+ cfs_list_del_init(&oap->oap_urgent_item);
+
+ /* ask the caller for the size of the io as the rpc leaves. */
+ if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
+ oap->oap_count = osc_refresh_count(env, oap, cmd);
+ LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
+ }
+ if (oap->oap_count <= 0) {
+ CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
+ oap->oap_count);
+ osc_ap_completion(env, cli, NULL,
+ oap, 0, oap->oap_count);
+ continue;
+ }
+
+ /* now put the page back in our accounting */
+ cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
+ if (page_count++ == 0)
+ srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
+
+ if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
+ mem_tight = 1;
+
+ /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
+ * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
+ * have the same alignment as the initial writes that allocated
+ * extents on the server. */
+ ending_offset = oap->oap_obj_off + oap->oap_page_off +
+ oap->oap_count;
+ if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
+ break;
+
+ if (page_count >= cli->cl_max_pages_per_rpc)
+ break;
+
+ /* If there is a gap at the end of this page, it can't merge
+ * with any subsequent pages, so we'll hand the network a
+ * "fragmented" page array that it can't transfer in 1 RDMA */
+ if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
+ break;
+ }
+
+ osc_list_maint(cli, osc);
+
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+ if (page_count == 0) {
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ RETURN(0);
+ }
+
+ if (mem_tight)
+ cmd |= OBD_BRW_MEMALLOC;
+ rc = osc_build_rpc(env, cli, &rpc_list, page_count, cmd, pol);
+ if (rc != 0) {
+ LASSERT(cfs_list_empty(&rpc_list));
+ osc_list_maint(cli, osc);
+ RETURN(rc);
+ }
+
+ starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
+ if (cmd == OBD_BRW_READ) {
+ cli->cl_r_in_flight++;
+ lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
+ lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
+ lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
+ (starting_offset >> CFS_PAGE_SHIFT) + 1);
+ } else {
+ cli->cl_w_in_flight++;
+ lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
+ lprocfs_oh_tally(&cli->cl_write_rpc_hist,
+ cli->cl_w_in_flight);
+ lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
+ (starting_offset >> CFS_PAGE_SHIFT) + 1);
+ }
+
+ RETURN(1);
+}
+
+#define list_to_obj(list, item) \
+ cfs_list_entry((list)->next, struct osc_object, oo_##item)
+
+/* This is called by osc_check_rpcs() to find which objects have pages that
+ * we could be sending. These lists are maintained by osc_makes_rpc(). */
+static struct osc_object *osc_next_obj(struct client_obd *cli)
+{
+ ENTRY;
+
+ /* First return objects that have blocked locks so that they
+ * will be flushed quickly and other clients can get the lock,
+ * then objects which have pages ready to be stuffed into RPCs */
+ if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
+ RETURN(list_to_obj(&cli->cl_loi_hp_ready_list, hp_ready_item));
+ if (!cfs_list_empty(&cli->cl_loi_ready_list))
+ RETURN(list_to_obj(&cli->cl_loi_ready_list, ready_item));
+
+ /* then if we have cache waiters, return all objects with queued
+ * writes. This is especially important when many small files
+ * have filled up the cache and not been fired into rpcs because
+ * they don't pass the nr_pending/object threshhold */
+ if (!cfs_list_empty(&cli->cl_cache_waiters) &&
+ !cfs_list_empty(&cli->cl_loi_write_list))
+ RETURN(list_to_obj(&cli->cl_loi_write_list, write_item));
+
+ /* then return all queued objects when we have an invalid import
+ * so that they get flushed */
+ if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
+ if (!cfs_list_empty(&cli->cl_loi_write_list))
+ RETURN(list_to_obj(&cli->cl_loi_write_list,
+ write_item));
+ if (!cfs_list_empty(&cli->cl_loi_read_list))
+ RETURN(list_to_obj(&cli->cl_loi_read_list,
+ read_item));
+ }
+ RETURN(NULL);
+}
+
+/* called with the loi list lock held */
+static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
+ pdl_policy_t pol)
+{
+ struct osc_object *osc;
+ int rc = 0, race_counter = 0;
+ ENTRY;
+
+ while ((osc = osc_next_obj(cli)) != NULL) {
+ OSC_IO_DEBUG(osc, "%lu in flight\n", rpcs_in_flight(cli));
+
+ if (osc_max_rpc_in_flight(cli, osc))
+ break;
+
+ /* attempt some read/write balancing by alternating between
+ * reads and writes in an object. The makes_rpc checks here
+ * would be redundant if we were getting read/write work items
+ * instead of objects. we don't want send_oap_rpc to drain a
+ * partial read pending queue when we're given this object to
+ * do io on writes while there are cache waiters */
+ if (osc_makes_rpc(cli, osc, OBD_BRW_WRITE)) {
+ rc = osc_send_oap_rpc(env, cli, osc, OBD_BRW_WRITE,
+ &osc->oo_write_pages, pol);
+ if (rc < 0) {
+ CERROR("Write request failed with %d\n", rc);
+
+ /* osc_send_oap_rpc failed, mostly because of
+ * memory pressure.
+ *
+ * It can't break here, because if:
+ * - a page was submitted by osc_io_submit, so
+ * page locked;
+ * - no request in flight
+ * - no subsequent request
+ * The system will be in live-lock state,
+ * because there is no chance to call
+ * osc_io_unplug() and osc_check_rpcs() any
+ * more. pdflush can't help in this case,
+ * because it might be blocked at grabbing
+ * the page lock as we mentioned.
+ *
+ * Anyway, continue to drain pages. */
+ /* break; */
+ }
+
+ if (rc > 0)
+ race_counter = 0;
+ else if (rc == 0)
+ race_counter++;
+ }
+ if (osc_makes_rpc(cli, osc, OBD_BRW_READ)) {
+ rc = osc_send_oap_rpc(env, cli, osc, OBD_BRW_READ,
+ &osc->oo_read_pages, pol);
+ if (rc < 0)
+ CERROR("Read request failed with %d\n", rc);
+
+ if (rc > 0)
+ race_counter = 0;
+ else if (rc == 0)
+ race_counter++;
+ }
+
+ /* attempt some inter-object balancing by issuing rpcs
+ * for each object in turn */
+ if (!cfs_list_empty(&osc->oo_hp_ready_item))
+ cfs_list_del_init(&osc->oo_hp_ready_item);
+ if (!cfs_list_empty(&osc->oo_ready_item))
+ cfs_list_del_init(&osc->oo_ready_item);
+ if (!cfs_list_empty(&osc->oo_write_item))
+ cfs_list_del_init(&osc->oo_write_item);
+ if (!cfs_list_empty(&osc->oo_read_item))
+ cfs_list_del_init(&osc->oo_read_item);
+
+ osc_list_maint(cli, osc);
+
+ /* send_oap_rpc fails with 0 when make_ready tells it to
+ * back off. llite's make_ready does this when it tries
+ * to lock a page queued for write that is already locked.
+ * we want to try sending rpcs from many objects, but we
+ * don't want to spin failing with 0. */
+ if (race_counter == 10)
+ break;
+ }
+}
+
+void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
+ struct osc_object *osc, pdl_policy_t pol)
+{
+ if (osc)
+ osc_list_maint(cli, osc);
+ osc_check_rpcs(env, cli, pol);
+}
+
+int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
+ cfs_page_t *page, loff_t offset)
+{
+ struct obd_export *exp = osc_export(osc);
+ struct osc_async_page *oap = &ops->ops_oap;
+ ENTRY;
+
+ if (!page)
+ return cfs_size_round(sizeof(*oap));
+
+ oap->oap_magic = OAP_MAGIC;
+ oap->oap_cli = &exp->exp_obd->u.cli;
+ oap->oap_obj = osc;
+
+ oap->oap_page = page;
+ oap->oap_obj_off = offset;
+ LASSERT(!(offset & ~CFS_PAGE_MASK));
+
+ if (!client_is_remote(exp) && cfs_capable(CFS_CAP_SYS_RESOURCE))
+ oap->oap_brw_flags = OBD_BRW_NOQUOTA;
+
+ CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
+ CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
+ CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
+
+ cfs_spin_lock_init(&oap->oap_lock);
+ CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n",
+ oap, page, oap->oap_obj_off);
+ RETURN(0);
+}
+
+int osc_queue_async_io(const struct lu_env *env, struct osc_page *ops)
+{
+ struct osc_async_page *oap = &ops->ops_oap;
+ struct client_obd *cli = oap->oap_cli;
+ struct osc_object *osc = oap->oap_obj;
+ struct obd_export *exp = osc_export(osc);
+ int brw_flags = OBD_BRW_ASYNC;
+ int cmd = OBD_BRW_WRITE;
+ int rc = 0;
+ ENTRY;
+
+ if (oap->oap_magic != OAP_MAGIC)
+ RETURN(-EINVAL);
+
+ if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
+ RETURN(-EIO);
+
+ if (!cfs_list_empty(&oap->oap_pending_item) ||
+ !cfs_list_empty(&oap->oap_urgent_item) ||
+ !cfs_list_empty(&oap->oap_rpc_item))
+ RETURN(-EBUSY);
+
+ /* Set the OBD_BRW_SRVLOCK before the page is queued. */
+ brw_flags |= ops->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
+ if (!client_is_remote(exp) && cfs_capable(CFS_CAP_SYS_RESOURCE)) {
+ brw_flags |= OBD_BRW_NOQUOTA;
+ cmd |= OBD_BRW_NOQUOTA;
+ }
+
+ /* check if the file's owner/group is over quota */
+ if (!(cmd & OBD_BRW_NOQUOTA)) {
+ struct cl_object *obj;
+ struct cl_attr *attr;
+ unsigned int qid[MAXQUOTAS];
+
+ obj = cl_object_top(&osc->oo_cl);
+ attr = &osc_env_info(env)->oti_attr;
+
+ cl_object_attr_lock(obj);
+ rc = cl_object_attr_get(env, obj, attr);
+ cl_object_attr_unlock(obj);
+
+ qid[USRQUOTA] = attr->cat_uid;
+ qid[GRPQUOTA] = attr->cat_gid;
+ if (rc == 0 &&
+ osc_quota_chkdq(cli, qid) == NO_QUOTA)
+ rc = -EDQUOT;
+ if (rc)
+ RETURN(rc);
+ }
+
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+
+ oap->oap_cmd = cmd;
+ oap->oap_page_off = ops->ops_from;
+ oap->oap_count = ops->ops_to - ops->ops_from;
+ oap->oap_async_flags = 0;
+ oap->oap_brw_flags = brw_flags;
+ /* Give a hint to OST that requests are coming from kswapd - bug19529 */
+ if (cfs_memory_pressure_get())
+ oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
+
+ rc = osc_enter_cache(env, cli, oap);
+ if (rc) {
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+ RETURN(rc);
+ }
+
+ OSC_IO_DEBUG(osc, "oap %p page %p added for cmd %d\n",
+ oap, oap->oap_page, cmd);
+
+ osc_oap_to_pending(oap);
+ osc_list_maint(cli, osc);
+ if (!osc_max_rpc_in_flight(cli, osc) &&
+ osc_makes_rpc(cli, osc, OBD_BRW_WRITE)) {
+ LASSERT(cli->cl_writeback_work != NULL);
+ rc = ptlrpcd_queue_work(cli->cl_writeback_work);
+
+ CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n",
+ cli, rc);
+ }
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+ RETURN(0);
+}
+
+int osc_teardown_async_page(struct osc_object *obj, struct osc_page *ops)
+{
+ struct osc_async_page *oap = &ops->ops_oap;
+ struct client_obd *cli = oap->oap_cli;
+ struct osc_oap_pages *lop;
+ int rc = 0;
+ ENTRY;
+
+ if (oap->oap_magic != OAP_MAGIC)
+ RETURN(-EINVAL);
+
+ if (oap->oap_cmd & OBD_BRW_WRITE) {
+ lop = &obj->oo_write_pages;
+ } else {
+ lop = &obj->oo_read_pages;
+ }
+
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+
+ if (!cfs_list_empty(&oap->oap_rpc_item))
+ GOTO(out, rc = -EBUSY);
+
+ osc_exit_cache(cli, oap, 0);
+ osc_wake_cache_waiters(cli);
+
+ if (!cfs_list_empty(&oap->oap_urgent_item)) {
+ cfs_list_del_init(&oap->oap_urgent_item);
+ cfs_spin_lock(&oap->oap_lock);
+ oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
+ cfs_spin_unlock(&oap->oap_lock);
+ }
+ if (!cfs_list_empty(&oap->oap_pending_item)) {
+ cfs_list_del_init(&oap->oap_pending_item);
+ lop_update_pending(cli, lop, oap->oap_cmd, -1);
+ }
+ osc_list_maint(cli, obj);
+ OSC_IO_DEBUG(obj, "oap %p page %p torn down\n", oap, oap->oap_page);
+out:
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+ RETURN(rc);
+}
+
+/* aka (~was & now & flag), but this is more clear :) */
+#define SETTING(was, now, flag) (!(was & flag) && (now & flag))
+
+int osc_set_async_flags(struct osc_object *obj, struct osc_page *opg,
+ obd_flag async_flags)
+{
+ struct osc_async_page *oap = &opg->ops_oap;
+ struct osc_oap_pages *lop;
+ int flags = 0;
+ ENTRY;
+
+ LASSERT(!cfs_list_empty(&oap->oap_pending_item));
+
+ if (oap->oap_cmd & OBD_BRW_WRITE) {
+ lop = &obj->oo_write_pages;
+ } else {
+ lop = &obj->oo_read_pages;
+ }
+
+ if ((oap->oap_async_flags & async_flags) == async_flags)
+ RETURN(0);
+
+ if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
+ flags |= ASYNC_READY;
+
+ if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
+ cfs_list_empty(&oap->oap_rpc_item)) {
+ if (oap->oap_async_flags & ASYNC_HP)
+ cfs_list_add(&oap->oap_urgent_item, &lop->oop_urgent);
+ else
+ cfs_list_add_tail(&oap->oap_urgent_item,
+ &lop->oop_urgent);
+ flags |= ASYNC_URGENT;
+ osc_list_maint(oap->oap_cli, obj);
+ }
+ cfs_spin_lock(&oap->oap_lock);
+ oap->oap_async_flags |= flags;
+ cfs_spin_unlock(&oap->oap_lock);
+
+ OSC_IO_DEBUG(obj, "oap %p page %p has flags %x\n", oap,
+ oap->oap_page, oap->oap_async_flags);
+ RETURN(0);
+}
+
+/**
+ * this is called when a sync waiter receives an interruption. Its job is to
+ * get the caller woken as soon as possible. If its page hasn't been put in an
+ * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
+ * desiring interruption which will forcefully complete the rpc once the rpc
+ * has timed out.
+ */
+int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops)
+{
+ struct osc_async_page *oap = &ops->ops_oap;
+ int rc = -EBUSY;
+ ENTRY;
+
+ LASSERT(!oap->oap_interrupted);
+ oap->oap_interrupted = 1;
+
+ /* ok, it's been put in an rpc. only one oap gets a request reference */
+ if (oap->oap_request != NULL) {
+ ptlrpc_mark_interrupted(oap->oap_request);
+ ptlrpcd_wake(oap->oap_request);
+ ptlrpc_req_finished(oap->oap_request);
+ oap->oap_request = NULL;
+ }
+
+ /*
+ * page completion may be called only if ->cpo_prep() method was
+ * executed by osc_io_submit(), that also adds page the to pending list
+ */
+ if (!cfs_list_empty(&oap->oap_pending_item)) {
+ struct osc_oap_pages *lop;
+ struct osc_object *osc = oap->oap_obj;
+
+ cfs_list_del_init(&oap->oap_pending_item);
+ cfs_list_del_init(&oap->oap_urgent_item);
+
+ lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
+ &osc->oo_write_pages : &osc->oo_read_pages;
+ lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
+ osc_list_maint(oap->oap_cli, osc);
+ rc = osc_completion(env, oap, oap->oap_cmd, NULL, -EINTR);
+ }
+
+ RETURN(rc);
+}
+
+int osc_queue_sync_page(const struct lu_env *env, struct osc_page *opg,
+ int cmd, int brw_flags)
+{
+ struct osc_async_page *oap = &opg->ops_oap;
+ struct client_obd *cli = oap->oap_cli;
+ int flags = 0;
+ ENTRY;
+
+ oap->oap_cmd = cmd;
+ oap->oap_page_off = opg->ops_from;
+ oap->oap_count = opg->ops_to - opg->ops_from;
+ oap->oap_brw_flags = OBD_BRW_SYNC | brw_flags;
+
+ /* Give a hint to OST that requests are coming from kswapd - bug19529 */
+ if (cfs_memory_pressure_get())
+ oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
+
+ if (!client_is_remote(osc_export(cl2osc(opg->ops_cl.cpl_obj))) &&
+ cfs_capable(CFS_CAP_SYS_RESOURCE)) {
+ oap->oap_brw_flags |= OBD_BRW_NOQUOTA;
+ oap->oap_cmd |= OBD_BRW_NOQUOTA;
+ }
+
+ if (oap->oap_cmd & OBD_BRW_READ)
+ flags = ASYNC_COUNT_STABLE;
+ else if (!(oap->oap_brw_page.flag & OBD_BRW_FROM_GRANT))
+ osc_enter_cache_try(env, cli, oap, 1);
+
+ cfs_spin_lock(&oap->oap_lock);
+ oap->oap_async_flags |= OSC_FLAGS | flags;
+ cfs_spin_unlock(&oap->oap_lock);
+
+ osc_oap_to_pending(oap);
+ RETURN(0);
+}
+
+/** @} osc */
struct cl_page_list oti_plist;
};
+/**
+ * Manage osc_async_page
+ */
+struct osc_oap_pages {
+ cfs_list_t oop_pending;
+ cfs_list_t oop_urgent;
+ int oop_num_pending;
+};
+
+static inline void osc_oap_pages_init(struct osc_oap_pages *list)
+{
+ CFS_INIT_LIST_HEAD(&list->oop_pending);
+ CFS_INIT_LIST_HEAD(&list->oop_urgent);
+ list->oop_num_pending = 0;
+}
+
struct osc_object {
struct cl_object oo_cl;
struct lov_oinfo *oo_oinfo;
* locked during take-off and landing.
*/
cfs_spinlock_t oo_seatbelt;
+
+ /**
+ * used by the osc to keep track of what objects to build into rpcs
+ */
+ struct osc_oap_pages oo_read_pages;
+ struct osc_oap_pages oo_write_pages;
+ cfs_list_t oo_ready_item;
+ cfs_list_t oo_hp_ready_item;
+ cfs_list_t oo_write_item;
+ cfs_list_t oo_read_item;
};
/*
pgoff_t start, pgoff_t end);
int osc_lvb_print (const struct lu_env *env, void *cookie,
lu_printer_t p, const struct ost_lvb *lvb);
+
void osc_io_submit_page(const struct lu_env *env,
struct osc_io *oio, struct osc_page *opg,
enum cl_req_type crt);
+void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
+ struct obdo *oa, struct osc_async_page *oap,
+ int sent, int rc);
+int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops);
+int osc_set_async_flags(struct osc_object *obj, struct osc_page *opg,
+ obd_flag async_flags);
+int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
+ cfs_page_t *page, loff_t offset);
+int osc_queue_async_io(const struct lu_env *env, struct osc_page *ops);
+int osc_teardown_async_page(struct osc_object *obj,
+ struct osc_page *ops);
+int osc_queue_sync_page(const struct lu_env *env, struct osc_page *ops,
+ int cmd, int brw_flags);
+void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
+ struct osc_object *osc, pdl_policy_t pol);
void osc_object_set_contended (struct osc_object *obj);
void osc_object_clear_contended(struct osc_object *obj);
ASYNC_HP = 0x10,
};
-struct obd_async_page_ops {
- int (*ap_make_ready)(const struct lu_env *env, void *data, int cmd);
- int (*ap_refresh_count)(const struct lu_env *env, void *data, int cmd);
- int (*ap_completion)(const struct lu_env *env,
- void *data, int cmd, struct obdo *oa, int rc);
-};
-
struct osc_async_page {
int oap_magic;
unsigned short oap_cmd;
struct ptlrpc_request *oap_request;
struct client_obd *oap_cli;
- struct lov_oinfo *oap_loi;
+ struct osc_object *oap_obj;
- const struct obd_async_page_ops *oap_caller_ops;
- void *oap_caller_data;
- cfs_list_t oap_page_list;
struct ldlm_lock *oap_ldlm_lock;
cfs_spinlock_t oap_lock;
};
void oscc_init(struct obd_device *obd);
void osc_wake_cache_waiters(struct client_obd *cli);
int osc_shrink_grant_to_target(struct client_obd *cli, long target);
+void osc_update_next_shrink(struct client_obd *cli);
/*
* cl integration.
obd_enqueue_update_f upcall, void *cookie,
struct ptlrpc_request_set *rqset);
-int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
- struct lov_oinfo *loi, cfs_page_t *page,
- obd_off offset, const struct obd_async_page_ops *ops,
- void *data, void **res, int nocache,
- struct lustre_handle *lockh);
-void osc_oap_to_pending(struct osc_async_page *oap);
-int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap);
-void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi);
-void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli);
-int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
- struct lov_stripe_md *lsm, struct lov_oinfo *loi,
- struct osc_async_page *oap, int cmd, int off,
- int count, obd_flag brw_flags, enum async_flags async_flags);
-int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
- struct lov_oinfo *loi, struct osc_async_page *oap);
int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *cfg);
-int osc_set_async_flags_base(struct client_obd *cli,
- struct lov_oinfo *loi, struct osc_async_page *oap,
- obd_flag async_flags);
-int osc_enter_cache_try(const struct lu_env *env,
- struct client_obd *cli, struct lov_oinfo *loi,
- struct osc_async_page *oap, int transient);
+int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
+ cfs_list_t *rpc_list, int page_count, int cmd,
+ pdl_policy_t p);
struct cl_page *osc_oap2cl_page(struct osc_async_page *oap);
extern cfs_spinlock_t osc_ast_guard;
rc == -EAGAIN || rc == -EINPROGRESS);
}
+static inline unsigned long rpcs_in_flight(struct client_obd *cli)
+{
+ return cli->cl_r_in_flight + cli->cl_w_in_flight;
+}
+
#ifndef min_t
#define min_t(type,x,y) \
({ type __x = (x); type __y = (y); __x < __y ? __x: __y; })
* Implementation of cl_io for OSC layer.
*
* Author: Nikita Danilov <nikita.danilov@sun.com>
+ * Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
*/
#define DEBUG_SUBSYSTEM S_OSC
return container_of(oap, struct osc_page, ops_oap)->ops_cl.cpl_page;
}
-static void osc_io_unplug(const struct lu_env *env, struct osc_object *osc,
- struct client_obd *cli)
-{
- loi_list_maint(cli, osc->oo_oinfo);
- osc_check_rpcs(env, cli);
-}
-
/**
* An implementation of cl_io_operations::cio_io_submit() method for osc
* layer. Iterates over pages in the in-queue, prepares each for io by calling
* cl_page_prep() and then either submits them through osc_io_submit_page()
* or, if page is already submitted, changes osc flags through
- * osc_set_async_flags_base().
+ * osc_set_async_flags().
*/
static int osc_io_submit(const struct lu_env *env,
const struct cl_io_slice *ios,
osc_io_submit_page(env, cl2osc_io(env, ios),
opg, crt);
} else {
- result = osc_set_async_flags_base(cli,
- osc->oo_oinfo,
- oap,
- OSC_FLAGS);
- /*
- * bug 18881: we can't just break out here when
- * error occurs after cl_page_prep has been
- * called against the page. The correct
- * way is to call page's completion routine,
- * as in osc_oap_interrupted. For simplicity,
- * we just force osc_set_async_flags_base() to
- * not return error.
- */
+ result = osc_set_async_flags(osc, opg,
+ OSC_FLAGS);
+ /*
+ * bug 18881: we can't just break out here when
+ * error occurs after cl_page_prep has been
+ * called against the page. The correct
+ * way is to call page's completion routine,
+ * as in osc_oap_interrupted. For simplicity,
+ * we just force osc_set_async_flags() to
+ * not return error.
+ */
LASSERT(result == 0);
}
opg->ops_submit_time = cfs_time_current();
LASSERT(ergo(result == 0, osc == osc0));
if (queued > 0)
- osc_io_unplug(env, osc, cli);
+ osc_io_unplug(env, cli, osc, PDL_POLICY_ROUND);
if (osc0)
client_obd_list_unlock(&cli->cl_loi_list_lock);
CDEBUG(D_INFO, "%d/%d %d\n", qin->pl_nr, qout->pl_nr, result);
cfs_spin_lock_init(&osc->oo_seatbelt);
for (i = 0; i < CRT_NR; ++i)
CFS_INIT_LIST_HEAD(&osc->oo_inflight[i]);
- return 0;
+
+ osc_oap_pages_init(&osc->oo_read_pages);
+ osc_oap_pages_init(&osc->oo_write_pages);
+ CFS_INIT_LIST_HEAD(&osc->oo_ready_item);
+ CFS_INIT_LIST_HEAD(&osc->oo_hp_ready_item);
+ CFS_INIT_LIST_HEAD(&osc->oo_write_item);
+ CFS_INIT_LIST_HEAD(&osc->oo_read_item);
+
+ return 0;
}
static void osc_object_free(const struct lu_env *env, struct lu_object *obj)
const struct cl_page_slice *slice,
struct cl_io *unused)
{
- struct osc_page *opg = cl2osc_page(slice);
- struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
- int result;
- /* All cacheable IO is async-capable */
- int brw_flags = OBD_BRW_ASYNC;
- int noquota = 0;
-
- LINVRNT(osc_page_protected(env, opg, CLM_WRITE, 0));
- ENTRY;
-
- /* Set the OBD_BRW_SRVLOCK before the page is queued. */
- brw_flags |= opg->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
- if (!client_is_remote(osc_export(obj)) &&
- cfs_capable(CFS_CAP_SYS_RESOURCE)) {
- brw_flags |= OBD_BRW_NOQUOTA;
- noquota = OBD_BRW_NOQUOTA;
- }
-
- osc_page_transfer_get(opg, "transfer\0cache");
- result = osc_queue_async_io(env, osc_export(obj), NULL, obj->oo_oinfo,
- &opg->ops_oap, OBD_BRW_WRITE | noquota,
- opg->ops_from, opg->ops_to - opg->ops_from,
- brw_flags, 0);
- if (result != 0)
- osc_page_transfer_put(env, opg);
- else
- osc_page_transfer_add(env, opg, CRT_WRITE);
- RETURN(result);
+ struct osc_page *opg = cl2osc_page(slice);
+ int result;
+ ENTRY;
+
+ LINVRNT(osc_page_protected(env, opg, CLM_WRITE, 0));
+
+ osc_page_transfer_get(opg, "transfer\0cache");
+ result = osc_queue_async_io(env, opg);
+ if (result != 0)
+ osc_page_transfer_put(env, opg);
+ else
+ osc_page_transfer_add(env, opg, CRT_WRITE);
+ RETURN(result);
}
void osc_index2policy(ldlm_policy_data_t *policy, const struct cl_object *obj,
struct osc_async_page *oap = &opg->ops_oap;
struct osc_object *obj = cl2osc(slice->cpl_obj);
struct client_obd *cli = &osc_export(obj)->exp_obd->u.cli;
- struct lov_oinfo *loi = obj->oo_oinfo;
return (*printer)(env, cookie, LUSTRE_OSC_NAME"-page@%p: "
"1< %#x %d %u %s %s %s > "
- "2< "LPU64" %u %u %#x %#x | %p %p %p %p %p > "
+ "2< "LPU64" %u %u %#x %#x | %p %p %p > "
"3< %s %p %d %lu %d > "
"4< %d %d %d %lu %s | %s %s %s %s > "
"5< %s %s %s %s | %d %s %s | %d %s %s>\n",
/* 2 */
oap->oap_obj_off, oap->oap_page_off, oap->oap_count,
oap->oap_async_flags, oap->oap_brw_flags,
- oap->oap_request,
- oap->oap_cli, oap->oap_loi, oap->oap_caller_ops,
- oap->oap_caller_data,
+ oap->oap_request, oap->oap_cli, obj,
/* 3 */
osc_list(&opg->ops_inflight),
opg->ops_submitter, opg->ops_transfer_pinned,
osc_list(&cli->cl_loi_write_list),
osc_list(&cli->cl_loi_read_list),
/* 5 */
- osc_list(&loi->loi_ready_item),
- osc_list(&loi->loi_hp_ready_item),
- osc_list(&loi->loi_write_item),
- osc_list(&loi->loi_read_item),
- loi->loi_read_lop.lop_num_pending,
- osc_list(&loi->loi_read_lop.lop_pending),
- osc_list(&loi->loi_read_lop.lop_urgent),
- loi->loi_write_lop.lop_num_pending,
- osc_list(&loi->loi_write_lop.lop_pending),
- osc_list(&loi->loi_write_lop.lop_urgent));
+ osc_list(&obj->oo_ready_item),
+ osc_list(&obj->oo_hp_ready_item),
+ osc_list(&obj->oo_write_item),
+ osc_list(&obj->oo_read_item),
+ obj->oo_read_pages.oop_num_pending,
+ osc_list(&obj->oo_read_pages.oop_pending),
+ osc_list(&obj->oo_read_pages.oop_urgent),
+ obj->oo_write_pages.oop_num_pending,
+ osc_list(&obj->oo_write_pages.oop_pending),
+ osc_list(&obj->oo_write_pages.oop_urgent));
}
static void osc_page_delete(const struct lu_env *env,
const struct cl_page_slice *slice)
{
- struct osc_page *opg = cl2osc_page(slice);
- struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
- struct osc_async_page *oap = &opg->ops_oap;
+ struct osc_page *opg = cl2osc_page(slice);
+ struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
int rc;
LINVRNT(opg->ops_temp || osc_page_protected(env, opg, CLM_READ, 1));
ENTRY;
CDEBUG(D_TRACE, "%p\n", opg);
osc_page_transfer_put(env, opg);
- rc = osc_teardown_async_page(osc_export(obj), NULL, obj->oo_oinfo, oap);
+ rc = osc_teardown_async_page(obj, opg);
if (rc) {
CL_PAGE_DEBUG(D_ERROR, env, cl_page_top(slice->cpl_page),
"Trying to teardown failed: %d\n", rc);
* is completed, or not even queued. */
if (opg->ops_transfer_pinned)
/* FIXME: may not be interrupted.. */
- rc = osc_oap_interrupted(env, oap);
+ rc = osc_cancel_async_page(env, opg);
LASSERT(ergo(rc == 0, opg->ops_transfer_pinned == 0));
client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
return rc;
.cpo_cancel = osc_page_cancel
};
-static int osc_make_ready(const struct lu_env *env, void *data, int cmd)
-{
- struct osc_page *opg = data;
- struct cl_page *page = cl_page_top(opg->ops_cl.cpl_page);
- int result;
-
- LASSERT(cmd == OBD_BRW_WRITE); /* no cached reads */
- LINVRNT(osc_page_protected(env, opg, CLM_WRITE, 1));
-
- ENTRY;
- result = cl_page_make_ready(env, page, CRT_WRITE);
- if (result == 0)
- opg->ops_submit_time = cfs_time_current();
- RETURN(result);
-}
-
-static int osc_refresh_count(const struct lu_env *env, void *data, int cmd)
-{
- struct cl_page *page;
- struct osc_page *osc = data;
- struct cl_object *obj;
- struct cl_attr *attr = &osc_env_info(env)->oti_attr;
-
- int result;
- loff_t kms;
-
- LINVRNT(osc_page_protected(env, osc, CLM_READ, 1));
-
- /* readpage queues with _COUNT_STABLE, shouldn't get here. */
- LASSERT(!(cmd & OBD_BRW_READ));
- LASSERT(osc != NULL);
- page = osc->ops_cl.cpl_page;
- obj = osc->ops_cl.cpl_obj;
-
- cl_object_attr_lock(obj);
- result = cl_object_attr_get(env, obj, attr);
- cl_object_attr_unlock(obj);
- if (result < 0)
- return result;
- kms = attr->cat_kms;
- if (cl_offset(obj, page->cp_index) >= kms)
- /* catch race with truncate */
- return 0;
- else if (cl_offset(obj, page->cp_index + 1) > kms)
- /* catch sub-page write at end of file */
- return kms % CFS_PAGE_SIZE;
- else
- return CFS_PAGE_SIZE;
-}
-
-static int osc_completion(const struct lu_env *env,
- void *data, int cmd, struct obdo *oa, int rc)
-{
- struct osc_page *opg = data;
- struct osc_async_page *oap = &opg->ops_oap;
- struct cl_page *page = cl_page_top(opg->ops_cl.cpl_page);
- struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
- enum cl_req_type crt;
- int srvlock;
-
- LINVRNT(osc_page_protected(env, opg, CLM_READ, 1));
-
- ENTRY;
-
- cmd &= ~OBD_BRW_NOQUOTA;
- LASSERT(equi(page->cp_state == CPS_PAGEIN, cmd == OBD_BRW_READ));
- LASSERT(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE));
- LASSERT(opg->ops_transfer_pinned);
-
- /*
- * page->cp_req can be NULL if io submission failed before
- * cl_req was allocated.
- */
- if (page->cp_req != NULL)
- cl_req_page_done(env, page);
- LASSERT(page->cp_req == NULL);
-
- /* As the transfer for this page is being done, clear the flags */
- cfs_spin_lock(&oap->oap_lock);
- oap->oap_async_flags = 0;
- cfs_spin_unlock(&oap->oap_lock);
-
- crt = cmd == OBD_BRW_READ ? CRT_READ : CRT_WRITE;
- /* Clear opg->ops_transfer_pinned before VM lock is released. */
- opg->ops_transfer_pinned = 0;
-
- cfs_spin_lock(&obj->oo_seatbelt);
- LASSERT(opg->ops_submitter != NULL);
- LASSERT(!cfs_list_empty(&opg->ops_inflight));
- cfs_list_del_init(&opg->ops_inflight);
- cfs_spin_unlock(&obj->oo_seatbelt);
-
- opg->ops_submit_time = 0;
- srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;
-
- cl_page_completion(env, page, crt, rc);
-
- /* statistic */
- if (rc == 0 && srvlock) {
- struct lu_device *ld = opg->ops_cl.cpl_obj->co_lu.lo_dev;
- struct osc_stats *stats = &lu2osc_dev(ld)->od_stats;
- int bytes = oap->oap_count;
-
- if (crt == CRT_READ)
- stats->os_lockless_reads += bytes;
- else
- stats->os_lockless_writes += bytes;
- }
-
- /*
- * This has to be the last operation with the page, as locks are
- * released in cl_page_completion() and nothing except for the
- * reference counter protects page from concurrent reclaim.
- */
- lu_ref_del(&page->cp_reference, "transfer", page);
- /*
- * As page->cp_obj is pinned by a reference from page->cp_req, it is
- * safe to call cl_page_put() without risking object destruction in a
- * non-blocking context.
- */
- cl_page_put(env, page);
- RETURN(0);
-}
-
-const static struct obd_async_page_ops osc_async_page_ops = {
- .ap_make_ready = osc_make_ready,
- .ap_refresh_count = osc_refresh_count,
- .ap_completion = osc_completion
-};
-
struct cl_page *osc_page_init(const struct lu_env *env,
struct cl_object *obj,
struct cl_page *page, cfs_page_t *vmpage)
OBD_SLAB_ALLOC_PTR_GFP(opg, osc_page_kmem, CFS_ALLOC_IO);
if (opg != NULL) {
- void *oap = &opg->ops_oap;
-
opg->ops_from = 0;
opg->ops_to = CFS_PAGE_SIZE;
- result = osc_prep_async_page(osc_export(osc),
- NULL, osc->oo_oinfo, vmpage,
- cl_offset(obj, page->cp_index),
- &osc_async_page_ops,
- opg, (void **)&oap, 1, NULL);
+ result = osc_prep_async_page(osc, opg, vmpage,
+ cl_offset(obj, page->cp_index));
if (result == 0) {
struct osc_io *oio = osc_env_io(env);
opg->ops_srvlock = osc_io_srvlock(oio);
struct osc_io *oio, struct osc_page *opg,
enum cl_req_type crt)
{
- struct osc_async_page *oap = &opg->ops_oap;
- struct client_obd *cli = oap->oap_cli;
- int flags = 0;
-
LINVRNT(osc_page_protected(env, opg,
crt == CRT_WRITE ? CLM_WRITE : CLM_READ, 1));
- oap->oap_page_off = opg->ops_from;
- oap->oap_count = opg->ops_to - opg->ops_from;
- /* Give a hint to OST that requests are coming from kswapd - bug19529 */
- if (cfs_memory_pressure_get())
- oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
- oap->oap_brw_flags |= OBD_BRW_SYNC;
- if (osc_io_srvlock(oio))
- oap->oap_brw_flags |= OBD_BRW_SRVLOCK;
-
- oap->oap_cmd = crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ;
- if (!client_is_remote(osc_export(cl2osc(opg->ops_cl.cpl_obj))) &&
- cfs_capable(CFS_CAP_SYS_RESOURCE)) {
- oap->oap_brw_flags |= OBD_BRW_NOQUOTA;
- oap->oap_cmd |= OBD_BRW_NOQUOTA;
- }
-
- if (oap->oap_cmd & OBD_BRW_READ)
- flags = ASYNC_COUNT_STABLE;
- else if (!(oap->oap_brw_page.flag & OBD_BRW_FROM_GRANT))
- osc_enter_cache_try(env, cli, oap->oap_loi, oap, 1);
-
- cfs_spin_lock(&oap->oap_lock);
- oap->oap_async_flags |= OSC_FLAGS | flags;
- cfs_spin_unlock(&oap->oap_lock);
+ osc_queue_sync_page(env, opg,
+ crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
+ osc_io_srvlock(oio) ? OBD_BRW_SRVLOCK : 0);
- osc_oap_to_pending(oap);
osc_page_transfer_get(opg, "transfer\0imm");
osc_page_transfer_add(env, opg, crt);
}
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"
+#include "osc_cl_internal.h"
static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
struct ptlrpc_request *req, void *data, int rc);
-static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli,
- int ptlrpc);
int osc_cleanup(struct obd_device *obd);
/* Pack OSC object metadata for disk storage (LE byte order). */
}
-static void osc_update_next_shrink(struct client_obd *cli)
+void osc_update_next_shrink(struct client_obd *cli)
{
cli->cl_next_shrink_grant =
cfs_time_shift(cli->cl_grant_shrink_interval);
cli->cl_next_shrink_grant);
}
-/* caller must hold loi_list_lock */
-static void osc_consume_write_grant(struct client_obd *cli,
- struct brw_page *pga)
-{
- LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
- LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
- cfs_atomic_inc(&obd_dirty_pages);
- cli->cl_dirty += CFS_PAGE_SIZE;
- cli->cl_avail_grant -= CFS_PAGE_SIZE;
- pga->flag |= OBD_BRW_FROM_GRANT;
- CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
- CFS_PAGE_SIZE, pga, pga->pg);
- LASSERT(cli->cl_avail_grant >= 0);
- osc_update_next_shrink(cli);
-}
-
-/* the companion to osc_consume_write_grant, called when a brw has completed.
- * must be called with the loi lock held. */
-static void osc_release_write_grant(struct client_obd *cli,
- struct brw_page *pga, int sent)
-{
- int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
- ENTRY;
-
- LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
- if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
- EXIT;
- return;
- }
-
- pga->flag &= ~OBD_BRW_FROM_GRANT;
- cfs_atomic_dec(&obd_dirty_pages);
- cli->cl_dirty -= CFS_PAGE_SIZE;
- if (pga->flag & OBD_BRW_NOCACHE) {
- pga->flag &= ~OBD_BRW_NOCACHE;
- cfs_atomic_dec(&obd_dirty_transit_pages);
- cli->cl_dirty_transit -= CFS_PAGE_SIZE;
- }
- if (!sent) {
- /* Reclaim grant from truncated pages. This is used to solve
- * write-truncate and grant all gone(to lost_grant) problem.
- * For a vfs write this problem can be easily solved by a sync
- * write, however, this is not an option for page_mkwrite()
- * because grant has to be allocated before a page becomes
- * dirty. */
- if (cli->cl_avail_grant < PTLRPC_MAX_BRW_SIZE)
- cli->cl_avail_grant += CFS_PAGE_SIZE;
- else
- cli->cl_lost_grant += CFS_PAGE_SIZE;
- CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
- cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
- } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
- /* For short writes we shouldn't count parts of pages that
- * span a whole block on the OST side, or our accounting goes
- * wrong. Should match the code in filter_grant_check. */
- int offset = pga->off & ~CFS_PAGE_MASK;
- int count = pga->count + (offset & (blocksize - 1));
- int end = (offset + pga->count) & (blocksize - 1);
- if (end)
- count += blocksize - end;
-
- cli->cl_lost_grant += CFS_PAGE_SIZE - count;
- CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
- CFS_PAGE_SIZE - count, cli->cl_lost_grant,
- cli->cl_avail_grant, cli->cl_dirty);
- }
-
- EXIT;
-}
-
-static unsigned long rpcs_in_flight(struct client_obd *cli)
-{
- return cli->cl_r_in_flight + cli->cl_w_in_flight;
-}
-
-/* caller must hold loi_list_lock */
-void osc_wake_cache_waiters(struct client_obd *cli)
-{
- cfs_list_t *l, *tmp;
- struct osc_cache_waiter *ocw;
-
- ENTRY;
- cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
- /* if we can't dirty more, we must wait until some is written */
- if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
- (cfs_atomic_read(&obd_dirty_pages) + 1 >
- obd_max_dirty_pages)) {
- CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
- "osc max %ld, sys max %d\n", cli->cl_dirty,
- cli->cl_dirty_max, obd_max_dirty_pages);
- return;
- }
-
- /* if still dirty cache but no grant wait for pending RPCs that
- * may yet return us some grant before doing sync writes */
- if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
- CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
- cli->cl_w_in_flight);
- return;
- }
-
- ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
- cfs_list_del_init(&ocw->ocw_entry);
- if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
- /* no more RPCs in flight to return grant, do sync IO */
- ocw->ocw_rc = -EDQUOT;
- CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
- } else {
- osc_consume_write_grant(cli,
- &ocw->ocw_oap->oap_brw_page);
- }
-
- CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld\n",
- ocw, ocw->ocw_oap, cli->cl_avail_grant);
-
- cfs_waitq_signal(&ocw->ocw_waitq);
- }
-
- EXIT;
-}
-
static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
client_obd_list_lock(&cli->cl_loi_list_lock);
RETURN(rc);
}
-/* The companion to osc_enter_cache(), called when @oap is no longer part of
- * the dirty accounting. Writeback completes or truncate happens before
- * writing starts. Must be called with the loi lock held. */
-static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
- int sent)
-{
- osc_release_write_grant(cli, &oap->oap_brw_page, sent);
-}
-
-
-/* This maintains the lists of pending pages to read/write for a given object
- * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
- * to quickly find objects that are ready to send an RPC. */
-static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
- int cmd)
-{
- ENTRY;
-
- if (lop->lop_num_pending == 0)
- RETURN(0);
-
- /* if we have an invalid import we want to drain the queued pages
- * by forcing them through rpcs that immediately fail and complete
- * the pages. recovery relies on this to empty the queued pages
- * before canceling the locks and evicting down the llite pages */
- if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
- RETURN(1);
-
- /* stream rpcs in queue order as long as as there is an urgent page
- * queued. this is our cheap solution for good batching in the case
- * where writepage marks some random page in the middle of the file
- * as urgent because of, say, memory pressure */
- if (!cfs_list_empty(&lop->lop_urgent)) {
- CDEBUG(D_CACHE, "urgent request forcing RPC\n");
- RETURN(1);
- }
-
- if (cmd & OBD_BRW_WRITE) {
- /* trigger a write rpc stream as long as there are dirtiers
- * waiting for space. as they're waiting, they're not going to
- * create more pages to coalesce with what's waiting.. */
- if (!cfs_list_empty(&cli->cl_cache_waiters)) {
- CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
- RETURN(1);
- }
- }
- if (lop->lop_num_pending >= cli->cl_max_pages_per_rpc)
- RETURN(1);
-
- RETURN(0);
-}
-
-static int lop_makes_hprpc(struct loi_oap_pages *lop)
-{
- struct osc_async_page *oap;
- ENTRY;
-
- if (cfs_list_empty(&lop->lop_urgent))
- RETURN(0);
-
- oap = cfs_list_entry(lop->lop_urgent.next,
- struct osc_async_page, oap_urgent_item);
-
- if (oap->oap_async_flags & ASYNC_HP) {
- CDEBUG(D_CACHE, "hp request forcing RPC\n");
- RETURN(1);
- }
-
- RETURN(0);
-}
-
-static void on_list(cfs_list_t *item, cfs_list_t *list,
- int should_be_on)
-{
- if (cfs_list_empty(item) && should_be_on)
- cfs_list_add_tail(item, list);
- else if (!cfs_list_empty(item) && !should_be_on)
- cfs_list_del_init(item);
-}
-
-/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
- * can find pages to build into rpcs quickly */
-void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
-{
- if (lop_makes_hprpc(&loi->loi_write_lop) ||
- lop_makes_hprpc(&loi->loi_read_lop)) {
- /* HP rpc */
- on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
- on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
- } else {
- on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
- on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
- lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
- lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
- }
-
- on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
- loi->loi_write_lop.lop_num_pending);
-
- on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
- loi->loi_read_lop.lop_num_pending);
-}
-
-static void lop_update_pending(struct client_obd *cli,
- struct loi_oap_pages *lop, int cmd, int delta)
-{
- lop->lop_num_pending += delta;
- if (cmd & OBD_BRW_WRITE)
- cli->cl_pending_w_pages += delta;
- else
- cli->cl_pending_r_pages += delta;
-}
-
-/**
- * this is called when a sync waiter receives an interruption. Its job is to
- * get the caller woken as soon as possible. If its page hasn't been put in an
- * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
- * desiring interruption which will forcefully complete the rpc once the rpc
- * has timed out.
- */
-int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
-{
- struct loi_oap_pages *lop;
- struct lov_oinfo *loi;
- int rc = -EBUSY;
- ENTRY;
-
- LASSERT(!oap->oap_interrupted);
- oap->oap_interrupted = 1;
-
- /* ok, it's been put in an rpc. only one oap gets a request reference */
- if (oap->oap_request != NULL) {
- ptlrpc_mark_interrupted(oap->oap_request);
- ptlrpcd_wake(oap->oap_request);
- ptlrpc_req_finished(oap->oap_request);
- oap->oap_request = NULL;
- }
-
- /*
- * page completion may be called only if ->cpo_prep() method was
- * executed by osc_io_submit(), that also adds page the to pending list
- */
- if (!cfs_list_empty(&oap->oap_pending_item)) {
- cfs_list_del_init(&oap->oap_pending_item);
- cfs_list_del_init(&oap->oap_urgent_item);
-
- loi = oap->oap_loi;
- lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
- &loi->loi_write_lop : &loi->loi_read_lop;
- lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
- loi_list_maint(oap->oap_cli, oap->oap_loi);
- rc = oap->oap_caller_ops->ap_completion(env,
- oap->oap_caller_data,
- oap->oap_cmd, NULL, -EINTR);
- }
-
- RETURN(rc);
-}
-
-/* this is trying to propogate async writeback errors back up to the
- * application. As an async write fails we record the error code for later if
- * the app does an fsync. As long as errors persist we force future rpcs to be
- * sync so that the app can get a sync error and break the cycle of queueing
- * pages for which writeback will fail. */
-static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
- int rc)
-{
- if (rc) {
- if (!ar->ar_rc)
- ar->ar_rc = rc;
-
- ar->ar_force_sync = 1;
- ar->ar_min_xid = ptlrpc_sample_next_xid();
- return;
-
- }
-
- if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
- ar->ar_force_sync = 0;
-}
-
-void osc_oap_to_pending(struct osc_async_page *oap)
-{
- struct loi_oap_pages *lop;
-
- if (oap->oap_cmd & OBD_BRW_WRITE)
- lop = &oap->oap_loi->loi_write_lop;
- else
- lop = &oap->oap_loi->loi_read_lop;
-
- if (oap->oap_async_flags & ASYNC_HP)
- cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
- else if (oap->oap_async_flags & ASYNC_URGENT)
- cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
- cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
- lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
-}
-
-/* this must be called holding the loi list lock to give coverage to exit_cache,
- * async_flag maintenance, and oap_request */
-static void osc_ap_completion(const struct lu_env *env,
- struct client_obd *cli, struct obdo *oa,
- struct osc_async_page *oap, int sent, int rc)
-{
- __u64 xid = 0;
-
- ENTRY;
- if (oap->oap_request != NULL) {
- xid = ptlrpc_req_xid(oap->oap_request);
- ptlrpc_req_finished(oap->oap_request);
- oap->oap_request = NULL;
- }
-
- cfs_spin_lock(&oap->oap_lock);
- oap->oap_async_flags = 0;
- cfs_spin_unlock(&oap->oap_lock);
- oap->oap_interrupted = 0;
-
- if (oap->oap_cmd & OBD_BRW_WRITE) {
- osc_process_ar(&cli->cl_ar, xid, rc);
- osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
- }
-
- if (rc == 0 && oa != NULL) {
- if (oa->o_valid & OBD_MD_FLBLOCKS)
- oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
- if (oa->o_valid & OBD_MD_FLMTIME)
- oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
- if (oa->o_valid & OBD_MD_FLATIME)
- oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
- if (oa->o_valid & OBD_MD_FLCTIME)
- oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
- }
-
- rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
- oap->oap_cmd, oa, rc);
-
- /* cl_page_completion() drops PG_locked. so, a new I/O on the page could
- * start, but OSC calls it under lock and thus we can add oap back to
- * pending safely */
- if (rc)
- /* upper layer wants to leave the page on pending queue */
- osc_oap_to_pending(oap);
- else
- osc_exit_cache(cli, oap, sent);
- EXIT;
-}
-
-static int brw_queue_work(const struct lu_env *env, void *data)
-{
- struct client_obd *cli = data;
-
- CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
-
- client_obd_list_lock(&cli->cl_loi_list_lock);
- osc_check_rpcs0(env, cli, 1);
- client_obd_list_unlock(&cli->cl_loi_list_lock);
- RETURN(0);
-}
-
static int brw_interpret(const struct lu_env *env,
struct ptlrpc_request *req, void *data, int rc)
{
struct osc_brw_async_args *aa = data;
+ struct osc_async_page *oap, *tmp;
struct client_obd *cli;
- int async;
ENTRY;
rc = osc_brw_fini_request(req, rc);
else
cli->cl_r_in_flight--;
- async = cfs_list_empty(&aa->aa_oaps);
- if (!async) { /* from osc_send_oap_rpc() */
- struct osc_async_page *oap, *tmp;
- /* the caller may re-use the oap after the completion call so
- * we need to clean it up a little */
- cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
- oap_rpc_item) {
- cfs_list_del_init(&oap->oap_rpc_item);
- osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
- }
- OBDO_FREE(aa->aa_oa);
- } else { /* from async_internal() */
- obd_count i;
- for (i = 0; i < aa->aa_page_count; i++)
- osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
- }
- osc_wake_cache_waiters(cli);
- osc_check_rpcs0(env, cli, 1);
- client_obd_list_unlock(&cli->cl_loi_list_lock);
+ /* the caller may re-use the oap after the completion call so
+ * we need to clean it up a little */
+ cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
+ oap_rpc_item) {
+ cfs_list_del_init(&oap->oap_rpc_item);
+ osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
+ }
+ OBDO_FREE(aa->aa_oa);
- if (!async)
- cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
- req->rq_bulk->bd_nob_transferred);
- osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
- ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
+ osc_wake_cache_waiters(cli);
+ osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
- RETURN(rc);
+ cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
+ req->rq_bulk->bd_nob_transferred);
+ osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
+ ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
+
+ RETURN(rc);
}
-static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
- struct client_obd *cli,
- cfs_list_t *rpc_list,
- int page_count, int cmd)
+/* The most tricky part of this function is that it will return with
+ * cli->cli_loi_list_lock held.
+ */
+int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
+ cfs_list_t *rpc_list, int page_count, int cmd,
+ pdl_policy_t pol)
{
- struct ptlrpc_request *req;
- struct brw_page **pga = NULL;
- struct osc_brw_async_args *aa;
+ struct ptlrpc_request *req = NULL;
+ struct brw_page **pga = NULL;
+ struct osc_brw_async_args *aa = NULL;
struct obdo *oa = NULL;
- const struct obd_async_page_ops *ops = NULL;
struct osc_async_page *oap;
struct osc_async_page *tmp;
struct cl_req *clerq = NULL;
memset(&crattr, 0, sizeof crattr);
OBD_ALLOC(pga, sizeof(*pga) * page_count);
if (pga == NULL)
- GOTO(out, req = ERR_PTR(-ENOMEM));
-
- OBDO_ALLOC(oa);
- if (oa == NULL)
- GOTO(out, req = ERR_PTR(-ENOMEM));
-
- i = 0;
- cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
- struct cl_page *page = osc_oap2cl_page(oap);
- if (ops == NULL) {
- ops = oap->oap_caller_ops;
-
- clerq = cl_req_alloc(env, page, crt,
- 1 /* only 1-object rpcs for
- * now */);
- if (IS_ERR(clerq))
- GOTO(out, req = (void *)clerq);
+ GOTO(out, rc = -ENOMEM);
+
+ OBDO_ALLOC(oa);
+ if (oa == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ i = 0;
+ cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
+ struct cl_page *page = osc_oap2cl_page(oap);
+ if (clerq == NULL) {
+ clerq = cl_req_alloc(env, page, crt,
+ 1 /* only 1-object rpcs for
+ * now */);
+ if (IS_ERR(clerq))
+ GOTO(out, rc = PTR_ERR(clerq));
lock = oap->oap_ldlm_lock;
}
pga[i] = &oap->oap_brw_page;
}
/* always get the data for the obdo for the rpc */
- LASSERT(ops != NULL);
+ LASSERT(clerq != NULL);
crattr.cra_oa = oa;
crattr.cra_capa = NULL;
memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
rc = cl_req_prep(env, clerq);
if (rc != 0) {
CERROR("cl_req_prep failed: %d\n", rc);
- GOTO(out, req = ERR_PTR(rc));
+ GOTO(out, rc);
}
sort_brw_pages(pga, page_count);
pga, &req, crattr.cra_capa, 1, 0);
if (rc != 0) {
CERROR("prep_req failed: %d\n", rc);
- GOTO(out, req = ERR_PTR(rc));
- }
+ GOTO(out, rc);
+ }
+ req->rq_interpret_reply = brw_interpret;
if (cmd & OBD_BRW_MEMALLOC)
req->rq_memalloc = 1;
cfs_memory_pressure_restore(mpflag);
capa_put(crattr.cra_capa);
- if (IS_ERR(req)) {
+ if (rc != 0) {
+ LASSERT(req == NULL);
+
if (oa)
OBDO_FREE(oa);
if (pga)
oap->oap_count);
continue;
}
- osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
- }
- if (clerq && !IS_ERR(clerq))
- cl_req_completion(env, clerq, PTR_ERR(req));
- }
- RETURN(req);
-}
-
-/**
- * prepare pages for ASYNC io and put pages in send queue.
- *
- * \param cmd OBD_BRW_* macroses
- * \param lop pending pages
- *
- * \return zero if no page added to send queue.
- * \return 1 if pages successfully added to send queue.
- * \return negative on errors.
- */
-static int
-osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
- struct lov_oinfo *loi, int cmd,
- struct loi_oap_pages *lop, pdl_policy_t pol)
-{
- struct ptlrpc_request *req;
- obd_count page_count = 0;
- struct osc_async_page *oap = NULL, *tmp;
- struct osc_brw_async_args *aa;
- const struct obd_async_page_ops *ops;
- CFS_LIST_HEAD(rpc_list);
- int srvlock = 0, mem_tight = 0;
- struct cl_object *clob = NULL;
- obd_off starting_offset = OBD_OBJECT_EOF;
- unsigned int ending_offset;
- int starting_page_off = 0;
- ENTRY;
-
- /* ASYNC_HP pages first. At present, when the lock the pages is
- * to be canceled, the pages covered by the lock will be sent out
- * with ASYNC_HP. We have to send out them as soon as possible. */
- cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
- if (oap->oap_async_flags & ASYNC_HP)
- cfs_list_move(&oap->oap_pending_item, &rpc_list);
- else if (!(oap->oap_brw_flags & OBD_BRW_SYNC))
- /* only do this for writeback pages. */
- cfs_list_move_tail(&oap->oap_pending_item, &rpc_list);
- if (++page_count >= cli->cl_max_pages_per_rpc)
- break;
- }
- cfs_list_splice_init(&rpc_list, &lop->lop_pending);
- page_count = 0;
-
- /* first we find the pages we're allowed to work with */
- cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
- oap_pending_item) {
- ops = oap->oap_caller_ops;
-
- LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
- "magic 0x%x\n", oap, oap->oap_magic);
-
- if (clob == NULL) {
- /* pin object in memory, so that completion call-backs
- * can be safely called under client_obd_list lock. */
- clob = osc_oap2cl_page(oap)->cp_obj;
- cl_object_get(clob);
- }
-
- if (page_count != 0 &&
- srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
- CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
- " oap %p, page %p, srvlock %u\n",
- oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
- break;
- }
-
- /* If there is a gap at the start of this page, it can't merge
- * with any previous page, so we'll hand the network a
- * "fragmented" page array that it can't transfer in 1 RDMA */
- if (oap->oap_obj_off < starting_offset) {
- if (starting_page_off != 0)
- break;
-
- starting_page_off = oap->oap_page_off;
- starting_offset = oap->oap_obj_off + starting_page_off;
- } else if (oap->oap_page_off != 0)
- break;
-
- /* in llite being 'ready' equates to the page being locked
- * until completion unlocks it. commit_write submits a page
- * as not ready because its unlock will happen unconditionally
- * as the call returns. if we race with commit_write giving
- * us that page we don't want to create a hole in the page
- * stream, so we stop and leave the rpc to be fired by
- * another dirtier or kupdated interval (the not ready page
- * will still be on the dirty list). we could call in
- * at the end of ll_file_write to process the queue again. */
- if (!(oap->oap_async_flags & ASYNC_READY)) {
- int rc = ops->ap_make_ready(env, oap->oap_caller_data,
- cmd);
- if (rc < 0)
- CDEBUG(D_INODE, "oap %p page %p returned %d "
- "instead of ready\n", oap,
- oap->oap_page, rc);
- switch (rc) {
- case -EAGAIN:
- /* llite is telling us that the page is still
- * in commit_write and that we should try
- * and put it in an rpc again later. we
- * break out of the loop so we don't create
- * a hole in the sequence of pages in the rpc
- * stream.*/
- oap = NULL;
- break;
- case -EINTR:
- /* the io isn't needed.. tell the checks
- * below to complete the rpc with EINTR */
- cfs_spin_lock(&oap->oap_lock);
- oap->oap_async_flags |= ASYNC_COUNT_STABLE;
- cfs_spin_unlock(&oap->oap_lock);
- oap->oap_count = -EINTR;
- break;
- case 0:
- cfs_spin_lock(&oap->oap_lock);
- oap->oap_async_flags |= ASYNC_READY;
- cfs_spin_unlock(&oap->oap_lock);
- break;
- default:
- LASSERTF(0, "oap %p page %p returned %d "
- "from make_ready\n", oap,
- oap->oap_page, rc);
- break;
- }
- }
- if (oap == NULL)
- break;
-
- /* take the page out of our book-keeping */
- cfs_list_del_init(&oap->oap_pending_item);
- lop_update_pending(cli, lop, cmd, -1);
- cfs_list_del_init(&oap->oap_urgent_item);
-
- /* ask the caller for the size of the io as the rpc leaves. */
- if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
- oap->oap_count =
- ops->ap_refresh_count(env, oap->oap_caller_data,
- cmd);
- LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
- }
- if (oap->oap_count <= 0) {
- CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
- oap->oap_count);
- osc_ap_completion(env, cli, NULL,
- oap, 0, oap->oap_count);
- continue;
- }
-
- /* now put the page back in our accounting */
- cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
- if (page_count++ == 0)
- srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
-
- if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
- mem_tight = 1;
-
- /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
- * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
- * have the same alignment as the initial writes that allocated
- * extents on the server. */
- ending_offset = oap->oap_obj_off + oap->oap_page_off +
- oap->oap_count;
- if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
- break;
-
- if (page_count >= cli->cl_max_pages_per_rpc)
- break;
-
- /* If there is a gap at the end of this page, it can't merge
- * with any subsequent pages, so we'll hand the network a
- * "fragmented" page array that it can't transfer in 1 RDMA */
- if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
- break;
- }
-
- loi_list_maint(cli, loi);
-
- client_obd_list_unlock(&cli->cl_loi_list_lock);
-
- if (clob != NULL)
- cl_object_put(env, clob);
-
- if (page_count == 0) {
- client_obd_list_lock(&cli->cl_loi_list_lock);
- RETURN(0);
- }
-
- req = osc_build_req(env, cli, &rpc_list, page_count,
- mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
- if (IS_ERR(req)) {
- LASSERT(cfs_list_empty(&rpc_list));
- loi_list_maint(cli, loi);
- RETURN(PTR_ERR(req));
- }
-
- aa = ptlrpc_req_async_args(req);
-
- starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
- if (cmd == OBD_BRW_READ) {
- lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
- lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
- lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
- (starting_offset >> CFS_PAGE_SHIFT) + 1);
- } else {
- lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
- lprocfs_oh_tally(&cli->cl_write_rpc_hist,
- cli->cl_w_in_flight);
- lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
- (starting_offset >> CFS_PAGE_SHIFT) + 1);
- }
-
- client_obd_list_lock(&cli->cl_loi_list_lock);
-
- if (cmd == OBD_BRW_READ)
- cli->cl_r_in_flight++;
- else
- cli->cl_w_in_flight++;
-
- /* queued sync pages can be torn down while the pages
- * were between the pending list and the rpc */
- tmp = NULL;
- cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
- /* only one oap gets a request reference */
- if (tmp == NULL)
- tmp = oap;
- if (oap->oap_interrupted && !req->rq_intr) {
- CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
- oap, req);
- ptlrpc_mark_interrupted(req);
- }
- }
- if (tmp != NULL)
- tmp->oap_request = ptlrpc_request_addref(req);
-
- DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
- page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
-
- req->rq_interpret_reply = brw_interpret;
-
- /* XXX: Maybe the caller can check the RPC bulk descriptor to see which
- * CPU/NUMA node the majority of pages were allocated on, and try
- * to assign the async RPC to the CPU core (PDL_POLICY_PREFERRED)
- * to reduce cross-CPU memory traffic.
- *
- * But on the other hand, we expect that multiple ptlrpcd threads
- * and the initial write sponsor can run in parallel, especially
- * when data checksum is enabled, which is CPU-bound operation and
- * single ptlrpcd thread cannot process in time. So more ptlrpcd
- * threads sharing BRW load (with PDL_POLICY_ROUND) seems better.
- */
- ptlrpcd_add_req(req, pol, -1);
- RETURN(1);
-}
-
-#define LOI_DEBUG(LOI, STR, args...) \
- CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
- !cfs_list_empty(&(LOI)->loi_ready_item) || \
- !cfs_list_empty(&(LOI)->loi_hp_ready_item), \
- (LOI)->loi_write_lop.lop_num_pending, \
- !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent), \
- (LOI)->loi_read_lop.lop_num_pending, \
- !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent), \
- args) \
-
-/* This is called by osc_check_rpcs() to find which objects have pages that
- * we could be sending. These lists are maintained by lop_makes_rpc(). */
-struct lov_oinfo *osc_next_loi(struct client_obd *cli)
-{
- ENTRY;
-
- /* First return objects that have blocked locks so that they
- * will be flushed quickly and other clients can get the lock,
- * then objects which have pages ready to be stuffed into RPCs */
- if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
- RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
- struct lov_oinfo, loi_hp_ready_item));
- if (!cfs_list_empty(&cli->cl_loi_ready_list))
- RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
- struct lov_oinfo, loi_ready_item));
-
- /* then if we have cache waiters, return all objects with queued
- * writes. This is especially important when many small files
- * have filled up the cache and not been fired into rpcs because
- * they don't pass the nr_pending/object threshhold */
- if (!cfs_list_empty(&cli->cl_cache_waiters) &&
- !cfs_list_empty(&cli->cl_loi_write_list))
- RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
- struct lov_oinfo, loi_write_item));
-
- /* then return all queued objects when we have an invalid import
- * so that they get flushed */
- if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
- if (!cfs_list_empty(&cli->cl_loi_write_list))
- RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
- struct lov_oinfo,
- loi_write_item));
- if (!cfs_list_empty(&cli->cl_loi_read_list))
- RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
- struct lov_oinfo, loi_read_item));
- }
- RETURN(NULL);
-}
-
-static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
-{
- struct osc_async_page *oap;
- int hprpc = 0;
-
- if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
- oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
- struct osc_async_page, oap_urgent_item);
- hprpc = !!(oap->oap_async_flags & ASYNC_HP);
- }
-
- if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
- oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
- struct osc_async_page, oap_urgent_item);
- hprpc = !!(oap->oap_async_flags & ASYNC_HP);
- }
-
- return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
-}
-
-/* called with the loi list lock held */
-static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli, int ptlrpc)
-{
- struct lov_oinfo *loi;
- int rc = 0, race_counter = 0;
- pdl_policy_t pol;
- ENTRY;
-
- pol = ptlrpc ? PDL_POLICY_SAME : PDL_POLICY_ROUND;
-
- while ((loi = osc_next_loi(cli)) != NULL) {
- LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
-
- if (osc_max_rpc_in_flight(cli, loi))
- break;
-
- /* attempt some read/write balancing by alternating between
- * reads and writes in an object. The makes_rpc checks here
- * would be redundant if we were getting read/write work items
- * instead of objects. we don't want send_oap_rpc to drain a
- * partial read pending queue when we're given this object to
- * do io on writes while there are cache waiters */
- if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
- rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
- &loi->loi_write_lop, pol);
- if (rc < 0) {
- CERROR("Write request failed with %d\n", rc);
-
- /* osc_send_oap_rpc failed, mostly because of
- * memory pressure.
- *
- * It can't break here, because if:
- * - a page was submitted by osc_io_submit, so
- * page locked;
- * - no request in flight
- * - no subsequent request
- * The system will be in live-lock state,
- * because there is no chance to call
- * osc_io_unplug() and osc_check_rpcs() any
- * more. pdflush can't help in this case,
- * because it might be blocked at grabbing
- * the page lock as we mentioned.
- *
- * Anyway, continue to drain pages. */
- /* break; */
- }
-
- if (rc > 0)
- race_counter = 0;
- else if (rc == 0)
- race_counter++;
- }
- if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
- rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
- &loi->loi_read_lop, pol);
- if (rc < 0)
- CERROR("Read request failed with %d\n", rc);
-
- if (rc > 0)
- race_counter = 0;
- else if (rc == 0)
- race_counter++;
- }
-
- /* attempt some inter-object balancing by issuing rpcs
- * for each object in turn */
- if (!cfs_list_empty(&loi->loi_hp_ready_item))
- cfs_list_del_init(&loi->loi_hp_ready_item);
- if (!cfs_list_empty(&loi->loi_ready_item))
- cfs_list_del_init(&loi->loi_ready_item);
- if (!cfs_list_empty(&loi->loi_write_item))
- cfs_list_del_init(&loi->loi_write_item);
- if (!cfs_list_empty(&loi->loi_read_item))
- cfs_list_del_init(&loi->loi_read_item);
-
- loi_list_maint(cli, loi);
-
- /* send_oap_rpc fails with 0 when make_ready tells it to
- * back off. llite's make_ready does this when it tries
- * to lock a page queued for write that is already locked.
- * we want to try sending rpcs from many objects, but we
- * don't want to spin failing with 0. */
- if (race_counter == 10)
- break;
- }
-}
-
-void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
-{
- osc_check_rpcs0(env, cli, 0);
-}
-
-/**
- * Non-blocking version of osc_enter_cache() that consumes grant only when it
- * is available.
- */
-int osc_enter_cache_try(const struct lu_env *env,
- struct client_obd *cli, struct lov_oinfo *loi,
- struct osc_async_page *oap, int transient)
-{
- int has_grant;
-
- has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
- if (has_grant) {
- osc_consume_write_grant(cli, &oap->oap_brw_page);
- if (transient) {
- cli->cl_dirty_transit += CFS_PAGE_SIZE;
- cfs_atomic_inc(&obd_dirty_transit_pages);
- oap->oap_brw_flags |= OBD_BRW_NOCACHE;
- }
- }
- return has_grant;
-}
-
-/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
- * grant or cache space. */
-static int osc_enter_cache(const struct lu_env *env,
- struct client_obd *cli, struct lov_oinfo *loi,
- struct osc_async_page *oap)
-{
- struct osc_cache_waiter ocw;
- struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
- int rc = -EDQUOT;
- ENTRY;
-
- CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
- "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
- cli->cl_dirty_max, obd_max_dirty_pages,
- cli->cl_lost_grant, cli->cl_avail_grant);
-
- /* force the caller to try sync io. this can jump the list
- * of queued writes and create a discontiguous rpc stream */
- if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
- cli->cl_dirty_max < CFS_PAGE_SIZE ||
- cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
- RETURN(-EDQUOT);
-
- /* Hopefully normal case - cache space and write credits available */
- if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
- cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
- osc_enter_cache_try(env, cli, loi, oap, 0))
- RETURN(0);
-
- /* We can get here for two reasons: too many dirty pages in cache, or
- * run out of grants. In both cases we should write dirty pages out.
- * Adding a cache waiter will trigger urgent write-out no matter what
- * RPC size will be.
- * The exiting condition is no avail grants and no dirty pages caching,
- * that really means there is no space on the OST. */
- cfs_waitq_init(&ocw.ocw_waitq);
- ocw.ocw_oap = oap;
- while (cli->cl_dirty > 0) {
- cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
- ocw.ocw_rc = 0;
-
- loi_list_maint(cli, loi);
- osc_check_rpcs(env, cli);
- client_obd_list_unlock(&cli->cl_loi_list_lock);
-
- CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
- cli->cl_import->imp_obd->obd_name, &ocw, oap);
-
- rc = l_wait_event(ocw.ocw_waitq, cfs_list_empty(&ocw.ocw_entry), &lwi);
-
- client_obd_list_lock(&cli->cl_loi_list_lock);
- cfs_list_del_init(&ocw.ocw_entry);
- if (rc < 0)
- break;
-
- rc = ocw.ocw_rc;
- if (rc != -EDQUOT)
- break;
- }
-
- RETURN(rc);
-}
-
-
-int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
- struct lov_oinfo *loi, cfs_page_t *page,
- obd_off offset, const struct obd_async_page_ops *ops,
- void *data, void **res, int nocache,
- struct lustre_handle *lockh)
-{
- struct osc_async_page *oap;
-
- ENTRY;
-
- if (!page)
- return cfs_size_round(sizeof(*oap));
-
- oap = *res;
- oap->oap_magic = OAP_MAGIC;
- oap->oap_cli = &exp->exp_obd->u.cli;
- oap->oap_loi = loi;
-
- oap->oap_caller_ops = ops;
- oap->oap_caller_data = data;
-
- oap->oap_page = page;
- oap->oap_obj_off = offset;
- if (!client_is_remote(exp) &&
- cfs_capable(CFS_CAP_SYS_RESOURCE))
- oap->oap_brw_flags = OBD_BRW_NOQUOTA;
-
- LASSERT(!(offset & ~CFS_PAGE_MASK));
-
- CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
- CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
- CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
- CFS_INIT_LIST_HEAD(&oap->oap_page_list);
-
- cfs_spin_lock_init(&oap->oap_lock);
- CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
- RETURN(0);
-}
-
-int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
- struct lov_stripe_md *lsm, struct lov_oinfo *loi,
- struct osc_async_page *oap, int cmd, int off,
- int count, obd_flag brw_flags, enum async_flags async_flags)
-{
- struct client_obd *cli = &exp->exp_obd->u.cli;
- int rc = 0;
- ENTRY;
-
- if (oap->oap_magic != OAP_MAGIC)
- RETURN(-EINVAL);
-
- if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
- RETURN(-EIO);
-
- if (!cfs_list_empty(&oap->oap_pending_item) ||
- !cfs_list_empty(&oap->oap_urgent_item) ||
- !cfs_list_empty(&oap->oap_rpc_item))
- RETURN(-EBUSY);
-
- /* check if the file's owner/group is over quota */
- if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
- struct cl_object *obj;
- struct cl_attr attr; /* XXX put attr into thread info */
- unsigned int qid[MAXQUOTAS];
-
- obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
-
- cl_object_attr_lock(obj);
- rc = cl_object_attr_get(env, obj, &attr);
- cl_object_attr_unlock(obj);
-
- qid[USRQUOTA] = attr.cat_uid;
- qid[GRPQUOTA] = attr.cat_gid;
- if (rc == 0 &&
- osc_quota_chkdq(cli, qid) == NO_QUOTA)
- rc = -EDQUOT;
- if (rc)
- RETURN(rc);
- }
-
- if (loi == NULL)
- loi = lsm->lsm_oinfo[0];
-
- client_obd_list_lock(&cli->cl_loi_list_lock);
-
- LASSERT(off + count <= CFS_PAGE_SIZE);
- oap->oap_cmd = cmd;
- oap->oap_page_off = off;
- oap->oap_count = count;
- oap->oap_brw_flags = brw_flags;
- /* Give a hint to OST that requests are coming from kswapd - bug19529 */
- if (cfs_memory_pressure_get())
- oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
- cfs_spin_lock(&oap->oap_lock);
- oap->oap_async_flags = async_flags;
- cfs_spin_unlock(&oap->oap_lock);
-
- if (cmd & OBD_BRW_WRITE) {
- rc = osc_enter_cache(env, cli, loi, oap);
- if (rc) {
- client_obd_list_unlock(&cli->cl_loi_list_lock);
- RETURN(rc);
- }
- }
-
- LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
- cmd);
-
- osc_oap_to_pending(oap);
- loi_list_maint(cli, loi);
- if (!osc_max_rpc_in_flight(cli, loi) &&
- lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
- LASSERT(cli->cl_writeback_work != NULL);
- rc = ptlrpcd_queue_work(cli->cl_writeback_work);
-
- CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n",
- cli, rc);
- }
- client_obd_list_unlock(&cli->cl_loi_list_lock);
-
- RETURN(0);
-}
-
-/* aka (~was & now & flag), but this is more clear :) */
-#define SETTING(was, now, flag) (!(was & flag) && (now & flag))
-
-int osc_set_async_flags_base(struct client_obd *cli,
- struct lov_oinfo *loi, struct osc_async_page *oap,
- obd_flag async_flags)
-{
- struct loi_oap_pages *lop;
- int flags = 0;
- ENTRY;
-
- LASSERT(!cfs_list_empty(&oap->oap_pending_item));
-
- if (oap->oap_cmd & OBD_BRW_WRITE) {
- lop = &loi->loi_write_lop;
- } else {
- lop = &loi->loi_read_lop;
- }
-
- if ((oap->oap_async_flags & async_flags) == async_flags)
- RETURN(0);
-
- if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
- flags |= ASYNC_READY;
-
- if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
- cfs_list_empty(&oap->oap_rpc_item)) {
- if (oap->oap_async_flags & ASYNC_HP)
- cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
- else
- cfs_list_add_tail(&oap->oap_urgent_item,
- &lop->lop_urgent);
- flags |= ASYNC_URGENT;
- loi_list_maint(cli, loi);
- }
- cfs_spin_lock(&oap->oap_lock);
- oap->oap_async_flags |= flags;
- cfs_spin_unlock(&oap->oap_lock);
-
- LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
- oap->oap_async_flags);
- RETURN(0);
-}
-
-int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
- struct lov_oinfo *loi, struct osc_async_page *oap)
-{
- struct client_obd *cli = &exp->exp_obd->u.cli;
- struct loi_oap_pages *lop;
- int rc = 0;
- ENTRY;
-
- if (oap->oap_magic != OAP_MAGIC)
- RETURN(-EINVAL);
-
- if (loi == NULL)
- loi = lsm->lsm_oinfo[0];
-
- if (oap->oap_cmd & OBD_BRW_WRITE) {
- lop = &loi->loi_write_lop;
- } else {
- lop = &loi->loi_read_lop;
- }
-
- client_obd_list_lock(&cli->cl_loi_list_lock);
-
- if (!cfs_list_empty(&oap->oap_rpc_item))
- GOTO(out, rc = -EBUSY);
-
- osc_exit_cache(cli, oap, 0);
- osc_wake_cache_waiters(cli);
-
- if (!cfs_list_empty(&oap->oap_urgent_item)) {
- cfs_list_del_init(&oap->oap_urgent_item);
- cfs_spin_lock(&oap->oap_lock);
- oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
- cfs_spin_unlock(&oap->oap_lock);
- }
- if (!cfs_list_empty(&oap->oap_pending_item)) {
- cfs_list_del_init(&oap->oap_pending_item);
- lop_update_pending(cli, lop, oap->oap_cmd, -1);
- }
- loi_list_maint(cli, loi);
- LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
-out:
- client_obd_list_unlock(&cli->cl_loi_list_lock);
- RETURN(rc);
+ osc_ap_completion(env, cli, NULL, oap, 0, rc);
+ }
+ if (clerq && !IS_ERR(clerq))
+ cl_req_completion(env, clerq, rc);
+ } else {
+ struct osc_async_page *tmp = NULL;
+
+ /* queued sync pages can be torn down while the pages
+ * were between the pending list and the rpc */
+ LASSERT(aa != NULL);
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
+ /* only one oap gets a request reference */
+ if (tmp == NULL)
+ tmp = oap;
+ if (oap->oap_interrupted && !req->rq_intr) {
+ CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
+ oap, req);
+ ptlrpc_mark_interrupted(req);
+ }
+ }
+ if (tmp != NULL)
+ tmp->oap_request = ptlrpc_request_addref(req);
+
+ DEBUG_REQ(D_INODE,req, "%d pages, aa %p. now %dr/%dw in flight",
+ page_count, aa, cli->cl_r_in_flight,
+ cli->cl_w_in_flight);
+
+ /* XXX: Maybe the caller can check the RPC bulk descriptor to
+ * see which CPU/NUMA node the majority of pages were allocated
+ * on, and try to assign the async RPC to the CPU core
+ * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
+ *
+ * But on the other hand, we expect that multiple ptlrpcd
+ * threads and the initial write sponsor can run in parallel,
+ * especially when data checksum is enabled, which is CPU-bound
+ * operation and single ptlrpcd thread cannot process in time.
+ * So more ptlrpcd threads sharing BRW load
+ * (with PDL_POLICY_ROUND) seems better.
+ */
+ ptlrpcd_add_req(req, pol, -1);
+ }
+ RETURN(rc);
}
static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
client_obd_list_lock(&cli->cl_loi_list_lock);
/* all pages go to failing rpcs due to the invalid
* import */
- osc_check_rpcs(env, cli);
+ osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
client_obd_list_unlock(&cli->cl_loi_list_lock);
ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
RETURN(0);
}
+static int brw_queue_work(const struct lu_env *env, void *data)
+{
+ struct client_obd *cli = data;
+
+ CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
+
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+ RETURN(0);
+}
+
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
struct client_obd *cli = &obd->u.cli;