b=11270
author    zam <zam>  Thu, 8 May 2008 07:37:08 +0000 (07:37 +0000)
committer zam <zam>  Thu, 8 May 2008 07:37:08 +0000 (07:37 +0000)
i=vitaly.fertman
i=oleg.drokin

Lockless i/o and lockless truncate implementation, with sanityN tests.

15 files changed:
lustre/ChangeLog
lustre/include/linux/lustre_lite.h
lustre/include/lustre_dlm.h
lustre/ldlm/ldlm_extent.c
lustre/ldlm/ldlm_resource.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/llite_mmap.c
lustre/llite/lproc_llite.c
lustre/llite/rw.c
lustre/lov/lov_request.c
lustre/osc/osc_request.c
lustre/tests/sanityN.sh
lustre/tests/test-framework.sh

diff --git a/lustre/ChangeLog b/lustre/ChangeLog
index 7913dba..85f27fb 100644
@@ -964,6 +964,12 @@ Details    : Change the structure of stats under obdfilter and mds to
              The "uuid"s file would list the uuids of _active_ exports.
              And the clear entry is to clear all stats and stale nids.
 
+Severity   : enhancement
+Bugzilla   : 11270
+Description: eliminate client locks in the face of contention
+Details    : detect file contention and fall back to lockless i/o
+             for contended files.
+
 --------------------------------------------------------------------------------
 
 2007-08-10         Cluster File Systems, Inc. <info@clusterfs.com>
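
The core of the change, reduced to a standalone sketch: the client enqueues its extent lock with the new LDLM_FL_DENY_ON_CONTENTION flag, the server fails the enqueue with -EUSERS when it judges the resource contended, and the client then performs the i/o without a client-side DLM lock. The helper functions here are illustrative stand-ins, not Lustre APIs; only the flag value and the -EUSERS convention come from the patch.

#include <errno.h>
#include <stdio.h>

#define LDLM_FL_DENY_ON_CONTENTION 0x40000000

/* stand-in for the enqueue RPC: pretend the server sees contention */
static int try_extent_lock(int ast_flags)
{
        return (ast_flags & LDLM_FL_DENY_ON_CONTENTION) ? -EUSERS : 0;
}

static long locked_io(long count)   { return count; } /* client DLM path */
static long lockless_io(long count) { return count; } /* OST locks locally */

int main(void)
{
        int rc = try_extent_lock(LDLM_FL_DENY_ON_CONTENTION);
        long done;

        if (rc == 0)
                done = locked_io(4096);
        else if (rc == -EUSERS)   /* contended: fall back to lockless i/o */
                done = lockless_io(4096);
        else
                return 1;
        printf("transferred %ld bytes\n", done);
        return 0;
}
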
diff --git a/lustre/include/linux/lustre_lite.h b/lustre/include/linux/lustre_lite.h
index 20dea87..d3c1504 100644
@@ -47,6 +47,7 @@ enum {
          LPROC_LL_FSYNC,
          LPROC_LL_SETATTR,
          LPROC_LL_TRUNC,
+         LPROC_LL_LOCKLESS_TRUNC,
          LPROC_LL_FLOCK,
          LPROC_LL_GETATTR,
          LPROC_LL_STAFS,
@@ -58,6 +59,8 @@ enum {
          LPROC_LL_INODE_PERM,
          LPROC_LL_DIRECT_READ,
          LPROC_LL_DIRECT_WRITE,
+         LPROC_LL_LOCKLESS_READ,
+         LPROC_LL_LOCKLESS_WRITE,
          LPROC_LL_FILE_OPCODES
 };
 
diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h
index 852da26..34d17de 100644
@@ -132,6 +132,9 @@ typedef enum {
  * w/o involving separate thread. in order to decrease cs rate */
 #define LDLM_FL_ATOMIC_CB      0x4000000
 
+/* Cancel lock asynchronously. See ldlm_cli_cancel_unused_resource. */
+#define LDLM_FL_ASYNC           0x8000000
+
 /* It may happen that a client initiate 2 operations, e.g. unlink and mkdir,
  * such that server send blocking ast for conflict locks to this client for
  * the 1st operation, whereas the 2nd operation has canceled this lock and
@@ -145,8 +148,8 @@ typedef enum {
 #define LDLM_FL_BL_AST          0x10000000
 #define LDLM_FL_BL_DONE         0x20000000
 
-/* Cancel lock asynchronously. See ldlm_cli_cancel_unused_resource. */
-#define LDLM_FL_ASYNC           0x40000000
+/* measure lock contention and return -EUSERS if contention is high */
+#define LDLM_FL_DENY_ON_CONTENTION 0x40000000
 
 /* The blocking callback is overloaded to perform two functions.  These flags
  * indicate which operation should be performed. */
@@ -287,6 +290,12 @@ typedef enum {
  * others (including ibits locks) will be canceled on memory pressure event. */
 #define LDLM_LOCK_SHRINK_THUMB 256
 
+/* default values for the "max_nolock_size", "contention_time"
+ * and "contended_locks" namespace tunables */
+#define NS_DEFAULT_MAX_NOLOCK_BYTES 0
+#define NS_DEFAULT_CONTENTION_SECONDS 2
+#define NS_DEFAULT_CONTENDED_LOCKS 32
+
 struct ldlm_namespace {
         char                  *ns_name;
         ldlm_side_t            ns_client; /* is this a client-side lock tree? */
@@ -321,6 +330,14 @@ struct ldlm_namespace {
         cfs_waitq_t            ns_waitq;
         struct ldlm_pool       ns_pool;
         ldlm_appetite_t        ns_appetite;
+        /* if more than @ns_contended_locks are found, the resource is
+         * considered contended */
+        unsigned               ns_contended_locks;
+        /* the resource remembers its contended state for
+         * @ns_contention_time seconds */
+        unsigned               ns_contention_time;
+        /* maximum size, in bytes, of requests eligible for lockless i/o */
+        unsigned               ns_max_nolock_size;
 };
 
 static inline int ns_is_client(struct ldlm_namespace *ns)
@@ -486,6 +503,9 @@ struct ldlm_resource {
         struct semaphore       lr_lvb_sem;
         __u32                  lr_lvb_len;
         void                  *lr_lvb_data;
+
+        /* time when the resource was last considered contended */
+        cfs_time_t             lr_contention_time;
 };
 
 struct ldlm_ast_work {
diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c
index f1f88ce..2e6aa1a 100644
 #define DEBUG_SUBSYSTEM S_LDLM
 #ifndef __KERNEL__
 # include <liblustre.h>
+#else
+# include <libcfs/libcfs.h>
+# include <libcfs/kp30.h>
 #endif
 
 #include <lustre_dlm.h>
 #include <obd_support.h>
+#include <obd.h>
 #include <lustre_lib.h>
 
 #include "ldlm_internal.h"
@@ -259,10 +263,23 @@ static void ldlm_extent_policy(struct ldlm_resource *res,
         }
 }
 
+static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
+{
+        struct ldlm_resource *res = lock->l_resource;
+        cfs_time_t now = cfs_time_current();
+
+        CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
+        if (contended_locks > res->lr_namespace->ns_contended_locks)
+                res->lr_contention_time = now;
+        return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
+                cfs_time_seconds(res->lr_namespace->ns_contention_time)));
+}
+
 struct ldlm_extent_compat_args {
         struct list_head *work_list;
         struct ldlm_lock *lock;
         ldlm_mode_t mode;
+        int *locks;
         int *compat;
 };
 
@@ -271,9 +288,11 @@ static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
 {
         struct ldlm_extent_compat_args *priv = data;
         struct ldlm_interval *node = to_ldlm_interval(n);
+        struct ldlm_extent *extent;
         struct list_head *work_list = priv->work_list;
         struct ldlm_lock *lock, *enq = priv->lock;
         ldlm_mode_t mode = priv->mode;
+        int count = 0;
         ENTRY;
 
         LASSERT(!list_empty(&node->li_group));
@@ -284,11 +303,17 @@ static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
                          "mode = %s, lock->l_granted_mode = %s\n",
                          ldlm_lockname[mode],
                          ldlm_lockname[lock->l_granted_mode]);
-
+                count++;
                 if (lock->l_blocking_ast)
                         ldlm_add_ast_work_item(lock, enq, work_list);
         }
 
+        /* don't count conflicting glimpse locks */
+        extent = ldlm_interval_extent(node);
+        if (!(mode == LCK_PR &&
+            extent->start == 0 && extent->end == OBD_OBJECT_EOF))
+                *priv->locks += count;
+
         if (priv->compat)
                 *priv->compat = 0;
 
@@ -307,7 +332,7 @@ static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n,
 static int
 ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                          int *flags, ldlm_error_t *err,
-                         struct list_head *work_list)
+                         struct list_head *work_list, int *contended_locks)
 {
         struct list_head *tmp;
         struct ldlm_lock *lock;
@@ -317,6 +342,7 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
         __u64 req_end = req->l_req_extent.end;
         int compat = 1;
         int scan = 0;
+        int check_contention;
         ENTRY;
 
         lockmode_verify(req_mode);
@@ -326,6 +352,7 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                 struct ldlm_interval_tree *tree;
                 struct ldlm_extent_compat_args data = {.work_list = work_list,
                                                .lock = req,
+                                               .locks = contended_locks,
                                                .compat = &compat };
                 struct interval_node_extent ex = { .start = req_start,
                                                    .end = req_end };
@@ -382,157 +409,179 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                                         compat = 0;
                         }
                 }
-                RETURN(compat);
-        }
+        } else { /* for waiting queue */
+                list_for_each(tmp, queue) {
+                        check_contention = 1;
+
+                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+                        if (req == lock)
+                                break;
+
+                        if (unlikely(scan)) {
+                                /* We only get here if we are queuing a GROUP
+                                   lock and met some incompatible one. The main
+                                   idea of this code is to insert the GROUP lock
+                                   after a compatible GROUP lock in the waiting
+                                   queue or, if there is none, in front of the
+                                   first non-GROUP lock */
+                                if (lock->l_req_mode != LCK_GROUP) {
+                                        /* Ok, we hit non-GROUP lock, there should
+                                         * be no more GROUP locks later on, queue in
+                                         * front of first non-GROUP lock */
+
+                                        ldlm_resource_insert_lock_after(lock, req);
+                                        list_del_init(&lock->l_res_link);
+                                        ldlm_resource_insert_lock_after(req, lock);
+                                        compat = 0;
+                                        break;
+                                }
+                                if (req->l_policy_data.l_extent.gid ==
+                                    lock->l_policy_data.l_extent.gid) {
+                                        /* found it */
+                                        ldlm_resource_insert_lock_after(lock, req);
+                                        compat = 0;
+                                        break;
+                                }
+                                continue;
+                        }
 
-        /* for waiting queue */
-        list_for_each(tmp, queue) {
-                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+                        /* locks are compatible, overlap doesn't matter */
+                        if (lockmode_compat(lock->l_req_mode, req_mode)) {
+                                if (req_mode == LCK_PR &&
+                                    ((lock->l_policy_data.l_extent.start <=
+                                      req->l_policy_data.l_extent.start) &&
+                                     (lock->l_policy_data.l_extent.end >=
+                                      req->l_policy_data.l_extent.end))) {
+                                        /* If we met a PR lock just like us or
+                                           wider, and nobody down the list
+                                           conflicted with it, that means we can
+                                           skip processing of the rest of the
+                                           list and safely place ourselves at
+                                           the end of the list, or grant
+                                           (depending on whether we met a
+                                           conflicting lock earlier in the list).
+                                           Only on the 1st enqueue do we continue
+                                           traversing past a conflict, to make
+                                           sure something is marked as AST_SENT
+                                           as well; with an empty worklist we
+                                           would exit on the first conflict met. */
+                                        /* There IS a case where such flag is
+                                           not set for a lock, yet it blocks
+                                           something. Luckily for us this is
+                                           only during destroy, so lock is
+                                           exclusive. So here we are safe */
+                                        if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
+                                                RETURN(compat);
+                                        }
+                                }
 
-                if (req == lock)
-                        RETURN(compat);
-
-                if (unlikely(scan)) {
-                        /* We only get here if we are queuing GROUP lock
-                           and met some incompatible one. The main idea of this
-                           code is to insert GROUP lock past compatible GROUP
-                           lock in the waiting queue or if there is not any,
-                           then in front of first non-GROUP lock */
-                        if (lock->l_req_mode != LCK_GROUP) {
-                                /* Ok, we hit non-GROUP lock, there should
-                                 * be no more GROUP locks later on, queue in
-                                 * front of first non-GROUP lock */
-
-                                ldlm_resource_insert_lock_after(lock, req);
-                                list_del_init(&lock->l_res_link);
-                                ldlm_resource_insert_lock_after(req, lock);
-                                RETURN(0);
-                        }
-                        if (req->l_policy_data.l_extent.gid ==
-                             lock->l_policy_data.l_extent.gid) {
-                                /* found it */
-                                ldlm_resource_insert_lock_after(lock, req);
-                                RETURN(0);
-                        }
-                        continue;
-                }
+                                /* non-group locks are compatible, overlap doesn't
+                                   matter */
+                                if (likely(req_mode != LCK_GROUP))
+                                        continue;
 
-                /* locks are compatible, overlap doesn't matter */
-                if (lockmode_compat(lock->l_req_mode, req_mode)) {
-                        if (req_mode == LCK_PR &&
-                            ((lock->l_policy_data.l_extent.start <=
-                             req->l_policy_data.l_extent.start) &&
-                             (lock->l_policy_data.l_extent.end >=
-                              req->l_policy_data.l_extent.end))) {
-                                /* If we met a PR lock just like us or wider,
-                                   and nobody down the list conflicted with
-                                   it, that means we can skip processing of
-                                   the rest of the list and safely place
-                                   ourselves at the end of the list, or grant
-                                   (dependent if we met an conflicting locks
-                                   before in the list).
-                                   In case of 1st enqueue only we continue
-                                   traversing if there is something conflicting
-                                   down the list because we need to make sure
-                                   that something is marked as AST_SENT as well,
-                                   in cse of empy worklist we would exit on
-                                   first conflict met. */
-                                /* There IS a case where such flag is
-                                   not set for a lock, yet it blocks
-                                   something. Luckily for us this is
-                                   only during destroy, so lock is
-                                   exclusive. So here we are safe */
-                                if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
-                                        RETURN(compat);
+                                /* If we are trying to get a GROUP lock and there is
+                                   another one of this kind, we need to compare gid */
+                                if (req->l_policy_data.l_extent.gid ==
+                                    lock->l_policy_data.l_extent.gid) {
+                                        /* If existing lock with matched gid is granted,
+                                           we grant new one too. */
+                                        if (lock->l_req_mode == lock->l_granted_mode)
+                                                RETURN(2);
+
+                                        /* Otherwise we are scanning the queue
+                                         * of waiting locks, which means the
+                                         * current request would block along
+                                         * with the existing lock (which is
+                                         * itself already blocked).
+                                         * If we are in nonblocking mode,
+                                         * return immediately */
+                                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+                                                compat = -EWOULDBLOCK;
+                                                goto destroylock;
+                                        }
+                                        /* If this group lock is compatible with another
+                                         * group lock on the waiting list, they must be
+                                         * together in the list, so they can be granted
+                                         * at the same time.  Otherwise the later lock
+                                         * can get stuck behind another, incompatible,
+                                         * lock. */
+                                        ldlm_resource_insert_lock_after(lock, req);
+                                        /* Because 'lock' is not granted, we can stop
+                                         * processing this queue and return immediately.
+                                         * There is no need to check the rest of the
+                                         * list. */
+                                        RETURN(0);
                                 }
                         }
 
-                        /* non-group locks are compatible, overlap doesn't
-                           matter */
-                        if (likely(req_mode != LCK_GROUP))
+                        if (unlikely(req_mode == LCK_GROUP &&
+                                     (lock->l_req_mode != lock->l_granted_mode))) {
+                                scan = 1;
+                                compat = 0;
+                                if (lock->l_req_mode != LCK_GROUP) {
+                                        /* Ok, we hit non-GROUP lock, there should be no
+                                           more GROUP locks later on, queue in front of
+                                           first non-GROUP lock */
+
+                                        ldlm_resource_insert_lock_after(lock, req);
+                                        list_del_init(&lock->l_res_link);
+                                        ldlm_resource_insert_lock_after(req, lock);
+                                        break;
+                                }
+                                if (req->l_policy_data.l_extent.gid ==
+                                    lock->l_policy_data.l_extent.gid) {
+                                        /* found it */
+                                        ldlm_resource_insert_lock_after(lock, req);
+                                        break;
+                                }
                                 continue;
+                        }
 
-                        /* If we are trying to get a GROUP lock and there is
-                           another one of this kind, we need to compare gid */
-                        if (req->l_policy_data.l_extent.gid ==
-                            lock->l_policy_data.l_extent.gid) {
-                                /* If existing lock with matched gid is granted,
-                                   we grant new one too. */
-                                if (lock->l_req_mode == lock->l_granted_mode)
-                                        RETURN(2);
-
-                                /* Otherwise we are scanning queue of waiting
-                                 * locks and it means current request would
-                                 * block along with existing lock (that is
-                                 * already blocked.
-                                 * If we are in nonblocking mode - return
-                                 * immediately */
+                        if (unlikely(lock->l_req_mode == LCK_GROUP)) {
+                                /* If the compared lock is GROUP, then the
+                                 * requested one is PR/PW, so this is not
+                                 * compatible; extent range does not matter */
                                 if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                         compat = -EWOULDBLOCK;
                                         goto destroylock;
+                                } else {
+                                        *flags |= LDLM_FL_NO_TIMEOUT;
                                 }
-                                /* If this group lock is compatible with another
-                                 * group lock on the waiting list, they must be
-                                 * together in the list, so they can be granted
-                                 * at the same time.  Otherwise the later lock
-                                 * can get stuck behind another, incompatible,
-                                 * lock. */
-                                ldlm_resource_insert_lock_after(lock, req);
-                                /* Because 'lock' is not granted, we can stop
-                                 * processing this queue and return immediately.
-                                 * There is no need to check the rest of the
-                                 * list. */
-                                RETURN(0);
+                        } else if (lock->l_policy_data.l_extent.end < req_start ||
+                                   lock->l_policy_data.l_extent.start > req_end) {
+                                /* if a non-group lock doesn't overlap, skip it */
+                                continue;
+                        } else if (lock->l_req_extent.end < req_start ||
+                                   lock->l_req_extent.start > req_end) {
+                                /* false contention, the requests don't really
+                                 * overlap */
+                                check_contention = 0;
                         }
-                }
 
-                if (unlikely(req_mode == LCK_GROUP &&
-                    (lock->l_req_mode != lock->l_granted_mode))) {
-                        scan = 1;
-                        compat = 0;
-                        if (lock->l_req_mode != LCK_GROUP) {
-                        /* Ok, we hit non-GROUP lock, there should be no
-                           more GROUP locks later on, queue in front of
-                           first non-GROUP lock */
-
-                                ldlm_resource_insert_lock_after(lock, req);
-                                list_del_init(&lock->l_res_link);
-                                ldlm_resource_insert_lock_after(req, lock);
+                        if (!work_list)
                                 RETURN(0);
-                        }
-                        if (req->l_policy_data.l_extent.gid ==
-                             lock->l_policy_data.l_extent.gid) {
-                                /* found it */
-                                ldlm_resource_insert_lock_after(lock, req);
-                                RETURN(0);
-                        }
-                        continue;
-                }
 
-                if (unlikely(lock->l_req_mode == LCK_GROUP)) {
-                        /* If compared lock is GROUP, then requested is PR/PW/
-                         * so this is not compatible; extent range does not
-                         * matter */
-                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
-                                compat = -EWOULDBLOCK;
-                                goto destroylock;
-                        } else {
-                                *flags |= LDLM_FL_NO_TIMEOUT;
-                        }
-                } else if (lock->l_policy_data.l_extent.end < req_start ||
-                           lock->l_policy_data.l_extent.start > req_end) {
-                        /* if a non group lock doesn't overlap skip it */
-                        continue;
-                }
+                        /* don't count conflicting glimpse locks */
+                        if (lock->l_req_mode == LCK_PR &&
+                            lock->l_policy_data.l_extent.start == 0 &&
+                            lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
+                                check_contention = 0;
 
-                if (!work_list)
-                        RETURN(0);
+                        *contended_locks += check_contention;
 
-                compat = 0;
-                if (lock->l_blocking_ast)
-                        ldlm_add_ast_work_item(lock, req, work_list);
+                        compat = 0;
+                        if (lock->l_blocking_ast)
+                                ldlm_add_ast_work_item(lock, req, work_list);
+                }
         }
 
+        if (ldlm_check_contention(req, *contended_locks) &&
+            compat == 0 &&
+            (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
+            req->l_req_mode != LCK_GROUP &&
+            req_end - req_start <=
+            req->l_resource->lr_namespace->ns_max_nolock_size)
+                GOTO(destroylock, compat = -EUSERS);
+
         RETURN(compat);
 destroylock:
         list_del_init(&req->l_res_link);
@@ -541,6 +590,27 @@ destroylock:
         RETURN(compat);
 }
 
+static void discard_bl_list(struct list_head *bl_list)
+{
+        struct list_head *tmp, *pos;
+        ENTRY;
+
+        list_for_each_safe(pos, tmp, bl_list) {
+                struct ldlm_lock *lock =
+                        list_entry(pos, struct ldlm_lock, l_bl_ast);
+
+                list_del_init(&lock->l_bl_ast);
+                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
+                lock->l_flags &= ~LDLM_FL_AST_SENT;
+                LASSERT(lock->l_bl_ast_run == 0);
+                LASSERT(lock->l_blocking_lock);
+                LDLM_LOCK_PUT(lock->l_blocking_lock);
+                lock->l_blocking_lock = NULL;
+                LDLM_LOCK_PUT(lock);
+        }
+        EXIT;
+}
+
 /* If first_enq is 0 (ie, called from ldlm_reprocess_queue):
   *   - blocking ASTs have already been sent
   *   - must call this function with the ns lock held
@@ -554,9 +624,12 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
         struct ldlm_resource *res = lock->l_resource;
         struct list_head rpc_list = CFS_LIST_HEAD_INIT(rpc_list);
         int rc, rc2;
+        int contended_locks = 0;
         ENTRY;
 
         LASSERT(list_empty(&res->lr_converting));
+        LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
+                !(lock->l_flags & LDLM_AST_DISCARD_DATA));
         check_res_locked(res);
         *err = ELDLM_OK;
 
@@ -568,10 +641,11 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                  * being true, we want to find out. */
                 LASSERT(*flags == 0);
                 rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
-                                              err, NULL);
+                                              err, NULL, &contended_locks);
                 if (rc == 1) {
                         rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
-                                                      flags, err, NULL);
+                                                      flags, err, NULL,
+                                                      &contended_locks);
                 }
                 if (rc == 0)
                         RETURN(LDLM_ITER_STOP);
@@ -585,13 +659,16 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
         }
 
  restart:
-        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err, &rpc_list);
+        contended_locks = 0;
+        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
+                                      &rpc_list, &contended_locks);
         if (rc < 0)
                 GOTO(out, rc); /* lock was destroyed */
         if (rc == 2)
                 goto grant;
 
-        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err, &rpc_list);
+        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
+                                       &rpc_list, &contended_locks);
         if (rc2 < 0)
                 GOTO(out, rc = rc2); /* lock was destroyed */
 
@@ -636,8 +713,12 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                 *flags |= LDLM_FL_NO_TIMEOUT;
 
         }
-        rc = 0;
+        RETURN(0);
 out:
+        if (!list_empty(&rpc_list)) {
+                LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
+                discard_bl_list(&rpc_list);
+        }
         RETURN(rc);
 }
 
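ldlm_extent_compat_queue() now counts conflicting non-glimpse locks, and ldlm_check_contention() turns that count into a sticky per-resource state: once more than ns_contended_locks conflicts are seen, the resource is treated as contended for ns_contention_time seconds, during which non-GROUP requests whose extent fits within ns_max_nolock_size and that carry LDLM_FL_DENY_ON_CONTENTION are refused with -EUSERS. A standalone model of the window logic, with time(2) standing in for the cfs_time_*() jiffies arithmetic:

#include <stdio.h>
#include <time.h>

#define NS_DEFAULT_CONTENTION_SECONDS 2
#define NS_DEFAULT_CONTENDED_LOCKS    32

struct res_model {
        time_t contention_time;        /* when contention was last seen */
};

static int check_contention(struct res_model *res, int contended_locks)
{
        time_t now = time(NULL);

        if (contended_locks > NS_DEFAULT_CONTENDED_LOCKS)
                res->contention_time = now;        /* refresh the window */
        /* contended while still inside the window */
        return now < res->contention_time + NS_DEFAULT_CONTENTION_SECONDS;
}

int main(void)
{
        struct res_model res = { 0 };

        printf("%d\n", check_contention(&res, 40)); /* 1: window refreshed */
        printf("%d\n", check_contention(&res, 0));  /* 1: still inside it */
        return 0;
}
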
diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c
index b39d2e9..151c513 100644
@@ -269,6 +269,27 @@ void ldlm_proc_namespace(struct ldlm_namespace *ns)
                 lock_vars[0].read_fptr = lprocfs_rd_uint;
                 lock_vars[0].write_fptr = lprocfs_wr_uint;
                 lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
+
+                snprintf(lock_name, MAX_STRING_SIZE, "%s/max_nolock_bytes",
+                         ns->ns_name);
+                lock_vars[0].data = &ns->ns_max_nolock_size;
+                lock_vars[0].read_fptr = lprocfs_rd_uint;
+                lock_vars[0].write_fptr = lprocfs_wr_uint;
+                lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
+
+                snprintf(lock_name, MAX_STRING_SIZE, "%s/contention_seconds",
+                         ns->ns_name);
+                lock_vars[0].data = &ns->ns_contention_time;
+                lock_vars[0].read_fptr = lprocfs_rd_uint;
+                lock_vars[0].write_fptr = lprocfs_wr_uint;
+                lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
+
+                snprintf(lock_name, MAX_STRING_SIZE, "%s/contended_locks",
+                         ns->ns_name);
+                lock_vars[0].data = &ns->ns_contended_locks;
+                lock_vars[0].read_fptr = lprocfs_rd_uint;
+                lock_vars[0].write_fptr = lprocfs_wr_uint;
+                lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
         }
 }
 #undef MAX_STRING_SIZE
@@ -314,6 +335,9 @@ struct ldlm_namespace *ldlm_namespace_new(char *name, ldlm_side_t client,
         atomic_set(&ns->ns_locks, 0);
         ns->ns_resources = 0;
         cfs_waitq_init(&ns->ns_waitq);
+        ns->ns_max_nolock_size = NS_DEFAULT_MAX_NOLOCK_BYTES;
+        ns->ns_contention_time = NS_DEFAULT_CONTENTION_SECONDS;
+        ns->ns_contended_locks = NS_DEFAULT_CONTENDED_LOCKS;
 
         for (bucket = ns->ns_hash + RES_HASH_SIZE - 1; bucket >= ns->ns_hash;
              bucket--)
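
ldlm_proc_namespace() above makes the three thresholds writable at runtime, one set per namespace. A hedged userspace sketch of raising max_nolock_bytes so that small requests may go lockless; the /proc path is an assumption based on where these lprocfs entries are registered, and the <ns> component varies per target:

#include <stdio.h>

int main(void)
{
        /* path assumed from ldlm's lprocfs layout; <ns> is the
         * namespace name of the target being tuned */
        const char *path =
                "/proc/fs/lustre/ldlm/namespaces/<ns>/max_nolock_bytes";
        FILE *f = fopen(path, "w");

        if (f == NULL)
                return 1;
        fprintf(f, "%u\n", 32768u);  /* allow lockless i/o up to 32KB */
        return fclose(f) == 0 ? 0 : 1;
}
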
diff --git a/lustre/llite/file.c b/lustre/llite/file.c
index 05aaa59..80ac03b 100644
@@ -1317,6 +1317,97 @@ int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
         RETURN(rc);
 }
 
+static void ll_set_file_contended(struct inode *inode)
+{
+        struct ll_inode_info *lli = ll_i2info(inode);
+        cfs_time_t now = cfs_time_current();
+
+        spin_lock(&lli->lli_lock);
+        lli->lli_contention_time = now;
+        lli->lli_flags |= LLIF_CONTENDED;
+        spin_unlock(&lli->lli_lock);
+}
+
+void ll_clear_file_contended(struct inode *inode)
+{
+        struct ll_inode_info *lli = ll_i2info(inode);
+
+        spin_lock(&lli->lli_lock);
+        lli->lli_flags &= ~LLIF_CONTENDED;
+        spin_unlock(&lli->lli_lock);
+}
+
+static int ll_is_file_contended(struct file *file)
+{
+        struct inode *inode = file->f_dentry->d_inode;
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+        ENTRY;
+
+        if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
+                CDEBUG(D_INFO, "the server does not support the SRVLOCK"
+                       " feature, osc connect flags = 0x"LPX64"\n",
+                       sbi->ll_lco.lco_flags);
+                RETURN(0);
+        }
+        if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
+                RETURN(1);
+        if (lli->lli_flags & LLIF_CONTENDED) {
+                cfs_time_t cur_time = cfs_time_current();
+                cfs_time_t retry_time;
+
+                retry_time = cfs_time_add(
+                        lli->lli_contention_time,
+                        cfs_time_seconds(sbi->ll_contention_time));
+                if (cfs_time_after(cur_time, retry_time)) {
+                        ll_clear_file_contended(inode);
+                        RETURN(0);
+                }
+                RETURN(1);
+        }
+        RETURN(0);
+}
+
+static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
+                                 const char *buf, size_t count,
+                                 loff_t start, loff_t end, int rw)
+{
+        int append;
+        int tree_locked = 0;
+        int rc;
+        struct inode *inode = file->f_dentry->d_inode;
+        ENTRY;
+
+        append = (rw == WRITE) && (file->f_flags & O_APPEND);
+
+        if (append || !ll_is_file_contended(file)) {
+                struct ll_lock_tree_node *node;
+                int ast_flags;
+
+                ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
+                if (file->f_flags & O_NONBLOCK)
+                        ast_flags |= LDLM_FL_BLOCK_NOWAIT;
+                node = ll_node_from_inode(inode, start, end,
+                                          (rw == WRITE) ? LCK_PW : LCK_PR);
+                if (IS_ERR(node)) {
+                        rc = PTR_ERR(node);
+                        GOTO(out, rc);
+                }
+                tree->lt_fd = LUSTRE_FPRIVATE(file);
+                rc = ll_tree_lock(tree, node, buf, count, ast_flags);
+                if (rc == 0)
+                        tree_locked = 1;
+                else if (rc == -EUSERS)
+                        ll_set_file_contended(inode);
+                else
+                        GOTO(out, rc);
+        }
+        RETURN(tree_locked);
+out:
+        return rc;
+}
+
 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
                             loff_t *ppos)
 {
@@ -1325,12 +1416,12 @@ static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
         struct lov_stripe_md *lsm = lli->lli_smd;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ll_lock_tree tree;
-        struct ll_lock_tree_node *node;
         struct ost_lvb lvb;
         struct ll_ra_read bead;
-        int rc, ra = 0;
+        int ra = 0;
         loff_t end;
         ssize_t retval, chunk, sum = 0;
+        int tree_locked;
 
         __u64 kms;
         ENTRY;
@@ -1368,7 +1459,6 @@ static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
                         RETURN(-EFAULT);
                 RETURN(count);
         }
-
 repeat:
         if (sbi->ll_max_rw_chunk != 0) {
                 /* first, let's know the end of the current stripe */
@@ -1387,16 +1477,10 @@ repeat:
                 end = *ppos + count - 1;
         }
 
-        node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
-        if (IS_ERR(node)){
-                GOTO(out, retval = PTR_ERR(node));
-        }
-
-        tree.lt_fd = LUSTRE_FPRIVATE(file);
-        rc = ll_tree_lock(&tree, node, buf, count,
-                          file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
-        if (rc != 0)
-                GOTO(out, retval = rc);
+        tree_locked = ll_file_get_tree_lock(&tree, file, buf,
+                                            count, *ppos, end, READ);
+        if (tree_locked < 0)
+                GOTO(out, retval = tree_locked);
 
         ll_inode_size_lock(inode, 1);
         /*
@@ -1427,7 +1511,8 @@ repeat:
                 ll_inode_size_unlock(inode, 1);
                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
                 if (retval) {
-                        ll_tree_unlock(&tree);
+                        if (tree_locked)
+                                ll_tree_unlock(&tree);
                         goto out;
                 }
         } else {
@@ -1446,23 +1531,27 @@ repeat:
         CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
                inode->i_ino, chunk, *ppos, i_size_read(inode));
 
-        /* turn off the kernel's read-ahead */
-        file->f_ra.ra_pages = 0;
+        if (tree_locked) {
+                /* turn off the kernel's read-ahead */
+                file->f_ra.ra_pages = 0;
 
-        /* initialize read-ahead window once per syscall */
-        if (ra == 0) {
-                ra = 1;
-                bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
-                bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
-                ll_ra_read_in(file, &bead);
-        }
+                /* initialize read-ahead window once per syscall */
+                if (ra == 0) {
+                        ra = 1;
+                        bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
+                        bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
+                        ll_ra_read_in(file, &bead);
+                }
 
-        /* BUG: 5972 */
-        file_accessed(file);
-        retval = generic_file_read(file, buf, chunk, ppos);
-        ll_rw_stats_tally(sbi, current->pid, file, count, 0);
+                /* BUG: 5972 */
+                file_accessed(file);
+                retval = generic_file_read(file, buf, chunk, ppos);
+                ll_tree_unlock(&tree);
+        } else {
+                retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
+        }
 
-        ll_tree_unlock(&tree);
+        ll_rw_stats_tally(sbi, current->pid, file, chunk, 0);
 
         if (retval > 0) {
                 buf += retval;
@@ -1489,11 +1578,10 @@ static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
         struct ll_lock_tree tree;
-        struct ll_lock_tree_node *node;
         loff_t maxbytes = ll_file_maxbytes(inode);
         loff_t lock_start, lock_end, end;
         ssize_t retval, chunk, sum = 0;
-        int rc;
+        int tree_locked;
         ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
@@ -1541,16 +1629,11 @@ repeat:
                 lock_start = *ppos;
                 lock_end = *ppos + count - 1;
         }
-        node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
-
-        if (IS_ERR(node))
-                GOTO(out, retval = PTR_ERR(node));
 
-        tree.lt_fd = LUSTRE_FPRIVATE(file);
-        rc = ll_tree_lock(&tree, node, buf, count,
-                          file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
-        if (rc != 0)
-                GOTO(out, retval = rc);
+        tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
+                                            lock_start, lock_end, WRITE);
+        if (tree_locked < 0)
+                GOTO(out, retval = tree_locked);
 
         /* This is ok, g_f_w will overwrite this under i_sem if it races
          * with a local truncate, it just makes our maxbyte checking easier.
@@ -1565,18 +1648,23 @@ repeat:
                 send_sig(SIGXFSZ, current, 0);
                 GOTO(out_unlock, retval = -EFBIG);
         }
-        if (*ppos + count > maxbytes)
-                count = maxbytes - *ppos;
+        if (end > maxbytes - 1)
+                end = maxbytes - 1;
 
         /* generic_file_write handles O_APPEND after getting i_mutex */
         chunk = end - *ppos + 1;
         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
                inode->i_ino, chunk, *ppos);
-        retval = generic_file_write(file, buf, chunk, ppos);
-        ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);
+        if (tree_locked)
+                retval = generic_file_write(file, buf, chunk, ppos);
+        else
+                retval = ll_file_lockless_io(file, (char*)buf, chunk,
+                                             ppos, WRITE);
+        ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
 
 out_unlock:
-        ll_tree_unlock(&tree);
+        if (tree_locked)
+                ll_tree_unlock(&tree);
 
 out:
         if (retval > 0) {
@@ -1638,6 +1726,7 @@ static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
         if (rc != 0)
                 RETURN(rc);
 
+        ll_clear_file_contended(inode);
         ll_inode_size_lock(inode, 1);
         /*
          * Consistency guarantees: following possibilities exist for the
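
The client side mirrors the server's window: a -EUSERS enqueue failure sets LLIF_CONTENDED plus a timestamp, and ll_is_file_contended() keeps steering reads and writes to ll_file_lockless_io() until ll_contention_time seconds elapse (mmap and sendfile clear the flag early). A standalone model of that per-inode cache:

#include <stdio.h>
#include <time.h>

#define DEFAULT_CONTENTION_SECONDS 60  /* SBI_DEFAULT_CONTENTION_SECONDS */

struct inode_model {
        int    contended;
        time_t contention_time;
};

static void set_contended(struct inode_model *ino)
{
        ino->contended = 1;
        ino->contention_time = time(NULL);
}

static int is_contended(struct inode_model *ino)
{
        if (!ino->contended)
                return 0;
        if (time(NULL) > ino->contention_time + DEFAULT_CONTENTION_SECONDS) {
                ino->contended = 0;     /* window expired: try locks again */
                return 0;
        }
        return 1;
}

int main(void)
{
        struct inode_model ino = { 0, 0 };

        set_contended(&ino);            /* as if enqueue returned -EUSERS */
        printf("contended=%d\n", is_contended(&ino));   /* 1 */
        return 0;
}
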
diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h
index 1bffef4..6aed986 100644
@@ -78,6 +78,11 @@ enum lli_flags {
         /* Sizeon-on-MDS attributes are changed. An attribute update needs to
          * be sent to MDS. */
         LLIF_SOM_DIRTY          = (1 << 3),
+        /* File is contended */
+        LLIF_CONTENDED         = (1 << 4),
+        /* Truncate uses server lock for this file */
+        LLIF_SRVLOCK           = (1 << 5)
+
 };
 
 struct ll_inode_info {
@@ -89,6 +94,7 @@ struct ll_inode_info {
         __u64                   lli_maxbytes;
         __u64                   lli_ioepoch;
         unsigned long           lli_flags;
+        cfs_time_t              lli_contention_time;
 
         /* this lock protects posix_acl, pending_write_llaps, mmap_cnt */
         spinlock_t              lli_lock;
@@ -234,6 +240,10 @@ enum stats_track_type {
 #define LL_SBI_LOCALFLOCK       0x200 /* Local flocks support by kernel */
 #define LL_SBI_LRU_RESIZE       0x400 /* lru resize support */
 
+/* default value for ll_sb_info->contention_time */
+#define SBI_DEFAULT_CONTENTION_SECONDS     60
+/* default value for lockless_truncate_enable */
+#define SBI_DEFAULT_LOCKLESS_TRUNCATE_ENABLE 1
 #define RCE_HASHES      32
 
 struct rmtacl_ctl_entry {
@@ -289,6 +299,9 @@ struct ll_sb_info {
         unsigned long             ll_pglist_gen;
         struct list_head          ll_pglist; /* all pages (llap_pglist_item) */
 
+        unsigned                  ll_contention_time; /* seconds */
+        unsigned                  ll_lockless_truncate_enable; /* true/false */
+
         struct ll_ra_info         ll_ra_info;
         unsigned int              ll_namelen;
         struct file_operations   *ll_fop;
@@ -458,7 +471,8 @@ struct ll_async_page {
                          llap_defer_uptodate:1,
                          llap_origin:3,
                          llap_ra_used:1,
-                         llap_ignore_quota:1;
+                         llap_ignore_quota:1,
+                         llap_lockless_io_page:1;
         void            *llap_cookie;
         struct page     *llap_page;
         struct list_head llap_pending_write;
@@ -478,6 +492,7 @@ enum {
         LLAP_ORIGIN_COMMIT_WRITE,
         LLAP_ORIGIN_WRITEPAGE,
         LLAP_ORIGIN_REMOVEPAGE,
+        LLAP_ORIGIN_LOCKLESS_IO,
         LLAP__ORIGIN_MAX,
 };
 extern char *llap_origins[];
@@ -545,6 +560,9 @@ struct ll_async_page *llap_cast_private(struct page *page);
 void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras);
 void ll_ra_accounting(struct ll_async_page *llap,struct address_space *mapping);
 void ll_truncate(struct inode *inode);
+int ll_file_punch(struct inode *, loff_t, int);
+ssize_t ll_file_lockless_io(struct file *, char *, size_t, loff_t *, int);
+void ll_clear_file_contended(struct inode*);
 int ll_sync_page_range(struct inode *, struct address_space *, loff_t, size_t);
 
 /* llite/file.c */
diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c
index dd9d7e8..7b63ce3 100644
@@ -76,7 +76,8 @@ static struct ll_sb_info *ll_init_sbi(void)
                                            SBI_DEFAULT_READAHEAD_MAX);
         sbi->ll_ra_info.ra_max_read_ahead_whole_pages =
                                            SBI_DEFAULT_READAHEAD_WHOLE_MAX;
-
+        sbi->ll_contention_time = SBI_DEFAULT_CONTENTION_SECONDS;
+        sbi->ll_lockless_truncate_enable = SBI_DEFAULT_LOCKLESS_TRUNCATE_ENABLE;
         INIT_LIST_HEAD(&sbi->ll_conn_chain);
         INIT_LIST_HEAD(&sbi->ll_orphan_dentry_list);
 
@@ -365,7 +366,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt)
 
         data->ocd_connect_flags = OBD_CONNECT_GRANT     | OBD_CONNECT_VERSION  |
                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE |
-                                  OBD_CONNECT_CANCELSET | OBD_CONNECT_FID;
+                                  OBD_CONNECT_CANCELSET | OBD_CONNECT_FID      |
+                                  OBD_CONNECT_SRVLOCK   | OBD_CONNECT_TRUNCLOCK;
         if (sbi->ll_flags & LL_SBI_OSS_CAPA)
                 data->ocd_connect_flags |= OBD_CONNECT_OSS_CAPA;
 
@@ -1244,6 +1246,92 @@ static int ll_setattr_done_writing(struct inode *inode,
         RETURN(rc);
 }
 
+static int ll_setattr_do_truncate(struct inode *inode, loff_t new_size)
+{
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct lov_stripe_md *lsm = lli->lli_smd;
+        int rc;
+        ldlm_policy_data_t policy = { .l_extent = {new_size,
+                                                   OBD_OBJECT_EOF } };
+        struct lustre_handle lockh = { 0 };
+        int local_lock = 0; /* 0 - no local lock;
+                             * 1 - lock taken by ll_extent_lock;
+                             * 2 - by obd_match */
+        int ast_flags;
+        int err;
+        ENTRY;
+
+        UNLOCK_INODE_MUTEX(inode);
+        UP_WRITE_I_ALLOC_SEM(inode);
+
+        if (sbi->ll_lockless_truncate_enable &&
+            (sbi->ll_lco.lco_flags & OBD_CONNECT_TRUNCLOCK)) {
+                ast_flags = LDLM_FL_BLOCK_GRANTED;
+                rc = obd_match(sbi->ll_dt_exp, lsm, LDLM_EXTENT,
+                               &policy, LCK_PW, &ast_flags, inode, &lockh);
+                if (rc > 0) {
+                        local_lock = 2;
+                        rc = 0;
+                } else if (rc == 0) {
+                        rc = ll_file_punch(inode, new_size, 1);
+                }
+        } else {
+                /* XXX when we fix the AST intents to pass the discard-range
+                 * XXX extent, make ast_flags always LDLM_AST_DISCARD_DATA
+                 * XXX here. */
+                ast_flags = (new_size == 0) ? LDLM_AST_DISCARD_DATA : 0;
+                rc = ll_extent_lock(NULL, inode, lsm, LCK_PW, &policy,
+                                    &lockh, ast_flags);
+                if (likely(rc == 0))
+                        local_lock = 1;
+        }
+
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+        DOWN_WRITE_I_ALLOC_SEM(inode);
+        LOCK_INODE_MUTEX(inode);
+#else
+        LOCK_INODE_MUTEX(inode);
+        DOWN_WRITE_I_ALLOC_SEM(inode);
+#endif
+        if (likely(rc == 0)) {
+                /* Only ll_inode_size_lock is taken at this level.
+                 * lov_stripe_lock() is grabbed by ll_truncate() only over
+                 * call to obd_adjust_kms().  If vmtruncate returns 0, then
+                 * ll_truncate dropped ll_inode_size_lock() */
+                ll_inode_size_lock(inode, 0);
+                if (!local_lock) {
+                        spin_lock(&lli->lli_lock);
+                        lli->lli_flags |= LLIF_SRVLOCK;
+                        spin_unlock(&lli->lli_lock);
+                }
+                rc = vmtruncate(inode, new_size);
+                if (!local_lock) {
+                        spin_lock(&lli->lli_lock);
+                        lli->lli_flags &= ~LLIF_SRVLOCK;
+                        spin_unlock(&lli->lli_lock);
+                }
+                if (rc != 0) {
+                        LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0);
+                        ll_inode_size_unlock(inode, 0);
+                }
+        }
+
+        if (local_lock) {
+                if (local_lock == 2)
+                        err = obd_cancel(sbi->ll_dt_exp, lsm, LCK_PW, &lockh);
+                else
+                        err = ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh);
+                if (unlikely(err != 0)) {
+                        CERROR("extent unlock failed: err=%d,"
+                               " unlock method=%d\n", err, local_lock);
+                        if (rc == 0)
+                                rc = err;
+                }
+        }
+        RETURN(rc);
+}
+
 /* If this inode has objects allocated to it (lsm != NULL), then the OST
  * object(s) determine the file size and mtime.  Otherwise, the MDS will
  * keep these values until such a time that objects are allocated for it.
@@ -1356,43 +1444,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr)
          * last one is especially bad for racing o_append users on other
          * nodes. */
         if (ia_valid & ATTR_SIZE) {
-                ldlm_policy_data_t policy = { .l_extent = {attr->ia_size,
-                                                           OBD_OBJECT_EOF } };
-                struct lustre_handle lockh = { 0 };
-                int err, ast_flags = 0;
-                /* XXX when we fix the AST intents to pass the discard-range
-                 * XXX extent, make ast_flags always LDLM_AST_DISCARD_DATA
-                 * XXX here. */
-                if (attr->ia_size == 0)
-                        ast_flags = LDLM_AST_DISCARD_DATA;
-
-                UNLOCK_INODE_MUTEX(inode);
-                UP_WRITE_I_ALLOC_SEM(inode);
-                rc = ll_extent_lock(NULL, inode, lsm, LCK_PW, &policy, &lockh,
-                                    ast_flags);
-                LOCK_INODE_MUTEX(inode);
-                DOWN_WRITE_I_ALLOC_SEM(inode);
-
-                if (rc != 0)
-                        GOTO(out, rc);
-
-                /* Only ll_inode_size_lock is taken at this level.
-                 * lov_stripe_lock() is grabbed by ll_truncate() only over
-                 * call to obd_adjust_kms().  If vmtruncate returns 0, then
-                 * ll_truncate dropped ll_inode_size_lock() */
-                ll_inode_size_lock(inode, 0);
-                rc = vmtruncate(inode, attr->ia_size);
-                if (rc != 0) {
-                        LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0);
-                        ll_inode_size_unlock(inode, 0);
-                }
-
-                err = ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh);
-                if (err) {
-                        CERROR("ll_extent_unlock failed: %d\n", err);
-                        if (!rc)
-                                rc = err;
-                }
+                rc = ll_setattr_do_truncate(inode, attr->ia_size);
         } else if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET)) {
                 obd_flag flags;
                 struct obd_info oinfo = { { { 0 } } };
@@ -2073,6 +2125,7 @@ char *llap_origins[] = {
         [LLAP_ORIGIN_READAHEAD] = "ra",
         [LLAP_ORIGIN_COMMIT_WRITE] = "cw",
         [LLAP_ORIGIN_WRITEPAGE] = "wp",
+        [LLAP_ORIGIN_LOCKLESS_IO] = "ls"
 };
 
 struct ll_async_page *llite_pglist_next_llap(struct ll_sb_info *sbi,
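
For truncate, ll_setattr_do_truncate() above chooses between three locking strategies: with lockless truncate enabled and an OST advertising OBD_CONNECT_TRUNCLOCK it first asks obd_match() for an already-granted PW lock (local_lock == 2), otherwise it punches on the server with OBD_FL_TRUNCLOCK so the OST takes the lock itself (local_lock == 0); without the feature it falls back to the classic client-side ll_extent_lock() (local_lock == 1). A compact model of just that decision:

#include <stdio.h>

enum trunc_path {
        TRUNC_SRVLOCK = 0,  /* no client lock: punch with OBD_FL_TRUNCLOCK */
        TRUNC_EXTENT  = 1,  /* classic client PW enqueue (ll_extent_lock) */
        TRUNC_MATCH   = 2,  /* reuse a PW lock already granted (obd_match) */
};

static enum trunc_path pick_truncate_path(int lockless_ok, int have_pw_lock)
{
        if (!lockless_ok)
                return TRUNC_EXTENT;
        return have_pw_lock ? TRUNC_MATCH : TRUNC_SRVLOCK;
}

int main(void)
{
        printf("%d %d %d\n",
               pick_truncate_path(0, 0),  /* 1: feature disabled */
               pick_truncate_path(1, 1),  /* 2: cached PW lock found */
               pick_truncate_path(1, 0)); /* 0: server-side truncate */
        return 0;
}
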
diff --git a/lustre/llite/llite_mmap.c b/lustre/llite/llite_mmap.c
index 63b168b..147a4d7 100644
@@ -366,6 +366,8 @@ struct page *ll_nopage(struct vm_area_struct *vma, unsigned long address,
                 RETURN(NULL);
         }
 
+        ll_clear_file_contended(inode);
+
         /* start and end the lock on the first and last bytes in the page */
         policy_from_vma(&policy, vma, address, CFS_PAGE_SIZE);
 
diff --git a/lustre/llite/lproc_llite.c b/lustre/llite/lproc_llite.c
index 5417c37..56b8d1a 100644
@@ -460,6 +460,47 @@ static int ll_wr_track_gid(struct file *file, const char *buffer,
         return (ll_wr_track_id(buffer, count, data, STATS_TRACK_GID));
 }
 
+static int ll_rd_contention_time(char *page, char **start, off_t off,
+                                 int count, int *eof, void *data)
+{
+        struct super_block *sb = data;
+
+        *eof = 1;
+        return snprintf(page, count, "%u\n", ll_s2sbi(sb)->ll_contention_time);
+}
+
+static int ll_wr_contention_time(struct file *file, const char *buffer,
+                                 unsigned long count, void *data)
+{
+        struct super_block *sb = data;
+        struct ll_sb_info *sbi = ll_s2sbi(sb);
+
+        return lprocfs_write_helper(buffer, count,
+                                    &sbi->ll_contention_time) ?: count;
+}
+
+static int ll_rd_lockless_truncate(char *page, char **start, off_t off,
+                                   int count, int *eof, void *data)
+{
+        struct super_block *sb = data;
+
+        *eof = 1;
+        return snprintf(page, count, "%u\n",
+                        ll_s2sbi(sb)->ll_lockless_truncate_enable);
+}
+
+static int ll_wr_lockless_truncate(struct file *file, const char *buffer,
+                                   unsigned long count, void *data)
+{
+        struct super_block *sb = data;
+        struct ll_sb_info *sbi = ll_s2sbi(sb);
+
+        return lprocfs_write_helper(buffer, count,
+                                    &sbi->ll_lockless_truncate_enable)
+                                    ?: count;
+}
+
 static struct lprocfs_vars lprocfs_llite_obd_vars[] = {
         { "uuid",         ll_rd_sb_uuid,          0, 0 },
         //{ "mntpt_path",   ll_rd_path,             0, 0 },
@@ -482,6 +523,9 @@ static struct lprocfs_vars lprocfs_llite_obd_vars[] = {
         { "stats_track_pid",  ll_rd_track_pid, ll_wr_track_pid, 0 },
         { "stats_track_ppid", ll_rd_track_ppid, ll_wr_track_ppid, 0 },
         { "stats_track_gid",  ll_rd_track_gid, ll_wr_track_gid, 0 },
+        { "contention_seconds", ll_rd_contention_time, ll_wr_contention_time, 0},
+        { "lockless_truncate", ll_rd_lockless_truncate,
+                               ll_wr_lockless_truncate, 0},
         { 0 }
 };
 
@@ -521,6 +565,7 @@ struct llite_file_opcode {
         /* inode operation */
         { LPROC_LL_SETATTR,        LPROCFS_TYPE_REGS, "setattr" },
         { LPROC_LL_TRUNC,          LPROCFS_TYPE_REGS, "truncate" },
+        { LPROC_LL_LOCKLESS_TRUNC, LPROCFS_TYPE_REGS, "lockless_truncate"},
         { LPROC_LL_FLOCK,          LPROCFS_TYPE_REGS, "flock" },
         { LPROC_LL_GETATTR,        LPROCFS_TYPE_REGS, "getattr" },
         /* special inode operation */
@@ -535,6 +580,10 @@ struct llite_file_opcode {
                                    "direct_read" },
         { LPROC_LL_DIRECT_WRITE,   LPROCFS_CNTR_AVGMINMAX|LPROCFS_TYPE_PAGES,
                                    "direct_write" },
+        { LPROC_LL_LOCKLESS_READ,  LPROCFS_CNTR_AVGMINMAX|LPROCFS_TYPE_BYTES,
+                                   "lockless_read_bytes" },
+        { LPROC_LL_LOCKLESS_WRITE, LPROCFS_CNTR_AVGMINMAX|LPROCFS_TYPE_BYTES,
+                                   "lockless_write_bytes" },
 
 };
 
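The llite side of the tuning surface: contention_seconds and lockless_truncate are writable per mount, and lockless_read_bytes / lockless_write_bytes appear among the stats counters. A hedged reader for one of the knobs; the mount directory name under /proc/fs/lustre/llite/ is an assumption:

#include <stdio.h>

int main(void)
{
        unsigned val = 0;
        FILE *f = fopen("/proc/fs/lustre/llite/<mount>/lockless_truncate",
                        "r");

        if (f == NULL)
                return 1;
        if (fscanf(f, "%u", &val) == 1)
                printf("lockless_truncate=%u\n", val);
        return fclose(f) == 0 ? 0 : 1;
}
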
diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c
index 3eba56d..94ed749 100644
@@ -107,6 +107,47 @@ static int ll_brw(int cmd, struct inode *inode, struct obdo *oa,
         RETURN(rc);
 }
 
+int ll_file_punch(struct inode *inode, loff_t new_size, int srvlock)
+{
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct obd_info oinfo = { { { 0 } } };
+        struct obdo oa;
+        int rc;
+
+        ENTRY;
+        CDEBUG(D_INFO, "calling punch for "LPX64" (new size %Lu=%#Lx)\n",
+               lli->lli_smd->lsm_object_id, i_size_read(inode), i_size_read(inode));
+
+        oinfo.oi_md = lli->lli_smd;
+        oinfo.oi_policy.l_extent.start = new_size;
+        oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
+        oinfo.oi_oa = &oa;
+        oa.o_id = lli->lli_smd->lsm_object_id;
+        oa.o_gr = lli->lli_smd->lsm_object_gr;
+        oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+        if (srvlock) {
+                /* set OBD_MD_FLFLAGS in o_valid only if we also set
+                 * OBD_FL_TRUNCLOCK; otherwise ost_punch and
+                 * filter_setattr get confused, see the comment
+                 * in ost_punch */
+                oa.o_flags = OBD_FL_TRUNCLOCK;
+                oa.o_valid |= OBD_MD_FLFLAGS;
+        }
+        obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |
+                        OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME |
+                        OBD_MD_FLFID | OBD_MD_FLGENER);
+
+        oinfo.oi_capa = ll_osscapa_get(inode, CAPA_OPC_OSS_TRUNC);
+        rc = obd_punch_rqset(ll_i2dtexp(inode), &oinfo, NULL);
+        ll_truncate_free_capa(oinfo.oi_capa);
+        if (rc)
+                CERROR("obd_truncate fails (%d) ino %lu\n", rc, inode->i_ino);
+        else
+                obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
+                              OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME);
+        RETURN(rc);
+}
+
 /* this isn't where truncate starts.   roughly:
  * sys_truncate->ll_setattr_raw->vmtruncate->ll_truncate. setattr_raw grabs
  * DLM lock on [size, EOF], i_mutex, ->lli_size_sem, and WRITE_I_ALLOC_SEM to
@@ -116,10 +157,8 @@ static int ll_brw(int cmd, struct inode *inode, struct obdo *oa,
 void ll_truncate(struct inode *inode)
 {
         struct ll_inode_info *lli = ll_i2info(inode);
-        struct obd_info oinfo = { { { 0 } } };
-        struct ost_lvb lvb;
-        struct obdo oa;
-        int rc;
+        int srvlock = !!(lli->lli_flags & LLIF_SRVLOCK);
+        loff_t new_size;
         ENTRY;
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) to %Lu=%#Lx\n",inode->i_ino,
                inode->i_generation, inode, i_size_read(inode),
@@ -139,22 +178,27 @@ void ll_truncate(struct inode *inode)
 
         LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0);
 
-        /* XXX I'm pretty sure this is a hack to paper over a more fundamental
-         * race condition. */
-        lov_stripe_lock(lli->lli_smd);
-        inode_init_lvb(inode, &lvb);
-        rc = obd_merge_lvb(ll_i2dtexp(inode), lli->lli_smd, &lvb, 0);
-        if (lvb.lvb_size == i_size_read(inode) && rc == 0) {
-                CDEBUG(D_VFSTRACE, "skipping punch for obj "LPX64", %Lu=%#Lx\n",
-                       lli->lli_smd->lsm_object_id, i_size_read(inode),
-                       i_size_read(inode));
+        if (!srvlock) {
+                struct ost_lvb lvb;
+                int rc;
+
+                /* XXX I'm pretty sure this is a hack to paper
+                 * over a more fundamental race condition. */
+                lov_stripe_lock(lli->lli_smd);
+                inode_init_lvb(inode, &lvb);
+                rc = obd_merge_lvb(ll_i2dtexp(inode), lli->lli_smd, &lvb, 0);
+                if (lvb.lvb_size == i_size_read(inode) && rc == 0) {
+                        CDEBUG(D_VFSTRACE, "skipping punch for obj "LPX64
+                               ",%Lu=%#Lx\n", lli->lli_smd->lsm_object_id,
+                               i_size_read(inode), i_size_read(inode));
+                        lov_stripe_unlock(lli->lli_smd);
+                        GOTO(out_unlock, 0);
+                }
+                obd_adjust_kms(ll_i2dtexp(inode), lli->lli_smd,
+                               i_size_read(inode), 1);
                 lov_stripe_unlock(lli->lli_smd);
-                GOTO(out_unlock, 0);
         }
 
-        obd_adjust_kms(ll_i2dtexp(inode), lli->lli_smd, i_size_read(inode), 1);
-        lov_stripe_unlock(lli->lli_smd);
-
         if (unlikely((ll_i2sbi(inode)->ll_flags & LL_SBI_CHECKSUM) &&
                      (i_size_read(inode) & ~CFS_PAGE_MASK))) {
                 /* If the truncate leaves behind a partial page, update its
@@ -178,31 +222,13 @@ void ll_truncate(struct inode *inode)
                 }
         }
 
-        CDEBUG(D_INFO, "calling punch for "LPX64" (new size %Lu=%#Lx)\n",
-               lli->lli_smd->lsm_object_id, i_size_read(inode), i_size_read(inode));
-
-        oinfo.oi_md = lli->lli_smd;
-        oinfo.oi_policy.l_extent.start = i_size_read(inode);
-        oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
-        oinfo.oi_oa = &oa;
-        oa.o_id = lli->lli_smd->lsm_object_id;
-        oa.o_gr = lli->lli_smd->lsm_object_gr;
-        oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
-
-        obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |
-                        OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME |
-                        OBD_MD_FLFID | OBD_MD_FLGENER);
-
+        new_size = i_size_read(inode);
         ll_inode_size_unlock(inode, 0);
-
-        oinfo.oi_capa = ll_osscapa_get(inode, CAPA_OPC_OSS_TRUNC);
-        rc = obd_punch_rqset(ll_i2dtexp(inode), &oinfo, NULL);
-        ll_truncate_free_capa(oinfo.oi_capa);
-        if (rc)
-                CERROR("obd_truncate fails (%d) ino %lu\n", rc, inode->i_ino);
+        if (!srvlock)
+                ll_file_punch(inode, new_size, 0);
         else
-                obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
-                              OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME);
+                ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LOCKLESS_TRUNC, 1);
+
         EXIT;
         return;
 
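Note: a truncate running under a server-side lock (LLIF_SRVLOCK) now skips
the client punch and only bumps the lockless_truncate counter. A hedged
one-liner to confirm the path was taken, reusing the stats name registered
in lproc_llite.c above:

    # a non-zero count means at least one truncate went lockless
    lctl get_param -n llite.*.stats | grep lockless_truncate
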
@@ -650,7 +676,8 @@ struct ll_async_page *llap_from_page(struct page *page, unsigned origin)
                                         OSC_DEFAULT_CKSUM);
                 kunmap_atomic(kaddr, KM_USER0);
                 if (origin == LLAP_ORIGIN_READAHEAD ||
-                    origin == LLAP_ORIGIN_READPAGE) {
+                    origin == LLAP_ORIGIN_READPAGE ||
+                    origin == LLAP_ORIGIN_LOCKLESS_IO) {
                         llap->llap_checksum = 0;
                 } else if (origin == LLAP_ORIGIN_COMMIT_WRITE ||
                            llap->llap_checksum == 0) {
@@ -933,11 +960,7 @@ int ll_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
         RETURN(ret);
 }
 
-/* the kernel calls us here when a page is unhashed from the page cache.
- * the page will be locked and the kernel is holding a spinlock, so
- * we need to be careful.  we're just tearing down our book-keeping
- * here. */
-void ll_removepage(struct page *page)
+static void __ll_put_llap(struct page *page)
 {
         struct inode *inode = page->mapping->host;
         struct obd_export *exp;
@@ -946,17 +969,6 @@ void ll_removepage(struct page *page)
         int rc;
         ENTRY;
 
-        LASSERT(!in_interrupt());
-
-        /* sync pages or failed read pages can leave pages in the page
-         * cache that don't have our data associated with them anymore */
-        if (page_private(page) == 0) {
-                EXIT;
-                return;
-        }
-
-        LL_CDEBUG_PAGE(D_PAGE, page, "being evicted\n");
-
         exp = ll_i2dtexp(inode);
         if (exp == NULL) {
                 CERROR("page %p ind %lu gave null export\n", page, page->index);
@@ -994,6 +1006,29 @@ void ll_removepage(struct page *page)
         EXIT;
 }
 
+/* the kernel calls us here when a page is unhashed from the page cache.
+ * the page will be locked and the kernel is holding a spinlock, so
+ * we need to be careful.  we're just tearing down our book-keeping
+ * here. */
+void ll_removepage(struct page *page)
+{
+        ENTRY;
+
+        LASSERT(!in_interrupt());
+
+        /* sync pages or failed read pages can leave pages in the page
+         * cache that don't have our data associated with them anymore */
+        if (page_private(page) == 0) {
+                EXIT;
+                return;
+        }
+
+        LASSERT(!llap_cast_private(page)->llap_lockless_io_page);
+        LL_CDEBUG_PAGE(D_PAGE, page, "being evicted\n");
+        __ll_put_llap(page);
+        EXIT;
+}
+
 static int ll_page_matches(struct page *page, int fd_flags)
 {
         struct lustre_handle match_lockh = {0};
@@ -1872,3 +1907,274 @@ out_oig:
                 oig_release(oig);
         RETURN(rc);
 }
+
+static void ll_file_put_pages(struct page **pages, int numpages)
+{
+        int i;
+        struct page **pp;
+        ENTRY;
+
+        for (i = 0, pp = pages; i < numpages; i++, pp++) {
+                if (*pp) {
+                        LL_CDEBUG_PAGE(D_PAGE, (*pp), "free\n");
+                        __ll_put_llap(*pp);
+                        if (page_private(*pp))
+                                CERROR("the llap wasn't freed\n");
+                        (*pp)->mapping = NULL;
+                        if (page_count(*pp) != 1)
+                                CERROR("page %p, flags %#lx, count %i, private %p\n",
+                                (*pp), (unsigned long)(*pp)->flags, page_count(*pp),
+                                (void*)page_private(*pp));
+                        __free_pages(*pp, 0);
+                }
+        }
+        OBD_FREE(pages, numpages * sizeof(struct page *));
+        EXIT;
+}
+
+static struct page **ll_file_prepare_pages(int numpages, struct inode *inode,
+                                           unsigned long first)
+{
+        struct page **pages;
+        int i;
+        int rc = 0;
+        ENTRY;
+
+        OBD_ALLOC(pages, sizeof(struct page *) * numpages);
+        if (pages == NULL)
+                RETURN(ERR_PTR(-ENOMEM));
+        for (i = 0; i < numpages; i++) {
+                struct page *page;
+                struct ll_async_page *llap;
+
+                page = alloc_pages(GFP_HIGHUSER, 0);
+                if (page == NULL)
+                        GOTO(err, rc = -ENOMEM);
+                pages[i] = page;
+                /* llap_from_page needs page index and mapping to be set */
+                page->index = first++;
+                page->mapping = inode->i_mapping;
+                llap = llap_from_page(page, LLAP_ORIGIN_LOCKLESS_IO);
+                if (IS_ERR(llap))
+                        GOTO(err, rc = PTR_ERR(llap));
+                llap->llap_lockless_io_page = 1;
+        }
+        RETURN(pages);
+err:
+        ll_file_put_pages(pages, numpages);
+        RETURN(ERR_PTR(rc));
+}
+
+static ssize_t ll_file_copy_pages(struct page **pages, int numpages,
+                                  char *buf, loff_t pos, size_t count, int rw)
+{
+        ssize_t amount = 0;
+        int i;
+        int updatechecksum = ll_i2sbi(pages[0]->mapping->host)->ll_flags &
+                             LL_SBI_CHECKSUM;
+        ENTRY;
+
+        for (i = 0; i < numpages; i++) {
+                unsigned offset, bytes, left;
+                char *vaddr;
+
+                vaddr = kmap(pages[i]);
+                offset = pos & (CFS_PAGE_SIZE - 1);
+                bytes = min_t(unsigned, CFS_PAGE_SIZE - offset, count);
+                LL_CDEBUG_PAGE(D_PAGE, pages[i], "op = %s, addr = %p, "
+                               "buf = %p, bytes = %u\n",
+                               (rw == WRITE) ? "CFU" : "CTU",
+                               vaddr + offset, buf, bytes);
+                if (rw == WRITE) {
+                        left = copy_from_user(vaddr + offset, buf, bytes);
+                        if (updatechecksum) {
+                                struct ll_async_page *llap;
+
+                                llap = llap_cast_private(pages[i]);
+                                llap->llap_checksum = crc32_le(0, vaddr,
+                                                               CFS_PAGE_SIZE);
+                        }
+                } else {
+                        left = copy_to_user(buf, vaddr + offset, bytes);
+                }
+                kunmap(pages[i]);
+                amount += bytes;
+                if (left) {
+                        amount -= left;
+                        break;
+                }
+                buf += bytes;
+                count -= bytes;
+                pos += bytes;
+        }
+        if (amount == 0)
+                RETURN(-EFAULT);
+        RETURN(amount);
+}
+
+static int ll_file_oig_pages(struct inode *inode, struct page **pages,
+                             int numpages, loff_t pos, size_t count, int rw)
+{
+        struct obd_io_group *oig;
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct obd_export *exp;
+        loff_t org_pos = pos;
+        obd_flag brw_flags;
+        int rc;
+        int i;
+        ENTRY;
+
+        exp = ll_i2dtexp(inode);
+        if (exp == NULL)
+                RETURN(-EINVAL);
+        rc = oig_init(&oig);
+        if (rc)
+                RETURN(rc);
+        brw_flags = OBD_BRW_SRVLOCK;
+        if (capable(CAP_SYS_RESOURCE))
+                brw_flags |= OBD_BRW_NOQUOTA;
+
+        for (i = 0; i < numpages; i++) {
+                struct ll_async_page *llap;
+                unsigned from, bytes;
+
+                from = pos & (CFS_PAGE_SIZE - 1);
+                bytes = min_t(unsigned, CFS_PAGE_SIZE - from,
+                              count - pos + org_pos);
+                llap = llap_cast_private(pages[i]);
+                LASSERT(llap);
+
+                lock_page(pages[i]);
+
+                LL_CDEBUG_PAGE(D_PAGE, pages[i], "offset "LPU64","
+                               " from %u, bytes = %u\n",
+                               pos, from, bytes);
+                LASSERTF(pos >> CFS_PAGE_SHIFT == pages[i]->index,
+                         "wrong page index %lu (%lu)\n",
+                         pages[i]->index,
+                         (unsigned long)(pos >> CFS_PAGE_SHIFT));
+                rc = obd_queue_group_io(exp, lli->lli_smd, NULL, oig,
+                                        llap->llap_cookie,
+                                        (rw == WRITE) ?
+                                        OBD_BRW_WRITE:OBD_BRW_READ,
+                                        from, bytes, brw_flags,
+                                        ASYNC_READY | ASYNC_URGENT |
+                                        ASYNC_COUNT_STABLE | ASYNC_GROUP_SYNC);
+                if (rc) {
+                        i++;
+                        GOTO(out, rc);
+                }
+                pos += bytes;
+        }
+        rc = obd_trigger_group_io(exp, lli->lli_smd, NULL, oig);
+        if (rc)
+                GOTO(out, rc);
+        rc = oig_wait(oig);
+out:
+        while (--i >= 0)
+                unlock_page(pages[i]);
+        oig_release(oig);
+        RETURN(rc);
+}
+
+ssize_t ll_file_lockless_io(struct file *file, char *buf, size_t count,
+                            loff_t *ppos, int rw)
+{
+        loff_t pos;
+        struct inode *inode = file->f_dentry->d_inode;
+        ssize_t rc = 0;
+        int max_pages;
+        size_t amount = 0;
+        unsigned long first, last;
+        ENTRY;
+
+        if (rw == READ) {
+                loff_t isize;
+
+                ll_inode_size_lock(inode, 0);
+                isize = i_size_read(inode);
+                ll_inode_size_unlock(inode, 0);
+                if (*ppos >= isize)
+                        GOTO(out, rc = 0);
+                if (*ppos + count >= isize)
+                        count -= *ppos + count - isize;
+                if (count == 0)
+                        GOTO(out, rc);
+        } else {
+                rc = generic_write_checks(file, ppos, &count, 0);
+                if (rc)
+                        GOTO(out, rc);
+                rc = remove_suid(file->f_dentry);
+                if (rc)
+                        GOTO(out, rc);
+        }
+        pos = *ppos;
+        first = pos >> CFS_PAGE_SHIFT;
+        last = (pos + count - 1) >> CFS_PAGE_SHIFT;
+        max_pages = PTLRPC_MAX_BRW_PAGES *
+                ll_i2info(inode)->lli_smd->lsm_stripe_count;
+        CDEBUG(D_INFO, "%u, stripe_count = %u\n",
+               PTLRPC_MAX_BRW_PAGES /* max_pages_per_rpc */,
+               ll_i2info(inode)->lli_smd->lsm_stripe_count);
+
+        while (first <= last && rc >= 0) {
+                int pages_for_io;
+                struct page **pages;
+                size_t bytes = count - amount;
+
+                pages_for_io = min_t(int, last - first + 1, max_pages);
+                pages = ll_file_prepare_pages(pages_for_io, inode, first);
+                if (IS_ERR(pages)) {
+                        rc = PTR_ERR(pages);
+                        break;
+                }
+                if (rw == WRITE) {
+                        rc = ll_file_copy_pages(pages, pages_for_io, buf,
+                                                pos + amount, bytes, rw);
+                        if (rc < 0)
+                                GOTO(put_pages, rc);
+                        bytes = rc;
+                }
+                rc = ll_file_oig_pages(inode, pages, pages_for_io,
+                                       pos + amount, bytes, rw);
+                if (rc)
+                        GOTO(put_pages, rc);
+                if (rw == READ) {
+                        rc = ll_file_copy_pages(pages, pages_for_io, buf,
+                                                pos + amount, bytes, rw);
+                        if (rc < 0)
+                                GOTO(put_pages, rc);
+                        bytes = rc;
+                }
+                amount += bytes;
+                buf += bytes;
+put_pages:
+                ll_file_put_pages(pages, pages_for_io);
+                first += pages_for_io;
+                /* stop if the copy or the i/o came up short */
+                if (pos + amount < ((loff_t)first << CFS_PAGE_SHIFT))
+                        break;
+        }
+        /* NOTE: in the absence of LDLM locks we don't update i_size or the
+         * KMS, even if a write makes the file larger */
+        file_accessed(file);
+        if (rw == READ && amount < count && rc == 0) {
+                unsigned long not_cleared;
+
+                not_cleared = clear_user(buf, count - amount);
+                amount = count - not_cleared;
+                if (not_cleared)
+                        rc = -EFAULT;
+        }
+        if (amount > 0) {
+                lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
+                                    (rw == WRITE) ?
+                                    LPROC_LL_LOCKLESS_WRITE :
+                                    LPROC_LL_LOCKLESS_READ,
+                                    (long)amount);
+                *ppos += amount;
+                RETURN(amount);
+        }
+out:
+        RETURN(rc);
+}
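
Note: ll_file_lockless_io accounts completed bytes to the lockless_read_bytes
and lockless_write_bytes counters registered above. A sketch for watching them
while reproducing contention from two mounts, mirroring test_32b below:

    # expected to grow once the OSTs start denying locks with -EUSERS
    lctl get_param -n llite.*.stats | grep lockless_write_bytes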
index 5d80fcb..aeb17a0 100644 (file)
@@ -176,7 +176,8 @@ int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
                 if (lov->lov_tgts[req->rq_idx] && 
                     lov->lov_tgts[req->rq_idx]->ltd_active) {
-                        if (rc != -EINTR)
+                        /* -EUSERS used by OST to report file contention */
+                        if (rc != -EINTR && rc != -EUSERS)
                                 CERROR("enqueue objid "LPX64" subobj "
                                        LPX64" on OST idx %d: rc %d\n",
                                        set->set_oi->oi_md->lsm_object_id,
index fd6a232..2bad71a 100644 (file)
@@ -2180,6 +2180,7 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
         CFS_LIST_HEAD(rpc_list);
         unsigned int ending_offset;
         unsigned  starting_offset = 0;
+        int srvlock = 0;
         ENTRY;
 
         /* first we find the pages we're allowed to work with */
@@ -2189,6 +2190,13 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
 
                 LASSERT(oap->oap_magic == OAP_MAGIC);
 
+                if (page_count != 0 &&
+                    srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
+                        CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
+                               " oap %p, page %p, srvlock %u\n",
+                               oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
+                        break;
+                }
                 /* in llite being 'ready' equates to the page being locked
                  * until completion unlocks it.  commit_write submits a page
                  * as not ready because its unlock will happen unconditionally
@@ -2270,6 +2278,8 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
 
                 /* now put the page back in our accounting */
                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
+                if (page_count == 0)
+                        srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
                 if (++page_count >= cli->cl_max_pages_per_rpc)
                         break;
 
index 8370a8b..777dec8 100644 (file)
@@ -608,6 +608,85 @@ test_31() {
 }
 run_test 31 "voluntary cancel / blocking ast race=============="
 
+# enable (1) or disable (0) the lockless truncate feature
+enable_lockless_truncate() {
+        lctl set_param -n llite.*.lockless_truncate $1
+}
+
+test_32a() { # bug 11270
+        local p="$TMP/sanityN-$TESTNAME.parameters"
+        save_lustre_params $HOSTNAME "llite.*.lockless_truncate" > $p
+        cancel_lru_locks osc
+        clear_llite_stats
+        enable_lockless_truncate 1
+        dd if=/dev/zero of=$DIR1/$tfile count=10 bs=1M > /dev/null 2>&1
+
+        log "checking cached lockless truncate"
+        $TRUNCATE $DIR1/$tfile 8000000
+        $CHECKSTAT -s 8000000 $DIR2/$tfile || error "wrong file size"
+        [ $(calc_llite_stats lockless_truncate) -eq 0 ] ||
+                error "lockless truncate doesn't use cached locks"
+
+        log "checking not cached lockless truncate"
+        $TRUNCATE $DIR2/$tfile 5000000
+        $CHECKSTAT -s 5000000 $DIR1/$tfile || error "wrong file size"
+        [ $(calc_llite_stats lockless_truncate) -ne 0 ] ||
+                error "not cached trancate isn't lockless"
+
+        log "disabled lockless truncate"
+        enable_lockless_truncate 0
+        clear_llite_stats
+        $TRUNCATE $DIR2/$tfile 3000000
+        $CHECKSTAT -s 3000000 $DIR1/$tfile || error "wrong file size"
+        [ $(calc_llite_stats lockless_truncate) -eq 0 ] ||
+                error "lockless truncate disabling failed"
+        rm $DIR1/$tfile
+        # restore lockless_truncate default values
+        restore_lustre_params < $p
+        rm -f $p
+}
+run_test 32a "lockless truncate"
+
+test_32b() { # bug 11270
+        local node
+        local p="$TMP/sanityN-$TESTNAME.parameters"
+        save_lustre_params $HOSTNAME "llite.*.contention_seconds" > $p
+        for node in $(osts_nodes); do
+                save_lustre_params $node "ldlm.namespaces.filter-*.max_nolock_bytes" >> $p
+                save_lustre_params $node "ldlm.namespaces.filter-*.contended_locks" >> $p
+                save_lustre_params $node "ldlm.namespaces.filter-*.contention_seconds" >> $p
+        done
+        clear_llite_stats
+        # aggressive lockless i/o settings
+        for node in $(osts_nodes); do
+                do_node $node 'lctl set_param -n ldlm.namespaces.filter-*.max_nolock_bytes 2000000; lctl set_param -n ldlm.namespaces.filter-*.contended_locks 0; lctl set_param -n ldlm.namespaces.filter-*.contention_seconds 60'
+        done
+        lctl set_param -n llite.*.contention_seconds 60
+        for i in $(seq 5); do
+                dd if=/dev/zero of=$DIR1/$tfile bs=4k count=1 conv=notrunc > /dev/null 2>&1
+                dd if=/dev/zero of=$DIR2/$tfile bs=4k count=1 conv=notrunc > /dev/null 2>&1
+        done
+        [ $(calc_llite_stats lockless_write_bytes) -ne 0 ] || error "lockless i/o was not triggered"
+        # disable lockless i/o (it is disabled by default)
+        for node in $(osts_nodes); do
+                do_node $node 'lctl set_param -n ldlm.namespaces.filter-*.max_nolock_bytes 0; lctl set_param -n ldlm.namespaces.filter-*.contended_locks 32; lctl set_param -n ldlm.namespaces.filter-*.contention_seconds 0'
+        done
+        # set contention_seconds to 0 at client too, otherwise Lustre still
+        # remembers lock contention
+        lctl set_param -n llite.*.contention_seconds 0
+        clear_llite_stats
+        for i in $(seq 5); do
+                dd if=/dev/zero of=$DIR1/$tfile bs=4k count=1 conv=notrunc > /dev/null 2>&1
+                dd if=/dev/zero of=$DIR2/$tfile bs=4k count=1 conv=notrunc > /dev/null 2>&1
+        done
+        [ $(calc_llite_stats lockless_write_bytes) -eq 0 ] ||
+                error "lockless i/o works when disabled" 
+        rm -f $DIR1/$tfile
+        restore_lustre_params <$p
+        rm -f $p
+}
+run_test 32b "lockless i/o"
+
 log "cleanup: ======================================================"
 
 check_and_cleanup_lustre
index c08579c..a92a828 100644 (file)
@@ -1635,3 +1635,33 @@ multiop_bg_pause() {
 
     return 0
 }
+
+# reset llite stat counters
+clear_llite_stats() {
+        lctl set_param -n llite.*.stats 0
+}
+
+# sum llite stat items
+calc_llite_stats() {
+        local res=$(lctl get_param -n llite.*.stats |
+                    awk 'BEGIN {s = 0} END {print s} /^'"$1"'/ {s += $2}')
+        echo $res
+}
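
A hypothetical usage sketch, assuming sanityN's $DIR1/$tfile conventions and
the lockless_write_bytes counter registered in lproc_llite.c:

    clear_llite_stats
    dd if=/dev/zero of=$DIR1/$tfile bs=4k count=1 conv=notrunc
    echo "lockless bytes: $(calc_llite_stats lockless_write_bytes)"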
+
+# save_lustre_params(node, parameter_mask)
+# generate a stream of formatted strings (<node> <param name>=<param value>)
+save_lustre_params() {
+        local s
+        do_node $1 "lctl get_param $2" | while read s; do echo "$1 $s"; done
+}
+
+# restore lustre parameters from an input stream produced by save_lustre_params
+restore_lustre_params() {
+        local node
+        local name
+        local val
+        while IFS=" =" read node name val; do
+                do_node $node "lctl set_param -n $name $val"
+        done
+}
+
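A minimal round-trip sketch of the two helpers above, assuming a single
client named by $HOSTNAME as in test_32a:

    p=$TMP/params.$$
    save_lustre_params $HOSTNAME "llite.*.contention_seconds" > $p
    lctl set_param -n llite.*.contention_seconds 60
    # ... run the contended workload ...
    restore_lustre_params < $p
    rm -f $p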