lustre/osc/osc_cache.c (fs/lustre-release.git, commit 9f12ad6ae857973aea7610f44553dea90e2718e7)
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011 Whamcloud, Inc.
33  *
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * osc cache management.
40  *
41  * Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_OSC
45
46 #include "osc_cl_internal.h"
47 #include "osc_internal.h"
48
49 static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
50                            struct osc_async_page *oap);
51 static int osc_enter_cache_try(const struct lu_env *env, struct client_obd *cli,
52                                struct osc_async_page *oap, int transient);
53 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
54                            int sent);
55
56 /** \addtogroup osc
57  *  @{
58  */
59
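/* Debugging helper: prints whether the object is on a ready list plus its
 * pending counts and urgent-list status for writes and reads. */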
60 #define OSC_IO_DEBUG(OSC, STR, args...)                           \
61         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,     \
62                !cfs_list_empty(&(OSC)->oo_ready_item) ||                 \
63                !cfs_list_empty(&(OSC)->oo_hp_ready_item),               \
64                (OSC)->oo_write_pages.oop_num_pending,               \
65                !cfs_list_empty(&(OSC)->oo_write_pages.oop_urgent),       \
66                (OSC)->oo_read_pages.oop_num_pending,                 \
67                !cfs_list_empty(&(OSC)->oo_read_pages.oop_urgent),       \
68                args)
69
70 static inline struct osc_page *oap2osc_page(struct osc_async_page *oap)
71 {
72         return (struct osc_page *)container_of(oap, struct osc_page, ops_oap);
73 }
74
75 static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
76                           int cmd)
77 {
78         struct osc_page *opg  = oap2osc_page(oap);
79         struct cl_page  *page = cl_page_top(opg->ops_cl.cpl_page);
80         int result;
81
82         LASSERT(cmd == OBD_BRW_WRITE); /* no cached reads */
83
84         ENTRY;
85         result = cl_page_make_ready(env, page, CRT_WRITE);
86         if (result == 0)
87                 opg->ops_submit_time = cfs_time_current();
88         RETURN(result);
89 }
90
91 static int osc_refresh_count(const struct lu_env *env,
92                              struct osc_async_page *oap, int cmd)
93 {
94         struct osc_page  *opg = oap2osc_page(oap);
95         struct cl_page   *page;
96         struct cl_object *obj;
97         struct cl_attr   *attr = &osc_env_info(env)->oti_attr;
98
99         int result;
100         loff_t kms;
101
102         /* readpage queues with _COUNT_STABLE, shouldn't get here. */
103         LASSERT(!(cmd & OBD_BRW_READ));
104         LASSERT(opg != NULL);
105         page = opg->ops_cl.cpl_page;
106         obj = opg->ops_cl.cpl_obj;
107
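        /* Re-read the object attributes to get the current known minimum
         * size (kms); the write count returned below must not extend past it. */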
108         cl_object_attr_lock(obj);
109         result = cl_object_attr_get(env, obj, attr);
110         cl_object_attr_unlock(obj);
111         if (result < 0)
112                 return result;
113         kms = attr->cat_kms;
114         if (cl_offset(obj, page->cp_index) >= kms)
115                 /* catch race with truncate */
116                 return 0;
117         else if (cl_offset(obj, page->cp_index + 1) > kms)
118                 /* catch sub-page write at end of file */
119                 return kms % CFS_PAGE_SIZE;
120         else
121                 return CFS_PAGE_SIZE;
122 }
123
124 static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
125                           int cmd, struct obdo *oa, int rc)
126 {
127         struct osc_page   *opg  = oap2osc_page(oap);
128         struct cl_page    *page = cl_page_top(opg->ops_cl.cpl_page);
129         struct osc_object *obj  = cl2osc(opg->ops_cl.cpl_obj);
130         enum cl_req_type   crt;
131         int srvlock;
132
133         ENTRY;
134
135         cmd &= ~OBD_BRW_NOQUOTA;
136         LASSERT(equi(page->cp_state == CPS_PAGEIN,  cmd == OBD_BRW_READ));
137         LASSERT(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE));
138         LASSERT(opg->ops_transfer_pinned);
139
140         /*
141          * page->cp_req can be NULL if io submission failed before
142          * cl_req was allocated.
143          */
144         if (page->cp_req != NULL)
145                 cl_req_page_done(env, page);
146         LASSERT(page->cp_req == NULL);
147
148         /* As the transfer for this page is being done, clear the flags */
149         cfs_spin_lock(&oap->oap_lock);
150         oap->oap_async_flags = 0;
151         cfs_spin_unlock(&oap->oap_lock);
152
153         crt = cmd == OBD_BRW_READ ? CRT_READ : CRT_WRITE;
154         /* Clear opg->ops_transfer_pinned before VM lock is released. */
155         opg->ops_transfer_pinned = 0;
156
157         cfs_spin_lock(&obj->oo_seatbelt);
158         LASSERT(opg->ops_submitter != NULL);
159         LASSERT(!cfs_list_empty(&opg->ops_inflight));
160         cfs_list_del_init(&opg->ops_inflight);
161         cfs_spin_unlock(&obj->oo_seatbelt);
162
163         opg->ops_submit_time = 0;
164         srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;
165
166         cl_page_completion(env, page, crt, rc);
167
168         /* statistic */
169         if (rc == 0 && srvlock) {
170                 struct lu_device *ld    = opg->ops_cl.cpl_obj->co_lu.lo_dev;
171                 struct osc_stats *stats = &lu2osc_dev(ld)->od_stats;
172                 int bytes = oap->oap_count;
173
174                 if (crt == CRT_READ)
175                         stats->os_lockless_reads += bytes;
176                 else
177                         stats->os_lockless_writes += bytes;
178         }
179
180         /*
181          * This has to be the last operation with the page, as locks are
182          * released in cl_page_completion() and nothing except for the
183          * reference counter protects page from concurrent reclaim.
184          */
185         lu_ref_del(&page->cp_reference, "transfer", page);
186         /*
187          * As page->cp_obj is pinned by a reference from page->cp_req, it is
188          * safe to call cl_page_put() without risking object destruction in a
189          * non-blocking context.
190          */
191         cl_page_put(env, page);
192         RETURN(0);
193 }
194
195 /* caller must hold loi_list_lock */
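/* Each newly dirtied page consumes one page worth of available grant and is
 * counted in both the per-client and the global dirty page totals. */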
196 static void osc_consume_write_grant(struct client_obd *cli,
197                                     struct brw_page *pga)
198 {
199         LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
200         LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
201         cfs_atomic_inc(&obd_dirty_pages);
202         cli->cl_dirty += CFS_PAGE_SIZE;
203         cli->cl_avail_grant -= CFS_PAGE_SIZE;
204         pga->flag |= OBD_BRW_FROM_GRANT;
205         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
206                CFS_PAGE_SIZE, pga, pga->pg);
207         LASSERT(cli->cl_avail_grant >= 0);
208         osc_update_next_shrink(cli);
209 }
210
211 /* the companion to osc_consume_write_grant, called when a brw has completed.
212  * must be called with the loi lock held. */
213 static void osc_release_write_grant(struct client_obd *cli,
214                                     struct brw_page *pga, int sent)
215 {
216         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
217         ENTRY;
218
219         LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
220         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
221                 EXIT;
222                 return;
223         }
224
225         pga->flag &= ~OBD_BRW_FROM_GRANT;
226         cfs_atomic_dec(&obd_dirty_pages);
227         cli->cl_dirty -= CFS_PAGE_SIZE;
228         if (pga->flag & OBD_BRW_NOCACHE) {
229                 pga->flag &= ~OBD_BRW_NOCACHE;
230                 cfs_atomic_dec(&obd_dirty_transit_pages);
231                 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
232         }
233         if (!sent) {
234                 /* Reclaim grant from truncated pages. This solves the problem of
235                  * a write followed by truncate leaving all grant gone (to lost_grant).
236                  * For a vfs write this problem can be easily solved by a sync
237                  * write, however, this is not an option for page_mkwrite()
238                  * because grant has to be allocated before a page becomes
239                  * dirty. */
240                 if (cli->cl_avail_grant < PTLRPC_MAX_BRW_SIZE)
241                         cli->cl_avail_grant += CFS_PAGE_SIZE;
242                 else
243                         cli->cl_lost_grant += CFS_PAGE_SIZE;
244                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
245                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
246         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
247                 /* For short writes we shouldn't count parts of pages that
248                  * span a whole block on the OST side, or our accounting goes
249                  * wrong.  Should match the code in filter_grant_check. */
250                 int offset = pga->off & ~CFS_PAGE_MASK;
251                 int count = pga->count + (offset & (blocksize - 1));
252                 int end = (offset + pga->count) & (blocksize - 1);
253                 if (end)
254                         count += blocksize - end;
255
256                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
257                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
258                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
259                        cli->cl_avail_grant, cli->cl_dirty);
260         }
261
262         EXIT;
263 }
264
265 /* The companion to osc_enter_cache(), called when @oap is no longer part of
266  * the dirty accounting: writeback has completed, or a truncate happened
267  * before writing started.  Must be called with the loi lock held. */
268 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
269                            int sent)
270 {
271         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
272 }
273
274 /**
275  * Non-blocking version of osc_enter_cache() that consumes grant only when it
276  * is available.
277  */
278 static int osc_enter_cache_try(const struct lu_env *env, struct client_obd *cli,
279                                struct osc_async_page *oap, int transient)
280 {
281         int has_grant;
282
283         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
284         if (has_grant) {
285                 osc_consume_write_grant(cli, &oap->oap_brw_page);
286                 if (transient) {
287                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
288                         cfs_atomic_inc(&obd_dirty_transit_pages);
289                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
290                 }
291         }
292         return has_grant;
293 }
294
295 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
296  * grant or cache space. */
297 static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
298                            struct osc_async_page *oap)
299 {
300         struct osc_object *osc = oap->oap_obj;
301         struct lov_oinfo  *loi = osc->oo_oinfo;
302         struct osc_cache_waiter ocw;
303         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
304         int rc = -EDQUOT;
305         ENTRY;
306
307         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
308                "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
309                cli->cl_dirty_max, obd_max_dirty_pages,
310                cli->cl_lost_grant, cli->cl_avail_grant);
311
312         /* force the caller to try sync io.  this can jump the list
313          * of queued writes and create a discontiguous rpc stream */
314         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
315             cli->cl_dirty_max < CFS_PAGE_SIZE     ||
316             cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
317                 RETURN(-EDQUOT);
318
319         /* Hopefully normal case - cache space and write credits available */
320         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
321             cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
322             osc_enter_cache_try(env, cli, oap, 0))
323                 RETURN(0);
324
325         /* We can get here for two reasons: too many dirty pages in cache, or
326          * run out of grants. In both cases we should write dirty pages out.
327          * Adding a cache waiter will trigger urgent write-out no matter what
328          * RPC size will be.
329  * The exit condition is no available grant and no dirty pages cached;
330  * that really means there is no space left on the OST. */
331         cfs_waitq_init(&ocw.ocw_waitq);
332         ocw.ocw_oap = oap;
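        /* Queue ourselves as a cache waiter, kick writeback, and sleep until
         * osc_wake_cache_waiters() either hands us grant or asks us to fall
         * back to sync IO with -EDQUOT. */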
333         while (cli->cl_dirty > 0) {
334                 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
335                 ocw.ocw_rc = 0;
336
337                 osc_io_unplug(env, cli, osc, PDL_POLICY_ROUND);
338                 client_obd_list_unlock(&cli->cl_loi_list_lock);
339
340                 CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
341                        cli->cl_import->imp_obd->obd_name, &ocw, oap);
342
343                 rc = l_wait_event(ocw.ocw_waitq, cfs_list_empty(&ocw.ocw_entry),
344                                   &lwi);
345
346                 client_obd_list_lock(&cli->cl_loi_list_lock);
347                 cfs_list_del_init(&ocw.ocw_entry);
348                 if (rc < 0)
349                         break;
350
351                 rc = ocw.ocw_rc;
352                 if (rc != -EDQUOT)
353                         break;
354         }
355
356         RETURN(rc);
357 }
358
359 /* caller must hold loi_list_lock */
360 void osc_wake_cache_waiters(struct client_obd *cli)
361 {
362         cfs_list_t *l, *tmp;
363         struct osc_cache_waiter *ocw;
364
365         ENTRY;
366         cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
367                 /* if we can't dirty more, we must wait until some is written */
368                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
369                    (cfs_atomic_read(&obd_dirty_pages) + 1 >
370                     obd_max_dirty_pages)) {
371                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
372                                "osc max %ld, sys max %d\n", cli->cl_dirty,
373                                cli->cl_dirty_max, obd_max_dirty_pages);
374                         return;
375                 }
376
377                 /* if the cache is still dirty but there is no grant, wait for
378                  * pending RPCs that may yet return some grant before sync writes */
379                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
380                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
381                                cli->cl_w_in_flight);
382                         return;
383                 }
384
385                 ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
386                 cfs_list_del_init(&ocw->ocw_entry);
387                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
388                         /* no more RPCs in flight to return grant, do sync IO */
389                         ocw->ocw_rc = -EDQUOT;
390                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
391                 } else {
392                         osc_consume_write_grant(cli,
393                                                 &ocw->ocw_oap->oap_brw_page);
394                 }
395
396                 CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld\n",
397                        ocw, ocw->ocw_oap, cli->cl_avail_grant);
398
399                 cfs_waitq_signal(&ocw->ocw_waitq);
400         }
401
402         EXIT;
403 }
404
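/* Return true when the client is already at its RPC-in-flight limit; a queued
 * high-priority (ASYNC_HP) page raises the limit by one so lock cancels are
 * not starved behind ordinary RPCs. */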
405 static int osc_max_rpc_in_flight(struct client_obd *cli, struct osc_object *osc)
406 {
407         struct osc_async_page *oap;
408         int hprpc = 0;
409
410         if (!cfs_list_empty(&osc->oo_write_pages.oop_urgent)) {
411                 oap = cfs_list_entry(osc->oo_write_pages.oop_urgent.next,
412                                      struct osc_async_page, oap_urgent_item);
413                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
414         }
415
416         if (!hprpc && !cfs_list_empty(&osc->oo_read_pages.oop_urgent)) {
417                 oap = cfs_list_entry(osc->oo_read_pages.oop_urgent.next,
418                                      struct osc_async_page, oap_urgent_item);
419                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
420         }
421
422         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
423 }
424
425 /* Decide whether the given object's pending read or write list (lop) has
426  * enough work queued to warrant an RPC.  Used by osc_check_rpcs->osc_next_obj()
427  * and osc_list_maint() to quickly find objects that are ready to send an RPC. */
428 static int osc_makes_rpc(struct client_obd *cli, struct osc_object *osc,
429                          int cmd)
430 {
431         struct osc_oap_pages *lop;
432         ENTRY;
433
434         if (cmd & OBD_BRW_WRITE) {
435                 lop = &osc->oo_write_pages;
436         } else {
437                 lop = &osc->oo_read_pages;
438         }
439
440         if (lop->oop_num_pending == 0)
441                 RETURN(0);
442
443         /* if we have an invalid import we want to drain the queued pages
444          * by forcing them through rpcs that immediately fail and complete
445          * the pages.  recovery relies on this to empty the queued pages
446          * before canceling the locks and evicting the llite pages */
447         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
448                 RETURN(1);
449
450         /* stream rpcs in queue order as long as there is an urgent page
451          * queued.  this is our cheap solution for good batching in the case
452          * where writepage marks some random page in the middle of the file
453          * as urgent because of, say, memory pressure */
454         if (!cfs_list_empty(&lop->oop_urgent)) {
455                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
456                 RETURN(1);
457         }
458
459         if (cmd & OBD_BRW_WRITE) {
460                 /* trigger a write rpc stream as long as there are dirtiers
461                  * waiting for space.  as they're waiting, they're not going to
462                  * create more pages to coalesce with what's waiting. */
463                 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
464                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
465                         RETURN(1);
466                 }
467         }
468         if (lop->oop_num_pending >= cli->cl_max_pages_per_rpc)
469                 RETURN(1);
470
471         RETURN(0);
472 }
473
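/* Adjust the list's pending page count and the matching per-client read or
 * write counter by @delta. */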
474 static void lop_update_pending(struct client_obd *cli,
475                                struct osc_oap_pages *lop, int cmd, int delta)
476 {
477         lop->oop_num_pending += delta;
478         if (cmd & OBD_BRW_WRITE)
479                 cli->cl_pending_w_pages += delta;
480         else
481                 cli->cl_pending_r_pages += delta;
482 }
483
484 static int osc_makes_hprpc(struct osc_oap_pages *lop)
485 {
486         struct osc_async_page *oap;
487         ENTRY;
488
489         if (cfs_list_empty(&lop->oop_urgent))
490                 RETURN(0);
491
492         oap = cfs_list_entry(lop->oop_urgent.next,
493                          struct osc_async_page, oap_urgent_item);
494
495         if (oap->oap_async_flags & ASYNC_HP) {
496                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
497                 RETURN(1);
498         }
499
500         RETURN(0);
501 }
502
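/* Make @item's membership on @list match @should_be_on. */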
503 static void on_list(cfs_list_t *item, cfs_list_t *list, int should_be_on)
504 {
505         if (cfs_list_empty(item) && should_be_on)
506                 cfs_list_add_tail(item, list);
507         else if (!cfs_list_empty(item) && !should_be_on)
508                 cfs_list_del_init(item);
509 }
510
511 /* maintain the osc's cli list membership invariants so that osc_send_oap_rpc
512  * can find pages to build into rpcs quickly */
513 static void osc_list_maint(struct client_obd *cli, struct osc_object *osc)
514 {
515         if (osc_makes_hprpc(&osc->oo_write_pages) ||
516             osc_makes_hprpc(&osc->oo_read_pages)) {
517                 /* HP rpc */
518                 on_list(&osc->oo_ready_item, &cli->cl_loi_ready_list, 0);
519                 on_list(&osc->oo_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
520         } else {
521                 on_list(&osc->oo_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
522                 on_list(&osc->oo_ready_item, &cli->cl_loi_ready_list,
523                         osc_makes_rpc(cli, osc, OBD_BRW_WRITE) ||
524                         osc_makes_rpc(cli, osc, OBD_BRW_READ));
525         }
526
527         on_list(&osc->oo_write_item, &cli->cl_loi_write_list,
528                 osc->oo_write_pages.oop_num_pending);
529
530         on_list(&osc->oo_read_item, &cli->cl_loi_read_list,
531                 osc->oo_read_pages.oop_num_pending);
532 }
533
534 /* this is trying to propagate async writeback errors back up to the
535  * application.  As an async write fails we record the error code for later if
536  * the app does an fsync.  As long as errors persist we force future rpcs to be
537  * sync so that the app can get a sync error and break the cycle of queueing
538  * pages for which writeback will fail. */
539 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
540                            int rc)
541 {
542         if (rc) {
543                 if (!ar->ar_rc)
544                         ar->ar_rc = rc;
545
546                 ar->ar_force_sync = 1;
547                 ar->ar_min_xid = ptlrpc_sample_next_xid();
548                 return;
549
550         }
551
552         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
553                 ar->ar_force_sync = 0;
554 }
555
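/* Put @oap on its object's pending queue (head of the urgent list for
 * ASYNC_HP pages, tail for ASYNC_URGENT) and bump the pending counters. */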
556 static void osc_oap_to_pending(struct osc_async_page *oap)
557 {
558         struct osc_object    *osc = oap->oap_obj;
559         struct osc_oap_pages *lop;
560
561         if (oap->oap_cmd & OBD_BRW_WRITE)
562                 lop = &osc->oo_write_pages;
563         else
564                 lop = &osc->oo_read_pages;
565
566         if (oap->oap_async_flags & ASYNC_HP)
567                 cfs_list_add(&oap->oap_urgent_item, &lop->oop_urgent);
568         else if (oap->oap_async_flags & ASYNC_URGENT)
569                 cfs_list_add_tail(&oap->oap_urgent_item, &lop->oop_urgent);
570         cfs_list_add_tail(&oap->oap_pending_item, &lop->oop_pending);
571         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
572 }
573
574 /* this must be called holding the loi list lock to give coverage to exit_cache,
575  * async_flag maintenance, and oap_request */
576 void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
577                        struct obdo *oa, struct osc_async_page *oap,
578                        int sent, int rc)
579 {
580         struct osc_object *osc = oap->oap_obj;
581         struct lov_oinfo  *loi = osc->oo_oinfo;
582         __u64 xid = 0;
583
584         ENTRY;
585         if (oap->oap_request != NULL) {
586                 xid = ptlrpc_req_xid(oap->oap_request);
587                 ptlrpc_req_finished(oap->oap_request);
588                 oap->oap_request = NULL;
589         }
590
591         cfs_spin_lock(&oap->oap_lock);
592         oap->oap_async_flags = 0;
593         cfs_spin_unlock(&oap->oap_lock);
594         oap->oap_interrupted = 0;
595
596         if (oap->oap_cmd & OBD_BRW_WRITE) {
597                 osc_process_ar(&cli->cl_ar, xid, rc);
598                 osc_process_ar(&loi->loi_ar, xid, rc);
599         }
600
601         if (rc == 0 && oa != NULL) {
602                 if (oa->o_valid & OBD_MD_FLBLOCKS)
603                         loi->loi_lvb.lvb_blocks = oa->o_blocks;
604                 if (oa->o_valid & OBD_MD_FLMTIME)
605                         loi->loi_lvb.lvb_mtime = oa->o_mtime;
606                 if (oa->o_valid & OBD_MD_FLATIME)
607                         loi->loi_lvb.lvb_atime = oa->o_atime;
608                 if (oa->o_valid & OBD_MD_FLCTIME)
609                         loi->loi_lvb.lvb_ctime = oa->o_ctime;
610         }
611
612         rc = osc_completion(env, oap, oap->oap_cmd, oa, rc);
613
614         /* cl_page_completion() drops PG_locked, so a new I/O on the page could
615          * start, but OSC calls it under lock and thus we can add oap back to
616          * pending safely */
617         if (rc)
618                 /* upper layer wants to leave the page on pending queue */
619                 osc_oap_to_pending(oap);
620         else
621                 osc_exit_cache(cli, oap, sent);
622         EXIT;
623 }
624
625 /**
626  * prepare pages for ASYNC io and put pages in send queue.
627  *
628  * \param cmd OBD_BRW_* macros
629  * \param lop pending pages
630  *
631  * \return zero if no page added to send queue.
632  * \return 1 if pages successfully added to send queue.
633  * \return negative on errors.
634  */
635 static int
636 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
637                  struct osc_object *osc, int cmd,
638                  struct osc_oap_pages *lop, pdl_policy_t pol)
639 {
640         obd_count page_count = 0;
641         struct osc_async_page *oap = NULL, *tmp;
642         CFS_LIST_HEAD(rpc_list);
643         int srvlock = 0, mem_tight = 0;
644         obd_off starting_offset = OBD_OBJECT_EOF;
645         unsigned int ending_offset;
646         int starting_page_off = 0;
647         int rc;
648         ENTRY;
649
650         /* ASYNC_HP pages first. At present, when the lock covering the pages
651          * is to be canceled, the pages covered by the lock will be sent out
652          * with ASYNC_HP. We have to send them out as soon as possible. */
653         cfs_list_for_each_entry_safe(oap, tmp, &lop->oop_urgent, oap_urgent_item) {
654                 if (oap->oap_async_flags & ASYNC_HP)
655                         cfs_list_move(&oap->oap_pending_item, &rpc_list);
656                 else if (!(oap->oap_brw_flags & OBD_BRW_SYNC))
657                         /* only do this for writeback pages. */
658                         cfs_list_move_tail(&oap->oap_pending_item, &rpc_list);
659                 if (++page_count >= cli->cl_max_pages_per_rpc)
660                         break;
661         }
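        /* Put the urgent pages gathered above back at the head of the pending
         * list so the scan below packs them into the RPC first. */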
662         cfs_list_splice_init(&rpc_list, &lop->oop_pending);
663         page_count = 0;
664
665         /* first we find the pages we're allowed to work with */
666         cfs_list_for_each_entry_safe(oap, tmp, &lop->oop_pending,
667                                      oap_pending_item) {
668                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
669                          "magic 0x%x\n", oap, oap->oap_magic);
670
671                 if (page_count != 0 &&
672                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
673                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
674                                " oap %p, page %p, srvlock %u\n",
675                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
676                         break;
677                 }
678
679                 /* If there is a gap at the start of this page, it can't merge
680                  * with any previous page, so we'll hand the network a
681                  * "fragmented" page array that it can't transfer in 1 RDMA */
682                 if (oap->oap_obj_off < starting_offset) {
683                         if (starting_page_off != 0)
684                                 break;
685
686                         starting_page_off = oap->oap_page_off;
687                         starting_offset = oap->oap_obj_off + starting_page_off;
688                 } else if (oap->oap_page_off != 0)
689                         break;
690
691                 /* in llite being 'ready' equates to the page being locked
692                  * until completion unlocks it.  commit_write submits a page
693                  * as not ready because its unlock will happen unconditionally
694                  * as the call returns.  if we race with commit_write giving
695                  * us that page we don't want to create a hole in the page
696                  * stream, so we stop and leave the rpc to be fired by
697                  * another dirtier or kupdated interval (the not ready page
698                  * will still be on the dirty list).  we could call in
699                  * at the end of ll_file_write to process the queue again. */
700                 if (!(oap->oap_async_flags & ASYNC_READY)) {
701                         int rc = osc_make_ready(env, oap, cmd);
702                         if (rc < 0)
703                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
704                                                 "instead of ready\n", oap,
705                                                 oap->oap_page, rc);
706                         switch (rc) {
707                         case -EAGAIN:
708                                 /* llite is telling us that the page is still
709                                  * in commit_write and that we should try
710                                  * and put it in an rpc again later.  we
711                                  * break out of the loop so we don't create
712                                  * a hole in the sequence of pages in the rpc
713                                  * stream.*/
714                                 oap = NULL;
715                                 break;
716                         case -EINTR:
717                                 /* the io isn't needed.. tell the checks
718                                  * below to complete the rpc with EINTR */
719                                 cfs_spin_lock(&oap->oap_lock);
720                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
721                                 cfs_spin_unlock(&oap->oap_lock);
722                                 oap->oap_count = -EINTR;
723                                 break;
724                         case 0:
725                                 cfs_spin_lock(&oap->oap_lock);
726                                 oap->oap_async_flags |= ASYNC_READY;
727                                 cfs_spin_unlock(&oap->oap_lock);
728                                 break;
729                         default:
730                                 LASSERTF(0, "oap %p page %p returned %d "
731                                             "from make_ready\n", oap,
732                                             oap->oap_page, rc);
733                                 break;
734                         }
735                 }
736                 if (oap == NULL)
737                         break;
738
739                 /* take the page out of our book-keeping */
740                 cfs_list_del_init(&oap->oap_pending_item);
741                 lop_update_pending(cli, lop, cmd, -1);
742                 cfs_list_del_init(&oap->oap_urgent_item);
743
744                 /* ask the caller for the size of the io as the rpc leaves. */
745                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
746                         oap->oap_count = osc_refresh_count(env, oap, cmd);
747                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
748                 }
749                 if (oap->oap_count <= 0) {
750                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
751                                oap->oap_count);
752                         osc_ap_completion(env, cli, NULL,
753                                           oap, 0, oap->oap_count);
754                         continue;
755                 }
756
757                 /* now put the page back in our accounting */
758                 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
759                 if (page_count++ == 0)
760                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
761
762                 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
763                         mem_tight = 1;
764
765                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
766                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
767                  * have the same alignment as the initial writes that allocated
768                  * extents on the server. */
769                 ending_offset = oap->oap_obj_off + oap->oap_page_off +
770                                 oap->oap_count;
771                 if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
772                         break;
773
774                 if (page_count >= cli->cl_max_pages_per_rpc)
775                         break;
776
777                 /* If there is a gap at the end of this page, it can't merge
778                  * with any subsequent pages, so we'll hand the network a
779                  * "fragmented" page array that it can't transfer in 1 RDMA */
780                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
781                         break;
782         }
783
784         osc_list_maint(cli, osc);
785
786         client_obd_list_unlock(&cli->cl_loi_list_lock);
787
788         if (page_count == 0) {
789                 client_obd_list_lock(&cli->cl_loi_list_lock);
790                 RETURN(0);
791         }
792
793         if (mem_tight)
794                 cmd |= OBD_BRW_MEMALLOC;
795         rc = osc_build_rpc(env, cli, &rpc_list, page_count, cmd, pol);
796         if (rc != 0) {
797                 LASSERT(cfs_list_empty(&rpc_list));
798                 osc_list_maint(cli, osc);
799                 RETURN(rc);
800         }
801
802         starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
803         if (cmd == OBD_BRW_READ) {
804                 cli->cl_r_in_flight++;
805                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
806                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
807                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
808                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
809         } else {
810                 cli->cl_w_in_flight++;
811                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
812                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
813                                  cli->cl_w_in_flight);
814                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
815                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
816         }
817
818         RETURN(1);
819 }
820
821 #define list_to_obj(list, item) \
822         cfs_list_entry((list)->next, struct osc_object, oo_##item)
823
824 /* This is called by osc_check_rpcs() to find which objects have pages that
825  * we could be sending.  These lists are maintained by osc_makes_rpc(). */
826 static struct osc_object *osc_next_obj(struct client_obd *cli)
827 {
828         ENTRY;
829
830         /* First return objects that have blocked locks so that they
831          * will be flushed quickly and other clients can get the lock,
832          * then objects which have pages ready to be stuffed into RPCs */
833         if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
834                 RETURN(list_to_obj(&cli->cl_loi_hp_ready_list, hp_ready_item));
835         if (!cfs_list_empty(&cli->cl_loi_ready_list))
836                 RETURN(list_to_obj(&cli->cl_loi_ready_list, ready_item));
837
838         /* then if we have cache waiters, return all objects with queued
839          * writes.  This is especially important when many small files
840          * have filled up the cache and not been fired into rpcs because
841  * they don't pass the nr_pending/object threshold */
842         if (!cfs_list_empty(&cli->cl_cache_waiters) &&
843             !cfs_list_empty(&cli->cl_loi_write_list))
844                 RETURN(list_to_obj(&cli->cl_loi_write_list, write_item));
845
846         /* then return all queued objects when we have an invalid import
847          * so that they get flushed */
848         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
849                 if (!cfs_list_empty(&cli->cl_loi_write_list))
850                         RETURN(list_to_obj(&cli->cl_loi_write_list,
851                                            write_item));
852                 if (!cfs_list_empty(&cli->cl_loi_read_list))
853                         RETURN(list_to_obj(&cli->cl_loi_read_list,
854                                            read_item));
855         }
856         RETURN(NULL);
857 }
858
859 /* called with the loi list lock held */
860 static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
861                            pdl_policy_t pol)
862 {
863         struct osc_object *osc;
864         int rc = 0, race_counter = 0;
865         ENTRY;
866
867         while ((osc = osc_next_obj(cli)) != NULL) {
868                 OSC_IO_DEBUG(osc, "%lu in flight\n", rpcs_in_flight(cli));
869
870                 if (osc_max_rpc_in_flight(cli, osc))
871                         break;
872
873                 /* attempt some read/write balancing by alternating between
874                  * reads and writes in an object.  The makes_rpc checks here
875                  * would be redundant if we were getting read/write work items
876                  * instead of objects.  we don't want send_oap_rpc to drain a
877                  * partial read pending queue when we're given this object to
878                  * do io on writes while there are cache waiters */
879                 if (osc_makes_rpc(cli, osc, OBD_BRW_WRITE)) {
880                         rc = osc_send_oap_rpc(env, cli, osc, OBD_BRW_WRITE,
881                                               &osc->oo_write_pages, pol);
882                         if (rc < 0) {
883                                 CERROR("Write request failed with %d\n", rc);
884
885                                 /* osc_send_oap_rpc failed, mostly because of
886                                  * memory pressure.
887                                  *
888                                  * We can't break here, because if:
889                                  *  - a page was submitted by osc_io_submit, so
890                                  *    it is locked;
891                                  *  - no request is in flight, and
892                                  *  - no subsequent request is issued,
893                                  * then the system will be in a live-lock state,
894                                  * because there is no chance to call
895                                  * osc_io_unplug() and osc_check_rpcs() any
896                                  * more. pdflush can't help in this case,
897                                  * because it might be blocked at grabbing
898                                  * the page lock as we mentioned.
899                                  *
900                                  * Anyway, continue to drain pages. */
901                                 /* break; */
902                         }
903
904                         if (rc > 0)
905                                 race_counter = 0;
906                         else if (rc == 0)
907                                 race_counter++;
908                 }
909                 if (osc_makes_rpc(cli, osc, OBD_BRW_READ)) {
910                         rc = osc_send_oap_rpc(env, cli, osc, OBD_BRW_READ,
911                                               &osc->oo_read_pages, pol);
912                         if (rc < 0)
913                                 CERROR("Read request failed with %d\n", rc);
914
915                         if (rc > 0)
916                                 race_counter = 0;
917                         else if (rc == 0)
918                                 race_counter++;
919                 }
920
921                 /* attempt some inter-object balancing by issuing rpcs
922                  * for each object in turn */
923                 if (!cfs_list_empty(&osc->oo_hp_ready_item))
924                         cfs_list_del_init(&osc->oo_hp_ready_item);
925                 if (!cfs_list_empty(&osc->oo_ready_item))
926                         cfs_list_del_init(&osc->oo_ready_item);
927                 if (!cfs_list_empty(&osc->oo_write_item))
928                         cfs_list_del_init(&osc->oo_write_item);
929                 if (!cfs_list_empty(&osc->oo_read_item))
930                         cfs_list_del_init(&osc->oo_read_item);
931
932                 osc_list_maint(cli, osc);
933
934                 /* send_oap_rpc fails with 0 when make_ready tells it to
935                  * back off.  llite's make_ready does this when it tries
936                  * to lock a page queued for write that is already locked.
937                  * we want to try sending rpcs from many objects, but we
938                  * don't want to spin failing with 0.  */
939                 if (race_counter == 10)
940                         break;
941         }
942 }
943
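/* Refresh @osc's list membership (when given) and try to send any RPCs that
 * are now ready. */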
944 void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
945                    struct osc_object *osc, pdl_policy_t pol)
946 {
947         if (osc)
948                 osc_list_maint(cli, osc);
949         osc_check_rpcs(env, cli, pol);
950 }
951
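/* Initialize the osc_async_page embedded in @ops for @page at @offset; with a
 * NULL page, just report how much space an oap needs. */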
952 int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
953                         cfs_page_t *page, loff_t offset)
954 {
955         struct obd_export     *exp = osc_export(osc);
956         struct osc_async_page *oap = &ops->ops_oap;
957         ENTRY;
958
959         if (!page)
960                 return cfs_size_round(sizeof(*oap));
961
962         oap->oap_magic = OAP_MAGIC;
963         oap->oap_cli = &exp->exp_obd->u.cli;
964         oap->oap_obj = osc;
965
966         oap->oap_page = page;
967         oap->oap_obj_off = offset;
968         LASSERT(!(offset & ~CFS_PAGE_MASK));
969
970         if (!client_is_remote(exp) && cfs_capable(CFS_CAP_SYS_RESOURCE))
971                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
972
973         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
974         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
975         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
976
977         cfs_spin_lock_init(&oap->oap_lock);
978         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n",
979                oap, page, oap->oap_obj_off);
980         RETURN(0);
981 }
982
983 int osc_queue_async_io(const struct lu_env *env, struct osc_page *ops)
984 {
985         struct osc_async_page *oap = &ops->ops_oap;
986         struct client_obd     *cli = oap->oap_cli;
987         struct osc_object     *osc = oap->oap_obj;
988         struct obd_export     *exp = osc_export(osc);
989         int brw_flags = OBD_BRW_ASYNC;
990         int cmd = OBD_BRW_WRITE;
991         int rc = 0;
992         ENTRY;
993
994         if (oap->oap_magic != OAP_MAGIC)
995                 RETURN(-EINVAL);
996
997         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
998                 RETURN(-EIO);
999
1000         if (!cfs_list_empty(&oap->oap_pending_item) ||
1001             !cfs_list_empty(&oap->oap_urgent_item) ||
1002             !cfs_list_empty(&oap->oap_rpc_item))
1003                 RETURN(-EBUSY);
1004
1005         /* Set the OBD_BRW_SRVLOCK before the page is queued. */
1006         brw_flags |= ops->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
1007         if (!client_is_remote(exp) && cfs_capable(CFS_CAP_SYS_RESOURCE)) {
1008                 brw_flags |= OBD_BRW_NOQUOTA;
1009                 cmd |= OBD_BRW_NOQUOTA;
1010         }
1011
1012         /* check if the file's owner/group is over quota */
1013         if (!(cmd & OBD_BRW_NOQUOTA)) {
1014                 struct cl_object *obj;
1015                 struct cl_attr   *attr;
1016                 unsigned int qid[MAXQUOTAS];
1017
1018                 obj = cl_object_top(&osc->oo_cl);
1019                 attr = &osc_env_info(env)->oti_attr;
1020
1021                 cl_object_attr_lock(obj);
1022                 rc = cl_object_attr_get(env, obj, attr);
1023                 cl_object_attr_unlock(obj);
1024
1025                 qid[USRQUOTA] = attr->cat_uid;
1026                 qid[GRPQUOTA] = attr->cat_gid;
1027                 if (rc == 0 &&
1028                     osc_quota_chkdq(cli, qid) == NO_QUOTA)
1029                         rc = -EDQUOT;
1030                 if (rc)
1031                         RETURN(rc);
1032         }
1033
1034         client_obd_list_lock(&cli->cl_loi_list_lock);
1035
1036         oap->oap_cmd = cmd;
1037         oap->oap_page_off = ops->ops_from;
1038         oap->oap_count = ops->ops_to - ops->ops_from;
1039         oap->oap_async_flags = 0;
1040         oap->oap_brw_flags = brw_flags;
1041         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
1042         if (cfs_memory_pressure_get())
1043                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1044
1045         rc = osc_enter_cache(env, cli, oap);
1046         if (rc) {
1047                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1048                 RETURN(rc);
1049         }
1050
1051         OSC_IO_DEBUG(osc, "oap %p page %p added for cmd %d\n",
1052                      oap, oap->oap_page, cmd);
1053
1054         osc_oap_to_pending(oap);
1055         osc_list_maint(cli, osc);
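        /* If a write RPC could go out right away, poke the ptlrpcd writeback
         * work item rather than waiting for the next unplug. */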
1056         if (!osc_max_rpc_in_flight(cli, osc) &&
1057             osc_makes_rpc(cli, osc, OBD_BRW_WRITE)) {
1058                 LASSERT(cli->cl_writeback_work != NULL);
1059                 rc = ptlrpcd_queue_work(cli->cl_writeback_work);
1060
1061                 CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n",
1062                        cli, rc);
1063         }
1064         client_obd_list_unlock(&cli->cl_loi_list_lock);
1065
1066         RETURN(0);
1067 }
1068
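/* Undo the cache and pending/urgent list bookkeeping for @ops; returns -EBUSY
 * if the page is already part of an RPC in flight. */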
1069 int osc_teardown_async_page(struct osc_object *obj, struct osc_page *ops)
1070 {
1071         struct osc_async_page *oap = &ops->ops_oap;
1072         struct client_obd     *cli = oap->oap_cli;
1073         struct osc_oap_pages  *lop;
1074         int rc = 0;
1075         ENTRY;
1076
1077         if (oap->oap_magic != OAP_MAGIC)
1078                 RETURN(-EINVAL);
1079
1080         if (oap->oap_cmd & OBD_BRW_WRITE) {
1081                 lop = &obj->oo_write_pages;
1082         } else {
1083                 lop = &obj->oo_read_pages;
1084         }
1085
1086         client_obd_list_lock(&cli->cl_loi_list_lock);
1087
1088         if (!cfs_list_empty(&oap->oap_rpc_item))
1089                 GOTO(out, rc = -EBUSY);
1090
1091         osc_exit_cache(cli, oap, 0);
1092         osc_wake_cache_waiters(cli);
1093
1094         if (!cfs_list_empty(&oap->oap_urgent_item)) {
1095                 cfs_list_del_init(&oap->oap_urgent_item);
1096                 cfs_spin_lock(&oap->oap_lock);
1097                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
1098                 cfs_spin_unlock(&oap->oap_lock);
1099         }
1100         if (!cfs_list_empty(&oap->oap_pending_item)) {
1101                 cfs_list_del_init(&oap->oap_pending_item);
1102                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
1103         }
1104         osc_list_maint(cli, obj);
1105         OSC_IO_DEBUG(obj, "oap %p page %p torn down\n", oap, oap->oap_page);
1106 out:
1107         client_obd_list_unlock(&cli->cl_loi_list_lock);
1108         RETURN(rc);
1109 }
1110
1111 /* aka (~was & now & flag), but this is more clear :) */
1112 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
1113
1114 int osc_set_async_flags(struct osc_object *obj, struct osc_page *opg,
1115                         obd_flag async_flags)
1116 {
1117         struct osc_async_page *oap = &opg->ops_oap;
1118         struct osc_oap_pages *lop;
1119         int flags = 0;
1120         ENTRY;
1121
1122         LASSERT(!cfs_list_empty(&oap->oap_pending_item));
1123
1124         if (oap->oap_cmd & OBD_BRW_WRITE) {
1125                 lop = &obj->oo_write_pages;
1126         } else {
1127                 lop = &obj->oo_read_pages;
1128         }
1129
1130         if ((oap->oap_async_flags & async_flags) == async_flags)
1131                 RETURN(0);
1132
1133         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
1134                 flags |= ASYNC_READY;
1135
1136         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
1137             cfs_list_empty(&oap->oap_rpc_item)) {
1138                 if (oap->oap_async_flags & ASYNC_HP)
1139                         cfs_list_add(&oap->oap_urgent_item, &lop->oop_urgent);
1140                 else
1141                         cfs_list_add_tail(&oap->oap_urgent_item,
1142                                           &lop->oop_urgent);
1143                 flags |= ASYNC_URGENT;
1144                 osc_list_maint(oap->oap_cli, obj);
1145         }
1146         cfs_spin_lock(&oap->oap_lock);
1147         oap->oap_async_flags |= flags;
1148         cfs_spin_unlock(&oap->oap_lock);
1149
1150         OSC_IO_DEBUG(obj, "oap %p page %p has flags %x\n", oap,
1151                      oap->oap_page, oap->oap_async_flags);
1152         RETURN(0);
1153 }
1154
1155 /**
1156  * this is called when a sync waiter receives an interruption.  Its job is to
1157  * get the caller woken as soon as possible.  If its page hasn't been put in an
1158  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1159  * desiring interruption which will forcefully complete the rpc once the rpc
1160  * has timed out.
1161  */
1162 int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops)
1163 {
1164         struct osc_async_page *oap = &ops->ops_oap;
1165         int rc = -EBUSY;
1166         ENTRY;
1167
1168         LASSERT(!oap->oap_interrupted);
1169         oap->oap_interrupted = 1;
1170
1171         /* ok, it's been put in an rpc. only one oap gets a request reference */
1172         if (oap->oap_request != NULL) {
1173                 ptlrpc_mark_interrupted(oap->oap_request);
1174                 ptlrpcd_wake(oap->oap_request);
1175                 ptlrpc_req_finished(oap->oap_request);
1176                 oap->oap_request = NULL;
1177         }
1178
1179         /*
1180          * page completion may be called only if ->cpo_prep() method was
1181          * executed by osc_io_submit(), which also adds the page to the pending list
1182          */
1183         if (!cfs_list_empty(&oap->oap_pending_item)) {
1184                 struct osc_oap_pages *lop;
1185                 struct osc_object *osc = oap->oap_obj;
1186
1187                 cfs_list_del_init(&oap->oap_pending_item);
1188                 cfs_list_del_init(&oap->oap_urgent_item);
1189
1190                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1191                         &osc->oo_write_pages : &osc->oo_read_pages;
1192                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1193                 osc_list_maint(oap->oap_cli, osc);
1194                 rc = osc_completion(env, oap, oap->oap_cmd, NULL, -EINTR);
1195         }
1196
1197         RETURN(rc);
1198 }
1199
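/* Queue a page for synchronous (OBD_BRW_SYNC) IO: fill in the oap from @opg
 * and put it on the object's pending list. */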
1200 int osc_queue_sync_page(const struct lu_env *env, struct osc_page *opg,
1201                         int cmd, int brw_flags)
1202 {
1203         struct osc_async_page *oap = &opg->ops_oap;
1204         struct client_obd     *cli = oap->oap_cli;
1205         int flags = 0;
1206         ENTRY;
1207
1208         oap->oap_cmd       = cmd;
1209         oap->oap_page_off  = opg->ops_from;
1210         oap->oap_count     = opg->ops_to - opg->ops_from;
1211         oap->oap_brw_flags = OBD_BRW_SYNC | brw_flags;
1212
1213         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
1214         if (cfs_memory_pressure_get())
1215                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1216
1217         if (!client_is_remote(osc_export(cl2osc(opg->ops_cl.cpl_obj))) &&
1218             cfs_capable(CFS_CAP_SYS_RESOURCE)) {
1219                 oap->oap_brw_flags |= OBD_BRW_NOQUOTA;
1220                 oap->oap_cmd |= OBD_BRW_NOQUOTA;
1221         }
1222
1223         if (oap->oap_cmd & OBD_BRW_READ)
1224                 flags = ASYNC_COUNT_STABLE;
1225         else if (!(oap->oap_brw_page.flag & OBD_BRW_FROM_GRANT))
1226                 osc_enter_cache_try(env, cli, oap, 1);
1227
1228         cfs_spin_lock(&oap->oap_lock);
1229         oap->oap_async_flags |= OSC_FLAGS | flags;
1230         cfs_spin_unlock(&oap->oap_lock);
1231
1232         osc_oap_to_pending(oap);
1233         RETURN(0);
1234 }
1235
1236 /** @} osc */