Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / ldlm / ldlm_pool.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2007 Cluster File Systems, Inc.
5  *   Author: Yury Umanets <umka@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  */
25
26 /* 
27  * Idea of this code is rather simple. Each second, for each server namespace
28  * we have SLV - server lock volume which is calculated on current number of
29  * granted locks, grant speed for past period, etc - that is, locking load.
30  * This SLV number may be thought as a flow definition for simplicity. It is
31  * sent to clients with each occasion to let them know what is current load
32  * situation on the server. By default, at the beginning, SLV on server is
33  * set max value which is calculated as the following: allow to one client
34  * have all locks of limit ->pl_limit for 10h.
35  *
36  * Next, on clients, number of cached locks is not limited artificially in any
37  * way as it was before. Instead, client calculates CLV, that is, client lock
38  * volume for each lock and compares it with last SLV from the server. CLV is
39  * calculated as the number of locks in LRU * lock live time in seconds. If
40  * CLV > SLV - lock is canceled.
41  *
42  * Client has LVF, that is, lock volume factor which regulates how much sensitive
43  * client should be about last SLV from server. The higher LVF is the more locks
44  * will be canceled on client. Default value for it is 1. Setting LVF to 2 means
45  * that client will cancel locks 2 times faster.
46  *
47  * Locks on a client will be canceled more intensively in these cases:
48  * (1) if SLV is smaller, that is, load is higher on the server;
49  * (2) client has a lot of locks (the more locks are held by client, the bigger
50  *     chances that some of them should be canceled);
51  * (3) client has old locks (taken some time ago);
52  *
53  * Thus, according to flow paradigm that we use for better understanding SLV,
54  * CLV is the volume of particle in flow described by SLV. According to this,
55  * if flow is getting thinner, more and more particles become outside of it and
56  * as particles are locks, they should be canceled.
57  *
58  * General idea of this belongs to Vitaly Fertman (vitaly@clusterfs.com). Andreas
59  * Dilger (adilger@clusterfs.com) proposed few nice ideas like using LVF and many
60  * cleanups. Flow definition to allow more easy understanding of the logic belongs
61  * to Nikita Danilov (nikita@clusterfs.com) as well as many cleanups and fixes.
62  * And design and implementation are done by Yury Umanets (umka@clusterfs.com).
63  *
64  * Glossary for terms used:
65  *
66  * pl_limit - Number of allowed locks in pool. Applies to server and client
67  * side (tunable);
68  *
69  * pl_granted - Number of granted locks (calculated);
70  * pl_grant_rate - Number of granted locks for last T (calculated);
71  * pl_cancel_rate - Number of canceled locks for last T (calculated);
72  * pl_grant_speed - Grant speed (GR - CR) for last T (calculated);
73  * pl_grant_plan - Planned number of granted locks for next T (calculated);
74  *
75  * pl_grant_step - Grant plan step, that is how ->pl_grant_plan
76  * will change in next T (tunable);
77  *
78  * pl_server_lock_volume - Current server lock volume (calculated);
79  *
80  * As it may be seen from list above, we have few possible tunables which may
81  * affect behavior much. They all may be modified via proc. However, they also
82  * give a possibility for constructing few pre-defined behavior policies. If
83  * none of predefines is suitable for a working pattern being used, new one may
84  * be "constructed" via proc tunables.
85  */
86
87 #define DEBUG_SUBSYSTEM S_LDLM
88
89 #ifdef __KERNEL__
90 # include <lustre_dlm.h>
91 #else
92 # include <liblustre.h>
93 #endif
94
95 #include <obd_class.h>
96 #include <obd_support.h>
97 #include "ldlm_internal.h"
98
99 #ifdef HAVE_LRU_RESIZE_SUPPORT
100
101 /*
102  * 50 ldlm locks for 1MB of RAM. 
103  */
104 #define LDLM_POOL_HOST_L ((num_physpages >> (20 - CFS_PAGE_SHIFT)) * 50)
105
106 /*
107  * Default step in % for grant plan. 
108  */
109 #define LDLM_POOL_GSP (10)
110
111 /* 
112  * LDLM_POOL_GSP% of all locks is default GP. 
113  */
114 #define LDLM_POOL_GP(L)   (((L) * LDLM_POOL_GSP) / 100)
115
116 /* 
117  * Max age for locks on clients. 
118  */
119 #define LDLM_POOL_MAX_AGE (36000)
120
121 #ifdef __KERNEL__
122 extern cfs_proc_dir_entry_t *ldlm_ns_proc_dir;
123 #endif
124
125 #define avg(src, add) \
126         ((src) = ((src) + (add)) / 2)
127
128 static inline __u64 dru(__u64 val, __u32 div)
129 {
130         __u64 ret = val + (div - 1);
131         do_div(ret, div);
132         return ret;
133 }
134
135 static inline __u64 ldlm_pool_slv_max(__u32 L)
136 {
137         /*
138          * Allow to have all locks for 1 client for 10 hrs.
139          * Formula is the following: limit * 10h / 1 client. 
140          */
141         __u64 lim = L *  LDLM_POOL_MAX_AGE / 1;
142         return lim;
143 }
144
145 static inline __u64 ldlm_pool_slv_min(__u32 L)
146 {
147         return 1;
148 }
149
150 enum {
151         LDLM_POOL_FIRST_STAT = 0,
152         LDLM_POOL_GRANTED_STAT = LDLM_POOL_FIRST_STAT,
153         LDLM_POOL_GRANT_STAT,
154         LDLM_POOL_CANCEL_STAT,
155         LDLM_POOL_GRANT_RATE_STAT,
156         LDLM_POOL_CANCEL_RATE_STAT,
157         LDLM_POOL_GRANT_PLAN_STAT,
158         LDLM_POOL_SLV_STAT,
159         LDLM_POOL_SHRINK_REQTD_STAT,
160         LDLM_POOL_SHRINK_FREED_STAT,
161         LDLM_POOL_RECALC_STAT,
162         LDLM_POOL_TIMING_STAT,
163         LDLM_POOL_LAST_STAT
164 };
165
166 static inline struct ldlm_namespace *ldlm_pl2ns(struct ldlm_pool *pl)
167 {
168         return container_of(pl, struct ldlm_namespace, ns_pool);
169 }
170
171 /**
172  * Recalculates next grant limit on passed \a pl.
173  *
174  * \pre ->pl_lock is locked. 
175  */
176 static inline void ldlm_pool_recalc_grant_plan(struct ldlm_pool *pl)
177 {
178         int granted, grant_step, limit;
179         
180         limit = ldlm_pool_get_limit(pl);
181         granted = atomic_read(&pl->pl_granted);
182
183         grant_step = ((limit - granted) * pl->pl_grant_step) / 100;
184         pl->pl_grant_plan = granted + grant_step;
185 }
186
187 /**
188  * Recalculates next SLV on passed \a pl.
189  *
190  * \pre ->pl_lock is locked. 
191  */
192 static inline void ldlm_pool_recalc_slv(struct ldlm_pool *pl)
193 {
194         int grant_usage, granted, grant_plan;
195         __u64 slv, slv_factor;
196         __u32 limit;
197
198         slv = pl->pl_server_lock_volume;
199         grant_plan = pl->pl_grant_plan;
200         limit = ldlm_pool_get_limit(pl);
201         granted = atomic_read(&pl->pl_granted);
202
203         grant_usage = limit - (granted - grant_plan);
204         if (grant_usage <= 0)
205                 grant_usage = 1;
206
207         /* 
208          * Find out SLV change factor which is the ratio of grant usage 
209          * from limit. SLV changes as fast as the ratio of grant plan 
210          * consumtion. The more locks from grant plan are not consumed 
211          * by clients in last interval (idle time), the faster grows 
212          * SLV. And the opposite, the more grant plan is over-consumed
213          * (load time) the faster drops SLV. 
214          */
215         slv_factor = (grant_usage * 100) / limit;
216         if (2 * abs(granted - limit) > limit) {
217                 slv_factor *= slv_factor;
218                 slv_factor = dru(slv_factor, 100);
219         }
220         slv = slv * slv_factor;
221         slv = dru(slv, 100);
222
223         if (slv > ldlm_pool_slv_max(limit)) {
224                 slv = ldlm_pool_slv_max(limit);
225         } else if (slv < ldlm_pool_slv_min(limit)) {
226                 slv = ldlm_pool_slv_min(limit);
227         }
228
229         pl->pl_server_lock_volume = slv;
230 }
231
232 /**
233  * Recalculates next stats on passed \a pl.
234  *
235  * \pre ->pl_lock is locked. 
236  */
237 static inline void ldlm_pool_recalc_stats(struct ldlm_pool *pl)
238 {
239         int grant_plan = pl->pl_grant_plan;
240         __u64 slv = pl->pl_server_lock_volume;
241         int granted = atomic_read(&pl->pl_granted);
242         int grant_rate = atomic_read(&pl->pl_grant_rate);
243         int cancel_rate = atomic_read(&pl->pl_cancel_rate);
244
245         lprocfs_counter_add(pl->pl_stats, LDLM_POOL_SLV_STAT, 
246                             slv);
247         lprocfs_counter_add(pl->pl_stats, LDLM_POOL_GRANTED_STAT,
248                             granted);
249         lprocfs_counter_add(pl->pl_stats, LDLM_POOL_GRANT_RATE_STAT,
250                             grant_rate);
251         lprocfs_counter_add(pl->pl_stats, LDLM_POOL_GRANT_PLAN_STAT,
252                             grant_plan);
253         lprocfs_counter_add(pl->pl_stats, LDLM_POOL_CANCEL_RATE_STAT,
254                             cancel_rate);
255 }
256
257 /**
258  * Sets current SLV into obd accessible via ldlm_pl2ns(pl)->ns_obd.
259  */
260 static void ldlm_srv_pool_push_slv(struct ldlm_pool *pl)
261 {
262         struct obd_device *obd;
263
264         /* 
265          * Set new SLV in obd field for using it later without accessing the
266          * pool. This is required to avoid race between sending reply to client
267          * with new SLV and cleanup server stack in which we can't guarantee
268          * that namespace is still alive. We know only that obd is alive as
269          * long as valid export is alive. 
270          */
271         obd = ldlm_pl2ns(pl)->ns_obd;
272         LASSERT(obd != NULL);
273         write_lock(&obd->obd_pool_lock);
274         obd->obd_pool_slv = pl->pl_server_lock_volume;
275         write_unlock(&obd->obd_pool_lock);
276 }
277
278 /**
279  * Recalculates all pool fields on passed \a pl.
280  *
281  * \pre ->pl_lock is not locked. 
282  */
283 static int ldlm_srv_pool_recalc(struct ldlm_pool *pl)
284 {
285         time_t recalc_interval_sec;
286         ENTRY;
287
288         spin_lock(&pl->pl_lock);
289         recalc_interval_sec = cfs_time_current_sec() - pl->pl_recalc_time;
290         if (recalc_interval_sec > 0) {
291                 /* 
292                  * Update statistics.
293                  */
294                 ldlm_pool_recalc_stats(pl);
295
296                 /* 
297                  * Recalc SLV after last period. This should be done
298                  * _before_ recalculating new grant plan. 
299                  */
300                 ldlm_pool_recalc_slv(pl);
301                 
302                 /* 
303                  * Make sure that pool informed obd of last SLV changes. 
304                  */
305                 ldlm_srv_pool_push_slv(pl);
306
307                 /* 
308                  * Update grant_plan for new period. 
309                  */
310                 ldlm_pool_recalc_grant_plan(pl);
311
312                 /* 
313                  * Zero out all rates and speed for the last period. 
314                  */
315                 atomic_set(&pl->pl_grant_rate, 0);
316                 atomic_set(&pl->pl_cancel_rate, 0);
317                 atomic_set(&pl->pl_grant_speed, 0);
318                 pl->pl_recalc_time = cfs_time_current_sec();
319                 lprocfs_counter_add(pl->pl_stats, LDLM_POOL_TIMING_STAT, 
320                                     recalc_interval_sec);
321         }
322         spin_unlock(&pl->pl_lock);
323         RETURN(0);
324 }
325
326 /**
327  * This function is used on server side as main entry point for memory
328  * preasure handling. It decreases SLV on \a pl according to passed
329  * \a nr and \a gfp_mask.
330  * 
331  * Our goal here is to decrease SLV such a way that clients hold \a nr
332  * locks smaller in next 10h. 
333  */
334 static int ldlm_srv_pool_shrink(struct ldlm_pool *pl,
335                                 int nr, unsigned int gfp_mask)
336 {
337         __u32 limit;
338         ENTRY;
339
340         /* 
341          * VM is asking how many entries may be potentially freed. 
342          */
343         if (nr == 0)
344                 RETURN(atomic_read(&pl->pl_granted));
345
346         /* 
347          * Client already canceled locks but server is already in shrinker
348          * and can't cancel anything. Let's catch this race. 
349          */
350         if (atomic_read(&pl->pl_granted) == 0)
351                 RETURN(0);
352
353         spin_lock(&pl->pl_lock);
354
355         /* 
356          * We want shrinker to possibly cause cancelation of @nr locks from
357          * clients or grant approximately @nr locks smaller next intervals.
358          *
359          * This is why we decresed SLV by @nr. This effect will only be as
360          * long as one re-calc interval (1s these days) and this should be
361          * enough to pass this decreased SLV to all clients. On next recalc
362          * interval pool will either increase SLV if locks load is not high
363          * or will keep on same level or even decrease again, thus, shrinker
364          * decreased SLV will affect next recalc intervals and this way will
365          * make locking load lower. 
366          */
367         if (nr < pl->pl_server_lock_volume) {
368                 pl->pl_server_lock_volume = pl->pl_server_lock_volume - nr;
369         } else {
370                 limit = ldlm_pool_get_limit(pl);
371                 pl->pl_server_lock_volume = ldlm_pool_slv_min(limit);
372         }
373
374         /* 
375          * Make sure that pool informed obd of last SLV changes. 
376          */
377         ldlm_srv_pool_push_slv(pl);
378         spin_unlock(&pl->pl_lock);
379
380         /* 
381          * We did not really free any memory here so far, it only will be
382          * freed later may be, so that we return 0 to not confuse VM. 
383          */
384         RETURN(0);
385 }
386
387 /**
388  * Setup server side pool \a pl with passed \a limit.
389  */
390 static int ldlm_srv_pool_setup(struct ldlm_pool *pl, int limit)
391 {
392         struct obd_device *obd;
393         ENTRY;
394         
395         obd = ldlm_pl2ns(pl)->ns_obd;
396         LASSERT(obd != NULL && obd != LP_POISON);
397         LASSERT(obd->obd_type != LP_POISON);
398         write_lock(&obd->obd_pool_lock);
399         obd->obd_pool_limit = limit;
400         write_unlock(&obd->obd_pool_lock);
401
402         ldlm_pool_set_limit(pl, limit);
403         RETURN(0);
404 }
405
406 /**
407  * Sets SLV and Limit from ldlm_pl2ns(pl)->ns_obd tp passed \a pl.
408  */
409 static void ldlm_cli_pool_pop_slv(struct ldlm_pool *pl)
410 {
411         struct obd_device *obd;
412
413         /* 
414          * Get new SLV and Limit from obd which is updated with comming 
415          * RPCs. 
416          */
417         obd = ldlm_pl2ns(pl)->ns_obd;
418         LASSERT(obd != NULL);
419         read_lock(&obd->obd_pool_lock);
420         pl->pl_server_lock_volume = obd->obd_pool_slv;
421         ldlm_pool_set_limit(pl, obd->obd_pool_limit);
422         read_unlock(&obd->obd_pool_lock);
423 }
424
425 /**
426  * Recalculates client sise pool \a pl according to current SLV and Limit.
427  */
428 static int ldlm_cli_pool_recalc(struct ldlm_pool *pl)
429 {
430         time_t recalc_interval_sec;
431         ENTRY;
432
433         spin_lock(&pl->pl_lock);
434
435         /* 
436          * Make sure that pool knows last SLV and Limit from obd. 
437          */
438         ldlm_cli_pool_pop_slv(pl);
439
440         recalc_interval_sec = cfs_time_current_sec() - pl->pl_recalc_time;
441         if (recalc_interval_sec > 0) {
442                 /* 
443                  * Update statistics only every T. 
444                  */
445                 ldlm_pool_recalc_stats(pl);
446
447                 /* 
448                  * Zero out grant/cancel rates and speed for last period. 
449                  */
450                 atomic_set(&pl->pl_grant_rate, 0);
451                 atomic_set(&pl->pl_cancel_rate, 0);
452                 atomic_set(&pl->pl_grant_speed, 0);
453                 pl->pl_recalc_time = cfs_time_current_sec();
454                 lprocfs_counter_add(pl->pl_stats, LDLM_POOL_TIMING_STAT, 
455                                     recalc_interval_sec);
456         }
457         spin_unlock(&pl->pl_lock);
458
459         /* 
460          * Do not cancel locks in case lru resize is disabled for this ns. 
461          */
462         if (!ns_connect_lru_resize(ldlm_pl2ns(pl)))
463                 RETURN(0);
464
465         /* 
466          * In the time of canceling locks on client we do not need to maintain
467          * sharp timing, we only want to cancel locks asap according to new SLV.
468          * It may be called when SLV has changed much, this is why we do not
469          * take into account pl->pl_recalc_time here. 
470          */
471         RETURN(ldlm_cancel_lru(ldlm_pl2ns(pl), 0, LDLM_ASYNC, 
472                                LDLM_CANCEL_LRUR));
473 }
474
475 /**
476  * This function is main entry point for memory preasure handling on client side.
477  * Main goal of this function is to cancel some number of locks on passed \a pl
478  * according to \a nr and \a gfp_mask.
479  */
480 static int ldlm_cli_pool_shrink(struct ldlm_pool *pl,
481                                 int nr, unsigned int gfp_mask)
482 {
483         ENTRY;
484         
485         /* 
486          * Do not cancel locks in case lru resize is disabled for this ns. 
487          */
488         if (!ns_connect_lru_resize(ldlm_pl2ns(pl)))
489                 RETURN(0);
490
491         /* 
492          * Make sure that pool knows last SLV and Limit from obd. 
493          */
494         ldlm_cli_pool_pop_slv(pl);
495
496         /* 
497          * Find out how many locks may be released according to shrink 
498          * policy. 
499          */
500         if (nr == 0)
501                 RETURN(ldlm_cancel_lru_estimate(ldlm_pl2ns(pl), 0, 0, 
502                                                 LDLM_CANCEL_SHRINK));
503
504         /* 
505          * Cancel @nr locks accoding to shrink policy. 
506          */
507         RETURN(ldlm_cancel_lru(ldlm_pl2ns(pl), nr, LDLM_SYNC, 
508                                LDLM_CANCEL_SHRINK));
509 }
510
511 struct ldlm_pool_ops ldlm_srv_pool_ops = {
512         .po_recalc = ldlm_srv_pool_recalc,
513         .po_shrink = ldlm_srv_pool_shrink,
514         .po_setup  = ldlm_srv_pool_setup
515 };
516
517 struct ldlm_pool_ops ldlm_cli_pool_ops = {
518         .po_recalc = ldlm_cli_pool_recalc,
519         .po_shrink = ldlm_cli_pool_shrink
520 };
521
522 /**
523  * Pool recalc wrapper. Will call either client or server pool recalc callback
524  * depending what pool \a pl is used.
525  */
526 int ldlm_pool_recalc(struct ldlm_pool *pl)
527 {
528         int count;
529
530         if (pl->pl_ops->po_recalc != NULL) {
531                 count = pl->pl_ops->po_recalc(pl);
532                 lprocfs_counter_add(pl->pl_stats, LDLM_POOL_RECALC_STAT, 
533                                     count);
534                 return count;
535         }
536         return 0;
537 }
538 EXPORT_SYMBOL(ldlm_pool_recalc);
539
540 /**
541  * Pool shrink wrapper. Will call either client or server pool recalc callback
542  * depending what pool \a pl is used.
543  */
544 int ldlm_pool_shrink(struct ldlm_pool *pl, int nr,
545                      unsigned int gfp_mask)
546 {
547         int cancel = 0;
548         
549         if (pl->pl_ops->po_shrink != NULL) {
550                 cancel = pl->pl_ops->po_shrink(pl, nr, gfp_mask);
551                 if (nr > 0) {
552                         lprocfs_counter_add(pl->pl_stats, 
553                                             LDLM_POOL_SHRINK_REQTD_STAT,
554                                             nr);
555                         lprocfs_counter_add(pl->pl_stats, 
556                                             LDLM_POOL_SHRINK_FREED_STAT,
557                                             cancel);
558                         CDEBUG(D_DLMTRACE, "%s: request to shrink %d locks, "
559                                "shrunk %d\n", pl->pl_name, nr, cancel);
560                 }
561         }
562         return cancel;
563 }
564 EXPORT_SYMBOL(ldlm_pool_shrink);
565
566 /**
567  * Pool setup wrapper. Will call either client or server pool recalc callback
568  * depending what pool \a pl is used.
569  *
570  * Sets passed \a limit into pool \a pl.
571  */
572 int ldlm_pool_setup(struct ldlm_pool *pl, int limit)
573 {
574         ENTRY;
575         if (pl->pl_ops->po_setup != NULL)
576                 RETURN(pl->pl_ops->po_setup(pl, limit));
577         RETURN(0);
578 }
579 EXPORT_SYMBOL(ldlm_pool_setup);
580
581 #ifdef __KERNEL__
582 static int lprocfs_rd_pool_state(char *page, char **start, off_t off,
583                                  int count, int *eof, void *data)
584 {
585         int granted, grant_rate, cancel_rate, grant_step;
586         int nr = 0, grant_speed, grant_plan;
587         struct ldlm_pool *pl = data;
588         __u64 slv, clv;
589         __u32 limit;
590
591         spin_lock(&pl->pl_lock);
592         slv = pl->pl_server_lock_volume;
593         clv = pl->pl_client_lock_volume;
594         limit = ldlm_pool_get_limit(pl);
595         grant_plan = pl->pl_grant_plan;
596         grant_step = pl->pl_grant_step;
597         granted = atomic_read(&pl->pl_granted);
598         grant_rate = atomic_read(&pl->pl_grant_rate);
599         grant_speed = atomic_read(&pl->pl_grant_speed);
600         cancel_rate = atomic_read(&pl->pl_cancel_rate);
601         spin_unlock(&pl->pl_lock);
602
603         nr += snprintf(page + nr, count - nr, "LDLM pool state (%s):\n",
604                        pl->pl_name);
605         nr += snprintf(page + nr, count - nr, "  SLV: "LPU64"\n", slv);
606         nr += snprintf(page + nr, count - nr, "  CLV: "LPU64"\n", clv);
607
608         nr += snprintf(page + nr, count - nr, "  LVF: %d\n",
609                        atomic_read(&pl->pl_lock_volume_factor));
610
611         nr += snprintf(page + nr, count - nr, "  GSP: %d%%\n",
612                        grant_step);
613         nr += snprintf(page + nr, count - nr, "  GP:  %d\n",
614                        grant_plan);
615         nr += snprintf(page + nr, count - nr, "  GR:  %d\n",
616                        grant_rate);
617         nr += snprintf(page + nr, count - nr, "  CR:  %d\n",
618                        cancel_rate);
619         nr += snprintf(page + nr, count - nr, "  GS:  %d\n",
620                        grant_speed);
621         nr += snprintf(page + nr, count - nr, "  G:   %d\n",
622                        granted);
623         nr += snprintf(page + nr, count - nr, "  L:   %d\n",
624                        limit);
625         return nr;
626 }
627
628 LDLM_POOL_PROC_READER(grant_plan, int);
629 LDLM_POOL_PROC_READER(grant_step, int);
630 LDLM_POOL_PROC_WRITER(grant_step, int);
631
632 static int ldlm_pool_proc_init(struct ldlm_pool *pl)
633 {
634         struct ldlm_namespace *ns = ldlm_pl2ns(pl);
635         struct proc_dir_entry *parent_ns_proc;
636         struct lprocfs_vars pool_vars[2];
637         char *var_name = NULL;
638         int rc = 0;
639         ENTRY;
640
641         OBD_ALLOC(var_name, MAX_STRING_SIZE + 1);
642         if (!var_name)
643                 RETURN(-ENOMEM);
644
645         parent_ns_proc = lprocfs_srch(ldlm_ns_proc_dir, ns->ns_name);
646         if (parent_ns_proc == NULL) {
647                 CERROR("%s: proc entry is not initialized\n",
648                        ns->ns_name);
649                 GOTO(out_free_name, rc = -EINVAL);
650         }
651         pl->pl_proc_dir = lprocfs_register("pool", parent_ns_proc,
652                                            NULL, NULL);
653         if (IS_ERR(pl->pl_proc_dir)) {
654                 CERROR("LProcFS failed in ldlm-pool-init\n");
655                 rc = PTR_ERR(pl->pl_proc_dir);
656                 GOTO(out_free_name, rc);
657         }
658
659         var_name[MAX_STRING_SIZE] = '\0';
660         memset(pool_vars, 0, sizeof(pool_vars));
661         pool_vars[0].name = var_name;
662
663         snprintf(var_name, MAX_STRING_SIZE, "server_lock_volume");
664         pool_vars[0].data = &pl->pl_server_lock_volume;
665         pool_vars[0].read_fptr = lprocfs_rd_u64;
666         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
667
668         snprintf(var_name, MAX_STRING_SIZE, "limit");
669         pool_vars[0].data = &pl->pl_limit;
670         pool_vars[0].read_fptr = lprocfs_rd_atomic;
671         pool_vars[0].write_fptr = lprocfs_wr_atomic;
672         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
673
674         snprintf(var_name, MAX_STRING_SIZE, "granted");
675         pool_vars[0].data = &pl->pl_granted;
676         pool_vars[0].read_fptr = lprocfs_rd_atomic;
677         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
678
679         snprintf(var_name, MAX_STRING_SIZE, "grant_speed");
680         pool_vars[0].data = &pl->pl_grant_speed;
681         pool_vars[0].read_fptr = lprocfs_rd_atomic;
682         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
683
684         snprintf(var_name, MAX_STRING_SIZE, "cancel_rate");
685         pool_vars[0].data = &pl->pl_cancel_rate;
686         pool_vars[0].read_fptr = lprocfs_rd_atomic;
687         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
688
689         snprintf(var_name, MAX_STRING_SIZE, "grant_rate");
690         pool_vars[0].data = &pl->pl_grant_rate;
691         pool_vars[0].read_fptr = lprocfs_rd_atomic;
692         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
693
694         snprintf(var_name, MAX_STRING_SIZE, "grant_plan");
695         pool_vars[0].data = pl;
696         pool_vars[0].read_fptr = lprocfs_rd_grant_plan;
697         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
698
699         snprintf(var_name, MAX_STRING_SIZE, "grant_step");
700         pool_vars[0].data = pl;
701         pool_vars[0].read_fptr = lprocfs_rd_grant_step;
702         if (ns_is_server(ns))
703                 pool_vars[0].write_fptr = lprocfs_wr_grant_step;
704         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
705
706         snprintf(var_name, MAX_STRING_SIZE, "lock_volume_factor");
707         pool_vars[0].data = &pl->pl_lock_volume_factor;
708         pool_vars[0].read_fptr = lprocfs_rd_atomic;
709         pool_vars[0].write_fptr = lprocfs_wr_atomic;
710         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
711
712         snprintf(var_name, MAX_STRING_SIZE, "state");
713         pool_vars[0].data = pl;
714         pool_vars[0].read_fptr = lprocfs_rd_pool_state;
715         lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
716
717         pl->pl_stats = lprocfs_alloc_stats(LDLM_POOL_LAST_STAT -
718                                            LDLM_POOL_FIRST_STAT, 0);
719         if (!pl->pl_stats)
720                 GOTO(out_free_name, rc = -ENOMEM);
721
722         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_GRANTED_STAT,
723                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
724                              "granted", "locks");
725         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_GRANT_STAT, 
726                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
727                              "grant", "locks");
728         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_CANCEL_STAT, 
729                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
730                              "cancel", "locks");
731         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_GRANT_RATE_STAT,
732                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
733                              "grant_rate", "locks/s");
734         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_CANCEL_RATE_STAT,
735                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
736                              "cancel_rate", "locks/s");
737         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_GRANT_PLAN_STAT,
738                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
739                              "grant_plan", "locks/s");
740         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_SLV_STAT,
741                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
742                              "slv", "slv");
743         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_SHRINK_REQTD_STAT,
744                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
745                              "shrink_request", "locks");
746         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_SHRINK_FREED_STAT,
747                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
748                              "shrink_freed", "locks");
749         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_RECALC_STAT,
750                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
751                              "recalc_freed", "locks");
752         lprocfs_counter_init(pl->pl_stats, LDLM_POOL_TIMING_STAT,
753                              LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
754                              "recalc_timing", "sec");
755         lprocfs_register_stats(pl->pl_proc_dir, "stats", pl->pl_stats);
756
757         EXIT;
758 out_free_name:
759         OBD_FREE(var_name, MAX_STRING_SIZE + 1);
760         return rc;
761 }
762
763 static void ldlm_pool_proc_fini(struct ldlm_pool *pl)
764 {
765         if (pl->pl_stats != NULL) {
766                 lprocfs_free_stats(&pl->pl_stats);
767                 pl->pl_stats = NULL;
768         }
769         if (pl->pl_proc_dir != NULL) {
770                 lprocfs_remove(&pl->pl_proc_dir);
771                 pl->pl_proc_dir = NULL;
772         }
773 }
774 #else /* !__KERNEL__*/
775 #define ldlm_pool_proc_init(pl) (0)
776 #define ldlm_pool_proc_fini(pl) while (0) {}
777 #endif
778
779 int ldlm_pool_init(struct ldlm_pool *pl, struct ldlm_namespace *ns,
780                    int idx, ldlm_side_t client)
781 {
782         int rc;
783         ENTRY;
784
785         spin_lock_init(&pl->pl_lock);
786         atomic_set(&pl->pl_granted, 0);
787         pl->pl_recalc_time = cfs_time_current_sec();
788         atomic_set(&pl->pl_lock_volume_factor, 1);
789
790         atomic_set(&pl->pl_grant_rate, 0);
791         atomic_set(&pl->pl_cancel_rate, 0);
792         atomic_set(&pl->pl_grant_speed, 0);
793         pl->pl_grant_step = LDLM_POOL_GSP;
794         pl->pl_grant_plan = LDLM_POOL_GP(LDLM_POOL_HOST_L);
795
796         snprintf(pl->pl_name, sizeof(pl->pl_name), "ldlm-pool-%s-%d",
797                  ns->ns_name, idx);
798
799         if (client == LDLM_NAMESPACE_SERVER) {
800                 pl->pl_ops = &ldlm_srv_pool_ops;
801                 ldlm_pool_set_limit(pl, LDLM_POOL_HOST_L);
802                 pl->pl_server_lock_volume = ldlm_pool_slv_max(LDLM_POOL_HOST_L);
803         } else {
804                 pl->pl_server_lock_volume = 1;
805                 ldlm_pool_set_limit(pl, 1);
806                 pl->pl_ops = &ldlm_cli_pool_ops;
807         }
808         pl->pl_client_lock_volume = 0;
809         rc = ldlm_pool_proc_init(pl);
810         if (rc)
811                 RETURN(rc);
812
813         CDEBUG(D_DLMTRACE, "Lock pool %s is initialized\n", pl->pl_name);
814
815         RETURN(rc);
816 }
817 EXPORT_SYMBOL(ldlm_pool_init);
818
819 void ldlm_pool_fini(struct ldlm_pool *pl)
820 {
821         ENTRY;
822         ldlm_pool_proc_fini(pl);
823         
824         /* 
825          * Pool should not be used after this point. We can't free it here as
826          * it lives in struct ldlm_namespace, but still interested in catching
827          * any abnormal using cases.
828          */
829         POISON(pl, 0x5a, sizeof(*pl));
830         EXIT;
831 }
832 EXPORT_SYMBOL(ldlm_pool_fini);
833
834 /**
835  * Add new taken ldlm lock \a lock into pool \a pl accounting.
836  */
837 void ldlm_pool_add(struct ldlm_pool *pl, struct ldlm_lock *lock)
838 {
839         /* 
840          * FLOCK locks are special in a sense that they are almost never
841          * cancelled, instead special kind of lock is used to drop them.
842          * also there is no LRU for flock locks, so no point in tracking
843          * them anyway. 
844          */
845         if (lock->l_resource->lr_type == LDLM_FLOCK)
846                 return;
847
848         ENTRY;
849                 
850         atomic_inc(&pl->pl_granted);
851         atomic_inc(&pl->pl_grant_rate);
852         atomic_inc(&pl->pl_grant_speed);
853
854         lprocfs_counter_incr(pl->pl_stats, LDLM_POOL_GRANT_STAT);
855  
856         /* 
857          * Do not do pool recalc for client side as all locks which
858          * potentially may be canceled has already been packed into 
859          * enqueue/cancel rpc. Also we do not want to run out of stack
860          * with too long call paths. 
861          */
862         if (ns_is_server(ldlm_pl2ns(pl)))
863                 ldlm_pool_recalc(pl);
864         EXIT;
865 }
866 EXPORT_SYMBOL(ldlm_pool_add);
867
868 /**
869  * Remove ldlm lock \a lock from pool \a pl accounting.
870  */
871 void ldlm_pool_del(struct ldlm_pool *pl, struct ldlm_lock *lock)
872 {
873         /*
874          * Filter out FLOCK locks. Read above comment in ldlm_pool_add().
875          */
876         if (lock->l_resource->lr_type == LDLM_FLOCK)
877                 return;
878         ENTRY;
879         LASSERT(atomic_read(&pl->pl_granted) > 0);
880         atomic_dec(&pl->pl_granted);
881         atomic_inc(&pl->pl_cancel_rate);
882         atomic_dec(&pl->pl_grant_speed);
883         
884         lprocfs_counter_incr(pl->pl_stats, LDLM_POOL_CANCEL_STAT);
885
886         if (ns_is_server(ldlm_pl2ns(pl)))
887                 ldlm_pool_recalc(pl);
888         EXIT;
889 }
890 EXPORT_SYMBOL(ldlm_pool_del);
891
892 /**
893  * Returns current \a pl SLV.
894  *
895  * \pre ->pl_lock is not locked. 
896  */
897 __u64 ldlm_pool_get_slv(struct ldlm_pool *pl)
898 {
899         __u64 slv;
900         spin_lock(&pl->pl_lock);
901         slv = pl->pl_server_lock_volume;
902         spin_unlock(&pl->pl_lock);
903         return slv;
904 }
905 EXPORT_SYMBOL(ldlm_pool_get_slv);
906
907 /**
908  * Sets passed \a slv to \a pl.
909  *
910  * \pre ->pl_lock is not locked. 
911  */
912 void ldlm_pool_set_slv(struct ldlm_pool *pl, __u64 slv)
913 {
914         spin_lock(&pl->pl_lock);
915         pl->pl_server_lock_volume = slv;
916         spin_unlock(&pl->pl_lock);
917 }
918 EXPORT_SYMBOL(ldlm_pool_set_slv);
919
920 /**
921  * Returns current \a pl CLV.
922  *
923  * \pre ->pl_lock is not locked. 
924  */
925 __u64 ldlm_pool_get_clv(struct ldlm_pool *pl)
926 {
927         __u64 slv;
928         spin_lock(&pl->pl_lock);
929         slv = pl->pl_client_lock_volume;
930         spin_unlock(&pl->pl_lock);
931         return slv;
932 }
933 EXPORT_SYMBOL(ldlm_pool_get_clv);
934
935 /**
936  * Sets passed \a clv to \a pl.
937  *
938  * \pre ->pl_lock is not locked. 
939  */
940 void ldlm_pool_set_clv(struct ldlm_pool *pl, __u64 clv)
941 {
942         spin_lock(&pl->pl_lock);
943         pl->pl_client_lock_volume = clv;
944         spin_unlock(&pl->pl_lock);
945 }
946 EXPORT_SYMBOL(ldlm_pool_set_clv);
947
948 /**
949  * Returns current \a pl limit.
950  */
951 __u32 ldlm_pool_get_limit(struct ldlm_pool *pl)
952 {
953         return atomic_read(&pl->pl_limit);
954 }
955 EXPORT_SYMBOL(ldlm_pool_get_limit);
956
957 /**
958  * Sets passed \a limit to \a pl.
959  */
960 void ldlm_pool_set_limit(struct ldlm_pool *pl, __u32 limit)
961 {
962         atomic_set(&pl->pl_limit, limit);
963 }
964 EXPORT_SYMBOL(ldlm_pool_set_limit);
965
966 /**
967  * Returns current LVF from \a pl.
968  */
969 __u32 ldlm_pool_get_lvf(struct ldlm_pool *pl)
970 {
971         return atomic_read(&pl->pl_lock_volume_factor);
972 }
973 EXPORT_SYMBOL(ldlm_pool_get_lvf);
974
975 #ifdef __KERNEL__
976 static int ldlm_pool_granted(struct ldlm_pool *pl)
977 {
978         return atomic_read(&pl->pl_granted);
979 }
980
981 static struct ptlrpc_thread *ldlm_pools_thread;
982 static struct shrinker *ldlm_pools_srv_shrinker;
983 static struct shrinker *ldlm_pools_cli_shrinker;
984 static struct completion ldlm_pools_comp;
985
986 void ldlm_pools_wakeup(void)
987 {
988         ENTRY;
989         if (ldlm_pools_thread == NULL)
990                 return;
991         ldlm_pools_thread->t_flags |= SVC_EVENT;
992         cfs_waitq_signal(&ldlm_pools_thread->t_ctl_waitq);
993         EXIT;
994 }
995 EXPORT_SYMBOL(ldlm_pools_wakeup);
996
997 /* 
998  * Cancel \a nr locks from all namespaces (if possible). Returns number of
999  * cached locks after shrink is finished. All namespaces are asked to
1000  * cancel approximately equal amount of locks to keep balancing.
1001  */
1002 static int ldlm_pools_shrink(ldlm_side_t client, int nr, 
1003                              unsigned int gfp_mask)
1004 {
1005         int total = 0, cached = 0, nr_ns;
1006         struct ldlm_namespace *ns;
1007
1008         if (nr != 0 && !(gfp_mask & __GFP_FS))
1009                 return -1;
1010
1011         CDEBUG(D_DLMTRACE, "Request to shrink %d %s locks from all pools\n",
1012                nr, client == LDLM_NAMESPACE_CLIENT ? "client" : "server");
1013
1014         /* 
1015          * Find out how many resources we may release. 
1016          */
1017         for (nr_ns = atomic_read(ldlm_namespace_nr(client)); 
1018              nr_ns > 0; nr_ns--) 
1019         {
1020                 mutex_down(ldlm_namespace_lock(client));
1021                 if (list_empty(ldlm_namespace_list(client))) {
1022                         mutex_up(ldlm_namespace_lock(client));
1023                         return 0;
1024                 }
1025                 ns = ldlm_namespace_first_locked(client);
1026                 ldlm_namespace_get(ns);
1027                 ldlm_namespace_move_locked(ns, client);
1028                 mutex_up(ldlm_namespace_lock(client));
1029                 total += ldlm_pool_shrink(&ns->ns_pool, 0, gfp_mask);
1030                 ldlm_namespace_put(ns, 1);
1031         }
1032  
1033         if (nr == 0 || total == 0)
1034                 return total;
1035
1036         /* 
1037          * Shrink at least ldlm_namespace_nr(client) namespaces. 
1038          */
1039         for (nr_ns = atomic_read(ldlm_namespace_nr(client)); 
1040              nr_ns > 0; nr_ns--) 
1041         {
1042                 int cancel, nr_locks;
1043
1044                 /* 
1045                  * Do not call shrink under ldlm_namespace_lock(client) 
1046                  */
1047                 mutex_down(ldlm_namespace_lock(client));
1048                 if (list_empty(ldlm_namespace_list(client))) {
1049                         mutex_up(ldlm_namespace_lock(client));
1050                         /* 
1051                          * If list is empty, we can't return any @cached > 0,
1052                          * that probably would cause needless shrinker
1053                          * call. 
1054                          */
1055                         cached = 0;
1056                         break;
1057                 }
1058                 ns = ldlm_namespace_first_locked(client);
1059                 ldlm_namespace_get(ns);
1060                 ldlm_namespace_move_locked(ns, client);
1061                 mutex_up(ldlm_namespace_lock(client));
1062                 
1063                 nr_locks = ldlm_pool_granted(&ns->ns_pool);
1064                 cancel = 1 + nr_locks * nr / total;
1065                 ldlm_pool_shrink(&ns->ns_pool, cancel, gfp_mask);
1066                 cached += ldlm_pool_granted(&ns->ns_pool);
1067                 ldlm_namespace_put(ns, 1);
1068         }
1069         return cached;
1070 }
1071
1072 static int ldlm_pools_srv_shrink(int nr, unsigned int gfp_mask)
1073 {
1074         return ldlm_pools_shrink(LDLM_NAMESPACE_SERVER, nr, gfp_mask);
1075 }
1076
1077 static int ldlm_pools_cli_shrink(int nr, unsigned int gfp_mask)
1078 {
1079         return ldlm_pools_shrink(LDLM_NAMESPACE_CLIENT, nr, gfp_mask);
1080 }
1081
1082 void ldlm_pools_recalc(ldlm_side_t client)
1083 {
1084         __u32 nr_l = 0, nr_p = 0, l;
1085         struct ldlm_namespace *ns;
1086         int nr, equal = 0;
1087
1088         /* 
1089          * No need to setup pool limit for client pools.
1090          */
1091         if (client == LDLM_NAMESPACE_SERVER) {
1092                 /* 
1093                  * Check all modest namespaces first. 
1094                  */
1095                 mutex_down(ldlm_namespace_lock(client));
1096                 list_for_each_entry(ns, ldlm_namespace_list(client), 
1097                                     ns_list_chain) 
1098                 {
1099                         if (ns->ns_appetite != LDLM_NAMESPACE_MODEST)
1100                                 continue;
1101
1102                         l = ldlm_pool_granted(&ns->ns_pool);
1103                         if (l == 0)
1104                                 l = 1;
1105
1106                         /* 
1107                          * Set the modest pools limit equal to their avg granted
1108                          * locks + 5%. 
1109                          */
1110                         l += dru(l * LDLM_POOLS_MODEST_MARGIN, 100);
1111                         ldlm_pool_setup(&ns->ns_pool, l);
1112                         nr_l += l;
1113                         nr_p++;
1114                 }
1115
1116                 /* 
1117                  * Make sure that modest namespaces did not eat more that 2/3 
1118                  * of limit. 
1119                  */
1120                 if (nr_l >= 2 * (LDLM_POOL_HOST_L / 3)) {
1121                         CWARN("\"Modest\" pools eat out 2/3 of server locks "
1122                               "limit (%d of %lu). This means that you have too "
1123                               "many clients for this amount of server RAM. "
1124                               "Upgrade server!\n", nr_l, LDLM_POOL_HOST_L);
1125                         equal = 1;
1126                 }
1127
1128                 /* 
1129                  * The rest is given to greedy namespaces. 
1130                  */
1131                 list_for_each_entry(ns, ldlm_namespace_list(client), 
1132                                     ns_list_chain) 
1133                 {
1134                         if (!equal && ns->ns_appetite != LDLM_NAMESPACE_GREEDY)
1135                                 continue;
1136
1137                         if (equal) {
1138                                 /* 
1139                                  * In the case 2/3 locks are eaten out by
1140                                  * modest pools, we re-setup equal limit
1141                                  * for _all_ pools. 
1142                                  */
1143                                 l = LDLM_POOL_HOST_L /
1144                                         atomic_read(ldlm_namespace_nr(client));
1145                         } else {
1146                                 /* 
1147                                  * All the rest of greedy pools will have
1148                                  * all locks in equal parts.
1149                                  */
1150                                 l = (LDLM_POOL_HOST_L - nr_l) /
1151                                         (atomic_read(ldlm_namespace_nr(client)) -
1152                                          nr_p);
1153                         }
1154                         ldlm_pool_setup(&ns->ns_pool, l);
1155                 }
1156                 mutex_up(ldlm_namespace_lock(client));
1157         }
1158
1159         /* 
1160          * Recalc at least ldlm_namespace_nr(client) namespaces. 
1161          */
1162         for (nr = atomic_read(ldlm_namespace_nr(client)); nr > 0; nr--) {
1163                 /* 
1164                  * Lock the list, get first @ns in the list, getref, move it
1165                  * to the tail, unlock and call pool recalc. This way we avoid
1166                  * calling recalc under @ns lock what is really good as we get
1167                  * rid of potential deadlock on client nodes when canceling
1168                  * locks synchronously. 
1169                  */
1170                 mutex_down(ldlm_namespace_lock(client));
1171                 if (list_empty(ldlm_namespace_list(client))) {
1172                         mutex_up(ldlm_namespace_lock(client));
1173                         break;
1174                 }
1175                 ns = ldlm_namespace_first_locked(client);
1176                 ldlm_namespace_get(ns);
1177                 ldlm_namespace_move_locked(ns, client);
1178                 mutex_up(ldlm_namespace_lock(client));
1179
1180                 /* 
1181                  * After setup is done - recalc the pool. 
1182                  */
1183                 ldlm_pool_recalc(&ns->ns_pool);
1184                 ldlm_namespace_put(ns, 1);
1185         }
1186 }
1187 EXPORT_SYMBOL(ldlm_pools_recalc);
1188
1189 static int ldlm_pools_thread_main(void *arg)
1190 {
1191         struct ptlrpc_thread *thread = (struct ptlrpc_thread *)arg;
1192         char *t_name = "ldlm_poold";
1193         ENTRY;
1194
1195         cfs_daemonize(t_name);
1196         thread->t_flags = SVC_RUNNING;
1197         cfs_waitq_signal(&thread->t_ctl_waitq);
1198
1199         CDEBUG(D_DLMTRACE, "%s: pool thread starting, process %d\n",
1200                t_name, cfs_curproc_pid());
1201
1202         while (1) {
1203                 struct l_wait_info lwi;
1204
1205                 /*
1206                  * Recal all pools on this tick. 
1207                  */
1208                 ldlm_pools_recalc(LDLM_NAMESPACE_SERVER);
1209                 ldlm_pools_recalc(LDLM_NAMESPACE_CLIENT);
1210                 
1211                 /*
1212                  * Wait until the next check time, or until we're
1213                  * stopped. 
1214                  */
1215                 lwi = LWI_TIMEOUT(cfs_time_seconds(LDLM_POOLS_THREAD_PERIOD),
1216                                   NULL, NULL);
1217                 l_wait_event(thread->t_ctl_waitq, (thread->t_flags &
1218                                                    (SVC_STOPPING|SVC_EVENT)),
1219                              &lwi);
1220
1221                 if (thread->t_flags & SVC_STOPPING) {
1222                         thread->t_flags &= ~SVC_STOPPING;
1223                         break;
1224                 } else if (thread->t_flags & SVC_EVENT) {
1225                         thread->t_flags &= ~SVC_EVENT;
1226                 }
1227         }
1228
1229         thread->t_flags = SVC_STOPPED;
1230         cfs_waitq_signal(&thread->t_ctl_waitq);
1231
1232         CDEBUG(D_DLMTRACE, "%s: pool thread exiting, process %d\n",
1233                t_name, cfs_curproc_pid());
1234
1235         complete_and_exit(&ldlm_pools_comp, 0);
1236 }
1237
1238 static int ldlm_pools_thread_start(void)
1239 {
1240         struct l_wait_info lwi = { 0 };
1241         int rc;
1242         ENTRY;
1243
1244         if (ldlm_pools_thread != NULL)
1245                 RETURN(-EALREADY);
1246
1247         OBD_ALLOC_PTR(ldlm_pools_thread);
1248         if (ldlm_pools_thread == NULL)
1249                 RETURN(-ENOMEM);
1250
1251         init_completion(&ldlm_pools_comp);
1252         cfs_waitq_init(&ldlm_pools_thread->t_ctl_waitq);
1253
1254         /* 
1255          * CLONE_VM and CLONE_FILES just avoid a needless copy, because we
1256          * just drop the VM and FILES in ptlrpc_daemonize() right away. 
1257          */
1258         rc = cfs_kernel_thread(ldlm_pools_thread_main, ldlm_pools_thread,
1259                                CLONE_VM | CLONE_FILES);
1260         if (rc < 0) {
1261                 CERROR("Can't start pool thread, error %d\n",
1262                        rc);
1263                 OBD_FREE(ldlm_pools_thread, sizeof(*ldlm_pools_thread));
1264                 ldlm_pools_thread = NULL;
1265                 RETURN(rc);
1266         }
1267         l_wait_event(ldlm_pools_thread->t_ctl_waitq,
1268                      (ldlm_pools_thread->t_flags & SVC_RUNNING), &lwi);
1269         RETURN(0);
1270 }
1271
1272 static void ldlm_pools_thread_stop(void)
1273 {
1274         ENTRY;
1275
1276         if (ldlm_pools_thread == NULL) {
1277                 EXIT;
1278                 return;
1279         }
1280
1281         ldlm_pools_thread->t_flags = SVC_STOPPING;
1282         cfs_waitq_signal(&ldlm_pools_thread->t_ctl_waitq);
1283
1284         /* 
1285          * Make sure that pools thread is finished before freeing @thread.
1286          * This fixes possible race and oops due to accessing freed memory
1287          * in pools thread. 
1288          */
1289         wait_for_completion(&ldlm_pools_comp);
1290         OBD_FREE_PTR(ldlm_pools_thread);
1291         ldlm_pools_thread = NULL;
1292         EXIT;
1293 }
1294
1295 int ldlm_pools_init(void)
1296 {
1297         int rc;
1298         ENTRY;
1299
1300         rc = ldlm_pools_thread_start();
1301         if (rc == 0) {
1302                 ldlm_pools_srv_shrinker = set_shrinker(DEFAULT_SEEKS,
1303                                                        ldlm_pools_srv_shrink);
1304                 ldlm_pools_cli_shrinker = set_shrinker(DEFAULT_SEEKS,
1305                                                        ldlm_pools_cli_shrink);
1306         }
1307         RETURN(rc);
1308 }
1309 EXPORT_SYMBOL(ldlm_pools_init);
1310
1311 void ldlm_pools_fini(void)
1312 {
1313         if (ldlm_pools_srv_shrinker != NULL) {
1314                 remove_shrinker(ldlm_pools_srv_shrinker);
1315                 ldlm_pools_srv_shrinker = NULL;
1316         }
1317         if (ldlm_pools_cli_shrinker != NULL) {
1318                 remove_shrinker(ldlm_pools_cli_shrinker);
1319                 ldlm_pools_cli_shrinker = NULL;
1320         }
1321         ldlm_pools_thread_stop();
1322 }
1323 EXPORT_SYMBOL(ldlm_pools_fini);
1324 #endif /* __KERNEL__ */
1325
1326 #else /* !HAVE_LRU_RESIZE_SUPPORT */
1327 int ldlm_pool_setup(struct ldlm_pool *pl, int limit)
1328 {
1329         return 0;
1330 }
1331 EXPORT_SYMBOL(ldlm_pool_setup);
1332
1333 int ldlm_pool_recalc(struct ldlm_pool *pl)
1334 {
1335         return 0;
1336 }
1337 EXPORT_SYMBOL(ldlm_pool_recalc);
1338
1339 int ldlm_pool_shrink(struct ldlm_pool *pl,
1340                      int nr, unsigned int gfp_mask)
1341 {
1342         return 0;
1343 }
1344 EXPORT_SYMBOL(ldlm_pool_shrink);
1345
1346 int ldlm_pool_init(struct ldlm_pool *pl, struct ldlm_namespace *ns,
1347                    int idx, ldlm_side_t client)
1348 {
1349         return 0;
1350 }
1351 EXPORT_SYMBOL(ldlm_pool_init);
1352
1353 void ldlm_pool_fini(struct ldlm_pool *pl)
1354 {
1355         return;
1356 }
1357 EXPORT_SYMBOL(ldlm_pool_fini);
1358
1359 void ldlm_pool_add(struct ldlm_pool *pl, struct ldlm_lock *lock)
1360 {
1361         return;
1362 }
1363 EXPORT_SYMBOL(ldlm_pool_add);
1364
1365 void ldlm_pool_del(struct ldlm_pool *pl, struct ldlm_lock *lock)
1366 {
1367         return;
1368 }
1369 EXPORT_SYMBOL(ldlm_pool_del);
1370
1371 __u64 ldlm_pool_get_slv(struct ldlm_pool *pl)
1372 {
1373         return 1;
1374 }
1375 EXPORT_SYMBOL(ldlm_pool_get_slv);
1376
1377 void ldlm_pool_set_slv(struct ldlm_pool *pl, __u64 slv)
1378 {
1379         return;
1380 }
1381 EXPORT_SYMBOL(ldlm_pool_set_slv);
1382
1383 __u64 ldlm_pool_get_clv(struct ldlm_pool *pl)
1384 {
1385         return 1;
1386 }
1387 EXPORT_SYMBOL(ldlm_pool_get_clv);
1388
1389 void ldlm_pool_set_clv(struct ldlm_pool *pl, __u64 clv)
1390 {
1391         return;
1392 }
1393 EXPORT_SYMBOL(ldlm_pool_set_clv);
1394
1395 __u32 ldlm_pool_get_limit(struct ldlm_pool *pl)
1396 {
1397         return 0;
1398 }
1399 EXPORT_SYMBOL(ldlm_pool_get_limit);
1400
1401 void ldlm_pool_set_limit(struct ldlm_pool *pl, __u32 limit)
1402 {
1403         return;
1404 }
1405 EXPORT_SYMBOL(ldlm_pool_set_limit);
1406
1407 __u32 ldlm_pool_get_lvf(struct ldlm_pool *pl)
1408 {
1409         return 0;
1410 }
1411 EXPORT_SYMBOL(ldlm_pool_get_lvf);
1412
1413 int ldlm_pools_init(void)
1414 {
1415         return 0;
1416 }
1417 EXPORT_SYMBOL(ldlm_pools_init);
1418
1419 void ldlm_pools_fini(void)
1420 {
1421         return;
1422 }
1423 EXPORT_SYMBOL(ldlm_pools_fini);
1424
1425 void ldlm_pools_wakeup(void)
1426 {
1427         return;
1428 }
1429 EXPORT_SYMBOL(ldlm_pools_wakeup);
1430
1431 void ldlm_pools_recalc(ldlm_side_t client)
1432 {
1433         return;
1434 }
1435 EXPORT_SYMBOL(ldlm_pools_recalc);
1436 #endif /* HAVE_LRU_RESIZE_SUPPORT */