/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2014, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_lock.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM

#include <libcfs/libcfs.h>
#include <obd_class.h>
#include "ldlm_internal.h"

/* lock types */
char *ldlm_lockname[] = {
        [0] = "--",
        [LCK_EX] = "EX",
        [LCK_PW] = "PW",
        [LCK_PR] = "PR",
        [LCK_CW] = "CW",
        [LCK_CR] = "CR",
        [LCK_NL] = "NL",
        [LCK_GROUP] = "GROUP",
        [LCK_COS] = "COS"
};
EXPORT_SYMBOL(ldlm_lockname);

char *ldlm_typename[] = {
        [LDLM_PLAIN] = "PLN",
        [LDLM_EXTENT] = "EXT",
        [LDLM_FLOCK] = "FLK",
        [LDLM_IBITS] = "IBT",
};
EXPORT_SYMBOL(ldlm_typename);

static ldlm_policy_wire_to_local_t ldlm_policy_wire18_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE]  = ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE]  = ldlm_flock_policy_wire18_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE]  = ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_wire_to_local_t ldlm_policy_wire21_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE]  = ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE]  = ldlm_flock_policy_wire21_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE]  = ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_local_to_wire_t ldlm_policy_local_to_wire[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE]  = ldlm_plain_policy_local_to_wire,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_local_to_wire,
        [LDLM_FLOCK - LDLM_MIN_TYPE]  = ldlm_flock_policy_local_to_wire,
        [LDLM_IBITS - LDLM_MIN_TYPE]  = ldlm_ibits_policy_local_to_wire,
};

/**
 * Converts lock policy from the local format to the on-the-wire lock_desc
 * format.
 */
void ldlm_convert_policy_to_wire(ldlm_type_t type,
                                 const ldlm_policy_data_t *lpolicy,
                                 ldlm_wire_policy_data_t *wpolicy)
{
        ldlm_policy_local_to_wire_t convert;

        convert = ldlm_policy_local_to_wire[type - LDLM_MIN_TYPE];

        convert(lpolicy, wpolicy);
}

/**
 * Converts lock policy from the on-the-wire lock_desc format to the local
 * format.
 */
void ldlm_convert_policy_to_local(struct obd_export *exp, ldlm_type_t type,
                                  const ldlm_wire_policy_data_t *wpolicy,
                                  ldlm_policy_data_t *lpolicy)
{
        ldlm_policy_wire_to_local_t convert;
        int new_client;

        /* Some badness for 2.0.0 clients, but 2.0.0 isn't supported. */
        new_client = (exp_connect_flags(exp) & OBD_CONNECT_FULL20) != 0;
        if (new_client)
                convert = ldlm_policy_wire21_to_local[type - LDLM_MIN_TYPE];
        else
                convert = ldlm_policy_wire18_to_local[type - LDLM_MIN_TYPE];

        convert(wpolicy, lpolicy);
}
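
/*
 * Illustrative sketch, not part of the original file: how a server-side
 * request handler might use the two converters above.  The dlm_req/dlm_rep
 * variable names are hypothetical; the wire descriptor layout follows
 * struct ldlm_lock_desc.
 *
 * \code
 *      ldlm_policy_data_t lpolicy;
 *
 *      // wire -> local; picks the 1.8 or 2.1 flock convention by export
 *      ldlm_convert_policy_to_local(req->rq_export, LDLM_FLOCK,
 *                                   &dlm_req->lock_desc.l_policy_data,
 *                                   &lpolicy);
 *      // local -> wire, when filling a reply descriptor
 *      ldlm_convert_policy_to_wire(LDLM_FLOCK, &lpolicy,
 *                                  &dlm_rep->lock_desc.l_policy_data);
 * \endcode
 */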

char *ldlm_it2str(int it)
{
        switch (it) {
        case IT_OPEN:
                return "open";
        case IT_CREAT:
                return "creat";
        case (IT_OPEN | IT_CREAT):
                return "open|creat";
        case IT_READDIR:
                return "readdir";
        case IT_GETATTR:
                return "getattr";
        case IT_LOOKUP:
                return "lookup";
        case IT_UNLINK:
                return "unlink";
        case IT_GETXATTR:
                return "getxattr";
        case IT_LAYOUT:
                return "layout";
        default:
                CERROR("Unknown intent %d\n", it);
                return "UNKNOWN";
        }
}
EXPORT_SYMBOL(ldlm_it2str);

extern struct kmem_cache *ldlm_lock_slab;

#ifdef HAVE_SERVER_SUPPORT
static ldlm_processing_policy ldlm_processing_policy_table[] = {
        [LDLM_PLAIN]    = ldlm_process_plain_lock,
        [LDLM_EXTENT]   = ldlm_process_extent_lock,
        [LDLM_FLOCK]    = ldlm_process_flock_lock,
        [LDLM_IBITS]    = ldlm_process_inodebits_lock,
};

ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
{
        return ldlm_processing_policy_table[res->lr_type];
}
EXPORT_SYMBOL(ldlm_get_processing_policy);
#endif /* HAVE_SERVER_SUPPORT */

void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
{
        ns->ns_policy = arg;
}
EXPORT_SYMBOL(ldlm_register_intent);

/*
 * REFCOUNTED LOCK OBJECTS
 */


/**
 * Get a reference on a lock.
 *
 * Lock refcounts, during creation:
 *   - one special one for allocation, dec'd only once in destroy
 *   - one for being a lock that's in-use
 *   - one for the addref associated with a new lock
 */
struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
{
        atomic_inc(&lock->l_refc);
        return lock;
}
EXPORT_SYMBOL(ldlm_lock_get);

/**
 * Release lock reference.
 *
 * Also frees the lock if it was the last reference.
 */
void ldlm_lock_put(struct ldlm_lock *lock)
{
        ENTRY;

        LASSERT(lock->l_resource != LP_POISON);
        LASSERT(atomic_read(&lock->l_refc) > 0);
        if (atomic_dec_and_test(&lock->l_refc)) {
                struct ldlm_resource *res;

                LDLM_DEBUG(lock,
                           "final lock_put on destroyed lock, freeing it.");

                res = lock->l_resource;
                LASSERT(ldlm_is_destroyed(lock));
                LASSERT(list_empty(&lock->l_res_link));
                LASSERT(list_empty(&lock->l_pending_chain));

                lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
                                     LDLM_NSS_LOCKS);
                lu_ref_del(&res->lr_reference, "lock", lock);
                ldlm_resource_putref(res);
                lock->l_resource = NULL;
                if (lock->l_export) {
                        class_export_lock_put(lock->l_export, lock);
                        lock->l_export = NULL;
                }

                if (lock->l_lvb_data != NULL)
                        OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len);

                ldlm_interval_free(ldlm_interval_detach(lock));
                lu_ref_fini(&lock->l_reference);
                OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle);
        }

        EXIT;
}
EXPORT_SYMBOL(ldlm_lock_put);
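
/*
 * Minimal usage sketch (added for illustration): every LDLM_LOCK_GET()
 * must be balanced by an LDLM_LOCK_PUT(); the final put on a destroyed
 * lock is what actually frees it, as implemented above.
 *
 * \code
 *      struct ldlm_lock *lock = LDLM_LOCK_GET(some_lock);  // +1 ref
 *      // ... the lock cannot be freed while we hold the reference ...
 *      LDLM_LOCK_PUT(lock);                                // -1 ref, may free
 * \endcode
 */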

/**
 * Removes LDLM lock \a lock from LRU. Assumes LRU is already locked.
 */
int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
{
        int rc = 0;
        if (!list_empty(&lock->l_lru)) {
                struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

                LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
                list_del_init(&lock->l_lru);
                LASSERT(ns->ns_nr_unused > 0);
                ns->ns_nr_unused--;
                rc = 1;
        }
        return rc;
}

/**
 * Removes LDLM lock \a lock from LRU. Obtains the LRU lock first.
 */
int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
        int rc;

        ENTRY;
        if (ldlm_is_ns_srv(lock)) {
                LASSERT(list_empty(&lock->l_lru));
                RETURN(0);
        }

        spin_lock(&ns->ns_lock);
        rc = ldlm_lock_remove_from_lru_nolock(lock);
        spin_unlock(&ns->ns_lock);
        EXIT;
        return rc;
}

/**
 * Adds LDLM lock \a lock to namespace LRU. Assumes LRU is already locked.
 */
void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        lock->l_last_used = cfs_time_current();
        LASSERT(list_empty(&lock->l_lru));
        LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
        list_add_tail(&lock->l_lru, &ns->ns_unused_list);
        ldlm_clear_skipped(lock);
        LASSERT(ns->ns_nr_unused >= 0);
        ns->ns_nr_unused++;
}

/**
 * Adds LDLM lock \a lock to namespace LRU. Obtains necessary LRU locks
 * first.
 */
void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        spin_lock(&ns->ns_lock);
        ldlm_lock_add_to_lru_nolock(lock);
        spin_unlock(&ns->ns_lock);
        EXIT;
}

/**
 * Moves LDLM lock \a lock that is already in namespace LRU to the tail of
 * the LRU. Performs necessary LRU locking.
 */
void ldlm_lock_touch_in_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        if (ldlm_is_ns_srv(lock)) {
                LASSERT(list_empty(&lock->l_lru));
                EXIT;
                return;
        }

        spin_lock(&ns->ns_lock);
        if (!list_empty(&lock->l_lru)) {
                ldlm_lock_remove_from_lru_nolock(lock);
                ldlm_lock_add_to_lru_nolock(lock);
        }
        spin_unlock(&ns->ns_lock);
        EXIT;
}

/**
 * Helper to destroy a locked lock.
 *
 * Used by ldlm_lock_destroy and ldlm_lock_destroy_nolock.
 * Must be called with l_lock and lr_lock held.
 *
 * Does not actually free the lock data, but rather marks the lock as
 * destroyed by setting the LDLM_FL_DESTROYED flag.  Destroys the
 * handle->lock association too, so that the lock can no longer be found,
 * and removes the lock from the LRU list.  Actual lock freeing occurs when
 * the last lock reference goes away.
 *
 * Original comment (of some historical value):
 * This used to have a 'strict' flag, which recovery would use to mark an
 * in-use lock as needing-to-die.  Lest I am ever tempted to put it back, I
 * shall explain why it's gone: with the new hash table scheme, once you call
 * ldlm_lock_destroy, you can never drop your final references on this lock,
 * because it's not in the hash table anymore.  -phil
 */
static int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
{
        ENTRY;

        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                LBUG();
        }

        if (!list_empty(&lock->l_res_link)) {
                LDLM_ERROR(lock, "lock still on resource");
                LBUG();
        }

        if (ldlm_is_destroyed(lock)) {
                LASSERT(list_empty(&lock->l_lru));
                EXIT;
                return 0;
        }
        ldlm_set_destroyed(lock);

        if (lock->l_export && lock->l_export->exp_lock_hash) {
                /* NB: it's safe to call cfs_hash_del() even if the lock
                 * isn't in exp_lock_hash. */
                /* In the function below, .hs_keycmp resolves to
                 * ldlm_export_lock_keycmp() */
                /* coverity[overrun-buffer-val] */
                cfs_hash_del(lock->l_export->exp_lock_hash,
                             &lock->l_remote_handle, &lock->l_exp_hash);
        }

        ldlm_lock_remove_from_lru(lock);
        class_handle_unhash(&lock->l_handle);

#if 0
        /* Wake anyone waiting for this lock */
        /* FIXME: I should probably add yet another flag, instead of using
         * l_export to only call this on clients */
        if (lock->l_export)
                class_export_put(lock->l_export);
        lock->l_export = NULL;
        if (lock->l_export && lock->l_completion_ast)
                lock->l_completion_ast(lock, 0);
#endif
        EXIT;
        return 1;
}

/**
 * Destroys an LDLM lock \a lock. Performs necessary locking first.
 */
void ldlm_lock_destroy(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        lock_res_and_lock(lock);
        first = ldlm_lock_destroy_internal(lock);
        unlock_res_and_lock(lock);

        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/**
 * Destroys an LDLM lock \a lock that is already locked.
 */
void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        first = ldlm_lock_destroy_internal(lock);
        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/* this is called by portals_handle2object with the handle lock taken */
static void lock_handle_addref(void *lock)
{
        LDLM_LOCK_GET((struct ldlm_lock *)lock);
}

static void lock_handle_free(void *lock, int size)
{
        LASSERT(size == sizeof(struct ldlm_lock));
        OBD_SLAB_FREE(lock, ldlm_lock_slab, size);
}

static struct portals_handle_ops lock_handle_ops = {
        .hop_addref = lock_handle_addref,
        .hop_free   = lock_handle_free,
};

/**
 * Allocate and initialize new lock structure.
 *
 * Usage: pass in a resource on which ldlm_resource_get() has been called;
 * the new lock will take over that refcount.
 *
 * returns: lock with refcount 2 - one for the current caller and one for remote
 */
static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
{
        struct ldlm_lock *lock;
        ENTRY;

        if (resource == NULL)
                LBUG();

        OBD_SLAB_ALLOC_PTR_GFP(lock, ldlm_lock_slab, GFP_NOFS);
        if (lock == NULL)
                RETURN(NULL);

        spin_lock_init(&lock->l_lock);
        lock->l_resource = resource;
        lu_ref_add(&resource->lr_reference, "lock", lock);

        atomic_set(&lock->l_refc, 2);
        INIT_LIST_HEAD(&lock->l_res_link);
        INIT_LIST_HEAD(&lock->l_lru);
        INIT_LIST_HEAD(&lock->l_pending_chain);
        INIT_LIST_HEAD(&lock->l_bl_ast);
        INIT_LIST_HEAD(&lock->l_cp_ast);
        INIT_LIST_HEAD(&lock->l_rk_ast);
        init_waitqueue_head(&lock->l_waitq);
        lock->l_blocking_lock = NULL;
        INIT_LIST_HEAD(&lock->l_sl_mode);
        INIT_LIST_HEAD(&lock->l_sl_policy);
        INIT_HLIST_NODE(&lock->l_exp_hash);
        INIT_HLIST_NODE(&lock->l_exp_flock_hash);

        lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
                             LDLM_NSS_LOCKS);
        INIT_LIST_HEAD(&lock->l_handle.h_link);
        class_handle_hash(&lock->l_handle, &lock_handle_ops);

        lu_ref_init(&lock->l_reference);
        lu_ref_add(&lock->l_reference, "hash", lock);
        lock->l_callback_timeout = 0;

#if LUSTRE_TRACKS_LOCK_EXP_REFS
        INIT_LIST_HEAD(&lock->l_exp_refs_link);
        lock->l_exp_refs_nr = 0;
        lock->l_exp_refs_target = NULL;
#endif
        INIT_LIST_HEAD(&lock->l_exp_list);

        RETURN(lock);
}

/**
 * Moves LDLM lock \a lock to another resource.
 * This is used on the client when the server returns a lock other than the
 * one requested (typically as a result of an intent operation).
 */
int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                              const struct ldlm_res_id *new_resid)
{
        struct ldlm_resource *oldres = lock->l_resource;
        struct ldlm_resource *newres;
        int type;
        ENTRY;

        LASSERT(ns_is_client(ns));

        lock_res_and_lock(lock);
        if (memcmp(new_resid, &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) == 0) {
                /* Nothing to do */
                unlock_res_and_lock(lock);
                RETURN(0);
        }

        LASSERT(new_resid->name[0] != 0);

        /* This function assumes that the lock isn't on any lists */
        LASSERT(list_empty(&lock->l_res_link));

        type = oldres->lr_type;
        unlock_res_and_lock(lock);

        newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
        if (IS_ERR(newres))
                RETURN(PTR_ERR(newres));

        lu_ref_add(&newres->lr_reference, "lock", lock);
        /*
         * To flip the lock from the old to the new resource, lock, oldres and
         * newres have to be locked. Resource spin-locks are nested within
         * lock->l_lock, and are taken in the memory address order to avoid
         * deadlocks.
         */
        spin_lock(&lock->l_lock);
        oldres = lock->l_resource;
        if (oldres < newres) {
                lock_res(oldres);
                lock_res_nested(newres, LRT_NEW);
        } else {
                lock_res(newres);
                lock_res_nested(oldres, LRT_NEW);
        }
        LASSERT(memcmp(new_resid, &oldres->lr_name,
                       sizeof oldres->lr_name) != 0);
        lock->l_resource = newres;
        unlock_res(oldres);
        unlock_res_and_lock(lock);

        /* ...and the flowers are still standing! */
        lu_ref_del(&oldres->lr_reference, "lock", lock);
        ldlm_resource_putref(oldres);

        RETURN(0);
}
EXPORT_SYMBOL(ldlm_lock_change_resource);

/** \defgroup ldlm_handles LDLM HANDLES
 * Ways to get hold of locks without any addresses.
 * @{
 */

/**
 * Fills in handle for LDLM lock \a lock into supplied \a lockh.
 * Does not take any references.
 */
void ldlm_lock2handle(const struct ldlm_lock *lock, struct lustre_handle *lockh)
{
        lockh->cookie = lock->l_handle.h_cookie;
}
EXPORT_SYMBOL(ldlm_lock2handle);

/**
 * Obtain a lock reference by handle.
 *
 * If \a flags is nonzero: atomically get the lock and set those flags;
 * return NULL if any of them is already set.
 */
struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
                                     __u64 flags)
{
        struct ldlm_lock *lock;
        ENTRY;

        LASSERT(handle);

        lock = class_handle2object(handle->cookie, NULL);
        if (lock == NULL)
                RETURN(NULL);

        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if ((flags == 0) && !ldlm_is_destroyed(lock)) {
                lu_ref_add(&lock->l_reference, "handle", current);
                RETURN(lock);
        }

        lock_res_and_lock(lock);

        LASSERT(lock->l_resource != NULL);

        lu_ref_add_atomic(&lock->l_reference, "handle", current);
        if (unlikely(ldlm_is_destroyed(lock))) {
                unlock_res_and_lock(lock);
                CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
                LDLM_LOCK_PUT(lock);
                RETURN(NULL);
        }

        /* If we're setting flags, make sure none of them are already set. */
        if (flags != 0) {
                if ((lock->l_flags & flags) != 0) {
                        unlock_res_and_lock(lock);
                        LDLM_LOCK_PUT(lock);
                        RETURN(NULL);
                }

                lock->l_flags |= flags;
        }

        unlock_res_and_lock(lock);
        RETURN(lock);
}
EXPORT_SYMBOL(__ldlm_handle2lock);
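
/*
 * Illustration (not from the original source): the usual handle round trip.
 * A handle is safe to store or pass around without pinning the lock;
 * converting it back yields a referenced lock, or NULL if the lock has
 * been destroyed in the meantime.
 *
 * \code
 *      struct lustre_handle lockh;
 *      struct ldlm_lock *lock;
 *
 *      ldlm_lock2handle(lock_in, &lockh);  // no reference taken
 *      lock = ldlm_handle2lock(&lockh);    // NULL if already destroyed
 *      if (lock != NULL) {
 *              // ... use the lock ...
 *              LDLM_LOCK_PUT(lock);        // drop the handle reference
 *      }
 * \endcode
 */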
/** @} ldlm_handles */

/**
 * Fill in "on the wire" representation for given LDLM lock into supplied
 * lock descriptor \a desc structure.
 */
void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
{
        ldlm_res2desc(lock->l_resource, &desc->l_resource);
        desc->l_req_mode = lock->l_req_mode;
        desc->l_granted_mode = lock->l_granted_mode;
        ldlm_convert_policy_to_wire(lock->l_resource->lr_type,
                                    &lock->l_policy_data,
                                    &desc->l_policy_data);
}
EXPORT_SYMBOL(ldlm_lock2desc);

/**
 * Add a lock to list of conflicting locks to send AST to.
 *
 * Only add if we have not sent a blocking AST to the lock yet.
 */
static void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                                  struct list_head *work_list)
{
        if (!ldlm_is_ast_sent(lock)) {
                LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
                ldlm_set_ast_sent(lock);
                /* If the enqueuing client said so, tell the AST recipient to
                 * discard dirty data, rather than writing back. */
                if (ldlm_is_ast_discard_data(new))
                        ldlm_set_discard_data(lock);
                LASSERT(list_empty(&lock->l_bl_ast));
                list_add(&lock->l_bl_ast, work_list);
                LDLM_LOCK_GET(lock);
                LASSERT(lock->l_blocking_lock == NULL);
                lock->l_blocking_lock = LDLM_LOCK_GET(new);
        }
}

/**
 * Add a lock to list of just granted locks to send completion AST to.
 */
static void ldlm_add_cp_work_item(struct ldlm_lock *lock,
                                  struct list_head *work_list)
{
        if (!ldlm_is_cp_reqd(lock)) {
                ldlm_set_cp_reqd(lock);
                LDLM_DEBUG(lock, "lock granted; sending completion AST.");
                LASSERT(list_empty(&lock->l_cp_ast));
                list_add(&lock->l_cp_ast, work_list);
                LDLM_LOCK_GET(lock);
        }
}

/**
 * Aggregator function to add AST work items into a list. Determines
 * what sort of an AST work needs to be done and calls the proper
 * adding function.
 * Must be called with lr_lock held.
 */
void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                            struct list_head *work_list)
{
        ENTRY;
        check_res_locked(lock->l_resource);
        if (new)
                ldlm_add_bl_work_item(lock, new, work_list);
        else
                ldlm_add_cp_work_item(lock, work_list);
        EXIT;
}

/**
 * Add specified reader/writer reference to LDLM lock with handle \a lockh.
 * r/w reference type is determined by \a mode
 * Calls ldlm_lock_addref_internal.
 */
void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;

        lock = ldlm_handle2lock(lockh);
        LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
        ldlm_lock_addref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_addref);

/**
 * Helper function.
 * Add specified reader/writer reference to LDLM lock \a lock.
 * r/w reference type is determined by \a mode
 * Removes lock from LRU if it is there.
 * Assumes the LDLM lock is already locked.
 */
void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        ldlm_lock_remove_from_lru(lock);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                lock->l_readers++;
                lu_ref_add_atomic(&lock->l_reference, "reader", lock);
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                lock->l_writers++;
                lu_ref_add_atomic(&lock->l_reference, "writer", lock);
        }
        LDLM_LOCK_GET(lock);
        lu_ref_add_atomic(&lock->l_reference, "user", lock);
        LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}

/**
 * Attempts to add reader/writer reference to a lock with handle \a lockh, and
 * fails if lock is already LDLM_FL_CBPENDING or destroyed.
 *
 * \retval 0 success, lock was addref-ed
 *
 * \retval -EAGAIN lock is being canceled.
 */
int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;
        int               result;

        result = -EAGAIN;
        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                if (lock->l_readers != 0 || lock->l_writers != 0 ||
                    !ldlm_is_cbpending(lock)) {
                        ldlm_lock_addref_internal_nolock(lock, mode);
                        result = 0;
                }
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return result;
}
EXPORT_SYMBOL(ldlm_lock_addref_try);

/**
 * Add specified reader/writer reference to LDLM lock \a lock.
 * Locks LDLM lock and calls ldlm_lock_addref_internal_nolock to do the work.
 * Only called for local locks.
 */
void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
{
        lock_res_and_lock(lock);
        ldlm_lock_addref_internal_nolock(lock, mode);
        unlock_res_and_lock(lock);
}

/**
 * Removes reader/writer reference for LDLM lock \a lock.
 * Assumes LDLM lock is already locked.
 * Only called in ldlm_flock_destroy and for local locks.
 * Does NOT add the lock to the LRU if no r/w references are left, to
 * accommodate flock locks that cannot be placed in the LRU.
 */
void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                LASSERT(lock->l_readers > 0);
                lu_ref_del(&lock->l_reference, "reader", lock);
                lock->l_readers--;
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                LASSERT(lock->l_writers > 0);
                lu_ref_del(&lock->l_reference, "writer", lock);
                lock->l_writers--;
        }

        lu_ref_del(&lock->l_reference, "user", lock);
        LDLM_LOCK_RELEASE(lock);    /* matches the LDLM_LOCK_GET() in addref */
}

/**
 * Removes reader/writer reference for LDLM lock \a lock.
 * Locks LDLM lock first.
 * If this is a client lock and the r/w refcount drops to zero while the
 * lock is not blocked, the lock is added to the namespace LRU.
 * For blocked LDLM locks, if the r/w count drops to zero, blocking_ast is
 * called.
 */
void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
{
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        ns = ldlm_lock_to_ns(lock);

        ldlm_lock_decref_internal_nolock(lock, mode);

        if (ldlm_is_local(lock) &&
            !lock->l_readers && !lock->l_writers) {
                /* If this is a local lock on a server namespace and this was
                 * the last reference, cancel the lock. */
                CDEBUG(D_INFO, "forcing cancel of local lock\n");
                ldlm_set_cbpending(lock);
        }

        if (!lock->l_readers && !lock->l_writers &&
            ldlm_is_cbpending(lock)) {
                /* If we received a blocking AST and this was the last
                 * reference, run the callback. */
                if (ldlm_is_ns_srv(lock) && lock->l_export)
                        CERROR("FL_CBPENDING set on non-local lock--just a "
                               "warning\n");

                LDLM_DEBUG(lock, "final decref done on cbpending lock");

                LDLM_LOCK_GET(lock); /* dropped by bl thread */
                ldlm_lock_remove_from_lru(lock);
                unlock_res_and_lock(lock);

                if (ldlm_is_fail_loc(lock))
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                if (ldlm_is_atomic_cb(lock) ||
                    ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
                        ldlm_handle_bl_callback(ns, NULL, lock);
        } else if (ns_is_client(ns) &&
                   !lock->l_readers && !lock->l_writers &&
                   !ldlm_is_no_lru(lock) &&
                   !ldlm_is_bl_ast(lock)) {

                LDLM_DEBUG(lock, "add lock into lru list");

                /* If this is a client-side namespace and this was the last
                 * reference, put it on the LRU. */
                ldlm_lock_add_to_lru(lock);
                unlock_res_and_lock(lock);

                if (ldlm_is_fail_loc(lock))
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
                 * are not supported by the server, otherwise, it is done on
                 * enqueue. */
                if (!exp_connect_cancelset(lock->l_conn_export) &&
                    !ns_connect_lru_resize(ns))
                        ldlm_cancel_lru(ns, 0, LCF_ASYNC, 0);
        } else {
                LDLM_DEBUG(lock, "do not add lock into lru list");
                unlock_res_and_lock(lock);
        }

        EXIT;
}

/**
 * Decrease reader/writer refcount for LDLM lock with handle \a lockh
 */
void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_decref);
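
/*
 * Usage sketch (illustrative only): reader/writer references pin a lock in
 * active use and keep it off the LRU; each addref must be balanced by a
 * decref of the same mode.
 *
 * \code
 *      ldlm_lock_addref(&lockh, LCK_PR);  // take a reader reference
 *      // ... read-side work protected by the lock ...
 *      ldlm_lock_decref(&lockh, LCK_PR);  // the last decref may move the
 *                                         // lock onto the namespace LRU
 * \endcode
 */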

/**
 * Decrease reader/writer refcount for LDLM lock with handle
 * \a lockh and mark it for subsequent cancellation once r/w refcount
 * drops to zero instead of putting it into the LRU.
 *
 * Typical usage is for GROUP locks which we cannot allow to be cached.
 */
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        ENTRY;

        LASSERT(lock != NULL);

        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        lock_res_and_lock(lock);
        ldlm_set_cbpending(lock);
        unlock_res_and_lock(lock);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);

struct sl_insert_point {
        struct list_head *res_link;
        struct list_head *mode_link;
        struct list_head *policy_link;
};

/**
 * Finds a position to insert the new lock into granted lock list.
 *
 * Used for locks eligible for skiplist optimization.
 *
 * Parameters:
 *      queue [input]:  the granted list where search acts on;
 *      req [input]:    the lock whose position to be located;
 *      prev [output]:  positions within 3 lists to insert @req to
 * Return Value:
 *      filled @prev
 * NOTE: called by
 *  - ldlm_grant_lock_with_skiplist
 */
static void search_granted_lock(struct list_head *queue,
                                struct ldlm_lock *req,
                                struct sl_insert_point *prev)
{
        struct list_head *tmp;
        struct ldlm_lock *lock, *mode_end, *policy_end;
        ENTRY;

        list_for_each(tmp, queue) {
                lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                mode_end = list_entry(lock->l_sl_mode.prev,
                                          struct ldlm_lock, l_sl_mode);

                if (lock->l_req_mode != req->l_req_mode) {
                        /* jump to last lock of mode group */
                        tmp = &mode_end->l_res_link;
                        continue;
                }

                /* suitable mode group is found */
                if (lock->l_resource->lr_type == LDLM_PLAIN) {
                        /* insert point is last lock of the mode group */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else if (lock->l_resource->lr_type == LDLM_IBITS) {
                        for (;;) {
                                policy_end =
                                        list_entry(lock->l_sl_policy.prev,
                                                       struct ldlm_lock,
                                                       l_sl_policy);

                                if (lock->l_policy_data.l_inodebits.bits ==
                                    req->l_policy_data.l_inodebits.bits) {
                                        /* insert point is last lock of
                                         * the policy group */
                                        prev->res_link =
                                                &policy_end->l_res_link;
                                        prev->mode_link =
                                                &policy_end->l_sl_mode;
                                        prev->policy_link =
                                                &policy_end->l_sl_policy;
                                        EXIT;
                                        return;
                                }

                                if (policy_end == mode_end)
                                        /* done with mode group */
                                        break;

                                /* go to next policy group within mode group */
                                tmp = policy_end->l_res_link.next;
                                lock = list_entry(tmp, struct ldlm_lock,
                                                      l_res_link);
                        }  /* loop over policy groups within the mode group */

                        /* insert point is last lock of the mode group,
                         * new policy group is started */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else {
                        LDLM_ERROR(lock, "is not LDLM_PLAIN or LDLM_IBITS lock");
                        LBUG();
                }
        }

        /* insert point is last lock on the queue,
         * new mode group and new policy group are started */
        prev->res_link = queue->prev;
        prev->mode_link = &req->l_sl_mode;
        prev->policy_link = &req->l_sl_policy;
        EXIT;
        return;
}
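
/*
 * Rough sketch of the skiplist layout walked above (added illustration, not
 * from the original file): the granted list is partitioned into mode groups
 * and, for IBITS locks, each mode group into policy groups.  l_sl_mode and
 * l_sl_policy link the first and last lock of a group, so a whole group can
 * be skipped in one step.
 *
 *      lr_granted:  [PR a][PR b][PR c]  [PW d][PW e]
 *      l_sl_mode:    a <-----------> c   d <-> e
 *      l_sl_policy:  first <-> last lock of each inodebits group
 */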

/**
 * Add a lock into resource granted list after a position described by
 * \a prev.
 */
static void ldlm_granted_list_add_lock(struct ldlm_lock *lock,
                                       struct sl_insert_point *prev)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        ldlm_resource_dump(D_INFO, res);
        LDLM_DEBUG(lock, "About to add lock:");

        if (ldlm_is_destroyed(lock)) {
                CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
                return;
        }

        LASSERT(list_empty(&lock->l_res_link));
        LASSERT(list_empty(&lock->l_sl_mode));
        LASSERT(list_empty(&lock->l_sl_policy));

        /*
         * lock->link == prev->link means the lock is the first one in its
         * group.  Don't re-add it to itself, to suppress kernel warnings.
         */
        if (&lock->l_res_link != prev->res_link)
                list_add(&lock->l_res_link, prev->res_link);
        if (&lock->l_sl_mode != prev->mode_link)
                list_add(&lock->l_sl_mode, prev->mode_link);
        if (&lock->l_sl_policy != prev->policy_link)
                list_add(&lock->l_sl_policy, prev->policy_link);

        EXIT;
}

/**
 * Add a lock to granted list on a resource maintaining skiplist
 * correctness.
 */
static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
{
        struct sl_insert_point prev;
        ENTRY;

        LASSERT(lock->l_req_mode == lock->l_granted_mode);

        search_granted_lock(&lock->l_resource->lr_granted, lock, &prev);
        ldlm_granted_list_add_lock(lock, &prev);
        EXIT;
}

/**
 * Perform lock granting bookkeeping.
 *
 * Includes putting the lock into granted list and updating lock mode.
 * NOTE: called by
 *  - ldlm_lock_enqueue
 *  - ldlm_reprocess_queue
 *  - ldlm_lock_convert
 *
 * must be called with lr_lock held
 */
void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        lock->l_granted_mode = lock->l_req_mode;

        if (work_list && lock->l_completion_ast != NULL)
                ldlm_add_ast_work_item(lock, NULL, work_list);

        /* We should not add locks to granted list in the following cases:
         * - this is an UNLOCK but not a real lock;
         * - this is a TEST lock;
         * - this is a F_CANCELLK lock (async flock has req_mode == 0)
         * - this is a deadlock (flock cannot be granted) */
        if (lock->l_req_mode == 0 ||
            lock->l_req_mode == LCK_NL ||
            ldlm_is_test_lock(lock) ||
            ldlm_is_flock_deadlock(lock))
                RETURN_EXIT;

        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
                ldlm_grant_lock_with_skiplist(lock);
        else if (res->lr_type == LDLM_EXTENT)
                ldlm_extent_add_lock(res, lock);
        else
                ldlm_resource_add_lock(res, &res->lr_granted, lock);

        if (lock->l_granted_mode < res->lr_most_restr)
                res->lr_most_restr = lock->l_granted_mode;

        ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
        EXIT;
}
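
/*
 * Simplified caller sketch (illustrative; real callers such as
 * ldlm_lock_enqueue() add error handling): granting happens under lr_lock,
 * and the completion ASTs collected in the work list are sent only after
 * the lock is dropped.
 *
 * \code
 *      struct list_head rpc_list;
 *
 *      INIT_LIST_HEAD(&rpc_list);
 *      lock_res_and_lock(lock);
 *      ldlm_grant_lock(lock, &rpc_list);
 *      unlock_res_and_lock(lock);
 *      ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
 * \endcode
 */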

/**
 * Search for a lock with given properties in a queue.
 *
 * \retval a referenced lock or NULL.  See the flag descriptions in the
 * comment above ldlm_lock_match().
 */
static struct ldlm_lock *search_queue(struct list_head *queue,
                                      ldlm_mode_t *mode,
                                      ldlm_policy_data_t *policy,
                                      struct ldlm_lock *old_lock,
                                      __u64 flags, int unref)
{
        struct ldlm_lock *lock;
        struct list_head       *tmp;

        list_for_each(tmp, queue) {
                ldlm_mode_t match;

                lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lock == old_lock)
                        break;

                /* Check if this lock can be matched.
                 * Used by LU-2919 (exclusive open) for open lease locks */
                if (ldlm_is_excl(lock))
                        continue;

                /* llite sometimes wants to match locks that will be
                 * canceled when their users drop, but we allow it to match
                 * if it passes in CBPENDING and the lock still has users.
                 * This is generally only going to be used by children
                 * whose parents already hold a lock so forward progress
                 * can still happen. */
                if (ldlm_is_cbpending(lock) &&
                    !(flags & LDLM_FL_CBPENDING))
                        continue;
                if (!unref && ldlm_is_cbpending(lock) &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        continue;

                if (!(lock->l_req_mode & *mode))
                        continue;
                match = lock->l_req_mode;

                if (lock->l_resource->lr_type == LDLM_EXTENT &&
                    (lock->l_policy_data.l_extent.start >
                     policy->l_extent.start ||
                     lock->l_policy_data.l_extent.end < policy->l_extent.end))
                        continue;

                if (unlikely(match == LCK_GROUP) &&
                    lock->l_resource->lr_type == LDLM_EXTENT &&
                    policy->l_extent.gid != LDLM_GID_ANY &&
                    lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                        continue;

                /* We match if we have an existing lock with the same or a
                   wider set of bits. */
                if (lock->l_resource->lr_type == LDLM_IBITS &&
                     ((lock->l_policy_data.l_inodebits.bits &
                      policy->l_inodebits.bits) !=
                      policy->l_inodebits.bits))
                        continue;

                if (!unref && LDLM_HAVE_MASK(lock, GONE))
                        continue;

                if ((flags & LDLM_FL_LOCAL_ONLY) &&
                    !ldlm_is_local(lock))
                        continue;

                if (flags & LDLM_FL_TEST_LOCK) {
                        LDLM_LOCK_GET(lock);
                        ldlm_lock_touch_in_lru(lock);
                } else {
                        ldlm_lock_addref_internal_nolock(lock, match);
                }
                *mode = match;
                return lock;
        }

        return NULL;
}

void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
{
        if ((lock->l_flags & LDLM_FL_FAIL_NOTIFIED) == 0) {
                lock->l_flags |= LDLM_FL_FAIL_NOTIFIED;
                wake_up_all(&lock->l_waitq);
        }
}
EXPORT_SYMBOL(ldlm_lock_fail_match_locked);

void ldlm_lock_fail_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_fail_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_fail_match);

/**
 * Mark lock as "matchable" by OST.
 *
 * Used to prevent certain races in LOV/OSC where the lock is granted, but LVB
 * is not yet valid.
 * Assumes LDLM lock is already locked.
 */
void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
{
        ldlm_set_lvb_ready(lock);
        wake_up_all(&lock->l_waitq);
}
EXPORT_SYMBOL(ldlm_lock_allow_match_locked);

/**
 * Mark lock as "matchable" by OST.
 * Locks the lock and then \see ldlm_lock_allow_match_locked
 */
void ldlm_lock_allow_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_allow_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_allow_match);

/**
 * Attempt to find a lock with specified properties.
 *
 * Typically returns a reference to the matched lock unless LDLM_FL_TEST_LOCK
 * is set in \a flags.
 *
 * Can be called in two ways:
 *
 * If 'ns' is NULL, then lockh describes an existing lock that we want to look
 * for a duplicate of.
 *
 * Otherwise, all of the fields must be filled in, to match against.
 *
 * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
 *     server (i.e., connh is NULL)
 * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
 *     list will be considered
 * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
 *     to be canceled can still be matched as long as they still have reader
 *     or writer references
 * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
 *     just tell us if we would have matched.
 *
 * \retval 1 if it finds an already-existing lock that is compatible; in this
 * case, lockh is filled in with an addref()ed lock
 *
 * We also check security context, and if that fails we simply return 0 (to
 * keep caller code unchanged), the context failure will be discovered by
 * caller sometime later.
 */
ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, __u64 flags,
                            const struct ldlm_res_id *res_id, ldlm_type_t type,
                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
                            struct lustre_handle *lockh, int unref)
{
        struct ldlm_resource *res;
        struct ldlm_lock *lock, *old_lock = NULL;
        int rc = 0;
        ENTRY;

        if (ns == NULL) {
                old_lock = ldlm_handle2lock(lockh);
                LASSERT(old_lock);

                ns = ldlm_lock_to_ns(old_lock);
                res_id = &old_lock->l_resource->lr_name;
                type = old_lock->l_resource->lr_type;
                mode = old_lock->l_req_mode;
        }

        res = ldlm_resource_get(ns, NULL, res_id, type, 0);
        if (IS_ERR(res)) {
                LASSERT(old_lock == NULL);
                RETURN(0);
        }

        LDLM_RESOURCE_ADDREF(res);
        lock_res(res);

        lock = search_queue(&res->lr_granted, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        if (flags & LDLM_FL_BLOCK_GRANTED)
                GOTO(out, rc = 0);
        lock = search_queue(&res->lr_converting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        lock = search_queue(&res->lr_waiting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);

        EXIT;
 out:
        unlock_res(res);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);

        if (lock) {
                ldlm_lock2handle(lock, lockh);
                if ((flags & LDLM_FL_LVB_READY) &&
                    (!ldlm_is_lvb_ready(lock))) {
                        __u64 wait_flags = LDLM_FL_LVB_READY |
                                LDLM_FL_DESTROYED | LDLM_FL_FAIL_NOTIFIED;
                        struct l_wait_info lwi;
                        if (lock->l_completion_ast) {
                                int err = lock->l_completion_ast(lock,
                                                          LDLM_FL_WAIT_NOREPROC,
                                                                 NULL);
                                if (err) {
                                        if (flags & LDLM_FL_TEST_LOCK)
                                                LDLM_LOCK_RELEASE(lock);
                                        else
                                                ldlm_lock_decref_internal(lock,
                                                                          mode);
                                        rc = 0;
                                        goto out2;
                                }
                        }

                        lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
                                               NULL, LWI_ON_SIGNAL_NOOP, NULL);

                        /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
                        l_wait_event(lock->l_waitq,
                                     lock->l_flags & wait_flags,
                                     &lwi);
                        if (!ldlm_is_lvb_ready(lock)) {
                                if (flags & LDLM_FL_TEST_LOCK)
                                        LDLM_LOCK_RELEASE(lock);
                                else
                                        ldlm_lock_decref_internal(lock, mode);
                                rc = 0;
                        }
                }
        }
 out2:
        if (rc) {
                LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[2] : policy->l_extent.start,
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[3] : policy->l_extent.end);

                /* check user's security context */
                if (lock->l_conn_export &&
                    sptlrpc_import_check_ctx(
                                class_exp2cliimp(lock->l_conn_export))) {
                        if (!(flags & LDLM_FL_TEST_LOCK))
                                ldlm_lock_decref_internal(lock, mode);
                        rc = 0;
                }

                if (flags & LDLM_FL_TEST_LOCK)
                        LDLM_LOCK_RELEASE(lock);

        } else if (!(flags & LDLM_FL_TEST_LOCK)) { /* less verbose for test-only */
                LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
                                  LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
                                  type, mode, res_id->name[0], res_id->name[1],
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[2] :policy->l_extent.start,
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[3] : policy->l_extent.end);
        }
        if (old_lock)
                LDLM_LOCK_PUT(old_lock);

        return rc ? mode : 0;
}
EXPORT_SYMBOL(ldlm_lock_match);
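
/*
 * Client-side usage sketch (added illustration; the ns/res_id variables are
 * assumed to be valid): probe the local namespace for a cached compatible
 * lock before enqueuing a new one.
 *
 * \code
 *      ldlm_policy_data_t policy = {
 *              .l_extent = { .start = 0, .end = OBD_OBJECT_EOF },
 *      };
 *      struct lustre_handle lockh;
 *      ldlm_mode_t mode;
 *
 *      mode = ldlm_lock_match(ns, LDLM_FL_LVB_READY, res_id, LDLM_EXTENT,
 *                             &policy, LCK_PR | LCK_PW, &lockh, 0);
 *      if (mode != 0) {
 *              // matched: lockh now holds an addref()ed lock
 *              ldlm_lock_decref(&lockh, mode);
 *      }
 * \endcode
 */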

ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh,
                                        __u64 *bits)
{
        struct ldlm_lock *lock;
        ldlm_mode_t mode = 0;
        ENTRY;

        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                if (LDLM_HAVE_MASK(lock, GONE))
                        GOTO(out, mode);

                if (ldlm_is_cbpending(lock) &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        GOTO(out, mode);

                if (bits)
                        *bits = lock->l_policy_data.l_inodebits.bits;
                mode = lock->l_granted_mode;
                ldlm_lock_addref_internal_nolock(lock, mode);
        }

        EXIT;

out:
        if (lock != NULL) {
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return mode;
}
EXPORT_SYMBOL(ldlm_revalidate_lock_handle);
1449
1450 /** The caller must guarantee that the buffer is large enough. */
1451 int ldlm_fill_lvb(struct ldlm_lock *lock, struct req_capsule *pill,
1452                   enum req_location loc, void *data, int size)
1453 {
1454         void *lvb;
1455         ENTRY;
1456
1457         LASSERT(data != NULL);
1458         LASSERT(size >= 0);
1459
1460         switch (lock->l_lvb_type) {
1461         case LVB_T_OST:
1462                 if (size == sizeof(struct ost_lvb)) {
1463                         if (loc == RCL_CLIENT)
1464                                 lvb = req_capsule_client_swab_get(pill,
1465                                                 &RMF_DLM_LVB,
1466                                                 lustre_swab_ost_lvb);
1467                         else
1468                                 lvb = req_capsule_server_swab_get(pill,
1469                                                 &RMF_DLM_LVB,
1470                                                 lustre_swab_ost_lvb);
1471                         if (unlikely(lvb == NULL)) {
1472                                 LDLM_ERROR(lock, "no LVB");
1473                                 RETURN(-EPROTO);
1474                         }
1475
1476                         memcpy(data, lvb, size);
1477                 } else if (size == sizeof(struct ost_lvb_v1)) {
1478                         struct ost_lvb *olvb = data;
1479
1480                         if (loc == RCL_CLIENT)
1481                                 lvb = req_capsule_client_swab_get(pill,
1482                                                 &RMF_DLM_LVB,
1483                                                 lustre_swab_ost_lvb_v1);
1484                         else
1485                                 lvb = req_capsule_server_sized_swab_get(pill,
1486                                                 &RMF_DLM_LVB, size,
1487                                                 lustre_swab_ost_lvb_v1);
1488                         if (unlikely(lvb == NULL)) {
1489                                 LDLM_ERROR(lock, "no LVB");
1490                                 RETURN(-EPROTO);
1491                         }
1492
1493                         memcpy(data, lvb, size);
1494                         olvb->lvb_mtime_ns = 0;
1495                         olvb->lvb_atime_ns = 0;
1496                         olvb->lvb_ctime_ns = 0;
1497                 } else {
1498                         LDLM_ERROR(lock, "Replied unexpected ost LVB size %d",
1499                                    size);
1500                         RETURN(-EINVAL);
1501                 }
1502                 break;
1503         case LVB_T_LQUOTA:
1504                 if (size == sizeof(struct lquota_lvb)) {
1505                         if (loc == RCL_CLIENT)
1506                                 lvb = req_capsule_client_swab_get(pill,
1507                                                 &RMF_DLM_LVB,
1508                                                 lustre_swab_lquota_lvb);
1509                         else
1510                                 lvb = req_capsule_server_swab_get(pill,
1511                                                 &RMF_DLM_LVB,
1512                                                 lustre_swab_lquota_lvb);
1513                         if (unlikely(lvb == NULL)) {
1514                                 LDLM_ERROR(lock, "no LVB");
1515                                 RETURN(-EPROTO);
1516                         }
1517
1518                         memcpy(data, lvb, size);
1519                 } else {
1520                         LDLM_ERROR(lock, "Replied unexpected lquota LVB size %d",
1521                                    size);
1522                         RETURN(-EINVAL);
1523                 }
1524                 break;
1525         case LVB_T_LAYOUT:
1526                 if (size == 0)
1527                         break;
1528
1529                 if (loc == RCL_CLIENT)
1530                         lvb = req_capsule_client_get(pill, &RMF_DLM_LVB);
1531                 else
1532                         lvb = req_capsule_server_get(pill, &RMF_DLM_LVB);
1533                 if (unlikely(lvb == NULL)) {
1534                         LDLM_ERROR(lock, "no LVB");
1535                         RETURN(-EPROTO);
1536                 }
1537
1538                 memcpy(data, lvb, size);
1539                 break;
1540         default:
1541                 LDLM_ERROR(lock, "Unknown LVB type: %d", lock->l_lvb_type);
1542                 libcfs_debug_dumpstack(NULL);
1543                 RETURN(-EINVAL);
1544         }
1545
1546         RETURN(0);
1547 }
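/*
 * Illustrative sketch (not part of the original file): unpacking an OST LVB
 * from a reply with ldlm_fill_lvb().  Passing sizeof(lvb) satisfies the
 * "buffer is large enough" contract for LVB_T_OST.  The helper name is
 * hypothetical; the block is compiled out.
 */
#if 0
static int example_unpack_ost_lvb(struct ldlm_lock *lock,
                                  struct req_capsule *pill)
{
        struct ost_lvb lvb;

        /* swabs and copies the reply LVB into the local buffer */
        return ldlm_fill_lvb(lock, pill, RCL_CLIENT, &lvb, sizeof(lvb));
}
#endif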
1548
1549 /**
1550  * Create and fill in a new LDLM lock with the specified properties.
1551  * Returns a referenced lock.
1552  */
1553 struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
1554                                    const struct ldlm_res_id *res_id,
1555                                    ldlm_type_t type,
1556                                    ldlm_mode_t mode,
1557                                    const struct ldlm_callback_suite *cbs,
1558                                    void *data, __u32 lvb_len,
1559                                    enum lvb_type lvb_type)
1560 {
1561         struct ldlm_lock        *lock;
1562         struct ldlm_resource    *res;
1563         int                     rc;
1564         ENTRY;
1565
1566         res = ldlm_resource_get(ns, NULL, res_id, type, 1);
1567         if (IS_ERR(res))
1568                 RETURN(ERR_CAST(res));
1569
1570         lock = ldlm_lock_new(res);
1571         if (lock == NULL)
1572                 RETURN(ERR_PTR(-ENOMEM));
1573
1574         lock->l_req_mode = mode;
1575         lock->l_ast_data = data;
1576         lock->l_pid = current_pid();
1577         if (ns_is_server(ns))
1578                 ldlm_set_ns_srv(lock);
1579         if (cbs) {
1580                 lock->l_blocking_ast = cbs->lcs_blocking;
1581                 lock->l_completion_ast = cbs->lcs_completion;
1582                 lock->l_glimpse_ast = cbs->lcs_glimpse;
1583         }
1584
1585         lock->l_tree_node = NULL;
1586         /* if this is an extent lock, allocate the interval tree node */
1587         if (type == LDLM_EXTENT)
1588                 if (ldlm_interval_alloc(lock) == NULL)
1589                         GOTO(out, rc = -ENOMEM);
1590
1591         if (lvb_len) {
1592                 lock->l_lvb_len = lvb_len;
1593                 OBD_ALLOC_LARGE(lock->l_lvb_data, lvb_len);
1594                 if (lock->l_lvb_data == NULL)
1595                         GOTO(out, rc = -ENOMEM);
1596         }
1597
1598         lock->l_lvb_type = lvb_type;
1599         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK))
1600                 GOTO(out, rc = -ENOENT);
1601
1602         RETURN(lock);
1603
1604 out:
1605         ldlm_lock_destroy(lock);
1606         LDLM_LOCK_RELEASE(lock);
1607         RETURN(ERR_PTR(rc));
1608 }
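/*
 * Illustrative sketch (not part of the original file): pairing
 * ldlm_lock_create() with ldlm_lock_enqueue() below.  The helper and its
 * callback parameters are hypothetical placeholders and error handling is
 * reduced to the essentials; real callers typically go through the
 * higher-level enqueue wrappers.  The block is compiled out.
 */
#if 0
static int example_create_and_enqueue(struct ldlm_namespace *ns,
                                      const struct ldlm_res_id *res_id,
                                      ldlm_blocking_callback blocking,
                                      ldlm_completion_callback completion)
{
        struct ldlm_callback_suite cbs = {
                .lcs_blocking   = blocking,
                .lcs_completion = completion,
        };
        struct ldlm_lock *lock;
        __u64 flags = 0;
        ldlm_error_t err;

        lock = ldlm_lock_create(ns, res_id, LDLM_PLAIN, LCK_EX, &cbs,
                                NULL, 0, LVB_T_NONE);
        if (IS_ERR(lock))
                return PTR_ERR(lock);

        err = ldlm_lock_enqueue(ns, &lock, NULL, &flags);
        /* ... on ELDLM_OK, check flags for LDLM_FL_BLOCK_* and wait for
         * the completion AST if the lock was not granted immediately ... */
        return err == ELDLM_OK ? 0 : -EIO;
}
#endif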
1609
1610 /**
1611  * Enqueue (request) a lock.
1612  *
1613  * Does not block. As a result of the enqueue the lock is put on the
1614  * granted or the waiting list.
1615  *
1616  * If the namespace has an intent policy set and the lock has the
1617  * LDLM_FL_HAS_INTENT flag set, skip all the enqueueing and delegate
1618  * lock processing to the intent policy function.
1619  */
1620 ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
1621                                struct ldlm_lock **lockp,
1622                                void *cookie, __u64 *flags)
1623 {
1624         struct ldlm_lock *lock = *lockp;
1625         struct ldlm_resource *res = lock->l_resource;
1626         int local = ns_is_client(ldlm_res_to_ns(res));
1627 #ifdef HAVE_SERVER_SUPPORT
1628         ldlm_processing_policy policy;
1629 #endif
1630         ldlm_error_t rc = ELDLM_OK;
1631         struct ldlm_interval *node = NULL;
1632         ENTRY;
1633
1634         /* policies are not executed on the client or during replay */
1635         if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
1636             && !local && ns->ns_policy) {
1637                 rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
1638                                    NULL);
1639                 if (rc == ELDLM_LOCK_REPLACED) {
1640                         /* The lock that was returned has already been granted,
1641                          * and placed into lockp.  If it's not the same as the
1642                          * one we passed in, then destroy the old one and our
1643                          * work here is done. */
1644                         if (lock != *lockp) {
1645                                 ldlm_lock_destroy(lock);
1646                                 LDLM_LOCK_RELEASE(lock);
1647                         }
1648                         *flags |= LDLM_FL_LOCK_CHANGED;
1649                         RETURN(0);
1650                 } else if (rc != ELDLM_OK ||
1651                            (rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
1652                         ldlm_lock_destroy(lock);
1653                         RETURN(rc);
1654                 }
1655         }
1656
1657         if (*flags & LDLM_FL_RESENT) {
1658                 /* Reconstruct LDLM_FL_SRV_ENQ_MASK @flags for the reply:
1659                  * always set LOCK_CHANGED; set BLOCK_GRANTED if the lock
1660                  * is not yet granted; take NO_TIMEOUT from the lock as it
1661                  * is inherited through LDLM_FL_INHERIT_MASK. */
1663                 *flags |= LDLM_FL_LOCK_CHANGED;
1664                 if (lock->l_req_mode != lock->l_granted_mode)
1665                         *flags |= LDLM_FL_BLOCK_GRANTED;
1666                 *flags |= lock->l_flags & LDLM_FL_NO_TIMEOUT;
1667                 RETURN(ELDLM_OK);
1668         }
1669
1670         /* A replaying lock might already be in the granted list, so
1671          * unlinking it would cause the interval node to be freed. We have
1672          * to allocate the interval node early, otherwise we can't regrant
1673          * this lock in the future. - jay */
1674         if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
1675                 OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
1676
1677         lock_res_and_lock(lock);
1678         if (local && lock->l_req_mode == lock->l_granted_mode) {
1679                 /* The server returned a blocked lock, but it was granted
1680                  * before we got a chance to actually enqueue it.  We don't
1681                  * need to do anything else. */
1682                 *flags &= ~LDLM_FL_BLOCKED_MASK;
1683                 GOTO(out, rc = ELDLM_OK);
1684         }
1685
1686         ldlm_resource_unlink_lock(lock);
1687         if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
1688                 if (node == NULL) {
1689                         ldlm_lock_destroy_nolock(lock);
1690                         GOTO(out, rc = -ENOMEM);
1691                 }
1692
1693                 INIT_LIST_HEAD(&node->li_group);
1694                 ldlm_interval_attach(node, lock);
1695                 node = NULL;
1696         }
1697
1698         /* Some flags from the enqueue want to make it into the AST, via the
1699          * lock's l_flags. */
1700         if (*flags & LDLM_FL_AST_DISCARD_DATA)
1701                 ldlm_set_ast_discard_data(lock);
1702         if (*flags & LDLM_FL_TEST_LOCK)
1703                 ldlm_set_test_lock(lock);
1704
1705         /* This distinction between local lock trees is very important; a client
1706          * namespace only has information about locks taken by that client, and
1707          * thus doesn't have enough information to decide for itself whether
1708          * a lock can be granted (below).  In this case, we do exactly what
1709          * the server tells us to do, as dictated by the 'flags'.
1710          *
1711          * We do exactly the same thing during recovery, when the server is
1712          * more or less trusting the clients not to lie.
1713          *
1714          * FIXME (bug 268): Detect obvious lies by checking compatibility in
1715          * granted/converting queues. */
1716         if (local) {
1717                 if (*flags & LDLM_FL_BLOCK_CONV)
1718                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1719                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
1720                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1721                 else
1722                         ldlm_grant_lock(lock, NULL);
1723                 GOTO(out, rc = ELDLM_OK);
1724 #ifdef HAVE_SERVER_SUPPORT
1725         } else if (*flags & LDLM_FL_REPLAY) {
1726                 if (*flags & LDLM_FL_BLOCK_CONV) {
1727                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1728                         GOTO(out, rc = ELDLM_OK);
1729                 } else if (*flags & LDLM_FL_BLOCK_WAIT) {
1730                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1731                         GOTO(out, rc = ELDLM_OK);
1732                 } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
1733                         ldlm_grant_lock(lock, NULL);
1734                         GOTO(out, rc = ELDLM_OK);
1735                 }
1736                 /* If no flags, fall through to normal enqueue path. */
1737         }
1738
1739         policy = ldlm_processing_policy_table[res->lr_type];
1740         policy(lock, flags, 1, &rc, NULL);
1741         GOTO(out, rc);
1742 #else
1743         } else {
1744                 CERROR("This is a client-side-only module, cannot handle "
1745                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
1746                 LBUG();
1747         }
1748 #endif
1749
1750 out:
1751         unlock_res_and_lock(lock);
1752         if (node)
1753                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
1754         return rc;
1755 }
1756
1757 #ifdef HAVE_SERVER_SUPPORT
1758 /**
1759  * Iterate through all waiting locks on a given resource queue and attempt to
1760  * grant them.
1761  *
1762  * Must be called with resource lock held.
1763  */
1764 int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
1765                          struct list_head *work_list)
1766 {
1767         struct list_head *tmp, *pos;
1768         ldlm_processing_policy policy;
1769         __u64 flags;
1770         int rc = LDLM_ITER_CONTINUE;
1771         ldlm_error_t err;
1772         ENTRY;
1773
1774         check_res_locked(res);
1775
1776         policy = ldlm_processing_policy_table[res->lr_type];
1777         LASSERT(policy);
1778
1779         list_for_each_safe(tmp, pos, queue) {
1780                 struct ldlm_lock *pending;
1781                 pending = list_entry(tmp, struct ldlm_lock, l_res_link);
1782
1783                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
1784
1785                 flags = 0;
1786                 rc = policy(pending, &flags, 0, &err, work_list);
1787                 if (rc != LDLM_ITER_CONTINUE)
1788                         break;
1789         }
1790
1791         RETURN(rc);
1792 }
1793 #endif
1794
1795 /**
1796  * Process a call to blocking AST callback for a lock in ast_work list
1797  */
1798 static int
1799 ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1800 {
1801         struct ldlm_cb_set_arg *arg = opaq;
1802         struct ldlm_lock_desc   d;
1803         int                     rc;
1804         struct ldlm_lock       *lock;
1805         ENTRY;
1806
1807         if (list_empty(arg->list))
1808                 RETURN(-ENOENT);
1809
1810         lock = list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);
1811
1812         /* nobody should touch l_bl_ast */
1813         lock_res_and_lock(lock);
1814         list_del_init(&lock->l_bl_ast);
1815
1816         LASSERT(ldlm_is_ast_sent(lock));
1817         LASSERT(lock->l_bl_ast_run == 0);
1818         LASSERT(lock->l_blocking_lock);
1819         lock->l_bl_ast_run++;
1820         unlock_res_and_lock(lock);
1821
1822         ldlm_lock2desc(lock->l_blocking_lock, &d);
1823
1824         rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
1825         LDLM_LOCK_RELEASE(lock->l_blocking_lock);
1826         lock->l_blocking_lock = NULL;
1827         LDLM_LOCK_RELEASE(lock);
1828
1829         RETURN(rc);
1830 }
1831
1832 /**
1833  * Process a call to completion AST callback for a lock in ast_work list
1834  */
1835 static int
1836 ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1837 {
1838         struct ldlm_cb_set_arg  *arg = opaq;
1839         int                      rc = 0;
1840         struct ldlm_lock        *lock;
1841         ldlm_completion_callback completion_callback;
1842         ENTRY;
1843
1844         if (list_empty(arg->list))
1845                 RETURN(-ENOENT);
1846
1847         lock = list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
1848
1849         /* It's possible to receive a completion AST before we've set
1850          * the l_completion_ast pointer: either because the AST arrived
1851          * before the reply, or simply because there's a small race
1852          * window between receiving the reply and finishing the local
1853          * enqueue. (bug 842)
1854          *
1855          * This can't happen with the blocking_ast, however, because we
1856          * will never call the local blocking_ast until we drop our
1857          * reader/writer reference, which we won't do until we get the
1858          * reply and finish enqueueing. */
1859
1860         /* nobody should touch l_cp_ast */
1861         lock_res_and_lock(lock);
1862         list_del_init(&lock->l_cp_ast);
1863         LASSERT(ldlm_is_cp_reqd(lock));
1864         /* save l_completion_ast since it can be changed by
1865          * mds_intent_policy(), see bug 14225 */
1866         completion_callback = lock->l_completion_ast;
1867         ldlm_clear_cp_reqd(lock);
1868         unlock_res_and_lock(lock);
1869
1870         if (completion_callback != NULL)
1871                 rc = completion_callback(lock, 0, (void *)arg);
1872         LDLM_LOCK_RELEASE(lock);
1873
1874         RETURN(rc);
1875 }
1876
1877 /**
1878  * Process a call to revocation AST callback for a lock in ast_work list
1879  */
1880 static int
1881 ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1882 {
1883         struct ldlm_cb_set_arg *arg = opaq;
1884         struct ldlm_lock_desc   desc;
1885         int                     rc;
1886         struct ldlm_lock       *lock;
1887         ENTRY;
1888
1889         if (list_empty(arg->list))
1890                 RETURN(-ENOENT);
1891
1892         lock = list_entry(arg->list->next, struct ldlm_lock, l_rk_ast);
1893         list_del_init(&lock->l_rk_ast);
1894
1895         /* the desc just pretends to be exclusive */
1896         ldlm_lock2desc(lock, &desc);
1897         desc.l_req_mode = LCK_EX;
1898         desc.l_granted_mode = 0;
1899
1900         rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
1901         LDLM_LOCK_RELEASE(lock);
1902
1903         RETURN(rc);
1904 }
1905
1906 /**
1907  * Process a call to glimpse AST callback for a lock in ast_work list
1908  */
1909 int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1910 {
1911         struct ldlm_cb_set_arg          *arg = opaq;
1912         struct ldlm_glimpse_work        *gl_work;
1913         struct ldlm_lock                *lock;
1914         int                              rc = 0;
1915         ENTRY;
1916
1917         if (list_empty(arg->list))
1918                 RETURN(-ENOENT);
1919
1920         gl_work = list_entry(arg->list->next, struct ldlm_glimpse_work,
1921                                  gl_list);
1922         list_del_init(&gl_work->gl_list);
1923
1924         lock = gl_work->gl_lock;
1925
1926         /* transfer the glimpse descriptor to ldlm_cb_set_arg */
1927         arg->gl_desc = gl_work->gl_desc;
1928
1929         /* invoke the actual glimpse callback */
1930         if (lock->l_glimpse_ast(lock, (void*)arg) == 0)
1931                 rc = 1;
1932
1933         LDLM_LOCK_RELEASE(lock);
1934
1935         if ((gl_work->gl_flags & LDLM_GL_WORK_NOFREE) == 0)
1936                 OBD_FREE_PTR(gl_work);
1937
1938         RETURN(rc);
1939 }
1940
1941 /**
1942  * Process list of locks in need of ASTs being sent.
1943  *
1944  * Used on server to send multiple ASTs together instead of sending one by
1945  * one.
1946  */
1947 int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
1948                       ldlm_desc_ast_t ast_type)
1949 {
1950         struct ldlm_cb_set_arg *arg;
1951         set_producer_func       work_ast_lock;
1952         int                     rc;
1953
1954         if (list_empty(rpc_list))
1955                 RETURN(0);
1956
1957         OBD_ALLOC_PTR(arg);
1958         if (arg == NULL)
1959                 RETURN(-ENOMEM);
1960
1961         atomic_set(&arg->restart, 0);
1962         arg->list = rpc_list;
1963
1964         switch (ast_type) {
1965                 case LDLM_WORK_BL_AST:
1966                         arg->type = LDLM_BL_CALLBACK;
1967                         work_ast_lock = ldlm_work_bl_ast_lock;
1968                         break;
1969                 case LDLM_WORK_CP_AST:
1970                         arg->type = LDLM_CP_CALLBACK;
1971                         work_ast_lock = ldlm_work_cp_ast_lock;
1972                         break;
1973                 case LDLM_WORK_REVOKE_AST:
1974                         arg->type = LDLM_BL_CALLBACK;
1975                         work_ast_lock = ldlm_work_revoke_ast_lock;
1976                         break;
1977                 case LDLM_WORK_GL_AST:
1978                         arg->type = LDLM_GL_CALLBACK;
1979                         work_ast_lock = ldlm_work_gl_ast_lock;
1980                         break;
1981                 default:
1982                         LBUG();
1983         }
1984
1985         /* We create a ptlrpc request set with the flow control extension.
1986          * This request set will use the work_ast_lock function to produce new
1987          * requests and will send a new request each time one completes, in
1988          * order to keep the number of requests in flight at ns_max_parallel_ast */
1989         arg->set = ptlrpc_prep_fcset(ns->ns_max_parallel_ast ? : UINT_MAX,
1990                                      work_ast_lock, arg);
1991         if (arg->set == NULL)
1992                 GOTO(out, rc = -ENOMEM);
1993
1994         ptlrpc_set_wait(arg->set);
1995         ptlrpc_set_destroy(arg->set);
1996
1997         rc = atomic_read(&arg->restart) ? -ERESTART : 0;
1998         GOTO(out, rc);
1999 out:
2000         OBD_FREE_PTR(arg);
2001         return rc;
2002 }
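/*
 * Illustrative sketch (not part of the original file): how a caller handles
 * the -ERESTART result from ldlm_run_ast_work(), mirroring what
 * ldlm_reprocess_all() below does.  The helper name is hypothetical; the
 * block is compiled out.
 */
#if 0
static int example_send_cp_asts(struct ldlm_namespace *ns,
                                struct list_head *rpc_list)
{
        int rc;

        rc = ldlm_run_ast_work(ns, rpc_list, LDLM_WORK_CP_AST);
        if (rc == -ERESTART) {
                /* Some ASTs need to be resent; rpc_list has been consumed,
                 * so the caller is expected to rebuild it (for example by
                 * reprocessing the resource queues) and call again. */
        }
        return rc;
}
#endif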
2003
2004 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
2005 {
2006         ldlm_reprocess_all(res);
2007         return LDLM_ITER_CONTINUE;
2008 }
2009
2010 static int ldlm_reprocess_res(cfs_hash_t *hs, cfs_hash_bd_t *bd,
2011                               struct hlist_node *hnode, void *arg)
2012 {
2013         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2014         int    rc;
2015
2016         rc = reprocess_one_queue(res, arg);
2017
2018         return rc == LDLM_ITER_STOP;
2019 }
2020
2021 /**
2022  * Iterate through all resources on a namespace attempting to grant waiting
2023  * locks.
2024  */
2025 void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
2026 {
2027         ENTRY;
2028
2029         if (ns != NULL) {
2030                 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2031                                          ldlm_reprocess_res, NULL);
2032         }
2033         EXIT;
2034 }
2035 EXPORT_SYMBOL(ldlm_reprocess_all_ns);
2036
2037 /**
2038  * Try to grant all waiting locks on a resource.
2039  *
2040  * Calls ldlm_reprocess_queue on converting and waiting queues.
2041  *
2042  * Typically called after some resource locks are cancelled to see
2043  * if anything could be granted as a result of the cancellation.
2044  */
2045 void ldlm_reprocess_all(struct ldlm_resource *res)
2046 {
2047         struct list_head rpc_list;
2048 #ifdef HAVE_SERVER_SUPPORT
2049         int rc;
2050         ENTRY;
2051
2052         INIT_LIST_HEAD(&rpc_list);
2053         /* Local lock trees don't get reprocessed. */
2054         if (ns_is_client(ldlm_res_to_ns(res))) {
2055                 EXIT;
2056                 return;
2057         }
2058
2059 restart:
2060         lock_res(res);
2061         rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
2062         if (rc == LDLM_ITER_CONTINUE)
2063                 ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
2064         unlock_res(res);
2065
2066         rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
2067                                LDLM_WORK_CP_AST);
2068         if (rc == -ERESTART) {
2069                 LASSERT(list_empty(&rpc_list));
2070                 goto restart;
2071         }
2072 #else
2073         ENTRY;
2074
2075         INIT_LIST_HEAD(&rpc_list);
2076         if (!ns_is_client(ldlm_res_to_ns(res))) {
2077                 CERROR("This is a client-side-only module, cannot handle "
2078                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
2079                 LBUG();
2080         }
2081 #endif
2082         EXIT;
2083 }
2084 EXPORT_SYMBOL(ldlm_reprocess_all);
2085
2086 /**
2087  * Helper function to call blocking AST for LDLM lock \a lock in a
2088  * "cancelling" mode.
2089  */
2090 void ldlm_cancel_callback(struct ldlm_lock *lock)
2091 {
2092         check_res_locked(lock->l_resource);
2093         if (!ldlm_is_cancel(lock)) {
2094                 ldlm_set_cancel(lock);
2095                 if (lock->l_blocking_ast) {
2096                         unlock_res_and_lock(lock);
2097                         lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
2098                                              LDLM_CB_CANCELING);
2099                         lock_res_and_lock(lock);
2100                 } else {
2101                         LDLM_DEBUG(lock, "no blocking ast");
2102                 }
2103         }
2104         ldlm_set_bl_done(lock);
2105 }
2106
2107 /**
2108  * Remove skiplist-enabled LDLM lock \a req from granted list
2109  */
2110 void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
2111 {
2112         if (req->l_resource->lr_type != LDLM_PLAIN &&
2113             req->l_resource->lr_type != LDLM_IBITS)
2114                 return;
2115
2116         list_del_init(&req->l_sl_policy);
2117         list_del_init(&req->l_sl_mode);
2118 }
2119
2120 /**
2121  * Attempts to cancel LDLM lock \a lock that has no reader/writer references.
2122  */
2123 void ldlm_lock_cancel(struct ldlm_lock *lock)
2124 {
2125         struct ldlm_resource *res;
2126         struct ldlm_namespace *ns;
2127         ENTRY;
2128
2129         lock_res_and_lock(lock);
2130
2131         res = lock->l_resource;
2132         ns  = ldlm_res_to_ns(res);
2133
2134         /* Please do not, no matter how tempting, remove this LBUG without
2135          * talking to me first. -phik */
2136         if (lock->l_readers || lock->l_writers) {
2137                 LDLM_ERROR(lock, "lock still has references");
2138                 LBUG();
2139         }
2140
2141         if (ldlm_is_waited(lock))
2142                 ldlm_del_waiting_lock(lock);
2143
2144         /* Run the cancel callback; it may temporarily drop the res lock. */
2145         ldlm_cancel_callback(lock);
2146
2147         /* Yes, check a second time, in case the lock was re-added while we
2148          * were running without the res lock in ldlm_cancel_callback */
2149         if (ldlm_is_waited(lock))
2150                 ldlm_del_waiting_lock(lock);
2151
2152         ldlm_resource_unlink_lock(lock);
2153         ldlm_lock_destroy_nolock(lock);
2154
2155         if (lock->l_granted_mode == lock->l_req_mode)
2156                 ldlm_pool_del(&ns->ns_pool, lock);
2157
2158         /* Make sure we will not be called again for the same lock, which is
2159          * possible unless lock->l_granted_mode is zeroed out */
2160         lock->l_granted_mode = LCK_MINMODE;
2161         unlock_res_and_lock(lock);
2162
2163         EXIT;
2164 }
2165 EXPORT_SYMBOL(ldlm_lock_cancel);
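/*
 * Illustrative sketch (not part of the original file): local cancellation of
 * a lock by handle.  ldlm_lock_cancel() LBUGs if l_readers/l_writers are
 * non-zero, so all references taken with ldlm_lock_addref() must have been
 * dropped first.  The helper name is hypothetical and real cancellation
 * paths do additional work; the block is compiled out.
 */
#if 0
static void example_cancel_local_lock(struct lustre_handle *lockh)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (lock == NULL)
                return;

        ldlm_lock_cancel(lock);
        LDLM_LOCK_PUT(lock); /* drop the reference from ldlm_handle2lock */
}
#endif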
2166
2167 /**
2168  * Set opaque data into the lock that only makes sense to upper layer.
2169  */
2170 int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
2171 {
2172         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2173         int rc = -EINVAL;
2174         ENTRY;
2175
2176         if (lock) {
2177                 if (lock->l_ast_data == NULL)
2178                         lock->l_ast_data = data;
2179                 if (lock->l_ast_data == data)
2180                         rc = 0;
2181                 LDLM_LOCK_PUT(lock);
2182         }
2183         RETURN(rc);
2184 }
2185 EXPORT_SYMBOL(ldlm_lock_set_data);
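/*
 * Illustrative sketch (not part of the original file): the set-once
 * semantics of ldlm_lock_set_data().  The helper name is hypothetical; the
 * block is compiled out.
 */
#if 0
static int example_attach_ast_data(struct lustre_handle *lockh, void *cookie)
{
        /* Returns 0 only if l_ast_data was NULL or already equal to cookie,
         * so a racing setter with a different cookie gets -EINVAL. */
        return ldlm_lock_set_data(lockh, cookie);
}
#endif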
2186
2187 struct export_cl_data {
2188         struct obd_export       *ecl_exp;
2189         int                     ecl_loop;
2190 };
2191
2192 /**
2193  * Iterator function for ldlm_cancel_locks_for_export.
2194  * Cancels passed locks.
2195  */
2196 static int ldlm_cancel_locks_for_export_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
2197                                            struct hlist_node *hnode, void *data)
2198 {
2200         struct export_cl_data   *ecl = (struct export_cl_data *)data;
2201         struct obd_export       *exp  = ecl->ecl_exp;
2202         struct ldlm_lock     *lock = cfs_hash_object(hs, hnode);
2203         struct ldlm_resource *res;
2204
2205         res = ldlm_resource_getref(lock->l_resource);
2206         LDLM_LOCK_GET(lock);
2207
2208         LDLM_DEBUG(lock, "export %p", exp);
2209         ldlm_res_lvbo_update(res, NULL, 1);
2210         ldlm_lock_cancel(lock);
2211         ldlm_reprocess_all(res);
2212         ldlm_resource_putref(res);
2213         LDLM_LOCK_RELEASE(lock);
2214
2215         ecl->ecl_loop++;
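        /* log only on power-of-two iterations to limit console noise */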
2216         if ((ecl->ecl_loop & -ecl->ecl_loop) == ecl->ecl_loop) {
2217                 CDEBUG(D_INFO,
2218                        "Cancel lock %p for export %p (loop %d), still have "
2219                        "%d locks left on hash table.\n",
2220                        lock, exp, ecl->ecl_loop,
2221                        atomic_read(&hs->hs_count));
2222         }
2223
2224         return 0;
2225 }
2226
2227 /**
2228  * Cancel all locks for given export.
2229  *
2230  * Typically called on client disconnection/eviction
2231  */
2232 void ldlm_cancel_locks_for_export(struct obd_export *exp)
2233 {
2234         struct export_cl_data   ecl = {
2235                 .ecl_exp        = exp,
2236                 .ecl_loop       = 0,
2237         };
2238
2239         cfs_hash_for_each_empty(exp->exp_lock_hash,
2240                                 ldlm_cancel_locks_for_export_cb, &ecl);
2241 }
2242
2243 /**
2244  * Downgrade an exclusive lock.
2245  *
2246  * A fast variant of ldlm_lock_convert for conversion of exclusive
2247  * locks. The conversion is always successful.
2248  * Used by Commit on Sharing (COS) code.
2249  *
2250  * \param lock A lock to convert
2251  * \param new_mode new lock mode
2252  */
2253 void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode)
2254 {
2255         ENTRY;
2256
2257         LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
2258         LASSERT(new_mode == LCK_COS);
2259
2260         lock_res_and_lock(lock);
2261         ldlm_resource_unlink_lock(lock);
2262         /*
2263          * Remove the lock from pool as it will be added again in
2264          * ldlm_grant_lock() called below.
2265          */
2266         ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
2267
2268         lock->l_req_mode = new_mode;
2269         ldlm_grant_lock(lock, NULL);
2270         unlock_res_and_lock(lock);
2271         ldlm_reprocess_all(lock->l_resource);
2272
2273         EXIT;
2274 }
2275 EXPORT_SYMBOL(ldlm_lock_downgrade);
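/*
 * Illustrative sketch (not part of the original file): the COS downgrade
 * call.  Only a granted PW/EX lock may be downgraded, and only to LCK_COS
 * (both enforced by the LASSERTs in ldlm_lock_downgrade() above).  The
 * helper name is hypothetical; the block is compiled out.
 */
#if 0
static void example_downgrade_to_cos(struct ldlm_lock *lock)
{
        ldlm_lock_downgrade(lock, LCK_COS);
}
#endif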
2276
2277 /**
2278  * Attempt to convert already granted lock to a different mode.
2279  *
2280  * While lock conversion is not currently used, future client-side
2281  * optimizations could take advantage of it to avoid discarding cached
2282  * pages on a file.
2283  */
2284 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
2285                                         __u32 *flags)
2286 {
2287         struct list_head rpc_list;
2288         struct ldlm_resource *res;
2289         struct ldlm_namespace *ns;
2290         int granted = 0;
2291 #ifdef HAVE_SERVER_SUPPORT
2292         int old_mode;
2293         struct sl_insert_point prev;
2294 #endif
2295         struct ldlm_interval *node;
2296         ENTRY;
2297
2298         INIT_LIST_HEAD(&rpc_list);
2299         /* Just return if mode is unchanged. */
2300         if (new_mode == lock->l_granted_mode) {
2301                 *flags |= LDLM_FL_BLOCK_GRANTED;
2302                 RETURN(lock->l_resource);
2303         }
2304
2305         /* We can't check the lock type here because the lock's bitlock is
2306          * not held, so do the allocation blindly. -jay */
2307         OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
2308         if (node == NULL)  /* Actually, this causes EDEADLOCK to be returned */
2309                 RETURN(NULL);
2310
2311         LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR),
2312                  "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);
2313
2314         lock_res_and_lock(lock);
2315
2316         res = lock->l_resource;
2317         ns  = ldlm_res_to_ns(res);
2318
2319 #ifdef HAVE_SERVER_SUPPORT
2320         old_mode = lock->l_req_mode;
2321 #endif
2322         lock->l_req_mode = new_mode;
2323         if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
2324 #ifdef HAVE_SERVER_SUPPORT
2325                 /* remember the lock position where the lock might be
2326                  * added back to the granted list later and also
2327                  * remember the join mode for skiplist fixing. */
2328                 prev.res_link = lock->l_res_link.prev;
2329                 prev.mode_link = lock->l_sl_mode.prev;
2330                 prev.policy_link = lock->l_sl_policy.prev;
2331 #endif
2332                 ldlm_resource_unlink_lock(lock);
2333         } else {
2334                 ldlm_resource_unlink_lock(lock);
2335                 if (res->lr_type == LDLM_EXTENT) {
2336                         /* FIXME: ugly code; we have to attach the lock to an
2337                          * interval node again since it may be granted soon */
2339                         INIT_LIST_HEAD(&node->li_group);
2340                         ldlm_interval_attach(node, lock);
2341                         node = NULL;
2342                 }
2343         }
2344
2345         /*
2346          * Remove old lock from the pool before adding the lock with new
2347          * mode below in ->policy()
2348          */
2349         ldlm_pool_del(&ns->ns_pool, lock);
2350
2351         /* If this is a local resource, put it on the appropriate list. */
2352         if (ns_is_client(ldlm_res_to_ns(res))) {
2353                 if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
2354                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
2355                 } else {
2356                         /* This should never happen, because of the way the
2357                          * server handles conversions. */
2358                         LDLM_ERROR(lock, "Erroneous flags %x on local lock",
2359                                    *flags);
2360                         LBUG();
2361
2362                         ldlm_grant_lock(lock, &rpc_list);
2363                         granted = 1;
2364                         /* FIXME: completion handling not with lr_lock held ! */
2365                         if (lock->l_completion_ast)
2366                                 lock->l_completion_ast(lock, 0, NULL);
2367                 }
2368 #ifdef HAVE_SERVER_SUPPORT
2369         } else {
2370                 int rc;
2371                 ldlm_error_t err;
2372                 __u64 pflags = 0;
2373                 ldlm_processing_policy policy;
2374                 policy = ldlm_processing_policy_table[res->lr_type];
2375                 rc = policy(lock, &pflags, 0, &err, &rpc_list);
2376                 if (rc == LDLM_ITER_STOP) {
2377                         lock->l_req_mode = old_mode;
2378                         if (res->lr_type == LDLM_EXTENT)
2379                                 ldlm_extent_add_lock(res, lock);
2380                         else
2381                                 ldlm_granted_list_add_lock(lock, &prev);
2382
2383                         res = NULL;
2384                 } else {
2385                         *flags |= LDLM_FL_BLOCK_GRANTED;
2386                         granted = 1;
2387                 }
2388         }
2389 #else
2390         } else {
2391                 CERROR("This is a client-side-only module, cannot handle "
2392                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
2393                 LBUG();
2394         }
2395 #endif
2396         unlock_res_and_lock(lock);
2397
2398         if (granted)
2399                 ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
2400         if (node)
2401                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
2402         RETURN(res);
2403 }
2404 EXPORT_SYMBOL(ldlm_lock_convert);
2405
2406 /**
2407  * Print lock with lock handle \a lockh description into debug log.
2408  *
2409  * Used when printing all locks on a resource for debug purposes.
2410  */
2411 void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
2412 {
2413         struct ldlm_lock *lock;
2414
2415         if (!((libcfs_debug | D_ERROR) & level))
2416                 return;
2417
2418         lock = ldlm_handle2lock(lockh);
2419         if (lock == NULL)
2420                 return;
2421
2422         LDLM_DEBUG_LIMIT(level, lock, "###");
2423
2424         LDLM_LOCK_PUT(lock);
2425 }
2426 EXPORT_SYMBOL(ldlm_lock_dump_handle);
2427
2428 /**
2429  * Print lock information with custom message into debug log.
2430  * Helper function.
2431  */
2432 void _ldlm_lock_debug(struct ldlm_lock *lock,
2433                       struct libcfs_debug_msg_data *msgdata,
2434                       const char *fmt, ...)
2435 {
2436         va_list args;
2437         struct obd_export *exp = lock->l_export;
2438         struct ldlm_resource *resource = lock->l_resource;
2439         char *nid = "local";
2440
2441         va_start(args, fmt);
2442
2443         if (exp && exp->exp_connection) {
2444                 nid = libcfs_nid2str(exp->exp_connection->c_peer.nid);
2445         } else if (exp && exp->exp_obd != NULL) {
2446                 struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
2447                 nid = libcfs_nid2str(imp->imp_connection->c_peer.nid);
2448         }
2449
2450         if (resource == NULL) {
2451                 libcfs_debug_vmsg2(msgdata, fmt, args,
2452                        " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2453                        "res: \?\? rrc=\?\? type: \?\?\? flags: "LPX64" nid: %s "
2454                        "remote: "LPX64" expref: %d pid: %u timeout: %lu "
2455                        "lvb_type: %d\n",
2456                        lock,
2457                        lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2458                        lock->l_readers, lock->l_writers,
2459                        ldlm_lockname[lock->l_granted_mode],
2460                        ldlm_lockname[lock->l_req_mode],
2461                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2462                        exp ? atomic_read(&exp->exp_refcount) : -99,
2463                        lock->l_pid, lock->l_callback_timeout, lock->l_lvb_type);
2464                 va_end(args);
2465                 return;
2466         }
2467
2468         switch (resource->lr_type) {
2469         case LDLM_EXTENT:
2470                 libcfs_debug_vmsg2(msgdata, fmt, args,
2471                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2472                         "res: "DLDLMRES" rrc: %d type: %s ["LPU64"->"LPU64"] "
2473                         "(req "LPU64"->"LPU64") flags: "LPX64" nid: %s remote: "
2474                         LPX64" expref: %d pid: %u timeout: %lu lvb_type: %d\n",
2475                         ldlm_lock_to_ns_name(lock), lock,
2476                         lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2477                         lock->l_readers, lock->l_writers,
2478                         ldlm_lockname[lock->l_granted_mode],
2479                         ldlm_lockname[lock->l_req_mode],
2480                         PLDLMRES(resource),
2481                         atomic_read(&resource->lr_refcount),
2482                         ldlm_typename[resource->lr_type],
2483                         lock->l_policy_data.l_extent.start,
2484                         lock->l_policy_data.l_extent.end,
2485                         lock->l_req_extent.start, lock->l_req_extent.end,
2486                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2487                         exp ? atomic_read(&exp->exp_refcount) : -99,
2488                         lock->l_pid, lock->l_callback_timeout,
2489                         lock->l_lvb_type);
2490                 break;
2491
2492         case LDLM_FLOCK:
2493                 libcfs_debug_vmsg2(msgdata, fmt, args,
2494                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2495                         "res: "DLDLMRES" rrc: %d type: %s pid: %d "
2496                         "["LPU64"->"LPU64"] flags: "LPX64" nid: %s "
2497                         "remote: "LPX64" expref: %d pid: %u timeout: %lu\n",
2498                         ldlm_lock_to_ns_name(lock), lock,
2499                         lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2500                         lock->l_readers, lock->l_writers,
2501                         ldlm_lockname[lock->l_granted_mode],
2502                         ldlm_lockname[lock->l_req_mode],
2503                         PLDLMRES(resource),
2504                         atomic_read(&resource->lr_refcount),
2505                         ldlm_typename[resource->lr_type],
2506                         lock->l_policy_data.l_flock.pid,
2507                         lock->l_policy_data.l_flock.start,
2508                         lock->l_policy_data.l_flock.end,
2509                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2510                         exp ? atomic_read(&exp->exp_refcount) : -99,
2511                         lock->l_pid, lock->l_callback_timeout);
2512                 break;
2513
2514         case LDLM_IBITS:
2515                 libcfs_debug_vmsg2(msgdata, fmt, args,
2516                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2517                         "res: "DLDLMRES" bits "LPX64" rrc: %d type: %s "
2518                         "flags: "LPX64" nid: %s remote: "LPX64" expref: %d "
2519                         "pid: %u timeout: %lu lvb_type: %d\n",
2520                         ldlm_lock_to_ns_name(lock),
2521                         lock, lock->l_handle.h_cookie,
2522                         atomic_read(&lock->l_refc),
2523                         lock->l_readers, lock->l_writers,
2524                         ldlm_lockname[lock->l_granted_mode],
2525                         ldlm_lockname[lock->l_req_mode],
2526                         PLDLMRES(resource),
2527                         lock->l_policy_data.l_inodebits.bits,
2528                         atomic_read(&resource->lr_refcount),
2529                         ldlm_typename[resource->lr_type],
2530                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2531                         exp ? atomic_read(&exp->exp_refcount) : -99,
2532                         lock->l_pid, lock->l_callback_timeout,
2533                         lock->l_lvb_type);
2534                 break;
2535
2536         default:
2537                 libcfs_debug_vmsg2(msgdata, fmt, args,
2538                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2539                         "res: "DLDLMRES" rrc: %d type: %s flags: "LPX64" "
2540                         "nid: %s remote: "LPX64" expref: %d pid: %u "
2541                         "timeout: %lu lvb_type: %d\n",
2542                         ldlm_lock_to_ns_name(lock),
2543                         lock, lock->l_handle.h_cookie,
2544                         atomic_read(&lock->l_refc),
2545                         lock->l_readers, lock->l_writers,
2546                         ldlm_lockname[lock->l_granted_mode],
2547                         ldlm_lockname[lock->l_req_mode],
2548                         PLDLMRES(resource),
2549                         atomic_read(&resource->lr_refcount),
2550                         ldlm_typename[resource->lr_type],
2551                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2552                         exp ? atomic_read(&exp->exp_refcount) : -99,
2553                         lock->l_pid, lock->l_callback_timeout,
2554                         lock->l_lvb_type);
2555                 break;
2556         }
2557         va_end(args);
2558 }
2559 EXPORT_SYMBOL(_ldlm_lock_debug);