Whamcloud - gitweb
land b_md onto HEAD. almost entirely small cleanups and miscellaneous fixes,
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #define DEBUG_SUBSYSTEM S_LDLM
25
26 #include <linux/slab.h>
27 #include <linux/module.h>
28 #include <linux/random.h>
29 #include <linux/lustre_dlm.h>
30 #include <linux/lustre_mds.h>
31 #include <linux/obd_class.h>
32
33 //struct lustre_lock ldlm_everything_lock;
34
35 /* lock types */
36 char *ldlm_lockname[] = {
37         [0] "--",
38         [LCK_EX] "EX",
39         [LCK_PW] "PW",
40         [LCK_PR] "PR",
41         [LCK_CW] "CW",
42         [LCK_CR] "CR",
43         [LCK_NL] "NL"
44 };
45 char *ldlm_typename[] = {
46         [LDLM_PLAIN] "PLN",
47         [LDLM_EXTENT] "EXT",
48 };
49
50 char *ldlm_it2str(int it)
51 {
52         switch (it) {
53         case IT_OPEN:
54                 return "open";
55         case IT_CREAT:
56                 return "creat";
57         case (IT_OPEN | IT_CREAT):
58                 return "open|creat";
59         case IT_MKDIR:
60                 return "mkdir";
61         case IT_LINK:
62                 return "link";
63         case IT_LINK2:
64                 return "link2";
65         case IT_SYMLINK:
66                 return "symlink";
67         case IT_UNLINK:
68                 return "unlink";
69         case IT_RMDIR:
70                 return "rmdir";
71         case IT_RENAME:
72                 return "rename";
73         case IT_RENAME2:
74                 return "rename2";
75         case IT_READDIR:
76                 return "readdir";
77         case IT_GETATTR:
78                 return "getattr";
79         case IT_SETATTR:
80                 return "setattr";
81         case IT_READLINK:
82                 return "readlink";
83         case IT_MKNOD:
84                 return "mknod";
85         case IT_LOOKUP:
86                 return "lookup";
87         default:
88                 CERROR("Unknown intent %d\n", it);
89                 return "UNKNOWN";
90         }
91 }
92
93 extern kmem_cache_t *ldlm_lock_slab;
94 struct lustre_lock ldlm_handle_lock;
95
96 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b);
97
98 ldlm_res_compat ldlm_res_compat_table[] = {
99         [LDLM_PLAIN] ldlm_plain_compat,
100         [LDLM_EXTENT] ldlm_extent_compat,
101 };
102
103 static ldlm_res_policy ldlm_intent_policy_func;
104
105 static int ldlm_plain_policy(struct ldlm_lock *lock, void *req_cookie,
106                              ldlm_mode_t mode, int flags, void *data)
107 {
108         if ((flags & LDLM_FL_HAS_INTENT) && ldlm_intent_policy_func) {
109                 return ldlm_intent_policy_func(lock, req_cookie, mode, flags, 
110                                                data);
111         }
112
113         return ELDLM_OK;
114 }
115
116 ldlm_res_policy ldlm_res_policy_table[] = {
117         [LDLM_PLAIN] ldlm_plain_policy,
118         [LDLM_EXTENT] ldlm_extent_policy,
119 };
120
121 void ldlm_register_intent(ldlm_res_policy arg)
122 {
123         ldlm_intent_policy_func = arg;
124 }
125
126 void ldlm_unregister_intent(void)
127 {
128         ldlm_intent_policy_func = NULL;
129 }
130
131 /*
132  * REFCOUNTED LOCK OBJECTS
133  */
134
135
136 /*
137  * Lock refcounts, during creation:
138  *   - one special one for allocation, dec'd only once in destroy
139  *   - one for being a lock that's in-use
140  *   - one for the addref associated with a new lock
141  */
142 struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
143 {
144         atomic_inc(&lock->l_refc);
145         return lock;
146 }
147
148 void ldlm_lock_put(struct ldlm_lock *lock)
149 {
150         struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
151         ENTRY;
152
153         if (atomic_dec_and_test(&lock->l_refc)) {
154                 l_lock(&ns->ns_lock);
155                 LDLM_DEBUG(lock, "final lock_put on destroyed lock, freeing");
156                 LASSERT(lock->l_destroyed);
157                 LASSERT(list_empty(&lock->l_res_link));
158
159                 spin_lock(&ns->ns_counter_lock);
160                 ns->ns_locks--;
161                 spin_unlock(&ns->ns_counter_lock);
162
163                 ldlm_resource_putref(lock->l_resource);
164                 lock->l_resource = NULL;
165
166                 if (lock->l_parent)
167                         LDLM_LOCK_PUT(lock->l_parent);
168
169                 PORTAL_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock));
170                 l_unlock(&ns->ns_lock);
171         }
172
173         EXIT;
174 }
175
176 void ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
177 {
178         ENTRY;
179         l_lock(&lock->l_resource->lr_namespace->ns_lock);
180         if (!list_empty(&lock->l_lru)) {
181                 list_del_init(&lock->l_lru);
182                 lock->l_resource->lr_namespace->ns_nr_unused--;
183                 LASSERT(lock->l_resource->lr_namespace->ns_nr_unused >= 0);
184         }
185         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
186         EXIT;
187 }
188
189 void ldlm_lock_destroy(struct ldlm_lock *lock)
190 {
191         ENTRY;
192         l_lock(&lock->l_resource->lr_namespace->ns_lock);
193
194         if (!list_empty(&lock->l_children)) {
195                 LDLM_DEBUG(lock, "still has children (%p)!",
196                            lock->l_children.next);
197                 ldlm_lock_dump(lock);
198                 LBUG();
199         }
200         if (lock->l_readers || lock->l_writers) {
201                 LDLM_DEBUG(lock, "lock still has references");
202                 ldlm_lock_dump(lock);
203         }
204
205         if (!list_empty(&lock->l_res_link)) {
206                 ldlm_lock_dump(lock);
207                 LBUG();
208         }
209
210         if (lock->l_destroyed) {
211                 LASSERT(list_empty(&lock->l_lru));
212                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
213                 EXIT;
214                 return;
215         }
216         lock->l_destroyed = 1;
217
218         list_del_init(&lock->l_export_chain);
219         ldlm_lock_remove_from_lru(lock);
220
221 #if 0
222         /* Wake anyone waiting for this lock */
223         /* FIXME: I should probably add yet another flag, instead of using
224          * l_export to only call this on clients */
225         lock->l_export = NULL;
226         if (lock->l_export && lock->l_completion_ast)
227                 lock->l_completion_ast(lock, 0);
228 #endif
229
230         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
231         LDLM_LOCK_PUT(lock);
232         EXIT;
233 }
234
/* Addref callback taking an untyped lock pointer: invoked by
 * portals_handle2object with the handle lock taken, it simply takes a
 * reference on the lock. */
static void lock_handle_addref(void *lock)
{
        (void)ldlm_lock_get(lock);
}
240
241 /*
242  * usage: pass in a resource on which you have done ldlm_resource_get
243  *        pass in a parent lock on which you have done a ldlm_lock_get
244  *        after return, ldlm_*_put the resource and parent
245  * returns: lock with refcount 1
246  */
247 static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
248                                        struct ldlm_resource *resource)
249 {
250         struct ldlm_lock *lock;
251         ENTRY;
252
253         if (resource == NULL)
254                 LBUG();
255
256         PORTAL_SLAB_ALLOC(lock, ldlm_lock_slab, sizeof(*lock));
257         if (lock == NULL)
258                 RETURN(NULL);
259
260         get_random_bytes(&lock->l_random, sizeof(__u64));
261         lock->l_resource = ldlm_resource_getref(resource);
262
263         atomic_set(&lock->l_refc, 2);
264         INIT_LIST_HEAD(&lock->l_children);
265         INIT_LIST_HEAD(&lock->l_res_link);
266         INIT_LIST_HEAD(&lock->l_lru);
267         INIT_LIST_HEAD(&lock->l_export_chain);
268         INIT_LIST_HEAD(&lock->l_pending_chain);
269         init_waitqueue_head(&lock->l_waitq);
270
271         spin_lock(&resource->lr_namespace->ns_counter_lock);
272         resource->lr_namespace->ns_locks++;
273         spin_unlock(&resource->lr_namespace->ns_counter_lock);
274
275         if (parent != NULL) {
276                 l_lock(&parent->l_resource->lr_namespace->ns_lock);
277                 lock->l_parent = LDLM_LOCK_GET(parent);
278                 list_add(&lock->l_childof, &parent->l_children);
279                 l_unlock(&parent->l_resource->lr_namespace->ns_lock);
280         }
281
282         RETURN(lock);
283 }
284
285 int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3])
286 {
287         struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
288         struct ldlm_resource *oldres = lock->l_resource;
289         ENTRY;
290
291         l_lock(&ns->ns_lock);
292         if (memcmp(new_resid, lock->l_resource->lr_name,
293                    sizeof(lock->l_resource->lr_name)) == 0) {
294                 /* Nothing to do */
295                 l_unlock(&ns->ns_lock);
296                 RETURN(0);
297         }
298
299         LASSERT(new_resid[0] != 0);
300
301         /* This function assumes that the lock isn't on any lists */
302         LASSERT(list_empty(&lock->l_res_link));
303
304         lock->l_resource = ldlm_resource_get(ns, NULL, new_resid,
305                                              lock->l_resource->lr_type, 1);
306         if (lock->l_resource == NULL) {
307                 LBUG();
308                 RETURN(-ENOMEM);
309         }
310
311         /* ...and the flowers are still standing! */
312         ldlm_resource_putref(oldres);
313
314         l_unlock(&ns->ns_lock);
315         RETURN(0);
316 }
317
318 /*
319  *  HANDLES
320  */
321
322 void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh)
323 {
324         lockh->addr = (__u64) (unsigned long)lock;
325         lockh->cookie = lock->l_random;
326 }
327
328 /* 
329  * if flags: atomically get the lock and set the flags. 
330  * Return NULL if flag already set
331  */
332
333 struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, int strict,
334                                      int flags)
335 {
336         struct ldlm_lock *lock = NULL, *retval = NULL;
337         ENTRY;
338
339         LASSERT(handle);
340
341         if (!handle->addr)
342                 RETURN(NULL);
343
344         lock = (struct ldlm_lock *)(unsigned long)(handle->addr);
345         if (!kmem_cache_validate(ldlm_lock_slab, (void *)lock)) {
346                 //CERROR("bogus lock %p\n", lock);
347                 GOTO(out2, retval);
348         }
349
350         if (lock->l_random != handle->cookie) {
351                 //CERROR("bogus cookie: lock %p has "LPX64" vs. handle "LPX64
352                 //       "\n", lock, lock->l_random, handle->cookie);
353                 GOTO(out2, NULL);
354         }
355         if (!lock->l_resource) {
356                 CERROR("trying to lock bogus resource: lock %p\n", lock);
357                 //LDLM_DEBUG(lock, "ldlm_handle2lock(%p)", lock);
358                 GOTO(out2, retval);
359         }
360         if (!lock->l_resource->lr_namespace) {
361                 CERROR("trying to lock bogus namespace: lock %p\n", lock);
362                 //LDLM_DEBUG(lock, "ldlm_handle2lock(%p)", lock);
363                 GOTO(out2, retval);
364         }
365
366         l_lock(&lock->l_resource->lr_namespace->ns_lock);
367         if (strict && lock->l_destroyed) {
368                 CERROR("lock already destroyed: lock %p\n", lock);
369                 //LDLM_DEBUG(lock, "ldlm_handle2lock(%p)", lock);
370                 GOTO(out, NULL);
371         }
372
373         if (flags && (lock->l_flags & flags))
374                 GOTO(out, NULL);
375
376         if (flags)
377                 lock->l_flags |= flags;
378
379         retval = LDLM_LOCK_GET(lock);
380         EXIT;
381  out:
382         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
383  out2:
384         return retval;
385 }
386
387 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b)
388 {
389         return lockmode_compat(a->l_req_mode, b->l_req_mode);
390 }
391
392 void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
393 {
394         ldlm_res2desc(lock->l_resource, &desc->l_resource);
395         desc->l_req_mode = lock->l_req_mode;
396         desc->l_granted_mode = lock->l_granted_mode;
397         memcpy(&desc->l_extent, &lock->l_extent, sizeof(desc->l_extent));
398         memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
399 }
400
401 static void ldlm_add_ast_work_item(struct ldlm_lock *lock,
402                                    struct ldlm_lock *new)
403 {
404         struct ldlm_ast_work *w;
405         ENTRY;
406
407         l_lock(&lock->l_resource->lr_namespace->ns_lock);
408         if (new && (lock->l_flags & LDLM_FL_AST_SENT))
409                 GOTO(out, 0);
410
411         OBD_ALLOC(w, sizeof(*w));
412         if (!w) {
413                 LBUG();
414                 GOTO(out, 0);
415         }
416
417         if (new) {
418                 lock->l_flags |= LDLM_FL_AST_SENT;
419                 w->w_blocking = 1;
420                 ldlm_lock2desc(new, &w->w_desc);
421         }
422
423         w->w_lock = LDLM_LOCK_GET(lock);
424         list_add(&w->w_list, lock->l_resource->lr_tmp);
425       out:
426         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
427         return;
428 }
429
430 void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
431 {
432         struct ldlm_lock *lock;
433
434         lock = ldlm_handle2lock(lockh);
435         ldlm_lock_addref_internal(lock, mode);
436         LDLM_LOCK_PUT(lock);
437 }
438
439 /* only called for local locks */
440 void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
441 {
442         l_lock(&lock->l_resource->lr_namespace->ns_lock);
443         ldlm_lock_remove_from_lru(lock);
444         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
445                 lock->l_readers++;
446         else
447                 lock->l_writers++;
448         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
449         LDLM_LOCK_GET(lock);
450         LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
451 }
452
453 /* Args: unlocked lock */
454 int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
455                                     __u64 *res_id, int flags);
456
457 void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
458 {
459         struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0, 0);
460         struct ldlm_namespace *ns;
461         ENTRY;
462
463         if (lock == NULL)
464                 LBUG();
465
466         LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
467         ns = lock->l_resource->lr_namespace;
468         l_lock(&lock->l_resource->lr_namespace->ns_lock);
469         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
470                 lock->l_readers--;
471         else
472                 lock->l_writers--;
473
474         /* If we received a blocked AST and this was the last reference,
475          * run the callback. */
476         if (!lock->l_readers && !lock->l_writers &&
477             (lock->l_flags & LDLM_FL_CBPENDING)) {
478                 if (!lock->l_resource->lr_namespace->ns_client &&
479                     lock->l_export)
480                         CERROR("FL_CBPENDING set on non-local lock--just a "
481                                "warning\n");
482
483                 LDLM_DEBUG(lock, "final decref done on cbpending lock");
484                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
485
486                 /* FIXME: need a real 'desc' here */
487                 lock->l_blocking_ast(lock, NULL, lock->l_data,
488                                      lock->l_data_len, LDLM_CB_BLOCKING);
489         } else if (ns->ns_client && !lock->l_readers && !lock->l_writers) {
490                 LASSERT(list_empty(&lock->l_lru));
491                 LASSERT(ns->ns_nr_unused >= 0);
492                 list_add_tail(&lock->l_lru, &ns->ns_unused_list);
493                 ns->ns_nr_unused++;
494                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
495                 ldlm_cancel_lru(ns);
496         } else
497                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
498
499         LDLM_LOCK_PUT(lock);    /* matches the ldlm_lock_get in addref */
500         LDLM_LOCK_PUT(lock);    /* matches the handle2lock above */
501
502         EXIT;
503 }
504
505 static int ldlm_lock_compat_list(struct ldlm_lock *lock, int send_cbs,
506                                  struct list_head *queue)
507 {
508         struct list_head *tmp, *pos;
509         int rc = 1;
510
511         list_for_each_safe(tmp, pos, queue) {
512                 struct ldlm_lock *child;
513                 ldlm_res_compat compat;
514
515                 child = list_entry(tmp, struct ldlm_lock, l_res_link);
516                 if (lock == child)
517                         continue;
518
519                 compat = ldlm_res_compat_table[child->l_resource->lr_type];
520                 if (compat && compat(child, lock)) {
521                         CDEBUG(D_OTHER, "compat function succeded, next.\n");
522                         continue;
523                 }
524                 if (lockmode_compat(child->l_granted_mode, lock->l_req_mode)) {
525                         CDEBUG(D_OTHER, "lock modes are compatible, next.\n");
526                         continue;
527                 }
528
529                 rc = 0;
530
531                 if (send_cbs && child->l_blocking_ast != NULL) {
532                         CDEBUG(D_OTHER, "lock %p incompatible; sending "
533                                "blocking AST.\n", child);
534                         ldlm_add_ast_work_item(child, lock);
535                 }
536         }
537
538         return rc;
539 }
540
541 static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs)
542 {
543         int rc;
544         ENTRY;
545
546         l_lock(&lock->l_resource->lr_namespace->ns_lock);
547         rc = ldlm_lock_compat_list(lock, send_cbs,
548                                    &lock->l_resource->lr_granted);
549         /* FIXME: should we be sending ASTs to converting? */
550         if (rc)
551                 rc = ldlm_lock_compat_list
552                         (lock, send_cbs, &lock->l_resource->lr_converting);
553
554         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
555         RETURN(rc);
556 }
557
558 /* NOTE: called by
559    - ldlm_handle_enqueuque - resource
560 */
561 void ldlm_grant_lock(struct ldlm_lock *lock)
562 {
563         struct ldlm_resource *res = lock->l_resource;
564         ENTRY;
565
566         l_lock(&lock->l_resource->lr_namespace->ns_lock);
567         ldlm_resource_add_lock(res, &res->lr_granted, lock);
568         lock->l_granted_mode = lock->l_req_mode;
569
570         if (lock->l_granted_mode < res->lr_most_restr)
571                 res->lr_most_restr = lock->l_granted_mode;
572
573         if (lock->l_completion_ast) {
574                 ldlm_add_ast_work_item(lock, NULL);
575         }
576         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
577         EXIT;
578 }
579
580 /* returns a referenced lock or NULL */
581 static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
582                                       struct ldlm_extent *extent,
583                                       struct ldlm_lock *old_lock)
584 {
585         struct ldlm_lock *lock;
586         struct list_head *tmp;
587
588         list_for_each(tmp, queue) {
589                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
590
591                 if (lock == old_lock)
592                         continue;
593
594                 if (lock->l_flags & LDLM_FL_CBPENDING)
595                         continue;
596
597                 if (lock->l_req_mode != mode)
598                         continue;
599
600                 if (lock->l_resource->lr_type == LDLM_EXTENT &&
601                     (lock->l_extent.start > extent->start ||
602                      lock->l_extent.end < extent->end))
603                         continue;
604
605                 if (lock->l_destroyed)
606                         continue;
607
608                 ldlm_lock_addref_internal(lock, mode);
609                 return lock;
610         }
611
612         return NULL;
613 }
614
615 /* Can be called in two ways:
616  *
617  * If 'ns' is NULL, then lockh describes an existing lock that we want to look
618  * for a duplicate of.
619  *
620  * Otherwise, all of the fields must be filled in, to match against.
621  *
622  * Returns 1 if it finds an already-existing lock that is compatible; in this
623  * case, lockh is filled in with a addref()ed lock
624  */
625 int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
626                     void *cookie, int cookielen, ldlm_mode_t mode,
627                     struct lustre_handle *lockh)
628 {
629         struct ldlm_resource *res;
630         struct ldlm_lock *lock, *old_lock = NULL;
631         int rc = 0;
632         ENTRY;
633
634         if (ns == NULL) {
635                 old_lock = ldlm_handle2lock(lockh);
636                 LASSERT(old_lock);
637
638                 ns = old_lock->l_resource->lr_namespace;
639                 res_id = old_lock->l_resource->lr_name;
640                 type = old_lock->l_resource->lr_type;
641                 mode = old_lock->l_req_mode;
642         }
643
644         res = ldlm_resource_get(ns, NULL, res_id, type, 0);
645         if (res == NULL) {
646                 LASSERT(old_lock == NULL);
647                 RETURN(0);
648         }
649
650         l_lock(&ns->ns_lock);
651
652         if ((lock = search_queue(&res->lr_granted, mode, cookie, old_lock)))
653                 GOTO(out, rc = 1);
654         if ((lock = search_queue(&res->lr_converting, mode, cookie, old_lock)))
655                 GOTO(out, rc = 1);
656         if ((lock = search_queue(&res->lr_waiting, mode, cookie, old_lock)))
657                 GOTO(out, rc = 1);
658
659         EXIT;
660        out:
661         ldlm_resource_putref(res);
662         l_unlock(&ns->ns_lock);
663
664         if (lock) {
665                 ldlm_lock2handle(lock, lockh);
666                 if (lock->l_completion_ast)
667                         lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC);
668         }
669         if (rc)
670                 LDLM_DEBUG(lock, "matched");
671         else
672                 LDLM_DEBUG_NOLOCK("not matched");
673
674         if (old_lock)
675                 LDLM_LOCK_PUT(old_lock);
676
677         return rc;
678 }
679
680 /* Returns a referenced lock */
681 struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
682                                    struct lustre_handle *parent_lock_handle,
683                                    __u64 * res_id, __u32 type,
684                                    ldlm_mode_t mode, void *data, __u32 data_len)
685 {
686         struct ldlm_resource *res, *parent_res = NULL;
687         struct ldlm_lock *lock, *parent_lock = NULL;
688
689         if (parent_lock_handle) {
690                 parent_lock = ldlm_handle2lock(parent_lock_handle);
691                 if (parent_lock)
692                         parent_res = parent_lock->l_resource;
693         }
694
695         res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
696         if (res == NULL)
697                 RETURN(NULL);
698
699         lock = ldlm_lock_new(parent_lock, res);
700         ldlm_resource_putref(res);
701         if (parent_lock != NULL)
702                 LDLM_LOCK_PUT(parent_lock);
703
704         if (lock == NULL)
705                 RETURN(NULL);
706
707         lock->l_req_mode = mode;
708         lock->l_data = data;
709         lock->l_data_len = data_len;
710
711         return lock;
712 }
713
714 /* Must be called with lock->l_lock and lock->l_resource->lr_lock not held */
715 ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock,
716                                void *cookie, int cookie_len,
717                                int *flags,
718                                ldlm_completion_callback completion,
719                                ldlm_blocking_callback blocking)
720 {
721         struct ldlm_resource *res;
722         int local;
723         ldlm_res_policy policy;
724         ENTRY;
725
726         res = lock->l_resource;
727         lock->l_blocking_ast = blocking;
728
729         if (res->lr_type == LDLM_EXTENT)
730                 memcpy(&lock->l_extent, cookie, sizeof(lock->l_extent));
731
732         /* policies are not executed on the client or during replay */
733         local = res->lr_namespace->ns_client;
734         if (!local && !(*flags & LDLM_FL_REPLAY) &&
735             (policy = ldlm_res_policy_table[res->lr_type])) {
736                 int rc;
737                 rc = policy(lock, cookie, lock->l_req_mode, *flags, NULL);
738
739                 if (rc == ELDLM_LOCK_CHANGED) {
740                         res = lock->l_resource;
741                         *flags |= LDLM_FL_LOCK_CHANGED;
742                 } else if (rc == ELDLM_LOCK_ABORTED) {
743                         ldlm_lock_destroy(lock);
744                         RETURN(rc);
745                 }
746         }
747
748         l_lock(&res->lr_namespace->ns_lock);
749         if (local && lock->l_req_mode == lock->l_granted_mode) {
750                 /* The server returned a blocked lock, but it was granted before
751                  * we got a chance to actually enqueue it.  We don't need to do
752                  * anything else. */
753                 *flags &= ~(LDLM_FL_BLOCK_GRANTED | 
754                           LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
755                 GOTO(out, ELDLM_OK);
756         }
757
758         /* This distinction between local lock trees is very important; a client
759          * namespace only has information about locks taken by that client, and
760          * thus doesn't have enough information to decide for itself if it can
761          * be granted (below).  In this case, we do exactly what the server
762          * tells us to do, as dictated by the 'flags'.
763          *
764          * We do exactly the same thing during recovery, when the server is
765          * more or less trusting the clients not to lie.
766          *
767          * FIXME (bug 268): Detect obvious lies by checking compatibility in
768          * granted/converting queues. */
769         ldlm_resource_unlink_lock(lock);
770         if (local || (*flags & LDLM_FL_REPLAY)) {
771                 if (*flags & LDLM_FL_BLOCK_CONV)
772                         ldlm_resource_add_lock(res, res->lr_converting.prev,
773                                                lock);
774                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
775                         ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
776                 else
777                         ldlm_grant_lock(lock);
778                 GOTO(out, ELDLM_OK);
779         }
780
781         /* FIXME: We may want to optimize by checking lr_most_restr */
782         if (!list_empty(&res->lr_converting)) {
783                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
784                 *flags |= LDLM_FL_BLOCK_CONV;
785                 GOTO(out, ELDLM_OK);
786         }
787         if (!list_empty(&res->lr_waiting)) {
788                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
789                 *flags |= LDLM_FL_BLOCK_WAIT;
790                 GOTO(out, ELDLM_OK);
791         }
792         if (!ldlm_lock_compat(lock, 0)) {
793                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
794                 *flags |= LDLM_FL_BLOCK_GRANTED;
795                 GOTO(out, ELDLM_OK);
796         }
797
798         ldlm_grant_lock(lock);
799         EXIT;
800       out:
801         l_unlock(&res->lr_namespace->ns_lock);
802         /* Don't set 'completion_ast' until here so that if the lock is granted
803          * immediately we don't do an unnecessary completion call. */
804         lock->l_completion_ast = completion;
805         return ELDLM_OK;
806 }
807
808 /* Must be called with namespace taken: queue is waiting or converting. */
809 static int ldlm_reprocess_queue(struct ldlm_resource *res,
810                                 struct list_head *queue)
811 {
812         struct list_head *tmp, *pos;
813         ENTRY;
814
815         list_for_each_safe(tmp, pos, queue) {
816                 struct ldlm_lock *pending;
817                 pending = list_entry(tmp, struct ldlm_lock, l_res_link);
818
819                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
820
821                 if (!ldlm_lock_compat(pending, 1))
822                         RETURN(1);
823
824                 list_del_init(&pending->l_res_link);
825                 ldlm_grant_lock(pending);
826         }
827
828         RETURN(0);
829 }
830
831 void ldlm_run_ast_work(struct list_head *rpc_list)
832 {
833         struct list_head *tmp, *pos;
834         int rc;
835         ENTRY;
836
837         list_for_each_safe(tmp, pos, rpc_list) {
838                 struct ldlm_ast_work *w =
839                         list_entry(tmp, struct ldlm_ast_work, w_list);
840
841                 if (w->w_blocking)
842                         rc = w->w_lock->l_blocking_ast
843                                 (w->w_lock, &w->w_desc, w->w_data,
844                                  w->w_datalen, LDLM_CB_BLOCKING);
845                 else
846                         rc = w->w_lock->l_completion_ast(w->w_lock, w->w_flags);
847                 if (rc)
848                         CERROR("Failed AST - should clean & disconnect "
849                                "client\n");
850                 LDLM_LOCK_PUT(w->w_lock);
851                 list_del(&w->w_list);
852                 OBD_FREE(w, sizeof(*w));
853         }
854         EXIT;
855 }
856
857 /* Must be called with resource->lr_lock not taken. */
858 void ldlm_reprocess_all(struct ldlm_resource *res)
859 {
860         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
861         ENTRY;
862
863         /* Local lock trees don't get reprocessed. */
864         if (res->lr_namespace->ns_client) {
865                 EXIT;
866                 return;
867         }
868
869         l_lock(&res->lr_namespace->ns_lock);
870         res->lr_tmp = &rpc_list;
871
872         ldlm_reprocess_queue(res, &res->lr_converting);
873         if (list_empty(&res->lr_converting))
874                 ldlm_reprocess_queue(res, &res->lr_waiting);
875
876         res->lr_tmp = NULL;
877         l_unlock(&res->lr_namespace->ns_lock);
878
879         ldlm_run_ast_work(&rpc_list);
880         EXIT;
881 }
882
883 void ldlm_cancel_callback(struct ldlm_lock *lock)
884 {
885         l_lock(&lock->l_resource->lr_namespace->ns_lock);
886         if (!(lock->l_flags & LDLM_FL_CANCEL)) {
887                 lock->l_flags |= LDLM_FL_CANCEL;
888                 if (lock->l_blocking_ast)
889                         lock->l_blocking_ast(lock, NULL, lock->l_data,
890                                              lock->l_data_len,
891                                              LDLM_CB_CANCELING);
892                 else
893                         LDLM_DEBUG(lock, "no blocking ast");
894         }
895         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
896 }
897
898 void ldlm_lock_cancel(struct ldlm_lock *lock)
899 {
900         struct ldlm_resource *res;
901         struct ldlm_namespace *ns;
902         ENTRY;
903
904         res = lock->l_resource;
905         ns = res->lr_namespace;
906
907         l_lock(&ns->ns_lock);
908         if (lock->l_readers || lock->l_writers) {
909                 LDLM_DEBUG(lock, "lock still has references");
910                 ldlm_lock_dump(lock);
911                 //LBUG();
912         }
913
914         ldlm_cancel_callback(lock);
915
916         ldlm_del_waiting_lock(lock);
917         ldlm_resource_unlink_lock(lock);
918         ldlm_lock_destroy(lock);
919         l_unlock(&ns->ns_lock);
920         EXIT;
921 }
922
923 int ldlm_lock_set_data(struct lustre_handle *lockh, void *data, int datalen)
924 {
925         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
926         ENTRY;
927
928         if (lock == NULL)
929                 RETURN(-EINVAL);
930
931         lock->l_data = data;
932         lock->l_data_len = datalen;
933
934         LDLM_LOCK_PUT(lock);
935
936         RETURN(0);
937 }
938
939 void ldlm_cancel_locks_for_export(struct obd_export *exp)
940 {
941         struct list_head *iter, *n; /* MUST BE CALLED "n"! */
942
943         list_for_each_safe(iter, n, &exp->exp_ldlm_data.led_held_locks) {
944                 struct ldlm_lock *lock;
945                 struct ldlm_resource *res;
946                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
947                 res = ldlm_resource_getref(lock->l_resource);
948                 LDLM_DEBUG(lock, "export %p", exp);
949                 ldlm_lock_cancel(lock);
950                 ldlm_reprocess_all(res);
951                 ldlm_resource_putref(res);
952         }
953 }
954
955 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
956                                         int *flags)
957 {
958         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
959         struct ldlm_resource *res;
960         struct ldlm_namespace *ns;
961         int granted = 0;
962         ENTRY;
963
964         res = lock->l_resource;
965         ns = res->lr_namespace;
966
967         l_lock(&ns->ns_lock);
968
969         lock->l_req_mode = new_mode;
970         ldlm_resource_unlink_lock(lock);
971
972         /* If this is a local resource, put it on the appropriate list. */
973         if (res->lr_namespace->ns_client) {
974                 if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED))
975                         ldlm_resource_add_lock(res, res->lr_converting.prev,
976                                                lock);
977                 else {
978                         /* This should never happen, because of the way the
979                          * server handles conversions. */
980                         LBUG();
981
982                         res->lr_tmp = &rpc_list;
983                         ldlm_grant_lock(lock);
984                         res->lr_tmp = NULL;
985                         granted = 1;
986                         /* FIXME: completion handling not with ns_lock held ! */
987                         if (lock->l_completion_ast)
988                                 lock->l_completion_ast(lock, 0);
989                 }
990         } else {
991                 /* FIXME: We should try the conversion right away and possibly
992                  * return success without the need for an extra AST */
993                 ldlm_resource_add_lock(res, res->lr_converting.prev, lock);
994                 *flags |= LDLM_FL_BLOCK_CONV;
995         }
996
997         l_unlock(&ns->ns_lock);
998
999         if (granted)
1000                 ldlm_run_ast_work(&rpc_list);
1001         RETURN(res);
1002 }
1003
1004 void ldlm_lock_dump(struct ldlm_lock *lock)
1005 {
1006         char ver[128];
1007
1008         if (!(portal_debug & D_OTHER))
1009                 return;
1010
1011         if (RES_VERSION_SIZE != 4)
1012                 LBUG();
1013
1014         if (!lock) {
1015                 CDEBUG(D_OTHER, "  NULL LDLM lock\n");
1016                 return;
1017         }
1018
1019         snprintf(ver, sizeof(ver), "%x %x %x %x",
1020                  lock->l_version[0], lock->l_version[1],
1021                  lock->l_version[2], lock->l_version[3]);
1022
1023         CDEBUG(D_OTHER, "  -- Lock dump: %p (%s)\n", lock, ver);
1024         if (lock->l_export && lock->l_export->exp_connection)
1025                 CDEBUG(D_OTHER, "  Node: NID %x (rhandle: "LPX64")\n",
1026                        lock->l_export->exp_connection->c_peer.peer_nid,
1027                        lock->l_remote_handle.addr);
1028         else
1029                 CDEBUG(D_OTHER, "  Node: local\n");
1030         CDEBUG(D_OTHER, "  Parent: %p\n", lock->l_parent);
1031         CDEBUG(D_OTHER, "  Resource: %p ("LPD64")\n", lock->l_resource,
1032                lock->l_resource->lr_name[0]);
1033         CDEBUG(D_OTHER, "  Requested mode: %d, granted mode: %d\n",
1034                (int)lock->l_req_mode, (int)lock->l_granted_mode);
1035         CDEBUG(D_OTHER, "  Readers: %u ; Writers; %u\n",
1036                lock->l_readers, lock->l_writers);
1037         if (lock->l_resource->lr_type == LDLM_EXTENT)
1038                 CDEBUG(D_OTHER, "  Extent: %Lu -> %Lu\n",
1039                        (unsigned long long)lock->l_extent.start,
1040                        (unsigned long long)lock->l_extent.end);
1041 }
1042
1043 void ldlm_lock_dump_handle(struct lustre_handle *lockh)
1044 {
1045         struct ldlm_lock *lock;
1046
1047         lock = ldlm_handle2lock(lockh);
1048         if (lock == NULL)
1049                 return;
1050
1051         ldlm_lock_dump(lock);
1052
1053         LDLM_LOCK_PUT(lock);
1054 }