/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011 Whamcloud, Inc.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * osc cache management.
 *
 * Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
 */

#define DEBUG_SUBSYSTEM S_OSC

#include "osc_cl_internal.h"
#include "osc_internal.h"

static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
                           struct osc_async_page *oap);
static int osc_enter_cache_try(const struct lu_env *env, struct client_obd *cli,
                               struct osc_async_page *oap, int transient);
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent);

#define OSC_IO_DEBUG(OSC, STR, args...)                                \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,         \
               !cfs_list_empty(&(OSC)->oo_ready_item) ||               \
               !cfs_list_empty(&(OSC)->oo_hp_ready_item),              \
               (OSC)->oo_write_pages.oop_num_pending,                  \
               !cfs_list_empty(&(OSC)->oo_write_pages.oop_urgent),     \
               (OSC)->oo_read_pages.oop_num_pending,                   \
               !cfs_list_empty(&(OSC)->oo_read_pages.oop_urgent),      \
               args)

static inline struct osc_page *oap2osc_page(struct osc_async_page *oap)
{
        return (struct osc_page *)container_of(oap, struct osc_page, ops_oap);
}

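/* Make a queued async write page ready for transfer and record the time
 * it was handed to the RPC layer. Only writes go through here: reads are
 * never cached, as the assertion below documents. */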
static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
                          int cmd)
{
        struct osc_page *opg = oap2osc_page(oap);
        struct cl_page  *page = cl_page_top(opg->ops_cl.cpl_page);
        int result;
        ENTRY;

        LASSERT(cmd == OBD_BRW_WRITE); /* no cached reads */

        result = cl_page_make_ready(env, page, CRT_WRITE);
        if (result == 0)
                opg->ops_submit_time = cfs_time_current();
        RETURN(result);
}

static int osc_refresh_count(const struct lu_env *env,
                             struct osc_async_page *oap, int cmd)
{
        struct osc_page  *opg = oap2osc_page(oap);
        struct cl_page   *page;
        struct cl_object *obj;
        struct cl_attr   *attr = &osc_env_info(env)->oti_attr;
        int result;
        loff_t kms;

        /* readpage queues with _COUNT_STABLE, shouldn't get here. */
        LASSERT(!(cmd & OBD_BRW_READ));
        LASSERT(opg != NULL);
        page = opg->ops_cl.cpl_page;
        obj = opg->ops_cl.cpl_obj;

        cl_object_attr_lock(obj);
        result = cl_object_attr_get(env, obj, attr);
        cl_object_attr_unlock(obj);
        if (result < 0)
                return result;
        kms = attr->cat_kms;
        if (cl_offset(obj, page->cp_index) >= kms)
                /* catch race with truncate */
                return 0;
        else if (cl_offset(obj, page->cp_index + 1) > kms)
                /* catch sub-page write at end of file */
                return kms % CFS_PAGE_SIZE;
        else
                return CFS_PAGE_SIZE;
}

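/* Transfer completion handler for a single page: detach the page from its
 * cl_req, clear the async flags and the transfer pin, remove the page from
 * the object's in-flight list, update lockless IO statistics and drop the
 * "transfer" reference that kept the page alive. */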
static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
                          int cmd, struct obdo *oa, int rc)
{
        struct osc_page   *opg  = oap2osc_page(oap);
        struct cl_page    *page = cl_page_top(opg->ops_cl.cpl_page);
        struct osc_object *obj  = cl2osc(opg->ops_cl.cpl_obj);
        enum cl_req_type   crt;
        int srvlock;

        ENTRY;

        cmd &= ~OBD_BRW_NOQUOTA;
        LASSERT(equi(page->cp_state == CPS_PAGEIN,  cmd == OBD_BRW_READ));
        LASSERT(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE));
        LASSERT(opg->ops_transfer_pinned);

        /*
         * page->cp_req can be NULL if io submission failed before
         * cl_req was allocated.
         */
        if (page->cp_req != NULL)
                cl_req_page_done(env, page);
        LASSERT(page->cp_req == NULL);

        /* As the transfer for this page is being done, clear the flags */
        cfs_spin_lock(&oap->oap_lock);
        oap->oap_async_flags = 0;
        cfs_spin_unlock(&oap->oap_lock);

        crt = cmd == OBD_BRW_READ ? CRT_READ : CRT_WRITE;
        /* Clear opg->ops_transfer_pinned before VM lock is released. */
        opg->ops_transfer_pinned = 0;

        cfs_spin_lock(&obj->oo_seatbelt);
        LASSERT(opg->ops_submitter != NULL);
        LASSERT(!cfs_list_empty(&opg->ops_inflight));
        cfs_list_del_init(&opg->ops_inflight);
        cfs_spin_unlock(&obj->oo_seatbelt);

        opg->ops_submit_time = 0;
        srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;

        cl_page_completion(env, page, crt, rc);

        /* statistic */
        if (rc == 0 && srvlock) {
                struct lu_device *ld    = opg->ops_cl.cpl_obj->co_lu.lo_dev;
                struct osc_stats *stats = &lu2osc_dev(ld)->od_stats;
                int bytes = oap->oap_count;

                if (crt == CRT_READ)
                        stats->os_lockless_reads += bytes;
                else
                        stats->os_lockless_writes += bytes;
        }

        /*
         * This has to be the last operation with the page, as locks are
         * released in cl_page_completion() and nothing except for the
         * reference counter protects page from concurrent reclaim.
         */
        lu_ref_del(&page->cp_reference, "transfer", page);
        /*
         * As page->cp_obj is pinned by a reference from page->cp_req, it is
         * safe to call cl_page_put() without risking object destruction in a
         * non-blocking context.
         */
        cl_page_put(env, page);
        RETURN(0);
}

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        cfs_atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
        osc_update_next_shrink(cli);
}

/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        cfs_atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                cfs_atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                /* Reclaim grant from truncated pages. This is used to solve
                 * the write-truncate and grant-all-gone (to lost_grant)
                 * problem. For a vfs write this problem can be easily solved
                 * by a sync write, however, this is not an option for
                 * page_mkwrite() because grant has to be allocated before a
                 * page becomes dirty. */
                if (cli->cl_avail_grant < PTLRPC_MAX_BRW_SIZE)
                        cli->cl_avail_grant += CFS_PAGE_SIZE;
                else
                        cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong. Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}

/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting. Writeback completes or truncate happens before
 * writing starts. Must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}

/**
 * Non-blocking version of osc_enter_cache() that consumes grant only when it
 * is available.
 */
static int osc_enter_cache_try(const struct lu_env *env, struct client_obd *cli,
                               struct osc_async_page *oap, int transient)
{
        int has_grant;

        has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
        if (has_grant) {
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                if (transient) {
                        cli->cl_dirty_transit += CFS_PAGE_SIZE;
                        cfs_atomic_inc(&obd_dirty_transit_pages);
                        oap->oap_brw_flags |= OBD_BRW_NOCACHE;
                }
        }
        return has_grant;
}

/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space. */
static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
                           struct osc_async_page *oap)
{
        struct osc_object *osc = oap->oap_obj;
        struct lov_oinfo  *loi = osc->oo_oinfo;
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
        int rc = -EDQUOT;
        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io. this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
            cli->cl_dirty_max < CFS_PAGE_SIZE ||
            cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
            cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
            osc_enter_cache_try(env, cli, oap, 0))
                RETURN(0);

        /* We can get here for two reasons: too many dirty pages in cache, or
         * run out of grants. In both cases we should write dirty pages out.
         * Adding a cache waiter will trigger urgent write-out no matter what
         * RPC size will be.
         * The exiting condition is no avail grants and no dirty pages caching,
         * that really means there is no space on the OST. */
        cfs_waitq_init(&ocw.ocw_waitq);
        ocw.ocw_oap = oap;
        while (cli->cl_dirty > 0) {
                cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                ocw.ocw_rc = 0;

                osc_io_unplug(env, cli, osc, PDL_POLICY_ROUND);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
                       cli->cl_import->imp_obd->obd_name, &ocw, oap);

                rc = l_wait_event(ocw.ocw_waitq, cfs_list_empty(&ocw.ocw_entry),
                                  &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                cfs_list_del_init(&ocw.ocw_entry);
                if (rc < 0)
                        break;

                rc = ocw.ocw_rc;
                if (rc != -EDQUOT)
                        break;
        }

        RETURN(rc);
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        cfs_list_t *l, *tmp;
        struct osc_cache_waiter *ocw;
        ENTRY;

        cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                    (cfs_atomic_read(&obd_dirty_pages) + 1 >
                     obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
                cfs_list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld\n",
                       ocw, ocw->ocw_oap, cli->cl_avail_grant);

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

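/* Return true when the client is already at its RPC concurrency limit.
 * A queued high-priority page raises the limit by one so that lock
 * cancellation writes can always make progress. */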
static int osc_max_rpc_in_flight(struct client_obd *cli, struct osc_object *osc)
{
        struct osc_async_page *oap;
        int hprpc = 0;

        if (!cfs_list_empty(&osc->oo_write_pages.oop_urgent)) {
                oap = cfs_list_entry(osc->oo_write_pages.oop_urgent.next,
                                     struct osc_async_page, oap_urgent_item);
                hprpc = !!(oap->oap_async_flags & ASYNC_HP);
        }

        if (!hprpc && !cfs_list_empty(&osc->oo_read_pages.oop_urgent)) {
                oap = cfs_list_entry(osc->oo_read_pages.oop_urgent.next,
                                     struct osc_async_page, oap_urgent_item);
                hprpc = !!(oap->oap_async_flags & ASYNC_HP);
        }

        return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
}

/* This maintains the lists of pending pages to read/write for a given object
 * (lop). This is used by osc_check_rpcs->osc_next_obj() and osc_list_maint()
 * to quickly find objects that are ready to send an RPC. */
static int osc_makes_rpc(struct client_obd *cli, struct osc_object *osc,
                         int cmd)
{
        struct osc_oap_pages *lop;
        ENTRY;

        if (cmd & OBD_BRW_WRITE) {
                lop = &osc->oo_write_pages;
        } else {
                lop = &osc->oo_read_pages;
        }

        if (lop->oop_num_pending == 0)
                RETURN(0);

        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages. recovery relies on this to empty the queued pages
         * before canceling the locks and evicting down the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(1);

        /* stream rpcs in queue order as long as there is an urgent page
         * queued. this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
        if (!cfs_list_empty(&lop->oop_urgent)) {
                CDEBUG(D_CACHE, "urgent request forcing RPC\n");
                RETURN(1);
        }

        if (cmd & OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space. as they're waiting, they're not going to
                 * create more pages to coalesce with what's waiting.. */
                if (!cfs_list_empty(&cli->cl_cache_waiters)) {
                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
                        RETURN(1);
                }
        }
        if (lop->oop_num_pending >= cli->cl_max_pages_per_rpc)
                RETURN(1);

        RETURN(0);
}

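/* Adjust the pending-page counter of @lop and the matching per-client
 * pending read or write counter by @delta (positive when queueing pages,
 * negative when removing them). */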
static void lop_update_pending(struct client_obd *cli,
                               struct osc_oap_pages *lop, int cmd, int delta)
{
        lop->oop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;
        else
                cli->cl_pending_r_pages += delta;
}

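/* Return true if the first page on @lop's urgent list is marked ASYNC_HP,
 * i.e. a high-priority RPC should be sent for this list right away. */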
static int osc_makes_hprpc(struct osc_oap_pages *lop)
{
        struct osc_async_page *oap;
        ENTRY;

        if (cfs_list_empty(&lop->oop_urgent))
                RETURN(0);

        oap = cfs_list_entry(lop->oop_urgent.next,
                             struct osc_async_page, oap_urgent_item);

        if (oap->oap_async_flags & ASYNC_HP) {
                CDEBUG(D_CACHE, "hp request forcing RPC\n");
                RETURN(1);
        }

        RETURN(0);
}

static void on_list(cfs_list_t *item, cfs_list_t *list, int should_be_on)
{
        if (cfs_list_empty(item) && should_be_on)
                cfs_list_add_tail(item, list);
        else if (!cfs_list_empty(item) && !should_be_on)
                cfs_list_del_init(item);
}

/* maintain the osc's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly */
static void osc_list_maint(struct client_obd *cli, struct osc_object *osc)
{
        if (osc_makes_hprpc(&osc->oo_write_pages) ||
            osc_makes_hprpc(&osc->oo_read_pages)) {
                /* HP rpc */
                on_list(&osc->oo_ready_item, &cli->cl_loi_ready_list, 0);
                on_list(&osc->oo_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
        } else {
                on_list(&osc->oo_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
                on_list(&osc->oo_ready_item, &cli->cl_loi_ready_list,
                        osc_makes_rpc(cli, osc, OBD_BRW_WRITE) ||
                        osc_makes_rpc(cli, osc, OBD_BRW_READ));
        }

        on_list(&osc->oo_write_item, &cli->cl_loi_write_list,
                osc->oo_write_pages.oop_num_pending);

        on_list(&osc->oo_read_item, &cli->cl_loi_read_list,
                osc->oo_read_pages.oop_num_pending);
}

/* this is trying to propagate async writeback errors back up to the
 * application. As an async write fails we record the error code for later if
 * the app does an fsync. As long as errors persist we force future rpcs to be
 * sync so that the app can get a sync error and break the cycle of queueing
 * pages for which writeback will fail. */
static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
                           int rc)
{
        if (rc) {
                if (!ar->ar_rc)
                        ar->ar_rc = rc;

                ar->ar_force_sync = 1;
                ar->ar_min_xid = ptlrpc_sample_next_xid();
                return;
        }

        if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
                ar->ar_force_sync = 0;
}

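/* Insert @oap on its object's pending list: ASYNC_HP pages jump to the
 * head of the urgent list, ASYNC_URGENT pages go to its tail, and the
 * pending-page accounting is bumped accordingly. */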
static void osc_oap_to_pending(struct osc_async_page *oap)
{
        struct osc_object    *osc = oap->oap_obj;
        struct osc_oap_pages *lop;

        if (oap->oap_cmd & OBD_BRW_WRITE)
                lop = &osc->oo_write_pages;
        else
                lop = &osc->oo_read_pages;

        if (oap->oap_async_flags & ASYNC_HP)
                cfs_list_add(&oap->oap_urgent_item, &lop->oop_urgent);
        else if (oap->oap_async_flags & ASYNC_URGENT)
                cfs_list_add_tail(&oap->oap_urgent_item, &lop->oop_urgent);
        cfs_list_add_tail(&oap->oap_pending_item, &lop->oop_pending);
        lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
}

/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request */
void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
                       struct obdo *oa, struct osc_async_page *oap,
                       int sent, int rc)
{
        struct osc_object *osc = oap->oap_obj;
        struct lov_oinfo  *loi = osc->oo_oinfo;
        __u64 xid = 0;
        ENTRY;

        if (oap->oap_request != NULL) {
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        cfs_spin_lock(&oap->oap_lock);
        oap->oap_async_flags = 0;
        cfs_spin_unlock(&oap->oap_lock);
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&loi->loi_ar, xid, rc);
        }

        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        rc = osc_completion(env, oap, oap->oap_cmd, oa, rc);

        /* cl_page_completion() drops PG_locked. so, a new I/O on the page could
         * start, but OSC calls it under lock and thus we can add oap back to
         * pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}

/**
 * prepare pages for ASYNC io and put pages in send queue.
 *
 * \param cmd OBD_BRW_* macros
 * \param lop pending pages
 *
 * \return zero if no page added to send queue.
 * \return 1 if pages successfully added to send queue.
 * \return negative on errors.
 */
static int
osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
                 struct osc_object *osc, int cmd,
                 struct osc_oap_pages *lop, pdl_policy_t pol)
{
        obd_count page_count = 0;
        struct osc_async_page *oap = NULL, *tmp;
        CFS_LIST_HEAD(rpc_list);
        int srvlock = 0, mem_tight = 0;
        obd_off starting_offset = OBD_OBJECT_EOF;
        unsigned int ending_offset;
        int starting_page_off = 0;
        int rc;
        ENTRY;

        /* ASYNC_HP pages first. At present, when the lock on the pages is
         * to be canceled, the pages covered by the lock will be sent out
         * with ASYNC_HP. We have to send them out as soon as possible. */
        cfs_list_for_each_entry_safe(oap, tmp, &lop->oop_urgent, oap_urgent_item) {
                if (oap->oap_async_flags & ASYNC_HP)
                        cfs_list_move(&oap->oap_pending_item, &rpc_list);
                else if (!(oap->oap_brw_flags & OBD_BRW_SYNC))
                        /* only do this for writeback pages. */
                        cfs_list_move_tail(&oap->oap_pending_item, &rpc_list);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;
        }
        cfs_list_splice_init(&rpc_list, &lop->oop_pending);
        page_count = 0;

        /* first we find the pages we're allowed to work with */
        cfs_list_for_each_entry_safe(oap, tmp, &lop->oop_pending,
                                     oap_pending_item) {
                LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
                         "magic 0x%x\n", oap, oap->oap_magic);

                if (page_count != 0 &&
                    srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
                        CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
                               " oap %p, page %p, srvlock %u\n",
                               oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
                        break;
                }

                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_obj_off < starting_offset) {
                        if (starting_page_off != 0)
                                break;

                        starting_page_off = oap->oap_page_off;
                        starting_offset = oap->oap_obj_off + starting_page_off;
                } else if (oap->oap_page_off != 0)
                        break;

                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it. commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns. if we race with commit_write giving
                 * us that page we don't want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list). we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = osc_make_ready(env, oap, cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                       "instead of ready\n", oap,
                                       oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later. we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream. */
                                oap = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                cfs_spin_lock(&oap->oap_lock);
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                cfs_spin_unlock(&oap->oap_lock);
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                cfs_spin_lock(&oap->oap_lock);
                                oap->oap_async_flags |= ASYNC_READY;
                                cfs_spin_unlock(&oap->oap_lock);
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                         "from make_ready\n", oap,
                                         oap->oap_page, rc);
                                break;
                        }
                }
                if (oap == NULL)
                        break;

                /* take the page out of our book-keeping */
                cfs_list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                cfs_list_del_init(&oap->oap_urgent_item);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
                        oap->oap_count = osc_refresh_count(env, oap, cmd);
                        LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
                }
                if (oap->oap_count <= 0) {
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(env, cli, NULL,
                                          oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (page_count++ == 0)
                        srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);

                if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
                        mem_tight = 1;

                /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = oap->oap_obj_off + oap->oap_page_off +
                                oap->oap_count;
                if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
                        break;

                if (page_count >= cli->cl_max_pages_per_rpc)
                        break;

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
                        break;
        }

        osc_list_maint(cli, osc);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        if (page_count == 0) {
                client_obd_list_lock(&cli->cl_loi_list_lock);
                RETURN(0);
        }

        if (mem_tight)
                cmd |= OBD_BRW_MEMALLOC;
        rc = osc_build_rpc(env, cli, &rpc_list, page_count, cmd, pol);
        if (rc != 0) {
                LASSERT(cfs_list_empty(&rpc_list));
                osc_list_maint(cli, osc);
                RETURN(rc);
        }

        starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
        if (cmd == OBD_BRW_READ) {
                cli->cl_r_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
        } else {
                cli->cl_w_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
        }

        RETURN(1);
}

#define list_to_obj(list, item) \
        cfs_list_entry((list)->next, struct osc_object, oo_##item)

/* This is called by osc_check_rpcs() to find which objects have pages that
 * we could be sending. These lists are maintained by osc_makes_rpc(). */
static struct osc_object *osc_next_obj(struct client_obd *cli)
{
        ENTRY;

        /* First return objects that have blocked locks so that they
         * will be flushed quickly and other clients can get the lock,
         * then objects which have pages ready to be stuffed into RPCs */
        if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
                RETURN(list_to_obj(&cli->cl_loi_hp_ready_list, hp_ready_item));
        if (!cfs_list_empty(&cli->cl_loi_ready_list))
                RETURN(list_to_obj(&cli->cl_loi_ready_list, ready_item));

        /* then if we have cache waiters, return all objects with queued
         * writes. This is especially important when many small files
         * have filled up the cache and not been fired into rpcs because
         * they don't pass the nr_pending/object threshold */
        if (!cfs_list_empty(&cli->cl_cache_waiters) &&
            !cfs_list_empty(&cli->cl_loi_write_list))
                RETURN(list_to_obj(&cli->cl_loi_write_list, write_item));

        /* then return all queued objects when we have an invalid import
         * so that they get flushed */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
                if (!cfs_list_empty(&cli->cl_loi_write_list))
                        RETURN(list_to_obj(&cli->cl_loi_write_list,
                                           write_item));
                if (!cfs_list_empty(&cli->cl_loi_read_list))
                        RETURN(list_to_obj(&cli->cl_loi_read_list,
                                           read_item));
        }
        RETURN(NULL);
}

/* called with the loi list lock held */
static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
                           pdl_policy_t pol)
{
        struct osc_object *osc;
        int rc = 0, race_counter = 0;
        ENTRY;

        while ((osc = osc_next_obj(cli)) != NULL) {
                OSC_IO_DEBUG(osc, "%lu in flight\n", rpcs_in_flight(cli));

                if (osc_max_rpc_in_flight(cli, osc))
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object. The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects. we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (osc_makes_rpc(cli, osc, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(env, cli, osc, OBD_BRW_WRITE,
                                              &osc->oo_write_pages, pol);
                        if (rc < 0) {
                                CERROR("Write request failed with %d\n", rc);

                                /* osc_send_oap_rpc failed, mostly because of
                                 * memory pressure.
                                 *
                                 * It can't break here, because if:
                                 *  - a page was submitted by osc_io_submit, so
                                 *    page locked;
                                 *  - no request in flight
                                 *  - no subsequent request
                                 * The system will be in live-lock state,
                                 * because there is no chance to call
                                 * osc_io_unplug() and osc_check_rpcs() any
                                 * more. pdflush can't help in this case,
                                 * because it might be blocked at grabbing
                                 * the page lock as we mentioned.
                                 *
                                 * Anyway, continue to drain pages. */
                        }
                        if (rc > 0)
                                race_counter = 0;
                        else if (rc == 0)
                                race_counter++;
                }
                if (osc_makes_rpc(cli, osc, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(env, cli, osc, OBD_BRW_READ,
                                              &osc->oo_read_pages, pol);
                        if (rc < 0)
                                CERROR("Read request failed with %d\n", rc);
                        if (rc > 0)
                                race_counter = 0;
                        else if (rc == 0)
                                race_counter++;
                }

                /* attempt some inter-object balancing by issuing rpcs
                 * for each object in turn */
                if (!cfs_list_empty(&osc->oo_hp_ready_item))
                        cfs_list_del_init(&osc->oo_hp_ready_item);
                if (!cfs_list_empty(&osc->oo_ready_item))
                        cfs_list_del_init(&osc->oo_ready_item);
                if (!cfs_list_empty(&osc->oo_write_item))
                        cfs_list_del_init(&osc->oo_write_item);
                if (!cfs_list_empty(&osc->oo_read_item))
                        cfs_list_del_init(&osc->oo_read_item);

                osc_list_maint(cli, osc);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off. llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0. */
                if (race_counter == 10)
                        break;
        }
}

void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
                   struct osc_object *osc, pdl_policy_t pol)
{
        if (osc)
                osc_list_maint(cli, osc);
        osc_check_rpcs(env, cli, pol);
}

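/* Initialize the osc_async_page embedded in @ops for @page at @offset
 * within the object. When called with a NULL page it only reports the
 * space the caller must reserve for the structure. */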
int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
                        cfs_page_t *page, loff_t offset)
{
        struct obd_export     *exp = osc_export(osc);
        struct osc_async_page *oap = &ops->ops_oap;
        ENTRY;

        if (!page)
                return cfs_size_round(sizeof(*oap));

        oap->oap_magic = OAP_MAGIC;
        oap->oap_cli = &exp->exp_obd->u.cli;
        oap->oap_obj = osc;

        oap->oap_page = page;
        oap->oap_obj_off = offset;
        LASSERT(!(offset & ~CFS_PAGE_MASK));

        if (!client_is_remote(exp) && cfs_capable(CFS_CAP_SYS_RESOURCE))
                oap->oap_brw_flags = OBD_BRW_NOQUOTA;

        CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
        CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
        CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);

        cfs_spin_lock_init(&oap->oap_lock);
        CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n",
               oap, page, oap->oap_obj_off);
        RETURN(0);
}

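/* Queue a page for asynchronous write-back: validate the oap, check the
 * owner's quota, reserve cache space under the loi list lock, and kick
 * the ptlrpcd writeback work if enough pages are ready for an RPC. */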
int osc_queue_async_io(const struct lu_env *env, struct osc_page *ops)
{
        struct osc_async_page *oap = &ops->ops_oap;
        struct client_obd     *cli = oap->oap_cli;
        struct osc_object     *osc = oap->oap_obj;
        struct obd_export     *exp = osc_export(osc);
        int brw_flags = OBD_BRW_ASYNC;
        int cmd = OBD_BRW_WRITE;
        int rc = 0;
        ENTRY;

        if (oap->oap_magic != OAP_MAGIC)
                RETURN(-EINVAL);

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (!cfs_list_empty(&oap->oap_pending_item) ||
            !cfs_list_empty(&oap->oap_urgent_item) ||
            !cfs_list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* Set the OBD_BRW_SRVLOCK before the page is queued. */
        brw_flags |= ops->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
        if (!client_is_remote(exp) && cfs_capable(CFS_CAP_SYS_RESOURCE)) {
                brw_flags |= OBD_BRW_NOQUOTA;
                cmd |= OBD_BRW_NOQUOTA;
        }

        /* check if the file's owner/group is over quota */
        if (!(cmd & OBD_BRW_NOQUOTA)) {
                struct cl_object *obj;
                struct cl_attr   *attr;
                unsigned int qid[MAXQUOTAS];

                obj = cl_object_top(&osc->oo_cl);
                attr = &osc_env_info(env)->oti_attr;

                cl_object_attr_lock(obj);
                rc = cl_object_attr_get(env, obj, attr);
                cl_object_attr_unlock(obj);

                qid[USRQUOTA] = attr->cat_uid;
                qid[GRPQUOTA] = attr->cat_gid;
                if (rc == 0 &&
                    osc_quota_chkdq(cli, qid) == NO_QUOTA)
                        rc = -EDQUOT;
                if (rc)
                        RETURN(rc);
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = ops->ops_from;
        oap->oap_count = ops->ops_to - ops->ops_from;
        oap->oap_async_flags = 0;
        oap->oap_brw_flags = brw_flags;
        /* Give a hint to OST that requests are coming from kswapd - bug19529 */
        if (cfs_memory_pressure_get())
                oap->oap_brw_flags |= OBD_BRW_MEMALLOC;

        rc = osc_enter_cache(env, cli, oap);
        if (rc) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(rc);
        }

        OSC_IO_DEBUG(osc, "oap %p page %p added for cmd %d\n",
                     oap, oap->oap_page, cmd);

        osc_oap_to_pending(oap);
        osc_list_maint(cli, osc);
        if (!osc_max_rpc_in_flight(cli, osc) &&
            osc_makes_rpc(cli, osc, OBD_BRW_WRITE)) {
                LASSERT(cli->cl_writeback_work != NULL);
                rc = ptlrpcd_queue_work(cli->cl_writeback_work);

                CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n",
                       cli, rc);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}

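/* Undo osc_queue_async_io(): give back the page's grant and remove it
 * from the urgent and pending lists. Fails with -EBUSY once the page has
 * been placed into an RPC. */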
int osc_teardown_async_page(struct osc_object *obj, struct osc_page *ops)
{
        struct osc_async_page *oap = &ops->ops_oap;
        struct client_obd     *cli = oap->oap_cli;
        struct osc_oap_pages  *lop;
        int rc = 0;
        ENTRY;

        if (oap->oap_magic != OAP_MAGIC)
                RETURN(-EINVAL);

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &obj->oo_write_pages;
        } else {
                lop = &obj->oo_read_pages;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (!cfs_list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!cfs_list_empty(&oap->oap_urgent_item)) {
                cfs_list_del_init(&oap->oap_urgent_item);
                cfs_spin_lock(&oap->oap_lock);
                oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
                cfs_spin_unlock(&oap->oap_lock);
        }
        if (!cfs_list_empty(&oap->oap_pending_item)) {
                cfs_list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        osc_list_maint(cli, obj);
        OSC_IO_DEBUG(obj, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}

/* aka (~was & now & flag), but this is more clear :) */
#define SETTING(was, now, flag) (!(was & flag) && (now & flag))

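/* Merge @async_flags into the page's flags, moving the page onto the
 * urgent list if it just became urgent and updating the object's list
 * membership via osc_list_maint(). */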
int osc_set_async_flags(struct osc_object *obj, struct osc_page *opg,
                        obd_flag async_flags)
{
        struct osc_async_page *oap = &opg->ops_oap;
        struct osc_oap_pages  *lop;
        int flags = 0;
        ENTRY;

        LASSERT(!cfs_list_empty(&oap->oap_pending_item));

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &obj->oo_write_pages;
        } else {
                lop = &obj->oo_read_pages;
        }

        if ((oap->oap_async_flags & async_flags) == async_flags)
                RETURN(0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                flags |= ASYNC_READY;

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
            cfs_list_empty(&oap->oap_rpc_item)) {
                if (oap->oap_async_flags & ASYNC_HP)
                        cfs_list_add(&oap->oap_urgent_item, &lop->oop_urgent);
                else
                        cfs_list_add_tail(&oap->oap_urgent_item,
                                          &lop->oop_urgent);
                flags |= ASYNC_URGENT;
                osc_list_maint(oap->oap_cli, obj);
        }
        cfs_spin_lock(&oap->oap_lock);
        oap->oap_async_flags |= flags;
        cfs_spin_unlock(&oap->oap_lock);

        OSC_IO_DEBUG(obj, "oap %p page %p has flags %x\n", oap,
                     oap->oap_page, oap->oap_async_flags);
        RETURN(0);
}

/**
 * this is called when a sync waiter receives an interruption. Its job is to
 * get the caller woken as soon as possible. If its page hasn't been put in an
 * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has been sent.
 */
int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops)
{
        struct osc_async_page *oap = &ops->ops_oap;
        int rc = -EBUSY;
        ENTRY;

        LASSERT(!oap->oap_interrupted);
        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        /*
         * page completion may be called only if ->cpo_prep() method was
         * executed by osc_io_submit(), that also adds the page to the
         * pending list
         */
        if (!cfs_list_empty(&oap->oap_pending_item)) {
                struct osc_oap_pages *lop;
                struct osc_object *osc = oap->oap_obj;

                cfs_list_del_init(&oap->oap_pending_item);
                cfs_list_del_init(&oap->oap_urgent_item);

                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &osc->oo_write_pages : &osc->oo_read_pages;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                osc_list_maint(oap->oap_cli, osc);
                rc = osc_completion(env, oap, oap->oap_cmd, NULL, -EINTR);
        }

        RETURN(rc);
}

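/* Queue a page for synchronous IO: fill in the transfer extent and flags,
 * reserve transient grant for writes that do not yet hold any, and put
 * the page on its object's pending list. */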
int osc_queue_sync_page(const struct lu_env *env, struct osc_page *opg,
                        int cmd, int brw_flags)
{
        struct osc_async_page *oap = &opg->ops_oap;
        struct client_obd     *cli = oap->oap_cli;
        int flags = 0;
        ENTRY;

        oap->oap_cmd = cmd;
        oap->oap_page_off = opg->ops_from;
        oap->oap_count = opg->ops_to - opg->ops_from;
        oap->oap_brw_flags = OBD_BRW_SYNC | brw_flags;

        /* Give a hint to OST that requests are coming from kswapd - bug19529 */
        if (cfs_memory_pressure_get())
                oap->oap_brw_flags |= OBD_BRW_MEMALLOC;

        if (!client_is_remote(osc_export(cl2osc(opg->ops_cl.cpl_obj))) &&
            cfs_capable(CFS_CAP_SYS_RESOURCE)) {
                oap->oap_brw_flags |= OBD_BRW_NOQUOTA;
                oap->oap_cmd |= OBD_BRW_NOQUOTA;
        }

        if (oap->oap_cmd & OBD_BRW_READ)
                flags = ASYNC_COUNT_STABLE;
        else if (!(oap->oap_brw_page.flag & OBD_BRW_FROM_GRANT))
                osc_enter_cache_try(env, cli, oap, 1);

        cfs_spin_lock(&oap->oap_lock);
        oap->oap_async_flags |= OSC_FLAGS | flags;
        cfs_spin_unlock(&oap->oap_lock);

        osc_oap_to_pending(oap);