Whamcloud - gitweb
- cleanups in ldlm_pool.c - all pools recalc is moved to separate function ldlm_pools...
[fs/lustre-release.git] / lustre / ldlm / ldlm_resource.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Phil Schwan <phil@clusterfs.com>
6  *   Author: Peter Braam <braam@clusterfs.com>
7  *
8  *   This file is part of the Lustre file system, http://www.lustre.org
9  *   Lustre is a trademark of Cluster File Systems, Inc.
10  *
11  *   You may have signed or agreed to another license before downloading
12  *   this software.  If so, you are bound by the terms and conditions
13  *   of that agreement, and the following does not apply to you.  See the
14  *   LICENSE file included with this distribution for more information.
15  *
16  *   If you did not agree to a different license, then this copy of Lustre
17  *   is open source software; you can redistribute it and/or modify it
18  *   under the terms of version 2 of the GNU General Public License as
19  *   published by the Free Software Foundation.
20  *
21  *   In either case, Lustre is distributed in the hope that it will be
22  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
23  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24  *   license text for more details.
25  */
26
27 #define DEBUG_SUBSYSTEM S_LDLM
28 #ifdef __KERNEL__
29 # include <lustre_dlm.h>
30 #else
31 # include <liblustre.h>
32 #endif
33
34 #include <obd_class.h>
35 #include "ldlm_internal.h"
36
/* Slab caches; ldlm_resource_slab backs struct ldlm_resource allocations in
 * this file, ldlm_lock_slab is presumably its counterpart for locks. */
cfs_mem_cache_t *ldlm_resource_slab, *ldlm_lock_slab;

/* Number of namespaces currently registered on the server and client side;
 * selected through ldlm_namespace_nr(). */
atomic_t ldlm_srv_namespace_nr = ATOMIC_INIT(0);
atomic_t ldlm_cli_namespace_nr = ATOMIC_INIT(0);
/* Held while ldlm_namespace_list and the two counters above are modified. */
struct semaphore ldlm_namespace_lock;
/* All registered namespaces, chained through ns_list_chain. */
struct list_head ldlm_namespace_list = CFS_LIST_HEAD_INIT(ldlm_namespace_list);
/* procfs directories created by ldlm_proc_setup(): the ldlm root and its
 * "namespaces" and "services" children. */
cfs_proc_dir_entry_t *ldlm_type_proc_dir = NULL;
cfs_proc_dir_entry_t *ldlm_ns_proc_dir = NULL;
cfs_proc_dir_entry_t *ldlm_svc_proc_dir = NULL;
46
47 #ifdef LPROCFS
48 static int ldlm_proc_dump_ns(struct file *file, const char *buffer,
49                              unsigned long count, void *data)
50 {
51         ldlm_dump_all_namespaces(D_DLMTRACE);
52         RETURN(count);
53 }
54
55 int ldlm_proc_setup(void)
56 {
57         int rc;
58         struct lprocfs_vars list[] = {
59                 { "dump_namespaces", NULL, ldlm_proc_dump_ns, NULL },
60                 { NULL }};
61         ENTRY;
62         LASSERT(ldlm_ns_proc_dir == NULL);
63
64         ldlm_type_proc_dir = lprocfs_register(OBD_LDLM_DEVICENAME,
65                                               proc_lustre_root,
66                                               NULL, NULL);
67         if (IS_ERR(ldlm_type_proc_dir)) {
68                 CERROR("LProcFS failed in ldlm-init\n");
69                 rc = PTR_ERR(ldlm_type_proc_dir);
70                 GOTO(err, rc);
71         }
72
73         ldlm_ns_proc_dir = lprocfs_register("namespaces",
74                                             ldlm_type_proc_dir,
75                                             NULL, NULL);
76         if (IS_ERR(ldlm_ns_proc_dir)) {
77                 CERROR("LProcFS failed in ldlm-init\n");
78                 rc = PTR_ERR(ldlm_ns_proc_dir);
79                 GOTO(err_type, rc);
80         }
81
82         ldlm_svc_proc_dir = lprocfs_register("services",
83                                             ldlm_type_proc_dir,
84                                             NULL, NULL);
85         if (IS_ERR(ldlm_svc_proc_dir)) {
86                 CERROR("LProcFS failed in ldlm-init\n");
87                 rc = PTR_ERR(ldlm_svc_proc_dir);
88                 GOTO(err_ns, rc);
89         }
90
91         rc = lprocfs_add_vars(ldlm_type_proc_dir, list, NULL);
92
93         RETURN(0);
94
95 err_ns:
96         lprocfs_remove(&ldlm_ns_proc_dir);
97 err_type:
98         lprocfs_remove(&ldlm_type_proc_dir);
99 err:
100         ldlm_svc_proc_dir = NULL;
101         RETURN(rc);
102 }
103
104 void ldlm_proc_cleanup(void)
105 {
106         if (ldlm_svc_proc_dir) 
107                 lprocfs_remove(&ldlm_svc_proc_dir);
108
109         if (ldlm_ns_proc_dir) 
110                 lprocfs_remove(&ldlm_ns_proc_dir);
111
112         if (ldlm_type_proc_dir)
113                 lprocfs_remove(&ldlm_type_proc_dir);
114 }
115
116 static int lprocfs_rd_lru_size(char *page, char **start, off_t off,
117                                int count, int *eof, void *data)
118 {
119         struct ldlm_namespace *ns = data;
120         __u32 *nr = &ns->ns_max_unused;
121
122         if (ns_connect_lru_resize(ns))
123                 nr = &ns->ns_nr_unused;
124         return lprocfs_rd_uint(page, start, off, count, eof, nr);
125 }
126
127 static int lprocfs_wr_lru_size(struct file *file, const char *buffer,
128                                unsigned long count, void *data)
129 {
130         struct ldlm_namespace *ns = data;
131         char dummy[MAX_STRING_SIZE + 1], *end;
132         unsigned long tmp;
133
134         dummy[MAX_STRING_SIZE] = '\0';
135         if (copy_from_user(dummy, buffer, MAX_STRING_SIZE))
136                 return -EFAULT;
137
138         if (count == 6 && memcmp(dummy, "clear", 5) == 0) {
139                 CDEBUG(D_DLMTRACE,
140                        "dropping all unused locks from namespace %s\n",
141                        ns->ns_name);
142                 if (ns_connect_lru_resize(ns)) {
143                         int canceled, unused  = ns->ns_nr_unused;
144                         
145                         /* Try to cancel all @ns_nr_unused locks. */
146                         canceled = ldlm_cancel_lru(ns, unused, LDLM_SYNC);
147                         if (canceled < unused) {
148                                 CERROR("not all requested locks are canceled, "
149                                        "requested: %d, canceled: %d\n", unused, 
150                                        canceled);
151                                 return -EINVAL;
152                         }
153                 } else {
154                         tmp = ns->ns_max_unused;
155                         ns->ns_max_unused = 0;
156                         ldlm_cancel_lru(ns, 0, LDLM_SYNC);
157                         ns->ns_max_unused = tmp;
158                 }
159                 return count;
160         }
161
162         tmp = simple_strtoul(dummy, &end, 0);
163         if (*end) {
164                 CERROR("invalid value written\n");
165                 return -EINVAL;
166         }
167
168         if (ns_connect_lru_resize(ns)) {
169                 if (tmp > ns->ns_nr_unused)
170                         tmp = ns->ns_nr_unused;
171                 tmp = ns->ns_nr_unused - tmp;
172                 
173                 CDEBUG(D_DLMTRACE, "changing namespace %s unused locks from %u to %u\n", 
174                        ns->ns_name, ns->ns_nr_unused, (unsigned int)tmp);
175                 ldlm_cancel_lru(ns, (unsigned int)tmp, LDLM_ASYNC);
176         } else {
177                 CDEBUG(D_DLMTRACE, "changing namespace %s max_unused from %u to %u\n",
178                        ns->ns_name, ns->ns_max_unused, (unsigned int)tmp);
179                 ns->ns_max_unused = (unsigned int)tmp;
180                 ldlm_cancel_lru(ns, 0, LDLM_ASYNC);
181         }
182         return count;
183 }
184
185 void ldlm_proc_namespace(struct ldlm_namespace *ns)
186 {
187         struct lprocfs_vars lock_vars[2];
188         char lock_name[MAX_STRING_SIZE + 1];
189
190         LASSERT(ns != NULL);
191         LASSERT(ns->ns_name != NULL);
192
193         lock_name[MAX_STRING_SIZE] = '\0';
194
195         memset(lock_vars, 0, sizeof(lock_vars));
196         lock_vars[0].name = lock_name;
197
198         snprintf(lock_name, MAX_STRING_SIZE, "%s/resource_count", ns->ns_name);
199         lock_vars[0].data = &ns->ns_refcount;
200         lock_vars[0].read_fptr = lprocfs_rd_atomic;
201         lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
202
203         snprintf(lock_name, MAX_STRING_SIZE, "%s/lock_count", ns->ns_name);
204         lock_vars[0].data = &ns->ns_locks;
205         lock_vars[0].read_fptr = lprocfs_rd_atomic;
206         lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
207
208         if (ns->ns_client) {
209                 snprintf(lock_name, MAX_STRING_SIZE, "%s/lock_unused_count",
210                          ns->ns_name);
211                 lock_vars[0].data = &ns->ns_nr_unused;
212                 lock_vars[0].read_fptr = lprocfs_rd_uint;
213                 lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
214
215                 snprintf(lock_name, MAX_STRING_SIZE, "%s/lru_size",
216                          ns->ns_name);
217                 lock_vars[0].data = ns;
218                 lock_vars[0].read_fptr = lprocfs_rd_lru_size;
219                 lock_vars[0].write_fptr = lprocfs_wr_lru_size;
220                 lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
221                 
222                 snprintf(lock_name, MAX_STRING_SIZE, "%s/lru_max_age",
223                          ns->ns_name);
224                 lock_vars[0].data = &ns->ns_max_age;
225                 lock_vars[0].read_fptr = lprocfs_rd_uint;
226                 lock_vars[0].write_fptr = lprocfs_wr_uint;
227                 lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
228         } else {
229                 snprintf(lock_name, MAX_STRING_SIZE, "%s/max_nolock_bytes",
230                          ns->ns_name);
231                 lock_vars[0].data = &ns->ns_max_nolock_size;
232                 lock_vars[0].read_fptr = lprocfs_rd_uint;
233                 lock_vars[0].write_fptr = lprocfs_wr_uint;
234                 lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
235
236                 snprintf(lock_name, MAX_STRING_SIZE, "%s/contention_seconds",
237                          ns->ns_name);
238                 lock_vars[0].data = &ns->ns_contention_time;
239                 lock_vars[0].read_fptr = lprocfs_rd_uint;
240                 lock_vars[0].write_fptr = lprocfs_wr_uint;
241                 lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
242
243                 snprintf(lock_name, MAX_STRING_SIZE, "%s/contended_locks",
244                          ns->ns_name);
245                 lock_vars[0].data = &ns->ns_contended_locks;
246                 lock_vars[0].read_fptr = lprocfs_rd_uint;
247                 lock_vars[0].write_fptr = lprocfs_wr_uint;
248                 lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
249         }
250 }
251 #undef MAX_STRING_SIZE
252 #else
253 #define ldlm_proc_namespace(ns) do {} while (0)
254 #endif /* LPROCFS */
255
256 static atomic_t *ldlm_namespace_nr(ldlm_side_t client)
257 {
258         return client == LDLM_NAMESPACE_SERVER ? 
259                 &ldlm_srv_namespace_nr : &ldlm_cli_namespace_nr;
260 }
261
262 struct ldlm_namespace *ldlm_namespace_new(char *name, ldlm_side_t client, 
263                                           ldlm_appetite_t apt)
264 {
265         struct ldlm_namespace *ns = NULL;
266         struct list_head *bucket;
267         int rc, idx;
268         ENTRY;
269
270         rc = ldlm_get_ref(client);
271         if (rc) {
272                 CERROR("ldlm_get_ref failed: %d\n", rc);
273                 RETURN(NULL);
274         }
275
276         OBD_ALLOC_PTR(ns);
277         if (!ns)
278                 GOTO(out_ref, NULL);
279
280         OBD_VMALLOC(ns->ns_hash, sizeof(*ns->ns_hash) * RES_HASH_SIZE);
281         if (!ns->ns_hash)
282                 GOTO(out_ns, NULL);
283
284         OBD_ALLOC(ns->ns_name, strlen(name) + 1);
285         if (!ns->ns_name)
286                 GOTO(out_hash, NULL);
287
288         ns->ns_appetite = apt;
289         strcpy(ns->ns_name, name);
290
291         CFS_INIT_LIST_HEAD(&ns->ns_root_list);
292         ns->ns_refcount = 0;
293         ns->ns_client = client;
294         spin_lock_init(&ns->ns_hash_lock);
295         atomic_set(&ns->ns_locks, 0);
296         ns->ns_resources = 0;
297         cfs_waitq_init(&ns->ns_waitq);
298         ns->ns_max_nolock_size = NS_DEFAULT_MAX_NOLOCK_BYTES;
299         ns->ns_contention_time = NS_DEFAULT_CONTENTION_SECONDS;
300         ns->ns_contended_locks = NS_DEFAULT_CONTENDED_LOCKS;
301
302         for (bucket = ns->ns_hash + RES_HASH_SIZE - 1; bucket >= ns->ns_hash;
303              bucket--)
304                 CFS_INIT_LIST_HEAD(bucket);
305
306         CFS_INIT_LIST_HEAD(&ns->ns_unused_list);
307         ns->ns_nr_unused = 0;
308         ns->ns_max_unused = LDLM_DEFAULT_LRU_SIZE;
309         ns->ns_max_age = LDLM_DEFAULT_MAX_ALIVE;
310         spin_lock_init(&ns->ns_unused_lock);
311
312         ns->ns_connect_flags = 0;
313         mutex_down(&ldlm_namespace_lock);
314         list_add(&ns->ns_list_chain, &ldlm_namespace_list);
315         idx = atomic_read(ldlm_namespace_nr(client));
316         atomic_inc(ldlm_namespace_nr(client));
317         mutex_up(&ldlm_namespace_lock);
318         
319         ldlm_proc_namespace(ns);
320         
321         rc = ldlm_pool_init(&ns->ns_pool, ns, idx, client);
322         if (rc) {
323                 CERROR("can't initialize lock pool, rc %d\n", rc);
324                 GOTO(out_del, rc);
325         }
326         RETURN(ns);
327
328 out_del:
329         mutex_down(&ldlm_namespace_lock);
330         list_del(&ns->ns_list_chain);
331         atomic_dec(ldlm_namespace_nr(client));
332         mutex_up(&ldlm_namespace_lock);
333 out_hash:
334         POISON(ns->ns_hash, 0x5a, sizeof(*ns->ns_hash) * RES_HASH_SIZE);
335         OBD_VFREE(ns->ns_hash, sizeof(*ns->ns_hash) * RES_HASH_SIZE);
336 out_ns:
337         OBD_FREE_PTR(ns);
338 out_ref:
339         ldlm_put_ref(client, 0);
340         RETURN(NULL);
341 }
342
343 extern struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock);
344
/* Cancel or destroy every lock on one queue (\a q) of resource \a res.
 *
 * If flags contains FL_LOCAL_ONLY, don't try to tell the server, just cleanup.
 * This is currently only used for recovery, and we make certain assumptions
 * as a result--notably, that we shouldn't cancel locks with refs. -phil
 *
 * Locking: the resource lock is taken and dropped here, once per processed
 * lock; the caller in this file, ldlm_namespace_cleanup(), releases
 * ns_hash_lock before calling.
 *
 * The queue is rescanned from its head on every iteration because the
 * resource lock is dropped while a lock is being processed.  Locks that
 * were already handled carry LDLM_FL_CLEANED and are skipped; the loop ends
 * once no unmarked lock is left. */
static void cleanup_resource(struct ldlm_resource *res, struct list_head *q,
                             int flags)
{
        struct list_head *tmp;
        int rc = 0, client = res->lr_namespace->ns_client;
        int local_only = (flags & LDLM_FL_LOCAL_ONLY);
        ENTRY;


        do {
                struct ldlm_lock *lock = NULL;

                /* first, we look for non-cleaned-yet lock
                 * all cleaned locks are marked by CLEANED flag */
                lock_res(res);
                list_for_each(tmp, q) {
                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
                        if (lock->l_flags & LDLM_FL_CLEANED) {
                                /* reset, so that running off the end of the
                                 * list leaves lock == NULL */
                                lock = NULL;
                                continue;
                        }
                        /* reference held until the LDLM_LOCK_PUT() below */
                        LDLM_LOCK_GET(lock);
                        lock->l_flags |= LDLM_FL_CLEANED;
                        break;
                }

                if (lock == NULL) {
                        /* nothing left to clean on this queue */
                        unlock_res(res);
                        break;
                }

                /* Set CBPENDING so nothing in the cancellation path
                 * can match this lock */
                lock->l_flags |= LDLM_FL_CBPENDING;
                lock->l_flags |= LDLM_FL_FAILED;
                lock->l_flags |= flags;

                /* ... without sending a CANCEL message for local_only. */
                if (local_only)
                        lock->l_flags |= LDLM_FL_LOCAL_ONLY;

                if (local_only && (lock->l_readers || lock->l_writers)) {
                        /* This is a little bit gross, but much better than the
                         * alternative: pretend that we got a blocking AST from
                         * the server, so that when the lock is decref'd, it
                         * will go away ... */
                        unlock_res(res);
                        LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY");
                        if (lock->l_completion_ast)
                                lock->l_completion_ast(lock, 0, NULL);
                        LDLM_LOCK_PUT(lock);
                        continue;
                }

                if (client) {
                        /* client side: cancel through the regular handle
                         * based path, with the resource lock released */
                        struct lustre_handle lockh;

                        unlock_res(res);
                        ldlm_lock2handle(lock, &lockh);
                        rc = ldlm_cli_cancel(&lockh);
                        if (rc)
                                CERROR("ldlm_cli_cancel: %d\n", rc);
                } else {
                        /* server side: unlink from the resource while it is
                         * still locked, then destroy the lock */
                        ldlm_resource_unlink_lock(lock);
                        unlock_res(res);
                        LDLM_DEBUG(lock, "Freeing a lock still held by a "
                                   "client node");
                        ldlm_lock_destroy(lock);
                }
                LDLM_LOCK_PUT(lock);
        } while (1);

        EXIT;
}
424
/* Clean up the locks of every resource in namespace \a ns.
 *
 * Walks all RES_HASH_SIZE hash buckets and runs cleanup_resource() on the
 * granted, converting and waiting queues of each resource, passing \a flags
 * through unchanged.
 *
 * Returns ELDLM_OK in all cases, including ns == NULL. */
int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int flags)
{
        struct list_head *tmp;
        int i;

        if (ns == NULL) {
                CDEBUG(D_INFO, "NULL ns, skipping cleanup\n");
                return ELDLM_OK;
        }

        for (i = 0; i < RES_HASH_SIZE; i++) {
                spin_lock(&ns->ns_hash_lock);
                tmp = ns->ns_hash[i].next;
                while (tmp != &(ns->ns_hash[i])) {
                        struct ldlm_resource *res;
                        res = list_entry(tmp, struct ldlm_resource, lr_hash);
                        /* pin the resource across the window in which the
                         * hash lock is not held */
                        ldlm_resource_getref(res);
                        spin_unlock(&ns->ns_hash_lock);

                        cleanup_resource(res, &res->lr_granted, flags);
                        cleanup_resource(res, &res->lr_converting, flags);
                        cleanup_resource(res, &res->lr_waiting, flags);

                        spin_lock(&ns->ns_hash_lock);
                        /* advance before dropping our reference: the put
                         * below may unhash and free res */
                        tmp  = tmp->next;

                        /* XXX: former stuff caused issues in case of race
                         * between ldlm_namespace_cleanup() and lockd() when
                         * client gets blocking ast when lock gets distracted by
                         * server. This is 1_4 branch solution, let's see how
                         * will it behave. */
                        if (!ldlm_resource_putref_locked(res))
                                CDEBUG(D_INFO,
                                       "Namespace %s resource refcount nonzero "
                                       "(%d) after lock cleanup; forcing cleanup.\n",
                                       ns->ns_name, atomic_read(&res->lr_refcount));
                }
                spin_unlock(&ns->ns_hash_lock);
        }

        return ELDLM_OK;
}
467
468 int ldlm_namespace_free_prior(struct ldlm_namespace *ns)
469 {
470         ENTRY;
471         if (!ns)
472                 RETURN(ELDLM_OK);
473
474         mutex_down(&ldlm_namespace_lock);
475         list_del(&ns->ns_list_chain);
476         atomic_dec(ldlm_namespace_nr(ns->ns_client));
477         ldlm_pool_fini(&ns->ns_pool);
478         mutex_up(&ldlm_namespace_lock);
479
480         /* At shutdown time, don't call the cancellation callback */
481         ldlm_namespace_cleanup(ns, 0);
482
483         if (ns->ns_refcount > 0) {
484                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
485                 int rc;
486                 CDEBUG(D_DLMTRACE,
487                        "dlm namespace %s free waiting on refcount %d\n",
488                        ns->ns_name, ns->ns_refcount);
489                 rc = l_wait_event(ns->ns_waitq,
490                                   ns->ns_refcount == 0, &lwi);
491                 if (ns->ns_refcount)
492                         LCONSOLE_ERROR_MSG(0x139, "Lock manager: wait for %s "
493                                            "namespace cleanup aborted with %d "
494                                            "resources in use. (%d)\nI'm going "
495                                            "to try to clean up anyway, but I "
496                                            "might need a reboot of this node.\n",
497                                             ns->ns_name, (int) ns->ns_refcount, 
498                                             rc);
499                 CDEBUG(D_DLMTRACE,
500                        "dlm namespace %s free done waiting\n", ns->ns_name);
501         }
502
503         RETURN(ELDLM_OK);
504 }
505
506 int ldlm_namespace_free_post(struct ldlm_namespace *ns, int force)
507 {
508         ldlm_side_t client;
509         ENTRY;
510         if (!ns)
511                 RETURN(ELDLM_OK);
512
513 #ifdef LPROCFS
514         {
515                 struct proc_dir_entry *dir;
516                 dir = lprocfs_srch(ldlm_ns_proc_dir, ns->ns_name);
517                 if (dir == NULL) {
518                         CERROR("dlm namespace %s has no procfs dir?\n",
519                                ns->ns_name);
520                 } else {
521                         lprocfs_remove(&dir);
522                 }
523         }
524 #endif
525         client = ns->ns_client;
526         POISON(ns->ns_hash, 0x5a, sizeof(*ns->ns_hash) * RES_HASH_SIZE);
527         OBD_VFREE(ns->ns_hash, sizeof(*ns->ns_hash) * RES_HASH_SIZE);
528         OBD_FREE(ns->ns_name, strlen(ns->ns_name) + 1);
529         OBD_FREE_PTR(ns);
530         ldlm_put_ref(client, force);
531         RETURN(ELDLM_OK);
532 }
533
534 /* Cleanup the resource, and free namespace.
535  * bug 12864:
536  * Deadlock issue: 
537  * proc1: destroy import 
538  *        class_disconnect_export(grab cl_sem) -> 
539  *              -> ldlm_namespace_free -> 
540  *              -> lprocfs_remove(grab _lprocfs_lock).
541  * proc2: read proc info
542  *        lprocfs_fops_read(grab _lprocfs_lock) ->
543  *              -> osc_rd_active, etc(grab cl_sem).
544  *
545  * So that I have to split the ldlm_namespace_free into two parts - the first
546  * part ldlm_namespace_free_prior is used to cleanup the resource which is
547  * being used; the 2nd part ldlm_namespace_free_post is used to unregister the
548  * lprocfs entries, and then free memory. It will be called w/o cli->cl_sem 
549  * held.
550  */
551 int ldlm_namespace_free(struct ldlm_namespace *ns, int force)
552 {
553         ldlm_namespace_free_prior(ns);
554         ldlm_namespace_free_post(ns, force);
555         return ELDLM_OK;
556 }
557
558 static __u32 ldlm_hash_fn(struct ldlm_resource *parent, struct ldlm_res_id name)
559 {
560         __u32 hash = 0;
561         int i;
562
563         for (i = 0; i < RES_NAME_SIZE; i++)
564                 hash += name.name[i];
565
566         hash += (__u32)((unsigned long)parent >> 4);
567
568         return (hash & RES_HASH_MASK);
569 }
570
571 static struct ldlm_resource *ldlm_resource_new(void)
572 {
573         struct ldlm_resource *res;
574
575         OBD_SLAB_ALLOC(res, ldlm_resource_slab, CFS_ALLOC_IO, sizeof *res);
576         if (res == NULL)
577                 return NULL;
578
579         memset(res, 0, sizeof(*res));
580
581         CFS_INIT_LIST_HEAD(&res->lr_children);
582         CFS_INIT_LIST_HEAD(&res->lr_childof);
583         CFS_INIT_LIST_HEAD(&res->lr_granted);
584         CFS_INIT_LIST_HEAD(&res->lr_converting);
585         CFS_INIT_LIST_HEAD(&res->lr_waiting);
586         atomic_set(&res->lr_refcount, 1);
587         spin_lock_init(&res->lr_lock);
588
589         /* one who creates the resource must unlock
590          * the semaphore after lvb initialization */
591         init_MUTEX_LOCKED(&res->lr_lvb_sem);
592
593         return res;
594 }
595
596 /* must be called with hash lock held */
597 static struct ldlm_resource *
598 ldlm_resource_find(struct ldlm_namespace *ns, struct ldlm_res_id name, __u32 hash)
599 {
600         struct list_head *bucket, *tmp;
601         struct ldlm_resource *res;
602
603         LASSERT_SPIN_LOCKED(&ns->ns_hash_lock);
604         bucket = ns->ns_hash + hash;
605
606         list_for_each(tmp, bucket) {
607                 res = list_entry(tmp, struct ldlm_resource, lr_hash);
608                 if (memcmp(&res->lr_name, &name, sizeof(res->lr_name)) == 0)
609                         return res;
610         }
611
612         return NULL;
613 }
614
615 /* Args: locked namespace
616  * Returns: newly-allocated, referenced, unlocked resource */
617 static struct ldlm_resource *
618 ldlm_resource_add(struct ldlm_namespace *ns, struct ldlm_resource *parent,
619                   struct ldlm_res_id name, __u32 hash, ldlm_type_t type)
620 {
621         struct list_head *bucket;
622         struct ldlm_resource *res, *old_res;
623         ENTRY;
624
625         LASSERTF(type >= LDLM_MIN_TYPE && type < LDLM_MAX_TYPE,
626                  "type: %d\n", type);
627
628         res = ldlm_resource_new();
629         if (!res)
630                 RETURN(NULL);
631
632         res->lr_name = name;
633         res->lr_namespace = ns;
634         res->lr_type = type;
635         res->lr_most_restr = LCK_NL;
636
637         spin_lock(&ns->ns_hash_lock);
638         old_res = ldlm_resource_find(ns, name, hash);
639         if (old_res) {
640                 /* someone won the race and added the resource before */
641                 ldlm_resource_getref(old_res);
642                 spin_unlock(&ns->ns_hash_lock);
643                 OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res);
644                 /* synchronize WRT resource creation */
645                 if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) {
646                         down(&old_res->lr_lvb_sem);
647                         up(&old_res->lr_lvb_sem);
648                 }
649                 RETURN(old_res);
650         }
651
652         /* we won! let's add the resource */
653         bucket = ns->ns_hash + hash;
654         list_add(&res->lr_hash, bucket);
655         ns->ns_resources++;
656         ns->ns_refcount++;
657
658         if (parent == NULL) {
659                 list_add(&res->lr_childof, &ns->ns_root_list);
660         } else {
661                 res->lr_parent = parent;
662                 list_add(&res->lr_childof, &parent->lr_children);
663         }
664         spin_unlock(&ns->ns_hash_lock);
665
666         if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) {
667                 int rc;
668
669                 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CREATE_RESOURCE, 2);
670                 rc = ns->ns_lvbo->lvbo_init(res);
671                 if (rc)
672                         CERROR("lvbo_init failed for resource "
673                                LPU64": rc %d\n", name.name[0], rc);
674                 /* we create resource with locked lr_lvb_sem */
675                 up(&res->lr_lvb_sem);
676         }
677
678         RETURN(res);
679 }
680
681 /* Args: unlocked namespace
682  * Locks: takes and releases ns->ns_lock and res->lr_lock
683  * Returns: referenced, unlocked ldlm_resource or NULL */
684 struct ldlm_resource *
685 ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent,
686                   struct ldlm_res_id name, ldlm_type_t type, int create)
687 {
688         __u32 hash = ldlm_hash_fn(parent, name);
689         struct ldlm_resource *res = NULL;
690         ENTRY;
691
692         LASSERT(ns != NULL);
693         LASSERT(ns->ns_hash != NULL);
694         LASSERT(name.name[0] != 0);
695
696         spin_lock(&ns->ns_hash_lock);
697         res = ldlm_resource_find(ns, name, hash);
698         if (res) {
699                 ldlm_resource_getref(res);
700                 spin_unlock(&ns->ns_hash_lock);
701                 /* synchronize WRT resource creation */
702                 if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) {
703                         down(&res->lr_lvb_sem);
704                         up(&res->lr_lvb_sem);
705                 }
706                 RETURN(res);
707         }
708         spin_unlock(&ns->ns_hash_lock);
709
710         if (create == 0)
711                 RETURN(NULL);
712
713         res = ldlm_resource_add(ns, parent, name, hash, type);
714         RETURN(res);
715 }
716
717 struct ldlm_resource *ldlm_resource_getref(struct ldlm_resource *res)
718 {
719         LASSERT(res != NULL);
720         LASSERT(res != LP_POISON);
721         atomic_inc(&res->lr_refcount);
722         CDEBUG(D_INFO, "getref res: %p count: %d\n", res,
723                atomic_read(&res->lr_refcount));
724         return res;
725 }
726
727 void __ldlm_resource_putref_final(struct ldlm_resource *res)
728 {
729         struct ldlm_namespace *ns = res->lr_namespace;
730
731         LASSERT_SPIN_LOCKED(&ns->ns_hash_lock);
732
733         if (!list_empty(&res->lr_granted)) {
734                 ldlm_resource_dump(D_ERROR, res);
735                 LBUG();
736         }
737
738         if (!list_empty(&res->lr_converting)) {
739                 ldlm_resource_dump(D_ERROR, res);
740                 LBUG();
741         }
742
743         if (!list_empty(&res->lr_waiting)) {
744                 ldlm_resource_dump(D_ERROR, res);
745                 LBUG();
746         }
747
748         if (!list_empty(&res->lr_children)) {
749                 ldlm_resource_dump(D_ERROR, res);
750                 LBUG();
751         }
752
753         ns->ns_refcount--;
754         list_del_init(&res->lr_hash);
755         list_del_init(&res->lr_childof);
756
757         ns->ns_resources--;
758         if (ns->ns_resources == 0)
759                 wake_up(&ns->ns_waitq);
760 }
761
762 /* Returns 1 if the resource was freed, 0 if it remains. */
763 int ldlm_resource_putref(struct ldlm_resource *res)
764 {
765         struct ldlm_namespace *ns = res->lr_namespace;
766         int rc = 0;
767         ENTRY;
768
769         CDEBUG(D_INFO, "putref res: %p count: %d\n", res,
770                atomic_read(&res->lr_refcount) - 1);
771         LASSERTF(atomic_read(&res->lr_refcount) > 0, "%d",
772                  atomic_read(&res->lr_refcount));
773         LASSERTF(atomic_read(&res->lr_refcount) < LI_POISON, "%d",
774                  atomic_read(&res->lr_refcount));
775
776         if (atomic_dec_and_lock(&res->lr_refcount, &ns->ns_hash_lock)) {
777                 __ldlm_resource_putref_final(res);
778                 spin_unlock(&ns->ns_hash_lock);
779                 if (res->lr_lvb_data)
780                         OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
781                 OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res);
782                 rc = 1;
783         }
784
785         RETURN(rc);
786 }
787
788 /* Returns 1 if the resource was freed, 0 if it remains. */
789 int ldlm_resource_putref_locked(struct ldlm_resource *res)
790 {
791         int rc = 0;
792         ENTRY;
793
794         CDEBUG(D_INFO, "putref res: %p count: %d\n", res,
795                atomic_read(&res->lr_refcount) - 1);
796         LASSERT(atomic_read(&res->lr_refcount) > 0);
797         LASSERT(atomic_read(&res->lr_refcount) < LI_POISON);
798
799         LASSERT(atomic_read(&res->lr_refcount) >= 0);
800         if (atomic_dec_and_test(&res->lr_refcount)) {
801                 __ldlm_resource_putref_final(res);
802                 if (res->lr_lvb_data)
803                         OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
804                 OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res);
805                 rc = 1;
806         }
807
808         RETURN(rc);
809 }
810
811 void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
812                             struct ldlm_lock *lock)
813 {
814         check_res_locked(res);
815
816         ldlm_resource_dump(D_OTHER, res);
817         CDEBUG(D_OTHER, "About to add this lock:\n");
818         ldlm_lock_dump(D_OTHER, lock, 0);
819
820         if (lock->l_destroyed) {
821                 CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
822                 return;
823         }
824
825         LASSERT(list_empty(&lock->l_res_link));
826
827         list_add_tail(&lock->l_res_link, head);
828 }
829
830 void ldlm_resource_insert_lock_after(struct ldlm_lock *original,
831                                      struct ldlm_lock *new)
832 {
833         struct ldlm_resource *res = original->l_resource;
834
835         check_res_locked(res);
836
837         ldlm_resource_dump(D_OTHER, res);
838         CDEBUG(D_OTHER, "About to insert this lock after %p:\n", original);
839         ldlm_lock_dump(D_OTHER, new, 0);
840
841         if (new->l_destroyed) {
842                 CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
843                 goto out;
844         }
845
846         LASSERT(list_empty(&new->l_res_link));
847
848         list_add(&new->l_res_link, &original->l_res_link);
849  out:;
850 }
851
852 void ldlm_resource_unlink_lock(struct ldlm_lock *lock)
853 {
854         check_res_locked(lock->l_resource);
855         ldlm_unlink_lock_skiplist(lock);
856         list_del_init(&lock->l_res_link);
857 }
858
859 void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc)
860 {
861         desc->lr_type = res->lr_type;
862         desc->lr_name = res->lr_name;
863 }
864
865 void ldlm_dump_all_namespaces(int level)
866 {
867         struct list_head *tmp;
868
869         if (!((libcfs_debug | D_ERROR) & level))
870                 return;
871
872         mutex_down(&ldlm_namespace_lock);
873
874         list_for_each(tmp, &ldlm_namespace_list) {
875                 struct ldlm_namespace *ns;
876                 ns = list_entry(tmp, struct ldlm_namespace, ns_list_chain);
877                 ldlm_namespace_dump(level, ns);
878         }
879
880         mutex_up(&ldlm_namespace_lock);
881 }
882
883 void ldlm_namespace_dump(int level, struct ldlm_namespace *ns)
884 {
885         struct list_head *tmp;
886
887         if (!((libcfs_debug | D_ERROR) & level))
888                 return;
889
890         CDEBUG(level, "--- Namespace: %s (rc: %d, client: %d)\n", 
891                ns->ns_name, ns->ns_refcount, ns->ns_client);
892
893         if (cfs_time_before(cfs_time_current(), ns->ns_next_dump))
894                 return;
895
896         spin_lock(&ns->ns_hash_lock);
897         tmp = ns->ns_root_list.next;
898         while (tmp != &ns->ns_root_list) {
899                 struct ldlm_resource *res;
900                 res = list_entry(tmp, struct ldlm_resource, lr_childof);
901
902                 ldlm_resource_getref(res);
903                 spin_unlock(&ns->ns_hash_lock);
904
905                 lock_res(res);
906                 ldlm_resource_dump(level, res);
907                 unlock_res(res);
908                 
909                 spin_lock(&ns->ns_hash_lock);
910                 tmp = tmp->next;
911                 ldlm_resource_putref_locked(res);
912         }
913         ns->ns_next_dump = cfs_time_shift(10);
914         spin_unlock(&ns->ns_hash_lock);
915 }
916
917 void ldlm_resource_dump(int level, struct ldlm_resource *res)
918 {
919         struct list_head *tmp;
920         int pos;
921
922         CLASSERT(RES_NAME_SIZE == 4);
923
924         if (!((libcfs_debug | D_ERROR) & level))
925                 return;
926
927         CDEBUG(level, "--- Resource: %p ("LPU64"/"LPU64"/"LPU64"/"LPU64
928                ") (rc: %d)\n", res, res->lr_name.name[0], res->lr_name.name[1],
929                res->lr_name.name[2], res->lr_name.name[3],
930                atomic_read(&res->lr_refcount));
931
932         if (!list_empty(&res->lr_granted)) {
933                 pos = 0;
934                 CDEBUG(level, "Granted locks:\n");
935                 list_for_each(tmp, &res->lr_granted) {
936                         struct ldlm_lock *lock;
937                         lock = list_entry(tmp, struct ldlm_lock, l_res_link);
938                         ldlm_lock_dump(level, lock, ++pos);
939                 }
940         }
941         if (!list_empty(&res->lr_converting)) {
942                 pos = 0;
943                 CDEBUG(level, "Converting locks:\n");
944                 list_for_each(tmp, &res->lr_converting) {
945                         struct ldlm_lock *lock;
946                         lock = list_entry(tmp, struct ldlm_lock, l_res_link);
947                         ldlm_lock_dump(level, lock, ++pos);
948                 }
949         }
950         if (!list_empty(&res->lr_waiting)) {
951                 pos = 0;
952                 CDEBUG(level, "Waiting locks:\n");
953                 list_for_each(tmp, &res->lr_waiting) {
954                         struct ldlm_lock *lock;
955                         lock = list_entry(tmp, struct ldlm_lock, l_res_link);
956                         ldlm_lock_dump(level, lock, ++pos);
957                 }
958         }
959 }