X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fllite%2Fstatahead.c;h=0f1dfe3ad2e0e1e28f585ee952ed27bd7df32648;hp=2fab1df172c53a328ca7de7bbee4cc41149ea023;hb=38e6ee6fb779c34c7ef3eb4a010332385b3b84c1;hpb=2c00faec12ea60b8955fe0e793b32ef25795ed42 diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c index 2fab1df..0f1dfe3 100644 --- a/lustre/llite/statahead.c +++ b/lustre/llite/statahead.c @@ -15,11 +15,7 @@ * * You should have received a copy of the GNU General Public License * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. + * http://www.gnu.org/licenses/gpl-2.0.html * * GPL HEADER END */ @@ -27,7 +23,7 @@ * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2014, Intel Corporation. + * Copyright (c) 2011, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -183,8 +179,8 @@ static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index) /* allocate sa_entry and hash it to allow scanner process to find it */ static struct sa_entry * -sa_alloc(struct ll_statahead_info *sai, __u64 index, const char *name, int len, - const struct lu_fid *fid) +sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index, + const char *name, int len, const struct lu_fid *fid) { struct ll_inode_info *lli; struct sa_entry *entry; @@ -197,7 +193,7 @@ sa_alloc(struct ll_statahead_info *sai, __u64 index, const char *name, int len, if (unlikely(entry == NULL)) RETURN(ERR_PTR(-ENOMEM)); - CDEBUG(D_READA, "alloc sa entry %.*s(%p) index "LPU64"\n", + CDEBUG(D_READA, "alloc sa entry %.*s(%p) index %llu\n", len, name, entry, index); entry->se_index = index; @@ -207,7 +203,7 @@ sa_alloc(struct ll_statahead_info *sai, __u64 index, const char *name, int len, dname = (char *)entry + sizeof(struct sa_entry); memcpy(dname, name, len); dname[len] = 0; - entry->se_qstr.hash = full_name_hash(name, len); + entry->se_qstr.hash = ll_full_name_hash(parent, name, len); entry->se_qstr.len = len; entry->se_qstr.name = dname; entry->se_fid = *fid; @@ -227,7 +223,7 @@ sa_alloc(struct ll_statahead_info *sai, __u64 index, const char *name, int len, /* free sa_entry, which should have been unhashed and not in any list */ static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry) { - CDEBUG(D_READA, "free sa entry %.*s(%p) index "LPU64"\n", + CDEBUG(D_READA, "free sa entry %.*s(%p) index %llu\n", entry->se_qstr.len, entry->se_qstr.name, entry, entry->se_index); @@ -329,11 +325,67 @@ __sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret) } } list_add(&entry->se_list, pos); - entry->se_state = ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC; + /* + * LU-9210: ll_statahead_interpet must be able to see this before + * we wake it up + */ + smp_store_release(&entry->se_state, ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC); return (index == sai->sai_index_wait); } +/* finish async stat RPC arguments */ +static void sa_fini_data(struct md_enqueue_info *minfo) +{ + ll_unlock_md_op_lsm(&minfo->mi_data); + iput(minfo->mi_dir); + OBD_FREE_PTR(minfo); +} + +static int ll_statahead_interpret(struct ptlrpc_request *req, + struct md_enqueue_info *minfo, int rc); + +/* + * prepare arguments for async stat RPC. + */ +static struct md_enqueue_info * +sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry) +{ + struct md_enqueue_info *minfo; + struct ldlm_enqueue_info *einfo; + struct md_op_data *op_data; + + OBD_ALLOC_PTR(minfo); + if (minfo == NULL) + return ERR_PTR(-ENOMEM); + + op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child, + entry->se_qstr.name, entry->se_qstr.len, 0, + LUSTRE_OPC_ANY, NULL); + if (IS_ERR(op_data)) { + OBD_FREE_PTR(minfo); + return (struct md_enqueue_info *)op_data; + } + + if (child == NULL) + op_data->op_fid2 = entry->se_fid; + + minfo->mi_it.it_op = IT_GETATTR; + minfo->mi_dir = igrab(dir); + minfo->mi_cb = ll_statahead_interpret; + minfo->mi_cbdata = entry; + + einfo = &minfo->mi_einfo; + einfo->ei_type = LDLM_IBITS; + einfo->ei_mode = it_to_lock_mode(&minfo->mi_it); + einfo->ei_cb_bl = ll_md_blocking_ast; + einfo->ei_cb_cp = ldlm_completion_ast; + einfo->ei_cb_gl = NULL; + einfo->ei_cbdata = NULL; + + return minfo; +} + /* * release resources used in async stat RPC, update entry state and wakeup if * scanner process it waiting on this entry. @@ -350,8 +402,7 @@ sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret) if (minfo) { entry->se_minfo = NULL; ll_intent_release(&minfo->mi_it); - iput(minfo->mi_dir); - OBD_FREE_PTR(minfo); + sa_fini_data(minfo); } if (req) { @@ -497,10 +548,11 @@ static void ll_sai_put(struct ll_statahead_info *sai) static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai) { struct ll_inode_info *lli = ll_i2info(inode); - __u64 index = lli->lli_agl_index; + u64 index = lli->lli_agl_index; + ktime_t expire; int rc; - ENTRY; + ENTRY; LASSERT(list_empty(&lli->lli_agl_list)); /* AGL maybe fall behind statahead with one entry */ @@ -510,6 +562,18 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai) RETURN_EXIT; } + /* In case of restore, the MDT has the right size and has already + * sent it back without granting the layout lock, inode is up-to-date. + * Then AGL (async glimpse lock) is useless. + * Also to glimpse we need the layout, in case of a runninh restore + * the MDT holds the layout lock so the glimpse will block up to the + * end of restore (statahead/agl will block) */ + if (ll_file_test_flag(lli, LLIF_FILE_RESTORING)) { + lli->lli_agl_index = 0; + iput(inode); + RETURN_EXIT; + } + /* Someone is in glimpse (sync or async), do nothing. */ rc = down_write_trylock(&lli->lli_glimpse_sem); if (rc == 0) { @@ -531,8 +595,9 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai) * relative rare. AGL can ignore such case, and it will not muchly * affect the performance. */ - if (lli->lli_glimpse_time != 0 && - cfs_time_before(cfs_time_shift(-1), lli->lli_glimpse_time)) { + expire = ktime_sub_ns(ktime_get(), NSEC_PER_SEC); + if (ktime_to_ns(lli->lli_glimpse_time) && + ktime_before(expire, lli->lli_glimpse_time)) { up_write(&lli->lli_glimpse_sem); lli->lli_agl_index = 0; iput(inode); @@ -540,15 +605,15 @@ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai) } CDEBUG(D_READA, "Handling (init) async glimpse: inode = " - DFID", idx = "LPU64"\n", PFID(&lli->lli_fid), index); + DFID", idx = %llu\n", PFID(&lli->lli_fid), index); cl_agl(inode); lli->lli_agl_index = 0; - lli->lli_glimpse_time = cfs_time_current(); + lli->lli_glimpse_time = ktime_get(); up_write(&lli->lli_glimpse_sem); CDEBUG(D_READA, "Handled (init) async glimpse: inode= " - DFID", idx = "LPU64", rc = %d\n", + DFID", idx = %llu, rc = %d\n", PFID(&lli->lli_fid), index, rc); iput(inode); @@ -572,14 +637,14 @@ static void sa_instantiate(struct ll_statahead_info *sai, int rc = 0; ENTRY; - LASSERT(entry->se_handle != 0); + LASSERT(entry->se_handle != 0); - minfo = entry->se_minfo; - it = &minfo->mi_it; - req = entry->se_req; - body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); - if (body == NULL) - GOTO(out, rc = -EFAULT); + minfo = entry->se_minfo; + it = &minfo->mi_it; + req = entry->se_req; + body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); + if (body == NULL) + GOTO(out, rc = -EFAULT); child = entry->se_inode; if (child != NULL) { @@ -592,27 +657,26 @@ static void sa_instantiate(struct ll_statahead_info *sai, } } - it->d.lustre.it_lock_handle = entry->se_handle; + it->it_lock_handle = entry->se_handle; rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL); - if (rc != 1) - GOTO(out, rc = -EAGAIN); + if (rc != 1) + GOTO(out, rc = -EAGAIN); - rc = ll_prep_inode(&child, req, dir->i_sb, it); - if (rc) - GOTO(out, rc); + rc = ll_prep_inode(&child, req, dir->i_sb, it); + if (rc) + GOTO(out, rc); CDEBUG(D_READA, "%s: setting %.*s"DFID" l_data to inode %p\n", - ll_get_fsname(child->i_sb, NULL, 0), - entry->se_qstr.len, entry->se_qstr.name, - PFID(ll_inode2fid(child)), child); - ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL); + ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len, + entry->se_qstr.name, PFID(ll_inode2fid(child)), child); + ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL); - entry->se_inode = child; + entry->se_inode = child; - if (agl_should_run(sai, child)) - ll_agl_add(sai, child, entry->se_index); + if (agl_should_run(sai, child)) + ll_agl_add(sai, child, entry->se_index); - EXIT; + EXIT; out: /* sa_make_ready() will drop ldlm ibits lock refcount by calling @@ -628,21 +692,19 @@ static void sa_handle_callback(struct ll_statahead_info *sai) lli = ll_i2info(sai->sai_dentry->d_inode); + spin_lock(&lli->lli_sa_lock); while (sa_has_callback(sai)) { struct sa_entry *entry; - spin_lock(&lli->lli_sa_lock); - if (unlikely(!sa_has_callback(sai))) { - spin_unlock(&lli->lli_sa_lock); - break; - } entry = list_entry(sai->sai_interim_entries.next, struct sa_entry, se_list); list_del_init(&entry->se_list); spin_unlock(&lli->lli_sa_lock); sa_instantiate(sai, entry); + spin_lock(&lli->lli_sa_lock); } + spin_unlock(&lli->lli_sa_lock); } /* @@ -659,7 +721,7 @@ static int ll_statahead_interpret(struct ptlrpc_request *req, struct ll_statahead_info *sai = lli->lli_sai; struct sa_entry *entry = (struct sa_entry *)minfo->mi_cbdata; __u64 handle = 0; - bool wakeup; + wait_queue_head_t *waitq = NULL; ENTRY; if (it_disposition(it, DISP_LOOKUP_NEG)) @@ -676,20 +738,21 @@ static int ll_statahead_interpret(struct ptlrpc_request *req, if (rc != 0) { ll_intent_release(it); - iput(dir); - OBD_FREE_PTR(minfo); + sa_fini_data(minfo); } else { /* release ibits lock ASAP to avoid deadlock when statahead * thread enqueues lock on parent in readdir and another * process enqueues lock on child with parent lock held, eg. * unlink. */ - handle = it->d.lustre.it_lock_handle; + handle = it->it_lock_handle; ll_intent_drop_lock(it); + ll_unlock_md_op_lsm(&minfo->mi_data); } spin_lock(&lli->lli_sa_lock); if (rc != 0) { - wakeup = __sa_make_ready(sai, entry, rc); + if (__sa_make_ready(sai, entry, rc)) + waitq = &sai->sai_waitq; } else { entry->se_minfo = minfo; entry->se_req = ptlrpc_request_addref(req); @@ -698,64 +761,20 @@ static int ll_statahead_interpret(struct ptlrpc_request *req, * for readpage and other tries to enqueue lock on child * with parent's lock held, for example: unlink. */ entry->se_handle = handle; - wakeup = !sa_has_callback(sai); + if (!sa_has_callback(sai)) + waitq = &sai->sai_thread.t_ctl_waitq; + list_add_tail(&entry->se_list, &sai->sai_interim_entries); } sai->sai_replied++; - if (wakeup) - wake_up(&sai->sai_thread.t_ctl_waitq); + + if (waitq != NULL) + wake_up(waitq); spin_unlock(&lli->lli_sa_lock); RETURN(rc); } -/* finish async stat RPC arguments */ -static void sa_fini_data(struct md_enqueue_info *minfo) -{ - iput(minfo->mi_dir); - OBD_FREE_PTR(minfo); -} - -/* - * prepare arguments for async stat RPC. - */ -static struct md_enqueue_info * -sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry) -{ - struct md_enqueue_info *minfo; - struct ldlm_enqueue_info *einfo; - struct md_op_data *op_data; - - OBD_ALLOC_PTR(minfo); - if (minfo == NULL) - return ERR_PTR(-ENOMEM); - - op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child, NULL, 0, 0, - LUSTRE_OPC_ANY, NULL); - if (IS_ERR(op_data)) { - OBD_FREE_PTR(minfo); - return (struct md_enqueue_info *)op_data; - } - - if (child == NULL) - op_data->op_fid2 = entry->se_fid; - - minfo->mi_it.it_op = IT_GETATTR; - minfo->mi_dir = igrab(dir); - minfo->mi_cb = ll_statahead_interpret; - minfo->mi_cbdata = entry; - - einfo = &minfo->mi_einfo; - einfo->ei_type = LDLM_IBITS; - einfo->ei_mode = it_to_lock_mode(&minfo->mi_it); - einfo->ei_cb_bl = ll_md_blocking_ast; - einfo->ei_cb_cp = ldlm_completion_ast; - einfo->ei_cb_gl = NULL; - einfo->ei_cbdata = NULL; - - return minfo; -} - /* async stat for file not found in dcache */ static int sa_lookup(struct inode *dir, struct sa_entry *entry) { @@ -786,7 +805,7 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry, { struct inode *inode = dentry->d_inode; struct lookup_intent it = { .it_op = IT_GETATTR, - .d.lustre.it_lock_handle = 0 }; + .it_lock_handle = 0 }; struct md_enqueue_info *minfo; int rc; ENTRY; @@ -797,22 +816,20 @@ static int sa_revalidate(struct inode *dir, struct sa_entry *entry, if (d_mountpoint(dentry)) RETURN(1); + minfo = sa_prep_data(dir, inode, entry); + if (IS_ERR(minfo)) + RETURN(PTR_ERR(minfo)); + entry->se_inode = igrab(inode); rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode), NULL); if (rc == 1) { - entry->se_handle = it.d.lustre.it_lock_handle; + entry->se_handle = it.it_lock_handle; ll_intent_release(&it); + sa_fini_data(minfo); RETURN(1); } - minfo = sa_prep_data(dir, inode, entry); - if (IS_ERR(minfo)) { - entry->se_inode = NULL; - iput(inode); - RETURN(PTR_ERR(minfo)); - } - rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo); if (rc < 0) { entry->se_inode = NULL; @@ -835,7 +852,7 @@ static void sa_statahead(struct dentry *parent, const char *name, int len, int rc; ENTRY; - entry = sa_alloc(sai, sai->sai_index, name, len, fid); + entry = sa_alloc(parent, sai, sai->sai_index, name, len, fid); if (IS_ERR(entry)) RETURN_EXIT; @@ -871,10 +888,8 @@ static int ll_agl_thread(void *arg) struct ll_sb_info *sbi = ll_i2sbi(dir); struct ll_statahead_info *sai; struct ptlrpc_thread *thread; - struct l_wait_info lwi = { 0 }; ENTRY; - sai = ll_sai_get(dir); thread = &sai->sai_agl_thread; thread->t_pid = current_pid(); @@ -892,14 +907,12 @@ static int ll_agl_thread(void *arg) spin_unlock(&plli->lli_agl_lock); wake_up(&thread->t_ctl_waitq); - while (1) { - l_wait_event(thread->t_ctl_waitq, - !agl_list_empty(sai) || - !thread_is_running(thread), - &lwi); - - if (!thread_is_running(thread)) - break; + while (1) { + wait_event_idle(thread->t_ctl_waitq, + !agl_list_empty(sai) || + !thread_is_running(thread)); + if (!thread_is_running(thread)) + break; spin_lock(&plli->lli_agl_lock); /* The statahead thread maybe help to process AGL entries, @@ -909,6 +922,7 @@ static int ll_agl_thread(void *arg) list_del_init(&clli->lli_agl_list); spin_unlock(&plli->lli_agl_lock); ll_agl_trigger(&clli->lli_vfs_inode, sai); + cond_resched(); } else { spin_unlock(&plli->lli_agl_lock); } @@ -937,7 +951,6 @@ static int ll_agl_thread(void *arg) static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai) { struct ptlrpc_thread *thread = &sai->sai_agl_thread; - struct l_wait_info lwi = { 0 }; struct ll_inode_info *plli; struct task_struct *task; ENTRY; @@ -954,9 +967,8 @@ static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai) RETURN_EXIT; } - l_wait_event(thread->t_ctl_waitq, - thread_is_running(thread) || thread_is_stopped(thread), - &lwi); + wait_event_idle(thread->t_ctl_waitq, + thread_is_running(thread) || thread_is_stopped(thread)); EXIT; } @@ -973,7 +985,6 @@ static int ll_statahead_thread(void *arg) int first = 0; struct md_op_data *op_data; struct ll_dir_chain chain; - struct l_wait_info lwi = { 0 }; struct page *page = NULL; __u64 pos = 0; int rc = 0; @@ -986,12 +997,9 @@ static int ll_statahead_thread(void *arg) CDEBUG(D_READA, "statahead thread starting: sai %p, parent %.*s\n", sai, parent->d_name.len, parent->d_name.name); - op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0, - LUSTRE_OPC_ANY, dir); - if (IS_ERR(op_data)) - GOTO(out, rc = PTR_ERR(op_data)); - - op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages; + OBD_ALLOC_PTR(op_data); + if (!op_data) + GOTO(out, rc = -ENOMEM); if (sbi->ll_flags & LL_SBI_AGL_ENABLED) ll_start_agl(parent, sai); @@ -1011,13 +1019,21 @@ static int ll_statahead_thread(void *arg) struct lu_dirpage *dp; struct lu_dirent *ent; + op_data = ll_prep_md_op_data(op_data, dir, dir, NULL, 0, 0, + LUSTRE_OPC_ANY, dir); + if (IS_ERR(op_data)) { + rc = PTR_ERR(op_data); + break; + } + sai->sai_in_readpage = 1; page = ll_get_dir_page(dir, op_data, pos, &chain); + ll_unlock_md_op_lsm(op_data); sai->sai_in_readpage = 0; if (IS_ERR(page)) { rc = PTR_ERR(page); - CDEBUG(D_READA, "error reading dir "DFID" at "LPU64 - "/"LPU64" opendir_pid = %u: rc = %d\n", + CDEBUG(D_READA, "error reading dir "DFID" at %llu" + "/%llu opendir_pid = %u: rc = %d\n", PFID(ll_inode2fid(dir)), pos, sai->sai_index, lli->lli_opendir_pid, rc); break; @@ -1078,12 +1094,11 @@ static int ll_statahead_thread(void *arg) /* wait for spare statahead window */ do { - l_wait_event(sa_thread->t_ctl_waitq, - !sa_sent_full(sai) || - sa_has_callback(sai) || - !agl_list_empty(sai) || - !thread_is_running(sa_thread), - &lwi); + wait_event_idle(sa_thread->t_ctl_waitq, + !sa_sent_full(sai) || + sa_has_callback(sai) || + !agl_list_empty(sai) || + !thread_is_running(sa_thread)); sa_handle_callback(sai); @@ -1098,7 +1113,7 @@ static int ll_statahead_thread(void *arg) ll_agl_trigger(&clli->lli_vfs_inode, sai); - + cond_resched(); spin_lock(&lli->lli_agl_lock); } spin_unlock(&lli->lli_agl_lock); @@ -1116,8 +1131,8 @@ static int ll_statahead_thread(void *arg) rc = -EFAULT; atomic_inc(&sbi->ll_sa_wrong); CDEBUG(D_READA, "Statahead for dir "DFID" hit " - "ratio too low: hit/miss "LPU64"/"LPU64 - ", sent/replied "LPU64"/"LPU64", stopping " + "ratio too low: hit/miss %llu/%llu" + ", sent/replied %llu/%llu, stopping " "statahead thread: pid %d\n", PFID(&lli->lli_fid), sai->sai_hit, sai->sai_miss, sai->sai_sent, @@ -1138,10 +1153,9 @@ static int ll_statahead_thread(void *arg) /* statahead is finished, but statahead entries need to be cached, wait * for file release to stop me. */ while (thread_is_running(sa_thread)) { - l_wait_event(sa_thread->t_ctl_waitq, - sa_has_callback(sai) || - !thread_is_running(sa_thread), - &lwi); + wait_event_idle(sa_thread->t_ctl_waitq, + sa_has_callback(sai) || + !thread_is_running(sa_thread)); sa_handle_callback(sai); } @@ -1156,9 +1170,8 @@ out: CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n", sai, (unsigned int)agl_thread->t_pid); - l_wait_event(agl_thread->t_ctl_waitq, - thread_is_stopped(agl_thread), - &lwi); + wait_event_idle(agl_thread->t_ctl_waitq, + thread_is_stopped(agl_thread)); } else { /* Set agl_thread flags anyway. */ thread_set_flags(agl_thread, SVC_STOPPED); @@ -1168,10 +1181,9 @@ out: * safely because statahead RPC will access sai data */ while (sai->sai_sent != sai->sai_replied) { /* in case we're not woken up, timeout wait */ - lwi = LWI_TIMEOUT(msecs_to_jiffies(MSEC_PER_SEC >> 3), - NULL, NULL); - l_wait_event(sa_thread->t_ctl_waitq, - sai->sai_sent == sai->sai_replied, &lwi); + wait_event_idle_timeout(sa_thread->t_ctl_waitq, + sai->sai_sent == sai->sai_replied, + cfs_time_seconds(1) >> 3); } /* release resources held by statahead RPCs */ @@ -1234,8 +1246,12 @@ void ll_deauthorize_statahead(struct inode *dir, void *key) /* * statahead thread may not quit yet because it needs to cache * entries, now it's time to tell it to quit. + * + * In case sai is released, wake_up() is called inside spinlock, + * so we have to call smp_mb() explicitely to serialize ops. */ thread_set_flags(&sai->sai_thread, SVC_STOPPING); + smp_mb(); wake_up(&sai->sai_thread.t_ctl_waitq); } spin_unlock(&lli->lli_sa_lock); @@ -1275,7 +1291,6 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry) /** *FIXME choose the start offset of the readdir */ - op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages; ll_dir_chain_init(&chain); page = ll_get_dir_page(dir, op_data, 0, &chain); @@ -1288,9 +1303,8 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry) struct ll_inode_info *lli = ll_i2info(dir); rc = PTR_ERR(page); - CERROR("%s: reading dir "DFID" at "LPU64 - "opendir_pid = %u : rc = %d\n", - ll_get_fsname(dir->i_sb, NULL, 0), + CERROR("%s: reading dir "DFID" at %llu opendir_pid = %u : rc = %d\n", + ll_i2sbi(dir)->ll_fsname, PFID(ll_inode2fid(dir)), pos, lli->lli_opendir_pid, rc); break; @@ -1394,9 +1408,8 @@ static int revalidate_statahead_dentry(struct inode *dir, bool unplug) { struct sa_entry *entry = NULL; - struct l_wait_info lwi = { 0 }; struct ll_dentry_data *ldd; - struct ll_inode_info *lli; + struct ll_inode_info *lli = ll_i2info(dir); int rc = 0; ENTRY; @@ -1439,11 +1452,12 @@ static int revalidate_statahead_dentry(struct inode *dir, sa_handle_callback(sai); if (!sa_ready(entry)) { + spin_lock(&lli->lli_sa_lock); sai->sai_index_wait = entry->se_index; - lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(30), NULL, - LWI_ON_SIGNAL_NOOP, NULL); - rc = l_wait_event(sai->sai_waitq, sa_ready(entry), &lwi); - if (rc < 0) { + spin_unlock(&lli->lli_sa_lock); + rc = wait_event_idle_timeout(sai->sai_waitq, sa_ready(entry), + cfs_time_seconds(30)); + if (rc == 0) { /* * entry may not be ready, so it may be used by inflight * statahead RPC, don't free it. @@ -1453,10 +1467,15 @@ static int revalidate_statahead_dentry(struct inode *dir, } } - if (entry->se_state == SA_ENTRY_SUCC && entry->se_inode != NULL) { + /* + * We need to see the value that was set immediately before we + * were woken up. + */ + if (smp_load_acquire(&entry->se_state) == SA_ENTRY_SUCC && + entry->se_inode) { struct inode *inode = entry->se_inode; struct lookup_intent it = { .it_op = IT_GETATTR, - .d.lustre.it_lock_handle = + .it_lock_handle = entry->se_handle }; __u64 bits; @@ -1481,8 +1500,7 @@ static int revalidate_statahead_dentry(struct inode *dir, "%s: stale dentry %.*s inode " DFID", statahead inode "DFID "\n", - ll_get_fsname((*dentryp)->d_inode->i_sb, - NULL, 0), + ll_i2sbi(inode)->ll_fsname, (*dentryp)->d_name.len, (*dentryp)->d_name.name, PFID(ll_inode2fid((*dentryp)->d_inode)), @@ -1506,7 +1524,6 @@ out: * dentry_may_statahead(). */ ldd = ll_d2d(*dentryp); - lli = ll_i2info(dir); /* ldd can be NULL if llite lookup failed. */ if (ldd != NULL) ldd->lld_sa_generation = lli->lli_sa_generation; @@ -1533,22 +1550,31 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry) struct ll_statahead_info *sai = NULL; struct dentry *parent = dentry->d_parent; struct ptlrpc_thread *thread; - struct l_wait_info lwi = { 0 }; struct task_struct *task; - int rc; + struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode); + int first = LS_FIRST_DE; + int rc = 0; ENTRY; /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */ - rc = is_first_dirent(dir, dentry); - if (rc == LS_NOT_FIRST_DE) + first = is_first_dirent(dir, dentry); + if (first == LS_NOT_FIRST_DE) /* It is not "ls -{a}l" operation, no need statahead for it. */ GOTO(out, rc = -EFAULT); + if (unlikely(atomic_inc_return(&sbi->ll_sa_running) > + sbi->ll_sa_running_max)) { + CDEBUG(D_READA, + "Too many concurrent statahead instances, " + "avoid new statahead instance temporarily.\n"); + GOTO(out, rc = -EMFILE); + } + sai = ll_sai_alloc(parent); if (sai == NULL) GOTO(out, rc = -ENOMEM); - sai->sai_ls_all = (rc == LS_FIRST_DOT_DE); + sai->sai_ls_all = (first == LS_FIRST_DOT_DE); /* if current lli_opendir_key was deauthorized, or dir re-opened by * another process, don't start statahead, otherwise the newly spawned @@ -1563,8 +1589,6 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry) lli->lli_sai = sai; spin_unlock(&lli->lli_sa_lock); - atomic_inc(&ll_i2sbi(parent->d_inode)->ll_sa_running); - CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %.*s]\n", current_pid(), parent->d_name.len, parent->d_name.name); @@ -1572,14 +1596,16 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry) lli->lli_opendir_pid); thread = &sai->sai_thread; if (IS_ERR(task)) { + spin_lock(&lli->lli_sa_lock); + lli->lli_sai = NULL; + spin_unlock(&lli->lli_sa_lock); rc = PTR_ERR(task); CERROR("can't start ll_sa thread, rc: %d\n", rc); GOTO(out, rc); } - l_wait_event(thread->t_ctl_waitq, - thread_is_running(thread) || thread_is_stopped(thread), - &lwi); + wait_event_idle(thread->t_ctl_waitq, + thread_is_running(thread) || thread_is_stopped(thread)); ll_sai_put(sai); /* @@ -1592,12 +1618,14 @@ out: /* once we start statahead thread failed, disable statahead so that * subsequent stat won't waste time to try it. */ spin_lock(&lli->lli_sa_lock); - lli->lli_sa_enabled = 0; - lli->lli_sai = NULL; + if (lli->lli_opendir_pid == current->pid) + lli->lli_sa_enabled = 0; spin_unlock(&lli->lli_sa_lock); if (sai != NULL) ll_sai_free(sai); + if (first != LS_NOT_FIRST_DE) + atomic_dec(&sbi->ll_sa_running); RETURN(rc); }