/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Whamcloud, Inc.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_lock.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
# include <libcfs/libcfs.h>
# include <linux/lustre_intent.h>
#else
# include <liblustre.h>
#endif

#include <obd_class.h>
#include "ldlm_internal.h"

/* lock types */
char *ldlm_lockname[] = {
        [0] = "--",
        [LCK_EX] = "EX",
        [LCK_PW] = "PW",
        [LCK_PR] = "PR",
        [LCK_CW] = "CW",
        [LCK_CR] = "CR",
        [LCK_NL] = "NL",
        [LCK_GROUP] = "GROUP",
        [LCK_COS] = "COS"
};
EXPORT_SYMBOL(ldlm_lockname);

char *ldlm_typename[] = {
        [LDLM_PLAIN] = "PLN",
        [LDLM_EXTENT] = "EXT",
        [LDLM_FLOCK] = "FLK",
        [LDLM_IBITS] = "IBT",
};
EXPORT_SYMBOL(ldlm_typename);

static ldlm_policy_wire_to_local_t ldlm_policy_wire18_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE] = ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE] = ldlm_flock_policy_wire18_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE] = ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_wire_to_local_t ldlm_policy_wire21_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE] = ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE] = ldlm_flock_policy_wire21_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE] = ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_local_to_wire_t ldlm_policy_local_to_wire[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE] = ldlm_plain_policy_local_to_wire,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_local_to_wire,
        [LDLM_FLOCK - LDLM_MIN_TYPE] = ldlm_flock_policy_local_to_wire,
        [LDLM_IBITS - LDLM_MIN_TYPE] = ldlm_ibits_policy_local_to_wire,
};

/**
 * Converts a lock policy from the local format to the on-the-wire
 * lock_desc format.
 */
void ldlm_convert_policy_to_wire(ldlm_type_t type,
                                 const ldlm_policy_data_t *lpolicy,
                                 ldlm_wire_policy_data_t *wpolicy)
{
        ldlm_policy_local_to_wire_t convert;

        convert = ldlm_policy_local_to_wire[type - LDLM_MIN_TYPE];

        convert(lpolicy, wpolicy);
}

/**
 * Converts a lock policy from the on-the-wire lock_desc format to the
 * local format.
 */
void ldlm_convert_policy_to_local(struct obd_export *exp, ldlm_type_t type,
                                  const ldlm_wire_policy_data_t *wpolicy,
                                  ldlm_policy_data_t *lpolicy)
{
        ldlm_policy_wire_to_local_t convert;
        int new_client;

        /* some badness for 2.0.0 clients, but 2.0.0 isn't supported */
        new_client = (exp->exp_connect_flags & OBD_CONNECT_FULL20) != 0;
        if (new_client)
                convert = ldlm_policy_wire21_to_local[type - LDLM_MIN_TYPE];
        else
                convert = ldlm_policy_wire18_to_local[type - LDLM_MIN_TYPE];

        convert(wpolicy, lpolicy);
}
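
/*
 * For illustration: a server unpacking an enqueue from a pre-2.1 client
 * (no OBD_CONNECT_FULL20 in exp_connect_flags) would convert the wire
 * policy roughly as
 *
 *      ldlm_convert_policy_to_local(req->rq_export, LDLM_FLOCK,
 *                                   &wire_policy, &lock->l_policy_data);
 *
 * which dispatches through ldlm_policy_wire18_to_local[] above.
 */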

char *ldlm_it2str(int it)
{
        switch (it) {
        case IT_OPEN:
                return "open";
        case IT_CREAT:
                return "creat";
        case (IT_OPEN | IT_CREAT):
                return "open|creat";
        case IT_READDIR:
                return "readdir";
        case IT_GETATTR:
                return "getattr";
        case IT_LOOKUP:
                return "lookup";
        case IT_UNLINK:
                return "unlink";
        case IT_GETXATTR:
                return "getxattr";
        case IT_LAYOUT:
                return "layout";
        default:
                CERROR("Unknown intent %d\n", it);
                return "UNKNOWN";
        }
}
EXPORT_SYMBOL(ldlm_it2str);

extern cfs_mem_cache_t *ldlm_lock_slab;

#ifdef HAVE_SERVER_SUPPORT
static ldlm_processing_policy ldlm_processing_policy_table[] = {
        [LDLM_PLAIN] = ldlm_process_plain_lock,
        [LDLM_EXTENT] = ldlm_process_extent_lock,
# ifdef __KERNEL__
        [LDLM_FLOCK] = ldlm_process_flock_lock,
# endif
        [LDLM_IBITS] = ldlm_process_inodebits_lock,
};

ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
{
        return ldlm_processing_policy_table[res->lr_type];
}
EXPORT_SYMBOL(ldlm_get_processing_policy);
#endif /* HAVE_SERVER_SUPPORT */

void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
{
        ns->ns_policy = arg;
}
EXPORT_SYMBOL(ldlm_register_intent);

/*
 * REFCOUNTED LOCK OBJECTS
 */


/*
 * Lock refcounts, during creation:
 *   - one special one for allocation, dec'd only once in destroy
 *   - one for being a lock that's in-use
 *   - one for the addref associated with a new lock
 */
struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
{
        cfs_atomic_inc(&lock->l_refc);
        return lock;
}
EXPORT_SYMBOL(ldlm_lock_get);

void ldlm_lock_put(struct ldlm_lock *lock)
{
        ENTRY;

        LASSERT(lock->l_resource != LP_POISON);
        LASSERT(cfs_atomic_read(&lock->l_refc) > 0);
        if (cfs_atomic_dec_and_test(&lock->l_refc)) {
                struct ldlm_resource *res;

                LDLM_DEBUG(lock,
                           "final lock_put on destroyed lock, freeing it.");

                res = lock->l_resource;
                LASSERT(lock->l_destroyed);
                LASSERT(cfs_list_empty(&lock->l_res_link));
                LASSERT(cfs_list_empty(&lock->l_pending_chain));

                lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
                                     LDLM_NSS_LOCKS);
                lu_ref_del(&res->lr_reference, "lock", lock);
                ldlm_resource_putref(res);
                lock->l_resource = NULL;
                if (lock->l_export) {
                        class_export_lock_put(lock->l_export, lock);
                        lock->l_export = NULL;
                }

                if (lock->l_lvb_data != NULL)
                        OBD_FREE(lock->l_lvb_data, lock->l_lvb_len);

                ldlm_interval_free(ldlm_interval_detach(lock));
                lu_ref_fini(&lock->l_reference);
                OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle);
        }

        EXIT;
}
EXPORT_SYMBOL(ldlm_lock_put);
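
/*
 * A rough lifecycle sketch (error handling elided), tying the creation
 * references above to their matching puts:
 *
 *      lock = ldlm_lock_new(res);      // l_refc == 2
 *      ...
 *      ldlm_lock_destroy(lock);        // drops the hash-table reference
 *      LDLM_LOCK_RELEASE(lock);        // drops the caller's reference;
 *                                      // the final put frees the lock
 */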

int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
{
        int rc = 0;
        if (!cfs_list_empty(&lock->l_lru)) {
                struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

                LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
                cfs_list_del_init(&lock->l_lru);
                if (lock->l_flags & LDLM_FL_SKIPPED)
                        lock->l_flags &= ~LDLM_FL_SKIPPED;
                LASSERT(ns->ns_nr_unused > 0);
                ns->ns_nr_unused--;
                rc = 1;
        }
        return rc;
}

int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
        int rc;

        ENTRY;
        if (lock->l_ns_srv) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                RETURN(0);
        }

        spin_lock(&ns->ns_lock);
        rc = ldlm_lock_remove_from_lru_nolock(lock);
        spin_unlock(&ns->ns_lock);
        EXIT;
        return rc;
}

void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        lock->l_last_used = cfs_time_current();
        LASSERT(cfs_list_empty(&lock->l_lru));
        LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
        cfs_list_add_tail(&lock->l_lru, &ns->ns_unused_list);
        LASSERT(ns->ns_nr_unused >= 0);
        ns->ns_nr_unused++;
}

void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        spin_lock(&ns->ns_lock);
        ldlm_lock_add_to_lru_nolock(lock);
        spin_unlock(&ns->ns_lock);
        EXIT;
}

void ldlm_lock_touch_in_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        if (lock->l_ns_srv) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                EXIT;
                return;
        }

        spin_lock(&ns->ns_lock);
        if (!cfs_list_empty(&lock->l_lru)) {
                ldlm_lock_remove_from_lru_nolock(lock);
                ldlm_lock_add_to_lru_nolock(lock);
        }
        spin_unlock(&ns->ns_lock);
        EXIT;
}
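
/*
 * Note that the remove/add pair above re-inserts the lock at the tail of
 * ns_unused_list, so the LRU stays ordered by last use: cancellation
 * candidates are taken from the head while recently touched locks sink
 * back to the tail.
 */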

/* This used to have a 'strict' flag, which recovery would use to mark an
 * in-use lock as needing-to-die.  Lest I am ever tempted to put it back, I
 * shall explain why it's gone: with the new hash table scheme, once you call
 * ldlm_lock_destroy, you can never drop your final references on this lock.
 * Because it's not in the hash table anymore.  -phil */
int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
{
        ENTRY;

        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                LBUG();
        }

        if (!cfs_list_empty(&lock->l_res_link)) {
                LDLM_ERROR(lock, "lock still on resource");
                LBUG();
        }

        if (lock->l_destroyed) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                EXIT;
                return 0;
        }
        lock->l_destroyed = 1;

        if (lock->l_export && lock->l_export->exp_lock_hash) {
                /* NB: it's safe to call cfs_hash_del() even if the lock
                 * isn't in exp_lock_hash. */
                /* In the function below, .hs_keycmp resolves to
                 * ldlm_export_lock_keycmp() */
                /* coverity[overrun-buffer-val] */
                cfs_hash_del(lock->l_export->exp_lock_hash,
                             &lock->l_remote_handle, &lock->l_exp_hash);
        }

        ldlm_lock_remove_from_lru(lock);
        class_handle_unhash(&lock->l_handle);

#if 0
        /* Wake anyone waiting for this lock */
        /* FIXME: I should probably add yet another flag, instead of using
         * l_export to only call this on clients */
        if (lock->l_export)
                class_export_put(lock->l_export);
        lock->l_export = NULL;
        if (lock->l_export && lock->l_completion_ast)
                lock->l_completion_ast(lock, 0);
#endif
        EXIT;
        return 1;
}

void ldlm_lock_destroy(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        lock_res_and_lock(lock);
        first = ldlm_lock_destroy_internal(lock);
        unlock_res_and_lock(lock);

        /* drop the reference from the hash table only on first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        first = ldlm_lock_destroy_internal(lock);
        /* drop the reference from the hash table only on first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/* this is called by portals_handle2object with the handle lock taken */
static void lock_handle_addref(void *lock)
{
        LDLM_LOCK_GET((struct ldlm_lock *)lock);
}

static void lock_handle_free(void *lock, int size)
{
        LASSERT(size == sizeof(struct ldlm_lock));
        OBD_SLAB_FREE(lock, ldlm_lock_slab, size);
}

struct portals_handle_ops lock_handle_ops = {
        .hop_addref = lock_handle_addref,
        .hop_free   = lock_handle_free,
};
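
/*
 * A note on the handle machinery above: a lustre_handle carries only the
 * 64-bit h_cookie, and class_handle2object() turns it back into a lock
 * pointer.  hop_addref runs with the handle lock held, so a looked-up
 * lock cannot be freed before the caller holds its own reference, and
 * hop_free is what eventually returns the lock to the slab once
 * OBD_FREE_RCU() in ldlm_lock_put() above retires it after an RCU grace
 * period.
 */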

/*
 * usage: pass in a resource on which you have done ldlm_resource_get;
 *        the new lock will take over the refcount.
 * returns: lock with refcount 2 - one for the current caller and one for
 *          the remote side
 */
static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
{
        struct ldlm_lock *lock;
        ENTRY;

        if (resource == NULL)
                LBUG();

        OBD_SLAB_ALLOC_PTR_GFP(lock, ldlm_lock_slab, CFS_ALLOC_IO);
        if (lock == NULL)
                RETURN(NULL);

        spin_lock_init(&lock->l_lock);
        lock->l_resource = resource;
        lu_ref_add(&resource->lr_reference, "lock", lock);

        cfs_atomic_set(&lock->l_refc, 2);
        CFS_INIT_LIST_HEAD(&lock->l_res_link);
        CFS_INIT_LIST_HEAD(&lock->l_lru);
        CFS_INIT_LIST_HEAD(&lock->l_pending_chain);
        CFS_INIT_LIST_HEAD(&lock->l_bl_ast);
        CFS_INIT_LIST_HEAD(&lock->l_cp_ast);
        CFS_INIT_LIST_HEAD(&lock->l_rk_ast);
        cfs_waitq_init(&lock->l_waitq);
        lock->l_blocking_lock = NULL;
        CFS_INIT_LIST_HEAD(&lock->l_sl_mode);
        CFS_INIT_LIST_HEAD(&lock->l_sl_policy);
        CFS_INIT_HLIST_NODE(&lock->l_exp_hash);
        CFS_INIT_HLIST_NODE(&lock->l_exp_flock_hash);

        lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
                             LDLM_NSS_LOCKS);
        CFS_INIT_LIST_HEAD(&lock->l_handle.h_link);
        class_handle_hash(&lock->l_handle, &lock_handle_ops);

        lu_ref_init(&lock->l_reference);
        lu_ref_add(&lock->l_reference, "hash", lock);
        lock->l_callback_timeout = 0;

#if LUSTRE_TRACKS_LOCK_EXP_REFS
        CFS_INIT_LIST_HEAD(&lock->l_exp_refs_link);
        lock->l_exp_refs_nr = 0;
        lock->l_exp_refs_target = NULL;
#endif
        CFS_INIT_LIST_HEAD(&lock->l_exp_list);

        RETURN(lock);
}

int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                              const struct ldlm_res_id *new_resid)
{
        struct ldlm_resource *oldres = lock->l_resource;
        struct ldlm_resource *newres;
        int type;
        ENTRY;

        LASSERT(ns_is_client(ns));

        lock_res_and_lock(lock);
        if (memcmp(new_resid, &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) == 0) {
                /* Nothing to do */
                unlock_res_and_lock(lock);
                RETURN(0);
        }

        LASSERT(new_resid->name[0] != 0);

        /* This function assumes that the lock isn't on any lists */
        LASSERT(cfs_list_empty(&lock->l_res_link));

        type = oldres->lr_type;
        unlock_res_and_lock(lock);

        newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
        if (newres == NULL)
                RETURN(-ENOMEM);

        lu_ref_add(&newres->lr_reference, "lock", lock);
        /*
         * To flip the lock from the old to the new resource, lock, oldres and
         * newres have to be locked. Resource spin-locks are nested within
         * lock->l_lock, and are taken in the memory address order to avoid
         * dead-locks.
         */
        spin_lock(&lock->l_lock);
        oldres = lock->l_resource;
        if (oldres < newres) {
                lock_res(oldres);
                lock_res_nested(newres, LRT_NEW);
        } else {
                lock_res(newres);
                lock_res_nested(oldres, LRT_NEW);
        }
        LASSERT(memcmp(new_resid, &oldres->lr_name,
                       sizeof oldres->lr_name) != 0);
        lock->l_resource = newres;
        unlock_res(oldres);
        unlock_res_and_lock(lock);

        /* ...and the flowers are still standing! */
        lu_ref_del(&oldres->lr_reference, "lock", lock);
        ldlm_resource_putref(oldres);

        RETURN(0);
}
EXPORT_SYMBOL(ldlm_lock_change_resource);
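
/*
 * The address-order locking above is the classic deadlock-avoidance
 * trick: whichever of the two resources has the lower address is always
 * locked first, so two threads flipping locks between the same pair of
 * resources in opposite directions still take the spin-locks in the same
 * order and cannot deadlock against each other.
 */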

/*
 *  HANDLES
 */

void ldlm_lock2handle(const struct ldlm_lock *lock, struct lustre_handle *lockh)
{
        lockh->cookie = lock->l_handle.h_cookie;
}
EXPORT_SYMBOL(ldlm_lock2handle);

/* if flags is non-zero: atomically find the lock and set the flags;
 *                       return NULL if any of the flags is already set.
 */

struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
                                     __u64 flags)
{
        struct ldlm_lock *lock;
        ENTRY;

        LASSERT(handle);

        lock = class_handle2object(handle->cookie);
        if (lock == NULL)
                RETURN(NULL);

        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if (flags == 0 && !lock->l_destroyed) {
                lu_ref_add(&lock->l_reference, "handle", cfs_current());
                RETURN(lock);
        }

        lock_res_and_lock(lock);

        LASSERT(lock->l_resource != NULL);

        lu_ref_add_atomic(&lock->l_reference, "handle", cfs_current());
        if (unlikely(lock->l_destroyed)) {
                unlock_res_and_lock(lock);
                CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
                LDLM_LOCK_PUT(lock);
                RETURN(NULL);
        }

        if (flags && (lock->l_flags & flags)) {
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
                RETURN(NULL);
        }

        if (flags)
                lock->l_flags |= flags;

        unlock_res_and_lock(lock);
        RETURN(lock);
}
EXPORT_SYMBOL(__ldlm_handle2lock);
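
/*
 * Typical pattern (ldlm_handle2lock(), used throughout this file, is
 * effectively __ldlm_handle2lock() with flags == 0); the reference taken
 * by the lookup must always be dropped with LDLM_LOCK_PUT():
 *
 *      struct ldlm_lock *lock = ldlm_handle2lock(lockh);
 *      if (lock != NULL) {
 *              ... inspect or addref the lock ...
 *              LDLM_LOCK_PUT(lock);
 *      }
 */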

void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
{
        struct obd_export *exp = lock->l_export ?: lock->l_conn_export;
        /* INODEBITS_INTEROP: If the other side does not support
         * inodebits, reply with a plain lock descriptor.
         */
        if ((lock->l_resource->lr_type == LDLM_IBITS) &&
            (exp && !(exp->exp_connect_flags & OBD_CONNECT_IBITS))) {
                /* Make sure all the right bits are set in this lock we
                   are going to pass to the client */
                LASSERTF(lock->l_policy_data.l_inodebits.bits ==
                         (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
                          MDS_INODELOCK_LAYOUT),
                         "Inappropriate inode lock bits during "
                         "conversion " LPU64 "\n",
                         lock->l_policy_data.l_inodebits.bits);

                ldlm_res2desc(lock->l_resource, &desc->l_resource);
                desc->l_resource.lr_type = LDLM_PLAIN;

                /* Convert "new" lock mode to something an old client can
                   understand */
                if ((lock->l_req_mode == LCK_CR) ||
                    (lock->l_req_mode == LCK_CW))
                        desc->l_req_mode = LCK_PR;
                else
                        desc->l_req_mode = lock->l_req_mode;
                if ((lock->l_granted_mode == LCK_CR) ||
                    (lock->l_granted_mode == LCK_CW)) {
                        desc->l_granted_mode = LCK_PR;
                } else {
                        /* We never grant PW/EX locks to clients */
                        LASSERT((lock->l_granted_mode != LCK_PW) &&
                                (lock->l_granted_mode != LCK_EX));
                        desc->l_granted_mode = lock->l_granted_mode;
                }

                /* We do not copy policy here, because there is no
                   policy for plain locks */
        } else {
                ldlm_res2desc(lock->l_resource, &desc->l_resource);
                desc->l_req_mode = lock->l_req_mode;
                desc->l_granted_mode = lock->l_granted_mode;
                ldlm_convert_policy_to_wire(lock->l_resource->lr_type,
                                            &lock->l_policy_data,
                                            &desc->l_policy_data);
        }
}
EXPORT_SYMBOL(ldlm_lock2desc);

void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                           cfs_list_t *work_list)
{
        if ((lock->l_flags & LDLM_FL_AST_SENT) == 0) {
                LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
                lock->l_flags |= LDLM_FL_AST_SENT;
                /* If the enqueuing client said so, tell the AST recipient to
                 * discard dirty data, rather than writing it back. */
                if (new->l_flags & LDLM_AST_DISCARD_DATA)
                        lock->l_flags |= LDLM_FL_DISCARD_DATA;
                LASSERT(cfs_list_empty(&lock->l_bl_ast));
                cfs_list_add(&lock->l_bl_ast, work_list);
                LDLM_LOCK_GET(lock);
                LASSERT(lock->l_blocking_lock == NULL);
                lock->l_blocking_lock = LDLM_LOCK_GET(new);
        }
}

void ldlm_add_cp_work_item(struct ldlm_lock *lock, cfs_list_t *work_list)
{
        if ((lock->l_flags & LDLM_FL_CP_REQD) == 0) {
                lock->l_flags |= LDLM_FL_CP_REQD;
                LDLM_DEBUG(lock, "lock granted; sending completion AST.");
                LASSERT(cfs_list_empty(&lock->l_cp_ast));
                cfs_list_add(&lock->l_cp_ast, work_list);
                LDLM_LOCK_GET(lock);
        }
}

/* must be called with lr_lock held */
void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                            cfs_list_t *work_list)
{
        ENTRY;
        check_res_locked(lock->l_resource);
        if (new)
                ldlm_add_bl_work_item(lock, new, work_list);
        else
                ldlm_add_cp_work_item(lock, work_list);
        EXIT;
}

void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;

        lock = ldlm_handle2lock(lockh);
        LASSERT(lock != NULL);
        ldlm_lock_addref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_addref);

void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        ldlm_lock_remove_from_lru(lock);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                lock->l_readers++;
                lu_ref_add_atomic(&lock->l_reference, "reader", lock);
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                lock->l_writers++;
                lu_ref_add_atomic(&lock->l_reference, "writer", lock);
        }
        LDLM_LOCK_GET(lock);
        lu_ref_add_atomic(&lock->l_reference, "user", lock);
        LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}
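
/*
 * Mode-to-counter mapping used above and in the matching decref below:
 * NL, CR and PR count as reader modes (l_readers), while EX, CW, PW,
 * GROUP and COS count as writer modes (l_writers); a single mode never
 * falls into both classes.
 */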

/**
 * Attempts to addref a lock, and fails if the lock is already
 * LDLM_FL_CBPENDING or destroyed.
 *
 * \retval 0 success, lock was addref-ed
 *
 * \retval -EAGAIN lock is being canceled.
 */
int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;
        int               result;

        result = -EAGAIN;
        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                if (lock->l_readers != 0 || lock->l_writers != 0 ||
                    !(lock->l_flags & LDLM_FL_CBPENDING)) {
                        ldlm_lock_addref_internal_nolock(lock, mode);
                        result = 0;
                }
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return result;
}
EXPORT_SYMBOL(ldlm_lock_addref_try);

/* only called for local locks */
void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
{
        lock_res_and_lock(lock);
        ldlm_lock_addref_internal_nolock(lock, mode);
        unlock_res_and_lock(lock);
}

/* only called in ldlm_flock_destroy and for local locks.
 * For LDLM_FLOCK type locks, l_blocking_ast is NULL and
 * ldlm_lock_remove_from_lru() does nothing, so it is safe for
 * ldlm_flock_destroy to use this and drop some code. */
void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                LASSERT(lock->l_readers > 0);
                lu_ref_del(&lock->l_reference, "reader", lock);
                lock->l_readers--;
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                LASSERT(lock->l_writers > 0);
                lu_ref_del(&lock->l_reference, "writer", lock);
                lock->l_writers--;
        }

        lu_ref_del(&lock->l_reference, "user", lock);
        LDLM_LOCK_RELEASE(lock);    /* matches the LDLM_LOCK_GET() in addref */
}

void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
{
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        ns = ldlm_lock_to_ns(lock);

        ldlm_lock_decref_internal_nolock(lock, mode);

        if (lock->l_flags & LDLM_FL_LOCAL &&
            !lock->l_readers && !lock->l_writers) {
                /* If this is a local lock on a server namespace and this was
                 * the last reference, cancel the lock. */
                CDEBUG(D_INFO, "forcing cancel of local lock\n");
                lock->l_flags |= LDLM_FL_CBPENDING;
        }

        if (!lock->l_readers && !lock->l_writers &&
            (lock->l_flags & LDLM_FL_CBPENDING)) {
                /* If we received a blocking AST and this was the last
                 * reference, run the callback. */
                if (lock->l_ns_srv && lock->l_export)
                        CERROR("FL_CBPENDING set on non-local lock--just a "
                               "warning\n");

                LDLM_DEBUG(lock, "final decref done on cbpending lock");

                LDLM_LOCK_GET(lock); /* dropped by bl thread */
                ldlm_lock_remove_from_lru(lock);
                unlock_res_and_lock(lock);

                if (lock->l_flags & LDLM_FL_FAIL_LOC)
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
                    ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
                        ldlm_handle_bl_callback(ns, NULL, lock);
        } else if (ns_is_client(ns) &&
                   !lock->l_readers && !lock->l_writers &&
                   !(lock->l_flags & LDLM_FL_NO_LRU) &&
                   !(lock->l_flags & LDLM_FL_BL_AST)) {

                LDLM_DEBUG(lock, "add lock into lru list");

                /* If this is a client-side namespace and this was the last
                 * reference, put it on the LRU. */
                ldlm_lock_add_to_lru(lock);
                unlock_res_and_lock(lock);

                if (lock->l_flags & LDLM_FL_FAIL_LOC)
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
                 * are not supported by the server; otherwise it is done on
                 * enqueue. */
                if (!exp_connect_cancelset(lock->l_conn_export) &&
                    !ns_connect_lru_resize(ns))
                        ldlm_cancel_lru(ns, 0, LDLM_ASYNC, 0);
        } else {
                LDLM_DEBUG(lock, "do not add lock into lru list");
                unlock_res_and_lock(lock);
        }

        EXIT;
}

void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_decref);

/* This will drop a lock reference and mark it for destruction, but will not
 * necessarily cancel the lock before returning. */
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        ENTRY;

        LASSERT(lock != NULL);

        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        lock_res_and_lock(lock);
        lock->l_flags |= LDLM_FL_CBPENDING;
        unlock_res_and_lock(lock);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
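
/*
 * Addref and decref always travel in pairs on the same handle and mode,
 * e.g.:
 *
 *      ldlm_lock_addref(&lockh, LCK_PR);
 *      ... use whatever the lock protects ...
 *      ldlm_lock_decref(&lockh, LCK_PR);
 *
 * ldlm_lock_decref_and_cancel() is the variant for a last user that also
 * wants the lock cancelled once the reference is dropped.
 */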

struct sl_insert_point {
        cfs_list_t *res_link;
        cfs_list_t *mode_link;
        cfs_list_t *policy_link;
};

/*
 * search_granted_lock
 *
 * Description:
 *      Finds a position at which to insert the new lock.
 * Parameters:
 *      queue [input]:  the granted list on which the search acts;
 *      req [input]:    the lock whose position is to be located;
 *      prev [output]:  positions within the 3 lists at which to insert @req
 * Return Value:
 *      filled @prev
 * NOTE: called by
 *  - ldlm_grant_lock_with_skiplist
 */
static void search_granted_lock(cfs_list_t *queue,
                                struct ldlm_lock *req,
                                struct sl_insert_point *prev)
{
        cfs_list_t *tmp;
        struct ldlm_lock *lock, *mode_end, *policy_end;
        ENTRY;

        cfs_list_for_each(tmp, queue) {
                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                mode_end = cfs_list_entry(lock->l_sl_mode.prev,
                                          struct ldlm_lock, l_sl_mode);

                if (lock->l_req_mode != req->l_req_mode) {
                        /* jump to the last lock of the mode group */
                        tmp = &mode_end->l_res_link;
                        continue;
                }

                /* suitable mode group is found */
                if (lock->l_resource->lr_type == LDLM_PLAIN) {
                        /* insert point is the last lock of the mode group */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else if (lock->l_resource->lr_type == LDLM_IBITS) {
                        for (;;) {
                                policy_end =
                                        cfs_list_entry(lock->l_sl_policy.prev,
                                                       struct ldlm_lock,
                                                       l_sl_policy);

                                if (lock->l_policy_data.l_inodebits.bits ==
                                    req->l_policy_data.l_inodebits.bits) {
                                        /* insert point is the last lock of
                                         * the policy group */
                                        prev->res_link =
                                                &policy_end->l_res_link;
                                        prev->mode_link =
                                                &policy_end->l_sl_mode;
                                        prev->policy_link =
                                                &policy_end->l_sl_policy;
                                        EXIT;
                                        return;
                                }

                                if (policy_end == mode_end)
                                        /* done with mode group */
                                        break;

                                /* go to next policy group within mode group */
                                tmp = policy_end->l_res_link.next;
                                lock = cfs_list_entry(tmp, struct ldlm_lock,
                                                      l_res_link);
                        }  /* loop over policy groups within the mode group */

                        /* insert point is the last lock of the mode group;
                         * a new policy group is started */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else {
                        LDLM_ERROR(lock, "is not LDLM_PLAIN or LDLM_IBITS lock");
                        LBUG();
                }
        }

        /* insert point is the last lock on the queue;
         * a new mode group and a new policy group are started */
        prev->res_link = queue->prev;
        prev->mode_link = &req->l_sl_mode;
        prev->policy_link = &req->l_sl_policy;
        EXIT;
        return;
}
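
/*
 * Roughly, the granted list is kept grouped: locks of the same mode are
 * adjacent, and within a mode group locks with identical inodebits are
 * adjacent too.  The l_sl_mode/l_sl_policy links connect the first and
 * last lock of each group, which is what lets the walk above skip a
 * whole group in one step, e.g.:
 *
 *      granted:  PR(b1) PR(b1) PR(b2) | CW(b3) CW(b3)
 *                \_policy_grp_/ \_p_/   \_policy_grp_/
 *                \________mode grp___/  \__mode grp__/
 */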

static void ldlm_granted_list_add_lock(struct ldlm_lock *lock,
                                       struct sl_insert_point *prev)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        ldlm_resource_dump(D_INFO, res);
        LDLM_DEBUG(lock, "About to add lock:");

        if (lock->l_destroyed) {
                CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
                return;
        }

        LASSERT(cfs_list_empty(&lock->l_res_link));
        LASSERT(cfs_list_empty(&lock->l_sl_mode));
        LASSERT(cfs_list_empty(&lock->l_sl_policy));

        /*
         * lock->link == prev->link means the lock is the first one in its
         * group.  Don't re-add it to itself, to suppress kernel warnings.
         */
        if (&lock->l_res_link != prev->res_link)
                cfs_list_add(&lock->l_res_link, prev->res_link);
        if (&lock->l_sl_mode != prev->mode_link)
                cfs_list_add(&lock->l_sl_mode, prev->mode_link);
        if (&lock->l_sl_policy != prev->policy_link)
                cfs_list_add(&lock->l_sl_policy, prev->policy_link);

        EXIT;
}

static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
{
        struct sl_insert_point prev;
        ENTRY;

        LASSERT(lock->l_req_mode == lock->l_granted_mode);

        search_granted_lock(&lock->l_resource->lr_granted, lock, &prev);
        ldlm_granted_list_add_lock(lock, &prev);
        EXIT;
}

/* NOTE: called by
 *  - ldlm_lock_enqueue
 *  - ldlm_reprocess_queue
 *  - ldlm_lock_convert
 *
 * must be called with lr_lock held
 */
void ldlm_grant_lock(struct ldlm_lock *lock, cfs_list_t *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        lock->l_granted_mode = lock->l_req_mode;
        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
                ldlm_grant_lock_with_skiplist(lock);
        else if (res->lr_type == LDLM_EXTENT)
                ldlm_extent_add_lock(res, lock);
        else
                ldlm_resource_add_lock(res, &res->lr_granted, lock);

        if (lock->l_granted_mode < res->lr_most_restr)
                res->lr_most_restr = lock->l_granted_mode;

        if (work_list && lock->l_completion_ast != NULL)
                ldlm_add_ast_work_item(lock, NULL, work_list);

        ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
        EXIT;
}

/* returns a referenced lock or NULL.  See the flag descriptions in the
 * comment above ldlm_lock_match, below. */
static struct ldlm_lock *search_queue(cfs_list_t *queue,
                                      ldlm_mode_t *mode,
                                      ldlm_policy_data_t *policy,
                                      struct ldlm_lock *old_lock,
                                      __u64 flags, int unref)
{
        struct ldlm_lock *lock;
        cfs_list_t       *tmp;

        cfs_list_for_each(tmp, queue) {
                ldlm_mode_t match;

                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lock == old_lock)
                        break;

                /* llite sometimes wants to match locks that will be
                 * canceled when their users drop, but we allow it to match
                 * if it passes in CBPENDING and the lock still has users.
                 * this is generally only going to be used by children
                 * whose parents already hold a lock so forward progress
                 * can still happen. */
                if (lock->l_flags & LDLM_FL_CBPENDING &&
                    !(flags & LDLM_FL_CBPENDING))
                        continue;
                if (!unref && lock->l_flags & LDLM_FL_CBPENDING &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        continue;

                if (!(lock->l_req_mode & *mode))
                        continue;
                match = lock->l_req_mode;

                if (lock->l_resource->lr_type == LDLM_EXTENT &&
                    (lock->l_policy_data.l_extent.start >
                     policy->l_extent.start ||
                     lock->l_policy_data.l_extent.end < policy->l_extent.end))
                        continue;

                if (unlikely(match == LCK_GROUP) &&
                    lock->l_resource->lr_type == LDLM_EXTENT &&
                    lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                        continue;

                /* We match if we have an existing lock with the same or a
                   wider set of bits. */
                if (lock->l_resource->lr_type == LDLM_IBITS &&
                     ((lock->l_policy_data.l_inodebits.bits &
                      policy->l_inodebits.bits) !=
                      policy->l_inodebits.bits))
                        continue;

                if (!unref &&
                    (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
                     lock->l_failed))
                        continue;

                if ((flags & LDLM_FL_LOCAL_ONLY) &&
                    !(lock->l_flags & LDLM_FL_LOCAL))
                        continue;

                if (flags & LDLM_FL_TEST_LOCK) {
                        LDLM_LOCK_GET(lock);
                        ldlm_lock_touch_in_lru(lock);
                } else {
                        ldlm_lock_addref_internal_nolock(lock, match);
                }
                *mode = match;
                return lock;
        }

        return NULL;
}

void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
{
        if (!lock->l_failed) {
                lock->l_failed = 1;
                cfs_waitq_broadcast(&lock->l_waitq);
        }
}
EXPORT_SYMBOL(ldlm_lock_fail_match_locked);

void ldlm_lock_fail_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_fail_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_fail_match);

void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
{
        lock->l_flags |= LDLM_FL_LVB_READY;
        cfs_waitq_broadcast(&lock->l_waitq);
}
EXPORT_SYMBOL(ldlm_lock_allow_match_locked);

void ldlm_lock_allow_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_allow_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_allow_match);

/* Can be called in two ways:
 *
 * If 'ns' is NULL, then lockh describes an existing lock that we want to look
 * for a duplicate of.
 *
 * Otherwise, all of the fields must be filled in, to match against.
 *
 * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
 *     server (i.e., connh is NULL)
 * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
 *     list will be considered
 * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
 *     to be canceled can still be matched as long as they still have reader
 *     or writer references
 * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
 *     just tell us if we would have matched.
 *
 * Returns the matched mode (nonzero) if it finds an already-existing lock
 * that is compatible, and 0 otherwise; on a match, lockh is filled in with
 * an addref()ed lock.
 *
 * We also check the security context; if that fails we simply return 0 (to
 * keep the caller code unchanged) and the context failure will be discovered
 * by the caller sometime later.
 */
ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, __u64 flags,
                            const struct ldlm_res_id *res_id, ldlm_type_t type,
                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
                            struct lustre_handle *lockh, int unref)
{
        struct ldlm_resource *res;
        struct ldlm_lock *lock, *old_lock = NULL;
        int rc = 0;
        ENTRY;

        if (ns == NULL) {
                old_lock = ldlm_handle2lock(lockh);
                LASSERT(old_lock);

                ns = ldlm_lock_to_ns(old_lock);
                res_id = &old_lock->l_resource->lr_name;
                type = old_lock->l_resource->lr_type;
                mode = old_lock->l_req_mode;
        }

        res = ldlm_resource_get(ns, NULL, res_id, type, 0);
        if (res == NULL) {
                LASSERT(old_lock == NULL);
                RETURN(0);
        }

        LDLM_RESOURCE_ADDREF(res);
        lock_res(res);

        lock = search_queue(&res->lr_granted, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        if (flags & LDLM_FL_BLOCK_GRANTED)
                GOTO(out, rc = 0);
        lock = search_queue(&res->lr_converting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        lock = search_queue(&res->lr_waiting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);

        EXIT;
 out:
        unlock_res(res);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);

        if (lock) {
                ldlm_lock2handle(lock, lockh);
                if ((flags & LDLM_FL_LVB_READY) &&
                    (!(lock->l_flags & LDLM_FL_LVB_READY))) {
                        struct l_wait_info lwi;
                        if (lock->l_completion_ast) {
                                int err = lock->l_completion_ast(lock,
                                                          LDLM_FL_WAIT_NOREPROC,
                                                                 NULL);
                                if (err) {
                                        if (flags & LDLM_FL_TEST_LOCK)
                                                LDLM_LOCK_RELEASE(lock);
                                        else
                                                ldlm_lock_decref_internal(lock,
                                                                          mode);
                                        rc = 0;
                                        goto out2;
                                }
                        }

                        lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
                                               NULL, LWI_ON_SIGNAL_NOOP, NULL);

                        /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
                        l_wait_event(lock->l_waitq,
                                     lock->l_flags & LDLM_FL_LVB_READY ||
                                     lock->l_destroyed || lock->l_failed,
                                     &lwi);
                        if (!(lock->l_flags & LDLM_FL_LVB_READY)) {
                                if (flags & LDLM_FL_TEST_LOCK)
                                        LDLM_LOCK_RELEASE(lock);
                                else
                                        ldlm_lock_decref_internal(lock, mode);
                                rc = 0;
                        }
                }
        }
 out2:
        if (rc) {
                LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[2] : policy->l_extent.start,
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[3] : policy->l_extent.end);

                /* check the user's security context */
                if (lock->l_conn_export &&
                    sptlrpc_import_check_ctx(
                                class_exp2cliimp(lock->l_conn_export))) {
                        if (!(flags & LDLM_FL_TEST_LOCK))
                                ldlm_lock_decref_internal(lock, mode);
                        rc = 0;
                }

                if (flags & LDLM_FL_TEST_LOCK)
                        LDLM_LOCK_RELEASE(lock);

        } else if (!(flags & LDLM_FL_TEST_LOCK)) { /* less verbose for test-only */
                LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
                                  LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
                                  type, mode, res_id->name[0], res_id->name[1],
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[2] : policy->l_extent.start,
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[3] : policy->l_extent.end);
        }
        if (old_lock)
                LDLM_LOCK_PUT(old_lock);

        return rc ? mode : 0;
}
EXPORT_SYMBOL(ldlm_lock_match);
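
/*
 * Typical client-side usage (a sketch; the exact flags depend on the
 * caller):
 *
 *      struct lustre_handle lockh;
 *      ldlm_mode_t mode;
 *
 *      mode = ldlm_lock_match(ns, LDLM_FL_LVB_READY, &res_id, LDLM_EXTENT,
 *                             &policy, LCK_PR | LCK_PW, &lockh, 0);
 *      if (mode != 0) {
 *              ... the match took a reference in 'mode' ...
 *              ldlm_lock_decref(&lockh, mode);
 *      }
 */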

ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh,
                                        __u64 *bits)
{
        struct ldlm_lock *lock;
        ldlm_mode_t mode = 0;
        ENTRY;

        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
                    lock->l_failed)
                        GOTO(out, mode);

                if (lock->l_flags & LDLM_FL_CBPENDING &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        GOTO(out, mode);

                if (bits)
                        *bits = lock->l_policy_data.l_inodebits.bits;
                mode = lock->l_granted_mode;
                ldlm_lock_addref_internal_nolock(lock, mode);
        }

        EXIT;

out:
        if (lock != NULL) {
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return mode;
}
EXPORT_SYMBOL(ldlm_revalidate_lock_handle);

/* It is the caller's duty to guarantee that the buffer is large enough. */
int ldlm_fill_lvb(struct ldlm_lock *lock, struct req_capsule *pill,
                  enum req_location loc, void *data, int size)
{
        void *lvb;
        ENTRY;

        LASSERT(data != NULL);
        LASSERT(size >= 0);

        switch (lock->l_lvb_type) {
        case LVB_T_OST:
                if (size == sizeof(struct ost_lvb)) {
                        if (loc == RCL_CLIENT)
                                lvb = req_capsule_client_swab_get(pill,
                                                &RMF_DLM_LVB,
                                                lustre_swab_ost_lvb);
                        else
                                lvb = req_capsule_server_swab_get(pill,
                                                &RMF_DLM_LVB,
                                                lustre_swab_ost_lvb);
                        if (unlikely(lvb == NULL)) {
                                LDLM_ERROR(lock, "no LVB");
                                RETURN(-EPROTO);
                        }

                        memcpy(data, lvb, size);
                } else if (size == sizeof(struct ost_lvb_v1)) {
                        struct ost_lvb *olvb = data;

                        if (loc == RCL_CLIENT)
                                lvb = req_capsule_client_swab_get(pill,
                                                &RMF_DLM_LVB,
                                                lustre_swab_ost_lvb_v1);
                        else
                                lvb = req_capsule_server_sized_swab_get(pill,
                                                &RMF_DLM_LVB, size,
                                                lustre_swab_ost_lvb_v1);
                        if (unlikely(lvb == NULL)) {
                                LDLM_ERROR(lock, "no LVB");
                                RETURN(-EPROTO);
                        }

                        memcpy(data, lvb, size);
                        olvb->lvb_mtime_ns = 0;
                        olvb->lvb_atime_ns = 0;
                        olvb->lvb_ctime_ns = 0;
                } else {
                        LDLM_ERROR(lock, "Replied unexpected ost LVB size %d",
                                   size);
                        RETURN(-EINVAL);
                }
                break;
        case LVB_T_LQUOTA:
                if (size == sizeof(struct lquota_lvb)) {
                        if (loc == RCL_CLIENT)
                                lvb = req_capsule_client_swab_get(pill,
                                                &RMF_DLM_LVB,
                                                lustre_swab_lquota_lvb);
                        else
                                lvb = req_capsule_server_swab_get(pill,
                                                &RMF_DLM_LVB,
                                                lustre_swab_lquota_lvb);
                        if (unlikely(lvb == NULL)) {
                                LDLM_ERROR(lock, "no LVB");
                                RETURN(-EPROTO);
                        }

                        memcpy(data, lvb, size);
                } else {
                        LDLM_ERROR(lock, "Replied unexpected lquota LVB size %d",
                                   size);
                        RETURN(-EINVAL);
                }
                break;
        case LVB_T_LAYOUT:
                if (size == 0)
                        break;

                if (loc == RCL_CLIENT)
                        lvb = req_capsule_client_get(pill, &RMF_DLM_LVB);
                else
                        lvb = req_capsule_server_get(pill, &RMF_DLM_LVB);
                if (unlikely(lvb == NULL)) {
                        LDLM_ERROR(lock, "no LVB");
                        RETURN(-EPROTO);
                }

                memcpy(data, lvb, size);
                break;
        default:
                LDLM_ERROR(lock, "Unexpected LVB type");
                RETURN(-EINVAL);
        }

        RETURN(0);
}
1431
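/*
 * Note on the ost_lvb_v1 branch above: the old wire format is a prefix
 * of the current struct ost_lvb, which appends nanosecond timestamp
 * fields.  Copying 'size' bytes fills everything except the _ns
 * fields, which are then cleared because the peer never sent them.
 * A schematic of the assumed layouts (only the _ns tail is confirmed
 * by the code above; the leading field names are assumptions):
 */
#if 0
struct ost_lvb_v1 {                    /* old format */
        __u64 lvb_size;
        __u64 lvb_mtime;
        __u64 lvb_atime;
        __u64 lvb_ctime;
        __u64 lvb_blocks;
};

struct ost_lvb {                       /* current format */
        __u64 lvb_size;
        __u64 lvb_mtime;
        __u64 lvb_atime;
        __u64 lvb_ctime;
        __u64 lvb_blocks;
        __u32 lvb_mtime_ns;            /* zeroed when the peer sent v1 */
        __u32 lvb_atime_ns;
        __u32 lvb_ctime_ns;
};
#endif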
1432 /* Returns a referenced lock */
1433 struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
1434                                    const struct ldlm_res_id *res_id,
1435                                    ldlm_type_t type,
1436                                    ldlm_mode_t mode,
1437                                    const struct ldlm_callback_suite *cbs,
1438                                    void *data, __u32 lvb_len,
1439                                    enum lvb_type lvb_type)
1440 {
1441         struct ldlm_lock *lock;
1442         struct ldlm_resource *res;
1443         ENTRY;
1444
1445         res = ldlm_resource_get(ns, NULL, res_id, type, 1);
1446         if (res == NULL)
1447                 RETURN(NULL);
1448
1449         lock = ldlm_lock_new(res);
1450
1451         if (lock == NULL)
1452                 RETURN(NULL);
1453
1454         lock->l_req_mode = mode;
1455         lock->l_ast_data = data;
1456         lock->l_pid = cfs_curproc_pid();
1457         lock->l_ns_srv = !!ns_is_server(ns);
1458         if (cbs) {
1459                 lock->l_blocking_ast = cbs->lcs_blocking;
1460                 lock->l_completion_ast = cbs->lcs_completion;
1461                 lock->l_glimpse_ast = cbs->lcs_glimpse;
1462                 lock->l_weigh_ast = cbs->lcs_weigh;
1463         }
1464
1465         lock->l_tree_node = NULL;
1466         /* if this is an extent lock, allocate the interval tree node */
1467         if (type == LDLM_EXTENT) {
1468                 if (ldlm_interval_alloc(lock) == NULL)
1469                         GOTO(out, 0);
1470         }
1471
1472         if (lvb_len) {
1473                 lock->l_lvb_len = lvb_len;
1474                 OBD_ALLOC(lock->l_lvb_data, lvb_len);
1475                 if (lock->l_lvb_data == NULL)
1476                         GOTO(out, 0);
1477         }
1478
1479         lock->l_lvb_type = lvb_type;
1480         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK))
1481                 GOTO(out, 0);
1482
1483         RETURN(lock);
1484
1485 out:
1486         ldlm_lock_destroy(lock);
1487         LDLM_LOCK_RELEASE(lock);
1488         return NULL;
1489 }
1490
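/*
 * Illustrative sketch (not compiled) of an ldlm_lock_create() caller.
 * The resource ID value and the 'blocking' callback are hypothetical
 * placeholders; the ldlm_blocking_callback typedef and LVB_T_NONE are
 * assumed from lustre_dlm.h.  Everything else is defined in this file.
 */
#if 0
static void example_create_and_drop(struct ldlm_namespace *ns,
                                    ldlm_blocking_callback blocking)
{
        struct ldlm_res_id res_id = { .name = { 0x2a } };
        struct ldlm_callback_suite cbs = {
                .lcs_blocking   = blocking,
                .lcs_completion = NULL,
                .lcs_glimpse    = NULL,
                .lcs_weigh      = NULL,
        };
        struct ldlm_lock *lock;

        lock = ldlm_lock_create(ns, &res_id, LDLM_PLAIN, LCK_PR,
                                &cbs, NULL, 0, LVB_T_NONE);
        if (lock == NULL)
                return;

        /* the lock comes back with a reference held */
        LDLM_LOCK_RELEASE(lock);
}
#endif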
1491 ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
1492                                struct ldlm_lock **lockp,
1493                                void *cookie, __u64 *flags)
1494 {
1495         struct ldlm_lock *lock = *lockp;
1496         struct ldlm_resource *res = lock->l_resource;
1497         int local = ns_is_client(ldlm_res_to_ns(res));
1498 #ifdef HAVE_SERVER_SUPPORT
1499         ldlm_processing_policy policy;
1500 #endif
1501         ldlm_error_t rc = ELDLM_OK;
1502         struct ldlm_interval *node = NULL;
1503         ENTRY;
1504
1505         lock->l_last_activity = cfs_time_current_sec();
1506         /* policies are not executed on the client or during replay */
1507         if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
1508             && !local && ns->ns_policy) {
1509                 rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
1510                                    NULL);
1511                 if (rc == ELDLM_LOCK_REPLACED) {
1512                         /* The lock that was returned has already been granted,
1513                          * and placed into lockp.  If it's not the same as the
1514                          * one we passed in, then destroy the old one and our
1515                          * work here is done. */
1516                         if (lock != *lockp) {
1517                                 ldlm_lock_destroy(lock);
1518                                 LDLM_LOCK_RELEASE(lock);
1519                         }
1520                         *flags |= LDLM_FL_LOCK_CHANGED;
1521                         RETURN(0);
1522                 } else if (rc != ELDLM_OK ||
1523                            (*flags & LDLM_FL_INTENT_ONLY)) {
1524                         ldlm_lock_destroy(lock);
1525                         RETURN(rc);
1526                 }
1527         }
1528
1529         /* For a replaying lock, it might already be on the granted list, so
1530          * unlinking the lock would free its interval node.  We have to
1531          * allocate a new interval node early, otherwise we cannot regrant
1532          * this lock in the future. - jay */
1533         if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
1534                 OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
1535
1536         lock_res_and_lock(lock);
1537         if (local && lock->l_req_mode == lock->l_granted_mode) {
1538                 /* The server returned a blocked lock, but it was granted
1539                  * before we got a chance to actually enqueue it.  We don't
1540                  * need to do anything else. */
1541                 *flags &= ~(LDLM_FL_BLOCK_GRANTED |
1542                             LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
1543                 GOTO(out, ELDLM_OK);
1544         }
1545
1546         ldlm_resource_unlink_lock(lock);
1547         if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
1548                 if (node == NULL) {
1549                         ldlm_lock_destroy_nolock(lock);
1550                         GOTO(out, rc = -ENOMEM);
1551                 }
1552
1553                 CFS_INIT_LIST_HEAD(&node->li_group);
1554                 ldlm_interval_attach(node, lock);
1555                 node = NULL;
1556         }
1557
1558         /* Some flags from the enqueue want to make it into the AST, via the
1559          * lock's l_flags. */
1560         lock->l_flags |= *flags & LDLM_AST_DISCARD_DATA;
1561
1562         /* This distinction between local lock trees is very important; a client
1563          * namespace only has information about locks taken by that client, and
1564          * thus doesn't have enough information to decide for itself if it can
1565          * be granted (below).  In this case, we do exactly what the server
1566          * tells us to do, as dictated by the 'flags'.
1567          *
1568          * We do exactly the same thing during recovery, when the server is
1569          * more or less trusting the clients not to lie.
1570          *
1571          * FIXME (bug 268): Detect obvious lies by checking compatibility in
1572          * granted/converting queues. */
1573         if (local) {
1574                 if (*flags & LDLM_FL_BLOCK_CONV)
1575                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1576                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
1577                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1578                 else
1579                         ldlm_grant_lock(lock, NULL);
1580                 GOTO(out, ELDLM_OK);
1581 #ifdef HAVE_SERVER_SUPPORT
1582         } else if (*flags & LDLM_FL_REPLAY) {
1583                 if (*flags & LDLM_FL_BLOCK_CONV) {
1584                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1585                         GOTO(out, ELDLM_OK);
1586                 } else if (*flags & LDLM_FL_BLOCK_WAIT) {
1587                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1588                         GOTO(out, ELDLM_OK);
1589                 } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
1590                         ldlm_grant_lock(lock, NULL);
1591                         GOTO(out, ELDLM_OK);
1592                 }
1593                 /* If no flags, fall through to normal enqueue path. */
1594         }
1595
1596         policy = ldlm_processing_policy_table[res->lr_type];
1597         policy(lock, flags, 1, &rc, NULL);
1598         GOTO(out, rc);
1599 #else
1600         } else {
1601                 CERROR("This is a client-side-only module, it cannot handle "
1602                        "LDLM_NAMESPACE_SERVER resource type locks.\n");
1603                 LBUG();
1604         }
1605 #endif
1606
1607 out:
1608         unlock_res_and_lock(lock);
1609         if (node)
1610                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
1611         return rc;
1612 }
1613
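/*
 * Summary of the branches above: a client namespace only knows about
 * its own locks, so it cannot judge compatibility and instead obeys
 * the server-supplied flags verbatim:
 *
 *   LDLM_FL_BLOCK_CONV                         -> lr_converting list
 *   LDLM_FL_BLOCK_WAIT / LDLM_FL_BLOCK_GRANTED -> lr_waiting list
 *   no blocking flag                           -> granted immediately
 *
 * On a server (HAVE_SERVER_SUPPORT) a replayed lock is re-queued the
 * same way, while a fresh enqueue runs the per-type processing policy
 * from ldlm_processing_policy_table[].
 */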
1614 #ifdef HAVE_SERVER_SUPPORT
1615 /* Must be called with the resource lock held; 'queue' is waiting or converting. */
1616 int ldlm_reprocess_queue(struct ldlm_resource *res, cfs_list_t *queue,
1617                          cfs_list_t *work_list)
1618 {
1619         cfs_list_t *tmp, *pos;
1620         ldlm_processing_policy policy;
1621         __u64 flags;
1622         int rc = LDLM_ITER_CONTINUE;
1623         ldlm_error_t err;
1624         ENTRY;
1625
1626         check_res_locked(res);
1627
1628         policy = ldlm_processing_policy_table[res->lr_type];
1629         LASSERT(policy);
1630
1631         cfs_list_for_each_safe(tmp, pos, queue) {
1632                 struct ldlm_lock *pending;
1633                 pending = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
1634
1635                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
1636
1637                 flags = 0;
1638                 rc = policy(pending, &flags, 0, &err, work_list);
1639                 if (rc != LDLM_ITER_CONTINUE)
1640                         break;
1641         }
1642
1643         RETURN(rc);
1644 }
1645 #endif
1646
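/*
 * Note: ldlm_reprocess_queue() only collects work.  Locks that become
 * grantable get their completion ASTs queued on 'work_list' by the
 * processing policy; the RPCs are sent later, by ldlm_run_ast_work(),
 * once the resource lock has been dropped (see ldlm_reprocess_all()
 * below).
 */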
1647 static int
1648 ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1649 {
1650         struct ldlm_cb_set_arg *arg = opaq;
1651         struct ldlm_lock_desc   d;
1652         int                     rc;
1653         struct ldlm_lock       *lock;
1654         ENTRY;
1655
1656         if (cfs_list_empty(arg->list))
1657                 RETURN(-ENOENT);
1658
1659         lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);
1660
1661         /* nobody should touch l_bl_ast */
1662         lock_res_and_lock(lock);
1663         cfs_list_del_init(&lock->l_bl_ast);
1664
1665         LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
1666         LASSERT(lock->l_bl_ast_run == 0);
1667         LASSERT(lock->l_blocking_lock);
1668         lock->l_bl_ast_run++;
1669         unlock_res_and_lock(lock);
1670
1671         ldlm_lock2desc(lock->l_blocking_lock, &d);
1672
1673         rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
1674         LDLM_LOCK_RELEASE(lock->l_blocking_lock);
1675         lock->l_blocking_lock = NULL;
1676         LDLM_LOCK_RELEASE(lock);
1677
1678         RETURN(rc);
1679 }
1680
1681 static int
1682 ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1683 {
1684         struct ldlm_cb_set_arg  *arg = opaq;
1685         int                      rc = 0;
1686         struct ldlm_lock        *lock;
1687         ldlm_completion_callback completion_callback;
1688         ENTRY;
1689
1690         if (cfs_list_empty(arg->list))
1691                 RETURN(-ENOENT);
1692
1693         lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
1694
1695         /* It's possible to receive a completion AST before we've set
1696          * the l_completion_ast pointer: either because the AST arrived
1697          * before the reply, or simply because there's a small race
1698          * window between receiving the reply and finishing the local
1699          * enqueue. (bug 842)
1700          *
1701          * This can't happen with the blocking_ast, however, because we
1702          * will never call the local blocking_ast until we drop our
1703          * reader/writer reference, which we won't do until we get the
1704          * reply and finish enqueueing. */
1705
1706         /* nobody should touch l_cp_ast */
1707         lock_res_and_lock(lock);
1708         cfs_list_del_init(&lock->l_cp_ast);
1709         LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
1710         /* save l_completion_ast since it can be changed by
1711          * mds_intent_policy(), see bug 14225 */
1712         completion_callback = lock->l_completion_ast;
1713         lock->l_flags &= ~LDLM_FL_CP_REQD;
1714         unlock_res_and_lock(lock);
1715
1716         if (completion_callback != NULL)
1717                 rc = completion_callback(lock, 0, (void *)arg);
1718         LDLM_LOCK_RELEASE(lock);
1719
1720         RETURN(rc);
1721 }
1722
1723 static int
1724 ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1725 {
1726         struct ldlm_cb_set_arg *arg = opaq;
1727         struct ldlm_lock_desc   desc;
1728         int                     rc;
1729         struct ldlm_lock       *lock;
1730         ENTRY;
1731
1732         if (cfs_list_empty(arg->list))
1733                 RETURN(-ENOENT);
1734
1735         lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_rk_ast);
1736         cfs_list_del_init(&lock->l_rk_ast);
1737
1738                         /* the desc just pretends the lock is exclusive */
1739         ldlm_lock2desc(lock, &desc);
1740         desc.l_req_mode = LCK_EX;
1741         desc.l_granted_mode = 0;
1742
1743         rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
1744         LDLM_LOCK_RELEASE(lock);
1745
1746         RETURN(rc);
1747 }
1748
1749 int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1750 {
1751         struct ldlm_cb_set_arg          *arg = opaq;
1752         struct ldlm_glimpse_work        *gl_work;
1753         struct ldlm_lock                *lock;
1754         int                              rc = 0;
1755         ENTRY;
1756
1757         if (cfs_list_empty(arg->list))
1758                 RETURN(-ENOENT);
1759
1760         gl_work = cfs_list_entry(arg->list->next, struct ldlm_glimpse_work,
1761                                  gl_list);
1762         cfs_list_del_init(&gl_work->gl_list);
1763
1764         lock = gl_work->gl_lock;
1765
1766         /* transfer the glimpse descriptor to ldlm_cb_set_arg */
1767         arg->gl_desc = gl_work->gl_desc;
1768
1769         /* invoke the actual glimpse callback */
1770         if (lock->l_glimpse_ast(lock, (void*)arg) == 0)
1771                 rc = 1;
1772
1773         LDLM_LOCK_RELEASE(lock);
1774
1775         if ((gl_work->gl_flags & LDLM_GL_WORK_NOFREE) == 0)
1776                 OBD_FREE_PTR(gl_work);
1777
1778         RETURN(rc);
1779 }
1780
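/*
 * Each ldlm_work_*_ast_lock() helper above is a set_producer_func:
 * one call detaches a single lock from arg->list and issues exactly
 * one AST.  ldlm_run_ast_work() below feeds these producers into a
 * flow-controlled request set, so the -ENOENT returned on an empty
 * list is the producers' "no more work" signal.
 */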
1781 int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
1782                       ldlm_desc_ast_t ast_type)
1783 {
1784         struct ldlm_cb_set_arg *arg;
1785         set_producer_func       work_ast_lock;
1786         int                     rc;
1787
1788         if (cfs_list_empty(rpc_list))
1789                 RETURN(0);
1790
1791         OBD_ALLOC_PTR(arg);
1792         if (arg == NULL)
1793                 RETURN(-ENOMEM);
1794
1795         cfs_atomic_set(&arg->restart, 0);
1796         arg->list = rpc_list;
1797
1798         switch (ast_type) {
1799                 case LDLM_WORK_BL_AST:
1800                         arg->type = LDLM_BL_CALLBACK;
1801                         work_ast_lock = ldlm_work_bl_ast_lock;
1802                         break;
1803                 case LDLM_WORK_CP_AST:
1804                         arg->type = LDLM_CP_CALLBACK;
1805                         work_ast_lock = ldlm_work_cp_ast_lock;
1806                         break;
1807                 case LDLM_WORK_REVOKE_AST:
1808                         arg->type = LDLM_BL_CALLBACK;
1809                         work_ast_lock = ldlm_work_revoke_ast_lock;
1810                         break;
1811                 case LDLM_WORK_GL_AST:
1812                         arg->type = LDLM_GL_CALLBACK;
1813                         work_ast_lock = ldlm_work_gl_ast_lock;
1814                         break;
1815                 default:
1816                         LBUG();
1817         }
1818
1819         /* We create a ptlrpc request set with the flow control extension.
1820          * The set uses the work_ast_lock function to produce new requests
1821          * and sends a new one each time a request completes, keeping the
1822          * number of requests in flight at ns_max_parallel_ast. */
1823         arg->set = ptlrpc_prep_fcset(ns->ns_max_parallel_ast ? : UINT_MAX,
1824                                      work_ast_lock, arg);
1825         if (arg->set == NULL)
1826                 GOTO(out, rc = -ENOMEM);
1827
1828         ptlrpc_set_wait(arg->set);
1829         ptlrpc_set_destroy(arg->set);
1830
1831         rc = cfs_atomic_read(&arg->restart) ? -ERESTART : 0;
1832         GOTO(out, rc);
1833 out:
1834         OBD_FREE_PTR(arg);
1835         return rc;
1836 }
1837
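/*
 * Illustrative sketch (not compiled): a typical caller of
 * ldlm_run_ast_work().  'rpc_list' is assumed to have been filled by
 * a processing policy or by ldlm_reprocess_queue(); the rest is from
 * this file.
 */
#if 0
static int example_flush_cp_asts(struct ldlm_namespace *ns,
                                 cfs_list_t *rpc_list)
{
        /* sends one completion AST per lock on rpc_list, with at most
         * ns_max_parallel_ast requests in flight; -ERESTART means the
         * list was consumed and the caller should rescan its queues,
         * as ldlm_reprocess_all() does below */
        return ldlm_run_ast_work(ns, rpc_list, LDLM_WORK_CP_AST);
}
#endif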
1838 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
1839 {
1840         ldlm_reprocess_all(res);
1841         return LDLM_ITER_CONTINUE;
1842 }
1843
1844 static int ldlm_reprocess_res(cfs_hash_t *hs, cfs_hash_bd_t *bd,
1845                               cfs_hlist_node_t *hnode, void *arg)
1846 {
1847         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
1848         int    rc;
1849
1850         rc = reprocess_one_queue(res, arg);
1851
1852         return rc == LDLM_ITER_STOP;
1853 }
1854
1855 void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
1856 {
1857         ENTRY;
1858
1859         if (ns != NULL) {
1860                 cfs_hash_for_each_nolock(ns->ns_rs_hash,
1861                                          ldlm_reprocess_res, NULL);
1862         }
1863         EXIT;
1864 }
1865 EXPORT_SYMBOL(ldlm_reprocess_all_ns);
1866
1867 void ldlm_reprocess_all(struct ldlm_resource *res)
1868 {
1869         CFS_LIST_HEAD(rpc_list);
1870
1871 #ifdef HAVE_SERVER_SUPPORT
1872         int rc;
1873         ENTRY;
1874         /* Local lock trees don't get reprocessed. */
1875         if (ns_is_client(ldlm_res_to_ns(res))) {
1876                 EXIT;
1877                 return;
1878         }
1879
1880 restart:
1881         lock_res(res);
1882         rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
1883         if (rc == LDLM_ITER_CONTINUE)
1884                 ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
1885         unlock_res(res);
1886
1887         rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
1888                                LDLM_WORK_CP_AST);
1889         if (rc == -ERESTART) {
1890                 LASSERT(cfs_list_empty(&rpc_list));
1891                 goto restart;
1892         }
1893 #else
1894         ENTRY;
1895         if (!ns_is_client(ldlm_res_to_ns(res))) {
1896                 CERROR("This is a client-side-only module, it cannot handle "
1897                        "LDLM_NAMESPACE_SERVER resource type locks.\n");
1898                 LBUG();
1899         }
1900 #endif
1901         EXIT;
1902 }
1903
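/*
 * Note on the restart above: -ERESTART from ldlm_run_ast_work() means
 * at least one completion AST asked for a restart.  By then the work
 * list has been fully consumed (hence the LASSERT that it is empty),
 * so the converting and waiting queues are simply scanned again.
 */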
1904 void ldlm_cancel_callback(struct ldlm_lock *lock)
1905 {
1906         check_res_locked(lock->l_resource);
1907         if (!(lock->l_flags & LDLM_FL_CANCEL)) {
1908                 lock->l_flags |= LDLM_FL_CANCEL;
1909                 if (lock->l_blocking_ast) {
1910                         // l_check_no_ns_lock(ns);
1911                         unlock_res_and_lock(lock);
1912                         lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
1913                                              LDLM_CB_CANCELING);
1914                         lock_res_and_lock(lock);
1915                 } else {
1916                         LDLM_DEBUG(lock, "no blocking ast");
1917                 }
1918         }
1919         lock->l_flags |= LDLM_FL_BL_DONE;
1920 }
1921
1922 void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
1923 {
1924         if (req->l_resource->lr_type != LDLM_PLAIN &&
1925             req->l_resource->lr_type != LDLM_IBITS)
1926                 return;
1927
1928         cfs_list_del_init(&req->l_sl_policy);
1929         cfs_list_del_init(&req->l_sl_mode);
1930 }
1931
1932 void ldlm_lock_cancel(struct ldlm_lock *lock)
1933 {
1934         struct ldlm_resource *res;
1935         struct ldlm_namespace *ns;
1936         ENTRY;
1937
1938         lock_res_and_lock(lock);
1939
1940         res = lock->l_resource;
1941         ns  = ldlm_res_to_ns(res);
1942
1943         /* Please do not, no matter how tempting, remove this LBUG without
1944          * talking to me first. -phik */
1945         if (lock->l_readers || lock->l_writers) {
1946                 LDLM_ERROR(lock, "lock still has references");
1947                 LBUG();
1948         }
1949
1950         if (lock->l_waited)
1951                 ldlm_del_waiting_lock(lock);
1952
1953         /* Run the cancel callback; the res lock is dropped while it runs. */
1954         ldlm_cancel_callback(lock);
1955
1956         /* Yes, a second time: the lock may have been added back while
1957            ldlm_cancel_callback ran without the res lock held */
1958         if (lock->l_waited)
1959                 ldlm_del_waiting_lock(lock);
1960
1961         ldlm_resource_unlink_lock(lock);
1962         ldlm_lock_destroy_nolock(lock);
1963
1964         if (lock->l_granted_mode == lock->l_req_mode)
1965                 ldlm_pool_del(&ns->ns_pool, lock);
1966
1967         /* Make sure we will not be called again for the same lock, which is
1968          * possible unless lock->l_granted_mode is zeroed out */
1969         lock->l_granted_mode = LCK_MINMODE;
1970         unlock_res_and_lock(lock);
1971
1972         EXIT;
1973 }
1974 EXPORT_SYMBOL(ldlm_lock_cancel);
1975
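/*
 * Illustrative sketch (not compiled): cancelling through a lock
 * handle.  All names below are defined or used elsewhere in this
 * file; note that any remaining reader/writer references would
 * trigger the LBUG() above.
 */
#if 0
static void example_cancel_by_handle(struct lustre_handle *lockh)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (lock == NULL)
                return;

        /* all reader/writer refs must have been dropped already */
        ldlm_lock_cancel(lock);
        LDLM_LOCK_PUT(lock);
}
#endif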
1976 int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
1977 {
1978         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1979         int rc = -EINVAL;
1980         ENTRY;
1981
1982         if (lock) {
1983                 if (lock->l_ast_data == NULL)
1984                         lock->l_ast_data = data;
1985                 if (lock->l_ast_data == data)
1986                         rc = 0;
1987                 LDLM_LOCK_PUT(lock);
1988         }
1989         RETURN(rc);
1990 }
1991 EXPORT_SYMBOL(ldlm_lock_set_data);
1992
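/*
 * Note: ldlm_lock_set_data() has "set once" semantics - it succeeds
 * if l_ast_data was still NULL or already held the same pointer, and
 * returns -EINVAL if the lock is already owned by different data.
 * Racing callers attaching the same pointer are therefore harmless.
 */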
1993 struct export_cl_data {
1994         struct obd_export       *ecl_exp;
1995         int                     ecl_loop;
1996 };
1997
1998 int ldlm_cancel_locks_for_export_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
1999                                     cfs_hlist_node_t *hnode, void *data)
2001 {
2002         struct export_cl_data   *ecl = (struct export_cl_data *)data;
2003         struct obd_export       *exp = ecl->ecl_exp;
2004         struct ldlm_lock        *lock = cfs_hash_object(hs, hnode);
2005         struct ldlm_resource    *res;
2006
2007         res = ldlm_resource_getref(lock->l_resource);
2008         LDLM_LOCK_GET(lock);
2009
2010         LDLM_DEBUG(lock, "export %p", exp);
2011         ldlm_res_lvbo_update(res, NULL, 1);
2012         ldlm_lock_cancel(lock);
2013         ldlm_reprocess_all(res);
2014         ldlm_resource_putref(res);
2015         LDLM_LOCK_RELEASE(lock);
2016
2017         ecl->ecl_loop++;
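        /* 'x & -x == x' holds only for powers of two (and 0), so the
         * CDEBUG below fires on loop 1, 2, 4, 8, ... keeping the log
         * bounded for exports that hold many locks */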
2018         if ((ecl->ecl_loop & -ecl->ecl_loop) == ecl->ecl_loop) {
2019                 CDEBUG(D_INFO,
2020                        "Cancel lock %p for export %p (loop %d), still have "
2021                        "%d locks left on hash table.\n",
2022                        lock, exp, ecl->ecl_loop,
2023                        cfs_atomic_read(&hs->hs_count));
2024         }
2025
2026         return 0;
2027 }
2028
2029 void ldlm_cancel_locks_for_export(struct obd_export *exp)
2030 {
2031         struct export_cl_data   ecl = {
2032                 .ecl_exp        = exp,
2033                 .ecl_loop       = 0,
2034         };
2035
2036         cfs_hash_for_each_empty(exp->exp_lock_hash,
2037                                 ldlm_cancel_locks_for_export_cb, &ecl);
2038 }
2039
2040 /**
2041  * Downgrade an exclusive lock.
2042  *
2043  * A fast variant of ldlm_lock_convert() for converting exclusive
2044  * locks. The conversion always succeeds.
2045  *
2046  * \param lock A lock to convert
2047  * \param new_mode new lock mode
2048  */
2049 void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode)
2050 {
2051         ENTRY;
2052
2053         LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
2054         LASSERT(new_mode == LCK_COS);
2055
2056         lock_res_and_lock(lock);
2057         ldlm_resource_unlink_lock(lock);
2058         /*
2059          * Remove the lock from the pool, as it will be added back by
2060          * ldlm_grant_lock() below.
2061          */
2062         ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
2063
2064         lock->l_req_mode = new_mode;
2065         ldlm_grant_lock(lock, NULL);
2066         unlock_res_and_lock(lock);
2067         ldlm_reprocess_all(lock->l_resource);
2068
2069         EXIT;
2070 }
2071 EXPORT_SYMBOL(ldlm_lock_downgrade);
2072
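/*
 * Illustrative sketch (not compiled): per the LASSERTs above the only
 * supported downgrade is an EX/PW lock going to LCK_COS (Commit on
 * Sharing), letting dependent server-side operations proceed without
 * a full cancel.
 */
#if 0
        /* 'lock' must currently be granted in LCK_PW or LCK_EX mode */
        ldlm_lock_downgrade(lock, LCK_COS);
#endif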
2073 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
2074                                         __u32 *flags)
2075 {
2076         CFS_LIST_HEAD(rpc_list);
2077         struct ldlm_resource *res;
2078         struct ldlm_namespace *ns;
2079         int granted = 0;
2080 #ifdef HAVE_SERVER_SUPPORT
2081         int old_mode;
2082         struct sl_insert_point prev;
2083 #endif
2084         struct ldlm_interval *node;
2085         ENTRY;
2086
2087         if (new_mode == lock->l_granted_mode) { // No changes? Just return.
2088                 *flags |= LDLM_FL_BLOCK_GRANTED;
2089                 RETURN(lock->l_resource);
2090         }
2091
2092         /* We can't check the lock type here because the lock's bitlock is
2093          * not held, so do the allocation blindly. -jay */
2094         OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
2095         if (node == NULL)  /* Actually, this causes EDEADLOCK to be returned */
2096                 RETURN(NULL);
2097
2098         LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR),
2099                  "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);
2100
2101         lock_res_and_lock(lock);
2102
2103         res = lock->l_resource;
2104         ns  = ldlm_res_to_ns(res);
2105
2106 #ifdef HAVE_SERVER_SUPPORT
2107         old_mode = lock->l_req_mode;
2108 #endif
2109         lock->l_req_mode = new_mode;
2110         if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
2111 #ifdef HAVE_SERVER_SUPPORT
2112                 /* remember the position where the lock might be added
2113                  * back to the granted list later, and the neighbouring
2114                  * links needed to fix up the mode/policy skiplists. */
2115                 prev.res_link = lock->l_res_link.prev;
2116                 prev.mode_link = lock->l_sl_mode.prev;
2117                 prev.policy_link = lock->l_sl_policy.prev;
2118 #endif
2119                 ldlm_resource_unlink_lock(lock);
2120         } else {
2121                 ldlm_resource_unlink_lock(lock);
2122                 if (res->lr_type == LDLM_EXTENT) {
2123                         /* FIXME: ugly code; we have to attach the lock to an
2124                          * interval node again since it may be granted again
2125                          * soon */
2126                         CFS_INIT_LIST_HEAD(&node->li_group);
2127                         ldlm_interval_attach(node, lock);
2128                         node = NULL;
2129                 }
2130         }
2131
2132         /*
2133          * Remove old lock from the pool before adding the lock with new
2134          * mode below in ->policy()
2135          */
2136         ldlm_pool_del(&ns->ns_pool, lock);
2137
2138         /* If this is a local resource, put it on the appropriate list. */
2139         if (ns_is_client(ldlm_res_to_ns(res))) {
2140                 if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
2141                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
2142                 } else {
2143                         /* This should never happen, because of the way the
2144                          * server handles conversions. */
2145                         LDLM_ERROR(lock, "Erroneous flags %x on local lock\n",
2146                                    *flags);
2147                         LBUG();
2148
2149                         ldlm_grant_lock(lock, &rpc_list);
2150                         granted = 1;
2151                         /* FIXME: completion handling not with lr_lock held ! */
2152                         if (lock->l_completion_ast)
2153                                 lock->l_completion_ast(lock, 0, NULL);
2154                 }
2155 #ifdef HAVE_SERVER_SUPPORT
2156         } else {
2157                 int rc;
2158                 ldlm_error_t err;
2159                 __u64 pflags = 0;
2160                 ldlm_processing_policy policy;
2161                 policy = ldlm_processing_policy_table[res->lr_type];
2162                 rc = policy(lock, &pflags, 0, &err, &rpc_list);
2163                 if (rc == LDLM_ITER_STOP) {
2164                         lock->l_req_mode = old_mode;
2165                         if (res->lr_type == LDLM_EXTENT)
2166                                 ldlm_extent_add_lock(res, lock);
2167                         else
2168                                 ldlm_granted_list_add_lock(lock, &prev);
2169
2170                         res = NULL;
2171                 } else {
2172                         *flags |= LDLM_FL_BLOCK_GRANTED;
2173                         granted = 1;
2174                 }
2175         }
2176 #else
2177         } else {
2178                 CERROR("This is a client-side-only module, it cannot handle "
2179                        "LDLM_NAMESPACE_SERVER resource type locks.\n");
2180                 LBUG();
2181         }
2182 #endif
2183         unlock_res_and_lock(lock);
2184
2185         if (granted)
2186                 ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
2187         if (node)
2188                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
2189         RETURN(res);
2190 }
2191 EXPORT_SYMBOL(ldlm_lock_convert);
2192
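/*
 * Illustrative sketch (not compiled): per the LASSERTF above, the
 * only conversion this path accepts is PR -> PW.  A NULL result means
 * either the interval-node allocation failed (callers see EDEADLOCK)
 * or, on a server, the processing policy refused the conversion and
 * the lock was restored with its old mode.
 */
#if 0
static int example_convert_pr_to_pw(struct ldlm_lock *lock)
{
        __u32 flags = 0;
        struct ldlm_resource *res;

        res = ldlm_lock_convert(lock, LCK_PW, &flags);
        if (res == NULL)
                return -EDEADLK;

        /* LDLM_FL_BLOCK_GRANTED in 'flags' signals the conversion was
         * granted (or was a no-op because the mode did not change) */
        return 0;
}
#endif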
2193 void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
2194 {
2195         struct ldlm_lock *lock;
2196
2197         if (!((libcfs_debug | D_ERROR) & level))
2198                 return;
2199
2200         lock = ldlm_handle2lock(lockh);
2201         if (lock == NULL)
2202                 return;
2203
2204         LDLM_DEBUG_LIMIT(level, lock, "###");
2205
2206         LDLM_LOCK_PUT(lock);
2207 }
2208 EXPORT_SYMBOL(ldlm_lock_dump_handle);
2209
2210 void _ldlm_lock_debug(struct ldlm_lock *lock,
2211                       struct libcfs_debug_msg_data *msgdata,
2212                       const char *fmt, ...)
2213 {
2214         va_list args;
2215         struct obd_export *exp = lock->l_export;
2216         struct ldlm_resource *resource = lock->l_resource;
2217         char *nid = "local";
2218
2219         va_start(args, fmt);
2220
2221         if (exp && exp->exp_connection) {
2222                 nid = libcfs_nid2str(exp->exp_connection->c_peer.nid);
2223         } else if (exp && exp->exp_obd != NULL) {
2224                 struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
2225                 nid = libcfs_nid2str(imp->imp_connection->c_peer.nid);
2226         }
2227
2228         if (resource == NULL) {
2229                 libcfs_debug_vmsg2(msgdata, fmt, args,
2230                        " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2231                        "res: \?\? rrc=\?\? type: \?\?\? flags: "LPX64" nid: %s "
2232                        "remote: "LPX64" expref: %d pid: %u timeout: %lu "
2233                        "lvb_type: %d\n",
2234                        lock,
2235                        lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
2236                        lock->l_readers, lock->l_writers,
2237                        ldlm_lockname[lock->l_granted_mode],
2238                        ldlm_lockname[lock->l_req_mode],
2239                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2240                        exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
2241                        lock->l_pid, lock->l_callback_timeout, lock->l_lvb_type);
2242                 va_end(args);
2243                 return;
2244         }
2245
2246         switch (resource->lr_type) {
2247         case LDLM_EXTENT:
2248                 libcfs_debug_vmsg2(msgdata, fmt, args,
2249                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2250                        "res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64
2251                        "] (req "LPU64"->"LPU64") flags: "LPX64" nid: %s remote:"
2252                        " "LPX64" expref: %d pid: %u timeout: %lu lvb_type: %d\n",
2253                        ldlm_lock_to_ns_name(lock), lock,
2254                        lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
2255                        lock->l_readers, lock->l_writers,
2256                        ldlm_lockname[lock->l_granted_mode],
2257                        ldlm_lockname[lock->l_req_mode],
2258                        resource->lr_name.name[0],
2259                        resource->lr_name.name[1],
2260                        cfs_atomic_read(&resource->lr_refcount),
2261                        ldlm_typename[resource->lr_type],
2262                        lock->l_policy_data.l_extent.start,
2263                        lock->l_policy_data.l_extent.end,
2264                        lock->l_req_extent.start, lock->l_req_extent.end,
2265                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2266                        exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
2267                        lock->l_pid, lock->l_callback_timeout, lock->l_lvb_type);
2268                 break;
2269
2270         case LDLM_FLOCK:
2271                 libcfs_debug_vmsg2(msgdata, fmt, args,
2272                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2273                        "res: "LPU64"/"LPU64" rrc: %d type: %s pid: %d "
2274                        "["LPU64"->"LPU64"] flags: "LPX64" nid: %s remote: "LPX64
2275                        " expref: %d pid: %u timeout: %lu\n",
2276                        ldlm_lock_to_ns_name(lock), lock,
2277                        lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
2278                        lock->l_readers, lock->l_writers,
2279                        ldlm_lockname[lock->l_granted_mode],
2280                        ldlm_lockname[lock->l_req_mode],
2281                        resource->lr_name.name[0],
2282                        resource->lr_name.name[1],
2283                        cfs_atomic_read(&resource->lr_refcount),
2284                        ldlm_typename[resource->lr_type],
2285                        lock->l_policy_data.l_flock.pid,
2286                        lock->l_policy_data.l_flock.start,
2287                        lock->l_policy_data.l_flock.end,
2288                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2289                        exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
2290                        lock->l_pid, lock->l_callback_timeout);
2291                 break;
2292
2293         case LDLM_IBITS:
2294                 libcfs_debug_vmsg2(msgdata, fmt, args,
2295                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2296                        "res: "LPU64"/"LPU64" bits "LPX64" rrc: %d type: %s "
2297                        "flags: "LPX64" nid: %s remote: "LPX64" expref: %d "
2298                        "pid: %u timeout: %lu lvb_type: %d\n",
2299                        ldlm_lock_to_ns_name(lock),
2300                        lock, lock->l_handle.h_cookie,
2301                        cfs_atomic_read(&lock->l_refc),
2302                        lock->l_readers, lock->l_writers,
2303                        ldlm_lockname[lock->l_granted_mode],
2304                        ldlm_lockname[lock->l_req_mode],
2305                        resource->lr_name.name[0],
2306                        resource->lr_name.name[1],
2307                        lock->l_policy_data.l_inodebits.bits,
2308                        cfs_atomic_read(&resource->lr_refcount),
2309                        ldlm_typename[resource->lr_type],
2310                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2311                        exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
2312                        lock->l_pid, lock->l_callback_timeout, lock->l_lvb_type);
2313                 break;
2314
2315         default:
2316                 libcfs_debug_vmsg2(msgdata, fmt, args,
2317                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2318                        "res: "LPU64"/"LPU64" rrc: %d type: %s flags: "LPX64" "
2319                        "nid: %s remote: "LPX64" expref: %d pid: %u timeout: %lu "
2320                        "lvb_type: %d\n",
2321                        ldlm_lock_to_ns_name(lock),
2322                        lock, lock->l_handle.h_cookie,
2323                        cfs_atomic_read(&lock->l_refc),
2324                        lock->l_readers, lock->l_writers,
2325                        ldlm_lockname[lock->l_granted_mode],
2326                        ldlm_lockname[lock->l_req_mode],
2327                        resource->lr_name.name[0],
2328                        resource->lr_name.name[1],
2329                        cfs_atomic_read(&resource->lr_refcount),
2330                        ldlm_typename[resource->lr_type],
2331                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2332                        exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
2333                        lock->l_pid, lock->l_callback_timeout, lock->l_lvb_type);
2334                 break;
2335         }
2336         va_end(args);
2337 }
2338 EXPORT_SYMBOL(_ldlm_lock_debug);