Whamcloud - gitweb
LU-8901 misc: update Intel copyright messages for 2016
[fs/lustre-release.git] / lustre / ldlm / ldlm_resource.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2010, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/ldlm/ldlm_resource.c
33  *
34  * Author: Phil Schwan <phil@clusterfs.com>
35  * Author: Peter Braam <braam@clusterfs.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_LDLM
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <obd_class.h>
42 #include "ldlm_internal.h"
43
44 struct kmem_cache *ldlm_resource_slab, *ldlm_lock_slab;
45 struct kmem_cache *ldlm_interval_tree_slab;
46
47 int ldlm_srv_namespace_nr = 0;
48 int ldlm_cli_namespace_nr = 0;
49
50 struct mutex ldlm_srv_namespace_lock;
51 struct list_head ldlm_srv_namespace_list;
52
53 struct mutex ldlm_cli_namespace_lock;
54 /* Client Namespaces that have active resources in them.
55  * Once all resources go away, ldlm_poold moves such namespaces to the
56  * inactive list */
57 struct list_head ldlm_cli_active_namespace_list;
58 /* Client namespaces that don't have any locks in them */
59 struct list_head ldlm_cli_inactive_namespace_list;
60
61 static struct proc_dir_entry *ldlm_type_proc_dir;
62 static struct proc_dir_entry *ldlm_ns_proc_dir;
63 struct proc_dir_entry *ldlm_svc_proc_dir;
64
65 /* during debug dump certain amount of granted locks for one resource to avoid
66  * DDOS. */
67 static unsigned int ldlm_dump_granted_max = 256;
68
69 #ifdef CONFIG_PROC_FS
70 static ssize_t
71 lprocfs_dump_ns_seq_write(struct file *file, const char __user *buffer,
72                           size_t count, loff_t *off)
73 {
74         ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
75         ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
76         RETURN(count);
77 }
78 LPROC_SEQ_FOPS_WO_TYPE(ldlm, dump_ns);
79
80 LPROC_SEQ_FOPS_RW_TYPE(ldlm_rw, uint);
81 LPROC_SEQ_FOPS_RO_TYPE(ldlm, uint);
82
83 #ifdef HAVE_SERVER_SUPPORT
84
85 static int seq_watermark_show(struct seq_file *m, void *data)
86 {
87         seq_printf(m, "%llu\n", *(__u64 *)m->private);
88         return 0;
89 }
90
91 static ssize_t seq_watermark_write(struct file *file,
92                                    const char __user *buffer, size_t count,
93                                    loff_t *off)
94 {
95         __s64 value;
96         __u64 watermark;
97         __u64 *data = ((struct seq_file *)file->private_data)->private;
98         bool wm_low = (data == &ldlm_reclaim_threshold_mb) ? true : false;
99         int rc;
100
101         rc = lprocfs_str_with_units_to_s64(buffer, count, &value, 'M');
102         if (rc) {
103                 CERROR("Failed to set %s, rc = %d.\n",
104                        wm_low ? "lock_reclaim_threshold_mb" : "lock_limit_mb",
105                        rc);
106                 return rc;
107         } else if (value != 0 && value < (1 << 20)) {
108                 CERROR("%s should be greater than 1MB.\n",
109                        wm_low ? "lock_reclaim_threshold_mb" : "lock_limit_mb");
110                 return -EINVAL;
111         }
112         watermark = value >> 20;
113
114         if (wm_low) {
115                 if (ldlm_lock_limit_mb != 0 && watermark > ldlm_lock_limit_mb) {
116                         CERROR("lock_reclaim_threshold_mb must be smaller than "
117                                "lock_limit_mb.\n");
118                         return -EINVAL;
119                 }
120
121                 *data = watermark;
122                 if (watermark != 0) {
123                         watermark <<= 20;
124                         do_div(watermark, sizeof(struct ldlm_lock));
125                 }
126                 ldlm_reclaim_threshold = watermark;
127         } else {
128                 if (ldlm_reclaim_threshold_mb != 0 &&
129                     watermark < ldlm_reclaim_threshold_mb) {
130                         CERROR("lock_limit_mb must be greater than "
131                                "lock_reclaim_threshold_mb.\n");
132                         return -EINVAL;
133                 }
134
135                 *data = watermark;
136                 if (watermark != 0) {
137                         watermark <<= 20;
138                         do_div(watermark, sizeof(struct ldlm_lock));
139                 }
140                 ldlm_lock_limit = watermark;
141         }
142
143         return count;
144 }
145
146 static int seq_watermark_open(struct inode *inode, struct file *file)
147 {
148         return single_open(file, seq_watermark_show, PDE_DATA(inode));
149 }
150
151 static const struct file_operations ldlm_watermark_fops = {
152         .owner          = THIS_MODULE,
153         .open           = seq_watermark_open,
154         .read           = seq_read,
155         .write          = seq_watermark_write,
156         .llseek         = seq_lseek,
157         .release        = lprocfs_single_release,
158 };
159
160 static int seq_granted_show(struct seq_file *m, void *data)
161 {
162         seq_printf(m, "%llu\n", percpu_counter_sum_positive(
163                    (struct percpu_counter *)m->private));
164         return 0;
165 }
166
167 static int seq_granted_open(struct inode *inode, struct file *file)
168 {
169         return single_open(file, seq_granted_show, PDE_DATA(inode));
170 }
171
172 static const struct file_operations ldlm_granted_fops = {
173         .owner  = THIS_MODULE,
174         .open   = seq_granted_open,
175         .read   = seq_read,
176         .llseek = seq_lseek,
177         .release = seq_release,
178 };
179
180 #endif /* HAVE_SERVER_SUPPORT */
181
182 int ldlm_proc_setup(void)
183 {
184         int rc;
185         struct lprocfs_vars list[] = {
186                 { .name =       "dump_namespaces",
187                   .fops =       &ldlm_dump_ns_fops,
188                   .proc_mode =  0222 },
189                 { .name =       "dump_granted_max",
190                   .fops =       &ldlm_rw_uint_fops,
191                   .data =       &ldlm_dump_granted_max },
192                 { .name =       "cancel_unused_locks_before_replay",
193                   .fops =       &ldlm_rw_uint_fops,
194                   .data =       &ldlm_cancel_unused_locks_before_replay },
195 #ifdef HAVE_SERVER_SUPPORT
196                 { .name =       "lock_reclaim_threshold_mb",
197                   .fops =       &ldlm_watermark_fops,
198                   .data =       &ldlm_reclaim_threshold_mb },
199                 { .name =       "lock_limit_mb",
200                   .fops =       &ldlm_watermark_fops,
201                   .data =       &ldlm_lock_limit_mb },
202                 { .name =       "lock_granted_count",
203                   .fops =       &ldlm_granted_fops,
204                   .data =       &ldlm_granted_total },
205 #endif
206                 { NULL }};
207         ENTRY;
208         LASSERT(ldlm_ns_proc_dir == NULL);
209
210         ldlm_type_proc_dir = lprocfs_register(OBD_LDLM_DEVICENAME,
211                                               proc_lustre_root,
212                                               NULL, NULL);
213         if (IS_ERR(ldlm_type_proc_dir)) {
214                 CERROR("LProcFS failed in ldlm-init\n");
215                 rc = PTR_ERR(ldlm_type_proc_dir);
216                 GOTO(err, rc);
217         }
218
219         ldlm_ns_proc_dir = lprocfs_register("namespaces",
220                                             ldlm_type_proc_dir,
221                                             NULL, NULL);
222         if (IS_ERR(ldlm_ns_proc_dir)) {
223                 CERROR("LProcFS failed in ldlm-init\n");
224                 rc = PTR_ERR(ldlm_ns_proc_dir);
225                 GOTO(err_type, rc);
226         }
227
228         ldlm_svc_proc_dir = lprocfs_register("services",
229                                              ldlm_type_proc_dir,
230                                              NULL, NULL);
231         if (IS_ERR(ldlm_svc_proc_dir)) {
232                 CERROR("LProcFS failed in ldlm-init\n");
233                 rc = PTR_ERR(ldlm_svc_proc_dir);
234                 GOTO(err_ns, rc);
235         }
236
237         rc = lprocfs_add_vars(ldlm_type_proc_dir, list, NULL);
238         if (rc != 0) {
239                 CERROR("LProcFS failed in ldlm-init\n");
240                 GOTO(err_svc, rc);
241         }
242
243         RETURN(0);
244
245 err_svc:
246         lprocfs_remove(&ldlm_svc_proc_dir);
247 err_ns:
248         lprocfs_remove(&ldlm_ns_proc_dir);
249 err_type:
250         lprocfs_remove(&ldlm_type_proc_dir);
251 err:
252         ldlm_svc_proc_dir = NULL;
253         RETURN(rc);
254 }
255
256 void ldlm_proc_cleanup(void)
257 {
258         if (ldlm_svc_proc_dir)
259                 lprocfs_remove(&ldlm_svc_proc_dir);
260
261         if (ldlm_ns_proc_dir)
262                 lprocfs_remove(&ldlm_ns_proc_dir);
263
264         if (ldlm_type_proc_dir)
265                 lprocfs_remove(&ldlm_type_proc_dir);
266 }
267
268 static int lprocfs_ns_resources_seq_show(struct seq_file *m, void *v)
269 {
270         struct ldlm_namespace   *ns  = m->private;
271         __u64                   res = 0;
272         struct cfs_hash_bd              bd;
273         int                     i;
274
275         /* result is not strictly consistant */
276         cfs_hash_for_each_bucket(ns->ns_rs_hash, &bd, i)
277                 res += cfs_hash_bd_count_get(&bd);
278         return lprocfs_u64_seq_show(m, &res);
279 }
280 LPROC_SEQ_FOPS_RO(lprocfs_ns_resources);
281
282 static int lprocfs_ns_locks_seq_show(struct seq_file *m, void *v)
283 {
284         struct ldlm_namespace   *ns = m->private;
285         __u64                   locks;
286
287         locks = lprocfs_stats_collector(ns->ns_stats, LDLM_NSS_LOCKS,
288                                         LPROCFS_FIELDS_FLAGS_SUM);
289         return lprocfs_u64_seq_show(m, &locks);
290 }
291 LPROC_SEQ_FOPS_RO(lprocfs_ns_locks);
292
293 static int lprocfs_lru_size_seq_show(struct seq_file *m, void *v)
294 {
295         struct ldlm_namespace *ns = m->private;
296         __u32 *nr = &ns->ns_max_unused;
297
298         if (ns_connect_lru_resize(ns))
299                 nr = &ns->ns_nr_unused;
300         return lprocfs_uint_seq_show(m, nr);
301 }
302
303 static ssize_t lprocfs_lru_size_seq_write(struct file *file,
304                                           const char __user *buffer,
305                                           size_t count, loff_t *off)
306 {
307         struct ldlm_namespace *ns = ((struct seq_file *)file->private_data)->private;
308         char                   dummy[MAX_STRING_SIZE + 1];
309         char                  *end;
310         unsigned long          tmp;
311         int                    lru_resize;
312
313         if (count >= sizeof(dummy))
314                 return -EINVAL;
315
316         if (count == 0)
317                 return 0;
318
319         if (copy_from_user(dummy, buffer, count))
320                 return -EFAULT;
321
322         dummy[count] = 0;
323
324         if (strncmp(dummy, "clear", 5) == 0) {
325                 CDEBUG(D_DLMTRACE,
326                        "dropping all unused locks from namespace %s\n",
327                        ldlm_ns_name(ns));
328                 if (ns_connect_lru_resize(ns)) {
329                         int canceled, unused  = ns->ns_nr_unused;
330
331                         /* Try to cancel all @ns_nr_unused locks. */
332                         canceled = ldlm_cancel_lru(ns, unused, 0,
333                                                    LDLM_LRU_FLAG_PASSED);
334                         if (canceled < unused) {
335                                 CDEBUG(D_DLMTRACE,
336                                        "not all requested locks are canceled, "
337                                        "requested: %d, canceled: %d\n", unused,
338                                        canceled);
339                                 return -EINVAL;
340                         }
341                 } else {
342                         tmp = ns->ns_max_unused;
343                         ns->ns_max_unused = 0;
344                         ldlm_cancel_lru(ns, 0, 0, LDLM_LRU_FLAG_PASSED);
345                         ns->ns_max_unused = tmp;
346                 }
347                 return count;
348         }
349
350         tmp = simple_strtoul(dummy, &end, 0);
351         if (dummy == end) {
352                 CERROR("invalid value written\n");
353                 return -EINVAL;
354         }
355         lru_resize = (tmp == 0);
356
357         if (ns_connect_lru_resize(ns)) {
358                 if (!lru_resize)
359                         ns->ns_max_unused = tmp;
360
361                 if (tmp > ns->ns_nr_unused)
362                         tmp = ns->ns_nr_unused;
363                 tmp = ns->ns_nr_unused - tmp;
364
365                 CDEBUG(D_DLMTRACE,
366                        "changing namespace %s unused locks from %u to %u\n",
367                        ldlm_ns_name(ns), ns->ns_nr_unused,
368                        (unsigned int)tmp);
369                 ldlm_cancel_lru(ns, tmp, LCF_ASYNC, LDLM_LRU_FLAG_PASSED);
370
371                 if (!lru_resize) {
372                         CDEBUG(D_DLMTRACE,
373                                "disable lru_resize for namespace %s\n",
374                                ldlm_ns_name(ns));
375                         ns->ns_connect_flags &= ~OBD_CONNECT_LRU_RESIZE;
376                 }
377         } else {
378                 CDEBUG(D_DLMTRACE,
379                        "changing namespace %s max_unused from %u to %u\n",
380                        ldlm_ns_name(ns), ns->ns_max_unused,
381                        (unsigned int)tmp);
382                 ns->ns_max_unused = (unsigned int)tmp;
383                 ldlm_cancel_lru(ns, 0, LCF_ASYNC, LDLM_LRU_FLAG_PASSED);
384
385                 /* Make sure that LRU resize was originally supported before
386                  * turning it on here. */
387                 if (lru_resize &&
388                     (ns->ns_orig_connect_flags & OBD_CONNECT_LRU_RESIZE)) {
389                         CDEBUG(D_DLMTRACE,
390                                "enable lru_resize for namespace %s\n",
391                                ldlm_ns_name(ns));
392                         ns->ns_connect_flags |= OBD_CONNECT_LRU_RESIZE;
393                 }
394         }
395
396         return count;
397 }
398 LPROC_SEQ_FOPS(lprocfs_lru_size);
399
400 static int lprocfs_elc_seq_show(struct seq_file *m, void *v)
401 {
402         struct ldlm_namespace *ns = m->private;
403         unsigned int supp = ns_connect_cancelset(ns);
404
405         return lprocfs_uint_seq_show(m, &supp);
406 }
407
408 static ssize_t lprocfs_elc_seq_write(struct file *file,
409                                      const char __user *buffer,
410                                      size_t count, loff_t *off)
411 {
412         struct ldlm_namespace *ns = ((struct seq_file *)file->private_data)->private;
413         unsigned int supp = -1;
414         int rc;
415
416         rc = lprocfs_wr_uint(file, buffer, count, &supp);
417         if (rc < 0)
418                 return rc;
419
420         if (supp == 0)
421                 ns->ns_connect_flags &= ~OBD_CONNECT_CANCELSET;
422         else if (ns->ns_orig_connect_flags & OBD_CONNECT_CANCELSET)
423                 ns->ns_connect_flags |= OBD_CONNECT_CANCELSET;
424         return count;
425 }
426 LPROC_SEQ_FOPS(lprocfs_elc);
427
428 static void ldlm_namespace_proc_unregister(struct ldlm_namespace *ns)
429 {
430         if (ns->ns_proc_dir_entry == NULL)
431                 CERROR("dlm namespace %s has no procfs dir?\n",
432                        ldlm_ns_name(ns));
433         else
434                 lprocfs_remove(&ns->ns_proc_dir_entry);
435
436         if (ns->ns_stats != NULL)
437                 lprocfs_free_stats(&ns->ns_stats);
438 }
439
440 static int ldlm_namespace_proc_register(struct ldlm_namespace *ns)
441 {
442         struct lprocfs_vars lock_vars[2];
443         char lock_name[MAX_STRING_SIZE + 1];
444         struct proc_dir_entry *ns_pde;
445
446         LASSERT(ns != NULL);
447         LASSERT(ns->ns_rs_hash != NULL);
448
449         if (ns->ns_proc_dir_entry != NULL) {
450                 ns_pde = ns->ns_proc_dir_entry;
451         } else {
452                 ns_pde = proc_mkdir(ldlm_ns_name(ns), ldlm_ns_proc_dir);
453                 if (ns_pde == NULL)
454                         return -ENOMEM;
455                 ns->ns_proc_dir_entry = ns_pde;
456         }
457
458         ns->ns_stats = lprocfs_alloc_stats(LDLM_NSS_LAST, 0);
459         if (ns->ns_stats == NULL)
460                 return -ENOMEM;
461
462         lprocfs_counter_init(ns->ns_stats, LDLM_NSS_LOCKS,
463                              LPROCFS_CNTR_AVGMINMAX, "locks", "locks");
464
465         lock_name[MAX_STRING_SIZE] = '\0';
466
467         memset(lock_vars, 0, sizeof(lock_vars));
468         lock_vars[0].name = lock_name;
469
470         ldlm_add_var(&lock_vars[0], ns_pde, "resource_count", ns,
471                      &lprocfs_ns_resources_fops);
472         ldlm_add_var(&lock_vars[0], ns_pde, "lock_count", ns,
473                      &lprocfs_ns_locks_fops);
474
475         if (ns_is_client(ns)) {
476                 ldlm_add_var(&lock_vars[0], ns_pde, "lock_unused_count",
477                              &ns->ns_nr_unused, &ldlm_uint_fops);
478                 ldlm_add_var(&lock_vars[0], ns_pde, "lru_size", ns,
479                              &lprocfs_lru_size_fops);
480                 ldlm_add_var(&lock_vars[0], ns_pde, "lru_max_age",
481                              &ns->ns_max_age, &ldlm_rw_uint_fops);
482                 ldlm_add_var(&lock_vars[0], ns_pde, "early_lock_cancel",
483                              ns, &lprocfs_elc_fops);
484         } else {
485                 ldlm_add_var(&lock_vars[0], ns_pde, "ctime_age_limit",
486                              &ns->ns_ctime_age_limit, &ldlm_rw_uint_fops);
487                 ldlm_add_var(&lock_vars[0], ns_pde, "lock_timeouts",
488                              &ns->ns_timeouts, &ldlm_uint_fops);
489                 ldlm_add_var(&lock_vars[0], ns_pde, "max_nolock_bytes",
490                              &ns->ns_max_nolock_size, &ldlm_rw_uint_fops);
491                 ldlm_add_var(&lock_vars[0], ns_pde, "contention_seconds",
492                              &ns->ns_contention_time, &ldlm_rw_uint_fops);
493                 ldlm_add_var(&lock_vars[0], ns_pde, "contended_locks",
494                              &ns->ns_contended_locks, &ldlm_rw_uint_fops);
495                 ldlm_add_var(&lock_vars[0], ns_pde, "max_parallel_ast",
496                              &ns->ns_max_parallel_ast, &ldlm_rw_uint_fops);
497         }
498         return 0;
499 }
500 #undef MAX_STRING_SIZE
501 #else /* CONFIG_PROC_FS */
502
503 #define ldlm_namespace_proc_unregister(ns)      ({;})
504 #define ldlm_namespace_proc_register(ns)        ({0;})
505
506 #endif /* CONFIG_PROC_FS */
507
508 static unsigned ldlm_res_hop_hash(struct cfs_hash *hs,
509                                   const void *key, unsigned mask)
510 {
511         const struct ldlm_res_id     *id  = key;
512         unsigned                val = 0;
513         unsigned                i;
514
515         for (i = 0; i < RES_NAME_SIZE; i++)
516                 val += id->name[i];
517         return val & mask;
518 }
519
520 static unsigned ldlm_res_hop_fid_hash(struct cfs_hash *hs,
521                                       const void *key, unsigned mask)
522 {
523         const struct ldlm_res_id *id = key;
524         struct lu_fid       fid;
525         __u32               hash;
526         __u32               val;
527
528         fid.f_seq = id->name[LUSTRE_RES_ID_SEQ_OFF];
529         fid.f_oid = (__u32)id->name[LUSTRE_RES_ID_VER_OID_OFF];
530         fid.f_ver = (__u32)(id->name[LUSTRE_RES_ID_VER_OID_OFF] >> 32);
531
532         hash = fid_flatten32(&fid);
533         hash += (hash >> 4) + (hash << 12); /* mixing oid and seq */
534         if (id->name[LUSTRE_RES_ID_HSH_OFF] != 0) {
535                 val = id->name[LUSTRE_RES_ID_HSH_OFF];
536                 hash += (val >> 5) + (val << 11);
537         } else {
538                 val = fid_oid(&fid);
539         }
540         hash = hash_long(hash, hs->hs_bkt_bits);
541         /* give me another random factor */
542         hash -= hash_long((unsigned long)hs, val % 11 + 3);
543
544         hash <<= hs->hs_cur_bits - hs->hs_bkt_bits;
545         hash |= ldlm_res_hop_hash(hs, key, CFS_HASH_NBKT(hs) - 1);
546
547         return hash & mask;
548 }
549
550 static void *ldlm_res_hop_key(struct hlist_node *hnode)
551 {
552         struct ldlm_resource   *res;
553
554         res = hlist_entry(hnode, struct ldlm_resource, lr_hash);
555         return &res->lr_name;
556 }
557
558 static int ldlm_res_hop_keycmp(const void *key, struct hlist_node *hnode)
559 {
560         struct ldlm_resource   *res;
561
562         res = hlist_entry(hnode, struct ldlm_resource, lr_hash);
563         return ldlm_res_eq((const struct ldlm_res_id *)key,
564                            (const struct ldlm_res_id *)&res->lr_name);
565 }
566
567 static void *ldlm_res_hop_object(struct hlist_node *hnode)
568 {
569         return hlist_entry(hnode, struct ldlm_resource, lr_hash);
570 }
571
572 static void
573 ldlm_res_hop_get_locked(struct cfs_hash *hs, struct hlist_node *hnode)
574 {
575         struct ldlm_resource *res;
576
577         res = hlist_entry(hnode, struct ldlm_resource, lr_hash);
578         ldlm_resource_getref(res);
579 }
580
581 static void ldlm_res_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
582 {
583         struct ldlm_resource *res;
584
585         res = hlist_entry(hnode, struct ldlm_resource, lr_hash);
586         ldlm_resource_putref(res);
587 }
588
589 static struct cfs_hash_ops ldlm_ns_hash_ops = {
590         .hs_hash        = ldlm_res_hop_hash,
591         .hs_key         = ldlm_res_hop_key,
592         .hs_keycmp      = ldlm_res_hop_keycmp,
593         .hs_keycpy      = NULL,
594         .hs_object      = ldlm_res_hop_object,
595         .hs_get         = ldlm_res_hop_get_locked,
596         .hs_put         = ldlm_res_hop_put
597 };
598
599 static struct cfs_hash_ops ldlm_ns_fid_hash_ops = {
600         .hs_hash        = ldlm_res_hop_fid_hash,
601         .hs_key         = ldlm_res_hop_key,
602         .hs_keycmp      = ldlm_res_hop_keycmp,
603         .hs_keycpy      = NULL,
604         .hs_object      = ldlm_res_hop_object,
605         .hs_get         = ldlm_res_hop_get_locked,
606         .hs_put         = ldlm_res_hop_put
607 };
608
609 typedef struct ldlm_ns_hash_def {
610         enum ldlm_ns_type       nsd_type;
611         /** hash bucket bits */
612         unsigned                nsd_bkt_bits;
613         /** hash bits */
614         unsigned                nsd_all_bits;
615         /** hash operations */
616         struct cfs_hash_ops *nsd_hops;
617 } ldlm_ns_hash_def_t;
618
619 static struct ldlm_ns_hash_def ldlm_ns_hash_defs[] =
620 {
621         {
622                 .nsd_type       = LDLM_NS_TYPE_MDC,
623                 .nsd_bkt_bits   = 11,
624                 .nsd_all_bits   = 16,
625                 .nsd_hops       = &ldlm_ns_fid_hash_ops,
626         },
627         {
628                 .nsd_type       = LDLM_NS_TYPE_MDT,
629                 .nsd_bkt_bits   = 14,
630                 .nsd_all_bits   = 21,
631                 .nsd_hops       = &ldlm_ns_fid_hash_ops,
632         },
633         {
634                 .nsd_type       = LDLM_NS_TYPE_OSC,
635                 .nsd_bkt_bits   = 8,
636                 .nsd_all_bits   = 12,
637                 .nsd_hops       = &ldlm_ns_hash_ops,
638         },
639         {
640                 .nsd_type       = LDLM_NS_TYPE_OST,
641                 .nsd_bkt_bits   = 11,
642                 .nsd_all_bits   = 17,
643                 .nsd_hops       = &ldlm_ns_hash_ops,
644         },
645         {
646                 .nsd_type       = LDLM_NS_TYPE_MGC,
647                 .nsd_bkt_bits   = 4,
648                 .nsd_all_bits   = 4,
649                 .nsd_hops       = &ldlm_ns_hash_ops,
650         },
651         {
652                 .nsd_type       = LDLM_NS_TYPE_MGT,
653                 .nsd_bkt_bits   = 4,
654                 .nsd_all_bits   = 4,
655                 .nsd_hops       = &ldlm_ns_hash_ops,
656         },
657         {
658                 .nsd_type       = LDLM_NS_TYPE_UNKNOWN,
659         },
660 };
661
662 /**
663  * Create and initialize new empty namespace.
664  */
665 struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obd, char *name,
666                                           enum ldlm_side client,
667                                           enum ldlm_appetite apt,
668                                           enum ldlm_ns_type ns_type)
669 {
670         struct ldlm_namespace *ns = NULL;
671         struct ldlm_ns_bucket *nsb;
672         struct ldlm_ns_hash_def *nsd;
673         struct cfs_hash_bd bd;
674         int idx;
675         int rc;
676         ENTRY;
677
678         LASSERT(obd != NULL);
679
680         rc = ldlm_get_ref();
681         if (rc) {
682                 CERROR("ldlm_get_ref failed: %d\n", rc);
683                 RETURN(NULL);
684         }
685
686         for (idx = 0;;idx++) {
687                 nsd = &ldlm_ns_hash_defs[idx];
688                 if (nsd->nsd_type == LDLM_NS_TYPE_UNKNOWN) {
689                         CERROR("Unknown type %d for ns %s\n", ns_type, name);
690                         GOTO(out_ref, NULL);
691                 }
692
693                 if (nsd->nsd_type == ns_type)
694                         break;
695         }
696
697         OBD_ALLOC_PTR(ns);
698         if (!ns)
699                 GOTO(out_ref, NULL);
700
701         ns->ns_rs_hash = cfs_hash_create(name,
702                                          nsd->nsd_all_bits, nsd->nsd_all_bits,
703                                          nsd->nsd_bkt_bits, sizeof(*nsb),
704                                          CFS_HASH_MIN_THETA,
705                                          CFS_HASH_MAX_THETA,
706                                          nsd->nsd_hops,
707                                          CFS_HASH_DEPTH |
708                                          CFS_HASH_BIGNAME |
709                                          CFS_HASH_SPIN_BKTLOCK |
710                                          CFS_HASH_NO_ITEMREF);
711         if (ns->ns_rs_hash == NULL)
712                 GOTO(out_ns, NULL);
713
714         cfs_hash_for_each_bucket(ns->ns_rs_hash, &bd, idx) {
715                 nsb = cfs_hash_bd_extra_get(ns->ns_rs_hash, &bd);
716                 at_init(&nsb->nsb_at_estimate, ldlm_enqueue_min, 0);
717                 nsb->nsb_namespace = ns;
718                 nsb->nsb_reclaim_start = 0;
719         }
720
721         ns->ns_obd      = obd;
722         ns->ns_appetite = apt;
723         ns->ns_client   = client;
724
725         INIT_LIST_HEAD(&ns->ns_list_chain);
726         INIT_LIST_HEAD(&ns->ns_unused_list);
727         spin_lock_init(&ns->ns_lock);
728         atomic_set(&ns->ns_bref, 0);
729         init_waitqueue_head(&ns->ns_waitq);
730
731         ns->ns_max_nolock_size    = NS_DEFAULT_MAX_NOLOCK_BYTES;
732         ns->ns_contention_time    = NS_DEFAULT_CONTENTION_SECONDS;
733         ns->ns_contended_locks    = NS_DEFAULT_CONTENDED_LOCKS;
734
735         ns->ns_max_parallel_ast   = LDLM_DEFAULT_PARALLEL_AST_LIMIT;
736         ns->ns_nr_unused          = 0;
737         ns->ns_max_unused         = LDLM_DEFAULT_LRU_SIZE;
738         ns->ns_max_age            = LDLM_DEFAULT_MAX_ALIVE;
739         ns->ns_ctime_age_limit    = LDLM_CTIME_AGE_LIMIT;
740         ns->ns_timeouts           = 0;
741         ns->ns_orig_connect_flags = 0;
742         ns->ns_connect_flags      = 0;
743         ns->ns_stopping           = 0;
744         ns->ns_reclaim_start      = 0;
745         rc = ldlm_namespace_proc_register(ns);
746         if (rc != 0) {
747                 CERROR("Can't initialize ns proc, rc %d\n", rc);
748                 GOTO(out_hash, rc);
749         }
750
751         idx = ldlm_namespace_nr_read(client);
752         rc = ldlm_pool_init(&ns->ns_pool, ns, idx, client);
753         if (rc) {
754                 CERROR("Can't initialize lock pool, rc %d\n", rc);
755                 GOTO(out_proc, rc);
756         }
757
758         ldlm_namespace_register(ns, client);
759         RETURN(ns);
760 out_proc:
761         ldlm_namespace_proc_unregister(ns);
762         ldlm_namespace_cleanup(ns, 0);
763 out_hash:
764         cfs_hash_putref(ns->ns_rs_hash);
765 out_ns:
766         OBD_FREE_PTR(ns);
767 out_ref:
768         ldlm_put_ref();
769         RETURN(NULL);
770 }
771 EXPORT_SYMBOL(ldlm_namespace_new);
772
773 extern struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock);
774
775 /**
776  * Cancel and destroy all locks on a resource.
777  *
778  * If flags contains FL_LOCAL_ONLY, don't try to tell the server, just
779  * clean up.  This is currently only used for recovery, and we make
780  * certain assumptions as a result--notably, that we shouldn't cancel
781  * locks with refs.
782  */
783 static void cleanup_resource(struct ldlm_resource *res, struct list_head *q,
784                              __u64 flags)
785 {
786         struct list_head *tmp;
787         int rc = 0, client = ns_is_client(ldlm_res_to_ns(res));
788         bool local_only = !!(flags & LDLM_FL_LOCAL_ONLY);
789
790         do {
791                 struct ldlm_lock *lock = NULL;
792
793                 /* First, we look for non-cleaned-yet lock
794                  * all cleaned locks are marked by CLEANED flag. */
795                 lock_res(res);
796                 list_for_each(tmp, q) {
797                         lock = list_entry(tmp, struct ldlm_lock,
798                                           l_res_link);
799                         if (ldlm_is_cleaned(lock)) {
800                                 lock = NULL;
801                                 continue;
802                         }
803                         LDLM_LOCK_GET(lock);
804                         ldlm_set_cleaned(lock);
805                         break;
806                 }
807
808                 if (lock == NULL) {
809                         unlock_res(res);
810                         break;
811                 }
812
813                 /* Set CBPENDING so nothing in the cancellation path
814                  * can match this lock. */
815                 ldlm_set_cbpending(lock);
816                 ldlm_set_failed(lock);
817                 lock->l_flags |= flags;
818
819                 /* ... without sending a CANCEL message for local_only. */
820                 if (local_only)
821                         ldlm_set_local_only(lock);
822
823                 if (local_only && (lock->l_readers || lock->l_writers)) {
824                         /* This is a little bit gross, but much better than the
825                          * alternative: pretend that we got a blocking AST from
826                          * the server, so that when the lock is decref'd, it
827                          * will go away ... */
828                         unlock_res(res);
829                         LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY");
830                         if (lock->l_flags & LDLM_FL_FAIL_LOC) {
831                                 set_current_state(TASK_UNINTERRUPTIBLE);
832                                 schedule_timeout(cfs_time_seconds(4));
833                                 set_current_state(TASK_RUNNING);
834                         }
835                         if (lock->l_completion_ast)
836                                 lock->l_completion_ast(lock,
837                                                        LDLM_FL_FAILED, NULL);
838                         LDLM_LOCK_RELEASE(lock);
839                         continue;
840                 }
841
842                 if (client) {
843                         struct lustre_handle lockh;
844
845                         unlock_res(res);
846                         ldlm_lock2handle(lock, &lockh);
847                         rc = ldlm_cli_cancel(&lockh, LCF_LOCAL);
848                         if (rc)
849                                 CERROR("ldlm_cli_cancel: %d\n", rc);
850                 } else {
851                         unlock_res(res);
852                         LDLM_DEBUG(lock, "Freeing a lock still held by a "
853                                    "client node");
854                         ldlm_lock_cancel(lock);
855                 }
856                 LDLM_LOCK_RELEASE(lock);
857         } while (1);
858 }
859
860 static int ldlm_resource_clean(struct cfs_hash *hs, struct cfs_hash_bd *bd,
861                                struct hlist_node *hnode, void *arg)
862 {
863         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
864         __u64 flags = *(__u64 *)arg;
865
866         cleanup_resource(res, &res->lr_granted, flags);
867         cleanup_resource(res, &res->lr_converting, flags);
868         cleanup_resource(res, &res->lr_waiting, flags);
869
870         return 0;
871 }
872
873 static int ldlm_resource_complain(struct cfs_hash *hs, struct cfs_hash_bd *bd,
874                                   struct hlist_node *hnode, void *arg)
875 {
876         struct ldlm_resource  *res = cfs_hash_object(hs, hnode);
877
878         lock_res(res);
879         CERROR("%s: namespace resource "DLDLMRES" (%p) refcount nonzero "
880                "(%d) after lock cleanup; forcing cleanup.\n",
881                ldlm_ns_name(ldlm_res_to_ns(res)), PLDLMRES(res), res,
882                atomic_read(&res->lr_refcount) - 1);
883
884         ldlm_resource_dump(D_ERROR, res);
885         unlock_res(res);
886         return 0;
887 }
888
889 /**
890  * Cancel and destroy all locks in the namespace.
891  *
892  * Typically used during evictions when server notified client that it was
893  * evicted and all of its state needs to be destroyed.
894  * Also used during shutdown.
895  */
896 int ldlm_namespace_cleanup(struct ldlm_namespace *ns, __u64 flags)
897 {
898         if (ns == NULL) {
899                 CDEBUG(D_INFO, "NULL ns, skipping cleanup\n");
900                 return ELDLM_OK;
901         }
902
903         cfs_hash_for_each_nolock(ns->ns_rs_hash, ldlm_resource_clean,
904                                  &flags, 0);
905         cfs_hash_for_each_nolock(ns->ns_rs_hash, ldlm_resource_complain,
906                                  NULL, 0);
907         return ELDLM_OK;
908 }
909 EXPORT_SYMBOL(ldlm_namespace_cleanup);
910
911 /**
912  * Attempts to free namespace.
913  *
914  * Only used when namespace goes away, like during an unmount.
915  */
916 static int __ldlm_namespace_free(struct ldlm_namespace *ns, int force)
917 {
918         ENTRY;
919
920         /* At shutdown time, don't call the cancellation callback */
921         ldlm_namespace_cleanup(ns, force ? LDLM_FL_LOCAL_ONLY : 0);
922
923         if (atomic_read(&ns->ns_bref) > 0) {
924                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
925                 int rc;
926                 CDEBUG(D_DLMTRACE,
927                        "dlm namespace %s free waiting on refcount %d\n",
928                        ldlm_ns_name(ns), atomic_read(&ns->ns_bref));
929 force_wait:
930                 if (force)
931                         lwi = LWI_TIMEOUT(msecs_to_jiffies(obd_timeout *
932                                           MSEC_PER_SEC) / 4, NULL, NULL);
933
934                 rc = l_wait_event(ns->ns_waitq,
935                                   atomic_read(&ns->ns_bref) == 0, &lwi);
936
937                 /* Forced cleanups should be able to reclaim all references,
938                  * so it's safe to wait forever... we can't leak locks... */
939                 if (force && rc == -ETIMEDOUT) {
940                         LCONSOLE_ERROR("Forced cleanup waiting for %s "
941                                        "namespace with %d resources in use, "
942                                        "(rc=%d)\n", ldlm_ns_name(ns),
943                                        atomic_read(&ns->ns_bref), rc);
944                         GOTO(force_wait, rc);
945                 }
946
947                 if (atomic_read(&ns->ns_bref)) {
948                         LCONSOLE_ERROR("Cleanup waiting for %s namespace "
949                                        "with %d resources in use, (rc=%d)\n",
950                                        ldlm_ns_name(ns),
951                                        atomic_read(&ns->ns_bref), rc);
952                         RETURN(ELDLM_NAMESPACE_EXISTS);
953                 }
954                 CDEBUG(D_DLMTRACE, "dlm namespace %s free done waiting\n",
955                        ldlm_ns_name(ns));
956         }
957
958         RETURN(ELDLM_OK);
959 }
960
961 /**
962  * Performs various cleanups for passed \a ns to make it drop refc and be
963  * ready for freeing. Waits for refc == 0.
964  *
965  * The following is done:
966  * (0) Unregister \a ns from its list to make inaccessible for potential
967  * users like pools thread and others;
968  * (1) Clear all locks in \a ns.
969  */
970 void ldlm_namespace_free_prior(struct ldlm_namespace *ns,
971                                struct obd_import *imp,
972                                int force)
973 {
974         int rc;
975         ENTRY;
976         if (!ns) {
977                 EXIT;
978                 return;
979         }
980
981         spin_lock(&ns->ns_lock);
982         ns->ns_stopping = 1;
983         spin_unlock(&ns->ns_lock);
984
985         /*
986          * Can fail with -EINTR when force == 0 in which case try harder.
987          */
988         rc = __ldlm_namespace_free(ns, force);
989         if (rc != ELDLM_OK) {
990                 if (imp) {
991                         ptlrpc_disconnect_import(imp, 0);
992                         ptlrpc_invalidate_import(imp);
993                 }
994
995                 /*
996                  * With all requests dropped and the import inactive
997                  * we are gaurenteed all reference will be dropped.
998                  */
999                 rc = __ldlm_namespace_free(ns, 1);
1000                 LASSERT(rc == 0);
1001         }
1002         EXIT;
1003 }
1004 EXPORT_SYMBOL(ldlm_namespace_free_prior);
1005
1006 /**
1007  * Performs freeing memory structures related to \a ns. This is only done
1008  * when ldlm_namespce_free_prior() successfully removed all resources
1009  * referencing \a ns and its refc == 0.
1010  */
1011 void ldlm_namespace_free_post(struct ldlm_namespace *ns)
1012 {
1013         ENTRY;
1014         if (!ns) {
1015                 EXIT;
1016                 return;
1017         }
1018
1019         /* Make sure that nobody can find this ns in its list. */
1020         ldlm_namespace_unregister(ns, ns->ns_client);
1021         /* Fini pool _before_ parent proc dir is removed. This is important as
1022          * ldlm_pool_fini() removes own proc dir which is child to @dir.
1023          * Removing it after @dir may cause oops. */
1024         ldlm_pool_fini(&ns->ns_pool);
1025
1026         ldlm_namespace_proc_unregister(ns);
1027         cfs_hash_putref(ns->ns_rs_hash);
1028         /* Namespace \a ns should be not on list at this time, otherwise
1029          * this will cause issues related to using freed \a ns in poold
1030          * thread. */
1031         LASSERT(list_empty(&ns->ns_list_chain));
1032         OBD_FREE_PTR(ns);
1033         ldlm_put_ref();
1034         EXIT;
1035 }
1036 EXPORT_SYMBOL(ldlm_namespace_free_post);
1037
1038 /**
1039  * Cleanup the resource, and free namespace.
1040  * bug 12864:
1041  * Deadlock issue:
1042  * proc1: destroy import
1043  *        class_disconnect_export(grab cl_sem) ->
1044  *              -> ldlm_namespace_free ->
1045  *              -> lprocfs_remove(grab _lprocfs_lock).
1046  * proc2: read proc info
1047  *        lprocfs_fops_read(grab _lprocfs_lock) ->
1048  *              -> osc_rd_active, etc(grab cl_sem).
1049  *
1050  * So that I have to split the ldlm_namespace_free into two parts - the first
1051  * part ldlm_namespace_free_prior is used to cleanup the resource which is
1052  * being used; the 2nd part ldlm_namespace_free_post is used to unregister the
1053  * lprocfs entries, and then free memory. It will be called w/o cli->cl_sem
1054  * held.
1055  */
1056 void ldlm_namespace_free(struct ldlm_namespace *ns,
1057                          struct obd_import *imp,
1058                          int force)
1059 {
1060         ldlm_namespace_free_prior(ns, imp, force);
1061         ldlm_namespace_free_post(ns);
1062 }
1063 EXPORT_SYMBOL(ldlm_namespace_free);
1064
1065 void ldlm_namespace_get(struct ldlm_namespace *ns)
1066 {
1067         atomic_inc(&ns->ns_bref);
1068 }
1069
1070 /* This is only for callers that care about refcount */
1071 static int ldlm_namespace_get_return(struct ldlm_namespace *ns)
1072 {
1073         return atomic_inc_return(&ns->ns_bref);
1074 }
1075
1076 void ldlm_namespace_put(struct ldlm_namespace *ns)
1077 {
1078         if (atomic_dec_and_lock(&ns->ns_bref, &ns->ns_lock)) {
1079                 wake_up(&ns->ns_waitq);
1080                 spin_unlock(&ns->ns_lock);
1081         }
1082 }
1083
1084 /** Register \a ns in the list of namespaces */
1085 void ldlm_namespace_register(struct ldlm_namespace *ns, enum ldlm_side client)
1086 {
1087         mutex_lock(ldlm_namespace_lock(client));
1088         LASSERT(list_empty(&ns->ns_list_chain));
1089         list_add(&ns->ns_list_chain, ldlm_namespace_inactive_list(client));
1090         ldlm_namespace_nr_inc(client);
1091         mutex_unlock(ldlm_namespace_lock(client));
1092 }
1093
1094 /** Unregister \a ns from the list of namespaces. */
1095 void ldlm_namespace_unregister(struct ldlm_namespace *ns, enum ldlm_side client)
1096 {
1097         mutex_lock(ldlm_namespace_lock(client));
1098         LASSERT(!list_empty(&ns->ns_list_chain));
1099         /* Some asserts and possibly other parts of the code are still
1100          * using list_empty(&ns->ns_list_chain). This is why it is
1101          * important to use list_del_init() here. */
1102         list_del_init(&ns->ns_list_chain);
1103         ldlm_namespace_nr_dec(client);
1104         mutex_unlock(ldlm_namespace_lock(client));
1105 }
1106
1107 /** Should be called with ldlm_namespace_lock(client) taken. */
1108 void ldlm_namespace_move_to_active_locked(struct ldlm_namespace *ns,
1109                                           enum ldlm_side client)
1110 {
1111         LASSERT(!list_empty(&ns->ns_list_chain));
1112         LASSERT(mutex_is_locked(ldlm_namespace_lock(client)));
1113         list_move_tail(&ns->ns_list_chain, ldlm_namespace_list(client));
1114 }
1115
1116 /** Should be called with ldlm_namespace_lock(client) taken. */
1117 void ldlm_namespace_move_to_inactive_locked(struct ldlm_namespace *ns,
1118                                             enum ldlm_side client)
1119 {
1120         LASSERT(!list_empty(&ns->ns_list_chain));
1121         LASSERT(mutex_is_locked(ldlm_namespace_lock(client)));
1122         list_move_tail(&ns->ns_list_chain,
1123                        ldlm_namespace_inactive_list(client));
1124 }
1125
1126 /** Should be called with ldlm_namespace_lock(client) taken. */
1127 struct ldlm_namespace *ldlm_namespace_first_locked(enum ldlm_side client)
1128 {
1129         LASSERT(mutex_is_locked(ldlm_namespace_lock(client)));
1130         LASSERT(!list_empty(ldlm_namespace_list(client)));
1131         return container_of(ldlm_namespace_list(client)->next,
1132                             struct ldlm_namespace, ns_list_chain);
1133 }
1134
1135 /** Create and initialize new resource. */
1136 static struct ldlm_resource *ldlm_resource_new(enum ldlm_type ldlm_type)
1137 {
1138         struct ldlm_resource *res;
1139         int idx;
1140
1141         OBD_SLAB_ALLOC_PTR_GFP(res, ldlm_resource_slab, GFP_NOFS);
1142         if (res == NULL)
1143                 return NULL;
1144
1145         if (ldlm_type == LDLM_EXTENT) {
1146                 OBD_SLAB_ALLOC(res->lr_itree, ldlm_interval_tree_slab,
1147                                sizeof(*res->lr_itree) * LCK_MODE_NUM);
1148                 if (res->lr_itree == NULL) {
1149                         OBD_SLAB_FREE_PTR(res, ldlm_resource_slab);
1150                         return NULL;
1151                 }
1152                 /* Initialize interval trees for each lock mode. */
1153                 for (idx = 0; idx < LCK_MODE_NUM; idx++) {
1154                         res->lr_itree[idx].lit_size = 0;
1155                         res->lr_itree[idx].lit_mode = 1 << idx;
1156                         res->lr_itree[idx].lit_root = NULL;
1157                 }
1158         }
1159
1160         INIT_LIST_HEAD(&res->lr_granted);
1161         INIT_LIST_HEAD(&res->lr_converting);
1162         INIT_LIST_HEAD(&res->lr_waiting);
1163
1164         atomic_set(&res->lr_refcount, 1);
1165         spin_lock_init(&res->lr_lock);
1166         lu_ref_init(&res->lr_reference);
1167
1168         /* Since LVB init can be delayed now, there is no longer need to
1169          * immediatelly acquire mutex here. */
1170         mutex_init(&res->lr_lvb_mutex);
1171         res->lr_lvb_initialized = false;
1172
1173         return res;
1174 }
1175
1176 /**
1177  * Return a reference to resource with given name, creating it if necessary.
1178  * Args: namespace with ns_lock unlocked
1179  * Locks: takes and releases NS hash-lock and res->lr_lock
1180  * Returns: referenced, unlocked ldlm_resource or NULL
1181  */
1182 struct ldlm_resource *
1183 ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent,
1184                   const struct ldlm_res_id *name, enum ldlm_type type,
1185                   int create)
1186 {
1187         struct hlist_node       *hnode;
1188         struct ldlm_resource    *res = NULL;
1189         struct cfs_hash_bd              bd;
1190         __u64                   version;
1191         int                     ns_refcount = 0;
1192
1193         LASSERT(ns != NULL);
1194         LASSERT(parent == NULL);
1195         LASSERT(ns->ns_rs_hash != NULL);
1196         LASSERT(name->name[0] != 0);
1197
1198         cfs_hash_bd_get_and_lock(ns->ns_rs_hash, (void *)name, &bd, 0);
1199         hnode = cfs_hash_bd_lookup_locked(ns->ns_rs_hash, &bd, (void *)name);
1200         if (hnode != NULL) {
1201                 cfs_hash_bd_unlock(ns->ns_rs_hash, &bd, 0);
1202                 GOTO(found, res);
1203         }
1204
1205         version = cfs_hash_bd_version_get(&bd);
1206         cfs_hash_bd_unlock(ns->ns_rs_hash, &bd, 0);
1207
1208         if (create == 0)
1209                 return ERR_PTR(-ENOENT);
1210
1211         LASSERTF(type >= LDLM_MIN_TYPE && type < LDLM_MAX_TYPE,
1212                  "type: %d\n", type);
1213         res = ldlm_resource_new(type);
1214         if (res == NULL)
1215                 return ERR_PTR(-ENOMEM);
1216
1217         res->lr_ns_bucket = cfs_hash_bd_extra_get(ns->ns_rs_hash, &bd);
1218         res->lr_name = *name;
1219         res->lr_type = type;
1220
1221         cfs_hash_bd_lock(ns->ns_rs_hash, &bd, 1);
1222         hnode = (version == cfs_hash_bd_version_get(&bd)) ? NULL :
1223                 cfs_hash_bd_lookup_locked(ns->ns_rs_hash, &bd, (void *)name);
1224
1225         if (hnode != NULL) {
1226                 /* Someone won the race and already added the resource. */
1227                 cfs_hash_bd_unlock(ns->ns_rs_hash, &bd, 1);
1228                 /* Clean lu_ref for failed resource. */
1229                 lu_ref_fini(&res->lr_reference);
1230                 if (res->lr_itree != NULL)
1231                         OBD_SLAB_FREE(res->lr_itree, ldlm_interval_tree_slab,
1232                                       sizeof(*res->lr_itree) * LCK_MODE_NUM);
1233                 OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res);
1234 found:
1235                 res = hlist_entry(hnode, struct ldlm_resource, lr_hash);
1236                 return res;
1237         }
1238         /* We won! Let's add the resource. */
1239         cfs_hash_bd_add_locked(ns->ns_rs_hash, &bd, &res->lr_hash);
1240         if (cfs_hash_bd_count_get(&bd) == 1)
1241                 ns_refcount = ldlm_namespace_get_return(ns);
1242
1243         cfs_hash_bd_unlock(ns->ns_rs_hash, &bd, 1);
1244
1245         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CREATE_RESOURCE, 2);
1246
1247         /* Let's see if we happened to be the very first resource in this
1248          * namespace. If so, and this is a client namespace, we need to move
1249          * the namespace into the active namespaces list to be patrolled by
1250          * the ldlm_poold. */
1251         if (ns_is_client(ns) && ns_refcount == 1) {
1252                 mutex_lock(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
1253                 ldlm_namespace_move_to_active_locked(ns, LDLM_NAMESPACE_CLIENT);
1254                 mutex_unlock(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
1255         }
1256
1257         return res;
1258 }
1259 EXPORT_SYMBOL(ldlm_resource_get);
1260
1261 struct ldlm_resource *ldlm_resource_getref(struct ldlm_resource *res)
1262 {
1263         LASSERT(res != NULL);
1264         LASSERT(res != LP_POISON);
1265         atomic_inc(&res->lr_refcount);
1266         CDEBUG(D_INFO, "getref res: %p count: %d\n", res,
1267                atomic_read(&res->lr_refcount));
1268         return res;
1269 }
1270
1271 static void __ldlm_resource_putref_final(struct cfs_hash_bd *bd,
1272                                          struct ldlm_resource *res)
1273 {
1274         struct ldlm_ns_bucket *nsb = res->lr_ns_bucket;
1275
1276         if (!list_empty(&res->lr_granted)) {
1277                 ldlm_resource_dump(D_ERROR, res);
1278                 LBUG();
1279         }
1280
1281         if (!list_empty(&res->lr_converting)) {
1282                 ldlm_resource_dump(D_ERROR, res);
1283                 LBUG();
1284         }
1285
1286         if (!list_empty(&res->lr_waiting)) {
1287                 ldlm_resource_dump(D_ERROR, res);
1288                 LBUG();
1289         }
1290
1291         cfs_hash_bd_del_locked(nsb->nsb_namespace->ns_rs_hash,
1292                                bd, &res->lr_hash);
1293         lu_ref_fini(&res->lr_reference);
1294         if (cfs_hash_bd_count_get(bd) == 0)
1295                 ldlm_namespace_put(nsb->nsb_namespace);
1296 }
1297
1298 /* Returns 1 if the resource was freed, 0 if it remains. */
1299 int ldlm_resource_putref(struct ldlm_resource *res)
1300 {
1301         struct ldlm_namespace *ns = ldlm_res_to_ns(res);
1302         struct cfs_hash_bd   bd;
1303
1304         LASSERT_ATOMIC_GT_LT(&res->lr_refcount, 0, LI_POISON);
1305         CDEBUG(D_INFO, "putref res: %p count: %d\n",
1306                res, atomic_read(&res->lr_refcount) - 1);
1307
1308         cfs_hash_bd_get(ns->ns_rs_hash, &res->lr_name, &bd);
1309         if (cfs_hash_bd_dec_and_lock(ns->ns_rs_hash, &bd, &res->lr_refcount)) {
1310                 __ldlm_resource_putref_final(&bd, res);
1311                 cfs_hash_bd_unlock(ns->ns_rs_hash, &bd, 1);
1312                 if (ns->ns_lvbo && ns->ns_lvbo->lvbo_free)
1313                         ns->ns_lvbo->lvbo_free(res);
1314                 if (res->lr_itree != NULL)
1315                         OBD_SLAB_FREE(res->lr_itree, ldlm_interval_tree_slab,
1316                                       sizeof(*res->lr_itree) * LCK_MODE_NUM);
1317                 OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res);
1318                 return 1;
1319         }
1320         return 0;
1321 }
1322 EXPORT_SYMBOL(ldlm_resource_putref);
1323
1324 /**
1325  * Add a lock into a given resource into specified lock list.
1326  */
1327 void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
1328                             struct ldlm_lock *lock)
1329 {
1330         check_res_locked(res);
1331
1332         LDLM_DEBUG(lock, "About to add this lock");
1333
1334         if (ldlm_is_destroyed(lock)) {
1335                 CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
1336                 return;
1337         }
1338
1339         LASSERT(list_empty(&lock->l_res_link));
1340
1341         list_add_tail(&lock->l_res_link, head);
1342 }
1343
1344 /**
1345  * Insert a lock into resource after specified lock.
1346  *
1347  * Obtain resource description from the lock we are inserting after.
1348  */
1349 void ldlm_resource_insert_lock_after(struct ldlm_lock *original,
1350                                      struct ldlm_lock *new)
1351 {
1352         struct ldlm_resource *res = original->l_resource;
1353
1354         check_res_locked(res);
1355
1356         ldlm_resource_dump(D_INFO, res);
1357         LDLM_DEBUG(new, "About to insert this lock after %p: ", original);
1358
1359         if (ldlm_is_destroyed(new)) {
1360                 CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
1361                 goto out;
1362         }
1363
1364         LASSERT(list_empty(&new->l_res_link));
1365
1366         list_add(&new->l_res_link, &original->l_res_link);
1367  out:;
1368 }
1369
1370 void ldlm_resource_unlink_lock(struct ldlm_lock *lock)
1371 {
1372         int type = lock->l_resource->lr_type;
1373
1374         check_res_locked(lock->l_resource);
1375         if (type == LDLM_IBITS || type == LDLM_PLAIN)
1376                 ldlm_unlink_lock_skiplist(lock);
1377         else if (type == LDLM_EXTENT)
1378                 ldlm_extent_unlink_lock(lock);
1379         list_del_init(&lock->l_res_link);
1380 }
1381 EXPORT_SYMBOL(ldlm_resource_unlink_lock);
1382
1383 void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc)
1384 {
1385         desc->lr_type = res->lr_type;
1386         desc->lr_name = res->lr_name;
1387 }
1388
1389 /**
1390  * Print information about all locks in all namespaces on this node to debug
1391  * log.
1392  */
1393 void ldlm_dump_all_namespaces(enum ldlm_side client, int level)
1394 {
1395         struct list_head *tmp;
1396
1397         if (!((libcfs_debug | D_ERROR) & level))
1398                 return;
1399
1400         mutex_lock(ldlm_namespace_lock(client));
1401
1402         list_for_each(tmp, ldlm_namespace_list(client)) {
1403                 struct ldlm_namespace *ns;
1404
1405                 ns = list_entry(tmp, struct ldlm_namespace, ns_list_chain);
1406                 ldlm_namespace_dump(level, ns);
1407         }
1408
1409         mutex_unlock(ldlm_namespace_lock(client));
1410 }
1411
1412 static int ldlm_res_hash_dump(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1413                               struct hlist_node *hnode, void *arg)
1414 {
1415         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
1416         int    level = (int)(unsigned long)arg;
1417
1418         lock_res(res);
1419         ldlm_resource_dump(level, res);
1420         unlock_res(res);
1421
1422         return 0;
1423 }
1424
1425 /**
1426  * Print information about all locks in this namespace on this node to debug
1427  * log.
1428  */
1429 void ldlm_namespace_dump(int level, struct ldlm_namespace *ns)
1430 {
1431         if (!((libcfs_debug | D_ERROR) & level))
1432                 return;
1433
1434         CDEBUG(level, "--- Namespace: %s (rc: %d, side: %s)\n",
1435                ldlm_ns_name(ns), atomic_read(&ns->ns_bref),
1436                ns_is_client(ns) ? "client" : "server");
1437
1438         if (cfs_time_before(cfs_time_current(), ns->ns_next_dump))
1439                 return;
1440
1441         cfs_hash_for_each_nolock(ns->ns_rs_hash,
1442                                  ldlm_res_hash_dump,
1443                                  (void *)(unsigned long)level, 0);
1444         spin_lock(&ns->ns_lock);
1445         ns->ns_next_dump = cfs_time_shift(10);
1446         spin_unlock(&ns->ns_lock);
1447 }
1448
1449 /**
1450  * Print information about all locks in this resource to debug log.
1451  */
1452 void ldlm_resource_dump(int level, struct ldlm_resource *res)
1453 {
1454         struct ldlm_lock *lock;
1455         unsigned int granted = 0;
1456
1457         CLASSERT(RES_NAME_SIZE == 4);
1458
1459         if (!((libcfs_debug | D_ERROR) & level))
1460                 return;
1461
1462         CDEBUG(level, "--- Resource: "DLDLMRES" (%p) refcount = %d\n",
1463                PLDLMRES(res), res, atomic_read(&res->lr_refcount));
1464
1465         if (!list_empty(&res->lr_granted)) {
1466                 CDEBUG(level, "Granted locks (in reverse order):\n");
1467                 list_for_each_entry_reverse(lock, &res->lr_granted,
1468                                                 l_res_link) {
1469                         LDLM_DEBUG_LIMIT(level, lock, "###");
1470                         if (!(level & D_CANTMASK) &&
1471                             ++granted > ldlm_dump_granted_max) {
1472                                 CDEBUG(level, "only dump %d granted locks to "
1473                                        "avoid DDOS.\n", granted);
1474                                 break;
1475                         }
1476                 }
1477         }
1478         if (!list_empty(&res->lr_converting)) {
1479                 CDEBUG(level, "Converting locks:\n");
1480                 list_for_each_entry(lock, &res->lr_converting, l_res_link)
1481                         LDLM_DEBUG_LIMIT(level, lock, "###");
1482         }
1483         if (!list_empty(&res->lr_waiting)) {
1484                 CDEBUG(level, "Waiting locks:\n");
1485                 list_for_each_entry(lock, &res->lr_waiting, l_res_link)
1486                         LDLM_DEBUG_LIMIT(level, lock, "###");
1487         }
1488 }
1489 EXPORT_SYMBOL(ldlm_resource_dump);