Whamcloud - gitweb
fix IS_ERR implementation in liblustre.h to correctly detect errors
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of the Lustre file system, http://www.lustre.org
9  *   Lustre is a trademark of Cluster File Systems, Inc.
10  *
11  *   You may have signed or agreed to another license before downloading
12  *   this software.  If so, you are bound by the terms and conditions
13  *   of that agreement, and the following does not apply to you.  See the
14  *   LICENSE file included with this distribution for more information.
15  *
16  *   If you did not agree to a different license, then this copy of Lustre
17  *   is open source software; you can redistribute it and/or modify it
18  *   under the terms of version 2 of the GNU General Public License as
19  *   published by the Free Software Foundation.
20  *
21  *   In either case, Lustre is distributed in the hope that it will be
22  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
23  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24  *   license text for more details.
25  */
26
27 #define DEBUG_SUBSYSTEM S_LDLM
28
29 #ifdef __KERNEL__
30 # include <libcfs/libcfs.h>
31 # include <linux/lustre_intent.h>
32 #else
33 # include <liblustre.h>
34 # include <libcfs/kp30.h>
35 #endif
36
37 #include <obd_class.h>
38 #include "ldlm_internal.h"
39
40 //struct lustre_lock ldlm_everything_lock;
41
42 /* lock's skip list pointers fix mode */
43 #define LDLM_JOIN_NONE          0
44 #define LDLM_MODE_JOIN_RIGHT    1
45 #define LDLM_MODE_JOIN_LEFT     (1 << 1)
46 #define LDLM_POLICY_JOIN_RIGHT  (1 << 2)
47 #define LDLM_POLICY_JOIN_LEFT   (1 << 3)
48
49 /* lock types */
/* Human-readable names for each lock mode, indexed by the LCK_* mode
 * value; index 0 is the placeholder for "no mode".  Uses the old GNU
 * "[index] value" designated-initializer extension. */
char *ldlm_lockname[] = {
        [0] "--",
        [LCK_EX] "EX",
        [LCK_PW] "PW",
        [LCK_PR] "PR",
        [LCK_CW] "CW",
        [LCK_CR] "CR",
        [LCK_NL] "NL",
        [LCK_GROUP] "GROUP"
};
60
/* Human-readable names for each lock type, indexed by the LDLM_* type
 * value (plain, extent, flock, inodebits). */
char *ldlm_typename[] = {
        [LDLM_PLAIN] "PLN",
        [LDLM_EXTENT] "EXT",
        [LDLM_FLOCK] "FLK",
        [LDLM_IBITS] "IBT",
};
67
68 char *ldlm_it2str(int it)
69 {
70         switch (it) {
71         case IT_OPEN:
72                 return "open";
73         case IT_CREAT:
74                 return "creat";
75         case (IT_OPEN | IT_CREAT):
76                 return "open|creat";
77         case IT_READDIR:
78                 return "readdir";
79         case IT_GETATTR:
80                 return "getattr";
81         case IT_LOOKUP:
82                 return "lookup";
83         case IT_UNLINK:
84                 return "unlink";
85         case IT_GETXATTR:
86                 return "getxattr";
87         default:
88                 CERROR("Unknown intent %d\n", it);
89                 return "UNKNOWN";
90         }
91 }
92
93 extern cfs_mem_cache_t *ldlm_lock_slab;
94
/* Per-lock-type processing policy callbacks, indexed by LDLM_* type.
 * Flock processing is kernel-only; in userspace builds the LDLM_FLOCK
 * slot is left as a NULL entry by the designated initializers. */
static ldlm_processing_policy ldlm_processing_policy_table[] = {
        [LDLM_PLAIN] ldlm_process_plain_lock,
        [LDLM_EXTENT] ldlm_process_extent_lock,
#ifdef __KERNEL__
        [LDLM_FLOCK] ldlm_process_flock_lock,
#endif
        [LDLM_IBITS] ldlm_process_inodebits_lock,
};
103
104 ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
105 {
106         return ldlm_processing_policy_table[res->lr_type];
107 }
108
/* Install the intent policy callback for a namespace.  Overwrites any
 * previously registered policy; no locking is done here, so this is
 * presumably called during setup only — TODO confirm against callers. */
void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
{
        ns->ns_policy = arg;
}
113
114 /*
115  * REFCOUNTED LOCK OBJECTS
116  */
117
118
119 /*
120  * Lock refcounts, during creation:
121  *   - one special one for allocation, dec'd only once in destroy
122  *   - one for being a lock that's in-use
123  *   - one for the addref associated with a new lock
124  */
/* Take an additional reference on @lock and return it, so the call can
 * be used inline: foo(ldlm_lock_get(lock)). */
struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
{
        atomic_inc(&lock->l_refc);
        return lock;
}
130
/* RCU free callback for ldlm_lock_put(): returns the lock's memory to
 * the slab.  @size is passed by the RCU-free machinery and must match
 * the lock structure exactly. */
static void ldlm_lock_free(struct ldlm_lock *lock, size_t size)
{
        LASSERT(size == sizeof(*lock));
        OBD_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock));
}
136
/* Drop one reference on @lock; on the last reference, tear the lock
 * down and schedule its memory for RCU freeing.  A lock may only reach
 * refcount zero after it has been destroyed (off all lists). */
void ldlm_lock_put(struct ldlm_lock *lock)
{
        ENTRY;

        /* LP_POISON here would mean a put on already-freed memory. */
        LASSERT(lock->l_resource != LP_POISON);
        LASSERT(atomic_read(&lock->l_refc) > 0);
        if (atomic_dec_and_test(&lock->l_refc)) {
                struct ldlm_resource *res;

                LDLM_DEBUG(lock, "final lock_put on destroyed lock, freeing it.");

                res = lock->l_resource;
                /* ldlm_lock_destroy*() must have run: destroyed flag set
                 * and the lock unlinked from resource/pending lists. */
                LASSERT(lock->l_destroyed);
                LASSERT(list_empty(&lock->l_res_link));
                LASSERT(list_empty(&lock->l_pending_chain));

                /* Release the per-namespace lock count and the resource
                 * reference taken at creation time. */
                atomic_dec(&res->lr_namespace->ns_locks);
                ldlm_resource_putref(res);
                lock->l_resource = NULL;
                if (lock->l_export) {
                        class_export_put(lock->l_export);
                        lock->l_export = NULL;
                }

                if (lock->l_lvb_data != NULL)
                        OBD_FREE(lock->l_lvb_data, lock->l_lvb_len);

                /* Free via the handle's RCU callback so concurrent
                 * handle2object lookups never see freed memory. */
                OBD_FREE_RCU_CB(lock, sizeof(*lock), &lock->l_handle,
                                ldlm_lock_free);
        }

        EXIT;
}
170
171 int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
172 {
173         int rc = 0;
174         if (!list_empty(&lock->l_lru)) {
175                 LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
176                 list_del_init(&lock->l_lru);
177                 lock->l_resource->lr_namespace->ns_nr_unused--;
178                 LASSERT(lock->l_resource->lr_namespace->ns_nr_unused >= 0);
179                 rc = 1;
180         }
181         return rc;
182 }
183
/* Locked wrapper around ldlm_lock_remove_from_lru_nolock(): takes the
 * namespace's ns_unused_lock around the removal.  Returns 1 if the
 * lock was removed from the LRU, 0 otherwise. */
int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
{
        int rc;
        ENTRY;
        spin_lock(&lock->l_resource->lr_namespace->ns_unused_lock);
        rc = ldlm_lock_remove_from_lru_nolock(lock);
        spin_unlock(&lock->l_resource->lr_namespace->ns_unused_lock);
        EXIT;
        return rc;
}
194
195 /* This used to have a 'strict' flag, which recovery would use to mark an
196  * in-use lock as needing-to-die.  Lest I am ever tempted to put it back, I
197  * shall explain why it's gone: with the new hash table scheme, once you call
198  * ldlm_lock_destroy, you can never drop your final references on this lock.
199  * Because it's not in the hash table anymore.  -phil */
/* Mark @lock destroyed and take it off the LRU, export chain, and the
 * handle hash.  Caller must hold the resource lock.  Idempotent:
 * returns 1 on the first destroy (caller must then drop the hash-table
 * reference), 0 if the lock was already destroyed.  A lock with active
 * readers/writers or still linked on its resource is a fatal bug. */
int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
{
        ENTRY;

        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                ldlm_lock_dump(D_ERROR, lock, 0);
                LBUG();
        }

        if (!list_empty(&lock->l_res_link)) {
                LDLM_ERROR(lock, "lock still on resource");
                ldlm_lock_dump(D_ERROR, lock, 0);
                LBUG();
        }

        /* Already destroyed: nothing more to do, and the lock must
         * already be off the LRU. */
        if (lock->l_destroyed) {
                LASSERT(list_empty(&lock->l_lru));
                EXIT;
                return 0;
        }
        lock->l_destroyed = 1;

        /* Export-chain removal is guarded by the export's led_lock;
         * list_del_init is safe even when there is no export. */
        if (lock->l_export)
                spin_lock(&lock->l_export->exp_ldlm_data.led_lock);
        list_del_init(&lock->l_export_chain);
        if (lock->l_export)
                spin_unlock(&lock->l_export->exp_ldlm_data.led_lock);

        ldlm_lock_remove_from_lru(lock);
        class_handle_unhash(&lock->l_handle);

#if 0
        /* Wake anyone waiting for this lock */
        /* FIXME: I should probably add yet another flag, instead of using
         * l_export to only call this on clients */
        /* NOTE(review): this disabled code clears l_export before testing
         * it, so the completion AST branch could never fire as written. */
        if (lock->l_export)
                class_export_put(lock->l_export);
        lock->l_export = NULL;
        if (lock->l_export && lock->l_completion_ast)
                lock->l_completion_ast(lock, 0);
#endif
        EXIT;
        return 1;
}
245
246 void ldlm_lock_destroy(struct ldlm_lock *lock)
247 {
248         int first;
249         ENTRY;
250         lock_res_and_lock(lock);
251         first = ldlm_lock_destroy_internal(lock);
252         unlock_res_and_lock(lock);
253
254         /* drop reference from hashtable only for first destroy */
255         if (first)
256                 LDLM_LOCK_PUT(lock);
257         EXIT;
258 }
259
260 void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
261 {
262         int first;
263         ENTRY;
264         first = ldlm_lock_destroy_internal(lock);
265         /* drop reference from hashtable only for first destroy */
266         if (first)
267                 LDLM_LOCK_PUT(lock);
268         EXIT;
269 }
270
271 /* this is called by portals_handle2object with the handle lock taken */
/* Handle-table addref callback: takes a lock reference on behalf of
 * portals_handle2object(), which calls this with the handle lock held. */
static void lock_handle_addref(void *lock)
{
        LDLM_LOCK_GET((struct ldlm_lock *)lock);
}
276
277 /*
278  * usage: pass in a resource on which you have done ldlm_resource_get
279  *        pass in a parent lock on which you have done a ldlm_lock_get
280  *        after return, ldlm_*_put the resource and parent
281  * returns: lock with refcount 2 - one for current caller and one for remote
282  */
/*
 * usage: pass in a resource on which you have done ldlm_resource_get
 *        pass in a parent lock on which you have done a ldlm_lock_get
 *        after return, ldlm_*_put the resource and parent
 * returns: lock with refcount 2 - one for current caller and one for remote
 */
static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
{
        struct ldlm_lock *lock;
        ENTRY;

        if (resource == NULL)
                LBUG();

        /* Fields not explicitly set below are presumably zeroed by the
         * slab allocation — TODO confirm OBD_SLAB_ALLOC semantics. */
        OBD_SLAB_ALLOC(lock, ldlm_lock_slab, CFS_ALLOC_IO, sizeof(*lock));
        if (lock == NULL)
                RETURN(NULL);

        /* The lock pins its resource for its whole lifetime; released
         * in ldlm_lock_put(). */
        lock->l_resource = ldlm_resource_getref(resource);

        /* Refcount 2: see "returns" in the header comment above. */
        atomic_set(&lock->l_refc, 2);
        CFS_INIT_LIST_HEAD(&lock->l_res_link);
        CFS_INIT_LIST_HEAD(&lock->l_lru);
        CFS_INIT_LIST_HEAD(&lock->l_export_chain);
        CFS_INIT_LIST_HEAD(&lock->l_pending_chain);
        CFS_INIT_LIST_HEAD(&lock->l_bl_ast);
        CFS_INIT_LIST_HEAD(&lock->l_cp_ast);
        cfs_waitq_init(&lock->l_waitq);
        lock->l_blocking_lock = NULL;
        lock->l_pidb = 0;
        /* Skip-list pointers start detached (neither head nor tail). */
        lock->l_sl_mode.prev = NULL;
        lock->l_sl_mode.next = NULL;
        lock->l_sl_policy.prev = NULL;
        lock->l_sl_policy.next = NULL;

        atomic_inc(&resource->lr_namespace->ns_locks);
        CFS_INIT_LIST_HEAD(&lock->l_handle.h_link);
        /* Publish the lock in the handle table; lookups take a lock
         * reference via lock_handle_addref(). */
        class_handle_hash(&lock->l_handle, lock_handle_addref);

        RETURN(lock);
}
318
/* Move @lock from its current resource to the resource named by
 * @new_resid in namespace @ns (client-side only).  No-op if the lock is
 * already on that resource.  The lock must not be linked on any
 * resource list.  Returns 0 on success, -ENOMEM if the new resource
 * cannot be obtained (currently unreachable past the LBUG). */
int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                              struct ldlm_res_id new_resid)
{
        struct ldlm_resource *oldres = lock->l_resource;
        struct ldlm_resource *newres;
        int type;
        ENTRY;

        LASSERT(ns->ns_client != 0);

        lock_res_and_lock(lock);
        if (memcmp(&new_resid, &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) == 0) {
                /* Nothing to do */
                unlock_res_and_lock(lock);
                RETURN(0);
        }

        LASSERT(new_resid.name[0] != 0);

        /* This function assumes that the lock isn't on any lists */
        LASSERT(list_empty(&lock->l_res_link));

        type = oldres->lr_type;
        /* Drop the locks before ldlm_resource_get(), which may sleep
         * or take resource locks itself. */
        unlock_res_and_lock(lock);

        newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
        if (newres == NULL) {
                LBUG();
                RETURN(-ENOMEM);
        }

        /* Re-take the old resource lock + bitlock, then briefly the new
         * resource lock while switching the pointer.  The unlocks below
         * are done by hand (unlock_res/unlock_bitlock) because
         * l_resource now points at newres, so unlock_res_and_lock()
         * would release the wrong resource. */
        lock_res_and_lock(lock);
        LASSERT(memcmp(&new_resid, &lock->l_resource->lr_name,
                       sizeof(lock->l_resource->lr_name)) != 0);
        lock_res(newres);
        lock->l_resource = newres;
        unlock_res(newres);
        unlock_res(oldres);
        unlock_bitlock(lock);

        /* ...and the flowers are still standing! */
        ldlm_resource_putref(oldres);

        RETURN(0);
}
365
366 /*
367  *  HANDLES
368  */
369
/* Fill @lockh with the wire handle (cookie) identifying @lock. */
void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh)
{
        lockh->cookie = lock->l_handle.h_cookie;
}
374
375 /* if flags: atomically get the lock and set the flags.
376  *           Return NULL if flag already set
377  */
378
/* Resolve @handle to a referenced lock, or NULL if the handle is stale,
 * the lock has been destroyed, or (when @flags is non-zero) any of
 * @flags is already set on the lock.  When @flags is non-zero and none
 * were set, they are set atomically under the resource lock before
 * returning.  On success the caller owns one lock reference (taken by
 * the handle lookup) and must LDLM_LOCK_PUT it. */
struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, int flags)
{
        struct ldlm_namespace *ns;
        struct ldlm_lock *lock = NULL, *retval = NULL;
        ENTRY;

        LASSERT(handle);

        /* Lookup takes a reference via lock_handle_addref(). */
        lock = class_handle2object(handle->cookie);
        if (lock == NULL)
                RETURN(NULL);

        LASSERT(lock->l_resource != NULL);
        ns = lock->l_resource->lr_namespace;
        LASSERT(ns != NULL);

        lock_res_and_lock(lock);

        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if (lock->l_destroyed) {
                unlock_res_and_lock(lock);
                CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
                /* Drop the lookup's reference; retval stays NULL. */
                LDLM_LOCK_PUT(lock);
                GOTO(out, retval);
        }

        /* Flag already set: fail the atomic get-and-set contract. */
        if (flags && (lock->l_flags & flags)) {
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
                GOTO(out, retval);
        }

        if (flags)
                lock->l_flags |= flags;

        unlock_res_and_lock(lock);
        retval = lock;
        EXIT;
 out:
        return retval;
}
421
/* Namespace-qualified variant of handle-to-lock resolution.  @ns is
 * currently unused: lookup goes through the global handle table. */
struct ldlm_lock *ldlm_handle2lock_ns(struct ldlm_namespace *ns,
                                      struct lustre_handle *handle)
{
        return __ldlm_handle2lock(handle, 0);
}
429
/* Fill the wire descriptor @desc from @lock.  Normally a direct copy of
 * resource, modes, and policy; for inodebits locks going to a peer that
 * does not advertise OBD_CONNECT_IBITS, the lock is down-converted to a
 * plain-lock descriptor the old peer can understand. */
void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
{
        /* Prefer the export (server side); fall back to the connection
         * export (client side). */
        struct obd_export *exp = lock->l_export?:lock->l_conn_export;
        /* INODEBITS_INTEROP: If the other side does not support
         * inodebits, reply with a plain lock descriptor.
         */
        if ((lock->l_resource->lr_type == LDLM_IBITS) &&
            (exp && !(exp->exp_connect_flags & OBD_CONNECT_IBITS))) {
                struct ldlm_resource res = *lock->l_resource;

                /* Make sure all the right bits are set in this lock we
                   are going to pass to client */
                LASSERTF(lock->l_policy_data.l_inodebits.bits ==
                         (MDS_INODELOCK_LOOKUP|MDS_INODELOCK_UPDATE),
                         "Inappropriate inode lock bits during "
                         "conversion " LPU64 "\n",
                         lock->l_policy_data.l_inodebits.bits);
                res.lr_type = LDLM_PLAIN;
                ldlm_res2desc(&res, &desc->l_resource);
                /* Convert "new" lock mode to something old client can
                   understand */
                if ((lock->l_req_mode == LCK_CR) ||
                    (lock->l_req_mode == LCK_CW))
                        desc->l_req_mode = LCK_PR;
                else
                        desc->l_req_mode = lock->l_req_mode;
                if ((lock->l_granted_mode == LCK_CR) ||
                    (lock->l_granted_mode == LCK_CW)) {
                        desc->l_granted_mode = LCK_PR;
                } else {
                        /* We never grant PW/EX locks to clients */
                        LASSERT((lock->l_granted_mode != LCK_PW) &&
                                (lock->l_granted_mode != LCK_EX));
                        desc->l_granted_mode = lock->l_granted_mode;
                }

                /* We do not copy policy here, because there is no
                   policy for plain locks */
        } else {
                /* Normal case: straight copy of all descriptor fields. */
                ldlm_res2desc(lock->l_resource, &desc->l_resource);
                desc->l_req_mode = lock->l_req_mode;
                desc->l_granted_mode = lock->l_granted_mode;
                desc->l_policy_data = lock->l_policy_data;
        }
}
475
/* Queue a blocking-AST work item for @lock onto @work_list because the
 * incoming lock @new conflicts with it.  Only the first conflict queues
 * the AST (LDLM_FL_AST_SENT guards re-queuing).  Takes one reference on
 * @lock for the work list and one on @new as l_blocking_lock; both are
 * presumably dropped by the AST-processing path — TODO confirm. */
void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                           struct list_head *work_list)
{
        if ((lock->l_flags & LDLM_FL_AST_SENT) == 0) {
                LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
                lock->l_flags |= LDLM_FL_AST_SENT;
                /* If the enqueuing client said so, tell the AST recipient to
                 * discard dirty data, rather than writing back. */
                if (new->l_flags & LDLM_AST_DISCARD_DATA)
                        lock->l_flags |= LDLM_FL_DISCARD_DATA;
                LASSERT(list_empty(&lock->l_bl_ast));
                list_add(&lock->l_bl_ast, work_list);
                LDLM_LOCK_GET(lock);
                LASSERT(lock->l_blocking_lock == NULL);
                lock->l_blocking_lock = LDLM_LOCK_GET(new);
        }
}
493
/* Queue a completion-AST work item for @lock onto @work_list.  Only the
 * first call queues it (LDLM_FL_CP_REQD guards re-queuing); a reference
 * is taken for the work list. */
void ldlm_add_cp_work_item(struct ldlm_lock *lock, struct list_head *work_list)
{
        if ((lock->l_flags & LDLM_FL_CP_REQD) == 0) {
                lock->l_flags |= LDLM_FL_CP_REQD;
                LDLM_DEBUG(lock, "lock granted; sending completion AST.");
                LASSERT(list_empty(&lock->l_cp_ast));
                list_add(&lock->l_cp_ast, work_list);
                LDLM_LOCK_GET(lock);
        }
}
504
505 /* must be called with lr_lock held */
506 void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
507                                 struct list_head *work_list)
508 {
509         ENTRY;
510         check_res_locked(lock->l_resource);
511         if (new)
512                 ldlm_add_bl_work_item(lock, new, work_list);
513         else 
514                 ldlm_add_cp_work_item(lock, work_list);
515         EXIT;
516 }
517
518 void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
519 {
520         struct ldlm_lock *lock;
521
522         lock = ldlm_handle2lock(lockh);
523         LASSERT(lock != NULL);
524         ldlm_lock_addref_internal(lock, mode);
525         LDLM_LOCK_PUT(lock);
526 }
527
/* Add a use reference to @lock without taking the resource lock:
 * reader modes (NL/CR/PR) bump l_readers, writer modes
 * (EX/CW/PW/GROUP) bump l_writers.  An in-use lock leaves the LRU, and
 * one lock reference is taken (dropped in ldlm_lock_decref_internal). */
void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        ldlm_lock_remove_from_lru(lock);
        if (mode & (LCK_NL | LCK_CR | LCK_PR))
                lock->l_readers++;
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP))
                lock->l_writers++;
        lock->l_last_used = cfs_time_current();
        LDLM_LOCK_GET(lock);
        LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}
539
540 /* only called for local locks */
/* Locked wrapper: add a use reference under the resource lock.
 * Only called for local locks (per original note). */
void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
{
        lock_res_and_lock(lock);
        ldlm_lock_addref_internal_nolock(lock, mode);
        unlock_res_and_lock(lock);
}
547
/* Drop a reader/writer use reference (per @mode) on @lock.  When the
 * last use reference goes away, either run/queue the pending blocking
 * callback (LDLM_FL_CBPENDING) or, on a client namespace, park the lock
 * on the namespace LRU.  Also drops the lock reference taken by
 * ldlm_lock_addref_internal_nolock(). */
void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
{
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        ns = lock->l_resource->lr_namespace;

        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                LASSERT(lock->l_readers > 0);
                lock->l_readers--;
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP)) {
                LASSERT(lock->l_writers > 0);
                lock->l_writers--;
        }

        if (lock->l_flags & LDLM_FL_LOCAL &&
            !lock->l_readers && !lock->l_writers) {
                /* If this is a local lock on a server namespace and this was
                 * the last reference, cancel the lock. */
                CDEBUG(D_INFO, "forcing cancel of local lock\n");
                lock->l_flags |= LDLM_FL_CBPENDING;
        }

        if (!lock->l_readers && !lock->l_writers &&
            (lock->l_flags & LDLM_FL_CBPENDING)) {
                /* If we received a blocked AST and this was the last reference,
                 * run the callback. */
                if (ns->ns_client == LDLM_NAMESPACE_SERVER && lock->l_export)
                        CERROR("FL_CBPENDING set on non-local lock--just a "
                               "warning\n");

                LDLM_DEBUG(lock, "final decref done on cbpending lock");

                LDLM_LOCK_GET(lock); /* dropped by bl thread */
                ldlm_lock_remove_from_lru(lock);
                /* Resource lock must be dropped before running the
                 * blocking callback or handing off to the bl thread. */
                unlock_res_and_lock(lock);
                /* Atomic-callback locks, or failure to hand off to the
                 * bl thread, run the blocking callback inline. */
                if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
                                ldlm_bl_to_thread(ns, NULL, lock, 0) != 0)
                        ldlm_handle_bl_callback(ns, NULL, lock);
        } else if (ns->ns_client == LDLM_NAMESPACE_CLIENT &&
                   !lock->l_readers && !lock->l_writers &&
                   !(lock->l_flags & LDLM_FL_NO_LRU)) {
                /* If this is a client-side namespace and this was the last
                 * reference, put it on the LRU. */
                LASSERT(list_empty(&lock->l_lru));
                LASSERT(ns->ns_nr_unused >= 0);
                lock->l_last_used = cfs_time_current();
                spin_lock(&ns->ns_unused_lock);
                list_add_tail(&lock->l_lru, &ns->ns_unused_list);
                ns->ns_nr_unused++;
                spin_unlock(&ns->ns_unused_lock);
                unlock_res_and_lock(lock);
                /* Call ldlm_cancel_lru() only if EARLY_CANCEL is not supported
                 * by the server, otherwise, it is done on enqueue. */
                if (!exp_connect_cancelset(lock->l_conn_export))
                        ldlm_cancel_lru(ns, LDLM_ASYNC);
        } else {
                unlock_res_and_lock(lock);
        }

        LDLM_LOCK_PUT(lock);    /* matches the ldlm_lock_get in addref */

        EXIT;
}
616
617 void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
618 {
619         struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
620         LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
621         ldlm_lock_decref_internal(lock, mode);
622         LDLM_LOCK_PUT(lock);
623 }
624
625 /* This will drop a lock reference and mark it for destruction, but will not
626  * necessarily cancel the lock before returning. */
627 void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
628 {
629         struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
630         ENTRY;
631
632         LASSERT(lock != NULL);
633
634         LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
635         lock_res_and_lock(lock);
636         lock->l_flags |= LDLM_FL_CBPENDING;
637         unlock_res_and_lock(lock);
638         ldlm_lock_decref_internal(lock, mode);
639         LDLM_LOCK_PUT(lock);
640 }
641
642 /*
643  * search_granted_lock
644  *
645  * Description:
646  *      Finds a position to insert the new lock.
647  * Parameters:
648  *      queue [input]:  the granted list where search acts on;
649  *      req [input]:    the lock whose position to be located;
650  *      lockp [output]: the position where the lock should be inserted before, or
651  *                      NULL indicating @req should be appended to @queue.
652  * Return Values:
653  *      Bit-masks combination of following values indicating in which way the 
654  *      lock need to be inserted.
655  *      - LDLM_JOIN_NONE:       noting about skip list needs to be fixed;
656  *      - LDLM_MODE_JOIN_RIGHT: @req needs join right becoming the head of a 
657  *                              mode group;
658  *      - LDLM_POLICY_JOIN_RIGHT: @req needs join right becoming the head of
659  *                                a policy group.
660  * NOTE: called by
661  *  - ldlm_grant_lock_with_skiplist
662  */
/*
 * search_granted_lock
 *
 * Description:
 *      Finds a position to insert the new lock.
 * Parameters:
 *      queue [input]:  the granted list where search acts on;
 *      req [input]:    the lock whose position to be located;
 *      lockp [output]: the position where the lock should be inserted before, or
 *                      NULL indicating @req should be appended to @queue.
 * Return Values:
 *      Bit-masks combination of following values indicating in which way the
 *      lock need to be inserted.
 *      - LDLM_JOIN_NONE:       noting about skip list needs to be fixed;
 *      - LDLM_MODE_JOIN_RIGHT: @req needs join right becoming the head of a
 *                              mode group;
 *      - LDLM_POLICY_JOIN_RIGHT: @req needs join right becoming the head of
 *                                a policy group.
 * NOTE: called by
 *  - ldlm_grant_lock_with_skiplist
 */
static int search_granted_lock(struct list_head *queue,
                        struct ldlm_lock *req,
                        struct ldlm_lock **lockp)
{
        struct list_head *tmp, *tmp_tail;
        struct ldlm_lock *lock, *mode_head_lock;
        int rc = LDLM_JOIN_NONE;
        ENTRY;

        list_for_each(tmp, queue) {
                lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lock->l_req_mode != req->l_req_mode) {
                        /* Different mode: skip the whole mode group by
                         * jumping to its tail (the skip-list head points
                         * at the group's last member). */
                        if (LDLM_SL_HEAD(&lock->l_sl_mode))
                                tmp = &list_entry(lock->l_sl_mode.next,
                                                  struct ldlm_lock,
                                                  l_sl_mode)->l_res_link;
                        continue;
                }

                /* found the same mode group */
                if (lock->l_resource->lr_type == LDLM_PLAIN) {
                        /* Plain locks have no policy: insert before the
                         * mode group head. */
                        *lockp = lock;
                        rc = LDLM_MODE_JOIN_RIGHT;
                        GOTO(out, rc);
                } else if (lock->l_resource->lr_type == LDLM_IBITS) {
                        /* tmp_tail marks the end of this mode group so
                         * the inner scan below knows where to stop. */
                        tmp_tail = tmp;
                        if (LDLM_SL_HEAD(&lock->l_sl_mode))
                                tmp_tail = &list_entry(lock->l_sl_mode.next,
                                                       struct ldlm_lock,
                                                       l_sl_mode)->l_res_link;
                        mode_head_lock = lock;
                        /* Walk policy groups inside the mode group. */
                        for (;;) {
                                if (lock->l_policy_data.l_inodebits.bits ==
                                    req->l_policy_data.l_inodebits.bits) {
                                        /* matched policy lock is found */
                                        *lockp = lock;
                                        rc |= LDLM_POLICY_JOIN_RIGHT;

                                        /* if the policy group head is also a
                                         * mode group head or a single mode
                                         * group lock */
                                        if (LDLM_SL_HEAD(&lock->l_sl_mode) ||
                                            (tmp == tmp_tail &&
                                             LDLM_SL_EMPTY(&lock->l_sl_mode)))
                                                rc |= LDLM_MODE_JOIN_RIGHT;
                                        GOTO(out, rc);
                                }

                                /* No policy match: hop over this policy
                                 * group via its skip-list tail. */
                                if (LDLM_SL_HEAD(&lock->l_sl_policy))
                                        tmp = &list_entry(lock->l_sl_policy.next,
                                                          struct ldlm_lock,
                                                          l_sl_policy)->l_res_link;

                                if (tmp == tmp_tail)
                                        break;
                                else
                                        tmp = tmp->next;
                                lock = list_entry(tmp, struct ldlm_lock,
                                                  l_res_link);
                        }  /* for all locks in the matched mode group */

                        /* no matched policy group is found, insert before
                         * the mode group head lock */
                        *lockp = mode_head_lock;
                        rc = LDLM_MODE_JOIN_RIGHT;
                        GOTO(out, rc);
                } else {
                        LDLM_ERROR(lock, "is not LDLM_PLAIN or LDLM_IBITS lock");
                        LBUG();
                }
        }

        /* no matched mode group is found, append to the end */
        *lockp = NULL;
        rc = LDLM_JOIN_NONE;
        EXIT;
out:
        return rc;
}
743
/* Insert @lock into its resource's granted list relative to @lockp, and
 * splice it into the mode/policy skip lists as directed by @join (a
 * mask of LDLM_{MODE,POLICY}_JOIN_{LEFT,RIGHT} from
 * search_granted_lock()).  NULL @lockp means append to the tail with no
 * skip-list fixups.  Caller must hold the resource lock. */
static void ldlm_granted_list_add_lock(struct ldlm_lock *lock,
                                       struct ldlm_lock *lockp,
                                       int join)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        LASSERT(lockp || join == LDLM_JOIN_NONE);

        check_res_locked(res);

        ldlm_resource_dump(D_OTHER, res);
        CDEBUG(D_OTHER, "About to add this lock:\n");
        ldlm_lock_dump(D_OTHER, lock, 0);

        if (lock->l_destroyed) {
                CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
                return;
        }

        LASSERT(list_empty(&lock->l_res_link));

        /* List position: append at tail, or before/after @lockp
         * depending on the join direction. */
        if (!lockp)
                list_add_tail(&lock->l_res_link, &lock->l_resource->lr_granted);
        else if ((join & LDLM_MODE_JOIN_LEFT) || (join & LDLM_POLICY_JOIN_LEFT))
                list_add(&lock->l_res_link, &lockp->l_res_link);
        else
                list_add_tail(&lock->l_res_link, &lockp->l_res_link);

        /* fix skip lists */
        if (join & LDLM_MODE_JOIN_RIGHT) {
                /* Becoming the new head of @lockp's mode group. */
                LASSERT(! LDLM_SL_TAIL(&lockp->l_sl_mode));
                if (LDLM_SL_EMPTY(&lockp->l_sl_mode)) {
                        /* Group of one: form a head<->tail pair. */
                        lock->l_sl_mode.next = &lockp->l_sl_mode;
                        lockp->l_sl_mode.prev = &lock->l_sl_mode;
                } else if (LDLM_SL_HEAD(&lockp->l_sl_mode)) {
                        /* Take over head duties from @lockp. */
                        lock->l_sl_mode.next = lockp->l_sl_mode.next;
                        lockp->l_sl_mode.next = NULL;
                        lock->l_sl_mode.next->prev = &lock->l_sl_mode;
                }
        } else if (join & LDLM_MODE_JOIN_LEFT) {
                /* Becoming the new tail of @lockp's mode group. */
                LASSERT(! LDLM_SL_HEAD(&lockp->l_sl_mode));
                if (LDLM_SL_EMPTY(&lockp->l_sl_mode)) {
                        lock->l_sl_mode.prev = &lockp->l_sl_mode;
                        lockp->l_sl_mode.next = &lock->l_sl_mode;
                } else if (LDLM_SL_TAIL(&lockp->l_sl_mode)) {
                        lock->l_sl_mode.prev = lockp->l_sl_mode.prev;
                        lockp->l_sl_mode.prev = NULL;
                        lock->l_sl_mode.prev->next = &lock->l_sl_mode;
                }
        }

        /* Same head/tail splicing for the policy skip list. */
        if (join & LDLM_POLICY_JOIN_RIGHT) {
                LASSERT(! LDLM_SL_TAIL(&lockp->l_sl_policy));
                if (LDLM_SL_EMPTY(&lockp->l_sl_policy)) {
                        lock->l_sl_policy.next = &lockp->l_sl_policy;
                        lockp->l_sl_policy.prev = &lock->l_sl_policy;
                } else if (LDLM_SL_HEAD(&lockp->l_sl_policy)) {
                        lock->l_sl_policy.next = lockp->l_sl_policy.next;
                        lockp->l_sl_policy.next = NULL;
                        lock->l_sl_policy.next->prev = &lock->l_sl_policy;
                }
        } else if (join & LDLM_POLICY_JOIN_LEFT) {
                LASSERT(! LDLM_SL_HEAD(&lockp->l_sl_policy));
                if (LDLM_SL_EMPTY(&lockp->l_sl_policy)) {
                        lock->l_sl_policy.prev = &lockp->l_sl_policy;
                        lockp->l_sl_policy.next = &lock->l_sl_policy;
                } else if (LDLM_SL_TAIL(&lockp->l_sl_policy)) {
                        lock->l_sl_policy.prev = lockp->l_sl_policy.prev;
                        lockp->l_sl_policy.prev = NULL;
                        lock->l_sl_policy.prev->next = &lock->l_sl_policy;
                }
        }

        EXIT;
}
820
821 static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
822 {
823         int join = LDLM_JOIN_NONE;
824         struct ldlm_lock *lockp = NULL;
825         ENTRY;
826
827         LASSERT(lock->l_req_mode == lock->l_granted_mode);
828
829         join = search_granted_lock(&lock->l_resource->lr_granted, lock, &lockp);
830         ldlm_granted_list_add_lock(lock, lockp, join);
831         EXIT;
832 }
833
/* Grant a lock: set its granted mode and move it onto the resource's
 * granted list, queueing its completion AST on work_list if given.
 *
 * NOTE: called by
 *  - ldlm_lock_enqueue
 *  - ldlm_reprocess_queue
 *  - ldlm_lock_convert
 *
 * must be called with lr_lock held
 */
void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        lock->l_granted_mode = lock->l_req_mode;
        /* PLAIN/IBITS locks are kept on mode/policy skip lists; other
         * types go onto the plain granted list. */
        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
                ldlm_grant_lock_with_skiplist(lock);
        else
                ldlm_resource_add_lock(res, &res->lr_granted, lock);

        if (lock->l_granted_mode < res->lr_most_restr)
                res->lr_most_restr = lock->l_granted_mode;

        /* Queue the completion AST for later delivery rather than calling
         * it here with the resource lock held. */
        if (work_list && lock->l_completion_ast != NULL)
                ldlm_add_ast_work_item(lock, NULL, work_list);

        EXIT;
}
862
/* Scan one lock queue for a lock compatible with the requested mode/policy.
 *
 * returns a referenced lock or NULL.  See the flag descriptions below, in the
 * comment above ldlm_lock_match */
static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                                      ldlm_policy_data_t *policy,
                                      struct ldlm_lock *old_lock, int flags)
{
        struct ldlm_lock *lock;
        struct list_head *tmp;

        list_for_each(tmp, queue) {
                lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                /* When hunting for a duplicate of old_lock, only consider
                 * locks queued ahead of it. */
                if (lock == old_lock)
                        break;

                /* llite sometimes wants to match locks that will be
                 * canceled when their users drop, but we allow it to match
                 * if it passes in CBPENDING and the lock still has users.
                 * this is generally only going to be used by children
                 * whose parents already hold a lock so forward progress
                 * can still happen. */
                if (lock->l_flags & LDLM_FL_CBPENDING &&
                    !(flags & LDLM_FL_CBPENDING))
                        continue;
                if (lock->l_flags & LDLM_FL_CBPENDING &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        continue;

                /* 'mode' may be a mask of acceptable modes. */
                if (!(lock->l_req_mode & mode))
                        continue;

                /* An extent lock matches only if it covers the whole
                 * requested range. */
                if (lock->l_resource->lr_type == LDLM_EXTENT &&
                    (lock->l_policy_data.l_extent.start >
                     policy->l_extent.start ||
                     lock->l_policy_data.l_extent.end < policy->l_extent.end))
                        continue;

                /* Group extent locks must additionally agree on the gid. */
                if (unlikely(mode == LCK_GROUP) &&
                    lock->l_resource->lr_type == LDLM_EXTENT &&
                    lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                        continue;

                /* We match if we have existing lock with same or wider set
                   of bits. */
                if (lock->l_resource->lr_type == LDLM_IBITS &&
                     ((lock->l_policy_data.l_inodebits.bits &
                      policy->l_inodebits.bits) !=
                      policy->l_inodebits.bits))
                        continue;

                if (lock->l_destroyed || (lock->l_flags & LDLM_FL_FAILED))
                        continue;

                if ((flags & LDLM_FL_LOCAL_ONLY) &&
                    !(lock->l_flags & LDLM_FL_LOCAL))
                        continue;

                /* TEST_LOCK only pins the lock (caller drops the ref);
                 * a real match also takes a mode reference. */
                if (flags & LDLM_FL_TEST_LOCK)
                        LDLM_LOCK_GET(lock);
                else
                        ldlm_lock_addref_internal_nolock(lock, mode);
                return lock;
        }

        return NULL;
}
929
/* Mark the lock's LVB as valid and wake any ldlm_lock_match() callers
 * sleeping on l_waitq until LDLM_FL_LVB_READY is set. */
void ldlm_lock_allow_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        lock->l_flags |= LDLM_FL_LVB_READY;
        cfs_waitq_signal(&lock->l_waitq);
        unlock_res_and_lock(lock);
}
937
/* Can be called in two ways:
 *
 * If 'ns' is NULL, then lockh describes an existing lock that we want to look
 * for a duplicate of.
 *
 * Otherwise, all of the fields must be filled in, to match against.
 *
 * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
 *     server (ie, connh is NULL)
 * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
 *     list will be considered
 * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
 *     to be canceled can still be matched as long as they still have reader
 *     or writer references
 * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
 *     just tell us if we would have matched.
 *
 * Returns 1 if it finds an already-existing lock that is compatible; in this
 * case, lockh is filled in with a addref()ed lock
 */
int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
                    struct ldlm_res_id *res_id, ldlm_type_t type,
                    ldlm_policy_data_t *policy, ldlm_mode_t mode,
                    struct lustre_handle *lockh)
{
        struct ldlm_resource *res;
        struct ldlm_lock *lock, *old_lock = NULL;
        int rc = 0;
        ENTRY;

        /* ns == NULL: take all search parameters from the existing lock
         * referenced by lockh. */
        if (ns == NULL) {
                old_lock = ldlm_handle2lock(lockh);
                LASSERT(old_lock);

                ns = old_lock->l_resource->lr_namespace;
                res_id = &old_lock->l_resource->lr_name;
                type = old_lock->l_resource->lr_type;
                mode = old_lock->l_req_mode;
        }

        res = ldlm_resource_get(ns, NULL, *res_id, type, 0);
        if (res == NULL) {
                LASSERT(old_lock == NULL);
                RETURN(0);
        }

        lock_res(res);

        /* Search granted, converting, then waiting queues, unless the
         * caller restricted the search to granted locks. */
        lock = search_queue(&res->lr_granted, mode, policy, old_lock, flags);
        if (lock != NULL)
                GOTO(out, rc = 1);
        if (flags & LDLM_FL_BLOCK_GRANTED)
                GOTO(out, rc = 0);
        lock = search_queue(&res->lr_converting, mode, policy, old_lock, flags);
        if (lock != NULL)
                GOTO(out, rc = 1);
        lock = search_queue(&res->lr_waiting, mode, policy, old_lock, flags);
        if (lock != NULL)
                GOTO(out, rc = 1);

        EXIT;
 out:
        unlock_res(res);
        ldlm_resource_putref(res);

        if (lock) {
                ldlm_lock2handle(lock, lockh);
                /* If the caller needs a valid LVB, wait until it has been
                 * filled in (signalled via ldlm_lock_allow_match). */
                if ((flags & LDLM_FL_LVB_READY) && (!(lock->l_flags & LDLM_FL_LVB_READY))) {
                        struct l_wait_info lwi;
                        if (lock->l_completion_ast) {
                                int err = lock->l_completion_ast(lock,
                                                          LDLM_FL_WAIT_NOREPROC,
                                                                 NULL);
                                if (err) {
                                        /* Drop the reference taken by
                                         * search_queue() before failing
                                         * the match. */
                                        if (flags & LDLM_FL_TEST_LOCK)
                                                LDLM_LOCK_PUT(lock);
                                        else
                                                ldlm_lock_decref_internal(lock, mode);
                                        rc = 0;
                                        goto out2;
                                }
                        }

                        lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout), NULL,
                                               LWI_ON_SIGNAL_NOOP, NULL);

                        /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
                        l_wait_event(lock->l_waitq,
                                     (lock->l_flags & LDLM_FL_LVB_READY), &lwi);
                }
        }
 out2:
        if (rc) {
                LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[2] : policy->l_extent.start,
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[3] : policy->l_extent.end);
        } else if (!(flags & LDLM_FL_TEST_LOCK)) {/*less verbose for test-only*/
                LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
                                  LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
                                  type, mode, res_id->name[0], res_id->name[1],
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[2] :policy->l_extent.start,
                                (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[3] : policy->l_extent.end);
        }
        if (old_lock)
                LDLM_LOCK_PUT(old_lock);
        /* A test-only match keeps no reference on the found lock. */
        if (flags & LDLM_FL_TEST_LOCK && rc)
                LDLM_LOCK_PUT(lock);

        return rc;
}
1052
1053 /* Returns a referenced lock */
1054 struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
1055                                    struct ldlm_res_id res_id, ldlm_type_t type,
1056                                    ldlm_mode_t mode,
1057                                    ldlm_blocking_callback blocking,
1058                                    ldlm_completion_callback completion,
1059                                    ldlm_glimpse_callback glimpse,
1060                                    void *data, __u32 lvb_len)
1061 {
1062         struct ldlm_lock *lock;
1063         struct ldlm_resource *res;
1064         ENTRY;
1065
1066         res = ldlm_resource_get(ns, NULL, res_id, type, 1);
1067         if (res == NULL)
1068                 RETURN(NULL);
1069
1070         lock = ldlm_lock_new(res);
1071         ldlm_resource_putref(res);
1072
1073         if (lock == NULL)
1074                 RETURN(NULL);
1075
1076         lock->l_req_mode = mode;
1077         lock->l_ast_data = data;
1078         lock->l_blocking_ast = blocking;
1079         lock->l_completion_ast = completion;
1080         lock->l_glimpse_ast = glimpse;
1081         lock->l_pid = cfs_curproc_pid();
1082
1083         if (lvb_len) {
1084                 lock->l_lvb_len = lvb_len;
1085                 OBD_ALLOC(lock->l_lvb_data, lvb_len);
1086                 if (lock->l_lvb_data == NULL) {
1087                         OBD_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock));
1088                         RETURN(NULL);
1089                 }
1090         }
1091
1092         RETURN(lock);
1093 }
1094
/* Enqueue a (new or replayed) lock.  On the server, first runs the
 * namespace intent policy, which may replace *lockp with a different,
 * already-granted lock (LDLM_FL_LOCK_CHANGED is set in *flags then).
 * Otherwise the lock is queued, granted, or processed by the per-type
 * policy, as dictated by *flags.  Returns an ldlm_error_t status. */
ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                               struct ldlm_lock **lockp,
                               void *cookie, int *flags)
{
        struct ldlm_lock *lock = *lockp;
        struct ldlm_resource *res = lock->l_resource;
        int local = res->lr_namespace->ns_client;
        ldlm_processing_policy policy;
        ldlm_error_t rc = ELDLM_OK;
        ENTRY;

        do_gettimeofday(&lock->l_enqueued_time);
        /* policies are not executed on the client or during replay */
        if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
            && !local && ns->ns_policy) {
                rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
                                   NULL);
                if (rc == ELDLM_LOCK_REPLACED) {
                        /* The lock that was returned has already been granted,
                         * and placed into lockp.  If it's not the same as the
                         * one we passed in, then destroy the old one and our
                         * work here is done. */
                        if (lock != *lockp) {
                                ldlm_lock_destroy(lock);
                                LDLM_LOCK_PUT(lock);
                        }
                        *flags |= LDLM_FL_LOCK_CHANGED;
                        RETURN(0);
                } else if (rc != ELDLM_OK ||
                           (rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
                        ldlm_lock_destroy(lock);
                        RETURN(rc);
                }
        }

        lock_res_and_lock(lock);
        if (local && lock->l_req_mode == lock->l_granted_mode) {
                /* The server returned a blocked lock, but it was granted before
                 * we got a chance to actually enqueue it.  We don't need to do
                 * anything else. */
                *flags &= ~(LDLM_FL_BLOCK_GRANTED |
                            LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
                GOTO(out, ELDLM_OK);
        }

        /* Some flags from the enqueue want to make it into the AST, via the
         * lock's l_flags. */
        lock->l_flags |= *flags & LDLM_AST_DISCARD_DATA;

        /* This distinction between local lock trees is very important; a client
         * namespace only has information about locks taken by that client, and
         * thus doesn't have enough information to decide for itself if it can
         * be granted (below).  In this case, we do exactly what the server
         * tells us to do, as dictated by the 'flags'.
         *
         * We do exactly the same thing during recovery, when the server is
         * more or less trusting the clients not to lie.
         *
         * FIXME (bug 268): Detect obvious lies by checking compatibility in
         * granted/converting queues. */
        ldlm_resource_unlink_lock(lock);
        if (local) {
                if (*flags & LDLM_FL_BLOCK_CONV)
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                else
                        ldlm_grant_lock(lock, NULL);
                GOTO(out, ELDLM_OK);
        } else if (*flags & LDLM_FL_REPLAY) {
                if (*flags & LDLM_FL_BLOCK_CONV) {
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                        GOTO(out, ELDLM_OK);
                } else if (*flags & LDLM_FL_BLOCK_WAIT) {
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                        GOTO(out, ELDLM_OK);
                } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
                        ldlm_grant_lock(lock, NULL);
                        GOTO(out, ELDLM_OK);
                }
                /* If no flags, fall through to normal enqueue path. */
        }

        /* Server-side, non-replay: the per-type policy decides whether to
         * grant, block, or abort this lock. */
        policy = ldlm_processing_policy_table[res->lr_type];
        policy(lock, flags, 1, &rc, NULL);
        GOTO(out, rc);
out:
        unlock_res_and_lock(lock);
        return rc;
}
1185
/* Re-run the resource type's processing policy over every lock on the
 * given queue, collecting completion-AST work for newly granted locks on
 * work_list.  Stops as soon as the policy returns something other than
 * LDLM_ITER_CONTINUE.
 *
 * Must be called with namespace taken: queue is waiting or converting. */
int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
                         struct list_head *work_list)
{
        struct list_head *tmp, *pos;
        ldlm_processing_policy policy;
        int flags;
        int rc = LDLM_ITER_CONTINUE;
        ldlm_error_t err;
        ENTRY;

        check_res_locked(res);

        policy = ldlm_processing_policy_table[res->lr_type];
        LASSERT(policy);

        /* _safe iteration: the policy may move the lock off this queue. */
        list_for_each_safe(tmp, pos, queue) {
                struct ldlm_lock *pending;
                pending = list_entry(tmp, struct ldlm_lock, l_res_link);

                CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);

                flags = 0;
                rc = policy(pending, &flags, 0, &err, work_list);
                if (rc != LDLM_ITER_CONTINUE)
                        break;
        }

        RETURN(rc);
}
1216
/* Deliver the blocking ASTs queued on rpc_list via l_bl_ast.  Each lock was
 * pinned when it was added to the list; that reference is dropped here once
 * its AST has run.  Returns -ERESTART if any AST returned it, else 0. */
int ldlm_run_bl_ast_work(struct list_head *rpc_list)
{
        struct list_head *tmp, *pos;
        struct ldlm_lock_desc d;
        int rc = 0, retval = 0;
        ENTRY;

        list_for_each_safe(tmp, pos, rpc_list) {
                struct ldlm_lock *lock =
                        list_entry(tmp, struct ldlm_lock, l_bl_ast);

                /* nobody should touch l_bl_ast */
                lock_res_and_lock(lock);
                list_del_init(&lock->l_bl_ast);

                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
                LASSERT(lock->l_bl_ast_run == 0);
                LASSERT(lock->l_blocking_lock);
                lock->l_bl_ast_run++;
                unlock_res_and_lock(lock);

                /* Snapshot the blocking lock's description before dropping
                 * our reference to it; the AST runs unlocked. */
                ldlm_lock2desc(lock->l_blocking_lock, &d);

                LDLM_LOCK_PUT(lock->l_blocking_lock);
                lock->l_blocking_lock = NULL;
                rc = lock->l_blocking_ast(lock, &d, NULL, LDLM_CB_BLOCKING);

                if (rc == -ERESTART)
                        retval = rc;
                else if (rc)
                        CDEBUG(D_DLMTRACE, "Failed AST - should clean & "
                               "disconnect client\n");
                LDLM_LOCK_PUT(lock);
        }
        RETURN(retval);
}
1253
1254 int ldlm_run_cp_ast_work(struct list_head *rpc_list)
1255 {
1256         struct list_head *tmp, *pos;
1257         int rc = 0, retval = 0;
1258         ENTRY;
1259
1260         /* It's possible to receive a completion AST before we've set
1261          * the l_completion_ast pointer: either because the AST arrived
1262          * before the reply, or simply because there's a small race
1263          * window between receiving the reply and finishing the local
1264          * enqueue. (bug 842)
1265          *
1266          * This can't happen with the blocking_ast, however, because we
1267          * will never call the local blocking_ast until we drop our
1268          * reader/writer reference, which we won't do until we get the
1269          * reply and finish enqueueing. */
1270         
1271         list_for_each_safe(tmp, pos, rpc_list) {
1272                 struct ldlm_lock *lock =
1273                         list_entry(tmp, struct ldlm_lock, l_cp_ast);
1274
1275                 /* nobody should touch l_cp_ast */
1276                 lock_res_and_lock(lock);
1277                 list_del_init(&lock->l_cp_ast);
1278                 LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
1279                 lock->l_flags &= ~LDLM_FL_CP_REQD;
1280                 unlock_res_and_lock(lock);
1281
1282                 if (lock->l_completion_ast != NULL)
1283                         rc = lock->l_completion_ast(lock, 0, 0);
1284                 if (rc == -ERESTART)
1285                         retval = rc;
1286                 else if (rc)
1287                         CDEBUG(D_DLMTRACE, "Failed AST - should clean & "
1288                                "disconnect client\n");
1289                 LDLM_LOCK_PUT(lock);
1290         }
1291         RETURN(retval);
1292 }
1293
1294 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
1295 {
1296         ldlm_reprocess_all(res);
1297         return LDLM_ITER_CONTINUE;
1298 }
1299
/* Reprocess every resource in the namespace's hash table.  The hash lock is
 * dropped around each per-resource reprocess (which takes the resource
 * lock), with a resource reference held to keep the hash chain valid. */
void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
{
        struct list_head *tmp;
        int i, rc;

        ENTRY;
        spin_lock(&ns->ns_hash_lock);
        for (i = 0; i < RES_HASH_SIZE; i++) {
                tmp = ns->ns_hash[i].next;
                while (tmp != &(ns->ns_hash[i])) {
                        struct ldlm_resource *res =
                                list_entry(tmp, struct ldlm_resource, lr_hash);

                        /* Pin the resource so 'tmp' stays valid while the
                         * hash lock is dropped. */
                        ldlm_resource_getref(res);
                        spin_unlock(&ns->ns_hash_lock);

                        rc = reprocess_one_queue(res, NULL);

                        spin_lock(&ns->ns_hash_lock);
                        tmp = tmp->next;
                        ldlm_resource_putref_locked(res);

                        if (rc == LDLM_ITER_STOP)
                                GOTO(out, rc);
                }
        }
 out:
        spin_unlock(&ns->ns_hash_lock);
        EXIT;
}
1330
/* Reprocess one resource's converting and waiting queues, then deliver the
 * completion ASTs collected for any locks that got granted.  Restarts the
 * whole pass if AST delivery reports -ERESTART. */
void ldlm_reprocess_all(struct ldlm_resource *res)
{
        struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
        int rc;
        ENTRY;

        /* Local lock trees don't get reprocessed. */
        if (res->lr_namespace->ns_client) {
                EXIT;
                return;
        }

 restart:
        lock_res(res);
        /* Converting locks take priority; only reprocess waiters if the
         * converting queue was fully drained. */
        rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
        if (rc == LDLM_ITER_CONTINUE)
                ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
        unlock_res(res);

        /* ASTs are sent with the resource lock dropped. */
        rc = ldlm_run_cp_ast_work(&rpc_list);
        if (rc == -ERESTART) {
                LASSERT(list_empty(&rpc_list));
                goto restart;
        }
        EXIT;
}
1357
/* Run the lock's blocking AST with LDLM_CB_CANCELING, exactly once
 * (guarded by LDLM_FL_CANCEL).  Called with the resource lock held; the
 * lock is dropped around the AST call and retaken afterwards, so callers
 * must tolerate the resource state changing across this call. */
void ldlm_cancel_callback(struct ldlm_lock *lock)
{
        check_res_locked(lock->l_resource);
        if (!(lock->l_flags & LDLM_FL_CANCEL)) {
                lock->l_flags |= LDLM_FL_CANCEL;
                if (lock->l_blocking_ast) {
                        // l_check_no_ns_lock(ns);
                        unlock_res_and_lock(lock);
                        lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
                                             LDLM_CB_CANCELING);
                        lock_res_and_lock(lock);
                } else {
                        LDLM_DEBUG(lock, "no blocking ast");
                }
        }
        lock->l_flags |= LDLM_FL_BL_DONE;
}
1375
/* Detach 'req' from the mode and policy skip lists before it is removed
 * from the granted list (PLAIN/IBITS resources only).  If req was the head
 * or tail of a skip-list group, its role is handed to the adjacent lock on
 * the resource list; a two-element group collapses to empty. */
void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
{
        struct ldlm_lock *lock;

        if (req->l_resource->lr_type != LDLM_PLAIN &&
            req->l_resource->lr_type != LDLM_IBITS)
                return;
        
        if (LDLM_SL_HEAD(&req->l_sl_mode)) {
                /* Next lock on the resource list becomes the new mode head. */
                lock = list_entry(req->l_res_link.next, struct ldlm_lock,
                                  l_res_link);
                if (req->l_sl_mode.next == &lock->l_sl_mode) {
                        /* Two-element group: neighbor was tail, now empty. */
                        lock->l_sl_mode.prev = NULL;
                } else {
                        lock->l_sl_mode.next = req->l_sl_mode.next;
                        lock->l_sl_mode.next->prev = &lock->l_sl_mode;
                }
                req->l_sl_mode.next = NULL;
        } else if (LDLM_SL_TAIL(&req->l_sl_mode)) {
                /* Previous lock on the resource list becomes the new tail. */
                lock = list_entry(req->l_res_link.prev, struct ldlm_lock,
                                  l_res_link);
                if (req->l_sl_mode.prev == &lock->l_sl_mode) {
                        lock->l_sl_mode.next = NULL;
                } else {
                        lock->l_sl_mode.prev = req->l_sl_mode.prev;
                        lock->l_sl_mode.prev->next = &lock->l_sl_mode;
                }
                req->l_sl_mode.prev = NULL;
        }

        /* Same fix-up for the policy skip list. */
        if (LDLM_SL_HEAD(&req->l_sl_policy)) {
                lock = list_entry(req->l_res_link.next, struct ldlm_lock,
                                  l_res_link);
                if (req->l_sl_policy.next == &lock->l_sl_policy) {
                        lock->l_sl_policy.prev = NULL;
                } else {
                        lock->l_sl_policy.next = req->l_sl_policy.next;
                        lock->l_sl_policy.next->prev = &lock->l_sl_policy;
                }
                req->l_sl_policy.next = NULL;
        } else if (LDLM_SL_TAIL(&req->l_sl_policy)) {
                lock = list_entry(req->l_res_link.prev, struct ldlm_lock,
                                  l_res_link);
                if (req->l_sl_policy.prev == &lock->l_sl_policy) {
                        lock->l_sl_policy.next = NULL;
                } else {
                        lock->l_sl_policy.prev = req->l_sl_policy.prev;
                        lock->l_sl_policy.prev->next = &lock->l_sl_policy;
                }
                req->l_sl_policy.prev = NULL;
        }
}
1428
/* Cancel a lock: run its cancel/blocking callback, unlink it from its
 * resource, and destroy it.  The lock must have no reader/writer
 * references left (LBUG otherwise). */
void ldlm_lock_cancel(struct ldlm_lock *lock)
{
        struct ldlm_resource *res;
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        res = lock->l_resource;
        ns = res->lr_namespace;

        /* Please do not, no matter how tempting, remove this LBUG without
         * talking to me first. -phik */
        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                LBUG();
        }

        ldlm_del_waiting_lock(lock);

        /* Releases res lock */
        ldlm_cancel_callback(lock);

        /* Yes, second time, just in case it was added again while we were
           running with no res lock in ldlm_cancel_callback */
        ldlm_del_waiting_lock(lock); 
        ldlm_resource_unlink_lock(lock);
        ldlm_lock_destroy_nolock(lock);
        unlock_res_and_lock(lock);

        EXIT;
}
1461
1462 int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
1463 {
1464         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1465         ENTRY;
1466
1467         if (lock == NULL)
1468                 RETURN(-EINVAL);
1469
1470         lock->l_ast_data = data;
1471         LDLM_LOCK_PUT(lock);
1472         RETURN(0);
1473 }
1474
/* Cancel every lock held by a (disconnecting) export.  The export's lock
 * list spinlock is dropped around each cancellation, with the lock and its
 * resource pinned so they survive the unlocked window. */
void ldlm_cancel_locks_for_export(struct obd_export *exp)
{
        struct ldlm_lock *lock;
        struct ldlm_resource *res;

        spin_lock(&exp->exp_ldlm_data.led_lock);
        while(!list_empty(&exp->exp_ldlm_data.led_held_locks)) {
                lock = list_entry(exp->exp_ldlm_data.led_held_locks.next,
                                  struct ldlm_lock, l_export_chain);
                res = ldlm_resource_getref(lock->l_resource);
                LDLM_LOCK_GET(lock);
                spin_unlock(&exp->exp_ldlm_data.led_lock);

                LDLM_DEBUG(lock, "export %p", exp);
                ldlm_res_lvbo_update(res, NULL, 0, 1);

                ldlm_lock_cancel(lock);
                /* Cancelling may unblock waiters on this resource. */
                ldlm_reprocess_all(res);

                ldlm_resource_putref(res);
                LDLM_LOCK_PUT(lock);
                spin_lock(&exp->exp_ldlm_data.led_lock);
        }
        spin_unlock(&exp->exp_ldlm_data.led_lock);
}
1500
/* Convert a granted lock to new_mode (currently only PR -> PW is allowed,
 * see the LASSERTF below).  Returns the lock's resource on success, or
 * NULL when the per-type policy blocked the conversion, in which case the
 * lock is put back at its remembered position on the granted list. */
struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                                        int *flags)
{
        struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
        struct ldlm_resource *res;
        struct ldlm_namespace *ns;
        int granted = 0;
        int old_mode, rc;
        struct ldlm_lock *mark_lock = NULL;
        int join= LDLM_JOIN_NONE;
        ldlm_error_t err;
        ENTRY;

        if (new_mode == lock->l_granted_mode) { // No changes? Just return.
                *flags |= LDLM_FL_BLOCK_GRANTED;
                RETURN(lock->l_resource);
        }

        LASSERTF(new_mode == LCK_PW && lock->l_granted_mode == LCK_PR,
                 "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);

        lock_res_and_lock(lock);

        res = lock->l_resource;
        ns = res->lr_namespace;

        old_mode = lock->l_req_mode;
        lock->l_req_mode = new_mode;
        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
                /* remember the lock position where the lock might be 
                 * added back to the granted list later and also 
                 * remember the join mode for skiplist fixing. */
                if (LDLM_SL_HEAD(&lock->l_sl_mode))
                        join = LDLM_MODE_JOIN_RIGHT;
                else if (LDLM_SL_TAIL(&lock->l_sl_mode))
                        join = LDLM_MODE_JOIN_LEFT;
                if (LDLM_SL_HEAD(&lock->l_sl_policy))
                        join |= LDLM_POLICY_JOIN_RIGHT;
                else if (LDLM_SL_TAIL(&lock->l_sl_policy))
                        join |= LDLM_POLICY_JOIN_LEFT;

                /* A lock cannot be at opposite ends of its mode and policy
                 * groups at the same time. */
                LASSERT(!((join & LDLM_MODE_JOIN_RIGHT) &&
                          (join & LDLM_POLICY_JOIN_LEFT)));
                LASSERT(!((join & LDLM_MODE_JOIN_LEFT) &&
                          (join & LDLM_POLICY_JOIN_RIGHT)));

                /* mark_lock is the neighbor relative to which the lock will
                 * be re-inserted if the conversion is blocked. */
                if ((join & LDLM_MODE_JOIN_LEFT) ||
                    (join & LDLM_POLICY_JOIN_LEFT))
                        mark_lock = list_entry(lock->l_res_link.prev,
                                               struct ldlm_lock, l_res_link);
                else if (lock->l_res_link.next != &res->lr_granted)
                        mark_lock = list_entry(lock->l_res_link.next,
                                               struct ldlm_lock, l_res_link);
        }
        ldlm_resource_unlink_lock(lock);

        /* If this is a local resource, put it on the appropriate list. */
        if (res->lr_namespace->ns_client) {
                if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                } else {
                        /* This should never happen, because of the way the
                         * server handles conversions. */
                        LDLM_ERROR(lock, "Erroneous flags %d on local lock\n",
                                   *flags);
                        LBUG();

                        ldlm_grant_lock(lock, &rpc_list);
                        granted = 1;
                        /* FIXME: completion handling not with ns_lock held ! */
                        if (lock->l_completion_ast)
                                lock->l_completion_ast(lock, 0, NULL);
                }
        } else {
                int pflags = 0;
                ldlm_processing_policy policy;
                policy = ldlm_processing_policy_table[res->lr_type];
                rc = policy(lock, &pflags, 0, &err, &rpc_list);
                if (rc == LDLM_ITER_STOP) {
                        /* Conversion blocked: restore the old mode and put
                         * the lock back where it came from. */
                        lock->l_req_mode = old_mode;
                        ldlm_granted_list_add_lock(lock, mark_lock, join);
                        res = NULL;
                } else {
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        granted = 1;
                }
        }
        unlock_res_and_lock(lock);

        /* Deliver completion ASTs with the resource lock dropped. */
        if (granted)
                ldlm_run_cp_ast_work(&rpc_list);
        RETURN(res);
}
1594
1595 void ldlm_lock_dump(int level, struct ldlm_lock *lock, int pos)
1596 {
1597         struct obd_device *obd = NULL;
1598
1599         if (!((libcfs_debug | D_ERROR) & level))
1600                 return;
1601
1602         if (!lock) {
1603                 CDEBUG(level, "  NULL LDLM lock\n");
1604                 return;
1605         }
1606
1607         CDEBUG(level," -- Lock dump: %p/"LPX64" (rc: %d) (pos: %d) (pid: %d)\n",
1608                lock, lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
1609                pos, lock->l_pid);
1610         if (lock->l_conn_export != NULL)
1611                 obd = lock->l_conn_export->exp_obd;
1612         if (lock->l_export && lock->l_export->exp_connection) {
1613                 CDEBUG(level, "  Node: NID %s (rhandle: "LPX64")\n",
1614                      libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid),
1615                      lock->l_remote_handle.cookie);
1616         } else if (obd == NULL) {
1617                 CDEBUG(level, "  Node: local\n");
1618         } else {
1619                 struct obd_import *imp = obd->u.cli.cl_import;
1620                 CDEBUG(level, "  Node: NID %s (rhandle: "LPX64")\n",
1621                        libcfs_nid2str(imp->imp_connection->c_peer.nid),
1622                        lock->l_remote_handle.cookie);
1623         }
1624         CDEBUG(level, "  Resource: %p ("LPU64"/"LPU64")\n", lock->l_resource,
1625                lock->l_resource->lr_name.name[0],
1626                lock->l_resource->lr_name.name[1]);
1627         CDEBUG(level, "  Req mode: %s, grant mode: %s, rc: %u, read: %d, "
1628                "write: %d flags: %#x\n", ldlm_lockname[lock->l_req_mode],
1629                ldlm_lockname[lock->l_granted_mode],
1630                atomic_read(&lock->l_refc), lock->l_readers, lock->l_writers,
1631                lock->l_flags);
1632         if (lock->l_resource->lr_type == LDLM_EXTENT)
1633                 CDEBUG(level, "  Extent: "LPU64" -> "LPU64
1634                        " (req "LPU64"-"LPU64")\n",
1635                        lock->l_policy_data.l_extent.start,
1636                        lock->l_policy_data.l_extent.end,
1637                        lock->l_req_extent.start, lock->l_req_extent.end);
1638         else if (lock->l_resource->lr_type == LDLM_FLOCK)
1639                 CDEBUG(level, "  Pid: %d Extent: "LPU64" -> "LPU64"\n",
1640                        lock->l_policy_data.l_flock.pid,
1641                        lock->l_policy_data.l_flock.start,
1642                        lock->l_policy_data.l_flock.end);
1643        else if (lock->l_resource->lr_type == LDLM_IBITS)
1644                 CDEBUG(level, "  Bits: "LPX64"\n",
1645                        lock->l_policy_data.l_inodebits.bits);
1646 }
1647
1648 void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
1649 {
1650         struct ldlm_lock *lock;
1651
1652         if (!((libcfs_debug | D_ERROR) & level))
1653                 return;
1654
1655         lock = ldlm_handle2lock(lockh);
1656         if (lock == NULL)
1657                 return;
1658
1659         ldlm_lock_dump(D_OTHER, lock, 0);
1660
1661         LDLM_LOCK_PUT(lock);
1662 }
1663
/*
 * Backend for the LDLM_DEBUG()/LDLM_ERROR() macros: emit the caller's
 * message followed by a one-line description of @lock, formatted
 * according to the lock's resource type.
 *
 * \param lock   lock to describe; must be non-NULL (dereferenced
 *               unconditionally), but lock->l_resource may be NULL for
 *               a lock not yet attached to a resource.
 * \param level  libcfs debug mask passed through to libcfs_debug_vmsg2().
 * \param data   per-call-site debug metadata (subsystem, file, function,
 *               line, rate-limit state).
 * \param fmt    caller-supplied printf-style prefix for the lock dump.
 */
void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level,
                      struct libcfs_debug_msg_data *data, const char *fmt,
                      ...)
{
        va_list args;
        cfs_debug_limit_state_t *cdls = data->msg_cdls;

        va_start(args, fmt);
        /* No resource attached yet: the resource/type fields are printed
         * as "??" placeholders. */
        if (lock->l_resource == NULL) {
                libcfs_debug_vmsg2(cdls, data->msg_subsys, level, data->msg_file,
                                   data->msg_fn, data->msg_line, fmt, args,
                                   " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                                   "res: \?\? rrc=\?\? type: \?\?\? flags: %x remote: "
                                   LPX64" expref: %d pid: %u\n", lock,
                                   lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
                                   lock->l_readers, lock->l_writers,
                                   ldlm_lockname[lock->l_granted_mode],
                                   ldlm_lockname[lock->l_req_mode],
                                   lock->l_flags, lock->l_remote_handle.cookie,
                                   lock->l_export ?
                                        atomic_read(&lock->l_export->exp_refcount) : -99,
                                   lock->l_pid);
                va_end(args);
                return;
        }

        /* One branch per resource type so the type-specific policy data
         * (extents, flock range/pid, inode bits) is included in the dump.
         * Each branch consumes @args exactly once. */
        switch (lock->l_resource->lr_type) {
        case LDLM_EXTENT:
                libcfs_debug_vmsg2(cdls, data->msg_subsys, level, data->msg_file,
                                   data->msg_fn, data->msg_line, fmt, args,
                                   " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                                   "res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64
                                   "] (req "LPU64"->"LPU64") flags: %x remote: "LPX64
                                    " expref: %d pid: %u\n",
                                    lock->l_resource->lr_namespace->ns_name, lock,
                                    lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
                                    lock->l_readers, lock->l_writers,
                                    ldlm_lockname[lock->l_granted_mode],
                                    ldlm_lockname[lock->l_req_mode],
                                    lock->l_resource->lr_name.name[0],
                                    lock->l_resource->lr_name.name[1],
                                    atomic_read(&lock->l_resource->lr_refcount),
                                    ldlm_typename[lock->l_resource->lr_type],
                                    lock->l_policy_data.l_extent.start,
                                    lock->l_policy_data.l_extent.end,
                                    lock->l_req_extent.start, lock->l_req_extent.end,
                                    lock->l_flags, lock->l_remote_handle.cookie,
                                    lock->l_export ?
                                        atomic_read(&lock->l_export->exp_refcount) : -99,
                                    lock->l_pid);
                break;
        case LDLM_FLOCK:
                libcfs_debug_vmsg2(cdls, data->msg_subsys, level, data->msg_file,
                                   data->msg_fn, data->msg_line, fmt, args,
                                   " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                                   "res: "LPU64"/"LPU64" rrc: %d type: %s pid: %d "
                                   "["LPU64"->"LPU64"] flags: %x remote: "LPX64
                                   " expref: %d pid: %u\n",
                                   lock->l_resource->lr_namespace->ns_name, lock,
                                   lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
                                   lock->l_readers, lock->l_writers,
                                   ldlm_lockname[lock->l_granted_mode],
                                   ldlm_lockname[lock->l_req_mode],
                                   lock->l_resource->lr_name.name[0],
                                   lock->l_resource->lr_name.name[1],
                                   atomic_read(&lock->l_resource->lr_refcount),
                                   ldlm_typename[lock->l_resource->lr_type],
                                   lock->l_policy_data.l_flock.pid,
                                   lock->l_policy_data.l_flock.start,
                                   lock->l_policy_data.l_flock.end,
                                   lock->l_flags, lock->l_remote_handle.cookie,
                                   lock->l_export ?
                                        atomic_read(&lock->l_export->exp_refcount) : -99,
                                   lock->l_pid);
                break;
        case LDLM_IBITS:
                libcfs_debug_vmsg2(cdls, data->msg_subsys, level, data->msg_file,
                                   data->msg_fn, data->msg_line, fmt, args,
                                   " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                                   "res: "LPU64"/"LPU64" bits "LPX64" rrc: %d type: %s "
                                   "flags: %x remote: "LPX64" expref: %d "
                                   "pid %u\n",
                                   lock->l_resource->lr_namespace->ns_name,
                                   lock, lock->l_handle.h_cookie,
                                   atomic_read (&lock->l_refc),
                                   lock->l_readers, lock->l_writers,
                                   ldlm_lockname[lock->l_granted_mode],
                                   ldlm_lockname[lock->l_req_mode],
                                   lock->l_resource->lr_name.name[0],
                                   lock->l_resource->lr_name.name[1],
                                   lock->l_policy_data.l_inodebits.bits,
                                   atomic_read(&lock->l_resource->lr_refcount),
                                   ldlm_typename[lock->l_resource->lr_type],
                                   lock->l_flags, lock->l_remote_handle.cookie,
                                   lock->l_export ?
                                        atomic_read(&lock->l_export->exp_refcount) : -99,
                                   lock->l_pid);
                break;
        default:
                /* Plain (and any future) lock types: generic dump without
                 * type-specific policy data. */
                libcfs_debug_vmsg2(cdls, data->msg_subsys, level, data->msg_file,
                                   data->msg_fn, data->msg_line, fmt, args,
                                   " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                                   "res: "LPU64"/"LPU64" rrc: %d type: %s flags: %x "
                                   "remote: "LPX64" expref: %d pid: %u\n",
                                   lock->l_resource->lr_namespace->ns_name,
                                   lock, lock->l_handle.h_cookie,
                                   atomic_read (&lock->l_refc),
                                   lock->l_readers, lock->l_writers,
                                   ldlm_lockname[lock->l_granted_mode],
                                   ldlm_lockname[lock->l_req_mode],
                                   lock->l_resource->lr_name.name[0],
                                   lock->l_resource->lr_name.name[1],
                                   atomic_read(&lock->l_resource->lr_refcount),
                                   ldlm_typename[lock->l_resource->lr_type],
                                   lock->l_flags, lock->l_remote_handle.cookie,
                                   lock->l_export ?
                                         atomic_read(&lock->l_export->exp_refcount) : -99,
                                   lock->l_pid);
                break;
        }
        va_end(args);
}
1786 EXPORT_SYMBOL(_ldlm_lock_debug);