Whamcloud - gitweb
land v0.9.1 on HEAD, in preparation for a 1.0.x branch
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index 9350539..5ee9f0f 100644 (file)
@@ -46,15 +46,19 @@ extern struct list_head ldlm_namespace_list;
 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
 
-static int ldlm_already_setup = 0;
+static DECLARE_MUTEX(ldlm_ref_sem);
+static int ldlm_refcount = 0;
 
-#ifdef __KERNEL__
+/* LDLM state */ 
+
+static struct ldlm_state *ldlm ;
 
 inline unsigned long round_timeout(unsigned long timeout)
 {
         return ((timeout / HZ) + 1) * HZ;
 }
 
+#ifdef __KERNEL__
 /* XXX should this be per-ldlm? */
 static struct list_head waiting_locks_list;
 static spinlock_t waiting_locks_spinlock;
@@ -66,11 +70,29 @@ static struct expired_lock_thread {
         struct list_head          elt_expired_locks;
         spinlock_t                elt_lock;
 } expired_lock_thread;
+#endif
 
 #define ELT_STOPPED   0
 #define ELT_READY     1
 #define ELT_TERMINATE 2
 
+struct ldlm_bl_pool {
+        spinlock_t              blp_lock;
+        struct list_head        blp_list;
+        wait_queue_head_t       blp_waitq;
+        atomic_t                blp_num_threads;
+        struct completion       blp_comp;
+};
+
+struct ldlm_bl_work_item {
+        struct list_head        blwi_entry;
+        struct ldlm_namespace   *blwi_ns;
+        struct ldlm_lock_desc   blwi_ld;
+        struct ldlm_lock        *blwi_lock;
+};
+
+#ifdef __KERNEL__
+
 static inline int have_expired_locks(void)
 {
         int need_to_run;
@@ -91,17 +113,17 @@ static int expired_lock_main(void *arg)
         ENTRY;
         lock_kernel();
         kportal_daemonize("ldlm_elt");
-        
+
         SIGNAL_MASK_LOCK(current, flags);
         sigfillset(&current->blocked);
         RECALC_SIGPENDING;
         SIGNAL_MASK_UNLOCK(current, flags);
-        
+
         unlock_kernel();
-        
+
         expired_lock_thread.elt_state = ELT_READY;
         wake_up(&expired_lock_thread.elt_waitq);
-        
+
         while (1) {
                 l_wait_event(expired_lock_thread.elt_waitq,
                              have_expired_locks() ||
@@ -114,7 +136,7 @@ static int expired_lock_main(void *arg)
                                                             struct ldlm_lock,
                                                             l_pending_chain);
                         spin_unlock_bh(&expired_lock_thread.elt_lock);
-                        
+
                         ptlrpc_fail_export(lock->l_export);
 
                         spin_lock_bh(&expired_lock_thread.elt_lock);
@@ -133,6 +155,7 @@ static int expired_lock_main(void *arg)
 static void waiting_locks_callback(unsigned long unused)
 {
         struct ldlm_lock *lock;
+        char str[PTL_NALFMT_SIZE];
 
         spin_lock_bh(&waiting_locks_spinlock);
         while (!list_empty(&waiting_locks_list)) {
@@ -143,10 +166,13 @@ static void waiting_locks_callback(unsigned long unused)
                         break;
 
                 LDLM_ERROR(lock, "lock callback timer expired: evicting client "
-                           "%s@%s nid "LPU64,
+                           "%s@%s nid "LPX64" (%s) ",
                            lock->l_export->exp_client_uuid.uuid,
                            lock->l_export->exp_connection->c_remote_uuid.uuid,
-                           lock->l_export->exp_connection->c_peer.peer_nid);
+                           lock->l_export->exp_connection->c_peer.peer_nid,
+                           portals_nid2str(lock->l_export->exp_connection->c_peer.peer_ni->pni_number,
+                                           lock->l_export->exp_connection->c_peer.peer_nid,
+                                           str));
 
                 spin_lock_bh(&expired_lock_thread.elt_lock);
                 list_del(&lock->l_pending_chain);
@@ -169,10 +195,14 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
 {
         unsigned long timeout_rounded;
 
+        spin_lock_bh(&waiting_locks_spinlock);
+        if (!list_empty(&lock->l_pending_chain)) {
+                LDLM_DEBUG(lock, "not re-adding to wait list");
+                spin_unlock_bh(&waiting_locks_spinlock);
+                return 0;
+        }
         LDLM_DEBUG(lock, "adding to wait list");
-        LASSERT(list_empty(&lock->l_pending_chain));
 
-        spin_lock_bh(&waiting_locks_spinlock);
         lock->l_callback_timeout = jiffies + (obd_timeout * HZ / 2);
 
         timeout_rounded = round_timeout(lock->l_callback_timeout);
@@ -245,15 +275,19 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock)
 
 static void ldlm_failed_ast(struct ldlm_lock *lock, int rc, char *ast_type)
 {
+        const struct ptlrpc_connection *conn = lock->l_export->exp_connection;
+        char str[PTL_NALFMT_SIZE];
+
         CERROR("%s AST failed (%d) for res "LPU64"/"LPU64
-               ", mode %s: evicting client %s@%s NID "LPU64"\n",
+               ", mode %s: evicting client %s@%s NID "LPX64" (%s)\n",
                ast_type, rc,
                lock->l_resource->lr_name.name[0],
                lock->l_resource->lr_name.name[1],
                ldlm_lockname[lock->l_granted_mode],
                lock->l_export->exp_client_uuid.uuid,
-               lock->l_export->exp_connection->c_remote_uuid.uuid,
-               lock->l_export->exp_connection->c_peer.peer_nid);
+               conn->c_remote_uuid.uuid, conn->c_peer.peer_nid,
+               portals_nid2str(conn->c_peer.peer_ni->pni_number,
+                               conn->c_peer.peer_nid, str));
         ptlrpc_fail_export(lock->l_export);
 }
 
@@ -274,11 +308,11 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         LASSERT(lock);
 
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
-        /* XXX This is necessary because, with the lock re-tasking, we actually
-         * _can_ get called in here twice.  (bug 830) */
-        if (!list_empty(&lock->l_pending_chain)) {
+        if (lock->l_granted_mode != lock->l_req_mode) {
+                /* this blocking AST will be communicated as part of the
+                 * completion AST instead */
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
-                RETURN(0);
+                LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");                 RETURN(0);
         }
 
         if (lock->l_destroyed) {
@@ -294,7 +328,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         }
 #endif
 
-        req = ptlrpc_prep_req(lock->l_export->exp_ldlm_data.led_import,
+        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                               LDLM_BL_CALLBACK, 1, &size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
@@ -303,19 +337,36 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         memcpy(&body->lock_handle1, &lock->l_remote_handle,
                sizeof(body->lock_handle1));
         memcpy(&body->lock_desc, desc, sizeof(*desc));
+        body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS);
 
         LDLM_DEBUG(lock, "server preparing blocking AST");
         req->rq_replen = lustre_msg_size(0, NULL);
 
-        ldlm_add_waiting_lock(lock);
+        if (lock->l_granted_mode == lock->l_req_mode)
+                ldlm_add_waiting_lock(lock);
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
-        req->rq_level = LUSTRE_CONN_RECOVER;
+        req->rq_send_state = LUSTRE_IMP_FULL;
         req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
         rc = ptlrpc_queue_wait(req);
         if (rc == -ETIMEDOUT || rc == -EINTR) {
+#ifdef __KERNEL__
                 ldlm_del_waiting_lock(lock);
                 ldlm_failed_ast(lock, rc, "blocking");
+#else
+                /* XXX
+                 * Here we treat all clients as liblustre. When BLOCKING AST
+                 * timeout we don't evicting the client and only cancel
+                 * the lock.
+                 * restore to orignial implementation later!!!
+                 * XXX
+                 */
+                CERROR("BLOCKING AST to client (nid "LPU64") timeout, "
+                       "simply cancel lock 0x%p\n",
+                       req->rq_connection->c_peer.peer_nid, lock);
+                ldlm_lock_cancel(lock);
+                rc = -ERESTART;
+#endif
         } else if (rc) {
                 if (rc == -EINVAL)
                         CDEBUG(D_DLMTRACE, "client (nid "LPU64") returned %d "
@@ -332,9 +383,11 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                         CDEBUG(D_ERROR, "client (nid "LPU64") returned %d "
                                "from blocking AST for lock %p\n",
                                req->rq_connection->c_peer.peer_nid,
-                               req->rq_repmsg->status, lock);
-                LDLM_DEBUG(lock, "client returned error %d from blocking AST",
-                           req->rq_status);
+                               (req->rq_repmsg != NULL)?
+                               req->rq_repmsg->status : 0,
+                               lock);
+                LDLM_DEBUG(lock, "client sent rc %d rq_status %d from blocking "
+                           "AST", rc, req->rq_status);
                 ldlm_lock_cancel(lock);
                 /* Server-side AST functions are called from ldlm_reprocess_all,
                  * which needs to be told to please restart its reprocessing. */
@@ -373,7 +426,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         if (total_enqueue_wait / 1000000 > obd_timeout)
                 LDLM_ERROR(lock, "enqueue wait took %ldus", total_enqueue_wait);
 
-        req = ptlrpc_prep_req(lock->l_export->exp_ldlm_data.led_import,
+        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                               LDLM_CP_CALLBACK, 1, &size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
@@ -388,17 +441,24 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                    total_enqueue_wait);
         req->rq_replen = lustre_msg_size(0, NULL);
 
-        req->rq_level = LUSTRE_CONN_RECOVER;
+        req->rq_send_state = LUSTRE_IMP_FULL;
         req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
+
+        /* We only send real blocking ASTs after the lock is granted */
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        if (lock->l_flags & LDLM_FL_AST_SENT) {
+                body->lock_flags |= LDLM_FL_AST_SENT;
+                ldlm_add_waiting_lock(lock); /* start the lock-timeout clock */
+        }
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+
         rc = ptlrpc_queue_wait(req);
         if (rc == -ETIMEDOUT || rc == -EINTR) {
                 ldlm_del_waiting_lock(lock);
                 ldlm_failed_ast(lock, rc, "completion");
         } else if (rc) {
-                CERROR("client returned %d from completion AST for lock %p\n",
-                       req->rq_status, lock);
-                LDLM_DEBUG(lock, "client returned error %d from completion AST",
-                           req->rq_status);
+                LDLM_ERROR(lock, "client sent rc %d rq_status %d from "
+                           "completion AST\n", rc, req->rq_status);
                 ldlm_lock_cancel(lock);
                 /* Server-side AST functions are called from ldlm_reprocess_all,
                  * which needs to be told to please restart its reprocessing. */
@@ -431,7 +491,7 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
                 CERROR ("Can't unpack dlm_req\n");
                 RETURN (-EFAULT);
         }
-        
+
         flags = dlm_req->lock_flags;
         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN &&
             (flags & LDLM_FL_HAS_INTENT)) {
@@ -440,15 +500,14 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
                 cookie = req;
                 cookielen = sizeof(*req);
         } else {
-                rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
-                                     &req->rq_repmsg);
+                rc = lustre_pack_reply(req, 1, &size, NULL);
                 if (rc) {
                         CERROR("out of memory\n");
                         RETURN(-ENOMEM);
                 }
-                if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
-                        cookie = &dlm_req->lock_desc.l_extent;
-                        cookielen = sizeof(struct ldlm_extent);
+                if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN) {
+                        cookie = &dlm_req->lock_desc.l_policy_data;
+                        cookielen = sizeof(ldlm_policy_data_t);
                 }
         }
 
@@ -483,26 +542,33 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
         dlm_rep->lock_flags = flags;
 
         ldlm_lock2handle(lock, &dlm_rep->lock_handle);
-        if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
-                memcpy(&dlm_rep->lock_extent, &lock->l_extent,
-                       sizeof(lock->l_extent));
+        if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN) {
+                memcpy(&dlm_rep->lock_policy_data, &lock->l_policy_data,
+                       cookielen);
+        }
         if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED) {
                 memcpy(&dlm_rep->lock_resource_name, &lock->l_resource->lr_name,
                        sizeof(dlm_rep->lock_resource_name));
                 dlm_rep->lock_mode = lock->l_req_mode;
         }
 
+        /* We never send a blocking AST until the lock is granted, but
+         * we can tell it right now */
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        if (lock->l_flags & LDLM_FL_AST_SENT)
+                dlm_rep->lock_flags |= LDLM_FL_AST_SENT;
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+
         EXIT;
  out:
-        if (lock)
-                LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
-                           "(err=%d)", err);
         req->rq_status = err;
 
         /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
          * ldlm_reprocess_all.  If this moves, revisit that code. -phil */
         if (lock) {
-                if (!err)
+                LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
+                           "(err=%d)", err);
+                if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
                         ldlm_reprocess_all(lock->l_resource);
                 LDLM_LOCK_PUT(lock);
         }
@@ -525,8 +591,8 @@ int ldlm_handle_convert(struct ptlrpc_request *req)
                 CERROR ("Can't unpack dlm_req\n");
                 RETURN (-EFAULT);
         }
-        
-        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+
+        rc = lustre_pack_reply(req, 1, &size, NULL);
         if (rc) {
                 CERROR("out of memory\n");
                 RETURN(-ENOMEM);
@@ -560,6 +626,7 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
 {
         struct ldlm_request *dlm_req;
         struct ldlm_lock *lock;
+        char str[PTL_NALFMT_SIZE];
         int rc;
         ENTRY;
 
@@ -570,7 +637,7 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
                 RETURN(-EFAULT);
         }
 
-        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
+        rc = lustre_pack_reply(req, 0, NULL, NULL);
         if (rc) {
                 CERROR("out of memory\n");
                 RETURN(-ENOMEM);
@@ -579,8 +646,10 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
         if (!lock) {
                 CERROR("received cancel for unknown lock cookie "LPX64
-                       " from nid "LPU64"\n", dlm_req->lock_handle1.cookie,
-                       req->rq_connection->c_peer.peer_nid);
+                       " from nid "LPX64" (%s)\n", dlm_req->lock_handle1.cookie,
+                       req->rq_connection->c_peer.peer_nid,
+                       portals_nid2str(req->rq_connection->c_peer.peer_ni->pni_number,
+                                       req->rq_connection->c_peer.peer_nid, str));
                 LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock "
                                   "(cookie "LPU64")",
                                   dlm_req->lock_handle1.cookie);
@@ -605,9 +674,8 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-static void ldlm_handle_bl_callback(struct ptlrpc_request *req,
-                                    struct ldlm_namespace *ns,
-                                    struct ldlm_request *dlm_req,
+static void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
+                                    struct ldlm_lock_desc *ld,
                                     struct ldlm_lock *lock)
 {
         int do_ast;
@@ -624,8 +692,9 @@ static void ldlm_handle_bl_callback(struct ptlrpc_request *req,
                            "callback (%p)", lock->l_blocking_ast);
                 if (lock->l_blocking_ast != NULL) {
                         l_unlock(&ns->ns_lock);
-                        lock->l_blocking_ast(lock, &dlm_req->lock_desc,
-                                             lock->l_data, LDLM_CB_BLOCKING);
+                        l_check_no_ns_lock(ns);
+                        lock->l_blocking_ast(lock, ld, lock->l_ast_data,
+                                             LDLM_CB_BLOCKING);
                         l_lock(&ns->ns_lock);
                 }
         } else {
@@ -656,9 +725,9 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
                 LDLM_DEBUG(lock, "completion AST, new lock mode");
         }
-        if (lock->l_resource->lr_type == LDLM_EXTENT)
-                memcpy(&lock->l_extent, &dlm_req->lock_desc.l_extent,
-                       sizeof(lock->l_extent));
+        if (lock->l_resource->lr_type != LDLM_PLAIN)
+                memcpy(&lock->l_policy_data, &dlm_req->lock_desc.l_policy_data,
+                       sizeof(lock->l_policy_data));
 
         ldlm_resource_unlink_lock(lock);
         if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
@@ -668,6 +737,12 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                                          dlm_req->lock_desc.l_resource.lr_name);
                 LDLM_DEBUG(lock, "completion AST, new resource");
         }
+
+        if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
+                lock->l_flags |= LDLM_FL_CBPENDING;
+                LDLM_DEBUG(lock, "completion AST includes blocking AST");
+        }
+
         lock->l_resource->lr_tmp = &ast_list;
         ldlm_grant_lock(lock, req, sizeof(*req), 1);
         lock->l_resource->lr_tmp = NULL;
@@ -675,7 +750,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
         l_unlock(&ns->ns_lock);
         LDLM_LOCK_PUT(lock);
 
-        ldlm_run_ast_work(&ast_list);
+        ldlm_run_ast_work(ns, &ast_list);
 
         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
                           lock);
@@ -685,18 +760,44 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
 {
         req->rq_status = rc;
-        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
-                             &req->rq_repmsg);
+        rc = lustre_pack_reply(req, 0, NULL, NULL);
         if (rc)
                 return rc;
         return ptlrpc_reply(req);
 }
 
+#ifdef __KERNEL__
+static int ldlm_bl_to_thread(struct ldlm_state *ldlm, struct ldlm_namespace *ns,
+                             struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
+{
+        struct ldlm_bl_pool *blp = ldlm->ldlm_bl_pool;
+        struct ldlm_bl_work_item *blwi;
+        ENTRY;
+
+        OBD_ALLOC(blwi, sizeof(*blwi));
+        if (blwi == NULL)
+                RETURN(-ENOMEM);
+
+        blwi->blwi_ns = ns;
+        blwi->blwi_ld = *ld;
+        blwi->blwi_lock = lock;
+
+        spin_lock(&blp->blp_lock);
+        list_add_tail(&blwi->blwi_entry, &blp->blp_list);
+        wake_up(&blp->blp_waitq);
+        spin_unlock(&blp->blp_lock);
+
+        RETURN(0);
+}
+#endif
+
 static int ldlm_callback_handler(struct ptlrpc_request *req)
 {
         struct ldlm_namespace *ns;
         struct ldlm_request *dlm_req;
         struct ldlm_lock *lock;
+        char str[PTL_NALFMT_SIZE];
+        int rc;
         ENTRY;
 
         /* Requests arrive in sender's byte order.  The ptlrpc service
@@ -707,10 +808,12 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         if (req->rq_export == NULL) {
                 struct ldlm_request *dlm_req;
 
-                CDEBUG(D_RPCTRACE, "operation %d from nid "LPU64" with bad "
+                CDEBUG(D_RPCTRACE, "operation %d from nid "LPX64" (%s) with bad "
                        "export cookie "LPX64" (ptl req %d/rep %d); this is "
                        "normal if this node rebooted with a lock held\n",
                        req->rq_reqmsg->opc, req->rq_connection->c_peer.peer_nid,
+                       portals_nid2str(req->rq_connection->c_peer.peer_ni->pni_number,
+                                       req->rq_connection->c_peer.peer_nid, str),
                        req->rq_reqmsg->handle.cookie,
                        req->rq_request_portal, req->rq_reply_portal);
 
@@ -728,6 +831,16 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
         } else if (req->rq_reqmsg->opc == LDLM_CP_CALLBACK) {
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
+        } else if (req->rq_reqmsg->opc == OBD_LOG_CANCEL) {
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
+        } else if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CREATE) {
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+        } else if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_NEXT_BLOCK) {
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+        } else if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_READ_HEADER) {
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+        } else if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CLOSE) {
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
         } else {
                 ldlm_callback_reply(req, -EPROTO);
                 RETURN(0);
@@ -735,6 +848,42 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
 
         LASSERT(req->rq_export != NULL);
         LASSERT(req->rq_export->exp_obd != NULL);
+
+#ifdef ENABLE_ORPHANS
+        /* FIXME - how to send reply */
+        if (req->rq_reqmsg->opc == OBD_LOG_CANCEL) {
+                int rc = llog_origin_handle_cancel(req);
+                ldlm_callback_reply(req, rc);
+                RETURN(0);
+        }
+
+        if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CREATE) {
+                int rc = llog_origin_handle_create(req);
+                req->rq_status = rc;
+                ptlrpc_reply(req);
+                RETURN(0);
+        }
+
+        if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_NEXT_BLOCK) {
+                int rc = llog_origin_handle_next_block(req);
+                req->rq_status = rc;
+                ptlrpc_reply(req);
+                RETURN(0);
+        }
+
+        if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_READ_HEADER) {
+                int rc = llog_origin_handle_read_header(req);
+                req->rq_status = rc;
+                ptlrpc_reply(req);
+                RETURN(0);
+        }
+
+        if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CLOSE) {
+                int rc = llog_origin_handle_close(req);
+                ldlm_callback_reply(req, rc);
+                RETURN(0);
+        }
+#endif
         ns = req->rq_export->exp_obd->obd_namespace;
         LASSERT(ns != NULL);
 
@@ -745,7 +894,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 ldlm_callback_reply (req, -EPROTO);
                 RETURN (0);
         }
-        
+
         lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
         if (!lock) {
                 CDEBUG(D_INODE, "callback on lock "LPX64" - lock disappeared\n",
@@ -754,15 +903,31 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 RETURN(0);
         }
 
-        /* we want the ost thread to get this reply so that it can respond
+        /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
+        lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS);
+
+        /* We want the ost thread to get this reply so that it can respond
          * to ost requests (write cache writeback) that might be triggered
-         * in the callback */
-        ldlm_callback_reply(req, 0);
+         * in the callback.
+         *
+         * But we'd also like to be able to indicate in the reply that we're
+         * cancelling right now, because it's unused, or have an intent result
+         * in the reply, so we might have to push the responsibility for sending
+         * the reply down into the AST handlers, alas. */
+        if (req->rq_reqmsg->opc != LDLM_BL_CALLBACK)
+                ldlm_callback_reply(req, 0);
 
         switch (req->rq_reqmsg->opc) {
         case LDLM_BL_CALLBACK:
                 CDEBUG(D_INODE, "blocking ast\n");
-                ldlm_handle_bl_callback(req, ns, dlm_req, lock);
+#ifdef __KERNEL__
+                rc = ldlm_bl_to_thread(ldlm, ns, &dlm_req->lock_desc, lock);
+                ldlm_callback_reply(req, rc);
+#else
+                rc = 0;
+                ldlm_callback_reply(req, rc);
+                ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
+#endif
                 break;
         case LDLM_CP_CALLBACK:
                 CDEBUG(D_INODE, "completion ast\n");
@@ -818,75 +983,134 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-static int ldlm_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
-                          void *karg, void *uarg)
+#ifdef __KERNEL__
+static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
+{
+        struct ldlm_bl_work_item *blwi = NULL;
+
+        spin_lock(&blp->blp_lock);
+        if (!list_empty(&blp->blp_list)) {
+                blwi = list_entry(blp->blp_list.next, struct ldlm_bl_work_item,
+                                  blwi_entry);
+                list_del(&blwi->blwi_entry);
+        }
+        spin_unlock(&blp->blp_lock);
+
+        return blwi;
+}
+
+struct ldlm_bl_thread_data {
+        int                     bltd_num;
+        struct ldlm_bl_pool     *bltd_blp;
+};
+
+static int ldlm_bl_thread_main(void *arg)
 {
-        struct obd_device *obddev = class_conn2obd(conn);
-        struct ptlrpc_connection *connection;
-        struct obd_uuid uuid = { "ldlm" };
-        int err = 0;
+        struct ldlm_bl_thread_data *bltd = arg;
+        struct ldlm_bl_pool *blp = bltd->bltd_blp;
+        unsigned long flags;
         ENTRY;
 
-        if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
-            _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
-                CDEBUG(D_IOCTL, "invalid ioctl (type %d, nr %d, size %d)\n",
-                       _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
-                RETURN(-EINVAL);
+        /* XXX boiler-plate */
+        {
+                char name[sizeof(current->comm)];
+                snprintf(name, sizeof(name) - 1, "ldlm_bl_%02d",
+                         bltd->bltd_num);
+                kportal_daemonize(name);
         }
+        SIGNAL_MASK_LOCK(current, flags);
+        sigfillset(&current->blocked);
+        RECALC_SIGPENDING;
+        SIGNAL_MASK_UNLOCK(current, flags);
 
-        OBD_ALLOC(obddev->u.ldlm.ldlm_client,
-                  sizeof(*obddev->u.ldlm.ldlm_client));
-        connection = ptlrpc_uuid_to_connection(&uuid);
-        if (!connection)
-                CERROR("No LDLM UUID found: assuming ldlm is local.\n");
-
-        switch (cmd) {
-        case IOC_LDLM_TEST:
-                //err = ldlm_test(obddev, conn);
-                err = 0;
-                CERROR("-- NO TESTS WERE RUN done err %d\n", err);
-                GOTO(out, err);
-        case IOC_LDLM_DUMP:
-                ldlm_dump_all_namespaces();
-                GOTO(out, err);
-        default:
-                GOTO(out, err = -EINVAL);
+        atomic_inc(&blp->blp_num_threads);
+        complete(&blp->blp_comp);
+
+        while(1) {
+                struct l_wait_info lwi = { 0 };
+                struct ldlm_bl_work_item *blwi = NULL;
+
+                l_wait_event_exclusive(blp->blp_waitq,
+                                       (blwi = ldlm_bl_get_work(blp)) != NULL,
+                                       &lwi);
+
+                if (blwi->blwi_ns == NULL)
+                        break;
+
+                ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
+                                        blwi->blwi_lock);
+                OBD_FREE(blwi, sizeof(*blwi));
         }
 
- out:
-        if (connection)
-                ptlrpc_put_connection(connection);
-        OBD_FREE(obddev->u.ldlm.ldlm_client,
-                 sizeof(*obddev->u.ldlm.ldlm_client));
-        return err;
+        atomic_dec(&blp->blp_num_threads);
+        complete(&blp->blp_comp);
+        RETURN(0);
+}
+
+#endif
+
+static int ldlm_setup(void);
+static int ldlm_cleanup(int force);
+
+int ldlm_get_ref(void)
+{
+        int rc = 0;
+        down(&ldlm_ref_sem);
+        if (++ldlm_refcount == 1) {
+                rc = ldlm_setup();
+                if (rc)
+                        ldlm_refcount--;
+        }
+        up(&ldlm_ref_sem);
+
+        RETURN(rc);
 }
 
-static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
+void ldlm_put_ref(int force)
 {
-        struct ldlm_obd *ldlm = &obddev->u.ldlm;
-        int rc, i;
+        down(&ldlm_ref_sem);
+        if (ldlm_refcount == 1) {
+                int rc = ldlm_cleanup(force);
+                if (rc)
+                        CERROR("ldlm_cleanup failed: %d\n", rc);
+                else
+                        ldlm_refcount--;
+        } else {
+                ldlm_refcount--;
+        }
+        up(&ldlm_ref_sem);
+
+        EXIT;
+}
+
+static int ldlm_setup(void)
+{
+        struct ldlm_bl_pool *blp;
+        int rc = 0;
+#ifdef __KERNEL__
+        int i;
+#endif
         ENTRY;
 
-        if (ldlm_already_setup)
+        if (ldlm != NULL)
                 RETURN(-EALREADY);
 
-        rc = ldlm_proc_setup(obddev);
-        if (rc != 0)
-                RETURN(rc);
+        OBD_ALLOC(ldlm, sizeof(*ldlm));
+        if (ldlm == NULL)
+                RETURN(-ENOMEM);
 
 #ifdef __KERNEL__
-        inter_module_register("ldlm_cli_cancel_unused", THIS_MODULE,
-                              ldlm_cli_cancel_unused);
-        inter_module_register("ldlm_namespace_cleanup", THIS_MODULE,
-                              ldlm_namespace_cleanup);
-        inter_module_register("ldlm_replay_locks", THIS_MODULE,
-                              ldlm_replay_locks);
+        rc = ldlm_proc_setup();
+        if (rc != 0)
+                GOTO(out_free, rc);
+#endif
 
         ldlm->ldlm_cb_service =
                 ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
                                 LDLM_MAXREQSIZE, LDLM_CB_REQUEST_PORTAL,
                                 LDLM_CB_REPLY_PORTAL,
-                                ldlm_callback_handler, "ldlm_cbd", obddev);
+                                ldlm_callback_handler, "ldlm_cbd", 
+                                ldlm_svc_proc_dir);
 
         if (!ldlm->ldlm_cb_service) {
                 CERROR("failed to start service\n");
@@ -897,34 +1121,52 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
                 ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
                                 LDLM_MAXREQSIZE, LDLM_CANCEL_REQUEST_PORTAL,
                                 LDLM_CANCEL_REPLY_PORTAL,
-                                ldlm_cancel_handler, "ldlm_canceld", obddev);
+                                ldlm_cancel_handler, "ldlm_canceld", 
+                                ldlm_svc_proc_dir);
 
         if (!ldlm->ldlm_cancel_service) {
                 CERROR("failed to start service\n");
                 GOTO(out_proc, rc = -ENOMEM);
         }
 
+        OBD_ALLOC(blp, sizeof(*blp));
+        if (blp == NULL)
+                GOTO(out_proc, rc = -ENOMEM);
+        ldlm->ldlm_bl_pool = blp;
+
+        atomic_set(&blp->blp_num_threads, 0);
+        init_waitqueue_head(&blp->blp_waitq);
+        spin_lock_init(&blp->blp_lock);
+
+        INIT_LIST_HEAD(&blp->blp_list);
+
+#ifdef __KERNEL__
         for (i = 0; i < LDLM_NUM_THREADS; i++) {
-                char name[32];
-                sprintf(name, "ldlm_cn_%02d", i);
-                rc = ptlrpc_start_thread(obddev, ldlm->ldlm_cancel_service,
-                                         name);
-                if (rc) {
+                struct ldlm_bl_thread_data bltd = {
+                        .bltd_num = i,
+                        .bltd_blp = blp,
+                };
+                init_completion(&blp->blp_comp);
+                if (kernel_thread(ldlm_bl_thread_main, &bltd, 0) < 0) {
                         CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
                         LBUG();
                         GOTO(out_thread, rc);
                 }
+                wait_for_completion(&blp->blp_comp);
         }
 
-        for (i = 0; i < LDLM_NUM_THREADS; i++) {
-                char name[32];
-                sprintf(name, "ldlm_cb_%02d", i);
-                rc = ptlrpc_start_thread(obddev, ldlm->ldlm_cb_service, name);
-                if (rc) {
-                        CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
-                        LBUG();
-                        GOTO(out_thread, rc);
-                }
+        rc = ptlrpc_start_n_threads(NULL, ldlm->ldlm_cancel_service,
+                                    LDLM_NUM_THREADS, "ldlm_cn");
+        if (rc) {
+                LBUG();
+                GOTO(out_thread, rc);
+        }
+
+        rc = ptlrpc_start_n_threads(NULL, ldlm->ldlm_cb_service,
+                                    LDLM_NUM_THREADS, "ldlm_cb");
+        if (rc) {
+                LBUG();
+                GOTO(out_thread, rc);
         }
 
         INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
@@ -948,26 +1190,29 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
         init_timer(&waiting_locks_timer);
 #endif
 
-        ldlm_already_setup = 1;
-
         RETURN(0);
 
- out_thread:
 #ifdef __KERNEL__
-        ptlrpc_stop_all_threads(ldlm->ldlm_cancel_service);
+ out_thread:
         ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
-        ptlrpc_stop_all_threads(ldlm->ldlm_cb_service);
         ptlrpc_unregister_service(ldlm->ldlm_cb_service);
 #endif
- out_proc:
-        ldlm_proc_cleanup(obddev);
 
+ out_proc:
+#ifdef __KERNEL__
+        ldlm_proc_cleanup();
+ out_free:
+#endif
+        OBD_FREE(ldlm, sizeof(*ldlm));
+        ldlm = NULL;
         return rc;
 }
 
-static int ldlm_cleanup(struct obd_device *obddev, int flags)
+static int ldlm_cleanup(int force)
 {
-        struct ldlm_obd *ldlm = &obddev->u.ldlm;
+#ifdef __KERNEL__
+        struct ldlm_bl_pool *blp = ldlm->ldlm_bl_pool;
+#endif
         ENTRY;
 
         if (!list_empty(&ldlm_namespace_list)) {
@@ -977,56 +1222,41 @@ static int ldlm_cleanup(struct obd_device *obddev, int flags)
         }
 
 #ifdef __KERNEL__
-        if (flags & OBD_OPT_FORCE) {
-                ptlrpc_put_ldlm_hooks();
-        } else if (ptlrpc_ldlm_hooks_referenced()) {
-                CERROR("Some connections weren't cleaned up; run lconf with "
-                       "--force to forcibly unload.\n");
-                ptlrpc_dump_connections();
-                RETURN(-EBUSY);
+        while (atomic_read(&blp->blp_num_threads) > 0) {
+                struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
+
+                init_completion(&blp->blp_comp);
+
+                spin_lock(&blp->blp_lock);
+                list_add_tail(&blwi.blwi_entry, &blp->blp_list);
+                wake_up(&blp->blp_waitq);
+                spin_unlock(&blp->blp_lock);
+
+                wait_for_completion(&blp->blp_comp);
         }
+        OBD_FREE(blp, sizeof(*blp));
 
         ptlrpc_stop_all_threads(ldlm->ldlm_cb_service);
         ptlrpc_unregister_service(ldlm->ldlm_cb_service);
         ptlrpc_stop_all_threads(ldlm->ldlm_cancel_service);
         ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
-        ldlm_proc_cleanup(obddev);
+        ldlm_proc_cleanup();
 
         expired_lock_thread.elt_state = ELT_TERMINATE;
         wake_up(&expired_lock_thread.elt_waitq);
         wait_event(expired_lock_thread.elt_waitq,
                    expired_lock_thread.elt_state == ELT_STOPPED);
 
-        inter_module_unregister("ldlm_namespace_cleanup");
-        inter_module_unregister("ldlm_cli_cancel_unused");
-        inter_module_unregister("ldlm_replay_locks");
 #endif
 
-        ldlm_already_setup = 0;
-        RETURN(0);
-}
+        OBD_FREE(ldlm, sizeof(*ldlm));
+        ldlm = NULL;
 
-static int ldlm_connect(struct lustre_handle *conn, struct obd_device *src,
-                        struct obd_uuid *cluuid)
-{
-        return class_connect(conn, src, cluuid);
+        RETURN(0);
 }
 
-struct obd_ops ldlm_obd_ops = {
-        o_owner:       THIS_MODULE,
-        o_iocontrol:   ldlm_iocontrol,
-        o_setup:       ldlm_setup,
-        o_cleanup:     ldlm_cleanup,
-        o_connect:     ldlm_connect,
-        o_disconnect:  class_disconnect
-};
-
 int __init ldlm_init(void)
 {
-        int rc = class_register_type(&ldlm_obd_ops, 0, OBD_LDLM_DEVICENAME);
-        if (rc != 0)
-                return rc;
-
         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
                                                sizeof(struct ldlm_resource), 0,
                                                SLAB_HWCACHE_ALIGN, NULL, NULL);
@@ -1046,15 +1276,19 @@ int __init ldlm_init(void)
         return 0;
 }
 
-static void __exit ldlm_exit(void)
+void __exit ldlm_exit(void)
 {
-        class_unregister_type(OBD_LDLM_DEVICENAME);
+        if ( ldlm_refcount )
+                CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
         if (kmem_cache_destroy(ldlm_resource_slab) != 0)
                 CERROR("couldn't free ldlm resource slab\n");
         if (kmem_cache_destroy(ldlm_lock_slab) != 0)
                 CERROR("couldn't free ldlm lock slab\n");
 }
 
+/* ldlm_flock.c */
+EXPORT_SYMBOL(ldlm_flock_completion_ast);
+
 /* ldlm_lock.c */
 EXPORT_SYMBOL(ldlm_lock2desc);
 EXPORT_SYMBOL(ldlm_register_intent);
@@ -1097,6 +1331,8 @@ EXPORT_SYMBOL(ldlm_handle_enqueue);
 EXPORT_SYMBOL(ldlm_handle_cancel);
 EXPORT_SYMBOL(ldlm_handle_convert);
 EXPORT_SYMBOL(ldlm_del_waiting_lock);
+EXPORT_SYMBOL(ldlm_get_ref);
+EXPORT_SYMBOL(ldlm_put_ref);
 
 #if 0
 /* ldlm_test.c */
@@ -1116,22 +1352,17 @@ EXPORT_SYMBOL(l_lock);
 EXPORT_SYMBOL(l_unlock);
 
 /* ldlm_lib.c */
-EXPORT_SYMBOL(client_import_connect);
-EXPORT_SYMBOL(client_import_disconnect);
+EXPORT_SYMBOL(client_obd_setup);
+EXPORT_SYMBOL(client_obd_cleanup);
+EXPORT_SYMBOL(client_connect_import);
+EXPORT_SYMBOL(client_disconnect_export);
 EXPORT_SYMBOL(target_abort_recovery);
 EXPORT_SYMBOL(target_handle_connect);
+EXPORT_SYMBOL(target_destroy_export);
 EXPORT_SYMBOL(target_cancel_recovery_timer);
 EXPORT_SYMBOL(target_send_reply);
 EXPORT_SYMBOL(target_queue_recovery_request);
 EXPORT_SYMBOL(target_handle_ping);
 EXPORT_SYMBOL(target_handle_disconnect);
 EXPORT_SYMBOL(target_queue_final_reply);
-
-#ifdef __KERNEL__
-MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
-MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
-MODULE_LICENSE("GPL");
-
-module_init(ldlm_init);
-module_exit(ldlm_exit);
-#endif
+EXPORT_SYMBOL(ldlm_put_lock_into_req);