Whamcloud - gitweb
b=14340
[fs/lustre-release.git] / lustre / obdclass / lustre_handles.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *   Author: Phil Schwan <phil@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  */
25
26 #define DEBUG_SUBSYSTEM S_CLASS
27 #ifndef __KERNEL__
28 # include <liblustre.h>
29 #endif
30
31 #include <obd_support.h>
32 #include <lustre_handles.h>
33 #include <lustre_lib.h>
34
35 #if !defined(HAVE_RCU) || !defined(__KERNEL__)
36 # define list_add_rcu            list_add
37 # define list_del_rcu            list_del
38 # define list_for_each_rcu       list_for_each
39 # define list_for_each_safe_rcu  list_for_each_safe
40 # define rcu_read_lock()         spin_lock(&bucket->lock)
41 # define rcu_read_unlock()       spin_unlock(&bucket->lock)
42 #endif /* ifndef HAVE_RCU */
43
44 static __u64 handle_base;
45 #define HANDLE_INCR 7
46 static spinlock_t handle_base_lock;
47
48 static struct handle_bucket {
49         spinlock_t lock;
50         struct list_head head;
51 } *handle_hash;
52
53 static atomic_t handle_count = ATOMIC_INIT(0);
54
55 #ifdef __arch_um__
56 /* For unknown reason, UML uses kmalloc rather than vmalloc to allocate
57  * memory(OBD_VMALLOC). Therefore, we have to redefine the
58  * HANDLE_HASH_SIZE to make the hash heads don't exceed 128K.
59  */
60 #define HANDLE_HASH_SIZE 4096
61 #else
62 #define HANDLE_HASH_SIZE (1 << 14)
63 #endif /* ifdef __arch_um__ */
64
65 #define HANDLE_HASH_MASK (HANDLE_HASH_SIZE - 1)
66
67 /*
68  * Generate a unique 64bit cookie (hash) for a handle and insert it into
69  * global (per-node) hash-table.
70  */
71 void class_handle_hash(struct portals_handle *h, portals_handle_addref_cb cb)
72 {
73         struct handle_bucket *bucket;
74         ENTRY;
75
76         LASSERT(h != NULL);
77         LASSERT(list_empty(&h->h_link));
78
79         /*
80          * This is fast, but simplistic cookie generation algorithm, it will
81          * need a re-do at some point in the future for security.
82          */
83         spin_lock(&handle_base_lock);
84         handle_base += HANDLE_INCR;
85
86         h->h_cookie = handle_base;
87         if (unlikely(handle_base == 0)) {
88                 /*
89                  * Cookie of zero is "dangerous", because in many places it's
90                  * assumed that 0 means "unassigned" handle, not bound to any
91                  * object.
92                  */
93                 CWARN("The universe has been exhausted: cookie wrap-around.\n");
94                 handle_base += HANDLE_INCR;
95         }
96         spin_unlock(&handle_base_lock);
97  
98         atomic_inc(&handle_count);
99         h->h_addref = cb;
100         spin_lock_init(&h->h_lock);
101
102         bucket = &handle_hash[h->h_cookie & HANDLE_HASH_MASK];
103         spin_lock(&bucket->lock);
104         list_add_rcu(&h->h_link, &bucket->head);
105         h->h_in = 1;
106         spin_unlock(&bucket->lock);
107
108         CDEBUG(D_INFO, "added object %p with handle "LPX64" to hash\n",
109                h, h->h_cookie);
110         EXIT;
111 }
112
113 static void class_handle_unhash_nolock(struct portals_handle *h)
114 {
115         if (list_empty(&h->h_link)) {
116                 CERROR("removing an already-removed handle ("LPX64")\n",
117                        h->h_cookie);
118                 return;
119         }
120
121         CDEBUG(D_INFO, "removing object %p with handle "LPX64" from hash\n",
122                h, h->h_cookie);
123
124         spin_lock(&h->h_lock);
125         if (h->h_in == 0) {
126                 spin_unlock(&h->h_lock);
127                 return;
128         }
129         h->h_in = 0;
130         spin_unlock(&h->h_lock);
131         list_del_rcu(&h->h_link);
132 }
133
134 void class_handle_unhash(struct portals_handle *h)
135 {
136         struct handle_bucket *bucket;
137         bucket = handle_hash + (h->h_cookie & HANDLE_HASH_MASK);
138
139         spin_lock(&bucket->lock);
140         class_handle_unhash_nolock(h);
141         spin_unlock(&bucket->lock);
142
143         atomic_dec(&handle_count);
144 }
145
146 void class_handle_hash_back(struct portals_handle *h)
147 {
148         struct handle_bucket *bucket;
149         ENTRY;
150
151         bucket = handle_hash + (h->h_cookie & HANDLE_HASH_MASK);
152
153         atomic_inc(&handle_count);
154         spin_lock(&bucket->lock);
155         list_add_rcu(&h->h_link, &bucket->head);
156         h->h_in = 1;
157         spin_unlock(&bucket->lock);
158
159         EXIT;
160 }
161
162 void *class_handle2object(__u64 cookie)
163 {
164         struct handle_bucket *bucket;
165         struct list_head *tmp;
166         void *retval = NULL;
167         ENTRY;
168
169         LASSERT(handle_hash != NULL);
170
171         /* Be careful when you want to change this code. See the 
172          * rcu_read_lock() definition on top this file. - jxiong */
173         bucket = handle_hash + (cookie & HANDLE_HASH_MASK);
174
175         rcu_read_lock();
176         list_for_each_rcu(tmp, &bucket->head) {
177                 struct portals_handle *h;
178                 h = list_entry(tmp, struct portals_handle, h_link);
179                 if (h->h_cookie != cookie)
180                         continue;
181
182                 spin_lock(&h->h_lock);
183                 if (likely(h->h_cookie != 0)) {
184                         h->h_addref(h);
185                         retval = h;
186                 }
187                 spin_unlock(&h->h_lock);
188                 break;
189         }
190         rcu_read_unlock();
191
192         RETURN(retval);
193 }
194
195 void class_handle_free_cb(struct rcu_head *rcu)
196 {
197         struct portals_handle *h = RCU2HANDLE(rcu);
198         if (h->h_free_cb) {
199                 h->h_free_cb(h->h_ptr, h->h_size);
200         } else {
201                 void *ptr = h->h_ptr;
202                 unsigned int size = h->h_size;
203                 OBD_FREE(ptr, size);
204         }
205 }
206
207 int class_handle_init(void)
208 {
209         struct handle_bucket *bucket;
210
211         LASSERT(handle_hash == NULL);
212
213         OBD_VMALLOC(handle_hash, sizeof(*bucket) * HANDLE_HASH_SIZE);
214         if (handle_hash == NULL)
215                 return -ENOMEM;
216
217         spin_lock_init(&handle_base_lock);
218         for (bucket = handle_hash + HANDLE_HASH_SIZE - 1; bucket >= handle_hash;
219              bucket--) {
220                 CFS_INIT_LIST_HEAD(&bucket->head);
221                 spin_lock_init(&bucket->lock);
222         }
223         ll_get_random_bytes(&handle_base, sizeof(handle_base));
224         LASSERT(handle_base != 0ULL);
225
226         return 0;
227 }
228
229 static void cleanup_all_handles(void)
230 {
231         int i;
232
233         for (i = 0; i < HANDLE_HASH_SIZE; i++) {
234                 struct list_head *tmp, *pos;
235                 spin_lock(&handle_hash[i].lock);
236                 list_for_each_safe_rcu(tmp, pos, &(handle_hash[i].head)) {
237                         struct portals_handle *h;
238                         h = list_entry(tmp, struct portals_handle, h_link);
239
240                         CERROR("force clean handle "LPX64" addr %p addref %p\n",
241                                h->h_cookie, h, h->h_addref);
242
243                         class_handle_unhash_nolock(h);
244                 }
245                 spin_unlock(&handle_hash[i].lock);
246         }
247 }
248
249 void class_handle_cleanup(void)
250 {
251         int count;
252         LASSERT(handle_hash != NULL);
253
254         count = atomic_read(&handle_count);
255         if (count != 0) {
256                 CERROR("handle_count at cleanup: %d\n", count);
257                 cleanup_all_handles();
258         }
259
260         OBD_VFREE(handle_hash, sizeof(*handle_hash) * HANDLE_HASH_SIZE);
261         handle_hash = NULL;
262
263         if (atomic_read(&handle_count))
264                 CERROR("leaked %d handles\n", atomic_read(&handle_count));
265 }