From: zam Date: Thu, 8 May 2008 07:37:08 +0000 (+0000) Subject: b=11270 X-Git-Tag: v1_9_50~529 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=e628a59ade003f281800faf3553ac5930e05cb8c b=11270 i=vitaly.vertman i=oleg.drokin Lockless i/o and lockless truncate code and sanityN tests. --- diff --git a/lustre/ChangeLog b/lustre/ChangeLog index 7913dba..85f27fb 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -964,6 +964,12 @@ Details : Change the structure of stats under obdfilter and mds to The "uuid"s file would list the uuids of _active_ exports. And the clear entry is to clear all stats and stale nids. +Severity : enhancement +Bugzilla : 11270 +Description: eliminate client locks in face of contention +Details : file contention detection and lockless i/o implementation + for contended files. + -------------------------------------------------------------------------------- 2007-08-10 Cluster File Systems, Inc. diff --git a/lustre/include/linux/lustre_lite.h b/lustre/include/linux/lustre_lite.h index 20dea87..d3c1504 100644 --- a/lustre/include/linux/lustre_lite.h +++ b/lustre/include/linux/lustre_lite.h @@ -47,6 +47,7 @@ enum { LPROC_LL_FSYNC, LPROC_LL_SETATTR, LPROC_LL_TRUNC, + LPROC_LL_LOCKLESS_TRUNC, LPROC_LL_FLOCK, LPROC_LL_GETATTR, LPROC_LL_STAFS, @@ -58,6 +59,8 @@ enum { LPROC_LL_INODE_PERM, LPROC_LL_DIRECT_READ, LPROC_LL_DIRECT_WRITE, + LPROC_LL_LOCKLESS_READ, + LPROC_LL_LOCKLESS_WRITE, LPROC_LL_FILE_OPCODES }; diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 852da26..34d17de 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -132,6 +132,9 @@ typedef enum { * w/o involving separate thread. in order to decrease cs rate */ #define LDLM_FL_ATOMIC_CB 0x4000000 +/* Cancel lock asynchronously. See ldlm_cli_cancel_unused_resource. */ +#define LDLM_FL_ASYNC 0x8000000 + /* It may happen that a client initiate 2 operations, e.g. unlink and mkdir, * such that server send blocking ast for conflict locks to this client for * the 1st operation, whereas the 2nd operation has canceled this lock and @@ -145,8 +148,8 @@ typedef enum { #define LDLM_FL_BL_AST 0x10000000 #define LDLM_FL_BL_DONE 0x20000000 -/* Cancel lock asynchronously. See ldlm_cli_cancel_unused_resource. */ -#define LDLM_FL_ASYNC 0x40000000 +/* measure lock contention and return -EUSERS if locking contention is high */ +#define LDLM_FL_DENY_ON_CONTENTION 0x40000000 /* The blocking callback is overloaded to perform two functions. These flags * indicate which operation should be performed. */ @@ -287,6 +290,12 @@ typedef enum { * others (including ibits locks) will be canceled on memory pressure event. */ #define LDLM_LOCK_SHRINK_THUMB 256 +/* default values for the "max_nolock_size", "contention_time" + * and "contended_locks" namespace tunables */ +#define NS_DEFAULT_MAX_NOLOCK_BYTES 0 +#define NS_DEFAULT_CONTENTION_SECONDS 2 +#define NS_DEFAULT_CONTENDED_LOCKS 32 + struct ldlm_namespace { char *ns_name; ldlm_side_t ns_client; /* is this a client-side lock tree? 
*/ @@ -321,6 +330,14 @@ struct ldlm_namespace { cfs_waitq_t ns_waitq; struct ldlm_pool ns_pool; ldlm_appetite_t ns_appetite; + /* if more than @ns_contented_locks found, the resource considered + * as contended */ + unsigned ns_contended_locks; + /* the resource remembers contended state during @ns_contention_time, + * in seconds */ + unsigned ns_contention_time; + /* limit size of nolock requests, in bytes */ + unsigned ns_max_nolock_size; }; static inline int ns_is_client(struct ldlm_namespace *ns) @@ -486,6 +503,9 @@ struct ldlm_resource { struct semaphore lr_lvb_sem; __u32 lr_lvb_len; void *lr_lvb_data; + + /* when the resource was considered as contended */ + cfs_time_t lr_contention_time; }; struct ldlm_ast_work { diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index f1f88ce..2e6aa1a 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -27,10 +27,14 @@ #define DEBUG_SUBSYSTEM S_LDLM #ifndef __KERNEL__ # include +#else +# include +# include #endif #include #include +#include #include #include "ldlm_internal.h" @@ -259,10 +263,23 @@ static void ldlm_extent_policy(struct ldlm_resource *res, } } +static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks) +{ + struct ldlm_resource *res = lock->l_resource; + cfs_time_t now = cfs_time_current(); + + CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks); + if (contended_locks > res->lr_namespace->ns_contended_locks) + res->lr_contention_time = now; + return cfs_time_before(now, cfs_time_add(res->lr_contention_time, + cfs_time_seconds(res->lr_namespace->ns_contention_time))); +} + struct ldlm_extent_compat_args { struct list_head *work_list; struct ldlm_lock *lock; ldlm_mode_t mode; + int *locks; int *compat; }; @@ -271,9 +288,11 @@ static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n, { struct ldlm_extent_compat_args *priv = data; struct ldlm_interval *node = to_ldlm_interval(n); + struct ldlm_extent *extent; struct list_head *work_list = priv->work_list; struct ldlm_lock *lock, *enq = priv->lock; ldlm_mode_t mode = priv->mode; + int count = 0; ENTRY; LASSERT(!list_empty(&node->li_group)); @@ -284,11 +303,17 @@ static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n, "mode = %s, lock->l_granted_mode = %s\n", ldlm_lockname[mode], ldlm_lockname[lock->l_granted_mode]); - + count++; if (lock->l_blocking_ast) ldlm_add_ast_work_item(lock, enq, work_list); } + /* don't count conflicting glimpse locks */ + extent = ldlm_interval_extent(node); + if (!(mode == LCK_PR && + extent->start == 0 && extent->end == OBD_OBJECT_EOF)) + *priv->locks += count; + if (priv->compat) *priv->compat = 0; @@ -307,7 +332,7 @@ static enum interval_iter ldlm_extent_compat_cb(struct interval_node *n, static int ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req, int *flags, ldlm_error_t *err, - struct list_head *work_list) + struct list_head *work_list, int *contended_locks) { struct list_head *tmp; struct ldlm_lock *lock; @@ -317,6 +342,7 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req, __u64 req_end = req->l_req_extent.end; int compat = 1; int scan = 0; + int check_contention; ENTRY; lockmode_verify(req_mode); @@ -326,6 +352,7 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req, struct ldlm_interval_tree *tree; struct ldlm_extent_compat_args data = {.work_list = work_list, .lock = req, + .locks = contended_locks, .compat = &compat }; struct interval_node_extent ex = { .start = req_start, .end = 
req_end }; @@ -382,157 +409,179 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req, compat = 0; } } - RETURN(compat); - } + } else { /* for waiting queue */ + list_for_each(tmp, queue) { + check_contention = 1; + + lock = list_entry(tmp, struct ldlm_lock, l_res_link); + + if (req == lock) + break; + + if (unlikely(scan)) { + /* We only get here if we are queuing GROUP lock + and met some incompatible one. The main idea of this + code is to insert GROUP lock past compatible GROUP + lock in the waiting queue or if there is not any, + then in front of first non-GROUP lock */ + if (lock->l_req_mode != LCK_GROUP) { + /* Ok, we hit non-GROUP lock, there should + * be no more GROUP locks later on, queue in + * front of first non-GROUP lock */ + + ldlm_resource_insert_lock_after(lock, req); + list_del_init(&lock->l_res_link); + ldlm_resource_insert_lock_after(req, lock); + compat = 0; + break; + } + if (req->l_policy_data.l_extent.gid == + lock->l_policy_data.l_extent.gid) { + /* found it */ + ldlm_resource_insert_lock_after(lock, req); + compat = 0; + break; + } + continue; + } - /* for waiting queue */ - list_for_each(tmp, queue) { - lock = list_entry(tmp, struct ldlm_lock, l_res_link); + /* locks are compatible, overlap doesn't matter */ + if (lockmode_compat(lock->l_req_mode, req_mode)) { + if (req_mode == LCK_PR && + ((lock->l_policy_data.l_extent.start <= + req->l_policy_data.l_extent.start) && + (lock->l_policy_data.l_extent.end >= + req->l_policy_data.l_extent.end))) { + /* If we met a PR lock just like us or wider, + and nobody down the list conflicted with + it, that means we can skip processing of + the rest of the list and safely place + ourselves at the end of the list, or grant + (dependent if we met an conflicting locks + before in the list). + In case of 1st enqueue only we continue + traversing if there is something conflicting + down the list because we need to make sure + that something is marked as AST_SENT as well, + in cse of empy worklist we would exit on + first conflict met. */ + /* There IS a case where such flag is + not set for a lock, yet it blocks + something. Luckily for us this is + only during destroy, so lock is + exclusive. So here we are safe */ + if (!(lock->l_flags & LDLM_FL_AST_SENT)) { + RETURN(compat); + } + } - if (req == lock) - RETURN(compat); - - if (unlikely(scan)) { - /* We only get here if we are queuing GROUP lock - and met some incompatible one. 
The main idea of this - code is to insert GROUP lock past compatible GROUP - lock in the waiting queue or if there is not any, - then in front of first non-GROUP lock */ - if (lock->l_req_mode != LCK_GROUP) { - /* Ok, we hit non-GROUP lock, there should - * be no more GROUP locks later on, queue in - * front of first non-GROUP lock */ - - ldlm_resource_insert_lock_after(lock, req); - list_del_init(&lock->l_res_link); - ldlm_resource_insert_lock_after(req, lock); - RETURN(0); - } - if (req->l_policy_data.l_extent.gid == - lock->l_policy_data.l_extent.gid) { - /* found it */ - ldlm_resource_insert_lock_after(lock, req); - RETURN(0); - } - continue; - } + /* non-group locks are compatible, overlap doesn't + matter */ + if (likely(req_mode != LCK_GROUP)) + continue; - /* locks are compatible, overlap doesn't matter */ - if (lockmode_compat(lock->l_req_mode, req_mode)) { - if (req_mode == LCK_PR && - ((lock->l_policy_data.l_extent.start <= - req->l_policy_data.l_extent.start) && - (lock->l_policy_data.l_extent.end >= - req->l_policy_data.l_extent.end))) { - /* If we met a PR lock just like us or wider, - and nobody down the list conflicted with - it, that means we can skip processing of - the rest of the list and safely place - ourselves at the end of the list, or grant - (dependent if we met an conflicting locks - before in the list). - In case of 1st enqueue only we continue - traversing if there is something conflicting - down the list because we need to make sure - that something is marked as AST_SENT as well, - in cse of empy worklist we would exit on - first conflict met. */ - /* There IS a case where such flag is - not set for a lock, yet it blocks - something. Luckily for us this is - only during destroy, so lock is - exclusive. So here we are safe */ - if (!(lock->l_flags & LDLM_FL_AST_SENT)) { - RETURN(compat); + /* If we are trying to get a GROUP lock and there is + another one of this kind, we need to compare gid */ + if (req->l_policy_data.l_extent.gid == + lock->l_policy_data.l_extent.gid) { + /* If existing lock with matched gid is granted, + we grant new one too. */ + if (lock->l_req_mode == lock->l_granted_mode) + RETURN(2); + + /* Otherwise we are scanning queue of waiting + * locks and it means current request would + * block along with existing lock (that is + * already blocked. + * If we are in nonblocking mode - return + * immediately */ + if (*flags & LDLM_FL_BLOCK_NOWAIT) { + compat = -EWOULDBLOCK; + goto destroylock; + } + /* If this group lock is compatible with another + * group lock on the waiting list, they must be + * together in the list, so they can be granted + * at the same time. Otherwise the later lock + * can get stuck behind another, incompatible, + * lock. */ + ldlm_resource_insert_lock_after(lock, req); + /* Because 'lock' is not granted, we can stop + * processing this queue and return immediately. + * There is no need to check the rest of the + * list. 
*/ + RETURN(0); } } - /* non-group locks are compatible, overlap doesn't - matter */ - if (likely(req_mode != LCK_GROUP)) + if (unlikely(req_mode == LCK_GROUP && + (lock->l_req_mode != lock->l_granted_mode))) { + scan = 1; + compat = 0; + if (lock->l_req_mode != LCK_GROUP) { + /* Ok, we hit non-GROUP lock, there should be no + more GROUP locks later on, queue in front of + first non-GROUP lock */ + + ldlm_resource_insert_lock_after(lock, req); + list_del_init(&lock->l_res_link); + ldlm_resource_insert_lock_after(req, lock); + break; + } + if (req->l_policy_data.l_extent.gid == + lock->l_policy_data.l_extent.gid) { + /* found it */ + ldlm_resource_insert_lock_after(lock, req); + break; + } continue; + } - /* If we are trying to get a GROUP lock and there is - another one of this kind, we need to compare gid */ - if (req->l_policy_data.l_extent.gid == - lock->l_policy_data.l_extent.gid) { - /* If existing lock with matched gid is granted, - we grant new one too. */ - if (lock->l_req_mode == lock->l_granted_mode) - RETURN(2); - - /* Otherwise we are scanning queue of waiting - * locks and it means current request would - * block along with existing lock (that is - * already blocked. - * If we are in nonblocking mode - return - * immediately */ + if (unlikely(lock->l_req_mode == LCK_GROUP)) { + /* If compared lock is GROUP, then requested is PR/PW/ + * so this is not compatible; extent range does not + * matter */ if (*flags & LDLM_FL_BLOCK_NOWAIT) { compat = -EWOULDBLOCK; goto destroylock; + } else { + *flags |= LDLM_FL_NO_TIMEOUT; } - /* If this group lock is compatible with another - * group lock on the waiting list, they must be - * together in the list, so they can be granted - * at the same time. Otherwise the later lock - * can get stuck behind another, incompatible, - * lock. */ - ldlm_resource_insert_lock_after(lock, req); - /* Because 'lock' is not granted, we can stop - * processing this queue and return immediately. - * There is no need to check the rest of the - * list. 
*/ - RETURN(0); + } else if (lock->l_policy_data.l_extent.end < req_start || + lock->l_policy_data.l_extent.start > req_end) { + /* if a non group lock doesn't overlap skip it */ + continue; + } else if (lock->l_req_extent.end < req_start || + lock->l_req_extent.start > req_end) { + /* false contention, the requests doesn't really overlap */ + check_contention = 0; } - } - if (unlikely(req_mode == LCK_GROUP && - (lock->l_req_mode != lock->l_granted_mode))) { - scan = 1; - compat = 0; - if (lock->l_req_mode != LCK_GROUP) { - /* Ok, we hit non-GROUP lock, there should be no - more GROUP locks later on, queue in front of - first non-GROUP lock */ - - ldlm_resource_insert_lock_after(lock, req); - list_del_init(&lock->l_res_link); - ldlm_resource_insert_lock_after(req, lock); + if (!work_list) RETURN(0); - } - if (req->l_policy_data.l_extent.gid == - lock->l_policy_data.l_extent.gid) { - /* found it */ - ldlm_resource_insert_lock_after(lock, req); - RETURN(0); - } - continue; - } - if (unlikely(lock->l_req_mode == LCK_GROUP)) { - /* If compared lock is GROUP, then requested is PR/PW/ - * so this is not compatible; extent range does not - * matter */ - if (*flags & LDLM_FL_BLOCK_NOWAIT) { - compat = -EWOULDBLOCK; - goto destroylock; - } else { - *flags |= LDLM_FL_NO_TIMEOUT; - } - } else if (lock->l_policy_data.l_extent.end < req_start || - lock->l_policy_data.l_extent.start > req_end) { - /* if a non group lock doesn't overlap skip it */ - continue; - } + /* don't count conflicting glimpse locks */ + if (lock->l_req_mode == LCK_PR && + lock->l_policy_data.l_extent.start == 0 && + lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF) + check_contention = 0; - if (!work_list) - RETURN(0); + *contended_locks += check_contention; - compat = 0; - if (lock->l_blocking_ast) - ldlm_add_ast_work_item(lock, req, work_list); + compat = 0; + if (lock->l_blocking_ast) + ldlm_add_ast_work_item(lock, req, work_list); + } } + if (ldlm_check_contention(req, *contended_locks) && + compat == 0 && + (*flags & LDLM_FL_DENY_ON_CONTENTION) && + req->l_req_mode != LCK_GROUP && + req_end - req_start <= + req->l_resource->lr_namespace->ns_max_nolock_size) + GOTO(destroylock, compat = -EUSERS); + RETURN(compat); destroylock: list_del_init(&req->l_res_link); @@ -541,6 +590,27 @@ destroylock: RETURN(compat); } +static void discard_bl_list(struct list_head *bl_list) +{ + struct list_head *tmp, *pos; + ENTRY; + + list_for_each_safe(pos, tmp, bl_list) { + struct ldlm_lock *lock = + list_entry(pos, struct ldlm_lock, l_bl_ast); + + list_del_init(&lock->l_bl_ast); + LASSERT(lock->l_flags & LDLM_FL_AST_SENT); + lock->l_flags &= ~LDLM_FL_AST_SENT; + LASSERT(lock->l_bl_ast_run == 0); + LASSERT(lock->l_blocking_lock); + LDLM_LOCK_PUT(lock->l_blocking_lock); + lock->l_blocking_lock = NULL; + LDLM_LOCK_PUT(lock); + } + EXIT; +} + /* If first_enq is 0 (ie, called from ldlm_reprocess_queue): * - blocking ASTs have already been sent * - must call this function with the ns lock held @@ -554,9 +624,12 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, struct ldlm_resource *res = lock->l_resource; struct list_head rpc_list = CFS_LIST_HEAD_INIT(rpc_list); int rc, rc2; + int contended_locks = 0; ENTRY; LASSERT(list_empty(&res->lr_converting)); + LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) || + !(lock->l_flags & LDLM_AST_DISCARD_DATA)); check_res_locked(res); *err = ELDLM_OK; @@ -568,10 +641,11 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, * being true, we want to find out. 
*/ LASSERT(*flags == 0); rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, - err, NULL); + err, NULL, &contended_locks); if (rc == 1) { rc = ldlm_extent_compat_queue(&res->lr_waiting, lock, - flags, err, NULL); + flags, err, NULL, + &contended_locks); } if (rc == 0) RETURN(LDLM_ITER_STOP); @@ -585,13 +659,16 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, } restart: - rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err, &rpc_list); + contended_locks = 0; + rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err, + &rpc_list, &contended_locks); if (rc < 0) GOTO(out, rc); /* lock was destroyed */ if (rc == 2) goto grant; - rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err, &rpc_list); + rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err, + &rpc_list, &contended_locks); if (rc2 < 0) GOTO(out, rc = rc2); /* lock was destroyed */ @@ -636,8 +713,12 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, *flags |= LDLM_FL_NO_TIMEOUT; } - rc = 0; + RETURN(0); out: + if (!list_empty(&rpc_list)) { + LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA)); + discard_bl_list(&rpc_list); + } RETURN(rc); } diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index b39d2e9..151c513 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -269,6 +269,27 @@ void ldlm_proc_namespace(struct ldlm_namespace *ns) lock_vars[0].read_fptr = lprocfs_rd_uint; lock_vars[0].write_fptr = lprocfs_wr_uint; lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0); + + snprintf(lock_name, MAX_STRING_SIZE, "%s/max_nolock_bytes", + ns->ns_name); + lock_vars[0].data = &ns->ns_max_nolock_size; + lock_vars[0].read_fptr = lprocfs_rd_uint; + lock_vars[0].write_fptr = lprocfs_wr_uint; + lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0); + + snprintf(lock_name, MAX_STRING_SIZE, "%s/contention_seconds", + ns->ns_name); + lock_vars[0].data = &ns->ns_contention_time; + lock_vars[0].read_fptr = lprocfs_rd_uint; + lock_vars[0].write_fptr = lprocfs_wr_uint; + lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0); + + snprintf(lock_name, MAX_STRING_SIZE, "%s/contended_locks", + ns->ns_name); + lock_vars[0].data = &ns->ns_contended_locks; + lock_vars[0].read_fptr = lprocfs_rd_uint; + lock_vars[0].write_fptr = lprocfs_wr_uint; + lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0); } } #undef MAX_STRING_SIZE @@ -314,6 +335,9 @@ struct ldlm_namespace *ldlm_namespace_new(char *name, ldlm_side_t client, atomic_set(&ns->ns_locks, 0); ns->ns_resources = 0; cfs_waitq_init(&ns->ns_waitq); + ns->ns_max_nolock_size = NS_DEFAULT_MAX_NOLOCK_BYTES; + ns->ns_contention_time = NS_DEFAULT_CONTENTION_SECONDS; + ns->ns_contended_locks = NS_DEFAULT_CONTENDED_LOCKS; for (bucket = ns->ns_hash + RES_HASH_SIZE - 1; bucket >= ns->ns_hash; bucket--) diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 05aaa59..80ac03b 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -1317,6 +1317,97 @@ int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode, RETURN(rc); } +static void ll_set_file_contended(struct inode *inode) +{ + struct ll_inode_info *lli = ll_i2info(inode); + cfs_time_t now = cfs_time_current(); + + spin_lock(&lli->lli_lock); + lli->lli_contention_time = now; + lli->lli_flags |= LLIF_CONTENDED; + spin_unlock(&lli->lli_lock); +} + +void ll_clear_file_contended(struct inode *inode) +{ + struct ll_inode_info *lli = ll_i2info(inode); + + spin_lock(&lli->lli_lock); + lli->lli_flags &= 
~LLIF_CONTENDED; + spin_unlock(&lli->lli_lock); +} + +static int ll_is_file_contended(struct file *file) +{ + struct inode *inode = file->f_dentry->d_inode; + struct ll_inode_info *lli = ll_i2info(inode); + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct ll_file_data *fd = LUSTRE_FPRIVATE(file); + ENTRY; + + if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) { + CDEBUG(D_INFO, "the server does not support SRVLOCK feature," + " osc connect flags = 0x"LPX64"\n", + sbi->ll_lco.lco_flags); + RETURN(0); + } + if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) + RETURN(1); + if (lli->lli_flags & LLIF_CONTENDED) { + cfs_time_t cur_time = cfs_time_current(); + cfs_time_t retry_time; + + retry_time = cfs_time_add( + lli->lli_contention_time, + cfs_time_seconds(sbi->ll_contention_time)); + if (cfs_time_after(cur_time, retry_time)) { + ll_clear_file_contended(inode); + RETURN(0); + } + RETURN(1); + } + RETURN(0); +} + +static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file, + const char *buf, size_t count, + loff_t start, loff_t end, int rw) +{ + int append; + int tree_locked = 0; + int rc; + struct inode * inode = file->f_dentry->d_inode; + ENTRY; + + append = (rw == WRITE) && (file->f_flags & O_APPEND); + + if (append || !ll_is_file_contended(file)) { + struct ll_lock_tree_node *node; + int ast_flags; + + ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION; + if (file->f_flags & O_NONBLOCK) + ast_flags |= LDLM_FL_BLOCK_NOWAIT; + node = ll_node_from_inode(inode, start, end, + (rw == WRITE) ? LCK_PW : LCK_PR); + if (IS_ERR(node)) { + rc = PTR_ERR(node); + GOTO(out, rc); + } + tree->lt_fd = LUSTRE_FPRIVATE(file); + rc = ll_tree_lock(tree, node, buf, count, ast_flags); + if (rc == 0) + tree_locked = 1; + else if (rc == -EUSERS) + ll_set_file_contended(inode); + else + GOTO(out, rc); + } + RETURN(tree_locked); +out: + return rc; +} + static ssize_t ll_file_read(struct file *file, char *buf, size_t count, loff_t *ppos) { @@ -1325,12 +1416,12 @@ static ssize_t ll_file_read(struct file *file, char *buf, size_t count, struct lov_stripe_md *lsm = lli->lli_smd; struct ll_sb_info *sbi = ll_i2sbi(inode); struct ll_lock_tree tree; - struct ll_lock_tree_node *node; struct ost_lvb lvb; struct ll_ra_read bead; - int rc, ra = 0; + int ra = 0; loff_t end; ssize_t retval, chunk, sum = 0; + int tree_locked; __u64 kms; ENTRY; @@ -1368,7 +1459,6 @@ static ssize_t ll_file_read(struct file *file, char *buf, size_t count, RETURN(-EFAULT); RETURN(count); } - repeat: if (sbi->ll_max_rw_chunk != 0) { /* first, let's know the end of the current stripe */ @@ -1387,16 +1477,10 @@ repeat: end = *ppos + count - 1; } - node = ll_node_from_inode(inode, *ppos, end, LCK_PR); - if (IS_ERR(node)){ - GOTO(out, retval = PTR_ERR(node)); - } - - tree.lt_fd = LUSTRE_FPRIVATE(file); - rc = ll_tree_lock(&tree, node, buf, count, - file->f_flags & O_NONBLOCK ? 
LDLM_FL_BLOCK_NOWAIT :0); - if (rc != 0) - GOTO(out, retval = rc); + tree_locked = ll_file_get_tree_lock(&tree, file, buf, + count, *ppos, end, READ); + if (tree_locked < 0) + GOTO(out, retval = tree_locked); ll_inode_size_lock(inode, 1); /* @@ -1427,7 +1511,8 @@ repeat: ll_inode_size_unlock(inode, 1); retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED); if (retval) { - ll_tree_unlock(&tree); + if (tree_locked) + ll_tree_unlock(&tree); goto out; } } else { @@ -1446,23 +1531,27 @@ repeat: CDEBUG(D_INODE, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n", inode->i_ino, chunk, *ppos, i_size_read(inode)); - /* turn off the kernel's read-ahead */ - file->f_ra.ra_pages = 0; + if (tree_locked) { + /* turn off the kernel's read-ahead */ + file->f_ra.ra_pages = 0; - /* initialize read-ahead window once per syscall */ - if (ra == 0) { - ra = 1; - bead.lrr_start = *ppos >> CFS_PAGE_SHIFT; - bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT; - ll_ra_read_in(file, &bead); - } + /* initialize read-ahead window once per syscall */ + if (ra == 0) { + ra = 1; + bead.lrr_start = *ppos >> CFS_PAGE_SHIFT; + bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT; + ll_ra_read_in(file, &bead); + } - /* BUG: 5972 */ - file_accessed(file); - retval = generic_file_read(file, buf, chunk, ppos); - ll_rw_stats_tally(sbi, current->pid, file, count, 0); + /* BUG: 5972 */ + file_accessed(file); + retval = generic_file_read(file, buf, chunk, ppos); + ll_tree_unlock(&tree); + } else { + retval = ll_file_lockless_io(file, buf, chunk, ppos, READ); + } - ll_tree_unlock(&tree); + ll_rw_stats_tally(sbi, current->pid, file, chunk, 0); if (retval > 0) { buf += retval; @@ -1489,11 +1578,10 @@ static ssize_t ll_file_write(struct file *file, const char *buf, size_t count, struct ll_sb_info *sbi = ll_i2sbi(inode); struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; struct ll_lock_tree tree; - struct ll_lock_tree_node *node; loff_t maxbytes = ll_file_maxbytes(inode); loff_t lock_start, lock_end, end; ssize_t retval, chunk, sum = 0; - int rc; + int tree_locked; ENTRY; CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n", @@ -1541,16 +1629,11 @@ repeat: lock_start = *ppos; lock_end = *ppos + count - 1; } - node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW); - - if (IS_ERR(node)) - GOTO(out, retval = PTR_ERR(node)); - tree.lt_fd = LUSTRE_FPRIVATE(file); - rc = ll_tree_lock(&tree, node, buf, count, - file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0); - if (rc != 0) - GOTO(out, retval = rc); + tree_locked = ll_file_get_tree_lock(&tree, file, buf, count, + lock_start, lock_end, WRITE); + if (tree_locked < 0) + GOTO(out, retval = tree_locked); /* This is ok, g_f_w will overwrite this under i_sem if it races * with a local truncate, it just makes our maxbyte checking easier. 
@@ -1565,18 +1648,23 @@ repeat: send_sig(SIGXFSZ, current, 0); GOTO(out_unlock, retval = -EFBIG); } - if (*ppos + count > maxbytes) - count = maxbytes - *ppos; + if (end > maxbytes - 1) + end = maxbytes - 1; /* generic_file_write handles O_APPEND after getting i_mutex */ chunk = end - *ppos + 1; CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n", inode->i_ino, chunk, *ppos); - retval = generic_file_write(file, buf, chunk, ppos); - ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1); + if (tree_locked) + retval = generic_file_write(file, buf, chunk, ppos); + else + retval = ll_file_lockless_io(file, (char*)buf, chunk, + ppos, WRITE); + ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1); out_unlock: - ll_tree_unlock(&tree); + if (tree_locked) + ll_tree_unlock(&tree); out: if (retval > 0) { @@ -1638,6 +1726,7 @@ static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count, if (rc != 0) RETURN(rc); + ll_clear_file_contended(inode); ll_inode_size_lock(inode, 1); /* * Consistency guarantees: following possibilities exist for the diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 1bffef4..6aed986 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -78,6 +78,11 @@ enum lli_flags { /* Sizeon-on-MDS attributes are changed. An attribute update needs to * be sent to MDS. */ LLIF_SOM_DIRTY = (1 << 3), + /* File is contented */ + LLIF_CONTENDED = (1 << 4), + /* Truncate uses server lock for this file */ + LLIF_SRVLOCK = (1 << 5) + }; struct ll_inode_info { @@ -89,6 +94,7 @@ struct ll_inode_info { __u64 lli_maxbytes; __u64 lli_ioepoch; unsigned long lli_flags; + cfs_time_t lli_contention_time; /* this lock protects posix_acl, pending_write_llaps, mmap_cnt */ spinlock_t lli_lock; @@ -234,6 +240,10 @@ enum stats_track_type { #define LL_SBI_LOCALFLOCK 0x200 /* Local flocks support by kernel */ #define LL_SBI_LRU_RESIZE 0x400 /* lru resize support */ +/* default value for ll_sb_info->contention_time */ +#define SBI_DEFAULT_CONTENTION_SECONDS 60 +/* default value for lockless_truncate_enable */ +#define SBI_DEFAULT_LOCKLESS_TRUNCATE_ENABLE 1 #define RCE_HASHES 32 struct rmtacl_ctl_entry { @@ -289,6 +299,9 @@ struct ll_sb_info { unsigned long ll_pglist_gen; struct list_head ll_pglist; /* all pages (llap_pglist_item) */ + unsigned ll_contention_time; /* seconds */ + unsigned ll_lockless_truncate_enable; /* true/false */ + struct ll_ra_info ll_ra_info; unsigned int ll_namelen; struct file_operations *ll_fop; @@ -458,7 +471,8 @@ struct ll_async_page { llap_defer_uptodate:1, llap_origin:3, llap_ra_used:1, - llap_ignore_quota:1; + llap_ignore_quota:1, + llap_lockless_io_page:1; void *llap_cookie; struct page *llap_page; struct list_head llap_pending_write; @@ -478,6 +492,7 @@ enum { LLAP_ORIGIN_COMMIT_WRITE, LLAP_ORIGIN_WRITEPAGE, LLAP_ORIGIN_REMOVEPAGE, + LLAP_ORIGIN_LOCKLESS_IO, LLAP__ORIGIN_MAX, }; extern char *llap_origins[]; @@ -545,6 +560,9 @@ struct ll_async_page *llap_cast_private(struct page *page); void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras); void ll_ra_accounting(struct ll_async_page *llap,struct address_space *mapping); void ll_truncate(struct inode *inode); +int ll_file_punch(struct inode *, loff_t, int); +ssize_t ll_file_lockless_io(struct file *, char *, size_t, loff_t *, int); +void ll_clear_file_contended(struct inode*); int ll_sync_page_range(struct inode *, struct address_space *, loff_t, size_t); /* llite/file.c */ diff --git 
a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index dd9d7e8..7b63ce3 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -76,7 +76,8 @@ static struct ll_sb_info *ll_init_sbi(void) SBI_DEFAULT_READAHEAD_MAX); sbi->ll_ra_info.ra_max_read_ahead_whole_pages = SBI_DEFAULT_READAHEAD_WHOLE_MAX; - + sbi->ll_contention_time = SBI_DEFAULT_CONTENTION_SECONDS; + sbi->ll_lockless_truncate_enable = SBI_DEFAULT_LOCKLESS_TRUNCATE_ENABLE; INIT_LIST_HEAD(&sbi->ll_conn_chain); INIT_LIST_HEAD(&sbi->ll_orphan_dentry_list); @@ -365,7 +366,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt) data->ocd_connect_flags = OBD_CONNECT_GRANT | OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE | - OBD_CONNECT_CANCELSET | OBD_CONNECT_FID; + OBD_CONNECT_CANCELSET | OBD_CONNECT_FID | + OBD_CONNECT_SRVLOCK | OBD_CONNECT_TRUNCLOCK; if (sbi->ll_flags & LL_SBI_OSS_CAPA) data->ocd_connect_flags |= OBD_CONNECT_OSS_CAPA; @@ -1244,6 +1246,92 @@ static int ll_setattr_done_writing(struct inode *inode, RETURN(rc); } +static int ll_setattr_do_truncate(struct inode *inode, loff_t new_size) +{ + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct ll_inode_info *lli = ll_i2info(inode); + struct lov_stripe_md *lsm = lli->lli_smd; + int rc; + ldlm_policy_data_t policy = { .l_extent = {new_size, + OBD_OBJECT_EOF } }; + struct lustre_handle lockh = { 0 }; + int local_lock = 0; /* 0 - no local lock; + * 1 - lock taken by lock_extent; + * 2 - by obd_match*/ + int ast_flags; + int err; + ENTRY; + + UNLOCK_INODE_MUTEX(inode); + UP_WRITE_I_ALLOC_SEM(inode); + + if (sbi->ll_lockless_truncate_enable && + (sbi->ll_lco.lco_flags & OBD_CONNECT_TRUNCLOCK)) { + ast_flags = LDLM_FL_BLOCK_GRANTED; + rc = obd_match(sbi->ll_dt_exp, lsm, LDLM_EXTENT, + &policy, LCK_PW, &ast_flags, inode, &lockh); + if (rc > 0) { + local_lock = 2; + rc = 0; + } else if (rc == 0) { + rc = ll_file_punch(inode, new_size, 1); + } + } else { + /* XXX when we fix the AST intents to pass the discard-range + * XXX extent, make ast_flags always LDLM_AST_DISCARD_DATA + * XXX here. */ + ast_flags = (new_size == 0) ? LDLM_AST_DISCARD_DATA : 0; + rc = ll_extent_lock(NULL, inode, lsm, LCK_PW, &policy, + &lockh, ast_flags); + if (likely(rc == 0)) + local_lock = 1; + } + +#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) + DOWN_WRITE_I_ALLOC_SEM(inode); + LOCK_INODE_MUTEX(inode); +#else + LOCK_INODE_MUTEX(inode); + DOWN_WRITE_I_ALLOC_SEM(inode); +#endif + if (likely(rc == 0)) { + /* Only ll_inode_size_lock is taken at this level. + * lov_stripe_lock() is grabbed by ll_truncate() only over + * call to obd_adjust_kms(). 
If vmtruncate returns 0, then + * ll_truncate dropped ll_inode_size_lock() */ + ll_inode_size_lock(inode, 0); + if (!local_lock) { + spin_lock(&lli->lli_lock); + lli->lli_flags |= LLIF_SRVLOCK; + spin_unlock(&lli->lli_lock); + } + rc = vmtruncate(inode, new_size); + if (!local_lock) { + spin_lock(&lli->lli_lock); + lli->lli_flags &= ~LLIF_SRVLOCK; + spin_unlock(&lli->lli_lock); + } + if (rc != 0) { + LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0); + ll_inode_size_unlock(inode, 0); + } + } + + if (local_lock) { + if (local_lock == 2) + err = obd_cancel(sbi->ll_dt_exp, lsm, LCK_PW, &lockh); + else + err = ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh); + if (unlikely(err != 0)){ + CERROR("extent unlock failed: err=%d," + " unlock method =%d\n", err, local_lock); + if (rc == 0) + rc = err; + } + } + RETURN(rc); +} + /* If this inode has objects allocated to it (lsm != NULL), then the OST * object(s) determine the file size and mtime. Otherwise, the MDS will * keep these values until such a time that objects are allocated for it. @@ -1356,43 +1444,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) * last one is especially bad for racing o_append users on other * nodes. */ if (ia_valid & ATTR_SIZE) { - ldlm_policy_data_t policy = { .l_extent = {attr->ia_size, - OBD_OBJECT_EOF } }; - struct lustre_handle lockh = { 0 }; - int err, ast_flags = 0; - /* XXX when we fix the AST intents to pass the discard-range - * XXX extent, make ast_flags always LDLM_AST_DISCARD_DATA - * XXX here. */ - if (attr->ia_size == 0) - ast_flags = LDLM_AST_DISCARD_DATA; - - UNLOCK_INODE_MUTEX(inode); - UP_WRITE_I_ALLOC_SEM(inode); - rc = ll_extent_lock(NULL, inode, lsm, LCK_PW, &policy, &lockh, - ast_flags); - LOCK_INODE_MUTEX(inode); - DOWN_WRITE_I_ALLOC_SEM(inode); - - if (rc != 0) - GOTO(out, rc); - - /* Only ll_inode_size_lock is taken at this level. - * lov_stripe_lock() is grabbed by ll_truncate() only over - * call to obd_adjust_kms(). 
If vmtruncate returns 0, then - * ll_truncate dropped ll_inode_size_lock() */ - ll_inode_size_lock(inode, 0); - rc = vmtruncate(inode, attr->ia_size); - if (rc != 0) { - LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0); - ll_inode_size_unlock(inode, 0); - } - - err = ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh); - if (err) { - CERROR("ll_extent_unlock failed: %d\n", err); - if (!rc) - rc = err; - } + rc = ll_setattr_do_truncate(inode, attr->ia_size); } else if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET)) { obd_flag flags; struct obd_info oinfo = { { { 0 } } }; @@ -2073,6 +2125,7 @@ char *llap_origins[] = { [LLAP_ORIGIN_READAHEAD] = "ra", [LLAP_ORIGIN_COMMIT_WRITE] = "cw", [LLAP_ORIGIN_WRITEPAGE] = "wp", + [LLAP_ORIGIN_LOCKLESS_IO] = "ls" }; struct ll_async_page *llite_pglist_next_llap(struct ll_sb_info *sbi, diff --git a/lustre/llite/llite_mmap.c b/lustre/llite/llite_mmap.c index 63b168b..147a4d7 100644 --- a/lustre/llite/llite_mmap.c +++ b/lustre/llite/llite_mmap.c @@ -366,6 +366,8 @@ struct page *ll_nopage(struct vm_area_struct *vma, unsigned long address, RETURN(NULL); } + ll_clear_file_contended(inode); + /* start and end the lock on the first and last bytes in the page */ policy_from_vma(&policy, vma, address, CFS_PAGE_SIZE); diff --git a/lustre/llite/lproc_llite.c b/lustre/llite/lproc_llite.c index 5417c37..56b8d1a 100644 --- a/lustre/llite/lproc_llite.c +++ b/lustre/llite/lproc_llite.c @@ -460,6 +460,47 @@ static int ll_wr_track_gid(struct file *file, const char *buffer, return (ll_wr_track_id(buffer, count, data, STATS_TRACK_GID)); } +static int ll_rd_contention_time(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct super_block *sb = data; + + *eof = 1; + return snprintf(page, count, "%u\n", ll_s2sbi(sb)->ll_contention_time); + +} + +static int ll_wr_contention_time(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + struct super_block *sb = data; + struct ll_sb_info *sbi = ll_s2sbi(sb); + + return lprocfs_write_helper(buffer, count,&sbi->ll_contention_time) ?: + count; +} + +static int ll_rd_lockless_truncate(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct super_block *sb = data; + + *eof = 1; + return snprintf(page, count, "%u\n", + ll_s2sbi(sb)->ll_lockless_truncate_enable); +} + +static int ll_wr_lockless_truncate(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + struct super_block *sb = data; + struct ll_sb_info *sbi = ll_s2sbi(sb); + + return lprocfs_write_helper(buffer, count, + &sbi->ll_lockless_truncate_enable) + ?: count; +} + static struct lprocfs_vars lprocfs_llite_obd_vars[] = { { "uuid", ll_rd_sb_uuid, 0, 0 }, //{ "mntpt_path", ll_rd_path, 0, 0 }, @@ -482,6 +523,9 @@ static struct lprocfs_vars lprocfs_llite_obd_vars[] = { { "stats_track_pid", ll_rd_track_pid, ll_wr_track_pid, 0 }, { "stats_track_ppid", ll_rd_track_ppid, ll_wr_track_ppid, 0 }, { "stats_track_gid", ll_rd_track_gid, ll_wr_track_gid, 0 }, + { "contention_seconds", ll_rd_contention_time, ll_wr_contention_time, 0}, + { "lockless_truncate", ll_rd_lockless_truncate, + ll_wr_lockless_truncate, 0}, { 0 } }; @@ -521,6 +565,7 @@ struct llite_file_opcode { /* inode operation */ { LPROC_LL_SETATTR, LPROCFS_TYPE_REGS, "setattr" }, { LPROC_LL_TRUNC, LPROCFS_TYPE_REGS, "truncate" }, + { LPROC_LL_LOCKLESS_TRUNC, LPROCFS_TYPE_REGS, "lockless_truncate"}, { LPROC_LL_FLOCK, LPROCFS_TYPE_REGS, "flock" }, { LPROC_LL_GETATTR, LPROCFS_TYPE_REGS, "getattr" }, /* special inode operation */ @@ 
-535,6 +580,10 @@ struct llite_file_opcode { "direct_read" }, { LPROC_LL_DIRECT_WRITE, LPROCFS_CNTR_AVGMINMAX|LPROCFS_TYPE_PAGES, "direct_write" }, + { LPROC_LL_LOCKLESS_READ, LPROCFS_CNTR_AVGMINMAX|LPROCFS_TYPE_BYTES, + "lockless_read_bytes" }, + { LPROC_LL_LOCKLESS_WRITE, LPROCFS_CNTR_AVGMINMAX|LPROCFS_TYPE_BYTES, + "lockless_write_bytes" }, }; diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index 3eba56d..94ed7499 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -107,6 +107,47 @@ static int ll_brw(int cmd, struct inode *inode, struct obdo *oa, RETURN(rc); } +int ll_file_punch(struct inode * inode, loff_t new_size, int srvlock) +{ + struct ll_inode_info *lli = ll_i2info(inode); + struct obd_info oinfo = { { { 0 } } }; + struct obdo oa; + int rc; + + ENTRY; + CDEBUG(D_INFO, "calling punch for "LPX64" (new size %Lu=%#Lx)\n", + lli->lli_smd->lsm_object_id, i_size_read(inode), i_size_read(inode)); + + oinfo.oi_md = lli->lli_smd; + oinfo.oi_policy.l_extent.start = new_size; + oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF; + oinfo.oi_oa = &oa; + oa.o_id = lli->lli_smd->lsm_object_id; + oa.o_gr = lli->lli_smd->lsm_object_gr; + oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; + if (srvlock) { + /* set OBD_MD_FLFLAGS in o_valid, only if we + * set OBD_FL_TRUNCLOCK, otherwise ost_punch + * and filter_setattr get confused, see the comment + * in ost_punch */ + oa.o_flags = OBD_FL_TRUNCLOCK; + oa.o_valid |= OBD_MD_FLFLAGS; + } + obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE | + OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME | + OBD_MD_FLFID | OBD_MD_FLGENER); + + oinfo.oi_capa = ll_osscapa_get(inode, CAPA_OPC_OSS_TRUNC); + rc = obd_punch_rqset(ll_i2dtexp(inode), &oinfo, NULL); + ll_truncate_free_capa(oinfo.oi_capa); + if (rc) + CERROR("obd_truncate fails (%d) ino %lu\n", rc, inode->i_ino); + else + obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | + OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME); + RETURN(rc); +} + /* this isn't where truncate starts. roughly: * sys_truncate->ll_setattr_raw->vmtruncate->ll_truncate. setattr_raw grabs * DLM lock on [size, EOF], i_mutex, ->lli_size_sem, and WRITE_I_ALLOC_SEM to @@ -116,10 +157,8 @@ static int ll_brw(int cmd, struct inode *inode, struct obdo *oa, void ll_truncate(struct inode *inode) { struct ll_inode_info *lli = ll_i2info(inode); - struct obd_info oinfo = { { { 0 } } }; - struct ost_lvb lvb; - struct obdo oa; - int rc; + int srvlock = !!(lli->lli_flags & LLIF_SRVLOCK); + loff_t new_size; ENTRY; CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) to %Lu=%#Lx\n",inode->i_ino, inode->i_generation, inode, i_size_read(inode), @@ -139,22 +178,27 @@ void ll_truncate(struct inode *inode) LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0); - /* XXX I'm pretty sure this is a hack to paper over a more fundamental - * race condition. */ - lov_stripe_lock(lli->lli_smd); - inode_init_lvb(inode, &lvb); - rc = obd_merge_lvb(ll_i2dtexp(inode), lli->lli_smd, &lvb, 0); - if (lvb.lvb_size == i_size_read(inode) && rc == 0) { - CDEBUG(D_VFSTRACE, "skipping punch for obj "LPX64", %Lu=%#Lx\n", - lli->lli_smd->lsm_object_id, i_size_read(inode), - i_size_read(inode)); + if (!srvlock) { + struct ost_lvb lvb; + int rc; + + /* XXX I'm pretty sure this is a hack to paper + * over a more fundamental race condition. 
*/ + lov_stripe_lock(lli->lli_smd); + inode_init_lvb(inode, &lvb); + rc = obd_merge_lvb(ll_i2dtexp(inode), lli->lli_smd, &lvb, 0); + if (lvb.lvb_size == i_size_read(inode) && rc == 0) { + CDEBUG(D_VFSTRACE, "skipping punch for obj "LPX64 + ",%Lu=%#Lx\n", lli->lli_smd->lsm_object_id, + i_size_read(inode), i_size_read(inode)); + lov_stripe_unlock(lli->lli_smd); + GOTO(out_unlock, 0); + } + obd_adjust_kms(ll_i2dtexp(inode), lli->lli_smd, + i_size_read(inode), 1); lov_stripe_unlock(lli->lli_smd); - GOTO(out_unlock, 0); } - obd_adjust_kms(ll_i2dtexp(inode), lli->lli_smd, i_size_read(inode), 1); - lov_stripe_unlock(lli->lli_smd); - if (unlikely((ll_i2sbi(inode)->ll_flags & LL_SBI_CHECKSUM) && (i_size_read(inode) & ~CFS_PAGE_MASK))) { /* If the truncate leaves behind a partial page, update its @@ -178,31 +222,13 @@ void ll_truncate(struct inode *inode) } } - CDEBUG(D_INFO, "calling punch for "LPX64" (new size %Lu=%#Lx)\n", - lli->lli_smd->lsm_object_id, i_size_read(inode), i_size_read(inode)); - - oinfo.oi_md = lli->lli_smd; - oinfo.oi_policy.l_extent.start = i_size_read(inode); - oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF; - oinfo.oi_oa = &oa; - oa.o_id = lli->lli_smd->lsm_object_id; - oa.o_gr = lli->lli_smd->lsm_object_gr; - oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; - - obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE | - OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME | - OBD_MD_FLFID | OBD_MD_FLGENER); - + new_size = i_size_read(inode); ll_inode_size_unlock(inode, 0); - - oinfo.oi_capa = ll_osscapa_get(inode, CAPA_OPC_OSS_TRUNC); - rc = obd_punch_rqset(ll_i2dtexp(inode), &oinfo, NULL); - ll_truncate_free_capa(oinfo.oi_capa); - if (rc) - CERROR("obd_truncate fails (%d) ino %lu\n", rc, inode->i_ino); + if (!srvlock) + ll_file_punch(inode, new_size, 0); else - obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | - OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME); + ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LOCKLESS_TRUNC, 1); + EXIT; return; @@ -650,7 +676,8 @@ struct ll_async_page *llap_from_page(struct page *page, unsigned origin) OSC_DEFAULT_CKSUM); kunmap_atomic(kaddr, KM_USER0); if (origin == LLAP_ORIGIN_READAHEAD || - origin == LLAP_ORIGIN_READPAGE) { + origin == LLAP_ORIGIN_READPAGE || + origin == LLAP_ORIGIN_LOCKLESS_IO) { llap->llap_checksum = 0; } else if (origin == LLAP_ORIGIN_COMMIT_WRITE || llap->llap_checksum == 0) { @@ -933,11 +960,7 @@ int ll_ap_completion(void *data, int cmd, struct obdo *oa, int rc) RETURN(ret); } -/* the kernel calls us here when a page is unhashed from the page cache. - * the page will be locked and the kernel is holding a spinlock, so - * we need to be careful. we're just tearing down our book-keeping - * here. */ -void ll_removepage(struct page *page) +static void __ll_put_llap(struct page *page) { struct inode *inode = page->mapping->host; struct obd_export *exp; @@ -946,17 +969,6 @@ void ll_removepage(struct page *page) int rc; ENTRY; - LASSERT(!in_interrupt()); - - /* sync pages or failed read pages can leave pages in the page - * cache that don't have our data associated with them anymore */ - if (page_private(page) == 0) { - EXIT; - return; - } - - LL_CDEBUG_PAGE(D_PAGE, page, "being evicted\n"); - exp = ll_i2dtexp(inode); if (exp == NULL) { CERROR("page %p ind %lu gave null export\n", page, page->index); @@ -994,6 +1006,29 @@ void ll_removepage(struct page *page) EXIT; } +/* the kernel calls us here when a page is unhashed from the page cache. 
+ * the page will be locked and the kernel is holding a spinlock, so + * we need to be careful. we're just tearing down our book-keeping + * here. */ +void ll_removepage(struct page *page) +{ + ENTRY; + + LASSERT(!in_interrupt()); + + /* sync pages or failed read pages can leave pages in the page + * cache that don't have our data associated with them anymore */ + if (page_private(page) == 0) { + EXIT; + return; + } + + LASSERT(!llap_cast_private(page)->llap_lockless_io_page); + LL_CDEBUG_PAGE(D_PAGE, page, "being evicted\n"); + __ll_put_llap(page); + EXIT; +} + static int ll_page_matches(struct page *page, int fd_flags) { struct lustre_handle match_lockh = {0}; @@ -1872,3 +1907,274 @@ out_oig: oig_release(oig); RETURN(rc); } + +static void ll_file_put_pages(struct page **pages, int numpages) +{ + int i; + struct page **pp; + ENTRY; + + for (i = 0, pp = pages; i < numpages; i++, pp++) { + if (*pp) { + LL_CDEBUG_PAGE(D_PAGE, (*pp), "free\n"); + __ll_put_llap(*pp); + if (page_private(*pp)) + CERROR("the llap wasn't freed\n"); + (*pp)->mapping = NULL; + if (page_count(*pp) != 1) + CERROR("page %p, flags %#lx, count %i, private %p\n", + (*pp), (unsigned long)(*pp)->flags, page_count(*pp), + (void*)page_private(*pp)); + __free_pages(*pp, 0); + } + } + OBD_FREE(pages, numpages * sizeof(struct page*)); + EXIT; +} + +static struct page **ll_file_prepare_pages(int numpages, struct inode *inode, + unsigned long first) +{ + struct page **pages; + int i; + int rc = 0; + ENTRY; + + OBD_ALLOC(pages, sizeof(struct page *) * numpages); + if (pages == NULL) + RETURN(ERR_PTR(-ENOMEM)); + for (i = 0; i < numpages; i++) { + struct page *page; + struct ll_async_page *llap; + + page = alloc_pages(GFP_HIGHUSER, 0); + if (page == NULL) + GOTO(err, rc = -ENOMEM); + pages[i] = page; + /* llap_from_page needs page index and mapping to be set */ + page->index = first++; + page->mapping = inode->i_mapping; + llap = llap_from_page(page, LLAP_ORIGIN_LOCKLESS_IO); + if (IS_ERR(llap)) + GOTO(err, rc = PTR_ERR(llap)); + llap->llap_lockless_io_page = 1; + } + RETURN(pages); +err: + ll_file_put_pages(pages, numpages); + RETURN(ERR_PTR(rc)); + } + +static ssize_t ll_file_copy_pages(struct page **pages, int numpages, + char *buf, loff_t pos, size_t count, int rw) +{ + ssize_t amount = 0; + int i; + int updatechecksum = ll_i2sbi(pages[0]->mapping->host)->ll_flags & + LL_SBI_CHECKSUM; + ENTRY; + + for (i = 0; i < numpages; i++) { + unsigned offset, bytes, left; + char *vaddr; + + vaddr = kmap(pages[i]); + offset = pos & (CFS_PAGE_SIZE - 1); + bytes = min_t(unsigned, CFS_PAGE_SIZE - offset, count); + LL_CDEBUG_PAGE(D_PAGE, pages[i], "op = %s, addr = %p, " + "buf = %p, bytes = %u\n", + (rw == WRITE) ? 
"CFU" : "CTU", + vaddr + offset, buf, bytes); + if (rw == WRITE) { + left = copy_from_user(vaddr + offset, buf, bytes); + if (updatechecksum) { + struct ll_async_page *llap; + + llap = llap_cast_private(pages[i]); + llap->llap_checksum = crc32_le(0, vaddr, + CFS_PAGE_SIZE); + } + } else { + left = copy_to_user(buf, vaddr + offset, bytes); + } + kunmap(pages[i]); + amount += bytes; + if (left) { + amount -= left; + break; + } + buf += bytes; + count -= bytes; + pos += bytes; + } + if (amount == 0) + RETURN(-EFAULT); + RETURN(amount); +} + +static int ll_file_oig_pages(struct inode * inode, struct page **pages, + int numpages, loff_t pos, size_t count, int rw) +{ + struct obd_io_group *oig; + struct ll_inode_info *lli = ll_i2info(inode); + struct obd_export *exp; + loff_t org_pos = pos; + obd_flag brw_flags; + int rc; + int i; + ENTRY; + + exp = ll_i2dtexp(inode); + if (exp == NULL) + RETURN(-EINVAL); + rc = oig_init(&oig); + if (rc) + RETURN(rc); + brw_flags = OBD_BRW_SRVLOCK; + if (capable(CAP_SYS_RESOURCE)) + brw_flags |= OBD_BRW_NOQUOTA; + + for (i = 0; i < numpages; i++) { + struct ll_async_page *llap; + unsigned from, bytes; + + from = pos & (CFS_PAGE_SIZE - 1); + bytes = min_t(unsigned, CFS_PAGE_SIZE - from, + count - pos + org_pos); + llap = llap_cast_private(pages[i]); + LASSERT(llap); + + lock_page(pages[i]); + + LL_CDEBUG_PAGE(D_PAGE, pages[i], "offset "LPU64"," + " from %u, bytes = %u\n", + pos, from, bytes); + LASSERTF(pos >> CFS_PAGE_SHIFT == pages[i]->index, + "wrong page index %lu (%lu)\n", + pages[i]->index, + (unsigned long)(pos >> CFS_PAGE_SHIFT)); + rc = obd_queue_group_io(exp, lli->lli_smd, NULL, oig, + llap->llap_cookie, + (rw == WRITE) ? + OBD_BRW_WRITE:OBD_BRW_READ, + from, bytes, brw_flags, + ASYNC_READY | ASYNC_URGENT | + ASYNC_COUNT_STABLE | ASYNC_GROUP_SYNC); + if (rc) { + i++; + GOTO(out, rc); + } + pos += bytes; + } + rc = obd_trigger_group_io(exp, lli->lli_smd, NULL, oig); + if (rc) + GOTO(out, rc); + rc = oig_wait(oig); +out: + while(--i >= 0) + unlock_page(pages[i]); + oig_release(oig); + RETURN(rc); +} + +ssize_t ll_file_lockless_io(struct file *file, char *buf, size_t count, + loff_t *ppos, int rw) +{ + loff_t pos; + struct inode *inode = file->f_dentry->d_inode; + ssize_t rc = 0; + int max_pages; + size_t amount = 0; + unsigned long first, last; + ENTRY; + + if (rw == READ) { + loff_t isize; + + ll_inode_size_lock(inode, 0); + isize = i_size_read(inode); + ll_inode_size_unlock(inode, 0); + if (*ppos >= isize) + GOTO(out, rc = 0); + if (*ppos + count >= isize) + count -= *ppos + count - isize; + if (count == 0) + GOTO(out, rc); + } else { + rc = generic_write_checks(file, ppos, &count, 0); + if (rc) + GOTO(out, rc); + rc = remove_suid(file->f_dentry); + if (rc) + GOTO(out, rc); + } + pos = *ppos; + first = pos >> CFS_PAGE_SHIFT; + last = (pos + count - 1) >> CFS_PAGE_SHIFT; + max_pages = PTLRPC_MAX_BRW_PAGES * + ll_i2info(inode)->lli_smd->lsm_stripe_count; + CDEBUG(D_INFO, "%u, stripe_count = %u\n", + PTLRPC_MAX_BRW_PAGES /* max_pages_per_rpc */, + ll_i2info(inode)->lli_smd->lsm_stripe_count); + + while (first <= last && rc >= 0) { + int pages_for_io; + struct page **pages; + size_t bytes = count - amount; + + pages_for_io = min_t(int, last - first + 1, max_pages); + pages = ll_file_prepare_pages(pages_for_io, inode, first); + if (IS_ERR(pages)) { + rc = PTR_ERR(pages); + break; + } + if (rw == WRITE) { + rc = ll_file_copy_pages(pages, pages_for_io, buf, + pos + amount, bytes, rw); + if (rc < 0) + GOTO(put_pages, rc); + bytes = rc; + } + rc = 
ll_file_oig_pages(inode, pages, pages_for_io, + pos + amount, bytes, rw); + if (rc) + GOTO(put_pages, rc); + if (rw == READ) { + rc = ll_file_copy_pages(pages, pages_for_io, buf, + pos + amount, bytes, rw); + if (rc < 0) + GOTO(put_pages, rc); + bytes = rc; + } + amount += bytes; + buf += bytes; +put_pages: + ll_file_put_pages(pages, pages_for_io); + first += pages_for_io; + /* a short read/write check */ + if (pos + amount < ((loff_t)first << CFS_PAGE_SHIFT)) + break; + } + /* NOTE: don't update i_size and KMS in absence of LDLM locks even + * write makes the file large */ + file_accessed(file); + if (rw == READ && amount < count && rc == 0) { + unsigned long not_cleared; + + not_cleared = clear_user(buf, count - amount); + amount = count - not_cleared; + if (not_cleared) + rc = -EFAULT; + } + if (amount > 0) { + lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, + (rw == WRITE) ? + LPROC_LL_LOCKLESS_WRITE : + LPROC_LL_LOCKLESS_READ, + (long)amount); + *ppos += amount; + RETURN(amount); + } +out: + RETURN(rc); +} diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c index 5d80fcb..aeb17a0 100644 --- a/lustre/lov/lov_request.c +++ b/lustre/lov/lov_request.c @@ -176,7 +176,8 @@ int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc) memset(lov_lockhp, 0, sizeof(*lov_lockhp)); if (lov->lov_tgts[req->rq_idx] && lov->lov_tgts[req->rq_idx]->ltd_active) { - if (rc != -EINTR) + /* -EUSERS used by OST to report file contention */ + if (rc != -EINTR && rc != -EUSERS) CERROR("enqueue objid "LPX64" subobj " LPX64" on OST idx %d: rc %d\n", set->set_oi->oi_md->lsm_object_id, diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index fd6a232..2bad71a 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -2180,6 +2180,7 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, CFS_LIST_HEAD(rpc_list); unsigned int ending_offset; unsigned starting_offset = 0; + int srvlock = 0; ENTRY; /* first we find the pages we're allowed to work with */ @@ -2189,6 +2190,13 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, LASSERT(oap->oap_magic == OAP_MAGIC); + if (page_count != 0 && + srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) { + CDEBUG(D_PAGE, "SRVLOCK flag mismatch," + " oap %p, page %p, srvlock %u\n", + oap, oap->oap_brw_page.pg, (unsigned)!srvlock); + break; + } /* in llite being 'ready' equates to the page being locked * until completion unlocks it. 
commit_write submits a page * as not ready because its unlock will happen unconditionally @@ -2270,6 +2278,8 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, /* now put the page back in our accounting */ list_add_tail(&oap->oap_rpc_item, &rpc_list); + if (page_count == 0) + srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK); if (++page_count >= cli->cl_max_pages_per_rpc) break; diff --git a/lustre/tests/sanityN.sh b/lustre/tests/sanityN.sh index 8370a8b..777dec8 100644 --- a/lustre/tests/sanityN.sh +++ b/lustre/tests/sanityN.sh @@ -608,6 +608,85 @@ test_31() { } run_test 31 "voluntary cancel / blocking ast race==============" +# enable/disable lockless truncate feature, depending on the arg 0/1 +enable_lockless_truncate() { + lctl set_param -n llite.*.lockless_truncate $1 +} + +test_32a() { # bug 11270 + local p="$TMP/sanityN-$TESTNAME.parameters" + save_lustre_params $HOSTNAME llite.*.lockless_truncate > $p + cancel_lru_locks osc + clear_llite_stats + enable_lockless_truncate 1 + dd if=/dev/zero of=$DIR1/$tfile count=10 bs=1M > /dev/null 2>&1 + + log "checking cached lockless truncate" + $TRUNCATE $DIR1/$tfile 8000000 + $CHECKSTAT -s 8000000 $DIR2/$tfile || error "wrong file size" + [ $(calc_llite_stats lockless_truncate) -eq 0 ] || + error "lockless truncate doesn't use cached locks" + + log "checking not cached lockless truncate" + $TRUNCATE $DIR2/$tfile 5000000 + $CHECKSTAT -s 5000000 $DIR1/$tfile || error "wrong file size" + [ $(calc_llite_stats lockless_truncate) -ne 0 ] || + error "not cached trancate isn't lockless" + + log "disabled lockless truncate" + enable_lockless_truncate 0 + clear_llite_stats + $TRUNCATE $DIR2/$tfile 3000000 + $CHECKSTAT -s 3000000 $DIR1/$tfile || error "wrong file size" + [ $(calc_llite_stats lockless_truncate) -eq 0 ] || + error "lockless truncate disabling failed" + rm $DIR1/$tfile + # restore lockless_truncate default values + restore_lustre_params < $p + rm -f $p +} +run_test 32a "lockless truncate" + +test_32b() { # bug 11270 + local node + local p="$TMP/sanityN-$TESTNAME.parameters" + save_lustre_params $HOSTNAME "llite.*.contention_seconds" > $p + for node in $(osts_nodes); do + save_lustre_params $node "ldlm.namespaces.filter-*.max_nolock_bytes" >> $p + save_lustre_params $node "ldlm.namespaces.filter-*.contended_locks" >> $p + save_lustre_params $node "ldlm.namespaces.filter-*.contention_seconds" >> $p + done + clear_llite_stats + # agressive lockless i/o settings + for node in $(osts_nodes); do + do_node $node 'lctl set_param -n ldlm.namespaces.filter-*.max_nolock_bytes 2000000; lctl set_param -n ldlm.namespaces.filter-*.contended_locks 0; lctl set_param -n ldlm.namespaces.filter-*.contention_seconds 60' + done + lctl set_param -n llite.*.contention_seconds 60 + for i in $(seq 5); do + dd if=/dev/zero of=$DIR1/$tfile bs=4k count=1 conv=notrunc > /dev/null 2>&1 + dd if=/dev/zero of=$DIR2/$tfile bs=4k count=1 conv=notrunc > /dev/null 2>&1 + done + [ $(calc_llite_stats lockless_write_bytes) -ne 0 ] || error "lockless i/o was not triggered" + # disable lockless i/o (it is disabled by default) + for node in $(osts_nodes); do + do_node $node 'lctl set_param -n ldlm.namespaces.filter-*.max_nolock_bytes 0; lctl set_param -n ldlm.namespaces.filter-*.contended_locks 32; lctl set_param -n ldlm.namespaces.filter-*.contention_seconds 0' + done + # set contention_seconds to 0 at client too, otherwise Lustre still + # remembers lock contention + lctl set_param -n llite.*.contention_seconds 0 + clear_llite_stats + for i in $(seq 5); 
do + dd if=/dev/zero of=$DIR1/$tfile bs=4k count=1 conv=notrunc > /dev/null 2>&1 + dd if=/dev/zero of=$DIR2/$tfile bs=4k count=1 conv=notrunc > /dev/null 2>&1 + done + [ $(calc_llite_stats lockless_write_bytes) -eq 0 ] || + error "lockless i/o works when disabled" + rm -f $DIR1/$tfile + restore_lustre_params <$p + rm -f $p +} +run_test 32b "lockless i/o" + log "cleanup: ======================================================" check_and_cleanup_lustre diff --git a/lustre/tests/test-framework.sh b/lustre/tests/test-framework.sh index c08579c..a92a828 100644 --- a/lustre/tests/test-framework.sh +++ b/lustre/tests/test-framework.sh @@ -1635,3 +1635,33 @@ multiop_bg_pause() { return 0 } + +# reset llite stat counters +clear_llite_stats(){ + lctl set_param -n llite.*.stats 0 +} + +# sum llite stat items +calc_llite_stats() { + local res=$(lctl get_param -n llite.*.stats | + awk 'BEGIN {s = 0} END {print s} /^'"$1"'/ {s += $2}') + echo $res +} + +# save_lustre_params(node, parameter_mask) +# generate a stream of formatted strings (<node> <param>=<value>) +save_lustre_params() { + local s + do_node $1 "lctl get_param $2" | while read s; do echo "$1 $s"; done +} + +# restore lustre parameters from input stream, produced by save_lustre_params +restore_lustre_params() { + local node + local name + local val + while IFS=" =" read node name val; do + do_node $node "lctl set_param -n $name $val" + done +} +
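
The change spans the LDLM server, llite and the tests, so a few condensed models may help while reviewing. The first sketch below is plain user-space C, not code from the tree: every struct and function name is illustrative, and only the three namespace tunables and the -EUSERS reply mirror the patch. It models ldlm_check_contention() plus the LDLM_FL_DENY_ON_CONTENTION test at the end of ldlm_extent_compat_queue(): once more than contended_locks conflicting locks are seen, the resource stays contended for contention_seconds, and any small enough request (at most max_nolock_bytes) that asked to be denied on contention is refused with -EUSERS. The real code additionally requires an actual conflict (compat == 0) and a non-GROUP lock request, which the model omits.

/* Stand-alone model of the server-side contention check; every name here is
 * illustrative, only the three tunables and the -EUSERS reply mirror the
 * patch (ldlm_check_contention() and the LDLM_FL_DENY_ON_CONTENTION test at
 * the end of ldlm_extent_compat_queue()). */
#include <errno.h>
#include <stdio.h>
#include <time.h>

struct ns_tunables {
        unsigned contended_locks;       /* ns_contended_locks  (default 32) */
        unsigned contention_seconds;    /* ns_contention_time  (default 2)  */
        unsigned max_nolock_bytes;      /* ns_max_nolock_size  (default 0)  */
};

struct resource {
        time_t contention_time;         /* lr_contention_time analogue */
};

/* Remember when too many conflicting locks were seen and report whether the
 * resource is still inside its contention window. */
static int check_contention(struct resource *res, const struct ns_tunables *ns,
                            unsigned conflicting_locks)
{
        time_t now = time(NULL);

        if (conflicting_locks > ns->contended_locks)
                res->contention_time = now;
        return now < res->contention_time + ns->contention_seconds;
}

/* Refuse a small enough enqueue on a contended resource so that the client
 * performs the i/o under a server-side lock instead. */
static int maybe_deny(struct resource *res, const struct ns_tunables *ns,
                      unsigned conflicting_locks, int deny_on_contention,
                      unsigned long req_bytes)
{
        if (check_contention(res, ns, conflicting_locks) &&
            deny_on_contention &&
            req_bytes <= ns->max_nolock_bytes)
                return -EUSERS;
        return 0;
}

int main(void)
{
        /* roughly the aggressive settings used by sanityN test_32b */
        struct ns_tunables ns = { .contended_locks = 0,
                                  .contention_seconds = 60,
                                  .max_nolock_bytes = 2000000 };
        struct resource res = { 0 };

        printf("4k write : rc = %d\n", maybe_deny(&res, &ns, 5, 1, 4096));
        printf("4MB write: rc = %d\n", maybe_deny(&res, &ns, 5, 1, 4u << 20));
        return 0;
}

With the default max_nolock_bytes of 0 the feature is effectively off, which is why test_32b has to raise it on the OSTs before lockless writes can trigger.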
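
The second sketch is a hypothetical rendering of the client-side decision added in ll_file_get_tree_lock() and ll_is_file_contended(); try_extent_lock() stands in for ll_tree_lock() with LDLM_FL_DENY_ON_CONTENTION set, and error handling other than -EUSERS is omitted. O_APPEND writes always take the DLM lock, a file already marked contended skips the enqueue for contention_seconds, and an -EUSERS reply marks the file contended and routes the syscall to ll_file_lockless_io().

/* Minimal sketch of the client-side fallback; names and the
 * try_extent_lock() callback are hypothetical, only the control flow
 * follows the patch. */
#include <errno.h>
#include <stdio.h>
#include <time.h>

struct file_state {
        int contended;                  /* LLIF_CONTENDED analogue */
        time_t contention_time;         /* lli_contention_time analogue */
        unsigned contention_seconds;    /* llite contention_seconds, default 60 */
};

enum io_path { IO_LOCKED, IO_LOCKLESS };

static int is_contended(struct file_state *fs)
{
        if (!fs->contended)
                return 0;
        if (time(NULL) > fs->contention_time + fs->contention_seconds) {
                fs->contended = 0;      /* contention window expired */
                return 0;
        }
        return 1;
}

/* try_extent_lock() stands in for ll_tree_lock() with
 * LDLM_FL_DENY_ON_CONTENTION; it returns 0 or -EUSERS. */
static enum io_path choose_io_path(struct file_state *fs, int append,
                                   int (*try_extent_lock)(void))
{
        if (!append && is_contended(fs))
                return IO_LOCKLESS;     /* skip the enqueue entirely */

        if (try_extent_lock() == -EUSERS) {
                fs->contended = 1;      /* server reported contention */
                fs->contention_time = time(NULL);
                return IO_LOCKLESS;
        }
        return IO_LOCKED;               /* DLM lock held, normal cached i/o */
}

static int deny(void)  { return -EUSERS; }
static int grant(void) { return 0; }

int main(void)
{
        struct file_state fs = { .contention_seconds = 60 };

        printf("%d\n", choose_io_path(&fs, 0, deny));  /* 1: lockless        */
        printf("%d\n", choose_io_path(&fs, 0, grant)); /* 1: still contended */
        printf("%d\n", choose_io_path(&fs, 1, grant)); /* 0: O_APPEND locks  */
        return 0;
}

Remembering the contention on the client for contention_seconds avoids re-enqueueing (and being denied again) on every subsequent read or write; it is also why test_32b resets contention_seconds to 0 on the client before checking that lockless i/o stays off.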
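
Finally, a rough model of the branch order in ll_setattr_do_truncate(). The helper names are invented stand-ins (cached_pw_lock_matches() for obd_match(), punch_with_server_lock() for ll_file_punch() with OBD_FL_TRUNCLOCK, truncate_under_client_lock() for the pre-existing ll_extent_lock() plus vmtruncate() path), and the LLIF_SRVLOCK bookkeeping around the local vmtruncate() is left out.

/* Rough, user-space model of the truncate decision; all helpers below are
 * hypothetical stand-ins, only the branch order follows the patch. */
#include <stdio.h>

struct client {
        int lockless_truncate_enable;   /* llite "lockless_truncate" tunable */
        int server_supports_trunclock;  /* OBD_CONNECT_TRUNCLOCK negotiated  */
};

static int cached_pw_lock_matches(long long new_size)
{
        (void)new_size;                 /* pretend no lock is cached */
        return 0;
}

static int punch_with_server_lock(long long new_size)
{
        printf("punch to %lld under an OST-side lock\n", new_size);
        return 0;
}

static int truncate_under_client_lock(long long new_size)
{
        printf("truncate to %lld under a client PW lock\n", new_size);
        return 0;
}

static int do_truncate(const struct client *cl, long long new_size)
{
        if (cl->lockless_truncate_enable && cl->server_supports_trunclock) {
                /* a PW lock already cached on [new_size, EOF] is reused ... */
                if (cached_pw_lock_matches(new_size))
                        return truncate_under_client_lock(new_size);
                /* ... otherwise let the OST take the extent lock itself */
                return punch_with_server_lock(new_size);
        }
        /* feature disabled or not supported: classic client-locked truncate */
        return truncate_under_client_lock(new_size);
}

int main(void)
{
        struct client cl = { .lockless_truncate_enable = 1,
                             .server_supports_trunclock = 1 };
        return do_truncate(&cl, 8000000LL);
}

This is the behaviour test_32a exercises: a truncate from the mount point that already caches the PW lock must not bump the lockless_truncate counter, while a truncate from the second mount point should.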