1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * by Cluster File Systems, Inc.
10 * authors, Peter Braam <braam@clusterfs.com> &
11 * Phil Schwan <phil@clusterfs.com>
14 #define DEBUG_SUBSYSTEM S_LDLM
16 #include <linux/slab.h>
17 #include <linux/module.h>
18 #include <linux/lustre_dlm.h>
19 #include <linux/lustre_mds.h>
/* Slab cache for struct ldlm_lock allocations; defined in another file. */
21 extern kmem_cache_t *ldlm_lock_slab;
/* MDS entry points, resolved lazily through inter_module_get_request()
 * in ldlm_intent_policy() so this module has no hard link-time dependency
 * on the MDS module. */
22 int (*mds_reint_p)(int offset, struct ptlrpc_request *req) = NULL;
23 int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req) = NULL;
/* Forward declarations needed by the handler tables below. */
25 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b);
26 static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
27 ldlm_mode_t mode, void *data);
/* Per-resource-type lock compatibility callbacks, indexed by lr_type.
 * Uses the old GCC "[index] value" designated-initializer syntax.
 * NOTE(review): listing is truncated here -- the closing "};" of this
 * initializer is not visible in this excerpt. */
29 ldlm_res_compat ldlm_res_compat_table [] = {
30 [LDLM_PLAIN] ldlm_plain_compat,
31 [LDLM_EXTENT] ldlm_extent_compat,
32 [LDLM_MDSINTENT] ldlm_plain_compat
/* Per-resource-type server-side policy callbacks, indexed by lr_type.
 * LDLM_PLAIN has no policy (NULL by omission); consulted only on the
 * server side in ldlm_local_lock_enqueue().
 * NOTE(review): closing "};" not visible in this excerpt. */
35 ldlm_res_policy ldlm_res_policy_table [] = {
37 [LDLM_EXTENT] ldlm_extent_policy,
38 [LDLM_MDSINTENT] ldlm_intent_policy
/* Server-side policy for LDLM_MDSINTENT locks.
 *
 * If the enqueue request carries an intent record (bufcount > 1), the
 * corresponding MDS operation (reint / getattr_name / readdir-open) is
 * executed here, and on success the lock is re-targeted onto the resource
 * (inode) named in the MDS reply, returning ELDLM_LOCK_CHANGED so the
 * caller knows the resource id changed.  Without an intent, only a plain
 * ldlm_reply buffer is packed.
 *
 * NOTE(review): this listing is a sampled excerpt -- braces, switch
 * labels, and several error paths are not visible here; comments below
 * describe only what the visible lines establish. */
41 static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
42 ldlm_mode_t mode, void *data)
44 struct ptlrpc_request *req = req_cookie;
51 if (req->rq_reqmsg->bufcount > 1) {
52 /* an intent needs to be considered */
53 struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1);
54 struct mds_body *mds_rep;
55 struct ldlm_reply *rep;
56 struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
57 __u32 type = lock->l_resource->lr_type;
58 __u64 new_resid[3] = {0, 0, 0};
59 int bufcount, rc, size[3] = {sizeof(struct ldlm_reply),
60 sizeof(struct mds_body),
/* Intent opcode arrives in network byte order; convert in place. */
63 it->opc = NTOH__u64(it->opc);
67 /* Note that in the negative case you may be returning
68 * a file and its obdo */
70 case IT_CREAT|IT_OPEN:
/* Pre-pack the reply message sized for the intent result. */
83 rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
86 rc = req->rq_status = -ENOMEM;
90 rep = lustre_msg_buf(req->rq_repmsg, 0);
/* lock_policy_res1 == 1 tells the client an intent was executed. */
91 rep->lock_policy_res1 = 1;
94 case IT_CREAT|IT_OPEN:
/* Lazily bind the MDS reint handler via the inter-module table;
 * presumably the missing line assigns the result to mds_reint_p
 * -- TODO confirm against the full source. */
101 if (mds_reint_p == NULL)
103 inter_module_get_request
104 ("mds_reint", "mds");
105 if (IS_ERR(mds_reint_p)) {
106 CERROR("MDSINTENT locks require the MDS "
/* offset 2: request buffer 0 is the lock request, 1 the intent. */
111 rc = mds_reint_p(2, req);
119 if (mds_getattr_name_p == NULL)
121 inter_module_get_request
122 ("mds_getattr_name", "mds");
123 if (IS_ERR(mds_getattr_name_p)) {
124 CERROR("MDSINTENT locks require the MDS "
129 rc = mds_getattr_name_p(2, req);
135 case IT_READDIR|IT_OPEN:
139 CERROR("Unhandled intent\n");
/* Re-target the lock onto the inode the MDS operation resolved to. */
143 mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
144 rep->lock_policy_res2 = req->rq_status;
145 new_resid[0] = mds_rep->ino;
/* NOTE(review): "%d" for mds_rep->ino -- if ino is 64-bit this format
 * specifier is wrong; confirm the type of mds_body.ino. */
147 CDEBUG(D_INFO, "remote intent: locking %d instead of"
148 "%ld\n", mds_rep->ino,
149 (long)lock->l_resource->lr_name[0]);
/* Drop the originally-requested resource and take the new one. */
150 ldlm_resource_put(lock->l_resource);
153 ldlm_resource_get(ns, NULL, new_resid, type, 1);
154 if (lock->l_resource == NULL) {
158 RETURN(ELDLM_LOCK_CHANGED);
/* No intent: pack a minimal reply holding just the ldlm_reply. */
160 int size = sizeof(struct ldlm_reply);
161 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
164 CERROR("out of memory\n");
/* Two plain (whole-resource) locks are compatible iff their requested
 * lock modes are compatible. */
172 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b)
174 return lockmode_compat(a->l_req_mode, b->l_req_mode);
177 /* Args: referenced, unlocked parent (or NULL)
178 * referenced, unlocked resource
179 * Locks: parent->l_lock */
/* Allocate and zero-initialize a new ldlm_lock on the given resource,
 * optionally linking it under a parent lock (parent->l_lock held only
 * for the child-list insertion).  Returns the new lock; error paths for
 * a NULL resource or failed slab allocation are not visible in this
 * excerpt. */
180 static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
181 struct ldlm_resource *resource)
183 struct ldlm_lock *lock;
185 if (resource == NULL)
188 lock = kmem_cache_alloc(ldlm_lock_slab, SLAB_KERNEL);
192 memset(lock, 0, sizeof(*lock));
193 lock->l_resource = resource;
194 INIT_LIST_HEAD(&lock->l_children);
195 INIT_LIST_HEAD(&lock->l_res_link);
196 init_waitqueue_head(&lock->l_waitq);
197 lock->l_lock = SPIN_LOCK_UNLOCKED;
199 if (parent != NULL) {
200 spin_lock(&parent->l_lock);
201 lock->l_parent = parent;
202 list_add(&lock->l_childof, &parent->l_children);
203 spin_unlock(&parent->l_lock);
209 /* Args: unreferenced, locked lock
211 * Caller must do its own ldlm_resource_put() on lock->l_resource */
/* Release a lock back to the slab cache.  Complains (but proceeds) if
 * the lock still has children or reader/writer references, and drops
 * the connection reference if one is held. */
212 void ldlm_lock_free(struct ldlm_lock *lock)
214 if (!list_empty(&lock->l_children)) {
215 CERROR("lock %p still has children (%p)!\n", lock,
216 lock->l_children.next);
217 ldlm_lock_dump(lock);
221 if (lock->l_readers || lock->l_writers)
222 CDEBUG(D_INFO, "lock still has references (%d readers, %d "
223 "writers)\n", lock->l_readers, lock->l_writers);
225 if (lock->l_connection)
226 ptlrpc_put_connection(lock->l_connection);
227 kmem_cache_free(ldlm_lock_slab, lock);
/* Fill a wire-format ldlm_lock_desc from an in-memory lock: resource
 * descriptor, requested/granted modes, extent, and version vector. */
230 void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
232 ldlm_res2desc(lock->l_resource, &desc->l_resource);
233 desc->l_req_mode = lock->l_req_mode;
234 desc->l_granted_mode = lock->l_granted_mode;
235 memcpy(&desc->l_extent, &lock->l_extent, sizeof(desc->l_extent));
236 memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
239 /* Args: unlocked lock */
/* Take a reference on the lock under l_lock.  Read-type modes
 * (NL/CR/PR) presumably bump l_readers and other modes l_writers --
 * the increment lines are not visible in this excerpt; confirm. */
240 void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode)
242 spin_lock(&lock->l_lock);
243 if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
247 spin_unlock(&lock->l_lock);
/* Deliver the blocking AST callback for 'lock', blocked by 'new'.
 * LDLM_FL_AST_SENT guards against sending the AST twice; the callback
 * itself is invoked after dropping l_lock. */
250 void ldlm_send_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock *new)
254 spin_lock(&lock->l_lock);
255 if (lock->l_flags & LDLM_FL_AST_SENT) {
260 lock->l_flags |= LDLM_FL_AST_SENT;
261 spin_unlock(&lock->l_lock);
263 lock->l_blocking_ast(lock, new, lock->l_data, lock->l_data_len);
267 /* Args: unlocked lock */
/* Drop a reference taken by ldlm_lock_addref().  If this was the final
 * reference and the lock was marked LDLM_FL_DYING, the blocking AST is
 * invoked (after dropping l_lock) to finish tearing the lock down; a
 * dying lock is only expected on a local namespace. */
268 void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode)
275 spin_lock(&lock->l_lock);
276 if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
280 if (!lock->l_readers && !lock->l_writers &&
281 lock->l_flags & LDLM_FL_DYING) {
282 /* Read this lock its rights. */
283 if (!lock->l_resource->lr_namespace->ns_local) {
284 CERROR("LDLM_FL_DYING set on non-local lock!\n");
288 CDEBUG(D_INFO, "final decref done on dying lock, "
289 "calling callback.\n");
290 spin_unlock(&lock->l_lock);
291 lock->l_blocking_ast(lock, NULL, lock->l_data,
294 spin_unlock(&lock->l_lock);
298 /* Args: locked lock */
/* Walk 'queue' (a granted or converting list) and test each lock there
 * against 'lock'.  A per-type compat function or plain mode
 * compatibility lets a peer pass; otherwise it is incompatible, and if
 * send_cbs is set and the peer has a blocking AST, that AST is sent.
 * Presumably returns whether any incompatibility was found -- the
 * return statement is not visible in this excerpt. */
299 static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs,
300 struct list_head *queue)
302 struct list_head *tmp, *pos;
305 list_for_each_safe(tmp, pos, queue) {
306 struct ldlm_lock *child;
307 ldlm_res_compat compat;
309 child = list_entry(tmp, struct ldlm_lock, l_res_link);
313 compat = ldlm_res_compat_table[child->l_resource->lr_type];
314 if (compat(child, lock)) {
315 CDEBUG(D_OTHER, "compat function succeded, next.\n");
318 if (lockmode_compat(child->l_granted_mode, lock->l_req_mode)) {
319 CDEBUG(D_OTHER, "lock modes are compatible, next.\n");
325 CDEBUG(D_OTHER, "compat function failed and lock modes incompat\n");
326 if (send_cbs && child->l_blocking_ast != NULL) {
327 CDEBUG(D_OTHER, "incompatible; sending blocking AST.\n");
328 ldlm_send_blocking_ast(child, lock);
335 /* Args: unlocked lock */
/* Check 'lock' against both the granted and converting queues of its
 * resource, OR-ing the incompatibility results together. */
336 static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs)
341 rc = _ldlm_lock_compat(lock, send_cbs, &lock->l_resource->lr_granted);
342 /* FIXME: should we be sending ASTs to converting? */
343 rc |= _ldlm_lock_compat(lock, send_cbs,
344 &lock->l_resource->lr_converting);
349 /* Args: locked lock, locked resource */
/* Move the lock onto the resource's granted list, promote its granted
 * mode to the requested mode, track the most restrictive granted mode
 * on the resource, and fire the completion AST if one is registered. */
350 void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
354 ldlm_resource_add_lock(res, &res->lr_granted, lock);
355 lock->l_granted_mode = lock->l_req_mode;
357 if (lock->l_granted_mode < res->lr_most_restr)
358 res->lr_most_restr = lock->l_granted_mode;
360 if (lock->l_completion_ast)
361 lock->l_completion_ast(lock, NULL,
362 lock->l_data, lock->l_data_len);
/* Scan one resource queue for an existing, non-dying lock of the given
 * mode (and, for extent locks, one whose extent fully covers the
 * requested extent).  On a match, addref the lock, fill in its handle,
 * and (presumably) return nonzero -- the return lines are not visible
 * in this excerpt. */
366 static int search_queue(struct list_head *queue, ldlm_mode_t mode,
367 struct ldlm_extent *extent, struct lustre_handle *lockh)
369 struct list_head *tmp;
371 list_for_each(tmp, queue) {
372 struct ldlm_lock *lock;
373 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
375 if (lock->l_flags & LDLM_FL_DYING)
378 /* lock_convert() takes the resource lock, so we're sure that
379 * req_mode, lr_type, and l_cookie won't change beneath us */
380 if (lock->l_req_mode != mode)
383 if (lock->l_resource->lr_type == LDLM_EXTENT &&
384 (lock->l_extent.start > extent->start ||
385 lock->l_extent.end < extent->end))
388 ldlm_lock_addref(lock, mode);
389 ldlm_object2handle(lock, lockh);
396 /* Must be called with no resource or lock locks held.
398 * Returns 1 if it finds an already-existing lock that is compatible; in this
399 * case, lockh is filled in with a addref()ed lock */
/* Look up the resource (without creating it) and search its granted,
 * converting, and waiting queues for a matching lock.
 * NOTE(review): the visible miss path does ldlm_resource_put(res)
 * *before* spin_unlock(&res->lr_lock) -- if the put can free the
 * resource this unlocks freed memory; confirm against the missing
 * lines and ldlm_resource_put() semantics. */
400 int ldlm_local_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
401 void *cookie, int cookielen, ldlm_mode_t mode,
402 struct lustre_handle *lockh)
404 struct ldlm_resource *res;
408 res = ldlm_resource_get(ns, NULL, res_id, type, 0);
412 spin_lock(&res->lr_lock);
413 if (search_queue(&res->lr_granted, mode, cookie, lockh))
415 if (search_queue(&res->lr_converting, mode, cookie, lockh))
417 if (search_queue(&res->lr_waiting, mode, cookie, lockh))
422 ldlm_resource_put(res);
423 spin_unlock(&res->lr_lock);
427 /* Must be called without the resource lock held. Returns a referenced,
428 * unlocked ldlm_lock. */
/* Create a new lock on the resource named by res_id (creating the
 * resource if needed), optionally parented under the lock identified
 * by parent_lock_handle.  The new lock is addref()ed in the requested
 * mode and returned to the caller through lockh. */
429 ldlm_error_t ldlm_local_lock_create(struct ldlm_namespace *ns,
430 struct lustre_handle *parent_lock_handle,
431 __u64 *res_id, __u32 type,
435 struct lustre_handle *lockh)
437 struct ldlm_resource *res, *parent_res = NULL;
438 struct ldlm_lock *lock, *parent_lock;
440 parent_lock = lustre_handle2object(parent_lock_handle);
442 parent_res = parent_lock->l_resource;
444 res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
448 lock = ldlm_lock_new(parent_lock, res);
/* Visible lines here look like the ldlm_lock_new() failure path:
 * drop the resource reference under lr_lock. */
450 spin_lock(&res->lr_lock);
451 ldlm_resource_put(res);
452 spin_unlock(&res->lr_lock);
456 lock->l_req_mode = mode;
458 lock->l_data_len = data_len;
459 ldlm_lock_addref(lock, mode);
461 ldlm_object2handle(lock, lockh);
465 /* Must be called with lock->l_lock and lock->l_resource->lr_lock not held */
/* Enqueue a previously created lock: run the per-type policy (server
 * side only), then either grant the lock immediately or park it on the
 * converting/waiting queue, setting the appropriate LDLM_FL_BLOCK_*
 * flag for the caller.  The completion AST is installed only at the
 * end so an immediately granted lock does not trigger a needless
 * completion call.
 * NOTE(review): many branch/label lines are missing from this excerpt;
 * the comments below cover only the visible statements. */
466 ldlm_error_t ldlm_local_lock_enqueue(struct lustre_handle *lockh,
467 void *cookie, int cookie_len,
469 ldlm_lock_callback completion,
470 ldlm_lock_callback blocking)
472 struct ldlm_resource *res;
473 struct ldlm_lock *lock;
474 int incompat = 0, local;
475 ldlm_res_policy policy;
478 lock = lustre_handle2object(lockh);
479 res = lock->l_resource;
480 local = res->lr_namespace->ns_local;
481 spin_lock(&res->lr_lock);
483 lock->l_blocking_ast = blocking;
485 if (res->lr_type == LDLM_EXTENT)
486 memcpy(&lock->l_extent, cookie, sizeof(lock->l_extent));
488 /* policies are not executed on the client */
489 if (!local && (policy = ldlm_res_policy_table[res->lr_type])) {
490 int rc = policy(lock, cookie, lock->l_req_mode, NULL);
491 if (rc == ELDLM_LOCK_CHANGED) {
/* Policy re-targeted the lock (intent case): pick up the new
 * resource and tell the caller the resource id changed. */
492 res = lock->l_resource;
493 *flags |= LDLM_FL_LOCK_CHANGED;
/* Visible lines here look like a policy-failure path: tear the
 * lock down entirely. */
497 ldlm_resource_put(lock->l_resource);
498 ldlm_lock_free(lock);
503 lock->l_cookie = cookie;
504 lock->l_cookie_len = cookie_len;
506 if (local && lock->l_req_mode == lock->l_granted_mode) {
507 /* The server returned a blocked lock, but it was granted before
508 * we got a chance to actually enqueue it. We don't need to do
513 /* If this is a local resource, put it on the appropriate list. */
514 list_del_init(&lock->l_res_link);
516 if (*flags & LDLM_FL_BLOCK_CONV)
517 ldlm_resource_add_lock(res, res->lr_converting.prev,
519 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
520 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock)
522 ldlm_grant_lock(res, lock);
526 /* FIXME: We may want to optimize by checking lr_most_restr */
/* Server side: anything already converting or waiting forces FIFO
 * queuing behind it; otherwise check against granted locks. */
527 if (!list_empty(&res->lr_converting)) {
528 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
529 *flags |= LDLM_FL_BLOCK_CONV;
532 if (!list_empty(&res->lr_waiting)) {
533 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
534 *flags |= LDLM_FL_BLOCK_WAIT;
537 incompat = ldlm_lock_compat(lock, 0);
539 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
540 *flags |= LDLM_FL_BLOCK_GRANTED;
544 ldlm_grant_lock(res, lock);
547 /* Don't set 'completion_ast' until here so that if the lock is granted
548 * immediately we don't do an unnecessary completion call. */
549 lock->l_completion_ast = completion;
550 spin_unlock(&res->lr_lock);
554 /* Must be called with resource->lr_lock taken. */
/* Try to grant each lock parked on the given queue: a lock that is now
 * compatible (blocking ASTs sent for conflicts, send_cbs=1) is moved
 * to the granted list.  The addref/decref pair afterwards appears to
 * adjust the reference counts from the old granted mode to the new
 * requested mode -- TODO confirm against the full source. */
555 static int ldlm_reprocess_queue(struct ldlm_resource *res,
556 struct list_head *converting)
558 struct list_head *tmp, *pos;
561 list_for_each_safe(tmp, pos, converting) {
562 struct ldlm_lock *pending;
563 pending = list_entry(tmp, struct ldlm_lock, l_res_link);
565 /* the resource lock protects ldlm_lock_compat */
566 if (ldlm_lock_compat(pending, 1))
569 list_del_init(&pending->l_res_link);
570 ldlm_grant_lock(res, pending);
572 ldlm_lock_addref(pending, pending->l_req_mode);
573 ldlm_lock_decref(pending, pending->l_granted_mode);
579 /* Must be called with resource->lr_lock not taken. */
/* Re-run grant processing on a resource: converting locks first, and
 * waiting locks only once the converting queue has fully drained. */
580 void ldlm_reprocess_all(struct ldlm_resource *res)
582 /* Local lock trees don't get reprocessed. */
583 if (res->lr_namespace->ns_local)
586 spin_lock(&res->lr_lock);
587 ldlm_reprocess_queue(res, &res->lr_converting);
588 if (list_empty(&res->lr_converting))
589 ldlm_reprocess_queue(res, &res->lr_waiting);
590 spin_unlock(&res->lr_lock);
593 /* Must be called with lock and lock->l_resource unlocked */
/* Cancel a lock: unlink it from its resource queue, drop the resource
 * reference, and free the lock.  Returns the resource (NULL if the put
 * released the last reference).
 * NOTE(review): res is set to NULL when freed, yet a
 * spin_unlock(&res->lr_lock) appears just below -- the line between
 * them is missing from this excerpt (it may be an else/goto); confirm
 * this is not an unlock of freed/NULL memory. */
594 struct ldlm_resource *ldlm_local_lock_cancel(struct ldlm_lock *lock)
596 struct ldlm_resource *res;
599 res = lock->l_resource;
601 spin_lock(&res->lr_lock);
602 spin_lock(&lock->l_lock);
604 if (lock->l_readers || lock->l_writers)
605 CDEBUG(D_INFO, "lock still has references (%d readers, %d "
606 "writers)\n", lock->l_readers, lock->l_writers);
608 ldlm_resource_del_lock(lock);
609 if (ldlm_resource_put(res))
610 res = NULL; /* res was freed, nothing else to do. */
612 spin_unlock(&res->lr_lock);
613 ldlm_lock_free(lock);
618 /* Must be called with lock and lock->l_resource unlocked */
/* Convert a lock to new_mode: set the new requested mode, unlink it
 * from its current queue, then either grant it (local, unblocked),
 * park it per the LDLM_FL_BLOCK_* flags (local), or append it to the
 * converting queue (server side). */
619 struct ldlm_resource *ldlm_local_lock_convert(struct lustre_handle *lockh,
620 int new_mode, int *flags)
622 struct ldlm_lock *lock;
623 struct ldlm_resource *res;
626 lock = lustre_handle2object(lockh);
627 res = lock->l_resource;
629 spin_lock(&res->lr_lock);
631 lock->l_req_mode = new_mode;
632 list_del_init(&lock->l_res_link);
634 /* If this is a local resource, put it on the appropriate list. */
635 if (res->lr_namespace->ns_local) {
636 if (*flags & LDLM_FL_BLOCK_CONV)
637 ldlm_resource_add_lock(res, res->lr_converting.prev,
639 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
640 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
642 ldlm_grant_lock(res, lock);
/* Server side: append to the converting queue (FIFO). */
644 list_add(&lock->l_res_link, res->lr_converting.prev);
647 spin_unlock(&res->lr_lock);
/* Debug helper: print a lock's version vector, parent, resource,
 * modes, reference counts, and (for extent locks) its extent range.
 * Output only when D_OTHER debugging is enabled; the version format
 * string is written for exactly RES_VERSION_SIZE == 4 words. */
652 void ldlm_lock_dump(struct ldlm_lock *lock)
656 if (!(portal_debug & D_OTHER))
659 if (RES_VERSION_SIZE != 4)
663 CDEBUG(D_OTHER, " NULL LDLM lock\n");
667 snprintf(ver, sizeof(ver), "%x %x %x %x",
668 lock->l_version[0], lock->l_version[1],
669 lock->l_version[2], lock->l_version[3]);
671 CDEBUG(D_OTHER, " -- Lock dump: %p (%s)\n", lock, ver);
672 CDEBUG(D_OTHER, " Parent: %p\n", lock->l_parent);
673 CDEBUG(D_OTHER, " Resource: %p\n", lock->l_resource);
674 CDEBUG(D_OTHER, " Requested mode: %d, granted mode: %d\n",
675 (int)lock->l_req_mode, (int)lock->l_granted_mode);
676 CDEBUG(D_OTHER, " Readers: %u ; Writers; %u\n",
677 lock->l_readers, lock->l_writers);
678 if (lock->l_resource->lr_type == LDLM_EXTENT)
679 CDEBUG(D_OTHER, " Extent: %Lu -> %Lu\n",
680 (unsigned long long)lock->l_extent.start,
681 (unsigned long long)lock->l_extent.end);