/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2013, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_lock.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM

#include <libcfs/libcfs.h>
#include <obd_class.h>
#include "ldlm_internal.h"

/* lock types */
char *ldlm_lockname[] = {
        [0] = "--",
        [LCK_EX] = "EX",
        [LCK_PW] = "PW",
        [LCK_PR] = "PR",
        [LCK_CW] = "CW",
        [LCK_CR] = "CR",
        [LCK_NL] = "NL",
        [LCK_GROUP] = "GROUP",
        [LCK_COS] = "COS"
};
EXPORT_SYMBOL(ldlm_lockname);

char *ldlm_typename[] = {
        [LDLM_PLAIN] = "PLN",
        [LDLM_EXTENT] = "EXT",
        [LDLM_FLOCK] = "FLK",
        [LDLM_IBITS] = "IBT",
};
EXPORT_SYMBOL(ldlm_typename);

static ldlm_policy_wire_to_local_t ldlm_policy_wire18_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE]  = ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE]  = ldlm_flock_policy_wire18_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE]  = ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_wire_to_local_t ldlm_policy_wire21_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE]  = ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE]  = ldlm_flock_policy_wire21_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE]  = ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_local_to_wire_t ldlm_policy_local_to_wire[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE]  = ldlm_plain_policy_local_to_wire,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_local_to_wire,
        [LDLM_FLOCK - LDLM_MIN_TYPE]  = ldlm_flock_policy_local_to_wire,
        [LDLM_IBITS - LDLM_MIN_TYPE]  = ldlm_ibits_policy_local_to_wire,
};

/**
 * Converts lock policy from local format to on the wire lock_desc format
 */
void ldlm_convert_policy_to_wire(ldlm_type_t type,
                                 const ldlm_policy_data_t *lpolicy,
                                 ldlm_wire_policy_data_t *wpolicy)
{
        ldlm_policy_local_to_wire_t convert;

        convert = ldlm_policy_local_to_wire[type - LDLM_MIN_TYPE];

        convert(lpolicy, wpolicy);
}

/**
 * Converts lock policy from on the wire lock_desc format to local format
 */
void ldlm_convert_policy_to_local(struct obd_export *exp, ldlm_type_t type,
                                  const ldlm_wire_policy_data_t *wpolicy,
                                  ldlm_policy_data_t *lpolicy)
{
        ldlm_policy_wire_to_local_t convert;
        int new_client;

        /** some badness for 2.0.0 clients, but 2.0.0 isn't supported */
        new_client = (exp_connect_flags(exp) & OBD_CONNECT_FULL20) != 0;
        if (new_client)
                convert = ldlm_policy_wire21_to_local[type - LDLM_MIN_TYPE];
        else
                convert = ldlm_policy_wire18_to_local[type - LDLM_MIN_TYPE];

        convert(wpolicy, lpolicy);
}
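
/*
 * Illustrative sketch, not part of the original file: a round trip
 * through the two converters above for an extent lock.  The function
 * name and the extent values are made up for demonstration; the block
 * is kept compiled out, like the other #if 0 block in this file.
 */
#if 0
static void ldlm_policy_conversion_example(struct obd_export *exp)
{
        ldlm_policy_data_t local = { .l_extent = { .start = 0,
                                                   .end = OBD_OBJECT_EOF } };
        ldlm_wire_policy_data_t wire;
        ldlm_policy_data_t back;

        /* local -> wire, as done when packing a lock_desc */
        ldlm_convert_policy_to_wire(LDLM_EXTENT, &local, &wire);
        /* wire -> local; which table is used depends on the peer's
         * OBD_CONNECT_FULL20 connect flag, as seen above */
        ldlm_convert_policy_to_local(exp, LDLM_EXTENT, &wire, &back);
}
#endif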
124
125 char *ldlm_it2str(int it)
126 {
127         switch (it) {
128         case IT_OPEN:
129                 return "open";
130         case IT_CREAT:
131                 return "creat";
132         case (IT_OPEN | IT_CREAT):
133                 return "open|creat";
134         case IT_READDIR:
135                 return "readdir";
136         case IT_GETATTR:
137                 return "getattr";
138         case IT_LOOKUP:
139                 return "lookup";
140         case IT_UNLINK:
141                 return "unlink";
142         case IT_GETXATTR:
143                 return "getxattr";
144         case IT_LAYOUT:
145                 return "layout";
146         default:
147                 CERROR("Unknown intent %d\n", it);
148                 return "UNKNOWN";
149         }
150 }
151 EXPORT_SYMBOL(ldlm_it2str);
152
153 extern struct kmem_cache *ldlm_lock_slab;
154
155 #ifdef HAVE_SERVER_SUPPORT
156 static ldlm_processing_policy ldlm_processing_policy_table[] = {
157         [LDLM_PLAIN]    = ldlm_process_plain_lock,
158         [LDLM_EXTENT]   = ldlm_process_extent_lock,
159         [LDLM_FLOCK]    = ldlm_process_flock_lock,
160         [LDLM_IBITS]    = ldlm_process_inodebits_lock,
161 };
162
163 ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
164 {
165         return ldlm_processing_policy_table[res->lr_type];
166 }
167 EXPORT_SYMBOL(ldlm_get_processing_policy);
168 #endif /* HAVE_SERVER_SUPPORT */
169
170 void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
171 {
172         ns->ns_policy = arg;
173 }
174 EXPORT_SYMBOL(ldlm_register_intent);
175
176 /*
177  * REFCOUNTED LOCK OBJECTS
178  */
179
180
181 /**
182  * Get a reference on a lock.
183  *
184  * Lock refcounts, during creation:
185  *   - one special one for allocation, dec'd only once in destroy
186  *   - one for being a lock that's in-use
187  *   - one for the addref associated with a new lock
188  */
189 struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
190 {
191         atomic_inc(&lock->l_refc);
192         return lock;
193 }
194 EXPORT_SYMBOL(ldlm_lock_get);
195
196 /**
197  * Release lock reference.
198  *
199  * Also frees the lock if it was last reference.
200  */
201 void ldlm_lock_put(struct ldlm_lock *lock)
202 {
203         ENTRY;
204
205         LASSERT(lock->l_resource != LP_POISON);
206         LASSERT(atomic_read(&lock->l_refc) > 0);
207         if (atomic_dec_and_test(&lock->l_refc)) {
208                 struct ldlm_resource *res;
209
210                 LDLM_DEBUG(lock,
211                            "final lock_put on destroyed lock, freeing it.");
212
213                 res = lock->l_resource;
214                 LASSERT(ldlm_is_destroyed(lock));
215                 LASSERT(list_empty(&lock->l_res_link));
216                 LASSERT(list_empty(&lock->l_pending_chain));
217
218                 lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
219                                      LDLM_NSS_LOCKS);
220                 lu_ref_del(&res->lr_reference, "lock", lock);
221                 ldlm_resource_putref(res);
222                 lock->l_resource = NULL;
223                 if (lock->l_export) {
224                         class_export_lock_put(lock->l_export, lock);
225                         lock->l_export = NULL;
226                 }
227
228                 if (lock->l_lvb_data != NULL)
229                         OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len);
230
231                 ldlm_interval_free(ldlm_interval_detach(lock));
232                 lu_ref_fini(&lock->l_reference);
233                 OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle);
234         }
235
236         EXIT;
237 }
238 EXPORT_SYMBOL(ldlm_lock_put);
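
/*
 * Illustrative sketch, not part of the original file: the usual get/put
 * pairing around the two helpers above.  The function name is made up;
 * the block is kept compiled out.
 */
#if 0
static void ldlm_refcount_example(struct ldlm_lock *lock)
{
        struct ldlm_lock *extra = ldlm_lock_get(lock);

        /* ... the lock cannot be freed while we hold this reference ... */
        ldlm_lock_put(extra); /* may free the lock on the last reference */
}
#endif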

/**
 * Removes LDLM lock \a lock from LRU. Assumes LRU is already locked.
 */
int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
{
        int rc = 0;
        if (!list_empty(&lock->l_lru)) {
                struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

                LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
                list_del_init(&lock->l_lru);
                LASSERT(ns->ns_nr_unused > 0);
                ns->ns_nr_unused--;
                rc = 1;
        }
        return rc;
}

/**
 * Removes LDLM lock \a lock from LRU. Obtains the LRU lock first.
 */
int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
        int rc;

        ENTRY;
        if (ldlm_is_ns_srv(lock)) {
                LASSERT(list_empty(&lock->l_lru));
                RETURN(0);
        }

        spin_lock(&ns->ns_lock);
        rc = ldlm_lock_remove_from_lru_nolock(lock);
        spin_unlock(&ns->ns_lock);
        EXIT;
        return rc;
}

/**
 * Adds LDLM lock \a lock to namespace LRU. Assumes LRU is already locked.
 */
void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        lock->l_last_used = cfs_time_current();
        LASSERT(list_empty(&lock->l_lru));
        LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
        list_add_tail(&lock->l_lru, &ns->ns_unused_list);
        ldlm_clear_skipped(lock);
        LASSERT(ns->ns_nr_unused >= 0);
        ns->ns_nr_unused++;
}

/**
 * Adds LDLM lock \a lock to namespace LRU. Obtains necessary LRU locks
 * first.
 */
void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        spin_lock(&ns->ns_lock);
        ldlm_lock_add_to_lru_nolock(lock);
        spin_unlock(&ns->ns_lock);
        EXIT;
}

/**
 * Moves LDLM lock \a lock that is already in namespace LRU to the tail of
 * the LRU. Performs necessary LRU locking.
 */
void ldlm_lock_touch_in_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        if (ldlm_is_ns_srv(lock)) {
                LASSERT(list_empty(&lock->l_lru));
                EXIT;
                return;
        }

        spin_lock(&ns->ns_lock);
        if (!list_empty(&lock->l_lru)) {
                ldlm_lock_remove_from_lru_nolock(lock);
                ldlm_lock_add_to_lru_nolock(lock);
        }
        spin_unlock(&ns->ns_lock);
        EXIT;
}

/**
 * Helper to destroy a locked lock.
 *
 * Used by ldlm_lock_destroy and ldlm_lock_destroy_nolock
 * Must be called with l_lock and lr_lock held.
 *
 * Does not actually free the lock data, but rather marks the lock as
 * destroyed by setting the LDLM_FL_DESTROYED flag.  Destroys the
 * handle->lock association too, so that the lock can no longer be found,
 * and removes the lock from the LRU list.  Actual lock freeing occurs
 * when the last lock reference goes away.
 *
 * Original comment (of some historical value):
 * This used to have a 'strict' flag, which recovery would use to mark an
 * in-use lock as needing-to-die.  Lest I am ever tempted to put it back, I
 * shall explain why it's gone: with the new hash table scheme, once you call
 * ldlm_lock_destroy, you can never drop your final references on this lock.
 * Because it's not in the hash table anymore.  -phil
 */
int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
{
        ENTRY;

        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                LBUG();
        }

        if (!list_empty(&lock->l_res_link)) {
                LDLM_ERROR(lock, "lock still on resource");
                LBUG();
        }

        if (ldlm_is_destroyed(lock)) {
                LASSERT(list_empty(&lock->l_lru));
                EXIT;
                return 0;
        }
        ldlm_set_destroyed(lock);

        if (lock->l_export && lock->l_export->exp_lock_hash) {
                /* NB: it's safe to call cfs_hash_del() even if the lock
                 * isn't in exp_lock_hash. */
                /* In the function below, .hs_keycmp resolves to
                 * ldlm_export_lock_keycmp() */
                /* coverity[overrun-buffer-val] */
                cfs_hash_del(lock->l_export->exp_lock_hash,
                             &lock->l_remote_handle, &lock->l_exp_hash);
        }

        ldlm_lock_remove_from_lru(lock);
        class_handle_unhash(&lock->l_handle);

#if 0
        /* Wake anyone waiting for this lock */
        /* FIXME: I should probably add yet another flag, instead of using
         * l_export to only call this on clients */
        if (lock->l_export)
                class_export_put(lock->l_export);
        lock->l_export = NULL;
        if (lock->l_export && lock->l_completion_ast)
                lock->l_completion_ast(lock, 0);
#endif
        EXIT;
        return 1;
}

/**
 * Destroys a LDLM lock \a lock. Performs necessary locking first.
 */
void ldlm_lock_destroy(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        lock_res_and_lock(lock);
        first = ldlm_lock_destroy_internal(lock);
        unlock_res_and_lock(lock);

        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/**
 * Destroys a LDLM lock \a lock that is already locked.
 */
void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        first = ldlm_lock_destroy_internal(lock);
        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/* this is called by portals_handle2object with the handle lock taken */
static void lock_handle_addref(void *lock)
{
        LDLM_LOCK_GET((struct ldlm_lock *)lock);
}

static void lock_handle_free(void *lock, int size)
{
        LASSERT(size == sizeof(struct ldlm_lock));
        OBD_SLAB_FREE(lock, ldlm_lock_slab, size);
}

struct portals_handle_ops lock_handle_ops = {
        .hop_addref = lock_handle_addref,
        .hop_free   = lock_handle_free,
};

/**
 *
 * Allocate and initialize new lock structure.
 *
 * usage: pass in a resource on which you have done ldlm_resource_get
 *        new lock will take over the refcount.
 * returns: lock with refcount 2 - one for current caller and one for remote
 */
static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
{
        struct ldlm_lock *lock;
        ENTRY;

        if (resource == NULL)
                LBUG();

        OBD_SLAB_ALLOC_PTR_GFP(lock, ldlm_lock_slab, GFP_NOFS);
        if (lock == NULL)
                RETURN(NULL);

        spin_lock_init(&lock->l_lock);
        lock->l_resource = resource;
        lu_ref_add(&resource->lr_reference, "lock", lock);

        atomic_set(&lock->l_refc, 2);
        INIT_LIST_HEAD(&lock->l_res_link);
        INIT_LIST_HEAD(&lock->l_lru);
        INIT_LIST_HEAD(&lock->l_pending_chain);
        INIT_LIST_HEAD(&lock->l_bl_ast);
        INIT_LIST_HEAD(&lock->l_cp_ast);
        INIT_LIST_HEAD(&lock->l_rk_ast);
        init_waitqueue_head(&lock->l_waitq);
        lock->l_blocking_lock = NULL;
        INIT_LIST_HEAD(&lock->l_sl_mode);
        INIT_LIST_HEAD(&lock->l_sl_policy);
        INIT_HLIST_NODE(&lock->l_exp_hash);
        INIT_HLIST_NODE(&lock->l_exp_flock_hash);

        lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
                             LDLM_NSS_LOCKS);
        INIT_LIST_HEAD(&lock->l_handle.h_link);
        class_handle_hash(&lock->l_handle, &lock_handle_ops);

        lu_ref_init(&lock->l_reference);
        lu_ref_add(&lock->l_reference, "hash", lock);
        lock->l_callback_timeout = 0;

#if LUSTRE_TRACKS_LOCK_EXP_REFS
        INIT_LIST_HEAD(&lock->l_exp_refs_link);
        lock->l_exp_refs_nr = 0;
        lock->l_exp_refs_target = NULL;
#endif
        INIT_LIST_HEAD(&lock->l_exp_list);

        RETURN(lock);
}

/**
 * Moves LDLM lock \a lock to another resource.
 * This is used on client when server returns some other lock than requested
 * (typically as a result of intent operation)
 */
int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                              const struct ldlm_res_id *new_resid)
{
        struct ldlm_resource *oldres = lock->l_resource;
        struct ldlm_resource *newres;
        int type;
        ENTRY;

        LASSERT(ns_is_client(ns));

        lock_res_and_lock(lock);
        if (memcmp(new_resid, &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) == 0) {
                /* Nothing to do */
                unlock_res_and_lock(lock);
                RETURN(0);
        }

        LASSERT(new_resid->name[0] != 0);

        /* This function assumes that the lock isn't on any lists */
        LASSERT(list_empty(&lock->l_res_link));

        type = oldres->lr_type;
        unlock_res_and_lock(lock);

        newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
        if (IS_ERR(newres))
                RETURN(PTR_ERR(newres));

        lu_ref_add(&newres->lr_reference, "lock", lock);
        /*
         * To flip the lock from the old to the new resource, lock, oldres and
         * newres have to be locked. Resource spin-locks are nested within
         * lock->l_lock, and are taken in the memory address order to avoid
         * dead-locks.
         */
        spin_lock(&lock->l_lock);
        oldres = lock->l_resource;
        if (oldres < newres) {
                lock_res(oldres);
                lock_res_nested(newres, LRT_NEW);
        } else {
                lock_res(newres);
                lock_res_nested(oldres, LRT_NEW);
        }
        LASSERT(memcmp(new_resid, &oldres->lr_name,
                       sizeof oldres->lr_name) != 0);
        lock->l_resource = newres;
        unlock_res(oldres);
        unlock_res_and_lock(lock);

        /* ...and the flowers are still standing! */
        lu_ref_del(&oldres->lr_reference, "lock", lock);
        ldlm_resource_putref(oldres);

        RETURN(0);
}
EXPORT_SYMBOL(ldlm_lock_change_resource);

/** \defgroup ldlm_handles LDLM HANDLES
 * Ways to get hold of locks without any addresses.
 * @{
 */

/**
 * Fills in handle for LDLM lock \a lock into supplied \a lockh
 * Does not take any references.
 */
void ldlm_lock2handle(const struct ldlm_lock *lock, struct lustre_handle *lockh)
{
        lockh->cookie = lock->l_handle.h_cookie;
}
EXPORT_SYMBOL(ldlm_lock2handle);

/**
 * Obtain a lock reference by handle.
 *
 * if \a flags: atomically get the lock and set the flags.
 *              Return NULL if flag already set
 */
struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
                                     __u64 flags)
{
        struct ldlm_lock *lock;
        ENTRY;

        LASSERT(handle);

        lock = class_handle2object(handle->cookie, NULL);
        if (lock == NULL)
                RETURN(NULL);

        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if ((flags == 0) && !ldlm_is_destroyed(lock)) {
                lu_ref_add(&lock->l_reference, "handle", current);
                RETURN(lock);
        }

        lock_res_and_lock(lock);

        LASSERT(lock->l_resource != NULL);

        lu_ref_add_atomic(&lock->l_reference, "handle", current);
        if (unlikely(ldlm_is_destroyed(lock))) {
                unlock_res_and_lock(lock);
                CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
                LDLM_LOCK_PUT(lock);
                RETURN(NULL);
        }

        /* If we're setting flags, make sure none of them are already set. */
        if (flags != 0) {
                if ((lock->l_flags & flags) != 0) {
                        unlock_res_and_lock(lock);
                        LDLM_LOCK_PUT(lock);
                        RETURN(NULL);
                }

                lock->l_flags |= flags;
        }

        unlock_res_and_lock(lock);
        RETURN(lock);
}
EXPORT_SYMBOL(__ldlm_handle2lock);
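
/*
 * Illustrative sketch, not part of the original file: converting a lock
 * to a handle and back.  ldlm_handle2lock() is the flags == 0 wrapper
 * around __ldlm_handle2lock(); the function name below is made up and
 * the block is kept compiled out.
 */
#if 0
static void ldlm_handle_roundtrip_example(struct ldlm_lock *lock)
{
        struct lustre_handle lockh;
        struct ldlm_lock *found;

        ldlm_lock2handle(lock, &lockh);   /* takes no reference */
        found = ldlm_handle2lock(&lockh); /* takes a reference on success */
        if (found != NULL) {
                LASSERT(found == lock);
                LDLM_LOCK_PUT(found);     /* drop the reference again */
        }
}
#endif
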
/** @} ldlm_handles */

/**
 * Fill in "on the wire" representation for given LDLM lock into supplied
 * lock descriptor \a desc structure.
 */
void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
{
        ldlm_res2desc(lock->l_resource, &desc->l_resource);
        desc->l_req_mode = lock->l_req_mode;
        desc->l_granted_mode = lock->l_granted_mode;
        ldlm_convert_policy_to_wire(lock->l_resource->lr_type,
                                    &lock->l_policy_data,
                                    &desc->l_policy_data);
}
EXPORT_SYMBOL(ldlm_lock2desc);

/**
 * Add a lock to list of conflicting locks to send AST to.
 *
 * Only add if we have not sent a blocking AST to the lock yet.
 */
void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                           struct list_head *work_list)
{
        if (!ldlm_is_ast_sent(lock)) {
                LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
                ldlm_set_ast_sent(lock);
                /* If the enqueuing client said so, tell the AST recipient to
                 * discard dirty data, rather than writing back. */
                if (ldlm_is_ast_discard_data(new))
                        ldlm_set_discard_data(lock);
                LASSERT(list_empty(&lock->l_bl_ast));
                list_add(&lock->l_bl_ast, work_list);
                LDLM_LOCK_GET(lock);
                LASSERT(lock->l_blocking_lock == NULL);
                lock->l_blocking_lock = LDLM_LOCK_GET(new);
        }
}

/**
 * Add a lock to list of just granted locks to send completion AST to.
 */
void ldlm_add_cp_work_item(struct ldlm_lock *lock, struct list_head *work_list)
{
        if (!ldlm_is_cp_reqd(lock)) {
                ldlm_set_cp_reqd(lock);
                LDLM_DEBUG(lock, "lock granted; sending completion AST.");
                LASSERT(list_empty(&lock->l_cp_ast));
                list_add(&lock->l_cp_ast, work_list);
                LDLM_LOCK_GET(lock);
        }
}

/**
 * Aggregator function to add AST work items into a list. Determines
 * what sort of an AST work needs to be done and calls the proper
 * adding function.
 * Must be called with lr_lock held.
 */
void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                            struct list_head *work_list)
{
        ENTRY;
        check_res_locked(lock->l_resource);
        if (new)
                ldlm_add_bl_work_item(lock, new, work_list);
        else
                ldlm_add_cp_work_item(lock, work_list);
        EXIT;
}

/**
 * Add specified reader/writer reference to LDLM lock with handle \a lockh.
 * r/w reference type is determined by \a mode
 * Calls ldlm_lock_addref_internal.
 */
void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;

        lock = ldlm_handle2lock(lockh);
        LASSERT(lock != NULL);
        ldlm_lock_addref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_addref);

/**
 * Helper function.
 * Add specified reader/writer reference to LDLM lock \a lock.
 * r/w reference type is determined by \a mode
 * Removes lock from LRU if it is there.
 * Assumes the LDLM lock is already locked.
 */
void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        ldlm_lock_remove_from_lru(lock);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                lock->l_readers++;
                lu_ref_add_atomic(&lock->l_reference, "reader", lock);
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                lock->l_writers++;
                lu_ref_add_atomic(&lock->l_reference, "writer", lock);
        }
        LDLM_LOCK_GET(lock);
        lu_ref_add_atomic(&lock->l_reference, "user", lock);
        LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}

/**
 * Attempts to add reader/writer reference to a lock with handle \a lockh, and
 * fails if lock is already LDLM_FL_CBPENDING or destroyed.
 *
 * \retval 0 success, lock was addref-ed
 *
 * \retval -EAGAIN lock is being canceled.
 */
int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;
        int               result;

        result = -EAGAIN;
        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                if (lock->l_readers != 0 || lock->l_writers != 0 ||
                    !ldlm_is_cbpending(lock)) {
                        ldlm_lock_addref_internal_nolock(lock, mode);
                        result = 0;
                }
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return result;
}
EXPORT_SYMBOL(ldlm_lock_addref_try);

/**
 * Add specified reader/writer reference to LDLM lock \a lock.
 * Locks LDLM lock and calls ldlm_lock_addref_internal_nolock to do the work.
 * Only called for local locks.
 */
void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
{
        lock_res_and_lock(lock);
        ldlm_lock_addref_internal_nolock(lock, mode);
        unlock_res_and_lock(lock);
}

/**
 * Removes reader/writer reference for LDLM lock \a lock.
 * Assumes LDLM lock is already locked.
 * Only called in ldlm_flock_destroy and for local locks.
 * Does NOT add lock to LRU if no r/w references left to accommodate flock
 * locks that cannot be placed in LRU.
 */
void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                LASSERT(lock->l_readers > 0);
                lu_ref_del(&lock->l_reference, "reader", lock);
                lock->l_readers--;
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                LASSERT(lock->l_writers > 0);
                lu_ref_del(&lock->l_reference, "writer", lock);
                lock->l_writers--;
        }

        lu_ref_del(&lock->l_reference, "user", lock);
        LDLM_LOCK_RELEASE(lock);    /* matches the LDLM_LOCK_GET() in addref */
}

/**
 * Removes reader/writer reference for LDLM lock \a lock.
 * Locks LDLM lock first.
 * If the lock is a client lock, its r/w refcount drops to zero, and the
 * lock is not blocked, the lock is added to the namespace LRU.
 * For blocked LDLM locks, if the r/w count drops to zero, blocking_ast is
 * called.
 */
void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
{
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        ns = ldlm_lock_to_ns(lock);

        ldlm_lock_decref_internal_nolock(lock, mode);

        if (ldlm_is_local(lock) &&
            !lock->l_readers && !lock->l_writers) {
                /* If this is a local lock on a server namespace and this was
                 * the last reference, cancel the lock. */
                CDEBUG(D_INFO, "forcing cancel of local lock\n");
                ldlm_set_cbpending(lock);
        }

        if (!lock->l_readers && !lock->l_writers &&
            ldlm_is_cbpending(lock)) {
                /* If we received a blocked AST and this was the last reference,
                 * run the callback. */
                if (ldlm_is_ns_srv(lock) && lock->l_export)
                        CERROR("FL_CBPENDING set on non-local lock--just a "
                               "warning\n");

                LDLM_DEBUG(lock, "final decref done on cbpending lock");

                LDLM_LOCK_GET(lock); /* dropped by bl thread */
                ldlm_lock_remove_from_lru(lock);
                unlock_res_and_lock(lock);

                if (ldlm_is_fail_loc(lock))
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                if (ldlm_is_atomic_cb(lock) ||
                    ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
                        ldlm_handle_bl_callback(ns, NULL, lock);
        } else if (ns_is_client(ns) &&
                   !lock->l_readers && !lock->l_writers &&
                   !ldlm_is_no_lru(lock) &&
                   !ldlm_is_bl_ast(lock)) {

                LDLM_DEBUG(lock, "add lock into lru list");

                /* If this is a client-side namespace and this was the last
                 * reference, put it on the LRU. */
                ldlm_lock_add_to_lru(lock);
                unlock_res_and_lock(lock);

                if (ldlm_is_fail_loc(lock))
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
                 * are not supported by the server, otherwise, it is done on
                 * enqueue. */
                if (!exp_connect_cancelset(lock->l_conn_export) &&
                    !ns_connect_lru_resize(ns))
                        ldlm_cancel_lru(ns, 0, LCF_ASYNC, 0);
        } else {
                LDLM_DEBUG(lock, "do not add lock into lru list");
                unlock_res_and_lock(lock);
        }

        EXIT;
}

/**
 * Decrease reader/writer refcount for LDLM lock with handle \a lockh
 */
void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_decref);

/**
 * Decrease reader/writer refcount for LDLM lock with handle
 * \a lockh and mark it for subsequent cancellation once r/w refcount
 * drops to zero instead of putting into LRU.
 *
 * Typical usage is for GROUP locks which we cannot allow to be cached.
 */
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        ENTRY;

        LASSERT(lock != NULL);

        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        lock_res_and_lock(lock);
        ldlm_set_cbpending(lock);
        unlock_res_and_lock(lock);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
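
/*
 * Illustrative sketch, not part of the original file: a caller normally
 * brackets use of a granted lock with matching addref/decref calls on
 * its handle, using the same mode for both.  Made-up function name,
 * kept compiled out.
 */
#if 0
static void ldlm_ref_by_handle_example(struct lustre_handle *lockh)
{
        ldlm_lock_addref(lockh, LCK_PR);  /* take a reader reference */
        /* ... read data under the protection of the PR lock ... */
        ldlm_lock_decref(lockh, LCK_PR);  /* lock may now go to the LRU */
}
#endif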

struct sl_insert_point {
        struct list_head *res_link;
        struct list_head *mode_link;
        struct list_head *policy_link;
};

/**
 * Finds a position to insert the new lock into granted lock list.
 *
 * Used for locks eligible for skiplist optimization.
 *
 * Parameters:
 *      queue [input]:  the granted list where search acts on;
 *      req [input]:    the lock whose position to be located;
 *      prev [output]:  positions within 3 lists to insert @req to
 * Return Value:
 *      filled @prev
 * NOTE: called by
 *  - ldlm_grant_lock_with_skiplist
 */
static void search_granted_lock(struct list_head *queue,
                                struct ldlm_lock *req,
                                struct sl_insert_point *prev)
{
        struct list_head *tmp;
        struct ldlm_lock *lock, *mode_end, *policy_end;
        ENTRY;

        list_for_each(tmp, queue) {
                lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                mode_end = list_entry(lock->l_sl_mode.prev,
                                      struct ldlm_lock, l_sl_mode);

                if (lock->l_req_mode != req->l_req_mode) {
                        /* jump to last lock of mode group */
                        tmp = &mode_end->l_res_link;
                        continue;
                }

                /* suitable mode group is found */
                if (lock->l_resource->lr_type == LDLM_PLAIN) {
                        /* insert point is last lock of the mode group */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else if (lock->l_resource->lr_type == LDLM_IBITS) {
                        for (;;) {
                                policy_end =
                                        list_entry(lock->l_sl_policy.prev,
                                                   struct ldlm_lock,
                                                   l_sl_policy);

                                if (lock->l_policy_data.l_inodebits.bits ==
                                    req->l_policy_data.l_inodebits.bits) {
                                        /* insert point is last lock of
                                         * the policy group */
                                        prev->res_link =
                                                &policy_end->l_res_link;
                                        prev->mode_link =
                                                &policy_end->l_sl_mode;
                                        prev->policy_link =
                                                &policy_end->l_sl_policy;
                                        EXIT;
                                        return;
                                }

                                if (policy_end == mode_end)
                                        /* done with mode group */
                                        break;

                                /* go to next policy group within mode group */
                                tmp = policy_end->l_res_link.next;
                                lock = list_entry(tmp, struct ldlm_lock,
                                                  l_res_link);
                        }  /* loop over policy groups within the mode group */

                        /* insert point is last lock of the mode group,
                         * new policy group is started */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else {
                        LDLM_ERROR(lock, "is not LDLM_PLAIN or LDLM_IBITS lock");
                        LBUG();
                }
        }

        /* insert point is last lock on the queue,
         * new mode group and new policy group are started */
        prev->res_link = queue->prev;
        prev->mode_link = &req->l_sl_mode;
        prev->policy_link = &req->l_sl_policy;
        EXIT;
        return;
}
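
/*
 * Not part of the original file: a rough sketch of the granted-list
 * layout that search_granted_lock() walks.  Locks sit on lr_granted in
 * mode groups, and IBITS mode groups are further split into policy
 * groups, e.g.:
 *
 *   lr_granted:  [PR,b1] [PR,b1] [PR,b2]    [EX,b1]
 *                `-policy-'                 `-mode-'
 *                `-------mode-------'
 *
 * l_sl_mode joins the first and last lock of a mode group, and
 * l_sl_policy the first and last lock of a policy group, which is what
 * lets the search skip over a whole group at a time instead of visiting
 * every lock.
 */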

/**
 * Add a lock into resource granted list after a position described by
 * \a prev.
 */
static void ldlm_granted_list_add_lock(struct ldlm_lock *lock,
                                       struct sl_insert_point *prev)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        ldlm_resource_dump(D_INFO, res);
        LDLM_DEBUG(lock, "About to add lock:");

        if (ldlm_is_destroyed(lock)) {
                CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
                return;
        }

        LASSERT(list_empty(&lock->l_res_link));
        LASSERT(list_empty(&lock->l_sl_mode));
        LASSERT(list_empty(&lock->l_sl_policy));

        /*
         * lock->link == prev->link means lock is first starting the group.
         * Don't re-add to itself to suppress kernel warnings.
         */
        if (&lock->l_res_link != prev->res_link)
                list_add(&lock->l_res_link, prev->res_link);
        if (&lock->l_sl_mode != prev->mode_link)
                list_add(&lock->l_sl_mode, prev->mode_link);
        if (&lock->l_sl_policy != prev->policy_link)
                list_add(&lock->l_sl_policy, prev->policy_link);

        EXIT;
}

/**
 * Add a lock to granted list on a resource maintaining skiplist
 * correctness.
 */
static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
{
        struct sl_insert_point prev;
        ENTRY;

        LASSERT(lock->l_req_mode == lock->l_granted_mode);

        search_granted_lock(&lock->l_resource->lr_granted, lock, &prev);
        ldlm_granted_list_add_lock(lock, &prev);
        EXIT;
}

/**
 * Perform lock granting bookkeeping.
 *
 * Includes putting the lock into granted list and updating lock mode.
 * NOTE: called by
 *  - ldlm_lock_enqueue
 *  - ldlm_reprocess_queue
 *  - ldlm_lock_convert
 *
 * must be called with lr_lock held
 */
void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        lock->l_granted_mode = lock->l_req_mode;

        if (work_list && lock->l_completion_ast != NULL)
                ldlm_add_ast_work_item(lock, NULL, work_list);

        /* We should not add locks to granted list in the following cases:
         * - this is an UNLOCK but not a real lock;
         * - this is a TEST lock;
         * - this is a F_CANCELLK lock (async flock has req_mode == 0)
         * - this is a deadlock (flock cannot be granted) */
        if (lock->l_req_mode == 0 ||
            lock->l_req_mode == LCK_NL ||
            ldlm_is_test_lock(lock) ||
            ldlm_is_flock_deadlock(lock))
                RETURN_EXIT;

        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
                ldlm_grant_lock_with_skiplist(lock);
        else if (res->lr_type == LDLM_EXTENT)
                ldlm_extent_add_lock(res, lock);
        else
                ldlm_resource_add_lock(res, &res->lr_granted, lock);

        if (lock->l_granted_mode < res->lr_most_restr)
                res->lr_most_restr = lock->l_granted_mode;

        ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
        EXIT;
}

/**
 * Search for a lock with given properties in a queue.
 *
 * \retval a referenced lock or NULL.  See the flag descriptions below, in the
 * comment above ldlm_lock_match
 */
static struct ldlm_lock *search_queue(struct list_head *queue,
                                      ldlm_mode_t *mode,
                                      ldlm_policy_data_t *policy,
                                      struct ldlm_lock *old_lock,
                                      __u64 flags, int unref)
{
        struct ldlm_lock *lock;
        struct list_head *tmp;

        list_for_each(tmp, queue) {
                ldlm_mode_t match;

                lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lock == old_lock)
                        break;

                /* Check if this lock can be matched.
                 * Used by LU-2919(exclusive open) for open lease lock */
                if (ldlm_is_excl(lock))
                        continue;

                /* llite sometimes wants to match locks that will be
                 * canceled when their users drop, but we allow it to match
                 * if it passes in CBPENDING and the lock still has users.
                 * this is generally only going to be used by children
                 * whose parents already hold a lock so forward progress
                 * can still happen. */
                if (ldlm_is_cbpending(lock) &&
                    !(flags & LDLM_FL_CBPENDING))
                        continue;
                if (!unref && ldlm_is_cbpending(lock) &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        continue;

                if (!(lock->l_req_mode & *mode))
                        continue;
                match = lock->l_req_mode;

                if (lock->l_resource->lr_type == LDLM_EXTENT &&
                    (lock->l_policy_data.l_extent.start >
                     policy->l_extent.start ||
                     lock->l_policy_data.l_extent.end < policy->l_extent.end))
                        continue;

                if (unlikely(match == LCK_GROUP) &&
                    lock->l_resource->lr_type == LDLM_EXTENT &&
                    lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                        continue;

                /* We match if we have existing lock with same or wider set
                   of bits. */
                if (lock->l_resource->lr_type == LDLM_IBITS &&
                    ((lock->l_policy_data.l_inodebits.bits &
                      policy->l_inodebits.bits) !=
                      policy->l_inodebits.bits))
                        continue;

                if (!unref && LDLM_HAVE_MASK(lock, GONE))
                        continue;

                if ((flags & LDLM_FL_LOCAL_ONLY) &&
                    !ldlm_is_local(lock))
                        continue;

                if (flags & LDLM_FL_TEST_LOCK) {
                        LDLM_LOCK_GET(lock);
                        ldlm_lock_touch_in_lru(lock);
                } else {
                        ldlm_lock_addref_internal_nolock(lock, match);
                }
                *mode = match;
                return lock;
        }

        return NULL;
}

void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
{
        if ((lock->l_flags & LDLM_FL_FAIL_NOTIFIED) == 0) {
                lock->l_flags |= LDLM_FL_FAIL_NOTIFIED;
                wake_up_all(&lock->l_waitq);
        }
}
EXPORT_SYMBOL(ldlm_lock_fail_match_locked);

void ldlm_lock_fail_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_fail_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_fail_match);

/**
 * Mark lock as "matchable" by OST.
 *
 * Used to prevent certain races in LOV/OSC where the lock is granted, but LVB
 * is not yet valid.
 * Assumes LDLM lock is already locked.
 */
void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
{
        ldlm_set_lvb_ready(lock);
        wake_up_all(&lock->l_waitq);
}
EXPORT_SYMBOL(ldlm_lock_allow_match_locked);

/**
 * Mark lock as "matchable" by OST.
 * Locks the lock and then \see ldlm_lock_allow_match_locked
 */
void ldlm_lock_allow_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_allow_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_allow_match);

/**
 * Attempt to find a lock with specified properties.
 *
 * Typically returns a reference to matched lock unless LDLM_FL_TEST_LOCK is
 * set in \a flags
 *
 * Can be called in two ways:
 *
 * If 'ns' is NULL, then lockh describes an existing lock that we want to look
 * for a duplicate of.
 *
 * Otherwise, all of the fields must be filled in, to match against.
 *
 * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
 *     server (ie, connh is NULL)
 * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
 *     list will be considered
 * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
 *     to be canceled can still be matched as long as they still have reader
 *     or writer references
 * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
 *     just tell us if we would have matched.
 *
 * \retval 1 if it finds an already-existing lock that is compatible; in this
 * case, lockh is filled in with an addref()ed lock
 *
 * We also check security context, and if that fails we simply return 0 (to
 * keep caller code unchanged), the context failure will be discovered by
 * caller sometime later.
 */
ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, __u64 flags,
                            const struct ldlm_res_id *res_id, ldlm_type_t type,
                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
                            struct lustre_handle *lockh, int unref)
{
        struct ldlm_resource *res;
        struct ldlm_lock *lock, *old_lock = NULL;
        int rc = 0;
        ENTRY;

        if (ns == NULL) {
                old_lock = ldlm_handle2lock(lockh);
                LASSERT(old_lock);

                ns = ldlm_lock_to_ns(old_lock);
                res_id = &old_lock->l_resource->lr_name;
                type = old_lock->l_resource->lr_type;
                mode = old_lock->l_req_mode;
        }

        res = ldlm_resource_get(ns, NULL, res_id, type, 0);
        if (IS_ERR(res)) {
                LASSERT(old_lock == NULL);
                RETURN(0);
        }

        LDLM_RESOURCE_ADDREF(res);
        lock_res(res);

        lock = search_queue(&res->lr_granted, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        if (flags & LDLM_FL_BLOCK_GRANTED)
                GOTO(out, rc = 0);
        lock = search_queue(&res->lr_converting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        lock = search_queue(&res->lr_waiting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);

        EXIT;
 out:
        unlock_res(res);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);

        if (lock) {
                ldlm_lock2handle(lock, lockh);
                if ((flags & LDLM_FL_LVB_READY) &&
                    (!ldlm_is_lvb_ready(lock))) {
                        __u64 wait_flags = LDLM_FL_LVB_READY |
                                LDLM_FL_DESTROYED | LDLM_FL_FAIL_NOTIFIED;
                        struct l_wait_info lwi;
                        if (lock->l_completion_ast) {
                                int err = lock->l_completion_ast(lock,
                                                        LDLM_FL_WAIT_NOREPROC,
                                                        NULL);
                                if (err) {
                                        if (flags & LDLM_FL_TEST_LOCK)
                                                LDLM_LOCK_RELEASE(lock);
                                        else
                                                ldlm_lock_decref_internal(lock,
                                                                          mode);
                                        rc = 0;
                                        goto out2;
                                }
                        }

                        lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
                                               NULL, LWI_ON_SIGNAL_NOOP, NULL);

                        /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
                        l_wait_event(lock->l_waitq,
                                     lock->l_flags & wait_flags,
                                     &lwi);
                        if (!ldlm_is_lvb_ready(lock)) {
                                if (flags & LDLM_FL_TEST_LOCK)
                                        LDLM_LOCK_RELEASE(lock);
                                else
                                        ldlm_lock_decref_internal(lock, mode);
                                rc = 0;
                        }
                }
        }
 out2:
        if (rc) {
                LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[2] : policy->l_extent.start,
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[3] : policy->l_extent.end);

                /* check user's security context */
                if (lock->l_conn_export &&
                    sptlrpc_import_check_ctx(
                                class_exp2cliimp(lock->l_conn_export))) {
                        if (!(flags & LDLM_FL_TEST_LOCK))
                                ldlm_lock_decref_internal(lock, mode);
                        rc = 0;
                }

                if (flags & LDLM_FL_TEST_LOCK)
                        LDLM_LOCK_RELEASE(lock);

        } else if (!(flags & LDLM_FL_TEST_LOCK)) { /* less verbose for test-only */
                LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
                                  LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
                                  type, mode, res_id->name[0], res_id->name[1],
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[2] : policy->l_extent.start,
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[3] : policy->l_extent.end);
        }
        if (old_lock)
                LDLM_LOCK_PUT(old_lock);

        return rc ? mode : 0;
}
EXPORT_SYMBOL(ldlm_lock_match);
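
/*
 * Illustrative sketch, not part of the original file: a typical
 * client-side match attempt for a cached PR or PW extent lock.  On a
 * hit the granted mode is returned and lockh holds a reference that
 * must be dropped with ldlm_lock_decref().  Made-up function name,
 * kept compiled out.
 */
#if 0
static int ldlm_match_example(struct ldlm_namespace *ns,
                              const struct ldlm_res_id *res_id)
{
        ldlm_policy_data_t policy = { .l_extent = { .start = 0,
                                                    .end = OBD_OBJECT_EOF } };
        struct lustre_handle lockh;
        ldlm_mode_t mode;

        mode = ldlm_lock_match(ns, LDLM_FL_LVB_READY, res_id, LDLM_EXTENT,
                               &policy, LCK_PR | LCK_PW, &lockh, 0);
        if (mode != 0) {
                /* ... use the matched lock ... */
                ldlm_lock_decref(&lockh, mode);
                return 1;
        }
        return 0;
}
#endif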
1413
1414 ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh,
1415                                         __u64 *bits)
1416 {
1417         struct ldlm_lock *lock;
1418         ldlm_mode_t mode = 0;
1419         ENTRY;
1420
1421         lock = ldlm_handle2lock(lockh);
1422         if (lock != NULL) {
1423                 lock_res_and_lock(lock);
1424                 if (LDLM_HAVE_MASK(lock, GONE))
1425                         GOTO(out, mode);
1426
1427                 if (ldlm_is_cbpending(lock) &&
1428                     lock->l_readers == 0 && lock->l_writers == 0)
1429                         GOTO(out, mode);
1430
1431                 if (bits)
1432                         *bits = lock->l_policy_data.l_inodebits.bits;
1433                 mode = lock->l_granted_mode;
1434                 ldlm_lock_addref_internal_nolock(lock, mode);
1435         }
1436
1437         EXIT;
1438
1439 out:
1440         if (lock != NULL) {
1441                 unlock_res_and_lock(lock);
1442                 LDLM_LOCK_PUT(lock);
1443         }
1444         return mode;
1445 }
1446 EXPORT_SYMBOL(ldlm_revalidate_lock_handle);

/**
 * Fill \a data with the LVB carried in the request/reply capsule \a pill.
 * The caller must guarantee that the buffer is large enough.
 */
int ldlm_fill_lvb(struct ldlm_lock *lock, struct req_capsule *pill,
                  enum req_location loc, void *data, int size)
{
        void *lvb;
        ENTRY;

        LASSERT(data != NULL);
        LASSERT(size >= 0);

        switch (lock->l_lvb_type) {
        case LVB_T_OST:
                if (size == sizeof(struct ost_lvb)) {
                        if (loc == RCL_CLIENT)
                                lvb = req_capsule_client_swab_get(pill,
                                                &RMF_DLM_LVB,
                                                lustre_swab_ost_lvb);
                        else
                                lvb = req_capsule_server_swab_get(pill,
                                                &RMF_DLM_LVB,
                                                lustre_swab_ost_lvb);
                        if (unlikely(lvb == NULL)) {
                                LDLM_ERROR(lock, "no LVB");
                                RETURN(-EPROTO);
                        }

                        memcpy(data, lvb, size);
                } else if (size == sizeof(struct ost_lvb_v1)) {
                        struct ost_lvb *olvb = data;

                        if (loc == RCL_CLIENT)
                                lvb = req_capsule_client_swab_get(pill,
                                                &RMF_DLM_LVB,
                                                lustre_swab_ost_lvb_v1);
                        else
                                lvb = req_capsule_server_sized_swab_get(pill,
                                                &RMF_DLM_LVB, size,
                                                lustre_swab_ost_lvb_v1);
                        if (unlikely(lvb == NULL)) {
                                LDLM_ERROR(lock, "no LVB");
                                RETURN(-EPROTO);
                        }

                        memcpy(data, lvb, size);
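                        /* The peer used the old ost_lvb_v1 layout, which has
                         * no nanosecond timestamp fields; zero those fields
                         * in the caller's current-format ost_lvb. */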
                        olvb->lvb_mtime_ns = 0;
                        olvb->lvb_atime_ns = 0;
                        olvb->lvb_ctime_ns = 0;
                } else {
                        LDLM_ERROR(lock, "Reply contains unexpected OST LVB size %d",
                                   size);
                        RETURN(-EINVAL);
                }
                break;
        case LVB_T_LQUOTA:
                if (size == sizeof(struct lquota_lvb)) {
                        if (loc == RCL_CLIENT)
                                lvb = req_capsule_client_swab_get(pill,
                                                &RMF_DLM_LVB,
                                                lustre_swab_lquota_lvb);
                        else
                                lvb = req_capsule_server_swab_get(pill,
                                                &RMF_DLM_LVB,
                                                lustre_swab_lquota_lvb);
                        if (unlikely(lvb == NULL)) {
                                LDLM_ERROR(lock, "no LVB");
                                RETURN(-EPROTO);
                        }

                        memcpy(data, lvb, size);
                } else {
                        LDLM_ERROR(lock, "Reply contains unexpected lquota LVB size %d",
                                   size);
                        RETURN(-EINVAL);
                }
                break;
        case LVB_T_LAYOUT:
                if (size == 0)
                        break;

                if (loc == RCL_CLIENT)
                        lvb = req_capsule_client_get(pill, &RMF_DLM_LVB);
                else
                        lvb = req_capsule_server_get(pill, &RMF_DLM_LVB);
                if (unlikely(lvb == NULL)) {
                        LDLM_ERROR(lock, "no LVB");
                        RETURN(-EPROTO);
                }

                memcpy(data, lvb, size);
                break;
        default:
                LDLM_ERROR(lock, "Unknown LVB type: %d", lock->l_lvb_type);
                libcfs_debug_dumpstack(NULL);
                RETURN(-EINVAL);
        }

        RETURN(0);
}

/**
 * Create and fill in new LDLM lock with specified properties.
 *
 * Returns a referenced lock on success, or an ERR_PTR on failure.
 */
struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
                                   const struct ldlm_res_id *res_id,
                                   ldlm_type_t type,
                                   ldlm_mode_t mode,
                                   const struct ldlm_callback_suite *cbs,
                                   void *data, __u32 lvb_len,
                                   enum lvb_type lvb_type)
{
        struct ldlm_lock        *lock;
        struct ldlm_resource    *res;
        int                     rc;
        ENTRY;

        res = ldlm_resource_get(ns, NULL, res_id, type, 1);
        if (IS_ERR(res))
                RETURN(ERR_CAST(res));

        lock = ldlm_lock_new(res);
        if (lock == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        lock->l_req_mode = mode;
        lock->l_ast_data = data;
        lock->l_pid = current_pid();
        if (ns_is_server(ns))
                ldlm_set_ns_srv(lock);
        if (cbs) {
                lock->l_blocking_ast = cbs->lcs_blocking;
                lock->l_completion_ast = cbs->lcs_completion;
                lock->l_glimpse_ast = cbs->lcs_glimpse;
        }

        lock->l_tree_node = NULL;
        /* if this is an extent lock, allocate the interval tree node */
        if (type == LDLM_EXTENT)
                if (ldlm_interval_alloc(lock) == NULL)
                        GOTO(out, rc = -ENOMEM);

        if (lvb_len) {
                lock->l_lvb_len = lvb_len;
                OBD_ALLOC_LARGE(lock->l_lvb_data, lvb_len);
                if (lock->l_lvb_data == NULL)
                        GOTO(out, rc = -ENOMEM);
        }

        lock->l_lvb_type = lvb_type;
        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK))
                GOTO(out, rc = -ENOENT);

        RETURN(lock);

out:
        ldlm_lock_destroy(lock);
        LDLM_LOCK_RELEASE(lock);
        RETURN(ERR_PTR(rc));
}
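
/*
 * A minimal usage sketch for ldlm_lock_create() (illustrative only; the
 * callback functions and the later enqueue step are assumptions, not code
 * from this file):
 *
 *      const struct ldlm_callback_suite cbs = {
 *              .lcs_blocking   = my_blocking_ast,      // hypothetical callback
 *              .lcs_completion = my_completion_ast,    // hypothetical callback
 *      };
 *      struct ldlm_lock *lock;
 *
 *      lock = ldlm_lock_create(ns, &res_id, LDLM_PLAIN, LCK_EX, &cbs,
 *                              NULL, 0, LVB_T_NONE);
 *      if (IS_ERR(lock))
 *              return PTR_ERR(lock);
 *      // ... pass to ldlm_lock_enqueue(), then drop the reference ...
 */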

/**
 * Enqueue (request) a lock.
 *
 * Does not block. As a result of the enqueue, the lock is placed on the
 * granted or waiting list.
 *
 * If the namespace has an intent policy set and the lock has the
 * LDLM_FL_HAS_INTENT flag set, skip all the enqueueing and delegate lock
 * processing to the intent policy function.
 */
ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                               struct ldlm_lock **lockp,
                               void *cookie, __u64 *flags)
{
        struct ldlm_lock *lock = *lockp;
        struct ldlm_resource *res = lock->l_resource;
        int local = ns_is_client(ldlm_res_to_ns(res));
#ifdef HAVE_SERVER_SUPPORT
        ldlm_processing_policy policy;
#endif
        ldlm_error_t rc = ELDLM_OK;
        struct ldlm_interval *node = NULL;
        ENTRY;

        /* policies are not executed on the client or during replay */
        if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
            && !local && ns->ns_policy) {
                rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
                                   NULL);
                if (rc == ELDLM_LOCK_REPLACED) {
                        /* The lock that was returned has already been granted,
                         * and placed into lockp.  If it's not the same as the
                         * one we passed in, then destroy the old one and our
                         * work here is done. */
                        if (lock != *lockp) {
                                ldlm_lock_destroy(lock);
                                LDLM_LOCK_RELEASE(lock);
                        }
                        *flags |= LDLM_FL_LOCK_CHANGED;
                        RETURN(0);
                } else if (rc != ELDLM_OK ||
                           (rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
                        ldlm_lock_destroy(lock);
                        RETURN(rc);
                }
        }

        if (*flags & LDLM_FL_RESENT) {
                /* Reconstruct LDLM_FL_SRV_ENQ_MASK @flags for reply.
                 * Set LOCK_CHANGED always.
                 * Check if the lock is granted for BLOCK_GRANTED.
                 * Take NO_TIMEOUT from the lock as it is inherited through
                 * LDLM_FL_INHERIT_MASK. */
                *flags |= LDLM_FL_LOCK_CHANGED;
                if (lock->l_req_mode != lock->l_granted_mode)
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                *flags |= lock->l_flags & LDLM_FL_NO_TIMEOUT;
                RETURN(ELDLM_OK);
        }

        /* A replaying lock may already be on the granted list, and unlinking
         * it would free its interval node.  Allocate the interval node early;
         * otherwise we cannot regrant this lock in the future. - jay */
        if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
                OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);

        lock_res_and_lock(lock);
        if (local && lock->l_req_mode == lock->l_granted_mode) {
                /* The server returned a blocked lock, but it was granted
                 * before we got a chance to actually enqueue it.  We don't
                 * need to do anything else. */
                *flags &= ~LDLM_FL_BLOCKED_MASK;
                GOTO(out, rc = ELDLM_OK);
        }

        ldlm_resource_unlink_lock(lock);
        if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
                if (node == NULL) {
                        ldlm_lock_destroy_nolock(lock);
                        GOTO(out, rc = -ENOMEM);
                }

                INIT_LIST_HEAD(&node->li_group);
                ldlm_interval_attach(node, lock);
                node = NULL;
        }

        /* Some flags from the enqueue want to make it into the AST, via the
         * lock's l_flags. */
        if (*flags & LDLM_FL_AST_DISCARD_DATA)
                ldlm_set_ast_discard_data(lock);
        if (*flags & LDLM_FL_TEST_LOCK)
                ldlm_set_test_lock(lock);

        /* This distinction between local lock trees is very important; a client
         * namespace only has information about locks taken by that client, and
         * thus doesn't have enough information to decide for itself if it can
         * be granted (below).  In this case, we do exactly what the server
         * tells us to do, as dictated by the 'flags'.
         *
         * We do exactly the same thing during recovery, when the server is
         * more or less trusting the clients not to lie.
         *
         * FIXME (bug 268): Detect obvious lies by checking compatibility in
         * granted/converting queues. */
        if (local) {
                if (*flags & LDLM_FL_BLOCK_CONV)
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                else
                        ldlm_grant_lock(lock, NULL);
                GOTO(out, rc = ELDLM_OK);
#ifdef HAVE_SERVER_SUPPORT
        } else if (*flags & LDLM_FL_REPLAY) {
                if (*flags & LDLM_FL_BLOCK_CONV) {
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                        GOTO(out, rc = ELDLM_OK);
                } else if (*flags & LDLM_FL_BLOCK_WAIT) {
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                        GOTO(out, rc = ELDLM_OK);
                } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
                        ldlm_grant_lock(lock, NULL);
                        GOTO(out, rc = ELDLM_OK);
                }
                /* If no flags, fall through to normal enqueue path. */
        }

        policy = ldlm_processing_policy_table[res->lr_type];
        policy(lock, flags, 1, &rc, NULL);
        GOTO(out, rc);
#else
        } else {
                CERROR("This is a client-side-only module, cannot handle "
                       "LDLM_NAMESPACE_SERVER resource type lock.\n");
                LBUG();
        }
#endif

out:
        unlock_res_and_lock(lock);
        if (node)
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        return rc;
}
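
/*
 * Illustrative client-side flow (a sketch under assumptions, not a call
 * site from this file): after the server replies to an enqueue RPC, the
 * client applies the server's verdict by running the enqueue locally with
 * the flags the server returned:
 *
 *      __u64 flags = server_reply_flags;       // hypothetical source
 *      ldlm_error_t rc;
 *
 *      rc = ldlm_lock_enqueue(ns, &lock, NULL, &flags);
 *      // with an LDLM_FL_BLOCK_* flag set the lock lands on a waiting
 *      // list; otherwise it is granted via ldlm_grant_lock()
 */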

#ifdef HAVE_SERVER_SUPPORT
/**
 * Iterate through all waiting locks on a given resource queue and attempt to
 * grant them.
 *
 * Must be called with resource lock held.
 */
int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
                         struct list_head *work_list)
{
        struct list_head *tmp, *pos;
        ldlm_processing_policy policy;
        __u64 flags;
        int rc = LDLM_ITER_CONTINUE;
        ldlm_error_t err;
        ENTRY;

        check_res_locked(res);

        policy = ldlm_processing_policy_table[res->lr_type];
        LASSERT(policy);

        list_for_each_safe(tmp, pos, queue) {
                struct ldlm_lock *pending;

                pending = list_entry(tmp, struct ldlm_lock, l_res_link);

                CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);

                flags = 0;
                rc = policy(pending, &flags, 0, &err, work_list);
                if (rc != LDLM_ITER_CONTINUE)
                        break;
        }

        RETURN(rc);
}
#endif

/**
 * Process a call to blocking AST callback for a lock in ast_work list
 */
static int
ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
{
        struct ldlm_cb_set_arg *arg = opaq;
        struct ldlm_lock_desc   d;
        int                     rc;
        struct ldlm_lock       *lock;
        ENTRY;

        if (list_empty(arg->list))
                RETURN(-ENOENT);

        lock = list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);

        /* nobody should touch l_bl_ast */
        lock_res_and_lock(lock);
        list_del_init(&lock->l_bl_ast);

        LASSERT(ldlm_is_ast_sent(lock));
        LASSERT(lock->l_bl_ast_run == 0);
        LASSERT(lock->l_blocking_lock);
        lock->l_bl_ast_run++;
        unlock_res_and_lock(lock);

        ldlm_lock2desc(lock->l_blocking_lock, &d);

        rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
        LDLM_LOCK_RELEASE(lock->l_blocking_lock);
        lock->l_blocking_lock = NULL;
        LDLM_LOCK_RELEASE(lock);

        RETURN(rc);
}

/**
 * Process a call to completion AST callback for a lock in ast_work list
 */
static int
ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
{
        struct ldlm_cb_set_arg  *arg = opaq;
        int                      rc = 0;
        struct ldlm_lock        *lock;
        ldlm_completion_callback completion_callback;
        ENTRY;

        if (list_empty(arg->list))
                RETURN(-ENOENT);

        lock = list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);

        /* It's possible to receive a completion AST before we've set
         * the l_completion_ast pointer: either because the AST arrived
         * before the reply, or simply because there's a small race
         * window between receiving the reply and finishing the local
         * enqueue. (bug 842)
         *
         * This can't happen with the blocking_ast, however, because we
         * will never call the local blocking_ast until we drop our
         * reader/writer reference, which we won't do until we get the
         * reply and finish enqueueing. */

        /* nobody should touch l_cp_ast */
        lock_res_and_lock(lock);
        list_del_init(&lock->l_cp_ast);
        LASSERT(ldlm_is_cp_reqd(lock));
        /* save l_completion_ast since it can be changed by
         * mds_intent_policy(), see bug 14225 */
        completion_callback = lock->l_completion_ast;
        ldlm_clear_cp_reqd(lock);
        unlock_res_and_lock(lock);

        if (completion_callback != NULL)
                rc = completion_callback(lock, 0, (void *)arg);
        LDLM_LOCK_RELEASE(lock);

        RETURN(rc);
}

/**
 * Process a call to revocation AST callback for a lock in ast_work list
 */
static int
ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
{
        struct ldlm_cb_set_arg *arg = opaq;
        struct ldlm_lock_desc   desc;
        int                     rc;
        struct ldlm_lock       *lock;
        ENTRY;

        if (list_empty(arg->list))
                RETURN(-ENOENT);

        lock = list_entry(arg->list->next, struct ldlm_lock, l_rk_ast);
        list_del_init(&lock->l_rk_ast);

        /* the desc just pretends the lock is exclusive */
        ldlm_lock2desc(lock, &desc);
        desc.l_req_mode = LCK_EX;
        desc.l_granted_mode = 0;

        rc = lock->l_blocking_ast(lock, &desc, (void *)arg, LDLM_CB_BLOCKING);
        LDLM_LOCK_RELEASE(lock);

        RETURN(rc);
}

/**
 * Process a call to glimpse AST callback for a lock in ast_work list
 */
int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
{
        struct ldlm_cb_set_arg          *arg = opaq;
        struct ldlm_glimpse_work        *gl_work;
        struct ldlm_lock                *lock;
        int                              rc = 0;
        ENTRY;

        if (list_empty(arg->list))
                RETURN(-ENOENT);

        gl_work = list_entry(arg->list->next, struct ldlm_glimpse_work,
                             gl_list);
        list_del_init(&gl_work->gl_list);

        lock = gl_work->gl_lock;

        /* transfer the glimpse descriptor to ldlm_cb_set_arg */
        arg->gl_desc = gl_work->gl_desc;

        /* invoke the actual glimpse callback */
        if (lock->l_glimpse_ast(lock, (void *)arg) == 0)
                rc = 1;

        LDLM_LOCK_RELEASE(lock);

        if ((gl_work->gl_flags & LDLM_GL_WORK_NOFREE) == 0)
                OBD_FREE_PTR(gl_work);

        RETURN(rc);
}

/**
 * Process list of locks in need of ASTs being sent.
 *
 * Used on server to send multiple ASTs together instead of sending one by
 * one.
 */
int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
                      ldlm_desc_ast_t ast_type)
{
        struct ldlm_cb_set_arg *arg;
        set_producer_func       work_ast_lock;
        int                     rc;
        ENTRY;

        if (list_empty(rpc_list))
                RETURN(0);

        OBD_ALLOC_PTR(arg);
        if (arg == NULL)
                RETURN(-ENOMEM);

        atomic_set(&arg->restart, 0);
        arg->list = rpc_list;

        switch (ast_type) {
        case LDLM_WORK_BL_AST:
                arg->type = LDLM_BL_CALLBACK;
                work_ast_lock = ldlm_work_bl_ast_lock;
                break;
        case LDLM_WORK_CP_AST:
                arg->type = LDLM_CP_CALLBACK;
                work_ast_lock = ldlm_work_cp_ast_lock;
                break;
        case LDLM_WORK_REVOKE_AST:
                arg->type = LDLM_BL_CALLBACK;
                work_ast_lock = ldlm_work_revoke_ast_lock;
                break;
        case LDLM_WORK_GL_AST:
                arg->type = LDLM_GL_CALLBACK;
                work_ast_lock = ldlm_work_gl_ast_lock;
                break;
        default:
                LBUG();
        }

        /* We create a ptlrpc request set with flow control extension.
         * This request set will use the work_ast_lock function to produce new
         * requests and will send a new request each time one completes, in
         * order to keep the number of requests in flight at
         * ns_max_parallel_ast. */
        arg->set = ptlrpc_prep_fcset(ns->ns_max_parallel_ast ? : UINT_MAX,
                                     work_ast_lock, arg);
        if (arg->set == NULL)
                GOTO(out, rc = -ENOMEM);

        ptlrpc_set_wait(arg->set);
        ptlrpc_set_destroy(arg->set);

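        /* If any of the issued ASTs flagged a restart, return -ERESTART so
         * the caller can reprocess the queue (ldlm_reprocess_all() below
         * retries on this). */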
        rc = atomic_read(&arg->restart) ? -ERESTART : 0;
        GOTO(out, rc);
out:
        OBD_FREE_PTR(arg);
        return rc;
}

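/**
 * cfs_hash iterator callback: reprocess a single resource.
 */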
static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
{
        ldlm_reprocess_all(res);
        return LDLM_ITER_CONTINUE;
}

static int ldlm_reprocess_res(cfs_hash_t *hs, cfs_hash_bd_t *bd,
                              struct hlist_node *hnode, void *arg)
{
        struct ldlm_resource *res = cfs_hash_object(hs, hnode);
        int rc;

        rc = reprocess_one_queue(res, arg);

        return rc == LDLM_ITER_STOP;
}

/**
 * Iterate through all resources on a namespace attempting to grant waiting
 * locks.
 */
void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
{
        ENTRY;

        if (ns != NULL) {
                cfs_hash_for_each_nolock(ns->ns_rs_hash,
                                         ldlm_reprocess_res, NULL);
        }
        EXIT;
}
EXPORT_SYMBOL(ldlm_reprocess_all_ns);

/**
 * Try to grant all waiting locks on a resource.
 *
 * Calls ldlm_reprocess_queue on converting and waiting queues.
 *
 * Typically called after some resource locks are cancelled to see
 * if anything could be granted as a result of the cancellation.
 */
void ldlm_reprocess_all(struct ldlm_resource *res)
{
        struct list_head rpc_list;
#ifdef HAVE_SERVER_SUPPORT
        int rc;
        ENTRY;

        INIT_LIST_HEAD(&rpc_list);
        /* Local lock trees don't get reprocessed. */
        if (ns_is_client(ldlm_res_to_ns(res))) {
                EXIT;
                return;
        }

restart:
        lock_res(res);
        rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
        if (rc == LDLM_ITER_CONTINUE)
                ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
        unlock_res(res);

        rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
                               LDLM_WORK_CP_AST);
        if (rc == -ERESTART) {
                LASSERT(list_empty(&rpc_list));
                goto restart;
        }
#else
        ENTRY;

        INIT_LIST_HEAD(&rpc_list);
        if (!ns_is_client(ldlm_res_to_ns(res))) {
                CERROR("This is a client-side-only module, cannot handle "
                       "LDLM_NAMESPACE_SERVER resource type lock.\n");
                LBUG();
        }
#endif
        EXIT;
}
EXPORT_SYMBOL(ldlm_reprocess_all);

/**
 * Helper function to call blocking AST for LDLM lock \a lock in a
 * "cancelling" mode.
 */
void ldlm_cancel_callback(struct ldlm_lock *lock)
{
        check_res_locked(lock->l_resource);
        if (!ldlm_is_cancel(lock)) {
                ldlm_set_cancel(lock);
                if (lock->l_blocking_ast) {
                        unlock_res_and_lock(lock);
                        lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
                                             LDLM_CB_CANCELING);
                        lock_res_and_lock(lock);
                } else {
                        LDLM_DEBUG(lock, "no blocking ast");
                }
        }
        ldlm_set_bl_done(lock);
}

/**
 * Remove skiplist-enabled LDLM lock \a req from granted list
 */
void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
{
        if (req->l_resource->lr_type != LDLM_PLAIN &&
            req->l_resource->lr_type != LDLM_IBITS)
                return;

        list_del_init(&req->l_sl_policy);
        list_del_init(&req->l_sl_mode);
}

/**
 * Attempts to cancel LDLM lock \a lock that has no reader/writer references.
 */
void ldlm_lock_cancel(struct ldlm_lock *lock)
{
        struct ldlm_resource *res;
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        res = lock->l_resource;
        ns  = ldlm_res_to_ns(res);

        /* Please do not, no matter how tempting, remove this LBUG without
         * talking to me first. -phik */
        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                LBUG();
        }

        if (ldlm_is_waited(lock))
                ldlm_del_waiting_lock(lock);

        /* Call the blocking AST in "cancelling" mode. */
        ldlm_cancel_callback(lock);

        /* Yes, a second time, in case the lock was re-added to the waiting
         * list while the resource lock was dropped in ldlm_cancel_callback. */
        if (ldlm_is_waited(lock))
                ldlm_del_waiting_lock(lock);

        ldlm_resource_unlink_lock(lock);
        ldlm_lock_destroy_nolock(lock);

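        /* Only a granted lock was accounted in the pool (see
         * ldlm_grant_lock()), so only remove it in that case. */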
        if (lock->l_granted_mode == lock->l_req_mode)
                ldlm_pool_del(&ns->ns_pool, lock);

        /* Make sure we will not be called again for the same lock, which is
         * possible if lock->l_granted_mode is not zeroed out. */
        lock->l_granted_mode = LCK_MINMODE;
        unlock_res_and_lock(lock);

        EXIT;
}
EXPORT_SYMBOL(ldlm_lock_cancel);

/**
 * Set opaque data into the lock that only makes sense to upper layer.
 */
int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);
        int rc = -EINVAL;
        ENTRY;

        if (lock) {
                if (lock->l_ast_data == NULL)
                        lock->l_ast_data = data;
                if (lock->l_ast_data == data)
                        rc = 0;
                LDLM_LOCK_PUT(lock);
        }
        RETURN(rc);
}
EXPORT_SYMBOL(ldlm_lock_set_data);
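
/*
 * Illustrative use (a sketch; the caller context is an assumption): attach
 * upper-layer state to a freshly granted lock, tolerating a concurrent
 * setter only when it stored the same pointer:
 *
 *      if (ldlm_lock_set_data(&lockh, my_object) != 0) // my_object: hypothetical
 *              CERROR("lock already carries different ast_data\n");
 */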

struct export_cl_data {
        struct obd_export       *ecl_exp;
        int                     ecl_loop;
};

/**
 * Iterator function for ldlm_cancel_locks_for_export.
 * Cancels passed locks.
 */
int ldlm_cancel_locks_for_export_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
                                    struct hlist_node *hnode, void *data)
{
        struct export_cl_data   *ecl = (struct export_cl_data *)data;
        struct obd_export       *exp = ecl->ecl_exp;
        struct ldlm_lock        *lock = cfs_hash_object(hs, hnode);
        struct ldlm_resource    *res;

        res = ldlm_resource_getref(lock->l_resource);
        LDLM_LOCK_GET(lock);

        LDLM_DEBUG(lock, "export %p", exp);
        ldlm_res_lvbo_update(res, NULL, 1);
        ldlm_lock_cancel(lock);
        ldlm_reprocess_all(res);
        ldlm_resource_putref(res);
        LDLM_LOCK_RELEASE(lock);

        ecl->ecl_loop++;
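        /* Log progress only when ecl_loop is a power of two (n & -n == n),
         * to avoid flooding the log on exports with many locks. */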
        if ((ecl->ecl_loop & -ecl->ecl_loop) == ecl->ecl_loop) {
                CDEBUG(D_INFO,
                       "Cancel lock %p for export %p (loop %d), still have "
                       "%d locks left on hash table.\n",
                       lock, exp, ecl->ecl_loop,
                       atomic_read(&hs->hs_count));
        }

        return 0;
}

/**
 * Cancel all locks for given export.
 *
 * Typically called on client disconnection/eviction
 */
void ldlm_cancel_locks_for_export(struct obd_export *exp)
{
        struct export_cl_data   ecl = {
                .ecl_exp        = exp,
                .ecl_loop       = 0,
        };

        cfs_hash_for_each_empty(exp->exp_lock_hash,
                                ldlm_cancel_locks_for_export_cb, &ecl);
}

/**
 * Downgrade an exclusive lock.
 *
 * A fast variant of ldlm_lock_convert for conversion of exclusive locks.
 * The conversion is always successful.
 * Used by Commit on Sharing (COS) code.
 *
 * \param lock A lock to convert
 * \param new_mode new lock mode
 */
void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode)
{
        ENTRY;

        LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
        LASSERT(new_mode == LCK_COS);

        lock_res_and_lock(lock);
        ldlm_resource_unlink_lock(lock);
        /*
         * Remove the lock from pool as it will be added again in
         * ldlm_grant_lock() called below.
         */
        ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);

        lock->l_req_mode = new_mode;
        ldlm_grant_lock(lock, NULL);
        unlock_res_and_lock(lock);
        ldlm_reprocess_all(lock->l_resource);

        EXIT;
}
EXPORT_SYMBOL(ldlm_lock_downgrade);
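
/*
 * Illustrative only (assumed caller, not from this file): COS code
 * downgrades a granted PW/EX lock to COS mode once the operation it
 * protected has committed, letting compatible COS locks be granted:
 *
 *      ldlm_lock_downgrade(lock, LCK_COS);
 */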

/**
 * Attempt to convert already granted lock to a different mode.
 *
 * While lock conversion is not currently used, future client-side
 * optimizations could take advantage of it to avoid discarding cached
 * pages on a file.
 */
struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                                        __u32 *flags)
{
        struct list_head rpc_list;
        struct ldlm_resource *res;
        struct ldlm_namespace *ns;
        int granted = 0;
#ifdef HAVE_SERVER_SUPPORT
        int old_mode;
        struct sl_insert_point prev;
#endif
        struct ldlm_interval *node;
        ENTRY;

        INIT_LIST_HEAD(&rpc_list);
        /* Just return if mode is unchanged. */
        if (new_mode == lock->l_granted_mode) {
                *flags |= LDLM_FL_BLOCK_GRANTED;
                RETURN(lock->l_resource);
        }

        /* We can't check the lock type here because the lock's spinlock
         * is not held, so do the allocation blindly. -jay */
        OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
        if (node == NULL)  /* Actually, this causes EDEADLOCK to be returned */
                RETURN(NULL);

        LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR),
                 "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);

        lock_res_and_lock(lock);

        res = lock->l_resource;
        ns  = ldlm_res_to_ns(res);

#ifdef HAVE_SERVER_SUPPORT
        old_mode = lock->l_req_mode;
#endif
        lock->l_req_mode = new_mode;
        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
#ifdef HAVE_SERVER_SUPPORT
                /* remember the lock position where the lock might be
                 * added back to the granted list later and also
                 * remember the join mode for skiplist fixing. */
                prev.res_link = lock->l_res_link.prev;
                prev.mode_link = lock->l_sl_mode.prev;
                prev.policy_link = lock->l_sl_policy.prev;
#endif
                ldlm_resource_unlink_lock(lock);
        } else {
                ldlm_resource_unlink_lock(lock);
                if (res->lr_type == LDLM_EXTENT) {
                        /* FIXME: ugly code, I have to attach the lock to an
                         * interval node again since perhaps it will be
                         * granted soon */
                        INIT_LIST_HEAD(&node->li_group);
                        ldlm_interval_attach(node, lock);
                        node = NULL;
                }
        }

        /*
         * Remove old lock from the pool before adding the lock with new
         * mode below in ->policy()
         */
        ldlm_pool_del(&ns->ns_pool, lock);

        /* If this is a local resource, put it on the appropriate list. */
        if (ns_is_client(ldlm_res_to_ns(res))) {
                if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                } else {
                        /* This should never happen, because of the way the
                         * server handles conversions. */
                        LDLM_ERROR(lock, "Erroneous flags %x on local lock",
                                   *flags);
                        LBUG();

                        ldlm_grant_lock(lock, &rpc_list);
                        granted = 1;
                        /* FIXME: completion handling not with lr_lock held! */
                        if (lock->l_completion_ast)
                                lock->l_completion_ast(lock, 0, NULL);
                }
#ifdef HAVE_SERVER_SUPPORT
        } else {
                int rc;
                ldlm_error_t err;
                __u64 pflags = 0;
                ldlm_processing_policy policy;

                policy = ldlm_processing_policy_table[res->lr_type];
                rc = policy(lock, &pflags, 0, &err, &rpc_list);
                if (rc == LDLM_ITER_STOP) {
                        lock->l_req_mode = old_mode;
                        if (res->lr_type == LDLM_EXTENT)
                                ldlm_extent_add_lock(res, lock);
                        else
                                ldlm_granted_list_add_lock(lock, &prev);

                        res = NULL;
                } else {
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        granted = 1;
                }
        }
#else
        } else {
                CERROR("This is a client-side-only module, cannot handle "
                       "LDLM_NAMESPACE_SERVER resource type lock.\n");
                LBUG();
        }
#endif
        unlock_res_and_lock(lock);

        if (granted)
                ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
        if (node)
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        RETURN(res);
}
EXPORT_SYMBOL(ldlm_lock_convert);

/**
 * Print lock with lock handle \a lockh description into debug log.
 *
 * Used when printing all locks on a resource for debug purposes.
 */
void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
{
        struct ldlm_lock *lock;

        if (!((libcfs_debug | D_ERROR) & level))
                return;

        lock = ldlm_handle2lock(lockh);
        if (lock == NULL)
                return;

        LDLM_DEBUG_LIMIT(level, lock, "###");

        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_dump_handle);

/**
 * Print lock information with custom message into debug log.
 * Helper function.
 */
void _ldlm_lock_debug(struct ldlm_lock *lock,
                      struct libcfs_debug_msg_data *msgdata,
                      const char *fmt, ...)
{
        va_list args;
        struct obd_export *exp = lock->l_export;
        struct ldlm_resource *resource = lock->l_resource;
        char *nid = "local";

        va_start(args, fmt);

        if (exp && exp->exp_connection) {
                nid = libcfs_nid2str(exp->exp_connection->c_peer.nid);
        } else if (exp && exp->exp_obd != NULL) {
                struct obd_import *imp = exp->exp_obd->u.cli.cl_import;

                nid = libcfs_nid2str(imp->imp_connection->c_peer.nid);
        }

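        /* Note: the \?\? sequences in the format strings below keep "??"
         * from being interpreted as the start of a trigraph. */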
        if (resource == NULL) {
                libcfs_debug_vmsg2(msgdata, fmt, args,
                       " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                       "res: \?\? rrc=\?\? type: \?\?\? flags: "LPX64" nid: %s "
                       "remote: "LPX64" expref: %d pid: %u timeout: %lu "
                       "lvb_type: %d\n",
                       lock,
                       lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
                       lock->l_readers, lock->l_writers,
                       ldlm_lockname[lock->l_granted_mode],
                       ldlm_lockname[lock->l_req_mode],
                       lock->l_flags, nid, lock->l_remote_handle.cookie,
                       exp ? atomic_read(&exp->exp_refcount) : -99,
                       lock->l_pid, lock->l_callback_timeout, lock->l_lvb_type);
                va_end(args);
                return;
        }

        switch (resource->lr_type) {
        case LDLM_EXTENT:
                libcfs_debug_vmsg2(msgdata, fmt, args,
                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                        "res: "DLDLMRES" rrc: %d type: %s ["LPU64"->"LPU64"] "
                        "(req "LPU64"->"LPU64") flags: "LPX64" nid: %s remote: "
                        LPX64" expref: %d pid: %u timeout: %lu lvb_type: %d\n",
                        ldlm_lock_to_ns_name(lock), lock,
                        lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
                        lock->l_readers, lock->l_writers,
                        ldlm_lockname[lock->l_granted_mode],
                        ldlm_lockname[lock->l_req_mode],
                        PLDLMRES(resource),
                        atomic_read(&resource->lr_refcount),
                        ldlm_typename[resource->lr_type],
                        lock->l_policy_data.l_extent.start,
                        lock->l_policy_data.l_extent.end,
                        lock->l_req_extent.start, lock->l_req_extent.end,
                        lock->l_flags, nid, lock->l_remote_handle.cookie,
                        exp ? atomic_read(&exp->exp_refcount) : -99,
                        lock->l_pid, lock->l_callback_timeout,
                        lock->l_lvb_type);
                break;

        case LDLM_FLOCK:
                libcfs_debug_vmsg2(msgdata, fmt, args,
                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                        "res: "DLDLMRES" rrc: %d type: %s pid: %d "
                        "["LPU64"->"LPU64"] flags: "LPX64" nid: %s "
                        "remote: "LPX64" expref: %d pid: %u timeout: %lu\n",
                        ldlm_lock_to_ns_name(lock), lock,
                        lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
                        lock->l_readers, lock->l_writers,
                        ldlm_lockname[lock->l_granted_mode],
                        ldlm_lockname[lock->l_req_mode],
                        PLDLMRES(resource),
                        atomic_read(&resource->lr_refcount),
                        ldlm_typename[resource->lr_type],
                        lock->l_policy_data.l_flock.pid,
                        lock->l_policy_data.l_flock.start,
                        lock->l_policy_data.l_flock.end,
                        lock->l_flags, nid, lock->l_remote_handle.cookie,
                        exp ? atomic_read(&exp->exp_refcount) : -99,
                        lock->l_pid, lock->l_callback_timeout);
                break;

        case LDLM_IBITS:
                libcfs_debug_vmsg2(msgdata, fmt, args,
                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                        "res: "DLDLMRES" bits "LPX64" rrc: %d type: %s "
                        "flags: "LPX64" nid: %s remote: "LPX64" expref: %d "
                        "pid: %u timeout: %lu lvb_type: %d\n",
                        ldlm_lock_to_ns_name(lock),
                        lock, lock->l_handle.h_cookie,
                        atomic_read(&lock->l_refc),
                        lock->l_readers, lock->l_writers,
                        ldlm_lockname[lock->l_granted_mode],
                        ldlm_lockname[lock->l_req_mode],
                        PLDLMRES(resource),
                        lock->l_policy_data.l_inodebits.bits,
                        atomic_read(&resource->lr_refcount),
                        ldlm_typename[resource->lr_type],
                        lock->l_flags, nid, lock->l_remote_handle.cookie,
                        exp ? atomic_read(&exp->exp_refcount) : -99,
                        lock->l_pid, lock->l_callback_timeout,
                        lock->l_lvb_type);
                break;

        default:
                libcfs_debug_vmsg2(msgdata, fmt, args,
                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                        "res: "DLDLMRES" rrc: %d type: %s flags: "LPX64" "
                        "nid: %s remote: "LPX64" expref: %d pid: %u "
                        "timeout: %lu lvb_type: %d\n",
                        ldlm_lock_to_ns_name(lock),
                        lock, lock->l_handle.h_cookie,
                        atomic_read(&lock->l_refc),
                        lock->l_readers, lock->l_writers,
                        ldlm_lockname[lock->l_granted_mode],
                        ldlm_lockname[lock->l_req_mode],
                        PLDLMRES(resource),
                        atomic_read(&resource->lr_refcount),
                        ldlm_typename[resource->lr_type],
                        lock->l_flags, nid, lock->l_remote_handle.cookie,
                        exp ? atomic_read(&exp->exp_refcount) : -99,
                        lock->l_pid, lock->l_callback_timeout,
                        lock->l_lvb_type);
                break;
        }
        va_end(args);
}
EXPORT_SYMBOL(_ldlm_lock_debug);