Whamcloud - gitweb
the first try at lock LRU
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #define DEBUG_SUBSYSTEM S_LDLM
25
26 #include <linux/slab.h>
27 #include <linux/module.h>
28 #include <linux/random.h>
29 #include <linux/lustre_dlm.h>
30 #include <linux/lustre_mds.h>
31 #include <linux/obd_class.h>
32
33 //struct lustre_lock ldlm_everything_lock;
34
35 /* lock types */
36 char *ldlm_lockname[] = {
37         [0] "--",
38         [LCK_EX] "EX",
39         [LCK_PW] "PW",
40         [LCK_PR] "PR",
41         [LCK_CW] "CW",
42         [LCK_CR] "CR",
43         [LCK_NL] "NL"
44 };
45 char *ldlm_typename[] = {
46         [LDLM_PLAIN] "PLN",
47         [LDLM_EXTENT] "EXT",
48 };
49
50 char *ldlm_it2str(int it)
51 {
52         switch (it) {
53         case IT_OPEN:
54                 return "open";
55         case IT_CREAT:
56                 return "creat";
57         case (IT_OPEN | IT_CREAT):
58                 return "open|creat";
59         case IT_MKDIR:
60                 return "mkdir";
61         case IT_LINK:
62                 return "link";
63         case IT_LINK2:
64                 return "link2";
65         case IT_SYMLINK:
66                 return "symlink";
67         case IT_UNLINK:
68                 return "unlink";
69         case IT_RMDIR:
70                 return "rmdir";
71         case IT_RENAME:
72                 return "rename";
73         case IT_RENAME2:
74                 return "rename2";
75         case IT_READDIR:
76                 return "readdir";
77         case IT_GETATTR:
78                 return "getattr";
79         case IT_SETATTR:
80                 return "setattr";
81         case IT_READLINK:
82                 return "readlink";
83         case IT_MKNOD:
84                 return "mknod";
85         case IT_LOOKUP:
86                 return "lookup";
87         default:
88                 CERROR("Unknown intent %d\n", it);
89                 return "UNKNOWN";
90         }
91 }
92
93 extern kmem_cache_t *ldlm_lock_slab;
94
95 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b);
96
97 ldlm_res_compat ldlm_res_compat_table[] = {
98         [LDLM_PLAIN] ldlm_plain_compat,
99         [LDLM_EXTENT] ldlm_extent_compat,
100 };
101
102 static ldlm_res_policy ldlm_intent_policy_func;
103
104 static int ldlm_plain_policy(struct ldlm_lock *lock, void *req_cookie,
105                              ldlm_mode_t mode, int flags, void *data)
106 {
107         if ((flags & LDLM_FL_HAS_INTENT) && ldlm_intent_policy_func) {
108                 return ldlm_intent_policy_func(lock, req_cookie, mode, flags, 
109                                                data);
110         }
111
112         return ELDLM_OK;
113 }
114
115 ldlm_res_policy ldlm_res_policy_table[] = {
116         [LDLM_PLAIN] ldlm_plain_policy,
117         [LDLM_EXTENT] ldlm_extent_policy,
118 };
119
120 void ldlm_register_intent(ldlm_res_policy arg)
121 {
122         ldlm_intent_policy_func = arg;
123 }
124
125 void ldlm_unregister_intent(void)
126 {
127         ldlm_intent_policy_func = NULL;
128 }
129
130 /*
131  * REFCOUNTED LOCK OBJECTS
132  */
133
134
135 /*
136  * Lock refcounts, during creation:
137  *   - one special one for allocation, dec'd only once in destroy
138  *   - one for being a lock that's in-use
139  *   - one for the addref associated with a new lock
140  */
141 struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
142 {
143         l_lock(&lock->l_resource->lr_namespace->ns_lock);
144         lock->l_refc++;
145         ldlm_resource_getref(lock->l_resource);
146         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
147         return lock;
148 }
149
150 void ldlm_lock_put(struct ldlm_lock *lock)
151 {
152         struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
153         ENTRY;
154
155         l_lock(&ns->ns_lock);
156         lock->l_refc--;
157         //LDLM_DEBUG(lock, "after refc--");
158         if (lock->l_refc < 0)
159                 LBUG();
160
161         if (ldlm_resource_put(lock->l_resource)) {
162                 LASSERT(lock->l_refc == 0);
163                 lock->l_resource = NULL;
164         }
165         if (lock->l_parent)
166                 LDLM_LOCK_PUT(lock->l_parent);
167
168         if (lock->l_refc == 0 && (lock->l_flags & LDLM_FL_DESTROYED)) {
169                 l_unlock(&ns->ns_lock);
170                 LDLM_DEBUG(lock, "final lock_put on destroyed lock, freeing");
171
172                 //spin_lock(&ldlm_handle_lock);
173                 spin_lock(&ns->ns_counter_lock);
174                 ns->ns_locks--;
175                 spin_unlock(&ns->ns_counter_lock);
176
177                 lock->l_resource = NULL;
178                 lock->l_random = DEAD_HANDLE_MAGIC;
179                 if (lock->l_export && lock->l_export->exp_connection)
180                         ptlrpc_put_connection(lock->l_export->exp_connection);
181                 kmem_cache_free(ldlm_lock_slab, lock);
182                 //spin_unlock(&ldlm_handle_lock);
183                 CDEBUG(D_MALLOC, "kfreed 'lock': %d at %p (tot 0).\n",
184                        sizeof(*lock), lock);
185         } else
186                 l_unlock(&ns->ns_lock);
187
188         EXIT;
189 }
190
191 void ldlm_lock_destroy(struct ldlm_lock *lock)
192 {
193         ENTRY;
194         l_lock(&lock->l_resource->lr_namespace->ns_lock);
195
196         if (!list_empty(&lock->l_children)) {
197                 LDLM_DEBUG(lock, "still has children (%p)!",
198                            lock->l_children.next);
199                 ldlm_lock_dump(lock);
200                 LBUG();
201         }
202         if (lock->l_readers || lock->l_writers) {
203                 LDLM_DEBUG(lock, "lock still has references");
204                 ldlm_lock_dump(lock);
205         }
206
207         if (!list_empty(&lock->l_res_link)) {
208                 ldlm_lock_dump(lock);
209                 LBUG();
210         }
211
212         if (lock->l_flags & LDLM_FL_DESTROYED) {
213                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
214                 EXIT;
215                 return;
216         }
217
218         list_del_init(&lock->l_lru);
219         list_del(&lock->l_export_chain);
220         lock->l_export = NULL;
221         lock->l_flags |= LDLM_FL_DESTROYED;
222
223         /* Wake anyone waiting for this lock */
224         /* FIXME: I should probably add yet another flag, instead of using
225          * l_export to only call this on clients */
226         if (lock->l_export && lock->l_completion_ast)
227                 lock->l_completion_ast(lock, 0);
228
229         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
230         LDLM_LOCK_PUT(lock);
231         EXIT;
232 }
233
234 /*
235    usage: pass in a resource on which you have done get
236           pass in a parent lock on which you have done a get
237           do not put the resource or the parent
238    returns: lock with refcount 1
239 */
240 static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
241                                        struct ldlm_resource *resource)
242 {
243         struct ldlm_lock *lock;
244         ENTRY;
245
246         if (resource == NULL)
247                 LBUG();
248
249         lock = kmem_cache_alloc(ldlm_lock_slab, SLAB_KERNEL);
250         if (lock == NULL)
251                 RETURN(NULL);
252
253         memset(lock, 0, sizeof(*lock));
254         get_random_bytes(&lock->l_random, sizeof(__u64));
255
256         lock->l_resource = resource;
257         /* this refcount matches the one of the resource passed
258            in which is not being put away */
259         lock->l_refc = 1;
260         INIT_LIST_HEAD(&lock->l_children);
261         INIT_LIST_HEAD(&lock->l_res_link);
262         INIT_LIST_HEAD(&lock->l_lru);
263         INIT_LIST_HEAD(&lock->l_export_chain);
264         INIT_LIST_HEAD(&lock->l_pending_chain);
265         init_waitqueue_head(&lock->l_waitq);
266
267         spin_lock(&resource->lr_namespace->ns_counter_lock);
268         resource->lr_namespace->ns_locks++;
269         spin_unlock(&resource->lr_namespace->ns_counter_lock);
270
271         if (parent != NULL) {
272                 l_lock(&parent->l_resource->lr_namespace->ns_lock);
273                 lock->l_parent = parent;
274                 list_add(&lock->l_childof, &parent->l_children);
275                 l_unlock(&parent->l_resource->lr_namespace->ns_lock);
276         }
277
278         CDEBUG(D_MALLOC, "kmalloced 'lock': %d at "
279                "%p (tot %d).\n", sizeof(*lock), lock, 1);
280         /* this is the extra refcount, to prevent the lock from evaporating */
281         LDLM_LOCK_GET(lock);
282         RETURN(lock);
283 }
284
285 int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3])
286 {
287         struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
288         struct ldlm_resource *oldres = lock->l_resource;
289         int type, i;
290         ENTRY;
291
292         l_lock(&ns->ns_lock);
293         if (memcmp(new_resid, lock->l_resource->lr_name,
294                    sizeof(lock->l_resource->lr_name)) == 0) {
295                 /* Nothing to do */
296                 l_unlock(&ns->ns_lock);
297                 RETURN(0);
298         }
299
300         type = lock->l_resource->lr_type;
301         if (new_resid[0] == 0)
302                 LBUG();
303         lock->l_resource = ldlm_resource_get(ns, NULL, new_resid, type, 1);
304         if (lock->l_resource == NULL) {
305                 LBUG();
306                 RETURN(-ENOMEM);
307         }
308
309         /* move references over */
310         for (i = 0; i < lock->l_refc; i++) {
311                 int rc;
312                 ldlm_resource_getref(lock->l_resource);
313                 rc = ldlm_resource_put(oldres);
314                 if (rc == 1 && i != lock->l_refc - 1)
315                         LBUG();
316         }
317         /* compensate for the initial get above.. */
318         ldlm_resource_put(lock->l_resource);
319
320         l_unlock(&ns->ns_lock);
321         RETURN(0);
322 }
323
324 /*
325  *  HANDLES
326  */
327
328 void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh)
329 {
330         lockh->addr = (__u64) (unsigned long)lock;
331         lockh->cookie = lock->l_random;
332 }
333
334 struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle,
335                                      int strict)
336 {
337         struct ldlm_lock *lock = NULL, *retval = NULL;
338         ENTRY;
339
340         if (!handle || !handle->addr)
341                 RETURN(NULL);
342
343         //spin_lock(&ldlm_handle_lock);
344         lock = (struct ldlm_lock *)(unsigned long)(handle->addr);
345         if (!kmem_cache_validate(ldlm_lock_slab, (void *)lock)) {
346                 CERROR("bogus lock %p\n", lock);
347                 GOTO(out2, retval);
348         }
349
350         if (lock->l_random != handle->cookie) {
351                 CERROR("bogus cookie: lock %p has "LPX64" vs. handle "LPX64"\n",
352                        lock, lock->l_random, handle->cookie);
353                 GOTO(out2, NULL);
354         }
355         if (!lock->l_resource) {
356                 CERROR("trying to lock bogus resource: lock %p\n", lock);
357                 LDLM_DEBUG(lock, "ldlm_handle2lock(%p)", lock);
358                 GOTO(out2, retval);
359         }
360         if (!lock->l_resource->lr_namespace) {
361                 CERROR("trying to lock bogus namespace: lock %p\n", lock);
362                 LDLM_DEBUG(lock, "ldlm_handle2lock(%p)", lock);
363                 GOTO(out2, retval);
364         }
365
366         l_lock(&lock->l_resource->lr_namespace->ns_lock);
367         if (strict && lock->l_flags & LDLM_FL_DESTROYED) {
368                 CERROR("lock already destroyed: lock %p\n", lock);
369                 LDLM_DEBUG(lock, "ldlm_handle2lock(%p)", lock);
370                 GOTO(out, NULL);
371         }
372
373         retval = LDLM_LOCK_GET(lock);
374         if (!retval)
375                 CERROR("lock disappeared below us!!! %p\n", lock);
376         EXIT;
377  out:
378         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
379  out2:
380         //spin_unlock(&ldlm_handle_lock);
381         return retval;
382 }
383
384 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b)
385 {
386         return lockmode_compat(a->l_req_mode, b->l_req_mode);
387 }
388
389 void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
390 {
391         ldlm_res2desc(lock->l_resource, &desc->l_resource);
392         desc->l_req_mode = lock->l_req_mode;
393         desc->l_granted_mode = lock->l_granted_mode;
394         memcpy(&desc->l_extent, &lock->l_extent, sizeof(desc->l_extent));
395         memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
396 }
397
398 static void ldlm_add_ast_work_item(struct ldlm_lock *lock,
399                                    struct ldlm_lock *new)
400 {
401         struct ldlm_ast_work *w;
402         ENTRY;
403
404         l_lock(&lock->l_resource->lr_namespace->ns_lock);
405         if (new && (lock->l_flags & LDLM_FL_AST_SENT))
406                 GOTO(out, 0);
407
408         OBD_ALLOC(w, sizeof(*w));
409         if (!w) {
410                 LBUG();
411                 GOTO(out, 0);
412         }
413
414         if (new) {
415                 lock->l_flags |= LDLM_FL_AST_SENT;
416                 w->w_blocking = 1;
417                 ldlm_lock2desc(new, &w->w_desc);
418         }
419
420         w->w_lock = LDLM_LOCK_GET(lock);
421         list_add(&w->w_list, lock->l_resource->lr_tmp);
422       out:
423         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
424         return;
425 }
426
427 void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
428 {
429         struct ldlm_lock *lock;
430
431         lock = ldlm_handle2lock(lockh);
432         ldlm_lock_addref_internal(lock, mode);
433         LDLM_LOCK_PUT(lock);
434 }
435
436 /* only called for local locks */
437 void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
438 {
439         l_lock(&lock->l_resource->lr_namespace->ns_lock);
440
441         if (!list_empty(&lock->l_lru)) { 
442                 list_del_init(&lock->l_lru);
443                 lock->l_resource->lr_namespace->ns_nr_unused--;
444                 LASSERT(lock->l_resource->lr_namespace->ns_nr_unused >= 0);
445         }
446
447         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
448                 lock->l_readers++;
449         else
450                 lock->l_writers++;
451         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
452         LDLM_LOCK_GET(lock);
453         LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
454 }
455
456 /* Args: unlocked lock */
457 int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
458                                     __u64 *res_id, int flags);
459
460 void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
461 {
462         struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
463         struct ldlm_namespace *ns;
464         ENTRY;
465
466         if (lock == NULL)
467                 LBUG();
468
469         LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
470         ns = lock->l_resource->lr_namespace;
471         l_lock(&lock->l_resource->lr_namespace->ns_lock);
472         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
473                 lock->l_readers--;
474         else
475                 lock->l_writers--;
476
477         /* If we received a blocked AST and this was the last reference,
478          * run the callback. */
479         if (!lock->l_readers && !lock->l_writers &&
480             (lock->l_flags & LDLM_FL_CBPENDING)) {
481                 if (!lock->l_resource->lr_namespace->ns_client &&
482                     lock->l_export)
483                         CERROR("FL_CBPENDING set on non-local lock--just a "
484                                "warning\n");
485
486                 LDLM_DEBUG(lock, "final decref done on cbpending lock");
487                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
488
489                 /* FIXME: need a real 'desc' here */
490                 lock->l_blocking_ast(lock, NULL, lock->l_data,
491                                      lock->l_data_len, LDLM_CB_BLOCKING);
492         } else if (!lock->l_readers && !lock->l_writers) {
493                 LASSERT(list_empty(&lock->l_lru));
494                 LASSERT(ns->ns_nr_unused >= 0);
495                 list_add_tail(&lock->l_lru, &ns->ns_unused_list);
496                 ns->ns_nr_unused++;
497                 if (ns->ns_client && ns->ns_nr_unused >= ns->ns_max_unused) {
498                         CDEBUG(D_DLMTRACE, "%d unused (max %d), cancelling "
499                                "LRU\n", ns->ns_nr_unused, ns->ns_max_unused);
500                         ldlm_cli_cancel_unused_resource
501                                 (ns, lock->l_resource->lr_name, LDLM_FL_REDUCE);
502                 }
503                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
504         } else
505                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
506
507         LDLM_LOCK_PUT(lock);    /* matches the ldlm_lock_get in addref */
508         LDLM_LOCK_PUT(lock);    /* matches the handle2lock above */
509
510         EXIT;
511 }
512
513 static int ldlm_lock_compat_list(struct ldlm_lock *lock, int send_cbs,
514                                  struct list_head *queue)
515 {
516         struct list_head *tmp, *pos;
517         int rc = 1;
518
519         list_for_each_safe(tmp, pos, queue) {
520                 struct ldlm_lock *child;
521                 ldlm_res_compat compat;
522
523                 child = list_entry(tmp, struct ldlm_lock, l_res_link);
524                 if (lock == child)
525                         continue;
526
527                 compat = ldlm_res_compat_table[child->l_resource->lr_type];
528                 if (compat && compat(child, lock)) {
529                         CDEBUG(D_OTHER, "compat function succeded, next.\n");
530                         continue;
531                 }
532                 if (lockmode_compat(child->l_granted_mode, lock->l_req_mode)) {
533                         CDEBUG(D_OTHER, "lock modes are compatible, next.\n");
534                         continue;
535                 }
536
537                 rc = 0;
538
539                 if (send_cbs && child->l_blocking_ast != NULL) {
540                         CDEBUG(D_OTHER, "lock %p incompatible; sending "
541                                "blocking AST.\n", child);
542                         ldlm_add_ast_work_item(child, lock);
543                 }
544         }
545
546         return rc;
547 }
548
549 static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs)
550 {
551         int rc;
552         ENTRY;
553
554         l_lock(&lock->l_resource->lr_namespace->ns_lock);
555         rc = ldlm_lock_compat_list(lock, send_cbs,
556                                    &lock->l_resource->lr_granted);
557         /* FIXME: should we be sending ASTs to converting? */
558         if (rc)
559                 rc = ldlm_lock_compat_list
560                         (lock, send_cbs, &lock->l_resource->lr_converting);
561
562         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
563         RETURN(rc);
564 }
565
566 /* NOTE: called by
567    - ldlm_handle_enqueuque - resource
568 */
569 void ldlm_grant_lock(struct ldlm_lock *lock)
570 {
571         struct ldlm_resource *res = lock->l_resource;
572         ENTRY;
573
574         l_lock(&lock->l_resource->lr_namespace->ns_lock);
575         ldlm_resource_add_lock(res, &res->lr_granted, lock);
576         lock->l_granted_mode = lock->l_req_mode;
577
578         if (lock->l_granted_mode < res->lr_most_restr)
579                 res->lr_most_restr = lock->l_granted_mode;
580
581         if (lock->l_completion_ast) {
582                 ldlm_add_ast_work_item(lock, NULL);
583         }
584         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
585         EXIT;
586 }
587
588 /* returns a referenced lock or NULL */
589 static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
590                                       struct ldlm_extent *extent,
591                                       struct ldlm_lock *old_lock)
592 {
593         struct ldlm_lock *lock;
594         struct list_head *tmp;
595
596         list_for_each(tmp, queue) {
597                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
598
599                 if (lock == old_lock)
600                         continue;
601
602                 if (lock->l_flags & (LDLM_FL_CBPENDING | LDLM_FL_DESTROYED))
603                         continue;
604
605                 /* lock_convert() takes the resource lock, so we're sure that
606                  * req_mode and lr_type won't change beneath us */
607                 if (lock->l_req_mode != mode)
608                         continue;
609
610                 if (lock->l_resource->lr_type == LDLM_EXTENT &&
611                     (lock->l_extent.start > extent->start ||
612                      lock->l_extent.end < extent->end))
613                         continue;
614
615                 ldlm_lock_addref_internal(lock, mode);
616                 return lock;
617         }
618
619         return NULL;
620 }
621
622 /* Can be called in two ways:
623  *
624  * If 'ns' is NULL, then lockh describes an existing lock that we want to look
625  * for a duplicate of.
626  *
627  * Otherwise, all of the fields must be filled in, to match against.
628  *
629  * Returns 1 if it finds an already-existing lock that is compatible; in this
630  * case, lockh is filled in with a addref()ed lock
631  */
632 int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
633                     void *cookie, int cookielen, ldlm_mode_t mode,
634                     struct lustre_handle *lockh)
635 {
636         struct ldlm_resource *res;
637         struct ldlm_lock *lock, *old_lock = NULL;
638         int rc = 0;
639         ENTRY;
640
641         if (ns == NULL) {
642                 old_lock = ldlm_handle2lock(lockh);
643                 LASSERT(old_lock);
644
645                 ns = old_lock->l_resource->lr_namespace;
646                 res_id = old_lock->l_resource->lr_name;
647                 type = old_lock->l_resource->lr_type;
648                 mode = old_lock->l_req_mode;
649         }
650
651         res = ldlm_resource_get(ns, NULL, res_id, type, 0);
652         if (res == NULL) {
653                 LASSERT(old_lock == NULL);
654                 RETURN(0);
655         }
656
657         ns = res->lr_namespace;
658         l_lock(&ns->ns_lock);
659
660         if ((lock = search_queue(&res->lr_granted, mode, cookie, old_lock)))
661                 GOTO(out, rc = 1);
662         if ((lock = search_queue(&res->lr_converting, mode, cookie, old_lock)))
663                 GOTO(out, rc = 1);
664         if ((lock = search_queue(&res->lr_waiting, mode, cookie, old_lock)))
665                 GOTO(out, rc = 1);
666
667         EXIT;
668        out:
669         ldlm_resource_put(res);
670         l_unlock(&ns->ns_lock);
671
672         if (lock) {
673                 ldlm_lock2handle(lock, lockh);
674                 if (lock->l_completion_ast)
675                         lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC);
676         }
677         if (rc)
678                 LDLM_DEBUG(lock, "matched");
679         else
680                 LDLM_DEBUG_NOLOCK("not matched");
681
682         if (old_lock)
683                 LDLM_LOCK_PUT(old_lock);
684
685         return rc;
686 }
687
688 /* Returns a referenced lock */
689 struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
690                                    struct lustre_handle *parent_lock_handle,
691                                    __u64 * res_id, __u32 type,
692                                    ldlm_mode_t mode, void *data, __u32 data_len)
693 {
694         struct ldlm_resource *res, *parent_res = NULL;
695         struct ldlm_lock *lock, *parent_lock;
696
697         parent_lock = ldlm_handle2lock(parent_lock_handle);
698         if (parent_lock)
699                 parent_res = parent_lock->l_resource;
700
701         res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
702         if (res == NULL)
703                 RETURN(NULL);
704
705         lock = ldlm_lock_new(parent_lock, res);
706         if (lock == NULL) {
707                 ldlm_resource_put(res);
708                 RETURN(NULL);
709         }
710
711         lock->l_req_mode = mode;
712         lock->l_data = data;
713         lock->l_data_len = data_len;
714
715         return lock;
716 }
717
718 /* Must be called with lock->l_lock and lock->l_resource->lr_lock not held */
719 ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock,
720                                void *cookie, int cookie_len,
721                                int *flags,
722                                ldlm_completion_callback completion,
723                                ldlm_blocking_callback blocking)
724 {
725         struct ldlm_resource *res;
726         int local;
727         ldlm_res_policy policy;
728         ENTRY;
729
730         res = lock->l_resource;
731         lock->l_blocking_ast = blocking;
732
733         if (res->lr_type == LDLM_EXTENT)
734                 memcpy(&lock->l_extent, cookie, sizeof(lock->l_extent));
735
736         /* policies are not executed on the client or during replay */
737         local = res->lr_namespace->ns_client;
738         if (!local && !(*flags & LDLM_FL_REPLAY) &&
739             (policy = ldlm_res_policy_table[res->lr_type])) {
740                 int rc;
741                 rc = policy(lock, cookie, lock->l_req_mode, *flags, NULL);
742
743                 if (rc == ELDLM_LOCK_CHANGED) {
744                         res = lock->l_resource;
745                         *flags |= LDLM_FL_LOCK_CHANGED;
746                 } else if (rc == ELDLM_LOCK_ABORTED) {
747                         ldlm_lock_destroy(lock);
748                         RETURN(rc);
749                 }
750         }
751
752         l_lock(&res->lr_namespace->ns_lock);
753         if (local && lock->l_req_mode == lock->l_granted_mode) {
754                 /* The server returned a blocked lock, but it was granted before
755                  * we got a chance to actually enqueue it.  We don't need to do
756                  * anything else. */
757                 *flags &= ~(LDLM_FL_BLOCK_GRANTED | 
758                           LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
759                 GOTO(out, ELDLM_OK);
760         }
761
762         /* This distinction between local lock trees is very important; a client
763          * namespace only has information about locks taken by that client, and
764          * thus doesn't have enough information to decide for itself if it can
765          * be granted (below).  In this case, we do exactly what the server
766          * tells us to do, as dictated by the 'flags'.
767          *
768          * We do exactly the same thing during recovery, when the server is
769          * more or less trusting the clients not to lie.
770          *
771          * FIXME (bug 629283): Detect obvious lies by checking compatibility in
772          * granted/converting queues. */
773         ldlm_resource_unlink_lock(lock);
774         if (local || (*flags & LDLM_FL_REPLAY)) {
775                 if (*flags & LDLM_FL_BLOCK_CONV)
776                         ldlm_resource_add_lock(res, res->lr_converting.prev,
777                                                lock);
778                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
779                         ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
780                 else
781                         ldlm_grant_lock(lock);
782                 GOTO(out, ELDLM_OK);
783         }
784
785         /* FIXME: We may want to optimize by checking lr_most_restr */
786         if (!list_empty(&res->lr_converting)) {
787                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
788                 *flags |= LDLM_FL_BLOCK_CONV;
789                 GOTO(out, ELDLM_OK);
790         }
791         if (!list_empty(&res->lr_waiting)) {
792                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
793                 *flags |= LDLM_FL_BLOCK_WAIT;
794                 GOTO(out, ELDLM_OK);
795         }
796         if (!ldlm_lock_compat(lock, 0)) {
797                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
798                 *flags |= LDLM_FL_BLOCK_GRANTED;
799                 GOTO(out, ELDLM_OK);
800         }
801
802         ldlm_grant_lock(lock);
803         EXIT;
804       out:
805         l_unlock(&res->lr_namespace->ns_lock);
806         /* Don't set 'completion_ast' until here so that if the lock is granted
807          * immediately we don't do an unnecessary completion call. */
808         lock->l_completion_ast = completion;
809         return ELDLM_OK;
810 }
811
812 /* Must be called with namespace taken: queue is waiting or converting. */
813 static int ldlm_reprocess_queue(struct ldlm_resource *res,
814                                 struct list_head *queue)
815 {
816         struct list_head *tmp, *pos;
817         ENTRY;
818
819         list_for_each_safe(tmp, pos, queue) {
820                 struct ldlm_lock *pending;
821                 pending = list_entry(tmp, struct ldlm_lock, l_res_link);
822
823                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
824
825                 if (!ldlm_lock_compat(pending, 1))
826                         RETURN(1);
827
828                 list_del_init(&pending->l_res_link);
829                 ldlm_grant_lock(pending);
830         }
831
832         RETURN(0);
833 }
834
835 void ldlm_run_ast_work(struct list_head *rpc_list)
836 {
837         struct list_head *tmp, *pos;
838         int rc;
839         ENTRY;
840
841         list_for_each_safe(tmp, pos, rpc_list) {
842                 struct ldlm_ast_work *w =
843                         list_entry(tmp, struct ldlm_ast_work, w_list);
844
845                 if (w->w_blocking)
846                         rc = w->w_lock->l_blocking_ast
847                                 (w->w_lock, &w->w_desc, w->w_data,
848                                  w->w_datalen, LDLM_CB_BLOCKING);
849                 else
850                         rc = w->w_lock->l_completion_ast(w->w_lock, w->w_flags);
851                 if (rc)
852                         CERROR("Failed AST - should clean & disconnect "
853                                "client\n");
854                 LDLM_LOCK_PUT(w->w_lock);
855                 list_del(&w->w_list);
856                 OBD_FREE(w, sizeof(*w));
857         }
858         EXIT;
859 }
860
861 /* Must be called with resource->lr_lock not taken. */
862 void ldlm_reprocess_all(struct ldlm_resource *res)
863 {
864         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
865         ENTRY;
866
867         /* Local lock trees don't get reprocessed. */
868         if (res->lr_namespace->ns_client) {
869                 EXIT;
870                 return;
871         }
872
873         l_lock(&res->lr_namespace->ns_lock);
874         res->lr_tmp = &rpc_list;
875
876         ldlm_reprocess_queue(res, &res->lr_converting);
877         if (list_empty(&res->lr_converting))
878                 ldlm_reprocess_queue(res, &res->lr_waiting);
879
880         res->lr_tmp = NULL;
881         l_unlock(&res->lr_namespace->ns_lock);
882
883         ldlm_run_ast_work(&rpc_list);
884         EXIT;
885 }
886
887 void ldlm_cancel_callback(struct ldlm_lock *lock)
888 {
889         l_lock(&lock->l_resource->lr_namespace->ns_lock);
890         if (!(lock->l_flags & LDLM_FL_CANCEL)) {
891                 lock->l_flags |= LDLM_FL_CANCEL;
892                 lock->l_blocking_ast(lock, NULL, lock->l_data,
893                                      lock->l_data_len, LDLM_CB_CANCELING);
894         }
895         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
896 }
897
898 void ldlm_lock_cancel(struct ldlm_lock *lock)
899 {
900         struct ldlm_resource *res;
901         struct ldlm_namespace *ns;
902         ENTRY;
903
904         res = lock->l_resource;
905         ns = res->lr_namespace;
906
907         l_lock(&ns->ns_lock);
908         if (lock->l_readers || lock->l_writers)
909                 CDEBUG(D_INFO, "lock still has references (%d readers, %d "
910                        "writers)\n", lock->l_readers, lock->l_writers);
911
912         ldlm_cancel_callback(lock);
913
914         ldlm_del_waiting_lock(lock);
915         ldlm_resource_unlink_lock(lock);
916         ldlm_lock_destroy(lock);
917         l_unlock(&ns->ns_lock);
918         EXIT;
919 }
920
921 int ldlm_lock_set_data(struct lustre_handle *lockh, void *data, int datalen)
922 {
923         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
924         ENTRY;
925
926         if (lock == NULL)
927                 RETURN(-EINVAL);
928
929         lock->l_data = data;
930         lock->l_data_len = datalen;
931
932         LDLM_LOCK_PUT(lock);
933
934         RETURN(0);
935 }
936
937 void ldlm_cancel_locks_for_export(struct obd_export *exp)
938 {
939         struct list_head *iter, *n; /* MUST BE CALLED "n"! */
940
941         list_for_each_safe(iter, n, &exp->exp_ldlm_data.led_held_locks) {
942                 struct ldlm_lock *lock;
943                 struct ldlm_resource *res;
944                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
945                 res = ldlm_resource_getref(lock->l_resource);
946                 LDLM_DEBUG(lock, "export %p", exp);
947                 ldlm_lock_cancel(lock);
948                 ldlm_reprocess_all(res);
949                 ldlm_resource_put(res);
950         }
951 }
952
953 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
954                                         int *flags)
955 {
956         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
957         struct ldlm_resource *res;
958         struct ldlm_namespace *ns;
959         int granted = 0;
960         ENTRY;
961
962         res = lock->l_resource;
963         ns = res->lr_namespace;
964
965         l_lock(&ns->ns_lock);
966
967         lock->l_req_mode = new_mode;
968         ldlm_resource_unlink_lock(lock);
969
970         /* If this is a local resource, put it on the appropriate list. */
971         if (res->lr_namespace->ns_client) {
972                 if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED))
973                         ldlm_resource_add_lock(res, res->lr_converting.prev,
974                                                lock);
975                 else {
976                         /* This should never happen, because of the way the
977                          * server handles conversions. */
978                         LBUG();
979
980                         res->lr_tmp = &rpc_list;
981                         ldlm_grant_lock(lock);
982                         res->lr_tmp = NULL;
983                         granted = 1;
984                         /* FIXME: completion handling not with ns_lock held ! */
985                         if (lock->l_completion_ast)
986                                 lock->l_completion_ast(lock, 0);
987                 }
988         } else {
989                 /* FIXME: We should try the conversion right away and possibly
990                  * return success without the need for an extra AST */
991                 ldlm_resource_add_lock(res, res->lr_converting.prev, lock);
992                 *flags |= LDLM_FL_BLOCK_CONV;
993         }
994
995         l_unlock(&ns->ns_lock);
996
997         if (granted)
998                 ldlm_run_ast_work(&rpc_list);
999         RETURN(res);
1000 }
1001
1002 void ldlm_lock_dump(struct ldlm_lock *lock)
1003 {
1004         char ver[128];
1005
1006         if (!(portal_debug & D_OTHER))
1007                 return;
1008
1009         if (RES_VERSION_SIZE != 4)
1010                 LBUG();
1011
1012         if (!lock) {
1013                 CDEBUG(D_OTHER, "  NULL LDLM lock\n");
1014                 return;
1015         }
1016
1017         snprintf(ver, sizeof(ver), "%x %x %x %x",
1018                  lock->l_version[0], lock->l_version[1],
1019                  lock->l_version[2], lock->l_version[3]);
1020
1021         CDEBUG(D_OTHER, "  -- Lock dump: %p (%s)\n", lock, ver);
1022         if (lock->l_export && lock->l_export->exp_connection)
1023                 CDEBUG(D_OTHER, "  Node: NID %x (rhandle: "LPX64")\n",
1024                        lock->l_export->exp_connection->c_peer.peer_nid,
1025                        lock->l_remote_handle.addr);
1026         else
1027                 CDEBUG(D_OTHER, "  Node: local\n");
1028         CDEBUG(D_OTHER, "  Parent: %p\n", lock->l_parent);
1029         CDEBUG(D_OTHER, "  Resource: %p ("LPD64")\n", lock->l_resource,
1030                lock->l_resource->lr_name[0]);
1031         CDEBUG(D_OTHER, "  Requested mode: %d, granted mode: %d\n",
1032                (int)lock->l_req_mode, (int)lock->l_granted_mode);
1033         CDEBUG(D_OTHER, "  Readers: %u ; Writers; %u\n",
1034                lock->l_readers, lock->l_writers);
1035         if (lock->l_resource->lr_type == LDLM_EXTENT)
1036                 CDEBUG(D_OTHER, "  Extent: %Lu -> %Lu\n",
1037                        (unsigned long long)lock->l_extent.start,
1038                        (unsigned long long)lock->l_extent.end);
1039 }