Whamcloud - gitweb
r=adilger,phil
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index 016a471..6602713 100644 (file)
@@ -21,7 +21,9 @@
  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  */
 
-#define EXPORT_SYMTAB
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
 #define DEBUG_SUBSYSTEM S_LDLM
 
 #ifdef __KERNEL__
@@ -35,6 +37,9 @@
 
 #include <linux/lustre_dlm.h>
 #include <linux/obd_class.h>
+#include <portals/list.h>
+#include "ldlm_internal.h"
+
 extern kmem_cache_t *ldlm_resource_slab;
 extern kmem_cache_t *ldlm_lock_slab;
 extern struct lustre_lock ldlm_handle_lock;
@@ -42,15 +47,19 @@ extern struct list_head ldlm_namespace_list;
 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
 
-static int ldlm_already_setup = 0;
+static DECLARE_MUTEX(ldlm_ref_sem);
+static int ldlm_refcount = 0;
 
-#ifdef __KERNEL__
+/* LDLM state */
+
+static struct ldlm_state *ldlm_state;
 
 inline unsigned long round_timeout(unsigned long timeout)
 {
         return ((timeout / HZ) + 1) * HZ;
 }
 
+#ifdef __KERNEL__
 /* XXX should this be per-ldlm? */
 static struct list_head waiting_locks_list;
 static spinlock_t waiting_locks_spinlock;
@@ -62,11 +71,29 @@ static struct expired_lock_thread {
         struct list_head          elt_expired_locks;
         spinlock_t                elt_lock;
 } expired_lock_thread;
+#endif
 
 #define ELT_STOPPED   0
 #define ELT_READY     1
 #define ELT_TERMINATE 2
 
+struct ldlm_bl_pool {
+        spinlock_t              blp_lock;
+        struct list_head        blp_list;
+        wait_queue_head_t       blp_waitq;
+        atomic_t                blp_num_threads;
+        struct completion       blp_comp;
+};
+
+struct ldlm_bl_work_item {
+        struct list_head        blwi_entry;
+        struct ldlm_namespace   *blwi_ns;
+        struct ldlm_lock_desc   blwi_ld;
+        struct ldlm_lock        *blwi_lock;
+};
+
+#ifdef __KERNEL__
+
 static inline int have_expired_locks(void)
 {
         int need_to_run;
@@ -87,18 +114,19 @@ static int expired_lock_main(void *arg)
         ENTRY;
         lock_kernel();
         kportal_daemonize("ldlm_elt");
-        
+
         SIGNAL_MASK_LOCK(current, flags);
         sigfillset(&current->blocked);
         RECALC_SIGPENDING;
         SIGNAL_MASK_UNLOCK(current, flags);
-        
+
         unlock_kernel();
-        
+
         expired_lock_thread.elt_state = ELT_READY;
         wake_up(&expired_lock_thread.elt_waitq);
-        
+
         while (1) {
+                struct list_head *tmp, *n, work_list;
                 l_wait_event(expired_lock_thread.elt_waitq,
                              have_expired_locks() ||
                              expired_lock_thread.elt_state == ELT_TERMINATE,
@@ -106,12 +134,32 @@ static int expired_lock_main(void *arg)
 
                 spin_lock_bh(&expired_lock_thread.elt_lock);
                 while (!list_empty(expired)) {
-                        struct ldlm_lock *lock = list_entry(expired->next,
-                                                            struct ldlm_lock,
-                                                            l_pending_chain);
+                        struct ldlm_lock *lock;
+
+                        list_add(&work_list, expired);
+                        list_del_init(expired);
+
+                        list_for_each_entry(lock, &work_list, l_pending_chain) {
+                                LDLM_DEBUG(lock, "moving to work list");
+                        }
+
                         spin_unlock_bh(&expired_lock_thread.elt_lock);
-                        
-                        ptlrpc_fail_export(lock->l_export);
+
+
+                        list_for_each_safe(tmp, n, &work_list) {
+                                 lock = list_entry(tmp, struct ldlm_lock,
+                                                   l_pending_chain);
+                                 ptlrpc_fail_export(lock->l_export);
+                        }
+
+
+                        if (!list_empty(&work_list)) {
+                                list_for_each_entry(lock, &work_list, l_pending_chain) {
+                                        LDLM_ERROR(lock, "still on work list!");
+                                }
+                        }
+                        LASSERTF (list_empty(&work_list),
+                                  "some exports not failed properly\n");
 
                         spin_lock_bh(&expired_lock_thread.elt_lock);
                 }
@@ -129,6 +177,7 @@ static int expired_lock_main(void *arg)
 static void waiting_locks_callback(unsigned long unused)
 {
         struct ldlm_lock *lock;
+        char str[PTL_NALFMT_SIZE];
 
         spin_lock_bh(&waiting_locks_spinlock);
         while (!list_empty(&waiting_locks_list)) {
@@ -139,10 +188,13 @@ static void waiting_locks_callback(unsigned long unused)
                         break;
 
                 LDLM_ERROR(lock, "lock callback timer expired: evicting client "
-                           "%s@%s nid "LPU64,
+                           "%s@%s nid "LPX64" (%s) ",
                            lock->l_export->exp_client_uuid.uuid,
                            lock->l_export->exp_connection->c_remote_uuid.uuid,
-                           lock->l_export->exp_connection->c_peer.peer_nid);
+                           lock->l_export->exp_connection->c_peer.peer_nid,
+                           portals_nid2str(lock->l_export->exp_connection->c_peer.peer_ni->pni_number,
+                                           lock->l_export->exp_connection->c_peer.peer_nid,
+                                           str));
 
                 spin_lock_bh(&expired_lock_thread.elt_lock);
                 list_del(&lock->l_pending_chain);
@@ -152,6 +204,17 @@ static void waiting_locks_callback(unsigned long unused)
                 wake_up(&expired_lock_thread.elt_waitq);
         }
 
+        /*
+         * Make sure the timer will fire again if we have any locks
+         * left.
+         */
+        if (!list_empty(&waiting_locks_list)) {
+                unsigned long timeout_rounded;
+                lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
+                                  l_pending_chain);
+                timeout_rounded = round_timeout(lock->l_callback_timeout);
+                mod_timer(&waiting_locks_timer, timeout_rounded);
+        }
         spin_unlock_bh(&waiting_locks_spinlock);
 }
 
@@ -165,10 +228,14 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
 {
         unsigned long timeout_rounded;
 
+        spin_lock_bh(&waiting_locks_spinlock);
+        if (!list_empty(&lock->l_pending_chain)) {
+                LDLM_DEBUG(lock, "not re-adding to wait list");
+                spin_unlock_bh(&waiting_locks_spinlock);
+                return 0;
+        }
         LDLM_DEBUG(lock, "adding to wait list");
-        LASSERT(list_empty(&lock->l_pending_chain));
 
-        spin_lock_bh(&waiting_locks_spinlock);
         lock->l_callback_timeout = jiffies + (obd_timeout * HZ / 2);
 
         timeout_rounded = round_timeout(lock->l_callback_timeout);
@@ -219,7 +286,11 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock)
                                   round_timeout(next->l_callback_timeout));
                 }
         }
+
+        spin_lock_bh(&expired_lock_thread.elt_lock);
         list_del_init(&lock->l_pending_chain);
+        spin_unlock_bh(&expired_lock_thread.elt_lock);
+
         spin_unlock_bh(&waiting_locks_spinlock);
         LDLM_DEBUG(lock, "removed");
         return 1;
@@ -241,15 +312,19 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock)
 
 static void ldlm_failed_ast(struct ldlm_lock *lock, int rc, char *ast_type)
 {
+        const struct ptlrpc_connection *conn = lock->l_export->exp_connection;
+        char str[PTL_NALFMT_SIZE];
+
         CERROR("%s AST failed (%d) for res "LPU64"/"LPU64
-               ", mode %s: evicting client %s@%s NID "LPU64"\n",
+               ", mode %s: evicting client %s@%s NID "LPX64" (%s)\n",
                ast_type, rc,
                lock->l_resource->lr_name.name[0],
                lock->l_resource->lr_name.name[1],
                ldlm_lockname[lock->l_granted_mode],
                lock->l_export->exp_client_uuid.uuid,
-               lock->l_export->exp_connection->c_remote_uuid.uuid,
-               lock->l_export->exp_connection->c_peer.peer_nid);
+               conn->c_remote_uuid.uuid, conn->c_peer.peer_nid,
+               portals_nid2str(conn->c_peer.peer_ni->pni_number,
+                               conn->c_peer.peer_nid, str));
         ptlrpc_fail_export(lock->l_export);
 }
 
@@ -270,11 +345,11 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         LASSERT(lock);
 
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
-        /* XXX This is necessary because, with the lock re-tasking, we actually
-         * _can_ get called in here twice.  (bug 830) */
-        if (!list_empty(&lock->l_pending_chain)) {
+        if (lock->l_granted_mode != lock->l_req_mode) {
+                /* this blocking AST will be communicated as part of the
+                 * completion AST instead */
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
-                RETURN(0);
+                LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");                 RETURN(0);
         }
 
         if (lock->l_destroyed) {
@@ -286,51 +361,67 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
 #if 0
         if (LTIME_S(CURRENT_TIME) - lock->l_export->exp_last_request_time > 30){
                 ldlm_failed_ast(lock, -ETIMEDOUT, "Not-attempted blocking");
+                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                 RETURN(-ETIMEDOUT);
         }
 #endif
 
-        req = ptlrpc_prep_req(lock->l_export->exp_ldlm_data.led_import,
+        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                               LDLM_BL_CALLBACK, 1, &size, NULL);
-        if (!req)
+        if (req == NULL) {
+                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                 RETURN(-ENOMEM);
+        }
 
         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
         memcpy(&body->lock_handle1, &lock->l_remote_handle,
                sizeof(body->lock_handle1));
         memcpy(&body->lock_desc, desc, sizeof(*desc));
+        body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS);
 
         LDLM_DEBUG(lock, "server preparing blocking AST");
         req->rq_replen = lustre_msg_size(0, NULL);
 
-        ldlm_add_waiting_lock(lock);
+        if (lock->l_granted_mode == lock->l_req_mode)
+                ldlm_add_waiting_lock(lock);
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
-        req->rq_level = LUSTRE_CONN_RECOVER;
+        req->rq_send_state = LUSTRE_IMP_FULL;
         req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
         rc = ptlrpc_queue_wait(req);
-        if (rc == -ETIMEDOUT || rc == -EINTR) {
-                ldlm_del_waiting_lock(lock);
-                ldlm_failed_ast(lock, rc, "blocking");
+        if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
+                LASSERT(lock->l_export);
+                if (lock->l_export->exp_libclient) {
+                        CDEBUG(D_HA, "BLOCKING AST to liblustre client (nid "
+                               LPU64") timeout, simply cancel lock 0x%p\n",
+                               req->rq_peer.peer_nid, lock);
+                        ldlm_lock_cancel(lock);
+                        rc = -ERESTART;
+                } else {
+                        ldlm_del_waiting_lock(lock);
+                        ldlm_failed_ast(lock, rc, "blocking");
+                }
         } else if (rc) {
                 if (rc == -EINVAL)
                         CDEBUG(D_DLMTRACE, "client (nid "LPU64") returned %d "
                                "from blocking AST for lock %p--normal race\n",
-                               req->rq_connection->c_peer.peer_nid,
+                               req->rq_peer.peer_nid,
                                req->rq_repmsg->status, lock);
                 else if (rc == -ENOTCONN)
                         CDEBUG(D_DLMTRACE, "client (nid "LPU64") returned %d "
                                "from blocking AST for lock %p--this client was "
                                "probably rebooted while it held a lock, nothing"
-                               " serious\n",req->rq_connection->c_peer.peer_nid,
+                               " serious\n",req->rq_peer.peer_nid,
                                req->rq_repmsg->status, lock);
                 else
                         CDEBUG(D_ERROR, "client (nid "LPU64") returned %d "
                                "from blocking AST for lock %p\n",
-                               req->rq_connection->c_peer.peer_nid,
-                               req->rq_repmsg->status, lock);
-                LDLM_DEBUG(lock, "client returned error %d from blocking AST",
-                           req->rq_status);
+                               req->rq_peer.peer_nid,
+                               (req->rq_repmsg != NULL)?
+                               req->rq_repmsg->status : 0,
+                               lock);
+                LDLM_DEBUG(lock, "client sent rc %d rq_status %d from blocking "
+                           "AST", rc, req->rq_status);
                 ldlm_lock_cancel(lock);
                 /* Server-side AST functions are called from ldlm_reprocess_all,
                  * which needs to be told to please restart its reprocessing. */
@@ -355,13 +446,10 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         struct ptlrpc_request *req;
         struct timeval granted_time;
         long total_enqueue_wait;
-        int rc = 0, size = sizeof(*body);
+        int rc = 0, size[2] = {sizeof(*body)}, buffers = 1;
         ENTRY;
 
-        if (lock == NULL) {
-                LBUG();
-                RETURN(-EINVAL);
-        }
+        LASSERT(lock != NULL);
 
         do_gettimeofday(&granted_time);
         total_enqueue_wait = timeval_sub(&granted_time, &lock->l_enqueued_time);
@@ -369,9 +457,14 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         if (total_enqueue_wait / 1000000 > obd_timeout)
                 LDLM_ERROR(lock, "enqueue wait took %ldus", total_enqueue_wait);
 
-        req = ptlrpc_prep_req(lock->l_export->exp_ldlm_data.led_import,
-                              LDLM_CP_CALLBACK, 1, &size, NULL);
-        if (!req)
+        if (lock->l_resource->lr_lvb_len) {
+                buffers = 2;
+                size[1] = lock->l_resource->lr_lvb_len;
+        }
+
+        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
+                              LDLM_CP_CALLBACK, buffers, size, NULL);
+        if (req == NULL)
                 RETURN(-ENOMEM);
 
         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
@@ -380,21 +473,38 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         body->lock_flags = flags;
         ldlm_lock2desc(lock, &body->lock_desc);
 
+        if (buffers == 2) {
+                void *lvb = lustre_msg_buf(req->rq_reqmsg, 1,
+                                           lock->l_resource->lr_lvb_len);
+                memcpy(lvb, lock->l_resource->lr_lvb_data,
+                       lock->l_resource->lr_lvb_len);
+        }
+
         LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)",
                    total_enqueue_wait);
         req->rq_replen = lustre_msg_size(0, NULL);
 
-        req->rq_level = LUSTRE_CONN_RECOVER;
+        req->rq_send_state = LUSTRE_IMP_FULL;
         req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
+
+        /* We only send real blocking ASTs after the lock is granted */
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        if (lock->l_flags & LDLM_FL_AST_SENT) {
+                body->lock_flags |= LDLM_FL_AST_SENT;
+                ldlm_add_waiting_lock(lock); /* start the lock-timeout clock */
+        }
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+
         rc = ptlrpc_queue_wait(req);
-        if (rc == -ETIMEDOUT || rc == -EINTR) {
+        if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
                 ldlm_del_waiting_lock(lock);
                 ldlm_failed_ast(lock, rc, "completion");
+        } else if (rc == -EINVAL) {
+                LDLM_DEBUG(lock, "lost the race -- client no longer has this "
+                           "lock");
         } else if (rc) {
-                CERROR("client returned %d from completion AST for lock %p\n",
-                       req->rq_status, lock);
-                LDLM_DEBUG(lock, "client returned error %d from completion AST",
-                           req->rq_status);
+                LDLM_ERROR(lock, "client sent rc %d rq_status %d from "
+                           "completion AST", rc, req->rq_status);
                 ldlm_lock_cancel(lock);
                 /* Server-side AST functions are called from ldlm_reprocess_all,
                  * which needs to be told to please restart its reprocessing. */
@@ -405,16 +515,64 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         RETURN(rc);
 }
 
+int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
+{
+        struct ldlm_resource *res = lock->l_resource;
+        struct ldlm_request *body;
+        struct ptlrpc_request *req;
+        int rc = 0, size = sizeof(*body);
+        ENTRY;
+
+        LASSERT(lock != NULL);
+
+        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
+                              LDLM_GL_CALLBACK, 1, &size, NULL);
+        if (req == NULL)
+                RETURN(-ENOMEM);
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
+        memcpy(&body->lock_handle1, &lock->l_remote_handle,
+               sizeof(body->lock_handle1));
+        ldlm_lock2desc(lock, &body->lock_desc);
+
+        size = lock->l_resource->lr_lvb_len;
+        req->rq_replen = lustre_msg_size(1, &size);
+
+        req->rq_send_state = LUSTRE_IMP_FULL;
+        req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
+
+        rc = ptlrpc_queue_wait(req);
+        if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
+                ldlm_del_waiting_lock(lock);
+                ldlm_failed_ast(lock, rc, "glimpse");
+        } else if (rc == -EINVAL) {
+                LDLM_DEBUG(lock, "lost the race -- client no longer has this "
+                           "lock");
+        } else if (rc == -ELDLM_NO_LOCK_DATA) {
+                LDLM_DEBUG(lock, "lost a race -- client has a lock, but no "
+                           "inode");
+        } else if (rc) {
+                LDLM_ERROR(lock, "client sent rc %d rq_status %d from "
+                           "glimpse AST", rc, req->rq_status);
+        } else {
+                rc = res->lr_namespace->ns_lvbo->lvbo_update
+                        (res, req->rq_repmsg, 0, 1);
+        }
+        ptlrpc_req_finished(req);
+        RETURN(rc);
+}
+
 int ldlm_handle_enqueue(struct ptlrpc_request *req,
                         ldlm_completion_callback completion_callback,
-                        ldlm_blocking_callback blocking_callback)
+                        ldlm_blocking_callback blocking_callback,
+                        ldlm_glimpse_callback glimpse_callback)
 {
         struct obd_device *obddev = req->rq_export->exp_obd;
         struct ldlm_reply *dlm_rep;
         struct ldlm_request *dlm_req;
-        int rc, size = sizeof(*dlm_rep), cookielen = 0;
+        int rc = 0, size[2] = {sizeof(*dlm_rep)};
         __u32 flags;
-        ldlm_error_t err;
+        ldlm_error_t err = ELDLM_OK;
         struct ldlm_lock *lock = NULL;
         void *cookie = NULL;
         ENTRY;
@@ -425,38 +583,20 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
                                       lustre_swab_ldlm_request);
         if (dlm_req == NULL) {
                 CERROR ("Can't unpack dlm_req\n");
-                RETURN (-EFAULT);
+                GOTO(out, rc = -EFAULT);
         }
-        
+
         flags = dlm_req->lock_flags;
-        if (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN &&
-            (flags & LDLM_FL_HAS_INTENT)) {
-                /* In this case, the reply buffer is allocated deep in
-                 * local_lock_enqueue by the policy function. */
-                cookie = req;
-                cookielen = sizeof(*req);
-        } else {
-                rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
-                                     &req->rq_repmsg);
-                if (rc) {
-                        CERROR("out of memory\n");
-                        RETURN(-ENOMEM);
-                }
-                if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
-                        cookie = &dlm_req->lock_desc.l_extent;
-                        cookielen = sizeof(struct ldlm_extent);
-                }
-        }
 
         /* The lock's callback data might be set in the policy function */
-        lock = ldlm_lock_create(obddev->obd_namespace,
-                                &dlm_req->lock_handle2,
+        lock = ldlm_lock_create(obddev->obd_namespace, &dlm_req->lock_handle2,
                                 dlm_req->lock_desc.l_resource.lr_name,
                                 dlm_req->lock_desc.l_resource.lr_type,
                                 dlm_req->lock_desc.l_req_mode,
-                                blocking_callback, NULL);
+                                blocking_callback, completion_callback,
+                                glimpse_callback, NULL, 0);
         if (!lock)
-                GOTO(out, err = -ENOMEM);
+                GOTO(out, rc = -ENOMEM);
 
         do_gettimeofday(&lock->l_enqueued_time);
         memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
@@ -470,41 +610,79 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
                  &lock->l_export->exp_ldlm_data.led_held_locks);
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
-        err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, cookielen,
-                                &flags, completion_callback);
+        if (flags & LDLM_FL_HAS_INTENT) {
+                /* In this case, the reply buffer is allocated deep in
+                 * local_lock_enqueue by the policy function. */
+                cookie = req;
+        } else {
+                int buffers = 1;
+                if (lock->l_resource->lr_lvb_len) {
+                        size[1] = lock->l_resource->lr_lvb_len;
+                        buffers = 2;
+                }
+
+                rc = lustre_pack_reply(req, buffers, size, NULL);
+                if (rc)
+                        GOTO(out, rc);
+        }
+
+        if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
+                memcpy(&lock->l_policy_data, &dlm_req->lock_desc.l_policy_data,
+                       sizeof(ldlm_policy_data_t));
+        if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
+                memcpy(&lock->l_req_extent, &lock->l_policy_data.l_extent,
+                       sizeof(lock->l_req_extent));
+
+        err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, &flags);
         if (err)
                 GOTO(out, err);
 
         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
         dlm_rep->lock_flags = flags;
 
+        ldlm_lock2desc(lock, &dlm_rep->lock_desc);
         ldlm_lock2handle(lock, &dlm_rep->lock_handle);
-        if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
-                memcpy(&dlm_rep->lock_extent, &lock->l_extent,
-                       sizeof(lock->l_extent));
-        if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED) {
-                memcpy(&dlm_rep->lock_resource_name, &lock->l_resource->lr_name,
-                       sizeof(dlm_rep->lock_resource_name));
-                dlm_rep->lock_mode = lock->l_req_mode;
+
+        /* We never send a blocking AST until the lock is granted, but
+         * we can tell it right now */
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        if (lock->l_flags & LDLM_FL_AST_SENT) {
+                dlm_rep->lock_flags |= LDLM_FL_AST_SENT;
+                if (lock->l_granted_mode == lock->l_req_mode)
+                        ldlm_add_waiting_lock(lock);
         }
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
         EXIT;
  out:
-        if (lock)
-                LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
-                           "(err=%d)", err);
         req->rq_status = err;
+        if (req->rq_reply_state == NULL) {
+                err = lustre_pack_reply(req, 0, NULL, NULL);
+                if (rc == 0)
+                        rc = err;
+        }
 
         /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
          * ldlm_reprocess_all.  If this moves, revisit that code. -phil */
         if (lock) {
-                if (!err)
+                LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
+                           "(err=%d, rc=%d)", err, rc);
+
+                if (lock->l_resource->lr_lvb_len > 0) {
+                        void *lvb = lustre_msg_buf(req->rq_repmsg, 1,
+                                                  lock->l_resource->lr_lvb_len);
+                        memcpy(lvb, lock->l_resource->lr_lvb_data,
+                               lock->l_resource->lr_lvb_len);
+                }
+
+                if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
                         ldlm_reprocess_all(lock->l_resource);
                 LDLM_LOCK_PUT(lock);
         }
-        LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock);
+        LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p, rc %d)",
+                          lock, rc);
 
-        return 0;
+        return rc;
 }
 
 int ldlm_handle_convert(struct ptlrpc_request *req)
@@ -521,8 +699,8 @@ int ldlm_handle_convert(struct ptlrpc_request *req)
                 CERROR ("Can't unpack dlm_req\n");
                 RETURN (-EFAULT);
         }
-        
-        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+
+        rc = lustre_pack_reply(req, 1, &size, NULL);
         if (rc) {
                 CERROR("out of memory\n");
                 RETURN(-ENOMEM);
@@ -556,17 +734,19 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
 {
         struct ldlm_request *dlm_req;
         struct ldlm_lock *lock;
+        struct ldlm_resource *res;
+        char str[PTL_NALFMT_SIZE];
         int rc;
         ENTRY;
 
-        dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
+        dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
                                       lustre_swab_ldlm_request);
         if (dlm_req == NULL) {
                 CERROR("bad request buffer for cancel\n");
                 RETURN(-EFAULT);
         }
 
-        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
+        rc = lustre_pack_reply(req, 0, NULL, NULL);
         if (rc) {
                 CERROR("out of memory\n");
                 RETURN(-ENOMEM);
@@ -575,18 +755,30 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
         if (!lock) {
                 CERROR("received cancel for unknown lock cookie "LPX64
-                       " from nid "LPU64"\n", dlm_req->lock_handle1.cookie,
-                       req->rq_connection->c_peer.peer_nid);
+                       " from client %s nid "LPX64" (%s)\n",
+                       dlm_req->lock_handle1.cookie,
+                       req->rq_export->exp_client_uuid.uuid,
+                       req->rq_peer.peer_nid,
+                       portals_nid2str(req->rq_peer.peer_ni->pni_number,
+                                       req->rq_peer.peer_nid, str));
                 LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock "
                                   "(cookie "LPU64")",
                                   dlm_req->lock_handle1.cookie);
                 req->rq_status = ESTALE;
         } else {
                 LDLM_DEBUG(lock, "server-side cancel handler START");
+                res = lock->l_resource;
+                if (res && res->lr_namespace->ns_lvbo &&
+                    res->lr_namespace->ns_lvbo->lvbo_update) {
+                        (void)res->lr_namespace->ns_lvbo->lvbo_update
+                                (res, NULL, 0, 0);
+                                //(res, req->rq_reqmsg, 1);
+                }
+
                 ldlm_lock_cancel(lock);
                 if (ldlm_del_waiting_lock(lock))
                         CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
-                req->rq_status = 0;
+                req->rq_status = rc;
         }
 
         if (ptlrpc_reply(req) != 0)
@@ -601,9 +793,8 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-static void ldlm_handle_bl_callback(struct ptlrpc_request *req,
-                                    struct ldlm_namespace *ns,
-                                    struct ldlm_request *dlm_req,
+static void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
+                                    struct ldlm_lock_desc *ld,
                                     struct ldlm_lock *lock)
 {
         int do_ast;
@@ -620,8 +811,9 @@ static void ldlm_handle_bl_callback(struct ptlrpc_request *req,
                            "callback (%p)", lock->l_blocking_ast);
                 if (lock->l_blocking_ast != NULL) {
                         l_unlock(&ns->ns_lock);
-                        lock->l_blocking_ast(lock, &dlm_req->lock_desc,
-                                             lock->l_data, LDLM_CB_BLOCKING);
+                        l_check_no_ns_lock(ns);
+                        lock->l_blocking_ast(lock, ld, lock->l_ast_data,
+                                             LDLM_CB_BLOCKING);
                         l_lock(&ns->ns_lock);
                 }
         } else {
@@ -652,9 +844,12 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
                 LDLM_DEBUG(lock, "completion AST, new lock mode");
         }
-        if (lock->l_resource->lr_type == LDLM_EXTENT)
-                memcpy(&lock->l_extent, &dlm_req->lock_desc.l_extent,
-                       sizeof(lock->l_extent));
+
+        if (lock->l_resource->lr_type != LDLM_PLAIN) {
+                memcpy(&lock->l_policy_data, &dlm_req->lock_desc.l_policy_data,
+                       sizeof(lock->l_policy_data));
+                LDLM_DEBUG(lock, "completion AST, new policy data");
+        }
 
         ldlm_resource_unlink_lock(lock);
         if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
@@ -664,35 +859,121 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                                          dlm_req->lock_desc.l_resource.lr_name);
                 LDLM_DEBUG(lock, "completion AST, new resource");
         }
+
+        if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
+                lock->l_flags |= LDLM_FL_CBPENDING;
+                LDLM_DEBUG(lock, "completion AST includes blocking AST");
+        }
+
+        if (lock->l_lvb_len) {
+                void *lvb;
+                lvb = lustre_swab_reqbuf(req, 1, lock->l_lvb_len,
+                                         lock->l_lvb_swabber);
+                if (lvb == NULL) {
+                        LDLM_ERROR(lock, "completion AST did not contain "
+                                   "expected LVB!");
+                } else {
+                        memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len);
+                }
+        }
+
         lock->l_resource->lr_tmp = &ast_list;
-        ldlm_grant_lock(lock, req, sizeof(*req));
+        ldlm_grant_lock(lock, req, sizeof(*req), 1);
         lock->l_resource->lr_tmp = NULL;
         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
         l_unlock(&ns->ns_lock);
         LDLM_LOCK_PUT(lock);
 
-        ldlm_run_ast_work(&ast_list);
+        ldlm_run_ast_work(ns, &ast_list);
 
         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
                           lock);
         EXIT;
 }
 
+static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
+                                    struct ldlm_namespace *ns,
+                                    struct ldlm_request *dlm_req,
+                                    struct ldlm_lock *lock)
+{
+        int rc = -ENOSYS;
+        ENTRY;
+
+        l_lock(&ns->ns_lock);
+        LDLM_DEBUG(lock, "client glimpse AST callback handler");
+
+        if (lock->l_glimpse_ast != NULL) {
+                l_unlock(&ns->ns_lock);
+                l_check_no_ns_lock(ns);
+                rc = lock->l_glimpse_ast(lock, req);
+                l_lock(&ns->ns_lock);
+        }
+
+        if (req->rq_repmsg != NULL) {
+                ptlrpc_reply(req);
+        } else {
+                req->rq_status = rc;
+                ptlrpc_error(req);
+        }
+
+        if (lock->l_granted_mode == LCK_PW &&
+            !lock->l_readers && !lock->l_writers &&
+            time_after(jiffies, lock->l_last_used + 10 * HZ)) {
+                l_unlock(&ns->ns_lock);
+                ldlm_handle_bl_callback(ns, NULL, lock);
+                EXIT;
+                return;
+        }
+
+        l_unlock(&ns->ns_lock);
+        LDLM_LOCK_PUT(lock);
+        EXIT;
+}
+
 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
 {
         req->rq_status = rc;
-        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
-                             &req->rq_repmsg);
-        if (rc)
-                return rc;
+        if (req->rq_reply_state == NULL) {
+                rc = lustre_pack_reply(req, 0, NULL, NULL);
+                if (rc)
+                        return rc;
+        }
         return ptlrpc_reply(req);
 }
 
+#ifdef __KERNEL__
+int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
+                      struct ldlm_lock *lock)
+{
+        struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
+        struct ldlm_bl_work_item *blwi;
+        ENTRY;
+
+        OBD_ALLOC(blwi, sizeof(*blwi));
+        if (blwi == NULL)
+                RETURN(-ENOMEM);
+
+        blwi->blwi_ns = ns;
+        if (ld != NULL)
+                blwi->blwi_ld = *ld;
+        blwi->blwi_lock = lock;
+
+        spin_lock(&blp->blp_lock);
+        list_add_tail(&blwi->blwi_entry, &blp->blp_list);
+        wake_up(&blp->blp_waitq);
+        spin_unlock(&blp->blp_lock);
+
+        RETURN(0);
+}
+#endif
+
 static int ldlm_callback_handler(struct ptlrpc_request *req)
 {
         struct ldlm_namespace *ns;
         struct ldlm_request *dlm_req;
         struct ldlm_lock *lock;
+        char str[PTL_NALFMT_SIZE];
+        int rc;
         ENTRY;
 
         /* Requests arrive in sender's byte order.  The ptlrpc service
@@ -703,10 +984,12 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         if (req->rq_export == NULL) {
                 struct ldlm_request *dlm_req;
 
-                CDEBUG(D_RPCTRACE, "operation %d from nid "LPU64" with bad "
+                CDEBUG(D_RPCTRACE, "operation %d from nid "LPX64" (%s) with bad "
                        "export cookie "LPX64" (ptl req %d/rep %d); this is "
                        "normal if this node rebooted with a lock held\n",
-                       req->rq_reqmsg->opc, req->rq_connection->c_peer.peer_nid,
+                       req->rq_reqmsg->opc, req->rq_peer.peer_nid,
+                       portals_nid2str(req->rq_peer.peer_ni->pni_number,
+                                       req->rq_peer.peer_nid, str),
                        req->rq_reqmsg->handle.cookie,
                        req->rq_request_portal, req->rq_reply_portal);
 
@@ -720,17 +1003,50 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 RETURN(0);
         }
 
-        if (req->rq_reqmsg->opc == LDLM_BL_CALLBACK) {
+        LASSERT(req->rq_export != NULL);
+        LASSERT(req->rq_export->exp_obd != NULL);
+
+        switch(req->rq_reqmsg->opc) {
+        case LDLM_BL_CALLBACK:
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
-        } else if (req->rq_reqmsg->opc == LDLM_CP_CALLBACK) {
+                break;
+        case LDLM_CP_CALLBACK:
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
-        } else {
+                break;
+        case LDLM_GL_CALLBACK:
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_GL_CALLBACK, 0);
+                break;
+        case OBD_LOG_CANCEL:
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
+                rc = llog_origin_handle_cancel(req);
+                ldlm_callback_reply(req, rc);
+                RETURN(0);
+        case LLOG_ORIGIN_HANDLE_CREATE:
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                rc = llog_origin_handle_create(req);
+                ldlm_callback_reply(req, rc);
+                RETURN(0);
+        case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                rc = llog_origin_handle_next_block(req);
+                ldlm_callback_reply(req, rc);
+                RETURN(0);
+        case LLOG_ORIGIN_HANDLE_READ_HEADER:
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                rc = llog_origin_handle_read_header(req);
+                ldlm_callback_reply(req, rc);
+                RETURN(0);
+        case LLOG_ORIGIN_HANDLE_CLOSE:
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                rc = llog_origin_handle_close(req);
+                ldlm_callback_reply(req, rc);
+                RETURN(0);
+        default:
+                CERROR("unknown opcode %u\n", req->rq_reqmsg->opc);
                 ldlm_callback_reply(req, -EPROTO);
                 RETURN(0);
         }
 
-        LASSERT(req->rq_export != NULL);
-        LASSERT(req->rq_export->exp_obd != NULL);
         ns = req->rq_export->exp_obd->obd_namespace;
         LASSERT(ns != NULL);
 
@@ -741,7 +1057,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 ldlm_callback_reply (req, -EPROTO);
                 RETURN (0);
         }
-        
+
         lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
         if (!lock) {
                 CDEBUG(D_INODE, "callback on lock "LPX64" - lock disappeared\n",
@@ -750,20 +1066,39 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 RETURN(0);
         }
 
-        /* we want the ost thread to get this reply so that it can respond
+        /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
+        lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS);
+
+        /* We want the ost thread to get this reply so that it can respond
          * to ost requests (write cache writeback) that might be triggered
-         * in the callback */
-        ldlm_callback_reply(req, 0);
+         * in the callback.
+         *
+         * But we'd also like to be able to indicate in the reply that we're
+         * cancelling right now, because it's unused, or have an intent result
+         * in the reply, so we might have to push the responsibility for sending
+         * the reply down into the AST handlers, alas. */
 
         switch (req->rq_reqmsg->opc) {
         case LDLM_BL_CALLBACK:
                 CDEBUG(D_INODE, "blocking ast\n");
-                ldlm_handle_bl_callback(req, ns, dlm_req, lock);
+#ifdef __KERNEL__
+                rc = ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock);
+                ldlm_callback_reply(req, rc);
+#else
+                rc = 0;
+                ldlm_callback_reply(req, rc);
+                ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
+#endif
                 break;
         case LDLM_CP_CALLBACK:
                 CDEBUG(D_INODE, "completion ast\n");
+                ldlm_callback_reply(req, 0);
                 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
                 break;
+        case LDLM_GL_CALLBACK:
+                CDEBUG(D_INODE, "glimpse ast\n");
+                ldlm_handle_gl_callback(req, ns, dlm_req, lock);
+                break;
         default:
                 LBUG();                         /* checked above */
         }
@@ -814,113 +1149,190 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-static int ldlm_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
-                          void *karg, void *uarg)
+#ifdef __KERNEL__
+static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
+{
+        struct ldlm_bl_work_item *blwi = NULL;
+
+        spin_lock(&blp->blp_lock);
+        if (!list_empty(&blp->blp_list)) {
+                blwi = list_entry(blp->blp_list.next, struct ldlm_bl_work_item,
+                                  blwi_entry);
+                list_del(&blwi->blwi_entry);
+        }
+        spin_unlock(&blp->blp_lock);
+
+        return blwi;
+}
+
+struct ldlm_bl_thread_data {
+        int                     bltd_num;
+        struct ldlm_bl_pool     *bltd_blp;
+};
+
+static int ldlm_bl_thread_main(void *arg)
 {
-        struct obd_device *obddev = class_conn2obd(conn);
-        struct ptlrpc_connection *connection;
-        struct obd_uuid uuid = { "ldlm" };
-        int err = 0;
+        struct ldlm_bl_thread_data *bltd = arg;
+        struct ldlm_bl_pool *blp = bltd->bltd_blp;
+        unsigned long flags;
         ENTRY;
 
-        if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
-            _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
-                CDEBUG(D_IOCTL, "invalid ioctl (type %d, nr %d, size %d)\n",
-                       _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
-                RETURN(-EINVAL);
+        /* XXX boiler-plate */
+        {
+                char name[sizeof(current->comm)];
+                snprintf(name, sizeof(name) - 1, "ldlm_bl_%02d",
+                         bltd->bltd_num);
+                kportal_daemonize(name);
         }
+        SIGNAL_MASK_LOCK(current, flags);
+        sigfillset(&current->blocked);
+        RECALC_SIGPENDING;
+        SIGNAL_MASK_UNLOCK(current, flags);
 
-        OBD_ALLOC(obddev->u.ldlm.ldlm_client,
-                  sizeof(*obddev->u.ldlm.ldlm_client));
-        connection = ptlrpc_uuid_to_connection(&uuid);
-        if (!connection)
-                CERROR("No LDLM UUID found: assuming ldlm is local.\n");
-
-        switch (cmd) {
-        case IOC_LDLM_TEST:
-                //err = ldlm_test(obddev, conn);
-                err = 0;
-                CERROR("-- NO TESTS WERE RUN done err %d\n", err);
-                GOTO(out, err);
-        case IOC_LDLM_DUMP:
-                ldlm_dump_all_namespaces();
-                GOTO(out, err);
-        default:
-                GOTO(out, err = -EINVAL);
+        atomic_inc(&blp->blp_num_threads);
+        complete(&blp->blp_comp);
+
+        while(1) {
+                struct l_wait_info lwi = { 0 };
+                struct ldlm_bl_work_item *blwi = NULL;
+
+                l_wait_event_exclusive(blp->blp_waitq,
+                                       (blwi = ldlm_bl_get_work(blp)) != NULL,
+                                       &lwi);
+
+                if (blwi->blwi_ns == NULL)
+                        break;
+
+                ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
+                                        blwi->blwi_lock);
+                OBD_FREE(blwi, sizeof(*blwi));
         }
 
- out:
-        if (connection)
-                ptlrpc_put_connection(connection);
-        OBD_FREE(obddev->u.ldlm.ldlm_client,
-                 sizeof(*obddev->u.ldlm.ldlm_client));
-        return err;
+        atomic_dec(&blp->blp_num_threads);
+        complete(&blp->blp_comp);
+        RETURN(0);
 }
 
-static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
+#endif
+
+static int ldlm_setup(void);
+static int ldlm_cleanup(int force);
+
+int ldlm_get_ref(void)
+{
+        int rc = 0;
+        down(&ldlm_ref_sem);
+        if (++ldlm_refcount == 1) {
+                rc = ldlm_setup();
+                if (rc)
+                        ldlm_refcount--;
+        }
+        up(&ldlm_ref_sem);
+
+        RETURN(rc);
+}
+
+void ldlm_put_ref(int force)
+{
+        down(&ldlm_ref_sem);
+        if (ldlm_refcount == 1) {
+                int rc = ldlm_cleanup(force);
+                if (rc)
+                        CERROR("ldlm_cleanup failed: %d\n", rc);
+                else
+                        ldlm_refcount--;
+        } else {
+                ldlm_refcount--;
+        }
+        up(&ldlm_ref_sem);
+
+        EXIT;
+}
+
+static int ldlm_setup(void)
 {
-        struct ldlm_obd *ldlm = &obddev->u.ldlm;
-        int rc, i;
+        struct ldlm_bl_pool *blp;
+        int rc = 0;
+#ifdef __KERNEL__
+        int i;
+#endif
         ENTRY;
 
-        if (ldlm_already_setup)
+        if (ldlm_state != NULL)
                 RETURN(-EALREADY);
 
-        rc = ldlm_proc_setup(obddev);
-        if (rc != 0)
-                RETURN(rc);
+        OBD_ALLOC(ldlm_state, sizeof(*ldlm_state));
+        if (ldlm_state == NULL)
+                RETURN(-ENOMEM);
 
 #ifdef __KERNEL__
-        inter_module_register("ldlm_cli_cancel_unused", THIS_MODULE,
-                              ldlm_cli_cancel_unused);
-        inter_module_register("ldlm_namespace_cleanup", THIS_MODULE,
-                              ldlm_namespace_cleanup);
-        inter_module_register("ldlm_replay_locks", THIS_MODULE,
-                              ldlm_replay_locks);
-
-        ldlm->ldlm_cb_service =
-                ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
-                                LDLM_MAXREQSIZE, LDLM_CB_REQUEST_PORTAL,
-                                LDLM_CB_REPLY_PORTAL,
-                                ldlm_callback_handler, "ldlm_cbd", obddev);
-
-        if (!ldlm->ldlm_cb_service) {
+        rc = ldlm_proc_setup();
+        if (rc != 0)
+                GOTO(out_free, rc);
+#endif
+
+        ldlm_state->ldlm_cb_service =
+                ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
+                                LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
+                                ldlm_callback_handler, "ldlm_cbd",
+                                ldlm_svc_proc_dir);
+
+        if (!ldlm_state->ldlm_cb_service) {
                 CERROR("failed to start service\n");
                 GOTO(out_proc, rc = -ENOMEM);
         }
 
-        ldlm->ldlm_cancel_service =
-                ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
-                                LDLM_MAXREQSIZE, LDLM_CANCEL_REQUEST_PORTAL,
+        ldlm_state->ldlm_cancel_service =
+                ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
+                                LDLM_CANCEL_REQUEST_PORTAL,
                                 LDLM_CANCEL_REPLY_PORTAL,
-                                ldlm_cancel_handler, "ldlm_canceld", obddev);
+                                ldlm_cancel_handler, "ldlm_canceld",
+                                ldlm_svc_proc_dir);
 
-        if (!ldlm->ldlm_cancel_service) {
+        if (!ldlm_state->ldlm_cancel_service) {
                 CERROR("failed to start service\n");
                 GOTO(out_proc, rc = -ENOMEM);
         }
 
+        OBD_ALLOC(blp, sizeof(*blp));
+        if (blp == NULL)
+                GOTO(out_proc, rc = -ENOMEM);
+        ldlm_state->ldlm_bl_pool = blp;
+
+        atomic_set(&blp->blp_num_threads, 0);
+        init_waitqueue_head(&blp->blp_waitq);
+        spin_lock_init(&blp->blp_lock);
+
+        INIT_LIST_HEAD(&blp->blp_list);
+
+#ifdef __KERNEL__
         for (i = 0; i < LDLM_NUM_THREADS; i++) {
-                char name[32];
-                sprintf(name, "ldlm_cn_%02d", i);
-                rc = ptlrpc_start_thread(obddev, ldlm->ldlm_cancel_service,
-                                         name);
-                if (rc) {
+                struct ldlm_bl_thread_data bltd = {
+                        .bltd_num = i,
+                        .bltd_blp = blp,
+                };
+                init_completion(&blp->blp_comp);
+                rc = kernel_thread(ldlm_bl_thread_main, &bltd, 0);
+                if (rc < 0) {
                         CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
                         LBUG();
                         GOTO(out_thread, rc);
                 }
+                wait_for_completion(&blp->blp_comp);
         }
 
-        for (i = 0; i < LDLM_NUM_THREADS; i++) {
-                char name[32];
-                sprintf(name, "ldlm_cb_%02d", i);
-                rc = ptlrpc_start_thread(obddev, ldlm->ldlm_cb_service, name);
-                if (rc) {
-                        CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
-                        LBUG();
-                        GOTO(out_thread, rc);
-                }
+        rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cancel_service,
+                                    LDLM_NUM_THREADS, "ldlm_cn");
+        if (rc) {
+                LBUG();
+                GOTO(out_thread, rc);
+        }
+
+        rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cb_service,
+                                    LDLM_NUM_THREADS, "ldlm_cb");
+        if (rc) {
+                LBUG();
+                GOTO(out_thread, rc);
         }
 
         INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
@@ -944,26 +1356,29 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
         init_timer(&waiting_locks_timer);
 #endif
 
-        ldlm_already_setup = 1;
-
         RETURN(0);
 
- out_thread:
 #ifdef __KERNEL__
-        ptlrpc_stop_all_threads(ldlm->ldlm_cancel_service);
-        ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
-        ptlrpc_stop_all_threads(ldlm->ldlm_cb_service);
-        ptlrpc_unregister_service(ldlm->ldlm_cb_service);
+ out_thread:
+        ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
+        ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
 #endif
- out_proc:
-        ldlm_proc_cleanup(obddev);
 
+ out_proc:
+#ifdef __KERNEL__
+        ldlm_proc_cleanup();
+ out_free:
+#endif
+        OBD_FREE(ldlm_state, sizeof(*ldlm_state));
+        ldlm_state = NULL;
         return rc;
 }
 
-static int ldlm_cleanup(struct obd_device *obddev, int flags)
+static int ldlm_cleanup(int force)
 {
-        struct ldlm_obd *ldlm = &obddev->u.ldlm;
+#ifdef __KERNEL__
+        struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
+#endif
         ENTRY;
 
         if (!list_empty(&ldlm_namespace_list)) {
@@ -973,56 +1388,41 @@ static int ldlm_cleanup(struct obd_device *obddev, int flags)
         }
 
 #ifdef __KERNEL__
-        if (flags & OBD_OPT_FORCE) {
-                ptlrpc_put_ldlm_hooks();
-        } else if (ptlrpc_ldlm_hooks_referenced()) {
-                CERROR("Some connections weren't cleaned up; run lconf with "
-                       "--force to forcibly unload.\n");
-                ptlrpc_dump_connections();
-                RETURN(-EBUSY);
+        while (atomic_read(&blp->blp_num_threads) > 0) {
+                struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
+
+                init_completion(&blp->blp_comp);
+
+                spin_lock(&blp->blp_lock);
+                list_add_tail(&blwi.blwi_entry, &blp->blp_list);
+                wake_up(&blp->blp_waitq);
+                spin_unlock(&blp->blp_lock);
+
+                wait_for_completion(&blp->blp_comp);
         }
+        OBD_FREE(blp, sizeof(*blp));
 
-        ptlrpc_stop_all_threads(ldlm->ldlm_cb_service);
-        ptlrpc_unregister_service(ldlm->ldlm_cb_service);
-        ptlrpc_stop_all_threads(ldlm->ldlm_cancel_service);
-        ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
-        ldlm_proc_cleanup(obddev);
+        ptlrpc_stop_all_threads(ldlm_state->ldlm_cb_service);
+        ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
+        ptlrpc_stop_all_threads(ldlm_state->ldlm_cancel_service);
+        ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
+        ldlm_proc_cleanup();
 
         expired_lock_thread.elt_state = ELT_TERMINATE;
         wake_up(&expired_lock_thread.elt_waitq);
         wait_event(expired_lock_thread.elt_waitq,
                    expired_lock_thread.elt_state == ELT_STOPPED);
 
-        inter_module_unregister("ldlm_namespace_cleanup");
-        inter_module_unregister("ldlm_cli_cancel_unused");
-        inter_module_unregister("ldlm_replay_locks");
 #endif
 
-        ldlm_already_setup = 0;
-        RETURN(0);
-}
+        OBD_FREE(ldlm_state, sizeof(*ldlm_state));
+        ldlm_state = NULL;
 
-static int ldlm_connect(struct lustre_handle *conn, struct obd_device *src,
-                        struct obd_uuid *cluuid)
-{
-        return class_connect(conn, src, cluuid);
+        RETURN(0);
 }
 
-struct obd_ops ldlm_obd_ops = {
-        o_owner:       THIS_MODULE,
-        o_iocontrol:   ldlm_iocontrol,
-        o_setup:       ldlm_setup,
-        o_cleanup:     ldlm_cleanup,
-        o_connect:     ldlm_connect,
-        o_disconnect:  class_disconnect
-};
-
 int __init ldlm_init(void)
 {
-        int rc = class_register_type(&ldlm_obd_ops, 0, OBD_LDLM_DEVICENAME);
-        if (rc != 0)
-                return rc;
-
         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
                                                sizeof(struct ldlm_resource), 0,
                                                SLAB_HWCACHE_ALIGN, NULL, NULL);
@@ -1042,23 +1442,31 @@ int __init ldlm_init(void)
         return 0;
 }
 
-static void __exit ldlm_exit(void)
+void __exit ldlm_exit(void)
 {
-        class_unregister_type(OBD_LDLM_DEVICENAME);
+        if ( ldlm_refcount )
+                CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
         if (kmem_cache_destroy(ldlm_resource_slab) != 0)
                 CERROR("couldn't free ldlm resource slab\n");
         if (kmem_cache_destroy(ldlm_lock_slab) != 0)
                 CERROR("couldn't free ldlm lock slab\n");
 }
 
+/* ldlm_flock.c */
+EXPORT_SYMBOL(ldlm_flock_completion_ast);
+
+/* ldlm_extent.c */
+EXPORT_SYMBOL(ldlm_extent_shift_kms);
+
 /* ldlm_lock.c */
+EXPORT_SYMBOL(ldlm_get_processing_policy);
 EXPORT_SYMBOL(ldlm_lock2desc);
 EXPORT_SYMBOL(ldlm_register_intent);
-EXPORT_SYMBOL(ldlm_unregister_intent);
 EXPORT_SYMBOL(ldlm_lockname);
 EXPORT_SYMBOL(ldlm_typename);
 EXPORT_SYMBOL(ldlm_lock2handle);
 EXPORT_SYMBOL(__ldlm_handle2lock);
+EXPORT_SYMBOL(ldlm_lock_get);
 EXPORT_SYMBOL(ldlm_lock_put);
 EXPORT_SYMBOL(ldlm_lock_match);
 EXPORT_SYMBOL(ldlm_lock_cancel);
@@ -1072,6 +1480,7 @@ EXPORT_SYMBOL(ldlm_lock_dump);
 EXPORT_SYMBOL(ldlm_lock_dump_handle);
 EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
 EXPORT_SYMBOL(ldlm_reprocess_all_ns);
+EXPORT_SYMBOL(ldlm_lock_allow_match);
 
 /* ldlm_request.c */
 EXPORT_SYMBOL(ldlm_completion_ast);
@@ -1089,10 +1498,13 @@ EXPORT_SYMBOL(ldlm_change_cbdata);
 /* ldlm_lockd.c */
 EXPORT_SYMBOL(ldlm_server_blocking_ast);
 EXPORT_SYMBOL(ldlm_server_completion_ast);
+EXPORT_SYMBOL(ldlm_server_glimpse_ast);
 EXPORT_SYMBOL(ldlm_handle_enqueue);
 EXPORT_SYMBOL(ldlm_handle_cancel);
 EXPORT_SYMBOL(ldlm_handle_convert);
 EXPORT_SYMBOL(ldlm_del_waiting_lock);
+EXPORT_SYMBOL(ldlm_get_ref);
+EXPORT_SYMBOL(ldlm_put_ref);
 
 #if 0
 /* ldlm_test.c */
@@ -1106,28 +1518,25 @@ EXPORT_SYMBOL(ldlm_namespace_new);
 EXPORT_SYMBOL(ldlm_namespace_cleanup);
 EXPORT_SYMBOL(ldlm_namespace_free);
 EXPORT_SYMBOL(ldlm_namespace_dump);
+EXPORT_SYMBOL(ldlm_dump_all_namespaces);
+EXPORT_SYMBOL(ldlm_resource_get);
+EXPORT_SYMBOL(ldlm_resource_putref);
 
 /* l_lock.c */
 EXPORT_SYMBOL(l_lock);
 EXPORT_SYMBOL(l_unlock);
 
 /* ldlm_lib.c */
-EXPORT_SYMBOL(client_import_connect);
-EXPORT_SYMBOL(client_import_disconnect);
+EXPORT_SYMBOL(client_obd_setup);
+EXPORT_SYMBOL(client_obd_cleanup);
+EXPORT_SYMBOL(client_connect_import);
+EXPORT_SYMBOL(client_disconnect_export);
 EXPORT_SYMBOL(target_abort_recovery);
 EXPORT_SYMBOL(target_handle_connect);
+EXPORT_SYMBOL(target_destroy_export);
 EXPORT_SYMBOL(target_cancel_recovery_timer);
 EXPORT_SYMBOL(target_send_reply);
 EXPORT_SYMBOL(target_queue_recovery_request);
 EXPORT_SYMBOL(target_handle_ping);
 EXPORT_SYMBOL(target_handle_disconnect);
 EXPORT_SYMBOL(target_queue_final_reply);
-
-#ifdef __KERNEL__
-MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
-MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
-MODULE_LICENSE("GPL");
-
-module_init(ldlm_init);
-module_exit(ldlm_exit);
-#endif