LU-1842 ldlm: support for sending GL ASTs to multiple locks
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Whamcloud, Inc.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_lock.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
# include <libcfs/libcfs.h>
# include <linux/lustre_intent.h>
#else
# include <liblustre.h>
#endif

#include <obd_class.h>
#include "ldlm_internal.h"

/* lock types */
char *ldlm_lockname[] = {
        [0] "--",
        [LCK_EX] "EX",
        [LCK_PW] "PW",
        [LCK_PR] "PR",
        [LCK_CW] "CW",
        [LCK_CR] "CR",
        [LCK_NL] "NL",
        [LCK_GROUP] "GROUP",
        [LCK_COS] "COS"
};

char *ldlm_typename[] = {
        [LDLM_PLAIN] "PLN",
        [LDLM_EXTENT] "EXT",
        [LDLM_FLOCK] "FLK",
        [LDLM_IBITS] "IBT",
};

static ldlm_policy_wire_to_local_t ldlm_policy_wire18_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE] ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE] ldlm_flock_policy_wire18_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE] ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_wire_to_local_t ldlm_policy_wire21_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE] ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE] ldlm_flock_policy_wire21_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE] ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_local_to_wire_t ldlm_policy_local_to_wire[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE] ldlm_plain_policy_local_to_wire,
        [LDLM_EXTENT - LDLM_MIN_TYPE] ldlm_extent_policy_local_to_wire,
        [LDLM_FLOCK - LDLM_MIN_TYPE] ldlm_flock_policy_local_to_wire,
        [LDLM_IBITS - LDLM_MIN_TYPE] ldlm_ibits_policy_local_to_wire,
};

/**
 * Converts lock policy from local format to on the wire lock_desc format
 */
void ldlm_convert_policy_to_wire(ldlm_type_t type,
                                 const ldlm_policy_data_t *lpolicy,
                                 ldlm_wire_policy_data_t *wpolicy)
{
        ldlm_policy_local_to_wire_t convert;

        convert = ldlm_policy_local_to_wire[type - LDLM_MIN_TYPE];

        convert(lpolicy, wpolicy);
}

/**
 * Converts lock policy from on the wire lock_desc format to local format
 */
void ldlm_convert_policy_to_local(struct obd_export *exp, ldlm_type_t type,
                                  const ldlm_wire_policy_data_t *wpolicy,
                                  ldlm_policy_data_t *lpolicy)
{
        ldlm_policy_wire_to_local_t convert;
        int new_client;

        /* Some badness for 2.0.0 clients, but 2.0.0 isn't supported */
        new_client = (exp->exp_connect_flags & OBD_CONNECT_FULL20) != 0;
        if (new_client)
               convert = ldlm_policy_wire21_to_local[type - LDLM_MIN_TYPE];
        else
               convert = ldlm_policy_wire18_to_local[type - LDLM_MIN_TYPE];

        convert(wpolicy, lpolicy);
}

char *ldlm_it2str(int it)
{
        switch (it) {
        case IT_OPEN:
                return "open";
        case IT_CREAT:
                return "creat";
        case (IT_OPEN | IT_CREAT):
                return "open|creat";
        case IT_READDIR:
                return "readdir";
        case IT_GETATTR:
                return "getattr";
        case IT_LOOKUP:
                return "lookup";
        case IT_UNLINK:
                return "unlink";
        case IT_GETXATTR:
                return "getxattr";
        case IT_LAYOUT:
                return "layout";
        default:
                CERROR("Unknown intent %d\n", it);
                return "UNKNOWN";
        }
}

extern cfs_mem_cache_t *ldlm_lock_slab;

#ifdef HAVE_SERVER_SUPPORT
static ldlm_processing_policy ldlm_processing_policy_table[] = {
        [LDLM_PLAIN] ldlm_process_plain_lock,
        [LDLM_EXTENT] ldlm_process_extent_lock,
# ifdef __KERNEL__
        [LDLM_FLOCK] ldlm_process_flock_lock,
# endif
        [LDLM_IBITS] ldlm_process_inodebits_lock,
};

ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
{
        return ldlm_processing_policy_table[res->lr_type];
}
#endif /* HAVE_SERVER_SUPPORT */

void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
{
        ns->ns_policy = arg;
}

/*
 * REFCOUNTED LOCK OBJECTS
 */


/*
 * Lock refcounts, during creation:
 *   - one special one for allocation, dec'd only once in destroy
 *   - one for being a lock that's in-use
 *   - one for the addref associated with a new lock
 */
struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
{
        cfs_atomic_inc(&lock->l_refc);
        return lock;
}

void ldlm_lock_put(struct ldlm_lock *lock)
{
        ENTRY;

        LASSERT(lock->l_resource != LP_POISON);
        LASSERT(cfs_atomic_read(&lock->l_refc) > 0);
        if (cfs_atomic_dec_and_test(&lock->l_refc)) {
                struct ldlm_resource *res;

                LDLM_DEBUG(lock,
                           "final lock_put on destroyed lock, freeing it.");

                res = lock->l_resource;
                LASSERT(lock->l_destroyed);
                LASSERT(cfs_list_empty(&lock->l_res_link));
                LASSERT(cfs_list_empty(&lock->l_pending_chain));

                lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
                                     LDLM_NSS_LOCKS);
                lu_ref_del(&res->lr_reference, "lock", lock);
                ldlm_resource_putref(res);
                lock->l_resource = NULL;
                if (lock->l_export) {
                        class_export_lock_put(lock->l_export, lock);
                        lock->l_export = NULL;
                }

                if (lock->l_lvb_data != NULL)
                        OBD_FREE(lock->l_lvb_data, lock->l_lvb_len);

                ldlm_interval_free(ldlm_interval_detach(lock));
                lu_ref_fini(&lock->l_reference);
                OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle);
        }

        EXIT;
}
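
/*
 * Editor's note: a minimal refcounting sketch (illustrative only, not part
 * of the original file). Every get must be balanced by a put; the final
 * put on a destroyed lock is what actually frees it, as shown above.
 */
#if 0
static void example_lock_pin(struct ldlm_lock *lock)
{
        struct ldlm_lock *pinned;

        pinned = LDLM_LOCK_GET(lock);   /* take an extra reference */

        /* ... use the lock while the reference is held ... */

        LDLM_LOCK_PUT(pinned);          /* drop it; may free the lock */
}
#endif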

int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
{
        int rc = 0;
        if (!cfs_list_empty(&lock->l_lru)) {
                struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

                LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
                cfs_list_del_init(&lock->l_lru);
                if (lock->l_flags & LDLM_FL_SKIPPED)
                        lock->l_flags &= ~LDLM_FL_SKIPPED;
                LASSERT(ns->ns_nr_unused > 0);
                ns->ns_nr_unused--;
                rc = 1;
        }
        return rc;
}

int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
        int rc;

        ENTRY;
        if (lock->l_ns_srv) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                RETURN(0);
        }

        cfs_spin_lock(&ns->ns_lock);
        rc = ldlm_lock_remove_from_lru_nolock(lock);
        cfs_spin_unlock(&ns->ns_lock);
        EXIT;
        return rc;
}

void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        lock->l_last_used = cfs_time_current();
        LASSERT(cfs_list_empty(&lock->l_lru));
        LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
        cfs_list_add_tail(&lock->l_lru, &ns->ns_unused_list);
        LASSERT(ns->ns_nr_unused >= 0);
        ns->ns_nr_unused++;
}

void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        cfs_spin_lock(&ns->ns_lock);
        ldlm_lock_add_to_lru_nolock(lock);
        cfs_spin_unlock(&ns->ns_lock);
        EXIT;
}

void ldlm_lock_touch_in_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        if (lock->l_ns_srv) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                EXIT;
                return;
        }

        cfs_spin_lock(&ns->ns_lock);
        if (!cfs_list_empty(&lock->l_lru)) {
                ldlm_lock_remove_from_lru_nolock(lock);
                ldlm_lock_add_to_lru_nolock(lock);
        }
        cfs_spin_unlock(&ns->ns_lock);
        EXIT;
}

/* This used to have a 'strict' flag, which recovery would use to mark an
 * in-use lock as needing-to-die.  Lest I am ever tempted to put it back, I
 * shall explain why it's gone: with the new hash table scheme, once you call
 * ldlm_lock_destroy, you can never drop your final references on this lock.
 * Because it's not in the hash table anymore.  -phil */
int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
{
        ENTRY;

        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                LBUG();
        }

        if (!cfs_list_empty(&lock->l_res_link)) {
                LDLM_ERROR(lock, "lock still on resource");
                LBUG();
        }

        if (lock->l_destroyed) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                EXIT;
                return 0;
        }
        lock->l_destroyed = 1;

        if (lock->l_export && lock->l_export->exp_lock_hash) {
                /* NB: it's safe to call cfs_hash_del() even if the lock
                 * isn't in exp_lock_hash. */
                cfs_hash_del(lock->l_export->exp_lock_hash,
                             &lock->l_remote_handle, &lock->l_exp_hash);
        }

        ldlm_lock_remove_from_lru(lock);
        class_handle_unhash(&lock->l_handle);

#if 0
        /* Wake anyone waiting for this lock */
        /* FIXME: I should probably add yet another flag, instead of using
         * l_export to only call this on clients */
        if (lock->l_export)
                class_export_put(lock->l_export);
        lock->l_export = NULL;
        if (lock->l_export && lock->l_completion_ast)
                lock->l_completion_ast(lock, 0);
#endif
        EXIT;
        return 1;
}

void ldlm_lock_destroy(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        lock_res_and_lock(lock);
        first = ldlm_lock_destroy_internal(lock);
        unlock_res_and_lock(lock);

        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        first = ldlm_lock_destroy_internal(lock);
        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/* this is called by portals_handle2object with the handle lock taken */
static void lock_handle_addref(void *lock)
{
        LDLM_LOCK_GET((struct ldlm_lock *)lock);
}

static void lock_handle_free(void *lock, int size)
{
        LASSERT(size == sizeof(struct ldlm_lock));
        OBD_SLAB_FREE(lock, ldlm_lock_slab, size);
}

struct portals_handle_ops lock_handle_ops = {
        .hop_addref = lock_handle_addref,
        .hop_free   = lock_handle_free,
};

/*
 * usage: pass in a resource on which you have done ldlm_resource_get;
 *        the new lock will take over the refcount.
 * returns: lock with refcount 2 - one for current caller and one for remote
 */
static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
{
        struct ldlm_lock *lock;
        ENTRY;

        if (resource == NULL)
                LBUG();

        OBD_SLAB_ALLOC_PTR_GFP(lock, ldlm_lock_slab, CFS_ALLOC_IO);
        if (lock == NULL)
                RETURN(NULL);

        cfs_spin_lock_init(&lock->l_lock);
        lock->l_resource = resource;
        lu_ref_add(&resource->lr_reference, "lock", lock);

        cfs_atomic_set(&lock->l_refc, 2);
        CFS_INIT_LIST_HEAD(&lock->l_res_link);
        CFS_INIT_LIST_HEAD(&lock->l_lru);
        CFS_INIT_LIST_HEAD(&lock->l_pending_chain);
        CFS_INIT_LIST_HEAD(&lock->l_bl_ast);
        CFS_INIT_LIST_HEAD(&lock->l_cp_ast);
        CFS_INIT_LIST_HEAD(&lock->l_rk_ast);
        cfs_waitq_init(&lock->l_waitq);
        lock->l_blocking_lock = NULL;
        CFS_INIT_LIST_HEAD(&lock->l_sl_mode);
        CFS_INIT_LIST_HEAD(&lock->l_sl_policy);
        CFS_INIT_HLIST_NODE(&lock->l_exp_hash);
        CFS_INIT_HLIST_NODE(&lock->l_exp_flock_hash);

        lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
                             LDLM_NSS_LOCKS);
        CFS_INIT_LIST_HEAD(&lock->l_handle.h_link);
        class_handle_hash(&lock->l_handle, &lock_handle_ops);

        lu_ref_init(&lock->l_reference);
        lu_ref_add(&lock->l_reference, "hash", lock);
        lock->l_callback_timeout = 0;

#if LUSTRE_TRACKS_LOCK_EXP_REFS
        CFS_INIT_LIST_HEAD(&lock->l_exp_refs_link);
        lock->l_exp_refs_nr = 0;
        lock->l_exp_refs_target = NULL;
#endif
        CFS_INIT_LIST_HEAD(&lock->l_exp_list);

        RETURN(lock);
}
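
/*
 * Editor's note: a hedged sketch of the contract described above
 * (hypothetical caller, not from the original file): the reference
 * obtained via ldlm_resource_get() is taken over by ldlm_lock_new(),
 * mirroring what ldlm_lock_create() does further below.
 */
#if 0
static struct ldlm_lock *example_new_lock(struct ldlm_namespace *ns,
                                          const struct ldlm_res_id *res_id,
                                          ldlm_type_t type)
{
        struct ldlm_resource *res;

        res = ldlm_resource_get(ns, NULL, res_id, type, 1);
        if (res == NULL)
                return NULL;

        /* on success the new lock owns the resource reference */
        return ldlm_lock_new(res);
}
#endif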

int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                              const struct ldlm_res_id *new_resid)
{
        struct ldlm_resource *oldres = lock->l_resource;
        struct ldlm_resource *newres;
        int type;
        ENTRY;

        LASSERT(ns_is_client(ns));

        lock_res_and_lock(lock);
        if (memcmp(new_resid, &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) == 0) {
                /* Nothing to do */
                unlock_res_and_lock(lock);
                RETURN(0);
        }

        LASSERT(new_resid->name[0] != 0);

        /* This function assumes that the lock isn't on any lists */
        LASSERT(cfs_list_empty(&lock->l_res_link));

        type = oldres->lr_type;
        unlock_res_and_lock(lock);

        newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
        if (newres == NULL)
                RETURN(-ENOMEM);

        lu_ref_add(&newres->lr_reference, "lock", lock);
        /*
         * To flip the lock from the old to the new resource, lock, oldres and
         * newres have to be locked. Resource spin-locks are nested within
         * lock->l_lock, and are taken in the memory address order to avoid
         * dead-locks.
         */
        cfs_spin_lock(&lock->l_lock);
        oldres = lock->l_resource;
        if (oldres < newres) {
                lock_res(oldres);
                lock_res_nested(newres, LRT_NEW);
        } else {
                lock_res(newres);
                lock_res_nested(oldres, LRT_NEW);
        }
        LASSERT(memcmp(new_resid, &oldres->lr_name,
                       sizeof oldres->lr_name) != 0);
        lock->l_resource = newres;
        unlock_res(oldres);
        unlock_res_and_lock(lock);

        /* ...and the flowers are still standing! */
        lu_ref_del(&oldres->lr_reference, "lock", lock);
        ldlm_resource_putref(oldres);

        RETURN(0);
}

/*
 *  HANDLES
 */

void ldlm_lock2handle(const struct ldlm_lock *lock, struct lustre_handle *lockh)
{
        lockh->cookie = lock->l_handle.h_cookie;
}

/* if flags: atomically get the lock and set the flags.
 *           Return NULL if flag already set
 */

struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
                                     int flags)
{
        struct ldlm_lock *lock;
        ENTRY;

        LASSERT(handle);

        lock = class_handle2object(handle->cookie);
        if (lock == NULL)
                RETURN(NULL);

        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if (flags == 0 && !lock->l_destroyed) {
                lu_ref_add(&lock->l_reference, "handle", cfs_current());
                RETURN(lock);
        }

        lock_res_and_lock(lock);

        LASSERT(lock->l_resource != NULL);

        lu_ref_add_atomic(&lock->l_reference, "handle", cfs_current());
        if (unlikely(lock->l_destroyed)) {
                unlock_res_and_lock(lock);
                CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
                LDLM_LOCK_PUT(lock);
                RETURN(NULL);
        }

        if (flags && (lock->l_flags & flags)) {
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
                RETURN(NULL);
        }

        if (flags)
                lock->l_flags |= flags;

        unlock_res_and_lock(lock);
        RETURN(lock);
}
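
/*
 * Editor's note: an illustrative handle lookup (not from the original
 * file), using the flags == 0 fast path above: a non-NULL result carries
 * a reference that the caller must drop with LDLM_LOCK_PUT().
 */
#if 0
static int example_handle_lookup(const struct lustre_handle *lockh)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);

        if (lock == NULL)
                return -ESTALE;         /* stale or destroyed handle */

        /* ... inspect the lock here ... */

        LDLM_LOCK_PUT(lock);            /* drop the lookup reference */
        return 0;
}
#endif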

void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
{
        struct obd_export *exp = lock->l_export?:lock->l_conn_export;
        /* INODEBITS_INTEROP: If the other side does not support
         * inodebits, reply with a plain lock descriptor.
         */
        if ((lock->l_resource->lr_type == LDLM_IBITS) &&
            (exp && !(exp->exp_connect_flags & OBD_CONNECT_IBITS))) {
                /* Make sure all the right bits are set in this lock we
                   are going to pass to client */
                LASSERTF(lock->l_policy_data.l_inodebits.bits ==
                         (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
                          MDS_INODELOCK_LAYOUT),
                         "Inappropriate inode lock bits during "
                         "conversion " LPU64 "\n",
                         lock->l_policy_data.l_inodebits.bits);

                ldlm_res2desc(lock->l_resource, &desc->l_resource);
                desc->l_resource.lr_type = LDLM_PLAIN;

                /* Convert "new" lock mode to something old client can
                   understand */
                if ((lock->l_req_mode == LCK_CR) ||
                    (lock->l_req_mode == LCK_CW))
                        desc->l_req_mode = LCK_PR;
                else
                        desc->l_req_mode = lock->l_req_mode;
                if ((lock->l_granted_mode == LCK_CR) ||
                    (lock->l_granted_mode == LCK_CW)) {
                        desc->l_granted_mode = LCK_PR;
                } else {
                        /* We never grant PW/EX locks to clients */
                        LASSERT((lock->l_granted_mode != LCK_PW) &&
                                (lock->l_granted_mode != LCK_EX));
                        desc->l_granted_mode = lock->l_granted_mode;
                }

                /* We do not copy policy here, because there is no
                   policy for plain locks */
        } else {
                ldlm_res2desc(lock->l_resource, &desc->l_resource);
                desc->l_req_mode = lock->l_req_mode;
                desc->l_granted_mode = lock->l_granted_mode;
                ldlm_convert_policy_to_wire(lock->l_resource->lr_type,
                                            &lock->l_policy_data,
                                            &desc->l_policy_data);
        }
}

void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                           cfs_list_t *work_list)
{
        if ((lock->l_flags & LDLM_FL_AST_SENT) == 0) {
                LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
                lock->l_flags |= LDLM_FL_AST_SENT;
                /* If the enqueuing client said so, tell the AST recipient to
                 * discard dirty data, rather than writing back. */
                if (new->l_flags & LDLM_AST_DISCARD_DATA)
                        lock->l_flags |= LDLM_FL_DISCARD_DATA;
                LASSERT(cfs_list_empty(&lock->l_bl_ast));
                cfs_list_add(&lock->l_bl_ast, work_list);
                LDLM_LOCK_GET(lock);
                LASSERT(lock->l_blocking_lock == NULL);
                lock->l_blocking_lock = LDLM_LOCK_GET(new);
        }
}

void ldlm_add_cp_work_item(struct ldlm_lock *lock, cfs_list_t *work_list)
{
        if ((lock->l_flags & LDLM_FL_CP_REQD) == 0) {
                lock->l_flags |= LDLM_FL_CP_REQD;
                LDLM_DEBUG(lock, "lock granted; sending completion AST.");
                LASSERT(cfs_list_empty(&lock->l_cp_ast));
                cfs_list_add(&lock->l_cp_ast, work_list);
                LDLM_LOCK_GET(lock);
        }
}

/* must be called with lr_lock held */
void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                            cfs_list_t *work_list)
{
        ENTRY;
        check_res_locked(lock->l_resource);
        if (new)
                ldlm_add_bl_work_item(lock, new, work_list);
        else
                ldlm_add_cp_work_item(lock, work_list);
        EXIT;
}

void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;

        lock = ldlm_handle2lock(lockh);
        LASSERT(lock != NULL);
        ldlm_lock_addref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}

void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        ldlm_lock_remove_from_lru(lock);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                lock->l_readers++;
                lu_ref_add_atomic(&lock->l_reference, "reader", lock);
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                lock->l_writers++;
                lu_ref_add_atomic(&lock->l_reference, "writer", lock);
        }
        LDLM_LOCK_GET(lock);
        lu_ref_add_atomic(&lock->l_reference, "user", lock);
        LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}

/**
 * Attempts to addref a lock, and fails if lock is already LDLM_FL_CBPENDING
 * or destroyed.
 *
 * \retval 0 success, lock was addref-ed
 *
 * \retval -EAGAIN lock is being canceled.
 */
int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;
        int               result;

        result = -EAGAIN;
        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                if (lock->l_readers != 0 || lock->l_writers != 0 ||
                    !(lock->l_flags & LDLM_FL_CBPENDING)) {
                        ldlm_lock_addref_internal_nolock(lock, mode);
                        result = 0;
                }
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return result;
}
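
/*
 * Editor's note: an illustrative caller of ldlm_lock_addref_try() (not
 * from the original file): take a reader reference only if the lock is
 * not already being cancelled, and balance it with ldlm_lock_decref().
 */
#if 0
static int example_try_pin_read(struct lustre_handle *lockh)
{
        if (ldlm_lock_addref_try(lockh, LCK_PR) < 0)
                return -EAGAIN;         /* lock is being canceled */

        /* ... read under the protection of the lock ... */

        ldlm_lock_decref(lockh, LCK_PR);
        return 0;
}
#endif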

/* only called for local locks */
void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
{
        lock_res_and_lock(lock);
        ldlm_lock_addref_internal_nolock(lock, mode);
        unlock_res_and_lock(lock);
}

/* Only called in ldlm_flock_destroy and for local locks.
 * For LDLM_FLOCK type locks, l_blocking_ast is NULL and
 * ldlm_lock_remove_from_lru() does nothing, so it is safe
 * for ldlm_flock_destroy to use this path, dropping some code. */
void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                LASSERT(lock->l_readers > 0);
                lu_ref_del(&lock->l_reference, "reader", lock);
                lock->l_readers--;
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                LASSERT(lock->l_writers > 0);
                lu_ref_del(&lock->l_reference, "writer", lock);
                lock->l_writers--;
        }

        lu_ref_del(&lock->l_reference, "user", lock);
        LDLM_LOCK_RELEASE(lock);    /* matches the LDLM_LOCK_GET() in addref */
}

void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
{
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        ns = ldlm_lock_to_ns(lock);

        ldlm_lock_decref_internal_nolock(lock, mode);

        if (lock->l_flags & LDLM_FL_LOCAL &&
            !lock->l_readers && !lock->l_writers) {
                /* If this is a local lock on a server namespace and this was
                 * the last reference, cancel the lock. */
                CDEBUG(D_INFO, "forcing cancel of local lock\n");
                lock->l_flags |= LDLM_FL_CBPENDING;
        }

        if (!lock->l_readers && !lock->l_writers &&
            (lock->l_flags & LDLM_FL_CBPENDING)) {
                /* If we received a blocking AST and this was the last
                 * reference, run the callback. */
                if (lock->l_ns_srv && lock->l_export)
                        CERROR("FL_CBPENDING set on non-local lock--just a "
                               "warning\n");

                LDLM_DEBUG(lock, "final decref done on cbpending lock");

                LDLM_LOCK_GET(lock); /* dropped by bl thread */
                ldlm_lock_remove_from_lru(lock);
                unlock_res_and_lock(lock);

                if (lock->l_flags & LDLM_FL_FAIL_LOC)
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
                    ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
                        ldlm_handle_bl_callback(ns, NULL, lock);
        } else if (ns_is_client(ns) &&
                   !lock->l_readers && !lock->l_writers &&
                   !(lock->l_flags & LDLM_FL_NO_LRU) &&
                   !(lock->l_flags & LDLM_FL_BL_AST)) {

                LDLM_DEBUG(lock, "add lock into lru list");

                /* If this is a client-side namespace and this was the last
                 * reference, put it on the LRU. */
                ldlm_lock_add_to_lru(lock);
                unlock_res_and_lock(lock);

                if (lock->l_flags & LDLM_FL_FAIL_LOC)
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
                 * are not supported by the server, otherwise, it is done on
                 * enqueue. */
                if (!exp_connect_cancelset(lock->l_conn_export) &&
                    !ns_connect_lru_resize(ns))
                        ldlm_cancel_lru(ns, 0, LDLM_ASYNC, 0);
        } else {
                LDLM_DEBUG(lock, "do not add lock into lru list");
                unlock_res_and_lock(lock);
        }

        EXIT;
}

void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}

/* This will drop a lock reference and mark it for destruction, but will not
 * necessarily cancel the lock before returning. */
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        ENTRY;

        LASSERT(lock != NULL);

        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        lock_res_and_lock(lock);
        lock->l_flags |= LDLM_FL_CBPENDING;
        unlock_res_and_lock(lock);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
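
/*
 * Editor's note: illustrative use of the two decref flavours (not from
 * the original file): a plain decref may park a client lock on the LRU,
 * while decref_and_cancel marks it CBPENDING so it is cancelled instead.
 */
#if 0
static void example_release(struct lustre_handle *lockh, int keep_cached)
{
        if (keep_cached)
                ldlm_lock_decref(lockh, LCK_PR);
        else
                ldlm_lock_decref_and_cancel(lockh, LCK_PR);
}
#endif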

struct sl_insert_point {
        cfs_list_t *res_link;
        cfs_list_t *mode_link;
        cfs_list_t *policy_link;
};

/*
 * search_granted_lock
 *
 * Description:
 *      Finds a position to insert the new lock.
 * Parameters:
 *      queue [input]:  the granted list the search acts on;
 *      req [input]:    the lock whose position is to be located;
 *      prev [output]:  positions within the 3 lists to insert @req at
 * Return Value:
 *      filled @prev
 * NOTE: called by
 *  - ldlm_grant_lock_with_skiplist
 */
static void search_granted_lock(cfs_list_t *queue,
                                struct ldlm_lock *req,
                                struct sl_insert_point *prev)
{
        cfs_list_t *tmp;
        struct ldlm_lock *lock, *mode_end, *policy_end;
        ENTRY;

        cfs_list_for_each(tmp, queue) {
                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                mode_end = cfs_list_entry(lock->l_sl_mode.prev,
                                          struct ldlm_lock, l_sl_mode);

                if (lock->l_req_mode != req->l_req_mode) {
                        /* jump to last lock of mode group */
                        tmp = &mode_end->l_res_link;
                        continue;
                }

                /* suitable mode group is found */
                if (lock->l_resource->lr_type == LDLM_PLAIN) {
                        /* insert point is last lock of the mode group */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else if (lock->l_resource->lr_type == LDLM_IBITS) {
                        for (;;) {
                                policy_end =
                                        cfs_list_entry(lock->l_sl_policy.prev,
                                                       struct ldlm_lock,
                                                       l_sl_policy);

                                if (lock->l_policy_data.l_inodebits.bits ==
                                    req->l_policy_data.l_inodebits.bits) {
                                        /* insert point is last lock of
                                         * the policy group */
                                        prev->res_link =
                                                &policy_end->l_res_link;
                                        prev->mode_link =
                                                &policy_end->l_sl_mode;
                                        prev->policy_link =
                                                &policy_end->l_sl_policy;
                                        EXIT;
                                        return;
                                }

                                if (policy_end == mode_end)
                                        /* done with mode group */
                                        break;

                                /* go to next policy group within mode group */
                                tmp = policy_end->l_res_link.next;
                                lock = cfs_list_entry(tmp, struct ldlm_lock,
                                                      l_res_link);
                        }  /* loop over policy groups within the mode group */

                        /* insert point is last lock of the mode group,
                         * new policy group is started */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else {
                        LDLM_ERROR(lock, "is not LDLM_PLAIN or LDLM_IBITS lock");
                        LBUG();
                }
        }

        /* insert point is last lock on the queue,
         * new mode group and new policy group are started */
        prev->res_link = queue->prev;
        prev->mode_link = &req->l_sl_mode;
        prev->policy_link = &req->l_sl_policy;
        EXIT;
        return;
}
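
/*
 * Editor's note: an illustrative picture of the skiplist layout walked
 * above (not from the original file). Granted locks are grouped by mode,
 * and within an inodebits mode group by policy bits, e.g.:
 *
 *   lr_granted: [PR,LOOKUP] [PR,LOOKUP] [PR,UPDATE] [EX,LOOKUP]
 *               '-policy group-'
 *               '---------- mode group ----------'
 *
 * l_sl_mode links the first and last lock of a mode group, and l_sl_policy
 * those of a policy group, so the search skips whole groups instead of
 * visiting every lock on the queue.
 */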

static void ldlm_granted_list_add_lock(struct ldlm_lock *lock,
                                       struct sl_insert_point *prev)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        ldlm_resource_dump(D_INFO, res);
        LDLM_DEBUG(lock, "About to add lock:");

        if (lock->l_destroyed) {
                CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
                return;
        }

        LASSERT(cfs_list_empty(&lock->l_res_link));
        LASSERT(cfs_list_empty(&lock->l_sl_mode));
        LASSERT(cfs_list_empty(&lock->l_sl_policy));

        cfs_list_add(&lock->l_res_link, prev->res_link);
        cfs_list_add(&lock->l_sl_mode, prev->mode_link);
        cfs_list_add(&lock->l_sl_policy, prev->policy_link);

        EXIT;
}

static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
{
        struct sl_insert_point prev;
        ENTRY;

        LASSERT(lock->l_req_mode == lock->l_granted_mode);

        search_granted_lock(&lock->l_resource->lr_granted, lock, &prev);
        ldlm_granted_list_add_lock(lock, &prev);
        EXIT;
}

/* NOTE: called by
 *  - ldlm_lock_enqueue
 *  - ldlm_reprocess_queue
 *  - ldlm_lock_convert
 *
 * must be called with lr_lock held
 */
void ldlm_grant_lock(struct ldlm_lock *lock, cfs_list_t *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        lock->l_granted_mode = lock->l_req_mode;
        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
                ldlm_grant_lock_with_skiplist(lock);
        else if (res->lr_type == LDLM_EXTENT)
                ldlm_extent_add_lock(res, lock);
        else
                ldlm_resource_add_lock(res, &res->lr_granted, lock);

        if (lock->l_granted_mode < res->lr_most_restr)
                res->lr_most_restr = lock->l_granted_mode;

        if (work_list && lock->l_completion_ast != NULL)
                ldlm_add_ast_work_item(lock, NULL, work_list);

        ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
        EXIT;
}

/* returns a referenced lock or NULL.  See the flag descriptions below, in the
 * comment above ldlm_lock_match */
static struct ldlm_lock *search_queue(cfs_list_t *queue,
                                      ldlm_mode_t *mode,
                                      ldlm_policy_data_t *policy,
                                      struct ldlm_lock *old_lock,
                                      int flags, int unref)
{
        struct ldlm_lock *lock;
        cfs_list_t       *tmp;

        cfs_list_for_each(tmp, queue) {
                ldlm_mode_t match;

                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lock == old_lock)
                        break;

                /* llite sometimes wants to match locks that will be
                 * canceled when their users drop, but we allow it to match
                 * if it passes in CBPENDING and the lock still has users.
                 * this is generally only going to be used by children
                 * whose parents already hold a lock so forward progress
                 * can still happen. */
                if (lock->l_flags & LDLM_FL_CBPENDING &&
                    !(flags & LDLM_FL_CBPENDING))
                        continue;
                if (!unref && lock->l_flags & LDLM_FL_CBPENDING &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        continue;

                if (!(lock->l_req_mode & *mode))
                        continue;
                match = lock->l_req_mode;

                if (lock->l_resource->lr_type == LDLM_EXTENT &&
                    (lock->l_policy_data.l_extent.start >
                     policy->l_extent.start ||
                     lock->l_policy_data.l_extent.end < policy->l_extent.end))
                        continue;

                if (unlikely(match == LCK_GROUP) &&
                    lock->l_resource->lr_type == LDLM_EXTENT &&
                    lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                        continue;

                /* We match if we have existing lock with same or wider set
                   of bits. */
                if (lock->l_resource->lr_type == LDLM_IBITS &&
                     ((lock->l_policy_data.l_inodebits.bits &
                      policy->l_inodebits.bits) !=
                      policy->l_inodebits.bits))
                        continue;

                if (!unref &&
                    (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
                     lock->l_failed))
                        continue;

                if ((flags & LDLM_FL_LOCAL_ONLY) &&
                    !(lock->l_flags & LDLM_FL_LOCAL))
                        continue;

                if (flags & LDLM_FL_TEST_LOCK) {
                        LDLM_LOCK_GET(lock);
                        ldlm_lock_touch_in_lru(lock);
                } else {
                        ldlm_lock_addref_internal_nolock(lock, match);
                }
                *mode = match;
                return lock;
        }

        return NULL;
}

void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
{
        if (!lock->l_failed) {
                lock->l_failed = 1;
                cfs_waitq_broadcast(&lock->l_waitq);
        }
}
EXPORT_SYMBOL(ldlm_lock_fail_match_locked);

void ldlm_lock_fail_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_fail_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_fail_match);

void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
{
        lock->l_flags |= LDLM_FL_LVB_READY;
        cfs_waitq_broadcast(&lock->l_waitq);
}

void ldlm_lock_allow_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_allow_match_locked(lock);
        unlock_res_and_lock(lock);
}
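
/*
 * Editor's note: an illustrative sequence (not from the original file).
 * Once the LVB has been filled in, making the lock matchable wakes any
 * thread blocked on l_waitq in ldlm_lock_match() below.
 */
#if 0
static void example_publish_lvb(struct ldlm_lock *lock)
{
        /* ... fill lock->l_lvb_data with the glimpsed attributes ... */

        ldlm_lock_allow_match(lock);    /* sets LDLM_FL_LVB_READY */
}
#endif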

/* Can be called in two ways:
 *
 * If 'ns' is NULL, then lockh describes an existing lock that we want to look
 * for a duplicate of.
 *
 * Otherwise, all of the fields must be filled in, to match against.
 *
 * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
 *     server (i.e., connh is NULL)
 * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
 *     list will be considered
 * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
 *     to be canceled can still be matched as long as they still have reader
 *     or writer references
 * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
 *     just tell us if we would have matched.
 *
 * Returns a nonzero mode if it finds an already-existing lock that is
 * compatible; in this case, lockh is filled in with an addref()ed lock.
 *
 * We also check the security context; if that fails we simply return 0 (to
 * keep the caller code unchanged), the context failure will be discovered
 * by the caller sometime later. */
ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
                            const struct ldlm_res_id *res_id, ldlm_type_t type,
                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
                            struct lustre_handle *lockh, int unref)
{
        struct ldlm_resource *res;
        struct ldlm_lock *lock, *old_lock = NULL;
        int rc = 0;
        ENTRY;

        if (ns == NULL) {
                old_lock = ldlm_handle2lock(lockh);
                LASSERT(old_lock);

                ns = ldlm_lock_to_ns(old_lock);
                res_id = &old_lock->l_resource->lr_name;
                type = old_lock->l_resource->lr_type;
                mode = old_lock->l_req_mode;
        }

        res = ldlm_resource_get(ns, NULL, res_id, type, 0);
        if (res == NULL) {
                LASSERT(old_lock == NULL);
                RETURN(0);
        }

        LDLM_RESOURCE_ADDREF(res);
        lock_res(res);

        lock = search_queue(&res->lr_granted, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        if (flags & LDLM_FL_BLOCK_GRANTED)
                GOTO(out, rc = 0);
        lock = search_queue(&res->lr_converting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        lock = search_queue(&res->lr_waiting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);

        EXIT;
 out:
        unlock_res(res);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);

        if (lock) {
                ldlm_lock2handle(lock, lockh);
                if ((flags & LDLM_FL_LVB_READY) &&
                    (!(lock->l_flags & LDLM_FL_LVB_READY))) {
                        struct l_wait_info lwi;
                        if (lock->l_completion_ast) {
                                int err = lock->l_completion_ast(lock,
                                                          LDLM_FL_WAIT_NOREPROC,
                                                                 NULL);
                                if (err) {
                                        if (flags & LDLM_FL_TEST_LOCK)
                                                LDLM_LOCK_RELEASE(lock);
                                        else
                                                ldlm_lock_decref_internal(lock,
                                                                          mode);
                                        rc = 0;
                                        goto out2;
                                }
                        }

                        lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
                                               NULL, LWI_ON_SIGNAL_NOOP, NULL);

                        /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
                        l_wait_event(lock->l_waitq,
                                     lock->l_flags & LDLM_FL_LVB_READY ||
                                     lock->l_failed,
                                     &lwi);
                        if (!(lock->l_flags & LDLM_FL_LVB_READY)) {
                                if (flags & LDLM_FL_TEST_LOCK)
                                        LDLM_LOCK_RELEASE(lock);
                                else
                                        ldlm_lock_decref_internal(lock, mode);
                                rc = 0;
                        }
                }
        }
 out2:
        if (rc) {
                LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[2] : policy->l_extent.start,
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[3] : policy->l_extent.end);

                /* check user's security context */
                if (lock->l_conn_export &&
                    sptlrpc_import_check_ctx(
                                class_exp2cliimp(lock->l_conn_export))) {
                        if (!(flags & LDLM_FL_TEST_LOCK))
                                ldlm_lock_decref_internal(lock, mode);
                        rc = 0;
                }

                if (flags & LDLM_FL_TEST_LOCK)
                        LDLM_LOCK_RELEASE(lock);

        } else if (!(flags & LDLM_FL_TEST_LOCK)) {/*less verbose for test-only*/
                LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
                                  LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
                                  type, mode, res_id->name[0], res_id->name[1],
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[2] :policy->l_extent.start,
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[3] : policy->l_extent.end);
        }
        if (old_lock)
                LDLM_LOCK_PUT(old_lock);

        return rc ? mode : 0;
}
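
/*
 * Editor's note: a hedged usage sketch for ldlm_lock_match() (hypothetical
 * resource id and bits, not from the original file): probe the local
 * namespace for a compatible granted inodebits lock and drop the match
 * reference when done.
 */
#if 0
static int example_match_ibits(struct ldlm_namespace *ns,
                               const struct ldlm_res_id *res_id)
{
        ldlm_policy_data_t policy = {
                .l_inodebits = { .bits = MDS_INODELOCK_LOOKUP }
        };
        struct lustre_handle lockh;
        ldlm_mode_t mode;

        mode = ldlm_lock_match(ns, LDLM_FL_BLOCK_GRANTED, res_id, LDLM_IBITS,
                               &policy, LCK_PR | LCK_PW, &lockh, 0);
        if (mode == 0)
                return 0;               /* nothing cached */

        /* ... use the matched lock ... */

        ldlm_lock_decref(&lockh, mode); /* balance the match reference */
        return 1;
}
#endif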

ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh,
                                        __u64 *bits)
{
        struct ldlm_lock *lock;
        ldlm_mode_t mode = 0;
        ENTRY;

        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
                    lock->l_failed)
                        GOTO(out, mode);

                if (lock->l_flags & LDLM_FL_CBPENDING &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        GOTO(out, mode);

                if (bits)
                        *bits = lock->l_policy_data.l_inodebits.bits;
                mode = lock->l_granted_mode;
                ldlm_lock_addref_internal_nolock(lock, mode);
        }

        EXIT;

out:
        if (lock != NULL) {
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return mode;
}
EXPORT_SYMBOL(ldlm_revalidate_lock_handle);
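
/*
 * Editor's note: illustrative use of ldlm_revalidate_lock_handle() (not
 * from the original file): re-check a cached handle and learn which
 * inodebits it still covers; a nonzero mode carries a reference.
 */
#if 0
static void example_revalidate(struct lustre_handle *lockh)
{
        __u64 bits = 0;
        ldlm_mode_t mode;

        mode = ldlm_revalidate_lock_handle(lockh, &bits);
        if (mode != 0) {
                /* bits now holds the covered MDS_INODELOCK_* flags */
                ldlm_lock_decref(lockh, mode);
        }
}
#endif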

/* Returns a referenced lock */
struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
                                   const struct ldlm_res_id *res_id,
                                   ldlm_type_t type,
                                   ldlm_mode_t mode,
                                   const struct ldlm_callback_suite *cbs,
                                   void *data, __u32 lvb_len)
{
        struct ldlm_lock *lock;
        struct ldlm_resource *res;
        ENTRY;

        res = ldlm_resource_get(ns, NULL, res_id, type, 1);
        if (res == NULL)
                RETURN(NULL);

        lock = ldlm_lock_new(res);

        if (lock == NULL)
                RETURN(NULL);

        lock->l_req_mode = mode;
        lock->l_ast_data = data;
        lock->l_pid = cfs_curproc_pid();
        lock->l_ns_srv = !!ns_is_server(ns);
        if (cbs) {
                lock->l_blocking_ast = cbs->lcs_blocking;
                lock->l_completion_ast = cbs->lcs_completion;
                lock->l_glimpse_ast = cbs->lcs_glimpse;
                lock->l_weigh_ast = cbs->lcs_weigh;
        }

        lock->l_tree_node = NULL;
        /* if this is the extent lock, allocate the interval tree node */
        if (type == LDLM_EXTENT) {
                if (ldlm_interval_alloc(lock) == NULL)
                        GOTO(out, 0);
        }

        if (lvb_len) {
                lock->l_lvb_len = lvb_len;
                OBD_ALLOC(lock->l_lvb_data, lvb_len);
                if (lock->l_lvb_data == NULL)
                        GOTO(out, 0);
        }

        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK))
                GOTO(out, 0);

        RETURN(lock);

out:
        ldlm_lock_destroy(lock);
        LDLM_LOCK_RELEASE(lock);
        return NULL;
}
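
/*
 * Editor's note: an illustrative ldlm_lock_create() caller; the
 * example_blocking_ast/example_completion_ast callbacks are hypothetical
 * names, not from the original file.
 */
#if 0
static struct ldlm_lock *example_create(struct ldlm_namespace *ns,
                                        const struct ldlm_res_id *res_id)
{
        struct ldlm_callback_suite cbs = {
                .lcs_blocking   = example_blocking_ast,   /* hypothetical */
                .lcs_completion = example_completion_ast, /* hypothetical */
        };

        return ldlm_lock_create(ns, res_id, LDLM_PLAIN, LCK_EX,
                                &cbs, NULL, 0);
}
#endif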

ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                               struct ldlm_lock **lockp,
                               void *cookie, int *flags)
{
        struct ldlm_lock *lock = *lockp;
        struct ldlm_resource *res = lock->l_resource;
        int local = ns_is_client(ldlm_res_to_ns(res));
#ifdef HAVE_SERVER_SUPPORT
        ldlm_processing_policy policy;
#endif
        ldlm_error_t rc = ELDLM_OK;
        struct ldlm_interval *node = NULL;
        ENTRY;

        lock->l_last_activity = cfs_time_current_sec();
        /* policies are not executed on the client or during replay */
        if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
            && !local && ns->ns_policy) {
                rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
                                   NULL);
                if (rc == ELDLM_LOCK_REPLACED) {
                        /* The lock that was returned has already been granted,
                         * and placed into lockp.  If it's not the same as the
                         * one we passed in, then destroy the old one and our
                         * work here is done. */
                        if (lock != *lockp) {
                                ldlm_lock_destroy(lock);
                                LDLM_LOCK_RELEASE(lock);
                        }
                        *flags |= LDLM_FL_LOCK_CHANGED;
                        RETURN(0);
                } else if (rc != ELDLM_OK ||
                           (rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
                        ldlm_lock_destroy(lock);
                        RETURN(rc);
                }
        }

        /* For a replaying lock, it might be already in granted list. So
         * unlinking the lock will cause the interval node to be freed; we
         * have to allocate the interval node early, otherwise we cannot
         * regrant this lock in the future. - jay */
1405         if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
1406                 OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
1407
        lock_res_and_lock(lock);
        if (local && lock->l_req_mode == lock->l_granted_mode) {
                /* The server returned a blocked lock, but it was granted
                 * before we got a chance to actually enqueue it.  We don't
                 * need to do anything else. */
                *flags &= ~(LDLM_FL_BLOCK_GRANTED |
                            LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
                GOTO(out, ELDLM_OK);
        }

        ldlm_resource_unlink_lock(lock);
        if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
                if (node == NULL) {
                        ldlm_lock_destroy_nolock(lock);
                        GOTO(out, rc = -ENOMEM);
                }

                CFS_INIT_LIST_HEAD(&node->li_group);
                ldlm_interval_attach(node, lock);
                node = NULL;
        }

        /* Some flags from the enqueue want to make it into the AST, via the
         * lock's l_flags. */
        lock->l_flags |= *flags & LDLM_AST_DISCARD_DATA;

        /* This distinction between local lock trees is very important; a client
         * namespace only has information about locks taken by that client, and
         * thus doesn't have enough information to decide for itself if it can
         * be granted (below).  In this case, we do exactly what the server
         * tells us to do, as dictated by the 'flags'.
         *
         * We do exactly the same thing during recovery, when the server is
         * more or less trusting the clients not to lie.
         *
         * FIXME (bug 268): Detect obvious lies by checking compatibility in
         * granted/converting queues. */
        if (local) {
                if (*flags & LDLM_FL_BLOCK_CONV)
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                else
                        ldlm_grant_lock(lock, NULL);
                GOTO(out, ELDLM_OK);
#ifdef HAVE_SERVER_SUPPORT
        } else if (*flags & LDLM_FL_REPLAY) {
                if (*flags & LDLM_FL_BLOCK_CONV) {
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                        GOTO(out, ELDLM_OK);
                } else if (*flags & LDLM_FL_BLOCK_WAIT) {
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                        GOTO(out, ELDLM_OK);
                } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
                        ldlm_grant_lock(lock, NULL);
                        GOTO(out, ELDLM_OK);
                }
                /* If no flags, fall through to normal enqueue path. */
        }

        policy = ldlm_processing_policy_table[res->lr_type];
        policy(lock, flags, 1, &rc, NULL);
        GOTO(out, rc);
#else
        } else {
                CERROR("This is a client-side-only module; it cannot handle "
                       "LDLM_NAMESPACE_SERVER resource type locks.\n");
                LBUG();
        }
#endif

out:
        unlock_res_and_lock(lock);
        if (node)
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        return rc;
}

#ifdef HAVE_SERVER_SUPPORT
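/**
 * Re-run the processing policy on every lock in \a queue.  Locks that
 * become grantable have their completion ASTs collected on \a work_list
 * for a later call to ldlm_run_ast_work(); iteration stops early if the
 * policy returns anything other than LDLM_ITER_CONTINUE.
 */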
/* Must be called with the resource lock held: \a queue is either the
 * waiting or the converting list. */
int ldlm_reprocess_queue(struct ldlm_resource *res, cfs_list_t *queue,
                         cfs_list_t *work_list)
{
        cfs_list_t *tmp, *pos;
        ldlm_processing_policy policy;
        int flags;
        int rc = LDLM_ITER_CONTINUE;
        ldlm_error_t err;
        ENTRY;

        check_res_locked(res);

        policy = ldlm_processing_policy_table[res->lr_type];
        LASSERT(policy);

        cfs_list_for_each_safe(tmp, pos, queue) {
                struct ldlm_lock *pending;
                pending = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);

                flags = 0;
                rc = policy(pending, &flags, 0, &err, work_list);
                if (rc != LDLM_ITER_CONTINUE)
                        break;
        }

        RETURN(rc);
}
#endif

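/**
 * Producer callback for the blocking-AST request set: take one lock off
 * arg->list and send a blocking AST for it.  Returns -ENOENT once the
 * list is empty, signalling that no more requests can be produced.
 */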
static int
ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
{
        struct ldlm_cb_set_arg *arg = opaq;
        struct ldlm_lock_desc   d;
        int                     rc;
        struct ldlm_lock       *lock;
        ENTRY;

        if (cfs_list_empty(arg->list))
                RETURN(-ENOENT);

        lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);

        /* nobody should touch l_bl_ast */
        lock_res_and_lock(lock);
        cfs_list_del_init(&lock->l_bl_ast);

        LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
        LASSERT(lock->l_bl_ast_run == 0);
        LASSERT(lock->l_blocking_lock);
        lock->l_bl_ast_run++;
        unlock_res_and_lock(lock);

        ldlm_lock2desc(lock->l_blocking_lock, &d);

        rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
        LDLM_LOCK_RELEASE(lock->l_blocking_lock);
        lock->l_blocking_lock = NULL;
        LDLM_LOCK_RELEASE(lock);

        RETURN(rc);
}

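/**
 * Producer callback for the completion-AST request set: take one lock
 * off arg->list and run its completion callback, if one is set.
 */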
static int
ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
{
        struct ldlm_cb_set_arg  *arg = opaq;
        int                      rc = 0;
        struct ldlm_lock        *lock;
        ldlm_completion_callback completion_callback;
        ENTRY;

        if (cfs_list_empty(arg->list))
                RETURN(-ENOENT);

        lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);

        /* It's possible to receive a completion AST before we've set
         * the l_completion_ast pointer: either because the AST arrived
         * before the reply, or simply because there's a small race
         * window between receiving the reply and finishing the local
         * enqueue. (bug 842)
         *
         * This can't happen with the blocking_ast, however, because we
         * will never call the local blocking_ast until we drop our
         * reader/writer reference, which we won't do until we get the
         * reply and finish enqueueing. */

        /* nobody should touch l_cp_ast */
        lock_res_and_lock(lock);
        cfs_list_del_init(&lock->l_cp_ast);
        LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
        /* save l_completion_ast since it can be changed by
         * mds_intent_policy(), see bug 14225 */
        completion_callback = lock->l_completion_ast;
        lock->l_flags &= ~LDLM_FL_CP_REQD;
        unlock_res_and_lock(lock);

        if (completion_callback != NULL)
                rc = completion_callback(lock, 0, (void *)arg);
        LDLM_LOCK_RELEASE(lock);

        RETURN(rc);
}

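/**
 * Producer callback for the revoke-AST request set: take one lock off
 * arg->list and send a blocking AST whose descriptor pretends an EX
 * lock is being requested, so the holder sees a full conflict.
 */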
static int
ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
{
        struct ldlm_cb_set_arg *arg = opaq;
        struct ldlm_lock_desc   desc;
        int                     rc;
        struct ldlm_lock       *lock;
        ENTRY;

        if (cfs_list_empty(arg->list))
                RETURN(-ENOENT);

        lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_rk_ast);
        cfs_list_del_init(&lock->l_rk_ast);

        /* the desc just pretends to be exclusive */
        ldlm_lock2desc(lock, &desc);
        desc.l_req_mode = LCK_EX;
        desc.l_granted_mode = 0;

        rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
        LDLM_LOCK_RELEASE(lock);

        RETURN(rc);
}

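/**
 * Producer callback for the glimpse-AST request set: take one glimpse
 * work item off arg->list, run the lock's glimpse AST, and free the
 * work item unless LDLM_GL_WORK_NOFREE is set.
 */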
int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
{
        struct ldlm_cb_set_arg          *arg = opaq;
        struct ldlm_glimpse_work        *gl_work;
        struct ldlm_lock                *lock;
        int                              rc = 0;
        ENTRY;

        if (cfs_list_empty(arg->list))
                RETURN(-ENOENT);

        gl_work = cfs_list_entry(arg->list->next, struct ldlm_glimpse_work,
                                 gl_list);
        cfs_list_del_init(&gl_work->gl_list);

        lock = gl_work->gl_lock;
        if (lock->l_glimpse_ast(lock, (void*)arg) == 0)
                rc = 1;

        LDLM_LOCK_RELEASE(lock);

        if ((gl_work->gl_flags & LDLM_GL_WORK_NOFREE) == 0)
                OBD_FREE_PTR(gl_work);

        RETURN(rc);
}

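/**
 * Send all of the ASTs queued on \a rpc_list, limiting the number of
 * RPCs in flight to ns_max_parallel_ast.  Returns -ERESTART if any
 * callback asked for the whole operation to be restarted.
 *
 * Typical use, as in ldlm_reprocess_all() below:
 *
 *      rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
 *                             LDLM_WORK_CP_AST);
 *      if (rc == -ERESTART)
 *              goto restart;
 */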
int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
                      ldlm_desc_ast_t ast_type)
{
        struct ldlm_cb_set_arg *arg;
        set_producer_func       work_ast_lock;
        int                     rc;

        if (cfs_list_empty(rpc_list))
                RETURN(0);

        OBD_ALLOC_PTR(arg);
        if (arg == NULL)
                RETURN(-ENOMEM);

        cfs_atomic_set(&arg->restart, 0);
        arg->list = rpc_list;

        switch (ast_type) {
                case LDLM_WORK_BL_AST:
                        arg->type = LDLM_BL_CALLBACK;
                        work_ast_lock = ldlm_work_bl_ast_lock;
                        break;
                case LDLM_WORK_CP_AST:
                        arg->type = LDLM_CP_CALLBACK;
                        work_ast_lock = ldlm_work_cp_ast_lock;
                        break;
                case LDLM_WORK_REVOKE_AST:
                        arg->type = LDLM_BL_CALLBACK;
                        work_ast_lock = ldlm_work_revoke_ast_lock;
                        break;
                case LDLM_WORK_GL_AST:
                        arg->type = LDLM_GL_CALLBACK;
                        work_ast_lock = ldlm_work_gl_ast_lock;
                        break;
                default:
                        LBUG();
        }

        /* We create a ptlrpc request set with the flow control extension.
         * This request set will use the work_ast_lock function to produce
         * new requests and will send a new request each time one completes,
         * in order to keep the number of requests in flight at
         * ns_max_parallel_ast. */
        arg->set = ptlrpc_prep_fcset(ns->ns_max_parallel_ast ? : UINT_MAX,
                                     work_ast_lock, arg);
        if (arg->set == NULL)
                GOTO(out, rc = -ENOMEM);

        ptlrpc_set_wait(arg->set);
        ptlrpc_set_destroy(arg->set);

        rc = cfs_atomic_read(&arg->restart) ? -ERESTART : 0;
        GOTO(out, rc);
out:
        OBD_FREE_PTR(arg);
        return rc;
}

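/* Helper for ldlm_reprocess_all_ns(): reprocess a single resource. */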
static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
{
        ldlm_reprocess_all(res);
        return LDLM_ITER_CONTINUE;
}

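/* cfs_hash iterator callback; a non-zero return is expected to stop the
 * iteration. */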
static int ldlm_reprocess_res(cfs_hash_t *hs, cfs_hash_bd_t *bd,
                              cfs_hlist_node_t *hnode, void *arg)
{
        struct ldlm_resource *res = cfs_hash_object(hs, hnode);
        int    rc;

        rc = reprocess_one_queue(res, arg);

        return rc == LDLM_ITER_STOP;
}

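/**
 * Reprocess every resource in namespace \a ns, trying to grant any
 * waiting or converting locks.
 */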
void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
{
        ENTRY;

        if (ns != NULL) {
                cfs_hash_for_each_nolock(ns->ns_rs_hash,
                                         ldlm_reprocess_res, NULL);
        }
        EXIT;
}

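/**
 * Try to grant the locks on \a res's converting and waiting queues,
 * then send the resulting completion ASTs; restarts from scratch if
 * sending the ASTs returns -ERESTART.  A no-op on clients, whose local
 * lock trees are never reprocessed.
 */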
void ldlm_reprocess_all(struct ldlm_resource *res)
{
        CFS_LIST_HEAD(rpc_list);

#ifdef HAVE_SERVER_SUPPORT
        int rc;
        ENTRY;
        /* Local lock trees don't get reprocessed. */
        if (ns_is_client(ldlm_res_to_ns(res))) {
                EXIT;
                return;
        }

restart:
        lock_res(res);
        rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
        if (rc == LDLM_ITER_CONTINUE)
                ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
        unlock_res(res);

        rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
                               LDLM_WORK_CP_AST);
        if (rc == -ERESTART) {
                LASSERT(cfs_list_empty(&rpc_list));
                goto restart;
        }
#else
        ENTRY;
        if (!ns_is_client(ldlm_res_to_ns(res))) {
                CERROR("This is a client-side-only module; it cannot handle "
                       "LDLM_NAMESPACE_SERVER resource type locks.\n");
                LBUG();
        }
#endif
        EXIT;
}

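/**
 * Run the blocking AST with LDLM_CB_CANCELING exactly once per lock.
 * The resource lock is dropped while the callback runs and re-acquired
 * afterwards, so the lock state may change across the call.
 */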
void ldlm_cancel_callback(struct ldlm_lock *lock)
{
        check_res_locked(lock->l_resource);
        if (!(lock->l_flags & LDLM_FL_CANCEL)) {
                lock->l_flags |= LDLM_FL_CANCEL;
                if (lock->l_blocking_ast) {
                        // l_check_no_ns_lock(ns);
                        unlock_res_and_lock(lock);
                        lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
                                             LDLM_CB_CANCELING);
                        lock_res_and_lock(lock);
                } else {
                        LDLM_DEBUG(lock, "no blocking ast");
                }
        }
        lock->l_flags |= LDLM_FL_BL_DONE;
}

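/* Remove a PLAIN or IBITS lock from the mode and policy skiplists. */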
void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
{
        if (req->l_resource->lr_type != LDLM_PLAIN &&
            req->l_resource->lr_type != LDLM_IBITS)
                return;

        cfs_list_del_init(&req->l_sl_policy);
        cfs_list_del_init(&req->l_sl_mode);
}

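/**
 * Cancel \a lock: run the cancel callback, unlink the lock from its
 * resource, and destroy it.  The lock must no longer have any readers
 * or writers by the time this is called.
 */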
void ldlm_lock_cancel(struct ldlm_lock *lock)
{
        struct ldlm_resource *res;
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        res = lock->l_resource;
        ns  = ldlm_res_to_ns(res);

        /* Please do not, no matter how tempting, remove this LBUG without
         * talking to me first. -phik */
        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                LBUG();
        }

        if (lock->l_waited)
                ldlm_del_waiting_lock(lock);

        /* Run the cancel callback; it may drop and re-take the res lock. */
        ldlm_cancel_callback(lock);

        /* Yes, a second time: the lock may have been added back to the
         * waiting list while ldlm_cancel_callback was running without the
         * res lock. */
        if (lock->l_waited)
                ldlm_del_waiting_lock(lock);

        ldlm_resource_unlink_lock(lock);
        ldlm_lock_destroy_nolock(lock);

        if (lock->l_granted_mode == lock->l_req_mode)
                ldlm_pool_del(&ns->ns_pool, lock);

        /* Make sure we will not be called again for the same lock, which is
         * possible if l_granted_mode is not zeroed out. */
        lock->l_granted_mode = LCK_MINMODE;
        unlock_res_and_lock(lock);

        EXIT;
}

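/**
 * Attach opaque \a data to the lock behind \a lockh.  Succeeds if the
 * lock has no AST data yet, or already carries the same pointer;
 * returns -EINVAL otherwise, or if the handle no longer resolves.
 */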
int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);
        int rc = -EINVAL;
        ENTRY;

        if (lock) {
                if (lock->l_ast_data == NULL)
                        lock->l_ast_data = data;
                if (lock->l_ast_data == data)
                        rc = 0;
                LDLM_LOCK_PUT(lock);
        }
        RETURN(rc);
}
EXPORT_SYMBOL(ldlm_lock_set_data);

struct export_cl_data {
        struct obd_export       *ecl_exp;
        int                     ecl_loop;
};

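/**
 * Hash iterator callback for ldlm_cancel_locks_for_export(): update the
 * resource's LVB, cancel one lock, and reprocess its resource.
 */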
int ldlm_cancel_locks_for_export_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
                                    cfs_hlist_node_t *hnode, void *data)
{
        struct export_cl_data   *ecl = (struct export_cl_data *)data;
        struct obd_export       *exp = ecl->ecl_exp;
        struct ldlm_lock        *lock = cfs_hash_object(hs, hnode);
        struct ldlm_resource    *res;

        res = ldlm_resource_getref(lock->l_resource);
        LDLM_LOCK_GET(lock);

        LDLM_DEBUG(lock, "export %p", exp);
        ldlm_res_lvbo_update(res, NULL, 1);
        ldlm_lock_cancel(lock);
        ldlm_reprocess_all(res);
        ldlm_resource_putref(res);
        LDLM_LOCK_RELEASE(lock);

        ecl->ecl_loop++;
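        /* Log progress less and less often as the count grows: only when
         * ecl_loop is a power of two (x & -x == x). */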
        if ((ecl->ecl_loop & -ecl->ecl_loop) == ecl->ecl_loop) {
                CDEBUG(D_INFO,
                       "Cancel lock %p for export %p (loop %d), still have "
                       "%d locks left on hash table.\n",
                       lock, exp, ecl->ecl_loop,
                       cfs_atomic_read(&hs->hs_count));
        }

        return 0;
}

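/**
 * Cancel every lock in \a exp's lock hash, typically when the export is
 * being cleaned up after a client disconnect or eviction.
 */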
void ldlm_cancel_locks_for_export(struct obd_export *exp)
{
        struct export_cl_data   ecl = {
                .ecl_exp        = exp,
                .ecl_loop       = 0,
        };

        cfs_hash_for_each_empty(exp->exp_lock_hash,
                                ldlm_cancel_locks_for_export_cb, &ecl);
}

/**
 * Downgrade an exclusive lock.
 *
 * A fast variant of ldlm_lock_convert for conversion of exclusive
 * locks. The conversion is always successful.
 *
 * \param lock A lock to convert
 * \param new_mode new lock mode
 */
void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode)
{
        ENTRY;

        LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
        LASSERT(new_mode == LCK_COS);

        lock_res_and_lock(lock);
        ldlm_resource_unlink_lock(lock);
        /*
         * Remove the lock from pool as it will be added again in
         * ldlm_grant_lock() called below.
         */
        ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);

        lock->l_req_mode = new_mode;
        ldlm_grant_lock(lock, NULL);
        unlock_res_and_lock(lock);
        ldlm_reprocess_all(lock->l_resource);

        EXIT;
}

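/**
 * Change the requested mode of \a lock to \a new_mode and try to grant
 * it again: on a server the processing policy decides, on a client the
 * lock is queued according to \a flags.  Returns the lock's resource on
 * success, or NULL if the interval-node allocation fails (the caller
 * then reports EDEADLOCK).
 */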
struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                                        __u32 *flags)
{
        CFS_LIST_HEAD(rpc_list);
        struct ldlm_resource *res;
        struct ldlm_namespace *ns;
        int granted = 0;
#ifdef HAVE_SERVER_SUPPORT
        int old_mode;
        struct sl_insert_point prev;
#endif
        struct ldlm_interval *node;
        ENTRY;

        if (new_mode == lock->l_granted_mode) { // No changes? Just return.
                *flags |= LDLM_FL_BLOCK_GRANTED;
                RETURN(lock->l_resource);
        }

        /* We can't check the lock type here because the lock's bit-lock is
         * not held, so do the allocation blindly. -jay */
        OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
        if (node == NULL)  /* Actually, this causes EDEADLOCK to be returned */
                RETURN(NULL);

        LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR),
                 "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);

        lock_res_and_lock(lock);

        res = lock->l_resource;
        ns  = ldlm_res_to_ns(res);

#ifdef HAVE_SERVER_SUPPORT
        old_mode = lock->l_req_mode;
#endif
        lock->l_req_mode = new_mode;
        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
#ifdef HAVE_SERVER_SUPPORT
                /* remember the lock position where the lock might be
                 * added back to the granted list later and also
                 * remember the join mode for skiplist fixing. */
                prev.res_link = lock->l_res_link.prev;
                prev.mode_link = lock->l_sl_mode.prev;
                prev.policy_link = lock->l_sl_policy.prev;
#endif
                ldlm_resource_unlink_lock(lock);
        } else {
                ldlm_resource_unlink_lock(lock);
                if (res->lr_type == LDLM_EXTENT) {
                        /* FIXME: ugly code, but we have to attach the lock
                         * to an interval node again since it may be granted
                         * soon */
                        CFS_INIT_LIST_HEAD(&node->li_group);
                        ldlm_interval_attach(node, lock);
                        node = NULL;
                }
        }

        /*
         * Remove the old lock from the pool before adding the lock with its
         * new mode below in ->policy()
         */
        ldlm_pool_del(&ns->ns_pool, lock);

        /* If this is a local resource, put it on the appropriate list. */
        if (ns_is_client(ldlm_res_to_ns(res))) {
                if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                } else {
                        /* This should never happen, because of the way the
                         * server handles conversions. */
                        LDLM_ERROR(lock, "Erroneous flags %d on local lock\n",
                                   *flags);
                        LBUG();

                        ldlm_grant_lock(lock, &rpc_list);
                        granted = 1;
                        /* FIXME: completion handling not with lr_lock held! */
                        if (lock->l_completion_ast)
                                lock->l_completion_ast(lock, 0, NULL);
                }
#ifdef HAVE_SERVER_SUPPORT
        } else {
                int rc;
                ldlm_error_t err;
                int pflags = 0;
                ldlm_processing_policy policy;
                policy = ldlm_processing_policy_table[res->lr_type];
                rc = policy(lock, &pflags, 0, &err, &rpc_list);
                if (rc == LDLM_ITER_STOP) {
                        lock->l_req_mode = old_mode;
                        if (res->lr_type == LDLM_EXTENT)
                                ldlm_extent_add_lock(res, lock);
                        else
                                ldlm_granted_list_add_lock(lock, &prev);

                        res = NULL;
                } else {
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        granted = 1;
                }
        }
#else
        } else {
                CERROR("This is a client-side-only module; it cannot handle "
                       "LDLM_NAMESPACE_SERVER resource type locks.\n");
                LBUG();
        }
#endif
        unlock_res_and_lock(lock);

        if (granted)
                ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
        if (node)
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        RETURN(res);
}

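/**
 * Print a description of the lock behind \a lockh to the debug log,
 * provided \a level (or D_ERROR) is enabled in the current debug mask.
 */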
void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
{
        struct ldlm_lock *lock;

        if (!((libcfs_debug | D_ERROR) & level))
                return;

        lock = ldlm_handle2lock(lockh);
        if (lock == NULL)
                return;

        LDLM_DEBUG_LIMIT(level, lock, "###");

        LDLM_LOCK_PUT(lock);
}

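/**
 * Back end of the LDLM_DEBUG() family of macros: format a one-line
 * description of \a lock, including type-specific policy data, and pass
 * it to libcfs_debug_vmsg2().
 */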
void _ldlm_lock_debug(struct ldlm_lock *lock,
                      struct libcfs_debug_msg_data *msgdata,
                      const char *fmt, ...)
{
        va_list args;
        struct obd_export *exp = lock->l_export;
        struct ldlm_resource *resource = lock->l_resource;
        char *nid = "local";

        va_start(args, fmt);

        if (exp && exp->exp_connection) {
                nid = libcfs_nid2str(exp->exp_connection->c_peer.nid);
        } else if (exp && exp->exp_obd != NULL) {
                struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
                nid = libcfs_nid2str(imp->imp_connection->c_peer.nid);
        }

        if (resource == NULL) {
                libcfs_debug_vmsg2(msgdata, fmt, args,
                       " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                       "res: \?\? rrc=\?\? type: \?\?\? flags: "LPX64" nid: %s "
                       "remote: "LPX64" expref: %d pid: %u timeout: %lu\n",
                       lock,
                       lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
                       lock->l_readers, lock->l_writers,
                       ldlm_lockname[lock->l_granted_mode],
                       ldlm_lockname[lock->l_req_mode],
                       lock->l_flags, nid, lock->l_remote_handle.cookie,
                       exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
                       lock->l_pid, lock->l_callback_timeout);
                va_end(args);
                return;
        }

        switch (resource->lr_type) {
        case LDLM_EXTENT:
                libcfs_debug_vmsg2(msgdata, fmt, args,
                       " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                       "res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64
                       "] (req "LPU64"->"LPU64") flags: "LPX64" nid: %s remote:"
                       " "LPX64" expref: %d pid: %u timeout %lu\n",
                       ldlm_lock_to_ns_name(lock), lock,
                       lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
                       lock->l_readers, lock->l_writers,
                       ldlm_lockname[lock->l_granted_mode],
                       ldlm_lockname[lock->l_req_mode],
                       resource->lr_name.name[0],
                       resource->lr_name.name[1],
                       cfs_atomic_read(&resource->lr_refcount),
                       ldlm_typename[resource->lr_type],
                       lock->l_policy_data.l_extent.start,
                       lock->l_policy_data.l_extent.end,
                       lock->l_req_extent.start, lock->l_req_extent.end,
                       lock->l_flags, nid, lock->l_remote_handle.cookie,
                       exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
                       lock->l_pid, lock->l_callback_timeout);
                break;

        case LDLM_FLOCK:
                libcfs_debug_vmsg2(msgdata, fmt, args,
                       " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                       "res: "LPU64"/"LPU64" rrc: %d type: %s pid: %d "
                       "["LPU64"->"LPU64"] flags: "LPX64" nid: %s remote: "LPX64
                       " expref: %d pid: %u timeout: %lu\n",
                       ldlm_lock_to_ns_name(lock), lock,
                       lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
                       lock->l_readers, lock->l_writers,
                       ldlm_lockname[lock->l_granted_mode],
                       ldlm_lockname[lock->l_req_mode],
                       resource->lr_name.name[0],
                       resource->lr_name.name[1],
                       cfs_atomic_read(&resource->lr_refcount),
                       ldlm_typename[resource->lr_type],
                       lock->l_policy_data.l_flock.pid,
                       lock->l_policy_data.l_flock.start,
                       lock->l_policy_data.l_flock.end,
                       lock->l_flags, nid, lock->l_remote_handle.cookie,
                       exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
                       lock->l_pid, lock->l_callback_timeout);
                break;

        case LDLM_IBITS:
                libcfs_debug_vmsg2(msgdata, fmt, args,
                       " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                       "res: "LPU64"/"LPU64" bits "LPX64" rrc: %d type: %s "
                       "flags: "LPX64" nid: %s remote: "LPX64" expref: %d "
                       "pid: %u timeout: %lu\n",
                       ldlm_lock_to_ns_name(lock),
                       lock, lock->l_handle.h_cookie,
                       cfs_atomic_read(&lock->l_refc),
                       lock->l_readers, lock->l_writers,
                       ldlm_lockname[lock->l_granted_mode],
                       ldlm_lockname[lock->l_req_mode],
                       resource->lr_name.name[0],
                       resource->lr_name.name[1],
                       lock->l_policy_data.l_inodebits.bits,
                       cfs_atomic_read(&resource->lr_refcount),
                       ldlm_typename[resource->lr_type],
                       lock->l_flags, nid, lock->l_remote_handle.cookie,
                       exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
                       lock->l_pid, lock->l_callback_timeout);
                break;

        default:
                libcfs_debug_vmsg2(msgdata, fmt, args,
                       " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                       "res: "LPU64"/"LPU64" rrc: %d type: %s flags: "LPX64" "
                       "nid: %s remote: "LPX64" expref: %d pid: %u timeout %lu"
                       "\n",
                       ldlm_lock_to_ns_name(lock),
                       lock, lock->l_handle.h_cookie,
                       cfs_atomic_read(&lock->l_refc),
                       lock->l_readers, lock->l_writers,
                       ldlm_lockname[lock->l_granted_mode],
                       ldlm_lockname[lock->l_req_mode],
                       resource->lr_name.name[0],
                       resource->lr_name.name[1],
                       cfs_atomic_read(&resource->lr_refcount),
                       ldlm_typename[resource->lr_type],
                       lock->l_flags, nid, lock->l_remote_handle.cookie,
                       exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
                       lock->l_pid, lock->l_callback_timeout);
                break;
        }
        va_end(args);
}
EXPORT_SYMBOL(_ldlm_lock_debug);