Whamcloud - gitweb
- Add extent information to LDLM_DEBUG
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
index 5f21751..723b2ee 100644 (file)
@@ -1,21 +1,78 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * Copyright (C) 2002 Cluster File Systems, Inc.
+ *  Copyright (C) 2002 Cluster File Systems, Inc.
  *
- * This code is issued under the GNU General Public License.
- * See the file COPYING in this distribution
+ *   This file is part of Lustre, http://www.lustre.org.
  *
- * by Cluster File Systems, Inc.
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  */
 
 #define DEBUG_SUBSYSTEM S_LDLM
 
 #include <linux/lustre_dlm.h>
+#include <linux/obd_class.h>
+#include <linux/obd.h>
 
-int ldlm_completion_ast(struct ldlm_lock *lock, int flags)
+static int interrupted_completion_wait(void *data)
+{
+        RETURN(1);
+}
+
+static int expired_completion_wait(void *data)
 {
+        struct ldlm_lock *lock = data;
+        struct ptlrpc_connection *conn;
+        struct obd_device *obd;
 
+        if (!lock)
+                CERROR("NULL lock\n");
+        else if (!lock->l_connh)
+                CERROR("lock %p has NULL connh\n", lock);
+        else if (!(obd = class_conn2obd(lock->l_connh)))
+                CERROR("lock %p has NULL obd\n", lock);
+        else if (!(conn = obd->u.cli.cl_import.imp_connection))
+                CERROR("lock %p has NULL connection\n", lock);
+        else {
+                class_signal_connection_failure(conn);
+        }
+        RETURN(0);
+}
+
+#if 0
+static int expired_completion_wait(void *data)
+{
+        struct ldlm_lock *lock = data;
+        struct ptlrpc_connection *conn =
+                class_conn2cliimp(lock->l_connh)->imp_connection;
+
+        if (!conn) {
+                CERROR("lock %p has NULL import connection\n", lock);
+                RETURN(1);
+        }
+
+        class_signal_connection_failure(conn);
+        RETURN(0);
+}
+#endif
+
+int ldlm_completion_ast(struct ldlm_lock *lock, int flags)
+{
+        struct l_wait_info lwi =
+                LWI_TIMEOUT_INTR(obd_timeout * HZ, expired_completion_wait,
+                                 interrupted_completion_wait, lock);
+        int rc = 0;
         ENTRY;
 
         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
@@ -26,19 +83,28 @@ int ldlm_completion_ast(struct ldlm_lock *lock, int flags)
                            " sleeping");
                 ldlm_lock_dump(lock);
                 ldlm_reprocess_all(lock->l_resource);
-                wait_event(lock->l_waitq, lock->l_req_mode == lock->l_granted_mode);
-                LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
-        } else if (flags == LDLM_FL_WAIT_NOREPROC) { 
-                wait_event(lock->l_waitq, lock->l_req_mode == lock->l_granted_mode);
-        } else if (flags == 0) { 
+                rc = l_wait_event(lock->l_waitq,
+                                  (lock->l_req_mode == lock->l_granted_mode),
+                                  &lwi);
+                if (rc) {
+                        LDLM_DEBUG(lock,
+                                   "client-side enqueue waking up: failed (%d)",
+                                   rc);
+                } else {
+                        LDLM_DEBUG(lock,
+                                   "client-side enqueue waking up: granted");
+                }
+        } else if (flags == LDLM_FL_WAIT_NOREPROC) {
+                rc = l_wait_event(lock->l_waitq,
+                                  (lock->l_req_mode == lock->l_granted_mode),
+                                  &lwi);
+        } else if (flags == 0) {
                 wake_up(&lock->l_waitq);
-
         }
 
-        RETURN(0);
+        RETURN(rc);
 }
 
-
 static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
                                   struct lustre_handle *parent_lockh,
                                   __u64 *res_id,
@@ -47,29 +113,32 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
                                   ldlm_mode_t mode,
                                   int *flags,
                                   ldlm_completion_callback completion,
-                                  ldlm_lock_callback callback,
+                                  ldlm_blocking_callback blocking,
                                   void *data,
                                   __u32 data_len,
                                   struct lustre_handle *lockh)
 {
-        struct ldlm_lock *lock; 
-        int err; 
+        struct ldlm_lock *lock;
+        int err;
+        ENTRY;
 
-        if (ns->ns_client) { 
-                CERROR("Trying to cancel local lock\n"); 
+        if (ns->ns_client) {
+                CERROR("Trying to enqueue local lock in a shadow namespace\n");
                 LBUG();
         }
 
-        lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode, NULL, 0);
+        lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode, data,
+                                data_len);
         if (!lock)
                 GOTO(out_nolock, err = -ENOMEM);
         LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
 
         ldlm_lock_addref_internal(lock, mode);
         ldlm_lock2handle(lock, lockh);
-        lock->l_connh = NULL; 
+        lock->l_connh = NULL;
 
-        err = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion, callback);
+        err = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion,
+                                blocking);
         if (err != ELDLM_OK)
                 GOTO(out, err);
 
@@ -78,10 +147,11 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
         if ((*flags) & LDLM_FL_LOCK_CHANGED)
                 memcpy(res_id, lock->l_resource->lr_name, sizeof(*res_id));
 
-        LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)", lock);
+        LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)",
+                          lock);
 
         if (lock->l_completion_ast)
-                lock->l_completion_ast(lock, *flags); 
+                lock->l_completion_ast(lock, *flags);
 
         LDLM_DEBUG(lock, "client-side local enqueue END");
         EXIT;
@@ -91,8 +161,7 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
         return err;
 }
 
-
-int ldlm_cli_enqueue(struct lustre_handle *connh, 
+int ldlm_cli_enqueue(struct lustre_handle *connh,
                      struct ptlrpc_request *req,
                      struct ldlm_namespace *ns,
                      struct lustre_handle *parent_lock_handle,
@@ -102,23 +171,22 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
                      ldlm_mode_t mode,
                      int *flags,
                      ldlm_completion_callback completion,
-                     ldlm_lock_callback callback,
+                     ldlm_blocking_callback blocking,
                      void *data,
                      __u32 data_len,
                      struct lustre_handle *lockh)
 {
-        struct ptlrpc_connection *connection;
         struct ldlm_lock *lock;
         struct ldlm_request *body;
         struct ldlm_reply *reply;
         int rc, size = sizeof(*body), req_passed_in = 1;
         ENTRY;
 
-        if (connh == NULL) 
-                return ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id, 
-                                              type, cookie, cookielen, mode, 
-                                              flags, completion, callback, data, data_len, lockh);
-        connection = client_conn2cli(connh)->cl_conn;
+        if (connh == NULL)
+                return ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id,
+                                              type, cookie, cookielen, mode,
+                                              flags, completion, blocking, data,
+                                              data_len, lockh);
 
         *flags = 0;
         lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, mode,
@@ -131,7 +199,8 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
         ldlm_lock2handle(lock, lockh);
 
         if (req == NULL) {
-                req = ptlrpc_prep_req2(connh, LDLM_ENQUEUE, 1, &size, NULL);
+                req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_ENQUEUE, 1,
+                                      &size, NULL);
                 if (!req)
                         GOTO(out, rc = -ENOMEM);
                 req_passed_in = 0;
@@ -157,9 +226,8 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
                 size = sizeof(*reply);
                 req->rq_replen = lustre_msg_size(1, &size);
         }
-        lock->l_connh = connh; 
-        lock->l_connection = ptlrpc_connection_addref(connection);
-        lock->l_client = client_conn2cli(connh)->cl_client;
+        lock->l_connh = connh;
+        lock->l_export = NULL;
 
         rc = ptlrpc_queue_wait(req);
         /* FIXME: status check here? */
@@ -191,27 +259,38 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
         /* If enqueue returned a blocked lock but the completion handler has
          * already run, then it fixed up the resource and we don't need to do it
          * again. */
-        if ((*flags) & LDLM_FL_LOCK_CHANGED &&
-            lock->l_req_mode != lock->l_granted_mode) {
-                CDEBUG(D_INFO, "remote intent success, locking %ld instead of"
-                       "%ld\n", (long)reply->lock_resource_name[0],
-                       (long)lock->l_resource->lr_name[0]);
-
-                ldlm_lock_change_resource(lock, reply->lock_resource_name);
-                if (lock->l_resource == NULL) {
-                        LBUG();
-                        RETURN(-ENOMEM);
+        if ((*flags) & LDLM_FL_LOCK_CHANGED) {
+                int newmode = reply->lock_mode;
+                if (newmode && newmode != lock->l_req_mode) {
+                        LDLM_DEBUG(lock, "server returned different mode %s",
+                                   ldlm_lockname[newmode]);
+                        lock->l_req_mode = newmode;
+                }
+
+                if (reply->lock_resource_name[0] !=
+                    lock->l_resource->lr_name[0]) {
+                        CDEBUG(D_INFO, "remote intent success, locking %ld "
+                               "instead of %ld\n",
+                               (long)reply->lock_resource_name[0],
+                               (long)lock->l_resource->lr_name[0]);
+
+                        ldlm_lock_change_resource(lock,
+                                                  reply->lock_resource_name);
+                        if (lock->l_resource == NULL) {
+                                LBUG();
+                                RETURN(-ENOMEM);
+                        }
+                        LDLM_DEBUG(lock, "client-side enqueue, new resource");
                 }
-                LDLM_DEBUG(lock, "client-side enqueue, new resource");
         }
 
         if (!req_passed_in)
                 ptlrpc_free_req(req);
 
         rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion,
-                               callback);
+                               blocking);
         if (lock->l_completion_ast)
-                lock->l_completion_ast(lock, *flags); 
+                lock->l_completion_ast(lock, *flags);
 
         LDLM_DEBUG(lock, "client-side enqueue END");
         EXIT;
@@ -221,7 +300,7 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
         return rc;
 }
 
-int ldlm_match_or_enqueue(struct lustre_handle *connh, 
+int ldlm_match_or_enqueue(struct lustre_handle *connh,
                           struct ptlrpc_request *req,
                           struct ldlm_namespace *ns,
                           struct lustre_handle *parent_lock_handle,
@@ -231,7 +310,7 @@ int ldlm_match_or_enqueue(struct lustre_handle *connh,
                           ldlm_mode_t mode,
                           int *flags,
                           ldlm_completion_callback completion,
-                          ldlm_lock_callback callback,
+                          ldlm_blocking_callback blocking,
                           void *data,
                           __u32 data_len,
                           struct lustre_handle *lockh)
@@ -242,8 +321,8 @@ int ldlm_match_or_enqueue(struct lustre_handle *connh,
         if (rc == 0) {
                 rc = ldlm_cli_enqueue(connh, req, ns,
                                       parent_lock_handle, res_id, type, cookie,
-                                      cookielen, mode, flags, completion, callback, data,
-                                      data_len, lockh);
+                                      cookielen, mode, flags, completion,
+                                      blocking, data, data_len, lockh);
                 if (rc != ELDLM_OK)
                         CERROR("ldlm_cli_enqueue: err: %d\n", rc);
                 RETURN(rc);
@@ -251,66 +330,17 @@ int ldlm_match_or_enqueue(struct lustre_handle *connh,
                 RETURN(0);
 }
 
-int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
-                    void *data, __u32 data_len)
-{
-        struct ldlm_lock *lock;
-        struct ldlm_request *body;
-        struct ptlrpc_request *req;
-        struct ptlrpc_client *cl;
-        int rc = 0, size = sizeof(*body);
-        ENTRY;
-
-        lock = ldlm_handle2lock(lockh);
-        if (lock == NULL) {
-                LBUG();
-                RETURN(-EINVAL);
-        }
-        cl = &lock->l_resource->lr_namespace->ns_rpc_client;
-        req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1,
-                              &size, NULL);
-        if (!req)
-                GOTO(out, rc = -ENOMEM);
-
-        body = lustre_msg_buf(req->rq_reqmsg, 0);
-        memcpy(&body->lock_handle1, &lock->l_remote_handle,
-               sizeof(body->lock_handle1));
-
-        if (desc == NULL) {
-                CDEBUG(D_NET, "Sending granted AST\n");
-                ldlm_lock2desc(lock, &body->lock_desc);
-        } else {
-                CDEBUG(D_NET, "Sending blocked AST\n");
-                memcpy(&body->lock_desc, desc, sizeof(*desc));
-        }
-
-        LDLM_DEBUG(lock, "server preparing %s AST",
-                   desc == 0 ? "completion" : "blocked");
-
-        req->rq_replen = lustre_msg_size(0, NULL);
-
-        rc = ptlrpc_queue_wait(req);
-        rc = ptlrpc_check_status(req, rc);
-        ptlrpc_free_req(req);
-
-        EXIT;
- out:
-        LDLM_LOCK_PUT(lock);
-        return rc;
-}
-
-
-
-static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode, int *flags)
+static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
+                                  int *flags)
 {
 
-        if (lock->l_resource->lr_namespace->ns_client) { 
-                CERROR("Trying to cancel local lock\n"); 
+        if (lock->l_resource->lr_namespace->ns_client) {
+                CERROR("Trying to cancel local lock\n");
                 LBUG();
         }
         LDLM_DEBUG(lock, "client-side local convert");
 
-        ldlm_lock_convert(lock, new_mode, flags); 
+        ldlm_lock_convert(lock, new_mode, flags);
         ldlm_reprocess_all(lock->l_resource);
 
         LDLM_DEBUG(lock, "client-side local convert handler END");
@@ -318,6 +348,8 @@ static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode, int *fla
         RETURN(0);
 }
 
+/* FIXME: one of ldlm_cli_convert or the server side should reject attempted
+ * conversion of locks which are on the waiting or converting queue */
 int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
 {
         struct ldlm_request *body;
@@ -335,14 +367,15 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
                 RETURN(-EINVAL);
         }
         *flags = 0;
-        connh = lock->l_connh; 
+        connh = lock->l_connh;
 
         if (!connh)
                 return ldlm_cli_convert_local(lock, new_mode, flags);
 
         LDLM_DEBUG(lock, "client-side convert");
 
-        req = ptlrpc_prep_req2(connh, LDLM_CONVERT, 1, &size, NULL);
+        req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_CONVERT, 1, &size,
+                              NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -368,7 +401,7 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
         /* Go to sleep until the lock is granted. */
         /* FIXME: or cancelled. */
         if (lock->l_completion_ast)
-                lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC); 
+                lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC);
         EXIT;
  out:
         LDLM_LOCK_PUT(lock);
@@ -384,7 +417,7 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
         int rc = 0, size = sizeof(*body);
         ENTRY;
 
-        lock = ldlm_handle2lock(lockh); 
+        lock = ldlm_handle2lock(lockh);
         if (!lock) {
                 /* It's possible that the decref that we did just before this
                  * cancel was the last reader/writer, and caused a cancel before
@@ -395,33 +428,40 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
                 RETURN(-EINVAL);
         }
 
-        if (lock->l_connh) { 
+        if (lock->l_connh) {
                 LDLM_DEBUG(lock, "client-side cancel");
-                req = ptlrpc_prep_req2(lock->l_connh, LDLM_CANCEL, 1, &size, NULL);
+                /* Set this flag to prevent others from getting new references*/
+                l_lock(&lock->l_resource->lr_namespace->ns_lock);
+                lock->l_flags |= LDLM_FL_CBPENDING;
+                ldlm_cancel_callback(lock);
+                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+
+                req = ptlrpc_prep_req(class_conn2cliimp(lock->l_connh),
+                                      LDLM_CANCEL, 1, &size, NULL);
                 if (!req)
                         GOTO(out, rc = -ENOMEM);
-                
+
                 body = lustre_msg_buf(req->rq_reqmsg, 0);
                 memcpy(&body->lock_handle1, &lock->l_remote_handle,
                        sizeof(body->lock_handle1));
-                
+
                 req->rq_replen = lustre_msg_size(0, NULL);
-                
+
                 rc = ptlrpc_queue_wait(req);
                 rc = ptlrpc_check_status(req, rc);
                 ptlrpc_free_req(req);
                 if (rc != ELDLM_OK)
                         GOTO(out, rc);
-                
+
                 ldlm_lock_cancel(lock);
-        } else { 
+        } else {
                 LDLM_DEBUG(lock, "client-side local cancel");
-                if (lock->l_resource->lr_namespace->ns_client) { 
-                        CERROR("Trying to cancel local lock\n"); 
+                if (lock->l_resource->lr_namespace->ns_client) {
+                        CERROR("Trying to cancel local lock\n");
                         LBUG();
                 }
-                ldlm_lock_cancel(lock); 
-                ldlm_reprocess_all(lock->l_resource); 
+                ldlm_lock_cancel(lock);
+                ldlm_reprocess_all(lock->l_resource);
                 LDLM_DEBUG(lock, "client-side local cancel handler END");
         }
 
@@ -430,3 +470,65 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
         LDLM_LOCK_PUT(lock);
         return rc;
 }
+
+/* Cancel all locks on a given resource that have 0 readers/writers.
+ *
+ * If 'local_only' is true, throw the locks away without trying to notify the
+ * server. */
+int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id,
+                           int local_only)
+{
+        struct ldlm_resource *res;
+        struct list_head *tmp, *next, list = LIST_HEAD_INIT(list);
+        struct ldlm_ast_work *w;
+        ENTRY;
+
+        res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
+        if (res == NULL)
+                RETURN(-EINVAL);
+
+        l_lock(&ns->ns_lock);
+        list_for_each(tmp, &res->lr_granted) {
+                struct ldlm_lock *lock;
+                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+                if (lock->l_readers || lock->l_writers)
+                        continue;
+
+                /* Setting the CBPENDING flag is a little misleading, but
+                 * prevents an important race; namely, once CBPENDING is set,
+                 * the lock can accumulate no more readers/writers.  Since
+                 * readers and writers are already zero here, ldlm_lock_decref
+                 * won't see this flag and call l_blocking_ast */
+                lock->l_flags |= LDLM_FL_CBPENDING;
+
+                OBD_ALLOC(w, sizeof(*w));
+                LASSERT(w);
+
+                w->w_lock = LDLM_LOCK_GET(lock);
+                list_add(&w->w_list, &list);
+        }
+        l_unlock(&ns->ns_lock);
+
+        list_for_each_safe(tmp, next, &list) {
+                struct lustre_handle lockh;
+                int rc;
+                w = list_entry(tmp, struct ldlm_ast_work, w_list);
+
+                if (local_only)
+                        ldlm_lock_cancel(w->w_lock);
+                else {
+                        ldlm_lock2handle(w->w_lock, &lockh);
+                        rc = ldlm_cli_cancel(&lockh);
+                        if (rc != ELDLM_OK)
+                                CERROR("ldlm_cli_cancel: %d\n", rc);
+                }
+                LDLM_LOCK_PUT(w->w_lock);
+                list_del(&w->w_list);
+                OBD_FREE(w, sizeof(*w));
+        }
+
+        ldlm_resource_put(res);
+
+        RETURN(0);
+}