From 8eec98f7f15041759b1d4592934c091d32ab2bb7 Mon Sep 17 00:00:00 2001 From: nikita Date: Wed, 12 Oct 2005 10:55:00 +0000 Subject: [PATCH] Add locking to provide consistency between kms and lsm. b=5047 r=nikita r=adilger --- lustre/ChangeLog | 9 +++++ lustre/include/linux/obd.h | 14 ++++++++ lustre/liblustre/super.c | 2 +- lustre/llite/file.c | 80 +++++++++++++++++++++++++++++-------------- lustre/llite/llite_internal.h | 13 ++++++- lustre/llite/llite_lib.c | 45 ++++++++++++++++++++---- lustre/llite/llite_mmap.c | 18 ++++++---- lustre/llite/rw.c | 66 +++++++++++++++++++++-------------- lustre/lov/lov_merge.c | 12 ++++++- lustre/lov/lov_obd.c | 18 ++++++++++ lustre/lov/lov_pack.c | 1 + lustre/lov/lov_request.c | 4 +++ 12 files changed, 214 insertions(+), 68 deletions(-) diff --git a/lustre/ChangeLog b/lustre/ChangeLog index b78a184..88bad11 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -174,6 +174,15 @@ Details : Having an LWI_INTR() wait event (interruptible, but no timeout) request was interrupted, and we also didn't break out of the event loop if there was no timeout +Severity : minor +Frequency : rare +Bugzilla : 5047 +Description: data loss during concurrent non-page-aligned writes. +Details : updates to KMS and lsm weren't protected by a common lock. The resulting + inconsistency led to false short reads that were cached and later + used by ->prepare_write() to fill in a partially written page, + leading to data loss. + ------------------------------------------------------------------------------ 08-26-2005 Cluster File Systems, Inc. diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h index 23a7827..f0697c3 100644 --- a/lustre/include/linux/obd.h +++ b/lustre/include/linux/obd.h @@ -83,6 +83,9 @@ static inline void loi_init(struct lov_oinfo *loi) } struct lov_stripe_md { + spinlock_t lsm_lock; + void *lsm_lock_owner; /* debugging */ + /* Public members.
*/ __u64 lsm_object_id; /* lov object id */ __u64 lsm_object_gr; /* lov object id */ @@ -97,6 +100,17 @@ struct lov_stripe_md { struct lov_oinfo lsm_oinfo[0]; }; +/* compare all fields except for semaphore */ +static inline int lov_stripe_md_cmp(struct lov_stripe_md *m1, + struct lov_stripe_md *m2) +{ + return memcmp(&m1->lsm_object_id, &m2->lsm_object_id, + (char *)&m2->lsm_oinfo[0] - (char *)&m2->lsm_object_id); +} + +void lov_stripe_lock(struct lov_stripe_md *md); +void lov_stripe_unlock(struct lov_stripe_md *md); + struct obd_type { struct list_head typ_chain; struct obd_ops *typ_ops; diff --git a/lustre/liblustre/super.c b/lustre/liblustre/super.c index f32aa36..422b658 100644 --- a/lustre/liblustre/super.c +++ b/lustre/liblustre/super.c @@ -138,7 +138,7 @@ void llu_update_inode(struct inode *inode, struct mds_body *body, if (lli->lli_maxbytes > PAGE_CACHE_MAXBYTES) lli->lli_maxbytes = PAGE_CACHE_MAXBYTES; } else { - if (memcmp(lli->lli_smd, lsm, sizeof(*lsm))) { + if (lov_stripe_md_cmp(lli->lli_smd, lsm)) { CERROR("lsm mismatch for inode %lld\n", (long long)st->st_ino); LBUG(); diff --git a/lustre/llite/file.c b/lustre/llite/file.c index d7b3d08..170f7a7 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -534,10 +534,11 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock, stripe = ll_lock_to_stripe_offset(inode, lock); if (stripe < 0) goto iput; + ll_pgcache_remove_extent(inode, lsm, lock, stripe); l_lock(&lock->l_resource->lr_namespace->ns_lock); - down(&lli->lli_size_sem); + lov_stripe_lock(lsm); kms = ldlm_extent_shift_kms(lock, lsm->lsm_oinfo[stripe].loi_kms); @@ -545,7 +546,7 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock, LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64, lsm->lsm_oinfo[stripe].loi_kms, kms); lsm->lsm_oinfo[stripe].loi_kms = kms; - up(&lli->lli_size_sem); + lov_stripe_unlock(lsm); l_unlock(&lock->l_resource->lr_namespace->ns_lock); //ll_try_done_writing(inode); iput: @@ -681,6 +682,13 @@ int ll_glimpse_size(struct inode *inode) CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino); + /* NOTE: this looks like DLM lock request, but it may not be one. Due + * to LDLM_FL_HAS_INTENT flag, this is glimpse request, that + * won't revoke any conflicting DLM locks held. Instead, + * ll_glimpse_callback() will be called on each client + * holding a DLM lock against this file, and resulting size + * will be returned for each stripe. DLM lock on [0, EOF] is + * acquired only if there were no conflicting locks. */ rc = obd_enqueue(sbi->ll_osc_exp, lli->lli_smd, LDLM_EXTENT, &policy, LCK_PR, &flags, ll_extent_lock_callback, ldlm_completion_ast, ll_glimpse_callback, inode, @@ -692,10 +700,10 @@ int ll_glimpse_size(struct inode *inode) RETURN(rc > 0 ? -EIO : rc); } - down(&lli->lli_size_sem); + ll_inode_size_lock(inode, 1); inode->i_size = lov_merge_size(lli->lli_smd, 0); inode->i_blocks = lov_merge_blocks(lli->lli_smd); - up(&lli->lli_size_sem); + ll_inode_size_unlock(inode, 1); LTIME_S(inode->i_mtime) = lov_merge_mtime(lli->lli_smd, LTIME_S(inode->i_mtime)); @@ -713,7 +721,6 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode, int ast_flags) { struct ll_sb_info *sbi = ll_i2sbi(inode); - struct ll_inode_info *lli = ll_i2info(inode); int rc; ENTRY; @@ -741,16 +748,18 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode, if (policy->l_extent.start == 0 && policy->l_extent.end == OBD_OBJECT_EOF) { /* vmtruncate()->ll_truncate() first sets the i_size and then - * the kms under both a DLM lock and the i_sem. 
If we don't - * get the i_sem here we can match the DLM lock and reset - * i_size from the kms before the truncating path has updated - * the kms. generic_file_write can then trust the stale i_size - * when doing appending writes and effectively cancel the - * result of the truncate. Getting the i_sem after the enqueue - * maintains the DLM -> i_sem acquiry order. */ - down(&lli->lli_size_sem); + * the kms under both a DLM lock and the + * ll_inode_size_lock(). If we don't get the + * ll_inode_size_lock() here we can match the DLM lock and + * reset i_size from the kms before the truncating path has + * updated the kms. generic_file_write can then trust the + * stale i_size when doing appending writes and effectively + * cancel the result of the truncate. Getting the + * ll_inode_size_lock() after the enqueue maintains the DLM + * -> ll_inode_size_lock() acquiring order. */ + ll_inode_size_lock(inode, 1); inode->i_size = lov_merge_size(lsm, 1); - up(&lli->lli_size_sem); + ll_inode_size_unlock(inode, 1); } if (rc == 0) @@ -834,18 +843,38 @@ static ssize_t ll_file_read(struct file *file, char *buf, size_t count, if (rc != 0) RETURN(rc); - down(&lli->lli_size_sem); + ll_inode_size_lock(inode, 1); + /* + * Consistency guarantees: following possibilities exist for the + * relation between region being read and real file size at this + * moment: + * + * (A): the region is completely inside of the file; + * + * (B-x): x bytes of region are inside of the file, the rest is + * outside; + * + * (C): the region is completely outside of the file. + * + * This classification is stable under DLM lock acquired by + * ll_tree_lock() above, because to change class, other client has to + * take DLM lock conflicting with our lock. Also, any updates to + * ->i_size by other threads on this client are serialized by + * ll_inode_size_lock(). This guarantees that short reads are handled + * correctly in the face of concurrent writes and truncates. 
+ */ kms = lov_merge_size(lsm, 1); if (*ppos + count - 1 > kms) { - /* A glimpse is necessary to determine whether we return a short - * read or some zeroes at the end of the buffer */ - up(&lli->lli_size_sem); + /* A glimpse is necessary to determine whether we return a + * short read (B) or some zeroes at the end of the buffer (C) */ + ll_inode_size_unlock(inode, 1); retval = ll_glimpse_size(inode); if (retval) goto out; } else { + /* region is within kms and, hence, within real file size (A) */ inode->i_size = kms; - up(&lli->lli_size_sem); + ll_inode_size_unlock(inode, 1); } CDEBUG(D_INFO, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n", @@ -1256,7 +1285,8 @@ loff_t ll_file_seek(struct file *file, loff_t offset, int origin) { struct inode *inode = file->f_dentry->d_inode; struct ll_file_data *fd = LUSTRE_FPRIVATE(file); - struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; + struct ll_inode_info *lli = ll_i2info(inode); + struct lov_stripe_md *lsm = lli->lli_smd; struct lustre_handle lockh = {0}; loff_t retval; ENTRY; @@ -1269,7 +1299,6 @@ loff_t ll_file_seek(struct file *file, loff_t offset, int origin) lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_LLSEEK); if (origin == 2) { /* SEEK_END */ ldlm_policy_data_t policy = { .l_extent = {0, OBD_OBJECT_EOF }}; - struct ll_inode_info *lli = ll_i2info(inode); int nonblock = 0, rc; if (file->f_flags & O_NONBLOCK) @@ -1280,9 +1309,9 @@ loff_t ll_file_seek(struct file *file, loff_t offset, int origin) if (rc != 0) RETURN(rc); - down(&lli->lli_size_sem); + ll_inode_size_lock(inode, 0); offset += inode->i_size; - up(&lli->lli_size_sem); + ll_inode_size_unlock(inode, 0); } else if (origin == 1) { /* SEEK_CUR */ offset += file->f_pos; } @@ -1538,7 +1567,6 @@ int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct lookup_intent *it, struct kstat *stat) { struct inode *inode = de->d_inode; - struct ll_inode_info *lli = ll_i2info(inode); int res = 0; res = ll_inode_revalidate_it(de, it); @@ -1559,10 +1587,10 @@ int ll_getattr(struct vfsmount *mnt, struct dentry *de, stat->ctime = inode->i_ctime; stat->blksize = inode->i_blksize; - down(&lli->lli_size_sem); + ll_inode_size_lock(inode, 0); stat->size = inode->i_size; stat->blocks = inode->i_blocks; - up(&lli->lli_size_sem); + ll_inode_size_unlock(inode, 0); return 0; } diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 3ca8df3..c3d963a 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -52,8 +52,8 @@ extern struct file_operations ll_pgcache_seq_fops; struct ll_inode_info { int lli_inode_magic; - int lli_size_pid; struct semaphore lli_size_sem; + void *lli_size_sem_owner; struct semaphore lli_open_sem; struct lov_stripe_md *lli_smd; char *lli_symlink_name; @@ -81,6 +81,17 @@ struct ll_inode_info { #endif }; +/* + * Locking to guarantee consistency of non-atomic updates to long long i_size, + * consistency between file size and KMS, and consistency within + * ->lli_smd->lsm_oinfo[]'s. + * + * Implemented by ->lli_size_sem and ->lsm_lock, nested in that order.
+ */ + +void ll_inode_size_lock(struct inode *inode, int lock_lsm); +void ll_inode_size_unlock(struct inode *inode, int unlock_lsm); + // FIXME: replace the name of this with LL_I to conform to kernel stuff // static inline struct ll_inode_info *LL_I(struct inode *inode) static inline struct ll_inode_info *ll_i2info(struct inode *inode) diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 864232d..92759c3 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -460,7 +460,6 @@ void ll_lli_init(struct ll_inode_info *lli) { sema_init(&lli->lli_open_sem, 1); sema_init(&lli->lli_size_sem, 1); - lli->lli_size_pid = 0; lli->lli_flags = 0; lli->lli_maxbytes = PAGE_CACHE_MAXBYTES; spin_lock_init(&lli->lli_lock); @@ -1078,14 +1077,15 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) if (rc != 0) RETURN(rc); - down(&lli->lli_size_sem); - lli->lli_size_pid = current->pid; + /* Only ll_inode_size_lock is taken at this level. + * lov_stripe_lock() is grabbed by ll_truncate() only over + * call to obd_adjust_kms(). If vmtruncate returns 0, then + * ll_truncate dropped ll_inode_size_lock() */ + ll_inode_size_lock(inode, 0); rc = vmtruncate(inode, attr->ia_size); - // if vmtruncate returned 0, then ll_truncate dropped _size_sem if (rc != 0) { LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0); - lli->lli_size_pid = 0; - up(&lli->lli_size_sem); + ll_inode_size_unlock(inode, 0); } err = ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh); @@ -1196,6 +1196,35 @@ int ll_statfs(struct super_block *sb, struct kstatfs *sfs) return 0; } +void ll_inode_size_lock(struct inode *inode, int lock_lsm) +{ + struct ll_inode_info *lli; + struct lov_stripe_md *lsm; + + lli = ll_i2info(inode); + LASSERT(lli->lli_size_sem_owner != current); + down(&lli->lli_size_sem); + LASSERT(lli->lli_size_sem_owner == NULL); + lli->lli_size_sem_owner = current; + lsm = lli->lli_smd; + if (lsm != NULL && lock_lsm) + lov_stripe_lock(lsm); +} + +void ll_inode_size_unlock(struct inode *inode, int unlock_lsm) +{ + struct ll_inode_info *lli; + struct lov_stripe_md *lsm; + + lli = ll_i2info(inode); + lsm = lli->lli_smd; + if (lsm != NULL && unlock_lsm) + lov_stripe_unlock(lsm); + LASSERT(lli->lli_size_sem_owner == current); + lli->lli_size_sem_owner = NULL; + up(&lli->lli_size_sem); +} + void ll_update_inode(struct inode *inode, struct mds_body *body, struct lov_stripe_md *lsm) { @@ -1210,12 +1239,14 @@ void ll_update_inode(struct inode *inode, struct mds_body *body, } CDEBUG(D_INODE, "adding lsm %p to inode %lu/%u(%p)\n", lsm, inode->i_ino, inode->i_generation, inode); + ll_inode_size_lock(inode, 0); lli->lli_smd = lsm; + ll_inode_size_unlock(inode, 0); lli->lli_maxbytes = lsm->lsm_maxbytes; if (lli->lli_maxbytes > PAGE_CACHE_MAXBYTES) lli->lli_maxbytes = PAGE_CACHE_MAXBYTES; } else { - if (memcmp(lli->lli_smd, lsm, sizeof(*lsm))) { + if (lov_stripe_md_cmp(lli->lli_smd, lsm)) { CERROR("lsm mismatch for inode %ld\n", inode->i_ino); CERROR("lli_smd:\n"); diff --git a/lustre/llite/llite_mmap.c b/lustre/llite/llite_mmap.c index 4be05bf..7ad228e 100644 --- a/lustre/llite/llite_mmap.c +++ b/lustre/llite/llite_mmap.c @@ -367,6 +367,7 @@ struct page *ll_nopage(struct vm_area_struct *vma, unsigned long address, ldlm_mode_t mode; struct page *page = NULL; struct ll_inode_info *lli = ll_i2info(inode); + struct lov_stripe_md *lsm; __u64 kms, old_mtime; unsigned long pgoff, size, rand_read, seq_read; int rc = 0; @@ -386,7 +387,8 @@ struct page *ll_nopage(struct vm_area_struct *vma, unsigned long address, mode = 
mode_from_vma(vma); old_mtime = LTIME_S(inode->i_mtime); - rc = ll_extent_lock(fd, inode, lli->lli_smd, mode, &policy, + lsm = lli->lli_smd; + rc = ll_extent_lock(fd, inode, lsm, mode, &policy, &lockh, LDLM_FL_CBPENDING | LDLM_FL_NO_LRU); if (rc != 0) RETURN(NULL); @@ -394,19 +396,21 @@ struct page *ll_nopage(struct vm_area_struct *vma, unsigned long address, if (vma->vm_flags & VM_EXEC && LTIME_S(inode->i_mtime) != old_mtime) CWARN("binary changed. inode %lu\n", inode->i_ino); - /* XXX change inode size without i_sem hold! there is a race condition - * with truncate path. (see ll_extent_lock) */ - //down(&lli->lli_size_sem); - kms = lov_merge_size(lli->lli_smd, 1); + lov_stripe_lock(lsm); + kms = lov_merge_size(lsm, 1); + pgoff = ((address - vma->vm_start) >> PAGE_CACHE_SHIFT) + vma->vm_pgoff; size = (kms + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT; if (pgoff >= size) { - //up(&lli->lli_size_sem); + lov_stripe_unlock(lsm); ll_glimpse_size(inode); } else { + /* XXX change inode size without ll_inode_size_lock() held! + * there is a race condition with truncate path. (see + * ll_extent_lock) */ inode->i_size = kms; - //up(&lli->lli_size_sem); + lov_stripe_unlock(lsm); } /* disable VM_SEQ_READ and use VM_RAND_READ to make sure that diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index cc34c88..1b86074 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -102,10 +102,11 @@ static int ll_brw(int cmd, struct inode *inode, struct obdo *oa, } /* this isn't where truncate starts. roughly: - * sys_truncate->ll_setattr_raw->vmtruncate->ll_truncate - * we grab the lock back in setattr_raw to avoid races. + * sys_truncate->ll_setattr_raw->vmtruncate->ll_truncate. setattr_raw grabs + * DLM lock on [0, EOF], i_sem, ->lli_size_sem, and WRITE_I_ALLOC_SEM to + * avoid races. * - * must be called with lli_size_sem held */ + * must be called under ->lli_size_sem */ void ll_truncate(struct inode *inode) { struct ll_inode_info *lli = ll_i2info(inode); @@ -116,7 +117,7 @@ void ll_truncate(struct inode *inode) CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) to %Lu=%#Lx\n",inode->i_ino, inode->i_generation, inode, inode->i_size, inode->i_size); - if (lli->lli_size_pid != current->pid) { + if (lli->lli_size_sem_owner != current) { EXIT; return; } @@ -131,12 +132,17 @@ void ll_truncate(struct inode *inode) /* XXX I'm pretty sure this is a hack to paper over a more fundamental * race condition. 
*/ + lov_stripe_lock(lsm); if (lov_merge_size(lsm, 0) == inode->i_size) { CDEBUG(D_VFSTRACE, "skipping punch for obj "LPX64", %Lu=%#Lx\n", lsm->lsm_object_id, inode->i_size, inode->i_size); + lov_stripe_unlock(lsm); GOTO(out_unlock, 0); } + obd_adjust_kms(ll_i2obdexp(inode), lsm, inode->i_size, 1); + lov_stripe_unlock(lsm); + if (unlikely((ll_i2sbi(inode)->ll_flags & LL_SBI_CHECKSUM) && (inode->i_size & ~PAGE_MASK))) { /* If the truncate leaves behind a partial page, update its @@ -162,10 +168,7 @@ void ll_truncate(struct inode *inode) obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE | OBD_MD_FLATIME |OBD_MD_FLMTIME |OBD_MD_FLCTIME); - obd_adjust_kms(ll_i2obdexp(inode), lsm, inode->i_size, 1); - - lli->lli_size_pid = 0; - up(&lli->lli_size_sem); + ll_inode_size_unlock(inode, 0); rc = obd_punch(ll_i2obdexp(inode), &oa, lsm, inode->i_size, OBD_OBJECT_EOF, NULL); @@ -179,8 +182,8 @@ void ll_truncate(struct inode *inode) return; out_unlock: - lli->lli_size_pid = 0; - up(&lli->lli_size_sem); + ll_inode_size_unlock(inode, 0); + EXIT; } /* ll_truncate */ int ll_prepare_write(struct file *file, struct page *page, unsigned from, @@ -230,9 +233,9 @@ int ll_prepare_write(struct file *file, struct page *page, unsigned from, /* If are writing to a new page, no need to read old data. The extent * locking will have updated the KMS, and for our purposes here we can * treat it like i_size. */ - down(&lli->lli_size_sem); + lov_stripe_lock(lsm); kms = lov_merge_size(lsm, 1); - up(&lli->lli_size_sem); + lov_stripe_unlock(lsm); if (kms <= offset) { LL_CDEBUG_PAGE(D_PAGE, page, "kms "LPU64" <= offset "LPU64"\n", kms, offset); @@ -307,6 +310,7 @@ static int ll_ap_refresh_count(void *data, int cmd) struct ll_async_page *llap; struct lov_stripe_md *lsm; struct page *page; + struct inode *inode; __u64 kms; ENTRY; @@ -315,12 +319,13 @@ static int ll_ap_refresh_count(void *data, int cmd) llap = LLAP_FROM_COOKIE(data); page = llap->llap_page; - lli = ll_i2info(page->mapping->host); + inode = page->mapping->host; + lli = ll_i2info(inode); lsm = lli->lli_smd; - //down(&lli->lli_size_sem); + lov_stripe_lock(lsm); kms = lov_merge_size(lsm, 1); - //up(&lli->lli_size_sem); + lov_stripe_unlock(lsm); /* catch race with truncate */ if (((__u64)page->index << PAGE_SHIFT) >= kms) @@ -502,11 +507,13 @@ struct ll_async_page *llap_from_page(struct page *page, unsigned origin) llap = llap_cast_private(page); if (llap != NULL) { /* move to end of LRU list */ - spin_lock(&sbi->ll_lock); - sbi->ll_pglist_gen++; - list_del_init(&llap->llap_pglist_item); - list_add_tail(&llap->llap_pglist_item, &sbi->ll_pglist); - spin_unlock(&sbi->ll_lock); + if (origin == 0) { + spin_lock(&sbi->ll_lock); + sbi->ll_pglist_gen++; + list_del_init(&llap->llap_pglist_item); + list_add_tail(&llap->llap_pglist_item, &sbi->ll_pglist); + spin_unlock(&sbi->ll_lock); + } GOTO(out, llap); } @@ -709,9 +716,11 @@ int ll_commit_write(struct file *file, struct page *page, unsigned from, out: size = (((obd_off)page->index) << PAGE_SHIFT) + to; - down(&lli->lli_size_sem); + ll_inode_size_lock(inode, 0); if (rc == 0) { + lov_stripe_lock(lsm); obd_adjust_kms(exp, lsm, size, 0); + lov_stripe_unlock(lsm); if (size > inode->i_size) inode->i_size = size; SetPageUptodate(page); @@ -721,7 +730,7 @@ out: * teardown our book-keeping here. 
*/ ll_removepage(page); } - up(&lli->lli_size_sem); + ll_inode_size_unlock(inode, 0); RETURN(rc); } @@ -1014,10 +1023,17 @@ static int ll_readahead(struct ll_readahead_state *ras, int rc, ret = 0, match_failed = 0; __u64 kms; unsigned int gfp_mask; + struct inode *inode; + struct lov_stripe_md *lsm; struct ll_ra_read *bead; ENTRY; - kms = lov_merge_size(ll_i2info(mapping->host)->lli_smd, 1); + inode = mapping->host; + lsm = ll_i2info(inode)->lli_smd; + + lov_stripe_lock(lsm); + kms = lov_merge_size(lsm, 1); + lov_stripe_unlock(lsm); if (kms == 0) { ll_ra_stats_inc(mapping, RA_STAT_ZERO_LEN); RETURN(0); @@ -1064,7 +1080,7 @@ static int ll_readahead(struct ll_readahead_state *ras, RETURN(0); } - reserved = ll_ra_count_get(ll_i2sbi(mapping->host), end - start + 1); + reserved = ll_ra_count_get(ll_i2sbi(inode), end - start + 1); if (reserved < end - start + 1) ll_ra_stats_inc(mapping, RA_STAT_MAX_IN_FLIGHT); @@ -1120,7 +1136,7 @@ static int ll_readahead(struct ll_readahead_state *ras, LASSERTF(reserved >= 0, "reserved %lu\n", reserved); if (reserved != 0) - ll_ra_count_put(ll_i2sbi(mapping->host), reserved); + ll_ra_count_put(ll_i2sbi(inode), reserved); if (i == end + 1 && end == (kms >> PAGE_CACHE_SHIFT)) ll_ra_stats_inc(mapping, RA_STAT_EOF); diff --git a/lustre/lov/lov_merge.c b/lustre/lov/lov_merge.c index 614d1e6..73f549c 100644 --- a/lustre/lov/lov_merge.c +++ b/lustre/lov/lov_merge.c @@ -49,6 +49,11 @@ __u64 lov_merge_size(struct lov_stripe_md *lsm, int kms) __u64 size = 0; int i; + LASSERT_SPIN_LOCKED(&lsm->lsm_lock); +#ifdef CONFIG_SMP + LASSERT(lsm->lsm_lock_owner == current); +#endif + for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++) { obd_size lov_size, tmpsize; @@ -91,7 +96,7 @@ __u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time) } EXPORT_SYMBOL(lov_merge_mtime); -/* Must be called with the inode's lli_size_sem held. 
*/ +/* Must be called under the lov_stripe_lock() */ int lov_adjust_kms(struct obd_export *exp, struct lov_stripe_md *lsm, obd_off size, int shrink) { @@ -100,6 +105,11 @@ int lov_adjust_kms(struct obd_export *exp, struct lov_stripe_md *lsm, __u64 kms; ENTRY; + LASSERT_SPIN_LOCKED(&lsm->lsm_lock); +#ifdef CONFIG_SMP + LASSERT(lsm->lsm_lock_owner == current); +#endif + if (shrink) { struct lov_oinfo *loi; for (loi = lsm->lsm_oinfo; stripe < lsm->lsm_stripe_count; diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 32ab6e7..8c12ee6 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -2217,6 +2217,24 @@ int lov_complete_many(struct obd_export *exp, struct lov_stripe_md *lsm, } #endif + +void lov_stripe_lock(struct lov_stripe_md *md) +{ + LASSERT(md->lsm_lock_owner != current); + spin_lock(&md->lsm_lock); + LASSERT(md->lsm_lock_owner == NULL); + md->lsm_lock_owner = current; +} +EXPORT_SYMBOL(lov_stripe_lock); + +void lov_stripe_unlock(struct lov_stripe_md *md) +{ + LASSERT(md->lsm_lock_owner == current); + md->lsm_lock_owner = NULL; + spin_unlock(&md->lsm_lock); +} +EXPORT_SYMBOL(lov_stripe_unlock); + struct obd_ops lov_obd_ops = { .o_owner = THIS_MODULE, .o_setup = lov_setup, diff --git a/lustre/lov/lov_pack.c b/lustre/lov/lov_pack.c index c85333d..a471eea 100644 --- a/lustre/lov/lov_pack.c +++ b/lustre/lov/lov_pack.c @@ -228,6 +228,7 @@ int lov_alloc_memmd(struct lov_stripe_md **lsmp, int stripe_count, int pattern) if (!*lsmp) return -ENOMEM; + spin_lock_init(&(*lsmp)->lsm_lock); (*lsmp)->lsm_magic = LOV_MAGIC; (*lsmp)->lsm_stripe_count = stripe_count; (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES * stripe_count; diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c index 6bc4bd5..a756920 100644 --- a/lustre/lov/lov_request.c +++ b/lustre/lov/lov_request.c @@ -132,6 +132,7 @@ int lov_update_enqueue_set(struct lov_request_set *set, __u64 tmp = req->rq_md->lsm_oinfo->loi_rss; LASSERT(lock != NULL); + lov_stripe_lock(set->set_md); loi->loi_rss = tmp; loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime; loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks; @@ -150,13 +151,16 @@ int lov_update_enqueue_set(struct lov_request_set *set, loi->loi_rss, loi->loi_kms, lock->l_policy_data.l_extent.end); } + lov_stripe_unlock(set->set_md); ldlm_lock_allow_match(lock); LDLM_LOCK_PUT(lock); } else if (rc == ELDLM_LOCK_ABORTED && flags & LDLM_FL_HAS_INTENT) { memset(lov_lockhp, 0, sizeof(*lov_lockhp)); + lov_stripe_lock(set->set_md); loi->loi_rss = req->rq_md->lsm_oinfo->loi_rss; loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime; loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks; + lov_stripe_unlock(set->set_md); CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving" " kms="LPU64"\n", loi->loi_rss, loi->loi_kms); rc = ELDLM_OK; -- 1.8.3.1
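
For reference, below is a minimal user-space model of the locking discipline this patch introduces; it is an illustrative sketch, not Lustre code. Pthread mutexes stand in for the lli_size_sem semaphore and the new lsm_lock spinlock, and every type and function name in the sketch (lsm_model, inode_model, stripe_lock, inode_size_lock, ...) is hypothetical. It shows the documented nesting order (inode size lock outer, stripe lock inner) and the two caller patterns used above: the ll_file_read() style, which takes both locks through the lock_lsm flag before publishing KMS as i_size, and the ll_commit_write() style, which holds the size lock alone and takes the stripe lock only around the KMS update.

#include <assert.h>
#include <pthread.h>
#include <stdio.h>

struct lsm_model {                          /* stands in for lov_stripe_md */
        pthread_mutex_t    lsm_lock;
        pthread_t          lsm_lock_owner;  /* debugging, as in the patch */
        int                lsm_owned;
        unsigned long long kms;             /* merged known-minimum-size */
};

struct inode_model {                        /* stands in for ll_inode_info */
        pthread_mutex_t    lli_size_sem;
        struct lsm_model  *lli_smd;
        unsigned long long i_size;
};

static void stripe_lock(struct lsm_model *md)
{
        /* non-recursive by design, mirroring the LASSERTs in lov_stripe_lock() */
        assert(!md->lsm_owned ||
               !pthread_equal(md->lsm_lock_owner, pthread_self()));
        pthread_mutex_lock(&md->lsm_lock);
        md->lsm_lock_owner = pthread_self();
        md->lsm_owned = 1;
}

static void stripe_unlock(struct lsm_model *md)
{
        assert(md->lsm_owned &&
               pthread_equal(md->lsm_lock_owner, pthread_self()));
        md->lsm_owned = 0;
        pthread_mutex_unlock(&md->lsm_lock);
}

/* the size lock is always taken first, the stripe lock (optionally) second,
 * matching ll_inode_size_lock(inode, lock_lsm) */
static void inode_size_lock(struct inode_model *inode, int lock_lsm)
{
        pthread_mutex_lock(&inode->lli_size_sem);
        if (lock_lsm && inode->lli_smd != NULL)
                stripe_lock(inode->lli_smd);
}

static void inode_size_unlock(struct inode_model *inode, int unlock_lsm)
{
        if (unlock_lsm && inode->lli_smd != NULL)
                stripe_unlock(inode->lli_smd);
        pthread_mutex_unlock(&inode->lli_size_sem);
}

int main(void)
{
        struct lsm_model   lsm   = { .lsm_lock = PTHREAD_MUTEX_INITIALIZER,
                                     .kms = 4096 };
        struct inode_model inode = { .lli_size_sem = PTHREAD_MUTEX_INITIALIZER,
                                     .lli_smd = &lsm };
        unsigned long long size  = 8192;    /* end of the bytes just written */

        /* read path: both locks, then publish KMS as the file size */
        inode_size_lock(&inode, 1);
        inode.i_size = lsm.kms;
        inode_size_unlock(&inode, 1);

        /* write-completion path: size lock only; stripe lock just around
         * the KMS update (the obd_adjust_kms() call in the kernel) */
        inode_size_lock(&inode, 0);
        stripe_lock(&lsm);
        if (size > lsm.kms)
                lsm.kms = size;
        stripe_unlock(&lsm);
        if (size > inode.i_size)
                inode.i_size = size;
        inode_size_unlock(&inode, 0);

        printf("i_size=%llu kms=%llu\n", inode.i_size, lsm.kms);
        return 0;
}

Built with cc -pthread, the model enforces the same non-recursive, owner-tracked behaviour that the new assertions check in the kernel code above.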
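
The lsm mismatch checks in llu_update_inode() and ll_update_inode() switch from a whole-struct memcmp() to the new lov_stripe_md_cmp() because the lock fields now sitting at the head of lov_stripe_md can differ between two otherwise identical stripe descriptors. The standalone sketch below illustrates that comparison-range idea on a simplified struct; every name in it (struct md, md_cmp, the member names) is hypothetical, and only the pointer-difference pattern is taken from lov_stripe_md_cmp().

#include <stdio.h>
#include <string.h>

struct md {
        int                lock_word;      /* stands in for lsm_lock */
        void              *lock_owner;     /* stands in for lsm_lock_owner */
        unsigned long long object_id;      /* first public member */
        unsigned long long maxbytes;
        int                stripe_count;
        int                oinfo[];        /* stands in for lsm_oinfo[0] */
};

/* compare only the span from the first public member up to the per-stripe
 * array, skipping the lock state at the head of the structure */
static int md_cmp(const struct md *a, const struct md *b)
{
        return memcmp(&a->object_id, &b->object_id,
                      (const char *)&b->oinfo[0] -
                      (const char *)&b->object_id);
}

int main(void)
{
        struct md a, b;

        memset(&a, 0, sizeof(a));
        memset(&b, 0, sizeof(b));
        a.object_id    = b.object_id    = 42;
        a.maxbytes     = b.maxbytes     = 1ULL << 40;
        a.stripe_count = b.stripe_count = 2;
        a.lock_word = 1;                   /* only the lock state differs */
        b.lock_word = 7;

        /* the old whole-struct check reports a spurious mismatch (1),
         * the range-limited check reports a match (0) */
        printf("memcmp(whole struct) != 0: %d\n", memcmp(&a, &b, sizeof(a)) != 0);
        printf("md_cmp               != 0: %d\n", md_cmp(&a, &b) != 0);
        return 0;
}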