From 4dad795bcc8f4d99e25563b056cf950b613c2c17 Mon Sep 17 00:00:00 2001
From: donmilos
Date: Mon, 23 Jun 2003 01:28:07 +0000
Subject: [PATCH] Posix record locking changes.

---
 lustre/ldlm/ldlm_flock.c | 418 +++++++++++++++++++++++++++++++++++++++++++++++
 lustre/ldlm/ldlm_plain.c | 142 ++++++++++++++++
 2 files changed, 560 insertions(+)
 create mode 100644 lustre/ldlm/ldlm_flock.c
 create mode 100644 lustre/ldlm/ldlm_plain.c

diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c
new file mode 100644
index 0000000..7ef1341
--- /dev/null
+++ b/lustre/ldlm/ldlm_flock.c
@@ -0,0 +1,418 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (c) 2002, 2003 Cluster File Systems, Inc.
+ * Author: Peter Braam
+ * Author: Phil Schwan
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define DEBUG_SUBSYSTEM S_LDLM
+
+#ifdef __KERNEL__
+#include <linux/lustre_dlm.h>
+#include <linux/obd_support.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_lib.h>
+#else
+#include <liblustre.h>
+#endif
+
+static inline int
+ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
+{
+        if ((new->l_data.l_flock.pid == lock->l_data.l_flock.pid) &&
+            (new->l_export == lock->l_export))
+                return 1;
+        else
+                return 0;
+}
+
+static inline int
+ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
+{
+        if ((new->l_data.l_flock.start <= lock->l_data.l_flock.end) &&
+            (new->l_data.l_flock.end >= lock->l_data.l_flock.start))
+                return 1;
+        else
+                return 0;
+}
+
+static inline void
+ldlm_flock_destroy(struct ldlm_lock *lock, int flags)
+{
+        ENTRY;
+
+        list_del_init(&lock->l_res_link);
+        if (flags == LDLM_FL_WAIT_NOREPROC) {
+                /* client side */
+                struct lustre_handle lockh;
+
+                /* Set a flag to prevent us from sending a CANCEL */
+                lock->l_flags |= LDLM_FL_LOCAL_ONLY;
+
+                ldlm_lock2handle(lock, &lockh);
+                ldlm_lock_decref_and_cancel(&lockh, lock->l_granted_mode);
+        }
+
+        ldlm_lock_destroy(lock);
+        EXIT;
+}
+
+int
+ldlm_flock_enqueue(struct ldlm_lock **reqp, void *req_cookie, int *flags,
+                   int first_enq, ldlm_error_t *err)
+{
+        struct ldlm_lock *req = *reqp;
+        struct ldlm_lock *new = req;
+        struct ldlm_lock *new2 = NULL;
+        struct ldlm_lock *lock = NULL;
+        struct ldlm_resource *res = req->l_resource;
+        struct ldlm_namespace *ns = res->lr_namespace;
+        struct list_head *tmp;
+        struct list_head *ownlocks;
+        ldlm_mode_t mode = req->l_req_mode;
+        int added = 0;
+        int overlaps = 0;
+        ENTRY;
+
+        CDEBUG(D_FLOCK, "flags: 0x%x pid: %d mode: %d start: %llu end: %llu\n",
+               *flags, new->l_data.l_flock.pid, mode,
+               req->l_data.l_flock.start, req->l_data.l_flock.end);
+
+        *err = ELDLM_OK;
+
+        /* No blocking ASTs are sent for record locks */
+        req->l_blocking_ast = NULL;
+
+        ownlocks = NULL;
+        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
+                list_for_each(tmp, &res->lr_granted) {
+                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+                        if (ldlm_same_flock_owner(lock, req)) {
+                                ownlocks = tmp;
+                                break;
+                        }
+                }
+        } else {
+                list_for_each(tmp, &res->lr_granted) {
+                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+                        if (ldlm_same_flock_owner(lock, req)) {
+                                if (!ownlocks)
+                                        ownlocks = tmp;
+                                continue;
+                        }
+
+                        /* locks are compatible, overlap doesn't matter */
+                        if (lockmode_compat(lock->l_granted_mode, mode))
+                                continue;
+
+                        if (!ldlm_flocks_overlap(lock, req))
+                                continue;
+
+                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+                                ldlm_flock_destroy(req, *flags);
+                                *err = ELDLM_LOCK_ABORTED;
+                                RETURN(LDLM_ITER_STOP);
+                        }
+
+                        if (*flags & LDLM_FL_TEST_LOCK) {
+                                req->l_granted_mode = lock->l_granted_mode;
+                                req->l_data.l_flock.pid =
+                                        lock->l_data.l_flock.pid;
+                                req->l_data.l_flock.start =
+                                        lock->l_data.l_flock.start;
+                                req->l_data.l_flock.end =
+                                        lock->l_data.l_flock.end;
+                                ldlm_flock_destroy(req, *flags);
+                                RETURN(LDLM_ITER_STOP);
+                        }
+
+                        if (first_enq) {
+                                /* XXX - add deadlock detection check here */
+                        }
+
+                        *flags |= LDLM_FL_BLOCK_GRANTED;
+                        RETURN(LDLM_ITER_CONTINUE);
+                }
+        }
+
+        if (*flags & LDLM_FL_TEST_LOCK) {
+                LASSERT(first_enq);
+                req->l_granted_mode = req->l_req_mode;
+                RETURN(LDLM_ITER_STOP);
+        }
+
+        added = (mode == LCK_NL);
+
+        /* Insert the new lock into the list */
+
+        if (!ownlocks)
+                ownlocks = &res->lr_granted;
+
+        for (tmp = ownlocks->next; ownlocks != &res->lr_granted;
+             ownlocks = tmp, tmp = ownlocks->next) {
+                lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);
+
+                if (!ldlm_same_flock_owner(lock, new))
+                        break;
+
+                if (lock->l_granted_mode == mode) {
+                        if (lock->l_data.l_flock.end <
+                            (new->l_data.l_flock.start - 1))
+                                continue;
+
+                        if (lock->l_data.l_flock.start >
+                            (new->l_data.l_flock.end + 1))
+                                break;
+
+                        if (lock->l_data.l_flock.start >
+                            new->l_data.l_flock.start)
+                                lock->l_data.l_flock.start =
+                                        new->l_data.l_flock.start;
+                        else
+                                new->l_data.l_flock.start =
+                                        lock->l_data.l_flock.start;
+
+                        if (lock->l_data.l_flock.end <
+                            new->l_data.l_flock.end)
+                                lock->l_data.l_flock.end =
+                                        new->l_data.l_flock.end;
+                        else
+                                new->l_data.l_flock.end =
+                                        lock->l_data.l_flock.end;
+
+                        if (added) {
+                                ldlm_flock_destroy(lock, *flags);
+                        } else {
+                                new = lock;
+                                added = 1;
+                        }
+                        continue;
+                }
+
+                if (lock->l_data.l_flock.end < new->l_data.l_flock.start)
+                        continue;
+                if (lock->l_data.l_flock.start > new->l_data.l_flock.end)
+                        break;
+
+                ++overlaps;
+
+                if (new->l_data.l_flock.start <=
+                    lock->l_data.l_flock.start) {
+                        if (new->l_data.l_flock.end <
+                            lock->l_data.l_flock.end) {
+                                lock->l_data.l_flock.start =
+                                        new->l_data.l_flock.end + 1;
+                                break;
+                        } else if (added) {
+                                ldlm_flock_destroy(lock, *flags);
+                        } else {
+                                lock->l_data.l_flock.start =
+                                        new->l_data.l_flock.start;
+                                lock->l_data.l_flock.end =
+                                        new->l_data.l_flock.end;
+                                new = lock;
+                                added = 1;
+                        }
+                        continue;
+                }
+                if (new->l_data.l_flock.end >= lock->l_data.l_flock.end) {
+                        lock->l_data.l_flock.end =
+                                new->l_data.l_flock.start - 1;
+                        continue;
+                }
+
+                /* split the existing lock into two locks */
+
+                /* if this is an F_UNLCK operation then we could avoid
+                 * allocating a new lock and use the req lock passed in
+                 * with the request but this would complicate the reply
+                 * processing since updates to req get reflected in the
+                 * reply. The client side must see the original lock data
+                 * so that it can process the unlock properly. */
+
+                /* XXX - if ldlm_lock_new() can sleep we have to
+                 * release the ns_lock, allocate the new lock, and
+                 * restart processing this lock. */
+                new2 = ldlm_lock_create(ns, NULL, res->lr_name, LDLM_FLOCK,
+                                        lock->l_granted_mode, NULL, NULL);
+                if (!new2) {
+                        /* LBUG for now */
+                        LASSERT(0);
+                        RETURN(ENOMEM);
+                }
+
+                new2->l_granted_mode = lock->l_granted_mode;
+                new2->l_data.l_flock.pid = new->l_data.l_flock.pid;
+                new2->l_data.l_flock.start = lock->l_data.l_flock.start;
+                new2->l_data.l_flock.end = new->l_data.l_flock.start - 1;
+                lock->l_data.l_flock.start = new->l_data.l_flock.end + 1;
+                new2->l_connh = lock->l_connh;
+                if ((new2->l_export = lock->l_export) != NULL) {
+                        list_add(&new2->l_export_chain,
+                                 &new2->l_export->
+                                 exp_ldlm_data.led_held_locks);
+                }
+                if (*flags == LDLM_FL_WAIT_NOREPROC) {
+                        /* client side */
+                        ldlm_lock_addref_internal(new2, lock->l_granted_mode);
+                }
+
+                /* insert new2 at lock */
+                list_add_tail(&new2->l_res_link, ownlocks);
+                LDLM_LOCK_PUT(new2);
+                break;
+        }
+
+        if (added) {
+                ldlm_flock_destroy(req, *flags);
+        } else {
+                /* insert new at ownlocks */
+                new->l_granted_mode = new->l_req_mode;
+                list_del_init(&new->l_res_link);
+                list_add_tail(&new->l_res_link, ownlocks);
+        }
+
+        if (*flags != LDLM_FL_WAIT_NOREPROC) {
+                if (req->l_completion_ast)
+                        ldlm_add_ast_work_item(req, NULL, NULL, 0);
+
+                /* The only problem with doing the reprocessing here is that
+                 * the completion ASTs for newly granted locks will be sent
+                 * before the unlock completion is sent. It shouldn't be an
+                 * issue. Also note that ldlm_flock_enqueue() will recurse,
+                 * but only once because there can't be unlock requests on
+                 * the wait queue. */
+                if ((mode == LCK_NL) && overlaps)
+                        ldlm_reprocess_queue(res, &res->lr_waiting);
+        }
+
+        ldlm_resource_dump(res);
+
+        RETURN(LDLM_ITER_CONTINUE);
+}
+
+static void interrupted_flock_completion_wait(void *data)
+{
+}
+
+struct flock_wait_data {
+        struct ldlm_lock *fwd_lock;
+        int fwd_generation;
+};
+
+int
+ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
+{
+        struct ldlm_namespace *ns;
+        struct file_lock *getlk = data;
+        struct flock_wait_data fwd;
+        unsigned long irqflags;
+        struct obd_device *obd;
+        struct obd_import *imp = NULL;
+        ldlm_error_t err;
+        int rc = 0;
+        struct l_wait_info lwi;
+        ENTRY;
+
+        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
+
+        if (flags == 0) {
+                wake_up(&lock->l_waitq);
+                RETURN(0);
+        }
+
+        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
+                       LDLM_FL_BLOCK_CONV)))
+                goto granted;
+
+        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
+                   "sleeping");
+
+        ldlm_lock_dump(D_OTHER, lock);
+
+        fwd.fwd_lock = lock;
+        obd = class_conn2obd(lock->l_connh);
+
+        /* if this is a local lock, then there is no import */
+        if (obd != NULL)
+                imp = obd->u.cli.cl_import;
+
+        if (imp != NULL) {
+                spin_lock_irqsave(&imp->imp_lock, irqflags);
+                fwd.fwd_generation = imp->imp_generation;
+                spin_unlock_irqrestore(&imp->imp_lock, irqflags);
+        }
+
+        lwi = LWI_TIMEOUT_INTR(0, NULL, interrupted_flock_completion_wait,
+                               &fwd);
+
+        /* Go to sleep until the lock is granted. */
+        rc = l_wait_event(lock->l_waitq,
+                          ((lock->l_req_mode == lock->l_granted_mode) ||
+                           lock->l_destroyed), &lwi);
+
+        LASSERT(!(lock->l_destroyed));
+
+        if (rc) {
+                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
+                           rc);
+                RETURN(rc);
+        }
+
+granted:
+
+        LDLM_DEBUG(lock, "client-side enqueue waking up");
+        ns = lock->l_resource->lr_namespace;
+        l_lock(&ns->ns_lock);
+
+        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
+        list_del_init(&lock->l_res_link);
+
+        if (getlk) {
+                /* fcntl(F_GETLK) request */
+                if (lock->l_granted_mode == LCK_PR)
+                        getlk->fl_type = F_RDLCK;
+                else if (lock->l_granted_mode == LCK_PW)
+                        getlk->fl_type = F_WRLCK;
+                else
+                        getlk->fl_type = F_UNLCK;
+                getlk->fl_pid = lock->l_data.l_flock.pid;
+                getlk->fl_start = lock->l_data.l_flock.start;
+                getlk->fl_end = lock->l_data.l_flock.end;
+                /* ldlm_flock_destroy(lock); */
+        } else {
+                flags = LDLM_FL_WAIT_NOREPROC;
+                /* We need to reprocess the lock to do merges or splits */
+                ldlm_flock_enqueue(&lock, NULL, &flags, 1, &err);
+        }
+        l_unlock(&ns->ns_lock);
+        RETURN(0);
+}
+
+/* This function is only called on the client when a lock is aborted. */
+int
+ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *ld,
+                        void *data, int flag)
+{
+        ENTRY;
+        ldlm_lock_destroy(lock);
+        RETURN(0);
+}
diff --git a/lustre/ldlm/ldlm_plain.c b/lustre/ldlm/ldlm_plain.c
new file mode 100644
index 0000000..fc6f029
--- /dev/null
+++ b/lustre/ldlm/ldlm_plain.c
@@ -0,0 +1,142 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (c) 2002, 2003 Cluster File Systems, Inc.
+ * Author: Peter Braam
+ * Author: Phil Schwan
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define DEBUG_SUBSYSTEM S_LDLM
+
+#ifdef __KERNEL__
+#include <linux/lustre_dlm.h>
+#include <linux/obd_support.h>
+#include <linux/lustre_lib.h>
+#else
+#include <liblustre.h>
+#endif
+
+static int
+ldlm_plain_compat_queue(struct list_head *queue, struct ldlm_lock *new,
+                        int send_cbs, int first_enq)
+{
+        struct list_head *tmp, *pos;
+        ldlm_mode_t mode = new->l_req_mode;
+        int compat = 1;
+        ENTRY;
+
+        list_for_each_safe(tmp, pos, queue) {
+                struct ldlm_lock *old;
+
+                old = list_entry(tmp, struct ldlm_lock, l_res_link);
+                if (old == new)
+                        continue;
+
+                if (lockmode_compat(old->l_req_mode, mode) &&
+                    lockmode_compat(old->l_granted_mode, mode)) {
+                        CDEBUG(D_OTHER, "lock modes are compatible, next.\n");
+                        continue;
+                }
+
+                compat = 0;
+
+                /* if we're reprocessing the lock then the blocking ASTs
+                 * have already been sent. No need to continue. */
+                if (!first_enq)
+                        break;
+
+                if (send_cbs && (old->l_blocking_ast != NULL)) {
+                        CDEBUG(D_DLMTRACE, "lock %p incompatible; "
+                               "sending blocking AST.\n", old);
+                        ldlm_add_ast_work_item(old, new, NULL, 0);
+                } else if (!(old->l_flags & LDLM_FL_LOCAL)) {
+                        CDEBUG(D_DLMTRACE, "lock %p incompatible; "
+                               "setting blocking AST.\n", old);
+                        old->l_flags |= LDLM_FL_AST_SENT;
+                } else {
+                        CDEBUG(D_DLMTRACE, "local lock %p incompatible.\n",
+                               old);
+                }
+        }
+
+        RETURN(compat);
+}
+
+int
+ldlm_plain_enqueue(struct ldlm_lock **lockp, void *cookie, int *flags,
+                   int first_enq, ldlm_error_t *err)
+{
+        struct ldlm_lock *lock = *lockp;
+        struct ldlm_resource *res = lock->l_resource;
+        int convert_compat = 1;
+        int waiting_compat = 1;
+        int granted_compat = 1;
+        ENTRY;
+
+        /* FIXME: We may want to optimize by checking lr_most_restr */
+
+        /* On the first enqueue of this lock scan all of the queues
+         * to set the LDLM_FL_AST_SENT flag in conflicting locks.
+         * When the completion AST on the client side runs and sees
+         * this flag it will set the LDLM_FL_CB_PENDING flag in the
+         * lock so the client will know to cancel the lock as soon
+         * as possible. This saves us from sending a blocking AST
+         * in addition to the completion AST.
+         *
+         * If it's NOT the first enqueue of this lock then it must be
+         * the first eligible lock in the queues because of the way that
+         * ldlm_reprocess_all() works. So we don't have to check the
+         * converting or waiting queues. */
+        if (first_enq) {
+                if (!list_empty(&res->lr_converting)) {
+                        convert_compat = 0;
+                        ldlm_plain_compat_queue(&res->lr_converting,
+                                                lock, 0, first_enq);
+                }
+                if (!list_empty(&res->lr_waiting)) {
+                        waiting_compat = 0;
+                        ldlm_plain_compat_queue(&res->lr_waiting,
+                                                lock, 0, first_enq);
+                }
+        }
+        granted_compat =
+                ldlm_plain_compat_queue(&res->lr_granted, lock, 1, first_enq);
+
+        if (!convert_compat) {
+                *flags |= LDLM_FL_BLOCK_CONV;
+                RETURN(LDLM_ITER_STOP);
+        }
+        if (!waiting_compat) {
+                *flags |= LDLM_FL_BLOCK_WAIT;
+                RETURN(LDLM_ITER_STOP);
+        }
+        if (!granted_compat) {
+                *flags |= LDLM_FL_BLOCK_GRANTED;
+                RETURN(LDLM_ITER_STOP);
+        }
+
+        list_del_init(&lock->l_res_link);
+        ldlm_grant_lock(lock, NULL, 0);
+
+        if (lock->l_flags & LDLM_FL_AST_SENT) {
+                CDEBUG(D_DLMTRACE, "granted lock %p with AST set\n", lock);
+                *flags |= (lock->l_flags & LDLM_FL_AST_SENT);
+        }
+
+        RETURN(LDLM_ITER_CONTINUE);
+}
-- 
1.8.3.1
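
Note (not part of the patch): below is a minimal userspace sketch of the fcntl(2) record-locking behaviour these files implement, using only standard POSIX calls; the path /mnt/lustre/lockfile is made up for illustration. A blocking F_SETLKW corresponds to the wait in ldlm_flock_completion_ast() above, F_GETLK is answered through the getlk branch that copies l_data.l_flock back to the caller, and an unlock arrives as an LCK_NL-mode enqueue that merges or splits the owner's existing locks in ldlm_flock_enqueue().

/* Illustrative sketch only -- plain POSIX, not Lustre internals.
 * Assumes a file on a Lustre client mount; the path is hypothetical. */
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
        struct flock fl;
        int fd = open("/mnt/lustre/lockfile", O_RDWR);

        if (fd < 0) {
                perror("open");
                return 1;
        }

        /* Write-lock bytes 0..99; F_SETLKW blocks until the lock is
         * granted (LCK_PW on the Lustre side). */
        fl.l_type = F_WRLCK;
        fl.l_whence = SEEK_SET;
        fl.l_start = 0;
        fl.l_len = 100;
        if (fcntl(fd, F_SETLKW, &fl) < 0) {
                perror("F_SETLKW");
                return 1;
        }

        /* F_GETLK: if another owner holds a conflicting lock, the kernel
         * rewrites fl with that lock's type, owner pid and range. */
        fl.l_type = F_WRLCK;
        if (fcntl(fd, F_GETLK, &fl) == 0 && fl.l_type != F_UNLCK)
                printf("conflict: type=%d pid=%d start=%ld len=%ld\n",
                       fl.l_type, (int)fl.l_pid, (long)fl.l_start,
                       (long)fl.l_len);

        /* Unlock the range; this is the case that exercises the LCK_NL
         * merge/split handling in ldlm_flock_enqueue(). */
        fl.l_type = F_UNLCK;
        fl.l_start = 0;
        fl.l_len = 100;
        if (fcntl(fd, F_SETLK, &fl) < 0)
                perror("F_UNLCK");

        close(fd);
        return 0;
}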