From 1c2eb904b1545eb9fc48f910dd5648a6aa174f6a Mon Sep 17 00:00:00 2001 From: Fan Yong Date: Tue, 27 Apr 2010 13:23:03 +0800 Subject: [PATCH] b=22342 process racer condition between statahead and rename/unlink operation 1) process racer condition between statahead and rename/unlink operation 2) replace "lli_lock" with "lli_sa_lock" for statahead related processing i=robert.read i=tom.wang --- lustre/llite/dcache.c | 7 +- lustre/llite/file.c | 4 +- lustre/llite/llite_internal.h | 32 +++++-- lustre/llite/llite_lib.c | 2 + lustre/llite/namei.c | 37 +++++--- lustre/llite/statahead.c | 208 ++++++++++++++++++++++++++++++------------ lustre/mdc/mdc_locks.c | 17 ++-- 7 files changed, 218 insertions(+), 89 deletions(-) diff --git a/lustre/llite/dcache.c b/lustre/llite/dcache.c index de07cca..52c7e82 100644 --- a/lustre/llite/dcache.c +++ b/lustre/llite/dcache.c @@ -167,7 +167,7 @@ static int ll_ddelete(struct dentry *de) RETURN(0); } -void ll_set_dd(struct dentry *de) +int ll_set_dd(struct dentry *de) { ENTRY; LASSERT(de != NULL); @@ -181,16 +181,19 @@ void ll_set_dd(struct dentry *de) OBD_ALLOC_PTR(lld); if (likely(lld != NULL)) { + CFS_INIT_LIST_HEAD(&lld->lld_sa_alias); lock_dentry(de); if (likely(de->d_fsdata == NULL)) de->d_fsdata = lld; else OBD_FREE_PTR(lld); unlock_dentry(de); + } else { + RETURN(-ENOMEM); } } - EXIT; + RETURN(0); } void ll_intent_drop_lock(struct lookup_intent *it) diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 14d68c6..6992c7e 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -515,14 +515,14 @@ int ll_file_open(struct inode *inode, struct file *file) fd->fd_file = file; if (S_ISDIR(inode->i_mode)) { - cfs_spin_lock(&lli->lli_lock); + cfs_spin_lock(&lli->lli_sa_lock); if (lli->lli_opendir_key == NULL && lli->lli_opendir_pid == 0) { LASSERT(lli->lli_sai == NULL); lli->lli_opendir_key = fd; lli->lli_opendir_pid = cfs_curproc_pid(); opendir_set = 1; } - cfs_spin_unlock(&lli->lli_lock); + cfs_spin_unlock(&lli->lli_sa_lock); } 
if (inode->i_sb->s_root == file->f_dentry) { diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 02c0898..cb9f9ac 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -75,6 +75,7 @@ struct ll_dentry_data { struct lookup_intent *lld_it; #endif unsigned int lld_sa_generation; + cfs_list_t lld_sa_alias; }; #define ll_d2d(de) ((struct ll_dentry_data*)((de)->d_fsdata)) @@ -174,7 +175,10 @@ struct ll_inode_info { struct obd_capa *lli_mds_capa; cfs_list_t lli_oss_capas; - /* metadata stat-ahead */ + /* metadata statahead */ + /* protect statahead stuff: lli_opendir_pid, lli_opendir_key, lli_sai, + * lli_sa_dentry, and so on. */ + cfs_spinlock_t lli_sa_lock; /* * "opendir_pid" is the token when lookup/revalid -- I am the owner of * dir statahead. @@ -186,6 +190,7 @@ struct ll_inode_info { * before child -- it is me should cleanup the dir readahead. */ void *lli_opendir_key; struct ll_statahead_info *lli_sai; + cfs_list_t lli_sa_dentry; struct cl_object *lli_clob; /* the most recent timestamps obtained from mds */ struct ost_lvb lli_lvb; @@ -521,9 +526,11 @@ static inline struct inode *ll_info2i(struct ll_inode_info *lli) } struct it_cb_data { - struct inode *icbd_parent; + struct inode *icbd_parent; struct dentry **icbd_childp; - obd_id hash; + obd_id hash; + struct inode **icbd_alias; + __u32 *bits; }; __u32 ll_i2suppgid(struct inode *i); @@ -583,8 +590,7 @@ struct lookup_intent *ll_convert_intent(struct open_intent *oit, #endif void ll_lookup_it_alias(struct dentry **de, struct inode *inode, __u32 bits); int ll_lookup_it_finish(struct ptlrpc_request *request, - struct lookup_intent *it, void *data, - struct inode **alias); + struct lookup_intent *it, void *data); /* llite/rw.c */ int ll_prepare_write(struct file *, struct page *, unsigned from, unsigned to); @@ -674,7 +680,7 @@ extern struct dentry_operations ll_d_ops; void ll_intent_drop_lock(struct lookup_intent *); void ll_intent_release(struct lookup_intent 
*); int ll_drop_dentry(struct dentry *dentry); -extern void ll_set_dd(struct dentry *de); +extern int ll_set_dd(struct dentry *de); int ll_drop_dentry(struct dentry *dentry); void ll_unhash_aliases(struct inode *); void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft); @@ -1118,6 +1124,7 @@ struct ll_statahead_info { unsigned int sai_skip_hidden;/* skipped hidden dentry count */ unsigned int sai_ls_all:1; /* "ls -al", do stat-ahead for * hidden entries */ + unsigned int sai_nolock; /* without lookup lock case */ cfs_waitq_t sai_waitq; /* stat-ahead wait queue */ struct ptlrpc_thread sai_thread; /* stat-ahead thread */ cfs_list_t sai_entries_sent; /* entries sent out */ @@ -1144,10 +1151,10 @@ void ll_statahead_mark(struct inode *dir, struct dentry *dentry) if (lli->lli_opendir_pid != cfs_curproc_pid()) return; - cfs_spin_lock(&lli->lli_lock); + cfs_spin_lock(&lli->lli_sa_lock); if (likely(lli->lli_sai != NULL && ldd != NULL)) ldd->lld_sa_generation = lli->lli_sai->sai_generation; - cfs_spin_unlock(&lli->lli_lock); + cfs_spin_unlock(&lli->lli_sa_lock); } static inline @@ -1189,12 +1196,16 @@ int ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup) return do_statahead_enter(dir, dentryp, lookup); } -static void inline ll_dops_init(struct dentry *de, int block) +static int inline ll_dops_init(struct dentry *de, int block) { struct ll_dentry_data *lld = ll_d2d(de); + int rc = 0; if (lld == NULL && block != 0) { - ll_set_dd(de); + rc = ll_set_dd(de); + if (rc) + return rc; + lld = ll_d2d(de); } @@ -1202,6 +1213,7 @@ static void inline ll_dops_init(struct dentry *de, int block) lld->lld_sa_generation = 0; de->d_op = &ll_d_ops; + return rc; } /* llite ioctl register support rountine */ diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index f56547e..5a50e8e 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -788,6 +788,8 @@ void ll_lli_init(struct ll_inode_info *lli) lli->lli_rmtperm_utime = 
0; cfs_sema_init(&lli->lli_rmtperm_sem, 1); CFS_INIT_LIST_HEAD(&lli->lli_oss_capas); + cfs_spin_lock_init(&lli->lli_sa_lock); + CFS_INIT_LIST_HEAD(&lli->lli_sa_dentry); } int ll_fill_super(struct super_block *sb) diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 352b3b8..30d5d20 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -197,6 +197,7 @@ int ll_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, break; case LDLM_CB_CANCELING: { struct inode *inode = ll_inode_from_lock(lock); + struct ll_inode_info *lli; __u64 bits = lock->l_policy_data.l_inodebits.bits; struct lu_fid *fid; @@ -243,8 +244,9 @@ int ll_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, ll_md_real_close(inode, flags); } + lli = ll_i2info(inode); if (bits & MDS_INODELOCK_UPDATE) - ll_i2info(inode)->lli_flags &= ~LLIF_MDS_SIZE_LOCK; + lli->lli_flags &= ~LLIF_MDS_SIZE_LOCK; if (S_ISDIR(inode->i_mode) && (bits & MDS_INODELOCK_UPDATE)) { @@ -254,6 +256,18 @@ int ll_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, ll_drop_negative_dentry(inode); } + if ((bits & MDS_INODELOCK_LOOKUP) && + !cfs_list_empty(&lli->lli_sa_dentry)) { + struct ll_dentry_data *lld, *next; + + cfs_spin_lock(&lli->lli_sa_lock); + cfs_list_for_each_entry_safe(lld, next, + &lli->lli_sa_dentry, + lld_sa_alias) + cfs_list_del_init(&lld->lld_sa_alias); + cfs_spin_unlock(&lli->lli_sa_lock); + } + if (inode->i_sb->s_root && inode != inode->i_sb->s_root->d_inode && (bits & MDS_INODELOCK_LOOKUP)) @@ -433,12 +447,8 @@ void ll_lookup_it_alias(struct dentry **de, struct inode *inode, __u32 bits) struct ll_dentry_data *lld = ll_d2d(*de); /* just make sure the ll_dentry_data is ready */ - if (unlikely(lld == NULL)) { - ll_set_dd(*de); - lld = ll_d2d(*de); - if (likely(lld != NULL)) - lld->lld_sa_generation = 0; - } + if (unlikely(lld == NULL)) + ll_dops_init(*de, 1); } /* we have lookup look - unhide dentry */ if (bits & MDS_INODELOCK_LOOKUP) { @@ -450,8 +460,7 
@@ void ll_lookup_it_alias(struct dentry **de, struct inode *inode, __u32 bits) } int ll_lookup_it_finish(struct ptlrpc_request *request, - struct lookup_intent *it, void *data, - struct inode **alias) + struct lookup_intent *it, void *data) { struct it_cb_data *icbd = data; struct dentry **de = icbd->icbd_childp; @@ -475,8 +484,10 @@ int ll_lookup_it_finish(struct ptlrpc_request *request, md_set_lock_data(sbi->ll_md_exp, &it->d.lustre.it_lock_handle, inode, &bits); - if (alias != NULL) { - *alias = inode; + if (icbd->bits != NULL) + *icbd->bits = bits; + if (icbd->icbd_alias != NULL) { + *icbd->icbd_alias = inode; RETURN(0); } @@ -563,6 +574,8 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry, icbd.icbd_childp = &dentry; icbd.icbd_parent = parent; + icbd.icbd_alias = NULL; + icbd.bits = NULL; if (it->it_op & IT_CREAT || (it->it_op & IT_OPEN && it->it_create_mode & O_CREAT)) @@ -584,7 +597,7 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry, if (rc < 0) GOTO(out, retval = ERR_PTR(rc)); - rc = ll_lookup_it_finish(req, it, &icbd, NULL); + rc = ll_lookup_it_finish(req, it, &icbd); if (rc != 0) { ll_intent_release(it); GOTO(out, retval = ERR_PTR(rc)); diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c index 503235a..ca65c0b 100644 --- a/lustre/llite/statahead.c +++ b/lustre/llite/statahead.c @@ -69,7 +69,7 @@ static cfs_spinlock_t sai_generation_lock = CFS_SPIN_LOCK_UNLOCKED; /** * Check whether first entry was stated already or not. 
- * No need to hold lli_lock, for: + * No need to hold lli_sa_lock, for: * (1) it is me that remove entry from the list * (2) the statahead thread only add new entry to the list */ @@ -123,13 +123,28 @@ static inline int sa_low_hit(struct ll_statahead_info *sai) (sai->sai_consecutive_miss > 8)); } +static inline int sa_skip_nolock(struct ll_statahead_info *sai) +{ + return (sai->sai_nolock >= 3); +} + static void ll_sai_entry_free(struct ll_sai_entry *entry) { struct dentry *dentry = entry->se_dentry; struct inode *inode = entry->se_inode; if (dentry) { + struct ll_dentry_data *lld = ll_d2d(dentry); + struct ll_inode_info *lli; + entry->se_dentry = NULL; + LASSERT(inode != NULL); + lli = ll_i2info(inode); + if (!cfs_list_empty(&lli->lli_sa_dentry)) { + cfs_spin_lock(&lli->lli_sa_lock); + cfs_list_del_init(&lld->lld_sa_alias); + cfs_spin_unlock(&lli->lli_sa_lock); + } dput(dentry); } if (inode) { @@ -214,11 +229,11 @@ static void ll_sai_put(struct ll_statahead_info *sai) if (cfs_atomic_dec_and_test(&sai->sai_refcount)) { struct ll_sai_entry *entry, *next; - cfs_spin_lock(&lli->lli_lock); + cfs_spin_lock(&lli->lli_sa_lock); if (unlikely(cfs_atomic_read(&sai->sai_refcount) > 0)) { /* It is race case, the interpret callback just hold * a reference count */ - cfs_spin_unlock(&lli->lli_lock); + cfs_spin_unlock(&lli->lli_sa_lock); EXIT; return; } @@ -226,7 +241,7 @@ static void ll_sai_put(struct ll_statahead_info *sai) LASSERT(lli->lli_opendir_key == NULL); lli->lli_sai = NULL; lli->lli_opendir_pid = 0; - cfs_spin_unlock(&lli->lli_lock); + cfs_spin_unlock(&lli->lli_sa_lock); LASSERT(sa_is_stopped(sai)); @@ -278,9 +293,9 @@ ll_sai_entry_init(struct ll_statahead_info *sai, unsigned int index) entry->se_index = index; entry->se_stat = SA_ENTRY_UNSTATED; - cfs_spin_lock(&lli->lli_lock); + cfs_spin_lock(&lli->lli_sa_lock); cfs_list_add_tail(&entry->se_list, &sai->sai_entries_sent); - cfs_spin_unlock(&lli->lli_lock); + cfs_spin_unlock(&lli->lli_sa_lock); RETURN(entry); } @@ 
-296,7 +311,7 @@ static int ll_sai_entry_fini(struct ll_statahead_info *sai) int rc = 0; ENTRY; - cfs_spin_lock(&lli->lli_lock); + cfs_spin_lock(&lli->lli_sa_lock); sai->sai_index_next++; if (likely(!cfs_list_empty(&sai->sai_entries_stated))) { entry = cfs_list_entry(sai->sai_entries_stated.next, @@ -309,13 +324,13 @@ static int ll_sai_entry_fini(struct ll_statahead_info *sai) } else { LASSERT(sa_is_stopped(sai)); } - cfs_spin_unlock(&lli->lli_lock); + cfs_spin_unlock(&lli->lli_sa_lock); RETURN(rc); } /** - * inside lli_lock. + * inside lli_sa_lock. * \retval NULL : can not find the entry in sai_entries_sent with the index * \retval entry: find the entry in sai_entries_sent with the index */ @@ -343,7 +358,7 @@ ll_sai_entry_set(struct ll_statahead_info *sai, unsigned int index, int stat, } /** - * inside lli_lock. + * inside lli_sa_lock. * Move entry to sai_entries_received and * insert it into sai_entries_received tail. */ @@ -368,13 +383,13 @@ ll_sai_entry_to_stated(struct ll_statahead_info *sai, struct ll_sai_entry *entry ll_sai_entry_cleanup(entry, 0); - cfs_spin_lock(&lli->lli_lock); + cfs_spin_lock(&lli->lli_sa_lock); if (!cfs_list_empty(&entry->se_list)) cfs_list_del_init(&entry->se_list); /* stale entry */ if (unlikely(entry->se_index < sai->sai_index_next)) { - cfs_spin_unlock(&lli->lli_lock); + cfs_spin_unlock(&lli->lli_sa_lock); ll_sai_entry_free(entry); RETURN(0); } @@ -382,7 +397,7 @@ ll_sai_entry_to_stated(struct ll_statahead_info *sai, struct ll_sai_entry *entry cfs_list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) { if (se->se_index < entry->se_index) { cfs_list_add(&entry->se_list, &se->se_list); - cfs_spin_unlock(&lli->lli_lock); + cfs_spin_unlock(&lli->lli_sa_lock); RETURN(1); } } @@ -391,7 +406,7 @@ ll_sai_entry_to_stated(struct ll_statahead_info *sai, struct ll_sai_entry *entry * I am the first entry. 
*/ cfs_list_add(&entry->se_list, &sai->sai_entries_stated); - cfs_spin_unlock(&lli->lli_lock); + cfs_spin_unlock(&lli->lli_sa_lock); RETURN(1); } @@ -410,12 +425,12 @@ static int do_statahead_interpret(struct ll_statahead_info *sai) struct mdt_body *body; ENTRY; - cfs_spin_lock(&lli->lli_lock); + cfs_spin_lock(&lli->lli_sa_lock); LASSERT(!sa_received_empty(sai)); entry = cfs_list_entry(sai->sai_entries_received.next, struct ll_sai_entry, se_list); cfs_list_del_init(&entry->se_list); - cfs_spin_unlock(&lli->lli_lock); + cfs_spin_unlock(&lli->lli_sa_lock); if (unlikely(entry->se_index < sai->sai_index_next)) { CWARN("Found stale entry: [index %u] [next %u]\n", @@ -441,9 +456,12 @@ static int do_statahead_interpret(struct ll_statahead_info *sai) * lookup. */ struct dentry *save = dentry; + __u32 bits = 0; struct it_cb_data icbd = { .icbd_parent = minfo->mi_dir, - .icbd_childp = &dentry + .icbd_childp = &dentry, + .icbd_alias = &entry->se_inode, + .bits = &bits }; LASSERT(fid_is_zero(&minfo->mi_data.op_fid2)); @@ -468,9 +486,30 @@ static int do_statahead_interpret(struct ll_statahead_info *sai) * aliasing such dentry to the inode later. If we don't do such * drop here, it maybe cause deadlock with i_muext held by * others, just like bug 21739. 
*/ - rc = ll_lookup_it_finish(req, it, &icbd, &entry->se_inode); - if (entry->se_inode != NULL) - entry->se_dentry = dget(dentry); + rc = ll_lookup_it_finish(req, it, &icbd); + if (entry->se_inode != NULL) { + struct ll_dentry_data *lld = ll_d2d(dentry); + struct ll_inode_info *sei = ll_i2info(entry->se_inode); + + /* For statahead lookup case, both MDS_INODELOCK_LOOKUP + * and MDS_INODELOCK_UPDATE should be granted */ + if (likely(bits & MDS_INODELOCK_LOOKUP && + bits & MDS_INODELOCK_UPDATE)) { + /* the first dentry ref_count will be dropped by + * ll_sai_entry_to_stated(), so hold another ref + * in advance */ + entry->se_dentry = dget(dentry); + cfs_spin_lock(&sei->lli_sa_lock); + cfs_list_add(&lld->lld_sa_alias, + &sei->lli_sa_dentry); + cfs_spin_unlock(&sei->lli_sa_lock); + sai->sai_nolock = 0; + } else { + iput(entry->se_inode); + entry->se_inode = NULL; + sai->sai_nolock++; + } + } LASSERT(dentry == save); ll_intent_drop_lock(it); } else { @@ -527,11 +566,11 @@ static int ll_statahead_interpret(struct ptlrpc_request *req, CDEBUG(D_READA, "interpret statahead %.*s rc %d\n", dentry->d_name.len, dentry->d_name.name, rc); - cfs_spin_lock(&lli->lli_lock); + cfs_spin_lock(&lli->lli_sa_lock); /* stale entry */ if (unlikely(lli->lli_sai == NULL || lli->lli_sai->sai_generation != minfo->mi_generation)) { - cfs_spin_unlock(&lli->lli_lock); + cfs_spin_unlock(&lli->lli_sa_lock); ll_intent_release(it); dput(dentry); iput(dir); @@ -547,13 +586,13 @@ static int ll_statahead_interpret(struct ptlrpc_request *req, if (likely(sa_is_running(sai))) { ll_sai_entry_to_received(sai, entry); sai->sai_replied++; - cfs_spin_unlock(&lli->lli_lock); + cfs_spin_unlock(&lli->lli_sa_lock); cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq); } else { if (!cfs_list_empty(&entry->se_list)) cfs_list_del_init(&entry->se_list); sai->sai_replied++; - cfs_spin_unlock(&lli->lli_lock); + cfs_spin_unlock(&lli->lli_sa_lock); ll_sai_entry_cleanup(entry, 1); } ll_sai_put(sai); @@ -738,9 +777,18 @@ static 
int ll_statahead_one(struct dentry *parent, const char* entry_name, ll_name2qstr(&name, entry_name, entry_name_len); dentry = d_lookup(parent, &name); if (!dentry) { + if (unlikely(sa_skip_nolock(sai))) { + CWARN("can not obtain lookup lock, skip the succeedent " + "lookup cases, will cause statahead miss, and " + "statahead maybe exit for that.\n"); + GOTO(out, rc = -EAGAIN); + } + dentry = d_alloc(parent, &name); if (dentry) { - rc = do_sa_lookup(dir, dentry); + rc = ll_dops_init(dentry, 1); + if (!rc) + rc = do_sa_lookup(dir, dentry); if (rc) dput(dentry); } else { @@ -791,9 +839,9 @@ static int ll_statahead_thread(void *arg) } atomic_inc(&sbi->ll_sa_total); - cfs_spin_lock(&lli->lli_lock); + cfs_spin_lock(&lli->lli_sa_lock); thread->t_flags = SVC_RUNNING; - cfs_spin_unlock(&lli->lli_lock); + cfs_spin_unlock(&lli->lli_sa_lock); cfs_waitq_signal(&thread->t_ctl_waitq); CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name); @@ -914,9 +962,9 @@ keep_de: out: ll_dir_chain_fini(&chain); - cfs_spin_lock(&lli->lli_lock); + cfs_spin_lock(&lli->lli_sa_lock); thread->t_flags = SVC_STOPPED; - cfs_spin_unlock(&lli->lli_lock); + cfs_spin_unlock(&lli->lli_sa_lock); cfs_waitq_signal(&sai->sai_waitq); cfs_waitq_signal(&thread->t_ctl_waitq); ll_sai_put(sai); @@ -936,9 +984,9 @@ void ll_stop_statahead(struct inode *inode, void *key) if (unlikely(key == NULL)) return; - cfs_spin_lock(&lli->lli_lock); + cfs_spin_lock(&lli->lli_sa_lock); if (lli->lli_opendir_key != key || lli->lli_opendir_pid == 0) { - cfs_spin_unlock(&lli->lli_lock); + cfs_spin_unlock(&lli->lli_sa_lock); return; } @@ -950,7 +998,7 @@ void ll_stop_statahead(struct inode *inode, void *key) if (!sa_is_stopped(lli->lli_sai)) { thread->t_flags = SVC_STOPPING; - cfs_spin_unlock(&lli->lli_lock); + cfs_spin_unlock(&lli->lli_sa_lock); cfs_waitq_signal(&thread->t_ctl_waitq); CDEBUG(D_READA, "stopping statahead thread, pid %d\n", @@ -959,7 +1007,7 @@ void ll_stop_statahead(struct inode *inode, void *key) 
sa_is_stopped(lli->lli_sai), &lwi); } else { - cfs_spin_unlock(&lli->lli_lock); + cfs_spin_unlock(&lli->lli_sa_lock); } /* @@ -970,7 +1018,7 @@ void ll_stop_statahead(struct inode *inode, void *key) ll_sai_put(lli->lli_sai); } else { lli->lli_opendir_pid = 0; - cfs_spin_unlock(&lli->lli_lock); + cfs_spin_unlock(&lli->lli_sa_lock); } } @@ -1178,41 +1226,76 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup) struct ll_sai_entry, se_list); /* This is for statahead lookup */ if (entry->se_inode != NULL) { - struct lookup_intent it = {.it_op = IT_GETATTR}; + struct lookup_intent it = {.it_op = IT_LOOKUP}; struct dentry *dchild = entry->se_dentry; struct inode *ichild = entry->se_inode; + struct ll_dentry_data *lld = ll_d2d(dchild); + struct ll_inode_info *sei = ll_i2info(ichild); + struct dentry *save = dchild; + int invalid = 0; + __u32 bits = MDS_INODELOCK_LOOKUP | + MDS_INODELOCK_UPDATE; int found = 0; - __u32 bits; LASSERT(dchild != *dentryp); if (!lookup) mutex_lock(&dir->i_mutex); + /* + * Make sure the dentry is still valid. + * For the statahead lookup case, we need both the + * LOOKUP lock and the UPDATE lock which were obtained + * by the statahead thread originally. + * + * Consider the following race case: + * 1. statahead thread on client1 gets a lock with + * both LOOKUP and UPDATE bits for "aaa" + * 2. rename thread on client2 cancels that lock + * from client1, then renames "aaa" to "bbb" + * 3. ls thread on client1 obtains the LOOKUP lock + * for "bbb" again + * 4. 
here the dentry "aaa" created by the statahead + * thread must be treated as invalid even though a + * LOOKUP lock is valid for the same inode + */ rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(ichild), &bits); - if (rc == 1) { - struct dentry *save = dchild; - - ll_lookup_it_alias(&dchild, ichild, - bits); - ll_lookup_finish_locks(&it, dchild); - if (dchild != save) - dput(save); - found = is_same_dentry(dchild, - *dentryp); - } else { - /* Someone has canceled related ldlm + cfs_spin_lock(&sei->lli_sa_lock); + if (!cfs_list_empty(&lld->lld_sa_alias)) + cfs_list_del_init(&lld->lld_sa_alias); + else + invalid = 1; + cfs_spin_unlock(&sei->lli_sa_lock); + if (rc != 1) + /* Someone has cancelled the original * lock before the real "revalidate" - * using it. - * Drop the inode reference count held - * by interpreter. */ - iput(ichild); + * using it. Drop it. */ + goto out_mutex; + + if (invalid) { + /* Someone has cancelled the original + * lock and reobtained it, so the dentry + * may no longer be valid. Drop it. */ + ll_intent_drop_lock(&it); + goto out_mutex; } + ll_lookup_it_alias(&dchild, ichild, bits); + ll_lookup_finish_locks(&it, dchild); + if (dchild != save) + dput(save); + found = is_same_dentry(dchild, *dentryp); + ichild = NULL; + +out_mutex: if (!lookup) mutex_unlock(&dir->i_mutex); + /* Drop the inode reference count held by + * the interpreter. */ + if (ichild != NULL) + iput(ichild); entry->se_dentry = NULL; entry->se_inode = NULL; @@ -1224,7 +1307,18 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup) * by itself. */ *dentryp = dchild; } else { - LASSERT(dchild == *dentryp); + LASSERTF(dchild == *dentryp, + "[%.*s/%.*s] " + "[%p "DFID"] " + "[%p "DFID"]\n", + dchild->d_parent->d_name.len, + dchild->d_parent->d_name.name, + dchild->d_name.len, + dchild->d_name.name, + dchild, + PFID(ll_inode2fid(dchild->d_inode)), + *dentryp, + PFID(ll_inode2fid((*dentryp)->d_inode))); /* Drop the dentry reference * count held by statahead. 
*/ dput(dchild); @@ -1314,10 +1408,10 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup) RETURN(-EEXIST); out: - cfs_spin_lock(&lli->lli_lock); + cfs_spin_lock(&lli->lli_sa_lock); lli->lli_opendir_key = NULL; lli->lli_opendir_pid = 0; - cfs_spin_unlock(&lli->lli_lock); + cfs_spin_unlock(&lli->lli_sa_lock); return rc; } @@ -1364,10 +1458,10 @@ void ll_statahead_exit(struct inode *dir, struct dentry *dentry, int result) PFID(&lli->lli_fid), sai->sai_hit, sai->sai_miss, sai->sai_sent, sai->sai_replied, cfs_curproc_pid()); - cfs_spin_lock(&lli->lli_lock); + cfs_spin_lock(&lli->lli_sa_lock); if (!sa_is_stopped(sai)) sai->sai_thread.t_flags = SVC_STOPPING; - cfs_spin_unlock(&lli->lli_lock); + cfs_spin_unlock(&lli->lli_sa_lock); } } diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index f2f23d1..5a378d3 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -844,11 +844,12 @@ int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, ENTRY; fid_build_reg_res_name(fid, &res_id); - /* As not all attributes are kept under update lock, e.g. - owner/group/acls are under lookup lock, we need both - ibits for GETATTR. */ - policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ? - MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP; + /* Prefer the caller-supplied bits when given */ + if (bits && *bits) + policy.l_inodebits.bits = *bits; + else + policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ? + MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP; mode = ldlm_lock_match(exp->exp_obd->obd_namespace, LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS, @@ -1015,8 +1016,12 @@ int mdc_intent_getattr_async(struct obd_export *exp, struct ptlrpc_request *req; struct obd_device *obddev = class_exp2obd(exp); struct ldlm_res_id res_id; + /* XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed + * for statahead currently. With CMD in the future, these two bits + * may be managed by different MDSes, so this should be adjusted then.
*/ ldlm_policy_data_t policy = { - .l_inodebits = { MDS_INODELOCK_LOOKUP } + .l_inodebits = { MDS_INODELOCK_LOOKUP | + MDS_INODELOCK_UPDATE } }; int rc; int flags = LDLM_FL_HAS_INTENT; -- 1.8.3.1