/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2013, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * Implementation of cl_page for OSC layer.
 *
 *   Author: Nikita Danilov <nikita.danilov@sun.com>
 *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
 */

#define DEBUG_SUBSYSTEM S_OSC

#include "osc_cl_internal.h"

static void osc_lru_del(struct client_obd *cli, struct osc_page *opg);
static void osc_lru_use(struct client_obd *cli, struct osc_page *opg);
static int osc_lru_reserve(const struct lu_env *env, struct osc_object *obj,
                           struct osc_page *opg);

/** \addtogroup osc
 *  @{
 */

/*
 * Comment out osc_page_protected because it may sleep while holding
 * client_obd_list_lock.
 * client_obd_list_lock -> osc_ap_completion -> osc_completion ->
 *   -> osc_page_protected -> osc_page_is_dlocked -> osc_match_base
 *   -> ldlm_lock_match -> sptlrpc_import_check_ctx -> sleep.
 */
#if 0
static int osc_page_is_dlocked(const struct lu_env *env,
                               const struct osc_page *opg,
                               enum cl_lock_mode mode, int pending, int unref)
{
        struct cl_page         *page;
        struct osc_object      *obj;
        struct osc_thread_info *info;
        struct ldlm_res_id     *resname;
        struct lustre_handle   *lockh;
        ldlm_policy_data_t     *policy;
        ldlm_mode_t             dlmmode;
        __u64                   flags;

        might_sleep();

        info = osc_env_info(env);
        resname = &info->oti_resname;
        policy = &info->oti_policy;
        lockh = &info->oti_handle;
        page = opg->ops_cl.cpl_page;
        obj = cl2osc(opg->ops_cl.cpl_obj);

        flags = LDLM_FL_TEST_LOCK | LDLM_FL_BLOCK_GRANTED;
        if (pending)
                flags |= LDLM_FL_CBPENDING;

        dlmmode = osc_cl_lock2ldlm(mode) | LCK_PW;
        osc_lock_build_res(env, obj, resname);
        osc_index2policy(policy, page->cp_obj, page->cp_index, page->cp_index);
        return osc_match_base(osc_export(obj), resname, LDLM_EXTENT, policy,
                              dlmmode, &flags, NULL, lockh, unref);
}

/**
 * Checks an invariant that a page in the cache is covered by a lock, as
 * needed.
 */
static int osc_page_protected(const struct lu_env *env,
                              const struct osc_page *opg,
                              enum cl_lock_mode mode, int unref)
{
        struct cl_object_header *hdr;
        struct cl_lock          *scan;
        struct cl_page          *page;
        struct cl_lock_descr    *descr;
        int result;

        LINVRNT(!opg->ops_temp);

        page = opg->ops_cl.cpl_page;
        if (page->cp_owner != NULL &&
            cl_io_top(page->cp_owner)->ci_lockreq == CILR_NEVER)
                /*
                 * If IO is done without locks (liblustre, or lloop), lock is
                 * not required.
                 */
                result = 1;
        else
                /* otherwise check for a DLM lock */
                result = osc_page_is_dlocked(env, opg, mode, 1, unref);
        if (result == 0) {
                /* maybe this page is a part of a lockless io? */
                hdr = cl_object_header(opg->ops_cl.cpl_obj);
                descr = &osc_env_info(env)->oti_descr;
                descr->cld_mode = mode;
                descr->cld_start = page->cp_index;
                descr->cld_end   = page->cp_index;
                spin_lock(&hdr->coh_lock_guard);
                cfs_list_for_each_entry(scan, &hdr->coh_locks, cll_linkage) {
                        /*
                         * Lock-less sub-lock has to be either in HELD state
                         * (when io is actively going on), or in CACHED state,
                         * when top-lock is being unlocked:
                         * cl_io_unlock()->cl_unuse()->...->lov_lock_unuse().
                         */
                        if ((scan->cll_state == CLS_HELD ||
                             scan->cll_state == CLS_CACHED) &&
                            cl_lock_ext_match(&scan->cll_descr, descr)) {
                                struct osc_lock *olck;

                                olck = osc_lock_at(scan);
                                result = osc_lock_is_lockless(olck);
                                break;
                        }
                }
                spin_unlock(&hdr->coh_lock_guard);
        }
        return result;
}
#else
static int osc_page_protected(const struct lu_env *env,
                              const struct osc_page *opg,
                              enum cl_lock_mode mode, int unref)
{
        return 1;
}
#endif

/*****************************************************************************
 *
 * Page operations.
 *
 */
static void osc_page_fini(const struct lu_env *env,
                          struct cl_page_slice *slice)
{
        struct osc_page *opg = cl2osc_page(slice);
        CDEBUG(D_TRACE, "%p\n", opg);
        LASSERT(opg->ops_lock == NULL);
}

static void osc_page_transfer_get(struct osc_page *opg, const char *label)
{
        struct cl_page *page = cl_page_top(opg->ops_cl.cpl_page);

        LASSERT(!opg->ops_transfer_pinned);
        cl_page_get(page);
        lu_ref_add_atomic(&page->cp_reference, label, page);
        opg->ops_transfer_pinned = 1;
}

static void osc_page_transfer_put(const struct lu_env *env,
                                  struct osc_page *opg)
{
        struct cl_page *page = cl_page_top(opg->ops_cl.cpl_page);

        if (opg->ops_transfer_pinned) {
                opg->ops_transfer_pinned = 0;
                lu_ref_del(&page->cp_reference, "transfer", page);
                cl_page_put(env, page);
        }
}

/**
 * This is called once for every page when it is submitted for a transfer
 * either opportunistic (osc_page_cache_add()), or immediate
 * (osc_page_submit()).
 */
static void osc_page_transfer_add(const struct lu_env *env,
                                  struct osc_page *opg, enum cl_req_type crt)
{
        struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);

        /* ops_lru and ops_inflight share the same field, so take it from LRU
         * first and then use it as inflight. */
        osc_lru_use(osc_cli(obj), opg);

        spin_lock(&obj->oo_seatbelt);
        cfs_list_add(&opg->ops_inflight, &obj->oo_inflight[crt]);
        opg->ops_submitter = current;
        spin_unlock(&obj->oo_seatbelt);
}

int osc_page_cache_add(const struct lu_env *env,
                        const struct cl_page_slice *slice, struct cl_io *io)
{
        struct osc_page *opg = cl2osc_page(slice);
        int result;
        ENTRY;

        LINVRNT(osc_page_protected(env, opg, CLM_WRITE, 0));

        osc_page_transfer_get(opg, "transfer\0cache");
        result = osc_queue_async_io(env, io, opg);
        if (result != 0)
                osc_page_transfer_put(env, opg);
        else
                osc_page_transfer_add(env, opg, CRT_WRITE);

        RETURN(result);
}

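/*
 * Worked example (illustrative numbers, not from the code below): with
 * 4096-byte pages, osc_index2policy() maps start = 2, end = 3 to the
 * inclusive byte extent l_extent = [8192, 16383], i.e. from
 * cl_offset(obj, 2) up to cl_offset(obj, 4) - 1.
 */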
void osc_index2policy(ldlm_policy_data_t *policy, const struct cl_object *obj,
                      pgoff_t start, pgoff_t end)
{
        memset(policy, 0, sizeof *policy);
        policy->l_extent.start = cl_offset(obj, start);
        policy->l_extent.end   = cl_offset(obj, end + 1) - 1;
}

static int osc_page_addref_lock(const struct lu_env *env,
                                struct osc_page *opg,
                                struct cl_lock *lock)
{
        struct osc_lock *olock;
        int              rc;

        LASSERT(opg->ops_lock == NULL);

        olock = osc_lock_at(lock);
        if (cfs_atomic_inc_return(&olock->ols_pageref) <= 0) {
                cfs_atomic_dec(&olock->ols_pageref);
                rc = -ENODATA;
        } else {
                cl_lock_get(lock);
                opg->ops_lock = lock;
                rc = 0;
        }
        return rc;
}

static void osc_page_putref_lock(const struct lu_env *env,
                                 struct osc_page *opg)
{
        struct cl_lock  *lock = opg->ops_lock;
        struct osc_lock *olock;

        LASSERT(lock != NULL);
        olock = osc_lock_at(lock);

        cfs_atomic_dec(&olock->ols_pageref);
        opg->ops_lock = NULL;

        cl_lock_put(env, lock);
}

static int osc_page_is_under_lock(const struct lu_env *env,
                                  const struct cl_page_slice *slice,
                                  struct cl_io *unused)
{
        struct cl_lock *lock;
        int             result = -ENODATA;

        ENTRY;
        lock = cl_lock_at_page(env, slice->cpl_obj, slice->cpl_page,
                               NULL, 1, 0);
        if (lock != NULL) {
                if (osc_page_addref_lock(env, cl2osc_page(slice), lock) == 0)
                        result = -EBUSY;
                cl_lock_put(env, lock);
        }
        RETURN(result);
}

static void osc_page_disown(const struct lu_env *env,
                            const struct cl_page_slice *slice,
                            struct cl_io *io)
{
        struct osc_page *opg = cl2osc_page(slice);

        if (unlikely(opg->ops_lock))
                osc_page_putref_lock(env, opg);
}

static void osc_page_completion_read(const struct lu_env *env,
                                     const struct cl_page_slice *slice,
                                     int ioret)
{
        struct osc_page   *opg = cl2osc_page(slice);

        if (likely(opg->ops_lock))
                osc_page_putref_lock(env, opg);
}

static void osc_page_completion_write(const struct lu_env *env,
                                      const struct cl_page_slice *slice,
                                      int ioret)
{
}

static const char *osc_list(cfs_list_t *head)
{
        return cfs_list_empty(head) ? "-" : "+";
}

static inline cfs_time_t osc_submit_duration(struct osc_page *opg)
{
        if (opg->ops_submit_time == 0)
                return 0;

        return (cfs_time_current() - opg->ops_submit_time);
}

static int osc_page_print(const struct lu_env *env,
                          const struct cl_page_slice *slice,
                          void *cookie, lu_printer_t printer)
{
        struct osc_page       *opg = cl2osc_page(slice);
        struct osc_async_page *oap = &opg->ops_oap;
        struct osc_object     *obj = cl2osc(slice->cpl_obj);
        struct client_obd     *cli = &osc_export(obj)->exp_obd->u.cli;

        return (*printer)(env, cookie, LUSTRE_OSC_NAME"-page@%p: "
                          "1< %#x %d %u %s %s > "
                          "2< "LPU64" %u %u %#x %#x | %p %p %p > "
                          "3< %s %p %d %lu %d > "
                          "4< %d %d %d %lu %s | %s %s %s %s > "
                          "5< %s %s %s %s | %d %s | %d %s %s>\n",
                          opg,
                          /* 1 */
                          oap->oap_magic, oap->oap_cmd,
                          oap->oap_interrupted,
                          osc_list(&oap->oap_pending_item),
                          osc_list(&oap->oap_rpc_item),
                          /* 2 */
                          oap->oap_obj_off, oap->oap_page_off, oap->oap_count,
                          oap->oap_async_flags, oap->oap_brw_flags,
                          oap->oap_request, oap->oap_cli, obj,
                          /* 3 */
                          osc_list(&opg->ops_inflight),
                          opg->ops_submitter, opg->ops_transfer_pinned,
                          osc_submit_duration(opg), opg->ops_srvlock,
                          /* 4 */
                          cli->cl_r_in_flight, cli->cl_w_in_flight,
                          cli->cl_max_rpcs_in_flight,
                          cli->cl_avail_grant,
                          osc_list(&cli->cl_cache_waiters),
                          osc_list(&cli->cl_loi_ready_list),
                          osc_list(&cli->cl_loi_hp_ready_list),
                          osc_list(&cli->cl_loi_write_list),
                          osc_list(&cli->cl_loi_read_list),
                          /* 5 */
                          osc_list(&obj->oo_ready_item),
                          osc_list(&obj->oo_hp_ready_item),
                          osc_list(&obj->oo_write_item),
                          osc_list(&obj->oo_read_item),
                          cfs_atomic_read(&obj->oo_nr_reads),
                          osc_list(&obj->oo_reading_exts),
                          cfs_atomic_read(&obj->oo_nr_writes),
                          osc_list(&obj->oo_hp_exts),
                          osc_list(&obj->oo_urgent_exts));
}

static void osc_page_delete(const struct lu_env *env,
                            const struct cl_page_slice *slice)
{
        struct osc_page   *opg = cl2osc_page(slice);
        struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
        int rc;

        LINVRNT(opg->ops_temp || osc_page_protected(env, opg, CLM_READ, 1));

        ENTRY;
        CDEBUG(D_TRACE, "%p\n", opg);
        osc_page_transfer_put(env, opg);
        rc = osc_teardown_async_page(env, obj, opg);
        if (rc) {
                CL_PAGE_DEBUG(D_ERROR, env, cl_page_top(slice->cpl_page),
                              "Failed to tear down the page: %d\n", rc);
                LASSERT(0);
        }

        spin_lock(&obj->oo_seatbelt);
        if (opg->ops_submitter != NULL) {
                LASSERT(!cfs_list_empty(&opg->ops_inflight));
                cfs_list_del_init(&opg->ops_inflight);
                opg->ops_submitter = NULL;
        }
        spin_unlock(&obj->oo_seatbelt);

        osc_lru_del(osc_cli(obj), opg);

        if (slice->cpl_page->cp_type == CPT_CACHEABLE) {
                void *value;

                spin_lock(&obj->oo_tree_lock);
                value = radix_tree_delete(&obj->oo_tree, osc_index(opg));
                if (value != NULL)
                        --obj->oo_npages;
                spin_unlock(&obj->oo_tree_lock);

                LASSERT(ergo(value != NULL, value == opg));
        }

        EXIT;
}

void osc_page_clip(const struct lu_env *env, const struct cl_page_slice *slice,
                   int from, int to)
{
        struct osc_page       *opg = cl2osc_page(slice);
        struct osc_async_page *oap = &opg->ops_oap;

        LINVRNT(osc_page_protected(env, opg, CLM_READ, 0));

        opg->ops_from = from;
        opg->ops_to   = to;
        spin_lock(&oap->oap_lock);
        oap->oap_async_flags |= ASYNC_COUNT_STABLE;
        spin_unlock(&oap->oap_lock);
}

static int osc_page_cancel(const struct lu_env *env,
                           const struct cl_page_slice *slice)
{
        struct osc_page *opg = cl2osc_page(slice);
        int rc = 0;

        LINVRNT(osc_page_protected(env, opg, CLM_READ, 0));

        /* Check if the transfer of this page has completed,
         * or was never even queued. */
        if (opg->ops_transfer_pinned)
                /* FIXME: may not be interrupted.. */
                rc = osc_cancel_async_page(env, opg);
        LASSERT(ergo(rc == 0, opg->ops_transfer_pinned == 0));
        return rc;
}

static int osc_page_flush(const struct lu_env *env,
                          const struct cl_page_slice *slice,
                          struct cl_io *io)
{
        struct osc_page *opg = cl2osc_page(slice);
        int rc = 0;
        ENTRY;
        rc = osc_flush_async_page(env, io, opg);
        RETURN(rc);
}

static const struct cl_page_operations osc_page_ops = {
        .cpo_fini          = osc_page_fini,
        .cpo_print         = osc_page_print,
        .cpo_delete        = osc_page_delete,
        .cpo_is_under_lock = osc_page_is_under_lock,
        .cpo_disown        = osc_page_disown,
        .io = {
                [CRT_READ] = {
                        .cpo_completion = osc_page_completion_read
                },
                [CRT_WRITE] = {
                        .cpo_completion = osc_page_completion_write
                }
        },
        .cpo_clip           = osc_page_clip,
        .cpo_cancel         = osc_page_cancel,
        .cpo_flush          = osc_page_flush
};

int osc_page_init(const struct lu_env *env, struct cl_object *obj,
                  struct cl_page *page, struct page *vmpage)
{
        struct osc_object *osc = cl2osc(obj);
        struct osc_page   *opg = cl_object_page_slice(obj, page);
        int result;

        opg->ops_from = 0;
        opg->ops_to   = PAGE_CACHE_SIZE;

        result = osc_prep_async_page(osc, opg, vmpage,
                                        cl_offset(obj, page->cp_index));
        if (result == 0) {
                struct osc_io *oio = osc_env_io(env);
                opg->ops_srvlock = osc_io_srvlock(oio);
                cl_page_slice_add(page, &opg->ops_cl, obj,
                                &osc_page_ops);
        }
        /*
         * Cannot assert osc_page_protected() here as read-ahead
         * creates temporary pages outside of a lock.
         */
#ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK
        opg->ops_temp = !osc_page_protected(env, opg, CLM_READ, 1);
#endif
        /* ops_inflight and ops_lru are the same field, but it doesn't
         * hurt to initialize it twice :-) */
        CFS_INIT_LIST_HEAD(&opg->ops_inflight);
        CFS_INIT_LIST_HEAD(&opg->ops_lru);

        /* reserve an LRU slot for this page */
        if (page->cp_type == CPT_CACHEABLE && result == 0) {
                result = osc_lru_reserve(env, osc, opg);
                if (result == 0) {
                        spin_lock(&osc->oo_tree_lock);
                        result = radix_tree_insert(&osc->oo_tree,
                                                   page->cp_index, opg);
                        if (result == 0)
                                ++osc->oo_npages;
                        spin_unlock(&osc->oo_tree_lock);
                        LASSERT(result == 0);
                }
        }

        return result;
}

int osc_over_unstable_soft_limit(struct client_obd *cli)
{
        long obd_upages, obd_dpages, osc_upages;

        /* Can't check cli->cl_unstable_count, therefore no soft limit */
        if (cli == NULL)
                return 0;

        obd_upages = cfs_atomic_read(&obd_unstable_pages);
        obd_dpages = cfs_atomic_read(&obd_dirty_pages);

        osc_upages = cfs_atomic_read(&cli->cl_unstable_count);

        /* obd_max_dirty_pages is the max number of (dirty + unstable)
         * pages allowed at any given time. To simulate an unstable page
         * only limit, we subtract the current number of dirty pages
         * from this max. This difference is roughly the number of pages
         * currently available for unstable pages. Thus, the soft limit
         * is half of that difference. Check osc_upages to ensure we don't
         * set SOFT_SYNC for OSCs without any outstanding unstable pages. */
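        /*
         * Worked example (illustrative numbers only): with
         * obd_max_dirty_pages = 1000 and obd_dpages = 200, the soft limit
         * checked below is (1000 - 200) / 2 = 400 unstable pages.
         */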
        return osc_upages != 0 &&
               obd_upages >= (obd_max_dirty_pages - obd_dpages) / 2;
}

/**
 * Helper function called by osc_io_submit() for every page in an immediate
 * transfer (i.e., transferred synchronously).
 */
void osc_page_submit(const struct lu_env *env, struct osc_page *opg,
                     enum cl_req_type crt, int brw_flags)
{
        struct osc_async_page *oap = &opg->ops_oap;
        struct osc_object     *obj = oap->oap_obj;

        LINVRNT(osc_page_protected(env, opg,
                                   crt == CRT_WRITE ? CLM_WRITE : CLM_READ, 1));

        LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
                 "magic 0x%x\n", oap, oap->oap_magic);
        LASSERT(oap->oap_async_flags & ASYNC_READY);
        LASSERT(oap->oap_async_flags & ASYNC_COUNT_STABLE);

        oap->oap_cmd       = crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ;
        oap->oap_page_off  = opg->ops_from;
        oap->oap_count     = opg->ops_to - opg->ops_from;
        oap->oap_brw_flags = OBD_BRW_SYNC | brw_flags;

        if (osc_over_unstable_soft_limit(oap->oap_cli))
                oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;

        if (!client_is_remote(osc_export(obj)) &&
                        cfs_capable(CFS_CAP_SYS_RESOURCE)) {
                oap->oap_brw_flags |= OBD_BRW_NOQUOTA;
                oap->oap_cmd |= OBD_BRW_NOQUOTA;
        }

        opg->ops_submit_time = cfs_time_current();
        osc_page_transfer_get(opg, "transfer\0imm");
        osc_page_transfer_add(env, opg, crt);
}

/* --------------- LRU page management ------------------ */

/* OSC is a natural place to manage LRU pages as application writes are
 * grouped OSC by OSC. Ideally, if one OSC is used more frequently it should
 * occupy more LRU slots. On the other hand, we should avoid using up all LRU
 * slots (client_obd::cl_lru_left); otherwise processes would have to sleep
 * waiting for free LRU slots, which would be very bad. So the algorithm
 * requires each OSC to free slots voluntarily in order to maintain a
 * reasonable number of free slots at any time.
 */

static CFS_DECL_WAITQ(osc_lru_waitq);
/* LRU pages are freed in batch mode. An OSC should free at least this
 * many pages to avoid running out of LRU budget, and ... */
static const int lru_shrink_min = 2 << (20 - PAGE_CACHE_SHIFT); /* 2M */
/* ... at most this many, otherwise freeing will take too long to finish. */
static const int lru_shrink_max = 8 << (20 - PAGE_CACHE_SHIFT); /* 8M */
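/*
 * Illustrative arithmetic (assuming 4KB pages, i.e. PAGE_CACHE_SHIFT == 12):
 * lru_shrink_min = 2 << 8 = 512 pages (2MB) and
 * lru_shrink_max = 8 << 8 = 2048 pages (8MB).
 */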

/* Check if we can free LRU slots from this OSC. If there are LRU waiters,
 * we should free slots aggressively. In this way, slots are freed at a
 * steady pace to maintain fairness among OSCs.
 *
 * Return how many LRU pages should be freed. */
static int osc_cache_too_much(struct client_obd *cli)
{
        struct cl_client_cache *cache = cli->cl_cache;
        int pages = cfs_atomic_read(&cli->cl_lru_in_list);
        unsigned long budget;

        budget = cache->ccc_lru_max / cfs_atomic_read(&cache->ccc_users);

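        /*
         * Worked example (illustrative numbers only): with
         * ccc_lru_max = 4096 and 4 ccc_users, budget = 1024 pages. Once
         * fewer than 4096 >> 4 = 256 global slots remain, an OSC holding
         * >= 1024 pages returns lru_shrink_max and one holding >= 512
         * returns lru_shrink_min; otherwise shrinking starts only at
         * 2 * budget = 2048 pages.
         */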
        /* if we are about to run out of LRU slots, free some, but not
         * too many, in order to maintain fairness among OSCs. */
        if (cfs_atomic_read(cli->cl_lru_left) < cache->ccc_lru_max >> 4) {
                if (pages >= budget)
                        return lru_shrink_max;
                else if (pages >= budget / 2)
                        return lru_shrink_min;
        } else if (pages >= budget * 2)
                return lru_shrink_min;
        return 0;
}

int lru_queue_work(const struct lu_env *env, void *data)
{
        struct client_obd *cli = data;

        CDEBUG(D_CACHE, "Run LRU work for client obd %p.\n", cli);

        if (osc_cache_too_much(cli))
                osc_lru_shrink(env, cli, lru_shrink_max, true);

        RETURN(0);
}

void osc_lru_add_batch(struct client_obd *cli, cfs_list_t *plist)
{
        CFS_LIST_HEAD(lru);
        struct osc_async_page *oap;
        int npages = 0;

        cfs_list_for_each_entry(oap, plist, oap_pending_item) {
                struct osc_page *opg = oap2osc_page(oap);

                if (!opg->ops_in_lru)
                        continue;

                ++npages;
                LASSERT(cfs_list_empty(&opg->ops_lru));
                cfs_list_add(&opg->ops_lru, &lru);
        }

        if (npages > 0) {
                client_obd_list_lock(&cli->cl_lru_list_lock);
                cfs_list_splice_tail(&lru, &cli->cl_lru_list);
                cfs_atomic_sub(npages, &cli->cl_lru_busy);
                cfs_atomic_add(npages, &cli->cl_lru_in_list);
                client_obd_list_unlock(&cli->cl_lru_list_lock);

                /* XXX: May set force to be true for better performance */
                if (osc_cache_too_much(cli))
                        (void)ptlrpcd_queue_work(cli->cl_lru_work);
        }
}

static void __osc_lru_del(struct client_obd *cli, struct osc_page *opg)
{
        LASSERT(cfs_atomic_read(&cli->cl_lru_in_list) > 0);
        cfs_list_del_init(&opg->ops_lru);
        cfs_atomic_dec(&cli->cl_lru_in_list);
}

/**
 * Page is being destroyed. The page may not be in the LRU list if the
 * transfer never finished (an error occurred).
 */
static void osc_lru_del(struct client_obd *cli, struct osc_page *opg)
{
        if (opg->ops_in_lru) {
                client_obd_list_lock(&cli->cl_lru_list_lock);
                if (!cfs_list_empty(&opg->ops_lru)) {
                        __osc_lru_del(cli, opg);
                } else {
                        LASSERT(cfs_atomic_read(&cli->cl_lru_busy) > 0);
                        cfs_atomic_dec(&cli->cl_lru_busy);
                }
                client_obd_list_unlock(&cli->cl_lru_list_lock);

                cfs_atomic_inc(cli->cl_lru_left);
                /* this is a great place to release more LRU pages if
                 * this osc occupies too many LRU pages and the kernel
                 * is stealing one of them. */
                if (!memory_pressure_get())
                        (void)ptlrpcd_queue_work(cli->cl_lru_work);
                wake_up(&osc_lru_waitq);
        } else {
                LASSERT(cfs_list_empty(&opg->ops_lru));
        }
}

/**
 * Delete a page from the LRU list so that it can be redirtied.
 */
static void osc_lru_use(struct client_obd *cli, struct osc_page *opg)
{
        /* If the page is being transferred for the first time,
         * ops_lru should be empty */
        if (opg->ops_in_lru && !cfs_list_empty(&opg->ops_lru)) {
                client_obd_list_lock(&cli->cl_lru_list_lock);
                __osc_lru_del(cli, opg);
                client_obd_list_unlock(&cli->cl_lru_list_lock);
                cfs_atomic_inc(&cli->cl_lru_busy);
        }
}

static void discard_pagevec(const struct lu_env *env, struct cl_io *io,
                                struct cl_page **pvec, int max_index)
{
        int i;

        for (i = 0; i < max_index; i++) {
                struct cl_page *page = pvec[i];

                LASSERT(cl_page_is_owned(page, io));
                cl_page_discard(env, io, page);
                cl_page_disown(env, io, page);
                cl_page_put(env, page);

                pvec[i] = NULL;
        }
}
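
/*
 * Note: osc_lru_shrink() below batches up to OTI_PVEC_SIZE owned pages in
 * oti_pvec and only calls discard_pagevec() after dropping
 * cl_lru_list_lock, so the relatively expensive discard/disown work is
 * never done while the LRU lock is held.
 */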

/**
 * Drop at most @target pages from the LRU.
 */
int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
                   int target, bool force)
{
        struct cl_io *io;
        struct cl_object *clobj = NULL;
        struct cl_page **pvec;
        struct osc_page *opg;
        int maxscan = 0;
        int count = 0;
        int index = 0;
        int rc = 0;
        ENTRY;

        LASSERT(cfs_atomic_read(&cli->cl_lru_in_list) >= 0);
        if (cfs_atomic_read(&cli->cl_lru_in_list) == 0 || target <= 0)
                RETURN(0);

        if (!force) {
                if (cfs_atomic_read(&cli->cl_lru_shrinkers) > 0)
                        RETURN(-EBUSY);

                if (cfs_atomic_inc_return(&cli->cl_lru_shrinkers) > 1) {
                        cfs_atomic_dec(&cli->cl_lru_shrinkers);
                        RETURN(-EBUSY);
                }
        } else {
                cfs_atomic_inc(&cli->cl_lru_shrinkers);
        }

        pvec = (struct cl_page **)osc_env_info(env)->oti_pvec;
        io = &osc_env_info(env)->oti_io;

        client_obd_list_lock(&cli->cl_lru_list_lock);
        maxscan = min(target << 1, cfs_atomic_read(&cli->cl_lru_in_list));
        while (!cfs_list_empty(&cli->cl_lru_list)) {
                struct cl_page *page;
                bool will_free = false;

                if (--maxscan < 0)
                        break;

                opg = cfs_list_entry(cli->cl_lru_list.next, struct osc_page,
                                     ops_lru);
                page = cl_page_top(opg->ops_cl.cpl_page);
                if (cl_page_in_use_noref(page)) {
                        cfs_list_move_tail(&opg->ops_lru, &cli->cl_lru_list);
                        continue;
                }

                LASSERT(page->cp_obj != NULL);
                if (clobj != page->cp_obj) {
                        struct cl_object *tmp = page->cp_obj;

                        cl_object_get(tmp);
                        client_obd_list_unlock(&cli->cl_lru_list_lock);

                        if (clobj != NULL) {
                                discard_pagevec(env, io, pvec, index);
                                index = 0;

                                cl_io_fini(env, io);
                                cl_object_put(env, clobj);
                                clobj = NULL;
                        }

                        clobj = tmp;
                        io->ci_obj = clobj;
                        io->ci_ignore_layout = 1;
                        rc = cl_io_init(env, io, CIT_MISC, clobj);

                        client_obd_list_lock(&cli->cl_lru_list_lock);

                        if (rc != 0)
                                break;

                        ++maxscan;
                        continue;
                }

                if (cl_page_own_try(env, io, page) == 0) {
                        if (!cl_page_in_use_noref(page)) {
                                /* remove it from lru list earlier to avoid
                                 * lock contention */
                                __osc_lru_del(cli, opg);
                                opg->ops_in_lru = 0; /* will be discarded */

                                cl_page_get(page);
                                will_free = true;
                        } else {
                                cl_page_disown(env, io, page);
                        }
                }

                if (!will_free) {
                        cfs_list_move_tail(&opg->ops_lru, &cli->cl_lru_list);
                        continue;
                }

                /* Don't discard or free the page while cl_lru_list_lock is held */
                pvec[index++] = page;
                if (unlikely(index == OTI_PVEC_SIZE)) {
                        client_obd_list_unlock(&cli->cl_lru_list_lock);
                        discard_pagevec(env, io, pvec, index);
                        index = 0;

                        client_obd_list_lock(&cli->cl_lru_list_lock);
                }

                if (++count >= target)
                        break;
        }
        client_obd_list_unlock(&cli->cl_lru_list_lock);

        if (clobj != NULL) {
                discard_pagevec(env, io, pvec, index);

                cl_io_fini(env, io);
                cl_object_put(env, clobj);
        }

        cfs_atomic_dec(&cli->cl_lru_shrinkers);
        if (count > 0) {
                cfs_atomic_add(count, cli->cl_lru_left);
                wake_up_all(&osc_lru_waitq);
        }
        RETURN(count > 0 ? count : rc);
}

static inline int max_to_shrink(struct client_obd *cli)
{
        return min(cfs_atomic_read(&cli->cl_lru_in_list) >> 1, lru_shrink_max);
}

int osc_lru_reclaim(struct client_obd *cli)
{
        struct cl_env_nest nest;
        struct lu_env *env;
        struct cl_client_cache *cache = cli->cl_cache;
        int max_scans;
        int rc = 0;
        ENTRY;

        LASSERT(cache != NULL);
        LASSERT(!cfs_list_empty(&cache->ccc_lru));

        env = cl_env_nested_get(&nest);
        if (IS_ERR(env))
                RETURN(rc);

        rc = osc_lru_shrink(env, cli, osc_cache_too_much(cli), false);
        if (rc != 0) {
                if (rc == -EBUSY)
                        rc = 0;

                CDEBUG(D_CACHE, "%s: Free %d pages from own LRU: %p.\n",
                        cli->cl_import->imp_obd->obd_name, rc, cli);
                GOTO(out, rc);
        }

        CDEBUG(D_CACHE, "%s: cli %p no free slots, pages: %d, busy: %d.\n",
                cli->cl_import->imp_obd->obd_name, cli,
                cfs_atomic_read(&cli->cl_lru_in_list),
                cfs_atomic_read(&cli->cl_lru_busy));

        /* Reclaim LRU slots from other client_obds as this one can't free
         * enough from its own. This should rarely happen. */
        spin_lock(&cache->ccc_lru_lock);
        cache->ccc_lru_shrinkers++;
        cfs_list_move_tail(&cli->cl_lru_osc, &cache->ccc_lru);

        max_scans = cfs_atomic_read(&cache->ccc_users);
        while (--max_scans > 0 && !cfs_list_empty(&cache->ccc_lru)) {
                cli = cfs_list_entry(cache->ccc_lru.next, struct client_obd,
                                        cl_lru_osc);

                CDEBUG(D_CACHE, "%s: cli %p LRU pages: %d, busy: %d.\n",
                        cli->cl_import->imp_obd->obd_name, cli,
                        cfs_atomic_read(&cli->cl_lru_in_list),
                        cfs_atomic_read(&cli->cl_lru_busy));

                cfs_list_move_tail(&cli->cl_lru_osc, &cache->ccc_lru);
                if (osc_cache_too_much(cli) > 0) {
                        spin_unlock(&cache->ccc_lru_lock);

                        rc = osc_lru_shrink(env, cli, osc_cache_too_much(cli),
                                            true);
                        spin_lock(&cache->ccc_lru_lock);
                        if (rc != 0)
                                break;
                }
        }
        spin_unlock(&cache->ccc_lru_lock);

out:
        cl_env_nested_put(&nest, env);
        CDEBUG(D_CACHE, "%s: cli %p freed %d pages.\n",
                cli->cl_import->imp_obd->obd_name, cli, rc);
        return rc;
}

static int osc_lru_reserve(const struct lu_env *env, struct osc_object *obj,
                           struct osc_page *opg)
{
        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
        struct osc_io *oio = osc_env_io(env);
        struct client_obd *cli = osc_cli(obj);
        int rc = 0;
        ENTRY;

        if (cli->cl_cache == NULL) /* shall not be in LRU */
                RETURN(0);

        if (oio->oi_lru_reserved > 0) {
                --oio->oi_lru_reserved;
                goto out;
        }

        LASSERT(cfs_atomic_read(cli->cl_lru_left) >= 0);
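        /*
         * cfs_atomic_add_unless(v, -1, 0) consumes one LRU slot and returns
         * non-zero on success; it fails only when cl_lru_left is already 0,
         * in which case we try to reclaim slots or wait below.
         */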
        while (!cfs_atomic_add_unless(cli->cl_lru_left, -1, 0)) {

                /* we have run out of LRU slots, try to free some ourselves */
                rc = osc_lru_reclaim(cli);
                if (rc < 0)
                        break;
                if (rc > 0)
                        continue;

                cond_resched();
                rc = l_wait_event(osc_lru_waitq,
                                cfs_atomic_read(cli->cl_lru_left) > 0,
                                &lwi);
                if (rc < 0)
                        break;
        }

out:
        if (rc >= 0) {
                cfs_atomic_inc(&cli->cl_lru_busy);
                opg->ops_in_lru = 1;
                rc = 0;
        }

        RETURN(rc);
}

/** @} osc */