LU-4971 ldlm: drop redundant ibits lock interoperability check
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2013, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_lock.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
# include <libcfs/libcfs.h>
# include <linux/lustre_intent.h>
#else
# include <liblustre.h>
#endif

#include <obd_class.h>
#include "ldlm_internal.h"

/* lock types */
char *ldlm_lockname[] = {
        [0] = "--",
        [LCK_EX] = "EX",
        [LCK_PW] = "PW",
        [LCK_PR] = "PR",
        [LCK_CW] = "CW",
        [LCK_CR] = "CR",
        [LCK_NL] = "NL",
        [LCK_GROUP] = "GROUP",
        [LCK_COS] = "COS"
};
EXPORT_SYMBOL(ldlm_lockname);

char *ldlm_typename[] = {
        [LDLM_PLAIN] = "PLN",
        [LDLM_EXTENT] = "EXT",
        [LDLM_FLOCK] = "FLK",
        [LDLM_IBITS] = "IBT",
};
EXPORT_SYMBOL(ldlm_typename);

static ldlm_policy_wire_to_local_t ldlm_policy_wire18_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE]  = ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE]  = ldlm_flock_policy_wire18_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE]  = ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_wire_to_local_t ldlm_policy_wire21_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE]  = ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE]  = ldlm_flock_policy_wire21_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE]  = ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_local_to_wire_t ldlm_policy_local_to_wire[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE]  = ldlm_plain_policy_local_to_wire,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_local_to_wire,
        [LDLM_FLOCK - LDLM_MIN_TYPE]  = ldlm_flock_policy_local_to_wire,
        [LDLM_IBITS - LDLM_MIN_TYPE]  = ldlm_ibits_policy_local_to_wire,
};

/**
 * Converts lock policy from local format to on-the-wire lock_desc format
 */
void ldlm_convert_policy_to_wire(ldlm_type_t type,
                                 const ldlm_policy_data_t *lpolicy,
                                 ldlm_wire_policy_data_t *wpolicy)
{
        ldlm_policy_local_to_wire_t convert;

        convert = ldlm_policy_local_to_wire[type - LDLM_MIN_TYPE];

        convert(lpolicy, wpolicy);
}

/**
 * Converts lock policy from on-the-wire lock_desc format to local format
 */
void ldlm_convert_policy_to_local(struct obd_export *exp, ldlm_type_t type,
                                  const ldlm_wire_policy_data_t *wpolicy,
                                  ldlm_policy_data_t *lpolicy)
{
        ldlm_policy_wire_to_local_t convert;
        int new_client;

        /* some badness for 2.0.0 clients, but 2.0.0 isn't supported */
        new_client = (exp_connect_flags(exp) & OBD_CONNECT_FULL20) != 0;
        if (new_client)
                convert = ldlm_policy_wire21_to_local[type - LDLM_MIN_TYPE];
        else
                convert = ldlm_policy_wire18_to_local[type - LDLM_MIN_TYPE];

        convert(wpolicy, lpolicy);
}
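
/*
 * Illustrative sketch (not part of the original source): the converter
 * tables above are indexed by lock type, so a server-side caller
 * translating an inodebits policy received from a client would dispatch
 * through them as follows; "exp" and "wpolicy" are hypothetical caller
 * state unpacked from the request.
 *
 *      ldlm_wire_policy_data_t wpolicy;   (as received on the wire)
 *      ldlm_policy_data_t lpolicy;
 *
 *      ldlm_convert_policy_to_local(exp, LDLM_IBITS, &wpolicy, &lpolicy);
 *      (lpolicy.l_inodebits.bits now holds the requested bits, converted
 *       according to the client's connect flags)
 */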

char *ldlm_it2str(int it)
{
        switch (it) {
        case IT_OPEN:
                return "open";
        case IT_CREAT:
                return "creat";
        case (IT_OPEN | IT_CREAT):
                return "open|creat";
        case IT_READDIR:
                return "readdir";
        case IT_GETATTR:
                return "getattr";
        case IT_LOOKUP:
                return "lookup";
        case IT_UNLINK:
                return "unlink";
        case IT_GETXATTR:
                return "getxattr";
        case IT_LAYOUT:
                return "layout";
        default:
                CERROR("Unknown intent %d\n", it);
                return "UNKNOWN";
        }
}
EXPORT_SYMBOL(ldlm_it2str);

extern struct kmem_cache *ldlm_lock_slab;

#ifdef HAVE_SERVER_SUPPORT
static ldlm_processing_policy ldlm_processing_policy_table[] = {
        [LDLM_PLAIN]    = ldlm_process_plain_lock,
        [LDLM_EXTENT]   = ldlm_process_extent_lock,
# ifdef __KERNEL__
        [LDLM_FLOCK]    = ldlm_process_flock_lock,
# endif
        [LDLM_IBITS]    = ldlm_process_inodebits_lock,
};

ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
{
        return ldlm_processing_policy_table[res->lr_type];
}
EXPORT_SYMBOL(ldlm_get_processing_policy);
#endif /* HAVE_SERVER_SUPPORT */

void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
{
        ns->ns_policy = arg;
}
EXPORT_SYMBOL(ldlm_register_intent);

/*
 * REFCOUNTED LOCK OBJECTS
 */

/**
 * Get a reference on a lock.
 *
 * Lock refcounts, during creation:
 *   - one special one for allocation, dec'd only once in destroy
 *   - one for being a lock that's in-use
 *   - one for the addref associated with a new lock
 */
struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
{
        atomic_inc(&lock->l_refc);
        return lock;
}
EXPORT_SYMBOL(ldlm_lock_get);
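
/*
 * Illustrative sketch (not part of the original source): the get/put pair
 * above is what keeps a lock alive across a window where no list or hash
 * reference protects it; the caller below is hypothetical.
 *
 *      void inspect_lock(struct ldlm_lock *lock)
 *      {
 *              LDLM_LOCK_GET(lock);    (take an extra reference)
 *              ... lock cannot be freed here ...
 *              LDLM_LOCK_PUT(lock);    (last put frees the lock)
 *      }
 */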

/**
 * Release lock reference.
 *
 * Also frees the lock if it was the last reference.
 */
void ldlm_lock_put(struct ldlm_lock *lock)
{
        ENTRY;

        LASSERT(lock->l_resource != LP_POISON);
        LASSERT(atomic_read(&lock->l_refc) > 0);
        if (atomic_dec_and_test(&lock->l_refc)) {
                struct ldlm_resource *res;

                LDLM_DEBUG(lock,
                           "final lock_put on destroyed lock, freeing it.");

                res = lock->l_resource;
                LASSERT(ldlm_is_destroyed(lock));
                LASSERT(cfs_list_empty(&lock->l_res_link));
                LASSERT(cfs_list_empty(&lock->l_pending_chain));

                lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
                                     LDLM_NSS_LOCKS);
                lu_ref_del(&res->lr_reference, "lock", lock);
                ldlm_resource_putref(res);
                lock->l_resource = NULL;
                if (lock->l_export) {
                        class_export_lock_put(lock->l_export, lock);
                        lock->l_export = NULL;
                }

                if (lock->l_lvb_data != NULL)
                        OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len);

                ldlm_interval_free(ldlm_interval_detach(lock));
                lu_ref_fini(&lock->l_reference);
                OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle);
        }

        EXIT;
}
EXPORT_SYMBOL(ldlm_lock_put);

/**
 * Removes LDLM lock \a lock from LRU. Assumes LRU is already locked.
 */
int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
{
        int rc = 0;

        if (!cfs_list_empty(&lock->l_lru)) {
                struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

                LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
                cfs_list_del_init(&lock->l_lru);
                LASSERT(ns->ns_nr_unused > 0);
                ns->ns_nr_unused--;
                rc = 1;
        }
        return rc;
}

/**
 * Removes LDLM lock \a lock from LRU. Obtains the LRU lock first.
 */
int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
        int rc;

        ENTRY;
        if (ldlm_is_ns_srv(lock)) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                RETURN(0);
        }

        spin_lock(&ns->ns_lock);
        rc = ldlm_lock_remove_from_lru_nolock(lock);
        spin_unlock(&ns->ns_lock);
        EXIT;
        return rc;
}

/**
 * Adds LDLM lock \a lock to namespace LRU. Assumes LRU is already locked.
 */
void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        lock->l_last_used = cfs_time_current();
        LASSERT(cfs_list_empty(&lock->l_lru));
        LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
        cfs_list_add_tail(&lock->l_lru, &ns->ns_unused_list);
        ldlm_clear_skipped(lock);
        LASSERT(ns->ns_nr_unused >= 0);
        ns->ns_nr_unused++;
}

/**
 * Adds LDLM lock \a lock to namespace LRU. Obtains necessary LRU locks
 * first.
 */
void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        spin_lock(&ns->ns_lock);
        ldlm_lock_add_to_lru_nolock(lock);
        spin_unlock(&ns->ns_lock);
        EXIT;
}

/**
 * Moves LDLM lock \a lock that is already in namespace LRU to the tail of
 * the LRU. Performs necessary LRU locking.
 */
void ldlm_lock_touch_in_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        if (ldlm_is_ns_srv(lock)) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                EXIT;
                return;
        }

        spin_lock(&ns->ns_lock);
        if (!cfs_list_empty(&lock->l_lru)) {
                ldlm_lock_remove_from_lru_nolock(lock);
                ldlm_lock_add_to_lru_nolock(lock);
        }
        spin_unlock(&ns->ns_lock);
        EXIT;
}

/**
 * Helper to destroy a locked lock.
 *
 * Used by ldlm_lock_destroy and ldlm_lock_destroy_nolock.
 * Must be called with l_lock and lr_lock held.
 *
 * Does not actually free the lock data, but rather marks the lock as
 * destroyed by setting the LDLM_FL_DESTROYED flag in it.  Destroys the
 * handle->lock association too, so that the lock can no longer be found,
 * and removes the lock from the LRU list.  Actual lock freeing occurs
 * when the last lock reference goes away.
 *
 * Original comment (of some historical value):
 * This used to have a 'strict' flag, which recovery would use to mark an
 * in-use lock as needing-to-die.  Lest I am ever tempted to put it back, I
 * shall explain why it's gone: with the new hash table scheme, once you call
 * ldlm_lock_destroy, you can never drop your final references on this lock.
 * Because it's not in the hash table anymore.  -phil
 */
int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
{
        ENTRY;

        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                LBUG();
        }

        if (!cfs_list_empty(&lock->l_res_link)) {
                LDLM_ERROR(lock, "lock still on resource");
                LBUG();
        }

        if (ldlm_is_destroyed(lock)) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                EXIT;
                return 0;
        }
        ldlm_set_destroyed(lock);

        if (lock->l_export && lock->l_export->exp_lock_hash) {
                /* NB: it's safe to call cfs_hash_del() even if the lock
                 * isn't in exp_lock_hash. */
                /* In the function below, .hs_keycmp resolves to
                 * ldlm_export_lock_keycmp() */
                /* coverity[overrun-buffer-val] */
                cfs_hash_del(lock->l_export->exp_lock_hash,
                             &lock->l_remote_handle, &lock->l_exp_hash);
        }

        ldlm_lock_remove_from_lru(lock);
        class_handle_unhash(&lock->l_handle);

#if 0
        /* Wake anyone waiting for this lock */
        /* FIXME: I should probably add yet another flag, instead of using
         * l_export to only call this on clients */
        if (lock->l_export)
                class_export_put(lock->l_export);
        lock->l_export = NULL;
        if (lock->l_export && lock->l_completion_ast)
                lock->l_completion_ast(lock, 0);
#endif
        EXIT;
        return 1;
}

/**
 * Destroys a LDLM lock \a lock. Performs necessary locking first.
 */
void ldlm_lock_destroy(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        lock_res_and_lock(lock);
        first = ldlm_lock_destroy_internal(lock);
        unlock_res_and_lock(lock);

        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/**
 * Destroys a LDLM lock \a lock that is already locked.
 */
void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        first = ldlm_lock_destroy_internal(lock);
        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/* this is called by portals_handle2object with the handle lock taken */
static void lock_handle_addref(void *lock)
{
        LDLM_LOCK_GET((struct ldlm_lock *)lock);
}

static void lock_handle_free(void *lock, int size)
{
        LASSERT(size == sizeof(struct ldlm_lock));
        OBD_SLAB_FREE(lock, ldlm_lock_slab, size);
}

struct portals_handle_ops lock_handle_ops = {
        .hop_addref = lock_handle_addref,
        .hop_free   = lock_handle_free,
};

/**
 * Allocate and initialize a new lock structure.
 *
 * usage: pass in a resource on which you have done ldlm_resource_get;
 *        the new lock will take over the refcount.
 * returns: lock with refcount 2 - one for the current caller and one for
 *          the remote side.
 */
static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
{
        struct ldlm_lock *lock;
        ENTRY;

        if (resource == NULL)
                LBUG();

        OBD_SLAB_ALLOC_PTR_GFP(lock, ldlm_lock_slab, GFP_NOFS);
        if (lock == NULL)
                RETURN(NULL);

        spin_lock_init(&lock->l_lock);
        lock->l_resource = resource;
        lu_ref_add(&resource->lr_reference, "lock", lock);

        atomic_set(&lock->l_refc, 2);
        CFS_INIT_LIST_HEAD(&lock->l_res_link);
        CFS_INIT_LIST_HEAD(&lock->l_lru);
        CFS_INIT_LIST_HEAD(&lock->l_pending_chain);
        CFS_INIT_LIST_HEAD(&lock->l_bl_ast);
        CFS_INIT_LIST_HEAD(&lock->l_cp_ast);
        CFS_INIT_LIST_HEAD(&lock->l_rk_ast);
        init_waitqueue_head(&lock->l_waitq);
        lock->l_blocking_lock = NULL;
        CFS_INIT_LIST_HEAD(&lock->l_sl_mode);
        CFS_INIT_LIST_HEAD(&lock->l_sl_policy);
        CFS_INIT_HLIST_NODE(&lock->l_exp_hash);
        CFS_INIT_HLIST_NODE(&lock->l_exp_flock_hash);

        lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
                             LDLM_NSS_LOCKS);
        CFS_INIT_LIST_HEAD(&lock->l_handle.h_link);
        class_handle_hash(&lock->l_handle, &lock_handle_ops);

        lu_ref_init(&lock->l_reference);
        lu_ref_add(&lock->l_reference, "hash", lock);
        lock->l_callback_timeout = 0;

#if LUSTRE_TRACKS_LOCK_EXP_REFS
        CFS_INIT_LIST_HEAD(&lock->l_exp_refs_link);
        lock->l_exp_refs_nr = 0;
        lock->l_exp_refs_target = NULL;
#endif
        CFS_INIT_LIST_HEAD(&lock->l_exp_list);

        RETURN(lock);
}
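
/*
 * Illustrative sketch (not part of the original source): how the resource
 * reference is handed off to a new lock, per the "usage" comment above.
 * "ns" and "res_id" are hypothetical caller state.
 *
 *      struct ldlm_resource *res;
 *      struct ldlm_lock *lock;
 *
 *      res = ldlm_resource_get(ns, NULL, res_id, LDLM_EXTENT, 1);
 *      if (IS_ERR(res))
 *              return PTR_ERR(res);
 *      lock = ldlm_lock_new(res);      (on success, owns the res reference)
 *      if (lock == NULL) {
 *              ldlm_resource_putref(res);      (handoff did not happen)
 *              return -ENOMEM;
 *      }
 */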

/**
 * Moves LDLM lock \a lock to another resource.
 * This is used on the client when the server returns a lock different from
 * the one requested (typically as a result of an intent operation).
 */
int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                              const struct ldlm_res_id *new_resid)
{
        struct ldlm_resource *oldres = lock->l_resource;
        struct ldlm_resource *newres;
        int type;
        ENTRY;

        LASSERT(ns_is_client(ns));

        lock_res_and_lock(lock);
        if (memcmp(new_resid, &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) == 0) {
                /* Nothing to do */
                unlock_res_and_lock(lock);
                RETURN(0);
        }

        LASSERT(new_resid->name[0] != 0);

        /* This function assumes that the lock isn't on any lists */
        LASSERT(cfs_list_empty(&lock->l_res_link));

        type = oldres->lr_type;
        unlock_res_and_lock(lock);

        newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
        if (IS_ERR(newres))
                RETURN(PTR_ERR(newres));

        lu_ref_add(&newres->lr_reference, "lock", lock);
        /*
         * To flip the lock from the old to the new resource, lock, oldres
         * and newres have to be locked. Resource spin-locks are nested
         * within lock->l_lock, and are taken in the memory address order
         * to avoid dead-locks.
         */
        spin_lock(&lock->l_lock);
        oldres = lock->l_resource;
        if (oldres < newres) {
                lock_res(oldres);
                lock_res_nested(newres, LRT_NEW);
        } else {
                lock_res(newres);
                lock_res_nested(oldres, LRT_NEW);
        }
        LASSERT(memcmp(new_resid, &oldres->lr_name,
                       sizeof oldres->lr_name) != 0);
        lock->l_resource = newres;
        unlock_res(oldres);
        unlock_res_and_lock(lock);

        /* ...and the flowers are still standing! */
        lu_ref_del(&oldres->lr_reference, "lock", lock);
        ldlm_resource_putref(oldres);

        RETURN(0);
}
EXPORT_SYMBOL(ldlm_lock_change_resource);

/** \defgroup ldlm_handles LDLM HANDLES
 * Ways to get hold of locks without any addresses.
 * @{
 */

/**
 * Fills in handle for LDLM lock \a lock into supplied \a lockh.
 * Does not take any references.
 */
void ldlm_lock2handle(const struct ldlm_lock *lock, struct lustre_handle *lockh)
{
        lockh->cookie = lock->l_handle.h_cookie;
}
EXPORT_SYMBOL(ldlm_lock2handle);

/**
 * Obtain a lock reference by handle.
 *
 * If \a flags is non-zero: atomically get the lock and set the flags.
 *                          Returns NULL if any of the flags is already set.
 */
struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
                                     __u64 flags)
{
        struct ldlm_lock *lock;
        ENTRY;

        LASSERT(handle);

        lock = class_handle2object(handle->cookie, NULL);
        if (lock == NULL)
                RETURN(NULL);

        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if ((flags == 0) && !ldlm_is_destroyed(lock)) {
                lu_ref_add(&lock->l_reference, "handle", current);
                RETURN(lock);
        }

        lock_res_and_lock(lock);

        LASSERT(lock->l_resource != NULL);

        lu_ref_add_atomic(&lock->l_reference, "handle", current);
        if (unlikely(ldlm_is_destroyed(lock))) {
                unlock_res_and_lock(lock);
                CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
                LDLM_LOCK_PUT(lock);
                RETURN(NULL);
        }

        /* If we're setting flags, make sure none of them are already set. */
        if (flags != 0) {
                if ((lock->l_flags & flags) != 0) {
                        unlock_res_and_lock(lock);
                        LDLM_LOCK_PUT(lock);
                        RETURN(NULL);
                }

                lock->l_flags |= flags;
        }

        unlock_res_and_lock(lock);
        RETURN(lock);
}
EXPORT_SYMBOL(__ldlm_handle2lock);
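
/*
 * Illustrative sketch (not part of the original source): handles let a
 * lock be referenced by cookie across contexts that must not hold raw
 * pointers; the round trip below uses only the functions in this group.
 *
 *      struct lustre_handle lockh;
 *
 *      ldlm_lock2handle(lock, &lockh);    (cheap: takes no reference)
 *      ...
 *      lock = ldlm_handle2lock(&lockh);   (NULL if destroyed meanwhile)
 *      if (lock != NULL) {
 *              ... use lock ...
 *              LDLM_LOCK_PUT(lock);       (drop the lookup reference)
 *      }
 */
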
/** @} ldlm_handles */

/**
 * Fill in "on the wire" representation for given LDLM lock into supplied
 * lock descriptor \a desc structure.
 */
void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
{
        ldlm_res2desc(lock->l_resource, &desc->l_resource);
        desc->l_req_mode = lock->l_req_mode;
        desc->l_granted_mode = lock->l_granted_mode;
        ldlm_convert_policy_to_wire(lock->l_resource->lr_type,
                                    &lock->l_policy_data,
                                    &desc->l_policy_data);
}
EXPORT_SYMBOL(ldlm_lock2desc);

/**
 * Add a lock to the list of conflicting locks to send AST to.
 *
 * Only add if we have not sent a blocking AST to the lock yet.
 */
void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                           cfs_list_t *work_list)
{
        if (!ldlm_is_ast_sent(lock)) {
                LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
                ldlm_set_ast_sent(lock);
                /* If the enqueuing client said so, tell the AST recipient to
                 * discard dirty data, rather than writing back. */
                if (ldlm_is_ast_discard_data(new))
                        ldlm_set_discard_data(lock);
                LASSERT(cfs_list_empty(&lock->l_bl_ast));
                cfs_list_add(&lock->l_bl_ast, work_list);
                LDLM_LOCK_GET(lock);
                LASSERT(lock->l_blocking_lock == NULL);
                lock->l_blocking_lock = LDLM_LOCK_GET(new);
        }
}

/**
 * Add a lock to the list of just granted locks to send completion AST to.
 */
void ldlm_add_cp_work_item(struct ldlm_lock *lock, cfs_list_t *work_list)
{
        if (!ldlm_is_cp_reqd(lock)) {
                ldlm_set_cp_reqd(lock);
                LDLM_DEBUG(lock, "lock granted; sending completion AST.");
                LASSERT(cfs_list_empty(&lock->l_cp_ast));
                cfs_list_add(&lock->l_cp_ast, work_list);
                LDLM_LOCK_GET(lock);
        }
}

/**
 * Aggregator function to add AST work items into a list. Determines
 * what sort of AST work needs to be done and calls the proper adding
 * function.
 * Must be called with lr_lock held.
 */
void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                            cfs_list_t *work_list)
{
        ENTRY;
        check_res_locked(lock->l_resource);
        if (new)
                ldlm_add_bl_work_item(lock, new, work_list);
        else
                ldlm_add_cp_work_item(lock, work_list);
        EXIT;
}

/**
 * Add specified reader/writer reference to LDLM lock with handle \a lockh.
 * The r/w reference type is determined by \a mode.
 * Calls ldlm_lock_addref_internal.
 */
void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;

        lock = ldlm_handle2lock(lockh);
        LASSERT(lock != NULL);
        ldlm_lock_addref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_addref);

/**
 * Helper function.
 * Add specified reader/writer reference to LDLM lock \a lock.
 * The r/w reference type is determined by \a mode.
 * Removes lock from LRU if it is there.
 * Assumes the LDLM lock is already locked.
 */
void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        ldlm_lock_remove_from_lru(lock);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                lock->l_readers++;
                lu_ref_add_atomic(&lock->l_reference, "reader", lock);
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                lock->l_writers++;
                lu_ref_add_atomic(&lock->l_reference, "writer", lock);
        }
        LDLM_LOCK_GET(lock);
        lu_ref_add_atomic(&lock->l_reference, "user", lock);
        LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}

/**
 * Attempts to add reader/writer reference to a lock with handle \a lockh,
 * and fails if the lock is already LDLM_FL_CBPENDING or destroyed.
 *
 * \retval 0 success, lock was addref-ed
 *
 * \retval -EAGAIN lock is being canceled.
 */
int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;
        int               result;

        result = -EAGAIN;
        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                if (lock->l_readers != 0 || lock->l_writers != 0 ||
                    !ldlm_is_cbpending(lock)) {
                        ldlm_lock_addref_internal_nolock(lock, mode);
                        result = 0;
                }
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return result;
}
EXPORT_SYMBOL(ldlm_lock_addref_try);

/**
 * Add specified reader/writer reference to LDLM lock \a lock.
 * Locks LDLM lock and calls ldlm_lock_addref_internal_nolock to do the work.
 * Only called for local locks.
 */
void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
{
        lock_res_and_lock(lock);
        ldlm_lock_addref_internal_nolock(lock, mode);
        unlock_res_and_lock(lock);
}

/**
 * Removes reader/writer reference for LDLM lock \a lock.
 * Assumes LDLM lock is already locked.
 * Only called in ldlm_flock_destroy and for local locks.
 * Does NOT add lock to LRU if no r/w references are left, to accommodate
 * flock locks that cannot be placed in the LRU.
 */
void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                LASSERT(lock->l_readers > 0);
                lu_ref_del(&lock->l_reference, "reader", lock);
                lock->l_readers--;
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                LASSERT(lock->l_writers > 0);
                lu_ref_del(&lock->l_reference, "writer", lock);
                lock->l_writers--;
        }

        lu_ref_del(&lock->l_reference, "user", lock);
        LDLM_LOCK_RELEASE(lock);    /* matches the LDLM_LOCK_GET() in addref */
}

/**
 * Removes reader/writer reference for LDLM lock \a lock.
 * Locks LDLM lock first.
 * If the lock is determined to be a client lock and the r/w refcount
 * drops to zero and the lock is not blocked, the lock is added to the LRU
 * list of the namespace.
 * For blocked LDLM locks, if the r/w count drops to zero, blocking_ast is
 * called.
 */
void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
{
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        ns = ldlm_lock_to_ns(lock);

        ldlm_lock_decref_internal_nolock(lock, mode);

        if (ldlm_is_local(lock) &&
            !lock->l_readers && !lock->l_writers) {
                /* If this is a local lock on a server namespace and this was
                 * the last reference, cancel the lock. */
                CDEBUG(D_INFO, "forcing cancel of local lock\n");
                ldlm_set_cbpending(lock);
        }

        if (!lock->l_readers && !lock->l_writers &&
            ldlm_is_cbpending(lock)) {
                /* If we received a blocking AST and this was the last
                 * reference, run the callback. */
                if (ldlm_is_ns_srv(lock) && lock->l_export)
                        CERROR("FL_CBPENDING set on non-local lock--just a "
                               "warning\n");

                LDLM_DEBUG(lock, "final decref done on cbpending lock");

                LDLM_LOCK_GET(lock); /* dropped by bl thread */
                ldlm_lock_remove_from_lru(lock);
                unlock_res_and_lock(lock);

                if (ldlm_is_fail_loc(lock))
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                if (ldlm_is_atomic_cb(lock) ||
                    ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
                        ldlm_handle_bl_callback(ns, NULL, lock);
        } else if (ns_is_client(ns) &&
                   !lock->l_readers && !lock->l_writers &&
                   !ldlm_is_no_lru(lock) &&
                   !ldlm_is_bl_ast(lock)) {

                LDLM_DEBUG(lock, "add lock into lru list");

                /* If this is a client-side namespace and this was the last
                 * reference, put it on the LRU. */
                ldlm_lock_add_to_lru(lock);
                unlock_res_and_lock(lock);

                if (ldlm_is_fail_loc(lock))
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
                 * are not supported by the server, otherwise, it is done on
                 * enqueue. */
                if (!exp_connect_cancelset(lock->l_conn_export) &&
                    !ns_connect_lru_resize(ns))
                        ldlm_cancel_lru(ns, 0, LCF_ASYNC, 0);
        } else {
                LDLM_DEBUG(lock, "do not add lock into lru list");
                unlock_res_and_lock(lock);
        }

        EXIT;
}

/**
 * Decrease reader/writer refcount for LDLM lock with handle \a lockh.
 */
void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);

        LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_decref);
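
/*
 * Illustrative sketch (not part of the original source): users of a
 * granted lock pair ldlm_lock_addref()/ldlm_lock_decref() with matching
 * modes; the final decref is what may park a client lock in the LRU.
 * "lockh" is assumed to come from a successful enqueue or match.
 *
 *      ldlm_lock_addref(&lockh, LCK_PR);     (hold a reader reference)
 *      ... read data protected by the lock ...
 *      ldlm_lock_decref(&lockh, LCK_PR);     (lock may go to LRU now)
 */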

/**
 * Decrease reader/writer refcount for LDLM lock with handle
 * \a lockh and mark it for subsequent cancellation once r/w refcount
 * drops to zero instead of putting into LRU.
 *
 * Typical usage is for GROUP locks which we cannot allow to be cached.
 */
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        ENTRY;

        LASSERT(lock != NULL);

        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        lock_res_and_lock(lock);
        ldlm_set_cbpending(lock);
        unlock_res_and_lock(lock);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);

struct sl_insert_point {
        cfs_list_t *res_link;
        cfs_list_t *mode_link;
        cfs_list_t *policy_link;
};

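/*
 * Illustrative note (not part of the original source): the granted list
 * of a resource is kept grouped so that insertion can skip over whole
 * groups.  Locks of equal mode are adjacent (a "mode group"); within an
 * IBITS mode group, locks with identical inodebits are adjacent (a
 * "policy group"):
 *
 *      lr_granted:  [PR b=1][PR b=1][PR b=3]   [EX b=1][EX b=1]
 *                    \ policy group /
 *                    \_____ PR mode group ___/ \_ EX mode group _/
 *
 * l_sl_mode links the locks of a mode group so that the group's last
 * member is reachable from its first in O(1); l_sl_policy does the same
 * within a policy group.  search_granted_lock() below walks these
 * shortcuts to find where a new granted lock should be spliced in.
 */
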
/**
 * Finds a position to insert the new lock into granted lock list.
 *
 * Used for locks eligible for skiplist optimization.
 *
 * Parameters:
 *      queue [input]:  the granted list to search;
 *      req [input]:    the lock whose position is to be located;
 *      prev [output]:  positions within the 3 lists to insert \a req to
 * Return Value:
 *      filled \a prev
 * NOTE: called by
 *  - ldlm_grant_lock_with_skiplist
 */
static void search_granted_lock(cfs_list_t *queue,
                                struct ldlm_lock *req,
                                struct sl_insert_point *prev)
{
        cfs_list_t *tmp;
        struct ldlm_lock *lock, *mode_end, *policy_end;
        ENTRY;

        cfs_list_for_each(tmp, queue) {
                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                mode_end = cfs_list_entry(lock->l_sl_mode.prev,
                                          struct ldlm_lock, l_sl_mode);

                if (lock->l_req_mode != req->l_req_mode) {
                        /* jump to last lock of mode group */
                        tmp = &mode_end->l_res_link;
                        continue;
                }

                /* suitable mode group is found */
                if (lock->l_resource->lr_type == LDLM_PLAIN) {
                        /* insert point is last lock of the mode group */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else if (lock->l_resource->lr_type == LDLM_IBITS) {
                        for (;;) {
                                policy_end =
                                        cfs_list_entry(lock->l_sl_policy.prev,
                                                       struct ldlm_lock,
                                                       l_sl_policy);

                                if (lock->l_policy_data.l_inodebits.bits ==
                                    req->l_policy_data.l_inodebits.bits) {
                                        /* insert point is last lock of
                                         * the policy group */
                                        prev->res_link =
                                                &policy_end->l_res_link;
                                        prev->mode_link =
                                                &policy_end->l_sl_mode;
                                        prev->policy_link =
                                                &policy_end->l_sl_policy;
                                        EXIT;
                                        return;
                                }

                                if (policy_end == mode_end)
                                        /* done with mode group */
                                        break;

                                /* go to next policy group within mode group */
                                tmp = policy_end->l_res_link.next;
                                lock = cfs_list_entry(tmp, struct ldlm_lock,
                                                      l_res_link);
                        }  /* loop over policy groups within the mode group */

                        /* insert point is last lock of the mode group,
                         * new policy group is started */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else {
                        LDLM_ERROR(lock,
                                   "is not LDLM_PLAIN or LDLM_IBITS lock");
                        LBUG();
                }
        }

        /* insert point is last lock on the queue,
         * new mode group and new policy group are started */
        prev->res_link = queue->prev;
        prev->mode_link = &req->l_sl_mode;
        prev->policy_link = &req->l_sl_policy;
        EXIT;
        return;
}

/**
 * Add a lock into resource granted list after a position described by
 * \a prev.
 */
static void ldlm_granted_list_add_lock(struct ldlm_lock *lock,
                                       struct sl_insert_point *prev)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        ldlm_resource_dump(D_INFO, res);
        LDLM_DEBUG(lock, "About to add lock:");

        if (ldlm_is_destroyed(lock)) {
                CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
                return;
        }

        LASSERT(cfs_list_empty(&lock->l_res_link));
        LASSERT(cfs_list_empty(&lock->l_sl_mode));
        LASSERT(cfs_list_empty(&lock->l_sl_policy));

        /*
         * lock->link == prev->link means lock is the first lock of the
         * group.  Don't re-add it to itself, to suppress kernel warnings.
         */
        if (&lock->l_res_link != prev->res_link)
                cfs_list_add(&lock->l_res_link, prev->res_link);
        if (&lock->l_sl_mode != prev->mode_link)
                cfs_list_add(&lock->l_sl_mode, prev->mode_link);
        if (&lock->l_sl_policy != prev->policy_link)
                cfs_list_add(&lock->l_sl_policy, prev->policy_link);

        EXIT;
}

/**
 * Add a lock to granted list on a resource maintaining skiplist
 * correctness.
 */
static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
{
        struct sl_insert_point prev;
        ENTRY;

        LASSERT(lock->l_req_mode == lock->l_granted_mode);

        search_granted_lock(&lock->l_resource->lr_granted, lock, &prev);
        ldlm_granted_list_add_lock(lock, &prev);
        EXIT;
}

/**
 * Perform lock granting bookkeeping.
 *
 * Includes putting the lock into granted list and updating lock mode.
 * NOTE: called by
 *  - ldlm_lock_enqueue
 *  - ldlm_reprocess_queue
 *  - ldlm_lock_convert
 *
 * must be called with lr_lock held
 */
void ldlm_grant_lock(struct ldlm_lock *lock, cfs_list_t *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        lock->l_granted_mode = lock->l_req_mode;

        if (work_list && lock->l_completion_ast != NULL)
                ldlm_add_ast_work_item(lock, NULL, work_list);

        /* We should not add locks to the granted list in the following cases:
         * - this is an UNLOCK but not a real lock;
         * - this is a TEST lock;
         * - this is an F_CANCELLK lock (async flock has req_mode == 0);
         * - this is a deadlock (flock cannot be granted) */
        if (lock->l_req_mode == 0 ||
            lock->l_req_mode == LCK_NL ||
            ldlm_is_test_lock(lock) ||
            ldlm_is_flock_deadlock(lock))
                RETURN_EXIT;

        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
                ldlm_grant_lock_with_skiplist(lock);
        else if (res->lr_type == LDLM_EXTENT)
                ldlm_extent_add_lock(res, lock);
        else
                ldlm_resource_add_lock(res, &res->lr_granted, lock);

        if (lock->l_granted_mode < res->lr_most_restr)
                res->lr_most_restr = lock->l_granted_mode;

        ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
        EXIT;
}

/**
 * Search for a lock with given properties in a queue.
 *
 * \retval a referenced lock or NULL.  See the flag descriptions below, in
 * the comment above ldlm_lock_match
 */
static struct ldlm_lock *search_queue(cfs_list_t *queue,
                                      ldlm_mode_t *mode,
                                      ldlm_policy_data_t *policy,
                                      struct ldlm_lock *old_lock,
                                      __u64 flags, int unref)
{
        struct ldlm_lock *lock;
        cfs_list_t       *tmp;

        cfs_list_for_each(tmp, queue) {
                ldlm_mode_t match;

                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lock == old_lock)
                        break;

                /* Check if this lock can be matched.
                 * Used by LU-2919 (exclusive open) for open lease locks */
                if (ldlm_is_excl(lock))
                        continue;

                /* llite sometimes wants to match locks that will be
                 * canceled when their users drop, but we allow it to match
                 * if it passes in CBPENDING and the lock still has users.
                 * this is generally only going to be used by children
                 * whose parents already hold a lock so forward progress
                 * can still happen. */
                if (ldlm_is_cbpending(lock) &&
                    !(flags & LDLM_FL_CBPENDING))
                        continue;
                if (!unref && ldlm_is_cbpending(lock) &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        continue;

                if (!(lock->l_req_mode & *mode))
                        continue;
                match = lock->l_req_mode;

                if (lock->l_resource->lr_type == LDLM_EXTENT &&
                    (lock->l_policy_data.l_extent.start >
                     policy->l_extent.start ||
                     lock->l_policy_data.l_extent.end < policy->l_extent.end))
                        continue;

                if (unlikely(match == LCK_GROUP) &&
                    lock->l_resource->lr_type == LDLM_EXTENT &&
                    lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                        continue;

                /* We match if we have an existing lock with the same or a
                 * wider set of bits. */
                if (lock->l_resource->lr_type == LDLM_IBITS &&
                     ((lock->l_policy_data.l_inodebits.bits &
                      policy->l_inodebits.bits) !=
                      policy->l_inodebits.bits))
                        continue;

                if (!unref && LDLM_HAVE_MASK(lock, GONE))
                        continue;

                if ((flags & LDLM_FL_LOCAL_ONLY) &&
                    !ldlm_is_local(lock))
                        continue;

                if (flags & LDLM_FL_TEST_LOCK) {
                        LDLM_LOCK_GET(lock);
                        ldlm_lock_touch_in_lru(lock);
                } else {
                        ldlm_lock_addref_internal_nolock(lock, match);
                }
                *mode = match;
                return lock;
        }

        return NULL;
}

void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
{
        if ((lock->l_flags & LDLM_FL_FAIL_NOTIFIED) == 0) {
                lock->l_flags |= LDLM_FL_FAIL_NOTIFIED;
                wake_up_all(&lock->l_waitq);
        }
}
EXPORT_SYMBOL(ldlm_lock_fail_match_locked);

void ldlm_lock_fail_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_fail_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_fail_match);

/**
 * Mark lock as "matchable" by OST.
 *
 * Used to prevent certain races in LOV/OSC where the lock is granted, but
 * LVB is not yet valid.
 * Assumes LDLM lock is already locked.
 */
void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
{
        ldlm_set_lvb_ready(lock);
        wake_up_all(&lock->l_waitq);
}
EXPORT_SYMBOL(ldlm_lock_allow_match_locked);

/**
 * Mark lock as "matchable" by OST.
 * Locks the lock and then \see ldlm_lock_allow_match_locked
 */
void ldlm_lock_allow_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_allow_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_allow_match);

/**
 * Attempt to find a lock with specified properties.
 *
 * Typically returns a reference to matched lock unless LDLM_FL_TEST_LOCK is
 * set in \a flags.
 *
 * Can be called in two ways:
 *
 * If 'ns' is NULL, then lockh describes an existing lock that we want to
 * look for a duplicate of.
 *
 * Otherwise, all of the fields must be filled in, to match against.
 *
 * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
 *     server (i.e., connh is NULL)
 * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
 *     list will be considered
 * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
 *     to be canceled can still be matched as long as they still have reader
 *     or writer references
 * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a
 *     lock, just tell us if we would have matched.
 *
 * \retval 1 if it finds an already-existing lock that is compatible; in this
 * case, lockh is filled in with an addref()ed lock
 *
 * We also check security context, and if that fails we simply return 0 (to
 * keep caller code unchanged), the context failure will be discovered by
 * caller sometime later.
 */
ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, __u64 flags,
                            const struct ldlm_res_id *res_id, ldlm_type_t type,
                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
                            struct lustre_handle *lockh, int unref)
{
        struct ldlm_resource *res;
        struct ldlm_lock *lock, *old_lock = NULL;
        int rc = 0;
        ENTRY;

        if (ns == NULL) {
                old_lock = ldlm_handle2lock(lockh);
                LASSERT(old_lock);

                ns = ldlm_lock_to_ns(old_lock);
                res_id = &old_lock->l_resource->lr_name;
                type = old_lock->l_resource->lr_type;
                mode = old_lock->l_req_mode;
        }

        res = ldlm_resource_get(ns, NULL, res_id, type, 0);
        if (IS_ERR(res)) {
                LASSERT(old_lock == NULL);
                RETURN(0);
        }

        LDLM_RESOURCE_ADDREF(res);
        lock_res(res);

        lock = search_queue(&res->lr_granted, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        if (flags & LDLM_FL_BLOCK_GRANTED)
                GOTO(out, rc = 0);
        lock = search_queue(&res->lr_converting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        lock = search_queue(&res->lr_waiting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);

        EXIT;
 out:
        unlock_res(res);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);

        if (lock) {
                ldlm_lock2handle(lock, lockh);
                if ((flags & LDLM_FL_LVB_READY) &&
                    (!ldlm_is_lvb_ready(lock))) {
                        __u64 wait_flags = LDLM_FL_LVB_READY |
                                LDLM_FL_DESTROYED | LDLM_FL_FAIL_NOTIFIED;
                        struct l_wait_info lwi;

                        if (lock->l_completion_ast) {
                                int err = lock->l_completion_ast(lock,
                                                          LDLM_FL_WAIT_NOREPROC,
                                                                 NULL);
                                if (err) {
                                        if (flags & LDLM_FL_TEST_LOCK)
                                                LDLM_LOCK_RELEASE(lock);
                                        else
                                                ldlm_lock_decref_internal(lock,
                                                                          mode);
                                        rc = 0;
                                        goto out2;
                                }
                        }

                        lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
                                               NULL, LWI_ON_SIGNAL_NOOP, NULL);

                        /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
                        l_wait_event(lock->l_waitq,
                                     lock->l_flags & wait_flags,
                                     &lwi);
                        if (!ldlm_is_lvb_ready(lock)) {
                                if (flags & LDLM_FL_TEST_LOCK)
                                        LDLM_LOCK_RELEASE(lock);
                                else
                                        ldlm_lock_decref_internal(lock, mode);
                                rc = 0;
                        }
                }
        }
 out2:
        if (rc) {
                LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[2] : policy->l_extent.start,
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[3] : policy->l_extent.end);

                /* check user's security context */
                if (lock->l_conn_export &&
                    sptlrpc_import_check_ctx(
                                class_exp2cliimp(lock->l_conn_export))) {
                        if (!(flags & LDLM_FL_TEST_LOCK))
                                ldlm_lock_decref_internal(lock, mode);
                        rc = 0;
                }

                if (flags & LDLM_FL_TEST_LOCK)
                        LDLM_LOCK_RELEASE(lock);

        } else if (!(flags & LDLM_FL_TEST_LOCK)) { /* less verbose for test-only */
                LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
                                  LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
                                  type, mode, res_id->name[0], res_id->name[1],
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[2] : policy->l_extent.start,
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[3] : policy->l_extent.end);
        }
        if (old_lock)
                LDLM_LOCK_PUT(old_lock);

        return rc ? mode : 0;
}
EXPORT_SYMBOL(ldlm_lock_match);
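
/*
 * Illustrative sketch (not part of the original source): a client-side
 * caller probing its namespace for a cached extent lock covering a whole
 * object; "ns" and "res_id" are hypothetical caller state.
 *
 *      ldlm_policy_data_t policy = {
 *              .l_extent = { .start = 0, .end = OBD_OBJECT_EOF },
 *      };
 *      struct lustre_handle lockh;
 *      ldlm_mode_t mode;
 *
 *      mode = ldlm_lock_match(ns, LDLM_FL_LVB_READY, res_id, LDLM_EXTENT,
 *                             &policy, LCK_PR | LCK_PW, &lockh, 0);
 *      if (mode != 0) {
 *              ... lockh now references the matched lock ...
 *              ldlm_lock_decref(&lockh, mode);
 *      }
 */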
1421
1422 ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh,
1423                                         __u64 *bits)
1424 {
1425         struct ldlm_lock *lock;
1426         ldlm_mode_t mode = 0;
1427         ENTRY;
1428
1429         lock = ldlm_handle2lock(lockh);
1430         if (lock != NULL) {
1431                 lock_res_and_lock(lock);
1432                 if (LDLM_HAVE_MASK(lock, GONE))
1433                         GOTO(out, mode);
1434
1435                 if (ldlm_is_cbpending(lock) &&
1436                     lock->l_readers == 0 && lock->l_writers == 0)
1437                         GOTO(out, mode);
1438
1439                 if (bits)
1440                         *bits = lock->l_policy_data.l_inodebits.bits;
1441                 mode = lock->l_granted_mode;
1442                 ldlm_lock_addref_internal_nolock(lock, mode);
1443         }
1444
1445         EXIT;
1446
1447 out:
1448         if (lock != NULL) {
1449                 unlock_res_and_lock(lock);
1450                 LDLM_LOCK_PUT(lock);
1451         }
1452         return mode;
1453 }
1454 EXPORT_SYMBOL(ldlm_revalidate_lock_handle);
1455
1456 /** The caller must guarantee that the buffer is large enough. */
1457 int ldlm_fill_lvb(struct ldlm_lock *lock, struct req_capsule *pill,
1458                   enum req_location loc, void *data, int size)
1459 {
1460         void *lvb;
1461         ENTRY;
1462
1463         LASSERT(data != NULL);
1464         LASSERT(size >= 0);
1465
1466         switch (lock->l_lvb_type) {
1467         case LVB_T_OST:
1468                 if (size == sizeof(struct ost_lvb)) {
1469                         if (loc == RCL_CLIENT)
1470                                 lvb = req_capsule_client_swab_get(pill,
1471                                                 &RMF_DLM_LVB,
1472                                                 lustre_swab_ost_lvb);
1473                         else
1474                                 lvb = req_capsule_server_swab_get(pill,
1475                                                 &RMF_DLM_LVB,
1476                                                 lustre_swab_ost_lvb);
1477                         if (unlikely(lvb == NULL)) {
1478                                 LDLM_ERROR(lock, "no LVB");
1479                                 RETURN(-EPROTO);
1480                         }
1481
1482                         memcpy(data, lvb, size);
1483                 } else if (size == sizeof(struct ost_lvb_v1)) {
1484                         struct ost_lvb *olvb = data;
1485
1486                         if (loc == RCL_CLIENT)
1487                                 lvb = req_capsule_client_swab_get(pill,
1488                                                 &RMF_DLM_LVB,
1489                                                 lustre_swab_ost_lvb_v1);
1490                         else
1491                                 lvb = req_capsule_server_sized_swab_get(pill,
1492                                                 &RMF_DLM_LVB, size,
1493                                                 lustre_swab_ost_lvb_v1);
1494                         if (unlikely(lvb == NULL)) {
1495                                 LDLM_ERROR(lock, "no LVB");
1496                                 RETURN(-EPROTO);
1497                         }
1498
1499                         memcpy(data, lvb, size);
1500                         olvb->lvb_mtime_ns = 0;
1501                         olvb->lvb_atime_ns = 0;
1502                         olvb->lvb_ctime_ns = 0;
1503                 } else {
1504                         LDLM_ERROR(lock, "Reply has unexpected ost LVB size %d",
1505                                    size);
1506                         RETURN(-EINVAL);
1507                 }
1508                 break;
1509         case LVB_T_LQUOTA:
1510                 if (size == sizeof(struct lquota_lvb)) {
1511                         if (loc == RCL_CLIENT)
1512                                 lvb = req_capsule_client_swab_get(pill,
1513                                                 &RMF_DLM_LVB,
1514                                                 lustre_swab_lquota_lvb);
1515                         else
1516                                 lvb = req_capsule_server_swab_get(pill,
1517                                                 &RMF_DLM_LVB,
1518                                                 lustre_swab_lquota_lvb);
1519                         if (unlikely(lvb == NULL)) {
1520                                 LDLM_ERROR(lock, "no LVB");
1521                                 RETURN(-EPROTO);
1522                         }
1523
1524                         memcpy(data, lvb, size);
1525                 } else {
1526                         LDLM_ERROR(lock, "Reply has unexpected lquota LVB size %d",
1527                                    size);
1528                         RETURN(-EINVAL);
1529                 }
1530                 break;
1531         case LVB_T_LAYOUT:
1532                 if (size == 0)
1533                         break;
1534
1535                 if (loc == RCL_CLIENT)
1536                         lvb = req_capsule_client_get(pill, &RMF_DLM_LVB);
1537                 else
1538                         lvb = req_capsule_server_get(pill, &RMF_DLM_LVB);
1539                 if (unlikely(lvb == NULL)) {
1540                         LDLM_ERROR(lock, "no LVB");
1541                         RETURN(-EPROTO);
1542                 }
1543
1544                 memcpy(data, lvb, size);
1545                 break;
1546         default:
1547                 LDLM_ERROR(lock, "Unknown LVB type: %d", lock->l_lvb_type);
1548                 libcfs_debug_dumpstack(NULL);
1549                 RETURN(-EINVAL);
1550         }
1551
1552         RETURN(0);
1553 }
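
/*
 * Editorial sketch (not part of the original source): how a client-side
 * enqueue completion path might copy the reply LVB into the lock's buffer.
 * The reply message lives on the RCL_SERVER side of the capsule, and the
 * rq_pill field name is assumed here; the buffer and its length are set up
 * in ldlm_lock_create() below.
 */
#if 0
static int example_copy_reply_lvb(struct ldlm_lock *lock,
                                  struct ptlrpc_request *req)
{
        if (lock->l_lvb_len == 0)
                return 0;
        return ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
                             lock->l_lvb_data, lock->l_lvb_len);
}
#endif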
1554
1555 /**
1556  * Create and fill in a new LDLM lock with the specified properties.
1557  * Returns a referenced lock on success, or an ERR_PTR() on failure.
1558  */
1559 struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
1560                                    const struct ldlm_res_id *res_id,
1561                                    ldlm_type_t type,
1562                                    ldlm_mode_t mode,
1563                                    const struct ldlm_callback_suite *cbs,
1564                                    void *data, __u32 lvb_len,
1565                                    enum lvb_type lvb_type)
1566 {
1567         struct ldlm_lock        *lock;
1568         struct ldlm_resource    *res;
1569         int                     rc;
1570         ENTRY;
1571
1572         res = ldlm_resource_get(ns, NULL, res_id, type, 1);
1573         if (IS_ERR(res))
1574                 RETURN(ERR_CAST(res));
1575
1576         lock = ldlm_lock_new(res);
1577         if (lock == NULL)
1578                 RETURN(ERR_PTR(-ENOMEM));
1579
1580         lock->l_req_mode = mode;
1581         lock->l_ast_data = data;
1582         lock->l_pid = current_pid();
1583         if (ns_is_server(ns))
1584                 ldlm_set_ns_srv(lock);
1585         if (cbs) {
1586                 lock->l_blocking_ast = cbs->lcs_blocking;
1587                 lock->l_completion_ast = cbs->lcs_completion;
1588                 lock->l_glimpse_ast = cbs->lcs_glimpse;
1589         }
1590
1591         lock->l_tree_node = NULL;
1592         /* If this is an extent lock, allocate the interval tree node. */
1593         if (type == LDLM_EXTENT)
1594                 if (ldlm_interval_alloc(lock) == NULL)
1595                         GOTO(out, rc = -ENOMEM);
1596
1597         if (lvb_len) {
1598                 lock->l_lvb_len = lvb_len;
1599                 OBD_ALLOC_LARGE(lock->l_lvb_data, lvb_len);
1600                 if (lock->l_lvb_data == NULL)
1601                         GOTO(out, rc = -ENOMEM);
1602         }
1603
1604         lock->l_lvb_type = lvb_type;
1605         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK))
1606                 GOTO(out, rc = -ENOENT);
1607
1608         RETURN(lock);
1609
1610 out:
1611         ldlm_lock_destroy(lock);
1612         LDLM_LOCK_RELEASE(lock);
1613         RETURN(ERR_PTR(rc));
1614 }
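
/*
 * Editorial sketch (not part of the original source): creating a lock with
 * a callback suite.  The AST functions named here are hypothetical
 * placeholders, and LVB_T_NONE is assumed as the "no LVB" member of enum
 * lvb_type; note that ldlm_lock_create() returns a referenced lock or an
 * ERR_PTR() value, never NULL.
 */
#if 0
static struct ldlm_lock *example_create(struct ldlm_namespace *ns,
                                        const struct ldlm_res_id *res_id)
{
        const struct ldlm_callback_suite cbs = {
                .lcs_blocking   = example_blocking_ast,   /* hypothetical */
                .lcs_completion = example_completion_ast, /* hypothetical */
                .lcs_glimpse    = NULL,
        };

        return ldlm_lock_create(ns, res_id, LDLM_EXTENT, LCK_PW,
                                &cbs, NULL, 0, LVB_T_NONE);
}
#endif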
1615
1616 /**
1617  * Enqueue (request) a lock.
1618  *
1619  * Does not block. As a result of the enqueue the lock is put on the
1620  * granted or waiting list.
1621  *
1622  * If the namespace has an intent policy set and the lock has the
1623  * LDLM_FL_HAS_INTENT flag set, skip all the enqueueing and delegate lock
1624  * processing to the intent policy function.
1625  */
1626 ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
1627                                struct ldlm_lock **lockp,
1628                                void *cookie, __u64 *flags)
1629 {
1630         struct ldlm_lock *lock = *lockp;
1631         struct ldlm_resource *res = lock->l_resource;
1632         int local = ns_is_client(ldlm_res_to_ns(res));
1633 #ifdef HAVE_SERVER_SUPPORT
1634         ldlm_processing_policy policy;
1635 #endif
1636         ldlm_error_t rc = ELDLM_OK;
1637         struct ldlm_interval *node = NULL;
1638         ENTRY;
1639
1640         lock->l_last_activity = cfs_time_current_sec();
1641         /* policies are not executed on the client or during replay */
1642         if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
1643             && !local && ns->ns_policy) {
1644                 rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
1645                                    NULL);
1646                 if (rc == ELDLM_LOCK_REPLACED) {
1647                         /* The lock that was returned has already been granted,
1648                          * and placed into lockp.  If it's not the same as the
1649                          * one we passed in, then destroy the old one and our
1650                          * work here is done. */
1651                         if (lock != *lockp) {
1652                                 ldlm_lock_destroy(lock);
1653                                 LDLM_LOCK_RELEASE(lock);
1654                         }
1655                         *flags |= LDLM_FL_LOCK_CHANGED;
1656                         RETURN(0);
1657                 } else if (rc != ELDLM_OK ||
1658                            (*flags & LDLM_FL_INTENT_ONLY)) {
1659                         ldlm_lock_destroy(lock);
1660                         RETURN(rc);
1661                 }
1662         }
1663
1664         /* A replaying lock might already be in the granted list, so
1665          * unlinking the lock would cause its interval node to be freed.
1666          * Allocate the interval node early, otherwise we cannot regrant
1667          * this lock in the future. - jay */
1668         if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
1669                 OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
1670
1671         lock_res_and_lock(lock);
1672         if (local && lock->l_req_mode == lock->l_granted_mode) {
1673                 /* The server returned a blocked lock, but it was granted
1674                  * before we got a chance to actually enqueue it.  We don't
1675                  * need to do anything else. */
1676                 *flags &= ~(LDLM_FL_BLOCK_GRANTED |
1677                             LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
1678                 GOTO(out, rc = ELDLM_OK);
1679         }
1680
1681         ldlm_resource_unlink_lock(lock);
1682         if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
1683                 if (node == NULL) {
1684                         ldlm_lock_destroy_nolock(lock);
1685                         GOTO(out, rc = -ENOMEM);
1686                 }
1687
1688                 CFS_INIT_LIST_HEAD(&node->li_group);
1689                 ldlm_interval_attach(node, lock);
1690                 node = NULL;
1691         }
1692
1693         /* Some flags from the enqueue want to make it into the AST, via the
1694          * lock's l_flags. */
1695         if (*flags & LDLM_FL_AST_DISCARD_DATA)
1696                 ldlm_set_ast_discard_data(lock);
1697         if (*flags & LDLM_FL_TEST_LOCK)
1698                 ldlm_set_test_lock(lock);
1699
1700         /* This distinction between local lock trees is very important; a client
1701          * namespace only has information about locks taken by that client, and
1702          * thus doesn't have enough information to decide for itself if it can
1703          * be granted (below).  In this case, we do exactly what the server
1704          * tells us to do, as dictated by the 'flags'.
1705          *
1706          * We do exactly the same thing during recovery, when the server is
1707          * more or less trusting the clients not to lie.
1708          *
1709          * FIXME (bug 268): Detect obvious lies by checking compatibility in
1710          * granted/converting queues. */
1711         if (local) {
1712                 if (*flags & LDLM_FL_BLOCK_CONV)
1713                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1714                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
1715                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1716                 else
1717                         ldlm_grant_lock(lock, NULL);
1718                 GOTO(out, rc = ELDLM_OK);
1719 #ifdef HAVE_SERVER_SUPPORT
1720         } else if (*flags & LDLM_FL_RESENT) {
1721                 GOTO(out, rc = ELDLM_OK);
1722         } else if (*flags & LDLM_FL_REPLAY) {
1723                 if (*flags & LDLM_FL_BLOCK_CONV) {
1724                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1725                         GOTO(out, rc = ELDLM_OK);
1726                 } else if (*flags & LDLM_FL_BLOCK_WAIT) {
1727                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1728                         GOTO(out, rc = ELDLM_OK);
1729                 } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
1730                         ldlm_grant_lock(lock, NULL);
1731                         GOTO(out, rc = ELDLM_OK);
1732                 }
1733                 /* If no flags, fall through to normal enqueue path. */
1734         }
1735
1736         policy = ldlm_processing_policy_table[res->lr_type];
1737         policy(lock, flags, 1, &rc, NULL);
1738         GOTO(out, rc);
1739 #else
1740         } else {
1741                 CERROR("This is a client-side-only module, cannot handle "
1742                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
1743                 LBUG();
1744         }
1745 #endif
1746
1747 out:
1748         unlock_res_and_lock(lock);
1749         if (node)
1750                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
1751         return rc;
1752 }
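
/*
 * Editorial sketch (not part of the original source): pairing
 * ldlm_lock_create() with ldlm_lock_enqueue().  The cookie argument is
 * only consumed by the namespace intent policy; on return, *flags may
 * carry LDLM_FL_LOCK_CHANGED if the policy replaced the lock in *lockp.
 */
#if 0
static ldlm_error_t example_enqueue(struct ldlm_namespace *ns,
                                    struct ldlm_lock **lockp)
{
        __u64 flags = 0;

        return ldlm_lock_enqueue(ns, lockp, NULL, &flags);
}
#endif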
1753
1754 #ifdef HAVE_SERVER_SUPPORT
1755 /**
1756  * Iterate through all waiting locks on a given resource queue and attempt to
1757  * grant them.
1758  *
1759  * Must be called with resource lock held.
1760  */
1761 int ldlm_reprocess_queue(struct ldlm_resource *res, cfs_list_t *queue,
1762                          cfs_list_t *work_list)
1763 {
1764         cfs_list_t *tmp, *pos;
1765         ldlm_processing_policy policy;
1766         __u64 flags;
1767         int rc = LDLM_ITER_CONTINUE;
1768         ldlm_error_t err;
1769         ENTRY;
1770
1771         check_res_locked(res);
1772
1773         policy = ldlm_processing_policy_table[res->lr_type];
1774         LASSERT(policy);
1775
1776         cfs_list_for_each_safe(tmp, pos, queue) {
1777                 struct ldlm_lock *pending;
1778                 pending = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
1779
1780                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
1781
1782                 flags = 0;
1783                 rc = policy(pending, &flags, 0, &err, work_list);
1784                 if (rc != LDLM_ITER_CONTINUE)
1785                         break;
1786         }
1787
1788         RETURN(rc);
1789 }
1790 #endif
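
/*
 * Editorial note (not part of the original source): the per-type policy
 * functions dispatched above are assumed to follow this prototype, based
 * on how they are invoked in this file:
 */
#if 0
typedef int (*ldlm_processing_policy)(struct ldlm_lock *lock, __u64 *flags,
                                      int first_enq, ldlm_error_t *err,
                                      cfs_list_t *work_list);
/* ldlm_reprocess_queue() calls them with first_enq == 0, so a still-blocked
 * lock stops the iteration via a non-LDLM_ITER_CONTINUE return value. */
#endif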
1791
1792 /**
1793  * Process a call to blocking AST callback for a lock in ast_work list
1794  */
1795 static int
1796 ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1797 {
1798         struct ldlm_cb_set_arg *arg = opaq;
1799         struct ldlm_lock_desc   d;
1800         int                     rc;
1801         struct ldlm_lock       *lock;
1802         ENTRY;
1803
1804         if (cfs_list_empty(arg->list))
1805                 RETURN(-ENOENT);
1806
1807         lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);
1808
1809         /* nobody should touch l_bl_ast */
1810         lock_res_and_lock(lock);
1811         cfs_list_del_init(&lock->l_bl_ast);
1812
1813         LASSERT(ldlm_is_ast_sent(lock));
1814         LASSERT(lock->l_bl_ast_run == 0);
1815         LASSERT(lock->l_blocking_lock);
1816         lock->l_bl_ast_run++;
1817         unlock_res_and_lock(lock);
1818
1819         ldlm_lock2desc(lock->l_blocking_lock, &d);
1820
1821         rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
1822         LDLM_LOCK_RELEASE(lock->l_blocking_lock);
1823         lock->l_blocking_lock = NULL;
1824         LDLM_LOCK_RELEASE(lock);
1825
1826         RETURN(rc);
1827 }
1828
1829 /**
1830  * Process a call to completion AST callback for a lock in ast_work list
1831  */
1832 static int
1833 ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1834 {
1835         struct ldlm_cb_set_arg  *arg = opaq;
1836         int                      rc = 0;
1837         struct ldlm_lock        *lock;
1838         ldlm_completion_callback completion_callback;
1839         ENTRY;
1840
1841         if (cfs_list_empty(arg->list))
1842                 RETURN(-ENOENT);
1843
1844         lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
1845
1846         /* It's possible to receive a completion AST before we've set
1847          * the l_completion_ast pointer: either because the AST arrived
1848          * before the reply, or simply because there's a small race
1849          * window between receiving the reply and finishing the local
1850          * enqueue. (bug 842)
1851          *
1852          * This can't happen with the blocking_ast, however, because we
1853          * will never call the local blocking_ast until we drop our
1854          * reader/writer reference, which we won't do until we get the
1855          * reply and finish enqueueing. */
1856
1857         /* nobody should touch l_cp_ast */
1858         lock_res_and_lock(lock);
1859         cfs_list_del_init(&lock->l_cp_ast);
1860         LASSERT(ldlm_is_cp_reqd(lock));
1861         /* save l_completion_ast since it can be changed by
1862          * mds_intent_policy(), see bug 14225 */
1863         completion_callback = lock->l_completion_ast;
1864         ldlm_clear_cp_reqd(lock);
1865         unlock_res_and_lock(lock);
1866
1867         if (completion_callback != NULL)
1868                 rc = completion_callback(lock, 0, (void *)arg);
1869         LDLM_LOCK_RELEASE(lock);
1870
1871         RETURN(rc);
1872 }
1873
1874 /**
1875  * Process a call to revocation AST callback for a lock in ast_work list
1876  */
1877 static int
1878 ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1879 {
1880         struct ldlm_cb_set_arg *arg = opaq;
1881         struct ldlm_lock_desc   desc;
1882         int                     rc;
1883         struct ldlm_lock       *lock;
1884         ENTRY;
1885
1886         if (cfs_list_empty(arg->list))
1887                 RETURN(-ENOENT);
1888
1889         lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_rk_ast);
1890         cfs_list_del_init(&lock->l_rk_ast);
1891
1892         /* the descriptor just pretends the lock is exclusive */
1893         ldlm_lock2desc(lock, &desc);
1894         desc.l_req_mode = LCK_EX;
1895         desc.l_granted_mode = 0;
1896
1897         rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
1898         LDLM_LOCK_RELEASE(lock);
1899
1900         RETURN(rc);
1901 }
1902
1903 /**
1904  * Process a call to glimpse AST callback for a lock in ast_work list
1905  */
1906 int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1907 {
1908         struct ldlm_cb_set_arg          *arg = opaq;
1909         struct ldlm_glimpse_work        *gl_work;
1910         struct ldlm_lock                *lock;
1911         int                              rc = 0;
1912         ENTRY;
1913
1914         if (cfs_list_empty(arg->list))
1915                 RETURN(-ENOENT);
1916
1917         gl_work = cfs_list_entry(arg->list->next, struct ldlm_glimpse_work,
1918                                  gl_list);
1919         cfs_list_del_init(&gl_work->gl_list);
1920
1921         lock = gl_work->gl_lock;
1922
1923         /* transfer the glimpse descriptor to ldlm_cb_set_arg */
1924         arg->gl_desc = gl_work->gl_desc;
1925
1926         /* invoke the actual glimpse callback */
1927         if (lock->l_glimpse_ast(lock, (void*)arg) == 0)
1928                 rc = 1;
1929
1930         LDLM_LOCK_RELEASE(lock);
1931
1932         if ((gl_work->gl_flags & LDLM_GL_WORK_NOFREE) == 0)
1933                 OBD_FREE_PTR(gl_work);
1934
1935         RETURN(rc);
1936 }
1937
1938 /**
1939  * Process list of locks in need of ASTs being sent.
1940  *
1941  * Used on server to send multiple ASTs together instead of sending one by
1942  * one.
1943  */
1944 int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
1945                       ldlm_desc_ast_t ast_type)
1946 {
1947         struct ldlm_cb_set_arg *arg;
1948         set_producer_func       work_ast_lock;
1949         int                     rc;
1950
1951         if (cfs_list_empty(rpc_list))
1952                 RETURN(0);
1953
1954         OBD_ALLOC_PTR(arg);
1955         if (arg == NULL)
1956                 RETURN(-ENOMEM);
1957
1958         atomic_set(&arg->restart, 0);
1959         arg->list = rpc_list;
1960
1961         switch (ast_type) {
1962                 case LDLM_WORK_BL_AST:
1963                         arg->type = LDLM_BL_CALLBACK;
1964                         work_ast_lock = ldlm_work_bl_ast_lock;
1965                         break;
1966                 case LDLM_WORK_CP_AST:
1967                         arg->type = LDLM_CP_CALLBACK;
1968                         work_ast_lock = ldlm_work_cp_ast_lock;
1969                         break;
1970                 case LDLM_WORK_REVOKE_AST:
1971                         arg->type = LDLM_BL_CALLBACK;
1972                         work_ast_lock = ldlm_work_revoke_ast_lock;
1973                         break;
1974                 case LDLM_WORK_GL_AST:
1975                         arg->type = LDLM_GL_CALLBACK;
1976                         work_ast_lock = ldlm_work_gl_ast_lock;
1977                         break;
1978                 default:
1979                         LBUG();
1980         }
1981
1982         /* We create a ptlrpc request set with the flow control extension.
1983          * This request set will use the work_ast_lock function to produce new
1984          * requests and will send a new request each time one completes, in
1985          * order to cap the number of requests in flight at ns_max_parallel_ast. */
1986         arg->set = ptlrpc_prep_fcset(ns->ns_max_parallel_ast ? : UINT_MAX,
1987                                      work_ast_lock, arg);
1988         if (arg->set == NULL)
1989                 GOTO(out, rc = -ENOMEM);
1990
1991         ptlrpc_set_wait(arg->set);
1992         ptlrpc_set_destroy(arg->set);
1993
1994         rc = atomic_read(&arg->restart) ? -ERESTART : 0;
1995         GOTO(out, rc);
1996 out:
1997         OBD_FREE_PTR(arg);
1998         return rc;
1999 }
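
/*
 * Editorial sketch (not part of the original source): queueing a single
 * glimpse AST.  Each ldlm_glimpse_work entry carries its own lock
 * reference; ldlm_work_gl_ast_lock() above drops that reference and frees
 * the work item unless LDLM_GL_WORK_NOFREE is set.
 */
#if 0
static int example_glimpse_lock(struct ldlm_namespace *ns,
                                struct ldlm_lock *lock)
{
        CFS_LIST_HEAD(gl_list);
        struct ldlm_glimpse_work *gl_work;

        OBD_ALLOC_PTR(gl_work);
        if (gl_work == NULL)
                return -ENOMEM;

        gl_work->gl_lock  = LDLM_LOCK_GET(lock);
        gl_work->gl_flags = 0;          /* freed by ldlm_work_gl_ast_lock() */
        gl_work->gl_desc  = NULL;       /* no private glimpse descriptor */
        cfs_list_add_tail(&gl_work->gl_list, &gl_list);

        return ldlm_run_ast_work(ns, &gl_list, LDLM_WORK_GL_AST);
}
#endif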
2000
2001 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
2002 {
2003         ldlm_reprocess_all(res);
2004         return LDLM_ITER_CONTINUE;
2005 }
2006
2007 static int ldlm_reprocess_res(cfs_hash_t *hs, cfs_hash_bd_t *bd,
2008                               cfs_hlist_node_t *hnode, void *arg)
2009 {
2010         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2011         int    rc;
2012
2013         rc = reprocess_one_queue(res, arg);
2014
2015         return rc == LDLM_ITER_STOP;
2016 }
2017
2018 /**
2019  * Iterate through all resources on a namespace attempting to grant waiting
2020  * locks.
2021  */
2022 void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
2023 {
2024         ENTRY;
2025
2026         if (ns != NULL) {
2027                 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2028                                          ldlm_reprocess_res, NULL);
2029         }
2030         EXIT;
2031 }
2032 EXPORT_SYMBOL(ldlm_reprocess_all_ns);
2033
2034 /**
2035  * Try to grant all waiting locks on a resource.
2036  *
2037  * Calls ldlm_reprocess_queue on converting and waiting queues.
2038  *
2039  * Typically called after some resource locks are cancelled to see
2040  * if anything could be granted as a result of the cancellation.
2041  */
2042 void ldlm_reprocess_all(struct ldlm_resource *res)
2043 {
2044         CFS_LIST_HEAD(rpc_list);
2045
2046 #ifdef HAVE_SERVER_SUPPORT
2047         int rc;
2048         ENTRY;
2049         /* Local lock trees don't get reprocessed. */
2050         if (ns_is_client(ldlm_res_to_ns(res))) {
2051                 EXIT;
2052                 return;
2053         }
2054
2055 restart:
2056         lock_res(res);
2057         rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
2058         if (rc == LDLM_ITER_CONTINUE)
2059                 ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
2060         unlock_res(res);
2061
2062         rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
2063                                LDLM_WORK_CP_AST);
2064         if (rc == -ERESTART) {
2065                 LASSERT(cfs_list_empty(&rpc_list));
2066                 goto restart;
2067         }
2068 #else
2069         ENTRY;
2070         if (!ns_is_client(ldlm_res_to_ns(res))) {
2071                 CERROR("This is a client-side-only module, cannot handle "
2072                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
2073                 LBUG();
2074         }
2075 #endif
2076         EXIT;
2077 }
2078 EXPORT_SYMBOL(ldlm_reprocess_all);
2079
2080 /**
2081  * Helper function to call blocking AST for LDLM lock \a lock in a
2082  * "cancelling" mode.
2083  */
2084 void ldlm_cancel_callback(struct ldlm_lock *lock)
2085 {
2086         check_res_locked(lock->l_resource);
2087         if (!ldlm_is_cancel(lock)) {
2088                 ldlm_set_cancel(lock);
2089                 if (lock->l_blocking_ast) {
2090                         unlock_res_and_lock(lock);
2091                         lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
2092                                              LDLM_CB_CANCELING);
2093                         lock_res_and_lock(lock);
2094                 } else {
2095                         LDLM_DEBUG(lock, "no blocking ast");
2096                 }
2097         }
2098         ldlm_set_bl_done(lock);
2099 }
2100
2101 /**
2102  * Remove skiplist-enabled LDLM lock \a req from the granted list.
2103  */
2104 void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
2105 {
2106         if (req->l_resource->lr_type != LDLM_PLAIN &&
2107             req->l_resource->lr_type != LDLM_IBITS)
2108                 return;
2109
2110         cfs_list_del_init(&req->l_sl_policy);
2111         cfs_list_del_init(&req->l_sl_mode);
2112 }
2113
2114 /**
2115  * Attempts to cancel LDLM lock \a lock that has no reader/writer references.
2116  */
2117 void ldlm_lock_cancel(struct ldlm_lock *lock)
2118 {
2119         struct ldlm_resource *res;
2120         struct ldlm_namespace *ns;
2121         ENTRY;
2122
2123         lock_res_and_lock(lock);
2124
2125         res = lock->l_resource;
2126         ns  = ldlm_res_to_ns(res);
2127
2128         /* Please do not, no matter how tempting, remove this LBUG without
2129          * talking to me first. -phik */
2130         if (lock->l_readers || lock->l_writers) {
2131                 LDLM_ERROR(lock, "lock still has references");
2132                 LBUG();
2133         }
2134
2135         if (ldlm_is_waited(lock))
2136                 ldlm_del_waiting_lock(lock);
2137
2138         /* Invoke the blocking AST in "cancelling" mode. */
2139         ldlm_cancel_callback(lock);
2140
2141         /* Yes, check a second time, in case the lock was added back while
2142          * ldlm_cancel_callback ran without the res lock held */
2143         if (ldlm_is_waited(lock))
2144                 ldlm_del_waiting_lock(lock);
2145
2146         ldlm_resource_unlink_lock(lock);
2147         ldlm_lock_destroy_nolock(lock);
2148
2149         if (lock->l_granted_mode == lock->l_req_mode)
2150                 ldlm_pool_del(&ns->ns_pool, lock);
2151
2152         /* Make sure we will not be called again for the same lock, which is
2153          * possible if we do not zero out lock->l_granted_mode */
2154         lock->l_granted_mode = LCK_MINMODE;
2155         unlock_res_and_lock(lock);
2156
2157         EXIT;
2158 }
2159 EXPORT_SYMBOL(ldlm_lock_cancel);
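
/*
 * Editorial sketch (not part of the original source): cancelling a lock by
 * handle.  ldlm_lock_cancel() LBUGs if the lock still has reader/writer
 * references, so every ldlm_lock_addref()/decref() pair must be balanced
 * before this is called.
 */
#if 0
static void example_cancel(struct lustre_handle *lockh)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (lock != NULL) {
                ldlm_lock_cancel(lock);
                LDLM_LOCK_PUT(lock);
        }
}
#endif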
2160
2161 /**
2162  * Set opaque data into the lock that only makes sense to upper layer.
2163  */
2164 int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
2165 {
2166         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2167         int rc = -EINVAL;
2168         ENTRY;
2169
2170         if (lock) {
2171                 if (lock->l_ast_data == NULL)
2172                         lock->l_ast_data = data;
2173                 if (lock->l_ast_data == data)
2174                         rc = 0;
2175                 LDLM_LOCK_PUT(lock);
2176         }
2177         RETURN(rc);
2178 }
2179 EXPORT_SYMBOL(ldlm_lock_set_data);
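
/*
 * Editorial sketch (not part of the original source): ldlm_lock_set_data()
 * is effectively set-once -- it succeeds if l_ast_data is unset or already
 * equals the value passed in, and returns -EINVAL otherwise.
 */
#if 0
static int example_attach_ast_data(struct lustre_handle *lockh, void *data)
{
        int rc = ldlm_lock_set_data(lockh, data);

        if (rc != 0)
                CERROR("lock already has different ast data: rc = %d\n", rc);
        return rc;
}
#endif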
2180
2181 struct export_cl_data {
2182         struct obd_export       *ecl_exp;
2183         int                     ecl_loop;
2184 };
2185
2186 /**
2187  * Iterator function for ldlm_cancel_locks_for_export.
2188  * Cancels the locks passed to it.
2189  */
2190 int ldlm_cancel_locks_for_export_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
2191                                     cfs_hlist_node_t *hnode, void *data)
2193 {
2194         struct export_cl_data   *ecl = (struct export_cl_data *)data;
2195         struct obd_export       *exp  = ecl->ecl_exp;
2196         struct ldlm_lock     *lock = cfs_hash_object(hs, hnode);
2197         struct ldlm_resource *res;
2198
2199         res = ldlm_resource_getref(lock->l_resource);
2200         LDLM_LOCK_GET(lock);
2201
2202         LDLM_DEBUG(lock, "export %p", exp);
2203         ldlm_res_lvbo_update(res, NULL, 1);
2204         ldlm_lock_cancel(lock);
2205         ldlm_reprocess_all(res);
2206         ldlm_resource_putref(res);
2207         LDLM_LOCK_RELEASE(lock);
2208
2209         ecl->ecl_loop++;
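        /* log progress on power-of-two iterations only: 1, 2, 4, 8, ... */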
2210         if ((ecl->ecl_loop & -ecl->ecl_loop) == ecl->ecl_loop) {
2211                 CDEBUG(D_INFO,
2212                        "Cancel lock %p for export %p (loop %d), still have "
2213                        "%d locks left on hash table.\n",
2214                        lock, exp, ecl->ecl_loop,
2215                        atomic_read(&hs->hs_count));
2216         }
2217
2218         return 0;
2219 }
2220
2221 /**
2222  * Cancel all locks for given export.
2223  *
2224  * Typically called on client disconnection/eviction
2225  */
2226 void ldlm_cancel_locks_for_export(struct obd_export *exp)
2227 {
2228         struct export_cl_data   ecl = {
2229                 .ecl_exp        = exp,
2230                 .ecl_loop       = 0,
2231         };
2232
2233         cfs_hash_for_each_empty(exp->exp_lock_hash,
2234                                 ldlm_cancel_locks_for_export_cb, &ecl);
2235 }
2236
2237 /**
2238  * Downgrade an exclusive lock.
2239  *
2240  * A fast variant of ldlm_lock_convert for the conversion of exclusive
2241  * locks. The conversion is always successful.
2242  * Used by Commit on Sharing (COS) code.
2243  *
2244  * \param lock A lock to convert
2245  * \param new_mode new lock mode
2246  */
2247 void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode)
2248 {
2249         ENTRY;
2250
2251         LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
2252         LASSERT(new_mode == LCK_COS);
2253
2254         lock_res_and_lock(lock);
2255         ldlm_resource_unlink_lock(lock);
2256         /*
2257          * Remove the lock from pool as it will be added again in
2258          * ldlm_grant_lock() called below.
2259          */
2260         ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
2261
2262         lock->l_req_mode = new_mode;
2263         ldlm_grant_lock(lock, NULL);
2264         unlock_res_and_lock(lock);
2265         ldlm_reprocess_all(lock->l_resource);
2266
2267         EXIT;
2268 }
2269 EXPORT_SYMBOL(ldlm_lock_downgrade);
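
/*
 * Editorial sketch (not part of the original source): a Commit-on-Sharing
 * style downgrade.  Per the LASSERTs above, the lock must be granted in PW
 * or EX mode and the only supported target mode is LCK_COS.
 */
#if 0
static void example_downgrade(struct ldlm_lock *lock)
{
        if (lock->l_granted_mode & (LCK_PW | LCK_EX))
                ldlm_lock_downgrade(lock, LCK_COS);
}
#endif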
2270
2271 /**
2272  * Attempt to convert already granted lock to a different mode.
2273  *
2274  * While lock conversion is not currently used, future client-side
2275  * optimizations could take advantage of it to avoid discarding cached
2276  * pages on a file.
2277  */
2278 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
2279                                         __u32 *flags)
2280 {
2281         CFS_LIST_HEAD(rpc_list);
2282         struct ldlm_resource *res;
2283         struct ldlm_namespace *ns;
2284         int granted = 0;
2285 #ifdef HAVE_SERVER_SUPPORT
2286         int old_mode;
2287         struct sl_insert_point prev;
2288 #endif
2289         struct ldlm_interval *node;
2290         ENTRY;
2291
2292         /* Just return if mode is unchanged. */
2293         if (new_mode == lock->l_granted_mode) {
2294                 *flags |= LDLM_FL_BLOCK_GRANTED;
2295                 RETURN(lock->l_resource);
2296         }
2297
2298         /* We can't check the lock type here because the lock's bit lock
2299          * is not held, so do the allocation blindly. -jay */
2300         OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
2301         if (node == NULL)  /* Actually, this causes EDEADLOCK to be returned */
2302                 RETURN(NULL);
2303
2304         LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR),
2305                  "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);
2306
2307         lock_res_and_lock(lock);
2308
2309         res = lock->l_resource;
2310         ns  = ldlm_res_to_ns(res);
2311
2312 #ifdef HAVE_SERVER_SUPPORT
2313         old_mode = lock->l_req_mode;
2314 #endif
2315         lock->l_req_mode = new_mode;
2316         if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
2317 #ifdef HAVE_SERVER_SUPPORT
2318                 /* remember the lock position where the lock might be
2319                  * added back to the granted list later and also
2320                  * remember the join mode for skiplist fixing. */
2321                 prev.res_link = lock->l_res_link.prev;
2322                 prev.mode_link = lock->l_sl_mode.prev;
2323                 prev.policy_link = lock->l_sl_policy.prev;
2324 #endif
2325                 ldlm_resource_unlink_lock(lock);
2326         } else {
2327                 ldlm_resource_unlink_lock(lock);
2328                 if (res->lr_type == LDLM_EXTENT) {
2329                         /* FIXME: ugly code; we have to attach the lock to an
2330                          * interval node again since it may be granted
2331                          * soon */
2332                         CFS_INIT_LIST_HEAD(&node->li_group);
2333                         ldlm_interval_attach(node, lock);
2334                         node = NULL;
2335                 }
2336         }
2337
2338         /*
2339          * Remove old lock from the pool before adding the lock with new
2340          * mode below in ->policy()
2341          */
2342         ldlm_pool_del(&ns->ns_pool, lock);
2343
2344         /* If this is a local resource, put it on the appropriate list. */
2345         if (ns_is_client(ldlm_res_to_ns(res))) {
2346                 if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
2347                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
2348                 } else {
2349                         /* This should never happen, because of the way the
2350                          * server handles conversions. */
2351                         LDLM_ERROR(lock, "Erroneous flags %x on local lock\n",
2352                                    *flags);
2353                         LBUG();
2354
2355                         ldlm_grant_lock(lock, &rpc_list);
2356                         granted = 1;
2357                         /* FIXME: completion handling should not run with lr_lock held! */
2358                         if (lock->l_completion_ast)
2359                                 lock->l_completion_ast(lock, 0, NULL);
2360                 }
2361 #ifdef HAVE_SERVER_SUPPORT
2362         } else {
2363                 int rc;
2364                 ldlm_error_t err;
2365                 __u64 pflags = 0;
2366                 ldlm_processing_policy policy;
2367                 policy = ldlm_processing_policy_table[res->lr_type];
2368                 rc = policy(lock, &pflags, 0, &err, &rpc_list);
2369                 if (rc == LDLM_ITER_STOP) {
2370                         lock->l_req_mode = old_mode;
2371                         if (res->lr_type == LDLM_EXTENT)
2372                                 ldlm_extent_add_lock(res, lock);
2373                         else
2374                                 ldlm_granted_list_add_lock(lock, &prev);
2375
2376                         res = NULL;
2377                 } else {
2378                         *flags |= LDLM_FL_BLOCK_GRANTED;
2379                         granted = 1;
2380                 }
2381         }
2382 #else
2383         } else {
2384                 CERROR("This is a client-side-only module, cannot handle "
2385                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
2386                 LBUG();
2387         }
2388 #endif
2389         unlock_res_and_lock(lock);
2390
2391         if (granted)
2392                 ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
2393         if (node)
2394                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
2395         RETURN(res);
2396 }
2397 EXPORT_SYMBOL(ldlm_lock_convert);
2398
2399 /**
2400  * Print lock with lock handle \a lockh description into debug log.
2401  *
2402  * Used when printing all locks on a resource for debug purposes.
2403  */
2404 void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
2405 {
2406         struct ldlm_lock *lock;
2407
2408         if (!((libcfs_debug | D_ERROR) & level))
2409                 return;
2410
2411         lock = ldlm_handle2lock(lockh);
2412         if (lock == NULL)
2413                 return;
2414
2415         LDLM_DEBUG_LIMIT(level, lock, "###");
2416
2417         LDLM_LOCK_PUT(lock);
2418 }
2419 EXPORT_SYMBOL(ldlm_lock_dump_handle);
2420
2421 /**
2422  * Print lock information with custom message into debug log.
2423  * Helper function.
2424  */
2425 void _ldlm_lock_debug(struct ldlm_lock *lock,
2426                       struct libcfs_debug_msg_data *msgdata,
2427                       const char *fmt, ...)
2428 {
2429         va_list args;
2430         struct obd_export *exp = lock->l_export;
2431         struct ldlm_resource *resource = lock->l_resource;
2432         char *nid = "local";
2433
2434         va_start(args, fmt);
2435
2436         if (exp && exp->exp_connection) {
2437                 nid = libcfs_nid2str(exp->exp_connection->c_peer.nid);
2438         } else if (exp && exp->exp_obd != NULL) {
2439                 struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
2440                 nid = libcfs_nid2str(imp->imp_connection->c_peer.nid);
2441         }
2442
2443         if (resource == NULL) {
2444                 libcfs_debug_vmsg2(msgdata, fmt, args,
2445                        " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2446                        "res: \?\? rrc=\?\? type: \?\?\? flags: "LPX64" nid: %s "
2447                        "remote: "LPX64" expref: %d pid: %u timeout: %lu "
2448                        "lvb_type: %d\n",
2449                        lock,
2450                        lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2451                        lock->l_readers, lock->l_writers,
2452                        ldlm_lockname[lock->l_granted_mode],
2453                        ldlm_lockname[lock->l_req_mode],
2454                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2455                        exp ? atomic_read(&exp->exp_refcount) : -99,
2456                        lock->l_pid, lock->l_callback_timeout, lock->l_lvb_type);
2457                 va_end(args);
2458                 return;
2459         }
2460
2461         switch (resource->lr_type) {
2462         case LDLM_EXTENT:
2463                 libcfs_debug_vmsg2(msgdata, fmt, args,
2464                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2465                         "res: "DLDLMRES" rrc: %d type: %s ["LPU64"->"LPU64"] "
2466                         "(req "LPU64"->"LPU64") flags: "LPX64" nid: %s remote: "
2467                         LPX64" expref: %d pid: %u timeout: %lu lvb_type: %d\n",
2468                         ldlm_lock_to_ns_name(lock), lock,
2469                         lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2470                         lock->l_readers, lock->l_writers,
2471                         ldlm_lockname[lock->l_granted_mode],
2472                         ldlm_lockname[lock->l_req_mode],
2473                         PLDLMRES(resource),
2474                         atomic_read(&resource->lr_refcount),
2475                         ldlm_typename[resource->lr_type],
2476                         lock->l_policy_data.l_extent.start,
2477                         lock->l_policy_data.l_extent.end,
2478                         lock->l_req_extent.start, lock->l_req_extent.end,
2479                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2480                         exp ? atomic_read(&exp->exp_refcount) : -99,
2481                         lock->l_pid, lock->l_callback_timeout,
2482                         lock->l_lvb_type);
2483                 break;
2484
2485         case LDLM_FLOCK:
2486                 libcfs_debug_vmsg2(msgdata, fmt, args,
2487                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2488                         "res: "DLDLMRES" rrc: %d type: %s pid: %d "
2489                         "["LPU64"->"LPU64"] flags: "LPX64" nid: %s "
2490                         "remote: "LPX64" expref: %d pid: %u timeout: %lu\n",
2491                         ldlm_lock_to_ns_name(lock), lock,
2492                         lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2493                         lock->l_readers, lock->l_writers,
2494                         ldlm_lockname[lock->l_granted_mode],
2495                         ldlm_lockname[lock->l_req_mode],
2496                         PLDLMRES(resource),
2497                         atomic_read(&resource->lr_refcount),
2498                         ldlm_typename[resource->lr_type],
2499                         lock->l_policy_data.l_flock.pid,
2500                         lock->l_policy_data.l_flock.start,
2501                         lock->l_policy_data.l_flock.end,
2502                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2503                         exp ? atomic_read(&exp->exp_refcount) : -99,
2504                         lock->l_pid, lock->l_callback_timeout);
2505                 break;
2506
2507         case LDLM_IBITS:
2508                 libcfs_debug_vmsg2(msgdata, fmt, args,
2509                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2510                         "res: "DLDLMRES" bits "LPX64" rrc: %d type: %s "
2511                         "flags: "LPX64" nid: %s remote: "LPX64" expref: %d "
2512                         "pid: %u timeout: %lu lvb_type: %d\n",
2513                         ldlm_lock_to_ns_name(lock),
2514                         lock, lock->l_handle.h_cookie,
2515                         atomic_read(&lock->l_refc),
2516                         lock->l_readers, lock->l_writers,
2517                         ldlm_lockname[lock->l_granted_mode],
2518                         ldlm_lockname[lock->l_req_mode],
2519                         PLDLMRES(resource),
2520                         lock->l_policy_data.l_inodebits.bits,
2521                         atomic_read(&resource->lr_refcount),
2522                         ldlm_typename[resource->lr_type],
2523                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2524                         exp ? atomic_read(&exp->exp_refcount) : -99,
2525                         lock->l_pid, lock->l_callback_timeout,
2526                         lock->l_lvb_type);
2527                 break;
2528
2529         default:
2530                 libcfs_debug_vmsg2(msgdata, fmt, args,
2531                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2532                         "res: "DLDLMRES" rrc: %d type: %s flags: "LPX64" "
2533                         "nid: %s remote: "LPX64" expref: %d pid: %u "
2534                         "timeout: %lu lvb_type: %d\n",
2535                         ldlm_lock_to_ns_name(lock),
2536                         lock, lock->l_handle.h_cookie,
2537                         atomic_read(&lock->l_refc),
2538                         lock->l_readers, lock->l_writers,
2539                         ldlm_lockname[lock->l_granted_mode],
2540                         ldlm_lockname[lock->l_req_mode],
2541                         PLDLMRES(resource),
2542                         atomic_read(&resource->lr_refcount),
2543                         ldlm_typename[resource->lr_type],
2544                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2545                         exp ? atomic_read(&exp->exp_refcount) : -99,
2546                         lock->l_pid, lock->l_callback_timeout,
2547                         lock->l_lvb_type);
2548                 break;
2549         }
2550         va_end(args);
2551 }
2552 EXPORT_SYMBOL(_ldlm_lock_debug);