/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Whamcloud, Inc.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_lock.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
# include <libcfs/libcfs.h>
# include <linux/lustre_intent.h>
#else
# include <liblustre.h>
#endif

#include <obd_class.h>
#include "ldlm_internal.h"

/* lock types */
char *ldlm_lockname[] = {
        [0] "--",
        [LCK_EX] "EX",
        [LCK_PW] "PW",
        [LCK_PR] "PR",
        [LCK_CW] "CW",
        [LCK_CR] "CR",
        [LCK_NL] "NL",
        [LCK_GROUP] "GROUP",
        [LCK_COS] "COS"
};
EXPORT_SYMBOL(ldlm_lockname);

char *ldlm_typename[] = {
        [LDLM_PLAIN] "PLN",
        [LDLM_EXTENT] "EXT",
        [LDLM_FLOCK] "FLK",
        [LDLM_IBITS] "IBT",
};
EXPORT_SYMBOL(ldlm_typename);

static ldlm_policy_wire_to_local_t ldlm_policy_wire18_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE] ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE] ldlm_flock_policy_wire18_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE] ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_wire_to_local_t ldlm_policy_wire21_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE] ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE] ldlm_flock_policy_wire21_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE] ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_local_to_wire_t ldlm_policy_local_to_wire[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE] ldlm_plain_policy_local_to_wire,
        [LDLM_EXTENT - LDLM_MIN_TYPE] ldlm_extent_policy_local_to_wire,
        [LDLM_FLOCK - LDLM_MIN_TYPE] ldlm_flock_policy_local_to_wire,
        [LDLM_IBITS - LDLM_MIN_TYPE] ldlm_ibits_policy_local_to_wire,
};

/**
 * Converts lock policy from local format to on the wire lock_desc format
 */
void ldlm_convert_policy_to_wire(ldlm_type_t type,
                                 const ldlm_policy_data_t *lpolicy,
                                 ldlm_wire_policy_data_t *wpolicy)
{
        ldlm_policy_local_to_wire_t convert;

        convert = ldlm_policy_local_to_wire[type - LDLM_MIN_TYPE];

        convert(lpolicy, wpolicy);
}

/**
 * Converts lock policy from on the wire lock_desc format to local format
 */
void ldlm_convert_policy_to_local(struct obd_export *exp, ldlm_type_t type,
                                  const ldlm_wire_policy_data_t *wpolicy,
                                  ldlm_policy_data_t *lpolicy)
{
        ldlm_policy_wire_to_local_t convert;
        int new_client;

        /* Some badness for 2.0.0 clients, but 2.0.0 isn't supported */
        new_client = (exp->exp_connect_flags & OBD_CONNECT_FULL20) != 0;
        if (new_client)
               convert = ldlm_policy_wire21_to_local[type - LDLM_MIN_TYPE];
        else
               convert = ldlm_policy_wire18_to_local[type - LDLM_MIN_TYPE];

        convert(wpolicy, lpolicy);
}
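
/*
 * A minimal usage sketch (not part of the original file; "exp" and "desc"
 * are hypothetical): converting a policy received in a lock_desc into the
 * local representation for a given export.  Clients that do not set
 * OBD_CONNECT_FULL20 still speak the 1.8 flock wire format, which is why
 * the tables above carry two flock converters.
 *
 *      ldlm_policy_data_t lpolicy;
 *
 *      ldlm_convert_policy_to_local(exp, LDLM_FLOCK,
 *                                   &desc->l_policy_data, &lpolicy);
 */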

char *ldlm_it2str(int it)
{
        switch (it) {
        case IT_OPEN:
                return "open";
        case IT_CREAT:
                return "creat";
        case (IT_OPEN | IT_CREAT):
                return "open|creat";
        case IT_READDIR:
                return "readdir";
        case IT_GETATTR:
                return "getattr";
        case IT_LOOKUP:
                return "lookup";
        case IT_UNLINK:
                return "unlink";
        case IT_GETXATTR:
                return "getxattr";
        case IT_LAYOUT:
                return "layout";
        default:
                CERROR("Unknown intent %d\n", it);
                return "UNKNOWN";
        }
}
EXPORT_SYMBOL(ldlm_it2str);

extern cfs_mem_cache_t *ldlm_lock_slab;

#ifdef HAVE_SERVER_SUPPORT
static ldlm_processing_policy ldlm_processing_policy_table[] = {
        [LDLM_PLAIN] ldlm_process_plain_lock,
        [LDLM_EXTENT] ldlm_process_extent_lock,
# ifdef __KERNEL__
        [LDLM_FLOCK] ldlm_process_flock_lock,
# endif
        [LDLM_IBITS] ldlm_process_inodebits_lock,
};

ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
{
        return ldlm_processing_policy_table[res->lr_type];
}
EXPORT_SYMBOL(ldlm_get_processing_policy);
#endif /* HAVE_SERVER_SUPPORT */

void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
{
        ns->ns_policy = arg;
}
EXPORT_SYMBOL(ldlm_register_intent);

/*
 * REFCOUNTED LOCK OBJECTS
 */


/*
 * Lock refcounts, during creation:
 *   - one special one for allocation, dec'd only once in destroy
 *   - one for being a lock that's in-use
 *   - one for the addref associated with a new lock
 */
struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
{
        cfs_atomic_inc(&lock->l_refc);
        return lock;
}
EXPORT_SYMBOL(ldlm_lock_get);

void ldlm_lock_put(struct ldlm_lock *lock)
{
        ENTRY;

        LASSERT(lock->l_resource != LP_POISON);
        LASSERT(cfs_atomic_read(&lock->l_refc) > 0);
        if (cfs_atomic_dec_and_test(&lock->l_refc)) {
                struct ldlm_resource *res;

                LDLM_DEBUG(lock,
                           "final lock_put on destroyed lock, freeing it.");

                res = lock->l_resource;
                LASSERT(lock->l_destroyed);
                LASSERT(cfs_list_empty(&lock->l_res_link));
                LASSERT(cfs_list_empty(&lock->l_pending_chain));

                lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
                                     LDLM_NSS_LOCKS);
                lu_ref_del(&res->lr_reference, "lock", lock);
                ldlm_resource_putref(res);
                lock->l_resource = NULL;
                if (lock->l_export) {
                        class_export_lock_put(lock->l_export, lock);
                        lock->l_export = NULL;
                }

                if (lock->l_lvb_data != NULL)
                        OBD_FREE(lock->l_lvb_data, lock->l_lvb_len);

                ldlm_interval_free(ldlm_interval_detach(lock));
                lu_ref_fini(&lock->l_reference);
                OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle);
        }

        EXIT;
}
EXPORT_SYMBOL(ldlm_lock_put);
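
/*
 * A pairing sketch (illustrative, not part of the original file): every
 * LDLM_LOCK_GET() must be balanced by an LDLM_LOCK_PUT(); the lock is
 * only freed on the final put, and only after it has been destroyed
 * (note the LASSERT(lock->l_destroyed) above).
 *
 *      struct ldlm_lock *lock = LDLM_LOCK_GET(some_lock); // hypothetical
 *      ... use lock ...
 *      LDLM_LOCK_PUT(lock);
 */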

int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
{
        int rc = 0;
        if (!cfs_list_empty(&lock->l_lru)) {
                struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

                LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
                cfs_list_del_init(&lock->l_lru);
                if (lock->l_flags & LDLM_FL_SKIPPED)
                        lock->l_flags &= ~LDLM_FL_SKIPPED;
                LASSERT(ns->ns_nr_unused > 0);
                ns->ns_nr_unused--;
                rc = 1;
        }
        return rc;
}

int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
        int rc;

        ENTRY;
        if (lock->l_ns_srv) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                RETURN(0);
        }

        spin_lock(&ns->ns_lock);
        rc = ldlm_lock_remove_from_lru_nolock(lock);
        spin_unlock(&ns->ns_lock);
        EXIT;
        return rc;
}

void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        lock->l_last_used = cfs_time_current();
        LASSERT(cfs_list_empty(&lock->l_lru));
        LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
        cfs_list_add_tail(&lock->l_lru, &ns->ns_unused_list);
        LASSERT(ns->ns_nr_unused >= 0);
        ns->ns_nr_unused++;
}

void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        spin_lock(&ns->ns_lock);
        ldlm_lock_add_to_lru_nolock(lock);
        spin_unlock(&ns->ns_lock);
        EXIT;
}

void ldlm_lock_touch_in_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        if (lock->l_ns_srv) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                EXIT;
                return;
        }

        spin_lock(&ns->ns_lock);
        if (!cfs_list_empty(&lock->l_lru)) {
                ldlm_lock_remove_from_lru_nolock(lock);
                ldlm_lock_add_to_lru_nolock(lock);
        }
        spin_unlock(&ns->ns_lock);
        EXIT;
}
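
/*
 * Design note (not part of the original file): only client-side locks
 * ever sit on the namespace LRU (l_ns_srv locks are asserted off it),
 * FLOCK locks are excluded entirely, and a "touch" is a remove-and-re-add
 * at the tail, so ns_unused_list stays ordered from coldest to most
 * recently used and LRU cancellation can walk it from the head.
 */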

/* This used to have a 'strict' flag, which recovery would use to mark an
 * in-use lock as needing-to-die.  Lest I am ever tempted to put it back, I
 * shall explain why it's gone: with the new hash table scheme, once you call
 * ldlm_lock_destroy, you can never drop your final references on this lock.
 * Because it's not in the hash table anymore.  -phil */
int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
{
        ENTRY;

        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                LBUG();
        }

        if (!cfs_list_empty(&lock->l_res_link)) {
                LDLM_ERROR(lock, "lock still on resource");
                LBUG();
        }

        if (lock->l_destroyed) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                EXIT;
                return 0;
        }
        lock->l_destroyed = 1;

        if (lock->l_export && lock->l_export->exp_lock_hash) {
                /* NB: it's safe to call cfs_hash_del() even if the lock
                 * isn't in exp_lock_hash. */
                /* In the function below, .hs_keycmp resolves to
                 * ldlm_export_lock_keycmp() */
                /* coverity[overrun-buffer-val] */
                cfs_hash_del(lock->l_export->exp_lock_hash,
                             &lock->l_remote_handle, &lock->l_exp_hash);
        }

        ldlm_lock_remove_from_lru(lock);
        class_handle_unhash(&lock->l_handle);

#if 0
        /* Wake anyone waiting for this lock */
        /* FIXME: I should probably add yet another flag, instead of using
         * l_export to only call this on clients */
        if (lock->l_export)
                class_export_put(lock->l_export);
        lock->l_export = NULL;
        if (lock->l_export && lock->l_completion_ast)
                lock->l_completion_ast(lock, 0);
#endif
        EXIT;
        return 1;
}

void ldlm_lock_destroy(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        lock_res_and_lock(lock);
        first = ldlm_lock_destroy_internal(lock);
        unlock_res_and_lock(lock);

        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        first = ldlm_lock_destroy_internal(lock);
        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/* this is called by portals_handle2object with the handle lock taken */
static void lock_handle_addref(void *lock)
{
        LDLM_LOCK_GET((struct ldlm_lock *)lock);
}

static void lock_handle_free(void *lock, int size)
{
        LASSERT(size == sizeof(struct ldlm_lock));
        OBD_SLAB_FREE(lock, ldlm_lock_slab, size);
}

struct portals_handle_ops lock_handle_ops = {
        .hop_addref = lock_handle_addref,
        .hop_free   = lock_handle_free,
};

/*
 * usage: pass in a resource on which you have done ldlm_resource_get
 *        new lock will take over the refcount.
 * returns: lock with refcount 2 - one for current caller and one for remote
 */
static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
{
        struct ldlm_lock *lock;
        ENTRY;

        if (resource == NULL)
                LBUG();

        OBD_SLAB_ALLOC_PTR_GFP(lock, ldlm_lock_slab, CFS_ALLOC_IO);
        if (lock == NULL)
                RETURN(NULL);

        spin_lock_init(&lock->l_lock);
        lock->l_resource = resource;
        lu_ref_add(&resource->lr_reference, "lock", lock);

        cfs_atomic_set(&lock->l_refc, 2);
        CFS_INIT_LIST_HEAD(&lock->l_res_link);
        CFS_INIT_LIST_HEAD(&lock->l_lru);
        CFS_INIT_LIST_HEAD(&lock->l_pending_chain);
        CFS_INIT_LIST_HEAD(&lock->l_bl_ast);
        CFS_INIT_LIST_HEAD(&lock->l_cp_ast);
        CFS_INIT_LIST_HEAD(&lock->l_rk_ast);
        cfs_waitq_init(&lock->l_waitq);
        lock->l_blocking_lock = NULL;
        CFS_INIT_LIST_HEAD(&lock->l_sl_mode);
        CFS_INIT_LIST_HEAD(&lock->l_sl_policy);
        CFS_INIT_HLIST_NODE(&lock->l_exp_hash);
        CFS_INIT_HLIST_NODE(&lock->l_exp_flock_hash);

        lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
                             LDLM_NSS_LOCKS);
        CFS_INIT_LIST_HEAD(&lock->l_handle.h_link);
        class_handle_hash(&lock->l_handle, &lock_handle_ops);

        lu_ref_init(&lock->l_reference);
        lu_ref_add(&lock->l_reference, "hash", lock);
        lock->l_callback_timeout = 0;

#if LUSTRE_TRACKS_LOCK_EXP_REFS
        CFS_INIT_LIST_HEAD(&lock->l_exp_refs_link);
        lock->l_exp_refs_nr = 0;
        lock->l_exp_refs_target = NULL;
#endif
        CFS_INIT_LIST_HEAD(&lock->l_exp_list);

        RETURN(lock);
}

int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                              const struct ldlm_res_id *new_resid)
{
        struct ldlm_resource *oldres = lock->l_resource;
        struct ldlm_resource *newres;
        int type;
        ENTRY;

        LASSERT(ns_is_client(ns));

        lock_res_and_lock(lock);
        if (memcmp(new_resid, &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) == 0) {
                /* Nothing to do */
                unlock_res_and_lock(lock);
                RETURN(0);
        }

        LASSERT(new_resid->name[0] != 0);

        /* This function assumes that the lock isn't on any lists */
        LASSERT(cfs_list_empty(&lock->l_res_link));

        type = oldres->lr_type;
        unlock_res_and_lock(lock);

        newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
        if (newres == NULL)
                RETURN(-ENOMEM);

        lu_ref_add(&newres->lr_reference, "lock", lock);
        /*
         * To flip the lock from the old to the new resource, lock, oldres and
         * newres have to be locked. Resource spin-locks are nested within
         * lock->l_lock, and are taken in the memory address order to avoid
         * dead-locks.
         */
        spin_lock(&lock->l_lock);
        oldres = lock->l_resource;
        if (oldres < newres) {
                lock_res(oldres);
                lock_res_nested(newres, LRT_NEW);
        } else {
                lock_res(newres);
                lock_res_nested(oldres, LRT_NEW);
        }
        LASSERT(memcmp(new_resid, &oldres->lr_name,
                       sizeof oldres->lr_name) != 0);
        lock->l_resource = newres;
        unlock_res(oldres);
        unlock_res_and_lock(lock);

        /* ...and the flowers are still standing! */
        lu_ref_del(&oldres->lr_reference, "lock", lock);
        ldlm_resource_putref(oldres);

        RETURN(0);
}
EXPORT_SYMBOL(ldlm_lock_change_resource);

/*
 *  HANDLES
 */

void ldlm_lock2handle(const struct ldlm_lock *lock, struct lustre_handle *lockh)
{
        lockh->cookie = lock->l_handle.h_cookie;
}
EXPORT_SYMBOL(ldlm_lock2handle);

/* if flags: atomically get the lock and set the flags.
 *           Return NULL if flag already set
 */

struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
                                     __u64 flags)
{
        struct ldlm_lock *lock;
        ENTRY;

        LASSERT(handle);

        lock = class_handle2object(handle->cookie);
        if (lock == NULL)
                RETURN(NULL);

        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if (flags == 0 && !lock->l_destroyed) {
                lu_ref_add(&lock->l_reference, "handle", cfs_current());
                RETURN(lock);
        }

        lock_res_and_lock(lock);

        LASSERT(lock->l_resource != NULL);

        lu_ref_add_atomic(&lock->l_reference, "handle", cfs_current());
        if (unlikely(lock->l_destroyed)) {
                unlock_res_and_lock(lock);
                CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
                LDLM_LOCK_PUT(lock);
                RETURN(NULL);
        }

        if (flags && (lock->l_flags & flags)) {
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
                RETURN(NULL);
        }

        if (flags)
                lock->l_flags |= flags;

        unlock_res_and_lock(lock);
        RETURN(lock);
}
EXPORT_SYMBOL(__ldlm_handle2lock);
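
/*
 * A minimal handle round-trip sketch (illustrative, not part of the
 * original file): a lock is published to other subsystems as an opaque
 * lustre_handle cookie and later resolved back, taking a "handle"
 * reference that the caller must drop:
 *
 *      struct lustre_handle lockh;
 *      struct ldlm_lock *lock;
 *
 *      ldlm_lock2handle(lock, &lockh);
 *      ...
 *      lock = ldlm_handle2lock(&lockh);   // NULL if already destroyed
 *      if (lock != NULL) {
 *              ... use lock ...
 *              LDLM_LOCK_PUT(lock);
 *      }
 */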

void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
{
        struct obd_export *exp = lock->l_export?:lock->l_conn_export;
        /* INODEBITS_INTEROP: If the other side does not support
         * inodebits, reply with a plain lock descriptor.
         */
        if ((lock->l_resource->lr_type == LDLM_IBITS) &&
            (exp && !(exp->exp_connect_flags & OBD_CONNECT_IBITS))) {
                /* Make sure all the right bits are set in this lock we
                   are going to pass to client */
                LASSERTF(lock->l_policy_data.l_inodebits.bits ==
                         (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
                          MDS_INODELOCK_LAYOUT),
                         "Inappropriate inode lock bits during "
                         "conversion " LPU64 "\n",
                         lock->l_policy_data.l_inodebits.bits);

                ldlm_res2desc(lock->l_resource, &desc->l_resource);
                desc->l_resource.lr_type = LDLM_PLAIN;

                /* Convert "new" lock mode to something old client can
                   understand */
                if ((lock->l_req_mode == LCK_CR) ||
                    (lock->l_req_mode == LCK_CW))
                        desc->l_req_mode = LCK_PR;
                else
                        desc->l_req_mode = lock->l_req_mode;
                if ((lock->l_granted_mode == LCK_CR) ||
                    (lock->l_granted_mode == LCK_CW)) {
                        desc->l_granted_mode = LCK_PR;
                } else {
                        /* We never grant PW/EX locks to clients */
                        LASSERT((lock->l_granted_mode != LCK_PW) &&
                                (lock->l_granted_mode != LCK_EX));
                        desc->l_granted_mode = lock->l_granted_mode;
                }

                /* We do not copy policy here, because there is no
                   policy for plain locks */
        } else {
                ldlm_res2desc(lock->l_resource, &desc->l_resource);
                desc->l_req_mode = lock->l_req_mode;
                desc->l_granted_mode = lock->l_granted_mode;
                ldlm_convert_policy_to_wire(lock->l_resource->lr_type,
                                            &lock->l_policy_data,
                                            &desc->l_policy_data);
        }
}
EXPORT_SYMBOL(ldlm_lock2desc);

void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                           cfs_list_t *work_list)
{
        if ((lock->l_flags & LDLM_FL_AST_SENT) == 0) {
                LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
                lock->l_flags |= LDLM_FL_AST_SENT;
                /* If the enqueuing client said so, tell the AST recipient to
                 * discard dirty data, rather than writing back. */
                if (new->l_flags & LDLM_AST_DISCARD_DATA)
                        lock->l_flags |= LDLM_FL_DISCARD_DATA;
                LASSERT(cfs_list_empty(&lock->l_bl_ast));
                cfs_list_add(&lock->l_bl_ast, work_list);
                LDLM_LOCK_GET(lock);
                LASSERT(lock->l_blocking_lock == NULL);
                lock->l_blocking_lock = LDLM_LOCK_GET(new);
        }
}

void ldlm_add_cp_work_item(struct ldlm_lock *lock, cfs_list_t *work_list)
{
        if ((lock->l_flags & LDLM_FL_CP_REQD) == 0) {
                lock->l_flags |= LDLM_FL_CP_REQD;
                LDLM_DEBUG(lock, "lock granted; sending completion AST.");
                LASSERT(cfs_list_empty(&lock->l_cp_ast));
                cfs_list_add(&lock->l_cp_ast, work_list);
                LDLM_LOCK_GET(lock);
        }
}

/* must be called with lr_lock held */
void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                            cfs_list_t *work_list)
{
        ENTRY;
        check_res_locked(lock->l_resource);
        if (new)
                ldlm_add_bl_work_item(lock, new, work_list);
        else
                ldlm_add_cp_work_item(lock, work_list);
        EXIT;
}

void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;

        lock = ldlm_handle2lock(lockh);
        LASSERT(lock != NULL);
        ldlm_lock_addref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_addref);

void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        ldlm_lock_remove_from_lru(lock);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                lock->l_readers++;
                lu_ref_add_atomic(&lock->l_reference, "reader", lock);
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                lock->l_writers++;
                lu_ref_add_atomic(&lock->l_reference, "writer", lock);
        }
        LDLM_LOCK_GET(lock);
        lu_ref_add_atomic(&lock->l_reference, "user", lock);
        LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}

/**
 * Attempts to addref a lock, and fails if lock is already LDLM_FL_CBPENDING
 * or destroyed.
 *
 * \retval 0 success, lock was addref-ed
 *
 * \retval -EAGAIN lock is being canceled.
 */
int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;
        int               result;

        result = -EAGAIN;
        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                if (lock->l_readers != 0 || lock->l_writers != 0 ||
                    !(lock->l_flags & LDLM_FL_CBPENDING)) {
                        ldlm_lock_addref_internal_nolock(lock, mode);
                        result = 0;
                }
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return result;
}
EXPORT_SYMBOL(ldlm_lock_addref_try);
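
/*
 * A usage sketch (illustrative, not part of the original file): unlike
 * ldlm_lock_addref(), the try-variant refuses a lock that is already
 * CBPENDING with no remaining users, so a racing cancel wins cleanly.
 *
 *      if (ldlm_lock_addref_try(&lockh, LCK_PR) == 0) {
 *              ... read under the PR reference ...
 *              ldlm_lock_decref(&lockh, LCK_PR);
 *      } else {
 *              ... -EAGAIN: lock is being canceled, enqueue a fresh one ...
 *      }
 */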

/* only called for local locks */
void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
{
        lock_res_and_lock(lock);
        ldlm_lock_addref_internal_nolock(lock, mode);
        unlock_res_and_lock(lock);
}

/* Only called in ldlm_flock_destroy and for local locks.
 * For LDLM_FLOCK type locks, l_blocking_ast is null and
 * ldlm_lock_remove_from_lru() does nothing, so it is safe for
 * ldlm_flock_destroy to use this variant and skip that code. */
void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                LASSERT(lock->l_readers > 0);
                lu_ref_del(&lock->l_reference, "reader", lock);
                lock->l_readers--;
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                LASSERT(lock->l_writers > 0);
                lu_ref_del(&lock->l_reference, "writer", lock);
                lock->l_writers--;
        }

        lu_ref_del(&lock->l_reference, "user", lock);
        LDLM_LOCK_RELEASE(lock);    /* matches the LDLM_LOCK_GET() in addref */
}

void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
{
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        ns = ldlm_lock_to_ns(lock);

        ldlm_lock_decref_internal_nolock(lock, mode);

        if (lock->l_flags & LDLM_FL_LOCAL &&
            !lock->l_readers && !lock->l_writers) {
                /* If this is a local lock on a server namespace and this was
                 * the last reference, cancel the lock. */
                CDEBUG(D_INFO, "forcing cancel of local lock\n");
                lock->l_flags |= LDLM_FL_CBPENDING;
        }

        if (!lock->l_readers && !lock->l_writers &&
            (lock->l_flags & LDLM_FL_CBPENDING)) {
                /* If we received a blocking AST and this was the last
                 * reference, run the callback. */
                if (lock->l_ns_srv && lock->l_export)
                        CERROR("FL_CBPENDING set on non-local lock--just a "
                               "warning\n");

                LDLM_DEBUG(lock, "final decref done on cbpending lock");

                LDLM_LOCK_GET(lock); /* dropped by bl thread */
                ldlm_lock_remove_from_lru(lock);
                unlock_res_and_lock(lock);

                if (lock->l_flags & LDLM_FL_FAIL_LOC)
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
                    ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
                        ldlm_handle_bl_callback(ns, NULL, lock);
        } else if (ns_is_client(ns) &&
                   !lock->l_readers && !lock->l_writers &&
                   !(lock->l_flags & LDLM_FL_NO_LRU) &&
                   !(lock->l_flags & LDLM_FL_BL_AST)) {

                LDLM_DEBUG(lock, "add lock into lru list");

                /* If this is a client-side namespace and this was the last
                 * reference, put it on the LRU. */
                ldlm_lock_add_to_lru(lock);
                unlock_res_and_lock(lock);

                if (lock->l_flags & LDLM_FL_FAIL_LOC)
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
                 * are not supported by the server, otherwise, it is done on
                 * enqueue. */
                if (!exp_connect_cancelset(lock->l_conn_export) &&
                    !ns_connect_lru_resize(ns))
                        ldlm_cancel_lru(ns, 0, LDLM_ASYNC, 0);
        } else {
                LDLM_DEBUG(lock, "do not add lock into lru list");
                unlock_res_and_lock(lock);
        }

        EXIT;
}

void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_decref);
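
/*
 * A pairing sketch (illustrative, not part of the original file): user
 * references are taken and dropped per mode, and the final decref is the
 * point where a CBPENDING lock gets handed to the blocking-AST thread or
 * an unused client lock is parked on the LRU (see above).
 *
 *      ldlm_lock_addref(&lockh, LCK_PW);
 *      ... write under the PW reference ...
 *      ldlm_lock_decref(&lockh, LCK_PW);
 */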

/* This will drop a lock reference and mark it for destruction, but will not
 * necessarily cancel the lock before returning. */
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        ENTRY;

        LASSERT(lock != NULL);

        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        lock_res_and_lock(lock);
        lock->l_flags |= LDLM_FL_CBPENDING;
        unlock_res_and_lock(lock);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);

struct sl_insert_point {
        cfs_list_t *res_link;
        cfs_list_t *mode_link;
        cfs_list_t *policy_link;
};

/*
 * search_granted_lock
 *
 * Description:
 *      Finds a position to insert the new lock.
 * Parameters:
 *      queue [input]:  the granted list to search;
 *      req [input]:    the lock whose insert position is to be located;
 *      prev [output]:  positions within the 3 lists at which to insert @req
 * Return Value:
 *      filled @prev
 * NOTE: called by
 *  - ldlm_grant_lock_with_skiplist
 */
static void search_granted_lock(cfs_list_t *queue,
                                struct ldlm_lock *req,
                                struct sl_insert_point *prev)
{
        cfs_list_t *tmp;
        struct ldlm_lock *lock, *mode_end, *policy_end;
        ENTRY;

        cfs_list_for_each(tmp, queue) {
                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                mode_end = cfs_list_entry(lock->l_sl_mode.prev,
                                          struct ldlm_lock, l_sl_mode);

                if (lock->l_req_mode != req->l_req_mode) {
                        /* jump to last lock of mode group */
                        tmp = &mode_end->l_res_link;
                        continue;
                }

                /* suitable mode group is found */
                if (lock->l_resource->lr_type == LDLM_PLAIN) {
                        /* insert point is last lock of the mode group */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else if (lock->l_resource->lr_type == LDLM_IBITS) {
                        for (;;) {
                                policy_end =
                                        cfs_list_entry(lock->l_sl_policy.prev,
                                                       struct ldlm_lock,
                                                       l_sl_policy);

                                if (lock->l_policy_data.l_inodebits.bits ==
                                    req->l_policy_data.l_inodebits.bits) {
                                        /* insert point is last lock of
                                         * the policy group */
                                        prev->res_link =
                                                &policy_end->l_res_link;
                                        prev->mode_link =
                                                &policy_end->l_sl_mode;
                                        prev->policy_link =
                                                &policy_end->l_sl_policy;
                                        EXIT;
                                        return;
                                }

                                if (policy_end == mode_end)
                                        /* done with mode group */
                                        break;

                                /* go to next policy group within mode group */
                                tmp = policy_end->l_res_link.next;
                                lock = cfs_list_entry(tmp, struct ldlm_lock,
                                                      l_res_link);
                        }  /* loop over policy groups within the mode group */

                        /* insert point is last lock of the mode group,
                         * new policy group is started */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else {
                        LDLM_ERROR(lock, "is not LDLM_PLAIN or LDLM_IBITS lock");
                        LBUG();
                }
        }

        /* insert point is last lock on the queue,
         * new mode group and new policy group are started */
        prev->res_link = queue->prev;
        prev->mode_link = &req->l_sl_mode;
        prev->policy_link = &req->l_sl_policy;
        EXIT;
        return;
}
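
/*
 * An illustrative picture (not part of the original file) of the skiplist
 * searched above: the granted queue is partitioned into mode groups
 * linked by l_sl_mode, and IBITS mode groups are further partitioned into
 * policy groups linked by l_sl_policy, so a mismatching group is skipped
 * in one hop instead of lock by lock:
 *
 *      lr_granted: [PR b=LOOKUP][PR b=LOOKUP][PR b=UPDATE] -> [CW ...]
 *                  '-- policy group ------->''policy grp->'   next mode
 *                  '-------------- PR mode group ---------'   group
 */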

static void ldlm_granted_list_add_lock(struct ldlm_lock *lock,
                                       struct sl_insert_point *prev)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        ldlm_resource_dump(D_INFO, res);
        LDLM_DEBUG(lock, "About to add lock:");

        if (lock->l_destroyed) {
                CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
                return;
        }

        LASSERT(cfs_list_empty(&lock->l_res_link));
        LASSERT(cfs_list_empty(&lock->l_sl_mode));
        LASSERT(cfs_list_empty(&lock->l_sl_policy));

        /*
         * lock->link == prev->link means lock is first starting the group.
         * Don't re-add to itself to suppress kernel warnings.
         */
        if (&lock->l_res_link != prev->res_link)
                cfs_list_add(&lock->l_res_link, prev->res_link);
        if (&lock->l_sl_mode != prev->mode_link)
                cfs_list_add(&lock->l_sl_mode, prev->mode_link);
        if (&lock->l_sl_policy != prev->policy_link)
                cfs_list_add(&lock->l_sl_policy, prev->policy_link);

        EXIT;
}

static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
{
        struct sl_insert_point prev;
        ENTRY;

        LASSERT(lock->l_req_mode == lock->l_granted_mode);

        search_granted_lock(&lock->l_resource->lr_granted, lock, &prev);
        ldlm_granted_list_add_lock(lock, &prev);
        EXIT;
}

/* NOTE: called by
 *  - ldlm_lock_enqueue
 *  - ldlm_reprocess_queue
 *  - ldlm_lock_convert
 *
 * must be called with lr_lock held
 */
void ldlm_grant_lock(struct ldlm_lock *lock, cfs_list_t *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        lock->l_granted_mode = lock->l_req_mode;
        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
                ldlm_grant_lock_with_skiplist(lock);
        else if (res->lr_type == LDLM_EXTENT)
                ldlm_extent_add_lock(res, lock);
        else
                ldlm_resource_add_lock(res, &res->lr_granted, lock);

        if (lock->l_granted_mode < res->lr_most_restr)
                res->lr_most_restr = lock->l_granted_mode;

        if (work_list && lock->l_completion_ast != NULL)
                ldlm_add_ast_work_item(lock, NULL, work_list);

        ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
        EXIT;
}

/* returns a referenced lock or NULL.  See the flag descriptions below, in the
 * comment above ldlm_lock_match */
static struct ldlm_lock *search_queue(cfs_list_t *queue,
                                      ldlm_mode_t *mode,
                                      ldlm_policy_data_t *policy,
                                      struct ldlm_lock *old_lock,
                                      __u64 flags, int unref)
{
        struct ldlm_lock *lock;
        cfs_list_t       *tmp;

        cfs_list_for_each(tmp, queue) {
                ldlm_mode_t match;

                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lock == old_lock)
                        break;

                /* llite sometimes wants to match locks that will be
                 * canceled when their users drop, but we allow it to match
                 * if it passes in CBPENDING and the lock still has users.
                 * this is generally only going to be used by children
                 * whose parents already hold a lock so forward progress
                 * can still happen. */
                if (lock->l_flags & LDLM_FL_CBPENDING &&
                    !(flags & LDLM_FL_CBPENDING))
                        continue;
                if (!unref && lock->l_flags & LDLM_FL_CBPENDING &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        continue;

                if (!(lock->l_req_mode & *mode))
                        continue;
                match = lock->l_req_mode;

                if (lock->l_resource->lr_type == LDLM_EXTENT &&
                    (lock->l_policy_data.l_extent.start >
                     policy->l_extent.start ||
                     lock->l_policy_data.l_extent.end < policy->l_extent.end))
                        continue;

                if (unlikely(match == LCK_GROUP) &&
                    lock->l_resource->lr_type == LDLM_EXTENT &&
                    lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                        continue;

                /* We match if we have existing lock with same or wider set
                   of bits. */
                if (lock->l_resource->lr_type == LDLM_IBITS &&
                     ((lock->l_policy_data.l_inodebits.bits &
                      policy->l_inodebits.bits) !=
                      policy->l_inodebits.bits))
                        continue;

                if (!unref &&
                    (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
                     lock->l_failed))
                        continue;

                if ((flags & LDLM_FL_LOCAL_ONLY) &&
                    !(lock->l_flags & LDLM_FL_LOCAL))
                        continue;

                if (flags & LDLM_FL_TEST_LOCK) {
                        LDLM_LOCK_GET(lock);
                        ldlm_lock_touch_in_lru(lock);
                } else {
                        ldlm_lock_addref_internal_nolock(lock, match);
                }
                *mode = match;
                return lock;
        }

        return NULL;
}

void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
{
        if (!lock->l_failed) {
                lock->l_failed = 1;
                cfs_waitq_broadcast(&lock->l_waitq);
        }
}
EXPORT_SYMBOL(ldlm_lock_fail_match_locked);

void ldlm_lock_fail_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_fail_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_fail_match);

void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
{
        lock->l_flags |= LDLM_FL_LVB_READY;
        cfs_waitq_broadcast(&lock->l_waitq);
}
EXPORT_SYMBOL(ldlm_lock_allow_match_locked);

void ldlm_lock_allow_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_allow_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_allow_match);

/* Can be called in two ways:
 *
 * If 'ns' is NULL, then lockh describes an existing lock that we want to look
 * for a duplicate of.
 *
 * Otherwise, all of the fields must be filled in, to match against.
 *
 * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
 *     server (ie, connh is NULL)
 * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
 *     list will be considered
 * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
 *     to be canceled can still be matched as long as they still have reader
 *     or writer references
 * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
 *     just tell us if we would have matched.
 *
 * Returns the matched mode if it finds an already-existing lock that is
 * compatible; in this case, lockh is filled in with an addref()ed lock.
 *
 * We also check the security context; if that fails we simply return 0 (to
 * keep caller code unchanged), and the context failure will be discovered
 * by the caller sometime later.
 */
ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, __u64 flags,
                            const struct ldlm_res_id *res_id, ldlm_type_t type,
                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
                            struct lustre_handle *lockh, int unref)
{
        struct ldlm_resource *res;
        struct ldlm_lock *lock, *old_lock = NULL;
        int rc = 0;
        ENTRY;

        if (ns == NULL) {
                old_lock = ldlm_handle2lock(lockh);
                LASSERT(old_lock);

                ns = ldlm_lock_to_ns(old_lock);
                res_id = &old_lock->l_resource->lr_name;
                type = old_lock->l_resource->lr_type;
                mode = old_lock->l_req_mode;
        }

        res = ldlm_resource_get(ns, NULL, res_id, type, 0);
        if (res == NULL) {
                LASSERT(old_lock == NULL);
                RETURN(0);
        }

        LDLM_RESOURCE_ADDREF(res);
        lock_res(res);

        lock = search_queue(&res->lr_granted, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        if (flags & LDLM_FL_BLOCK_GRANTED)
                GOTO(out, rc = 0);
        lock = search_queue(&res->lr_converting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        lock = search_queue(&res->lr_waiting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);

        EXIT;
 out:
        unlock_res(res);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);

        if (lock) {
                ldlm_lock2handle(lock, lockh);
                if ((flags & LDLM_FL_LVB_READY) &&
                    (!(lock->l_flags & LDLM_FL_LVB_READY))) {
                        struct l_wait_info lwi;
                        if (lock->l_completion_ast) {
                                int err = lock->l_completion_ast(lock,
                                                          LDLM_FL_WAIT_NOREPROC,
                                                                 NULL);
                                if (err) {
                                        if (flags & LDLM_FL_TEST_LOCK)
                                                LDLM_LOCK_RELEASE(lock);
                                        else
                                                ldlm_lock_decref_internal(lock,
                                                                          mode);
                                        rc = 0;
                                        goto out2;
                                }
                        }

                        lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
                                               NULL, LWI_ON_SIGNAL_NOOP, NULL);

                        /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
                        l_wait_event(lock->l_waitq,
                                     lock->l_flags & LDLM_FL_LVB_READY ||
                                     lock->l_destroyed || lock->l_failed,
                                     &lwi);
                        if (!(lock->l_flags & LDLM_FL_LVB_READY)) {
                                if (flags & LDLM_FL_TEST_LOCK)
                                        LDLM_LOCK_RELEASE(lock);
                                else
                                        ldlm_lock_decref_internal(lock, mode);
                                rc = 0;
                        }
                }
        }
 out2:
        if (rc) {
                LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[2] : policy->l_extent.start,
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[3] : policy->l_extent.end);

                /* check user's security context */
                if (lock->l_conn_export &&
                    sptlrpc_import_check_ctx(
                                class_exp2cliimp(lock->l_conn_export))) {
                        if (!(flags & LDLM_FL_TEST_LOCK))
                                ldlm_lock_decref_internal(lock, mode);
                        rc = 0;
                }

                if (flags & LDLM_FL_TEST_LOCK)
                        LDLM_LOCK_RELEASE(lock);

        } else if (!(flags & LDLM_FL_TEST_LOCK)) {/*less verbose for test-only*/
                LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
                                  LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
                                  type, mode, res_id->name[0], res_id->name[1],
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[2] : policy->l_extent.start,
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[3] : policy->l_extent.end);
        }
        if (old_lock)
                LDLM_LOCK_PUT(old_lock);

        return rc ? mode : 0;
}
EXPORT_SYMBOL(ldlm_lock_match);
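
/*
 * A usage sketch (illustrative, not part of the original file; "ns",
 * "res_id", "start" and "end" are hypothetical): probing the cache for a
 * PR or PW extent lock covering a range before enqueuing a new one.
 *
 *      ldlm_policy_data_t policy = { .l_extent = { .start = start,
 *                                                  .end   = end } };
 *      struct lustre_handle lockh;
 *      ldlm_mode_t mode;
 *
 *      mode = ldlm_lock_match(ns, LDLM_FL_LVB_READY, &res_id, LDLM_EXTENT,
 *                             &policy, LCK_PR | LCK_PW, &lockh, 0);
 *      if (mode != 0) {
 *              ... lock is addref-ed in mode "mode"; drop it later with
 *              ldlm_lock_decref(&lockh, mode) ...
 *      }
 */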

ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh,
                                        __u64 *bits)
{
        struct ldlm_lock *lock;
        ldlm_mode_t mode = 0;
        ENTRY;

        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
                    lock->l_failed)
                        GOTO(out, mode);

                if (lock->l_flags & LDLM_FL_CBPENDING &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        GOTO(out, mode);

                if (bits)
                        *bits = lock->l_policy_data.l_inodebits.bits;
                mode = lock->l_granted_mode;
                ldlm_lock_addref_internal_nolock(lock, mode);
        }

        EXIT;

out:
        if (lock != NULL) {
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return mode;
}
EXPORT_SYMBOL(ldlm_revalidate_lock_handle);

/* Returns a referenced lock */
struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
                                   const struct ldlm_res_id *res_id,
                                   ldlm_type_t type,
                                   ldlm_mode_t mode,
                                   const struct ldlm_callback_suite *cbs,
                                   void *data, __u32 lvb_len)
{
        struct ldlm_lock *lock;
        struct ldlm_resource *res;
        ENTRY;

        res = ldlm_resource_get(ns, NULL, res_id, type, 1);
        if (res == NULL)
                RETURN(NULL);

        lock = ldlm_lock_new(res);

        if (lock == NULL)
                RETURN(NULL);

        lock->l_req_mode = mode;
        lock->l_ast_data = data;
        lock->l_pid = cfs_curproc_pid();
        lock->l_ns_srv = !!ns_is_server(ns);
        if (cbs) {
                lock->l_blocking_ast = cbs->lcs_blocking;
                lock->l_completion_ast = cbs->lcs_completion;
                lock->l_glimpse_ast = cbs->lcs_glimpse;
                lock->l_weigh_ast = cbs->lcs_weigh;
        }

        lock->l_tree_node = NULL;
        /* if this is the extent lock, allocate the interval tree node */
        if (type == LDLM_EXTENT) {
                if (ldlm_interval_alloc(lock) == NULL)
                        GOTO(out, 0);
        }

        if (lvb_len) {
                lock->l_lvb_len = lvb_len;
                OBD_ALLOC(lock->l_lvb_data, lvb_len);
                if (lock->l_lvb_data == NULL)
                        GOTO(out, 0);
        }

        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK))
                GOTO(out, 0);

        RETURN(lock);

out:
        ldlm_lock_destroy(lock);
        LDLM_LOCK_RELEASE(lock);
        return NULL;
}
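
/*
 * A creation sketch (illustrative, not part of the original file; the AST
 * handlers are hypothetical placeholders): wiring a callback suite into a
 * new lock before handing it to ldlm_lock_enqueue().
 *
 *      struct ldlm_callback_suite cbs = {
 *              .lcs_blocking   = my_blocking_ast,    // hypothetical
 *              .lcs_completion = my_completion_ast,  // hypothetical
 *      };
 *      struct ldlm_lock *lock;
 *
 *      lock = ldlm_lock_create(ns, &res_id, LDLM_IBITS, LCK_PR, &cbs,
 *                              NULL, 0);
 *      if (lock == NULL)
 *              ... allocation failed ...
 *      // lock is returned referenced; enqueue it or release it.
 */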

ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                               struct ldlm_lock **lockp,
                               void *cookie, __u64 *flags)
{
        struct ldlm_lock *lock = *lockp;
        struct ldlm_resource *res = lock->l_resource;
        int local = ns_is_client(ldlm_res_to_ns(res));
#ifdef HAVE_SERVER_SUPPORT
        ldlm_processing_policy policy;
#endif
        ldlm_error_t rc = ELDLM_OK;
        struct ldlm_interval *node = NULL;
        ENTRY;

        lock->l_last_activity = cfs_time_current_sec();
        /* policies are not executed on the client or during replay */
        if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
            && !local && ns->ns_policy) {
                rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
                                   NULL);
                if (rc == ELDLM_LOCK_REPLACED) {
                        /* The lock that was returned has already been granted,
                         * and placed into lockp.  If it's not the same as the
                         * one we passed in, then destroy the old one and our
                         * work here is done. */
                        if (lock != *lockp) {
                                ldlm_lock_destroy(lock);
                                LDLM_LOCK_RELEASE(lock);
                        }
                        *flags |= LDLM_FL_LOCK_CHANGED;
                        RETURN(0);
                } else if (rc != ELDLM_OK ||
                           (rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
                        ldlm_lock_destroy(lock);
                        RETURN(rc);
                }
        }

        /* A replayed lock might already be in the granted list, so
         * unlinking it would cause its interval node to be freed; we have
         * to allocate the interval node early, otherwise we can't regrant
         * this lock in the future. - jay */
1433         if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
1434                 OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
1435
1436         lock_res_and_lock(lock);
1437         if (local && lock->l_req_mode == lock->l_granted_mode) {
1438                 /* The server returned a blocked lock, but it was granted
1439                  * before we got a chance to actually enqueue it.  We don't
1440                  * need to do anything else. */
1441                 *flags &= ~(LDLM_FL_BLOCK_GRANTED |
1442                             LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
1443                 GOTO(out, ELDLM_OK);
1444         }
1445
1446         ldlm_resource_unlink_lock(lock);
1447         if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
1448                 if (node == NULL) {
1449                         ldlm_lock_destroy_nolock(lock);
1450                         GOTO(out, rc = -ENOMEM);
1451                 }
1452
1453                 CFS_INIT_LIST_HEAD(&node->li_group);
1454                 ldlm_interval_attach(node, lock);
1455                 node = NULL;
1456         }
1457
1458         /* Some of the enqueue flags must be carried over into the AST,
1459          * via the lock's l_flags. */
1460         lock->l_flags |= *flags & LDLM_AST_DISCARD_DATA;
1461
1462         /* This distinction between local lock trees is very important; a
1463          * client namespace only has information about locks taken by that
1464          * client, and thus doesn't have enough information to decide for
1465          * itself whether a lock can be granted (below).  In this case, we
1466          * do exactly what the server tells us to do, as dictated by 'flags'.
1467          *
1468          * We do exactly the same thing during recovery, when the server is
1469          * more or less trusting the clients not to lie.
1470          *
1471          * FIXME (bug 268): Detect obvious lies by checking compatibility in
1472          * granted/converting queues. */
1473         if (local) {
1474                 if (*flags & LDLM_FL_BLOCK_CONV)
1475                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1476                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
1477                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1478                 else
1479                         ldlm_grant_lock(lock, NULL);
1480                 GOTO(out, ELDLM_OK);
1481 #ifdef HAVE_SERVER_SUPPORT
1482         } else if (*flags & LDLM_FL_REPLAY) {
1483                 if (*flags & LDLM_FL_BLOCK_CONV) {
1484                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1485                         GOTO(out, ELDLM_OK);
1486                 } else if (*flags & LDLM_FL_BLOCK_WAIT) {
1487                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1488                         GOTO(out, ELDLM_OK);
1489                 } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
1490                         ldlm_grant_lock(lock, NULL);
1491                         GOTO(out, ELDLM_OK);
1492                 }
1493                 /* If no flags, fall through to normal enqueue path. */
1494         }
1495
1496         policy = ldlm_processing_policy_table[res->lr_type];
1497         policy(lock, flags, 1, &rc, NULL);
1498         GOTO(out, rc);
1499 #else
1500         } else {
1501                 CERROR("This is client-side-only module, cannot handle "
1502                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
1503                 LBUG();
1504         }
1505 #endif
1506
1507 out:
1508         unlock_res_and_lock(lock);
1509         if (node)
1510                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
1511         return rc;
1512 }
1513
1514 #ifdef HAVE_SERVER_SUPPORT
1515 /* Must be called with the resource lock held; queue is waiting or converting. */
1516 int ldlm_reprocess_queue(struct ldlm_resource *res, cfs_list_t *queue,
1517                          cfs_list_t *work_list)
1518 {
1519         cfs_list_t *tmp, *pos;
1520         ldlm_processing_policy policy;
1521         __u64 flags;
1522         int rc = LDLM_ITER_CONTINUE;
1523         ldlm_error_t err;
1524         ENTRY;
1525
1526         check_res_locked(res);
1527
1528         policy = ldlm_processing_policy_table[res->lr_type];
1529         LASSERT(policy);
1530
1531         cfs_list_for_each_safe(tmp, pos, queue) {
1532                 struct ldlm_lock *pending;
1533                 pending = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
1534
1535                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
1536
1537                 flags = 0;
1538                 rc = policy(pending, &flags, 0, &err, work_list);
1539                 if (rc != LDLM_ITER_CONTINUE)
1540                         break;
1541         }
1542
1543         RETURN(rc);
1544 }
1545 #endif
1546
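/**
 * Producer for blocking ASTs: take the first lock off the ast_work list,
 * build a descriptor from its conflicting (blocking) lock and invoke
 * l_blocking_ast on it.  Returns -ENOENT once the list is empty.
 */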
1547 static int
1548 ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1549 {
1550         struct ldlm_cb_set_arg *arg = opaq;
1551         struct ldlm_lock_desc   d;
1552         int                     rc;
1553         struct ldlm_lock       *lock;
1554         ENTRY;
1555
1556         if (cfs_list_empty(arg->list))
1557                 RETURN(-ENOENT);
1558
1559         lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);
1560
1561         /* nobody should touch l_bl_ast */
1562         lock_res_and_lock(lock);
1563         cfs_list_del_init(&lock->l_bl_ast);
1564
1565         LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
1566         LASSERT(lock->l_bl_ast_run == 0);
1567         LASSERT(lock->l_blocking_lock);
1568         lock->l_bl_ast_run++;
1569         unlock_res_and_lock(lock);
1570
1571         ldlm_lock2desc(lock->l_blocking_lock, &d);
1572
1573         rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
1574         LDLM_LOCK_RELEASE(lock->l_blocking_lock);
1575         lock->l_blocking_lock = NULL;
1576         LDLM_LOCK_RELEASE(lock);
1577
1578         RETURN(rc);
1579 }
1580
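/**
 * Producer for completion ASTs: take the first lock off the ast_work
 * list and run its completion callback.  The callback pointer is saved
 * under the resource lock first, since mds_intent_policy() may change
 * l_completion_ast (bug 14225).  Returns -ENOENT once the list is empty.
 */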
1581 static int
1582 ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1583 {
1584         struct ldlm_cb_set_arg  *arg = opaq;
1585         int                      rc = 0;
1586         struct ldlm_lock        *lock;
1587         ldlm_completion_callback completion_callback;
1588         ENTRY;
1589
1590         if (cfs_list_empty(arg->list))
1591                 RETURN(-ENOENT);
1592
1593         lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
1594
1595         /* It's possible to receive a completion AST before we've set
1596          * the l_completion_ast pointer: either because the AST arrived
1597          * before the reply, or simply because there's a small race
1598          * window between receiving the reply and finishing the local
1599          * enqueue. (bug 842)
1600          *
1601          * This can't happen with the blocking_ast, however, because we
1602          * will never call the local blocking_ast until we drop our
1603          * reader/writer reference, which we won't do until we get the
1604          * reply and finish enqueueing. */
1605
1606         /* nobody should touch l_cp_ast */
1607         lock_res_and_lock(lock);
1608         cfs_list_del_init(&lock->l_cp_ast);
1609         LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
1610         /* save l_completion_ast since it can be changed by
1611          * mds_intent_policy(), see bug 14225 */
1612         completion_callback = lock->l_completion_ast;
1613         lock->l_flags &= ~LDLM_FL_CP_REQD;
1614         unlock_res_and_lock(lock);
1615
1616         if (completion_callback != NULL)
1617                 rc = completion_callback(lock, 0, (void *)arg);
1618         LDLM_LOCK_RELEASE(lock);
1619
1620         RETURN(rc);
1621 }
1622
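/**
 * Producer for revoke ASTs: take the first lock off the ast_work list
 * and send a blocking AST whose descriptor pretends the lock is held in
 * EX mode, presumably to make the holder give it up.
 */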
1623 static int
1624 ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1625 {
1626         struct ldlm_cb_set_arg *arg = opaq;
1627         struct ldlm_lock_desc   desc;
1628         int                     rc;
1629         struct ldlm_lock       *lock;
1630         ENTRY;
1631
1632         if (cfs_list_empty(arg->list))
1633                 RETURN(-ENOENT);
1634
1635         lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_rk_ast);
1636         cfs_list_del_init(&lock->l_rk_ast);
1637
1638         /* the descriptor just pretends the lock is exclusive */
1639         ldlm_lock2desc(lock, &desc);
1640         desc.l_req_mode = LCK_EX;
1641         desc.l_granted_mode = 0;
1642
1643         rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
1644         LDLM_LOCK_RELEASE(lock);
1645
1646         RETURN(rc);
1647 }
1648
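/**
 * Producer for glimpse ASTs: take one glimpse work item off the
 * ast_work list, pass its descriptor along through the cb_set_arg and
 * invoke the lock's glimpse callback.  Returns 1 if the callback
 * returned 0, and frees the work item unless LDLM_GL_WORK_NOFREE is
 * set.
 */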
1649 int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1650 {
1651         struct ldlm_cb_set_arg          *arg = opaq;
1652         struct ldlm_glimpse_work        *gl_work;
1653         struct ldlm_lock                *lock;
1654         int                              rc = 0;
1655         ENTRY;
1656
1657         if (cfs_list_empty(arg->list))
1658                 RETURN(-ENOENT);
1659
1660         gl_work = cfs_list_entry(arg->list->next, struct ldlm_glimpse_work,
1661                                  gl_list);
1662         cfs_list_del_init(&gl_work->gl_list);
1663
1664         lock = gl_work->gl_lock;
1665
1666         /* transfer the glimpse descriptor to ldlm_cb_set_arg */
1667         arg->gl_desc = gl_work->gl_desc;
1668
1669         /* invoke the actual glimpse callback */
1670         if (lock->l_glimpse_ast(lock, (void*)arg) == 0)
1671                 rc = 1;
1672
1673         LDLM_LOCK_RELEASE(lock);
1674
1675         if ((gl_work->gl_flags & LDLM_GL_WORK_NOFREE) == 0)
1676                 OBD_FREE_PTR(gl_work);
1677
1678         RETURN(rc);
1679 }
1680
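/**
 * Send the ASTs pending on \a rpc_list; \a ast_type selects which kind
 * of AST (blocking, completion, revoke or glimpse) and thus which
 * ldlm_work_*_ast_lock producer feeds the request set.  Returns
 * -ERESTART if any callback asked for a restart, 0 otherwise.
 */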
1681 int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
1682                       ldlm_desc_ast_t ast_type)
1683 {
1684         struct ldlm_cb_set_arg *arg;
1685         set_producer_func       work_ast_lock;
1686         int                     rc;
1687         ENTRY;
1688         if (cfs_list_empty(rpc_list))
1689                 RETURN(0);
1690
1691         OBD_ALLOC_PTR(arg);
1692         if (arg == NULL)
1693                 RETURN(-ENOMEM);
1694
1695         cfs_atomic_set(&arg->restart, 0);
1696         arg->list = rpc_list;
1697
1698         switch (ast_type) {
1699                 case LDLM_WORK_BL_AST:
1700                         arg->type = LDLM_BL_CALLBACK;
1701                         work_ast_lock = ldlm_work_bl_ast_lock;
1702                         break;
1703                 case LDLM_WORK_CP_AST:
1704                         arg->type = LDLM_CP_CALLBACK;
1705                         work_ast_lock = ldlm_work_cp_ast_lock;
1706                         break;
1707                 case LDLM_WORK_REVOKE_AST:
1708                         arg->type = LDLM_BL_CALLBACK;
1709                         work_ast_lock = ldlm_work_revoke_ast_lock;
1710                         break;
1711                 case LDLM_WORK_GL_AST:
1712                         arg->type = LDLM_GL_CALLBACK;
1713                         work_ast_lock = ldlm_work_gl_ast_lock;
1714                         break;
1715                 default:
1716                         LBUG();
1717         }
1718
1719         /* We create a ptlrpc request set with the flow control extension.
1720          * This request set uses the work_ast_lock function to produce new
1721          * requests and sends a new request each time one completes, keeping
1722          * the number of requests in flight capped at ns_max_parallel_ast. */
1723         arg->set = ptlrpc_prep_fcset(ns->ns_max_parallel_ast ? : UINT_MAX,
1724                                      work_ast_lock, arg);
1725         if (arg->set == NULL)
1726                 GOTO(out, rc = -ENOMEM);
1727
1728         ptlrpc_set_wait(arg->set);
1729         ptlrpc_set_destroy(arg->set);
1730
1731         rc = cfs_atomic_read(&arg->restart) ? -ERESTART : 0;
1732         GOTO(out, rc);
1733 out:
1734         OBD_FREE_PTR(arg);
1735         return rc;
1736 }
1737
1738 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
1739 {
1740         ldlm_reprocess_all(res);
1741         return LDLM_ITER_CONTINUE;
1742 }
1743
1744 static int ldlm_reprocess_res(cfs_hash_t *hs, cfs_hash_bd_t *bd,
1745                               cfs_hlist_node_t *hnode, void *arg)
1746 {
1747         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
1748         int    rc;
1749
1750         rc = reprocess_one_queue(res, arg);
1751
1752         return rc == LDLM_ITER_STOP;
1753 }
1754
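/**
 * Iterate over all resources in the namespace and try to grant the
 * blocked locks on each of them.
 */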
1755 void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
1756 {
1757         ENTRY;
1758
1759         if (ns != NULL) {
1760                 cfs_hash_for_each_nolock(ns->ns_rs_hash,
1761                                          ldlm_reprocess_res, NULL);
1762         }
1763         EXIT;
1764 }
1765 EXPORT_SYMBOL(ldlm_reprocess_all_ns);
1766
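/**
 * Try to grant the locks blocked on \a res: reprocess the converting
 * queue, then the waiting one, and send the resulting completion ASTs
 * once the resource lock is dropped.  Restarts from scratch if the AST
 * work returns -ERESTART.  Server-side only; client namespaces return
 * immediately.
 */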
1767 void ldlm_reprocess_all(struct ldlm_resource *res)
1768 {
1769         CFS_LIST_HEAD(rpc_list);
1770
1771 #ifdef HAVE_SERVER_SUPPORT
1772         int rc;
1773         ENTRY;
1774         /* Local lock trees don't get reprocessed. */
1775         if (ns_is_client(ldlm_res_to_ns(res))) {
1776                 EXIT;
1777                 return;
1778         }
1779
1780 restart:
1781         lock_res(res);
1782         rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
1783         if (rc == LDLM_ITER_CONTINUE)
1784                 ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
1785         unlock_res(res);
1786
1787         rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
1788                                LDLM_WORK_CP_AST);
1789         if (rc == -ERESTART) {
1790                 LASSERT(cfs_list_empty(&rpc_list));
1791                 goto restart;
1792         }
1793 #else
1794         ENTRY;
1795         if (!ns_is_client(ldlm_res_to_ns(res))) {
1796                 CERROR("This is client-side-only module, cannot handle "
1797                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
1798                 LBUG();
1799         }
1800 #endif
1801         EXIT;
1802 }
1803
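/**
 * Run the lock's blocking AST with LDLM_CB_CANCELING, at most once per
 * lock (guarded by LDLM_FL_CANCEL).  The resource lock is dropped while
 * the callback runs and re-taken afterwards; LDLM_FL_BL_DONE marks the
 * lock once this has happened.
 */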
1804 void ldlm_cancel_callback(struct ldlm_lock *lock)
1805 {
1806         check_res_locked(lock->l_resource);
1807         if (!(lock->l_flags & LDLM_FL_CANCEL)) {
1808                 lock->l_flags |= LDLM_FL_CANCEL;
1809                 if (lock->l_blocking_ast) {
1810                         // l_check_no_ns_lock(ns);
1811                         unlock_res_and_lock(lock);
1812                         lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
1813                                              LDLM_CB_CANCELING);
1814                         lock_res_and_lock(lock);
1815                 } else {
1816                         LDLM_DEBUG(lock, "no blocking ast");
1817                 }
1818         }
1819         lock->l_flags |= LDLM_FL_BL_DONE;
1820 }
1821
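/**
 * Remove \a req from the per-mode and per-policy skiplists that group
 * PLAIN and IBITS locks; a no-op for other lock types.
 */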
1822 void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
1823 {
1824         if (req->l_resource->lr_type != LDLM_PLAIN &&
1825             req->l_resource->lr_type != LDLM_IBITS)
1826                 return;
1827
1828         cfs_list_del_init(&req->l_sl_policy);
1829         cfs_list_del_init(&req->l_sl_mode);
1830 }
1831
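/**
 * Cancel a lock that holds no more reader/writer references: run the
 * cancel callback, unlink the lock from its resource and destroy it.
 * A granted lock is also removed from the pool, and l_granted_mode is
 * zeroed so that we cannot be called twice for the same lock.
 */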
1832 void ldlm_lock_cancel(struct ldlm_lock *lock)
1833 {
1834         struct ldlm_resource *res;
1835         struct ldlm_namespace *ns;
1836         ENTRY;
1837
1838         lock_res_and_lock(lock);
1839
1840         res = lock->l_resource;
1841         ns  = ldlm_res_to_ns(res);
1842
1843         /* Please do not, no matter how tempting, remove this LBUG without
1844          * talking to me first. -phik */
1845         if (lock->l_readers || lock->l_writers) {
1846                 LDLM_ERROR(lock, "lock still has references");
1847                 LBUG();
1848         }
1849
1850         if (lock->l_waited)
1851                 ldlm_del_waiting_lock(lock);
1852
1853         /* Run the cancel callback (drops and re-takes the res lock). */
1854         ldlm_cancel_callback(lock);
1855
1856         /* Yes, check a second time, in case the lock was added back to the
1857          * waiting list while ldlm_cancel_callback ran without the res lock. */
1858         if (lock->l_waited)
1859                 ldlm_del_waiting_lock(lock);
1860
1861         ldlm_resource_unlink_lock(lock);
1862         ldlm_lock_destroy_nolock(lock);
1863
1864         if (lock->l_granted_mode == lock->l_req_mode)
1865                 ldlm_pool_del(&ns->ns_pool, lock);
1866
1867         /* Zero out lock->l_granted_mode to make sure we are not called
1868          * again for the same lock. */
1869         lock->l_granted_mode = LCK_MINMODE;
1870         unlock_res_and_lock(lock);
1871
1872         EXIT;
1873 }
1874 EXPORT_SYMBOL(ldlm_lock_cancel);
1875
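/**
 * Attach the opaque \a data to the lock as l_ast_data, but only if no
 * different data was attached before.
 *
 * \retval 0       l_ast_data was unset, or already equal to \a data
 * \retval -EINVAL stale handle, or l_ast_data holds something else
 *
 * A minimal usage sketch (the caller and "my_object" are hypothetical,
 * not from this file):
 *
 *	if (ldlm_lock_set_data(&lockh, my_object) != 0)
 *		CERROR("lock already bound to another object\n");
 */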
1876 int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
1877 {
1878         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1879         int rc = -EINVAL;
1880         ENTRY;
1881
1882         if (lock) {
1883                 if (lock->l_ast_data == NULL)
1884                         lock->l_ast_data = data;
1885                 if (lock->l_ast_data == data)
1886                         rc = 0;
1887                 LDLM_LOCK_PUT(lock);
1888         }
1889         RETURN(rc);
1890 }
1891 EXPORT_SYMBOL(ldlm_lock_set_data);
1892
1893 struct export_cl_data {
1894         struct obd_export       *ecl_exp;
1895         int                     ecl_loop;
1896 };
1897
1898 int ldlm_cancel_locks_for_export_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
1899                                     cfs_hlist_node_t *hnode, void *data)
1901 {
1902         struct export_cl_data   *ecl = (struct export_cl_data *)data;
1903         struct obd_export       *exp  = ecl->ecl_exp;
1904         struct ldlm_lock        *lock = cfs_hash_object(hs, hnode);
1905         struct ldlm_resource    *res;
1906
1907         res = ldlm_resource_getref(lock->l_resource);
1908         LDLM_LOCK_GET(lock);
1909
1910         LDLM_DEBUG(lock, "export %p", exp);
1911         ldlm_res_lvbo_update(res, NULL, 1);
1912         ldlm_lock_cancel(lock);
1913         ldlm_reprocess_all(res);
1914         ldlm_resource_putref(res);
1915         LDLM_LOCK_RELEASE(lock);
1916
1917         ecl->ecl_loop++;
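        /* (loop & -loop) == loop only when ecl_loop is a power of two,
         * i.e. progress is logged with exponential backoff. */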
1918         if ((ecl->ecl_loop & -ecl->ecl_loop) == ecl->ecl_loop) {
1919                 CDEBUG(D_INFO,
1920                        "Cancel lock %p for export %p (loop %d), still have "
1921                        "%d locks left on hash table.\n",
1922                        lock, exp, ecl->ecl_loop,
1923                        cfs_atomic_read(&hs->hs_count));
1924         }
1925
1926         return 0;
1927 }
1928
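/**
 * Cancel (and reprocess the resources of) every lock in \a exp's lock
 * hash until the hash is empty.
 */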
1929 void ldlm_cancel_locks_for_export(struct obd_export *exp)
1930 {
1931         struct export_cl_data   ecl = {
1932                 .ecl_exp        = exp,
1933                 .ecl_loop       = 0,
1934         };
1935
1936         cfs_hash_for_each_empty(exp->exp_lock_hash,
1937                                 ldlm_cancel_locks_for_export_cb, &ecl);
1938 }
1939
1940 /**
1941  * Downgrade an exclusive lock.
1942  *
1943  * A fast variant of ldlm_lock_convert for conversion of exclusive
1944  * locks. The conversion is always successful.
1945  *
1946  * \param lock the lock to downgrade
1947  * \param new_mode the new lock mode (only LCK_COS is allowed, per the
1948  *                 assertion below)
1948  */
1949 void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode)
1950 {
1951         ENTRY;
1952
1953         LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
1954         LASSERT(new_mode == LCK_COS);
1955
1956         lock_res_and_lock(lock);
1957         ldlm_resource_unlink_lock(lock);
1958         /*
1959          * Remove the lock from pool as it will be added again in
1960          * ldlm_grant_lock() called below.
1961          */
1962         ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
1963
1964         lock->l_req_mode = new_mode;
1965         ldlm_grant_lock(lock, NULL);
1966         unlock_res_and_lock(lock);
1967         ldlm_reprocess_all(lock->l_resource);
1968
1969         EXIT;
1970 }
1971 EXPORT_SYMBOL(ldlm_lock_downgrade);
1972
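/**
 * Convert \a lock to \a new_mode; only LCK_PR -> LCK_PW is expected
 * here, per the assertion below.  The lock is unlinked, dropped from
 * the pool and fed back through the grant path in the new mode.
 *
 * \retval the lock's resource if the conversion went through (or was a
 *         no-op), with the blocking state reflected in \a flags
 * \retval NULL if the interval node could not be allocated (callers
 *         apparently map this to EDEADLOCK) or if the server-side
 *         policy stopped the conversion
 */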
1973 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
1974                                         __u32 *flags)
1975 {
1976         CFS_LIST_HEAD(rpc_list);
1977         struct ldlm_resource *res;
1978         struct ldlm_namespace *ns;
1979         int granted = 0;
1980 #ifdef HAVE_SERVER_SUPPORT
1981         int old_mode;
1982         struct sl_insert_point prev;
1983 #endif
1984         struct ldlm_interval *node;
1985         ENTRY;
1986
1987         if (new_mode == lock->l_granted_mode) { // No changes? Just return.
1988                 *flags |= LDLM_FL_BLOCK_GRANTED;
1989                 RETURN(lock->l_resource);
1990         }
1991
1992         /* I can't check the type of lock here because the lock's bitlock
1993          * is not held, so do the allocation blindly. -jay */
1994         OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
1995         if (node == NULL)  /* Actually, this causes EDEADLOCK to be returned */
1996                 RETURN(NULL);
1997
1998         LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR),
1999                  "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);
2000
2001         lock_res_and_lock(lock);
2002
2003         res = lock->l_resource;
2004         ns  = ldlm_res_to_ns(res);
2005
2006 #ifdef HAVE_SERVER_SUPPORT
2007         old_mode = lock->l_req_mode;
2008 #endif
2009         lock->l_req_mode = new_mode;
2010         if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
2011 #ifdef HAVE_SERVER_SUPPORT
2012                 /* Remember the position at which the lock may be added
2013                  * back to the granted list later, and also remember the
2014                  * join mode for skiplist fixing. */
2015                 prev.res_link = lock->l_res_link.prev;
2016                 prev.mode_link = lock->l_sl_mode.prev;
2017                 prev.policy_link = lock->l_sl_policy.prev;
2018 #endif
2019                 ldlm_resource_unlink_lock(lock);
2020         } else {
2021                 ldlm_resource_unlink_lock(lock);
2022                 if (res->lr_type == LDLM_EXTENT) {
2023                         /* FIXME: ugly code; we have to attach the lock to an
2024                          * interval node again since it may be granted
2025                          * soon. */
2026                         CFS_INIT_LIST_HEAD(&node->li_group);
2027                         ldlm_interval_attach(node, lock);
2028                         node = NULL;
2029                 }
2030         }
2031
2032         /*
2033          * Remove old lock from the pool before adding the lock with new
2034          * mode below in ->policy()
2035          */
2036         ldlm_pool_del(&ns->ns_pool, lock);
2037
2038         /* If this is a local resource, put it on the appropriate list. */
2039         if (ns_is_client(ldlm_res_to_ns(res))) {
2040                 if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
2041                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
2042                 } else {
2043                         /* This should never happen, because of the way the
2044                          * server handles conversions. */
2045                         LDLM_ERROR(lock, "Erroneous flags %x on local lock\n",
2046                                    *flags);
2047                         LBUG();
2048
2049                         ldlm_grant_lock(lock, &rpc_list);
2050                         granted = 1;
2051                         /* FIXME: completion handling must not run with lr_lock held! */
2052                         if (lock->l_completion_ast)
2053                                 lock->l_completion_ast(lock, 0, NULL);
2054                 }
2055 #ifdef HAVE_SERVER_SUPPORT
2056         } else {
2057                 int rc;
2058                 ldlm_error_t err;
2059                 __u64 pflags = 0;
2060                 ldlm_processing_policy policy;
2061                 policy = ldlm_processing_policy_table[res->lr_type];
2062                 rc = policy(lock, &pflags, 0, &err, &rpc_list);
2063                 if (rc == LDLM_ITER_STOP) {
2064                         lock->l_req_mode = old_mode;
2065                         if (res->lr_type == LDLM_EXTENT)
2066                                 ldlm_extent_add_lock(res, lock);
2067                         else
2068                                 ldlm_granted_list_add_lock(lock, &prev);
2069
2070                         res = NULL;
2071                 } else {
2072                         *flags |= LDLM_FL_BLOCK_GRANTED;
2073                         granted = 1;
2074                 }
2075         }
2076 #else
2077         } else {
2078                 CERROR("This is client-side-only module, cannot handle "
2079                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
2080                 LBUG();
2081         }
2082 #endif
2083         unlock_res_and_lock(lock);
2084
2085         if (granted)
2086                 ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
2087         if (node)
2088                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
2089         RETURN(res);
2090 }
2091 EXPORT_SYMBOL(ldlm_lock_convert);
2092
2093 void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
2094 {
2095         struct ldlm_lock *lock;
2096
2097         if (!((libcfs_debug | D_ERROR) & level))
2098                 return;
2099
2100         lock = ldlm_handle2lock(lockh);
2101         if (lock == NULL)
2102                 return;
2103
2104         LDLM_DEBUG_LIMIT(level, lock, "###");
2105
2106         LDLM_LOCK_PUT(lock);
2107 }
2108 EXPORT_SYMBOL(ldlm_lock_dump_handle);
2109
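/**
 * Print lock \a lock with the message formatted from \a fmt prepended;
 * the backend of the LDLM_DEBUG()-style macros.  The exact format
 * depends on the resource type (extent, flock, inodebits or plain).
 */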
2110 void _ldlm_lock_debug(struct ldlm_lock *lock,
2111                       struct libcfs_debug_msg_data *msgdata,
2112                       const char *fmt, ...)
2113 {
2114         va_list args;
2115         struct obd_export *exp = lock->l_export;
2116         struct ldlm_resource *resource = lock->l_resource;
2117         char *nid = "local";
2118
2119         va_start(args, fmt);
2120
2121         if (exp && exp->exp_connection) {
2122                 nid = libcfs_nid2str(exp->exp_connection->c_peer.nid);
2123         } else if (exp && exp->exp_obd != NULL) {
2124                 struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
2125                 nid = libcfs_nid2str(imp->imp_connection->c_peer.nid);
2126         }
2127
2128         if (resource == NULL) {
2129                 libcfs_debug_vmsg2(msgdata, fmt, args,
2130                        " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2131                        "res: \?\? rrc=\?\? type: \?\?\? flags: "LPX64" nid: %s "
2132                        "remote: "LPX64" expref: %d pid: %u timeout: %lu\n",
2133                        lock,
2134                        lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
2135                        lock->l_readers, lock->l_writers,
2136                        ldlm_lockname[lock->l_granted_mode],
2137                        ldlm_lockname[lock->l_req_mode],
2138                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2139                        exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
2140                        lock->l_pid, lock->l_callback_timeout);
2141                 va_end(args);
2142                 return;
2143         }
2144
2145         switch (resource->lr_type) {
2146         case LDLM_EXTENT:
2147                 libcfs_debug_vmsg2(msgdata, fmt, args,
2148                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2149                        "res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64
2150                        "] (req "LPU64"->"LPU64") flags: "LPX64" nid: %s remote:"
2151                        " "LPX64" expref: %d pid: %u timeout %lu\n",
2152                        ldlm_lock_to_ns_name(lock), lock,
2153                        lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
2154                        lock->l_readers, lock->l_writers,
2155                        ldlm_lockname[lock->l_granted_mode],
2156                        ldlm_lockname[lock->l_req_mode],
2157                        resource->lr_name.name[0],
2158                        resource->lr_name.name[1],
2159                        cfs_atomic_read(&resource->lr_refcount),
2160                        ldlm_typename[resource->lr_type],
2161                        lock->l_policy_data.l_extent.start,
2162                        lock->l_policy_data.l_extent.end,
2163                        lock->l_req_extent.start, lock->l_req_extent.end,
2164                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2165                        exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
2166                        lock->l_pid, lock->l_callback_timeout);
2167                 break;
2168
2169         case LDLM_FLOCK:
2170                 libcfs_debug_vmsg2(msgdata, fmt, args,
2171                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2172                        "res: "LPU64"/"LPU64" rrc: %d type: %s pid: %d "
2173                        "["LPU64"->"LPU64"] flags: "LPX64" nid: %s remote: "LPX64
2174                        " expref: %d pid: %u timeout: %lu\n",
2175                        ldlm_lock_to_ns_name(lock), lock,
2176                        lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
2177                        lock->l_readers, lock->l_writers,
2178                        ldlm_lockname[lock->l_granted_mode],
2179                        ldlm_lockname[lock->l_req_mode],
2180                        resource->lr_name.name[0],
2181                        resource->lr_name.name[1],
2182                        cfs_atomic_read(&resource->lr_refcount),
2183                        ldlm_typename[resource->lr_type],
2184                        lock->l_policy_data.l_flock.pid,
2185                        lock->l_policy_data.l_flock.start,
2186                        lock->l_policy_data.l_flock.end,
2187                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2188                        exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
2189                        lock->l_pid, lock->l_callback_timeout);
2190                 break;
2191
2192         case LDLM_IBITS:
2193                 libcfs_debug_vmsg2(msgdata, fmt, args,
2194                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2195                        "res: "LPU64"/"LPU64" bits "LPX64" rrc: %d type: %s "
2196                        "flags: "LPX64" nid: %s remote: "LPX64" expref: %d "
2197                        "pid: %u timeout: %lu\n",
2198                        ldlm_lock_to_ns_name(lock),
2199                        lock, lock->l_handle.h_cookie,
2200                        cfs_atomic_read(&lock->l_refc),
2201                        lock->l_readers, lock->l_writers,
2202                        ldlm_lockname[lock->l_granted_mode],
2203                        ldlm_lockname[lock->l_req_mode],
2204                        resource->lr_name.name[0],
2205                        resource->lr_name.name[1],
2206                        lock->l_policy_data.l_inodebits.bits,
2207                        cfs_atomic_read(&resource->lr_refcount),
2208                        ldlm_typename[resource->lr_type],
2209                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2210                        exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
2211                        lock->l_pid, lock->l_callback_timeout);
2212                 break;
2213
2214         default:
2215                 libcfs_debug_vmsg2(msgdata, fmt, args,
2216                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2217                        "res: "LPU64"/"LPU64" rrc: %d type: %s flags: "LPX64" "
2218                        "nid: %s remote: "LPX64" expref: %d pid: %u timeout %lu"
2219                        "\n",
2220                        ldlm_lock_to_ns_name(lock),
2221                        lock, lock->l_handle.h_cookie,
2222                        cfs_atomic_read(&lock->l_refc),
2223                        lock->l_readers, lock->l_writers,
2224                        ldlm_lockname[lock->l_granted_mode],
2225                        ldlm_lockname[lock->l_req_mode],
2226                        resource->lr_name.name[0],
2227                        resource->lr_name.name[1],
2228                        cfs_atomic_read(&resource->lr_refcount),
2229                        ldlm_typename[resource->lr_type],
2230                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2231                        exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
2232                        lock->l_pid, lock->l_callback_timeout);
2233                 break;
2234         }
2235         va_end(args);
2236 }
2237 EXPORT_SYMBOL(_ldlm_lock_debug);