--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (c) 2002, 2003 Cluster File Systems, Inc.
+ * Author: Peter Braam <braam@clusterfs.com>
+ * Author: Phil Schwan <phil@clusterfs.com>
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define DEBUG_SUBSYSTEM S_LDLM
+
+#ifdef __KERNEL__
+#include <linux/lustre_dlm.h>
+#include <linux/obd_support.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_lib.h>
+#else
+#include <liblustre.h>
+#endif
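+
+/* POSIX record-lock (fcntl byte-range lock) support implemented on top of
+ * the Lustre DLM. Flock locks live on LDLM_FLOCK resources; enqueueing a
+ * request merges, trims, or splits the owner's existing extents rather
+ * than granting a separate lock per request. */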
+
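+/* A flock lock is owned by a (pid, export) pair: the process id carried in
+ * the lock's flock data plus the client export the request arrived on. */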
+static inline int
+ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
+{
+ return ((new->l_data.l_flock.pid == lock->l_data.l_flock.pid) &&
+ (new->l_export == lock->l_export));
+}
+
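+/* Two extents overlap when neither lies entirely to one side of the other:
+ * e.g. [0, 9] and [5, 15] overlap (5 <= 9 and 15 >= 0), but [0, 9] and
+ * [10, 15] do not (10 > 9). Merely adjacent extents do not overlap here;
+ * adjacency only matters when same-mode locks are coalesced in
+ * ldlm_flock_enqueue() below. */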
+static inline int
+ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
+{
+ return ((new->l_data.l_flock.start <= lock->l_data.l_flock.end) &&
+ (new->l_data.l_flock.end >= lock->l_data.l_flock.start));
+}
+
+static inline void
+ldlm_flock_destroy(struct ldlm_lock *lock, int flags)
+{
+ ENTRY;
+
+ list_del_init(&lock->l_res_link);
+ if (flags == LDLM_FL_WAIT_NOREPROC) {
+ /* client side */
+ struct lustre_handle lockh;
+
+ /* Set a flag to prevent us from sending a CANCEL */
+ lock->l_flags |= LDLM_FL_LOCAL_ONLY;
+
+ ldlm_lock2handle(lock, &lockh);
+ ldlm_lock_decref_and_cancel(&lockh, lock->l_granted_mode);
+ }
+
+ ldlm_lock_destroy(lock);
+ EXIT;
+}
+
+int
+ldlm_flock_enqueue(struct ldlm_lock **reqp, void *req_cookie, int *flags,
+ int first_enq, ldlm_error_t *err)
+{
+ struct ldlm_lock *req = *reqp;
+ struct ldlm_lock *new = req;
+ struct ldlm_lock *new2 = NULL;
+ struct ldlm_lock *lock = NULL;
+ struct ldlm_resource *res = req->l_resource;
+ struct ldlm_namespace *ns = res->lr_namespace;
+ struct list_head *tmp;
+ struct list_head *ownlocks;
+ ldlm_mode_t mode = req->l_req_mode;
+ int added = 0;
+ int overlaps = 0;
+ ENTRY;
+
+ CDEBUG(D_FLOCK, "flags: 0x%x pid: %d mode: %d start: %llu end: %llu\n",
+ *flags, new->l_data.l_flock.pid, mode,
+ req->l_data.l_flock.start, req->l_data.l_flock.end);
+
+ *err = ELDLM_OK;
+
+ /* No blocking ASTs are sent for record locks */
+ req->l_blocking_ast = NULL;
+
+ ownlocks = NULL;
+ if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
+ list_for_each(tmp, &res->lr_granted) {
+ lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+ if (ldlm_same_flock_owner(lock, req)) {
+ ownlocks = tmp;
+ break;
+ }
+ }
+ } else {
+ list_for_each(tmp, &res->lr_granted) {
+ lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+ if (ldlm_same_flock_owner(lock, req)) {
+ if (!ownlocks)
+ ownlocks = tmp;
+ continue;
+ }
+
+ /* locks are compatible, overlap doesn't matter */
+ if (lockmode_compat(lock->l_granted_mode, mode))
+ continue;
+
+ if (!ldlm_flocks_overlap(lock, req))
+ continue;
+
+ if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+ ldlm_flock_destroy(req, *flags);
+ *err = ELDLM_LOCK_ABORTED;
+ RETURN(LDLM_ITER_STOP);
+ }
+
+ if (*flags & LDLM_FL_TEST_LOCK) {
+ req->l_granted_mode = lock->l_granted_mode;
+ req->l_data.l_flock.pid =
+ lock->l_data.l_flock.pid;
+ req->l_data.l_flock.start =
+ lock->l_data.l_flock.start;
+ req->l_data.l_flock.end =
+ lock->l_data.l_flock.end;
+ ldlm_flock_destroy(req, *flags);
+ RETURN(LDLM_ITER_STOP);
+ }
+
+ if (first_enq) {
+ /* XXX - add deadlock detection check here */
+ }
+
+ *flags |= LDLM_FL_BLOCK_GRANTED;
+ RETURN(LDLM_ITER_CONTINUE);
+ }
+ }
+
+ if (*flags & LDLM_FL_TEST_LOCK) {
+ LASSERT(first_enq);
+ req->l_granted_mode = req->l_req_mode;
+ RETURN(LDLM_ITER_STOP);
+ }
+
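+ /* An unlock (F_UNLCK) request arrives as an LCK_NL mode lock; treat it
+ * as already "added" so it is never inserted into the granted list and
+ * is destroyed once the overlapping extents have been processed. */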
+ added = (mode == LCK_NL);
+
+ /* Scan the owner's locks, which are kept contiguous in the granted
+ * list and sorted by start offset, merging and trimming them against
+ * the new lock and locating its insertion point. */
+
+ if (!ownlocks)
+ ownlocks = &res->lr_granted;
+
+ for (tmp = ownlocks->next; ownlocks != &res->lr_granted;
+ ownlocks = tmp, tmp = ownlocks->next) {
+ lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);
+
+ if (!ldlm_same_flock_owner(lock, new))
+ break;
+
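+ /* Same-mode extents from this owner are coalesced when they
+ * overlap or abut: e.g. a granted [0, 9] and a request for
+ * [10, 19] collapse into a single [0, 19] lock, since
+ * 9 >= 10 - 1 passes the adjacency tests below. */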
+ if (lock->l_granted_mode == mode) {
+ if (lock->l_data.l_flock.end <
+ (new->l_data.l_flock.start - 1))
+ continue;
+
+ if (lock->l_data.l_flock.start >
+ (new->l_data.l_flock.end + 1))
+ break;
+
+ if (lock->l_data.l_flock.start >
+ new->l_data.l_flock.start)
+ lock->l_data.l_flock.start =
+ new->l_data.l_flock.start;
+ else
+ new->l_data.l_flock.start =
+ lock->l_data.l_flock.start;
+
+ if (lock->l_data.l_flock.end <
+ new->l_data.l_flock.end)
+ lock->l_data.l_flock.end =
+ new->l_data.l_flock.end;
+ else
+ new->l_data.l_flock.end =
+ lock->l_data.l_flock.end;
+
+ if (added) {
+ ldlm_flock_destroy(lock, *flags);
+ } else {
+ new = lock;
+ added = 1;
+ }
+ continue;
+ }
+
+ if (lock->l_data.l_flock.end < new->l_data.l_flock.start)
+ continue;
+ if (lock->l_data.l_flock.start > new->l_data.l_flock.end)
+ break;
+
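+ /* The new lock overlaps an extent of a different mode held by the
+ * same owner. Following POSIX semantics, the new lock replaces the
+ * old one over the overlapped range, so the existing lock must be
+ * trimmed, replaced, or split below. */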
+ ++overlaps;
+
+ if (new->l_data.l_flock.start <=
+ lock->l_data.l_flock.start) {
+ if (new->l_data.l_flock.end <
+ lock->l_data.l_flock.end) {
+ lock->l_data.l_flock.start =
+ new->l_data.l_flock.end + 1;
+ break;
+ } else if (added) {
+ ldlm_flock_destroy(lock, *flags);
+ } else {
+ lock->l_data.l_flock.start =
+ new->l_data.l_flock.start;
+ lock->l_data.l_flock.end =
+ new->l_data.l_flock.end;
+ new = lock;
+ added = 1;
+ }
+ continue;
+ }
+ if (new->l_data.l_flock.end >= lock->l_data.l_flock.end) {
+ lock->l_data.l_flock.end =
+ new->l_data.l_flock.start - 1;
+ continue;
+ }
+
+ /* split the existing lock into two locks */
+
+ /* If this is an F_UNLCK operation then we could avoid
+ * allocating a new lock and use the req lock passed in
+ * with the request, but this would complicate the reply
+ * processing since updates to req get reflected in the
+ * reply. The client side must see the original lock data
+ * so that it can process the unlock properly. */
+
+ /* XXX - if ldlm_lock_new() can sleep we have to
+ * release the ns_lock, allocate the new lock, and
+ * restart processing this lock. */
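+ /* Example: unlocking [40, 49] out of a granted [0, 99] leaves two
+ * pieces; new2 takes the front [0, 39] and the existing lock is
+ * trimmed to the tail [50, 99]. */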
+ new2 = ldlm_lock_create(ns, NULL, res->lr_name, LDLM_FLOCK,
+ lock->l_granted_mode, NULL, NULL);
+ if (!new2) {
+ /* LBUG for now */
+ LASSERT(0);
+ RETURN(-ENOMEM);
+ }
+
+ new2->l_granted_mode = lock->l_granted_mode;
+ new2->l_data.l_flock.pid = new->l_data.l_flock.pid;
+ new2->l_data.l_flock.start = lock->l_data.l_flock.start;
+ new2->l_data.l_flock.end = new->l_data.l_flock.start - 1;
+ lock->l_data.l_flock.start = new->l_data.l_flock.end + 1;
+ new2->l_connh = lock->l_connh;
+ if ((new2->l_export = lock->l_export) != NULL) {
+ list_add(&new2->l_export_chain,
+ &new2->l_export->
+ exp_ldlm_data.led_held_locks);
+ }
+ if (*flags == LDLM_FL_WAIT_NOREPROC) {
+ /* client side */
+ ldlm_lock_addref_internal(new2, lock->l_granted_mode);
+ }
+
+ /* insert new2 just before the existing lock */
+ list_add_tail(&new2->l_res_link, ownlocks);
+ LDLM_LOCK_PUT(new2);
+ break;
+ }
+
+ if (added) {
+ ldlm_flock_destroy(req, *flags);
+ } else {
+ /* insert new at ownlocks */
+ new->l_granted_mode = new->l_req_mode;
+ list_del_init(&new->l_res_link);
+ list_add_tail(&new->l_res_link, ownlocks);
+ }
+
+ if (*flags != LDLM_FL_WAIT_NOREPROC) {
+ if (req->l_completion_ast)
+ ldlm_add_ast_work_item(req, NULL, NULL, 0);
+
+ /* The only problem with doing the reprocessing here is that
+ * the completion ASTs for newly granted locks will be sent
+ * before the unlock completion is sent. It shouldn't be an
+ * issue. Also note that ldlm_flock_enqueue() will recurse,
+ * but only once because there can't be unlock requests on
+ * the wait queue. */
+ if ((mode == LCK_NL) && overlaps)
+ ldlm_reprocess_queue(res, &res->lr_waiting);
+ }
+
+ ldlm_resource_dump(res);
+
+ RETURN(LDLM_ITER_CONTINUE);
+}
+
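+/* Called by l_wait_event() when the process sleeping in
+ * ldlm_flock_completion_ast() is interrupted; no cleanup is needed here,
+ * the wait itself returns the error to the caller. */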
+static void interrupted_flock_completion_wait(void *data)
+{
+}
+
+struct flock_wait_data {
+ struct ldlm_lock *fwd_lock;
+ int fwd_generation;
+};
+
+int
+ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
+{
+ struct ldlm_namespace *ns;
+ struct file_lock *getlk = data;
+ struct flock_wait_data fwd;
+ unsigned long irqflags;
+ struct obd_device *obd;
+ struct obd_import *imp = NULL;
+ ldlm_error_t err;
+ int rc = 0;
+ struct l_wait_info lwi;
+ ENTRY;
+
+ LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
+
+ if (flags == 0) {
+ wake_up(&lock->l_waitq);
+ RETURN(0);
+ }
+
+ if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
+ LDLM_FL_BLOCK_CONV)))
+ goto granted;
+
+ LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
+ "sleeping");
+
+ ldlm_lock_dump(D_OTHER, lock);
+
+ fwd.fwd_lock = lock;
+ obd = class_conn2obd(lock->l_connh);
+
+ /* if this is a local lock, then there is no import */
+ if (obd != NULL)
+ imp = obd->u.cli.cl_import;
+
+ if (imp != NULL) {
+ spin_lock_irqsave(&imp->imp_lock, irqflags);
+ fwd.fwd_generation = imp->imp_generation;
+ spin_unlock_irqrestore(&imp->imp_lock, irqflags);
+ }
+
+ lwi = LWI_TIMEOUT_INTR(0, NULL, interrupted_flock_completion_wait,
+ &fwd);
+
+ /* Go to sleep until the lock is granted. */
+ rc = l_wait_event(lock->l_waitq,
+ ((lock->l_req_mode == lock->l_granted_mode) ||
+ lock->l_destroyed), &lwi);
+
+ LASSERT(!(lock->l_destroyed));
+
+ if (rc) {
+ LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
+ rc);
+ RETURN(rc);
+ }
+
+granted:
+
+ LDLM_DEBUG(lock, "client-side enqueue waking up");
+ ns = lock->l_resource->lr_namespace;
+ l_lock(&ns->ns_lock);
+
+ /* ldlm_lock_enqueue() has already placed the lock on the granted list. */
+ list_del_init(&lock->l_res_link);
+
+ if (getlk) {
+ /* fcntl(F_GETLK) request */
+ if (lock->l_granted_mode == LCK_PR)
+ getlk->fl_type = F_RDLCK;
+ else if (lock->l_granted_mode == LCK_PW)
+ getlk->fl_type = F_WRLCK;
+ else
+ getlk->fl_type = F_UNLCK;
+ getlk->fl_pid = lock->l_data.l_flock.pid;
+ getlk->fl_start = lock->l_data.l_flock.start;
+ getlk->fl_end = lock->l_data.l_flock.end;
+ /* ldlm_flock_destroy(lock); */
+ } else {
+ flags = LDLM_FL_WAIT_NOREPROC;
+ /* We need to reprocess the lock to do merges or splits */
+ ldlm_flock_enqueue(&lock, NULL, &flags, 1, &err);
+ }
+ l_unlock(&ns->ns_lock);
+ RETURN(0);
+}
+
+/* This function is only called on the client when a lock is aborted. */
+int
+ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *ld,
+ void *data, int flag)
+{
+ ENTRY;
+ ldlm_lock_destroy(lock);
+ RETURN(0);
+}
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (c) 2002, 2003 Cluster File Systems, Inc.
+ * Author: Peter Braam <braam@clusterfs.com>
+ * Author: Phil Schwan <phil@clusterfs.com>
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define DEBUG_SUBSYSTEM S_LDLM
+
+#ifdef __KERNEL__
+#include <linux/lustre_dlm.h>
+#include <linux/obd_support.h>
+#include <linux/lustre_lib.h>
+#else
+#include <liblustre.h>
+#endif
+
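+/* Check whether a new plain lock is compatible with every lock on the given
+ * queue, and on first enqueue notify (or mark) the conflicting holders. */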
+static int
+ldlm_plain_compat_queue(struct list_head *queue, struct ldlm_lock *new,
+ int send_cbs, int first_enq)
+{
+ struct list_head *tmp, *pos;
+ ldlm_mode_t mode = new->l_req_mode;
+ int compat = 1;
+ ENTRY;
+
+ list_for_each_safe(tmp, pos, queue) {
+ struct ldlm_lock *old;
+
+ old = list_entry(tmp, struct ldlm_lock, l_res_link);
+ if (old == new)
+ continue;
+
+ if (lockmode_compat(old->l_req_mode, mode) &&
+ lockmode_compat(old->l_granted_mode, mode)) {
+ CDEBUG(D_OTHER, "lock modes are compatible, next.\n");
+ continue;
+ }
+
+ compat = 0;
+
+ /* if we're reprocessing the lock then the blocking ASTs
+ * have already been sent. No need to continue. */
+ if (!first_enq)
+ break;
+
+ if (send_cbs && (old->l_blocking_ast != NULL)) {
+ CDEBUG(D_DLMTRACE, "lock %p incompatible; "
+ "sending blocking AST.\n", old);
+ ldlm_add_ast_work_item(old, new, NULL, 0);
+ } else if (!(old->l_flags & LDLM_FL_LOCAL)) {
+ CDEBUG(D_DLMTRACE, "lock %p incompatible; "
+ "setting blocking AST.\n", old);
+ old->l_flags |= LDLM_FL_AST_SENT;
+ } else {
+ CDEBUG(D_DLMTRACE, "local lock %p incompatible.\n",
+ old);
+ }
+ }
+
+ RETURN(compat);
+}
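+
+/* Example for ldlm_plain_compat_queue(): under the DLM compatibility matrix
+ * two PR (read) locks may be granted concurrently, while PR and PW (write)
+ * conflict, so a new PW request sends a blocking AST to each granted PR
+ * holder scanned above. */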
+
+int
+ldlm_plain_enqueue(struct ldlm_lock **lockp, void *cookie, int *flags,
+ int first_enq, ldlm_error_t *err)
+{
+ struct ldlm_lock *lock = *lockp;
+ struct ldlm_resource *res = lock->l_resource;
+ int convert_compat = 1;
+ int waiting_compat = 1;
+ int granted_compat = 1;
+ ENTRY;
+
+ /* FIXME: We may want to optimize by checking lr_most_restr */
+
+ /* On the first enqueue of this lock, scan all of the queues
+ * to set the LDLM_FL_AST_SENT flag in conflicting locks.
+ * When the completion AST on the client side runs and sees
+ * this flag, it will set the LDLM_FL_CB_PENDING flag in the
+ * lock so the client will know to cancel the lock as soon
+ * as possible. This saves us from sending a blocking AST
+ * in addition to the completion AST.
+ *
+ * If it's NOT the first enqueue of this lock then it must be
+ * the first eligible lock in the queues because of the way that
+ * ldlm_reprocess_all() works. So we don't have to check the
+ * converting or waiting queues. */
+ if (first_enq) {
+ if (!list_empty(&res->lr_converting)) {
+ convert_compat = 0;
+ ldlm_plain_compat_queue(&res->lr_converting,
+ lock, 0, first_enq);
+ }
+ if (!list_empty(&res->lr_waiting)) {
+ waiting_compat = 0;
+ ldlm_plain_compat_queue(&res->lr_waiting,
+ lock, 0, first_enq);
+ }
+ }
+ granted_compat =
+ ldlm_plain_compat_queue(&res->lr_granted, lock, 1, first_enq);
+
+ if (!convert_compat) {
+ *flags |= LDLM_FL_BLOCK_CONV;
+ RETURN(LDLM_ITER_STOP);
+ }
+ if (!waiting_compat) {
+ *flags |= LDLM_FL_BLOCK_WAIT;
+ RETURN(LDLM_ITER_STOP);
+ }
+ if (!granted_compat) {
+ *flags |= LDLM_FL_BLOCK_GRANTED;
+ RETURN(LDLM_ITER_STOP);
+ }
+
+ list_del_init(&lock->l_res_link);
+ ldlm_grant_lock(lock, NULL, 0);
+
+ if (lock->l_flags & LDLM_FL_AST_SENT) {
+ CDEBUG(D_DLMTRACE, "granted lock %p with AST set\n", lock);
+ *flags |= (lock->l_flags & LDLM_FL_AST_SENT);
+ }
+
+ RETURN(LDLM_ITER_CONTINUE);
+}