1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2010, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ldlm/ldlm_lock.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_LDLM
43
44 #ifdef __KERNEL__
45 # include <libcfs/libcfs.h>
46 # include <linux/lustre_intent.h>
47 #else
48 # include <liblustre.h>
49 #endif
50
51 #include <obd_class.h>
52 #include "ldlm_internal.h"
53
54 /* lock types */
55 char *ldlm_lockname[] = {
56         [0] = "--",
57         [LCK_EX] = "EX",
58         [LCK_PW] = "PW",
59         [LCK_PR] = "PR",
60         [LCK_CW] = "CW",
61         [LCK_CR] = "CR",
62         [LCK_NL] = "NL",
63         [LCK_GROUP] = "GROUP",
64         [LCK_COS] = "COS"
65 };
66 EXPORT_SYMBOL(ldlm_lockname);
67
68 char *ldlm_typename[] = {
69         [LDLM_PLAIN] = "PLN",
70         [LDLM_EXTENT] = "EXT",
71         [LDLM_FLOCK] = "FLK",
72         [LDLM_IBITS] = "IBT",
73 };
74 EXPORT_SYMBOL(ldlm_typename);
75
76 static ldlm_policy_wire_to_local_t ldlm_policy_wire18_to_local[] = {
77         [LDLM_PLAIN - LDLM_MIN_TYPE]  = ldlm_plain_policy_wire_to_local,
78         [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
79         [LDLM_FLOCK - LDLM_MIN_TYPE]  = ldlm_flock_policy_wire18_to_local,
80         [LDLM_IBITS - LDLM_MIN_TYPE]  = ldlm_ibits_policy_wire_to_local,
81 };
82
83 static ldlm_policy_wire_to_local_t ldlm_policy_wire21_to_local[] = {
84         [LDLM_PLAIN - LDLM_MIN_TYPE]  = ldlm_plain_policy_wire_to_local,
85         [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
86         [LDLM_FLOCK - LDLM_MIN_TYPE]  = ldlm_flock_policy_wire21_to_local,
87         [LDLM_IBITS - LDLM_MIN_TYPE]  = ldlm_ibits_policy_wire_to_local,
88 };
89
90 static ldlm_policy_local_to_wire_t ldlm_policy_local_to_wire[] = {
91         [LDLM_PLAIN - LDLM_MIN_TYPE]  = ldlm_plain_policy_local_to_wire,
92         [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_local_to_wire,
93         [LDLM_FLOCK - LDLM_MIN_TYPE]  = ldlm_flock_policy_local_to_wire,
94         [LDLM_IBITS - LDLM_MIN_TYPE]  = ldlm_ibits_policy_local_to_wire,
95 };
96
97 /**
98  * Converts lock policy from the local format to the on-the-wire lock_desc format
99  */
100 void ldlm_convert_policy_to_wire(ldlm_type_t type,
101                                  const ldlm_policy_data_t *lpolicy,
102                                  ldlm_wire_policy_data_t *wpolicy)
103 {
104         ldlm_policy_local_to_wire_t convert;
105
106         convert = ldlm_policy_local_to_wire[type - LDLM_MIN_TYPE];
107
108         convert(lpolicy, wpolicy);
109 }
110
111 /**
112  * Converts lock policy from the on-the-wire lock_desc format to the local format
113  */
114 void ldlm_convert_policy_to_local(struct obd_export *exp, ldlm_type_t type,
115                                   const ldlm_wire_policy_data_t *wpolicy,
116                                   ldlm_policy_data_t *lpolicy)
117 {
118         ldlm_policy_wire_to_local_t convert;
119         int new_client;
120
121         /* some badness for 2.0.0 clients, but 2.0.0 isn't supported */
122         new_client = (exp_connect_flags(exp) & OBD_CONNECT_FULL20) != 0;
123         if (new_client)
124                 convert = ldlm_policy_wire21_to_local[type - LDLM_MIN_TYPE];
125         else
126                 convert = ldlm_policy_wire18_to_local[type - LDLM_MIN_TYPE];
127
128         convert(wpolicy, lpolicy);
129 }
130
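/*
 * Illustrative sketch (added annotation, not part of the original file):
 * how a server-side request handler might use the converter above.  The
 * function name and the dlm_req argument are hypothetical.
 */
#if 0
static void example_policy_from_wire(struct obd_export *exp,
                                     const struct ldlm_request *dlm_req)
{
        ldlm_policy_data_t lpolicy;

        /* Dispatches through the per-type table; pre-2.1 clients get the
         * old flock wire format via ldlm_policy_wire18_to_local[]. */
        ldlm_convert_policy_to_local(exp,
                                     dlm_req->lock_desc.l_resource.lr_type,
                                     &dlm_req->lock_desc.l_policy_data,
                                     &lpolicy);
}
#endif
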
131 char *ldlm_it2str(int it)
132 {
133         switch (it) {
134         case IT_OPEN:
135                 return "open";
136         case IT_CREAT:
137                 return "creat";
138         case (IT_OPEN | IT_CREAT):
139                 return "open|creat";
140         case IT_READDIR:
141                 return "readdir";
142         case IT_GETATTR:
143                 return "getattr";
144         case IT_LOOKUP:
145                 return "lookup";
146         case IT_UNLINK:
147                 return "unlink";
148         case IT_GETXATTR:
149                 return "getxattr";
150         case IT_LAYOUT:
151                 return "layout";
152         default:
153                 CERROR("Unknown intent %d\n", it);
154                 return "UNKNOWN";
155         }
156 }
157 EXPORT_SYMBOL(ldlm_it2str);
158
159 extern struct kmem_cache *ldlm_lock_slab;
160
161 #ifdef HAVE_SERVER_SUPPORT
162 static ldlm_processing_policy ldlm_processing_policy_table[] = {
163         [LDLM_PLAIN]    = ldlm_process_plain_lock,
164         [LDLM_EXTENT]   = ldlm_process_extent_lock,
165 # ifdef __KERNEL__
166         [LDLM_FLOCK]    = ldlm_process_flock_lock,
167 # endif
168         [LDLM_IBITS]    = ldlm_process_inodebits_lock,
169 };
170
171 ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
172 {
173         return ldlm_processing_policy_table[res->lr_type];
174 }
175 EXPORT_SYMBOL(ldlm_get_processing_policy);
176 #endif /* HAVE_SERVER_SUPPORT */
177
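/*
 * Illustrative sketch (added annotation): on the server, queue reprocessing
 * dispatches through the per-type table above.  The exact callback signature
 * comes from this tree's lustre_dlm.h, so the call itself is only indicated
 * in a comment.
 */
#if 0
static void example_reprocess_dispatch(struct ldlm_resource *res,
                                       struct ldlm_lock *lock)
{
        ldlm_processing_policy policy = ldlm_get_processing_policy(res);

        /* e.g. policy(lock, &flags, first_enq, &err, &work_list);
         * see ldlm_process_plain_lock() and friends. */
        (void)policy;
        (void)lock;
}
#endif
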
178 void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
179 {
180         ns->ns_policy = arg;
181 }
182 EXPORT_SYMBOL(ldlm_register_intent);
183
184 /*
185  * REFCOUNTED LOCK OBJECTS
186  */
187
188
189 /**
190  * Get a reference on a lock.
191  *
192  * Lock refcounts, during creation:
193  *   - one special one for allocation, dec'd only once in destroy
194  *   - one for being a lock that's in-use
195  *   - one for the addref associated with a new lock
196  */
197 struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
198 {
199         atomic_inc(&lock->l_refc);
200         return lock;
201 }
202 EXPORT_SYMBOL(ldlm_lock_get);
203
204 /**
205  * Release lock reference.
206  *
207  * Also frees the lock if it was the last reference.
208  */
209 void ldlm_lock_put(struct ldlm_lock *lock)
210 {
211         ENTRY;
212
213         LASSERT(lock->l_resource != LP_POISON);
214         LASSERT(atomic_read(&lock->l_refc) > 0);
215         if (atomic_dec_and_test(&lock->l_refc)) {
216                 struct ldlm_resource *res;
217
218                 LDLM_DEBUG(lock,
219                            "final lock_put on destroyed lock, freeing it.");
220
221                 res = lock->l_resource;
222                 LASSERT(ldlm_is_destroyed(lock));
223                 LASSERT(list_empty(&lock->l_res_link));
224                 LASSERT(list_empty(&lock->l_pending_chain));
225
226                 lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
227                                      LDLM_NSS_LOCKS);
228                 lu_ref_del(&res->lr_reference, "lock", lock);
229                 ldlm_resource_putref(res);
230                 lock->l_resource = NULL;
231                 if (lock->l_export) {
232                         class_export_lock_put(lock->l_export, lock);
233                         lock->l_export = NULL;
234                 }
235
236                 if (lock->l_lvb_data != NULL)
237                         OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len);
238
239                 ldlm_interval_free(ldlm_interval_detach(lock));
240                 lu_ref_fini(&lock->l_reference);
241                 OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle);
242         }
243
244         EXIT;
245 }
246 EXPORT_SYMBOL(ldlm_lock_put);
247
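/*
 * Usage sketch (added annotation): every ldlm_lock_get() (usually taken via
 * the LDLM_LOCK_GET() macro) must be balanced by an LDLM_LOCK_PUT(); the
 * final put on a destroyed lock frees it through OBD_FREE_RCU() as above.
 */
#if 0
static void example_get_put(struct ldlm_lock *lock)
{
        struct ldlm_lock *ref = LDLM_LOCK_GET(lock);    /* l_refc + 1 */

        /* ... use the lock while the reference is held ... */

        LDLM_LOCK_PUT(ref);                             /* l_refc - 1 */
}
#endif
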
248 /**
249  * Removes LDLM lock \a lock from LRU. Assumes LRU is already locked.
250  */
251 int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
252 {
253         int rc = 0;
254         if (!list_empty(&lock->l_lru)) {
255                 struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
256
257                 LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
258                 list_del_init(&lock->l_lru);
259                 LASSERT(ns->ns_nr_unused > 0);
260                 ns->ns_nr_unused--;
261                 rc = 1;
262         }
263         return rc;
264 }
265
266 /**
267  * Removes LDLM lock \a lock from LRU. Obtains the LRU lock first.
268  */
269 int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
270 {
271         struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
272         int rc;
273
274         ENTRY;
275         if (ldlm_is_ns_srv(lock)) {
276                 LASSERT(list_empty(&lock->l_lru));
277                 RETURN(0);
278         }
279
280         spin_lock(&ns->ns_lock);
281         rc = ldlm_lock_remove_from_lru_nolock(lock);
282         spin_unlock(&ns->ns_lock);
283         EXIT;
284         return rc;
285 }
286
287 /**
288  * Adds LDLM lock \a lock to namespace LRU. Assumes LRU is already locked.
289  */
290 void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
291 {
292         struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
293
294         lock->l_last_used = cfs_time_current();
295         LASSERT(list_empty(&lock->l_lru));
296         LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
297         list_add_tail(&lock->l_lru, &ns->ns_unused_list);
298         ldlm_clear_skipped(lock);
299         LASSERT(ns->ns_nr_unused >= 0);
300         ns->ns_nr_unused++;
301 }
302
303 /**
304  * Adds LDLM lock \a lock to namespace LRU. Obtains necessary LRU locks
305  * first.
306  */
307 void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
308 {
309         struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
310
311         ENTRY;
312         spin_lock(&ns->ns_lock);
313         ldlm_lock_add_to_lru_nolock(lock);
314         spin_unlock(&ns->ns_lock);
315         EXIT;
316 }
317
318 /**
319  * Moves LDLM lock \a lock that is already in namespace LRU to the tail of
320  * the LRU. Performs necessary LRU locking
321  */
322 void ldlm_lock_touch_in_lru(struct ldlm_lock *lock)
323 {
324         struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
325
326         ENTRY;
327         if (ldlm_is_ns_srv(lock)) {
328                 LASSERT(list_empty(&lock->l_lru));
329                 EXIT;
330                 return;
331         }
332
333         spin_lock(&ns->ns_lock);
334         if (!list_empty(&lock->l_lru)) {
335                 ldlm_lock_remove_from_lru_nolock(lock);
336                 ldlm_lock_add_to_lru_nolock(lock);
337         }
338         spin_unlock(&ns->ns_lock);
339         EXIT;
340 }
341
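/*
 * Locking sketch (added annotation): the *_nolock LRU helpers above must run
 * under ns->ns_lock, which the wrapper functions take themselves; a
 * hypothetical caller combining two LRU operations atomically would follow
 * the same pattern as ldlm_lock_touch_in_lru().
 */
#if 0
static void example_lru_requeue(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        spin_lock(&ns->ns_lock);
        if (ldlm_lock_remove_from_lru_nolock(lock))
                ldlm_lock_add_to_lru_nolock(lock);
        spin_unlock(&ns->ns_lock);
}
#endif
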
342 /**
343  * Helper to destroy a locked lock.
344  *
345  * Used by ldlm_lock_destroy and ldlm_lock_destroy_nolock
346  * Must be called with l_lock and lr_lock held.
347  *
348  * Does not actually free the lock data, but rather marks the lock as
349  * destroyed by setting the LDLM_FL_DESTROYED flag.  Destroys the
350  * handle->lock association too, so that the lock can no longer be found
351  * and removes the lock from LRU list.  Actual lock freeing occurs when
352  * last lock reference goes away.
353  *
354  * Original comment (of some historical value):
355  * This used to have a 'strict' flag, which recovery would use to mark an
356  * in-use lock as needing-to-die.  Lest I am ever tempted to put it back, I
357  * shall explain why it's gone: with the new hash table scheme, once you call
358  * ldlm_lock_destroy, you can never drop your final references on this lock.
359  * Because it's not in the hash table anymore.  -phil
360  */
361 int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
362 {
363         ENTRY;
364
365         if (lock->l_readers || lock->l_writers) {
366                 LDLM_ERROR(lock, "lock still has references");
367                 LBUG();
368         }
369
370         if (!list_empty(&lock->l_res_link)) {
371                 LDLM_ERROR(lock, "lock still on resource");
372                 LBUG();
373         }
374
375         if (ldlm_is_destroyed(lock)) {
376                 LASSERT(list_empty(&lock->l_lru));
377                 EXIT;
378                 return 0;
379         }
380         ldlm_set_destroyed(lock);
381
382         if (lock->l_export && lock->l_export->exp_lock_hash) {
383                 /* NB: it's safe to call cfs_hash_del() even if the lock
384                  * isn't in exp_lock_hash. */
385                 /* In the function below, .hs_keycmp resolves to
386                  * ldlm_export_lock_keycmp() */
387                 /* coverity[overrun-buffer-val] */
388                 cfs_hash_del(lock->l_export->exp_lock_hash,
389                              &lock->l_remote_handle, &lock->l_exp_hash);
390         }
391
392         ldlm_lock_remove_from_lru(lock);
393         class_handle_unhash(&lock->l_handle);
394
395 #if 0
396         /* Wake anyone waiting for this lock */
397         /* FIXME: I should probably add yet another flag, instead of using
398          * l_export to only call this on clients */
399         if (lock->l_export)
400                 class_export_put(lock->l_export);
401         lock->l_export = NULL;
402         if (lock->l_export && lock->l_completion_ast)
403                 lock->l_completion_ast(lock, 0);
404 #endif
405         EXIT;
406         return 1;
407 }
408
409 /**
410  * Destroys a LDLM lock \a lock. Performs necessary locking first.
411  */
412 void ldlm_lock_destroy(struct ldlm_lock *lock)
413 {
414         int first;
415         ENTRY;
416         lock_res_and_lock(lock);
417         first = ldlm_lock_destroy_internal(lock);
418         unlock_res_and_lock(lock);
419
420         /* drop reference from hashtable only for first destroy */
421         if (first) {
422                 lu_ref_del(&lock->l_reference, "hash", lock);
423                 LDLM_LOCK_RELEASE(lock);
424         }
425         EXIT;
426 }
427
428 /**
429  * Destroys a LDLM lock \a lock that is already locked.
430  */
431 void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
432 {
433         int first;
434         ENTRY;
435         first = ldlm_lock_destroy_internal(lock);
436         /* drop reference from hashtable only for first destroy */
437         if (first) {
438                 lu_ref_del(&lock->l_reference, "hash", lock);
439                 LDLM_LOCK_RELEASE(lock);
440         }
441         EXIT;
442 }
443
444 /* this is called by portals_handle2object with the handle lock taken */
445 static void lock_handle_addref(void *lock)
446 {
447         LDLM_LOCK_GET((struct ldlm_lock *)lock);
448 }
449
450 static void lock_handle_free(void *lock, int size)
451 {
452         LASSERT(size == sizeof(struct ldlm_lock));
453         OBD_SLAB_FREE(lock, ldlm_lock_slab, size);
454 }
455
456 struct portals_handle_ops lock_handle_ops = {
457         .hop_addref = lock_handle_addref,
458         .hop_free   = lock_handle_free,
459 };
460
461 /**
462  *
463  * Allocate and initialize new lock structure.
464  *
465  * usage: pass in a resource on which you have done ldlm_resource_get();
466  *        the new lock will take over the refcount.
467  * returns: lock with refcount 2 - one for current caller and one for remote
468  */
469 static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
470 {
471         struct ldlm_lock *lock;
472         ENTRY;
473
474         if (resource == NULL)
475                 LBUG();
476
477         OBD_SLAB_ALLOC_PTR_GFP(lock, ldlm_lock_slab, GFP_NOFS);
478         if (lock == NULL)
479                 RETURN(NULL);
480
481         spin_lock_init(&lock->l_lock);
482         lock->l_resource = resource;
483         lu_ref_add(&resource->lr_reference, "lock", lock);
484
485         atomic_set(&lock->l_refc, 2);
486         INIT_LIST_HEAD(&lock->l_res_link);
487         INIT_LIST_HEAD(&lock->l_lru);
488         INIT_LIST_HEAD(&lock->l_pending_chain);
489         INIT_LIST_HEAD(&lock->l_bl_ast);
490         INIT_LIST_HEAD(&lock->l_cp_ast);
491         INIT_LIST_HEAD(&lock->l_rk_ast);
492         init_waitqueue_head(&lock->l_waitq);
493         lock->l_blocking_lock = NULL;
494         INIT_LIST_HEAD(&lock->l_sl_mode);
495         INIT_LIST_HEAD(&lock->l_sl_policy);
496         INIT_HLIST_NODE(&lock->l_exp_hash);
497         INIT_HLIST_NODE(&lock->l_exp_flock_hash);
498
499         lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
500                              LDLM_NSS_LOCKS);
501         INIT_LIST_HEAD(&lock->l_handle.h_link);
502         class_handle_hash(&lock->l_handle, &lock_handle_ops);
503
504         lu_ref_init(&lock->l_reference);
505         lu_ref_add(&lock->l_reference, "hash", lock);
506         lock->l_callback_timeout = 0;
507
508 #if LUSTRE_TRACKS_LOCK_EXP_REFS
509         INIT_LIST_HEAD(&lock->l_exp_refs_link);
510         lock->l_exp_refs_nr = 0;
511         lock->l_exp_refs_target = NULL;
512 #endif
513         INIT_LIST_HEAD(&lock->l_exp_list);
514
515         RETURN(lock);
516 }
517
518 /**
519  * Moves LDLM lock \a lock to another resource.
520  * This is used on the client when the server returns a different lock than
521  * the one requested (typically as a result of an intent operation).
522  */
523 int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
524                               const struct ldlm_res_id *new_resid)
525 {
526         struct ldlm_resource *oldres = lock->l_resource;
527         struct ldlm_resource *newres;
528         int type;
529         ENTRY;
530
531         LASSERT(ns_is_client(ns));
532
533         lock_res_and_lock(lock);
534         if (memcmp(new_resid, &lock->l_resource->lr_name,
535                    sizeof(lock->l_resource->lr_name)) == 0) {
536                 /* Nothing to do */
537                 unlock_res_and_lock(lock);
538                 RETURN(0);
539         }
540
541         LASSERT(new_resid->name[0] != 0);
542
543         /* This function assumes that the lock isn't on any lists */
544         LASSERT(list_empty(&lock->l_res_link));
545
546         type = oldres->lr_type;
547         unlock_res_and_lock(lock);
548
549         newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
550         if (IS_ERR(newres))
551                 RETURN(PTR_ERR(newres));
552
553         lu_ref_add(&newres->lr_reference, "lock", lock);
554         /*
555          * To flip the lock from the old to the new resource, lock, oldres and
556          * newres have to be locked. Resource spin-locks are nested within
557          * lock->l_lock, and are taken in the memory address order to avoid
558          * dead-locks.
559          */
560         spin_lock(&lock->l_lock);
561         oldres = lock->l_resource;
562         if (oldres < newres) {
563                 lock_res(oldres);
564                 lock_res_nested(newres, LRT_NEW);
565         } else {
566                 lock_res(newres);
567                 lock_res_nested(oldres, LRT_NEW);
568         }
569         LASSERT(memcmp(new_resid, &oldres->lr_name,
570                        sizeof oldres->lr_name) != 0);
571         lock->l_resource = newres;
572         unlock_res(oldres);
573         unlock_res_and_lock(lock);
574
575         /* ...and the flowers are still standing! */
576         lu_ref_del(&oldres->lr_reference, "lock", lock);
577         ldlm_resource_putref(oldres);
578
579         RETURN(0);
580 }
581 EXPORT_SYMBOL(ldlm_lock_change_resource);
582
583 /** \defgroup ldlm_handles LDLM HANDLES
584  * Ways to get hold of locks without any addresses.
585  * @{
586  */
587
588 /**
589  * Fills in handle for LDLM lock \a lock into supplied \a lockh
590  * Does not take any references.
591  */
592 void ldlm_lock2handle(const struct ldlm_lock *lock, struct lustre_handle *lockh)
593 {
594         lockh->cookie = lock->l_handle.h_cookie;
595 }
596 EXPORT_SYMBOL(ldlm_lock2handle);
597
598 /**
599  * Obtain a lock reference by handle.
600  *
601  * If \a flags is nonzero: atomically get the lock and set the flags;
602  * return NULL if any of the flags is already set.
603  */
604 struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
605                                      __u64 flags)
606 {
607         struct ldlm_lock *lock;
608         ENTRY;
609
610         LASSERT(handle);
611
612         lock = class_handle2object(handle->cookie, NULL);
613         if (lock == NULL)
614                 RETURN(NULL);
615
616         /* It's unlikely but possible that someone marked the lock as
617          * destroyed after we did handle2object on it */
618         if ((flags == 0) && !ldlm_is_destroyed(lock)) {
619                 lu_ref_add(&lock->l_reference, "handle", current);
620                 RETURN(lock);
621         }
622
623         lock_res_and_lock(lock);
624
625         LASSERT(lock->l_resource != NULL);
626
627         lu_ref_add_atomic(&lock->l_reference, "handle", current);
628         if (unlikely(ldlm_is_destroyed(lock))) {
629                 unlock_res_and_lock(lock);
630                 CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
631                 LDLM_LOCK_PUT(lock);
632                 RETURN(NULL);
633         }
634
635         /* If we're setting flags, make sure none of them are already set. */
636         if (flags != 0) {
637                 if ((lock->l_flags & flags) != 0) {
638                         unlock_res_and_lock(lock);
639                         LDLM_LOCK_PUT(lock);
640                         RETURN(NULL);
641                 }
642
643                 lock->l_flags |= flags;
644         }
645
646         unlock_res_and_lock(lock);
647         RETURN(lock);
648 }
649 EXPORT_SYMBOL(__ldlm_handle2lock);
650 /** @} ldlm_handles */
651
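/*
 * Round-trip sketch (added annotation): a lustre_handle is just the cookie,
 * so it can safely cross the wire; ldlm_handle2lock() resolves it back and
 * takes a reference that the caller must drop.
 */
#if 0
static void example_handle_roundtrip(struct ldlm_lock *lock)
{
        struct lustre_handle lockh;
        struct ldlm_lock *found;

        ldlm_lock2handle(lock, &lockh);   /* no reference taken */
        found = ldlm_handle2lock(&lockh); /* NULL if already destroyed */
        if (found != NULL)
                LDLM_LOCK_PUT(found);     /* drop the handle2lock reference */
}
#endif
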
652 /**
653  * Fill in "on the wire" representation for given LDLM lock into supplied
654  * lock descriptor \a desc structure.
655  */
656 void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
657 {
658         ldlm_res2desc(lock->l_resource, &desc->l_resource);
659         desc->l_req_mode = lock->l_req_mode;
660         desc->l_granted_mode = lock->l_granted_mode;
661         ldlm_convert_policy_to_wire(lock->l_resource->lr_type,
662                                     &lock->l_policy_data,
663                                     &desc->l_policy_data);
664 }
665 EXPORT_SYMBOL(ldlm_lock2desc);
666
667 /**
668  * Add a lock to list of conflicting locks to send AST to.
669  *
670  * Only add if we have not sent a blocking AST to the lock yet.
671  */
672 void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
673                            struct list_head *work_list)
674 {
675         if (!ldlm_is_ast_sent(lock)) {
676                 LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
677                 ldlm_set_ast_sent(lock);
678                 /* If the enqueuing client said so, tell the AST recipient to
679                  * discard dirty data, rather than writing back. */
680                 if (ldlm_is_ast_discard_data(new))
681                         ldlm_set_discard_data(lock);
682                 LASSERT(list_empty(&lock->l_bl_ast));
683                 list_add(&lock->l_bl_ast, work_list);
684                 LDLM_LOCK_GET(lock);
685                 LASSERT(lock->l_blocking_lock == NULL);
686                 lock->l_blocking_lock = LDLM_LOCK_GET(new);
687         }
688 }
689
690 /**
691  * Add a lock to list of just granted locks to send completion AST to.
692  */
693 void ldlm_add_cp_work_item(struct ldlm_lock *lock, struct list_head *work_list)
694 {
695         if (!ldlm_is_cp_reqd(lock)) {
696                 ldlm_set_cp_reqd(lock);
697                 LDLM_DEBUG(lock, "lock granted; sending completion AST.");
698                 LASSERT(list_empty(&lock->l_cp_ast));
699                 list_add(&lock->l_cp_ast, work_list);
700                 LDLM_LOCK_GET(lock);
701         }
702 }
703
704 /**
705  * Aggregator function to add AST work items into a list. Determines
706  * what sort of AST work needs to be done and calls the proper
707  * adding function.
708  * Must be called with lr_lock held.
709  */
710 void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
711                             struct list_head *work_list)
712 {
713         ENTRY;
714         check_res_locked(lock->l_resource);
715         if (new)
716                 ldlm_add_bl_work_item(lock, new, work_list);
717         else
718                 ldlm_add_cp_work_item(lock, work_list);
719         EXIT;
720 }
721
722 /**
723  * Add specified reader/writer reference to LDLM lock with handle \a lockh.
724  * r/w reference type is determined by \a mode
725  * Calls ldlm_lock_addref_internal.
726  */
727 void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
728 {
729         struct ldlm_lock *lock;
730
731         lock = ldlm_handle2lock(lockh);
732         LASSERT(lock != NULL);
733         ldlm_lock_addref_internal(lock, mode);
734         LDLM_LOCK_PUT(lock);
735 }
736 EXPORT_SYMBOL(ldlm_lock_addref);
737
738 /**
739  * Helper function.
740  * Add specified reader/writer reference to LDLM lock \a lock.
741  * r/w reference type is determined by \a mode
742  * Removes lock from LRU if it is there.
743  * Assumes the LDLM lock is already locked.
744  */
745 void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
746 {
747         ldlm_lock_remove_from_lru(lock);
748         if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
749                 lock->l_readers++;
750                 lu_ref_add_atomic(&lock->l_reference, "reader", lock);
751         }
752         if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
753                 lock->l_writers++;
754                 lu_ref_add_atomic(&lock->l_reference, "writer", lock);
755         }
756         LDLM_LOCK_GET(lock);
757         lu_ref_add_atomic(&lock->l_reference, "user", lock);
758         LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
759 }
760
761 /**
762  * Attempts to add reader/writer reference to a lock with handle \a lockh, and
763  * fails if lock is already LDLM_FL_CBPENDING or destroyed.
764  *
765  * \retval 0 success, lock was addref-ed
766  *
767  * \retval -EAGAIN lock is being canceled.
768  */
769 int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode)
770 {
771         struct ldlm_lock *lock;
772         int               result;
773
774         result = -EAGAIN;
775         lock = ldlm_handle2lock(lockh);
776         if (lock != NULL) {
777                 lock_res_and_lock(lock);
778                 if (lock->l_readers != 0 || lock->l_writers != 0 ||
779                     !ldlm_is_cbpending(lock)) {
780                         ldlm_lock_addref_internal_nolock(lock, mode);
781                         result = 0;
782                 }
783                 unlock_res_and_lock(lock);
784                 LDLM_LOCK_PUT(lock);
785         }
786         return result;
787 }
788 EXPORT_SYMBOL(ldlm_lock_addref_try);
789
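/*
 * Reference sketch (added annotation): reader/writer references mark a lock
 * as in use and keep it off the LRU; each addref must be paired with a
 * decref of the same mode.
 */
#if 0
static void example_rw_reference(struct lustre_handle *lockh)
{
        ldlm_lock_addref(lockh, LCK_PR);  /* take a reader reference */
        /* ... access data under the protection of the PR lock ... */
        ldlm_lock_decref(lockh, LCK_PR);  /* last decref may LRU the lock */
}
#endif
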
790 /**
791  * Add specified reader/writer reference to LDLM lock \a lock.
792  * Locks LDLM lock and calls ldlm_lock_addref_internal_nolock to do the work.
793  * Only called for local locks.
794  */
795 void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
796 {
797         lock_res_and_lock(lock);
798         ldlm_lock_addref_internal_nolock(lock, mode);
799         unlock_res_and_lock(lock);
800 }
801
802 /**
803  * Removes reader/writer reference for LDLM lock \a lock.
804  * Assumes LDLM lock is already locked.
805  * only called in ldlm_flock_destroy and for local locks.
806  * Does NOT add the lock to the LRU if no r/w references are left, to
807  * accommodate flock locks that cannot be placed in the LRU.
808  */
809 void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
810 {
811         LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
812         if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
813                 LASSERT(lock->l_readers > 0);
814                 lu_ref_del(&lock->l_reference, "reader", lock);
815                 lock->l_readers--;
816         }
817         if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
818                 LASSERT(lock->l_writers > 0);
819                 lu_ref_del(&lock->l_reference, "writer", lock);
820                 lock->l_writers--;
821         }
822
823         lu_ref_del(&lock->l_reference, "user", lock);
824         LDLM_LOCK_RELEASE(lock);    /* matches the LDLM_LOCK_GET() in addref */
825 }
826
827 /**
828  * Removes reader/writer reference for LDLM lock \a lock.
829  * Locks LDLM lock first.
830  * If this is a client-side lock and its r/w refcount drops to zero while
831  * the lock is not blocked, the lock is added to the LRU list
832  * of the namespace.
833  * For blocked LDLM locks, blocking_ast is called when the r/w count drops to zero.
834  */
835 void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
836 {
837         struct ldlm_namespace *ns;
838         ENTRY;
839
840         lock_res_and_lock(lock);
841
842         ns = ldlm_lock_to_ns(lock);
843
844         ldlm_lock_decref_internal_nolock(lock, mode);
845
846         if (ldlm_is_local(lock) &&
847             !lock->l_readers && !lock->l_writers) {
848                 /* If this is a local lock on a server namespace and this was
849                  * the last reference, cancel the lock. */
850                 CDEBUG(D_INFO, "forcing cancel of local lock\n");
851                 ldlm_set_cbpending(lock);
852         }
853
854         if (!lock->l_readers && !lock->l_writers &&
855             ldlm_is_cbpending(lock)) {
856                 /* If we received a blocking AST and this was the last
857                  * reference, run the callback. */
858                 if (ldlm_is_ns_srv(lock) && lock->l_export)
859                         CERROR("FL_CBPENDING set on non-local lock--just a "
860                                "warning\n");
861
862                 LDLM_DEBUG(lock, "final decref done on cbpending lock");
863
864                 LDLM_LOCK_GET(lock); /* dropped by bl thread */
865                 ldlm_lock_remove_from_lru(lock);
866                 unlock_res_and_lock(lock);
867
868                 if (ldlm_is_fail_loc(lock))
869                         OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
870
871                 if (ldlm_is_atomic_cb(lock) ||
872                     ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
873                         ldlm_handle_bl_callback(ns, NULL, lock);
874         } else if (ns_is_client(ns) &&
875                    !lock->l_readers && !lock->l_writers &&
876                    !ldlm_is_no_lru(lock) &&
877                    !ldlm_is_bl_ast(lock)) {
878
879                 LDLM_DEBUG(lock, "add lock into lru list");
880
881                 /* If this is a client-side namespace and this was the last
882                  * reference, put it on the LRU. */
883                 ldlm_lock_add_to_lru(lock);
884                 unlock_res_and_lock(lock);
885
886                 if (ldlm_is_fail_loc(lock))
887                         OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
888
889                 /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
890                  * are not supported by the server; otherwise it is done on
891                  * enqueue. */
892                 if (!exp_connect_cancelset(lock->l_conn_export) &&
893                     !ns_connect_lru_resize(ns))
894                         ldlm_cancel_lru(ns, 0, LCF_ASYNC, 0);
895         } else {
896                 LDLM_DEBUG(lock, "do not add lock into lru list");
897                 unlock_res_and_lock(lock);
898         }
899
900         EXIT;
901 }
902
903 /**
904  * Decrease reader/writer refcount for LDLM lock with handle \a lockh
905  */
906 void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
907 {
908         struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
909         LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
910         ldlm_lock_decref_internal(lock, mode);
911         LDLM_LOCK_PUT(lock);
912 }
913 EXPORT_SYMBOL(ldlm_lock_decref);
914
915 /**
916  * Decrease reader/writer refcount for LDLM lock with handle
917  * \a lockh and mark it for subsequent cancellation once r/w refcount
918  * drops to zero, instead of putting it on the LRU.
919  *
920  * Typical usage is for GROUP locks which we cannot allow to be cached.
921  */
922 void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
923 {
924         struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
925         ENTRY;
926
927         LASSERT(lock != NULL);
928
929         LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
930         lock_res_and_lock(lock);
931         ldlm_set_cbpending(lock);
932         unlock_res_and_lock(lock);
933         ldlm_lock_decref_internal(lock, mode);
934         LDLM_LOCK_PUT(lock);
935 }
936 EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
937
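/*
 * Sketch (added annotation): GROUP locks must not be cached, so their users
 * drop them with decref-and-cancel rather than a plain decref.
 */
#if 0
static void example_group_unlock(struct lustre_handle *lockh)
{
        /* Sets LDLM_FL_CBPENDING first, so the final decref cancels the
         * lock instead of parking it on the LRU. */
        ldlm_lock_decref_and_cancel(lockh, LCK_GROUP);
}
#endif
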
938 struct sl_insert_point {
939         struct list_head *res_link;
940         struct list_head *mode_link;
941         struct list_head *policy_link;
942 };
943
944 /**
945  * Finds a position to insert the new lock into granted lock list.
946  *
947  * Used for locks eligible for skiplist optimization.
948  *
949  * Parameters:
950  *      queue [input]:  the granted list to search;
951  *      req [input]:    the lock whose position is to be located;
952  *      prev [output]:  positions within the 3 lists at which to insert @req
953  * Return Value:
954  *      filled @prev
955  * NOTE: called by
956  *  - ldlm_grant_lock_with_skiplist
957  */
958 static void search_granted_lock(struct list_head *queue,
959                                 struct ldlm_lock *req,
960                                 struct sl_insert_point *prev)
961 {
962         struct list_head *tmp;
963         struct ldlm_lock *lock, *mode_end, *policy_end;
964         ENTRY;
965
966         list_for_each(tmp, queue) {
967                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
968
969                 mode_end = list_entry(lock->l_sl_mode.prev,
970                                           struct ldlm_lock, l_sl_mode);
971
972                 if (lock->l_req_mode != req->l_req_mode) {
973                         /* jump to last lock of mode group */
974                         tmp = &mode_end->l_res_link;
975                         continue;
976                 }
977
978                 /* suitable mode group is found */
979                 if (lock->l_resource->lr_type == LDLM_PLAIN) {
980                         /* insert point is last lock of the mode group */
981                         prev->res_link = &mode_end->l_res_link;
982                         prev->mode_link = &mode_end->l_sl_mode;
983                         prev->policy_link = &req->l_sl_policy;
984                         EXIT;
985                         return;
986                 } else if (lock->l_resource->lr_type == LDLM_IBITS) {
987                         for (;;) {
988                                 policy_end =
989                                         list_entry(lock->l_sl_policy.prev,
990                                                        struct ldlm_lock,
991                                                        l_sl_policy);
992
993                                 if (lock->l_policy_data.l_inodebits.bits ==
994                                     req->l_policy_data.l_inodebits.bits) {
995                                         /* insert point is last lock of
996                                          * the policy group */
997                                         prev->res_link =
998                                                 &policy_end->l_res_link;
999                                         prev->mode_link =
1000                                                 &policy_end->l_sl_mode;
1001                                         prev->policy_link =
1002                                                 &policy_end->l_sl_policy;
1003                                         EXIT;
1004                                         return;
1005                                 }
1006
1007                                 if (policy_end == mode_end)
1008                                         /* done with mode group */
1009                                         break;
1010
1011                                 /* go to next policy group within mode group */
1012                                 tmp = policy_end->l_res_link.next;
1013                                 lock = list_entry(tmp, struct ldlm_lock,
1014                                                       l_res_link);
1015                         }  /* loop over policy groups within the mode group */
1016
1017                         /* insert point is last lock of the mode group,
1018                          * new policy group is started */
1019                         prev->res_link = &mode_end->l_res_link;
1020                         prev->mode_link = &mode_end->l_sl_mode;
1021                         prev->policy_link = &req->l_sl_policy;
1022                         EXIT;
1023                         return;
1024                 } else {
1025                         LDLM_ERROR(lock, "is not LDLM_PLAIN or LDLM_IBITS lock");
1026                         LBUG();
1027                 }
1028         }
1029
1030         /* insert point is last lock on the queue,
1031          * new mode group and new policy group are started */
1032         prev->res_link = queue->prev;
1033         prev->mode_link = &req->l_sl_mode;
1034         prev->policy_link = &req->l_sl_policy;
1035         EXIT;
1036         return;
1037 }
1038
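/*
 * Illustrative picture (added annotation): the granted list is kept as
 * consecutive mode groups, and IBITS mode groups are further subdivided
 * into policy groups.  l_sl_mode chains the members of a mode group, so
 * from the first member the search above jumps straight to the last one
 * (l_sl_mode.prev) and skips everything in between, roughly:
 *
 *   lr_granted:  [PR][PR][PR]  [PW][PW]  [EX]
 *   l_sl_mode:    ^--------^    ^----^
 */
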
1039 /**
1040  * Add a lock into resource granted list after a position described by
1041  * \a prev.
1042  */
1043 static void ldlm_granted_list_add_lock(struct ldlm_lock *lock,
1044                                        struct sl_insert_point *prev)
1045 {
1046         struct ldlm_resource *res = lock->l_resource;
1047         ENTRY;
1048
1049         check_res_locked(res);
1050
1051         ldlm_resource_dump(D_INFO, res);
1052         LDLM_DEBUG(lock, "About to add lock:");
1053
1054         if (ldlm_is_destroyed(lock)) {
1055                 CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
1056                 return;
1057         }
1058
1059         LASSERT(list_empty(&lock->l_res_link));
1060         LASSERT(list_empty(&lock->l_sl_mode));
1061         LASSERT(list_empty(&lock->l_sl_policy));
1062
1063         /*
1064          * lock->link == prev->link means the lock is the first one in its
1065          * group; don't re-add it to itself, to suppress kernel warnings.
1066          */
1067         if (&lock->l_res_link != prev->res_link)
1068                 list_add(&lock->l_res_link, prev->res_link);
1069         if (&lock->l_sl_mode != prev->mode_link)
1070                 list_add(&lock->l_sl_mode, prev->mode_link);
1071         if (&lock->l_sl_policy != prev->policy_link)
1072                 list_add(&lock->l_sl_policy, prev->policy_link);
1073
1074         EXIT;
1075 }
1076
1077 /**
1078  * Add a lock to granted list on a resource maintaining skiplist
1079  * correctness.
1080  */
1081 static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
1082 {
1083         struct sl_insert_point prev;
1084         ENTRY;
1085
1086         LASSERT(lock->l_req_mode == lock->l_granted_mode);
1087
1088         search_granted_lock(&lock->l_resource->lr_granted, lock, &prev);
1089         ldlm_granted_list_add_lock(lock, &prev);
1090         EXIT;
1091 }
1092
1093 /**
1094  * Perform lock granting bookkeeping.
1095  *
1096  * Includes putting the lock into granted list and updating lock mode.
1097  * NOTE: called by
1098  *  - ldlm_lock_enqueue
1099  *  - ldlm_reprocess_queue
1100  *  - ldlm_lock_convert
1101  *
1102  * must be called with lr_lock held
1103  */
1104 void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
1105 {
1106         struct ldlm_resource *res = lock->l_resource;
1107         ENTRY;
1108
1109         check_res_locked(res);
1110
1111         lock->l_granted_mode = lock->l_req_mode;
1112
1113         if (work_list && lock->l_completion_ast != NULL)
1114                 ldlm_add_ast_work_item(lock, NULL, work_list);
1115
1116         /* We should not add locks to granted list in the following cases:
1117          * - this is an UNLOCK but not a real lock;
1118          * - this is a TEST lock;
1119          * - this is a F_CANCELLK lock (async flock has req_mode == 0)
1120          * - this is a deadlock (flock cannot be granted) */
1121         if (lock->l_req_mode == 0 ||
1122             lock->l_req_mode == LCK_NL ||
1123             ldlm_is_test_lock(lock) ||
1124             ldlm_is_flock_deadlock(lock))
1125                 RETURN_EXIT;
1126
1127         if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
1128                 ldlm_grant_lock_with_skiplist(lock);
1129         else if (res->lr_type == LDLM_EXTENT)
1130                 ldlm_extent_add_lock(res, lock);
1131         else
1132                 ldlm_resource_add_lock(res, &res->lr_granted, lock);
1133
1134         if (lock->l_granted_mode < res->lr_most_restr)
1135                 res->lr_most_restr = lock->l_granted_mode;
1136
1137         ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
1138         EXIT;
1139 }
1140
1141 /**
1142  * Search for a lock with given properties in a queue.
1143  *
1144  * \retval a referenced lock or NULL.  See the flag descriptions in the
1145  * comment preceding ldlm_lock_match().
1146  */
1147 static struct ldlm_lock *search_queue(struct list_head *queue,
1148                                       ldlm_mode_t *mode,
1149                                       ldlm_policy_data_t *policy,
1150                                       struct ldlm_lock *old_lock,
1151                                       __u64 flags, int unref)
1152 {
1153         struct ldlm_lock *lock;
1154         struct list_head       *tmp;
1155
1156         list_for_each(tmp, queue) {
1157                 ldlm_mode_t match;
1158
1159                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
1160
1161                 if (lock == old_lock)
1162                         break;
1163
1164                 /* Check if this lock can be matched.
1165                  * Used by LU-2919 (exclusive open) for the open lease lock. */
1166                 if (ldlm_is_excl(lock))
1167                         continue;
1168
1169                 /* llite sometimes wants to match locks that will be
1170                  * canceled when their users drop, but we allow it to match
1171                  * if it passes in CBPENDING and the lock still has users.
1172                  * this is generally only going to be used by children
1173                  * whose parents already hold a lock so forward progress
1174                  * can still happen. */
1175                 if (ldlm_is_cbpending(lock) &&
1176                     !(flags & LDLM_FL_CBPENDING))
1177                         continue;
1178                 if (!unref && ldlm_is_cbpending(lock) &&
1179                     lock->l_readers == 0 && lock->l_writers == 0)
1180                         continue;
1181
1182                 if (!(lock->l_req_mode & *mode))
1183                         continue;
1184                 match = lock->l_req_mode;
1185
1186                 if (lock->l_resource->lr_type == LDLM_EXTENT &&
1187                     (lock->l_policy_data.l_extent.start >
1188                      policy->l_extent.start ||
1189                      lock->l_policy_data.l_extent.end < policy->l_extent.end))
1190                         continue;
1191
1192                 if (unlikely(match == LCK_GROUP) &&
1193                     lock->l_resource->lr_type == LDLM_EXTENT &&
1194                     lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
1195                         continue;
1196
1197                 /* We match if we have an existing lock with the same or a
1198                  * wider set of bits. */
1199                 if (lock->l_resource->lr_type == LDLM_IBITS &&
1200                      ((lock->l_policy_data.l_inodebits.bits &
1201                       policy->l_inodebits.bits) !=
1202                       policy->l_inodebits.bits))
1203                         continue;
1204
1205                 if (!unref && LDLM_HAVE_MASK(lock, GONE))
1206                         continue;
1207
1208                 if ((flags & LDLM_FL_LOCAL_ONLY) &&
1209                     !ldlm_is_local(lock))
1210                         continue;
1211
1212                 if (flags & LDLM_FL_TEST_LOCK) {
1213                         LDLM_LOCK_GET(lock);
1214                         ldlm_lock_touch_in_lru(lock);
1215                 } else {
1216                         ldlm_lock_addref_internal_nolock(lock, match);
1217                 }
1218                 *mode = match;
1219                 return lock;
1220         }
1221
1222         return NULL;
1223 }
1224
1225 void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
1226 {
1227         if ((lock->l_flags & LDLM_FL_FAIL_NOTIFIED) == 0) {
1228                 lock->l_flags |= LDLM_FL_FAIL_NOTIFIED;
1229                 wake_up_all(&lock->l_waitq);
1230         }
1231 }
1232 EXPORT_SYMBOL(ldlm_lock_fail_match_locked);
1233
1234 void ldlm_lock_fail_match(struct ldlm_lock *lock)
1235 {
1236         lock_res_and_lock(lock);
1237         ldlm_lock_fail_match_locked(lock);
1238         unlock_res_and_lock(lock);
1239 }
1240 EXPORT_SYMBOL(ldlm_lock_fail_match);
1241
1242 /**
1243  * Mark lock as "matchable" by OST.
1244  *
1245  * Used to prevent certain races in LOV/OSC where the lock is granted, but LVB
1246  * is not yet valid.
1247  * Assumes LDLM lock is already locked.
1248  */
1249 void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
1250 {
1251         ldlm_set_lvb_ready(lock);
1252         wake_up_all(&lock->l_waitq);
1253 }
1254 EXPORT_SYMBOL(ldlm_lock_allow_match_locked);
1255
1256 /**
1257  * Mark lock as "matchable" by OST.
1258  * Locks the lock and then \see ldlm_lock_allow_match_locked
1259  */
1260 void ldlm_lock_allow_match(struct ldlm_lock *lock)
1261 {
1262         lock_res_and_lock(lock);
1263         ldlm_lock_allow_match_locked(lock);
1264         unlock_res_and_lock(lock);
1265 }
1266 EXPORT_SYMBOL(ldlm_lock_allow_match);
1267
1268 /**
1269  * Attempt to find a lock with specified properties.
1270  *
1271  * Typically returns a reference to the matched lock unless LDLM_FL_TEST_LOCK is
1272  * set in \a flags
1273  *
1274  * Can be called in two ways:
1275  *
1276  * If 'ns' is NULL, then lockh describes an existing lock that we want to look
1277  * for a duplicate of.
1278  *
1279  * Otherwise, all of the fields must be filled in, to match against.
1280  *
1281  * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
1282  *     server (i.e., connh is NULL)
1283  * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
1284  *     list will be considered
1285  * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
1286  *     to be canceled can still be matched as long as they still have reader
1287  *     or writer references
1288  * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
1289  *     just tell us if we would have matched.
1290  *
1291  * \retval 1 if it finds an already-existing lock that is compatible; in this
1292  * case, lockh is filled in with an addref()ed lock.
1293  *
1294  * We also check the security context; if that fails we simply return 0 (to
1295  * keep caller code unchanged), and the context failure will be discovered
1296  * by the caller sometime later.
1297  */
1298 ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, __u64 flags,
1299                             const struct ldlm_res_id *res_id, ldlm_type_t type,
1300                             ldlm_policy_data_t *policy, ldlm_mode_t mode,
1301                             struct lustre_handle *lockh, int unref)
1302 {
1303         struct ldlm_resource *res;
1304         struct ldlm_lock *lock, *old_lock = NULL;
1305         int rc = 0;
1306         ENTRY;
1307
1308         if (ns == NULL) {
1309                 old_lock = ldlm_handle2lock(lockh);
1310                 LASSERT(old_lock);
1311
1312                 ns = ldlm_lock_to_ns(old_lock);
1313                 res_id = &old_lock->l_resource->lr_name;
1314                 type = old_lock->l_resource->lr_type;
1315                 mode = old_lock->l_req_mode;
1316         }
1317
1318         res = ldlm_resource_get(ns, NULL, res_id, type, 0);
1319         if (IS_ERR(res)) {
1320                 LASSERT(old_lock == NULL);
1321                 RETURN(0);
1322         }
1323
1324         LDLM_RESOURCE_ADDREF(res);
1325         lock_res(res);
1326
1327         lock = search_queue(&res->lr_granted, &mode, policy, old_lock,
1328                             flags, unref);
1329         if (lock != NULL)
1330                 GOTO(out, rc = 1);
1331         if (flags & LDLM_FL_BLOCK_GRANTED)
1332                 GOTO(out, rc = 0);
1333         lock = search_queue(&res->lr_converting, &mode, policy, old_lock,
1334                             flags, unref);
1335         if (lock != NULL)
1336                 GOTO(out, rc = 1);
1337         lock = search_queue(&res->lr_waiting, &mode, policy, old_lock,
1338                             flags, unref);
1339         if (lock != NULL)
1340                 GOTO(out, rc = 1);
1341
1342         EXIT;
1343  out:
1344         unlock_res(res);
1345         LDLM_RESOURCE_DELREF(res);
1346         ldlm_resource_putref(res);
1347
1348         if (lock) {
1349                 ldlm_lock2handle(lock, lockh);
1350                 if ((flags & LDLM_FL_LVB_READY) &&
1351                     (!ldlm_is_lvb_ready(lock))) {
1352                         __u64 wait_flags = LDLM_FL_LVB_READY |
1353                                 LDLM_FL_DESTROYED | LDLM_FL_FAIL_NOTIFIED;
1354                         struct l_wait_info lwi;
1355                         if (lock->l_completion_ast) {
1356                                 int err = lock->l_completion_ast(lock,
1357                                                           LDLM_FL_WAIT_NOREPROC,
1358                                                                  NULL);
1359                                 if (err) {
1360                                         if (flags & LDLM_FL_TEST_LOCK)
1361                                                 LDLM_LOCK_RELEASE(lock);
1362                                         else
1363                                                 ldlm_lock_decref_internal(lock,
1364                                                                           mode);
1365                                         rc = 0;
1366                                         goto out2;
1367                                 }
1368                         }
1369
1370                         lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
1371                                                NULL, LWI_ON_SIGNAL_NOOP, NULL);
1372
1373                         /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
1374                         l_wait_event(lock->l_waitq,
1375                                      lock->l_flags & wait_flags,
1376                                      &lwi);
1377                         if (!ldlm_is_lvb_ready(lock)) {
1378                                 if (flags & LDLM_FL_TEST_LOCK)
1379                                         LDLM_LOCK_RELEASE(lock);
1380                                 else
1381                                         ldlm_lock_decref_internal(lock, mode);
1382                                 rc = 0;
1383                         }
1384                 }
1385         }
1386  out2:
1387         if (rc) {
1388                 LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
1389                            (type == LDLM_PLAIN || type == LDLM_IBITS) ?
1390                                 res_id->name[2] : policy->l_extent.start,
1391                            (type == LDLM_PLAIN || type == LDLM_IBITS) ?
1392                                 res_id->name[3] : policy->l_extent.end);
1393
1394                 /* check user's security context */
1395                 if (lock->l_conn_export &&
1396                     sptlrpc_import_check_ctx(
1397                                 class_exp2cliimp(lock->l_conn_export))) {
1398                         if (!(flags & LDLM_FL_TEST_LOCK))
1399                                 ldlm_lock_decref_internal(lock, mode);
1400                         rc = 0;
1401                 }
1402
1403                 if (flags & LDLM_FL_TEST_LOCK)
1404                         LDLM_LOCK_RELEASE(lock);
1405
1406         } else if (!(flags & LDLM_FL_TEST_LOCK)) {/*less verbose for test-only*/
1407                 LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
1408                                   LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
1409                                   type, mode, res_id->name[0], res_id->name[1],
1410                                   (type == LDLM_PLAIN || type == LDLM_IBITS) ?
1411                                         res_id->name[2] :policy->l_extent.start,
1412                                   (type == LDLM_PLAIN || type == LDLM_IBITS) ?
1413                                         res_id->name[3] : policy->l_extent.end);
1414         }
1415         if (old_lock)
1416                 LDLM_LOCK_PUT(old_lock);
1417
1418         return rc ? mode : 0;
1419 }
1420 EXPORT_SYMBOL(ldlm_lock_match);
1421
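/*
 * Usage sketch (added annotation; the resource id and extent values are
 * hypothetical): probe for a cached PR or PW extent lock covering a whole
 * object without taking a reference on it.
 */
#if 0
static int example_match(struct ldlm_namespace *ns,
                         const struct ldlm_res_id *res_id)
{
        ldlm_policy_data_t policy = {
                .l_extent = { .start = 0, .end = OBD_OBJECT_EOF },
        };
        struct lustre_handle lockh;
        ldlm_mode_t mode;

        mode = ldlm_lock_match(ns, LDLM_FL_TEST_LOCK, res_id, LDLM_EXTENT,
                               &policy, LCK_PR | LCK_PW, &lockh, 0);
        return mode != 0; /* a nonzero mode means a compatible lock exists */
}
#endif
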
1422 ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh,
1423                                         __u64 *bits)
1424 {
1425         struct ldlm_lock *lock;
1426         ldlm_mode_t mode = 0;
1427         ENTRY;
1428
1429         lock = ldlm_handle2lock(lockh);
1430         if (lock != NULL) {
1431                 lock_res_and_lock(lock);
1432                 if (LDLM_HAVE_MASK(lock, GONE))
1433                         GOTO(out, mode);
1434
1435                 if (ldlm_is_cbpending(lock) &&
1436                     lock->l_readers == 0 && lock->l_writers == 0)
1437                         GOTO(out, mode);
1438
1439                 if (bits)
1440                         *bits = lock->l_policy_data.l_inodebits.bits;
1441                 mode = lock->l_granted_mode;
1442                 ldlm_lock_addref_internal_nolock(lock, mode);
1443         }
1444
1445         EXIT;
1446
1447 out:
1448         if (lock != NULL) {
1449                 unlock_res_and_lock(lock);
1450                 LDLM_LOCK_PUT(lock);
1451         }
1452         return mode;
1453 }
1454 EXPORT_SYMBOL(ldlm_revalidate_lock_handle);
1455
1456 /** The caller must guarantee that the buffer is large enough. */
1457 int ldlm_fill_lvb(struct ldlm_lock *lock, struct req_capsule *pill,
1458                   enum req_location loc, void *data, int size)
1459 {
1460         void *lvb;
1461         ENTRY;
1462
1463         LASSERT(data != NULL);
1464         LASSERT(size >= 0);
1465
1466         switch (lock->l_lvb_type) {
1467         case LVB_T_OST:
1468                 if (size == sizeof(struct ost_lvb)) {
1469                         if (loc == RCL_CLIENT)
1470                                 lvb = req_capsule_client_swab_get(pill,
1471                                                 &RMF_DLM_LVB,
1472                                                 lustre_swab_ost_lvb);
1473                         else
1474                                 lvb = req_capsule_server_swab_get(pill,
1475                                                 &RMF_DLM_LVB,
1476                                                 lustre_swab_ost_lvb);
1477                         if (unlikely(lvb == NULL)) {
1478                                 LDLM_ERROR(lock, "no LVB");
1479                                 RETURN(-EPROTO);
1480                         }
1481
1482                         memcpy(data, lvb, size);
1483                 } else if (size == sizeof(struct ost_lvb_v1)) {
1484                         struct ost_lvb *olvb = data;
1485
1486                         if (loc == RCL_CLIENT)
1487                                 lvb = req_capsule_client_swab_get(pill,
1488                                                 &RMF_DLM_LVB,
1489                                                 lustre_swab_ost_lvb_v1);
1490                         else
1491                                 lvb = req_capsule_server_sized_swab_get(pill,
1492                                                 &RMF_DLM_LVB, size,
1493                                                 lustre_swab_ost_lvb_v1);
1494                         if (unlikely(lvb == NULL)) {
1495                                 LDLM_ERROR(lock, "no LVB");
1496                                 RETURN(-EPROTO);
1497                         }
1498
1499                         memcpy(data, lvb, size);
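                             /* The v1 LVB predates the nanosecond time fields,
                              * so the reply cannot supply them; clear them in
                              * the caller's (current-layout) buffer. */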
1500                         olvb->lvb_mtime_ns = 0;
1501                         olvb->lvb_atime_ns = 0;
1502                         olvb->lvb_ctime_ns = 0;
1503                 } else {
1504                         LDLM_ERROR(lock, "Unexpected ost LVB size %d in reply",
1505                                    size);
1506                         RETURN(-EINVAL);
1507                 }
1508                 break;
1509         case LVB_T_LQUOTA:
1510                 if (size == sizeof(struct lquota_lvb)) {
1511                         if (loc == RCL_CLIENT)
1512                                 lvb = req_capsule_client_swab_get(pill,
1513                                                 &RMF_DLM_LVB,
1514                                                 lustre_swab_lquota_lvb);
1515                         else
1516                                 lvb = req_capsule_server_swab_get(pill,
1517                                                 &RMF_DLM_LVB,
1518                                                 lustre_swab_lquota_lvb);
1519                         if (unlikely(lvb == NULL)) {
1520                                 LDLM_ERROR(lock, "no LVB");
1521                                 RETURN(-EPROTO);
1522                         }
1523
1524                         memcpy(data, lvb, size);
1525                 } else {
1526                         LDLM_ERROR(lock, "Unexpected lquota LVB size %d in reply",
1527                                    size);
1528                         RETURN(-EINVAL);
1529                 }
1530                 break;
1531         case LVB_T_LAYOUT:
1532                 if (size == 0)
1533                         break;
1534
1535                 if (loc == RCL_CLIENT)
1536                         lvb = req_capsule_client_get(pill, &RMF_DLM_LVB);
1537                 else
1538                         lvb = req_capsule_server_get(pill, &RMF_DLM_LVB);
1539                 if (unlikely(lvb == NULL)) {
1540                         LDLM_ERROR(lock, "no LVB");
1541                         RETURN(-EPROTO);
1542                 }
1543
1544                 memcpy(data, lvb, size);
1545                 break;
1546         default:
1547                 LDLM_ERROR(lock, "Unknown LVB type: %d", lock->l_lvb_type);
1548                 libcfs_debug_dumpstack(NULL);
1549                 RETURN(-EINVAL);
1550         }
1551
1552         RETURN(0);
1553 }
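
     /*
      * Example (illustrative sketch): on the client, an LVB carried in an
      * enqueue reply would typically be unpacked along these lines; the
      * size argument selects between the layouts handled above.
      *
      *      struct ost_lvb lvb;
      *      int rc;
      *
      *      rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_CLIENT,
      *                         &lvb, sizeof(lvb));
      *      if (rc == 0)
      *              ... apply lvb.lvb_size and friends to cached state ...
      */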
1554
1555 /**
1556  * Create and fill in new LDLM lock with specified properties.
1557  * Returns a referenced lock on success, or an ERR_PTR on failure.
1558  */
1559 struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
1560                                    const struct ldlm_res_id *res_id,
1561                                    ldlm_type_t type,
1562                                    ldlm_mode_t mode,
1563                                    const struct ldlm_callback_suite *cbs,
1564                                    void *data, __u32 lvb_len,
1565                                    enum lvb_type lvb_type)
1566 {
1567         struct ldlm_lock        *lock;
1568         struct ldlm_resource    *res;
1569         int                     rc;
1570         ENTRY;
1571
1572         res = ldlm_resource_get(ns, NULL, res_id, type, 1);
1573         if (IS_ERR(res))
1574                 RETURN(ERR_CAST(res));
1575
1576         lock = ldlm_lock_new(res);
1577         if (lock == NULL)
1578                 RETURN(ERR_PTR(-ENOMEM));
1579
1580         lock->l_req_mode = mode;
1581         lock->l_ast_data = data;
1582         lock->l_pid = current_pid();
1583         if (ns_is_server(ns))
1584                 ldlm_set_ns_srv(lock);
1585         if (cbs) {
1586                 lock->l_blocking_ast = cbs->lcs_blocking;
1587                 lock->l_completion_ast = cbs->lcs_completion;
1588                 lock->l_glimpse_ast = cbs->lcs_glimpse;
1589         }
1590
1591         lock->l_tree_node = NULL;
1592         /* if this is an extent lock, allocate the interval tree node */
1593         if (type == LDLM_EXTENT)
1594                 if (ldlm_interval_alloc(lock) == NULL)
1595                         GOTO(out, rc = -ENOMEM);
1596
1597         if (lvb_len) {
1598                 lock->l_lvb_len = lvb_len;
1599                 OBD_ALLOC_LARGE(lock->l_lvb_data, lvb_len);
1600                 if (lock->l_lvb_data == NULL)
1601                         GOTO(out, rc = -ENOMEM);
1602         }
1603
1604         lock->l_lvb_type = lvb_type;
1605         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK))
1606                 GOTO(out, rc = -ENOENT);
1607
1608         RETURN(lock);
1609
1610 out:
1611         ldlm_lock_destroy(lock);
1612         LDLM_LOCK_RELEASE(lock);
1613         RETURN(ERR_PTR(rc));
1614 }
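
     /*
      * Example (illustrative sketch): a typical create/enqueue pairing.
      * my_blocking_ast() is a hypothetical callback; ldlm_completion_ast()
      * is the stock completion handler provided by this subsystem.
      *
      *      const struct ldlm_callback_suite cbs = {
      *              .lcs_blocking   = my_blocking_ast,
      *              .lcs_completion = ldlm_completion_ast,
      *      };
      *      struct ldlm_lock *lock;
      *
      *      lock = ldlm_lock_create(ns, res_id, LDLM_PLAIN, LCK_EX, &cbs,
      *                              NULL, 0, LVB_T_NONE);
      *      if (IS_ERR(lock))
      *              RETURN(PTR_ERR(lock));
      */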
1615
1616 /**
1617  * Enqueue (request) a lock.
1618  *
1619  * Does not block. As a result of the enqueue the lock is put on the
1620  * granted or waiting list.
1621  *
1622  * If the namespace has an intent policy set and the lock has the
1623  * LDLM_FL_HAS_INTENT flag set, skip all the enqueueing and delegate
1624  * lock processing to the intent policy function.
1625  */
1626 ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
1627                                struct ldlm_lock **lockp,
1628                                void *cookie, __u64 *flags)
1629 {
1630         struct ldlm_lock *lock = *lockp;
1631         struct ldlm_resource *res = lock->l_resource;
1632         int local = ns_is_client(ldlm_res_to_ns(res));
1633 #ifdef HAVE_SERVER_SUPPORT
1634         ldlm_processing_policy policy;
1635 #endif
1636         ldlm_error_t rc = ELDLM_OK;
1637         struct ldlm_interval *node = NULL;
1638         ENTRY;
1639
1640         lock->l_last_activity = cfs_time_current_sec();
1641         /* policies are not executed on the client or during replay */
1642         if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
1643             && !local && ns->ns_policy) {
1644                 rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
1645                                    NULL);
1646                 if (rc == ELDLM_LOCK_REPLACED) {
1647                         /* The lock that was returned has already been granted,
1648                          * and placed into lockp.  If it's not the same as the
1649                          * one we passed in, then destroy the old one and our
1650                          * work here is done. */
1651                         if (lock != *lockp) {
1652                                 ldlm_lock_destroy(lock);
1653                                 LDLM_LOCK_RELEASE(lock);
1654                         }
1655                         *flags |= LDLM_FL_LOCK_CHANGED;
1656                         RETURN(0);
1657                 } else if (rc != ELDLM_OK ||
1658                            (rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
1659                         ldlm_lock_destroy(lock);
1660                         RETURN(rc);
1661                 }
1662         }
1663
1664         if (*flags & LDLM_FL_RESENT)
1665                 RETURN(ELDLM_OK);
1666
1667         /* For a replaying lock, it might already be in the granted list, so
1668          * unlinking it would free its interval node.  Allocate the interval
1669          * node early; otherwise this lock cannot be regranted in the
1670          * future. - jay */
1671         if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
1672                 OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
1673
1674         lock_res_and_lock(lock);
1675         if (local && lock->l_req_mode == lock->l_granted_mode) {
1676                 /* The server returned a blocked lock, but it was granted
1677                  * before we got a chance to actually enqueue it.  We don't
1678                  * need to do anything else. */
1679                 *flags &= ~(LDLM_FL_BLOCK_GRANTED |
1680                             LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
1681                 GOTO(out, rc = ELDLM_OK);
1682         }
1683
1684         ldlm_resource_unlink_lock(lock);
1685         if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
1686                 if (node == NULL) {
1687                         ldlm_lock_destroy_nolock(lock);
1688                         GOTO(out, rc = -ENOMEM);
1689                 }
1690
1691                 INIT_LIST_HEAD(&node->li_group);
1692                 ldlm_interval_attach(node, lock);
1693                 node = NULL;
1694         }
1695
1696         /* Some flags from the enqueue want to make it into the AST, via the
1697          * lock's l_flags. */
1698         if (*flags & LDLM_FL_AST_DISCARD_DATA)
1699                 ldlm_set_ast_discard_data(lock);
1700         if (*flags & LDLM_FL_TEST_LOCK)
1701                 ldlm_set_test_lock(lock);
1702
1703         /* This distinction between local lock trees is very important; a client
1704          * namespace only has information about locks taken by that client, and
1705          * thus doesn't have enough information to decide for itself if it can
1706          * be granted (below).  In this case, we do exactly what the server
1707          * tells us to do, as dictated by the 'flags'.
1708          *
1709          * We do exactly the same thing during recovery, when the server is
1710          * more or less trusting the clients not to lie.
1711          *
1712          * FIXME (bug 268): Detect obvious lies by checking compatibility in
1713          * granted/converting queues. */
1714         if (local) {
1715                 if (*flags & LDLM_FL_BLOCK_CONV)
1716                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1717                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
1718                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1719                 else
1720                         ldlm_grant_lock(lock, NULL);
1721                 GOTO(out, rc = ELDLM_OK);
1722 #ifdef HAVE_SERVER_SUPPORT
1723         } else if (*flags & LDLM_FL_REPLAY) {
1724                 if (*flags & LDLM_FL_BLOCK_CONV) {
1725                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1726                         GOTO(out, rc = ELDLM_OK);
1727                 } else if (*flags & LDLM_FL_BLOCK_WAIT) {
1728                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1729                         GOTO(out, rc = ELDLM_OK);
1730                 } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
1731                         ldlm_grant_lock(lock, NULL);
1732                         GOTO(out, rc = ELDLM_OK);
1733                 }
1734                 /* If no flags, fall through to normal enqueue path. */
1735         }
1736
1737         policy = ldlm_processing_policy_table[res->lr_type];
1738         policy(lock, flags, 1, &rc, NULL);
1739         GOTO(out, rc);
1740 #else
1741         } else {
1742                 CERROR("This is client-side-only module, cannot handle "
1743                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
1744                 LBUG();
1745         }
1746 #endif
1747
1748 out:
1749         unlock_res_and_lock(lock);
1750         if (node)
1751                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
1752         return rc;
1753 }
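
     /*
      * Example (illustrative sketch): after a successful enqueue the caller
      * inspects the returned flags to learn whether the lock was granted
      * immediately or queued; in the latter case it waits for the
      * completion AST.
      *
      *      __u64 flags = 0;
      *      ldlm_error_t err = ldlm_lock_enqueue(ns, &lock, NULL, &flags);
      *
      *      if (err == ELDLM_OK &&
      *          (flags & (LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV |
      *                    LDLM_FL_BLOCK_WAIT)))
      *              ... wait for the completion AST ...
      */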
1754
1755 #ifdef HAVE_SERVER_SUPPORT
1756 /**
1757  * Iterate through all waiting locks on a given resource queue and attempt to
1758  * grant them.
1759  *
1760  * Must be called with resource lock held.
1761  */
1762 int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
1763                          struct list_head *work_list)
1764 {
1765         struct list_head *tmp, *pos;
1766         ldlm_processing_policy policy;
1767         __u64 flags;
1768         int rc = LDLM_ITER_CONTINUE;
1769         ldlm_error_t err;
1770         ENTRY;
1771
1772         check_res_locked(res);
1773
1774         policy = ldlm_processing_policy_table[res->lr_type];
1775         LASSERT(policy);
1776
1777         list_for_each_safe(tmp, pos, queue) {
1778                 struct ldlm_lock *pending;
1779                 pending = list_entry(tmp, struct ldlm_lock, l_res_link);
1780
1781                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
1782
1783                 flags = 0;
1784                 rc = policy(pending, &flags, 0, &err, work_list);
1785                 if (rc != LDLM_ITER_CONTINUE)
1786                         break;
1787         }
1788
1789         RETURN(rc);
1790 }
1791 #endif
1792
1793 /**
1794  * Process a call to blocking AST callback for a lock in ast_work list
1795  */
1796 static int
1797 ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1798 {
1799         struct ldlm_cb_set_arg *arg = opaq;
1800         struct ldlm_lock_desc   d;
1801         int                     rc;
1802         struct ldlm_lock       *lock;
1803         ENTRY;
1804
1805         if (list_empty(arg->list))
1806                 RETURN(-ENOENT);
1807
1808         lock = list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);
1809
1810         /* nobody should touch l_bl_ast */
1811         lock_res_and_lock(lock);
1812         list_del_init(&lock->l_bl_ast);
1813
1814         LASSERT(ldlm_is_ast_sent(lock));
1815         LASSERT(lock->l_bl_ast_run == 0);
1816         LASSERT(lock->l_blocking_lock);
1817         lock->l_bl_ast_run++;
1818         unlock_res_and_lock(lock);
1819
1820         ldlm_lock2desc(lock->l_blocking_lock, &d);
1821
1822         rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
1823         LDLM_LOCK_RELEASE(lock->l_blocking_lock);
1824         lock->l_blocking_lock = NULL;
1825         LDLM_LOCK_RELEASE(lock);
1826
1827         RETURN(rc);
1828 }
1829
1830 /**
1831  * Process a call to completion AST callback for a lock in ast_work list
1832  */
1833 static int
1834 ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1835 {
1836         struct ldlm_cb_set_arg  *arg = opaq;
1837         int                      rc = 0;
1838         struct ldlm_lock        *lock;
1839         ldlm_completion_callback completion_callback;
1840         ENTRY;
1841
1842         if (list_empty(arg->list))
1843                 RETURN(-ENOENT);
1844
1845         lock = list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
1846
1847         /* It's possible to receive a completion AST before we've set
1848          * the l_completion_ast pointer: either because the AST arrived
1849          * before the reply, or simply because there's a small race
1850          * window between receiving the reply and finishing the local
1851          * enqueue. (bug 842)
1852          *
1853          * This can't happen with the blocking_ast, however, because we
1854          * will never call the local blocking_ast until we drop our
1855          * reader/writer reference, which we won't do until we get the
1856          * reply and finish enqueueing. */
1857
1858         /* nobody should touch l_cp_ast */
1859         lock_res_and_lock(lock);
1860         list_del_init(&lock->l_cp_ast);
1861         LASSERT(ldlm_is_cp_reqd(lock));
1862         /* save l_completion_ast since it can be changed by
1863          * mds_intent_policy(), see bug 14225 */
1864         completion_callback = lock->l_completion_ast;
1865         ldlm_clear_cp_reqd(lock);
1866         unlock_res_and_lock(lock);
1867
1868         if (completion_callback != NULL)
1869                 rc = completion_callback(lock, 0, (void *)arg);
1870         LDLM_LOCK_RELEASE(lock);
1871
1872         RETURN(rc);
1873 }
1874
1875 /**
1876  * Process a call to revocation AST callback for a lock in ast_work list
1877  */
1878 static int
1879 ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1880 {
1881         struct ldlm_cb_set_arg *arg = opaq;
1882         struct ldlm_lock_desc   desc;
1883         int                     rc;
1884         struct ldlm_lock       *lock;
1885         ENTRY;
1886
1887         if (list_empty(arg->list))
1888                 RETURN(-ENOENT);
1889
1890         lock = list_entry(arg->list->next, struct ldlm_lock, l_rk_ast);
1891         list_del_init(&lock->l_rk_ast);
1892
1893         /* the desc just pretends to be exclusive */
1894         ldlm_lock2desc(lock, &desc);
1895         desc.l_req_mode = LCK_EX;
1896         desc.l_granted_mode = 0;
1897
1898         rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
1899         LDLM_LOCK_RELEASE(lock);
1900
1901         RETURN(rc);
1902 }
1903
1904 /**
1905  * Process a call to glimpse AST callback for a lock in ast_work list
1906  */
1907 int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1908 {
1909         struct ldlm_cb_set_arg          *arg = opaq;
1910         struct ldlm_glimpse_work        *gl_work;
1911         struct ldlm_lock                *lock;
1912         int                              rc = 0;
1913         ENTRY;
1914
1915         if (list_empty(arg->list))
1916                 RETURN(-ENOENT);
1917
1918         gl_work = list_entry(arg->list->next, struct ldlm_glimpse_work,
1919                                  gl_list);
1920         list_del_init(&gl_work->gl_list);
1921
1922         lock = gl_work->gl_lock;
1923
1924         /* transfer the glimpse descriptor to ldlm_cb_set_arg */
1925         arg->gl_desc = gl_work->gl_desc;
1926
1927         /* invoke the actual glimpse callback */
1928         if (lock->l_glimpse_ast(lock, (void*)arg) == 0)
1929                 rc = 1;
1930
1931         LDLM_LOCK_RELEASE(lock);
1932
1933         if ((gl_work->gl_flags & LDLM_GL_WORK_NOFREE) == 0)
1934                 OBD_FREE_PTR(gl_work);
1935
1936         RETURN(rc);
1937 }
1938
1939 /**
1940  * Process list of locks in need of ASTs being sent.
1941  *
1942  * Used on server to send multiple ASTs together instead of sending one by
1943  * one.
1944  */
1945 int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
1946                       ldlm_desc_ast_t ast_type)
1947 {
1948         struct ldlm_cb_set_arg *arg;
1949         set_producer_func       work_ast_lock;
1950         int                     rc;
1951
1952         if (list_empty(rpc_list))
1953                 RETURN(0);
1954
1955         OBD_ALLOC_PTR(arg);
1956         if (arg == NULL)
1957                 RETURN(-ENOMEM);
1958
1959         atomic_set(&arg->restart, 0);
1960         arg->list = rpc_list;
1961
1962         switch (ast_type) {
1963                 case LDLM_WORK_BL_AST:
1964                         arg->type = LDLM_BL_CALLBACK;
1965                         work_ast_lock = ldlm_work_bl_ast_lock;
1966                         break;
1967                 case LDLM_WORK_CP_AST:
1968                         arg->type = LDLM_CP_CALLBACK;
1969                         work_ast_lock = ldlm_work_cp_ast_lock;
1970                         break;
1971                 case LDLM_WORK_REVOKE_AST:
1972                         arg->type = LDLM_BL_CALLBACK;
1973                         work_ast_lock = ldlm_work_revoke_ast_lock;
1974                         break;
1975                 case LDLM_WORK_GL_AST:
1976                         arg->type = LDLM_GL_CALLBACK;
1977                         work_ast_lock = ldlm_work_gl_ast_lock;
1978                         break;
1979                 default:
1980                         LBUG();
1981         }
1982
1983         /* We create a ptlrpc request set with flow control extension.
1984          * This request set will use the work_ast_lock function to produce new
1985          * requests and will send a new request each time one completes, keeping
1986          * the number of requests in flight at most ns_max_parallel_ast. */
1987         arg->set = ptlrpc_prep_fcset(ns->ns_max_parallel_ast ? : UINT_MAX,
1988                                      work_ast_lock, arg);
1989         if (arg->set == NULL)
1990                 GOTO(out, rc = -ENOMEM);
1991
1992         ptlrpc_set_wait(arg->set);
1993         ptlrpc_set_destroy(arg->set);
1994
1995         rc = atomic_read(&arg->restart) ? -ERESTART : 0;
1996         GOTO(out, rc);
1997 out:
1998         OBD_FREE_PTR(arg);
1999         return rc;
2000 }
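
     /*
      * Illustrative note: ldlm_reprocess_all() below is a canonical caller.
      * It collects completion ASTs for newly granted locks into rpc_list
      * under the resource lock, then sends them in parallel with
      * ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST).
      */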
2001
2002 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
2003 {
2004         ldlm_reprocess_all(res);
2005         return LDLM_ITER_CONTINUE;
2006 }
2007
2008 static int ldlm_reprocess_res(cfs_hash_t *hs, cfs_hash_bd_t *bd,
2009                               struct hlist_node *hnode, void *arg)
2010 {
2011         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2012         int    rc;
2013
2014         rc = reprocess_one_queue(res, arg);
2015
2016         return rc == LDLM_ITER_STOP;
2017 }
2018
2019 /**
2020  * Iterate through all resources on a namespace attempting to grant waiting
2021  * locks.
2022  */
2023 void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
2024 {
2025         ENTRY;
2026
2027         if (ns != NULL) {
2028                 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2029                                          ldlm_reprocess_res, NULL);
2030         }
2031         EXIT;
2032 }
2033 EXPORT_SYMBOL(ldlm_reprocess_all_ns);
2034
2035 /**
2036  * Try to grant all waiting locks on a resource.
2037  *
2038  * Calls ldlm_reprocess_queue on converting and waiting queues.
2039  *
2040  * Typically called after some resource locks are cancelled to see
2041  * if anything could be granted as a result of the cancellation.
2042  */
2043 void ldlm_reprocess_all(struct ldlm_resource *res)
2044 {
2045         struct list_head rpc_list;
2046 #ifdef HAVE_SERVER_SUPPORT
2047         int rc;
2048         ENTRY;
2049
2050         INIT_LIST_HEAD(&rpc_list);
2051         /* Local lock trees don't get reprocessed. */
2052         if (ns_is_client(ldlm_res_to_ns(res))) {
2053                 EXIT;
2054                 return;
2055         }
2056
2057 restart:
2058         lock_res(res);
2059         rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
2060         if (rc == LDLM_ITER_CONTINUE)
2061                 ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
2062         unlock_res(res);
2063
2064         rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
2065                                LDLM_WORK_CP_AST);
2066         if (rc == -ERESTART) {
2067                 LASSERT(list_empty(&rpc_list));
2068                 goto restart;
2069         }
2070 #else
2071         ENTRY;
2072
2073         INIT_LIST_HEAD(&rpc_list);
2074         if (!ns_is_client(ldlm_res_to_ns(res))) {
2075                 CERROR("This is client-side-only module, cannot handle "
2076                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
2077                 LBUG();
2078         }
2079 #endif
2080         EXIT;
2081 }
2082 EXPORT_SYMBOL(ldlm_reprocess_all);
2083
2084 /**
2085  * Helper function to call blocking AST for LDLM lock \a lock in a
2086  * "cancelling" mode.
2087  */
2088 void ldlm_cancel_callback(struct ldlm_lock *lock)
2089 {
2090         check_res_locked(lock->l_resource);
2091         if (!ldlm_is_cancel(lock)) {
2092                 ldlm_set_cancel(lock);
2093                 if (lock->l_blocking_ast) {
2094                         unlock_res_and_lock(lock);
2095                         lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
2096                                              LDLM_CB_CANCELING);
2097                         lock_res_and_lock(lock);
2098                 } else {
2099                         LDLM_DEBUG(lock, "no blocking ast");
2100                 }
2101         }
2102         ldlm_set_bl_done(lock);
2103 }
2104
2105 /**
2106  * Remove skiplist-enabled LDLM lock \a req from granted list
2107  */
2108 void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
2109 {
2110         if (req->l_resource->lr_type != LDLM_PLAIN &&
2111             req->l_resource->lr_type != LDLM_IBITS)
2112                 return;
2113
2114         list_del_init(&req->l_sl_policy);
2115         list_del_init(&req->l_sl_mode);
2116 }
2117
2118 /**
2119  * Attempts to cancel LDLM lock \a lock that has no reader/writer references.
2120  */
2121 void ldlm_lock_cancel(struct ldlm_lock *lock)
2122 {
2123         struct ldlm_resource *res;
2124         struct ldlm_namespace *ns;
2125         ENTRY;
2126
2127         lock_res_and_lock(lock);
2128
2129         res = lock->l_resource;
2130         ns  = ldlm_res_to_ns(res);
2131
2132         /* Please do not, no matter how tempting, remove this LBUG without
2133          * talking to me first. -phik */
2134         if (lock->l_readers || lock->l_writers) {
2135                 LDLM_ERROR(lock, "lock still has references");
2136                 LBUG();
2137         }
2138
2139         if (ldlm_is_waited(lock))
2140                 ldlm_del_waiting_lock(lock);
2141
2142         /* Run the cancel callback; it may drop and retake the res lock. */
2143         ldlm_cancel_callback(lock);
2144
2145         /* Yes, second time, just in case it was added again while we were
2146          * running with no res lock in ldlm_cancel_callback */
2147         if (ldlm_is_waited(lock))
2148                 ldlm_del_waiting_lock(lock);
2149
2150         ldlm_resource_unlink_lock(lock);
2151         ldlm_lock_destroy_nolock(lock);
2152
2153         if (lock->l_granted_mode == lock->l_req_mode)
2154                 ldlm_pool_del(&ns->ns_pool, lock);
2155
2156         /* Make sure we will not be called again for the same lock, which is
2157          * possible if lock->l_granted_mode is not zeroed out. */
2158         lock->l_granted_mode = LCK_MINMODE;
2159         unlock_res_and_lock(lock);
2160
2161         EXIT;
2162 }
2163 EXPORT_SYMBOL(ldlm_lock_cancel);
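
     /*
      * Example (illustrative sketch): cancellation by handle follows the
      * pattern of ldlm_cancel_locks_for_export_cb() below: resolve the
      * handle, cancel, then reprocess the resource so blocked locks may
      * now be granted.
      *
      *      struct ldlm_lock *lock = ldlm_handle2lock(&lockh);
      *
      *      if (lock != NULL) {
      *              struct ldlm_resource *res;
      *
      *              res = ldlm_resource_getref(lock->l_resource);
      *              ldlm_lock_cancel(lock);
      *              ldlm_reprocess_all(res);
      *              ldlm_resource_putref(res);
      *              LDLM_LOCK_PUT(lock);
      *      }
      */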
2164
2165 /**
2166  * Set opaque data into the lock that only makes sense to upper layer.
2167  */
2168 int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
2169 {
2170         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2171         int rc = -EINVAL;
2172         ENTRY;
2173
2174         if (lock) {
2175                 if (lock->l_ast_data == NULL)
2176                         lock->l_ast_data = data;
2177                 if (lock->l_ast_data == data)
2178                         rc = 0;
2179                 LDLM_LOCK_PUT(lock);
2180         }
2181         RETURN(rc);
2182 }
2183 EXPORT_SYMBOL(ldlm_lock_set_data);
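
     /*
      * Example (illustrative sketch): an upper layer stashes an object
      * pointer (e.g. an inode) on a granted lock so its blocking AST can
      * find what the lock protects; a non-zero return means the lock
      * already carries different ast_data.
      *
      *      if (ldlm_lock_set_data(&lockh, inode) != 0)
      *              CDEBUG(D_DLMTRACE, "lock already has ast_data\n");
      */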
2184
2185 struct export_cl_data {
2186         struct obd_export       *ecl_exp;
2187         int                     ecl_loop;
2188 };
2189
2190 /**
2191  * Iterator function for ldlm_cancel_locks_for_export.
2192  * Cancels the lock passed to it.
2193  */
2194 int ldlm_cancel_locks_for_export_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
2195                                     struct hlist_node *hnode, void *data)
2197 {
2198         struct export_cl_data   *ecl = (struct export_cl_data *)data;
2199         struct obd_export       *exp  = ecl->ecl_exp;
2200         struct ldlm_lock     *lock = cfs_hash_object(hs, hnode);
2201         struct ldlm_resource *res;
2202
2203         res = ldlm_resource_getref(lock->l_resource);
2204         LDLM_LOCK_GET(lock);
2205
2206         LDLM_DEBUG(lock, "export %p", exp);
2207         ldlm_res_lvbo_update(res, NULL, 1);
2208         ldlm_lock_cancel(lock);
2209         ldlm_reprocess_all(res);
2210         ldlm_resource_putref(res);
2211         LDLM_LOCK_RELEASE(lock);
2212
2213         ecl->ecl_loop++;
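             /* Log progress only when ecl_loop is a power of two, i.e. when
              * (x & -x) == x, to avoid flooding the console during a mass
              * cancellation. */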
2214         if ((ecl->ecl_loop & -ecl->ecl_loop) == ecl->ecl_loop) {
2215                 CDEBUG(D_INFO,
2216                        "Cancel lock %p for export %p (loop %d), still have "
2217                        "%d locks left on hash table.\n",
2218                        lock, exp, ecl->ecl_loop,
2219                        atomic_read(&hs->hs_count));
2220         }
2221
2222         return 0;
2223 }
2224
2225 /**
2226  * Cancel all locks for given export.
2227  *
2228  * Typically called on client disconnection/eviction
2229  */
2230 void ldlm_cancel_locks_for_export(struct obd_export *exp)
2231 {
2232         struct export_cl_data   ecl = {
2233                 .ecl_exp        = exp,
2234                 .ecl_loop       = 0,
2235         };
2236
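             /* cfs_hash_for_each_empty() rescans the hash until it drains:
              * each callback invocation cancels one lock, removing it from
              * exp_lock_hash. */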
2237         cfs_hash_for_each_empty(exp->exp_lock_hash,
2238                                 ldlm_cancel_locks_for_export_cb, &ecl);
2239 }
2240
2241 /**
2242  * Downgrade an exclusive lock.
2243  *
2244  * A fast variant of ldlm_lock_convert for conversion of exclusive
2245  * locks. The conversion always succeeds.
2246  * Used by Commit on Sharing (COS) code.
2247  *
2248  * \param lock A lock to convert
2249  * \param new_mode new lock mode
2250  */
2251 void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode)
2252 {
2253         ENTRY;
2254
2255         LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
2256         LASSERT(new_mode == LCK_COS);
2257
2258         lock_res_and_lock(lock);
2259         ldlm_resource_unlink_lock(lock);
2260         /*
2261          * Remove the lock from pool as it will be added again in
2262          * ldlm_grant_lock() called below.
2263          */
2264         ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
2265
2266         lock->l_req_mode = new_mode;
2267         ldlm_grant_lock(lock, NULL);
2268         unlock_res_and_lock(lock);
2269         ldlm_reprocess_all(lock->l_resource);
2270
2271         EXIT;
2272 }
2273 EXPORT_SYMBOL(ldlm_lock_downgrade);
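
     /*
      * Example (illustrative sketch): the Commit-on-Sharing code downgrades
      * a PW/EX lock held for an uncommitted transaction in place:
      *
      *      ldlm_lock_downgrade(lock, LCK_COS);
      *
      * No ASTs are sent; the lock is re-granted in the weaker mode and the
      * resource is reprocessed so blocked compatible locks may be granted.
      */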
2274
2275 /**
2276  * Attempt to convert already granted lock to a different mode.
2277  *
2278  * While lock conversion is not currently used, future client-side
2279  * optimizations could take advantage of it to avoid discarding cached
2280  * pages on a file.
2281  */
2282 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
2283                                         __u32 *flags)
2284 {
2285         struct list_head rpc_list;
2286         struct ldlm_resource *res;
2287         struct ldlm_namespace *ns;
2288         int granted = 0;
2289 #ifdef HAVE_SERVER_SUPPORT
2290         int old_mode;
2291         struct sl_insert_point prev;
2292 #endif
2293         struct ldlm_interval *node;
2294         ENTRY;
2295
2296         INIT_LIST_HEAD(&rpc_list);
2297         /* Just return if mode is unchanged. */
2298         if (new_mode == lock->l_granted_mode) {
2299                 *flags |= LDLM_FL_BLOCK_GRANTED;
2300                 RETURN(lock->l_resource);
2301         }
2302
2303         /* The lock type cannot be checked here because the lock's bit-lock
2304          * is not held, so do the allocation blindly. -jay */
2305         OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
2306         if (node == NULL)  /* Actually, this causes EDEADLOCK to be returned */
2307                 RETURN(NULL);
2308
2309         LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR),
2310                  "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);
2311
2312         lock_res_and_lock(lock);
2313
2314         res = lock->l_resource;
2315         ns  = ldlm_res_to_ns(res);
2316
2317 #ifdef HAVE_SERVER_SUPPORT
2318         old_mode = lock->l_req_mode;
2319 #endif
2320         lock->l_req_mode = new_mode;
2321         if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
2322 #ifdef HAVE_SERVER_SUPPORT
2323                 /* remember the lock position where the lock might be
2324                  * added back to the granted list later and also
2325                  * remember the join mode for skiplist fixing. */
2326                 prev.res_link = lock->l_res_link.prev;
2327                 prev.mode_link = lock->l_sl_mode.prev;
2328                 prev.policy_link = lock->l_sl_policy.prev;
2329 #endif
2330                 ldlm_resource_unlink_lock(lock);
2331         } else {
2332                 ldlm_resource_unlink_lock(lock);
2333                 if (res->lr_type == LDLM_EXTENT) {
2334                         /* FIXME: ugly code, I have to attach the lock to an
2335                          * interval node again since perhaps it will be granted
2336                          * soon */
2337                         INIT_LIST_HEAD(&node->li_group);
2338                         ldlm_interval_attach(node, lock);
2339                         node = NULL;
2340                 }
2341         }
2342
2343         /*
2344          * Remove old lock from the pool before adding the lock with new
2345          * mode below in ->policy()
2346          */
2347         ldlm_pool_del(&ns->ns_pool, lock);
2348
2349         /* If this is a local resource, put it on the appropriate list. */
2350         if (ns_is_client(ldlm_res_to_ns(res))) {
2351                 if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
2352                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
2353                 } else {
2354                         /* This should never happen, because of the way the
2355                          * server handles conversions. */
2356                         LDLM_ERROR(lock, "Erroneous flags %x on local lock",
2357                                    *flags);
2358                         LBUG();
2359
2360                         ldlm_grant_lock(lock, &rpc_list);
2361                         granted = 1;
2362                         /* FIXME: completion handling not with lr_lock held ! */
2363                         if (lock->l_completion_ast)
2364                                 lock->l_completion_ast(lock, 0, NULL);
2365                 }
2366 #ifdef HAVE_SERVER_SUPPORT
2367         } else {
2368                 int rc;
2369                 ldlm_error_t err;
2370                 __u64 pflags = 0;
2371                 ldlm_processing_policy policy;
2372                 policy = ldlm_processing_policy_table[res->lr_type];
2373                 rc = policy(lock, &pflags, 0, &err, &rpc_list);
2374                 if (rc == LDLM_ITER_STOP) {
2375                         lock->l_req_mode = old_mode;
2376                         if (res->lr_type == LDLM_EXTENT)
2377                                 ldlm_extent_add_lock(res, lock);
2378                         else
2379                                 ldlm_granted_list_add_lock(lock, &prev);
2380
2381                         res = NULL;
2382                 } else {
2383                         *flags |= LDLM_FL_BLOCK_GRANTED;
2384                         granted = 1;
2385                 }
2386         }
2387 #else
2388         } else {
2389                 CERROR("This is client-side-only module, cannot handle "
2390                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
2391                 LBUG();
2392         }
2393 #endif
2394         unlock_res_and_lock(lock);
2395
2396         if (granted)
2397                 ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
2398         if (node)
2399                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
2400         RETURN(res);
2401 }
2402 EXPORT_SYMBOL(ldlm_lock_convert);
2403
2404 /**
2405  * Print lock with lock handle \a lockh description into debug log.
2406  *
2407  * Used when printing all locks on a resource for debug purposes.
2408  */
2409 void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
2410 {
2411         struct ldlm_lock *lock;
2412
2413         if (!((libcfs_debug | D_ERROR) & level))
2414                 return;
2415
2416         lock = ldlm_handle2lock(lockh);
2417         if (lock == NULL)
2418                 return;
2419
2420         LDLM_DEBUG_LIMIT(level, lock, "###");
2421
2422         LDLM_LOCK_PUT(lock);
2423 }
2424 EXPORT_SYMBOL(ldlm_lock_dump_handle);
2425
2426 /**
2427  * Print lock information with custom message into debug log.
2428  * Helper function.
2429  */
2430 void _ldlm_lock_debug(struct ldlm_lock *lock,
2431                       struct libcfs_debug_msg_data *msgdata,
2432                       const char *fmt, ...)
2433 {
2434         va_list args;
2435         struct obd_export *exp = lock->l_export;
2436         struct ldlm_resource *resource = lock->l_resource;
2437         char *nid = "local";
2438
2439         va_start(args, fmt);
2440
2441         if (exp && exp->exp_connection) {
2442                 nid = libcfs_nid2str(exp->exp_connection->c_peer.nid);
2443         } else if (exp && exp->exp_obd != NULL) {
2444                 struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
2445                 nid = libcfs_nid2str(imp->imp_connection->c_peer.nid);
2446         }
2447
2448         if (resource == NULL) {
2449                 libcfs_debug_vmsg2(msgdata, fmt, args,
2450                        " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2451                        "res: \?\? rrc=\?\? type: \?\?\? flags: "LPX64" nid: %s "
2452                        "remote: "LPX64" expref: %d pid: %u timeout: %lu "
2453                        "lvb_type: %d\n",
2454                        lock,
2455                        lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2456                        lock->l_readers, lock->l_writers,
2457                        ldlm_lockname[lock->l_granted_mode],
2458                        ldlm_lockname[lock->l_req_mode],
2459                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2460                        exp ? atomic_read(&exp->exp_refcount) : -99,
2461                        lock->l_pid, lock->l_callback_timeout, lock->l_lvb_type);
2462                 va_end(args);
2463                 return;
2464         }
2465
2466         switch (resource->lr_type) {
2467         case LDLM_EXTENT:
2468                 libcfs_debug_vmsg2(msgdata, fmt, args,
2469                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2470                         "res: "DLDLMRES" rrc: %d type: %s ["LPU64"->"LPU64"] "
2471                         "(req "LPU64"->"LPU64") flags: "LPX64" nid: %s remote: "
2472                         LPX64" expref: %d pid: %u timeout: %lu lvb_type: %d\n",
2473                         ldlm_lock_to_ns_name(lock), lock,
2474                         lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2475                         lock->l_readers, lock->l_writers,
2476                         ldlm_lockname[lock->l_granted_mode],
2477                         ldlm_lockname[lock->l_req_mode],
2478                         PLDLMRES(resource),
2479                         atomic_read(&resource->lr_refcount),
2480                         ldlm_typename[resource->lr_type],
2481                         lock->l_policy_data.l_extent.start,
2482                         lock->l_policy_data.l_extent.end,
2483                         lock->l_req_extent.start, lock->l_req_extent.end,
2484                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2485                         exp ? atomic_read(&exp->exp_refcount) : -99,
2486                         lock->l_pid, lock->l_callback_timeout,
2487                         lock->l_lvb_type);
2488                 break;
2489
2490         case LDLM_FLOCK:
2491                 libcfs_debug_vmsg2(msgdata, fmt, args,
2492                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2493                         "res: "DLDLMRES" rrc: %d type: %s pid: %d "
2494                         "["LPU64"->"LPU64"] flags: "LPX64" nid: %s "
2495                         "remote: "LPX64" expref: %d pid: %u timeout: %lu\n",
2496                         ldlm_lock_to_ns_name(lock), lock,
2497                         lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2498                         lock->l_readers, lock->l_writers,
2499                         ldlm_lockname[lock->l_granted_mode],
2500                         ldlm_lockname[lock->l_req_mode],
2501                         PLDLMRES(resource),
2502                         atomic_read(&resource->lr_refcount),
2503                         ldlm_typename[resource->lr_type],
2504                         lock->l_policy_data.l_flock.pid,
2505                         lock->l_policy_data.l_flock.start,
2506                         lock->l_policy_data.l_flock.end,
2507                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2508                         exp ? atomic_read(&exp->exp_refcount) : -99,
2509                         lock->l_pid, lock->l_callback_timeout);
2510                 break;
2511
2512         case LDLM_IBITS:
2513                 libcfs_debug_vmsg2(msgdata, fmt, args,
2514                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2515                         "res: "DLDLMRES" bits "LPX64" rrc: %d type: %s "
2516                         "flags: "LPX64" nid: %s remote: "LPX64" expref: %d "
2517                         "pid: %u timeout: %lu lvb_type: %d\n",
2518                         ldlm_lock_to_ns_name(lock),
2519                         lock, lock->l_handle.h_cookie,
2520                         atomic_read(&lock->l_refc),
2521                         lock->l_readers, lock->l_writers,
2522                         ldlm_lockname[lock->l_granted_mode],
2523                         ldlm_lockname[lock->l_req_mode],
2524                         PLDLMRES(resource),
2525                         lock->l_policy_data.l_inodebits.bits,
2526                         atomic_read(&resource->lr_refcount),
2527                         ldlm_typename[resource->lr_type],
2528                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2529                         exp ? atomic_read(&exp->exp_refcount) : -99,
2530                         lock->l_pid, lock->l_callback_timeout,
2531                         lock->l_lvb_type);
2532                 break;
2533
2534         default:
2535                 libcfs_debug_vmsg2(msgdata, fmt, args,
2536                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2537                         "res: "DLDLMRES" rrc: %d type: %s flags: "LPX64" "
2538                         "nid: %s remote: "LPX64" expref: %d pid: %u "
2539                         "timeout: %lu lvb_type: %d\n",
2540                         ldlm_lock_to_ns_name(lock),
2541                         lock, lock->l_handle.h_cookie,
2542                         atomic_read(&lock->l_refc),
2543                         lock->l_readers, lock->l_writers,
2544                         ldlm_lockname[lock->l_granted_mode],
2545                         ldlm_lockname[lock->l_req_mode],
2546                         PLDLMRES(resource),
2547                         atomic_read(&resource->lr_refcount),
2548                         ldlm_typename[resource->lr_type],
2549                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2550                         exp ? atomic_read(&exp->exp_refcount) : -99,
2551                         lock->l_pid, lock->l_callback_timeout,
2552                         lock->l_lvb_type);
2553                 break;
2554         }
2555         va_end(args);
2556 }
2557 EXPORT_SYMBOL(_ldlm_lock_debug);