From: phil Date: Wed, 2 Mar 2005 23:35:09 +0000 (+0000) Subject: b=5492,5624,5654,5664,5672 X-Git-Tag: v1_8_0_110~486^7~150 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=8129cde023ee671708da75c89fe189593f749f6e;p=fs%2Flustre-release.git b=5492,5624,5654,5664,5672 Fundamentally changes the locking rules for the i_size and KMS. The way in which this was done is documented in bug 5654 comment #6, and I'll ask the colibri team to either adopt or adapt this design, but in any case make it a part of their documentation after they've merged it. i_size and KMS sampling and updates are now protected by an lli_size_sem. The reasons are many: The truncate path has been reorganized not to hold this semaphore during RPCs, which allows us to reverse the deadlock-inducing order with the ns_lock in ll_pgcache_remove_extent. (bug 5492) The introduction of the i_alloc_sem in 2.6 was wreaking havoc on our ability to get our ordering right. The truncate path was a festering boil of unlocking and relocking which may well have been the source of other concurrency bugs. Not using the i_sem for i_size updates eliminates this rat nest (bugs 5624, 5654) Finally, the CMD team reported a similar inversion between a writing thread and ptlrpcd (the writing thread has the i_sem, and won't release it until ptlrpcd finishes the I/O; ptlrpcd wants the i_sem to finalize lock cancellation after being evicted). (bugs 5664, 5672) This has been running at NERSC for the last week, so I think it's ready for more exposure. --- diff --git a/lustre/ChangeLog b/lustre/ChangeLog index 662945e..b7f610e 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -33,6 +33,7 @@ tbd Cluster File Systems, Inc. - avoid needless client->OST connect, fix handle mismatch (5317) - fix DLM error path that led to out-of-sync client, long delays (5779) - support common vfs-enforced mount options (nodev,nosuid,noexec) (5637) + - fix several locking issues related to i_size (5492,5624,5654,5672) * miscellania - service request history (4965) - put {ll,lov,osc}_async_page structs in a single slab (4699) diff --git a/lustre/include/linux/lustre_lite.h b/lustre/include/linux/lustre_lite.h index f659ef8..9316230 100644 --- a/lustre/include/linux/lustre_lite.h +++ b/lustre/include/linux/lustre_lite.h @@ -71,6 +71,7 @@ struct ll_inode_info { struct lov_stripe_md *lli_smd; char *lli_symlink_name; struct semaphore lli_open_sem; + struct semaphore lli_size_sem; __u64 lli_maxbytes; __u64 lli_io_epoch; unsigned long lli_flags; diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 1763524..aa9a996 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -507,12 +507,8 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock, goto iput; ll_pgcache_remove_extent(inode, lsm, lock, stripe); - /* grabbing the i_sem will wait for write() to complete. ns - * lock hold times should be very short as ast processing - * requires them and has a short timeout. so, i_sem before ns - * lock.*/ - down(&inode->i_sem); l_lock(&lock->l_resource->lr_namespace->ns_lock); + down(&lli->lli_size_sem); kms = ldlm_extent_shift_kms(lock, lsm->lsm_oinfo[stripe].loi_kms); @@ -520,8 +516,8 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock, LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64, lsm->lsm_oinfo[stripe].loi_kms, kms); lsm->lsm_oinfo[stripe].loi_kms = kms; + up(&lli->lli_size_sem); l_unlock(&lock->l_resource->lr_namespace->ns_lock); - up(&inode->i_sem); //ll_try_done_writing(inode); iput: iput(inode); @@ -671,8 +667,10 @@ int ll_glimpse_size(struct inode *inode) RETURN(rc > 0 ? -EIO : rc); } + down(&lli->lli_size_sem); inode->i_size = lov_merge_size(lli->lli_smd, 0); inode->i_blocks = lov_merge_blocks(lli->lli_smd); + up(&lli->lli_size_sem); LTIME_S(inode->i_mtime) = lov_merge_mtime(lli->lli_smd, LTIME_S(inode->i_mtime)); @@ -690,6 +688,7 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode, int ast_flags) { struct ll_sb_info *sbi = ll_i2sbi(inode); + struct ll_inode_info *lli = ll_i2info(inode); int rc; ENTRY; @@ -724,9 +723,9 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode, * when doing appending writes and effectively cancel the * result of the truncate. Getting the i_sem after the enqueue * maintains the DLM -> i_sem acquiry order. */ - down(&inode->i_sem); + down(&lli->lli_size_sem); inode->i_size = lov_merge_size(lsm, 1); - up(&inode->i_sem); + up(&lli->lli_size_sem); } if (rc == 0) @@ -787,15 +786,18 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count, if (rc != 0) RETURN(rc); + down(&lli->lli_size_sem); kms = lov_merge_size(lsm, 1); if (*ppos + count - 1 > kms) { /* A glimpse is necessary to determine whether we return a short * read or some zeroes at the end of the buffer */ + up(&lli->lli_size_sem); retval = ll_glimpse_size(inode); if (retval) goto out; } else { inode->i_size = kms; + up(&lli->lli_size_sem); } CDEBUG(D_INFO, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n", @@ -1136,13 +1138,16 @@ loff_t ll_file_seek(struct file *file, loff_t offset, int origin) lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_LLSEEK); if (origin == 2) { /* SEEK_END */ ldlm_policy_data_t policy = { .l_extent = {0, OBD_OBJECT_EOF }}; + struct ll_inode_info *lli = ll_i2info(inode); int rc; rc = ll_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh,0); if (rc != 0) RETURN(rc); + down(&lli->lli_size_sem); offset += inode->i_size; + up(&lli->lli_size_sem); } else if (origin == 1) { /* SEEK_CUR */ offset += file->f_pos; } @@ -1398,8 +1403,9 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it) int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct lookup_intent *it, struct kstat *stat) { - int res = 0; struct inode *inode = de->d_inode; + struct ll_inode_info *lli = ll_i2info(inode); + int res = 0; res = ll_inode_revalidate_it(de, it); lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_GETATTR); @@ -1417,9 +1423,13 @@ int ll_getattr(struct vfsmount *mnt, struct dentry *de, stat->atime = inode->i_atime; stat->mtime = inode->i_mtime; stat->ctime = inode->i_ctime; - stat->size = inode->i_size; stat->blksize = inode->i_blksize; + + down(&lli->lli_size_sem); + stat->size = inode->i_size; stat->blocks = inode->i_blocks; + up(&lli->lli_size_sem); + return 0; } #endif diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 9086073..b7a58c4 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -397,6 +397,7 @@ void ll_options(char *options, char **ost, char **mdc, int *flags) void ll_lli_init(struct ll_inode_info *lli) { sema_init(&lli->lli_open_sem, 1); + sema_init(&lli->lli_size_sem, 1); lli->lli_flags = 0; lli->lli_maxbytes = PAGE_CACHE_MAXBYTES; spin_lock_init(&lli->lli_lock); @@ -952,6 +953,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) * inode ourselves so we can call obdo_from_inode() always. */ if (ia_valid & (lsm ? ~(ATTR_SIZE | ATTR_FROM_OPEN | ATTR_RAW) : ~0)) { struct lustre_md md; + int save_valid; ll_prepare_mdc_op_data(&op_data, inode, NULL, NULL, 0, 0); rc = mdc_setattr(sbi->ll_mdc_exp, &op_data, @@ -970,9 +972,16 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) RETURN(rc); } - /* Won't invoke vmtruncate as we already cleared ATTR_SIZE, - * but needed to set timestamps backwards on utime. */ + /* We call inode_setattr to adjust timestamps, but we first + * clear ATTR_SIZE to avoid invoking vmtruncate. + * + * NB: ATTR_SIZE will only be set at this point if the size + * resides on the MDS, ie, this file has no objects. */ + save_valid = attr->ia_valid; + attr->ia_valid &= ~ATTR_SIZE; inode_setattr(inode, attr); + attr->ia_valid = save_valid; + ll_update_inode(inode, md.body, md.lsm); ptlrpc_req_finished(request); @@ -1013,6 +1022,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) ldlm_policy_data_t policy = { .l_extent = {attr->ia_size, OBD_OBJECT_EOF } }; struct lustre_handle lockh = { 0 }; + struct ll_inode_info *lli = ll_i2info(inode); int err, ast_flags = 0; /* XXX when we fix the AST intents to pass the discard-range * XXX extent, make ast_flags always LDLM_AST_DISCARD_DATA @@ -1020,38 +1030,20 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) if (attr->ia_size == 0) ast_flags = LDLM_AST_DISCARD_DATA; - /* bug 1639: avoid write/truncate i_sem/DLM deadlock */ - LASSERT(atomic_read(&inode->i_sem.count) <= 0); - up(&inode->i_sem); - UP_WRITE_I_ALLOC_SEM(inode); rc = ll_extent_lock(NULL, inode, lsm, LCK_PW, &policy, &lockh, ast_flags); -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - DOWN_WRITE_I_ALLOC_SEM(inode); - down(&inode->i_sem); -#else - down(&inode->i_sem); - DOWN_WRITE_I_ALLOC_SEM(inode); -#endif if (rc != 0) RETURN(rc); + down(&lli->lli_size_sem); rc = vmtruncate(inode, attr->ia_size); + // if vmtruncate returned 0, then ll_truncate dropped _size_sem + if (rc != 0) { + LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0); + up(&lli->lli_size_sem); + } - /* We need to drop the semaphore here, because this unlock may - * result in a cancellation, which will need the i_sem */ - up(&inode->i_sem); - UP_WRITE_I_ALLOC_SEM(inode); - /* unlock now as we don't mind others file lockers racing with - * the mds updates below? */ err = ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh); -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - DOWN_WRITE_I_ALLOC_SEM(inode); - down(&inode->i_sem); -#else - down(&inode->i_sem); - DOWN_WRITE_I_ALLOC_SEM(inode); -#endif if (err) { CERROR("ll_extent_unlock failed: %d\n", err); if (!rc) diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index 3e947dd..8bbd1d8 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -105,10 +105,13 @@ __u64 lov_merge_size(struct lov_stripe_md *lsm, int kms); /* this isn't where truncate starts. roughly: * sys_truncate->ll_setattr_raw->vmtruncate->ll_truncate - * we grab the lock back in setattr_raw to avoid races. */ + * we grab the lock back in setattr_raw to avoid races. + * + * must be called with lli_size_sem held */ void ll_truncate(struct inode *inode) { struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; + struct ll_inode_info *lli = ll_i2info(inode); struct obdo oa; int rc; ENTRY; @@ -118,37 +121,43 @@ void ll_truncate(struct inode *inode) if (!lsm) { CDEBUG(D_INODE, "truncate on inode %lu with no objects\n", inode->i_ino); - EXIT; - return; + GOTO(out_unlock, 0); } if (lov_merge_size(lsm, 0) == inode->i_size) { CDEBUG(D_VFSTRACE, "skipping punch for "LPX64" (size = %llu)\n", lsm->lsm_object_id, inode->i_size); - } else { - CDEBUG(D_INFO, "calling punch for "LPX64" (new size %llu)\n", - lsm->lsm_object_id, inode->i_size); + GOTO(out_unlock, 0); + } - oa.o_id = lsm->lsm_object_id; - oa.o_valid = OBD_MD_FLID; - obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE | - OBD_MD_FLATIME |OBD_MD_FLMTIME |OBD_MD_FLCTIME); + CDEBUG(D_INFO, "calling punch for "LPX64" (new size %llu)\n", + lsm->lsm_object_id, inode->i_size); - /* truncate == punch from new size to absolute end of file */ - /* NB: must call obd_punch with i_sem held! It updates kms! */ - rc = obd_punch(ll_i2obdexp(inode), &oa, lsm, inode->i_size, - OBD_OBJECT_EOF, NULL); - if (rc) - CERROR("obd_truncate fails (%d) ino %lu\n", rc, - inode->i_ino); - else - obdo_to_inode(inode, &oa, OBD_MD_FLSIZE|OBD_MD_FLBLOCKS| - OBD_MD_FLATIME | OBD_MD_FLMTIME | - OBD_MD_FLCTIME); - } + oa.o_id = lsm->lsm_object_id; + oa.o_valid = OBD_MD_FLID; + obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE | + OBD_MD_FLATIME |OBD_MD_FLMTIME |OBD_MD_FLCTIME); + + obd_adjust_kms(ll_i2obdexp(inode), lsm, inode->i_size, 1); + + LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0); + up(&lli->lli_size_sem); + rc = obd_punch(ll_i2obdexp(inode), &oa, lsm, inode->i_size, + OBD_OBJECT_EOF, NULL); + if (rc) + CERROR("obd_truncate fails (%d) ino %lu\n", rc, + inode->i_ino); + else + obdo_to_inode(inode, &oa, OBD_MD_FLSIZE|OBD_MD_FLBLOCKS| + OBD_MD_FLATIME | OBD_MD_FLMTIME | + OBD_MD_FLCTIME); EXIT; return; + + out_unlock: + LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0); + up(&lli->lli_size_sem); } /* ll_truncate */ __u64 lov_merge_size(struct lov_stripe_md *lsm, int kms); @@ -199,7 +208,9 @@ int ll_prepare_write(struct file *file, struct page *page, unsigned from, /* If are writing to a new page, no need to read old data. The extent * locking will have updated the KMS, and for our purposes here we can * treat it like i_size. */ + down(&lli->lli_size_sem); kms = lov_merge_size(lsm, 1); + up(&lli->lli_size_sem); if (kms <= offset) { LL_CDEBUG_PAGE(D_PAGE, page, "kms "LPU64" <= offset "LPU64"\n", kms, offset); @@ -641,6 +652,7 @@ int ll_commit_write(struct file *file, struct page *page, unsigned from, out: size = (((obd_off)page->index) << PAGE_SHIFT) + to; + down(&lli->lli_size_sem); if (rc == 0) { obd_adjust_kms(exp, lsm, size, 0); if (size > inode->i_size) @@ -652,6 +664,7 @@ out: * teardown our book-keeping here. */ ll_removepage(page); } + up(&lli->lli_size_sem); RETURN(rc); } diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c index e273c63..1ac7190 100644 --- a/lustre/lov/lov_request.c +++ b/lustre/lov/lov_request.c @@ -1126,15 +1126,10 @@ int lov_update_setattr_set(struct lov_request_set *set, int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req, int rc) { - struct lov_stripe_md *lsm = set->set_md; struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; ENTRY; lov_update_set(set, req, rc); - if (rc == 0) { - struct lov_oinfo *loi = &lsm->lsm_oinfo[req->rq_stripe]; - loi->loi_kms = loi->loi_rss = req->rq_extent.start; - } if (rc && !lov->tgts[req->rq_idx].active) rc = 0; /* FIXME in raid1 regime, should return 0 */ diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index f135417..194bbb5 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -814,6 +814,7 @@ int mds_open(struct mds_update_record *rec, int offset, } DEBUG_REQ(D_ERROR, req, "fid2 not set on open replay"); } + /* this LASSERT needs to go; handle more gracefully. */ LASSERT(rec->ur_fid2->id); rc = mds_open_by_fid(req, rec->ur_fid2, body, rec->ur_flags,