Whamcloud - gitweb
Merge b_md to HEAD for 0.5.19 release.
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #define DEBUG_SUBSYSTEM S_LDLM
25
26 #include <linux/slab.h>
27 #include <linux/module.h>
28 #include <linux/lustre_dlm.h>
29 #include <linux/lustre_mds.h>
30 #include <linux/obd_class.h>
31
32 //struct lustre_lock ldlm_everything_lock;
33
34 /* lock types */
35 char *ldlm_lockname[] = {
36         [0] "--",
37         [LCK_EX] "EX",
38         [LCK_PW] "PW",
39         [LCK_PR] "PR",
40         [LCK_CW] "CW",
41         [LCK_CR] "CR",
42         [LCK_NL] "NL"
43 };
44 char *ldlm_typename[] = {
45         [LDLM_PLAIN] "PLN",
46         [LDLM_EXTENT] "EXT",
47 };
48
49 char *ldlm_it2str(int it)
50 {
51         switch (it) {
52         case IT_OPEN:
53                 return "open";
54         case IT_CREAT:
55                 return "creat";
56         case (IT_OPEN | IT_CREAT):
57                 return "open|creat";
58         case IT_MKDIR:
59                 return "mkdir";
60         case IT_LINK:
61                 return "link";
62         case IT_LINK2:
63                 return "link2";
64         case IT_SYMLINK:
65                 return "symlink";
66         case IT_UNLINK:
67                 return "unlink";
68         case IT_RMDIR:
69                 return "rmdir";
70         case IT_RENAME:
71                 return "rename";
72         case IT_RENAME2:
73                 return "rename2";
74         case IT_READDIR:
75                 return "readdir";
76         case IT_GETATTR:
77                 return "getattr";
78         case IT_SETATTR:
79                 return "setattr";
80         case IT_READLINK:
81                 return "readlink";
82         case IT_MKNOD:
83                 return "mknod";
84         case IT_LOOKUP:
85                 return "lookup";
86         default:
87                 CERROR("Unknown intent %d\n", it);
88                 return "UNKNOWN";
89         }
90 }
91
92 extern kmem_cache_t *ldlm_lock_slab;
93 struct lustre_lock ldlm_handle_lock;
94
95 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b);
96
97 ldlm_res_compat ldlm_res_compat_table[] = {
98         [LDLM_PLAIN] ldlm_plain_compat,
99         [LDLM_EXTENT] ldlm_extent_compat,
100 };
101
102 static ldlm_res_policy ldlm_intent_policy_func;
103
104 static int ldlm_plain_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
105                              void *req_cookie, ldlm_mode_t mode, int flags,
106                              void *data)
107 {
108         if ((flags & LDLM_FL_HAS_INTENT) && ldlm_intent_policy_func) {
109                 return ldlm_intent_policy_func(ns, lock, req_cookie, mode,
110                                                flags, data);
111         }
112
113         return ELDLM_OK;
114 }
115
116 ldlm_res_policy ldlm_res_policy_table[] = {
117         [LDLM_PLAIN] ldlm_plain_policy,
118         [LDLM_EXTENT] ldlm_extent_policy,
119 };
120
121 void ldlm_register_intent(ldlm_res_policy arg)
122 {
123         ldlm_intent_policy_func = arg;
124 }
125
126 void ldlm_unregister_intent(void)
127 {
128         ldlm_intent_policy_func = NULL;
129 }
130
131 /*
132  * REFCOUNTED LOCK OBJECTS
133  */
134
135
136 /*
137  * Lock refcounts, during creation:
138  *   - one special one for allocation, dec'd only once in destroy
139  *   - one for being a lock that's in-use
140  *   - one for the addref associated with a new lock
141  */
142 struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
143 {
144         atomic_inc(&lock->l_refc);
145         return lock;
146 }
147
148 void ldlm_lock_put(struct ldlm_lock *lock)
149 {
150         struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
151         ENTRY;
152
153         if (atomic_dec_and_test(&lock->l_refc)) {
154                 l_lock(&ns->ns_lock);
155                 LDLM_DEBUG(lock, "final lock_put on destroyed lock, freeing");
156                 LASSERT(lock->l_destroyed);
157                 LASSERT(list_empty(&lock->l_res_link));
158
159                 spin_lock(&ns->ns_counter_lock);
160                 ns->ns_locks--;
161                 spin_unlock(&ns->ns_counter_lock);
162
163                 ldlm_resource_putref(lock->l_resource);
164                 lock->l_resource = NULL;
165
166                 if (lock->l_parent)
167                         LDLM_LOCK_PUT(lock->l_parent);
168
169                 PORTAL_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock));
170                 l_unlock(&ns->ns_lock);
171         }
172
173         EXIT;
174 }
175
176 void ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
177 {
178         ENTRY;
179         l_lock(&lock->l_resource->lr_namespace->ns_lock);
180         if (!list_empty(&lock->l_lru)) {
181                 list_del_init(&lock->l_lru);
182                 lock->l_resource->lr_namespace->ns_nr_unused--;
183                 LASSERT(lock->l_resource->lr_namespace->ns_nr_unused >= 0);
184         }
185         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
186         EXIT;
187 }
188
189 /* Only called with strict == 0 by recovery, to mark in-use locks as
190  * should-be-destroyed */
191 void ldlm_lock_destroy(struct ldlm_lock *lock)
192 {
193         ENTRY;
194         l_lock(&lock->l_resource->lr_namespace->ns_lock);
195
196         if (!list_empty(&lock->l_children)) {
197                 LDLM_DEBUG(lock, "still has children (%p)!",
198                            lock->l_children.next);
199                 ldlm_lock_dump(D_ERROR, lock);
200                 LBUG();
201         }
202         if (lock->l_readers || lock->l_writers) {
203                 LDLM_DEBUG(lock, "lock still has references");
204                 ldlm_lock_dump(D_OTHER, lock);
205         }
206
207         if (!list_empty(&lock->l_res_link)) {
208                 ldlm_lock_dump(D_ERROR, lock);
209                 LBUG();
210         }
211
212         if (lock->l_destroyed) {
213                 LASSERT(list_empty(&lock->l_lru));
214                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
215                 EXIT;
216                 return;
217         }
218         lock->l_destroyed = 1;
219
220         list_del_init(&lock->l_export_chain);
221         ldlm_lock_remove_from_lru(lock);
222         portals_handle_unhash(&lock->l_handle);
223
224 #if 0
225         /* Wake anyone waiting for this lock */
226         /* FIXME: I should probably add yet another flag, instead of using
227          * l_export to only call this on clients */
228         lock->l_export = NULL;
229         if (lock->l_export && lock->l_completion_ast)
230                 lock->l_completion_ast(lock, 0);
231 #endif
232
233         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
234         LDLM_LOCK_PUT(lock);
235         EXIT;
236 }
237
/* Addref callback registered with portals_handle_hash(); according to the
 * original note it is invoked by portals_handle2object with the handle lock
 * taken.  The opaque pointer is the ldlm_lock to take a reference on. */
static void lock_handle_addref(void *lock)
{
        (void)ldlm_lock_get(lock);
}
243
244 /*
245  * usage: pass in a resource on which you have done ldlm_resource_get
246  *        pass in a parent lock on which you have done a ldlm_lock_get
247  *        after return, ldlm_*_put the resource and parent
248  * returns: lock with refcount 1
249  */
250 static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
251                                        struct ldlm_resource *resource)
252 {
253         struct ldlm_lock *lock;
254         ENTRY;
255
256         if (resource == NULL)
257                 LBUG();
258
259         PORTAL_SLAB_ALLOC(lock, ldlm_lock_slab, sizeof(*lock));
260         if (lock == NULL)
261                 RETURN(NULL);
262
263         lock->l_resource = ldlm_resource_getref(resource);
264
265         atomic_set(&lock->l_refc, 2);
266         INIT_LIST_HEAD(&lock->l_children);
267         INIT_LIST_HEAD(&lock->l_res_link);
268         INIT_LIST_HEAD(&lock->l_lru);
269         INIT_LIST_HEAD(&lock->l_export_chain);
270         INIT_LIST_HEAD(&lock->l_pending_chain);
271         init_waitqueue_head(&lock->l_waitq);
272
273         spin_lock(&resource->lr_namespace->ns_counter_lock);
274         resource->lr_namespace->ns_locks++;
275         spin_unlock(&resource->lr_namespace->ns_counter_lock);
276
277         if (parent != NULL) {
278                 l_lock(&parent->l_resource->lr_namespace->ns_lock);
279                 lock->l_parent = LDLM_LOCK_GET(parent);
280                 list_add(&lock->l_childof, &parent->l_children);
281                 l_unlock(&parent->l_resource->lr_namespace->ns_lock);
282         }
283
284         INIT_LIST_HEAD(&lock->l_handle.h_link);
285         portals_handle_hash(&lock->l_handle, lock_handle_addref);
286
287         RETURN(lock);
288 }
289
290 int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
291                               __u64 new_resid[3])
292 {
293         struct ldlm_resource *oldres = lock->l_resource;
294         ENTRY;
295
296         l_lock(&ns->ns_lock);
297         if (memcmp(new_resid, lock->l_resource->lr_name,
298                    sizeof(lock->l_resource->lr_name)) == 0) {
299                 /* Nothing to do */
300                 l_unlock(&ns->ns_lock);
301                 RETURN(0);
302         }
303
304         LASSERT(new_resid[0] != 0);
305
306         /* This function assumes that the lock isn't on any lists */
307         LASSERT(list_empty(&lock->l_res_link));
308
309         lock->l_resource = ldlm_resource_get(ns, NULL, new_resid,
310                                              lock->l_resource->lr_type, 1);
311         if (lock->l_resource == NULL) {
312                 LBUG();
313                 RETURN(-ENOMEM);
314         }
315
316         /* ...and the flowers are still standing! */
317         ldlm_resource_putref(oldres);
318
319         l_unlock(&ns->ns_lock);
320         RETURN(0);
321 }
322
323 /*
324  *  HANDLES
325  */
326
327 void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh)
328 {
329         //lockh->addr = (__u64)(unsigned long)lock;
330         memset(&lockh->addr, 0x69, sizeof(lockh->addr));
331         lockh->cookie = lock->l_handle.h_cookie;
332 }
333
334 /* if flags: atomically get the lock and set the flags. 
335  *           Return NULL if flag already set
336  */
337
338 struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, int flags)
339 {
340         struct ldlm_lock *lock = NULL, *retval = NULL;
341         ENTRY;
342
343         LASSERT(handle);
344
345         lock = portals_handle2object(handle->cookie);
346         if (lock == NULL)
347                 RETURN(NULL);
348
349         LASSERT(lock->l_resource != NULL);
350         LASSERT(lock->l_resource->lr_namespace != NULL);
351
352         l_lock(&lock->l_resource->lr_namespace->ns_lock);
353
354         /* It's unlikely but possible that someone marked the lock as
355          * destroyed after we did handle2object on it */
356         if (lock->l_destroyed) {
357                 CERROR("lock already destroyed: lock %p\n", lock);
358                 LDLM_LOCK_PUT(lock);
359                 GOTO(out, retval);
360         }
361
362         if (flags && (lock->l_flags & flags)) {
363                 LDLM_LOCK_PUT(lock);
364                 GOTO(out, retval);
365         }
366
367         if (flags)
368                 lock->l_flags |= flags;
369
370         retval = lock;
371         EXIT;
372  out:
373         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
374         return retval;
375 }
376
377 struct ldlm_lock *ldlm_handle2lock_ns(struct ldlm_namespace *ns,
378                                       struct lustre_handle *handle)
379 {
380         struct ldlm_lock *retval = NULL;
381
382         l_lock(&ns->ns_lock);
383         retval = __ldlm_handle2lock(handle, 0);
384         l_unlock(&ns->ns_lock);
385
386         return retval;
387 }
388
389 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b)
390 {
391         return lockmode_compat(a->l_req_mode, b->l_req_mode);
392 }
393
394 void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
395 {
396         ldlm_res2desc(lock->l_resource, &desc->l_resource);
397         desc->l_req_mode = lock->l_req_mode;
398         desc->l_granted_mode = lock->l_granted_mode;
399         memcpy(&desc->l_extent, &lock->l_extent, sizeof(desc->l_extent));
400         memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
401 }
402
403 static void ldlm_add_ast_work_item(struct ldlm_lock *lock,
404                                    struct ldlm_lock *new)
405 {
406         struct ldlm_ast_work *w;
407         ENTRY;
408
409         l_lock(&lock->l_resource->lr_namespace->ns_lock);
410         if (new && (lock->l_flags & LDLM_FL_AST_SENT))
411                 GOTO(out, 0);
412
413         OBD_ALLOC(w, sizeof(*w));
414         if (!w) {
415                 LBUG();
416                 GOTO(out, 0);
417         }
418
419         if (new) {
420                 lock->l_flags |= LDLM_FL_AST_SENT;
421                 w->w_blocking = 1;
422                 ldlm_lock2desc(new, &w->w_desc);
423         }
424
425         w->w_lock = LDLM_LOCK_GET(lock);
426         list_add(&w->w_list, lock->l_resource->lr_tmp);
427       out:
428         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
429         return;
430 }
431
432 void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
433 {
434         struct ldlm_lock *lock;
435
436         lock = ldlm_handle2lock(lockh);
437         ldlm_lock_addref_internal(lock, mode);
438         LDLM_LOCK_PUT(lock);
439 }
440
441 /* only called for local locks */
442 void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
443 {
444         l_lock(&lock->l_resource->lr_namespace->ns_lock);
445         ldlm_lock_remove_from_lru(lock);
446         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
447                 lock->l_readers++;
448         else
449                 lock->l_writers++;
450         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
451         LDLM_LOCK_GET(lock);
452         LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
453 }
454
455 /* Args: unlocked lock */
456 int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
457                                     __u64 *res_id, int flags);
458
459 void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
460 {
461         struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
462         struct ldlm_namespace *ns;
463         ENTRY;
464
465         if (lock == NULL)
466                 LBUG();
467
468         LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
469         ns = lock->l_resource->lr_namespace;
470         l_lock(&lock->l_resource->lr_namespace->ns_lock);
471         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR) {
472                 LASSERT(lock->l_readers > 0);
473                 lock->l_readers--;
474         } else {
475                 LASSERT(lock->l_writers > 0);
476                 lock->l_writers--;
477         }
478
479         /* If we received a blocked AST and this was the last reference,
480          * run the callback. */
481         if (!lock->l_readers && !lock->l_writers &&
482             (lock->l_flags & LDLM_FL_CBPENDING)) {
483                 if (!lock->l_resource->lr_namespace->ns_client &&
484                     lock->l_export)
485                         CERROR("FL_CBPENDING set on non-local lock--just a "
486                                "warning\n");
487
488                 LDLM_DEBUG(lock, "final decref done on cbpending lock");
489                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
490
491                 /* FIXME: need a real 'desc' here */
492                 lock->l_blocking_ast(lock, NULL, lock->l_data,
493                                      lock->l_data_len, LDLM_CB_BLOCKING);
494         } else if (ns->ns_client && !lock->l_readers && !lock->l_writers) {
495                 LASSERT(list_empty(&lock->l_lru));
496                 LASSERT(ns->ns_nr_unused >= 0);
497                 list_add_tail(&lock->l_lru, &ns->ns_unused_list);
498                 ns->ns_nr_unused++;
499                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
500                 ldlm_cancel_lru(ns);
501         } else {
502                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
503         }
504
505         LDLM_LOCK_PUT(lock);    /* matches the ldlm_lock_get in addref */
506         LDLM_LOCK_PUT(lock);    /* matches the handle2lock above */
507
508         EXIT;
509 }
510
511 static int ldlm_lock_compat_list(struct ldlm_lock *lock, int send_cbs,
512                                  struct list_head *queue)
513 {
514         struct list_head *tmp, *pos;
515         int rc = 1;
516
517         list_for_each_safe(tmp, pos, queue) {
518                 struct ldlm_lock *child;
519                 ldlm_res_compat compat;
520
521                 child = list_entry(tmp, struct ldlm_lock, l_res_link);
522                 if (lock == child)
523                         continue;
524
525                 compat = ldlm_res_compat_table[child->l_resource->lr_type];
526                 if (compat && compat(child, lock)) {
527                         CDEBUG(D_OTHER, "compat function succeded, next.\n");
528                         continue;
529                 }
530                 if (lockmode_compat(child->l_granted_mode, lock->l_req_mode)) {
531                         CDEBUG(D_OTHER, "lock modes are compatible, next.\n");
532                         continue;
533                 }
534
535                 rc = 0;
536
537                 if (send_cbs && child->l_blocking_ast != NULL) {
538                         CDEBUG(D_OTHER, "lock %p incompatible; sending "
539                                "blocking AST.\n", child);
540                         ldlm_add_ast_work_item(child, lock);
541                 }
542         }
543
544         return rc;
545 }
546
547 static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs)
548 {
549         int rc;
550         ENTRY;
551
552         l_lock(&lock->l_resource->lr_namespace->ns_lock);
553         rc = ldlm_lock_compat_list(lock, send_cbs,
554                                    &lock->l_resource->lr_granted);
555         /* FIXME: should we be sending ASTs to converting? */
556         if (rc)
557                 rc = ldlm_lock_compat_list
558                         (lock, send_cbs, &lock->l_resource->lr_converting);
559
560         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
561         RETURN(rc);
562 }
563
564 /* NOTE: called by
565    - ldlm_handle_enqueuque - resource
566 */
567 void ldlm_grant_lock(struct ldlm_lock *lock)
568 {
569         struct ldlm_resource *res = lock->l_resource;
570         ENTRY;
571
572         l_lock(&lock->l_resource->lr_namespace->ns_lock);
573         ldlm_resource_add_lock(res, &res->lr_granted, lock);
574         lock->l_granted_mode = lock->l_req_mode;
575
576         if (lock->l_granted_mode < res->lr_most_restr)
577                 res->lr_most_restr = lock->l_granted_mode;
578
579         if (lock->l_completion_ast) {
580                 ldlm_add_ast_work_item(lock, NULL);
581         }
582         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
583         EXIT;
584 }
585
586 /* returns a referenced lock or NULL */
587 static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
588                                       struct ldlm_extent *extent,
589                                       struct ldlm_lock *old_lock)
590 {
591         struct ldlm_lock *lock;
592         struct list_head *tmp;
593
594         list_for_each(tmp, queue) {
595                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
596
597                 if (lock == old_lock)
598                         continue;
599
600                 if (lock->l_flags & LDLM_FL_CBPENDING)
601                         continue;
602
603                 if (lock->l_req_mode != mode)
604                         continue;
605
606                 if (lock->l_resource->lr_type == LDLM_EXTENT &&
607                     (lock->l_extent.start > extent->start ||
608                      lock->l_extent.end < extent->end))
609                         continue;
610
611                 if (lock->l_destroyed)
612                         continue;
613
614                 ldlm_lock_addref_internal(lock, mode);
615                 return lock;
616         }
617
618         return NULL;
619 }
620
621 /* Can be called in two ways:
622  *
623  * If 'ns' is NULL, then lockh describes an existing lock that we want to look
624  * for a duplicate of.
625  *
626  * Otherwise, all of the fields must be filled in, to match against.
627  *
628  * Returns 1 if it finds an already-existing lock that is compatible; in this
629  * case, lockh is filled in with a addref()ed lock
630  */
631 int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
632                     void *cookie, int cookielen, ldlm_mode_t mode,
633                     struct lustre_handle *lockh)
634 {
635         struct ldlm_resource *res;
636         struct ldlm_lock *lock, *old_lock = NULL;
637         int rc = 0;
638         ENTRY;
639
640         if (ns == NULL) {
641                 old_lock = ldlm_handle2lock(lockh);
642                 LASSERT(old_lock);
643
644                 ns = old_lock->l_resource->lr_namespace;
645                 res_id = old_lock->l_resource->lr_name;
646                 type = old_lock->l_resource->lr_type;
647                 mode = old_lock->l_req_mode;
648         }
649
650         res = ldlm_resource_get(ns, NULL, res_id, type, 0);
651         if (res == NULL) {
652                 LASSERT(old_lock == NULL);
653                 RETURN(0);
654         }
655
656         l_lock(&ns->ns_lock);
657
658         if ((lock = search_queue(&res->lr_granted, mode, cookie, old_lock)))
659                 GOTO(out, rc = 1);
660         if ((lock = search_queue(&res->lr_converting, mode, cookie, old_lock)))
661                 GOTO(out, rc = 1);
662         if ((lock = search_queue(&res->lr_waiting, mode, cookie, old_lock)))
663                 GOTO(out, rc = 1);
664
665         EXIT;
666        out:
667         ldlm_resource_putref(res);
668         l_unlock(&ns->ns_lock);
669
670         if (lock) {
671                 ldlm_lock2handle(lock, lockh);
672                 if (lock->l_completion_ast)
673                         lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC);
674         }
675         if (rc)
676                 LDLM_DEBUG(lock, "matched");
677         else
678                 LDLM_DEBUG_NOLOCK("not matched");
679
680         if (old_lock)
681                 LDLM_LOCK_PUT(old_lock);
682
683         return rc;
684 }
685
686 /* Returns a referenced lock */
687 struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
688                                    struct lustre_handle *parent_lock_handle,
689                                    __u64 * res_id, __u32 type,
690                                    ldlm_mode_t mode, void *data, __u32 data_len)
691 {
692         struct ldlm_resource *res, *parent_res = NULL;
693         struct ldlm_lock *lock, *parent_lock = NULL;
694
695         if (parent_lock_handle) {
696                 parent_lock = ldlm_handle2lock(parent_lock_handle);
697                 if (parent_lock)
698                         parent_res = parent_lock->l_resource;
699         }
700
701         res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
702         if (res == NULL)
703                 RETURN(NULL);
704
705         lock = ldlm_lock_new(parent_lock, res);
706         ldlm_resource_putref(res);
707         if (parent_lock != NULL)
708                 LDLM_LOCK_PUT(parent_lock);
709
710         if (lock == NULL)
711                 RETURN(NULL);
712
713         lock->l_req_mode = mode;
714         lock->l_data = data;
715         lock->l_data_len = data_len;
716
717         return lock;
718 }
719
720 ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
721                                struct ldlm_lock *lock,
722                                void *cookie, int cookie_len,
723                                int *flags,
724                                ldlm_completion_callback completion,
725                                ldlm_blocking_callback blocking)
726 {
727         struct ldlm_resource *res;
728         int local;
729         ldlm_res_policy policy;
730         ENTRY;
731
732         res = lock->l_resource;
733         lock->l_blocking_ast = blocking;
734
735         if (res->lr_type == LDLM_EXTENT)
736                 memcpy(&lock->l_extent, cookie, sizeof(lock->l_extent));
737
738         /* policies are not executed on the client or during replay */
739         local = res->lr_namespace->ns_client;
740         if (!local && !(*flags & LDLM_FL_REPLAY) &&
741             (policy = ldlm_res_policy_table[res->lr_type])) {
742                 int rc;
743                 rc = policy(ns, lock, cookie, lock->l_req_mode, *flags, NULL);
744
745                 if (rc == ELDLM_LOCK_CHANGED) {
746                         res = lock->l_resource;
747                         *flags |= LDLM_FL_LOCK_CHANGED;
748                 } else if (rc == ELDLM_LOCK_ABORTED) {
749                         ldlm_lock_destroy(lock);
750                         RETURN(rc);
751                 }
752         }
753
754         l_lock(&ns->ns_lock);
755         if (local && lock->l_req_mode == lock->l_granted_mode) {
756                 /* The server returned a blocked lock, but it was granted before
757                  * we got a chance to actually enqueue it.  We don't need to do
758                  * anything else. */
759                 *flags &= ~(LDLM_FL_BLOCK_GRANTED | 
760                           LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
761                 GOTO(out, ELDLM_OK);
762         }
763
764         /* This distinction between local lock trees is very important; a client
765          * namespace only has information about locks taken by that client, and
766          * thus doesn't have enough information to decide for itself if it can
767          * be granted (below).  In this case, we do exactly what the server
768          * tells us to do, as dictated by the 'flags'.
769          *
770          * We do exactly the same thing during recovery, when the server is
771          * more or less trusting the clients not to lie.
772          *
773          * FIXME (bug 268): Detect obvious lies by checking compatibility in
774          * granted/converting queues. */
775         ldlm_resource_unlink_lock(lock);
776         if (local) {
777                 if (*flags & LDLM_FL_BLOCK_CONV)
778                         ldlm_resource_add_lock(res, res->lr_converting.prev,
779                                                lock);
780                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
781                         ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
782                 else
783                         ldlm_grant_lock(lock);
784                 GOTO(out, ELDLM_OK);
785         } else if (*flags & LDLM_FL_REPLAY) {
786                 if (*flags & LDLM_FL_BLOCK_CONV) {
787                         ldlm_resource_add_lock(res, res->lr_converting.prev,
788                                                lock);
789                         GOTO(out, ELDLM_OK);
790                 } else if (*flags & LDLM_FL_BLOCK_WAIT) {
791                         ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
792                         GOTO(out, ELDLM_OK);
793                 } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
794                         ldlm_grant_lock(lock);
795                         GOTO(out, ELDLM_OK);
796                 }
797                 /* If no flags, fall through to normal enqueue path. */
798         }
799
800         /* FIXME: We may want to optimize by checking lr_most_restr */
801         if (!list_empty(&res->lr_converting)) {
802                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
803                 *flags |= LDLM_FL_BLOCK_CONV;
804                 GOTO(out, ELDLM_OK);
805         }
806         if (!list_empty(&res->lr_waiting)) {
807                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
808                 *flags |= LDLM_FL_BLOCK_WAIT;
809                 GOTO(out, ELDLM_OK);
810         }
811         if (!ldlm_lock_compat(lock, 0)) {
812                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
813                 *flags |= LDLM_FL_BLOCK_GRANTED;
814                 GOTO(out, ELDLM_OK);
815         }
816
817         ldlm_grant_lock(lock);
818         EXIT;
819       out:
820         l_unlock(&ns->ns_lock);
821         /* Don't set 'completion_ast' until here so that if the lock is granted
822          * immediately we don't do an unnecessary completion call. */
823         lock->l_completion_ast = completion;
824         return ELDLM_OK;
825 }
826
827 /* Must be called with namespace taken: queue is waiting or converting. */
828 static int ldlm_reprocess_queue(struct ldlm_resource *res,
829                                 struct list_head *queue)
830 {
831         struct list_head *tmp, *pos;
832         ENTRY;
833
834         list_for_each_safe(tmp, pos, queue) {
835                 struct ldlm_lock *pending;
836                 pending = list_entry(tmp, struct ldlm_lock, l_res_link);
837
838                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
839
840                 if (!ldlm_lock_compat(pending, 1))
841                         RETURN(1);
842
843                 list_del_init(&pending->l_res_link);
844                 ldlm_grant_lock(pending);
845         }
846
847         RETURN(0);
848 }
849
850 int ldlm_run_ast_work(struct list_head *rpc_list)
851 {
852         struct list_head *tmp, *pos;
853         int rc, retval = 0;
854         ENTRY;
855
856         list_for_each_safe(tmp, pos, rpc_list) {
857                 struct ldlm_ast_work *w =
858                         list_entry(tmp, struct ldlm_ast_work, w_list);
859
860                 if (w->w_blocking)
861                         rc = w->w_lock->l_blocking_ast
862                                 (w->w_lock, &w->w_desc, w->w_data,
863                                  w->w_datalen, LDLM_CB_BLOCKING);
864                 else
865                         rc = w->w_lock->l_completion_ast(w->w_lock, w->w_flags);
866                 if (rc == -ERESTART)
867                         retval = rc;
868                 else if (rc)
869                         CERROR("Failed AST - should clean & disconnect "
870                                "client\n");
871                 LDLM_LOCK_PUT(w->w_lock);
872                 list_del(&w->w_list);
873                 OBD_FREE(w, sizeof(*w));
874         }
875         RETURN(retval);
876 }
877
878 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
879 {
880         ldlm_reprocess_all(res);
881         return LDLM_ITER_CONTINUE;
882 }
883
884 void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
885 {
886         (void)ldlm_namespace_foreach_res(ns, reprocess_one_queue, NULL);
887 }
888
889 /* Must be called with resource->lr_lock not taken. */
890 void ldlm_reprocess_all(struct ldlm_resource *res)
891 {
892         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
893         int rc;
894         ENTRY;
895
896         /* Local lock trees don't get reprocessed. */
897         if (res->lr_namespace->ns_client) {
898                 EXIT;
899                 return;
900         }
901
902  restart:
903         l_lock(&res->lr_namespace->ns_lock);
904         res->lr_tmp = &rpc_list;
905
906         ldlm_reprocess_queue(res, &res->lr_converting);
907         if (list_empty(&res->lr_converting))
908                 ldlm_reprocess_queue(res, &res->lr_waiting);
909
910         res->lr_tmp = NULL;
911         l_unlock(&res->lr_namespace->ns_lock);
912
913         rc = ldlm_run_ast_work(&rpc_list);
914         if (rc == -ERESTART)
915                 goto restart;
916         EXIT;
917 }
918
919 void ldlm_cancel_callback(struct ldlm_lock *lock)
920 {
921         l_lock(&lock->l_resource->lr_namespace->ns_lock);
922         if (!(lock->l_flags & LDLM_FL_CANCEL)) {
923                 lock->l_flags |= LDLM_FL_CANCEL;
924                 if (lock->l_blocking_ast)
925                         lock->l_blocking_ast(lock, NULL, lock->l_data,
926                                              lock->l_data_len,
927                                              LDLM_CB_CANCELING);
928                 else
929                         LDLM_DEBUG(lock, "no blocking ast");
930         }
931         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
932 }
933
934 void ldlm_lock_cancel(struct ldlm_lock *lock)
935 {
936         struct ldlm_resource *res;
937         struct ldlm_namespace *ns;
938         ENTRY;
939
940         res = lock->l_resource;
941         ns = res->lr_namespace;
942
943         l_lock(&ns->ns_lock);
944         /* Please do not, no matter how tempting, remove this LBUG without
945          * talking to me first. -phik */
946         if (lock->l_readers || lock->l_writers) {
947                 LDLM_DEBUG(lock, "lock still has references");
948                 ldlm_lock_dump(D_OTHER, lock);
949                 LBUG();
950         }
951
952         ldlm_cancel_callback(lock);
953
954         ldlm_del_waiting_lock(lock);
955         ldlm_resource_unlink_lock(lock);
956         ldlm_lock_destroy(lock);
957         l_unlock(&ns->ns_lock);
958         EXIT;
959 }
960
961 int ldlm_lock_set_data(struct lustre_handle *lockh, void *data, int datalen)
962 {
963         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
964         ENTRY;
965
966         if (lock == NULL)
967                 RETURN(-EINVAL);
968
969         lock->l_data = data;
970         lock->l_data_len = datalen;
971
972         LDLM_LOCK_PUT(lock);
973
974         RETURN(0);
975 }
976
977 void ldlm_cancel_locks_for_export(struct obd_export *exp)
978 {
979         struct list_head *iter, *n; /* MUST BE CALLED "n"! */
980
981         list_for_each_safe(iter, n, &exp->exp_ldlm_data.led_held_locks) {
982                 struct ldlm_lock *lock;
983                 struct ldlm_resource *res;
984                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
985                 res = ldlm_resource_getref(lock->l_resource);
986                 LDLM_DEBUG(lock, "export %p", exp);
987                 ldlm_lock_cancel(lock);
988                 ldlm_reprocess_all(res);
989                 ldlm_resource_putref(res);
990         }
991 }
992
993 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
994                                         int *flags)
995 {
996         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
997         struct ldlm_resource *res;
998         struct ldlm_namespace *ns;
999         int granted = 0;
1000         ENTRY;
1001
1002         res = lock->l_resource;
1003         ns = res->lr_namespace;
1004
1005         l_lock(&ns->ns_lock);
1006
1007         lock->l_req_mode = new_mode;
1008         ldlm_resource_unlink_lock(lock);
1009
1010         /* If this is a local resource, put it on the appropriate list. */
1011         if (res->lr_namespace->ns_client) {
1012                 if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED))
1013                         ldlm_resource_add_lock(res, res->lr_converting.prev,
1014                                                lock);
1015                 else {
1016                         /* This should never happen, because of the way the
1017                          * server handles conversions. */
1018                         LBUG();
1019
1020                         res->lr_tmp = &rpc_list;
1021                         ldlm_grant_lock(lock);
1022                         res->lr_tmp = NULL;
1023                         granted = 1;
1024                         /* FIXME: completion handling not with ns_lock held ! */
1025                         if (lock->l_completion_ast)
1026                                 lock->l_completion_ast(lock, 0);
1027                 }
1028         } else {
1029                 /* FIXME: We should try the conversion right away and possibly
1030                  * return success without the need for an extra AST */
1031                 ldlm_resource_add_lock(res, res->lr_converting.prev, lock);
1032                 *flags |= LDLM_FL_BLOCK_CONV;
1033         }
1034
1035         l_unlock(&ns->ns_lock);
1036
1037         if (granted)
1038                 ldlm_run_ast_work(&rpc_list);
1039         RETURN(res);
1040 }
1041
1042 void ldlm_lock_dump(int level, struct ldlm_lock *lock)
1043 {
1044         char ver[128];
1045
1046         if (!(portal_debug & level))
1047                 return;
1048
1049         if (RES_VERSION_SIZE != 4)
1050                 LBUG();
1051
1052         if (!lock) {
1053                 CDEBUG(level, "  NULL LDLM lock\n");
1054                 return;
1055         }
1056
1057         snprintf(ver, sizeof(ver), "%x %x %x %x",
1058                  lock->l_version[0], lock->l_version[1],
1059                  lock->l_version[2], lock->l_version[3]);
1060
1061         CDEBUG(level, "  -- Lock dump: %p (%s)\n", lock, ver);
1062         if (lock->l_export && lock->l_export->exp_connection)
1063                 CDEBUG(level, "  Node: NID %x (rhandle: "LPX64")\n",
1064                        lock->l_export->exp_connection->c_peer.peer_nid,
1065                        lock->l_remote_handle.cookie);
1066         else
1067                 CDEBUG(level, "  Node: local\n");
1068         CDEBUG(level, "  Parent: %p\n", lock->l_parent);
1069         CDEBUG(level, "  Resource: %p ("LPD64")\n", lock->l_resource,
1070                lock->l_resource->lr_name[0]);
1071         CDEBUG(level, "  Requested mode: %d, granted mode: %d\n",
1072                (int)lock->l_req_mode, (int)lock->l_granted_mode);
1073         CDEBUG(level, "  Readers: %u ; Writers; %u\n",
1074                lock->l_readers, lock->l_writers);
1075         if (lock->l_resource->lr_type == LDLM_EXTENT)
1076                 CDEBUG(level, "  Extent: "LPU64" -> "LPU64"\n",
1077                        lock->l_extent.start, lock->l_extent.end);
1078 }
1079
1080 void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
1081 {
1082         struct ldlm_lock *lock;
1083
1084         lock = ldlm_handle2lock(lockh);
1085         if (lock == NULL)
1086                 return;
1087
1088         ldlm_lock_dump(D_OTHER, lock);
1089
1090         LDLM_LOCK_PUT(lock);
1091 }