Whamcloud - gitweb
LU-6529 ldlm: cancel aged locks for LRUR
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
index 0fa66b0..684062f 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2010, 2013, Intel Corporation.
+ * Copyright (c) 2010, 2014, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
  */
 
 #define DEBUG_SUBSYSTEM S_LDLM
-#ifndef __KERNEL__
-#include <signal.h>
-#include <liblustre.h>
-#endif
 
 #include <lustre_dlm.h>
 #include <obd_class.h>
@@ -71,8 +67,8 @@
 
 #include "ldlm_internal.h"
 
-int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
-CFS_MODULE_PARM(ldlm_enqueue_min, "i", int, 0644,
+unsigned int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
+CFS_MODULE_PARM(ldlm_enqueue_min, "i", uint, 0644,
                 "lock enqueue timeout minimum");
 
 /* in client side, whether the cached locks will be canceled before replay */
@@ -135,45 +131,56 @@ int ldlm_expired_completion_wait(void *data)
 
         RETURN(0);
 }
-EXPORT_SYMBOL(ldlm_expired_completion_wait);
+
+/**
+ * Calculate the Completion timeout (covering enqueue, BL AST, data flush,
+ * lock cancel, and their replies). Used for lock completion timeout on the
+ * client side.
+ *
+ * \param[in] lock        lock which is waiting the completion callback
+ *
+ * \retval            timeout in seconds to wait for the server reply
+ */
 
 /* We use the same basis for both server side and client side functions
    from a single node. */
-int ldlm_get_enq_timeout(struct ldlm_lock *lock)
+static unsigned int ldlm_cp_timeout(struct ldlm_lock *lock)
 {
-        int timeout = at_get(ldlm_lock_to_ns_at(lock));
-        if (AT_OFF)
-                return obd_timeout / 2;
-        /* Since these are non-updating timeouts, we should be conservative.
-           It would be nice to have some kind of "early reply" mechanism for
-           lock callbacks too... */
-        timeout = min_t(int, at_max, timeout + (timeout >> 1)); /* 150% */
-        return max(timeout, ldlm_enqueue_min);
+       unsigned int timeout;
+
+       if (AT_OFF)
+               return obd_timeout;
+
+       /* Wait a long time for enqueue - server may have to callback a
+        * lock from another client.  Server will evict the other client if it
+        * doesn't respond reasonably, and then give us the lock. */
+       timeout = at_get(ldlm_lock_to_ns_at(lock));
+       return max(3 * timeout, ldlm_enqueue_min);
 }
-EXPORT_SYMBOL(ldlm_get_enq_timeout);
 
 /**
  * Helper function for ldlm_completion_ast(), updating timings when lock is
  * actually granted.
  */
-static int ldlm_completion_tail(struct ldlm_lock *lock)
+static int ldlm_completion_tail(struct ldlm_lock *lock, void *data)
 {
        long delay;
-       int  result;
+       int  result = 0;
 
        if (ldlm_is_destroyed(lock) || ldlm_is_failed(lock)) {
                LDLM_DEBUG(lock, "client-side enqueue: destroyed");
                result = -EIO;
+       } else if (data == NULL) {
+               LDLM_DEBUG(lock, "client-side enqueue: granted");
        } else {
+               /* Take into AT only CP RPC, not immediately granted locks */
                delay = cfs_time_sub(cfs_time_current_sec(),
                                     lock->l_last_activity);
                LDLM_DEBUG(lock, "client-side enqueue: granted after "
                           CFS_DURATION_T"s", delay);
 
                /* Update our time estimate */
-               at_measured(ldlm_lock_to_ns_at(lock),
-                           delay);
-               result = 0;
+               at_measured(ldlm_lock_to_ns_at(lock), delay);
        }
        return result;
 }
@@ -192,10 +199,9 @@ int ldlm_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data)
                RETURN(0);
        }
 
-       if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
-                      LDLM_FL_BLOCK_CONV))) {
+       if (!(flags & LDLM_FL_BLOCKED_MASK)) {
                wake_up(&lock->l_waitq);
-               RETURN(ldlm_completion_tail(lock));
+               RETURN(ldlm_completion_tail(lock, data));
        }
 
        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
@@ -242,8 +248,7 @@ int ldlm_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
                 goto noreproc;
         }
 
-       if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
-                      LDLM_FL_BLOCK_CONV))) {
+       if (!(flags & LDLM_FL_BLOCKED_MASK)) {
                wake_up(&lock->l_waitq);
                RETURN(0);
        }
@@ -260,12 +265,10 @@ noreproc:
                 imp = obd->u.cli.cl_import;
         }
 
-        /* Wait a long time for enqueue - server may have to callback a
-           lock from another client.  Server will evict the other client if it
-           doesn't respond reasonably, and then give us the lock. */
-        timeout = ldlm_get_enq_timeout(lock) * 2;
+       timeout = ldlm_cp_timeout(lock);
 
-        lwd.lwd_lock = lock;
+       lwd.lwd_lock = lock;
+       lock->l_last_activity = cfs_time_current_sec();
 
        if (ldlm_is_no_timeout(lock)) {
                 LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
@@ -297,9 +300,9 @@ noreproc:
                 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                            rc);
                 RETURN(rc);
-        }
+       }
 
-        RETURN(ldlm_completion_tail(lock));
+       RETURN(ldlm_completion_tail(lock, data));
 }
 EXPORT_SYMBOL(ldlm_completion_ast);
 
@@ -378,32 +381,38 @@ int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
 EXPORT_SYMBOL(ldlm_blocking_ast);
 
 /**
- * ->l_glimpse_ast() for DLM extent locks acquired on the server-side. See
- * comment in filter_intent_policy() on why you may need this.
+ * Implements ldlm_lock::l_glimpse_ast for extent locks acquired on the server.
+ *
+ * Returning -ELDLM_NO_LOCK_DATA actually works, but the reason for that is
+ * rather subtle: with OST-side locking, it may so happen that _all_ extent
+ * locks are held by the OST. If client wants to obtain the current file size
+ * it calls ll_glimpse_size(), and (as all locks are held only on the server),
+ * this dummy glimpse callback fires and does nothing. The client still
+ * receives the correct file size due to the following fragment of code in
+ * ldlm_cb_interpret():
+ *
+ *     if (rc == -ELDLM_NO_LOCK_DATA) {
+ *             LDLM_DEBUG(lock, "lost race - client has a lock but no"
+ *                        "inode");
+ *             ldlm_res_lvbo_update(lock->l_resource, NULL, 1);
+ *     }
+ *
+ * That is, after the glimpse returns this error, ofd_lvbo_update() is called
+ * and returns the updated file attributes from the inode to the client.
+ *
+ * See also comment in ofd_intent_policy() on why servers must set a non-NULL
+ * l_glimpse_ast when grabbing DLM locks.  Otherwise, the server will assume
+ * that the object is in the process of being destroyed.
+ *
+ * \param[in] lock     DLM lock being glimpsed, unused
+ * \param[in] reqp     pointer to ptlrpc_request, unused
+ *
+ * \retval             -ELDLM_NO_LOCK_DATA to get attributes from disk object
  */
 int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp)
 {
-        /*
-         * Returning -ELDLM_NO_LOCK_DATA actually works, but the reason for
-         * that is rather subtle: with OST-side locking, it may so happen that
-         * _all_ extent locks are held by the OST. If client wants to obtain
-         * current file size it calls ll{,u}_glimpse_size(), and (as locks are
-         * on the server), dummy glimpse callback fires and does
-         * nothing. Client still receives correct file size due to the
-         * following fragment in filter_intent_policy():
-         *
-         * rc = l->l_glimpse_ast(l, NULL); // this will update the LVB
-         * if (rc != 0 && res->lr_namespace->ns_lvbo &&
-         *     res->lr_namespace->ns_lvbo->lvbo_update) {
-         *         res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1);
-         * }
-         *
-         * that is, after glimpse_ast() fails, filter_lvbo_update() runs, and
-         * returns correct file size to the client.
-         */
         return -ELDLM_NO_LOCK_DATA;
 }
-EXPORT_SYMBOL(ldlm_glimpse_ast);
 
 /**
  * Enqueue a local lock (typically on a server).
@@ -438,6 +447,12 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
        if (IS_ERR(lock))
                GOTO(out_nolock, err = PTR_ERR(lock));
 
+       err = ldlm_lvbo_init(lock->l_resource);
+       if (err < 0) {
+               LDLM_ERROR(lock, "delayed lvb init failed (rc %d)", err);
+               GOTO(out, err);
+       }
+
         ldlm_lock2handle(lock, lockh);
 
         /* NB: we don't have any lock now (lock_res_and_lock)
@@ -538,7 +553,6 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
         struct ldlm_lock *lock;
         struct ldlm_reply *reply;
         int cleanup_phase = 1;
-       int size = 0;
         ENTRY;
 
         lock = ldlm_handle2lock(lockh);
@@ -565,8 +579,8 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
        if (reply == NULL)
                GOTO(cleanup, rc = -EPROTO);
 
-       if (lvb_len != 0) {
-               LASSERT(lvb != NULL);
+       if (lvb_len > 0) {
+               int size = 0;
 
                size = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB,
                                            RCL_SERVER);
@@ -579,13 +593,14 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                                   lvb_len, size);
                        GOTO(cleanup, rc = -EINVAL);
                }
+               lvb_len = size;
        }
 
        if (rc == ELDLM_LOCK_ABORTED) {
-               if (lvb_len != 0)
+               if (lvb_len > 0 && lvb != NULL)
                        rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
-                                          lvb, size);
-               GOTO(cleanup, rc = (rc != 0 ? rc : ELDLM_LOCK_ABORTED));
+                                          lvb, lvb_len);
+               GOTO(cleanup, rc = rc ? : ELDLM_LOCK_ABORTED);
        }
 
         /* lock enqueued on the server */
@@ -650,11 +665,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                         LDLM_DEBUG(lock,"client-side enqueue, new policy data");
         }
 
-        if ((*flags) & LDLM_FL_AST_SENT ||
-            /* Cancel extent locks as soon as possible on a liblustre client,
-             * because it cannot handle asynchronous ASTs robustly (see
-             * bug 7311). */
-            (LIBLUSTRE_CLIENT && type == LDLM_EXTENT)) {
+       if ((*flags) & LDLM_FL_AST_SENT) {
                 lock_res_and_lock(lock);
                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
                 unlock_res_and_lock(lock);
@@ -663,7 +674,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
 
         /* If the lock has already been granted by a completion AST, don't
          * clobber the LVB with an older one. */
-       if (lvb_len != 0) {
+       if (lvb_len > 0) {
                /* We must lock or a racing completion might update lvb without
                 * letting us know and we'll clobber the correct value.
                 * Cannot unlock after the check either, a that still leaves
@@ -671,7 +682,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                lock_res_and_lock(lock);
                if (lock->l_req_mode != lock->l_granted_mode)
                        rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
-                                          lock->l_lvb_data, size);
+                                          lock->l_lvb_data, lvb_len);
                unlock_res_and_lock(lock);
                if (rc < 0) {
                        cleanup_phase = 1;
@@ -690,11 +701,11 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                 }
         }
 
-        if (lvb_len && lvb != NULL) {
-                /* Copy the LVB here, and not earlier, because the completion
-                 * AST (if any) can override what we got in the reply */
-                memcpy(lvb, lock->l_lvb_data, lvb_len);
-        }
+       if (lvb_len > 0 && lvb != NULL) {
+               /* Copy the LVB here, and not earlier, because the completion
+                * AST (if any) can override what we got in the reply */
+               memcpy(lvb, lock->l_lvb_data, lvb_len);
+       }
 
         LDLM_DEBUG(lock, "client-side enqueue END");
         EXIT;
@@ -732,16 +743,16 @@ static inline int ldlm_capsule_handles_avail(struct req_capsule *pill,
                                              enum req_location loc,
                                              int off)
 {
-        int size = req_capsule_msg_size(pill, loc);
-        return ldlm_req_handles_avail(size, off);
+       __u32 size = req_capsule_msg_size(pill, loc);
+       return ldlm_req_handles_avail(size, off);
 }
 
 static inline int ldlm_format_handles_avail(struct obd_import *imp,
                                             const struct req_format *fmt,
                                             enum req_location loc, int off)
 {
-        int size = req_capsule_fmt_size(imp->imp_msg_magic, fmt, loc);
-        return ldlm_req_handles_avail(size, off);
+       __u32 size = req_capsule_fmt_size(imp->imp_msg_magic, fmt, loc);
+       return ldlm_req_handles_avail(size, off);
 }
 
 /**
@@ -801,7 +812,7 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
                         dlm = req_capsule_client_get(pill, &RMF_DLM_REQ);
                         LASSERT(dlm);
                         /* Skip first lock handler in ldlm_request_pack(),
-                         * this method will incrment @lock_count according
+                        * this method will increment @lock_count according
                          * to the lock handle amount actually written to
                          * the buffer. */
                         dlm->lock_count = canceloff;
@@ -818,7 +829,7 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
 EXPORT_SYMBOL(ldlm_prep_elc_req);
 
 int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req,
-                          cfs_list_t *cancels, int count)
+                         struct list_head *cancels, int count)
 {
         return ldlm_prep_elc_req(exp, req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
                                  LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
@@ -898,21 +909,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                 /* for the local lock, add the reference */
                 ldlm_lock_addref_internal(lock, einfo->ei_mode);
                 ldlm_lock2handle(lock, lockh);
-                if (policy != NULL) {
-                        /* INODEBITS_INTEROP: If the server does not support
-                         * inodebits, we will request a plain lock in the
-                         * descriptor (ldlm_lock2desc() below) but use an
-                         * inodebits lock internally with both bits set.
-                         */
-                        if (einfo->ei_type == LDLM_IBITS &&
-                           !(exp_connect_flags(exp) &
-                             OBD_CONNECT_IBITS))
-                                lock->l_policy_data.l_inodebits.bits =
-                                        MDS_INODELOCK_LOOKUP |
-                                        MDS_INODELOCK_UPDATE;
-                        else
-                                lock->l_policy_data = *policy;
-                }
+               if (policy != NULL)
+                       lock->l_policy_data = *policy;
 
                if (einfo->ei_type == LDLM_EXTENT) {
                        /* extent lock without policy is a bug */
@@ -929,6 +927,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
        lock->l_export = NULL;
        lock->l_blocking_ast = einfo->ei_cb_bl;
        lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL));
+        lock->l_last_activity = cfs_time_current_sec();
 
         /* lock not sent to server yet */
 
@@ -971,14 +970,6 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                ptlrpc_request_set_replen(req);
         }
 
-        /*
-         * Liblustre client doesn't get extent locks, except for O_APPEND case
-         * where [0, OBD_OBJECT_EOF] lock is taken, or truncate, where
-         * [i_size, OBD_OBJECT_EOF] lock is taken.
-         */
-        LASSERT(ergo(LIBLUSTRE_CLIENT, einfo->ei_type != LDLM_EXTENT ||
-                     policy->l_extent.end == OBD_OBJECT_EOF));
-
         if (async) {
                 LASSERT(reqp != NULL);
                 RETURN(0);
@@ -1106,7 +1097,6 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, __u32 *flags)
         ptlrpc_req_finished(req);
         return rc;
 }
-EXPORT_SYMBOL(ldlm_cli_convert);
 
 /**
  * Cancel locks locally.
@@ -1157,7 +1147,7 @@ static __u64 ldlm_cli_cancel_local(struct ldlm_lock *lock)
  * Pack \a count locks in \a head into ldlm_request buffer of request \a req.
  */
 static void ldlm_cancel_pack(struct ptlrpc_request *req,
-                             cfs_list_t *head, int count)
+                            struct list_head *head, int count)
 {
         struct ldlm_request *dlm;
         struct ldlm_lock *lock;
@@ -1177,7 +1167,7 @@ static void ldlm_cancel_pack(struct ptlrpc_request *req,
         /* XXX: it would be better to pack lock handles grouped by resource.
          * so that the server cancel would call filter_lvbo_update() less
          * frequently. */
-        cfs_list_for_each_entry(lock, head, l_bl_ast) {
+       list_for_each_entry(lock, head, l_bl_ast) {
                 if (!count--)
                         break;
                 LASSERT(lock->l_conn_export);
@@ -1193,7 +1183,7 @@ static void ldlm_cancel_pack(struct ptlrpc_request *req,
 /**
  * Prepare and send a batched cancel RPC. It will include \a count lock
  * handles of locks given in \a cancels list. */
-int ldlm_cli_cancel_req(struct obd_export *exp, cfs_list_t *cancels,
+int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
                         int count, ldlm_cancel_flags_t flags)
 {
         struct ptlrpc_request *req = NULL;
@@ -1277,7 +1267,6 @@ int ldlm_cli_cancel_req(struct obd_export *exp, cfs_list_t *cancels,
 out:
         return sent ? sent : rc;
 }
-EXPORT_SYMBOL(ldlm_cli_cancel_req);
 
 static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
 {
@@ -1333,7 +1322,6 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req)
 
         RETURN(0);
 }
-EXPORT_SYMBOL(ldlm_cli_update_pool);
 
 /**
  * Client side lock cancel.
@@ -1366,8 +1354,8 @@ int ldlm_cli_cancel(struct lustre_handle *lockh,
        /* Even if the lock is marked as LDLM_FL_BL_AST, this is a LDLM_CANCEL
         * RPC which goes to canceld portal, so we can cancel other LRU locks
         * here and send them all as one LDLM_CANCEL RPC. */
-        LASSERT(cfs_list_empty(&lock->l_bl_ast));
-        cfs_list_add(&lock->l_bl_ast, &cancels);
+       LASSERT(list_empty(&lock->l_bl_ast));
+       list_add(&lock->l_bl_ast, &cancels);
 
         exp = lock->l_conn_export;
         if (exp_connect_cancelset(exp)) {
@@ -1391,7 +1379,7 @@ EXPORT_SYMBOL(ldlm_cli_cancel);
  * Locally cancel up to \a count locks in list \a cancels.
  * Return the number of cancelled locks.
  */
-int ldlm_cli_cancel_list_local(cfs_list_t *cancels, int count,
+int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
                               ldlm_cancel_flags_t flags)
 {
        struct list_head head = LIST_HEAD_INIT(head);
@@ -1400,7 +1388,7 @@ int ldlm_cli_cancel_list_local(cfs_list_t *cancels, int count,
        __u64 rc;
 
         left = count;
-        cfs_list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
+       list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
                 if (left-- == 0)
                         break;
 
@@ -1416,14 +1404,14 @@ int ldlm_cli_cancel_list_local(cfs_list_t *cancels, int count,
                 * the one being generated now. */
                if (!(flags & LCF_BL_AST) && (rc == LDLM_FL_BL_AST)) {
                        LDLM_DEBUG(lock, "Cancel lock separately");
-                       cfs_list_del_init(&lock->l_bl_ast);
-                       cfs_list_add(&lock->l_bl_ast, &head);
+                       list_del_init(&lock->l_bl_ast);
+                       list_add(&lock->l_bl_ast, &head);
                        bl_ast++;
                         continue;
                 }
                 if (rc == LDLM_FL_LOCAL_ONLY) {
                         /* CANCEL RPC should not be sent to server. */
-                        cfs_list_del_init(&lock->l_bl_ast);
+                       list_del_init(&lock->l_bl_ast);
                         LDLM_LOCK_RELEASE(lock);
                         count--;
                 }
@@ -1435,7 +1423,6 @@ int ldlm_cli_cancel_list_local(cfs_list_t *cancels, int count,
 
         RETURN(count);
 }
-EXPORT_SYMBOL(ldlm_cli_cancel_list_local);
 
 /**
  * Cancel as many locks as possible w/o sending any RPCs (e.g. to write back
@@ -1493,6 +1480,12 @@ static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
        if (count && added >= count)
                return LDLM_POLICY_KEEP_LOCK;
 
+       /* Despite of the LV, It doesn't make sense to keep the lock which
+        * is unused for ns_max_age time. */
+       if (cfs_time_after(cfs_time_current(),
+                          cfs_time_add(lock->l_last_used, ns->ns_max_age)))
+               return LDLM_POLICY_CANCEL_LOCK;
+
        slv = ldlm_pool_get_slv(pl);
        lvf = ldlm_pool_get_lvf(pl);
        la = cfs_duration_sec(cfs_time_sub(cur,
@@ -1507,9 +1500,6 @@ static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
        if (slv == 0 || lv < slv)
                return LDLM_POLICY_KEEP_LOCK;
 
-       if (ns->ns_cancel != NULL && ns->ns_cancel(lock) == 0)
-               return LDLM_POLICY_KEEP_LOCK;
-
        return LDLM_POLICY_CANCEL_LOCK;
 }
 
@@ -1547,15 +1537,10 @@ static ldlm_policy_res_t ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
                                                 int unused, int added,
                                                 int count)
 {
-        if (added >= count)
-               return LDLM_POLICY_KEEP_LOCK;
-
-       if (cfs_time_before(cfs_time_current(),
+       if ((added >= count) &&
+           cfs_time_before(cfs_time_current(),
                            cfs_time_add(lock->l_last_used, ns->ns_max_age)))
-                return LDLM_POLICY_KEEP_LOCK;
-
-        if (ns->ns_cancel != NULL && ns->ns_cancel(lock) == 0)
-                return LDLM_POLICY_KEEP_LOCK;
+               return LDLM_POLICY_KEEP_LOCK;
 
        return LDLM_POLICY_CANCEL_LOCK;
 }
@@ -1639,8 +1624,9 @@ ldlm_cancel_lru_policy(struct ldlm_namespace *ns, int flags)
  *                               sending any RPCs or waiting for any
  *                               outstanding RPC to complete.
  */
-static int ldlm_prepare_lru_list(struct ldlm_namespace *ns, cfs_list_t *cancels,
-                                 int count, int max, int flags)
+static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
+                                struct list_head *cancels, int count, int max,
+                                int flags)
 {
        ldlm_cancel_lru_policy_t pf;
        struct ldlm_lock *lock, *next;
@@ -1657,7 +1643,7 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns, cfs_list_t *cancels,
         pf = ldlm_cancel_lru_policy(ns, flags);
         LASSERT(pf != NULL);
 
-        while (!cfs_list_empty(&ns->ns_unused_list)) {
+       while (!list_empty(&ns->ns_unused_list)) {
                 ldlm_policy_res_t result;
 
                 /* all unused locks */
@@ -1668,7 +1654,7 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns, cfs_list_t *cancels,
                 if (max && added >= max)
                         break;
 
-                cfs_list_for_each_entry_safe(lock, next, &ns->ns_unused_list,
+               list_for_each_entry_safe(lock, next, &ns->ns_unused_list,
                                             l_lru) {
                         /* No locks which got blocking requests. */
                        LASSERT(!ldlm_is_bl_ast(lock));
@@ -1758,8 +1744,8 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns, cfs_list_t *cancels,
                 * and can't use l_pending_chain as it is used both on
                 * server and client nevertheless bug 5666 says it is
                 * used only on server */
-               LASSERT(cfs_list_empty(&lock->l_bl_ast));
-               cfs_list_add(&lock->l_bl_ast, cancels);
+               LASSERT(list_empty(&lock->l_bl_ast));
+               list_add(&lock->l_bl_ast, cancels);
                unlock_res_and_lock(lock);
                lu_ref_del(&lock->l_reference, __FUNCTION__, current);
                spin_lock(&ns->ns_lock);
@@ -1770,7 +1756,7 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns, cfs_list_t *cancels,
        RETURN(added);
 }
 
-int ldlm_cancel_lru_local(struct ldlm_namespace *ns, cfs_list_t *cancels,
+int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
                           int count, int max, ldlm_cancel_flags_t cancel_flags,
                           int flags)
 {
@@ -1797,9 +1783,6 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
        int count, rc;
        ENTRY;
 
-#ifndef __KERNEL__
-       cancel_flags &= ~LCF_ASYNC; /* force to be sync in user space */
-#endif
        /* Just prepare the list of locks, do not actually cancel them yet.
         * Locks are cancelled later in a separate thread. */
        count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, flags);
@@ -1816,7 +1799,7 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
  * list.
  */
 int ldlm_cancel_resource_local(struct ldlm_resource *res,
-                              cfs_list_t *cancels,
+                              struct list_head *cancels,
                               ldlm_policy_data_t *policy,
                               ldlm_mode_t mode, __u64 lock_flags,
                               ldlm_cancel_flags_t cancel_flags, void *opaque)
@@ -1826,7 +1809,7 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
         ENTRY;
 
         lock_res(res);
-        cfs_list_for_each_entry(lock, &res->lr_granted, l_res_link) {
+       list_for_each_entry(lock, &res->lr_granted, l_res_link) {
                 if (opaque != NULL && lock->l_ast_data != opaque) {
                         LDLM_ERROR(lock, "data %p doesn't match opaque %p",
                                    lock->l_ast_data, opaque);
@@ -1856,8 +1839,8 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
                                 lock_flags;
 
-                LASSERT(cfs_list_empty(&lock->l_bl_ast));
-                cfs_list_add(&lock->l_bl_ast, cancels);
+               LASSERT(list_empty(&lock->l_bl_ast));
+               list_add(&lock->l_bl_ast, cancels);
                 LDLM_LOCK_GET(lock);
                 count++;
         }
@@ -1877,14 +1860,14 @@ EXPORT_SYMBOL(ldlm_cancel_resource_local);
  * buffer at the offset \a off.
  * Destroy \a cancels at the end.
  */
-int ldlm_cli_cancel_list(cfs_list_t *cancels, int count,
+int ldlm_cli_cancel_list(struct list_head *cancels, int count,
                          struct ptlrpc_request *req, ldlm_cancel_flags_t flags)
 {
         struct ldlm_lock *lock;
         int res = 0;
         ENTRY;
 
-        if (cfs_list_empty(cancels) || count == 0)
+       if (list_empty(cancels) || count == 0)
                 RETURN(0);
 
         /* XXX: requests (both batched and not) could be sent in parallel.
@@ -1893,8 +1876,8 @@ int ldlm_cli_cancel_list(cfs_list_t *cancels, int count,
          * It would also speed up the case when the server does not support
          * the feature. */
         while (count > 0) {
-                LASSERT(!cfs_list_empty(cancels));
-                lock = cfs_list_entry(cancels->next, struct ldlm_lock,
+               LASSERT(!list_empty(cancels));
+               lock = list_entry(cancels->next, struct ldlm_lock,
                                       l_bl_ast);
                 LASSERT(lock->l_conn_export);
 
@@ -1970,7 +1953,7 @@ struct ldlm_cli_cancel_arg {
 };
 
 static int ldlm_cli_hash_cancel_unused(cfs_hash_t *hs, cfs_hash_bd_t *bd,
-                                      cfs_hlist_node_t *hnode, void *arg)
+                                      struct hlist_node *hnode, void *arg)
 {
        struct ldlm_resource           *res = cfs_hash_object(hs, hnode);
        struct ldlm_cli_cancel_arg     *lc = arg;
@@ -2012,14 +1995,13 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
                 RETURN(ELDLM_OK);
         }
 }
-EXPORT_SYMBOL(ldlm_cli_cancel_unused);
 
 /* Lock iterators. */
 
 int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
                           void *closure)
 {
-        cfs_list_t *tmp, *next;
+       struct list_head *tmp, *next;
         struct ldlm_lock *lock;
         int rc = LDLM_ITER_CONTINUE;
 
@@ -2029,22 +2011,22 @@ int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
                 RETURN(LDLM_ITER_CONTINUE);
 
         lock_res(res);
-        cfs_list_for_each_safe(tmp, next, &res->lr_granted) {
-                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
+       list_for_each_safe(tmp, next, &res->lr_granted) {
+               lock = list_entry(tmp, struct ldlm_lock, l_res_link);
 
                 if (iter(lock, closure) == LDLM_ITER_STOP)
                         GOTO(out, rc = LDLM_ITER_STOP);
         }
 
-        cfs_list_for_each_safe(tmp, next, &res->lr_converting) {
-                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
+       list_for_each_safe(tmp, next, &res->lr_converting) {
+               lock = list_entry(tmp, struct ldlm_lock, l_res_link);
 
                 if (iter(lock, closure) == LDLM_ITER_STOP)
                         GOTO(out, rc = LDLM_ITER_STOP);
         }
 
-        cfs_list_for_each_safe(tmp, next, &res->lr_waiting) {
-                lock = cfs_list_entry(tmp, struct ldlm_lock, l_res_link);
+       list_for_each_safe(tmp, next, &res->lr_waiting) {
+               lock = list_entry(tmp, struct ldlm_lock, l_res_link);
 
                 if (iter(lock, closure) == LDLM_ITER_STOP)
                         GOTO(out, rc = LDLM_ITER_STOP);
@@ -2053,7 +2035,6 @@ int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
         unlock_res(res);
         RETURN(rc);
 }
-EXPORT_SYMBOL(ldlm_resource_foreach);
 
 struct iter_helper_data {
         ldlm_iterator_t iter;
@@ -2067,7 +2048,7 @@ static int ldlm_iter_helper(struct ldlm_lock *lock, void *closure)
 }
 
 static int ldlm_res_iter_helper(cfs_hash_t *hs, cfs_hash_bd_t *bd,
-                                cfs_hlist_node_t *hnode, void *arg)
+                               struct hlist_node *hnode, void *arg)
 
 {
         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
@@ -2086,7 +2067,6 @@ void ldlm_namespace_foreach(struct ldlm_namespace *ns,
                                  ldlm_res_iter_helper, &helper);
 
 }
-EXPORT_SYMBOL(ldlm_namespace_foreach);
 
 /* non-blocking function to manipulate a lock whose cb_data is being put away.
  * return  0:  find no resource
@@ -2119,10 +2099,10 @@ EXPORT_SYMBOL(ldlm_resource_iterate);
 
 static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
 {
-        cfs_list_t *list = closure;
+       struct list_head *list = closure;
 
         /* we use l_pending_chain here, because it's unused on clients. */
-        LASSERTF(cfs_list_empty(&lock->l_pending_chain),
+       LASSERTF(list_empty(&lock->l_pending_chain),
                  "lock %p next %p prev %p\n",
                  lock, &lock->l_pending_chain.next,&lock->l_pending_chain.prev);
         /* bug 9573: don't replay locks left after eviction, or
@@ -2130,7 +2110,7 @@ static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
          * on a lock so that it does not disapear under us (e.g. due to cancel)
          */
         if (!(lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_CANCELING))) {
-                cfs_list_add(&lock->l_pending_chain, list);
+               list_add(&lock->l_pending_chain, list);
                 LDLM_LOCK_GET(lock);
         }
 
@@ -2230,7 +2210,7 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
                 flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_GRANTED;
         else if (lock->l_granted_mode)
                 flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_CONV;
-        else if (!cfs_list_empty(&lock->l_res_link))
+       else if (!list_empty(&lock->l_res_link))
                 flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_WAIT;
         else
                 flags = LDLM_FL_REPLAY;
@@ -2323,8 +2303,8 @@ int ldlm_replay_locks(struct obd_import *imp)
 
        ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list);
 
-       cfs_list_for_each_entry_safe(lock, next, &list, l_pending_chain) {
-               cfs_list_del_init(&lock->l_pending_chain);
+       list_for_each_entry_safe(lock, next, &list, l_pending_chain) {
+               list_del_init(&lock->l_pending_chain);
                if (rc) {
                        LDLM_LOCK_RELEASE(lock);
                        continue; /* or try to do the rest? */
@@ -2337,4 +2317,3 @@ int ldlm_replay_locks(struct obd_import *imp)
 
        RETURN(rc);
 }
-EXPORT_SYMBOL(ldlm_replay_locks);