1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2002 Cluster File Systems, Inc.
5 * Author: Peter Braam <braam@clusterfs.com>
6 * Author: Phil Schwan <phil@clusterfs.com>
8 * This file is part of Lustre, http://www.lustre.org.
10 * Lustre is free software; you can redistribute it and/or
11 * modify it under the terms of version 2 of the GNU General Public
12 * License as published by the Free Software Foundation.
14 * Lustre is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with Lustre; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24 #define DEBUG_SUBSYSTEM S_LDLM
26 #include <linux/slab.h>
27 #include <linux/module.h>
28 #include <linux/lustre_dlm.h>
29 #include <linux/lustre_mds.h>
30 #include <linux/obd_class.h>
32 //struct lustre_lock ldlm_everything_lock;
/* Printable names for lock modes and lock types (array initializers are
 * elided in this listing; presumably indexed by the mode/type enums —
 * TODO confirm against the full source). */
35 char *ldlm_lockname[] = {
44 char *ldlm_typename[] = {
/* Map an intent opcode to a printable name. Only a fragment of the
 * switch is visible in this listing; unknown values are reported via
 * CERROR in the default case. */
49 char *ldlm_it2str(int it)
56 case (IT_OPEN | IT_CREAT):
87 CERROR("Unknown intent %d\n", it);
/* Slab cache for ldlm_lock objects (defined elsewhere in the module). */
92 extern kmem_cache_t *ldlm_lock_slab;
93 struct lustre_lock ldlm_handle_lock;
95 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b);
/* Per-resource-type lock-compatibility callbacks, indexed by lock type.
 * Uses the old GCC "[index] value" designated-initializer syntax (no '='). */
97 ldlm_res_compat ldlm_res_compat_table[] = {
98 [LDLM_PLAIN] ldlm_plain_compat,
99 [LDLM_EXTENT] ldlm_extent_compat,
/* Optional intent policy installed via ldlm_register_intent(); NULL when
 * no intent handling is registered. */
102 static ldlm_res_policy ldlm_intent_policy_func;
/* Enqueue policy for PLAIN locks: when the request carries
 * LDLM_FL_HAS_INTENT and an intent policy is registered, defer to it.
 * The non-intent fallthrough is elided in this listing. */
104 static int ldlm_plain_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
105 void *req_cookie, ldlm_mode_t mode, int flags,
108 if ((flags & LDLM_FL_HAS_INTENT) && ldlm_intent_policy_func) {
109 return ldlm_intent_policy_func(ns, lock, req_cookie, mode,
/* Per-resource-type enqueue policies, indexed by lock type (same
 * designated-initializer style as ldlm_res_compat_table). */
116 ldlm_res_policy ldlm_res_policy_table[] = {
117 [LDLM_PLAIN] ldlm_plain_policy,
118 [LDLM_EXTENT] ldlm_extent_policy,
/* Install the single, global intent-policy callback (no locking visible
 * here — presumably set once at server startup). */
121 void ldlm_register_intent(ldlm_res_policy arg)
123 ldlm_intent_policy_func = arg;
/* Remove the global intent-policy callback. */
126 void ldlm_unregister_intent(void)
128 ldlm_intent_policy_func = NULL;
132 * REFCOUNTED LOCK OBJECTS
137 * Lock refcounts, during creation:
138 * - one special one for allocation, dec'd only once in destroy
139 * - one for being a lock that's in-use
140 * - one for the addref associated with a new lock
/* Take a reference on a lock; returns the lock (return statement elided
 * in this listing). */
142 struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
144 atomic_inc(&lock->l_refc);
/* Drop a reference. On the final put the lock must already be destroyed
 * (l_destroyed set) and off all resource lists; its resource ref and slab
 * memory are then released.
 * NOTE(review): ns is cached through l_resource before the refcount drops,
 * and ns_lock is released only after the lock memory is freed — this
 * relies on the namespace outliving every lock in it. */
148 void ldlm_lock_put(struct ldlm_lock *lock)
150 struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
153 if (atomic_dec_and_test(&lock->l_refc)) {
154 l_lock(&ns->ns_lock);
155 LDLM_DEBUG(lock, "final lock_put on destroyed lock, freeing");
156 LASSERT(lock->l_destroyed);
157 LASSERT(list_empty(&lock->l_res_link));
/* per-namespace lock accounting (the decrement itself is elided here) */
159 spin_lock(&ns->ns_counter_lock);
161 spin_unlock(&ns->ns_counter_lock);
163 ldlm_resource_putref(lock->l_resource);
164 lock->l_resource = NULL;
/* drop the parent's ref if this was a child lock (guard elided) */
167 LDLM_LOCK_PUT(lock->l_parent);
169 PORTAL_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock));
170 l_unlock(&ns->ns_lock);
/* Unlink a lock from its namespace's unused-lock LRU, if present, and
 * decrement the LRU count. Takes ns_lock itself, so callers may hold it
 * already only if l_lock() is recursive — TODO confirm. */
176 void ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
179 l_lock(&lock->l_resource->lr_namespace->ns_lock);
180 if (!list_empty(&lock->l_lru)) {
181 list_del_init(&lock->l_lru);
182 lock->l_resource->lr_namespace->ns_nr_unused--;
183 LASSERT(lock->l_resource->lr_namespace->ns_nr_unused >= 0);
185 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
189 /* Only called with strict == 0 by recovery, to mark in-use locks as
190 * should-be-destroyed */
/* Mark a lock destroyed and unlink it from the export chain, the LRU and
 * the handle hash. Emits diagnostics when the lock still has children,
 * readers/writers, or is still linked on a resource list (the bodies of
 * those error paths are partly elided in this listing). Idempotent: a
 * second call returns early via the l_destroyed check. */
191 void ldlm_lock_destroy(struct ldlm_lock *lock)
194 l_lock(&lock->l_resource->lr_namespace->ns_lock);
196 if (!list_empty(&lock->l_children)) {
197 LDLM_DEBUG(lock, "still has children (%p)!",
198 lock->l_children.next);
199 ldlm_lock_dump(D_ERROR, lock);
202 if (lock->l_readers || lock->l_writers) {
203 LDLM_DEBUG(lock, "lock still has references");
204 ldlm_lock_dump(D_OTHER, lock);
207 if (!list_empty(&lock->l_res_link)) {
208 ldlm_lock_dump(D_ERROR, lock);
212 if (lock->l_destroyed) {
213 LASSERT(list_empty(&lock->l_lru));
214 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
218 lock->l_destroyed = 1;
220 list_del_init(&lock->l_export_chain);
221 ldlm_lock_remove_from_lru(lock);
222 portals_handle_unhash(&lock->l_handle);
225 /* Wake anyone waiting for this lock */
226 /* FIXME: I should probably add yet another flag, instead of using
227 * l_export to only call this on clients */
/* NOTE(review): l_export is cleared on the next line and then immediately
 * tested — as written the completion-AST branch below is dead code. The
 * NULL-ing and the test should be swapped (or a separate flag used, per
 * the FIXME above). */
228 lock->l_export = NULL;
229 if (lock->l_export && lock->l_completion_ast)
230 lock->l_completion_ast(lock, 0);
233 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
238 /* this is called by portals_handle2object with the handle lock taken */
/* Callback for portals_handle2object(): takes a reference on the lock as
 * it is looked up from the handle hash (body elided in this listing). */
239 static void lock_handle_addref(void *lock)
245 * usage: pass in a resource on which you have done ldlm_resource_get
246 * pass in a parent lock on which you have done a ldlm_lock_get
247 * after return, ldlm_*_put the resource and parent
248 * returns: lock with refcount 1
/* Allocate and initialise a new lock on 'resource' (caller's resource ref
 * is duplicated here, so the caller may drop its own afterwards). The new
 * lock starts with refcount 2 — presumably one "allocation" ref and one
 * for the caller; see the refcount commentary above. If 'parent' is given
 * the new lock takes a ref on it and links onto its child list. Finally
 * the lock is entered into the portals handle hash. Error paths for a
 * NULL resource / failed slab alloc are elided in this listing. */
250 static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
251 struct ldlm_resource *resource)
253 struct ldlm_lock *lock;
256 if (resource == NULL)
259 PORTAL_SLAB_ALLOC(lock, ldlm_lock_slab, sizeof(*lock));
263 lock->l_resource = ldlm_resource_getref(resource);
265 atomic_set(&lock->l_refc, 2);
266 INIT_LIST_HEAD(&lock->l_children);
267 INIT_LIST_HEAD(&lock->l_res_link);
268 INIT_LIST_HEAD(&lock->l_lru);
269 INIT_LIST_HEAD(&lock->l_export_chain);
270 INIT_LIST_HEAD(&lock->l_pending_chain);
271 init_waitqueue_head(&lock->l_waitq);
/* per-namespace lock accounting */
273 spin_lock(&resource->lr_namespace->ns_counter_lock);
274 resource->lr_namespace->ns_locks++;
275 spin_unlock(&resource->lr_namespace->ns_counter_lock);
277 if (parent != NULL) {
278 l_lock(&parent->l_resource->lr_namespace->ns_lock);
279 lock->l_parent = LDLM_LOCK_GET(parent);
280 list_add(&lock->l_childof, &parent->l_children);
281 l_unlock(&parent->l_resource->lr_namespace->ns_lock);
284 INIT_LIST_HEAD(&lock->l_handle.h_link);
285 portals_handle_hash(&lock->l_handle, lock_handle_addref);
/* Re-home a lock onto a (possibly new) resource named 'new_resid' within
 * namespace 'ns'. No-op when the name already matches. The lock must not
 * be linked on any resource list. On success the old resource ref is
 * dropped; the failure path when ldlm_resource_get() returns NULL is
 * mostly elided in this listing. */
290 int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
293 struct ldlm_resource *oldres = lock->l_resource;
296 l_lock(&ns->ns_lock);
297 if (memcmp(new_resid, lock->l_resource->lr_name,
298 sizeof(lock->l_resource->lr_name)) == 0) {
/* name unchanged — nothing to do */
300 l_unlock(&ns->ns_lock);
304 LASSERT(new_resid[0] != 0);
306 /* This function assumes that the lock isn't on any lists */
307 LASSERT(list_empty(&lock->l_res_link));
309 lock->l_resource = ldlm_resource_get(ns, NULL, new_resid,
310 lock->l_resource->lr_type, 1);
311 if (lock->l_resource == NULL) {
316 /* ...and the flowers are still standing! */
317 ldlm_resource_putref(oldres);
319 l_unlock(&ns->ns_lock);
/* Fill a wire handle for 'lock': the addr field is deliberately poisoned
 * with 0x69 bytes (no longer a raw pointer — see the commented-out line),
 * and the opaque cookie from the portals handle hash identifies the lock. */
327 void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh)
329 //lockh->addr = (__u64)(unsigned long)lock;
330 memset(&lockh->addr, 0x69, sizeof(lockh->addr));
331 lockh->cookie = lock->l_handle.h_cookie;
334 /* if flags: atomically get the lock and set the flags.
335 * Return NULL if flag already set
/* Resolve a wire handle back to a referenced lock. Returns NULL when the
 * cookie doesn't resolve, the lock was destroyed after lookup, or (when
 * 'flags' is non-zero) any of those flag bits were already set — otherwise
 * the flags are set atomically under ns_lock and the lock is returned. */
338 struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, int flags)
340 struct ldlm_lock *lock = NULL, *retval = NULL;
345 lock = portals_handle2object(handle->cookie);
349 LASSERT(lock->l_resource != NULL);
350 LASSERT(lock->l_resource->lr_namespace != NULL);
352 l_lock(&lock->l_resource->lr_namespace->ns_lock);
354 /* It's unlikely but possible that someone marked the lock as
355 * destroyed after we did handle2object on it */
356 if (lock->l_destroyed) {
357 CERROR("lock already destroyed: lock %p\n", lock);
/* test-and-set of the requested flags: fail if any already set */
362 if (flags && (lock->l_flags & flags)) {
368 lock->l_flags |= flags;
373 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
/* As __ldlm_handle2lock(flags=0), but performed under the namespace lock. */
377 struct ldlm_lock *ldlm_handle2lock_ns(struct ldlm_namespace *ns,
378 struct lustre_handle *handle)
380 struct ldlm_lock *retval = NULL;
382 l_lock(&ns->ns_lock);
383 retval = __ldlm_handle2lock(handle, 0);
384 l_unlock(&ns->ns_lock);
/* PLAIN locks are compatible iff their requested modes are compatible. */
389 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b)
391 return lockmode_compat(a->l_req_mode, b->l_req_mode);
/* Flatten a lock into the wire descriptor used by AST RPCs: resource
 * name/type, requested and granted modes, extent and version vector. */
394 void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
396 ldlm_res2desc(lock->l_resource, &desc->l_resource);
397 desc->l_req_mode = lock->l_req_mode;
398 desc->l_granted_mode = lock->l_granted_mode;
399 memcpy(&desc->l_extent, &lock->l_extent, sizeof(desc->l_extent));
400 memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
/* Queue an AST work item on the resource's lr_tmp list, to be fired later
 * by ldlm_run_ast_work(). 'new' non-NULL means a blocking AST (the new
 * conflicting lock is described in w_desc, and LDLM_FL_AST_SENT suppresses
 * duplicates); 'new' NULL presumably means a completion AST. The work item
 * holds a reference on 'lock' until the AST is run. OBD_ALLOC failure
 * handling is elided in this listing. */
403 static void ldlm_add_ast_work_item(struct ldlm_lock *lock,
404 struct ldlm_lock *new)
406 struct ldlm_ast_work *w;
409 l_lock(&lock->l_resource->lr_namespace->ns_lock);
410 if (new && (lock->l_flags & LDLM_FL_AST_SENT))
413 OBD_ALLOC(w, sizeof(*w));
420 lock->l_flags |= LDLM_FL_AST_SENT;
422 ldlm_lock2desc(new, &w->w_desc);
425 w->w_lock = LDLM_LOCK_GET(lock);
426 list_add(&w->w_list, lock->l_resource->lr_tmp);
428 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
/* Handle-based wrapper around ldlm_lock_addref_internal(). No NULL check
 * on the handle2lock result is visible in this listing — TODO confirm the
 * elided lines guard it. */
432 void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
434 struct ldlm_lock *lock;
436 lock = ldlm_handle2lock(lockh);
437 ldlm_lock_addref_internal(lock, mode);
441 /* only called for local locks */
/* Bump the reader count (NL/CR/PR modes) or — presumably, in the elided
 * else branch — the writer count, and pull the lock off the unused LRU
 * since it is now actively referenced. */
442 void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
444 l_lock(&lock->l_resource->lr_namespace->ns_lock);
445 ldlm_lock_remove_from_lru(lock);
446 if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
450 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
452 LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
455 /* Args: unlocked lock */
/* Forward declaration (defined elsewhere); used by the decref path below. */
456 int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
457 __u64 *res_id, int flags);
/* Drop a reader (NL/CR/PR) or writer reference taken by addref. When the
 * last reference goes away: if a blocking AST arrived meanwhile
 * (LDLM_FL_CBPENDING) the blocking callback is invoked — after releasing
 * ns_lock; otherwise on a client namespace the now-unused lock is parked
 * on the LRU. Finally both the addref reference and the handle2lock
 * reference taken above are dropped. */
459 void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
461 struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
462 struct ldlm_namespace *ns;
468 LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
469 ns = lock->l_resource->lr_namespace;
470 l_lock(&lock->l_resource->lr_namespace->ns_lock);
471 if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR) {
472 LASSERT(lock->l_readers > 0);
475 LASSERT(lock->l_writers > 0);
479 /* If we received a blocked AST and this was the last reference,
480 * run the callback. */
481 if (!lock->l_readers && !lock->l_writers &&
482 (lock->l_flags & LDLM_FL_CBPENDING)) {
483 if (!lock->l_resource->lr_namespace->ns_client &&
485 CERROR("FL_CBPENDING set on non-local lock--just a "
488 LDLM_DEBUG(lock, "final decref done on cbpending lock");
489 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
491 /* FIXME: need a real 'desc' here */
492 lock->l_blocking_ast(lock, NULL, lock->l_data,
493 lock->l_data_len, LDLM_CB_BLOCKING);
494 } else if (ns->ns_client && !lock->l_readers && !lock->l_writers) {
/* client-side: unreferenced lock goes onto the unused LRU */
495 LASSERT(list_empty(&lock->l_lru));
496 LASSERT(ns->ns_nr_unused >= 0);
497 list_add_tail(&lock->l_lru, &ns->ns_unused_list);
499 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
502 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
505 LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */
506 LDLM_LOCK_PUT(lock); /* matches the handle2lock above */
/* Walk 'queue' checking each granted lock against 'lock'. Locks declared
 * compatible by the per-type compat function, or whose granted mode is
 * compatible with the requested mode, are skipped. For incompatible locks
 * a blocking AST is queued when send_cbs is set. The return value
 * (presumably 0 when any incompatibility was found) is computed on lines
 * elided from this listing — TODO confirm. */
511 static int ldlm_lock_compat_list(struct ldlm_lock *lock, int send_cbs,
512 struct list_head *queue)
514 struct list_head *tmp, *pos;
517 list_for_each_safe(tmp, pos, queue) {
518 struct ldlm_lock *child;
519 ldlm_res_compat compat;
521 child = list_entry(tmp, struct ldlm_lock, l_res_link);
525 compat = ldlm_res_compat_table[child->l_resource->lr_type];
526 if (compat && compat(child, lock)) {
527 CDEBUG(D_OTHER, "compat function succeded, next.\n");
530 if (lockmode_compat(child->l_granted_mode, lock->l_req_mode)) {
531 CDEBUG(D_OTHER, "lock modes are compatible, next.\n");
537 if (send_cbs && child->l_blocking_ast != NULL) {
538 CDEBUG(D_OTHER, "lock %p incompatible; sending "
539 "blocking AST.\n", child);
540 ldlm_add_ast_work_item(child, lock);
/* Check 'lock' against the resource's granted and converting queues,
 * under the namespace lock; the converting-queue check appears to run
 * only conditionally (guard elided in this listing). */
547 static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs)
552 l_lock(&lock->l_resource->lr_namespace->ns_lock);
553 rc = ldlm_lock_compat_list(lock, send_cbs,
554 &lock->l_resource->lr_granted);
555 /* FIXME: should we be sending ASTs to converting? */
557 rc = ldlm_lock_compat_list
558 (lock, send_cbs, &lock->l_resource->lr_converting);
560 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
565 - ldlm_handle_enqueue - resource
/* Grant 'lock': move it to the resource's granted list, record the
 * granted mode, keep lr_most_restr up to date, and queue a completion-AST
 * work item if the lock has a completion callback. */
567 void ldlm_grant_lock(struct ldlm_lock *lock)
569 struct ldlm_resource *res = lock->l_resource;
572 l_lock(&lock->l_resource->lr_namespace->ns_lock);
573 ldlm_resource_add_lock(res, &res->lr_granted, lock);
574 lock->l_granted_mode = lock->l_req_mode;
576 if (lock->l_granted_mode < res->lr_most_restr)
577 res->lr_most_restr = lock->l_granted_mode;
579 if (lock->l_completion_ast) {
580 ldlm_add_ast_work_item(lock, NULL);
582 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
586 /* returns a referenced lock or NULL */
/* Scan 'queue' for a lock usable for a match: same requested mode, not
 * cancel-pending, not destroyed, not 'old_lock' itself, and (for extent
 * locks) fully covering the requested extent. On a hit, a reader/writer
 * reference is taken via addref_internal and the lock is returned;
 * returns NULL otherwise (return statements elided in this listing). */
587 static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
588 struct ldlm_extent *extent,
589 struct ldlm_lock *old_lock)
591 struct ldlm_lock *lock;
592 struct list_head *tmp;
594 list_for_each(tmp, queue) {
595 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
597 if (lock == old_lock)
600 if (lock->l_flags & LDLM_FL_CBPENDING)
603 if (lock->l_req_mode != mode)
606 if (lock->l_resource->lr_type == LDLM_EXTENT &&
607 (lock->l_extent.start > extent->start ||
608 lock->l_extent.end < extent->end))
611 if (lock->l_destroyed)
614 ldlm_lock_addref_internal(lock, mode);
621 /* Can be called in two ways:
623 * If 'ns' is NULL, then lockh describes an existing lock that we want to look
624 * for a duplicate of.
626 * Otherwise, all of the fields must be filled in, to match against.
628 * Returns 1 if it finds an already-existing lock that is compatible; in this
629 * case, lockh is filled in with a addref()ed lock
/* Look for an existing compatible lock. With ns == NULL, lockh names an
 * existing lock whose resource/type/mode are reused for the search (a
 * duplicate hunt); otherwise the explicit arguments are matched. Searches
 * granted, then converting, then waiting queues. On a match, lockh is
 * filled with the addref()ed lock, and any completion AST is invoked with
 * LDLM_FL_WAIT_NOREPROC. Returns 1 on match (return values elided here —
 * TODO confirm). */
631 int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
632 void *cookie, int cookielen, ldlm_mode_t mode,
633 struct lustre_handle *lockh)
635 struct ldlm_resource *res;
636 struct ldlm_lock *lock, *old_lock = NULL;
641 old_lock = ldlm_handle2lock(lockh);
644 ns = old_lock->l_resource->lr_namespace;
645 res_id = old_lock->l_resource->lr_name;
646 type = old_lock->l_resource->lr_type;
647 mode = old_lock->l_req_mode;
650 res = ldlm_resource_get(ns, NULL, res_id, type, 0);
652 LASSERT(old_lock == NULL);
656 l_lock(&ns->ns_lock);
658 if ((lock = search_queue(&res->lr_granted, mode, cookie, old_lock)))
660 if ((lock = search_queue(&res->lr_converting, mode, cookie, old_lock)))
662 if ((lock = search_queue(&res->lr_waiting, mode, cookie, old_lock)))
667 ldlm_resource_putref(res);
668 l_unlock(&ns->ns_lock);
671 ldlm_lock2handle(lock, lockh);
672 if (lock->l_completion_ast)
673 lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC);
676 LDLM_DEBUG(lock, "matched");
678 LDLM_DEBUG_NOLOCK("not matched");
681 LDLM_LOCK_PUT(old_lock);
686 /* Returns a referenced lock */
/* Create a referenced lock on the resource named res_id (creating the
 * resource if needed), optionally parented on the lock behind
 * parent_lock_handle. The temporary resource and parent references taken
 * here are dropped before returning; req_mode and opaque callback data
 * are recorded on the new lock (l_data assignment elided in listing). */
687 struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
688 struct lustre_handle *parent_lock_handle,
689 __u64 * res_id, __u32 type,
690 ldlm_mode_t mode, void *data, __u32 data_len)
692 struct ldlm_resource *res, *parent_res = NULL;
693 struct ldlm_lock *lock, *parent_lock = NULL;
695 if (parent_lock_handle) {
696 parent_lock = ldlm_handle2lock(parent_lock_handle);
698 parent_res = parent_lock->l_resource;
701 res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
705 lock = ldlm_lock_new(parent_lock, res);
706 ldlm_resource_putref(res);
707 if (parent_lock != NULL)
708 LDLM_LOCK_PUT(parent_lock);
713 lock->l_req_mode = mode;
715 lock->l_data_len = data_len;
/* Enqueue a lock on its resource. Server side (non-client, non-replay):
 * run the per-type policy first, which may change the resource
 * (ELDLM_LOCK_CHANGED) or abort the enqueue (ELDLM_LOCK_ABORTED). Then,
 * depending on namespace locality / replay / *flags, either trust the
 * server's blocking flags and park the lock on the converting or waiting
 * list, or (normal server path) check the converting/waiting queues and
 * compatibility and grant or block accordingly. The completion callback
 * is installed last so an immediate grant sends no spurious completion. */
720 ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
721 struct ldlm_lock *lock,
722 void *cookie, int cookie_len,
724 ldlm_completion_callback completion,
725 ldlm_blocking_callback blocking)
727 struct ldlm_resource *res;
729 ldlm_res_policy policy;
732 res = lock->l_resource;
733 lock->l_blocking_ast = blocking;
735 if (res->lr_type == LDLM_EXTENT)
736 memcpy(&lock->l_extent, cookie, sizeof(lock->l_extent));
738 /* policies are not executed on the client or during replay */
739 local = res->lr_namespace->ns_client;
740 if (!local && !(*flags & LDLM_FL_REPLAY) &&
741 (policy = ldlm_res_policy_table[res->lr_type])) {
743 rc = policy(ns, lock, cookie, lock->l_req_mode, *flags, NULL);
745 if (rc == ELDLM_LOCK_CHANGED) {
746 res = lock->l_resource;
747 *flags |= LDLM_FL_LOCK_CHANGED;
748 } else if (rc == ELDLM_LOCK_ABORTED) {
749 ldlm_lock_destroy(lock);
754 l_lock(&ns->ns_lock);
755 if (local && lock->l_req_mode == lock->l_granted_mode) {
756 /* The server returned a blocked lock, but it was granted before
757 * we got a chance to actually enqueue it. We don't need to do
759 *flags &= ~(LDLM_FL_BLOCK_GRANTED |
760 LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
764 /* This distinction between local lock trees is very important; a client
765 * namespace only has information about locks taken by that client, and
766 * thus doesn't have enough information to decide for itself if it can
767 * be granted (below). In this case, we do exactly what the server
768 * tells us to do, as dictated by the 'flags'.
770 * We do exactly the same thing during recovery, when the server is
771 * more or less trusting the clients not to lie.
773 * FIXME (bug 268): Detect obvious lies by checking compatibility in
774 * granted/converting queues. */
775 ldlm_resource_unlink_lock(lock);
/* client path: obey the server's blocking flags verbatim */
777 if (*flags & LDLM_FL_BLOCK_CONV)
778 ldlm_resource_add_lock(res, res->lr_converting.prev,
780 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
781 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
783 ldlm_grant_lock(lock);
785 } else if (*flags & LDLM_FL_REPLAY) {
786 if (*flags & LDLM_FL_BLOCK_CONV) {
787 ldlm_resource_add_lock(res, res->lr_converting.prev,
790 } else if (*flags & LDLM_FL_BLOCK_WAIT) {
791 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
793 } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
794 ldlm_grant_lock(lock);
797 /* If no flags, fall through to normal enqueue path. */
800 /* FIXME: We may want to optimize by checking lr_most_restr */
801 if (!list_empty(&res->lr_converting)) {
802 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
803 *flags |= LDLM_FL_BLOCK_CONV;
806 if (!list_empty(&res->lr_waiting)) {
807 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
808 *flags |= LDLM_FL_BLOCK_WAIT;
811 if (!ldlm_lock_compat(lock, 0)) {
812 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
813 *flags |= LDLM_FL_BLOCK_GRANTED;
817 ldlm_grant_lock(lock);
820 l_unlock(&ns->ns_lock);
821 /* Don't set 'completion_ast' until here so that if the lock is granted
822 * immediately we don't do an unnecessary completion call. */
823 lock->l_completion_ast = completion;
827 /* Must be called with namespace taken: queue is waiting or converting. */
/* Re-examine each blocked lock on 'queue' (waiting or converting): any
 * lock now compatible with the granted set (sending blocking ASTs for
 * conflicts via compat(…, 1)) is unlinked and granted. Incompatible locks
 * presumably stop the scan (break elided in this listing — TODO confirm). */
828 static int ldlm_reprocess_queue(struct ldlm_resource *res,
829 struct list_head *queue)
831 struct list_head *tmp, *pos;
834 list_for_each_safe(tmp, pos, queue) {
835 struct ldlm_lock *pending;
836 pending = list_entry(tmp, struct ldlm_lock, l_res_link);
838 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
840 if (!ldlm_lock_compat(pending, 1))
843 list_del_init(&pending->l_res_link);
844 ldlm_grant_lock(pending);
/* Fire every queued AST work item on rpc_list: blocking ASTs get the
 * conflict descriptor and LDLM_CB_BLOCKING; otherwise the completion AST
 * is called with the saved flags. Failures are logged (disconnect TODO in
 * the original). Each item's lock reference is dropped and the item freed. */
850 int ldlm_run_ast_work(struct list_head *rpc_list)
852 struct list_head *tmp, *pos;
856 list_for_each_safe(tmp, pos, rpc_list) {
857 struct ldlm_ast_work *w =
858 list_entry(tmp, struct ldlm_ast_work, w_list);
861 rc = w->w_lock->l_blocking_ast
862 (w->w_lock, &w->w_desc, w->w_data,
863 w->w_datalen, LDLM_CB_BLOCKING);
865 rc = w->w_lock->l_completion_ast(w->w_lock, w->w_flags);
869 CERROR("Failed AST - should clean & disconnect "
871 LDLM_LOCK_PUT(w->w_lock);
872 list_del(&w->w_list);
873 OBD_FREE(w, sizeof(*w));
/* Per-resource iterator callback for ldlm_reprocess_all_ns(). */
878 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
880 ldlm_reprocess_all(res);
881 return LDLM_ITER_CONTINUE;
/* Reprocess the blocked queues of every resource in the namespace. */
884 void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
886 (void)ldlm_namespace_foreach_res(ns, reprocess_one_queue, NULL);
889 /* Must be called with resource->lr_lock not taken. */
/* Server-side only (client trees return early): reprocess a resource's
 * converting queue, then — only if converting drained — its waiting
 * queue, collecting AST work on a local rpc_list that is run after
 * ns_lock is dropped. */
890 void ldlm_reprocess_all(struct ldlm_resource *res)
892 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
896 /* Local lock trees don't get reprocessed. */
897 if (res->lr_namespace->ns_client) {
903 l_lock(&res->lr_namespace->ns_lock);
904 res->lr_tmp = &rpc_list;
906 ldlm_reprocess_queue(res, &res->lr_converting);
907 if (list_empty(&res->lr_converting))
908 ldlm_reprocess_queue(res, &res->lr_waiting);
911 l_unlock(&res->lr_namespace->ns_lock);
913 rc = ldlm_run_ast_work(&rpc_list);
/* Run the blocking AST once with a cancel indication (flag value elided
 * in this listing), guarded by LDLM_FL_CANCEL so it fires at most once. */
919 void ldlm_cancel_callback(struct ldlm_lock *lock)
921 l_lock(&lock->l_resource->lr_namespace->ns_lock);
922 if (!(lock->l_flags & LDLM_FL_CANCEL)) {
923 lock->l_flags |= LDLM_FL_CANCEL;
924 if (lock->l_blocking_ast)
925 lock->l_blocking_ast(lock, NULL, lock->l_data,
929 LDLM_DEBUG(lock, "no blocking ast");
931 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
/* Cancel a lock outright: run the cancel callback, remove it from the
 * pending-AST timeout list, unlink it from its resource and destroy it.
 * A lock still holding reader/writer references is a fatal condition
 * (the LBUG itself is elided in this listing — see the comment). */
934 void ldlm_lock_cancel(struct ldlm_lock *lock)
936 struct ldlm_resource *res;
937 struct ldlm_namespace *ns;
940 res = lock->l_resource;
941 ns = res->lr_namespace;
943 l_lock(&ns->ns_lock);
944 /* Please do not, no matter how tempting, remove this LBUG without
945 * talking to me first. -phik */
946 if (lock->l_readers || lock->l_writers) {
947 LDLM_DEBUG(lock, "lock still has references");
948 ldlm_lock_dump(D_OTHER, lock);
952 ldlm_cancel_callback(lock);
954 ldlm_del_waiting_lock(lock);
955 ldlm_resource_unlink_lock(lock);
956 ldlm_lock_destroy(lock);
957 l_unlock(&ns->ns_lock);
/* Attach opaque callback data to the lock behind 'lockh' (the l_data
 * assignment and error handling are elided in this listing). */
961 int ldlm_lock_set_data(struct lustre_handle *lockh, void *data, int datalen)
963 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
970 lock->l_data_len = datalen;
/* Cancel every lock held by a (presumably disconnecting) export. A
 * resource ref is taken around each cancel so the resource survives for
 * the reprocess pass. Uses the safe iterator because cancel unlinks the
 * lock from led_held_locks (hence the "MUST BE CALLED n" note). */
977 void ldlm_cancel_locks_for_export(struct obd_export *exp)
979 struct list_head *iter, *n; /* MUST BE CALLED "n"! */
981 list_for_each_safe(iter, n, &exp->exp_ldlm_data.led_held_locks) {
982 struct ldlm_lock *lock;
983 struct ldlm_resource *res;
984 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
985 res = ldlm_resource_getref(lock->l_resource);
986 LDLM_DEBUG(lock, "export %p", exp);
987 ldlm_lock_cancel(lock);
988 ldlm_reprocess_all(res);
989 ldlm_resource_putref(res);
/* Convert a lock to new_mode. Client namespaces either queue it on the
 * converting list (when the server said to block) or grant it immediately
 * and run the completion AST; server namespaces always queue it on the
 * converting list and set LDLM_FL_BLOCK_CONV (see the FIXME about trying
 * the conversion eagerly). Queued AST work is run after ns_lock drops. */
993 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
996 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
997 struct ldlm_resource *res;
998 struct ldlm_namespace *ns;
1002 res = lock->l_resource;
1003 ns = res->lr_namespace;
1005 l_lock(&ns->ns_lock);
1007 lock->l_req_mode = new_mode;
1008 ldlm_resource_unlink_lock(lock);
1010 /* If this is a local resource, put it on the appropriate list. */
1011 if (res->lr_namespace->ns_client) {
1012 if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED))
1013 ldlm_resource_add_lock(res, res->lr_converting.prev,
1016 /* This should never happen, because of the way the
1017 * server handles conversions. */
1020 res->lr_tmp = &rpc_list;
1021 ldlm_grant_lock(lock);
1024 /* FIXME: completion handling not with ns_lock held ! */
1025 if (lock->l_completion_ast)
1026 lock->l_completion_ast(lock, 0);
1029 /* FIXME: We should try the conversion right away and possibly
1030 * return success without the need for an extra AST */
1031 ldlm_resource_add_lock(res, res->lr_converting.prev, lock);
1032 *flags |= LDLM_FL_BLOCK_CONV;
1035 l_unlock(&ns->ns_lock);
1038 ldlm_run_ast_work(&rpc_list);
/* Debug dump of a lock at the given debug level: version vector, remote
 * node (when exported) or "local", parent, resource name, modes, ref
 * counts and extent (for extent locks). Early-outs when the level is
 * masked off; the RES_VERSION_SIZE check body is elided in this listing. */
1042 void ldlm_lock_dump(int level, struct ldlm_lock *lock)
1046 if (!(portal_debug & level))
1049 if (RES_VERSION_SIZE != 4)
1053 CDEBUG(level, " NULL LDLM lock\n");
1057 snprintf(ver, sizeof(ver), "%x %x %x %x",
1058 lock->l_version[0], lock->l_version[1],
1059 lock->l_version[2], lock->l_version[3]);
1061 CDEBUG(level, " -- Lock dump: %p (%s)\n", lock, ver);
1062 if (lock->l_export && lock->l_export->exp_connection)
1063 CDEBUG(level, " Node: NID %x (rhandle: "LPX64")\n",
1064 lock->l_export->exp_connection->c_peer.peer_nid,
1065 lock->l_remote_handle.cookie);
1067 CDEBUG(level, " Node: local\n");
1068 CDEBUG(level, " Parent: %p\n", lock->l_parent);
1069 CDEBUG(level, " Resource: %p ("LPD64")\n", lock->l_resource,
1070 lock->l_resource->lr_name[0]);
1071 CDEBUG(level, " Requested mode: %d, granted mode: %d\n",
1072 (int)lock->l_req_mode, (int)lock->l_granted_mode);
1073 CDEBUG(level, " Readers: %u ; Writers; %u\n",
1074 lock->l_readers, lock->l_writers);
1075 if (lock->l_resource->lr_type == LDLM_EXTENT)
1076 CDEBUG(level, " Extent: "LPU64" -> "LPU64"\n",
1077 lock->l_extent.start, lock->l_extent.end);
/* Resolve a handle and dump the lock, then drop the lookup reference.
 * NOTE(review): the dump uses D_OTHER rather than the 'level' argument —
 * possibly intentional, TODO confirm against the full source. */
1080 void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
1082 struct ldlm_lock *lock;
1084 lock = ldlm_handle2lock(lockh);
1088 ldlm_lock_dump(D_OTHER, lock);
1090 LDLM_LOCK_PUT(lock);