node->in_max_high = end;
}
+static inline void interval_init(struct interval_node *node)
+{
+ memset(node, 0, sizeof(*node));
+}
+
+int node_equal(struct interval_node *n1, struct interval_node *n2);
+
/* Rules to write an interval callback.
* - the callback returns INTERVAL_ITER_STOP when it thinks the iteration
* should be stopped. It will then cause the iteration function to return
return (e1->start == e2->start) && (e1->end == e2->end);
}
-static inline int extent_overlapped(struct interval_node_extent *e1,
+static inline int extent_overlapped(struct interval_node_extent *e1,
struct interval_node_extent *e2)
{
return (e1->start <= e2->end) && (e2->start <= e1->end);
return extent_compare(&n1->in_extent, &n2->in_extent);
}
-static inline int node_equal(struct interval_node *n1,
- struct interval_node *n2)
+int node_equal(struct interval_node *n1, struct interval_node *n2)
{
- return extent_equal(&n1->in_extent, &n2->in_extent);
+ return extent_equal(&n1->in_extent, &n2->in_extent);
}
+EXPORT_SYMBOL(node_equal);
static inline __u64 max_u64(__u64 x, __u64 y)
{
*
*/
enum interval_iter interval_search(struct interval_node *node,
- struct interval_node_extent *ext,
- interval_callback_t func,
- void *data)
-{
- struct interval_node *parent;
- enum interval_iter rc = INTERVAL_ITER_CONT;
-
- LASSERT(ext != NULL);
- LASSERT(func != NULL);
-
- while (node) {
- if (ext->end < interval_low(node)) {
- if (node->in_left) {
- node = node->in_left;
- continue;
- }
- } else if (interval_may_overlap(node, ext)) {
- if (extent_overlapped(ext, &node->in_extent)) {
- rc = func(node, data);
- if (rc == INTERVAL_ITER_STOP)
- break;
- }
-
- if (node->in_left) {
- node = node->in_left;
- continue;
- }
- if (node->in_right) {
- node = node->in_right;
- continue;
- }
- }
-
- parent = node->in_parent;
- while (parent) {
- if (node_is_left_child(node) &&
- parent->in_right) {
- /* If we ever got the left, it means that the
- * parent met ext->end<interval_low(parent), or
- * may_overlap(parent). If the former is true,
- * we needn't go back. So stop early and check
- * may_overlap(parent) after this loop. */
- node = parent->in_right;
- break;
- }
- node = parent;
- parent = parent->in_parent;
- }
- if (parent == NULL || !interval_may_overlap(parent, ext))
- break;
- }
-
- return rc;
+ struct interval_node_extent *ext,
+ interval_callback_t func,
+ void *data)
+{
+ struct interval_node *parent;
+ enum interval_iter rc = INTERVAL_ITER_CONT;
+
+ ENTRY;
+
+ LASSERT(ext != NULL);
+ LASSERT(func != NULL);
+
+ while (node) {
+ if (ext->end < interval_low(node)) {
+ if (node->in_left) {
+ node = node->in_left;
+ continue;
+ }
+ } else if (interval_may_overlap(node, ext)) {
+ if (extent_overlapped(ext, &node->in_extent)) {
+ rc = func(node, data);
+ if (rc == INTERVAL_ITER_STOP)
+ break;
+ }
+
+ if (node->in_left) {
+ node = node->in_left;
+ continue;
+ }
+ if (node->in_right) {
+ node = node->in_right;
+ continue;
+ }
+ }
+
+ parent = node->in_parent;
+ while (parent) {
+ if (node_is_left_child(node) &&
+ parent->in_right) {
+ /* If we ever got the left, it means that the
+ * parent met ext->end<interval_low(parent), or
+ * may_overlap(parent). If the former is true,
+ * we needn't go back. So stop early and check
+ * may_overlap(parent) after this loop. */
+ node = parent->in_right;
+ break;
+ }
+ node = parent;
+ parent = parent->in_parent;
+ }
+ if (parent == NULL || !interval_may_overlap(parent, ext))
+ break;
+ }
+
+ RETURN(rc);
}
EXPORT_SYMBOL(interval_search);
lustre-objs += lcommon_cl.o
lustre-objs += lcommon_misc.o
lustre-objs += vvp_dev.o vvp_page.o vvp_lock.o vvp_io.o vvp_object.o
+lustre-objs += range_lock.o
llite_lloop-objs := lloop.o
EXTRA_DIST := $(lustre-objs:.o=.c) llite_internal.h rw26.c super25.c
EXTRA_DIST += $(llite_lloop-objs:.o=.c)
-EXTRA_DIST += vvp_internal.h
+EXTRA_DIST += vvp_internal.h range_lock.h
@INCLUDE_RULES@
struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
struct cl_io *io;
ssize_t result;
+ struct range_lock range;
ENTRY;
CDEBUG(D_VFSTRACE, "file: %s, type: %d ppos: "LPU64", count: %zd\n",
ll_io_init(io, file, iot == CIT_WRITE);
if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
- struct vvp_io *vio = vvp_env_io(env);
- struct ccc_io *cio = ccc_env_io(env);
- int write_mutex_locked = 0;
+ struct vvp_io *vio = vvp_env_io(env);
+ struct ccc_io *cio = ccc_env_io(env);
+ bool range_locked = false;
+ if (file->f_flags & O_APPEND)
+ range_lock_init(&range, 0, LUSTRE_EOF);
+ else
+ range_lock_init(&range, *ppos, *ppos + count - 1);
cio->cui_fd = LUSTRE_FPRIVATE(file);
vio->cui_io_subtype = args->via_io_subtype;
cio->cui_iocb = args->u.normal.via_iocb;
if ((iot == CIT_WRITE) &&
!(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
- if (mutex_lock_interruptible(&lli->
- lli_write_mutex))
- GOTO(out, result = -ERESTARTSYS);
- write_mutex_locked = 1;
+ CDEBUG(D_VFSTRACE, "Range lock "RL_FMT"\n",
+ RL_PARA(&range));
+ result = range_lock(&lli->lli_write_tree,
+ &range);
+ if (result < 0)
+ GOTO(out, result);
+
+ range_locked = true;
}
down_read(&lli->lli_trunc_sem);
break;
if (args->via_io_subtype == IO_NORMAL)
up_read(&lli->lli_trunc_sem);
- if (write_mutex_locked)
- mutex_unlock(&lli->lli_write_mutex);
+ if (range_locked) {
+ CDEBUG(D_VFSTRACE, "Range unlock "RL_FMT"\n",
+ RL_PARA(&range));
+ range_unlock(&lli->lli_write_tree, &range);
+ }
} else {
/* cl_io_rw_init() handled IO */
result = io->ci_result;
#include <lustre_intent.h>
#include <linux/compat.h>
+#include "range_lock.h"
+
#ifndef FMODE_EXEC
#define FMODE_EXEC 0
#endif
* }
*/
struct rw_semaphore f_trunc_sem;
- struct mutex f_write_mutex;
+ struct range_lock_tree f_write_tree;
struct rw_semaphore f_glimpse_sem;
cfs_time_t f_glimpse_time;
#define lli_symlink_name u.f.f_symlink_name
#define lli_maxbytes u.f.f_maxbytes
#define lli_trunc_sem u.f.f_trunc_sem
-#define lli_write_mutex u.f.f_write_mutex
+#define lli_write_tree u.f.f_write_tree
#define lli_glimpse_sem u.f.f_glimpse_sem
#define lli_glimpse_time u.f.f_glimpse_time
#define lli_agl_list u.f.f_agl_list
mutex_init(&lli->lli_size_mutex);
lli->lli_symlink_name = NULL;
init_rwsem(&lli->lli_trunc_sem);
- mutex_init(&lli->lli_write_mutex);
+ range_lock_tree_init(&lli->lli_write_tree);
init_rwsem(&lli->lli_glimpse_sem);
lli->lli_glimpse_time = 0;
INIT_LIST_HEAD(&lli->lli_agl_list);
--- /dev/null
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Range lock is used to allow multiple threads to write to a single shared
+ * file, provided each thread is writing to a non-overlapping portion of the
+ * file.
+ *
+ * Refer to the possible upstream kernel version of range lock by
+ * Jan Kara <jack@suse.cz>: https://lkml.org/lkml/2013/1/31/480
+ *
+ * This file could later be replaced by the upstream kernel version.
+ */
+/*
+ * Author: Prakash Surya <surya1@llnl.gov>
+ * Author: Bobi Jam <bobijam.xu@intel.com>
+ */
+#include "range_lock.h"
+#include <lustre/lustre_user.h>
+
+/**
+ * Initialize a range lock tree
+ *
+ * \param tree [in] an empty range lock tree
+ *
+ * Pre: Caller should have allocated the range lock tree.
+ * Post: The range lock tree is ready to function.
+ */
+void range_lock_tree_init(struct range_lock_tree *tree)
+{
+ tree->rlt_root = NULL;
+ tree->rlt_sequence = 0;
+ spin_lock_init(&tree->rlt_lock);
+}
+
+/**
+ * Initialize a range lock node
+ *
+ * \param lock [in] an empty range lock node
+ * \param start [in] start of the covering region
+ * \param end [in] end of the covering region
+ *
+ * Pre: Caller should have allocated the range lock node.
+ * Post: The range lock node is set up to cover the [start, end] region
+ */
+void range_lock_init(struct range_lock *lock, __u64 start, __u64 end)
+{
+ interval_init(&lock->rl_node);
+ if (end != LUSTRE_EOF)
+ end >>= PAGE_SHIFT;
+ interval_set(&lock->rl_node, start >> PAGE_SHIFT, end);
+ INIT_LIST_HEAD(&lock->rl_next_lock);
+ lock->rl_task = NULL;
+ lock->rl_lock_count = 0;
+ lock->rl_blocking_ranges = 0;
+ lock->rl_sequence = 0;
+}
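+
+/*
+ * For illustration only (assuming 4 KiB pages, i.e. PAGE_SHIFT == 12):
+ *
+ *	struct range_lock range;
+ *
+ *	range_lock_init(&range, 4096, 12287);
+ *
+ * covers page indices [1, 2], since both byte offsets are shifted right by
+ * PAGE_SHIFT. Passing end == LUSTRE_EOF leaves the upper bound unshifted,
+ * so the lock covers everything from the start page onwards.
+ */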
+
+static inline struct range_lock *next_lock(struct range_lock *lock)
+{
+ return list_entry(lock->rl_next_lock.next, typeof(*lock), rl_next_lock);
+}
+
+/**
+ * Helper function of range_unlock()
+ *
+ * \param node [in] an overlapping range lock found during the interval
+ *                  node search
+ * \param arg  [in] the range lock being tested
+ *
+ * \retval INTERVAL_ITER_CONT indicates that the search should continue
+ *                            with the next overlapping range node
+ * \retval INTERVAL_ITER_STOP indicates that the search should stop
+ */
+enum interval_iter range_unlock_cb(struct interval_node *node, void *arg)
+{
+ struct range_lock *lock = arg;
+ struct range_lock *overlap = node2rangelock(node);
+ struct range_lock *iter;
+ ENTRY;
+
+ list_for_each_entry(iter, &overlap->rl_next_lock, rl_next_lock) {
+ if (iter->rl_sequence > lock->rl_sequence) {
+ --iter->rl_blocking_ranges;
+ LASSERT(iter->rl_blocking_ranges > 0);
+ }
+ }
+ if (overlap->rl_sequence > lock->rl_sequence) {
+ --overlap->rl_blocking_ranges;
+ if (overlap->rl_blocking_ranges == 0)
+ wake_up_process(overlap->rl_task);
+ }
+ RETURN(INTERVAL_ITER_CONT);
+}
+
+/**
+ * Unlock a range lock, wake up locks blocked by this lock.
+ *
+ * \param tree [in] range lock tree
+ * \param lock [in] range lock to be deleted
+ *
+ * If this lock has been granted, release it; if not, just delete it from
+ * the tree or from the list of locks covering the same region. Locks that
+ * were blocked only by this lock are woken up through range_unlock_cb().
+ */
+void range_unlock(struct range_lock_tree *tree, struct range_lock *lock)
+{
+ ENTRY;
+
+ spin_lock(&tree->rlt_lock);
+ if (!list_empty(&lock->rl_next_lock)) {
+ struct range_lock *next;
+
+ if (interval_is_intree(&lock->rl_node)) { /* first lock */
+ /* Insert the next same range lock into the tree */
+ next = next_lock(lock);
+ next->rl_lock_count = lock->rl_lock_count - 1;
+ interval_erase(&lock->rl_node, &tree->rlt_root);
+ interval_insert(&next->rl_node, &tree->rlt_root);
+ } else {
+ /* find the first lock in tree */
+ list_for_each_entry(next, &lock->rl_next_lock,
+ rl_next_lock) {
+ if (!interval_is_intree(&next->rl_node))
+ continue;
+
+ LASSERT(next->rl_lock_count > 0);
+ next->rl_lock_count--;
+ break;
+ }
+ }
+ list_del_init(&lock->rl_next_lock);
+ } else {
+ LASSERT(interval_is_intree(&lock->rl_node));
+ interval_erase(&lock->rl_node, &tree->rlt_root);
+ }
+
+ interval_search(tree->rlt_root, &lock->rl_node.in_extent,
+ range_unlock_cb, lock);
+ spin_unlock(&tree->rlt_lock);
+
+ EXIT;
+}
+
+/**
+ * Helper function of range_lock()
+ *
+ * \param node [in] an overlapping range lock found during the interval
+ *                  node search
+ * \param arg  [in] the range lock being tested
+ *
+ * \retval INTERVAL_ITER_CONT indicates that the search should continue
+ *                            with the next overlapping range node
+ * \retval INTERVAL_ITER_STOP indicates that the search should stop
+ */
+enum interval_iter range_lock_cb(struct interval_node *node, void *arg)
+{
+ struct range_lock *lock = (struct range_lock *)arg;
+ struct range_lock *overlap = node2rangelock(node);
+
+ lock->rl_blocking_ranges += overlap->rl_lock_count + 1;
+ RETURN(INTERVAL_ITER_CONT);
+}
+
+/**
+ * Lock a region
+ *
+ * \param tree [in] range lock tree
+ * \param lock [in] range lock node containing the region span
+ *
+ * \retval 0	the range lock was acquired
+ * \retval <0	error code if the range lock could not be acquired
+ *
+ * If an overlapping range lock already exists, the new lock will wait and
+ * retry; if, after being woken, it finds that it is still blocked by other
+ * ranges, it waits again.
+ */
+int range_lock(struct range_lock_tree *tree, struct range_lock *lock)
+{
+ struct interval_node *node;
+ int rc = 0;
+ ENTRY;
+
+ spin_lock(&tree->rlt_lock);
+ /*
+ * We need to check for all conflicting intervals
+ * already in the tree.
+ */
+ interval_search(tree->rlt_root, &lock->rl_node.in_extent,
+ range_lock_cb, lock);
+ /*
+ * Insert to the tree if I am unique, otherwise I've been linked to
+ * the rl_next_lock of another lock which has the same range as mine
+ * in range_lock_cb().
+ */
+ node = interval_insert(&lock->rl_node, &tree->rlt_root);
+ if (node != NULL) {
+ struct range_lock *tmp = node2rangelock(node);
+
+ list_add_tail(&lock->rl_next_lock, &tmp->rl_next_lock);
+ tmp->rl_lock_count++;
+ }
+ lock->rl_sequence = ++tree->rlt_sequence;
+
+ while (lock->rl_blocking_ranges > 0) {
+ lock->rl_task = current;
+ __set_current_state(TASK_INTERRUPTIBLE);
+ spin_unlock(&tree->rlt_lock);
+ schedule();
+
+ if (signal_pending(current)) {
+ range_unlock(tree, lock);
+ GOTO(out, rc = -EINTR);
+ }
+ spin_lock(&tree->rlt_lock);
+ }
+ spin_unlock(&tree->rlt_lock);
+out:
+ RETURN(rc);
+}
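+
+/*
+ * A small worked example of the accounting above (illustrative only, using
+ * page-granular ranges):
+ *
+ *	A locks [0, 0]: no overlap in the tree, rl_blocking_ranges == 0,
+ *			granted immediately (rl_sequence == 1).
+ *	B locks [0, 1]: overlaps A, rl_blocking_ranges == 1, sleeps
+ *			(rl_sequence == 2).
+ *	C locks [1, 1]: overlaps B only, rl_blocking_ranges == 1, sleeps
+ *			(rl_sequence == 3).
+ *
+ * When A calls range_unlock(), range_unlock_cb() finds B (whose sequence is
+ * newer than A's), drops its rl_blocking_ranges to 0 and wakes it up; C stays
+ * asleep until B unlocks in turn.
+ */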
--- /dev/null
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Range lock is used to allow multiple threads to write to a single shared
+ * file, provided each thread is writing to a non-overlapping portion of the
+ * file.
+ *
+ * Refer to the possible upstream kernel version of range lock by
+ * Jan Kara <jack@suse.cz>: https://lkml.org/lkml/2013/1/31/480
+ *
+ * This file could later be replaced by the upstream kernel version.
+ */
+/*
+ * Author: Prakash Surya <surya1@llnl.gov>
+ * Author: Bobi Jam <bobijam.xu@intel.com>
+ */
+#ifndef _RANGE_LOCK_H
+#define _RANGE_LOCK_H
+
+#include <libcfs/libcfs.h>
+#include <interval_tree.h>
+
+#define RL_FMT "["LPU64", "LPU64"]"
+#define RL_PARA(range) \
+ (range)->rl_node.in_extent.start, \
+ (range)->rl_node.in_extent.end
+
+struct range_lock {
+ struct interval_node rl_node;
+ /**
+ * Process to enqueue this lock.
+ */
+ struct task_struct *rl_task;
+ /**
+ * List of locks with the same range.
+ */
+ struct list_head rl_next_lock;
+ /**
+ * Number of locks in the list rl_next_lock
+ */
+ unsigned int rl_lock_count;
+ /**
+ * Number of ranges which are blocking acquisition of the lock
+ */
+ unsigned int rl_blocking_ranges;
+ /**
+ * Sequence number of the range lock. This number is used to determine
+ * the order in which the locks were queued; range_unlock() relies on it
+ * to decide which waiting locks this lock was blocking.
+ */
+ __u64 rl_sequence;
+};
+
+static inline struct range_lock *node2rangelock(const struct interval_node *n)
+{
+ return container_of(n, struct range_lock, rl_node);
+}
+
+struct range_lock_tree {
+ struct interval_node *rlt_root;
+ spinlock_t rlt_lock;
+ __u64 rlt_sequence;
+};
+
+void range_lock_tree_init(struct range_lock_tree *tree);
+void range_lock_init(struct range_lock *lock, __u64 start, __u64 end);
+int range_lock(struct range_lock_tree *tree, struct range_lock *lock);
+void range_unlock(struct range_lock_tree *tree, struct range_lock *lock);
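+
+/*
+ * Typical usage, as a minimal sketch modelled on the ll_file_io_generic()
+ * change in this patch (error handling trimmed; "pos" and "count" are
+ * placeholder names):
+ *
+ *	struct range_lock range;
+ *	int rc;
+ *
+ *	range_lock_init(&range, pos, pos + count - 1);
+ *	rc = range_lock(&lli->lli_write_tree, &range);
+ *	if (rc < 0)
+ *		return rc;
+ *	... perform the non-overlapping write ...
+ *	range_unlock(&lli->lli_write_tree, &range);
+ */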
+#endif