1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * by Cluster File Systems, Inc.
10 * authors, Peter Braam <braam@clusterfs.com> &
11 * Phil Schwan <phil@clusterfs.com>
 */
14 #define DEBUG_SUBSYSTEM S_LDLM
16 #include <linux/slab.h>
17 #include <linux/module.h>
18 #include <linux/lustre_dlm.h>
19 #include <linux/lustre_mds.h>
/* Slab cache for struct ldlm_lock allocations; defined elsewhere in the
 * LDLM module. */
21 extern kmem_cache_t *ldlm_lock_slab;
/* MDS entry points resolved at runtime (see inter_module_get_request()
 * calls in ldlm_intent_policy() below) so this module carries no hard
 * link-time dependency on the MDS module.  NULL until first resolved. */
22 int (*mds_reint_p)(int offset, struct ptlrpc_request *req) = NULL;
23 int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req) = NULL;
/* Forward declarations for the table entries below. */
25 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b);
26 static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
27                               ldlm_mode_t mode, void *data);
/* Per-resource-type lock compatibility functions, indexed by lr_type.
 * Used by _ldlm_lock_compat() below.
 * NOTE(review): the closing "};" (original line 33) is absent from this
 * extracted listing. */
29 ldlm_res_compat ldlm_res_compat_table [] = {
30 [LDLM_PLAIN] ldlm_plain_compat,
31 [LDLM_EXTENT] ldlm_extent_compat,
32 [LDLM_MDSINTENT] ldlm_plain_compat
/* Per-resource-type server-side policy functions, indexed by lr_type.
 * Invoked from ldlm_local_lock_enqueue() on the server only (entries may
 * be NULL, e.g. LDLM_PLAIN has no policy).
 * NOTE(review): the closing "};" is absent from this extracted listing. */
35 ldlm_res_policy ldlm_res_policy_table [] = {
37 [LDLM_EXTENT] ldlm_extent_policy,
38 [LDLM_MDSINTENT] ldlm_intent_policy
/* Policy function for LDLM_MDSINTENT locks, run on the server at enqueue
 * time.  If the request carries an intent buffer (bufcount > 1) it executes
 * the intended MDS operation (reint, getattr-by-name, ...) inline, packs the
 * reply, and may switch the lock onto the resource named by the operation's
 * result (returning ELDLM_LOCK_CHANGED), or abort the lock entirely
 * (ELDLM_LOCK_ABORTED for unlink/rmdir).  Without an intent it just packs a
 * plain ldlm_reply.
 *
 * NOTE(review): this extracted listing omits many original lines (43, 45-50,
 * 61-62, 64, 66-69, 72, 74-83, 85-92, 94-95, 97-99, 102-105, 107-114, 116,
 * 121-124, 126-132, 134, 139-148, 150-152, 154-156, 159, 164, 171-172,
 * 175-177, 179, 181, 184-185, 187-193) — including the opening brace, the
 * switch statements, error paths, and RETURNs.  Comments below describe only
 * the visible code. */
41 static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
42 ldlm_mode_t mode, void *data)
44 struct ptlrpc_request *req = req_cookie;
51 if (req->rq_reqmsg->bufcount > 1) {
52 /* an intent needs to be considered */
53 struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1);
54 struct mds_body *mds_rep;
55 struct ldlm_reply *rep;
56 struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
57 __u32 type = lock->l_resource->lr_type;
58 __u64 new_resid[3] = {0, 0, 0}, old_res;
59 int bufcount = -1, rc, size[3] = {sizeof(struct ldlm_reply),
60 sizeof(struct mds_body),
/* Intent opcode arrives in network byte order; convert before dispatch. */
63 it->opc = NTOH__u64(it->opc);
65 LDLM_DEBUG(lock, "intent policy, opc: %Ld", it->opc);
70 /* Note that in the negative case you may be returning
71 * a file and its obdo */
/* (switch on it->opc, sizing the reply) — first dispatch: reply layout. */
73 case IT_CREAT|IT_OPEN:
84 size[1] = sizeof(struct obdo);
93 rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
96 rc = req->rq_status = -ENOMEM;
100 rep = lustre_msg_buf(req->rq_repmsg, 0);
101 rep->lock_policy_res1 = 1;
/* Second dispatch: actually execute the intent via the MDS module. */
106 case IT_CREAT|IT_OPEN:
115 if (mds_reint_p == NULL)
/* Resolve the MDS handler lazily through the inter-module table;
 * avoids a hard module dependency (see globals at top of file). */
117 inter_module_get_request
118 ("mds_reint", "mds");
119 if (IS_ERR(mds_reint_p)) {
120 CERROR("MDSINTENT locks require the MDS "
125 rc = mds_reint_p(2, req);
133 if (mds_getattr_name_p == NULL)
135 inter_module_get_request
136 ("mds_getattr_name", "mds");
137 if (IS_ERR(mds_getattr_name_p)) {
138 CERROR("MDSINTENT locks require the MDS "
143 rc = mds_getattr_name_p(2, req);
149 case IT_READDIR|IT_OPEN:
153 CERROR("Unhandled intent\n");
/* unlink/rmdir consume the intent; no lock is granted. */
157 if (it->opc == IT_UNLINK || it->opc == IT_RMDIR)
158 RETURN(ELDLM_LOCK_ABORTED);
160 mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
161 rep->lock_policy_res2 = req->rq_status;
/* Re-target the lock at the resource (inode) the intent resolved to. */
162 new_resid[0] = mds_rep->ino;
163 old_res = lock->l_resource->lr_name[0];
165 CDEBUG(D_INFO, "remote intent: locking %d instead of"
166 "%ld\n", mds_rep->ino, (long)old_res);
167 spin_lock(&lock->l_resource->lr_lock);
168 if (!ldlm_resource_put(lock->l_resource))
169 /* unlock it unless the resource was freed */
170 spin_unlock(&lock->l_resource->lr_lock);
173 ldlm_resource_get(ns, NULL, new_resid, type, 1);
174 if (lock->l_resource == NULL) {
178 LDLM_DEBUG(lock, "intent policy, old res %ld",
180 RETURN(ELDLM_LOCK_CHANGED);
/* No intent attached: pack a minimal single-buffer ldlm_reply. */
182 int size = sizeof(struct ldlm_reply);
183 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
186 CERROR("out of memory\n");
194 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b)
196 return lockmode_compat(a->l_req_mode, b->l_req_mode);
199 /* Args: referenced, unlocked parent (or NULL)
200 * referenced, unlocked resource
201 * Locks: parent->l_lock */
/* Allocate and initialize a new lock on 'resource', optionally linking it
 * as a child of 'parent' (parent/child used for intent re-targeting).
 * NOTE(review): this listing omits original lines 204, 206, 208-209,
 * 211-213, 220, 226-230 — the braces, the NULL-resource and allocation-
 * failure returns, and the final return of 'lock'. */
202 static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
203 struct ldlm_resource *resource)
205 struct ldlm_lock *lock;
207 if (resource == NULL)
/* SLAB_KERNEL: may sleep; caller must not hold spinlocks here. */
210 lock = kmem_cache_alloc(ldlm_lock_slab, SLAB_KERNEL);
214 memset(lock, 0, sizeof(*lock));
215 lock->l_resource = resource;
216 INIT_LIST_HEAD(&lock->l_children);
217 INIT_LIST_HEAD(&lock->l_res_link);
218 init_waitqueue_head(&lock->l_waitq);
219 lock->l_lock = SPIN_LOCK_UNLOCKED;
/* Child registration is done under the parent's lock. */
221 if (parent != NULL) {
222 spin_lock(&parent->l_lock);
223 lock->l_parent = parent;
224 list_add(&lock->l_childof, &parent->l_children);
225 spin_unlock(&parent->l_lock);
231 /* Args: unreferenced, locked lock
233 * Caller must do its own ldlm_resource_put() on lock->l_resource */
/* Destroy a lock: sanity-check for leftover children/references, drop the
 * connection reference if any, and return the memory to the slab cache.
 * NOTE(review): lines 235, 240-242, 246, 250 are missing from this listing
 * (braces and, presumably, the children-error handling tail — confirm
 * against the full source). */
234 void ldlm_lock_free(struct ldlm_lock *lock)
236 if (!list_empty(&lock->l_children)) {
237 CERROR("lock %p still has children (%p)!\n", lock,
238 lock->l_children.next);
239 ldlm_lock_dump(lock);
/* Freeing with live readers/writers is only logged, not prevented. */
243 if (lock->l_readers || lock->l_writers)
244 CDEBUG(D_INFO, "lock still has references (%d readers, %d "
245 "writers)\n", lock->l_readers, lock->l_writers);
247 if (lock->l_connection)
248 ptlrpc_put_connection(lock->l_connection);
249 kmem_cache_free(ldlm_lock_slab, lock);
252 void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
254 ldlm_res2desc(lock->l_resource, &desc->l_resource);
255 desc->l_req_mode = lock->l_req_mode;
256 desc->l_granted_mode = lock->l_granted_mode;
257 memcpy(&desc->l_extent, &lock->l_extent, sizeof(desc->l_extent));
258 memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
261 /* Args: unlocked lock */
/* Take a reference on the lock in the given mode under l_lock.
 * NOTE(review): lines 263, 266-268, 270 are missing — presumably the
 * braces and the l_readers/l_writers increment branches selected by the
 * mode test below; confirm against the full source. */
262 void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode)
264 spin_lock(&lock->l_lock);
/* NL/CR/PR are the "read" modes here; other modes count as writers. */
265 if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
269 spin_unlock(&lock->l_lock);
/* Build (but do not send) a blocking AST for 'lock', triggered by the
 * incompatible lock 'new'.  The l_blocking_ast callback packs the request;
 * it is then queued on the resource's lr_tmp list to be sent later by
 * ldlm_reprocess_all(), because an RPC cannot be sent while holding locks.
 * LDLM_FL_AST_SENT guards against sending the AST twice.
 * NOTE(review): lines 273, 275-276, 279-281, 283, 286, 289-292 are missing
 * from this listing (braces, the already-sent early-out, and the return). */
272 int ldlm_send_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock *new)
274 struct ptlrpc_request *req = NULL;
277 spin_lock(&lock->l_lock);
278 if (lock->l_flags & LDLM_FL_AST_SENT) {
282 lock->l_flags |= LDLM_FL_AST_SENT;
/* Callback fills 'req' with the packed AST rather than sending it. */
284 lock->l_blocking_ast(lock, new, lock->l_data, lock->l_data_len, &req);
285 spin_unlock(&lock->l_lock);
287 struct list_head *list = lock->l_resource->lr_tmp;
288 list_add(&req->rq_multi, list);
293 /* Args: unlocked lock */
/* Drop a reference taken by ldlm_lock_addref().  If this was the final
 * reference and the lock is marked LDLM_FL_DYING (client-side only), invoke
 * the blocking-AST callback locally to finish tearing the lock down.
 * NOTE(review): lines 295-300, 303-305, 311-313, 321, 323-324 are missing
 * (braces, the readers/writers decrements selected by the mode test, and
 * the DYING-on-server error handling tail). */
294 void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode)
301 spin_lock(&lock->l_lock);
/* NL/CR/PR decrement l_readers; other modes decrement l_writers
 * (decrement lines omitted in this listing). */
302 if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
306 if (!lock->l_readers && !lock->l_writers &&
307 lock->l_flags & LDLM_FL_DYING) {
308 /* Read this lock its rights. */
309 if (!lock->l_resource->lr_namespace->ns_client) {
310 CERROR("LDLM_FL_DYING set on non-local lock!\n");
314 CDEBUG(D_INFO, "final decref done on dying lock, "
315 "calling callback.\n");
/* Drop l_lock before the callback; it may take other locks. */
316 spin_unlock(&lock->l_lock);
317 /* This function pointer is unfortunately overloaded. This
318 * call will not result in an RPC. */
319 lock->l_blocking_ast(lock, NULL, lock->l_data,
320 lock->l_data_len, NULL);
322 spin_unlock(&lock->l_lock);
326 /* Args: unlocked lock */
/* Walk 'queue' (a granted or converting list) and test each lock there
 * against 'lock'.  A queued lock is compatible if either the per-type
 * compat function or the plain mode matrix says so; otherwise it is
 * incompatible, and if send_cbs is set a blocking AST is built for it.
 * NOTE(review): lines 329, 331-332, 336, 338-340, 344-345, 348-352,
 * 361-366 are missing (braces, the self-skip, continue statements, the
 * incompat accumulator, and the return). */
327 static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs,
328 struct list_head *queue)
330 struct list_head *tmp, *pos;
/* _safe: blocking-AST bookkeeping may touch list linkage. */
333 list_for_each_safe(tmp, pos, queue) {
334 struct ldlm_lock *child;
335 ldlm_res_compat compat;
337 child = list_entry(tmp, struct ldlm_lock, l_res_link);
341 compat = ldlm_res_compat_table[child->l_resource->lr_type];
342 if (compat(child, lock)) {
343 CDEBUG(D_OTHER, "compat function succeded, next.\n");
346 if (lockmode_compat(child->l_granted_mode, lock->l_req_mode)) {
347 CDEBUG(D_OTHER, "lock modes are compatible, next.\n");
353 CDEBUG(D_OTHER, "compat function failed and lock modes incompat\n");
354 if (send_cbs && child->l_blocking_ast != NULL) {
355 CDEBUG(D_OTHER, "incompatible; sending blocking AST.\n");
356 /* It's very difficult to actually send the AST from
357 * here, because we'd have to drop the lock before going
358 * to sleep to wait for the reply. Instead we build the
359 * packet and send it later. */
360 ldlm_send_blocking_ast(child, lock);
367 /* Args: unlocked lock */
/* Check 'lock' against both the granted and converting queues of its
 * resource; nonzero result means at least one incompatibility was found.
 * NOTE(review): lines 369-372 and 377-380 are missing (braces, the rc
 * declaration, and the return of rc). */
368 static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs)
373 rc = _ldlm_lock_compat(lock, send_cbs, &lock->l_resource->lr_granted);
374 /* FIXME: should we be sending ASTs to converting? */
375 rc |= _ldlm_lock_compat(lock, send_cbs,
376 &lock->l_resource->lr_converting);
381 /* Args: locked lock, locked resource */
/* Grant 'lock' on 'res': move it to the granted list, promote the granted
 * mode to the requested mode, update the resource's most-restrictive-mode
 * cache, and fire the completion AST if one is registered.
 * NOTE(review): braces and a few lines (383-385, 388, 391, 395-396) are
 * missing from this listing. */
382 void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
386 ldlm_resource_add_lock(res, &res->lr_granted, lock);
387 lock->l_granted_mode = lock->l_req_mode;
/* Lower mode value == more restrictive, per the comparison used here. */
389 if (lock->l_granted_mode < res->lr_most_restr)
390 res->lr_most_restr = lock->l_granted_mode;
392 if (lock->l_completion_ast)
393 lock->l_completion_ast(lock, NULL, lock->l_data,
394 lock->l_data_len, NULL);
/* Scan one resource queue for an existing, non-dying lock of exactly the
 * requested mode whose extent covers the requested extent.  On a hit the
 * lock is addref()ed and converted to a handle in *lockh.
 * NOTE(review): lines 400, 402, 406, 408-409, 413-414, 418-419, 422-427
 * are missing (braces, the continue statements, and the 0/1 returns). */
398 static int search_queue(struct list_head *queue, ldlm_mode_t mode,
399 struct ldlm_extent *extent, struct lustre_handle *lockh)
401 struct list_head *tmp;
403 list_for_each(tmp, queue) {
404 struct ldlm_lock *lock;
405 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
/* Dying locks are being torn down; never match them. */
407 if (lock->l_flags & LDLM_FL_DYING)
410 /* lock_convert() takes the resource lock, so we're sure that
411 * req_mode, lr_type, and l_cookie won't change beneath us */
412 if (lock->l_req_mode != mode)
/* Extent locks must fully contain the requested range. */
415 if (lock->l_resource->lr_type == LDLM_EXTENT &&
416 (lock->l_extent.start > extent->start ||
417 lock->l_extent.end < extent->end))
420 ldlm_lock_addref(lock, mode);
421 ldlm_object2handle(lock, lockh);
428 /* Must be called with no resource or lock locks held.
430 * Returns 1 if it finds an already-existing lock that is compatible; in this
431 * case, lockh is filled in with a addref()ed lock */
/* Look up the resource (without creating it) and search its granted,
 * converting, and waiting queues, in that order, via search_queue().
 * NOTE(review): lines 435, 437-439, 441-443, 446, 448, 450-453, 456-458
 * are missing (braces, the res==NULL early return, the matched-path
 * GOTO/cleanup, and the final return). */
432 int ldlm_local_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
433 void *cookie, int cookielen, ldlm_mode_t mode,
434 struct lustre_handle *lockh)
436 struct ldlm_resource *res;
/* Last arg 0: do not create the resource if it doesn't exist. */
440 res = ldlm_resource_get(ns, NULL, res_id, type, 0);
444 spin_lock(&res->lr_lock);
445 if (search_queue(&res->lr_granted, mode, cookie, lockh))
447 if (search_queue(&res->lr_converting, mode, cookie, lockh))
449 if (search_queue(&res->lr_waiting, mode, cookie, lockh))
454 ldlm_resource_put(res);
455 spin_unlock(&res->lr_lock);
459 /* Must be called without the resource lock held. Returns a referenced,
460 * unlocked ldlm_lock. */
/* Create a new lock on (possibly newly created) resource 'res_id': resolve
 * the optional parent handle, get/create the resource, allocate the lock,
 * set its requested mode and data, take the initial reference, and hand a
 * handle back in *lockh.
 * NOTE(review): lines 464-466 (remaining parameters, including 'mode',
 * 'data' and 'data_len'), 468, 471, 473, 475, 477-479, 481, 485-487, 489,
 * 492, 494-496 are missing (braces, error paths, l_data assignment, and
 * the ELDLM_OK return). */
461 ldlm_error_t ldlm_local_lock_create(struct ldlm_namespace *ns,
462 struct lustre_handle *parent_lock_handle,
463 __u64 *res_id, __u32 type,
467 struct lustre_handle *lockh)
469 struct ldlm_resource *res, *parent_res = NULL;
470 struct ldlm_lock *lock, *parent_lock;
472 parent_lock = lustre_handle2object(parent_lock_handle);
474 parent_res = parent_lock->l_resource;
/* Last arg 1: create the resource if it doesn't exist yet. */
476 res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
480 lock = ldlm_lock_new(parent_lock, res);
/* Allocation-failure path (condition line missing from listing):
 * drop the resource reference taken above. */
482 spin_lock(&res->lr_lock);
483 ldlm_resource_put(res);
484 spin_unlock(&res->lr_lock);
488 lock->l_req_mode = mode;
490 lock->l_data_len = data_len;
491 ldlm_lock_addref(lock, mode);
493 ldlm_object2handle(lock, lockh);
497 /* Must be called with lock->l_lock and lock->l_resource->lr_lock not held */
/* Enqueue a previously created lock: run the server-side policy (which may
 * re-target or abort the lock), then either place the lock on the
 * converting/waiting list per *flags (client side), or test compatibility
 * against existing locks and grant or block it (server side).
 * NOTE(review): this extracted listing omits many lines (500 — likely the
 * 'flags' parameter —, 503, 508-509, 514, 516, 519, 522-523, 526, 528, 530,
 * 533, 540, 543-546, 549, 553, 555-556, 559, 562, 565, 567-569, 574-575,
 * 579-580, 582, 585-587, 589-590, 593, 595, 600-602), including braces,
 * GOTO labels, and returns.  Comments describe only visible code. */
498 ldlm_error_t ldlm_local_lock_enqueue(struct lustre_handle *lockh,
499 void *cookie, int cookie_len,
501 ldlm_lock_callback completion,
502 ldlm_lock_callback blocking)
504 struct ldlm_resource *res;
505 struct ldlm_lock *lock;
506 int incompat = 0, local;
507 ldlm_res_policy policy;
510 lock = lustre_handle2object(lockh);
511 res = lock->l_resource;
/* ns_client distinguishes client (local) namespaces from server ones. */
512 local = res->lr_namespace->ns_client;
513 spin_lock(&res->lr_lock);
515 lock->l_blocking_ast = blocking;
/* For extent locks the cookie is the requested extent. */
517 if (res->lr_type == LDLM_EXTENT)
518 memcpy(&lock->l_extent, cookie, sizeof(lock->l_extent));
520 /* policies are not executed on the client */
521 if (!local && (policy = ldlm_res_policy_table[res->lr_type])) {
524 /* We do this dancing with refcounts and locks because the
525 * policy function could send an RPC */
527 spin_unlock(&res->lr_lock);
529 rc = policy(lock, cookie, lock->l_req_mode, NULL);
531 spin_lock(&res->lr_lock);
532 ldlm_resource_put(res);
/* Policy moved the lock to a different resource (intent case). */
534 if (rc == ELDLM_LOCK_CHANGED) {
535 spin_unlock(&res->lr_lock);
536 res = lock->l_resource;
537 spin_lock(&res->lr_lock);
538 *flags |= LDLM_FL_LOCK_CHANGED;
539 } else if (rc == ELDLM_LOCK_ABORTED) {
541 ldlm_resource_put(lock->l_resource);
542 ldlm_lock_free(lock);
547 lock->l_cookie = cookie;
548 lock->l_cookie_len = cookie_len;
550 if (local && lock->l_req_mode == lock->l_granted_mode) {
551 /* The server returned a blocked lock, but it was granted before
552 * we got a chance to actually enqueue it. We don't need to do
554 GOTO(out_noput, ELDLM_OK);
557 /* If this is a local resource, put it on the appropriate list. */
558 list_del_init(&lock->l_res_link);
/* Client side: trust the server's blocking flags. */
560 if (*flags & LDLM_FL_BLOCK_CONV)
561 ldlm_resource_add_lock(res, res->lr_converting.prev,
563 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
564 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
566 ldlm_grant_lock(res, lock);
/* Server side: FIFO fairness — block behind any converters/waiters. */
570 /* FIXME: We may want to optimize by checking lr_most_restr */
571 if (!list_empty(&res->lr_converting)) {
572 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
573 *flags |= LDLM_FL_BLOCK_CONV;
576 if (!list_empty(&res->lr_waiting)) {
577 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
578 *flags |= LDLM_FL_BLOCK_WAIT;
/* Granted-queue compatibility check; blocks if anything conflicts. */
581 incompat = ldlm_lock_compat(lock, 0);
583 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
584 *flags |= LDLM_FL_BLOCK_GRANTED;
588 ldlm_grant_lock(res, lock);
591 /* We're called with a lock that has a referenced resource and is not on
592 * any resource list. When we added it to a list, we incurred an extra
594 ldlm_resource_put(lock->l_resource);
596 /* Don't set 'completion_ast' until here so that if the lock is granted
597 * immediately we don't do an unnecessary completion call. */
598 lock->l_completion_ast = completion;
599 spin_unlock(&res->lr_lock);
603 /* Must be called with resource->lr_lock taken. */
/* Re-examine each blocked lock on 'converting' (a converting or waiting
 * queue): if it is now compatible with all granted/converting locks, grant
 * it; the addref/decref pair then drops the reference held under the old
 * (pre-grant) mode.
 * NOTE(review): lines 606, 608-609, 613, 615, 618-619, 622, 625-629 are
 * missing (braces, the continue on incompatibility, and the return). */
604 static int ldlm_reprocess_queue(struct ldlm_resource *res,
605 struct list_head *converting)
607 struct list_head *tmp, *pos;
/* _safe: granting removes entries from this list as we walk it. */
610 list_for_each_safe(tmp, pos, converting) {
611 struct ldlm_lock *pending;
612 pending = list_entry(tmp, struct ldlm_lock, l_res_link);
614 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
616 /* the resource lock protects ldlm_lock_compat */
/* send_cbs=1: build blocking ASTs for remaining conflicts. */
617 if (ldlm_lock_compat(pending, 1))
620 list_del_init(&pending->l_res_link);
621 ldlm_grant_lock(res, pending);
/* Swap the mode the caller's reference is held under. */
623 ldlm_lock_addref(pending, pending->l_req_mode);
624 ldlm_lock_decref(pending, pending->l_granted_mode);
630 /* Must be called with resource->lr_lock not taken. */
/* Server-side reprocessing after a lock is released or converted: under
 * lr_lock, re-run the converting queue (and, only if it drained, the
 * waiting queue), collecting any blocking-AST RPCs on a temporary list
 * (res->lr_tmp); then, with the lock dropped, send each queued RPC and
 * free it.
 * NOTE(review): lines 632, 634, 636, 639-640, 643, 647-648, 650, 652, 655,
 * 657, 661, 663-665 are missing (braces, the lr_tmp reset, the rc
 * declaration, and the error-check if around the CERROR). */
631 void ldlm_reprocess_all(struct ldlm_resource *res)
633 struct list_head rpc_list, *tmp, *pos;
635 INIT_LIST_HEAD(&rpc_list);
637 /* Local lock trees don't get reprocessed. */
638 if (res->lr_namespace->ns_client)
641 spin_lock(&res->lr_lock);
/* lr_tmp is where ldlm_send_blocking_ast() queues built RPCs. */
642 res->lr_tmp = &rpc_list;
644 ldlm_reprocess_queue(res, &res->lr_converting);
645 if (list_empty(&res->lr_converting))
646 ldlm_reprocess_queue(res, &res->lr_waiting);
649 spin_unlock(&res->lr_lock);
/* Now safe to block: send the collected ASTs outside lr_lock. */
651 list_for_each_safe(tmp, pos, &rpc_list) {
653 struct ptlrpc_request *req =
654 list_entry(tmp, struct ptlrpc_request, rq_multi);
656 CDEBUG(D_INFO, "Sending callback.\n");
658 rc = ptlrpc_queue_wait(req);
659 rc = ptlrpc_check_status(req, rc);
660 ptlrpc_free_req(req);
662 CERROR("Callback send failed: %d\n", rc);
666 /* Must be called with lock and lock->l_resource unlocked */
/* Cancel a lock: remove it from its resource (which may free the resource)
 * and free the lock.  Returns the resource if it survived (so the caller
 * can reprocess it), or NULL if it was freed.
 * NOTE(review): lines 668, 670-671, 673, 676, 680, 683, 686-689 are
 * missing — in particular line 683, which presumably handles the res==NULL
 * case before the spin_unlock below; as shown, res could be NULL at the
 * unlock.  Confirm against the full source. */
667 struct ldlm_resource *ldlm_local_lock_cancel(struct ldlm_lock *lock)
669 struct ldlm_resource *res;
672 res = lock->l_resource;
/* Lock ordering: resource lock before per-lock lock. */
674 spin_lock(&res->lr_lock);
675 spin_lock(&lock->l_lock);
677 if (lock->l_readers || lock->l_writers)
678 CDEBUG(D_INFO, "lock still has references (%d readers, %d "
679 "writers)\n", lock->l_readers, lock->l_writers);
681 if (ldlm_resource_del_lock(lock))
682 res = NULL; /* res was freed, nothing else to do. */
684 spin_unlock(&res->lr_lock);
685 ldlm_lock_free(lock);
690 /* Must be called with lock and lock->l_resource unlocked */
/* Convert a lock to 'new_mode': unlink it from its current queue, then on
 * the client re-queue/grant it per *flags (mirroring the enqueue logic),
 * and on the server append it to the converting queue for later
 * reprocessing.
 * NOTE(review): lines 693, 696-697, 700, 702, 705, 710, 713, 715, 717-718,
 * 720-723 are missing (braces, the else for the server branch, and the
 * return of 'res'). */
691 struct ldlm_resource *ldlm_local_lock_convert(struct lustre_handle *lockh,
692 int new_mode, int *flags)
694 struct ldlm_lock *lock;
695 struct ldlm_resource *res;
698 lock = lustre_handle2object(lockh);
699 res = lock->l_resource;
701 spin_lock(&res->lr_lock);
703 lock->l_req_mode = new_mode;
704 list_del_init(&lock->l_res_link);
706 /* If this is a local resource, put it on the appropriate list. */
707 if (res->lr_namespace->ns_client) {
708 if (*flags & LDLM_FL_BLOCK_CONV)
709 ldlm_resource_add_lock(res, res->lr_converting.prev,
711 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
712 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
714 ldlm_grant_lock(res, lock);
/* Server path: queue at the tail of the converting list. */
716 list_add(&lock->l_res_link, res->lr_converting.prev);
719 spin_unlock(&res->lr_lock);
/* Debug helper: dump a lock's identity, parent, resource, modes, reference
 * counts, and (for extent locks) its range to the D_OTHER debug log.
 * No-op unless D_OTHER debugging is enabled.
 * NOTE(review): lines 725-727, 729-730, 732-734, 736-738, 742 are missing
 * from this listing ('ver' buffer declaration, braces, the early returns,
 * and the RES_VERSION_SIZE error message), and the function may continue
 * past the end of this extract. */
724 void ldlm_lock_dump(struct ldlm_lock *lock)
728 if (!(portal_debug & D_OTHER))
/* Format string below assumes exactly 4 version words. */
731 if (RES_VERSION_SIZE != 4)
735 CDEBUG(D_OTHER, " NULL LDLM lock\n");
739 snprintf(ver, sizeof(ver), "%x %x %x %x",
740 lock->l_version[0], lock->l_version[1],
741 lock->l_version[2], lock->l_version[3]);
743 CDEBUG(D_OTHER, " -- Lock dump: %p (%s)\n", lock, ver);
744 CDEBUG(D_OTHER, " Parent: %p\n", lock->l_parent);
745 CDEBUG(D_OTHER, " Resource: %p (%Ld)\n", lock->l_resource,
746 lock->l_resource->lr_name[0]);
747 CDEBUG(D_OTHER, " Requested mode: %d, granted mode: %d\n",
748 (int)lock->l_req_mode, (int)lock->l_granted_mode);
749 CDEBUG(D_OTHER, " Readers: %u ; Writers; %u\n",
750 lock->l_readers, lock->l_writers);
751 if (lock->l_resource->lr_type == LDLM_EXTENT)
752 CDEBUG(D_OTHER, " Extent: %Lu -> %Lu\n",
753 (unsigned long long)lock->l_extent.start,
754 (unsigned long long)lock->l_extent.end);