LU-2835 ptlrpc: Fix race during exp_flock_hash creation
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index 464a19a..1cfd813 100644
@@ -27,7 +27,7 @@
  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2010, 2012, Intel Corporation.
+ * Copyright (c) 2010, 2013, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -60,8 +60,8 @@ static char *ldlm_cpts;
 CFS_MODULE_PARM(ldlm_cpts, "s", charp, 0444,
                "CPU partitions ldlm threads should run on");
 
-extern cfs_mem_cache_t *ldlm_resource_slab;
-extern cfs_mem_cache_t *ldlm_lock_slab;
+extern struct kmem_cache *ldlm_resource_slab;
+extern struct kmem_cache *ldlm_lock_slab;
 static struct mutex    ldlm_ref_mutex;
 static int ldlm_refcount;
 
@@ -124,7 +124,7 @@ struct ldlm_bl_work_item {
         cfs_list_t              blwi_head;
         int                     blwi_count;
        struct completion        blwi_comp;
-        int                     blwi_mode;
+       ldlm_cancel_flags_t     blwi_flags;
         int                     blwi_mem_pressure;
 };
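
The old int blwi_mode carried LDLM_SYNC/LDLM_ASYNC; the new field reuses the cancel-flags bitmask used throughout this patch. A sketch of that type's shape, assuming the lustre_dlm.h definition of this era:

    typedef enum {
            LCF_ASYNC  = 0x1, /* cancel locks asynchronously */
            LCF_LOCAL  = 0x2, /* cancel locally, no RPC to the server */
            LCF_BL_AST = 0x4, /* cancel LDLM_FL_BL_AST locks in the same RPC */
    } ldlm_cancel_flags_t;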
 
@@ -178,7 +178,6 @@ static int expired_lock_main(void *arg)
         int do_dump;
 
         ENTRY;
-        cfs_daemonize("ldlm_elt");
 
         expired_lock_thread.elt_state = ELT_READY;
         cfs_waitq_signal(&expired_lock_thread.elt_waitq);
@@ -213,14 +212,15 @@ static int expired_lock_main(void *arg)
 
                         lock = cfs_list_entry(expired->next, struct ldlm_lock,
                                           l_pending_chain);
-                        if ((void *)lock < LP_POISON + CFS_PAGE_SIZE &&
-                            (void *)lock >= LP_POISON) {
+                       if ((void *)lock < LP_POISON + PAGE_CACHE_SIZE &&
+                           (void *)lock >= LP_POISON) {
                                spin_unlock_bh(&waiting_locks_spinlock);
-                                CERROR("free lock on elt list %p\n", lock);
-                                LBUG();
-                        }
-                        cfs_list_del_init(&lock->l_pending_chain);
-                        if ((void *)lock->l_export < LP_POISON + CFS_PAGE_SIZE &&
+                               CERROR("free lock on elt list %p\n", lock);
+                               LBUG();
+                       }
+                       cfs_list_del_init(&lock->l_pending_chain);
+                       if ((void *)lock->l_export <
+                            LP_POISON + PAGE_CACHE_SIZE &&
                             (void *)lock->l_export >= LP_POISON) {
                                 CERROR("lock with free export on elt list %p\n",
                                        lock->l_export);
@@ -233,7 +233,7 @@ static int expired_lock_main(void *arg)
                                 continue;
                         }
 
-                       if (lock->l_destroyed) {
+                       if (lock->l_flags & LDLM_FL_DESTROYED) {
                                /* release the lock refcount where
                                * waiting_locks_callback() found it */
                                LDLM_LOCK_RELEASE(lock);
@@ -324,7 +324,7 @@ static void waiting_locks_callback(unsigned long unused)
                                    libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid));
 
                         cfs_list_del_init(&lock->l_pending_chain);
-                       if (lock->l_destroyed) {
+                       if (lock->l_flags & LDLM_FL_DESTROYED) {
                                /* relay the lock refcount decrease to
                                 * expired lock thread */
                                cfs_list_add(&lock->l_pending_chain,
@@ -347,7 +347,7 @@ static void waiting_locks_callback(unsigned long unused)
                                    libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid));
 
                         cfs_list_del_init(&lock->l_pending_chain);
-                       if (lock->l_destroyed) {
+                       if (lock->l_flags & LDLM_FL_DESTROYED) {
                                /* relay the lock refcount decrease to
                                 * expired lock thread */
                                cfs_list_add(&lock->l_pending_chain,
@@ -468,13 +468,13 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
        int timeout = ldlm_get_enq_timeout(lock);
 
       /* NB: must be called while holding lock_res_and_lock() */
-       LASSERT(lock->l_res_locked);
-       lock->l_waited = 1;
+       LASSERT(lock->l_flags & LDLM_FL_RES_LOCKED);
+       lock->l_flags |= LDLM_FL_WAITED;
 
        LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
 
        spin_lock_bh(&waiting_locks_spinlock);
-       if (lock->l_destroyed) {
+       if (lock->l_flags & LDLM_FL_DESTROYED) {
                static cfs_time_t next;
                spin_unlock_bh(&waiting_locks_spinlock);
                 LDLM_ERROR(lock, "not waiting on destroyed lock (bug 5653)");
@@ -619,8 +619,8 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
 # ifdef HAVE_SERVER_SUPPORT
 static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
 {
-       LASSERT(lock->l_res_locked);
-       LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
+       LASSERT((lock->l_flags & (LDLM_FL_RES_LOCKED|LDLM_FL_CANCEL_ON_BLOCK))
+               == LDLM_FL_RES_LOCKED);
        RETURN(1);
 }
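
The merged LASSERT above folds two separate checks into one mask comparison: (flags & (A|B)) == A is true exactly when A is set and B is clear. A minimal user-space illustration of the idiom (the flag values here are hypothetical, not the real LDLM_FL_* bits):

    #include <assert.h>

    #define FL_RES_LOCKED       0x1UL
    #define FL_CANCEL_ON_BLOCK  0x2UL

    static void check(unsigned long flags)
    {
            /* holds iff FL_RES_LOCKED is set AND FL_CANCEL_ON_BLOCK is clear */
            assert((flags & (FL_RES_LOCKED | FL_CANCEL_ON_BLOCK)) ==
                   FL_RES_LOCKED);
    }

    int main(void)
    {
            check(FL_RES_LOCKED);                         /* passes */
            /* check(FL_RES_LOCKED | FL_CANCEL_ON_BLOCK);    would trip */
            return 0;
    }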
 
@@ -797,25 +797,28 @@ static inline int ldlm_ast_fini(struct ptlrpc_request *req,
  */
 static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
 {
-        struct ptlrpc_request *req;
-        ENTRY;
+       struct ptlrpc_request *req;
+       ENTRY;
 
-        if (lock->l_export == NULL) {
-                LDLM_DEBUG(lock, "client lock: no-op");
-                RETURN_EXIT;
-        }
+       if (lock->l_export == NULL) {
+               LDLM_DEBUG(lock, "client lock: no-op");
+               RETURN_EXIT;
+       }
 
        spin_lock_bh(&lock->l_export->exp_rpc_lock);
-        cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
-                                rq_exp_list) {
-                /* Do not process requests that were not yet added to there
-                 * incoming queue or were already removed from there for
-                 * processing */
-                if (!req->rq_hp && !cfs_list_empty(&req->rq_list) &&
-                    req->rq_ops->hpreq_lock_match &&
-                    req->rq_ops->hpreq_lock_match(req, lock))
-                        ptlrpc_hpreq_reorder(req);
-        }
+       cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
+                               rq_exp_list) {
+               /* Do not process requests that were not yet added to the
+                * incoming queue or were already removed from it for
+                * processing. We evaluate ptlrpc_nrs_req_can_move() without
+                * holding svcpt->scp_req_lock, and then redo the check with
+                * the lock held once we need to obtain a reliable result.
+                */
+               if (ptlrpc_nrs_req_can_move(req) &&
+                   req->rq_ops->hpreq_lock_match &&
+                   req->rq_ops->hpreq_lock_match(req, lock))
+                       ptlrpc_nrs_req_hp_move(req);
+       }
        spin_unlock_bh(&lock->l_export->exp_rpc_lock);
        EXIT;
 }
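
The rewritten comment describes a classic double-check: ptlrpc_nrs_req_can_move() is evaluated without svcpt->scp_req_lock as a cheap filter, and the move itself re-validates under the lock. A hedged user-space sketch of that shape (all names hypothetical):

    #include <pthread.h>
    #include <stdbool.h>

    struct req {
            pthread_mutex_t *queue_lock;
            bool             queued;   /* protected by *queue_lock */
            bool             hp;       /* protected by *queue_lock */
    };

    /* Unlocked hint: may race, so it only filters obvious non-candidates. */
    static bool req_can_move(struct req *r)
    {
            return r->queued && !r->hp;
    }

    static void req_hp_move(struct req *r)
    {
            pthread_mutex_lock(r->queue_lock);
            if (req_can_move(r))        /* re-check once the state is stable */
                    r->hp = true;
            pthread_mutex_unlock(r->queue_lock);
    }

    int main(void)
    {
            pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
            struct req r = { .queue_lock = &lock, .queued = true, .hp = false };

            if (req_can_move(&r))       /* cheap, possibly stale answer */
                    req_hp_move(&r);    /* authoritative, locked re-check */
            return r.hp ? 0 : 1;
    }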
@@ -875,7 +878,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                RETURN(0);
        }
 
-       if (lock->l_destroyed) {
+       if (lock->l_flags & LDLM_FL_DESTROYED) {
                /* What's the point? */
                unlock_res_and_lock(lock);
                ptlrpc_req_finished(req);
@@ -950,6 +953,13 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
 
        /* server namespace, doesn't need lock */
        lvb_len = ldlm_lvbo_size(lock);
+       /* LU-3124 & LU-2187: do not return the layout in the completion AST,
+        * because it may deadlock (LU-2187) or the client may not have enough
+        * space for a large layout. The layout will instead be returned to
+        * the client by an extra RPC that fetches xattr.lov */
+       if (ldlm_has_layout(lock))
+               lvb_len = 0;
+
        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT, lvb_len);
         rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK);
         if (rc) {
@@ -973,8 +983,20 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
                void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
 
                lvb_len = ldlm_lvbo_fill(lock, lvb, lvb_len);
-               req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB,
-                                  lvb_len, RCL_CLIENT);
+               if (lvb_len < 0) {
+                       /* We still need to send the RPC to wake up the blocked
+                        * enqueue thread on the client.
+                        *
+                        * For an old client there is no better way to notify
+                        * it of the failure than a zero-sized LVB; the client
+                        * will then fail out with -EPROTO. */
+                       req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, 0,
+                                          RCL_CLIENT);
+                       instant_cancel = 1;
+               } else {
+                       req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len,
+                                          RCL_CLIENT);
+               }
         }
 
         LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)",
@@ -1035,7 +1057,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
 
        rc = ldlm_ast_fini(req, arg, lock, instant_cancel);
 
-        RETURN(rc);
+       RETURN(lvb_len < 0 ? lvb_len : rc);
 }
 EXPORT_SYMBOL(ldlm_server_completion_ast);
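
Note the error policy introduced above: a negative ldlm_lvbo_fill() result still lets the RPC go out (zero-length LVB, instant_cancel set) so the blocked client wakes up, but the local caller now sees the LVB error in preference to the send status. A one-function sketch of that preference:

    /* Sketch: the earlier, more specific failure wins over the generic rc. */
    static int completion_ast_status(int lvb_len, int send_rc)
    {
            if (lvb_len < 0)
                    return lvb_len;    /* e.g. -ENOMEM from ldlm_lvbo_fill() */
            return send_rc;            /* result of ldlm_ast_fini() */
    }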
 
@@ -1221,7 +1243,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
                 GOTO(out, rc = -EFAULT);
         }
 
-        if (req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS) {
+       if (exp_connect_flags(req->rq_export) & OBD_CONNECT_IBITS) {
                 if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
                              LDLM_PLAIN)) {
                         DEBUG_REQ(D_ERROR, req,
@@ -1242,7 +1264,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
 
         /* INODEBITS_INTEROP: Perform conversion from plain lock to
          * inodebits lock if client does not support them. */
-        if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS) &&
+       if (!(exp_connect_flags(req->rq_export) & OBD_CONNECT_IBITS) &&
             (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN)) {
                 dlm_req->lock_desc.l_resource.lr_type = LDLM_IBITS;
                 dlm_req->lock_desc.l_policy_data.l_inodebits.bits =
@@ -1324,8 +1346,11 @@ existing_lock:
                 lock->l_req_extent = lock->l_policy_data.l_extent;
 
        err = ldlm_lock_enqueue(ns, &lock, cookie, &flags);
-        if (err)
-                GOTO(out, err);
+       if (err) {
+               if ((int)err < 0)
+                       rc = (int)err;
+               GOTO(out, err);
+       }
 
         dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
        dlm_rep->lock_flags = ldlm_flags_to_wire(flags);
@@ -1424,8 +1449,12 @@ existing_lock:
                                buflen = req_capsule_get_size(&req->rq_pill,
                                                &RMF_DLM_LVB, RCL_SERVER);
                                buflen = ldlm_lvbo_fill(lock, buf, buflen);
-                               req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB,
-                                                  buflen, RCL_SERVER);
+                               if (buflen >= 0)
+                                       req_capsule_shrink(&req->rq_pill,
+                                                          &RMF_DLM_LVB,
+                                                          buflen, RCL_SERVER);
+                               else
+                                       rc = buflen;
                        }
                 } else {
                         lock_res_and_lock(lock);
@@ -1499,7 +1528,7 @@ int ldlm_handle_convert0(struct ptlrpc_request *req,
 
         lock = ldlm_handle2lock(&dlm_req->lock_handle[0]);
         if (!lock) {
-                req->rq_status = EINVAL;
+               req->rq_status = LUSTRE_EINVAL;
         } else {
                 void *res = NULL;
 
@@ -1513,7 +1542,7 @@ int ldlm_handle_convert0(struct ptlrpc_request *req,
                                 LDLM_DEBUG(lock, "converted waiting lock");
                         req->rq_status = 0;
                 } else {
-                        req->rq_status = EDEADLOCK;
+                       req->rq_status = LUSTRE_EDEADLK;
                 }
         }
 
@@ -1646,7 +1675,7 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
                 RETURN(rc);
 
         if (!ldlm_request_cancel(req, dlm_req, 0))
-                req->rq_status = ESTALE;
+               req->rq_status = LUSTRE_ESTALE;
 
         RETURN(ptlrpc_reply(req));
 }
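
The three rq_status hunks (LUSTRE_EINVAL, LUSTRE_EDEADLK, LUSTRE_ESTALE) share one rationale: rq_status travels on the wire, so it must use Lustre's pinned, Linux-valued error numbers rather than whatever the server platform's errno.h defines. A sketch of the idea, with the Linux values these names correspond to (the real definitions live in Lustre's error-number header):

    /* Wire error numbers pinned to Linux values so all peers agree. */
    #define LUSTRE_EINVAL    22     /* invalid argument */
    #define LUSTRE_EDEADLK   35     /* resource deadlock would occur */
    #define LUSTRE_ESTALE   116     /* stale file handle */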
@@ -1702,22 +1731,22 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                                     struct ldlm_lock *lock)
 {
        int lvb_len;
-        CFS_LIST_HEAD(ast_list);
+       CFS_LIST_HEAD(ast_list);
        int rc = 0;
-        ENTRY;
+       ENTRY;
 
-        LDLM_DEBUG(lock, "client completion callback handler START");
+       LDLM_DEBUG(lock, "client completion callback handler START");
 
-        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
-                int to = cfs_time_seconds(1);
-                while (to > 0) {
-                        cfs_schedule_timeout_and_set_state(
-                                CFS_TASK_INTERRUPTIBLE, to);
-                        if (lock->l_granted_mode == lock->l_req_mode ||
-                            lock->l_destroyed)
-                                break;
-                }
-        }
+       if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
+               int to = cfs_time_seconds(1);
+               while (to > 0) {
+                       cfs_schedule_timeout_and_set_state(
+                               CFS_TASK_INTERRUPTIBLE, to);
+                       if (lock->l_granted_mode == lock->l_req_mode ||
+                           lock->l_flags & LDLM_FL_DESTROYED)
+                               break;
+               }
+       }
 
        lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT);
        if (lvb_len < 0) {
@@ -1753,29 +1782,29 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                }
        }
 
-        lock_res_and_lock(lock);
-        if (lock->l_destroyed ||
-            lock->l_granted_mode == lock->l_req_mode) {
-                /* bug 11300: the lock has already been granted */
-                unlock_res_and_lock(lock);
-                LDLM_DEBUG(lock, "Double grant race happened");
+       lock_res_and_lock(lock);
+       if ((lock->l_flags & LDLM_FL_DESTROYED) ||
+           lock->l_granted_mode == lock->l_req_mode) {
+               /* bug 11300: the lock has already been granted */
+               unlock_res_and_lock(lock);
+               LDLM_DEBUG(lock, "Double grant race happened");
                GOTO(out, rc = 0);
-        }
+       }
 
-        /* If we receive the completion AST before the actual enqueue returned,
-         * then we might need to switch lock modes, resources, or extents. */
-        if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
-                lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
-                LDLM_DEBUG(lock, "completion AST, new lock mode");
-        }
+       /* If we receive the completion AST before the actual enqueue returned,
+        * then we might need to switch lock modes, resources, or extents. */
+       if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
+               lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
+               LDLM_DEBUG(lock, "completion AST, new lock mode");
+       }
 
-        if (lock->l_resource->lr_type != LDLM_PLAIN) {
-                ldlm_convert_policy_to_local(req->rq_export,
-                                          dlm_req->lock_desc.l_resource.lr_type,
-                                          &dlm_req->lock_desc.l_policy_data,
-                                          &lock->l_policy_data);
-                LDLM_DEBUG(lock, "completion AST, new policy data");
-        }
+       if (lock->l_resource->lr_type != LDLM_PLAIN) {
+               ldlm_convert_policy_to_local(req->rq_export,
+                                         dlm_req->lock_desc.l_resource.lr_type,
+                                         &dlm_req->lock_desc.l_policy_data,
+                                         &lock->l_policy_data);
+               LDLM_DEBUG(lock, "completion AST, new policy data");
+       }
 
         ldlm_resource_unlink_lock(lock);
         if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
@@ -1895,7 +1924,8 @@ static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
 }
 
 #ifdef __KERNEL__
-static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi, int mode)
+static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
+                              ldlm_cancel_flags_t cancel_flags)
 {
        struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
        ENTRY;
@@ -1913,29 +1943,29 @@ static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi, int mode)
 
        cfs_waitq_signal(&blp->blp_waitq);
 
-       /* can not use blwi->blwi_mode as blwi could be already freed in
-          LDLM_ASYNC mode */
-       if (mode == LDLM_SYNC)
+       /* cannot check blwi->blwi_flags here, as blwi may already have been
+          freed by the handler in LCF_ASYNC mode */
+       if (!(cancel_flags & LCF_ASYNC))
                wait_for_completion(&blwi->blwi_comp);
 
        RETURN(0);
 }
 
 static inline void init_blwi(struct ldlm_bl_work_item *blwi,
-                             struct ldlm_namespace *ns,
-                             struct ldlm_lock_desc *ld,
-                             cfs_list_t *cancels, int count,
-                             struct ldlm_lock *lock,
-                             int mode)
+                            struct ldlm_namespace *ns,
+                            struct ldlm_lock_desc *ld,
+                            cfs_list_t *cancels, int count,
+                            struct ldlm_lock *lock,
+                            ldlm_cancel_flags_t cancel_flags)
 {
        init_completion(&blwi->blwi_comp);
         CFS_INIT_LIST_HEAD(&blwi->blwi_head);
 
-        if (cfs_memory_pressure_get())
+       if (memory_pressure_get())
                 blwi->blwi_mem_pressure = 1;
 
         blwi->blwi_ns = ns;
-        blwi->blwi_mode = mode;
+       blwi->blwi_flags = cancel_flags;
         if (ld != NULL)
                 blwi->blwi_ld = *ld;
         if (count) {
@@ -1957,52 +1987,57 @@ static inline void init_blwi(struct ldlm_bl_work_item *blwi,
  * call ->l_blocking_ast itself.
  */
 static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
-                             struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
-                             cfs_list_t *cancels, int count, int mode)
+                            struct ldlm_lock_desc *ld,
+                            struct ldlm_lock *lock,
+                            cfs_list_t *cancels, int count,
+                            ldlm_cancel_flags_t cancel_flags)
 {
-        ENTRY;
+       ENTRY;
 
-        if (cancels && count == 0)
-                RETURN(0);
+       if (cancels && count == 0)
+               RETURN(0);
 
-        if (mode == LDLM_SYNC) {
-                /* if it is synchronous call do minimum mem alloc, as it could
-                 * be triggered from kernel shrinker
-                 */
-                struct ldlm_bl_work_item blwi;
-                memset(&blwi, 0, sizeof(blwi));
-                init_blwi(&blwi, ns, ld, cancels, count, lock, LDLM_SYNC);
-                RETURN(__ldlm_bl_to_thread(&blwi, LDLM_SYNC));
-        } else {
-                struct ldlm_bl_work_item *blwi;
-                OBD_ALLOC(blwi, sizeof(*blwi));
-                if (blwi == NULL)
-                        RETURN(-ENOMEM);
-                init_blwi(blwi, ns, ld, cancels, count, lock, LDLM_ASYNC);
+       if (cancel_flags & LCF_ASYNC) {
+               struct ldlm_bl_work_item *blwi;
 
-                RETURN(__ldlm_bl_to_thread(blwi, LDLM_ASYNC));
-        }
+               OBD_ALLOC(blwi, sizeof(*blwi));
+               if (blwi == NULL)
+                       RETURN(-ENOMEM);
+               init_blwi(blwi, ns, ld, cancels, count, lock, cancel_flags);
+
+               RETURN(__ldlm_bl_to_thread(blwi, cancel_flags));
+       } else {
+               /* if it is a synchronous call, do minimal memory allocation,
+                * as it could be triggered from the kernel shrinker
+                */
+               struct ldlm_bl_work_item blwi;
+
+               memset(&blwi, 0, sizeof(blwi));
+               init_blwi(&blwi, ns, ld, cancels, count, lock, cancel_flags);
+               RETURN(__ldlm_bl_to_thread(&blwi, cancel_flags));
+       }
 }
 
 #endif
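
The reordered branches above also encode an ownership rule worth spelling out: an async work item is heap-allocated and freed by the consuming ldlm_bl thread (see ldlm_bl_thread_main further down), so after the wake-up the submitter may not touch blwi at all; only a sync item, which lives on the submitter's stack, may be waited on. That is why __ldlm_bl_to_thread() tests the caller-owned cancel_flags rather than blwi->blwi_flags. A compact, hypothetical sketch of the rule (enqueue_and_wake() stands in for the list_add + waitq_signal pair):

    static int bl_submit(struct ldlm_bl_work_item *blwi,
                         ldlm_cancel_flags_t cancel_flags)
    {
            enqueue_and_wake(blwi);
            /* An async blwi may already be freed by the consumer here,
             * so only the caller's cancel_flags may be inspected. */
            if (!(cancel_flags & LCF_ASYNC))
                    wait_for_completion(&blwi->blwi_comp);  /* stack item */
            return 0;
    }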
 
 int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
-                           struct ldlm_lock *lock)
+                          struct ldlm_lock *lock)
 {
 #ifdef __KERNEL__
-        RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LDLM_ASYNC));
+       return ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LCF_ASYNC);
 #else
-        RETURN(-ENOSYS);
+       return -ENOSYS;
 #endif
 }
 
 int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
-                           cfs_list_t *cancels, int count, int mode)
+                          cfs_list_t *cancels, int count,
+                          ldlm_cancel_flags_t cancel_flags)
 {
 #ifdef __KERNEL__
-        RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count, mode));
+       return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
 #else
-        RETURN(-ENOSYS);
+       return -ENOSYS;
 #endif
 }
 
@@ -2075,6 +2110,8 @@ static int ldlm_handle_qc_callback(struct ptlrpc_request *req)
                RETURN(-EPROTO);
        }
 
+       oqctl->qc_stat = ptlrpc_status_ntoh(oqctl->qc_stat);
+
        cli->cl_qchk_stat = oqctl->qc_stat;
        return 0;
 }
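
The added ptlrpc_status_ntoh() call is the receive-side half of the wire-errno scheme: qc_stat arrives as a Lustre/Linux error number and is mapped into the host's errno space before being cached. On a Linux host the mapping reduces to the identity; a hypothetical sketch:

    /* On Linux hosts wire and host errnos coincide; other platforms
     * would table-map here. */
    static int status_ntoh(int wire_status)
    {
            return wire_status;
    }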
@@ -2126,16 +2163,6 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 rc = ldlm_handle_setinfo(req);
                 ldlm_callback_reply(req, rc);
                 RETURN(0);
-        case OBD_LOG_CANCEL: /* remove this eventually - for 1.4.0 compat */
-                CERROR("shouldn't be handling OBD_LOG_CANCEL on DLM thread\n");
-                req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
-                        RETURN(0);
-                rc = llog_origin_handle_cancel(req);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_REP))
-                        RETURN(0);
-                ldlm_callback_reply(req, rc);
-                RETURN(0);
         case LLOG_ORIGIN_HANDLE_CREATE:
                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
@@ -2196,7 +2223,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
          * which the server has already started a blocking callback on. */
         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
-                rc = ldlm_cli_cancel(&dlm_req->lock_handle[0]);
+               rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0);
                 if (rc < 0)
                         CERROR("ldlm_cli_cancel: %d\n", rc);
         }
@@ -2337,15 +2364,6 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
                 if (rc)
                         break;
                 RETURN(0);
-        case OBD_LOG_CANCEL:
-                req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
-                        RETURN(0);
-                rc = llog_origin_handle_cancel(req);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_REP))
-                        RETURN(0);
-                ldlm_callback_reply(req, rc);
-                RETURN(0);
         default:
                 CERROR("invalid opcode %d\n",
                        lustre_msg_get_opc(req->rq_reqmsg));
@@ -2545,14 +2563,17 @@ static int ldlm_bl_thread_main(void *arg);
 static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
 {
        struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
-       int rc;
+       cfs_task_t *task;
 
        init_completion(&bltd.bltd_comp);
-       rc = cfs_create_thread(ldlm_bl_thread_main, &bltd, 0);
-       if (rc < 0) {
-               CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %d\n",
-                      cfs_atomic_read(&blp->blp_num_threads), rc);
-               return rc;
+       bltd.bltd_num = cfs_atomic_read(&blp->blp_num_threads);
+       snprintf(bltd.bltd_name, sizeof(bltd.bltd_name) - 1,
+               "ldlm_bl_%02d", bltd.bltd_num);
+       task = kthread_run(ldlm_bl_thread_main, &bltd, bltd.bltd_name);
+       if (IS_ERR(task)) {
+               CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
+                      cfs_atomic_read(&blp->blp_num_threads), PTR_ERR(task));
+               return PTR_ERR(task);
        }
        wait_for_completion(&bltd.bltd_comp);
 
@@ -2576,14 +2597,9 @@ static int ldlm_bl_thread_main(void *arg)
 
                 blp = bltd->bltd_blp;
 
-                bltd->bltd_num =
-                        cfs_atomic_inc_return(&blp->blp_num_threads) - 1;
+               cfs_atomic_inc(&blp->blp_num_threads);
                 cfs_atomic_inc(&blp->blp_busy_threads);
 
-                snprintf(bltd->bltd_name, sizeof(bltd->bltd_name) - 1,
-                        "ldlm_bl_%02d", bltd->bltd_num);
-                cfs_daemonize(bltd->bltd_name);
-
                complete(&bltd->bltd_comp);
                 /* cannot use bltd after this, it is only on caller's stack */
         }
@@ -2617,7 +2633,7 @@ static int ldlm_bl_thread_main(void *arg)
                         ldlm_bl_thread_start(blp);
 
                 if (blwi->blwi_mem_pressure)
-                        cfs_memory_pressure_set();
+                       memory_pressure_set();
 
                 if (blwi->blwi_count) {
                         int count;
@@ -2628,17 +2644,18 @@ static int ldlm_bl_thread_main(void *arg)
                         count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
                                                            blwi->blwi_count,
                                                            LCF_BL_AST);
-                        ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL, 0);
+                       ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
+                                            blwi->blwi_flags);
                 } else {
                         ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
                                                 blwi->blwi_lock);
                 }
                 if (blwi->blwi_mem_pressure)
-                        cfs_memory_pressure_clr();
+                       memory_pressure_clr();
 
-                if (blwi->blwi_mode == LDLM_ASYNC)
-                        OBD_FREE(blwi, sizeof(*blwi));
-                else
+               if (blwi->blwi_flags & LCF_ASYNC)
+                       OBD_FREE(blwi, sizeof(*blwi));
+               else
                        complete(&blwi->blwi_comp);
         }
 
@@ -2758,6 +2775,7 @@ static cfs_hash_ops_t ldlm_export_lock_ops = {
 
 int ldlm_init_export(struct obd_export *exp)
 {
+       int rc;
         ENTRY;
 
         exp->exp_lock_hash =
@@ -2773,7 +2791,14 @@ int ldlm_init_export(struct obd_export *exp)
         if (!exp->exp_lock_hash)
                 RETURN(-ENOMEM);
 
+       rc = ldlm_init_flock_export(exp);
+       if (rc)
+               GOTO(err, rc);
+
         RETURN(0);
+err:
+       ldlm_destroy_export(exp);
+       RETURN(rc);
 }
 EXPORT_SYMBOL(ldlm_init_export);
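
This hunk is what gives the commit its subject: the per-export flock hash is now created once, during ldlm_init_export(), rather than lazily at first flock use, which closes the window where two threads could both observe a missing hash and race to create it. A hypothetical sketch of the lazy-creation race being removed (generic names, not the real Lustre code):

    struct hash_table;                       /* opaque, hypothetical */
    extern struct hash_table *hash_table_create(void);

    struct export { struct hash_table *flock_hash; };

    static void lazy_init(struct export *exp)
    {
            /* Threads A and B can both observe NULL here and both allocate;
             * one table leaks, or callers end up on different tables. */
            if (exp->flock_hash == NULL)
                    exp->flock_hash = hash_table_create();
    }

    /* The fix: allocate exactly once in ldlm_init_export(), before the
     * export is visible to any flock user. */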
 
@@ -2816,7 +2841,7 @@ static int ldlm_setup(void)
                .psc_name               = "ldlm_cbd",
                .psc_watchdog_factor    = 2,
                .psc_buf                = {
-                       .bc_nbufs               = LDLM_NBUFS,
+                       .bc_nbufs               = LDLM_CLIENT_NBUFS,
                        .bc_buf_size            = LDLM_BUFSIZE,
                        .bc_req_max_size        = LDLM_MAXREQSIZE,
                        .bc_rep_max_size        = LDLM_MAXREPSIZE,
@@ -2855,7 +2880,7 @@ static int ldlm_setup(void)
                .psc_name               = "ldlm_canceld",
                .psc_watchdog_factor    = 6,
                .psc_buf                = {
-                       .bc_nbufs               = LDLM_NBUFS,
+                       .bc_nbufs               = LDLM_SERVER_NBUFS,
                        .bc_buf_size            = LDLM_BUFSIZE,
                        .bc_req_max_size        = LDLM_MAXREQSIZE,
                        .bc_rep_max_size        = LDLM_MAXREPSIZE,
@@ -2922,22 +2947,22 @@ static int ldlm_setup(void)
        }
 
 # ifdef HAVE_SERVER_SUPPORT
-        CFS_INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
-        expired_lock_thread.elt_state = ELT_STOPPED;
-        cfs_waitq_init(&expired_lock_thread.elt_waitq);
+       CFS_INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
+       expired_lock_thread.elt_state = ELT_STOPPED;
+       cfs_waitq_init(&expired_lock_thread.elt_waitq);
 
-        CFS_INIT_LIST_HEAD(&waiting_locks_list);
+       CFS_INIT_LIST_HEAD(&waiting_locks_list);
        spin_lock_init(&waiting_locks_spinlock);
-        cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0);
+       cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0);
 
-        rc = cfs_create_thread(expired_lock_main, NULL, CFS_DAEMON_FLAGS);
-       if (rc < 0) {
+       rc = PTR_ERR(kthread_run(expired_lock_main, NULL, "ldlm_elt"));
+       if (IS_ERR_VALUE(rc)) {
                CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
                GOTO(out, rc);
        }
 
-        cfs_wait_event(expired_lock_thread.elt_waitq,
-                       expired_lock_thread.elt_state == ELT_READY);
+       cfs_wait_event(expired_lock_thread.elt_waitq,
+                      expired_lock_thread.elt_state == ELT_READY);
 # endif /* HAVE_SERVER_SUPPORT */
 
        rc = ldlm_pools_init();
@@ -3019,26 +3044,26 @@ int ldlm_init(void)
        mutex_init(&ldlm_ref_mutex);
        mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
        mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
-        ldlm_resource_slab = cfs_mem_cache_create("ldlm_resources",
-                                               sizeof(struct ldlm_resource), 0,
-                                               CFS_SLAB_HWCACHE_ALIGN);
-        if (ldlm_resource_slab == NULL)
-                return -ENOMEM;
+       ldlm_resource_slab = kmem_cache_create("ldlm_resources",
+                                              sizeof(struct ldlm_resource), 0,
+                                              SLAB_HWCACHE_ALIGN, NULL);
+       if (ldlm_resource_slab == NULL)
+               return -ENOMEM;
 
-       ldlm_lock_slab = cfs_mem_cache_create("ldlm_locks",
+       ldlm_lock_slab = kmem_cache_create("ldlm_locks",
                              sizeof(struct ldlm_lock), 0,
-                             CFS_SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU);
+                             SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU, NULL);
        if (ldlm_lock_slab == NULL) {
-               cfs_mem_cache_destroy(ldlm_resource_slab);
+               kmem_cache_destroy(ldlm_resource_slab);
                return -ENOMEM;
        }
 
-        ldlm_interval_slab = cfs_mem_cache_create("interval_node",
+       ldlm_interval_slab = kmem_cache_create("interval_node",
                                         sizeof(struct ldlm_interval),
-                                        0, CFS_SLAB_HWCACHE_ALIGN);
+                                       0, SLAB_HWCACHE_ALIGN, NULL);
         if (ldlm_interval_slab == NULL) {
-                cfs_mem_cache_destroy(ldlm_resource_slab);
-                cfs_mem_cache_destroy(ldlm_lock_slab);
+               kmem_cache_destroy(ldlm_resource_slab);
+               kmem_cache_destroy(ldlm_lock_slab);
                 return -ENOMEM;
         }
 #if LUSTRE_TRACKS_LOCK_EXP_REFS
@@ -3049,19 +3074,15 @@ int ldlm_init(void)
 
 void ldlm_exit(void)
 {
-        int rc;
-        if (ldlm_refcount)
-                CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
-        rc = cfs_mem_cache_destroy(ldlm_resource_slab);
-        LASSERTF(rc == 0, "couldn't free ldlm resource slab\n");
+       if (ldlm_refcount)
+               CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
+       kmem_cache_destroy(ldlm_resource_slab);
 #ifdef __KERNEL__
-        /* ldlm_lock_put() use RCU to call ldlm_lock_free, so need call
-         * synchronize_rcu() to wait a grace period elapsed, so that
-         * ldlm_lock_free() get a chance to be called. */
-        synchronize_rcu();
+       /* ldlm_lock_put() uses RCU to call ldlm_lock_free(), so we need to
+        * call synchronize_rcu() to wait for a grace period to elapse, so
+        * that ldlm_lock_free() gets a chance to be called. */
+       synchronize_rcu();
 #endif
-        rc = cfs_mem_cache_destroy(ldlm_lock_slab);
-        LASSERTF(rc == 0, "couldn't free ldlm lock slab\n");
-        rc = cfs_mem_cache_destroy(ldlm_interval_slab);
-        LASSERTF(rc == 0, "couldn't free interval node slab\n");
+       kmem_cache_destroy(ldlm_lock_slab);
+       kmem_cache_destroy(ldlm_interval_slab);
 }
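
One behavioral note on the slab conversion above: cfs_mem_cache_destroy() returned a status the old code asserted on, while the kernel's kmem_cache_destroy() returns void (it complains internally if objects are still live), which is why the LASSERTFs are dropped rather than ported. A minimal sketch of the new create/destroy pairing, assuming the Linux slab API of this era:

    #include <linux/errno.h>
    #include <linux/init.h>
    #include <linux/slab.h>

    struct example_obj { int field; };

    static struct kmem_cache *example_slab;

    static int __init example_init(void)
    {
            /* name, object size, alignment, flags, constructor */
            example_slab = kmem_cache_create("example_objs",
                                             sizeof(struct example_obj), 0,
                                             SLAB_HWCACHE_ALIGN, NULL);
            return example_slab != NULL ? 0 : -ENOMEM;
    }

    static void __exit example_exit(void)
    {
            kmem_cache_destroy(example_slab); /* void return: nothing to check */
    }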