- merge 0.7rc1 from b_devel to HEAD (20030612 merge point)
diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c
index dafcb6e..9d2857e 100644
 # include <linux/module.h>
 # include <linux/slab.h>
 # include <linux/init.h>
+# include <linux/wait.h>
 #else
 # include <liblustre.h>
 #endif
 
 #include <linux/lustre_dlm.h>
 #include <linux/obd_class.h>
-
 extern kmem_cache_t *ldlm_resource_slab;
 extern kmem_cache_t *ldlm_lock_slab;
 extern struct lustre_lock ldlm_handle_lock;
@@ -42,6 +42,10 @@ extern struct list_head ldlm_namespace_list;
 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
 
+static int ldlm_already_setup = 0;
+
+#ifdef __KERNEL__
+
 inline unsigned long round_timeout(unsigned long timeout)
 {
         return ((timeout / HZ) + 1) * HZ;
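
round_timeout() truncates an absolute jiffies value to the second and then adds one full second, so the waiting-locks timer below only ever fires on whole-second boundaries. A quick worked example, assuming HZ == 100 purely for illustration:

    /* Illustration only (HZ == 100 assumed):
     *   round_timeout(250) == ((250 / 100) + 1) * 100 == 300
     *   round_timeout(300) == ((300 / 100) + 1) * 100 == 400
     * i.e. the result is always rounded up, and is always at least
     * one full second beyond the argument's own second boundary. */
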
@@ -51,23 +55,103 @@ inline unsigned long round_timeout(unsigned long timeout)
 static struct list_head waiting_locks_list;
 static spinlock_t waiting_locks_spinlock;
 static struct timer_list waiting_locks_timer;
-static int ldlm_already_setup = 0;
+
+static struct expired_lock_thread {
+        wait_queue_head_t         elt_waitq;
+        int                       elt_state;
+        struct list_head          elt_expired_locks;
+        spinlock_t                elt_lock;
+} expired_lock_thread;
+
+#define ELT_STOPPED   0
+#define ELT_READY     1
+#define ELT_TERMINATE 2
+
+static inline int have_expired_locks(void)
+{
+        int need_to_run;
+
+        spin_lock_bh(&expired_lock_thread.elt_lock);
+        need_to_run = !list_empty(&expired_lock_thread.elt_expired_locks);
+        spin_unlock_bh(&expired_lock_thread.elt_lock);
+
+        RETURN(need_to_run);
+}
+
+static int expired_lock_main(void *arg)
+{
+        struct list_head *expired = &expired_lock_thread.elt_expired_locks;
+        struct l_wait_info lwi = { 0 };
+        unsigned long flags;
+
+        ENTRY;
+        lock_kernel();
+        kportal_daemonize("ldlm_elt");
+        
+        SIGNAL_MASK_LOCK(current, flags);
+        sigfillset(&current->blocked);
+        RECALC_SIGPENDING;
+        SIGNAL_MASK_UNLOCK(current, flags);
+        
+        unlock_kernel();
+        
+        expired_lock_thread.elt_state = ELT_READY;
+        wake_up(&expired_lock_thread.elt_waitq);
+        
+        while (1) {
+                l_wait_event(expired_lock_thread.elt_waitq,
+                             have_expired_locks() ||
+                             expired_lock_thread.elt_state == ELT_TERMINATE,
+                             &lwi);
+
+                spin_lock_bh(&expired_lock_thread.elt_lock);
+                while (!list_empty(expired)) {
+                        struct ldlm_lock *lock = list_entry(expired->next,
+                                                            struct ldlm_lock,
+                                                            l_pending_chain);
+                        spin_unlock_bh(&expired_lock_thread.elt_lock);
+                        
+                        ptlrpc_fail_export(lock->l_export);
+
+                        spin_lock_bh(&expired_lock_thread.elt_lock);
+                }
+                spin_unlock_bh(&expired_lock_thread.elt_lock);
+
+                if (expired_lock_thread.elt_state == ELT_TERMINATE)
+                        break;
+        }
+
+        expired_lock_thread.elt_state = ELT_STOPPED;
+        wake_up(&expired_lock_thread.elt_waitq);
+        RETURN(0);
+}
 
 static void waiting_locks_callback(unsigned long unused)
 {
-        struct list_head *liter, *n;
+        struct ldlm_lock *lock;
 
         spin_lock_bh(&waiting_locks_spinlock);
-        list_for_each_safe(liter, n, &waiting_locks_list) {
-                struct ldlm_lock *l = list_entry(liter, struct ldlm_lock,
-                                                 l_pending_chain);
-                if (l->l_callback_timeout > jiffies)
+        while (!list_empty(&waiting_locks_list)) {
+                lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
+                                  l_pending_chain);
+
+                if (lock->l_callback_timeout > jiffies)
                         break;
-                CERROR("lock timer expired, lock %p\n", l);
-                LDLM_DEBUG(l, "timer expired, recovering exp %p on conn %p",
-                           l->l_export, l->l_export->exp_connection);
-                recovd_conn_fail(l->l_export->exp_connection);
+
+                LDLM_ERROR(lock, "lock callback timer expired: evicting client "
+                           "%s@%s nid "LPU64,
+                           lock->l_export->exp_client_uuid.uuid,
+                           lock->l_export->exp_connection->c_remote_uuid.uuid,
+                           lock->l_export->exp_connection->c_peer.peer_nid);
+
+                spin_lock_bh(&expired_lock_thread.elt_lock);
+                list_del(&lock->l_pending_chain);
+                list_add(&lock->l_pending_chain,
+                         &expired_lock_thread.elt_expired_locks);
+                spin_unlock_bh(&expired_lock_thread.elt_lock);
+                wake_up(&expired_lock_thread.elt_waitq);
         }
+
         spin_unlock_bh(&waiting_locks_spinlock);
 }
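
The hunk above replaces the old recovd_conn_fail() call in the timer handler with a dedicated "ldlm_elt" kernel thread: waiting_locks_callback() runs in timer (softirq) context, so it only moves timed-out locks onto elt_expired_locks and wakes the thread, which then calls ptlrpc_fail_export() from process context. A minimal sketch of that handoff pattern follows; the names are illustrative, not the actual Lustre symbols, and termination handling is omitted.

#include <linux/list.h>
#include <linux/sched.h>
#include <linux/spinlock.h>
#include <linux/wait.h>

struct deferred_work {
        wait_queue_head_t waitq;        /* thread sleeps here          */
        spinlock_t        lock;         /* protects the items list     */
        struct list_head  items;        /* filled in by the timer      */
};

static struct deferred_work dw;         /* initialised during setup    */

/* timer/softirq side: must not block, so just queue and wake */
static void timeout_cb(unsigned long data)
{
        struct list_head *entry = (struct list_head *)data;

        spin_lock_bh(&dw.lock);
        list_add_tail(entry, &dw.items);
        spin_unlock_bh(&dw.lock);
        wake_up(&dw.waitq);
}

/* thread side: process context, free to call blocking functions */
static int deferred_main(void *unused)
{
        for (;;) {
                wait_event(dw.waitq, !list_empty(&dw.items));
                /* drain dw.items under dw.lock and act on each entry,
                 * e.g. evict the client whose callback timed out */
        }
        return 0;
}
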
 
@@ -80,8 +164,8 @@ static void waiting_locks_callback(unsigned long unused)
 static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
 {
         unsigned long timeout_rounded;
-        ENTRY;
 
+        LDLM_DEBUG(lock, "adding to wait list");
         LASSERT(list_empty(&lock->l_pending_chain));
 
         spin_lock_bh(&waiting_locks_spinlock);
@@ -95,7 +179,9 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
         }
         list_add_tail(&lock->l_pending_chain, &waiting_locks_list); /* FIFO */
         spin_unlock_bh(&waiting_locks_spinlock);
-        RETURN(1);
+        /* We drop this ref when we get removed from the list. */
+        class_export_get(lock->l_export);
+        return 1;
 }
 
 /*
@@ -107,13 +193,18 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock)
 {
         struct list_head *list_next;
 
-        ENTRY;
+        if (lock->l_export == NULL) {
+                /* We don't have a "waiting locks list" on clients. */
+                LDLM_DEBUG(lock, "client lock: no-op");
+                return 0;
+        }
 
         spin_lock_bh(&waiting_locks_spinlock);
 
         if (list_empty(&lock->l_pending_chain)) {
                 spin_unlock_bh(&waiting_locks_spinlock);
-                RETURN(0);
+                LDLM_DEBUG(lock, "wasn't waiting");
+                return 0;
         }
 
         list_next = lock->l_pending_chain.next;
@@ -132,13 +223,39 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock)
         }
         list_del_init(&lock->l_pending_chain);
         spin_unlock_bh(&waiting_locks_spinlock);
+        /* We got this ref when we were added to the list. */
+        class_export_put(lock->l_export);
+        LDLM_DEBUG(lock, "removed");
+        return 1;
+}
+
+#else /* !__KERNEL__ */
+
+static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
+{
         RETURN(1);
 }
 
-static inline void ldlm_failed_ast(struct ldlm_lock *lock)
+int ldlm_del_waiting_lock(struct ldlm_lock *lock)
 {
-        /* XXX diagnostic */
-        recovd_conn_fail(lock->l_export->exp_connection);
+        RETURN(0);
+}
+
+#endif /* __KERNEL__ */
+
+static inline void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
+                                   char *ast_type)
+{
+        CERROR("%s AST failed (%d) for res "LPU64"/"LPU64
+               ", mode %s: evicting client %s@%s NID "LPU64"\n",
+               ast_type, rc,
+               lock->l_resource->lr_name.name[0],
+               lock->l_resource->lr_name.name[1],
+               ldlm_lockname[lock->l_granted_mode],
+               lock->l_export->exp_client_uuid.uuid,
+               lock->l_export->exp_connection->c_remote_uuid.uuid,
+               lock->l_export->exp_connection->c_peer.peer_nid);
+        ptlrpc_fail_export(lock->l_export);
 }
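
Two related changes appear above: ldlm_add_waiting_lock() now pins the export with class_export_get() for as long as the lock sits on the timeout list, with ldlm_del_waiting_lock() dropping that reference, and ldlm_failed_ast() evicts just the offending export via ptlrpc_fail_export() instead of failing the whole connection. The reference pairing, reduced to a sketch (locking, timer re-arming and the client no-op path omitted; function names here are not the real ones):

static int add_waiting(struct ldlm_lock *lock)
{
        list_add_tail(&lock->l_pending_chain, &waiting_locks_list);
        class_export_get(lock->l_export);       /* pin until removed   */
        return 1;
}

static int del_waiting(struct ldlm_lock *lock)
{
        if (list_empty(&lock->l_pending_chain))
                return 0;                        /* was never waiting   */
        list_del_init(&lock->l_pending_chain);
        class_export_put(lock->l_export);        /* matches the get     */
        return 1;
}
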
 
 int ldlm_server_blocking_ast(struct ldlm_lock *lock,
@@ -171,12 +288,19 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                 RETURN(0);
         }
 
-        req = ptlrpc_prep_req(&lock->l_export->exp_ldlm_data.led_import,
+#if 0
+        if (LTIME_S(CURRENT_TIME) - lock->l_export->exp_last_request_time > 30){
+                ldlm_failed_ast(lock, -ETIMEDOUT, "Not-attempted blocking");
+                RETURN(-ETIMEDOUT);
+        }
+#endif
+
+        req = ptlrpc_prep_req(lock->l_export->exp_ldlm_data.led_import,
                               LDLM_BL_CALLBACK, 1, &size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
-        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
         memcpy(&body->lock_handle1, &lock->l_remote_handle,
                sizeof(body->lock_handle1));
         memcpy(&body->lock_desc, desc, sizeof(*desc));
@@ -188,14 +312,28 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
         req->rq_level = LUSTRE_CONN_RECOVD;
-        req->rq_timeout = 2;
+        req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
         rc = ptlrpc_queue_wait(req);
         if (rc == -ETIMEDOUT || rc == -EINTR) {
                 ldlm_del_waiting_lock(lock);
-                ldlm_failed_ast(lock);
+                ldlm_failed_ast(lock, rc, "blocking");
         } else if (rc) {
-                CERROR("client returned %d from blocking AST for lock %p\n",
-                       req->rq_status, lock);
+                if (rc == -EINVAL)
+                        CDEBUG(D_DLMTRACE, "client (nid "LPU64") returned %d "
+                               "from blocking AST for lock %p--normal race\n",
+                               req->rq_connection->c_peer.peer_nid,
+                               req->rq_repmsg->status, lock);
+                else if (rc == -ENOTCONN)
+                        CDEBUG(D_DLMTRACE, "client (nid "LPU64") returned %d "
+                               "from blocking AST for lock %p--this client was "
+                               "probably rebooted while it held a lock, nothing"
+                               " serious\n",req->rq_connection->c_peer.peer_nid,
+                               req->rq_repmsg->status, lock);
+                else
+                        CDEBUG(D_ERROR, "client (nid "LPU64") returned %d "
+                               "from blocking AST for lock %p\n",
+                               req->rq_connection->c_peer.peer_nid,
+                               req->rq_repmsg->status, lock);
                 LDLM_DEBUG(lock, "client returned error %d from blocking AST",
                            req->rq_status);
                 ldlm_lock_cancel(lock);
@@ -221,12 +359,12 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 RETURN(-EINVAL);
         }
 
-        req = ptlrpc_prep_req(&lock->l_export->exp_ldlm_data.led_import,
+        req = ptlrpc_prep_req(lock->l_export->exp_ldlm_data.led_import,
                               LDLM_CP_CALLBACK, 1, &size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
-        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
         memcpy(&body->lock_handle1, &lock->l_remote_handle,
                sizeof(body->lock_handle1));
         body->lock_flags = flags;
@@ -236,11 +374,11 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         req->rq_replen = lustre_msg_size(0, NULL);
 
         req->rq_level = LUSTRE_CONN_RECOVD;
-        req->rq_timeout = 2;
+        req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
         rc = ptlrpc_queue_wait(req);
         if (rc == -ETIMEDOUT || rc == -EINTR) {
                 ldlm_del_waiting_lock(lock);
-                ldlm_failed_ast(lock);
+                ldlm_failed_ast(lock, rc, "completion");
         } else if (rc) {
                 CERROR("client returned %d from completion AST for lock %p\n",
                        req->rq_status, lock);
@@ -272,7 +410,13 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
 
         LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
 
-        dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+        dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
+                                      lustre_swab_ldlm_request);
+        if (dlm_req == NULL) {
+                CERROR ("Can't unpack dlm_req\n");
+                RETURN (-EFAULT);
+        }
+        
         flags = dlm_req->lock_flags;
         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN &&
             (flags & LDLM_FL_HAS_INTENT)) {
@@ -298,7 +442,8 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
                                 &dlm_req->lock_handle2,
                                 dlm_req->lock_desc.l_resource.lr_name,
                                 dlm_req->lock_desc.l_resource.lr_type,
-                                dlm_req->lock_desc.l_req_mode, NULL, 0);
+                                dlm_req->lock_desc.l_req_mode,
+                                blocking_callback, NULL);
         if (!lock)
                 GOTO(out, err = -ENOMEM);
 
@@ -314,11 +459,11 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
         err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, cookielen,
-                                &flags, completion_callback, blocking_callback);
+                                &flags, completion_callback);
         if (err)
                 GOTO(out, err);
 
-        dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
+        dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
         dlm_rep->lock_flags = flags;
 
         ldlm_lock2handle(lock, &dlm_rep->lock_handle);
@@ -358,13 +503,19 @@ int ldlm_handle_convert(struct ptlrpc_request *req)
         int rc, size = sizeof(*dlm_rep);
         ENTRY;
 
+        dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
+                                      lustre_swab_ldlm_request);
+        if (dlm_req == NULL) {
+                CERROR ("Can't unpack dlm_req\n");
+                RETURN (-EFAULT);
+        }
+        
         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc) {
                 CERROR("out of memory\n");
                 RETURN(-ENOMEM);
         }
-        dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
-        dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
+        dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
         dlm_rep->lock_flags = dlm_req->lock_flags;
 
         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
@@ -396,21 +547,24 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
         int rc;
         ENTRY;
 
+        dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
+                                      lustre_swab_ldlm_request);
+        if (dlm_req == NULL) {
+                CERROR("bad request buffer for cancel\n");
+                RETURN(-EFAULT);
+        }
+
         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc) {
                 CERROR("out of memory\n");
                 RETURN(-ENOMEM);
         }
-        dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
-        if (!dlm_req) {
-                CERROR("bad request buffer for cancel\n");
-                RETURN(-EINVAL);
-        }
 
         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
         if (!lock) {
-                CERROR("received cancel for unknown lock cookie "LPX64"\n",
-                       dlm_req->lock_handle1.cookie);
+                CERROR("received cancel for unknown lock cookie "LPX64
+                       " from nid "LPU64"\n", dlm_req->lock_handle1.cookie,
+                       req->rq_connection->c_peer.peer_nid);
                 LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock "
                                   "(cookie "LPU64")",
                                   dlm_req->lock_handle1.cookie);
@@ -423,7 +577,7 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
                 req->rq_status = 0;
         }
 
-        if (ptlrpc_reply(req->rq_svc, req) != 0)
+        if (ptlrpc_reply(req) != 0)
                 LBUG();
 
         if (lock) {
@@ -443,32 +597,28 @@ static void ldlm_handle_bl_callback(struct ptlrpc_request *req,
         int do_ast;
         ENTRY;
 
-        /* Try to narrow down this damn iozone bug */
-        if (lock->l_resource == NULL)
-                CERROR("lock %p resource NULL\n", lock);
-        if (lock->l_resource->lr_type != LDLM_EXTENT)
-                if (lock->l_resource->lr_namespace != ns)
-                        CERROR("lock %p namespace %p != passed ns %p\n", lock,
-                               lock->l_resource->lr_namespace, ns);
+        l_lock(&ns->ns_lock);
         LDLM_DEBUG(lock, "client blocking AST callback handler START");
 
-        l_lock(&ns->ns_lock);
         lock->l_flags |= LDLM_FL_CBPENDING;
         do_ast = (!lock->l_readers && !lock->l_writers);
-        l_unlock(&ns->ns_lock);
 
         if (do_ast) {
                 LDLM_DEBUG(lock, "already unused, calling "
                            "callback (%p)", lock->l_blocking_ast);
-                if (lock->l_blocking_ast != NULL)
+                if (lock->l_blocking_ast != NULL) {
+                        l_unlock(&ns->ns_lock);
                         lock->l_blocking_ast(lock, &dlm_req->lock_desc,
                                              lock->l_data, LDLM_CB_BLOCKING);
+                        l_lock(&ns->ns_lock);
+                }
         } else {
                 LDLM_DEBUG(lock, "Lock still has references, will be"
                            " cancelled later");
         }
 
         LDLM_DEBUG(lock, "client blocking callback handler END");
+        l_unlock(&ns->ns_lock);
         LDLM_LOCK_PUT(lock);
         EXIT;
 }
@@ -481,9 +631,8 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
         LIST_HEAD(ast_list);
         ENTRY;
 
-        LDLM_DEBUG(lock, "client completion callback handler START");
-
         l_lock(&ns->ns_lock);
+        LDLM_DEBUG(lock, "client completion callback handler START");
 
         /* If we receive the completion AST before the actual enqueue returned,
          * then we might need to switch lock modes, resources, or extents. */
@@ -491,9 +640,22 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
                 LDLM_DEBUG(lock, "completion AST, new lock mode");
         }
-        if (lock->l_resource->lr_type == LDLM_EXTENT)
+        if (lock->l_resource->lr_type == LDLM_EXTENT) {
                 memcpy(&lock->l_extent, &dlm_req->lock_desc.l_extent,
                        sizeof(lock->l_extent));
+
+                if ((lock->l_extent.end & ~PAGE_MASK) != ~PAGE_MASK) {
+                        /* XXX Old versions of BA OST code have a fencepost bug
+                         * which will cause them to grant a lock that's one
+                         * byte too large.  This can be safely removed after BA
+                         * ships their next release -phik (02 Apr 2003) */
+                        lock->l_extent.end--;
+                } else if ((lock->l_extent.start & ~PAGE_MASK) ==
+                           ~PAGE_MASK) {
+                        lock->l_extent.start++;
+                }
+        }
+
         ldlm_resource_unlink_lock(lock);
         if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
                    &lock->l_resource->lr_name,
@@ -505,8 +667,8 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
         lock->l_resource->lr_tmp = &ast_list;
         ldlm_grant_lock(lock, req, sizeof(*req));
         lock->l_resource->lr_tmp = NULL;
-        l_unlock(&ns->ns_lock);
         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
+        l_unlock(&ns->ns_lock);
         LDLM_LOCK_PUT(lock);
 
         ldlm_run_ast_work(&ast_list);
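
The PAGE_MASK arithmetic in the extent fencepost workaround a few lines up is easiest to see with a worked example, assuming 4 KB pages (so ~PAGE_MASK == 0xfff):

/* Worked example, 4 KB pages assumed (~PAGE_MASK == 0xfff):
 *
 *   end   = 0x2000: 0x2000 & 0xfff == 0x000, which is != 0xfff
 *                   -> end--  gives 0x1fff, the last byte of a page
 *   start = 0x0fff: 0x0fff & 0xfff == 0xfff
 *                   -> start++ gives 0x1000, the first byte of a page
 *
 * i.e. extents granted one byte too large by the old BA OST code are
 * trimmed back onto page boundaries. */
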
@@ -523,7 +685,7 @@ static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
                              &req->rq_repmsg);
         if (rc)
                 return rc;
-        return ptlrpc_reply(req->rq_svc, req);
+        return ptlrpc_reply(req);
 }
 
 static int ldlm_callback_handler(struct ptlrpc_request *req)
@@ -531,26 +693,29 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         struct ldlm_namespace *ns;
         struct ldlm_request *dlm_req;
         struct ldlm_lock *lock;
-        int rc;
         ENTRY;
 
-        rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
-        if (rc) {
-                CERROR("Invalid request: %d\n", rc);
-                RETURN(rc);
-        }
+        /* Requests arrive in sender's byte order.  The ptlrpc service
+         * handler has already checked and, if necessary, byte-swapped the
+         * incoming request message body, but I am responsible for the
+         * message buffers. */
 
         if (req->rq_export == NULL) {
                 struct ldlm_request *dlm_req;
 
-                CERROR("operation %d with bad export (ptl req %d/rep %d)\n",
-                       req->rq_reqmsg->opc, req->rq_request_portal,
-                       req->rq_reply_portal);
-                CERROR("--> export addr: "LPX64", cookie: "LPX64"\n",
-                       req->rq_reqmsg->addr, req->rq_reqmsg->cookie);
-                dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
-                CERROR("--> lock addr: "LPX64", cookie: "LPX64"\n",
-                       dlm_req->lock_handle1.addr,dlm_req->lock_handle1.cookie);
+                CDEBUG(D_RPCTRACE, "operation %d from nid "LPU64" with bad "
+                       "export cookie "LPX64" (ptl req %d/rep %d); this is "
+                       "normal if this node rebooted with a lock held\n",
+                       req->rq_reqmsg->opc, req->rq_connection->c_peer.peer_nid,
+                       req->rq_reqmsg->handle.cookie,
+                       req->rq_request_portal, req->rq_reply_portal);
+
+                dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
+                                             lustre_swab_ldlm_request);
+                if (dlm_req != NULL)
+                        CDEBUG(D_RPCTRACE, "--> lock cookie: "LPX64"\n",
+                               dlm_req->lock_handle1.cookie);
+
                 ldlm_callback_reply(req, -ENOTCONN);
                 RETURN(0);
         }
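
The new comment above states the convention this patch applies throughout the file: the ptlrpc service layer has already swabbed the message header, and each handler unpacks (and, for a cross-endian sender, byte-swaps) its own message buffers before touching them. The pattern, as used by the enqueue, convert and cancel handlers:

struct ldlm_request *dlm_req;

dlm_req = lustre_swab_reqbuf(req, 0, sizeof(*dlm_req),
                             lustre_swab_ldlm_request);
if (dlm_req == NULL) {
        CERROR("can't unpack dlm_req\n");
        RETURN(-EFAULT);     /* or reply -EPROTO, depending on handler */
}
/* dlm_req fields are now safe to read in host byte order */
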
@@ -560,7 +725,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         } else if (req->rq_reqmsg->opc == LDLM_CP_CALLBACK) {
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
         } else {
-                ldlm_callback_reply(req, -EIO);
+                ldlm_callback_reply(req, -EPROTO);
                 RETURN(0);
         }
 
@@ -569,7 +734,14 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         ns = req->rq_export->exp_obd->obd_namespace;
         LASSERT(ns != NULL);
 
-        dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+        dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
+                                      lustre_swab_ldlm_request);
+        if (dlm_req == NULL) {
+                CERROR ("can't unpack dlm_req\n");
+                ldlm_callback_reply (req, -EPROTO);
+                RETURN (0);
+        }
+        
         lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
         if (!lock) {
                 CDEBUG(D_INODE, "callback on lock "LPX64" - lock disappeared\n",
@@ -592,6 +764,8 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 CDEBUG(D_INODE, "completion ast\n");
                 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
                 break;
+        default:
+                LBUG();                         /* checked above */
         }
 
         RETURN(0);
@@ -602,27 +776,28 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
         int rc;
         ENTRY;
 
-        rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
-        if (rc) {
-                CERROR("lustre_ldlm: Invalid request: %d\n", rc);
-                RETURN(rc);
-        }
+        /* Requests arrive in sender's byte order.  The ptlrpc service
+         * handler has already checked and, if necessary, byte-swapped the
+         * incoming request message body, but I am responsible for the
+         * message buffers. */
 
         if (req->rq_export == NULL) {
                 struct ldlm_request *dlm_req;
                 CERROR("operation %d with bad export (ptl req %d/rep %d)\n",
                        req->rq_reqmsg->opc, req->rq_request_portal,
                        req->rq_reply_portal);
-                CERROR("--> export addr: "LPX64", cookie: "LPX64"\n",
-                       req->rq_reqmsg->addr, req->rq_reqmsg->cookie);
-                dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
-                ldlm_lock_dump_handle(D_ERROR, &dlm_req->lock_handle1);
+                CERROR("--> export cookie: "LPX64"\n",
+                       req->rq_reqmsg->handle.cookie);
+                dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
+                                             lustre_swab_ldlm_request);
+                if (dlm_req != NULL)
+                        ldlm_lock_dump_handle(D_ERROR, &dlm_req->lock_handle1);
                 RETURN(-ENOTCONN);
         }
 
         switch (req->rq_reqmsg->opc) {
 
-        /* XXX FIXME move this back to mds/handler.c, bug 625069 */
+        /* XXX FIXME move this back to mds/handler.c, bug 249 */
         case LDLM_CANCEL:
                 CDEBUG(D_INODE, "cancel\n");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
@@ -696,11 +871,18 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
                 RETURN(rc);
 
 #ifdef __KERNEL__
+        inter_module_register("ldlm_cli_cancel_unused", THIS_MODULE,
+                              ldlm_cli_cancel_unused);
+        inter_module_register("ldlm_namespace_cleanup", THIS_MODULE,
+                              ldlm_namespace_cleanup);
+        inter_module_register("ldlm_replay_locks", THIS_MODULE,
+                              ldlm_replay_locks);
+
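
inter_module_register() publishes these entry points by name so that other Lustre modules can reach them without a hard symbol dependency. The consumer side would look roughly like the sketch below, using the stock 2.4 inter_module interface; the type behind the returned pointer is deliberately left out here, since the exact call signatures are defined elsewhere in the tree.

const void *entry = inter_module_get("ldlm_replay_locks");

if (entry != NULL) {
        /* call through a pointer of the appropriate type ... */
        inter_module_put("ldlm_replay_locks");
}
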
         ldlm->ldlm_cb_service =
                 ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
                                 LDLM_MAXREQSIZE, LDLM_CB_REQUEST_PORTAL,
                                 LDLM_CB_REPLY_PORTAL,
-                                ldlm_callback_handler, "ldlm_cbd");
+                                ldlm_callback_handler, "ldlm_cbd", obddev);
 
         if (!ldlm->ldlm_cb_service) {
                 CERROR("failed to start service\n");
@@ -711,7 +893,7 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
                 ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
                                 LDLM_MAXREQSIZE, LDLM_CANCEL_REQUEST_PORTAL,
                                 LDLM_CANCEL_REPLY_PORTAL,
-                                ldlm_cancel_handler, "ldlm_canceld");
+                                ldlm_cancel_handler, "ldlm_canceld", obddev);
 
         if (!ldlm->ldlm_cancel_service) {
                 CERROR("failed to start service\n");
@@ -741,12 +923,26 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
                 }
         }
 
-#endif
+        INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
+        spin_lock_init(&expired_lock_thread.elt_lock);
+        expired_lock_thread.elt_state = ELT_STOPPED;
+        init_waitqueue_head(&expired_lock_thread.elt_waitq);
+
+        rc = kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FS);
+        if (rc < 0) {
+                CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
+                GOTO(out_thread, rc);
+        }
+
+        wait_event(expired_lock_thread.elt_waitq,
+                   expired_lock_thread.elt_state == ELT_READY);
+
         INIT_LIST_HEAD(&waiting_locks_list);
         spin_lock_init(&waiting_locks_spinlock);
         waiting_locks_timer.function = waiting_locks_callback;
         waiting_locks_timer.data = 0;
         init_timer(&waiting_locks_timer);
+#endif
 
         ldlm_already_setup = 1;
 
@@ -765,30 +961,49 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
         return rc;
 }
 
-static int ldlm_cleanup(struct obd_device *obddev)
+static int ldlm_cleanup(struct obd_device *obddev, int force, int failover)
 {
         struct ldlm_obd *ldlm = &obddev->u.ldlm;
         ENTRY;
 
         if (!list_empty(&ldlm_namespace_list)) {
                 CERROR("ldlm still has namespaces; clean these up first.\n");
+                ldlm_dump_all_namespaces();
                 RETURN(-EBUSY);
         }
 
 #ifdef __KERNEL__
+        if (force) {
+                ptlrpc_put_ldlm_hooks();
+        } else if (ptlrpc_ldlm_hooks_referenced()) {
+                CERROR("Some connections weren't cleaned up; run lconf with "
+                       "--force to forcibly unload.\n");
+                ptlrpc_dump_connections();
+                RETURN(-EBUSY);
+        }
+
         ptlrpc_stop_all_threads(ldlm->ldlm_cb_service);
         ptlrpc_unregister_service(ldlm->ldlm_cb_service);
         ptlrpc_stop_all_threads(ldlm->ldlm_cancel_service);
         ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
         ldlm_proc_cleanup(obddev);
+
+        expired_lock_thread.elt_state = ELT_TERMINATE;
+        wake_up(&expired_lock_thread.elt_waitq);
+        wait_event(expired_lock_thread.elt_waitq,
+                   expired_lock_thread.elt_state == ELT_STOPPED);
+
+        inter_module_unregister("ldlm_namespace_cleanup");
+        inter_module_unregister("ldlm_cli_cancel_unused");
+        inter_module_unregister("ldlm_replay_locks");
 #endif
+
         ldlm_already_setup = 0;
         RETURN(0);
 }
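
Setup and cleanup now bracket the expired-lock thread with a small state-machine handshake: ldlm_setup() spawns it and waits for ELT_READY, while ldlm_cleanup() asks it to exit and waits for ELT_STOPPED. Condensed from the two hunks above (error handling dropped):

/* start (ldlm_setup) */
expired_lock_thread.elt_state = ELT_STOPPED;
kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FS);
wait_event(expired_lock_thread.elt_waitq,
           expired_lock_thread.elt_state == ELT_READY);

/* stop (ldlm_cleanup) */
expired_lock_thread.elt_state = ELT_TERMINATE;
wake_up(&expired_lock_thread.elt_waitq);
wait_event(expired_lock_thread.elt_waitq,
           expired_lock_thread.elt_state == ELT_STOPPED);
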
 
 static int ldlm_connect(struct lustre_handle *conn, struct obd_device *src,
-                        struct obd_uuid *cluuid, struct recovd_obd *recovd,
-                        ptlrpc_recovery_cb_t recover)
+                        struct obd_uuid *cluuid)
 {
         return class_connect(conn, src, cluuid);
 }
@@ -896,6 +1111,18 @@ EXPORT_SYMBOL(ldlm_namespace_dump);
 EXPORT_SYMBOL(l_lock);
 EXPORT_SYMBOL(l_unlock);
 
+/* ldlm_lib.c */
+EXPORT_SYMBOL(client_import_connect);
+EXPORT_SYMBOL(client_import_disconnect);
+EXPORT_SYMBOL(target_abort_recovery);
+EXPORT_SYMBOL(target_handle_connect);
+EXPORT_SYMBOL(target_cancel_recovery_timer);
+EXPORT_SYMBOL(target_send_reply);
+EXPORT_SYMBOL(target_queue_recovery_request);
+EXPORT_SYMBOL(target_handle_ping);
+EXPORT_SYMBOL(target_handle_disconnect);
+EXPORT_SYMBOL(target_queue_final_reply);
+
 #ifdef __KERNEL__
 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");