/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2013, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_lock.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM

#include <libcfs/libcfs.h>
#include <obd_class.h>
#include "ldlm_internal.h"

/* lock types */
char *ldlm_lockname[] = {
        [0] = "--",
        [LCK_EX] = "EX",
        [LCK_PW] = "PW",
        [LCK_PR] = "PR",
        [LCK_CW] = "CW",
        [LCK_CR] = "CR",
        [LCK_NL] = "NL",
        [LCK_GROUP] = "GROUP",
        [LCK_COS] = "COS"
};
EXPORT_SYMBOL(ldlm_lockname);

char *ldlm_typename[] = {
        [LDLM_PLAIN] = "PLN",
        [LDLM_EXTENT] = "EXT",
        [LDLM_FLOCK] = "FLK",
        [LDLM_IBITS] = "IBT",
};
EXPORT_SYMBOL(ldlm_typename);

static ldlm_policy_wire_to_local_t ldlm_policy_wire18_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE]  = ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE]  = ldlm_flock_policy_wire18_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE]  = ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_wire_to_local_t ldlm_policy_wire21_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE]  = ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE]  = ldlm_flock_policy_wire21_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE]  = ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_local_to_wire_t ldlm_policy_local_to_wire[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE]  = ldlm_plain_policy_local_to_wire,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_local_to_wire,
        [LDLM_FLOCK - LDLM_MIN_TYPE]  = ldlm_flock_policy_local_to_wire,
        [LDLM_IBITS - LDLM_MIN_TYPE]  = ldlm_ibits_policy_local_to_wire,
};

/**
 * Converts lock policy from the local format to the on-the-wire lock_desc
 * format.
 */
void ldlm_convert_policy_to_wire(ldlm_type_t type,
                                 const ldlm_policy_data_t *lpolicy,
                                 ldlm_wire_policy_data_t *wpolicy)
{
        ldlm_policy_local_to_wire_t convert;

        convert = ldlm_policy_local_to_wire[type - LDLM_MIN_TYPE];

        convert(lpolicy, wpolicy);
}

/**
 * Converts lock policy from the on-the-wire lock_desc format to the local
 * format.
 */
void ldlm_convert_policy_to_local(struct obd_export *exp, ldlm_type_t type,
                                  const ldlm_wire_policy_data_t *wpolicy,
                                  ldlm_policy_data_t *lpolicy)
{
        ldlm_policy_wire_to_local_t convert;
        int new_client;

        /* some badness for 2.0.0 clients, but 2.0.0 isn't supported */
        new_client = (exp_connect_flags(exp) & OBD_CONNECT_FULL20) != 0;
        if (new_client)
                convert = ldlm_policy_wire21_to_local[type - LDLM_MIN_TYPE];
        else
                convert = ldlm_policy_wire18_to_local[type - LDLM_MIN_TYPE];

        convert(wpolicy, lpolicy);
}
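
/*
 * Illustrative sketch (not part of the original file): a server-side
 * enqueue path would typically convert the wire policy carried in the
 * request's lock descriptor into the local representation before using
 * it for matching. The exact variable and field names below are
 * assumptions for illustration only:
 *
 *	ldlm_convert_policy_to_local(req->rq_export,
 *				     dlm_req->lock_desc.l_resource.lr_type,
 *				     &dlm_req->lock_desc.l_policy_data,
 *				     &lock->l_policy_data);
 */
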

char *ldlm_it2str(int it)
{
        switch (it) {
        case IT_OPEN:
                return "open";
        case IT_CREAT:
                return "creat";
        case (IT_OPEN | IT_CREAT):
                return "open|creat";
        case IT_READDIR:
                return "readdir";
        case IT_GETATTR:
                return "getattr";
        case IT_LOOKUP:
                return "lookup";
        case IT_UNLINK:
                return "unlink";
        case IT_GETXATTR:
                return "getxattr";
        case IT_LAYOUT:
                return "layout";
        default:
                CERROR("Unknown intent %d\n", it);
                return "UNKNOWN";
        }
}
EXPORT_SYMBOL(ldlm_it2str);

extern struct kmem_cache *ldlm_lock_slab;

#ifdef HAVE_SERVER_SUPPORT
static ldlm_processing_policy ldlm_processing_policy_table[] = {
        [LDLM_PLAIN]    = ldlm_process_plain_lock,
        [LDLM_EXTENT]   = ldlm_process_extent_lock,
        [LDLM_FLOCK]    = ldlm_process_flock_lock,
        [LDLM_IBITS]    = ldlm_process_inodebits_lock,
};

ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
{
        return ldlm_processing_policy_table[res->lr_type];
}
EXPORT_SYMBOL(ldlm_get_processing_policy);
#endif /* HAVE_SERVER_SUPPORT */

void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
{
        ns->ns_policy = arg;
}
EXPORT_SYMBOL(ldlm_register_intent);

/*
 * REFCOUNTED LOCK OBJECTS
 */


/**
 * Get a reference on a lock.
 *
 * Lock refcounts, during creation:
 *   - one special one for allocation, dec'd only once in destroy
 *   - one for being a lock that's in-use
 *   - one for the addref associated with a new lock
 */
struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
{
        atomic_inc(&lock->l_refc);
        return lock;
}
EXPORT_SYMBOL(ldlm_lock_get);

/**
 * Release lock reference.
 *
 * Also frees the lock if it was the last reference.
 */
void ldlm_lock_put(struct ldlm_lock *lock)
{
        ENTRY;

        LASSERT(lock->l_resource != LP_POISON);
        LASSERT(atomic_read(&lock->l_refc) > 0);
        if (atomic_dec_and_test(&lock->l_refc)) {
                struct ldlm_resource *res;

                LDLM_DEBUG(lock,
                           "final lock_put on destroyed lock, freeing it.");

                res = lock->l_resource;
                LASSERT(ldlm_is_destroyed(lock));
                LASSERT(list_empty(&lock->l_res_link));
                LASSERT(list_empty(&lock->l_pending_chain));

                lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
                                     LDLM_NSS_LOCKS);
                lu_ref_del(&res->lr_reference, "lock", lock);
                ldlm_resource_putref(res);
                lock->l_resource = NULL;
                if (lock->l_export) {
                        class_export_lock_put(lock->l_export, lock);
                        lock->l_export = NULL;
                }

                if (lock->l_lvb_data != NULL)
                        OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len);

                ldlm_interval_free(ldlm_interval_detach(lock));
                lu_ref_fini(&lock->l_reference);
                OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle);
        }

        EXIT;
}
EXPORT_SYMBOL(ldlm_lock_put);
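
/*
 * Usage sketch (illustrative only): ldlm_lock_get()/ldlm_lock_put() are
 * normally reached through the LDLM_LOCK_GET()/LDLM_LOCK_PUT() macros,
 * and every get must be balanced by exactly one put:
 *
 *	struct ldlm_lock *lock = LDLM_LOCK_GET(cached_lock);
 *
 *	... the reference keeps the lock from being freed ...
 *	LDLM_LOCK_PUT(lock);
 *
 * The final put on a destroyed lock triggers the freeing path above.
 */
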

/**
 * Removes LDLM lock \a lock from LRU. Assumes LRU is already locked.
 */
int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
{
        int rc = 0;
        if (!list_empty(&lock->l_lru)) {
                struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

                LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
                list_del_init(&lock->l_lru);
                LASSERT(ns->ns_nr_unused > 0);
                ns->ns_nr_unused--;
                rc = 1;
        }
        return rc;
}

/**
 * Removes LDLM lock \a lock from LRU. Obtains the LRU lock first.
 */
int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
        int rc;

        ENTRY;
        if (ldlm_is_ns_srv(lock)) {
                LASSERT(list_empty(&lock->l_lru));
                RETURN(0);
        }

        spin_lock(&ns->ns_lock);
        rc = ldlm_lock_remove_from_lru_nolock(lock);
        spin_unlock(&ns->ns_lock);
        EXIT;
        return rc;
}

/**
 * Adds LDLM lock \a lock to namespace LRU. Assumes LRU is already locked.
 */
void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        lock->l_last_used = cfs_time_current();
        LASSERT(list_empty(&lock->l_lru));
        LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
        list_add_tail(&lock->l_lru, &ns->ns_unused_list);
        ldlm_clear_skipped(lock);
        LASSERT(ns->ns_nr_unused >= 0);
        ns->ns_nr_unused++;
}

/**
 * Adds LDLM lock \a lock to namespace LRU. Obtains necessary LRU locks
 * first.
 */
void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        spin_lock(&ns->ns_lock);
        ldlm_lock_add_to_lru_nolock(lock);
        spin_unlock(&ns->ns_lock);
        EXIT;
}

/**
 * Moves LDLM lock \a lock that is already in namespace LRU to the tail of
 * the LRU. Performs necessary LRU locking.
 */
void ldlm_lock_touch_in_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        if (ldlm_is_ns_srv(lock)) {
                LASSERT(list_empty(&lock->l_lru));
                EXIT;
                return;
        }

        spin_lock(&ns->ns_lock);
        if (!list_empty(&lock->l_lru)) {
                ldlm_lock_remove_from_lru_nolock(lock);
                ldlm_lock_add_to_lru_nolock(lock);
        }
        spin_unlock(&ns->ns_lock);
        EXIT;
}
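
/*
 * Note on the pattern above (illustrative): "touching" a lock is a
 * remove followed by a re-add, both performed under a single ns_lock
 * hold, so ns_nr_unused stays consistent and no other thread can ever
 * observe the lock temporarily off the LRU. Calling the locked variants
 * back to back would open exactly that window:
 *
 *	ldlm_lock_remove_from_lru(lock);	(ns_lock dropped here)
 *	ldlm_lock_add_to_lru(lock);		(reacquired here)
 *
 * which is why the _nolock variants are composed instead.
 */
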

/**
 * Helper to destroy a locked lock.
 *
 * Used by ldlm_lock_destroy and ldlm_lock_destroy_nolock.
 * Must be called with l_lock and lr_lock held.
 *
 * Does not actually free the lock data, but rather marks the lock as
 * destroyed by setting the l_destroyed field in the lock to 1.  Destroys
 * the handle->lock association too, so that the lock can no longer be
 * found, and removes the lock from the LRU list.  Actual lock freeing
 * occurs when the last lock reference goes away.
 *
 * Original comment (of some historical value):
 * This used to have a 'strict' flag, which recovery would use to mark an
 * in-use lock as needing-to-die.  Lest I am ever tempted to put it back, I
 * shall explain why it's gone: with the new hash table scheme, once you call
 * ldlm_lock_destroy, you can never drop your final references on this lock.
 * Because it's not in the hash table anymore.  -phil
 */
int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
{
        ENTRY;

        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                LBUG();
        }

        if (!list_empty(&lock->l_res_link)) {
                LDLM_ERROR(lock, "lock still on resource");
                LBUG();
        }

        if (ldlm_is_destroyed(lock)) {
                LASSERT(list_empty(&lock->l_lru));
                EXIT;
                return 0;
        }
        ldlm_set_destroyed(lock);

        if (lock->l_export && lock->l_export->exp_lock_hash) {
                /* NB: it's safe to call cfs_hash_del() even if lock isn't
                 * in exp_lock_hash. */
                /* In the function below, .hs_keycmp resolves to
                 * ldlm_export_lock_keycmp() */
                /* coverity[overrun-buffer-val] */
                cfs_hash_del(lock->l_export->exp_lock_hash,
                             &lock->l_remote_handle, &lock->l_exp_hash);
        }

        ldlm_lock_remove_from_lru(lock);
        class_handle_unhash(&lock->l_handle);

#if 0
        /* Wake anyone waiting for this lock */
        /* FIXME: I should probably add yet another flag, instead of using
         * l_export to only call this on clients */
        if (lock->l_export)
                class_export_put(lock->l_export);
        lock->l_export = NULL;
        if (lock->l_export && lock->l_completion_ast)
                lock->l_completion_ast(lock, 0);
#endif
        EXIT;
        return 1;
}

/**
 * Destroys a LDLM lock \a lock. Performs necessary locking first.
 */
void ldlm_lock_destroy(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        lock_res_and_lock(lock);
        first = ldlm_lock_destroy_internal(lock);
        unlock_res_and_lock(lock);

        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/**
 * Destroys a LDLM lock \a lock that is already locked.
 */
void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        first = ldlm_lock_destroy_internal(lock);
        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}
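
/*
 * Usage sketch (illustrative): which destroy variant to call depends on
 * whether the caller already holds l_lock/lr_lock:
 *
 *	lock_res_and_lock(lock);
 *	ldlm_lock_destroy_nolock(lock);
 *	unlock_res_and_lock(lock);
 *
 * versus ldlm_lock_destroy(lock), which takes and drops both locks
 * itself and therefore must not be called with them held.
 */
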

/* this is called by portals_handle2object with the handle lock taken */
static void lock_handle_addref(void *lock)
{
        LDLM_LOCK_GET((struct ldlm_lock *)lock);
}

static void lock_handle_free(void *lock, int size)
{
        LASSERT(size == sizeof(struct ldlm_lock));
        OBD_SLAB_FREE(lock, ldlm_lock_slab, size);
}

struct portals_handle_ops lock_handle_ops = {
        .hop_addref = lock_handle_addref,
        .hop_free   = lock_handle_free,
};

/**
 * Allocate and initialize a new lock structure.
 *
 * usage: pass in a resource on which you have done ldlm_resource_get;
 *        the new lock will take over the refcount.
 * returns: lock with refcount 2 - one for the current caller and one for
 *          the remote side.
 */
static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
{
        struct ldlm_lock *lock;
        ENTRY;

        if (resource == NULL)
                LBUG();

        OBD_SLAB_ALLOC_PTR_GFP(lock, ldlm_lock_slab, GFP_NOFS);
        if (lock == NULL)
                RETURN(NULL);

        spin_lock_init(&lock->l_lock);
        lock->l_resource = resource;
        lu_ref_add(&resource->lr_reference, "lock", lock);

        atomic_set(&lock->l_refc, 2);
        INIT_LIST_HEAD(&lock->l_res_link);
        INIT_LIST_HEAD(&lock->l_lru);
        INIT_LIST_HEAD(&lock->l_pending_chain);
        INIT_LIST_HEAD(&lock->l_bl_ast);
        INIT_LIST_HEAD(&lock->l_cp_ast);
        INIT_LIST_HEAD(&lock->l_rk_ast);
        init_waitqueue_head(&lock->l_waitq);
        lock->l_blocking_lock = NULL;
        INIT_LIST_HEAD(&lock->l_sl_mode);
        INIT_LIST_HEAD(&lock->l_sl_policy);
        INIT_HLIST_NODE(&lock->l_exp_hash);
        INIT_HLIST_NODE(&lock->l_exp_flock_hash);

        lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
                             LDLM_NSS_LOCKS);
        INIT_LIST_HEAD(&lock->l_handle.h_link);
        class_handle_hash(&lock->l_handle, &lock_handle_ops);

        lu_ref_init(&lock->l_reference);
        lu_ref_add(&lock->l_reference, "hash", lock);
        lock->l_callback_timeout = 0;

#if LUSTRE_TRACKS_LOCK_EXP_REFS
        INIT_LIST_HEAD(&lock->l_exp_refs_link);
        lock->l_exp_refs_nr = 0;
        lock->l_exp_refs_target = NULL;
#endif
        INIT_LIST_HEAD(&lock->l_exp_list);

        RETURN(lock);
}
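
/*
 * Creation sketch (illustrative; error handling elided and the exact
 * caller shape is an assumption): the caller passes in a referenced
 * resource, which the new lock takes over, and receives a lock with
 * refcount 2 as documented above:
 *
 *	res = ldlm_resource_get(ns, NULL, res_id, type, 1);
 *	lock = ldlm_lock_new(res);
 *
 * One reference belongs to the caller, one to the handle/remote side;
 * the caller's reference is eventually dropped with LDLM_LOCK_RELEASE().
 */
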

/**
 * Moves LDLM lock \a lock to another resource.
 * This is used on the client when the server returns some other lock than
 * requested (typically as a result of an intent operation).
 */
int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                              const struct ldlm_res_id *new_resid)
{
        struct ldlm_resource *oldres = lock->l_resource;
        struct ldlm_resource *newres;
        int type;
        ENTRY;

        LASSERT(ns_is_client(ns));

        lock_res_and_lock(lock);
        if (memcmp(new_resid, &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) == 0) {
                /* Nothing to do */
                unlock_res_and_lock(lock);
                RETURN(0);
        }

        LASSERT(new_resid->name[0] != 0);

        /* This function assumes that the lock isn't on any lists */
        LASSERT(list_empty(&lock->l_res_link));

        type = oldres->lr_type;
        unlock_res_and_lock(lock);

        newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
        if (IS_ERR(newres))
                RETURN(PTR_ERR(newres));

        lu_ref_add(&newres->lr_reference, "lock", lock);
        /*
         * To flip the lock from the old to the new resource, lock, oldres and
         * newres have to be locked. Resource spin-locks are nested within
         * lock->l_lock, and are taken in the memory address order to avoid
         * dead-locks.
         */
        spin_lock(&lock->l_lock);
        oldres = lock->l_resource;
        if (oldres < newres) {
                lock_res(oldres);
                lock_res_nested(newres, LRT_NEW);
        } else {
                lock_res(newres);
                lock_res_nested(oldres, LRT_NEW);
        }
        LASSERT(memcmp(new_resid, &oldres->lr_name,
                       sizeof oldres->lr_name) != 0);
        lock->l_resource = newres;
        unlock_res(oldres);
        unlock_res_and_lock(lock);

        /* ...and the flowers are still standing! */
        lu_ref_del(&oldres->lr_reference, "lock", lock);
        ldlm_resource_putref(oldres);

        RETURN(0);
}
EXPORT_SYMBOL(ldlm_lock_change_resource);
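
/*
 * The address-ordered double-lock idiom used above is a general
 * deadlock-avoidance technique: whenever two spinlocks of the same
 * class must be held at once, every path takes them in one global
 * order (here, memory address order). A minimal sketch of the idea:
 *
 *	if (a < b) {
 *		lock_res(a);
 *		lock_res_nested(b, LRT_NEW);
 *	} else {
 *		lock_res(b);
 *		lock_res_nested(a, LRT_NEW);
 *	}
 *
 * Two threads flipping locks between the same pair of resources can
 * then never hold one lock each while waiting for the other.
 */
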

/** \defgroup ldlm_handles LDLM HANDLES
 * Ways to get hold of locks without any addresses.
 * @{
 */

/**
 * Fills in the handle for LDLM lock \a lock into the supplied \a lockh.
 * Does not take any references.
 */
void ldlm_lock2handle(const struct ldlm_lock *lock, struct lustre_handle *lockh)
{
        lockh->cookie = lock->l_handle.h_cookie;
}
EXPORT_SYMBOL(ldlm_lock2handle);

/**
 * Obtain a lock reference by handle.
 *
 * if \a flags is nonzero: atomically get the lock and set the flags.
 *                         Returns NULL if any of the flags is already set.
 */
struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
                                     __u64 flags)
{
        struct ldlm_lock *lock;
        ENTRY;

        LASSERT(handle);

        lock = class_handle2object(handle->cookie, NULL);
        if (lock == NULL)
                RETURN(NULL);

        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if ((flags == 0) && !ldlm_is_destroyed(lock)) {
                lu_ref_add(&lock->l_reference, "handle", current);
                RETURN(lock);
        }

        lock_res_and_lock(lock);

        LASSERT(lock->l_resource != NULL);

        lu_ref_add_atomic(&lock->l_reference, "handle", current);
        if (unlikely(ldlm_is_destroyed(lock))) {
                unlock_res_and_lock(lock);
                CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
                LDLM_LOCK_PUT(lock);
                RETURN(NULL);
        }

        /* If we're setting flags, make sure none of them are already set. */
        if (flags != 0) {
                if ((lock->l_flags & flags) != 0) {
                        unlock_res_and_lock(lock);
                        LDLM_LOCK_PUT(lock);
                        RETURN(NULL);
                }

                lock->l_flags |= flags;
        }

        unlock_res_and_lock(lock);
        RETURN(lock);
}
EXPORT_SYMBOL(__ldlm_handle2lock);
/** @} ldlm_handles */
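
/*
 * Handle round-trip sketch (illustrative): handles let code refer to a
 * lock identity without pinning the lock in memory:
 *
 *	struct lustre_handle lockh;
 *
 *	ldlm_lock2handle(lock, &lockh);
 *	...
 *	lock = ldlm_handle2lock(&lockh);
 *	if (lock != NULL) {
 *		... the lock is still alive; use it ...
 *		LDLM_LOCK_PUT(lock);
 *	}
 *
 * A NULL return means the lock was destroyed in the meantime.
 */
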

/**
 * Fill in the "on the wire" representation for given LDLM lock into the
 * supplied lock descriptor \a desc structure.
 */
void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
{
        ldlm_res2desc(lock->l_resource, &desc->l_resource);
        desc->l_req_mode = lock->l_req_mode;
        desc->l_granted_mode = lock->l_granted_mode;
        ldlm_convert_policy_to_wire(lock->l_resource->lr_type,
                                    &lock->l_policy_data,
                                    &desc->l_policy_data);
}
EXPORT_SYMBOL(ldlm_lock2desc);

/**
 * Add a lock to the list of conflicting locks to send AST to.
 *
 * Only add if we have not sent a blocking AST to the lock yet.
 */
void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                           struct list_head *work_list)
{
        if (!ldlm_is_ast_sent(lock)) {
                LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
                ldlm_set_ast_sent(lock);
                /* If the enqueuing client said so, tell the AST recipient to
                 * discard dirty data, rather than writing back. */
                if (ldlm_is_ast_discard_data(new))
                        ldlm_set_discard_data(lock);
                LASSERT(list_empty(&lock->l_bl_ast));
                list_add(&lock->l_bl_ast, work_list);
                LDLM_LOCK_GET(lock);
                LASSERT(lock->l_blocking_lock == NULL);
                lock->l_blocking_lock = LDLM_LOCK_GET(new);
        }
}

/**
 * Add a lock to the list of just granted locks to send completion AST to.
 */
void ldlm_add_cp_work_item(struct ldlm_lock *lock, struct list_head *work_list)
{
        if (!ldlm_is_cp_reqd(lock)) {
                ldlm_set_cp_reqd(lock);
                LDLM_DEBUG(lock, "lock granted; sending completion AST.");
                LASSERT(list_empty(&lock->l_cp_ast));
                list_add(&lock->l_cp_ast, work_list);
                LDLM_LOCK_GET(lock);
        }
}

/**
 * Aggregator function to add AST work items into a list. Determines
 * what sort of AST work needs to be done and calls the proper
 * adding function.
 * Must be called with lr_lock held.
 */
void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                            struct list_head *work_list)
{
        ENTRY;
        check_res_locked(lock->l_resource);
        if (new)
                ldlm_add_bl_work_item(lock, new, work_list);
        else
                ldlm_add_cp_work_item(lock, work_list);
        EXIT;
}
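
/*
 * Caller-shape sketch (illustrative; the compatibility predicate below
 * is hypothetical): queue-processing policies walk conflicting granted
 * locks under lr_lock and collect AST work items for later sending:
 *
 *	list_for_each_entry(lck, &res->lr_granted, l_res_link) {
 *		if (!modes_compatible(lck, req))
 *			ldlm_add_ast_work_item(lck, req, work_list);
 *	}
 *
 * Passing new == NULL instead queues a completion AST for \a lock.
 */
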
/**
 * Add the specified reader/writer reference to LDLM lock with handle
 * \a lockh. The r/w reference type is determined by \a mode.
 * Calls ldlm_lock_addref_internal.
 */
void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;

        lock = ldlm_handle2lock(lockh);
        LASSERT(lock != NULL);
        ldlm_lock_addref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_addref);

/**
 * Helper function.
 * Add the specified reader/writer reference to LDLM lock \a lock.
 * The r/w reference type is determined by \a mode.
 * Removes the lock from LRU if it is there.
 * Assumes the LDLM lock is already locked.
 */
void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        ldlm_lock_remove_from_lru(lock);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                lock->l_readers++;
                lu_ref_add_atomic(&lock->l_reference, "reader", lock);
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                lock->l_writers++;
                lu_ref_add_atomic(&lock->l_reference, "writer", lock);
        }
        LDLM_LOCK_GET(lock);
        lu_ref_add_atomic(&lock->l_reference, "user", lock);
        LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}

/**
 * Attempts to add a reader/writer reference to a lock with handle \a lockh,
 * and fails if the lock is already LDLM_FL_CBPENDING or destroyed.
 *
 * \retval 0 success, lock was addref-ed
 *
 * \retval -EAGAIN lock is being canceled.
 */
int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;
        int               result;

        result = -EAGAIN;
        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                if (lock->l_readers != 0 || lock->l_writers != 0 ||
                    !ldlm_is_cbpending(lock)) {
                        ldlm_lock_addref_internal_nolock(lock, mode);
                        result = 0;
                }
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return result;
}
EXPORT_SYMBOL(ldlm_lock_addref_try);
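
/*
 * Usage sketch (illustrative): addref_try is the safe way to reuse a
 * cached lock that may be under cancellation:
 *
 *	if (ldlm_lock_addref_try(&lockh, LCK_PR) == 0) {
 *		... lock is pinned and not being cancelled ...
 *		ldlm_lock_decref(&lockh, LCK_PR);
 *	} else {
 *		... got -EAGAIN: enqueue a fresh lock instead ...
 *	}
 */
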

/**
 * Add the specified reader/writer reference to LDLM lock \a lock.
 * Locks the LDLM lock and calls ldlm_lock_addref_internal_nolock to do
 * the work. Only called for local locks.
 */
void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
{
        lock_res_and_lock(lock);
        ldlm_lock_addref_internal_nolock(lock, mode);
        unlock_res_and_lock(lock);
}

/**
 * Removes a reader/writer reference for LDLM lock \a lock.
 * Assumes the LDLM lock is already locked.
 * Only called in ldlm_flock_destroy and for local locks.
 * Does NOT add the lock to LRU if no r/w references are left, to accommodate
 * flock locks that cannot be placed in the LRU.
 */
void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                LASSERT(lock->l_readers > 0);
                lu_ref_del(&lock->l_reference, "reader", lock);
                lock->l_readers--;
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                LASSERT(lock->l_writers > 0);
                lu_ref_del(&lock->l_reference, "writer", lock);
                lock->l_writers--;
        }

        lu_ref_del(&lock->l_reference, "user", lock);
        LDLM_LOCK_RELEASE(lock);    /* matches the LDLM_LOCK_GET() in addref */
}

/**
 * Removes a reader/writer reference for LDLM lock \a lock.
 * Locks the LDLM lock first.
 * If the lock is determined to be a client lock on a client and the r/w
 * refcount drops to zero and the lock is not blocked, the lock is added to
 * the LRU of the namespace.
 * For blocked LDLM locks, if the r/w count drops to zero, the blocking_ast
 * is called.
 */
void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
{
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        ns = ldlm_lock_to_ns(lock);

        ldlm_lock_decref_internal_nolock(lock, mode);

        if (ldlm_is_local(lock) &&
            !lock->l_readers && !lock->l_writers) {
                /* If this is a local lock on a server namespace and this was
                 * the last reference, cancel the lock. */
                CDEBUG(D_INFO, "forcing cancel of local lock\n");
                ldlm_set_cbpending(lock);
        }

        if (!lock->l_readers && !lock->l_writers &&
            ldlm_is_cbpending(lock)) {
                /* If we received a blocking AST and this was the last
                 * reference, run the callback. */
                if (ldlm_is_ns_srv(lock) && lock->l_export)
                        CERROR("FL_CBPENDING set on non-local lock--just a "
                               "warning\n");

                LDLM_DEBUG(lock, "final decref done on cbpending lock");

                LDLM_LOCK_GET(lock); /* dropped by bl thread */
                ldlm_lock_remove_from_lru(lock);
                unlock_res_and_lock(lock);

                if (ldlm_is_fail_loc(lock))
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                if (ldlm_is_atomic_cb(lock) ||
                    ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
                        ldlm_handle_bl_callback(ns, NULL, lock);
        } else if (ns_is_client(ns) &&
                   !lock->l_readers && !lock->l_writers &&
                   !ldlm_is_no_lru(lock) &&
                   !ldlm_is_bl_ast(lock)) {

                LDLM_DEBUG(lock, "add lock into lru list");

                /* If this is a client-side namespace and this was the last
                 * reference, put it on the LRU. */
                ldlm_lock_add_to_lru(lock);
                unlock_res_and_lock(lock);

                if (ldlm_is_fail_loc(lock))
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
                 * are not supported by the server, otherwise, it is done on
                 * enqueue. */
                if (!exp_connect_cancelset(lock->l_conn_export) &&
                    !ns_connect_lru_resize(ns))
                        ldlm_cancel_lru(ns, 0, LCF_ASYNC, 0);
        } else {
                LDLM_DEBUG(lock, "do not add lock into lru list");
                unlock_res_and_lock(lock);
        }

        EXIT;
}

/**
 * Decrease the reader/writer refcount for LDLM lock with handle \a lockh.
 */
void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_decref);

/**
 * Decrease the reader/writer refcount for LDLM lock with handle \a lockh
 * and mark it for subsequent cancellation once the r/w refcount drops to
 * zero, instead of putting it into the LRU.
 *
 * Typical usage is for GROUP locks which we cannot allow to be cached.
 */
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        ENTRY;

        LASSERT(lock != NULL);

        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        lock_res_and_lock(lock);
        ldlm_set_cbpending(lock);
        unlock_res_and_lock(lock);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
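
/*
 * Usage sketch (illustrative): GROUP locks must not linger in the LRU
 * cache, so the last user cancels on the final decref:
 *
 *	ldlm_lock_decref_and_cancel(&lockh, LCK_GROUP);
 *
 * as opposed to ldlm_lock_decref(&lockh, mode), which would park a
 * no-longer-referenced client lock on the namespace LRU.
 */
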

struct sl_insert_point {
        struct list_head *res_link;
        struct list_head *mode_link;
        struct list_head *policy_link;
};

/**
 * Finds a position to insert the new lock into the granted lock list.
 *
 * Used for locks eligible for skiplist optimization.
 *
 * Parameters:
 *      queue [input]:  the granted list where the search acts on;
 *      req [input]:    the lock whose position is to be located;
 *      prev [output]:  positions within 3 lists to insert \a req to
 * Return Value:
 *      filled \a prev
 * NOTE: called by
 *  - ldlm_grant_lock_with_skiplist
 */
static void search_granted_lock(struct list_head *queue,
                                struct ldlm_lock *req,
                                struct sl_insert_point *prev)
{
        struct list_head *tmp;
        struct ldlm_lock *lock, *mode_end, *policy_end;
        ENTRY;

        list_for_each(tmp, queue) {
                lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                mode_end = list_entry(lock->l_sl_mode.prev,
                                          struct ldlm_lock, l_sl_mode);

                if (lock->l_req_mode != req->l_req_mode) {
                        /* jump to last lock of mode group */
                        tmp = &mode_end->l_res_link;
                        continue;
                }

                /* suitable mode group is found */
                if (lock->l_resource->lr_type == LDLM_PLAIN) {
                        /* insert point is last lock of the mode group */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else if (lock->l_resource->lr_type == LDLM_IBITS) {
                        for (;;) {
                                policy_end =
                                        list_entry(lock->l_sl_policy.prev,
                                                       struct ldlm_lock,
                                                       l_sl_policy);

                                if (lock->l_policy_data.l_inodebits.bits ==
                                    req->l_policy_data.l_inodebits.bits) {
                                        /* insert point is last lock of
                                         * the policy group */
                                        prev->res_link =
                                                &policy_end->l_res_link;
                                        prev->mode_link =
                                                &policy_end->l_sl_mode;
                                        prev->policy_link =
                                                &policy_end->l_sl_policy;
                                        EXIT;
                                        return;
                                }

                                if (policy_end == mode_end)
                                        /* done with mode group */
                                        break;

                                /* go to next policy group within mode group */
                                tmp = policy_end->l_res_link.next;
                                lock = list_entry(tmp, struct ldlm_lock,
                                                      l_res_link);
                        }  /* loop over policy groups within the mode group */

                        /* insert point is last lock of the mode group,
                         * new policy group is started */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else {
                        LDLM_ERROR(lock, "is not LDLM_PLAIN or LDLM_IBITS lock");
                        LBUG();
                }
        }

        /* insert point is last lock on the queue,
         * new mode group and new policy group are started */
        prev->res_link = queue->prev;
        prev->mode_link = &req->l_sl_mode;
        prev->policy_link = &req->l_sl_policy;
        EXIT;
        return;
}
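
/*
 * Illustration (layout inferred from the algorithm above, roughly): the
 * granted list is kept sorted into mode groups, and IBITS mode groups
 * are further split into policy groups. The l_sl_mode and l_sl_policy
 * links connect the first and last lock of each group, letting the
 * search above hop over whole groups instead of walking every lock:
 *
 *	l_res_link:   PR PR PR | PW PW | EX
 *	l_sl_mode:    ^------^   ^--^    ^     (skip within each mode group)
 */
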

/**
 * Add a lock into the resource granted list after a position described by
 * \a prev.
 */
static void ldlm_granted_list_add_lock(struct ldlm_lock *lock,
                                       struct sl_insert_point *prev)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        ldlm_resource_dump(D_INFO, res);
        LDLM_DEBUG(lock, "About to add lock:");

        if (ldlm_is_destroyed(lock)) {
                CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
                return;
        }

        LASSERT(list_empty(&lock->l_res_link));
        LASSERT(list_empty(&lock->l_sl_mode));
        LASSERT(list_empty(&lock->l_sl_policy));

        /*
         * lock->link == prev->link means lock is the first lock of the group.
         * Don't re-add it to itself, to suppress kernel warnings.
         */
        if (&lock->l_res_link != prev->res_link)
                list_add(&lock->l_res_link, prev->res_link);
        if (&lock->l_sl_mode != prev->mode_link)
                list_add(&lock->l_sl_mode, prev->mode_link);
        if (&lock->l_sl_policy != prev->policy_link)
                list_add(&lock->l_sl_policy, prev->policy_link);

        EXIT;
}

/**
 * Add a lock to the granted list on a resource, maintaining skiplist
 * correctness.
 */
static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
{
        struct sl_insert_point prev;
        ENTRY;

        LASSERT(lock->l_req_mode == lock->l_granted_mode);

        search_granted_lock(&lock->l_resource->lr_granted, lock, &prev);
        ldlm_granted_list_add_lock(lock, &prev);
        EXIT;
}

/**
 * Perform lock granting bookkeeping.
 *
 * Includes putting the lock onto the granted list and updating the lock
 * mode.
 * NOTE: called by
 *  - ldlm_lock_enqueue
 *  - ldlm_reprocess_queue
 *  - ldlm_lock_convert
 *
 * must be called with lr_lock held
 */
void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        lock->l_granted_mode = lock->l_req_mode;

        if (work_list && lock->l_completion_ast != NULL)
                ldlm_add_ast_work_item(lock, NULL, work_list);

        /* We should not add locks to the granted list in the following cases:
         * - this is an UNLOCK but not a real lock;
         * - this is a TEST lock;
         * - this is an F_CANCELLK lock (async flock has req_mode == 0);
         * - this is a deadlock (flock cannot be granted). */
        if (lock->l_req_mode == 0 ||
            lock->l_req_mode == LCK_NL ||
            ldlm_is_test_lock(lock) ||
            ldlm_is_flock_deadlock(lock))
                RETURN_EXIT;

        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
                ldlm_grant_lock_with_skiplist(lock);
        else if (res->lr_type == LDLM_EXTENT)
                ldlm_extent_add_lock(res, lock);
        else
                ldlm_resource_add_lock(res, &res->lr_granted, lock);

        if (lock->l_granted_mode < res->lr_most_restr)
                res->lr_most_restr = lock->l_granted_mode;

        ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
        EXIT;
}
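
/*
 * Caller-shape sketch (illustrative; ldlm_run_ast_work and
 * LDLM_WORK_CP_AST are assumed from elsewhere in the LDLM code):
 * granting happens under lr_lock, and the collected completion ASTs
 * are sent after it is dropped:
 *
 *	LIST_HEAD(rpc_list);
 *
 *	lock_res(res);
 *	ldlm_grant_lock(lock, &rpc_list);
 *	unlock_res(res);
 *	ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
 */
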

/**
 * Search for a lock with given properties in a queue.
 *
 * \retval a referenced lock or NULL.  See the flag descriptions below, in the
 * comment above ldlm_lock_match
 */
static struct ldlm_lock *search_queue(struct list_head *queue,
                                      ldlm_mode_t *mode,
                                      ldlm_policy_data_t *policy,
                                      struct ldlm_lock *old_lock,
                                      __u64 flags, int unref)
{
        struct ldlm_lock *lock;
        struct list_head       *tmp;

        list_for_each(tmp, queue) {
                ldlm_mode_t match;

                lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lock == old_lock)
                        break;

                /* Check if this lock can be matched.
                 * Used by LU-2919 (exclusive open) for open lease lock */
                if (ldlm_is_excl(lock))
                        continue;

                /* llite sometimes wants to match locks that will be
                 * canceled when their users drop, but we allow it to match
                 * if it passes in CBPENDING and the lock still has users.
                 * this is generally only going to be used by children
                 * whose parents already hold a lock so forward progress
                 * can still happen. */
                if (ldlm_is_cbpending(lock) &&
                    !(flags & LDLM_FL_CBPENDING))
                        continue;
                if (!unref && ldlm_is_cbpending(lock) &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        continue;

                if (!(lock->l_req_mode & *mode))
                        continue;
                match = lock->l_req_mode;

                if (lock->l_resource->lr_type == LDLM_EXTENT &&
                    (lock->l_policy_data.l_extent.start >
                     policy->l_extent.start ||
                     lock->l_policy_data.l_extent.end < policy->l_extent.end))
                        continue;

                if (unlikely(match == LCK_GROUP) &&
                    lock->l_resource->lr_type == LDLM_EXTENT &&
                    lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                        continue;

                /* We match if we have an existing lock with the same or a
                 * wider set of bits. */
                if (lock->l_resource->lr_type == LDLM_IBITS &&
                     ((lock->l_policy_data.l_inodebits.bits &
                      policy->l_inodebits.bits) !=
                      policy->l_inodebits.bits))
                        continue;

                if (!unref && LDLM_HAVE_MASK(lock, GONE))
                        continue;

                if ((flags & LDLM_FL_LOCAL_ONLY) &&
                    !ldlm_is_local(lock))
                        continue;

                if (flags & LDLM_FL_TEST_LOCK) {
                        LDLM_LOCK_GET(lock);
                        ldlm_lock_touch_in_lru(lock);
                } else {
                        ldlm_lock_addref_internal_nolock(lock, match);
                }
                *mode = match;
                return lock;
        }

        return NULL;
}

void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
{
        if ((lock->l_flags & LDLM_FL_FAIL_NOTIFIED) == 0) {
                lock->l_flags |= LDLM_FL_FAIL_NOTIFIED;
                wake_up_all(&lock->l_waitq);
        }
}
EXPORT_SYMBOL(ldlm_lock_fail_match_locked);

void ldlm_lock_fail_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_fail_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_fail_match);

/**
 * Mark lock as "matchable" by OST.
 *
 * Used to prevent certain races in LOV/OSC where the lock is granted, but
 * the LVB is not yet valid.
 * Assumes the LDLM lock is already locked.
 */
void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
{
        ldlm_set_lvb_ready(lock);
        wake_up_all(&lock->l_waitq);
}
EXPORT_SYMBOL(ldlm_lock_allow_match_locked);

/**
 * Mark lock as "matchable" by OST.
 * Locks the lock and then \see ldlm_lock_allow_match_locked
 */
void ldlm_lock_allow_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_allow_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_allow_match);

/**
 * Attempt to find a lock with specified properties.
 *
 * Typically returns a reference to the matched lock unless
 * LDLM_FL_TEST_LOCK is set in \a flags.
 *
 * Can be called in two ways:
 *
 * If 'ns' is NULL, then lockh describes an existing lock that we want to look
 * for a duplicate of.
 *
 * Otherwise, all of the fields must be filled in, to match against.
 *
 * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
 *     server (ie, connh is NULL)
 * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
 *     list will be considered
 * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
 *     to be canceled can still be matched as long as they still have reader
 *     or writer references
 * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
 *     just tell us if we would have matched.
 *
 * \retval 1 if it finds an already-existing lock that is compatible; in this
 * case, lockh is filled in with an addref()ed lock
 *
 * We also check the security context, and if that fails we simply return 0
 * (to keep caller code unchanged); the context failure will be discovered
 * by the caller sometime later.
 */
ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, __u64 flags,
                            const struct ldlm_res_id *res_id, ldlm_type_t type,
                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
                            struct lustre_handle *lockh, int unref)
{
        struct ldlm_resource *res;
        struct ldlm_lock *lock, *old_lock = NULL;
        int rc = 0;
        ENTRY;

        if (ns == NULL) {
                old_lock = ldlm_handle2lock(lockh);
                LASSERT(old_lock);

                ns = ldlm_lock_to_ns(old_lock);
                res_id = &old_lock->l_resource->lr_name;
                type = old_lock->l_resource->lr_type;
                mode = old_lock->l_req_mode;
        }

        res = ldlm_resource_get(ns, NULL, res_id, type, 0);
        if (IS_ERR(res)) {
                LASSERT(old_lock == NULL);
                RETURN(0);
        }

        LDLM_RESOURCE_ADDREF(res);
        lock_res(res);

        lock = search_queue(&res->lr_granted, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        if (flags & LDLM_FL_BLOCK_GRANTED)
                GOTO(out, rc = 0);
        lock = search_queue(&res->lr_converting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        lock = search_queue(&res->lr_waiting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);

        EXIT;
 out:
        unlock_res(res);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);

        if (lock) {
                ldlm_lock2handle(lock, lockh);
                if ((flags & LDLM_FL_LVB_READY) &&
                    (!ldlm_is_lvb_ready(lock))) {
                        __u64 wait_flags = LDLM_FL_LVB_READY |
                                LDLM_FL_DESTROYED | LDLM_FL_FAIL_NOTIFIED;
                        struct l_wait_info lwi;
                        if (lock->l_completion_ast) {
                                int err = lock->l_completion_ast(lock,
                                                          LDLM_FL_WAIT_NOREPROC,
                                                                 NULL);
                                if (err) {
                                        if (flags & LDLM_FL_TEST_LOCK)
                                                LDLM_LOCK_RELEASE(lock);
                                        else
                                                ldlm_lock_decref_internal(lock,
                                                                          mode);
                                        rc = 0;
                                        goto out2;
                                }
                        }

                        lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
                                               NULL, LWI_ON_SIGNAL_NOOP, NULL);

                        /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
                        l_wait_event(lock->l_waitq,
                                     lock->l_flags & wait_flags,
                                     &lwi);
                        if (!ldlm_is_lvb_ready(lock)) {
                                if (flags & LDLM_FL_TEST_LOCK)
                                        LDLM_LOCK_RELEASE(lock);
                                else
                                        ldlm_lock_decref_internal(lock, mode);
                                rc = 0;
                        }
                }
        }
 out2:
        if (rc) {
                LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[2] : policy->l_extent.start,
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[3] : policy->l_extent.end);

                /* check user's security context */
                if (lock->l_conn_export &&
                    sptlrpc_import_check_ctx(
                                class_exp2cliimp(lock->l_conn_export))) {
                        if (!(flags & LDLM_FL_TEST_LOCK))
                                ldlm_lock_decref_internal(lock, mode);
                        rc = 0;
                }

                if (flags & LDLM_FL_TEST_LOCK)
                        LDLM_LOCK_RELEASE(lock);

        } else if (!(flags & LDLM_FL_TEST_LOCK)) { /* less verbose for test-only */
                LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
                                  LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
                                  type, mode, res_id->name[0], res_id->name[1],
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[2] : policy->l_extent.start,
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[3] : policy->l_extent.end);
        }
        if (old_lock)
                LDLM_LOCK_PUT(old_lock);

        return rc ? mode : 0;
}
EXPORT_SYMBOL(ldlm_lock_match);
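
/*
 * Usage sketch (illustrative; ns, res_id and lockh are assumed to be
 * declared by the caller): a client checks its cache before enqueueing,
 * and a successful match fills lockh with a referenced lock:
 *
 *	ldlm_policy_data_t policy = {
 *		.l_extent = { .start = 0, .end = OBD_OBJECT_EOF },
 *	};
 *	ldlm_mode_t mode;
 *
 *	mode = ldlm_lock_match(ns, LDLM_FL_LVB_READY, &res_id, LDLM_EXTENT,
 *			       &policy, LCK_PR | LCK_PW, &lockh, 0);
 *	if (mode != 0) {
 *		... use the matched lock ...
 *		ldlm_lock_decref(&lockh, mode);
 *	}
 */
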
1413
1414 ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh,
1415                                         __u64 *bits)
1416 {
1417         struct ldlm_lock *lock;
1418         ldlm_mode_t mode = 0;
1419         ENTRY;
1420
1421         lock = ldlm_handle2lock(lockh);
1422         if (lock != NULL) {
1423                 lock_res_and_lock(lock);
1424                 if (LDLM_HAVE_MASK(lock, GONE))
1425                         GOTO(out, mode);
1426
1427                 if (ldlm_is_cbpending(lock) &&
1428                     lock->l_readers == 0 && lock->l_writers == 0)
1429                         GOTO(out, mode);
1430
1431                 if (bits)
1432                         *bits = lock->l_policy_data.l_inodebits.bits;
1433                 mode = lock->l_granted_mode;
1434                 ldlm_lock_addref_internal_nolock(lock, mode);
1435         }
1436
1437         EXIT;
1438
1439 out:
1440         if (lock != NULL) {
1441                 unlock_res_and_lock(lock);
1442                 LDLM_LOCK_PUT(lock);
1443         }
1444         return mode;
1445 }
1446 EXPORT_SYMBOL(ldlm_revalidate_lock_handle);
1447
1448 /** The caller must guarantee that the buffer is large enough. */
1449 int ldlm_fill_lvb(struct ldlm_lock *lock, struct req_capsule *pill,
1450                   enum req_location loc, void *data, int size)
1451 {
1452         void *lvb;
1453         ENTRY;
1454
1455         LASSERT(data != NULL);
1456         LASSERT(size >= 0);
1457
1458         switch (lock->l_lvb_type) {
1459         case LVB_T_OST:
1460                 if (size == sizeof(struct ost_lvb)) {
1461                         if (loc == RCL_CLIENT)
1462                                 lvb = req_capsule_client_swab_get(pill,
1463                                                 &RMF_DLM_LVB,
1464                                                 lustre_swab_ost_lvb);
1465                         else
1466                                 lvb = req_capsule_server_swab_get(pill,
1467                                                 &RMF_DLM_LVB,
1468                                                 lustre_swab_ost_lvb);
1469                         if (unlikely(lvb == NULL)) {
1470                                 LDLM_ERROR(lock, "no LVB");
1471                                 RETURN(-EPROTO);
1472                         }
1473
1474                         memcpy(data, lvb, size);
1475                 } else if (size == sizeof(struct ost_lvb_v1)) {
1476                         struct ost_lvb *olvb = data;
1477
1478                         if (loc == RCL_CLIENT)
1479                                 lvb = req_capsule_client_swab_get(pill,
1480                                                 &RMF_DLM_LVB,
1481                                                 lustre_swab_ost_lvb_v1);
1482                         else
1483                                 lvb = req_capsule_server_sized_swab_get(pill,
1484                                                 &RMF_DLM_LVB, size,
1485                                                 lustre_swab_ost_lvb_v1);
1486                         if (unlikely(lvb == NULL)) {
1487                                 LDLM_ERROR(lock, "no LVB");
1488                                 RETURN(-EPROTO);
1489                         }
1490
1491                         memcpy(data, lvb, size);
1492                         olvb->lvb_mtime_ns = 0;
1493                         olvb->lvb_atime_ns = 0;
1494                         olvb->lvb_ctime_ns = 0;
1495                 } else {
1496                         LDLM_ERROR(lock, "Reply has unexpected OST LVB size %d",
1497                                    size);
1498                         RETURN(-EINVAL);
1499                 }
1500                 break;
1501         case LVB_T_LQUOTA:
1502                 if (size == sizeof(struct lquota_lvb)) {
1503                         if (loc == RCL_CLIENT)
1504                                 lvb = req_capsule_client_swab_get(pill,
1505                                                 &RMF_DLM_LVB,
1506                                                 lustre_swab_lquota_lvb);
1507                         else
1508                                 lvb = req_capsule_server_swab_get(pill,
1509                                                 &RMF_DLM_LVB,
1510                                                 lustre_swab_lquota_lvb);
1511                         if (unlikely(lvb == NULL)) {
1512                                 LDLM_ERROR(lock, "no LVB");
1513                                 RETURN(-EPROTO);
1514                         }
1515
1516                         memcpy(data, lvb, size);
1517                 } else {
1518                         LDLM_ERROR(lock, "Reply has unexpected lquota LVB size %d",
1519                                    size);
1520                         RETURN(-EINVAL);
1521                 }
1522                 break;
1523         case LVB_T_LAYOUT:
1524                 if (size == 0)
1525                         break;
1526
1527                 if (loc == RCL_CLIENT)
1528                         lvb = req_capsule_client_get(pill, &RMF_DLM_LVB);
1529                 else
1530                         lvb = req_capsule_server_get(pill, &RMF_DLM_LVB);
1531                 if (unlikely(lvb == NULL)) {
1532                         LDLM_ERROR(lock, "no LVB");
1533                         RETURN(-EPROTO);
1534                 }
1535
1536                 memcpy(data, lvb, size);
1537                 break;
1538         default:
1539         LDLM_ERROR(lock, "Unknown LVB type: %d", lock->l_lvb_type);
1540                 libcfs_debug_dumpstack(NULL);
1541                 RETURN(-EINVAL);
1542         }
1543
1544         RETURN(0);
1545 }
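
/*
 * Usage sketch (illustrative only, compiled out): unpack an OST LVB from a
 * reply into a caller-owned buffer.  Passing sizeof(struct ost_lvb) selects
 * the full variant; sizeof(struct ost_lvb_v1) selects the legacy layout with
 * the nanosecond fields zeroed as above.  The helper is hypothetical.
 */
#if 0
static int get_reply_lvb(struct ldlm_lock *lock, struct ptlrpc_request *req,
                         struct ost_lvb *lvb)
{
        return ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
                             lvb, sizeof(*lvb));
}
#endif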
1546
1547 /**
1548  * Create and fill in a new LDLM lock with the specified properties.
1549  * Returns a referenced lock.
1550  */
1551 struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
1552                                    const struct ldlm_res_id *res_id,
1553                                    ldlm_type_t type,
1554                                    ldlm_mode_t mode,
1555                                    const struct ldlm_callback_suite *cbs,
1556                                    void *data, __u32 lvb_len,
1557                                    enum lvb_type lvb_type)
1558 {
1559         struct ldlm_lock        *lock;
1560         struct ldlm_resource    *res;
1561         int                     rc;
1562         ENTRY;
1563
1564         res = ldlm_resource_get(ns, NULL, res_id, type, 1);
1565         if (IS_ERR(res))
1566                 RETURN(ERR_CAST(res));
1567
1568         lock = ldlm_lock_new(res);
1569         if (lock == NULL)
1570                 RETURN(ERR_PTR(-ENOMEM));
1571
1572         lock->l_req_mode = mode;
1573         lock->l_ast_data = data;
1574         lock->l_pid = current_pid();
1575         if (ns_is_server(ns))
1576                 ldlm_set_ns_srv(lock);
1577         if (cbs) {
1578                 lock->l_blocking_ast = cbs->lcs_blocking;
1579                 lock->l_completion_ast = cbs->lcs_completion;
1580                 lock->l_glimpse_ast = cbs->lcs_glimpse;
1581         }
1582
1583         lock->l_tree_node = NULL;
1584         /* if this is an extent lock, allocate the interval tree node */
1585         if (type == LDLM_EXTENT)
1586                 if (ldlm_interval_alloc(lock) == NULL)
1587                         GOTO(out, rc = -ENOMEM);
1588
1589         if (lvb_len) {
1590                 lock->l_lvb_len = lvb_len;
1591                 OBD_ALLOC_LARGE(lock->l_lvb_data, lvb_len);
1592                 if (lock->l_lvb_data == NULL)
1593                         GOTO(out, rc = -ENOMEM);
1594         }
1595
1596         lock->l_lvb_type = lvb_type;
1597         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK))
1598                 GOTO(out, rc = -ENOENT);
1599
1600         RETURN(lock);
1601
1602 out:
1603         ldlm_lock_destroy(lock);
1604         LDLM_LOCK_RELEASE(lock);
1605         RETURN(ERR_PTR(rc));
1606 }
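
/*
 * Usage sketch (illustrative only, compiled out): create a referenced IBITS
 * lock with a callback suite.  my_blocking_ast() and my_completion_ast() are
 * hypothetical callbacks, not defined in this file.
 */
#if 0
static struct ldlm_lock *create_ibits_lock(struct ldlm_namespace *ns,
                                           const struct ldlm_res_id *res_id)
{
        struct ldlm_callback_suite cbs = {
                .lcs_blocking   = my_blocking_ast,
                .lcs_completion = my_completion_ast,
                .lcs_glimpse    = NULL,
        };

        /* No LVB; on success the lock is returned holding a reference. */
        return ldlm_lock_create(ns, res_id, LDLM_IBITS, LCK_PR, &cbs,
                                NULL, 0, LVB_T_NONE);
}
#endif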
1607
1608 /**
1609  * Enqueue (request) a lock.
1610  *
1611  * Does not block. As a result of enqueue the lock will be placed on
1612  * the granted or waiting list.
1613  *
1614  * If the namespace has an intent policy set and the lock has the
1615  * LDLM_FL_HAS_INTENT flag set, skip all the enqueueing and delegate lock
1616  * processing to the intent policy function.
1617  */
1618 ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
1619                                struct ldlm_lock **lockp,
1620                                void *cookie, __u64 *flags)
1621 {
1622         struct ldlm_lock *lock = *lockp;
1623         struct ldlm_resource *res = lock->l_resource;
1624         int local = ns_is_client(ldlm_res_to_ns(res));
1625 #ifdef HAVE_SERVER_SUPPORT
1626         ldlm_processing_policy policy;
1627 #endif
1628         ldlm_error_t rc = ELDLM_OK;
1629         struct ldlm_interval *node = NULL;
1630         ENTRY;
1631
1632         lock->l_last_activity = cfs_time_current_sec();
1633         /* policies are not executed on the client or during replay */
1634         if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
1635             && !local && ns->ns_policy) {
1636                 rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
1637                                    NULL);
1638                 if (rc == ELDLM_LOCK_REPLACED) {
1639                         /* The lock that was returned has already been granted,
1640                          * and placed into lockp.  If it's not the same as the
1641                          * one we passed in, then destroy the old one and our
1642                          * work here is done. */
1643                         if (lock != *lockp) {
1644                                 ldlm_lock_destroy(lock);
1645                                 LDLM_LOCK_RELEASE(lock);
1646                         }
1647                         *flags |= LDLM_FL_LOCK_CHANGED;
1648                         RETURN(0);
1649                 } else if (rc != ELDLM_OK ||
1650                            (rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
1651                         ldlm_lock_destroy(lock);
1652                         RETURN(rc);
1653                 }
1654         }
1655
1656         if (*flags & LDLM_FL_RESENT)
1657                 RETURN(ELDLM_OK);
1658
1659         /* A replaying lock may already be in the granted list, so
1660          * unlinking it would free its interval node. Allocate the
1661          * interval node early; otherwise we cannot regrant this lock
1662          * in the future. - jay */
1663         if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
1664                 OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
1665
1666         lock_res_and_lock(lock);
1667         if (local && lock->l_req_mode == lock->l_granted_mode) {
1668                 /* The server returned a blocked lock, but it was granted
1669                  * before we got a chance to actually enqueue it.  We don't
1670                  * need to do anything else. */
1671                 *flags &= ~(LDLM_FL_BLOCK_GRANTED |
1672                             LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
1673                 GOTO(out, rc = ELDLM_OK);
1674         }
1675
1676         ldlm_resource_unlink_lock(lock);
1677         if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
1678                 if (node == NULL) {
1679                         ldlm_lock_destroy_nolock(lock);
1680                         GOTO(out, rc = -ENOMEM);
1681                 }
1682
1683                 INIT_LIST_HEAD(&node->li_group);
1684                 ldlm_interval_attach(node, lock);
1685                 node = NULL;
1686         }
1687
1688         /* Some flags from the enqueue want to make it into the AST, via the
1689          * lock's l_flags. */
1690         if (*flags & LDLM_FL_AST_DISCARD_DATA)
1691                 ldlm_set_ast_discard_data(lock);
1692         if (*flags & LDLM_FL_TEST_LOCK)
1693                 ldlm_set_test_lock(lock);
1694
1695         /* This distinction between local and server lock trees is very
1696          * important; a client namespace only has information about locks taken
1697          * by that client, and thus doesn't have enough information to decide
1698          * for itself whether a lock can be granted (below).  In this case, we
1699          * do exactly what the server tells us to do, as dictated by 'flags'.
1700          *
1701          * We do exactly the same thing during recovery, when the server is
1702          * more or less trusting the clients not to lie.
1703          *
1704          * FIXME (bug 268): Detect obvious lies by checking compatibility in
1705          * granted/converting queues. */
1706         if (local) {
1707                 if (*flags & LDLM_FL_BLOCK_CONV)
1708                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1709                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
1710                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1711                 else
1712                         ldlm_grant_lock(lock, NULL);
1713                 GOTO(out, rc = ELDLM_OK);
1714 #ifdef HAVE_SERVER_SUPPORT
1715         } else if (*flags & LDLM_FL_REPLAY) {
1716                 if (*flags & LDLM_FL_BLOCK_CONV) {
1717                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1718                         GOTO(out, rc = ELDLM_OK);
1719                 } else if (*flags & LDLM_FL_BLOCK_WAIT) {
1720                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1721                         GOTO(out, rc = ELDLM_OK);
1722                 } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
1723                         ldlm_grant_lock(lock, NULL);
1724                         GOTO(out, rc = ELDLM_OK);
1725                 }
1726                 /* If no flags, fall through to normal enqueue path. */
1727         }
1728
1729         policy = ldlm_processing_policy_table[res->lr_type];
1730         policy(lock, flags, 1, &rc, NULL);
1731         GOTO(out, rc);
1732 #else
1733         } else {
1734                 CERROR("This is client-side-only module, cannot handle "
1735                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
1736                 LBUG();
1737         }
1738 #endif
1739
1740 out:
1741         unlock_res_and_lock(lock);
1742         if (node)
1743                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
1744         return rc;
1745 }
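
/*
 * Usage sketch (illustrative only, compiled out): pairing ldlm_lock_create()
 * with ldlm_lock_enqueue().  On a client namespace the lock is granted
 * immediately unless one of the LDLM_FL_BLOCK_* flags came back from the
 * server.  The helper and its error mapping are hypothetical.
 */
#if 0
static int enqueue_created_lock(struct ldlm_namespace *ns,
                                struct ldlm_lock *lock)
{
        __u64 flags = 0;
        ldlm_error_t err;

        err = ldlm_lock_enqueue(ns, &lock, NULL /* cookie */, &flags);
        if (err != ELDLM_OK)
                return -EIO; /* hypothetical error mapping */

        if (flags & (LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV |
                     LDLM_FL_BLOCK_WAIT)) {
                /* ... wait for the completion AST before using the lock ... */
        }
        return 0;
}
#endif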
1746
1747 #ifdef HAVE_SERVER_SUPPORT
1748 /**
1749  * Iterate through all waiting locks on a given resource queue and attempt to
1750  * grant them.
1751  *
1752  * Must be called with resource lock held.
1753  */
1754 int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
1755                          struct list_head *work_list)
1756 {
1757         struct list_head *tmp, *pos;
1758         ldlm_processing_policy policy;
1759         __u64 flags;
1760         int rc = LDLM_ITER_CONTINUE;
1761         ldlm_error_t err;
1762         ENTRY;
1763
1764         check_res_locked(res);
1765
1766         policy = ldlm_processing_policy_table[res->lr_type];
1767         LASSERT(policy);
1768
1769         list_for_each_safe(tmp, pos, queue) {
1770                 struct ldlm_lock *pending;
1771                 pending = list_entry(tmp, struct ldlm_lock, l_res_link);
1772
1773                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
1774
1775                 flags = 0;
1776                 rc = policy(pending, &flags, 0, &err, work_list);
1777                 if (rc != LDLM_ITER_CONTINUE)
1778                         break;
1779         }
1780
1781         RETURN(rc);
1782 }
1783 #endif
1784
1785 /**
1786  * Process a call to blocking AST callback for a lock in ast_work list
1787  */
1788 static int
1789 ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1790 {
1791         struct ldlm_cb_set_arg *arg = opaq;
1792         struct ldlm_lock_desc   d;
1793         int                     rc;
1794         struct ldlm_lock       *lock;
1795         ENTRY;
1796
1797         if (list_empty(arg->list))
1798                 RETURN(-ENOENT);
1799
1800         lock = list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);
1801
1802         /* nobody should touch l_bl_ast */
1803         lock_res_and_lock(lock);
1804         list_del_init(&lock->l_bl_ast);
1805
1806         LASSERT(ldlm_is_ast_sent(lock));
1807         LASSERT(lock->l_bl_ast_run == 0);
1808         LASSERT(lock->l_blocking_lock);
1809         lock->l_bl_ast_run++;
1810         unlock_res_and_lock(lock);
1811
1812         ldlm_lock2desc(lock->l_blocking_lock, &d);
1813
1814         rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
1815         LDLM_LOCK_RELEASE(lock->l_blocking_lock);
1816         lock->l_blocking_lock = NULL;
1817         LDLM_LOCK_RELEASE(lock);
1818
1819         RETURN(rc);
1820 }
1821
1822 /**
1823  * Process a call to completion AST callback for a lock in ast_work list
1824  */
1825 static int
1826 ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1827 {
1828         struct ldlm_cb_set_arg  *arg = opaq;
1829         int                      rc = 0;
1830         struct ldlm_lock        *lock;
1831         ldlm_completion_callback completion_callback;
1832         ENTRY;
1833
1834         if (list_empty(arg->list))
1835                 RETURN(-ENOENT);
1836
1837         lock = list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
1838
1839         /* It's possible to receive a completion AST before we've set
1840          * the l_completion_ast pointer: either because the AST arrived
1841          * before the reply, or simply because there's a small race
1842          * window between receiving the reply and finishing the local
1843          * enqueue. (bug 842)
1844          *
1845          * This can't happen with the blocking_ast, however, because we
1846          * will never call the local blocking_ast until we drop our
1847          * reader/writer reference, which we won't do until we get the
1848          * reply and finish enqueueing. */
1849
1850         /* nobody should touch l_cp_ast */
1851         lock_res_and_lock(lock);
1852         list_del_init(&lock->l_cp_ast);
1853         LASSERT(ldlm_is_cp_reqd(lock));
1854         /* save l_completion_ast since it can be changed by
1855          * mds_intent_policy(), see bug 14225 */
1856         completion_callback = lock->l_completion_ast;
1857         ldlm_clear_cp_reqd(lock);
1858         unlock_res_and_lock(lock);
1859
1860         if (completion_callback != NULL)
1861                 rc = completion_callback(lock, 0, (void *)arg);
1862         LDLM_LOCK_RELEASE(lock);
1863
1864         RETURN(rc);
1865 }
1866
1867 /**
1868  * Process a call to revocation AST callback for a lock in ast_work list
1869  */
1870 static int
1871 ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1872 {
1873         struct ldlm_cb_set_arg *arg = opaq;
1874         struct ldlm_lock_desc   desc;
1875         int                     rc;
1876         struct ldlm_lock       *lock;
1877         ENTRY;
1878
1879         if (list_empty(arg->list))
1880                 RETURN(-ENOENT);
1881
1882         lock = list_entry(arg->list->next, struct ldlm_lock, l_rk_ast);
1883         list_del_init(&lock->l_rk_ast);
1884
1885         /* the desc just pretends the lock is exclusive */
1886         ldlm_lock2desc(lock, &desc);
1887         desc.l_req_mode = LCK_EX;
1888         desc.l_granted_mode = 0;
1889
1890         rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
1891         LDLM_LOCK_RELEASE(lock);
1892
1893         RETURN(rc);
1894 }
1895
1896 /**
1897  * Process a call to glimpse AST callback for a lock in ast_work list
1898  */
1899 int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1900 {
1901         struct ldlm_cb_set_arg          *arg = opaq;
1902         struct ldlm_glimpse_work        *gl_work;
1903         struct ldlm_lock                *lock;
1904         int                              rc = 0;
1905         ENTRY;
1906
1907         if (list_empty(arg->list))
1908                 RETURN(-ENOENT);
1909
1910         gl_work = list_entry(arg->list->next, struct ldlm_glimpse_work,
1911                                  gl_list);
1912         list_del_init(&gl_work->gl_list);
1913
1914         lock = gl_work->gl_lock;
1915
1916         /* transfer the glimpse descriptor to ldlm_cb_set_arg */
1917         arg->gl_desc = gl_work->gl_desc;
1918
1919         /* invoke the actual glimpse callback */
1920         if (lock->l_glimpse_ast(lock, (void*)arg) == 0)
1921                 rc = 1;
1922
1923         LDLM_LOCK_RELEASE(lock);
1924
1925         if ((gl_work->gl_flags & LDLM_GL_WORK_NOFREE) == 0)
1926                 OBD_FREE_PTR(gl_work);
1927
1928         RETURN(rc);
1929 }
1930
1931 /**
1932  * Process list of locks in need of ASTs being sent.
1933  *
1934  * Used on server to send multiple ASTs together instead of sending one by
1935  * one.
1936  */
1937 int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
1938                       ldlm_desc_ast_t ast_type)
1939 {
1940         struct ldlm_cb_set_arg *arg;
1941         set_producer_func       work_ast_lock;
1942         int                     rc;
1943
1944         if (list_empty(rpc_list))
1945                 RETURN(0);
1946
1947         OBD_ALLOC_PTR(arg);
1948         if (arg == NULL)
1949                 RETURN(-ENOMEM);
1950
1951         atomic_set(&arg->restart, 0);
1952         arg->list = rpc_list;
1953
1954         switch (ast_type) {
1955                 case LDLM_WORK_BL_AST:
1956                         arg->type = LDLM_BL_CALLBACK;
1957                         work_ast_lock = ldlm_work_bl_ast_lock;
1958                         break;
1959                 case LDLM_WORK_CP_AST:
1960                         arg->type = LDLM_CP_CALLBACK;
1961                         work_ast_lock = ldlm_work_cp_ast_lock;
1962                         break;
1963                 case LDLM_WORK_REVOKE_AST:
1964                         arg->type = LDLM_BL_CALLBACK;
1965                         work_ast_lock = ldlm_work_revoke_ast_lock;
1966                         break;
1967                 case LDLM_WORK_GL_AST:
1968                         arg->type = LDLM_GL_CALLBACK;
1969                         work_ast_lock = ldlm_work_gl_ast_lock;
1970                         break;
1971                 default:
1972                         LBUG();
1973         }
1974
1975         /* We create a ptlrpc request set with flow control extension.
1976          * This request set will use the work_ast_lock function to produce new
1977          * requests and will send a new request each time one completes in order
1978          * to keep the number of requests in flight at ns_max_parallel_ast */
1979         arg->set = ptlrpc_prep_fcset(ns->ns_max_parallel_ast ? : UINT_MAX,
1980                                      work_ast_lock, arg);
1981         if (arg->set == NULL)
1982                 GOTO(out, rc = -ENOMEM);
1983
1984         ptlrpc_set_wait(arg->set);
1985         ptlrpc_set_destroy(arg->set);
1986
1987         rc = atomic_read(&arg->restart) ? -ERESTART : 0;
1988         GOTO(out, rc);
1989 out:
1990         OBD_FREE_PTR(arg);
1991         return rc;
1992 }
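
/*
 * Usage sketch (illustrative only, compiled out): the typical server-side
 * pattern is to collect conflicting locks on a private list under the
 * resource lock, then send their blocking ASTs in one batch outside it.
 */
#if 0
static int send_blocking_asts(struct ldlm_namespace *ns,
                              struct ldlm_resource *res)
{
        struct list_head rpc_list;
        int rc;

        INIT_LIST_HEAD(&rpc_list);
        lock_res(res);
        /* ... link conflicting locks onto &rpc_list via l_bl_ast ... */
        unlock_res(res);

        rc = ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_BL_AST);
        if (rc == -ERESTART) {
                /* at least one AST send failed; callers typically retry */
        }
        return rc;
}
#endif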
1993
1994 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
1995 {
1996         ldlm_reprocess_all(res);
1997         return LDLM_ITER_CONTINUE;
1998 }
1999
2000 static int ldlm_reprocess_res(cfs_hash_t *hs, cfs_hash_bd_t *bd,
2001                               struct hlist_node *hnode, void *arg)
2002 {
2003         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2004         int    rc;
2005
2006         rc = reprocess_one_queue(res, arg);
2007
2008         return rc == LDLM_ITER_STOP;
2009 }
2010
2011 /**
2012  * Iterate through all resources on a namespace attempting to grant waiting
2013  * locks.
2014  */
2015 void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
2016 {
2017         ENTRY;
2018
2019         if (ns != NULL) {
2020                 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2021                                          ldlm_reprocess_res, NULL);
2022         }
2023         EXIT;
2024 }
2025 EXPORT_SYMBOL(ldlm_reprocess_all_ns);
2026
2027 /**
2028  * Try to grant all waiting locks on a resource.
2029  *
2030  * Calls ldlm_reprocess_queue on converting and waiting queues.
2031  *
2032  * Typically called after some resource locks are cancelled to see
2033  * if anything could be granted as a result of the cancellation.
2034  */
2035 void ldlm_reprocess_all(struct ldlm_resource *res)
2036 {
2037         struct list_head rpc_list;
2038 #ifdef HAVE_SERVER_SUPPORT
2039         int rc;
2040         ENTRY;
2041
2042         INIT_LIST_HEAD(&rpc_list);
2043         /* Local lock trees don't get reprocessed. */
2044         if (ns_is_client(ldlm_res_to_ns(res))) {
2045                 EXIT;
2046                 return;
2047         }
2048
2049 restart:
2050         lock_res(res);
2051         rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
2052         if (rc == LDLM_ITER_CONTINUE)
2053                 ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
2054         unlock_res(res);
2055
2056         rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
2057                                LDLM_WORK_CP_AST);
2058         if (rc == -ERESTART) {
2059                 LASSERT(list_empty(&rpc_list));
2060                 goto restart;
2061         }
2062 #else
2063         ENTRY;
2064
2065         INIT_LIST_HEAD(&rpc_list);
2066         if (!ns_is_client(ldlm_res_to_ns(res))) {
2067                 CERROR("This is client-side-only module, cannot handle "
2068                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
2069                 LBUG();
2070         }
2071 #endif
2072         EXIT;
2073 }
2074 EXPORT_SYMBOL(ldlm_reprocess_all);
2075
2076 /**
2077  * Helper function to call blocking AST for LDLM lock \a lock in a
2078  * "cancelling" mode.
2079  */
2080 void ldlm_cancel_callback(struct ldlm_lock *lock)
2081 {
2082         check_res_locked(lock->l_resource);
2083         if (!ldlm_is_cancel(lock)) {
2084                 ldlm_set_cancel(lock);
2085                 if (lock->l_blocking_ast) {
2086                         unlock_res_and_lock(lock);
2087                         lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
2088                                              LDLM_CB_CANCELING);
2089                         lock_res_and_lock(lock);
2090                 } else {
2091                         LDLM_DEBUG(lock, "no blocking ast");
2092                 }
2093         }
2094         ldlm_set_bl_done(lock);
2095 }
2096
2097 /**
2098  * Remove skiplist-enabled LDLM lock \a req from granted list
2099  */
2100 void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
2101 {
2102         if (req->l_resource->lr_type != LDLM_PLAIN &&
2103             req->l_resource->lr_type != LDLM_IBITS)
2104                 return;
2105
2106         list_del_init(&req->l_sl_policy);
2107         list_del_init(&req->l_sl_mode);
2108 }
2109
2110 /**
2111  * Attempts to cancel LDLM lock \a lock that has no reader/writer references.
2112  */
2113 void ldlm_lock_cancel(struct ldlm_lock *lock)
2114 {
2115         struct ldlm_resource *res;
2116         struct ldlm_namespace *ns;
2117         ENTRY;
2118
2119         lock_res_and_lock(lock);
2120
2121         res = lock->l_resource;
2122         ns  = ldlm_res_to_ns(res);
2123
2124         /* Please do not, no matter how tempting, remove this LBUG without
2125          * talking to me first. -phik */
2126         if (lock->l_readers || lock->l_writers) {
2127                 LDLM_ERROR(lock, "lock still has references");
2128                 LBUG();
2129         }
2130
2131         if (ldlm_is_waited(lock))
2132                 ldlm_del_waiting_lock(lock);
2133
2134         /* Call the blocking AST in cancel mode (LDLM_CB_CANCELING). */
2135         ldlm_cancel_callback(lock);
2136
2137         /* Yes, a second time: the lock may have been added back while we
2138          * were running without the res lock in ldlm_cancel_callback */
2139         if (ldlm_is_waited(lock))
2140                 ldlm_del_waiting_lock(lock);
2141
2142         ldlm_resource_unlink_lock(lock);
2143         ldlm_lock_destroy_nolock(lock);
2144
2145         if (lock->l_granted_mode == lock->l_req_mode)
2146                 ldlm_pool_del(&ns->ns_pool, lock);
2147
2148         /* Make sure we will not be called again for the same lock, which is
2149          * possible unless we zero out lock->l_granted_mode */
2150         lock->l_granted_mode = LCK_MINMODE;
2151         unlock_res_and_lock(lock);
2152
2153         EXIT;
2154 }
2155 EXPORT_SYMBOL(ldlm_lock_cancel);
2156
2157 /**
2158  * Set opaque data into the lock that only makes sense to upper layer.
2159  */
2160 int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
2161 {
2162         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2163         int rc = -EINVAL;
2164         ENTRY;
2165
2166         if (lock) {
2167                 if (lock->l_ast_data == NULL)
2168                         lock->l_ast_data = data;
2169                 if (lock->l_ast_data == data)
2170                         rc = 0;
2171                 LDLM_LOCK_PUT(lock);
2172         }
2173         RETURN(rc);
2174 }
2175 EXPORT_SYMBOL(ldlm_lock_set_data);
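
/*
 * Usage sketch (illustrative only, compiled out): since the setter only
 * succeeds when l_ast_data is unset or already equal to 'data', a caller
 * can use it to attach an upper-layer object at most once.  The helper and
 * the use of an inode here are hypothetical.
 */
#if 0
static void attach_inode_to_lock(struct lustre_handle *lockh,
                                 struct inode *inode)
{
        if (ldlm_lock_set_data(lockh, inode) != 0)
                CDEBUG(D_DLMTRACE, "lock already carries other ast_data\n");
}
#endif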
2176
2177 struct export_cl_data {
2178         struct obd_export       *ecl_exp;
2179         int                     ecl_loop;
2180 };
2181
2182 /**
2183  * Iterator function for ldlm_cancel_locks_for_export.
2184  * Cancels passed locks.
2185  */
2186 int ldlm_cancel_locks_for_export_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
2187                                     struct hlist_node *hnode, void *data)
2189 {
2190         struct export_cl_data   *ecl = (struct export_cl_data *)data;
2191         struct obd_export       *exp  = ecl->ecl_exp;
2192         struct ldlm_lock     *lock = cfs_hash_object(hs, hnode);
2193         struct ldlm_resource *res;
2194
2195         res = ldlm_resource_getref(lock->l_resource);
2196         LDLM_LOCK_GET(lock);
2197
2198         LDLM_DEBUG(lock, "export %p", exp);
2199         ldlm_res_lvbo_update(res, NULL, 1);
2200         ldlm_lock_cancel(lock);
2201         ldlm_reprocess_all(res);
2202         ldlm_resource_putref(res);
2203         LDLM_LOCK_RELEASE(lock);
2204
2205         ecl->ecl_loop++;
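        /* (ecl_loop & -ecl_loop) == ecl_loop holds only when ecl_loop is a
         * power of two, so progress is logged at exponentially spaced
         * intervals rather than once per cancelled lock. */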
2206         if ((ecl->ecl_loop & -ecl->ecl_loop) == ecl->ecl_loop) {
2207                 CDEBUG(D_INFO,
2208                        "Cancel lock %p for export %p (loop %d), still have "
2209                        "%d locks left on hash table.\n",
2210                        lock, exp, ecl->ecl_loop,
2211                        atomic_read(&hs->hs_count));
2212         }
2213
2214         return 0;
2215 }
2216
2217 /**
2218  * Cancel all locks for given export.
2219  *
2220  * Typically called on client disconnection/eviction
2221  */
2222 void ldlm_cancel_locks_for_export(struct obd_export *exp)
2223 {
2224         struct export_cl_data   ecl = {
2225                 .ecl_exp        = exp,
2226                 .ecl_loop       = 0,
2227         };
2228
2229         cfs_hash_for_each_empty(exp->exp_lock_hash,
2230                                 ldlm_cancel_locks_for_export_cb, &ecl);
2231 }
2232
2233 /**
2234  * Downgrade an exclusive lock.
2235  *
2236  * A fast variant of ldlm_lock_convert for conversion of exclusive
2237  * locks. The conversion is always successful.
2238  * Used by Commit on Sharing (COS) code.
2239  *
2240  * \param lock A lock to convert
2241  * \param new_mode new lock mode
2242  */
2243 void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode)
2244 {
2245         ENTRY;
2246
2247         LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
2248         LASSERT(new_mode == LCK_COS);
2249
2250         lock_res_and_lock(lock);
2251         ldlm_resource_unlink_lock(lock);
2252         /*
2253          * Remove the lock from pool as it will be added again in
2254          * ldlm_grant_lock() called below.
2255          */
2256         ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
2257
2258         lock->l_req_mode = new_mode;
2259         ldlm_grant_lock(lock, NULL);
2260         unlock_res_and_lock(lock);
2261         ldlm_reprocess_all(lock->l_resource);
2262
2263         EXIT;
2264 }
2265 EXPORT_SYMBOL(ldlm_lock_downgrade);
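
/*
 * Usage sketch (illustrative only, compiled out): Commit-on-Sharing style
 * downgrade.  Once the transaction protected by an EX/PW lock has committed,
 * the lock can be downgraded to LCK_COS so that other COS requests become
 * compatible with it.  The helper is hypothetical.
 */
#if 0
static void cos_downgrade(struct ldlm_lock *lock)
{
        if (lock->l_granted_mode & (LCK_PW | LCK_EX))
                ldlm_lock_downgrade(lock, LCK_COS);
}
#endif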
2266
2267 /**
2268  * Attempt to convert already granted lock to a different mode.
2269  *
2270  * While lock conversion is not currently used, future client-side
2271  * optimizations could take advantage of it to avoid discarding cached
2272  * pages on a file.
2273  */
2274 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
2275                                         __u32 *flags)
2276 {
2277         struct list_head rpc_list;
2278         struct ldlm_resource *res;
2279         struct ldlm_namespace *ns;
2280         int granted = 0;
2281 #ifdef HAVE_SERVER_SUPPORT
2282         int old_mode;
2283         struct sl_insert_point prev;
2284 #endif
2285         struct ldlm_interval *node;
2286         ENTRY;
2287
2288         INIT_LIST_HEAD(&rpc_list);
2289         /* Just return if mode is unchanged. */
2290         if (new_mode == lock->l_granted_mode) {
2291                 *flags |= LDLM_FL_BLOCK_GRANTED;
2292                 RETURN(lock->l_resource);
2293         }
2294
2295         /* I can't check the type of the lock here because its bit-lock is
2296          * not held, so do the allocation blindly. -jay */
2297         OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
2298         if (node == NULL)  /* Actually, this causes EDEADLOCK to be returned */
2299                 RETURN(NULL);
2300
2301         LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR),
2302                  "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);
2303
2304         lock_res_and_lock(lock);
2305
2306         res = lock->l_resource;
2307         ns  = ldlm_res_to_ns(res);
2308
2309 #ifdef HAVE_SERVER_SUPPORT
2310         old_mode = lock->l_req_mode;
2311 #endif
2312         lock->l_req_mode = new_mode;
2313         if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
2314 #ifdef HAVE_SERVER_SUPPORT
2315                 /* remember the lock position where the lock might be
2316                  * added back to the granted list later and also
2317                  * remember the join mode for skiplist fixing. */
2318                 prev.res_link = lock->l_res_link.prev;
2319                 prev.mode_link = lock->l_sl_mode.prev;
2320                 prev.policy_link = lock->l_sl_policy.prev;
2321 #endif
2322                 ldlm_resource_unlink_lock(lock);
2323         } else {
2324                 ldlm_resource_unlink_lock(lock);
2325                 if (res->lr_type == LDLM_EXTENT) {
2326                         /* FIXME: ugly code, I have to attach the lock to an
2327                          * interval node again since perhaps it will be granted
2328                          * soon */
2329                         INIT_LIST_HEAD(&node->li_group);
2330                         ldlm_interval_attach(node, lock);
2331                         node = NULL;
2332                 }
2333         }
2334
2335         /*
2336          * Remove old lock from the pool before adding the lock with new
2337          * mode below in ->policy()
2338          */
2339         ldlm_pool_del(&ns->ns_pool, lock);
2340
2341         /* If this is a local resource, put it on the appropriate list. */
2342         if (ns_is_client(ldlm_res_to_ns(res))) {
2343                 if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
2344                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
2345                 } else {
2346                         /* This should never happen, because of the way the
2347                          * server handles conversions. */
2348                         LDLM_ERROR(lock, "Erroneous flags %x on local lock",
2349                                    *flags);
2350                         LBUG();
2351
2352                         ldlm_grant_lock(lock, &rpc_list);
2353                         granted = 1;
2354                         /* FIXME: completion handling must not run with lr_lock held! */
2355                         if (lock->l_completion_ast)
2356                                 lock->l_completion_ast(lock, 0, NULL);
2357                 }
2358 #ifdef HAVE_SERVER_SUPPORT
2359         } else {
2360                 int rc;
2361                 ldlm_error_t err;
2362                 __u64 pflags = 0;
2363                 ldlm_processing_policy policy;
2364                 policy = ldlm_processing_policy_table[res->lr_type];
2365                 rc = policy(lock, &pflags, 0, &err, &rpc_list);
2366                 if (rc == LDLM_ITER_STOP) {
2367                         lock->l_req_mode = old_mode;
2368                         if (res->lr_type == LDLM_EXTENT)
2369                                 ldlm_extent_add_lock(res, lock);
2370                         else
2371                                 ldlm_granted_list_add_lock(lock, &prev);
2372
2373                         res = NULL;
2374                 } else {
2375                         *flags |= LDLM_FL_BLOCK_GRANTED;
2376                         granted = 1;
2377                 }
2378         }
2379 #else
2380         } else {
2381                 CERROR("This is client-side-only module, cannot handle "
2382                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
2383                 LBUG();
2384         }
2385 #endif
2386         unlock_res_and_lock(lock);
2387
2388         if (granted)
2389                 ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
2390         if (node)
2391                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
2392         RETURN(res);
2393 }
2394 EXPORT_SYMBOL(ldlm_lock_convert);
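
/*
 * Usage sketch (illustrative only, compiled out): the only conversion the
 * LASSERTF above accepts is PR -> PW.  A NULL result means the interval node
 * allocation failed and the caller should treat it as a deadlock/retry case.
 * The helper is hypothetical.
 */
#if 0
static struct ldlm_resource *convert_pr_to_pw(struct ldlm_lock *lock)
{
        __u32 flags = 0;

        return ldlm_lock_convert(lock, LCK_PW, &flags);
}
#endif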
2395
2396 /**
2397  * Print lock with lock handle \a lockh description into debug log.
2398  *
2399  * Used when printing all locks on a resource for debug purposes.
2400  */
2401 void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
2402 {
2403         struct ldlm_lock *lock;
2404
2405         if (!((libcfs_debug | D_ERROR) & level))
2406                 return;
2407
2408         lock = ldlm_handle2lock(lockh);
2409         if (lock == NULL)
2410                 return;
2411
2412         LDLM_DEBUG_LIMIT(level, lock, "###");
2413
2414         LDLM_LOCK_PUT(lock);
2415 }
2416 EXPORT_SYMBOL(ldlm_lock_dump_handle);
2417
2418 /**
2419  * Print lock information with custom message into debug log.
2420  * Helper function.
2421  */
2422 void _ldlm_lock_debug(struct ldlm_lock *lock,
2423                       struct libcfs_debug_msg_data *msgdata,
2424                       const char *fmt, ...)
2425 {
2426         va_list args;
2427         struct obd_export *exp = lock->l_export;
2428         struct ldlm_resource *resource = lock->l_resource;
2429         char *nid = "local";
2430
2431         va_start(args, fmt);
2432
2433         if (exp && exp->exp_connection) {
2434                 nid = libcfs_nid2str(exp->exp_connection->c_peer.nid);
2435         } else if (exp && exp->exp_obd != NULL) {
2436                 struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
2437                 nid = libcfs_nid2str(imp->imp_connection->c_peer.nid);
2438         }
2439
2440         if (resource == NULL) {
2441                 libcfs_debug_vmsg2(msgdata, fmt, args,
2442                        " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2443                        "res: \?\? rrc=\?\? type: \?\?\? flags: "LPX64" nid: %s "
2444                        "remote: "LPX64" expref: %d pid: %u timeout: %lu "
2445                        "lvb_type: %d\n",
2446                        lock,
2447                        lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2448                        lock->l_readers, lock->l_writers,
2449                        ldlm_lockname[lock->l_granted_mode],
2450                        ldlm_lockname[lock->l_req_mode],
2451                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2452                        exp ? atomic_read(&exp->exp_refcount) : -99,
2453                        lock->l_pid, lock->l_callback_timeout, lock->l_lvb_type);
2454                 va_end(args);
2455                 return;
2456         }
2457
2458         switch (resource->lr_type) {
2459         case LDLM_EXTENT:
2460                 libcfs_debug_vmsg2(msgdata, fmt, args,
2461                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2462                         "res: "DLDLMRES" rrc: %d type: %s ["LPU64"->"LPU64"] "
2463                         "(req "LPU64"->"LPU64") flags: "LPX64" nid: %s remote: "
2464                         LPX64" expref: %d pid: %u timeout: %lu lvb_type: %d\n",
2465                         ldlm_lock_to_ns_name(lock), lock,
2466                         lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2467                         lock->l_readers, lock->l_writers,
2468                         ldlm_lockname[lock->l_granted_mode],
2469                         ldlm_lockname[lock->l_req_mode],
2470                         PLDLMRES(resource),
2471                         atomic_read(&resource->lr_refcount),
2472                         ldlm_typename[resource->lr_type],
2473                         lock->l_policy_data.l_extent.start,
2474                         lock->l_policy_data.l_extent.end,
2475                         lock->l_req_extent.start, lock->l_req_extent.end,
2476                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2477                         exp ? atomic_read(&exp->exp_refcount) : -99,
2478                         lock->l_pid, lock->l_callback_timeout,
2479                         lock->l_lvb_type);
2480                 break;
2481
2482         case LDLM_FLOCK:
2483                 libcfs_debug_vmsg2(msgdata, fmt, args,
2484                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2485                         "res: "DLDLMRES" rrc: %d type: %s pid: %d "
2486                         "["LPU64"->"LPU64"] flags: "LPX64" nid: %s "
2487                         "remote: "LPX64" expref: %d pid: %u timeout: %lu\n",
2488                         ldlm_lock_to_ns_name(lock), lock,
2489                         lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2490                         lock->l_readers, lock->l_writers,
2491                         ldlm_lockname[lock->l_granted_mode],
2492                         ldlm_lockname[lock->l_req_mode],
2493                         PLDLMRES(resource),
2494                         atomic_read(&resource->lr_refcount),
2495                         ldlm_typename[resource->lr_type],
2496                         lock->l_policy_data.l_flock.pid,
2497                         lock->l_policy_data.l_flock.start,
2498                         lock->l_policy_data.l_flock.end,
2499                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2500                         exp ? atomic_read(&exp->exp_refcount) : -99,
2501                         lock->l_pid, lock->l_callback_timeout);
2502                 break;
2503
2504         case LDLM_IBITS:
2505                 libcfs_debug_vmsg2(msgdata, fmt, args,
2506                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2507                         "res: "DLDLMRES" bits "LPX64" rrc: %d type: %s "
2508                         "flags: "LPX64" nid: %s remote: "LPX64" expref: %d "
2509                         "pid: %u timeout: %lu lvb_type: %d\n",
2510                         ldlm_lock_to_ns_name(lock),
2511                         lock, lock->l_handle.h_cookie,
2512                         atomic_read(&lock->l_refc),
2513                         lock->l_readers, lock->l_writers,
2514                         ldlm_lockname[lock->l_granted_mode],
2515                         ldlm_lockname[lock->l_req_mode],
2516                         PLDLMRES(resource),
2517                         lock->l_policy_data.l_inodebits.bits,
2518                         atomic_read(&resource->lr_refcount),
2519                         ldlm_typename[resource->lr_type],
2520                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2521                         exp ? atomic_read(&exp->exp_refcount) : -99,
2522                         lock->l_pid, lock->l_callback_timeout,
2523                         lock->l_lvb_type);
2524                 break;
2525
2526         default:
2527                 libcfs_debug_vmsg2(msgdata, fmt, args,
2528                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2529                         "res: "DLDLMRES" rrc: %d type: %s flags: "LPX64" "
2530                         "nid: %s remote: "LPX64" expref: %d pid: %u "
2531                         "timeout: %lu lvb_type: %d\n",
2532                         ldlm_lock_to_ns_name(lock),
2533                         lock, lock->l_handle.h_cookie,
2534                         atomic_read(&lock->l_refc),
2535                         lock->l_readers, lock->l_writers,
2536                         ldlm_lockname[lock->l_granted_mode],
2537                         ldlm_lockname[lock->l_req_mode],
2538                         PLDLMRES(resource),
2539                         atomic_read(&resource->lr_refcount),
2540                         ldlm_typename[resource->lr_type],
2541                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2542                         exp ? atomic_read(&exp->exp_refcount) : -99,
2543                         lock->l_pid, lock->l_callback_timeout,
2544                         lock->l_lvb_type);
2545                 break;
2546         }
2547         va_end(args);
2548 }
2549 EXPORT_SYMBOL(_ldlm_lock_debug);
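
/*
 * Usage sketch (illustrative only): _ldlm_lock_debug() is normally reached
 * through the LDLM_DEBUG()/LDLM_ERROR() macros, which fill in the libcfs
 * message data for the current file and line, e.g.:
 *
 *      LDLM_DEBUG(lock, "grant race detected, retrying (rc = %d)", rc);
 */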