/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011 Whamcloud, Inc.
 *
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_lock.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
# include <libcfs/libcfs.h>
# include <linux/lustre_intent.h>
#else
# include <liblustre.h>
#endif

#include <obd_class.h>
#include "ldlm_internal.h"

/* lock types */
char *ldlm_lockname[] = {
        [0] = "--",
        [LCK_EX] = "EX",
        [LCK_PW] = "PW",
        [LCK_PR] = "PR",
        [LCK_CW] = "CW",
        [LCK_CR] = "CR",
        [LCK_NL] = "NL",
        [LCK_GROUP] = "GROUP",
        [LCK_COS] = "COS"
};

char *ldlm_typename[] = {
        [LDLM_PLAIN] = "PLN",
        [LDLM_EXTENT] = "EXT",
        [LDLM_FLOCK] = "FLK",
        [LDLM_IBITS] = "IBT",
};

static ldlm_policy_wire_to_local_t ldlm_policy_wire18_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE] = ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE] = ldlm_flock_policy_wire18_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE] = ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_wire_to_local_t ldlm_policy_wire21_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE] = ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE] = ldlm_flock_policy_wire21_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE] = ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_local_to_wire_t ldlm_policy_local_to_wire[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE] = ldlm_plain_policy_local_to_wire,
        [LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_local_to_wire,
        [LDLM_FLOCK - LDLM_MIN_TYPE] = ldlm_flock_policy_local_to_wire,
        [LDLM_IBITS - LDLM_MIN_TYPE] = ldlm_ibits_policy_local_to_wire,
};

/**
 * Converts lock policy from local format to on the wire lock_desc format
 */
void ldlm_convert_policy_to_wire(ldlm_type_t type,
                                 const ldlm_policy_data_t *lpolicy,
                                 ldlm_wire_policy_data_t *wpolicy)
{
        ldlm_policy_local_to_wire_t convert;

        convert = ldlm_policy_local_to_wire[type - LDLM_MIN_TYPE];

        convert(lpolicy, wpolicy);
}

/**
 * Converts lock policy from on the wire lock_desc format to local format
 */
void ldlm_convert_policy_to_local(struct obd_export *exp, ldlm_type_t type,
                                  const ldlm_wire_policy_data_t *wpolicy,
                                  ldlm_policy_data_t *lpolicy)
{
        ldlm_policy_wire_to_local_t convert;
        int new_client;

        /* some badness for 2.0.0 clients, but 2.0.0 isn't supported */
        new_client = (exp->exp_connect_flags & OBD_CONNECT_FULL20) != 0;
        if (new_client)
                convert = ldlm_policy_wire21_to_local[type - LDLM_MIN_TYPE];
        else
                convert = ldlm_policy_wire18_to_local[type - LDLM_MIN_TYPE];

        convert(wpolicy, lpolicy);
}

char *ldlm_it2str(int it)
{
        switch (it) {
        case IT_OPEN:
                return "open";
        case IT_CREAT:
                return "creat";
        case (IT_OPEN | IT_CREAT):
                return "open|creat";
        case IT_READDIR:
                return "readdir";
        case IT_GETATTR:
                return "getattr";
        case IT_LOOKUP:
                return "lookup";
        case IT_UNLINK:
                return "unlink";
        case IT_GETXATTR:
                return "getxattr";
        default:
                CERROR("Unknown intent %d\n", it);
                return "UNKNOWN";
        }
}

extern cfs_mem_cache_t *ldlm_lock_slab;

static ldlm_processing_policy ldlm_processing_policy_table[] = {
        [LDLM_PLAIN] = ldlm_process_plain_lock,
        [LDLM_EXTENT] = ldlm_process_extent_lock,
#ifdef __KERNEL__
        [LDLM_FLOCK] = ldlm_process_flock_lock,
#endif
        [LDLM_IBITS] = ldlm_process_inodebits_lock,
};

ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
{
        return ldlm_processing_policy_table[res->lr_type];
}

void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
{
        ns->ns_policy = arg;
}

/*
 * REFCOUNTED LOCK OBJECTS
 */


/*
 * Lock refcounts, during creation:
 *   - one special one for allocation, dec'd only once in destroy
 *   - one for being a lock that's in-use
 *   - one for the addref associated with a new lock
 */
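/*
 * An illustrative lifecycle (editor's sketch, not part of the original
 * code), matching the scheme above: a lock fresh from ldlm_lock_new()
 * starts with l_refc == 2, one reference owned by the hash table and one
 * by the caller:
 *
 *      lock = ldlm_lock_new(res);       l_refc == 2
 *      ...
 *      ldlm_lock_destroy(lock);         marks it destroyed, drops the
 *                                       hash-table reference
 *      LDLM_LOCK_PUT(lock);             caller's put; the final put
 *                                       frees the lock via ldlm_lock_free()
 */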
struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
{
        cfs_atomic_inc(&lock->l_refc);
        return lock;
}

static void ldlm_lock_free(struct ldlm_lock *lock, size_t size)
{
        LASSERT(size == sizeof(*lock));
        OBD_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock));
}

void ldlm_lock_put(struct ldlm_lock *lock)
{
        ENTRY;

        LASSERT(lock->l_resource != LP_POISON);
        LASSERT(cfs_atomic_read(&lock->l_refc) > 0);
        if (cfs_atomic_dec_and_test(&lock->l_refc)) {
                struct ldlm_resource *res;

                LDLM_DEBUG(lock,
                           "final lock_put on destroyed lock, freeing it.");

                res = lock->l_resource;
                LASSERT(lock->l_destroyed);
                LASSERT(cfs_list_empty(&lock->l_res_link));
                LASSERT(cfs_list_empty(&lock->l_pending_chain));

                lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
                                     LDLM_NSS_LOCKS);
                lu_ref_del(&res->lr_reference, "lock", lock);
                ldlm_resource_putref(res);
                lock->l_resource = NULL;
                if (lock->l_export) {
                        class_export_lock_put(lock->l_export, lock);
                        lock->l_export = NULL;
                }

                if (lock->l_lvb_data != NULL)
                        OBD_FREE(lock->l_lvb_data, lock->l_lvb_len);

                ldlm_interval_free(ldlm_interval_detach(lock));
                lu_ref_fini(&lock->l_reference);
                OBD_FREE_RCU_CB(lock, sizeof(*lock), &lock->l_handle,
                                ldlm_lock_free);
        }

        EXIT;
}

int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
{
        int rc = 0;
        if (!cfs_list_empty(&lock->l_lru)) {
                struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

                LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
                cfs_list_del_init(&lock->l_lru);
                if (lock->l_flags & LDLM_FL_SKIPPED)
                        lock->l_flags &= ~LDLM_FL_SKIPPED;
                LASSERT(ns->ns_nr_unused > 0);
                ns->ns_nr_unused--;
                rc = 1;
        }
        return rc;
}

int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
        int rc;

        ENTRY;
        if (lock->l_ns_srv) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                RETURN(0);
        }

        cfs_spin_lock(&ns->ns_lock);
        rc = ldlm_lock_remove_from_lru_nolock(lock);
        cfs_spin_unlock(&ns->ns_lock);
        EXIT;
        return rc;
}

void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        lock->l_last_used = cfs_time_current();
        LASSERT(cfs_list_empty(&lock->l_lru));
        LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
        cfs_list_add_tail(&lock->l_lru, &ns->ns_unused_list);
        LASSERT(ns->ns_nr_unused >= 0);
        ns->ns_nr_unused++;
}

void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        cfs_spin_lock(&ns->ns_lock);
        ldlm_lock_add_to_lru_nolock(lock);
        cfs_spin_unlock(&ns->ns_lock);
        EXIT;
}

void ldlm_lock_touch_in_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        ENTRY;
        if (lock->l_ns_srv) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                EXIT;
                return;
        }

        cfs_spin_lock(&ns->ns_lock);
        if (!cfs_list_empty(&lock->l_lru)) {
                ldlm_lock_remove_from_lru_nolock(lock);
                ldlm_lock_add_to_lru_nolock(lock);
        }
        cfs_spin_unlock(&ns->ns_lock);
        EXIT;
}

/* This used to have a 'strict' flag, which recovery would use to mark an
 * in-use lock as needing-to-die.  Lest I am ever tempted to put it back, I
 * shall explain why it's gone: with the new hash table scheme, once you call
 * ldlm_lock_destroy, you can never drop your final references on this lock.
 * Because it's not in the hash table anymore.  -phil */
int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
{
        ENTRY;

        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                ldlm_lock_dump(D_ERROR, lock, 0);
                LBUG();
        }

        if (!cfs_list_empty(&lock->l_res_link)) {
                LDLM_ERROR(lock, "lock still on resource");
                ldlm_lock_dump(D_ERROR, lock, 0);
                LBUG();
        }

        if (lock->l_destroyed) {
                LASSERT(cfs_list_empty(&lock->l_lru));
                EXIT;
                return 0;
        }
        lock->l_destroyed = 1;

        if (lock->l_export && lock->l_export->exp_lock_hash &&
            !cfs_hlist_unhashed(&lock->l_exp_hash))
                cfs_hash_del(lock->l_export->exp_lock_hash,
                             &lock->l_remote_handle, &lock->l_exp_hash);

        ldlm_lock_remove_from_lru(lock);
        class_handle_unhash(&lock->l_handle);

#if 0
        /* Wake anyone waiting for this lock */
        /* FIXME: I should probably add yet another flag, instead of using
         * l_export to only call this on clients */
        if (lock->l_export)
                class_export_put(lock->l_export);
        lock->l_export = NULL;
        if (lock->l_export && lock->l_completion_ast)
                lock->l_completion_ast(lock, 0);
#endif
        EXIT;
        return 1;
}

void ldlm_lock_destroy(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        lock_res_and_lock(lock);
        first = ldlm_lock_destroy_internal(lock);
        unlock_res_and_lock(lock);

        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
{
        int first;
        ENTRY;
        first = ldlm_lock_destroy_internal(lock);
        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
}

/* this is called by portals_handle2object with the handle lock taken */
static void lock_handle_addref(void *lock)
{
        LDLM_LOCK_GET((struct ldlm_lock *)lock);
}

/*
 * usage: pass in a resource on which you have done ldlm_resource_get
 *        new lock will take over the refcount.
 * returns: lock with refcount 2 - one for current caller and one for remote
 */
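/*
 * Example (editor's sketch, not in the original source): the caller's
 * resource reference is handed over to the new lock, exactly as
 * ldlm_lock_create() below does:
 *
 *      res = ldlm_resource_get(ns, NULL, res_id, type, 1);
 *      lock = ldlm_lock_new(res);       the resource ref now belongs
 *                                       to the lock
 */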
static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
{
        struct ldlm_lock *lock;
        ENTRY;

        if (resource == NULL)
                LBUG();

        OBD_SLAB_ALLOC_PTR_GFP(lock, ldlm_lock_slab, CFS_ALLOC_IO);
        if (lock == NULL)
                RETURN(NULL);

        cfs_spin_lock_init(&lock->l_lock);
        lock->l_resource = resource;
        lu_ref_add(&resource->lr_reference, "lock", lock);

        cfs_atomic_set(&lock->l_refc, 2);
        CFS_INIT_LIST_HEAD(&lock->l_res_link);
        CFS_INIT_LIST_HEAD(&lock->l_lru);
        CFS_INIT_LIST_HEAD(&lock->l_pending_chain);
        CFS_INIT_LIST_HEAD(&lock->l_bl_ast);
        CFS_INIT_LIST_HEAD(&lock->l_cp_ast);
        CFS_INIT_LIST_HEAD(&lock->l_rk_ast);
        cfs_waitq_init(&lock->l_waitq);
        lock->l_blocking_lock = NULL;
        CFS_INIT_LIST_HEAD(&lock->l_sl_mode);
        CFS_INIT_LIST_HEAD(&lock->l_sl_policy);
        CFS_INIT_HLIST_NODE(&lock->l_exp_hash);

        lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
                             LDLM_NSS_LOCKS);
        CFS_INIT_LIST_HEAD(&lock->l_handle.h_link);
        class_handle_hash(&lock->l_handle, lock_handle_addref);

        lu_ref_init(&lock->l_reference);
        lu_ref_add(&lock->l_reference, "hash", lock);
        lock->l_callback_timeout = 0;

#if LUSTRE_TRACKS_LOCK_EXP_REFS
        CFS_INIT_LIST_HEAD(&lock->l_exp_refs_link);
        lock->l_exp_refs_nr = 0;
        lock->l_exp_refs_target = NULL;
#endif
        CFS_INIT_LIST_HEAD(&lock->l_exp_list);

        RETURN(lock);
}

int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                              const struct ldlm_res_id *new_resid)
{
        struct ldlm_resource *oldres = lock->l_resource;
        struct ldlm_resource *newres;
        int type;
        ENTRY;

        LASSERT(ns_is_client(ns));

        lock_res_and_lock(lock);
        if (memcmp(new_resid, &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) == 0) {
                /* Nothing to do */
                unlock_res_and_lock(lock);
                RETURN(0);
        }

        LASSERT(new_resid->name[0] != 0);

        /* This function assumes that the lock isn't on any lists */
        LASSERT(cfs_list_empty(&lock->l_res_link));

        type = oldres->lr_type;
        unlock_res_and_lock(lock);

        newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
        if (newres == NULL)
                RETURN(-ENOMEM);

        lu_ref_add(&newres->lr_reference, "lock", lock);
        /*
         * To flip the lock from the old to the new resource, lock, oldres and
         * newres have to be locked. Resource spin-locks are nested within
         * lock->l_lock, and are taken in the memory address order to avoid
         * dead-locks.
         */
        cfs_spin_lock(&lock->l_lock);
        oldres = lock->l_resource;
        if (oldres < newres) {
                lock_res(oldres);
                lock_res_nested(newres, LRT_NEW);
        } else {
                lock_res(newres);
                lock_res_nested(oldres, LRT_NEW);
        }
        LASSERT(memcmp(new_resid, &oldres->lr_name,
                       sizeof oldres->lr_name) != 0);
        lock->l_resource = newres;
        unlock_res(oldres);
        unlock_res_and_lock(lock);

        /* ...and the flowers are still standing! */
        lu_ref_del(&oldres->lr_reference, "lock", lock);
        ldlm_resource_putref(oldres);

        RETURN(0);
}

/*
 *  HANDLES
 */

void ldlm_lock2handle(const struct ldlm_lock *lock, struct lustre_handle *lockh)
{
        lockh->cookie = lock->l_handle.h_cookie;
}

/* if flags: atomically get the lock and set the flags.
 *           Return NULL if flag already set
 */
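/*
 * Typical use (editor's sketch; LDLM_FL_CANCELING is an assumed example
 * flag from the cancellation path, not something this function mandates):
 * fetch the lock and atomically claim it, bailing out if another thread
 * already set the flag:
 *
 *      lock = __ldlm_handle2lock(lockh, LDLM_FL_CANCELING);
 *      if (lock == NULL)
 *              return;          someone else is already cancelling
 */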

struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
                                     int flags)
{
        struct ldlm_lock *lock;
        ENTRY;

        LASSERT(handle);

        lock = class_handle2object(handle->cookie);
        if (lock == NULL)
                RETURN(NULL);

        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if (flags == 0 && !lock->l_destroyed) {
                lu_ref_add(&lock->l_reference, "handle", cfs_current());
                RETURN(lock);
        }

        lock_res_and_lock(lock);

        LASSERT(lock->l_resource != NULL);

        lu_ref_add_atomic(&lock->l_reference, "handle", cfs_current());
        if (unlikely(lock->l_destroyed)) {
                unlock_res_and_lock(lock);
                CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
                LDLM_LOCK_PUT(lock);
                RETURN(NULL);
        }

        if (flags && (lock->l_flags & flags)) {
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
                RETURN(NULL);
        }

        if (flags)
                lock->l_flags |= flags;

        unlock_res_and_lock(lock);
        RETURN(lock);
}

void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
{
        struct obd_export *exp = lock->l_export ?: lock->l_conn_export;
        /* INODEBITS_INTEROP: If the other side does not support
         * inodebits, reply with a plain lock descriptor.
         */
        if ((lock->l_resource->lr_type == LDLM_IBITS) &&
            (exp && !(exp->exp_connect_flags & OBD_CONNECT_IBITS))) {
                /* Make sure all the right bits are set in this lock we
                   are going to pass to client */
                LASSERTF(lock->l_policy_data.l_inodebits.bits ==
                         (MDS_INODELOCK_LOOKUP|MDS_INODELOCK_UPDATE),
                         "Inappropriate inode lock bits during "
                         "conversion " LPU64 "\n",
                         lock->l_policy_data.l_inodebits.bits);

                ldlm_res2desc(lock->l_resource, &desc->l_resource);
                desc->l_resource.lr_type = LDLM_PLAIN;

                /* Convert "new" lock mode to something old client can
                   understand */
                if ((lock->l_req_mode == LCK_CR) ||
                    (lock->l_req_mode == LCK_CW))
                        desc->l_req_mode = LCK_PR;
                else
                        desc->l_req_mode = lock->l_req_mode;
                if ((lock->l_granted_mode == LCK_CR) ||
                    (lock->l_granted_mode == LCK_CW)) {
                        desc->l_granted_mode = LCK_PR;
                } else {
                        /* We never grant PW/EX locks to clients */
                        LASSERT((lock->l_granted_mode != LCK_PW) &&
                                (lock->l_granted_mode != LCK_EX));
                        desc->l_granted_mode = lock->l_granted_mode;
                }

                /* We do not copy policy here, because there is no
                   policy for plain locks */
        } else {
                ldlm_res2desc(lock->l_resource, &desc->l_resource);
                desc->l_req_mode = lock->l_req_mode;
                desc->l_granted_mode = lock->l_granted_mode;
                ldlm_convert_policy_to_wire(lock->l_resource->lr_type,
                                            &lock->l_policy_data,
                                            &desc->l_policy_data);
        }
}

void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                           cfs_list_t *work_list)
{
        if ((lock->l_flags & LDLM_FL_AST_SENT) == 0) {
                LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
                lock->l_flags |= LDLM_FL_AST_SENT;
                /* If the enqueuing client said so, tell the AST recipient to
                 * discard dirty data, rather than writing back. */
                if (new->l_flags & LDLM_AST_DISCARD_DATA)
                        lock->l_flags |= LDLM_FL_DISCARD_DATA;
                LASSERT(cfs_list_empty(&lock->l_bl_ast));
                cfs_list_add(&lock->l_bl_ast, work_list);
                LDLM_LOCK_GET(lock);
                LASSERT(lock->l_blocking_lock == NULL);
                lock->l_blocking_lock = LDLM_LOCK_GET(new);
        }
}

void ldlm_add_cp_work_item(struct ldlm_lock *lock, cfs_list_t *work_list)
{
        if ((lock->l_flags & LDLM_FL_CP_REQD) == 0) {
                lock->l_flags |= LDLM_FL_CP_REQD;
                LDLM_DEBUG(lock, "lock granted; sending completion AST.");
                LASSERT(cfs_list_empty(&lock->l_cp_ast));
                cfs_list_add(&lock->l_cp_ast, work_list);
                LDLM_LOCK_GET(lock);
        }
}

/* must be called with lr_lock held */
void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                            cfs_list_t *work_list)
{
        ENTRY;
        check_res_locked(lock->l_resource);
        if (new)
                ldlm_add_bl_work_item(lock, new, work_list);
        else
                ldlm_add_cp_work_item(lock, work_list);
        EXIT;
}

void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;

        lock = ldlm_handle2lock(lockh);
        LASSERT(lock != NULL);
        ldlm_lock_addref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}

void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        ldlm_lock_remove_from_lru(lock);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                lock->l_readers++;
                lu_ref_add_atomic(&lock->l_reference, "reader", lock);
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                lock->l_writers++;
                lu_ref_add_atomic(&lock->l_reference, "writer", lock);
        }
        LDLM_LOCK_GET(lock);
        lu_ref_add_atomic(&lock->l_reference, "user", lock);
        LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}

/**
 * Attempts to addref a lock, and fails if lock is already LDLM_FL_CBPENDING
 * or destroyed.
 *
 * \retval 0 success, lock was addref-ed
 *
 * \retval -EAGAIN lock is being canceled.
 */
int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;
        int               result;

        result = -EAGAIN;
        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                if (lock->l_readers != 0 || lock->l_writers != 0 ||
                    !(lock->l_flags & LDLM_FL_CBPENDING)) {
                        ldlm_lock_addref_internal_nolock(lock, mode);
                        result = 0;
                }
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return result;
}

/* only called for local locks */
void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
{
        lock_res_and_lock(lock);
        ldlm_lock_addref_internal_nolock(lock, mode);
        unlock_res_and_lock(lock);
}

/* only called in ldlm_flock_destroy and for local locks.
 * for LDLM_FLOCK type locks, l_blocking_ast is null, and
 * ldlm_lock_remove_from_lru() does nothing, it is safe
 * for ldlm_flock_destroy usage by dropping some code */
void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                LASSERT(lock->l_readers > 0);
                lu_ref_del(&lock->l_reference, "reader", lock);
                lock->l_readers--;
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                LASSERT(lock->l_writers > 0);
                lu_ref_del(&lock->l_reference, "writer", lock);
                lock->l_writers--;
        }

        lu_ref_del(&lock->l_reference, "user", lock);
        LDLM_LOCK_RELEASE(lock);    /* matches the LDLM_LOCK_GET() in addref */
}

void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
{
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        ns = ldlm_lock_to_ns(lock);

        ldlm_lock_decref_internal_nolock(lock, mode);

        if (lock->l_flags & LDLM_FL_LOCAL &&
            !lock->l_readers && !lock->l_writers) {
                /* If this is a local lock on a server namespace and this was
                 * the last reference, cancel the lock. */
                CDEBUG(D_INFO, "forcing cancel of local lock\n");
                lock->l_flags |= LDLM_FL_CBPENDING;
        }

        if (!lock->l_readers && !lock->l_writers &&
            (lock->l_flags & LDLM_FL_CBPENDING)) {
                /* If we received a blocking AST and this was the last
                 * reference, run the callback. */
                if (lock->l_ns_srv && lock->l_export)
                        CERROR("FL_CBPENDING set on non-local lock--just a "
                               "warning\n");

                LDLM_DEBUG(lock, "final decref done on cbpending lock");

                LDLM_LOCK_GET(lock); /* dropped by bl thread */
                ldlm_lock_remove_from_lru(lock);
                unlock_res_and_lock(lock);

                if (lock->l_flags & LDLM_FL_FAIL_LOC)
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
                    ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
                        ldlm_handle_bl_callback(ns, NULL, lock);
        } else if (ns_is_client(ns) &&
                   !lock->l_readers && !lock->l_writers &&
                   !(lock->l_flags & LDLM_FL_BL_AST)) {
                /* If this is a client-side namespace and this was the last
                 * reference, put it on the LRU. */
                ldlm_lock_add_to_lru(lock);
                unlock_res_and_lock(lock);

                if (lock->l_flags & LDLM_FL_FAIL_LOC)
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
                 * are not supported by the server, otherwise, it is done on
                 * enqueue. */
                if (!exp_connect_cancelset(lock->l_conn_export) &&
                    !ns_connect_lru_resize(ns))
                        ldlm_cancel_lru(ns, 0, LDLM_ASYNC, 0);
        } else {
                unlock_res_and_lock(lock);
        }

        EXIT;
}

void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}

/* This will drop a lock reference and mark it for destruction, but will not
 * necessarily cancel the lock before returning. */
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        ENTRY;

        LASSERT(lock != NULL);

        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        lock_res_and_lock(lock);
        lock->l_flags |= LDLM_FL_CBPENDING;
        unlock_res_and_lock(lock);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}

struct sl_insert_point {
        cfs_list_t *res_link;
        cfs_list_t *mode_link;
        cfs_list_t *policy_link;
};

/*
 * search_granted_lock
 *
 * Description:
 *      Finds a position to insert the new lock.
 * Parameters:
 *      queue [input]:  the granted list where search acts on;
 *      req [input]:    the lock whose position to be located;
 *      prev [output]:  positions within 3 lists to insert @req to
 * Return Value:
 *      filled @prev
 * NOTE: called by
 *  - ldlm_grant_lock_with_skiplist
 */
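/*
 * Layout sketch (an editor's illustration, not part of the original
 * source): within lr_granted, locks of equal mode are adjacent (a "mode
 * group"), and IBITS locks with identical inodebits are adjacent inside
 * their mode group (a "policy group").  l_sl_mode.prev on a group's first
 * lock points at the group's last lock, so the search below can skip a
 * whole group in one step:
 *
 *      lr_granted:  PR[b1] PR[b1] PR[b2] PR[b2]  EX  EX
 *                   \policy grp/ \policy grp/   \mode grp/
 *                   \------- PR mode group ---/
 */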
static void search_granted_lock(cfs_list_t *queue,
                                struct ldlm_lock *req,
                                struct sl_insert_point *prev)
{
        cfs_list_t *tmp;
        struct ldlm_lock *lock, *mode_end, *policy_end;
        ENTRY;

        cfs_list_for_each(tmp, queue) {
                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                mode_end = cfs_list_entry(lock->l_sl_mode.prev,
                                          struct ldlm_lock, l_sl_mode);

                if (lock->l_req_mode != req->l_req_mode) {
                        /* jump to last lock of mode group */
                        tmp = &mode_end->l_res_link;
                        continue;
                }

                /* suitable mode group is found */
                if (lock->l_resource->lr_type == LDLM_PLAIN) {
                        /* insert point is last lock of the mode group */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else if (lock->l_resource->lr_type == LDLM_IBITS) {
                        for (;;) {
                                policy_end =
                                        cfs_list_entry(lock->l_sl_policy.prev,
                                                       struct ldlm_lock,
                                                       l_sl_policy);

                                if (lock->l_policy_data.l_inodebits.bits ==
                                    req->l_policy_data.l_inodebits.bits) {
                                        /* insert point is last lock of
                                         * the policy group */
                                        prev->res_link =
                                                &policy_end->l_res_link;
                                        prev->mode_link =
                                                &policy_end->l_sl_mode;
                                        prev->policy_link =
                                                &policy_end->l_sl_policy;
                                        EXIT;
                                        return;
                                }

                                if (policy_end == mode_end)
                                        /* done with mode group */
                                        break;

                                /* go to next policy group within mode group */
                                tmp = policy_end->l_res_link.next;
                                lock = cfs_list_entry(tmp, struct ldlm_lock,
                                                      l_res_link);
                        }  /* loop over policy groups within the mode group */

                        /* insert point is last lock of the mode group,
                         * new policy group is started */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        EXIT;
                        return;
                } else {
                        LDLM_ERROR(lock, "is not LDLM_PLAIN or LDLM_IBITS lock");
                        LBUG();
                }
        }

        /* insert point is last lock on the queue,
         * new mode group and new policy group are started */
        prev->res_link = queue->prev;
        prev->mode_link = &req->l_sl_mode;
        prev->policy_link = &req->l_sl_policy;
        EXIT;
        return;
}

static void ldlm_granted_list_add_lock(struct ldlm_lock *lock,
                                       struct sl_insert_point *prev)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        ldlm_resource_dump(D_INFO, res);
        CDEBUG(D_OTHER, "About to add this lock:\n");
        ldlm_lock_dump(D_OTHER, lock, 0);

        if (lock->l_destroyed) {
                CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
                return;
        }

        LASSERT(cfs_list_empty(&lock->l_res_link));
        LASSERT(cfs_list_empty(&lock->l_sl_mode));
        LASSERT(cfs_list_empty(&lock->l_sl_policy));

        cfs_list_add(&lock->l_res_link, prev->res_link);
        cfs_list_add(&lock->l_sl_mode, prev->mode_link);
        cfs_list_add(&lock->l_sl_policy, prev->policy_link);

        EXIT;
}

static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
{
        struct sl_insert_point prev;
        ENTRY;

        LASSERT(lock->l_req_mode == lock->l_granted_mode);

        search_granted_lock(&lock->l_resource->lr_granted, lock, &prev);
        ldlm_granted_list_add_lock(lock, &prev);
        EXIT;
}

/* NOTE: called by
 *  - ldlm_lock_enqueue
 *  - ldlm_reprocess_queue
 *  - ldlm_lock_convert
 *
 * must be called with lr_lock held
 */
void ldlm_grant_lock(struct ldlm_lock *lock, cfs_list_t *work_list)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        check_res_locked(res);

        lock->l_granted_mode = lock->l_req_mode;
        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
                ldlm_grant_lock_with_skiplist(lock);
        else if (res->lr_type == LDLM_EXTENT)
                ldlm_extent_add_lock(res, lock);
        else
                ldlm_resource_add_lock(res, &res->lr_granted, lock);

        if (lock->l_granted_mode < res->lr_most_restr)
                res->lr_most_restr = lock->l_granted_mode;

        if (work_list && lock->l_completion_ast != NULL)
                ldlm_add_ast_work_item(lock, NULL, work_list);

        ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
        EXIT;
}

/* returns a referenced lock or NULL.  See the flag descriptions below, in the
 * comment above ldlm_lock_match */
static struct ldlm_lock *search_queue(cfs_list_t *queue,
                                      ldlm_mode_t *mode,
                                      ldlm_policy_data_t *policy,
                                      struct ldlm_lock *old_lock,
                                      int flags, int unref)
{
        struct ldlm_lock *lock;
        cfs_list_t       *tmp;

        cfs_list_for_each(tmp, queue) {
                ldlm_mode_t match;

                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lock == old_lock)
                        break;

                /* llite sometimes wants to match locks that will be
                 * canceled when their users drop, but we allow it to match
                 * if it passes in CBPENDING and the lock still has users.
                 * this is generally only going to be used by children
                 * whose parents already hold a lock so forward progress
                 * can still happen. */
                if (lock->l_flags & LDLM_FL_CBPENDING &&
                    !(flags & LDLM_FL_CBPENDING))
                        continue;
                if (!unref && lock->l_flags & LDLM_FL_CBPENDING &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        continue;

                if (!(lock->l_req_mode & *mode))
                        continue;
                match = lock->l_req_mode;

                if (lock->l_resource->lr_type == LDLM_EXTENT &&
                    (lock->l_policy_data.l_extent.start >
                     policy->l_extent.start ||
                     lock->l_policy_data.l_extent.end < policy->l_extent.end))
                        continue;

                if (unlikely(match == LCK_GROUP) &&
                    lock->l_resource->lr_type == LDLM_EXTENT &&
                    lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                        continue;

                /* We match if we have existing lock with same or wider set
                   of bits. */
                if (lock->l_resource->lr_type == LDLM_IBITS &&
                     ((lock->l_policy_data.l_inodebits.bits &
                      policy->l_inodebits.bits) !=
                      policy->l_inodebits.bits))
                        continue;

                if (!unref &&
                    (lock->l_destroyed || (lock->l_flags & LDLM_FL_FAILED)))
                        continue;

                if ((flags & LDLM_FL_LOCAL_ONLY) &&
                    !(lock->l_flags & LDLM_FL_LOCAL))
                        continue;

                if (flags & LDLM_FL_TEST_LOCK) {
                        LDLM_LOCK_GET(lock);
                        ldlm_lock_touch_in_lru(lock);
                } else {
                        ldlm_lock_addref_internal_nolock(lock, match);
                }
                *mode = match;
                return lock;
        }

        return NULL;
}

void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
{
        lock->l_flags |= LDLM_FL_LVB_READY;
        cfs_waitq_signal(&lock->l_waitq);
}

void ldlm_lock_allow_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_allow_match_locked(lock);
        unlock_res_and_lock(lock);
}

/* Can be called in two ways:
 *
 * If 'ns' is NULL, then lockh describes an existing lock that we want to look
 * for a duplicate of.
 *
 * Otherwise, all of the fields must be filled in, to match against.
 *
 * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
 *     server (i.e., connh is NULL)
 * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
 *     list will be considered
 * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
 *     to be canceled can still be matched as long as they still have reader
 *     or writer references
 * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
 *     just tell us if we would have matched.
 *
 * Returns the matched mode (non-zero) if it finds an already-existing lock
 * that is compatible; in this case, lockh is filled in with an addref()ed
 * lock.
 *
 * We also check the security context; if that fails, we simply return 0 (to
 * keep caller code unchanged), and the context failure will be discovered by
 * the caller sometime later.
 */
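/*
 * Example call (editor's sketch, not in the original source): probe for an
 * already-granted PR or PW extent lock covering the requested range, taking
 * a reference on success:
 *
 *      mode = ldlm_lock_match(ns, LDLM_FL_BLOCK_GRANTED, &res_id,
 *                             LDLM_EXTENT, &policy, LCK_PR | LCK_PW,
 *                             &lockh, 0);
 *      if (mode != 0) {
 *              ...use the matched lock...
 *              ldlm_lock_decref(&lockh, mode);
 *      }
 */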
1120 ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
1121                             const struct ldlm_res_id *res_id, ldlm_type_t type,
1122                             ldlm_policy_data_t *policy, ldlm_mode_t mode,
1123                             struct lustre_handle *lockh, int unref)
1124 {
1125         struct ldlm_resource *res;
1126         struct ldlm_lock *lock, *old_lock = NULL;
1127         int rc = 0;
1128         ENTRY;
1129
1130         if (ns == NULL) {
1131                 old_lock = ldlm_handle2lock(lockh);
1132                 LASSERT(old_lock);
1133
1134                 ns = ldlm_lock_to_ns(old_lock);
1135                 res_id = &old_lock->l_resource->lr_name;
1136                 type = old_lock->l_resource->lr_type;
1137                 mode = old_lock->l_req_mode;
1138         }
1139
1140         res = ldlm_resource_get(ns, NULL, res_id, type, 0);
1141         if (res == NULL) {
1142                 LASSERT(old_lock == NULL);
1143                 RETURN(0);
1144         }
1145
1146         LDLM_RESOURCE_ADDREF(res);
1147         lock_res(res);
1148
1149         lock = search_queue(&res->lr_granted, &mode, policy, old_lock,
1150                             flags, unref);
1151         if (lock != NULL)
1152                 GOTO(out, rc = 1);
1153         if (flags & LDLM_FL_BLOCK_GRANTED)
1154                 GOTO(out, rc = 0);
1155         lock = search_queue(&res->lr_converting, &mode, policy, old_lock,
1156                             flags, unref);
1157         if (lock != NULL)
1158                 GOTO(out, rc = 1);
1159         lock = search_queue(&res->lr_waiting, &mode, policy, old_lock,
1160                             flags, unref);
1161         if (lock != NULL)
1162                 GOTO(out, rc = 1);
1163
1164         EXIT;
1165  out:
1166         unlock_res(res);
1167         LDLM_RESOURCE_DELREF(res);
1168         ldlm_resource_putref(res);
1169
1170         if (lock) {
1171                 ldlm_lock2handle(lock, lockh);
1172                 if ((flags & LDLM_FL_LVB_READY) &&
1173                     (!(lock->l_flags & LDLM_FL_LVB_READY))) {
1174                         struct l_wait_info lwi;
1175                         if (lock->l_completion_ast) {
1176                                 int err = lock->l_completion_ast(lock,
1177                                                           LDLM_FL_WAIT_NOREPROC,
1178                                                                  NULL);
1179                                 if (err) {
1180                                         if (flags & LDLM_FL_TEST_LOCK)
1181                                                 LDLM_LOCK_RELEASE(lock);
1182                                         else
1183                                                 ldlm_lock_decref_internal(lock,
1184                                                                           mode);
1185                                         rc = 0;
1186                                         goto out2;
1187                                 }
1188                         }
1189
1190                         lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
1191                                                NULL, LWI_ON_SIGNAL_NOOP, NULL);
1192
1193                         /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
1194                         l_wait_event(lock->l_waitq,
1195                                      (lock->l_flags & LDLM_FL_LVB_READY), &lwi);
1196                 }
1197         }
1198  out2:
1199         if (rc) {
1200                 LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
1201                            (type == LDLM_PLAIN || type == LDLM_IBITS) ?
1202                                 res_id->name[2] : policy->l_extent.start,
1203                            (type == LDLM_PLAIN || type == LDLM_IBITS) ?
1204                                 res_id->name[3] : policy->l_extent.end);
1205
1206                 /* check user's security context */
1207                 if (lock->l_conn_export &&
1208                     sptlrpc_import_check_ctx(
1209                                 class_exp2cliimp(lock->l_conn_export))) {
1210                         if (!(flags & LDLM_FL_TEST_LOCK))
1211                                 ldlm_lock_decref_internal(lock, mode);
1212                         rc = 0;
1213                 }
1214
1215                 if (flags & LDLM_FL_TEST_LOCK)
1216                         LDLM_LOCK_RELEASE(lock);
1217
1218         } else if (!(flags & LDLM_FL_TEST_LOCK)) {/*less verbose for test-only*/
1219                 LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
1220                                   LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
1221                                   type, mode, res_id->name[0], res_id->name[1],
1222                                   (type == LDLM_PLAIN || type == LDLM_IBITS) ?
1223                                         res_id->name[2] :policy->l_extent.start,
1224                                   (type == LDLM_PLAIN || type == LDLM_IBITS) ?
1225                                         res_id->name[3] : policy->l_extent.end);
1226         }
1227         if (old_lock)
1228                 LDLM_LOCK_PUT(old_lock);
1229
1230         return rc ? mode : 0;
1231 }
1232
1233 /* Returns a referenced lock */
1234 struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
1235                                    const struct ldlm_res_id *res_id,
1236                                    ldlm_type_t type,
1237                                    ldlm_mode_t mode,
1238                                    const struct ldlm_callback_suite *cbs,
1239                                    void *data, __u32 lvb_len)
1240 {
1241         struct ldlm_lock *lock;
1242         struct ldlm_resource *res;
1243         ENTRY;
1244
1245         res = ldlm_resource_get(ns, NULL, res_id, type, 1);
1246         if (res == NULL)
1247                 RETURN(NULL);
1248
1249         lock = ldlm_lock_new(res);
1250
1251         if (lock == NULL)
1252                 RETURN(NULL);
1253
1254         lock->l_req_mode = mode;
1255         lock->l_ast_data = data;
1256         lock->l_pid = cfs_curproc_pid();
1257         lock->l_ns_srv = ns_is_server(ns);
1258         if (cbs) {
1259                 lock->l_blocking_ast = cbs->lcs_blocking;
1260                 lock->l_completion_ast = cbs->lcs_completion;
1261                 lock->l_glimpse_ast = cbs->lcs_glimpse;
1262                 lock->l_weigh_ast = cbs->lcs_weigh;
1263         }
1264
1265         lock->l_tree_node = NULL;
1266         /* if this is the extent lock, allocate the interval tree node */
1267         if (type == LDLM_EXTENT) {
1268                 if (ldlm_interval_alloc(lock) == NULL)
1269                         GOTO(out, 0);
1270         }
1271
1272         if (lvb_len) {
1273                 lock->l_lvb_len = lvb_len;
1274                 OBD_ALLOC(lock->l_lvb_data, lvb_len);
1275                 if (lock->l_lvb_data == NULL)
1276                         GOTO(out, 0);
1277         }
1278
1279         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK))
1280                 GOTO(out, 0);
1281
1282         RETURN(lock);
1283
1284 out:
1285         ldlm_lock_destroy(lock);
1286         LDLM_LOCK_RELEASE(lock);
1287         return NULL;
1288 }
1289
1290 ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
1291                                struct ldlm_lock **lockp,
1292                                void *cookie, int *flags)
1293 {
1294         struct ldlm_lock *lock = *lockp;
1295         struct ldlm_resource *res = lock->l_resource;
1296         int local = ns_is_client(ldlm_res_to_ns(res));
1297         ldlm_processing_policy policy;
1298         ldlm_error_t rc = ELDLM_OK;
1299         struct ldlm_interval *node = NULL;
1300         ENTRY;
1301
1302         lock->l_last_activity = cfs_time_current_sec();
1303         /* policies are not executed on the client or during replay */
1304         if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
1305             && !local && ns->ns_policy) {
1306                 rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
1307                                    NULL);
1308                 if (rc == ELDLM_LOCK_REPLACED) {
1309                         /* The lock that was returned has already been granted,
1310                          * and placed into lockp.  If it's not the same as the
1311                          * one we passed in, then destroy the old one and our
1312                          * work here is done. */
1313                         if (lock != *lockp) {
1314                                 ldlm_lock_destroy(lock);
1315                                 LDLM_LOCK_RELEASE(lock);
1316                         }
1317                         *flags |= LDLM_FL_LOCK_CHANGED;
1318                         RETURN(0);
1319                 } else if (rc != ELDLM_OK ||
1320                            (rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
1321                         ldlm_lock_destroy(lock);
1322                         RETURN(rc);
1323                 }
1324         }
1325
1326         /* For a replaying lock, it might be already in granted list. So
1327          * unlinking the lock will cause the interval node to be freed, we
1328          * have to allocate the interval node early otherwise we can't regrant
1329          * this lock in the future. - jay */
1330         if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
1331                 OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
1332
1333         lock_res_and_lock(lock);
1334         if (local && lock->l_req_mode == lock->l_granted_mode) {
1335                 /* The server returned a blocked lock, but it was granted
1336                  * before we got a chance to actually enqueue it.  We don't
1337                  * need to do anything else. */
1338                 *flags &= ~(LDLM_FL_BLOCK_GRANTED |
1339                             LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
1340                 GOTO(out, ELDLM_OK);
1341         }
1342
1343         ldlm_resource_unlink_lock(lock);
1344         if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
1345                 if (node == NULL) {
1346                         ldlm_lock_destroy_nolock(lock);
1347                         GOTO(out, rc = -ENOMEM);
1348                 }
1349
1350                 CFS_INIT_LIST_HEAD(&node->li_group);
1351                 ldlm_interval_attach(node, lock);
1352                 node = NULL;
1353         }
1354
1355         /* Some flags from the enqueue want to make it into the AST, via the
1356          * lock's l_flags. */
1357         lock->l_flags |= *flags & LDLM_AST_DISCARD_DATA;
1358
1359         /* This distinction between local lock trees is very important; a client
1360          * namespace only has information about locks taken by that client, and
1361          * thus doesn't have enough information to decide for itself if it can
1362          * be granted (below).  In this case, we do exactly what the server
1363          * tells us to do, as dictated by the 'flags'.
1364          *
1365          * We do exactly the same thing during recovery, when the server is
1366          * more or less trusting the clients not to lie.
1367          *
1368          * FIXME (bug 268): Detect obvious lies by checking compatibility in
1369          * granted/converting queues. */
1370         if (local) {
1371                 if (*flags & LDLM_FL_BLOCK_CONV)
1372                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1373                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
1374                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1375                 else
1376                         ldlm_grant_lock(lock, NULL);
1377                 GOTO(out, ELDLM_OK);
1378         } else if (*flags & LDLM_FL_REPLAY) {
1379                 if (*flags & LDLM_FL_BLOCK_CONV) {
1380                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1381                         GOTO(out, ELDLM_OK);
1382                 } else if (*flags & LDLM_FL_BLOCK_WAIT) {
1383                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1384                         GOTO(out, ELDLM_OK);
1385                 } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
1386                         ldlm_grant_lock(lock, NULL);
1387                         GOTO(out, ELDLM_OK);
1388                 }
1389                 /* If no flags, fall through to normal enqueue path. */
1390         }
1391
1392         policy = ldlm_processing_policy_table[res->lr_type];
1393         policy(lock, flags, 1, &rc, NULL);
1394         GOTO(out, rc);
1395 out:
1396         unlock_res_and_lock(lock);
1397         if (node)
1398                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
1399         return rc;
1400 }
1401
1402 /* Must be called with namespace taken: queue is waiting or converting. */
1403 int ldlm_reprocess_queue(struct ldlm_resource *res, cfs_list_t *queue,
1404                          cfs_list_t *work_list)
1405 {
1406         cfs_list_t *tmp, *pos;
1407         ldlm_processing_policy policy;
1408         int flags;
1409         int rc = LDLM_ITER_CONTINUE;
1410         ldlm_error_t err;
1411         ENTRY;
1412
1413         check_res_locked(res);
1414
1415         policy = ldlm_processing_policy_table[res->lr_type];
1416         LASSERT(policy);
1417
1418         cfs_list_for_each_safe(tmp, pos, queue) {
1419                 struct ldlm_lock *pending;
1420                 pending = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
1421
1422                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
1423
1424                 flags = 0;
1425                 rc = policy(pending, &flags, 0, &err, work_list);
1426                 if (rc != LDLM_ITER_CONTINUE)
1427                         break;
1428         }
1429
1430         RETURN(rc);
1431 }
1432
1433 /* Helper function for ldlm_run_ast_work().
1434  *
1435  * Send the existing RPC set specified by @arg->set and then
1436  * destroy it.  Create a new one if the @do_create flag is set. */
1437 static void
1438 ldlm_send_and_maybe_create_set(struct ldlm_cb_set_arg *arg, int do_create)
1439 {
1440         ENTRY;
1441
1442         ptlrpc_set_wait(arg->set);
1443         if (arg->type == LDLM_BL_CALLBACK)
1444                 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
1445         ptlrpc_set_destroy(arg->set);
1446
1447         if (do_create)
1448                 arg->set = ptlrpc_prep_set();
1449
1450         EXIT;
1451 }
1452
1453 static int
1454 ldlm_work_bl_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
1455 {
1456         struct ldlm_lock_desc d;
1457         struct ldlm_lock *lock = cfs_list_entry(tmp, struct ldlm_lock,
1458                                                 l_bl_ast);
1459         ENTRY;
1460
1461         /* nobody should touch l_bl_ast */
1462         lock_res_and_lock(lock);
1463         cfs_list_del_init(&lock->l_bl_ast);
1464
1465         LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
1466         LASSERT(lock->l_bl_ast_run == 0);
1467         LASSERT(lock->l_blocking_lock);
1468         lock->l_bl_ast_run++;
1469         unlock_res_and_lock(lock);
1470
1471         ldlm_lock2desc(lock->l_blocking_lock, &d);
1472
1473         lock->l_blocking_ast(lock, &d, (void *)arg,
1474                              LDLM_CB_BLOCKING);
1475         LDLM_LOCK_RELEASE(lock->l_blocking_lock);
1476         lock->l_blocking_lock = NULL;
1477         LDLM_LOCK_RELEASE(lock);
1478
1479         RETURN(1);
1480 }
1481
1482 static int
1483 ldlm_work_cp_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
1484 {
1485         struct ldlm_lock *lock = cfs_list_entry(tmp, struct ldlm_lock, l_cp_ast);
1486         ldlm_completion_callback completion_callback;
1487         int rc = 0;
1488         ENTRY;
1489
1490         /* It's possible to receive a completion AST before we've set
1491          * the l_completion_ast pointer: either because the AST arrived
1492          * before the reply, or simply because there's a small race
1493          * window between receiving the reply and finishing the local
1494          * enqueue. (bug 842)
1495          *
1496          * This can't happen with the blocking_ast, however, because we
1497          * will never call the local blocking_ast until we drop our
1498          * reader/writer reference, which we won't do until we get the
1499          * reply and finish enqueueing. */
1500
1501         /* nobody should touch l_cp_ast */
1502         lock_res_and_lock(lock);
1503         cfs_list_del_init(&lock->l_cp_ast);
1504         LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
1505         /* save l_completion_ast since it can be changed by
1506          * mds_intent_policy(), see bug 14225 */
1507         completion_callback = lock->l_completion_ast;
1508         lock->l_flags &= ~LDLM_FL_CP_REQD;
1509         unlock_res_and_lock(lock);
1510
1511         if (completion_callback != NULL) {
1512                 completion_callback(lock, 0, (void *)arg);
1513                 rc = 1;
1514         }
1515         LDLM_LOCK_RELEASE(lock);
1516
1517         RETURN(rc);
1518 }
1519
1520 static int
1521 ldlm_work_revoke_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
1522 {
1523         struct ldlm_lock_desc desc;
1524         struct ldlm_lock *lock = cfs_list_entry(tmp, struct ldlm_lock,
1525                                                 l_rk_ast);
1526         ENTRY;
1527
1528         cfs_list_del_init(&lock->l_rk_ast);
1529
1530         /* the desc just pretends to be exclusive */
1531         ldlm_lock2desc(lock, &desc);
1532         desc.l_req_mode = LCK_EX;
1533         desc.l_granted_mode = 0;
1534
1535         lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
1536         LDLM_LOCK_RELEASE(lock);
1537
1538         RETURN(1);
1539 }
1540
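/*
 * Walk @rpc_list and fire one AST of kind @ast_type for each lock on it,
 * gathering the callback RPCs into a ptlrpc set that is sent off every
 * PARALLEL_AST_LIMIT ASTs.  Returns -ERESTART if the set could not be
 * allocated or if any callback asked for a restart, 0 otherwise.
 */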
1541 int ldlm_run_ast_work(cfs_list_t *rpc_list, ldlm_desc_ast_t ast_type)
1542 {
1543         struct ldlm_cb_set_arg arg;
1544         cfs_list_t *tmp, *pos;
1545         int (*work_ast_lock)(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg);
1546         int ast_count;
1547         ENTRY;
1548
1549         if (cfs_list_empty(rpc_list))
1550                 RETURN(0);
1551
1552         arg.set = ptlrpc_prep_set();
1553         if (arg.set == NULL)
1554                 RETURN(-ERESTART);
1555         cfs_atomic_set(&arg.restart, 0);
1556         switch (ast_type) {
1557         case LDLM_WORK_BL_AST:
1558                 arg.type = LDLM_BL_CALLBACK;
1559                 work_ast_lock = ldlm_work_bl_ast_lock;
1560                 break;
1561         case LDLM_WORK_CP_AST:
1562                 arg.type = LDLM_CP_CALLBACK;
1563                 work_ast_lock = ldlm_work_cp_ast_lock;
1564                 break;
1565         case LDLM_WORK_REVOKE_AST:
1566                 arg.type = LDLM_BL_CALLBACK;
1567                 work_ast_lock = ldlm_work_revoke_ast_lock;
1568                 break;
1569         default:
1570                 LBUG();
1571         }
1572
1573         ast_count = 0;
1574         cfs_list_for_each_safe(tmp, pos, rpc_list) {
1575                 ast_count += work_ast_lock(tmp, &arg);
1576
1577                 /* Send the request set once it reaches PARALLEL_AST_LIMIT,
1578                  * and create a new set for the requests remaining in
1579                  * @rpc_list */
1580                 if (unlikely(ast_count == PARALLEL_AST_LIMIT)) {
1581                         ldlm_send_and_maybe_create_set(&arg, 1);
1582                         ast_count = 0;
1583                 }
1584         }
1585
1586         if (ast_count > 0)
1587                 ldlm_send_and_maybe_create_set(&arg, 0);
1588         else
1589                 /* If the number of ASTs is a multiple of
1590                  * PARALLEL_AST_LIMIT, or @rpc_list was initially empty,
1591                  * @arg.set must be destroyed here, otherwise the set
1592                  * is leaked. */
1593                 ptlrpc_set_destroy(arg.set);
1594
1595         RETURN(cfs_atomic_read(&arg.restart) ? -ERESTART : 0);
1596 }
1597
1598 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
1599 {
1600         ldlm_reprocess_all(res);
1601         return LDLM_ITER_CONTINUE;
1602 }
1603
1604 static int ldlm_reprocess_res(cfs_hash_t *hs, cfs_hash_bd_t *bd,
1605                               cfs_hlist_node_t *hnode, void *arg)
1606 {
1607         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
1608         int    rc;
1609
1610         rc = reprocess_one_queue(res, arg);
1611
1612         return rc == LDLM_ITER_STOP;
1613 }
1614
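/*
 * Reprocess the queues of every resource cached in namespace @ns;
 * a NULL @ns is a no-op.
 */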
1615 void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
1616 {
1617         ENTRY;
1618
1619         if (ns != NULL) {
1620                 cfs_hash_for_each_nolock(ns->ns_rs_hash,
1621                                          ldlm_reprocess_res, NULL);
1622         }
1623         EXIT;
1624 }
1625
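/*
 * Try to grant locks blocked on @res's converting and waiting queues and
 * send the completion ASTs this produces; the whole pass is restarted on
 * -ERESTART.  Client-side (local) lock trees are skipped entirely.
 */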
1626 void ldlm_reprocess_all(struct ldlm_resource *res)
1627 {
1628         CFS_LIST_HEAD(rpc_list);
1629         int rc;
1630         ENTRY;
1631
1632         /* Local lock trees don't get reprocessed. */
1633         if (ns_is_client(ldlm_res_to_ns(res))) {
1634                 EXIT;
1635                 return;
1636         }
1637
1638  restart:
1639         lock_res(res);
1640         rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
1641         if (rc == LDLM_ITER_CONTINUE)
1642                 ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
1643         unlock_res(res);
1644
1645         rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_CP_AST);
1646         if (rc == -ERESTART) {
1647                 LASSERT(cfs_list_empty(&rpc_list));
1648                 goto restart;
1649         }
1650         EXIT;
1651 }
1652
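/*
 * Run the blocking AST of @lock in LDLM_CB_CANCELING mode, at most once
 * per lock; the res lock is dropped around the callback and re-taken
 * before returning.
 */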
1653 void ldlm_cancel_callback(struct ldlm_lock *lock)
1654 {
1655         check_res_locked(lock->l_resource);
1656         if (!(lock->l_flags & LDLM_FL_CANCEL)) {
1657                 lock->l_flags |= LDLM_FL_CANCEL;
1658                 if (lock->l_blocking_ast) {
1659                         /* l_check_no_ns_lock(ns); */
1660                         unlock_res_and_lock(lock);
1661                         lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
1662                                              LDLM_CB_CANCELING);
1663                         lock_res_and_lock(lock);
1664                 } else {
1665                         LDLM_DEBUG(lock, "no blocking ast");
1666                 }
1667         }
1668         lock->l_flags |= LDLM_FL_BL_DONE;
1669 }
1670
1671 void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
1672 {
1673         if (req->l_resource->lr_type != LDLM_PLAIN &&
1674             req->l_resource->lr_type != LDLM_IBITS)
1675                 return;
1676
1677         cfs_list_del_init(&req->l_sl_policy);
1678         cfs_list_del_init(&req->l_sl_mode);
1679 }
1680
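/*
 * Cancel @lock: unlink it from its resource and destroy it.  The lock
 * must have no reader or writer references left; granted locks are also
 * removed from the pool accounting.
 */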
1681 void ldlm_lock_cancel(struct ldlm_lock *lock)
1682 {
1683         struct ldlm_resource *res;
1684         struct ldlm_namespace *ns;
1685         ENTRY;
1686
1687         lock_res_and_lock(lock);
1688
1689         res = lock->l_resource;
1690         ns  = ldlm_res_to_ns(res);
1691
1692         /* Please do not, no matter how tempting, remove this LBUG without
1693          * talking to me first. -phik */
1694         if (lock->l_readers || lock->l_writers) {
1695                 LDLM_ERROR(lock, "lock still has references");
1696                 LBUG();
1697         }
1698
1699         ldlm_del_waiting_lock(lock);
1700
1701         /* Invoke the cancellation callback. */
1702         ldlm_cancel_callback(lock);
1703
1704         /* Yes, a second time, just in case the lock was added again while
1705            ldlm_cancel_callback was running without the res lock held */
1706         ldlm_del_waiting_lock(lock);
1707         ldlm_resource_unlink_lock(lock);
1708         ldlm_lock_destroy_nolock(lock);
1709
1710         if (lock->l_granted_mode == lock->l_req_mode)
1711                 ldlm_pool_del(&ns->ns_pool, lock);
1712
1713         /* Make sure we will not be called again for the same lock, which is
1714          * possible if lock->l_granted_mode is not zeroed out */
1715         lock->l_granted_mode = LCK_MINMODE;
1716         unlock_res_and_lock(lock);
1717
1718         EXIT;
1719 }
1720
1721 int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
1722 {
1723         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1724         ENTRY;
1725
1726         if (lock == NULL)
1727                 RETURN(-EINVAL);
1728
1729         lock->l_ast_data = data;
1730         LDLM_LOCK_PUT(lock);
1731         RETURN(0);
1732 }
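
/*
 * A minimal usage sketch for ldlm_lock_set_data(); 'lockh', 'my_data' and
 * the helpers are hypothetical names standing for a handle returned by an
 * earlier enqueue and some caller-private state:
 *
 *   struct lustre_handle lockh;          (filled in by a prior enqueue)
 *   void *my_data = get_my_data();       (hypothetical helper)
 *   int rc = ldlm_lock_set_data(&lockh, my_data);
 *   if (rc == -EINVAL)
 *           handle_stale_lock();         (the handle no longer maps
 *                                         to a live lock)
 */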
1733
1734 int ldlm_cancel_locks_for_export_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
1735                                     cfs_hlist_node_t *hnode, void *data)
1737 {
1738         struct obd_export    *exp  = data;
1739         struct ldlm_lock     *lock = cfs_hash_object(hs, hnode);
1740         struct ldlm_resource *res;
1741
1742         res = ldlm_resource_getref(lock->l_resource);
1743         LDLM_LOCK_GET(lock);
1744
1745         LDLM_DEBUG(lock, "export %p", exp);
1746         ldlm_res_lvbo_update(res, NULL, 1);
1747         ldlm_lock_cancel(lock);
1748         ldlm_reprocess_all(res);
1749         ldlm_resource_putref(res);
1750         LDLM_LOCK_RELEASE(lock);
1751         return 0;
1752 }
1753
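/*
 * Cancel every lock hashed on export @exp, refreshing each resource's
 * LVB and reprocessing its queues; typically run when an export is
 * being cleaned up.
 */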
1754 void ldlm_cancel_locks_for_export(struct obd_export *exp)
1755 {
1756         cfs_hash_for_each_empty(exp->exp_lock_hash,
1757                                 ldlm_cancel_locks_for_export_cb, exp);
1758 }
1759
1760 /**
1761  * Downgrade an exclusive lock.
1762  *
1763  * A fast variant of ldlm_lock_convert() for the conversion of exclusive
1764  * locks.  The conversion always succeeds.
1765  *
1766  * \param lock A lock to convert
1767  * \param new_mode new lock mode
1768  */
1769 void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode)
1770 {
1771         ENTRY;
1772
1773         LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
1774         LASSERT(new_mode == LCK_COS);
1775
1776         lock_res_and_lock(lock);
1777         ldlm_resource_unlink_lock(lock);
1778         /*
1779          * Remove the lock from pool as it will be added again in
1780          * ldlm_grant_lock() called below.
1781          */
1782         ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
1783
1784         lock->l_req_mode = new_mode;
1785         ldlm_grant_lock(lock, NULL);
1786         unlock_res_and_lock(lock);
1787         ldlm_reprocess_all(lock->l_resource);
1788
1789         EXIT;
1790 }
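
/*
 * Usage sketch (illustrative only): a server thread downgrading a
 * granted PW/EX lock to commit-on-share mode once sharing is safe:
 *
 *   ldlm_lock_downgrade(lock, LCK_COS);
 *
 * ldlm_lock_downgrade() itself re-grants the lock in LCK_COS mode and
 * reprocesses the resource, so no further queue handling is needed.
 */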
1791
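/*
 * Convert @lock to @new_mode.  On the server the processing policy for
 * the resource type decides whether the converted lock can be granted;
 * on a client the lock is queued as dictated by @flags.  Returns the
 * lock's resource, or NULL when the conversion could not be carried out
 * (allocation failure, or the policy stopped the conversion).
 */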
1792 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
1793                                         __u32 *flags)
1794 {
1795         CFS_LIST_HEAD(rpc_list);
1796         struct ldlm_resource *res;
1797         struct ldlm_namespace *ns;
1798         int granted = 0;
1799         int old_mode, rc;
1800         struct sl_insert_point prev;
1801         ldlm_error_t err;
1802         struct ldlm_interval *node;
1803         ENTRY;
1804
1805         if (new_mode == lock->l_granted_mode) { /* No changes? Just return. */
1806                 *flags |= LDLM_FL_BLOCK_GRANTED;
1807                 RETURN(lock->l_resource);
1808         }
1809
1810         /* I can't check the type of the lock here because its bitlock is
1811          * not held here, so do the allocation blindly. -jay */
1812         OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, CFS_ALLOC_IO);
1813         if (node == NULL)  /* Actually, this causes EDEADLOCK to be returned */
1814                 RETURN(NULL);
1815
1816         LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR),
1817                  "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);
1818
1819         lock_res_and_lock(lock);
1820
1821         res = lock->l_resource;
1822         ns  = ldlm_res_to_ns(res);
1823
1824         old_mode = lock->l_req_mode;
1825         lock->l_req_mode = new_mode;
1826         if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
1827                 /* remember the position where the lock might be
1828                  * added back to the granted list later, and also
1829                  * remember the join mode for skiplist fixing. */
1830                 prev.res_link = lock->l_res_link.prev;
1831                 prev.mode_link = lock->l_sl_mode.prev;
1832                 prev.policy_link = lock->l_sl_policy.prev;
1833                 ldlm_resource_unlink_lock(lock);
1834         } else {
1835                 ldlm_resource_unlink_lock(lock);
1836                 if (res->lr_type == LDLM_EXTENT) {
1837                         /* FIXME: ugly code, I have to attach the lock to an
1838                          * interval node again since perhaps it will be granted
1839                          * soon */
1840                         CFS_INIT_LIST_HEAD(&node->li_group);
1841                         ldlm_interval_attach(node, lock);
1842                         node = NULL;
1843                 }
1844         }
1845
1846         /*
1847          * Remove old lock from the pool before adding the lock with new
1848          * mode below in ->policy()
1849          */
1850         ldlm_pool_del(&ns->ns_pool, lock);
1851
1852         /* If this is a local resource, put it on the appropriate list. */
1853         if (ns_is_client(ldlm_res_to_ns(res))) {
1854                 if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
1855                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1856                 } else {
1857                         /* This should never happen, because of the way the
1858                          * server handles conversions. */
1859                         LDLM_ERROR(lock, "Erroneous flags %d on local lock\n",
1860                                    *flags);
1861                         LBUG();
1862
1863                         ldlm_grant_lock(lock, &rpc_list);
1864                         granted = 1;
1865                         /* FIXME: completion handling should not run with lr_lock held! */
1866                         if (lock->l_completion_ast)
1867                                 lock->l_completion_ast(lock, 0, NULL);
1868                 }
1869         } else {
1870                 int pflags = 0;
1871                 ldlm_processing_policy policy;
1872                 policy = ldlm_processing_policy_table[res->lr_type];
1873                 rc = policy(lock, &pflags, 0, &err, &rpc_list);
1874                 if (rc == LDLM_ITER_STOP) {
1875                         lock->l_req_mode = old_mode;
1876                         if (res->lr_type == LDLM_EXTENT)
1877                                 ldlm_extent_add_lock(res, lock);
1878                         else
1879                                 ldlm_granted_list_add_lock(lock, &prev);
1880
1881                         res = NULL;
1882                 } else {
1883                         *flags |= LDLM_FL_BLOCK_GRANTED;
1884                         granted = 1;
1885                 }
1886         }
1887         unlock_res_and_lock(lock);
1888
1889         if (granted)
1890                 ldlm_run_ast_work(&rpc_list, LDLM_WORK_CP_AST);
1891         if (node)
1892                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
1893         RETURN(res);
1894 }
1895
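/*
 * Dump the state of @lock via CDEBUG at debug level @level; a no-op
 * unless @level is enabled in libcfs_debug (D_ERROR always is).
 */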
1896 void ldlm_lock_dump(int level, struct ldlm_lock *lock, int pos)
1897 {
1898         struct obd_device *obd = NULL;
1899
1900         if (!((libcfs_debug | D_ERROR) & level))
1901                 return;
1902
1903         if (!lock) {
1904                 CDEBUG(level, "  NULL LDLM lock\n");
1905                 return;
1906         }
1907
1908         CDEBUG(level," -- Lock dump: %p/"LPX64" (rc: %d) (pos: %d) (pid: %d)\n",
1909                lock, lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
1910                pos, lock->l_pid);
1911         if (lock->l_conn_export != NULL)
1912                 obd = lock->l_conn_export->exp_obd;
1913         if (lock->l_export && lock->l_export->exp_connection) {
1914                 CDEBUG(level, "  Node: NID %s (rhandle: "LPX64")\n",
1915                      libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid),
1916                      lock->l_remote_handle.cookie);
1917         } else if (obd == NULL) {
1918                 CDEBUG(level, "  Node: local\n");
1919         } else {
1920                 struct obd_import *imp = obd->u.cli.cl_import;
1921                 CDEBUG(level, "  Node: NID %s (rhandle: "LPX64")\n",
1922                        libcfs_nid2str(imp->imp_connection->c_peer.nid),
1923                        lock->l_remote_handle.cookie);
1924         }
1925         CDEBUG(level, "  Resource: %p ("LPU64"/"LPU64"/"LPU64")\n",
1926                   lock->l_resource,
1927                   lock->l_resource->lr_name.name[0],
1928                   lock->l_resource->lr_name.name[1],
1929                   lock->l_resource->lr_name.name[2]);
1930         CDEBUG(level, "  Req mode: %s, grant mode: %s, rc: %u, read: %d, "
1931                "write: %d flags: "LPX64"\n", ldlm_lockname[lock->l_req_mode],
1932                ldlm_lockname[lock->l_granted_mode],
1933                cfs_atomic_read(&lock->l_refc), lock->l_readers, lock->l_writers,
1934                lock->l_flags);
1935         if (lock->l_resource->lr_type == LDLM_EXTENT)
1936                 CDEBUG(level, "  Extent: "LPU64" -> "LPU64
1937                        " (req "LPU64"-"LPU64")\n",
1938                        lock->l_policy_data.l_extent.start,
1939                        lock->l_policy_data.l_extent.end,
1940                        lock->l_req_extent.start, lock->l_req_extent.end);
1941         else if (lock->l_resource->lr_type == LDLM_FLOCK)
1942                 CDEBUG(level, "  Pid: %d Extent: "LPU64" -> "LPU64"\n",
1943                        lock->l_policy_data.l_flock.pid,
1944                        lock->l_policy_data.l_flock.start,
1945                        lock->l_policy_data.l_flock.end);
1946         else if (lock->l_resource->lr_type == LDLM_IBITS)
1947                 CDEBUG(level, "  Bits: "LPX64"\n",
1948                        lock->l_policy_data.l_inodebits.bits);
1949 }
1950
1951 void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
1952 {
1953         struct ldlm_lock *lock;
1954
1955         if (!((libcfs_debug | D_ERROR) & level))
1956                 return;
1957
1958         lock = ldlm_handle2lock(lockh);
1959         if (lock == NULL)
1960                 return;
1961
1962         ldlm_lock_dump(D_OTHER, lock, 0);
1963
1964         LDLM_LOCK_PUT(lock);
1965 }
1966
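/*
 * Printing back end for the LDLM_DEBUG() family of macros: emits a
 * type-specific description of @lock, falling back to a generic format
 * with "??" resource fields when l_resource has not been set yet.
 */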
1967 void _ldlm_lock_debug(struct ldlm_lock *lock,
1968                       struct libcfs_debug_msg_data *msgdata,
1969                       const char *fmt, ...)
1970 {
1971         va_list args;
1972
1973         va_start(args, fmt);
1974
1975         if (lock->l_resource == NULL) {
1976                 libcfs_debug_vmsg2(msgdata, fmt, args,
1977                        " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
1978                        "res: \?\? rrc=\?\? type: \?\?\? flags: "LPX64" remote: "
1979                        LPX64" expref: %d pid: %u timeout: %lu\n", lock,
1980                        lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
1981                        lock->l_readers, lock->l_writers,
1982                        ldlm_lockname[lock->l_granted_mode],
1983                        ldlm_lockname[lock->l_req_mode],
1984                        lock->l_flags, lock->l_remote_handle.cookie,
1985                        lock->l_export ?
1986                        cfs_atomic_read(&lock->l_export->exp_refcount) : -99,
1987                        lock->l_pid, lock->l_callback_timeout);
1988                 va_end(args);
1989                 return;
1990         }
1991
1992         switch (lock->l_resource->lr_type) {
1993         case LDLM_EXTENT:
1994                 libcfs_debug_vmsg2(msgdata, fmt, args,
1995                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
1996                        "res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64
1997                        "] (req "LPU64"->"LPU64") flags: "LPX64" remote: "LPX64
1998                        " expref: %d pid: %u timeout %lu\n",
1999                        ldlm_lock_to_ns_name(lock), lock,
2000                        lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
2001                        lock->l_readers, lock->l_writers,
2002                        ldlm_lockname[lock->l_granted_mode],
2003                        ldlm_lockname[lock->l_req_mode],
2004                        lock->l_resource->lr_name.name[0],
2005                        lock->l_resource->lr_name.name[1],
2006                        cfs_atomic_read(&lock->l_resource->lr_refcount),
2007                        ldlm_typename[lock->l_resource->lr_type],
2008                        lock->l_policy_data.l_extent.start,
2009                        lock->l_policy_data.l_extent.end,
2010                        lock->l_req_extent.start, lock->l_req_extent.end,
2011                        lock->l_flags, lock->l_remote_handle.cookie,
2012                        lock->l_export ?
2013                        cfs_atomic_read(&lock->l_export->exp_refcount) : -99,
2014                        lock->l_pid, lock->l_callback_timeout);
2015                 break;
2016
2017         case LDLM_FLOCK:
2018                 libcfs_debug_vmsg2(msgdata, fmt, args,
2019                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2020                        "res: "LPU64"/"LPU64" rrc: %d type: %s pid: %d "
2021                        "["LPU64"->"LPU64"] flags: "LPX64" remote: "LPX64
2022                        " expref: %d pid: %u timeout: %lu\n",
2023                        ldlm_lock_to_ns_name(lock), lock,
2024                        lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc),
2025                        lock->l_readers, lock->l_writers,
2026                        ldlm_lockname[lock->l_granted_mode],
2027                        ldlm_lockname[lock->l_req_mode],
2028                        lock->l_resource->lr_name.name[0],
2029                        lock->l_resource->lr_name.name[1],
2030                        cfs_atomic_read(&lock->l_resource->lr_refcount),
2031                        ldlm_typename[lock->l_resource->lr_type],
2032                        lock->l_policy_data.l_flock.pid,
2033                        lock->l_policy_data.l_flock.start,
2034                        lock->l_policy_data.l_flock.end,
2035                        lock->l_flags, lock->l_remote_handle.cookie,
2036                        lock->l_export ?
2037                        cfs_atomic_read(&lock->l_export->exp_refcount) : -99,
2038                        lock->l_pid, lock->l_callback_timeout);
2039                 break;
2040
2041         case LDLM_IBITS:
2042                 libcfs_debug_vmsg2(msgdata, fmt, args,
2043                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2044                        "res: "LPU64"/"LPU64" bits "LPX64" rrc: %d type: %s "
2045                        "flags: "LPX64" remote: "LPX64" expref: %d "
2046                        "pid: %u timeout: %lu\n",
2047                        ldlm_lock_to_ns_name(lock),
2048                        lock, lock->l_handle.h_cookie,
2049                        cfs_atomic_read (&lock->l_refc),
2050                        lock->l_readers, lock->l_writers,
2051                        ldlm_lockname[lock->l_granted_mode],
2052                        ldlm_lockname[lock->l_req_mode],
2053                        lock->l_resource->lr_name.name[0],
2054                        lock->l_resource->lr_name.name[1],
2055                        lock->l_policy_data.l_inodebits.bits,
2056                        cfs_atomic_read(&lock->l_resource->lr_refcount),
2057                        ldlm_typename[lock->l_resource->lr_type],
2058                        lock->l_flags, lock->l_remote_handle.cookie,
2059                        lock->l_export ?
2060                        cfs_atomic_read(&lock->l_export->exp_refcount) : -99,
2061                        lock->l_pid, lock->l_callback_timeout);
2062                 break;
2063
2064         default:
2065                 libcfs_debug_vmsg2(msgdata, fmt, args,
2066                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2067                        "res: "LPU64"/"LPU64" rrc: %d type: %s flags: "LPX64" "
2068                        "remote: "LPX64" expref: %d pid: %u timeout %lu\n",
2069                        ldlm_lock_to_ns_name(lock),
2070                        lock, lock->l_handle.h_cookie,
2071                        cfs_atomic_read (&lock->l_refc),
2072                        lock->l_readers, lock->l_writers,
2073                        ldlm_lockname[lock->l_granted_mode],
2074                        ldlm_lockname[lock->l_req_mode],
2075                        lock->l_resource->lr_name.name[0],
2076                        lock->l_resource->lr_name.name[1],
2077                        cfs_atomic_read(&lock->l_resource->lr_refcount),
2078                        ldlm_typename[lock->l_resource->lr_type],
2079                        lock->l_flags, lock->l_remote_handle.cookie,
2080                        lock->l_export ?
2081                        cfs_atomic_read(&lock->l_export->exp_refcount) : -99,
2082                        lock->l_pid, lock->l_callback_timeout);
2083                 break;
2084         }
2085         va_end(args);
2086 }
2087 EXPORT_SYMBOL(_ldlm_lock_debug);