Whamcloud - gitweb
1a2e3de52ea69bb581e37cbe3fcf8da74f7f6a71
[fs/lustre-release.git] / lustre / ldlm / ldlm_resource.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Phil Schwan <phil@clusterfs.com>
6  *   Author: Peter Braam <braam@clusterfs.com>
7  *
8  *   This file is part of the Lustre file system, http://www.lustre.org
9  *   Lustre is a trademark of Cluster File Systems, Inc.
10  *
11  *   You may have signed or agreed to another license before downloading
12  *   this software.  If so, you are bound by the terms and conditions
13  *   of that agreement, and the following does not apply to you.  See the
14  *   LICENSE file included with this distribution for more information.
15  *
16  *   If you did not agree to a different license, then this copy of Lustre
17  *   is open source software; you can redistribute it and/or modify it
18  *   under the terms of version 2 of the GNU General Public License as
19  *   published by the Free Software Foundation.
20  *
21  *   In either case, Lustre is distributed in the hope that it will be
22  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
23  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24  *   license text for more details.
25  */
26
27 #define DEBUG_SUBSYSTEM S_LDLM
28 #ifdef __KERNEL__
29 # include <lustre_dlm.h>
30 #else
31 # include <liblustre.h>
32 #endif
33
34 #include <obd_class.h>
35 #include "ldlm_internal.h"
36
37 cfs_mem_cache_t *ldlm_resource_slab, *ldlm_lock_slab;
38
39 atomic_t ldlm_srv_namespace_nr = ATOMIC_INIT(0);
40 atomic_t ldlm_cli_namespace_nr = ATOMIC_INIT(0);
41
42 struct semaphore ldlm_srv_namespace_lock;
43 struct list_head ldlm_srv_namespace_list = 
44         CFS_LIST_HEAD_INIT(ldlm_srv_namespace_list);
45
46 struct semaphore ldlm_cli_namespace_lock;
47 struct list_head ldlm_cli_namespace_list = 
48         CFS_LIST_HEAD_INIT(ldlm_cli_namespace_list);
49
50 cfs_proc_dir_entry_t *ldlm_type_proc_dir = NULL;
51 cfs_proc_dir_entry_t *ldlm_ns_proc_dir = NULL;
52 cfs_proc_dir_entry_t *ldlm_svc_proc_dir = NULL;
53
54 #ifdef LPROCFS
55 static int ldlm_proc_dump_ns(struct file *file, const char *buffer,
56                              unsigned long count, void *data)
57 {
58         ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
59         ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
60         RETURN(count);
61 }
62
63 int ldlm_proc_setup(void)
64 {
65         int rc;
66         struct lprocfs_vars list[] = {
67                 { "dump_namespaces", NULL, ldlm_proc_dump_ns, NULL },
68                 { NULL }};
69         ENTRY;
70         LASSERT(ldlm_ns_proc_dir == NULL);
71
72         ldlm_type_proc_dir = lprocfs_register(OBD_LDLM_DEVICENAME,
73                                               proc_lustre_root,
74                                               NULL, NULL);
75         if (IS_ERR(ldlm_type_proc_dir)) {
76                 CERROR("LProcFS failed in ldlm-init\n");
77                 rc = PTR_ERR(ldlm_type_proc_dir);
78                 GOTO(err, rc);
79         }
80
81         ldlm_ns_proc_dir = lprocfs_register("namespaces",
82                                             ldlm_type_proc_dir,
83                                             NULL, NULL);
84         if (IS_ERR(ldlm_ns_proc_dir)) {
85                 CERROR("LProcFS failed in ldlm-init\n");
86                 rc = PTR_ERR(ldlm_ns_proc_dir);
87                 GOTO(err_type, rc);
88         }
89
90         ldlm_svc_proc_dir = lprocfs_register("services",
91                                             ldlm_type_proc_dir,
92                                             NULL, NULL);
93         if (IS_ERR(ldlm_svc_proc_dir)) {
94                 CERROR("LProcFS failed in ldlm-init\n");
95                 rc = PTR_ERR(ldlm_svc_proc_dir);
96                 GOTO(err_ns, rc);
97         }
98
99         rc = lprocfs_add_vars(ldlm_type_proc_dir, list, NULL);
100
101         RETURN(0);
102
103 err_ns:
104         lprocfs_remove(&ldlm_ns_proc_dir);
105 err_type:
106         lprocfs_remove(&ldlm_type_proc_dir);
107 err:
108         ldlm_svc_proc_dir = NULL;
109         RETURN(rc);
110 }
111
112 void ldlm_proc_cleanup(void)
113 {
114         if (ldlm_svc_proc_dir)
115                 lprocfs_remove(&ldlm_svc_proc_dir);
116
117         if (ldlm_ns_proc_dir)
118                 lprocfs_remove(&ldlm_ns_proc_dir);
119
120         if (ldlm_type_proc_dir)
121                 lprocfs_remove(&ldlm_type_proc_dir);
122 }
123
124 static int lprocfs_rd_lru_size(char *page, char **start, off_t off,
125                                int count, int *eof, void *data)
126 {
127         struct ldlm_namespace *ns = data;
128         __u32 *nr = &ns->ns_max_unused;
129
130         if (ns_connect_lru_resize(ns))
131                 nr = &ns->ns_nr_unused;
132         return lprocfs_rd_uint(page, start, off, count, eof, nr);
133 }
134
135 static int lprocfs_wr_lru_size(struct file *file, const char *buffer,
136                                unsigned long count, void *data)
137 {
138         struct ldlm_namespace *ns = data;
139         char dummy[MAX_STRING_SIZE + 1], *end;
140         unsigned long tmp;
141         int lru_resize;
142
143         dummy[MAX_STRING_SIZE] = '\0';
144         if (copy_from_user(dummy, buffer, MAX_STRING_SIZE))
145                 return -EFAULT;
146
147         if (count == 6 && memcmp(dummy, "clear", 5) == 0) {
148                 CDEBUG(D_DLMTRACE,
149                        "dropping all unused locks from namespace %s\n",
150                        ns->ns_name);
151                 if (ns_connect_lru_resize(ns)) {
152                         int canceled, unused  = ns->ns_nr_unused;
153                         
154                         /* Try to cancel all @ns_nr_unused locks. */
155                         canceled = ldlm_cancel_lru(ns, unused, LDLM_SYNC);
156                         if (canceled < unused) {
157                                 CERROR("not all requested locks are canceled, "
158                                        "requested: %d, canceled: %d\n", unused, 
159                                        canceled);
160                                 return -EINVAL;
161                         }
162                 } else {
163                         tmp = ns->ns_max_unused;
164                         ns->ns_max_unused = 0;
165                         ldlm_cancel_lru(ns, 0, LDLM_SYNC);
166                         ns->ns_max_unused = tmp;
167                 }
168                 return count;
169         }
170
171         tmp = simple_strtoul(dummy, &end, 0);
172         if (dummy == end) {
173                 CERROR("invalid value written\n");
174                 return -EINVAL;
175         }
176         lru_resize = (tmp == 0);
177         
178         if (ns_connect_lru_resize(ns)) {
179                 if (!lru_resize)
180                         ns->ns_max_unused = (unsigned int)tmp;
181                         
182                 if (tmp > ns->ns_nr_unused)
183                         tmp = ns->ns_nr_unused;
184                 tmp = ns->ns_nr_unused - tmp;
185                 
186                 CDEBUG(D_DLMTRACE, "changing namespace %s unused locks from %u to %u\n", 
187                        ns->ns_name, ns->ns_nr_unused, (unsigned int)tmp);
188                 ldlm_cancel_lru(ns, (unsigned int)tmp, LDLM_ASYNC);
189                 
190                 if (!lru_resize) {
191                         CDEBUG(D_DLMTRACE, "disable lru_resize for namespace %s\n", 
192                                ns->ns_name);
193                         ns->ns_connect_flags &= ~OBD_CONNECT_LRU_RESIZE;
194                 }
195         } else {
196                 CDEBUG(D_DLMTRACE, "changing namespace %s max_unused from %u to %u\n",
197                        ns->ns_name, ns->ns_max_unused, (unsigned int)tmp);
198                 ns->ns_max_unused = (unsigned int)tmp;
199                 ldlm_cancel_lru(ns, 0, LDLM_ASYNC);
200                 
201                 /* Make sure that originally lru resize was supported before 
202                  * turning it on here. */
203                 if (lru_resize && 
204                     (ns->ns_orig_connect_flags & OBD_CONNECT_LRU_RESIZE)) {
205                         CDEBUG(D_DLMTRACE, "enable lru_resize for namespace %s\n", 
206                                ns->ns_name);
207                         ns->ns_connect_flags |= OBD_CONNECT_LRU_RESIZE;
208                 }
209         }
210
211         return count;
212 }
213
214 void ldlm_proc_namespace(struct ldlm_namespace *ns)
215 {
216         struct lprocfs_vars lock_vars[2];
217         char lock_name[MAX_STRING_SIZE + 1];
218
219         LASSERT(ns != NULL);
220         LASSERT(ns->ns_name != NULL);
221
222         lock_name[MAX_STRING_SIZE] = '\0';
223
224         memset(lock_vars, 0, sizeof(lock_vars));
225         lock_vars[0].name = lock_name;
226
227         snprintf(lock_name, MAX_STRING_SIZE, "%s/resource_count", ns->ns_name);
228         lock_vars[0].data = &ns->ns_refcount;
229         lock_vars[0].read_fptr = lprocfs_rd_atomic;
230         lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
231
232         snprintf(lock_name, MAX_STRING_SIZE, "%s/lock_count", ns->ns_name);
233         lock_vars[0].data = &ns->ns_locks;
234         lock_vars[0].read_fptr = lprocfs_rd_atomic;
235         lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
236
237         if (ns_is_client(ns)) {
238                 snprintf(lock_name, MAX_STRING_SIZE, "%s/lock_unused_count",
239                          ns->ns_name);
240                 lock_vars[0].data = &ns->ns_nr_unused;
241                 lock_vars[0].read_fptr = lprocfs_rd_uint;
242                 lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
243
244                 snprintf(lock_name, MAX_STRING_SIZE, "%s/lru_size",
245                          ns->ns_name);
246                 lock_vars[0].data = ns;
247                 lock_vars[0].read_fptr = lprocfs_rd_lru_size;
248                 lock_vars[0].write_fptr = lprocfs_wr_lru_size;
249                 lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
250
251                 snprintf(lock_name, MAX_STRING_SIZE, "%s/lru_max_age",
252                          ns->ns_name);
253                 lock_vars[0].data = &ns->ns_max_age;
254                 lock_vars[0].read_fptr = lprocfs_rd_uint;
255                 lock_vars[0].write_fptr = lprocfs_wr_uint;
256                 lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
257
258         }
259 }
260 #undef MAX_STRING_SIZE
261 #else
262 #define ldlm_proc_namespace(ns) do {} while (0)
263 #endif /* LPROCFS */
264
265 struct ldlm_namespace *ldlm_namespace_new(char *name, ldlm_side_t client, 
266                                           ldlm_appetite_t apt)
267 {
268         struct ldlm_namespace *ns = NULL;
269         struct list_head *bucket;
270         int rc, idx, namelen;
271         ENTRY;
272
273         rc = ldlm_get_ref();
274         if (rc) {
275                 CERROR("ldlm_get_ref failed: %d\n", rc);
276                 RETURN(NULL);
277         }
278
279         OBD_ALLOC_PTR(ns);
280         if (!ns)
281                 GOTO(out_ref, NULL);
282
283         OBD_VMALLOC(ns->ns_hash, sizeof(*ns->ns_hash) * RES_HASH_SIZE);
284         if (!ns->ns_hash)
285                 GOTO(out_ns, NULL);
286
287         ns->ns_appetite = apt;
288         namelen = strlen(name);
289         OBD_ALLOC(ns->ns_name, namelen + 1);
290         if (!ns->ns_name)
291                 GOTO(out_hash, NULL);
292
293         strcpy(ns->ns_name, name);
294
295         CFS_INIT_LIST_HEAD(&ns->ns_root_list);
296         ns->ns_refcount = 0;
297         ns->ns_client = client;
298         spin_lock_init(&ns->ns_hash_lock);
299         atomic_set(&ns->ns_locks, 0);
300         ns->ns_resources = 0;
301         cfs_waitq_init(&ns->ns_waitq);
302
303         for (bucket = ns->ns_hash + RES_HASH_SIZE - 1; bucket >= ns->ns_hash;
304              bucket--)
305                 CFS_INIT_LIST_HEAD(bucket);
306
307         CFS_INIT_LIST_HEAD(&ns->ns_unused_list);
308         ns->ns_nr_unused = 0;
309         ns->ns_max_unused = LDLM_DEFAULT_LRU_SIZE;
310         ns->ns_max_age = LDLM_DEFAULT_MAX_ALIVE;
311         spin_lock_init(&ns->ns_unused_lock);
312         ns->ns_orig_connect_flags = 0;
313         ns->ns_connect_flags = 0;
314         ldlm_proc_namespace(ns);
315
316         idx = atomic_read(ldlm_namespace_nr(client));
317         rc = ldlm_pool_init(&ns->ns_pool, ns, idx, client);
318         if (rc) {
319                 CERROR("Can't initialize lock pool, rc %d\n", rc);
320                 GOTO(out_proc, rc);
321         }
322
323         mutex_down(ldlm_namespace_lock(client));
324         list_add(&ns->ns_list_chain, ldlm_namespace_list(client));
325         atomic_inc(ldlm_namespace_nr(client));
326         mutex_up(ldlm_namespace_lock(client));
327
328         RETURN(ns);
329 out_proc:
330         ldlm_namespace_cleanup(ns, 0);
331         OBD_FREE(ns->ns_name, namelen + 1);
332 out_hash:
333         OBD_VFREE(ns->ns_hash, sizeof(*ns->ns_hash) * RES_HASH_SIZE);
334 out_ns:
335         OBD_FREE_PTR(ns);
336 out_ref:
337         ldlm_put_ref(0);
338         RETURN(NULL);
339 }
340
341 extern struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock);
342
343 /* If flags contains FL_LOCAL_ONLY, don't try to tell the server, just cleanup.
344  * This is currently only used for recovery, and we make certain assumptions
345  * as a result--notably, that we shouldn't cancel locks with refs. -phil
346  *
347  * Called with the ns_lock held. */
348 static void cleanup_resource(struct ldlm_resource *res, struct list_head *q,
349                              int flags)
350 {
351         struct list_head *tmp;
352         int rc = 0, client = ns_is_client(res->lr_namespace);
353         int local_only = (flags & LDLM_FL_LOCAL_ONLY);
354         ENTRY;
355
356
357         do {
358                 struct ldlm_lock *lock = NULL;
359
360                 /* first, we look for non-cleaned-yet lock
361                  * all cleaned locks are marked by CLEANED flag */
362                 lock_res(res);
363                 list_for_each(tmp, q) {
364                         lock = list_entry(tmp, struct ldlm_lock, l_res_link);
365                         if (lock->l_flags & LDLM_FL_CLEANED) {
366                                 lock = NULL;
367                                 continue;
368                         }
369                         LDLM_LOCK_GET(lock);
370                         lock->l_flags |= LDLM_FL_CLEANED;
371                         break;
372                 }
373
374                 if (lock == NULL) {
375                         unlock_res(res);
376                         break;
377                 }
378
379                 /* Set CBPENDING so nothing in the cancellation path
380                  * can match this lock */
381                 lock->l_flags |= LDLM_FL_CBPENDING;
382                 lock->l_flags |= LDLM_FL_FAILED;
383                 lock->l_flags |= flags;
384
385                 /* ... without sending a CANCEL message for local_only. */
386                 if (local_only)
387                         lock->l_flags |= LDLM_FL_LOCAL_ONLY;
388
389                 if (local_only && (lock->l_readers || lock->l_writers)) {
390                         /* This is a little bit gross, but much better than the
391                          * alternative: pretend that we got a blocking AST from
392                          * the server, so that when the lock is decref'd, it
393                          * will go away ... */
394                         unlock_res(res);
395                         LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY");
396                         if (lock->l_completion_ast)
397                                 lock->l_completion_ast(lock, 0, NULL);
398                         LDLM_LOCK_PUT(lock);
399                         continue;
400                 }
401
402                 if (client) {
403                         struct lustre_handle lockh;
404
405                         unlock_res(res);
406                         ldlm_lock2handle(lock, &lockh);
407                         rc = ldlm_cli_cancel(&lockh);
408                         if (rc)
409                                 CERROR("ldlm_cli_cancel: %d\n", rc);
410                 } else {
411                         ldlm_resource_unlink_lock(lock);
412                         unlock_res(res);
413                         LDLM_DEBUG(lock, "Freeing a lock still held by a "
414                                    "client node");
415                         ldlm_lock_destroy(lock);
416                 }
417                 LDLM_LOCK_PUT(lock);
418         } while (1);
419
420         EXIT;
421 }
422
423 int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int flags)
424 {
425         struct list_head *tmp;
426         int i;
427
428         if (ns == NULL) {
429                 CDEBUG(D_INFO, "NULL ns, skipping cleanup\n");
430                 return ELDLM_OK;
431         }
432
433         for (i = 0; i < RES_HASH_SIZE; i++) {
434                 spin_lock(&ns->ns_hash_lock);
435                 tmp = ns->ns_hash[i].next;
436                 while (tmp != &(ns->ns_hash[i])) {
437                         struct ldlm_resource *res;
438                         res = list_entry(tmp, struct ldlm_resource, lr_hash);
439                         ldlm_resource_getref(res);
440                         spin_unlock(&ns->ns_hash_lock);
441
442                         cleanup_resource(res, &res->lr_granted, flags);
443                         cleanup_resource(res, &res->lr_converting, flags);
444                         cleanup_resource(res, &res->lr_waiting, flags);
445
446                         spin_lock(&ns->ns_hash_lock);
447                         tmp  = tmp->next;
448
449                         /* XXX: former stuff caused issues in case of race
450                          * between ldlm_namespace_cleanup() and lockd() when
451                          * client gets blocking ast when lock gets distracted by
452                          * server. This is 1_4 branch solution, let's see how
453                          * will it behave. */
454                         if (!ldlm_resource_putref_locked(res))
455                                 CDEBUG(D_INFO,
456                                        "Namespace %s resource refcount nonzero "
457                                        "(%d) after lock cleanup; forcing cleanup.\n",
458                                        ns->ns_name, atomic_read(&res->lr_refcount));
459                 }
460                 spin_unlock(&ns->ns_hash_lock);
461         }
462
463         return ELDLM_OK;
464 }
465
466 /* Cleanup, but also free, the namespace */
467 int ldlm_namespace_free(struct ldlm_namespace *ns, int force)
468 {
469         ENTRY;
470         if (!ns)
471                 RETURN(ELDLM_OK);
472
473         mutex_down(ldlm_namespace_lock(ns->ns_client));
474         list_del(&ns->ns_list_chain);
475         atomic_dec(ldlm_namespace_nr(ns->ns_client));
476         ldlm_pool_fini(&ns->ns_pool);
477         mutex_up(ldlm_namespace_lock(ns->ns_client));
478
479         /* At shutdown time, don't call the cancellation callback */
480         ldlm_namespace_cleanup(ns, 0);
481
482 #ifdef LPROCFS
483         {
484                 struct proc_dir_entry *dir;
485                 dir = lprocfs_srch(ldlm_ns_proc_dir, ns->ns_name);
486                 if (dir == NULL) {
487                         CERROR("dlm namespace %s has no procfs dir?\n",
488                                ns->ns_name);
489                 } else {
490                         lprocfs_remove(&dir);
491                 }
492         }
493 #endif
494
495         if (ns->ns_refcount > 0) {
496                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
497                 int rc;
498                 CDEBUG(D_DLMTRACE,
499                        "dlm namespace %s free waiting on refcount %d\n",
500                        ns->ns_name, ns->ns_refcount);
501                 rc = l_wait_event(ns->ns_waitq,
502                                   ns->ns_refcount == 0, &lwi);
503                 if (ns->ns_refcount)
504                         LCONSOLE_ERROR_MSG(0x139, "Lock manager: wait for %s "
505                                            "namespace cleanup aborted with %d "
506                                            "resources in use. (%d)\nI'm going "
507                                            "to try to clean up anyway, but I "
508                                            "might need a reboot of this node.\n",
509                                             ns->ns_name, (int) ns->ns_refcount, 
510                                             rc);
511                 CDEBUG(D_DLMTRACE,
512                        "dlm namespace %s free done waiting\n", ns->ns_name);
513         }
514
515         OBD_VFREE(ns->ns_hash, sizeof(*ns->ns_hash) * RES_HASH_SIZE);
516         OBD_FREE(ns->ns_name, strlen(ns->ns_name) + 1);
517         OBD_FREE_PTR(ns);
518
519         ldlm_put_ref(force);
520
521         RETURN(ELDLM_OK);
522 }
523
524 void ldlm_namespace_get_nolock(struct ldlm_namespace *ns)
525 {
526         LASSERT(ns->ns_refcount >= 0);
527         ns->ns_refcount++;
528 }
529
530 void ldlm_namespace_get(struct ldlm_namespace *ns)
531 {
532         spin_lock(&ns->ns_hash_lock);
533         ldlm_namespace_get_nolock(ns);
534         spin_unlock(&ns->ns_hash_lock);
535 }
536
537 void ldlm_namespace_put_nolock(struct ldlm_namespace *ns, int wakeup)
538 {
539         LASSERT(ns->ns_refcount > 0);
540         ns->ns_refcount--;
541         if (ns->ns_refcount == 0 && wakeup)
542                 wake_up(&ns->ns_waitq);
543 }
544
545 void ldlm_namespace_put(struct ldlm_namespace *ns, int wakeup)
546 {
547         spin_lock(&ns->ns_hash_lock);
548         ldlm_namespace_put_nolock(ns, wakeup);
549         spin_unlock(&ns->ns_hash_lock);
550 }
551
552 /* Should be called under ldlm_namespace_lock(client) taken */
553 void ldlm_namespace_move(struct ldlm_namespace *ns, ldlm_side_t client)
554 {
555         LASSERT(!list_empty(&ns->ns_list_chain));
556         LASSERT_SEM_LOCKED(ldlm_namespace_lock(client));
557         list_move_tail(&ns->ns_list_chain, ldlm_namespace_list(client));
558 }
559
560 /* Should be called under ldlm_namespace_lock(client) taken */
561 struct ldlm_namespace *ldlm_namespace_first(ldlm_side_t client)
562 {
563         LASSERT_SEM_LOCKED(ldlm_namespace_lock(client));
564         LASSERT(!list_empty(ldlm_namespace_list(client)));
565         return container_of(ldlm_namespace_list(client)->next, 
566                 struct ldlm_namespace, ns_list_chain);
567 }
568 static __u32 ldlm_hash_fn(struct ldlm_resource *parent,
569                           const struct ldlm_res_id *name)
570 {
571         __u32 hash = 0;
572         int i;
573
574         for (i = 0; i < RES_NAME_SIZE; i++)
575                 hash += name->name[i];
576
577         hash += (__u32)((unsigned long)parent >> 4);
578
579         return (hash & RES_HASH_MASK);
580 }
581
582 static struct ldlm_resource *ldlm_resource_new(void)
583 {
584         struct ldlm_resource *res;
585
586         OBD_SLAB_ALLOC(res, ldlm_resource_slab, CFS_ALLOC_IO, sizeof *res);
587         if (res == NULL)
588                 return NULL;
589
590         memset(res, 0, sizeof(*res));
591
592         CFS_INIT_LIST_HEAD(&res->lr_children);
593         CFS_INIT_LIST_HEAD(&res->lr_childof);
594         CFS_INIT_LIST_HEAD(&res->lr_granted);
595         CFS_INIT_LIST_HEAD(&res->lr_converting);
596         CFS_INIT_LIST_HEAD(&res->lr_waiting);
597         atomic_set(&res->lr_refcount, 1);
598         spin_lock_init(&res->lr_lock);
599
600         /* one who creates the resource must unlock
601          * the semaphore after lvb initialization */
602         init_MUTEX_LOCKED(&res->lr_lvb_sem);
603
604         return res;
605 }
606
607 /* must be called with hash lock held */
608 static struct ldlm_resource *
609 ldlm_resource_find(struct ldlm_namespace *ns, const struct ldlm_res_id *name,
610                    __u32 hash)
611 {
612         struct list_head *bucket, *tmp;
613         struct ldlm_resource *res;
614
615         LASSERT_SPIN_LOCKED(&ns->ns_hash_lock);
616         bucket = ns->ns_hash + hash;
617
618         list_for_each(tmp, bucket) {
619                 res = list_entry(tmp, struct ldlm_resource, lr_hash);
620                 if (memcmp(&res->lr_name, name, sizeof(res->lr_name)) == 0)
621                         return res;
622         }
623
624         return NULL;
625 }
626
627 /* Args: locked namespace
628  * Returns: newly-allocated, referenced, unlocked resource */
629 static struct ldlm_resource *
630 ldlm_resource_add(struct ldlm_namespace *ns, struct ldlm_resource *parent,
631                   const struct ldlm_res_id *name, __u32 hash, ldlm_type_t type)
632 {
633         struct list_head *bucket;
634         struct ldlm_resource *res, *old_res;
635         ENTRY;
636
637         LASSERTF(type >= LDLM_MIN_TYPE && type < LDLM_MAX_TYPE,
638                  "type: %d\n", type);
639
640         res = ldlm_resource_new();
641         if (!res)
642                 RETURN(NULL);
643
644         res->lr_name = *name;
645         res->lr_namespace = ns;
646         res->lr_type = type;
647         res->lr_most_restr = LCK_NL;
648
649         spin_lock(&ns->ns_hash_lock);
650         old_res = ldlm_resource_find(ns, name, hash);
651         if (old_res) {
652                 /* someone won the race and added the resource before */
653                 ldlm_resource_getref(old_res);
654                 spin_unlock(&ns->ns_hash_lock);
655                 OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res);
656                 /* synchronize WRT resource creation */
657                 if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) {
658                         down(&old_res->lr_lvb_sem);
659                         up(&old_res->lr_lvb_sem);
660                 }
661                 RETURN(old_res);
662         }
663
664         /* we won! let's add the resource */
665         bucket = ns->ns_hash + hash;
666         list_add(&res->lr_hash, bucket);
667         ns->ns_resources++;
668         ldlm_namespace_get_nolock(ns);
669
670         if (parent == NULL) {
671                 list_add(&res->lr_childof, &ns->ns_root_list);
672         } else {
673                 res->lr_parent = parent;
674                 list_add(&res->lr_childof, &parent->lr_children);
675         }
676         spin_unlock(&ns->ns_hash_lock);
677
678         if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) {
679                 int rc;
680
681                 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CREATE_RESOURCE, 2);
682                 rc = ns->ns_lvbo->lvbo_init(res);
683                 if (rc)
684                         CERROR("lvbo_init failed for resource "
685                                LPU64": rc %d\n", name->name[0], rc);
686                 /* we create resource with locked lr_lvb_sem */
687                 up(&res->lr_lvb_sem);
688         }
689
690         RETURN(res);
691 }
692
693 /* Args: unlocked namespace
694  * Locks: takes and releases ns->ns_lock and res->lr_lock
695  * Returns: referenced, unlocked ldlm_resource or NULL */
696 struct ldlm_resource *
697 ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent,
698                   const struct ldlm_res_id *name, ldlm_type_t type, int create)
699 {
700         __u32 hash = ldlm_hash_fn(parent, name);
701         struct ldlm_resource *res = NULL;
702         ENTRY;
703
704         LASSERT(ns != NULL);
705         LASSERT(ns->ns_hash != NULL);
706         LASSERT(name->name[0] != 0);
707
708         spin_lock(&ns->ns_hash_lock);
709         res = ldlm_resource_find(ns, name, hash);
710         if (res) {
711                 ldlm_resource_getref(res);
712                 spin_unlock(&ns->ns_hash_lock);
713                 /* synchronize WRT resource creation */
714                 if (ns->ns_lvbo && ns->ns_lvbo->lvbo_init) {
715                         down(&res->lr_lvb_sem);
716                         up(&res->lr_lvb_sem);
717                 }
718                 RETURN(res);
719         }
720         spin_unlock(&ns->ns_hash_lock);
721
722         if (create == 0)
723                 RETURN(NULL);
724
725         res = ldlm_resource_add(ns, parent, name, hash, type);
726         RETURN(res);
727 }
728
729 struct ldlm_resource *ldlm_resource_getref(struct ldlm_resource *res)
730 {
731         LASSERT(res != NULL);
732         LASSERT(res != LP_POISON);
733         atomic_inc(&res->lr_refcount);
734         CDEBUG(D_INFO, "getref res: %p count: %d\n", res,
735                atomic_read(&res->lr_refcount));
736         return res;
737 }
738
739 void __ldlm_resource_putref_final(struct ldlm_resource *res)
740 {
741         struct ldlm_namespace *ns = res->lr_namespace;
742
743         LASSERT_SPIN_LOCKED(&ns->ns_hash_lock);
744
745         if (!list_empty(&res->lr_granted)) {
746                 ldlm_resource_dump(D_ERROR, res);
747                 LBUG();
748         }
749
750         if (!list_empty(&res->lr_converting)) {
751                 ldlm_resource_dump(D_ERROR, res);
752                 LBUG();
753         }
754
755         if (!list_empty(&res->lr_waiting)) {
756                 ldlm_resource_dump(D_ERROR, res);
757                 LBUG();
758         }
759
760         if (!list_empty(&res->lr_children)) {
761                 ldlm_resource_dump(D_ERROR, res);
762                 LBUG();
763         }
764
765         /* Pass 0 here to not wake ->ns_waitq up yet, we will do it few 
766          * lines below when all children are freed. */
767         ldlm_namespace_put_nolock(ns, 0);
768         list_del_init(&res->lr_hash);
769         list_del_init(&res->lr_childof);
770
771         ns->ns_resources--;
772         if (ns->ns_resources == 0)
773                 wake_up(&ns->ns_waitq);
774 }
775
776 /* Returns 1 if the resource was freed, 0 if it remains. */
777 int ldlm_resource_putref(struct ldlm_resource *res)
778 {
779         struct ldlm_namespace *ns = res->lr_namespace;
780         int rc = 0;
781         ENTRY;
782
783         CDEBUG(D_INFO, "putref res: %p count: %d\n", res,
784                atomic_read(&res->lr_refcount) - 1);
785         LASSERTF(atomic_read(&res->lr_refcount) > 0, "%d",
786                  atomic_read(&res->lr_refcount));
787         LASSERTF(atomic_read(&res->lr_refcount) < LI_POISON, "%d",
788                  atomic_read(&res->lr_refcount));
789
790         if (atomic_dec_and_lock(&res->lr_refcount, &ns->ns_hash_lock)) {
791                 __ldlm_resource_putref_final(res);
792                 spin_unlock(&ns->ns_hash_lock);
793                 if (res->lr_lvb_data)
794                         OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
795                 OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res);
796                 rc = 1;
797         }
798
799         RETURN(rc);
800 }
801
802 /* Returns 1 if the resource was freed, 0 if it remains. */
803 int ldlm_resource_putref_locked(struct ldlm_resource *res)
804 {
805         int rc = 0;
806         ENTRY;
807
808         CDEBUG(D_INFO, "putref res: %p count: %d\n", res,
809                atomic_read(&res->lr_refcount) - 1);
810         LASSERT(atomic_read(&res->lr_refcount) > 0);
811         LASSERT(atomic_read(&res->lr_refcount) < LI_POISON);
812
813         LASSERT(atomic_read(&res->lr_refcount) >= 0);
814         if (atomic_dec_and_test(&res->lr_refcount)) {
815                 __ldlm_resource_putref_final(res);
816                 if (res->lr_lvb_data)
817                         OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
818                 OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res);
819                 rc = 1;
820         }
821
822         RETURN(rc);
823 }
824
825 void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
826                             struct ldlm_lock *lock)
827 {
828         check_res_locked(res);
829
830         ldlm_resource_dump(D_OTHER, res);
831         CDEBUG(D_OTHER, "About to add this lock:\n");
832         ldlm_lock_dump(D_OTHER, lock, 0);
833
834         if (lock->l_destroyed) {
835                 CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
836                 return;
837         }
838
839         LASSERT(list_empty(&lock->l_res_link));
840
841         list_add_tail(&lock->l_res_link, head);
842 }
843
844 void ldlm_resource_insert_lock_after(struct ldlm_lock *original,
845                                      struct ldlm_lock *new)
846 {
847         struct ldlm_resource *res = original->l_resource;
848
849         check_res_locked(res);
850
851         ldlm_resource_dump(D_OTHER, res);
852         CDEBUG(D_OTHER, "About to insert this lock after %p:\n", original);
853         ldlm_lock_dump(D_OTHER, new, 0);
854
855         if (new->l_destroyed) {
856                 CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
857                 goto out;
858         }
859
860         LASSERT(list_empty(&new->l_res_link));
861
862         list_add(&new->l_res_link, &original->l_res_link);
863  out:;
864 }
865
866 void ldlm_resource_unlink_lock(struct ldlm_lock *lock)
867 {
868         check_res_locked(lock->l_resource);
869         ldlm_unlink_lock_skiplist(lock);
870         list_del_init(&lock->l_res_link);
871 }
872
873 void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc)
874 {
875         desc->lr_type = res->lr_type;
876         desc->lr_name = res->lr_name;
877 }
878
879 void ldlm_dump_all_namespaces(ldlm_side_t client, int level)
880 {
881         struct list_head *tmp;
882
883         if (!((libcfs_debug | D_ERROR) & level))
884                 return;
885
886         mutex_down(ldlm_namespace_lock(client));
887
888         list_for_each(tmp, ldlm_namespace_list(client)) {
889                 struct ldlm_namespace *ns;
890                 ns = list_entry(tmp, struct ldlm_namespace, ns_list_chain);
891                 ldlm_namespace_dump(level, ns);
892         }
893
894         mutex_up(ldlm_namespace_lock(client));
895 }
896
897 void ldlm_namespace_dump(int level, struct ldlm_namespace *ns)
898 {
899         struct list_head *tmp;
900
901         if (!((libcfs_debug | D_ERROR) & level))
902                 return;
903
904         CDEBUG(level, "--- Namespace: %s (rc: %d, side: %s)\n", 
905                ns->ns_name, ns->ns_refcount, 
906                ns_is_client(ns) ? "client" : "server");
907
908         if (cfs_time_before(cfs_time_current(), ns->ns_next_dump))
909                 return;
910
911         spin_lock(&ns->ns_hash_lock);
912         tmp = ns->ns_root_list.next;
913         while (tmp != &ns->ns_root_list) {
914                 struct ldlm_resource *res;
915                 res = list_entry(tmp, struct ldlm_resource, lr_childof);
916
917                 ldlm_resource_getref(res);
918                 spin_unlock(&ns->ns_hash_lock);
919
920                 lock_res(res);
921                 ldlm_resource_dump(level, res);
922                 unlock_res(res);
923
924                 spin_lock(&ns->ns_hash_lock);
925                 tmp = tmp->next;
926                 ldlm_resource_putref_locked(res);
927         }
928         ns->ns_next_dump = cfs_time_shift(10);
929         spin_unlock(&ns->ns_hash_lock);
930 }
931
932 void ldlm_resource_dump(int level, struct ldlm_resource *res)
933 {
934         struct list_head *tmp;
935         int pos;
936
937         CLASSERT(RES_NAME_SIZE == 4);
938
939         if (!((libcfs_debug | D_ERROR) & level))
940                 return;
941
942         CDEBUG(level, "--- Resource: %p ("LPU64"/"LPU64"/"LPU64"/"LPU64
943                ") (rc: %d)\n", res, res->lr_name.name[0], res->lr_name.name[1],
944                res->lr_name.name[2], res->lr_name.name[3],
945                atomic_read(&res->lr_refcount));
946
947         if (!list_empty(&res->lr_granted)) {
948                 pos = 0;
949                 CDEBUG(level, "Granted locks:\n");
950                 list_for_each(tmp, &res->lr_granted) {
951                         struct ldlm_lock *lock;
952                         lock = list_entry(tmp, struct ldlm_lock, l_res_link);
953                         ldlm_lock_dump(level, lock, ++pos);
954                 }
955         }
956         if (!list_empty(&res->lr_converting)) {
957                 pos = 0;
958                 CDEBUG(level, "Converting locks:\n");
959                 list_for_each(tmp, &res->lr_converting) {
960                         struct ldlm_lock *lock;
961                         lock = list_entry(tmp, struct ldlm_lock, l_res_link);
962                         ldlm_lock_dump(level, lock, ++pos);
963                 }
964         }
965         if (!list_empty(&res->lr_waiting)) {
966                 pos = 0;
967                 CDEBUG(level, "Waiting locks:\n");
968                 list_for_each(tmp, &res->lr_waiting) {
969                         struct ldlm_lock *lock;
970                         lock = list_entry(tmp, struct ldlm_lock, l_res_link);
971                         ldlm_lock_dump(level, lock, ++pos);
972                 }
973         }
974 }