/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2013, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_lock.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM

#include <libcfs/libcfs.h>
#include <obd_class.h>
#include "ldlm_internal.h"

/* lock types */
char *ldlm_lockname[] = {
        [0] = "--",
        [LCK_EX] = "EX",
        [LCK_PW] = "PW",
        [LCK_PR] = "PR",
        [LCK_CW] = "CW",
        [LCK_CR] = "CR",
        [LCK_NL] = "NL",
        [LCK_GROUP] = "GROUP",
        [LCK_COS] = "COS"
};
EXPORT_SYMBOL(ldlm_lockname);

char *ldlm_typename[] = {
        [LDLM_PLAIN] = "PLN",
        [LDLM_EXTENT] = "EXT",
        [LDLM_FLOCK] = "FLK",
        [LDLM_IBITS] = "IBT",
};
EXPORT_SYMBOL(ldlm_typename);

static ldlm_policy_wire_to_local_t ldlm_policy_wire18_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE]  = ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE]  = ldlm_flock_policy_wire18_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE]  = ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_wire_to_local_t ldlm_policy_wire21_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE]  = ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE]  = ldlm_flock_policy_wire21_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE]  = ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_local_to_wire_t ldlm_policy_local_to_wire[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE]  = ldlm_plain_policy_local_to_wire,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_local_to_wire,
        [LDLM_FLOCK - LDLM_MIN_TYPE]  = ldlm_flock_policy_local_to_wire,
        [LDLM_IBITS - LDLM_MIN_TYPE]  = ldlm_ibits_policy_local_to_wire,
};

/**
 * Converts lock policy from the local format to the on-the-wire lock_desc
 * format.
 */
void ldlm_convert_policy_to_wire(ldlm_type_t type,
                                 const ldlm_policy_data_t *lpolicy,
                                 ldlm_wire_policy_data_t *wpolicy)
{
        ldlm_policy_local_to_wire_t convert;

        convert = ldlm_policy_local_to_wire[type - LDLM_MIN_TYPE];

        convert(lpolicy, wpolicy);
}

/**
 * Converts lock policy from the on-the-wire lock_desc format to the local
 * format.
 */
void ldlm_convert_policy_to_local(struct obd_export *exp, ldlm_type_t type,
                                  const ldlm_wire_policy_data_t *wpolicy,
                                  ldlm_policy_data_t *lpolicy)
{
        ldlm_policy_wire_to_local_t convert;
        int new_client;

        /* some badness for 2.0.0 clients, but 2.0.0 isn't supported */
        new_client = (exp_connect_flags(exp) & OBD_CONNECT_FULL20) != 0;
        if (new_client)
                convert = ldlm_policy_wire21_to_local[type - LDLM_MIN_TYPE];
        else
                convert = ldlm_policy_wire18_to_local[type - LDLM_MIN_TYPE];

        convert(wpolicy, lpolicy);
}

char *ldlm_it2str(int it)
{
        switch (it) {
        case IT_OPEN:
                return "open";
        case IT_CREAT:
                return "creat";
        case (IT_OPEN | IT_CREAT):
                return "open|creat";
        case IT_READDIR:
                return "readdir";
        case IT_GETATTR:
                return "getattr";
        case IT_LOOKUP:
                return "lookup";
        case IT_UNLINK:
                return "unlink";
        case IT_GETXATTR:
                return "getxattr";
        case IT_LAYOUT:
                return "layout";
        default:
                CERROR("Unknown intent %d\n", it);
                return "UNKNOWN";
        }
}
EXPORT_SYMBOL(ldlm_it2str);

extern struct kmem_cache *ldlm_lock_slab;

#ifdef HAVE_SERVER_SUPPORT
static ldlm_processing_policy ldlm_processing_policy_table[] = {
        [LDLM_PLAIN]    = ldlm_process_plain_lock,
        [LDLM_EXTENT]   = ldlm_process_extent_lock,
        [LDLM_FLOCK]    = ldlm_process_flock_lock,
        [LDLM_IBITS]    = ldlm_process_inodebits_lock,
};

ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
{
        return ldlm_processing_policy_table[res->lr_type];
}
EXPORT_SYMBOL(ldlm_get_processing_policy);
#endif /* HAVE_SERVER_SUPPORT */

void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
{
        ns->ns_policy = arg;
}
EXPORT_SYMBOL(ldlm_register_intent);

/*
 * REFCOUNTED LOCK OBJECTS
 */


/**
 * Get a reference on a lock.
 *
 * Lock refcounts, during creation:
 *   - one special one for allocation, dec'd only once in destroy
 *   - one for being a lock that's in-use
 *   - one for the addref associated with a new lock
 */
struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
{
        atomic_inc(&lock->l_refc);
        return lock;
}
EXPORT_SYMBOL(ldlm_lock_get);
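
/*
 * A minimal usage sketch (illustrative only, not part of the original
 * code; "some_lock" is a hypothetical pointer): every LDLM_LOCK_GET()
 * must be balanced by an LDLM_LOCK_PUT() once the caller is done with
 * the lock.
 *
 *      struct ldlm_lock *lock = LDLM_LOCK_GET(some_lock);
 *      ...use the lock...
 *      LDLM_LOCK_PUT(lock);  (may free the lock if this was the last ref)
 */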

/**
 * Release lock reference.
 *
 * Also frees the lock if this was the last reference.
 */
void ldlm_lock_put(struct ldlm_lock *lock)
{
        ENTRY;

        LASSERT(lock->l_resource != LP_POISON);
        LASSERT(atomic_read(&lock->l_refc) > 0);
        if (atomic_dec_and_test(&lock->l_refc)) {
                struct ldlm_resource *res;

                LDLM_DEBUG(lock,
                           "final lock_put on destroyed lock, freeing it.");

                res = lock->l_resource;
                LASSERT(ldlm_is_destroyed(lock));
                LASSERT(list_empty(&lock->l_res_link));
                LASSERT(list_empty(&lock->l_pending_chain));

                lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
                                     LDLM_NSS_LOCKS);
                lu_ref_del(&res->lr_reference, "lock", lock);
                ldlm_resource_putref(res);
                lock->l_resource = NULL;
                if (lock->l_export) {
                        class_export_lock_put(lock->l_export, lock);
                        lock->l_export = NULL;
                }

                if (lock->l_lvb_data != NULL)
                        OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len);

                ldlm_interval_free(ldlm_interval_detach(lock));
                lu_ref_fini(&lock->l_reference);
                OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle);
        }

        EXIT;
}
EXPORT_SYMBOL(ldlm_lock_put);

/**
 * Removes LDLM lock \a lock from LRU. Assumes LRU is already locked.
 */
int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
{
        int rc = 0;
        if (!list_empty(&lock->l_lru)) {
                struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

                LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
                list_del_init(&lock->l_lru);
                LASSERT(ns->ns_nr_unused > 0);
                ns->ns_nr_unused--;
                rc = 1;
        }
        return rc;
}

/**
 * Removes LDLM lock \a lock from LRU. Obtains the LRU lock first.
 */
int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
        int rc;

        ENTRY;
        if (ldlm_is_ns_srv(lock)) {
                LASSERT(list_empty(&lock->l_lru));
                RETURN(0);
        }

        spin_lock(&ns->ns_lock);
        rc = ldlm_lock_remove_from_lru_nolock(lock);
        spin_unlock(&ns->ns_lock);
        EXIT;
        return rc;
}

/**
 * Adds LDLM lock \a lock to namespace LRU. Assumes LRU is already locked.
 */
void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        lock->l_last_used = cfs_time_current();
        LASSERT(list_empty(&lock->l_lru));
        LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
        list_add_tail(&lock->l_lru, &ns->ns_unused_list);
        ldlm_clear_skipped(lock);
        LASSERT(ns->ns_nr_unused >= 0);
        ns->ns_nr_unused++;
}

/**
 * Adds LDLM lock \a lock to namespace LRU. Obtains necessary LRU locks
 * first.
 */
void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        spin_lock(&ns->ns_lock);
        ldlm_lock_add_to_lru_nolock(lock);
        spin_unlock(&ns->ns_lock);
        EXIT;
}

/**
 * Moves LDLM lock \a lock that is already in namespace LRU to the tail of
 * the LRU. Performs the necessary LRU locking.
 */
void ldlm_lock_touch_in_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        if (ldlm_is_ns_srv(lock)) {
                LASSERT(list_empty(&lock->l_lru));
                EXIT;
                return;
        }

        spin_lock(&ns->ns_lock);
        if (!list_empty(&lock->l_lru)) {
                ldlm_lock_remove_from_lru_nolock(lock);
                ldlm_lock_add_to_lru_nolock(lock);
        }
        spin_unlock(&ns->ns_lock);
        EXIT;
}

/**
 * Helper to destroy a locked lock.
 *
 * Used by ldlm_lock_destroy and ldlm_lock_destroy_nolock.
 * Must be called with l_lock and lr_lock held.
 *
 * Does not actually free the lock data, but rather marks the lock as
 * destroyed by setting the LDLM_FL_DESTROYED flag.  Destroys the
 * handle->lock association too, so that the lock can no longer be found,
 * and removes the lock from the LRU list.  Actual lock freeing occurs
 * when the last lock reference goes away.
 *
 * Original comment (of some historical value):
 * This used to have a 'strict' flag, which recovery would use to mark an
 * in-use lock as needing-to-die.  Lest I am ever tempted to put it back, I
 * shall explain why it's gone: with the new hash table scheme, once you call
 * ldlm_lock_destroy, you can never drop your final references on this lock.
 * Because it's not in the hash table anymore.  -phil
 */
int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
{
        ENTRY;

        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                LBUG();
        }

        if (!list_empty(&lock->l_res_link)) {
                LDLM_ERROR(lock, "lock still on resource");
                LBUG();
        }

        if (ldlm_is_destroyed(lock)) {
                LASSERT(list_empty(&lock->l_lru));
                EXIT;
                return 0;
        }
        ldlm_set_destroyed(lock);

        if (lock->l_export && lock->l_export->exp_lock_hash) {
                /* NB: it's safe to call cfs_hash_del() even if the lock
                 * isn't in exp_lock_hash. */
                /* In the function below, .hs_keycmp resolves to
                 * ldlm_export_lock_keycmp() */
                /* coverity[overrun-buffer-val] */
                cfs_hash_del(lock->l_export->exp_lock_hash,
                             &lock->l_remote_handle, &lock->l_exp_hash);
        }

        ldlm_lock_remove_from_lru(lock);
        class_handle_unhash(&lock->l_handle);

#if 0
        /* Wake anyone waiting for this lock */
        /* FIXME: I should probably add yet another flag, instead of using
         * l_export to only call this on clients */
        if (lock->l_export)
                class_export_put(lock->l_export);
        lock->l_export = NULL;
        if (lock->l_export && lock->l_completion_ast)
                lock->l_completion_ast(lock, 0);
#endif
        EXIT;
        return 1;
}

/**
 * Destroys a LDLM lock \a lock. Performs necessary locking first.
 */
void ldlm_lock_destroy(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        lock_res_and_lock(lock);
        first = ldlm_lock_destroy_internal(lock);
        unlock_res_and_lock(lock);

        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/**
 * Destroys a LDLM lock \a lock that is already locked.
 */
void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        first = ldlm_lock_destroy_internal(lock);
        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/* This is called by portals_handle2object() with the handle lock taken. */
static void lock_handle_addref(void *lock)
{
        LDLM_LOCK_GET((struct ldlm_lock *)lock);
}

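/* Handle-table callback that returns the lock memory to the slab cache;
 * the size assertion guards against freeing an object of the wrong type. */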
static void lock_handle_free(void *lock, int size)
{
        LASSERT(size == sizeof(struct ldlm_lock));
        OBD_SLAB_FREE(lock, ldlm_lock_slab, size);
}

struct portals_handle_ops lock_handle_ops = {
        .hop_addref = lock_handle_addref,
        .hop_free   = lock_handle_free,
};

/**
 * Allocate and initialize a new lock structure.
 *
 * Usage: pass in a resource on which you have done ldlm_resource_get();
 *        the new lock will take over the refcount.
 * Returns: lock with refcount 2 - one for the current caller and one for
 *          the remote peer.
 */
static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
{
        struct ldlm_lock *lock;
        ENTRY;

        if (resource == NULL)
                LBUG();

        OBD_SLAB_ALLOC_PTR_GFP(lock, ldlm_lock_slab, GFP_NOFS);
        if (lock == NULL)
                RETURN(NULL);

        spin_lock_init(&lock->l_lock);
        lock->l_resource = resource;
        lu_ref_add(&resource->lr_reference, "lock", lock);

        atomic_set(&lock->l_refc, 2);
        INIT_LIST_HEAD(&lock->l_res_link);
        INIT_LIST_HEAD(&lock->l_lru);
        INIT_LIST_HEAD(&lock->l_pending_chain);
        INIT_LIST_HEAD(&lock->l_bl_ast);
        INIT_LIST_HEAD(&lock->l_cp_ast);
        INIT_LIST_HEAD(&lock->l_rk_ast);
        init_waitqueue_head(&lock->l_waitq);
        lock->l_blocking_lock = NULL;
        INIT_LIST_HEAD(&lock->l_sl_mode);
        INIT_LIST_HEAD(&lock->l_sl_policy);
        INIT_HLIST_NODE(&lock->l_exp_hash);
        INIT_HLIST_NODE(&lock->l_exp_flock_hash);

        lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
                             LDLM_NSS_LOCKS);
        INIT_LIST_HEAD(&lock->l_handle.h_link);
        class_handle_hash(&lock->l_handle, &lock_handle_ops);

        lu_ref_init(&lock->l_reference);
        lu_ref_add(&lock->l_reference, "hash", lock);
        lock->l_callback_timeout = 0;

#if LUSTRE_TRACKS_LOCK_EXP_REFS
        INIT_LIST_HEAD(&lock->l_exp_refs_link);
        lock->l_exp_refs_nr = 0;
        lock->l_exp_refs_target = NULL;
#endif
        INIT_LIST_HEAD(&lock->l_exp_list);

        RETURN(lock);
}

/**
 * Moves LDLM lock \a lock to another resource.
 * This is used on the client when the server returns a different lock than
 * the one requested (typically as a result of an intent operation).
 */
int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                              const struct ldlm_res_id *new_resid)
{
        struct ldlm_resource *oldres = lock->l_resource;
        struct ldlm_resource *newres;
        int type;
        ENTRY;

        LASSERT(ns_is_client(ns));

        lock_res_and_lock(lock);
        if (memcmp(new_resid, &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) == 0) {
                /* Nothing to do */
                unlock_res_and_lock(lock);
                RETURN(0);
        }

        LASSERT(new_resid->name[0] != 0);

        /* This function assumes that the lock isn't on any lists */
        LASSERT(list_empty(&lock->l_res_link));

        type = oldres->lr_type;
        unlock_res_and_lock(lock);

        newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
        if (IS_ERR(newres))
                RETURN(PTR_ERR(newres));

        lu_ref_add(&newres->lr_reference, "lock", lock);
        /*
         * To flip the lock from the old to the new resource, lock, oldres and
         * newres have to be locked. Resource spin-locks are nested within
         * lock->l_lock, and are taken in the memory address order to avoid
         * dead-locks.
         */
        spin_lock(&lock->l_lock);
        oldres = lock->l_resource;
        if (oldres < newres) {
                lock_res(oldres);
                lock_res_nested(newres, LRT_NEW);
        } else {
                lock_res(newres);
                lock_res_nested(oldres, LRT_NEW);
        }
        LASSERT(memcmp(new_resid, &oldres->lr_name,
                       sizeof oldres->lr_name) != 0);
        lock->l_resource = newres;
        unlock_res(oldres);
        unlock_res_and_lock(lock);

        /* ...and the flowers are still standing! */
        lu_ref_del(&oldres->lr_reference, "lock", lock);
        ldlm_resource_putref(oldres);

        RETURN(0);
}
EXPORT_SYMBOL(ldlm_lock_change_resource);

/** \defgroup ldlm_handles LDLM HANDLES
 * Ways to get hold of locks without any addresses.
 * @{
 */

/**
 * Fills in handle for LDLM lock \a lock into supplied \a lockh.
 * Does not take any references.
 */
void ldlm_lock2handle(const struct ldlm_lock *lock, struct lustre_handle *lockh)
{
        lockh->cookie = lock->l_handle.h_cookie;
}
EXPORT_SYMBOL(ldlm_lock2handle);

/**
 * Obtain a lock reference by handle.
 *
 * If \a flags is non-zero: atomically get the lock and set the flags.
 *                          Return NULL if any of the flags is already set.
 */
struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
                                     __u64 flags)
{
        struct ldlm_lock *lock;
        ENTRY;

        LASSERT(handle);

        lock = class_handle2object(handle->cookie, NULL);
        if (lock == NULL)
                RETURN(NULL);

        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if ((flags == 0) && !ldlm_is_destroyed(lock)) {
                lu_ref_add(&lock->l_reference, "handle", current);
                RETURN(lock);
        }

        lock_res_and_lock(lock);

        LASSERT(lock->l_resource != NULL);

        lu_ref_add_atomic(&lock->l_reference, "handle", current);
        if (unlikely(ldlm_is_destroyed(lock))) {
                unlock_res_and_lock(lock);
                CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
                LDLM_LOCK_PUT(lock);
                RETURN(NULL);
        }

        /* If we're setting flags, make sure none of them are already set. */
        if (flags != 0) {
                if ((lock->l_flags & flags) != 0) {
                        unlock_res_and_lock(lock);
                        LDLM_LOCK_PUT(lock);
                        RETURN(NULL);
                }

                lock->l_flags |= flags;
        }

        unlock_res_and_lock(lock);
        RETURN(lock);
}
EXPORT_SYMBOL(__ldlm_handle2lock);
/** @} ldlm_handles */

/**
 * Fill in "on the wire" representation for given LDLM lock into supplied
 * lock descriptor \a desc structure.
 */
void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
{
        ldlm_res2desc(lock->l_resource, &desc->l_resource);
        desc->l_req_mode = lock->l_req_mode;
        desc->l_granted_mode = lock->l_granted_mode;
        ldlm_convert_policy_to_wire(lock->l_resource->lr_type,
                                    &lock->l_policy_data,
                                    &desc->l_policy_data);
}
EXPORT_SYMBOL(ldlm_lock2desc);

/**
 * Add a lock to list of conflicting locks to send AST to.
 *
 * Only add if we have not sent a blocking AST to the lock yet.
 */
void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                           struct list_head *work_list)
{
        if (!ldlm_is_ast_sent(lock)) {
                LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
                ldlm_set_ast_sent(lock);
                /* If the enqueuing client said so, tell the AST recipient to
                 * discard dirty data, rather than writing back. */
                if (ldlm_is_ast_discard_data(new))
                        ldlm_set_discard_data(lock);
                LASSERT(list_empty(&lock->l_bl_ast));
                list_add(&lock->l_bl_ast, work_list);
                LDLM_LOCK_GET(lock);
                LASSERT(lock->l_blocking_lock == NULL);
                lock->l_blocking_lock = LDLM_LOCK_GET(new);
        }
}

/**
 * Add a lock to list of just granted locks to send completion AST to.
 */
void ldlm_add_cp_work_item(struct ldlm_lock *lock, struct list_head *work_list)
{
        if (!ldlm_is_cp_reqd(lock)) {
                ldlm_set_cp_reqd(lock);
                LDLM_DEBUG(lock, "lock granted; sending completion AST.");
                LASSERT(list_empty(&lock->l_cp_ast));
                list_add(&lock->l_cp_ast, work_list);
                LDLM_LOCK_GET(lock);
        }
}

/**
 * Aggregator function to add AST work items to a list. Determines
 * what sort of AST work needs to be done and calls the proper
 * adding function.
 * Must be called with lr_lock held.
 */
void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                            struct list_head *work_list)
{
        ENTRY;
        check_res_locked(lock->l_resource);
        if (new)
                ldlm_add_bl_work_item(lock, new, work_list);
        else
                ldlm_add_cp_work_item(lock, work_list);
        EXIT;
}

/**
 * Add specified reader/writer reference to LDLM lock with handle \a lockh.
 * r/w reference type is determined by \a mode
 * Calls ldlm_lock_addref_internal.
 */
void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;

        lock = ldlm_handle2lock(lockh);
        LASSERT(lock != NULL);
        ldlm_lock_addref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_addref);

/**
 * Helper function.
 * Add specified reader/writer reference to LDLM lock \a lock.
 * r/w reference type is determined by \a mode
 * Removes lock from LRU if it is there.
 * Assumes the LDLM lock is already locked.
 */
void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        ldlm_lock_remove_from_lru(lock);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                lock->l_readers++;
                lu_ref_add_atomic(&lock->l_reference, "reader", lock);
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                lock->l_writers++;
                lu_ref_add_atomic(&lock->l_reference, "writer", lock);
        }
        LDLM_LOCK_GET(lock);
        lu_ref_add_atomic(&lock->l_reference, "user", lock);
        LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}

/**
 * Attempts to add reader/writer reference to a lock with handle \a lockh, and
 * fails if lock is already LDLM_FL_CBPENDING or destroyed.
 *
 * \retval 0 success, lock was addref-ed
 *
 * \retval -EAGAIN lock is being canceled.
 */
int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;
        int               result;

        result = -EAGAIN;
        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                if (lock->l_readers != 0 || lock->l_writers != 0 ||
                    !ldlm_is_cbpending(lock)) {
                        ldlm_lock_addref_internal_nolock(lock, mode);
                        result = 0;
                }
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return result;
}
EXPORT_SYMBOL(ldlm_lock_addref_try);

/**
 * Add specified reader/writer reference to LDLM lock \a lock.
 * Locks LDLM lock and calls ldlm_lock_addref_internal_nolock to do the work.
 * Only called for local locks.
 */
void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
{
        lock_res_and_lock(lock);
        ldlm_lock_addref_internal_nolock(lock, mode);
        unlock_res_and_lock(lock);
}

/**
 * Removes reader/writer reference for LDLM lock \a lock.
 * Assumes LDLM lock is already locked.
 * Only called in ldlm_flock_destroy and for local locks.
 * Does NOT add lock to LRU if no r/w references are left, in order to
 * accommodate flock locks that cannot be placed in the LRU.
 */
void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                LASSERT(lock->l_readers > 0);
                lu_ref_del(&lock->l_reference, "reader", lock);
                lock->l_readers--;
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                LASSERT(lock->l_writers > 0);
                lu_ref_del(&lock->l_reference, "writer", lock);
                lock->l_writers--;
        }

        lu_ref_del(&lock->l_reference, "user", lock);
        LDLM_LOCK_RELEASE(lock);    /* matches the LDLM_LOCK_GET() in addref */
}

/**
 * Removes reader/writer reference for LDLM lock \a lock.
 * Locks LDLM lock first.
 * If the lock is determined to be a client lock and its r/w refcount drops
 * to zero and the lock is not blocked, the lock is added to the namespace
 * LRU.
 * For blocked LDLM locks, if the r/w count drops to zero, blocking_ast is
 * called.
 */
void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
{
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        ns = ldlm_lock_to_ns(lock);

        ldlm_lock_decref_internal_nolock(lock, mode);

        if (ldlm_is_local(lock) &&
            !lock->l_readers && !lock->l_writers) {
                /* If this is a local lock on a server namespace and this was
                 * the last reference, cancel the lock. */
                CDEBUG(D_INFO, "forcing cancel of local lock\n");
                ldlm_set_cbpending(lock);
        }

        if (!lock->l_readers && !lock->l_writers &&
            ldlm_is_cbpending(lock)) {
                /* If we received a blocking AST and this was the last
                 * reference, run the callback. */
                if (ldlm_is_ns_srv(lock) && lock->l_export)
                        CERROR("FL_CBPENDING set on non-local lock--just a "
                               "warning\n");

                LDLM_DEBUG(lock, "final decref done on cbpending lock");

                LDLM_LOCK_GET(lock); /* dropped by bl thread */
                ldlm_lock_remove_from_lru(lock);
                unlock_res_and_lock(lock);

                if (ldlm_is_fail_loc(lock))
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                if (ldlm_is_atomic_cb(lock) ||
                    ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
                        ldlm_handle_bl_callback(ns, NULL, lock);
        } else if (ns_is_client(ns) &&
                   !lock->l_readers && !lock->l_writers &&
                   !ldlm_is_no_lru(lock) &&
                   !ldlm_is_bl_ast(lock)) {

                LDLM_DEBUG(lock, "add lock into lru list");

                /* If this is a client-side namespace and this was the last
                 * reference, put it on the LRU. */
                ldlm_lock_add_to_lru(lock);
                unlock_res_and_lock(lock);

                if (ldlm_is_fail_loc(lock))
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
                 * are not supported by the server, otherwise, it is done on
                 * enqueue. */
                if (!exp_connect_cancelset(lock->l_conn_export) &&
                    !ns_connect_lru_resize(ns))
                        ldlm_cancel_lru(ns, 0, LCF_ASYNC, 0);
        } else {
                LDLM_DEBUG(lock, "do not add lock into lru list");
                unlock_res_and_lock(lock);
        }

        EXIT;
}

/**
 * Decrease reader/writer refcount for LDLM lock with handle \a lockh
 */
void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_decref);
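
/*
 * Typical caller pattern (an illustrative sketch only; the enqueue step,
 * error handling and the actual work are omitted, and LCK_PR is just an
 * example mode):
 *
 *      struct lustre_handle lockh;
 *      ...enqueue a lock and fill in lockh...
 *      ldlm_lock_addref(&lockh, LCK_PR);    -- pin the lock for use
 *      ...read data protected by the lock...
 *      ldlm_lock_decref(&lockh, LCK_PR);    -- may move the lock to the LRU
 */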

/**
 * Decrease reader/writer refcount for LDLM lock with handle
 * \a lockh and mark it for subsequent cancellation once r/w refcount
 * drops to zero instead of putting it on the LRU.
 *
 * Typical usage is for GROUP locks which we cannot allow to be cached.
 */
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        ENTRY;

        LASSERT(lock != NULL);

        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        lock_res_and_lock(lock);
        ldlm_set_cbpending(lock);
        unlock_res_and_lock(lock);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);

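/* Insertion points into the three skiplist-related lists (the resource
 * granted list, the same-mode list and the same-policy list), as computed
 * by search_granted_lock() and consumed by ldlm_granted_list_add_lock(). */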
struct sl_insert_point {
        struct list_head *res_link;
        struct list_head *mode_link;
        struct list_head *policy_link;
};

/**
 * Finds a position to insert the new lock into a granted lock list.
 *
 * Used for locks eligible for skiplist optimization.
 *
 * Parameters:
 *      queue [input]:  the granted list on which the search acts;
 *      req [input]:    the lock whose position is to be located;
 *      prev [output]:  positions within the 3 lists to insert \a req at
 * Return Value:
 *      filled \a prev
 * NOTE: called by
 *  - ldlm_grant_lock_with_skiplist
 */
static void search_granted_lock(struct list_head *queue,
                                struct ldlm_lock *req,
                                struct sl_insert_point *prev)
{
        struct list_head *tmp;
        struct ldlm_lock *lock, *mode_end, *policy_end;
        ENTRY;

        list_for_each(tmp, queue) {
                lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                mode_end = list_entry(lock->l_sl_mode.prev,
                                      struct ldlm_lock, l_sl_mode);

                if (lock->l_req_mode != req->l_req_mode) {
                        /* jump to last lock of mode group */
                        tmp = &mode_end->l_res_link;
                        continue;
                }

                /* suitable mode group is found */
                if (lock->l_resource->lr_type == LDLM_PLAIN) {
                        /* insert point is last lock of the mode group */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else if (lock->l_resource->lr_type == LDLM_IBITS) {
                        for (;;) {
                                policy_end =
                                        list_entry(lock->l_sl_policy.prev,
                                                   struct ldlm_lock,
                                                   l_sl_policy);

                                if (lock->l_policy_data.l_inodebits.bits ==
                                    req->l_policy_data.l_inodebits.bits) {
                                        /* insert point is last lock of
                                         * the policy group */
                                        prev->res_link =
                                                &policy_end->l_res_link;
                                        prev->mode_link =
                                                &policy_end->l_sl_mode;
                                        prev->policy_link =
                                                &policy_end->l_sl_policy;
                                        EXIT;
                                        return;
                                }

                                if (policy_end == mode_end)
                                        /* done with mode group */
                                        break;

                                /* go to next policy group within mode group */
                                tmp = policy_end->l_res_link.next;
                                lock = list_entry(tmp, struct ldlm_lock,
                                                  l_res_link);
                        }  /* loop over policy groups within the mode group */

                        /* insert point is last lock of the mode group,
                         * new policy group is started */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else {
                        LDLM_ERROR(lock, "is not LDLM_PLAIN or LDLM_IBITS lock");
                        LBUG();
                }
        }

        /* insert point is last lock on the queue,
         * new mode group and new policy group are started */
        prev->res_link = queue->prev;
        prev->mode_link = &req->l_sl_mode;
        prev->policy_link = &req->l_sl_policy;
        EXIT;
        return;
}

/**
 * Add a lock into resource granted list after a position described by
 * \a prev.
 */
static void ldlm_granted_list_add_lock(struct ldlm_lock *lock,
                                       struct sl_insert_point *prev)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        ldlm_resource_dump(D_INFO, res);
        LDLM_DEBUG(lock, "About to add lock:");

        if (ldlm_is_destroyed(lock)) {
                CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
                return;
        }

        LASSERT(list_empty(&lock->l_res_link));
        LASSERT(list_empty(&lock->l_sl_mode));
        LASSERT(list_empty(&lock->l_sl_policy));

        /*
         * lock->link == prev->link means the lock starts a new group.
         * Don't re-add the lock to itself, to suppress kernel warnings.
         */
        if (&lock->l_res_link != prev->res_link)
                list_add(&lock->l_res_link, prev->res_link);
        if (&lock->l_sl_mode != prev->mode_link)
                list_add(&lock->l_sl_mode, prev->mode_link);
        if (&lock->l_sl_policy != prev->policy_link)
                list_add(&lock->l_sl_policy, prev->policy_link);

        EXIT;
}

/**
 * Add a lock to granted list on a resource maintaining skiplist
 * correctness.
 */
static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
{
        struct sl_insert_point prev;
        ENTRY;

        LASSERT(lock->l_req_mode == lock->l_granted_mode);

        search_granted_lock(&lock->l_resource->lr_granted, lock, &prev);
        ldlm_granted_list_add_lock(lock, &prev);
        EXIT;
}

/**
 * Perform lock granting bookkeeping.
 *
 * Includes putting the lock into granted list and updating lock mode.
 * NOTE: called by
 *  - ldlm_lock_enqueue
 *  - ldlm_reprocess_queue
 *  - ldlm_lock_convert
 *
 * must be called with lr_lock held
 */
void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        lock->l_granted_mode = lock->l_req_mode;

        if (work_list && lock->l_completion_ast != NULL)
                ldlm_add_ast_work_item(lock, NULL, work_list);

        /* We should not add locks to granted list in the following cases:
         * - this is an UNLOCK but not a real lock;
         * - this is a TEST lock;
         * - this is a F_CANCELLK lock (async flock has req_mode == 0)
         * - this is a deadlock (flock cannot be granted) */
        if (lock->l_req_mode == 0 ||
            lock->l_req_mode == LCK_NL ||
            ldlm_is_test_lock(lock) ||
            ldlm_is_flock_deadlock(lock))
                RETURN_EXIT;

        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
                ldlm_grant_lock_with_skiplist(lock);
        else if (res->lr_type == LDLM_EXTENT)
                ldlm_extent_add_lock(res, lock);
        else
                ldlm_resource_add_lock(res, &res->lr_granted, lock);

        if (lock->l_granted_mode < res->lr_most_restr)
                res->lr_most_restr = lock->l_granted_mode;

        ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
        EXIT;
}

/**
 * Search for a lock with given properties in a queue.
 *
 * \retval a referenced lock or NULL.  See the flag descriptions below, in the
 * comment above ldlm_lock_match
 */
static struct ldlm_lock *search_queue(struct list_head *queue,
                                      ldlm_mode_t *mode,
                                      ldlm_policy_data_t *policy,
                                      struct ldlm_lock *old_lock,
                                      __u64 flags, int unref)
{
        struct ldlm_lock *lock;
        struct list_head *tmp;

        list_for_each(tmp, queue) {
                ldlm_mode_t match;

                lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lock == old_lock)
                        break;

                /* Check if this lock can be matched.
                 * Used by LU-2919 (exclusive open) for open lease lock */
                if (ldlm_is_excl(lock))
                        continue;

                /* llite sometimes wants to match locks that will be
                 * canceled when their users drop, but we allow it to match
                 * if it passes in CBPENDING and the lock still has users.
                 * This is generally only going to be used by children
                 * whose parents already hold a lock, so forward progress
                 * can still happen. */
                if (ldlm_is_cbpending(lock) &&
                    !(flags & LDLM_FL_CBPENDING))
                        continue;
                if (!unref && ldlm_is_cbpending(lock) &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        continue;

                if (!(lock->l_req_mode & *mode))
                        continue;
                match = lock->l_req_mode;

                if (lock->l_resource->lr_type == LDLM_EXTENT &&
                    (lock->l_policy_data.l_extent.start >
                     policy->l_extent.start ||
                     lock->l_policy_data.l_extent.end < policy->l_extent.end))
                        continue;

                if (unlikely(match == LCK_GROUP) &&
                    lock->l_resource->lr_type == LDLM_EXTENT &&
                    lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                        continue;

                /* We match if we have an existing lock with the same or a
                 * wider set of bits. */
                if (lock->l_resource->lr_type == LDLM_IBITS &&
                     ((lock->l_policy_data.l_inodebits.bits &
                      policy->l_inodebits.bits) !=
                      policy->l_inodebits.bits))
                        continue;

                if (!unref && LDLM_HAVE_MASK(lock, GONE))
                        continue;

                if ((flags & LDLM_FL_LOCAL_ONLY) &&
                    !ldlm_is_local(lock))
                        continue;

                if (flags & LDLM_FL_TEST_LOCK) {
                        LDLM_LOCK_GET(lock);
                        ldlm_lock_touch_in_lru(lock);
                } else {
                        ldlm_lock_addref_internal_nolock(lock, match);
                }
                *mode = match;
                return lock;
        }

        return NULL;
}

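/**
 * Mark lock as failed to match and wake up any waiters, such as threads
 * sleeping in ldlm_lock_match() waiting for the LVB to become ready.
 * Assumes the lock is already locked.
 */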
void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
{
        if ((lock->l_flags & LDLM_FL_FAIL_NOTIFIED) == 0) {
                lock->l_flags |= LDLM_FL_FAIL_NOTIFIED;
                wake_up_all(&lock->l_waitq);
        }
}
EXPORT_SYMBOL(ldlm_lock_fail_match_locked);

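/**
 * Mark lock as failed to match, taking the necessary locks first.
 * Locks the lock and then \see ldlm_lock_fail_match_locked
 */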
void ldlm_lock_fail_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_fail_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_fail_match);

/**
 * Mark lock as "matchable" by OST.
 *
 * Used to prevent certain races in LOV/OSC where the lock is granted, but LVB
 * is not yet valid.
 * Assumes LDLM lock is already locked.
 */
void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
{
        ldlm_set_lvb_ready(lock);
        wake_up_all(&lock->l_waitq);
}
EXPORT_SYMBOL(ldlm_lock_allow_match_locked);

/**
 * Mark lock as "matchable" by OST.
 * Locks the lock and then \see ldlm_lock_allow_match_locked
 */
void ldlm_lock_allow_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_allow_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_allow_match);

/**
 * Attempt to find a lock with specified properties.
 *
 * Typically returns a reference to matched lock unless LDLM_FL_TEST_LOCK is
 * set in \a flags
 *
 * Can be called in two ways:
 *
 * If 'ns' is NULL, then lockh describes an existing lock that we want to look
 * for a duplicate of.
 *
 * Otherwise, all of the fields must be filled in, to match against.
 *
 * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
 *     server (i.e., connh is NULL)
 * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
 *     list will be considered
 * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
 *     to be canceled can still be matched as long as they still have reader
 *     or writer references
 * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
 *     just tell us if we would have matched.
 *
 * \retval 1 if it finds an already-existing lock that is compatible; in this
 * case, lockh is filled in with an addref()ed lock
 *
 * We also check security context, and if that fails we simply return 0 (to
 * keep caller code unchanged), the context failure will be discovered by
 * caller sometime later.
 */
ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, __u64 flags,
                            const struct ldlm_res_id *res_id, ldlm_type_t type,
                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
                            struct lustre_handle *lockh, int unref)
{
        struct ldlm_resource *res;
        struct ldlm_lock *lock, *old_lock = NULL;
        int rc = 0;
        ENTRY;

        if (ns == NULL) {
                old_lock = ldlm_handle2lock(lockh);
                LASSERT(old_lock);

                ns = ldlm_lock_to_ns(old_lock);
                res_id = &old_lock->l_resource->lr_name;
                type = old_lock->l_resource->lr_type;
                mode = old_lock->l_req_mode;
        }

        res = ldlm_resource_get(ns, NULL, res_id, type, 0);
        if (IS_ERR(res)) {
                LASSERT(old_lock == NULL);
                RETURN(0);
        }

        LDLM_RESOURCE_ADDREF(res);
        lock_res(res);

        lock = search_queue(&res->lr_granted, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        if (flags & LDLM_FL_BLOCK_GRANTED)
                GOTO(out, rc = 0);
        lock = search_queue(&res->lr_converting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        lock = search_queue(&res->lr_waiting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);

        EXIT;
 out:
        unlock_res(res);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);

        if (lock) {
                ldlm_lock2handle(lock, lockh);
                if ((flags & LDLM_FL_LVB_READY) &&
                    (!ldlm_is_lvb_ready(lock))) {
                        __u64 wait_flags = LDLM_FL_LVB_READY |
                                LDLM_FL_DESTROYED | LDLM_FL_FAIL_NOTIFIED;
                        struct l_wait_info lwi;
                        if (lock->l_completion_ast) {
                                int err = lock->l_completion_ast(lock,
                                                          LDLM_FL_WAIT_NOREPROC,
                                                                 NULL);
                                if (err) {
                                        if (flags & LDLM_FL_TEST_LOCK)
                                                LDLM_LOCK_RELEASE(lock);
                                        else
                                                ldlm_lock_decref_internal(lock,
                                                                          mode);
                                        rc = 0;
                                        goto out2;
                                }
                        }

                        lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
                                               NULL, LWI_ON_SIGNAL_NOOP, NULL);

                        /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
                        l_wait_event(lock->l_waitq,
                                     lock->l_flags & wait_flags,
                                     &lwi);
                        if (!ldlm_is_lvb_ready(lock)) {
                                if (flags & LDLM_FL_TEST_LOCK)
                                        LDLM_LOCK_RELEASE(lock);
                                else
                                        ldlm_lock_decref_internal(lock, mode);
                                rc = 0;
                        }
                }
        }
 out2:
        if (rc) {
                LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[2] : policy->l_extent.start,
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[3] : policy->l_extent.end);

                /* check user's security context */
                if (lock->l_conn_export &&
                    sptlrpc_import_check_ctx(
                                class_exp2cliimp(lock->l_conn_export))) {
                        if (!(flags & LDLM_FL_TEST_LOCK))
                                ldlm_lock_decref_internal(lock, mode);
                        rc = 0;
                }

                if (flags & LDLM_FL_TEST_LOCK)
                        LDLM_LOCK_RELEASE(lock);

        } else if (!(flags & LDLM_FL_TEST_LOCK)) { /* less verbose for test-only */
                LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
                                  LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
                                  type, mode, res_id->name[0], res_id->name[1],
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[2] : policy->l_extent.start,
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[3] : policy->l_extent.end);
        }
        if (old_lock)
                LDLM_LOCK_PUT(old_lock);

        return rc ? mode : 0;
}
EXPORT_SYMBOL(ldlm_lock_match);
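
/*
 * Illustrative caller sketch (an assumed example, not from the original
 * code): look for a cached read or write extent lock covering the whole
 * object, and drop the reference when done.
 *
 *      ldlm_policy_data_t policy = {
 *              .l_extent = { .start = 0, .end = OBD_OBJECT_EOF }
 *      };
 *      struct lustre_handle lockh;
 *      ldlm_mode_t mode;
 *
 *      mode = ldlm_lock_match(ns, LDLM_FL_LVB_READY, &res_id, LDLM_EXTENT,
 *                             &policy, LCK_PR | LCK_PW, &lockh, 0);
 *      if (mode != 0) {
 *              ...use the cached lock...
 *              ldlm_lock_decref(&lockh, mode);
 *      }
 */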
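/**
 * Checks whether the lock referenced by \a lockh is still granted and
 * usable; if so, takes a reference for the granted mode and optionally
 * returns the inodebits in \a bits.
 *
 * \retval the granted mode on success
 * \retval 0 if the lock is going away (destroyed, or cancellation pending
 *           with no remaining users)
 */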
ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh,
                                        __u64 *bits)
{
        struct ldlm_lock *lock;
        ldlm_mode_t mode = 0;
        ENTRY;

        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                if (LDLM_HAVE_MASK(lock, GONE))
                        GOTO(out, mode);

                if (ldlm_is_cbpending(lock) &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        GOTO(out, mode);

                if (bits)
                        *bits = lock->l_policy_data.l_inodebits.bits;
                mode = lock->l_granted_mode;
                ldlm_lock_addref_internal_nolock(lock, mode);
        }

        EXIT;

out:
        if (lock != NULL) {
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return mode;
}
EXPORT_SYMBOL(ldlm_revalidate_lock_handle);

1448 /** The caller must guarantee that the buffer is large enough. */
1449 int ldlm_fill_lvb(struct ldlm_lock *lock, struct req_capsule *pill,
1450                   enum req_location loc, void *data, int size)
1451 {
1452         void *lvb;
1453         ENTRY;
1454
1455         LASSERT(data != NULL);
1456         LASSERT(size >= 0);
1457
1458         switch (lock->l_lvb_type) {
1459         case LVB_T_OST:
1460                 if (size == sizeof(struct ost_lvb)) {
1461                         if (loc == RCL_CLIENT)
1462                                 lvb = req_capsule_client_swab_get(pill,
1463                                                 &RMF_DLM_LVB,
1464                                                 lustre_swab_ost_lvb);
1465                         else
1466                                 lvb = req_capsule_server_swab_get(pill,
1467                                                 &RMF_DLM_LVB,
1468                                                 lustre_swab_ost_lvb);
1469                         if (unlikely(lvb == NULL)) {
1470                                 LDLM_ERROR(lock, "no LVB");
1471                                 RETURN(-EPROTO);
1472                         }
1473
1474                         memcpy(data, lvb, size);
1475                 } else if (size == sizeof(struct ost_lvb_v1)) {
1476                         struct ost_lvb *olvb = data;
1477
1478                         if (loc == RCL_CLIENT)
1479                                 lvb = req_capsule_client_swab_get(pill,
1480                                                 &RMF_DLM_LVB,
1481                                                 lustre_swab_ost_lvb_v1);
1482                         else
1483                                 lvb = req_capsule_server_sized_swab_get(pill,
1484                                                 &RMF_DLM_LVB, size,
1485                                                 lustre_swab_ost_lvb_v1);
1486                         if (unlikely(lvb == NULL)) {
1487                                 LDLM_ERROR(lock, "no LVB");
1488                                 RETURN(-EPROTO);
1489                         }
1490
1491                         memcpy(data, lvb, size);
1492                         olvb->lvb_mtime_ns = 0;
1493                         olvb->lvb_atime_ns = 0;
1494                         olvb->lvb_ctime_ns = 0;
1495                 } else {
1496                         LDLM_ERROR(lock, "Replied unexpected ost LVB size %d",
1497                                    size);
1498                         RETURN(-EINVAL);
1499                 }
1500                 break;
1501         case LVB_T_LQUOTA:
1502                 if (size == sizeof(struct lquota_lvb)) {
1503                         if (loc == RCL_CLIENT)
1504                                 lvb = req_capsule_client_swab_get(pill,
1505                                                 &RMF_DLM_LVB,
1506                                                 lustre_swab_lquota_lvb);
1507                         else
1508                                 lvb = req_capsule_server_swab_get(pill,
1509                                                 &RMF_DLM_LVB,
1510                                                 lustre_swab_lquota_lvb);
1511                         if (unlikely(lvb == NULL)) {
1512                                 LDLM_ERROR(lock, "no LVB");
1513                                 RETURN(-EPROTO);
1514                         }
1515
1516                         memcpy(data, lvb, size);
1517                 } else {
1518                         LDLM_ERROR(lock, "Replied unexpected lquota LVB size %d",
1519                                    size);
1520                         RETURN(-EINVAL);
1521                 }
1522                 break;
1523         case LVB_T_LAYOUT:
1524                 if (size == 0)
1525                         break;
1526
1527                 if (loc == RCL_CLIENT)
1528                         lvb = req_capsule_client_get(pill, &RMF_DLM_LVB);
1529                 else
1530                         lvb = req_capsule_server_get(pill, &RMF_DLM_LVB);
1531                 if (unlikely(lvb == NULL)) {
1532                         LDLM_ERROR(lock, "no LVB");
1533                         RETURN(-EPROTO);
1534                 }
1535
1536                 memcpy(data, lvb, size);
1537                 break;
1538         default:
1539                 LDLM_ERROR(lock, "Unknown LVB type: %d\n", lock->l_lvb_type);
1540                 libcfs_debug_dumpstack(NULL);
1541                 RETURN(-EINVAL);
1542         }
1543
1544         RETURN(0);
1545 }
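
/*
 * Illustrative sketch, not part of the original source: a client-side
 * caller unpacking the OST LVB a server replied with.  It assumes the
 * request's capsule lives in req->rq_pill as usual;
 * example_unpack_ost_lvb() is hypothetical.
 */
#if 0
static int example_unpack_ost_lvb(struct ldlm_lock *lock,
                                  struct ptlrpc_request *req)
{
        struct ost_lvb lvb;

        /* unpacks and, if needed, swabs the LVB from the reply buffer */
        return ldlm_fill_lvb(lock, &req->rq_pill, RCL_CLIENT,
                             &lvb, sizeof(lvb));
}
#endif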
1546
1547 /**
1548  * Create and fill in a new LDLM lock with the specified properties.
1549  * Returns a referenced lock.
1550  */
1551 struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
1552                                    const struct ldlm_res_id *res_id,
1553                                    ldlm_type_t type,
1554                                    ldlm_mode_t mode,
1555                                    const struct ldlm_callback_suite *cbs,
1556                                    void *data, __u32 lvb_len,
1557                                    enum lvb_type lvb_type)
1558 {
1559         struct ldlm_lock        *lock;
1560         struct ldlm_resource    *res;
1561         int                     rc;
1562         ENTRY;
1563
1564         res = ldlm_resource_get(ns, NULL, res_id, type, 1);
1565         if (IS_ERR(res))
1566                 RETURN(ERR_CAST(res));
1567
1568         lock = ldlm_lock_new(res);
1569         if (lock == NULL)
1570                 RETURN(ERR_PTR(-ENOMEM));
1571
1572         lock->l_req_mode = mode;
1573         lock->l_ast_data = data;
1574         lock->l_pid = current_pid();
1575         if (ns_is_server(ns))
1576                 ldlm_set_ns_srv(lock);
1577         if (cbs) {
1578                 lock->l_blocking_ast = cbs->lcs_blocking;
1579                 lock->l_completion_ast = cbs->lcs_completion;
1580                 lock->l_glimpse_ast = cbs->lcs_glimpse;
1581         }
1582
1583         lock->l_tree_node = NULL;
1584         /* if this is an extent lock, allocate the interval tree node */
1585         if (type == LDLM_EXTENT)
1586                 if (ldlm_interval_alloc(lock) == NULL)
1587                         GOTO(out, rc = -ENOMEM);
1588
1589         if (lvb_len) {
1590                 lock->l_lvb_len = lvb_len;
1591                 OBD_ALLOC_LARGE(lock->l_lvb_data, lvb_len);
1592                 if (lock->l_lvb_data == NULL)
1593                         GOTO(out, rc = -ENOMEM);
1594         }
1595
1596         lock->l_lvb_type = lvb_type;
1597         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK))
1598                 GOTO(out, rc = -ENOENT);
1599
1600         RETURN(lock);
1601
1602 out:
1603         ldlm_lock_destroy(lock);
1604         LDLM_LOCK_RELEASE(lock);
1605         RETURN(ERR_PTR(rc));
1606 }
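
/*
 * Illustrative sketch, not part of the original source: creating a plain
 * EX lock with caller-supplied ASTs.  example_create_plain() and the two
 * callback functions named in the suite are hypothetical.
 */
#if 0
static struct ldlm_lock *example_create_plain(struct ldlm_namespace *ns,
                                              const struct ldlm_res_id *res_id)
{
        struct ldlm_callback_suite cbs = {
                .lcs_blocking   = example_blocking_ast,
                .lcs_completion = example_completion_ast,
                .lcs_glimpse    = NULL,
        };

        /* no AST data, no LVB */
        return ldlm_lock_create(ns, res_id, LDLM_PLAIN, LCK_EX,
                                &cbs, NULL, 0, LVB_T_NONE);
}
#endif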
1607
1608 /**
1609  * Enqueue (request) a lock.
1610  *
1611  * Does not block. As a result of the enqueue the lock is put on the
1612  * granted or the waiting list.
1613  *
1614  * If the namespace has an intent policy set and the lock has the
1615  * LDLM_FL_HAS_INTENT flag set, skip the enqueueing and delegate lock
1616  * processing to the intent policy function.
1617  */
1618 ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
1619                                struct ldlm_lock **lockp,
1620                                void *cookie, __u64 *flags)
1621 {
1622         struct ldlm_lock *lock = *lockp;
1623         struct ldlm_resource *res = lock->l_resource;
1624         int local = ns_is_client(ldlm_res_to_ns(res));
1625 #ifdef HAVE_SERVER_SUPPORT
1626         ldlm_processing_policy policy;
1627 #endif
1628         ldlm_error_t rc = ELDLM_OK;
1629         struct ldlm_interval *node = NULL;
1630         ENTRY;
1631
1632         lock->l_last_activity = cfs_time_current_sec();
1633         /* policies are not executed on the client or during replay */
1634         if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
1635             && !local && ns->ns_policy) {
1636                 rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
1637                                    NULL);
1638                 if (rc == ELDLM_LOCK_REPLACED) {
1639                         /* The lock that was returned has already been granted,
1640                          * and placed into lockp.  If it's not the same as the
1641                          * one we passed in, then destroy the old one and our
1642                          * work here is done. */
1643                         if (lock != *lockp) {
1644                                 ldlm_lock_destroy(lock);
1645                                 LDLM_LOCK_RELEASE(lock);
1646                         }
1647                         *flags |= LDLM_FL_LOCK_CHANGED;
1648                         RETURN(0);
1649                 } else if (rc != ELDLM_OK ||
1650                            (*flags & LDLM_FL_INTENT_ONLY)) {
1651                         ldlm_lock_destroy(lock);
1652                         RETURN(rc);
1653                 }
1654         }
1655
1656         if (*flags & LDLM_FL_RESENT) {
1657                 /* Reconstruct LDLM_FL_SRV_ENQ_MASK @flags for reply.
1658                  * Set LOCK_CHANGED always.
1659                  * Check if the lock is granted for BLOCK_GRANTED.
1660                  * Take NO_TIMEOUT from the lock as it is inherited through
1661                  * LDLM_FL_INHERIT_MASK */
1662                 *flags |= LDLM_FL_LOCK_CHANGED;
1663                 if (lock->l_req_mode != lock->l_granted_mode)
1664                         *flags |= LDLM_FL_BLOCK_GRANTED;
1665                 *flags |= lock->l_flags & LDLM_FL_NO_TIMEOUT;
1666                 RETURN(ELDLM_OK);
1667         }
1668
1669         /* A replaying lock may already be in the granted list, so
1670          * unlinking it would free its interval node.  Allocate the interval
1671          * node early, otherwise this lock cannot be regranted in the
1672          * future. - jay */
1673         if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
1674                 OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
1675
1676         lock_res_and_lock(lock);
1677         if (local && lock->l_req_mode == lock->l_granted_mode) {
1678                 /* The server returned a blocked lock, but it was granted
1679                  * before we got a chance to actually enqueue it.  We don't
1680                  * need to do anything else. */
1681                 *flags &= ~LDLM_FL_BLOCKED_MASK;
1682                 GOTO(out, rc = ELDLM_OK);
1683         }
1684
1685         ldlm_resource_unlink_lock(lock);
1686         if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
1687                 if (node == NULL) {
1688                         ldlm_lock_destroy_nolock(lock);
1689                         GOTO(out, rc = -ENOMEM);
1690                 }
1691
1692                 INIT_LIST_HEAD(&node->li_group);
1693                 ldlm_interval_attach(node, lock);
1694                 node = NULL;
1695         }
1696
1697         /* Some flags from the enqueue want to make it into the AST, via the
1698          * lock's l_flags. */
1699         if (*flags & LDLM_FL_AST_DISCARD_DATA)
1700                 ldlm_set_ast_discard_data(lock);
1701         if (*flags & LDLM_FL_TEST_LOCK)
1702                 ldlm_set_test_lock(lock);
1703
1704         /* This distinction between local lock trees is very important; a client
1705          * namespace only has information about locks taken by that client, and
1706          * thus doesn't have enough information to decide for itself if it can
1707          * be granted (below).  In this case, we do exactly what the server
1708          * tells us to do, as dictated by the 'flags'.
1709          *
1710          * We do exactly the same thing during recovery, when the server is
1711          * more or less trusting the clients not to lie.
1712          *
1713          * FIXME (bug 268): Detect obvious lies by checking compatibility in
1714          * granted/converting queues. */
1715         if (local) {
1716                 if (*flags & LDLM_FL_BLOCK_CONV)
1717                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1718                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
1719                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1720                 else
1721                         ldlm_grant_lock(lock, NULL);
1722                 GOTO(out, rc = ELDLM_OK);
1723 #ifdef HAVE_SERVER_SUPPORT
1724         } else if (*flags & LDLM_FL_REPLAY) {
1725                 if (*flags & LDLM_FL_BLOCK_CONV) {
1726                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1727                         GOTO(out, rc = ELDLM_OK);
1728                 } else if (*flags & LDLM_FL_BLOCK_WAIT) {
1729                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1730                         GOTO(out, rc = ELDLM_OK);
1731                 } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
1732                         ldlm_grant_lock(lock, NULL);
1733                         GOTO(out, rc = ELDLM_OK);
1734                 }
1735                 /* If no flags, fall through to normal enqueue path. */
1736         }
1737
1738         policy = ldlm_processing_policy_table[res->lr_type];
1739         policy(lock, flags, 1, &rc, NULL);
1740         GOTO(out, rc);
1741 #else
1742         } else {
1743                 CERROR("This is client-side-only module, cannot handle "
1744                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
1745                 LBUG();
1746         }
1747 #endif
1748
1749 out:
1750         unlock_res_and_lock(lock);
1751         if (node)
1752                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
1753         return rc;
1754 }
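
/*
 * Illustrative sketch, not part of the original source: the minimal
 * create-then-enqueue sequence.  Reference management around the enqueue
 * is elided and the error mapping is caller policy;
 * example_create_and_enqueue() is hypothetical.
 */
#if 0
static int example_create_and_enqueue(struct ldlm_namespace *ns,
                                      const struct ldlm_res_id *res_id)
{
        struct ldlm_lock *lock;
        __u64 flags = 0;
        ldlm_error_t err;

        lock = ldlm_lock_create(ns, res_id, LDLM_PLAIN, LCK_EX,
                                NULL, NULL, 0, LVB_T_NONE);
        if (IS_ERR(lock))
                return PTR_ERR(lock);

        err = ldlm_lock_enqueue(ns, &lock, NULL, &flags);
        /* on success the lock now sits on the granted or waiting list
         * and *flags says which of the two happened */
        return err == ELDLM_OK ? 0 : -EAGAIN;
}
#endif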
1755
1756 #ifdef HAVE_SERVER_SUPPORT
1757 /**
1758  * Iterate through all waiting locks on a given resource queue and attempt to
1759  * grant them.
1760  *
1761  * Must be called with resource lock held.
1762  */
1763 int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
1764                          struct list_head *work_list)
1765 {
1766         struct list_head *tmp, *pos;
1767         ldlm_processing_policy policy;
1768         __u64 flags;
1769         int rc = LDLM_ITER_CONTINUE;
1770         ldlm_error_t err;
1771         ENTRY;
1772
1773         check_res_locked(res);
1774
1775         policy = ldlm_processing_policy_table[res->lr_type];
1776         LASSERT(policy);
1777
1778         list_for_each_safe(tmp, pos, queue) {
1779                 struct ldlm_lock *pending;
1780                 pending = list_entry(tmp, struct ldlm_lock, l_res_link);
1781
1782                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
1783
1784                 flags = 0;
1785                 rc = policy(pending, &flags, 0, &err, work_list);
1786                 if (rc != LDLM_ITER_CONTINUE)
1787                         break;
1788         }
1789
1790         RETURN(rc);
1791 }
1792 #endif
1793
1794 /**
1795  * Process a call to blocking AST callback for a lock in ast_work list
1796  */
1797 static int
1798 ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1799 {
1800         struct ldlm_cb_set_arg *arg = opaq;
1801         struct ldlm_lock_desc   d;
1802         int                     rc;
1803         struct ldlm_lock       *lock;
1804         ENTRY;
1805
1806         if (list_empty(arg->list))
1807                 RETURN(-ENOENT);
1808
1809         lock = list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);
1810
1811         /* nobody should touch l_bl_ast */
1812         lock_res_and_lock(lock);
1813         list_del_init(&lock->l_bl_ast);
1814
1815         LASSERT(ldlm_is_ast_sent(lock));
1816         LASSERT(lock->l_bl_ast_run == 0);
1817         LASSERT(lock->l_blocking_lock);
1818         lock->l_bl_ast_run++;
1819         unlock_res_and_lock(lock);
1820
1821         ldlm_lock2desc(lock->l_blocking_lock, &d);
1822
1823         rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
1824         LDLM_LOCK_RELEASE(lock->l_blocking_lock);
1825         lock->l_blocking_lock = NULL;
1826         LDLM_LOCK_RELEASE(lock);
1827
1828         RETURN(rc);
1829 }
1830
1831 /**
1832  * Process a call to completion AST callback for a lock in ast_work list
1833  */
1834 static int
1835 ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1836 {
1837         struct ldlm_cb_set_arg  *arg = opaq;
1838         int                      rc = 0;
1839         struct ldlm_lock        *lock;
1840         ldlm_completion_callback completion_callback;
1841         ENTRY;
1842
1843         if (list_empty(arg->list))
1844                 RETURN(-ENOENT);
1845
1846         lock = list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
1847
1848         /* It's possible to receive a completion AST before we've set
1849          * the l_completion_ast pointer: either because the AST arrived
1850          * before the reply, or simply because there's a small race
1851          * window between receiving the reply and finishing the local
1852          * enqueue. (bug 842)
1853          *
1854          * This can't happen with the blocking_ast, however, because we
1855          * will never call the local blocking_ast until we drop our
1856          * reader/writer reference, which we won't do until we get the
1857          * reply and finish enqueueing. */
1858
1859         /* nobody should touch l_cp_ast */
1860         lock_res_and_lock(lock);
1861         list_del_init(&lock->l_cp_ast);
1862         LASSERT(ldlm_is_cp_reqd(lock));
1863         /* save l_completion_ast since it can be changed by
1864          * mds_intent_policy(), see bug 14225 */
1865         completion_callback = lock->l_completion_ast;
1866         ldlm_clear_cp_reqd(lock);
1867         unlock_res_and_lock(lock);
1868
1869         if (completion_callback != NULL)
1870                 rc = completion_callback(lock, 0, (void *)arg);
1871         LDLM_LOCK_RELEASE(lock);
1872
1873         RETURN(rc);
1874 }
1875
1876 /**
1877  * Process a call to revocation AST callback for a lock in ast_work list
1878  */
1879 static int
1880 ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1881 {
1882         struct ldlm_cb_set_arg *arg = opaq;
1883         struct ldlm_lock_desc   desc;
1884         int                     rc;
1885         struct ldlm_lock       *lock;
1886         ENTRY;
1887
1888         if (list_empty(arg->list))
1889                 RETURN(-ENOENT);
1890
1891         lock = list_entry(arg->list->next, struct ldlm_lock, l_rk_ast);
1892         list_del_init(&lock->l_rk_ast);
1893
1894         /* the desc is faked to pretend the lock is exclusive */
1895         ldlm_lock2desc(lock, &desc);
1896         desc.l_req_mode = LCK_EX;
1897         desc.l_granted_mode = 0;
1898
1899         rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
1900         LDLM_LOCK_RELEASE(lock);
1901
1902         RETURN(rc);
1903 }
1904
1905 /**
1906  * Process a call to glimpse AST callback for a lock in ast_work list
1907  */
1908 int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1909 {
1910         struct ldlm_cb_set_arg          *arg = opaq;
1911         struct ldlm_glimpse_work        *gl_work;
1912         struct ldlm_lock                *lock;
1913         int                              rc = 0;
1914         ENTRY;
1915
1916         if (list_empty(arg->list))
1917                 RETURN(-ENOENT);
1918
1919         gl_work = list_entry(arg->list->next, struct ldlm_glimpse_work,
1920                                  gl_list);
1921         list_del_init(&gl_work->gl_list);
1922
1923         lock = gl_work->gl_lock;
1924
1925         /* transfer the glimpse descriptor to ldlm_cb_set_arg */
1926         arg->gl_desc = gl_work->gl_desc;
1927
1928         /* invoke the actual glimpse callback */
1929         if (lock->l_glimpse_ast(lock, (void*)arg) == 0)
1930                 rc = 1;
1931
1932         LDLM_LOCK_RELEASE(lock);
1933
1934         if ((gl_work->gl_flags & LDLM_GL_WORK_NOFREE) == 0)
1935                 OBD_FREE_PTR(gl_work);
1936
1937         RETURN(rc);
1938 }
1939
1940 /**
1941  * Process list of locks in need of ASTs being sent.
1942  *
1943  * Used on the server to send multiple ASTs together instead of sending
1944  * them one by one.
1945  */
1946 int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
1947                       ldlm_desc_ast_t ast_type)
1948 {
1949         struct ldlm_cb_set_arg *arg;
1950         set_producer_func       work_ast_lock;
1951         int                     rc;
1952
1953         if (list_empty(rpc_list))
1954                 RETURN(0);
1955
1956         OBD_ALLOC_PTR(arg);
1957         if (arg == NULL)
1958                 RETURN(-ENOMEM);
1959
1960         atomic_set(&arg->restart, 0);
1961         arg->list = rpc_list;
1962
1963         switch (ast_type) {
1964         case LDLM_WORK_BL_AST:
1965                 arg->type = LDLM_BL_CALLBACK;
1966                 work_ast_lock = ldlm_work_bl_ast_lock;
1967                 break;
1968         case LDLM_WORK_CP_AST:
1969                 arg->type = LDLM_CP_CALLBACK;
1970                 work_ast_lock = ldlm_work_cp_ast_lock;
1971                 break;
1972         case LDLM_WORK_REVOKE_AST:
1973                 arg->type = LDLM_BL_CALLBACK;
1974                 work_ast_lock = ldlm_work_revoke_ast_lock;
1975                 break;
1976         case LDLM_WORK_GL_AST:
1977                 arg->type = LDLM_GL_CALLBACK;
1978                 work_ast_lock = ldlm_work_gl_ast_lock;
1979                 break;
1980         default:
1981                 LBUG();
1982         }
1983
1984         /* We create a ptlrpc request set with flow control extension.
1985          * This request set will use the work_ast_lock function to produce new
1986          * requests and will send a new request each time one completes, in
1987          * order to cap the number of requests in flight at ns_max_parallel_ast */
1988         arg->set = ptlrpc_prep_fcset(ns->ns_max_parallel_ast ? : UINT_MAX,
1989                                      work_ast_lock, arg);
1990         if (arg->set == NULL)
1991                 GOTO(out, rc = -ENOMEM);
1992
1993         ptlrpc_set_wait(arg->set);
1994         ptlrpc_set_destroy(arg->set);
1995
1996         rc = atomic_read(&arg->restart) ? -ERESTART : 0;
1997         GOTO(out, rc);
1998 out:
1999         OBD_FREE_PTR(arg);
2000         return rc;
2001 }
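
/*
 * Illustrative sketch, not part of the original source: the calling
 * pattern for draining a list of blocking ASTs.  'rpc_list' and the
 * 'restart' label are assumed to exist in the caller, as in
 * ldlm_reprocess_all() below.
 */
#if 0
        rc = ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_BL_AST);
        if (rc == -ERESTART)
                goto restart;   /* redo the pass that produced the list */
#endif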
2002
2003 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
2004 {
2005         ldlm_reprocess_all(res);
2006         return LDLM_ITER_CONTINUE;
2007 }
2008
2009 static int ldlm_reprocess_res(cfs_hash_t *hs, cfs_hash_bd_t *bd,
2010                               struct hlist_node *hnode, void *arg)
2011 {
2012         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2013         int    rc;
2014
2015         rc = reprocess_one_queue(res, arg);
2016
2017         return rc == LDLM_ITER_STOP;
2018 }
2019
2020 /**
2021  * Iterate through all resources on a namespace attempting to grant waiting
2022  * locks.
2023  */
2024 void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
2025 {
2026         ENTRY;
2027
2028         if (ns != NULL) {
2029                 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2030                                          ldlm_reprocess_res, NULL);
2031         }
2032         EXIT;
2033 }
2034 EXPORT_SYMBOL(ldlm_reprocess_all_ns);
2035
2036 /**
2037  * Try to grant all waiting locks on a resource.
2038  *
2039  * Calls ldlm_reprocess_queue on converting and waiting queues.
2040  *
2041  * Typically called after some resource locks are cancelled to see
2042  * if anything could be granted as a result of the cancellation.
2043  */
2044 void ldlm_reprocess_all(struct ldlm_resource *res)
2045 {
2046         struct list_head rpc_list;
2047 #ifdef HAVE_SERVER_SUPPORT
2048         int rc;
2049         ENTRY;
2050
2051         INIT_LIST_HEAD(&rpc_list);
2052         /* Local lock trees don't get reprocessed. */
2053         if (ns_is_client(ldlm_res_to_ns(res))) {
2054                 EXIT;
2055                 return;
2056         }
2057
2058 restart:
2059         lock_res(res);
2060         rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
2061         if (rc == LDLM_ITER_CONTINUE)
2062                 ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
2063         unlock_res(res);
2064
2065         rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
2066                                LDLM_WORK_CP_AST);
2067         if (rc == -ERESTART) {
2068                 LASSERT(list_empty(&rpc_list));
2069                 goto restart;
2070         }
2071 #else
2072         ENTRY;
2073
2074         INIT_LIST_HEAD(&rpc_list);
2075         if (!ns_is_client(ldlm_res_to_ns(res))) {
2076                 CERROR("This is client-side-only module, cannot handle "
2077                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
2078                 LBUG();
2079         }
2080 #endif
2081         EXIT;
2082 }
2083 EXPORT_SYMBOL(ldlm_reprocess_all);
2084
2085 /**
2086  * Helper function to call blocking AST for LDLM lock \a lock in a
2087  * "cancelling" mode.
2088  */
2089 void ldlm_cancel_callback(struct ldlm_lock *lock)
2090 {
2091         check_res_locked(lock->l_resource);
2092         if (!ldlm_is_cancel(lock)) {
2093                 ldlm_set_cancel(lock);
2094                 if (lock->l_blocking_ast) {
2095                         unlock_res_and_lock(lock);
2096                         lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
2097                                              LDLM_CB_CANCELING);
2098                         lock_res_and_lock(lock);
2099                 } else {
2100                         LDLM_DEBUG(lock, "no blocking ast");
2101                 }
2102         }
2103         ldlm_set_bl_done(lock);
2104 }
2105
2106 /**
2107  * Remove skiplist-enabled LDLM lock \a req from the granted list
2108  */
2109 void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
2110 {
2111         if (req->l_resource->lr_type != LDLM_PLAIN &&
2112             req->l_resource->lr_type != LDLM_IBITS)
2113                 return;
2114
2115         list_del_init(&req->l_sl_policy);
2116         list_del_init(&req->l_sl_mode);
2117 }
2118
2119 /**
2120  * Attempts to cancel LDLM lock \a lock that has no reader/writer references.
2121  */
2122 void ldlm_lock_cancel(struct ldlm_lock *lock)
2123 {
2124         struct ldlm_resource *res;
2125         struct ldlm_namespace *ns;
2126         ENTRY;
2127
2128         lock_res_and_lock(lock);
2129
2130         res = lock->l_resource;
2131         ns  = ldlm_res_to_ns(res);
2132
2133         /* Please do not, no matter how tempting, remove this LBUG without
2134          * talking to me first. -phik */
2135         if (lock->l_readers || lock->l_writers) {
2136                 LDLM_ERROR(lock, "lock still has references");
2137                 LBUG();
2138         }
2139
2140         if (ldlm_is_waited(lock))
2141                 ldlm_del_waiting_lock(lock);
2142
2143         /* Run the cancellation callback (blocking AST in cancel mode). */
2144         ldlm_cancel_callback(lock);
2145
2146         /* Yes, second time, just in case it was added again while we were
2147          * running with no res lock in ldlm_cancel_callback */
2148         if (ldlm_is_waited(lock))
2149                 ldlm_del_waiting_lock(lock);
2150
2151         ldlm_resource_unlink_lock(lock);
2152         ldlm_lock_destroy_nolock(lock);
2153
2154         if (lock->l_granted_mode == lock->l_req_mode)
2155                 ldlm_pool_del(&ns->ns_pool, lock);
2156
2157         /* Make sure we will not be called again for the same lock: this is
2158          * possible if lock->l_granted_mode is not zeroed out */
2159         lock->l_granted_mode = LCK_MINMODE;
2160         unlock_res_and_lock(lock);
2161
2162         EXIT;
2163 }
2164 EXPORT_SYMBOL(ldlm_lock_cancel);
2165
2166 /**
2167  * Set opaque data into the lock that only makes sense to upper layer.
2168  */
2169 int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
2170 {
2171         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2172         int rc = -EINVAL;
2173         ENTRY;
2174
2175         if (lock) {
2176                 if (lock->l_ast_data == NULL)
2177                         lock->l_ast_data = data;
2178                 if (lock->l_ast_data == data)
2179                         rc = 0;
2180                 LDLM_LOCK_PUT(lock);
2181         }
2182         RETURN(rc);
2183 }
2184 EXPORT_SYMBOL(ldlm_lock_set_data);
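
/*
 * Illustrative sketch, not part of the original source: because of the
 * set-once semantics above, a second call with a different pointer fails,
 * which exposes a concurrent setter.  'lockh' and 'inode' are hypothetical.
 */
#if 0
        if (ldlm_lock_set_data(&lockh, inode) != 0)
                CDEBUG(D_DLMTRACE, "lock already carries other ast_data\n");
#endif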
2185
2186 struct export_cl_data {
2187         struct obd_export       *ecl_exp;
2188         int                     ecl_loop;
2189 };
2190
2191 /**
2192  * Iterator function for ldlm_cancel_locks_for_export.
2193  * Cancels passed locks.
2194  */
2195 int ldlm_cancel_locks_for_export_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
2196                                     struct hlist_node *hnode, void *data)
2198 {
2199         struct export_cl_data   *ecl = (struct export_cl_data *)data;
2200         struct obd_export       *exp  = ecl->ecl_exp;
2201         struct ldlm_lock     *lock = cfs_hash_object(hs, hnode);
2202         struct ldlm_resource *res;
2203
2204         res = ldlm_resource_getref(lock->l_resource);
2205         LDLM_LOCK_GET(lock);
2206
2207         LDLM_DEBUG(lock, "export %p", exp);
2208         ldlm_res_lvbo_update(res, NULL, 1);
2209         ldlm_lock_cancel(lock);
2210         ldlm_reprocess_all(res);
2211         ldlm_resource_putref(res);
2212         LDLM_LOCK_RELEASE(lock);
2213
2214         ecl->ecl_loop++;
2215         /* log progress only on power-of-two iterations of the loop */
2216         if ((ecl->ecl_loop & -ecl->ecl_loop) == ecl->ecl_loop)
2217                 CDEBUG(D_INFO,
2218                        "Cancel lock %p for export %p (loop %d), still have "
2219                        "%d locks left on hash table.\n",
2220                        lock, exp, ecl->ecl_loop,
2221                        atomic_read(&hs->hs_count));
2222
2223         return 0;
2224 }
2225
2226 /**
2227  * Cancel all locks for given export.
2228  *
2229  * Typically called on client disconnection/eviction
2230  */
2231 void ldlm_cancel_locks_for_export(struct obd_export *exp)
2232 {
2233         struct export_cl_data   ecl = {
2234                 .ecl_exp        = exp,
2235                 .ecl_loop       = 0,
2236         };
2237
2238         cfs_hash_for_each_empty(exp->exp_lock_hash,
2239                                 ldlm_cancel_locks_for_export_cb, &ecl);
2240 }
2241
2242 /**
2243  * Downgrade an exclusive lock.
2244  *
2245  * A fast variant of ldlm_lock_convert() for the conversion of exclusive
2246  * locks. The conversion is always successful.
2247  * Used by Commit on Sharing (COS) code.
2248  *
2249  * \param lock A lock to convert
2250  * \param new_mode new lock mode
2251  */
2252 void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode)
2253 {
2254         ENTRY;
2255
2256         LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
2257         LASSERT(new_mode == LCK_COS);
2258
2259         lock_res_and_lock(lock);
2260         ldlm_resource_unlink_lock(lock);
2261         /*
2262          * Remove the lock from pool as it will be added again in
2263          * ldlm_grant_lock() called below.
2264          */
2265         ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
2266
2267         lock->l_req_mode = new_mode;
2268         ldlm_grant_lock(lock, NULL);
2269         unlock_res_and_lock(lock);
2270         ldlm_reprocess_all(lock->l_resource);
2271
2272         EXIT;
2273 }
2274 EXPORT_SYMBOL(ldlm_lock_downgrade);
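
/*
 * Illustrative sketch, not part of the original source: a Commit on
 * Sharing caller keeping its lock but giving up exclusivity instead of
 * cancelling outright.
 */
#if 0
        if (lock->l_granted_mode & (LCK_PW | LCK_EX))
                ldlm_lock_downgrade(lock, LCK_COS);
#endif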
2275
2276 /**
2277  * Attempt to convert an already granted lock to a different mode.
2278  *
2279  * While lock conversion is not currently used, future client-side
2280  * optimizations could take advantage of it to avoid discarding cached
2281  * pages on a file.
2282  */
2283 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
2284                                         __u32 *flags)
2285 {
2286         struct list_head rpc_list;
2287         struct ldlm_resource *res;
2288         struct ldlm_namespace *ns;
2289         int granted = 0;
2290 #ifdef HAVE_SERVER_SUPPORT
2291         int old_mode;
2292         struct sl_insert_point prev;
2293 #endif
2294         struct ldlm_interval *node;
2295         ENTRY;
2296
2297         INIT_LIST_HEAD(&rpc_list);
2298         /* Just return if mode is unchanged. */
2299         if (new_mode == lock->l_granted_mode) {
2300                 *flags |= LDLM_FL_BLOCK_GRANTED;
2301                 RETURN(lock->l_resource);
2302         }
2303
2304         /* We cannot check the lock type here because the lock's bitlock is
2305          * not held, so do the allocation blindly. -jay */
2306         OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
2307         if (node == NULL)  /* Actually, this causes EDEADLOCK to be returned */
2308                 RETURN(NULL);
2309
2310         LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR),
2311                  "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);
2312
2313         lock_res_and_lock(lock);
2314
2315         res = lock->l_resource;
2316         ns  = ldlm_res_to_ns(res);
2317
2318 #ifdef HAVE_SERVER_SUPPORT
2319         old_mode = lock->l_req_mode;
2320 #endif
2321         lock->l_req_mode = new_mode;
2322         if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
2323 #ifdef HAVE_SERVER_SUPPORT
2324                 /* remember the lock position where the lock might be
2325                  * added back to the granted list later and also
2326                  * remember the join mode for skiplist fixing. */
2327                 prev.res_link = lock->l_res_link.prev;
2328                 prev.mode_link = lock->l_sl_mode.prev;
2329                 prev.policy_link = lock->l_sl_policy.prev;
2330 #endif
2331                 ldlm_resource_unlink_lock(lock);
2332         } else {
2333                 ldlm_resource_unlink_lock(lock);
2334                 if (res->lr_type == LDLM_EXTENT) {
2335                         /* FIXME: ugly code; we have to attach the lock to
2336                          * an interval node again since it may be granted
2337                          * soon */
2338                         INIT_LIST_HEAD(&node->li_group);
2339                         ldlm_interval_attach(node, lock);
2340                         node = NULL;
2341                 }
2342         }
2343
2344         /*
2345          * Remove old lock from the pool before adding the lock with new
2346          * mode below in ->policy()
2347          */
2348         ldlm_pool_del(&ns->ns_pool, lock);
2349
2350         /* If this is a local resource, put it on the appropriate list. */
2351         if (ns_is_client(ldlm_res_to_ns(res))) {
2352                 if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
2353                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
2354                 } else {
2355                         /* This should never happen, because of the way the
2356                          * server handles conversions. */
2357                         LDLM_ERROR(lock, "Erroneous flags %x on local lock\n",
2358                                    *flags);
2359                         LBUG();
2360
2361                         ldlm_grant_lock(lock, &rpc_list);
2362                         granted = 1;
2363                         /* FIXME: completion handling not with lr_lock held ! */
2364                         if (lock->l_completion_ast)
2365                                 lock->l_completion_ast(lock, 0, NULL);
2366                 }
2367 #ifdef HAVE_SERVER_SUPPORT
2368         } else {
2369                 int rc;
2370                 ldlm_error_t err;
2371                 __u64 pflags = 0;
2372                 ldlm_processing_policy policy;
2373                 policy = ldlm_processing_policy_table[res->lr_type];
2374                 rc = policy(lock, &pflags, 0, &err, &rpc_list);
2375                 if (rc == LDLM_ITER_STOP) {
2376                         lock->l_req_mode = old_mode;
2377                         if (res->lr_type == LDLM_EXTENT)
2378                                 ldlm_extent_add_lock(res, lock);
2379                         else
2380                                 ldlm_granted_list_add_lock(lock, &prev);
2381
2382                         res = NULL;
2383                 } else {
2384                         *flags |= LDLM_FL_BLOCK_GRANTED;
2385                         granted = 1;
2386                 }
2387         }
2388 #else
2389         } else {
2390                 CERROR("This is client-side-only module, cannot handle "
2391                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
2392                 LBUG();
2393         }
2394 #endif
2395         unlock_res_and_lock(lock);
2396
2397         if (granted)
2398                 ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
2399         if (node)
2400                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
2401         RETURN(res);
2402 }
2403 EXPORT_SYMBOL(ldlm_lock_convert);
2404
2405 /**
2406  * Print the description of the lock with handle \a lockh into the debug log.
2407  *
2408  * Used when printing all locks on a resource for debug purposes.
2409  */
2410 void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
2411 {
2412         struct ldlm_lock *lock;
2413
2414         if (!((libcfs_debug | D_ERROR) & level))
2415                 return;
2416
2417         lock = ldlm_handle2lock(lockh);
2418         if (lock == NULL)
2419                 return;
2420
2421         LDLM_DEBUG_LIMIT(level, lock, "###");
2422
2423         LDLM_LOCK_PUT(lock);
2424 }
2425 EXPORT_SYMBOL(ldlm_lock_dump_handle);
2426
2427 /**
2428  * Print lock information with custom message into debug log.
2429  * Helper function.
2430  */
2431 void _ldlm_lock_debug(struct ldlm_lock *lock,
2432                       struct libcfs_debug_msg_data *msgdata,
2433                       const char *fmt, ...)
2434 {
2435         va_list args;
2436         struct obd_export *exp = lock->l_export;
2437         struct ldlm_resource *resource = lock->l_resource;
2438         char *nid = "local";
2439
2440         va_start(args, fmt);
2441
2442         if (exp && exp->exp_connection) {
2443                 nid = libcfs_nid2str(exp->exp_connection->c_peer.nid);
2444         } else if (exp && exp->exp_obd != NULL) {
2445                 struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
2446                 nid = libcfs_nid2str(imp->imp_connection->c_peer.nid);
2447         }
2448
2449         if (resource == NULL) {
2450                 libcfs_debug_vmsg2(msgdata, fmt, args,
2451                        " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2452                        "res: \?\? rrc=\?\? type: \?\?\? flags: "LPX64" nid: %s "
2453                        "remote: "LPX64" expref: %d pid: %u timeout: %lu "
2454                        "lvb_type: %d\n",
2455                        lock,
2456                        lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2457                        lock->l_readers, lock->l_writers,
2458                        ldlm_lockname[lock->l_granted_mode],
2459                        ldlm_lockname[lock->l_req_mode],
2460                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2461                        exp ? atomic_read(&exp->exp_refcount) : -99,
2462                        lock->l_pid, lock->l_callback_timeout, lock->l_lvb_type);
2463                 va_end(args);
2464                 return;
2465         }
2466
2467         switch (resource->lr_type) {
2468         case LDLM_EXTENT:
2469                 libcfs_debug_vmsg2(msgdata, fmt, args,
2470                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2471                         "res: "DLDLMRES" rrc: %d type: %s ["LPU64"->"LPU64"] "
2472                         "(req "LPU64"->"LPU64") flags: "LPX64" nid: %s remote: "
2473                         LPX64" expref: %d pid: %u timeout: %lu lvb_type: %d\n",
2474                         ldlm_lock_to_ns_name(lock), lock,
2475                         lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2476                         lock->l_readers, lock->l_writers,
2477                         ldlm_lockname[lock->l_granted_mode],
2478                         ldlm_lockname[lock->l_req_mode],
2479                         PLDLMRES(resource),
2480                         atomic_read(&resource->lr_refcount),
2481                         ldlm_typename[resource->lr_type],
2482                         lock->l_policy_data.l_extent.start,
2483                         lock->l_policy_data.l_extent.end,
2484                         lock->l_req_extent.start, lock->l_req_extent.end,
2485                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2486                         exp ? atomic_read(&exp->exp_refcount) : -99,
2487                         lock->l_pid, lock->l_callback_timeout,
2488                         lock->l_lvb_type);
2489                 break;
2490
2491         case LDLM_FLOCK:
2492                 libcfs_debug_vmsg2(msgdata, fmt, args,
2493                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2494                         "res: "DLDLMRES" rrc: %d type: %s pid: %d "
2495                         "["LPU64"->"LPU64"] flags: "LPX64" nid: %s "
2496                         "remote: "LPX64" expref: %d pid: %u timeout: %lu\n",
2497                         ldlm_lock_to_ns_name(lock), lock,
2498                         lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2499                         lock->l_readers, lock->l_writers,
2500                         ldlm_lockname[lock->l_granted_mode],
2501                         ldlm_lockname[lock->l_req_mode],
2502                         PLDLMRES(resource),
2503                         atomic_read(&resource->lr_refcount),
2504                         ldlm_typename[resource->lr_type],
2505                         lock->l_policy_data.l_flock.pid,
2506                         lock->l_policy_data.l_flock.start,
2507                         lock->l_policy_data.l_flock.end,
2508                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2509                         exp ? atomic_read(&exp->exp_refcount) : -99,
2510                         lock->l_pid, lock->l_callback_timeout);
2511                 break;
2512
2513         case LDLM_IBITS:
2514                 libcfs_debug_vmsg2(msgdata, fmt, args,
2515                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2516                         "res: "DLDLMRES" bits "LPX64" rrc: %d type: %s "
2517                         "flags: "LPX64" nid: %s remote: "LPX64" expref: %d "
2518                         "pid: %u timeout: %lu lvb_type: %d\n",
2519                         ldlm_lock_to_ns_name(lock),
2520                         lock, lock->l_handle.h_cookie,
2521                         atomic_read(&lock->l_refc),
2522                         lock->l_readers, lock->l_writers,
2523                         ldlm_lockname[lock->l_granted_mode],
2524                         ldlm_lockname[lock->l_req_mode],
2525                         PLDLMRES(resource),
2526                         lock->l_policy_data.l_inodebits.bits,
2527                         atomic_read(&resource->lr_refcount),
2528                         ldlm_typename[resource->lr_type],
2529                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2530                         exp ? atomic_read(&exp->exp_refcount) : -99,
2531                         lock->l_pid, lock->l_callback_timeout,
2532                         lock->l_lvb_type);
2533                 break;
2534
2535         default:
2536                 libcfs_debug_vmsg2(msgdata, fmt, args,
2537                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2538                         "res: "DLDLMRES" rrc: %d type: %s flags: "LPX64" "
2539                         "nid: %s remote: "LPX64" expref: %d pid: %u "
2540                         "timeout: %lu lvb_type: %d\n",
2541                         ldlm_lock_to_ns_name(lock),
2542                         lock, lock->l_handle.h_cookie,
2543                         atomic_read(&lock->l_refc),
2544                         lock->l_readers, lock->l_writers,
2545                         ldlm_lockname[lock->l_granted_mode],
2546                         ldlm_lockname[lock->l_req_mode],
2547                         PLDLMRES(resource),
2548                         atomic_read(&resource->lr_refcount),
2549                         ldlm_typename[resource->lr_type],
2550                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2551                         exp ? atomic_read(&exp->exp_refcount) : -99,
2552                         lock->l_pid, lock->l_callback_timeout,
2553                         lock->l_lvb_type);
2554                 break;
2555         }
2556         va_end(args);
2557 }
2558 EXPORT_SYMBOL(_ldlm_lock_debug);