/* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  lustre/fld/fld_cache.c
 *  FLD (Fids Location Database)
 *
 *  Copyright (C) 2006 Cluster File Systems, Inc.
 *   Author: Yury Umanets <umka@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 */
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_FLD

#ifdef __KERNEL__
# include <libcfs/libcfs.h>
# include <linux/module.h>
# include <linux/jbd.h>
# include <asm/div64.h>
#else /* __KERNEL__ */
# include <liblustre.h>
# include <libcfs/list.h>
#endif

#include <obd.h>
#include <obd_class.h>
#include <lustre_ver.h>
#include <obd_support.h>
#include <lprocfs_status.h>

#include <dt_object.h>
#include <md_object.h>
#include <lustre_req_layout.h>
#include <lustre_fld.h>
#include "fld_internal.h"

#ifdef __KERNEL__
static inline __u32 fld_cache_hash(seqno_t seq)
{
        return (__u32)seq;
}

void fld_cache_flush(struct fld_cache *cache)
{
        struct fld_cache_entry *flde;
        struct hlist_head *bucket;
        struct hlist_node *scan;
        struct hlist_node *next;
        int i;
        ENTRY;

        /* Free all cache entries. */
        spin_lock(&cache->fci_lock);
        for (i = 0; i < cache->fci_hash_size; i++) {
                bucket = cache->fci_hash_table + i;
                hlist_for_each_entry_safe(flde, scan, next, bucket, fce_list) {
                        hlist_del_init(&flde->fce_list);
                        list_del_init(&flde->fce_lru);
                        cache->fci_cache_count--;
                        OBD_FREE_PTR(flde);
                }
        }
        spin_unlock(&cache->fci_lock);
        EXIT;
}

struct fld_cache *fld_cache_init(const char *name, int hash_size,
                                 int cache_size, int cache_threshold)
{
        struct fld_cache *cache;
        int i;
        ENTRY;

        LASSERT(name != NULL);
        LASSERT(IS_PO2(hash_size));
        LASSERT(cache_threshold < cache_size);

        OBD_ALLOC_PTR(cache);
        if (cache == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        INIT_LIST_HEAD(&cache->fci_lru);

        cache->fci_cache_count = 0;
        spin_lock_init(&cache->fci_lock);

        /*
         * Copy at most sizeof - 1 bytes so that fci_name is always
         * NUL-terminated (OBD_ALLOC_PTR() returns zeroed memory).
         */
        strncpy(cache->fci_name, name,
                sizeof(cache->fci_name) - 1);

        cache->fci_hash_size = hash_size;
        cache->fci_cache_size = cache_size;
        cache->fci_threshold = cache_threshold;

        /* Init fld cache info. */
        cache->fci_hash_mask = hash_size - 1;
        OBD_ALLOC(cache->fci_hash_table,
                  hash_size * sizeof(*cache->fci_hash_table));
        if (cache->fci_hash_table == NULL) {
                OBD_FREE_PTR(cache);
                RETURN(ERR_PTR(-ENOMEM));
        }

        for (i = 0; i < hash_size; i++)
                INIT_HLIST_HEAD(&cache->fci_hash_table[i]);
        memset(&cache->fci_stat, 0, sizeof(cache->fci_stat));

        CDEBUG(D_INFO, "%s: FLD cache - Size: %d, Threshold: %d\n",
               cache->fci_name, cache_size, cache_threshold);

        RETURN(cache);
}
EXPORT_SYMBOL(fld_cache_init);

void fld_cache_fini(struct fld_cache *cache)
{
        __u64 pct;
        ENTRY;

        LASSERT(cache != NULL);
        fld_cache_flush(cache);

        if (cache->fci_stat.fst_count > 0) {
                pct = cache->fci_stat.fst_cache * 100;
                do_div(pct, cache->fci_stat.fst_count);
        } else {
                pct = 0;
        }

        printk("FLD cache statistics (%s):\n", cache->fci_name);
        printk("  Total reqs: "LPU64"\n", cache->fci_stat.fst_count);
        printk("  Cache reqs: "LPU64"\n", cache->fci_stat.fst_cache);
        printk("  Saved RPCs: "LPU64"\n", cache->fci_stat.fst_inflight);
        printk("  Cache hits: "LPU64"%%\n", pct);

        OBD_FREE(cache->fci_hash_table, cache->fci_hash_size *
                 sizeof(*cache->fci_hash_table));
        OBD_FREE_PTR(cache);

        EXIT;
}
EXPORT_SYMBOL(fld_cache_fini);

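/*
 * Usage sketch (hypothetical caller, not part of this file): the cache
 * is created once at setup time and torn down at cleanup. The sizes
 * below are illustrative; the only hard requirements are that hash_size
 * is a power of two and cache_threshold < cache_size (see the LASSERTs
 * in fld_cache_init()):
 *
 *      struct fld_cache *cache;
 *
 *      cache = fld_cache_init("srv-fld", 256, 4096, 256);
 *      if (IS_ERR(cache))
 *              RETURN(PTR_ERR(cache));
 *      ...
 *      fld_cache_fini(cache);
 */
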
static inline struct hlist_head *
fld_cache_bucket(struct fld_cache *cache, seqno_t seq)
{
        return cache->fci_hash_table + (fld_cache_hash(seq) &
                                        cache->fci_hash_mask);
}

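/*
 * Note (added for clarity): since hash_size is asserted to be a power
 * of two, masking with fci_hash_mask == hash_size - 1 is equivalent to
 * fld_cache_hash(seq) % hash_size. For example, with hash_size == 256,
 * seq == 0x1234 hashes to bucket 0x34.
 */
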
/*
 * Check if the cache needs to be shrunk. If so, do it: walk the LRU
 * list from its least recently used end and free entries (skipping
 * inflight ones) until the cache is shrunk enough.
 */
static int fld_cache_shrink(struct fld_cache *cache)
{
        struct fld_cache_entry *flde;
        struct list_head *curr;
        int num = 0;
        ENTRY;

        LASSERT(cache != NULL);

        if (cache->fci_cache_count < cache->fci_cache_size)
                RETURN(0);

        curr = cache->fci_lru.prev;

        while (cache->fci_cache_count + cache->fci_threshold >
               cache->fci_cache_size && curr != &cache->fci_lru)
        {
                flde = list_entry(curr, struct fld_cache_entry, fce_lru);
                curr = curr->prev;

                /* Keep inflight entries. */
                if (flde->fce_inflight)
                        continue;

                hlist_del_init(&flde->fce_list);
                list_del_init(&flde->fce_lru);
                cache->fci_cache_count--;
                OBD_FREE_PTR(flde);
                num++;
        }

        CDEBUG(D_INFO, "%s: FLD cache - Shrunk by %d entries\n",
               cache->fci_name, num);

        RETURN(0);
}

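/*
 * Worked example (illustrative numbers): with cache_size == 100 and
 * cache_threshold == 10, fld_cache_shrink() is a no-op until the entry
 * count reaches 100. It then frees entries from the LRU tail until
 * count + 10 <= 100, i.e. down to 90 entries (fewer may be freed when
 * inflight entries are in the way), so each shrink pass reclaims a
 * batch rather than one entry per insert.
 */
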
int fld_cache_insert_inflight(struct fld_cache *cache, seqno_t seq)
{
        struct fld_cache_entry *flde, *fldt;
        struct hlist_head *bucket;
        struct hlist_node *scan;
        ENTRY;

        spin_lock(&cache->fci_lock);

        /* Check if the cache already has an entry with this seq. */
        bucket = fld_cache_bucket(cache, seq);
        hlist_for_each_entry(fldt, scan, bucket, fce_list) {
                if (fldt->fce_seq == seq) {
                        spin_unlock(&cache->fci_lock);
                        RETURN(-EEXIST);
                }
        }
        spin_unlock(&cache->fci_lock);

        /* Allocate new entry. */
        OBD_ALLOC_PTR(flde);
        if (!flde)
                RETURN(-ENOMEM);

        /*
         * Check again whether the cache has an entry with this seq. One
         * could have been added while we were allocating the new entry.
         */
        spin_lock(&cache->fci_lock);
        hlist_for_each_entry(fldt, scan, bucket, fce_list) {
                if (fldt->fce_seq == seq) {
                        spin_unlock(&cache->fci_lock);
                        OBD_FREE_PTR(flde);
                        RETURN(0);
                }
        }

        /* Add new entry to cache and LRU list. */
        INIT_HLIST_NODE(&flde->fce_list);
        flde->fce_inflight = 1;
        flde->fce_invalid = 1;
        cfs_waitq_init(&flde->fce_waitq);
        flde->fce_seq = seq;

        hlist_add_head(&flde->fce_list, bucket);
        list_add(&flde->fce_lru, &cache->fci_lru);
        cache->fci_cache_count++;

        spin_unlock(&cache->fci_lock);

        RETURN(0);
}
EXPORT_SYMBOL(fld_cache_insert_inflight);

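/*
 * How the inflight machinery is meant to be driven (summary based on
 * the code in this file; the call sequence is a sketch, not a verbatim
 * caller): a thread about to send a lookup RPC first publishes a
 * placeholder entry, so concurrent fld_cache_lookup() calls for the
 * same seq wait on fce_waitq instead of issuing duplicate RPCs. The
 * reply path then calls fld_cache_insert(), which fills in the mds,
 * clears fce_inflight and wakes the waiters:
 *
 *      rc = fld_cache_insert_inflight(cache, seq);
 *      ... send the FLD lookup RPC; the reply carries mds ...
 *      rc = fld_cache_insert(cache, seq, mds);
 */
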
int fld_cache_insert(struct fld_cache *cache,
                     seqno_t seq, mdsno_t mds)
{
        struct fld_cache_entry *flde, *fldt;
        struct hlist_head *bucket;
        struct hlist_node *scan;
        int rc;
        ENTRY;

        spin_lock(&cache->fci_lock);

        /* Check if the cache needs to be shrunk. */
        rc = fld_cache_shrink(cache);
        if (rc) {
                spin_unlock(&cache->fci_lock);
                RETURN(rc);
        }

        /* Check if the cache already has an entry with this seq. */
        bucket = fld_cache_bucket(cache, seq);
        hlist_for_each_entry(fldt, scan, bucket, fce_list) {
                if (fldt->fce_seq == seq) {
                        if (fldt->fce_inflight) {
                                /* Set mds for the inflight entry. */
                                fldt->fce_mds = mds;
                                fldt->fce_inflight = 0;
                                fldt->fce_invalid = 0;
                                cfs_waitq_signal(&fldt->fce_waitq);
                                rc = 0;
                        } else
                                rc = -EEXIST;
                        spin_unlock(&cache->fci_lock);
                        RETURN(rc);
                }
        }
        spin_unlock(&cache->fci_lock);

        /* Allocate new entry. */
        OBD_ALLOC_PTR(flde);
        if (!flde)
                RETURN(-ENOMEM);

        /*
         * Check again whether the cache has an entry with this seq. One
         * could have been added while we were allocating the new entry.
         */
        spin_lock(&cache->fci_lock);
        hlist_for_each_entry(fldt, scan, bucket, fce_list) {
                if (fldt->fce_seq == seq) {
                        spin_unlock(&cache->fci_lock);
                        OBD_FREE_PTR(flde);
                        RETURN(0);
                }
        }

        /* Add new entry to cache and LRU list. */
        INIT_HLIST_NODE(&flde->fce_list);
        flde->fce_mds = mds;
        flde->fce_seq = seq;
        flde->fce_inflight = 0;
        flde->fce_invalid = 0;

        hlist_add_head(&flde->fce_list, bucket);
        list_add(&flde->fce_lru, &cache->fci_lru);
        cache->fci_cache_count++;

        spin_unlock(&cache->fci_lock);

        RETURN(0);
}
EXPORT_SYMBOL(fld_cache_insert);

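/*
 * Example (hypothetical caller): caching a freshly resolved location.
 * One plausible policy is to treat -EEXIST as success, since it means
 * the seq is already cached with a valid location:
 *
 *      rc = fld_cache_insert(cache, seq, mds);
 *      if (rc == -EEXIST)
 *              rc = 0;
 */
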
void fld_cache_delete(struct fld_cache *cache, seqno_t seq)
{
        struct fld_cache_entry *flde;
        struct hlist_node *scan, *n;
        struct hlist_head *bucket;
        ENTRY;

        bucket = fld_cache_bucket(cache, seq);

        spin_lock(&cache->fci_lock);
        hlist_for_each_entry_safe(flde, scan, n, bucket, fce_list) {
                if (flde->fce_seq == seq) {
                        hlist_del_init(&flde->fce_list);
                        list_del_init(&flde->fce_lru);
                        if (flde->fce_inflight) {
                                flde->fce_inflight = 0;
                                flde->fce_invalid = 1;
                                cfs_waitq_signal(&flde->fce_waitq);
                        }
                        cache->fci_cache_count--;
                        OBD_FREE_PTR(flde);
                        GOTO(out_unlock, 0);
                }
        }

        EXIT;
out_unlock:
        spin_unlock(&cache->fci_lock);
}
EXPORT_SYMBOL(fld_cache_delete);

static int fld_check_inflight(struct fld_cache_entry *flde)
{
        return flde->fce_inflight;
}

int fld_cache_lookup(struct fld_cache *cache,
                     seqno_t seq, mdsno_t *mds)
{
        struct fld_cache_entry *flde;
        struct hlist_node *scan, *n;
        struct hlist_head *bucket;
        ENTRY;

        bucket = fld_cache_bucket(cache, seq);

        spin_lock(&cache->fci_lock);
        cache->fci_stat.fst_count++;
        hlist_for_each_entry_safe(flde, scan, n, bucket, fce_list) {
                if (flde->fce_seq == seq) {
                        if (flde->fce_inflight) {
                                /* A lookup RPC is inflight; wait for it. */
                                struct l_wait_info lwi;
                                spin_unlock(&cache->fci_lock);
                                lwi = LWI_TIMEOUT(0, NULL, NULL);
                                l_wait_event(flde->fce_waitq,
                                             !fld_check_inflight(flde), &lwi);
                                LASSERT(!flde->fce_inflight);
                                if (flde->fce_invalid)
                                        RETURN(-ENOENT);

                                *mds = flde->fce_mds;
                                cache->fci_stat.fst_inflight++;
                        } else {
                                LASSERT(!flde->fce_invalid);
                                *mds = flde->fce_mds;
                                list_del(&flde->fce_lru);
                                list_add(&flde->fce_lru, &cache->fci_lru);
                                cache->fci_stat.fst_cache++;
                                spin_unlock(&cache->fci_lock);
                        }
                        RETURN(0);
                }
        }
        spin_unlock(&cache->fci_lock);
        RETURN(-ENOENT);
}
EXPORT_SYMBOL(fld_cache_lookup);
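
/*
 * Example (hypothetical caller): consult the cache before falling back
 * to a server lookup:
 *
 *      mdsno_t mds;
 *
 *      if (fld_cache_lookup(cache, seq, &mds) == 0) {
 *              ... cache hit, use mds, no RPC needed ...
 *      } else {
 *              ... cache miss (-ENOENT), do the FLD lookup RPC ...
 *      }
 */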
#else /* __KERNEL__ */
/*
 * Userspace (liblustre) stubs: the FLD cache is implemented only for
 * the kernel; these keep the symbols available to userspace builds.
 */
int fld_cache_insert_inflight(struct fld_cache *cache, seqno_t seq)
{
        return -ENOTSUPP;
}
EXPORT_SYMBOL(fld_cache_insert_inflight);

int fld_cache_insert(struct fld_cache *cache,
                     seqno_t seq, mdsno_t mds)
{
        return -ENOTSUPP;
}
EXPORT_SYMBOL(fld_cache_insert);

void fld_cache_delete(struct fld_cache *cache,
                      seqno_t seq)
{
        return;
}
EXPORT_SYMBOL(fld_cache_delete);

int fld_cache_lookup(struct fld_cache *cache,
                     seqno_t seq, mdsno_t *mds)
{
        return -ENOTSUPP;
}
EXPORT_SYMBOL(fld_cache_lookup);
#endif /* __KERNEL__ */