Whamcloud - gitweb
LU-1428 ldlm: fix a race in ldlm_lock_destroy_internal
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ldlm/ldlm_lock.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_LDLM
43
44 #ifdef __KERNEL__
45 # include <libcfs/libcfs.h>
46 # include <linux/lustre_intent.h>
47 #else
48 # include <liblustre.h>
49 #endif
50
51 #include <obd_class.h>
52 #include "ldlm_internal.h"
53
54 /* lock types */
55 char *ldlm_lockname[] = {
56         [0] "--",
57         [LCK_EX] "EX",
58         [LCK_PW] "PW",
59         [LCK_PR] "PR",
60         [LCK_CW] "CW",
61         [LCK_CR] "CR",
62         [LCK_NL] "NL",
63         [LCK_GROUP] "GROUP",
64         [LCK_COS] "COS"
65 };
66
67 char *ldlm_typename[] = {
68         [LDLM_PLAIN] "PLN",
69         [LDLM_EXTENT] "EXT",
70         [LDLM_FLOCK] "FLK",
71         [LDLM_IBITS] "IBT",
72 };
73
74 static ldlm_policy_wire_to_local_t ldlm_policy_wire18_to_local[] = {
75         [LDLM_PLAIN - LDLM_MIN_TYPE] ldlm_plain_policy_wire_to_local,
76         [LDLM_EXTENT - LDLM_MIN_TYPE] ldlm_extent_policy_wire_to_local,
77         [LDLM_FLOCK - LDLM_MIN_TYPE] ldlm_flock_policy_wire18_to_local,
78         [LDLM_IBITS - LDLM_MIN_TYPE] ldlm_ibits_policy_wire_to_local,
79 };
80
81 static ldlm_policy_wire_to_local_t ldlm_policy_wire21_to_local[] = {
82         [LDLM_PLAIN - LDLM_MIN_TYPE] ldlm_plain_policy_wire_to_local,
83         [LDLM_EXTENT - LDLM_MIN_TYPE] ldlm_extent_policy_wire_to_local,
84         [LDLM_FLOCK - LDLM_MIN_TYPE] ldlm_flock_policy_wire21_to_local,
85         [LDLM_IBITS - LDLM_MIN_TYPE] ldlm_ibits_policy_wire_to_local,
86 };
87
88 static ldlm_policy_local_to_wire_t ldlm_policy_local_to_wire[] = {
89         [LDLM_PLAIN - LDLM_MIN_TYPE] ldlm_plain_policy_local_to_wire,
90         [LDLM_EXTENT - LDLM_MIN_TYPE] ldlm_extent_policy_local_to_wire,
91         [LDLM_FLOCK - LDLM_MIN_TYPE] ldlm_flock_policy_local_to_wire,
92         [LDLM_IBITS - LDLM_MIN_TYPE] ldlm_ibits_policy_local_to_wire,
93 };
94
95 /**
96  * Converts lock policy from local format to on the wire lock_desc format
97  */
98 void ldlm_convert_policy_to_wire(ldlm_type_t type,
99                                  const ldlm_policy_data_t *lpolicy,
100                                  ldlm_wire_policy_data_t *wpolicy)
101 {
102         ldlm_policy_local_to_wire_t convert;
103
104         convert = ldlm_policy_local_to_wire[type - LDLM_MIN_TYPE];
105
106         convert(lpolicy, wpolicy);
107 }
108
109 /**
110  * Converts lock policy from on the wire lock_desc format to local format
111  */
112 void ldlm_convert_policy_to_local(struct obd_export *exp, ldlm_type_t type,
113                                   const ldlm_wire_policy_data_t *wpolicy,
114                                   ldlm_policy_data_t *lpolicy)
115 {
116         ldlm_policy_wire_to_local_t convert;
117         int new_client;
118
119         /** some badnes for 2.0.0 clients, but 2.0.0 isn't supported */
120         new_client = (exp->exp_connect_flags & OBD_CONNECT_FULL20) != 0;
121         if (new_client)
122                convert = ldlm_policy_wire21_to_local[type - LDLM_MIN_TYPE];
123         else
124                convert = ldlm_policy_wire18_to_local[type - LDLM_MIN_TYPE];
125
126         convert(wpolicy, lpolicy);
127 }
128
129 char *ldlm_it2str(int it)
130 {
131         switch (it) {
132         case IT_OPEN:
133                 return "open";
134         case IT_CREAT:
135                 return "creat";
136         case (IT_OPEN | IT_CREAT):
137                 return "open|creat";
138         case IT_READDIR:
139                 return "readdir";
140         case IT_GETATTR:
141                 return "getattr";
142         case IT_LOOKUP:
143                 return "lookup";
144         case IT_UNLINK:
145                 return "unlink";
146         case IT_GETXATTR:
147                 return "getxattr";
148         case IT_LAYOUT:
149                 return "layout";
150         default:
151                 CERROR("Unknown intent %d\n", it);
152                 return "UNKNOWN";
153         }
154 }
155
156 extern cfs_mem_cache_t *ldlm_lock_slab;
157
#ifdef HAVE_SERVER_SUPPORT
/* Server-side grant/conflict processing handlers, indexed by lock type.
 * The flock handler exists only in kernel builds, so its slot is left
 * NULL in userspace.  C99 designated initializers replace the obsolete
 * GNU "[idx] val" form. */
static ldlm_processing_policy ldlm_processing_policy_table[] = {
        [LDLM_PLAIN]  = ldlm_process_plain_lock,
        [LDLM_EXTENT] = ldlm_process_extent_lock,
# ifdef __KERNEL__
        [LDLM_FLOCK]  = ldlm_process_flock_lock,
# endif
        [LDLM_IBITS]  = ldlm_process_inodebits_lock,
};

/* Return the processing handler matching the resource's lock type. */
ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
{
        return ldlm_processing_policy_table[res->lr_type];
}
#endif /* HAVE_SERVER_SUPPORT */
173
/* Register an intent-handling policy callback for namespace \a ns. */
void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
{
        ns->ns_policy = arg;
}
178
179 /*
180  * REFCOUNTED LOCK OBJECTS
181  */
182
183
/*
 * Lock refcounts, during creation:
 *   - one special one for allocation, dec'd only once in destroy
 *   - one for being a lock that's in-use
 *   - one for the addref associated with a new lock
 */

/* Take an additional reference on \a lock and return it (for call
 * chaining).  The final reference is dropped in ldlm_lock_put(). */
struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
{
        cfs_atomic_inc(&lock->l_refc);
        return lock;
}
195
/* Drop a reference on \a lock; on the last put, free the lock.  By the
 * time the refcount reaches zero the lock must already have been
 * destroyed (l_destroyed set, off all lists) — see
 * ldlm_lock_destroy_internal(). */
void ldlm_lock_put(struct ldlm_lock *lock)
{
        ENTRY;

        LASSERT(lock->l_resource != LP_POISON);
        LASSERT(cfs_atomic_read(&lock->l_refc) > 0);
        if (cfs_atomic_dec_and_test(&lock->l_refc)) {
                struct ldlm_resource *res;

                LDLM_DEBUG(lock,
                           "final lock_put on destroyed lock, freeing it.");

                res = lock->l_resource;
                /* Destroy must have run first: hash/LRU/pending unlinked. */
                LASSERT(lock->l_destroyed);
                LASSERT(cfs_list_empty(&lock->l_res_link));
                LASSERT(cfs_list_empty(&lock->l_pending_chain));

                lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
                                     LDLM_NSS_LOCKS);
                /* Release the lock's reference on its resource... */
                lu_ref_del(&res->lr_reference, "lock", lock);
                ldlm_resource_putref(res);
                lock->l_resource = NULL;
                /* ...and on its export, if any. */
                if (lock->l_export) {
                        class_export_lock_put(lock->l_export, lock);
                        lock->l_export = NULL;
                }

                if (lock->l_lvb_data != NULL)
                        OBD_FREE(lock->l_lvb_data, lock->l_lvb_len);

                ldlm_interval_free(ldlm_interval_detach(lock));
                lu_ref_fini(&lock->l_reference);
                /* RCU-deferred free, paired with the handle machinery. */
                OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle);
        }

        EXIT;
}
233
234 int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
235 {
236         int rc = 0;
237         if (!cfs_list_empty(&lock->l_lru)) {
238                 struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
239
240                 LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
241                 cfs_list_del_init(&lock->l_lru);
242                 if (lock->l_flags & LDLM_FL_SKIPPED)
243                         lock->l_flags &= ~LDLM_FL_SKIPPED;
244                 LASSERT(ns->ns_nr_unused > 0);
245                 ns->ns_nr_unused--;
246                 rc = 1;
247         }
248         return rc;
249 }
250
251 int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
252 {
253         struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
254         int rc;
255
256         ENTRY;
257         if (lock->l_ns_srv) {
258                 LASSERT(cfs_list_empty(&lock->l_lru));
259                 RETURN(0);
260         }
261
262         cfs_spin_lock(&ns->ns_lock);
263         rc = ldlm_lock_remove_from_lru_nolock(lock);
264         cfs_spin_unlock(&ns->ns_lock);
265         EXIT;
266         return rc;
267 }
268
269 void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
270 {
271         struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
272
273         lock->l_last_used = cfs_time_current();
274         LASSERT(cfs_list_empty(&lock->l_lru));
275         LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
276         cfs_list_add_tail(&lock->l_lru, &ns->ns_unused_list);
277         LASSERT(ns->ns_nr_unused >= 0);
278         ns->ns_nr_unused++;
279 }
280
/* Locked wrapper: add \a lock to the namespace LRU under ns_lock. */
void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        cfs_spin_lock(&ns->ns_lock);
        ldlm_lock_add_to_lru_nolock(lock);
        cfs_spin_unlock(&ns->ns_lock);
        EXIT;
}
291
292 void ldlm_lock_touch_in_lru(struct ldlm_lock *lock)
293 {
294         struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
295
296         ENTRY;
297         if (lock->l_ns_srv) {
298                 LASSERT(cfs_list_empty(&lock->l_lru));
299                 EXIT;
300                 return;
301         }
302
303         cfs_spin_lock(&ns->ns_lock);
304         if (!cfs_list_empty(&lock->l_lru)) {
305                 ldlm_lock_remove_from_lru_nolock(lock);
306                 ldlm_lock_add_to_lru_nolock(lock);
307         }
308         cfs_spin_unlock(&ns->ns_lock);
309         EXIT;
310 }
311
/* This used to have a 'strict' flag, which recovery would use to mark an
 * in-use lock as needing-to-die.  Lest I am ever tempted to put it back, I
 * shall explain why it's gone: with the new hash table scheme, once you call
 * ldlm_lock_destroy, you can never drop your final references on this lock.
 * Because it's not in the hash table anymore.  -phil */
/* Mark \a lock destroyed and unlink it from the export hash, LRU and
 * handle table.  Caller must hold the resource lock and l_lock (via
 * lock_res_and_lock()), which serializes checks of l_destroyed.
 *
 * Returns 1 when this call performed the destroy (caller must then drop
 * the hash-table reference, see ldlm_lock_destroy()); returns 0 when the
 * lock was already destroyed and nothing was done. */
int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
{
        ENTRY;

        /* Destroying a lock with live reader/writer references is a bug. */
        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                LBUG();
        }

        /* Lock must already be off its resource's granted/waiting lists. */
        if (!cfs_list_empty(&lock->l_res_link)) {
                LDLM_ERROR(lock, "lock still on resource");
                LBUG();
        }

        /* Idempotent: second and later destroys are no-ops. */
        if (lock->l_destroyed) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                EXIT;
                return 0;
        }
        lock->l_destroyed = 1;

        if (lock->l_export && lock->l_export->exp_lock_hash) {
                /* NB: it's safe to call cfs_hash_del() even lock isn't
                 * in exp_lock_hash. */
                cfs_hash_del(lock->l_export->exp_lock_hash,
                             &lock->l_remote_handle, &lock->l_exp_hash);
        }

        ldlm_lock_remove_from_lru(lock);
        /* After unhash, no new references can be obtained via the handle. */
        class_handle_unhash(&lock->l_handle);

#if 0
        /* Wake anyone waiting for this lock */
        /* FIXME: I should probably add yet another flag, instead of using
         * l_export to only call this on clients */
        /* NOTE(review): dead code — the completion-AST branch below can
         * never fire because l_export is NULLed just above it. */
        if (lock->l_export)
                class_export_put(lock->l_export);
        lock->l_export = NULL;
        if (lock->l_export && lock->l_completion_ast)
                lock->l_completion_ast(lock, 0);
#endif
        EXIT;
        return 1;
}
361
362 void ldlm_lock_destroy(struct ldlm_lock *lock)
363 {
364         int first;
365         ENTRY;
366         lock_res_and_lock(lock);
367         first = ldlm_lock_destroy_internal(lock);
368         unlock_res_and_lock(lock);
369
370         /* drop reference from hashtable only for first destroy */
371         if (first) {
372                 lu_ref_del(&lock->l_reference, "hash", lock);
373                 LDLM_LOCK_RELEASE(lock);
374         }
375         EXIT;
376 }
377
378 void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
379 {
380         int first;
381         ENTRY;
382         first = ldlm_lock_destroy_internal(lock);
383         /* drop reference from hashtable only for first destroy */
384         if (first) {
385                 lu_ref_del(&lock->l_reference, "hash", lock);
386                 LDLM_LOCK_RELEASE(lock);
387         }
388         EXIT;
389 }
390
/* this is called by portals_handle2object with the handle lock taken */
/* Handle-table callback: take a reference when a lock is looked up
 * by its handle cookie. */
static void lock_handle_addref(void *lock)
{
        LDLM_LOCK_GET((struct ldlm_lock *)lock);
}
396
/* Handle-table callback: return the lock's memory to the slab once the
 * RCU grace period from OBD_FREE_RCU() has elapsed. */
static void lock_handle_free(void *lock, int size)
{
        LASSERT(size == sizeof(struct ldlm_lock));
        OBD_SLAB_FREE(lock, ldlm_lock_slab, size);
}
402
/* Handle operations registered for ldlm locks via class_handle_hash(). */
struct portals_handle_ops lock_handle_ops = {
        .hop_addref = lock_handle_addref,
        .hop_free   = lock_handle_free,
};
407
/*
 * usage: pass in a resource on which you have done ldlm_resource_get
 *        new lock will take over the refcount.
 * returns: lock with refcount 2 - one for current caller and one for remote
 */
/* Allocate and initialize a new lock on \a resource.  On failure the
 * caller keeps its resource reference; on success the lock owns it. */
static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
{
        struct ldlm_lock *lock;
        ENTRY;

        if (resource == NULL)
                LBUG();

        OBD_SLAB_ALLOC_PTR_GFP(lock, ldlm_lock_slab, CFS_ALLOC_IO);
        if (lock == NULL)
                RETURN(NULL);

        cfs_spin_lock_init(&lock->l_lock);
        /* Take over the caller's resource reference. */
        lock->l_resource = resource;
        lu_ref_add(&resource->lr_reference, "lock", lock);

        /* Refcount 2: one for the caller, one dropped only at destroy. */
        cfs_atomic_set(&lock->l_refc, 2);
        CFS_INIT_LIST_HEAD(&lock->l_res_link);
        CFS_INIT_LIST_HEAD(&lock->l_lru);
        CFS_INIT_LIST_HEAD(&lock->l_pending_chain);
        CFS_INIT_LIST_HEAD(&lock->l_bl_ast);
        CFS_INIT_LIST_HEAD(&lock->l_cp_ast);
        CFS_INIT_LIST_HEAD(&lock->l_rk_ast);
        cfs_waitq_init(&lock->l_waitq);
        lock->l_blocking_lock = NULL;
        CFS_INIT_LIST_HEAD(&lock->l_sl_mode);
        CFS_INIT_LIST_HEAD(&lock->l_sl_policy);
        CFS_INIT_HLIST_NODE(&lock->l_exp_hash);

        lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
                             LDLM_NSS_LOCKS);
        /* Publish the lock in the handle table so it can be found by
         * cookie (see __ldlm_handle2lock()). */
        CFS_INIT_LIST_HEAD(&lock->l_handle.h_link);
        class_handle_hash(&lock->l_handle, &lock_handle_ops);

        lu_ref_init(&lock->l_reference);
        lu_ref_add(&lock->l_reference, "hash", lock);
        lock->l_callback_timeout = 0;

#if LUSTRE_TRACKS_LOCK_EXP_REFS
        CFS_INIT_LIST_HEAD(&lock->l_exp_refs_link);
        lock->l_exp_refs_nr = 0;
        lock->l_exp_refs_target = NULL;
#endif
        CFS_INIT_LIST_HEAD(&lock->l_exp_list);

        RETURN(lock);
}
460
/* Move \a lock from its current resource to the resource named by
 * \a new_resid (client-side only, e.g. after a rename changes the
 * object's identity).  Returns 0 on success or if nothing changed,
 * -ENOMEM if the new resource cannot be found/created. */
int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                              const struct ldlm_res_id *new_resid)
{
        struct ldlm_resource *oldres = lock->l_resource;
        struct ldlm_resource *newres;
        int type;
        ENTRY;

        LASSERT(ns_is_client(ns));

        lock_res_and_lock(lock);
        if (memcmp(new_resid, &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) == 0) {
                /* Nothing to do */
                unlock_res_and_lock(lock);
                RETURN(0);
        }

        LASSERT(new_resid->name[0] != 0);

        /* This function assumes that the lock isn't on any lists */
        LASSERT(cfs_list_empty(&lock->l_res_link));

        type = oldres->lr_type;
        unlock_res_and_lock(lock);

        /* May allocate; must not be called under the resource lock. */
        newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
        if (newres == NULL)
                RETURN(-ENOMEM);

        lu_ref_add(&newres->lr_reference, "lock", lock);
        /*
         * To flip the lock from the old to the new resource, lock, oldres and
         * newres have to be locked. Resource spin-locks are nested within
         * lock->l_lock, and are taken in the memory address order to avoid
         * dead-locks.
         */
        cfs_spin_lock(&lock->l_lock);
        /* Re-read l_resource: it may have changed while unlocked above. */
        oldres = lock->l_resource;
        if (oldres < newres) {
                lock_res(oldres);
                lock_res_nested(newres, LRT_NEW);
        } else {
                lock_res(newres);
                lock_res_nested(oldres, LRT_NEW);
        }
        LASSERT(memcmp(new_resid, &oldres->lr_name,
                       sizeof oldres->lr_name) != 0);
        lock->l_resource = newres;
        unlock_res(oldres);
        /* Releases newres' lock and lock->l_lock. */
        unlock_res_and_lock(lock);

        /* ...and the flowers are still standing! */
        lu_ref_del(&oldres->lr_reference, "lock", lock);
        ldlm_resource_putref(oldres);

        RETURN(0);
}
519
520 /*
521  *  HANDLES
522  */
523
/* Fill \a lockh with the opaque handle cookie identifying \a lock. */
void ldlm_lock2handle(const struct ldlm_lock *lock, struct lustre_handle *lockh)
{
        lockh->cookie = lock->l_handle.h_cookie;
}
528
/* if flags: atomically get the lock and set the flags.
 *           Return NULL if flag already set
 */
/* Resolve \a handle to a referenced lock.  Returns NULL if the handle is
 * stale, the lock is already destroyed, or (when \a flags is non-zero)
 * any of \a flags was already set on the lock. */
struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
                                     int flags)
{
        struct ldlm_lock *lock;
        ENTRY;

        LASSERT(handle);

        /* class_handle2object() takes a reference via hop_addref. */
        lock = class_handle2object(handle->cookie);
        if (lock == NULL)
                RETURN(NULL);

        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if (flags == 0 && !lock->l_destroyed) {
                lu_ref_add(&lock->l_reference, "handle", cfs_current());
                RETURN(lock);
        }

        /* Slow path: re-check l_destroyed (and flags) under the lock. */
        lock_res_and_lock(lock);

        LASSERT(lock->l_resource != NULL);

        lu_ref_add_atomic(&lock->l_reference, "handle", cfs_current());
        if (unlikely(lock->l_destroyed)) {
                unlock_res_and_lock(lock);
                CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
                LDLM_LOCK_PUT(lock);
                RETURN(NULL);
        }

        /* Fail if any requested flag is already set ("atomic test-and-set"
         * semantics for callers like cancel). */
        if (flags && (lock->l_flags & flags)) {
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
                RETURN(NULL);
        }

        if (flags)
                lock->l_flags |= flags;

        unlock_res_and_lock(lock);
        RETURN(lock);
}
576
/* Fill a wire lock descriptor \a desc from \a lock, downgrading IBITS
 * locks to PLAIN for peers that lack OBD_CONNECT_IBITS. */
void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
{
        /* Prefer the export (server side); fall back to the client's
         * connection export. */
        struct obd_export *exp = lock->l_export?:lock->l_conn_export;
        /* INODEBITS_INTEROP: If the other side does not support
         * inodebits, reply with a plain lock descriptor.
         */
        if ((lock->l_resource->lr_type == LDLM_IBITS) &&
            (exp && !(exp->exp_connect_flags & OBD_CONNECT_IBITS))) {
                /* Make sure all the right bits are set in this lock we
                   are going to pass to client */
                LASSERTF(lock->l_policy_data.l_inodebits.bits ==
                         (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
                          MDS_INODELOCK_LAYOUT),
                         "Inappropriate inode lock bits during "
                         "conversion " LPU64 "\n",
                         lock->l_policy_data.l_inodebits.bits);

                ldlm_res2desc(lock->l_resource, &desc->l_resource);
                desc->l_resource.lr_type = LDLM_PLAIN;

                /* Convert "new" lock mode to something old client can
                   understand */
                if ((lock->l_req_mode == LCK_CR) ||
                    (lock->l_req_mode == LCK_CW))
                        desc->l_req_mode = LCK_PR;
                else
                        desc->l_req_mode = lock->l_req_mode;
                if ((lock->l_granted_mode == LCK_CR) ||
                    (lock->l_granted_mode == LCK_CW)) {
                        desc->l_granted_mode = LCK_PR;
                } else {
                        /* We never grant PW/EX locks to clients */
                        LASSERT((lock->l_granted_mode != LCK_PW) &&
                                (lock->l_granted_mode != LCK_EX));
                        desc->l_granted_mode = lock->l_granted_mode;
                }

                /* We do not copy policy here, because there is no
                   policy for plain locks */
        } else {
                /* Normal path: copy resource, modes and policy verbatim
                 * (policy translated to wire format). */
                ldlm_res2desc(lock->l_resource, &desc->l_resource);
                desc->l_req_mode = lock->l_req_mode;
                desc->l_granted_mode = lock->l_granted_mode;
                ldlm_convert_policy_to_wire(lock->l_resource->lr_type,
                                            &lock->l_policy_data,
                                            &desc->l_policy_data);
        }
}
625
/* Queue a blocking-AST work item for \a lock, triggered by conflicting
 * lock \a new.  Only the first conflict queues work (LDLM_FL_AST_SENT
 * guards against duplicates).  Takes references on both locks; they are
 * dropped by the work-list processor.  Caller holds the resource lock. */
void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                           cfs_list_t *work_list)
{
        if ((lock->l_flags & LDLM_FL_AST_SENT) == 0) {
                LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
                lock->l_flags |= LDLM_FL_AST_SENT;
                /* If the enqueuing client said so, tell the AST recipient to
                 * discard dirty data, rather than writing back. */
                if (new->l_flags & LDLM_AST_DISCARD_DATA)
                        lock->l_flags |= LDLM_FL_DISCARD_DATA;
                LASSERT(cfs_list_empty(&lock->l_bl_ast));
                cfs_list_add(&lock->l_bl_ast, work_list);
                /* Reference held while the work item is pending. */
                LDLM_LOCK_GET(lock);
                LASSERT(lock->l_blocking_lock == NULL);
                lock->l_blocking_lock = LDLM_LOCK_GET(new);
        }
}
643
/* Queue a completion-AST work item for \a lock; LDLM_FL_CP_REQD guards
 * against queuing it twice.  The reference taken here is dropped by the
 * work-list processor.  Caller holds the resource lock. */
void ldlm_add_cp_work_item(struct ldlm_lock *lock, cfs_list_t *work_list)
{
        if ((lock->l_flags & LDLM_FL_CP_REQD) == 0) {
                lock->l_flags |= LDLM_FL_CP_REQD;
                LDLM_DEBUG(lock, "lock granted; sending completion AST.");
                LASSERT(cfs_list_empty(&lock->l_cp_ast));
                cfs_list_add(&lock->l_cp_ast, work_list);
                LDLM_LOCK_GET(lock);
        }
}
654
655 /* must be called with lr_lock held */
656 void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
657                             cfs_list_t *work_list)
658 {
659         ENTRY;
660         check_res_locked(lock->l_resource);
661         if (new)
662                 ldlm_add_bl_work_item(lock, new, work_list);
663         else
664                 ldlm_add_cp_work_item(lock, work_list);
665         EXIT;
666 }
667
668 void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
669 {
670         struct ldlm_lock *lock;
671
672         lock = ldlm_handle2lock(lockh);
673         LASSERT(lock != NULL);
674         ldlm_lock_addref_internal(lock, mode);
675         LDLM_LOCK_PUT(lock);
676 }
677
/* Bump the reader and/or writer use counts per \a mode, take a lock
 * reference, and pull the lock off the LRU (an in-use lock must not be
 * cancelled by LRU shrinking).  Caller holds the resource lock. */
void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        ldlm_lock_remove_from_lru(lock);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                lock->l_readers++;
                lu_ref_add_atomic(&lock->l_reference, "reader", lock);
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                lock->l_writers++;
                lu_ref_add_atomic(&lock->l_reference, "writer", lock);
        }
        /* Matched by LDLM_LOCK_RELEASE in decref_internal_nolock(). */
        LDLM_LOCK_GET(lock);
        lu_ref_add_atomic(&lock->l_reference, "user", lock);
        LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}
693
694 /**
695  * Attempts to addref a lock, and fails if lock is already LDLM_FL_CBPENDING
696  * or destroyed.
697  *
698  * \retval 0 success, lock was addref-ed
699  *
700  * \retval -EAGAIN lock is being canceled.
701  */
702 int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode)
703 {
704         struct ldlm_lock *lock;
705         int               result;
706
707         result = -EAGAIN;
708         lock = ldlm_handle2lock(lockh);
709         if (lock != NULL) {
710                 lock_res_and_lock(lock);
711                 if (lock->l_readers != 0 || lock->l_writers != 0 ||
712                     !(lock->l_flags & LDLM_FL_CBPENDING)) {
713                         ldlm_lock_addref_internal_nolock(lock, mode);
714                         result = 0;
715                 }
716                 unlock_res_and_lock(lock);
717                 LDLM_LOCK_PUT(lock);
718         }
719         return result;
720 }
721
/* only called for local locks */
/* Locked wrapper for ldlm_lock_addref_internal_nolock(). */
void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
{
        lock_res_and_lock(lock);
        ldlm_lock_addref_internal_nolock(lock, mode);
        unlock_res_and_lock(lock);
}
729
/* Only called in ldlm_flock_destroy and for local locks.
 * For LDLM_FLOCK type locks, l_blocking_ast is null, and
 * ldlm_lock_remove_from_lru() does nothing, so it is safe
 * for ldlm_flock_destroy usage by dropping some code. */
/* Drop the reader and/or writer use counts per \a mode and release the
 * lock reference taken in addref_internal_nolock().  Caller holds the
 * resource lock. */
void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                LASSERT(lock->l_readers > 0);
                lu_ref_del(&lock->l_reference, "reader", lock);
                lock->l_readers--;
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                LASSERT(lock->l_writers > 0);
                lu_ref_del(&lock->l_reference, "writer", lock);
                lock->l_writers--;
        }

        lu_ref_del(&lock->l_reference, "user", lock);
        LDLM_LOCK_RELEASE(lock);    /* matches the LDLM_LOCK_GET() in addref */
}
751
/* Drop a use reference and, when it was the last one, decide the lock's
 * fate: hand a CBPENDING lock to the blocking-callback thread, put an
 * idle client lock on the LRU, or simply leave it alone. */
void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
{
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        ns = ldlm_lock_to_ns(lock);

        ldlm_lock_decref_internal_nolock(lock, mode);

        if (lock->l_flags & LDLM_FL_LOCAL &&
            !lock->l_readers && !lock->l_writers) {
                /* If this is a local lock on a server namespace and this was
                 * the last reference, cancel the lock. */
                CDEBUG(D_INFO, "forcing cancel of local lock\n");
                lock->l_flags |= LDLM_FL_CBPENDING;
        }

        if (!lock->l_readers && !lock->l_writers &&
            (lock->l_flags & LDLM_FL_CBPENDING)) {
                /* If we received a blocked AST and this was the last reference,
                 * run the callback. */
                if (lock->l_ns_srv && lock->l_export)
                        CERROR("FL_CBPENDING set on non-local lock--just a "
                               "warning\n");

                LDLM_DEBUG(lock, "final decref done on cbpending lock");

                LDLM_LOCK_GET(lock); /* dropped by bl thread */
                ldlm_lock_remove_from_lru(lock);
                unlock_res_and_lock(lock);

                /* NOTE(review): l_flags is read here without the resource
                 * lock — presumably FAIL_LOC/ATOMIC_CB are stable by now;
                 * verify against the flag-setting paths. */
                if (lock->l_flags & LDLM_FL_FAIL_LOC)
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                /* Run the blocking callback inline if atomic or if handing
                 * off to the bl thread fails. */
                if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
                    ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
                        ldlm_handle_bl_callback(ns, NULL, lock);
        } else if (ns_is_client(ns) &&
                   !lock->l_readers && !lock->l_writers &&
                   !(lock->l_flags & LDLM_FL_NO_LRU) &&
                   !(lock->l_flags & LDLM_FL_BL_AST)) {

                LDLM_DEBUG(lock, "add lock into lru list");

                /* If this is a client-side namespace and this was the last
                 * reference, put it on the LRU. */
                ldlm_lock_add_to_lru(lock);
                unlock_res_and_lock(lock);

                if (lock->l_flags & LDLM_FL_FAIL_LOC)
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
                 * are not supported by the server, otherwise, it is done on
                 * enqueue. */
                if (!exp_connect_cancelset(lock->l_conn_export) &&
                    !ns_connect_lru_resize(ns))
                        ldlm_cancel_lru(ns, 0, LDLM_ASYNC, 0);
        } else {
                LDLM_DEBUG(lock, "do not add lock into lru list");
                unlock_res_and_lock(lock);
        }

        EXIT;
}
819
/* Drop a reader/writer reference (per \a mode) on the lock named by
 * \a lockh.  The handle must refer to a live lock. */
void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
827
828 /* This will drop a lock reference and mark it for destruction, but will not
829  * necessarily cancel the lock before returning. */
830 void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
831 {
832         struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
833         ENTRY;
834
835         LASSERT(lock != NULL);
836
837         LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
838         lock_res_and_lock(lock);
839         lock->l_flags |= LDLM_FL_CBPENDING;
840         unlock_res_and_lock(lock);
841         ldlm_lock_decref_internal(lock, mode);
842         LDLM_LOCK_PUT(lock);
843 }
844
/* Insert position within the three skiplist-style lists a granted lock
 * belongs to: the resource list, the mode group and the policy group.
 * Filled in by search_granted_lock(). */
struct sl_insert_point {
        cfs_list_t *res_link;
        cfs_list_t *mode_link;
        cfs_list_t *policy_link;
};
850
/*
 * search_granted_lock
 *
 * Description:
 *      Finds a position to insert the new lock.
 * Parameters:
 *      queue [input]:  the granted list where search acts on;
 *      req [input]:    the lock whose position to be located;
 *      prev [output]:  positions within 3 lists to insert @req to
 * Return Value:
 *      filled @prev
 * NOTE: called by
 *  - ldlm_grant_lock_with_skiplist
 */
static void search_granted_lock(cfs_list_t *queue,
                                struct ldlm_lock *req,
                                struct sl_insert_point *prev)
{
        cfs_list_t *tmp;
        struct ldlm_lock *lock, *mode_end, *policy_end;
        ENTRY;

        /* The granted queue is kept partitioned into "mode groups" (runs of
         * locks with equal l_req_mode); for IBITS resources each mode group
         * is further partitioned into "policy groups" with equal inodebits.
         * l_sl_mode/l_sl_policy link the first and last member of a group,
         * so group boundaries can be skipped in O(1). */
        cfs_list_for_each(tmp, queue) {
                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                /* ->prev of the group head yields the last lock of the
                 * mode group. */
                mode_end = cfs_list_entry(lock->l_sl_mode.prev,
                                          struct ldlm_lock, l_sl_mode);

                if (lock->l_req_mode != req->l_req_mode) {
                        /* jump to last lock of mode group */
                        tmp = &mode_end->l_res_link;
                        continue;
                }

                /* suitable mode group is found */
                if (lock->l_resource->lr_type == LDLM_PLAIN) {
                        /* insert point is last lock of the mode group */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else if (lock->l_resource->lr_type == LDLM_IBITS) {
                        for (;;) {
                                policy_end =
                                        cfs_list_entry(lock->l_sl_policy.prev,
                                                       struct ldlm_lock,
                                                       l_sl_policy);

                                if (lock->l_policy_data.l_inodebits.bits ==
                                    req->l_policy_data.l_inodebits.bits) {
                                        /* insert point is last lock of
                                         * the policy group */
                                        prev->res_link =
                                                &policy_end->l_res_link;
                                        prev->mode_link =
                                                &policy_end->l_sl_mode;
                                        prev->policy_link =
                                                &policy_end->l_sl_policy;
                                        EXIT;
                                        return;
                                }

                                if (policy_end == mode_end)
                                        /* done with mode group */
                                        break;

                                /* go to next policy group within mode group */
                                tmp = policy_end->l_res_link.next;
                                lock = cfs_list_entry(tmp, struct ldlm_lock,
                                                      l_res_link);
                        }  /* loop over policy groups within the mode group */

                        /* insert point is last lock of the mode group,
                         * new policy group is started */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else {
                        LDLM_ERROR(lock,"is not LDLM_PLAIN or LDLM_IBITS lock");
                        LBUG();
                }
        }

        /* insert point is last lock on the queue,
         * new mode group and new policy group are started */
        prev->res_link = queue->prev;
        prev->mode_link = &req->l_sl_mode;
        prev->policy_link = &req->l_sl_policy;
        EXIT;
        return;
}
945
/* Link @lock into the granted queue at the position computed by
 * search_granted_lock().  Caller must hold the resource lock. */
static void ldlm_granted_list_add_lock(struct ldlm_lock *lock,
                                       struct sl_insert_point *prev)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        ldlm_resource_dump(D_INFO, res);
        LDLM_DEBUG(lock, "About to add lock:");

        /* A racing destroy may have removed the lock from the resource
         * already; in that case it must not be re-linked. */
        if (lock->l_destroyed) {
                CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
                return;
        }

        LASSERT(cfs_list_empty(&lock->l_res_link));
        LASSERT(cfs_list_empty(&lock->l_sl_mode));
        LASSERT(cfs_list_empty(&lock->l_sl_policy));

        /* insert into all three skip lists at the precomputed positions */
        cfs_list_add(&lock->l_res_link, prev->res_link);
        cfs_list_add(&lock->l_sl_mode, prev->mode_link);
        cfs_list_add(&lock->l_sl_policy, prev->policy_link);

        EXIT;
}
972
/* Grant helper for PLAIN/IBITS resources: find the skiplist insertion
 * point for @lock and link it into the granted queue.  Caller must hold
 * the resource lock and have already set l_granted_mode. */
static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
{
        struct sl_insert_point prev;
        ENTRY;

        LASSERT(lock->l_req_mode == lock->l_granted_mode);

        search_granted_lock(&lock->l_resource->lr_granted, lock, &prev);
        ldlm_granted_list_add_lock(lock, &prev);
        EXIT;
}
984
/* Grant @lock: set its granted mode and move it onto the resource's
 * granted queue, optionally queueing its completion AST on @work_list.
 *
 * NOTE: called by
 *  - ldlm_lock_enqueue
 *  - ldlm_reprocess_queue
 *  - ldlm_lock_convert
 *
 * must be called with lr_lock held
 */
void ldlm_grant_lock(struct ldlm_lock *lock, cfs_list_t *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        lock->l_granted_mode = lock->l_req_mode;
        /* choose the granted-queue structure by resource type:
         * skiplist for PLAIN/IBITS, interval tree for EXTENT,
         * plain list otherwise */
        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
                ldlm_grant_lock_with_skiplist(lock);
        else if (res->lr_type == LDLM_EXTENT)
                ldlm_extent_add_lock(res, lock);
        else
                ldlm_resource_add_lock(res, &res->lr_granted, lock);

        if (lock->l_granted_mode < res->lr_most_restr)
                res->lr_most_restr = lock->l_granted_mode;

        /* defer the completion AST to the caller's work list, if any */
        if (work_list && lock->l_completion_ast != NULL)
                ldlm_add_ast_work_item(lock, NULL, work_list);

        /* account the newly-granted lock in the namespace pool */
        ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
        EXIT;
}
1016
/* Scan @queue for a lock compatible with the requested @mode/@policy.
 * Returns a referenced lock or NULL.  See the flag descriptions below, in the
 * comment above ldlm_lock_match.  On success *mode is updated to the mode
 * of the matched lock.  Caller must hold the resource lock. */
static struct ldlm_lock *search_queue(cfs_list_t *queue,
                                      ldlm_mode_t *mode,
                                      ldlm_policy_data_t *policy,
                                      struct ldlm_lock *old_lock,
                                      int flags, int unref)
{
        struct ldlm_lock *lock;
        cfs_list_t       *tmp;

        cfs_list_for_each(tmp, queue) {
                ldlm_mode_t match;

                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                /* when re-matching an existing lock, stop at its position
                 * so only locks queued ahead of it are considered */
                if (lock == old_lock)
                        break;

                /* llite sometimes wants to match locks that will be
                 * canceled when their users drop, but we allow it to match
                 * if it passes in CBPENDING and the lock still has users.
                 * this is generally only going to be used by children
                 * whose parents already hold a lock so forward progress
                 * can still happen. */
                if (lock->l_flags & LDLM_FL_CBPENDING &&
                    !(flags & LDLM_FL_CBPENDING))
                        continue;
                if (!unref && lock->l_flags & LDLM_FL_CBPENDING &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        continue;

                /* *mode is a bitmask of acceptable modes */
                if (!(lock->l_req_mode & *mode))
                        continue;
                match = lock->l_req_mode;

                /* an extent lock matches only if it covers the whole
                 * requested range */
                if (lock->l_resource->lr_type == LDLM_EXTENT &&
                    (lock->l_policy_data.l_extent.start >
                     policy->l_extent.start ||
                     lock->l_policy_data.l_extent.end < policy->l_extent.end))
                        continue;

                /* group locks match only within the same gid */
                if (unlikely(match == LCK_GROUP) &&
                    lock->l_resource->lr_type == LDLM_EXTENT &&
                    lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                        continue;

                /* We match if we have existing lock with same or wider set
                   of bits. */
                if (lock->l_resource->lr_type == LDLM_IBITS &&
                     ((lock->l_policy_data.l_inodebits.bits &
                      policy->l_inodebits.bits) !=
                      policy->l_inodebits.bits))
                        continue;

                /* skip locks that are going away, unless the caller asked
                 * to match them too (unref) */
                if (!unref &&
                    (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
                     lock->l_failed))
                        continue;

                if ((flags & LDLM_FL_LOCAL_ONLY) &&
                    !(lock->l_flags & LDLM_FL_LOCAL))
                        continue;

                /* TEST_LOCK only takes a reference; a real match also adds
                 * a reader/writer reference of the matched mode */
                if (flags & LDLM_FL_TEST_LOCK) {
                        LDLM_LOCK_GET(lock);
                        ldlm_lock_touch_in_lru(lock);
                } else {
                        ldlm_lock_addref_internal_nolock(lock, match);
                }
                *mode = match;
                return lock;
        }

        return NULL;
}
1093
1094 void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
1095 {
1096         if (!lock->l_failed) {
1097                 lock->l_failed = 1;
1098                 cfs_waitq_broadcast(&lock->l_waitq);
1099         }
1100 }
1101 EXPORT_SYMBOL(ldlm_lock_fail_match_locked);
1102
/* Locked wrapper around ldlm_lock_fail_match_locked(): takes and releases
 * the resource lock around the flag update and wakeup. */
void ldlm_lock_fail_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_fail_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_fail_match);
1110
/* Mark the lock's LVB as ready and wake up matchers waiting for it in
 * ldlm_lock_match().  Caller must hold the resource lock. */
void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
{
        lock->l_flags |= LDLM_FL_LVB_READY;
        cfs_waitq_broadcast(&lock->l_waitq);
}
1116
/* Locked wrapper around ldlm_lock_allow_match_locked(). */
void ldlm_lock_allow_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_allow_match_locked(lock);
        unlock_res_and_lock(lock);
}
1123
/* Can be called in two ways:
 *
 * If 'ns' is NULL, then lockh describes an existing lock that we want to look
 * for a duplicate of.
 *
 * Otherwise, all of the fields must be filled in, to match against.
 *
 * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
 *     server (ie, connh is NULL)
 * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
 *     list will be considered
 * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
 *     to be canceled can still be matched as long as they still have reader
 *     or writer references
 * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
 *     just tell us if we would have matched.
 *
 * Returns the matched mode (non-zero) if it finds an already-existing lock
 * that is compatible; in this case, lockh is filled in with an addref()ed
 * lock
 *
 * we also check security context, if that failed we simply return 0 (to keep
 * caller code unchanged), the context failure will be discovered by caller
 * sometime later.
 */
ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
                            const struct ldlm_res_id *res_id, ldlm_type_t type,
                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
                            struct lustre_handle *lockh, int unref)
{
        struct ldlm_resource *res;
        struct ldlm_lock *lock, *old_lock = NULL;
        int rc = 0;
        ENTRY;

        /* ns == NULL: derive all match parameters from the existing lock */
        if (ns == NULL) {
                old_lock = ldlm_handle2lock(lockh);
                LASSERT(old_lock);

                ns = ldlm_lock_to_ns(old_lock);
                res_id = &old_lock->l_resource->lr_name;
                type = old_lock->l_resource->lr_type;
                mode = old_lock->l_req_mode;
        }

        res = ldlm_resource_get(ns, NULL, res_id, type, 0);
        if (res == NULL) {
                LASSERT(old_lock == NULL);
                RETURN(0);
        }

        LDLM_RESOURCE_ADDREF(res);
        lock_res(res);

        /* search granted first, then (unless restricted to granted)
         * converting and waiting queues */
        lock = search_queue(&res->lr_granted, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        if (flags & LDLM_FL_BLOCK_GRANTED)
                GOTO(out, rc = 0);
        lock = search_queue(&res->lr_converting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        lock = search_queue(&res->lr_waiting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);

        EXIT;
 out:
        unlock_res(res);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);

        if (lock) {
                ldlm_lock2handle(lock, lockh);
                /* caller wants the LVB valid: if it is not ready yet, wait
                 * for it (bounded by obd_timeout) and drop the match if it
                 * never becomes ready */
                if ((flags & LDLM_FL_LVB_READY) &&
                    (!(lock->l_flags & LDLM_FL_LVB_READY))) {
                        struct l_wait_info lwi;
                        if (lock->l_completion_ast) {
                                int err = lock->l_completion_ast(lock,
                                                          LDLM_FL_WAIT_NOREPROC,
                                                                 NULL);
                                if (err) {
                                        /* undo the reference taken in
                                         * search_queue() */
                                        if (flags & LDLM_FL_TEST_LOCK)
                                                LDLM_LOCK_RELEASE(lock);
                                        else
                                                ldlm_lock_decref_internal(lock,
                                                                          mode);
                                        rc = 0;
                                        goto out2;
                                }
                        }

                        lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
                                               NULL, LWI_ON_SIGNAL_NOOP, NULL);

                        /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
                        l_wait_event(lock->l_waitq,
                                     lock->l_flags & LDLM_FL_LVB_READY ||
                                     lock->l_failed,
                                     &lwi);
                        if (!(lock->l_flags & LDLM_FL_LVB_READY)) {
                                if (flags & LDLM_FL_TEST_LOCK)
                                        LDLM_LOCK_RELEASE(lock);
                                else
                                        ldlm_lock_decref_internal(lock, mode);
                                rc = 0;
                        }
                }
        }
 out2:
        if (rc) {
                LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[2] : policy->l_extent.start,
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[3] : policy->l_extent.end);

                /* check user's security context */
                if (lock->l_conn_export &&
                    sptlrpc_import_check_ctx(
                                class_exp2cliimp(lock->l_conn_export))) {
                        if (!(flags & LDLM_FL_TEST_LOCK))
                                ldlm_lock_decref_internal(lock, mode);
                        rc = 0;
                }

                if (flags & LDLM_FL_TEST_LOCK)
                        LDLM_LOCK_RELEASE(lock);

        } else if (!(flags & LDLM_FL_TEST_LOCK)) {/*less verbose for test-only*/
                LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
                                  LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
                                  type, mode, res_id->name[0], res_id->name[1],
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[2] :policy->l_extent.start,
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[3] : policy->l_extent.end);
        }
        if (old_lock)
                LDLM_LOCK_PUT(old_lock);

        return rc ? mode : 0;
}
1269
/* Re-validate the lock behind @lockh: if it is still usable (not destroyed,
 * failed, or pending cancel with no users), take a reader/writer reference
 * of its granted mode and return that mode; otherwise return 0.  If @bits
 * is non-NULL it receives the lock's inodebits (meaningful for IBITS locks
 * only — the union member is read unconditionally). */
ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh,
                                        __u64 *bits)
{
        struct ldlm_lock *lock;
        ldlm_mode_t mode = 0;
        ENTRY;

        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                /* mode is still 0 here, so these paths return "no match" */
                if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
                    lock->l_failed)
                        GOTO(out, mode);

                if (lock->l_flags & LDLM_FL_CBPENDING &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        GOTO(out, mode);

                if (bits)
                        *bits = lock->l_policy_data.l_inodebits.bits;
                mode = lock->l_granted_mode;
                ldlm_lock_addref_internal_nolock(lock, mode);
        }

        EXIT;

out:
        /* common exit: release the resource lock and the handle2lock ref */
        if (lock != NULL) {
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return mode;
}
EXPORT_SYMBOL(ldlm_revalidate_lock_handle);
1304
/* Allocate and initialize a new lock on the resource named by @res_id,
 * with request mode @mode and optional callbacks/LVB buffer.
 * Returns a referenced lock, or NULL on allocation failure. */
struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
                                   const struct ldlm_res_id *res_id,
                                   ldlm_type_t type,
                                   ldlm_mode_t mode,
                                   const struct ldlm_callback_suite *cbs,
                                   void *data, __u32 lvb_len)
{
        struct ldlm_lock *lock;
        struct ldlm_resource *res;
        ENTRY;

        /* last arg 1 == create the resource if it does not exist */
        res = ldlm_resource_get(ns, NULL, res_id, type, 1);
        if (res == NULL)
                RETURN(NULL);

        lock = ldlm_lock_new(res);

        /* NOTE(review): on this failure path the reference on @res taken
         * by ldlm_resource_get() does not appear to be dropped — verify
         * against ldlm_lock_new()'s reference semantics (later Lustre
         * releases call ldlm_resource_putref() here). */
        if (lock == NULL)
                RETURN(NULL);

        lock->l_req_mode = mode;
        lock->l_ast_data = data;
        lock->l_pid = cfs_curproc_pid();
        lock->l_ns_srv = !!ns_is_server(ns);
        if (cbs) {
                lock->l_blocking_ast = cbs->lcs_blocking;
                lock->l_completion_ast = cbs->lcs_completion;
                lock->l_glimpse_ast = cbs->lcs_glimpse;
                lock->l_weigh_ast = cbs->lcs_weigh;
        }

        lock->l_tree_node = NULL;
        /* if this is the extent lock, allocate the interval tree node */
        if (type == LDLM_EXTENT) {
                if (ldlm_interval_alloc(lock) == NULL)
                        GOTO(out, 0);
        }

        if (lvb_len) {
                lock->l_lvb_len = lvb_len;
                OBD_ALLOC(lock->l_lvb_data, lvb_len);
                if (lock->l_lvb_data == NULL)
                        GOTO(out, 0);
        }

        /* fault-injection point for testing enqueue failure handling */
        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK))
                GOTO(out, 0);

        RETURN(lock);

out:
        /* partial-initialization failure: destroy and drop the new lock */
        ldlm_lock_destroy(lock);
        LDLM_LOCK_RELEASE(lock);
        return NULL;
}
1361
/* Enqueue @*lockp in its namespace: run the intent policy (server side),
 * then place the lock on the granted/converting/waiting queue as dictated
 * by @*flags (client/replay) or by the per-type processing policy (server).
 * May replace *lockp when the intent policy returns a different lock. */
ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                               struct ldlm_lock **lockp,
                               void *cookie, int *flags)
{
        struct ldlm_lock *lock = *lockp;
        struct ldlm_resource *res = lock->l_resource;
        int local = ns_is_client(ldlm_res_to_ns(res));
#ifdef HAVE_SERVER_SUPPORT
        ldlm_processing_policy policy;
#endif
        ldlm_error_t rc = ELDLM_OK;
        struct ldlm_interval *node = NULL;
        ENTRY;

        lock->l_last_activity = cfs_time_current_sec();
        /* policies are not executed on the client or during replay */
        if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
            && !local && ns->ns_policy) {
                rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
                                   NULL);
                if (rc == ELDLM_LOCK_REPLACED) {
                        /* The lock that was returned has already been granted,
                         * and placed into lockp.  If it's not the same as the
                         * one we passed in, then destroy the old one and our
                         * work here is done. */
                        if (lock != *lockp) {
                                ldlm_lock_destroy(lock);
                                LDLM_LOCK_RELEASE(lock);
                        }
                        *flags |= LDLM_FL_LOCK_CHANGED;
                        RETURN(0);
                } else if (rc != ELDLM_OK ||
                           (rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
                        ldlm_lock_destroy(lock);
                        RETURN(rc);
                }
        }

        /* For a replaying lock, it might be already in granted list. So
         * unlinking the lock will cause the interval node to be freed, we
         * have to allocate the interval node early otherwise we can't regrant
         * this lock in the future. - jay */
        if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
                OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);

        lock_res_and_lock(lock);
        if (local && lock->l_req_mode == lock->l_granted_mode) {
                /* The server returned a blocked lock, but it was granted
                 * before we got a chance to actually enqueue it.  We don't
                 * need to do anything else. */
                *flags &= ~(LDLM_FL_BLOCK_GRANTED |
                            LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
                GOTO(out, ELDLM_OK);
        }

        ldlm_resource_unlink_lock(lock);
        if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
                if (node == NULL) {
                        ldlm_lock_destroy_nolock(lock);
                        GOTO(out, rc = -ENOMEM);
                }

                CFS_INIT_LIST_HEAD(&node->li_group);
                ldlm_interval_attach(node, lock);
                /* ownership transferred to the lock; don't free below */
                node = NULL;
        }

        /* Some flags from the enqueue want to make it into the AST, via the
         * lock's l_flags. */
        lock->l_flags |= *flags & LDLM_AST_DISCARD_DATA;

        /* This distinction between local lock trees is very important; a client
         * namespace only has information about locks taken by that client, and
         * thus doesn't have enough information to decide for itself if it can
         * be granted (below).  In this case, we do exactly what the server
         * tells us to do, as dictated by the 'flags'.
         *
         * We do exactly the same thing during recovery, when the server is
         * more or less trusting the clients not to lie.
         *
         * FIXME (bug 268): Detect obvious lies by checking compatibility in
         * granted/converting queues. */
        if (local) {
                if (*flags & LDLM_FL_BLOCK_CONV)
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                else
                        ldlm_grant_lock(lock, NULL);
                GOTO(out, ELDLM_OK);
#ifdef HAVE_SERVER_SUPPORT
        } else if (*flags & LDLM_FL_REPLAY) {
                if (*flags & LDLM_FL_BLOCK_CONV) {
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                        GOTO(out, ELDLM_OK);
                } else if (*flags & LDLM_FL_BLOCK_WAIT) {
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                        GOTO(out, ELDLM_OK);
                } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
                        ldlm_grant_lock(lock, NULL);
                        GOTO(out, ELDLM_OK);
                }
                /* If no flags, fall through to normal enqueue path. */
        }

        /* server side: let the per-resource-type policy decide */
        policy = ldlm_processing_policy_table[res->lr_type];
        policy(lock, flags, 1, &rc, NULL);
        GOTO(out, rc);
#else
        } else {
                CERROR("This is client-side-only module, cannot handle "
                       "LDLM_NAMESPACE_SERVER resource type lock.\n");
                LBUG();
        }
#endif

out:
        unlock_res_and_lock(lock);
        if (node)
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        return rc;
}
1484
#ifdef HAVE_SERVER_SUPPORT
/* Re-run the processing policy over every lock on @queue (waiting or
 * converting), collecting AST work items into @work_list; stops early if
 * the policy says the queue is blocked.  Must be called with namespace
 * taken and the resource locked.  Returns the last policy verdict
 * (LDLM_ITER_CONTINUE if the whole queue was processed). */
int ldlm_reprocess_queue(struct ldlm_resource *res, cfs_list_t *queue,
                         cfs_list_t *work_list)
{
        cfs_list_t *tmp, *pos;
        ldlm_processing_policy policy;
        int flags;
        int rc = LDLM_ITER_CONTINUE;
        ldlm_error_t err;
        ENTRY;

        check_res_locked(res);

        policy = ldlm_processing_policy_table[res->lr_type];
        LASSERT(policy);

        /* _safe variant: the policy may move the lock off this queue */
        cfs_list_for_each_safe(tmp, pos, queue) {
                struct ldlm_lock *pending;
                pending = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);

                flags = 0;
                rc = policy(pending, &flags, 0, &err, work_list);
                if (rc != LDLM_ITER_CONTINUE)
                        break;
        }

        RETURN(rc);
}
#endif
1517
/* Request-set producer: pop the next lock from the blocking-AST work list
 * and send its blocking callback.  Returns -ENOENT when the list is empty
 * (tells ptlrpc_prep_fcset() there is no more work). */
static int
ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
{
        struct ldlm_cb_set_arg *arg = opaq;
        struct ldlm_lock_desc   d;
        int                     rc;
        struct ldlm_lock       *lock;
        ENTRY;

        if (cfs_list_empty(arg->list))
                RETURN(-ENOENT);

        lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);

        /* nobody should touch l_bl_ast */
        lock_res_and_lock(lock);
        cfs_list_del_init(&lock->l_bl_ast);

        LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
        LASSERT(lock->l_bl_ast_run == 0);
        LASSERT(lock->l_blocking_lock);
        /* mark the blocking AST as run exactly once for this lock */
        lock->l_bl_ast_run++;
        unlock_res_and_lock(lock);

        /* describe the conflicting lock to the client holding this one */
        ldlm_lock2desc(lock->l_blocking_lock, &d);

        rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
        /* drop the references pinned when the work item was queued */
        LDLM_LOCK_RELEASE(lock->l_blocking_lock);
        lock->l_blocking_lock = NULL;
        LDLM_LOCK_RELEASE(lock);

        RETURN(rc);
}
1551
/* Request-set producer: pop the next lock from the completion-AST work
 * list and invoke its completion callback.  Returns -ENOENT when the
 * list is empty. */
static int
ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
{
        struct ldlm_cb_set_arg  *arg = opaq;
        int                      rc = 0;
        struct ldlm_lock        *lock;
        ldlm_completion_callback completion_callback;
        ENTRY;

        if (cfs_list_empty(arg->list))
                RETURN(-ENOENT);

        lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);

        /* It's possible to receive a completion AST before we've set
         * the l_completion_ast pointer: either because the AST arrived
         * before the reply, or simply because there's a small race
         * window between receiving the reply and finishing the local
         * enqueue. (bug 842)
         *
         * This can't happen with the blocking_ast, however, because we
         * will never call the local blocking_ast until we drop our
         * reader/writer reference, which we won't do until we get the
         * reply and finish enqueueing. */

        /* nobody should touch l_cp_ast */
        lock_res_and_lock(lock);
        cfs_list_del_init(&lock->l_cp_ast);
        LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
        /* save l_completion_ast since it can be changed by
         * mds_intent_policy(), see bug 14225 */
        completion_callback = lock->l_completion_ast;
        lock->l_flags &= ~LDLM_FL_CP_REQD;
        unlock_res_and_lock(lock);

        if (completion_callback != NULL)
                rc = completion_callback(lock, 0, (void *)arg);
        /* drop the reference pinned when the work item was queued */
        LDLM_LOCK_RELEASE(lock);

        RETURN(rc);
}
1593
/* Request-set producer: pop the next lock from the revoke-AST work list
 * and send a blocking callback for it.  Returns -ENOENT when the list
 * is empty. */
static int
ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
{
        struct ldlm_cb_set_arg *arg = opaq;
        struct ldlm_lock_desc   desc;
        int                     rc;
        struct ldlm_lock       *lock;
        ENTRY;

        if (cfs_list_empty(arg->list))
                RETURN(-ENOENT);

        lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_rk_ast);
        cfs_list_del_init(&lock->l_rk_ast);

        /* build a descriptor that pretends an EX lock is being requested,
         * so the client treats the revoke as a fully-conflicting blocker */
        ldlm_lock2desc(lock, &desc);
        desc.l_req_mode = LCK_EX;
        desc.l_granted_mode = 0;

        rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
        /* drop the reference pinned when the work item was queued */
        LDLM_LOCK_RELEASE(lock);

        RETURN(rc);
}
1619
/* Send the ASTs queued on \a rpc_list, throttled to at most
 * ns_max_parallel_ast requests in flight at once.
 *
 * \param ns        namespace whose parallel-AST limit applies
 * \param rpc_list  list of locks with pending ASTs (consumed by the
 *                  producer callbacks as requests are generated)
 * \param ast_type  which producer to use: blocking, completion or revoke
 *
 * \retval 0         all ASTs processed
 * \retval -ERESTART at least one callback asked for a restart
 * \retval -ENOMEM   allocation failure (arg or request set)
 */
int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
                      ldlm_desc_ast_t ast_type)
{
        struct ldlm_cb_set_arg *arg;
        set_producer_func       work_ast_lock;
        int                     rc;

        if (cfs_list_empty(rpc_list))
                RETURN(0);

        OBD_ALLOC_PTR(arg);
        if (arg == NULL)
                RETURN(-ENOMEM);

        cfs_atomic_set(&arg->restart, 0);
        arg->list = rpc_list;

        /* pick the producer matching the AST type; any other value is a
         * caller bug */
        switch (ast_type) {
                case LDLM_WORK_BL_AST:
                        arg->type = LDLM_BL_CALLBACK;
                        work_ast_lock = ldlm_work_bl_ast_lock;
                        break;
                case LDLM_WORK_CP_AST:
                        arg->type = LDLM_CP_CALLBACK;
                        work_ast_lock = ldlm_work_cp_ast_lock;
                        break;
                case LDLM_WORK_REVOKE_AST:
                        arg->type = LDLM_BL_CALLBACK;
                        work_ast_lock = ldlm_work_revoke_ast_lock;
                        break;
                default:
                        LBUG();
        }

        /* We create a ptlrpc request set with flow control extension.
         * This request set will use the work_ast_lock function to produce new
         * requests and will send a new request each time one completes in order
         * to keep the number of requests in flight to ns_max_parallel_ast */
        arg->set = ptlrpc_prep_fcset(ns->ns_max_parallel_ast ? : UINT_MAX,
                                     work_ast_lock, arg);
        if (arg->set == NULL)
                GOTO(out, rc = -ENOMEM);

        ptlrpc_set_wait(arg->set);
        ptlrpc_set_destroy(arg->set);

        rc = cfs_atomic_read(&arg->restart) ? -ERESTART : 0;
        GOTO(out, rc);
out:
        OBD_FREE_PTR(arg);
        return rc;
}
1672
/* Resource-iterator helper: reprocess one resource's queues and tell
 * the caller to continue iterating.  \a closure is unused. */
static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
{
        ldlm_reprocess_all(res);
        return LDLM_ITER_CONTINUE;
}
1678
1679 static int ldlm_reprocess_res(cfs_hash_t *hs, cfs_hash_bd_t *bd,
1680                               cfs_hlist_node_t *hnode, void *arg)
1681 {
1682         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
1683         int    rc;
1684
1685         rc = reprocess_one_queue(res, arg);
1686
1687         return rc == LDLM_ITER_STOP;
1688 }
1689
1690 void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
1691 {
1692         ENTRY;
1693
1694         if (ns != NULL) {
1695                 cfs_hash_for_each_nolock(ns->ns_rs_hash,
1696                                          ldlm_reprocess_res, NULL);
1697         }
1698         EXIT;
1699 }
1700
/* Try to grant waiting/converting locks on \a res and send the
 * resulting completion ASTs.  Server-side only: client (local) lock
 * trees are never reprocessed here, and without HAVE_SERVER_SUPPORT a
 * server-namespace resource is a fatal misuse. */
void ldlm_reprocess_all(struct ldlm_resource *res)
{
        CFS_LIST_HEAD(rpc_list);

#ifdef HAVE_SERVER_SUPPORT
        int rc;
        ENTRY;
        /* Local lock trees don't get reprocessed. */
        if (ns_is_client(ldlm_res_to_ns(res))) {
                EXIT;
                return;
        }

restart:
        /* converting queue first; fall through to the waiting queue only
         * if the converting walk said to continue */
        lock_res(res);
        rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
        if (rc == LDLM_ITER_CONTINUE)
                ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
        unlock_res(res);

        /* ASTs are sent with the resource unlocked; -ERESTART means some
         * callback wants the whole reprocess repeated from scratch */
        rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
                               LDLM_WORK_CP_AST);
        if (rc == -ERESTART) {
                LASSERT(cfs_list_empty(&rpc_list));
                goto restart;
        }
#else
        ENTRY;
        if (!ns_is_client(ldlm_res_to_ns(res))) {
                CERROR("This is client-side-only module, cannot handle "
                       "LDLM_NAMESPACE_SERVER resource type lock.\n");
                LBUG();
        }
#endif
        EXIT;
}
1737
/* Run the lock's blocking AST in CANCELING mode exactly once (guarded
 * by LDLM_FL_CANCEL) and mark the cancel done with LDLM_FL_BL_DONE.
 * Called and returns with the resource lock held, but drops it around
 * the AST call itself - callers must not rely on state that another
 * thread could change in that window. */
void ldlm_cancel_callback(struct ldlm_lock *lock)
{
        check_res_locked(lock->l_resource);
        if (!(lock->l_flags & LDLM_FL_CANCEL)) {
                lock->l_flags |= LDLM_FL_CANCEL;
                if (lock->l_blocking_ast) {
                        // l_check_no_ns_lock(ns);
                        /* AST may sleep/re-lock, so release the res lock
                         * for the duration of the callback */
                        unlock_res_and_lock(lock);
                        lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
                                             LDLM_CB_CANCELING);
                        lock_res_and_lock(lock);
                } else {
                        LDLM_DEBUG(lock, "no blocking ast");
                }
        }
        lock->l_flags |= LDLM_FL_BL_DONE;
}
1755
1756 void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
1757 {
1758         if (req->l_resource->lr_type != LDLM_PLAIN &&
1759             req->l_resource->lr_type != LDLM_IBITS)
1760                 return;
1761
1762         cfs_list_del_init(&req->l_sl_policy);
1763         cfs_list_del_init(&req->l_sl_mode);
1764 }
1765
/* Cancel \a lock: remove it from the waiting-AST list, run the cancel
 * (blocking) callback, unlink it from its resource and destroy it.
 * The lock must have no reader/writer references left - a lock that is
 * still in use cannot be cancelled. */
void ldlm_lock_cancel(struct ldlm_lock *lock)
{
        struct ldlm_resource *res;
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        res = lock->l_resource;
        ns  = ldlm_res_to_ns(res);

        /* Please do not, no matter how tempting, remove this LBUG without
         * talking to me first. -phik */
        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                LBUG();
        }

        ldlm_del_waiting_lock(lock);

        /* Releases cancel callback.
         * NOTE: drops and retakes the res lock internally, see
         * ldlm_cancel_callback(). */
        ldlm_cancel_callback(lock);

        /* Yes, second time, just in case it was added again while we were
           running with no res lock in ldlm_cancel_callback */
        ldlm_del_waiting_lock(lock);
        ldlm_resource_unlink_lock(lock);
        ldlm_lock_destroy_nolock(lock);

        /* only granted locks are accounted in the pool */
        if (lock->l_granted_mode == lock->l_req_mode)
                ldlm_pool_del(&ns->ns_pool, lock);

        /* Make sure we will not be called again for same lock what is possible
         * if not to zero out lock->l_granted_mode */
        lock->l_granted_mode = LCK_MINMODE;
        unlock_res_and_lock(lock);

        EXIT;
}
1805
1806 int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
1807 {
1808         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1809         int rc = -EINVAL;
1810         ENTRY;
1811
1812         if (lock) {
1813                 if (lock->l_ast_data == NULL)
1814                         lock->l_ast_data = data;
1815                 if (lock->l_ast_data == data)
1816                         rc = 0;
1817                 LDLM_LOCK_PUT(lock);
1818         }
1819         RETURN(rc);
1820 }
1821 EXPORT_SYMBOL(ldlm_lock_set_data);
1822
/* Iteration state shared between ldlm_cancel_locks_for_export() and its
 * per-lock hash callback. */
struct export_cl_data {
        struct obd_export       *ecl_exp;       /* export being cleaned up */
        int                     ecl_loop;       /* locks cancelled so far */
};
1827
/* Hash-iterator callback: cancel one lock that belongs to the export in
 * \a data, update the resource LVB, and reprocess the resource so other
 * waiters can be granted.  Always returns 0 to keep iterating. */
int ldlm_cancel_locks_for_export_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
                                    cfs_hlist_node_t *hnode, void *data)

{
        struct export_cl_data   *ecl = (struct export_cl_data *)data;
        struct obd_export       *exp  = ecl->ecl_exp;
        struct ldlm_lock     *lock = cfs_hash_object(hs, hnode);
        struct ldlm_resource *res;

        /* hold extra refs so res/lock survive the cancel below */
        res = ldlm_resource_getref(lock->l_resource);
        LDLM_LOCK_GET(lock);

        LDLM_DEBUG(lock, "export %p", exp);
        ldlm_res_lvbo_update(res, NULL, 1);
        ldlm_lock_cancel(lock);
        ldlm_reprocess_all(res);
        ldlm_resource_putref(res);
        LDLM_LOCK_RELEASE(lock);

        ecl->ecl_loop++;
        /* log only when ecl_loop is a power of two, to avoid flooding the
         * console on exports with many locks */
        if ((ecl->ecl_loop & -ecl->ecl_loop) == ecl->ecl_loop) {
                CDEBUG(D_INFO,
                       "Cancel lock %p for export %p (loop %d), still have "
                       "%d locks left on hash table.\n",
                       lock, exp, ecl->ecl_loop,
                       cfs_atomic_read(&hs->hs_count));
        }

        return 0;
}
1858
1859 void ldlm_cancel_locks_for_export(struct obd_export *exp)
1860 {
1861         struct export_cl_data   ecl = {
1862                 .ecl_exp        = exp,
1863                 .ecl_loop       = 0,
1864         };
1865
1866         cfs_hash_for_each_empty(exp->exp_lock_hash,
1867                                 ldlm_cancel_locks_for_export_cb, &ecl);
1868 }
1869
/**
 * Downgrade an exclusive lock.
 *
 * A fast variant of ldlm_lock_convert for conversion of exclusive
 * locks. The conversion is always successful.
 *
 * \param lock A lock to convert
 * \param new_mode new lock mode (must be LCK_COS; the lock must
 *                 currently be granted in PW or EX mode)
 */
void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode)
{
        ENTRY;

        LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
        LASSERT(new_mode == LCK_COS);

        lock_res_and_lock(lock);
        ldlm_resource_unlink_lock(lock);
        /*
         * Remove the lock from pool as it will be added again in
         * ldlm_grant_lock() called below.
         */
        ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);

        /* re-grant immediately in the downgraded mode */
        lock->l_req_mode = new_mode;
        ldlm_grant_lock(lock, NULL);
        unlock_res_and_lock(lock);
        /* the downgrade may unblock other waiters on the resource */
        ldlm_reprocess_all(lock->l_resource);

        EXIT;
}
1901
/* Convert \a lock to \a new_mode.  On the client the lock is parked on
 * the converting list (flags permitting); on the server the resource's
 * processing policy decides whether the converted lock can be granted
 * now.  Returns the lock's resource, or NULL on allocation failure or
 * (server side) when the policy stopped and the lock was restored to
 * its old mode and position.  *flags is updated with blocking/granted
 * state for the caller. */
struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                                        __u32 *flags)
{
        CFS_LIST_HEAD(rpc_list);
        struct ldlm_resource *res;
        struct ldlm_namespace *ns;
        int granted = 0;
#ifdef HAVE_SERVER_SUPPORT
        int old_mode;
        struct sl_insert_point prev;
#endif
        struct ldlm_interval *node;
        ENTRY;

        if (new_mode == lock->l_granted_mode) { // No changes? Just return.
                *flags |= LDLM_FL_BLOCK_GRANTED;
                RETURN(lock->l_resource);
        }

        /* I can't check the type of lock here because the bitlock of lock
         * is not held here, so do the allocation blindly. -jay */
        OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
        if (node == NULL)  /* Actually, this causes EDEADLOCK to be returned */
                RETURN(NULL);

        /* the only conversion currently supported is PR -> PW */
        LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR),
                 "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);

        lock_res_and_lock(lock);

        res = lock->l_resource;
        ns  = ldlm_res_to_ns(res);

#ifdef HAVE_SERVER_SUPPORT
        old_mode = lock->l_req_mode;
#endif
        lock->l_req_mode = new_mode;
        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
#ifdef HAVE_SERVER_SUPPORT
                /* remember the lock position where the lock might be
                 * added back to the granted list later and also
                 * remember the join mode for skiplist fixing. */
                prev.res_link = lock->l_res_link.prev;
                prev.mode_link = lock->l_sl_mode.prev;
                prev.policy_link = lock->l_sl_policy.prev;
#endif
                ldlm_resource_unlink_lock(lock);
        } else {
                ldlm_resource_unlink_lock(lock);
                if (res->lr_type == LDLM_EXTENT) {
                        /* FIXME: ugly code, I have to attach the lock to a
                         * interval node again since perhaps it will be granted
                         * soon */
                        CFS_INIT_LIST_HEAD(&node->li_group);
                        ldlm_interval_attach(node, lock);
                        /* node now owned by the lock; don't free it below */
                        node = NULL;
                }
        }

        /*
         * Remove old lock from the pool before adding the lock with new
         * mode below in ->policy()
         */
        ldlm_pool_del(&ns->ns_pool, lock);

        /* If this is a local resource, put it on the appropriate list. */
        if (ns_is_client(ldlm_res_to_ns(res))) {
                if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                } else {
                        /* This should never happen, because of the way the
                         * server handles conversions. */
                        LDLM_ERROR(lock, "Erroneous flags %d on local lock\n",
                                   *flags);
                        LBUG();

                        ldlm_grant_lock(lock, &rpc_list);
                        granted = 1;
                        /* FIXME: completion handling not with lr_lock held ! */
                        if (lock->l_completion_ast)
                                lock->l_completion_ast(lock, 0, NULL);
                }
#ifdef HAVE_SERVER_SUPPORT
        } else {
                int rc;
                ldlm_error_t err;
                int pflags = 0;
                ldlm_processing_policy policy;
                policy = ldlm_processing_policy_table[res->lr_type];
                rc = policy(lock, &pflags, 0, &err, &rpc_list);
                if (rc == LDLM_ITER_STOP) {
                        /* conversion refused: restore the old mode and put
                         * the lock back where it was on the granted list */
                        lock->l_req_mode = old_mode;
                        if (res->lr_type == LDLM_EXTENT)
                                ldlm_extent_add_lock(res, lock);
                        else
                                ldlm_granted_list_add_lock(lock, &prev);

                        res = NULL;
                } else {
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        granted = 1;
                }
        }
#else
        } else {
                CERROR("This is client-side-only module, cannot handle "
                       "LDLM_NAMESPACE_SERVER resource type lock.\n");
                LBUG();
        }
#endif
        unlock_res_and_lock(lock);

        if (granted)
                ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
        if (node)
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        RETURN(res);
}
2020
2021 void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
2022 {
2023         struct ldlm_lock *lock;
2024
2025         if (!((libcfs_debug | D_ERROR) & level))
2026                 return;
2027
2028         lock = ldlm_handle2lock(lockh);
2029         if (lock == NULL)
2030                 return;
2031
2032         LDLM_DEBUG_LIMIT(level, lock, "###");
2033
2034         LDLM_LOCK_PUT(lock);
2035 }
2036
/* Backend for the LDLM_DEBUG/LDLM_ERROR macros: print the caller's
 * message followed by a one-line description of \a lock, formatted per
 * the resource type (extent / flock / ibits / plain).  Handles locks
 * with no resource attached and derives the peer NID from the export
 * (or its import) when available. */
void _ldlm_lock_debug(struct ldlm_lock *lock,
                      struct libcfs_debug_msg_data *msgdata,
                      const char *fmt, ...)
{
        va_list args;
        struct obd_export *exp = lock->l_export;
        struct ldlm_resource *resource = lock->l_resource;
        char *nid = "local";

        va_start(args, fmt);

        /* prefer the export's own connection; fall back to the client
         * import's connection when the export has no direct one */
        if (exp && exp->exp_connection) {
                nid = libcfs_nid2str(exp->exp_connection->c_peer.nid);
        } else if (exp && exp->exp_obd != NULL) {
                struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
                nid = libcfs_nid2str(imp->imp_connection->c_peer.nid);
        }

        /* no resource yet: print what we can and bail out early */
        if (resource == NULL) {
                libcfs_debug_vmsg2(msgdata, fmt, args,
                       " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                       "res: \?\? rrc=\?\? type: \?\?\? flags: "LPX64" nid: %s "
                       "remote: "LPX64" expref: %d pid: %u timeout: %lu\n",
                       lock,
                       lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
                       lock->l_readers, lock->l_writers,
                       ldlm_lockname[lock->l_granted_mode],
                       ldlm_lockname[lock->l_req_mode],
                       lock->l_flags, nid, lock->l_remote_handle.cookie,
                       exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
                       lock->l_pid, lock->l_callback_timeout);
                va_end(args);
                return;
        }

        switch (resource->lr_type) {
        case LDLM_EXTENT:
                libcfs_debug_vmsg2(msgdata, fmt, args,
                       " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                       "res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64
                       "] (req "LPU64"->"LPU64") flags: "LPX64" nid: %s remote:"
                       " "LPX64" expref: %d pid: %u timeout %lu\n",
                       ldlm_lock_to_ns_name(lock), lock,
                       lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
                       lock->l_readers, lock->l_writers,
                       ldlm_lockname[lock->l_granted_mode],
                       ldlm_lockname[lock->l_req_mode],
                       resource->lr_name.name[0],
                       resource->lr_name.name[1],
                       cfs_atomic_read(&resource->lr_refcount),
                       ldlm_typename[resource->lr_type],
                       lock->l_policy_data.l_extent.start,
                       lock->l_policy_data.l_extent.end,
                       lock->l_req_extent.start, lock->l_req_extent.end,
                       lock->l_flags, nid, lock->l_remote_handle.cookie,
                       exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
                       lock->l_pid, lock->l_callback_timeout);
                break;

        case LDLM_FLOCK:
                libcfs_debug_vmsg2(msgdata, fmt, args,
                       " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                       "res: "LPU64"/"LPU64" rrc: %d type: %s pid: %d "
                       "["LPU64"->"LPU64"] flags: "LPX64" nid: %s remote: "LPX64
                       " expref: %d pid: %u timeout: %lu\n",
                       ldlm_lock_to_ns_name(lock), lock,
                       lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
                       lock->l_readers, lock->l_writers,
                       ldlm_lockname[lock->l_granted_mode],
                       ldlm_lockname[lock->l_req_mode],
                       resource->lr_name.name[0],
                       resource->lr_name.name[1],
                       cfs_atomic_read(&resource->lr_refcount),
                       ldlm_typename[resource->lr_type],
                       lock->l_policy_data.l_flock.pid,
                       lock->l_policy_data.l_flock.start,
                       lock->l_policy_data.l_flock.end,
                       lock->l_flags, nid, lock->l_remote_handle.cookie,
                       exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
                       lock->l_pid, lock->l_callback_timeout);
                break;

        case LDLM_IBITS:
                libcfs_debug_vmsg2(msgdata, fmt, args,
                       " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                       "res: "LPU64"/"LPU64" bits "LPX64" rrc: %d type: %s "
                       "flags: "LPX64" nid: %s remote: "LPX64" expref: %d "
                       "pid: %u timeout: %lu\n",
                       ldlm_lock_to_ns_name(lock),
                       lock, lock->l_handle.h_cookie,
                       cfs_atomic_read (&lock->l_refc),
                       lock->l_readers, lock->l_writers,
                       ldlm_lockname[lock->l_granted_mode],
                       ldlm_lockname[lock->l_req_mode],
                       resource->lr_name.name[0],
                       resource->lr_name.name[1],
                       lock->l_policy_data.l_inodebits.bits,
                       cfs_atomic_read(&resource->lr_refcount),
                       ldlm_typename[resource->lr_type],
                       lock->l_flags, nid, lock->l_remote_handle.cookie,
                       exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
                       lock->l_pid, lock->l_callback_timeout);
                break;

        default:
                libcfs_debug_vmsg2(msgdata, fmt, args,
                       " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                       "res: "LPU64"/"LPU64" rrc: %d type: %s flags: "LPX64" "
                       "nid: %s remote: "LPX64" expref: %d pid: %u timeout %lu"
                       "\n",
                       ldlm_lock_to_ns_name(lock),
                       lock, lock->l_handle.h_cookie,
                       cfs_atomic_read (&lock->l_refc),
                       lock->l_readers, lock->l_writers,
                       ldlm_lockname[lock->l_granted_mode],
                       ldlm_lockname[lock->l_req_mode],
                       resource->lr_name.name[0],
                       resource->lr_name.name[1],
                       cfs_atomic_read(&resource->lr_refcount),
                       ldlm_typename[resource->lr_type],
                       lock->l_flags, nid, lock->l_remote_handle.cookie,
                       exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
                       lock->l_pid, lock->l_callback_timeout);
                break;
        }
        va_end(args);
}
EXPORT_SYMBOL(_ldlm_lock_debug);