Whamcloud - gitweb
- merge 0.7rc1 from b_devel to HEAD (20030612 merge point)
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
index d64a402..e5d9c24 100644 (file)
 #include <linux/obd_class.h>
 #include <linux/obd.h>
 
-static int interrupted_completion_wait(void *data)
+static void interrupted_completion_wait(void *data)
 {
-        RETURN(1);
 }
 
+struct lock_wait_data {
+        struct ldlm_lock *lwd_lock;
+        int               lwd_generation;
+};
+
 int ldlm_expired_completion_wait(void *data)
 {
-        struct ldlm_lock *lock = data;
-        struct ptlrpc_connection *conn;
-        struct obd_device *obd;
+        struct lock_wait_data *lwd = data;
+        struct ldlm_lock *lock = lwd->lwd_lock;
+        struct obd_device *obd = class_conn2obd(lock->l_connh);
 
-        if (!lock)
-                CERROR("NULL lock\n");
-        else if (!lock->l_connh)
-                CERROR("lock %p has NULL connh\n", lock);
-        else if (!(obd = class_conn2obd(lock->l_connh)))
-                CERROR("lock %p has NULL obd\n", lock);
-        else if (!(conn = obd->u.cli.cl_import.imp_connection))
-                CERROR("lock %p has NULL connection\n", lock);
-        else {
-                LDLM_DEBUG(lock, "timed out waiting for completion");
-                CERROR("lock %p timed out from %s\n", lock,
-                       conn->c_remote_uuid.uuid);
-                ldlm_lock_dump(D_ERROR, lock);
-                class_signal_connection_failure(conn);
+        if (obd == NULL) {
+                LDLM_ERROR(lock, "lock timed out; mot entering recovery in "
+                           "server code, just going back to sleep");
+        } else {
+                struct obd_import *imp = obd->u.cli.cl_import;
+                ptlrpc_fail_import(imp, lwd->lwd_generation);
+                LDLM_ERROR(lock, "lock timed out, entering recovery for %s@%s",
+                           imp->imp_target_uuid.uuid,
+                           imp->imp_connection->c_remote_uuid.uuid);
         }
+        
         RETURN(0);
 }
 
 int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 {
-        struct l_wait_info lwi =
-                LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait,
-                                 interrupted_completion_wait, lock);
+        struct lock_wait_data lwd;
+        unsigned long irqflags;
+        struct obd_device *obd;
+        struct obd_import *imp = NULL;
         int rc = 0;
+        struct l_wait_info lwi;
+
+        obd = class_conn2obd(lock->l_connh);
+
+        /* if this is a local lock, then there is no import */
+        if (obd != NULL)
+                imp = obd->u.cli.cl_import;
+
+        lwd.lwd_lock = lock;
+
+        lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait,
+                               interrupted_completion_wait, &lwd);
         ENTRY;
 
         if (flags == LDLM_FL_WAIT_NOREPROC)
@@ -84,6 +97,12 @@ int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         ldlm_reprocess_all(lock->l_resource);
 
  noreproc:
+        if (imp != NULL) {
+                spin_lock_irqsave(&imp->imp_lock, irqflags);
+                lwd.lwd_generation = imp->imp_generation;
+                spin_unlock_irqrestore(&imp->imp_lock, irqflags);
+        }
+
         /* Go to sleep until the lock is granted or cancelled. */
         rc = l_wait_event(lock->l_waitq,
                           ((lock->l_req_mode == lock->l_granted_mode) ||
@@ -114,7 +133,6 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
                                   ldlm_completion_callback completion,
                                   ldlm_blocking_callback blocking,
                                   void *data,
-                                  void *cp_data,
                                   struct lustre_handle *lockh)
 {
         struct ldlm_lock *lock;
@@ -127,7 +145,7 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
         }
 
         lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode,
-                                data, cp_data);
+                                blocking, data);
         if (!lock)
                 GOTO(out_nolock, err = -ENOMEM);
         LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
@@ -136,8 +154,8 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
         ldlm_lock2handle(lock, lockh);
         lock->l_flags |= LDLM_FL_LOCAL;
 
-        err = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags, completion,
-                                blocking);
+        err = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags,
+                                completion);
         if (err != ELDLM_OK)
                 GOTO(out, err);
 
@@ -172,7 +190,6 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
                      ldlm_completion_callback completion,
                      ldlm_blocking_callback blocking,
                      void *data,
-                     void *cp_data,
                      struct lustre_handle *lockh)
 {
         struct ldlm_lock *lock;
@@ -188,7 +205,7 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
                 rc = ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id,
                                             type, cookie, cookielen, mode,
                                             flags, completion, blocking, data,
-                                            cp_data, lockh);
+                                            lockh);
                 RETURN(rc);
         }
 
@@ -200,7 +217,7 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
                 LASSERT(connh == lock->l_connh);
         } else {
                 lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type,
-                                        mode, data, cp_data);
+                                        mode, blocking, data);
                 if (lock == NULL)
                         GOTO(out_nolock, rc = -ENOMEM);
                 /* ugh.  I set this early (instead of waiting for _enqueue)
@@ -227,7 +244,7 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
                 LBUG();
 
         /* Dump lock data into the request buffer */
-        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
         ldlm_lock2desc(lock, &body->lock_desc);
         body->lock_flags = *flags;
 
@@ -243,6 +260,7 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
         }
         lock->l_connh = connh;
         lock->l_export = NULL;
+        lock->l_blocking_ast = blocking;
 
         LDLM_DEBUG(lock, "sending request");
         rc = ptlrpc_queue_wait(req);
@@ -253,26 +271,54 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
                            rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
                 /* Set a flag to prevent us from sending a CANCEL (bug 407) */
                 l_lock(&ns->ns_lock);
-                lock->l_flags |= LDLM_FL_CANCELING;
+                lock->l_flags |= LDLM_FL_LOCAL_ONLY;
                 l_unlock(&ns->ns_lock);
 
                 ldlm_lock_decref_and_cancel(lockh, mode);
+
+                if (rc == ELDLM_LOCK_ABORTED) {
+                        /* caller expects reply buffer 0 to have been swabbed */
+                        reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
+                                                   lustre_swab_ldlm_reply);
+                        if (reply == NULL) {
+                                CERROR ("Can't unpack ldlm_reply\n");
+                                GOTO (out_req, rc = -EPROTO);
+                        }
+                }
                 GOTO(out_req, rc);
         }
 
-        reply = lustre_msg_buf(req->rq_repmsg, 0);
+        reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
+                                   lustre_swab_ldlm_reply);
+        if (reply == NULL) {
+                CERROR ("Can't unpack ldlm_reply\n");
+                GOTO (out_req, rc = -EPROTO);
+        }
+        
         memcpy(&lock->l_remote_handle, &reply->lock_handle,
                sizeof(lock->l_remote_handle));
         *flags = reply->lock_flags;
 
-        CDEBUG(D_INFO, "local: %p, remote: %p, flags: %d\n", lock,
-               (void *)(unsigned long)reply->lock_handle.addr, *flags);
+        CDEBUG(D_INFO, "local: %p, remote cookie: "LPX64", flags: %d\n", lock,
+               reply->lock_handle.cookie, *flags);
         if (type == LDLM_EXTENT) {
                 CDEBUG(D_INFO, "requested extent: "LPU64" -> "LPU64", got "
                        "extent "LPU64" -> "LPU64"\n",
                        body->lock_desc.l_extent.start,
                        body->lock_desc.l_extent.end,
                        reply->lock_extent.start, reply->lock_extent.end);
+
+                if ((reply->lock_extent.end & ~PAGE_MASK) != ~PAGE_MASK) {
+                        /* XXX Old versions of BA OST code have a fencepost bug
+                         * which will cause them to grant a lock that's one
+                         * byte too large.  This can be safely removed after BA
+                         * ships their next release -phik (02 Apr 2003) */
+                        reply->lock_extent.end--;
+                } else if ((reply->lock_extent.start & ~PAGE_MASK) ==
+                           ~PAGE_MASK) {
+                        reply->lock_extent.start++;
+                }
+
                 cookie = &reply->lock_extent; /* FIXME bug 267 */
                 cookielen = sizeof(reply->lock_extent);
         }
@@ -310,7 +356,7 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
                 l_lock(&ns->ns_lock);
                 lock->l_completion_ast = NULL;
                 rc = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags,
-                                       completion, blocking);
+                                       completion);
                 l_unlock(&ns->ns_lock);
                 if (lock->l_completion_ast)
                         lock->l_completion_ast(lock, *flags, NULL);
@@ -339,7 +385,6 @@ int ldlm_match_or_enqueue(struct lustre_handle *connh,
                           ldlm_completion_callback completion,
                           ldlm_blocking_callback blocking,
                           void *data,
-                          void *cp_data,
                           struct lustre_handle *lockh)
 {
         int rc;
@@ -357,7 +402,7 @@ int ldlm_match_or_enqueue(struct lustre_handle *connh,
                 rc = ldlm_cli_enqueue(connh, req, ns, parent_lock_handle,
                                       res_id, type, cookie, cookielen, mode,
                                       flags, completion, blocking, data,
-                                      cp_data, lockh);
+                                      lockh);
                 if (rc != ELDLM_OK)
                         CERROR("ldlm_cli_enqueue: err: %d\n", rc);
                 RETURN(rc);
@@ -373,7 +418,7 @@ int ldlm_cli_replay_enqueue(struct ldlm_lock *lock)
         ldlm_lock2handle(lock, &lockh);
         return ldlm_cli_enqueue(lock->l_connh, NULL, NULL, NULL, junk,
                                 lock->l_resource->lr_type, NULL, 0, -1, &flags,
-                                NULL, NULL, NULL, 0, &lockh);
+                                NULL, NULL, NULL, &lockh);
 }
 
 static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
@@ -425,7 +470,7 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
-        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
         memcpy(&body->lock_handle1, &lock->l_remote_handle,
                sizeof(body->lock_handle1));
 
@@ -439,7 +484,13 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
         if (rc != ELDLM_OK)
                 GOTO(out, rc);
 
-        reply = lustre_msg_buf(req->rq_repmsg, 0);
+        reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
+                                   lustre_swab_ldlm_reply);
+        if (reply == NULL) {
+                CERROR ("Can't unpack ldlm_reply\n");
+                GOTO (out, rc = -EPROTO);
+        }
+        
         res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
         if (res != NULL)
                 ldlm_reprocess_all(res);
@@ -469,23 +520,30 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
 
         if (lock->l_connh) {
                 int local_only;
+                struct obd_import *imp;
 
                 LDLM_DEBUG(lock, "client-side cancel");
                 /* Set this flag to prevent others from getting new references*/
                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
                 lock->l_flags |= LDLM_FL_CBPENDING;
-                ldlm_cancel_callback(lock);
                 local_only = (lock->l_flags & LDLM_FL_LOCAL_ONLY);
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+                ldlm_cancel_callback(lock);
 
                 if (local_only) {
                         CDEBUG(D_INFO, "not sending request (at caller's "
-                               "instruction\n");
+                               "instruction)\n");
+                        goto local_cancel;
+                }
+
+                imp = class_conn2cliimp(lock->l_connh);
+                if (imp == NULL || imp->imp_invalid) {
+                        CDEBUG(D_HA, "skipping cancel on invalid import %p\n",
+                               imp);
                         goto local_cancel;
                 }
 
-                req = ptlrpc_prep_req(class_conn2cliimp(lock->l_connh),
-                                      LDLM_CANCEL, 1, &size, NULL);
+                req = ptlrpc_prep_req(imp, LDLM_CANCEL, 1, &size, NULL);
                 if (!req)
                         GOTO(out, rc = -ENOMEM);
 
@@ -493,21 +551,23 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
                 req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
                 req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
 
-                body = lustre_msg_buf(req->rq_reqmsg, 0);
+                body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
                 memcpy(&body->lock_handle1, &lock->l_remote_handle,
                        sizeof(body->lock_handle1));
 
                 req->rq_replen = lustre_msg_size(0, NULL);
 
                 rc = ptlrpc_queue_wait(req);
-                ptlrpc_req_finished(req);
-                if (rc == ESTALE) {
-                        CERROR("client/server out of sync\n");
-                        LBUG();
-                }
-                if (rc != ELDLM_OK)
+
+                if (rc == ESTALE)
+                        CERROR("client/server (nid "LPU64") out of sync--not "
+                               "fatal\n",
+                               req->rq_import->imp_connection->c_peer.peer_nid);
+                else if (rc != ELDLM_OK)
                         CERROR("Got rc %d from cancel RPC: canceling "
                                "anyway\n", rc);
+
+                ptlrpc_req_finished(req);
         local_cancel:
                 ldlm_lock_cancel(lock);
         } else {
@@ -585,8 +645,9 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns)
         RETURN(rc);
 }
 
-int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
-                                    struct ldlm_res_id res_id, int flags)
+static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
+                                           struct ldlm_res_id res_id, int flags,
+                                           void *opaque)
 {
         struct ldlm_resource *res;
         struct list_head *tmp, *next, list = LIST_HEAD_INIT(list);
@@ -605,8 +666,17 @@ int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
                 struct ldlm_lock *lock;
                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
 
-                if (lock->l_readers || lock->l_writers)
-                        continue;
+                if (lock->l_readers || lock->l_writers) {
+                        if (flags & LDLM_FL_WARN) {
+                                LDLM_ERROR(lock, "lock in use");
+                                LBUG();
+                        }
+                }
+                if (opaque != NULL && lock->l_data != opaque) {
+                        LDLM_ERROR(lock, "data %p doesn't match opaque %p",
+                                   lock->l_data, opaque);
+                        LBUG();
+                }
 
                 /* See CBPENDING comment in ldlm_cancel_lru */
                 lock->l_flags |= LDLM_FL_CBPENDING;
@@ -653,9 +723,10 @@ int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
  *
  * If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying
  * to notify the server.
- * If flags & LDLM_FL_NO_CALLBACK, don't run the cancel callback. */
+ * If flags & LDLM_FL_NO_CALLBACK, don't run the cancel callback.
+ * If flags & LDLM_FL_WARN, print a warning if some locks are still in use. */
 int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
-                           struct ldlm_res_id *res_id, int flags)
+                           struct ldlm_res_id *res_id, int flags, void *opaque)
 {
         int i;
         ENTRY;
@@ -664,7 +735,8 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
                 RETURN(ELDLM_OK);
 
         if (res_id)
-                RETURN(ldlm_cli_cancel_unused_resource(ns, *res_id, flags));
+                RETURN(ldlm_cli_cancel_unused_resource(ns, *res_id, flags,
+                                                       opaque));
 
         l_lock(&ns->ns_lock);
         for (i = 0; i < RES_HASH_SIZE; i++) {
@@ -676,7 +748,7 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
                         ldlm_resource_getref(res);
 
                         rc = ldlm_cli_cancel_unused_resource(ns, res->lr_name,
-                                                             flags);
+                                                             flags, opaque);
 
                         if (rc)
                                 CERROR("cancel_unused_res ("LPU64"): %d\n",
@@ -827,7 +899,7 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
         /* We're part of recovery, so don't wait for it. */
         req->rq_level = LUSTRE_CONN_RECOVD;
         
-        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
         ldlm_lock2desc(lock, &body->lock_desc);
         body->lock_flags = flags;
 
@@ -839,8 +911,14 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
         rc = ptlrpc_queue_wait(req);
         if (rc != ELDLM_OK)
                 GOTO(out, rc);
-
-        reply = lustre_msg_buf(req->rq_repmsg, 0);
+        
+        reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
+                                   lustre_swab_ldlm_reply);
+        if (reply == NULL) {
+                CERROR("Can't unpack ldlm_reply\n");
+                GOTO (out, rc = -EPROTO);
+        }
+        
         memcpy(&lock->l_remote_handle, &reply->lock_handle,
                sizeof(lock->l_remote_handle));
         LDLM_DEBUG(lock, "replayed lock:");