/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *   Author: Peter Braam <braam@clusterfs.com>
 *   Author: Phil Schwan <phil@clusterfs.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
# include <linux/slab.h>
# include <linux/module.h>
# include <linux/lustre_dlm.h>
#else
# include <liblustre.h>
# include <linux/kp30.h>
#endif

#include <linux/obd_class.h>
#include "ldlm_internal.h"

//struct lustre_lock ldlm_everything_lock;

/* lock types */
char *ldlm_lockname[] = {
        [0] = "--",
        [LCK_EX] = "EX",
        [LCK_PW] = "PW",
        [LCK_PR] = "PR",
        [LCK_CW] = "CW",
        [LCK_CR] = "CR",
        [LCK_NL] = "NL"
};
char *ldlm_typename[] = {
        [LDLM_PLAIN] = "PLN",
        [LDLM_EXTENT] = "EXT",
        [LDLM_FLOCK] = "FLK",
};

char *ldlm_it2str(int it)
{
        switch (it) {
        case IT_OPEN:
                return "open";
        case IT_CREAT:
                return "creat";
        case (IT_OPEN | IT_CREAT):
                return "open|creat";
        case IT_READDIR:
                return "readdir";
        case IT_GETATTR:
                return "getattr";
        case IT_LOOKUP:
                return "lookup";
        case IT_UNLINK:
                return "unlink";
        case IT_GETXATTR:
                return "getxattr";
        default:
                CERROR("Unknown intent %d\n", it);
                return "UNKNOWN";
        }
}

extern kmem_cache_t *ldlm_lock_slab;
struct lustre_lock ldlm_handle_lock;

static ldlm_processing_policy ldlm_processing_policy_table[] = {
        [LDLM_PLAIN] = ldlm_process_plain_lock,
        [LDLM_EXTENT] = ldlm_process_extent_lock,
#ifdef __KERNEL__
        [LDLM_FLOCK] = ldlm_process_flock_lock,
#endif
};

static ldlm_res_policy ldlm_intent_policy_func;

void ldlm_register_intent(ldlm_res_policy arg)
{
        ldlm_intent_policy_func = arg;
}

void ldlm_unregister_intent(void)
{
        ldlm_intent_policy_func = NULL;
}

/*
 * REFCOUNTED LOCK OBJECTS
 */


/*
 * Lock refcounts, during creation:
 *   - one special one for allocation, dec'd only once in destroy
 *   - one for being a lock that's in-use
 *   - one for the addref associated with a new lock
 */
struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
{
        atomic_inc(&lock->l_refc);
        return lock;
}

void ldlm_lock_put(struct ldlm_lock *lock)
{
        ENTRY;

        if (atomic_dec_and_test(&lock->l_refc)) {
                struct ldlm_namespace *ns = lock->l_resource->lr_namespace;

                l_lock(&ns->ns_lock);
                LDLM_DEBUG(lock, "final lock_put on destroyed lock, freeing");
                LASSERT(lock->l_destroyed);
                LASSERT(list_empty(&lock->l_res_link));

                spin_lock(&ns->ns_counter_lock);
                ns->ns_locks--;
                spin_unlock(&ns->ns_counter_lock);

                ldlm_resource_putref(lock->l_resource);
                lock->l_resource = NULL;
                if (lock->l_export)
                        class_export_put(lock->l_export);

                if (lock->l_parent)
                        LDLM_LOCK_PUT(lock->l_parent);

                OBD_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock));
                l_unlock(&ns->ns_lock);
        }

        EXIT;
}
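
/*
 * Illustrative sketch (not part of the original file): how the get/put
 * pair above is meant to be used.  A caller that must keep a lock alive
 * across a blocking operation takes its own reference first; the final
 * put only frees the lock once ldlm_lock_destroy() has dropped the
 * allocation reference.  The helper name is hypothetical.
 */
#if 0
static void example_pin_lock(struct ldlm_lock *lock)
{
        LDLM_LOCK_GET(lock);            /* private ref: lock can't be freed */

        /* ... inspect the lock, sleep on lock->l_waitq, etc ... */

        LDLM_LOCK_PUT(lock);            /* frees only if destroyed and last */
}
#endif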

void ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
{
        ENTRY;
        l_lock(&lock->l_resource->lr_namespace->ns_lock);
        if (!list_empty(&lock->l_lru)) {
                list_del_init(&lock->l_lru);
                lock->l_resource->lr_namespace->ns_nr_unused--;
                LASSERT(lock->l_resource->lr_namespace->ns_nr_unused >= 0);
        }
        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
        EXIT;
}

/* This used to have a 'strict' flag, which recovery would use to mark an
 * in-use lock as needing-to-die.  Lest I am ever tempted to put it back, I
 * shall explain why it's gone: with the new hash table scheme, once you call
 * ldlm_lock_destroy, you can never drop your final references on this lock,
 * because it's not in the hash table anymore.  -phil */
void ldlm_lock_destroy(struct ldlm_lock *lock)
{
        ENTRY;
        l_lock(&lock->l_resource->lr_namespace->ns_lock);

        if (!list_empty(&lock->l_children)) {
                LDLM_ERROR(lock, "still has children (%p)!",
                           lock->l_children.next);
                ldlm_lock_dump(D_ERROR, lock, 0);
                LBUG();
        }
        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                ldlm_lock_dump(D_ERROR, lock, 0);
                LBUG();
        }

        if (!list_empty(&lock->l_res_link)) {
                ldlm_lock_dump(D_ERROR, lock, 0);
                LBUG();
        }

        if (lock->l_destroyed) {
                LASSERT(list_empty(&lock->l_lru));
                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                EXIT;
                return;
        }
        lock->l_destroyed = 1;

        list_del_init(&lock->l_export_chain);
        ldlm_lock_remove_from_lru(lock);
        class_handle_unhash(&lock->l_handle);

#if 0
        /* Wake anyone waiting for this lock */
        /* FIXME: I should probably add yet another flag, instead of using
         * l_export to only call this on clients */
        if (lock->l_export)
                class_export_put(lock->l_export);
        lock->l_export = NULL;
        if (lock->l_export && lock->l_completion_ast)
                lock->l_completion_ast(lock, 0);
#endif

        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
        LDLM_LOCK_PUT(lock);
        EXIT;
}

/* this is called by portals_handle2object with the handle lock taken */
static void lock_handle_addref(void *lock)
{
        LDLM_LOCK_GET((struct ldlm_lock *)lock);
}

/*
 * usage: pass in a resource on which you have done ldlm_resource_get
 *        pass in a parent lock on which you have done a ldlm_lock_get
 *        after return, ldlm_*_put the resource and parent
 * returns: lock with refcount 1
 */
static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
                                       struct ldlm_resource *resource)
{
        struct ldlm_lock *lock;
        ENTRY;

        if (resource == NULL)
                LBUG();

        OBD_SLAB_ALLOC(lock, ldlm_lock_slab, SLAB_NOFS, sizeof(*lock));
        if (lock == NULL)
                RETURN(NULL);

        lock->l_resource = ldlm_resource_getref(resource);

        atomic_set(&lock->l_refc, 2);
        INIT_LIST_HEAD(&lock->l_children);
        INIT_LIST_HEAD(&lock->l_res_link);
        INIT_LIST_HEAD(&lock->l_lru);
        INIT_LIST_HEAD(&lock->l_export_chain);
        INIT_LIST_HEAD(&lock->l_pending_chain);
        init_waitqueue_head(&lock->l_waitq);

        spin_lock(&resource->lr_namespace->ns_counter_lock);
        resource->lr_namespace->ns_locks++;
        spin_unlock(&resource->lr_namespace->ns_counter_lock);

        if (parent != NULL) {
                l_lock(&parent->l_resource->lr_namespace->ns_lock);
                lock->l_parent = LDLM_LOCK_GET(parent);
                list_add(&lock->l_childof, &parent->l_children);
                l_unlock(&parent->l_resource->lr_namespace->ns_lock);
        }

        INIT_LIST_HEAD(&lock->l_handle.h_link);
        class_handle_hash(&lock->l_handle, lock_handle_addref);

        RETURN(lock);
}

int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                              struct ldlm_res_id new_resid)
{
        struct ldlm_resource *oldres = lock->l_resource;
        ENTRY;

        l_lock(&ns->ns_lock);
        if (memcmp(&new_resid, &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) == 0) {
                /* Nothing to do */
                l_unlock(&ns->ns_lock);
                RETURN(0);
        }

        LASSERT(new_resid.name[0] != 0);

        /* This function assumes that the lock isn't on any lists */
        LASSERT(list_empty(&lock->l_res_link));

        lock->l_resource = ldlm_resource_get(ns, NULL, new_resid,
                                             lock->l_resource->lr_type, 1);
        if (lock->l_resource == NULL) {
                LBUG();
                RETURN(-ENOMEM);
        }

        /* ...and the flowers are still standing! */
        ldlm_resource_putref(oldres);

        l_unlock(&ns->ns_lock);
        RETURN(0);
}

/*
 *  HANDLES
 */

void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh)
{
        lockh->cookie = lock->l_handle.h_cookie;
}
/* If 'flags' is nonzero: atomically get the lock and set those flags.
 * Return NULL if any of the flags is already set.
 */

struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, int flags)
{
        struct ldlm_namespace *ns;
        struct ldlm_lock *lock = NULL, *retval = NULL;
        ENTRY;

        LASSERT(handle);

        lock = class_handle2object(handle->cookie);
        if (lock == NULL)
                RETURN(NULL);

        LASSERT(lock->l_resource != NULL);
        ns = lock->l_resource->lr_namespace;
        LASSERT(ns != NULL);

        l_lock(&ns->ns_lock);

        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if (lock->l_destroyed) {
                CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
                LDLM_LOCK_PUT(lock);
                GOTO(out, retval);
        }

        if (flags && (lock->l_flags & flags)) {
                LDLM_LOCK_PUT(lock);
                GOTO(out, retval);
        }

        if (flags)
                lock->l_flags |= flags;

        retval = lock;
        EXIT;
 out:
        l_unlock(&ns->ns_lock);
        return retval;
}

struct ldlm_lock *ldlm_handle2lock_ns(struct ldlm_namespace *ns,
                                      struct lustre_handle *handle)
{
        struct ldlm_lock *retval = NULL;

        l_lock(&ns->ns_lock);
        retval = __ldlm_handle2lock(handle, 0);
        l_unlock(&ns->ns_lock);

        return retval;
}
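
/*
 * Illustrative sketch (not part of the original file): the usual handle
 * round trip.  ldlm_handle2lock() (the flags == 0 wrapper around
 * __ldlm_handle2lock()) returns a referenced lock or NULL, so every
 * successful lookup must be balanced with LDLM_LOCK_PUT().  The helper
 * name and the -ESTALE return are hypothetical.
 */
#if 0
static int example_handle_roundtrip(struct ldlm_lock *lock)
{
        struct lustre_handle lockh;
        struct ldlm_lock *found;

        ldlm_lock2handle(lock, &lockh);

        found = ldlm_handle2lock(&lockh);
        if (found == NULL)
                return -ESTALE; /* destroyed since the handle was made */

        LASSERT(found == lock);
        LDLM_LOCK_PUT(found);   /* drop the ref taken by handle2lock */
        return 0;
}
#endif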

void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
{
        ldlm_res2desc(lock->l_resource, &desc->l_resource);
        desc->l_req_mode = lock->l_req_mode;
        desc->l_granted_mode = lock->l_granted_mode;
        memcpy(&desc->l_policy_data, &lock->l_policy_data,
               sizeof(desc->l_policy_data));
        memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
}

void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                            void *data, int datalen)
{
        struct ldlm_ast_work *w;
        ENTRY;

        l_lock(&lock->l_resource->lr_namespace->ns_lock);
        if (new && (lock->l_flags & LDLM_FL_AST_SENT))
                GOTO(out, 0);

        CDEBUG(D_OTHER, "lock %p incompatible; sending blocking AST.\n", lock);

        OBD_ALLOC(w, sizeof(*w));
        if (!w) {
                LBUG();
                GOTO(out, 0);
        }

        w->w_data = data;
        w->w_datalen = datalen;
        if (new) {
                LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
                lock->l_flags |= LDLM_FL_AST_SENT;
                /* If the enqueuing client said so, tell the AST recipient to
                 * discard dirty data, rather than writing back. */
                if (new->l_flags & LDLM_AST_DISCARD_DATA)
                        lock->l_flags |= LDLM_FL_DISCARD_DATA;
                w->w_blocking = 1;
                ldlm_lock2desc(new, &w->w_desc);
        }

        w->w_lock = LDLM_LOCK_GET(lock);
        list_add(&w->w_list, lock->l_resource->lr_tmp);
        EXIT;
 out:
        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
}

void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;

        lock = ldlm_handle2lock(lockh);
        ldlm_lock_addref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}

/* only called for local locks */
void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
{
        l_lock(&lock->l_resource->lr_namespace->ns_lock);
        ldlm_lock_remove_from_lru(lock);
        if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
                lock->l_readers++;
        else
                lock->l_writers++;
        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
        LDLM_LOCK_GET(lock);
        LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}

void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
{
        struct ldlm_namespace *ns;
        ENTRY;

        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        ns = lock->l_resource->lr_namespace;
        l_lock(&ns->ns_lock);
        if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR) {
                LASSERT(lock->l_readers > 0);
                lock->l_readers--;
        } else {
                LASSERT(lock->l_writers > 0);
                lock->l_writers--;
        }

        if (lock->l_flags & LDLM_FL_LOCAL &&
            !lock->l_readers && !lock->l_writers) {
                /* If this is a local lock on a server namespace and this was
                 * the last reference, cancel the lock. */
                CDEBUG(D_INFO, "forcing cancel of local lock\n");
                lock->l_flags |= LDLM_FL_CBPENDING;
        }

        if (!lock->l_readers && !lock->l_writers &&
            (lock->l_flags & LDLM_FL_CBPENDING)) {
                /* If we received a blocking AST and this was the last
                 * reference, run the callback. */
                if (!ns->ns_client && lock->l_export)
                        CERROR("FL_CBPENDING set on non-local lock--just a "
                               "warning\n");

                LDLM_DEBUG(lock, "final decref done on cbpending lock");
                l_unlock(&ns->ns_lock);

                l_check_no_ns_lock(ns);
                /* FIXME: need a real 'desc' here */
                if (lock->l_blocking_ast != NULL)
                        lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
                                             LDLM_CB_BLOCKING);
        } else if (ns->ns_client && !lock->l_readers && !lock->l_writers) {
                /* If this is a client-side namespace and this was the last
                 * reference, put it on the LRU. */
                LASSERT(list_empty(&lock->l_lru));
                LASSERT(ns->ns_nr_unused >= 0);
                list_add_tail(&lock->l_lru, &ns->ns_unused_list);
                ns->ns_nr_unused++;
                l_unlock(&ns->ns_lock);
                ldlm_cancel_lru(ns);
        } else {
                l_unlock(&ns->ns_lock);
        }

        LDLM_LOCK_PUT(lock);    /* matches the ldlm_lock_get in addref */

        EXIT;
}

void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        LASSERT(lock != NULL);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
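
/*
 * Illustrative sketch (not part of the original file): reader/writer
 * references pair by mode.  NL, CR and PR count as reader references;
 * EX, PW and CW count as writer references.  A decref in a mode that was
 * never addref'd trips the LASSERTs above.  The helper name is
 * hypothetical.
 */
#if 0
static void example_use_lock(struct lustre_handle *lockh)
{
        ldlm_lock_addref(lockh, LCK_PR);        /* reader reference */

        /* ... access the data this lock protects ... */

        ldlm_lock_decref(lockh, LCK_PR);        /* last decref may move the
                                                 * lock to the client LRU */
}
#endif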

/* This will drop a lock reference and mark it for destruction, but will not
 * necessarily cancel the lock before returning. */
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
        ENTRY;

        LASSERT(lock != NULL);

        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        l_lock(&lock->l_resource->lr_namespace->ns_lock);
        lock->l_flags |= LDLM_FL_CBPENDING;
        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}

/* NOTE: called by
 *  - ldlm_lock_enqueue
 *  - ldlm_reprocess_queue
 *  - ldlm_lock_convert
 */
void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen,
                     int run_ast)
{
        struct ldlm_resource *res = lock->l_resource;
        ENTRY;

        l_lock(&lock->l_resource->lr_namespace->ns_lock);
        lock->l_granted_mode = lock->l_req_mode;
        ldlm_resource_add_lock(res, &res->lr_granted, lock);

        if (lock->l_granted_mode < res->lr_most_restr)
                res->lr_most_restr = lock->l_granted_mode;

        if (run_ast && lock->l_completion_ast != NULL)
                ldlm_add_ast_work_item(lock, NULL, data, datalen);

        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
        EXIT;
}

/* returns a referenced lock or NULL.  See the flag descriptions below, in the
 * comment above ldlm_lock_match */
static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                                      struct ldlm_extent *extent,
                                      struct ldlm_lock *old_lock, int flags)
{
        struct ldlm_lock *lock;
        struct list_head *tmp;

        list_for_each(tmp, queue) {
                lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lock == old_lock)
                        break;

                /* llite sometimes wants to match locks that will be
                 * canceled when their users drop, but we allow it to match
                 * if it passes in CBPENDING and the lock still has users.
                 * this is generally only going to be used by children
                 * whose parents already hold a lock so forward progress
                 * can still happen. */
                if (lock->l_flags & LDLM_FL_CBPENDING &&
                    !(flags & LDLM_FL_CBPENDING))
                        continue;
                if (lock->l_flags & LDLM_FL_CBPENDING &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        continue;

                if (lock->l_req_mode != mode)
                        continue;

                if (lock->l_resource->lr_type == LDLM_EXTENT &&
                    (lock->l_policy_data.l_extent.start > extent->start ||
                     lock->l_policy_data.l_extent.end < extent->end))
                        continue;

                if (lock->l_destroyed)
                        continue;

                if ((flags & LDLM_FL_LOCAL_ONLY) &&
                    !(lock->l_flags & LDLM_FL_LOCAL))
                        continue;

                ldlm_lock_addref_internal(lock, mode);
                return lock;
        }

        return NULL;
}

/* Can be called in two ways:
 *
 * If 'ns' is NULL, then lockh describes an existing lock that we want to look
 * for a duplicate of.
 *
 * Otherwise, all of the fields must be filled in, to match against.
 *
 * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
 *     server (i.e., connh is NULL)
 * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
 *     list will be considered
 * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
 *     to be canceled can still be matched as long as they still have reader
 *     or writer references
 *
 * Returns 1 if it finds an already-existing lock that is compatible; in this
 * case, lockh is filled in with an addref()ed lock
 */
int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
                    struct ldlm_res_id *res_id, __u32 type, void *cookie,
                    int cookielen, ldlm_mode_t mode,
                    struct lustre_handle *lockh)
{
        struct ldlm_resource *res;
        struct ldlm_lock *lock, *old_lock = NULL;
        int rc = 0;
        ENTRY;

        if (ns == NULL) {
                old_lock = ldlm_handle2lock(lockh);
                LASSERT(old_lock);

                ns = old_lock->l_resource->lr_namespace;
                res_id = &old_lock->l_resource->lr_name;
                type = old_lock->l_resource->lr_type;
                mode = old_lock->l_req_mode;
        }

        res = ldlm_resource_get(ns, NULL, *res_id, type, 0);
        if (res == NULL) {
                LASSERT(old_lock == NULL);
                RETURN(0);
        }

        l_lock(&ns->ns_lock);

        lock = search_queue(&res->lr_granted, mode, cookie, old_lock, flags);
        if (lock != NULL)
                GOTO(out, rc = 1);
        if (flags & LDLM_FL_BLOCK_GRANTED)
                GOTO(out, rc = 0);
        lock = search_queue(&res->lr_converting, mode, cookie, old_lock, flags);
        if (lock != NULL)
                GOTO(out, rc = 1);
        lock = search_queue(&res->lr_waiting, mode, cookie, old_lock, flags);
        if (lock != NULL)
                GOTO(out, rc = 1);

        EXIT;
 out:
        ldlm_resource_putref(res);
        l_unlock(&ns->ns_lock);

        if (lock) {
                ldlm_lock2handle(lock, lockh);
                if (lock->l_completion_ast)
                        lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC,
                                               NULL);
        }
        if (rc)
                LDLM_DEBUG(lock, "matched");
        else
                LDLM_DEBUG_NOLOCK("not matched");

        if (old_lock)
                LDLM_LOCK_PUT(old_lock);

        return rc;
}
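
/*
 * Illustrative sketch (not part of the original file): matching an
 * already-granted extent lock before enqueueing a new one.  The resource
 * name and extent are made-up example values, and the helper name is
 * hypothetical.
 */
#if 0
static int example_match(struct ldlm_namespace *ns)
{
        struct ldlm_res_id res_id = { .name = { 0x1234 } };
        struct ldlm_extent extent = { .start = 0, .end = ~0 };
        struct lustre_handle lockh;

        if (ldlm_lock_match(ns, LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_EXTENT,
                            &extent, sizeof(extent), LCK_PR, &lockh)) {
                /* lockh now holds an addref'd lock; drop it when done */
                ldlm_lock_decref(&lockh, LCK_PR);
                return 1;
        }
        return 0;
}
#endif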

/* Returns a referenced lock */
struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
                                   struct lustre_handle *parent_lock_handle,
                                   struct ldlm_res_id res_id, __u32 type,
                                   ldlm_mode_t mode,
                                   ldlm_blocking_callback blocking,
                                   ldlm_completion_callback completion,
                                   void *data)
{
        struct ldlm_resource *res, *parent_res = NULL;
        struct ldlm_lock *lock, *parent_lock = NULL;
        ENTRY;

        if (parent_lock_handle) {
                parent_lock = ldlm_handle2lock(parent_lock_handle);
                if (parent_lock)
                        parent_res = parent_lock->l_resource;
        }

        res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
        if (res == NULL)
                RETURN(NULL);

        lock = ldlm_lock_new(parent_lock, res);
        ldlm_resource_putref(res);
        if (parent_lock != NULL)
                LDLM_LOCK_PUT(parent_lock);

        if (lock == NULL)
                RETURN(NULL);

        lock->l_req_mode = mode;
        lock->l_ast_data = data;
        lock->l_blocking_ast = blocking;
        lock->l_completion_ast = completion;

        RETURN(lock);
}

ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                               struct ldlm_lock **lockp,
                               void *cookie, int cookie_len, int *flags)
{
        struct ldlm_lock *lock = *lockp;
        struct ldlm_resource *res = lock->l_resource;
        int local = res->lr_namespace->ns_client;
        ldlm_processing_policy policy;
        ldlm_error_t rc = ELDLM_OK;
        ENTRY;

        if (res->lr_type != LDLM_PLAIN)
                memcpy(&lock->l_policy_data, cookie, cookie_len);

        /* policies are not executed on the client or during replay */
        if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
            && !local && ldlm_intent_policy_func) {
                rc = ldlm_intent_policy_func(ns, lockp, cookie,
                                             lock->l_req_mode, *flags, NULL);
                if (rc == ELDLM_LOCK_REPLACED) {
                        /* The lock that was returned has already been granted,
                         * and placed into lockp.  Destroy the old one and our
                         * work here is done. */
                        ldlm_lock_destroy(lock);
                        LDLM_LOCK_PUT(lock);
                        *flags |= LDLM_FL_LOCK_CHANGED;
                        RETURN(0);
                } else if (rc == ELDLM_LOCK_ABORTED ||
                           (rc == 0 && (*flags & LDLM_FL_INTENT_ONLY))) {
                        ldlm_lock_destroy(lock);
                        RETURN(rc);
                }
                LASSERT(rc == ELDLM_OK);
        }

        l_lock(&ns->ns_lock);
        if (local && lock->l_req_mode == lock->l_granted_mode) {
                /* The server returned a blocked lock, but it was granted before
                 * we got a chance to actually enqueue it.  We don't need to do
                 * anything else. */
                *flags &= ~(LDLM_FL_BLOCK_GRANTED |
                            LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
                GOTO(out, ELDLM_OK);
        }

        /* Some flags from the enqueue want to make it into the AST, via the
         * lock's l_flags. */
        lock->l_flags |= (*flags & LDLM_AST_DISCARD_DATA);

        /* This distinction between local lock trees is very important; a client
         * namespace only has information about locks taken by that client, and
         * thus doesn't have enough information to decide for itself if it can
         * be granted (below).  In this case, we do exactly what the server
         * tells us to do, as dictated by the 'flags'.
         *
         * We do exactly the same thing during recovery, when the server is
         * more or less trusting the clients not to lie.
         *
         * FIXME (bug 268): Detect obvious lies by checking compatibility in
         * granted/converting queues. */
        ldlm_resource_unlink_lock(lock);
        if (local) {
                if (*flags & LDLM_FL_BLOCK_CONV)
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                else
                        ldlm_grant_lock(lock, NULL, 0, 0);
                GOTO(out, ELDLM_OK);
        } else if (*flags & LDLM_FL_REPLAY) {
                if (*flags & LDLM_FL_BLOCK_CONV) {
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                        GOTO(out, ELDLM_OK);
                } else if (*flags & LDLM_FL_BLOCK_WAIT) {
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                        GOTO(out, ELDLM_OK);
                } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
                        ldlm_grant_lock(lock, NULL, 0, 0);
                        GOTO(out, ELDLM_OK);
                }
                /* If no flags, fall through to normal enqueue path. */
        }

        policy = ldlm_processing_policy_table[res->lr_type];
        policy(lock, flags, 1, &rc);
        EXIT;
out:
        l_unlock(&ns->ns_lock);
        return rc;
}
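
/*
 * Illustrative sketch (not part of the original file): the
 * create-then-enqueue sequence for a plain lock with no parent.  The NULL
 * callbacks and the resource name are example simplifications; real
 * callers pass blocking/completion ASTs.  The helper name is
 * hypothetical.
 */
#if 0
static int example_enqueue(struct ldlm_namespace *ns)
{
        struct ldlm_res_id res_id = { .name = { 0x1234 } };
        struct ldlm_lock *lock;
        int flags = 0;
        ldlm_error_t err;

        lock = ldlm_lock_create(ns, NULL, res_id, LDLM_PLAIN, LCK_EX,
                                NULL, NULL, NULL);
        if (lock == NULL)
                return -ENOMEM;

        err = ldlm_lock_enqueue(ns, &lock, NULL, 0, &flags);
        /* on ELDLM_OK the lock is granted or queued, per the returned
         * LDLM_FL_BLOCK_* bits in 'flags' */
        return err;
}
#endif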

/* Must be called with namespace taken: queue is waiting or converting. */
int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue)
{
        struct list_head *tmp, *pos;
        ldlm_processing_policy policy;
        int flags;
        int rc = LDLM_ITER_CONTINUE;
        ldlm_error_t err;
        ENTRY;

        policy = ldlm_processing_policy_table[res->lr_type];
        LASSERT(policy);

        list_for_each_safe(tmp, pos, queue) {
                struct ldlm_lock *pending;
                pending = list_entry(tmp, struct ldlm_lock, l_res_link);

                CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);

                flags = 0;
                rc = policy(pending, &flags, 0, &err);
                if (rc != LDLM_ITER_CONTINUE)
                        break;
        }

        RETURN(rc);
}

int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list)
{
        struct list_head *tmp, *pos;
        int rc, retval = 0;
        ENTRY;

        l_check_no_ns_lock(ns);

        list_for_each_safe(tmp, pos, rpc_list) {
                struct ldlm_ast_work *w =
                        list_entry(tmp, struct ldlm_ast_work, w_list);

                /* It's possible to receive a completion AST before we've set
                 * the l_completion_ast pointer: either because the AST arrived
                 * before the reply, or simply because there's a small race
                 * window between receiving the reply and finishing the local
                 * enqueue. (bug 842)
                 *
                 * This can't happen with the blocking_ast, however, because we
                 * will never call the local blocking_ast until we drop our
                 * reader/writer reference, which we won't do until we get the
                 * reply and finish enqueueing. */
                LASSERT(w->w_lock != NULL);
                if (w->w_blocking) {
                        LASSERT(w->w_lock->l_blocking_ast != NULL);
                        rc = w->w_lock->l_blocking_ast
                                (w->w_lock, &w->w_desc, w->w_data,
                                 LDLM_CB_BLOCKING);
                } else if (w->w_lock->l_completion_ast != NULL) {
                        rc = w->w_lock->l_completion_ast(w->w_lock, w->w_flags,
                                                         w->w_data);
                } else {
                        rc = 0;
                }
                if (rc == -ERESTART)
                        retval = rc;
                else if (rc)
                        CDEBUG(D_DLMTRACE, "Failed AST - should clean & "
                               "disconnect client\n");
                LDLM_LOCK_PUT(w->w_lock);
                list_del(&w->w_list);
                OBD_FREE(w, sizeof(*w));
        }
        RETURN(retval);
}

static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
{
        ldlm_reprocess_all(res);
        return LDLM_ITER_CONTINUE;
}

void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
{
        int i, rc;

        l_lock(&ns->ns_lock);
        for (i = 0; i < RES_HASH_SIZE; i++) {
                struct list_head *tmp, *next;
                list_for_each_safe(tmp, next, &(ns->ns_hash[i])) {
                        struct ldlm_resource *res =
                                list_entry(tmp, struct ldlm_resource, lr_hash);

                        ldlm_resource_getref(res);
                        l_unlock(&ns->ns_lock);
                        rc = reprocess_one_queue(res, NULL);
                        l_lock(&ns->ns_lock);
                        next = tmp->next;
                        ldlm_resource_putref(res);
                        if (rc == LDLM_ITER_STOP)
                                GOTO(out, rc);
                }
        }
 out:
        l_unlock(&ns->ns_lock);
        EXIT;
}

void ldlm_reprocess_all(struct ldlm_resource *res)
{
        struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
        int rc;
        ENTRY;

        /* Local lock trees don't get reprocessed. */
        if (res->lr_namespace->ns_client) {
                EXIT;
                return;
        }

 restart:
        l_lock(&res->lr_namespace->ns_lock);
        res->lr_tmp = &rpc_list;

        rc = ldlm_reprocess_queue(res, &res->lr_converting);
        if (rc == LDLM_ITER_CONTINUE)
                ldlm_reprocess_queue(res, &res->lr_waiting);

        res->lr_tmp = NULL;
        l_unlock(&res->lr_namespace->ns_lock);

        rc = ldlm_run_ast_work(res->lr_namespace, &rpc_list);
        if (rc == -ERESTART) {
                LASSERT(list_empty(&rpc_list));
                goto restart;
        }
        EXIT;
}

void ldlm_cancel_callback(struct ldlm_lock *lock)
{
        l_lock(&lock->l_resource->lr_namespace->ns_lock);
        if (!(lock->l_flags & LDLM_FL_CANCEL)) {
                lock->l_flags |= LDLM_FL_CANCEL;
                if (lock->l_blocking_ast) {
                        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                        // l_check_no_ns_lock(lock->l_resource->lr_namespace);
                        lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
                                             LDLM_CB_CANCELING);
                        return;
                } else {
                        LDLM_DEBUG(lock, "no blocking ast");
                }
        }
        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
}

void ldlm_lock_cancel(struct ldlm_lock *lock)
{
        struct ldlm_resource *res;
        struct ldlm_namespace *ns;
        ENTRY;

        /* There's no race between calling this and taking the ns lock below;
         * a lock can only be put on the waiting list once, because it can only
         * issue a blocking AST once. */
        ldlm_del_waiting_lock(lock);

        res = lock->l_resource;
        ns = res->lr_namespace;

        l_lock(&ns->ns_lock);
        /* Please do not, no matter how tempting, remove this LBUG without
         * talking to me first. -phik */
        if (lock->l_readers || lock->l_writers) {
                LDLM_DEBUG(lock, "lock still has references");
                ldlm_lock_dump(D_OTHER, lock, 0);
                LBUG();
        }

        ldlm_cancel_callback(lock);

        ldlm_resource_unlink_lock(lock);
        ldlm_lock_destroy(lock);
        l_unlock(&ns->ns_lock);
        EXIT;
}

int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);
        ENTRY;

        if (lock == NULL)
                RETURN(-EINVAL);

        lock->l_ast_data = data;
        LDLM_LOCK_PUT(lock);
        RETURN(0);
}

void ldlm_cancel_locks_for_export(struct obd_export *exp)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_lock *lock;
        struct ldlm_resource *res;

        l_lock(&ns->ns_lock);
        while (!list_empty(&exp->exp_ldlm_data.led_held_locks)) {
                lock = list_entry(exp->exp_ldlm_data.led_held_locks.next,
                                  struct ldlm_lock, l_export_chain);
                res = ldlm_resource_getref(lock->l_resource);
                LDLM_DEBUG(lock, "export %p", exp);
                ldlm_lock_cancel(lock);
                l_unlock(&ns->ns_lock);
                ldlm_reprocess_all(res);
                ldlm_resource_putref(res);
                l_lock(&ns->ns_lock);
        }
        l_unlock(&ns->ns_lock);
}

struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                                        int *flags)
{
        struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
        struct ldlm_resource *res;
        struct ldlm_namespace *ns;
        int granted = 0;
        ENTRY;

        LBUG();

        res = lock->l_resource;
        ns = res->lr_namespace;

        l_lock(&ns->ns_lock);

        lock->l_req_mode = new_mode;
        ldlm_resource_unlink_lock(lock);

        /* If this is a local resource, put it on the appropriate list. */
        if (res->lr_namespace->ns_client) {
                if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                } else {
                        /* This should never happen, because of the way the
                         * server handles conversions. */
                        LBUG();

                        res->lr_tmp = &rpc_list;
                        ldlm_grant_lock(lock, NULL, 0, 0);
                        res->lr_tmp = NULL;
                        granted = 1;
                        /* FIXME: completion handling not with ns_lock held ! */
                        if (lock->l_completion_ast)
                                lock->l_completion_ast(lock, 0, NULL);
                }
        } else {
                /* FIXME: We should try the conversion right away and possibly
                 * return success without the need for an extra AST */
                ldlm_resource_add_lock(res, &res->lr_converting, lock);
                *flags |= LDLM_FL_BLOCK_CONV;
        }

        l_unlock(&ns->ns_lock);

        if (granted)
                ldlm_run_ast_work(ns, &rpc_list);
        RETURN(res);
}

void ldlm_lock_dump(int level, struct ldlm_lock *lock, int pos)
{
        char str[PTL_NALFMT_SIZE];
        struct obd_device *obd = NULL;

        if (!((portal_debug | D_ERROR) & level))
                return;

        if (RES_VERSION_SIZE != 4)
                LBUG();

        if (!lock) {
                CDEBUG(level, "  NULL LDLM lock\n");
                return;
        }

        CDEBUG(level,
               "  -- Lock dump: %p/"LPX64" (%x %x %x %x) (rc: %d) (pos: %d)\n",
               lock, lock->l_handle.h_cookie, lock->l_version[0],
               lock->l_version[1], lock->l_version[2], lock->l_version[3],
               atomic_read(&lock->l_refc), pos);
        if (lock->l_conn_export != NULL)
                obd = lock->l_conn_export->exp_obd;
        if (lock->l_export && lock->l_export->exp_connection) {
                CDEBUG(level, "  Node: NID "LPX64" (%s) on %s (rhandle: "LPX64")\n",
                       lock->l_export->exp_connection->c_peer.peer_nid,
                       portals_nid2str(lock->l_export->exp_connection->c_peer.peer_ni->pni_number,
                                       lock->l_export->exp_connection->c_peer.peer_nid, str),
                       lock->l_export->exp_connection->c_peer.peer_ni->pni_name,
                       lock->l_remote_handle.cookie);
        } else if (obd == NULL) {
                CDEBUG(level, "  Node: local\n");
        } else {
                struct obd_import *imp = obd->u.cli.cl_import;
                CDEBUG(level, "  Node: NID "LPX64" (%s) on %s (rhandle: "LPX64")\n",
                       imp->imp_connection->c_peer.peer_nid,
                       portals_nid2str(imp->imp_connection->c_peer.peer_ni->pni_number,
                                       imp->imp_connection->c_peer.peer_nid, str),
                       imp->imp_connection->c_peer.peer_ni->pni_name,
                       lock->l_remote_handle.cookie);
        }
        CDEBUG(level, "  Resource: %p ("LPU64"/"LPU64")\n", lock->l_resource,
               lock->l_resource->lr_name.name[0],
               lock->l_resource->lr_name.name[1]);
        CDEBUG(level, "  Req mode: %d, grant mode: %d, rc: %u, read: %d, "
               "write: %d\n", (int)lock->l_req_mode, (int)lock->l_granted_mode,
               atomic_read(&lock->l_refc), lock->l_readers, lock->l_writers);
        if (lock->l_resource->lr_type == LDLM_EXTENT)
                CDEBUG(level, "  Extent: "LPU64" -> "LPU64"\n",
                       lock->l_policy_data.l_extent.start,
                       lock->l_policy_data.l_extent.end);
        else if (lock->l_resource->lr_type == LDLM_FLOCK)
                CDEBUG(level, "  Pid: %d Extent: "LPU64" -> "LPU64"\n",
                       lock->l_policy_data.l_flock.pid,
                       lock->l_policy_data.l_flock.start,
                       lock->l_policy_data.l_flock.end);
}

void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
{
        struct ldlm_lock *lock;

        lock = ldlm_handle2lock(lockh);
        if (lock == NULL)
                return;

        ldlm_lock_dump(D_OTHER, lock, 0);

        LDLM_LOCK_PUT(lock);
}