Whamcloud - gitweb
7e43e3fe071c10ee0f7b211b7794f8ce4eab896a
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of the Lustre file system, http://www.lustre.org
9  *   Lustre is a trademark of Cluster File Systems, Inc.
10  *
11  *   You may have signed or agreed to another license before downloading
12  *   this software.  If so, you are bound by the terms and conditions
13  *   of that agreement, and the following does not apply to you.  See the
14  *   LICENSE file included with this distribution for more information.
15  *
16  *   If you did not agree to a different license, then this copy of Lustre
17  *   is open source software; you can redistribute it and/or modify it
18  *   under the terms of version 2 of the GNU General Public License as
19  *   published by the Free Software Foundation.
20  *
21  *   In either case, Lustre is distributed in the hope that it will be
22  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
23  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24  *   license text for more details.
25  */
26
27 #define DEBUG_SUBSYSTEM S_LDLM
28
29 #ifdef __KERNEL__
30 # include <libcfs/libcfs.h>
31 # include <linux/lustre_intent.h>
32 #else
33 # include <liblustre.h>
34 # include <libcfs/kp30.h>
35 #endif
36
37 #include <obd_class.h>
38 #include "ldlm_internal.h"
39
40 //struct lustre_lock ldlm_everything_lock;
41
42 /* lock's skip list pointers fix mode */
43 #define LDLM_JOIN_NONE          0
44 #define LDLM_MODE_JOIN_RIGHT    1
45 #define LDLM_MODE_JOIN_LEFT     (1 << 1)
46 #define LDLM_POLICY_JOIN_RIGHT  (1 << 2)
47 #define LDLM_POLICY_JOIN_LEFT   (1 << 3)
48
49 /* lock types */
50 char *ldlm_lockname[] = {
51         [0] "--",
52         [LCK_EX] "EX",
53         [LCK_PW] "PW",
54         [LCK_PR] "PR",
55         [LCK_CW] "CW",
56         [LCK_CR] "CR",
57         [LCK_NL] "NL",
58         [LCK_GROUP] "GROUP"
59 };
60
61 char *ldlm_typename[] = {
62         [LDLM_PLAIN] "PLN",
63         [LDLM_EXTENT] "EXT",
64         [LDLM_FLOCK] "FLK",
65         [LDLM_IBITS] "IBT",
66 };
67
68 char *ldlm_it2str(int it)
69 {
70         switch (it) {
71         case IT_OPEN:
72                 return "open";
73         case IT_CREAT:
74                 return "creat";
75         case (IT_OPEN | IT_CREAT):
76                 return "open|creat";
77         case IT_READDIR:
78                 return "readdir";
79         case IT_GETATTR:
80                 return "getattr";
81         case IT_LOOKUP:
82                 return "lookup";
83         case IT_UNLINK:
84                 return "unlink";
85         case IT_GETXATTR:
86                 return "getxattr";
87         default:
88                 CERROR("Unknown intent %d\n", it);
89                 return "UNKNOWN";
90         }
91 }
92
93 extern cfs_mem_cache_t *ldlm_lock_slab;
94
95 static ldlm_processing_policy ldlm_processing_policy_table[] = {
96         [LDLM_PLAIN] ldlm_process_plain_lock,
97         [LDLM_EXTENT] ldlm_process_extent_lock,
98 #ifdef __KERNEL__
99         [LDLM_FLOCK] ldlm_process_flock_lock,
100 #endif
101         [LDLM_IBITS] ldlm_process_inodebits_lock,
102 };
103
104 ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
105 {
106         return ldlm_processing_policy_table[res->lr_type];
107 }
108
109 void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
110 {
111         ns->ns_policy = arg;
112 }
113
114 /*
115  * REFCOUNTED LOCK OBJECTS
116  */
117
118
119 /*
120  * Lock refcounts, during creation:
121  *   - one special one for allocation, dec'd only once in destroy
122  *   - one for being a lock that's in-use
123  *   - one for the addref associated with a new lock
124  */
125 struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
126 {
127         atomic_inc(&lock->l_refc);
128         return lock;
129 }
130
131 static void ldlm_lock_free(struct ldlm_lock *lock, size_t size)
132 {
133         LASSERT(size == sizeof(*lock));
134         OBD_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock));
135 }
136
/* Drop a reference on @lock.  On the final put the lock must already have
 * been destroyed (unhashed, off all lists); we then release its resource
 * and parent references, free the LVB, and hand the memory to an
 * RCU-deferred free via the handle. */
void ldlm_lock_put(struct ldlm_lock *lock)
{
        ENTRY;

        LASSERT(lock->l_resource != LP_POISON);
        LASSERT(atomic_read(&lock->l_refc) > 0);
        if (atomic_dec_and_test(&lock->l_refc)) {
                struct ldlm_resource *res;

                LDLM_DEBUG(lock, "final lock_put on destroyed lock, freeing it.");

                res = lock->l_resource;
                /* ldlm_lock_destroy() must have run before the last put */
                LASSERT(lock->l_destroyed);
                LASSERT(list_empty(&lock->l_res_link));
                LASSERT(list_empty(&lock->l_pending_chain));

                /* drop the parent reference taken in ldlm_lock_new() */
                if (lock->l_parent)
                        LDLM_LOCK_PUT(lock->l_parent);

                /* release the per-namespace lock count and the resource
                 * reference the lock has held since creation */
                atomic_dec(&res->lr_namespace->ns_locks);
                ldlm_resource_putref(res);
                lock->l_resource = NULL;
                if (lock->l_export) {
                        class_export_put(lock->l_export);
                        lock->l_export = NULL;
                }

                if (lock->l_lvb_data != NULL)
                        OBD_FREE(lock->l_lvb_data, lock->l_lvb_len);

                /* actual freeing (ldlm_lock_free) is deferred until the
                 * handle is RCU-quiescent */
                OBD_FREE_RCU_CB(lock, sizeof(*lock), &lock->l_handle,
                                ldlm_lock_free);
        }

        EXIT;
}
173
174 int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
175 {
176         int rc = 0;
177         if (!list_empty(&lock->l_lru)) {
178                 LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
179                 list_del_init(&lock->l_lru);
180                 lock->l_resource->lr_namespace->ns_nr_unused--;
181                 LASSERT(lock->l_resource->lr_namespace->ns_nr_unused >= 0);
182                 rc = 1;
183         }
184         return rc;
185 }
186
187 int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
188 {
189         int rc;
190         ENTRY;
191         spin_lock(&lock->l_resource->lr_namespace->ns_unused_lock);
192         rc = ldlm_lock_remove_from_lru_nolock(lock);
193         spin_unlock(&lock->l_resource->lr_namespace->ns_unused_lock);
194         EXIT;
195         return rc;
196 }
197
198 /* This used to have a 'strict' flag, which recovery would use to mark an
199  * in-use lock as needing-to-die.  Lest I am ever tempted to put it back, I
200  * shall explain why it's gone: with the new hash table scheme, once you call
201  * ldlm_lock_destroy, you can never drop your final references on this lock.
202  * Because it's not in the hash table anymore.  -phil */
/* Mark @lock destroyed: unlink it from its export chain and the LRU, and
 * unhash its handle.  Caller must hold the resource lock and lock bitlock
 * (lock_res_and_lock()).  Idempotent: returns 1 on the first effective
 * destroy (caller then drops the hash-table reference), 0 if the lock was
 * already destroyed.  LBUGs if the lock is still in use. */
int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
{
        ENTRY;

        if (!list_empty(&lock->l_children)) {
                LDLM_ERROR(lock, "still has children (%p)!",
                           lock->l_children.next);
                ldlm_lock_dump(D_ERROR, lock, 0);
                LBUG();
        }
        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                ldlm_lock_dump(D_ERROR, lock, 0);
                LBUG();
        }

        if (!list_empty(&lock->l_res_link)) {
                LDLM_ERROR(lock, "lock still on resource");
                ldlm_lock_dump(D_ERROR, lock, 0);
                LBUG();
        }

        /* second and later destroys are no-ops */
        if (lock->l_destroyed) {
                LASSERT(list_empty(&lock->l_lru));
                EXIT;
                return 0;
        }
        lock->l_destroyed = 1;

        /* led_lock serializes the export's lock chain; only take it when
         * the lock actually belongs to an export */
        if (lock->l_export)
                spin_lock(&lock->l_export->exp_ldlm_data.led_lock);
        list_del_init(&lock->l_export_chain);
        if (lock->l_export)
                spin_unlock(&lock->l_export->exp_ldlm_data.led_lock);

        ldlm_lock_remove_from_lru(lock);
        class_handle_unhash(&lock->l_handle);

#if 0
        /* Wake anyone waiting for this lock */
        /* FIXME: I should probably add yet another flag, instead of using
         * l_export to only call this on clients */
        /* NOTE(review): dead code; if ever revived, beware that l_export is
         * NULLed before the "l_export && l_completion_ast" test below, so
         * the completion AST could never fire as written. */
        if (lock->l_export)
                class_export_put(lock->l_export);
        lock->l_export = NULL;
        if (lock->l_export && lock->l_completion_ast)
                lock->l_completion_ast(lock, 0);
#endif
        EXIT;
        return 1;
}
254
255 void ldlm_lock_destroy(struct ldlm_lock *lock)
256 {
257         int first;
258         ENTRY;
259         lock_res_and_lock(lock);
260         first = ldlm_lock_destroy_internal(lock);
261         unlock_res_and_lock(lock);
262
263         /* drop reference from hashtable only for first destroy */
264         if (first)
265                 LDLM_LOCK_PUT(lock);
266         EXIT;
267 }
268
269 void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
270 {
271         int first;
272         ENTRY;
273         first = ldlm_lock_destroy_internal(lock);
274         /* drop reference from hashtable only for first destroy */
275         if (first)
276                 LDLM_LOCK_PUT(lock);
277         EXIT;
278 }
279
/* Handle-layer addref callback, invoked by portals_handle2object() with
 * the handle lock held; takes one lock reference. */
static void lock_handle_addref(void *lock)
{
        struct ldlm_lock *l = lock;

        LDLM_LOCK_GET(l);
}
285
286 /*
287  * usage: pass in a resource on which you have done ldlm_resource_get
288  *        pass in a parent lock on which you have done a ldlm_lock_get
289  *        after return, ldlm_*_put the resource and parent
290  * returns: lock with refcount 2 - one for current caller and one for remote
291  */
static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
                                       struct ldlm_resource *resource)
{
        struct ldlm_lock *lock;
        ENTRY;

        if (resource == NULL)
                LBUG();

        OBD_SLAB_ALLOC(lock, ldlm_lock_slab, CFS_ALLOC_IO, sizeof(*lock));
        if (lock == NULL)
                RETURN(NULL);

        /* the lock pins its resource for its entire lifetime; released in
         * ldlm_lock_put() */
        lock->l_resource = ldlm_resource_getref(resource);

        /* refcount 2: one for the caller, one dropped by ldlm_lock_destroy()
         * when the lock is unhashed (see comment above ldlm_lock_get) */
        atomic_set(&lock->l_refc, 2);
        CFS_INIT_LIST_HEAD(&lock->l_children);
        CFS_INIT_LIST_HEAD(&lock->l_res_link);
        CFS_INIT_LIST_HEAD(&lock->l_lru);
        CFS_INIT_LIST_HEAD(&lock->l_export_chain);
        CFS_INIT_LIST_HEAD(&lock->l_pending_chain);
        CFS_INIT_LIST_HEAD(&lock->l_tmp);
        CFS_INIT_LIST_HEAD(&lock->l_bl_ast);
        CFS_INIT_LIST_HEAD(&lock->l_cp_ast);
        cfs_waitq_init(&lock->l_waitq);
        lock->l_blocking_lock = NULL;
        lock->l_pidb = 0;
        /* skip-list pointers start detached (LDLM_SL_EMPTY) */
        lock->l_sl_mode.prev = NULL;
        lock->l_sl_mode.next = NULL;
        lock->l_sl_policy.prev = NULL;
        lock->l_sl_policy.next = NULL;

        atomic_inc(&resource->lr_namespace->ns_locks);

        /* link under @parent, holding a reference on it (dropped in
         * ldlm_lock_put); ns_hash_lock protects the child list here */
        if (parent != NULL) {
                spin_lock(&resource->lr_namespace->ns_hash_lock);
                lock->l_parent = LDLM_LOCK_GET(parent);
                list_add(&lock->l_childof, &parent->l_children);
                spin_unlock(&resource->lr_namespace->ns_hash_lock);
        }

        /* make the lock reachable via its lustre_handle cookie; the hash
         * callback takes a reference on each lookup */
        CFS_INIT_LIST_HEAD(&lock->l_handle.h_link);
        class_handle_hash(&lock->l_handle, lock_handle_addref);

        RETURN(lock);
}
338
/* Re-home @lock onto the resource named by @new_resid (client side only).
 * The lock must not be on any resource list.  No-op if the name is
 * unchanged.  Returns 0 on success.
 *
 * Locking is deliberately asymmetric: lock_res_and_lock() takes the
 * bitlock plus the OLD resource lock; after l_resource is switched,
 * unlock_res_and_lock() would unlock the NEW resource, so the old
 * resource and the bitlock are released by hand. */
int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                              struct ldlm_res_id new_resid)
{
        struct ldlm_resource *oldres = lock->l_resource;
        struct ldlm_resource *newres;
        int type;
        ENTRY;

        LASSERT(ns->ns_client != 0);

        lock_res_and_lock(lock);
        if (memcmp(&new_resid, &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) == 0) {
                /* Nothing to do */
                unlock_res_and_lock(lock);
                RETURN(0);
        }

        LASSERT(new_resid.name[0] != 0);

        /* This function assumes that the lock isn't on any lists */
        LASSERT(list_empty(&lock->l_res_link));

        type = oldres->lr_type;
        unlock_res_and_lock(lock);

        /* get-or-create the target resource (may sleep, hence the unlock
         * above); a NULL return is treated as fatal */
        newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
        if (newres == NULL) {
                LBUG();
                RETURN(-ENOMEM);
        }

        lock_res_and_lock(lock);
        LASSERT(memcmp(&new_resid, &lock->l_resource->lr_name,
                       sizeof(lock->l_resource->lr_name)) != 0);
        /* swap the resource pointer under both resource locks */
        lock_res(newres);
        lock->l_resource = newres;
        unlock_res(newres);
        unlock_res(oldres);
        unlock_bitlock(lock);

        /* ...and the flowers are still standing! */
        /* drop the reference lock_res_and_lock's target held on oldres */
        ldlm_resource_putref(oldres);

        RETURN(0);
}
385
386 /*
387  *  HANDLES
388  */
389
390 void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh)
391 {
392         lockh->cookie = lock->l_handle.h_cookie;
393 }
394
395 /* if flags: atomically get the lock and set the flags.
396  *           Return NULL if flag already set
397  */
398
/* Resolve @handle to a referenced lock.  If @flags is non-zero, the flags
 * are set on the lock atomically with the lookup; returns NULL if any of
 * the flags were already set, if the handle is stale, or if the lock was
 * destroyed.  On success the caller owns one lock reference (taken by the
 * handle layer via lock_handle_addref). */
struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, int flags)
{
        struct ldlm_namespace *ns;
        struct ldlm_lock *lock = NULL, *retval = NULL;
        ENTRY;

        LASSERT(handle);

        lock = class_handle2object(handle->cookie);
        if (lock == NULL)
                RETURN(NULL);

        LASSERT(lock->l_resource != NULL);
        ns = lock->l_resource->lr_namespace;
        LASSERT(ns != NULL);

        lock_res_and_lock(lock);

        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if (lock->l_destroyed) {
                unlock_res_and_lock(lock);
                CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
                /* drop the lookup reference; retval stays NULL */
                LDLM_LOCK_PUT(lock);
                GOTO(out, retval);
        }

        /* flag already set: treat as not-found, per the atomic contract */
        if (flags && (lock->l_flags & flags)) {
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
                GOTO(out, retval);
        }

        if (flags)
                lock->l_flags |= flags;

        unlock_res_and_lock(lock);
        retval = lock;
        EXIT;
 out:
        return retval;
}
441
/* Namespace-qualified handle lookup; @ns is unused -- resolution is done
 * purely through the handle cookie with no flags set. */
struct ldlm_lock *ldlm_handle2lock_ns(struct ldlm_namespace *ns,
                                      struct lustre_handle *handle)
{
        return __ldlm_handle2lock(handle, 0);
}
449
/* Fill a wire lock descriptor @desc from @lock.  For old clients that do
 * not speak inodebits (no OBD_CONNECT_IBITS), the IBITS lock is downgraded
 * to an equivalent PLAIN descriptor with modes the old client understands;
 * otherwise the descriptor is a straight copy. */
void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
{
        /* prefer the export (server side); fall back to the connection
         * export (client side) */
        struct obd_export *exp = lock->l_export?:lock->l_conn_export;
        /* INODEBITS_INTEROP: If the other side does not support
         * inodebits, reply with a plain lock descriptor.
         */
        if ((lock->l_resource->lr_type == LDLM_IBITS) &&
            (exp && !(exp->exp_connect_flags & OBD_CONNECT_IBITS))) {
                struct ldlm_resource res = *lock->l_resource;

                /* Make sure all the right bits are set in this lock we
                   are going to pass to client */
                LASSERTF(lock->l_policy_data.l_inodebits.bits ==
                         (MDS_INODELOCK_LOOKUP|MDS_INODELOCK_UPDATE),
                         "Inappropriate inode lock bits during "
                         "conversion " LPU64 "\n",
                         lock->l_policy_data.l_inodebits.bits);
                res.lr_type = LDLM_PLAIN;
                ldlm_res2desc(&res, &desc->l_resource);
                /* Convert "new" lock mode to something old client can
                   understand */
                if ((lock->l_req_mode == LCK_CR) ||
                    (lock->l_req_mode == LCK_CW))
                        desc->l_req_mode = LCK_PR;
                else
                        desc->l_req_mode = lock->l_req_mode;
                if ((lock->l_granted_mode == LCK_CR) ||
                    (lock->l_granted_mode == LCK_CW)) {
                        desc->l_granted_mode = LCK_PR;
                } else {
                        /* We never grant PW/EX locks to clients */
                        LASSERT((lock->l_granted_mode != LCK_PW) &&
                                (lock->l_granted_mode != LCK_EX));
                        desc->l_granted_mode = lock->l_granted_mode;
                }

                /* We do not copy policy here, because there is no
                   policy for plain locks */
        } else {
                ldlm_res2desc(lock->l_resource, &desc->l_resource);
                desc->l_req_mode = lock->l_req_mode;
                desc->l_granted_mode = lock->l_granted_mode;
                desc->l_policy_data = lock->l_policy_data;
        }
}
495
/* Queue a blocking AST for @lock onto @work_list because @new conflicts
 * with it.  No-op if an AST was already sent (LDLM_FL_AST_SENT).  Takes
 * one reference on @lock for the work list and one on @new as
 * l_blocking_lock; both are expected to be dropped by the AST sender. */
void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                           struct list_head *work_list)
{
        if ((lock->l_flags & LDLM_FL_AST_SENT) == 0) {
                LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
                lock->l_flags |= LDLM_FL_AST_SENT;
                /* If the enqueuing client said so, tell the AST recipient to
                 * discard dirty data, rather than writing back. */
                if (new->l_flags & LDLM_AST_DISCARD_DATA)
                        lock->l_flags |= LDLM_FL_DISCARD_DATA;
                LASSERT(list_empty(&lock->l_bl_ast));
                list_add(&lock->l_bl_ast, work_list);
                /* reference held by the work-list entry */
                LDLM_LOCK_GET(lock);
                LASSERT(lock->l_blocking_lock == NULL);
                lock->l_blocking_lock = LDLM_LOCK_GET(new);
        }
}
513
/* Queue a completion AST for @lock onto @work_list.  No-op if one is
 * already pending (LDLM_FL_CP_REQD).  Takes one reference on @lock for
 * the work-list entry. */
void ldlm_add_cp_work_item(struct ldlm_lock *lock, struct list_head *work_list)
{
        if ((lock->l_flags & LDLM_FL_CP_REQD) == 0) {
                lock->l_flags |= LDLM_FL_CP_REQD;
                LDLM_DEBUG(lock, "lock granted; sending completion AST.");
                LASSERT(list_empty(&lock->l_cp_ast));
                list_add(&lock->l_cp_ast, work_list);
                LDLM_LOCK_GET(lock);
        }
}
524
525 /* must be called with lr_lock held */
526 void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
527                                 struct list_head *work_list)
528 {
529         ENTRY;
530         check_res_locked(lock->l_resource);
531         if (new)
532                 ldlm_add_bl_work_item(lock, new, work_list);
533         else 
534                 ldlm_add_cp_work_item(lock, work_list);
535         EXIT;
536 }
537
538 void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
539 {
540         struct ldlm_lock *lock;
541
542         lock = ldlm_handle2lock(lockh);
543         LASSERT(lock != NULL);
544         ldlm_lock_addref_internal(lock, mode);
545         LDLM_LOCK_PUT(lock);
546 }
547
/* Add a use reference to @lock without taking the resource lock: bump
 * l_readers for NL/CR/PR modes or l_writers for EX/CW/PW/GROUP, pull the
 * lock off the LRU (it is in use again), refresh its last-used time, and
 * take one lock reference (dropped in ldlm_lock_decref_internal). */
void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        ldlm_lock_remove_from_lru(lock);
        if (mode & (LCK_NL | LCK_CR | LCK_PR))
                lock->l_readers++;
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP))
                lock->l_writers++;
        lock->l_last_used = cfs_time_current();
        LDLM_LOCK_GET(lock);
        LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}
559
560 /* only called for local locks */
561 void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
562 {
563         lock_res_and_lock(lock);
564         ldlm_lock_addref_internal_nolock(lock, mode);
565         unlock_res_and_lock(lock);
566 }
567
/* Drop a reader/writer reference (per @mode) on @lock.  When the last
 * use reference goes away, either run/queue the pending blocking callback
 * (LDLM_FL_CBPENDING), or park a client lock on the namespace LRU.
 * Finally drops the lock reference taken in addref. */
void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
{
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        ns = lock->l_resource->lr_namespace;

        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                LASSERT(lock->l_readers > 0);
                lock->l_readers--;
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP)) {
                LASSERT(lock->l_writers > 0);
                lock->l_writers--;
        }

        if (lock->l_flags & LDLM_FL_LOCAL &&
            !lock->l_readers && !lock->l_writers) {
                /* If this is a local lock on a server namespace and this was
                 * the last reference, cancel the lock. */
                CDEBUG(D_INFO, "forcing cancel of local lock\n");
                lock->l_flags |= LDLM_FL_CBPENDING;
        }

        if (!lock->l_readers && !lock->l_writers &&
            (lock->l_flags & LDLM_FL_CBPENDING)) {
                /* If we received a blocked AST and this was the last reference,
                 * run the callback. */
                if (ns->ns_client == LDLM_NAMESPACE_SERVER && lock->l_export)
                        CERROR("FL_CBPENDING set on non-local lock--just a "
                               "warning\n");

                LDLM_DEBUG(lock, "final decref done on cbpending lock");

                LDLM_LOCK_GET(lock); /* dropped by bl thread */
                ldlm_lock_remove_from_lru(lock);
                unlock_res_and_lock(lock);
                /* run the callback inline for ATOMIC_CB locks or when the
                 * bl thread cannot take it.
                 * NOTE(review): l_flags is read here after the resource lock
                 * was dropped -- presumably a benign race; confirm. */
                if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
                                ldlm_bl_to_thread(ns, NULL, lock) != 0)
                        ldlm_handle_bl_callback(ns, NULL, lock);
        } else if (ns->ns_client == LDLM_NAMESPACE_CLIENT &&
                   !lock->l_readers && !lock->l_writers &&
                   !(lock->l_flags & LDLM_FL_NO_LRU)) {
                /* If this is a client-side namespace and this was the last
                 * reference, put it on the LRU. */
                LASSERT(list_empty(&lock->l_lru));
                LASSERT(ns->ns_nr_unused >= 0);
                spin_lock(&ns->ns_unused_lock);
                list_add_tail(&lock->l_lru, &ns->ns_unused_list);
                ns->ns_nr_unused++;
                spin_unlock(&ns->ns_unused_lock);
                unlock_res_and_lock(lock);
                /* trim the LRU asynchronously so it stays under its limit */
                ldlm_cancel_lru(ns, LDLM_ASYNC);
        } else {
                unlock_res_and_lock(lock);
        }

        LDLM_LOCK_PUT(lock);    /* matches the ldlm_lock_get in addref */

        EXIT;
}
632
633 void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
634 {
635         struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
636         LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
637         ldlm_lock_decref_internal(lock, mode);
638         LDLM_LOCK_PUT(lock);
639 }
640
/* This will drop a lock reference and mark it for destruction, but will not
 * necessarily cancel the lock before returning. */
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        ENTRY;

        LASSERT(lock != NULL);

        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        /* CBPENDING must be set under the resource lock BEFORE the decref,
         * so the final decref path in ldlm_lock_decref_internal() sees it
         * and triggers cancellation */
        lock_res_and_lock(lock);
        lock->l_flags |= LDLM_FL_CBPENDING;
        unlock_res_and_lock(lock);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
657
658 /*
659  * search_granted_lock
660  *
661  * Description:
662  *      Finds a position to insert the new lock.
663  * Parameters:
664  *      queue [input]:  the granted list where search acts on;
665  *      req [input]:    the lock whose position to be located;
666  *      lockp [output]: the position where the lock should be inserted before, or
667  *                      NULL indicating @req should be appended to @queue.
668  * Return Values:
669  *      Bit-masks combination of following values indicating in which way the 
670  *      lock need to be inserted.
671  *      - LDLM_JOIN_NONE:       noting about skip list needs to be fixed;
672  *      - LDLM_MODE_JOIN_RIGHT: @req needs join right becoming the head of a 
673  *                              mode group;
674  *      - LDLM_POLICY_JOIN_RIGHT: @req needs join right becoming the head of
675  *                                a policy group.
676  * NOTE: called by
677  *  - ldlm_grant_lock_with_skiplist
678  */
/* See the block comment above for the full contract: walk @queue (the
 * granted list) to find where @req should be inserted, returning the
 * insertion point in *@lockp (NULL = append) and a bit-mask of
 * LDLM_{MODE,POLICY}_JOIN_RIGHT describing which skip-list links need
 * fixing.  Only PLAIN and IBITS resources are valid here. */
static int search_granted_lock(struct list_head *queue,
                        struct ldlm_lock *req,
                        struct ldlm_lock **lockp)
{
        struct list_head *tmp, *tmp_tail;
        struct ldlm_lock *lock, *mode_head_lock;
        int rc = LDLM_JOIN_NONE;
        ENTRY;

        list_for_each(tmp, queue) {
                lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lock->l_req_mode != req->l_req_mode) {
                        /* different mode: use the skip list to jump to the
                         * last lock of this mode group, then continue */
                        if (LDLM_SL_HEAD(&lock->l_sl_mode))
                                tmp = &list_entry(lock->l_sl_mode.next,
                                                  struct ldlm_lock,
                                                  l_sl_mode)->l_res_link;
                        continue;
                }

                /* found the same mode group */
                if (lock->l_resource->lr_type == LDLM_PLAIN) {
                        /* plain locks have no policy: join at the group head */
                        *lockp = lock;
                        rc = LDLM_MODE_JOIN_RIGHT;
                        GOTO(out, rc);
                } else if (lock->l_resource->lr_type == LDLM_IBITS) {
                        /* tmp_tail marks the end of this mode group */
                        tmp_tail = tmp;
                        if (LDLM_SL_HEAD(&lock->l_sl_mode))
                                tmp_tail = &list_entry(lock->l_sl_mode.next,
                                                       struct ldlm_lock,
                                                       l_sl_mode)->l_res_link;
                        mode_head_lock = lock;
                        /* scan policy groups within the mode group for a
                         * matching inodebits value */
                        for (;;) {
                                if (lock->l_policy_data.l_inodebits.bits ==
                                    req->l_policy_data.l_inodebits.bits) {
                                        /* matched policy lock is found */
                                        *lockp = lock;
                                        rc |= LDLM_POLICY_JOIN_RIGHT;

                                        /* if the policy group head is also a
                                         * mode group head or a single mode
                                         * group lock */
                                        if (LDLM_SL_HEAD(&lock->l_sl_mode) ||
                                            (tmp == tmp_tail &&
                                             LDLM_SL_EMPTY(&lock->l_sl_mode)))
                                                rc |= LDLM_MODE_JOIN_RIGHT;
                                        GOTO(out, rc);
                                }

                                /* skip over the rest of this policy group */
                                if (LDLM_SL_HEAD(&lock->l_sl_policy))
                                        tmp = &list_entry(lock->l_sl_policy.next,
                                                          struct ldlm_lock,
                                                          l_sl_policy)->l_res_link;

                                if (tmp == tmp_tail)
                                        break;
                                else
                                        tmp = tmp->next;
                                lock = list_entry(tmp, struct ldlm_lock,
                                                  l_res_link);
                        }  /* for all locks in the matched mode group */

                        /* no matched policy group is found, insert before
                         * the mode group head lock */
                        *lockp = mode_head_lock;
                        rc = LDLM_MODE_JOIN_RIGHT;
                        GOTO(out, rc);
                } else {
                        LDLM_ERROR(lock, "is not LDLM_PLAIN or LDLM_IBITS lock");
                        LBUG();
                }
        }

        /* no matched mode group is found, append to the end */
        *lockp = NULL;
        rc = LDLM_JOIN_NONE;
        EXIT;
out:
        return rc;
}
759
760 static void ldlm_granted_list_add_lock(struct ldlm_lock *lock, 
761                                        struct ldlm_lock *lockp,
762                                        int join)
763 {
764         struct ldlm_resource *res = lock->l_resource;
765         ENTRY;
766
767         LASSERT(lockp || join == LDLM_JOIN_NONE);
768
769         check_res_locked(res);
770
771         ldlm_resource_dump(D_OTHER, res);
772         CDEBUG(D_OTHER, "About to add this lock:\n");
773         ldlm_lock_dump(D_OTHER, lock, 0);
774
775         if (lock->l_destroyed) {
776                 CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
777                 return;
778         }
779
780         LASSERT(list_empty(&lock->l_res_link));
781
782         if (!lockp)
783                 list_add_tail(&lock->l_res_link, &lock->l_resource->lr_granted);
784         else if ((join & LDLM_MODE_JOIN_LEFT) || (join & LDLM_POLICY_JOIN_LEFT))
785                 list_add(&lock->l_res_link, &lockp->l_res_link);
786         else
787                 list_add_tail(&lock->l_res_link, &lockp->l_res_link);
788
789         /* fix skip lists */
790         if (join & LDLM_MODE_JOIN_RIGHT) {
791                 LASSERT(! LDLM_SL_TAIL(&lockp->l_sl_mode));
792                 if (LDLM_SL_EMPTY(&lockp->l_sl_mode)) {
793                         lock->l_sl_mode.next = &lockp->l_sl_mode;
794                         lockp->l_sl_mode.prev = &lock->l_sl_mode;
795                 } else if (LDLM_SL_HEAD(&lockp->l_sl_mode)) {
796                         lock->l_sl_mode.next = lockp->l_sl_mode.next;
797                         lockp->l_sl_mode.next = NULL;
798                         lock->l_sl_mode.next->prev = &lock->l_sl_mode;
799                 }
800         } else if (join & LDLM_MODE_JOIN_LEFT) {
801                 LASSERT(! LDLM_SL_HEAD(&lockp->l_sl_mode));
802                 if (LDLM_SL_EMPTY(&lockp->l_sl_mode)) {
803                         lock->l_sl_mode.prev = &lockp->l_sl_mode;
804                         lockp->l_sl_mode.next = &lock->l_sl_mode;
805                 } else if (LDLM_SL_TAIL(&lockp->l_sl_mode)) {
806                         lock->l_sl_mode.prev = lockp->l_sl_mode.prev;
807                         lockp->l_sl_mode.prev = NULL;
808                         lock->l_sl_mode.prev->next = &lock->l_sl_mode;
809                 }
810         }
811         
812         if (join & LDLM_POLICY_JOIN_RIGHT) {
813                 LASSERT(! LDLM_SL_TAIL(&lockp->l_sl_policy));
814                 if (LDLM_SL_EMPTY(&lockp->l_sl_policy)) {
815                         lock->l_sl_policy.next = &lockp->l_sl_policy;
816                         lockp->l_sl_policy.prev = &lock->l_sl_policy;
817                 } else if (LDLM_SL_HEAD(&lockp->l_sl_policy)) {
818                         lock->l_sl_policy.next = lockp->l_sl_policy.next;
819                         lockp->l_sl_policy.next = NULL;
820                         lock->l_sl_policy.next->prev = &lock->l_sl_policy;
821                 }
822         } else if (join & LDLM_POLICY_JOIN_LEFT) {
823                 LASSERT(! LDLM_SL_HEAD(&lockp->l_sl_policy));
824                 if (LDLM_SL_EMPTY(&lockp->l_sl_policy)) {
825                         lock->l_sl_policy.prev = &lockp->l_sl_policy;
826                         lockp->l_sl_policy.next = &lock->l_sl_policy;
827                 } else if (LDLM_SL_TAIL(&lockp->l_sl_policy)) {
828                         lock->l_sl_policy.prev = lockp->l_sl_policy.prev;
829                         lockp->l_sl_policy.prev = NULL;
830                         lock->l_sl_policy.prev->next = &lock->l_sl_policy;
831                 }
832         }
833
834         EXIT;
835 }
836
837 static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
838 {
839         int join = LDLM_JOIN_NONE;
840         struct ldlm_lock *lockp = NULL;
841         ENTRY;
842
843         LASSERT(lock->l_req_mode == lock->l_granted_mode);
844
845         join = search_granted_lock(&lock->l_resource->lr_granted, lock, &lockp);
846         ldlm_granted_list_add_lock(lock, lockp, join);
847         EXIT;
848 }
849
850 /* NOTE: called by
851  *  - ldlm_lock_enqueue
852  *  - ldlm_reprocess_queue
853  *  - ldlm_lock_convert
854  *
855  * must be called with lr_lock held
856  */
857 void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
858 {
859         struct ldlm_resource *res = lock->l_resource;
860         ENTRY;
861
862         check_res_locked(res);
863
864         lock->l_granted_mode = lock->l_req_mode;
865         if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
866                 ldlm_grant_lock_with_skiplist(lock);
867         else
868                 ldlm_resource_add_lock(res, &res->lr_granted, lock);
869
870         if (lock->l_granted_mode < res->lr_most_restr)
871                 res->lr_most_restr = lock->l_granted_mode;
872
873         if (work_list && lock->l_completion_ast != NULL)
874                 ldlm_add_ast_work_item(lock, NULL, work_list);
875
876         EXIT;
877 }
878
/* returns a referenced lock or NULL.  See the flag descriptions below, in the
 * comment above ldlm_lock_match.  Scans one of a resource's lock queues
 * (granted/converting/waiting) for a lock compatible with @mode and
 * @policy; called with the resource locked (see ldlm_lock_match). */
static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                                      ldlm_policy_data_t *policy,
                                      struct ldlm_lock *old_lock, int flags)
{
        struct ldlm_lock *lock;
        struct list_head *tmp;

        list_for_each(tmp, queue) {
                lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                /* stop at old_lock itself: only locks ahead of it in the
                 * queue are candidate duplicates */
                if (lock == old_lock)
                        break;

                /* llite sometimes wants to match locks that will be
                 * canceled when their users drop, but we allow it to match
                 * if it passes in CBPENDING and the lock still has users.
                 * this is generally only going to be used by children
                 * whose parents already hold a lock so forward progress
                 * can still happen. */
                if (lock->l_flags & LDLM_FL_CBPENDING &&
                    !(flags & LDLM_FL_CBPENDING))
                        continue;
                if (lock->l_flags & LDLM_FL_CBPENDING &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        continue;

                /* @mode is a bitmask of acceptable modes */
                if (!(lock->l_req_mode & mode))
                        continue;

                /* an extent lock matches only if it fully covers the
                 * requested [start, end] range */
                if (lock->l_resource->lr_type == LDLM_EXTENT &&
                    (lock->l_policy_data.l_extent.start >
                     policy->l_extent.start ||
                     lock->l_policy_data.l_extent.end < policy->l_extent.end))
                        continue;

                /* group extent locks additionally need an exact gid match */
                if (unlikely(mode == LCK_GROUP) &&
                    lock->l_resource->lr_type == LDLM_EXTENT &&
                    lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                        continue;

                /* We match if we have existing lock with same or wider set
                   of bits. */
                if (lock->l_resource->lr_type == LDLM_IBITS &&
                     ((lock->l_policy_data.l_inodebits.bits &
                      policy->l_inodebits.bits) !=
                      policy->l_inodebits.bits))
                        continue;

                /* never hand out dead or failed locks */
                if (lock->l_destroyed || (lock->l_flags & LDLM_FL_FAILED))
                        continue;

                if ((flags & LDLM_FL_LOCAL_ONLY) &&
                    !(lock->l_flags & LDLM_FL_LOCAL))
                        continue;

                /* test-only matches take a bare reference; real matches
                 * also take a reader/writer ref at the matched mode */
                if (flags & LDLM_FL_TEST_LOCK)
                        LDLM_LOCK_GET(lock);
                else
                        ldlm_lock_addref_internal_nolock(lock, mode);
                return lock;
        }

        return NULL;
}
945
/* Mark the lock's LVB as valid (LDLM_FL_LVB_READY) and wake any thread
 * blocked in ldlm_lock_match() waiting for it. */
void ldlm_lock_allow_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        lock->l_flags |= LDLM_FL_LVB_READY;
        cfs_waitq_signal(&lock->l_waitq);
        unlock_res_and_lock(lock);
}
953
/* Can be called in two ways:
 *
 * If 'ns' is NULL, then lockh describes an existing lock that we want to look
 * for a duplicate of.
 *
 * Otherwise, all of the fields must be filled in, to match against.
 *
 * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
 *     server (ie, connh is NULL)
 * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
 *     list will be considered
 * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
 *     to be canceled can still be matched as long as they still have reader
 *     or writer references
 * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
 *     just tell us if we would have matched.
 *
 * Returns 1 if it finds an already-existing lock that is compatible; in this
 * case, lockh is filled in with a addref()ed lock
 */
int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
                    struct ldlm_res_id *res_id, ldlm_type_t type,
                    ldlm_policy_data_t *policy, ldlm_mode_t mode,
                    struct lustre_handle *lockh)
{
        struct ldlm_resource *res;
        struct ldlm_lock *lock, *old_lock = NULL;
        int rc = 0;
        ENTRY;

        /* duplicate-search mode: borrow namespace/resource/type/mode from
         * the existing lock named by @lockh */
        if (ns == NULL) {
                old_lock = ldlm_handle2lock(lockh);
                LASSERT(old_lock);

                ns = old_lock->l_resource->lr_namespace;
                res_id = &old_lock->l_resource->lr_name;
                type = old_lock->l_resource->lr_type;
                mode = old_lock->l_req_mode;
        }

        /* "0" => do not create the resource if it does not exist */
        res = ldlm_resource_get(ns, NULL, *res_id, type, 0);
        if (res == NULL) {
                LASSERT(old_lock == NULL);
                RETURN(0);
        }

        lock_res(res);

        /* search granted first, then (unless only granted locks are
         * acceptable) converting and waiting */
        lock = search_queue(&res->lr_granted, mode, policy, old_lock, flags);
        if (lock != NULL)
                GOTO(out, rc = 1);
        if (flags & LDLM_FL_BLOCK_GRANTED)
                GOTO(out, rc = 0);
        lock = search_queue(&res->lr_converting, mode, policy, old_lock, flags);
        if (lock != NULL)
                GOTO(out, rc = 1);
        lock = search_queue(&res->lr_waiting, mode, policy, old_lock, flags);
        if (lock != NULL)
                GOTO(out, rc = 1);

        EXIT;
 out:
        unlock_res(res);
        ldlm_resource_putref(res);

        if (lock) {
                ldlm_lock2handle(lock, lockh);
                /* caller wants a valid LVB: wait for ldlm_lock_allow_match
                 * to set LDLM_FL_LVB_READY */
                if ((flags & LDLM_FL_LVB_READY) && (!(lock->l_flags & LDLM_FL_LVB_READY))) {
                        struct l_wait_info lwi;
                        if (lock->l_completion_ast) {
                                int err = lock->l_completion_ast(lock,
                                                          LDLM_FL_WAIT_NOREPROC,
                                                                 NULL);
                                if (err) {
                                        /* completion failed: drop the
                                         * reference search_queue took and
                                         * report no match */
                                        if (flags & LDLM_FL_TEST_LOCK)
                                                LDLM_LOCK_PUT(lock);
                                        else
                                                ldlm_lock_decref_internal(lock, mode);
                                        rc = 0;
                                        goto out2;
                                }
                        }

                        lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout), NULL,
                                               LWI_ON_SIGNAL_NOOP, NULL);

                        /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
                        /* NOTE(review): the l_wait_event() result is
                         * ignored — on timeout rc stays 1 even if
                         * LVB_READY never became set; confirm intended. */
                        l_wait_event(lock->l_waitq,
                                     (lock->l_flags & LDLM_FL_LVB_READY), &lwi);
                }
        }
 out2:
        if (rc) {
                LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[2] : policy->l_extent.start,
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[3] : policy->l_extent.end);
        } else if (!(flags & LDLM_FL_TEST_LOCK)) {/*less verbose for test-only*/
                LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
                                  LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
                                  type, mode, res_id->name[0], res_id->name[1],
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[2] :policy->l_extent.start,
                                (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[3] : policy->l_extent.end);
        }
        /* drop the reference ldlm_handle2lock() took above */
        if (old_lock)
                LDLM_LOCK_PUT(old_lock);
        /* test-only matches keep no reference on success */
        if (flags & LDLM_FL_TEST_LOCK && rc)
                LDLM_LOCK_PUT(lock);

        return rc;
}
1068
/* Allocate and initialize a new lock on the (possibly newly created)
 * resource @res_id of @type in namespace @ns, installing the given ASTs,
 * opaque @data and an optional LVB buffer of @lvb_len bytes.
 * Returns a referenced lock, or NULL on allocation failure. */
struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
                                   struct lustre_handle *parent_lock_handle,
                                   struct ldlm_res_id res_id, ldlm_type_t type,
                                   ldlm_mode_t mode,
                                   ldlm_blocking_callback blocking,
                                   ldlm_completion_callback completion,
                                   ldlm_glimpse_callback glimpse,
                                   void *data, __u32 lvb_len)
{
        struct ldlm_resource *res, *parent_res = NULL;
        struct ldlm_lock *lock, *parent_lock = NULL;
        ENTRY;

        /* resolve the optional parent lock; its resource becomes the
         * parent of the new resource */
        if (parent_lock_handle) {
                parent_lock = ldlm_handle2lock(parent_lock_handle);
                if (parent_lock)
                        parent_res = parent_lock->l_resource;
        }

        /* "1" => create the resource if it does not exist yet */
        res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
        if (res == NULL)
                RETURN(NULL);

        lock = ldlm_lock_new(parent_lock, res);
        ldlm_resource_putref(res);
        if (parent_lock != NULL)
                LDLM_LOCK_PUT(parent_lock);

        if (lock == NULL)
                RETURN(NULL);

        lock->l_req_mode = mode;
        lock->l_ast_data = data;
        lock->l_blocking_ast = blocking;
        lock->l_completion_ast = completion;
        lock->l_glimpse_ast = glimpse;
        lock->l_pid = cfs_curproc_pid();

        if (lvb_len) {
                lock->l_lvb_len = lvb_len;
                OBD_ALLOC(lock->l_lvb_data, lvb_len);
                if (lock->l_lvb_data == NULL) {
                        /* NOTE(review): the lock is freed straight back to
                         * the slab instead of via LDLM_LOCK_PUT(); any
                         * references ldlm_lock_new() took (e.g. on the
                         * resource) do not appear to be released here —
                         * verify against ldlm_lock_new(). */
                        OBD_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock));
                        RETURN(NULL);
                }
        }

        RETURN(lock);
}
1119
/* Enqueue *lockp on its resource.  For server-side intent requests the
 * namespace policy runs first and may replace *lockp (ELDLM_LOCK_REPLACED,
 * signalled to the caller via LDLM_FL_LOCK_CHANGED).  Otherwise the lock
 * is placed on the granted/converting/waiting queue: as dictated by
 * *flags on clients and during replay, or by the resource type's
 * processing policy on the server.  Returns ELDLM_OK or a policy error. */
ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                               struct ldlm_lock **lockp,
                               void *cookie, int *flags)
{
        struct ldlm_lock *lock = *lockp;
        struct ldlm_resource *res = lock->l_resource;
        int local = res->lr_namespace->ns_client;
        ldlm_processing_policy policy;
        ldlm_error_t rc = ELDLM_OK;
        ENTRY;

        do_gettimeofday(&lock->l_enqueued_time);
        /* policies are not executed on the client or during replay */
        if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
            && !local && ns->ns_policy) {
                rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
                                   NULL);
                if (rc == ELDLM_LOCK_REPLACED) {
                        /* The lock that was returned has already been granted,
                         * and placed into lockp.  If it's not the same as the
                         * one we passed in, then destroy the old one and our
                         * work here is done. */
                        if (lock != *lockp) {
                                ldlm_lock_destroy(lock);
                                LDLM_LOCK_PUT(lock);
                        }
                        *flags |= LDLM_FL_LOCK_CHANGED;
                        RETURN(0);
                } else if (rc != ELDLM_OK ||
                           (rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
                        /* policy failed, or the intent alone satisfied the
                         * request: no lock is actually taken */
                        ldlm_lock_destroy(lock);
                        RETURN(rc);
                }
        }

        lock_res_and_lock(lock);
        if (local && lock->l_req_mode == lock->l_granted_mode) {
                /* The server returned a blocked lock, but it was granted before
                 * we got a chance to actually enqueue it.  We don't need to do
                 * anything else. */
                *flags &= ~(LDLM_FL_BLOCK_GRANTED |
                            LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
                GOTO(out, ELDLM_OK);
        }

        /* Some flags from the enqueue want to make it into the AST, via the
         * lock's l_flags. */
        lock->l_flags |= *flags & LDLM_AST_DISCARD_DATA;

        /* This distinction between local lock trees is very important; a client
         * namespace only has information about locks taken by that client, and
         * thus doesn't have enough information to decide for itself if it can
         * be granted (below).  In this case, we do exactly what the server
         * tells us to do, as dictated by the 'flags'.
         *
         * We do exactly the same thing during recovery, when the server is
         * more or less trusting the clients not to lie.
         *
         * FIXME (bug 268): Detect obvious lies by checking compatibility in
         * granted/converting queues. */
        ldlm_resource_unlink_lock(lock);
        if (local) {
                if (*flags & LDLM_FL_BLOCK_CONV)
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                else
                        ldlm_grant_lock(lock, NULL);
                GOTO(out, ELDLM_OK);
        } else if (*flags & LDLM_FL_REPLAY) {
                if (*flags & LDLM_FL_BLOCK_CONV) {
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                        GOTO(out, ELDLM_OK);
                } else if (*flags & LDLM_FL_BLOCK_WAIT) {
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                        GOTO(out, ELDLM_OK);
                } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
                        ldlm_grant_lock(lock, NULL);
                        GOTO(out, ELDLM_OK);
                }
                /* If no flags, fall through to normal enqueue path. */
        }

        /* server path: let the resource type's policy decide */
        policy = ldlm_processing_policy_table[res->lr_type];
        policy(lock, flags, 1, &rc, NULL);
        GOTO(out, rc);
out:
        unlock_res_and_lock(lock);
        return rc;
}
1210
/* Must be called with namespace taken: queue is waiting or converting.
 * Re-runs the resource type's processing policy over every lock on
 * @queue, collecting AST work items in @work_list.  Stops as soon as the
 * policy returns anything other than LDLM_ITER_CONTINUE and returns that
 * value.  Caller holds the resource lock. */
int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
                         struct list_head *work_list)
{
        struct list_head *tmp, *pos;
        ldlm_processing_policy policy;
        int flags;
        int rc = LDLM_ITER_CONTINUE;
        ldlm_error_t err;
        ENTRY;

        check_res_locked(res);

        policy = ldlm_processing_policy_table[res->lr_type];
        LASSERT(policy);

        /* _safe: the policy may move the lock off this queue */
        list_for_each_safe(tmp, pos, queue) {
                struct ldlm_lock *pending;
                pending = list_entry(tmp, struct ldlm_lock, l_res_link);

                CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);

                flags = 0;
                rc = policy(pending, &flags, 0, &err, work_list);
                if (rc != LDLM_ITER_CONTINUE)
                        break;
        }

        RETURN(rc);
}
1241
/* Run the blocking ASTs for every lock queued on @rpc_list (linked via
 * l_bl_ast).  Each entry is unlinked, its blocking-lock reference is
 * consumed to build the AST descriptor, and the per-entry reference is
 * dropped afterwards.  Returns -ERESTART if any AST returned it,
 * otherwise 0. */
int ldlm_run_bl_ast_work(struct list_head *rpc_list)
{
        struct list_head *tmp, *pos;
        struct ldlm_lock_desc d;
        int rc = 0, retval = 0;
        ENTRY;

        list_for_each_safe(tmp, pos, rpc_list) {
                struct ldlm_lock *lock =
                        list_entry(tmp, struct ldlm_lock, l_bl_ast);

                /* nobody should touch l_bl_ast */
                lock_res_and_lock(lock);
                list_del_init(&lock->l_bl_ast);

                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
                LASSERT(lock->l_bl_ast_run == 0);
                LASSERT(lock->l_blocking_lock);
                lock->l_bl_ast_run++;
                unlock_res_and_lock(lock);

                /* describe the lock that is blocking us, then drop our
                 * reference on it before calling out */
                ldlm_lock2desc(lock->l_blocking_lock, &d);

                LDLM_LOCK_PUT(lock->l_blocking_lock);
                lock->l_blocking_lock = NULL;
                rc = lock->l_blocking_ast(lock, &d, NULL, LDLM_CB_BLOCKING);

                if (rc == -ERESTART)
                        retval = rc;
                else if (rc)
                        CDEBUG(D_DLMTRACE, "Failed AST - should clean & "
                               "disconnect client\n");
                LDLM_LOCK_PUT(lock);
        }
        RETURN(retval);
}
1278
1279 int ldlm_run_cp_ast_work(struct list_head *rpc_list)
1280 {
1281         struct list_head *tmp, *pos;
1282         int rc = 0, retval = 0;
1283         ENTRY;
1284
1285         /* It's possible to receive a completion AST before we've set
1286          * the l_completion_ast pointer: either because the AST arrived
1287          * before the reply, or simply because there's a small race
1288          * window between receiving the reply and finishing the local
1289          * enqueue. (bug 842)
1290          *
1291          * This can't happen with the blocking_ast, however, because we
1292          * will never call the local blocking_ast until we drop our
1293          * reader/writer reference, which we won't do until we get the
1294          * reply and finish enqueueing. */
1295         
1296         list_for_each_safe(tmp, pos, rpc_list) {
1297                 struct ldlm_lock *lock =
1298                         list_entry(tmp, struct ldlm_lock, l_cp_ast);
1299
1300                 /* nobody should touch l_cp_ast */
1301                 lock_res_and_lock(lock);
1302                 list_del_init(&lock->l_cp_ast);
1303                 LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
1304                 lock->l_flags &= ~LDLM_FL_CP_REQD;
1305                 unlock_res_and_lock(lock);
1306
1307                 if (lock->l_completion_ast != NULL)
1308                         rc = lock->l_completion_ast(lock, 0, 0);
1309                 if (rc == -ERESTART)
1310                         retval = rc;
1311                 else if (rc)
1312                         CDEBUG(D_DLMTRACE, "Failed AST - should clean & "
1313                                "disconnect client\n");
1314                 LDLM_LOCK_PUT(lock);
1315         }
1316         RETURN(retval);
1317 }
1318
/* Namespace-iterator callback: reprocess one resource's converting and
 * waiting queues.  @closure is unused; always continues iteration. */
static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
{
        ldlm_reprocess_all(res);
        return LDLM_ITER_CONTINUE;
}
1324
/* Reprocess every resource in namespace @ns.  Walks the resource hash,
 * dropping ns_hash_lock (with a resource reference held) around each
 * reprocess call, and stops early if an iteration returns
 * LDLM_ITER_STOP. */
void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
{
        struct list_head *tmp;
        int i, rc;

        ENTRY;
        spin_lock(&ns->ns_hash_lock);
        for (i = 0; i < RES_HASH_SIZE; i++) {
                tmp = ns->ns_hash[i].next;
                while (tmp != &(ns->ns_hash[i])) {
                        struct ldlm_resource *res =
                                list_entry(tmp, struct ldlm_resource, lr_hash);

                        /* pin the resource so it survives while we work
                         * on it without the hash lock */
                        ldlm_resource_getref(res);
                        spin_unlock(&ns->ns_hash_lock);

                        rc = reprocess_one_queue(res, NULL);

                        /* re-take the lock before advancing; the held
                         * reference kept tmp's chain position valid */
                        spin_lock(&ns->ns_hash_lock);
                        tmp = tmp->next;
                        ldlm_resource_putref_locked(res);

                        if (rc == LDLM_ITER_STOP)
                                GOTO(out, rc);
                }
        }
 out:
        spin_unlock(&ns->ns_hash_lock);
        EXIT;
}
1355
/* Re-run the processing policy over @res's converting and waiting queues
 * and deliver any resulting completion ASTs.  No-op on client (local)
 * namespaces.  Restarts from scratch if the AST work returns -ERESTART. */
void ldlm_reprocess_all(struct ldlm_resource *res)
{
        struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
        int rc;
        ENTRY;

        /* Local lock trees don't get reprocessed. */
        if (res->lr_namespace->ns_client) {
                EXIT;
                return;
        }

 restart:
        lock_res(res);
        /* converting queue first; waiting only if nothing stopped us */
        rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
        if (rc == LDLM_ITER_CONTINUE)
                ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
        unlock_res(res);

        /* ASTs run without the resource lock held */
        rc = ldlm_run_cp_ast_work(&rpc_list);
        if (rc == -ERESTART) {
                LASSERT(list_empty(&rpc_list));
                goto restart;
        }
        EXIT;
}
1382
/* Invoke @lock's blocking AST with LDLM_CB_CANCELING, exactly once
 * (guarded by LDLM_FL_CANCEL).  Called with the resource lock held;
 * NOTE: the lock is dropped and re-acquired around the AST call, so
 * resource state may change across this function. */
void ldlm_cancel_callback(struct ldlm_lock *lock)
{
        check_res_locked(lock->l_resource);
        if (!(lock->l_flags & LDLM_FL_CANCEL)) {
                lock->l_flags |= LDLM_FL_CANCEL;
                if (lock->l_blocking_ast) {
                        // l_check_no_ns_lock(ns);
                        unlock_res_and_lock(lock);
                        lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
                                             LDLM_CB_CANCELING);
                        lock_res_and_lock(lock);
                } else {
                        LDLM_DEBUG(lock, "no blocking ast");
                }
        }
}
1399
/* Detach @req from the mode and policy skip lists of its granted list.
 * Only PLAIN and IBITS locks maintain skip lists.  If @req is the head
 * (resp. tail) of an interval, its l_res_link neighbour takes over as the
 * new head (resp. tail); when that neighbour is the other end of the
 * interval, the interval collapses to empty.  Interior locks need no
 * fixup.  Caller holds the resource lock. */
void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
{
        struct ldlm_lock *lock;

        if (req->l_resource->lr_type != LDLM_PLAIN &&
            req->l_resource->lr_type != LDLM_IBITS)
                return;

        /* --- mode skip list --- */
        if (LDLM_SL_HEAD(&req->l_sl_mode)) {
                /* successor on the resource list becomes the new head */
                lock = list_entry(req->l_res_link.next, struct ldlm_lock,
                                  l_res_link);
                if (req->l_sl_mode.next == &lock->l_sl_mode) {
                        /* successor was the tail: interval is now empty */
                        lock->l_sl_mode.prev = NULL;
                } else {
                        lock->l_sl_mode.next = req->l_sl_mode.next;
                        lock->l_sl_mode.next->prev = &lock->l_sl_mode;
                }
                req->l_sl_mode.next = NULL;
        } else if (LDLM_SL_TAIL(&req->l_sl_mode)) {
                /* predecessor on the resource list becomes the new tail */
                lock = list_entry(req->l_res_link.prev, struct ldlm_lock,
                                  l_res_link);
                if (req->l_sl_mode.prev == &lock->l_sl_mode) {
                        /* predecessor was the head: interval is now empty */
                        lock->l_sl_mode.next = NULL;
                } else {
                        lock->l_sl_mode.prev = req->l_sl_mode.prev;
                        lock->l_sl_mode.prev->next = &lock->l_sl_mode;
                }
                req->l_sl_mode.prev = NULL;
        }

        /* --- policy skip list (same scheme as above) --- */
        if (LDLM_SL_HEAD(&req->l_sl_policy)) {
                lock = list_entry(req->l_res_link.next, struct ldlm_lock,
                                  l_res_link);
                if (req->l_sl_policy.next == &lock->l_sl_policy) {
                        lock->l_sl_policy.prev = NULL;
                } else {
                        lock->l_sl_policy.next = req->l_sl_policy.next;
                        lock->l_sl_policy.next->prev = &lock->l_sl_policy;
                }
                req->l_sl_policy.next = NULL;
        } else if (LDLM_SL_TAIL(&req->l_sl_policy)) {
                lock = list_entry(req->l_res_link.prev, struct ldlm_lock,
                                  l_res_link);
                if (req->l_sl_policy.prev == &lock->l_sl_policy) {
                        lock->l_sl_policy.next = NULL;
                } else {
                        lock->l_sl_policy.prev = req->l_sl_policy.prev;
                        lock->l_sl_policy.prev->next = &lock->l_sl_policy;
                }
                req->l_sl_policy.prev = NULL;
        }
}
1452
/* Cancel @lock: run its cancel/blocking callback, remove it from the
 * waiting-lock timer list and its resource queue, and destroy it.  The
 * lock must have no remaining reader/writer references. */
void ldlm_lock_cancel(struct ldlm_lock *lock)
{
        struct ldlm_resource *res;
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        res = lock->l_resource;
        ns = res->lr_namespace;

        /* Please do not, no matter how tempting, remove this LBUG without
         * talking to me first. -phik */
        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                LBUG();
        }

        ldlm_del_waiting_lock(lock);

        /* May drop and re-acquire the res lock around the blocking AST */
        ldlm_cancel_callback(lock);

        /* Yes, second time, just in case it was added again while we were
           running with no res lock in ldlm_cancel_callback */
        ldlm_del_waiting_lock(lock); 
        ldlm_resource_unlink_lock(lock);
        ldlm_lock_destroy_nolock(lock);
        unlock_res_and_lock(lock);

        EXIT;
}
1485
1486 int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
1487 {
1488         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1489         ENTRY;
1490
1491         if (lock == NULL)
1492                 RETURN(-EINVAL);
1493
1494         lock->l_ast_data = data;
1495         LDLM_LOCK_PUT(lock);
1496         RETURN(0);
1497 }
1498
/* Cancel every lock still held by export @exp.  Pops locks off the
 * export's held-locks list one at a time, dropping led_lock (with lock
 * and resource references held) around the cancel and reprocess, since
 * those may sleep and take other locks. */
void ldlm_cancel_locks_for_export(struct obd_export *exp)
{
        struct ldlm_lock *lock;
        struct ldlm_resource *res;

        spin_lock(&exp->exp_ldlm_data.led_lock);
        while(!list_empty(&exp->exp_ldlm_data.led_held_locks)) {
                lock = list_entry(exp->exp_ldlm_data.led_held_locks.next,
                                  struct ldlm_lock, l_export_chain);
                /* pin both before releasing the spinlock */
                res = ldlm_resource_getref(lock->l_resource);
                LDLM_LOCK_GET(lock);
                spin_unlock(&exp->exp_ldlm_data.led_lock);

                LDLM_DEBUG(lock, "export %p", exp);
                ldlm_lock_cancel(lock);
                /* the cancel may unblock other waiters on this resource */
                ldlm_reprocess_all(res);

                ldlm_resource_putref(res);
                LDLM_LOCK_PUT(lock);
                spin_lock(&exp->exp_ldlm_data.led_lock);
        }
        spin_unlock(&exp->exp_ldlm_data.led_lock);
}
1522
/*
 * Convert a granted lock to a new mode in place.  Only the PR -> PW
 * conversion is supported (enforced by the LASSERTF below).
 *
 * \param lock      the lock to convert; must currently be granted
 * \param new_mode  the requested new mode
 * \param flags     in/out; LDLM_FL_BLOCK_GRANTED is set when the
 *                  conversion cannot complete immediately
 *
 * \retval the lock's resource when the conversion was granted or queued
 *         (also returned unchanged when new_mode equals the granted mode)
 * \retval NULL when the server-side processing policy refused the
 *         conversion and the lock was restored to its old mode
 */
struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                                        int *flags)
{
        struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
        struct ldlm_resource *res;
        struct ldlm_namespace *ns;
        int granted = 0;
        int old_mode, rc;
        struct ldlm_lock *mark_lock = NULL;
        int join= LDLM_JOIN_NONE;
        ldlm_error_t err;
        ENTRY;

        if (new_mode == lock->l_granted_mode) { // No changes? Just return.
                *flags |= LDLM_FL_BLOCK_GRANTED;
                RETURN(lock->l_resource);
        }

        LASSERTF(new_mode == LCK_PW && lock->l_granted_mode == LCK_PR,
                 "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);

        lock_res_and_lock(lock);

        res = lock->l_resource;
        ns = res->lr_namespace;

        /* Save the old request mode so it can be restored if the policy
         * below refuses the conversion. */
        old_mode = lock->l_req_mode;
        lock->l_req_mode = new_mode;
        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
                /* remember the lock position where the lock might be 
                 * added back to the granted list later and also 
                 * remember the join mode for skiplist fixing. */
                if (LDLM_SL_HEAD(&lock->l_sl_mode))
                        join = LDLM_MODE_JOIN_RIGHT;
                else if (LDLM_SL_TAIL(&lock->l_sl_mode))
                        join = LDLM_MODE_JOIN_LEFT;
                if (LDLM_SL_HEAD(&lock->l_sl_policy))
                        join |= LDLM_POLICY_JOIN_RIGHT;
                else if (LDLM_SL_TAIL(&lock->l_sl_policy))
                        join |= LDLM_POLICY_JOIN_LEFT;

                /* A lock cannot need a join to the left on one skip list
                 * and to the right on the other at the same time. */
                LASSERT(!((join & LDLM_MODE_JOIN_RIGHT) &&
                          (join & LDLM_POLICY_JOIN_LEFT)));
                LASSERT(!((join & LDLM_MODE_JOIN_LEFT) &&
                          (join & LDLM_POLICY_JOIN_RIGHT)));

                /* mark_lock is the granted-list neighbour next to which
                 * the lock would be re-inserted if the conversion fails. */
                if ((join & LDLM_MODE_JOIN_LEFT) ||
                    (join & LDLM_POLICY_JOIN_LEFT))
                        mark_lock = list_entry(lock->l_res_link.prev,
                                               struct ldlm_lock, l_res_link);
                else if (lock->l_res_link.next != &res->lr_granted)
                        mark_lock = list_entry(lock->l_res_link.next,
                                               struct ldlm_lock, l_res_link);
        }
        ldlm_resource_unlink_lock(lock);

        /* If this is a local resource, put it on the appropriate list. */
        if (res->lr_namespace->ns_client) {
                if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                } else {
                        /* This should never happen, because of the way the
                         * server handles conversions. */
                        LDLM_ERROR(lock, "Erroneous flags %d on local lock\n",
                                   *flags);
                        LBUG();

                        ldlm_grant_lock(lock, &rpc_list);
                        granted = 1;
                        /* FIXME: completion handling not with ns_lock held ! */
                        if (lock->l_completion_ast)
                                lock->l_completion_ast(lock, 0, NULL);
                }
        } else {
                /* Server-side: let the per-type processing policy decide
                 * whether the converted lock can be granted now. */
                int pflags = 0;
                ldlm_processing_policy policy;
                policy = ldlm_processing_policy_table[res->lr_type];
                rc = policy(lock, &pflags, 0, &err, &rpc_list);
                if (rc == LDLM_ITER_STOP) {
                        /* Conversion refused: restore the old request mode
                         * and re-insert the lock at its remembered place. */
                        lock->l_req_mode = old_mode;
                        ldlm_granted_list_add_lock(lock, mark_lock, join);
                        res = NULL;
                } else {
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        granted = 1;
                }
        }
        unlock_res_and_lock(lock);

        /* Completion ASTs are run only after all locks are dropped. */
        if (granted)
                ldlm_run_cp_ast_work(&rpc_list);
        RETURN(res);
}
1616
1617 void ldlm_lock_dump(int level, struct ldlm_lock *lock, int pos)
1618 {
1619         struct obd_device *obd = NULL;
1620
1621         if (!((libcfs_debug | D_ERROR) & level))
1622                 return;
1623
1624         if (!lock) {
1625                 CDEBUG(level, "  NULL LDLM lock\n");
1626                 return;
1627         }
1628
1629         CDEBUG(level," -- Lock dump: %p/"LPX64" (rc: %d) (pos: %d) (pid: %d)\n",
1630                lock, lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
1631                pos, lock->l_pid);
1632         if (lock->l_conn_export != NULL)
1633                 obd = lock->l_conn_export->exp_obd;
1634         if (lock->l_export && lock->l_export->exp_connection) {
1635                 CDEBUG(level, "  Node: NID %s (rhandle: "LPX64")\n",
1636                      libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid),
1637                      lock->l_remote_handle.cookie);
1638         } else if (obd == NULL) {
1639                 CDEBUG(level, "  Node: local\n");
1640         } else {
1641                 struct obd_import *imp = obd->u.cli.cl_import;
1642                 CDEBUG(level, "  Node: NID %s (rhandle: "LPX64")\n",
1643                        libcfs_nid2str(imp->imp_connection->c_peer.nid),
1644                        lock->l_remote_handle.cookie);
1645         }
1646         CDEBUG(level, "  Resource: %p ("LPU64"/"LPU64")\n", lock->l_resource,
1647                lock->l_resource->lr_name.name[0],
1648                lock->l_resource->lr_name.name[1]);
1649         CDEBUG(level, "  Req mode: %s, grant mode: %s, rc: %u, read: %d, "
1650                "write: %d flags: %#x\n", ldlm_lockname[lock->l_req_mode],
1651                ldlm_lockname[lock->l_granted_mode],
1652                atomic_read(&lock->l_refc), lock->l_readers, lock->l_writers,
1653                lock->l_flags);
1654         if (lock->l_resource->lr_type == LDLM_EXTENT)
1655                 CDEBUG(level, "  Extent: "LPU64" -> "LPU64
1656                        " (req "LPU64"-"LPU64")\n",
1657                        lock->l_policy_data.l_extent.start,
1658                        lock->l_policy_data.l_extent.end,
1659                        lock->l_req_extent.start, lock->l_req_extent.end);
1660         else if (lock->l_resource->lr_type == LDLM_FLOCK)
1661                 CDEBUG(level, "  Pid: %d Extent: "LPU64" -> "LPU64"\n",
1662                        lock->l_policy_data.l_flock.pid,
1663                        lock->l_policy_data.l_flock.start,
1664                        lock->l_policy_data.l_flock.end);
1665        else if (lock->l_resource->lr_type == LDLM_IBITS)
1666                 CDEBUG(level, "  Bits: "LPX64"\n",
1667                        lock->l_policy_data.l_inodebits.bits);
1668 }
1669
1670 void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
1671 {
1672         struct ldlm_lock *lock;
1673
1674         if (!((libcfs_debug | D_ERROR) & level))
1675                 return;
1676
1677         lock = ldlm_handle2lock(lockh);
1678         if (lock == NULL)
1679                 return;
1680
1681         ldlm_lock_dump(D_OTHER, lock, 0);
1682
1683         LDLM_LOCK_PUT(lock);
1684 }
1685
/*
 * Back end for the LDLM_DEBUG()/LDLM_ERROR() macro family: append a
 * one-line description of \a lock (handle, refcounts, granted/requested
 * modes, resource name, type-specific policy data, flags, remote handle,
 * export refcount and pid) to the caller-supplied message and emit it
 * via libcfs_debug_vmsg2().  Exactly one vmsg2 call is made per
 * invocation, so the single va_start/va_end pair is sufficient.
 *
 * \param lock   the lock to describe; l_resource may be NULL
 * \param level  libcfs debug level/mask for the message
 * \param data   per-call-site debug metadata (file, line, cdls, ...)
 * \param fmt    caller's printf-style message, followed by its args
 */
void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level,
                      struct libcfs_debug_msg_data *data, const char *fmt,
                      ...)
{
        va_list args;
        cfs_debug_limit_state_t *cdls = data->msg_cdls;

        va_start(args, fmt);
        /* No resource attached yet: print "??" placeholders for the
         * resource name, refcount and type fields. */
        if (lock->l_resource == NULL) {
                libcfs_debug_vmsg2(cdls, data->msg_subsys, level, data->msg_file,
                                   data->msg_fn, data->msg_line, fmt, args,
                                   " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                                   "res: \?\? rrc=\?\? type: \?\?\? flags: %x remote: "
                                   LPX64" expref: %d pid: %u\n", lock,
                                   lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
                                   lock->l_readers, lock->l_writers,
                                   ldlm_lockname[lock->l_granted_mode],
                                   ldlm_lockname[lock->l_req_mode],
                                   lock->l_flags, lock->l_remote_handle.cookie,
                                   lock->l_export ?
                                        atomic_read(&lock->l_export->exp_refcount) : -99,
                                   lock->l_pid);
                va_end(args);
                return;
        }

        /* The branches below differ only in the type-specific policy
         * data appended after the resource name. */
        switch (lock->l_resource->lr_type) {
        case LDLM_EXTENT:
                /* Extent locks: granted [start->end] plus requested range. */
                libcfs_debug_vmsg2(cdls, data->msg_subsys, level, data->msg_file,
                                   data->msg_fn, data->msg_line, fmt, args,
                                   " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                                   "res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64
                                   "] (req "LPU64"->"LPU64") flags: %x remote: "LPX64
                                    " expref: %d pid: %u\n",
                                    lock->l_resource->lr_namespace->ns_name, lock,
                                    lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
                                    lock->l_readers, lock->l_writers,
                                    ldlm_lockname[lock->l_granted_mode],
                                    ldlm_lockname[lock->l_req_mode],
                                    lock->l_resource->lr_name.name[0],
                                    lock->l_resource->lr_name.name[1],
                                    atomic_read(&lock->l_resource->lr_refcount),
                                    ldlm_typename[lock->l_resource->lr_type],
                                    lock->l_policy_data.l_extent.start,
                                    lock->l_policy_data.l_extent.end,
                                    lock->l_req_extent.start, lock->l_req_extent.end,
                                    lock->l_flags, lock->l_remote_handle.cookie,
                                    lock->l_export ?
                                        atomic_read(&lock->l_export->exp_refcount) : -99,
                                    lock->l_pid);
                break;
        case LDLM_FLOCK:
                /* File locks: owner pid plus the byte range. */
                libcfs_debug_vmsg2(cdls, data->msg_subsys, level, data->msg_file,
                                   data->msg_fn, data->msg_line, fmt, args,
                                   " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                                   "res: "LPU64"/"LPU64" rrc: %d type: %s pid: %d "
                                   "["LPU64"->"LPU64"] flags: %x remote: "LPX64
                                   " expref: %d pid: %u\n",
                                   lock->l_resource->lr_namespace->ns_name, lock,
                                   lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
                                   lock->l_readers, lock->l_writers,
                                   ldlm_lockname[lock->l_granted_mode],
                                   ldlm_lockname[lock->l_req_mode],
                                   lock->l_resource->lr_name.name[0],
                                   lock->l_resource->lr_name.name[1],
                                   atomic_read(&lock->l_resource->lr_refcount),
                                   ldlm_typename[lock->l_resource->lr_type],
                                   lock->l_policy_data.l_flock.pid,
                                   lock->l_policy_data.l_flock.start,
                                   lock->l_policy_data.l_flock.end,
                                   lock->l_flags, lock->l_remote_handle.cookie,
                                   lock->l_export ?
                                        atomic_read(&lock->l_export->exp_refcount) : -99,
                                   lock->l_pid);
                break;
        case LDLM_IBITS:
                /* Inodebits locks: the bit mask. */
                libcfs_debug_vmsg2(cdls, data->msg_subsys, level, data->msg_file,
                                   data->msg_fn, data->msg_line, fmt, args,
                                   " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                                   "res: "LPU64"/"LPU64" bits "LPX64" rrc: %d type: %s "
                                   "flags: %x remote: "LPX64" expref: %d "
                                   "pid %u\n",
                                   lock->l_resource->lr_namespace->ns_name,
                                   lock, lock->l_handle.h_cookie,
                                   atomic_read (&lock->l_refc),
                                   lock->l_readers, lock->l_writers,
                                   ldlm_lockname[lock->l_granted_mode],
                                   ldlm_lockname[lock->l_req_mode],
                                   lock->l_resource->lr_name.name[0],
                                   lock->l_resource->lr_name.name[1],
                                   lock->l_policy_data.l_inodebits.bits,
                                   atomic_read(&lock->l_resource->lr_refcount),
                                   ldlm_typename[lock->l_resource->lr_type],
                                   lock->l_flags, lock->l_remote_handle.cookie,
                                   lock->l_export ?
                                        atomic_read(&lock->l_export->exp_refcount) : -99,
                                   lock->l_pid);
                break;
        default:
                /* All other types (e.g. LDLM_PLAIN): no policy data. */
                libcfs_debug_vmsg2(cdls, data->msg_subsys, level, data->msg_file,
                                   data->msg_fn, data->msg_line, fmt, args,
                                   " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                                   "res: "LPU64"/"LPU64" rrc: %d type: %s flags: %x "
                                   "remote: "LPX64" expref: %d pid: %u\n",
                                   lock->l_resource->lr_namespace->ns_name,
                                   lock, lock->l_handle.h_cookie,
                                   atomic_read (&lock->l_refc),
                                   lock->l_readers, lock->l_writers,
                                   ldlm_lockname[lock->l_granted_mode],
                                   ldlm_lockname[lock->l_req_mode],
                                   lock->l_resource->lr_name.name[0],
                                   lock->l_resource->lr_name.name[1],
                                   atomic_read(&lock->l_resource->lr_refcount),
                                   ldlm_typename[lock->l_resource->lr_type],
                                   lock->l_flags, lock->l_remote_handle.cookie,
                                   lock->l_export ?
                                         atomic_read(&lock->l_export->exp_refcount) : -99,
                                   lock->l_pid);
                break;
        }
        va_end(args);
}
1808 EXPORT_SYMBOL(_ldlm_lock_debug);