1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2007 Cluster File Systems, Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 #include <linux/sched.h>
25 #include <linux/smp_lock.h>
26 #include <linux/highmem.h>
27 #include <linux/pagemap.h>
29 #define DEBUG_SUBSYSTEM S_LLITE
31 #include <obd_support.h>
32 #include <lustre_lite.h>
33 #include <lustre_dlm.h>
34 #include <linux/lustre_version.h>
35 #include "llite_internal.h"
/* Links this entry into sai->sai_entries (see ll_sai_entry_get()). */
38 struct list_head se_list;
/* Index of the entry; ll_sai_entry_set() looks entries up by this value. */
39 unsigned int se_index;
/*
 * State of an entry that has not been stat'ed yet.  ll_sai_entry_stated()
 * treats any other se_stat value as "stated", and ll_sai_entry_set()
 * asserts this value before it overwrites se_stat.
 */
44 SA_ENTRY_UNSTATED = 0,
/* Global counter from which ll_sai_alloc() hands out generation numbers. */
48 static unsigned int sai_generation = 0;
/* Serializes updates of sai_generation. */
49 static spinlock_t sai_generation_lock = SPIN_LOCK_UNLOCKED;
/*
 * Initialize a new statahead info: hand out the next generation number,
 * start the refcount at 1, set sai_max to LL_SA_RPC_MIN and set up the
 * two wait queues and the entry list head.
 */
51 static struct ll_statahead_info *ll_sai_alloc(void)
53 struct ll_statahead_info *sai;
/* sai_generation is global, so it is only advanced under its lock */
59 spin_lock(&sai_generation_lock);
60 sai->sai_generation = ++sai_generation;
/* the counter wrapped around to 0: advance once more so 0 is never used */
61 if (unlikely(sai_generation == 0))
62 sai->sai_generation = ++sai_generation;
63 spin_unlock(&sai_generation_lock);
/* start with a single reference */
64 atomic_set(&sai->sai_refcount, 1);
/* initial value of sai_max, which sa_not_full() adds to hit + miss */
65 sai->sai_max = LL_SA_RPC_MIN;
/* sai_waitq is signalled when an entry is set or the thread stops */
66 cfs_waitq_init(&sai->sai_waitq);
/* t_ctl_waitq is used for thread start/stop handshakes and throttling */
67 cfs_waitq_init(&sai->sai_thread.t_ctl_waitq);
68 CFS_INIT_LIST_HEAD(&sai->sai_entries);
/*
 * Take an additional reference on @sai by incrementing sai_refcount.
 * Callers in this file use the return value as the sai pointer.
 */
73 struct ll_statahead_info *ll_sai_get(struct ll_statahead_info *sai)
76 atomic_inc(&sai->sai_refcount);
/*
 * Drop one reference on @sai.  On the final put the statahead thread is
 * asserted to be SVC_STOPPED, a debug message is logged if more requests
 * were sent than replied, and every remaining entry is unlinked from
 * sai_entries.
 */
80 static void ll_sai_put(struct ll_statahead_info *sai)
82 struct inode *inode = sai->sai_inode;
83 struct ll_inode_info *lli = ll_i2info(inode);
/*
 * atomic_dec_and_lock() returns true, with lli_lock held, only when the
 * refcount drops to zero.
 */
86 if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_lock)) {
87 struct ll_sai_entry *entry, *next;
90 spin_unlock(&lli->lli_lock);
/* the last reference must not go away while the thread is still running */
92 LASSERT(sai->sai_thread.t_flags & SVC_STOPPED);
/* some statahead requests were sent but never replied */
94 if (sai->sai_sent > sai->sai_replied)
95 CDEBUG(D_READA,"statahead for dir %lu/%u does not "
96 "finish: [sent:%u] [replied:%u]\n",
97 inode->i_ino, inode->i_generation,
98 sai->sai_sent, sai->sai_replied);
/* the _safe iterator is needed because each entry is unlinked in the loop */
100 list_for_each_entry_safe(entry, next, &sai->sai_entries,
102 list_del(&entry->se_list);
/*
 * Allocate a new entry carrying @index and @stat and append it to the
 * tail of sai->sai_entries under lli_lock.
 * ERR_PTR(-ENOMEM) is returned on the error path that follows the
 * allocation.
 */
111 static struct ll_sai_entry *
112 ll_sai_entry_get(struct ll_statahead_info *sai, unsigned int index, int stat)
114 struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
115 struct ll_sai_entry *entry;
118 OBD_ALLOC_PTR(entry);
120 RETURN(ERR_PTR(-ENOMEM));
122 CDEBUG(D_READA, "alloc sai entry %p index %u, stat %d\n",
124 entry->se_index = index;
125 entry->se_stat = stat;
/* new entries always go to the tail of the list */
127 spin_lock(&lli->lli_lock);
128 list_add_tail(&entry->se_list, &sai->sai_entries);
129 spin_unlock(&lli->lli_lock);
137 * 0: cannot find an entry with the given index
138 * 1: it is the first entry
139 * 2: it is not the first entry
/*
 * Record @stat in the entry of sai->sai_entries whose se_index equals
 * @index.  The first list entry is checked on its own; otherwise the list
 * is walked forward for as long as se_index is smaller than @index.
 * ll_statahead_interpret() calls this with lli_lock held.
 */
142 ll_sai_entry_set(struct ll_statahead_info *sai, unsigned int index, int stat)
144 struct ll_sai_entry *entry;
/* nothing queued at all */
148 if (list_empty(&sai->sai_entries))
/* check the list head first */
151 entry = list_entry(sai->sai_entries.next, struct ll_sai_entry, se_list);
152 if (entry->se_index == index)
/* walk forward until the list ends or se_index is no longer below @index */
155 while (entry->se_list.next != &sai->sai_entries &&
156 entry->se_index < index) {
157 entry = list_entry(entry->se_list.next, struct ll_sai_entry,
159 if (entry->se_index == index)
/* an entry's state must be set only once */
167 LASSERT(entry->se_stat == SA_ENTRY_UNSTATED);
168 entry->se_stat = stat;
175 * Check whether first entry was stated already or not.
176 * No need to hold lli_lock, for:
177 * (1) it is me that removes entries from the list
178 * (2) the statahead thread only adds new entries to the list tail
/*
 * Report whether the first entry on sai->sai_entries has left the
 * SA_ENTRY_UNSTATED state.  The list is read without taking lli_lock.
 */
180 static int ll_sai_entry_stated(struct ll_statahead_info *sai)
182 struct ll_sai_entry *entry;
186 if (!list_empty(&sai->sai_entries)) {
187 entry = list_entry(sai->sai_entries.next, struct ll_sai_entry,
/* non-zero once the entry's state has been set to anything else */
189 rc = (entry->se_stat != SA_ENTRY_UNSTATED);
/*
 * Unlink the first entry of sai->sai_entries, if there is one, while
 * holding lli_lock.
 */
195 static void ll_sai_entry_put(struct ll_statahead_info *sai)
197 struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
198 struct ll_sai_entry *entry;
201 spin_lock(&lli->lli_lock);
202 if (!list_empty(&sai->sai_entries)) {
/* always the list head */
203 entry = list_entry(sai->sai_entries.next,
204 struct ll_sai_entry, se_list);
205 list_del(&entry->se_list);
208 spin_unlock(&lli->lli_lock);
213 /* finish lookup/revalidate */
/*
 * Callback that sa_args_prep() installs as minfo->mi_cb.  It finishes the
 * lookup (dentry without inode) or the revalidate (dentry with inode) for
 * one statahead request, then records the outcome in the sai entry whose
 * index was stored in minfo->mi_cbdata and wakes the waiter on sai_waitq.
 */
214 static int ll_statahead_interpret(struct obd_export *exp,
215 struct ptlrpc_request *req,
216 struct md_enqueue_info *minfo,
219 struct lookup_intent *it = &minfo->mi_it;
220 struct dentry *dentry = minfo->mi_dentry;
221 struct inode *dir = dentry->d_parent->d_inode;
222 struct ll_inode_info *lli = ll_i2info(dir);
223 struct ll_statahead_info *sai = NULL;
226 CDEBUG(D_READA, "interpret statahead %.*s rc %d\n",
227 dentry->d_name.len, dentry->d_name.name, rc);
/*
 * The directory may have lost its statahead info, or have a newer one,
 * since this request was sent: compare against the generation saved in
 * the request and give up with -ESTALE on mismatch.
 */
229 spin_lock(&lli->lli_lock);
230 if (unlikely(lli->lli_sai == NULL ||
231 lli->lli_sai->sai_generation != minfo->mi_generation)) {
232 spin_unlock(&lli->lli_lock);
233 GOTO(out_free, rc = -ESTALE);
/* hold a reference on the sai while it is used below */
235 sai = ll_sai_get(lli->lli_sai);
236 spin_unlock(&lli->lli_lock);
239 if (rc || dir == NULL)
/* no inode attached to the dentry yet: finish as a lookup */
242 if (dentry->d_inode == NULL) {
244 struct dentry *save = dentry;
245 struct it_cb_data icbd = {
247 .icbd_childp = &dentry
250 rc = lookup_it_finish(req, DLM_REPLY_REC_OFF, it, &icbd);
253 * Here dentry->d_inode might be NULL,
254 * because the entry may have been removed before
255 * we start doing stat ahead.
257 ll_lookup_finish_locks(it, dentry);
263 struct mds_body *body;
265 body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
/*
 * The fid in the reply differs from the one that was requested: unhash
 * the aliases of the inode and fail with -EAGAIN.
 */
267 if (memcmp(&minfo->mi_data.fid2, &body->fid1,
268 sizeof(body->fid1))) {
269 ll_unhash_aliases(dentry->d_inode);
270 GOTO(out, rc = -EAGAIN);
273 rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, it, dentry);
275 ll_unhash_aliases(dentry->d_inode);
/* clear the invalid flag (when defined) and rehash under dcache_lock */
279 spin_lock(&dcache_lock);
282 #ifdef DCACHE_LUSTRE_INVALID
283 dentry->d_flags &= ~DCACHE_LUSTRE_INVALID;
285 unlock_dentry(dentry);
286 d_rehash_cond(dentry, 0);
287 spin_unlock(&dcache_lock);
289 ll_lookup_finish_locks(it, dentry);
/* ll_sai_entry_set() walks sai_entries, so it is called under lli_lock */
298 spin_lock(&lli->lli_lock);
299 first = ll_sai_entry_set(sai,
300 (unsigned int)(long)minfo->mi_cbdata,
303 * wake up the "ls -l" process only when the first entry
306 spin_unlock(&lli->lli_lock);
308 cfs_waitq_signal(&sai->sai_waitq);
310 CDEBUG(D_READA, "can't find sai entry for dir "
311 "%lu/%u generation %u index %d\n",
312 dir->i_ino, dir->i_generation,
313 minfo->mi_generation,
314 (unsigned int)(long)minfo->mi_cbdata);
319 ll_intent_release(it);
/*
 * Counterpart of sa_args_prep(), called from do_sa_lookup() and
 * do_sa_revalidate().  Both arguments must be non-NULL.
 */
326 static void sa_args_fini(struct md_enqueue_info *minfo,
327 struct ldlm_enqueue_info *einfo)
329 LASSERT(minfo && einfo);
/*
 * Allocate and fill the md_enqueue_info / ldlm_enqueue_info pair for an
 * asynchronous IT_GETATTR request on @dentry under @dir.
 * NOTE(review): the pair is presumably handed back through @pmi / @pei;
 * confirm against the full function.
 */
334 static int sa_args_prep(struct inode *dir, struct dentry *dentry,
335 struct md_enqueue_info **pmi,
336 struct ldlm_enqueue_info **pei)
338 struct ll_inode_info *lli = ll_i2info(dir);
339 struct md_enqueue_info *minfo;
340 struct ldlm_enqueue_info *einfo;
342 OBD_ALLOC_PTR(einfo);
346 OBD_ALLOC_PTR(minfo);
352 minfo->mi_exp = ll_i2mdcexp(dir);
353 minfo->mi_it.it_op = IT_GETATTR;
354 minfo->mi_dentry = dentry;
/* ll_statahead_interpret() is the callback for this request */
355 minfo->mi_cb = ll_statahead_interpret;
/* saved so that the callback can detect a stale statahead info */
356 minfo->mi_generation = lli->lli_sai->sai_generation;
/* the entry index travels in the pointer-sized callback data */
357 minfo->mi_cbdata = (void *)(long)lli->lli_sai->sai_index;
359 einfo->ei_type = LDLM_IBITS;
360 einfo->ei_mode = it_to_lock_mode(&minfo->mi_it);
361 einfo->ei_cb_bl = ll_mdc_blocking_ast;
362 einfo->ei_cb_cp = ldlm_completion_ast;
363 einfo->ei_cb_gl = NULL;
364 einfo->ei_cbdata = NULL;
372 /* similar to ll_lookup_it(). */
/*
 * Start an asynchronous getattr for @dentry in @dir: prepare the enqueue
 * arguments, fill the mdc op data from the dentry name and issue
 * mdc_intent_getattr_async().
 */
373 static int do_sa_lookup(struct inode *dir, struct dentry *dentry)
375 struct md_enqueue_info *minfo;
376 struct ldlm_enqueue_info *einfo;
380 rc = sa_args_prep(dir, dentry, &minfo, &einfo);
/* the child inode argument is NULL here */
384 rc = ll_prepare_mdc_op_data(&minfo->mi_data, dir, NULL,
385 dentry->d_name.name, dentry->d_name.len, 0,
388 rc = mdc_intent_getattr_async(minfo->mi_exp, minfo, einfo);
391 sa_args_fini(minfo, einfo);
397 * similar to ll_revalidate_it().
400 * 0 -- will send stat-ahead request
401 * others -- prepare stat-ahead request failed
/*
 * Start an asynchronous revalidate for a dentry that already has an
 * inode.  Mount points and the filesystem root are checked first, then
 * mdc_revalidate_lock() is tried on the inode's fid before the
 * asynchronous getattr is prepared and sent.
 */
403 static int do_sa_revalidate(struct dentry *dentry)
405 struct inode *inode = dentry->d_inode;
407 struct lookup_intent it = { .it_op = IT_GETATTR };
408 struct md_enqueue_info *minfo;
409 struct ldlm_enqueue_info *einfo;
416 if (d_mountpoint(dentry))
419 if (dentry == dentry->d_sb->s_root)
422 ll_inode2fid(&fid, inode);
424 rc = mdc_revalidate_lock(ll_i2mdcexp(inode), &it, &fid);
426 ll_intent_release(&it);
/* the request is prepared against the parent directory's inode */
430 rc = sa_args_prep(dentry->d_parent->d_inode, dentry, &minfo, &einfo);
434 rc = ll_prepare_mdc_op_data(&minfo->mi_data, dentry->d_parent->d_inode,
435 inode, dentry->d_name.name,
436 dentry->d_name.len, 0, NULL);
438 rc = mdc_intent_getattr_async(minfo->mi_exp, minfo, einfo);
441 sa_args_fini(minfo, einfo);
/*
 * Compute the dcache name hash over the @namelen bytes of @name and store
 * it in @this->hash.
 */
446 static inline void ll_name2qstr(struct qstr *this, const char *name, int namelen)
448 unsigned long hash = init_name_hash();
453 for (; namelen > 0; namelen--, name++) {
/* each byte is read as unsigned char before it is fed to the hash */
454 c = *(const unsigned char *)name;
455 hash = partial_name_hash(c, hash);
457 this->hash = end_name_hash(hash);
/*
 * Issue statahead for the single dirent @de of directory @parent.  A sai
 * entry is registered at the current sai_index, the name is looked up in
 * the dcache, and the request goes out through do_sa_lookup() on a
 * dentry from d_alloc() or through do_sa_revalidate() on the dentry found.
 * sai_index is advanced at the end.
 */
460 static int ll_statahead_one(struct dentry *parent, ext2_dirent *de)
462 struct inode *dir = parent->d_inode;
463 struct ll_inode_info *lli = ll_i2info(dir);
465 struct dentry *dentry;
466 struct ll_sai_entry *se;
/* which test detects an invalid parent depends on DCACHE_LUSTRE_INVALID */
470 #ifdef DCACHE_LUSTRE_INVALID
471 if (parent->d_flags & DCACHE_LUSTRE_INVALID) {
473 if (d_unhashed(parent)) {
475 CDEBUG(D_READA, "parent dentry@%p %.*s is "
476 "invalid, skip statahead\n",
477 parent, parent->d_name.len, parent->d_name.name);
481 se = ll_sai_entry_get(lli->lli_sai, lli->lli_sai->sai_index,
/* build the qstr with its hash for the dcache lookup */
486 ll_name2qstr(&name, de->name, de->name_len);
487 dentry = d_lookup(parent, &name);
/* NOTE(review): presumably reached when d_lookup() found nothing - confirm */
489 dentry = d_alloc(parent, &name);
491 rc = do_sa_lookup(dir, dentry);
495 GOTO(out, rc = -ENOMEM);
498 rc = do_sa_revalidate(dentry);
507 CDEBUG(D_READA, "set sai entry %p index %u stat %d rc %d\n",
508 se, se->se_index, se->se_stat, rc);
510 cfs_waitq_signal(&lli->lli_sai->sai_waitq);
512 lli->lli_sai->sai_sent++;
515 lli->lli_sai->sai_index++;
/* Returns 1 if SVC_STOPPING is set in the statahead thread flags, else 0. */
519 static inline int sa_check_stop(struct ll_statahead_info *sai)
521 return !!(sai->sai_thread.t_flags & SVC_STOPPING);
/*
 * Returns non-zero while sai_index is below sai_hit + sai_miss + sai_max.
 * The statahead thread waits on this condition before each request.
 */
524 static inline int sa_not_full(struct ll_statahead_info *sai)
526 return sai->sai_index < sai->sai_hit + sai->sai_miss + sai->sai_max;
529 /* (1) hit ratio less than 80%
531 * (2) consecutive miss more than 8
/*
 * Returns non-zero when statahead is performing poorly: either there are
 * more than 7 hits and hits are fewer than 4 times the misses (a hit
 * ratio below 80%), or there are more than 8 consecutive misses.
 */
533 static inline int sa_low_hit(struct ll_statahead_info *sai)
535 return ((sai->sai_hit < 4 * sai->sai_miss && sai->sai_hit > 7) ||
536 (sai->sai_consecutive_miss > 8));
/* Arguments that do_statahead_enter() passes to ll_statahead_thread(). */
539 struct ll_sa_thread_args {
/* parent directory dentry; the thread takes its own reference with dget() */
540 struct dentry *sta_parent;
/*
 * Body of the statahead kernel thread started by do_statahead_enter().
 * It walks the pages of the parent directory and calls ll_statahead_one()
 * on dirents, waiting before each request until sa_not_full() holds or a
 * stop is requested.  On exit it marks the thread SVC_STOPPED and wakes
 * both wait queues.
 */
544 static int ll_statahead_thread(void *arg)
546 struct ll_sa_thread_args *sta = arg;
/* hold the parent dentry while the thread uses it */
547 struct dentry *parent = dget(sta->sta_parent);
548 struct inode *dir = parent->d_inode;
549 struct ll_inode_info *lli = ll_i2info(dir);
550 struct ll_sb_info *sbi = ll_i2sbi(dir);
/* the thread keeps its own reference on the statahead info */
551 struct ll_statahead_info *sai = ll_sai_get(lli->lli_sai);
552 struct ptlrpc_thread *thread = &sai->sai_thread;
/* index of the directory page being scanned */
553 unsigned long index = 0;
/* the thread name carries the pid passed in by the starter */
560 snprintf(name, 15, "ll_sa_%u", sta->sta_pid);
/* announce that we are running; the starter waits on t_ctl_waitq */
562 spin_lock(&lli->lli_lock);
563 thread->t_flags = SVC_RUNNING;
564 spin_unlock(&lli->lli_lock);
565 cfs_waitq_signal(&thread->t_ctl_waitq);
566 CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name);
569 struct l_wait_info lwi = { 0 };
570 unsigned long npages;
575 npages = dir_pages(dir);
576 /* reach the end of dir */
577 if (index >= npages) {
578 CDEBUG(D_READA, "reach end, index/npages %lu/%lu\n",
583 page = ll_get_dir_page(dir, index);
586 CERROR("error reading dir %lu/%u page %lu/%u: rc %d\n",
587 dir->i_ino, dir->i_generation, index,
592 kaddr = page_address(page);
/* upper bound for the start of a dirent within this page */
593 limit = kaddr + CFS_PAGE_SIZE - EXT2_DIR_REC_LEN(1);
594 de = (ext2_dirent *)kaddr;
596 de = ext2_next_entry(de); /* skip "." */
597 de = ext2_next_entry(de); /* skip ".." */
600 for (; (char*)de <= limit; de = ext2_next_entry(de)) {
/* names starting with '.' are counted and skipped unless sai_ls_all is set */
604 if (de->name[0] == '.' && !sai->sai_ls_all) {
605 /* skip hidden files */
606 sai->sai_skip_hidden++;
610 /* don't stat-ahead first entry */
611 if (unlikely(!first)) {
/* throttle: sleep until a stop is requested or sa_not_full() holds */
616 l_wait_event(thread->t_ctl_waitq,
617 sa_check_stop(sai) || sa_not_full(sai),
620 if (unlikely(sa_check_stop(sai))) {
625 rc = ll_statahead_one(parent, de);
/* mark the thread stopped and wake whoever waits on either queue */
636 spin_lock(&lli->lli_lock);
637 thread->t_flags = SVC_STOPPED;
638 spin_unlock(&lli->lli_lock);
639 cfs_waitq_signal(&sai->sai_waitq);
640 cfs_waitq_signal(&thread->t_ctl_waitq);
643 CDEBUG(D_READA, "statahead thread stopped, pid %d\n",
648 /* called in ll_file_release() */
/*
 * Stop statahead for @inode on behalf of the opendir owner identified by
 * @key.  The opendir key and pid are cleared, a thread that is not yet
 * SVC_STOPPED is set to SVC_STOPPING and waited for, and the reference on
 * lli_sai is dropped.
 */
649 void ll_stop_statahead(struct inode *inode, void *key)
651 struct ll_inode_info *lli = ll_i2info(inode);
652 struct ptlrpc_thread *thread;
654 spin_lock(&lli->lli_lock);
/* no opendir owner recorded, or @key is not the owner's key */
655 if (lli->lli_opendir_pid == 0 ||
656 unlikely(lli->lli_opendir_key != key)) {
657 spin_unlock(&lli->lli_lock);
/* give up ownership of the directory */
661 lli->lli_opendir_key = NULL;
662 lli->lli_opendir_pid = 0;
665 struct l_wait_info lwi = { 0 };
667 thread = &lli->lli_sai->sai_thread;
/* ask a still-running thread to stop, then wait until it has stopped */
668 if (!(thread->t_flags & SVC_STOPPED)) {
669 thread->t_flags = SVC_STOPPING;
670 spin_unlock(&lli->lli_lock);
671 cfs_waitq_signal(&thread->t_ctl_waitq);
673 CDEBUG(D_READA, "stopping statahead thread, pid %d\n",
675 l_wait_event(thread->t_ctl_waitq,
676 thread->t_flags & SVC_STOPPED,
679 spin_unlock(&lli->lli_lock);
683 * Put the ref which was held when first statahead_enter.
684 * It maybe not the last ref for some statahead requests
687 ll_sai_put(lli->lli_sai);
690 spin_unlock(&lli->lli_lock);
/*
 * Results of is_first_dirent().  The values are consecutive, so that
 * function can compute LS_FIRST_DE + dot_de to select between the last two.
 */
694 LS_NONE_FIRST_DE = 0, /* not first dirent, or is "." */
695 LS_FIRST_DE, /* the first non-hidden dirent */
696 LS_FIRST_DOT_DE /* the first hidden dirent, that is ".xxx" */
/*
 * Scan the pages of @dir and compare dirent names with the name of
 * @dentry.  Hidden dirents are skipped when the dentry's own name does
 * not start with '.'.  A matching name yields LS_FIRST_DE + dot_de and a
 * non-matching one LS_NONE_FIRST_DE.
 */
699 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
701 struct qstr *d_name = &dentry->d_name;
702 unsigned long npages, index = 0;
706 int rc = LS_NONE_FIRST_DE, dot_de;
710 npages = dir_pages(dir);
711 /* reach the end of dir */
712 if (index >= npages) {
713 CDEBUG(D_READA, "reach end, index/npages %lu/%lu\n",
718 page = ll_get_dir_page(dir, index);
721 CERROR("error reading dir %lu/%u page %lu: rc %d\n",
722 dir->i_ino, dir->i_generation, index, rc);
726 kaddr = page_address(page);
/* upper bound for the start of a dirent within this page */
727 limit = kaddr + CFS_PAGE_SIZE - EXT2_DIR_REC_LEN(1);
728 de = (ext2_dirent *)kaddr;
730 de = ext2_next_entry(de); /* skip "." */
731 de = ext2_next_entry(de); /* skip ".." */
734 for (; (char*)de <= limit; de = ext2_next_entry(de)) {
738 if (de->name[0] == '.')
/* a hidden dirent is ignored when the target name is not hidden */
743 if (dot_de && d_name->name[0] != '.') {
744 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
745 d_name->len, d_name->name,
746 de->name_len, de->name);
/* same length and same bytes: this dirent is the target */
750 if (d_name->len == de->name_len &&
751 !strncmp(d_name->name, de->name, d_name->len))
752 rc = LS_FIRST_DE + dot_de;
754 rc = LS_NONE_FIRST_DE;
764 /* Start statahead thread if this is the first dir entry.
765 * Otherwise, if a thread is started already, wait until it is ahead of me.
769 * -EEXIST -- stat ahead thread started, and this is the first dentry
770 * -EBADFD -- statahead thread exited and no dentry is available
/*
 * Entry point for the process that opened directory @dir (asserted to be
 * lli_opendir_pid).  If a statahead info already exists, it accounts for
 * hidden names, waits until the first entry is stated or the thread has
 * stopped, and looks the dentry up again in the dcache.  Otherwise it
 * checks with is_first_dirent() whether this is the first dirent and, if
 * so, allocates a statahead info and starts ll_statahead_thread().
 */
773 int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
775 struct ll_sb_info *sbi = ll_i2sbi(dir);
776 struct ll_inode_info *lli = ll_i2info(dir);
777 struct ll_statahead_info *sai = lli->lli_sai;
778 struct ll_sa_thread_args sta;
779 struct l_wait_info lwi = { 0 };
783 LASSERT(lli->lli_opendir_pid == cfs_curproc_pid());
/* the thread has stopped and no entry is left on the list */
786 if (unlikely(sai->sai_thread.t_flags & SVC_STOPPED &&
787 list_empty(&sai->sai_entries)))
/* the name being stat'ed is a hidden one */
790 if ((*dentryp)->d_name.name[0] == '.') {
791 if (likely(sai->sai_ls_all ||
792 sai->sai_miss_hidden >= sai->sai_skip_hidden)) {
794 * Hidden dentry is the first one, or statahead
795 * thread does not skip so many hidden dentries
796 * before "sai_ls_all" enabled as below.
799 if (!sai->sai_ls_all)
801 * It maybe because hidden dentry is not
802 * the first one, "sai_ls_all" was not
803 * set, then "ls -al" missed. Enable
804 * "sai_ls_all" for such case.
809 * Such "getattr" has been skipped before
810 * "sai_ls_all" enabled as above.
812 sai->sai_miss_hidden++;
817 if (ll_sai_entry_stated(sai)) {
/* counts the times the caller had to block on the statahead thread */
820 sbi->ll_sa_blocked++;
821 /* thread started already, avoid double-stat */
822 l_wait_event(sai->sai_waitq,
823 ll_sai_entry_stated(sai) ||
824 sai->sai_thread.t_flags & SVC_STOPPED,
829 struct dentry *result;
/* look the name up again in the dcache */
831 result = d_lookup((*dentryp)->d_parent,
832 &(*dentryp)->d_name);
834 LASSERT(result != *dentryp);
840 /* do nothing for revalidate */
844 /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */
845 LASSERT(lli->lli_sai == NULL);
847 rc = is_first_dirent(dir, *dentryp);
848 if (rc == LS_NONE_FIRST_DE) {
849 /* It is not "ls -{a}l" operation, no need statahead for it */
850 spin_lock(&lli->lli_lock);
851 lli->lli_opendir_key = NULL;
852 lli->lli_opendir_pid = 0;
853 spin_unlock(&lli->lli_lock);
857 sai = ll_sai_alloc();
/* hold the directory inode for as long as the sai refers to it */
861 sai->sai_inode = igrab(dir);
/* hidden names are stat'ed too when the first dirent was a hidden one */
862 sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
864 sta.sta_parent = (*dentryp)->d_parent;
865 sta.sta_pid = cfs_curproc_pid();
868 rc = cfs_kernel_thread(ll_statahead_thread, &sta, 0);
870 CERROR("can't start ll_sa thread, rc: %d\n", rc);
871 sai->sai_thread.t_flags = SVC_STOPPED;
873 LASSERT(lli->lli_sai == NULL);
/*
 * sta lives on this stack, so wait until the new thread reports RUNNING
 * or STOPPED before going on.
 */
877 l_wait_event(sai->sai_thread.t_ctl_waitq,
878 sai->sai_thread.t_flags & (SVC_RUNNING | SVC_STOPPED),
882 * We don't stat-ahead for the first dirent since we are already in
883 * lookup, and -EEXIST also indicates that this is the first dirent.
888 /* update hit/miss count */
/*
 * Account for the outcome of one statahead'ed dentry; only the process
 * recorded in lli_opendir_pid is considered.  One path resets the
 * consecutive-miss counter and doubles sai_max, capped at sbi->ll_sa_max.
 * The other counts a consecutive miss and, when sa_low_hit() reports a
 * poor hit ratio, asks the thread to stop.  Afterwards the thread is woken,
 * the first sai entry is removed and the dentry is stamped with the sai
 * generation.
 */
889 void ll_statahead_exit(struct dentry *dentry, int result)
891 struct dentry *parent = dentry->d_parent;
892 struct ll_inode_info *lli = ll_i2info(parent->d_inode);
893 struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode);
894 struct ll_dentry_data *ldd = ll_d2d(dentry);
896 if (lli->lli_opendir_pid != cfs_curproc_pid())
900 struct ll_statahead_info *sai = lli->lli_sai;
/* NOTE(review): presumably the hit branch - confirm against the full function */
905 sai->sai_consecutive_miss = 0;
/* widen the statahead window, capped at ll_sa_max */
906 sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
910 sai->sai_consecutive_miss++;
911 if (sa_low_hit(sai)) {
913 CDEBUG(D_READA, "statahead for dir %.*s hit "
914 "ratio too low: hit/miss %u/%u, "
915 "sent/replied %u/%u. stopping statahead "
917 parent->d_name.len, parent->d_name.name,
918 sai->sai_hit, sai->sai_miss,
919 sai->sai_sent, sai->sai_replied,
/* request a stop unless the thread has stopped already */
921 spin_lock(&lli->lli_lock);
922 if (!(sai->sai_thread.t_flags & SVC_STOPPED))
923 sai->sai_thread.t_flags = SVC_STOPPING;
924 spin_unlock(&lli->lli_lock);
/* the thread may be sleeping on t_ctl_waitq waiting for room or a stop */
928 cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
/* remove the first entry from the list */
929 ll_sai_entry_put(sai);
931 if (likely(ldd != NULL))
932 ldd->lld_sa_generation = sai->sai_generation;