Whamcloud - gitweb
r=adilger,phil
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index 64dfb52..6602713 100644 (file)
@@ -37,6 +37,7 @@
 
 #include <linux/lustre_dlm.h>
 #include <linux/obd_class.h>
+#include <portals/list.h>
 #include "ldlm_internal.h"
 
 extern kmem_cache_t *ldlm_resource_slab;
@@ -51,7 +52,7 @@ static int ldlm_refcount = 0;
 
 /* LDLM state */
 
-static struct ldlm_state *ldlm ;
+static struct ldlm_state *ldlm_state;
 
 inline unsigned long round_timeout(unsigned long timeout)
 {
@@ -388,24 +389,18 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         req->rq_send_state = LUSTRE_IMP_FULL;
         req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
         rc = ptlrpc_queue_wait(req);
-        if (rc == -ETIMEDOUT || rc == -EINTR) {
-#ifdef __KERNEL__
-                ldlm_del_waiting_lock(lock);
-                ldlm_failed_ast(lock, rc, "blocking");
-#else
-                /* XXX
-                 * Here we treat all clients as liblustre. When BLOCKING AST
-                 * timeout we don't evicting the client and only cancel
-                 * the lock.
-                 * restore to orignial implementation later!!!
-                 * XXX
-                 */
-                CERROR("BLOCKING AST to client (nid "LPU64") timeout, "
-                       "simply cancel lock 0x%p\n",
-                       req->rq_peer.peer_nid, lock);
-                ldlm_lock_cancel(lock);
-                rc = -ERESTART;
-#endif
+        if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
+                LASSERT(lock->l_export);
+                if (lock->l_export->exp_libclient) {
+                        CDEBUG(D_HA, "BLOCKING AST to liblustre client (nid "
+                               LPU64") timeout, simply cancel lock 0x%p\n",
+                               req->rq_peer.peer_nid, lock);
+                        ldlm_lock_cancel(lock);
+                        rc = -ERESTART;
+                } else {
+                        ldlm_del_waiting_lock(lock);
+                        ldlm_failed_ast(lock, rc, "blocking");
+                }
         } else if (rc) {
                 if (rc == -EINVAL)
                         CDEBUG(D_DLMTRACE, "client (nid "LPU64") returned %d "
@@ -451,13 +446,10 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         struct ptlrpc_request *req;
         struct timeval granted_time;
         long total_enqueue_wait;
-        int rc = 0, size = sizeof(*body);
+        int rc = 0, size[2] = {sizeof(*body)}, buffers = 1;
         ENTRY;
 
-        if (lock == NULL) {
-                LBUG();
-                RETURN(-EINVAL);
-        }
+        LASSERT(lock != NULL);
 
         do_gettimeofday(&granted_time);
         total_enqueue_wait = timeval_sub(&granted_time, &lock->l_enqueued_time);
@@ -465,9 +457,14 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         if (total_enqueue_wait / 1000000 > obd_timeout)
                 LDLM_ERROR(lock, "enqueue wait took %ldus", total_enqueue_wait);
 
+        if (lock->l_resource->lr_lvb_len) {
+                buffers = 2;
+                size[1] = lock->l_resource->lr_lvb_len;
+        }
+
         req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
-                              LDLM_CP_CALLBACK, 1, &size, NULL);
-        if (!req)
+                              LDLM_CP_CALLBACK, buffers, size, NULL);
+        if (req == NULL)
                 RETURN(-ENOMEM);
 
         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
@@ -476,6 +473,13 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         body->lock_flags = flags;
         ldlm_lock2desc(lock, &body->lock_desc);
 
+        if (buffers == 2) {
+                void *lvb = lustre_msg_buf(req->rq_reqmsg, 1,
+                                           lock->l_resource->lr_lvb_len);
+                memcpy(lvb, lock->l_resource->lr_lvb_data,
+                       lock->l_resource->lr_lvb_len);
+        }
+
         LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)",
                    total_enqueue_wait);
         req->rq_replen = lustre_msg_size(0, NULL);
@@ -492,12 +496,15 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
         rc = ptlrpc_queue_wait(req);
-        if (rc == -ETIMEDOUT || rc == -EINTR) {
+        if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
                 ldlm_del_waiting_lock(lock);
                 ldlm_failed_ast(lock, rc, "completion");
+        } else if (rc == -EINVAL) {
+                LDLM_DEBUG(lock, "lost the race -- client no longer has this "
+                           "lock");
         } else if (rc) {
                 LDLM_ERROR(lock, "client sent rc %d rq_status %d from "
-                           "completion AST\n", rc, req->rq_status);
+                           "completion AST", rc, req->rq_status);
                 ldlm_lock_cancel(lock);
                 /* Server-side AST functions are called from ldlm_reprocess_all,
                  * which needs to be told to please restart its reprocessing. */
@@ -508,16 +515,64 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         RETURN(rc);
 }
 
+int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
+{
+        struct ldlm_resource *res = lock->l_resource;
+        struct ldlm_request *body;
+        struct ptlrpc_request *req;
+        int rc = 0, size = sizeof(*body);
+        ENTRY;
+
+        LASSERT(lock != NULL);
+
+        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
+                              LDLM_GL_CALLBACK, 1, &size, NULL);
+        if (req == NULL)
+                RETURN(-ENOMEM);
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
+        memcpy(&body->lock_handle1, &lock->l_remote_handle,
+               sizeof(body->lock_handle1));
+        ldlm_lock2desc(lock, &body->lock_desc);
+
+        size = lock->l_resource->lr_lvb_len;
+        req->rq_replen = lustre_msg_size(1, &size);
+
+        req->rq_send_state = LUSTRE_IMP_FULL;
+        req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
+
+        rc = ptlrpc_queue_wait(req);
+        if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
+                ldlm_del_waiting_lock(lock);
+                ldlm_failed_ast(lock, rc, "glimpse");
+        } else if (rc == -EINVAL) {
+                LDLM_DEBUG(lock, "lost the race -- client no longer has this "
+                           "lock");
+        } else if (rc == -ELDLM_NO_LOCK_DATA) {
+                LDLM_DEBUG(lock, "lost a race -- client has a lock, but no "
+                           "inode");
+        } else if (rc) {
+                LDLM_ERROR(lock, "client sent rc %d rq_status %d from "
+                           "glimpse AST", rc, req->rq_status);
+        } else {
+                rc = res->lr_namespace->ns_lvbo->lvbo_update
+                        (res, req->rq_repmsg, 0, 1);
+        }
+        ptlrpc_req_finished(req);
+        RETURN(rc);
+}
+
 int ldlm_handle_enqueue(struct ptlrpc_request *req,
                         ldlm_completion_callback completion_callback,
-                        ldlm_blocking_callback blocking_callback)
+                        ldlm_blocking_callback blocking_callback,
+                        ldlm_glimpse_callback glimpse_callback)
 {
         struct obd_device *obddev = req->rq_export->exp_obd;
         struct ldlm_reply *dlm_rep;
         struct ldlm_request *dlm_req;
-        int rc, size = sizeof(*dlm_rep), cookielen = 0;
+        int rc = 0, size[2] = {sizeof(*dlm_rep)};
         __u32 flags;
-        ldlm_error_t err;
+        ldlm_error_t err = ELDLM_OK;
         struct ldlm_lock *lock = NULL;
         void *cookie = NULL;
         ENTRY;
@@ -528,37 +583,20 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
                                       lustre_swab_ldlm_request);
         if (dlm_req == NULL) {
                 CERROR ("Can't unpack dlm_req\n");
-                RETURN (-EFAULT);
+                GOTO(out, rc = -EFAULT);
         }
 
         flags = dlm_req->lock_flags;
-        if (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN &&
-            (flags & LDLM_FL_HAS_INTENT)) {
-                /* In this case, the reply buffer is allocated deep in
-                 * local_lock_enqueue by the policy function. */
-                cookie = req;
-                cookielen = sizeof(*req);
-        } else {
-                rc = lustre_pack_reply(req, 1, &size, NULL);
-                if (rc) {
-                        CERROR("out of memory\n");
-                        RETURN(-ENOMEM);
-                }
-                if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN) {
-                        cookie = &dlm_req->lock_desc.l_policy_data;
-                        cookielen = sizeof(ldlm_policy_data_t);
-                }
-        }
 
         /* The lock's callback data might be set in the policy function */
-        lock = ldlm_lock_create(obddev->obd_namespace,
-                                &dlm_req->lock_handle2,
+        lock = ldlm_lock_create(obddev->obd_namespace, &dlm_req->lock_handle2,
                                 dlm_req->lock_desc.l_resource.lr_name,
                                 dlm_req->lock_desc.l_resource.lr_type,
                                 dlm_req->lock_desc.l_req_mode,
-                                blocking_callback, completion_callback, NULL);
+                                blocking_callback, completion_callback,
+                                glimpse_callback, NULL, 0);
         if (!lock)
-                GOTO(out, err = -ENOMEM);
+                GOTO(out, rc = -ENOMEM);
 
         do_gettimeofday(&lock->l_enqueued_time);
         memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
@@ -572,24 +610,38 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
                  &lock->l_export->exp_ldlm_data.led_held_locks);
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
-        err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, cookielen,
-                                &flags);
+        if (flags & LDLM_FL_HAS_INTENT) {
+                /* In this case, the reply buffer is allocated deep in
+                 * local_lock_enqueue by the policy function. */
+                cookie = req;
+        } else {
+                int buffers = 1;
+                if (lock->l_resource->lr_lvb_len) {
+                        size[1] = lock->l_resource->lr_lvb_len;
+                        buffers = 2;
+                }
+
+                rc = lustre_pack_reply(req, buffers, size, NULL);
+                if (rc)
+                        GOTO(out, rc);
+        }
+
+        if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
+                memcpy(&lock->l_policy_data, &dlm_req->lock_desc.l_policy_data,
+                       sizeof(ldlm_policy_data_t));
+        if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
+                memcpy(&lock->l_req_extent, &lock->l_policy_data.l_extent,
+                       sizeof(lock->l_req_extent));
+
+        err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, &flags);
         if (err)
                 GOTO(out, err);
 
         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
         dlm_rep->lock_flags = flags;
 
+        ldlm_lock2desc(lock, &dlm_rep->lock_desc);
         ldlm_lock2handle(lock, &dlm_rep->lock_handle);
-        if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN) {
-                memcpy(&dlm_rep->lock_policy_data, &lock->l_policy_data,
-                       cookielen);
-        }
-        if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED) {
-                memcpy(&dlm_rep->lock_resource_name, &lock->l_resource->lr_name,
-                       sizeof(dlm_rep->lock_resource_name));
-                dlm_rep->lock_mode = lock->l_req_mode;
-        }
 
         /* We never send a blocking AST until the lock is granted, but
          * we can tell it right now */
@@ -604,19 +656,33 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
         EXIT;
  out:
         req->rq_status = err;
+        if (req->rq_reply_state == NULL) {
+                err = lustre_pack_reply(req, 0, NULL, NULL);
+                if (rc == 0)
+                        rc = err;
+        }
 
         /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
          * ldlm_reprocess_all.  If this moves, revisit that code. -phil */
         if (lock) {
                 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
-                           "(err=%d)", err);
+                           "(err=%d, rc=%d)", err, rc);
+
+                if (lock->l_resource->lr_lvb_len > 0) {
+                        void *lvb = lustre_msg_buf(req->rq_repmsg, 1,
+                                                  lock->l_resource->lr_lvb_len);
+                        memcpy(lvb, lock->l_resource->lr_lvb_data,
+                               lock->l_resource->lr_lvb_len);
+                }
+
                 if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
                         ldlm_reprocess_all(lock->l_resource);
                 LDLM_LOCK_PUT(lock);
         }
-        LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock);
+        LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p, rc %d)",
+                          lock, rc);
 
-        return 0;
+        return rc;
 }
 
 int ldlm_handle_convert(struct ptlrpc_request *req)
@@ -668,11 +734,12 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
 {
         struct ldlm_request *dlm_req;
         struct ldlm_lock *lock;
+        struct ldlm_resource *res;
         char str[PTL_NALFMT_SIZE];
         int rc;
         ENTRY;
 
-        dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
+        dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
                                       lustre_swab_ldlm_request);
         if (dlm_req == NULL) {
                 CERROR("bad request buffer for cancel\n");
@@ -688,7 +755,9 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
         if (!lock) {
                 CERROR("received cancel for unknown lock cookie "LPX64
-                       " from nid "LPX64" (%s)\n", dlm_req->lock_handle1.cookie,
+                       " from client %s nid "LPX64" (%s)\n",
+                       dlm_req->lock_handle1.cookie,
+                       req->rq_export->exp_client_uuid.uuid,
                        req->rq_peer.peer_nid,
                        portals_nid2str(req->rq_peer.peer_ni->pni_number,
                                        req->rq_peer.peer_nid, str));
@@ -698,10 +767,18 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
                 req->rq_status = ESTALE;
         } else {
                 LDLM_DEBUG(lock, "server-side cancel handler START");
+                res = lock->l_resource;
+                if (res && res->lr_namespace->ns_lvbo &&
+                    res->lr_namespace->ns_lvbo->lvbo_update) {
+                        (void)res->lr_namespace->ns_lvbo->lvbo_update
+                                (res, NULL, 0, 0);
+                                //(res, req->rq_reqmsg, 1);
+                }
+
                 ldlm_lock_cancel(lock);
                 if (ldlm_del_waiting_lock(lock))
                         CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
-                req->rq_status = 0;
+                req->rq_status = rc;
         }
 
         if (ptlrpc_reply(req) != 0)
@@ -767,9 +844,12 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
                 LDLM_DEBUG(lock, "completion AST, new lock mode");
         }
-        if (lock->l_resource->lr_type != LDLM_PLAIN)
+
+        if (lock->l_resource->lr_type != LDLM_PLAIN) {
                 memcpy(&lock->l_policy_data, &dlm_req->lock_desc.l_policy_data,
                        sizeof(lock->l_policy_data));
+                LDLM_DEBUG(lock, "completion AST, new policy data");
+        }
 
         ldlm_resource_unlink_lock(lock);
         if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
@@ -785,6 +865,18 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                 LDLM_DEBUG(lock, "completion AST includes blocking AST");
         }
 
+        if (lock->l_lvb_len) {
+                void *lvb;
+                lvb = lustre_swab_reqbuf(req, 1, lock->l_lvb_len,
+                                         lock->l_lvb_swabber);
+                if (lvb == NULL) {
+                        LDLM_ERROR(lock, "completion AST did not contain "
+                                   "expected LVB!");
+                } else {
+                        memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len);
+                }
+        }
+
         lock->l_resource->lr_tmp = &ast_list;
         ldlm_grant_lock(lock, req, sizeof(*req), 1);
         lock->l_resource->lr_tmp = NULL;
@@ -799,20 +891,61 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
         EXIT;
 }
 
+static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
+                                    struct ldlm_namespace *ns,
+                                    struct ldlm_request *dlm_req,
+                                    struct ldlm_lock *lock)
+{
+        int rc = -ENOSYS;
+        ENTRY;
+
+        l_lock(&ns->ns_lock);
+        LDLM_DEBUG(lock, "client glimpse AST callback handler");
+
+        if (lock->l_glimpse_ast != NULL) {
+                l_unlock(&ns->ns_lock);
+                l_check_no_ns_lock(ns);
+                rc = lock->l_glimpse_ast(lock, req);
+                l_lock(&ns->ns_lock);
+        }
+
+        if (req->rq_repmsg != NULL) {
+                ptlrpc_reply(req);
+        } else {
+                req->rq_status = rc;
+                ptlrpc_error(req);
+        }
+
+        if (lock->l_granted_mode == LCK_PW &&
+            !lock->l_readers && !lock->l_writers &&
+            time_after(jiffies, lock->l_last_used + 10 * HZ)) {
+                l_unlock(&ns->ns_lock);
+                ldlm_handle_bl_callback(ns, NULL, lock);
+                EXIT;
+                return;
+        }
+
+        l_unlock(&ns->ns_lock);
+        LDLM_LOCK_PUT(lock);
+        EXIT;
+}
+
 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
 {
         req->rq_status = rc;
-        rc = lustre_pack_reply(req, 0, NULL, NULL);
-        if (rc)
-                return rc;
+        if (req->rq_reply_state == NULL) {
+                rc = lustre_pack_reply(req, 0, NULL, NULL);
+                if (rc)
+                        return rc;
+        }
         return ptlrpc_reply(req);
 }
 
 #ifdef __KERNEL__
-static int ldlm_bl_to_thread(struct ldlm_state *ldlm, struct ldlm_namespace *ns,
-                             struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
+int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
+                      struct ldlm_lock *lock)
 {
-        struct ldlm_bl_pool *blp = ldlm->ldlm_bl_pool;
+        struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
         struct ldlm_bl_work_item *blwi;
         ENTRY;
 
@@ -821,7 +954,8 @@ static int ldlm_bl_to_thread(struct ldlm_state *ldlm, struct ldlm_namespace *ns,
                 RETURN(-ENOMEM);
 
         blwi->blwi_ns = ns;
-        blwi->blwi_ld = *ld;
+        if (ld != NULL)
+                blwi->blwi_ld = *ld;
         blwi->blwi_lock = lock;
 
         spin_lock(&blp->blp_lock);
@@ -869,56 +1003,48 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 RETURN(0);
         }
 
-        if (req->rq_reqmsg->opc == LDLM_BL_CALLBACK) {
-                OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
-        } else if (req->rq_reqmsg->opc == LDLM_CP_CALLBACK) {
-                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
-        } else if (req->rq_reqmsg->opc == OBD_LOG_CANCEL) {
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
-        } else if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CREATE) {
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
-        } else if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_NEXT_BLOCK) {
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
-        } else if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_READ_HEADER) {
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
-        } else if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CLOSE) {
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
-        } else {
-                ldlm_callback_reply(req, -EPROTO);
-                RETURN(0);
-        }
-
         LASSERT(req->rq_export != NULL);
         LASSERT(req->rq_export->exp_obd != NULL);
 
-        /* FIXME - how to send reply */
-        if (req->rq_reqmsg->opc == OBD_LOG_CANCEL) {
-                int rc = llog_origin_handle_cancel(req);
+        switch(req->rq_reqmsg->opc) {
+        case LDLM_BL_CALLBACK:
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
+                break;
+        case LDLM_CP_CALLBACK:
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
+                break;
+        case LDLM_GL_CALLBACK:
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_GL_CALLBACK, 0);
+                break;
+        case OBD_LOG_CANCEL:
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
+                rc = llog_origin_handle_cancel(req);
                 ldlm_callback_reply(req, rc);
                 RETURN(0);
-        }
-        if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CREATE) {
-                int rc = llog_origin_handle_create(req);
-                req->rq_status = rc;
-                ptlrpc_reply(req);
+        case LLOG_ORIGIN_HANDLE_CREATE:
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                rc = llog_origin_handle_create(req);
+                ldlm_callback_reply(req, rc);
                 RETURN(0);
-        }
-        if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_NEXT_BLOCK) {
-                int rc = llog_origin_handle_next_block(req);
-                req->rq_status = rc;
-                ptlrpc_reply(req);
+        case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                rc = llog_origin_handle_next_block(req);
+                ldlm_callback_reply(req, rc);
                 RETURN(0);
-        }
-        if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_READ_HEADER) {
-                int rc = llog_origin_handle_read_header(req);
-                req->rq_status = rc;
-                ptlrpc_reply(req);
+        case LLOG_ORIGIN_HANDLE_READ_HEADER:
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                rc = llog_origin_handle_read_header(req);
+                ldlm_callback_reply(req, rc);
                 RETURN(0);
-        }
-        if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CLOSE) {
-                int rc = llog_origin_handle_close(req);
+        case LLOG_ORIGIN_HANDLE_CLOSE:
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                rc = llog_origin_handle_close(req);
                 ldlm_callback_reply(req, rc);
                 RETURN(0);
+        default:
+                CERROR("unknown opcode %u\n", req->rq_reqmsg->opc);
+                ldlm_callback_reply(req, -EPROTO);
+                RETURN(0);
         }
 
         ns = req->rq_export->exp_obd->obd_namespace;
@@ -951,14 +1077,12 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
          * cancelling right now, because it's unused, or have an intent result
          * in the reply, so we might have to push the responsibility for sending
          * the reply down into the AST handlers, alas. */
-        if (req->rq_reqmsg->opc != LDLM_BL_CALLBACK)
-                ldlm_callback_reply(req, 0);
 
         switch (req->rq_reqmsg->opc) {
         case LDLM_BL_CALLBACK:
                 CDEBUG(D_INODE, "blocking ast\n");
 #ifdef __KERNEL__
-                rc = ldlm_bl_to_thread(ldlm, ns, &dlm_req->lock_desc, lock);
+                rc = ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock);
                 ldlm_callback_reply(req, rc);
 #else
                 rc = 0;
@@ -968,8 +1092,13 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 break;
         case LDLM_CP_CALLBACK:
                 CDEBUG(D_INODE, "completion ast\n");
+                ldlm_callback_reply(req, 0);
                 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
                 break;
+        case LDLM_GL_CALLBACK:
+                CDEBUG(D_INODE, "glimpse ast\n");
+                ldlm_handle_gl_callback(req, ns, dlm_req, lock);
+                break;
         default:
                 LBUG();                         /* checked above */
         }
@@ -1129,11 +1258,11 @@ static int ldlm_setup(void)
 #endif
         ENTRY;
 
-        if (ldlm != NULL)
+        if (ldlm_state != NULL)
                 RETURN(-EALREADY);
 
-        OBD_ALLOC(ldlm, sizeof(*ldlm));
-        if (ldlm == NULL)
+        OBD_ALLOC(ldlm_state, sizeof(*ldlm_state));
+        if (ldlm_state == NULL)
                 RETURN(-ENOMEM);
 
 #ifdef __KERNEL__
@@ -1142,26 +1271,25 @@ static int ldlm_setup(void)
                 GOTO(out_free, rc);
 #endif
 
-        ldlm->ldlm_cb_service =
-                ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
-                                LDLM_MAXREQSIZE, LDLM_CB_REQUEST_PORTAL,
-                                LDLM_CB_REPLY_PORTAL,
+        ldlm_state->ldlm_cb_service =
+                ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
+                                LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                                 ldlm_callback_handler, "ldlm_cbd",
                                 ldlm_svc_proc_dir);
 
-        if (!ldlm->ldlm_cb_service) {
+        if (!ldlm_state->ldlm_cb_service) {
                 CERROR("failed to start service\n");
                 GOTO(out_proc, rc = -ENOMEM);
         }
 
-        ldlm->ldlm_cancel_service =
-                ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
-                                LDLM_MAXREQSIZE, LDLM_CANCEL_REQUEST_PORTAL,
+        ldlm_state->ldlm_cancel_service =
+                ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
+                                LDLM_CANCEL_REQUEST_PORTAL,
                                 LDLM_CANCEL_REPLY_PORTAL,
                                 ldlm_cancel_handler, "ldlm_canceld",
                                 ldlm_svc_proc_dir);
 
-        if (!ldlm->ldlm_cancel_service) {
+        if (!ldlm_state->ldlm_cancel_service) {
                 CERROR("failed to start service\n");
                 GOTO(out_proc, rc = -ENOMEM);
         }
@@ -1169,7 +1297,7 @@ static int ldlm_setup(void)
         OBD_ALLOC(blp, sizeof(*blp));
         if (blp == NULL)
                 GOTO(out_proc, rc = -ENOMEM);
-        ldlm->ldlm_bl_pool = blp;
+        ldlm_state->ldlm_bl_pool = blp;
 
         atomic_set(&blp->blp_num_threads, 0);
         init_waitqueue_head(&blp->blp_waitq);
@@ -1193,14 +1321,14 @@ static int ldlm_setup(void)
                 wait_for_completion(&blp->blp_comp);
         }
 
-        rc = ptlrpc_start_n_threads(NULL, ldlm->ldlm_cancel_service,
+        rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cancel_service,
                                     LDLM_NUM_THREADS, "ldlm_cn");
         if (rc) {
                 LBUG();
                 GOTO(out_thread, rc);
         }
 
-        rc = ptlrpc_start_n_threads(NULL, ldlm->ldlm_cb_service,
+        rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cb_service,
                                     LDLM_NUM_THREADS, "ldlm_cb");
         if (rc) {
                 LBUG();
@@ -1232,8 +1360,8 @@ static int ldlm_setup(void)
 
 #ifdef __KERNEL__
  out_thread:
-        ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
-        ptlrpc_unregister_service(ldlm->ldlm_cb_service);
+        ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
+        ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
 #endif
 
  out_proc:
@@ -1241,15 +1369,15 @@ static int ldlm_setup(void)
         ldlm_proc_cleanup();
  out_free:
 #endif
-        OBD_FREE(ldlm, sizeof(*ldlm));
-        ldlm = NULL;
+        OBD_FREE(ldlm_state, sizeof(*ldlm_state));
+        ldlm_state = NULL;
         return rc;
 }
 
 static int ldlm_cleanup(int force)
 {
 #ifdef __KERNEL__
-        struct ldlm_bl_pool *blp = ldlm->ldlm_bl_pool;
+        struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
 #endif
         ENTRY;
 
@@ -1274,10 +1402,10 @@ static int ldlm_cleanup(int force)
         }
         OBD_FREE(blp, sizeof(*blp));
 
-        ptlrpc_stop_all_threads(ldlm->ldlm_cb_service);
-        ptlrpc_unregister_service(ldlm->ldlm_cb_service);
-        ptlrpc_stop_all_threads(ldlm->ldlm_cancel_service);
-        ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
+        ptlrpc_stop_all_threads(ldlm_state->ldlm_cb_service);
+        ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
+        ptlrpc_stop_all_threads(ldlm_state->ldlm_cancel_service);
+        ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
         ldlm_proc_cleanup();
 
         expired_lock_thread.elt_state = ELT_TERMINATE;
@@ -1287,8 +1415,8 @@ static int ldlm_cleanup(int force)
 
 #endif
 
-        OBD_FREE(ldlm, sizeof(*ldlm));
-        ldlm = NULL;
+        OBD_FREE(ldlm_state, sizeof(*ldlm_state));
+        ldlm_state = NULL;
 
         RETURN(0);
 }
@@ -1327,14 +1455,18 @@ void __exit ldlm_exit(void)
 /* ldlm_flock.c */
 EXPORT_SYMBOL(ldlm_flock_completion_ast);
 
+/* ldlm_extent.c */
+EXPORT_SYMBOL(ldlm_extent_shift_kms);
+
 /* ldlm_lock.c */
+EXPORT_SYMBOL(ldlm_get_processing_policy);
 EXPORT_SYMBOL(ldlm_lock2desc);
 EXPORT_SYMBOL(ldlm_register_intent);
-EXPORT_SYMBOL(ldlm_unregister_intent);
 EXPORT_SYMBOL(ldlm_lockname);
 EXPORT_SYMBOL(ldlm_typename);
 EXPORT_SYMBOL(ldlm_lock2handle);
 EXPORT_SYMBOL(__ldlm_handle2lock);
+EXPORT_SYMBOL(ldlm_lock_get);
 EXPORT_SYMBOL(ldlm_lock_put);
 EXPORT_SYMBOL(ldlm_lock_match);
 EXPORT_SYMBOL(ldlm_lock_cancel);
@@ -1348,6 +1480,7 @@ EXPORT_SYMBOL(ldlm_lock_dump);
 EXPORT_SYMBOL(ldlm_lock_dump_handle);
 EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
 EXPORT_SYMBOL(ldlm_reprocess_all_ns);
+EXPORT_SYMBOL(ldlm_lock_allow_match);
 
 /* ldlm_request.c */
 EXPORT_SYMBOL(ldlm_completion_ast);
@@ -1365,6 +1498,7 @@ EXPORT_SYMBOL(ldlm_change_cbdata);
 /* ldlm_lockd.c */
 EXPORT_SYMBOL(ldlm_server_blocking_ast);
 EXPORT_SYMBOL(ldlm_server_completion_ast);
+EXPORT_SYMBOL(ldlm_server_glimpse_ast);
 EXPORT_SYMBOL(ldlm_handle_enqueue);
 EXPORT_SYMBOL(ldlm_handle_cancel);
 EXPORT_SYMBOL(ldlm_handle_convert);
@@ -1383,6 +1517,10 @@ EXPORT_SYMBOL(ldlm_regression_stop);
 EXPORT_SYMBOL(ldlm_namespace_new);
 EXPORT_SYMBOL(ldlm_namespace_cleanup);
 EXPORT_SYMBOL(ldlm_namespace_free);
+EXPORT_SYMBOL(ldlm_namespace_dump);
+EXPORT_SYMBOL(ldlm_dump_all_namespaces);
+EXPORT_SYMBOL(ldlm_resource_get);
+EXPORT_SYMBOL(ldlm_resource_putref);
 
 /* l_lock.c */
 EXPORT_SYMBOL(l_lock);
@@ -1402,4 +1540,3 @@ EXPORT_SYMBOL(target_queue_recovery_request);
 EXPORT_SYMBOL(target_handle_ping);
 EXPORT_SYMBOL(target_handle_disconnect);
 EXPORT_SYMBOL(target_queue_final_reply);
-EXPORT_SYMBOL(ldlm_put_lock_into_req);