LU-1669 llite: Replace write mutex with range lock 20/6320/20
author Prakash Surya <surya1@llnl.gov>
Wed, 19 Jun 2013 17:30:36 +0000 (10:30 -0700)
committer Oleg Drokin <oleg.drokin@intel.com>
Fri, 12 Sep 2014 16:36:41 +0000 (16:36 +0000)
Testing has shown the ll_inode_info's lli_write_mutex to be a
limiting factor for single shared file write performance when using
many writing threads on a single machine. Even if each thread is
writing to a unique portion of the file, the lli_write_mutex allows
no more than a single thread to write to the file at any one time.

This change attempts to remove this bottleneck by replacing the
mutex with a range lock. This should allow multiple threads to write
to a single file simultaneously if and only if the threads are
writing to unique regions of the file.
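
For reference, the interface added below is intended to be used roughly
as in the following sketch (write_region_sketch is a hypothetical helper,
not part of the patch; the signatures are those declared in the new
range_lock.h):

    /*
     * Sketch only; not part of the patch.  Illustrates the intended
     * call sequence around a write.  The tree itself is set up once
     * per inode with range_lock_tree_init() (done in ll_lli_init()).
     */
    static int write_region_sketch(struct range_lock_tree *tree,
                                   __u64 pos, __u64 count)
    {
            struct range_lock range;
            int rc;

            /* cover the byte range [pos, pos + count - 1] */
            range_lock_init(&range, pos, pos + count - 1);

            /* sleeps until no overlapping range is held; <0 on signal */
            rc = range_lock(tree, &range);
            if (rc < 0)
                    return rc;

            /* ... perform the write to [pos, pos + count - 1] ... */

            /* drop the range and wake writers blocked on it */
            range_unlock(tree, &range);
            return 0;
    }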

Performance testing shows this change yields a significant boost in
write bandwidth. Using fio on a single machine with 32 GB of RAM,
write performance tests were run with and without this change
applied; the results are below:

    +---------+-----------+---------+--------+--------+--------+
    |     fio v2.0.13     |        Write Bandwidth (KB/s)      |
    +---------+-----------+---------+--------+--------+--------+
    | # Tasks | GB / Task | Test 1  | Test 2 | Test 3 | Test 4 |
    +---------+-----------+---------+--------+--------+--------+
    |    1    |    64     |  452446 | 454623 | 457653 | 463737 |
    |    2    |    32     |  850318 | 565373 | 602498 | 733027 |
    |    4    |    16     | 1058900 | 463546 | 529107 | 976284 |
    |    8    |     8     | 1026300 | 468190 | 576451 | 963404 |
    |   16    |     4     | 1065500 | 503160 | 462902 | 830065 |
    |   32    |     2     | 1068600 | 462228 | 466963 | 749733 |
    |   64    |     1     |  991830 | 556618 | 557863 | 710912 |
    +---------+-----------+---------+--------+--------+--------+

 * Test 1: Lustre client running 04ec54f. File per process write
           workload. This test was used as a baseline for what we
           _could_ achieve in the single shared file tests if the
           bottlenecks were removed.

 * Test 2: Lustre client running 04ec54f. Single shared file
           workload, each task writing to a unique region.

 * Test 3: Lustre client running 04ec54f + I0023132b. Single shared
           file workload, each task writing to a unique region.

 * Test 4: Lustre client running 04ec54f + this patch.
           Single shared file workload, each task writing to a unique
           region.
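
The exact fio job options are not recorded here; a hypothetical job
file along the following lines approximates the single shared file
workload of Tests 2-4, shown for the 32-task, 2 GB-per-task row (the
mount point and file name are made up):

    ; hypothetical fio job approximating the single shared file workload
    ; (assumed client mount point; every job writes the same shared file,
    ;  and offset_increment gives each task a unique 2 GB region)
    [global]
    directory=/mnt/lustre
    filename=shared_file
    rw=write
    bs=1m
    ioengine=sync

    [writers]
    numjobs=32
    size=2g
    offset_increment=2g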

Signed-off-by: Prakash Surya <surya1@llnl.gov>
Change-Id: I71e060c190065d87a20dc8df3104f898883d0583
Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Reviewed-on: http://review.whamcloud.com/6320
Tested-by: Jenkins
Reviewed-by: Hiroya Nozaki <nozaki.hiroya@jp.fujitsu.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/interval_tree.h
lustre/ldlm/interval_tree.c
lustre/llite/Makefile.in
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/range_lock.c [new file with mode: 0644]
lustre/llite/range_lock.h [new file with mode: 0644]

diff --git a/lustre/include/interval_tree.h b/lustre/include/interval_tree.h
index 22581e3..82ccea1 100644 (file)
@@ -86,6 +86,13 @@ static inline void interval_set(struct interval_node *node,
         node->in_max_high = end;
 }
 
+static inline void interval_init(struct interval_node *node)
+{
+       memset(node, 0, sizeof(*node));
+}
+
+int node_equal(struct interval_node *n1, struct interval_node *n2);
+
 /* Rules to write an interval callback.
  *  - the callback returns INTERVAL_ITER_STOP when it thinks the iteration
  *    should be stopped. It will then cause the iteration function to return
diff --git a/lustre/ldlm/interval_tree.c b/lustre/ldlm/interval_tree.c
index b052766..53c23d5 100644 (file)
@@ -99,7 +99,7 @@ static inline int extent_equal(struct interval_node_extent *e1,
         return (e1->start == e2->start) && (e1->end == e2->end);
 }
 
-static inline int extent_overlapped(struct interval_node_extent *e1, 
+static inline int extent_overlapped(struct interval_node_extent *e1,
                                     struct interval_node_extent *e2)
 {
         return (e1->start <= e2->end) && (e2->start <= e1->end);
@@ -111,11 +111,11 @@ static inline int node_compare(struct interval_node *n1,
         return extent_compare(&n1->in_extent, &n2->in_extent);
 }
 
-static inline int node_equal(struct interval_node *n1,
-                             struct interval_node *n2)
+int node_equal(struct interval_node *n1, struct interval_node *n2)
 {
-        return extent_equal(&n1->in_extent, &n2->in_extent);
+       return extent_equal(&n1->in_extent, &n2->in_extent);
 }
+EXPORT_SYMBOL(node_equal);
 
 static inline __u64 max_u64(__u64 x, __u64 y)
 {
@@ -623,59 +623,61 @@ static inline int interval_may_overlap(struct interval_node *node,
  *
  */
 enum interval_iter interval_search(struct interval_node *node,
-                                   struct interval_node_extent *ext,
-                                   interval_callback_t func,
-                                   void *data)
-{
-        struct interval_node *parent;
-        enum interval_iter rc = INTERVAL_ITER_CONT;
-
-        LASSERT(ext != NULL);
-        LASSERT(func != NULL);
-
-        while (node) {
-                if (ext->end < interval_low(node)) {
-                        if (node->in_left) {
-                                node = node->in_left;
-                                continue;
-                        }
-                } else if (interval_may_overlap(node, ext)) {
-                        if (extent_overlapped(ext, &node->in_extent)) {
-                                rc = func(node, data);
-                                if (rc == INTERVAL_ITER_STOP)
-                                        break;
-                        }
-
-                        if (node->in_left) {
-                                node = node->in_left;
-                                continue;
-                        }
-                        if (node->in_right) {
-                                node = node->in_right;
-                                continue;
-                        }
-                } 
-
-                parent = node->in_parent;
-                while (parent) {
-                        if (node_is_left_child(node) &&
-                            parent->in_right) {
-                                /* If we ever got the left, it means that the 
-                                 * parent met ext->end<interval_low(parent), or
-                                 * may_overlap(parent). If the former is true,
-                                 * we needn't go back. So stop early and check
-                                 * may_overlap(parent) after this loop.  */
-                                node = parent->in_right;
-                                break;
-                        }
-                        node = parent;
-                        parent = parent->in_parent;
-                }
-                if (parent == NULL || !interval_may_overlap(parent, ext))
-                        break;
-        }
-
-        return rc;
+                                  struct interval_node_extent *ext,
+                                  interval_callback_t func,
+                                  void *data)
+{
+       struct interval_node *parent;
+       enum interval_iter rc = INTERVAL_ITER_CONT;
+
+       ENTRY;
+
+       LASSERT(ext != NULL);
+       LASSERT(func != NULL);
+
+       while (node) {
+               if (ext->end < interval_low(node)) {
+                       if (node->in_left) {
+                               node = node->in_left;
+                               continue;
+                       }
+               } else if (interval_may_overlap(node, ext)) {
+                       if (extent_overlapped(ext, &node->in_extent)) {
+                               rc = func(node, data);
+                               if (rc == INTERVAL_ITER_STOP)
+                                       break;
+                       }
+
+                       if (node->in_left) {
+                               node = node->in_left;
+                               continue;
+                       }
+                       if (node->in_right) {
+                               node = node->in_right;
+                               continue;
+                       }
+               }
+
+               parent = node->in_parent;
+               while (parent) {
+                       if (node_is_left_child(node) &&
+                           parent->in_right) {
+                               /* If we ever got the left, it means that the
+                                * parent met ext->end<interval_low(parent), or
+                                * may_overlap(parent). If the former is true,
+                                * we needn't go back. So stop early and check
+                                * may_overlap(parent) after this loop.  */
+                               node = parent->in_right;
+                               break;
+                       }
+                       node = parent;
+                       parent = parent->in_parent;
+               }
+               if (parent == NULL || !interval_may_overlap(parent, ext))
+                       break;
+       }
+
+       RETURN(rc);
 }
 EXPORT_SYMBOL(interval_search);
 
diff --git a/lustre/llite/Makefile.in b/lustre/llite/Makefile.in
index cfe49d3..16a839c 100644 (file)
@@ -8,11 +8,12 @@ lustre-objs += glimpse.o
 lustre-objs += lcommon_cl.o
 lustre-objs += lcommon_misc.o
 lustre-objs += vvp_dev.o vvp_page.o vvp_lock.o vvp_io.o vvp_object.o
+lustre-objs += range_lock.o
 
 llite_lloop-objs := lloop.o
 
 EXTRA_DIST := $(lustre-objs:.o=.c) llite_internal.h rw26.c super25.c
 EXTRA_DIST += $(llite_lloop-objs:.o=.c)
-EXTRA_DIST += vvp_internal.h
+EXTRA_DIST += vvp_internal.h range_lock.h
 
 @INCLUDE_RULES@
diff --git a/lustre/llite/file.c b/lustre/llite/file.c
index 8e3d07c..19948f7 100644 (file)
@@ -1141,6 +1141,7 @@ ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
        struct ll_file_data  *fd  = LUSTRE_FPRIVATE(file);
        struct cl_io         *io;
        ssize_t               result;
+       struct range_lock     range;
        ENTRY;
 
        CDEBUG(D_VFSTRACE, "file: %s, type: %d ppos: "LPU64", count: %zd\n",
@@ -1151,10 +1152,14 @@ restart:
         ll_io_init(io, file, iot == CIT_WRITE);
 
         if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
-                struct vvp_io *vio = vvp_env_io(env);
-                struct ccc_io *cio = ccc_env_io(env);
-                int write_mutex_locked = 0;
+               struct vvp_io *vio = vvp_env_io(env);
+               struct ccc_io *cio = ccc_env_io(env);
+               bool range_locked = false;
 
+               if (file->f_flags & O_APPEND)
+                       range_lock_init(&range, 0, LUSTRE_EOF);
+               else
+                       range_lock_init(&range, *ppos, *ppos + count - 1);
                cio->cui_fd  = LUSTRE_FPRIVATE(file);
                vio->cui_io_subtype = args->via_io_subtype;
 
@@ -1166,10 +1171,14 @@ restart:
                         cio->cui_iocb = args->u.normal.via_iocb;
                         if ((iot == CIT_WRITE) &&
                             !(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
-                               if (mutex_lock_interruptible(&lli->
-                                                       lli_write_mutex))
-                                       GOTO(out, result = -ERESTARTSYS);
-                               write_mutex_locked = 1;
+                               CDEBUG(D_VFSTRACE, "Range lock "RL_FMT"\n",
+                                      RL_PARA(&range));
+                               result = range_lock(&lli->lli_write_tree,
+                                                   &range);
+                               if (result < 0)
+                                       GOTO(out, result);
+
+                               range_locked = true;
                        }
                        down_read(&lli->lli_trunc_sem);
                         break;
@@ -1188,8 +1197,11 @@ restart:
 
                if (args->via_io_subtype == IO_NORMAL)
                        up_read(&lli->lli_trunc_sem);
-               if (write_mutex_locked)
-                       mutex_unlock(&lli->lli_write_mutex);
+               if (range_locked) {
+                       CDEBUG(D_VFSTRACE, "Range unlock "RL_FMT"\n",
+                              RL_PARA(&range));
+                       range_unlock(&lli->lli_write_tree, &range);
+               }
         } else {
                 /* cl_io_rw_init() handled IO */
                 result = io->ci_result;
diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h
index 86c86a0..722b1aa 100644 (file)
@@ -49,6 +49,8 @@
 #include <lustre_intent.h>
 #include <linux/compat.h>
 
+#include "range_lock.h"
+
 #ifndef FMODE_EXEC
 #define FMODE_EXEC 0
 #endif
@@ -227,7 +229,7 @@ struct ll_inode_info {
                         * }
                         */
                        struct rw_semaphore             f_trunc_sem;
-                       struct mutex                    f_write_mutex;
+                       struct range_lock_tree          f_write_tree;
 
                        struct rw_semaphore             f_glimpse_sem;
                        cfs_time_t                      f_glimpse_time;
@@ -252,7 +254,7 @@ struct ll_inode_info {
 #define lli_symlink_name        u.f.f_symlink_name
 #define lli_maxbytes            u.f.f_maxbytes
 #define lli_trunc_sem           u.f.f_trunc_sem
-#define lli_write_mutex         u.f.f_write_mutex
+#define lli_write_tree          u.f.f_write_tree
 #define lli_glimpse_sem        u.f.f_glimpse_sem
 #define lli_glimpse_time       u.f.f_glimpse_time
 #define lli_agl_list           u.f.f_agl_list
diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c
index 3f8eea2..aa60a72 100644 (file)
@@ -993,7 +993,7 @@ void ll_lli_init(struct ll_inode_info *lli)
                mutex_init(&lli->lli_size_mutex);
                lli->lli_symlink_name = NULL;
                init_rwsem(&lli->lli_trunc_sem);
-               mutex_init(&lli->lli_write_mutex);
+               range_lock_tree_init(&lli->lli_write_tree);
                init_rwsem(&lli->lli_glimpse_sem);
                lli->lli_glimpse_time = 0;
                INIT_LIST_HEAD(&lli->lli_agl_list);
diff --git a/lustre/llite/range_lock.c b/lustre/llite/range_lock.c
new file mode 100644 (file)
index 0000000..4685144
--- /dev/null
@@ -0,0 +1,238 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Range lock is used to allow multiple threads to write to a single
+ * shared file, provided each thread is writing to a non-overlapping
+ * portion of the file.
+ *
+ * Refer to the possible upstream kernel version of range lock by
+ * Jan Kara <jack@suse.cz>: https://lkml.org/lkml/2013/1/31/480
+ *
+ * This file could later be replaced by the upstream kernel version.
+ */
+/*
+ * Author: Prakash Surya <surya1@llnl.gov>
+ * Author: Bobi Jam <bobijam.xu@intel.com>
+ */
+#include "range_lock.h"
+#include <lustre/lustre_user.h>
+
+/**
+ * Initialize a range lock tree
+ *
+ * \param tree [in]    an empty range lock tree
+ *
+ * Pre:  Caller should have allocated the range lock tree.
+ * Post: The range lock tree is ready to function.
+ */
+void range_lock_tree_init(struct range_lock_tree *tree)
+{
+       tree->rlt_root = NULL;
+       tree->rlt_sequence = 0;
+       spin_lock_init(&tree->rlt_lock);
+}
+
+/**
+ * Initialize a range lock node
+ *
+ * \param lock  [in]   an empty range lock node
+ * \param start [in]   start of the covering region
+ * \param end   [in]   end of the covering region
+ *
+ * Pre:  Caller should have allocated the range lock node.
+ * Post: The range lock node is meant to cover [start, end] region
+ */
+void range_lock_init(struct range_lock *lock, __u64 start, __u64 end)
+{
+       interval_init(&lock->rl_node);
+       if (end != LUSTRE_EOF)
+               end >>= PAGE_SHIFT;
+       interval_set(&lock->rl_node, start >> PAGE_SHIFT, end);
+       INIT_LIST_HEAD(&lock->rl_next_lock);
+       lock->rl_task = NULL;
+       lock->rl_lock_count = 0;
+       lock->rl_blocking_ranges = 0;
+       lock->rl_sequence = 0;
+}
+
+static inline struct range_lock *next_lock(struct range_lock *lock)
+{
+       return list_entry(lock->rl_next_lock.next, typeof(*lock), rl_next_lock);
+}
+
+/**
+ * Helper function of range_unlock()
+ *
+ * \param node [in]    a range lock found overlapped during interval node
+ *                     search
+ * \param arg [in]     the range lock to be tested
+ *
+ * \retval INTERVAL_ITER_CONT  indicate to continue the search for next
+ *                             overlapping range node
+ * \retval INTERVAL_ITER_STOP  indicate to stop the search
+ */
+enum interval_iter range_unlock_cb(struct interval_node *node, void *arg)
+{
+       struct range_lock *lock = arg;
+       struct range_lock *overlap = node2rangelock(node);
+       struct range_lock *iter;
+       ENTRY;
+
+       list_for_each_entry(iter, &overlap->rl_next_lock, rl_next_lock) {
+               if (iter->rl_sequence > lock->rl_sequence) {
+                       --iter->rl_blocking_ranges;
+                       LASSERT(iter->rl_blocking_ranges > 0);
+               }
+       }
+       if (overlap->rl_sequence > lock->rl_sequence) {
+               --overlap->rl_blocking_ranges;
+               if (overlap->rl_blocking_ranges == 0)
+                       wake_up_process(overlap->rl_task);
+       }
+       RETURN(INTERVAL_ITER_CONT);
+}
+
+/**
+ * Unlock a range lock, wake up locks blocked by this lock.
+ *
+ * \param tree [in]    range lock tree
+ * \param lock [in]    range lock to be deleted
+ *
+ * If this lock has been granted, release it; if not, just delete it from
+ * the tree or the same region lock list. Wake up those locks only blocked
+ * by this lock through range_unlock_cb().
+ */
+void range_unlock(struct range_lock_tree *tree, struct range_lock *lock)
+{
+       ENTRY;
+
+       spin_lock(&tree->rlt_lock);
+       if (!list_empty(&lock->rl_next_lock)) {
+               struct range_lock *next;
+
+               if (interval_is_intree(&lock->rl_node)) { /* first lock */
+                       /* Insert the next same range lock into the tree */
+                       next = next_lock(lock);
+                       next->rl_lock_count = lock->rl_lock_count - 1;
+                       interval_erase(&lock->rl_node, &tree->rlt_root);
+                       interval_insert(&next->rl_node, &tree->rlt_root);
+               } else {
+                       /* find the first lock in tree */
+                       list_for_each_entry(next, &lock->rl_next_lock,
+                                           rl_next_lock) {
+                               if (!interval_is_intree(&next->rl_node))
+                                       continue;
+
+                               LASSERT(next->rl_lock_count > 0);
+                               next->rl_lock_count--;
+                               break;
+                       }
+               }
+               list_del_init(&lock->rl_next_lock);
+       } else {
+               LASSERT(interval_is_intree(&lock->rl_node));
+               interval_erase(&lock->rl_node, &tree->rlt_root);
+       }
+
+       interval_search(tree->rlt_root, &lock->rl_node.in_extent,
+                       range_unlock_cb, lock);
+       spin_unlock(&tree->rlt_lock);
+
+       EXIT;
+}
+
+/**
+ * Helper function of range_lock()
+ *
+ * \param node [in]    a range lock found overlapped during interval node
+ *                     search
+ * \param arg [in]     the range lock to be tested
+ *
+ * \retval INTERVAL_ITER_CONT  indicate to continue the search for next
+ *                             overlapping range node
+ * \retval INTERVAL_ITER_STOP  indicate to stop the search
+ */
+enum interval_iter range_lock_cb(struct interval_node *node, void *arg)
+{
+       struct range_lock *lock = (struct range_lock *)arg;
+       struct range_lock *overlap = node2rangelock(node);
+
+       lock->rl_blocking_ranges += overlap->rl_lock_count + 1;
+       RETURN(INTERVAL_ITER_CONT);
+}
+
+/**
+ * Lock a region
+ *
+ * \param tree [in]    range lock tree
+ * \param lock [in]    range lock node containing the region span
+ *
+ * \retval 0   get the range lock
+ * \retval <0  error code while not getting the range lock
+ *
+ * If an overlapping range lock exists, the new lock will wait and
+ * retry; if it later finds that it is not the chosen one to wake up,
+ * it waits again.
+ */
+int range_lock(struct range_lock_tree *tree, struct range_lock *lock)
+{
+       struct interval_node *node;
+       int rc = 0;
+       ENTRY;
+
+       spin_lock(&tree->rlt_lock);
+       /*
+        * We need to check for all conflicting intervals
+        * already in the tree.
+        */
+       interval_search(tree->rlt_root, &lock->rl_node.in_extent,
+                       range_lock_cb, lock);
+       /*
+        * Insert to the tree if I am unique, otherwise I've been linked to
+        * the rl_next_lock of another lock which has the same range as mine
+        * in range_lock_cb().
+        */
+       node = interval_insert(&lock->rl_node, &tree->rlt_root);
+       if (node != NULL) {
+               struct range_lock *tmp = node2rangelock(node);
+
+               list_add_tail(&lock->rl_next_lock, &tmp->rl_next_lock);
+               tmp->rl_lock_count++;
+       }
+       lock->rl_sequence = ++tree->rlt_sequence;
+
+       while (lock->rl_blocking_ranges > 0) {
+               lock->rl_task = current;
+               __set_current_state(TASK_INTERRUPTIBLE);
+               spin_unlock(&tree->rlt_lock);
+               schedule();
+
+               if (signal_pending(current)) {
+                       range_unlock(tree, lock);
+                       GOTO(out, rc = -EINTR);
+               }
+               spin_lock(&tree->rlt_lock);
+       }
+       spin_unlock(&tree->rlt_lock);
+out:
+       RETURN(rc);
+}
diff --git a/lustre/llite/range_lock.h b/lustre/llite/range_lock.h
new file mode 100644 (file)
index 0000000..33a39ac
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Range lock is used to allow multiple threads to write to a single
+ * shared file, provided each thread is writing to a non-overlapping
+ * portion of the file.
+ *
+ * Refer to the possible upstream kernel version of range lock by
+ * Jan Kara <jack@suse.cz>: https://lkml.org/lkml/2013/1/31/480
+ *
+ * This file could later be replaced by the upstream kernel version.
+ */
+/*
+ * Author: Prakash Surya <surya1@llnl.gov>
+ * Author: Bobi Jam <bobijam.xu@intel.com>
+ */
+#ifndef _RANGE_LOCK_H
+#define _RANGE_LOCK_H
+
+#include <libcfs/libcfs.h>
+#include <interval_tree.h>
+
+#define RL_FMT "["LPU64", "LPU64"]"
+#define RL_PARA(range)                         \
+       (range)->rl_node.in_extent.start,       \
+       (range)->rl_node.in_extent.end
+
+struct range_lock {
+       struct interval_node    rl_node;
+       /**
+        * Process to enqueue this lock.
+        */
+       struct task_struct      *rl_task;
+       /**
+        * List of locks with the same range.
+        */
+       struct list_head        rl_next_lock;
+       /**
+        * Number of locks in the list rl_next_lock
+        */
+       unsigned int            rl_lock_count;
+       /**
+        * Number of ranges which are blocking acquisition of the lock
+        */
+       unsigned int            rl_blocking_ranges;
+       /**
+        * Sequence number of range lock. This number is used to get to know
+        * the order the locks are queued; this is required for range_cancel().
+        */
+       __u64                   rl_sequence;
+};
+
+static inline struct range_lock *node2rangelock(const struct interval_node *n)
+{
+       return container_of(n, struct range_lock, rl_node);
+}
+
+struct range_lock_tree {
+       struct interval_node    *rlt_root;
+       spinlock_t               rlt_lock;
+       __u64                    rlt_sequence;
+};
+
+void range_lock_tree_init(struct range_lock_tree *tree);
+void range_lock_init(struct range_lock *lock, __u64 start, __u64 end);
+int  range_lock(struct range_lock_tree *tree, struct range_lock *lock);
+void range_unlock(struct range_lock_tree *tree, struct range_lock *lock);
+#endif