/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Whamcloud, Inc.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_lock.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
# include <libcfs/libcfs.h>
# include <linux/lustre_intent.h>
#else
# include <liblustre.h>
#endif

#include <obd_class.h>
#include "ldlm_internal.h"

/* lock types */
char *ldlm_lockname[] = {
        [0] "--",
        [LCK_EX] "EX",
        [LCK_PW] "PW",
        [LCK_PR] "PR",
        [LCK_CW] "CW",
        [LCK_CR] "CR",
        [LCK_NL] "NL",
        [LCK_GROUP] "GROUP",
        [LCK_COS] "COS"
};

char *ldlm_typename[] = {
        [LDLM_PLAIN] "PLN",
        [LDLM_EXTENT] "EXT",
        [LDLM_FLOCK] "FLK",
        [LDLM_IBITS] "IBT",
};

static ldlm_policy_wire_to_local_t ldlm_policy_wire18_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE] ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE] ldlm_flock_policy_wire18_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE] ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_wire_to_local_t ldlm_policy_wire21_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE] ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE] ldlm_flock_policy_wire21_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE] ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_local_to_wire_t ldlm_policy_local_to_wire[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE] ldlm_plain_policy_local_to_wire,
        [LDLM_EXTENT - LDLM_MIN_TYPE] ldlm_extent_policy_local_to_wire,
        [LDLM_FLOCK - LDLM_MIN_TYPE] ldlm_flock_policy_local_to_wire,
        [LDLM_IBITS - LDLM_MIN_TYPE] ldlm_ibits_policy_local_to_wire,
};

/**
 * Converts a lock policy from the local format to the on-the-wire
 * lock_desc format.
 */
void ldlm_convert_policy_to_wire(ldlm_type_t type,
                                 const ldlm_policy_data_t *lpolicy,
                                 ldlm_wire_policy_data_t *wpolicy)
{
        ldlm_policy_local_to_wire_t convert;

        convert = ldlm_policy_local_to_wire[type - LDLM_MIN_TYPE];

        convert(lpolicy, wpolicy);
}

/**
 * Converts a lock policy from the on-the-wire lock_desc format to the
 * local format.
 */
void ldlm_convert_policy_to_local(struct obd_export *exp, ldlm_type_t type,
                                  const ldlm_wire_policy_data_t *wpolicy,
                                  ldlm_policy_data_t *lpolicy)
{
        ldlm_policy_wire_to_local_t convert;
        int new_client;

        /* Some badness for 2.0.0 clients, but 2.0.0 isn't supported. */
        new_client = (exp->exp_connect_flags & OBD_CONNECT_FULL20) != 0;
        if (new_client)
                convert = ldlm_policy_wire21_to_local[type - LDLM_MIN_TYPE];
        else
                convert = ldlm_policy_wire18_to_local[type - LDLM_MIN_TYPE];

        convert(wpolicy, lpolicy);
}

char *ldlm_it2str(int it)
{
        switch (it) {
        case IT_OPEN:
                return "open";
        case IT_CREAT:
                return "creat";
        case (IT_OPEN | IT_CREAT):
                return "open|creat";
        case IT_READDIR:
                return "readdir";
        case IT_GETATTR:
                return "getattr";
        case IT_LOOKUP:
                return "lookup";
        case IT_UNLINK:
                return "unlink";
        case IT_GETXATTR:
                return "getxattr";
        case IT_LAYOUT:
                return "layout";
        default:
                CERROR("Unknown intent %d\n", it);
                return "UNKNOWN";
        }
}

extern cfs_mem_cache_t *ldlm_lock_slab;

#ifdef HAVE_SERVER_SUPPORT
static ldlm_processing_policy ldlm_processing_policy_table[] = {
        [LDLM_PLAIN] ldlm_process_plain_lock,
        [LDLM_EXTENT] ldlm_process_extent_lock,
# ifdef __KERNEL__
        [LDLM_FLOCK] ldlm_process_flock_lock,
# endif
        [LDLM_IBITS] ldlm_process_inodebits_lock,
};

ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
{
        return ldlm_processing_policy_table[res->lr_type];
}
#endif /* HAVE_SERVER_SUPPORT */

void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
{
        ns->ns_policy = arg;
}

/*
 * REFCOUNTED LOCK OBJECTS
 */


/*
 * Lock refcounts, during creation:
 *   - one special one for allocation, dec'd only once in destroy
 *   - one for being a lock that's in-use
 *   - one for the addref associated with a new lock
 */
struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
{
        cfs_atomic_inc(&lock->l_refc);
        return lock;
}

void ldlm_lock_put(struct ldlm_lock *lock)
{
        ENTRY;

        LASSERT(lock->l_resource != LP_POISON);
        LASSERT(cfs_atomic_read(&lock->l_refc) > 0);
        if (cfs_atomic_dec_and_test(&lock->l_refc)) {
                struct ldlm_resource *res;

                LDLM_DEBUG(lock,
                           "final lock_put on destroyed lock, freeing it.");

                res = lock->l_resource;
                LASSERT(lock->l_destroyed);
                LASSERT(cfs_list_empty(&lock->l_res_link));
                LASSERT(cfs_list_empty(&lock->l_pending_chain));

                lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
                                     LDLM_NSS_LOCKS);
                lu_ref_del(&res->lr_reference, "lock", lock);
                ldlm_resource_putref(res);
                lock->l_resource = NULL;
                if (lock->l_export) {
                        class_export_lock_put(lock->l_export, lock);
                        lock->l_export = NULL;
                }

                if (lock->l_lvb_data != NULL)
                        OBD_FREE(lock->l_lvb_data, lock->l_lvb_len);

                ldlm_interval_free(ldlm_interval_detach(lock));
                lu_ref_fini(&lock->l_reference);
                OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle);
        }

        EXIT;
}
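
/*
 * Illustrative sketch (not part of the original file): how the reference
 * counting above is typically paired by a caller.  LDLM_LOCK_GET/PUT are
 * assumed to wrap ldlm_lock_get()/ldlm_lock_put(); "lock" and the helper
 * are hypothetical.
 *
 *      struct ldlm_lock *lock = ...;
 *
 *      LDLM_LOCK_GET(lock);            // take a reference before use
 *      do_something_with(lock);        // hypothetical caller work
 *      LDLM_LOCK_PUT(lock);            // drop it; the final put frees the
 *                                      // lock only after ldlm_lock_destroy()
 */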

int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
{
        int rc = 0;
        if (!cfs_list_empty(&lock->l_lru)) {
                struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

                LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
                cfs_list_del_init(&lock->l_lru);
                if (lock->l_flags & LDLM_FL_SKIPPED)
                        lock->l_flags &= ~LDLM_FL_SKIPPED;
                LASSERT(ns->ns_nr_unused > 0);
                ns->ns_nr_unused--;
                rc = 1;
        }
        return rc;
}

int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
        int rc;

        ENTRY;
        if (lock->l_ns_srv) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                RETURN(0);
        }

        cfs_spin_lock(&ns->ns_lock);
        rc = ldlm_lock_remove_from_lru_nolock(lock);
        cfs_spin_unlock(&ns->ns_lock);
        EXIT;
        return rc;
}

void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        lock->l_last_used = cfs_time_current();
        LASSERT(cfs_list_empty(&lock->l_lru));
        LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
        cfs_list_add_tail(&lock->l_lru, &ns->ns_unused_list);
        LASSERT(ns->ns_nr_unused >= 0);
        ns->ns_nr_unused++;
}

void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        cfs_spin_lock(&ns->ns_lock);
        ldlm_lock_add_to_lru_nolock(lock);
        cfs_spin_unlock(&ns->ns_lock);
        EXIT;
}

void ldlm_lock_touch_in_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        if (lock->l_ns_srv) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                EXIT;
                return;
        }

        cfs_spin_lock(&ns->ns_lock);
        if (!cfs_list_empty(&lock->l_lru)) {
                ldlm_lock_remove_from_lru_nolock(lock);
                ldlm_lock_add_to_lru_nolock(lock);
        }
        cfs_spin_unlock(&ns->ns_lock);
        EXIT;
}

/* This used to have a 'strict' flag, which recovery would use to mark an
 * in-use lock as needing-to-die.  Lest I am ever tempted to put it back, I
 * shall explain why it's gone: with the new hash table scheme, once you call
 * ldlm_lock_destroy, you can never drop your final references on this lock.
 * Because it's not in the hash table anymore.  -phil */
int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
{
        ENTRY;

        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                LBUG();
        }

        if (!cfs_list_empty(&lock->l_res_link)) {
                LDLM_ERROR(lock, "lock still on resource");
                LBUG();
        }

        if (lock->l_destroyed) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                EXIT;
                return 0;
        }
        lock->l_destroyed = 1;

        if (lock->l_export && lock->l_export->exp_lock_hash &&
            !cfs_hlist_unhashed(&lock->l_exp_hash))
                cfs_hash_del(lock->l_export->exp_lock_hash,
                             &lock->l_remote_handle, &lock->l_exp_hash);

        ldlm_lock_remove_from_lru(lock);
        class_handle_unhash(&lock->l_handle);

#if 0
        /* Wake anyone waiting for this lock */
        /* FIXME: I should probably add yet another flag, instead of using
         * l_export to only call this on clients */
        if (lock->l_export)
                class_export_put(lock->l_export);
        lock->l_export = NULL;
        if (lock->l_export && lock->l_completion_ast)
                lock->l_completion_ast(lock, 0);
#endif
        EXIT;
        return 1;
}

void ldlm_lock_destroy(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        lock_res_and_lock(lock);
        first = ldlm_lock_destroy_internal(lock);
        unlock_res_and_lock(lock);

        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        first = ldlm_lock_destroy_internal(lock);
        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}
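
/*
 * Illustrative note (not in the original source): destroying a lock and
 * freeing it are separate steps.  A typical teardown, under assumed caller
 * conventions, looks like:
 *
 *      ldlm_lock_destroy(lock);        // unhash, mark l_destroyed, drop the
 *                                      // hash-table reference (first time)
 *      LDLM_LOCK_PUT(lock);            // drop the caller's own reference;
 *                                      // memory is freed on the final put
 */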

/* this is called by portals_handle2object with the handle lock taken */
static void lock_handle_addref(void *lock)
{
        LDLM_LOCK_GET((struct ldlm_lock *)lock);
}

static void lock_handle_free(void *lock, int size)
{
        LASSERT(size == sizeof(struct ldlm_lock));
        OBD_SLAB_FREE(lock, ldlm_lock_slab, size);
}

struct portals_handle_ops lock_handle_ops = {
        .hop_addref = lock_handle_addref,
        .hop_free   = lock_handle_free,
};

/*
 * Usage: pass in a resource on which you have done ldlm_resource_get();
 *        the new lock will take over the refcount.
 * Returns: lock with refcount 2 - one for the current caller and one for
 *          the remote side.
 */
static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
{
        struct ldlm_lock *lock;
        ENTRY;

        if (resource == NULL)
                LBUG();

        OBD_SLAB_ALLOC_PTR_GFP(lock, ldlm_lock_slab, CFS_ALLOC_IO);
        if (lock == NULL)
                RETURN(NULL);

        cfs_spin_lock_init(&lock->l_lock);
        lock->l_resource = resource;
        lu_ref_add(&resource->lr_reference, "lock", lock);

        cfs_atomic_set(&lock->l_refc, 2);
        CFS_INIT_LIST_HEAD(&lock->l_res_link);
        CFS_INIT_LIST_HEAD(&lock->l_lru);
        CFS_INIT_LIST_HEAD(&lock->l_pending_chain);
        CFS_INIT_LIST_HEAD(&lock->l_bl_ast);
        CFS_INIT_LIST_HEAD(&lock->l_cp_ast);
        CFS_INIT_LIST_HEAD(&lock->l_rk_ast);
        cfs_waitq_init(&lock->l_waitq);
        lock->l_blocking_lock = NULL;
        CFS_INIT_LIST_HEAD(&lock->l_sl_mode);
        CFS_INIT_LIST_HEAD(&lock->l_sl_policy);
        CFS_INIT_HLIST_NODE(&lock->l_exp_hash);

        lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
                             LDLM_NSS_LOCKS);
        CFS_INIT_LIST_HEAD(&lock->l_handle.h_link);
        class_handle_hash(&lock->l_handle, &lock_handle_ops);

        lu_ref_init(&lock->l_reference);
        lu_ref_add(&lock->l_reference, "hash", lock);
        lock->l_callback_timeout = 0;

#if LUSTRE_TRACKS_LOCK_EXP_REFS
        CFS_INIT_LIST_HEAD(&lock->l_exp_refs_link);
        lock->l_exp_refs_nr = 0;
        lock->l_exp_refs_target = NULL;
#endif
        CFS_INIT_LIST_HEAD(&lock->l_exp_list);

        RETURN(lock);
}
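
/*
 * Illustrative sketch (not in the original source): the refcount-2 contract
 * of ldlm_lock_new() as seen from a hypothetical caller.
 *
 *      res = ldlm_resource_get(ns, NULL, &res_id, type, 1);
 *      lock = ldlm_lock_new(res);      // consumes the resource reference;
 *                                      // lock->l_refc == 2 on success
 *      ...
 *      ldlm_lock_destroy(lock);        // drops the "hash" reference
 *      LDLM_LOCK_RELEASE(lock);        // drops the caller's reference
 */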

int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                              const struct ldlm_res_id *new_resid)
{
        struct ldlm_resource *oldres = lock->l_resource;
        struct ldlm_resource *newres;
        int type;
        ENTRY;

        LASSERT(ns_is_client(ns));

        lock_res_and_lock(lock);
        if (memcmp(new_resid, &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) == 0) {
                /* Nothing to do */
                unlock_res_and_lock(lock);
                RETURN(0);
        }

        LASSERT(new_resid->name[0] != 0);

        /* This function assumes that the lock isn't on any lists */
        LASSERT(cfs_list_empty(&lock->l_res_link));

        type = oldres->lr_type;
        unlock_res_and_lock(lock);

        newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
        if (newres == NULL)
                RETURN(-ENOMEM);

        lu_ref_add(&newres->lr_reference, "lock", lock);
        /*
         * To flip the lock from the old to the new resource, lock, oldres and
         * newres have to be locked. Resource spin-locks are nested within
         * lock->l_lock, and are taken in the memory address order to avoid
         * dead-locks.
         */
        cfs_spin_lock(&lock->l_lock);
        oldres = lock->l_resource;
        if (oldres < newres) {
                lock_res(oldres);
                lock_res_nested(newres, LRT_NEW);
        } else {
                lock_res(newres);
                lock_res_nested(oldres, LRT_NEW);
        }
        LASSERT(memcmp(new_resid, &oldres->lr_name,
                       sizeof oldres->lr_name) != 0);
        lock->l_resource = newres;
        unlock_res(oldres);
        unlock_res_and_lock(lock);

        /* ...and the flowers are still standing! */
        lu_ref_del(&oldres->lr_reference, "lock", lock);
        ldlm_resource_putref(oldres);

        RETURN(0);
}
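
/*
 * Illustrative sketch (not in the original source): the address-order trick
 * used above, in generic form.  Taking two spinlocks in a globally
 * consistent order (here, by memory address) makes AB/BA deadlock
 * impossible; the helper name is hypothetical.
 *
 *      static void lock_pair(cfs_spinlock_t *a, cfs_spinlock_t *b)
 *      {
 *              if (a < b) {
 *                      cfs_spin_lock(a);
 *                      cfs_spin_lock(b);       // nested, consistent order
 *              } else {
 *                      cfs_spin_lock(b);
 *                      cfs_spin_lock(a);
 *              }
 *      }
 */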

/*
 *  HANDLES
 */

void ldlm_lock2handle(const struct ldlm_lock *lock, struct lustre_handle *lockh)
{
        lockh->cookie = lock->l_handle.h_cookie;
}

/* If 'flags' is non-zero: atomically find the lock and set the flags;
 * return NULL if any of the flags is already set.
 */
struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
                                     int flags)
{
        struct ldlm_lock *lock;
        ENTRY;

        LASSERT(handle);

        lock = class_handle2object(handle->cookie);
        if (lock == NULL)
                RETURN(NULL);

        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if (flags == 0 && !lock->l_destroyed) {
                lu_ref_add(&lock->l_reference, "handle", cfs_current());
                RETURN(lock);
        }

        lock_res_and_lock(lock);

        LASSERT(lock->l_resource != NULL);

        lu_ref_add_atomic(&lock->l_reference, "handle", cfs_current());
        if (unlikely(lock->l_destroyed)) {
                unlock_res_and_lock(lock);
                CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
                LDLM_LOCK_PUT(lock);
                RETURN(NULL);
        }

        if (flags && (lock->l_flags & flags)) {
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
                RETURN(NULL);
        }

        if (flags)
                lock->l_flags |= flags;

        unlock_res_and_lock(lock);
        RETURN(lock);
}
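
/*
 * Illustrative sketch (not in the original source): resolving a handle and
 * atomically claiming a flag.  The specific flag and variable names here
 * are assumptions for the example.
 *
 *      struct ldlm_lock *lock;
 *
 *      lock = __ldlm_handle2lock(&lockh, LDLM_FL_CANCELING);
 *      if (lock == NULL)
 *              return;                 // stale handle, destroyed lock, or
 *                                      // someone else already set the flag
 *      ...                             // we won the race; do the work
 *      LDLM_LOCK_PUT(lock);
 */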

void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
{
        struct obd_export *exp = lock->l_export ?: lock->l_conn_export;
        /* INODEBITS_INTEROP: If the other side does not support
         * inodebits, reply with a plain lock descriptor.
         */
        if ((lock->l_resource->lr_type == LDLM_IBITS) &&
            (exp && !(exp->exp_connect_flags & OBD_CONNECT_IBITS))) {
                /* Make sure all the right bits are set in this lock we
                   are going to pass to client */
                LASSERTF(lock->l_policy_data.l_inodebits.bits ==
                         (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
                          MDS_INODELOCK_LAYOUT),
                         "Inappropriate inode lock bits during "
                         "conversion " LPU64 "\n",
                         lock->l_policy_data.l_inodebits.bits);

                ldlm_res2desc(lock->l_resource, &desc->l_resource);
                desc->l_resource.lr_type = LDLM_PLAIN;

                /* Convert "new" lock mode to something old client can
                   understand */
                if ((lock->l_req_mode == LCK_CR) ||
                    (lock->l_req_mode == LCK_CW))
                        desc->l_req_mode = LCK_PR;
                else
                        desc->l_req_mode = lock->l_req_mode;
                if ((lock->l_granted_mode == LCK_CR) ||
                    (lock->l_granted_mode == LCK_CW)) {
                        desc->l_granted_mode = LCK_PR;
                } else {
                        /* We never grant PW/EX locks to clients */
                        LASSERT((lock->l_granted_mode != LCK_PW) &&
                                (lock->l_granted_mode != LCK_EX));
                        desc->l_granted_mode = lock->l_granted_mode;
                }

                /* We do not copy policy here, because there is no
                   policy for plain locks */
        } else {
                ldlm_res2desc(lock->l_resource, &desc->l_resource);
                desc->l_req_mode = lock->l_req_mode;
                desc->l_granted_mode = lock->l_granted_mode;
                ldlm_convert_policy_to_wire(lock->l_resource->lr_type,
                                            &lock->l_policy_data,
                                            &desc->l_policy_data);
        }
}

void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                           cfs_list_t *work_list)
{
        if ((lock->l_flags & LDLM_FL_AST_SENT) == 0) {
                LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
                lock->l_flags |= LDLM_FL_AST_SENT;
                /* If the enqueuing client said so, tell the AST recipient to
                 * discard dirty data, rather than writing back. */
                if (new->l_flags & LDLM_AST_DISCARD_DATA)
                        lock->l_flags |= LDLM_FL_DISCARD_DATA;
                LASSERT(cfs_list_empty(&lock->l_bl_ast));
                cfs_list_add(&lock->l_bl_ast, work_list);
                LDLM_LOCK_GET(lock);
                LASSERT(lock->l_blocking_lock == NULL);
                lock->l_blocking_lock = LDLM_LOCK_GET(new);
        }
}

void ldlm_add_cp_work_item(struct ldlm_lock *lock, cfs_list_t *work_list)
{
        if ((lock->l_flags & LDLM_FL_CP_REQD) == 0) {
                lock->l_flags |= LDLM_FL_CP_REQD;
                LDLM_DEBUG(lock, "lock granted; sending completion AST.");
                LASSERT(cfs_list_empty(&lock->l_cp_ast));
                cfs_list_add(&lock->l_cp_ast, work_list);
                LDLM_LOCK_GET(lock);
        }
}

/* must be called with lr_lock held */
void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                            cfs_list_t *work_list)
{
        ENTRY;
        check_res_locked(lock->l_resource);
        if (new)
                ldlm_add_bl_work_item(lock, new, work_list);
        else
                ldlm_add_cp_work_item(lock, work_list);
        EXIT;
}

void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;

        lock = ldlm_handle2lock(lockh);
        LASSERT(lock != NULL);
        ldlm_lock_addref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}

void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        ldlm_lock_remove_from_lru(lock);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                lock->l_readers++;
                lu_ref_add_atomic(&lock->l_reference, "reader", lock);
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                lock->l_writers++;
                lu_ref_add_atomic(&lock->l_reference, "writer", lock);
        }
        LDLM_LOCK_GET(lock);
        lu_ref_add_atomic(&lock->l_reference, "user", lock);
        LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}

/**
 * Attempts to add reader/writer references to a lock; fails if the lock is
 * already marked LDLM_FL_CBPENDING (with no remaining users) or destroyed.
 *
 * \retval 0 success, lock was addref-ed
 *
 * \retval -EAGAIN lock is being canceled.
 */
int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;
        int               result;

        result = -EAGAIN;
        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                if (lock->l_readers != 0 || lock->l_writers != 0 ||
                    !(lock->l_flags & LDLM_FL_CBPENDING)) {
                        ldlm_lock_addref_internal_nolock(lock, mode);
                        result = 0;
                }
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return result;
}
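
/*
 * Illustrative sketch (not in the original source): a cache lookup that
 * only wants the lock if it is not already being cancelled.  Variable
 * names are hypothetical.
 *
 *      if (ldlm_lock_addref_try(&lockh, LCK_PR) == 0) {
 *              ...                             // safe to use cached data
 *              ldlm_lock_decref(&lockh, LCK_PR);
 *      } else {
 *              ...                             // -EAGAIN: re-enqueue instead
 *      }
 */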

/* only called for local locks */
void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
{
        lock_res_and_lock(lock);
        ldlm_lock_addref_internal_nolock(lock, mode);
        unlock_res_and_lock(lock);
}

/* Only called in ldlm_flock_destroy and for local locks.
 * For LDLM_FLOCK type locks, l_blocking_ast is NULL and
 * ldlm_lock_remove_from_lru() does nothing; it is safe
 * for ldlm_flock_destroy usage by dropping some code. */
void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                LASSERT(lock->l_readers > 0);
                lu_ref_del(&lock->l_reference, "reader", lock);
                lock->l_readers--;
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                LASSERT(lock->l_writers > 0);
                lu_ref_del(&lock->l_reference, "writer", lock);
                lock->l_writers--;
        }

        lu_ref_del(&lock->l_reference, "user", lock);
        LDLM_LOCK_RELEASE(lock);    /* matches the LDLM_LOCK_GET() in addref */
}

void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
{
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        ns = ldlm_lock_to_ns(lock);

        ldlm_lock_decref_internal_nolock(lock, mode);

        if (lock->l_flags & LDLM_FL_LOCAL &&
            !lock->l_readers && !lock->l_writers) {
                /* If this is a local lock on a server namespace and this was
                 * the last reference, cancel the lock. */
                CDEBUG(D_INFO, "forcing cancel of local lock\n");
                lock->l_flags |= LDLM_FL_CBPENDING;
        }

        if (!lock->l_readers && !lock->l_writers &&
            (lock->l_flags & LDLM_FL_CBPENDING)) {
                /* If we received a blocking AST and this was the last
                 * reference, run the callback. */
                if (lock->l_ns_srv && lock->l_export)
                        CERROR("FL_CBPENDING set on non-local lock--just a "
                               "warning\n");

                LDLM_DEBUG(lock, "final decref done on cbpending lock");

                LDLM_LOCK_GET(lock); /* dropped by bl thread */
                ldlm_lock_remove_from_lru(lock);
                unlock_res_and_lock(lock);

                if (lock->l_flags & LDLM_FL_FAIL_LOC)
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
                    ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
                        ldlm_handle_bl_callback(ns, NULL, lock);
        } else if (ns_is_client(ns) &&
                   !lock->l_readers && !lock->l_writers &&
                   !(lock->l_flags & LDLM_FL_NO_LRU) &&
                   !(lock->l_flags & LDLM_FL_BL_AST)) {

                LDLM_DEBUG(lock, "add lock into lru list");

                /* If this is a client-side namespace and this was the last
                 * reference, put it on the LRU. */
                ldlm_lock_add_to_lru(lock);
                unlock_res_and_lock(lock);

                if (lock->l_flags & LDLM_FL_FAIL_LOC)
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
                 * are not supported by the server, otherwise, it is done on
                 * enqueue. */
                if (!exp_connect_cancelset(lock->l_conn_export) &&
                    !ns_connect_lru_resize(ns))
                        ldlm_cancel_lru(ns, 0, LDLM_ASYNC, 0);
        } else {
                LDLM_DEBUG(lock, "do not add lock into lru list");
                unlock_res_and_lock(lock);
        }

        EXIT;
}

void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}

/* This will drop a lock reference and mark it for destruction, but will not
 * necessarily cancel the lock before returning. */
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        ENTRY;

        LASSERT(lock != NULL);

        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        lock_res_and_lock(lock);
        lock->l_flags |= LDLM_FL_CBPENDING;
        unlock_res_and_lock(lock);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
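
/*
 * Illustrative sketch (not in the original source): the usual pairing of
 * the addref/decref API from a hypothetical client-side caller.
 *
 *      ldlm_lock_addref(&lockh, LCK_PR);       // pin the lock while in use
 *      ...                                     // work under lock protection
 *      ldlm_lock_decref(&lockh, LCK_PR);       // unpin; the last decref may
 *                                              // move the lock onto the LRU
 *
 * ldlm_lock_decref_and_cancel() is the variant to use when the caller also
 * wants the lock cancelled once the last reference goes away.
 */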

struct sl_insert_point {
        cfs_list_t *res_link;
        cfs_list_t *mode_link;
        cfs_list_t *policy_link;
};

/*
 * search_granted_lock
 *
 * Description:
 *      Finds a position to insert the new lock.
 * Parameters:
 *      queue [input]:  the granted list where search acts on;
 *      req [input]:    the lock whose position to be located;
 *      prev [output]:  positions within 3 lists to insert @req to
 * Return Value:
 *      filled @prev
 * NOTE: called by
 *  - ldlm_grant_lock_with_skiplist
 */
static void search_granted_lock(cfs_list_t *queue,
                                struct ldlm_lock *req,
                                struct sl_insert_point *prev)
{
        cfs_list_t *tmp;
        struct ldlm_lock *lock, *mode_end, *policy_end;
        ENTRY;

        cfs_list_for_each(tmp, queue) {
                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                mode_end = cfs_list_entry(lock->l_sl_mode.prev,
                                          struct ldlm_lock, l_sl_mode);

                if (lock->l_req_mode != req->l_req_mode) {
                        /* jump to last lock of mode group */
                        tmp = &mode_end->l_res_link;
                        continue;
                }

                /* suitable mode group is found */
                if (lock->l_resource->lr_type == LDLM_PLAIN) {
                        /* insert point is last lock of the mode group */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else if (lock->l_resource->lr_type == LDLM_IBITS) {
                        for (;;) {
                                policy_end =
                                        cfs_list_entry(lock->l_sl_policy.prev,
                                                       struct ldlm_lock,
                                                       l_sl_policy);

                                if (lock->l_policy_data.l_inodebits.bits ==
                                    req->l_policy_data.l_inodebits.bits) {
                                        /* insert point is last lock of
                                         * the policy group */
                                        prev->res_link =
                                                &policy_end->l_res_link;
                                        prev->mode_link =
                                                &policy_end->l_sl_mode;
                                        prev->policy_link =
                                                &policy_end->l_sl_policy;
                                        EXIT;
                                        return;
                                }

                                if (policy_end == mode_end)
                                        /* done with mode group */
                                        break;

                                /* go to next policy group within mode group */
                                tmp = policy_end->l_res_link.next;
                                lock = cfs_list_entry(tmp, struct ldlm_lock,
                                                      l_res_link);
                        }  /* loop over policy groups within the mode group */

                        /* insert point is last lock of the mode group,
                         * new policy group is started */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else {
                        LDLM_ERROR(lock, "is not LDLM_PLAIN or LDLM_IBITS lock");
                        LBUG();
                }
        }

        /* insert point is last lock on the queue,
         * new mode group and new policy group are started */
        prev->res_link = queue->prev;
        prev->mode_link = &req->l_sl_mode;
        prev->policy_link = &req->l_sl_policy;
        EXIT;
        return;
}
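
/*
 * Illustrative note (not in the original source): the "skiplist" walked by
 * search_granted_lock() is the granted list partitioned into mode groups,
 * with each IBITS mode group further partitioned into policy (inodebits)
 * groups, e.g.:
 *
 *      [PR,bitsA] [PR,bitsA] [PR,bitsB] | [EX,bitsA] [EX,bitsC]
 *      '- policy group -'   '- group -' | '- group -' '- group -'
 *      '-------- PR mode group --------' '---- EX mode group ----'
 *
 * l_sl_mode links the first and last lock of each mode group, and
 * l_sl_policy the first and last lock of each policy group, so the search
 * can skip whole groups instead of visiting every lock on the queue.
 */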

static void ldlm_granted_list_add_lock(struct ldlm_lock *lock,
                                       struct sl_insert_point *prev)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        ldlm_resource_dump(D_INFO, res);
        LDLM_DEBUG(lock, "About to add lock:");

        if (lock->l_destroyed) {
                CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
                return;
        }

        LASSERT(cfs_list_empty(&lock->l_res_link));
        LASSERT(cfs_list_empty(&lock->l_sl_mode));
        LASSERT(cfs_list_empty(&lock->l_sl_policy));

        cfs_list_add(&lock->l_res_link, prev->res_link);
        cfs_list_add(&lock->l_sl_mode, prev->mode_link);
        cfs_list_add(&lock->l_sl_policy, prev->policy_link);

        EXIT;
}

static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
{
        struct sl_insert_point prev;
        ENTRY;

        LASSERT(lock->l_req_mode == lock->l_granted_mode);

        search_granted_lock(&lock->l_resource->lr_granted, lock, &prev);
        ldlm_granted_list_add_lock(lock, &prev);
        EXIT;
}

/* NOTE: called by
 *  - ldlm_lock_enqueue
 *  - ldlm_reprocess_queue
 *  - ldlm_lock_convert
 *
 * must be called with lr_lock held
 */
void ldlm_grant_lock(struct ldlm_lock *lock, cfs_list_t *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        lock->l_granted_mode = lock->l_req_mode;
        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
                ldlm_grant_lock_with_skiplist(lock);
        else if (res->lr_type == LDLM_EXTENT)
                ldlm_extent_add_lock(res, lock);
        else
                ldlm_resource_add_lock(res, &res->lr_granted, lock);

        if (lock->l_granted_mode < res->lr_most_restr)
                res->lr_most_restr = lock->l_granted_mode;

        if (work_list && lock->l_completion_ast != NULL)
                ldlm_add_ast_work_item(lock, NULL, work_list);

        ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
        EXIT;
}

/* Returns a referenced lock or NULL.  See the flag descriptions below, in the
 * comment above ldlm_lock_match. */
static struct ldlm_lock *search_queue(cfs_list_t *queue,
                                      ldlm_mode_t *mode,
                                      ldlm_policy_data_t *policy,
                                      struct ldlm_lock *old_lock,
                                      int flags, int unref)
{
        struct ldlm_lock *lock;
        cfs_list_t       *tmp;

        cfs_list_for_each(tmp, queue) {
                ldlm_mode_t match;

                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lock == old_lock)
                        break;

                /* llite sometimes wants to match locks that will be
                 * canceled when their users drop, but we allow it to match
                 * if it passes in CBPENDING and the lock still has users.
                 * this is generally only going to be used by children
                 * whose parents already hold a lock so forward progress
                 * can still happen. */
                if (lock->l_flags & LDLM_FL_CBPENDING &&
                    !(flags & LDLM_FL_CBPENDING))
                        continue;
                if (!unref && lock->l_flags & LDLM_FL_CBPENDING &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        continue;

                if (!(lock->l_req_mode & *mode))
                        continue;
                match = lock->l_req_mode;

                if (lock->l_resource->lr_type == LDLM_EXTENT &&
                    (lock->l_policy_data.l_extent.start >
                     policy->l_extent.start ||
                     lock->l_policy_data.l_extent.end < policy->l_extent.end))
                        continue;

                if (unlikely(match == LCK_GROUP) &&
                    lock->l_resource->lr_type == LDLM_EXTENT &&
                    lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                        continue;

                /* We match if we have an existing lock with the same or a
                   wider set of bits. */
                if (lock->l_resource->lr_type == LDLM_IBITS &&
                     ((lock->l_policy_data.l_inodebits.bits &
                      policy->l_inodebits.bits) !=
                      policy->l_inodebits.bits))
                        continue;

                if (!unref &&
                    (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
                     lock->l_failed))
                        continue;

                if ((flags & LDLM_FL_LOCAL_ONLY) &&
                    !(lock->l_flags & LDLM_FL_LOCAL))
                        continue;

                if (flags & LDLM_FL_TEST_LOCK) {
                        LDLM_LOCK_GET(lock);
                        ldlm_lock_touch_in_lru(lock);
                } else {
                        ldlm_lock_addref_internal_nolock(lock, match);
                }
                *mode = match;
                return lock;
        }

        return NULL;
}

void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
{
        if (!lock->l_failed) {
                lock->l_failed = 1;
                cfs_waitq_broadcast(&lock->l_waitq);
        }
}
EXPORT_SYMBOL(ldlm_lock_fail_match_locked);

void ldlm_lock_fail_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_fail_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_fail_match);

void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
{
        lock->l_flags |= LDLM_FL_LVB_READY;
        cfs_waitq_broadcast(&lock->l_waitq);
}

void ldlm_lock_allow_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_allow_match_locked(lock);
        unlock_res_and_lock(lock);
}
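
/*
 * Illustrative note (not in the original source): LDLM_FL_LVB_READY gates
 * matching.  A lock whose LVB has not yet arrived can still be found by
 * ldlm_lock_match(), but the matcher then sleeps on l_waitq until either
 * ldlm_lock_allow_match() sets LDLM_FL_LVB_READY or ldlm_lock_fail_match()
 * marks the lock failed, as the l_wait_event() in ldlm_lock_match() below
 * shows.
 */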

/* Can be called in two ways:
 *
 * If 'ns' is NULL, then lockh describes an existing lock that we want to look
 * for a duplicate of.
 *
 * Otherwise, all of the fields must be filled in, to match against.
 *
 * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
 *     server (ie, connh is NULL)
 * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
 *     list will be considered
 * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
 *     to be canceled can still be matched as long as they still have reader
 *     or writer references
 * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
 *     just tell us if we would have matched.
 *
 * Returns 1 if it finds an already-existing lock that is compatible; in this
 * case, lockh is filled in with an addref()ed lock.
 *
 * We also check the security context; if that fails we simply return 0 (to
 * keep caller code unchanged), and the context failure will be discovered by
 * the caller sometime later.
 */
ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
                            const struct ldlm_res_id *res_id, ldlm_type_t type,
                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
                            struct lustre_handle *lockh, int unref)
{
        struct ldlm_resource *res;
        struct ldlm_lock *lock, *old_lock = NULL;
        int rc = 0;
        ENTRY;

        if (ns == NULL) {
                old_lock = ldlm_handle2lock(lockh);
                LASSERT(old_lock);

                ns = ldlm_lock_to_ns(old_lock);
                res_id = &old_lock->l_resource->lr_name;
                type = old_lock->l_resource->lr_type;
                mode = old_lock->l_req_mode;
        }

        res = ldlm_resource_get(ns, NULL, res_id, type, 0);
        if (res == NULL) {
                LASSERT(old_lock == NULL);
                RETURN(0);
        }

        LDLM_RESOURCE_ADDREF(res);
        lock_res(res);

        lock = search_queue(&res->lr_granted, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        if (flags & LDLM_FL_BLOCK_GRANTED)
                GOTO(out, rc = 0);
        lock = search_queue(&res->lr_converting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        lock = search_queue(&res->lr_waiting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);

        EXIT;
 out:
        unlock_res(res);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);

        if (lock) {
                ldlm_lock2handle(lock, lockh);
                if ((flags & LDLM_FL_LVB_READY) &&
                    (!(lock->l_flags & LDLM_FL_LVB_READY))) {
                        struct l_wait_info lwi;
                        if (lock->l_completion_ast) {
                                int err = lock->l_completion_ast(lock,
                                                          LDLM_FL_WAIT_NOREPROC,
                                                                 NULL);
                                if (err) {
                                        if (flags & LDLM_FL_TEST_LOCK)
                                                LDLM_LOCK_RELEASE(lock);
                                        else
                                                ldlm_lock_decref_internal(lock,
                                                                          mode);
                                        rc = 0;
                                        goto out2;
                                }
                        }

                        lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
                                               NULL, LWI_ON_SIGNAL_NOOP, NULL);

                        /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
                        l_wait_event(lock->l_waitq,
                                     lock->l_flags & LDLM_FL_LVB_READY ||
                                     lock->l_failed,
                                     &lwi);
                        if (!(lock->l_flags & LDLM_FL_LVB_READY)) {
                                if (flags & LDLM_FL_TEST_LOCK)
                                        LDLM_LOCK_RELEASE(lock);
                                else
                                        ldlm_lock_decref_internal(lock, mode);
                                rc = 0;
                        }
                }
        }
 out2:
        if (rc) {
                LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[2] : policy->l_extent.start,
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[3] : policy->l_extent.end);

                /* check user's security context */
                if (lock->l_conn_export &&
                    sptlrpc_import_check_ctx(
                                class_exp2cliimp(lock->l_conn_export))) {
                        if (!(flags & LDLM_FL_TEST_LOCK))
                                ldlm_lock_decref_internal(lock, mode);
                        rc = 0;
                }

                if (flags & LDLM_FL_TEST_LOCK)
                        LDLM_LOCK_RELEASE(lock);

        } else if (!(flags & LDLM_FL_TEST_LOCK)) {/*less verbose for test-only*/
                LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
                                  LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
                                  type, mode, res_id->name[0], res_id->name[1],
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[2] : policy->l_extent.start,
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[3] : policy->l_extent.end);
        }
        if (old_lock)
                LDLM_LOCK_PUT(old_lock);

        return rc ? mode : 0;
}
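
/*
 * Illustrative sketch (not in the original source): a client probing for a
 * cached extent lock before enqueuing a new one.  Variable names are
 * hypothetical.
 *
 *      ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
 *      struct lustre_handle lockh;
 *      ldlm_mode_t mode;
 *
 *      mode = ldlm_lock_match(ns, LDLM_FL_LVB_READY, &res_id, LDLM_EXTENT,
 *                             &policy, LCK_PR | LCK_PW, &lockh, 0);
 *      if (mode != 0) {
 *              ...                             // reuse the matched lock
 *              ldlm_lock_decref(&lockh, mode); // drop the match reference
 *      }
 */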

ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh,
                                        __u64 *bits)
{
        struct ldlm_lock *lock;
        ldlm_mode_t mode = 0;
        ENTRY;

        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
                    lock->l_failed)
                        GOTO(out, mode);

                if (lock->l_flags & LDLM_FL_CBPENDING &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        GOTO(out, mode);

                if (bits)
                        *bits = lock->l_policy_data.l_inodebits.bits;
                mode = lock->l_granted_mode;
                ldlm_lock_addref_internal_nolock(lock, mode);
        }

        EXIT;

out:
        if (lock != NULL) {
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return mode;
}
EXPORT_SYMBOL(ldlm_revalidate_lock_handle);

/* Returns a referenced lock */
struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
                                   const struct ldlm_res_id *res_id,
                                   ldlm_type_t type,
                                   ldlm_mode_t mode,
                                   const struct ldlm_callback_suite *cbs,
                                   void *data, __u32 lvb_len)
{
        struct ldlm_lock *lock;
        struct ldlm_resource *res;
        ENTRY;

        res = ldlm_resource_get(ns, NULL, res_id, type, 1);
        if (res == NULL)
                RETURN(NULL);

        lock = ldlm_lock_new(res);

        if (lock == NULL)
                RETURN(NULL);

        lock->l_req_mode = mode;
        lock->l_ast_data = data;
        lock->l_pid = cfs_curproc_pid();
        lock->l_ns_srv = !!ns_is_server(ns);
        if (cbs) {
                lock->l_blocking_ast = cbs->lcs_blocking;
                lock->l_completion_ast = cbs->lcs_completion;
                lock->l_glimpse_ast = cbs->lcs_glimpse;
                lock->l_weigh_ast = cbs->lcs_weigh;
        }

        lock->l_tree_node = NULL;
        /* if this is an extent lock, allocate the interval tree node */
        if (type == LDLM_EXTENT) {
                if (ldlm_interval_alloc(lock) == NULL)
                        GOTO(out, 0);
        }

        if (lvb_len) {
                lock->l_lvb_len = lvb_len;
                OBD_ALLOC(lock->l_lvb_data, lvb_len);
                if (lock->l_lvb_data == NULL)
                        GOTO(out, 0);
        }

        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK))
                GOTO(out, 0);

        RETURN(lock);

out:
        ldlm_lock_destroy(lock);
        LDLM_LOCK_RELEASE(lock);
        return NULL;
}
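
/*
 * Illustrative sketch (not in the original source): creating a lock with a
 * callback suite.  The callback functions are hypothetical placeholders.
 *
 *      struct ldlm_callback_suite cbs = {
 *              .lcs_completion = my_completion_ast,
 *              .lcs_blocking   = my_blocking_ast,
 *              .lcs_glimpse    = NULL,
 *      };
 *      struct ldlm_lock *lock;
 *
 *      lock = ldlm_lock_create(ns, &res_id, LDLM_IBITS, LCK_EX, &cbs,
 *                              NULL, 0);
 *      if (lock == NULL)
 *              return -ENOMEM;
 *      ...                             // typically followed by
 *                                      // ldlm_lock_enqueue() below
 */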

ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                               struct ldlm_lock **lockp,
                               void *cookie, int *flags)
{
        struct ldlm_lock *lock = *lockp;
        struct ldlm_resource *res = lock->l_resource;
        int local = ns_is_client(ldlm_res_to_ns(res));
#ifdef HAVE_SERVER_SUPPORT
        ldlm_processing_policy policy;
#endif
        ldlm_error_t rc = ELDLM_OK;
        struct ldlm_interval *node = NULL;
        ENTRY;

        lock->l_last_activity = cfs_time_current_sec();
        /* policies are not executed on the client or during replay */
        if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
            && !local && ns->ns_policy) {
                rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
                                   NULL);
                if (rc == ELDLM_LOCK_REPLACED) {
                        /* The lock that was returned has already been granted,
                         * and placed into lockp.  If it's not the same as the
                         * one we passed in, then destroy the old one and our
                         * work here is done. */
                        if (lock != *lockp) {
                                ldlm_lock_destroy(lock);
                                LDLM_LOCK_RELEASE(lock);
                        }
                        *flags |= LDLM_FL_LOCK_CHANGED;
                        RETURN(0);
                } else if (rc != ELDLM_OK ||
                           (rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
                        ldlm_lock_destroy(lock);
                        RETURN(rc);
                }
        }

        /* A replaying lock might already be in the granted list, so
         * unlinking the lock would cause its interval node to be freed.
         * We have to allocate the interval node early, otherwise we can't
         * regrant this lock in the future. - jay */
1402         if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
1403                 OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
1404
1405         lock_res_and_lock(lock);
1406         if (local && lock->l_req_mode == lock->l_granted_mode) {
1407                 /* The server returned a blocked lock, but it was granted
1408                  * before we got a chance to actually enqueue it.  We don't
1409                  * need to do anything else. */
1410                 *flags &= ~(LDLM_FL_BLOCK_GRANTED |
1411                             LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
1412                 GOTO(out, ELDLM_OK);
1413         }
1414
1415         ldlm_resource_unlink_lock(lock);
1416         if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
1417                 if (node == NULL) {
1418                         ldlm_lock_destroy_nolock(lock);
1419                         GOTO(out, rc = -ENOMEM);
1420                 }
1421
1422                 CFS_INIT_LIST_HEAD(&node->li_group);
1423                 ldlm_interval_attach(node, lock);
1424                 node = NULL;
1425         }
1426
1427         /* Some flags from the enqueue want to make it into the AST, via the
1428          * lock's l_flags. */
1429         lock->l_flags |= *flags & LDLM_AST_DISCARD_DATA;
1430
1431         /* This distinction between local lock trees is very important; a client
1432          * namespace only has information about locks taken by that client, and
1433          * thus doesn't have enough information to decide for itself if it can
1434          * be granted (below).  In this case, we do exactly what the server
1435          * tells us to do, as dictated by the 'flags'.
1436          *
1437          * We do exactly the same thing during recovery, when the server is
1438          * more or less trusting the clients not to lie.
1439          *
1440          * FIXME (bug 268): Detect obvious lies by checking compatibility in
1441          * granted/converting queues. */
1442         if (local) {
1443                 if (*flags & LDLM_FL_BLOCK_CONV)
1444                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1445                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
1446                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1447                 else
1448                         ldlm_grant_lock(lock, NULL);
1449                 GOTO(out, ELDLM_OK);
1450 #ifdef HAVE_SERVER_SUPPORT
1451         } else if (*flags & LDLM_FL_REPLAY) {
1452                 if (*flags & LDLM_FL_BLOCK_CONV) {
1453                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1454                         GOTO(out, ELDLM_OK);
1455                 } else if (*flags & LDLM_FL_BLOCK_WAIT) {
1456                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1457                         GOTO(out, ELDLM_OK);
1458                 } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
1459                         ldlm_grant_lock(lock, NULL);
1460                         GOTO(out, ELDLM_OK);
1461                 }
1462                 /* If no flags, fall through to normal enqueue path. */
1463         }
1464
1465         policy = ldlm_processing_policy_table[res->lr_type];
1466         policy(lock, flags, 1, &rc, NULL);
1467         GOTO(out, rc);
1468 #else
1469         } else {
1470                 CERROR("This is a client-side-only module, cannot handle "
1471                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
1472                 LBUG();
1473         }
1474 #endif
1475
1476 out:
1477         unlock_res_and_lock(lock);
1478         if (node)
1479                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
1480         return rc;
1481 }
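
/*
 * A minimal caller sketch for the enqueue contract above; illustrative
 * only, with use_replacement_lock() a hypothetical helper and the
 * declarations of ns/lock/cookie/flags assumed from the caller.  The key
 * point: when the intent policy replaces the lock, the granted
 * replacement is placed into *lockp, so the caller must continue with it
 * and can detect the swap via LDLM_FL_LOCK_CHANGED:
 *
 *      rc = ldlm_lock_enqueue(ns, &lock, cookie, &flags);
 *      if (rc == ELDLM_OK && (flags & LDLM_FL_LOCK_CHANGED))
 *              use_replacement_lock(lock);
 */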
1482
1483 #ifdef HAVE_SERVER_SUPPORT
1484 /* Must be called with the resource lock held; queue is the waiting or converting list. */
1485 int ldlm_reprocess_queue(struct ldlm_resource *res, cfs_list_t *queue,
1486                          cfs_list_t *work_list)
1487 {
1488         cfs_list_t *tmp, *pos;
1489         ldlm_processing_policy policy;
1490         int flags;
1491         int rc = LDLM_ITER_CONTINUE;
1492         ldlm_error_t err;
1493         ENTRY;
1494
1495         check_res_locked(res);
1496
1497         policy = ldlm_processing_policy_table[res->lr_type];
1498         LASSERT(policy);
1499
1500         cfs_list_for_each_safe(tmp, pos, queue) {
1501                 struct ldlm_lock *pending;
1502                 pending = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
1503
1504                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
1505
1506                 flags = 0;
1507                 rc = policy(pending, &flags, 0, &err, work_list);
1508                 if (rc != LDLM_ITER_CONTINUE)
1509                         break;
1510         }
1511
1512         RETURN(rc);
1513 }
1514 #endif
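
/*
 * For reference, a sketch of the processing-policy contract used both
 * here and in ldlm_lock_enqueue() above; the shape is inferred from the
 * call sites in this file, not a new definition.  A policy is invoked
 * with first_enq = 1 from the enqueue path and first_enq = 0 when
 * reprocessing, may grant the lock or queue ASTs on work_list, and
 * returns LDLM_ITER_CONTINUE or LDLM_ITER_STOP:
 *
 *      int (*policy)(struct ldlm_lock *lock, int *flags, int first_enq,
 *                    ldlm_error_t *err, cfs_list_t *work_list);
 */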
1515
1516 static int
1517 ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1518 {
1519         struct ldlm_cb_set_arg *arg = opaq;
1520         struct ldlm_lock_desc   d;
1521         int                     rc;
1522         struct ldlm_lock       *lock;
1523         ENTRY;
1524
1525         if (cfs_list_empty(arg->list))
1526                 RETURN(-ENOENT);
1527
1528         lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);
1529
1530         /* nobody should touch l_bl_ast */
1531         lock_res_and_lock(lock);
1532         cfs_list_del_init(&lock->l_bl_ast);
1533
1534         LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
1535         LASSERT(lock->l_bl_ast_run == 0);
1536         LASSERT(lock->l_blocking_lock);
1537         lock->l_bl_ast_run++;
1538         unlock_res_and_lock(lock);
1539
1540         ldlm_lock2desc(lock->l_blocking_lock, &d);
1541
1542         rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
1543         LDLM_LOCK_RELEASE(lock->l_blocking_lock);
1544         lock->l_blocking_lock = NULL;
1545         LDLM_LOCK_RELEASE(lock);
1546
1547         RETURN(rc);
1548 }
1549
1550 static int
1551 ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1552 {
1553         struct ldlm_cb_set_arg  *arg = opaq;
1554         int                      rc = 0;
1555         struct ldlm_lock        *lock;
1556         ldlm_completion_callback completion_callback;
1557         ENTRY;
1558
1559         if (cfs_list_empty(arg->list))
1560                 RETURN(-ENOENT);
1561
1562         lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
1563
1564         /* It's possible to receive a completion AST before we've set
1565          * the l_completion_ast pointer: either because the AST arrived
1566          * before the reply, or simply because there's a small race
1567          * window between receiving the reply and finishing the local
1568          * enqueue. (bug 842)
1569          *
1570          * This can't happen with the blocking_ast, however, because we
1571          * will never call the local blocking_ast until we drop our
1572          * reader/writer reference, which we won't do until we get the
1573          * reply and finish enqueueing. */
1574
1575         /* nobody should touch l_cp_ast */
1576         lock_res_and_lock(lock);
1577         cfs_list_del_init(&lock->l_cp_ast);
1578         LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
1579         /* save l_completion_ast since it can be changed by
1580          * mds_intent_policy(), see bug 14225 */
1581         completion_callback = lock->l_completion_ast;
1582         lock->l_flags &= ~LDLM_FL_CP_REQD;
1583         unlock_res_and_lock(lock);
1584
1585         if (completion_callback != NULL)
1586                 rc = completion_callback(lock, 0, (void *)arg);
1587         LDLM_LOCK_RELEASE(lock);
1588
1589         RETURN(rc);
1590 }
1591
1592 static int
1593 ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1594 {
1595         struct ldlm_cb_set_arg *arg = opaq;
1596         struct ldlm_lock_desc   desc;
1597         int                     rc;
1598         struct ldlm_lock       *lock;
1599         ENTRY;
1600
1601         if (cfs_list_empty(arg->list))
1602                 RETURN(-ENOENT);
1603
1604         lock = cfs_list_entry(arg->list->next, struct ldlm_lock, l_rk_ast);
1605         cfs_list_del_init(&lock->l_rk_ast);
1606
1607         /* The descriptor just pretends the lock is exclusive. */
1608         ldlm_lock2desc(lock, &desc);
1609         desc.l_req_mode = LCK_EX;
1610         desc.l_granted_mode = 0;
1611
1612         rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
1613         LDLM_LOCK_RELEASE(lock);
1614
1615         RETURN(rc);
1616 }
1617
1618 int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
1619                       ldlm_desc_ast_t ast_type)
1620 {
1621         struct ldlm_cb_set_arg *arg;
1622         set_producer_func       work_ast_lock;
1623         int                     rc;
1624
1625         if (cfs_list_empty(rpc_list))
1626                 RETURN(0);
1627
1628         OBD_ALLOC_PTR(arg);
1629         if (arg == NULL)
1630                 RETURN(-ENOMEM);
1631
1632         cfs_atomic_set(&arg->restart, 0);
1633         arg->list = rpc_list;
1634
1635         switch (ast_type) {
1636                 case LDLM_WORK_BL_AST:
1637                         arg->type = LDLM_BL_CALLBACK;
1638                         work_ast_lock = ldlm_work_bl_ast_lock;
1639                         break;
1640                 case LDLM_WORK_CP_AST:
1641                         arg->type = LDLM_CP_CALLBACK;
1642                         work_ast_lock = ldlm_work_cp_ast_lock;
1643                         break;
1644                 case LDLM_WORK_REVOKE_AST:
1645                         arg->type = LDLM_BL_CALLBACK;
1646                         work_ast_lock = ldlm_work_revoke_ast_lock;
1647                         break;
1648                 default:
1649                         LBUG();
1650         }
1651
1652         /* Create a ptlrpc request set with the flow control extension.
1653          * The set uses the work_ast_lock function to produce new requests
1654          * and sends a new request each time one completes, keeping the
1655          * number of requests in flight at ns_max_parallel_ast. */
1656         arg->set = ptlrpc_prep_fcset(ns->ns_max_parallel_ast ? : UINT_MAX,
1657                                      work_ast_lock, arg);
1658         if (arg->set == NULL)
1659                 GOTO(out, rc = -ENOMEM);
1660
1661         ptlrpc_set_wait(arg->set);
1662         ptlrpc_set_destroy(arg->set);
1663
1664         rc = cfs_atomic_read(&arg->restart) ? -ERESTART : 0;
1665         GOTO(out, rc);
1666 out:
1667         OBD_FREE_PTR(arg);
1668         return rc;
1669 }
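
/*
 * The three ldlm_work_*_lock() producers above all follow the same
 * flow-control contract: send at most one AST RPC per call and return
 * its status, or return -ENOENT once arg->list is drained so the set
 * stops asking for more work.  A minimal producer of that shape (a
 * sketch; take_next_lock() and send_ast() are hypothetical helpers):
 *
 *      static int example_producer(struct ptlrpc_request_set *rqset,
 *                                  void *opaq)
 *      {
 *              struct ldlm_cb_set_arg *arg = opaq;
 *
 *              if (cfs_list_empty(arg->list))
 *                      return -ENOENT;
 *              return send_ast(take_next_lock(arg->list));
 *      }
 */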
1670
1671 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
1672 {
1673         ldlm_reprocess_all(res);
1674         return LDLM_ITER_CONTINUE;
1675 }
1676
1677 static int ldlm_reprocess_res(cfs_hash_t *hs, cfs_hash_bd_t *bd,
1678                               cfs_hlist_node_t *hnode, void *arg)
1679 {
1680         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
1681         int    rc;
1682
1683         rc = reprocess_one_queue(res, arg);
1684
1685         return rc == LDLM_ITER_STOP;
1686 }
1687
1688 void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
1689 {
1690         ENTRY;
1691
1692         if (ns != NULL) {
1693                 cfs_hash_for_each_nolock(ns->ns_rs_hash,
1694                                          ldlm_reprocess_res, NULL);
1695         }
1696         EXIT;
1697 }
1698
1699 void ldlm_reprocess_all(struct ldlm_resource *res)
1700 {
1701         CFS_LIST_HEAD(rpc_list);
1702
1703 #ifdef HAVE_SERVER_SUPPORT
1704         int rc;
1705         ENTRY;
1706         /* Local lock trees don't get reprocessed. */
1707         if (ns_is_client(ldlm_res_to_ns(res))) {
1708                 EXIT;
1709                 return;
1710         }
1711
1712 restart:
1713         lock_res(res);
1714         rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
1715         if (rc == LDLM_ITER_CONTINUE)
1716                 ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
1717         unlock_res(res);
1718
1719         rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
1720                                LDLM_WORK_CP_AST);
1721         if (rc == -ERESTART) {
1722                 LASSERT(cfs_list_empty(&rpc_list));
1723                 goto restart;
1724         }
1725 #else
1726         ENTRY;
1727         if (!ns_is_client(ldlm_res_to_ns(res))) {
1728                 CERROR("This is a client-side-only module, cannot handle "
1729                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
1730                 LBUG();
1731         }
1732 #endif
1733         EXIT;
1734 }
1735
1736 void ldlm_cancel_callback(struct ldlm_lock *lock)
1737 {
1738         check_res_locked(lock->l_resource);
1739         if (!(lock->l_flags & LDLM_FL_CANCEL)) {
1740                 lock->l_flags |= LDLM_FL_CANCEL;
1741                 if (lock->l_blocking_ast) {
1742                         // l_check_no_ns_lock(ns);
1743                         unlock_res_and_lock(lock);
1744                         lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
1745                                              LDLM_CB_CANCELING);
1746                         lock_res_and_lock(lock);
1747                 } else {
1748                         LDLM_DEBUG(lock, "no blocking ast");
1749                 }
1750         }
1751         lock->l_flags |= LDLM_FL_BL_DONE;
1752 }
1753
1754 void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
1755 {
1756         if (req->l_resource->lr_type != LDLM_PLAIN &&
1757             req->l_resource->lr_type != LDLM_IBITS)
1758                 return;
1759
1760         cfs_list_del_init(&req->l_sl_policy);
1761         cfs_list_del_init(&req->l_sl_mode);
1762 }
1763
1764 void ldlm_lock_cancel(struct ldlm_lock *lock)
1765 {
1766         struct ldlm_resource *res;
1767         struct ldlm_namespace *ns;
1768         ENTRY;
1769
1770         lock_res_and_lock(lock);
1771
1772         res = lock->l_resource;
1773         ns  = ldlm_res_to_ns(res);
1774
1775         /* Please do not, no matter how tempting, remove this LBUG without
1776          * talking to me first. -phik */
1777         if (lock->l_readers || lock->l_writers) {
1778                 LDLM_ERROR(lock, "lock still has references");
1779                 LBUG();
1780         }
1781
1782         ldlm_del_waiting_lock(lock);
1783
1784         /* Run the cancel callback; it may drop and retake the res lock. */
1785         ldlm_cancel_callback(lock);
1786
1787         /* Yes, a second time: the lock may have been added back while
1788          * ldlm_cancel_callback ran without the res lock held. */
1789         ldlm_del_waiting_lock(lock);
1790         ldlm_resource_unlink_lock(lock);
1791         ldlm_lock_destroy_nolock(lock);
1792
1793         if (lock->l_granted_mode == lock->l_req_mode)
1794                 ldlm_pool_del(&ns->ns_pool, lock);
1795
1796         /* Reset lock->l_granted_mode to make sure we are not called again
1797          * for the same lock. */
1798         lock->l_granted_mode = LCK_MINMODE;
1799         unlock_res_and_lock(lock);
1800
1801         EXIT;
1802 }
1803
1804 int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
1805 {
1806         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1807         int rc = -EINVAL;
1808         ENTRY;
1809
1810         if (lock) {
1811                 if (lock->l_ast_data == NULL)
1812                         lock->l_ast_data = data;
1813                 if (lock->l_ast_data == data)
1814                         rc = 0;
1815                 LDLM_LOCK_PUT(lock);
1816         }
1817         RETURN(rc);
1818 }
1819 EXPORT_SYMBOL(ldlm_lock_set_data);
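
/*
 * Usage sketch for ldlm_lock_set_data(); "lockh" and "inode" stand in
 * for whatever object a caller attaches.  The call returns 0 both when
 * it installs the data and when the same data was already attached, so
 * it tolerates racing callers setting identical data; only a
 * conflicting value yields -EINVAL:
 *
 *      if (ldlm_lock_set_data(&lockh, inode) != 0)
 *              CERROR("lock already carries different ast data\n");
 */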
1820
1821 int ldlm_cancel_locks_for_export_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
1822                                     cfs_hlist_node_t *hnode, void *data)
1824 {
1825         struct obd_export    *exp  = data;
1826         struct ldlm_lock     *lock = cfs_hash_object(hs, hnode);
1827         struct ldlm_resource *res;
1828
1829         res = ldlm_resource_getref(lock->l_resource);
1830         LDLM_LOCK_GET(lock);
1831
1832         LDLM_DEBUG(lock, "export %p", exp);
1833         ldlm_res_lvbo_update(res, NULL, 1);
1834         ldlm_lock_cancel(lock);
1835         ldlm_reprocess_all(res);
1836         ldlm_resource_putref(res);
1837         LDLM_LOCK_RELEASE(lock);
1838         return 0;
1839 }
1840
1841 void ldlm_cancel_locks_for_export(struct obd_export *exp)
1842 {
1843         cfs_hash_for_each_empty(exp->exp_lock_hash,
1844                                 ldlm_cancel_locks_for_export_cb, exp);
1845 }
1846
1847 /**
1848  * Downgrade an exclusive lock.
1849  *
1850  * A fast variant of ldlm_lock_convert() for conversion of exclusive
1851  * locks.  The conversion is always successful.
1852  *
1853  * \param lock A lock to convert
1854  * \param new_mode new lock mode
1855  */
1856 void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode)
1857 {
1858         ENTRY;
1859
1860         LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
1861         LASSERT(new_mode == LCK_COS);
1862
1863         lock_res_and_lock(lock);
1864         ldlm_resource_unlink_lock(lock);
1865         /*
1866          * Remove the lock from pool as it will be added again in
1867          * ldlm_grant_lock() called below.
1868          */
1869         ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
1870
1871         lock->l_req_mode = new_mode;
1872         ldlm_grant_lock(lock, NULL);
1873         unlock_res_and_lock(lock);
1874         ldlm_reprocess_all(lock->l_resource);
1875
1876         EXIT;
1877 }
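
/*
 * Usage sketch: per the assertions above, the only supported downgrade
 * is from a granted PW or EX lock to LCK_COS, and it cannot fail:
 *
 *      if (lock->l_granted_mode == LCK_PW)
 *              ldlm_lock_downgrade(lock, LCK_COS);
 */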
1878
1879 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
1880                                         __u32 *flags)
1881 {
1882         CFS_LIST_HEAD(rpc_list);
1883         struct ldlm_resource *res;
1884         struct ldlm_namespace *ns;
1885         int granted = 0;
1886         int old_mode;
1887         struct sl_insert_point prev;
1888         struct ldlm_interval *node;
1889         ENTRY;
1890
1891         if (new_mode == lock->l_granted_mode) { /* No changes? Just return. */
1892                 *flags |= LDLM_FL_BLOCK_GRANTED;
1893                 RETURN(lock->l_resource);
1894         }
1895
1896         /* We cannot check the lock type here because the lock's bit-lock
1897          * is not held, so do the allocation blindly. -jay */
1898         OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
1899         if (node == NULL)  /* Actually, this causes EDEADLOCK to be returned */
1900                 RETURN(NULL);
1901
1902         LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR),
1903                  "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);
1904
1905         lock_res_and_lock(lock);
1906
1907         res = lock->l_resource;
1908         ns  = ldlm_res_to_ns(res);
1909
1910         old_mode = lock->l_req_mode;
1911         lock->l_req_mode = new_mode;
1912         if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
1913                 /* Remember the position where the lock might be added
1914                  * back to the granted list later, and also remember the
1915                  * join mode for skiplist fixing. */
1916                 prev.res_link = lock->l_res_link.prev;
1917                 prev.mode_link = lock->l_sl_mode.prev;
1918                 prev.policy_link = lock->l_sl_policy.prev;
1919                 ldlm_resource_unlink_lock(lock);
1920         } else {
1921                 ldlm_resource_unlink_lock(lock);
1922                 if (res->lr_type == LDLM_EXTENT) {
1923                         /* FIXME: ugly code; we have to attach the lock to an
1924                          * interval node again since it may be granted
1925                          * soon. */
1926                         CFS_INIT_LIST_HEAD(&node->li_group);
1927                         ldlm_interval_attach(node, lock);
1928                         node = NULL;
1929                 }
1930         }
1931
1932         /*
1933          * Remove old lock from the pool before adding the lock with new
1934          * mode below in ->policy()
1935          */
1936         ldlm_pool_del(&ns->ns_pool, lock);
1937
1938         /* If this is a local resource, put it on the appropriate list. */
1939         if (ns_is_client(ldlm_res_to_ns(res))) {
1940                 if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
1941                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1942                 } else {
1943                         /* This should never happen, because of the way the
1944                          * server handles conversions. */
1945                         LDLM_ERROR(lock, "Erroneous flags %d on local lock\n",
1946                                    *flags);
1947                         LBUG();
1948
1949                         ldlm_grant_lock(lock, &rpc_list);
1950                         granted = 1;
1951                         /* FIXME: completion handling should not run with lr_lock held! */
1952                         if (lock->l_completion_ast)
1953                                 lock->l_completion_ast(lock, 0, NULL);
1954                 }
1955 #ifdef HAVE_SERVER_SUPPORT
1956         } else {
1957                 int rc;
1958                 ldlm_error_t err;
1959                 int pflags = 0;
1960                 ldlm_processing_policy policy;
1961                 policy = ldlm_processing_policy_table[res->lr_type];
1962                 rc = policy(lock, &pflags, 0, &err, &rpc_list);
1963                 if (rc == LDLM_ITER_STOP) {
1964                         lock->l_req_mode = old_mode;
1965                         if (res->lr_type == LDLM_EXTENT)
1966                                 ldlm_extent_add_lock(res, lock);
1967                         else
1968                                 ldlm_granted_list_add_lock(lock, &prev);
1969
1970                         res = NULL;
1971                 } else {
1972                         *flags |= LDLM_FL_BLOCK_GRANTED;
1973                         granted = 1;
1974                 }
1975         }
1976 #else
1977         } else {
1978                 CERROR("This is a client-side-only module, cannot handle "
1979                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
1980                 LBUG();
1981         }
1982 #endif
1983         unlock_res_and_lock(lock);
1984
1985         if (granted)
1986                 ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
1987         if (node)
1988                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
1989         RETURN(res);
1990 }
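
/*
 * Caller sketch for ldlm_lock_convert(): a NULL result corresponds to
 * the interval-node allocation failure noted above and is surfaced to
 * callers as EDEADLOCK (error handling will vary by call site; this
 * shows only the shape):
 *
 *      res = ldlm_lock_convert(lock, new_mode, &flags);
 *      if (res == NULL)
 *              RETURN(-EDEADLOCK);
 */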
1991
1992 void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
1993 {
1994         struct ldlm_lock *lock;
1995
1996         if (!((libcfs_debug | D_ERROR) & level))
1997                 return;
1998
1999         lock = ldlm_handle2lock(lockh);
2000         if (lock == NULL)
2001                 return;
2002
2003         LDLM_DEBUG_LIMIT(level, lock, "###");
2004
2005         LDLM_LOCK_PUT(lock);
2006 }
2007
2008 void _ldlm_lock_debug(struct ldlm_lock *lock,
2009                       struct libcfs_debug_msg_data *msgdata,
2010                       const char *fmt, ...)
2011 {
2012         va_list args;
2013         struct obd_export *exp = lock->l_export;
2014         struct ldlm_resource *resource = lock->l_resource;
2015         char *nid = "local";
2016
2017         va_start(args, fmt);
2018
2019         if (exp && exp->exp_connection) {
2020                 nid = libcfs_nid2str(exp->exp_connection->c_peer.nid);
2021         } else if (exp && exp->exp_obd != NULL) {
2022                 struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
2023                 nid = libcfs_nid2str(imp->imp_connection->c_peer.nid);
2024         }
2025
2026         if (resource == NULL) {
2027                 libcfs_debug_vmsg2(msgdata, fmt, args,
2028                        " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2029                        "res: \?\? rrc=\?\? type: \?\?\? flags: "LPX64" nid: %s "
2030                        "remote: "LPX64" expref: %d pid: %u timeout: %lu\n",
2031                        lock,
2032                        lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
2033                        lock->l_readers, lock->l_writers,
2034                        ldlm_lockname[lock->l_granted_mode],
2035                        ldlm_lockname[lock->l_req_mode],
2036                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2037                        exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
2038                        lock->l_pid, lock->l_callback_timeout);
2039                 va_end(args);
2040                 return;
2041         }
2042
2043         switch (resource->lr_type) {
2044         case LDLM_EXTENT:
2045                 libcfs_debug_vmsg2(msgdata, fmt, args,
2046                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2047                        "res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64
2048                        "] (req "LPU64"->"LPU64") flags: "LPX64" nid: %s remote:"
2049                        " "LPX64" expref: %d pid: %u timeout %lu\n",
2050                        ldlm_lock_to_ns_name(lock), lock,
2051                        lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
2052                        lock->l_readers, lock->l_writers,
2053                        ldlm_lockname[lock->l_granted_mode],
2054                        ldlm_lockname[lock->l_req_mode],
2055                        resource->lr_name.name[0],
2056                        resource->lr_name.name[1],
2057                        cfs_atomic_read(&resource->lr_refcount),
2058                        ldlm_typename[resource->lr_type],
2059                        lock->l_policy_data.l_extent.start,
2060                        lock->l_policy_data.l_extent.end,
2061                        lock->l_req_extent.start, lock->l_req_extent.end,
2062                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2063                        exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
2064                        lock->l_pid, lock->l_callback_timeout);
2065                 break;
2066
2067         case LDLM_FLOCK:
2068                 libcfs_debug_vmsg2(msgdata, fmt, args,
2069                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2070                        "res: "LPU64"/"LPU64" rrc: %d type: %s pid: %d "
2071                        "["LPU64"->"LPU64"] flags: "LPX64" nid: %s remote: "LPX64
2072                        " expref: %d pid: %u timeout: %lu\n",
2073                        ldlm_lock_to_ns_name(lock), lock,
2074                        lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
2075                        lock->l_readers, lock->l_writers,
2076                        ldlm_lockname[lock->l_granted_mode],
2077                        ldlm_lockname[lock->l_req_mode],
2078                        resource->lr_name.name[0],
2079                        resource->lr_name.name[1],
2080                        cfs_atomic_read(&resource->lr_refcount),
2081                        ldlm_typename[resource->lr_type],
2082                        lock->l_policy_data.l_flock.pid,
2083                        lock->l_policy_data.l_flock.start,
2084                        lock->l_policy_data.l_flock.end,
2085                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2086                        exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
2087                        lock->l_pid, lock->l_callback_timeout);
2088                 break;
2089
2090         case LDLM_IBITS:
2091                 libcfs_debug_vmsg2(msgdata, fmt, args,
2092                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2093                        "res: "LPU64"/"LPU64" bits "LPX64" rrc: %d type: %s "
2094                        "flags: "LPX64" nid: %s remote: "LPX64" expref: %d "
2095                        "pid: %u timeout: %lu\n",
2096                        ldlm_lock_to_ns_name(lock),
2097                        lock, lock->l_handle.h_cookie,
2098                        cfs_atomic_read (&lock->l_refc),
2099                        lock->l_readers, lock->l_writers,
2100                        ldlm_lockname[lock->l_granted_mode],
2101                        ldlm_lockname[lock->l_req_mode],
2102                        resource->lr_name.name[0],
2103                        resource->lr_name.name[1],
2104                        lock->l_policy_data.l_inodebits.bits,
2105                        cfs_atomic_read(&resource->lr_refcount),
2106                        ldlm_typename[resource->lr_type],
2107                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2108                        exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
2109                        lock->l_pid, lock->l_callback_timeout);
2110                 break;
2111
2112         default:
2113                 libcfs_debug_vmsg2(msgdata, fmt, args,
2114                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2115                        "res: "LPU64"/"LPU64" rrc: %d type: %s flags: "LPX64" "
2116                        "nid: %s remote: "LPX64" expref: %d pid: %u timeout %lu"
2117                        "\n",
2118                        ldlm_lock_to_ns_name(lock),
2119                        lock, lock->l_handle.h_cookie,
2120                        cfs_atomic_read (&lock->l_refc),
2121                        lock->l_readers, lock->l_writers,
2122                        ldlm_lockname[lock->l_granted_mode],
2123                        ldlm_lockname[lock->l_req_mode],
2124                        resource->lr_name.name[0],
2125                        resource->lr_name.name[1],
2126                        cfs_atomic_read(&resource->lr_refcount),
2127                        ldlm_typename[resource->lr_type],
2128                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2129                        exp ? cfs_atomic_read(&exp->exp_refcount) : -99,
2130                        lock->l_pid, lock->l_callback_timeout);
2131                 break;
2132         }
2133         va_end(args);
2134 }
2135 EXPORT_SYMBOL(_ldlm_lock_debug);
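
/*
 * _ldlm_lock_debug() is normally reached through the LDLM_DEBUG() and
 * LDLM_ERROR() macros used earlier in this file, which prepend the full
 * lock and resource description to the caller's printf-style message,
 * e.g. ("retries" being a hypothetical variable):
 *
 *      LDLM_DEBUG(lock, "reprocessed after %d retries", retries);
 */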