The "uuid"s file would list the uuids of _active_ exports.
And the clear entry is to clear all stats and stale nids.
+Severity : enhancement
+Bugzilla : 11270
+Description: eliminate client locks in the face of contention
+Details : file contention detection and lockless i/o implementation
+ for contended files.
+
--------------------------------------------------------------------------------
2007-08-10 Cluster File Systems, Inc. <info@clusterfs.com>
LPROC_LL_FSYNC,
LPROC_LL_SETATTR,
LPROC_LL_TRUNC,
+ LPROC_LL_LOCKLESS_TRUNC,
LPROC_LL_FLOCK,
LPROC_LL_GETATTR,
LPROC_LL_STAFS,
LPROC_LL_INODE_PERM,
LPROC_LL_DIRECT_READ,
LPROC_LL_DIRECT_WRITE,
+ LPROC_LL_LOCKLESS_READ,
+ LPROC_LL_LOCKLESS_WRITE,
LPROC_LL_FILE_OPCODES
};
 * w/o involving a separate thread, in order to decrease the context-switch rate */
#define LDLM_FL_ATOMIC_CB 0x4000000
+/* Cancel lock asynchronously. See ldlm_cli_cancel_unused_resource. */
+#define LDLM_FL_ASYNC 0x8000000
+
/* It may happen that a client initiates 2 operations, e.g. unlink and mkdir,
 * such that the server sends a blocking ast for conflicting locks to this
 * client for the 1st operation, whereas the 2nd operation has canceled this lock and
#define LDLM_FL_BL_AST 0x10000000
#define LDLM_FL_BL_DONE 0x20000000
-/* Cancel lock asynchronously. See ldlm_cli_cancel_unused_resource. */
-#define LDLM_FL_ASYNC 0x40000000
+/* measure lock contention and return -EUSERS if lock contention is high */
+#define LDLM_FL_DENY_ON_CONTENTION 0x40000000
/* The blocking callback is overloaded to perform two functions. These flags
* indicate which operation should be performed. */
 * others (including ibits locks) will be canceled on a memory pressure event. */
#define LDLM_LOCK_SHRINK_THUMB 256
+/* default values for the "max_nolock_bytes", "contention_seconds"
+ * and "contended_locks" namespace tunables */
+#define NS_DEFAULT_MAX_NOLOCK_BYTES 0
+#define NS_DEFAULT_CONTENTION_SECONDS 2
+#define NS_DEFAULT_CONTENDED_LOCKS 32
+
struct ldlm_namespace {
char *ns_name;
ldlm_side_t ns_client; /* is this a client-side lock tree? */
cfs_waitq_t ns_waitq;
struct ldlm_pool ns_pool;
ldlm_appetite_t ns_appetite;
+ /* if more than @ns_contended_locks locks are found on a resource,
+ * the resource is considered contended */
+ unsigned ns_contended_locks;
+ /* the resource keeps its contended state for @ns_contention_time
+ * seconds */
+ unsigned ns_contention_time;
+ /* upper limit, in bytes, on the size of lockless requests */
+ unsigned ns_max_nolock_size;
};
static inline int ns_is_client(struct ldlm_namespace *ns)
struct semaphore lr_lvb_sem;
__u32 lr_lvb_len;
void *lr_lvb_data;
+
+ /* when the resource was last considered contended */
+ cfs_time_t lr_contention_time;
};
struct ldlm_ast_work {
#define DEBUG_SUBSYSTEM S_LDLM
#ifndef __KERNEL__
# include <liblustre.h>
+#else
+# include <libcfs/libcfs.h>
+# include <libcfs/kp30.h>
#endif
#include <lustre_dlm.h>
#include <obd_support.h>
+#include <obd.h>
#include <lustre_lib.h>
#include "ldlm_internal.h"
}
}
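+/* Decide whether @lock's resource is contended: refresh the resource's
+ * contention timestamp when the current scan saw more than
+ * @ns_contended_locks conflicting locks, and report contention while we
+ * are still within @ns_contention_time seconds of that timestamp. */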
+static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
+{
+ struct ldlm_resource *res = lock->l_resource;
+ cfs_time_t now = cfs_time_current();
+
+ CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
+ if (contended_locks > res->lr_namespace->ns_contended_locks)
+ res->lr_contention_time = now;
+ return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
+ cfs_time_seconds(res->lr_namespace->ns_contention_time)));
+}
+
struct ldlm_extent_compat_args {
struct list_head *work_list;
struct ldlm_lock *lock;
ldlm_mode_t mode;
+ int *locks;
int *compat;
};
{
struct ldlm_extent_compat_args *priv = data;
struct ldlm_interval *node = to_ldlm_interval(n);
+ struct ldlm_extent *extent;
struct list_head *work_list = priv->work_list;
struct ldlm_lock *lock, *enq = priv->lock;
ldlm_mode_t mode = priv->mode;
+ int count = 0;
ENTRY;
LASSERT(!list_empty(&node->li_group));
"mode = %s, lock->l_granted_mode = %s\n",
ldlm_lockname[mode],
ldlm_lockname[lock->l_granted_mode]);
-
+ count++;
if (lock->l_blocking_ast)
ldlm_add_ast_work_item(lock, enq, work_list);
}
+ /* don't count conflicting glimpse locks */
+ extent = ldlm_interval_extent(node);
+ if (!(mode == LCK_PR &&
+ extent->start == 0 && extent->end == OBD_OBJECT_EOF))
+ *priv->locks += count;
+
if (priv->compat)
*priv->compat = 0;
static int
ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
int *flags, ldlm_error_t *err,
- struct list_head *work_list)
+ struct list_head *work_list, int *contended_locks)
{
struct list_head *tmp;
struct ldlm_lock *lock;
__u64 req_end = req->l_req_extent.end;
int compat = 1;
int scan = 0;
+ int check_contention;
ENTRY;
lockmode_verify(req_mode);
struct ldlm_interval_tree *tree;
struct ldlm_extent_compat_args data = {.work_list = work_list,
.lock = req,
+ .locks = contended_locks,
.compat = &compat };
struct interval_node_extent ex = { .start = req_start,
.end = req_end };
compat = 0;
}
}
- RETURN(compat);
- }
+ } else { /* for waiting queue */
+ list_for_each(tmp, queue) {
+ check_contention = 1;
+
+ lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+ if (req == lock)
+ break;
+
+ if (unlikely(scan)) {
+ /* We only get here if we are queuing GROUP lock
+ and met some incompatible one. The main idea of this
+ code is to insert GROUP lock past compatible GROUP
+ lock in the waiting queue or if there is not any,
+ then in front of first non-GROUP lock */
+ if (lock->l_req_mode != LCK_GROUP) {
+ /* Ok, we hit non-GROUP lock, there should
+ * be no more GROUP locks later on, queue in
+ * front of first non-GROUP lock */
+
+ ldlm_resource_insert_lock_after(lock, req);
+ list_del_init(&lock->l_res_link);
+ ldlm_resource_insert_lock_after(req, lock);
+ compat = 0;
+ break;
+ }
+ if (req->l_policy_data.l_extent.gid ==
+ lock->l_policy_data.l_extent.gid) {
+ /* found it */
+ ldlm_resource_insert_lock_after(lock, req);
+ compat = 0;
+ break;
+ }
+ continue;
+ }
- /* for waiting queue */
- list_for_each(tmp, queue) {
- lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+ /* locks are compatible, overlap doesn't matter */
+ if (lockmode_compat(lock->l_req_mode, req_mode)) {
+ if (req_mode == LCK_PR &&
+ ((lock->l_policy_data.l_extent.start <=
+ req->l_policy_data.l_extent.start) &&
+ (lock->l_policy_data.l_extent.end >=
+ req->l_policy_data.l_extent.end))) {
+ /* If we met a PR lock just like us or wider,
+ and nobody down the list conflicted with
+ it, that means we can skip processing of
+ the rest of the list and safely place
+ ourselves at the end of the list, or grant
+ (depending on whether we met conflicting
+ locks earlier in the list).
+ In case of a 1st enqueue only, we continue
+ traversing if there is something conflicting
+ down the list because we need to make sure
+ that something is marked as AST_SENT as well;
+ in case of an empty worklist we would exit on
+ the first conflict met. */
+ /* There IS a case where such flag is
+ not set for a lock, yet it blocks
+ something. Luckily for us this is
+ only during destroy, so lock is
+ exclusive. So here we are safe */
+ if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
+ RETURN(compat);
+ }
+ }
- if (req == lock)
- RETURN(compat);
-
- if (unlikely(scan)) {
- /* We only get here if we are queuing GROUP lock
- and met some incompatible one. The main idea of this
- code is to insert GROUP lock past compatible GROUP
- lock in the waiting queue or if there is not any,
- then in front of first non-GROUP lock */
- if (lock->l_req_mode != LCK_GROUP) {
- /* Ok, we hit non-GROUP lock, there should
- * be no more GROUP locks later on, queue in
- * front of first non-GROUP lock */
-
- ldlm_resource_insert_lock_after(lock, req);
- list_del_init(&lock->l_res_link);
- ldlm_resource_insert_lock_after(req, lock);
- RETURN(0);
- }
- if (req->l_policy_data.l_extent.gid ==
- lock->l_policy_data.l_extent.gid) {
- /* found it */
- ldlm_resource_insert_lock_after(lock, req);
- RETURN(0);
- }
- continue;
- }
+ /* non-group locks are compatible, overlap doesn't
+ matter */
+ if (likely(req_mode != LCK_GROUP))
+ continue;
- /* locks are compatible, overlap doesn't matter */
- if (lockmode_compat(lock->l_req_mode, req_mode)) {
- if (req_mode == LCK_PR &&
- ((lock->l_policy_data.l_extent.start <=
- req->l_policy_data.l_extent.start) &&
- (lock->l_policy_data.l_extent.end >=
- req->l_policy_data.l_extent.end))) {
- /* If we met a PR lock just like us or wider,
- and nobody down the list conflicted with
- it, that means we can skip processing of
- the rest of the list and safely place
- ourselves at the end of the list, or grant
- (dependent if we met an conflicting locks
- before in the list).
- In case of 1st enqueue only we continue
- traversing if there is something conflicting
- down the list because we need to make sure
- that something is marked as AST_SENT as well,
- in cse of empy worklist we would exit on
- first conflict met. */
- /* There IS a case where such flag is
- not set for a lock, yet it blocks
- something. Luckily for us this is
- only during destroy, so lock is
- exclusive. So here we are safe */
- if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
- RETURN(compat);
+ /* If we are trying to get a GROUP lock and there is
+ another one of this kind, we need to compare gid */
+ if (req->l_policy_data.l_extent.gid ==
+ lock->l_policy_data.l_extent.gid) {
+ /* If existing lock with matched gid is granted,
+ we grant new one too. */
+ if (lock->l_req_mode == lock->l_granted_mode)
+ RETURN(2);
+
+ /* Otherwise we are scanning the queue of
+ * waiting locks and it means the current
+ * request would block along with the
+ * existing lock (that is already blocked).
+ * If we are in nonblocking mode - return
+ * immediately */
+ if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+ compat = -EWOULDBLOCK;
+ goto destroylock;
+ }
+ /* If this group lock is compatible with another
+ * group lock on the waiting list, they must be
+ * together in the list, so they can be granted
+ * at the same time. Otherwise the later lock
+ * can get stuck behind another, incompatible,
+ * lock. */
+ ldlm_resource_insert_lock_after(lock, req);
+ /* Because 'lock' is not granted, we can stop
+ * processing this queue and return immediately.
+ * There is no need to check the rest of the
+ * list. */
+ RETURN(0);
}
}
- /* non-group locks are compatible, overlap doesn't
- matter */
- if (likely(req_mode != LCK_GROUP))
+ if (unlikely(req_mode == LCK_GROUP &&
+ (lock->l_req_mode != lock->l_granted_mode))) {
+ scan = 1;
+ compat = 0;
+ if (lock->l_req_mode != LCK_GROUP) {
+ /* Ok, we hit non-GROUP lock, there should be no
+ more GROUP locks later on, queue in front of
+ first non-GROUP lock */
+
+ ldlm_resource_insert_lock_after(lock, req);
+ list_del_init(&lock->l_res_link);
+ ldlm_resource_insert_lock_after(req, lock);
+ break;
+ }
+ if (req->l_policy_data.l_extent.gid ==
+ lock->l_policy_data.l_extent.gid) {
+ /* found it */
+ ldlm_resource_insert_lock_after(lock, req);
+ break;
+ }
continue;
+ }
- /* If we are trying to get a GROUP lock and there is
- another one of this kind, we need to compare gid */
- if (req->l_policy_data.l_extent.gid ==
- lock->l_policy_data.l_extent.gid) {
- /* If existing lock with matched gid is granted,
- we grant new one too. */
- if (lock->l_req_mode == lock->l_granted_mode)
- RETURN(2);
-
- /* Otherwise we are scanning queue of waiting
- * locks and it means current request would
- * block along with existing lock (that is
- * already blocked.
- * If we are in nonblocking mode - return
- * immediately */
+ if (unlikely(lock->l_req_mode == LCK_GROUP)) {
+ /* If the compared lock is GROUP, then the requested
+ * lock is PR/PW, so this is not compatible; the
+ * extent range does not matter */
if (*flags & LDLM_FL_BLOCK_NOWAIT) {
compat = -EWOULDBLOCK;
goto destroylock;
+ } else {
+ *flags |= LDLM_FL_NO_TIMEOUT;
}
- /* If this group lock is compatible with another
- * group lock on the waiting list, they must be
- * together in the list, so they can be granted
- * at the same time. Otherwise the later lock
- * can get stuck behind another, incompatible,
- * lock. */
- ldlm_resource_insert_lock_after(lock, req);
- /* Because 'lock' is not granted, we can stop
- * processing this queue and return immediately.
- * There is no need to check the rest of the
- * list. */
- RETURN(0);
+ } else if (lock->l_policy_data.l_extent.end < req_start ||
+ lock->l_policy_data.l_extent.start > req_end) {
+ /* if a non-group lock doesn't overlap, skip it */
+ continue;
+ } else if (lock->l_req_extent.end < req_start ||
+ lock->l_req_extent.start > req_end) {
+ /* false contention, the requests don't really overlap */
+ check_contention = 0;
}
- }
- if (unlikely(req_mode == LCK_GROUP &&
- (lock->l_req_mode != lock->l_granted_mode))) {
- scan = 1;
- compat = 0;
- if (lock->l_req_mode != LCK_GROUP) {
- /* Ok, we hit non-GROUP lock, there should be no
- more GROUP locks later on, queue in front of
- first non-GROUP lock */
-
- ldlm_resource_insert_lock_after(lock, req);
- list_del_init(&lock->l_res_link);
- ldlm_resource_insert_lock_after(req, lock);
+ if (!work_list)
RETURN(0);
- }
- if (req->l_policy_data.l_extent.gid ==
- lock->l_policy_data.l_extent.gid) {
- /* found it */
- ldlm_resource_insert_lock_after(lock, req);
- RETURN(0);
- }
- continue;
- }
- if (unlikely(lock->l_req_mode == LCK_GROUP)) {
- /* If compared lock is GROUP, then requested is PR/PW/
- * so this is not compatible; extent range does not
- * matter */
- if (*flags & LDLM_FL_BLOCK_NOWAIT) {
- compat = -EWOULDBLOCK;
- goto destroylock;
- } else {
- *flags |= LDLM_FL_NO_TIMEOUT;
- }
- } else if (lock->l_policy_data.l_extent.end < req_start ||
- lock->l_policy_data.l_extent.start > req_end) {
- /* if a non group lock doesn't overlap skip it */
- continue;
- }
+ /* don't count conflicting glimpse locks */
+ if (lock->l_req_mode == LCK_PR &&
+ lock->l_policy_data.l_extent.start == 0 &&
+ lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF)
+ check_contention = 0;
- if (!work_list)
- RETURN(0);
+ *contended_locks += check_contention;
- compat = 0;
- if (lock->l_blocking_ast)
- ldlm_add_ast_work_item(lock, req, work_list);
+ compat = 0;
+ if (lock->l_blocking_ast)
+ ldlm_add_ast_work_item(lock, req, work_list);
+ }
}
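+ /* The request conflicts (compat == 0) on a contended resource: if the
+ * client allows it (LDLM_FL_DENY_ON_CONTENTION), the request is not a
+ * GROUP lock and its extent fits in ns_max_nolock_size, destroy the
+ * lock and return -EUSERS so the client falls back to lockless i/o. */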
+ if (ldlm_check_contention(req, *contended_locks) &&
+ compat == 0 &&
+ (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
+ req->l_req_mode != LCK_GROUP &&
+ req_end - req_start <=
+ req->l_resource->lr_namespace->ns_max_nolock_size)
+ GOTO(destroylock, compat = -EUSERS);
+
RETURN(compat);
destroylock:
list_del_init(&req->l_res_link);
RETURN(compat);
}
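+/* Undo the blocking-AST bookkeeping for locks gathered on @bl_list when
+ * the enqueue that collected them was destroyed before the ASTs were
+ * sent. */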
+static void discard_bl_list(struct list_head *bl_list)
+{
+ struct list_head *tmp, *pos;
+ ENTRY;
+
+ list_for_each_safe(pos, tmp, bl_list) {
+ struct ldlm_lock *lock =
+ list_entry(pos, struct ldlm_lock, l_bl_ast);
+
+ list_del_init(&lock->l_bl_ast);
+ LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
+ lock->l_flags &= ~LDLM_FL_AST_SENT;
+ LASSERT(lock->l_bl_ast_run == 0);
+ LASSERT(lock->l_blocking_lock);
+ LDLM_LOCK_PUT(lock->l_blocking_lock);
+ lock->l_blocking_lock = NULL;
+ LDLM_LOCK_PUT(lock);
+ }
+ EXIT;
+}
+
/* If first_enq is 0 (ie, called from ldlm_reprocess_queue):
* - blocking ASTs have already been sent
* - must call this function with the ns lock held
struct ldlm_resource *res = lock->l_resource;
struct list_head rpc_list = CFS_LIST_HEAD_INIT(rpc_list);
int rc, rc2;
+ int contended_locks = 0;
ENTRY;
LASSERT(list_empty(&res->lr_converting));
+ LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
+ !(lock->l_flags & LDLM_AST_DISCARD_DATA));
check_res_locked(res);
*err = ELDLM_OK;
* being true, we want to find out. */
LASSERT(*flags == 0);
rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
- err, NULL);
+ err, NULL, &contended_locks);
if (rc == 1) {
rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
- flags, err, NULL);
+ flags, err, NULL,
+ &contended_locks);
}
if (rc == 0)
RETURN(LDLM_ITER_STOP);
}
restart:
- rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err, &rpc_list);
+ contended_locks = 0;
+ rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
+ &rpc_list, &contended_locks);
if (rc < 0)
GOTO(out, rc); /* lock was destroyed */
if (rc == 2)
goto grant;
- rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err, &rpc_list);
+ rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
+ &rpc_list, &contended_locks);
if (rc2 < 0)
GOTO(out, rc = rc2); /* lock was destroyed */
*flags |= LDLM_FL_NO_TIMEOUT;
}
- rc = 0;
+ RETURN(0);
out:
+ if (!list_empty(&rpc_list)) {
+ LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
+ discard_bl_list(&rpc_list);
+ }
RETURN(rc);
}
lock_vars[0].read_fptr = lprocfs_rd_uint;
lock_vars[0].write_fptr = lprocfs_wr_uint;
lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
+
+ snprintf(lock_name, MAX_STRING_SIZE, "%s/max_nolock_bytes",
+ ns->ns_name);
+ lock_vars[0].data = &ns->ns_max_nolock_size;
+ lock_vars[0].read_fptr = lprocfs_rd_uint;
+ lock_vars[0].write_fptr = lprocfs_wr_uint;
+ lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
+
+ snprintf(lock_name, MAX_STRING_SIZE, "%s/contention_seconds",
+ ns->ns_name);
+ lock_vars[0].data = &ns->ns_contention_time;
+ lock_vars[0].read_fptr = lprocfs_rd_uint;
+ lock_vars[0].write_fptr = lprocfs_wr_uint;
+ lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
+
+ snprintf(lock_name, MAX_STRING_SIZE, "%s/contended_locks",
+ ns->ns_name);
+ lock_vars[0].data = &ns->ns_contended_locks;
+ lock_vars[0].read_fptr = lprocfs_rd_uint;
+ lock_vars[0].write_fptr = lprocfs_wr_uint;
+ lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
}
}
#undef MAX_STRING_SIZE
atomic_set(&ns->ns_locks, 0);
ns->ns_resources = 0;
cfs_waitq_init(&ns->ns_waitq);
+ ns->ns_max_nolock_size = NS_DEFAULT_MAX_NOLOCK_BYTES;
+ ns->ns_contention_time = NS_DEFAULT_CONTENTION_SECONDS;
+ ns->ns_contended_locks = NS_DEFAULT_CONTENDED_LOCKS;
for (bucket = ns->ns_hash + RES_HASH_SIZE - 1; bucket >= ns->ns_hash;
bucket--)
RETURN(rc);
}
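+/* Client-side contention state: a file is marked contended when the OST
+ * denies a lock with -EUSERS; the mark expires after ll_contention_time
+ * seconds and is cleared again on the regular locked i/o paths. */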
+static void ll_set_file_contended(struct inode *inode)
+{
+ struct ll_inode_info *lli = ll_i2info(inode);
+ cfs_time_t now = cfs_time_current();
+
+ spin_lock(&lli->lli_lock);
+ lli->lli_contention_time = now;
+ lli->lli_flags |= LLIF_CONTENDED;
+ spin_unlock(&lli->lli_lock);
+}
+
+void ll_clear_file_contended(struct inode *inode)
+{
+ struct ll_inode_info *lli = ll_i2info(inode);
+
+ spin_lock(&lli->lli_lock);
+ lli->lli_flags &= ~LLIF_CONTENDED;
+ spin_unlock(&lli->lli_lock);
+}
+
+static int ll_is_file_contended(struct file *file)
+{
+ struct inode *inode = file->f_dentry->d_inode;
+ struct ll_inode_info *lli = ll_i2info(inode);
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+ ENTRY;
+
+ if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
+ CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
+ " osc connect flags = 0x"LPX64"\n",
+ sbi->ll_lco.lco_flags);
+ RETURN(0);
+ }
+ if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
+ RETURN(1);
+ if (lli->lli_flags & LLIF_CONTENDED) {
+ cfs_time_t cur_time = cfs_time_current();
+ cfs_time_t retry_time;
+
+ retry_time = cfs_time_add(
+ lli->lli_contention_time,
+ cfs_time_seconds(sbi->ll_contention_time));
+ if (cfs_time_after(cur_time, retry_time)) {
+ ll_clear_file_contended(inode);
+ RETURN(0);
+ }
+ RETURN(1);
+ }
+ RETURN(0);
+}
+
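+/* Take the i/o tree lock unless the file is contended (O_APPEND writes
+ * always take it). Returns 1 if the tree lock was taken, 0 if the caller
+ * should fall back to lockless i/o, or a negative errno on failure. */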
+static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
+ const char *buf, size_t count,
+ loff_t start, loff_t end, int rw)
+{
+ int append;
+ int tree_locked = 0;
+ int rc;
+ struct inode * inode = file->f_dentry->d_inode;
+ ENTRY;
+
+ append = (rw == WRITE) && (file->f_flags & O_APPEND);
+
+ if (append || !ll_is_file_contended(file)) {
+ struct ll_lock_tree_node *node;
+ int ast_flags;
+
+ ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
+ if (file->f_flags & O_NONBLOCK)
+ ast_flags |= LDLM_FL_BLOCK_NOWAIT;
+ node = ll_node_from_inode(inode, start, end,
+ (rw == WRITE) ? LCK_PW : LCK_PR);
+ if (IS_ERR(node)) {
+ rc = PTR_ERR(node);
+ GOTO(out, rc);
+ }
+ tree->lt_fd = LUSTRE_FPRIVATE(file);
+ rc = ll_tree_lock(tree, node, buf, count, ast_flags);
+ if (rc == 0)
+ tree_locked = 1;
+ else if (rc == -EUSERS)
+ ll_set_file_contended(inode);
+ else
+ GOTO(out, rc);
+ }
+ RETURN(tree_locked);
+out:
+ return rc;
+}
+
static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
loff_t *ppos)
{
struct lov_stripe_md *lsm = lli->lli_smd;
struct ll_sb_info *sbi = ll_i2sbi(inode);
struct ll_lock_tree tree;
- struct ll_lock_tree_node *node;
struct ost_lvb lvb;
struct ll_ra_read bead;
- int rc, ra = 0;
+ int ra = 0;
loff_t end;
ssize_t retval, chunk, sum = 0;
+ int tree_locked;
__u64 kms;
ENTRY;
RETURN(-EFAULT);
RETURN(count);
}
-
repeat:
if (sbi->ll_max_rw_chunk != 0) {
/* first, let's know the end of the current stripe */
end = *ppos + count - 1;
}
- node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
- if (IS_ERR(node)){
- GOTO(out, retval = PTR_ERR(node));
- }
-
- tree.lt_fd = LUSTRE_FPRIVATE(file);
- rc = ll_tree_lock(&tree, node, buf, count,
- file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
- if (rc != 0)
- GOTO(out, retval = rc);
+ tree_locked = ll_file_get_tree_lock(&tree, file, buf,
+ count, *ppos, end, READ);
+ if (tree_locked < 0)
+ GOTO(out, retval = tree_locked);
ll_inode_size_lock(inode, 1);
/*
ll_inode_size_unlock(inode, 1);
retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
if (retval) {
- ll_tree_unlock(&tree);
+ if (tree_locked)
+ ll_tree_unlock(&tree);
goto out;
}
} else {
CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
inode->i_ino, chunk, *ppos, i_size_read(inode));
- /* turn off the kernel's read-ahead */
- file->f_ra.ra_pages = 0;
+ if (tree_locked) {
+ /* turn off the kernel's read-ahead */
+ file->f_ra.ra_pages = 0;
- /* initialize read-ahead window once per syscall */
- if (ra == 0) {
- ra = 1;
- bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
- bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
- ll_ra_read_in(file, &bead);
- }
+ /* initialize read-ahead window once per syscall */
+ if (ra == 0) {
+ ra = 1;
+ bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
+ bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
+ ll_ra_read_in(file, &bead);
+ }
- /* BUG: 5972 */
- file_accessed(file);
- retval = generic_file_read(file, buf, chunk, ppos);
- ll_rw_stats_tally(sbi, current->pid, file, count, 0);
+ /* BUG: 5972 */
+ file_accessed(file);
+ retval = generic_file_read(file, buf, chunk, ppos);
+ ll_tree_unlock(&tree);
+ } else {
+ retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
+ }
- ll_tree_unlock(&tree);
+ ll_rw_stats_tally(sbi, current->pid, file, chunk, 0);
if (retval > 0) {
buf += retval;
struct ll_sb_info *sbi = ll_i2sbi(inode);
struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
struct ll_lock_tree tree;
- struct ll_lock_tree_node *node;
loff_t maxbytes = ll_file_maxbytes(inode);
loff_t lock_start, lock_end, end;
ssize_t retval, chunk, sum = 0;
- int rc;
+ int tree_locked;
ENTRY;
CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
lock_start = *ppos;
lock_end = *ppos + count - 1;
}
- node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
-
- if (IS_ERR(node))
- GOTO(out, retval = PTR_ERR(node));
- tree.lt_fd = LUSTRE_FPRIVATE(file);
- rc = ll_tree_lock(&tree, node, buf, count,
- file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
- if (rc != 0)
- GOTO(out, retval = rc);
+ tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
+ lock_start, lock_end, WRITE);
+ if (tree_locked < 0)
+ GOTO(out, retval = tree_locked);
/* This is ok, g_f_w will overwrite this under i_sem if it races
* with a local truncate, it just makes our maxbyte checking easier.
send_sig(SIGXFSZ, current, 0);
GOTO(out_unlock, retval = -EFBIG);
}
- if (*ppos + count > maxbytes)
- count = maxbytes - *ppos;
+ if (end > maxbytes - 1)
+ end = maxbytes - 1;
/* generic_file_write handles O_APPEND after getting i_mutex */
chunk = end - *ppos + 1;
CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
inode->i_ino, chunk, *ppos);
- retval = generic_file_write(file, buf, chunk, ppos);
- ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);
+ if (tree_locked)
+ retval = generic_file_write(file, buf, chunk, ppos);
+ else
+ retval = ll_file_lockless_io(file, (char*)buf, chunk,
+ ppos, WRITE);
+ ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
out_unlock:
- ll_tree_unlock(&tree);
+ if (tree_locked)
+ ll_tree_unlock(&tree);
out:
if (retval > 0) {
if (rc != 0)
RETURN(rc);
+ ll_clear_file_contended(inode);
ll_inode_size_lock(inode, 1);
/*
* Consistency guarantees: following possibilities exist for the
/* Sizeon-on-MDS attributes are changed. An attribute update needs to
* be sent to MDS. */
LLIF_SOM_DIRTY = (1 << 3),
+ /* File is contended */
+ LLIF_CONTENDED = (1 << 4),
+ /* Truncate uses server lock for this file */
+ LLIF_SRVLOCK = (1 << 5)
};
struct ll_inode_info {
__u64 lli_maxbytes;
__u64 lli_ioepoch;
unsigned long lli_flags;
+ cfs_time_t lli_contention_time;
/* this lock protects posix_acl, pending_write_llaps, mmap_cnt */
spinlock_t lli_lock;
#define LL_SBI_LOCALFLOCK 0x200 /* Local flocks support by kernel */
#define LL_SBI_LRU_RESIZE 0x400 /* lru resize support */
+/* default value for ll_sb_info->contention_time */
+#define SBI_DEFAULT_CONTENTION_SECONDS 60
+/* default value for lockless_truncate_enable */
+#define SBI_DEFAULT_LOCKLESS_TRUNCATE_ENABLE 1
#define RCE_HASHES 32
struct rmtacl_ctl_entry {
unsigned long ll_pglist_gen;
struct list_head ll_pglist; /* all pages (llap_pglist_item) */
+ unsigned ll_contention_time; /* seconds */
+ unsigned ll_lockless_truncate_enable; /* true/false */
+
struct ll_ra_info ll_ra_info;
unsigned int ll_namelen;
struct file_operations *ll_fop;
llap_defer_uptodate:1,
llap_origin:3,
llap_ra_used:1,
- llap_ignore_quota:1;
+ llap_ignore_quota:1,
+ llap_lockless_io_page:1;
void *llap_cookie;
struct page *llap_page;
struct list_head llap_pending_write;
LLAP_ORIGIN_COMMIT_WRITE,
LLAP_ORIGIN_WRITEPAGE,
LLAP_ORIGIN_REMOVEPAGE,
+ LLAP_ORIGIN_LOCKLESS_IO,
LLAP__ORIGIN_MAX,
};
extern char *llap_origins[];
void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras);
void ll_ra_accounting(struct ll_async_page *llap,struct address_space *mapping);
void ll_truncate(struct inode *inode);
+int ll_file_punch(struct inode *, loff_t, int);
+ssize_t ll_file_lockless_io(struct file *, char *, size_t, loff_t *, int);
+void ll_clear_file_contended(struct inode*);
int ll_sync_page_range(struct inode *, struct address_space *, loff_t, size_t);
/* llite/file.c */
SBI_DEFAULT_READAHEAD_MAX);
sbi->ll_ra_info.ra_max_read_ahead_whole_pages =
SBI_DEFAULT_READAHEAD_WHOLE_MAX;
-
+ sbi->ll_contention_time = SBI_DEFAULT_CONTENTION_SECONDS;
+ sbi->ll_lockless_truncate_enable = SBI_DEFAULT_LOCKLESS_TRUNCATE_ENABLE;
INIT_LIST_HEAD(&sbi->ll_conn_chain);
INIT_LIST_HEAD(&sbi->ll_orphan_dentry_list);
data->ocd_connect_flags = OBD_CONNECT_GRANT | OBD_CONNECT_VERSION |
OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE |
- OBD_CONNECT_CANCELSET | OBD_CONNECT_FID;
+ OBD_CONNECT_CANCELSET | OBD_CONNECT_FID |
+ OBD_CONNECT_SRVLOCK | OBD_CONNECT_TRUNCLOCK;
if (sbi->ll_flags & LL_SBI_OSS_CAPA)
data->ocd_connect_flags |= OBD_CONNECT_OSS_CAPA;
RETURN(rc);
}
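+/* Truncate with server-side locking when possible: if the server supports
+ * OBD_CONNECT_TRUNCLOCK and lockless truncate is enabled, reuse a cached
+ * lock via obd_match or punch the object under a server lock
+ * (ll_file_punch); otherwise take the usual client-side PW extent lock. */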
+static int ll_setattr_do_truncate(struct inode *inode, loff_t new_size)
+{
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct ll_inode_info *lli = ll_i2info(inode);
+ struct lov_stripe_md *lsm = lli->lli_smd;
+ int rc;
+ ldlm_policy_data_t policy = { .l_extent = {new_size,
+ OBD_OBJECT_EOF } };
+ struct lustre_handle lockh = { 0 };
+ int local_lock = 0; /* 0 - no local lock;
+ * 1 - lock taken by ll_extent_lock;
+ * 2 - by obd_match */
+ int ast_flags;
+ int err;
+ ENTRY;
+
+ UNLOCK_INODE_MUTEX(inode);
+ UP_WRITE_I_ALLOC_SEM(inode);
+
+ if (sbi->ll_lockless_truncate_enable &&
+ (sbi->ll_lco.lco_flags & OBD_CONNECT_TRUNCLOCK)) {
+ ast_flags = LDLM_FL_BLOCK_GRANTED;
+ rc = obd_match(sbi->ll_dt_exp, lsm, LDLM_EXTENT,
+ &policy, LCK_PW, &ast_flags, inode, &lockh);
+ if (rc > 0) {
+ local_lock = 2;
+ rc = 0;
+ } else if (rc == 0) {
+ rc = ll_file_punch(inode, new_size, 1);
+ }
+ } else {
+ /* XXX when we fix the AST intents to pass the discard-range
+ * XXX extent, make ast_flags always LDLM_AST_DISCARD_DATA
+ * XXX here. */
+ ast_flags = (new_size == 0) ? LDLM_AST_DISCARD_DATA : 0;
+ rc = ll_extent_lock(NULL, inode, lsm, LCK_PW, &policy,
+ &lockh, ast_flags);
+ if (likely(rc == 0))
+ local_lock = 1;
+ }
+
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+ DOWN_WRITE_I_ALLOC_SEM(inode);
+ LOCK_INODE_MUTEX(inode);
+#else
+ LOCK_INODE_MUTEX(inode);
+ DOWN_WRITE_I_ALLOC_SEM(inode);
+#endif
+ if (likely(rc == 0)) {
+ /* Only ll_inode_size_lock is taken at this level.
+ * lov_stripe_lock() is grabbed by ll_truncate() only over
+ * call to obd_adjust_kms(). If vmtruncate returns 0, then
+ * ll_truncate dropped ll_inode_size_lock() */
+ ll_inode_size_lock(inode, 0);
+ if (!local_lock) {
+ spin_lock(&lli->lli_lock);
+ lli->lli_flags |= LLIF_SRVLOCK;
+ spin_unlock(&lli->lli_lock);
+ }
+ rc = vmtruncate(inode, new_size);
+ if (!local_lock) {
+ spin_lock(&lli->lli_lock);
+ lli->lli_flags &= ~LLIF_SRVLOCK;
+ spin_unlock(&lli->lli_lock);
+ }
+ if (rc != 0) {
+ LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0);
+ ll_inode_size_unlock(inode, 0);
+ }
+ }
+
+ if (local_lock) {
+ if (local_lock == 2)
+ err = obd_cancel(sbi->ll_dt_exp, lsm, LCK_PW, &lockh);
+ else
+ err = ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh);
+ if (unlikely(err != 0)) {
+ CERROR("extent unlock failed: err=%d,"
+ " unlock method = %d\n", err, local_lock);
+ if (rc == 0)
+ rc = err;
+ }
+ }
+ RETURN(rc);
+}
+
/* If this inode has objects allocated to it (lsm != NULL), then the OST
* object(s) determine the file size and mtime. Otherwise, the MDS will
* keep these values until such a time that objects are allocated for it.
* last one is especially bad for racing o_append users on other
* nodes. */
if (ia_valid & ATTR_SIZE) {
- ldlm_policy_data_t policy = { .l_extent = {attr->ia_size,
- OBD_OBJECT_EOF } };
- struct lustre_handle lockh = { 0 };
- int err, ast_flags = 0;
- /* XXX when we fix the AST intents to pass the discard-range
- * XXX extent, make ast_flags always LDLM_AST_DISCARD_DATA
- * XXX here. */
- if (attr->ia_size == 0)
- ast_flags = LDLM_AST_DISCARD_DATA;
-
- UNLOCK_INODE_MUTEX(inode);
- UP_WRITE_I_ALLOC_SEM(inode);
- rc = ll_extent_lock(NULL, inode, lsm, LCK_PW, &policy, &lockh,
- ast_flags);
- LOCK_INODE_MUTEX(inode);
- DOWN_WRITE_I_ALLOC_SEM(inode);
-
- if (rc != 0)
- GOTO(out, rc);
-
- /* Only ll_inode_size_lock is taken at this level.
- * lov_stripe_lock() is grabbed by ll_truncate() only over
- * call to obd_adjust_kms(). If vmtruncate returns 0, then
- * ll_truncate dropped ll_inode_size_lock() */
- ll_inode_size_lock(inode, 0);
- rc = vmtruncate(inode, attr->ia_size);
- if (rc != 0) {
- LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0);
- ll_inode_size_unlock(inode, 0);
- }
-
- err = ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh);
- if (err) {
- CERROR("ll_extent_unlock failed: %d\n", err);
- if (!rc)
- rc = err;
- }
+ rc = ll_setattr_do_truncate(inode, attr->ia_size);
} else if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET)) {
obd_flag flags;
struct obd_info oinfo = { { { 0 } } };
[LLAP_ORIGIN_READAHEAD] = "ra",
[LLAP_ORIGIN_COMMIT_WRITE] = "cw",
[LLAP_ORIGIN_WRITEPAGE] = "wp",
+ [LLAP_ORIGIN_LOCKLESS_IO] = "ls"
};
struct ll_async_page *llite_pglist_next_llap(struct ll_sb_info *sbi,
RETURN(NULL);
}
+ ll_clear_file_contended(inode);
+
/* start and end the lock on the first and last bytes in the page */
policy_from_vma(&policy, vma, address, CFS_PAGE_SIZE);
return (ll_wr_track_id(buffer, count, data, STATS_TRACK_GID));
}
+static int ll_rd_contention_time(char *page, char **start, off_t off,
+ int count, int *eof, void *data)
+{
+ struct super_block *sb = data;
+
+ *eof = 1;
+ return snprintf(page, count, "%u\n", ll_s2sbi(sb)->ll_contention_time);
+}
+
+static int ll_wr_contention_time(struct file *file, const char *buffer,
+ unsigned long count, void *data)
+{
+ struct super_block *sb = data;
+ struct ll_sb_info *sbi = ll_s2sbi(sb);
+
+ return lprocfs_write_helper(buffer, count, &sbi->ll_contention_time) ?:
+ count;
+}
+
+static int ll_rd_lockless_truncate(char *page, char **start, off_t off,
+ int count, int *eof, void *data)
+{
+ struct super_block *sb = data;
+
+ *eof = 1;
+ return snprintf(page, count, "%u\n",
+ ll_s2sbi(sb)->ll_lockless_truncate_enable);
+}
+
+static int ll_wr_lockless_truncate(struct file *file, const char *buffer,
+ unsigned long count, void *data)
+{
+ struct super_block *sb = data;
+ struct ll_sb_info *sbi = ll_s2sbi(sb);
+
+ return lprocfs_write_helper(buffer, count,
+ &sbi->ll_lockless_truncate_enable)
+ ?: count;
+}
+
static struct lprocfs_vars lprocfs_llite_obd_vars[] = {
{ "uuid", ll_rd_sb_uuid, 0, 0 },
//{ "mntpt_path", ll_rd_path, 0, 0 },
{ "stats_track_pid", ll_rd_track_pid, ll_wr_track_pid, 0 },
{ "stats_track_ppid", ll_rd_track_ppid, ll_wr_track_ppid, 0 },
{ "stats_track_gid", ll_rd_track_gid, ll_wr_track_gid, 0 },
+ { "contention_seconds", ll_rd_contention_time, ll_wr_contention_time, 0},
+ { "lockless_truncate", ll_rd_lockless_truncate,
+ ll_wr_lockless_truncate, 0},
{ 0 }
};
/* inode operation */
{ LPROC_LL_SETATTR, LPROCFS_TYPE_REGS, "setattr" },
{ LPROC_LL_TRUNC, LPROCFS_TYPE_REGS, "truncate" },
+ { LPROC_LL_LOCKLESS_TRUNC, LPROCFS_TYPE_REGS, "lockless_truncate"},
{ LPROC_LL_FLOCK, LPROCFS_TYPE_REGS, "flock" },
{ LPROC_LL_GETATTR, LPROCFS_TYPE_REGS, "getattr" },
/* special inode operation */
"direct_read" },
{ LPROC_LL_DIRECT_WRITE, LPROCFS_CNTR_AVGMINMAX|LPROCFS_TYPE_PAGES,
"direct_write" },
+ { LPROC_LL_LOCKLESS_READ, LPROCFS_CNTR_AVGMINMAX|LPROCFS_TYPE_BYTES,
+ "lockless_read_bytes" },
+ { LPROC_LL_LOCKLESS_WRITE, LPROCFS_CNTR_AVGMINMAX|LPROCFS_TYPE_BYTES,
+ "lockless_write_bytes" },
};
RETURN(rc);
}
+int ll_file_punch(struct inode * inode, loff_t new_size, int srvlock)
+{
+ struct ll_inode_info *lli = ll_i2info(inode);
+ struct obd_info oinfo = { { { 0 } } };
+ struct obdo oa;
+ int rc;
+
+ ENTRY;
+ CDEBUG(D_INFO, "calling punch for "LPX64" (new size %Lu=%#Lx)\n",
+ lli->lli_smd->lsm_object_id, i_size_read(inode), i_size_read(inode));
+
+ oinfo.oi_md = lli->lli_smd;
+ oinfo.oi_policy.l_extent.start = new_size;
+ oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
+ oinfo.oi_oa = &oa;
+ oa.o_id = lli->lli_smd->lsm_object_id;
+ oa.o_gr = lli->lli_smd->lsm_object_gr;
+ oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+ if (srvlock) {
+ /* set OBD_MD_FLFLAGS in o_valid, only if we
+ * set OBD_FL_TRUNCLOCK, otherwise ost_punch
+ * and filter_setattr get confused, see the comment
+ * in ost_punch */
+ oa.o_flags = OBD_FL_TRUNCLOCK;
+ oa.o_valid |= OBD_MD_FLFLAGS;
+ }
+ obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |
+ OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME |
+ OBD_MD_FLFID | OBD_MD_FLGENER);
+
+ oinfo.oi_capa = ll_osscapa_get(inode, CAPA_OPC_OSS_TRUNC);
+ rc = obd_punch_rqset(ll_i2dtexp(inode), &oinfo, NULL);
+ ll_truncate_free_capa(oinfo.oi_capa);
+ if (rc)
+ CERROR("obd_truncate fails (%d) ino %lu\n", rc, inode->i_ino);
+ else
+ obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
+ OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME);
+ RETURN(rc);
+}
+
/* this isn't where truncate starts. roughly:
* sys_truncate->ll_setattr_raw->vmtruncate->ll_truncate. setattr_raw grabs
* DLM lock on [size, EOF], i_mutex, ->lli_size_sem, and WRITE_I_ALLOC_SEM to
void ll_truncate(struct inode *inode)
{
struct ll_inode_info *lli = ll_i2info(inode);
- struct obd_info oinfo = { { { 0 } } };
- struct ost_lvb lvb;
- struct obdo oa;
- int rc;
+ int srvlock = !!(lli->lli_flags & LLIF_SRVLOCK);
+ loff_t new_size;
ENTRY;
CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) to %Lu=%#Lx\n",inode->i_ino,
inode->i_generation, inode, i_size_read(inode),
LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0);
- /* XXX I'm pretty sure this is a hack to paper over a more fundamental
- * race condition. */
- lov_stripe_lock(lli->lli_smd);
- inode_init_lvb(inode, &lvb);
- rc = obd_merge_lvb(ll_i2dtexp(inode), lli->lli_smd, &lvb, 0);
- if (lvb.lvb_size == i_size_read(inode) && rc == 0) {
- CDEBUG(D_VFSTRACE, "skipping punch for obj "LPX64", %Lu=%#Lx\n",
- lli->lli_smd->lsm_object_id, i_size_read(inode),
- i_size_read(inode));
+ if (!srvlock) {
+ struct ost_lvb lvb;
+ int rc;
+
+ /* XXX I'm pretty sure this is a hack to paper
+ * over a more fundamental race condition. */
+ lov_stripe_lock(lli->lli_smd);
+ inode_init_lvb(inode, &lvb);
+ rc = obd_merge_lvb(ll_i2dtexp(inode), lli->lli_smd, &lvb, 0);
+ if (lvb.lvb_size == i_size_read(inode) && rc == 0) {
+ CDEBUG(D_VFSTRACE, "skipping punch for obj "LPX64
+ ",%Lu=%#Lx\n", lli->lli_smd->lsm_object_id,
+ i_size_read(inode), i_size_read(inode));
+ lov_stripe_unlock(lli->lli_smd);
+ GOTO(out_unlock, 0);
+ }
+ obd_adjust_kms(ll_i2dtexp(inode), lli->lli_smd,
+ i_size_read(inode), 1);
lov_stripe_unlock(lli->lli_smd);
- GOTO(out_unlock, 0);
}
- obd_adjust_kms(ll_i2dtexp(inode), lli->lli_smd, i_size_read(inode), 1);
- lov_stripe_unlock(lli->lli_smd);
-
if (unlikely((ll_i2sbi(inode)->ll_flags & LL_SBI_CHECKSUM) &&
(i_size_read(inode) & ~CFS_PAGE_MASK))) {
/* If the truncate leaves behind a partial page, update its
}
}
- CDEBUG(D_INFO, "calling punch for "LPX64" (new size %Lu=%#Lx)\n",
- lli->lli_smd->lsm_object_id, i_size_read(inode), i_size_read(inode));
-
- oinfo.oi_md = lli->lli_smd;
- oinfo.oi_policy.l_extent.start = i_size_read(inode);
- oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
- oinfo.oi_oa = &oa;
- oa.o_id = lli->lli_smd->lsm_object_id;
- oa.o_gr = lli->lli_smd->lsm_object_gr;
- oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
-
- obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |
- OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME |
- OBD_MD_FLFID | OBD_MD_FLGENER);
-
+ new_size = i_size_read(inode);
ll_inode_size_unlock(inode, 0);
-
- oinfo.oi_capa = ll_osscapa_get(inode, CAPA_OPC_OSS_TRUNC);
- rc = obd_punch_rqset(ll_i2dtexp(inode), &oinfo, NULL);
- ll_truncate_free_capa(oinfo.oi_capa);
- if (rc)
- CERROR("obd_truncate fails (%d) ino %lu\n", rc, inode->i_ino);
+ if (!srvlock)
+ ll_file_punch(inode, new_size, 0);
else
- obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
- OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME);
+ ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LOCKLESS_TRUNC, 1);
+
EXIT;
return;
OSC_DEFAULT_CKSUM);
kunmap_atomic(kaddr, KM_USER0);
if (origin == LLAP_ORIGIN_READAHEAD ||
- origin == LLAP_ORIGIN_READPAGE) {
+ origin == LLAP_ORIGIN_READPAGE ||
+ origin == LLAP_ORIGIN_LOCKLESS_IO) {
llap->llap_checksum = 0;
} else if (origin == LLAP_ORIGIN_COMMIT_WRITE ||
llap->llap_checksum == 0) {
RETURN(ret);
}
-/* the kernel calls us here when a page is unhashed from the page cache.
- * the page will be locked and the kernel is holding a spinlock, so
- * we need to be careful. we're just tearing down our book-keeping
- * here. */
-void ll_removepage(struct page *page)
+static void __ll_put_llap(struct page *page)
{
struct inode *inode = page->mapping->host;
struct obd_export *exp;
int rc;
ENTRY;
- LASSERT(!in_interrupt());
-
- /* sync pages or failed read pages can leave pages in the page
- * cache that don't have our data associated with them anymore */
- if (page_private(page) == 0) {
- EXIT;
- return;
- }
-
- LL_CDEBUG_PAGE(D_PAGE, page, "being evicted\n");
-
exp = ll_i2dtexp(inode);
if (exp == NULL) {
CERROR("page %p ind %lu gave null export\n", page, page->index);
EXIT;
}
+/* the kernel calls us here when a page is unhashed from the page cache.
+ * the page will be locked and the kernel is holding a spinlock, so
+ * we need to be careful. we're just tearing down our book-keeping
+ * here. */
+void ll_removepage(struct page *page)
+{
+ ENTRY;
+
+ LASSERT(!in_interrupt());
+
+ /* sync pages or failed read pages can leave pages in the page
+ * cache that don't have our data associated with them anymore */
+ if (page_private(page) == 0) {
+ EXIT;
+ return;
+ }
+
+ LASSERT(!llap_cast_private(page)->llap_lockless_io_page);
+ LL_CDEBUG_PAGE(D_PAGE, page, "being evicted\n");
+ __ll_put_llap(page);
+ EXIT;
+}
+
static int ll_page_matches(struct page *page, int fd_flags)
{
struct lustre_handle match_lockh = {0};
oig_release(oig);
RETURN(rc);
}
+
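+/* Release the temporary pages obtained from ll_file_prepare_pages along
+ * with their llap bookkeeping. */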
+static void ll_file_put_pages(struct page **pages, int numpages)
+{
+ int i;
+ struct page **pp;
+ ENTRY;
+
+ for (i = 0, pp = pages; i < numpages; i++, pp++) {
+ if (*pp) {
+ LL_CDEBUG_PAGE(D_PAGE, (*pp), "free\n");
+ __ll_put_llap(*pp);
+ if (page_private(*pp))
+ CERROR("the llap wasn't freed\n");
+ (*pp)->mapping = NULL;
+ if (page_count(*pp) != 1)
+ CERROR("page %p, flags %#lx, count %i, private %p\n",
+ (*pp), (unsigned long)(*pp)->flags, page_count(*pp),
+ (void*)page_private(*pp));
+ __free_pages(*pp, 0);
+ }
+ }
+ OBD_FREE(pages, numpages * sizeof(struct page*));
+ EXIT;
+}
+
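+/* Allocate @numpages temporary pages starting at page index @first and
+ * attach an llap (marked llap_lockless_io_page) to each so they can be
+ * driven through the usual async page machinery. */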
+static struct page **ll_file_prepare_pages(int numpages, struct inode *inode,
+ unsigned long first)
+{
+ struct page **pages;
+ int i;
+ int rc = 0;
+ ENTRY;
+
+ OBD_ALLOC(pages, sizeof(struct page *) * numpages);
+ if (pages == NULL)
+ RETURN(ERR_PTR(-ENOMEM));
+ for (i = 0; i < numpages; i++) {
+ struct page *page;
+ struct ll_async_page *llap;
+
+ page = alloc_pages(GFP_HIGHUSER, 0);
+ if (page == NULL)
+ GOTO(err, rc = -ENOMEM);
+ pages[i] = page;
+ /* llap_from_page needs page index and mapping to be set */
+ page->index = first++;
+ page->mapping = inode->i_mapping;
+ llap = llap_from_page(page, LLAP_ORIGIN_LOCKLESS_IO);
+ if (IS_ERR(llap))
+ GOTO(err, rc = PTR_ERR(llap));
+ llap->llap_lockless_io_page = 1;
+ }
+ RETURN(pages);
+err:
+ ll_file_put_pages(pages, numpages);
+ RETURN(ERR_PTR(rc));
+}
+
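+/* Copy user data into (WRITE) or out of (READ) the staged pages; returns
+ * the number of bytes copied, or -EFAULT if nothing could be copied. */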
+static ssize_t ll_file_copy_pages(struct page **pages, int numpages,
+ char *buf, loff_t pos, size_t count, int rw)
+{
+ ssize_t amount = 0;
+ int i;
+ int updatechecksum = ll_i2sbi(pages[0]->mapping->host)->ll_flags &
+ LL_SBI_CHECKSUM;
+ ENTRY;
+
+ for (i = 0; i < numpages; i++) {
+ unsigned offset, bytes, left;
+ char *vaddr;
+
+ vaddr = kmap(pages[i]);
+ offset = pos & (CFS_PAGE_SIZE - 1);
+ bytes = min_t(unsigned, CFS_PAGE_SIZE - offset, count);
+ LL_CDEBUG_PAGE(D_PAGE, pages[i], "op = %s, addr = %p, "
+ "buf = %p, bytes = %u\n",
+ (rw == WRITE) ? "CFU" : "CTU",
+ vaddr + offset, buf, bytes);
+ if (rw == WRITE) {
+ left = copy_from_user(vaddr + offset, buf, bytes);
+ if (updatechecksum) {
+ struct ll_async_page *llap;
+
+ llap = llap_cast_private(pages[i]);
+ llap->llap_checksum = crc32_le(0, vaddr,
+ CFS_PAGE_SIZE);
+ }
+ } else {
+ left = copy_to_user(buf, vaddr + offset, bytes);
+ }
+ kunmap(pages[i]);
+ amount += bytes;
+ if (left) {
+ amount -= left;
+ break;
+ }
+ buf += bytes;
+ count -= bytes;
+ pos += bytes;
+ }
+ if (amount == 0)
+ RETURN(-EFAULT);
+ RETURN(amount);
+}
+
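+/* Queue each staged page for group i/o with OBD_BRW_SRVLOCK set (the OST
+ * takes the DLM lock on the client's behalf), trigger the i/o and wait
+ * for completion. */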
+static int ll_file_oig_pages(struct inode * inode, struct page **pages,
+ int numpages, loff_t pos, size_t count, int rw)
+{
+ struct obd_io_group *oig;
+ struct ll_inode_info *lli = ll_i2info(inode);
+ struct obd_export *exp;
+ loff_t org_pos = pos;
+ obd_flag brw_flags;
+ int rc;
+ int i;
+ ENTRY;
+
+ exp = ll_i2dtexp(inode);
+ if (exp == NULL)
+ RETURN(-EINVAL);
+ rc = oig_init(&oig);
+ if (rc)
+ RETURN(rc);
+ brw_flags = OBD_BRW_SRVLOCK;
+ if (capable(CAP_SYS_RESOURCE))
+ brw_flags |= OBD_BRW_NOQUOTA;
+
+ for (i = 0; i < numpages; i++) {
+ struct ll_async_page *llap;
+ unsigned from, bytes;
+
+ from = pos & (CFS_PAGE_SIZE - 1);
+ bytes = min_t(unsigned, CFS_PAGE_SIZE - from,
+ count - pos + org_pos);
+ llap = llap_cast_private(pages[i]);
+ LASSERT(llap);
+
+ lock_page(pages[i]);
+
+ LL_CDEBUG_PAGE(D_PAGE, pages[i], "offset "LPU64","
+ " from %u, bytes = %u\n",
+ pos, from, bytes);
+ LASSERTF(pos >> CFS_PAGE_SHIFT == pages[i]->index,
+ "wrong page index %lu (%lu)\n",
+ pages[i]->index,
+ (unsigned long)(pos >> CFS_PAGE_SHIFT));
+ rc = obd_queue_group_io(exp, lli->lli_smd, NULL, oig,
+ llap->llap_cookie,
+ (rw == WRITE) ?
+ OBD_BRW_WRITE:OBD_BRW_READ,
+ from, bytes, brw_flags,
+ ASYNC_READY | ASYNC_URGENT |
+ ASYNC_COUNT_STABLE | ASYNC_GROUP_SYNC);
+ if (rc) {
+ i++;
+ GOTO(out, rc);
+ }
+ pos += bytes;
+ }
+ rc = obd_trigger_group_io(exp, lli->lli_smd, NULL, oig);
+ if (rc)
+ GOTO(out, rc);
+ rc = oig_wait(oig);
+out:
+ while (--i >= 0)
+ unlock_page(pages[i]);
+ oig_release(oig);
+ RETURN(rc);
+}
+
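+/* Perform i/o without client DLM locks: split the range into chunks of at
+ * most PTLRPC_MAX_BRW_PAGES * stripe_count pages, stage each chunk in
+ * temporary pages and transfer it under server-side locking. Because no
+ * locks are held, i_size and KMS are deliberately not updated here. */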
+ssize_t ll_file_lockless_io(struct file *file, char *buf, size_t count,
+ loff_t *ppos, int rw)
+{
+ loff_t pos;
+ struct inode *inode = file->f_dentry->d_inode;
+ ssize_t rc = 0;
+ int max_pages;
+ size_t amount = 0;
+ unsigned long first, last;
+ ENTRY;
+
+ if (rw == READ) {
+ loff_t isize;
+
+ ll_inode_size_lock(inode, 0);
+ isize = i_size_read(inode);
+ ll_inode_size_unlock(inode, 0);
+ if (*ppos >= isize)
+ GOTO(out, rc = 0);
+ if (*ppos + count >= isize)
+ count -= *ppos + count - isize;
+ if (count == 0)
+ GOTO(out, rc);
+ } else {
+ rc = generic_write_checks(file, ppos, &count, 0);
+ if (rc)
+ GOTO(out, rc);
+ rc = remove_suid(file->f_dentry);
+ if (rc)
+ GOTO(out, rc);
+ }
+ pos = *ppos;
+ first = pos >> CFS_PAGE_SHIFT;
+ last = (pos + count - 1) >> CFS_PAGE_SHIFT;
+ max_pages = PTLRPC_MAX_BRW_PAGES *
+ ll_i2info(inode)->lli_smd->lsm_stripe_count;
+ CDEBUG(D_INFO, "%u, stripe_count = %u\n",
+ PTLRPC_MAX_BRW_PAGES /* max_pages_per_rpc */,
+ ll_i2info(inode)->lli_smd->lsm_stripe_count);
+
+ while (first <= last && rc >= 0) {
+ int pages_for_io;
+ struct page **pages;
+ size_t bytes = count - amount;
+
+ pages_for_io = min_t(int, last - first + 1, max_pages);
+ pages = ll_file_prepare_pages(pages_for_io, inode, first);
+ if (IS_ERR(pages)) {
+ rc = PTR_ERR(pages);
+ break;
+ }
+ if (rw == WRITE) {
+ rc = ll_file_copy_pages(pages, pages_for_io, buf,
+ pos + amount, bytes, rw);
+ if (rc < 0)
+ GOTO(put_pages, rc);
+ bytes = rc;
+ }
+ rc = ll_file_oig_pages(inode, pages, pages_for_io,
+ pos + amount, bytes, rw);
+ if (rc)
+ GOTO(put_pages, rc);
+ if (rw == READ) {
+ rc = ll_file_copy_pages(pages, pages_for_io, buf,
+ pos + amount, bytes, rw);
+ if (rc < 0)
+ GOTO(put_pages, rc);
+ bytes = rc;
+ }
+ amount += bytes;
+ buf += bytes;
+put_pages:
+ ll_file_put_pages(pages, pages_for_io);
+ first += pages_for_io;
+ /* a short read/write check */
+ if (pos + amount < ((loff_t)first << CFS_PAGE_SHIFT))
+ break;
+ }
+ /* NOTE: don't update i_size and KMS in the absence of LDLM locks even
+ * if the write makes the file larger */
+ file_accessed(file);
+ if (rw == READ && amount < count && rc == 0) {
+ unsigned long not_cleared;
+
+ not_cleared = clear_user(buf, count - amount);
+ amount = count - not_cleared;
+ if (not_cleared)
+ rc = -EFAULT;
+ }
+ if (amount > 0) {
+ lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
+ (rw == WRITE) ?
+ LPROC_LL_LOCKLESS_WRITE :
+ LPROC_LL_LOCKLESS_READ,
+ (long)amount);
+ *ppos += amount;
+ RETURN(amount);
+ }
+out:
+ RETURN(rc);
+}
memset(lov_lockhp, 0, sizeof(*lov_lockhp));
if (lov->lov_tgts[req->rq_idx] &&
lov->lov_tgts[req->rq_idx]->ltd_active) {
- if (rc != -EINTR)
+ /* -EUSERS used by OST to report file contention */
+ if (rc != -EINTR && rc != -EUSERS)
CERROR("enqueue objid "LPX64" subobj "
LPX64" on OST idx %d: rc %d\n",
set->set_oi->oi_md->lsm_object_id,
CFS_LIST_HEAD(rpc_list);
unsigned int ending_offset;
unsigned starting_offset = 0;
+ int srvlock = 0;
ENTRY;
/* first we find the pages we're allowed to work with */
LASSERT(oap->oap_magic == OAP_MAGIC);
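+ /* lockless (OBD_BRW_SRVLOCK) and ordinary pages must not be
+ * mixed in a single RPC, so stop gathering when the flag
+ * changes */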
+ if (page_count != 0 &&
+ srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
+ CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
+ " oap %p, page %p, srvlock %u\n",
+ oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
+ break;
+ }
/* in llite being 'ready' equates to the page being locked
* until completion unlocks it. commit_write submits a page
* as not ready because its unlock will happen unconditionally
/* now put the page back in our accounting */
list_add_tail(&oap->oap_rpc_item, &rpc_list);
+ if (page_count == 0)
+ srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
if (++page_count >= cli->cl_max_pages_per_rpc)
break;
}
run_test 31 "voluntary cancel / blocking ast race=============="
+# enable/disable lockless truncate feature, depending on the arg 0/1
+enable_lockless_truncate() {
+ lctl set_param -n llite.*.lockless_truncate $1
+}
+
+test_32a() { # bug 11270
+ local p="$TMP/sanityN-$TESTNAME.parameters"
+ save_lustre_params $HOSTNAME llite.*.lockless_truncate > $p
+ cancel_lru_locks osc
+ clear_llite_stats
+ enable_lockless_truncate 1
+ dd if=/dev/zero of=$DIR1/$tfile count=10 bs=1M > /dev/null 2>&1
+
+ log "checking cached lockless truncate"
+ $TRUNCATE $DIR1/$tfile 8000000
+ $CHECKSTAT -s 8000000 $DIR2/$tfile || error "wrong file size"
+ [ $(calc_llite_stats lockless_truncate) -eq 0 ] ||
+ error "lockless truncate doesn't use cached locks"
+
+ log "checking not cached lockless truncate"
+ $TRUNCATE $DIR2/$tfile 5000000
+ $CHECKSTAT -s 5000000 $DIR1/$tfile || error "wrong file size"
+ [ $(calc_llite_stats lockless_truncate) -ne 0 ] ||
+ error "not cached trancate isn't lockless"
+
+ log "disabled lockless truncate"
+ enable_lockless_truncate 0
+ clear_llite_stats
+ $TRUNCATE $DIR2/$tfile 3000000
+ $CHECKSTAT -s 3000000 $DIR1/$tfile || error "wrong file size"
+ [ $(calc_llite_stats lockless_truncate) -eq 0 ] ||
+ error "lockless truncate disabling failed"
+ rm $DIR1/$tfile
+ # restore lockless_truncate default values
+ restore_lustre_params < $p
+ rm -f $p
+}
+run_test 32a "lockless truncate"
+
+test_32b() { # bug 11270
+ local node
+ local p="$TMP/sanityN-$TESTNAME.parameters"
+ save_lustre_params $HOSTNAME "llite.*.contention_seconds" > $p
+ for node in $(osts_nodes); do
+ save_lustre_params $node "ldlm.namespaces.filter-*.max_nolock_bytes" >> $p
+ save_lustre_params $node "ldlm.namespaces.filter-*.contended_locks" >> $p
+ save_lustre_params $node "ldlm.namespaces.filter-*.contention_seconds" >> $p
+ done
+ clear_llite_stats
+ # aggressive lockless i/o settings
+ for node in $(osts_nodes); do
+ do_node $node 'lctl set_param -n ldlm.namespaces.filter-*.max_nolock_bytes 2000000; lctl set_param -n ldlm.namespaces.filter-*.contended_locks 0; lctl set_param -n ldlm.namespaces.filter-*.contention_seconds 60'
+ done
+ lctl set_param -n llite.*.contention_seconds 60
+ for i in $(seq 5); do
+ dd if=/dev/zero of=$DIR1/$tfile bs=4k count=1 conv=notrunc > /dev/null 2>&1
+ dd if=/dev/zero of=$DIR2/$tfile bs=4k count=1 conv=notrunc > /dev/null 2>&1
+ done
+ [ $(calc_llite_stats lockless_write_bytes) -ne 0 ] || error "lockless i/o was not triggered"
+ # disable lockless i/o (it is disabled by default)
+ for node in $(osts_nodes); do
+ do_node $node 'lctl set_param -n ldlm.namespaces.filter-*.max_nolock_bytes 0; lctl set_param -n ldlm.namespaces.filter-*.contended_locks 32; lctl set_param -n ldlm.namespaces.filter-*.contention_seconds 0'
+ done
+ # set contention_seconds to 0 at client too, otherwise Lustre still
+ # remembers lock contention
+ lctl set_param -n llite.*.contention_seconds 0
+ clear_llite_stats
+ for i in $(seq 5); do
+ dd if=/dev/zero of=$DIR1/$tfile bs=4k count=1 conv=notrunc > /dev/null 2>&1
+ dd if=/dev/zero of=$DIR2/$tfile bs=4k count=1 conv=notrunc > /dev/null 2>&1
+ done
+ [ $(calc_llite_stats lockless_write_bytes) -eq 0 ] ||
+ error "lockless i/o works when disabled"
+ rm -f $DIR1/$tfile
+ restore_lustre_params <$p
+ rm -f $p
+}
+run_test 32b "lockless i/o"
+
log "cleanup: ======================================================"
check_and_cleanup_lustre
return 0
}
+
+# reset llite stat counters
+clear_llite_stats(){
+ lctl set_param -n llite.*.stats 0
+}
+
+# sum llite stat items
+calc_llite_stats() {
+ local res=$(lctl get_param -n llite.*.stats |
+ awk 'BEGIN {s = 0} END {print s} /^'"$1"'/ {s += $2}')
+ echo $res
+}
+
+# save_lustre_params(node, parameter_mask)
+# generate a stream of formatted strings (<node> <param name>=<param value>)
+save_lustre_params() {
+ local s
+ do_node $1 "lctl get_param $2" | while read s; do echo "$1 $s"; done
+}
+
+# restore lustre parameters from input stream, produced by save_lustre_params
+restore_lustre_params() {
+ local node
+ local name
+ local val
+ while IFS=" =" read node name val; do
+ do_node $node "lctl set_param -n $name $val"
+ done
+}
+