Whamcloud - gitweb
- Replace per-namespace recursive lock with an ldlm-global lock, to close the
[fs/lustre-release.git] / lustre / ldlm / ldlm_resource.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define DEBUG_SUBSYSTEM S_LDLM
13
14 #include <linux/lustre_dlm.h>
15 #include <linux/obd_class.h>
16
17 kmem_cache_t *ldlm_resource_slab, *ldlm_lock_slab;
18
19 spinlock_t ldlm_namespace_lock = SPIN_LOCK_UNLOCKED;
20 struct list_head ldlm_namespace_list = LIST_HEAD_INIT(ldlm_namespace_list);
21 static struct proc_dir_entry *ldlm_ns_proc_dir = NULL;
22
23 int ldlm_proc_setup(struct obd_device *obd)
24 {
25         ENTRY;
26
27         if (obd->obd_proc_entry == NULL)
28                 RETURN(-EINVAL);
29
30         ldlm_ns_proc_dir = proc_mkdir("namespaces", obd->obd_proc_entry);
31         if (ldlm_ns_proc_dir == NULL) {
32                 CERROR("Couldn't create /proc/lustre/ldlm/namespaces\n");
33                 RETURN(-EPERM);
34         }
35         RETURN(0);
36 }
37
38 void ldlm_proc_cleanup(struct obd_device *obd)
39 {
40         proc_lustre_remove_obd_entry("namespaces", obd);
41 }
42
/* FIXME: This can go away when we start to really use lprocfs */
/* procfs read_proc handler: print the 64-bit counter that 'data' points at
 * as an unsigned decimal followed by a newline.
 *
 * page:  output buffer supplied by procfs
 * count: capacity of 'page' in bytes
 * eof:   set to 1, because the whole value is always produced in one call
 * data:  pointer to the __u64 to print
 * 'start' and 'off' are unused.
 *
 * Returns the number of bytes actually stored in 'page' (excluding the
 * terminating NUL); never more than count - 1. */
static int lprocfs_ll_rd(char *page, char **start, off_t off,
                         int count, int *eof, void *data)
{
        __u64 *temp = (__u64 *)data;
        int len;

        *eof = 1;

        if (count <= 0)
                return 0;

        /* __u64 is not 'unsigned long long' on every architecture, so cast
         * explicitly to match the %Lu conversion. */
        len = snprintf(page, count, "%Lu\n", (unsigned long long)*temp);

        /* snprintf reports the length it wanted to write; clamp it to what
         * actually fits so the caller never reads past the buffer. */
        if (len < 0)
                len = 0;
        else if (len >= count)
                len = count - 1;

        return len;
}
54
55 struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client)
56 {
57         struct ldlm_namespace *ns = NULL;
58         struct list_head *bucket;
59         struct proc_dir_entry *proc_entry;
60
61         OBD_ALLOC(ns, sizeof(*ns));
62         if (!ns) {
63                 LBUG();
64                 GOTO(out, NULL);
65         }
66
67         ns->ns_hash = vmalloc(sizeof(*ns->ns_hash) * RES_HASH_SIZE);
68         if (!ns->ns_hash) {
69                 LBUG();
70                 GOTO(out, ns);
71         }
72         obd_memory += sizeof(*ns->ns_hash) * RES_HASH_SIZE;
73
74         OBD_ALLOC(ns->ns_name, strlen(name) + 1);
75         if (!ns->ns_name) {
76                 LBUG();
77                 GOTO(out, ns);
78         }
79         strcpy(ns->ns_name, name);
80
81         INIT_LIST_HEAD(&ns->ns_root_list);
82         ns->ns_refcount = 0;
83         ns->ns_client = client;
84         spin_lock_init(&ns->ns_counter_lock);
85         ns->ns_locks = 0;
86         ns->ns_resources = 0;
87
88         for (bucket = ns->ns_hash + RES_HASH_SIZE - 1; bucket >= ns->ns_hash;
89              bucket--)
90                 INIT_LIST_HEAD(bucket);
91
92         spin_lock(&ldlm_namespace_lock);
93         list_add(&ns->ns_list_chain, &ldlm_namespace_list);
94         spin_unlock(&ldlm_namespace_lock);
95
96         ns->ns_proc_dir = proc_mkdir(ns->ns_name, ldlm_ns_proc_dir);
97         if (ns->ns_proc_dir == NULL)
98                 CERROR("Unable to create proc directory for namespace.\n");
99         proc_entry = create_proc_entry("resource_count", 0444, ns->ns_proc_dir);
100         proc_entry->read_proc = lprocfs_ll_rd;
101         proc_entry->data = &ns->ns_resources;
102         proc_entry = create_proc_entry("lock_count", 0444, ns->ns_proc_dir);
103         proc_entry->read_proc = lprocfs_ll_rd;
104         proc_entry->data = &ns->ns_locks;
105
106         RETURN(ns);
107
108  out:
109         if (ns && ns->ns_hash) {
110                 vfree(ns->ns_hash);
111                 obd_memory -= sizeof(*ns->ns_hash) * RES_HASH_SIZE;
112         }
113         if (ns && ns->ns_name)
114                 OBD_FREE(ns->ns_name, strlen(name) + 1);
115         if (ns)
116                 OBD_FREE(ns, sizeof(*ns));
117         return NULL;
118 }
119
120 extern struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock);
121
122 /* If 'local_only' is true, don't try to tell the server, just cleanup. */
123 static void cleanup_resource(struct ldlm_resource *res, struct list_head *q,
124                              int local_only)
125 {
126         struct list_head *tmp, *pos;
127         int rc = 0, client = res->lr_namespace->ns_client;
128         ENTRY;
129
130         list_for_each_safe(tmp, pos, q) {
131                 struct ldlm_lock *lock;
132                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
133                 LDLM_LOCK_GET(lock);
134
135                 if (client) {
136                         struct lustre_handle lockh;
137                         ldlm_lock2handle(lock, &lockh);
138                         if (!local_only) {
139                                 rc = ldlm_cli_cancel(&lockh);
140                                 if (rc)
141                                         CERROR("ldlm_cli_cancel: %d\n", rc);
142                         }
143                         /* Force local cleanup on errors, too. */
144                         if (local_only || rc != ELDLM_OK)
145                                 ldlm_lock_cancel(lock);
146                 } else {
147                         LDLM_DEBUG(lock, "Freeing a lock still held by a "
148                                    "client node.\n");
149
150                         ldlm_resource_unlink_lock(lock);
151                         ldlm_lock_destroy(lock);
152                 }
153                 LDLM_LOCK_PUT(lock);
154         }
155 }
156
157 int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int local_only)
158 {
159         int i;
160
161         l_lock(&ldlm_everything_lock);
162         for (i = 0; i < RES_HASH_SIZE; i++) {
163                 struct list_head *tmp, *pos;
164                 list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
165                         struct ldlm_resource *res;
166                         res = list_entry(tmp, struct ldlm_resource, lr_hash);
167                         ldlm_resource_getref(res);
168
169                         cleanup_resource(res, &res->lr_granted, local_only);
170                         cleanup_resource(res, &res->lr_converting, local_only);
171                         cleanup_resource(res, &res->lr_waiting, local_only);
172
173                         /* XXX this is a bit counter-intuitive and should
174                          * probably be cleaner: don't force cleanup if we're
175                          * local_only (which is only used by recovery).  We
176                          * probably still have outstanding lock refs which
177                          * reference these resources. -phil */
178                         if (!ldlm_resource_put(res) && !local_only) {
179                                 CERROR("Resource refcount nonzero (%d) after "
180                                        "lock cleanup; forcing cleanup.\n",
181                                        atomic_read(&res->lr_refcount));
182                                 ldlm_resource_dump(res);
183                                 atomic_set(&res->lr_refcount, 1);
184                                 ldlm_resource_put(res);
185                         }
186                 }
187         }
188         l_unlock(&ldlm_everything_lock);
189
190         return ELDLM_OK;
191 }
192
193 /* Cleanup, but also free, the namespace */
194 int ldlm_namespace_free(struct ldlm_namespace *ns)
195 {
196         if (!ns)
197                 RETURN(ELDLM_OK);
198
199         spin_lock(&ldlm_namespace_lock);
200         list_del(&ns->ns_list_chain);
201         remove_proc_entry("resource_count", ns->ns_proc_dir);
202         remove_proc_entry("lock_count", ns->ns_proc_dir);
203         remove_proc_entry(ns->ns_name, ldlm_ns_proc_dir);
204         spin_unlock(&ldlm_namespace_lock);
205
206         ldlm_namespace_cleanup(ns, 0);
207
208         vfree(ns->ns_hash /* , sizeof(*ns->ns_hash) * RES_HASH_SIZE */);
209         obd_memory -= sizeof(*ns->ns_hash) * RES_HASH_SIZE;
210         OBD_FREE(ns->ns_name, strlen(ns->ns_name) + 1);
211         OBD_FREE(ns, sizeof(*ns));
212
213         return ELDLM_OK;
214 }
215
216 int ldlm_client_free(struct obd_export *exp)
217 {
218         struct ldlm_export_data *led = &exp->exp_ldlm_data;
219         ptlrpc_cleanup_client(&led->led_import);
220         RETURN(0);
221 }
222
223 static __u32 ldlm_hash_fn(struct ldlm_resource *parent, __u64 *name)
224 {
225         __u32 hash = 0;
226         int i;
227
228         for (i = 0; i < RES_NAME_SIZE; i++)
229                 hash += name[i];
230
231         hash += (__u32)((unsigned long)parent >> 4);
232
233         return (hash & RES_HASH_MASK);
234 }
235
236 static struct ldlm_resource *ldlm_resource_new(void)
237 {
238         struct ldlm_resource *res;
239
240         res = kmem_cache_alloc(ldlm_resource_slab, SLAB_KERNEL);
241         if (res == NULL) {
242                 LBUG();
243                 return NULL;
244         }
245         memset(res, 0, sizeof(*res));
246
247         INIT_LIST_HEAD(&res->lr_children);
248         INIT_LIST_HEAD(&res->lr_childof);
249         INIT_LIST_HEAD(&res->lr_granted);
250         INIT_LIST_HEAD(&res->lr_converting);
251         INIT_LIST_HEAD(&res->lr_waiting);
252
253         atomic_set(&res->lr_refcount, 1);
254
255         return res;
256 }
257
258 /* Args: locked namespace
259  * Returns: newly-allocated, referenced, unlocked resource */
260 static struct ldlm_resource *ldlm_resource_add(struct ldlm_namespace *ns,
261                                                struct ldlm_resource *parent,
262                                                __u64 *name, __u32 type)
263 {
264         struct list_head *bucket;
265         struct ldlm_resource *res;
266         ENTRY;
267
268         if (type < LDLM_MIN_TYPE || type > LDLM_MAX_TYPE) {
269                 LBUG();
270                 RETURN(NULL);
271         }
272
273         res = ldlm_resource_new();
274         if (!res) {
275                 LBUG();
276                 RETURN(NULL);
277         }
278
279         spin_lock(&ns->ns_counter_lock);
280         ns->ns_resources++;
281         spin_unlock(&ns->ns_counter_lock);
282
283         memcpy(res->lr_name, name, sizeof(res->lr_name));
284         res->lr_namespace = ns;
285         ns->ns_refcount++;
286
287         res->lr_type = type;
288         res->lr_most_restr = LCK_NL;
289
290         bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
291         list_add(&res->lr_hash, bucket);
292
293         if (parent == NULL)
294                 list_add(&res->lr_childof, &ns->ns_root_list);
295         else {
296                 res->lr_parent = parent;
297                 list_add(&res->lr_childof, &parent->lr_children);
298         }
299
300         RETURN(res);
301 }
302
303 /* Args: unlocked namespace
304  * Locks: takes and releases ns->ns_lock and res->lr_lock
305  * Returns: referenced, unlocked ldlm_resource or NULL */
306 struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
307                                         struct ldlm_resource *parent,
308                                         __u64 *name, __u32 type, int create)
309 {
310         struct list_head *bucket;
311         struct list_head *tmp = bucket;
312         struct ldlm_resource *res = NULL;
313         ENTRY;
314
315         if (ns == NULL || ns->ns_hash == NULL) {
316                 LBUG();
317                 RETURN(NULL);
318         }
319
320         l_lock(&ldlm_everything_lock);
321         bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
322
323         list_for_each(tmp, bucket) {
324                 struct ldlm_resource *chk;
325                 chk = list_entry(tmp, struct ldlm_resource, lr_hash);
326
327                 if (memcmp(chk->lr_name, name, sizeof(chk->lr_name)) == 0) {
328                         res = chk;
329                         atomic_inc(&res->lr_refcount);
330                         EXIT;
331                         break;
332                 }
333         }
334
335         if (res == NULL && create)
336                 res = ldlm_resource_add(ns, parent, name, type);
337         l_unlock(&ldlm_everything_lock);
338
339         RETURN(res);
340 }
341
342 struct ldlm_resource *ldlm_resource_getref(struct ldlm_resource *res)
343 {
344         atomic_inc(&res->lr_refcount);
345         return res;
346 }
347
348 /* Returns 1 if the resource was freed, 0 if it remains. */
349 int ldlm_resource_put(struct ldlm_resource *res)
350 {
351         int rc = 0;
352
353         if (atomic_dec_and_test(&res->lr_refcount)) {
354                 struct ldlm_namespace *ns = res->lr_namespace;
355                 ENTRY;
356
357                 l_lock(&ldlm_everything_lock);
358
359                 if (atomic_read(&res->lr_refcount) != 0) {
360                         /* We lost the race. */
361                         l_unlock(&ldlm_everything_lock);
362                         goto out;
363                 }
364
365                 if (!list_empty(&res->lr_granted))
366                         LBUG();
367
368                 if (!list_empty(&res->lr_converting))
369                         LBUG();
370
371                 if (!list_empty(&res->lr_waiting))
372                         LBUG();
373
374                 if (!list_empty(&res->lr_children))
375                         LBUG();
376
377                 ns->ns_refcount--;
378                 list_del(&res->lr_hash);
379                 list_del(&res->lr_childof);
380
381                 kmem_cache_free(ldlm_resource_slab, res);
382                 l_unlock(&ldlm_everything_lock);
383
384                 spin_lock(&ns->ns_counter_lock);
385                 ns->ns_resources--;
386                 spin_unlock(&ns->ns_counter_lock);
387
388                 rc = 1;
389         } else {
390                 ENTRY;
391         out:
392                 if (atomic_read(&res->lr_refcount) < 0)
393                         LBUG();
394         }
395
396         RETURN(rc);
397 }
398
399 void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
400                             struct ldlm_lock *lock)
401 {
402         l_lock(&ldlm_everything_lock);
403
404         ldlm_resource_dump(res);
405         ldlm_lock_dump(lock);
406
407         if (!list_empty(&lock->l_res_link))
408                 LBUG();
409
410         list_add(&lock->l_res_link, head);
411         l_unlock(&ldlm_everything_lock);
412 }
413
414 void ldlm_resource_unlink_lock(struct ldlm_lock *lock)
415 {
416         l_lock(&ldlm_everything_lock);
417         list_del_init(&lock->l_res_link);
418         l_unlock(&ldlm_everything_lock);
419 }
420
421 void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc)
422 {
423         desc->lr_type = res->lr_type;
424         memcpy(desc->lr_name, res->lr_name, sizeof(desc->lr_name));
425         memcpy(desc->lr_version, res->lr_version, sizeof(desc->lr_version));
426 }
427
428 void ldlm_dump_all_namespaces(void)
429 {
430         struct list_head *tmp;
431
432         spin_lock(&ldlm_namespace_lock);
433
434         list_for_each(tmp, &ldlm_namespace_list) {
435                 struct ldlm_namespace *ns;
436                 ns = list_entry(tmp, struct ldlm_namespace, ns_list_chain);
437                 ldlm_namespace_dump(ns);
438         }
439
440         spin_unlock(&ldlm_namespace_lock);
441 }
442
443 void ldlm_namespace_dump(struct ldlm_namespace *ns)
444 {
445         struct list_head *tmp;
446
447         l_lock(&ldlm_everything_lock);
448         CDEBUG(D_OTHER, "--- Namespace: %s (rc: %d, client: %d)\n", ns->ns_name,
449                ns->ns_refcount, ns->ns_client);
450
451         list_for_each(tmp, &ns->ns_root_list) {
452                 struct ldlm_resource *res;
453                 res = list_entry(tmp, struct ldlm_resource, lr_childof);
454
455                 /* Once we have resources with children, this should really dump
456                  * them recursively. */
457                 ldlm_resource_dump(res);
458         }
459         l_unlock(&ldlm_everything_lock);
460 }
461
462 void ldlm_resource_dump(struct ldlm_resource *res)
463 {
464         struct list_head *tmp;
465         char name[256];
466
467         if (RES_NAME_SIZE != 3)
468                 LBUG();
469
470         snprintf(name, sizeof(name), "%Lx %Lx %Lx",
471                  (unsigned long long)res->lr_name[0],
472                  (unsigned long long)res->lr_name[1],
473                  (unsigned long long)res->lr_name[2]);
474
475         CDEBUG(D_OTHER, "--- Resource: %p (%s) (rc: %d)\n", res, name,
476                atomic_read(&res->lr_refcount));
477         CDEBUG(D_OTHER, "Namespace: %p (%s)\n", res->lr_namespace,
478                res->lr_namespace->ns_name);
479         CDEBUG(D_OTHER, "Parent: %p, root: %p\n", res->lr_parent, res->lr_root);
480
481         CDEBUG(D_OTHER, "Granted locks:\n");
482         list_for_each(tmp, &res->lr_granted) {
483                 struct ldlm_lock *lock;
484                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
485                 ldlm_lock_dump(lock);
486         }
487
488         CDEBUG(D_OTHER, "Converting locks:\n");
489         list_for_each(tmp, &res->lr_converting) {
490                 struct ldlm_lock *lock;
491                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
492                 ldlm_lock_dump(lock);
493         }
494
495         CDEBUG(D_OTHER, "Waiting locks:\n");
496         list_for_each(tmp, &res->lr_waiting) {
497                 struct ldlm_lock *lock;
498                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
499                 ldlm_lock_dump(lock);
500         }
501 }