/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2014, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_lock.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM

#include <libcfs/libcfs.h>
#include <obd_class.h>
#include "ldlm_internal.h"

/* lock types */
char *ldlm_lockname[] = {
        [0] = "--",
        [LCK_EX] = "EX",
        [LCK_PW] = "PW",
        [LCK_PR] = "PR",
        [LCK_CW] = "CW",
        [LCK_CR] = "CR",
        [LCK_NL] = "NL",
        [LCK_GROUP] = "GROUP",
        [LCK_COS] = "COS"
};
EXPORT_SYMBOL(ldlm_lockname);

char *ldlm_typename[] = {
        [LDLM_PLAIN] = "PLN",
        [LDLM_EXTENT] = "EXT",
        [LDLM_FLOCK] = "FLK",
        [LDLM_IBITS] = "IBT",
};

static ldlm_policy_wire_to_local_t ldlm_policy_wire_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE]  = ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE]  = ldlm_flock_policy_wire_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE]  = ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_local_to_wire_t ldlm_policy_local_to_wire[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE]  = ldlm_plain_policy_local_to_wire,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_local_to_wire,
        [LDLM_FLOCK - LDLM_MIN_TYPE]  = ldlm_flock_policy_local_to_wire,
        [LDLM_IBITS - LDLM_MIN_TYPE]  = ldlm_ibits_policy_local_to_wire,
};

/**
 * Converts lock policy from local format to on the wire lock_desc format
 */
void ldlm_convert_policy_to_wire(ldlm_type_t type,
                                 const ldlm_policy_data_t *lpolicy,
                                 ldlm_wire_policy_data_t *wpolicy)
{
        ldlm_policy_local_to_wire_t convert;

        convert = ldlm_policy_local_to_wire[type - LDLM_MIN_TYPE];

        convert(lpolicy, wpolicy);
}

/**
 * Converts lock policy from on the wire lock_desc format to local format
 */
void ldlm_convert_policy_to_local(struct obd_export *exp, ldlm_type_t type,
                                  const ldlm_wire_policy_data_t *wpolicy,
                                  ldlm_policy_data_t *lpolicy)
{
        ldlm_policy_wire_to_local_t convert;

        convert = ldlm_policy_wire_to_local[type - LDLM_MIN_TYPE];

        convert(wpolicy, lpolicy);
}

char *ldlm_it2str(int it)
{
        switch (it) {
        case IT_OPEN:
                return "open";
        case IT_CREAT:
                return "creat";
        case (IT_OPEN | IT_CREAT):
                return "open|creat";
        case IT_READDIR:
                return "readdir";
        case IT_GETATTR:
                return "getattr";
        case IT_LOOKUP:
                return "lookup";
        case IT_UNLINK:
                return "unlink";
        case IT_GETXATTR:
                return "getxattr";
        case IT_LAYOUT:
                return "layout";
        default:
                CERROR("Unknown intent %d\n", it);
                return "UNKNOWN";
        }
}
EXPORT_SYMBOL(ldlm_it2str);

extern struct kmem_cache *ldlm_lock_slab;

#ifdef HAVE_SERVER_SUPPORT
static ldlm_processing_policy ldlm_processing_policy_table[] = {
        [LDLM_PLAIN]    = ldlm_process_plain_lock,
        [LDLM_EXTENT]   = ldlm_process_extent_lock,
        [LDLM_FLOCK]    = ldlm_process_flock_lock,
        [LDLM_IBITS]    = ldlm_process_inodebits_lock,
};

ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
{
        return ldlm_processing_policy_table[res->lr_type];
}
EXPORT_SYMBOL(ldlm_get_processing_policy);
#endif /* HAVE_SERVER_SUPPORT */

void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
{
        ns->ns_policy = arg;
}
EXPORT_SYMBOL(ldlm_register_intent);

/*
 * REFCOUNTED LOCK OBJECTS
 */

/**
 * Get a reference on a lock.
 *
 * Lock refcounts, during creation:
 *   - one special one for allocation, dec'd only once in destroy
 *   - one for being a lock that's in-use
 *   - one for the addref associated with a new lock
 */
struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
{
        atomic_inc(&lock->l_refc);
        return lock;
}
EXPORT_SYMBOL(ldlm_lock_get);

/**
 * Release a lock reference.
 *
 * Also frees the lock if this was the last reference.
 */
void ldlm_lock_put(struct ldlm_lock *lock)
{
        ENTRY;

        LASSERT(lock->l_resource != LP_POISON);
        LASSERT(atomic_read(&lock->l_refc) > 0);
        if (atomic_dec_and_test(&lock->l_refc)) {
                struct ldlm_resource *res;

                LDLM_DEBUG(lock,
                           "final lock_put on destroyed lock, freeing it.");

                res = lock->l_resource;
                LASSERT(ldlm_is_destroyed(lock));
                LASSERT(list_empty(&lock->l_exp_list));
                LASSERT(list_empty(&lock->l_res_link));
                LASSERT(list_empty(&lock->l_pending_chain));

                lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
                                     LDLM_NSS_LOCKS);
                lu_ref_del(&res->lr_reference, "lock", lock);
                ldlm_resource_putref(res);
                lock->l_resource = NULL;
                if (lock->l_export) {
                        class_export_lock_put(lock->l_export, lock);
                        lock->l_export = NULL;
                }

                if (lock->l_lvb_data != NULL)
                        OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len);

                ldlm_interval_free(ldlm_interval_detach(lock));
                lu_ref_fini(&lock->l_reference);
                OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle);
        }

        EXIT;
}
EXPORT_SYMBOL(ldlm_lock_put);
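
/*
 * Illustrative usage sketch (not from the original source): every
 * reference taken with LDLM_LOCK_GET()/ldlm_lock_get() must be balanced
 * by LDLM_LOCK_PUT()/ldlm_lock_put(); the final put on a destroyed lock
 * is what actually frees it:
 *
 *      lock = LDLM_LOCK_GET(lock);
 *      ... use lock ...
 *      LDLM_LOCK_PUT(lock);
 */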

/**
 * Removes LDLM lock \a lock from LRU. Assumes LRU is already locked.
 */
int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
{
        int rc = 0;

        if (!list_empty(&lock->l_lru)) {
                struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

                LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
                list_del_init(&lock->l_lru);
                LASSERT(ns->ns_nr_unused > 0);
                ns->ns_nr_unused--;
                rc = 1;
        }
        return rc;
}
/**
 * Removes LDLM lock \a lock from LRU. Obtains the LRU lock first.
 *
 * If \a last_use is non-zero, the lock is removed from the LRU only if
 * \a last_use matches the lock's l_last_used.
 *
 * \retval 0 the lock was not in the LRU list, or \a last_use was non-zero
 *           and did not match the lock's l_last_used;
 * \retval 1 the lock was in the LRU list and has been removed.
 */
int ldlm_lock_remove_from_lru_check(struct ldlm_lock *lock, cfs_time_t last_use)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
        int rc = 0;

        ENTRY;
        if (ldlm_is_ns_srv(lock)) {
                LASSERT(list_empty(&lock->l_lru));
                RETURN(0);
        }

        spin_lock(&ns->ns_lock);
        if (last_use == 0 || last_use == lock->l_last_used)
                rc = ldlm_lock_remove_from_lru_nolock(lock);
        spin_unlock(&ns->ns_lock);

        RETURN(rc);
}

/**
 * Adds LDLM lock \a lock to namespace LRU. Assumes LRU is already locked.
 */
void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        lock->l_last_used = cfs_time_current();
        LASSERT(list_empty(&lock->l_lru));
        LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
        list_add_tail(&lock->l_lru, &ns->ns_unused_list);
        ldlm_clear_skipped(lock);
        LASSERT(ns->ns_nr_unused >= 0);
        ns->ns_nr_unused++;
}

/**
 * Adds LDLM lock \a lock to namespace LRU. Obtains necessary LRU locks
 * first.
 */
void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        spin_lock(&ns->ns_lock);
        ldlm_lock_add_to_lru_nolock(lock);
        spin_unlock(&ns->ns_lock);
        EXIT;
}

/**
 * Moves LDLM lock \a lock that is already in namespace LRU to the tail of
 * the LRU. Performs the necessary LRU locking.
 */
void ldlm_lock_touch_in_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        if (ldlm_is_ns_srv(lock)) {
                LASSERT(list_empty(&lock->l_lru));
                EXIT;
                return;
        }

        spin_lock(&ns->ns_lock);
        if (!list_empty(&lock->l_lru)) {
                ldlm_lock_remove_from_lru_nolock(lock);
                ldlm_lock_add_to_lru_nolock(lock);
        }
        spin_unlock(&ns->ns_lock);
        EXIT;
}

/**
 * Helper to destroy a locked lock.
 *
 * Used by ldlm_lock_destroy and ldlm_lock_destroy_nolock.
 * Must be called with l_lock and lr_lock held.
 *
 * Does not actually free the lock data, but rather marks the lock as
 * destroyed by setting the LDLM_FL_DESTROYED flag.  Destroys the
 * handle->lock association too, so that the lock can no longer be found,
 * and removes the lock from the LRU list.  Actual lock freeing occurs
 * when the last lock reference goes away.
 *
 * Original comment (of some historical value):
 * This used to have a 'strict' flag, which recovery would use to mark an
 * in-use lock as needing-to-die.  Lest I am ever tempted to put it back, I
 * shall explain why it's gone: with the new hash table scheme, once you call
 * ldlm_lock_destroy, you can never drop your final references on this lock.
 * Because it's not in the hash table anymore.  -phil
 */
static int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
{
        ENTRY;

        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                LBUG();
        }

        if (!list_empty(&lock->l_res_link)) {
                LDLM_ERROR(lock, "lock still on resource");
                LBUG();
        }

        if (ldlm_is_destroyed(lock)) {
                LASSERT(list_empty(&lock->l_lru));
                EXIT;
                return 0;
        }
        ldlm_set_destroyed(lock);

        if (lock->l_export && lock->l_export->exp_lock_hash) {
                /* NB: it's safe to call cfs_hash_del() even if the lock
                 * isn't in exp_lock_hash. */
                /* In the function below, .hs_keycmp resolves to
                 * ldlm_export_lock_keycmp() */
                /* coverity[overrun-buffer-val] */
                cfs_hash_del(lock->l_export->exp_lock_hash,
                             &lock->l_remote_handle, &lock->l_exp_hash);
        }

        ldlm_lock_remove_from_lru(lock);
        class_handle_unhash(&lock->l_handle);

#if 0
        /* Wake anyone waiting for this lock */
        /* FIXME: I should probably add yet another flag, instead of using
         * l_export to only call this on clients */
        if (lock->l_export)
                class_export_put(lock->l_export);
        lock->l_export = NULL;
        if (lock->l_export && lock->l_completion_ast)
                lock->l_completion_ast(lock, 0);
#endif
        EXIT;
        return 1;
}

/**
 * Destroys a LDLM lock \a lock. Performs necessary locking first.
 */
void ldlm_lock_destroy(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        lock_res_and_lock(lock);
        first = ldlm_lock_destroy_internal(lock);
        unlock_res_and_lock(lock);

        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/**
 * Destroys a LDLM lock \a lock that is already locked.
 */
void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        first = ldlm_lock_destroy_internal(lock);
        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/* this is called by portals_handle2object with the handle lock taken */
static void lock_handle_addref(void *lock)
{
        LDLM_LOCK_GET((struct ldlm_lock *)lock);
}

static void lock_handle_free(void *lock, int size)
{
        LASSERT(size == sizeof(struct ldlm_lock));
        OBD_SLAB_FREE(lock, ldlm_lock_slab, size);
}

static struct portals_handle_ops lock_handle_ops = {
        .hop_addref = lock_handle_addref,
        .hop_free   = lock_handle_free,
};

/**
 * Allocate and initialize a new lock structure.
 *
 * Usage: pass in a resource on which you have done ldlm_resource_get();
 *        the new lock will take over that refcount.
 * Returns: lock with refcount 2 - one for the current caller and one for
 *          the remote side.
 */
static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
{
        struct ldlm_lock *lock;
        ENTRY;

        if (resource == NULL)
                LBUG();

        OBD_SLAB_ALLOC_PTR_GFP(lock, ldlm_lock_slab, GFP_NOFS);
        if (lock == NULL)
                RETURN(NULL);

        spin_lock_init(&lock->l_lock);
        lock->l_resource = resource;
        lu_ref_add(&resource->lr_reference, "lock", lock);

        atomic_set(&lock->l_refc, 2);
        INIT_LIST_HEAD(&lock->l_res_link);
        INIT_LIST_HEAD(&lock->l_lru);
        INIT_LIST_HEAD(&lock->l_pending_chain);
        INIT_LIST_HEAD(&lock->l_bl_ast);
        INIT_LIST_HEAD(&lock->l_cp_ast);
        INIT_LIST_HEAD(&lock->l_rk_ast);
        init_waitqueue_head(&lock->l_waitq);
        lock->l_blocking_lock = NULL;
        INIT_LIST_HEAD(&lock->l_sl_mode);
        INIT_LIST_HEAD(&lock->l_sl_policy);
        INIT_HLIST_NODE(&lock->l_exp_hash);
        INIT_HLIST_NODE(&lock->l_exp_flock_hash);

        lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
                             LDLM_NSS_LOCKS);
        INIT_LIST_HEAD(&lock->l_handle.h_link);
        class_handle_hash(&lock->l_handle, &lock_handle_ops);

        lu_ref_init(&lock->l_reference);
        lu_ref_add(&lock->l_reference, "hash", lock);
        lock->l_callback_timeout = 0;

#if LUSTRE_TRACKS_LOCK_EXP_REFS
        INIT_LIST_HEAD(&lock->l_exp_refs_link);
        lock->l_exp_refs_nr = 0;
        lock->l_exp_refs_target = NULL;
#endif
        INIT_LIST_HEAD(&lock->l_exp_list);

        RETURN(lock);
}
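
/*
 * Minimal caller sketch for the contract documented above (illustrative,
 * not from the original source; the failure-path putref is an assumption
 * following from the "takes over the refcount" rule):
 *
 *      res = ldlm_resource_get(ns, NULL, res_id, type, 1);
 *      if (!IS_ERR(res)) {
 *              lock = ldlm_lock_new(res);  // consumes the resource ref
 *              if (lock == NULL)
 *                      ldlm_resource_putref(res);  // presumably needed
 *                                                  // on failure only
 *      }
 */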

/**
 * Moves LDLM lock \a lock to another resource.
 * This is used on client when server returns some other lock than requested
 * (typically as a result of intent operation)
 */
int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                              const struct ldlm_res_id *new_resid)
{
        struct ldlm_resource *oldres = lock->l_resource;
        struct ldlm_resource *newres;
        int type;
        ENTRY;

        LASSERT(ns_is_client(ns));

        lock_res_and_lock(lock);
        if (memcmp(new_resid, &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) == 0) {
                /* Nothing to do */
                unlock_res_and_lock(lock);
                RETURN(0);
        }

        LASSERT(new_resid->name[0] != 0);

        /* This function assumes that the lock isn't on any lists */
        LASSERT(list_empty(&lock->l_res_link));

        type = oldres->lr_type;
        unlock_res_and_lock(lock);

        newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
        if (IS_ERR(newres))
                RETURN(PTR_ERR(newres));

        lu_ref_add(&newres->lr_reference, "lock", lock);
        /*
         * To flip the lock from the old to the new resource, lock, oldres and
         * newres have to be locked. Resource spin-locks are nested within
         * lock->l_lock, and are taken in the memory address order to avoid
         * dead-locks.
         */
        spin_lock(&lock->l_lock);
        oldres = lock->l_resource;
        if (oldres < newres) {
                lock_res(oldres);
                lock_res_nested(newres, LRT_NEW);
        } else {
                lock_res(newres);
                lock_res_nested(oldres, LRT_NEW);
        }
        LASSERT(memcmp(new_resid, &oldres->lr_name,
                       sizeof(oldres->lr_name)) != 0);
        lock->l_resource = newres;
        unlock_res(oldres);
        unlock_res_and_lock(lock);

        /* ...and the flowers are still standing! */
        lu_ref_del(&oldres->lr_reference, "lock", lock);
        ldlm_resource_putref(oldres);

        RETURN(0);
}

/** \defgroup ldlm_handles LDLM HANDLES
 * Ways to get hold of locks without any addresses.
 * @{
 */

/**
 * Fills in handle for LDLM lock \a lock into supplied \a lockh.
 * Does not take any references.
 */
void ldlm_lock2handle(const struct ldlm_lock *lock, struct lustre_handle *lockh)
{
        lockh->cookie = lock->l_handle.h_cookie;
}
EXPORT_SYMBOL(ldlm_lock2handle);

/**
 * Obtain a lock reference by handle.
 *
 * If \a flags is non-zero, atomically get the lock and set those flags;
 * return NULL if any of the flags is already set.
 */
struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
                                     __u64 flags)
{
        struct ldlm_lock *lock;
        ENTRY;

        LASSERT(handle);

        lock = class_handle2object(handle->cookie, NULL);
        if (lock == NULL)
                RETURN(NULL);

        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if ((flags == 0) && !ldlm_is_destroyed(lock)) {
                lu_ref_add(&lock->l_reference, "handle", current);
                RETURN(lock);
        }

        lock_res_and_lock(lock);

        LASSERT(lock->l_resource != NULL);

        lu_ref_add_atomic(&lock->l_reference, "handle", current);
        if (unlikely(ldlm_is_destroyed(lock))) {
                unlock_res_and_lock(lock);
                CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
                LDLM_LOCK_PUT(lock);
                RETURN(NULL);
        }

        /* If we're setting flags, make sure none of them are already set. */
        if (flags != 0) {
                if ((lock->l_flags & flags) != 0) {
                        unlock_res_and_lock(lock);
                        LDLM_LOCK_PUT(lock);
                        RETURN(NULL);
                }

                lock->l_flags |= flags;
        }

        unlock_res_and_lock(lock);
        RETURN(lock);
}
EXPORT_SYMBOL(__ldlm_handle2lock);
/** @} ldlm_handles */
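
/*
 * Illustrative round trip (not from the original source): a handle lets a
 * lock be referenced across RPC boundaries without passing a pointer, and
 * resolving it back takes a reference the caller must drop:
 *
 *      struct lustre_handle lockh;
 *
 *      ldlm_lock2handle(lock, &lockh);
 *      ...
 *      lock = ldlm_handle2lock(&lockh);
 *      if (lock != NULL) {
 *              ... use lock ...
 *              LDLM_LOCK_PUT(lock);
 *      }
 */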

/**
 * Fill in "on the wire" representation for given LDLM lock into supplied
 * lock descriptor \a desc structure.
 */
void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
{
        ldlm_res2desc(lock->l_resource, &desc->l_resource);
        desc->l_req_mode = lock->l_req_mode;
        desc->l_granted_mode = lock->l_granted_mode;
        ldlm_convert_policy_to_wire(lock->l_resource->lr_type,
                                    &lock->l_policy_data,
                                    &desc->l_policy_data);
}

/**
 * Add a lock to list of conflicting locks to send AST to.
 *
 * Only add if we have not sent a blocking AST to the lock yet.
 */
static void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                                  struct list_head *work_list)
{
        if (!ldlm_is_ast_sent(lock)) {
                LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
                ldlm_set_ast_sent(lock);
                /* If the enqueuing client said so, tell the AST recipient to
                 * discard dirty data, rather than writing back. */
                if (ldlm_is_ast_discard_data(new))
                        ldlm_set_discard_data(lock);
                LASSERT(list_empty(&lock->l_bl_ast));
                list_add(&lock->l_bl_ast, work_list);
                LDLM_LOCK_GET(lock);
                LASSERT(lock->l_blocking_lock == NULL);
                lock->l_blocking_lock = LDLM_LOCK_GET(new);
        }
}

/**
 * Add a lock to list of just granted locks to send completion AST to.
 */
static void ldlm_add_cp_work_item(struct ldlm_lock *lock,
                                  struct list_head *work_list)
{
        if (!ldlm_is_cp_reqd(lock)) {
                ldlm_set_cp_reqd(lock);
                LDLM_DEBUG(lock, "lock granted; sending completion AST.");
                LASSERT(list_empty(&lock->l_cp_ast));
                list_add(&lock->l_cp_ast, work_list);
                LDLM_LOCK_GET(lock);
        }
}

/**
 * Aggregator function to add AST work items into a list. Determines
 * what sort of an AST work needs to be done and calls the proper
 * adding function.
 * Must be called with lr_lock held.
 */
void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                            struct list_head *work_list)
{
        ENTRY;
        check_res_locked(lock->l_resource);
        if (new)
                ldlm_add_bl_work_item(lock, new, work_list);
        else
                ldlm_add_cp_work_item(lock, work_list);
        EXIT;
}

/**
 * Add specified reader/writer reference to LDLM lock with handle \a lockh.
 * r/w reference type is determined by \a mode
 * Calls ldlm_lock_addref_internal.
 */
void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;

        lock = ldlm_handle2lock(lockh);
        LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
        ldlm_lock_addref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_addref);

/**
 * Helper function.
 * Add specified reader/writer reference to LDLM lock \a lock.
 * r/w reference type is determined by \a mode
 * Removes lock from LRU if it is there.
 * Assumes the LDLM lock is already locked.
 */
void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        ldlm_lock_remove_from_lru(lock);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                lock->l_readers++;
                lu_ref_add_atomic(&lock->l_reference, "reader", lock);
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                lock->l_writers++;
                lu_ref_add_atomic(&lock->l_reference, "writer", lock);
        }
        LDLM_LOCK_GET(lock);
        lu_ref_add_atomic(&lock->l_reference, "user", lock);
        LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}

/**
 * Attempts to add reader/writer reference to a lock with handle \a lockh, and
 * fails if lock is already LDLM_FL_CBPENDING or destroyed.
 *
 * \retval 0 success, lock was addref-ed
 *
 * \retval -EAGAIN lock is being canceled.
 */
int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;
        int               result;

        result = -EAGAIN;
        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                if (lock->l_readers != 0 || lock->l_writers != 0 ||
                    !ldlm_is_cbpending(lock)) {
                        ldlm_lock_addref_internal_nolock(lock, mode);
                        result = 0;
                }
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return result;
}
EXPORT_SYMBOL(ldlm_lock_addref_try);

/**
 * Add specified reader/writer reference to LDLM lock \a lock.
 * Locks LDLM lock and calls ldlm_lock_addref_internal_nolock to do the work.
 * Only called for local locks.
 */
void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
{
        lock_res_and_lock(lock);
        ldlm_lock_addref_internal_nolock(lock, mode);
        unlock_res_and_lock(lock);
}

/**
 * Removes reader/writer reference for LDLM lock \a lock.
 * Assumes LDLM lock is already locked.
 * Only called in ldlm_flock_destroy and for local locks.
 * Does NOT add lock to LRU if no r/w references left, to accommodate flock
 * locks that cannot be placed in LRU.
 */
void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                LASSERT(lock->l_readers > 0);
                lu_ref_del(&lock->l_reference, "reader", lock);
                lock->l_readers--;
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                LASSERT(lock->l_writers > 0);
                lu_ref_del(&lock->l_reference, "writer", lock);
                lock->l_writers--;
        }

        lu_ref_del(&lock->l_reference, "user", lock);
        LDLM_LOCK_RELEASE(lock);    /* matches the LDLM_LOCK_GET() in addref */
}

/**
 * Removes reader/writer reference for LDLM lock \a lock.
 * Locks LDLM lock first.
 * On a client, if the r/w refcount drops to zero and the lock is not
 * blocked, the lock is added to the namespace LRU.
 * For blocked locks, the blocking AST is run once the r/w refcount drops
 * to zero.
 */
void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
{
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        ns = ldlm_lock_to_ns(lock);

        ldlm_lock_decref_internal_nolock(lock, mode);

        if (ldlm_is_local(lock) &&
            !lock->l_readers && !lock->l_writers) {
                /* If this is a local lock on a server namespace and this was
                 * the last reference, cancel the lock. */
                CDEBUG(D_INFO, "forcing cancel of local lock\n");
                ldlm_set_cbpending(lock);
        }

        if (!lock->l_readers && !lock->l_writers &&
            (ldlm_is_cbpending(lock) || lock->l_req_mode == LCK_GROUP)) {
                /* If we received a blocking AST and this was the last
                 * reference, run the callback.
                 * Group locks are special:
                 * they must not go in the LRU, and they are not called back
                 * like non-group locks; instead they are manually released.
                 * They keep an l_writers reference until they are manually
                 * released, so we remove them when they have no more reader
                 * or writer references. - LU-6368 */
                if (ldlm_is_ns_srv(lock) && lock->l_export)
                        CERROR("FL_CBPENDING set on non-local lock--just a "
                               "warning\n");

                LDLM_DEBUG(lock, "final decref done on cbpending lock");

                LDLM_LOCK_GET(lock); /* dropped by bl thread */
                ldlm_lock_remove_from_lru(lock);
                unlock_res_and_lock(lock);

                if (ldlm_is_fail_loc(lock))
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                if (ldlm_is_atomic_cb(lock) ||
                    ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
                        ldlm_handle_bl_callback(ns, NULL, lock);
        } else if (ns_is_client(ns) &&
                   !lock->l_readers && !lock->l_writers &&
                   !ldlm_is_no_lru(lock) &&
                   !ldlm_is_bl_ast(lock)) {

                LDLM_DEBUG(lock, "add lock into lru list");

                /* If this is a client-side namespace and this was the last
                 * reference, put it on the LRU. */
                ldlm_lock_add_to_lru(lock);
                unlock_res_and_lock(lock);

                if (ldlm_is_fail_loc(lock))
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
                 * are not supported by the server, otherwise, it is done on
                 * enqueue. */
                if (!exp_connect_cancelset(lock->l_conn_export) &&
                    !ns_connect_lru_resize(ns))
                        ldlm_cancel_lru(ns, 0, LCF_ASYNC, 0);
        } else {
                LDLM_DEBUG(lock, "do not add lock into lru list");
                unlock_res_and_lock(lock);
        }

        EXIT;
}

/**
 * Decrease reader/writer refcount for LDLM lock with handle \a lockh
 */
void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);

        LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_decref);
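
/*
 * Illustrative pairing (not from the original source): r/w references
 * taken with ldlm_lock_addref() are balanced by ldlm_lock_decref() with
 * the same mode; the last decref may move the lock to the LRU or trigger
 * the blocking callback, as described above:
 *
 *      ldlm_lock_addref(&lockh, LCK_PR);
 *      ... read under the lock ...
 *      ldlm_lock_decref(&lockh, LCK_PR);
 */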

/**
 * Decrease reader/writer refcount for LDLM lock with handle
 * \a lockh and mark it for subsequent cancellation once r/w refcount
 * drops to zero instead of putting into LRU.
 */
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        ENTRY;

        LASSERT(lock != NULL);

        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        lock_res_and_lock(lock);
        ldlm_set_cbpending(lock);
        unlock_res_and_lock(lock);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);

struct sl_insert_point {
        struct list_head *res_link;
        struct list_head *mode_link;
        struct list_head *policy_link;
};

/**
 * Finds a position to insert the new lock into the granted lock list.
 *
 * Used for locks eligible for skiplist optimization.
 *
 * Parameters:
 *      queue [input]:  the granted list where the search acts on;
 *      req [input]:    the lock whose position is to be located;
 *      prev [output]:  positions within the 3 lists to insert \a req to
 * Return Value:
 *      filled \a prev
 * NOTE: called by
 *  - ldlm_grant_lock_with_skiplist
 */
static void search_granted_lock(struct list_head *queue,
                                struct ldlm_lock *req,
                                struct sl_insert_point *prev)
{
        struct list_head *tmp;
        struct ldlm_lock *lock, *mode_end, *policy_end;
        ENTRY;

        list_for_each(tmp, queue) {
                lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                mode_end = list_entry(lock->l_sl_mode.prev,
                                      struct ldlm_lock, l_sl_mode);

                if (lock->l_req_mode != req->l_req_mode) {
                        /* jump to last lock of mode group */
                        tmp = &mode_end->l_res_link;
                        continue;
                }

                /* suitable mode group is found */
                if (lock->l_resource->lr_type == LDLM_PLAIN) {
                        /* insert point is last lock of the mode group */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else if (lock->l_resource->lr_type == LDLM_IBITS) {
                        for (;;) {
                                policy_end =
                                        list_entry(lock->l_sl_policy.prev,
                                                   struct ldlm_lock,
                                                   l_sl_policy);

                                if (lock->l_policy_data.l_inodebits.bits ==
                                    req->l_policy_data.l_inodebits.bits) {
                                        /* insert point is last lock of
                                         * the policy group */
                                        prev->res_link =
                                                &policy_end->l_res_link;
                                        prev->mode_link =
                                                &policy_end->l_sl_mode;
                                        prev->policy_link =
                                                &policy_end->l_sl_policy;
                                        EXIT;
                                        return;
                                }

                                if (policy_end == mode_end)
                                        /* done with mode group */
                                        break;

                                /* go to next policy group within mode group */
                                tmp = policy_end->l_res_link.next;
                                lock = list_entry(tmp, struct ldlm_lock,
                                                  l_res_link);
                        }  /* loop over policy groups within the mode group */

                        /* insert point is last lock of the mode group,
                         * new policy group is started */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else {
                        LDLM_ERROR(lock, "is not LDLM_PLAIN or LDLM_IBITS lock");
                        LBUG();
                }
        }

        /* insert point is last lock on the queue,
         * new mode group and new policy group are started */
        prev->res_link = queue->prev;
        prev->mode_link = &req->l_sl_mode;
        prev->policy_link = &req->l_sl_policy;
        EXIT;
        return;
}
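
/*
 * Illustrative layout (not from the original source) of the granted list
 * that search_granted_lock() walks and ldlm_granted_list_add_lock()
 * inserts into: locks sit on l_res_link in mode groups linked through
 * l_sl_mode; within an IBITS mode group, locks with identical inodebits
 * form policy groups linked through l_sl_policy, e.g.:
 *
 *      [PR,bits=A] [PR,bits=A] [PR,bits=B] [PW] [PW]
 *      '--policy group------'  '-p.group-' '-mode group-'
 *      '-----------PR mode group---------'
 */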

/**
 * Add a lock into resource granted list after a position described by
 * \a prev.
 */
static void ldlm_granted_list_add_lock(struct ldlm_lock *lock,
                                       struct sl_insert_point *prev)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        ldlm_resource_dump(D_INFO, res);
        LDLM_DEBUG(lock, "About to add lock:");

        if (ldlm_is_destroyed(lock)) {
                CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
                return;
        }

        LASSERT(list_empty(&lock->l_res_link));
        LASSERT(list_empty(&lock->l_sl_mode));
        LASSERT(list_empty(&lock->l_sl_policy));

        /*
         * lock->link == prev->link means lock is first starting the group.
         * Don't re-add to itself to suppress kernel warnings.
         */
        if (&lock->l_res_link != prev->res_link)
                list_add(&lock->l_res_link, prev->res_link);
        if (&lock->l_sl_mode != prev->mode_link)
                list_add(&lock->l_sl_mode, prev->mode_link);
        if (&lock->l_sl_policy != prev->policy_link)
                list_add(&lock->l_sl_policy, prev->policy_link);

        EXIT;
}

/**
 * Add a lock to granted list on a resource maintaining skiplist
 * correctness.
 */
static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
{
        struct sl_insert_point prev;
        ENTRY;

        LASSERT(lock->l_req_mode == lock->l_granted_mode);

        search_granted_lock(&lock->l_resource->lr_granted, lock, &prev);
        ldlm_granted_list_add_lock(lock, &prev);
        EXIT;
}

/**
 * Perform lock granting bookkeeping.
 *
 * Includes putting the lock into granted list and updating lock mode.
 * NOTE: called by
 *  - ldlm_lock_enqueue
 *  - ldlm_reprocess_queue
 *  - ldlm_lock_convert
 *
 * must be called with lr_lock held
 */
void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        lock->l_granted_mode = lock->l_req_mode;

        if (work_list && lock->l_completion_ast != NULL)
                ldlm_add_ast_work_item(lock, NULL, work_list);

        /* We should not add locks to granted list in the following cases:
         * - this is an UNLOCK but not a real lock;
         * - this is a TEST lock;
         * - this is a F_CANCELLK lock (async flock has req_mode == 0);
         * - this is a deadlock (flock cannot be granted) */
        if (lock->l_req_mode == 0 ||
            lock->l_req_mode == LCK_NL ||
            ldlm_is_test_lock(lock) ||
            ldlm_is_flock_deadlock(lock))
                RETURN_EXIT;

        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
                ldlm_grant_lock_with_skiplist(lock);
        else if (res->lr_type == LDLM_EXTENT)
                ldlm_extent_add_lock(res, lock);
        else
                ldlm_resource_add_lock(res, &res->lr_granted, lock);

        ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
        EXIT;
}

/**
 * Describe the overlap between two locks.  itree_overlap_cb data.
 */
struct lock_match_data {
        struct ldlm_lock    *lmd_old;
        struct ldlm_lock    *lmd_lock;
        ldlm_mode_t         *lmd_mode;
        ldlm_policy_data_t  *lmd_policy;
        __u64                lmd_flags;
        int                  lmd_unref;
};

/**
 * Check if the given \a lock meets the criteria for a match.
 * A reference on the lock is taken if matched.
 *
 * \param lock     test-against this lock
 * \param data     parameters
 */
static int lock_matches(struct ldlm_lock *lock, struct lock_match_data *data)
{
        ldlm_policy_data_t *lpol = &lock->l_policy_data;
        ldlm_mode_t match;

        if (lock == data->lmd_old)
                return INTERVAL_ITER_STOP;

        /* Check if this lock can be matched.
         * Used by LU-2919 (exclusive open) for open lease lock */
        if (ldlm_is_excl(lock))
                return INTERVAL_ITER_CONT;

        /* llite sometimes wants to match locks that will be
         * canceled when their users drop, but we allow it to match
         * if it passes in CBPENDING and the lock still has users.
         * this is generally only going to be used by children
         * whose parents already hold a lock so forward progress
         * can still happen. */
        if (ldlm_is_cbpending(lock) &&
            !(data->lmd_flags & LDLM_FL_CBPENDING))
                return INTERVAL_ITER_CONT;
        if (!data->lmd_unref && ldlm_is_cbpending(lock) &&
            lock->l_readers == 0 && lock->l_writers == 0)
                return INTERVAL_ITER_CONT;

        if (!(lock->l_req_mode & *data->lmd_mode))
                return INTERVAL_ITER_CONT;
        match = lock->l_req_mode;

        switch (lock->l_resource->lr_type) {
        case LDLM_EXTENT:
                if (lpol->l_extent.start > data->lmd_policy->l_extent.start ||
                    lpol->l_extent.end < data->lmd_policy->l_extent.end)
                        return INTERVAL_ITER_CONT;

                if (unlikely(match == LCK_GROUP) &&
                    data->lmd_policy->l_extent.gid != LDLM_GID_ANY &&
                    lpol->l_extent.gid != data->lmd_policy->l_extent.gid)
                        return INTERVAL_ITER_CONT;
                break;
        case LDLM_IBITS:
                /* We match if we have existing lock with same or wider set
                   of bits. */
                if ((lpol->l_inodebits.bits &
                     data->lmd_policy->l_inodebits.bits) !=
                    data->lmd_policy->l_inodebits.bits)
                        return INTERVAL_ITER_CONT;
                break;
        default:
                ;
        }

        /* Skip locks that are already going away (the GONE mask), unless
         * the caller asked to match unused locks too (\a lmd_unref). */
        if (!data->lmd_unref && LDLM_HAVE_MASK(lock, GONE))
                return INTERVAL_ITER_CONT;

        if ((data->lmd_flags & LDLM_FL_LOCAL_ONLY) &&
            !ldlm_is_local(lock))
                return INTERVAL_ITER_CONT;

        if (data->lmd_flags & LDLM_FL_TEST_LOCK) {
                LDLM_LOCK_GET(lock);
                ldlm_lock_touch_in_lru(lock);
        } else {
                ldlm_lock_addref_internal_nolock(lock, match);
        }

        *data->lmd_mode = match;
        data->lmd_lock = lock;

        return INTERVAL_ITER_STOP;
}

static unsigned int itree_overlap_cb(struct interval_node *in, void *args)
{
        struct ldlm_interval *node = to_ldlm_interval(in);
        struct lock_match_data *data = args;
        struct ldlm_lock *lock;
        int rc;

        list_for_each_entry(lock, &node->li_group, l_sl_policy) {
                rc = lock_matches(lock, data);
                if (rc == INTERVAL_ITER_STOP)
                        return INTERVAL_ITER_STOP;
        }
        return INTERVAL_ITER_CONT;
}

/**
 * Search for a lock with given parameters in interval trees.
 *
 * \param res      search for a lock in this resource
 * \param data     parameters
 *
 * \retval a referenced lock or NULL.
 */
static struct ldlm_lock *search_itree(struct ldlm_resource *res,
                                      struct lock_match_data *data)
{
        struct interval_node_extent ext = {
                .start     = data->lmd_policy->l_extent.start,
                .end       = data->lmd_policy->l_extent.end
        };
        int idx;

        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                struct ldlm_interval_tree *tree = &res->lr_itree[idx];

                if (tree->lit_root == NULL)
                        continue;

                if (!(tree->lit_mode & *data->lmd_mode))
                        continue;

                interval_search(tree->lit_root, &ext,
                                itree_overlap_cb, data);
        }
        return data->lmd_lock;
}

/**
 * Search for a lock with given properties in a queue.
 *
 * \param queue    search for a lock in this queue
 * \param data     parameters
 *
 * \retval a referenced lock or NULL.
 */
static struct ldlm_lock *search_queue(struct list_head *queue,
                                      struct lock_match_data *data)
{
        struct ldlm_lock *lock;
        int rc;

        list_for_each_entry(lock, queue, l_res_link) {
                rc = lock_matches(lock, data);
                if (rc == INTERVAL_ITER_STOP)
                        return data->lmd_lock;
        }
        return NULL;
}

void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
{
        if ((lock->l_flags & LDLM_FL_FAIL_NOTIFIED) == 0) {
                lock->l_flags |= LDLM_FL_FAIL_NOTIFIED;
                wake_up_all(&lock->l_waitq);
        }
}
EXPORT_SYMBOL(ldlm_lock_fail_match_locked);

void ldlm_lock_fail_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_fail_match_locked(lock);
        unlock_res_and_lock(lock);
}

/**
 * Mark lock as "matchable" by OST.
 *
 * Used to prevent certain races in LOV/OSC where the lock is granted, but LVB
 * is not yet valid.
 * Assumes LDLM lock is already locked.
 */
void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
{
        ldlm_set_lvb_ready(lock);
        wake_up_all(&lock->l_waitq);
}
EXPORT_SYMBOL(ldlm_lock_allow_match_locked);

/**
 * Mark lock as "matchable" by OST.
 * Locks the lock and then \see ldlm_lock_allow_match_locked
 */
void ldlm_lock_allow_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_allow_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_allow_match);

/**
 * Attempt to find a lock with specified properties.
 *
 * Typically returns a reference to matched lock unless LDLM_FL_TEST_LOCK is
 * set in \a flags.
 *
 * Can be called in two ways:
 *
 * If 'ns' is NULL, then lockh describes an existing lock that we want to look
 * for a duplicate of.
 *
 * Otherwise, all of the fields must be filled in, to match against.
 *
 * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
 *     server (i.e., connh is NULL)
 * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
 *     list will be considered
 * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
 *     to be canceled can still be matched as long as they still have reader
 *     or writer references
 * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
 *     just tell us if we would have matched.
 *
 * \retval 1 if it finds an already-existing lock that is compatible; in this
 * case, lockh is filled in with an addref()ed lock
 *
 * We also check the security context, and if that fails we simply return 0
 * (to keep caller code unchanged); the context failure will be discovered by
 * the caller sometime later.
 */
1366 ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, __u64 flags,
1367                             const struct ldlm_res_id *res_id, ldlm_type_t type,
1368                             ldlm_policy_data_t *policy, ldlm_mode_t mode,
1369                             struct lustre_handle *lockh, int unref)
1370 {
1371         struct lock_match_data data = {
1372                 .lmd_old        = NULL,
1373                 .lmd_lock       = NULL,
1374                 .lmd_mode       = &mode,
1375                 .lmd_policy     = policy,
1376                 .lmd_flags      = flags,
1377                 .lmd_unref      = unref,
1378         };
1379         struct ldlm_resource *res;
1380         struct ldlm_lock *lock;
1381         int rc = 0;
1382         ENTRY;
1383
1384         if (ns == NULL) {
1385                 data.lmd_old = ldlm_handle2lock(lockh);
1386                 LASSERT(data.lmd_old != NULL);
1387
1388                 ns = ldlm_lock_to_ns(data.lmd_old);
1389                 res_id = &data.lmd_old->l_resource->lr_name;
1390                 type = data.lmd_old->l_resource->lr_type;
1391                 *data.lmd_mode = data.lmd_old->l_req_mode;
1392         }
1393
1394         res = ldlm_resource_get(ns, NULL, res_id, type, 0);
1395         if (IS_ERR(res)) {
1396                 LASSERT(data.lmd_old == NULL);
1397                 RETURN(0);
1398         }
1399
1400         LDLM_RESOURCE_ADDREF(res);
1401         lock_res(res);
1402
1403         if (res->lr_type == LDLM_EXTENT)
1404                 lock = search_itree(res, &data);
1405         else
1406                 lock = search_queue(&res->lr_granted, &data);
1407         if (lock != NULL)
1408                 GOTO(out, rc = 1);
1409         if (flags & LDLM_FL_BLOCK_GRANTED)
1410                 GOTO(out, rc = 0);
1411         lock = search_queue(&res->lr_converting, &data);
1412         if (lock != NULL)
1413                 GOTO(out, rc = 1);
1414         lock = search_queue(&res->lr_waiting, &data);
1415         if (lock != NULL)
1416                 GOTO(out, rc = 1);
1417
1418         EXIT;
1419  out:
1420         unlock_res(res);
1421         LDLM_RESOURCE_DELREF(res);
1422         ldlm_resource_putref(res);
1423
1424         if (lock) {
1425                 ldlm_lock2handle(lock, lockh);
1426                 if ((flags & LDLM_FL_LVB_READY) &&
1427                     (!ldlm_is_lvb_ready(lock))) {
1428                         __u64 wait_flags = LDLM_FL_LVB_READY |
1429                                 LDLM_FL_DESTROYED | LDLM_FL_FAIL_NOTIFIED;
1430                         struct l_wait_info lwi;
1431                         if (lock->l_completion_ast) {
1432                                 int err = lock->l_completion_ast(lock,
1433                                                           LDLM_FL_WAIT_NOREPROC,
1434                                                                  NULL);
1435                                 if (err) {
1436                                         if (flags & LDLM_FL_TEST_LOCK)
1437                                                 LDLM_LOCK_RELEASE(lock);
1438                                         else
1439                                                 ldlm_lock_decref_internal(lock,
1440                                                                           mode);
1441                                         rc = 0;
1442                                         goto out2;
1443                                 }
1444                         }
1445
1446                         lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
1447                                                NULL, LWI_ON_SIGNAL_NOOP, NULL);
1448
1449                         /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
1450                         l_wait_event(lock->l_waitq,
1451                                      lock->l_flags & wait_flags,
1452                                      &lwi);
1453                         if (!ldlm_is_lvb_ready(lock)) {
1454                                 if (flags & LDLM_FL_TEST_LOCK)
1455                                         LDLM_LOCK_RELEASE(lock);
1456                                 else
1457                                         ldlm_lock_decref_internal(lock, mode);
1458                                 rc = 0;
1459                         }
1460                 }
1461         }
1462  out2:
1463         if (rc) {
1464                 LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
1465                            (type == LDLM_PLAIN || type == LDLM_IBITS) ?
1466                                 res_id->name[2] : policy->l_extent.start,
1467                            (type == LDLM_PLAIN || type == LDLM_IBITS) ?
1468                                 res_id->name[3] : policy->l_extent.end);
1469
1470                 /* check user's security context */
1471                 if (lock->l_conn_export &&
1472                     sptlrpc_import_check_ctx(
1473                                 class_exp2cliimp(lock->l_conn_export))) {
1474                         if (!(flags & LDLM_FL_TEST_LOCK))
1475                                 ldlm_lock_decref_internal(lock, mode);
1476                         rc = 0;
1477                 }
1478
1479                 if (flags & LDLM_FL_TEST_LOCK)
1480                         LDLM_LOCK_RELEASE(lock);
1481
1482         } else if (!(flags & LDLM_FL_TEST_LOCK)) {/*less verbose for test-only*/
1483                 LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
1484                                   LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
1485                                   type, mode, res_id->name[0], res_id->name[1],
1486                                   (type == LDLM_PLAIN || type == LDLM_IBITS) ?
1487                                         res_id->name[2] : policy->l_extent.start,
1488                                   (type == LDLM_PLAIN || type == LDLM_IBITS) ?
1489                                         res_id->name[3] : policy->l_extent.end);
1490         }
1491         if (data.lmd_old != NULL)
1492                 LDLM_LOCK_PUT(data.lmd_old);
1493
1494         return rc ? mode : 0;
1495 }
1496 EXPORT_SYMBOL(ldlm_lock_match);
1497
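/*
 * Usage sketch (illustrative only, not built): how a caller might probe
 * for an already-granted extent lock before enqueueing a new one.  The
 * resource ID and extent values are hypothetical; the reference taken on
 * a successful match must be dropped with ldlm_lock_decref().
 */
#if 0
static int example_match_cached_extent(struct ldlm_namespace *ns)
{
	struct ldlm_res_id res_id = { .name = { 0x1234 } };	/* hypothetical */
	ldlm_policy_data_t policy = {
		.l_extent = { .start = 0, .end = OBD_OBJECT_EOF },
	};
	struct lustre_handle lockh;
	ldlm_mode_t mode;

	/* only consider locks whose LVB is already valid */
	mode = ldlm_lock_match(ns, LDLM_FL_LVB_READY, &res_id, LDLM_EXTENT,
			       &policy, LCK_PR | LCK_PW, &lockh, 0);
	if (mode != 0) {
		/* ... use the matched lock ... */
		ldlm_lock_decref(&lockh, mode);
		return 1;
	}
	return 0;
}
#endif
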
1498 ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh,
1499                                         __u64 *bits)
1500 {
1501         struct ldlm_lock *lock;
1502         ldlm_mode_t mode = 0;
1503         ENTRY;
1504
1505         lock = ldlm_handle2lock(lockh);
1506         if (lock != NULL) {
1507                 lock_res_and_lock(lock);
1508                 if (LDLM_HAVE_MASK(lock, GONE))
1509                         GOTO(out, mode);
1510
1511                 if (ldlm_is_cbpending(lock) &&
1512                     lock->l_readers == 0 && lock->l_writers == 0)
1513                         GOTO(out, mode);
1514
1515                 if (bits)
1516                         *bits = lock->l_policy_data.l_inodebits.bits;
1517                 mode = lock->l_granted_mode;
1518                 ldlm_lock_addref_internal_nolock(lock, mode);
1519         }
1520
1521         EXIT;
1522
1523 out:
1524         if (lock != NULL) {
1525                 unlock_res_and_lock(lock);
1526                 LDLM_LOCK_PUT(lock);
1527         }
1528         return mode;
1529 }
1530 EXPORT_SYMBOL(ldlm_revalidate_lock_handle);
1531
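/*
 * Usage sketch (illustrative only, not built): revalidating an inodebits
 * lock by handle.  On success a reference is taken on the returned mode,
 * so the caller must drop it with ldlm_lock_decref() when done.
 */
#if 0
static void example_revalidate(struct lustre_handle *lockh)
{
	__u64 bits = 0;
	ldlm_mode_t mode;

	mode = ldlm_revalidate_lock_handle(lockh, &bits);
	if (mode != 0) {
		/* the lock is still usable; 'bits' holds the granted ibits */
		ldlm_lock_decref(lockh, mode);
	}
}
#endif
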
1532 /** The caller must guarantee that the buffer is large enough. */
1533 int ldlm_fill_lvb(struct ldlm_lock *lock, struct req_capsule *pill,
1534                   enum req_location loc, void *data, int size)
1535 {
1536         void *lvb;
1537         ENTRY;
1538
1539         LASSERT(data != NULL);
1540         LASSERT(size >= 0);
1541
1542         switch (lock->l_lvb_type) {
1543         case LVB_T_OST:
1544                 if (size == sizeof(struct ost_lvb)) {
1545                         if (loc == RCL_CLIENT)
1546                                 lvb = req_capsule_client_swab_get(pill,
1547                                                 &RMF_DLM_LVB,
1548                                                 lustre_swab_ost_lvb);
1549                         else
1550                                 lvb = req_capsule_server_swab_get(pill,
1551                                                 &RMF_DLM_LVB,
1552                                                 lustre_swab_ost_lvb);
1553                         if (unlikely(lvb == NULL)) {
1554                                 LDLM_ERROR(lock, "no LVB");
1555                                 RETURN(-EPROTO);
1556                         }
1557
1558                         memcpy(data, lvb, size);
1559                 } else if (size == sizeof(struct ost_lvb_v1)) {
1560                         struct ost_lvb *olvb = data;
1561
1562                         if (loc == RCL_CLIENT)
1563                                 lvb = req_capsule_client_swab_get(pill,
1564                                                 &RMF_DLM_LVB,
1565                                                 lustre_swab_ost_lvb_v1);
1566                         else
1567                                 lvb = req_capsule_server_sized_swab_get(pill,
1568                                                 &RMF_DLM_LVB, size,
1569                                                 lustre_swab_ost_lvb_v1);
1570                         if (unlikely(lvb == NULL)) {
1571                                 LDLM_ERROR(lock, "no LVB");
1572                                 RETURN(-EPROTO);
1573                         }
1574
1575                         memcpy(data, lvb, size);
1576                         olvb->lvb_mtime_ns = 0;
1577                         olvb->lvb_atime_ns = 0;
1578                         olvb->lvb_ctime_ns = 0;
1579                 } else {
1580                         LDLM_ERROR(lock, "Replied unexpected ost LVB size %d",
1581                                    size);
1582                         RETURN(-EINVAL);
1583                 }
1584                 break;
1585         case LVB_T_LQUOTA:
1586                 if (size == sizeof(struct lquota_lvb)) {
1587                         if (loc == RCL_CLIENT)
1588                                 lvb = req_capsule_client_swab_get(pill,
1589                                                 &RMF_DLM_LVB,
1590                                                 lustre_swab_lquota_lvb);
1591                         else
1592                                 lvb = req_capsule_server_swab_get(pill,
1593                                                 &RMF_DLM_LVB,
1594                                                 lustre_swab_lquota_lvb);
1595                         if (unlikely(lvb == NULL)) {
1596                                 LDLM_ERROR(lock, "no LVB");
1597                                 RETURN(-EPROTO);
1598                         }
1599
1600                         memcpy(data, lvb, size);
1601                 } else {
1602                         LDLM_ERROR(lock, "Replied unexpected lquota LVB size %d",
1603                                    size);
1604                         RETURN(-EINVAL);
1605                 }
1606                 break;
1607         case LVB_T_LAYOUT:
1608                 if (size == 0)
1609                         break;
1610
1611                 if (loc == RCL_CLIENT)
1612                         lvb = req_capsule_client_get(pill, &RMF_DLM_LVB);
1613                 else
1614                         lvb = req_capsule_server_get(pill, &RMF_DLM_LVB);
1615                 if (unlikely(lvb == NULL)) {
1616                         LDLM_ERROR(lock, "no LVB");
1617                         RETURN(-EPROTO);
1618                 }
1619
1620                 memcpy(data, lvb, size);
1621                 break;
1622         default:
1623                 LDLM_ERROR(lock, "Unknown LVB type: %d\n", lock->l_lvb_type);
1624                 libcfs_debug_dumpstack(NULL);
1625                 RETURN(-EINVAL);
1626         }
1627
1628         RETURN(0);
1629 }
1630
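/*
 * Usage sketch (illustrative only, not built): unpacking an OST LVB from
 * a server reply.  The helper name is hypothetical; on the client side
 * the reply's LVB sits in the RCL_SERVER half of the capsule.
 */
#if 0
static int example_unpack_ost_lvb(struct ldlm_lock *lock,
				  struct ptlrpc_request *req)
{
	struct ost_lvb lvb;

	/* returns 0 on success, -EPROTO/-EINVAL on a bad reply */
	return ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
			     &lvb, sizeof(lvb));
}
#endif
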
1631 /**
1632  * Create and fill in a new LDLM lock with the specified properties.
1633  * Returns a referenced lock on success or an ERR_PTR() on failure.
1634  */
1635 struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
1636                                    const struct ldlm_res_id *res_id,
1637                                    ldlm_type_t type,
1638                                    ldlm_mode_t mode,
1639                                    const struct ldlm_callback_suite *cbs,
1640                                    void *data, __u32 lvb_len,
1641                                    enum lvb_type lvb_type)
1642 {
1643         struct ldlm_lock        *lock;
1644         struct ldlm_resource    *res;
1645         int                     rc;
1646         ENTRY;
1647
1648         res = ldlm_resource_get(ns, NULL, res_id, type, 1);
1649         if (IS_ERR(res))
1650                 RETURN(ERR_CAST(res));
1651
1652         lock = ldlm_lock_new(res);
1653         if (lock == NULL)
1654                 RETURN(ERR_PTR(-ENOMEM));
1655
1656         lock->l_req_mode = mode;
1657         lock->l_ast_data = data;
1658         lock->l_pid = current_pid();
1659         if (ns_is_server(ns))
1660                 ldlm_set_ns_srv(lock);
1661         if (cbs) {
1662                 lock->l_blocking_ast = cbs->lcs_blocking;
1663                 lock->l_completion_ast = cbs->lcs_completion;
1664                 lock->l_glimpse_ast = cbs->lcs_glimpse;
1665         }
1666
1667         lock->l_tree_node = NULL;
1668         /* if this is an extent lock, allocate the interval tree node */
1669         if (type == LDLM_EXTENT)
1670                 if (ldlm_interval_alloc(lock) == NULL)
1671                         GOTO(out, rc = -ENOMEM);
1672
1673         if (lvb_len) {
1674                 lock->l_lvb_len = lvb_len;
1675                 OBD_ALLOC_LARGE(lock->l_lvb_data, lvb_len);
1676                 if (lock->l_lvb_data == NULL)
1677                         GOTO(out, rc = -ENOMEM);
1678         }
1679
1680         lock->l_lvb_type = lvb_type;
1681         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK))
1682                 GOTO(out, rc = -ENOENT);
1683
1684         RETURN(lock);
1685
1686 out:
1687         ldlm_lock_destroy(lock);
1688         LDLM_LOCK_RELEASE(lock);
1689         RETURN(ERR_PTR(rc));
1690 }
1691
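/*
 * Usage sketch (illustrative only, not built): creating a referenced
 * lock with a callback suite.  example_blocking_ast() is a hypothetical
 * handler; ldlm_completion_ast() is the stock completion callback.
 */
#if 0
static struct ldlm_lock *example_create(struct ldlm_namespace *ns,
					const struct ldlm_res_id *res_id)
{
	struct ldlm_callback_suite cbs = {
		.lcs_blocking	= example_blocking_ast,
		.lcs_completion	= ldlm_completion_ast,
		.lcs_glimpse	= NULL,
	};

	/* returns a referenced lock, or an ERR_PTR() on failure */
	return ldlm_lock_create(ns, res_id, LDLM_PLAIN, LCK_EX, &cbs,
				NULL, 0, LVB_T_NONE);
}
#endif
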
1692 /**
1693  * Enqueue (request) a lock.
1694  *
1695  * Does not block. As a result of the enqueue, the lock is placed on the
1696  * granted or waiting list.
1697  *
1698  * If the namespace has an intent policy set and the lock has the
1699  * LDLM_FL_HAS_INTENT flag set, skip all the enqueueing and delegate lock
1700  * processing to the intent policy function.
1701  */
1702 ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
1703                                struct ldlm_lock **lockp,
1704                                void *cookie, __u64 *flags)
1705 {
1706         struct ldlm_lock *lock = *lockp;
1707         struct ldlm_resource *res = lock->l_resource;
1708         int local = ns_is_client(ldlm_res_to_ns(res));
1709 #ifdef HAVE_SERVER_SUPPORT
1710         ldlm_processing_policy policy;
1711 #endif
1712         ldlm_error_t rc = ELDLM_OK;
1713         struct ldlm_interval *node = NULL;
1714         ENTRY;
1715
1716         /* policies are not executed on the client or during replay */
1717         if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
1718             && !local && ns->ns_policy) {
1719                 rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
1720                                    NULL);
1721                 if (rc == ELDLM_LOCK_REPLACED) {
1722                         /* The lock that was returned has already been granted,
1723                          * and placed into lockp.  If it's not the same as the
1724                          * one we passed in, then destroy the old one and our
1725                          * work here is done. */
1726                         if (lock != *lockp) {
1727                                 ldlm_lock_destroy(lock);
1728                                 LDLM_LOCK_RELEASE(lock);
1729                         }
1730                         *flags |= LDLM_FL_LOCK_CHANGED;
1731                         RETURN(0);
1732                 } else if (rc != ELDLM_OK ||
1733                            (rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
1734                         ldlm_lock_destroy(lock);
1735                         RETURN(rc);
1736                 }
1737         }
1738
1739         if (*flags & LDLM_FL_RESENT) {
1740                 /* Reconstruct LDLM_FL_SRV_ENQ_MASK @flags for the reply.
1741                  * Always set LOCK_CHANGED.
1742                  * Set BLOCK_GRANTED if the lock has not been granted yet.
1743                  * Take NO_TIMEOUT from the lock, as it is inherited through
1744                  * LDLM_FL_INHERIT_MASK */
1745                 *flags |= LDLM_FL_LOCK_CHANGED;
1746                 if (lock->l_req_mode != lock->l_granted_mode)
1747                         *flags |= LDLM_FL_BLOCK_GRANTED;
1748                 *flags |= lock->l_flags & LDLM_FL_NO_TIMEOUT;
1749                 RETURN(ELDLM_OK);
1750         }
1751
1752         /* A replaying lock might already be in the granted list, so
1753          * unlinking it would free its interval node. We have to allocate
1754          * the interval node early, otherwise we can't regrant this lock
1755          * in the future. - jay */
1756         if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
1757                 OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
1758
1759         lock_res_and_lock(lock);
1760         if (local && lock->l_req_mode == lock->l_granted_mode) {
1761                 /* The server returned a blocked lock, but it was granted
1762                  * before we got a chance to actually enqueue it.  We don't
1763                  * need to do anything else. */
1764                 *flags &= ~LDLM_FL_BLOCKED_MASK;
1765                 GOTO(out, rc = ELDLM_OK);
1766         }
1767
1768         ldlm_resource_unlink_lock(lock);
1769         if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
1770                 if (node == NULL) {
1771                         ldlm_lock_destroy_nolock(lock);
1772                         GOTO(out, rc = -ENOMEM);
1773                 }
1774
1775                 INIT_LIST_HEAD(&node->li_group);
1776                 ldlm_interval_attach(node, lock);
1777                 node = NULL;
1778         }
1779
1780         /* Some flags from the enqueue want to make it into the AST, via the
1781          * lock's l_flags. */
1782         if (*flags & LDLM_FL_AST_DISCARD_DATA)
1783                 ldlm_set_ast_discard_data(lock);
1784         if (*flags & LDLM_FL_TEST_LOCK)
1785                 ldlm_set_test_lock(lock);
1786
1787         /* This distinction between local lock trees is very important; a client
1788          * namespace only has information about locks taken by that client, and
1789          * thus doesn't have enough information to decide for itself if it can
1790          * be granted (below).  In this case, we do exactly what the server
1791          * tells us to do, as dictated by the 'flags'.
1792          *
1793          * We do exactly the same thing during recovery, when the server is
1794          * more or less trusting the clients not to lie.
1795          *
1796          * FIXME (bug 268): Detect obvious lies by checking compatibility in
1797          * granted/converting queues. */
1798         if (local) {
1799                 if (*flags & LDLM_FL_BLOCK_CONV)
1800                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1801                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
1802                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1803                 else
1804                         ldlm_grant_lock(lock, NULL);
1805                 GOTO(out, rc = ELDLM_OK);
1806 #ifdef HAVE_SERVER_SUPPORT
1807         } else if (*flags & LDLM_FL_REPLAY) {
1808                 if (*flags & LDLM_FL_BLOCK_CONV) {
1809                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1810                         GOTO(out, rc = ELDLM_OK);
1811                 } else if (*flags & LDLM_FL_BLOCK_WAIT) {
1812                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1813                         GOTO(out, rc = ELDLM_OK);
1814                 } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
1815                         ldlm_grant_lock(lock, NULL);
1816                         GOTO(out, rc = ELDLM_OK);
1817                 }
1818                 /* If no flags, fall through to normal enqueue path. */
1819         }
1820
1821         policy = ldlm_processing_policy_table[res->lr_type];
1822         policy(lock, flags, 1, &rc, NULL);
1823         GOTO(out, rc);
1824 #else
1825         } else {
1826                 CERROR("This is client-side-only module, cannot handle "
1827                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
1828                 LBUG();
1829         }
1830 #endif
1831
1832 out:
1833         unlock_res_and_lock(lock);
1834         if (node)
1835                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
1836         return rc;
1837 }
1838
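/*
 * Usage sketch (illustrative only, not built): the create-then-enqueue
 * pairing.  Error handling is abbreviated and the cookie argument is
 * left NULL for simplicity.
 */
#if 0
static int example_enqueue(struct ldlm_namespace *ns, struct ldlm_lock *lock)
{
	__u64 flags = 0;
	ldlm_error_t err;

	/* 'lock' may be replaced through the pointer by an intent policy */
	err = ldlm_lock_enqueue(ns, &lock, NULL, &flags);
	if (err != ELDLM_OK)
		return -EIO;
	if (flags & (LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_WAIT))
		CDEBUG(D_DLMTRACE, "lock is blocked, not granted yet\n");
	return 0;
}
#endif
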
1839 #ifdef HAVE_SERVER_SUPPORT
1840 /**
1841  * Iterate through all waiting locks on a given resource queue and attempt to
1842  * grant them.
1843  *
1844  * Must be called with resource lock held.
1845  */
1846 int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
1847                          struct list_head *work_list)
1848 {
1849         struct list_head *tmp, *pos;
1850         ldlm_processing_policy policy;
1851         __u64 flags;
1852         int rc = LDLM_ITER_CONTINUE;
1853         ldlm_error_t err;
1854         ENTRY;
1855
1856         check_res_locked(res);
1857
1858         policy = ldlm_processing_policy_table[res->lr_type];
1859         LASSERT(policy);
1860
1861         list_for_each_safe(tmp, pos, queue) {
1862                 struct ldlm_lock *pending;
1863                 pending = list_entry(tmp, struct ldlm_lock, l_res_link);
1864
1865                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
1866
1867                 flags = 0;
1868                 rc = policy(pending, &flags, 0, &err, work_list);
1869                 if (rc != LDLM_ITER_CONTINUE)
1870                         break;
1871         }
1872
1873         RETURN(rc);
1874 }
1875 #endif
1876
1877 /**
1878  * Process a call to blocking AST callback for a lock in ast_work list
1879  */
1880 static int
1881 ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1882 {
1883         struct ldlm_cb_set_arg *arg = opaq;
1884         struct ldlm_lock_desc   d;
1885         int                     rc;
1886         struct ldlm_lock       *lock;
1887         ENTRY;
1888
1889         if (list_empty(arg->list))
1890                 RETURN(-ENOENT);
1891
1892         lock = list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);
1893
1894         /* nobody should touch l_bl_ast */
1895         lock_res_and_lock(lock);
1896         list_del_init(&lock->l_bl_ast);
1897
1898         LASSERT(ldlm_is_ast_sent(lock));
1899         LASSERT(lock->l_bl_ast_run == 0);
1900         LASSERT(lock->l_blocking_lock);
1901         lock->l_bl_ast_run++;
1902         unlock_res_and_lock(lock);
1903
1904         ldlm_lock2desc(lock->l_blocking_lock, &d);
1905
1906         rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
1907         LDLM_LOCK_RELEASE(lock->l_blocking_lock);
1908         lock->l_blocking_lock = NULL;
1909         LDLM_LOCK_RELEASE(lock);
1910
1911         RETURN(rc);
1912 }
1913
1914 /**
1915  * Process a call to completion AST callback for a lock in ast_work list
1916  */
1917 static int
1918 ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1919 {
1920         struct ldlm_cb_set_arg  *arg = opaq;
1921         int                      rc = 0;
1922         struct ldlm_lock        *lock;
1923         ldlm_completion_callback completion_callback;
1924         ENTRY;
1925
1926         if (list_empty(arg->list))
1927                 RETURN(-ENOENT);
1928
1929         lock = list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
1930
1931         /* It's possible to receive a completion AST before we've set
1932          * the l_completion_ast pointer: either because the AST arrived
1933          * before the reply, or simply because there's a small race
1934          * window between receiving the reply and finishing the local
1935          * enqueue. (bug 842)
1936          *
1937          * This can't happen with the blocking_ast, however, because we
1938          * will never call the local blocking_ast until we drop our
1939          * reader/writer reference, which we won't do until we get the
1940          * reply and finish enqueueing. */
1941
1942         /* nobody should touch l_cp_ast */
1943         lock_res_and_lock(lock);
1944         list_del_init(&lock->l_cp_ast);
1945         LASSERT(ldlm_is_cp_reqd(lock));
1946         /* save l_completion_ast since it can be changed by
1947          * mds_intent_policy(), see bug 14225 */
1948         completion_callback = lock->l_completion_ast;
1949         ldlm_clear_cp_reqd(lock);
1950         unlock_res_and_lock(lock);
1951
1952         if (completion_callback != NULL)
1953                 rc = completion_callback(lock, 0, (void *)arg);
1954         LDLM_LOCK_RELEASE(lock);
1955
1956         RETURN(rc);
1957 }
1958
1959 /**
1960  * Process a call to revocation AST callback for a lock in ast_work list
1961  */
1962 static int
1963 ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1964 {
1965         struct ldlm_cb_set_arg *arg = opaq;
1966         struct ldlm_lock_desc   desc;
1967         int                     rc;
1968         struct ldlm_lock       *lock;
1969         ENTRY;
1970
1971         if (list_empty(arg->list))
1972                 RETURN(-ENOENT);
1973
1974         lock = list_entry(arg->list->next, struct ldlm_lock, l_rk_ast);
1975         list_del_init(&lock->l_rk_ast);
1976
1977         /* the desc just pretends the lock is exclusive */
1978         ldlm_lock2desc(lock, &desc);
1979         desc.l_req_mode = LCK_EX;
1980         desc.l_granted_mode = 0;
1981
1982         rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
1983         LDLM_LOCK_RELEASE(lock);
1984
1985         RETURN(rc);
1986 }
1987
1988 /**
1989  * Process a call to glimpse AST callback for a lock in ast_work list
1990  */
1991 int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1992 {
1993         struct ldlm_cb_set_arg          *arg = opaq;
1994         struct ldlm_glimpse_work        *gl_work;
1995         struct ldlm_lock                *lock;
1996         int                              rc = 0;
1997         ENTRY;
1998
1999         if (list_empty(arg->list))
2000                 RETURN(-ENOENT);
2001
2002         gl_work = list_entry(arg->list->next, struct ldlm_glimpse_work,
2003                                  gl_list);
2004         list_del_init(&gl_work->gl_list);
2005
2006         lock = gl_work->gl_lock;
2007
2008         /* transfer the glimpse descriptor to ldlm_cb_set_arg */
2009         arg->gl_desc = gl_work->gl_desc;
2010
2011         /* invoke the actual glimpse callback */
2012         if (lock->l_glimpse_ast(lock, (void*)arg) == 0)
2013                 rc = 1;
2014
2015         LDLM_LOCK_RELEASE(lock);
2016
2017         if ((gl_work->gl_flags & LDLM_GL_WORK_NOFREE) == 0)
2018                 OBD_FREE_PTR(gl_work);
2019
2020         RETURN(rc);
2021 }
2022
2023 /**
2024  * Process a list of locks that need ASTs sent.
2025  *
2026  * Used on the server to send multiple ASTs together instead of sending
2027  * them one by one.
2028  */
2029 int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
2030                       ldlm_desc_ast_t ast_type)
2031 {
2032         struct ldlm_cb_set_arg *arg;
2033         set_producer_func       work_ast_lock;
2034         int                     rc;
2035
2036         if (list_empty(rpc_list))
2037                 RETURN(0);
2038
2039         OBD_ALLOC_PTR(arg);
2040         if (arg == NULL)
2041                 RETURN(-ENOMEM);
2042
2043         atomic_set(&arg->restart, 0);
2044         arg->list = rpc_list;
2045
2046         switch (ast_type) {
2047                 case LDLM_WORK_BL_AST:
2048                         arg->type = LDLM_BL_CALLBACK;
2049                         work_ast_lock = ldlm_work_bl_ast_lock;
2050                         break;
2051                 case LDLM_WORK_CP_AST:
2052                         arg->type = LDLM_CP_CALLBACK;
2053                         work_ast_lock = ldlm_work_cp_ast_lock;
2054                         break;
2055                 case LDLM_WORK_REVOKE_AST:
2056                         arg->type = LDLM_BL_CALLBACK;
2057                         work_ast_lock = ldlm_work_revoke_ast_lock;
2058                         break;
2059                 case LDLM_WORK_GL_AST:
2060                         arg->type = LDLM_GL_CALLBACK;
2061                         work_ast_lock = ldlm_work_gl_ast_lock;
2062                         break;
2063                 default:
2064                         LBUG();
2065         }
2066
2067         /* We create a ptlrpc request set with flow control extension.
2068          * This request set will use the work_ast_lock function to produce new
2069          * requests and will send a new request each time one completes, in
2070          * order to keep the number of requests in flight at ns_max_parallel_ast */
2071         arg->set = ptlrpc_prep_fcset(ns->ns_max_parallel_ast ? : UINT_MAX,
2072                                      work_ast_lock, arg);
2073         if (arg->set == NULL)
2074                 GOTO(out, rc = -ENOMEM);
2075
2076         ptlrpc_set_wait(arg->set);
2077         ptlrpc_set_destroy(arg->set);
2078
2079         rc = atomic_read(&arg->restart) ? -ERESTART : 0;
2080         GOTO(out, rc);
2081 out:
2082         OBD_FREE_PTR(arg);
2083         return rc;
2084 }
2085
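/*
 * Usage sketch (illustrative only, not built): gathering completion ASTs
 * under the resource lock and sending them in one batch after it is
 * dropped, mirroring ldlm_reprocess_all() below.
 */
#if 0
static void example_batch_cp_asts(struct ldlm_resource *res)
{
	struct list_head rpc_list;

	INIT_LIST_HEAD(&rpc_list);
	lock_res(res);
	ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
	unlock_res(res);
	/* sends with at most ns_max_parallel_ast RPCs in flight */
	ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list, LDLM_WORK_CP_AST);
}
#endif
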
2086 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
2087 {
2088         ldlm_reprocess_all(res);
2089         return LDLM_ITER_CONTINUE;
2090 }
2091
2092 static int ldlm_reprocess_res(struct cfs_hash *hs, struct cfs_hash_bd *bd,
2093                               struct hlist_node *hnode, void *arg)
2094 {
2095         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2096         int    rc;
2097
2098         rc = reprocess_one_queue(res, arg);
2099
2100         return rc == LDLM_ITER_STOP;
2101 }
2102
2103 /**
2104  * Iterate through all resources on a namespace attempting to grant waiting
2105  * locks.
2106  */
2107 void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
2108 {
2109         ENTRY;
2110
2111         if (ns != NULL) {
2112                 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2113                                          ldlm_reprocess_res, NULL, 0);
2114         }
2115         EXIT;
2116 }
2117
2118 /**
2119  * Try to grant all waiting locks on a resource.
2120  *
2121  * Calls ldlm_reprocess_queue on converting and waiting queues.
2122  *
2123  * Typically called after some resource locks are cancelled to see
2124  * if anything could be granted as a result of the cancellation.
2125  */
2126 void ldlm_reprocess_all(struct ldlm_resource *res)
2127 {
2128         struct list_head rpc_list;
2129 #ifdef HAVE_SERVER_SUPPORT
2130         int rc;
2131         ENTRY;
2132
2133         INIT_LIST_HEAD(&rpc_list);
2134         /* Local lock trees don't get reprocessed. */
2135         if (ns_is_client(ldlm_res_to_ns(res))) {
2136                 EXIT;
2137                 return;
2138         }
2139
2140 restart:
2141         lock_res(res);
2142         rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
2143         if (rc == LDLM_ITER_CONTINUE)
2144                 ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
2145         unlock_res(res);
2146
2147         rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
2148                                LDLM_WORK_CP_AST);
2149         if (rc == -ERESTART) {
2150                 LASSERT(list_empty(&rpc_list));
2151                 goto restart;
2152         }
2153 #else
2154         ENTRY;
2155
2156         INIT_LIST_HEAD(&rpc_list);
2157         if (!ns_is_client(ldlm_res_to_ns(res))) {
2158                 CERROR("This is client-side-only module, cannot handle "
2159                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
2160                 LBUG();
2161         }
2162 #endif
2163         EXIT;
2164 }
2165 EXPORT_SYMBOL(ldlm_reprocess_all);
2166
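/*
 * Usage sketch (illustrative only, not built): the common cancel-then-
 * reprocess pattern, as used by ldlm_cancel_lock_for_export() further
 * down -- cancelling a lock may unblock waiters, so the resource is
 * reprocessed afterwards.
 */
#if 0
static void example_cancel_and_reprocess(struct ldlm_lock *lock)
{
	struct ldlm_resource *res = ldlm_resource_getref(lock->l_resource);

	ldlm_lock_cancel(lock);
	ldlm_reprocess_all(res);
	ldlm_resource_putref(res);
}
#endif
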
2167 /**
2168  * Helper function to call blocking AST for LDLM lock \a lock in a
2169  * "cancelling" mode.
2170  */
2171 void ldlm_cancel_callback(struct ldlm_lock *lock)
2172 {
2173         check_res_locked(lock->l_resource);
2174         if (!ldlm_is_cancel(lock)) {
2175                 ldlm_set_cancel(lock);
2176                 if (lock->l_blocking_ast) {
2177                         unlock_res_and_lock(lock);
2178                         lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
2179                                              LDLM_CB_CANCELING);
2180                         lock_res_and_lock(lock);
2181                 } else {
2182                         LDLM_DEBUG(lock, "no blocking ast");
2183                 }
2184         }
2185         ldlm_set_bl_done(lock);
2186 }
2187
2188 /**
2189  * Remove skiplist-enabled LDLM lock \a req from granted list
2190  */
2191 void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
2192 {
2193         if (req->l_resource->lr_type != LDLM_PLAIN &&
2194             req->l_resource->lr_type != LDLM_IBITS)
2195                 return;
2196
2197         list_del_init(&req->l_sl_policy);
2198         list_del_init(&req->l_sl_mode);
2199 }
2200
2201 /**
2202  * Attempts to cancel LDLM lock \a lock that has no reader/writer references.
2203  */
2204 void ldlm_lock_cancel(struct ldlm_lock *lock)
2205 {
2206         struct ldlm_resource *res;
2207         struct ldlm_namespace *ns;
2208         ENTRY;
2209
2210         lock_res_and_lock(lock);
2211
2212         res = lock->l_resource;
2213         ns  = ldlm_res_to_ns(res);
2214
2215         /* Please do not, no matter how tempting, remove this LBUG without
2216          * talking to me first. -phik */
2217         if (lock->l_readers || lock->l_writers) {
2218                 LDLM_ERROR(lock, "lock still has references");
2219                 LBUG();
2220         }
2221
2222         if (ldlm_is_waited(lock))
2223                 ldlm_del_waiting_lock(lock);
2224
2225         /* Run the cancel callback (the blocking AST in cancelling mode). */
2226         ldlm_cancel_callback(lock);
2227
2228         LASSERT(!ldlm_is_waited(lock));
2229
2230         ldlm_resource_unlink_lock(lock);
2231         ldlm_lock_destroy_nolock(lock);
2232
2233         if (lock->l_granted_mode == lock->l_req_mode)
2234                 ldlm_pool_del(&ns->ns_pool, lock);
2235
2236         /* Make sure we will not be called again for the same lock, which is
2237          * possible if lock->l_granted_mode is not zeroed out */
2238         lock->l_granted_mode = LCK_MINMODE;
2239         unlock_res_and_lock(lock);
2240
2241         EXIT;
2242 }
2243 EXPORT_SYMBOL(ldlm_lock_cancel);
2244
2245 /**
2246  * Set opaque data into the lock that only makes sense to upper layer.
2247  */
2248 int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
2249 {
2250         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2251         int rc = -EINVAL;
2252         ENTRY;
2253
2254         if (lock) {
2255                 if (lock->l_ast_data == NULL)
2256                         lock->l_ast_data = data;
2257                 if (lock->l_ast_data == data)
2258                         rc = 0;
2259                 LDLM_LOCK_PUT(lock);
2260         }
2261         RETURN(rc);
2262 }
2263 EXPORT_SYMBOL(ldlm_lock_set_data);
2264
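/*
 * Usage sketch (illustrative only, not built): attaching opaque
 * upper-layer data to a lock.  The call succeeds if the slot was empty
 * or already holds the same pointer, so repeating it is safe.
 */
#if 0
static int example_set_data(struct lustre_handle *lockh, void *mydata)
{
	int rc = ldlm_lock_set_data(lockh, mydata);

	if (rc == -EINVAL)
		/* stale handle, or the lock carries different data */
		CDEBUG(D_DLMTRACE, "could not attach ast data\n");
	return rc;
}
#endif
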
2265 struct export_cl_data {
2266         struct obd_export       *ecl_exp;
2267         int                     ecl_loop;
2268 };
2269
2270 static void ldlm_cancel_lock_for_export(struct obd_export *exp,
2271                                         struct ldlm_lock *lock,
2272                                         struct export_cl_data *ecl)
2273 {
2274         struct ldlm_resource *res;
2275
2276         res = ldlm_resource_getref(lock->l_resource);
2277
2278         ldlm_res_lvbo_update(res, NULL, 1);
2279         ldlm_lock_cancel(lock);
2280         if (!exp->exp_obd->obd_stopping)
2281                 ldlm_reprocess_all(res);
2282         ldlm_resource_putref(res);
2283
2284         ecl->ecl_loop++;
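	/* (x & -x) == x only when x is a power of two, so progress is
	 * logged at loop counts 1, 2, 4, 8, ... to limit console noise */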
2285         if ((ecl->ecl_loop & -ecl->ecl_loop) == ecl->ecl_loop) {
2286                 CDEBUG(D_INFO, "Export %p, %d locks cancelled.\n",
2287                        exp, ecl->ecl_loop);
2288         }
2289 }
2290
2291 /**
2292  * Iterator function for ldlm_export_cancel_locks.
2293  * Cancels the passed lock.
2294  */
2295 static int
2296 ldlm_cancel_locks_for_export_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
2297                                 struct hlist_node *hnode, void *data)
2299 {
2300         struct export_cl_data   *ecl = (struct export_cl_data *)data;
2301         struct obd_export       *exp  = ecl->ecl_exp;
2302         struct ldlm_lock        *lock = cfs_hash_object(hs, hnode);
2303
2304         LDLM_LOCK_GET(lock);
2305         ldlm_cancel_lock_for_export(exp, lock, ecl);
2306         LDLM_LOCK_RELEASE(lock);
2307
2308         return 0;
2309 }
2310
2311 /**
2312  * Cancel all blocked locks for given export.
2313  *
2314  * Typically called on client disconnection/eviction
2315  */
2316 int ldlm_export_cancel_blocked_locks(struct obd_export *exp)
2317 {
2318         struct export_cl_data   ecl = {
2319                 .ecl_exp        = exp,
2320                 .ecl_loop       = 0,
2321         };
2322
2323         while (!list_empty(&exp->exp_bl_list)) {
2324                 struct ldlm_lock *lock;
2325
2326                 spin_lock_bh(&exp->exp_bl_list_lock);
2327                 if (!list_empty(&exp->exp_bl_list)) {
2328                         lock = list_entry(exp->exp_bl_list.next,
2329                                           struct ldlm_lock, l_exp_list);
2330                         LDLM_LOCK_GET(lock);
2331                         list_del_init(&lock->l_exp_list);
2332                 } else {
2333                         lock = NULL;
2334                 }
2335                 spin_unlock_bh(&exp->exp_bl_list_lock);
2336
2337                 if (lock == NULL)
2338                         break;
2339
2340                 ldlm_cancel_lock_for_export(exp, lock, &ecl);
2341                 LDLM_LOCK_RELEASE(lock);
2342         }
2343
2344         CDEBUG(D_DLMTRACE, "Export %p, canceled %d locks, "
2345                "left on hash table %d.\n", exp, ecl.ecl_loop,
2346                atomic_read(&exp->exp_lock_hash->hs_count));
2347
2348         return ecl.ecl_loop;
2349 }
2350
2351 /**
2352  * Cancel all locks for given export.
2353  *
2354  * Typically called after client disconnection/eviction
2355  */
2356 int ldlm_export_cancel_locks(struct obd_export *exp)
2357 {
2358         struct export_cl_data   ecl = {
2359                 .ecl_exp        = exp,
2360                 .ecl_loop       = 0,
2361         };
2362
2363         cfs_hash_for_each_empty(exp->exp_lock_hash,
2364                                 ldlm_cancel_locks_for_export_cb, &ecl);
2365
2366         CDEBUG(D_DLMTRACE, "Export %p, canceled %d locks, "
2367                "left on hash table %d.\n", exp, ecl.ecl_loop,
2368                atomic_read(&exp->exp_lock_hash->hs_count));
2369
2370         return ecl.ecl_loop;
2371 }
2372
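/*
 * Usage sketch (illustrative only, not built): a plausible eviction
 * sequence, assuming the caller wants blocked locks cancelled first so
 * their waiters are unstuck before the full per-export sweep runs.
 */
#if 0
static void example_evict_export_locks(struct obd_export *exp)
{
	int n;

	n  = ldlm_export_cancel_blocked_locks(exp);
	n += ldlm_export_cancel_locks(exp);
	CDEBUG(D_DLMTRACE, "export %p: %d locks cancelled\n", exp, n);
}
#endif
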
2373 /**
2374  * Downgrade an exclusive lock.
2375  *
2376  * A fast variant of ldlm_lock_convert for the conversion of exclusive
2377  * locks. The conversion always succeeds.
2378  * Used by Commit on Sharing (COS) code.
2379  *
2380  * \param lock A lock to convert
2381  * \param new_mode new lock mode
2382  */
2383 void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode)
2384 {
2385         ENTRY;
2386
2387         LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
2388         LASSERT(new_mode == LCK_COS);
2389
2390         lock_res_and_lock(lock);
2391         ldlm_resource_unlink_lock(lock);
2392         /*
2393          * Remove the lock from pool as it will be added again in
2394          * ldlm_grant_lock() called below.
2395          */
2396         ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
2397
2398         lock->l_req_mode = new_mode;
2399         ldlm_grant_lock(lock, NULL);
2400         unlock_res_and_lock(lock);
2401         ldlm_reprocess_all(lock->l_resource);
2402
2403         EXIT;
2404 }
2405 EXPORT_SYMBOL(ldlm_lock_downgrade);
2406
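/*
 * Usage sketch (illustrative only, not built): a Commit-on-Sharing
 * downgrade of a granted PW/EX lock; per the assertions above, LCK_COS
 * is the only valid target mode.
 */
#if 0
static void example_cos_downgrade(struct ldlm_lock *lock)
{
	if (lock->l_granted_mode & (LCK_PW | LCK_EX))
		ldlm_lock_downgrade(lock, LCK_COS);
}
#endif
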
2407 /**
2408  * Attempt to convert already granted lock to a different mode.
2409  *
2410  * While lock conversion is not currently used, future client-side
2411  * optimizations could take advantage of it to avoid discarding cached
2412  * pages on a file.
2413  */
2414 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
2415                                         __u32 *flags)
2416 {
2417         struct list_head rpc_list;
2418         struct ldlm_resource *res;
2419         struct ldlm_namespace *ns;
2420         int granted = 0;
2421 #ifdef HAVE_SERVER_SUPPORT
2422         int old_mode;
2423         struct sl_insert_point prev;
2424 #endif
2425         struct ldlm_interval *node;
2426         ENTRY;
2427
2428         INIT_LIST_HEAD(&rpc_list);
2429         /* Just return if mode is unchanged. */
2430         if (new_mode == lock->l_granted_mode) {
2431                 *flags |= LDLM_FL_BLOCK_GRANTED;
2432                 RETURN(lock->l_resource);
2433         }
2434
2435         /* I can't check the type of lock here because the lock's bitlock
2436          * is not held here, so do the allocation blindly. -jay */
2437         OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
2438         if (node == NULL)  /* Actually, this causes EDEADLOCK to be returned */
2439                 RETURN(NULL);
2440
2441         LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR),
2442                  "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);
2443
2444         lock_res_and_lock(lock);
2445
2446         res = lock->l_resource;
2447         ns  = ldlm_res_to_ns(res);
2448
2449 #ifdef HAVE_SERVER_SUPPORT
2450         old_mode = lock->l_req_mode;
2451 #endif
2452         lock->l_req_mode = new_mode;
2453         if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
2454 #ifdef HAVE_SERVER_SUPPORT
2455                 /* remember the lock position where the lock might be
2456                  * added back to the granted list later and also
2457                  * remember the join mode for skiplist fixing. */
2458                 prev.res_link = lock->l_res_link.prev;
2459                 prev.mode_link = lock->l_sl_mode.prev;
2460                 prev.policy_link = lock->l_sl_policy.prev;
2461 #endif
2462                 ldlm_resource_unlink_lock(lock);
2463         } else {
2464                 ldlm_resource_unlink_lock(lock);
2465                 if (res->lr_type == LDLM_EXTENT) {
2466                         /* FIXME: ugly code, I have to attach the lock to an
2467                          * interval node again since perhaps it will be granted
2468                          * soon */
2469                         INIT_LIST_HEAD(&node->li_group);
2470                         ldlm_interval_attach(node, lock);
2471                         node = NULL;
2472                 }
2473         }
2474
2475         /*
2476          * Remove old lock from the pool before adding the lock with new
2477          * mode below in ->policy()
2478          */
2479         ldlm_pool_del(&ns->ns_pool, lock);
2480
2481         /* If this is a local resource, put it on the appropriate list. */
2482         if (ns_is_client(ldlm_res_to_ns(res))) {
2483                 if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
2484                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
2485                 } else {
2486                         /* This should never happen, because of the way the
2487                          * server handles conversions. */
2488                         LDLM_ERROR(lock, "Erroneous flags %x on local lock\n",
2489                                    *flags);
2490                         LBUG();
2491
2492                         ldlm_grant_lock(lock, &rpc_list);
2493                         granted = 1;
2494                         /* FIXME: completion handling not with lr_lock held ! */
2495                         if (lock->l_completion_ast)
2496                                 lock->l_completion_ast(lock, 0, NULL);
2497                 }
2498 #ifdef HAVE_SERVER_SUPPORT
2499         } else {
2500                 int rc;
2501                 ldlm_error_t err;
2502                 __u64 pflags = 0;
2503                 ldlm_processing_policy policy;
2504                 policy = ldlm_processing_policy_table[res->lr_type];
2505                 rc = policy(lock, &pflags, 0, &err, &rpc_list);
2506                 if (rc == LDLM_ITER_STOP) {
2507                         lock->l_req_mode = old_mode;
2508                         if (res->lr_type == LDLM_EXTENT)
2509                                 ldlm_extent_add_lock(res, lock);
2510                         else
2511                                 ldlm_granted_list_add_lock(lock, &prev);
2512
2513                         res = NULL;
2514                 } else {
2515                         *flags |= LDLM_FL_BLOCK_GRANTED;
2516                         granted = 1;
2517                 }
2518         }
2519 #else
2520         } else {
2521                 CERROR("This is client-side-only module, cannot handle "
2522                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
2523                 LBUG();
2524         }
2525 #endif
2526         unlock_res_and_lock(lock);
2527
2528         if (granted)
2529                 ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
2530         if (node)
2531                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
2532         RETURN(res);
2533 }
2534
2535 /**
2536  * Print lock with lock handle \a lockh description into debug log.
2537  *
2538  * Used when printing all locks on a resource for debug purposes.
2539  */
2540 void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
2541 {
2542         struct ldlm_lock *lock;
2543
2544         if (!((libcfs_debug | D_ERROR) & level))
2545                 return;
2546
2547         lock = ldlm_handle2lock(lockh);
2548         if (lock == NULL)
2549                 return;
2550
2551         LDLM_DEBUG_LIMIT(level, lock, "###");
2552
2553         LDLM_LOCK_PUT(lock);
2554 }
2555 EXPORT_SYMBOL(ldlm_lock_dump_handle);
2556
2557 /**
2558  * Print lock information with custom message into debug log.
2559  * Helper function.
2560  */
2561 void _ldlm_lock_debug(struct ldlm_lock *lock,
2562                       struct libcfs_debug_msg_data *msgdata,
2563                       const char *fmt, ...)
2564 {
2565         va_list args;
2566         struct obd_export *exp = lock->l_export;
2567         struct ldlm_resource *resource = lock->l_resource;
2568         char *nid = "local";
2569
2570         va_start(args, fmt);
2571
2572         if (exp && exp->exp_connection) {
2573                 nid = libcfs_nid2str(exp->exp_connection->c_peer.nid);
2574         } else if (exp && exp->exp_obd != NULL) {
2575                 struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
2576                 nid = libcfs_nid2str(imp->imp_connection->c_peer.nid);
2577         }
2578
2579         if (resource == NULL) {
2580                 libcfs_debug_vmsg2(msgdata, fmt, args,
2581                        " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2582                        "res: \?\? rrc=\?\? type: \?\?\? flags: "LPX64" nid: %s "
2583                        "remote: "LPX64" expref: %d pid: %u timeout: %lu "
2584                        "lvb_type: %d\n",
2585                        lock,
2586                        lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2587                        lock->l_readers, lock->l_writers,
2588                        ldlm_lockname[lock->l_granted_mode],
2589                        ldlm_lockname[lock->l_req_mode],
2590                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2591                        exp ? atomic_read(&exp->exp_refcount) : -99,
2592                        lock->l_pid, lock->l_callback_timeout, lock->l_lvb_type);
2593                 va_end(args);
2594                 return;
2595         }
2596
2597         switch (resource->lr_type) {
2598         case LDLM_EXTENT:
2599                 libcfs_debug_vmsg2(msgdata, fmt, args,
2600                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2601                         "res: "DLDLMRES" rrc: %d type: %s ["LPU64"->"LPU64"] "
2602                         "(req "LPU64"->"LPU64") flags: "LPX64" nid: %s remote: "
2603                         LPX64" expref: %d pid: %u timeout: %lu lvb_type: %d\n",
2604                         ldlm_lock_to_ns_name(lock), lock,
2605                         lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2606                         lock->l_readers, lock->l_writers,
2607                         ldlm_lockname[lock->l_granted_mode],
2608                         ldlm_lockname[lock->l_req_mode],
2609                         PLDLMRES(resource),
2610                         atomic_read(&resource->lr_refcount),
2611                         ldlm_typename[resource->lr_type],
2612                         lock->l_policy_data.l_extent.start,
2613                         lock->l_policy_data.l_extent.end,
2614                         lock->l_req_extent.start, lock->l_req_extent.end,
2615                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2616                         exp ? atomic_read(&exp->exp_refcount) : -99,
2617                         lock->l_pid, lock->l_callback_timeout,
2618                         lock->l_lvb_type);
2619                 break;
2620
2621         case LDLM_FLOCK:
2622                 libcfs_debug_vmsg2(msgdata, fmt, args,
2623                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2624                         "res: "DLDLMRES" rrc: %d type: %s pid: %d "
2625                         "["LPU64"->"LPU64"] flags: "LPX64" nid: %s "
2626                         "remote: "LPX64" expref: %d pid: %u timeout: %lu\n",
2627                         ldlm_lock_to_ns_name(lock), lock,
2628                         lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2629                         lock->l_readers, lock->l_writers,
2630                         ldlm_lockname[lock->l_granted_mode],
2631                         ldlm_lockname[lock->l_req_mode],
2632                         PLDLMRES(resource),
2633                         atomic_read(&resource->lr_refcount),
2634                         ldlm_typename[resource->lr_type],
2635                         lock->l_policy_data.l_flock.pid,
2636                         lock->l_policy_data.l_flock.start,
2637                         lock->l_policy_data.l_flock.end,
2638                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2639                         exp ? atomic_read(&exp->exp_refcount) : -99,
2640                         lock->l_pid, lock->l_callback_timeout);
2641                 break;
2642
2643         case LDLM_IBITS:
2644                 libcfs_debug_vmsg2(msgdata, fmt, args,
2645                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2646                         "res: "DLDLMRES" bits "LPX64" rrc: %d type: %s "
2647                         "flags: "LPX64" nid: %s remote: "LPX64" expref: %d "
2648                         "pid: %u timeout: %lu lvb_type: %d\n",
2649                         ldlm_lock_to_ns_name(lock),
2650                         lock, lock->l_handle.h_cookie,
2651                         atomic_read(&lock->l_refc),
2652                         lock->l_readers, lock->l_writers,
2653                         ldlm_lockname[lock->l_granted_mode],
2654                         ldlm_lockname[lock->l_req_mode],
2655                         PLDLMRES(resource),
2656                         lock->l_policy_data.l_inodebits.bits,
2657                         atomic_read(&resource->lr_refcount),
2658                         ldlm_typename[resource->lr_type],
2659                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2660                         exp ? atomic_read(&exp->exp_refcount) : -99,
2661                         lock->l_pid, lock->l_callback_timeout,
2662                         lock->l_lvb_type);
2663                 break;
2664
2665         default:
2666                 libcfs_debug_vmsg2(msgdata, fmt, args,
2667                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2668                         "res: "DLDLMRES" rrc: %d type: %s flags: "LPX64" "
2669                         "nid: %s remote: "LPX64" expref: %d pid: %u "
2670                         "timeout: %lu lvb_type: %d\n",
2671                         ldlm_lock_to_ns_name(lock),
2672                         lock, lock->l_handle.h_cookie,
2673                         atomic_read(&lock->l_refc),
2674                         lock->l_readers, lock->l_writers,
2675                         ldlm_lockname[lock->l_granted_mode],
2676                         ldlm_lockname[lock->l_req_mode],
2677                         PLDLMRES(resource),
2678                         atomic_read(&resource->lr_refcount),
2679                         ldlm_typename[resource->lr_type],
2680                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2681                         exp ? atomic_read(&exp->exp_refcount) : -99,
2682                         lock->l_pid, lock->l_callback_timeout,
2683                         lock->l_lvb_type);
2684                 break;
2685         }
2686         va_end(args);
2687 }
2688 EXPORT_SYMBOL(_ldlm_lock_debug);