Whamcloud - gitweb
Branch b1_6
[fs/lustre-release.git] / lustre / llite / statahead.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2007 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #include <linux/fs.h>
23 #include <linux/sched.h>
24 #include <linux/mm.h>
25 #include <linux/smp_lock.h>
26 #include <linux/highmem.h>
27 #include <linux/pagemap.h>
28
29 #define DEBUG_SUBSYSTEM S_LLITE
30
31 #include <obd_support.h>
32 #include <lustre_lite.h>
33 #include <lustre_dlm.h>
34 #include <linux/lustre_version.h>
35 #include "llite_internal.h"
36
37 struct ll_sai_entry {
38         struct list_head        se_list;
39         unsigned int            se_index;
40         int                     se_stat;
41 };
42
43 enum {
44         SA_ENTRY_UNSTATED = 0,
45         SA_ENTRY_STATED
46 };
47
48 static unsigned int sai_generation = 0;
49 static spinlock_t sai_generation_lock = SPIN_LOCK_UNLOCKED;
50
51 static struct ll_statahead_info *ll_sai_alloc(void)
52 {
53         struct ll_statahead_info *sai;
54
55         OBD_ALLOC_PTR(sai);
56         if (!sai)
57                 return NULL;
58
59         spin_lock(&sai_generation_lock);
60         sai->sai_generation = ++sai_generation;
61         if (unlikely(sai_generation == 0))
62                 sai->sai_generation = ++sai_generation;
63         spin_unlock(&sai_generation_lock);
64         atomic_set(&sai->sai_refcount, 1);
65         sai->sai_max = LL_SA_RPC_MIN;
66         cfs_waitq_init(&sai->sai_waitq);
67         cfs_waitq_init(&sai->sai_thread.t_ctl_waitq);
68         CFS_INIT_LIST_HEAD(&sai->sai_entries);
69         return sai;
70 }
71
72 static inline 
73 struct ll_statahead_info *ll_sai_get(struct ll_statahead_info *sai)
74 {
75         LASSERT(sai);
76         atomic_inc(&sai->sai_refcount);
77         return sai;
78 }
79
80 static void ll_sai_put(struct ll_statahead_info *sai)
81 {
82         struct inode         *inode = sai->sai_inode;
83         struct ll_inode_info *lli = ll_i2info(inode);
84         ENTRY;
85
86         if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_lock)) {
87                 struct ll_sai_entry *entry, *next;
88
89                 lli->lli_sai = NULL;
90                 spin_unlock(&lli->lli_lock);
91
92                 LASSERT(sai->sai_thread.t_flags & SVC_STOPPED);
93
94                 if (sai->sai_sent > sai->sai_replied)
95                         CDEBUG(D_READA,"statahead for dir %lu/%u does not "
96                               "finish: [sent:%u] [replied:%u]\n",
97                               inode->i_ino, inode->i_generation,
98                               sai->sai_sent, sai->sai_replied);
99
100                 list_for_each_entry_safe(entry, next, &sai->sai_entries,
101                                          se_list) {
102                         list_del(&entry->se_list);
103                         OBD_FREE_PTR(entry);
104                 }
105                 OBD_FREE_PTR(sai);
106                 iput(inode);
107         }
108         EXIT;
109 }
110
111 static struct ll_sai_entry *
112 ll_sai_entry_get(struct ll_statahead_info *sai, unsigned int index, int stat)
113 {
114         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
115         struct ll_sai_entry  *entry;
116         ENTRY;
117
118         OBD_ALLOC_PTR(entry);
119         if (entry == NULL)
120                 RETURN(ERR_PTR(-ENOMEM));
121         
122         CDEBUG(D_READA, "alloc sai entry %p index %u, stat %d\n",
123                entry, index, stat);
124         entry->se_index = index;
125         entry->se_stat  = stat;
126
127         spin_lock(&lli->lli_lock);
128         list_add_tail(&entry->se_list, &sai->sai_entries);
129         spin_unlock(&lli->lli_lock);
130
131         RETURN(entry);
132 }
133
134 /*
135  * inside lli_lock
136  * return value:
137  *  0: can not find the entry with the index
138  *  1: it is the first entry
139  *  2: it is not the first entry
140  */
141 static int
142 ll_sai_entry_set(struct ll_statahead_info *sai, unsigned int index, int stat)
143 {
144         struct ll_sai_entry *entry;
145         int                  rc = 0;
146         ENTRY;
147
148         if (list_empty(&sai->sai_entries))
149                 RETURN(0);
150
151         entry = list_entry(sai->sai_entries.next, struct ll_sai_entry, se_list);
152         if (entry->se_index == index)
153                 GOTO(out, rc = 1);
154
155         while (entry->se_list.next != &sai->sai_entries &&
156                entry->se_index < index) {
157                 entry = list_entry(entry->se_list.next, struct ll_sai_entry,
158                                    se_list);
159                 if (entry->se_index == index)
160                         GOTO(out, rc = 2);
161         }
162
163         EXIT;
164
165 out:
166         if (rc) {
167                 LASSERT(entry->se_stat == SA_ENTRY_UNSTATED);
168                 entry->se_stat = stat;
169         }
170
171         return rc;
172 }
173
174 /*
175  * Check whether first entry was stated already or not.
176  * No need to hold lli_lock, for:
177  * (1) it is me that remove entry from the list
178  * (2) the statahead thread only add new entry to the list tail
179  */
180 static int ll_sai_entry_stated(struct ll_statahead_info *sai)
181 {
182         struct ll_sai_entry  *entry;
183         int                   rc = 0;
184         ENTRY;
185
186         if (!list_empty(&sai->sai_entries)) {
187                 entry = list_entry(sai->sai_entries.next, struct ll_sai_entry,
188                                    se_list);
189                 rc = (entry->se_stat != SA_ENTRY_UNSTATED);
190         }
191
192         RETURN(rc);
193 }
194
195 static void ll_sai_entry_put(struct ll_statahead_info *sai)
196 {
197         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
198         struct ll_sai_entry  *entry;
199         ENTRY;
200         
201         spin_lock(&lli->lli_lock);
202         if (!list_empty(&sai->sai_entries)) {
203                 entry = list_entry(sai->sai_entries.next,
204                                    struct ll_sai_entry, se_list);
205                 list_del(&entry->se_list);
206                 OBD_FREE_PTR(entry);
207         }
208         spin_unlock(&lli->lli_lock);
209
210         EXIT;
211 }
212
213 /* finish lookup/revalidate */
214 static int ll_statahead_interpret(struct obd_export *exp,
215                                   struct ptlrpc_request *req,
216                                   struct md_enqueue_info *minfo,
217                                   int rc)
218 {
219         struct lookup_intent     *it = &minfo->mi_it;
220         struct dentry            *dentry = minfo->mi_dentry;
221         struct inode             *dir = dentry->d_parent->d_inode;
222         struct ll_inode_info     *lli = ll_i2info(dir);
223         struct ll_statahead_info *sai = NULL;
224         ENTRY;
225
226         CDEBUG(D_READA, "interpret statahead %.*s rc %d\n",
227                dentry->d_name.len, dentry->d_name.name, rc);
228
229         spin_lock(&lli->lli_lock);
230         if (unlikely(lli->lli_sai == NULL ||
231             lli->lli_sai->sai_generation != minfo->mi_generation)) {
232                 spin_unlock(&lli->lli_lock);
233                 GOTO(out_free, rc = -ESTALE);
234         } else {
235                 sai = ll_sai_get(lli->lli_sai);
236                 spin_unlock(&lli->lli_lock);
237         }
238
239         if (rc || dir == NULL)
240                 GOTO(out, rc);
241
242         if (dentry->d_inode == NULL) {
243                 /* lookup */
244                 struct dentry    *save = dentry;
245                 struct it_cb_data icbd = {
246                         .icbd_parent   = dir,
247                         .icbd_childp   = &dentry
248                 };
249
250                 rc = lookup_it_finish(req, DLM_REPLY_REC_OFF, it, &icbd);
251                 if (!rc)
252                         /* 
253                          * Here dentry->d_inode might be NULL,
254                          * because the entry may have been removed before
255                          * we start doing stat ahead.
256                          */
257                         ll_lookup_finish_locks(it, dentry);
258
259                 if (dentry != save)
260                         dput(save);
261         } else {
262                 /* revalidate */
263                 struct mds_body *body;
264
265                 body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
266                                       sizeof(*body));
267                 if (memcmp(&minfo->mi_data.fid2, &body->fid1,
268                            sizeof(body->fid1))) {
269                         ll_unhash_aliases(dentry->d_inode);
270                         GOTO(out, rc = -EAGAIN);
271                 }
272
273                 rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, it, dentry);
274                 if (rc) {
275                         ll_unhash_aliases(dentry->d_inode);
276                         GOTO(out, rc);
277                 }
278
279                 spin_lock(&dcache_lock);
280                 lock_dentry(dentry);
281                 __d_drop(dentry);
282 #ifdef DCACHE_LUSTRE_INVALID
283                 dentry->d_flags &= ~DCACHE_LUSTRE_INVALID;
284 #endif
285                 unlock_dentry(dentry);
286                 d_rehash_cond(dentry, 0);
287                 spin_unlock(&dcache_lock);
288
289                 ll_lookup_finish_locks(it, dentry);
290         }
291         EXIT;
292
293 out:
294         if (sai != NULL) {
295                 int first;
296
297                 sai->sai_replied++;
298                 spin_lock(&lli->lli_lock);
299                 first = ll_sai_entry_set(sai,
300                                          (unsigned int)(long)minfo->mi_cbdata,
301                                          SA_ENTRY_STATED);
302                 /*
303                  * wake up the "ls -l" process only when the first entry
304                  * returned.
305                  */
306                 spin_unlock(&lli->lli_lock);
307                 if (first == 1)
308                         cfs_waitq_signal(&sai->sai_waitq);
309                 else if (first == 0)
310                         CDEBUG(D_READA, "can't find sai entry for dir "
311                                "%lu/%u generation %u index %d\n",
312                                dir->i_ino, dir->i_generation,
313                                minfo->mi_generation,
314                                (unsigned int)(long)minfo->mi_cbdata);
315
316                 ll_sai_put(sai);
317         }
318 out_free:
319         ll_intent_release(it);
320         OBD_FREE_PTR(minfo);
321
322         dput(dentry);
323         return rc;
324 }
325
/* Free the argument pair built by sa_args_prep(); both must be non-NULL. */
static void sa_args_fini(struct md_enqueue_info *minfo,
                         struct ldlm_enqueue_info *einfo)
{
        LASSERT(minfo && einfo);

        OBD_FREE_PTR(minfo);
        OBD_FREE_PTR(einfo);
}
333
334 static int sa_args_prep(struct inode *dir, struct dentry *dentry,
335                         struct md_enqueue_info **pmi,
336                         struct ldlm_enqueue_info **pei)
337 {
338         struct ll_inode_info     *lli = ll_i2info(dir);
339         struct md_enqueue_info   *minfo;
340         struct ldlm_enqueue_info *einfo;
341
342         OBD_ALLOC_PTR(einfo);
343         if (einfo == NULL)
344                 return -ENOMEM;
345
346         OBD_ALLOC_PTR(minfo);
347         if (minfo == NULL) {
348                 OBD_FREE_PTR(einfo);
349                 return -ENOMEM;
350         }
351
352         minfo->mi_exp = ll_i2mdcexp(dir);
353         minfo->mi_it.it_op = IT_GETATTR;
354         minfo->mi_dentry = dentry;
355         minfo->mi_cb = ll_statahead_interpret;
356         minfo->mi_generation = lli->lli_sai->sai_generation;
357         minfo->mi_cbdata = (void *)(long)lli->lli_sai->sai_index;
358
359         einfo->ei_type   = LDLM_IBITS;
360         einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
361         einfo->ei_cb_bl  = ll_mdc_blocking_ast;
362         einfo->ei_cb_cp  = ldlm_completion_ast;
363         einfo->ei_cb_gl  = NULL;
364         einfo->ei_cbdata = NULL;
365
366         *pmi = minfo;
367         *pei = einfo;
368
369         return 0;
370 }
371
372 /* similar to ll_lookup_it(). */
373 static int do_sa_lookup(struct inode *dir, struct dentry *dentry)
374 {
375         struct md_enqueue_info   *minfo;
376         struct ldlm_enqueue_info *einfo;
377         int                       rc;                
378         ENTRY;
379
380         rc = sa_args_prep(dir, dentry, &minfo, &einfo);
381         if (rc)
382                 RETURN(rc);
383
384         rc = ll_prepare_mdc_op_data(&minfo->mi_data, dir, NULL,
385                                     dentry->d_name.name, dentry->d_name.len, 0,
386                                     NULL);
387         if (rc == 0)
388                 rc = mdc_intent_getattr_async(minfo->mi_exp, minfo, einfo);
389
390         if (rc)
391                 sa_args_fini(minfo, einfo);
392
393         RETURN(rc);
394 }
395
396 /* 
397  * similar to ll_revalidate_it().
398  * return value:
399  *  1      -- dentry valid
400  *  0      -- will send stat-ahead request
401  *  others -- prepare stat-ahead request failed
402  */
403 static int do_sa_revalidate(struct dentry *dentry)
404 {
405         struct inode             *inode = dentry->d_inode;
406         struct ll_fid             fid;
407         struct lookup_intent      it = { .it_op = IT_GETATTR };
408         struct md_enqueue_info   *minfo;
409         struct ldlm_enqueue_info *einfo;
410         int rc;
411         ENTRY;
412
413         if (inode == NULL)
414                 RETURN(1);
415
416         if (d_mountpoint(dentry))
417                 RETURN(1);
418
419         if (dentry == dentry->d_sb->s_root)
420                 RETURN(1);
421
422         ll_inode2fid(&fid, inode);
423
424         rc = mdc_revalidate_lock(ll_i2mdcexp(inode), &it, &fid);
425         if (rc == 1) {
426                 ll_intent_release(&it);
427                 RETURN(1);
428         }
429
430         rc = sa_args_prep(dentry->d_parent->d_inode, dentry, &minfo, &einfo);
431         if (rc)
432                 RETURN(rc);
433
434         rc = ll_prepare_mdc_op_data(&minfo->mi_data, dentry->d_parent->d_inode,
435                                     inode, dentry->d_name.name,
436                                     dentry->d_name.len, 0, NULL);
437         if (rc == 0)
438                 rc = mdc_intent_getattr_async(minfo->mi_exp, minfo, einfo);
439
440         if (rc)
441                 sa_args_fini(minfo, einfo);
442
443         RETURN(rc);
444 }
445
446 static inline void ll_name2qstr(struct qstr *this, const char *name, int namelen)
447 {
448         unsigned long hash = init_name_hash();
449         unsigned int  c;
450
451         this->name = name;
452         this->len  = namelen;
453         for (; namelen > 0; namelen--, name++) {
454                 c = *(const unsigned char *)name;
455                 hash = partial_name_hash(c, hash);
456         }
457         this->hash = end_name_hash(hash);
458 }
459
460 static int ll_statahead_one(struct dentry *parent, ext2_dirent *de)
461 {
462         struct inode           *dir = parent->d_inode;
463         struct ll_inode_info   *lli = ll_i2info(dir);
464         struct qstr             name;
465         struct dentry          *dentry;
466         struct ll_sai_entry    *se;
467         int                     rc;
468         ENTRY;
469
470 #ifdef DCACHE_LUSTRE_INVALID
471         if (parent->d_flags & DCACHE_LUSTRE_INVALID) {
472 #else
473         if (d_unhashed(parent)) {
474 #endif
475                 CDEBUG(D_READA, "parent dentry@%p %.*s is "
476                        "invalid, skip statahead\n",
477                        parent, parent->d_name.len, parent->d_name.name);
478                 RETURN(-EINVAL);
479         }
480
481         se = ll_sai_entry_get(lli->lli_sai, lli->lli_sai->sai_index,
482                               SA_ENTRY_UNSTATED);
483         if (IS_ERR(se))
484                 RETURN(PTR_ERR(se));
485
486         ll_name2qstr(&name, de->name, de->name_len);
487         dentry = d_lookup(parent, &name);
488         if (!dentry) {
489                 dentry = d_alloc(parent, &name);
490                 if (dentry) {
491                         rc = do_sa_lookup(dir, dentry);
492                         if (rc)
493                                 dput(dentry);
494                 } else {
495                         GOTO(out, rc = -ENOMEM);
496                 }
497         } else {
498                 rc = do_sa_revalidate(dentry);
499                 if (rc)
500                         dput(dentry);
501         }
502
503         EXIT;
504
505 out:
506         if (rc) {
507                 CDEBUG(D_READA, "set sai entry %p index %u stat %d rc %d\n",
508                        se, se->se_index, se->se_stat, rc);
509                 se->se_stat = rc;
510                 cfs_waitq_signal(&lli->lli_sai->sai_waitq);
511         } else {
512                 lli->lli_sai->sai_sent++;
513         }
514
515         lli->lli_sai->sai_index++;
516         return rc;
517 }
518                 
519 static inline int sa_check_stop(struct ll_statahead_info *sai)
520 {
521         return !!(sai->sai_thread.t_flags & SVC_STOPPING);
522 }
523
524 static inline int sa_not_full(struct ll_statahead_info *sai)
525 {
526         return sai->sai_index < sai->sai_hit + sai->sai_miss + sai->sai_max;
527 }
528
529 /* (1) hit ratio less than 80%
530  * or
531  * (2) consecutive miss more than 8
532  */
533 static inline int sa_low_hit(struct ll_statahead_info *sai)
534 {
535         return ((sai->sai_hit < 4 * sai->sai_miss && sai->sai_hit > 7) ||
536                 (sai->sai_consecutive_miss > 8));
537 }
538
539 struct ll_sa_thread_args {
540         struct dentry   *sta_parent;
541         pid_t            sta_pid;
542 };
543
544 static int ll_statahead_thread(void *arg)
545 {
546         struct ll_sa_thread_args *sta = arg;
547         struct dentry            *parent = dget(sta->sta_parent);
548         struct inode             *dir = parent->d_inode;
549         struct ll_inode_info     *lli = ll_i2info(dir);
550         struct ll_sb_info        *sbi = ll_i2sbi(dir);
551         struct ll_statahead_info *sai = ll_sai_get(lli->lli_sai);
552         struct ptlrpc_thread     *thread = &sai->sai_thread;
553         unsigned long             index = 0;
554         int                       first = 0;
555         int                       rc = 0;
556         char                      name[16] = "";
557         ENTRY;
558
559         sbi->ll_sa_total++;
560         snprintf(name, 15, "ll_sa_%u", sta->sta_pid);
561         cfs_daemonize(name);
562         spin_lock(&lli->lli_lock);
563         thread->t_flags = SVC_RUNNING;
564         spin_unlock(&lli->lli_lock);
565         cfs_waitq_signal(&thread->t_ctl_waitq);
566         CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name);
567
568         while (1) {
569                 struct l_wait_info lwi = { 0 };
570                 unsigned long npages;
571                 char *kaddr, *limit;
572                 ext2_dirent *de;
573                 struct page *page;
574
575                 npages = dir_pages(dir);
576                 /* reach the end of dir */
577                 if (index >= npages) {
578                         CDEBUG(D_READA, "reach end, index/npages %lu/%lu\n",
579                                index, npages);
580                         break;
581                 }
582
583                 page = ll_get_dir_page(dir, index);
584                 if (IS_ERR(page)) {
585                         rc = PTR_ERR(page);
586                         CERROR("error reading dir %lu/%u page %lu/%u: rc %d\n",
587                                dir->i_ino, dir->i_generation, index,
588                                sai->sai_index, rc);
589                         break;
590                 }
591
592                 kaddr = page_address(page);
593                 limit = kaddr + CFS_PAGE_SIZE - EXT2_DIR_REC_LEN(1);
594                 de = (ext2_dirent *)kaddr;
595                 if (!index) {
596                         de = ext2_next_entry(de); /* skip "." */
597                         de = ext2_next_entry(de); /* skip ".." */
598                 }
599
600                 for (; (char*)de <= limit; de = ext2_next_entry(de)) {
601                         if (!de->inode)
602                                 continue;
603
604                         if (de->name[0] == '.' && !sai->sai_ls_all) {
605                                 /* skip hidden files */
606                                 sai->sai_skip_hidden++;
607                                 continue;
608                         }
609
610                         /* don't stat-ahead first entry */
611                         if (unlikely(!first)) {
612                                 first++;
613                                 continue;
614                         }
615
616                         l_wait_event(thread->t_ctl_waitq,
617                                      sa_check_stop(sai) || sa_not_full(sai),
618                                      &lwi);
619
620                         if (unlikely(sa_check_stop(sai))) {
621                                 ext2_put_page(page);
622                                 GOTO(out, rc);
623                         }
624
625                         rc = ll_statahead_one(parent, de);
626                         if (rc < 0) {
627                                 ext2_put_page(page);
628                                 GOTO(out, rc);
629                         }
630                 }
631                 ext2_put_page(page);
632                 index++;
633         }
634         EXIT;
635 out:
636         spin_lock(&lli->lli_lock);
637         thread->t_flags = SVC_STOPPED;
638         spin_unlock(&lli->lli_lock);
639         cfs_waitq_signal(&sai->sai_waitq);
640         cfs_waitq_signal(&thread->t_ctl_waitq);
641         ll_sai_put(sai);
642         dput(parent);
643         CDEBUG(D_READA, "statahead thread stopped, pid %d\n",
644                cfs_curproc_pid());
645         return rc;
646 }
647
648 /* called in ll_file_release() */
649 void ll_stop_statahead(struct inode *inode, void *key)
650 {
651         struct ll_inode_info *lli = ll_i2info(inode);
652         struct ptlrpc_thread *thread;
653
654         spin_lock(&lli->lli_lock);
655         if (lli->lli_opendir_pid == 0 ||
656             unlikely(lli->lli_opendir_key != key)) {
657                 spin_unlock(&lli->lli_lock);
658                 return;
659         }
660
661         lli->lli_opendir_key = NULL;
662         lli->lli_opendir_pid = 0;
663
664         if (lli->lli_sai) {
665                 struct l_wait_info lwi = { 0 };
666
667                 thread = &lli->lli_sai->sai_thread;
668                 if (!(thread->t_flags & SVC_STOPPED)) {
669                         thread->t_flags = SVC_STOPPING;
670                         spin_unlock(&lli->lli_lock);
671                         cfs_waitq_signal(&thread->t_ctl_waitq);
672
673                         CDEBUG(D_READA, "stopping statahead thread, pid %d\n",
674                                cfs_curproc_pid());
675                         l_wait_event(thread->t_ctl_waitq,
676                                      thread->t_flags & SVC_STOPPED,
677                                      &lwi);
678                 } else {
679                         spin_unlock(&lli->lli_lock);
680                 }
681
682                 /*
683                  * Put the ref which was held when first statahead_enter.
684                  * It maybe not the last ref for some statahead requests
685                  * maybe inflight.
686                  */
687                 ll_sai_put(lli->lli_sai);
688                 return;
689         }
690         spin_unlock(&lli->lli_lock);
691 }
692
/* Results of is_first_dirent(). */
enum {
        LS_NONE_FIRST_DE = 0,   /* not first dirent, or is "." */
        LS_FIRST_DE      = 1,   /* the first non-hidden dirent */
        LS_FIRST_DOT_DE  = 2    /* the first hidden dirent, that is ".xxx" */
};
698
699 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
700 {
701         struct qstr   *d_name = &dentry->d_name;
702         unsigned long  npages, index = 0;
703         struct page   *page;
704         ext2_dirent   *de;
705         char          *kaddr, *limit;
706         int            rc = LS_NONE_FIRST_DE, dot_de;
707         ENTRY;
708
709         while (1) {
710                 npages = dir_pages(dir);
711                 /* reach the end of dir */
712                 if (index >= npages) {
713                         CDEBUG(D_READA, "reach end, index/npages %lu/%lu\n",
714                                index, npages);
715                         break;
716                 }
717
718                 page = ll_get_dir_page(dir, index);
719                 if (IS_ERR(page)) {
720                         rc = PTR_ERR(page);
721                         CERROR("error reading dir %lu/%u page %lu: rc %d\n",
722                                dir->i_ino, dir->i_generation, index, rc);
723                         break;
724                 }
725
726                 kaddr = page_address(page);
727                 limit = kaddr + CFS_PAGE_SIZE - EXT2_DIR_REC_LEN(1);
728                 de = (ext2_dirent *)kaddr;
729                 if (!index) {
730                         de = ext2_next_entry(de); /* skip "." */
731                         de = ext2_next_entry(de); /* skip ".." */
732                 }
733
734                 for (; (char*)de <= limit; de = ext2_next_entry(de)) {
735                         if (!de->inode)
736                                 continue;
737
738                         if (de->name[0] == '.')
739                                 dot_de = 1;
740                         else
741                                 dot_de = 0;
742
743                         if (dot_de && d_name->name[0] != '.') {
744                                 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
745                                        d_name->len, d_name->name,
746                                        de->name_len, de->name);
747                                 continue;
748                         }
749
750                         if (d_name->len == de->name_len &&
751                             !strncmp(d_name->name, de->name, d_name->len))
752                                 rc = LS_FIRST_DE + dot_de;
753                         else
754                                 rc = LS_NONE_FIRST_DE;
755                         ext2_put_page(page);
756                         RETURN(rc);
757                 }
758                 ext2_put_page(page);
759                 index++;
760         }
761         RETURN(rc);
762 }
763
764 /* Start statahead thread if this is the first dir entry.
765  * Otherwise if a thread is started already, wait it until it is ahead of me.
766  * Return value: 
767  *  0       -- miss
768  *  1       -- hit
769  *  -EEXIST -- stat ahead thread started, and this is the first dentry
770  *  -EBADFD -- statahead thread exit and not dentry available
771  *  others  -- error
772  */
773 int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
774 {
775         struct ll_sb_info        *sbi = ll_i2sbi(dir);
776         struct ll_inode_info     *lli = ll_i2info(dir);
777         struct ll_statahead_info *sai = lli->lli_sai;
778         struct ll_sa_thread_args  sta;
779         struct l_wait_info        lwi = { 0 };
780         int                       rc;
781         ENTRY;
782
783         LASSERT(lli->lli_opendir_pid == cfs_curproc_pid());
784
785         if (sai) {
786                 if (unlikely(sai->sai_thread.t_flags & SVC_STOPPED &&
787                              list_empty(&sai->sai_entries)))
788                         RETURN(-EBADFD);
789
790                 if ((*dentryp)->d_name.name[0] == '.') {
791                         if (likely(sai->sai_ls_all ||
792                             sai->sai_miss_hidden >= sai->sai_skip_hidden)) {
793                                 /*
794                                  * Hidden dentry is the first one, or statahead
795                                  * thread does not skip so many hidden dentries
796                                  * before "sai_ls_all" enabled as below.
797                                  */
798                         } else {
799                                 if (!sai->sai_ls_all)
800                                         /*
801                                          * It maybe because hidden dentry is not
802                                          * the first one, "sai_ls_all" was not
803                                          * set, then "ls -al" missed. Enable
804                                          * "sai_ls_all" for such case.
805                                          */
806                                         sai->sai_ls_all = 1;
807
808                                 /*
809                                  * Such "getattr" has been skipped before
810                                  * "sai_ls_all" enabled as above.
811                                  */
812                                 sai->sai_miss_hidden++;
813                                 RETURN(-ENOENT);
814                         }
815                 }
816
817                 if (ll_sai_entry_stated(sai)) {
818                         sbi->ll_sa_cached++;
819                 } else {
820                         sbi->ll_sa_blocked++;
821                         /* thread started already, avoid double-stat */
822                         l_wait_event(sai->sai_waitq,
823                                      ll_sai_entry_stated(sai) ||
824                                      sai->sai_thread.t_flags & SVC_STOPPED,
825                                      &lwi);
826                 }
827
828                 if (lookup) {
829                         struct dentry *result;
830
831                         result = d_lookup((*dentryp)->d_parent,
832                                           &(*dentryp)->d_name);
833                         if (result) {
834                                 LASSERT(result != *dentryp);
835                                 dput(*dentryp);
836                                 *dentryp = result;
837                                 RETURN(1);
838                         }
839                 }
840                 /* do nothing for revalidate */
841                 RETURN(0);
842         }
843
844          /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */ 
845         LASSERT(lli->lli_sai == NULL);
846
847         rc = is_first_dirent(dir, *dentryp);
848         if (rc == LS_NONE_FIRST_DE) {
849                 /* It is not "ls -{a}l" operation, no need statahead for it */
850                 spin_lock(&lli->lli_lock);
851                 lli->lli_opendir_key = NULL;
852                 lli->lli_opendir_pid = 0;
853                 spin_unlock(&lli->lli_lock);
854                 RETURN(-EBADF);
855         }
856
857         sai = ll_sai_alloc();
858         if (sai == NULL)
859                 RETURN(-ENOMEM);
860         
861         sai->sai_inode  = igrab(dir);
862         sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
863
864         sta.sta_parent = (*dentryp)->d_parent;
865         sta.sta_pid    = cfs_curproc_pid();
866
867         lli->lli_sai = sai;
868         rc = cfs_kernel_thread(ll_statahead_thread, &sta, 0);
869         if (rc < 0) {
870                 CERROR("can't start ll_sa thread, rc: %d\n", rc);
871                 sai->sai_thread.t_flags = SVC_STOPPED;
872                 ll_sai_put(sai);
873                 LASSERT(lli->lli_sai == NULL);
874                 RETURN(rc);
875         }
876
877         l_wait_event(sai->sai_thread.t_ctl_waitq, 
878                      sai->sai_thread.t_flags & (SVC_RUNNING | SVC_STOPPED),
879                      &lwi);
880
881         /*
882          * We don't stat-ahead for the first dirent since we are already in
883          * lookup, and -EEXIST also indicates that this is the first dirent.
884          */
885         RETURN(-EEXIST);
886 }
887
888 /* update hit/miss count */
889 void ll_statahead_exit(struct dentry *dentry, int result)
890 {
891         struct dentry         *parent = dentry->d_parent;
892         struct ll_inode_info  *lli = ll_i2info(parent->d_inode);
893         struct ll_sb_info     *sbi = ll_i2sbi(parent->d_inode);
894         struct ll_dentry_data *ldd = ll_d2d(dentry);
895
896         if (lli->lli_opendir_pid != cfs_curproc_pid())
897                 return;
898
899         if (lli->lli_sai) {
900                 struct ll_statahead_info *sai = lli->lli_sai;
901
902                 if (result == 1) {
903                         sbi->ll_sa_hit++;
904                         sai->sai_hit++;
905                         sai->sai_consecutive_miss = 0;
906                         sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
907                 } else {
908                         sbi->ll_sa_miss++;
909                         sai->sai_miss++;
910                         sai->sai_consecutive_miss++;
911                         if (sa_low_hit(sai)) {
912                                 sbi->ll_sa_wrong++;
913                                 CDEBUG(D_READA, "statahead for dir %.*s hit "
914                                        "ratio too low: hit/miss %u/%u, "
915                                        "sent/replied %u/%u. stopping statahead "
916                                        "thread: pid %d\n",
917                                        parent->d_name.len, parent->d_name.name,
918                                        sai->sai_hit, sai->sai_miss,
919                                        sai->sai_sent, sai->sai_replied,
920                                        cfs_curproc_pid());
921                                 spin_lock(&lli->lli_lock);
922                                 if (!(sai->sai_thread.t_flags & SVC_STOPPED))
923                                         sai->sai_thread.t_flags = SVC_STOPPING;
924                                 spin_unlock(&lli->lli_lock);
925                         }
926                 }
927
928                 cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
929                 ll_sai_entry_put(sai);
930
931                 if (likely(ldd != NULL))
932                         ldd->lld_sa_generation = sai->sai_generation;
933         }
934 }