reconnect to the last active server first before trying the
other potential connections.
+Severity : enhancement
+Bugzilla : 11270
+Description: eliminate client locks in the face of contention
+Details    : file contention detection and lockless I/O implementation
+ for contended files.
+
--------------------------------------------------------------------------------
2007-08-27 Cluster File Systems, Inc. <info@clusterfs.com>
LPROC_LL_INODE_PERM,
LPROC_LL_DIRECT_READ,
LPROC_LL_DIRECT_WRITE,
+ LPROC_LL_LOCKLESS_READ,
+ LPROC_LL_LOCKLESS_WRITE,
LPROC_LL_FILE_OPCODES
};
int buf_idx, int increase);
};
+/* default values for the "max_nolock_bytes", "contention_seconds"
+ * and "contended_locks" namespace tunables */
+#define NS_DEFAULT_MAX_NOLOCK_BYTES 131072
+#define NS_DEFAULT_CONTENTION_SECONDS 2
+#define NS_DEFAULT_CONTENDED_LOCKS 0
+
struct ldlm_namespace {
char *ns_name;
__u32 ns_client; /* is this a client-side lock tree? */
struct ldlm_valblock_ops *ns_lvbo;
void *ns_lvbp;
cfs_waitq_t ns_waitq;
+        /* if more than @ns_contended_locks are found, the resource is
+         * considered contended */
+        unsigned               ns_contended_locks;
+        /* the resource remembers its contended state for
+         * @ns_contention_time seconds */
+        unsigned               ns_contention_time;
+        /* maximum size, in bytes, of lockless (nolock) requests */
+ unsigned ns_max_nolock_size;
};
/*
struct semaphore lr_lvb_sem;
__u32 lr_lvb_len;
void *lr_lvb_data;
+
+        /* time at which the resource was last considered contended */
+ cfs_time_t lr_contention_time;
};
struct ldlm_ast_work {
void ldlm_resource_iterate(struct ldlm_namespace *, struct ldlm_res_id *,
ldlm_iterator_t iter, void *data);
+/* measure lock contention and return -EBUSY if contention is high */
+#define LDLM_FL_DENY_ON_CONTENTION 0x10000000
/* ldlm_flock.c */
int ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data);
#include <lustre_dlm.h>
#include <obd_support.h>
+#include <obd.h>
#include <lustre_lib.h>
#include "ldlm_internal.h"
}
}
+static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
+{
+ struct ldlm_resource *res = lock->l_resource;
+ cfs_time_t now = cfs_time_current();
+
+ CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
+ if (contended_locks > res->lr_namespace->ns_contended_locks)
+ res->lr_contention_time = now;
+ return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
+ cfs_time_seconds(res->lr_namespace->ns_contention_time)));
+}
+
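The contention window above is easiest to see in isolation. Below is a minimal
userspace model of the same logic, a sketch only: it uses plain time_t instead
of the wraparound-safe cfs_time_* helpers, the default tunable values defined
earlier, and illustrative names.

#include <stdio.h>
#include <time.h>

#define NS_CONTENDED_LOCKS    0  /* NS_DEFAULT_CONTENDED_LOCKS */
#define NS_CONTENTION_SECONDS 2  /* NS_DEFAULT_CONTENTION_SECONDS */

/* models ldlm_resource::lr_contention_time for a single resource */
static time_t lr_contention_time;

static int check_contention(int contended_locks)
{
        time_t now = time(NULL);

        /* more conflicting locks than the threshold: (re)open the window */
        if (contended_locks > NS_CONTENDED_LOCKS)
                lr_contention_time = now;
        /* the resource counts as contended while inside the window */
        return now < lr_contention_time + NS_CONTENTION_SECONDS;
}

int main(void)
{
        printf("%d\n", check_contention(1)); /* 1: window just opened */
        printf("%d\n", check_contention(0)); /* 1: still within 2 seconds */
        return 0;
}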
/* Determine if the lock is compatible with all locks on the queue.
* We stop walking the queue if we hit ourselves so we don't take
 * conflicting locks enqueued after us into account, or we'd wait forever.
static int
ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
int *flags, ldlm_error_t *err,
- struct list_head *work_list)
+ struct list_head *work_list, int *contended_locks)
{
struct list_head *tmp;
struct ldlm_lock *lock;
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
if (req == lock)
- RETURN(compat);
+ break;
if (unlikely(scan)) {
/* We only get here if we are queuing GROUP lock
ldlm_resource_insert_lock_after(lock, req);
list_del_init(&lock->l_res_link);
ldlm_resource_insert_lock_after(req, lock);
- RETURN(0);
+ compat = 0;
+ break;
}
if (req->l_policy_data.l_extent.gid ==
lock->l_policy_data.l_extent.gid) {
/* found it */
ldlm_resource_insert_lock_after(lock, req);
- RETURN(0);
+ compat = 0;
+ break;
}
continue;
}
ldlm_resource_insert_lock_after(lock, req);
list_del_init(&lock->l_res_link);
ldlm_resource_insert_lock_after(req, lock);
- RETURN(0);
+ break;
}
if (req->l_policy_data.l_extent.gid ==
lock->l_policy_data.l_extent.gid) {
/* found it */
ldlm_resource_insert_lock_after(lock, req);
- RETURN(0);
+ break;
}
continue;
}
if (!work_list)
RETURN(0);
+ /* don't count conflicting glimpse locks */
+ *contended_locks +=
+ !(lock->l_req_mode == LCK_PR &&
+ lock->l_policy_data.l_extent.start == 0 &&
+ lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF);
+
compat = 0;
if (lock->l_blocking_ast)
ldlm_add_ast_work_item(lock, req, work_list);
}
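+        /* ldlm_check_contention() is evaluated first so that the resource's
+         * contention timestamp is refreshed even when the request ends up
+         * being granted */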
+ if (ldlm_check_contention(req, *contended_locks) &&
+ compat == 0 &&
+ (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
+ req->l_req_mode != LCK_GROUP &&
+ req_end - req_start <=
+ req->l_resource->lr_namespace->ns_max_nolock_size)
+ GOTO(destroylock, compat = -EBUSY);
+
RETURN(compat);
destroylock:
list_del_init(&req->l_res_link);
RETURN(compat);
}
+static void discard_bl_list(struct list_head *bl_list)
+{
+ struct list_head *tmp, *pos;
+ ENTRY;
+
+ list_for_each_safe(pos, tmp, bl_list) {
+ struct ldlm_lock *lock =
+ list_entry(pos, struct ldlm_lock, l_bl_ast);
+
+ list_del_init(&lock->l_bl_ast);
+ LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
+ lock->l_flags &= ~LDLM_FL_AST_SENT;
+ LASSERT(lock->l_bl_ast_run == 0);
+ LASSERT(lock->l_blocking_lock);
+ LDLM_LOCK_PUT(lock->l_blocking_lock);
+ lock->l_blocking_lock = NULL;
+ LDLM_LOCK_PUT(lock);
+ }
+ EXIT;
+}
+
/* If first_enq is 0 (ie, called from ldlm_reprocess_queue):
* - blocking ASTs have already been sent
* - must call this function with the ns lock held
struct ldlm_resource *res = lock->l_resource;
struct list_head rpc_list = CFS_LIST_HEAD_INIT(rpc_list);
int rc, rc2;
+ int contended_locks = 0;
ENTRY;
LASSERT(list_empty(&res->lr_converting));
+ LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
+ !(lock->l_flags & LDLM_AST_DISCARD_DATA));
check_res_locked(res);
*err = ELDLM_OK;
* being true, we want to find out. */
LASSERT(*flags == 0);
rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
- err, NULL);
+ err, NULL, &contended_locks);
if (rc == 1) {
rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
- flags, err, NULL);
+ flags, err, NULL,
+ &contended_locks);
}
if (rc == 0)
RETURN(LDLM_ITER_STOP);
}
restart:
- rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err, &rpc_list);
+ contended_locks = 0;
+ rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
+ &rpc_list, &contended_locks);
if (rc < 0)
GOTO(out, rc); /* lock was destroyed */
if (rc == 2)
goto grant;
- rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err, &rpc_list);
+ rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
+ &rpc_list, &contended_locks);
if (rc2 < 0)
GOTO(out, rc = rc2); /* lock was destroyed */
*flags |= LDLM_FL_NO_TIMEOUT;
}
- rc = 0;
+ RETURN(0);
out:
+ if (!list_empty(&rpc_list)) {
+ LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
+ discard_bl_list(&rpc_list);
+ }
RETURN(rc);
}
lock_vars[0].read_fptr = lprocfs_uint_rd;
lock_vars[0].write_fptr = lprocfs_uint_wr;
lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
+ } else {
+ snprintf(lock_name, MAX_STRING_SIZE, "%s/max_nolock_bytes",
+ ns->ns_name);
+ lock_vars[0].data = &ns->ns_max_nolock_size;
+ lock_vars[0].read_fptr = lprocfs_uint_rd;
+ lock_vars[0].write_fptr = lprocfs_uint_wr;
+ lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
+
+ snprintf(lock_name, MAX_STRING_SIZE, "%s/contention_seconds",
+ ns->ns_name);
+ lock_vars[0].data = &ns->ns_contention_time;
+ lock_vars[0].read_fptr = lprocfs_uint_rd;
+ lock_vars[0].write_fptr = lprocfs_uint_wr;
+ lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
+
+ snprintf(lock_name, MAX_STRING_SIZE, "%s/contended_locks",
+ ns->ns_name);
+ lock_vars[0].data = &ns->ns_contended_locks;
+ lock_vars[0].read_fptr = lprocfs_uint_rd;
+ lock_vars[0].write_fptr = lprocfs_uint_wr;
+ lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
}
}
#undef MAX_STRING_SIZE
atomic_set(&ns->ns_locks, 0);
ns->ns_resources = 0;
cfs_waitq_init(&ns->ns_waitq);
+ ns->ns_max_nolock_size = NS_DEFAULT_MAX_NOLOCK_BYTES;
+ ns->ns_contention_time = NS_DEFAULT_CONTENTION_SECONDS;
+ ns->ns_contended_locks = NS_DEFAULT_CONTENDED_LOCKS;
for (bucket = ns->ns_hash + RES_HASH_SIZE - 1; bucket >= ns->ns_hash;
bucket--)
RETURN(rc);
}
+static void ll_set_file_contended(struct inode *inode)
+{
+ struct ll_inode_info *lli = ll_i2info(inode);
+
+ lli->lli_contention_time = cfs_time_current();
+ set_bit(LLI_F_CONTENDED, &lli->lli_flags);
+}
+
+void ll_clear_file_contended(struct inode *inode)
+{
+ struct ll_inode_info *lli = ll_i2info(inode);
+
+ clear_bit(LLI_F_CONTENDED, &lli->lli_flags);
+}
+
+static int ll_is_file_contended(struct file *file)
+{
+ struct inode *inode = file->f_dentry->d_inode;
+ struct ll_inode_info *lli = ll_i2info(inode);
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+ ENTRY;
+
+ if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
+                CDEBUG(D_INFO, "the server does not support the SRVLOCK feature,"
+ " osc connect flags = 0x"LPX64"\n",
+ sbi->ll_lco.lco_flags);
+ RETURN(0);
+ }
+ if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
+ RETURN(1);
+ if (test_bit(LLI_F_CONTENDED, &lli->lli_flags)) {
+ cfs_time_t cur_time = cfs_time_current();
+ cfs_time_t retry_time;
+
+ retry_time = cfs_time_add(
+ lli->lli_contention_time,
+ cfs_time_seconds(sbi->ll_contention_time));
+ if (cfs_time_after(cur_time, retry_time)) {
+ ll_clear_file_contended(inode);
+ RETURN(0);
+ }
+ RETURN(1);
+ }
+ RETURN(0);
+}
+
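On the client, the contended state is just a per-inode flag plus timestamp:
it is re-armed on every -EBUSY and dropped once ll_contention_time (60 seconds
by default) elapses. A minimal userspace model of the
ll_set_file_contended()/ll_is_file_contended() pair, with illustrative names:

#include <stdio.h>
#include <time.h>

#define CONTENTION_SECONDS 60  /* SBI_DEFAULT_CONTENTION_SECONDS */

struct model_inode {
        time_t contention_time;  /* models lli_contention_time */
        int    contended;        /* models the LLI_F_CONTENDED bit */
};

static void set_contended(struct model_inode *i)
{
        i->contention_time = time(NULL);
        i->contended = 1;
}

static int is_contended(struct model_inode *i)
{
        if (!i->contended)
                return 0;
        if (time(NULL) > i->contention_time + CONTENTION_SECONDS) {
                i->contended = 0;  /* window expired: retry DLM locking */
                return 0;
        }
        return 1;
}

int main(void)
{
        struct model_inode ino = { 0, 0 };

        set_contended(&ino);                /* the server returned -EBUSY */
        printf("%d\n", is_contended(&ino)); /* 1: stay lockless for 60s */
        return 0;
}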
+static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
+ const char *buf, size_t count,
+ loff_t start, loff_t end, int rw)
+{
+        int append;
+        int tree_locked = 0;
+        int rc;
+        struct inode *inode = file->f_dentry->d_inode;
+        ENTRY;
+
+        append = (rw == WRITE) && (file->f_flags & O_APPEND);
+
+ if (append || !ll_is_file_contended(file)) {
+ struct ll_lock_tree_node *node;
+ int ast_flags;
+
+ ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
+ if (file->f_flags & O_NONBLOCK)
+ ast_flags |= LDLM_FL_BLOCK_NOWAIT;
+ node = ll_node_from_inode(inode, start, end,
+ (rw == WRITE) ? LCK_PW : LCK_PR);
+ if (IS_ERR(node)) {
+ rc = PTR_ERR(node);
+ GOTO(out, rc);
+ }
+ tree->lt_fd = LUSTRE_FPRIVATE(file);
+ rc = ll_tree_lock(tree, node, buf, count, ast_flags);
+ if (rc == 0)
+ tree_locked = 1;
+ else if (rc == -EBUSY)
+ ll_set_file_contended(inode);
+ else
+ GOTO(out, rc);
+ }
+ RETURN(tree_locked);
+out:
+        RETURN(rc);
+}
+
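ll_file_get_tree_lock() returns a tri-state: 1 when the tree lock was taken,
0 when the caller should fall back to lockless I/O, and a negative errno on
failure. The lock-versus-lockless choice itself reduces to the sketch below,
a userspace model only; O_APPEND writes always take the lock, presumably
because the append offset is only meaningful under a PW extent lock.

#include <stdio.h>

enum io_path { PATH_TREE_LOCK, PATH_LOCKLESS };

static enum io_path choose_path(int is_write, int o_append, int contended)
{
        int append = is_write && o_append;

        if (append || !contended)
                return PATH_TREE_LOCK;
        return PATH_LOCKLESS;
}

int main(void)
{
        printf("%d\n", choose_path(1, 1, 1)); /* 0: O_APPEND always locks */
        printf("%d\n", choose_path(0, 0, 1)); /* 1: contended read is lockless */
        return 0;
}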
static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
loff_t *ppos)
{
struct lov_stripe_md *lsm = lli->lli_smd;
struct ll_sb_info *sbi = ll_i2sbi(inode);
struct ll_lock_tree tree;
- struct ll_lock_tree_node *node;
struct ost_lvb lvb;
struct ll_ra_read bead;
- int rc, ra = 0;
+ int ra = 0;
loff_t end;
ssize_t retval, chunk, sum = 0;
+ int tree_locked;
__u64 kms;
ENTRY;
RETURN(-EFAULT);
RETURN(count);
}
-
repeat:
if (sbi->ll_max_rw_chunk != 0) {
/* first, let's know the end of the current stripe */
} else {
end = *ppos + count - 1;
}
-
- node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
- if (IS_ERR(node)){
- GOTO(out, retval = PTR_ERR(node));
- }
- tree.lt_fd = LUSTRE_FPRIVATE(file);
- rc = ll_tree_lock(&tree, node, buf, count,
- file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
- if (rc != 0)
- GOTO(out, retval = rc);
+ tree_locked = ll_file_get_tree_lock(&tree, file, buf,
+ count, *ppos, end, READ);
+ if (tree_locked < 0)
+ GOTO(out, retval = tree_locked);
ll_inode_size_lock(inode, 1);
/*
inode->i_ino, chunk, *ppos, inode->i_size);
/* turn off the kernel's read-ahead */
+ if (tree_locked) {
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
- file->f_ramax = 0;
+ file->f_ramax = 0;
#else
- file->f_ra.ra_pages = 0;
+ file->f_ra.ra_pages = 0;
#endif
- /* initialize read-ahead window once per syscall */
- if (ra == 0) {
- ra = 1;
- bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
- bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
- ll_ra_read_in(file, &bead);
- }
+ /* initialize read-ahead window once per syscall */
+ if (ra == 0) {
+ ra = 1;
+ bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
+ bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
+ ll_ra_read_in(file, &bead);
+ }
- /* BUG: 5972 */
- file_accessed(file);
- retval = generic_file_read(file, buf, chunk, ppos);
+ /* BUG: 5972 */
+ file_accessed(file);
+ retval = generic_file_read(file, buf, chunk, ppos);
+ ll_tree_unlock(&tree);
+ } else {
+ retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
+ }
ll_rw_stats_tally(sbi, current->pid, file, count, 0);
-
- ll_tree_unlock(&tree);
-
if (retval > 0) {
buf += retval;
count -= retval;
struct ll_sb_info *sbi = ll_i2sbi(inode);
struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
struct ll_lock_tree tree;
- struct ll_lock_tree_node *node;
loff_t maxbytes = ll_file_maxbytes(inode);
loff_t lock_start, lock_end, end;
ssize_t retval, chunk, sum = 0;
- int rc;
+ int tree_locked;
ENTRY;
CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
lock_start = *ppos;
lock_end = *ppos + count - 1;
}
- node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
- if (IS_ERR(node))
- GOTO(out, retval = PTR_ERR(node));
-
- tree.lt_fd = LUSTRE_FPRIVATE(file);
- rc = ll_tree_lock(&tree, node, buf, count,
- file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
- if (rc != 0)
- GOTO(out, retval = rc);
+ tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
+ lock_start, lock_end, WRITE);
+ if (tree_locked < 0)
+ GOTO(out, retval = tree_locked);
/* This is ok, g_f_w will overwrite this under i_sem if it races
* with a local truncate, it just makes our maxbyte checking easier.
send_sig(SIGXFSZ, current, 0);
GOTO(out_unlock, retval = -EFBIG);
}
- if (*ppos + count > maxbytes)
- count = maxbytes - *ppos;
+ if (end > maxbytes - 1)
+ end = maxbytes - 1;
/* generic_file_write handles O_APPEND after getting i_mutex */
chunk = end - *ppos + 1;
CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
inode->i_ino, chunk, *ppos);
- retval = generic_file_write(file, buf, chunk, ppos);
- ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);
+ if (tree_locked)
+ retval = generic_file_write(file, buf, chunk, ppos);
+ else
+                retval = ll_file_lockless_io(file, (char *)buf, chunk,
+ ppos, WRITE);
+ ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
out_unlock:
- ll_tree_unlock(&tree);
+ if (tree_locked)
+ ll_tree_unlock(&tree);
out:
if (retval > 0) {
if (rc != 0)
RETURN(rc);
+ ll_clear_file_contended(inode);
ll_inode_size_lock(inode, 1);
/*
* Consistency guarantees: following possibilities exist for the
#define LLI_INODE_DEAD 0xdeadd00d
#define LLI_F_HAVE_OST_SIZE_LOCK 0
#define LLI_F_HAVE_MDS_SIZE_LOCK 1
+#define LLI_F_CONTENDED 2
+#define LLI_F_SRVLOCK 3
struct ll_inode_info {
int lli_inode_magic;
__u64 lli_maxbytes;
__u64 lli_io_epoch;
unsigned long lli_flags;
+ cfs_time_t lli_contention_time;
/* this lock protects s_d_w and p_w_ll and mmap_cnt */
spinlock_t lli_lock;
#define LL_SBI_JOIN 0x20 /* support JOIN */
#define LL_SBI_LOCALFLOCK 0x40 /* Local flocks support by kernel */
+/* default value for ll_sb_info->ll_contention_time */
+#define SBI_DEFAULT_CONTENTION_SECONDS 60
+
struct ll_sb_info {
struct list_head ll_list;
/* this protects pglist and ra_info. It isn't safe to
unsigned long ll_pglist_gen;
struct list_head ll_pglist; /* all pages (llap_pglist_item) */
+ unsigned ll_contention_time; /* seconds */
+
struct ll_ra_info ll_ra_info;
unsigned int ll_namelen;
struct file_operations *ll_fop;
LLAP_ORIGIN_COMMIT_WRITE,
LLAP_ORIGIN_WRITEPAGE,
LLAP_ORIGIN_REMOVEPAGE,
+ LLAP_ORIGIN_LOCKLESS_IO,
LLAP__ORIGIN_MAX,
};
extern char *llap_origins[];
void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras);
void ll_ra_accounting(struct ll_async_page *llap,struct address_space *mapping);
void ll_truncate(struct inode *inode);
+int ll_file_punch(struct inode *, loff_t, int);
+ssize_t ll_file_lockless_io(struct file *, char *, size_t, loff_t *, int);
+void ll_clear_file_contended(struct inode *);
int ll_sync_page_range(struct inode *, struct address_space *, loff_t, size_t);
/* llite/file.c */
SBI_DEFAULT_READAHEAD_MAX);
sbi->ll_ra_info.ra_max_read_ahead_whole_pages =
SBI_DEFAULT_READAHEAD_WHOLE_MAX;
-
+ sbi->ll_contention_time = SBI_DEFAULT_CONTENTION_SECONDS;
INIT_LIST_HEAD(&sbi->ll_conn_chain);
INIT_LIST_HEAD(&sbi->ll_orphan_dentry_list);
data->ocd_connect_flags = OBD_CONNECT_GRANT | OBD_CONNECT_VERSION |
OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE |
- OBD_CONNECT_CANCELSET;
+ OBD_CONNECT_SRVLOCK | OBD_CONNECT_CANCELSET;
CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d "
"ocd_grant: %d\n", data->ocd_connect_flags,
* last one is especially bad for racing o_append users on other
* nodes. */
if (ia_valid & ATTR_SIZE) {
+ int srvlock = !!(sbi->ll_lco.lco_flags & OBD_CONNECT_TRUNCLOCK);
ldlm_policy_data_t policy = { .l_extent = {attr->ia_size,
OBD_OBJECT_EOF } };
struct lustre_handle lockh = { 0 };
- int err, ast_flags = 0;
- /* XXX when we fix the AST intents to pass the discard-range
- * XXX extent, make ast_flags always LDLM_AST_DISCARD_DATA
- * XXX here. */
- if (attr->ia_size == 0)
- ast_flags = LDLM_AST_DISCARD_DATA;
+ int err;
+
+ if (srvlock) {
+ int flags = LDLM_FL_BLOCK_GRANTED;
+
+ rc = obd_match(ll_i2sbi(inode)->ll_osc_exp,
+ lsm, LDLM_EXTENT,
+ &policy, LCK_PW, &flags, inode,
+ &lockh);
+ if (rc < 0)
+ RETURN(rc);
+ if (rc == 1)
+ srvlock = 0;
+ }
UNLOCK_INODE_MUTEX(inode);
UP_WRITE_I_ALLOC_SEM(inode);
- rc = ll_extent_lock(NULL, inode, lsm, LCK_PW, &policy, &lockh,
- ast_flags);
+
+ if (srvlock) {
+ rc = ll_file_punch(inode, attr->ia_size, 1);
+ if (rc)
+ RETURN(rc);
+ } else {
+ int ast_flags = 0;
+
+ /* XXX when we fix the AST intents to pass the discard-range
+ * XXX extent, make ast_flags always LDLM_AST_DISCARD_DATA
+ * XXX here. */
+ if (attr->ia_size == 0)
+ ast_flags = LDLM_AST_DISCARD_DATA;
+
+ rc = ll_extent_lock(NULL, inode, lsm, LCK_PW, &policy,
+ &lockh, ast_flags);
+ if (rc != 0)
+ RETURN(rc);
+ }
+
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
DOWN_WRITE_I_ALLOC_SEM(inode);
LOCK_INODE_MUTEX(inode);
LOCK_INODE_MUTEX(inode);
DOWN_WRITE_I_ALLOC_SEM(inode);
#endif
- if (rc != 0)
- RETURN(rc);
-
/* Only ll_inode_size_lock is taken at this level.
* lov_stripe_lock() is grabbed by ll_truncate() only over
* call to obd_adjust_kms(). If vmtruncate returns 0, then
* ll_truncate dropped ll_inode_size_lock() */
ll_inode_size_lock(inode, 0);
+ if (srvlock)
+ set_bit(LLI_F_SRVLOCK, &lli->lli_flags);
rc = vmtruncate(inode, attr->ia_size);
+ clear_bit(LLI_F_SRVLOCK, &lli->lli_flags);
if (rc != 0) {
LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0);
ll_inode_size_unlock(inode, 0);
}
- err = ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh);
- if (err) {
- CERROR("ll_extent_unlock failed: %d\n", err);
- if (!rc)
- rc = err;
+ if (!srvlock) {
+ err = ll_extent_unlock(NULL, inode, lsm,
+ LCK_PW, &lockh);
+ if (err) {
+ CERROR("ll_extent_unlock failed: %d\n", err);
+ if (!rc)
+ rc = err;
+ }
}
} else if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET)) {
obd_flag flags;
[LLAP_ORIGIN_READAHEAD] = "ra",
[LLAP_ORIGIN_COMMIT_WRITE] = "cw",
[LLAP_ORIGIN_WRITEPAGE] = "wp",
+ [LLAP_ORIGIN_LOCKLESS_IO] = "ls"
};
struct ll_async_page *llite_pglist_next_llap(struct ll_sb_info *sbi,
RETURN(NULL);
}
+ ll_clear_file_contended(inode);
+
/* start and end the lock on the first and last bytes in the page */
policy_from_vma(&policy, vma, address, CFS_PAGE_SIZE);
return (ll_wr_track_id(buffer, count, data, STATS_TRACK_GID));
}
+static int ll_rd_contention_time(char *page, char **start, off_t off,
+ int count, int *eof, void *data)
+{
+ struct super_block *sb = data;
+
+ *eof = 1;
+ return snprintf(page, count, "%u\n", ll_s2sbi(sb)->ll_contention_time);
+}
+
+static int ll_wr_contention_time(struct file *file, const char *buffer,
+ unsigned long count, void *data)
+{
+ struct super_block *sb = data;
+ struct ll_sb_info *sbi = ll_s2sbi(sb);
+
+        return lprocfs_write_helper(buffer, count, &sbi->ll_contention_time) ?:
+ count;
+}
+
static struct lprocfs_vars lprocfs_obd_vars[] = {
{ "uuid", ll_rd_sb_uuid, 0, 0 },
//{ "mntpt_path", ll_rd_path, 0, 0 },
{ "stats_track_pid", ll_rd_track_pid, ll_wr_track_pid, 0 },
{ "stats_track_ppid", ll_rd_track_ppid, ll_wr_track_ppid, 0 },
{ "stats_track_gid", ll_rd_track_gid, ll_wr_track_gid, 0 },
+ { "contention_seconds", ll_rd_contention_time, ll_wr_contention_time, 0},
{ 0 }
};
"direct_read" },
{ LPROC_LL_DIRECT_WRITE, LPROCFS_CNTR_AVGMINMAX|LPROCFS_TYPE_PAGES,
"direct_write" },
+ { LPROC_LL_LOCKLESS_READ, LPROCFS_CNTR_AVGMINMAX|LPROCFS_TYPE_BYTES,
+ "lockless_read_bytes" },
+ { LPROC_LL_LOCKLESS_WRITE, LPROCFS_CNTR_AVGMINMAX|LPROCFS_TYPE_BYTES,
+ "lockless_write_bytes" },
};
RETURN(rc);
}
+int ll_file_punch(struct inode * inode, loff_t new_size, int srvlock)
+{
+ struct ll_inode_info *lli = ll_i2info(inode);
+ struct obd_info oinfo = { { { 0 } } };
+ struct obdo oa;
+ int rc;
+
+ ENTRY;
+ CDEBUG(D_INFO, "calling punch for "LPX64" (new size %Lu=%#Lx)\n",
+ lli->lli_smd->lsm_object_id, new_size, new_size);
+
+ oinfo.oi_md = lli->lli_smd;
+ oinfo.oi_policy.l_extent.start = new_size;
+ oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
+ oinfo.oi_oa = &oa;
+ oa.o_id = lli->lli_smd->lsm_object_id;
+ oa.o_valid = OBD_MD_FLID;
+ oa.o_flags = srvlock ? OBD_FL_TRUNCLOCK : 0;
+ obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |OBD_MD_FLFID|
+ OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME |
+ OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLGENER |
+ OBD_MD_FLBLOCKS);
+ rc = obd_punch_rqset(ll_i2obdexp(inode), &oinfo, NULL);
+ if (rc) {
+ CERROR("obd_truncate fails (%d) ino %lu\n", rc, inode->i_ino);
+ RETURN(rc);
+ }
+ obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
+ OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME);
+ RETURN(0);
+}
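ll_file_punch() lets a truncate proceed under a server-side lock
(OBD_FL_TRUNCLOCK) instead of a client PW extent lock. The choice made in
ll_setattr_raw() above reduces to the following userspace model, in which
have_cached_pw_lock stands in for obd_match() finding an already granted
lock worth reusing:

#include <stdio.h>

static const char *truncate_path(int server_trunclock, int have_cached_pw_lock)
{
        if (server_trunclock && !have_cached_pw_lock)
                return "server-locked punch (OBD_FL_TRUNCLOCK)";
        return "client PW extent lock, then punch";
}

int main(void)
{
        printf("%s\n", truncate_path(1, 0)); /* no cached lock: server side */
        printf("%s\n", truncate_path(1, 1)); /* reuse the cached lock instead */
        return 0;
}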
/* this isn't where truncate starts. roughly:
* sys_truncate->ll_setattr_raw->vmtruncate->ll_truncate. setattr_raw grabs
* DLM lock on [size, EOF], i_mutex, ->lli_size_sem, and WRITE_I_ALLOC_SEM to
void ll_truncate(struct inode *inode)
{
struct ll_inode_info *lli = ll_i2info(inode);
- struct obd_info oinfo = { { { 0 } } };
- struct ost_lvb lvb;
- struct obdo oa;
- int rc;
+ int srvlock = test_bit(LLI_F_SRVLOCK, &lli->lli_flags);
+ loff_t new_size;
ENTRY;
CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) to %Lu=%#Lx\n",inode->i_ino,
inode->i_generation, inode, inode->i_size, inode->i_size);
LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0);
- /* XXX I'm pretty sure this is a hack to paper over a more fundamental
- * race condition. */
- lov_stripe_lock(lli->lli_smd);
- inode_init_lvb(inode, &lvb);
- rc = obd_merge_lvb(ll_i2obdexp(inode), lli->lli_smd, &lvb, 0);
- oa.o_blocks = lvb.lvb_blocks;
- if (lvb.lvb_size == inode->i_size && rc == 0) {
- CDEBUG(D_VFSTRACE, "skipping punch for obj "LPX64", %Lu=%#Lx\n",
- lli->lli_smd->lsm_object_id,inode->i_size,inode->i_size);
+ if (!srvlock) {
+ struct ost_lvb lvb;
+ int rc;
+
+ /* XXX I'm pretty sure this is a hack to paper over a more fundamental
+ * race condition. */
+ lov_stripe_lock(lli->lli_smd);
+ inode_init_lvb(inode, &lvb);
+ rc = obd_merge_lvb(ll_i2obdexp(inode), lli->lli_smd, &lvb, 0);
+ inode->i_blocks = lvb.lvb_blocks;
+ if (lvb.lvb_size == inode->i_size && rc == 0) {
+ CDEBUG(D_VFSTRACE, "skipping punch for obj "LPX64", %Lu=%#Lx\n",
+ lli->lli_smd->lsm_object_id,inode->i_size,inode->i_size);
+ lov_stripe_unlock(lli->lli_smd);
+ GOTO(out_unlock, 0);
+ }
+
+ obd_adjust_kms(ll_i2obdexp(inode), lli->lli_smd, inode->i_size, 1);
lov_stripe_unlock(lli->lli_smd);
- GOTO(out_unlock, 0);
}
- obd_adjust_kms(ll_i2obdexp(inode), lli->lli_smd, inode->i_size, 1);
- lov_stripe_unlock(lli->lli_smd);
-
if (unlikely((ll_i2sbi(inode)->ll_flags & LL_SBI_CHECKSUM) &&
(inode->i_size & ~CFS_PAGE_MASK))) {
/* If the truncate leaves behind a partial page, update its
}
}
- CDEBUG(D_INFO, "calling punch for "LPX64" (new size %Lu=%#Lx)\n",
- lli->lli_smd->lsm_object_id, inode->i_size, inode->i_size);
-
- oinfo.oi_md = lli->lli_smd;
- oinfo.oi_policy.l_extent.start = inode->i_size;
- oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
- oinfo.oi_oa = &oa;
- oa.o_id = lli->lli_smd->lsm_object_id;
- oa.o_valid = OBD_MD_FLID;
-
- obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |OBD_MD_FLFID|
- OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME |
- OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLGENER |
- OBD_MD_FLBLOCKS);
-
+ new_size = inode->i_size;
ll_inode_size_unlock(inode, 0);
+ if (!srvlock)
+ ll_file_punch(inode, new_size, 0);
- rc = obd_punch_rqset(ll_i2obdexp(inode), &oinfo, NULL);
- if (rc)
- CERROR("obd_truncate fails (%d) ino %lu\n", rc, inode->i_ino);
- else
- obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
- OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME);
EXIT;
return;
oig_release(oig);
RETURN(rc);
}
+
+static void ll_file_put_pages(struct page **pages, int numpages)
+{
+ int i;
+ struct page **pp;
+ ENTRY;
+
+ for (i = 0, pp = pages; i < numpages; i++, pp++) {
+ if (*pp) {
+ LL_CDEBUG_PAGE(D_PAGE, (*pp), "free\n");
+ ll_removepage(*pp);
+ if (page_private(*pp))
+ CERROR("the llap wasn't freed\n");
+ (*pp)->mapping = NULL;
+ if (page_count(*pp) != 1)
+ CERROR("page %p, flags %#lx, count %i, private %p\n",
+ (*pp), (*pp)->flags, page_count(*pp),
+ (void*)page_private(*pp));
+ __free_pages(*pp, 0);
+ }
+ }
+ OBD_FREE(pages, numpages * sizeof(struct page*));
+ EXIT;
+}
+
+static struct page **ll_file_prepare_pages(int numpages, struct inode *inode,
+ unsigned long first)
+{
+ struct page **pages;
+ int i;
+ int rc = 0;
+ ENTRY;
+
+ OBD_ALLOC(pages, sizeof(struct page *) * numpages);
+ if (pages == NULL)
+ RETURN(ERR_PTR(-ENOMEM));
+ for (i = 0; i < numpages; i++) {
+ struct page *page;
+ struct ll_async_page *llap;
+
+ page = alloc_pages(GFP_HIGHUSER, 0);
+ if (page == NULL)
+ GOTO(err, rc = -ENOMEM);
+ pages[i] = page;
+ /* llap_from_page needs page index and mapping to be set */
+ page->index = first++;
+ page->mapping = inode->i_mapping;
+ llap = llap_from_page(page, LLAP_ORIGIN_LOCKLESS_IO);
+ if (IS_ERR(llap))
+ GOTO(err, rc = PTR_ERR(llap));
+ }
+ RETURN(pages);
+err:
+ ll_file_put_pages(pages, numpages);
+ RETURN(ERR_PTR(rc));
+}
+
+static ssize_t ll_file_copy_pages(struct page **pages, int numpages,
+ char *buf, loff_t pos, size_t count, int rw)
+{
+ ssize_t amount = 0;
+ int i;
+ ENTRY;
+
+ for (i = 0; i < numpages; i++) {
+ unsigned offset, bytes, left;
+ char *vaddr;
+
+ vaddr = kmap(pages[i]);
+ offset = pos & (CFS_PAGE_SIZE - 1);
+ bytes = min_t(unsigned, CFS_PAGE_SIZE - offset, count);
+ LL_CDEBUG_PAGE(D_PAGE, pages[i], "op = %s, addr = %p, "
+ "buf = %p, bytes = %u\n",
+ (rw == WRITE) ? "CFU" : "CTU",
+ vaddr + offset, buf, bytes);
+ if (rw == WRITE)
+ left = copy_from_user(vaddr + offset, buf, bytes);
+ else
+ left = copy_to_user(buf, vaddr + offset, bytes);
+ kunmap(pages[i]);
+ amount += bytes;
+ if (left) {
+ amount -= left;
+ break;
+ }
+ buf += bytes;
+ count -= bytes;
+ pos += bytes;
+ }
+ if (amount == 0)
+ RETURN(-EFAULT);
+ RETURN(amount);
+}
+
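ll_file_copy_pages() has to cope with I/O that is not page-aligned: the first
page is copied from an interior offset and the last page may be partial. A
standalone model of the same offset arithmetic:

#include <stdio.h>

#define MODEL_PAGE_SIZE 4096u  /* stands in for CFS_PAGE_SIZE */

int main(void)
{
        unsigned long long pos = 5000;  /* starting file offset */
        unsigned long count = 10000;    /* bytes to copy */

        while (count > 0) {
                unsigned offset = (unsigned)(pos & (MODEL_PAGE_SIZE - 1));
                unsigned bytes = MODEL_PAGE_SIZE - offset;

                if (bytes > count)
                        bytes = (unsigned)count;
                printf("page %llu: copy %u bytes at offset %u\n",
                       pos / MODEL_PAGE_SIZE, bytes, offset);
                pos += bytes;
                count -= bytes;
        }
        return 0;
}

For pos = 5000 and count = 10000 this copies 3192, 4096 and 2712 bytes across
three pages, which is exactly how the loop above walks the page array.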
+static int ll_file_oig_pages(struct inode * inode, struct page **pages,
+ int numpages, loff_t pos, size_t count, int rw)
+{
+ struct obd_io_group *oig;
+ struct ll_inode_info *lli = ll_i2info(inode);
+ struct obd_export *exp;
+ loff_t org_pos = pos;
+ obd_flag brw_flags;
+ int rc;
+ int i;
+ ENTRY;
+
+ exp = ll_i2obdexp(inode);
+ if (exp == NULL)
+ RETURN(-EINVAL);
+ rc = oig_init(&oig);
+ if (rc)
+ RETURN(rc);
+ brw_flags = OBD_BRW_SRVLOCK;
+ if (capable(CAP_SYS_RESOURCE))
+ brw_flags |= OBD_BRW_NOQUOTA;
+
+ for (i = 0; i < numpages; i++) {
+ struct ll_async_page *llap;
+ unsigned from, bytes;
+
+ from = pos & (CFS_PAGE_SIZE - 1);
+ bytes = min_t(unsigned, CFS_PAGE_SIZE - from,
+ count - pos + org_pos);
+ llap = llap_cast_private(pages[i]);
+ LASSERT(llap);
+
+ lock_page(pages[i]);
+
+ LL_CDEBUG_PAGE(D_PAGE, pages[i], "offset "LPU64","
+ " from %u, bytes = %u\n",
+ pos, from, bytes);
+ LASSERTF(pos >> CFS_PAGE_SHIFT == pages[i]->index,
+ "wrong page index %lu (%lu)\n",
+ pages[i]->index,
+ (unsigned long)(pos >> CFS_PAGE_SHIFT));
+ rc = obd_queue_group_io(exp, lli->lli_smd, NULL, oig,
+ llap->llap_cookie,
+ (rw == WRITE) ?
+ OBD_BRW_WRITE:OBD_BRW_READ,
+ from, bytes, brw_flags,
+ ASYNC_READY | ASYNC_URGENT |
+ ASYNC_COUNT_STABLE | ASYNC_GROUP_SYNC);
+ if (rc) {
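+                        /* count page i as locked so that the unlock
+                         * loop below releases it */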
+ i++;
+ GOTO(out, rc);
+ }
+ pos += bytes;
+ }
+ rc = obd_trigger_group_io(exp, lli->lli_smd, NULL, oig);
+ if (rc)
+ GOTO(out, rc);
+ rc = oig_wait(oig);
+out:
+        while (--i >= 0)
+ unlock_page(pages[i]);
+ oig_release(oig);
+ RETURN(rc);
+}
+
+ssize_t ll_file_lockless_io(struct file *file, char *buf, size_t count,
+ loff_t *ppos, int rw)
+{
+ loff_t pos;
+ struct inode *inode = file->f_dentry->d_inode;
+ ssize_t rc = 0;
+ int max_pages;
+ size_t amount = 0;
+ unsigned long first, last;
+ ENTRY;
+
+ if (rw == READ) {
+ loff_t isize;
+
+ ll_inode_size_lock(inode, 0);
+ isize = inode->i_size;
+ ll_inode_size_unlock(inode, 0);
+ if (*ppos >= isize)
+ GOTO(out, rc = 0);
+ if (*ppos + count >= isize)
+ count -= *ppos + count - isize;
+ if (count == 0)
+ GOTO(out, rc);
+ } else {
+ rc = generic_write_checks(file, ppos, &count, 0);
+ if (rc)
+ GOTO(out, rc);
+ rc = remove_suid(file->f_dentry);
+ if (rc)
+ GOTO(out, rc);
+ }
+ pos = *ppos;
+ first = pos >> CFS_PAGE_SHIFT;
+ last = (pos + count - 1) >> CFS_PAGE_SHIFT;
+ max_pages = PTLRPC_MAX_BRW_PAGES *
+ ll_i2info(inode)->lli_smd->lsm_stripe_count;
+ CDEBUG(D_INFO, "%u, stripe_count = %u\n",
+ PTLRPC_MAX_BRW_PAGES /* max_pages_per_rpc */,
+ ll_i2info(inode)->lli_smd->lsm_stripe_count);
+
+ while (first <= last && rc >= 0) {
+ int pages_for_io;
+ struct page **pages;
+ size_t bytes = count - amount;
+
+ pages_for_io = min_t(int, last - first + 1, max_pages);
+ pages = ll_file_prepare_pages(pages_for_io, inode, first);
+ if (IS_ERR(pages)) {
+ rc = PTR_ERR(pages);
+ break;
+ }
+ if (rw == WRITE) {
+ rc = ll_file_copy_pages(pages, pages_for_io, buf,
+ pos + amount, bytes, rw);
+ if (rc < 0)
+ GOTO(put_pages, rc);
+ bytes = rc;
+ }
+ rc = ll_file_oig_pages(inode, pages, pages_for_io,
+ pos + amount, bytes, rw);
+ if (rc)
+ GOTO(put_pages, rc);
+ if (rw == READ) {
+ rc = ll_file_copy_pages(pages, pages_for_io, buf,
+ pos + amount, bytes, rw);
+ if (rc < 0)
+ GOTO(put_pages, rc);
+ bytes = rc;
+ }
+ amount += bytes;
+ buf += bytes;
+put_pages:
+ ll_file_put_pages(pages, pages_for_io);
+ first += pages_for_io;
+ /* a short read/write check */
+ if (pos + amount < ((loff_t)first << CFS_PAGE_SHIFT))
+ break;
+ }
+        /* NOTE: don't update i_size and KMS in the absence of LDLM locks,
+         * even if the write makes the file larger */
+ file_accessed(file);
+ if (rw == READ && amount < count && rc == 0) {
+ unsigned long not_cleared;
+
+ not_cleared = clear_user(buf, count - amount);
+ amount = count - not_cleared;
+ if (not_cleared)
+ rc = -EFAULT;
+ }
+ if (amount > 0) {
+ lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
+ (rw == WRITE) ?
+ LPROC_LL_LOCKLESS_WRITE :
+ LPROC_LL_LOCKLESS_READ,
+ (long)amount);
+ *ppos += amount;
+ RETURN(amount);
+ }
+out:
+ RETURN(rc);
+}
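ll_file_lockless_io() issues the request in batches of at most
PTLRPC_MAX_BRW_PAGES * stripe_count pages, each batch going through the
prepare/copy/group-I/O/put sequence above. A standalone model of just the
batching arithmetic; the constant values below are illustrative, not taken
from the patch:

#include <stdio.h>

#define MODEL_PAGE_SHIFT    12   /* CFS_PAGE_SHIFT for 4k pages */
#define MODEL_MAX_BRW_PAGES 256  /* illustrative PTLRPC_MAX_BRW_PAGES */

int main(void)
{
        unsigned long long pos = 1ULL << 20;    /* 1 MiB offset */
        unsigned long long count = 5ULL << 20;  /* 5 MiB request */
        int stripe_count = 2;
        int max_pages = MODEL_MAX_BRW_PAGES * stripe_count;
        unsigned long first = (unsigned long)(pos >> MODEL_PAGE_SHIFT);
        unsigned long last =
                (unsigned long)((pos + count - 1) >> MODEL_PAGE_SHIFT);

        while (first <= last) {
                unsigned long pages_for_io = last - first + 1;

                if (pages_for_io > (unsigned long)max_pages)
                        pages_for_io = max_pages;
                printf("batch: pages %lu..%lu\n",
                       first, first + pages_for_io - 1);
                first += pages_for_io;
        }
        return 0;
}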
CFS_LIST_HEAD(rpc_list);
unsigned int ending_offset;
unsigned starting_offset = 0;
+ int srvlock = 0;
ENTRY;
/* first we find the pages we're allowed to work with */
LASSERT(oap->oap_magic == OAP_MAGIC);
+ if (page_count != 0 &&
+ srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
+ CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
+ " oap %p, page %p, srvlock %u\n",
+ oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
+ break;
+ }
/* in llite being 'ready' equates to the page being locked
* until completion unlocks it. commit_write submits a page
* as not ready because its unlock will happen unconditionally
/* now put the page back in our accounting */
list_add_tail(&oap->oap_rpc_item, &rpc_list);
+ if (page_count == 0)
+ srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
if (++page_count >= cli->cl_max_pages_per_rpc)
break;