Whamcloud - gitweb
bd89cfaa3e6d966515be1e18f9b52682c827420e
[fs/lustre-release.git] / lustre / ldlm / ldlm_pool.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2007 Cluster File Systems, Inc.
5  *   Author: Yury Umanets <umka@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  */
25
26 /* 
27  * Idea of this code is rather simple. Each second, for each server namespace
28  * we have SLV - server lock volume which is calculated on current number of
29  * granted locks, grant speed for past period, etc - that is, locking load.
30  * This SLV number may be thought as a flow definition for simplicity. It is
31  * sent to clients with each occasion to let them know what is current load
32  * situation on the server. By default, at the beginning, SLV on server is
33  * set max value which is calculated as the following: allow to one client
34  * have all locks of limit ->pl_limit for 10h.
35  *
36  * Next, on clients, number of cached locks is not limited artificially in any
37  * way as it was before. Instead, client calculates CLV, that is, client lock
38  * volume for each lock and compares it with last SLV from the server. CLV is
39  * calculated as the number of locks in LRU * lock live time in seconds. If
40  * CLV > SLV - lock is canceled.
41  *
42  * Client has LVF, that is, lock volume factor which regulates how much sensitive
43  * client should be about last SLV from server. The higher LVF is the more locks
44  * will be canceled on client. Default value for it is 1. Setting LVF to 2 means
45  * that client will cancel locks 2 times faster.
46  *
47  * Locks on a client will be canceled more intensively in these cases:
48  * (1) if SLV is smaller, that is, load is higher on the server;
49  * (2) client has a lot of locks (the more locks are held by client, the bigger
50  *     chances that some of them should be canceled);
51  * (3) client has old locks (taken some time ago);
52  *
53  * Thus, according to flow paradigm that we use for better understanding SLV,
54  * CLV is the volume of particle in flow described by SLV. According to this,
55  * if flow is getting thinner, more and more particles become outside of it and
56  * as particles are locks, they should be canceled.
57  *
58  * General idea of this belongs to Vitaly Fertman (vitaly@clusterfs.com). Andreas
59  * Dilger (adilger@clusterfs.com) proposed few nice ideas like using LVF and many
60  * cleanups. Flow definition to allow more easy understanding of the logic belongs
61  * to Nikita Danilov (nikita@clusterfs.com) as well as many cleanups and fixes.
62  * And design and implementation are done by Yury Umanets (umka@clusterfs.com).
63  *
64  * Glossary for terms used:
65  *
66  * pl_limit - Number of allowed locks in pool. Applies to server and client
67  * side (tunable);
68  *
69  * pl_granted - Number of granted locks (calculated);
70  * pl_grant_rate - Number of granted locks for last T (calculated);
71  * pl_cancel_rate - Number of canceled locks for last T (calculated);
72  * pl_grant_speed - Grant speed (GR - CR) for last T (calculated);
73  * pl_grant_plan - Planned number of granted locks for next T (calculated);
74  *
75  * pl_grant_step - Grant plan step, that is how ->pl_grant_plan
76  * will change in next T (tunable);
77  *
78  * pl_server_lock_volume - Current server lock volume (calculated);
79  *
80  * As it may be seen from list above, we have few possible tunables which may
81  * affect behavior much. They all may be modified via proc. However, they also
82  * give a possibility for constructing few pre-defined behavior policies. If
83  * none of predefines is suitable for a working pattern being used, new one may
84  * be "constructed" via proc tunables.
85  */
86
87 #define DEBUG_SUBSYSTEM S_LDLM
88
89 #ifdef __KERNEL__
90 # include <lustre_dlm.h>
91 #else
92 # include <liblustre.h>
93 # include <libcfs/kp30.h>
94 #endif
95
96 #include <obd_class.h>
97 #include <obd_support.h>
98 #include "ldlm_internal.h"
99
100 #ifdef HAVE_LRU_RESIZE_SUPPORT
101
102 /*
103  * 50 ldlm locks for 1MB of RAM. 
104  */
105 #define LDLM_POOL_HOST_L ((num_physpages >> (20 - CFS_PAGE_SHIFT)) * 50)
106
107 /*
108  * Default step in % for grant plan. 
109  */
110 #define LDLM_POOL_GSP (10)
111
112 /* 
113  * LDLM_POOL_GSP% of all locks is default GP. 
114  */
115 #define LDLM_POOL_GP(L)   (((L) * LDLM_POOL_GSP) / 100)
116
117 /* 
118  * Max age for locks on clients. 
119  */
120 #define LDLM_POOL_MAX_AGE (36000)
121
122 #ifdef __KERNEL__
123 extern cfs_proc_dir_entry_t *ldlm_ns_proc_dir;
124 #endif
125
126 #define avg(src, add) \
127         ((src) = ((src) + (add)) / 2)
128
129 static inline __u64 dru(__u64 val, __u32 div)
130 {
131         __u64 ret = val + (div - 1);
132         do_div(ret, div);
133         return ret;
134 }
135
136 static inline __u64 ldlm_pool_slv_max(__u32 L)
137 {
138         /*
139          * Allow to have all locks for 1 client for 10 hrs.
140          * Formula is the following: limit * 10h / 1 client. 
141          */
142         __u64 lim = L *  LDLM_POOL_MAX_AGE / 1;
143         return lim;
144 }
145
146 static inline __u64 ldlm_pool_slv_min(__u32 L)
147 {
148         return 1;
149 }
150
151 enum {
152         LDLM_POOL_FIRST_STAT = 0,
153         LDLM_POOL_GRANTED_STAT = LDLM_POOL_FIRST_STAT,
154         LDLM_POOL_GRANT_STAT,
155         LDLM_POOL_CANCEL_STAT,
156         LDLM_POOL_GRANT_RATE_STAT,
157         LDLM_POOL_CANCEL_RATE_STAT,
158         LDLM_POOL_GRANT_PLAN_STAT,
159         LDLM_POOL_SLV_STAT,
160         LDLM_POOL_SHRINK_REQTD_STAT,
161         LDLM_POOL_SHRINK_FREED_STAT,
162         LDLM_POOL_RECALC_STAT,
163         LDLM_POOL_TIMING_STAT,
164         LDLM_POOL_LAST_STAT
165 };
166
167 static inline struct ldlm_namespace *ldlm_pl2ns(struct ldlm_pool *pl)
168 {
169         return container_of(pl, struct ldlm_namespace, ns_pool);
170 }
171
172 /**
173  * Recalculates next grant limit on passed \a pl.
174  *
175  * \pre ->pl_lock is locked. 
176  */
177 static inline void ldlm_pool_recalc_grant_plan(struct ldlm_pool *pl)
178 {
179         int granted, grant_step, limit;
180         
181         limit = ldlm_pool_get_limit(pl);
182         granted = atomic_read(&pl->pl_granted);
183
184         grant_step = ((limit - granted) * pl->pl_grant_step) / 100;
185         pl->pl_grant_plan = granted + grant_step;
186 }
187
188 /**
189  * Recalculates next SLV on passed \a pl.
190  *
191  * \pre ->pl_lock is locked. 
192  */
193 static inline void ldlm_pool_recalc_slv(struct ldlm_pool *pl)
194 {
195         int grant_usage, granted, grant_plan;
196         __u64 slv, slv_factor;
197         __u32 limit;
198
199         slv = pl->pl_server_lock_volume;
200         grant_plan = pl->pl_grant_plan;
201         limit = ldlm_pool_get_limit(pl);
202         granted = atomic_read(&pl->pl_granted);
203
204         grant_usage = limit - (granted - grant_plan);
205         if (grant_usage <= 0)
206                 grant_usage = 1;
207
208         /* 
209          * Find out SLV change factor which is the ratio of grant usage 
210          * from limit. SLV changes as fast as the ratio of grant plan 
211          * consumtion. The more locks from grant plan are not consumed 
212          * by clients in last interval (idle time), the faster grows 
213          * SLV. And the opposite, the more grant plan is over-consumed
214          * (load time) the faster drops SLV. 
215          */
216         slv_factor = (grant_usage * 100) / limit;
217         if (2 * abs(granted - limit) > limit) {
218                 slv_factor *= slv_factor;
219                 slv_factor = dru(slv_factor, 100);
220         }
221         slv = slv * slv_factor;
222         slv = dru(slv, 100);
223
224         if (slv > ldlm_pool_slv_max(limit)) {
225                 slv = ldlm_pool_slv_max(limit);
226         } else if (slv < ldlm_pool_slv_min(limit)) {
227                 slv = ldlm_pool_slv_min(limit);
228         }
229
230         pl->pl_server_lock_volume = slv;
231 }
232
233 /**
234  * Recalculates next stats on passed \a pl.
235  *
236  * \pre ->pl_lock is locked. 
237  */
238 static inline void ldlm_pool_recalc_stats(struct ldlm_pool *pl)
239 {
240         int grant_plan = pl->pl_grant_plan;
241         __u64 slv = pl->pl_server_lock_volume;
242         int granted = atomic_read(&pl->pl_granted);
243         int grant_rate = atomic_read(&pl->pl_grant_rate);
244         int cancel_rate = atomic_read(&pl->pl_cancel_rate);
245
246         lprocfs_counter_add(pl->pl_stats, LDLM_POOL_SLV_STAT, 
247                             slv);
248         lprocfs_counter_add(pl->pl_stats, LDLM_POOL_GRANTED_STAT,
249                             granted);
250         lprocfs_counter_add(pl->pl_stats, LDLM_POOL_GRANT_RATE_STAT,
251                             grant_rate);
252         lprocfs_counter_add(pl->pl_stats, LDLM_POOL_GRANT_PLAN_STAT,
253                             grant_plan);
254         lprocfs_counter_add(pl->pl_stats, LDLM_POOL_CANCEL_RATE_STAT,
255                             cancel_rate);
256 }
257
258 /**
259  * Sets current SLV into obd accessible via ldlm_pl2ns(pl)->ns_obd.
260  */
261 static void ldlm_srv_pool_push_slv(struct ldlm_pool *pl)
262 {
263         struct obd_device *obd;
264
265         /* 
266          * Set new SLV in obd field for using it later without accessing the
267          * pool. This is required to avoid race between sending reply to client
268          * with new SLV and cleanup server stack in which we can't guarantee
269          * that namespace is still alive. We know only that obd is alive as
270          * long as valid export is alive. 
271          */
272         obd = ldlm_pl2ns(pl)->ns_obd;
273         LASSERT(obd != NULL);
274         write_lock(&obd->obd_pool_lock);
275         obd->obd_pool_slv = pl->pl_server_lock_volume;
276         write_unlock(&obd->obd_pool_lock);
277 }
278
279 /**
280  * Recalculates all pool fields on passed \a pl.
281  *
282  * \pre ->pl_lock is not locked. 
283  */
284 static int ldlm_srv_pool_recalc(struct ldlm_pool *pl)
285 {
286         time_t recalc_interval_sec;
287         ENTRY;
288
289         spin_lock(&pl->pl_lock);
290         recalc_interval_sec = cfs_time_current_sec() - pl->pl_recalc_time;
291         if (recalc_interval_sec > 0) {
292                 /* 
293                  * Update statistics.
294                  */
295                 ldlm_pool_recalc_stats(pl);
296
297                 /* 
298                  * Recalc SLV after last period. This should be done
299                  * _before_ recalculating new grant plan. 
300                  */
301                 ldlm_pool_recalc_slv(pl);
302                 
303                 /* 
304                  * Make sure that pool informed obd of last SLV changes. 
305                  */
306                 ldlm_srv_pool_push_slv(pl);
307
308                 /* 
309                  * Update grant_plan for new period. 
310                  */
311                 ldlm_pool_recalc_grant_plan(pl);
312
313                 /* 
314                  * Zero out all rates and speed for the last period. 
315                  */
316                 atomic_set(&pl->pl_grant_rate, 0);
317                 atomic_set(&pl->pl_cancel_rate, 0);
318                 atomic_set(&pl->pl_grant_speed, 0);
319                 pl->pl_recalc_time = cfs_time_current_sec();
320                 lprocfs_counter_add(pl->pl_stats, LDLM_POOL_TIMING_STAT, 
321                                     recalc_interval_sec);
322         }
323         spin_unlock(&pl->pl_lock);
324         RETURN(0);
325 }
326
327 /**
328  * This function is used on server side as main entry point for memory
329  * preasure handling. It decreases SLV on \a pl according to passed
330  * \a nr and \a gfp_mask.
331  * 
332  * Our goal here is to decrease SLV such a way that clients hold \a nr
333  * locks smaller in next 10h. 
334  */
335 static int ldlm_srv_pool_shrink(struct ldlm_pool *pl,
336                                 int nr, unsigned int gfp_mask)
337 {
338         __u32 limit;
339         ENTRY;
340
341         /* 
342          * VM is asking how many entries may be potentially freed. 
343          */
344         if (nr == 0)
345                 RETURN(atomic_read(&pl->pl_granted));
346
347         /* 
348          * Client already canceled locks but server is already in shrinker
349          * and can't cancel anything. Let's catch this race. 
350          */
351         if (atomic_read(&pl->pl_granted) == 0)
352                 RETURN(0);
353
354         spin_lock(&pl->pl_lock);
355
356         /* 
357          * We want shrinker to possibly cause cancelation of @nr locks from
358          * clients or grant approximately @nr locks smaller next intervals.
359          *
360          * This is why we decresed SLV by @nr. This effect will only be as
361          * long as one re-calc interval (1s these days) and this should be
362          * enough to pass this decreased SLV to all clients. On next recalc
363          * interval pool will either increase SLV if locks load is not high
364          * or will keep on same level or even decrease again, thus, shrinker
365          * decreased SLV will affect next recalc intervals and this way will
366          * make locking load lower. 
367          */
368         if (nr < pl->pl_server_lock_volume) {
369                 pl->pl_server_lock_volume = pl->pl_server_lock_volume - nr;
370         } else {
371                 limit = ldlm_pool_get_limit(pl);
372                 pl->pl_server_lock_volume = ldlm_pool_slv_min(limit);
373         }
374
375         /* 
376          * Make sure that pool informed obd of last SLV changes. 
377          */
378         ldlm_srv_pool_push_slv(pl);
379         spin_unlock(&pl->pl_lock);
380
381         /* 
382          * We did not really free any memory here so far, it only will be
383          * freed later may be, so that we return 0 to not confuse VM. 
384          */
385         RETURN(0);
386 }
387
388 /**
389  * Setup server side pool \a pl with passed \a limit.
390  */
391 static int ldlm_srv_pool_setup(struct ldlm_pool *pl, int limit)
392 {
393         struct obd_device *obd;
394         ENTRY;
395         
396         obd = ldlm_pl2ns(pl)->ns_obd;
397         LASSERT(obd != NULL && obd != LP_POISON);
398         LASSERT(obd->obd_type != LP_POISON);
399         write_lock(&obd->obd_pool_lock);
400         obd->obd_pool_limit = limit;
401         write_unlock(&obd->obd_pool_lock);
402
403         ldlm_pool_set_limit(pl, limit);
404         RETURN(0);
405 }
406
407 /**
408  * Sets SLV and Limit from ldlm_pl2ns(pl)->ns_obd tp passed \a pl.
409  */
410 static void ldlm_cli_pool_pop_slv(struct ldlm_pool *pl)
411 {
412         struct obd_device *obd;
413
414         /* 
415          * Get new SLV and Limit from obd which is updated with comming 
416          * RPCs. 
417          */
418         obd = ldlm_pl2ns(pl)->ns_obd;
419         LASSERT(obd != NULL);
420         read_lock(&obd->obd_pool_lock);
421         pl->pl_server_lock_volume = obd->obd_pool_slv;
422         ldlm_pool_set_limit(pl, obd->obd_pool_limit);
423         read_unlock(&obd->obd_pool_lock);
424 }
425
426 /**
427  * Recalculates client sise pool \a pl according to current SLV and Limit.
428  */
429 static int ldlm_cli_pool_recalc(struct ldlm_pool *pl)
430 {
431         time_t recalc_interval_sec;
432         ENTRY;
433
434         spin_lock(&pl->pl_lock);
435
436         /* 
437          * Make sure that pool knows last SLV and Limit from obd. 
438          */
439         ldlm_cli_pool_pop_slv(pl);
440
441         recalc_interval_sec = cfs_time_current_sec() - pl->pl_recalc_time;
442         if (recalc_interval_sec > 0) {
443                 /* 
444                  * Update statistics only every T. 
445                  */
446                 ldlm_pool_recalc_stats(pl);
447
448                 /* 
449                  * Zero out grant/cancel rates and speed for last period. 
450                  */
451                 atomic_set(&pl->pl_grant_rate, 0);
452                 atomic_set(&pl->pl_cancel_rate, 0);
453                 atomic_set(&pl->pl_grant_speed, 0);
454                 pl->pl_recalc_time = cfs_time_current_sec();
455                 lprocfs_counter_add(pl->pl_stats, LDLM_POOL_TIMING_STAT, 
456                                     recalc_interval_sec);
457         }
458         spin_unlock(&pl->pl_lock);
459
460         /* 
461          * Do not cancel locks in case lru resize is disabled for this ns. 
462          */
463         if (!ns_connect_lru_resize(ldlm_pl2ns(pl)))
464                 RETURN(0);
465
466         /* 
467          * In the time of canceling locks on client we do not need to maintain
468          * sharp timing, we only want to cancel locks asap according to new SLV.
469          * It may be called when SLV has changed much, this is why we do not
470          * take into account pl->pl_recalc_time here. 
471          */
472         RETURN(ldlm_cancel_lru(ldlm_pl2ns(pl), 0, LDLM_ASYNC, 
473                                LDLM_CANCEL_LRUR));
474 }
475
476 /**
477  * This function is main entry point for memory preasure handling on client side.
478  * Main goal of this function is to cancel some number of locks on passed \a pl
479  * according to \a nr and \a gfp_mask.
480  */
481 static int ldlm_cli_pool_shrink(struct ldlm_pool *pl,
482                                 int nr, unsigned int gfp_mask)
483 {
484         ENTRY;
485         
486         /* 
487          * Do not cancel locks in case lru resize is disabled for this ns. 
488          */
489         if (!ns_connect_lru_resize(ldlm_pl2ns(pl)))
490                 RETURN(0);
491
492         /* 
493          * Make sure that pool knows last SLV and Limit from obd. 
494          */
495         ldlm_cli_pool_pop_slv(pl);
496
497         /* 
498          * Find out how many locks may be released according to shrink 
499          * policy. 
500          */
501         if (nr == 0)
502                 RETURN(ldlm_cancel_lru_estimate(ldlm_pl2ns(pl), 0, 0, 
503                                                 LDLM_CANCEL_SHRINK));
504
505         /* 
506          * Cancel @nr locks accoding to shrink policy. 
507          */
508         RETURN(ldlm_cancel_lru(ldlm_pl2ns(pl), nr, LDLM_SYNC, 
509                                LDLM_CANCEL_SHRINK));
510 }
511
512 struct ldlm_pool_ops ldlm_srv_pool_ops = {
513         .po_recalc = ldlm_srv_pool_recalc,
514         .po_shrink = ldlm_srv_pool_shrink,
515         .po_setup  = ldlm_srv_pool_setup
516 };
517
518 struct ldlm_pool_ops ldlm_cli_pool_ops = {
519         .po_recalc = ldlm_cli_pool_recalc,
520         .po_shrink = ldlm_cli_pool_shrink
521 };
522
523 /**
524  * Pool recalc wrapper. Will call either client or server pool recalc callback
525  * depending what pool \a pl is used.
526  */
527 int ldlm_pool_recalc(struct ldlm_pool *pl)
528 {
529         int count;
530
531         if (pl->pl_ops->po_recalc != NULL) {
532                 count = pl->pl_ops->po_recalc(pl);
533                 lprocfs_counter_add(pl->pl_stats, LDLM_POOL_RECALC_STAT, 
534                                     count);
535                 return count;
536         }
537         return 0;
538 }
539 EXPORT_SYMBOL(ldlm_pool_recalc);
540
541 /**
542  * Pool shrink wrapper. Will call either client or server pool recalc callback
543  * depending what pool \a pl is used.
544  */
545 int ldlm_pool_shrink(struct ldlm_pool *pl, int nr,
546                      unsigned int gfp_mask)
547 {
548         int cancel = 0;
549         
550         if (pl->pl_ops->po_shrink != NULL) {
551                 cancel = pl->pl_ops->po_shrink(pl, nr, gfp_mask);
552                 if (nr > 0) {
553                         lprocfs_counter_add(pl->pl_stats, 
554                                             LDLM_POOL_SHRINK_REQTD_STAT,
555                                             nr);
556                         lprocfs_counter_add(pl->pl_stats, 
557                                             LDLM_POOL_SHRINK_FREED_STAT,
558                                             cancel);
559                         CDEBUG(D_DLMTRACE, "%s: request to shrink %d locks, "
560                                "shrunk %d\n", pl->pl_name, nr, cancel);
561                 }
562         }
563         return cancel;
564 }
565 EXPORT_SYMBOL(ldlm_pool_shrink);
566
567 /**
568  * Pool setup wrapper. Will call either client or server pool recalc callback
569  * depending what pool \a pl is used.
570  *
571  * Sets passed \a limit into pool \a pl.
572  */
573 int ldlm_pool_setup(struct ldlm_pool *pl, int limit)
574 {
575         ENTRY;
576         if (pl->pl_ops->po_setup != NULL)
577                 RETURN(pl->pl_ops->po_setup(pl, limit));
578         RETURN(0);
579 }
580 EXPORT_SYMBOL(ldlm_pool_setup);
581
582 #ifdef __KERNEL__
583 static int lprocfs_rd_pool_state(char *page, char **start, off_t off,
584                                  int count, int *eof, void *data)
585 {
586         int granted, grant_rate, cancel_rate, grant_step;
587         int nr = 0, grant_speed, grant_plan;
588         struct ldlm_pool *pl = data;
589         __u64 slv, clv;
590         __u32 limit;
591
592         spin_lock(&pl->pl_lock);
593         slv = pl->pl_server_lock_volume;
594         clv = pl->pl_client_lock_volume;
595         limit = ldlm_pool_get_limit(pl);
596         grant_plan = pl->pl_grant_plan;
597         grant_step = pl->pl_grant_step;
598         granted = atomic_read(&pl->pl_granted);
599         grant_rate = atomic_read(&pl->pl_grant_rate);
600         grant_speed = atomic_read(&pl->pl_grant_speed);
601         cancel_rate = atomic_read(&pl->pl_cancel_rate);
602         spin_unlock(&pl->pl_lock);
603
604         nr += snprintf(page + nr, count - nr, "LDLM pool state (%s):\n",
605                        pl->pl_name);
606         nr += snprintf(page + nr, count - nr, "  SLV: "LPU64"\n", slv);
607         nr += snprintf(page + nr, count - nr, "  CLV: "LPU64"\n", clv);
608
609         nr += snprintf(page + nr, count - nr, "  LVF: %d\n",
610                        atomic_read(&pl->pl_lock_volume_factor));
611
612         nr += snprintf(page + nr, count - nr, "  GSP: %d%%\n",
613                        grant_step);
614         nr += snprintf(page + nr, count - nr, "  GP:  %d\n",
615                        grant_plan);
616         nr += snprintf(page + nr, count - nr, "  GR:  %d\n",
617                        grant_rate);
618         nr += snprintf(page + nr, count - nr, "  CR:  %d\n",
619                        cancel_rate);
620         nr += snprintf(page + nr, count - nr, "  GS:  %d\n",
621                        grant_speed);
622         nr += snprintf(page + nr, count - nr, "  G:   %d\n",
623                        granted);
624         nr += snprintf(page + nr, count - nr, "  L:   %d\n",
625                        limit);
626         return nr;
627 }
628
629 LDLM_POOL_PROC_READER(grant_plan, int);
630 LDLM_POOL_PROC_READER(grant_step, int);
631 LDLM_POOL_PROC_WRITER(grant_step, int);
632
633 static int ldlm_pool_proc_init(struct ldlm_pool *pl)
634 {
635         struct ldlm_namespace *ns = ldlm_pl2ns(pl);
636         struct proc_dir_entry *parent_ns_proc;
637         struct lprocfs_vars pool_vars[2];
638         char *var_name = NULL;
639         int rc = 0;
640         ENTRY;
641
642         OBD_ALLOC(var_name, MAX_STRING_SIZE + 1);
643         if (!var_name)
644                 RETURN(-ENOMEM);
645
646         parent_ns_proc = lprocfs_srch(ldlm_ns_proc_dir, ns->ns_name);
647         if (parent_ns_proc == NULL) {
648                 CERROR("%s: proc entry is not initialized\n",
649                        ns->ns_name);
650                 GOTO(out_free_name, rc = -EINVAL);
651         }
652         pl->pl_proc_dir = lprocfs_register("pool", parent_ns_proc,
653                                            NULL, NULL);
654         if (IS_ERR(pl->pl_proc_dir)) {
655                 CERROR("LProcFS failed in ldlm-pool-init\n");
656                 rc = PTR_ERR(pl->pl_proc_dir);
657                 GOTO(out_free_name, rc);
658         }
659
660         var_name[MAX_STRING_SIZE] = '\0';
661         memset(pool_vars, 0, sizeof(pool_vars));
662         pool_vars[0].name = var_name;
663
664         snprintf(var_name, MAX_STRING_SIZE, "server_lock_volume");
665         pool_vars[0].data = &pl->pl_server_lock_volume;
666         pool_vars[0].read_fptr = lprocfs_rd_u64;
667         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
668
669         snprintf(var_name, MAX_STRING_SIZE, "limit");
670         pool_vars[0].data = &pl->pl_limit;
671         pool_vars[0].read_fptr = lprocfs_rd_atomic;
672         pool_vars[0].write_fptr = lprocfs_wr_atomic;
673         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
674
675         snprintf(var_name, MAX_STRING_SIZE, "granted");
676         pool_vars[0].data = &pl->pl_granted;
677         pool_vars[0].read_fptr = lprocfs_rd_atomic;
678         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
679
680         snprintf(var_name, MAX_STRING_SIZE, "grant_speed");
681         pool_vars[0].data = &pl->pl_grant_speed;
682         pool_vars[0].read_fptr = lprocfs_rd_atomic;
683         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
684
685         snprintf(var_name, MAX_STRING_SIZE, "cancel_rate");
686         pool_vars[0].data = &pl->pl_cancel_rate;
687         pool_vars[0].read_fptr = lprocfs_rd_atomic;
688         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
689
690         snprintf(var_name, MAX_STRING_SIZE, "grant_rate");
691         pool_vars[0].data = &pl->pl_grant_rate;
692         pool_vars[0].read_fptr = lprocfs_rd_atomic;
693         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
694
695         snprintf(var_name, MAX_STRING_SIZE, "grant_plan");
696         pool_vars[0].data = pl;
697         pool_vars[0].read_fptr = lprocfs_rd_grant_plan;
698         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
699
700         snprintf(var_name, MAX_STRING_SIZE, "grant_step");
701         pool_vars[0].data = pl;
702         pool_vars[0].read_fptr = lprocfs_rd_grant_step;
703         if (ns_is_server(ns))
704                 pool_vars[0].write_fptr = lprocfs_wr_grant_step;
705         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
706
707         snprintf(var_name, MAX_STRING_SIZE, "lock_volume_factor");
708         pool_vars[0].data = &pl->pl_lock_volume_factor;
709         pool_vars[0].read_fptr = lprocfs_rd_atomic;
710         pool_vars[0].write_fptr = lprocfs_wr_atomic;
711         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
712
713         snprintf(var_name, MAX_STRING_SIZE, "state");
714         pool_vars[0].data = pl;
715         pool_vars[0].read_fptr = lprocfs_rd_pool_state;
716         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
717
718         pl->pl_stats = lprocfs_alloc_stats(LDLM_POOL_LAST_STAT -
719                                            LDLM_POOL_FIRST_STAT, 0);
720         if (!pl->pl_stats)
721                 GOTO(out_free_name, rc = -ENOMEM);
722
723         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_GRANTED_STAT,
724                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
725                              "granted", "locks");
726         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_GRANT_STAT, 
727                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
728                              "grant", "locks");
729         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_CANCEL_STAT, 
730                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
731                              "cancel", "locks");
732         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_GRANT_RATE_STAT,
733                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
734                              "grant_rate", "locks/s");
735         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_CANCEL_RATE_STAT,
736                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
737                              "cancel_rate", "locks/s");
738         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_GRANT_PLAN_STAT,
739                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
740                              "grant_plan", "locks/s");
741         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_SLV_STAT,
742                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
743                              "slv", "slv");
744         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_SHRINK_REQTD_STAT,
745                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
746                              "shrink_request", "locks");
747         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_SHRINK_FREED_STAT,
748                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
749                              "shrink_freed", "locks");
750         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_RECALC_STAT,
751                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
752                              "recalc_freed", "locks");
753         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_TIMING_STAT,
754                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
755                              "recalc_timing", "sec");
756         lprocfs_register_stats(pl->pl_proc_dir, "stats", pl->pl_stats);
757
758         EXIT;
759 out_free_name:
760         OBD_FREE(var_name, MAX_STRING_SIZE + 1);
761         return rc;
762 }
763
764 static void ldlm_pool_proc_fini(struct ldlm_pool *pl)
765 {
766         if (pl->pl_stats != NULL) {
767                 lprocfs_free_stats(&pl->pl_stats);
768                 pl->pl_stats = NULL;
769         }
770         if (pl->pl_proc_dir != NULL) {
771                 lprocfs_remove(&pl->pl_proc_dir);
772                 pl->pl_proc_dir = NULL;
773         }
774 }
775 #else /* !__KERNEL__*/
776 #define ldlm_pool_proc_init(pl) (0)
777 #define ldlm_pool_proc_fini(pl) while (0) {}
778 #endif
779
780 int ldlm_pool_init(struct ldlm_pool *pl, struct ldlm_namespace *ns,
781                    int idx, ldlm_side_t client)
782 {
783         int rc;
784         ENTRY;
785
786         spin_lock_init(&pl->pl_lock);
787         atomic_set(&pl->pl_granted, 0);
788         pl->pl_recalc_time = cfs_time_current_sec();
789         atomic_set(&pl->pl_lock_volume_factor, 1);
790
791         atomic_set(&pl->pl_grant_rate, 0);
792         atomic_set(&pl->pl_cancel_rate, 0);
793         atomic_set(&pl->pl_grant_speed, 0);
794         pl->pl_grant_step = LDLM_POOL_GSP;
795         pl->pl_grant_plan = LDLM_POOL_GP(LDLM_POOL_HOST_L);
796
797         snprintf(pl->pl_name, sizeof(pl->pl_name), "ldlm-pool-%s-%d",
798                  ns->ns_name, idx);
799
800         if (client == LDLM_NAMESPACE_SERVER) {
801                 pl->pl_ops = &ldlm_srv_pool_ops;
802                 ldlm_pool_set_limit(pl, LDLM_POOL_HOST_L);
803                 pl->pl_server_lock_volume = ldlm_pool_slv_max(LDLM_POOL_HOST_L);
804         } else {
805                 pl->pl_server_lock_volume = 1;
806                 ldlm_pool_set_limit(pl, 1);
807                 pl->pl_ops = &ldlm_cli_pool_ops;
808         }
809         pl->pl_client_lock_volume = 0;
810         rc = ldlm_pool_proc_init(pl);
811         if (rc)
812                 RETURN(rc);
813
814         CDEBUG(D_DLMTRACE, "Lock pool %s is initialized\n", pl->pl_name);
815
816         RETURN(rc);
817 }
818 EXPORT_SYMBOL(ldlm_pool_init);
819
820 void ldlm_pool_fini(struct ldlm_pool *pl)
821 {
822         ENTRY;
823         ldlm_pool_proc_fini(pl);
824         
825         /* 
826          * Pool should not be used after this point. We can't free it here as
827          * it lives in struct ldlm_namespace, but still interested in catching
828          * any abnormal using cases.
829          */
830         POISON(pl, 0x5a, sizeof(*pl));
831         EXIT;
832 }
833 EXPORT_SYMBOL(ldlm_pool_fini);
834
835 /**
836  * Add new taken ldlm lock \a lock into pool \a pl accounting.
837  */
838 void ldlm_pool_add(struct ldlm_pool *pl, struct ldlm_lock *lock)
839 {
840         /* 
841          * FLOCK locks are special in a sense that they are almost never
842          * cancelled, instead special kind of lock is used to drop them.
843          * also there is no LRU for flock locks, so no point in tracking
844          * them anyway. 
845          */
846         if (lock->l_resource->lr_type == LDLM_FLOCK)
847                 return;
848
849         ENTRY;
850                 
851         atomic_inc(&pl->pl_granted);
852         atomic_inc(&pl->pl_grant_rate);
853         atomic_inc(&pl->pl_grant_speed);
854
855         lprocfs_counter_incr(pl->pl_stats, LDLM_POOL_GRANT_STAT);
856  
857         /* 
858          * Do not do pool recalc for client side as all locks which
859          * potentially may be canceled has already been packed into 
860          * enqueue/cancel rpc. Also we do not want to run out of stack
861          * with too long call paths. 
862          */
863         if (ns_is_server(ldlm_pl2ns(pl)))
864                 ldlm_pool_recalc(pl);
865         EXIT;
866 }
867 EXPORT_SYMBOL(ldlm_pool_add);
868
869 /**
870  * Remove ldlm lock \a lock from pool \a pl accounting.
871  */
872 void ldlm_pool_del(struct ldlm_pool *pl, struct ldlm_lock *lock)
873 {
874         /*
875          * Filter out FLOCK locks. Read above comment in ldlm_pool_add().
876          */
877         if (lock->l_resource->lr_type == LDLM_FLOCK)
878                 return;
879         ENTRY;
880         LASSERT(atomic_read(&pl->pl_granted) > 0);
881         atomic_dec(&pl->pl_granted);
882         atomic_inc(&pl->pl_cancel_rate);
883         atomic_dec(&pl->pl_grant_speed);
884         
885         lprocfs_counter_incr(pl->pl_stats, LDLM_POOL_CANCEL_STAT);
886
887         if (ns_is_server(ldlm_pl2ns(pl)))
888                 ldlm_pool_recalc(pl);
889         EXIT;
890 }
891 EXPORT_SYMBOL(ldlm_pool_del);
892
893 /**
894  * Returns current \a pl SLV.
895  *
896  * \pre ->pl_lock is not locked. 
897  */
898 __u64 ldlm_pool_get_slv(struct ldlm_pool *pl)
899 {
900         __u64 slv;
901         spin_lock(&pl->pl_lock);
902         slv = pl->pl_server_lock_volume;
903         spin_unlock(&pl->pl_lock);
904         return slv;
905 }
906 EXPORT_SYMBOL(ldlm_pool_get_slv);
907
908 /**
909  * Sets passed \a slv to \a pl.
910  *
911  * \pre ->pl_lock is not locked. 
912  */
913 void ldlm_pool_set_slv(struct ldlm_pool *pl, __u64 slv)
914 {
915         spin_lock(&pl->pl_lock);
916         pl->pl_server_lock_volume = slv;
917         spin_unlock(&pl->pl_lock);
918 }
919 EXPORT_SYMBOL(ldlm_pool_set_slv);
920
921 /**
922  * Returns current \a pl CLV.
923  *
924  * \pre ->pl_lock is not locked. 
925  */
926 __u64 ldlm_pool_get_clv(struct ldlm_pool *pl)
927 {
928         __u64 slv;
929         spin_lock(&pl->pl_lock);
930         slv = pl->pl_client_lock_volume;
931         spin_unlock(&pl->pl_lock);
932         return slv;
933 }
934 EXPORT_SYMBOL(ldlm_pool_get_clv);
935
936 /**
937  * Sets passed \a clv to \a pl.
938  *
939  * \pre ->pl_lock is not locked. 
940  */
941 void ldlm_pool_set_clv(struct ldlm_pool *pl, __u64 clv)
942 {
943         spin_lock(&pl->pl_lock);
944         pl->pl_client_lock_volume = clv;
945         spin_unlock(&pl->pl_lock);
946 }
947 EXPORT_SYMBOL(ldlm_pool_set_clv);
948
949 /**
950  * Returns current \a pl limit.
951  */
952 __u32 ldlm_pool_get_limit(struct ldlm_pool *pl)
953 {
954         return atomic_read(&pl->pl_limit);
955 }
956 EXPORT_SYMBOL(ldlm_pool_get_limit);
957
958 /**
959  * Sets passed \a limit to \a pl.
960  */
961 void ldlm_pool_set_limit(struct ldlm_pool *pl, __u32 limit)
962 {
963         atomic_set(&pl->pl_limit, limit);
964 }
965 EXPORT_SYMBOL(ldlm_pool_set_limit);
966
967 /**
968  * Returns current LVF from \a pl.
969  */
970 __u32 ldlm_pool_get_lvf(struct ldlm_pool *pl)
971 {
972         return atomic_read(&pl->pl_lock_volume_factor);
973 }
974 EXPORT_SYMBOL(ldlm_pool_get_lvf);
975
976 #ifdef __KERNEL__
977 static int ldlm_pool_granted(struct ldlm_pool *pl)
978 {
979         return atomic_read(&pl->pl_granted);
980 }
981
982 static struct ptlrpc_thread *ldlm_pools_thread;
983 static struct shrinker *ldlm_pools_srv_shrinker;
984 static struct shrinker *ldlm_pools_cli_shrinker;
985 static struct completion ldlm_pools_comp;
986
987 void ldlm_pools_wakeup(void)
988 {
989         ENTRY;
990         if (ldlm_pools_thread == NULL)
991                 return;
992         ldlm_pools_thread->t_flags |= SVC_EVENT;
993         cfs_waitq_signal(&ldlm_pools_thread->t_ctl_waitq);
994         EXIT;
995 }
996 EXPORT_SYMBOL(ldlm_pools_wakeup);
997
998 /* 
999  * Cancel \a nr locks from all namespaces (if possible). Returns number of
1000  * cached locks after shrink is finished. All namespaces are asked to
1001  * cancel approximately equal amount of locks to keep balancing.
1002  */
1003 static int ldlm_pools_shrink(ldlm_side_t client, int nr, 
1004                              unsigned int gfp_mask)
1005 {
1006         int total = 0, cached = 0, nr_ns;
1007         struct ldlm_namespace *ns;
1008
1009         if (nr != 0 && !(gfp_mask & __GFP_FS))
1010                 return -1;
1011
1012         CDEBUG(D_DLMTRACE, "Request to shrink %d %s locks from all pools\n",
1013                nr, client == LDLM_NAMESPACE_CLIENT ? "client" : "server");
1014
1015         /* 
1016          * Find out how many resources we may release. 
1017          */
1018         for (nr_ns = atomic_read(ldlm_namespace_nr(client)); 
1019              nr_ns > 0; nr_ns--) 
1020         {
1021                 mutex_down(ldlm_namespace_lock(client));
1022                 if (list_empty(ldlm_namespace_list(client))) {
1023                         mutex_up(ldlm_namespace_lock(client));
1024                         return 0;
1025                 }
1026                 ns = ldlm_namespace_first_locked(client);
1027                 ldlm_namespace_get(ns);
1028                 ldlm_namespace_move_locked(ns, client);
1029                 mutex_up(ldlm_namespace_lock(client));
1030                 total += ldlm_pool_shrink(&ns->ns_pool, 0, gfp_mask);
1031                 ldlm_namespace_put(ns, 1);
1032         }
1033  
1034         if (nr == 0 || total == 0)
1035                 return total;
1036
1037         /* 
1038          * Shrink at least ldlm_namespace_nr(client) namespaces. 
1039          */
1040         for (nr_ns = atomic_read(ldlm_namespace_nr(client)); 
1041              nr_ns > 0; nr_ns--) 
1042         {
1043                 int cancel, nr_locks;
1044
1045                 /* 
1046                  * Do not call shrink under ldlm_namespace_lock(client) 
1047                  */
1048                 mutex_down(ldlm_namespace_lock(client));
1049                 if (list_empty(ldlm_namespace_list(client))) {
1050                         mutex_up(ldlm_namespace_lock(client));
1051                         /* 
1052                          * If list is empty, we can't return any @cached > 0,
1053                          * that probably would cause needless shrinker
1054                          * call. 
1055                          */
1056                         cached = 0;
1057                         break;
1058                 }
1059                 ns = ldlm_namespace_first_locked(client);
1060                 ldlm_namespace_get(ns);
1061                 ldlm_namespace_move_locked(ns, client);
1062                 mutex_up(ldlm_namespace_lock(client));
1063                 
1064                 nr_locks = ldlm_pool_granted(&ns->ns_pool);
1065                 cancel = 1 + nr_locks * nr / total;
1066                 ldlm_pool_shrink(&ns->ns_pool, cancel, gfp_mask);
1067                 cached += ldlm_pool_granted(&ns->ns_pool);
1068                 ldlm_namespace_put(ns, 1);
1069         }
1070         return cached;
1071 }
1072
1073 static int ldlm_pools_srv_shrink(int nr, unsigned int gfp_mask)
1074 {
1075         return ldlm_pools_shrink(LDLM_NAMESPACE_SERVER, nr, gfp_mask);
1076 }
1077
1078 static int ldlm_pools_cli_shrink(int nr, unsigned int gfp_mask)
1079 {
1080         return ldlm_pools_shrink(LDLM_NAMESPACE_CLIENT, nr, gfp_mask);
1081 }
1082
1083 void ldlm_pools_recalc(ldlm_side_t client)
1084 {
1085         __u32 nr_l = 0, nr_p = 0, l;
1086         struct ldlm_namespace *ns;
1087         int nr, equal = 0;
1088
1089         /* 
1090          * No need to setup pool limit for client pools.
1091          */
1092         if (client == LDLM_NAMESPACE_SERVER) {
1093                 /* 
1094                  * Check all modest namespaces first. 
1095                  */
1096                 mutex_down(ldlm_namespace_lock(client));
1097                 list_for_each_entry(ns, ldlm_namespace_list(client), 
1098                                     ns_list_chain) 
1099                 {
1100                         if (ns->ns_appetite != LDLM_NAMESPACE_MODEST)
1101                                 continue;
1102
1103                         l = ldlm_pool_granted(&ns->ns_pool);
1104                         if (l == 0)
1105                                 l = 1;
1106
1107                         /* 
1108                          * Set the modest pools limit equal to their avg granted
1109                          * locks + 5%. 
1110                          */
1111                         l += dru(l * LDLM_POOLS_MODEST_MARGIN, 100);
1112                         ldlm_pool_setup(&ns->ns_pool, l);
1113                         nr_l += l;
1114                         nr_p++;
1115                 }
1116
1117                 /* 
1118                  * Make sure that modest namespaces did not eat more that 2/3 
1119                  * of limit. 
1120                  */
1121                 if (nr_l >= 2 * (LDLM_POOL_HOST_L / 3)) {
1122                         CWARN("\"Modest\" pools eat out 2/3 of server locks "
1123                               "limit (%d of %lu). This means that you have too "
1124                               "many clients for this amount of server RAM. "
1125                               "Upgrade server!\n", nr_l, LDLM_POOL_HOST_L);
1126                         equal = 1;
1127                 }
1128
1129                 /* 
1130                  * The rest is given to greedy namespaces. 
1131                  */
1132                 list_for_each_entry(ns, ldlm_namespace_list(client), 
1133                                     ns_list_chain) 
1134                 {
1135                         if (!equal && ns->ns_appetite != LDLM_NAMESPACE_GREEDY)
1136                                 continue;
1137
1138                         if (equal) {
1139                                 /* 
1140                                  * In the case 2/3 locks are eaten out by
1141                                  * modest pools, we re-setup equal limit
1142                                  * for _all_ pools. 
1143                                  */
1144                                 l = LDLM_POOL_HOST_L /
1145                                         atomic_read(ldlm_namespace_nr(client));
1146                         } else {
1147                                 /* 
1148                                  * All the rest of greedy pools will have
1149                                  * all locks in equal parts.
1150                                  */
1151                                 l = (LDLM_POOL_HOST_L - nr_l) /
1152                                         (atomic_read(ldlm_namespace_nr(client)) -
1153                                          nr_p);
1154                         }
1155                         ldlm_pool_setup(&ns->ns_pool, l);
1156                 }
1157                 mutex_up(ldlm_namespace_lock(client));
1158         }
1159
1160         /* 
1161          * Recalc at least ldlm_namespace_nr(client) namespaces. 
1162          */
1163         for (nr = atomic_read(ldlm_namespace_nr(client)); nr > 0; nr--) {
1164                 /* 
1165                  * Lock the list, get first @ns in the list, getref, move it
1166                  * to the tail, unlock and call pool recalc. This way we avoid
1167                  * calling recalc under @ns lock what is really good as we get
1168                  * rid of potential deadlock on client nodes when canceling
1169                  * locks synchronously. 
1170                  */
1171                 mutex_down(ldlm_namespace_lock(client));
1172                 if (list_empty(ldlm_namespace_list(client))) {
1173                         mutex_up(ldlm_namespace_lock(client));
1174                         break;
1175                 }
1176                 ns = ldlm_namespace_first_locked(client);
1177                 ldlm_namespace_get(ns);
1178                 ldlm_namespace_move_locked(ns, client);
1179                 mutex_up(ldlm_namespace_lock(client));
1180
1181                 /* 
1182                  * After setup is done - recalc the pool. 
1183                  */
1184                 ldlm_pool_recalc(&ns->ns_pool);
1185                 ldlm_namespace_put(ns, 1);
1186         }
1187 }
1188 EXPORT_SYMBOL(ldlm_pools_recalc);
1189
1190 static int ldlm_pools_thread_main(void *arg)
1191 {
1192         struct ptlrpc_thread *thread = (struct ptlrpc_thread *)arg;
1193         char *t_name = "ldlm_poold";
1194         ENTRY;
1195
1196         cfs_daemonize(t_name);
1197         thread->t_flags = SVC_RUNNING;
1198         cfs_waitq_signal(&thread->t_ctl_waitq);
1199
1200         CDEBUG(D_DLMTRACE, "%s: pool thread starting, process %d\n",
1201                t_name, cfs_curproc_pid());
1202
1203         while (1) {
1204                 struct l_wait_info lwi;
1205
1206                 /*
1207                  * Recal all pools on this tick. 
1208                  */
1209                 ldlm_pools_recalc(LDLM_NAMESPACE_SERVER);
1210                 ldlm_pools_recalc(LDLM_NAMESPACE_CLIENT);
1211                 
1212                 /*
1213                  * Wait until the next check time, or until we're
1214                  * stopped. 
1215                  */
1216                 lwi = LWI_TIMEOUT(cfs_time_seconds(LDLM_POOLS_THREAD_PERIOD),
1217                                   NULL, NULL);
1218                 l_wait_event(thread->t_ctl_waitq, (thread->t_flags &
1219                                                    (SVC_STOPPING|SVC_EVENT)),
1220                              &lwi);
1221
1222                 if (thread->t_flags & SVC_STOPPING) {
1223                         thread->t_flags &= ~SVC_STOPPING;
1224                         break;
1225                 } else if (thread->t_flags & SVC_EVENT) {
1226                         thread->t_flags &= ~SVC_EVENT;
1227                 }
1228         }
1229
1230         thread->t_flags = SVC_STOPPED;
1231         cfs_waitq_signal(&thread->t_ctl_waitq);
1232
1233         CDEBUG(D_DLMTRACE, "%s: pool thread exiting, process %d\n",
1234                t_name, cfs_curproc_pid());
1235
1236         complete_and_exit(&ldlm_pools_comp, 0);
1237 }
1238
1239 static int ldlm_pools_thread_start(void)
1240 {
1241         struct l_wait_info lwi = { 0 };
1242         int rc;
1243         ENTRY;
1244
1245         if (ldlm_pools_thread != NULL)
1246                 RETURN(-EALREADY);
1247
1248         OBD_ALLOC_PTR(ldlm_pools_thread);
1249         if (ldlm_pools_thread == NULL)
1250                 RETURN(-ENOMEM);
1251
1252         init_completion(&ldlm_pools_comp);
1253         cfs_waitq_init(&ldlm_pools_thread->t_ctl_waitq);
1254
1255         /* 
1256          * CLONE_VM and CLONE_FILES just avoid a needless copy, because we
1257          * just drop the VM and FILES in ptlrpc_daemonize() right away. 
1258          */
1259         rc = cfs_kernel_thread(ldlm_pools_thread_main, ldlm_pools_thread,
1260                                CLONE_VM | CLONE_FILES);
1261         if (rc < 0) {
1262                 CERROR("Can't start pool thread, error %d\n",
1263                        rc);
1264                 OBD_FREE(ldlm_pools_thread, sizeof(*ldlm_pools_thread));
1265                 ldlm_pools_thread = NULL;
1266                 RETURN(rc);
1267         }
1268         l_wait_event(ldlm_pools_thread->t_ctl_waitq,
1269                      (ldlm_pools_thread->t_flags & SVC_RUNNING), &lwi);
1270         RETURN(0);
1271 }
1272
1273 static void ldlm_pools_thread_stop(void)
1274 {
1275         ENTRY;
1276
1277         if (ldlm_pools_thread == NULL) {
1278                 EXIT;
1279                 return;
1280         }
1281
1282         ldlm_pools_thread->t_flags = SVC_STOPPING;
1283         cfs_waitq_signal(&ldlm_pools_thread->t_ctl_waitq);
1284
1285         /* 
1286          * Make sure that pools thread is finished before freeing @thread.
1287          * This fixes possible race and oops due to accessing freed memory
1288          * in pools thread. 
1289          */
1290         wait_for_completion(&ldlm_pools_comp);
1291         OBD_FREE_PTR(ldlm_pools_thread);
1292         ldlm_pools_thread = NULL;
1293         EXIT;
1294 }
1295
1296 int ldlm_pools_init(void)
1297 {
1298         int rc;
1299         ENTRY;
1300
1301         rc = ldlm_pools_thread_start();
1302         if (rc == 0) {
1303                 ldlm_pools_srv_shrinker = set_shrinker(DEFAULT_SEEKS,
1304                                                        ldlm_pools_srv_shrink);
1305                 ldlm_pools_cli_shrinker = set_shrinker(DEFAULT_SEEKS,
1306                                                        ldlm_pools_cli_shrink);
1307         }
1308         RETURN(rc);
1309 }
1310 EXPORT_SYMBOL(ldlm_pools_init);
1311
1312 void ldlm_pools_fini(void)
1313 {
1314         if (ldlm_pools_srv_shrinker != NULL) {
1315                 remove_shrinker(ldlm_pools_srv_shrinker);
1316                 ldlm_pools_srv_shrinker = NULL;
1317         }
1318         if (ldlm_pools_cli_shrinker != NULL) {
1319                 remove_shrinker(ldlm_pools_cli_shrinker);
1320                 ldlm_pools_cli_shrinker = NULL;
1321         }
1322         ldlm_pools_thread_stop();
1323 }
1324 EXPORT_SYMBOL(ldlm_pools_fini);
1325 #endif /* __KERNEL__ */
1326
1327 #else /* !HAVE_LRU_RESIZE_SUPPORT */
1328 int ldlm_pool_setup(struct ldlm_pool *pl, int limit)
1329 {
1330         return 0;
1331 }
1332 EXPORT_SYMBOL(ldlm_pool_setup);
1333
1334 int ldlm_pool_recalc(struct ldlm_pool *pl)
1335 {
1336         return 0;
1337 }
1338 EXPORT_SYMBOL(ldlm_pool_recalc);
1339
1340 int ldlm_pool_shrink(struct ldlm_pool *pl,
1341                      int nr, unsigned int gfp_mask)
1342 {
1343         return 0;
1344 }
1345 EXPORT_SYMBOL(ldlm_pool_shrink);
1346
1347 int ldlm_pool_init(struct ldlm_pool *pl, struct ldlm_namespace *ns,
1348                    int idx, ldlm_side_t client)
1349 {
1350         return 0;
1351 }
1352 EXPORT_SYMBOL(ldlm_pool_init);
1353
1354 void ldlm_pool_fini(struct ldlm_pool *pl)
1355 {
1356         return;
1357 }
1358 EXPORT_SYMBOL(ldlm_pool_fini);
1359
1360 void ldlm_pool_add(struct ldlm_pool *pl, struct ldlm_lock *lock)
1361 {
1362         return;
1363 }
1364 EXPORT_SYMBOL(ldlm_pool_add);
1365
1366 void ldlm_pool_del(struct ldlm_pool *pl, struct ldlm_lock *lock)
1367 {
1368         return;
1369 }
1370 EXPORT_SYMBOL(ldlm_pool_del);
1371
1372 __u64 ldlm_pool_get_slv(struct ldlm_pool *pl)
1373 {
1374         return 1;
1375 }
1376 EXPORT_SYMBOL(ldlm_pool_get_slv);
1377
1378 void ldlm_pool_set_slv(struct ldlm_pool *pl, __u64 slv)
1379 {
1380         return;
1381 }
1382 EXPORT_SYMBOL(ldlm_pool_set_slv);
1383
1384 __u64 ldlm_pool_get_clv(struct ldlm_pool *pl)
1385 {
1386         return 1;
1387 }
1388 EXPORT_SYMBOL(ldlm_pool_get_clv);
1389
1390 void ldlm_pool_set_clv(struct ldlm_pool *pl, __u64 clv)
1391 {
1392         return;
1393 }
1394 EXPORT_SYMBOL(ldlm_pool_set_clv);
1395
1396 __u32 ldlm_pool_get_limit(struct ldlm_pool *pl)
1397 {
1398         return 0;
1399 }
1400 EXPORT_SYMBOL(ldlm_pool_get_limit);
1401
1402 void ldlm_pool_set_limit(struct ldlm_pool *pl, __u32 limit)
1403 {
1404         return;
1405 }
1406 EXPORT_SYMBOL(ldlm_pool_set_limit);
1407
1408 __u32 ldlm_pool_get_lvf(struct ldlm_pool *pl)
1409 {
1410         return 0;
1411 }
1412 EXPORT_SYMBOL(ldlm_pool_get_lvf);
1413
1414 int ldlm_pools_init(void)
1415 {
1416         return 0;
1417 }
1418 EXPORT_SYMBOL(ldlm_pools_init);
1419
1420 void ldlm_pools_fini(void)
1421 {
1422         return;
1423 }
1424 EXPORT_SYMBOL(ldlm_pools_fini);
1425
1426 void ldlm_pools_wakeup(void)
1427 {
1428         return;
1429 }
1430 EXPORT_SYMBOL(ldlm_pools_wakeup);
1431
1432 void ldlm_pools_recalc(ldlm_side_t client)
1433 {
1434         return;
1435 }
1436 EXPORT_SYMBOL(ldlm_pools_recalc);
1437 #endif /* HAVE_LRU_RESIZE_SUPPORT */