1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 #include <linux/sched.h>
40 #include <linux/smp_lock.h>
41 #include <linux/highmem.h>
42 #include <linux/pagemap.h>
44 #define DEBUG_SUBSYSTEM S_LLITE
46 #include <obd_support.h>
47 #include <lustre_lite.h>
48 #include <lustre_dlm.h>
49 #include <linux/lustre_version.h>
50 #include "llite_internal.h"
54 unsigned int se_index;
56 struct ptlrpc_request *se_req;
57 struct md_enqueue_info *se_minfo;
58 struct dentry *se_dentry;
59 struct inode *se_inode;
63 SA_ENTRY_UNSTATED = 0,
/* Counter used to stamp each statahead instance with a generation number;
 * ll_sai_alloc() increments it while holding sai_generation_lock. */
67 static unsigned int sai_generation = 0;
68 static cfs_spinlock_t sai_generation_lock = CFS_SPIN_LOCK_UNLOCKED;
71 * Check whether first entry was stated already or not.
72 * No need to hold lli_lock, for:
73 * (1) only the caller removes entries from the list
74 * (2) the statahead thread only adds new entries to the list
/*
 * Check whether the entry the caller is waiting for has been stated: look
 * at the head of sai->sai_entries_stated and compare its se_index with
 * sai->sai_index_next.
 * NOTE(review): the return statements are not visible here; presumably
 * non-zero on a match and zero otherwise -- confirm.
 */
76 static int ll_sai_entry_stated(struct ll_statahead_info *sai)
78 struct ll_sai_entry *entry;
81 if (!cfs_list_empty(&sai->sai_entries_stated)) {
82 entry = cfs_list_entry(sai->sai_entries_stated.next,
83 struct ll_sai_entry, se_list);
84 if (entry->se_index == sai->sai_index_next)
/* Non-zero when nothing is queued on sai->sai_entries_received. */
90 static inline int sa_received_empty(struct ll_statahead_info *sai)
92 return cfs_list_empty(&sai->sai_entries_received);
/* Non-zero while sai_index is still below sai_hit + sai_miss + sai_max,
 * i.e. the statahead window has room for another entry. */
95 static inline int sa_not_full(struct ll_statahead_info *sai)
97 return (sai->sai_index < sai->sai_hit + sai->sai_miss + sai->sai_max);
/* Returns 1 if SVC_RUNNING is set in the statahead thread flags, else 0. */
100 static inline int sa_is_running(struct ll_statahead_info *sai)
102 return !!(sai->sai_thread.t_flags & SVC_RUNNING);
/* Returns 1 if SVC_STOPPING is set in the statahead thread flags, else 0. */
105 static inline int sa_is_stopping(struct ll_statahead_info *sai)
107 return !!(sai->sai_thread.t_flags & SVC_STOPPING);
/* Returns 1 if SVC_STOPPED is set in the statahead thread flags, else 0. */
110 static inline int sa_is_stopped(struct ll_statahead_info *sai)
112 return !!(sai->sai_thread.t_flags & SVC_STOPPED);
116 * (1) hit ratio less than 80%
118 * (2) consecutive miss more than 8
/*
 * Non-zero when statahead is performing poorly: either more than 7 hits
 * were recorded and hits < 4 * misses (hit ratio below 80%), or more than
 * 8 consecutive misses were counted.
 */
120 static inline int sa_low_hit(struct ll_statahead_info *sai)
122 return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) ||
123 (sai->sai_consecutive_miss > 8));
/*
 * Final teardown of a statahead entry: detaches se_dentry and se_inode from
 * the entry and asserts the entry is no longer linked on any list.
 * NOTE(review): the statements that drop the dentry/inode references and
 * release the entry memory are not visible here -- confirm they exist.
 */
126 static void ll_sai_entry_free(struct ll_sai_entry *entry)
128 struct dentry *dentry = entry->se_dentry;
129 struct inode *inode = entry->se_inode;
132 entry->se_dentry = NULL;
136 entry->se_inode = NULL;
139 LASSERT(cfs_list_empty(&entry->se_list));
144 * process the deleted entry's member and free the entry.
146 * (2) free md_enqueue_info
147 * (3) drop dentry's ref count
148 * (4) release request's ref count
/*
 * Release the per-request state hanging off an entry: clears se_minfo and
 * releases its intent and dentry reference, clears se_req and finishes the
 * request, and calls ll_sai_entry_free().
 * NOTE(review): the guards around each step are not visible here;
 * presumably each step runs only when the pointer is non-NULL and the final
 * free only when "free" is non-zero -- confirm.
 */
150 static void ll_sai_entry_cleanup(struct ll_sai_entry *entry, int free)
152 struct md_enqueue_info *minfo = entry->se_minfo;
153 struct ptlrpc_request *req = entry->se_req;
157 entry->se_minfo = NULL;
158 ll_intent_release(&minfo->mi_it);
159 dput(minfo->mi_dentry);
164 entry->se_req = NULL;
165 ptlrpc_req_finished(req);
168 ll_sai_entry_free(entry);
/*
 * Create and initialise a statahead info structure: assigns a fresh
 * generation number under sai_generation_lock, sets the reference count to
 * 1, sets sai_max to LL_SA_RPC_MIN, and initialises both wait queues and
 * the sent/received/stated entry lists.
 * NOTE(review): the allocation and return statements are not visible here.
 */
173 static struct ll_statahead_info *ll_sai_alloc(void)
175 struct ll_statahead_info *sai;
181 cfs_spin_lock(&sai_generation_lock);
182 sai->sai_generation = ++sai_generation;
/* the counter wrapped to 0: bump again so no instance gets generation 0 */
183 if (unlikely(sai_generation == 0))
184 sai->sai_generation = ++sai_generation;
185 cfs_spin_unlock(&sai_generation_lock);
186 cfs_atomic_set(&sai->sai_refcount, 1);
187 sai->sai_max = LL_SA_RPC_MIN;
188 cfs_waitq_init(&sai->sai_waitq);
189 cfs_waitq_init(&sai->sai_thread.t_ctl_waitq);
190 CFS_INIT_LIST_HEAD(&sai->sai_entries_sent);
191 CFS_INIT_LIST_HEAD(&sai->sai_entries_received);
192 CFS_INIT_LIST_HEAD(&sai->sai_entries_stated);
/* Take an additional reference on sai by incrementing sai_refcount.
 * NOTE(review): presumably returns sai; the return is not visible here. */
197 struct ll_statahead_info *ll_sai_get(struct ll_statahead_info *sai)
200 cfs_atomic_inc(&sai->sai_refcount);
/*
 * Drop one reference on sai. When the count reaches zero, re-check it under
 * lli_lock (a concurrent interpret callback may have taken a new reference),
 * then reset lli_opendir_pid, assert the statahead thread has stopped, and
 * clean up every entry still on the sent, received and stated lists.
 * NOTE(review): the statements that detach lli->lli_sai, drop sai_inode and
 * free sai are not visible here.
 */
204 static void ll_sai_put(struct ll_statahead_info *sai)
206 struct inode *inode = sai->sai_inode;
207 struct ll_inode_info *lli;
210 LASSERT(inode != NULL);
211 lli = ll_i2info(inode);
212 LASSERT(lli->lli_sai == sai);
214 if (cfs_atomic_dec_and_test(&sai->sai_refcount)) {
215 struct ll_sai_entry *entry, *next;
217 cfs_spin_lock(&lli->lli_lock);
218 if (unlikely(cfs_atomic_read(&sai->sai_refcount) > 0)) {
219 /* Race case: the interpret callback has just taken
220 * a reference count */
221 cfs_spin_unlock(&lli->lli_lock);
226 LASSERT(lli->lli_opendir_key == NULL);
228 lli->lli_opendir_pid = 0;
229 cfs_spin_unlock(&lli->lli_lock);
231 LASSERT(sa_is_stopped(sai));
233 if (sai->sai_sent > sai->sai_replied)
234 CDEBUG(D_READA,"statahead for dir "DFID" does not "
235 "finish: [sent:%u] [replied:%u]\n",
237 sai->sai_sent, sai->sai_replied);
239 cfs_list_for_each_entry_safe(entry, next,
240 &sai->sai_entries_sent, se_list) {
241 cfs_list_del_init(&entry->se_list);
242 ll_sai_entry_cleanup(entry, 1);
244 cfs_list_for_each_entry_safe(entry, next,
245 &sai->sai_entries_received,
247 cfs_list_del_init(&entry->se_list);
248 ll_sai_entry_cleanup(entry, 1);
250 cfs_list_for_each_entry_safe(entry, next,
251 &sai->sai_entries_stated,
253 cfs_list_del_init(&entry->se_list);
254 ll_sai_entry_cleanup(entry, 1);
263 * insert it into sai_entries_sent tail when init.
/*
 * Allocate a statahead entry for the given index, mark it
 * SA_ENTRY_UNSTATED and append it to sai->sai_entries_sent under lli_lock.
 * Returns ERR_PTR(-ENOMEM) on the allocation-failure path.
 * NOTE(review): the success return is not visible here; presumably the new
 * entry.
 */
265 static struct ll_sai_entry *
266 ll_sai_entry_init(struct ll_statahead_info *sai, unsigned int index)
268 struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
269 struct ll_sai_entry *entry;
272 OBD_ALLOC_PTR(entry);
274 RETURN(ERR_PTR(-ENOMEM));
276 CDEBUG(D_READA, "alloc sai entry %p index %u\n",
278 entry->se_index = index;
279 entry->se_stat = SA_ENTRY_UNSTATED;
281 cfs_spin_lock(&lli->lli_lock);
282 cfs_list_add_tail(&entry->se_list, &sai->sai_entries_sent);
283 cfs_spin_unlock(&lli->lli_lock);
289 * delete it from the head of sai_entries_stated on fini; the entry's
290 * members need no further processing.
/*
 * Advance sai_index_next and, under lli_lock, retire the head of
 * sai_entries_stated if its index is now below sai_index_next: unlink it
 * and free it with ll_sai_entry_free().
 * NOTE(review): the return value and the branch containing
 * LASSERT(sa_is_stopped(sai)) are only partly visible here; the assertion
 * presumably covers the empty-list case.
 */
292 static int ll_sai_entry_fini(struct ll_statahead_info *sai)
294 struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
295 struct ll_sai_entry *entry;
299 cfs_spin_lock(&lli->lli_lock);
300 sai->sai_index_next++;
301 if (likely(!cfs_list_empty(&sai->sai_entries_stated))) {
302 entry = cfs_list_entry(sai->sai_entries_stated.next,
303 struct ll_sai_entry, se_list);
304 if (entry->se_index < sai->sai_index_next) {
305 cfs_list_del_init(&entry->se_list);
307 ll_sai_entry_free(entry);
310 LASSERT(sa_is_stopped(sai));
312 cfs_spin_unlock(&lli->lli_lock);
319 * \retval NULL : can not find the entry in sai_entries_sent with the index
320 * \retval entry: find the entry in sai_entries_sent with the index
/*
 * Find the entry with the given index on sai->sai_entries_sent and record
 * the reply on it: stores stat in se_stat, takes a reference on req and
 * stores it in se_req, and stores minfo in se_minfo.
 * The walk has a separate branch for se_index > index.
 * NOTE(review): that branch's body is not visible; presumably the list is
 * ordered by index and the search stops there -- confirm.
 * No locking is visible in this function; ll_statahead_interpret() calls it
 * with lli_lock held.
 */
322 static struct ll_sai_entry *
323 ll_sai_entry_set(struct ll_statahead_info *sai, unsigned int index, int stat,
324 struct ptlrpc_request *req, struct md_enqueue_info *minfo)
326 struct ll_sai_entry *entry;
329 if (!cfs_list_empty(&sai->sai_entries_sent)) {
330 cfs_list_for_each_entry(entry, &sai->sai_entries_sent,
332 if (entry->se_index == index) {
333 entry->se_stat = stat;
334 entry->se_req = ptlrpc_request_addref(req);
335 entry->se_minfo = minfo;
337 } else if (entry->se_index > index) {
347 * Move entry to sai_entries_received and
348 * insert it into sai_entries_received tail.
/* Unlink entry from whatever list it is on (if any) and append it to the
 * tail of sai->sai_entries_received. No locking is done here. */
351 ll_sai_entry_to_received(struct ll_statahead_info *sai, struct ll_sai_entry *entry)
353 if (!cfs_list_empty(&entry->se_list))
354 cfs_list_del_init(&entry->se_list);
355 cfs_list_add_tail(&entry->se_list, &sai->sai_entries_received);
359 * Move entry to sai_entries_stated and
360 * sort with the index.
/*
 * Release the entry's request state (ll_sai_entry_cleanup with free == 0),
 * then, under lli_lock, unlink it and insert it into
 * sai->sai_entries_stated in ascending se_index order. An entry whose index
 * is already below sai_index_next is stale and is freed instead.
 * NOTE(review): the return values are not visible here; callers signal
 * sai_waitq when the result is non-zero.
 */
363 ll_sai_entry_to_stated(struct ll_statahead_info *sai, struct ll_sai_entry *entry)
365 struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
366 struct ll_sai_entry *se;
369 ll_sai_entry_cleanup(entry, 0);
371 cfs_spin_lock(&lli->lli_lock);
372 if (!cfs_list_empty(&entry->se_list))
373 cfs_list_del_init(&entry->se_list);
/* stale: the consumer has already moved past this index */
376 if (unlikely(entry->se_index < sai->sai_index_next)) {
377 cfs_spin_unlock(&lli->lli_lock);
378 ll_sai_entry_free(entry);
/* walk backwards to find the last entry with a smaller index */
382 cfs_list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) {
383 if (se->se_index < entry->se_index) {
384 cfs_list_add(&entry->se_list, &se->se_list);
385 cfs_spin_unlock(&lli->lli_lock);
391 * I am the first entry.
393 cfs_list_add(&entry->se_list, &sai->sai_entries_stated);
394 cfs_spin_unlock(&lli->lli_lock);
399 * finish lookup/revalidate.
/*
 * Process one reply queued on sai->sai_entries_received (called from the
 * statahead thread). Pops the head entry under lli_lock, discards it if its
 * index is already stale, and otherwise finishes the lookup (dentry with no
 * inode) or the revalidate (dentry with an inode) using the saved request
 * and intent. Finally moves the entry to the stated list and wakes waiters
 * on sai_waitq.
 * NOTE(review): several assignments (req, it, rc), branch boundaries and the
 * return statement are not visible here.
 */
401 static int do_statahead_interpret(struct ll_statahead_info *sai)
403 struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
404 struct ll_sai_entry *entry;
405 struct ptlrpc_request *req;
406 struct md_enqueue_info *minfo;
407 struct lookup_intent *it;
408 struct dentry *dentry;
410 struct mdt_body *body;
413 cfs_spin_lock(&lli->lli_lock);
414 LASSERT(!sa_received_empty(sai));
415 entry = cfs_list_entry(sai->sai_entries_received.next,
416 struct ll_sai_entry, se_list);
417 cfs_list_del_init(&entry->se_list);
418 cfs_spin_unlock(&lli->lli_lock);
420 if (unlikely(entry->se_index < sai->sai_index_next)) {
421 CWARN("Found stale entry: [index %u] [next %u]\n",
422 entry->se_index, sai->sai_index_next);
423 ll_sai_entry_cleanup(entry, 1);
427 if (entry->se_stat != SA_ENTRY_STATED)
428 GOTO(out, rc = entry->se_stat);
431 minfo = entry->se_minfo;
433 dentry = minfo->mi_dentry;
435 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
437 GOTO(out, rc = -EFAULT);
439 if (dentry->d_inode == NULL) {
443 struct dentry *save = dentry;
444 struct it_cb_data icbd = {
445 .icbd_parent = minfo->mi_dir,
446 .icbd_childp = &dentry
449 LASSERT(fid_is_zero(&minfo->mi_data.op_fid2));
451 /* XXX: No fid in reply, this is probably a cross-ref case.
452 * SA can't handle it yet. */
453 if (body->valid & OBD_MD_MDS)
454 GOTO(out, rc = -EAGAIN);
456 /* Here dentry->d_inode might be NULL, because the entry may
457 * have been removed before we start doing stat ahead. */
459 /* BUG 15962, 21739: since statahead thread does not hold
460 * parent's i_mutex, it can not alias the dentry to inode.
461 * Here we just create/update inode in memory, and let the
462 * main "ls -l" thread to alias such dentry to the inode with
463 * parent's i_mutex held.
464 * On the other hand, we hold ldlm ibits lock for the inode
465 * yet, to allow other operations to cancel such lock in time,
466 * we should drop the ldlm lock reference count, then the main
467 * "ls -l" thread should check/get such ldlm ibits lock before
468 * aliasing such dentry to the inode later. If we don't do such
469 * drop here, it may cause a deadlock with i_mutex held by
470 * others, just like bug 21739. */
471 rc = ll_lookup_it_finish(req, it, &icbd, &entry->se_inode);
472 if (entry->se_inode != NULL)
473 entry->se_dentry = dget(dentry);
474 LASSERT(dentry == save);
475 ll_intent_drop_lock(it);
480 if (!lu_fid_eq(&minfo->mi_data.op_fid2, &body->fid1)) {
481 ll_unhash_aliases(dentry->d_inode);
482 GOTO(out, rc = -EAGAIN);
485 rc = ll_revalidate_it_finish(req, it, dentry);
487 ll_unhash_aliases(dentry->d_inode);
491 cfs_spin_lock(&ll_lookup_lock);
492 spin_lock(&dcache_lock);
495 dentry->d_flags &= ~DCACHE_LUSTRE_INVALID;
496 unlock_dentry(dentry);
497 d_rehash_cond(dentry, 0);
498 spin_unlock(&dcache_lock);
499 cfs_spin_unlock(&ll_lookup_lock);
501 ll_lookup_finish_locks(it, dentry);
506 /* The "ll_sai_entry_to_stated()" will drop related ldlm ibits lock
507 * reference count with ll_intent_drop_lock() called in spite of the
508 * above operations failed or not. Do not worry about calling
509 * "ll_intent_drop_lock()" more than once. */
510 if (likely(ll_sai_entry_to_stated(sai, entry)))
511 cfs_waitq_signal(&sai->sai_waitq);
/*
 * Reply callback for an asynchronous statahead getattr (installed as
 * minfo->mi_cb by sa_args_init()). Under lli_lock:
 *  - if the directory has no statahead info, or its generation differs from
 *    minfo->mi_generation, the reply is stale: unlock and release the
 *    intent;
 *  - otherwise take a reference on the sai and record the reply on the
 *    matching sent entry (index taken from minfo->mi_cbdata; status is rc
 *    when negative, else SA_ENTRY_STATED). If the statahead thread is
 *    running, queue the entry on the received list and wake the thread;
 *    if not, unlink the entry and clean it up.
 * NOTE(review): the remaining cleanup and return statements are not visible
 * here.
 */
515 static int ll_statahead_interpret(struct ptlrpc_request *req,
516 struct md_enqueue_info *minfo,
519 struct lookup_intent *it = &minfo->mi_it;
520 struct dentry *dentry = minfo->mi_dentry;
521 struct inode *dir = minfo->mi_dir;
522 struct ll_inode_info *lli = ll_i2info(dir);
523 struct ll_statahead_info *sai;
524 struct ll_sai_entry *entry;
527 CDEBUG(D_READA, "interpret statahead %.*s rc %d\n",
528 dentry->d_name.len, dentry->d_name.name, rc);
530 cfs_spin_lock(&lli->lli_lock);
532 if (unlikely(lli->lli_sai == NULL ||
533 lli->lli_sai->sai_generation != minfo->mi_generation)) {
534 cfs_spin_unlock(&lli->lli_lock);
535 ll_intent_release(it);
541 sai = ll_sai_get(lli->lli_sai);
542 entry = ll_sai_entry_set(sai,
543 (unsigned int)(long)minfo->mi_cbdata,
544 rc < 0 ? rc : SA_ENTRY_STATED, req,
546 LASSERT(entry != NULL);
547 if (likely(sa_is_running(sai))) {
548 ll_sai_entry_to_received(sai, entry);
550 cfs_spin_unlock(&lli->lli_lock);
551 cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
553 if (!cfs_list_empty(&entry->se_list))
554 cfs_list_del_init(&entry->se_list);
556 cfs_spin_unlock(&lli->lli_lock);
557 ll_sai_entry_cleanup(entry, 1);
/*
 * Undo sa_args_init(): both arguments must be non-NULL; drops the two
 * capability references held in minfo->mi_data.
 * NOTE(review): the release of minfo->mi_dir and the freeing of minfo and
 * einfo are not visible here.
 */
564 static void sa_args_fini(struct md_enqueue_info *minfo,
565 struct ldlm_enqueue_info *einfo)
567 LASSERT(minfo && einfo);
569 capa_put(minfo->mi_data.op_capa1);
570 capa_put(minfo->mi_data.op_capa2);
576 * There is race condition between "capa_put" and "ll_statahead_interpret" for
577 * accessing "op_data.op_capa[1,2]" as following:
578 * "capa_put" releases "op_data.op_capa[1,2]"'s reference count after calling
579 * "md_intent_getattr_async". But "ll_statahead_interpret" may run first and
580 * fill "op_data.op_capa[1,2]" with POISON, causing "capa_put" to access an invalid
581 * "ocapa". So here we save "op_data.op_capa[1,2]" in "pcapa" before calling
582 * "md_intent_getattr_async".
/*
 * Build the arguments for an asynchronous IT_GETATTR on dentry under dir:
 * allocates einfo and minfo, prepares the md_op_data embedded in minfo,
 * and fills in the intent, the dentry, a grabbed reference on dir, the
 * reply callback, and the current statahead generation and index. The lock
 * request is set up as LDLM_IBITS with the llite blocking AST. The two
 * capabilities are also copied out through pcapa so the caller can release
 * them independently of the reply callback.
 * Returns PTR_ERR(op_data) if preparing the op data fails.
 * NOTE(review): the other error paths, the stores through pmi/pei and the
 * success return are not visible here.
 */
584 static int sa_args_init(struct inode *dir, struct dentry *dentry,
585 struct md_enqueue_info **pmi,
586 struct ldlm_enqueue_info **pei,
587 struct obd_capa **pcapa)
589 struct ll_inode_info *lli = ll_i2info(dir);
590 struct md_enqueue_info *minfo;
591 struct ldlm_enqueue_info *einfo;
592 struct md_op_data *op_data;
594 OBD_ALLOC_PTR(einfo);
598 OBD_ALLOC_PTR(minfo);
604 op_data = ll_prep_md_op_data(&minfo->mi_data, dir, dentry->d_inode,
605 dentry->d_name.name, dentry->d_name.len,
606 0, LUSTRE_OPC_ANY, NULL);
607 if (IS_ERR(op_data)) {
610 return PTR_ERR(op_data);
613 minfo->mi_it.it_op = IT_GETATTR;
614 minfo->mi_dentry = dentry;
615 minfo->mi_dir = igrab(dir);
616 minfo->mi_cb = ll_statahead_interpret;
617 minfo->mi_generation = lli->lli_sai->sai_generation;
/* the entry index travels through the opaque callback cookie */
618 minfo->mi_cbdata = (void *)(long)lli->lli_sai->sai_index;
620 einfo->ei_type = LDLM_IBITS;
621 einfo->ei_mode = it_to_lock_mode(&minfo->mi_it);
622 einfo->ei_cb_bl = ll_md_blocking_ast;
623 einfo->ei_cb_cp = ldlm_completion_ast;
624 einfo->ei_cb_gl = NULL;
625 einfo->ei_cbdata = NULL;
629 pcapa[0] = op_data->op_capa1;
630 pcapa[1] = op_data->op_capa2;
636 * similar to ll_lookup_it().
/*
 * Issue an asynchronous getattr for a dentry that has no inode yet: builds
 * the arguments with sa_args_init() and sends them with
 * md_intent_getattr_async(). sa_args_fini() is called on one path.
 * NOTE(review): the conditions and the handling of capas[] are not visible
 * here; sa_args_fini() presumably runs only when the send fails -- confirm.
 */
638 static int do_sa_lookup(struct inode *dir, struct dentry *dentry)
640 struct md_enqueue_info *minfo;
641 struct ldlm_enqueue_info *einfo;
642 struct obd_capa *capas[2];
646 rc = sa_args_init(dir, dentry, &minfo, &einfo, capas);
650 rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
655 sa_args_fini(minfo, einfo);
662 * similar to ll_revalidate_it().
663 * \retval 1 -- dentry valid
664 * \retval 0 -- will send stat-ahead request
665 * \retval others -- prepare stat-ahead request failed
/*
 * Revalidate an existing dentry ahead of time. Has separate early checks
 * for a NULL inode, a mount point and the filesystem root. It first tries
 * md_revalidate_lock() with a local IT_GETATTR intent, which is released
 * afterwards; otherwise it builds and sends an asynchronous getattr, with
 * sa_args_fini() called on one path.
 * NOTE(review): the return values of the early checks and the branch
 * conditions are not visible here; the early checks presumably return
 * without sending a request -- confirm.
 */
667 static int do_sa_revalidate(struct inode *dir, struct dentry *dentry)
669 struct inode *inode = dentry->d_inode;
670 struct lookup_intent it = { .it_op = IT_GETATTR };
671 struct md_enqueue_info *minfo;
672 struct ldlm_enqueue_info *einfo;
673 struct obd_capa *capas[2];
677 if (unlikely(inode == NULL))
680 if (d_mountpoint(dentry))
683 if (unlikely(dentry == dentry->d_sb->s_root))
686 rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
689 ll_intent_release(&it);
693 rc = sa_args_init(dir, dentry, &minfo, &einfo, capas);
697 rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
702 sa_args_fini(minfo, einfo);
/* Fill qstr q for the given name; the hash is computed with
 * full_name_hash(). NOTE(review): the name/len assignments are not visible
 * here. */
708 static inline void ll_name2qstr(struct qstr *q, const char *name, int namelen)
712 q->hash = full_name_hash(name, namelen);
/*
 * Stat-ahead a single directory entry named entry_name under parent.
 * Skips the work when the parent dentry is flagged DCACHE_LUSTRE_INVALID.
 * Otherwise creates a sai entry for the current sai_index, looks the name
 * up in the dcache, and calls do_sa_lookup() on a newly allocated dentry or
 * do_sa_revalidate() on an existing one. On the completion path visible at
 * the end, the entry's status is set from rc and it is moved to the stated
 * list, waking waiters on sai_waitq.
 * NOTE(review): the branch conditions, reference drops and the sai counter
 * updates are not visible here.
 */
715 static int ll_statahead_one(struct dentry *parent, const char* entry_name,
718 struct inode *dir = parent->d_inode;
719 struct ll_inode_info *lli = ll_i2info(dir);
720 struct ll_statahead_info *sai = lli->lli_sai;
722 struct dentry *dentry;
723 struct ll_sai_entry *se;
727 if (parent->d_flags & DCACHE_LUSTRE_INVALID) {
728 CDEBUG(D_READA, "parent dentry@%p %.*s is "
729 "invalid, skip statahead\n",
730 parent, parent->d_name.len, parent->d_name.name);
734 se = ll_sai_entry_init(sai, sai->sai_index);
738 ll_name2qstr(&name, entry_name, entry_name_len);
739 dentry = d_lookup(parent, &name);
741 dentry = d_alloc(parent, &name);
743 rc = do_sa_lookup(dir, dentry);
747 GOTO(out, rc = -ENOMEM);
750 rc = do_sa_revalidate(dir, dentry);
759 CDEBUG(D_READA, "set sai entry %p index %u stat %d rc %d\n",
760 se, se->se_index, se->se_stat, rc);
761 se->se_stat = rc < 0 ? rc : SA_ENTRY_STATED;
762 if (ll_sai_entry_to_stated(sai, se))
763 cfs_waitq_signal(&sai->sai_waitq);
/*
 * Body of the per-directory statahead kernel thread; arg is the parent
 * dentry. Takes a reference on the directory's sai, daemonizes as
 * "ll_sa_<pid>", marks itself SVC_RUNNING and wakes the starter. It then
 * walks the directory pages; for each name it handles dot entries and
 * hidden files (counted in sai_skip_hidden), leaves out the first entry,
 * waits until the window has room or replies are pending, processes
 * pending replies with do_statahead_interpret(), and issues
 * ll_statahead_one(). At end of directory it waits for outstanding replies.
 * On exit it marks itself SVC_STOPPED and wakes waiters on both queues.
 * NOTE(review): many declarations, branch bodies and the reference drops
 * are not visible here.
 */
772 static int ll_statahead_thread(void *arg)
774 struct dentry *parent = (struct dentry *)arg;
775 struct inode *dir = parent->d_inode;
776 struct ll_inode_info *lli = ll_i2info(dir);
777 struct ll_sb_info *sbi = ll_i2sbi(dir);
778 struct ll_statahead_info *sai = ll_sai_get(lli->lli_sai);
779 struct ptlrpc_thread *thread = &sai->sai_thread;
784 struct ll_dir_chain chain;
789 snprintf(pname, 15, "ll_sa_%u", lli->lli_opendir_pid);
790 cfs_daemonize(pname);
793 atomic_inc(&sbi->ll_sa_total);
794 cfs_spin_lock(&lli->lli_lock);
795 thread->t_flags = SVC_RUNNING;
796 cfs_spin_unlock(&lli->lli_lock);
797 cfs_waitq_signal(&thread->t_ctl_waitq);
798 CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name);
800 ll_dir_chain_init(&chain);
801 page = ll_get_dir_page(dir, pos, 0, &chain);
804 struct l_wait_info lwi = { 0 };
805 struct lu_dirpage *dp;
806 struct lu_dirent *ent;
810 CDEBUG(D_READA, "error reading dir "DFID" at "LPU64
811 "/%u: [rc %d] [parent %u]\n",
812 PFID(ll_inode2fid(dir)), pos, sai->sai_index,
813 rc, lli->lli_opendir_pid);
817 dp = page_address(page);
818 for (ent = lu_dirent_start(dp); ent != NULL;
819 ent = lu_dirent_next(ent)) {
820 char *name = ent->lde_name;
821 int namelen = le16_to_cpu(ent->lde_namelen);
823 if (unlikely(namelen == 0))
829 if (name[0] == '.') {
835 } else if (name[1] == '.' && namelen == 2) {
840 } else if (!sai->sai_ls_all) {
844 sai->sai_skip_hidden++;
850 * don't stat-ahead first entry.
852 if (unlikely(!first)) {
858 l_wait_event(thread->t_ctl_waitq,
859 !sa_is_running(sai) || sa_not_full(sai) ||
860 !sa_received_empty(sai),
863 while (!sa_received_empty(sai) && sa_is_running(sai))
864 do_statahead_interpret(sai);
866 if (unlikely(!sa_is_running(sai))) {
871 if (!sa_not_full(sai))
873 * do not skip the current de.
877 rc = ll_statahead_one(parent, name, namelen);
883 pos = le64_to_cpu(dp->ldp_hash_end);
885 if (pos == DIR_END_OFF) {
887 * End of directory reached.
890 l_wait_event(thread->t_ctl_waitq,
891 !sa_is_running(sai) ||
892 !sa_received_empty(sai) ||
893 sai->sai_sent == sai->sai_replied,
895 if (!sa_received_empty(sai) &&
897 do_statahead_interpret(sai);
903 * chain is exhausted.
904 * Normal case: continue to the next page.
906 page = ll_get_dir_page(dir, pos, 1, &chain);
909 * go into overflow page.
916 ll_dir_chain_fini(&chain);
917 cfs_spin_lock(&lli->lli_lock);
918 thread->t_flags = SVC_STOPPED;
919 cfs_spin_unlock(&lli->lli_lock);
920 cfs_waitq_signal(&sai->sai_waitq);
921 cfs_waitq_signal(&thread->t_ctl_waitq);
924 CDEBUG(D_READA, "statahead thread stopped, pid %d\n",
930 * called in ll_file_release().
/*
 * Stop statahead for inode on behalf of the opener identified by key.
 * Under lli_lock, checks that key matches lli_opendir_key and that
 * lli_opendir_pid is set, then clears the key. If the thread has not
 * stopped yet, it is flagged SVC_STOPPING, woken, and waited for until it
 * reports SVC_STOPPED. After that the sai reference is dropped with
 * ll_sai_put(). A separate path just clears lli_opendir_pid.
 * NOTE(review): the branch conditions are not visible here; presumably the
 * stop/put path runs only when lli->lli_sai is non-NULL and the pid-only
 * path otherwise -- confirm.
 */
932 void ll_stop_statahead(struct inode *inode, void *key)
934 struct ll_inode_info *lli = ll_i2info(inode);
936 if (unlikely(key == NULL))
939 cfs_spin_lock(&lli->lli_lock);
940 if (lli->lli_opendir_key != key || lli->lli_opendir_pid == 0) {
941 cfs_spin_unlock(&lli->lli_lock);
945 lli->lli_opendir_key = NULL;
948 struct l_wait_info lwi = { 0 };
949 struct ptlrpc_thread *thread = &lli->lli_sai->sai_thread;
951 if (!sa_is_stopped(lli->lli_sai)) {
952 thread->t_flags = SVC_STOPPING;
953 cfs_spin_unlock(&lli->lli_lock);
954 cfs_waitq_signal(&thread->t_ctl_waitq);
956 CDEBUG(D_READA, "stopping statahead thread, pid %d\n",
958 l_wait_event(thread->t_ctl_waitq,
959 sa_is_stopped(lli->lli_sai),
962 cfs_spin_unlock(&lli->lli_lock);
966 * Put the ref which was held when first statahead_enter.
967 * It maybe not the last ref for some statahead requests
970 ll_sai_put(lli->lli_sai);
972 lli->lli_opendir_pid = 0;
973 cfs_spin_unlock(&lli->lli_lock);
979 * not first dirent, or is "."
981 LS_NONE_FIRST_DE = 0,
983 * the first non-hidden dirent
987 * the first hidden dirent, that is "."
/*
 * Decide whether dentry names the first entry of directory dir. Walks the
 * directory pages from the start, skips dummy records and handles dot
 * entries and hidden files separately, then compares the candidate name
 * with the target name. The result starts as LS_NONE_FIRST_DE; the visible
 * assignments set it to LS_NONE_FIRST_DE on a name mismatch and to
 * LS_FIRST_DOT_DE on another path.
 * NOTE(review): several branches and the return statement are not visible
 * here, including any assignment for a first non-hidden entry.
 */
992 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
994 struct ll_dir_chain chain;
995 struct qstr *target = &dentry->d_name;
999 int rc = LS_NONE_FIRST_DE;
1002 ll_dir_chain_init(&chain);
1003 page = ll_get_dir_page(dir, pos, 0, &chain);
1006 struct lu_dirpage *dp;
1007 struct lu_dirent *ent;
1010 struct ll_inode_info *lli = ll_i2info(dir);
1013 CERROR("error reading dir "DFID" at "LPU64": "
1014 "[rc %d] [parent %u]\n",
1015 PFID(ll_inode2fid(dir)), pos,
1016 rc, lli->lli_opendir_pid);
1020 dp = page_address(page);
1021 for (ent = lu_dirent_start(dp); ent != NULL;
1022 ent = lu_dirent_next(ent)) {
1023 char *name = ent->lde_name;
1024 int namelen = le16_to_cpu(ent->lde_namelen);
1028 * skip dummy record.
1032 if (name[0] == '.') {
1038 else if (name[1] == '.' && namelen == 2)
1049 if (dot_de && target->name[0] != '.') {
1050 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
1051 target->len, target->name,
1056 if (target->len != namelen ||
1057 memcmp(target->name, name, namelen) != 0)
1058 rc = LS_NONE_FIRST_DE;
1062 rc = LS_FIRST_DOT_DE;
1067 pos = le64_to_cpu(dp->ldp_hash_end);
1069 if (pos == DIR_END_OFF) {
1071 * End of directory reached.
1076 * chain is exhausted
1077 * Normal case: continue to the next page.
1079 page = ll_get_dir_page(dir, pos, 1, &chain);
1082 * go into overflow page.
1089 ll_dir_chain_fini(&chain);
/*
 * Compare two dentries: first by pointer identity, then by same parent,
 * same name hash, same name length and identical name bytes.
 * NOTE(review): the return statements are not visible here; presumably
 * non-zero when either test matches.
 */
1093 static int is_same_dentry(struct dentry *d1, struct dentry *d2)
1095 if (unlikely(d1 == d2))
1097 if (d1->d_parent == d2->d_parent &&
1098 d1->d_name.hash == d2->d_name.hash &&
1099 d1->d_name.len == d2->d_name.len &&
1100 memcmp(d1->d_name.name, d2->d_name.name, d1->d_name.len) == 0)
1106 * Start statahead thread if this is the first dir entry.
1107 * Otherwise, if a thread is already started, wait until it is ahead of me.
1108 * \retval 0 -- stat ahead thread process such dentry, miss for lookup
1109 * \retval 1 -- stat ahead thread process such dentry, hit for any case
1110 * \retval -EEXIST -- stat ahead thread started, and this is the first dentry
1111 * \retval -EBADFD -- statahead thread exited and no dentry is available
1112 * \retval -EAGAIN -- try to stat by caller
1113 * \retval others -- error
/*
 * Entry point used by the directory opener (asserted to be the
 * lli_opendir_pid owner) before it stats *dentryp.
 *
 * When statahead is already active for dir: handle hidden names (turning on
 * sai_ls_all or counting sai_miss_hidden), wait interruptibly on sai_waitq
 * until the expected entry is stated, and then consume it. For a lookup
 * whose entry carries an inode this is done under dir->i_mutex: re-acquire
 * the ldlm lock with md_revalidate_lock() and alias the dentry to the
 * inode; otherwise the dcache is searched with d_lookup().
 *
 * When no statahead exists yet: start one only if *dentryp is the first
 * directory entry (is_first_dirent()). Allocates the sai, grabs the
 * directory inode and the parent dentry, starts ll_statahead_thread and
 * waits until it reports running or stopped. A path near the end clears
 * lli_opendir_key and lli_opendir_pid under lli_lock.
 *
 * NOTE(review): many statements are not visible here (sai assignment,
 * returns, reference drops); which conditions reach the key/pid-clearing
 * path cannot be told from here -- presumably the failure path. Some inner
 * comments have lost their closing line.
 */
1115 int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
1117 struct ll_inode_info *lli;
1118 struct ll_statahead_info *sai;
1119 struct dentry *parent;
1120 struct l_wait_info lwi = { 0 };
1124 LASSERT(dir != NULL);
1125 lli = ll_i2info(dir);
1126 LASSERT(lli->lli_opendir_pid == cfs_curproc_pid());
1130 if (unlikely(sa_is_stopped(sai) &&
1131 cfs_list_empty(&sai->sai_entries_stated)))
1134 if ((*dentryp)->d_name.name[0] == '.') {
1135 if (likely(sai->sai_ls_all ||
1136 sai->sai_miss_hidden >= sai->sai_skip_hidden)) {
1138 * Hidden dentry is the first one, or statahead
1139 * thread does not skip so many hidden dentries
1140 * before "sai_ls_all" enabled as below.
1143 if (!sai->sai_ls_all)
1145 * It maybe because hidden dentry is not
1146 * the first one, "sai_ls_all" was not
1147 * set, then "ls -al" missed. Enable
1148 * "sai_ls_all" for such case.
1150 sai->sai_ls_all = 1;
1153 * Such "getattr" has been skipped before
1154 * "sai_ls_all" enabled as above.
1156 sai->sai_miss_hidden++;
1161 if (!ll_sai_entry_stated(sai)) {
1163 * thread started already, avoid double-stat.
1165 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1166 rc = l_wait_event(sai->sai_waitq,
1167 ll_sai_entry_stated(sai) ||
1170 if (unlikely(rc == -EINTR))
1174 if (ll_sai_entry_stated(sai)) {
1175 struct ll_sai_entry *entry;
1177 entry = cfs_list_entry(sai->sai_entries_stated.next,
1178 struct ll_sai_entry, se_list);
1179 /* This is for statahead lookup */
1180 if (entry->se_inode != NULL) {
1181 struct lookup_intent it = {.it_op = IT_GETATTR};
1182 struct dentry *dchild = entry->se_dentry;
1183 struct inode *ichild = entry->se_inode;
1187 LASSERT(dchild != *dentryp);
1190 mutex_lock(&dir->i_mutex);
1192 rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
1193 ll_inode2fid(ichild),
1196 struct dentry *save = dchild;
1198 ll_lookup_it_alias(&dchild, ichild,
1200 ll_lookup_finish_locks(&it, dchild);
1203 found = is_same_dentry(dchild,
1206 /* Someone has canceled related ldlm
1207 * lock before the real "revalidate"
1209 * Drop the inode reference count held
1210 * by interpreter. */
1215 mutex_unlock(&dir->i_mutex);
1217 entry->se_dentry = NULL;
1218 entry->se_inode = NULL;
1221 LASSERT(dchild != *dentryp);
1222 /* VFS will drop the reference
1223 * count for dchild and *dentryp
1227 LASSERT(dchild == *dentryp);
1228 /* Drop the dentry reference
1229 * count held by statahead. */
1234 /* Drop the dentry reference count held
1242 struct dentry *result;
1244 result = d_lookup((*dentryp)->d_parent,
1245 &(*dentryp)->d_name);
1247 LASSERT(result != *dentryp);
1248 /* BUG 16303: do not drop reference count for
1249 * "*dentryp", VFS will do that by itself. */
1255 * do nothing for revalidate.
1260 /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */
1261 rc = is_first_dirent(dir, *dentryp);
1262 if (rc == LS_NONE_FIRST_DE)
1263 /* It is not "ls -{a}l" operation, no need statahead for it. */
1264 GOTO(out, rc = -EAGAIN);
1266 sai = ll_sai_alloc();
1268 GOTO(out, rc = -ENOMEM);
1270 sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
1271 sai->sai_inode = igrab(dir);
1272 if (unlikely(sai->sai_inode == NULL)) {
1273 CWARN("Do not start stat ahead on dying inode "DFID"\n",
1274 PFID(&lli->lli_fid));
1276 GOTO(out, rc = -ESTALE);
1279 /* get parent reference count here, and put it in ll_statahead_thread */
1280 parent = dget((*dentryp)->d_parent);
1281 if (unlikely(sai->sai_inode != parent->d_inode)) {
1282 struct ll_inode_info *nlli = ll_i2info(parent->d_inode);
1284 CWARN("Race condition, someone changed %.*s just now: "
1285 "old parent "DFID", new parent "DFID"\n",
1286 (*dentryp)->d_name.len, (*dentryp)->d_name.name,
1287 PFID(&lli->lli_fid), PFID(&nlli->lli_fid));
1289 iput(sai->sai_inode);
1295 rc = cfs_kernel_thread(ll_statahead_thread, parent, 0);
1297 CERROR("can't start ll_sa thread, rc: %d\n", rc);
1299 lli->lli_opendir_key = NULL;
1300 sai->sai_thread.t_flags = SVC_STOPPED;
1302 LASSERT(lli->lli_sai == NULL);
1306 l_wait_event(sai->sai_thread.t_ctl_waitq,
1307 sa_is_running(sai) || sa_is_stopped(sai),
1311 * We don't stat-ahead for the first dirent since we are already in
1312 * lookup, and -EEXIST also indicates that this is the first dirent.
1317 cfs_spin_lock(&lli->lli_lock);
1318 lli->lli_opendir_key = NULL;
1319 lli->lli_opendir_pid = 0;
1320 cfs_spin_unlock(&lli->lli_lock);
1325 * update hit/miss count.
/*
 * Called by the directory opener after it has finished with one dentry:
 * retires the current stated entry (ll_sai_entry_fini) and updates the
 * hit/miss accounting. A hit (result >= 1, or fini returning -ENOENT)
 * resets the consecutive-miss counter and doubles sai_max up to
 * sbi->ll_sa_max. The other path bumps the consecutive-miss counter and,
 * if the hit ratio is too low while the thread is running, flags the
 * thread SVC_STOPPING. Finally wakes the statahead thread if it has not
 * stopped and stamps the dentry data with the sai generation.
 * NOTE(review): the sai assignment and the sai_hit/sai_miss increments are
 * not visible here.
 */
1327 void ll_statahead_exit(struct inode *dir, struct dentry *dentry, int result)
1329 struct ll_inode_info *lli;
1330 struct ll_statahead_info *sai;
1331 struct ll_sb_info *sbi;
1332 struct ll_dentry_data *ldd = ll_d2d(dentry);
1336 LASSERT(dir != NULL);
1337 lli = ll_i2info(dir);
1338 LASSERT(lli->lli_opendir_pid == cfs_curproc_pid());
1340 LASSERT(sai != NULL);
1341 sbi = ll_i2sbi(dir);
1343 rc = ll_sai_entry_fini(sai);
1344 /* rc == -ENOENT means such dentry was removed just between statahead
1345 * readdir and pre-fetched, count it as hit.
1347 * result == -ENOENT has two meanings:
1348 * 1. such dentry was removed just between statahead pre-fetched and
1349 * main process stat such dentry.
1350 * 2. main process stat non-exist dentry.
1351 * We can not distinguish such two cases, just count them as miss. */
1352 if (result >= 1 || unlikely(rc == -ENOENT)) {
1354 sai->sai_consecutive_miss = 0;
1355 sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
1358 sai->sai_consecutive_miss++;
1359 if (sa_low_hit(sai) && sa_is_running(sai)) {
1360 atomic_inc(&sbi->ll_sa_wrong);
1361 CDEBUG(D_READA, "Statahead for dir "DFID" hit ratio "
1362 "too low: hit/miss %u/%u, sent/replied %u/%u, "
1363 "stopping statahead thread: pid %d\n",
1364 PFID(&lli->lli_fid), sai->sai_hit,
1365 sai->sai_miss, sai->sai_sent,
1366 sai->sai_replied, cfs_curproc_pid());
1367 cfs_spin_lock(&lli->lli_lock);
1368 if (!sa_is_stopped(sai))
1369 sai->sai_thread.t_flags = SVC_STOPPING;
1370 cfs_spin_unlock(&lli->lli_lock);
1374 if (!sa_is_stopped(sai))
1375 cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
1376 if (likely(ldd != NULL))
1377 ldd->lld_sa_generation = sai->sai_generation;