Whamcloud - gitweb
LU-56 ldlm: SMP improvement for ldlm_lock_cancel
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ldlm/ldlm_lock.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_LDLM
43
44 #ifdef __KERNEL__
45 # include <libcfs/libcfs.h>
46 # include <linux/lustre_intent.h>
47 #else
48 # include <liblustre.h>
49 #endif
50
51 #include <obd_class.h>
52 #include "ldlm_internal.h"
53
54 /* lock types */
55 char *ldlm_lockname[] = {
56         [0] "--",
57         [LCK_EX] "EX",
58         [LCK_PW] "PW",
59         [LCK_PR] "PR",
60         [LCK_CW] "CW",
61         [LCK_CR] "CR",
62         [LCK_NL] "NL",
63         [LCK_GROUP] "GROUP",
64         [LCK_COS] "COS"
65 };
66
67 char *ldlm_typename[] = {
68         [LDLM_PLAIN] "PLN",
69         [LDLM_EXTENT] "EXT",
70         [LDLM_FLOCK] "FLK",
71         [LDLM_IBITS] "IBT",
72 };
73
74 static ldlm_policy_wire_to_local_t ldlm_policy_wire18_to_local[] = {
75         [LDLM_PLAIN - LDLM_MIN_TYPE] ldlm_plain_policy_wire_to_local,
76         [LDLM_EXTENT - LDLM_MIN_TYPE] ldlm_extent_policy_wire_to_local,
77         [LDLM_FLOCK - LDLM_MIN_TYPE] ldlm_flock_policy_wire18_to_local,
78         [LDLM_IBITS - LDLM_MIN_TYPE] ldlm_ibits_policy_wire_to_local,
79 };
80
81 static ldlm_policy_wire_to_local_t ldlm_policy_wire21_to_local[] = {
82         [LDLM_PLAIN - LDLM_MIN_TYPE] ldlm_plain_policy_wire_to_local,
83         [LDLM_EXTENT - LDLM_MIN_TYPE] ldlm_extent_policy_wire_to_local,
84         [LDLM_FLOCK - LDLM_MIN_TYPE] ldlm_flock_policy_wire21_to_local,
85         [LDLM_IBITS - LDLM_MIN_TYPE] ldlm_ibits_policy_wire_to_local,
86 };
87
88 static ldlm_policy_local_to_wire_t ldlm_policy_local_to_wire[] = {
89         [LDLM_PLAIN - LDLM_MIN_TYPE] ldlm_plain_policy_local_to_wire,
90         [LDLM_EXTENT - LDLM_MIN_TYPE] ldlm_extent_policy_local_to_wire,
91         [LDLM_FLOCK - LDLM_MIN_TYPE] ldlm_flock_policy_local_to_wire,
92         [LDLM_IBITS - LDLM_MIN_TYPE] ldlm_ibits_policy_local_to_wire,
93 };
94
95 /**
96  * Converts lock policy from local format to on the wire lock_desc format
97  */
98 void ldlm_convert_policy_to_wire(ldlm_type_t type,
99                                  const ldlm_policy_data_t *lpolicy,
100                                  ldlm_wire_policy_data_t *wpolicy)
101 {
102         ldlm_policy_local_to_wire_t convert;
103
104         convert = ldlm_policy_local_to_wire[type - LDLM_MIN_TYPE];
105
106         convert(lpolicy, wpolicy);
107 }
108
109 /**
110  * Converts lock policy from on the wire lock_desc format to local format
111  */
112 void ldlm_convert_policy_to_local(struct obd_export *exp, ldlm_type_t type,
113                                   const ldlm_wire_policy_data_t *wpolicy,
114                                   ldlm_policy_data_t *lpolicy)
115 {
116         ldlm_policy_wire_to_local_t convert;
117         int new_client;
118
119         /** some badnes for 2.0.0 clients, but 2.0.0 isn't supported */
120         new_client = (exp->exp_connect_flags & OBD_CONNECT_FULL20) != 0;
121         if (new_client)
122                convert = ldlm_policy_wire21_to_local[type - LDLM_MIN_TYPE];
123         else
124                convert = ldlm_policy_wire18_to_local[type - LDLM_MIN_TYPE];
125
126         convert(wpolicy, lpolicy);
127 }
128
129 char *ldlm_it2str(int it)
130 {
131         switch (it) {
132         case IT_OPEN:
133                 return "open";
134         case IT_CREAT:
135                 return "creat";
136         case (IT_OPEN | IT_CREAT):
137                 return "open|creat";
138         case IT_READDIR:
139                 return "readdir";
140         case IT_GETATTR:
141                 return "getattr";
142         case IT_LOOKUP:
143                 return "lookup";
144         case IT_UNLINK:
145                 return "unlink";
146         case IT_GETXATTR:
147                 return "getxattr";
148         case IT_LAYOUT:
149                 return "layout";
150         default:
151                 CERROR("Unknown intent %d\n", it);
152                 return "UNKNOWN";
153         }
154 }
155
156 extern cfs_mem_cache_t *ldlm_lock_slab;
157
#ifdef HAVE_SERVER_SUPPORT
/* Per-lock-type processing (grant/conflict) policies, indexed by lr_type.
 * C99 designated initializers replace the obsolete GNU "[index] value" form.
 * NB: flock processing exists only in kernel builds. */
static ldlm_processing_policy ldlm_processing_policy_table[] = {
        [LDLM_PLAIN]  = ldlm_process_plain_lock,
        [LDLM_EXTENT] = ldlm_process_extent_lock,
# ifdef __KERNEL__
        [LDLM_FLOCK]  = ldlm_process_flock_lock,
# endif
        [LDLM_IBITS]  = ldlm_process_inodebits_lock,
};

/* Return the processing policy function for \a res based on its lock type. */
ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
{
        return ldlm_processing_policy_table[res->lr_type];
}
#endif /* HAVE_SERVER_SUPPORT */
173
/* Install \a arg as the intent policy handler for namespace \a ns. */
void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
{
        ns->ns_policy = arg;
}
178
179 /*
180  * REFCOUNTED LOCK OBJECTS
181  */
182
183
184 /*
185  * Lock refcounts, during creation:
186  *   - one special one for allocation, dec'd only once in destroy
187  *   - one for being a lock that's in-use
188  *   - one for the addref associated with a new lock
189  */
/* Take an additional reference on \a lock and return it, for call chaining. */
struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
{
        cfs_atomic_inc(&lock->l_refc);
        return lock;
}
195
/**
 * Drop a reference on \a lock.  When the last reference goes away the lock
 * must already have been destroyed (see ldlm_lock_destroy_internal()): it
 * releases its resource and export references, frees the LVB buffer and the
 * interval node, and frees the lock itself via RCU.
 */
void ldlm_lock_put(struct ldlm_lock *lock)
{
        ENTRY;

        LASSERT(lock->l_resource != LP_POISON);
        LASSERT(cfs_atomic_read(&lock->l_refc) > 0);
        if (cfs_atomic_dec_and_test(&lock->l_refc)) {
                struct ldlm_resource *res;

                LDLM_DEBUG(lock,
                           "final lock_put on destroyed lock, freeing it.");

                res = lock->l_resource;
                /* refcount can only reach zero after ldlm_lock_destroy() */
                LASSERT(lock->l_destroyed);
                LASSERT(cfs_list_empty(&lock->l_res_link));
                LASSERT(cfs_list_empty(&lock->l_pending_chain));

                lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
                                     LDLM_NSS_LOCKS);
                lu_ref_del(&res->lr_reference, "lock", lock);
                ldlm_resource_putref(res);
                lock->l_resource = NULL;
                if (lock->l_export) {
                        class_export_lock_put(lock->l_export, lock);
                        lock->l_export = NULL;
                }

                if (lock->l_lvb_data != NULL)
                        OBD_FREE(lock->l_lvb_data, lock->l_lvb_len);

                ldlm_interval_free(ldlm_interval_detach(lock));
                lu_ref_fini(&lock->l_reference);
                /* freed after an RCU grace period so concurrent handle
                 * lookups remain safe */
                OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle);
        }

        EXIT;
}
233
234 int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
235 {
236         int rc = 0;
237         if (!cfs_list_empty(&lock->l_lru)) {
238                 struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
239
240                 LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
241                 cfs_list_del_init(&lock->l_lru);
242                 if (lock->l_flags & LDLM_FL_SKIPPED)
243                         lock->l_flags &= ~LDLM_FL_SKIPPED;
244                 LASSERT(ns->ns_nr_unused > 0);
245                 ns->ns_nr_unused--;
246                 rc = 1;
247         }
248         return rc;
249 }
250
/**
 * Remove \a lock from the namespace LRU, taking ns_lock.
 * Server-side (l_ns_srv) locks are never on the LRU, so this is a no-op
 * for them.
 *
 * \retval 1 removed from the LRU, 0 otherwise
 */
int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
        int rc;

        ENTRY;
        if (lock->l_ns_srv) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                RETURN(0);
        }

        cfs_spin_lock(&ns->ns_lock);
        rc = ldlm_lock_remove_from_lru_nolock(lock);
        cfs_spin_unlock(&ns->ns_lock);
        EXIT;
        return rc;
}
268
/* Append \a lock to the tail (most-recently-used end) of the namespace
 * unused-lock list and stamp its last-used time.  Caller holds ns_lock;
 * the lock must not already be on the LRU. */
void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        lock->l_last_used = cfs_time_current();
        LASSERT(cfs_list_empty(&lock->l_lru));
        /* flock locks are never LRU-cached */
        LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
        cfs_list_add_tail(&lock->l_lru, &ns->ns_unused_list);
        LASSERT(ns->ns_nr_unused >= 0);
        ns->ns_nr_unused++;
}
280
/* Add \a lock to the namespace LRU under ns_lock. */
void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        cfs_spin_lock(&ns->ns_lock);
        ldlm_lock_add_to_lru_nolock(lock);
        cfs_spin_unlock(&ns->ns_lock);
        EXIT;
}
291
/* If \a lock is currently on the namespace LRU, move it to the
 * most-recently-used end (remove + re-add) and refresh its timestamp.
 * Server-side locks are never on the LRU and are left alone. */
void ldlm_lock_touch_in_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        if (lock->l_ns_srv) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                EXIT;
                return;
        }

        cfs_spin_lock(&ns->ns_lock);
        if (!cfs_list_empty(&lock->l_lru)) {
                ldlm_lock_remove_from_lru_nolock(lock);
                ldlm_lock_add_to_lru_nolock(lock);
        }
        cfs_spin_unlock(&ns->ns_lock);
        EXIT;
}
311
312 /* This used to have a 'strict' flag, which recovery would use to mark an
313  * in-use lock as needing-to-die.  Lest I am ever tempted to put it back, I
314  * shall explain why it's gone: with the new hash table scheme, once you call
315  * ldlm_lock_destroy, you can never drop your final references on this lock.
316  * Because it's not in the hash table anymore.  -phil */
317 int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
318 {
319         ENTRY;
320
321         if (lock->l_readers || lock->l_writers) {
322                 LDLM_ERROR(lock, "lock still has references");
323                 LBUG();
324         }
325
326         if (!cfs_list_empty(&lock->l_res_link)) {
327                 LDLM_ERROR(lock, "lock still on resource");
328                 LBUG();
329         }
330
331         if (lock->l_destroyed) {
332                 LASSERT(cfs_list_empty(&lock->l_lru));
333                 EXIT;
334                 return 0;
335         }
336         lock->l_destroyed = 1;
337
338         if (lock->l_export && lock->l_export->exp_lock_hash) {
339                 /* NB: it's safe to call cfs_hash_del() even lock isn't
340                  * in exp_lock_hash. */
341                 cfs_hash_del(lock->l_export->exp_lock_hash,
342                              &lock->l_remote_handle, &lock->l_exp_hash);
343         }
344
345         ldlm_lock_remove_from_lru(lock);
346         class_handle_unhash(&lock->l_handle);
347
348 #if 0
349         /* Wake anyone waiting for this lock */
350         /* FIXME: I should probably add yet another flag, instead of using
351          * l_export to only call this on clients */
352         if (lock->l_export)
353                 class_export_put(lock->l_export);
354         lock->l_export = NULL;
355         if (lock->l_export && lock->l_completion_ast)
356                 lock->l_completion_ast(lock, 0);
357 #endif
358         EXIT;
359         return 1;
360 }
361
/* Destroy \a lock, taking the resource and lock locks itself.  The hash
 * reference is dropped only the first time the lock is destroyed. */
void ldlm_lock_destroy(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        lock_res_and_lock(lock);
        first = ldlm_lock_destroy_internal(lock);
        unlock_res_and_lock(lock);

        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}
377
/* Same as ldlm_lock_destroy() but the caller already holds the resource
 * lock and the lock's l_lock. */
void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        first = ldlm_lock_destroy_internal(lock);
        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}
390
/* Handle-ops callback: take a reference on the lock behind a handle.
 * Called by portals_handle2object with the handle lock taken. */
static void lock_handle_addref(void *lock)
{
        LDLM_LOCK_GET((struct ldlm_lock *)lock);
}
396
/* Handle-ops callback: free the lock memory back to the ldlm lock slab. */
static void lock_handle_free(void *lock, int size)
{
        LASSERT(size == sizeof(struct ldlm_lock));
        OBD_SLAB_FREE(lock, ldlm_lock_slab, size);
}
402
/* portals-handle operations used when hashing ldlm lock handles */
struct portals_handle_ops lock_handle_ops = {
        .hop_addref = lock_handle_addref,
        .hop_free   = lock_handle_free,
};
407
/*
 * usage: pass in a resource on which you have done ldlm_resource_get
 *        new lock will take over the refcount.
 * returns: lock with refcount 2 - one for current caller and one for remote
 *
 * Allocates a lock from the slab, initializes all of its list heads and
 * bookkeeping fields, hashes its handle and bumps the namespace lock
 * counter.  Returns NULL on allocation failure (the caller keeps its
 * resource reference in that case).
 */
static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
{
        struct ldlm_lock *lock;
        ENTRY;

        if (resource == NULL)
                LBUG();

        OBD_SLAB_ALLOC_PTR_GFP(lock, ldlm_lock_slab, CFS_ALLOC_IO);
        if (lock == NULL)
                RETURN(NULL);

        cfs_spin_lock_init(&lock->l_lock);
        /* takes over the caller's resource reference */
        lock->l_resource = resource;
        lu_ref_add(&resource->lr_reference, "lock", lock);

        /* one ref for the caller, one for the remote/hash side */
        cfs_atomic_set(&lock->l_refc, 2);
        CFS_INIT_LIST_HEAD(&lock->l_res_link);
        CFS_INIT_LIST_HEAD(&lock->l_lru);
        CFS_INIT_LIST_HEAD(&lock->l_pending_chain);
        CFS_INIT_LIST_HEAD(&lock->l_bl_ast);
        CFS_INIT_LIST_HEAD(&lock->l_cp_ast);
        CFS_INIT_LIST_HEAD(&lock->l_rk_ast);
        cfs_waitq_init(&lock->l_waitq);
        lock->l_blocking_lock = NULL;
        CFS_INIT_LIST_HEAD(&lock->l_sl_mode);
        CFS_INIT_LIST_HEAD(&lock->l_sl_policy);
        CFS_INIT_HLIST_NODE(&lock->l_exp_hash);
        CFS_INIT_HLIST_NODE(&lock->l_exp_flock_hash);

        lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
                             LDLM_NSS_LOCKS);
        CFS_INIT_LIST_HEAD(&lock->l_handle.h_link);
        /* make the lock reachable via its lustre_handle cookie */
        class_handle_hash(&lock->l_handle, &lock_handle_ops);

        lu_ref_init(&lock->l_reference);
        lu_ref_add(&lock->l_reference, "hash", lock);
        lock->l_callback_timeout = 0;

#if LUSTRE_TRACKS_LOCK_EXP_REFS
        CFS_INIT_LIST_HEAD(&lock->l_exp_refs_link);
        lock->l_exp_refs_nr = 0;
        lock->l_exp_refs_target = NULL;
#endif
        CFS_INIT_LIST_HEAD(&lock->l_exp_list);

        RETURN(lock);
}
461
/**
 * Move \a lock from its current resource to the resource named by
 * \a new_resid (client-side only).  No-op if the name is unchanged.
 *
 * \retval 0       on success (or nothing to do)
 * \retval -ENOMEM if the new resource could not be obtained
 */
int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                              const struct ldlm_res_id *new_resid)
{
        struct ldlm_resource *oldres = lock->l_resource;
        struct ldlm_resource *newres;
        int type;
        ENTRY;

        LASSERT(ns_is_client(ns));

        lock_res_and_lock(lock);
        if (memcmp(new_resid, &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) == 0) {
                /* Nothing to do */
                unlock_res_and_lock(lock);
                RETURN(0);
        }

        LASSERT(new_resid->name[0] != 0);

        /* This function assumes that the lock isn't on any lists */
        LASSERT(cfs_list_empty(&lock->l_res_link));

        type = oldres->lr_type;
        unlock_res_and_lock(lock);

        newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
        if (newres == NULL)
                RETURN(-ENOMEM);

        lu_ref_add(&newres->lr_reference, "lock", lock);
        /*
         * To flip the lock from the old to the new resource, lock, oldres and
         * newres have to be locked. Resource spin-locks are nested within
         * lock->l_lock, and are taken in the memory address order to avoid
         * dead-locks.
         */
        cfs_spin_lock(&lock->l_lock);
        /* re-read: l_resource may have changed while we were unlocked */
        oldres = lock->l_resource;
        if (oldres < newres) {
                lock_res(oldres);
                lock_res_nested(newres, LRT_NEW);
        } else {
                lock_res(newres);
                lock_res_nested(oldres, LRT_NEW);
        }
        LASSERT(memcmp(new_resid, &oldres->lr_name,
                       sizeof oldres->lr_name) != 0);
        lock->l_resource = newres;
        unlock_res(oldres);
        /* releases newres' spinlock and lock->l_lock */
        unlock_res_and_lock(lock);

        /* ...and the flowers are still standing! */
        lu_ref_del(&oldres->lr_reference, "lock", lock);
        ldlm_resource_putref(oldres);

        RETURN(0);
}
520
521 /*
522  *  HANDLES
523  */
524
/* Fill \a lockh with the opaque handle cookie identifying \a lock. */
void ldlm_lock2handle(const struct ldlm_lock *lock, struct lustre_handle *lockh)
{
        lockh->cookie = lock->l_handle.h_cookie;
}
529
/* if flags: atomically get the lock and set the flags.
 *           Return NULL if flag already set
 *
 * Resolve a lustre_handle back into a referenced ldlm_lock.  Returns NULL
 * if the handle no longer maps to a lock, if the lock was destroyed, or if
 * any of \a flags were already set on it.  On success the caller owns one
 * lock reference (and a "handle" lu_ref).
 */
struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
                                     int flags)
{
        struct ldlm_lock *lock;
        ENTRY;

        LASSERT(handle);

        lock = class_handle2object(handle->cookie);
        if (lock == NULL)
                RETURN(NULL);

        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if (flags == 0 && !lock->l_destroyed) {
                /* fast path: no flags to set, lock still alive */
                lu_ref_add(&lock->l_reference, "handle", cfs_current());
                RETURN(lock);
        }

        lock_res_and_lock(lock);

        LASSERT(lock->l_resource != NULL);

        lu_ref_add_atomic(&lock->l_reference, "handle", cfs_current());
        if (unlikely(lock->l_destroyed)) {
                unlock_res_and_lock(lock);
                CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
                LDLM_LOCK_PUT(lock);
                RETURN(NULL);
        }

        /* fail if any requested flag is already set... */
        if (flags && (lock->l_flags & flags)) {
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
                RETURN(NULL);
        }

        /* ...otherwise set them atomically with the lookup */
        if (flags)
                lock->l_flags |= flags;

        unlock_res_and_lock(lock);
        RETURN(lock);
}
577
/* Fill \a desc (the on-wire lock descriptor) from \a lock, downgrading
 * IBITS locks to PLAIN for peers that do not support inodebits. */
void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
{
        struct obd_export *exp = lock->l_export?:lock->l_conn_export;
        /* INODEBITS_INTEROP: If the other side does not support
         * inodebits, reply with a plain lock descriptor.
         */
        if ((lock->l_resource->lr_type == LDLM_IBITS) &&
            (exp && !(exp->exp_connect_flags & OBD_CONNECT_IBITS))) {
                /* Make sure all the right bits are set in this lock we
                   are going to pass to client */
                LASSERTF(lock->l_policy_data.l_inodebits.bits ==
                         (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
                          MDS_INODELOCK_LAYOUT),
                         "Inappropriate inode lock bits during "
                         "conversion " LPU64 "\n",
                         lock->l_policy_data.l_inodebits.bits);

                ldlm_res2desc(lock->l_resource, &desc->l_resource);
                desc->l_resource.lr_type = LDLM_PLAIN;

                /* Convert "new" lock mode to something old client can
                   understand */
                if ((lock->l_req_mode == LCK_CR) ||
                    (lock->l_req_mode == LCK_CW))
                        desc->l_req_mode = LCK_PR;
                else
                        desc->l_req_mode = lock->l_req_mode;
                if ((lock->l_granted_mode == LCK_CR) ||
                    (lock->l_granted_mode == LCK_CW)) {
                        desc->l_granted_mode = LCK_PR;
                } else {
                        /* We never grant PW/EX locks to clients */
                        LASSERT((lock->l_granted_mode != LCK_PW) &&
                                (lock->l_granted_mode != LCK_EX));
                        desc->l_granted_mode = lock->l_granted_mode;
                }

                /* We do not copy policy here, because there is no
                   policy for plain locks */
        } else {
                /* normal case: copy resource, modes and converted policy */
                ldlm_res2desc(lock->l_resource, &desc->l_resource);
                desc->l_req_mode = lock->l_req_mode;
                desc->l_granted_mode = lock->l_granted_mode;
                ldlm_convert_policy_to_wire(lock->l_resource->lr_type,
                                            &lock->l_policy_data,
                                            &desc->l_policy_data);
        }
}
626
/* Queue a blocking-AST work item for \a lock onto \a work_list because it
 * conflicts with the incoming lock \a new.  Only the first conflict queues
 * the AST (LDLM_FL_AST_SENT guards against duplicates).  Takes references
 * on both locks for the work item. */
void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                           cfs_list_t *work_list)
{
        if ((lock->l_flags & LDLM_FL_AST_SENT) == 0) {
                LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
                lock->l_flags |= LDLM_FL_AST_SENT;
                /* If the enqueuing client said so, tell the AST recipient to
                 * discard dirty data, rather than writing back. */
                if (new->l_flags & LDLM_AST_DISCARD_DATA)
                        lock->l_flags |= LDLM_FL_DISCARD_DATA;
                LASSERT(cfs_list_empty(&lock->l_bl_ast));
                cfs_list_add(&lock->l_bl_ast, work_list);
                LDLM_LOCK_GET(lock);
                LASSERT(lock->l_blocking_lock == NULL);
                lock->l_blocking_lock = LDLM_LOCK_GET(new);
        }
}
644
/* Queue a completion-AST work item for a newly granted \a lock onto
 * \a work_list, at most once (guarded by LDLM_FL_CP_REQD).  Takes a lock
 * reference for the work item. */
void ldlm_add_cp_work_item(struct ldlm_lock *lock, cfs_list_t *work_list)
{
        if ((lock->l_flags & LDLM_FL_CP_REQD) == 0) {
                lock->l_flags |= LDLM_FL_CP_REQD;
                LDLM_DEBUG(lock, "lock granted; sending completion AST.");
                LASSERT(cfs_list_empty(&lock->l_cp_ast));
                cfs_list_add(&lock->l_cp_ast, work_list);
                LDLM_LOCK_GET(lock);
        }
}
655
/* must be called with lr_lock held */
/* Dispatch to the right AST work-item helper: a blocking AST if \a new
 * (the conflicting lock) is given, otherwise a completion AST. */
void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                            cfs_list_t *work_list)
{
        ENTRY;
        check_res_locked(lock->l_resource);
        if (new)
                ldlm_add_bl_work_item(lock, new, work_list);
        else
                ldlm_add_cp_work_item(lock, work_list);
        EXIT;
}
668
/* Add a reader/writer reference (per \a mode) to the lock behind \a lockh.
 * The handle must refer to a live lock (asserted). */
void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;

        lock = ldlm_handle2lock(lockh);
        LASSERT(lock != NULL);
        ldlm_lock_addref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
678
/* Bump the reader or writer count for \a mode, pull the lock off the LRU
 * (it is in use again) and take a lock reference.  Caller holds the
 * resource lock. */
void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        ldlm_lock_remove_from_lru(lock);
        /* shared modes count as readers */
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                lock->l_readers++;
                lu_ref_add_atomic(&lock->l_reference, "reader", lock);
        }
        /* exclusive-ish modes count as writers */
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                lock->l_writers++;
                lu_ref_add_atomic(&lock->l_reference, "writer", lock);
        }
        LDLM_LOCK_GET(lock);
        lu_ref_add_atomic(&lock->l_reference, "user", lock);
        LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}
694
/**
 * Attempts to addref a lock, and fails if lock is already LDLM_FL_CBPENDING
 * or destroyed.
 *
 * A lock that already has readers/writers may be addref'ed even with
 * CBPENDING set, since the existing users keep it alive anyway.
 *
 * \retval 0 success, lock was addref-ed
 *
 * \retval -EAGAIN lock is being canceled.
 */
int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;
        int               result;

        result = -EAGAIN;
        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                if (lock->l_readers != 0 || lock->l_writers != 0 ||
                    !(lock->l_flags & LDLM_FL_CBPENDING)) {
                        ldlm_lock_addref_internal_nolock(lock, mode);
                        result = 0;
                }
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return result;
}
722
/* only called for local locks */
/* Locked wrapper around ldlm_lock_addref_internal_nolock(). */
void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
{
        lock_res_and_lock(lock);
        ldlm_lock_addref_internal_nolock(lock, mode);
        unlock_res_and_lock(lock);
}
730
/* Only called in ldlm_flock_destroy and for local locks.
 * For LDLM_FLOCK type locks l_blocking_ast is NULL and
 * ldlm_lock_remove_from_lru() does nothing, so it is safe for
 * ldlm_flock_destroy to use this reduced path.
 *
 * Drops one reader or writer count for \a mode plus the matching lock
 * reference taken in addref.  Caller holds the resource lock. */
void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                LASSERT(lock->l_readers > 0);
                lu_ref_del(&lock->l_reference, "reader", lock);
                lock->l_readers--;
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                LASSERT(lock->l_writers > 0);
                lu_ref_del(&lock->l_reference, "writer", lock);
                lock->l_writers--;
        }

        lu_ref_del(&lock->l_reference, "user", lock);
        LDLM_LOCK_RELEASE(lock);    /* matches the LDLM_LOCK_GET() in addref */
}
752
/* Drop a reader/writer reference on \a lock and, if it was the last one,
 * decide the lock's fate: run the pending blocking callback (CBPENDING),
 * park it on the client LRU, or leave it alone.  Note the resource lock is
 * released before the blocking callback / LRU cancel is invoked. */
void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
{
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        ns = ldlm_lock_to_ns(lock);

        ldlm_lock_decref_internal_nolock(lock, mode);

        if (lock->l_flags & LDLM_FL_LOCAL &&
            !lock->l_readers && !lock->l_writers) {
                /* If this is a local lock on a server namespace and this was
                 * the last reference, cancel the lock. */
                CDEBUG(D_INFO, "forcing cancel of local lock\n");
                lock->l_flags |= LDLM_FL_CBPENDING;
        }

        if (!lock->l_readers && !lock->l_writers &&
            (lock->l_flags & LDLM_FL_CBPENDING)) {
                /* If we received a blocked AST and this was the last reference,
                 * run the callback. */
                if (lock->l_ns_srv && lock->l_export)
                        CERROR("FL_CBPENDING set on non-local lock--just a "
                               "warning\n");

                LDLM_DEBUG(lock, "final decref done on cbpending lock");

                LDLM_LOCK_GET(lock); /* dropped by bl thread */
                ldlm_lock_remove_from_lru(lock);
                unlock_res_and_lock(lock);

                if (lock->l_flags & LDLM_FL_FAIL_LOC)
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                /* run the blocking callback inline if requested (ATOMIC_CB)
                 * or if handing off to the bl thread failed */
                if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
                    ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
                        ldlm_handle_bl_callback(ns, NULL, lock);
        } else if (ns_is_client(ns) &&
                   !lock->l_readers && !lock->l_writers &&
                   !(lock->l_flags & LDLM_FL_NO_LRU) &&
                   !(lock->l_flags & LDLM_FL_BL_AST)) {

                LDLM_DEBUG(lock, "add lock into lru list");

                /* If this is a client-side namespace and this was the last
                 * reference, put it on the LRU. */
                ldlm_lock_add_to_lru(lock);
                unlock_res_and_lock(lock);

                if (lock->l_flags & LDLM_FL_FAIL_LOC)
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
                 * are not supported by the server, otherwise, it is done on
                 * enqueue. */
                if (!exp_connect_cancelset(lock->l_conn_export) &&
                    !ns_connect_lru_resize(ns))
                        ldlm_cancel_lru(ns, 0, LDLM_ASYNC, 0);
        } else {
                LDLM_DEBUG(lock, "do not add lock into lru list");
                unlock_res_and_lock(lock);
        }

        EXIT;
}
820
/* Drop a reader/writer reference (per \a mode) on the lock behind
 * \a lockh; the handle must still be valid (asserted). */
void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
828
/* This will drop a lock reference and mark it for destruction, but will not
 * necessarily cancel the lock before returning.
 * Setting CBPENDING before the decref makes the final-decref path in
 * ldlm_lock_decref_internal() run the cancellation machinery. */
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        ENTRY;

        LASSERT(lock != NULL);

        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        lock_res_and_lock(lock);
        lock->l_flags |= LDLM_FL_CBPENDING;
        unlock_res_and_lock(lock);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
845
/* Insertion position within the three skip-list-style granted-lock lists
 * (resource, same-mode, same-policy); filled in by search_granted_lock(). */
struct sl_insert_point {
        cfs_list_t *res_link;     /* position in the resource's granted list */
        cfs_list_t *mode_link;    /* position in the same-mode group list */
        cfs_list_t *policy_link;  /* position in the same-policy group list */
};
851
/*
 * search_granted_lock
 *
 * Description:
 *      Finds a position to insert the new lock into the granted list.
 *      The granted list is kept as groups of locks with equal mode; within
 *      an IBITS mode group, locks are further grouped by equal policy bits.
 * Parameters:
 *      queue [input]:  the granted list where search acts on;
 *      req [input]:    the lock whose position to be located;
 *      prev [output]:  positions within 3 lists to insert @req to
 * Return Value:
 *      filled @prev
 * NOTE: called by
 *  - ldlm_grant_lock_with_skiplist
 */
static void search_granted_lock(cfs_list_t *queue,
                                struct ldlm_lock *req,
                                struct sl_insert_point *prev)
{
        cfs_list_t *tmp;
        struct ldlm_lock *lock, *mode_end, *policy_end;
        ENTRY;

        cfs_list_for_each(tmp, queue) {
                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                /* last lock of the mode group this lock belongs to */
                mode_end = cfs_list_entry(lock->l_sl_mode.prev,
                                          struct ldlm_lock, l_sl_mode);

                if (lock->l_req_mode != req->l_req_mode) {
                        /* jump to last lock of mode group */
                        tmp = &mode_end->l_res_link;
                        continue;
                }

                /* suitable mode group is found */
                if (lock->l_resource->lr_type == LDLM_PLAIN) {
                        /* insert point is last lock of the mode group */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else if (lock->l_resource->lr_type == LDLM_IBITS) {
                        for (;;) {
                                /* last lock of the current policy group */
                                policy_end =
                                        cfs_list_entry(lock->l_sl_policy.prev,
                                                       struct ldlm_lock,
                                                       l_sl_policy);

                                if (lock->l_policy_data.l_inodebits.bits ==
                                    req->l_policy_data.l_inodebits.bits) {
                                        /* insert point is last lock of
                                         * the policy group */
                                        prev->res_link =
                                                &policy_end->l_res_link;
                                        prev->mode_link =
                                                &policy_end->l_sl_mode;
                                        prev->policy_link =
                                                &policy_end->l_sl_policy;
                                        EXIT;
                                        return;
                                }

                                if (policy_end == mode_end)
                                        /* done with mode group */
                                        break;

                                /* go to next policy group within mode group */
                                tmp = policy_end->l_res_link.next;
                                lock = cfs_list_entry(tmp, struct ldlm_lock,
                                                      l_res_link);
                        }  /* loop over policy groups within the mode group */

                        /* insert point is last lock of the mode group,
                         * new policy group is started */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else {
                        LDLM_ERROR(lock,"is not LDLM_PLAIN or LDLM_IBITS lock");
                        LBUG();
                }
        }

        /* insert point is last lock on the queue,
         * new mode group and new policy group are started */
        prev->res_link = queue->prev;
        prev->mode_link = &req->l_sl_mode;
        prev->policy_link = &req->l_sl_policy;
        EXIT;
        return;
}
946
/* Link @lock into the resource's granted list and the mode/policy skiplists
 * at the positions previously computed by search_granted_lock().
 * Must be called with the resource lock held (check_res_locked below). */
static void ldlm_granted_list_add_lock(struct ldlm_lock *lock,
                                       struct sl_insert_point *prev)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        ldlm_resource_dump(D_INFO, res);
        LDLM_DEBUG(lock, "About to add lock:");

        /* a destroyed lock must never be (re)linked onto a resource */
        if (lock->l_destroyed) {
                CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
                return;
        }

        /* the lock must not already be on any of the three lists */
        LASSERT(cfs_list_empty(&lock->l_res_link));
        LASSERT(cfs_list_empty(&lock->l_sl_mode));
        LASSERT(cfs_list_empty(&lock->l_sl_policy));

        cfs_list_add(&lock->l_res_link, prev->res_link);
        cfs_list_add(&lock->l_sl_mode, prev->mode_link);
        cfs_list_add(&lock->l_sl_policy, prev->policy_link);

        EXIT;
}
973
974 static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
975 {
976         struct sl_insert_point prev;
977         ENTRY;
978
979         LASSERT(lock->l_req_mode == lock->l_granted_mode);
980
981         search_granted_lock(&lock->l_resource->lr_granted, lock, &prev);
982         ldlm_granted_list_add_lock(lock, &prev);
983         EXIT;
984 }
985
/* Grant @lock: set its granted mode, link it on the resource's granted
 * list (skiplist groups for PLAIN/IBITS, interval tree for EXTENT), queue
 * a completion-AST work item if @work_list was supplied, and register the
 * lock with the namespace's lock pool.
 *
 * NOTE: called by
 *  - ldlm_lock_enqueue
 *  - ldlm_reprocess_queue
 *  - ldlm_lock_convert
 *
 * must be called with lr_lock held
 */
void ldlm_grant_lock(struct ldlm_lock *lock, cfs_list_t *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        lock->l_granted_mode = lock->l_req_mode;
        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
                ldlm_grant_lock_with_skiplist(lock);
        else if (res->lr_type == LDLM_EXTENT)
                ldlm_extent_add_lock(res, lock);
        else
                ldlm_resource_add_lock(res, &res->lr_granted, lock);

        /* keep track of the most restrictive mode granted on the resource */
        if (lock->l_granted_mode < res->lr_most_restr)
                res->lr_most_restr = lock->l_granted_mode;

        if (work_list && lock->l_completion_ast != NULL)
                ldlm_add_ast_work_item(lock, NULL, work_list);

        ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
        EXIT;
}
1017
/* Walk @queue looking for a lock compatible with *mode and @policy.
 * Returns a referenced lock or NULL.  See the flag descriptions in the
 * comment above ldlm_lock_match().  On a match, *mode is narrowed to the
 * actually matched mode. */
static struct ldlm_lock *search_queue(cfs_list_t *queue,
                                      ldlm_mode_t *mode,
                                      ldlm_policy_data_t *policy,
                                      struct ldlm_lock *old_lock,
                                      int flags, int unref)
{
        struct ldlm_lock *lock;
        cfs_list_t       *tmp;

        cfs_list_for_each(tmp, queue) {
                ldlm_mode_t match;

                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                /* when looking for a duplicate, stop at the original lock */
                if (lock == old_lock)
                        break;

                /* llite sometimes wants to match locks that will be
                 * canceled when their users drop, but we allow it to match
                 * if it passes in CBPENDING and the lock still has users.
                 * this is generally only going to be used by children
                 * whose parents already hold a lock so forward progress
                 * can still happen. */
                if (lock->l_flags & LDLM_FL_CBPENDING &&
                    !(flags & LDLM_FL_CBPENDING))
                        continue;
                if (!unref && lock->l_flags & LDLM_FL_CBPENDING &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        continue;

                /* the candidate's mode must be one of the requested modes */
                if (!(lock->l_req_mode & *mode))
                        continue;
                match = lock->l_req_mode;

                /* an extent lock matches only if it covers the whole
                 * requested extent */
                if (lock->l_resource->lr_type == LDLM_EXTENT &&
                    (lock->l_policy_data.l_extent.start >
                     policy->l_extent.start ||
                     lock->l_policy_data.l_extent.end < policy->l_extent.end))
                        continue;

                /* group locks additionally require the same group id */
                if (unlikely(match == LCK_GROUP) &&
                    lock->l_resource->lr_type == LDLM_EXTENT &&
                    lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                        continue;

                /* We match if we have existing lock with same or wider set
                   of bits. */
                if (lock->l_resource->lr_type == LDLM_IBITS &&
                     ((lock->l_policy_data.l_inodebits.bits &
                      policy->l_inodebits.bits) !=
                      policy->l_inodebits.bits))
                        continue;

                /* skip destroyed/failed locks unless @unref allows them */
                if (!unref &&
                    (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
                     lock->l_failed))
                        continue;

                if ((flags & LDLM_FL_LOCAL_ONLY) &&
                    !(lock->l_flags & LDLM_FL_LOCAL))
                        continue;

                /* TEST_LOCK only pins the lock; a real match takes a
                 * reader/writer reference in the matched mode */
                if (flags & LDLM_FL_TEST_LOCK) {
                        LDLM_LOCK_GET(lock);
                        ldlm_lock_touch_in_lru(lock);
                } else {
                        ldlm_lock_addref_internal_nolock(lock, match);
                }
                *mode = match;
                return lock;
        }

        return NULL;
}
1094
1095 void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
1096 {
1097         if (!lock->l_failed) {
1098                 lock->l_failed = 1;
1099                 cfs_waitq_broadcast(&lock->l_waitq);
1100         }
1101 }
1102 EXPORT_SYMBOL(ldlm_lock_fail_match_locked);
1103
/* Unlocked wrapper: take the resource lock, then fail the match
 * (see ldlm_lock_fail_match_locked()). */
void ldlm_lock_fail_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_fail_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_fail_match);
1111
/* Mark the lock's LVB as valid and wake up anyone in ldlm_lock_match()
 * waiting on LDLM_FL_LVB_READY.  Caller must hold the resource lock. */
void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
{
        lock->l_flags |= LDLM_FL_LVB_READY;
        cfs_waitq_broadcast(&lock->l_waitq);
}
1117
/* Unlocked wrapper: take the resource lock, then allow matching
 * (see ldlm_lock_allow_match_locked()). */
void ldlm_lock_allow_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_allow_match_locked(lock);
        unlock_res_and_lock(lock);
}
1124
1125 /* Can be called in two ways:
1126  *
1127  * If 'ns' is NULL, then lockh describes an existing lock that we want to look
1128  * for a duplicate of.
1129  *
1130  * Otherwise, all of the fields must be filled in, to match against.
1131  *
1132  * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
1133  *     server (ie, connh is NULL)
1134  * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
1135  *     list will be considered
1136  * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
1137  *     to be canceled can still be matched as long as they still have reader
 *     or writer references
1139  * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
1140  *     just tell us if we would have matched.
1141  *
1142  * Returns 1 if it finds an already-existing lock that is compatible; in this
 * case, lockh is filled in with an addref()ed lock
1144  *
1145  * we also check security context, if that failed we simply return 0 (to keep
1146  * caller code unchanged), the context failure will be discovered by caller
1147  * sometime later.
1148  */
/* See the flag/behavior description in the comment block above.
 * Returns the matched mode (a reference is held unless TEST_LOCK) or 0. */
ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
                            const struct ldlm_res_id *res_id, ldlm_type_t type,
                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
                            struct lustre_handle *lockh, int unref)
{
        struct ldlm_resource *res;
        struct ldlm_lock *lock, *old_lock = NULL;
        int rc = 0;
        ENTRY;

        /* ns == NULL means "find a duplicate of the lock in @lockh";
         * build the search key from that existing lock. */
        if (ns == NULL) {
                old_lock = ldlm_handle2lock(lockh);
                LASSERT(old_lock);

                ns = ldlm_lock_to_ns(old_lock);
                res_id = &old_lock->l_resource->lr_name;
                type = old_lock->l_resource->lr_type;
                mode = old_lock->l_req_mode;
        }

        res = ldlm_resource_get(ns, NULL, res_id, type, 0);
        if (res == NULL) {
                LASSERT(old_lock == NULL);
                RETURN(0);
        }

        LDLM_RESOURCE_ADDREF(res);
        lock_res(res);

        /* search the granted list first, then (unless restricted to
         * granted locks) the converting and waiting lists */
        lock = search_queue(&res->lr_granted, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        if (flags & LDLM_FL_BLOCK_GRANTED)
                GOTO(out, rc = 0);
        lock = search_queue(&res->lr_converting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        lock = search_queue(&res->lr_waiting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);

        EXIT;
 out:
        unlock_res(res);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);

        if (lock) {
                ldlm_lock2handle(lock, lockh);
                /* if the caller requires a valid LVB, wait for it to become
                 * ready (or for the lock to fail) before reporting a match */
                if ((flags & LDLM_FL_LVB_READY) &&
                    (!(lock->l_flags & LDLM_FL_LVB_READY))) {
                        struct l_wait_info lwi;
                        if (lock->l_completion_ast) {
                                int err = lock->l_completion_ast(lock,
                                                          LDLM_FL_WAIT_NOREPROC,
                                                                 NULL);
                                if (err) {
                                        /* drop the reference taken by
                                         * search_queue() before bailing out */
                                        if (flags & LDLM_FL_TEST_LOCK)
                                                LDLM_LOCK_RELEASE(lock);
                                        else
                                                ldlm_lock_decref_internal(lock,
                                                                          mode);
                                        rc = 0;
                                        goto out2;
                                }
                        }

                        lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
                                               NULL, LWI_ON_SIGNAL_NOOP, NULL);

                        /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
                        l_wait_event(lock->l_waitq,
                                     lock->l_flags & LDLM_FL_LVB_READY ||
                                     lock->l_failed,
                                     &lwi);
                        if (!(lock->l_flags & LDLM_FL_LVB_READY)) {
                                if (flags & LDLM_FL_TEST_LOCK)
                                        LDLM_LOCK_RELEASE(lock);
                                else
                                        ldlm_lock_decref_internal(lock, mode);
                                rc = 0;
                        }
                }
        }
 out2:
        if (rc) {
                LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[2] : policy->l_extent.start,
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[3] : policy->l_extent.end);

                /* check user's security context */
                if (lock->l_conn_export &&
                    sptlrpc_import_check_ctx(
                                class_exp2cliimp(lock->l_conn_export))) {
                        if (!(flags & LDLM_FL_TEST_LOCK))
                                ldlm_lock_decref_internal(lock, mode);
                        rc = 0;
                }

                if (flags & LDLM_FL_TEST_LOCK)
                        LDLM_LOCK_RELEASE(lock);

        } else if (!(flags & LDLM_FL_TEST_LOCK)) {/*less verbose for test-only*/
                LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
                                  LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
                                  type, mode, res_id->name[0], res_id->name[1],
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[2] :policy->l_extent.start,
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[3] : policy->l_extent.end);
        }
        if (old_lock)
                LDLM_LOCK_PUT(old_lock);

        /* mode may have been narrowed by search_queue() */
        return rc ? mode : 0;
}
1270
/* Revalidate the lock behind @lockh: if it is still usable (not destroyed,
 * failed, or pending cancellation with no users), take a reference in its
 * granted mode and return that mode; otherwise return 0.  When @bits is
 * non-NULL it receives the lock's inodebits policy bits.
 * NOTE(review): l_inodebits is read unconditionally -- presumably this is
 * only called for IBITS locks; confirm at the call sites. */
ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh,
                                        __u64 *bits)
{
        struct ldlm_lock *lock;
        ldlm_mode_t mode = 0;
        ENTRY;

        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                /* dead locks never revalidate */
                if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
                    lock->l_failed)
                        GOTO(out, mode);

                /* cancel-pending and unused: about to go away */
                if (lock->l_flags & LDLM_FL_CBPENDING &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        GOTO(out, mode);

                if (bits)
                        *bits = lock->l_policy_data.l_inodebits.bits;
                mode = lock->l_granted_mode;
                ldlm_lock_addref_internal_nolock(lock, mode);
        }

        EXIT;

out:
        if (lock != NULL) {
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return mode;
}
1305
/* Create a new lock on resource @res_id (creating the resource if needed).
 * Returns a referenced lock, or NULL on allocation/injection failure.
 * @cbs supplies the blocking/completion/glimpse/weigh callbacks; @lvb_len,
 * if non-zero, pre-allocates the lock value block buffer. */
struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
                                   const struct ldlm_res_id *res_id,
                                   ldlm_type_t type,
                                   ldlm_mode_t mode,
                                   const struct ldlm_callback_suite *cbs,
                                   void *data, __u32 lvb_len)
{
        struct ldlm_lock *lock;
        struct ldlm_resource *res;
        ENTRY;

        res = ldlm_resource_get(ns, NULL, res_id, type, 1);
        if (res == NULL)
                RETURN(NULL);

        lock = ldlm_lock_new(res);

        /* NOTE(review): on ldlm_lock_new() failure the reference obtained
         * from ldlm_resource_get() above is not visibly released here --
         * verify that ldlm_lock_new() drops it on its error path, or this
         * leaks a resource reference. */
        if (lock == NULL)
                RETURN(NULL);

        lock->l_req_mode = mode;
        lock->l_ast_data = data;
        lock->l_pid = cfs_curproc_pid();
        lock->l_ns_srv = !!ns_is_server(ns);
        if (cbs) {
                lock->l_blocking_ast = cbs->lcs_blocking;
                lock->l_completion_ast = cbs->lcs_completion;
                lock->l_glimpse_ast = cbs->lcs_glimpse;
                lock->l_weigh_ast = cbs->lcs_weigh;
        }

        lock->l_tree_node = NULL;
        /* if this is the extent lock, allocate the interval tree node */
        if (type == LDLM_EXTENT) {
                if (ldlm_interval_alloc(lock) == NULL)
                        GOTO(out, 0);
        }

        if (lvb_len) {
                lock->l_lvb_len = lvb_len;
                OBD_ALLOC(lock->l_lvb_data, lvb_len);
                if (lock->l_lvb_data == NULL)
                        GOTO(out, 0);
        }

        /* fault-injection point for testing lock-creation failure paths */
        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK))
                GOTO(out, 0);

        RETURN(lock);

out:
        ldlm_lock_destroy(lock);
        LDLM_LOCK_RELEASE(lock);
        return NULL;
}
1362
/* Enqueue *lockp on its resource: run the server-side intent policy when
 * applicable, then either grant the lock, queue it on the converting or
 * waiting list (client / replay), or run the per-type processing policy
 * (server).  May replace *lockp when the intent policy substitutes an
 * already-granted lock (signalled via LDLM_FL_LOCK_CHANGED in *flags). */
ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                               struct ldlm_lock **lockp,
                               void *cookie, int *flags)
{
        struct ldlm_lock *lock = *lockp;
        struct ldlm_resource *res = lock->l_resource;
        int local = ns_is_client(ldlm_res_to_ns(res));
#ifdef HAVE_SERVER_SUPPORT
        ldlm_processing_policy policy;
#endif
        ldlm_error_t rc = ELDLM_OK;
        struct ldlm_interval *node = NULL;
        ENTRY;

        lock->l_last_activity = cfs_time_current_sec();
        /* policies are not executed on the client or during replay */
        if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
            && !local && ns->ns_policy) {
                rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
                                   NULL);
                if (rc == ELDLM_LOCK_REPLACED) {
                        /* The lock that was returned has already been granted,
                         * and placed into lockp.  If it's not the same as the
                         * one we passed in, then destroy the old one and our
                         * work here is done. */
                        if (lock != *lockp) {
                                ldlm_lock_destroy(lock);
                                LDLM_LOCK_RELEASE(lock);
                        }
                        *flags |= LDLM_FL_LOCK_CHANGED;
                        RETURN(0);
                } else if (rc != ELDLM_OK ||
                           (rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
                        ldlm_lock_destroy(lock);
                        RETURN(rc);
                }
        }

        /* For a replaying lock, it might be already in granted list. So
         * unlinking the lock will cause the interval node to be freed, we
         * have to allocate the interval node early otherwise we can't regrant
         * this lock in the future. - jay */
        if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
                OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);

        lock_res_and_lock(lock);
        if (local && lock->l_req_mode == lock->l_granted_mode) {
                /* The server returned a blocked lock, but it was granted
                 * before we got a chance to actually enqueue it.  We don't
                 * need to do anything else. */
                *flags &= ~(LDLM_FL_BLOCK_GRANTED |
                            LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
                GOTO(out, ELDLM_OK);
        }

        ldlm_resource_unlink_lock(lock);
        if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
                if (node == NULL) {
                        ldlm_lock_destroy_nolock(lock);
                        GOTO(out, rc = -ENOMEM);
                }

                CFS_INIT_LIST_HEAD(&node->li_group);
                ldlm_interval_attach(node, lock);
                node = NULL;
        }

        /* Some flags from the enqueue want to make it into the AST, via the
         * lock's l_flags. */
        lock->l_flags |= *flags & LDLM_AST_DISCARD_DATA;

        /* This distinction between local lock trees is very important; a client
         * namespace only has information about locks taken by that client, and
         * thus doesn't have enough information to decide for itself if it can
         * be granted (below).  In this case, we do exactly what the server
         * tells us to do, as dictated by the 'flags'.
         *
         * We do exactly the same thing during recovery, when the server is
         * more or less trusting the clients not to lie.
         *
         * FIXME (bug 268): Detect obvious lies by checking compatibility in
         * granted/converting queues. */
        if (local) {
                if (*flags & LDLM_FL_BLOCK_CONV)
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                else
                        ldlm_grant_lock(lock, NULL);
                GOTO(out, ELDLM_OK);
#ifdef HAVE_SERVER_SUPPORT
        } else if (*flags & LDLM_FL_REPLAY) {
                if (*flags & LDLM_FL_BLOCK_CONV) {
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                        GOTO(out, ELDLM_OK);
                } else if (*flags & LDLM_FL_BLOCK_WAIT) {
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                        GOTO(out, ELDLM_OK);
                } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
                        ldlm_grant_lock(lock, NULL);
                        GOTO(out, ELDLM_OK);
                }
                /* If no flags, fall through to normal enqueue path. */
        }

        policy = ldlm_processing_policy_table[res->lr_type];
        policy(lock, flags, 1, &rc, NULL);
        GOTO(out, rc);
#else
        } else {
                CERROR("This is client-side-only module, cannot handle "
                       "LDLM_NAMESPACE_SERVER resource type lock.\n");
                LBUG();
        }
#endif

out:
        unlock_res_and_lock(lock);
        if (node)
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        return rc;
}
1485
1486 #ifdef HAVE_SERVER_SUPPORT
1487 /* Must be called with namespace taken: queue is waiting or converting. */
1488 int ldlm_reprocess_queue(struct ldlm_resource *res, cfs_list_t *queue,
1489                          cfs_list_t *work_list)
1490 {
1491         cfs_list_t *tmp, *pos;
1492         ldlm_processing_policy policy;
1493         int flags;
1494         int rc = LDLM_ITER_CONTINUE;
1495         ldlm_error_t err;
1496         ENTRY;
1497
1498         check_res_locked(res);
1499
1500         policy = ldlm_processing_policy_table[res->lr_type];
1501         LASSERT(policy);
1502
1503         cfs_list_for_each_safe(tmp, pos, queue) {
1504                 struct ldlm_lock *pending;
1505                 pending = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
1506
1507                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
1508
1509                 flags = 0;
1510                 rc = policy(pending, &flags, 0, &err, work_list);
1511                 if (rc != LDLM_ITER_CONTINUE)
1512                         break;
1513         }
1514
1515         RETURN(rc);
1516 }
1517 #endif
1518
/* Producer callback for blocking ASTs: pop the next lock from arg->list,
 * mark its blocking AST as run, and invoke l_blocking_ast() with a
 * descriptor of the conflicting (blocking) lock.  Returns -ENOENT once the
 * list is exhausted. */
static int
ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
{
        struct ldlm_cb_set_arg *arg = opaq;
        struct ldlm_lock_desc   d;
        int                     rc;
        struct ldlm_lock       *lock;
        ENTRY;

        if (cfs_list_empty(arg->list))
                RETURN(-ENOENT);

        lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);

        /* nobody should touch l_bl_ast */
        lock_res_and_lock(lock);
        cfs_list_del_init(&lock->l_bl_ast);

        LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
        LASSERT(lock->l_bl_ast_run == 0);
        LASSERT(lock->l_blocking_lock);
        lock->l_bl_ast_run++;
        unlock_res_and_lock(lock);

        ldlm_lock2desc(lock->l_blocking_lock, &d);

        rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
        /* drop the reference on the blocking lock and our list reference */
        LDLM_LOCK_RELEASE(lock->l_blocking_lock);
        lock->l_blocking_lock = NULL;
        LDLM_LOCK_RELEASE(lock);

        RETURN(rc);
}
1552
/* Producer callback for completion ASTs: pop the next lock from arg->list,
 * clear its CP_REQD flag and invoke the saved completion callback.
 * Returns -ENOENT once the list is exhausted. */
static int
ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
{
        struct ldlm_cb_set_arg  *arg = opaq;
        int                      rc = 0;
        struct ldlm_lock        *lock;
        ldlm_completion_callback completion_callback;
        ENTRY;

        if (cfs_list_empty(arg->list))
                RETURN(-ENOENT);

        lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);

        /* It's possible to receive a completion AST before we've set
         * the l_completion_ast pointer: either because the AST arrived
         * before the reply, or simply because there's a small race
         * window between receiving the reply and finishing the local
         * enqueue. (bug 842)
         *
         * This can't happen with the blocking_ast, however, because we
         * will never call the local blocking_ast until we drop our
         * reader/writer reference, which we won't do until we get the
         * reply and finish enqueueing. */

        /* nobody should touch l_cp_ast */
        lock_res_and_lock(lock);
        cfs_list_del_init(&lock->l_cp_ast);
        LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
        /* save l_completion_ast since it can be changed by
         * mds_intent_policy(), see bug 14225 */
        completion_callback = lock->l_completion_ast;
        lock->l_flags &= ~LDLM_FL_CP_REQD;
        unlock_res_and_lock(lock);

        if (completion_callback != NULL)
                rc = completion_callback(lock, 0, (void *)arg);
        LDLM_LOCK_RELEASE(lock);

        RETURN(rc);
}
1594
/**
 * Process one lock from the revoke list: pop it, build a faked EX blocking
 * request descriptor and invoke its blocking AST.  Used as the producer
 * callback of the flow-controlled request set built in ldlm_run_ast_work().
 *
 * \retval -ENOENT  the list is exhausted (stops the producer)
 * \retval rc       return value of the lock's blocking AST otherwise
 */
static int
ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
{
        struct ldlm_cb_set_arg *arg = opaq;
        struct ldlm_lock_desc   desc;
        int                     rc;
        struct ldlm_lock       *lock;
        ENTRY;

        if (cfs_list_empty(arg->list))
                RETURN(-ENOENT);

        /* Detach the first lock; l_rk_ast links it on the revoke list. */
        lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_rk_ast);
        cfs_list_del_init(&lock->l_rk_ast);

        /* The descriptor only pretends an exclusive request is conflicting,
         * so the client gives the lock up entirely. */
        ldlm_lock2desc(lock, &desc);
        desc.l_req_mode = LCK_EX;
        desc.l_granted_mode = 0;

        rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
        /* Drop the reference taken when the lock was queued for revocation. */
        LDLM_LOCK_RELEASE(lock);

        RETURN(rc);
}
1620
/**
 * Deliver the pending ASTs for all locks on \a rpc_list.
 *
 * Picks the per-lock producer function matching \a ast_type
 * (blocking / completion / revoke), then drives it through a
 * flow-controlled ptlrpc request set so at most ns_max_parallel_ast
 * RPCs are in flight at once.
 *
 * \retval 0         all ASTs processed
 * \retval -ENOMEM   argument or request-set allocation failed
 * \retval -ERESTART at least one AST handler asked for a reprocess pass
 */
int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
                      ldlm_desc_ast_t ast_type)
{
        struct ldlm_cb_set_arg *arg;
        set_producer_func       work_ast_lock;
        int                     rc;

        if (cfs_list_empty(rpc_list))
                RETURN(0);

        OBD_ALLOC_PTR(arg);
        if (arg == NULL)
                RETURN(-ENOMEM);

        cfs_atomic_set(&arg->restart, 0);
        arg->list = rpc_list;

        /* Map the requested AST type onto the callback opcode and the
         * producer that emits one RPC per queued lock. */
        switch (ast_type) {
                case LDLM_WORK_BL_AST:
                        arg->type = LDLM_BL_CALLBACK;
                        work_ast_lock = ldlm_work_bl_ast_lock;
                        break;
                case LDLM_WORK_CP_AST:
                        arg->type = LDLM_CP_CALLBACK;
                        work_ast_lock = ldlm_work_cp_ast_lock;
                        break;
                case LDLM_WORK_REVOKE_AST:
                        /* revocation is delivered as a blocking callback */
                        arg->type = LDLM_BL_CALLBACK;
                        work_ast_lock = ldlm_work_revoke_ast_lock;
                        break;
                default:
                        LBUG();
        }

        /* We create a ptlrpc request set with flow control extension.
         * This request set will use the work_ast_lock function to produce new
         * requests and will send a new request each time one completes in order
         * to keep the number of requests in flight to ns_max_parallel_ast.
         * A zero ns_max_parallel_ast means "unlimited". */
        arg->set = ptlrpc_prep_fcset(ns->ns_max_parallel_ast ? : UINT_MAX,
                                     work_ast_lock, arg);
        if (arg->set == NULL)
                GOTO(out, rc = -ENOMEM);

        ptlrpc_set_wait(arg->set);
        ptlrpc_set_destroy(arg->set);

        /* Any AST handler may have flagged that granting must be retried. */
        rc = cfs_atomic_read(&arg->restart) ? -ERESTART : 0;
        GOTO(out, rc);
out:
        OBD_FREE_PTR(arg);
        return rc;
}
1673
/* Namespace-iterator callback: reprocess one resource and keep walking.
 * The closure argument is unused. */
static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
{
        ldlm_reprocess_all(res);
        return LDLM_ITER_CONTINUE;
}
1679
1680 static int ldlm_reprocess_res(cfs_hash_t *hs, cfs_hash_bd_t *bd,
1681                               cfs_hlist_node_t *hnode, void *arg)
1682 {
1683         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
1684         int    rc;
1685
1686         rc = reprocess_one_queue(res, arg);
1687
1688         return rc == LDLM_ITER_STOP;
1689 }
1690
1691 void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
1692 {
1693         ENTRY;
1694
1695         if (ns != NULL) {
1696                 cfs_hash_for_each_nolock(ns->ns_rs_hash,
1697                                          ldlm_reprocess_res, NULL);
1698         }
1699         EXIT;
1700 }
1701
/**
 * Try to grant waiting/converting locks on \a res and send the resulting
 * completion ASTs.  Server-side only: on a client namespace this is a no-op
 * (or an LBUG when built without server support).  Repeats from scratch if
 * AST delivery reports -ERESTART.
 */
void ldlm_reprocess_all(struct ldlm_resource *res)
{
        CFS_LIST_HEAD(rpc_list);

#ifdef HAVE_SERVER_SUPPORT
        int rc;
        ENTRY;
        /* Local lock trees don't get reprocessed. */
        if (ns_is_client(ldlm_res_to_ns(res))) {
                EXIT;
                return;
        }

restart:
        lock_res(res);
        /* Converting locks have priority; only fall through to the waiting
         * queue when the converting queue was fully drained. */
        rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
        if (rc == LDLM_ITER_CONTINUE)
                ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
        unlock_res(res);

        rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
                               LDLM_WORK_CP_AST);
        if (rc == -ERESTART) {
                /* all queued locks were consumed before the restart request */
                LASSERT(cfs_list_empty(&rpc_list));
                goto restart;
        }
#else
        ENTRY;
        if (!ns_is_client(ldlm_res_to_ns(res))) {
                CERROR("This is client-side-only module, cannot handle "
                       "LDLM_NAMESPACE_SERVER resource type lock.\n");
                LBUG();
        }
#endif
        EXIT;
}
1738
/**
 * Run the lock's blocking AST with LDLM_CB_CANCELING, exactly once
 * (guarded by LDLM_FL_CANCEL).  Called and returns with the resource
 * lock held, but drops it around the callback itself, so other threads
 * may touch the lock in that window.
 */
void ldlm_cancel_callback(struct ldlm_lock *lock)
{
        check_res_locked(lock->l_resource);
        if (!(lock->l_flags & LDLM_FL_CANCEL)) {
                lock->l_flags |= LDLM_FL_CANCEL;
                if (lock->l_blocking_ast) {
                        // l_check_no_ns_lock(ns);
                        /* the AST may block/re-enter LDLM; call it unlocked */
                        unlock_res_and_lock(lock);
                        lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
                                             LDLM_CB_CANCELING);
                        lock_res_and_lock(lock);
                } else {
                        LDLM_DEBUG(lock, "no blocking ast");
                }
        }
        /* mark that the blocking-AST phase of cancellation is complete */
        lock->l_flags |= LDLM_FL_BL_DONE;
}
1756
1757 void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
1758 {
1759         if (req->l_resource->lr_type != LDLM_PLAIN &&
1760             req->l_resource->lr_type != LDLM_IBITS)
1761                 return;
1762
1763         cfs_list_del_init(&req->l_sl_policy);
1764         cfs_list_del_init(&req->l_sl_mode);
1765 }
1766
/**
 * Cancel \a lock locally: run its cancel callback, unlink it from the
 * resource queues and destroy it.  The lock must have no remaining
 * reader/writer references.  Granted locks are also removed from the
 * namespace pool.
 */
void ldlm_lock_cancel(struct ldlm_lock *lock)
{
        struct ldlm_resource *res;
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        res = lock->l_resource;
        ns  = ldlm_res_to_ns(res);

        /* Please do not, no matter how tempting, remove this LBUG without
         * talking to me first. -phik */
        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                LBUG();
        }

        /* stop the lock-timeout machinery before tearing the lock down */
        if (lock->l_waited)
                ldlm_del_waiting_lock(lock);

        /* Releases cancel callback. */
        ldlm_cancel_callback(lock);

        /* Yes, second time, just in case it was added again while we were
           running with no res lock in ldlm_cancel_callback */
        if (lock->l_waited)
                ldlm_del_waiting_lock(lock);

        ldlm_resource_unlink_lock(lock);
        ldlm_lock_destroy_nolock(lock);

        /* only granted locks were accounted in the pool */
        if (lock->l_granted_mode == lock->l_req_mode)
                ldlm_pool_del(&ns->ns_pool, lock);

        /* Make sure we will not be called again for same lock what is possible
         * if not to zero out lock->l_granted_mode */
        lock->l_granted_mode = LCK_MINMODE;
        unlock_res_and_lock(lock);

        EXIT;
}
1809
1810 int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
1811 {
1812         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1813         int rc = -EINVAL;
1814         ENTRY;
1815
1816         if (lock) {
1817                 if (lock->l_ast_data == NULL)
1818                         lock->l_ast_data = data;
1819                 if (lock->l_ast_data == data)
1820                         rc = 0;
1821                 LDLM_LOCK_PUT(lock);
1822         }
1823         RETURN(rc);
1824 }
1825 EXPORT_SYMBOL(ldlm_lock_set_data);
1826
/* Per-walk state for ldlm_cancel_locks_for_export(). */
struct export_cl_data {
        struct obd_export       *ecl_exp;   /* export whose locks we cancel */
        int                     ecl_loop;   /* number of locks cancelled so far */
};
1831
/**
 * Hash-walk callback: cancel a single lock belonging to an export.
 * Takes temporary references on the resource and lock so both survive
 * the cancel/reprocess sequence.  Always returns 0 to keep the walk going.
 */
int ldlm_cancel_locks_for_export_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
                                    cfs_hlist_node_t *hnode, void *data)

{
        struct export_cl_data   *ecl = (struct export_cl_data *)data;
        struct obd_export       *exp  = ecl->ecl_exp;
        struct ldlm_lock     *lock = cfs_hash_object(hs, hnode);
        struct ldlm_resource *res;

        res = ldlm_resource_getref(lock->l_resource);
        LDLM_LOCK_GET(lock);

        LDLM_DEBUG(lock, "export %p", exp);
        /* refresh the LVB before the lock disappears */
        ldlm_res_lvbo_update(res, NULL, 1);
        ldlm_lock_cancel(lock);
        /* the cancelled lock may unblock others queued on this resource */
        ldlm_reprocess_all(res);
        ldlm_resource_putref(res);
        LDLM_LOCK_RELEASE(lock);

        ecl->ecl_loop++;
        /* (x & -x) == x is true exactly when x is a power of two, so the
         * progress message is rate-limited to loop counts 1, 2, 4, 8, ... */
        if ((ecl->ecl_loop & -ecl->ecl_loop) == ecl->ecl_loop) {
                CDEBUG(D_INFO,
                       "Cancel lock %p for export %p (loop %d), still have "
                       "%d locks left on hash table.\n",
                       lock, exp, ecl->ecl_loop,
                       cfs_atomic_read(&hs->hs_count));
        }

        return 0;
}
1862
1863 void ldlm_cancel_locks_for_export(struct obd_export *exp)
1864 {
1865         struct export_cl_data   ecl = {
1866                 .ecl_exp        = exp,
1867                 .ecl_loop       = 0,
1868         };
1869
1870         cfs_hash_for_each_empty(exp->exp_lock_hash,
1871                                 ldlm_cancel_locks_for_export_cb, &ecl);
1872 }
1873
1874 /**
1875  * Downgrade an exclusive lock.
1876  *
1877  * A fast variant of ldlm_lock_convert for convertion of exclusive
1878  * locks. The convertion is always successful.
1879  *
1880  * \param lock A lock to convert
1881  * \param new_mode new lock mode
1882  */
1883 void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode)
1884 {
1885         ENTRY;
1886
1887         LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
1888         LASSERT(new_mode == LCK_COS);
1889
1890         lock_res_and_lock(lock);
1891         ldlm_resource_unlink_lock(lock);
1892         /*
1893          * Remove the lock from pool as it will be added again in
1894          * ldlm_grant_lock() called below.
1895          */
1896         ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
1897
1898         lock->l_req_mode = new_mode;
1899         ldlm_grant_lock(lock, NULL);
1900         unlock_res_and_lock(lock);
1901         ldlm_reprocess_all(lock->l_resource);
1902
1903         EXIT;
1904 }
1905
/**
 * Convert \a lock to \a new_mode.  Currently only the PR -> PW upgrade is
 * supported (enforced by LASSERTF below); an unchanged mode returns early
 * with LDLM_FL_BLOCK_GRANTED set.
 *
 * \retval lock->l_resource  conversion done or queued
 * \retval NULL  interval-node allocation failed (caller treats as EDEADLOCK)
 *               or, server-side, the processing policy said STOP and the
 *               lock was re-granted in its old mode
 */
struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                                        __u32 *flags)
{
        CFS_LIST_HEAD(rpc_list);
        struct ldlm_resource *res;
        struct ldlm_namespace *ns;
        int granted = 0;
#ifdef HAVE_SERVER_SUPPORT
        int old_mode;
        struct sl_insert_point prev;
#endif
        struct ldlm_interval *node;
        ENTRY;

        if (new_mode == lock->l_granted_mode) { // No changes? Just return.
                *flags |= LDLM_FL_BLOCK_GRANTED;
                RETURN(lock->l_resource);
        }

        /* I can't check the type of lock here because the bitlock of lock
         * is not held here, so do the allocation blindly. -jay */
        OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
        if (node == NULL)  /* Actually, this causes EDEADLOCK to be returned */
                RETURN(NULL);

        LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR),
                 "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);

        lock_res_and_lock(lock);

        res = lock->l_resource;
        ns  = ldlm_res_to_ns(res);

#ifdef HAVE_SERVER_SUPPORT
        /* keep the old mode so the lock can be restored on policy STOP */
        old_mode = lock->l_req_mode;
#endif
        lock->l_req_mode = new_mode;
        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
#ifdef HAVE_SERVER_SUPPORT
                /* remember the lock position where the lock might be
                 * added back to the granted list later and also
                 * remember the join mode for skiplist fixing. */
                prev.res_link = lock->l_res_link.prev;
                prev.mode_link = lock->l_sl_mode.prev;
                prev.policy_link = lock->l_sl_policy.prev;
#endif
                ldlm_resource_unlink_lock(lock);
        } else {
                ldlm_resource_unlink_lock(lock);
                if (res->lr_type == LDLM_EXTENT) {
                        /* FIXME: ugly code, I have to attach the lock to a
                         * interval node again since perhaps it will be granted
                         * soon */
                        CFS_INIT_LIST_HEAD(&node->li_group);
                        ldlm_interval_attach(node, lock);
                        /* ownership transferred to the lock; don't free below */
                        node = NULL;
                }
        }

        /*
         * Remove old lock from the pool before adding the lock with new
         * mode below in ->policy()
         */
        ldlm_pool_del(&ns->ns_pool, lock);

        /* If this is a local resource, put it on the appropriate list. */
        if (ns_is_client(ldlm_res_to_ns(res))) {
                if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                } else {
                        /* This should never happen, because of the way the
                         * server handles conversions. */
                        LDLM_ERROR(lock, "Erroneous flags %d on local lock\n",
                                   *flags);
                        LBUG();

                        ldlm_grant_lock(lock, &rpc_list);
                        granted = 1;
                        /* FIXME: completion handling not with lr_lock held ! */
                        if (lock->l_completion_ast)
                                lock->l_completion_ast(lock, 0, NULL);
                }
#ifdef HAVE_SERVER_SUPPORT
        } else {
                int rc;
                ldlm_error_t err;
                int pflags = 0;
                ldlm_processing_policy policy;
                policy = ldlm_processing_policy_table[res->lr_type];
                rc = policy(lock, &pflags, 0, &err, &rpc_list);
                if (rc == LDLM_ITER_STOP) {
                        /* conversion refused: restore the previous mode and
                         * put the lock back where it was on the granted list */
                        lock->l_req_mode = old_mode;
                        if (res->lr_type == LDLM_EXTENT)
                                ldlm_extent_add_lock(res, lock);
                        else
                                ldlm_granted_list_add_lock(lock, &prev);

                        res = NULL;
                } else {
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        granted = 1;
                }
        }
#else
        } else {
                CERROR("This is client-side-only module, cannot handle "
                       "LDLM_NAMESPACE_SERVER resource type lock.\n");
                LBUG();
        }
#endif
        unlock_res_and_lock(lock);

        if (granted)
                ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
        if (node)
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        RETURN(res);
}
2024
2025 void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
2026 {
2027         struct ldlm_lock *lock;
2028
2029         if (!((libcfs_debug | D_ERROR) & level))
2030                 return;
2031
2032         lock = ldlm_handle2lock(lockh);
2033         if (lock == NULL)
2034                 return;
2035
2036         LDLM_DEBUG_LIMIT(level, lock, "###");
2037
2038         LDLM_LOCK_PUT(lock);
2039 }
2040
/**
 * Backend of the LDLM_DEBUG() family: format and emit a one-line dump of
 * \a lock, choosing the layout by resource type (extent/flock/ibits/other).
 * Handles locks with no resource attached and locks with no export
 * ("local" nid, expref -99).
 */
void _ldlm_lock_debug(struct ldlm_lock *lock,
                      struct libcfs_debug_msg_data *msgdata,
                      const char *fmt, ...)
{
        va_list args;
        struct obd_export *exp = lock->l_export;
        struct ldlm_resource *resource = lock->l_resource;
        char *nid = "local";

        va_start(args, fmt);

        /* prefer the live connection's peer nid, fall back to the import's */
        if (exp && exp->exp_connection) {
                nid = libcfs_nid2str(exp->exp_connection->c_peer.nid);
        } else if (exp && exp->exp_obd != NULL) {
                struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
                nid = libcfs_nid2str(imp->imp_connection->c_peer.nid);
        }

        /* resource may be NULL for a lock still being set up */
        if (resource == NULL) {
                libcfs_debug_vmsg2(msgdata, fmt, args,
                       " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                       "res: \?\? rrc=\?\? type: \?\?\? flags: "LPX64" nid: %s "
                       "remote: "LPX64" expref: %d pid: %u timeout: %lu\n",
                       lock,
                       lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
                       lock->l_readers, lock->l_writers,
                       ldlm_lockname[lock->l_granted_mode],
                       ldlm_lockname[lock->l_req_mode],
                       lock->l_flags, nid, lock->l_remote_handle.cookie,
                       exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
                       lock->l_pid, lock->l_callback_timeout);
                va_end(args);
                return;
        }

        switch (resource->lr_type) {
        case LDLM_EXTENT:
                /* includes granted and requested extent ranges */
                libcfs_debug_vmsg2(msgdata, fmt, args,
                       " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                       "res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64
                       "] (req "LPU64"->"LPU64") flags: "LPX64" nid: %s remote:"
                       " "LPX64" expref: %d pid: %u timeout %lu\n",
                       ldlm_lock_to_ns_name(lock), lock,
                       lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
                       lock->l_readers, lock->l_writers,
                       ldlm_lockname[lock->l_granted_mode],
                       ldlm_lockname[lock->l_req_mode],
                       resource->lr_name.name[0],
                       resource->lr_name.name[1],
                       cfs_atomic_read(&resource->lr_refcount),
                       ldlm_typename[resource->lr_type],
                       lock->l_policy_data.l_extent.start,
                       lock->l_policy_data.l_extent.end,
                       lock->l_req_extent.start, lock->l_req_extent.end,
                       lock->l_flags, nid, lock->l_remote_handle.cookie,
                       exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
                       lock->l_pid, lock->l_callback_timeout);
                break;

        case LDLM_FLOCK:
                /* includes the flock owner pid and byte range */
                libcfs_debug_vmsg2(msgdata, fmt, args,
                       " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                       "res: "LPU64"/"LPU64" rrc: %d type: %s pid: %d "
                       "["LPU64"->"LPU64"] flags: "LPX64" nid: %s remote: "LPX64
                       " expref: %d pid: %u timeout: %lu\n",
                       ldlm_lock_to_ns_name(lock), lock,
                       lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
                       lock->l_readers, lock->l_writers,
                       ldlm_lockname[lock->l_granted_mode],
                       ldlm_lockname[lock->l_req_mode],
                       resource->lr_name.name[0],
                       resource->lr_name.name[1],
                       cfs_atomic_read(&resource->lr_refcount),
                       ldlm_typename[resource->lr_type],
                       lock->l_policy_data.l_flock.pid,
                       lock->l_policy_data.l_flock.start,
                       lock->l_policy_data.l_flock.end,
                       lock->l_flags, nid, lock->l_remote_handle.cookie,
                       exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
                       lock->l_pid, lock->l_callback_timeout);
                break;

        case LDLM_IBITS:
                /* includes the inodebits mask */
                libcfs_debug_vmsg2(msgdata, fmt, args,
                       " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                       "res: "LPU64"/"LPU64" bits "LPX64" rrc: %d type: %s "
                       "flags: "LPX64" nid: %s remote: "LPX64" expref: %d "
                       "pid: %u timeout: %lu\n",
                       ldlm_lock_to_ns_name(lock),
                       lock, lock->l_handle.h_cookie,
                       cfs_atomic_read (&lock->l_refc),
                       lock->l_readers, lock->l_writers,
                       ldlm_lockname[lock->l_granted_mode],
                       ldlm_lockname[lock->l_req_mode],
                       resource->lr_name.name[0],
                       resource->lr_name.name[1],
                       lock->l_policy_data.l_inodebits.bits,
                       cfs_atomic_read(&resource->lr_refcount),
                       ldlm_typename[resource->lr_type],
                       lock->l_flags, nid, lock->l_remote_handle.cookie,
                       exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
                       lock->l_pid, lock->l_callback_timeout);
                break;

        default:
                /* generic layout for any other resource type (e.g. plain) */
                libcfs_debug_vmsg2(msgdata, fmt, args,
                       " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                       "res: "LPU64"/"LPU64" rrc: %d type: %s flags: "LPX64" "
                       "nid: %s remote: "LPX64" expref: %d pid: %u timeout %lu"
                       "\n",
                       ldlm_lock_to_ns_name(lock),
                       lock, lock->l_handle.h_cookie,
                       cfs_atomic_read (&lock->l_refc),
                       lock->l_readers, lock->l_writers,
                       ldlm_lockname[lock->l_granted_mode],
                       ldlm_lockname[lock->l_req_mode],
                       resource->lr_name.name[0],
                       resource->lr_name.name[1],
                       cfs_atomic_read(&resource->lr_refcount),
                       ldlm_typename[resource->lr_type],
                       lock->l_flags, nid, lock->l_remote_handle.cookie,
                       exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
                       lock->l_pid, lock->l_callback_timeout);
                break;
        }
        va_end(args);
}
EXPORT_SYMBOL(_ldlm_lock_debug);