/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Whamcloud, Inc.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_lock.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
# include <libcfs/libcfs.h>
# include <linux/lustre_intent.h>
#else
# include <liblustre.h>
#endif

#include <obd_class.h>
#include "ldlm_internal.h"

/* lock types */
char *ldlm_lockname[] = {
        [0] "--",
        [LCK_EX] "EX",
        [LCK_PW] "PW",
        [LCK_PR] "PR",
        [LCK_CW] "CW",
        [LCK_CR] "CR",
        [LCK_NL] "NL",
        [LCK_GROUP] "GROUP",
        [LCK_COS] "COS"
};
EXPORT_SYMBOL(ldlm_lockname);

char *ldlm_typename[] = {
        [LDLM_PLAIN] "PLN",
        [LDLM_EXTENT] "EXT",
        [LDLM_FLOCK] "FLK",
        [LDLM_IBITS] "IBT",
};
EXPORT_SYMBOL(ldlm_typename);

static ldlm_policy_wire_to_local_t ldlm_policy_wire18_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE] ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE] ldlm_flock_policy_wire18_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE] ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_wire_to_local_t ldlm_policy_wire21_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE] ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE] ldlm_flock_policy_wire21_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE] ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_local_to_wire_t ldlm_policy_local_to_wire[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE] ldlm_plain_policy_local_to_wire,
        [LDLM_EXTENT - LDLM_MIN_TYPE] ldlm_extent_policy_local_to_wire,
        [LDLM_FLOCK - LDLM_MIN_TYPE] ldlm_flock_policy_local_to_wire,
        [LDLM_IBITS - LDLM_MIN_TYPE] ldlm_ibits_policy_local_to_wire,
};

/**
 * Converts lock policy from local format to on the wire lock_desc format
 */
void ldlm_convert_policy_to_wire(ldlm_type_t type,
                                 const ldlm_policy_data_t *lpolicy,
                                 ldlm_wire_policy_data_t *wpolicy)
{
        ldlm_policy_local_to_wire_t convert;

        convert = ldlm_policy_local_to_wire[type - LDLM_MIN_TYPE];

        convert(lpolicy, wpolicy);
}

/**
 * Converts lock policy from on the wire lock_desc format to local format
 */
void ldlm_convert_policy_to_local(struct obd_export *exp, ldlm_type_t type,
                                  const ldlm_wire_policy_data_t *wpolicy,
                                  ldlm_policy_data_t *lpolicy)
{
        ldlm_policy_wire_to_local_t convert;
        int new_client;

        /* some badness for 2.0.0 clients, but 2.0.0 isn't supported */
        new_client = (exp->exp_connect_flags & OBD_CONNECT_FULL20) != 0;
        if (new_client)
               convert = ldlm_policy_wire21_to_local[type - LDLM_MIN_TYPE];
        else
               convert = ldlm_policy_wire18_to_local[type - LDLM_MIN_TYPE];

        convert(wpolicy, lpolicy);
}

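/*
 * Usage sketch (illustrative only; the real callers live elsewhere in
 * ldlm): converting a local extent policy into its on-the-wire form
 * before it is packed into a lock_desc:
 *
 *      ldlm_policy_data_t lpolicy = { .l_extent = { .start = 0,
 *                                                   .end = OBD_OBJECT_EOF } };
 *      ldlm_wire_policy_data_t wpolicy;
 *
 *      ldlm_convert_policy_to_wire(LDLM_EXTENT, &lpolicy, &wpolicy);
 */
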
char *ldlm_it2str(int it)
{
        switch (it) {
        case IT_OPEN:
                return "open";
        case IT_CREAT:
                return "creat";
        case (IT_OPEN | IT_CREAT):
                return "open|creat";
        case IT_READDIR:
                return "readdir";
        case IT_GETATTR:
                return "getattr";
        case IT_LOOKUP:
                return "lookup";
        case IT_UNLINK:
                return "unlink";
        case IT_GETXATTR:
                return "getxattr";
        case IT_LAYOUT:
                return "layout";
        default:
                CERROR("Unknown intent %d\n", it);
                return "UNKNOWN";
        }
}
EXPORT_SYMBOL(ldlm_it2str);

extern cfs_mem_cache_t *ldlm_lock_slab;

#ifdef HAVE_SERVER_SUPPORT
static ldlm_processing_policy ldlm_processing_policy_table[] = {
        [LDLM_PLAIN] ldlm_process_plain_lock,
        [LDLM_EXTENT] ldlm_process_extent_lock,
# ifdef __KERNEL__
        [LDLM_FLOCK] ldlm_process_flock_lock,
# endif
        [LDLM_IBITS] ldlm_process_inodebits_lock,
};

ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
{
        return ldlm_processing_policy_table[res->lr_type];
}
EXPORT_SYMBOL(ldlm_get_processing_policy);
#endif /* HAVE_SERVER_SUPPORT */

void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
{
        ns->ns_policy = arg;
}
EXPORT_SYMBOL(ldlm_register_intent);

/*
 * REFCOUNTED LOCK OBJECTS
 */


/*
 * Lock refcounts, during creation:
 *   - one special one for allocation, dec'd only once in destroy
 *   - one for being a lock that's in-use
 *   - one for the addref associated with a new lock
 */
struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
{
        cfs_atomic_inc(&lock->l_refc);
        return lock;
}
EXPORT_SYMBOL(ldlm_lock_get);

void ldlm_lock_put(struct ldlm_lock *lock)
{
        ENTRY;

        LASSERT(lock->l_resource != LP_POISON);
        LASSERT(cfs_atomic_read(&lock->l_refc) > 0);
        if (cfs_atomic_dec_and_test(&lock->l_refc)) {
                struct ldlm_resource *res;

                LDLM_DEBUG(lock,
                           "final lock_put on destroyed lock, freeing it.");

                res = lock->l_resource;
                LASSERT(lock->l_destroyed);
                LASSERT(cfs_list_empty(&lock->l_res_link));
                LASSERT(cfs_list_empty(&lock->l_pending_chain));

                lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
                                     LDLM_NSS_LOCKS);
                lu_ref_del(&res->lr_reference, "lock", lock);
                ldlm_resource_putref(res);
                lock->l_resource = NULL;
                if (lock->l_export) {
                        class_export_lock_put(lock->l_export, lock);
                        lock->l_export = NULL;
                }

                if (lock->l_lvb_data != NULL)
                        OBD_FREE(lock->l_lvb_data, lock->l_lvb_len);

                ldlm_interval_free(ldlm_interval_detach(lock));
                lu_ref_fini(&lock->l_reference);
                OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle);
        }

        EXIT;
}
EXPORT_SYMBOL(ldlm_lock_put);

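/*
 * Usage sketch (illustrative only): callers normally take and drop
 * references through the LDLM_LOCK_GET()/LDLM_LOCK_PUT() wrappers so
 * that the lu_ref tracking above stays balanced:
 *
 *      struct ldlm_lock *lock = LDLM_LOCK_GET(orig);
 *
 *      ... use lock; it cannot be freed while the reference is held ...
 *      LDLM_LOCK_PUT(lock);
 */
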
int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
{
        int rc = 0;
        if (!cfs_list_empty(&lock->l_lru)) {
                struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

                LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
                cfs_list_del_init(&lock->l_lru);
                if (lock->l_flags & LDLM_FL_SKIPPED)
                        lock->l_flags &= ~LDLM_FL_SKIPPED;
                LASSERT(ns->ns_nr_unused > 0);
                ns->ns_nr_unused--;
                rc = 1;
        }
        return rc;
}

int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
        int rc;

        ENTRY;
        if (lock->l_ns_srv) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                RETURN(0);
        }

        cfs_spin_lock(&ns->ns_lock);
        rc = ldlm_lock_remove_from_lru_nolock(lock);
        cfs_spin_unlock(&ns->ns_lock);
        EXIT;
        return rc;
}

void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        lock->l_last_used = cfs_time_current();
        LASSERT(cfs_list_empty(&lock->l_lru));
        LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
        cfs_list_add_tail(&lock->l_lru, &ns->ns_unused_list);
        LASSERT(ns->ns_nr_unused >= 0);
        ns->ns_nr_unused++;
}

void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        cfs_spin_lock(&ns->ns_lock);
        ldlm_lock_add_to_lru_nolock(lock);
        cfs_spin_unlock(&ns->ns_lock);
        EXIT;
}

void ldlm_lock_touch_in_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        if (lock->l_ns_srv) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                EXIT;
                return;
        }

        cfs_spin_lock(&ns->ns_lock);
        if (!cfs_list_empty(&lock->l_lru)) {
                ldlm_lock_remove_from_lru_nolock(lock);
                ldlm_lock_add_to_lru_nolock(lock);
        }
        cfs_spin_unlock(&ns->ns_lock);
        EXIT;
}

/* This used to have a 'strict' flag, which recovery would use to mark an
 * in-use lock as needing-to-die.  Lest I am ever tempted to put it back, I
 * shall explain why it's gone: with the new hash table scheme, once you call
 * ldlm_lock_destroy, you can never drop your final references on this lock.
 * Because it's not in the hash table anymore.  -phil */
int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
{
        ENTRY;

        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                LBUG();
        }

        if (!cfs_list_empty(&lock->l_res_link)) {
                LDLM_ERROR(lock, "lock still on resource");
                LBUG();
        }

        if (lock->l_destroyed) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                EXIT;
                return 0;
        }
        lock->l_destroyed = 1;

        if (lock->l_export && lock->l_export->exp_lock_hash) {
                /* NB: it's safe to call cfs_hash_del() even if the lock
                 * isn't in exp_lock_hash. */
                /* In the function below, .hs_keycmp resolves to
                 * ldlm_export_lock_keycmp() */
                /* coverity[overrun-buffer-val] */
                cfs_hash_del(lock->l_export->exp_lock_hash,
                             &lock->l_remote_handle, &lock->l_exp_hash);
        }

        ldlm_lock_remove_from_lru(lock);
        class_handle_unhash(&lock->l_handle);

#if 0
        /* Wake anyone waiting for this lock */
        /* FIXME: I should probably add yet another flag, instead of using
         * l_export to only call this on clients */
        if (lock->l_export)
                class_export_put(lock->l_export);
        lock->l_export = NULL;
        if (lock->l_export && lock->l_completion_ast)
                lock->l_completion_ast(lock, 0);
#endif
        EXIT;
        return 1;
}

void ldlm_lock_destroy(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        lock_res_and_lock(lock);
        first = ldlm_lock_destroy_internal(lock);
        unlock_res_and_lock(lock);

        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        first = ldlm_lock_destroy_internal(lock);
        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/* this is called by portals_handle2object with the handle lock taken */
static void lock_handle_addref(void *lock)
{
        LDLM_LOCK_GET((struct ldlm_lock *)lock);
}

static void lock_handle_free(void *lock, int size)
{
        LASSERT(size == sizeof(struct ldlm_lock));
        OBD_SLAB_FREE(lock, ldlm_lock_slab, size);
}

struct portals_handle_ops lock_handle_ops = {
        .hop_addref = lock_handle_addref,
        .hop_free   = lock_handle_free,
};

/*
 * usage: pass in a resource on which you have done ldlm_resource_get
 *        new lock will take over the refcount.
 * returns: lock with refcount 2 - one for current caller and one for remote
 */
static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
{
        struct ldlm_lock *lock;
        ENTRY;

        if (resource == NULL)
                LBUG();

        OBD_SLAB_ALLOC_PTR_GFP(lock, ldlm_lock_slab, CFS_ALLOC_IO);
        if (lock == NULL)
                RETURN(NULL);

        cfs_spin_lock_init(&lock->l_lock);
        lock->l_resource = resource;
        lu_ref_add(&resource->lr_reference, "lock", lock);

        cfs_atomic_set(&lock->l_refc, 2);
        CFS_INIT_LIST_HEAD(&lock->l_res_link);
        CFS_INIT_LIST_HEAD(&lock->l_lru);
        CFS_INIT_LIST_HEAD(&lock->l_pending_chain);
        CFS_INIT_LIST_HEAD(&lock->l_bl_ast);
        CFS_INIT_LIST_HEAD(&lock->l_cp_ast);
        CFS_INIT_LIST_HEAD(&lock->l_rk_ast);
        cfs_waitq_init(&lock->l_waitq);
        lock->l_blocking_lock = NULL;
        CFS_INIT_LIST_HEAD(&lock->l_sl_mode);
        CFS_INIT_LIST_HEAD(&lock->l_sl_policy);
        CFS_INIT_HLIST_NODE(&lock->l_exp_hash);
        CFS_INIT_HLIST_NODE(&lock->l_exp_flock_hash);

        lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
                             LDLM_NSS_LOCKS);
        CFS_INIT_LIST_HEAD(&lock->l_handle.h_link);
        class_handle_hash(&lock->l_handle, &lock_handle_ops);

        lu_ref_init(&lock->l_reference);
        lu_ref_add(&lock->l_reference, "hash", lock);
        lock->l_callback_timeout = 0;

#if LUSTRE_TRACKS_LOCK_EXP_REFS
        CFS_INIT_LIST_HEAD(&lock->l_exp_refs_link);
        lock->l_exp_refs_nr = 0;
        lock->l_exp_refs_target = NULL;
#endif
        CFS_INIT_LIST_HEAD(&lock->l_exp_list);

        RETURN(lock);
}

int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                              const struct ldlm_res_id *new_resid)
{
        struct ldlm_resource *oldres = lock->l_resource;
        struct ldlm_resource *newres;
        int type;
        ENTRY;

        LASSERT(ns_is_client(ns));

        lock_res_and_lock(lock);
        if (memcmp(new_resid, &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) == 0) {
                /* Nothing to do */
                unlock_res_and_lock(lock);
                RETURN(0);
        }

        LASSERT(new_resid->name[0] != 0);

        /* This function assumes that the lock isn't on any lists */
        LASSERT(cfs_list_empty(&lock->l_res_link));

        type = oldres->lr_type;
        unlock_res_and_lock(lock);

        newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
        if (newres == NULL)
                RETURN(-ENOMEM);

        lu_ref_add(&newres->lr_reference, "lock", lock);
        /*
         * To flip the lock from the old to the new resource, lock, oldres and
         * newres have to be locked. Resource spin-locks are nested within
         * lock->l_lock, and are taken in the memory address order to avoid
         * dead-locks.
         */
        cfs_spin_lock(&lock->l_lock);
        oldres = lock->l_resource;
        if (oldres < newres) {
                lock_res(oldres);
                lock_res_nested(newres, LRT_NEW);
        } else {
                lock_res(newres);
                lock_res_nested(oldres, LRT_NEW);
        }
        LASSERT(memcmp(new_resid, &oldres->lr_name,
                       sizeof oldres->lr_name) != 0);
        lock->l_resource = newres;
        unlock_res(oldres);
        unlock_res_and_lock(lock);

        /* ...and the flowers are still standing! */
        lu_ref_del(&oldres->lr_reference, "lock", lock);
        ldlm_resource_putref(oldres);

        RETURN(0);
}
EXPORT_SYMBOL(ldlm_lock_change_resource);

/*
 *  HANDLES
 */

void ldlm_lock2handle(const struct ldlm_lock *lock, struct lustre_handle *lockh)
{
        lockh->cookie = lock->l_handle.h_cookie;
}
EXPORT_SYMBOL(ldlm_lock2handle);

/* If 'flags' is non-zero: atomically get the lock and set the flags;
 *                         return NULL if any of the flags is already set.
 */

struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
                                     int flags)
{
        struct ldlm_lock *lock;
        ENTRY;

        LASSERT(handle);

        lock = class_handle2object(handle->cookie);
        if (lock == NULL)
                RETURN(NULL);

        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if (flags == 0 && !lock->l_destroyed) {
                lu_ref_add(&lock->l_reference, "handle", cfs_current());
                RETURN(lock);
        }

        lock_res_and_lock(lock);

        LASSERT(lock->l_resource != NULL);

        lu_ref_add_atomic(&lock->l_reference, "handle", cfs_current());
        if (unlikely(lock->l_destroyed)) {
                unlock_res_and_lock(lock);
                CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
                LDLM_LOCK_PUT(lock);
                RETURN(NULL);
        }

        if (flags && (lock->l_flags & flags)) {
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
                RETURN(NULL);
        }

        if (flags)
                lock->l_flags |= flags;

        unlock_res_and_lock(lock);
        RETURN(lock);
}
EXPORT_SYMBOL(__ldlm_handle2lock);

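/*
 * Usage sketch (illustrative only): a granted lock is normally referred
 * to over the wire by its opaque lustre_handle cookie, which can be
 * turned back into a referenced lock later:
 *
 *      struct lustre_handle lockh;
 *
 *      ldlm_lock2handle(lock, &lockh);
 *      ...
 *      lock = ldlm_handle2lock(&lockh);
 *      if (lock != NULL) {
 *              ... the lookup above took a reference ...
 *              LDLM_LOCK_PUT(lock);
 *      }
 */
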
void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
{
        struct obd_export *exp = lock->l_export?:lock->l_conn_export;
        /* INODEBITS_INTEROP: If the other side does not support
         * inodebits, reply with a plain lock descriptor.
         */
        if ((lock->l_resource->lr_type == LDLM_IBITS) &&
            (exp && !(exp->exp_connect_flags & OBD_CONNECT_IBITS))) {
                /* Make sure all the right bits are set in the lock we
                   are going to pass to the client */
                LASSERTF(lock->l_policy_data.l_inodebits.bits ==
                         (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
                          MDS_INODELOCK_LAYOUT),
                         "Inappropriate inode lock bits during "
                         "conversion " LPU64 "\n",
                         lock->l_policy_data.l_inodebits.bits);

                ldlm_res2desc(lock->l_resource, &desc->l_resource);
                desc->l_resource.lr_type = LDLM_PLAIN;

                /* Convert the "new" lock mode to something an old client
                   can understand */
                if ((lock->l_req_mode == LCK_CR) ||
                    (lock->l_req_mode == LCK_CW))
                        desc->l_req_mode = LCK_PR;
                else
                        desc->l_req_mode = lock->l_req_mode;
                if ((lock->l_granted_mode == LCK_CR) ||
                    (lock->l_granted_mode == LCK_CW)) {
                        desc->l_granted_mode = LCK_PR;
                } else {
                        /* We never grant PW/EX locks to clients */
                        LASSERT((lock->l_granted_mode != LCK_PW) &&
                                (lock->l_granted_mode != LCK_EX));
                        desc->l_granted_mode = lock->l_granted_mode;
                }

                /* We do not copy policy here, because there is no
                   policy for plain locks */
        } else {
                ldlm_res2desc(lock->l_resource, &desc->l_resource);
                desc->l_req_mode = lock->l_req_mode;
                desc->l_granted_mode = lock->l_granted_mode;
                ldlm_convert_policy_to_wire(lock->l_resource->lr_type,
                                            &lock->l_policy_data,
                                            &desc->l_policy_data);
        }
}
EXPORT_SYMBOL(ldlm_lock2desc);

void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                           cfs_list_t *work_list)
{
        if ((lock->l_flags & LDLM_FL_AST_SENT) == 0) {
                LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
                lock->l_flags |= LDLM_FL_AST_SENT;
                /* If the enqueuing client said so, tell the AST recipient to
                 * discard dirty data, rather than writing back. */
                if (new->l_flags & LDLM_AST_DISCARD_DATA)
                        lock->l_flags |= LDLM_FL_DISCARD_DATA;
                LASSERT(cfs_list_empty(&lock->l_bl_ast));
                cfs_list_add(&lock->l_bl_ast, work_list);
                LDLM_LOCK_GET(lock);
                LASSERT(lock->l_blocking_lock == NULL);
                lock->l_blocking_lock = LDLM_LOCK_GET(new);
        }
}

void ldlm_add_cp_work_item(struct ldlm_lock *lock, cfs_list_t *work_list)
{
        if ((lock->l_flags & LDLM_FL_CP_REQD) == 0) {
                lock->l_flags |= LDLM_FL_CP_REQD;
                LDLM_DEBUG(lock, "lock granted; sending completion AST.");
                LASSERT(cfs_list_empty(&lock->l_cp_ast));
                cfs_list_add(&lock->l_cp_ast, work_list);
                LDLM_LOCK_GET(lock);
        }
}

/* must be called with lr_lock held */
void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                            cfs_list_t *work_list)
{
        ENTRY;
        check_res_locked(lock->l_resource);
        if (new)
                ldlm_add_bl_work_item(lock, new, work_list);
        else
                ldlm_add_cp_work_item(lock, work_list);
        EXIT;
}

void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;

        lock = ldlm_handle2lock(lockh);
        LASSERT(lock != NULL);
        ldlm_lock_addref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_addref);

void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        ldlm_lock_remove_from_lru(lock);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                lock->l_readers++;
                lu_ref_add_atomic(&lock->l_reference, "reader", lock);
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                lock->l_writers++;
                lu_ref_add_atomic(&lock->l_reference, "writer", lock);
        }
        LDLM_LOCK_GET(lock);
        lu_ref_add_atomic(&lock->l_reference, "user", lock);
        LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}

/**
 * Attempts to addref a lock, and fails if lock is already LDLM_FL_CBPENDING
 * or destroyed.
 *
 * \retval 0 success, lock was addref-ed
 *
 * \retval -EAGAIN lock is being canceled.
 */
int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;
        int               result;

        result = -EAGAIN;
        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                if (lock->l_readers != 0 || lock->l_writers != 0 ||
                    !(lock->l_flags & LDLM_FL_CBPENDING)) {
                        ldlm_lock_addref_internal_nolock(lock, mode);
                        result = 0;
                }
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return result;
}
EXPORT_SYMBOL(ldlm_lock_addref_try);

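/*
 * Usage sketch (illustrative only): pinning a cached lock for reading
 * only if it is not already being cancelled:
 *
 *      if (ldlm_lock_addref_try(&lockh, LCK_PR) == 0) {
 *              ... lock is pinned; it won't be cancelled under us ...
 *              ldlm_lock_decref(&lockh, LCK_PR);
 *      }
 */
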
/* only called for local locks */
void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
{
        lock_res_and_lock(lock);
        ldlm_lock_addref_internal_nolock(lock, mode);
        unlock_res_and_lock(lock);
}

/* Only called in ldlm_flock_destroy and for local locks.
 * For LDLM_FLOCK type locks, l_blocking_ast is NULL and
 * ldlm_lock_remove_from_lru() does nothing, so it is safe for
 * ldlm_flock_destroy to use this by dropping some code. */
void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                LASSERT(lock->l_readers > 0);
                lu_ref_del(&lock->l_reference, "reader", lock);
                lock->l_readers--;
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                LASSERT(lock->l_writers > 0);
                lu_ref_del(&lock->l_reference, "writer", lock);
                lock->l_writers--;
        }

        lu_ref_del(&lock->l_reference, "user", lock);
        LDLM_LOCK_RELEASE(lock);    /* matches the LDLM_LOCK_GET() in addref */
}

void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
{
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        ns = ldlm_lock_to_ns(lock);

        ldlm_lock_decref_internal_nolock(lock, mode);

        if (lock->l_flags & LDLM_FL_LOCAL &&
            !lock->l_readers && !lock->l_writers) {
                /* If this is a local lock on a server namespace and this was
                 * the last reference, cancel the lock. */
                CDEBUG(D_INFO, "forcing cancel of local lock\n");
                lock->l_flags |= LDLM_FL_CBPENDING;
        }

        if (!lock->l_readers && !lock->l_writers &&
            (lock->l_flags & LDLM_FL_CBPENDING)) {
                /* If we received a blocking AST and this was the last reference,
                 * run the callback. */
                if (lock->l_ns_srv && lock->l_export)
                        CERROR("FL_CBPENDING set on non-local lock--just a "
                               "warning\n");

                LDLM_DEBUG(lock, "final decref done on cbpending lock");

                LDLM_LOCK_GET(lock); /* dropped by bl thread */
                ldlm_lock_remove_from_lru(lock);
                unlock_res_and_lock(lock);

                if (lock->l_flags & LDLM_FL_FAIL_LOC)
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
                    ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
                        ldlm_handle_bl_callback(ns, NULL, lock);
        } else if (ns_is_client(ns) &&
                   !lock->l_readers && !lock->l_writers &&
                   !(lock->l_flags & LDLM_FL_NO_LRU) &&
                   !(lock->l_flags & LDLM_FL_BL_AST)) {

                LDLM_DEBUG(lock, "add lock into lru list");

                /* If this is a client-side namespace and this was the last
                 * reference, put it on the LRU. */
                ldlm_lock_add_to_lru(lock);
                unlock_res_and_lock(lock);

                if (lock->l_flags & LDLM_FL_FAIL_LOC)
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
                 * are not supported by the server, otherwise, it is done on
                 * enqueue. */
                if (!exp_connect_cancelset(lock->l_conn_export) &&
                    !ns_connect_lru_resize(ns))
                        ldlm_cancel_lru(ns, 0, LDLM_ASYNC, 0);
        } else {
                LDLM_DEBUG(lock, "do not add lock into lru list");
                unlock_res_and_lock(lock);
        }

        EXIT;
}

void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_decref);

/* This will drop a lock reference and mark it for destruction, but will not
 * necessarily cancel the lock before returning. */
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        ENTRY;

        LASSERT(lock != NULL);

        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        lock_res_and_lock(lock);
        lock->l_flags |= LDLM_FL_CBPENDING;
        unlock_res_and_lock(lock);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);

struct sl_insert_point {
        cfs_list_t *res_link;
        cfs_list_t *mode_link;
        cfs_list_t *policy_link;
};

/*
 * search_granted_lock
 *
 * Description:
 *      Finds a position at which to insert the new lock.
 * Parameters:
 *      queue [input]:  the granted list to search;
 *      req [input]:    the lock whose position is to be located;
 *      prev [output]:  positions within the 3 lists at which to insert @req
 * Return Value:
 *      filled @prev
 * NOTE: called by
 *  - ldlm_grant_lock_with_skiplist
 */
static void search_granted_lock(cfs_list_t *queue,
                                struct ldlm_lock *req,
                                struct sl_insert_point *prev)
{
        cfs_list_t *tmp;
        struct ldlm_lock *lock, *mode_end, *policy_end;
        ENTRY;

        cfs_list_for_each(tmp, queue) {
                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                mode_end = cfs_list_entry(lock->l_sl_mode.prev,
                                          struct ldlm_lock, l_sl_mode);

                if (lock->l_req_mode != req->l_req_mode) {
                        /* jump to last lock of mode group */
                        tmp = &mode_end->l_res_link;
                        continue;
                }

                /* suitable mode group is found */
                if (lock->l_resource->lr_type == LDLM_PLAIN) {
                        /* insert point is last lock of the mode group */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else if (lock->l_resource->lr_type == LDLM_IBITS) {
                        for (;;) {
                                policy_end =
                                        cfs_list_entry(lock->l_sl_policy.prev,
                                                       struct ldlm_lock,
                                                       l_sl_policy);

                                if (lock->l_policy_data.l_inodebits.bits ==
                                    req->l_policy_data.l_inodebits.bits) {
                                        /* insert point is last lock of
                                         * the policy group */
                                        prev->res_link =
                                                &policy_end->l_res_link;
                                        prev->mode_link =
                                                &policy_end->l_sl_mode;
                                        prev->policy_link =
                                                &policy_end->l_sl_policy;
                                        EXIT;
                                        return;
                                }

                                if (policy_end == mode_end)
                                        /* done with mode group */
                                        break;

                                /* go to next policy group within mode group */
                                tmp = policy_end->l_res_link.next;
                                lock = cfs_list_entry(tmp, struct ldlm_lock,
                                                      l_res_link);
                        }  /* loop over policy groups within the mode group */

                        /* insert point is last lock of the mode group,
                         * new policy group is started */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else {
                        LDLM_ERROR(lock,"is not LDLM_PLAIN or LDLM_IBITS lock");
                        LBUG();
                }
        }

        /* insert point is last lock on the queue,
         * new mode group and new policy group are started */
        prev->res_link = queue->prev;
        prev->mode_link = &req->l_sl_mode;
        prev->policy_link = &req->l_sl_policy;
        EXIT;
        return;
}

static void ldlm_granted_list_add_lock(struct ldlm_lock *lock,
                                       struct sl_insert_point *prev)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        ldlm_resource_dump(D_INFO, res);
        LDLM_DEBUG(lock, "About to add lock:");

        if (lock->l_destroyed) {
                CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
                return;
        }

        LASSERT(cfs_list_empty(&lock->l_res_link));
        LASSERT(cfs_list_empty(&lock->l_sl_mode));
        LASSERT(cfs_list_empty(&lock->l_sl_policy));

        cfs_list_add(&lock->l_res_link, prev->res_link);
        cfs_list_add(&lock->l_sl_mode, prev->mode_link);
        cfs_list_add(&lock->l_sl_policy, prev->policy_link);

        EXIT;
}

static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
{
        struct sl_insert_point prev;
        ENTRY;

        LASSERT(lock->l_req_mode == lock->l_granted_mode);

        search_granted_lock(&lock->l_resource->lr_granted, lock, &prev);
        ldlm_granted_list_add_lock(lock, &prev);
        EXIT;
}

/* NOTE: called by
 *  - ldlm_lock_enqueue
 *  - ldlm_reprocess_queue
 *  - ldlm_lock_convert
 *
 * must be called with lr_lock held
 */
void ldlm_grant_lock(struct ldlm_lock *lock, cfs_list_t *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        lock->l_granted_mode = lock->l_req_mode;
        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
                ldlm_grant_lock_with_skiplist(lock);
        else if (res->lr_type == LDLM_EXTENT)
                ldlm_extent_add_lock(res, lock);
        else
                ldlm_resource_add_lock(res, &res->lr_granted, lock);

        if (lock->l_granted_mode < res->lr_most_restr)
                res->lr_most_restr = lock->l_granted_mode;

        if (work_list && lock->l_completion_ast != NULL)
                ldlm_add_ast_work_item(lock, NULL, work_list);

        ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
        EXIT;
}

/* returns a referenced lock or NULL.  See the flag descriptions below, in the
 * comment above ldlm_lock_match */
static struct ldlm_lock *search_queue(cfs_list_t *queue,
                                      ldlm_mode_t *mode,
                                      ldlm_policy_data_t *policy,
                                      struct ldlm_lock *old_lock,
                                      int flags, int unref)
{
        struct ldlm_lock *lock;
        cfs_list_t       *tmp;

        cfs_list_for_each(tmp, queue) {
                ldlm_mode_t match;

                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lock == old_lock)
                        break;

                /* llite sometimes wants to match locks that will be
                 * canceled when their users drop, but we allow it to match
                 * if it passes in CBPENDING and the lock still has users.
                 * this is generally only going to be used by children
                 * whose parents already hold a lock so forward progress
                 * can still happen. */
                if (lock->l_flags & LDLM_FL_CBPENDING &&
                    !(flags & LDLM_FL_CBPENDING))
                        continue;
                if (!unref && lock->l_flags & LDLM_FL_CBPENDING &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        continue;

                if (!(lock->l_req_mode & *mode))
                        continue;
                match = lock->l_req_mode;

                if (lock->l_resource->lr_type == LDLM_EXTENT &&
                    (lock->l_policy_data.l_extent.start >
                     policy->l_extent.start ||
                     lock->l_policy_data.l_extent.end < policy->l_extent.end))
                        continue;

                if (unlikely(match == LCK_GROUP) &&
                    lock->l_resource->lr_type == LDLM_EXTENT &&
                    lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                        continue;

                /* We match if we have an existing lock with the same or a
                   wider set of bits. */
                if (lock->l_resource->lr_type == LDLM_IBITS &&
                     ((lock->l_policy_data.l_inodebits.bits &
                      policy->l_inodebits.bits) !=
                      policy->l_inodebits.bits))
                        continue;

                if (!unref &&
                    (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
                     lock->l_failed))
                        continue;

                if ((flags & LDLM_FL_LOCAL_ONLY) &&
                    !(lock->l_flags & LDLM_FL_LOCAL))
                        continue;

                if (flags & LDLM_FL_TEST_LOCK) {
                        LDLM_LOCK_GET(lock);
                        ldlm_lock_touch_in_lru(lock);
                } else {
                        ldlm_lock_addref_internal_nolock(lock, match);
                }
                *mode = match;
                return lock;
        }

        return NULL;
}

void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
{
        if (!lock->l_failed) {
                lock->l_failed = 1;
                cfs_waitq_broadcast(&lock->l_waitq);
        }
}
EXPORT_SYMBOL(ldlm_lock_fail_match_locked);

void ldlm_lock_fail_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_fail_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_fail_match);

void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
{
        lock->l_flags |= LDLM_FL_LVB_READY;
        cfs_waitq_broadcast(&lock->l_waitq);
}
EXPORT_SYMBOL(ldlm_lock_allow_match_locked);

void ldlm_lock_allow_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_allow_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_allow_match);

/* Can be called in two ways:
 *
 * If 'ns' is NULL, then lockh describes an existing lock that we want to look
 * for a duplicate of.
 *
 * Otherwise, all of the fields must be filled in, to match against.
 *
 * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
 *     server (i.e., connh is NULL)
 * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
 *     list will be considered
 * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
 *     to be canceled can still be matched as long as they still have reader
 *     or writer references
 * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
 *     just tell us if we would have matched.
 *
 * Returns 1 if it finds an already-existing lock that is compatible; in this
 * case, lockh is filled in with an addref()ed lock
 *
 * We also check the security context; if that fails we simply return 0 (to
 * keep caller code unchanged), and the context failure will be discovered by
 * the caller sometime later.
 */
ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
                            const struct ldlm_res_id *res_id, ldlm_type_t type,
                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
                            struct lustre_handle *lockh, int unref)
{
        struct ldlm_resource *res;
        struct ldlm_lock *lock, *old_lock = NULL;
        int rc = 0;
        ENTRY;

        if (ns == NULL) {
                old_lock = ldlm_handle2lock(lockh);
                LASSERT(old_lock);

                ns = ldlm_lock_to_ns(old_lock);
                res_id = &old_lock->l_resource->lr_name;
                type = old_lock->l_resource->lr_type;
                mode = old_lock->l_req_mode;
        }

        res = ldlm_resource_get(ns, NULL, res_id, type, 0);
        if (res == NULL) {
                LASSERT(old_lock == NULL);
                RETURN(0);
        }

        LDLM_RESOURCE_ADDREF(res);
        lock_res(res);

        lock = search_queue(&res->lr_granted, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        if (flags & LDLM_FL_BLOCK_GRANTED)
                GOTO(out, rc = 0);
        lock = search_queue(&res->lr_converting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        lock = search_queue(&res->lr_waiting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);

        EXIT;
 out:
        unlock_res(res);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);

        if (lock) {
                ldlm_lock2handle(lock, lockh);
                if ((flags & LDLM_FL_LVB_READY) &&
                    (!(lock->l_flags & LDLM_FL_LVB_READY))) {
                        struct l_wait_info lwi;
                        if (lock->l_completion_ast) {
                                int err = lock->l_completion_ast(lock,
                                                          LDLM_FL_WAIT_NOREPROC,
                                                                 NULL);
                                if (err) {
                                        if (flags & LDLM_FL_TEST_LOCK)
                                                LDLM_LOCK_RELEASE(lock);
                                        else
                                                ldlm_lock_decref_internal(lock,
                                                                          mode);
                                        rc = 0;
                                        goto out2;
                                }
                        }

                        lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
                                               NULL, LWI_ON_SIGNAL_NOOP, NULL);

                        /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
                        l_wait_event(lock->l_waitq,
                                     lock->l_flags & LDLM_FL_LVB_READY ||
                                     lock->l_failed,
                                     &lwi);
                        if (!(lock->l_flags & LDLM_FL_LVB_READY)) {
                                if (flags & LDLM_FL_TEST_LOCK)
                                        LDLM_LOCK_RELEASE(lock);
                                else
                                        ldlm_lock_decref_internal(lock, mode);
                                rc = 0;
                        }
                }
        }
 out2:
        if (rc) {
                LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[2] : policy->l_extent.start,
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[3] : policy->l_extent.end);

                /* check user's security context */
                if (lock->l_conn_export &&
                    sptlrpc_import_check_ctx(
                                class_exp2cliimp(lock->l_conn_export))) {
                        if (!(flags & LDLM_FL_TEST_LOCK))
                                ldlm_lock_decref_internal(lock, mode);
                        rc = 0;
                }

                if (flags & LDLM_FL_TEST_LOCK)
                        LDLM_LOCK_RELEASE(lock);

        } else if (!(flags & LDLM_FL_TEST_LOCK)) {/*less verbose for test-only*/
                LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
                                  LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
                                  type, mode, res_id->name[0], res_id->name[1],
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[2] :policy->l_extent.start,
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[3] : policy->l_extent.end);
        }
        if (old_lock)
                LDLM_LOCK_PUT(old_lock);

        return rc ? mode : 0;
}
EXPORT_SYMBOL(ldlm_lock_match);

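/*
 * Usage sketch (illustrative only; 'start' and 'end' stand for
 * caller-supplied byte offsets): looking for a cached PR or PW extent
 * lock that already covers a range.  On success the matched lock has
 * been addref'ed with the returned mode:
 *
 *      ldlm_policy_data_t policy = { .l_extent = { .start = start,
 *                                                  .end   = end } };
 *      struct lustre_handle lockh;
 *      ldlm_mode_t mode;
 *
 *      mode = ldlm_lock_match(ns, LDLM_FL_LVB_READY, res_id, LDLM_EXTENT,
 *                             &policy, LCK_PR | LCK_PW, &lockh, 0);
 *      if (mode != 0) {
 *              ... use the matched lock ...
 *              ldlm_lock_decref(&lockh, mode);
 *      }
 */
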
ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh,
                                        __u64 *bits)
{
        struct ldlm_lock *lock;
        ldlm_mode_t mode = 0;
        ENTRY;

        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
                    lock->l_failed)
                        GOTO(out, mode);

                if (lock->l_flags & LDLM_FL_CBPENDING &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        GOTO(out, mode);

                if (bits)
                        *bits = lock->l_policy_data.l_inodebits.bits;
                mode = lock->l_granted_mode;
                ldlm_lock_addref_internal_nolock(lock, mode);
        }

        EXIT;

out:
        if (lock != NULL) {
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return mode;
}
EXPORT_SYMBOL(ldlm_revalidate_lock_handle);

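/*
 * Usage sketch (illustrative only): re-checking a cached handle, e.g.
 * for an inodebits lock, before reusing it; on success the lock was
 * addref'ed with the returned mode:
 *
 *      __u64 bits = 0;
 *      ldlm_mode_t mode;
 *
 *      mode = ldlm_revalidate_lock_handle(&lockh, &bits);
 *      if (mode != 0) {
 *              ... 'bits' holds the granted inodebits ...
 *              ldlm_lock_decref(&lockh, mode);
 *      }
 */
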
/* Returns a referenced lock */
struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
                                   const struct ldlm_res_id *res_id,
                                   ldlm_type_t type,
                                   ldlm_mode_t mode,
                                   const struct ldlm_callback_suite *cbs,
                                   void *data, __u32 lvb_len)
{
        struct ldlm_lock *lock;
        struct ldlm_resource *res;
        ENTRY;

        res = ldlm_resource_get(ns, NULL, res_id, type, 1);
        if (res == NULL)
                RETURN(NULL);

        lock = ldlm_lock_new(res);

        if (lock == NULL)
                RETURN(NULL);

        lock->l_req_mode = mode;
        lock->l_ast_data = data;
        lock->l_pid = cfs_curproc_pid();
        lock->l_ns_srv = !!ns_is_server(ns);
        if (cbs) {
                lock->l_blocking_ast = cbs->lcs_blocking;
                lock->l_completion_ast = cbs->lcs_completion;
                lock->l_glimpse_ast = cbs->lcs_glimpse;
                lock->l_weigh_ast = cbs->lcs_weigh;
        }

        lock->l_tree_node = NULL;
        /* if this is an extent lock, allocate the interval tree node */
        if (type == LDLM_EXTENT) {
                if (ldlm_interval_alloc(lock) == NULL)
                        GOTO(out, 0);
        }

        if (lvb_len) {
                lock->l_lvb_len = lvb_len;
                OBD_ALLOC(lock->l_lvb_data, lvb_len);
                if (lock->l_lvb_data == NULL)
                        GOTO(out, 0);
        }

        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK))
                GOTO(out, 0);

        RETURN(lock);

out:
        ldlm_lock_destroy(lock);
        LDLM_LOCK_RELEASE(lock);
        return NULL;
}

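/*
 * Usage sketch (illustrative only; my_blocking_ast and my_completion_ast
 * are hypothetical caller-supplied callbacks, not functions defined in
 * this file):
 *
 *      const struct ldlm_callback_suite cbs = {
 *              .lcs_blocking   = my_blocking_ast,
 *              .lcs_completion = my_completion_ast,
 *      };
 *      struct ldlm_lock *lock;
 *
 *      lock = ldlm_lock_create(ns, res_id, LDLM_EXTENT, LCK_PR, &cbs,
 *                              NULL, 0);
 *      if (lock == NULL)
 *              ... handle the allocation failure ...
 */
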
ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                               struct ldlm_lock **lockp,
                               void *cookie, int *flags)
{
        struct ldlm_lock *lock = *lockp;
        struct ldlm_resource *res = lock->l_resource;
        int local = ns_is_client(ldlm_res_to_ns(res));
#ifdef HAVE_SERVER_SUPPORT
        ldlm_processing_policy policy;
#endif
        ldlm_error_t rc = ELDLM_OK;
        struct ldlm_interval *node = NULL;
        ENTRY;

        lock->l_last_activity = cfs_time_current_sec();
        /* policies are not executed on the client or during replay */
        if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
            && !local && ns->ns_policy) {
                rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
                                   NULL);
                if (rc == ELDLM_LOCK_REPLACED) {
                        /* The lock that was returned has already been granted,
                         * and placed into lockp.  If it's not the same as the
                         * one we passed in, then destroy the old one and our
                         * work here is done. */
                        if (lock != *lockp) {
                                ldlm_lock_destroy(lock);
                                LDLM_LOCK_RELEASE(lock);
                        }
                        *flags |= LDLM_FL_LOCK_CHANGED;
                        RETURN(0);
                } else if (rc != ELDLM_OK ||
                           (rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
                        ldlm_lock_destroy(lock);
                        RETURN(rc);
                }
        }

        /* A replaying lock may already be in the granted list, in which case
         * unlinking the lock would cause its interval node to be freed, so we
         * have to allocate the interval node early; otherwise we can't
         * regrant this lock in the future. - jay */
1426         if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
1427                 OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
1428
1429         lock_res_and_lock(lock);
1430         if (local && lock->l_req_mode == lock->l_granted_mode) {
1431                 /* The server returned a blocked lock, but it was granted
1432                  * before we got a chance to actually enqueue it.  We don't
1433                  * need to do anything else. */
1434                 *flags &= ~(LDLM_FL_BLOCK_GRANTED |
1435                             LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
1436                 GOTO(out, ELDLM_OK);
1437         }
1438
1439         ldlm_resource_unlink_lock(lock);
1440         if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
1441                 if (node == NULL) {
1442                         ldlm_lock_destroy_nolock(lock);
1443                         GOTO(out, rc = -ENOMEM);
1444                 }
1445
1446                 CFS_INIT_LIST_HEAD(&node->li_group);
1447                 ldlm_interval_attach(node, lock);
1448                 node = NULL;
1449         }
1450
1451         /* Some flags from the enqueue must be carried over into the AST,
1452          * via the lock's l_flags. */
1453         lock->l_flags |= *flags & LDLM_AST_DISCARD_DATA;
1454
1455         /* This distinction between local lock trees is very important; a client
1456          * namespace only has information about locks taken by that client, and
1457          * thus doesn't have enough information to decide for itself if it can
1458          * be granted (below).  In this case, we do exactly what the server
1459          * tells us to do, as dictated by the 'flags'.
1460          *
1461          * We do exactly the same thing during recovery, when the server is
1462          * more or less trusting the clients not to lie.
1463          *
1464          * FIXME (bug 268): Detect obvious lies by checking compatibility in
1465          * granted/converting queues. */
1466         if (local) {
1467                 if (*flags & LDLM_FL_BLOCK_CONV)
1468                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1469                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
1470                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1471                 else
1472                         ldlm_grant_lock(lock, NULL);
1473                 GOTO(out, ELDLM_OK);
1474 #ifdef HAVE_SERVER_SUPPORT
1475         } else if (*flags & LDLM_FL_REPLAY) {
1476                 if (*flags & LDLM_FL_BLOCK_CONV) {
1477                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1478                         GOTO(out, ELDLM_OK);
1479                 } else if (*flags & LDLM_FL_BLOCK_WAIT) {
1480                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1481                         GOTO(out, ELDLM_OK);
1482                 } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
1483                         ldlm_grant_lock(lock, NULL);
1484                         GOTO(out, ELDLM_OK);
1485                 }
1486                 /* If no flags, fall through to normal enqueue path. */
1487         }
1488
1489         policy = ldlm_processing_policy_table[res->lr_type];
1490         policy(lock, flags, 1, &rc, NULL);
1491         GOTO(out, rc);
1492 #else
1493         } else {
1494                 CERROR("This is client-side-only module, cannot handle "
1495                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
1496                 LBUG();
1497         }
1498 #endif
1499
1500 out:
1501         unlock_res_and_lock(lock);
1502         if (node)
1503                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
1504         return rc;
1505 }
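
/*
 * Illustrative sketch (editorial, not from the original source): a caller
 * that enqueued with LDLM_FL_HAS_INTENT must watch for the intent policy
 * replacing its lock, since *lockp may then point at a different, already
 * granted lock:
 *
 *      int flags = LDLM_FL_HAS_INTENT;
 *      ldlm_error_t err = ldlm_lock_enqueue(ns, &lock, cookie, &flags);
 *      if (err == ELDLM_OK && (flags & LDLM_FL_LOCK_CHANGED))
 *              use_replacement_lock(lock);     (hypothetical helper)
 */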
1506
1507 #ifdef HAVE_SERVER_SUPPORT
1508 /* Must be called with the resource lock held: queue is waiting or converting. */
1509 int ldlm_reprocess_queue(struct ldlm_resource *res, cfs_list_t *queue,
1510                          cfs_list_t *work_list)
1511 {
1512         cfs_list_t *tmp, *pos;
1513         ldlm_processing_policy policy;
1514         int flags;
1515         int rc = LDLM_ITER_CONTINUE;
1516         ldlm_error_t err;
1517         ENTRY;
1518
1519         check_res_locked(res);
1520
1521         policy = ldlm_processing_policy_table[res->lr_type];
1522         LASSERT(policy);
1523
1524         cfs_list_for_each_safe(tmp, pos, queue) {
1525                 struct ldlm_lock *pending;
1526                 pending = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
1527
1528                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
1529
1530                 flags = 0;
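                /* The third policy argument distinguishes a first enqueue
                 * (1, as passed in ldlm_lock_enqueue above) from a
                 * reprocess pass (0). */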
1531                 rc = policy(pending, &flags, 0, &err, work_list);
1532                 if (rc != LDLM_ITER_CONTINUE)
1533                         break;
1534         }
1535
1536         RETURN(rc);
1537 }
1538 #endif
1539
1540 static int
1541 ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1542 {
1543         struct ldlm_cb_set_arg *arg = opaq;
1544         struct ldlm_lock_desc   d;
1545         int                     rc;
1546         struct ldlm_lock       *lock;
1547         ENTRY;
1548
1549         if (cfs_list_empty(arg->list))
1550                 RETURN(-ENOENT);
1551
1552         lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);
1553
1554         /* nobody should touch l_bl_ast */
1555         lock_res_and_lock(lock);
1556         cfs_list_del_init(&lock->l_bl_ast);
1557
1558         LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
1559         LASSERT(lock->l_bl_ast_run == 0);
1560         LASSERT(lock->l_blocking_lock);
1561         lock->l_bl_ast_run++;
1562         unlock_res_and_lock(lock);
1563
1564         ldlm_lock2desc(lock->l_blocking_lock, &d);
1565
1566         rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
1567         LDLM_LOCK_RELEASE(lock->l_blocking_lock);
1568         lock->l_blocking_lock = NULL;
1569         LDLM_LOCK_RELEASE(lock);
1570
1571         RETURN(rc);
1572 }
1573
1574 static int
1575 ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1576 {
1577         struct ldlm_cb_set_arg  *arg = opaq;
1578         int                      rc = 0;
1579         struct ldlm_lock        *lock;
1580         ldlm_completion_callback completion_callback;
1581         ENTRY;
1582
1583         if (cfs_list_empty(arg->list))
1584                 RETURN(-ENOENT);
1585
1586         lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
1587
1588         /* It's possible to receive a completion AST before we've set
1589          * the l_completion_ast pointer: either because the AST arrived
1590          * before the reply, or simply because there's a small race
1591          * window between receiving the reply and finishing the local
1592          * enqueue. (bug 842)
1593          *
1594          * This can't happen with the blocking_ast, however, because we
1595          * will never call the local blocking_ast until we drop our
1596          * reader/writer reference, which we won't do until we get the
1597          * reply and finish enqueueing. */
1598
1599         /* nobody should touch l_cp_ast */
1600         lock_res_and_lock(lock);
1601         cfs_list_del_init(&lock->l_cp_ast);
1602         LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
1603         /* save l_completion_ast since it can be changed by
1604          * mds_intent_policy(), see bug 14225 */
1605         completion_callback = lock->l_completion_ast;
1606         lock->l_flags &= ~LDLM_FL_CP_REQD;
1607         unlock_res_and_lock(lock);
1608
1609         if (completion_callback != NULL)
1610                 rc = completion_callback(lock, 0, (void *)arg);
1611         LDLM_LOCK_RELEASE(lock);
1612
1613         RETURN(rc);
1614 }
1615
1616 static int
1617 ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1618 {
1619         struct ldlm_cb_set_arg *arg = opaq;
1620         struct ldlm_lock_desc   desc;
1621         int                     rc;
1622         struct ldlm_lock       *lock;
1623         ENTRY;
1624
1625         if (cfs_list_empty(arg->list))
1626                 RETURN(-ENOENT);
1627
1628         lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_rk_ast);
1629         cfs_list_del_init(&lock->l_rk_ast);
1630
1631         /* the desc just pretends the lock is exclusive */
1632         ldlm_lock2desc(lock, &desc);
1633         desc.l_req_mode = LCK_EX;
1634         desc.l_granted_mode = 0;
1635
1636         rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
1637         LDLM_LOCK_RELEASE(lock);
1638
1639         RETURN(rc);
1640 }
1641
1642 int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1643 {
1644         struct ldlm_cb_set_arg          *arg = opaq;
1645         struct ldlm_glimpse_work        *gl_work;
1646         struct ldlm_lock                *lock;
1647         int                              rc = 0;
1648         ENTRY;
1649
1650         if (cfs_list_empty(arg->list))
1651                 RETURN(-ENOENT);
1652
1653         gl_work = cfs_list_entry(arg->list->next, struct ldlm_glimpse_work,
1654                                  gl_list);
1655         cfs_list_del_init(&gl_work->gl_list);
1656
1657         lock = gl_work->gl_lock;
1658
1659         /* transfer the glimpse descriptor to ldlm_cb_set_arg */
1660         arg->gl_desc = gl_work->gl_desc;
1661
1662         /* invoke the actual glimpse callback */
1663         if (lock->l_glimpse_ast(lock, (void*)arg) == 0)
1664                 rc = 1;
1665
1666         LDLM_LOCK_RELEASE(lock);
1667
1668         if ((gl_work->gl_flags & LDLM_GL_WORK_NOFREE) == 0)
1669                 OBD_FREE_PTR(gl_work);
1670
1671         RETURN(rc);
1672 }
1673
1674 int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
1675                       ldlm_desc_ast_t ast_type)
1676 {
1677         struct ldlm_cb_set_arg *arg;
1678         set_producer_func       work_ast_lock;
1679         int                     rc;
1680
1681         if (cfs_list_empty(rpc_list))
1682                 RETURN(0);
1683
1684         OBD_ALLOC_PTR(arg);
1685         if (arg == NULL)
1686                 RETURN(-ENOMEM);
1687
1688         cfs_atomic_set(&arg->restart, 0);
1689         arg->list = rpc_list;
1690
1691         switch (ast_type) {
1692                 case LDLM_WORK_BL_AST:
1693                         arg->type = LDLM_BL_CALLBACK;
1694                         work_ast_lock = ldlm_work_bl_ast_lock;
1695                         break;
1696                 case LDLM_WORK_CP_AST:
1697                         arg->type = LDLM_CP_CALLBACK;
1698                         work_ast_lock = ldlm_work_cp_ast_lock;
1699                         break;
1700                 case LDLM_WORK_REVOKE_AST:
1701                         arg->type = LDLM_BL_CALLBACK;
1702                         work_ast_lock = ldlm_work_revoke_ast_lock;
1703                         break;
1704                 case LDLM_WORK_GL_AST:
1705                         arg->type = LDLM_GL_CALLBACK;
1706                         work_ast_lock = ldlm_work_gl_ast_lock;
1707                         break;
1708                 default:
1709                         LBUG();
1710         }
1711
1712         /* Create a ptlrpc request set with the flow control extension.
1713          * The set uses the work_ast_lock function to produce new requests
1714          * and sends a new request each time one completes, capping the
1715          * number of requests in flight at ns_max_parallel_ast. */
1716         arg->set = ptlrpc_prep_fcset(ns->ns_max_parallel_ast ? : UINT_MAX,
1717                                      work_ast_lock, arg);
1718         if (arg->set == NULL)
1719                 GOTO(out, rc = -ENOMEM);
1720
1721         ptlrpc_set_wait(arg->set);
1722         ptlrpc_set_destroy(arg->set);
1723
1724         rc = cfs_atomic_read(&arg->restart) ? -ERESTART : 0;
1725         GOTO(out, rc);
1726 out:
1727         OBD_FREE_PTR(arg);
1728         return rc;
1729 }
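
/*
 * Editorial sketch of the producer contract (an inference from the
 * ldlm_work_*_ast_lock helpers above, not an authoritative API statement):
 * the request set calls the producer repeatedly; each call pops one lock
 * from arg->list and issues its AST RPC, and -ENOENT signals that the list
 * has been drained:
 *
 *      static int example_producer(struct ptlrpc_request_set *set, void *opaq)
 *      {
 *              struct ldlm_cb_set_arg *arg = opaq;
 *
 *              if (cfs_list_empty(arg->list))
 *                      return -ENOENT;         (nothing left to produce)
 *              ... pop one lock and send its AST ...
 *              return 0;
 *      }
 */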
1730
1731 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
1732 {
1733         ldlm_reprocess_all(res);
1734         return LDLM_ITER_CONTINUE;
1735 }
1736
1737 static int ldlm_reprocess_res(cfs_hash_t *hs, cfs_hash_bd_t *bd,
1738                               cfs_hlist_node_t *hnode, void *arg)
1739 {
1740         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
1741         int    rc;
1742
1743         rc = reprocess_one_queue(res, arg);
1744
1745         return rc == LDLM_ITER_STOP;
1746 }
1747
1748 void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
1749 {
1750         ENTRY;
1751
1752         if (ns != NULL) {
1753                 cfs_hash_for_each_nolock(ns->ns_rs_hash,
1754                                          ldlm_reprocess_res, NULL);
1755         }
1756         EXIT;
1757 }
1758 EXPORT_SYMBOL(ldlm_reprocess_all_ns);
1759
1760 void ldlm_reprocess_all(struct ldlm_resource *res)
1761 {
1762         CFS_LIST_HEAD(rpc_list);
1763
1764 #ifdef HAVE_SERVER_SUPPORT
1765         int rc;
1766         ENTRY;
1767         /* Local lock trees don't get reprocessed. */
1768         if (ns_is_client(ldlm_res_to_ns(res))) {
1769                 EXIT;
1770                 return;
1771         }
1772
1773 restart:
1774         lock_res(res);
1775         rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
1776         if (rc == LDLM_ITER_CONTINUE)
1777                 ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
1778         unlock_res(res);
1779
1780         rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
1781                                LDLM_WORK_CP_AST);
1782         if (rc == -ERESTART) {
1783                 LASSERT(cfs_list_empty(&rpc_list));
1784                 goto restart;
1785         }
1786 #else
1787         ENTRY;
1788         if (!ns_is_client(ldlm_res_to_ns(res))) {
1789                 CERROR("This is client-side-only module, cannot handle "
1790                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
1791                 LBUG();
1792         }
1793 #endif
1794         EXIT;
1795 }
1796
1797 void ldlm_cancel_callback(struct ldlm_lock *lock)
1798 {
1799         check_res_locked(lock->l_resource);
1800         if (!(lock->l_flags & LDLM_FL_CANCEL)) {
1801                 lock->l_flags |= LDLM_FL_CANCEL;
1802                 if (lock->l_blocking_ast) {
1803                         // l_check_no_ns_lock(ns);
1804                         unlock_res_and_lock(lock);
1805                         lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
1806                                              LDLM_CB_CANCELING);
1807                         lock_res_and_lock(lock);
1808                 } else {
1809                         LDLM_DEBUG(lock, "no blocking ast");
1810                 }
1811         }
1812         lock->l_flags |= LDLM_FL_BL_DONE;
1813 }
1814
1815 void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
1816 {
1817         if (req->l_resource->lr_type != LDLM_PLAIN &&
1818             req->l_resource->lr_type != LDLM_IBITS)
1819                 return;
1820
1821         cfs_list_del_init(&req->l_sl_policy);
1822         cfs_list_del_init(&req->l_sl_mode);
1823 }
1824
1825 void ldlm_lock_cancel(struct ldlm_lock *lock)
1826 {
1827         struct ldlm_resource *res;
1828         struct ldlm_namespace *ns;
1829         ENTRY;
1830
1831         lock_res_and_lock(lock);
1832
1833         res = lock->l_resource;
1834         ns  = ldlm_res_to_ns(res);
1835
1836         /* Please do not, no matter how tempting, remove this LBUG without
1837          * talking to me first. -phik */
1838         if (lock->l_readers || lock->l_writers) {
1839                 LDLM_ERROR(lock, "lock still has references");
1840                 LBUG();
1841         }
1842
1843         if (lock->l_waited)
1844                 ldlm_del_waiting_lock(lock);
1845
1846         /* Run the cancel callback; it may drop the res lock while running. */
1847         ldlm_cancel_callback(lock);
1848
1849         /* Check a second time, in case the lock was re-added while
1850            ldlm_cancel_callback was running with no res lock held */
1851         if (lock->l_waited)
1852                 ldlm_del_waiting_lock(lock);
1853
1854         ldlm_resource_unlink_lock(lock);
1855         ldlm_lock_destroy_nolock(lock);
1856
1857         if (lock->l_granted_mode == lock->l_req_mode)
1858                 ldlm_pool_del(&ns->ns_pool, lock);
1859
1860         /* Make sure we will not be called again for the same lock, which
1861          * is possible unless lock->l_granted_mode is zeroed out */
1862         lock->l_granted_mode = LCK_MINMODE;
1863         unlock_res_and_lock(lock);
1864
1865         EXIT;
1866 }
1867 EXPORT_SYMBOL(ldlm_lock_cancel);
1868
1869 int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
1870 {
1871         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1872         int rc = -EINVAL;
1873         ENTRY;
1874
1875         if (lock) {
1876                 if (lock->l_ast_data == NULL)
1877                         lock->l_ast_data = data;
1878                 if (lock->l_ast_data == data)
1879                         rc = 0;
1880                 LDLM_LOCK_PUT(lock);
1881         }
1882         RETURN(rc);
1883 }
1884 EXPORT_SYMBOL(ldlm_lock_set_data);
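
/*
 * Illustrative sketch (editorial): ldlm_lock_set_data() is first-writer-wins.
 * Setting the same data again is a no-op success; anything else fails:
 *
 *      rc = ldlm_lock_set_data(&lockh, inode);   (0: data attached)
 *      rc = ldlm_lock_set_data(&lockh, inode);   (0: already set to inode)
 *      rc = ldlm_lock_set_data(&lockh, other);   (-EINVAL: data mismatch)
 */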
1885
1886 struct export_cl_data {
1887         struct obd_export       *ecl_exp;
1888         int                     ecl_loop;
1889 };
1890
1891 int ldlm_cancel_locks_for_export_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
1892                                     cfs_hlist_node_t *hnode, void *data)
1893
1894 {
1895         struct export_cl_data   *ecl = (struct export_cl_data *)data;
1896         struct obd_export       *exp  = ecl->ecl_exp;
1897         struct ldlm_lock     *lock = cfs_hash_object(hs, hnode);
1898         struct ldlm_resource *res;
1899
1900         res = ldlm_resource_getref(lock->l_resource);
1901         LDLM_LOCK_GET(lock);
1902
1903         LDLM_DEBUG(lock, "export %p", exp);
1904         ldlm_res_lvbo_update(res, NULL, 1);
1905         ldlm_lock_cancel(lock);
1906         ldlm_reprocess_all(res);
1907         ldlm_resource_putref(res);
1908         LDLM_LOCK_RELEASE(lock);
1909
1910         ecl->ecl_loop++;
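        /* (x & -x) == x only when x is a power of two, so progress is
         * logged at exponentially spaced intervals (1, 2, 4, 8, ...). */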
1911         if ((ecl->ecl_loop & -ecl->ecl_loop) == ecl->ecl_loop) {
1912                 CDEBUG(D_INFO,
1913                        "Cancel lock %p for export %p (loop %d), still have "
1914                        "%d locks left on hash table.\n",
1915                        lock, exp, ecl->ecl_loop,
1916                        cfs_atomic_read(&hs->hs_count));
1917         }
1918
1919         return 0;
1920 }
1921
1922 void ldlm_cancel_locks_for_export(struct obd_export *exp)
1923 {
1924         struct export_cl_data   ecl = {
1925                 .ecl_exp        = exp,
1926                 .ecl_loop       = 0,
1927         };
1928
1929         cfs_hash_for_each_empty(exp->exp_lock_hash,
1930                                 ldlm_cancel_locks_for_export_cb, &ecl);
1931 }
1932
1933 /**
1934  * Downgrade an exclusive lock.
1935  *
1936  * A fast variant of ldlm_lock_convert for conversion of exclusive
1937  * locks. The conversion always succeeds.
1938  *
1939  * \param lock A lock to convert
1940  * \param new_mode new lock mode
1941  */
1942 void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode)
1943 {
1944         ENTRY;
1945
1946         LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
1947         LASSERT(new_mode == LCK_COS);
1948
1949         lock_res_and_lock(lock);
1950         ldlm_resource_unlink_lock(lock);
1951         /*
1952          * Remove the lock from pool as it will be added again in
1953          * ldlm_grant_lock() called below.
1954          */
1955         ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
1956
1957         lock->l_req_mode = new_mode;
1958         ldlm_grant_lock(lock, NULL);
1959         unlock_res_and_lock(lock);
1960         ldlm_reprocess_all(lock->l_resource);
1961
1962         EXIT;
1963 }
1964 EXPORT_SYMBOL(ldlm_lock_downgrade);
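
/*
 * Illustrative sketch (editorial): a server thread holding a PW or EX lock
 * can downgrade it to Commit-on-Share mode instead of cancelling, keeping
 * the lock granted while the resource is reprocessed for waiters:
 *
 *      if (lock->l_granted_mode == LCK_EX || lock->l_granted_mode == LCK_PW)
 *              ldlm_lock_downgrade(lock, LCK_COS);
 */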
1965
1966 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
1967                                         __u32 *flags)
1968 {
1969         CFS_LIST_HEAD(rpc_list);
1970         struct ldlm_resource *res;
1971         struct ldlm_namespace *ns;
1972         int granted = 0;
1973 #ifdef HAVE_SERVER_SUPPORT
1974         int old_mode;
1975         struct sl_insert_point prev;
1976 #endif
1977         struct ldlm_interval *node;
1978         ENTRY;
1979
1980         if (new_mode == lock->l_granted_mode) { // No changes? Just return.
1981                 *flags |= LDLM_FL_BLOCK_GRANTED;
1982                 RETURN(lock->l_resource);
1983         }
1984
1985         /* We can't check the lock type here because the lock's bit-lock
1986          * is not held, so do the allocation blindly. -jay */
1987         OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
1988         if (node == NULL)  /* Actually, this causes EDEADLOCK to be returned */
1989                 RETURN(NULL);
1990
1991         LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR),
1992                  "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);
1993
1994         lock_res_and_lock(lock);
1995
1996         res = lock->l_resource;
1997         ns  = ldlm_res_to_ns(res);
1998
1999 #ifdef HAVE_SERVER_SUPPORT
2000         old_mode = lock->l_req_mode;
2001 #endif
2002         lock->l_req_mode = new_mode;
2003         if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
2004 #ifdef HAVE_SERVER_SUPPORT
2005                 /* remember the position where the lock may be added back
2006                  * to the granted list later, and the join mode for fixing
2007                  * the skiplist. */
2008                 prev.res_link = lock->l_res_link.prev;
2009                 prev.mode_link = lock->l_sl_mode.prev;
2010                 prev.policy_link = lock->l_sl_policy.prev;
2011 #endif
2012                 ldlm_resource_unlink_lock(lock);
2013         } else {
2014                 ldlm_resource_unlink_lock(lock);
2015                 if (res->lr_type == LDLM_EXTENT) {
2016                         /* FIXME: ugly code; we have to attach the lock
2017                          * to an interval node again since it may be
2018                          * granted soon */
2019                         CFS_INIT_LIST_HEAD(&node->li_group);
2020                         ldlm_interval_attach(node, lock);
2021                         node = NULL;
2022                 }
2023         }
2024
2025         /*
2026          * Remove old lock from the pool before adding the lock with new
2027          * mode below in ->policy()
2028          */
2029         ldlm_pool_del(&ns->ns_pool, lock);
2030
2031         /* If this is a local resource, put it on the appropriate list. */
2032         if (ns_is_client(ldlm_res_to_ns(res))) {
2033                 if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
2034                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
2035                 } else {
2036                         /* This should never happen, because of the way the
2037                          * server handles conversions. */
2038                         LDLM_ERROR(lock, "Erroneous flags %x on local lock\n",
2039                                    *flags);
2040                         LBUG();
2041
2042                         ldlm_grant_lock(lock, &rpc_list);
2043                         granted = 1;
2044                         /* FIXME: the completion AST is called with lr_lock held! */
2045                         if (lock->l_completion_ast)
2046                                 lock->l_completion_ast(lock, 0, NULL);
2047                 }
2048 #ifdef HAVE_SERVER_SUPPORT
2049         } else {
2050                 int rc;
2051                 ldlm_error_t err;
2052                 int pflags = 0;
2053                 ldlm_processing_policy policy;
2054                 policy = ldlm_processing_policy_table[res->lr_type];
2055                 rc = policy(lock, &pflags, 0, &err, &rpc_list);
2056                 if (rc == LDLM_ITER_STOP) {
2057                         lock->l_req_mode = old_mode;
2058                         if (res->lr_type == LDLM_EXTENT)
2059                                 ldlm_extent_add_lock(res, lock);
2060                         else
2061                                 ldlm_granted_list_add_lock(lock, &prev);
2062
2063                         res = NULL;
2064                 } else {
2065                         *flags |= LDLM_FL_BLOCK_GRANTED;
2066                         granted = 1;
2067                 }
2068         }
2069 #else
2070         } else {
2071                 CERROR("This is client-side-only module, cannot handle "
2072                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
2073                 LBUG();
2074         }
2075 #endif
2076         unlock_res_and_lock(lock);
2077
2078         if (granted)
2079                 ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
2080         if (node)
2081                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
2082         RETURN(res);
2083 }
2084 EXPORT_SYMBOL(ldlm_lock_convert);
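
/*
 * Illustrative sketch (editorial): aside from the no-op case, the only
 * conversion this path accepts is PR -> PW (see the LASSERTF above).  A
 * NULL result means the conversion did not happen: either the interval
 * node allocation failed (the EDEADLOCK case noted above) or the
 * server-side policy kept the old mode:
 *
 *      __u32 flags = 0;
 *      res = ldlm_lock_convert(lock, LCK_PW, &flags);
 *      if (res == NULL)
 *              handle_conversion_failure();    (hypothetical helper)
 */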
2085
2086 void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
2087 {
2088         struct ldlm_lock *lock;
2089
2090         if (!((libcfs_debug | D_ERROR) & level))
2091                 return;
2092
2093         lock = ldlm_handle2lock(lockh);
2094         if (lock == NULL)
2095                 return;
2096
2097         LDLM_DEBUG_LIMIT(level, lock, "###");
2098
2099         LDLM_LOCK_PUT(lock);
2100 }
2101 EXPORT_SYMBOL(ldlm_lock_dump_handle);
2102
2103 void _ldlm_lock_debug(struct ldlm_lock *lock,
2104                       struct libcfs_debug_msg_data *msgdata,
2105                       const char *fmt, ...)
2106 {
2107         va_list args;
2108         struct obd_export *exp = lock->l_export;
2109         struct ldlm_resource *resource = lock->l_resource;
2110         char *nid = "local";
2111
2112         va_start(args, fmt);
2113
2114         if (exp && exp->exp_connection) {
2115                 nid = libcfs_nid2str(exp->exp_connection->c_peer.nid);
2116         } else if (exp && exp->exp_obd != NULL) {
2117                 struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
2118                 nid = libcfs_nid2str(imp->imp_connection->c_peer.nid);
2119         }
2120
2121         if (resource == NULL) {
2122                 libcfs_debug_vmsg2(msgdata, fmt, args,
2123                        " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2124                        "res: \?\? rrc=\?\? type: \?\?\? flags: "LPX64" nid: %s "
2125                        "remote: "LPX64" expref: %d pid: %u timeout: %lu\n",
2126                        lock,
2127                        lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
2128                        lock->l_readers, lock->l_writers,
2129                        ldlm_lockname[lock->l_granted_mode],
2130                        ldlm_lockname[lock->l_req_mode],
2131                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2132                        exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
2133                        lock->l_pid, lock->l_callback_timeout);
2134                 va_end(args);
2135                 return;
2136         }
2137
2138         switch (resource->lr_type) {
2139         case LDLM_EXTENT:
2140                 libcfs_debug_vmsg2(msgdata, fmt, args,
2141                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2142                        "res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64
2143                        "] (req "LPU64"->"LPU64") flags: "LPX64" nid: %s remote:"
2144                        " "LPX64" expref: %d pid: %u timeout %lu\n",
2145                        ldlm_lock_to_ns_name(lock), lock,
2146                        lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
2147                        lock->l_readers, lock->l_writers,
2148                        ldlm_lockname[lock->l_granted_mode],
2149                        ldlm_lockname[lock->l_req_mode],
2150                        resource->lr_name.name[0],
2151                        resource->lr_name.name[1],
2152                        cfs_atomic_read(&resource->lr_refcount),
2153                        ldlm_typename[resource->lr_type],
2154                        lock->l_policy_data.l_extent.start,
2155                        lock->l_policy_data.l_extent.end,
2156                        lock->l_req_extent.start, lock->l_req_extent.end,
2157                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2158                        exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
2159                        lock->l_pid, lock->l_callback_timeout);
2160                 break;
2161
2162         case LDLM_FLOCK:
2163                 libcfs_debug_vmsg2(msgdata, fmt, args,
2164                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2165                        "res: "LPU64"/"LPU64" rrc: %d type: %s pid: %d "
2166                        "["LPU64"->"LPU64"] flags: "LPX64" nid: %s remote: "LPX64
2167                        " expref: %d pid: %u timeout: %lu\n",
2168                        ldlm_lock_to_ns_name(lock), lock,
2169                        lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
2170                        lock->l_readers, lock->l_writers,
2171                        ldlm_lockname[lock->l_granted_mode],
2172                        ldlm_lockname[lock->l_req_mode],
2173                        resource->lr_name.name[0],
2174                        resource->lr_name.name[1],
2175                        cfs_atomic_read(&resource->lr_refcount),
2176                        ldlm_typename[resource->lr_type],
2177                        lock->l_policy_data.l_flock.pid,
2178                        lock->l_policy_data.l_flock.start,
2179                        lock->l_policy_data.l_flock.end,
2180                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2181                        exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
2182                        lock->l_pid, lock->l_callback_timeout);
2183                 break;
2184
2185         case LDLM_IBITS:
2186                 libcfs_debug_vmsg2(msgdata, fmt, args,
2187                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2188                        "res: "LPU64"/"LPU64" bits "LPX64" rrc: %d type: %s "
2189                        "flags: "LPX64" nid: %s remote: "LPX64" expref: %d "
2190                        "pid: %u timeout: %lu\n",
2191                        ldlm_lock_to_ns_name(lock),
2192                        lock, lock->l_handle.h_cookie,
2193                        cfs_atomic_read(&lock->l_refc),
2194                        lock->l_readers, lock->l_writers,
2195                        ldlm_lockname[lock->l_granted_mode],
2196                        ldlm_lockname[lock->l_req_mode],
2197                        resource->lr_name.name[0],
2198                        resource->lr_name.name[1],
2199                        lock->l_policy_data.l_inodebits.bits,
2200                        cfs_atomic_read(&resource->lr_refcount),
2201                        ldlm_typename[resource->lr_type],
2202                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2203                        exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
2204                        lock->l_pid, lock->l_callback_timeout);
2205                 break;
2206
2207         default:
2208                 libcfs_debug_vmsg2(msgdata, fmt, args,
2209                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2210                        "res: "LPU64"/"LPU64" rrc: %d type: %s flags: "LPX64" "
2211                        "nid: %s remote: "LPX64" expref: %d pid: %u timeout %lu"
2212                        "\n",
2213                        ldlm_lock_to_ns_name(lock),
2214                        lock, lock->l_handle.h_cookie,
2215                        cfs_atomic_read(&lock->l_refc),
2216                        lock->l_readers, lock->l_writers,
2217                        ldlm_lockname[lock->l_granted_mode],
2218                        ldlm_lockname[lock->l_req_mode],
2219                        resource->lr_name.name[0],
2220                        resource->lr_name.name[1],
2221                        cfs_atomic_read(&resource->lr_refcount),
2222                        ldlm_typename[resource->lr_type],
2223                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2224                        exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
2225                        lock->l_pid, lock->l_callback_timeout);
2226                 break;
2227         }
2228         va_end(args);
2229 }
2230 EXPORT_SYMBOL(_ldlm_lock_debug);