X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fllite%2Fstatahead.c;h=5cf4b9c88cc52400505ae673414d6a2f2d9078cd;hb=b58a81067237f398f735f85b2211431cfc37493a;hp=cb1883b4b58916ffc2982d383098648e274d73ce;hpb=49c9474995673394c223a32a4ffbfbb8d7162a83;p=fs%2Flustre-release.git diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c index cb1883b..5cf4b9c 100644 --- a/lustre/llite/statahead.c +++ b/lustre/llite/statahead.c @@ -1,6 +1,4 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * +/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. @@ -29,8 +27,7 @@ * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011 Whamcloud, Inc. - * + * Copyright (c) 2011, 2012, Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -40,7 +37,6 @@ #include #include #include -#include #include #include @@ -49,7 +45,6 @@ #include #include #include -#include #include "llite_internal.h" #define SA_OMITTED_ENTRY_MAX 8ULL @@ -88,7 +83,7 @@ struct ll_sa_entry { }; static unsigned int sai_generation = 0; -static cfs_spinlock_t sai_generation_lock = CFS_SPIN_LOCK_UNLOCKED; +static DEFINE_SPINLOCK(sai_generation_lock); static inline int ll_sa_entry_unlinked(struct ll_sa_entry *entry) { @@ -140,14 +135,42 @@ ll_sa_entry_unhash(struct ll_statahead_info *sai, struct ll_sa_entry *entry) cfs_spin_unlock(&sai->sai_cache_lock[i]); } +static inline int agl_should_run(struct ll_statahead_info *sai, + struct inode *inode) +{ + if (inode != NULL && S_ISREG(inode->i_mode) && + ll_i2info(inode)->lli_has_smd && sai->sai_agl_valid) + return 1; + return 0; +} + +static inline struct ll_sa_entry * +sa_first_received_entry(struct ll_statahead_info *sai) +{ + return cfs_list_entry(sai->sai_entries_received.next, + struct ll_sa_entry, se_list); +} + +static inline struct ll_inode_info * +agl_first_entry(struct ll_statahead_info *sai) +{ + return cfs_list_entry(sai->sai_entries_agl.next, + struct ll_inode_info, lli_agl_list); +} + +static inline int sa_sent_full(struct ll_statahead_info *sai) +{ + return cfs_atomic_read(&sai->sai_cache_count) >= sai->sai_max; +} + static inline int sa_received_empty(struct ll_statahead_info *sai) { return cfs_list_empty(&sai->sai_entries_received); } -static inline int sa_not_full(struct ll_statahead_info *sai) +static inline int agl_list_empty(struct ll_statahead_info *sai) { - return (cfs_atomic_read(&sai->sai_cache_count) < sai->sai_max); + return cfs_list_empty(&sai->sai_entries_agl); } /** @@ -417,6 +440,37 @@ ll_sa_entry_to_stated(struct ll_statahead_info *sai, return ret; } +/* + * Insert inode into the list of sai_entries_agl. + */ +static void ll_agl_add(struct ll_statahead_info *sai, + struct inode *inode, int index) +{ + struct ll_inode_info *child = ll_i2info(inode); + struct ll_inode_info *parent = ll_i2info(sai->sai_inode); + int added = 0; + + cfs_spin_lock(&child->lli_agl_lock); + if (child->lli_agl_index == 0) { + child->lli_agl_index = index; + cfs_spin_unlock(&child->lli_agl_lock); + + LASSERT(cfs_list_empty(&child->lli_agl_list)); + + igrab(inode); + cfs_spin_lock(&parent->lli_agl_lock); + if (agl_list_empty(sai)) + added = 1; + cfs_list_add_tail(&child->lli_agl_list, &sai->sai_entries_agl); + cfs_spin_unlock(&parent->lli_agl_lock); + } else { + cfs_spin_unlock(&child->lli_agl_lock); + } + + if (added > 0) + cfs_waitq_signal(&sai->sai_agl_thread.t_ctl_waitq); +} + static struct ll_statahead_info *ll_sai_alloc(void) { struct ll_statahead_info *sai; @@ -428,17 +482,24 @@ static struct ll_statahead_info *ll_sai_alloc(void) RETURN(NULL); cfs_atomic_set(&sai->sai_refcount, 1); + cfs_spin_lock(&sai_generation_lock); sai->sai_generation = ++sai_generation; if (unlikely(sai_generation == 0)) sai->sai_generation = ++sai_generation; cfs_spin_unlock(&sai_generation_lock); + sai->sai_max = LL_SA_RPC_MIN; + sai->sai_index = 1; cfs_waitq_init(&sai->sai_waitq); cfs_waitq_init(&sai->sai_thread.t_ctl_waitq); + cfs_waitq_init(&sai->sai_agl_thread.t_ctl_waitq); + CFS_INIT_LIST_HEAD(&sai->sai_entries_sent); CFS_INIT_LIST_HEAD(&sai->sai_entries_received); CFS_INIT_LIST_HEAD(&sai->sai_entries_stated); + CFS_INIT_LIST_HEAD(&sai->sai_entries_agl); + for (i = 0; i < LL_SA_CACHE_SIZE; i++) { CFS_INIT_LIST_HEAD(&sai->sai_cache[i]); cfs_spin_lock_init(&sai->sai_cache_lock[i]); @@ -472,12 +533,13 @@ static void ll_sai_put(struct ll_statahead_info *sai) } LASSERT(lli->lli_opendir_key == NULL); + LASSERT(thread_is_stopped(&sai->sai_thread)); + LASSERT(thread_is_stopped(&sai->sai_agl_thread)); + lli->lli_sai = NULL; lli->lli_opendir_pid = 0; cfs_spin_unlock(&lli->lli_sa_lock); - LASSERT(thread_is_stopped(&sai->sai_thread)); - if (sai->sai_sent > sai->sai_replied) CDEBUG(D_READA,"statahead for dir "DFID" does not " "finish: [sent:"LPU64"] [replied:"LPU64"]\n", @@ -495,6 +557,7 @@ static void ll_sai_put(struct ll_statahead_info *sai) do_sai_entry_fini(sai, entry); LASSERT(cfs_atomic_read(&sai->sai_cache_count) == 0); + LASSERT(agl_list_empty(sai)); iput(inode); OBD_FREE_PTR(sai); @@ -503,6 +566,69 @@ static void ll_sai_put(struct ll_statahead_info *sai) EXIT; } +/* Do NOT forget to drop inode refcount when into sai_entries_agl. */ +static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai) +{ + struct ll_inode_info *lli = ll_i2info(inode); + __u64 index = lli->lli_agl_index; + int rc; + ENTRY; + + LASSERT(cfs_list_empty(&lli->lli_agl_list)); + + /* AGL maybe fall behind statahead with one entry */ + if (is_omitted_entry(sai, index + 1)) { + lli->lli_agl_index = 0; + iput(inode); + RETURN_EXIT; + } + + /* Someone is in glimpse (sync or async), do nothing. */ + rc = cfs_down_write_trylock(&lli->lli_glimpse_sem); + if (rc == 0) { + lli->lli_agl_index = 0; + iput(inode); + RETURN_EXIT; + } + + /* + * Someone triggered glimpse within 1 sec before. + * 1) The former glimpse succeeded with glimpse lock granted by OST, and + * if the lock is still cached on client, AGL needs to do nothing. If + * it is cancelled by other client, AGL maybe cannot obtaion new lock + * for no glimpse callback triggered by AGL. + * 2) The former glimpse succeeded, but OST did not grant glimpse lock. + * Under such case, it is quite possible that the OST will not grant + * glimpse lock for AGL also. + * 3) The former glimpse failed, compared with other two cases, it is + * relative rare. AGL can ignore such case, and it will not muchly + * affect the performance. + */ + if (lli->lli_glimpse_time != 0 && + cfs_time_before(cfs_time_shift(-1), lli->lli_glimpse_time)) { + cfs_up_write(&lli->lli_glimpse_sem); + lli->lli_agl_index = 0; + iput(inode); + RETURN_EXIT; + } + + CDEBUG(D_READA, "Handling (init) async glimpse: inode = " + DFID", idx = "LPU64"\n", PFID(&lli->lli_fid), index); + + cl_agl(inode); + lli->lli_agl_index = 0; + lli->lli_glimpse_time = cfs_time_current(); + cfs_up_write(&lli->lli_glimpse_sem); + + CDEBUG(D_READA, "Handled (init) async glimpse: inode= " + DFID", idx = "LPU64", rc = %d\n", + PFID(&lli->lli_fid), index, rc); + + iput(inode); + + EXIT; +} + static void do_statahead_interpret(struct ll_statahead_info *sai, struct ll_sa_entry *target) { @@ -525,8 +651,7 @@ static void do_statahead_interpret(struct ll_statahead_info *sai, cfs_spin_unlock(&lli->lli_sa_lock); RETURN_EXIT; } else { - entry = cfs_list_entry(sai->sai_entries_received.next, - struct ll_sa_entry, se_list); + entry = sa_first_received_entry(sai); } cfs_atomic_inc(&entry->se_refcount); @@ -580,6 +705,9 @@ static void do_statahead_interpret(struct ll_statahead_info *sai, entry->se_inode = child; + if (agl_should_run(sai, child)) + ll_agl_add(sai, child, entry->se_index); + EXIT; out: @@ -689,11 +817,12 @@ static void sa_args_fini(struct md_enqueue_info *minfo, * "md_intent_getattr_async". */ static int sa_args_init(struct inode *dir, struct inode *child, - struct qstr *qstr, struct md_enqueue_info **pmi, + struct ll_sa_entry *entry, struct md_enqueue_info **pmi, struct ldlm_enqueue_info **pei, struct obd_capa **pcapa) { - struct ll_inode_info *lli = ll_i2info(dir); + struct qstr *qstr = &entry->se_qstr; + struct ll_inode_info *lli = ll_i2info(dir); struct md_enqueue_info *minfo; struct ldlm_enqueue_info *einfo; struct md_op_data *op_data; @@ -720,7 +849,7 @@ static int sa_args_init(struct inode *dir, struct inode *child, minfo->mi_dir = igrab(dir); minfo->mi_cb = ll_statahead_interpret; minfo->mi_generation = lli->lli_sai->sai_generation; - minfo->mi_cbdata = lli->lli_sai->sai_index; + minfo->mi_cbdata = entry->se_index; einfo->ei_type = LDLM_IBITS; einfo->ei_mode = it_to_lock_mode(&minfo->mi_it); @@ -745,7 +874,7 @@ static int do_sa_lookup(struct inode *dir, struct ll_sa_entry *entry) int rc; ENTRY; - rc = sa_args_init(dir, NULL, &entry->se_qstr, &minfo, &einfo, capas); + rc = sa_args_init(dir, NULL, entry, &minfo, &einfo, capas); if (rc) RETURN(rc); @@ -795,7 +924,7 @@ static int do_sa_revalidate(struct inode *dir, struct ll_sa_entry *entry, RETURN(1); } - rc = sa_args_init(dir, inode, &entry->se_qstr, &minfo, &einfo, capas); + rc = sa_args_init(dir, inode, entry, &minfo, &einfo, capas); if (rc) { entry->se_inode = NULL; iput(inode); @@ -818,9 +947,9 @@ static int do_sa_revalidate(struct inode *dir, struct ll_sa_entry *entry, static void ll_statahead_one(struct dentry *parent, const char* entry_name, int entry_name_len) { - struct inode *dir = parent->d_inode; - struct ll_inode_info *lli = ll_i2info(dir); - struct ll_statahead_info *sai = lli->lli_sai; + struct inode *dir = parent->d_inode; + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_statahead_info *sai = lli->lli_sai; struct dentry *dentry = NULL; struct ll_sa_entry *entry; int rc; @@ -833,10 +962,13 @@ static void ll_statahead_one(struct dentry *parent, const char* entry_name, RETURN_EXIT; dentry = d_lookup(parent, &entry->se_qstr); - if (!dentry) + if (!dentry) { rc = do_sa_lookup(dir, entry); - else + } else { rc = do_sa_revalidate(dir, entry, dentry); + if (rc == 1 && agl_should_run(sai, dentry->d_inode)) + ll_agl_add(sai, dentry->d_inode, entry->se_index); + } if (dentry != NULL) dput(dentry); @@ -857,40 +989,138 @@ static void ll_statahead_one(struct dentry *parent, const char* entry_name, EXIT; } +static int ll_agl_thread(void *arg) +{ + struct dentry *parent = (struct dentry *)arg; + struct inode *dir = parent->d_inode; + struct ll_inode_info *plli = ll_i2info(dir); + struct ll_inode_info *clli; + struct ll_sb_info *sbi = ll_i2sbi(dir); + struct ll_statahead_info *sai = ll_sai_get(plli->lli_sai); + struct ptlrpc_thread *thread = &sai->sai_agl_thread; + struct l_wait_info lwi = { 0 }; + ENTRY; + + { + char pname[16]; + snprintf(pname, 15, "ll_agl_%u", plli->lli_opendir_pid); + cfs_daemonize(pname); + } + + CDEBUG(D_READA, "agl thread started: [pid %d] [parent %.*s]\n", + cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); + + atomic_inc(&sbi->ll_agl_total); + cfs_spin_lock(&plli->lli_agl_lock); + sai->sai_agl_valid = 1; + thread_set_flags(thread, SVC_RUNNING); + cfs_spin_unlock(&plli->lli_agl_lock); + cfs_waitq_signal(&thread->t_ctl_waitq); + + while (1) { + l_wait_event(thread->t_ctl_waitq, + !agl_list_empty(sai) || + !thread_is_running(thread), + &lwi); + + if (!thread_is_running(thread)) + break; + + cfs_spin_lock(&plli->lli_agl_lock); + /* The statahead thread maybe help to process AGL entries, + * so check whether list empty again. */ + if (!agl_list_empty(sai)) { + clli = agl_first_entry(sai); + cfs_list_del_init(&clli->lli_agl_list); + cfs_spin_unlock(&plli->lli_agl_lock); + ll_agl_trigger(&clli->lli_vfs_inode, sai); + } else { + cfs_spin_unlock(&plli->lli_agl_lock); + } + } + + cfs_spin_lock(&plli->lli_agl_lock); + sai->sai_agl_valid = 0; + while (!agl_list_empty(sai)) { + clli = agl_first_entry(sai); + cfs_list_del_init(&clli->lli_agl_list); + cfs_spin_unlock(&plli->lli_agl_lock); + clli->lli_agl_index = 0; + iput(&clli->lli_vfs_inode); + cfs_spin_lock(&plli->lli_agl_lock); + } + thread_set_flags(thread, SVC_STOPPED); + cfs_spin_unlock(&plli->lli_agl_lock); + cfs_waitq_signal(&thread->t_ctl_waitq); + ll_sai_put(sai); + CDEBUG(D_READA, "agl thread stopped: [pid %d] [parent %.*s]\n", + cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); + RETURN(0); +} + +static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai) +{ + struct ptlrpc_thread *thread = &sai->sai_agl_thread; + struct l_wait_info lwi = { 0 }; + int rc; + ENTRY; + + CDEBUG(D_READA, "start agl thread: [pid %d] [parent %.*s]\n", + cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); + + rc = cfs_create_thread(ll_agl_thread, parent, 0); + if (rc < 0) { + CERROR("can't start ll_agl thread, rc: %d\n", rc); + thread_set_flags(thread, SVC_STOPPED); + RETURN_EXIT; + } + + l_wait_event(thread->t_ctl_waitq, + thread_is_running(thread) || thread_is_stopped(thread), + &lwi); + EXIT; +} + static int ll_statahead_thread(void *arg) { struct dentry *parent = (struct dentry *)arg; - struct inode *dir = parent->d_inode; - struct ll_inode_info *lli = ll_i2info(dir); - struct ll_sb_info *sbi = ll_i2sbi(dir); - struct ll_statahead_info *sai = ll_sai_get(lli->lli_sai); + struct inode *dir = parent->d_inode; + struct ll_inode_info *plli = ll_i2info(dir); + struct ll_inode_info *clli; + struct ll_sb_info *sbi = ll_i2sbi(dir); + struct ll_statahead_info *sai = ll_sai_get(plli->lli_sai); struct ptlrpc_thread *thread = &sai->sai_thread; + struct ptlrpc_thread *agl_thread = &sai->sai_agl_thread; struct page *page; - __u64 pos = 0; - int first = 0; - int rc = 0; + __u64 pos = 0; + int first = 0; + int rc = 0; struct ll_dir_chain chain; + struct l_wait_info lwi = { 0 }; ENTRY; { char pname[16]; - snprintf(pname, 15, "ll_sa_%u", lli->lli_opendir_pid); + snprintf(pname, 15, "ll_sa_%u", plli->lli_opendir_pid); cfs_daemonize(pname); } + CDEBUG(D_READA, "statahead thread started: [pid %d] [parent %.*s]\n", + cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); + + if (sbi->ll_flags & LL_SBI_AGL_ENABLED) + ll_start_agl(parent, sai); + atomic_inc(&sbi->ll_sa_total); - cfs_spin_lock(&lli->lli_sa_lock); + cfs_spin_lock(&plli->lli_sa_lock); thread_set_flags(thread, SVC_RUNNING); - cfs_spin_unlock(&lli->lli_sa_lock); + cfs_spin_unlock(&plli->lli_sa_lock); cfs_waitq_signal(&thread->t_ctl_waitq); - CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name); - lli->lli_sa_pos = 0; ll_dir_chain_init(&chain); - page = ll_get_dir_page(NULL, dir, pos, &chain); + page = ll_get_dir_page(dir, pos, &chain); while (1) { - struct l_wait_info lwi = { 0 }; struct lu_dirpage *dp; struct lu_dirent *ent; @@ -899,7 +1129,7 @@ static int ll_statahead_thread(void *arg) CDEBUG(D_READA, "error reading dir "DFID" at "LPU64 "/"LPU64": [rc %d] [parent %u]\n", PFID(ll_inode2fid(dir)), pos, sai->sai_index, - rc, lli->lli_opendir_pid); + rc, plli->lli_opendir_pid); GOTO(out, rc); } @@ -951,27 +1181,55 @@ static int ll_statahead_thread(void *arg) if (unlikely(++first == 1)) continue; -keep_de: +keep_it: l_wait_event(thread->t_ctl_waitq, - sa_not_full(sai) || + !sa_sent_full(sai) || !sa_received_empty(sai) || + !agl_list_empty(sai) || !thread_is_running(thread), &lwi); +interpret_it: while (!sa_received_empty(sai)) do_statahead_interpret(sai, NULL); if (unlikely(!thread_is_running(thread))) { ll_release_page(page, 0); - GOTO(out, rc); + GOTO(out, rc = 0); } - if (!sa_not_full(sai)) - /* - * do not skip the current de. - */ - goto keep_de; + /* If no window for metadata statahead, but there are + * some AGL entries to be triggered, then try to help + * to process the AGL entries. */ + if (sa_sent_full(sai)) { + cfs_spin_lock(&plli->lli_agl_lock); + while (!agl_list_empty(sai)) { + clli = agl_first_entry(sai); + cfs_list_del_init(&clli->lli_agl_list); + cfs_spin_unlock(&plli->lli_agl_lock); + ll_agl_trigger(&clli->lli_vfs_inode, + sai); + + if (!sa_received_empty(sai)) + goto interpret_it; + + if (unlikely( + !thread_is_running(thread))) { + ll_release_page(page, 0); + GOTO(out, rc = 0); + } + + if (!sa_sent_full(sai)) + goto do_it; + + cfs_spin_lock(&plli->lli_agl_lock); + } + cfs_spin_unlock(&plli->lli_agl_lock); + + goto keep_it; + } +do_it: ll_statahead_one(parent, name, namelen); } pos = le64_to_cpu(dp->ldp_hash_end); @@ -990,11 +1248,26 @@ keep_de: while (!sa_received_empty(sai)) do_statahead_interpret(sai, NULL); - if ((sai->sai_sent == sai->sai_replied && - sa_received_empty(sai)) || - !thread_is_running(thread)) + if (unlikely(!thread_is_running(thread))) GOTO(out, rc = 0); + + if (sai->sai_sent == sai->sai_replied && + sa_received_empty(sai)) + break; } + + cfs_spin_lock(&plli->lli_agl_lock); + while (!agl_list_empty(sai) && + thread_is_running(thread)) { + clli = agl_first_entry(sai); + cfs_list_del_init(&clli->lli_agl_list); + cfs_spin_unlock(&plli->lli_agl_lock); + ll_agl_trigger(&clli->lli_vfs_inode, sai); + cfs_spin_lock(&plli->lli_agl_lock); + } + cfs_spin_unlock(&plli->lli_agl_lock); + + GOTO(out, rc = 0); } else if (1) { /* * chain is exhausted. @@ -1002,9 +1275,8 @@ keep_de: */ ll_release_page(page, le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE); - lli->lli_sa_pos = pos; sai->sai_in_readpage = 1; - page = ll_get_dir_page(NULL, dir, pos, &chain); + page = ll_get_dir_page(dir, pos, &chain); sai->sai_in_readpage = 0; } else { LASSERT(le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE); @@ -1017,26 +1289,41 @@ keep_de: EXIT; out: + if (sai->sai_agl_valid) { + cfs_spin_lock(&plli->lli_agl_lock); + thread_set_flags(agl_thread, SVC_STOPPING); + cfs_spin_unlock(&plli->lli_agl_lock); + cfs_waitq_signal(&agl_thread->t_ctl_waitq); + + CDEBUG(D_READA, "stop agl thread: [pid %d]\n", + cfs_curproc_pid()); + l_wait_event(agl_thread->t_ctl_waitq, + thread_is_stopped(agl_thread), + &lwi); + } else { + /* Set agl_thread flags anyway. */ + thread_set_flags(&sai->sai_agl_thread, SVC_STOPPED); + } ll_dir_chain_fini(&chain); - cfs_spin_lock(&lli->lli_sa_lock); + cfs_spin_lock(&plli->lli_sa_lock); if (!sa_received_empty(sai)) { thread_set_flags(thread, SVC_STOPPING); - cfs_spin_unlock(&lli->lli_sa_lock); + cfs_spin_unlock(&plli->lli_sa_lock); /* To release the resources held by received entries. */ while (!sa_received_empty(sai)) do_statahead_interpret(sai, NULL); - cfs_spin_lock(&lli->lli_sa_lock); + cfs_spin_lock(&plli->lli_sa_lock); } thread_set_flags(thread, SVC_STOPPED); - cfs_spin_unlock(&lli->lli_sa_lock); + cfs_spin_unlock(&plli->lli_sa_lock); cfs_waitq_signal(&sai->sai_waitq); cfs_waitq_signal(&thread->t_ctl_waitq); ll_sai_put(sai); dput(parent); - CDEBUG(D_READA, "statahead thread stopped, pid %d\n", - cfs_curproc_pid()); + CDEBUG(D_READA, "statahead thread stopped: [pid %d] [parent %.*s]\n", + cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); return rc; } @@ -1067,7 +1354,7 @@ void ll_stop_statahead(struct inode *dir, void *key) cfs_spin_unlock(&lli->lli_sa_lock); cfs_waitq_signal(&thread->t_ctl_waitq); - CDEBUG(D_READA, "stopping statahead thread, pid %d\n", + CDEBUG(D_READA, "stop statahead thread: [pid %d]\n", cfs_curproc_pid()); l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread), @@ -1105,18 +1392,16 @@ enum { static int is_first_dirent(struct inode *dir, struct dentry *dentry) { - struct ll_inode_info *lli = ll_i2info(dir); - struct ll_dir_chain chain; - struct qstr *target = &dentry->d_name; - struct page *page; - __u64 pos = 0; - int dot_de; - int rc = LS_NONE_FIRST_DE; + struct ll_dir_chain chain; + struct qstr *target = &dentry->d_name; + struct page *page; + __u64 pos = 0; + int dot_de; + int rc = LS_NONE_FIRST_DE; ENTRY; - lli->lli_sa_pos = 0; ll_dir_chain_init(&chain); - page = ll_get_dir_page(NULL, dir, pos, &chain); + page = ll_get_dir_page(dir, pos, &chain); while (1) { struct lu_dirpage *dp; @@ -1203,8 +1488,7 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry) */ ll_release_page(page, le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE); - lli->lli_sa_pos = pos; - page = ll_get_dir_page(NULL, dir, pos, &chain); + page = ll_get_dir_page(dir, pos, &chain); } else { /* * go into overflow page. @@ -1225,7 +1509,7 @@ ll_sai_unplug(struct ll_statahead_info *sai, struct ll_sa_entry *entry) { struct ptlrpc_thread *thread = &sai->sai_thread; struct ll_sb_info *sbi = ll_i2sbi(sai->sai_inode); - int hit; + int hit; ENTRY; if (entry != NULL && entry->se_stat == SA_ENTRY_SUCC) @@ -1355,27 +1639,35 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, struct lookup_intent it = { .it_op = IT_GETATTR, .d.lustre.it_lock_handle = entry->se_handle }; - struct ll_dentry_data *lld; - __u64 bits; - - rc = md_revalidate_lock(ll_i2mdexp(dir), &it, - ll_inode2fid(inode), &bits); - if (rc == 1) { - if ((*dentryp)->d_inode == NULL) { - *dentryp = ll_find_alias(inode, - *dentryp); - lld = ll_d2d(*dentryp); - if (unlikely(lld == NULL)) - ll_dops_init(*dentryp, 1, 1); + __u64 bits; + + rc = md_revalidate_lock(ll_i2mdexp(dir), &it, + ll_inode2fid(inode), &bits); + if (rc == 1) { + if ((*dentryp)->d_inode == NULL) { + *dentryp = ll_splice_alias(inode, + *dentryp); + } else if ((*dentryp)->d_inode != inode) { + /* revalidate, but inode is recreated */ + CDEBUG(D_READA, + "stale dentry %.*s inode %lu/%u, " + "statahead inode %lu/%u\n", + (*dentryp)->d_name.len, + (*dentryp)->d_name.name, + (*dentryp)->d_inode->i_ino, + (*dentryp)->d_inode->i_generation, + inode->i_ino, + inode->i_generation); + ll_sai_unplug(sai, entry); + RETURN(-ESTALE); } else { - LASSERT((*dentryp)->d_inode == inode); - - ll_dentry_rehash(*dentryp, 0); - iput(inode); - } - entry->se_inode = NULL; + iput(inode); + } + entry->se_inode = NULL; - ll_dentry_reset_flags(*dentryp, bits); + if ((bits & MDS_INODELOCK_LOOKUP) && + d_lustre_invalid(*dentryp)) + d_lustre_revalidate(*dentryp); ll_intent_release(&it); } } @@ -1416,6 +1708,9 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, GOTO(out, rc = -EAGAIN); } + CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %.*s]\n", + cfs_curproc_pid(), parent->d_name.len, parent->d_name.name); + lli->lli_sai = sai; rc = cfs_create_thread(ll_statahead_thread, parent, 0); thread = &sai->sai_thread; @@ -1424,6 +1719,7 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, dput(parent); lli->lli_opendir_key = NULL; thread_set_flags(thread, SVC_STOPPED); + thread_set_flags(&sai->sai_agl_thread, SVC_STOPPED); ll_sai_put(sai); LASSERT(lli->lli_sai == NULL); RETURN(-EAGAIN); @@ -1435,9 +1731,9 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp, /* * We don't stat-ahead for the first dirent since we are already in - * lookup, and -EEXIST also indicates that this is the first dirent. + * lookup. */ - RETURN(-EEXIST); + RETURN(-EAGAIN); out: if (sai != NULL)