Whamcloud - gitweb
b=16367
[fs/lustre-release.git] / lustre / ldlm / ldlm_pool.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ldlm/ldlm_pool.c
37  *
38  * Author: Yury Umanets <umka@clusterfs.com>
39  */
40
41 /* 
42  * Idea of this code is rather simple. Each second, for each server namespace
43  * we have SLV - server lock volume which is calculated on current number of
44  * granted locks, grant speed for past period, etc - that is, locking load.
45  * This SLV number may be thought as a flow definition for simplicity. It is
46  * sent to clients with each occasion to let them know what is current load
47  * situation on the server. By default, at the beginning, SLV on server is
48  * set max value which is calculated as the following: allow to one client
49  * have all locks of limit ->pl_limit for 10h.
50  *
51  * Next, on clients, number of cached locks is not limited artificially in any
52  * way as it was before. Instead, client calculates CLV, that is, client lock
53  * volume for each lock and compares it with last SLV from the server. CLV is
54  * calculated as the number of locks in LRU * lock live time in seconds. If
55  * CLV > SLV - lock is canceled.
56  *
57  * Client has LVF, that is, lock volume factor which regulates how much sensitive
58  * client should be about last SLV from server. The higher LVF is the more locks
59  * will be canceled on client. Default value for it is 1. Setting LVF to 2 means
60  * that client will cancel locks 2 times faster.
61  *
62  * Locks on a client will be canceled more intensively in these cases:
63  * (1) if SLV is smaller, that is, load is higher on the server;
64  * (2) client has a lot of locks (the more locks are held by client, the bigger
65  *     chances that some of them should be canceled);
66  * (3) client has old locks (taken some time ago);
67  *
68  * Thus, according to flow paradigm that we use for better understanding SLV,
69  * CLV is the volume of particle in flow described by SLV. According to this,
70  * if flow is getting thinner, more and more particles become outside of it and
71  * as particles are locks, they should be canceled.
72  *
73  * General idea of this belongs to Vitaly Fertman (vitaly@clusterfs.com). Andreas
74  * Dilger (adilger@clusterfs.com) proposed few nice ideas like using LVF and many
75  * cleanups. Flow definition to allow more easy understanding of the logic belongs
76  * to Nikita Danilov (nikita@clusterfs.com) as well as many cleanups and fixes.
77  * And design and implementation are done by Yury Umanets (umka@clusterfs.com).
78  *
79  * Glossary for terms used:
80  *
81  * pl_limit - Number of allowed locks in pool. Applies to server and client
82  * side (tunable);
83  *
84  * pl_granted - Number of granted locks (calculated);
85  * pl_grant_rate - Number of granted locks for last T (calculated);
86  * pl_cancel_rate - Number of canceled locks for last T (calculated);
87  * pl_grant_speed - Grant speed (GR - CR) for last T (calculated);
88  * pl_grant_plan - Planned number of granted locks for next T (calculated);
89  * pl_server_lock_volume - Current server lock volume (calculated);
90  *
91  * As it may be seen from list above, we have few possible tunables which may
92  * affect behavior much. They all may be modified via proc. However, they also
93  * give a possibility for constructing few pre-defined behavior policies. If
94  * none of predefines is suitable for a working pattern being used, new one may
95  * be "constructed" via proc tunables.
96  */
97
98 #define DEBUG_SUBSYSTEM S_LDLM
99
100 #ifdef __KERNEL__
101 # include <lustre_dlm.h>
102 #else
103 # include <liblustre.h>
104 #endif
105
106 #include <obd_class.h>
107 #include <obd_support.h>
108 #include "ldlm_internal.h"
109
110 #ifdef HAVE_LRU_RESIZE_SUPPORT
111
112 /*
113  * 50 ldlm locks for 1MB of RAM. 
114  */
115 #define LDLM_POOL_HOST_L ((num_physpages >> (20 - CFS_PAGE_SHIFT)) * 50)
116
117 /*
118  * Maximal possible grant step plan in %. 
119  */
120 #define LDLM_POOL_MAX_GSP (30)
121
122 /*
123  * Minimal possible grant step plan in %. 
124  */
125 #define LDLM_POOL_MIN_GSP (1)
126
127 /*
128  * This controls the speed of reaching LDLM_POOL_MAX_GSP
129  * with increasing thread period.
130  */
131 #define LDLM_POOL_GSP_STEP (4)
132
133 /* 
134  * LDLM_POOL_GSP% of all locks is default GP. 
135  */
136 #define LDLM_POOL_GP(L)   (((L) * LDLM_POOL_MAX_GSP) / 100)
137
138 /* 
139  * Max age for locks on clients. 
140  */
141 #define LDLM_POOL_MAX_AGE (36000)
142
143 #ifdef __KERNEL__
144 extern cfs_proc_dir_entry_t *ldlm_ns_proc_dir;
145 #endif
146
147 #define avg(src, add) \
148         ((src) = ((src) + (add)) / 2)
149
150 static inline __u64 dru(__u64 val, __u32 div)
151 {
152         __u64 ret = val + (div - 1);
153         do_div(ret, div);
154         return ret;
155 }
156
157 static inline __u64 ldlm_pool_slv_max(__u32 L)
158 {
159         /*
160          * Allow to have all locks for 1 client for 10 hrs.
161          * Formula is the following: limit * 10h / 1 client. 
162          */
163         __u64 lim = L *  LDLM_POOL_MAX_AGE / 1;
164         return lim;
165 }
166
167 static inline __u64 ldlm_pool_slv_min(__u32 L)
168 {
169         return 1;
170 }
171
172 enum {
173         LDLM_POOL_FIRST_STAT = 0,
174         LDLM_POOL_GRANTED_STAT = LDLM_POOL_FIRST_STAT,
175         LDLM_POOL_GRANT_STAT,
176         LDLM_POOL_CANCEL_STAT,
177         LDLM_POOL_GRANT_RATE_STAT,
178         LDLM_POOL_CANCEL_RATE_STAT,
179         LDLM_POOL_GRANT_PLAN_STAT,
180         LDLM_POOL_SLV_STAT,
181         LDLM_POOL_SHRINK_REQTD_STAT,
182         LDLM_POOL_SHRINK_FREED_STAT,
183         LDLM_POOL_RECALC_STAT,
184         LDLM_POOL_TIMING_STAT,
185         LDLM_POOL_LAST_STAT
186 };
187
188 static inline struct ldlm_namespace *ldlm_pl2ns(struct ldlm_pool *pl)
189 {
190         return container_of(pl, struct ldlm_namespace, ns_pool);
191 }
192
193 /**
194  * Calculates suggested grant_step in % of available locks for passed 
195  * \a period. This is later used in grant_plan calculations.
196  */
197 static inline int ldlm_pool_t2gsp(int t)
198 {
199         /*
200          * This yeilds 1% grant step for anything below LDLM_POOL_GSP_STEP
201          * and up to 30% for anything higher than LDLM_POOL_GSP_STEP.
202          * 
203          * How this will affect execution is the following:
204          *
205          * - for thread peroid 1s we will have grant_step 1% which good from
206          * pov of taking some load off from server and push it out to clients.
207          * This is like that because 1% for grant_step means that server will
208          * not allow clients to get lots of locks inshort period of time and
209          * keep all old locks in their caches. Clients will always have to
210          * get some locks back if they want to take some new;
211          *
212          * - for thread period 10s (which is default) we will have 23% which
213          * means that clients will have enough of room to take some new locks
214          * without getting some back. All locks from this 23% which were not 
215          * taken by clients in current period will contribute in SLV growing.
216          * SLV growing means more locks cached on clients until limit or grant
217          * plan is reached.
218          */
219         return LDLM_POOL_MAX_GSP - 
220                 (LDLM_POOL_MAX_GSP - LDLM_POOL_MIN_GSP) / 
221                 (1 << (t / LDLM_POOL_GSP_STEP));
222 }
223
224 /**
225  * Recalculates next grant limit on passed \a pl.
226  *
227  * \pre ->pl_lock is locked. 
228  */
229 static inline void ldlm_pool_recalc_grant_plan(struct ldlm_pool *pl)
230 {
231         int granted, grant_step, limit;
232         
233         limit = ldlm_pool_get_limit(pl);
234         granted = atomic_read(&pl->pl_granted);
235
236         grant_step = ldlm_pool_t2gsp(pl->pl_recalc_period);
237         grant_step = ((limit - granted) * grant_step) / 100;
238         pl->pl_grant_plan = granted + grant_step;
239 }
240
241 /**
242  * Recalculates next SLV on passed \a pl.
243  *
244  * \pre ->pl_lock is locked. 
245  */
246 static inline void ldlm_pool_recalc_slv(struct ldlm_pool *pl)
247 {
248         int grant_usage, granted, grant_plan;
249         __u64 slv, slv_factor;
250         __u32 limit;
251
252         slv = pl->pl_server_lock_volume;
253         grant_plan = pl->pl_grant_plan;
254         limit = ldlm_pool_get_limit(pl);
255         granted = atomic_read(&pl->pl_granted);
256
257         grant_usage = limit - (granted - grant_plan);
258         if (grant_usage <= 0)
259                 grant_usage = 1;
260
261         /* 
262          * Find out SLV change factor which is the ratio of grant usage 
263          * from limit. SLV changes as fast as the ratio of grant plan 
264          * consumtion. The more locks from grant plan are not consumed 
265          * by clients in last interval (idle time), the faster grows 
266          * SLV. And the opposite, the more grant plan is over-consumed
267          * (load time) the faster drops SLV. 
268          */
269         slv_factor = (grant_usage * 100) / limit;
270         if (2 * abs(granted - limit) > limit) {
271                 slv_factor *= slv_factor;
272                 slv_factor = dru(slv_factor, 100);
273         }
274         slv = slv * slv_factor;
275         slv = dru(slv, 100);
276
277         if (slv > ldlm_pool_slv_max(limit)) {
278                 slv = ldlm_pool_slv_max(limit);
279         } else if (slv < ldlm_pool_slv_min(limit)) {
280                 slv = ldlm_pool_slv_min(limit);
281         }
282
283         pl->pl_server_lock_volume = slv;
284 }
285
286 /**
287  * Recalculates next stats on passed \a pl.
288  *
289  * \pre ->pl_lock is locked. 
290  */
291 static inline void ldlm_pool_recalc_stats(struct ldlm_pool *pl)
292 {
293         int grant_plan = pl->pl_grant_plan;
294         __u64 slv = pl->pl_server_lock_volume;
295         int granted = atomic_read(&pl->pl_granted);
296         int grant_rate = atomic_read(&pl->pl_grant_rate);
297         int cancel_rate = atomic_read(&pl->pl_cancel_rate);
298
299         lprocfs_counter_add(pl->pl_stats, LDLM_POOL_SLV_STAT, 
300                             slv);
301         lprocfs_counter_add(pl->pl_stats, LDLM_POOL_GRANTED_STAT,
302                             granted);
303         lprocfs_counter_add(pl->pl_stats, LDLM_POOL_GRANT_RATE_STAT,
304                             grant_rate);
305         lprocfs_counter_add(pl->pl_stats, LDLM_POOL_GRANT_PLAN_STAT,
306                             grant_plan);
307         lprocfs_counter_add(pl->pl_stats, LDLM_POOL_CANCEL_RATE_STAT,
308                             cancel_rate);
309 }
310
311 /**
312  * Sets current SLV into obd accessible via ldlm_pl2ns(pl)->ns_obd.
313  */
314 static void ldlm_srv_pool_push_slv(struct ldlm_pool *pl)
315 {
316         struct obd_device *obd;
317
318         /* 
319          * Set new SLV in obd field for using it later without accessing the
320          * pool. This is required to avoid race between sending reply to client
321          * with new SLV and cleanup server stack in which we can't guarantee
322          * that namespace is still alive. We know only that obd is alive as
323          * long as valid export is alive. 
324          */
325         obd = ldlm_pl2ns(pl)->ns_obd;
326         LASSERT(obd != NULL);
327         write_lock(&obd->obd_pool_lock);
328         obd->obd_pool_slv = pl->pl_server_lock_volume;
329         write_unlock(&obd->obd_pool_lock);
330 }
331
332 /**
333  * Recalculates all pool fields on passed \a pl.
334  *
335  * \pre ->pl_lock is not locked. 
336  */
337 static int ldlm_srv_pool_recalc(struct ldlm_pool *pl)
338 {
339         time_t recalc_interval_sec;
340         ENTRY;
341
342         spin_lock(&pl->pl_lock);
343         recalc_interval_sec = cfs_time_current_sec() - pl->pl_recalc_time;
344         if (recalc_interval_sec >= pl->pl_recalc_period) {
345                 /*
346                  * Recalc SLV after last period. This should be done
347                  * _before_ recalculating new grant plan. 
348                  */
349                 ldlm_pool_recalc_slv(pl);
350                 
351                 /*
352                  * Make sure that pool informed obd of last SLV changes. 
353                  */
354                 ldlm_srv_pool_push_slv(pl);
355
356                 /*
357                  * Update grant_plan for new period. 
358                  */
359                 ldlm_pool_recalc_grant_plan(pl);
360
361                 pl->pl_recalc_time = cfs_time_current_sec();
362                 lprocfs_counter_add(pl->pl_stats, LDLM_POOL_TIMING_STAT, 
363                                     recalc_interval_sec);
364         }
365
366         spin_unlock(&pl->pl_lock);
367         RETURN(0);
368 }
369
370 /**
371  * This function is used on server side as main entry point for memory
372  * preasure handling. It decreases SLV on \a pl according to passed
373  * \a nr and \a gfp_mask.
374  * 
375  * Our goal here is to decrease SLV such a way that clients hold \a nr
376  * locks smaller in next 10h. 
377  */
378 static int ldlm_srv_pool_shrink(struct ldlm_pool *pl,
379                                 int nr, unsigned int gfp_mask)
380 {
381         __u32 limit;
382         ENTRY;
383
384         /* 
385          * VM is asking how many entries may be potentially freed. 
386          */
387         if (nr == 0)
388                 RETURN(atomic_read(&pl->pl_granted));
389
390         /* 
391          * Client already canceled locks but server is already in shrinker
392          * and can't cancel anything. Let's catch this race. 
393          */
394         if (atomic_read(&pl->pl_granted) == 0)
395                 RETURN(0);
396
397         spin_lock(&pl->pl_lock);
398
399         /* 
400          * We want shrinker to possibly cause cancelation of @nr locks from
401          * clients or grant approximately @nr locks smaller next intervals.
402          *
403          * This is why we decresed SLV by @nr. This effect will only be as
404          * long as one re-calc interval (1s these days) and this should be
405          * enough to pass this decreased SLV to all clients. On next recalc
406          * interval pool will either increase SLV if locks load is not high
407          * or will keep on same level or even decrease again, thus, shrinker
408          * decreased SLV will affect next recalc intervals and this way will
409          * make locking load lower. 
410          */
411         if (nr < pl->pl_server_lock_volume) {
412                 pl->pl_server_lock_volume = pl->pl_server_lock_volume - nr;
413         } else {
414                 limit = ldlm_pool_get_limit(pl);
415                 pl->pl_server_lock_volume = ldlm_pool_slv_min(limit);
416         }
417
418         /* 
419          * Make sure that pool informed obd of last SLV changes. 
420          */
421         ldlm_srv_pool_push_slv(pl);
422         spin_unlock(&pl->pl_lock);
423
424         /* 
425          * We did not really free any memory here so far, it only will be
426          * freed later may be, so that we return 0 to not confuse VM. 
427          */
428         RETURN(0);
429 }
430
431 /**
432  * Setup server side pool \a pl with passed \a limit.
433  */
434 static int ldlm_srv_pool_setup(struct ldlm_pool *pl, int limit)
435 {
436         struct obd_device *obd;
437         ENTRY;
438         
439         obd = ldlm_pl2ns(pl)->ns_obd;
440         LASSERT(obd != NULL && obd != LP_POISON);
441         LASSERT(obd->obd_type != LP_POISON);
442         write_lock(&obd->obd_pool_lock);
443         obd->obd_pool_limit = limit;
444         write_unlock(&obd->obd_pool_lock);
445
446         ldlm_pool_set_limit(pl, limit);
447         RETURN(0);
448 }
449
450 /**
451  * Sets SLV and Limit from ldlm_pl2ns(pl)->ns_obd tp passed \a pl.
452  */
453 static void ldlm_cli_pool_pop_slv(struct ldlm_pool *pl)
454 {
455         struct obd_device *obd;
456
457         /* 
458          * Get new SLV and Limit from obd which is updated with comming 
459          * RPCs. 
460          */
461         obd = ldlm_pl2ns(pl)->ns_obd;
462         LASSERT(obd != NULL);
463         read_lock(&obd->obd_pool_lock);
464         pl->pl_server_lock_volume = obd->obd_pool_slv;
465         ldlm_pool_set_limit(pl, obd->obd_pool_limit);
466         read_unlock(&obd->obd_pool_lock);
467 }
468
469 /**
470  * Recalculates client sise pool \a pl according to current SLV and Limit.
471  */
472 static int ldlm_cli_pool_recalc(struct ldlm_pool *pl)
473 {
474         time_t recalc_interval_sec;
475         ENTRY;
476
477         spin_lock(&pl->pl_lock);
478         /*
479          * Check if we need to recalc lists now.
480          */
481         recalc_interval_sec = cfs_time_current_sec() - pl->pl_recalc_time;
482         if (recalc_interval_sec < pl->pl_recalc_period) {
483                 spin_unlock(&pl->pl_lock);
484                 RETURN(0);
485         }
486
487         /* 
488          * Make sure that pool knows last SLV and Limit from obd. 
489          */
490         ldlm_cli_pool_pop_slv(pl);
491
492         pl->pl_recalc_time = cfs_time_current_sec();
493         lprocfs_counter_add(pl->pl_stats, LDLM_POOL_TIMING_STAT, 
494                             recalc_interval_sec);
495         spin_unlock(&pl->pl_lock);
496
497         /* 
498          * Do not cancel locks in case lru resize is disabled for this ns. 
499          */
500         if (!ns_connect_lru_resize(ldlm_pl2ns(pl)))
501                 RETURN(0);
502
503         /* 
504          * In the time of canceling locks on client we do not need to maintain
505          * sharp timing, we only want to cancel locks asap according to new SLV.
506          * It may be called when SLV has changed much, this is why we do not
507          * take into account pl->pl_recalc_time here. 
508          */
509         RETURN(ldlm_cancel_lru(ldlm_pl2ns(pl), 0, LDLM_ASYNC, 
510                                LDLM_CANCEL_LRUR));
511 }
512
513 /**
514  * This function is main entry point for memory preasure handling on client side.
515  * Main goal of this function is to cancel some number of locks on passed \a pl
516  * according to \a nr and \a gfp_mask.
517  */
518 static int ldlm_cli_pool_shrink(struct ldlm_pool *pl,
519                                 int nr, unsigned int gfp_mask)
520 {
521         ENTRY;
522         
523         /* 
524          * Do not cancel locks in case lru resize is disabled for this ns. 
525          */
526         if (!ns_connect_lru_resize(ldlm_pl2ns(pl)))
527                 RETURN(0);
528
529         /* 
530          * Make sure that pool knows last SLV and Limit from obd. 
531          */
532         ldlm_cli_pool_pop_slv(pl);
533
534         /* 
535          * Find out how many locks may be released according to shrink 
536          * policy. 
537          */
538         if (nr == 0)
539                 RETURN(ldlm_cancel_lru_estimate(ldlm_pl2ns(pl), 0, 0, 
540                                                 LDLM_CANCEL_SHRINK));
541
542         /* 
543          * Cancel @nr locks accoding to shrink policy. 
544          */
545         RETURN(ldlm_cancel_lru(ldlm_pl2ns(pl), nr, LDLM_SYNC, 
546                                LDLM_CANCEL_SHRINK));
547 }
548
549 struct ldlm_pool_ops ldlm_srv_pool_ops = {
550         .po_recalc = ldlm_srv_pool_recalc,
551         .po_shrink = ldlm_srv_pool_shrink,
552         .po_setup  = ldlm_srv_pool_setup
553 };
554
555 struct ldlm_pool_ops ldlm_cli_pool_ops = {
556         .po_recalc = ldlm_cli_pool_recalc,
557         .po_shrink = ldlm_cli_pool_shrink
558 };
559
560 /**
561  * Pool recalc wrapper. Will call either client or server pool recalc callback
562  * depending what pool \a pl is used.
563  */
564 int ldlm_pool_recalc(struct ldlm_pool *pl)
565 {
566         time_t recalc_interval_sec;
567         int count;
568
569         spin_lock(&pl->pl_lock);
570         recalc_interval_sec = cfs_time_current_sec() - pl->pl_recalc_time;
571         if (recalc_interval_sec > 0) {
572                 /*
573                  * Update pool statistics every 1s.
574                  */
575                 ldlm_pool_recalc_stats(pl);
576
577                 /*
578                  * Zero out all rates and speed for the last period. 
579                  */
580                 atomic_set(&pl->pl_grant_rate, 0);
581                 atomic_set(&pl->pl_cancel_rate, 0);
582                 atomic_set(&pl->pl_grant_speed, 0);
583         }
584         spin_unlock(&pl->pl_lock);
585
586         if (pl->pl_ops->po_recalc != NULL) {
587                 count = pl->pl_ops->po_recalc(pl);
588                 lprocfs_counter_add(pl->pl_stats, LDLM_POOL_RECALC_STAT, 
589                                     count);
590                 return count;
591         }
592
593         return 0;
594 }
595 EXPORT_SYMBOL(ldlm_pool_recalc);
596
597 /**
598  * Pool shrink wrapper. Will call either client or server pool recalc callback
599  * depending what pool \a pl is used.
600  */
601 int ldlm_pool_shrink(struct ldlm_pool *pl, int nr,
602                      unsigned int gfp_mask)
603 {
604         int cancel = 0;
605         
606         if (pl->pl_ops->po_shrink != NULL) {
607                 cancel = pl->pl_ops->po_shrink(pl, nr, gfp_mask);
608                 if (nr > 0) {
609                         lprocfs_counter_add(pl->pl_stats, 
610                                             LDLM_POOL_SHRINK_REQTD_STAT,
611                                             nr);
612                         lprocfs_counter_add(pl->pl_stats, 
613                                             LDLM_POOL_SHRINK_FREED_STAT,
614                                             cancel);
615                         CDEBUG(D_DLMTRACE, "%s: request to shrink %d locks, "
616                                "shrunk %d\n", pl->pl_name, nr, cancel);
617                 }
618         }
619         return cancel;
620 }
621 EXPORT_SYMBOL(ldlm_pool_shrink);
622
623 /**
624  * Pool setup wrapper. Will call either client or server pool recalc callback
625  * depending what pool \a pl is used.
626  *
627  * Sets passed \a limit into pool \a pl.
628  */
629 int ldlm_pool_setup(struct ldlm_pool *pl, int limit)
630 {
631         ENTRY;
632         if (pl->pl_ops->po_setup != NULL)
633                 RETURN(pl->pl_ops->po_setup(pl, limit));
634         RETURN(0);
635 }
636 EXPORT_SYMBOL(ldlm_pool_setup);
637
638 #ifdef __KERNEL__
639 static int lprocfs_rd_pool_state(char *page, char **start, off_t off,
640                                  int count, int *eof, void *data)
641 {
642         int granted, grant_rate, cancel_rate, grant_step;
643         int nr = 0, grant_speed, grant_plan, lvf;
644         struct ldlm_pool *pl = data;
645         __u64 slv, clv;
646         __u32 limit;
647
648         spin_lock(&pl->pl_lock);
649         slv = pl->pl_server_lock_volume;
650         clv = pl->pl_client_lock_volume;
651         limit = ldlm_pool_get_limit(pl);
652         grant_plan = pl->pl_grant_plan;
653         granted = atomic_read(&pl->pl_granted);
654         grant_rate = atomic_read(&pl->pl_grant_rate);
655         lvf = atomic_read(&pl->pl_lock_volume_factor);
656         grant_speed = atomic_read(&pl->pl_grant_speed);
657         cancel_rate = atomic_read(&pl->pl_cancel_rate);
658         grant_step = ldlm_pool_t2gsp(pl->pl_recalc_period);
659         spin_unlock(&pl->pl_lock);
660
661         nr += snprintf(page + nr, count - nr, "LDLM pool state (%s):\n",
662                        pl->pl_name);
663         nr += snprintf(page + nr, count - nr, "  SLV: "LPU64"\n", slv);
664         nr += snprintf(page + nr, count - nr, "  CLV: "LPU64"\n", clv);
665         nr += snprintf(page + nr, count - nr, "  LVF: %d\n", lvf);
666
667         if (ns_is_server(ldlm_pl2ns(pl))) {
668                 nr += snprintf(page + nr, count - nr, "  GSP: %d%%\n",
669                                grant_step);
670                 nr += snprintf(page + nr, count - nr, "  GP:  %d\n",
671                                grant_plan);
672         }
673         nr += snprintf(page + nr, count - nr, "  GR:  %d\n",
674                        grant_rate);
675         nr += snprintf(page + nr, count - nr, "  CR:  %d\n",
676                        cancel_rate);
677         nr += snprintf(page + nr, count - nr, "  GS:  %d\n",
678                        grant_speed);
679         nr += snprintf(page + nr, count - nr, "  G:   %d\n",
680                        granted);
681         nr += snprintf(page + nr, count - nr, "  L:   %d\n",
682                        limit);
683         return nr;
684 }
685
686 LDLM_POOL_PROC_READER(grant_plan, int);
687 LDLM_POOL_PROC_READER(recalc_period, int);
688 LDLM_POOL_PROC_WRITER(recalc_period, int);
689
690 static int ldlm_pool_proc_init(struct ldlm_pool *pl)
691 {
692         struct ldlm_namespace *ns = ldlm_pl2ns(pl);
693         struct proc_dir_entry *parent_ns_proc;
694         struct lprocfs_vars pool_vars[2];
695         char *var_name = NULL;
696         int rc = 0;
697         ENTRY;
698
699         OBD_ALLOC(var_name, MAX_STRING_SIZE + 1);
700         if (!var_name)
701                 RETURN(-ENOMEM);
702
703         parent_ns_proc = lprocfs_srch(ldlm_ns_proc_dir, ns->ns_name);
704         if (parent_ns_proc == NULL) {
705                 CERROR("%s: proc entry is not initialized\n",
706                        ns->ns_name);
707                 GOTO(out_free_name, rc = -EINVAL);
708         }
709         pl->pl_proc_dir = lprocfs_register("pool", parent_ns_proc,
710                                            NULL, NULL);
711         if (IS_ERR(pl->pl_proc_dir)) {
712                 CERROR("LProcFS failed in ldlm-pool-init\n");
713                 rc = PTR_ERR(pl->pl_proc_dir);
714                 GOTO(out_free_name, rc);
715         }
716
717         var_name[MAX_STRING_SIZE] = '\0';
718         memset(pool_vars, 0, sizeof(pool_vars));
719         pool_vars[0].name = var_name;
720
721         snprintf(var_name, MAX_STRING_SIZE, "server_lock_volume");
722         pool_vars[0].data = &pl->pl_server_lock_volume;
723         pool_vars[0].read_fptr = lprocfs_rd_u64;
724         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
725
726         snprintf(var_name, MAX_STRING_SIZE, "limit");
727         pool_vars[0].data = &pl->pl_limit;
728         pool_vars[0].read_fptr = lprocfs_rd_atomic;
729         pool_vars[0].write_fptr = lprocfs_wr_atomic;
730         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
731
732         snprintf(var_name, MAX_STRING_SIZE, "granted");
733         pool_vars[0].data = &pl->pl_granted;
734         pool_vars[0].read_fptr = lprocfs_rd_atomic;
735         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
736
737         snprintf(var_name, MAX_STRING_SIZE, "grant_speed");
738         pool_vars[0].data = &pl->pl_grant_speed;
739         pool_vars[0].read_fptr = lprocfs_rd_atomic;
740         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
741
742         snprintf(var_name, MAX_STRING_SIZE, "cancel_rate");
743         pool_vars[0].data = &pl->pl_cancel_rate;
744         pool_vars[0].read_fptr = lprocfs_rd_atomic;
745         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
746
747         snprintf(var_name, MAX_STRING_SIZE, "grant_rate");
748         pool_vars[0].data = &pl->pl_grant_rate;
749         pool_vars[0].read_fptr = lprocfs_rd_atomic;
750         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
751
752         snprintf(var_name, MAX_STRING_SIZE, "grant_plan");
753         pool_vars[0].data = pl;
754         pool_vars[0].read_fptr = lprocfs_rd_grant_plan;
755         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
756
757         snprintf(var_name, MAX_STRING_SIZE, "recalc_period");
758         pool_vars[0].data = pl;
759         pool_vars[0].read_fptr = lprocfs_rd_recalc_period;
760         pool_vars[0].write_fptr = lprocfs_wr_recalc_period;
761         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
762
763         snprintf(var_name, MAX_STRING_SIZE, "lock_volume_factor");
764         pool_vars[0].data = &pl->pl_lock_volume_factor;
765         pool_vars[0].read_fptr = lprocfs_rd_atomic;
766         pool_vars[0].write_fptr = lprocfs_wr_atomic;
767         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
768
769         snprintf(var_name, MAX_STRING_SIZE, "state");
770         pool_vars[0].data = pl;
771         pool_vars[0].read_fptr = lprocfs_rd_pool_state;
772         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
773
774         pl->pl_stats = lprocfs_alloc_stats(LDLM_POOL_LAST_STAT -
775                                            LDLM_POOL_FIRST_STAT, 0);
776         if (!pl->pl_stats)
777                 GOTO(out_free_name, rc = -ENOMEM);
778
779         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_GRANTED_STAT,
780                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
781                              "granted", "locks");
782         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_GRANT_STAT, 
783                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
784                              "grant", "locks");
785         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_CANCEL_STAT, 
786                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
787                              "cancel", "locks");
788         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_GRANT_RATE_STAT,
789                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
790                              "grant_rate", "locks/s");
791         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_CANCEL_RATE_STAT,
792                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
793                              "cancel_rate", "locks/s");
794         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_GRANT_PLAN_STAT,
795                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
796                              "grant_plan", "locks/s");
797         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_SLV_STAT,
798                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
799                              "slv", "slv");
800         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_SHRINK_REQTD_STAT,
801                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
802                              "shrink_request", "locks");
803         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_SHRINK_FREED_STAT,
804                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
805                              "shrink_freed", "locks");
806         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_RECALC_STAT,
807                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
808                              "recalc_freed", "locks");
809         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_TIMING_STAT,
810                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
811                              "recalc_timing", "sec");
812         lprocfs_register_stats(pl->pl_proc_dir, "stats", pl->pl_stats);
813
814         EXIT;
815 out_free_name:
816         OBD_FREE(var_name, MAX_STRING_SIZE + 1);
817         return rc;
818 }
819
820 static void ldlm_pool_proc_fini(struct ldlm_pool *pl)
821 {
822         if (pl->pl_stats != NULL) {
823                 lprocfs_free_stats(&pl->pl_stats);
824                 pl->pl_stats = NULL;
825         }
826         if (pl->pl_proc_dir != NULL) {
827                 lprocfs_remove(&pl->pl_proc_dir);
828                 pl->pl_proc_dir = NULL;
829         }
830 }
831 #else /* !__KERNEL__*/
832 #define ldlm_pool_proc_init(pl) (0)
833 #define ldlm_pool_proc_fini(pl) while (0) {}
834 #endif
835
836 int ldlm_pool_init(struct ldlm_pool *pl, struct ldlm_namespace *ns,
837                    int idx, ldlm_side_t client)
838 {
839         int rc;
840         ENTRY;
841
842         spin_lock_init(&pl->pl_lock);
843         atomic_set(&pl->pl_granted, 0);
844         pl->pl_recalc_time = cfs_time_current_sec();
845         atomic_set(&pl->pl_lock_volume_factor, 1);
846
847         atomic_set(&pl->pl_grant_rate, 0);
848         atomic_set(&pl->pl_cancel_rate, 0);
849         atomic_set(&pl->pl_grant_speed, 0);
850         pl->pl_grant_plan = LDLM_POOL_GP(LDLM_POOL_HOST_L);
851
852         snprintf(pl->pl_name, sizeof(pl->pl_name), "ldlm-pool-%s-%d",
853                  ns->ns_name, idx);
854
855         if (client == LDLM_NAMESPACE_SERVER) {
856                 pl->pl_ops = &ldlm_srv_pool_ops;
857                 ldlm_pool_set_limit(pl, LDLM_POOL_HOST_L);
858                 pl->pl_recalc_period = LDLM_POOL_SRV_DEF_RECALC_PERIOD;
859                 pl->pl_server_lock_volume = ldlm_pool_slv_max(LDLM_POOL_HOST_L);
860         } else {
861                 ldlm_pool_set_limit(pl, 1);
862                 pl->pl_server_lock_volume = 1;
863                 pl->pl_ops = &ldlm_cli_pool_ops;
864                 pl->pl_recalc_period = LDLM_POOL_CLI_DEF_RECALC_PERIOD;
865         }
866         pl->pl_client_lock_volume = 0;
867         rc = ldlm_pool_proc_init(pl);
868         if (rc)
869                 RETURN(rc);
870
871         CDEBUG(D_DLMTRACE, "Lock pool %s is initialized\n", pl->pl_name);
872
873         RETURN(rc);
874 }
875 EXPORT_SYMBOL(ldlm_pool_init);
876
877 void ldlm_pool_fini(struct ldlm_pool *pl)
878 {
879         ENTRY;
880         ldlm_pool_proc_fini(pl);
881         
882         /* 
883          * Pool should not be used after this point. We can't free it here as
884          * it lives in struct ldlm_namespace, but still interested in catching
885          * any abnormal using cases.
886          */
887         POISON(pl, 0x5a, sizeof(*pl));
888         EXIT;
889 }
890 EXPORT_SYMBOL(ldlm_pool_fini);
891
892 /**
893  * Add new taken ldlm lock \a lock into pool \a pl accounting.
894  */
895 void ldlm_pool_add(struct ldlm_pool *pl, struct ldlm_lock *lock)
896 {
897         /* 
898          * FLOCK locks are special in a sense that they are almost never
899          * cancelled, instead special kind of lock is used to drop them.
900          * also there is no LRU for flock locks, so no point in tracking
901          * them anyway. 
902          */
903         if (lock->l_resource->lr_type == LDLM_FLOCK)
904                 return;
905         ENTRY;
906                 
907         atomic_inc(&pl->pl_granted);
908         atomic_inc(&pl->pl_grant_rate);
909         atomic_inc(&pl->pl_grant_speed);
910
911         lprocfs_counter_incr(pl->pl_stats, LDLM_POOL_GRANT_STAT);
912  
913         /* 
914          * Do not do pool recalc for client side as all locks which
915          * potentially may be canceled has already been packed into 
916          * enqueue/cancel rpc. Also we do not want to run out of stack
917          * with too long call paths. 
918          */
919         if (ns_is_server(ldlm_pl2ns(pl)))
920                 ldlm_pool_recalc(pl);
921         EXIT;
922 }
923 EXPORT_SYMBOL(ldlm_pool_add);
924
925 /**
926  * Remove ldlm lock \a lock from pool \a pl accounting.
927  */
928 void ldlm_pool_del(struct ldlm_pool *pl, struct ldlm_lock *lock)
929 {
930         /*
931          * Filter out FLOCK locks. Read above comment in ldlm_pool_add().
932          */
933         if (lock->l_resource->lr_type == LDLM_FLOCK)
934                 return;
935         ENTRY;
936
937         LASSERT(atomic_read(&pl->pl_granted) > 0);
938         atomic_dec(&pl->pl_granted);
939         atomic_inc(&pl->pl_cancel_rate);
940         atomic_dec(&pl->pl_grant_speed);
941         
942         lprocfs_counter_incr(pl->pl_stats, LDLM_POOL_CANCEL_STAT);
943
944         if (ns_is_server(ldlm_pl2ns(pl)))
945                 ldlm_pool_recalc(pl);
946         EXIT;
947 }
948 EXPORT_SYMBOL(ldlm_pool_del);
949
950 /**
951  * Returns current \a pl SLV.
952  *
953  * \pre ->pl_lock is not locked. 
954  */
955 __u64 ldlm_pool_get_slv(struct ldlm_pool *pl)
956 {
957         __u64 slv;
958         spin_lock(&pl->pl_lock);
959         slv = pl->pl_server_lock_volume;
960         spin_unlock(&pl->pl_lock);
961         return slv;
962 }
963 EXPORT_SYMBOL(ldlm_pool_get_slv);
964
965 /**
966  * Sets passed \a slv to \a pl.
967  *
968  * \pre ->pl_lock is not locked. 
969  */
970 void ldlm_pool_set_slv(struct ldlm_pool *pl, __u64 slv)
971 {
972         spin_lock(&pl->pl_lock);
973         pl->pl_server_lock_volume = slv;
974         spin_unlock(&pl->pl_lock);
975 }
976 EXPORT_SYMBOL(ldlm_pool_set_slv);
977
978 /**
979  * Returns current \a pl CLV.
980  *
981  * \pre ->pl_lock is not locked. 
982  */
983 __u64 ldlm_pool_get_clv(struct ldlm_pool *pl)
984 {
985         __u64 slv;
986         spin_lock(&pl->pl_lock);
987         slv = pl->pl_client_lock_volume;
988         spin_unlock(&pl->pl_lock);
989         return slv;
990 }
991 EXPORT_SYMBOL(ldlm_pool_get_clv);
992
993 /**
994  * Sets passed \a clv to \a pl.
995  *
996  * \pre ->pl_lock is not locked. 
997  */
998 void ldlm_pool_set_clv(struct ldlm_pool *pl, __u64 clv)
999 {
1000         spin_lock(&pl->pl_lock);
1001         pl->pl_client_lock_volume = clv;
1002         spin_unlock(&pl->pl_lock);
1003 }
1004 EXPORT_SYMBOL(ldlm_pool_set_clv);
1005
1006 /**
1007  * Returns current \a pl limit.
1008  */
1009 __u32 ldlm_pool_get_limit(struct ldlm_pool *pl)
1010 {
1011         return atomic_read(&pl->pl_limit);
1012 }
1013 EXPORT_SYMBOL(ldlm_pool_get_limit);
1014
1015 /**
1016  * Sets passed \a limit to \a pl.
1017  */
1018 void ldlm_pool_set_limit(struct ldlm_pool *pl, __u32 limit)
1019 {
1020         atomic_set(&pl->pl_limit, limit);
1021 }
1022 EXPORT_SYMBOL(ldlm_pool_set_limit);
1023
1024 /**
1025  * Returns current LVF from \a pl.
1026  */
1027 __u32 ldlm_pool_get_lvf(struct ldlm_pool *pl)
1028 {
1029         return atomic_read(&pl->pl_lock_volume_factor);
1030 }
1031 EXPORT_SYMBOL(ldlm_pool_get_lvf);
1032
1033 #ifdef __KERNEL__
1034 static int ldlm_pool_granted(struct ldlm_pool *pl)
1035 {
1036         return atomic_read(&pl->pl_granted);
1037 }
1038
1039 static struct ptlrpc_thread *ldlm_pools_thread;
1040 static struct shrinker *ldlm_pools_srv_shrinker;
1041 static struct shrinker *ldlm_pools_cli_shrinker;
1042 static struct completion ldlm_pools_comp;
1043
1044 /* 
1045  * Cancel \a nr locks from all namespaces (if possible). Returns number of
1046  * cached locks after shrink is finished. All namespaces are asked to
1047  * cancel approximately equal amount of locks to keep balancing.
1048  */
1049 static int ldlm_pools_shrink(ldlm_side_t client, int nr, 
1050                              unsigned int gfp_mask)
1051 {
1052         int total = 0, cached = 0, nr_ns;
1053         struct ldlm_namespace *ns;
1054
1055         if (nr != 0 && !(gfp_mask & __GFP_FS))
1056                 return -1;
1057
1058         CDEBUG(D_DLMTRACE, "Request to shrink %d %s locks from all pools\n",
1059                nr, client == LDLM_NAMESPACE_CLIENT ? "client" : "server");
1060
1061         /* 
1062          * Find out how many resources we may release. 
1063          */
1064         for (nr_ns = atomic_read(ldlm_namespace_nr(client)); 
1065              nr_ns > 0; nr_ns--) 
1066         {
1067                 mutex_down(ldlm_namespace_lock(client));
1068                 if (list_empty(ldlm_namespace_list(client))) {
1069                         mutex_up(ldlm_namespace_lock(client));
1070                         return 0;
1071                 }
1072                 ns = ldlm_namespace_first_locked(client);
1073                 ldlm_namespace_get(ns);
1074                 ldlm_namespace_move_locked(ns, client);
1075                 mutex_up(ldlm_namespace_lock(client));
1076                 total += ldlm_pool_shrink(&ns->ns_pool, 0, gfp_mask);
1077                 ldlm_namespace_put(ns, 1);
1078         }
1079  
1080         if (nr == 0 || total == 0)
1081                 return total;
1082
1083         /* 
1084          * Shrink at least ldlm_namespace_nr(client) namespaces. 
1085          */
1086         for (nr_ns = atomic_read(ldlm_namespace_nr(client)); 
1087              nr_ns > 0; nr_ns--) 
1088         {
1089                 int cancel, nr_locks;
1090
1091                 /* 
1092                  * Do not call shrink under ldlm_namespace_lock(client) 
1093                  */
1094                 mutex_down(ldlm_namespace_lock(client));
1095                 if (list_empty(ldlm_namespace_list(client))) {
1096                         mutex_up(ldlm_namespace_lock(client));
1097                         /* 
1098                          * If list is empty, we can't return any @cached > 0,
1099                          * that probably would cause needless shrinker
1100                          * call. 
1101                          */
1102                         cached = 0;
1103                         break;
1104                 }
1105                 ns = ldlm_namespace_first_locked(client);
1106                 ldlm_namespace_get(ns);
1107                 ldlm_namespace_move_locked(ns, client);
1108                 mutex_up(ldlm_namespace_lock(client));
1109                 
1110                 nr_locks = ldlm_pool_granted(&ns->ns_pool);
1111                 cancel = 1 + nr_locks * nr / total;
1112                 ldlm_pool_shrink(&ns->ns_pool, cancel, gfp_mask);
1113                 cached += ldlm_pool_granted(&ns->ns_pool);
1114                 ldlm_namespace_put(ns, 1);
1115         }
1116         return cached;
1117 }
1118
1119 static int ldlm_pools_srv_shrink(int nr, unsigned int gfp_mask)
1120 {
1121         return ldlm_pools_shrink(LDLM_NAMESPACE_SERVER, nr, gfp_mask);
1122 }
1123
1124 static int ldlm_pools_cli_shrink(int nr, unsigned int gfp_mask)
1125 {
1126         return ldlm_pools_shrink(LDLM_NAMESPACE_CLIENT, nr, gfp_mask);
1127 }
1128
1129 void ldlm_pools_recalc(ldlm_side_t client)
1130 {
1131         __u32 nr_l = 0, nr_p = 0, l;
1132         struct ldlm_namespace *ns;
1133         int nr, equal = 0;
1134
1135         /* 
1136          * No need to setup pool limit for client pools.
1137          */
1138         if (client == LDLM_NAMESPACE_SERVER) {
1139                 /* 
1140                  * Check all modest namespaces first. 
1141                  */
1142                 mutex_down(ldlm_namespace_lock(client));
1143                 list_for_each_entry(ns, ldlm_namespace_list(client), 
1144                                     ns_list_chain) 
1145                 {
1146                         if (ns->ns_appetite != LDLM_NAMESPACE_MODEST)
1147                                 continue;
1148
1149                         l = ldlm_pool_granted(&ns->ns_pool);
1150                         if (l == 0)
1151                                 l = 1;
1152
1153                         /* 
1154                          * Set the modest pools limit equal to their avg granted
1155                          * locks + 5%. 
1156                          */
1157                         l += dru(l * LDLM_POOLS_MODEST_MARGIN, 100);
1158                         ldlm_pool_setup(&ns->ns_pool, l);
1159                         nr_l += l;
1160                         nr_p++;
1161                 }
1162
1163                 /* 
1164                  * Make sure that modest namespaces did not eat more that 2/3 
1165                  * of limit. 
1166                  */
1167                 if (nr_l >= 2 * (LDLM_POOL_HOST_L / 3)) {
1168                         CWARN("\"Modest\" pools eat out 2/3 of server locks "
1169                               "limit (%d of %lu). This means that you have too "
1170                               "many clients for this amount of server RAM. "
1171                               "Upgrade server!\n", nr_l, LDLM_POOL_HOST_L);
1172                         equal = 1;
1173                 }
1174
1175                 /* 
1176                  * The rest is given to greedy namespaces. 
1177                  */
1178                 list_for_each_entry(ns, ldlm_namespace_list(client), 
1179                                     ns_list_chain) 
1180                 {
1181                         if (!equal && ns->ns_appetite != LDLM_NAMESPACE_GREEDY)
1182                                 continue;
1183
1184                         if (equal) {
1185                                 /* 
1186                                  * In the case 2/3 locks are eaten out by
1187                                  * modest pools, we re-setup equal limit
1188                                  * for _all_ pools. 
1189                                  */
1190                                 l = LDLM_POOL_HOST_L /
1191                                         atomic_read(ldlm_namespace_nr(client));
1192                         } else {
1193                                 /* 
1194                                  * All the rest of greedy pools will have
1195                                  * all locks in equal parts.
1196                                  */
1197                                 l = (LDLM_POOL_HOST_L - nr_l) /
1198                                         (atomic_read(ldlm_namespace_nr(client)) -
1199                                          nr_p);
1200                         }
1201                         ldlm_pool_setup(&ns->ns_pool, l);
1202                 }
1203                 mutex_up(ldlm_namespace_lock(client));
1204         }
1205
1206         /* 
1207          * Recalc at least ldlm_namespace_nr(client) namespaces. 
1208          */
1209         for (nr = atomic_read(ldlm_namespace_nr(client)); nr > 0; nr--) {
1210                 /* 
1211                  * Lock the list, get first @ns in the list, getref, move it
1212                  * to the tail, unlock and call pool recalc. This way we avoid
1213                  * calling recalc under @ns lock what is really good as we get
1214                  * rid of potential deadlock on client nodes when canceling
1215                  * locks synchronously. 
1216                  */
1217                 mutex_down(ldlm_namespace_lock(client));
1218                 if (list_empty(ldlm_namespace_list(client))) {
1219                         mutex_up(ldlm_namespace_lock(client));
1220                         break;
1221                 }
1222                 ns = ldlm_namespace_first_locked(client);
1223                 ldlm_namespace_get(ns);
1224                 ldlm_namespace_move_locked(ns, client);
1225                 mutex_up(ldlm_namespace_lock(client));
1226
1227                 /* 
1228                  * After setup is done - recalc the pool. 
1229                  */
1230                 ldlm_pool_recalc(&ns->ns_pool);
1231                 ldlm_namespace_put(ns, 1);
1232         }
1233 }
1234 EXPORT_SYMBOL(ldlm_pools_recalc);
1235
1236 static int ldlm_pools_thread_main(void *arg)
1237 {
1238         struct ptlrpc_thread *thread = (struct ptlrpc_thread *)arg;
1239         char *t_name = "ldlm_poold";
1240         ENTRY;
1241
1242         cfs_daemonize(t_name);
1243         thread->t_flags = SVC_RUNNING;
1244         cfs_waitq_signal(&thread->t_ctl_waitq);
1245
1246         CDEBUG(D_DLMTRACE, "%s: pool thread starting, process %d\n",
1247                t_name, cfs_curproc_pid());
1248
1249         while (1) {
1250                 struct l_wait_info lwi;
1251
1252                 /*
1253                  * Recal all pools on this tick. 
1254                  */
1255                 ldlm_pools_recalc(LDLM_NAMESPACE_SERVER);
1256                 ldlm_pools_recalc(LDLM_NAMESPACE_CLIENT);
1257                 
1258                 /*
1259                  * Wait until the next check time, or until we're
1260                  * stopped. 
1261                  */
1262                 lwi = LWI_TIMEOUT(cfs_time_seconds(LDLM_POOLS_THREAD_PERIOD),
1263                                   NULL, NULL);
1264                 l_wait_event(thread->t_ctl_waitq, (thread->t_flags &
1265                                                    (SVC_STOPPING|SVC_EVENT)),
1266                              &lwi);
1267
1268                 if (thread->t_flags & SVC_STOPPING) {
1269                         thread->t_flags &= ~SVC_STOPPING;
1270                         break;
1271                 } else if (thread->t_flags & SVC_EVENT) {
1272                         thread->t_flags &= ~SVC_EVENT;
1273                 }
1274         }
1275
1276         thread->t_flags = SVC_STOPPED;
1277         cfs_waitq_signal(&thread->t_ctl_waitq);
1278
1279         CDEBUG(D_DLMTRACE, "%s: pool thread exiting, process %d\n",
1280                t_name, cfs_curproc_pid());
1281
1282         complete_and_exit(&ldlm_pools_comp, 0);
1283 }
1284
1285 static int ldlm_pools_thread_start(void)
1286 {
1287         struct l_wait_info lwi = { 0 };
1288         int rc;
1289         ENTRY;
1290
1291         if (ldlm_pools_thread != NULL)
1292                 RETURN(-EALREADY);
1293
1294         OBD_ALLOC_PTR(ldlm_pools_thread);
1295         if (ldlm_pools_thread == NULL)
1296                 RETURN(-ENOMEM);
1297
1298         init_completion(&ldlm_pools_comp);
1299         cfs_waitq_init(&ldlm_pools_thread->t_ctl_waitq);
1300
1301         /* 
1302          * CLONE_VM and CLONE_FILES just avoid a needless copy, because we
1303          * just drop the VM and FILES in ptlrpc_daemonize() right away. 
1304          */
1305         rc = cfs_kernel_thread(ldlm_pools_thread_main, ldlm_pools_thread,
1306                                CLONE_VM | CLONE_FILES);
1307         if (rc < 0) {
1308                 CERROR("Can't start pool thread, error %d\n",
1309                        rc);
1310                 OBD_FREE(ldlm_pools_thread, sizeof(*ldlm_pools_thread));
1311                 ldlm_pools_thread = NULL;
1312                 RETURN(rc);
1313         }
1314         l_wait_event(ldlm_pools_thread->t_ctl_waitq,
1315                      (ldlm_pools_thread->t_flags & SVC_RUNNING), &lwi);
1316         RETURN(0);
1317 }
1318
1319 static void ldlm_pools_thread_stop(void)
1320 {
1321         ENTRY;
1322
1323         if (ldlm_pools_thread == NULL) {
1324                 EXIT;
1325                 return;
1326         }
1327
1328         ldlm_pools_thread->t_flags = SVC_STOPPING;
1329         cfs_waitq_signal(&ldlm_pools_thread->t_ctl_waitq);
1330
1331         /* 
1332          * Make sure that pools thread is finished before freeing @thread.
1333          * This fixes possible race and oops due to accessing freed memory
1334          * in pools thread. 
1335          */
1336         wait_for_completion(&ldlm_pools_comp);
1337         OBD_FREE_PTR(ldlm_pools_thread);
1338         ldlm_pools_thread = NULL;
1339         EXIT;
1340 }
1341
1342 int ldlm_pools_init(void)
1343 {
1344         int rc;
1345         ENTRY;
1346
1347         rc = ldlm_pools_thread_start();
1348         if (rc == 0) {
1349                 ldlm_pools_srv_shrinker = set_shrinker(DEFAULT_SEEKS,
1350                                                        ldlm_pools_srv_shrink);
1351                 ldlm_pools_cli_shrinker = set_shrinker(DEFAULT_SEEKS,
1352                                                        ldlm_pools_cli_shrink);
1353         }
1354         RETURN(rc);
1355 }
1356 EXPORT_SYMBOL(ldlm_pools_init);
1357
1358 void ldlm_pools_fini(void)
1359 {
1360         if (ldlm_pools_srv_shrinker != NULL) {
1361                 remove_shrinker(ldlm_pools_srv_shrinker);
1362                 ldlm_pools_srv_shrinker = NULL;
1363         }
1364         if (ldlm_pools_cli_shrinker != NULL) {
1365                 remove_shrinker(ldlm_pools_cli_shrinker);
1366                 ldlm_pools_cli_shrinker = NULL;
1367         }
1368         ldlm_pools_thread_stop();
1369 }
1370 EXPORT_SYMBOL(ldlm_pools_fini);
1371 #endif /* __KERNEL__ */
1372
1373 #else /* !HAVE_LRU_RESIZE_SUPPORT */
1374 int ldlm_pool_setup(struct ldlm_pool *pl, int limit)
1375 {
1376         return 0;
1377 }
1378 EXPORT_SYMBOL(ldlm_pool_setup);
1379
1380 int ldlm_pool_recalc(struct ldlm_pool *pl)
1381 {
1382         return 0;
1383 }
1384 EXPORT_SYMBOL(ldlm_pool_recalc);
1385
1386 int ldlm_pool_shrink(struct ldlm_pool *pl,
1387                      int nr, unsigned int gfp_mask)
1388 {
1389         return 0;
1390 }
1391 EXPORT_SYMBOL(ldlm_pool_shrink);
1392
1393 int ldlm_pool_init(struct ldlm_pool *pl, struct ldlm_namespace *ns,
1394                    int idx, ldlm_side_t client)
1395 {
1396         return 0;
1397 }
1398 EXPORT_SYMBOL(ldlm_pool_init);
1399
1400 void ldlm_pool_fini(struct ldlm_pool *pl)
1401 {
1402         return;
1403 }
1404 EXPORT_SYMBOL(ldlm_pool_fini);
1405
1406 void ldlm_pool_add(struct ldlm_pool *pl, struct ldlm_lock *lock)
1407 {
1408         return;
1409 }
1410 EXPORT_SYMBOL(ldlm_pool_add);
1411
1412 void ldlm_pool_del(struct ldlm_pool *pl, struct ldlm_lock *lock)
1413 {
1414         return;
1415 }
1416 EXPORT_SYMBOL(ldlm_pool_del);
1417
1418 __u64 ldlm_pool_get_slv(struct ldlm_pool *pl)
1419 {
1420         return 1;
1421 }
1422 EXPORT_SYMBOL(ldlm_pool_get_slv);
1423
1424 void ldlm_pool_set_slv(struct ldlm_pool *pl, __u64 slv)
1425 {
1426         return;
1427 }
1428 EXPORT_SYMBOL(ldlm_pool_set_slv);
1429
1430 __u64 ldlm_pool_get_clv(struct ldlm_pool *pl)
1431 {
1432         return 1;
1433 }
1434 EXPORT_SYMBOL(ldlm_pool_get_clv);
1435
1436 void ldlm_pool_set_clv(struct ldlm_pool *pl, __u64 clv)
1437 {
1438         return;
1439 }
1440 EXPORT_SYMBOL(ldlm_pool_set_clv);
1441
1442 __u32 ldlm_pool_get_limit(struct ldlm_pool *pl)
1443 {
1444         return 0;
1445 }
1446 EXPORT_SYMBOL(ldlm_pool_get_limit);
1447
1448 void ldlm_pool_set_limit(struct ldlm_pool *pl, __u32 limit)
1449 {
1450         return;
1451 }
1452 EXPORT_SYMBOL(ldlm_pool_set_limit);
1453
1454 __u32 ldlm_pool_get_lvf(struct ldlm_pool *pl)
1455 {
1456         return 0;
1457 }
1458 EXPORT_SYMBOL(ldlm_pool_get_lvf);
1459
1460 int ldlm_pools_init(void)
1461 {
1462         return 0;
1463 }
1464 EXPORT_SYMBOL(ldlm_pools_init);
1465
1466 void ldlm_pools_fini(void)
1467 {
1468         return;
1469 }
1470 EXPORT_SYMBOL(ldlm_pools_fini);
1471
1472 void ldlm_pools_recalc(ldlm_side_t client)
1473 {
1474         return;
1475 }
1476 EXPORT_SYMBOL(ldlm_pools_recalc);
1477 #endif /* HAVE_LRU_RESIZE_SUPPORT */