Whamcloud - gitweb
b=13872
[fs/lustre-release.git] / lustre / ldlm / ldlm_flock.c
index 3a8bd24..6cb71e7 100644 (file)
@@ -1,43 +1,47 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
- *   Author: Peter Braam <braam@clusterfs.com>
- *   Author: Phil Schwan <phil@clusterfs.com>
+ *  Copyright (c) 2003 Hewlett-Packard Development Company LP.
+ *   Developed under the sponsorship of the US Government under
+ *   Subcontract No. B514193
  *
- *   This file is part of Lustre, http://www.lustre.org.
+ *   This file is part of the Lustre file system, http://www.lustre.org
+ *   Lustre is a trademark of Cluster File Systems, Inc.
  *
- *   Lustre is free software; you can redistribute it and/or
- *   modify it under the terms of version 2 of the GNU General Public
- *   License as published by the Free Software Foundation.
+ *   You may have signed or agreed to another license before downloading
+ *   this software.  If so, you are bound by the terms and conditions
+ *   of that agreement, and the following does not apply to you.  See the
+ *   LICENSE file included with this distribution for more information.
  *
- *   Lustre is distributed in the hope that it will be useful,
- *   but WITHOUT ANY WARRANTY; without even the implied warranty of
- *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   GNU General Public License for more details.
+ *   If you did not agree to a different license, then this copy of Lustre
+ *   is open source software; you can redistribute it and/or modify it
+ *   under the terms of version 2 of the GNU General Public License as
+ *   published by the Free Software Foundation.
  *
- *   You should have received a copy of the GNU General Public License
- *   along with Lustre; if not, write to the Free Software
- *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *   In either case, Lustre is distributed in the hope that it will be
+ *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   license text for more details.
  */
 
 #define DEBUG_SUBSYSTEM S_LDLM
 
 #ifdef __KERNEL__
-#include <linux/lustre_dlm.h>
-#include <linux/obd_support.h>
-#include <linux/obd_class.h>
-#include <linux/lustre_lib.h>
-#include <portals/list.h>
+#include <lustre_dlm.h>
+#include <obd_support.h>
+#include <obd_class.h>
+#include <lustre_lib.h>
+#include <libcfs/list.h>
 #else
 #include <liblustre.h>
+#include <obd_class.h>
 #endif
 
 #include "ldlm_internal.h"
 
 #define l_flock_waitq   l_lru
 
-static struct list_head ldlm_flock_waitq = LIST_HEAD_INIT(ldlm_flock_waitq);
+static struct list_head ldlm_flock_waitq = CFS_LIST_HEAD_INIT(ldlm_flock_waitq);
 
 int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                             void *data, int flag);
@@ -87,7 +91,7 @@ ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags)
                 ldlm_lock_decref_internal(lock, mode);
         }
 
-        ldlm_lock_destroy(lock);
+        ldlm_lock_destroy_nolock(lock);
         EXIT;
 }
 
@@ -120,7 +124,7 @@ restart:
 
 int
 ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
-                        ldlm_error_t *err)
+                        ldlm_error_t *err, struct list_head *work_list)
 {
         struct ldlm_resource *res = req->l_resource;
         struct ldlm_namespace *ns = res->lr_namespace;
@@ -130,13 +134,13 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
         struct ldlm_lock *new = req;
         struct ldlm_lock *new2 = NULL;
         ldlm_mode_t mode = req->l_req_mode;
-        int local = ns->ns_client;
+        int local = ns_is_client(ns);
         int added = (mode == LCK_NL);
         int overlaps = 0;
         ENTRY;
 
-        CDEBUG(D_DLMTRACE, "flags %#x pid "LPU64" mode %u start "LPU64" end "
-               LPU64"\n", *flags, new->l_policy_data.l_flock.pid, mode,
+        CDEBUG(D_DLMTRACE, "flags %#x pid %u mode %u start "LPU64" end "LPU64
+               "\n", *flags, new->l_policy_data.l_flock.pid, mode,
                req->l_policy_data.l_flock.start,
                req->l_policy_data.l_flock.end);
 
@@ -254,7 +258,8 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                          * overflow and underflow. */
                         if ((new->l_policy_data.l_flock.start >
                              (lock->l_policy_data.l_flock.end + 1))
-                            && (lock->l_policy_data.l_flock.end != ~0))
+                            && (lock->l_policy_data.l_flock.end !=
+                                OBD_OBJECT_EOF))
                                 continue;
 
                         if ((new->l_policy_data.l_flock.end <
@@ -329,7 +334,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                 /* XXX - if ldlm_lock_new() can sleep we should
                  * release the ns_lock, allocate the new lock,
                  * and restart processing this lock. */
-                new2 = ldlm_lock_create(ns, NULL, res->lr_name, LDLM_FLOCK,
+                new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
                                         lock->l_granted_mode, NULL, NULL, NULL,
                                         NULL, 0);
                 if (!new2) {
@@ -350,8 +355,10 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                 new2->l_conn_export = lock->l_conn_export;
                 if (lock->l_export != NULL) {
                         new2->l_export = class_export_get(lock->l_export);
+                        spin_lock(&new2->l_export->exp_ldlm_data.led_lock);
                         list_add(&new2->l_export_chain,
                                  &new2->l_export->exp_ldlm_data.led_held_locks);
+                        spin_unlock(&new2->l_export->exp_ldlm_data.led_lock);
                 }
                 if (*flags == LDLM_FL_WAIT_NOREPROC)
                         ldlm_lock_addref_internal(new2, lock->l_granted_mode);
@@ -375,7 +382,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
         if (*flags != LDLM_FL_WAIT_NOREPROC) {
                 if (first_enq) {
                         /* If this is an unlock, reprocess the waitq and
-                         * send completions ASTs for locks that can now be 
+                         * send completions ASTs for locks that can now be
                          * granted. The only problem with doing this
                          * reprocessing here is that the completion ASTs for
                          * newly granted locks will be sent before the unlock
@@ -385,23 +392,21 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                          * ldlm_reprocess_queue. */
                         if ((mode == LCK_NL) && overlaps) {
                                 struct list_head rpc_list
-                                                    = LIST_HEAD_INIT(rpc_list);
+                                                    = CFS_LIST_HEAD_INIT(rpc_list);
                                 int rc;
 restart:
-                                res->lr_tmp = &rpc_list;
-                                ldlm_reprocess_queue(res, &res->lr_waiting);
-                                res->lr_tmp = NULL;
-
-                                l_unlock(&ns->ns_lock);
-                                rc = ldlm_run_ast_work(res->lr_namespace,
-                                                       &rpc_list);
-                                l_lock(&ns->ns_lock);
+                                ldlm_reprocess_queue(res, &res->lr_waiting,
+                                                     &rpc_list);
+
+                                unlock_res(res);
+                                rc = ldlm_run_bl_ast_work(&rpc_list);
+                                lock_res(res);
                                 if (rc == -ERESTART)
                                         GOTO(restart, -ERESTART);
                        }
                 } else {
                         LASSERT(req->l_completion_ast);
-                        ldlm_add_ast_work_item(req, NULL, NULL, 0);
+                        ldlm_add_ast_work_item(req, NULL, work_list);
                 }
         }
 
@@ -412,7 +417,7 @@ restart:
         if (added)
                 ldlm_flock_destroy(req, mode, *flags);
 
-        ldlm_resource_dump(res);
+        ldlm_resource_dump(D_OTHER, res);
         RETURN(LDLM_ITER_CONTINUE);
 }
 
@@ -426,7 +431,6 @@ ldlm_flock_interrupted_wait(void *data)
 {
         struct ldlm_lock *lock;
         struct lustre_handle lockh;
-        int rc;
         ENTRY;
 
         lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;
@@ -434,9 +438,12 @@ ldlm_flock_interrupted_wait(void *data)
         /* take lock off the deadlock detection waitq. */
         list_del_init(&lock->l_flock_waitq);
 
+        /* client side - set flag to prevent lock from being put on lru list */
+        lock->l_flags |= LDLM_FL_CBPENDING;
+
         ldlm_lock_decref_internal(lock, lock->l_req_mode);
         ldlm_lock2handle(lock, &lockh);
-        rc = ldlm_cli_cancel(&lockh);
+        ldlm_cli_cancel(&lockh);
         EXIT;
 }
 
@@ -444,9 +451,8 @@ int
 ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 {
         struct ldlm_namespace *ns;
-        struct file_lock *getlk = lock->l_ast_data;
+        cfs_flock_t *getlk = lock->l_ast_data;
         struct ldlm_flock_wait_data fwd;
-        unsigned long irqflags;
         struct obd_device *obd;
         struct obd_import *imp = NULL;
         ldlm_error_t err;
@@ -459,11 +465,6 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 
         LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
 
-        if (flags == 0) {
-                wake_up(&lock->l_waitq);
-                RETURN(0);
-        }
-
         if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                        LDLM_FL_BLOCK_CONV)))
                 goto  granted;
@@ -471,8 +472,6 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                    "sleeping");
 
-        ldlm_lock_dump(D_DLMTRACE, lock, 0);
-
         fwd.fwd_lock = lock;
         obd = class_exp2obd(lock->l_conn_export);
 
@@ -481,9 +480,9 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 imp = obd->u.cli.cl_import;
 
         if (imp != NULL) {
-                spin_lock_irqsave(&imp->imp_lock, irqflags);
+                spin_lock(&imp->imp_lock);
                 fwd.fwd_generation = imp->imp_generation;
-                spin_unlock_irqrestore(&imp->imp_lock, irqflags);
+                spin_unlock(&imp->imp_lock);
         }
 
         lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);
@@ -493,19 +492,14 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                           ((lock->l_req_mode == lock->l_granted_mode) ||
                            lock->l_destroyed), &lwi);
 
-        if (rc) {
-                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
-                           rc);
-                RETURN(rc);
-        }
-
-        LASSERT(!(lock->l_destroyed));
+        LDLM_DEBUG(lock, "client-side enqueue waking up: rc = %d", rc);
+        RETURN(rc);
 
 granted:
 
-        LDLM_DEBUG(lock, "client-side enqueue waking up");
+        LDLM_DEBUG(lock, "client-side enqueue granted");
         ns = lock->l_resource->lr_namespace;
-        l_lock(&ns->ns_lock);
+        lock_res(lock->l_resource);
 
         /* take lock off the deadlock detection waitq. */
         list_del_init(&lock->l_flock_waitq);
@@ -517,29 +511,33 @@ granted:
                 /* fcntl(F_GETLK) request */
                 /* The old mode was saved in getlk->fl_type so that if the mode
                  * in the lock changes we can decref the approprate refcount. */
-                ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
+                ldlm_flock_destroy(lock, cfs_flock_type(getlk), LDLM_FL_WAIT_NOREPROC);
                 switch (lock->l_granted_mode) {
                 case LCK_PR:
-                        getlk->fl_type = F_RDLCK;
+                        cfs_flock_set_type(getlk, F_RDLCK);
                         break;
                 case LCK_PW:
-                        getlk->fl_type = F_WRLCK;
+                        cfs_flock_set_type(getlk, F_WRLCK);
                         break;
                 default:
-                        getlk->fl_type = F_UNLCK;
+                        cfs_flock_set_type(getlk, F_UNLCK);
                 }
-                getlk->fl_pid = lock->l_policy_data.l_flock.pid;
-                getlk->fl_start = lock->l_policy_data.l_flock.start;
-                getlk->fl_end = lock->l_policy_data.l_flock.end;
+                cfs_flock_set_pid(getlk, (pid_t)lock->l_policy_data.l_flock.pid);
+                cfs_flock_set_start(getlk, (loff_t)lock->l_policy_data.l_flock.start);
+                cfs_flock_set_end(getlk, (loff_t)lock->l_policy_data.l_flock.end);
         } else {
+                int noreproc = LDLM_FL_WAIT_NOREPROC;
+
                 /* We need to reprocess the lock to do merges or splits
                  * with existing locks owned by this process. */
-                flags = LDLM_FL_WAIT_NOREPROC;
-                ldlm_process_flock_lock(lock, &flags, 1, &err);
+                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
+                if (flags == 0)
+                        cfs_waitq_signal(&lock->l_waitq);
         }
-        l_unlock(&ns->ns_lock);
+        unlock_res(lock->l_resource);
         RETURN(0);
 }
+EXPORT_SYMBOL(ldlm_flock_completion_ast);
 
 int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                             void *data, int flag)
@@ -551,10 +549,10 @@ int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
         LASSERT(flag == LDLM_CB_CANCELING);
 
         ns = lock->l_resource->lr_namespace;
-        
+
         /* take lock off the deadlock detection waitq. */
-        l_lock(&ns->ns_lock);
+        lock_res_and_lock(lock);
         list_del_init(&lock->l_flock_waitq);
-        l_unlock(&ns->ns_lock);
+        unlock_res_and_lock(lock);
         RETURN(0);
 }