Whamcloud - gitweb
- add resource and lock counters to the namespace
[fs/lustre-release.git] / lustre / ldlm / ldlm_resource.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define DEBUG_SUBSYSTEM S_LDLM
13
14 #include <linux/lustre_dlm.h>
15 #include <linux/obd_class.h>
16
17 kmem_cache_t *ldlm_resource_slab, *ldlm_lock_slab;
18
19 spinlock_t ldlm_namespace_lock = SPIN_LOCK_UNLOCKED;
20 struct list_head ldlm_namespace_list = LIST_HEAD_INIT(ldlm_namespace_list);
21 static struct proc_dir_entry *ldlm_ns_proc_dir = NULL;
22
23 int ldlm_proc_setup(struct obd_device *obd)
24 {
25         ENTRY;
26
27         if (obd->obd_proc_entry == NULL)
28                 RETURN(-EINVAL);
29
30         ldlm_ns_proc_dir = proc_mkdir("namespaces", obd->obd_proc_entry);
31         if (ldlm_ns_proc_dir == NULL) {
32                 CERROR("Couldn't create /proc/lustre/ldlm/namespaces\n");
33                 RETURN(-EPERM);
34         }
35         RETURN(0);
36 }
37
38 void ldlm_proc_cleanup(struct obd_device *obd)
39 {
40         proc_lustre_remove_obd_entry("namespaces", obd);
41 }
42
43 /* FIXME: This can go away when we start to really use lprocfs */
44 static int lprocfs_ll_rd(char *page, char **start, off_t off,
45                          int count, int *eof, void *data)
46 {
47         int len;
48         __u64 *temp = (__u64 *)data;
49
50         len = snprintf(page, count, "%Lu\n", *temp);
51
52         return len;
53 }
54
55 struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client)
56 {
57         struct ldlm_namespace *ns = NULL;
58         struct list_head *bucket;
59         struct proc_dir_entry *proc_entry;
60
61         OBD_ALLOC(ns, sizeof(*ns));
62         if (!ns) {
63                 LBUG();
64                 GOTO(out, NULL);
65         }
66
67         ns->ns_hash = vmalloc(sizeof(*ns->ns_hash) * RES_HASH_SIZE);
68         if (!ns->ns_hash) {
69                 LBUG();
70                 GOTO(out, ns);
71         }
72         obd_memory += sizeof(*ns->ns_hash) * RES_HASH_SIZE;
73
74         OBD_ALLOC(ns->ns_name, strlen(name) + 1);
75         if (!ns->ns_name) {
76                 LBUG();
77                 GOTO(out, ns);
78         }
79         strcpy(ns->ns_name, name);
80
81         INIT_LIST_HEAD(&ns->ns_root_list);
82         l_lock_init(&ns->ns_lock);
83         ns->ns_refcount = 0;
84         ns->ns_client = client;
85         spin_lock_init(&ns->ns_counter_lock);
86         ns->ns_locks = 0;
87         ns->ns_resources = 0;
88
89         for (bucket = ns->ns_hash + RES_HASH_SIZE - 1; bucket >= ns->ns_hash;
90              bucket--)
91                 INIT_LIST_HEAD(bucket);
92
93         spin_lock(&ldlm_namespace_lock);
94         list_add(&ns->ns_list_chain, &ldlm_namespace_list);
95         spin_unlock(&ldlm_namespace_lock);
96
97         ns->ns_proc_dir = proc_mkdir(ns->ns_name, ldlm_ns_proc_dir);
98         if (ns->ns_proc_dir == NULL)
99                 CERROR("Unable to create proc directory for namespace.\n");
100         proc_entry = create_proc_entry("resource_count", 0444, ns->ns_proc_dir);
101         proc_entry->read_proc = lprocfs_ll_rd;
102         proc_entry->data = &ns->ns_resources;
103         proc_entry = create_proc_entry("lock_count", 0444, ns->ns_proc_dir);
104         proc_entry->read_proc = lprocfs_ll_rd;
105         proc_entry->data = &ns->ns_locks;
106
107         RETURN(ns);
108
109  out:
110         if (ns && ns->ns_hash) {
111                 vfree(ns->ns_hash);
112                 obd_memory -= sizeof(*ns->ns_hash) * RES_HASH_SIZE;
113         }
114         if (ns && ns->ns_name)
115                 OBD_FREE(ns->ns_name, strlen(name) + 1);
116         if (ns)
117                 OBD_FREE(ns, sizeof(*ns));
118         return NULL;
119 }
120
121 extern struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock);
122
123 static void cleanup_resource(struct ldlm_resource *res, struct list_head *q)
124 {
125         struct list_head *tmp, *pos;
126         int rc = 0, client = res->lr_namespace->ns_client;
127         ENTRY;
128
129         list_for_each_safe(tmp, pos, q) {
130                 struct ldlm_lock *lock;
131                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
132                 LDLM_LOCK_GET(lock);
133
134                 if (client) {
135                         struct lustre_handle lockh;
136                         ldlm_lock2handle(lock, &lockh);
137                         /* can we get away without a connh here? */
138                         rc = ldlm_cli_cancel(&lockh);
139                         if (rc != ELDLM_OK) {
140                                 /* It failed remotely, but we'll force it to
141                                  * cleanup locally. */
142                                 CERROR("ldlm_cli_cancel: %d\n", rc);
143                                 ldlm_lock_cancel(lock);
144                         }
145                 } else {
146                         LDLM_DEBUG(lock, "Freeing a lock still held by a "
147                                    "client node.\n");
148
149                         ldlm_resource_unlink_lock(lock);
150                         ldlm_lock_destroy(lock);
151                 }
152                 LDLM_LOCK_PUT(lock);
153         }
154 }
155
156 int ldlm_namespace_free(struct ldlm_namespace *ns)
157 {
158         struct list_head *tmp, *pos;
159         int i;
160
161         if (!ns)
162                 RETURN(ELDLM_OK);
163
164         spin_lock(&ldlm_namespace_lock);
165         list_del(&ns->ns_list_chain);
166         remove_proc_entry("resource_count", ns->ns_proc_dir);
167         remove_proc_entry("lock_count", ns->ns_proc_dir);
168         remove_proc_entry(ns->ns_name, ldlm_ns_proc_dir);
169         spin_unlock(&ldlm_namespace_lock);
170
171         l_lock(&ns->ns_lock);
172
173         for (i = 0; i < RES_HASH_SIZE; i++) {
174                 list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
175                         struct ldlm_resource *res;
176                         res = list_entry(tmp, struct ldlm_resource, lr_hash);
177                         ldlm_resource_getref(res);
178
179                         cleanup_resource(res, &res->lr_granted);
180                         cleanup_resource(res, &res->lr_converting);
181                         cleanup_resource(res, &res->lr_waiting);
182
183                         if (!ldlm_resource_put(res)) {
184                                 CERROR("Resource refcount nonzero (%d) after "
185                                        "lock cleanup; forcing cleanup.\n",
186                                        atomic_read(&res->lr_refcount));
187                                 ldlm_resource_dump(res);
188                                 atomic_set(&res->lr_refcount, 1);
189                                 ldlm_resource_put(res);
190                         }
191                 }
192         }
193
194         vfree(ns->ns_hash /* , sizeof(*ns->ns_hash) * RES_HASH_SIZE */);
195         obd_memory -= sizeof(*ns->ns_hash) * RES_HASH_SIZE;
196         OBD_FREE(ns->ns_name, strlen(ns->ns_name) + 1);
197         OBD_FREE(ns, sizeof(*ns));
198
199         return ELDLM_OK;
200 }
201
202 int ldlm_client_free(struct obd_export *exp)
203 {
204         struct ldlm_export_data *led = &exp->exp_ldlm_data;
205         ptlrpc_cleanup_client(&led->led_import);
206         RETURN(0);
207 }
208
209 static __u32 ldlm_hash_fn(struct ldlm_resource *parent, __u64 *name)
210 {
211         __u32 hash = 0;
212         int i;
213
214         for (i = 0; i < RES_NAME_SIZE; i++)
215                 hash += name[i];
216
217         hash += (__u32)((unsigned long)parent >> 4);
218
219         return (hash & RES_HASH_MASK);
220 }
221
222 static struct ldlm_resource *ldlm_resource_new(void)
223 {
224         struct ldlm_resource *res;
225
226         res = kmem_cache_alloc(ldlm_resource_slab, SLAB_KERNEL);
227         if (res == NULL) {
228                 LBUG();
229                 return NULL;
230         }
231         memset(res, 0, sizeof(*res));
232
233         INIT_LIST_HEAD(&res->lr_children);
234         INIT_LIST_HEAD(&res->lr_childof);
235         INIT_LIST_HEAD(&res->lr_granted);
236         INIT_LIST_HEAD(&res->lr_converting);
237         INIT_LIST_HEAD(&res->lr_waiting);
238
239         atomic_set(&res->lr_refcount, 1);
240
241         return res;
242 }
243
244 /* Args: locked namespace
245  * Returns: newly-allocated, referenced, unlocked resource */
246 static struct ldlm_resource *ldlm_resource_add(struct ldlm_namespace *ns,
247                                                struct ldlm_resource *parent,
248                                                __u64 *name, __u32 type)
249 {
250         struct list_head *bucket;
251         struct ldlm_resource *res;
252         ENTRY;
253
254         if (type < LDLM_MIN_TYPE || type > LDLM_MAX_TYPE) {
255                 LBUG();
256                 RETURN(NULL);
257         }
258
259         res = ldlm_resource_new();
260         if (!res) {
261                 LBUG();
262                 RETURN(NULL);
263         }
264
265         spin_lock(&ns->ns_counter_lock);
266         ns->ns_resources++;
267         spin_unlock(&ns->ns_counter_lock);
268
269         memcpy(res->lr_name, name, sizeof(res->lr_name));
270         res->lr_namespace = ns;
271         ns->ns_refcount++;
272
273         res->lr_type = type;
274         res->lr_most_restr = LCK_NL;
275
276         bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
277         list_add(&res->lr_hash, bucket);
278
279         if (parent == NULL)
280                 list_add(&res->lr_childof, &ns->ns_root_list);
281         else {
282                 res->lr_parent = parent;
283                 list_add(&res->lr_childof, &parent->lr_children);
284         }
285
286         RETURN(res);
287 }
288
289 /* Args: unlocked namespace
290  * Locks: takes and releases ns->ns_lock and res->lr_lock
291  * Returns: referenced, unlocked ldlm_resource or NULL */
292 struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
293                                         struct ldlm_resource *parent,
294                                         __u64 *name, __u32 type, int create)
295 {
296         struct list_head *bucket;
297         struct list_head *tmp = bucket;
298         struct ldlm_resource *res = NULL;
299         ENTRY;
300
301         if (ns == NULL || ns->ns_hash == NULL) {
302                 LBUG();
303                 RETURN(NULL);
304         }
305
306         l_lock(&ns->ns_lock);
307         bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
308
309         list_for_each(tmp, bucket) {
310                 struct ldlm_resource *chk;
311                 chk = list_entry(tmp, struct ldlm_resource, lr_hash);
312
313                 if (memcmp(chk->lr_name, name, sizeof(chk->lr_name)) == 0) {
314                         res = chk;
315                         atomic_inc(&res->lr_refcount);
316                         EXIT;
317                         break;
318                 }
319         }
320
321         if (res == NULL && create)
322                 res = ldlm_resource_add(ns, parent, name, type);
323         l_unlock(&ns->ns_lock);
324
325         RETURN(res);
326 }
327
328 struct ldlm_resource *ldlm_resource_getref(struct ldlm_resource *res)
329 {
330         atomic_inc(&res->lr_refcount);
331         return res;
332 }
333
334 /* Returns 1 if the resource was freed, 0 if it remains. */
335 int ldlm_resource_put(struct ldlm_resource *res)
336 {
337         int rc = 0;
338
339         if (atomic_dec_and_test(&res->lr_refcount)) {
340                 struct ldlm_namespace *ns = res->lr_namespace;
341                 ENTRY;
342
343                 l_lock(&ns->ns_lock);
344
345                 if (atomic_read(&res->lr_refcount) != 0) {
346                         /* We lost the race. */
347                         l_unlock(&ns->ns_lock);
348                         goto out;
349                 }
350
351                 if (!list_empty(&res->lr_granted))
352                         LBUG();
353
354                 if (!list_empty(&res->lr_converting))
355                         LBUG();
356
357                 if (!list_empty(&res->lr_waiting))
358                         LBUG();
359
360                 if (!list_empty(&res->lr_children))
361                         LBUG();
362
363                 ns->ns_refcount--;
364                 list_del(&res->lr_hash);
365                 list_del(&res->lr_childof);
366
367                 kmem_cache_free(ldlm_resource_slab, res);
368                 l_unlock(&ns->ns_lock);
369
370                 spin_lock(&ns->ns_counter_lock);
371                 ns->ns_resources--;
372                 spin_unlock(&ns->ns_counter_lock);
373
374                 rc = 1;
375         } else {
376                 ENTRY;
377         out:
378                 if (atomic_read(&res->lr_refcount) < 0)
379                         LBUG();
380         }
381
382         RETURN(rc);
383 }
384
385 void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
386                             struct ldlm_lock *lock)
387 {
388         l_lock(&res->lr_namespace->ns_lock);
389
390         ldlm_resource_dump(res);
391         ldlm_lock_dump(lock);
392
393         if (!list_empty(&lock->l_res_link))
394                 LBUG();
395
396         list_add(&lock->l_res_link, head);
397         l_unlock(&res->lr_namespace->ns_lock);
398 }
399
400 void ldlm_resource_unlink_lock(struct ldlm_lock *lock)
401 {
402         l_lock(&lock->l_resource->lr_namespace->ns_lock);
403         list_del_init(&lock->l_res_link);
404         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
405 }
406
407 void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc)
408 {
409         desc->lr_type = res->lr_type;
410         memcpy(desc->lr_name, res->lr_name, sizeof(desc->lr_name));
411         memcpy(desc->lr_version, res->lr_version, sizeof(desc->lr_version));
412 }
413
414 void ldlm_dump_all_namespaces(void)
415 {
416         struct list_head *tmp;
417
418         spin_lock(&ldlm_namespace_lock);
419
420         list_for_each(tmp, &ldlm_namespace_list) {
421                 struct ldlm_namespace *ns;
422                 ns = list_entry(tmp, struct ldlm_namespace, ns_list_chain);
423                 ldlm_namespace_dump(ns);
424         }
425
426         spin_unlock(&ldlm_namespace_lock);
427 }
428
429 void ldlm_namespace_dump(struct ldlm_namespace *ns)
430 {
431         struct list_head *tmp;
432
433         l_lock(&ns->ns_lock);
434         CDEBUG(D_OTHER, "--- Namespace: %s (rc: %d, client: %d)\n", ns->ns_name,
435                ns->ns_refcount, ns->ns_client);
436
437         list_for_each(tmp, &ns->ns_root_list) {
438                 struct ldlm_resource *res;
439                 res = list_entry(tmp, struct ldlm_resource, lr_childof);
440
441                 /* Once we have resources with children, this should really dump
442                  * them recursively. */
443                 ldlm_resource_dump(res);
444         }
445         l_unlock(&ns->ns_lock);
446 }
447
448 void ldlm_resource_dump(struct ldlm_resource *res)
449 {
450         struct list_head *tmp;
451         char name[256];
452
453         if (RES_NAME_SIZE != 3)
454                 LBUG();
455
456         snprintf(name, sizeof(name), "%Lx %Lx %Lx",
457                  (unsigned long long)res->lr_name[0],
458                  (unsigned long long)res->lr_name[1],
459                  (unsigned long long)res->lr_name[2]);
460
461         CDEBUG(D_OTHER, "--- Resource: %p (%s) (rc: %d)\n", res, name,
462                atomic_read(&res->lr_refcount));
463         CDEBUG(D_OTHER, "Namespace: %p (%s)\n", res->lr_namespace,
464                res->lr_namespace->ns_name);
465         CDEBUG(D_OTHER, "Parent: %p, root: %p\n", res->lr_parent, res->lr_root);
466
467         CDEBUG(D_OTHER, "Granted locks:\n");
468         list_for_each(tmp, &res->lr_granted) {
469                 struct ldlm_lock *lock;
470                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
471                 ldlm_lock_dump(lock);
472         }
473
474         CDEBUG(D_OTHER, "Converting locks:\n");
475         list_for_each(tmp, &res->lr_converting) {
476                 struct ldlm_lock *lock;
477                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
478                 ldlm_lock_dump(lock);
479         }
480
481         CDEBUG(D_OTHER, "Waiting locks:\n");
482         list_for_each(tmp, &res->lr_waiting) {
483                 struct ldlm_lock *lock;
484                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
485                 ldlm_lock_dump(lock);
486         }
487 }