Whamcloud - gitweb
8bf930795dc1b0e0831746f7bd0bf1867eebcd1e
[fs/lustre-release.git] / lustre / fld / fld_cache.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/fld/fld_cache.c
37  *
38  * FLD (Fids Location Database)
39  *
40  * Author: Yury Umanets <umka@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_FLD
47
48 #ifdef __KERNEL__
49 # include <libcfs/libcfs.h>
50 # include <linux/module.h>
51 # include <linux/jbd.h>
52 # include <asm/div64.h>
53 #else /* __KERNEL__ */
54 # include <liblustre.h>
55 # include <libcfs/list.h>
56 #endif
57
58 #include <obd.h>
59 #include <obd_class.h>
60 #include <lustre_ver.h>
61 #include <obd_support.h>
62 #include <lprocfs_status.h>
63
64 #include <dt_object.h>
65 #include <md_object.h>
66 #include <lustre_req_layout.h>
67 #include <lustre_fld.h>
68 #include "fld_internal.h"
69
70 #ifdef __KERNEL__
/*
 * Hash a sequence number for bucket selection.  Truncation to __u32 is
 * sufficient because the result is masked with fci_hash_mask by
 * fld_cache_bucket() before use.
 */
static inline __u32 fld_cache_hash(seqno_t seq)
{
        return (__u32)seq;
}
75
/*
 * Remove and free every entry in the cache: walk each hash bucket,
 * unlink entries from both the hash chain and the LRU list, and free
 * them, all under fci_lock.
 *
 * NOTE(review): entries with fce_inflight set are freed without waking
 * their fce_waitq — any fld_cache_lookup() waiter would be left blocked
 * on freed memory.  Presumably callers guarantee no lookups are in
 * flight when this runs (e.g. at fld_cache_fini() time) — confirm.
 */
void fld_cache_flush(struct fld_cache *cache)
{
        struct fld_cache_entry *flde;
        struct hlist_head *bucket;
        struct hlist_node *scan;
        struct hlist_node *next;
        int i;
        ENTRY;

        /* Free all cache entries. */
        spin_lock(&cache->fci_lock);
        for (i = 0; i < cache->fci_hash_size; i++) {
                bucket = cache->fci_hash_table + i;
                hlist_for_each_entry_safe(flde, scan, next, bucket, fce_list) {
                        hlist_del_init(&flde->fce_list);
                        list_del_init(&flde->fce_lru);
                        cache->fci_cache_count--;
                        OBD_FREE_PTR(flde);
                }
        }
        spin_unlock(&cache->fci_lock);
        EXIT;
}
99
100 struct fld_cache *fld_cache_init(const char *name, int hash_size,
101                                  int cache_size, int cache_threshold)
102 {
103         struct fld_cache *cache;
104         int i;
105         ENTRY;
106
107         LASSERT(name != NULL);
108         LASSERT(IS_PO2(hash_size));
109         LASSERT(cache_threshold < cache_size);
110
111         OBD_ALLOC_PTR(cache);
112         if (cache == NULL)
113                 RETURN(ERR_PTR(-ENOMEM));
114
115         INIT_LIST_HEAD(&cache->fci_lru);
116
117         cache->fci_cache_count = 0;
118         spin_lock_init(&cache->fci_lock);
119
120         strncpy(cache->fci_name, name,
121                 sizeof(cache->fci_name));
122
123         cache->fci_hash_size = hash_size;
124         cache->fci_cache_size = cache_size;
125         cache->fci_threshold = cache_threshold;
126
127         /* Init fld cache info. */
128         cache->fci_hash_mask = hash_size - 1;
129         OBD_ALLOC(cache->fci_hash_table,
130                   hash_size * sizeof(*cache->fci_hash_table));
131         if (cache->fci_hash_table == NULL) {
132                 OBD_FREE_PTR(cache);
133                 RETURN(ERR_PTR(-ENOMEM));
134         }
135
136         for (i = 0; i < hash_size; i++)
137                 INIT_HLIST_HEAD(&cache->fci_hash_table[i]);
138         memset(&cache->fci_stat, 0, sizeof(cache->fci_stat));
139
140         CDEBUG(D_INFO, "%s: FLD cache - Size: %d, Threshold: %d\n",
141                cache->fci_name, cache_size, cache_threshold);
142
143         RETURN(cache);
144 }
145 EXPORT_SYMBOL(fld_cache_init);
146
/*
 * Flush all entries, dump usage statistics to the console, and free the
 * cache itself (hash table included).  Must not race with other users of
 * the cache; after this call the pointer is invalid.
 */
void fld_cache_fini(struct fld_cache *cache)
{
        __u64 pct;
        ENTRY;

        LASSERT(cache != NULL);
        fld_cache_flush(cache);

        /* Hit ratio in percent; guard against division by zero when the
         * cache was never queried. */
        if (cache->fci_stat.fst_count > 0) {
                pct = cache->fci_stat.fst_cache * 100;
                do_div(pct, cache->fci_stat.fst_count);
        } else {
                pct = 0;
        }

        printk("FLD cache statistics (%s):\n", cache->fci_name);
        printk("  Total reqs: "LPU64"\n", cache->fci_stat.fst_count);
        printk("  Cache reqs: "LPU64"\n", cache->fci_stat.fst_cache);
        printk("  Saved RPCs: "LPU64"\n", cache->fci_stat.fst_inflight);
        printk("  Cache hits: "LPU64"%%\n", pct);

        OBD_FREE(cache->fci_hash_table, cache->fci_hash_size *
                 sizeof(*cache->fci_hash_table));
        OBD_FREE_PTR(cache);

        EXIT;
}
174 EXPORT_SYMBOL(fld_cache_fini);
175
176 static inline struct hlist_head *
177 fld_cache_bucket(struct fld_cache *cache, seqno_t seq)
178 {
179         return cache->fci_hash_table + (fld_cache_hash(seq) &
180                                         cache->fci_hash_mask);
181 }
182
/*
 * Check if the cache needs to be shrunk. If so - do it. Tries to keep all
 * collision lists well balanced: check all of them and remove one entry
 * from each list in turn until the cache has been shrunk enough.
 */
/*
 * Evict least-recently-used entries until fci_threshold slots are free,
 * skipping inflight entries.  No-op while fci_cache_count is below
 * fci_cache_size.  Must be called with cache->fci_lock held (its only
 * caller in this file, fld_cache_insert(), holds it).
 *
 * \retval 0 always.
 */
static int fld_cache_shrink(struct fld_cache *cache)
{
        struct fld_cache_entry *flde;
        struct list_head *curr;
        int num = 0;
        ENTRY;

        LASSERT(cache != NULL);

        /* Nothing to do until the cache is full. */
        if (cache->fci_cache_count < cache->fci_cache_size)
                RETURN(0);

        /* Walk the LRU list backwards, least recently used first. */
        curr = cache->fci_lru.prev;

        while (cache->fci_cache_count + cache->fci_threshold >
               cache->fci_cache_size && curr != &cache->fci_lru)
        {
                /* Advance before freeing the current entry. */
                flde = list_entry(curr, struct fld_cache_entry, fce_lru);
                curr = curr->prev;

                /* keep inflights */
                if (flde->fce_inflight)
                        continue;

                hlist_del_init(&flde->fce_list);
                list_del_init(&flde->fce_lru);
                cache->fci_cache_count--;
                OBD_FREE_PTR(flde);
                num++;
        }

        CDEBUG(D_INFO, "%s: FLD cache - Shrinked by "
               "%d entries\n", cache->fci_name, num);

        RETURN(0);
}
224
/*
 * Insert an "inflight" placeholder for \a seq: a lookup RPC has been
 * sent, and later lookups of the same seq should wait on fce_waitq
 * instead of sending their own.  The placeholder is completed by
 * fld_cache_insert() or invalidated by fld_cache_delete().
 *
 * \retval 0       placeholder added, or another thread added an entry
 *                 for \a seq while we were allocating
 * \retval -EEXIST an entry for \a seq already exists
 * \retval -ENOMEM allocation failure
 */
int fld_cache_insert_inflight(struct fld_cache *cache, seqno_t seq)
{
        struct fld_cache_entry *flde, *fldt;
        struct hlist_head *bucket;
        struct hlist_node *scan;
        ENTRY;

        spin_lock(&cache->fci_lock);

        /* Check if cache already has the entry with such a seq. */
        bucket = fld_cache_bucket(cache, seq);
        hlist_for_each_entry(fldt, scan, bucket, fce_list) {
                if (fldt->fce_seq == seq) {
                        spin_unlock(&cache->fci_lock);
                        RETURN(-EEXIST);
                }
        }
        spin_unlock(&cache->fci_lock);

        /* Allocate new entry.  The lock is dropped across the blocking
         * allocation, so the existence check must be redone below. */
        OBD_ALLOC_PTR(flde);
        if (!flde)
                RETURN(-ENOMEM);

        /*
         * Check if cache has the entry with such a seq again. It could be added
         * while we were allocating new entry.  The bucket pointer computed
         * above stays valid: the hash table is fixed for the cache lifetime.
         */
        spin_lock(&cache->fci_lock);
        hlist_for_each_entry(fldt, scan, bucket, fce_list) {
                if (fldt->fce_seq == seq) {
                        spin_unlock(&cache->fci_lock);
                        OBD_FREE_PTR(flde);
                        RETURN(0);
                }
        }

        /* Add new entry to cache and lru list. */
        INIT_HLIST_NODE(&flde->fce_list);
        flde->fce_inflight = 1;
        flde->fce_invalid = 1;  /* invalid until the reply fills fce_mds */
        cfs_waitq_init(&flde->fce_waitq);
        flde->fce_seq = seq;

        hlist_add_head(&flde->fce_list, bucket);
        list_add(&flde->fce_lru, &cache->fci_lru);
        cache->fci_cache_count++;

        spin_unlock(&cache->fci_lock);

        RETURN(0);
}
277 EXPORT_SYMBOL(fld_cache_insert_inflight);
278
/*
 * Insert a seq -> mds mapping into the cache.  When an inflight
 * placeholder for \a seq exists (created by fld_cache_insert_inflight()),
 * it is completed instead: fce_mds is recorded and waiters are woken.
 *
 * \retval 0       mapping inserted, inflight entry completed, or another
 *                 thread inserted the same seq while we were allocating
 * \retval -EEXIST a completed (non-inflight) entry for \a seq exists
 * \retval -ENOMEM allocation failure
 */
int fld_cache_insert(struct fld_cache *cache,
                     seqno_t seq, mdsno_t mds)
{
        struct fld_cache_entry *flde, *fldt;
        struct hlist_head *bucket;
        struct hlist_node *scan;
        int rc;
        ENTRY;

        spin_lock(&cache->fci_lock);

        /* Check if need to shrink cache. */
        rc = fld_cache_shrink(cache);
        if (rc) {
                spin_unlock(&cache->fci_lock);
                RETURN(rc);
        }

        /* Check if cache already has the entry with such a seq. */
        bucket = fld_cache_bucket(cache, seq);
        hlist_for_each_entry(fldt, scan, bucket, fce_list) {
                if (fldt->fce_seq == seq) {
                        if (fldt->fce_inflight) {
                                /* set mds for inflight entry and wake all
                                 * lookups blocked on it */
                                fldt->fce_mds = mds;
                                fldt->fce_inflight = 0;
                                fldt->fce_invalid = 0;
                                cfs_waitq_signal(&fldt->fce_waitq);
                                rc = 0;
                        } else
                                rc = -EEXIST;
                        spin_unlock(&cache->fci_lock);
                        RETURN(rc);
                }
        }
        spin_unlock(&cache->fci_lock);

        /* Allocate new entry.  The lock is dropped across the blocking
         * allocation, so the existence check must be redone below. */
        OBD_ALLOC_PTR(flde);
        if (!flde)
                RETURN(-ENOMEM);

        /*
         * Check if cache has the entry with such a seq again. It could be added
         * while we were allocating new entry.  The bucket pointer computed
         * above stays valid: the hash table is fixed for the cache lifetime.
         */
        spin_lock(&cache->fci_lock);
        hlist_for_each_entry(fldt, scan, bucket, fce_list) {
                if (fldt->fce_seq == seq) {
                        spin_unlock(&cache->fci_lock);
                        OBD_FREE_PTR(flde);
                        RETURN(0);
                }
        }

        /* Add new entry to cache and lru list. */
        INIT_HLIST_NODE(&flde->fce_list);
        flde->fce_mds = mds;
        flde->fce_seq = seq;
        flde->fce_inflight = 0;
        flde->fce_invalid = 0;

        hlist_add_head(&flde->fce_list, bucket);
        list_add(&flde->fce_lru, &cache->fci_lru);
        cache->fci_cache_count++;

        spin_unlock(&cache->fci_lock);

        RETURN(0);
}
349 EXPORT_SYMBOL(fld_cache_insert);
350
/*
 * Remove the entry for \a seq from the cache, if present.  An inflight
 * entry is marked invalid and its waiters are woken before it is freed.
 *
 * NOTE(review): the entry is freed immediately after cfs_waitq_signal(),
 * while fld_cache_lookup() waiters re-read flde->fce_inflight/fce_invalid
 * after waking — they may touch freed memory.  Confirm that delete and
 * lookup of the same seq cannot overlap.
 */
void fld_cache_delete(struct fld_cache *cache, seqno_t seq)
{
        struct fld_cache_entry *flde;
        struct hlist_node *scan, *n;
        struct hlist_head *bucket;
        ENTRY;

        /* The hash table is fixed for the cache lifetime, so the bucket
         * may be computed before taking fci_lock. */
        bucket = fld_cache_bucket(cache, seq);

        spin_lock(&cache->fci_lock);
        hlist_for_each_entry_safe(flde, scan, n, bucket, fce_list) {
                if (flde->fce_seq == seq) {
                        hlist_del_init(&flde->fce_list);
                        list_del_init(&flde->fce_lru);
                        if (flde->fce_inflight) {
                                /* Wake lookups blocked on this entry; they
                                 * will see fce_invalid and return -ENOENT. */
                                flde->fce_inflight = 0;
                                flde->fce_invalid = 1;
                                cfs_waitq_signal(&flde->fce_waitq);
                        }
                        cache->fci_cache_count--;
                        OBD_FREE_PTR(flde);
                        GOTO(out_unlock, 0);
                }
        }

        EXIT;
out_unlock:
        spin_unlock(&cache->fci_lock);
}
380 EXPORT_SYMBOL(fld_cache_delete);
381
382 static int fld_check_inflight(struct fld_cache_entry *flde)
383 {
384         return (flde->fce_inflight);
385 }
386
/*
 * Look up the mds location for \a seq.  On a hit *mds is filled in and
 * the entry moves to the LRU head.  If the entry is inflight, block until
 * fld_cache_insert() completes it or fld_cache_delete() invalidates it.
 *
 * \retval 0       *mds is valid
 * \retval -ENOENT no entry, or the inflight entry was invalidated
 *
 * NOTE(review): in the inflight branch fci_lock is dropped before the
 * wait and never retaken — fce_invalid/fce_mds are then read, and
 * fst_inflight incremented, without the lock; fld_cache_delete() may also
 * free the entry while we wait.  Verify callers serialize these paths.
 */
int fld_cache_lookup(struct fld_cache *cache,
                     seqno_t seq, mdsno_t *mds)
{
        struct fld_cache_entry *flde;
        struct hlist_node *scan, *n;
        struct hlist_head *bucket;
        ENTRY;

        bucket = fld_cache_bucket(cache, seq);

        spin_lock(&cache->fci_lock);
        cache->fci_stat.fst_count++;
        hlist_for_each_entry_safe(flde, scan, n, bucket, fce_list) {
                if (flde->fce_seq == seq) {
                        if (flde->fce_inflight) {
                                /* lookup RPC is inflight need to wait */
                                struct l_wait_info lwi;
                                spin_unlock(&cache->fci_lock);
                                lwi = LWI_TIMEOUT(0, NULL, NULL);
                                l_wait_event(flde->fce_waitq,
                                             !fld_check_inflight(flde), &lwi);
                                LASSERT(!flde->fce_inflight);
                                /* Invalidated while we waited (deleted or
                                 * RPC failed): no valid mapping. */
                                if (flde->fce_invalid)
                                        RETURN(-ENOENT);

                                *mds = flde->fce_mds;
                                cache->fci_stat.fst_inflight++;
                        } else {
                                LASSERT(!flde->fce_invalid);
                                *mds = flde->fce_mds;
                                /* Hit: move the entry to the LRU head. */
                                list_del(&flde->fce_lru);
                                list_add(&flde->fce_lru, &cache->fci_lru);
                                cache->fci_stat.fst_cache++;
                                spin_unlock(&cache->fci_lock);
                        }
                        RETURN(0);
                }
        }
        spin_unlock(&cache->fci_lock);
        RETURN(-ENOENT);
}
428 EXPORT_SYMBOL(fld_cache_lookup);
429 #else
/* liblustre build: the client-side FLD cache is not supported. */
int fld_cache_insert_inflight(struct fld_cache *cache, seqno_t seq)
{
        return -ENOTSUPP;
}
434 EXPORT_SYMBOL(fld_cache_insert_inflight);
435
/* liblustre build: the client-side FLD cache is not supported. */
int fld_cache_insert(struct fld_cache *cache,
                     seqno_t seq, mdsno_t mds)
{
        return -ENOTSUPP;
}
441 EXPORT_SYMBOL(fld_cache_insert);
442
/* liblustre build: nothing is cached, so nothing to delete. */
void fld_cache_delete(struct fld_cache *cache,
                      seqno_t seq)
{
        return;
}
448 EXPORT_SYMBOL(fld_cache_delete);
449
/* liblustre build: always a cache miss; the caller must do the RPC. */
int fld_cache_lookup(struct fld_cache *cache,
                     seqno_t seq, mdsno_t *mds)
{
        return -ENOTSUPP;
}
455 EXPORT_SYMBOL(fld_cache_lookup);
456 #endif