/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_lock.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
# include <libcfs/libcfs.h>
# ifndef HAVE_VFS_INTENT_PATCHES
# include <linux/lustre_intent.h>
# endif
#else
# include <liblustre.h>
#endif

#include <obd_class.h>
#include "ldlm_internal.h"

/* lock types */
char *ldlm_lockname[] = {
        [0] "--",
        [LCK_EX] "EX",
        [LCK_PW] "PW",
        [LCK_PR] "PR",
        [LCK_CW] "CW",
        [LCK_CR] "CR",
        [LCK_NL] "NL",
        [LCK_GROUP] "GROUP",
        [LCK_COS] "COS"
};

char *ldlm_typename[] = {
        [LDLM_PLAIN] "PLN",
        [LDLM_EXTENT] "EXT",
        [LDLM_FLOCK] "FLK",
        [LDLM_IBITS] "IBT",
};

char *ldlm_it2str(int it)
{
        switch (it) {
        case IT_OPEN:
                return "open";
        case IT_CREAT:
                return "creat";
        case (IT_OPEN | IT_CREAT):
                return "open|creat";
        case IT_READDIR:
                return "readdir";
        case IT_GETATTR:
                return "getattr";
        case IT_LOOKUP:
                return "lookup";
        case IT_UNLINK:
                return "unlink";
        case IT_GETXATTR:
                return "getxattr";
        default:
                CERROR("Unknown intent %d\n", it);
                return "UNKNOWN";
        }
}

extern cfs_mem_cache_t *ldlm_lock_slab;

static ldlm_processing_policy ldlm_processing_policy_table[] = {
        [LDLM_PLAIN] ldlm_process_plain_lock,
        [LDLM_EXTENT] ldlm_process_extent_lock,
#ifdef __KERNEL__
        [LDLM_FLOCK] ldlm_process_flock_lock,
#endif
        [LDLM_IBITS] ldlm_process_inodebits_lock,
};

ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
{
        return ldlm_processing_policy_table[res->lr_type];
}

void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
{
        ns->ns_policy = arg;
}

/*
 * REFCOUNTED LOCK OBJECTS
 */


/*
 * Lock refcounts, during creation:
 *   - one special one for allocation, dec'd only once in destroy
 *   - one for being a lock that's in-use
 *   - one for the addref associated with a new lock
 */
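
/*
 * A minimal usage sketch (illustrative; "some_lock" is a hypothetical
 * pointer, not a name from this file): every LDLM_LOCK_GET() must be
 * balanced by an LDLM_LOCK_PUT(), and the final put on an already
 * destroyed lock is what actually frees it:
 *
 *      struct ldlm_lock *lock = LDLM_LOCK_GET(some_lock);
 *      ... use the lock while the reference is held ...
 *      LDLM_LOCK_PUT(lock);
 */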
struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
{
        cfs_atomic_inc(&lock->l_refc);
        return lock;
}

static void ldlm_lock_free(struct ldlm_lock *lock, size_t size)
{
        LASSERT(size == sizeof(*lock));
        OBD_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock));
}

void ldlm_lock_put(struct ldlm_lock *lock)
{
        ENTRY;

        LASSERT(lock->l_resource != LP_POISON);
        LASSERT(cfs_atomic_read(&lock->l_refc) > 0);
        if (cfs_atomic_dec_and_test(&lock->l_refc)) {
                struct ldlm_resource *res;

                LDLM_DEBUG(lock,
                           "final lock_put on destroyed lock, freeing it.");

                res = lock->l_resource;
                LASSERT(lock->l_destroyed);
                LASSERT(cfs_list_empty(&lock->l_res_link));
                LASSERT(cfs_list_empty(&lock->l_pending_chain));

                cfs_atomic_dec(&res->lr_namespace->ns_locks);
                lu_ref_del(&res->lr_reference, "lock", lock);
                ldlm_resource_putref(res);
                lock->l_resource = NULL;
                if (lock->l_export) {
                        class_export_lock_put(lock->l_export, lock);
                        lock->l_export = NULL;
                }

                if (lock->l_lvb_data != NULL)
                        OBD_FREE(lock->l_lvb_data, lock->l_lvb_len);

                ldlm_interval_free(ldlm_interval_detach(lock));
                lu_ref_fini(&lock->l_reference);
                OBD_FREE_RCU_CB(lock, sizeof(*lock), &lock->l_handle,
                                ldlm_lock_free);
        }

        EXIT;
}

int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
{
        int rc = 0;
        if (!cfs_list_empty(&lock->l_lru)) {
                struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
                LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
                cfs_list_del_init(&lock->l_lru);
                if (lock->l_flags & LDLM_FL_SKIPPED)
                        lock->l_flags &= ~LDLM_FL_SKIPPED;
                LASSERT(ns->ns_nr_unused > 0);
                ns->ns_nr_unused--;
                rc = 1;
        }
        return rc;
}

int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
        int rc;
        ENTRY;
        cfs_spin_lock(&ns->ns_unused_lock);
        rc = ldlm_lock_remove_from_lru_nolock(lock);
        cfs_spin_unlock(&ns->ns_unused_lock);
        EXIT;
        return rc;
}

void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
        lock->l_last_used = cfs_time_current();
        LASSERT(cfs_list_empty(&lock->l_lru));
        LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
        cfs_list_add_tail(&lock->l_lru, &ns->ns_unused_list);
        LASSERT(ns->ns_nr_unused >= 0);
        ns->ns_nr_unused++;
}

void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
        ENTRY;
        cfs_spin_lock(&ns->ns_unused_lock);
        ldlm_lock_add_to_lru_nolock(lock);
        cfs_spin_unlock(&ns->ns_unused_lock);
        EXIT;
}

void ldlm_lock_touch_in_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
        ENTRY;
        cfs_spin_lock(&ns->ns_unused_lock);
        if (!cfs_list_empty(&lock->l_lru)) {
                ldlm_lock_remove_from_lru_nolock(lock);
                ldlm_lock_add_to_lru_nolock(lock);
        }
        cfs_spin_unlock(&ns->ns_unused_lock);
        EXIT;
}

/* This used to have a 'strict' flag, which recovery would use to mark an
 * in-use lock as needing-to-die.  Lest I am ever tempted to put it back, I
 * shall explain why it's gone: with the new hash table scheme, once you call
 * ldlm_lock_destroy, you can never drop your final references on this lock,
 * because it's no longer in the hash table.  -phil */
int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
{
        ENTRY;

        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                ldlm_lock_dump(D_ERROR, lock, 0);
                LBUG();
        }

        if (!cfs_list_empty(&lock->l_res_link)) {
                LDLM_ERROR(lock, "lock still on resource");
                ldlm_lock_dump(D_ERROR, lock, 0);
                LBUG();
        }

        if (lock->l_destroyed) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                EXIT;
                return 0;
        }
        lock->l_destroyed = 1;

        if (lock->l_export && lock->l_export->exp_lock_hash &&
            !cfs_hlist_unhashed(&lock->l_exp_hash))
                cfs_hash_del(lock->l_export->exp_lock_hash,
                             &lock->l_remote_handle, &lock->l_exp_hash);

        ldlm_lock_remove_from_lru(lock);
        class_handle_unhash(&lock->l_handle);

#if 0
        /* Wake anyone waiting for this lock */
        /* FIXME: I should probably add yet another flag, instead of using
         * l_export to only call this on clients */
        if (lock->l_export)
                class_export_put(lock->l_export);
        lock->l_export = NULL;
        if (lock->l_export && lock->l_completion_ast)
                lock->l_completion_ast(lock, 0);
#endif
        EXIT;
        return 1;
}

void ldlm_lock_destroy(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        lock_res_and_lock(lock);
        first = ldlm_lock_destroy_internal(lock);
        unlock_res_and_lock(lock);

        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        first = ldlm_lock_destroy_internal(lock);
        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/* this is called by class_handle2object with the handle lock taken */
static void lock_handle_addref(void *lock)
{
        LDLM_LOCK_GET((struct ldlm_lock *)lock);
}

/*
 * usage: pass in a resource on which you have done ldlm_resource_get
 *        after return, ldlm_resource_putref the resource
 * returns: lock with refcount 2 - one for current caller and one for remote
 */
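
/*
 * A hedged caller sketch (illustrative; it mirrors what ldlm_lock_create()
 * below actually does, and "ns", "res_id" and "type" are assumed to be in
 * scope):
 *
 *      struct ldlm_resource *res;
 *      struct ldlm_lock *lock;
 *
 *      res = ldlm_resource_get(ns, NULL, res_id, type, 1);
 *      if (res == NULL)
 *              return NULL;
 *      lock = ldlm_lock_new(res);
 *      ldlm_resource_putref(res);  (ldlm_lock_new() took its own reference)
 */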
static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
{
        struct ldlm_lock *lock;
        ENTRY;

        if (resource == NULL)
                LBUG();

        OBD_SLAB_ALLOC_PTR_GFP(lock, ldlm_lock_slab, CFS_ALLOC_IO);
        if (lock == NULL)
                RETURN(NULL);

        cfs_spin_lock_init(&lock->l_lock);
        lock->l_resource = ldlm_resource_getref(resource);
        lu_ref_add(&resource->lr_reference, "lock", lock);

        cfs_atomic_set(&lock->l_refc, 2);
        CFS_INIT_LIST_HEAD(&lock->l_res_link);
        CFS_INIT_LIST_HEAD(&lock->l_lru);
        CFS_INIT_LIST_HEAD(&lock->l_pending_chain);
        CFS_INIT_LIST_HEAD(&lock->l_bl_ast);
        CFS_INIT_LIST_HEAD(&lock->l_cp_ast);
        CFS_INIT_LIST_HEAD(&lock->l_rk_ast);
        cfs_waitq_init(&lock->l_waitq);
        lock->l_blocking_lock = NULL;
        CFS_INIT_LIST_HEAD(&lock->l_sl_mode);
        CFS_INIT_LIST_HEAD(&lock->l_sl_policy);
        CFS_INIT_HLIST_NODE(&lock->l_exp_hash);

        cfs_atomic_inc(&resource->lr_namespace->ns_locks);
        CFS_INIT_LIST_HEAD(&lock->l_handle.h_link);
        class_handle_hash(&lock->l_handle, lock_handle_addref);

        CFS_INIT_LIST_HEAD(&lock->l_extents_list);
        cfs_spin_lock_init(&lock->l_extents_list_lock);
        CFS_INIT_LIST_HEAD(&lock->l_cache_locks_list);
        lu_ref_init(&lock->l_reference);
        lu_ref_add(&lock->l_reference, "hash", lock);
        lock->l_callback_timeout = 0;

#if LUSTRE_TRACKS_LOCK_EXP_REFS
        CFS_INIT_LIST_HEAD(&lock->l_exp_refs_link);
        lock->l_exp_refs_nr = 0;
        lock->l_exp_refs_target = NULL;
#endif

        RETURN(lock);
}

int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                              const struct ldlm_res_id *new_resid)
{
        struct ldlm_resource *oldres = lock->l_resource;
        struct ldlm_resource *newres;
        int type;
        ENTRY;

        LASSERT(ns_is_client(ns));

        lock_res_and_lock(lock);
        if (memcmp(new_resid, &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) == 0) {
                /* Nothing to do */
                unlock_res_and_lock(lock);
                RETURN(0);
        }

        LASSERT(new_resid->name[0] != 0);

        /* This function assumes that the lock isn't on any lists */
        LASSERT(cfs_list_empty(&lock->l_res_link));

        type = oldres->lr_type;
        unlock_res_and_lock(lock);

        newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
        if (newres == NULL)
                RETURN(-ENOMEM);
        lu_ref_add(&newres->lr_reference, "lock", lock);
        /*
         * To flip the lock from the old to the new resource, lock, oldres and
         * newres have to be locked. Resource spin-locks are nested within
         * lock->l_lock, and are taken in the memory address order to avoid
         * dead-locks.
         */
        cfs_spin_lock(&lock->l_lock);
        oldres = lock->l_resource;
        if (oldres < newres) {
                lock_res(oldres);
                lock_res_nested(newres, LRT_NEW);
        } else {
                lock_res(newres);
                lock_res_nested(oldres, LRT_NEW);
        }
        LASSERT(memcmp(new_resid, &oldres->lr_name,
                       sizeof oldres->lr_name) != 0);
        lock->l_resource = newres;
        unlock_res(oldres);
        unlock_res_and_lock(lock);

        /* ...and the flowers are still standing! */
        lu_ref_del(&oldres->lr_reference, "lock", lock);
        ldlm_resource_putref(oldres);

        RETURN(0);
}

/*
 *  HANDLES
 */

void ldlm_lock2handle(const struct ldlm_lock *lock, struct lustre_handle *lockh)
{
        lockh->cookie = lock->l_handle.h_cookie;
}

/* If @flags is non-zero: atomically find the lock and set the flags.
 * Return NULL if any of the flags is already set.
 */
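
/*
 * A minimal caller sketch (illustrative; this is the pattern that
 * ldlm_lock_decref() below uses): resolve a handle to a lock and balance
 * the reference the lookup takes.
 *
 *      struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
 *      if (lock != NULL) {
 *              ... inspect or use the lock ...
 *              LDLM_LOCK_PUT(lock);
 *      }
 */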

struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
                                     int flags)
{
        struct ldlm_namespace *ns;
        struct ldlm_lock *lock, *retval = NULL;
        ENTRY;

        LASSERT(handle);

        lock = class_handle2object(handle->cookie);
        if (lock == NULL)
                RETURN(NULL);

        LASSERT(lock->l_resource != NULL);
        ns = lock->l_resource->lr_namespace;
        LASSERT(ns != NULL);

        lu_ref_add_atomic(&lock->l_reference, "handle", cfs_current());
        lock_res_and_lock(lock);

        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if (lock->l_destroyed) {
                unlock_res_and_lock(lock);
                CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
                LDLM_LOCK_PUT(lock);
                GOTO(out, retval);
        }

        if (flags && (lock->l_flags & flags)) {
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
                GOTO(out, retval);
        }

        if (flags)
                lock->l_flags |= flags;

        unlock_res_and_lock(lock);
        retval = lock;
        EXIT;
 out:
        return retval;
}

void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
{
        struct obd_export *exp = lock->l_export ?: lock->l_conn_export;
        /* INODEBITS_INTEROP: If the other side does not support
         * inodebits, reply with a plain lock descriptor.
         */
        if ((lock->l_resource->lr_type == LDLM_IBITS) &&
            (exp && !(exp->exp_connect_flags & OBD_CONNECT_IBITS))) {
                /* Make sure all the right bits are set in this lock we
                   are going to pass to client */
                LASSERTF(lock->l_policy_data.l_inodebits.bits ==
                         (MDS_INODELOCK_LOOKUP|MDS_INODELOCK_UPDATE),
                         "Inappropriate inode lock bits during "
                         "conversion " LPU64 "\n",
                         lock->l_policy_data.l_inodebits.bits);

                ldlm_res2desc(lock->l_resource, &desc->l_resource);
                desc->l_resource.lr_type = LDLM_PLAIN;

                /* Convert "new" lock mode to something old client can
                   understand */
                if ((lock->l_req_mode == LCK_CR) ||
                    (lock->l_req_mode == LCK_CW))
                        desc->l_req_mode = LCK_PR;
                else
                        desc->l_req_mode = lock->l_req_mode;
                if ((lock->l_granted_mode == LCK_CR) ||
                    (lock->l_granted_mode == LCK_CW)) {
                        desc->l_granted_mode = LCK_PR;
                } else {
                        /* We never grant PW/EX locks to clients */
                        LASSERT((lock->l_granted_mode != LCK_PW) &&
                                (lock->l_granted_mode != LCK_EX));
                        desc->l_granted_mode = lock->l_granted_mode;
                }

                /* We do not copy policy here, because there is no
                   policy for plain locks */
        } else {
                ldlm_res2desc(lock->l_resource, &desc->l_resource);
                desc->l_req_mode = lock->l_req_mode;
                desc->l_granted_mode = lock->l_granted_mode;
                desc->l_policy_data = lock->l_policy_data;
        }
}

void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                           cfs_list_t *work_list)
{
        if ((lock->l_flags & LDLM_FL_AST_SENT) == 0) {
                LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
                lock->l_flags |= LDLM_FL_AST_SENT;
                /* If the enqueuing client said so, tell the AST recipient to
                 * discard dirty data, rather than writing back. */
                if (new->l_flags & LDLM_AST_DISCARD_DATA)
                        lock->l_flags |= LDLM_FL_DISCARD_DATA;
                LASSERT(cfs_list_empty(&lock->l_bl_ast));
                cfs_list_add(&lock->l_bl_ast, work_list);
                LDLM_LOCK_GET(lock);
                LASSERT(lock->l_blocking_lock == NULL);
                lock->l_blocking_lock = LDLM_LOCK_GET(new);
        }
}

void ldlm_add_cp_work_item(struct ldlm_lock *lock, cfs_list_t *work_list)
{
        if ((lock->l_flags & LDLM_FL_CP_REQD) == 0) {
                lock->l_flags |= LDLM_FL_CP_REQD;
                LDLM_DEBUG(lock, "lock granted; sending completion AST.");
                LASSERT(cfs_list_empty(&lock->l_cp_ast));
                cfs_list_add(&lock->l_cp_ast, work_list);
                LDLM_LOCK_GET(lock);
        }
}

/* must be called with lr_lock held */
void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                            cfs_list_t *work_list)
{
        ENTRY;
        check_res_locked(lock->l_resource);
        if (new)
                ldlm_add_bl_work_item(lock, new, work_list);
        else
                ldlm_add_cp_work_item(lock, work_list);
        EXIT;
}

void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;

        lock = ldlm_handle2lock(lockh);
        LASSERT(lock != NULL);
        ldlm_lock_addref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}

void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        ldlm_lock_remove_from_lru(lock);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                lock->l_readers++;
                lu_ref_add_atomic(&lock->l_reference, "reader", lock);
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                lock->l_writers++;
                lu_ref_add_atomic(&lock->l_reference, "writer", lock);
        }
        LDLM_LOCK_GET(lock);
        lu_ref_add_atomic(&lock->l_reference, "user", lock);
        LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}

/**
 * Attempts to addref a lock, and fails if lock is already LDLM_FL_CBPENDING
 * or destroyed.
 *
 * \retval 0 success, lock was addref-ed
 *
 * \retval -EAGAIN lock is being canceled.
 */
int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;
        int               result;

        result = -EAGAIN;
        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                if (lock->l_readers != 0 || lock->l_writers != 0 ||
                    !(lock->l_flags & LDLM_FL_CBPENDING)) {
                        ldlm_lock_addref_internal_nolock(lock, mode);
                        result = 0;
                }
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return result;
}

/* only called for local locks */
void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
{
        lock_res_and_lock(lock);
        ldlm_lock_addref_internal_nolock(lock, mode);
        unlock_res_and_lock(lock);
}

/* only called in ldlm_flock_destroy and for local locks.
 * for LDLM_FLOCK type locks, l_blocking_ast is null and
 * ldlm_lock_remove_from_lru() does nothing, so it is safe
 * for ldlm_flock_destroy to use this path and drop that code */
void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                LASSERT(lock->l_readers > 0);
                lu_ref_del(&lock->l_reference, "reader", lock);
                lock->l_readers--;
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                LASSERT(lock->l_writers > 0);
                lu_ref_del(&lock->l_reference, "writer", lock);
                lock->l_writers--;
        }

        lu_ref_del(&lock->l_reference, "user", lock);
        LDLM_LOCK_RELEASE(lock);    /* matches the LDLM_LOCK_GET() in addref */
}

void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
{
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        ns = lock->l_resource->lr_namespace;

        ldlm_lock_decref_internal_nolock(lock, mode);

        if (lock->l_flags & LDLM_FL_LOCAL &&
            !lock->l_readers && !lock->l_writers) {
                /* If this is a local lock on a server namespace and this was
                 * the last reference, cancel the lock. */
                CDEBUG(D_INFO, "forcing cancel of local lock\n");
                lock->l_flags |= LDLM_FL_CBPENDING;
        }

        if (!lock->l_readers && !lock->l_writers &&
            (lock->l_flags & LDLM_FL_CBPENDING)) {
                /* If we received a blocking AST and this was the last
                 * reference, run the callback. */
                if (ns_is_server(ns) && lock->l_export)
                        CERROR("FL_CBPENDING set on non-local lock--just a "
                               "warning\n");

                LDLM_DEBUG(lock, "final decref done on cbpending lock");

                LDLM_LOCK_GET(lock); /* dropped by bl thread */
                ldlm_lock_remove_from_lru(lock);
                unlock_res_and_lock(lock);

                if (lock->l_flags & LDLM_FL_FAIL_LOC)
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
                    ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
                        ldlm_handle_bl_callback(ns, NULL, lock);
        } else if (ns_is_client(ns) &&
                   !lock->l_readers && !lock->l_writers &&
                   !(lock->l_flags & LDLM_FL_BL_AST)) {
                /* If this is a client-side namespace and this was the last
                 * reference, put it on the LRU. */
                ldlm_lock_add_to_lru(lock);
                unlock_res_and_lock(lock);

                if (lock->l_flags & LDLM_FL_FAIL_LOC)
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
                 * are not supported by the server, otherwise, it is done on
                 * enqueue. */
                if (!exp_connect_cancelset(lock->l_conn_export) &&
                    !ns_connect_lru_resize(ns))
                        ldlm_cancel_lru(ns, 0, LDLM_ASYNC, 0);
        } else {
                unlock_res_and_lock(lock);
        }

        EXIT;
}

void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}

/* This will drop a lock reference and mark it for destruction, but will not
 * necessarily cancel the lock before returning. */
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        ENTRY;

        LASSERT(lock != NULL);

        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        lock_res_and_lock(lock);
        lock->l_flags |= LDLM_FL_CBPENDING;
        unlock_res_and_lock(lock);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}

struct sl_insert_point {
        cfs_list_t *res_link;
        cfs_list_t *mode_link;
        cfs_list_t *policy_link;
};

/*
 * search_granted_lock
 *
 * Description:
 *      Finds a position at which to insert the new lock into the granted
 *      list's skiplist structure.
 * Parameters:
 *      queue [input]:  the granted list to search;
 *      req [input]:    the lock whose insert position is to be located;
 *      prev [output]:  positions within the 3 lists at which to insert @req
 * Return Value:
 *      filled @prev
 * NOTE: called by
 *  - ldlm_grant_lock_with_skiplist
 */
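
/*
 * Illustrative layout sketch (an editorial addition, derived from the code
 * below): the granted list is ordered into mode groups, and IBITS mode
 * groups are subdivided into policy groups, e.g.:
 *
 *      lr_granted: [PR,bits=X][PR,bits=X] [PR,bits=Y] [PW,bits=X] ...
 *                  '-policy group-'       '-policy-'  '-next mode group
 *
 * On the first lock of a group, l_sl_mode.prev reaches the last lock of
 * that mode group, and l_sl_policy.prev the last lock of its policy group;
 * that is what the search below relies on to skip over whole groups.
 */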
static void search_granted_lock(cfs_list_t *queue,
                                struct ldlm_lock *req,
                                struct sl_insert_point *prev)
{
        cfs_list_t *tmp;
        struct ldlm_lock *lock, *mode_end, *policy_end;
        ENTRY;

        cfs_list_for_each(tmp, queue) {
                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                mode_end = cfs_list_entry(lock->l_sl_mode.prev,
                                          struct ldlm_lock, l_sl_mode);

                if (lock->l_req_mode != req->l_req_mode) {
                        /* jump to last lock of mode group */
                        tmp = &mode_end->l_res_link;
                        continue;
                }

                /* suitable mode group is found */
                if (lock->l_resource->lr_type == LDLM_PLAIN) {
                        /* insert point is last lock of the mode group */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else if (lock->l_resource->lr_type == LDLM_IBITS) {
                        for (;;) {
                                policy_end =
                                        cfs_list_entry(lock->l_sl_policy.prev,
                                                       struct ldlm_lock,
                                                       l_sl_policy);

                                if (lock->l_policy_data.l_inodebits.bits ==
                                    req->l_policy_data.l_inodebits.bits) {
                                        /* insert point is last lock of
                                         * the policy group */
                                        prev->res_link =
                                                &policy_end->l_res_link;
                                        prev->mode_link =
                                                &policy_end->l_sl_mode;
                                        prev->policy_link =
                                                &policy_end->l_sl_policy;
                                        EXIT;
                                        return;
                                }

                                if (policy_end == mode_end)
                                        /* done with mode group */
                                        break;

                                /* go to next policy group within mode group */
                                tmp = policy_end->l_res_link.next;
                                lock = cfs_list_entry(tmp, struct ldlm_lock,
                                                      l_res_link);
                        }  /* loop over policy groups within the mode group */

                        /* insert point is last lock of the mode group,
                         * new policy group is started */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else {
                        LDLM_ERROR(lock, "is not LDLM_PLAIN or LDLM_IBITS lock");
                        LBUG();
                }
        }

        /* insert point is last lock on the queue,
         * new mode group and new policy group are started */
        prev->res_link = queue->prev;
        prev->mode_link = &req->l_sl_mode;
        prev->policy_link = &req->l_sl_policy;
        EXIT;
        return;
}

static void ldlm_granted_list_add_lock(struct ldlm_lock *lock,
                                       struct sl_insert_point *prev)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        ldlm_resource_dump(D_INFO, res);
        CDEBUG(D_OTHER, "About to add this lock:\n");
        ldlm_lock_dump(D_OTHER, lock, 0);

        if (lock->l_destroyed) {
                CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
                return;
        }

        LASSERT(cfs_list_empty(&lock->l_res_link));
        LASSERT(cfs_list_empty(&lock->l_sl_mode));
        LASSERT(cfs_list_empty(&lock->l_sl_policy));

        cfs_list_add(&lock->l_res_link, prev->res_link);
        cfs_list_add(&lock->l_sl_mode, prev->mode_link);
        cfs_list_add(&lock->l_sl_policy, prev->policy_link);

        EXIT;
}

static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
{
        struct sl_insert_point prev;
        ENTRY;

        LASSERT(lock->l_req_mode == lock->l_granted_mode);

        search_granted_lock(&lock->l_resource->lr_granted, lock, &prev);
        ldlm_granted_list_add_lock(lock, &prev);
        EXIT;
}

/* NOTE: called by
 *  - ldlm_lock_enqueue
 *  - ldlm_reprocess_queue
 *  - ldlm_lock_convert
 *
 * must be called with lr_lock held
 */
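
/*
 * Caller-side sketch (illustrative), matching the locking rule above;
 * this is the pattern ldlm_lock_enqueue() below follows for a local lock:
 *
 *      lock_res_and_lock(lock);
 *      ldlm_grant_lock(lock, NULL);
 *      unlock_res_and_lock(lock);
 */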
void ldlm_grant_lock(struct ldlm_lock *lock, cfs_list_t *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        lock->l_granted_mode = lock->l_req_mode;
        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
                ldlm_grant_lock_with_skiplist(lock);
        else if (res->lr_type == LDLM_EXTENT)
                ldlm_extent_add_lock(res, lock);
        else
                ldlm_resource_add_lock(res, &res->lr_granted, lock);

        if (lock->l_granted_mode < res->lr_most_restr)
                res->lr_most_restr = lock->l_granted_mode;

        if (work_list && lock->l_completion_ast != NULL)
                ldlm_add_ast_work_item(lock, NULL, work_list);

        ldlm_pool_add(&res->lr_namespace->ns_pool, lock);
        EXIT;
}

/* returns a referenced lock or NULL.  See the flag descriptions in the
 * comment above ldlm_lock_match, below. */
static struct ldlm_lock *search_queue(cfs_list_t *queue,
                                      ldlm_mode_t *mode,
                                      ldlm_policy_data_t *policy,
                                      struct ldlm_lock *old_lock,
                                      int flags, int unref)
{
        struct ldlm_lock *lock;
        cfs_list_t       *tmp;

        cfs_list_for_each(tmp, queue) {
                ldlm_mode_t match;

                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lock == old_lock)
                        break;

                /* llite sometimes wants to match locks that will be
                 * canceled when their users drop, but we allow it to match
                 * if it passes in CBPENDING and the lock still has users.
                 * this is generally only going to be used by children
                 * whose parents already hold a lock so forward progress
                 * can still happen. */
                if (lock->l_flags & LDLM_FL_CBPENDING &&
                    !(flags & LDLM_FL_CBPENDING))
                        continue;
                if (!unref && lock->l_flags & LDLM_FL_CBPENDING &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        continue;

                if (!(lock->l_req_mode & *mode))
                        continue;
                match = lock->l_req_mode;

                if (lock->l_resource->lr_type == LDLM_EXTENT &&
                    (lock->l_policy_data.l_extent.start >
                     policy->l_extent.start ||
                     lock->l_policy_data.l_extent.end < policy->l_extent.end))
                        continue;

                if (unlikely(match == LCK_GROUP) &&
                    lock->l_resource->lr_type == LDLM_EXTENT &&
                    lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                        continue;

                /* We match if we have existing lock with same or wider set
                   of bits. */
                if (lock->l_resource->lr_type == LDLM_IBITS &&
                     ((lock->l_policy_data.l_inodebits.bits &
                      policy->l_inodebits.bits) !=
                      policy->l_inodebits.bits))
                        continue;

                if (!unref &&
                    (lock->l_destroyed || (lock->l_flags & LDLM_FL_FAILED)))
                        continue;

                if ((flags & LDLM_FL_LOCAL_ONLY) &&
                    !(lock->l_flags & LDLM_FL_LOCAL))
                        continue;

                if (flags & LDLM_FL_TEST_LOCK) {
                        LDLM_LOCK_GET(lock);
                        ldlm_lock_touch_in_lru(lock);
                } else {
                        ldlm_lock_addref_internal_nolock(lock, match);
                }
                *mode = match;
                return lock;
        }

        return NULL;
}

void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
{
        lock->l_flags |= LDLM_FL_LVB_READY;
        cfs_waitq_signal(&lock->l_waitq);
}

void ldlm_lock_allow_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_allow_match_locked(lock);
        unlock_res_and_lock(lock);
}

/* Can be called in two ways:
 *
 * If 'ns' is NULL, then lockh describes an existing lock that we want to look
 * for a duplicate of.
 *
 * Otherwise, all of the fields must be filled in, to match against.
 *
 * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
 *     server (ie, connh is NULL)
 * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
 *     list will be considered
 * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
 *     to be canceled can still be matched as long as they still have reader
 *     or writer references
 * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
 *     just tell us if we would have matched.
 *
 * Returns 1 if it finds an already-existing lock that is compatible; in this
 * case, lockh is filled in with an addref()ed lock
 *
 * We also check the security context; if that fails we simply return 0 (to
 * keep caller code unchanged), and the context failure will be discovered by
 * the caller sometime later.
 */
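
/*
 * A hedged usage sketch (illustrative; "ns", "res_id" and "policy" are
 * assumed to be set up by the caller, and the flag/mode choice is only
 * an example):
 *
 *      struct lustre_handle lockh;
 *      ldlm_mode_t mode;
 *
 *      mode = ldlm_lock_match(ns, LDLM_FL_LVB_READY, &res_id, LDLM_EXTENT,
 *                             &policy, LCK_PR | LCK_PW, &lockh, 0);
 *      if (mode != 0) {
 *              ... use the matched lock ...
 *              ldlm_lock_decref(&lockh, mode);
 *      }
 */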
ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
                            const struct ldlm_res_id *res_id, ldlm_type_t type,
                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
                            struct lustre_handle *lockh, int unref)
{
        struct ldlm_resource *res;
        struct ldlm_lock *lock, *old_lock = NULL;
        int rc = 0;
        ENTRY;

        if (ns == NULL) {
                old_lock = ldlm_handle2lock(lockh);
                LASSERT(old_lock);

                ns = old_lock->l_resource->lr_namespace;
                res_id = &old_lock->l_resource->lr_name;
                type = old_lock->l_resource->lr_type;
                mode = old_lock->l_req_mode;
        }

        res = ldlm_resource_get(ns, NULL, res_id, type, 0);
        if (res == NULL) {
                LASSERT(old_lock == NULL);
                RETURN(0);
        }

        LDLM_RESOURCE_ADDREF(res);
        lock_res(res);

        lock = search_queue(&res->lr_granted, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        if (flags & LDLM_FL_BLOCK_GRANTED)
                GOTO(out, rc = 0);
        lock = search_queue(&res->lr_converting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        lock = search_queue(&res->lr_waiting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);

        EXIT;
 out:
        unlock_res(res);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);

        if (lock) {
                ldlm_lock2handle(lock, lockh);
                if ((flags & LDLM_FL_LVB_READY) &&
                    (!(lock->l_flags & LDLM_FL_LVB_READY))) {
                        struct l_wait_info lwi;
                        if (lock->l_completion_ast) {
                                int err = lock->l_completion_ast(lock,
                                                          LDLM_FL_WAIT_NOREPROC,
                                                                 NULL);
                                if (err) {
                                        if (flags & LDLM_FL_TEST_LOCK)
                                                LDLM_LOCK_RELEASE(lock);
                                        else
                                                ldlm_lock_decref_internal(lock,
                                                                          mode);
                                        rc = 0;
                                        goto out2;
                                }
                        }

                        lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
                                               NULL, LWI_ON_SIGNAL_NOOP, NULL);

                        /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
                        l_wait_event(lock->l_waitq,
                                     (lock->l_flags & LDLM_FL_LVB_READY), &lwi);
                }
        }
 out2:
        if (rc) {
                LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[2] : policy->l_extent.start,
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[3] : policy->l_extent.end);

                /* check user's security context */
                if (lock->l_conn_export &&
                    sptlrpc_import_check_ctx(
                                class_exp2cliimp(lock->l_conn_export))) {
                        if (!(flags & LDLM_FL_TEST_LOCK))
                                ldlm_lock_decref_internal(lock, mode);
                        rc = 0;
                }

                if (flags & LDLM_FL_TEST_LOCK)
                        LDLM_LOCK_RELEASE(lock);

        } else if (!(flags & LDLM_FL_TEST_LOCK)) {/*less verbose for test-only*/
                LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
                                  LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
                                  type, mode, res_id->name[0], res_id->name[1],
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[2] : policy->l_extent.start,
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[3] : policy->l_extent.end);
        }
        if (old_lock)
                LDLM_LOCK_PUT(old_lock);

        return rc ? mode : 0;
}

/* Returns a referenced lock */
struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
                                   const struct ldlm_res_id *res_id,
                                   ldlm_type_t type,
                                   ldlm_mode_t mode,
                                   const struct ldlm_callback_suite *cbs,
                                   void *data, __u32 lvb_len)
{
        struct ldlm_lock *lock;
        struct ldlm_resource *res;
        ENTRY;

        res = ldlm_resource_get(ns, NULL, res_id, type, 1);
        if (res == NULL)
                RETURN(NULL);

        lock = ldlm_lock_new(res);
        ldlm_resource_putref(res);

        if (lock == NULL)
                RETURN(NULL);

        lock->l_req_mode = mode;
        lock->l_ast_data = data;
        lock->l_pid = cfs_curproc_pid();
        if (cbs) {
                lock->l_blocking_ast = cbs->lcs_blocking;
                lock->l_completion_ast = cbs->lcs_completion;
                lock->l_glimpse_ast = cbs->lcs_glimpse;
                lock->l_weigh_ast = cbs->lcs_weigh;
        }

        lock->l_tree_node = NULL;
        /* if this is an extent lock, allocate the interval tree node */
        if (type == LDLM_EXTENT) {
                if (ldlm_interval_alloc(lock) == NULL)
                        GOTO(out, 0);
        }

        if (lvb_len) {
                lock->l_lvb_len = lvb_len;
                OBD_ALLOC(lock->l_lvb_data, lvb_len);
                if (lock->l_lvb_data == NULL)
                        GOTO(out, 0);
        }

        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK))
                GOTO(out, 0);

        RETURN(lock);

out:
        ldlm_lock_destroy(lock);
        LDLM_LOCK_RELEASE(lock);
        return NULL;
}

ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                               struct ldlm_lock **lockp,
                               void *cookie, int *flags)
{
        struct ldlm_lock *lock = *lockp;
        struct ldlm_resource *res = lock->l_resource;
        int local = ns_is_client(res->lr_namespace);
        ldlm_processing_policy policy;
        ldlm_error_t rc = ELDLM_OK;
        struct ldlm_interval *node = NULL;
        ENTRY;

        lock->l_last_activity = cfs_time_current_sec();
        /* policies are not executed on the client or during replay */
        if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
            && !local && ns->ns_policy) {
                rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
                                   NULL);
                if (rc == ELDLM_LOCK_REPLACED) {
                        /* The lock that was returned has already been granted,
                         * and placed into lockp.  If it's not the same as the
                         * one we passed in, then destroy the old one and our
                         * work here is done. */
                        if (lock != *lockp) {
                                ldlm_lock_destroy(lock);
                                LDLM_LOCK_RELEASE(lock);
                        }
                        *flags |= LDLM_FL_LOCK_CHANGED;
                        RETURN(0);
                } else if (rc != ELDLM_OK ||
                           (rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
                        ldlm_lock_destroy(lock);
                        RETURN(rc);
                }
        }

        /* For a replaying lock, it might be already in granted list. So
         * unlinking the lock will cause the interval node to be freed; we
         * have to allocate the interval node early, otherwise we can't
         * regrant this lock in the future. - jay */
        if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
                OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);

        lock_res_and_lock(lock);
        if (local && lock->l_req_mode == lock->l_granted_mode) {
                /* The server returned a blocked lock, but it was granted
                 * before we got a chance to actually enqueue it.  We don't
                 * need to do anything else. */
                *flags &= ~(LDLM_FL_BLOCK_GRANTED |
                            LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
                GOTO(out, ELDLM_OK);
        }

        ldlm_resource_unlink_lock(lock);
        if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
                if (node == NULL) {
                        ldlm_lock_destroy_nolock(lock);
                        GOTO(out, rc = -ENOMEM);
                }

                CFS_INIT_LIST_HEAD(&node->li_group);
                ldlm_interval_attach(node, lock);
                node = NULL;
        }

        /* Some flags from the enqueue want to make it into the AST, via the
         * lock's l_flags. */
        lock->l_flags |= *flags & LDLM_AST_DISCARD_DATA;

        /* This distinction between local lock trees is very important; a client
         * namespace only has information about locks taken by that client, and
         * thus doesn't have enough information to decide for itself if it can
         * be granted (below).  In this case, we do exactly what the server
         * tells us to do, as dictated by the 'flags'.
         *
         * We do exactly the same thing during recovery, when the server is
         * more or less trusting the clients not to lie.
         *
         * FIXME (bug 268): Detect obvious lies by checking compatibility in
         * granted/converting queues. */
        if (local) {
                if (*flags & LDLM_FL_BLOCK_CONV)
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                else
                        ldlm_grant_lock(lock, NULL);
                GOTO(out, ELDLM_OK);
        } else if (*flags & LDLM_FL_REPLAY) {
                if (*flags & LDLM_FL_BLOCK_CONV) {
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                        GOTO(out, ELDLM_OK);
                } else if (*flags & LDLM_FL_BLOCK_WAIT) {
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                        GOTO(out, ELDLM_OK);
                } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
                        ldlm_grant_lock(lock, NULL);
                        GOTO(out, ELDLM_OK);
                }
                /* If no flags, fall through to normal enqueue path. */
        }

        policy = ldlm_processing_policy_table[res->lr_type];
        policy(lock, flags, 1, &rc, NULL);
        GOTO(out, rc);
out:
        unlock_res_and_lock(lock);
        if (node)
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        return rc;
}

/* Must be called with the resource lock held: queue is waiting or converting. */
int ldlm_reprocess_queue(struct ldlm_resource *res, cfs_list_t *queue,
                         cfs_list_t *work_list)
{
        cfs_list_t *tmp, *pos;
        ldlm_processing_policy policy;
        int flags;
        int rc = LDLM_ITER_CONTINUE;
        ldlm_error_t err;
        ENTRY;

        check_res_locked(res);

        policy = ldlm_processing_policy_table[res->lr_type];
        LASSERT(policy);

        cfs_list_for_each_safe(tmp, pos, queue) {
                struct ldlm_lock *pending;
                pending = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);

                flags = 0;
                rc = policy(pending, &flags, 0, &err, work_list);
                if (rc != LDLM_ITER_CONTINUE)
                        break;
        }

        RETURN(rc);
}

/* Helper function for ldlm_run_ast_work().
 *
 * Send an existing rpc set specified by @arg->set and then
 * destroy it. Create a new one if the @do_create flag is set. */
1364 static void
1365 ldlm_send_and_maybe_create_set(struct ldlm_cb_set_arg *arg, int do_create)
1366 {
1367         ENTRY;
1368
1369         ptlrpc_set_wait(arg->set);
1370         if (arg->type == LDLM_BL_CALLBACK)
1371                 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
1372         ptlrpc_set_destroy(arg->set);
1373
1374         if (do_create)
1375                 arg->set = ptlrpc_prep_set();
1376
1377         EXIT;
1378 }
1379
1380 static int
1381 ldlm_work_bl_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
1382 {
1383         struct ldlm_lock_desc d;
1384         struct ldlm_lock *lock = cfs_list_entry(tmp, struct ldlm_lock,
1385                                                 l_bl_ast);
1386         ENTRY;
1387
1388         /* nobody should touch l_bl_ast */
1389         lock_res_and_lock(lock);
1390         cfs_list_del_init(&lock->l_bl_ast);
1391
1392         LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
1393         LASSERT(lock->l_bl_ast_run == 0);
1394         LASSERT(lock->l_blocking_lock);
1395         lock->l_bl_ast_run++;
1396         unlock_res_and_lock(lock);
1397
1398         ldlm_lock2desc(lock->l_blocking_lock, &d);
1399
1400         lock->l_blocking_ast(lock, &d, (void *)arg,
1401                              LDLM_CB_BLOCKING);
1402         LDLM_LOCK_RELEASE(lock->l_blocking_lock);
1403         lock->l_blocking_lock = NULL;
1404         LDLM_LOCK_RELEASE(lock);
1405
1406         RETURN(1);
1407 }
1408
1409 static int
1410 ldlm_work_cp_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
1411 {
1412         struct ldlm_lock *lock = cfs_list_entry(tmp, struct ldlm_lock, l_cp_ast);
1413         ldlm_completion_callback completion_callback;
1414         int rc = 0;
1415         ENTRY;
1416
1417         /* It's possible to receive a completion AST before we've set
1418          * the l_completion_ast pointer: either because the AST arrived
1419          * before the reply, or simply because there's a small race
1420          * window between receiving the reply and finishing the local
1421          * enqueue. (bug 842)
1422          *
1423          * This can't happen with the blocking_ast, however, because we
1424          * will never call the local blocking_ast until we drop our
1425          * reader/writer reference, which we won't do until we get the
1426          * reply and finish enqueueing. */
1427
1428         /* nobody should touch l_cp_ast */
1429         lock_res_and_lock(lock);
1430         cfs_list_del_init(&lock->l_cp_ast);
1431         LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
1432         /* save l_completion_ast since it can be changed by
1433          * mds_intent_policy(), see bug 14225 */
1434         completion_callback = lock->l_completion_ast;
1435         lock->l_flags &= ~LDLM_FL_CP_REQD;
1436         unlock_res_and_lock(lock);
1437
1438         if (completion_callback != NULL) {
1439                 completion_callback(lock, 0, (void *)arg);
1440                 rc = 1;
1441         }
1442         LDLM_LOCK_RELEASE(lock);
1443
1444         RETURN(rc);
1445 }
1446
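/* Process one entry on the revocation work list: send a blocking AST
 * whose descriptor pretends to be an exclusive conflict so the holder
 * gives the lock up.  Always returns 1. */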
1447 static int
1448 ldlm_work_revoke_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
1449 {
1450         struct ldlm_lock_desc desc;
1451         struct ldlm_lock *lock = cfs_list_entry(tmp, struct ldlm_lock,
1452                                                 l_rk_ast);
1453         ENTRY;
1454
1455         cfs_list_del_init(&lock->l_rk_ast);
1456
1457         /* the desc just pretends to be exclusive */
1458         ldlm_lock2desc(lock, &desc);
1459         desc.l_req_mode = LCK_EX;
1460         desc.l_granted_mode = 0;
1461
1462         lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
1463         LDLM_LOCK_RELEASE(lock);
1464
1465         RETURN(1);
1466 }
1467
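/* Fire the ASTs queued on @rpc_list according to @ast_type, batching at
 * most PARALLEL_AST_LIMIT callbacks into each ptlrpc request set.
 * Returns -ERESTART if the set cannot be allocated or if a callback
 * flagged a restart; 0 otherwise. */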
1468 int ldlm_run_ast_work(cfs_list_t *rpc_list, ldlm_desc_ast_t ast_type)
1469 {
1470         struct ldlm_cb_set_arg arg;
1471         cfs_list_t *tmp, *pos;
1472         int (*work_ast_lock)(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg);
1473         int ast_count;
1474         ENTRY;
1475
1476         if (cfs_list_empty(rpc_list))
1477                 RETURN(0);
1478
1479         arg.set = ptlrpc_prep_set();
1480         if (arg.set == NULL)
1481                 RETURN(-ERESTART);
1482         cfs_atomic_set(&arg.restart, 0);
1483         switch (ast_type) {
1484         case LDLM_WORK_BL_AST:
1485                 arg.type = LDLM_BL_CALLBACK;
1486                 work_ast_lock = ldlm_work_bl_ast_lock;
1487                 break;
1488         case LDLM_WORK_CP_AST:
1489                 arg.type = LDLM_CP_CALLBACK;
1490                 work_ast_lock = ldlm_work_cp_ast_lock;
1491                 break;
1492         case LDLM_WORK_REVOKE_AST:
1493                 arg.type = LDLM_BL_CALLBACK;
1494                 work_ast_lock = ldlm_work_revoke_ast_lock;
1495                 break;
1496         default:
1497                 LBUG();
1498         }
1499
1500         ast_count = 0;
1501         cfs_list_for_each_safe(tmp, pos, rpc_list) {
1502                 ast_count += work_ast_lock(tmp, &arg);
1503
1504                 /* Send the request set when it reaches PARALLEL_AST_LIMIT,
1505                  * and create a new set for the requests remaining in
1506                  * @rpc_list */
1507                 if (unlikely(ast_count == PARALLEL_AST_LIMIT)) {
1508                         ldlm_send_and_maybe_create_set(&arg, 1);
1509                         ast_count = 0;
1510                 }
1511         }
1512
1513         if (ast_count > 0)
1514                 ldlm_send_and_maybe_create_set(&arg, 0);
1515         else
1516                 /* If the number of ASTs is a multiple of
1517                  * PARALLEL_AST_LIMIT or @rpc_list was initially empty,
1518                  * @arg.set must be destroyed here, otherwise we
1519                  * leak memory. */
1520                 ptlrpc_set_destroy(arg.set);
1521
1522         RETURN(cfs_atomic_read(&arg.restart) ? -ERESTART : 0);
1523 }
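/* A minimal caller sketch (hypothetical, mirroring ldlm_reprocess_all()
 * below): collect completion work under the resource lock, then deliver
 * it with the lock dropped:
 *
 *      CFS_LIST_HEAD(rpc_list);
 *
 *      lock_res(res);
 *      ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
 *      unlock_res(res);
 *      if (ldlm_run_ast_work(&rpc_list, LDLM_WORK_CP_AST) == -ERESTART)
 *              ...retry the reprocessing pass...
 */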
1524
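/* Helper for ldlm_reprocess_all_ns(): reprocess a single resource and
 * ask the iteration to continue. */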
1525 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
1526 {
1527         ldlm_reprocess_all(res);
1528         return LDLM_ITER_CONTINUE;
1529 }
1530
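/* Walk every resource hashed in @ns and reprocess its converting and
 * waiting queues, dropping ns_hash_lock while each resource is
 * handled. */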
1531 void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
1532 {
1533         cfs_list_t *tmp;
1534         int i, rc;
1535
1536         if (ns == NULL)
1537                 return;
1538
1539         ENTRY;
1540         cfs_spin_lock(&ns->ns_hash_lock);
1541         for (i = 0; i < RES_HASH_SIZE; i++) {
1542                 tmp = ns->ns_hash[i].next;
1543                 while (tmp != &(ns->ns_hash[i])) {
1544                         struct ldlm_resource *res =
1545                                 cfs_list_entry(tmp, struct ldlm_resource,
1546                                                lr_hash);
1547
1548                         ldlm_resource_getref(res);
1549                         cfs_spin_unlock(&ns->ns_hash_lock);
1550                         LDLM_RESOURCE_ADDREF(res);
1551
1552                         rc = reprocess_one_queue(res, NULL);
1553
1554                         LDLM_RESOURCE_DELREF(res);
1555                         cfs_spin_lock(&ns->ns_hash_lock);
1556                         tmp = tmp->next;
1557                         ldlm_resource_putref_locked(res);
1558
1559                         if (rc == LDLM_ITER_STOP)
1560                                 GOTO(out, rc);
1561                 }
1562         }
1563  out:
1564         cfs_spin_unlock(&ns->ns_hash_lock);
1565         EXIT;
1566 }
1567
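/* Retry granting the converting and waiting locks on @res (a no-op on
 * the client side) and deliver the resulting completion ASTs. */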
1568 void ldlm_reprocess_all(struct ldlm_resource *res)
1569 {
1570         CFS_LIST_HEAD(rpc_list);
1571         int rc;
1572         ENTRY;
1573
1574         /* Local lock trees don't get reprocessed. */
1575         if (ns_is_client(res->lr_namespace)) {
1576                 EXIT;
1577                 return;
1578         }
1579
1580  restart:
1581         lock_res(res);
1582         rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
1583         if (rc == LDLM_ITER_CONTINUE)
1584                 ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
1585         unlock_res(res);
1586
1587         rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_CP_AST);
1588         if (rc == -ERESTART) {
1589                 LASSERT(cfs_list_empty(&rpc_list));
1590                 goto restart;
1591         }
1592         EXIT;
1593 }
1594
1595 void ldlm_cancel_callback(struct ldlm_lock *lock)
1596 {
1597         check_res_locked(lock->l_resource);
1598         if (!(lock->l_flags & LDLM_FL_CANCEL)) {
1599                 lock->l_flags |= LDLM_FL_CANCEL;
1600                 if (lock->l_blocking_ast) {
1601                         // l_check_no_ns_lock(ns);
1602                         unlock_res_and_lock(lock);
1603                         lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
1604                                              LDLM_CB_CANCELING);
1605                         lock_res_and_lock(lock);
1606                 } else {
1607                         LDLM_DEBUG(lock, "no blocking ast");
1608                 }
1609         }
1610         lock->l_flags |= LDLM_FL_BL_DONE;
1611 }
1612
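/* Remove @req from the per-mode and per-policy skiplists; only PLAIN
 * and IBITS locks are linked on them. */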
1613 void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
1614 {
1615         if (req->l_resource->lr_type != LDLM_PLAIN &&
1616             req->l_resource->lr_type != LDLM_IBITS)
1617                 return;
1618
1619         cfs_list_del_init(&req->l_sl_policy);
1620         cfs_list_del_init(&req->l_sl_mode);
1621 }
1622
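/* Cancel @lock: run its cancellation callback, remove it from the
 * waiting list, its resource and the pool, and destroy it.  The lock
 * must hold no reader or writer references. */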
1623 void ldlm_lock_cancel(struct ldlm_lock *lock)
1624 {
1625         struct ldlm_resource *res;
1626         struct ldlm_namespace *ns;
1627         ENTRY;
1628
1629         lock_res_and_lock(lock);
1630
1631         res = lock->l_resource;
1632         ns = res->lr_namespace;
1633
1634         /* Please do not, no matter how tempting, remove this LBUG without
1635          * talking to me first. -phik */
1636         if (lock->l_readers || lock->l_writers) {
1637                 LDLM_ERROR(lock, "lock still has references");
1638                 LBUG();
1639         }
1640
1641         ldlm_del_waiting_lock(lock);
1642
1643         /* Invoke the lock's cancellation (blocking) callback. */
1644         ldlm_cancel_callback(lock);
1645
1646         /* Yes, a second time: the lock may have been re-added while
1647            ldlm_cancel_callback ran without the res lock held */
1648         ldlm_del_waiting_lock(lock);
1649         ldlm_resource_unlink_lock(lock);
1650         ldlm_lock_destroy_nolock(lock);
1651
1652         if (lock->l_granted_mode == lock->l_req_mode)
1653                 ldlm_pool_del(&ns->ns_pool, lock);
1654
1655         /* Make sure we will not be called again for the same lock, which
1656          * is possible unless lock->l_granted_mode is zeroed out */
1657         lock->l_granted_mode = LCK_MINMODE;
1658         unlock_res_and_lock(lock);
1659
1660         EXIT;
1661 }
1662
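/* Attach opaque caller data to the lock identified by @lockh; the
 * pointer is later passed back through l_ast_data, e.g. to the
 * blocking AST on cancellation. */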
1663 int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
1664 {
1665         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1666         ENTRY;
1667
1668         if (lock == NULL)
1669                 RETURN(-EINVAL);
1670
1671         lock->l_ast_data = data;
1672         LDLM_LOCK_PUT(lock);
1673         RETURN(0);
1674 }
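/* A hypothetical usage sketch, assuming @lockh came back from a
 * successful enqueue and "inode" is a caller-owned cookie:
 *
 *      if (ldlm_lock_set_data(&lockh, inode) != 0)
 *              ...the handle no longer resolves to a lock...
 */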
1675
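/* cfs_hash iterator callback: cancel one lock held by a disconnecting
 * export, updating the resource LVB and reprocessing its queues. */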
1676 int ldlm_cancel_locks_for_export_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
1677                                     cfs_hlist_node_t *hnode, void *data)
1678
1679 {
1680         struct obd_export    *exp  = data;
1681         struct ldlm_lock     *lock = cfs_hash_object(hs, hnode);
1682         struct ldlm_resource *res;
1683
1684         res = ldlm_resource_getref(lock->l_resource);
1685         LDLM_LOCK_GET(lock);
1686
1687         LDLM_DEBUG(lock, "export %p", exp);
1688         ldlm_res_lvbo_update(res, NULL, 1);
1689         ldlm_lock_cancel(lock);
1690         ldlm_reprocess_all(res);
1691         ldlm_resource_putref(res);
1692         LDLM_LOCK_RELEASE(lock);
1693         return 0;
1694 }
1695
1696 void ldlm_cancel_locks_for_export(struct obd_export *exp)
1697 {
1698         cfs_hash_for_each_empty(exp->exp_lock_hash,
1699                                 ldlm_cancel_locks_for_export_cb, exp);
1700 }
1701
1702 /**
1703  * Downgrade an exclusive lock.
1704  *
1705  * A fast variant of ldlm_lock_convert() for the conversion of
1706  * exclusive locks.  The conversion is always successful.
1707  *
1708  * \param lock A lock to convert
1709  * \param new_mode new lock mode
1710  */
1711 void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode)
1712 {
1713         struct ldlm_namespace *ns;
1714         ENTRY;
1715
1716         LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
1717         LASSERT(new_mode == LCK_COS);
1718
1719         lock_res_and_lock(lock);
1720         ldlm_resource_unlink_lock(lock);
1721         /*
1722          * Remove the lock from pool as it will be added again in
1723          * ldlm_grant_lock() called below.
1724          */
1725         ns = lock->l_resource->lr_namespace;
1726         ldlm_pool_del(&ns->ns_pool, lock);
1727
1728         lock->l_req_mode = new_mode;
1729         ldlm_grant_lock(lock, NULL);
1730         unlock_res_and_lock(lock);
1731         ldlm_reprocess_all(lock->l_resource);
1732
1733         EXIT;
1734 }
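/* A hypothetical sketch of the intended use: a server thread holding a
 * PW or EX lock weakens it to commit-on-sharing mode in a single step:
 *
 *      if (lock->l_granted_mode == LCK_PW)
 *              ldlm_lock_downgrade(lock, LCK_COS);
 */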
1735
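/* Convert @lock to @new_mode.  Returns the lock's resource if the
 * conversion was granted or queued, or NULL if the interval node could
 * not be allocated or the processing policy stopped the conversion. */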
1736 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
1737                                         __u32 *flags)
1738 {
1739         CFS_LIST_HEAD(rpc_list);
1740         struct ldlm_resource *res;
1741         struct ldlm_namespace *ns;
1742         int granted = 0;
1743         int old_mode, rc;
1744         struct sl_insert_point prev;
1745         ldlm_error_t err;
1746         struct ldlm_interval *node;
1747         ENTRY;
1748
1749         if (new_mode == lock->l_granted_mode) { // No changes? Just return.
1750                 *flags |= LDLM_FL_BLOCK_GRANTED;
1751                 RETURN(lock->l_resource);
1752         }
1753
1754         /* We can't check the type of the lock here because its bitlock
1755          * is not held, so do the allocation blindly. -jay */
1756         OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
1757         if (node == NULL)  /* Actually, this causes EDEADLOCK to be returned */
1758                 RETURN(NULL);
1759
1760         LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR),
1761                  "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);
1762
1763         lock_res_and_lock(lock);
1764
1765         res = lock->l_resource;
1766         ns = res->lr_namespace;
1767
1768         old_mode = lock->l_req_mode;
1769         lock->l_req_mode = new_mode;
1770         if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
1771                 /* remember the position where the lock might be added
1772                  * back to the granted list later, and also remember its
1773                  * skiplist links for later fixing. */
1774                 prev.res_link = lock->l_res_link.prev;
1775                 prev.mode_link = lock->l_sl_mode.prev;
1776                 prev.policy_link = lock->l_sl_policy.prev;
1777                 ldlm_resource_unlink_lock(lock);
1778         } else {
1779                 ldlm_resource_unlink_lock(lock);
1780                 if (res->lr_type == LDLM_EXTENT) {
1781                         /* FIXME: ugly code, we have to attach the lock to an
1782                          * interval node again since it may be granted
1783                          * soon */
1784                         CFS_INIT_LIST_HEAD(&node->li_group);
1785                         ldlm_interval_attach(node, lock);
1786                         node = NULL;
1787                 }
1788         }
1789
1790         /*
1791          * Remove old lock from the pool before adding the lock with new
1792          * mode below in ->policy()
1793          */
1794         ldlm_pool_del(&ns->ns_pool, lock);
1795
1796         /* If this is a local resource, put it on the appropriate list. */
1797         if (ns_is_client(ns)) {
1798                 if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
1799                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1800                 } else {
1801                         /* This should never happen, because of the way the
1802                          * server handles conversions. */
1803                         LDLM_ERROR(lock, "Erroneous flags %u on local lock",
1804                                    *flags);
1805                         LBUG();
1806
1807                         ldlm_grant_lock(lock, &rpc_list);
1808                         granted = 1;
1809                         /* FIXME: no completion handling with ns_lock held! */
1810                         if (lock->l_completion_ast)
1811                                 lock->l_completion_ast(lock, 0, NULL);
1812                 }
1813         } else {
1814                 int pflags = 0;
1815                 ldlm_processing_policy policy;
1816                 policy = ldlm_processing_policy_table[res->lr_type];
1817                 rc = policy(lock, &pflags, 0, &err, &rpc_list);
1818                 if (rc == LDLM_ITER_STOP) {
1819                         lock->l_req_mode = old_mode;
1820                         if (res->lr_type == LDLM_EXTENT)
1821                                 ldlm_extent_add_lock(res, lock);
1822                         else
1823                                 ldlm_granted_list_add_lock(lock, &prev);
1824
1825                         res = NULL;
1826                 } else {
1827                         *flags |= LDLM_FL_BLOCK_GRANTED;
1828                         granted = 1;
1829                 }
1830         }
1831         unlock_res_and_lock(lock);
1832
1833         if (granted)
1834                 ldlm_run_ast_work(&rpc_list, LDLM_WORK_CP_AST);
1835         if (node)
1836                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
1837         RETURN(res);
1838 }
1839
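/* Dump the state of @lock to the debug log at @level: owner, resource
 * name, modes, flags and the type-specific policy data. */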
1840 void ldlm_lock_dump(int level, struct ldlm_lock *lock, int pos)
1841 {
1842         struct obd_device *obd = NULL;
1843
1844         if (!((libcfs_debug | D_ERROR) & level))
1845                 return;
1846
1847         if (!lock) {
1848                 CDEBUG(level, "  NULL LDLM lock\n");
1849                 return;
1850         }
1851
1852         CDEBUG(level," -- Lock dump: %p/"LPX64" (rc: %d) (pos: %d) (pid: %d)\n",
1853                lock, lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
1854                pos, lock->l_pid);
1855         if (lock->l_conn_export != NULL)
1856                 obd = lock->l_conn_export->exp_obd;
1857         if (lock->l_export && lock->l_export->exp_connection) {
1858                 CDEBUG(level, "  Node: NID %s (rhandle: "LPX64")\n",
1859                      libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid),
1860                      lock->l_remote_handle.cookie);
1861         } else if (obd == NULL) {
1862                 CDEBUG(level, "  Node: local\n");
1863         } else {
1864                 struct obd_import *imp = obd->u.cli.cl_import;
1865                 CDEBUG(level, "  Node: NID %s (rhandle: "LPX64")\n",
1866                        libcfs_nid2str(imp->imp_connection->c_peer.nid),
1867                        lock->l_remote_handle.cookie);
1868         }
1869         CDEBUG(level, "  Resource: %p ("LPU64"/"LPU64"/"LPU64")\n",
1870                   lock->l_resource,
1871                   lock->l_resource->lr_name.name[0],
1872                   lock->l_resource->lr_name.name[1],
1873                   lock->l_resource->lr_name.name[2]);
1874         CDEBUG(level, "  Req mode: %s, grant mode: %s, rc: %u, read: %d, "
1875                "write: %d flags: "LPX64"\n", ldlm_lockname[lock->l_req_mode],
1876                ldlm_lockname[lock->l_granted_mode],
1877                cfs_atomic_read(&lock->l_refc), lock->l_readers, lock->l_writers,
1878                lock->l_flags);
1879         if (lock->l_resource->lr_type == LDLM_EXTENT)
1880                 CDEBUG(level, "  Extent: "LPU64" -> "LPU64
1881                        " (req "LPU64"-"LPU64")\n",
1882                        lock->l_policy_data.l_extent.start,
1883                        lock->l_policy_data.l_extent.end,
1884                        lock->l_req_extent.start, lock->l_req_extent.end);
1885         else if (lock->l_resource->lr_type == LDLM_FLOCK)
1886                 CDEBUG(level, "  Pid: %d Extent: "LPU64" -> "LPU64"\n",
1887                        lock->l_policy_data.l_flock.pid,
1888                        lock->l_policy_data.l_flock.start,
1889                        lock->l_policy_data.l_flock.end);
1890         else if (lock->l_resource->lr_type == LDLM_IBITS)
1891                 CDEBUG(level, "  Bits: "LPX64"\n",
1892                        lock->l_policy_data.l_inodebits.bits);
1893 }
1894
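/* Resolve @lockh and dump the lock it refers to, if it still exists. */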
1895 void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
1896 {
1897         struct ldlm_lock *lock;
1898
1899         if (!((libcfs_debug | D_ERROR) & level))
1900                 return;
1901
1902         lock = ldlm_handle2lock(lockh);
1903         if (lock == NULL)
1904                 return;
1905
1906         ldlm_lock_dump(level, lock, 0);
1907
1908         LDLM_LOCK_PUT(lock);
1909 }
1910
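/* Backend for the LDLM_DEBUG() and LDLM_ERROR() macros: appends a
 * type-specific description of @lock to the caller's message. */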
1911 void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level,
1912                       struct libcfs_debug_msg_data *data, const char *fmt,
1913                       ...)
1914 {
1915         va_list args;
1916         cfs_debug_limit_state_t *cdls = data->msg_cdls;
1917
1918         va_start(args, fmt);
1919
1920         if (lock->l_resource == NULL) {
1921                 libcfs_debug_vmsg2(cdls, data->msg_subsys, level,data->msg_file,
1922                                    data->msg_fn, data->msg_line, fmt, args,
1923                        " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
1924                        "res: \?\? rrc=\?\? type: \?\?\? flags: "LPX64" remote: "
1925                        LPX64" expref: %d pid: %u timeout: %lu\n", lock,
1926                        lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
1927                        lock->l_readers, lock->l_writers,
1928                        ldlm_lockname[lock->l_granted_mode],
1929                        ldlm_lockname[lock->l_req_mode],
1930                        lock->l_flags, lock->l_remote_handle.cookie,
1931                        lock->l_export ?
1932                        cfs_atomic_read(&lock->l_export->exp_refcount) : -99,
1933                        lock->l_pid, lock->l_callback_timeout);
1934                 va_end(args);
1935                 return;
1936         }
1937
1938         switch (lock->l_resource->lr_type) {
1939         case LDLM_EXTENT:
1940                 libcfs_debug_vmsg2(cdls, data->msg_subsys, level,data->msg_file,
1941                                    data->msg_fn, data->msg_line, fmt, args,
1942                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
1943                        "res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64
1944                        "] (req "LPU64"->"LPU64") flags: "LPX64" remote: "LPX64
1945                        " expref: %d pid: %u timeout %lu\n",
1946                        lock->l_resource->lr_namespace->ns_name, lock,
1947                        lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
1948                        lock->l_readers, lock->l_writers,
1949                        ldlm_lockname[lock->l_granted_mode],
1950                        ldlm_lockname[lock->l_req_mode],
1951                        lock->l_resource->lr_name.name[0],
1952                        lock->l_resource->lr_name.name[1],
1953                        cfs_atomic_read(&lock->l_resource->lr_refcount),
1954                        ldlm_typename[lock->l_resource->lr_type],
1955                        lock->l_policy_data.l_extent.start,
1956                        lock->l_policy_data.l_extent.end,
1957                        lock->l_req_extent.start, lock->l_req_extent.end,
1958                        lock->l_flags, lock->l_remote_handle.cookie,
1959                        lock->l_export ?
1960                        cfs_atomic_read(&lock->l_export->exp_refcount) : -99,
1961                        lock->l_pid, lock->l_callback_timeout);
1962                 break;
1963
1964         case LDLM_FLOCK:
1965                 libcfs_debug_vmsg2(cdls, data->msg_subsys, level,data->msg_file,
1966                                    data->msg_fn, data->msg_line, fmt, args,
1967                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
1968                        "res: "LPU64"/"LPU64" rrc: %d type: %s pid: %d "
1969                        "["LPU64"->"LPU64"] flags: "LPX64" remote: "LPX64
1970                        " expref: %d pid: %u timeout: %lu\n",
1971                        lock->l_resource->lr_namespace->ns_name, lock,
1972                        lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
1973                        lock->l_readers, lock->l_writers,
1974                        ldlm_lockname[lock->l_granted_mode],
1975                        ldlm_lockname[lock->l_req_mode],
1976                        lock->l_resource->lr_name.name[0],
1977                        lock->l_resource->lr_name.name[1],
1978                        cfs_atomic_read(&lock->l_resource->lr_refcount),
1979                        ldlm_typename[lock->l_resource->lr_type],
1980                        lock->l_policy_data.l_flock.pid,
1981                        lock->l_policy_data.l_flock.start,
1982                        lock->l_policy_data.l_flock.end,
1983                        lock->l_flags, lock->l_remote_handle.cookie,
1984                        lock->l_export ?
1985                        cfs_atomic_read(&lock->l_export->exp_refcount) : -99,
1986                        lock->l_pid, lock->l_callback_timeout);
1987                 break;
1988
1989         case LDLM_IBITS:
1990                 libcfs_debug_vmsg2(cdls, data->msg_subsys, level,data->msg_file,
1991                                    data->msg_fn, data->msg_line, fmt, args,
1992                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
1993                        "res: "LPU64"/"LPU64" bits "LPX64" rrc: %d type: %s "
1994                        "flags: "LPX64" remote: "LPX64" expref: %d "
1995                        "pid: %u timeout: %lu\n",
1996                        lock->l_resource->lr_namespace->ns_name,
1997                        lock, lock->l_handle.h_cookie,
1998                        cfs_atomic_read (&lock->l_refc),
1999                        lock->l_readers, lock->l_writers,
2000                        ldlm_lockname[lock->l_granted_mode],
2001                        ldlm_lockname[lock->l_req_mode],
2002                        lock->l_resource->lr_name.name[0],
2003                        lock->l_resource->lr_name.name[1],
2004                        lock->l_policy_data.l_inodebits.bits,
2005                        cfs_atomic_read(&lock->l_resource->lr_refcount),
2006                        ldlm_typename[lock->l_resource->lr_type],
2007                        lock->l_flags, lock->l_remote_handle.cookie,
2008                        lock->l_export ?
2009                        cfs_atomic_read(&lock->l_export->exp_refcount) : -99,
2010                        lock->l_pid, lock->l_callback_timeout);
2011                 break;
2012
2013         default:
2014                 libcfs_debug_vmsg2(cdls, data->msg_subsys, level,data->msg_file,
2015                                    data->msg_fn, data->msg_line, fmt, args,
2016                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2017                        "res: "LPU64"/"LPU64" rrc: %d type: %s flags: "LPX64" "
2018                        "remote: "LPX64" expref: %d pid: %u timeout %lu\n",
2019                        lock->l_resource->lr_namespace->ns_name,
2020                        lock, lock->l_handle.h_cookie,
2021                        cfs_atomic_read (&lock->l_refc),
2022                        lock->l_readers, lock->l_writers,
2023                        ldlm_lockname[lock->l_granted_mode],
2024                        ldlm_lockname[lock->l_req_mode],
2025                        lock->l_resource->lr_name.name[0],
2026                        lock->l_resource->lr_name.name[1],
2027                        cfs_atomic_read(&lock->l_resource->lr_refcount),
2028                        ldlm_typename[lock->l_resource->lr_type],
2029                        lock->l_flags, lock->l_remote_handle.cookie,
2030                        lock->l_export ?
2031                        cfs_atomic_read(&lock->l_export->exp_refcount) : -99,
2032                        lock->l_pid, lock->l_callback_timeout);
2033                 break;
2034         }
2035         va_end(args);
2036 }
2037 EXPORT_SYMBOL(_ldlm_lock_debug);