Posix record locking changes.
author    donmilos <donmilos>    Mon, 23 Jun 2003 01:28:07 +0000 (01:28 +0000)
committer donmilos <donmilos>    Mon, 23 Jun 2003 01:28:07 +0000 (01:28 +0000)
lustre/ldlm/ldlm_flock.c [new file with mode: 0644]
lustre/ldlm/ldlm_plain.c [new file with mode: 0644]

diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c
new file mode 100644
index 0000000..7ef1341
--- /dev/null
+++ b/lustre/ldlm/ldlm_flock.c
@@ -0,0 +1,418 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
+ *   Author: Peter Braam <braam@clusterfs.com>
+ *   Author: Phil Schwan <phil@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define DEBUG_SUBSYSTEM S_LDLM
+
+#ifdef __KERNEL__
+#include <linux/lustre_dlm.h>
+#include <linux/obd_support.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_lib.h>
+#else
+#include <liblustre.h>
+#endif
+
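+/* Two record locks have the same owner when they were taken by the same
+ * process (matching pid) over the same export (client connection). */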
+static inline int
+ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
+{
+        if ((new->l_data.l_flock.pid == lock->l_data.l_flock.pid) &&
+            (new->l_export == lock->l_export))
+                return 1;
+        else
+                return 0;
+}
+
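+/* Record lock extents are inclusive at both ends, so, for example,
+ * [0, 9] and [9, 20] overlap on byte 9. */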
+static inline int
+ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
+{
+        if ((new->l_data.l_flock.start <= lock->l_data.l_flock.end) &&
+            (new->l_data.l_flock.end >= lock->l_data.l_flock.start))
+                return 1;
+        else
+                return 0;
+}
+
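+/* Remove a lock from its resource list and destroy it. On the client
+ * side (LDLM_FL_WAIT_NOREPROC) the reference is dropped as well, with
+ * LDLM_FL_LOCAL_ONLY set first so that no CANCEL RPC goes to the server. */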
+static inline void
+ldlm_flock_destroy(struct ldlm_lock *lock, int flags)
+{
+        ENTRY;
+
+        list_del_init(&lock->l_res_link);
+        if (flags == LDLM_FL_WAIT_NOREPROC) {
+                /* client side */
+                struct lustre_handle lockh;
+
+                /* Set a flag to prevent us from sending a CANCEL */
+                lock->l_flags |= LDLM_FL_LOCAL_ONLY;
+
+                ldlm_lock2handle(lock, &lockh);
+                ldlm_lock_decref_and_cancel(&lockh, lock->l_granted_mode);
+        }
+
+        ldlm_lock_destroy(lock);
+        EXIT;
+}
+
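+/* Apply a record lock request to a resource: scan the granted queue for
+ * conflicts with locks from other owners, then merge the request with, or
+ * split, the owner's existing locks, much like the kernel's own POSIX
+ * lock processing in fs/locks.c. */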
+int
+ldlm_flock_enqueue(struct ldlm_lock **reqp, void *req_cookie, int *flags,
+                   int first_enq, ldlm_error_t *err)
+{
+        struct ldlm_lock *req = *reqp;
+        struct ldlm_lock *new = req;
+        struct ldlm_lock *new2 = NULL;
+        struct ldlm_lock *lock = NULL;
+        struct ldlm_resource *res = req->l_resource;
+        struct ldlm_namespace *ns = res->lr_namespace;
+        struct list_head *tmp;
+        struct list_head *ownlocks;
+        ldlm_mode_t mode = req->l_req_mode;
+        int added = 0;
+        int overlaps = 0;
+        ENTRY;
+
+        CDEBUG(D_FLOCK, "flags: 0x%x pid: %d mode: %d start: %llu end: %llu\n",
+               *flags, new->l_data.l_flock.pid, mode,
+               req->l_data.l_flock.start, req->l_data.l_flock.end);
+
+        *err = ELDLM_OK;
+
+        /* No blocking ASTs are sent for record locks */
+        req->l_blocking_ast = NULL;
+
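+        /* For reprocessed requests (LDLM_FL_WAIT_NOREPROC) and unlocks
+         * (mode == LCK_NL) no conflict check is needed, so just locate the
+         * owner's first granted lock. Otherwise scan the granted queue for
+         * locks from other owners that conflict with this request. */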
+        ownlocks = NULL;
+        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
+                list_for_each(tmp, &res->lr_granted) {
+                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+                        if (ldlm_same_flock_owner(lock, req)) {
+                                ownlocks = tmp;
+                                break;
+                        }
+                }
+        } else {
+                list_for_each(tmp, &res->lr_granted) {
+                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+                        if (ldlm_same_flock_owner(lock, req)) {
+                                if (!ownlocks)
+                                        ownlocks = tmp;
+                                continue;
+                        }
+
+                        /* locks are compatible, overlap doesn't matter */
+                        if (lockmode_compat(lock->l_granted_mode, mode))
+                                continue;
+
+                        if (!ldlm_flocks_overlap(lock, req))
+                                continue;
+
+                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+                                ldlm_flock_destroy(req, *flags);
+                                *err = ELDLM_LOCK_ABORTED;
+                                RETURN(LDLM_ITER_STOP);
+                        }
+
+                        if (*flags & LDLM_FL_TEST_LOCK) {
+                                req->l_granted_mode = lock->l_granted_mode;
+                                req->l_data.l_flock.pid =
+                                        lock->l_data.l_flock.pid;
+                                req->l_data.l_flock.start =
+                                        lock->l_data.l_flock.start;
+                                req->l_data.l_flock.end =
+                                        lock->l_data.l_flock.end;
+                                ldlm_flock_destroy(req, *flags);
+                                RETURN(LDLM_ITER_STOP);
+                        }
+
+                        if (first_enq) {
+                                /* XXX - add deadlock detection check here */
+                        }
+
+                        *flags |= LDLM_FL_BLOCK_GRANTED;
+                        RETURN(LDLM_ITER_CONTINUE);
+                }
+        }
+
+        if (*flags & LDLM_FL_TEST_LOCK) {
+                LASSERT(first_enq);
+                req->l_granted_mode = req->l_req_mode;
+                RETURN(LDLM_ITER_STOP);
+        }
+
+        added = (mode == LCK_NL);
+
+        /* Insert the new lock into the list */
+
+        if (!ownlocks)
+                ownlocks = &res->lr_granted;
+
+        for (tmp = ownlocks->next; ownlocks != &res->lr_granted;
+             ownlocks = tmp, tmp = ownlocks->next) {
+                lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);
+
+                if (!ldlm_same_flock_owner(lock, new))
+                        break;
+
+                if (lock->l_granted_mode == mode) {
+                        if (lock->l_data.l_flock.end <
+                            (new->l_data.l_flock.start - 1))
+                                continue;
+
+                        if (lock->l_data.l_flock.start >
+                            (new->l_data.l_flock.end + 1))
+                                break;
+
+                        if (lock->l_data.l_flock.start >
+                            new->l_data.l_flock.start)
+                                lock->l_data.l_flock.start =
+                                        new->l_data.l_flock.start;
+                        else
+                                new->l_data.l_flock.start =
+                                        lock->l_data.l_flock.start;
+
+                        if (lock->l_data.l_flock.end <
+                            new->l_data.l_flock.end)
+                                lock->l_data.l_flock.end =
+                                        new->l_data.l_flock.end;
+                        else
+                                new->l_data.l_flock.end =
+                                        lock->l_data.l_flock.end;
+
+                        if (added) {
+                                ldlm_flock_destroy(lock, *flags);
+                        } else {
+                                new = lock;
+                                added = 1;
+                        }
+                        continue;
+                }
+
+                if (lock->l_data.l_flock.end < new->l_data.l_flock.start)
+                        continue;
+                if (lock->l_data.l_flock.start > new->l_data.l_flock.end)
+                        break;
+
+                ++overlaps;
+
+                if (new->l_data.l_flock.start <=
+                    lock->l_data.l_flock.start) {
+                        if (new->l_data.l_flock.end <
+                            lock->l_data.l_flock.end) {
+                                lock->l_data.l_flock.start =
+                                        new->l_data.l_flock.end + 1;
+                                break;
+                        } else if (added) {
+                                ldlm_flock_destroy(lock, *flags);
+                        } else {
+                                lock->l_data.l_flock.start =
+                                        new->l_data.l_flock.start;
+                                lock->l_data.l_flock.end =
+                                        new->l_data.l_flock.end;
+                                new = lock;
+                                added = 1;
+                        }
+                        continue;
+                }
+                if (new->l_data.l_flock.end >= lock->l_data.l_flock.end) {
+                        lock->l_data.l_flock.end =
+                                new->l_data.l_flock.start - 1;
+                        continue;
+                }
+
+                /* split the existing lock into two locks */
+
+                /* if this is an F_UNLCK operation then we could avoid
+                 * allocating a new lock and use the req lock passed in
+                 * with the request but this would complicate the reply
+                 * processing since updates to req get reflected in the
+                 * reply. The client side must see the original lock data
+                 * so that it can process the unlock properly. */
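+
+                /* For example: splitting an existing lock on [0, 99] around
+                 * a new request for [40, 59] leaves three pieces:
+                 *     new2: [ 0, 39]  front piece, same mode as the original
+                 *     new:  [40, 59]  the incoming request
+                 *     lock: [60, 99]  the original lock, trimmed */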
+
+                /* XXX - if ldlm_lock_create() can sleep we have to
+                 * release the ns_lock, allocate the new lock, and
+                 * restart processing this lock. */
+                new2 = ldlm_lock_create(ns, NULL, res->lr_name, LDLM_FLOCK,
+                                        lock->l_granted_mode, NULL, NULL);
+                if (!new2) {
+                        /* LBUG for now */
+                        LASSERT(0);
+                        RETURN(ENOMEM);
+                }
+
+                new2->l_granted_mode = lock->l_granted_mode;
+                new2->l_data.l_flock.pid = new->l_data.l_flock.pid;
+                new2->l_data.l_flock.start = lock->l_data.l_flock.start;
+                new2->l_data.l_flock.end = new->l_data.l_flock.start - 1;
+                lock->l_data.l_flock.start = new->l_data.l_flock.end + 1;
+                new2->l_connh = lock->l_connh;
+                if ((new2->l_export = lock->l_export) != NULL) {
+                        list_add(&new2->l_export_chain,
+                                 &new2->l_export->
+                                 exp_ldlm_data.led_held_locks);
+                }
+                if (*flags == LDLM_FL_WAIT_NOREPROC) {
+                        /* client side */
+                        ldlm_lock_addref_internal(new2, lock->l_granted_mode);
+                }
+
+                /* insert new2 at lock */
+                list_add_tail(&new2->l_res_link, ownlocks);
+                LDLM_LOCK_PUT(new2);
+                break;
+        }
+
+        if (added) {
+                ldlm_flock_destroy(req, *flags);
+        } else {
+                /* insert new at ownlocks */
+                new->l_granted_mode = new->l_req_mode;
+                list_del_init(&new->l_res_link);
+                list_add_tail(&new->l_res_link, ownlocks);
+        }
+
+        if (*flags != LDLM_FL_WAIT_NOREPROC) {
+                if (req->l_completion_ast)
+                        ldlm_add_ast_work_item(req, NULL, NULL, 0);
+
+                /* The only problem with doing the reprocessing here is that
+                 * the completion ASTs for newly granted locks will be sent
+                 * before the unlock completion is sent. It shouldn't be an
+                 * issue. Also note that ldlm_flock_enqueue() will recurse,
+                 * but only once because there can't be unlock requests on
+                 * the wait queue. */
+                if ((mode == LCK_NL) && overlaps)
+                        ldlm_reprocess_queue(res, &res->lr_waiting);
+        }
+
+        ldlm_resource_dump(res);
+
+        RETURN(LDLM_ITER_CONTINUE);
+}
+
+static void interrupted_flock_completion_wait(void *data)
+{
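+        /* deliberately empty: there is nothing to clean up on interrupt;
+         * l_wait_event() returns the error and the caller handles it */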
+}
+
+struct flock_wait_data {
+        struct ldlm_lock *fwd_lock;
+        int               fwd_generation;
+};
+
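+/* Client-side completion handler: sleep until the lock is granted (or the
+ * wait fails), then either report the result of an F_GETLK test or call
+ * ldlm_flock_enqueue() again locally to apply any merges and splits. */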
+int
+ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
+{
+        struct ldlm_namespace *ns;
+        struct file_lock *getlk = data;
+        struct flock_wait_data fwd;
+        unsigned long irqflags;
+        struct obd_device *obd;
+        struct obd_import *imp = NULL;
+        ldlm_error_t err;
+        int rc = 0;
+        struct l_wait_info lwi;
+        ENTRY;
+
+        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
+
+        if (flags == 0) {
+                wake_up(&lock->l_waitq);
+                RETURN(0);
+        }
+
+        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
+                       LDLM_FL_BLOCK_CONV)))
+                goto granted;
+
+        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
+                   "sleeping");
+
+        ldlm_lock_dump(D_OTHER, lock);
+
+        fwd.fwd_lock = lock;
+        obd = class_conn2obd(lock->l_connh);
+
+        /* if this is a local lock, then there is no import */
+        if (obd != NULL)
+                imp = obd->u.cli.cl_import;
+
+        if (imp != NULL) {
+                spin_lock_irqsave(&imp->imp_lock, irqflags);
+                fwd.fwd_generation = imp->imp_generation;
+                spin_unlock_irqrestore(&imp->imp_lock, irqflags);
+        }
+
+        lwi = LWI_TIMEOUT_INTR(0, NULL, interrupted_flock_completion_wait,
+                               &fwd);
+
+        /* Go to sleep until the lock is granted. */
+        rc = l_wait_event(lock->l_waitq,
+                          ((lock->l_req_mode == lock->l_granted_mode) ||
+                           lock->l_destroyed), &lwi);
+
+        LASSERT(!(lock->l_destroyed));
+
+        if (rc) {
+                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
+                           rc);
+                RETURN(rc);
+        }
+
+granted:
+
+        LDLM_DEBUG(lock, "client-side enqueue waking up");
+        ns = lock->l_resource->lr_namespace;
+        l_lock(&ns->ns_lock);
+
+        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
+        list_del_init(&lock->l_res_link);
+
+        if (getlk) {
+                /* fcntl(F_GETLK) request */
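+                /* Translate the granted LDLM mode back to a POSIX lock
+                 * type: LCK_PR is a read lock, LCK_PW a write lock, and
+                 * anything else reports the range as unlocked. */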
+                if (lock->l_granted_mode == LCK_PR)
+                        getlk->fl_type = F_RDLCK;
+                else if (lock->l_granted_mode == LCK_PW)
+                        getlk->fl_type = F_WRLCK;
+                else
+                        getlk->fl_type = F_UNLCK;
+                getlk->fl_pid = lock->l_data.l_flock.pid;
+                getlk->fl_start = lock->l_data.l_flock.start;
+                getlk->fl_end = lock->l_data.l_flock.end;
+                /* ldlm_flock_destroy(lock); */
+        } else {
+                flags = LDLM_FL_WAIT_NOREPROC;
+                /* We need to reprocess the lock to do merges or splits */
+                ldlm_flock_enqueue(&lock, NULL, &flags, 1, &err);
+        }
+        l_unlock(&ns->ns_lock);
+        RETURN(0);
+}
+
+/* This function is only called on the client when a lock is aborted. */
+int
+ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *ld,
+                        void *data, int flag)
+{
+        ENTRY;
+        ldlm_lock_destroy(lock);
+        RETURN(0);
+}
diff --git a/lustre/ldlm/ldlm_plain.c b/lustre/ldlm/ldlm_plain.c
new file mode 100644
index 0000000..fc6f029
--- /dev/null
+++ b/lustre/ldlm/ldlm_plain.c
@@ -0,0 +1,142 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
+ *   Author: Peter Braam <braam@clusterfs.com>
+ *   Author: Phil Schwan <phil@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define DEBUG_SUBSYSTEM S_LDLM
+
+#ifdef __KERNEL__
+#include <linux/lustre_dlm.h>
+#include <linux/obd_support.h>
+#include <linux/lustre_lib.h>
+#else
+#include <liblustre.h>
+#endif
+
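+/* Check @new against every lock on @queue; returns 1 if all granted and
+ * requested modes are compatible, 0 otherwise. On a first enqueue each
+ * conflicting lock either gets a blocking AST work item queued (when
+ * @send_cbs is set and it has a blocking handler) or, if it is not a
+ * local lock, is tagged with LDLM_FL_AST_SENT. */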
+static int
+ldlm_plain_compat_queue(struct list_head *queue, struct ldlm_lock *new,
+                        int send_cbs, int first_enq)
+{
+        struct list_head *tmp, *pos;
+        ldlm_mode_t mode = new->l_req_mode;
+        int compat = 1;
+        ENTRY;
+
+        list_for_each_safe(tmp, pos, queue) {
+                struct ldlm_lock *old;
+
+                old = list_entry(tmp, struct ldlm_lock, l_res_link);
+                if (old == new)
+                        continue;
+
+                if (lockmode_compat(old->l_req_mode, mode) &&
+                    lockmode_compat(old->l_granted_mode, mode)) {
+                        CDEBUG(D_OTHER, "lock modes are compatible, next.\n");
+                        continue;
+                }
+
+                compat = 0;
+
+                /* if we're reprocessing the lock then the blocking ASTs
+                 * have already been sent. No need to continue. */
+                if (!first_enq)
+                        break;
+
+                if (send_cbs && (old->l_blocking_ast != NULL)) {
+                        CDEBUG(D_DLMTRACE, "lock %p incompatible; "
+                               "sending blocking AST.\n", old);
+                        ldlm_add_ast_work_item(old, new, NULL, 0);
+                } else if (!(old->l_flags & LDLM_FL_LOCAL)) {
+                        CDEBUG(D_DLMTRACE, "lock %p incompatible; "
+                               "setting blocking AST.\n", old);
+                        old->l_flags |= LDLM_FL_AST_SENT;
+                } else {
+                        CDEBUG(D_DLMTRACE, "local lock %p incompatible.\n",
+                               old);
+                }
+        }
+
+        RETURN(compat);
+}
+
+int
+ldlm_plain_enqueue(struct ldlm_lock **lockp, void *cookie, int *flags,
+                   int first_enq, ldlm_error_t *err)
+{
+        struct ldlm_lock *lock = *lockp;
+        struct ldlm_resource *res = lock->l_resource;
+        int convert_compat = 1;
+        int waiting_compat = 1;
+        int granted_compat = 1;
+        ENTRY;
+
+        /* FIXME: We may want to optimize by checking lr_most_restr */
+
+        /* On the first enqueue of this lock scan all of the queues
+         * to set the LDLM_FL_AST_SENT flag in conflicting locks.
+         * When the completion AST on the client side runs and sees
+         * this flag it will set the LDLM_FL_CB_PENDING flag in the
+         * lock so the client will know to cancel the lock as soon
+         * as possible. This saves us from sending a blocking AST
+         * in addition to the completion AST.
+         *
+         * If it's NOT the first enqueue of this lock then it must be
+         * the first eligible lock in the queues because of the way that
+         * ldlm_reprocess_all() works. So we don't have to check the
+         * converting or waiting queues. */
+        if (first_enq) {
+                if (!list_empty(&res->lr_converting)) {
+                        convert_compat = 0;
+                        ldlm_plain_compat_queue(&res->lr_converting,
+                                                lock, 0, first_enq);
+                }
+                if (!list_empty(&res->lr_waiting)) {
+                        waiting_compat = 0;
+                        ldlm_plain_compat_queue(&res->lr_waiting,
+                                                lock, 0, first_enq);
+                }
+        }
+        granted_compat =
+                ldlm_plain_compat_queue(&res->lr_granted, lock, 1, first_enq);
+
+        if (!convert_compat) {
+                *flags |= LDLM_FL_BLOCK_CONV;
+                RETURN(LDLM_ITER_STOP);
+        }
+        if (!waiting_compat) {
+                *flags |= LDLM_FL_BLOCK_WAIT;
+                RETURN(LDLM_ITER_STOP);
+        }
+        if (!granted_compat) {
+                *flags |= LDLM_FL_BLOCK_GRANTED;
+                RETURN(LDLM_ITER_STOP);
+        }
+
+        list_del_init(&lock->l_res_link);
+        ldlm_grant_lock(lock, NULL, 0);
+
+        if (lock->l_flags & LDLM_FL_AST_SENT) {
+                CDEBUG(D_DLMTRACE, "granted lock %p with AST set\n", lock);
+                *flags |= (lock->l_flags & LDLM_FL_AST_SENT);
+        }
+
+        RETURN(LDLM_ITER_CONTINUE);
+}