1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * by Cluster File Systems, Inc.
10 * authors, Peter Braam <braam@clusterfs.com> &
11 * Phil Schwan <phil@clusterfs.com>
16 #include <linux/version.h>
17 #include <linux/module.h>
18 #include <linux/slab.h>
19 #include <asm/unistd.h>
21 #define DEBUG_SUBSYSTEM S_LDLM
23 #include <linux/obd_support.h>
24 #include <linux/obd_class.h>
26 #include <linux/lustre_dlm.h>
28 extern kmem_cache_t *ldlm_lock_slab;

/* Per-resource-type compatibility callbacks, indexed by lr_type.
 * Only the LDLM_EXTENT entry is visible in this listing (intervening
 * lines are elided); slots left unset are NULL, and callers test for
 * NULL before dispatching. */
30 ldlm_res_compat ldlm_res_compat_table [] = {
32 [LDLM_EXTENT] ldlm_extent_compat,

/* Per-resource-type policy callbacks, indexed by lr_type.  A policy
 * may rewrite the resource name at enqueue time (see the
 * ELDLM_RES_CHANGED handling in ldlm_local_lock_enqueue). */
36 ldlm_res_policy ldlm_res_policy_table [] = {
38 [LDLM_EXTENT] ldlm_extent_policy,
/* Allocate a new lock from ldlm_lock_slab, zero it, and attach it to
 * 'resource' with requested mode 'mode'.  If 'parent' is non-NULL the
 * new lock is linked onto the parent's l_children list.
 *
 * NOTE(review): this listing is elided — the allocation-failure check
 * after kmem_cache_alloc() and the return statement are not visible
 * here; confirm against the full file that a NULL return is handled. */
42 static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
43 struct ldlm_resource *resource,
46 struct ldlm_lock *lock;
51 lock = kmem_cache_alloc(ldlm_lock_slab, SLAB_KERNEL);
55 memset(lock, 0, sizeof(*lock));
56 lock->l_resource = resource;
57 lock->l_req_mode = mode;
58 INIT_LIST_HEAD(&lock->l_children);
/* parent linkage — presumably guarded by an elided 'if (parent)';
 * TODO confirm, since list_add on a NULL parent would oops. */
61 lock->l_parent = parent;
62 list_add(&lock->l_childof, &parent->l_children);
/* Walk 'list' (a queue of granted locks) and, for every lock whose
 * granted request mode conflicts with the mode requested by 'new',
 * fire that lock's blocking AST so its holder can release it.
 * Returns nonzero if any incompatible lock was found (the incompat
 * accumulation and return statement are elided from this listing). */
68 static int ldlm_notify_incompatible(struct list_head *list,
69 struct ldlm_lock *new)
71 struct list_head *tmp;
74 list_for_each(tmp, list) {
75 struct ldlm_lock *lock;
76 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
/* compatible modes need no notification — skip (continue elided) */
77 if (lockmode_compat(lock->l_req_mode, new->l_req_mode))
/* holder registered a blocking callback: tell it to get out of
 * the way; l_data is the opaque cookie it registered with */
82 if (lock->l_blocking_ast != NULL)
83 lock->l_blocking_ast(lock, new, lock->l_data,
/* Decide whether 'lock' conflicts with already-granted locks.
 *
 * If the parent resource's type has a compat handler (e.g. extent
 * locks), iterate over all sibling child resources and notify holders
 * on any child whose range overlaps; otherwise fall back to a plain
 * mode check against this resource's own granted queue.  Returns
 * nonzero when at least one incompatible holder was notified. */
90 static int ldlm_lock_compat(struct ldlm_lock *lock)
92 struct ldlm_resource *parent_res = lock->l_resource->lr_parent;
93 ldlm_res_compat compat;
/* condition head elided: presumably 'if (parent_res &&' precedes
 * this table lookup — TODO confirm against the full file */
96 (compat = ldlm_res_compat_table[parent_res->lr_type])) {
97 struct list_head *tmp;
99 list_for_each(tmp, &parent_res->lr_children) {
100 struct ldlm_resource *child;
101 child = list_entry(tmp, struct ldlm_resource,
104 /* compat will return 0 when child == l_resource
105 * hence notifications on the same resource are incl. */
106 if (compat(child, lock->l_resource))
109 incompat |= ldlm_notify_incompatible(&child->lr_granted,
/* no type-specific handler: a simple mode check on our own
 * granted queue is sufficient */
116 return ldlm_notify_incompatible(&lock->l_resource->lr_granted, lock);
/* Grant 'lock' on 'res': move it onto the resource's granted queue,
 * record the granted mode, and wake the waiter via its completion AST
 * (if one was registered at enqueue time). */
119 static void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
121 ldlm_resource_add_lock(res, &res->lr_granted, lock);
122 lock->l_granted_mode = lock->l_req_mode;
/* track the most restrictive mode held on this resource; assumes
 * numerically smaller mode values are more restrictive — TODO
 * confirm against the ldlm mode enum ordering */
124 if (lock->l_granted_mode < res->lr_most_restr)
125 res->lr_most_restr = lock->l_granted_mode;
127 if (lock->l_completion_ast)
128 lock->l_completion_ast(lock, NULL, NULL, 0);
/* Re-scan a pending queue ('converting' or waiting) after the lock
 * state changed: any queued lock that is no longer incompatible is
 * removed from the queue and granted.  Uses list_for_each_safe since
 * granting unlinks the current entry.  (The handling of a still-
 * incompatible entry — presumably stop or skip — is elided from this
 * listing, as is the return value.) */
131 static int ldlm_reprocess_queue(struct ldlm_lock *lock,
132 struct list_head *converting,
133 struct list_head *granted_list)
135 struct list_head *tmp, *pos;
138 list_for_each_safe(tmp, pos, converting) {
139 struct ldlm_lock *pending;
140 pending = list_entry(tmp, struct ldlm_lock, l_res_link);
142 incompat = ldlm_lock_compat(pending);
/* no conflict remains: unlink from the pending queue and grant */
146 list_del(&pending->l_res_link);
147 ldlm_grant_lock(pending->l_resource, pending);
153 /* XXX: Revisit the error handling; we do not, for example, do
154 * ldlm_resource_put()s in our error cases, and we probably leak an allocated
/* Enqueue a new lock in the local lock manager.
 *
 * Resolves the namespace, lets the resource-type policy optionally
 * rewrite the resource name, allocates the lock, and then either
 * grants it immediately (ELDLM_OK) or parks it on the waiting queue
 * (-ELDLM_BLOCK_CONV / -ELDLM_BLOCK_WAIT / -ELDLM_BLOCK_GRANTED),
 * in which case the caller is notified later via the completion AST.
 * '*flags' is both input (AST registration bits) and output
 * (LDLM_FL_RES_CHANGED).  On success a handle for the new lock is
 * written to '*lockh'.
 *
 * See the XXX above: error paths leak resource references and
 * probably the allocated lock. */
156 ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev,
158 struct ldlm_handle *parent_lock_handle,
163 ldlm_lock_callback completion,
164 ldlm_lock_callback blocking,
167 struct ldlm_handle *lockh)
169 struct ldlm_namespace *ns;
170 struct ldlm_resource *res, *parent_res;
171 struct ldlm_lock *lock, *parent_lock;
172 int incompat = 0, rc;
173 __u64 new_id[RES_NAME_SIZE];
174 ldlm_res_policy policy;
/* parent lookup — presumably guarded by a NULL check on
 * parent_lock_handle in the elided lines; TODO confirm */
178 parent_lock = ldlm_handle2object(parent_lock_handle);
180 parent_res = parent_lock->l_resource;
184 ns = ldlm_namespace_find(obddev, ns_id);
185 if (ns == NULL || ns->ns_hash == NULL)
186 RETURN(-ELDLM_BAD_NAMESPACE);
/* give the type-specific policy a chance to redirect the enqueue
 * to a different resource name (e.g. extent-lock grouping) */
189 (policy = ldlm_res_policy_table[parent_res->lr_type])) {
190 rc = policy(parent_res, res_id, new_id, mode, NULL);
191 if (rc == ELDLM_RES_CHANGED) {
/* tell the caller the name it asked for was rewritten */
192 *flags |= LDLM_FL_RES_CHANGED;
193 memcpy(res_id, new_id, sizeof(__u64) * RES_NAME_SIZE);
/* final argument 1 = create the resource if it does not exist */
197 res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
201 lock = ldlm_lock_new(parent_lock, res, mode);
206 lock->l_data_len = data_len;
/* ASTs are only wired up when the caller asked for them via flags */
207 if ((*flags) & LDLM_FL_COMPLETION_AST)
208 lock->l_completion_ast = completion;
209 if ((*flags) & LDLM_FL_BLOCKING_AST)
210 lock->l_blocking_ast = blocking;
211 ldlm_object2handle(lock, lockh);
212 spin_lock(&res->lr_lock);
214 /* FIXME: We may want to optimize by checking lr_most_restr */
/* strict FIFO fairness: anything already converting or waiting
 * forces the new lock to the tail of the waiting queue */
216 if (!list_empty(&res->lr_converting)) {
217 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
218 GOTO(out, rc = -ELDLM_BLOCK_CONV);
220 if (!list_empty(&res->lr_waiting)) {
221 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
222 GOTO(out, rc = -ELDLM_BLOCK_WAIT);
/* queues are empty: conflict check against granted locks decides
 * between immediate grant and blocking on current holders */
225 incompat = ldlm_lock_compat(lock);
227 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
228 GOTO(out, rc = -ELDLM_BLOCK_GRANTED);
231 ldlm_grant_lock(res, lock);
232 GOTO(out, rc = ELDLM_OK);
/* 'out' label elided above; all GOTOs land here to drop the lock */
235 spin_unlock(&res->lr_lock);
/* Reprocessing variant for resource types with a compat handler:
 * since compatibility spans sibling child resources (e.g. overlapping
 * extents), walk every child of the parent and retry its converting
 * queue first; only once no conversions remain pending anywhere do we
 * make a second pass over the waiting queues.  Conversions take
 * priority over new waiters. */
239 static void ldlm_reprocess_res_compat(struct ldlm_lock *lock)
241 struct ldlm_resource *parent_res = lock->l_resource->lr_parent;
242 struct list_head *tmp;
/* pass 1: converting queues on every sibling resource */
245 list_for_each(tmp, &parent_res->lr_children) {
246 struct ldlm_resource *child;
247 child = list_entry(tmp, struct ldlm_resource, lr_childof);
249 ldlm_reprocess_queue(lock, &child->lr_converting,
/* a conversion is still blocked — bail out (return elided)
 * rather than let waiters jump the queue */
251 if (!list_empty(&child->lr_converting))
/* pass 2: waiting queues, reached only when pass 1 drained */
258 list_for_each(tmp, &parent_res->lr_children) {
259 struct ldlm_resource *child;
260 child = list_entry(tmp, struct ldlm_resource, lr_childof);
262 ldlm_reprocess_queue(lock, &child->lr_waiting,
/* Retry all pending locks on this lock's resource after a release or
 * conversion.  Resource types with a compat handler need the cross-
 * resource variant; otherwise reprocess this resource alone —
 * conversions first, and waiters only once no conversion is blocked. */
267 static void ldlm_reprocess_all(struct ldlm_lock *lock)
269 struct ldlm_resource *res = lock->l_resource;
270 struct ldlm_resource *parent_res = res->lr_parent;
272 if (parent_res && ldlm_res_compat_table[parent_res->lr_type]) {
273 ldlm_reprocess_res_compat(lock);
/* an early return after the compat path is elided here */
277 ldlm_reprocess_queue(lock, &res->lr_converting, &res->lr_granted);
278 if (list_empty(&res->lr_converting))
279 ldlm_reprocess_queue(lock, &res->lr_waiting, &res->lr_granted);
/* Cancel (release) the lock named by 'lockh': unlink it from its
 * resource queue, free it, drop the resource reference, and reprocess
 * the resource so blocked locks can be granted.
 *
 * NOTE(review): as listed, the lock is freed back to the slab at line
 * 294 and then passed to ldlm_reprocess_all() at line 297 — a
 * use-after-free.  Lines 295-296 are elided here and may gate the
 * reprocess call (e.g. on ldlm_resource_put() freeing the resource),
 * but even so the freed 'lock' pointer is dereferenced inside
 * ldlm_reprocess_all().  Verify against the full file and reorder the
 * free if confirmed. */
282 ldlm_error_t ldlm_local_lock_cancel(struct obd_device *obddev,
283 struct ldlm_handle *lockh)
285 struct ldlm_lock *lock;
286 struct ldlm_resource *res;
289 lock = ldlm_handle2object(lockh);
290 res = lock->l_resource;
292 ldlm_resource_del_lock(lock);
294 kmem_cache_free(ldlm_lock_slab, lock);
/* nonzero return presumably means the resource itself was freed —
 * the branch body is elided; TODO confirm */
295 if (ldlm_resource_put(res))
297 ldlm_reprocess_all(lock);
/* Convert an existing lock to 'new_mode': unlink it from whatever
 * queue it is on, set the new requested mode, append it to the tail of
 * the resource's converting queue, and reprocess so the conversion can
 * be granted when compatible.
 *
 * NOTE(review): no spin_lock(&res->lr_lock) is visible around the list
 * manipulation here, unlike the enqueue path — either it sits in the
 * elided lines or this is a locking gap; confirm against the full
 * file. */
302 ldlm_error_t ldlm_local_lock_convert(struct obd_device *obddev,
303 struct ldlm_handle *lockh,
304 int new_mode, int *flags)
306 struct ldlm_lock *lock;
307 struct ldlm_resource *res;
310 lock = ldlm_handle2object(lockh);
311 res = lock->l_resource;
312 list_del(&lock->l_res_link);
313 lock->l_req_mode = new_mode;
/* adding before lr_converting.prev == appending at the queue tail */
315 list_add(&lock->l_res_link, res->lr_converting.prev);
317 ldlm_reprocess_all(lock);
/* Debug helper: dump a lock's version vector, parent, and requested/
 * granted modes to the D_OTHER debug log.  The 'ver' buffer
 * declaration and the body of the size-mismatch guard are elided from
 * this listing. */
322 void ldlm_lock_dump(struct ldlm_lock *lock)
/* the "%x %x %x %x" format hard-codes four elements, so bail (body
 * elided) if the version vector is not exactly four words */
326 if (RES_VERSION_SIZE != 4)
329 snprintf(ver, sizeof(ver), "%x %x %x %x",
330 lock->l_version[0], lock->l_version[1],
331 lock->l_version[2], lock->l_version[3]);
333 CDEBUG(D_OTHER, " -- Lock dump: %p (%s)\n", lock, ver);
334 CDEBUG(D_OTHER, " Parent: %p\n", lock->l_parent);
335 CDEBUG(D_OTHER, " Requested mode: %d, granted mode: %d\n",
336 (int)lock->l_req_mode, (int)lock->l_granted_mode);