1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * by Cluster File Systems, Inc.
10 * authors, Peter Braam <braam@clusterfs.com> &
11 * Phil Schwan <phil@clusterfs.com>
15 #define DEBUG_SUBSYSTEM S_LDLM
17 #include <linux/slab.h>
18 #include <linux/lustre_dlm.h>
20 extern kmem_cache_t *ldlm_lock_slab;
22 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b);
23 static int ldlm_intent_compat(struct ldlm_lock *a, struct ldlm_lock *b);
/* Per-resource-type lock compatibility functions, indexed by lr_type.
 * Uses the old GCC "[index] value" designated-initializer syntax (no '='). */
25 ldlm_res_compat ldlm_res_compat_table [] = {
26 [LDLM_PLAIN] ldlm_plain_compat,
27 [LDLM_EXTENT] ldlm_extent_compat,
28 [LDLM_MDSINTENT] ldlm_intent_compat
/* Per-resource-type policy functions, indexed by lr_type; only extent
 * locks define one here, so other entries are implicitly NULL (callers
 * must check before invoking — see ldlm_local_lock_enqueue). */
31 ldlm_res_policy ldlm_res_policy_table [] = {
33 [LDLM_EXTENT] ldlm_extent_policy,
/* Two plain locks are compatible iff their requested modes are compatible
 * per the standard DLM lock-mode compatibility matrix. */
37 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b)
39 return lockmode_compat(a->l_req_mode, b->l_req_mode);
/* Compatibility check for LDLM_MDSINTENT locks (dispatched via
 * ldlm_res_compat_table).  NOTE(review): body not shown in this view —
 * confirm its semantics before relying on it. */
42 static int ldlm_intent_compat(struct ldlm_lock *a, struct ldlm_lock *b)
/* Allocate and zero-initialize a new ldlm_lock on @resource, optionally
 * linking it beneath @parent.  Returns the new lock (presumably NULL on
 * allocation failure — the check is not visible here; TODO confirm). */
48 /* Args: referenced, unlocked parent (or NULL)
49 * referenced, unlocked resource
50 * Locks: parent->l_lock */
51 static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
52 struct ldlm_resource *resource)
54 struct ldlm_lock *lock;
59 lock = kmem_cache_alloc(ldlm_lock_slab, SLAB_KERNEL);
/* Zero all fields before wiring up list heads and the waitqueue. */
63 memset(lock, 0, sizeof(*lock));
64 lock->l_resource = resource;
65 INIT_LIST_HEAD(&lock->l_children);
66 INIT_LIST_HEAD(&lock->l_res_link);
67 init_waitqueue_head(&lock->l_waitq);
68 lock->l_lock = SPIN_LOCK_UNLOCKED;
/* Attach to the parent's child list under parent->l_lock.  This is
 * presumably guarded by a NULL check on @parent just above (not visible
 * here) — confirm, since the doc comment allows a NULL parent. */
71 spin_lock(&parent->l_lock);
72 lock->l_parent = parent;
73 list_add(&lock->l_childof, &parent->l_children);
74 spin_unlock(&parent->l_lock);
/* Release a lock back to the slab cache, warning about any state that
 * should have been torn down first (children, read/write references).
 * Also drops the lock's connection reference if one is held. */
80 /* Args: unreferenced, locked lock
82 * Caller must do its own ldlm_resource_put() on lock->l_resource */
83 void ldlm_lock_free(struct ldlm_lock *lock)
/* Freeing a lock with live children indicates a teardown-ordering bug. */
85 if (!list_empty(&lock->l_children)) {
86 CERROR("lock %p still has children (%p)!\n", lock,
87 lock->l_children.next)\u003b
92 if (lock->l_readers || lock->l_writers)
93 CDEBUG(D_INFO, "lock still has references (%d readers, %d "
94 "writers)\n", lock->l_readers, lock->l_writers);
/* Drop the ptlrpc connection ref taken when the lock was set up. */
96 if (lock->l_connection)
97 ptlrpc_put_connection(lock->l_connection);
98 kmem_cache_free(ldlm_lock_slab, lock);
/* Flatten an in-memory lock into the wire/descriptor form: resource id,
 * requested and granted modes, extent, and version vector. */
101 void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
103 ldlm_res2desc(lock->l_resource, &desc->l_resource);
104 desc->l_req_mode = lock->l_req_mode;
105 desc->l_granted_mode = lock->l_granted_mode;
106 memcpy(&desc->l_extent, &lock->l_extent, sizeof(desc->l_extent));
107 memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
/* Take a reference on @lock for @mode under l_lock: read-type modes
 * (NL/CR/PR) presumably bump l_readers, other modes l_writers — the
 * increment statements themselves are elided here; TODO confirm. */
110 /* Args: unlocked lock */
111 void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode)
113 spin_lock(&lock->l_lock);
114 if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
118 spin_unlock(&lock->l_lock);
/* Drop a @mode reference on @lock.  When the final reference goes away
 * on a lock already marked LDLM_FL_DYING, cancel it via the client
 * cancel path.  LDLM_FL_DYING is only legal on local-namespace locks. */
121 /* Args: unlocked lock */
122 void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode)
126 spin_lock(&lock->l_lock);
/* Mirror of addref: NL/CR/PR decrement l_readers, otherwise l_writers
 * (decrement statements elided in this view). */
127 if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
/* Last reference on a dying lock: finish the deferred cancellation. */
131 if (!lock->l_readers && !lock->l_writers &&
132 lock->l_flags & LDLM_FL_DYING) {
133 /* Read this lock its rights. */
134 if (!lock->l_resource->lr_namespace->ns_local) {
135 CERROR("LDLM_FL_DYING set on non-local lock!\n");
139 CDEBUG(D_INFO, "final decref done on dying lock, "
/* l_lock is released before calling into the RPC layer, which may block. */
141 spin_unlock(&lock->l_lock);
142 rc = ldlm_cli_cancel(lock->l_client, lock);
144 /* FIXME: do something more dramatic */
145 CERROR("ldlm_cli_cancel: %d\n", rc);
148 spin_unlock(&lock->l_lock);
/* Walk @queue and test every lock on it against @lock.  For each
 * incompatible holder, optionally fire its blocking AST.  Presumably
 * returns nonzero if any incompatibility was found (the accumulation and
 * return are elided here; TODO confirm). */
151 /* Args: locked lock */
152 static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs,
153 struct list_head *queue)
155 struct list_head *tmp, *pos;
158 list_for_each_safe(tmp, pos, queue) {
159 struct ldlm_lock *child;
160 ldlm_res_compat compat;
162 child = list_entry(tmp, struct ldlm_lock, l_res_link);
/* First try the type-specific compatibility function.  NOTE(review):
 * not every lr_type has a table entry; a NULL check presumably sits in
 * the elided lines just above — confirm. */
166 compat = ldlm_res_compat_table[child->l_resource->lr_type];
167 if (compat(child, lock)) {
168 CDEBUG(D_OTHER, "compat function succeded, next.\n");
/* Fall back to the plain mode-compatibility matrix. */
171 if (lockmode_compat(child->l_granted_mode, lock->l_req_mode)) {
172 CDEBUG(D_OTHER, "lock modes are compatible, next.\n");
178 CDEBUG(D_OTHER, "compat function failed and lock modes are "
179 "incompatible; sending blocking AST.\n");
/* Conflict: notify the holder so it can release the lock. */
180 if (send_cbs && child->l_blocking_ast != NULL)
181 child->l_blocking_ast(child, lock, child->l_data,
/* Check @lock against both the granted and converting queues of its
 * resource; rc accumulates incompatibility from either queue. */
188 /* Args: unlocked lock */
189 static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs)
194 rc = _ldlm_lock_compat(lock, send_cbs, &lock->l_resource->lr_granted);
195 rc |= _ldlm_lock_compat(lock, send_cbs,
196 &lock->l_resource->lr_converting);
/* Move @lock onto the resource's granted queue, promote its granted mode
 * to the requested mode, update the most-restrictive-mode cache, and run
 * the completion AST if one is registered. */
201 /* Args: locked lock, locked resource */
202 void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
206 ldlm_resource_add_lock(res, &res->lr_granted, lock);
207 lock->l_granted_mode = lock->l_req_mode;
/* Lower numeric mode = more restrictive, per the mode enum ordering
 * assumed here; lr_most_restr caches the strictest granted mode. */
209 if (lock->l_granted_mode < res->lr_most_restr)
210 res->lr_most_restr = lock->l_granted_mode;
212 if (lock->l_completion_ast)
213 lock->l_completion_ast(lock, NULL,
214 lock->l_data, lock->l_data_len);
/* Scan @queue for an existing usable lock of exactly @mode whose extent
 * covers @extent.  On a hit, takes a mode reference and fills in @lockh;
 * presumably returns 1 on match and 0 otherwise (return statements are
 * elided in this view — confirm). */
218 static int search_queue(struct list_head *queue, ldlm_mode_t mode,
219 struct ldlm_extent *extent, struct ldlm_handle *lockh)
221 struct list_head *tmp;
223 list_for_each(tmp, queue) {
224 struct ldlm_lock *lock;
225 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
/* A dying lock is being torn down and must not be handed out again. */
227 if (lock->l_flags & LDLM_FL_DYING)
230 /* lock_convert() takes the resource lock, so we're sure that
231 * req_mode, lr_type, and l_extent won't change beneath us */
232 if (lock->l_req_mode != mode)
/* For extent locks the candidate must fully contain the wanted range. */
235 if (lock->l_resource->lr_type == LDLM_EXTENT &&
236 (lock->l_extent.start > extent->start ||
237 lock->l_extent.end < extent->end))
240 ldlm_lock_addref(lock, mode);
241 ldlm_object2handle(lock, lockh);
/* Look up the resource for @res_id and search its granted, converting,
 * and waiting queues (in that order) for a matching, already-referenced
 * lock. */
248 /* Must be called with no resource or lock locks held.
250 * Returns 1 if it finds an already-existing lock that is compatible; in this
251 * case, lockh is filled in with a addref()ed lock */
252 int ldlm_local_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
253 struct ldlm_extent *extent, ldlm_mode_t mode,
254 struct ldlm_handle *lockh)
256 struct ldlm_resource *res;
/* Final argument 0: look up only, do not create the resource. */
260 res = ldlm_resource_get(ns, NULL, res_id, type, 0);
264 spin_lock(&res->lr_lock);
265 if (search_queue(&res->lr_granted, mode, extent, lockh))
267 if (search_queue(&res->lr_converting, mode, extent, lockh))
269 if (search_queue(&res->lr_waiting, mode, extent, lockh))
/* NOTE(review): the resource ref is dropped *before* lr_lock is
 * released; if this put frees the resource, the unlock below touches
 * freed memory.  Verify the refcounting guarantees a caller ref here. */
274 ldlm_resource_put(res);
275 spin_unlock(&res->lr_lock);
/* Create a new lock on the resource named by @res_id (creating the
 * resource if needed), take an initial @mode reference, and hand back a
 * handle in @lockh.  The parent lock, if any, arrives as a handle. */
279 /* Must be called without the resource lock held. */
280 ldlm_error_t ldlm_local_lock_create(struct ldlm_namespace *ns,
281 struct ldlm_handle *parent_lock_handle,
282 __u64 *res_id, __u32 type,
286 struct ldlm_handle *lockh)
288 struct ldlm_resource *res, *parent_res = NULL;
289 struct ldlm_lock *lock, *parent_lock;
/* Presumably guarded by a NULL check on parent_lock_handle in the
 * elided lines around here — confirm. */
291 parent_lock = ldlm_handle2object(parent_lock_handle);
293 parent_res = parent_lock->l_resource;
/* Final argument 1: create the resource if it does not yet exist. */
295 res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
299 lock = ldlm_lock_new(parent_lock, res);
/* This put/unlock pair appears to be an error-path cleanup for a failed
 * ldlm_lock_new() (the enclosing 'if' is elided) — confirm. */
301 spin_lock(&res->lr_lock);
302 ldlm_resource_put(res);
303 spin_unlock(&res->lr_lock);
307 lock->l_req_mode = mode;
309 lock->l_data_len = data_len;
310 ldlm_lock_addref(lock, mode);
312 ldlm_object2handle(lock, lockh);
/* Enqueue a previously-created lock: run the type policy (which may
 * rewrite the requested extent), then either grant it immediately or
 * park it on the converting/waiting queue, reporting blocking state
 * through *flags. */
316 /* Must be called with lock->l_lock and lock->l_resource->lr_lock not held */
317 ldlm_error_t ldlm_local_lock_enqueue(struct ldlm_handle *lockh,
318 struct ldlm_extent *req_ex,
320 ldlm_lock_callback completion,
321 ldlm_lock_callback blocking)
323 struct ldlm_resource *res;
324 struct ldlm_lock *lock;
325 int incompat = 0, local;
326 ldlm_res_policy policy;
329 lock = ldlm_handle2object(lockh);
330 res = lock->l_resource;
331 local = res->lr_namespace->ns_local;
332 spin_lock(&res->lr_lock);
334 lock->l_blocking_ast = blocking;
/* Sanity: extent locks require an extent argument and vice versa
 * (error return elided). */
336 if ((res->lr_type == LDLM_EXTENT && !req_ex) ||
337 (res->lr_type != LDLM_EXTENT && req_ex))
/* Type policy may adjust the requested extent; report that back to the
 * caller via LDLM_FL_LOCK_CHANGED and the rewritten *req_ex. */
340 if ((policy = ldlm_res_policy_table[res->lr_type])) {
341 struct ldlm_extent new_ex;
342 int rc = policy(res, req_ex, &new_ex, lock->l_req_mode, NULL);
343 if (rc == ELDLM_LOCK_CHANGED) {
344 *flags |= LDLM_FL_LOCK_CHANGED;
345 memcpy(req_ex, &new_ex, sizeof(new_ex));
350 memcpy(&lock->l_extent, req_ex, sizeof(*req_ex));
352 if (local && lock->l_req_mode == lock->l_granted_mode) {
353 /* The server returned a blocked lock, but it was granted before
354 * we got a chance to actually enqueue it. We don't need to do
/* Local namespace: the server already decided; just mirror its verdict
 * onto the matching queue. */
359 /* If this is a local resource, put it on the appropriate list. */
361 if (*flags & LDLM_FL_BLOCK_CONV)
362 ldlm_resource_add_lock(res, res->lr_converting.prev,
364 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
365 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
367 ldlm_grant_lock(res, lock);
/* Non-local path: anything already converting or waiting blocks us,
 * preserving FIFO fairness ahead of the compatibility check. */
371 /* FIXME: We may want to optimize by checking lr_most_restr */
372 if (!list_empty(&res->lr_converting)) {
373 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
374 *flags |= LDLM_FL_BLOCK_CONV;
377 if (!list_empty(&res->lr_waiting)) {
378 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
379 *flags |= LDLM_FL_BLOCK_WAIT;
/* send_cbs=0: just probe for conflicts, do not fire blocking ASTs yet. */
382 incompat = ldlm_lock_compat(lock, 0);
384 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
385 *flags |= LDLM_FL_BLOCK_GRANTED;
389 ldlm_grant_lock(res, lock);
392 /* Don't set 'completion_ast' until here so that if the lock is granted
393 * immediately we don't do an unnecessary completion call. */
394 lock->l_completion_ast = completion;
395 spin_unlock(&res->lr_lock);
/* Try to grant every lock parked on @converting: each pending lock that
 * is now compatible is moved to the granted queue; converting locks
 * additionally swap their reference from the old granted mode to the
 * new requested mode. */
399 /* Must be called with resource->lr_lock taken. */
400 static int ldlm_reprocess_queue(struct ldlm_resource *res,
401 struct list_head *converting)
403 struct list_head *tmp, *pos;
406 list_for_each_safe(tmp, pos, converting) {
407 struct ldlm_lock *pending;
408 pending = list_entry(tmp, struct ldlm_lock, l_res_link);
410 /* the resource lock protects ldlm_lock_compat */
/* send_cbs=1: fire blocking ASTs at conflicting holders this time. */
411 if (ldlm_lock_compat(pending, 1))
414 list_del_init(&pending->l_res_link);
415 ldlm_grant_lock(res, pending);
/* Converting lock: re-reference at the new mode, drop the old one
 * (presumably inside an is-converting guard elided here — confirm). */
417 ldlm_lock_addref(pending, pending->l_req_mode);
418 ldlm_lock_decref(pending, pending->l_granted_mode);
/* Re-run grant processing on a resource after state changed: converts
 * first, and only once the converting queue is empty, the waiters (a
 * pending convert always outranks waiting requests). */
424 /* Must be called with resource->lr_lock not taken. */
425 void ldlm_reprocess_all(struct ldlm_resource *res)
427 /* Local lock trees don't get reprocessed. */
428 if (res->lr_namespace->ns_local)
431 spin_lock(&res->lr_lock);
432 ldlm_reprocess_queue(res, &res->lr_converting);
433 if (list_empty(&res->lr_converting))
434 ldlm_reprocess_queue(res, &res->lr_waiting);
435 spin_unlock(&res->lr_lock);
/* Cancel a lock: unlink it from its resource, drop the resource
 * reference, and free the lock.  Returns the resource (so the caller
 * can reprocess it), or NULL if the resource was freed by the put. */
438 /* Must be called with lock and lock->l_resource unlocked */
439 struct ldlm_resource *ldlm_local_lock_cancel(struct ldlm_lock *lock)
441 struct ldlm_resource *res;
444 res = lock->l_resource;
/* Lock order: resource first, then the lock itself. */
446 spin_lock(&res->lr_lock);
447 spin_lock(&lock->l_lock);
449 if (lock->l_readers || lock->l_writers)
450 CDEBUG(D_INFO, "lock still has references (%d readers, %d "
451 "writers)\n", lock->l_readers, lock->l_writers);
453 ldlm_resource_del_lock(lock);
454 if (ldlm_resource_put(res))
455 res = NULL; /* res was freed, nothing else to do. */
/* NOTE(review): in the visible lines this unlock would dereference res
 * right after it may have been set to NULL above — an else-branch is
 * presumably elided here; confirm against the full source. */
457 spin_unlock(&res->lr_lock);
458 ldlm_lock_free(lock);
/* Convert an existing lock to @new_mode: unlink it from its current
 * queue, then either (local namespace) re-queue per *flags exactly as
 * enqueue does, or (server side) append it to the converting queue for
 * later reprocessing.  Returns the resource — presumably so the caller
 * can run ldlm_reprocess_all() on it (return elided; confirm). */
463 /* Must be called with lock and lock->l_resource unlocked */
464 struct ldlm_resource *ldlm_local_lock_convert(struct ldlm_handle *lockh,
465 int new_mode, int *flags)
467 struct ldlm_lock *lock;
468 struct ldlm_resource *res;
471 lock = ldlm_handle2object(lockh);
472 res = lock->l_resource;
474 spin_lock(&res->lr_lock);
476 lock->l_req_mode = new_mode;
477 list_del_init(&lock->l_res_link);
479 /* If this is a local resource, put it on the appropriate list. */
480 if (res->lr_namespace->ns_local) {
481 if (*flags & LDLM_FL_BLOCK_CONV)
482 ldlm_resource_add_lock(res, res->lr_converting.prev,
484 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
485 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
487 ldlm_grant_lock(res, lock);
/* Non-local: tail-insert on the converting queue (".prev" = list tail). */
489 list_add(&lock->l_res_link, res->lr_converting.prev);
492 spin_unlock(&res->lr_lock);
/* Debug helper: pretty-print a lock's identity, hierarchy, modes,
 * reference counts, and (for extent locks) its range to the D_OTHER
 * debug channel.  No-op unless D_OTHER debugging is enabled. */
497 void ldlm_lock_dump(struct ldlm_lock *lock)
501 if (!(portal_debug & D_OTHER))
/* The 4-element formatting below hard-codes the version vector size;
 * this guard catches a mismatched RES_VERSION_SIZE (its action is
 * elided in this view). */
504 if (RES_VERSION_SIZE != 4)
507 snprintf(ver, sizeof(ver), "%x %x %x %x",
508 lock->l_version[0], lock->l_version[1],
509 lock->l_version[2], lock->l_version[3]);
511 CDEBUG(D_OTHER, " -- Lock dump: %p (%s)\n", lock, ver);
512 CDEBUG(D_OTHER, " Parent: %p\n", lock->l_parent);
513 CDEBUG(D_OTHER, " Resource: %p\n", lock->l_resource);
514 CDEBUG(D_OTHER, " Requested mode: %d, granted mode: %d\n",
515 (int)lock->l_req_mode, (int)lock->l_granted_mode);
516 CDEBUG(D_OTHER, " Readers: %u ; Writers; %u\n",
517 lock->l_readers, lock->l_writers);
518 if (lock->l_resource->lr_type == LDLM_EXTENT)
519 CDEBUG(D_OTHER, " Extent: %Lu -> %Lu\n",
520 (unsigned long long)lock->l_extent.start,
521 (unsigned long long)lock->l_extent.end);