Whamcloud - gitweb
b=15908
[fs/lustre-release.git] / lustre / llite / statahead.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2007 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #include <linux/fs.h>
23 #include <linux/sched.h>
24 #include <linux/mm.h>
25 #include <linux/smp_lock.h>
26 #include <linux/highmem.h>
27 #include <linux/pagemap.h>
28
29 #define DEBUG_SUBSYSTEM S_LLITE
30
31 #include <obd_support.h>
32 #include <lustre_lite.h>
33 #include <lustre_dlm.h>
34 #include <linux/lustre_version.h>
35 #include "llite_internal.h"
36
37 struct ll_sai_entry {
38         struct list_head        se_list;
39         unsigned int            se_index;
40         int                     se_stat;
41 };
42
/* Values stored in ll_sai_entry::se_stat. */
enum {
        SA_ENTRY_UNSTATED = 0,  /* no result recorded for the entry yet */
        SA_ENTRY_STATED   = 1   /* a result has been recorded for the entry */
};
47
48 static unsigned int sai_generation = 0;
49 static spinlock_t sai_generation_lock = SPIN_LOCK_UNLOCKED;
50
51 static struct ll_statahead_info *ll_sai_alloc(void)
52 {
53         struct ll_statahead_info *sai;
54
55         OBD_ALLOC_PTR(sai);
56         if (!sai)
57                 return NULL;
58
59         spin_lock(&sai_generation_lock);
60         sai->sai_generation = ++sai_generation;
61         if (unlikely(sai_generation == 0))
62                 sai->sai_generation = ++sai_generation;
63         spin_unlock(&sai_generation_lock);
64         atomic_set(&sai->sai_refcount, 1);
65         sai->sai_max = LL_SA_RPC_MIN;
66         cfs_waitq_init(&sai->sai_waitq);
67         cfs_waitq_init(&sai->sai_thread.t_ctl_waitq);
68         CFS_INIT_LIST_HEAD(&sai->sai_entries);
69         return sai;
70 }
71
72 static inline 
73 struct ll_statahead_info *ll_sai_get(struct ll_statahead_info *sai)
74 {
75         LASSERT(sai);
76         atomic_inc(&sai->sai_refcount);
77         return sai;
78 }
79
80 static void ll_sai_put(struct ll_statahead_info *sai)
81 {
82         struct inode         *inode = sai->sai_inode;
83         struct ll_inode_info *lli = ll_i2info(inode);
84         ENTRY;
85
86         if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_lock)) {
87                 struct ll_sai_entry *entry, *next;
88
89                 lli->lli_sai = NULL;
90                 spin_unlock(&lli->lli_lock);
91
92                 LASSERT(sai->sai_thread.t_flags & SVC_STOPPED);
93
94                 if (sai->sai_sent > sai->sai_replied)
95                         CDEBUG(D_READA,"statahead for dir "DFID" does not "
96                               "finish: [sent:%u] [replied:%u]\n",
97                               PFID(&lli->lli_fid),
98                               sai->sai_sent, sai->sai_replied);
99
100                 list_for_each_entry_safe(entry, next, &sai->sai_entries,
101                                          se_list) {
102                         list_del(&entry->se_list);
103                         OBD_FREE_PTR(entry);
104                 }
105                 OBD_FREE_PTR(sai);
106                 iput(inode);
107         }
108         EXIT;
109 }
110
111 static struct ll_sai_entry *
112 ll_sai_entry_get(struct ll_statahead_info *sai, unsigned int index, int stat)
113 {
114         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
115         struct ll_sai_entry  *entry;
116         ENTRY;
117
118         OBD_ALLOC_PTR(entry);
119         if (entry == NULL)
120                 RETURN(ERR_PTR(-ENOMEM));
121
122         CDEBUG(D_READA, "alloc sai entry %p index %u, stat %d\n",
123                entry, index, stat);
124         entry->se_index = index;
125         entry->se_stat  = stat;
126
127         spin_lock(&lli->lli_lock);
128         list_add_tail(&entry->se_list, &sai->sai_entries);
129         spin_unlock(&lli->lli_lock);
130
131         RETURN(entry);
132 }
133
134 /* inside lli_lock
135  * return value:
136  *  0: can not find the entry with the index
137  *  1: it is the first entry
138  *  2: it is not the first entry */
139 static int
140 ll_sai_entry_set(struct ll_statahead_info *sai, unsigned int index, int stat)
141 {
142         struct ll_sai_entry *entry;
143         int                  rc = 0;
144         ENTRY;
145
146         if (list_empty(&sai->sai_entries))
147                 RETURN(0);
148
149         entry = list_entry(sai->sai_entries.next, struct ll_sai_entry, se_list);
150         if (entry->se_index == index)
151                 GOTO(out, rc = 1);
152
153         while (entry->se_list.next != &sai->sai_entries &&
154                entry->se_index < index) {
155                 entry = list_entry(entry->se_list.next, struct ll_sai_entry,
156                                    se_list);
157                 if (entry->se_index == index)
158                         GOTO(out, rc = 2);
159         }
160
161         EXIT;
162
163 out:
164         if (rc) {
165                 LASSERT(entry->se_stat == SA_ENTRY_UNSTATED);
166                 entry->se_stat = stat;
167         }
168
169         return rc;
170 }
171
172 /* Check whether first entry was stated already or not.
173  * No need to hold lli_lock, for:
174  * (1) it is me that remove entry from the list (ll_sai_entry_put)
175  * (2) the statahead thread only add new entry to the list tail */
176 static int ll_sai_entry_stated(struct ll_statahead_info *sai)
177 {
178         struct ll_sai_entry  *entry;
179         int                   rc = 0;
180         ENTRY;
181
182         if (!list_empty(&sai->sai_entries)) {
183                 entry = list_entry(sai->sai_entries.next, struct ll_sai_entry,
184                                    se_list);
185                 rc = (entry->se_stat != SA_ENTRY_UNSTATED);
186         }
187
188         RETURN(rc);
189 }
190
191 static void ll_sai_entry_put(struct ll_statahead_info *sai)
192 {
193         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
194         struct ll_sai_entry  *entry;
195         ENTRY;
196
197         spin_lock(&lli->lli_lock);
198         if (!list_empty(&sai->sai_entries)) {
199                 entry = list_entry(sai->sai_entries.next,
200                                    struct ll_sai_entry, se_list);
201                 list_del(&entry->se_list);
202                 OBD_FREE_PTR(entry);
203         }
204         spin_unlock(&lli->lli_lock);
205
206         EXIT;
207 }
208
209 /* finish lookup/revalidate */
210 static int ll_statahead_interpret(struct ptlrpc_request *req,
211                                   struct md_enqueue_info *minfo,
212                                   int rc)
213 {
214         struct lookup_intent     *it = &minfo->mi_it;
215         struct dentry            *dentry = minfo->mi_dentry;
216         struct inode             *dir = dentry->d_parent->d_inode;
217         struct ll_inode_info     *lli = ll_i2info(dir);
218         struct ll_statahead_info *sai = NULL;
219         ENTRY;
220
221         CDEBUG(D_READA, "interpret statahead %.*s rc %d\n",
222                dentry->d_name.len, dentry->d_name.name, rc);
223
224         spin_lock(&lli->lli_lock);
225         if (unlikely(lli->lli_sai == NULL ||
226             lli->lli_sai->sai_generation != minfo->mi_generation)) {
227                 spin_unlock(&lli->lli_lock);
228                 GOTO(out_free, rc = -ESTALE);
229         } else {
230                 sai = ll_sai_get(lli->lli_sai);
231                 spin_unlock(&lli->lli_lock);
232         }
233
234         if (rc || dir == NULL)
235                 GOTO(out, rc);
236
237         if (dentry->d_inode == NULL) {
238                 /* lookup */
239                 struct dentry    *save = dentry;
240                 struct it_cb_data icbd = {
241                         .icbd_parent   = dir,
242                         .icbd_childp   = &dentry
243                 };
244
245                 LASSERT(fid_is_zero(&minfo->mi_data.op_fid2));
246
247                 rc = ll_lookup_it_finish(req, it, &icbd);
248                 if (!rc)
249                         /* Here dentry->d_inode might be NULL,
250                          * because the entry may have been removed before
251                          * we start doing stat ahead. */
252                         ll_lookup_finish_locks(it, dentry);
253
254                 if (dentry != save)
255                         dput(save);
256         } else {
257                 /* revalidate */
258                 struct mdt_body *body;
259
260                 body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
261                                       sizeof(*body));
262                 if (!lu_fid_eq(&minfo->mi_data.op_fid2, &body->fid1)) {
263                         ll_unhash_aliases(dentry->d_inode);
264                         GOTO(out, rc = -EAGAIN);
265                 }
266
267                 rc = ll_revalidate_it_finish(req, it, dentry);
268                 if (rc) {
269                         ll_unhash_aliases(dentry->d_inode);
270                         GOTO(out, rc);
271                 }
272
273                 spin_lock(&dcache_lock);
274                 lock_dentry(dentry);
275                 __d_drop(dentry);
276 #ifdef DCACHE_LUSTRE_INVALID
277                 dentry->d_flags &= ~DCACHE_LUSTRE_INVALID;
278 #endif
279                 unlock_dentry(dentry);
280                 d_rehash_cond(dentry, 0);
281                 spin_unlock(&dcache_lock);
282
283                 ll_lookup_finish_locks(it, dentry);
284         }
285         EXIT;
286
287 out:
288         if (sai != NULL) {
289                 int first;
290
291                 sai->sai_replied++;
292                 spin_lock(&lli->lli_lock);
293                 first = ll_sai_entry_set(sai,
294                                          (unsigned int)(long)minfo->mi_cbdata,
295                                          SA_ENTRY_STATED);
296                 spin_unlock(&lli->lli_lock);
297                 if (first == 1)
298                         /* wake up the "ls -l" process only when the first entry
299                          * returned. */
300                         cfs_waitq_signal(&sai->sai_waitq);
301                 else if (first == 0)
302                         CDEBUG(D_READA, "can't find sai entry for dir "
303                                DFID" generation %u index %u\n",
304                                PFID(&lli->lli_fid),
305                                minfo->mi_generation,
306                                (unsigned int)(long)minfo->mi_cbdata);
307
308                 ll_sai_put(sai);
309         }
310 out_free:
311         ll_intent_release(it);
312         OBD_FREE_PTR(minfo);
313
314         dput(dentry);
315         return rc;
316 }
317
318 static void sa_args_fini(struct md_enqueue_info *minfo,
319                          struct ldlm_enqueue_info *einfo)
320 {
321         LASSERT(minfo && einfo);
322         capa_put(minfo->mi_data.op_capa1);
323         capa_put(minfo->mi_data.op_capa2);
324         OBD_FREE_PTR(minfo);
325         OBD_FREE_PTR(einfo);
326 }
327
328 /* There is race condition between "capa_put" and "ll_statahead_interpret" for
329  * accessing "op_data.op_capa[1,2]" as following:
330  * "capa_put" releases "op_data.op_capa[1,2]"'s reference count after calling
331  * "md_intent_getattr_async". But "ll_statahead_interpret" maybe run first, and
332  * fill "op_data.op_capa[1,2]" as POISON, then cause "capa_put" access invalid
333  * "ocapa". So here reserve "op_data.op_capa[1,2]" in "pcapa" before calling
334  * "md_intent_getattr_async". */
335 static int sa_args_init(struct inode *dir, struct dentry *dentry,
336                         struct md_enqueue_info **pmi,
337                         struct ldlm_enqueue_info **pei,
338                         struct obd_capa **pcapa)
339 {
340         struct ll_inode_info     *lli = ll_i2info(dir);
341         struct md_enqueue_info   *minfo;
342         struct ldlm_enqueue_info *einfo;
343         struct md_op_data        *op_data;
344
345         OBD_ALLOC_PTR(einfo);
346         if (einfo == NULL)
347                 return -ENOMEM;
348
349         OBD_ALLOC_PTR(minfo);
350         if (minfo == NULL) {
351                 OBD_FREE_PTR(einfo);
352                 return -ENOMEM;
353         }
354
355         op_data = ll_prep_md_op_data(&minfo->mi_data, dir, dentry->d_inode,
356                                      dentry->d_name.name, dentry->d_name.len,
357                                      0, LUSTRE_OPC_ANY, NULL);
358         if (IS_ERR(op_data)) {
359                 OBD_FREE_PTR(einfo);
360                 OBD_FREE_PTR(minfo);
361                 return PTR_ERR(op_data);
362         }
363
364         minfo->mi_it.it_op = IT_GETATTR;
365         minfo->mi_dentry = dentry;
366         minfo->mi_cb = ll_statahead_interpret;
367         minfo->mi_generation = lli->lli_sai->sai_generation;
368         minfo->mi_cbdata = (void *)(long)lli->lli_sai->sai_index;
369
370         einfo->ei_type   = LDLM_IBITS;
371         einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
372         einfo->ei_cb_bl  = ll_md_blocking_ast;
373         einfo->ei_cb_cp  = ldlm_completion_ast;
374         einfo->ei_cb_gl  = NULL;
375         einfo->ei_cbdata = NULL;
376
377         *pmi = minfo;
378         *pei = einfo;
379         pcapa[0] = op_data->op_capa1;
380         pcapa[1] = op_data->op_capa2;
381
382         return 0;
383 }
384
385 /* similar to ll_lookup_it(). */
386 static int do_sa_lookup(struct inode *dir, struct dentry *dentry)
387 {
388         struct md_enqueue_info   *minfo;
389         struct ldlm_enqueue_info *einfo;
390         struct obd_capa          *capas[2];
391         int                       rc;
392         ENTRY;
393
394         rc = sa_args_init(dir, dentry, &minfo, &einfo, capas);
395         if (rc)
396                 RETURN(rc);
397
398         rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
399         if (!rc) {
400                 capa_put(capas[0]);
401                 capa_put(capas[1]);
402         } else {
403                 sa_args_fini(minfo, einfo);
404         }
405
406         RETURN(rc);
407 }
408
409 /* similar to ll_revalidate_it().
410  * return value:
411  *  1      -- dentry valid
412  *  0      -- will send stat-ahead request
413  *  others -- prepare stat-ahead request failed */
414 static int do_sa_revalidate(struct dentry *dentry)
415 {
416         struct inode             *inode = dentry->d_inode;
417         struct inode             *dir = dentry->d_parent->d_inode;
418         struct lookup_intent      it = { .it_op = IT_GETATTR };
419         struct md_enqueue_info   *minfo;
420         struct ldlm_enqueue_info *einfo;
421         struct obd_capa          *capas[2];
422         int rc;
423         ENTRY;
424
425         if (inode == NULL)
426                 RETURN(1);
427
428         if (d_mountpoint(dentry))
429                 RETURN(1);
430
431         if (dentry == dentry->d_sb->s_root)
432                 RETURN(1);
433
434         rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode));
435         if (rc == 1) {
436                 ll_intent_release(&it);
437                 RETURN(1);
438         }
439
440         rc = sa_args_init(dir, dentry, &minfo, &einfo, capas);
441         if (rc)
442                 RETURN(rc);
443
444         rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
445         if (!rc) {
446                 capa_put(capas[0]);
447                 capa_put(capas[1]);
448         } else {
449                 sa_args_fini(minfo, einfo);
450         }
451
452         RETURN(rc);
453 }
454
455 static inline void ll_name2qstr(struct qstr *this, const char *name, int namelen)
456 {
457         unsigned long hash = init_name_hash();
458         unsigned int  c;
459
460         this->name = name;
461         this->len  = namelen;
462         for (; namelen > 0; namelen--, name++) {
463                 c = *(const unsigned char *)name;
464                 hash = partial_name_hash(c, hash);
465         }
466         this->hash = end_name_hash(hash);
467 }
468
469 static int ll_statahead_one(struct dentry *parent, const char* entry_name,
470                             int entry_name_len)
471 {
472         struct inode             *dir = parent->d_inode;
473         struct ll_inode_info     *lli = ll_i2info(dir);
474         struct ll_statahead_info *sai = lli->lli_sai;
475         struct qstr               name;
476         struct dentry            *dentry;
477         struct ll_sai_entry      *se;
478         int                       rc;
479         ENTRY;
480
481 #ifdef DCACHE_LUSTRE_INVALID
482         if (parent->d_flags & DCACHE_LUSTRE_INVALID) {
483 #else
484         if (d_unhashed(parent)) {
485 #endif
486                 CDEBUG(D_READA, "parent dentry@%p %.*s is "
487                        "invalid, skip statahead\n",
488                        parent, parent->d_name.len, parent->d_name.name);
489                 RETURN(-EINVAL);
490         }
491
492         se = ll_sai_entry_get(sai, sai->sai_index, SA_ENTRY_UNSTATED);
493         if (IS_ERR(se))
494                 RETURN(PTR_ERR(se));
495
496         ll_name2qstr(&name, entry_name, entry_name_len);
497         dentry = d_lookup(parent, &name);
498         if (!dentry) {
499                 dentry = d_alloc(parent, &name);
500                 if (dentry) {
501                         rc = do_sa_lookup(dir, dentry);
502                         if (rc)
503                                 dput(dentry);
504                 } else {
505                         GOTO(out, rc = -ENOMEM);
506                 }
507         } else {
508                 rc = do_sa_revalidate(dentry);
509                 if (rc)
510                         dput(dentry);
511         }
512
513         EXIT;
514
515 out:
516         if (rc) {
517                 CDEBUG(D_READA, "set sai entry %p index %u stat %d rc %d\n",
518                        se, se->se_index, se->se_stat, rc);
519                 se->se_stat = rc;
520                 cfs_waitq_signal(&sai->sai_waitq);
521         } else {
522                 sai->sai_sent++;
523         }
524
525         sai->sai_index++;
526         return rc;
527 }
528
529 static inline int sa_check_stop(struct ll_statahead_info *sai)
530 {
531         return !!(sai->sai_thread.t_flags & SVC_STOPPING);
532 }
533
534 static inline int sa_not_full(struct ll_statahead_info *sai)
535 {
536         return sai->sai_index < sai->sai_hit + sai->sai_miss + sai->sai_max;
537 }
538
539 /* (1) hit ratio less than 80%
540  * or
541  * (2) consecutive miss more than 8 */
542 static inline int sa_low_hit(struct ll_statahead_info *sai)
543 {
544         return ((sai->sai_hit < 4 * sai->sai_miss && sai->sai_hit > 7) ||
545                 (sai->sai_consecutive_miss > 8));
546 }
547
548 struct ll_sa_thread_args {
549         struct dentry   *sta_parent;
550         pid_t            sta_pid;
551 };
552
553 static int ll_statahead_thread(void *arg)
554 {
555         struct ll_sa_thread_args *sta = arg;
556         struct dentry            *parent = dget(sta->sta_parent);
557         struct inode             *dir = parent->d_inode;
558         struct ll_inode_info     *lli = ll_i2info(dir);
559         struct ll_sb_info        *sbi = ll_i2sbi(dir);
560         struct ll_statahead_info *sai = ll_sai_get(lli->lli_sai);
561         struct ptlrpc_thread     *thread = &sai->sai_thread;
562         struct page              *page;
563         __u64                     pos = 0;
564         int                       first = 0;
565         int                       rc = 0;
566         struct ll_dir_chain       chain;
567         ENTRY;
568
569         {
570                 char pname[16];
571                 snprintf(pname, 15, "ll_sa_%u", sta->sta_pid);
572                 cfs_daemonize(pname);
573         }
574
575         sbi->ll_sa_total++;
576         spin_lock(&lli->lli_lock);
577         thread->t_flags = SVC_RUNNING;
578         spin_unlock(&lli->lli_lock);
579         cfs_waitq_signal(&thread->t_ctl_waitq);
580         CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name);
581
582         ll_dir_chain_init(&chain);
583         page = ll_get_dir_page(dir, pos, 0, &chain);
584
585         while (1) {
586                 struct lu_dirpage *dp;
587                 struct lu_dirent  *ent;
588
589                 if (IS_ERR(page)) {
590                         rc = PTR_ERR(page);
591                         CERROR("error reading dir "DFID" at %llu/%u: rc %d\n",
592                                PFID(ll_inode2fid(dir)), pos,
593                                sai->sai_index, rc);
594                         break;
595                 }
596
597                 dp = page_address(page);
598                 for (ent = lu_dirent_start(dp); ent != NULL;
599                      ent = lu_dirent_next(ent)) {
600                         struct l_wait_info lwi = { 0 };
601                         char *name = ent->lde_name;
602                         int namelen = le16_to_cpu(ent->lde_namelen);
603
604                         if (namelen == 0)
605                                 /* Skip dummy record. */
606                                 continue;
607
608                         if (name[0] == '.') {
609                                 if (namelen == 1) {
610                                         /* skip . */
611                                         continue;
612                                 } else if (name[1] == '.' && namelen == 2) {
613                                         /* skip .. */
614                                         continue;
615                                 } else if (!sai->sai_ls_all) {
616                                         /* skip hidden files */
617                                         sai->sai_skip_hidden++;
618                                         continue;
619                                 }
620                         }
621
622                         /* don't stat-ahead first entry */
623                         if (unlikely(!first)) {
624                                 first++;
625                                 continue;
626                         }
627
628                         l_wait_event(thread->t_ctl_waitq,
629                                      sa_check_stop(sai) || sa_not_full(sai),
630                                      &lwi);
631
632                         if (unlikely(sa_check_stop(sai))) {
633                                 ll_put_page(page);
634                                 GOTO(out, rc);
635                         }
636
637                         rc = ll_statahead_one(parent, name, namelen);
638                         if (rc < 0) {
639                                 ll_put_page(page);
640                                 GOTO(out, rc);
641                         }
642                 }
643                 pos = le64_to_cpu(dp->ldp_hash_end);
644                 ll_put_page(page);
645                 if (pos == DIR_END_OFF) {
646                         /* End of directory reached. */
647                         break;
648                 } else if (1 /* chain is exhausted*/) {
649                         /* Normal case: continue to the next page. */
650                         page = ll_get_dir_page(dir, pos, 1, &chain);
651                 } else {
652                         /* go into overflow page. */
653                 }
654         }
655         EXIT;
656
657 out:
658         ll_dir_chain_fini(&chain);
659         spin_lock(&lli->lli_lock);
660         thread->t_flags = SVC_STOPPED;
661         spin_unlock(&lli->lli_lock);
662         cfs_waitq_signal(&sai->sai_waitq);
663         cfs_waitq_signal(&thread->t_ctl_waitq);
664         ll_sai_put(sai);
665         dput(parent);
666         CDEBUG(D_READA, "statahead thread stopped, pid %d\n",
667                cfs_curproc_pid());
668         return rc;
669 }
670
671 /* called in ll_file_release() */
672 void ll_stop_statahead(struct inode *inode, void *key)
673 {
674         struct ll_inode_info *lli = ll_i2info(inode);
675         struct ptlrpc_thread *thread;
676
677         spin_lock(&lli->lli_lock);
678         if (lli->lli_opendir_pid == 0 ||
679             unlikely(lli->lli_opendir_key != key)) {
680                 spin_unlock(&lli->lli_lock);
681                 return;
682         }
683
684         lli->lli_opendir_key = NULL;
685         lli->lli_opendir_pid = 0;
686
687         if (lli->lli_sai) {
688                 struct l_wait_info lwi = { 0 };
689
690                 thread = &lli->lli_sai->sai_thread;
691                 if (!(thread->t_flags & SVC_STOPPED)) {
692                         thread->t_flags = SVC_STOPPING;
693                         spin_unlock(&lli->lli_lock);
694                         cfs_waitq_signal(&thread->t_ctl_waitq);
695
696                         CDEBUG(D_READA, "stopping statahead thread, pid %d\n",
697                                cfs_curproc_pid());
698                         l_wait_event(thread->t_ctl_waitq,
699                                      thread->t_flags & SVC_STOPPED,
700                                      &lwi);
701                 } else {
702                         spin_unlock(&lli->lli_lock);
703                 }
704
705                 /* Put the ref which was held when first statahead_enter.
706                  * It maybe not the last ref for some statahead requests
707                  * maybe inflight. */
708                 ll_sai_put(lli->lli_sai);
709                 return;
710         }
711         spin_unlock(&lli->lli_lock);
712 }
713
/* Results of is_first_dirent(). */
enum {
        LS_NONE_FIRST_DE = 0,   /* not first dirent, or is "." */
        LS_FIRST_DE      = 1,   /* the first non-hidden dirent */
        LS_FIRST_DOT_DE  = 2    /* the first hidden dirent, that is ".xxx" */
};
719
720 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
721 {
722         struct ll_dir_chain chain;
723         struct qstr        *target = &dentry->d_name;
724         struct page        *page;
725         __u64               pos = 0;
726         int                 dot_de;
727         int                 rc = LS_NONE_FIRST_DE;
728         ENTRY;
729
730         ll_dir_chain_init(&chain);
731         page = ll_get_dir_page(dir, pos, 0, &chain);
732
733         while (1) {
734                 struct lu_dirpage *dp;
735                 struct lu_dirent  *ent;
736
737                 if (IS_ERR(page)) {
738                         rc = PTR_ERR(page);
739                         CERROR("error reading dir "DFID" at %llu: rc %d\n",
740                                PFID(ll_inode2fid(dir)), pos, rc);
741                         break;
742                 }
743
744                 dp = page_address(page);
745                 for (ent = lu_dirent_start(dp); ent != NULL;
746                      ent = lu_dirent_next(ent)) {
747                         char *name = ent->lde_name;
748                         int namelen = le16_to_cpu(ent->lde_namelen);
749
750                         if (namelen == 0)
751                                 /* Skip dummy record. */
752                                 continue;
753
754                         if (name[0] == '.') {
755                                 if (namelen == 1)
756                                         /* skip . */
757                                         continue;
758                                 else if (name[1] == '.' && namelen == 2)
759                                         /* skip .. */
760                                         continue;
761                                 else
762                                         dot_de = 1;
763                         } else {
764                                 dot_de = 0;
765                         }
766
767                         if (dot_de && target->name[0] != '.') {
768                                 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
769                                        target->len, target->name,
770                                        namelen, name);
771                                 continue;
772                         }
773
774                         if (target->len == namelen &&
775                             !strncmp(target->name, name, target->len))
776                                 rc = LS_FIRST_DE + dot_de;
777                         else
778                                 rc = LS_NONE_FIRST_DE;
779                         ll_put_page(page);
780                         GOTO(out, rc);
781                 }
782                 pos = le64_to_cpu(dp->ldp_hash_end);
783                 ll_put_page(page);
784                 if (pos == DIR_END_OFF) {
785                         /* End of directory reached. */
786                         break;
787                 } else if (1 /* chain is exhausted*/) {
788                         /* Normal case: continue to the next page. */
789                         page = ll_get_dir_page(dir, pos, 1, &chain);
790                 } else {
791                         /* go into overflow page. */
792                 }
793         }
794         EXIT;
795
796 out:
797         ll_dir_chain_fini(&chain);
798         return rc;
799 }
800
801 /* Start statahead thread if this is the first dir entry.
802  * Otherwise, if a thread is started already, wait until it is ahead of me.
803  * Return value:
804  *  0       -- statahead thread processed this dentry; for lookup, it is a miss
805  *  1       -- statahead thread processed this dentry; for lookup, it is a hit
806  *  -EEXIST -- statahead thread started, and this is the first dentry
807  *  -EBADFD -- statahead thread exited and no dentry is available
808  *  others  -- error */
809 int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
810 {
811         struct ll_sb_info        *sbi = ll_i2sbi(dir);
812         struct ll_inode_info     *lli = ll_i2info(dir);
813         struct ll_statahead_info *sai = lli->lli_sai;
814         struct ll_sa_thread_args  sta;
815         struct l_wait_info        lwi = { 0 };
816         int                       rc;
817         ENTRY;
818
819         LASSERT(lli->lli_opendir_pid == cfs_curproc_pid());
820
821         if (sai) {
822                 if (unlikely(sai->sai_thread.t_flags & SVC_STOPPED &&
823                              list_empty(&sai->sai_entries)))
824                         RETURN(-EBADFD);
825
826                 if ((*dentryp)->d_name.name[0] == '.') {
827                         if (likely(sai->sai_ls_all ||
828                             sai->sai_miss_hidden >= sai->sai_skip_hidden)) {
829                                 /* Hidden dentry is the first one, or statahead
830                                  * thread does not skip so many hidden dentries
831                                  * before "sai_ls_all" enabled as below. */
832                         } else {
833                                 if (!sai->sai_ls_all)
834                                         /* It maybe because hidden dentry is not
835                                          * the first one, "sai_ls_all" was not
836                                          * set, then "ls -al" missed. Enable
837                                          * "sai_ls_all" for such case. */
838                                         sai->sai_ls_all = 1;
839
840                                 /* Such "getattr" has been skipped before
841                                  * "sai_ls_all" enabled as above. */
842                                 sai->sai_miss_hidden++;
843                                 RETURN(-ENOENT);
844                         }
845                 }
846
847                 if (ll_sai_entry_stated(sai)) {
848                         sbi->ll_sa_cached++;
849                 } else {
850                         sbi->ll_sa_blocked++;
851                         /* thread started already, avoid double-stat */
852                         l_wait_event(sai->sai_waitq,
853                                      ll_sai_entry_stated(sai) ||
854                                      sai->sai_thread.t_flags & SVC_STOPPED,
855                                      &lwi);
856                 }
857
858                 if (lookup) {
859                         struct dentry *result;
860
861                         result = d_lookup((*dentryp)->d_parent,
862                                           &(*dentryp)->d_name);
863                         if (result) {
864                                 LASSERT(result != *dentryp);
865                                 dput(*dentryp);
866                                 *dentryp = result;
867                                 RETURN(1);
868                         }
869                 }
870                 /* do nothing for revalidate */
871                 RETURN(0);
872         }
873
874          /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */ 
875         LASSERT(lli->lli_sai == NULL);
876
877         rc = is_first_dirent(dir, *dentryp);
878         if (rc == LS_NONE_FIRST_DE) {
879                 /* It is not "ls -{a}l" operation, no need statahead for it */
880                 spin_lock(&lli->lli_lock);
881                 lli->lli_opendir_key = NULL;
882                 lli->lli_opendir_pid = 0;
883                 spin_unlock(&lli->lli_lock);
884                 RETURN(-EBADF);
885         }
886
887         sai = ll_sai_alloc();
888         if (sai == NULL)
889                 RETURN(-ENOMEM);
890
891         sai->sai_inode  = igrab(dir);
892         sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
893
894         sta.sta_parent = (*dentryp)->d_parent;
895         sta.sta_pid    = cfs_curproc_pid();
896
897         lli->lli_sai = sai;
898         rc = cfs_kernel_thread(ll_statahead_thread, &sta, 0);
899         if (rc < 0) {
900                 CERROR("can't start ll_sa thread, rc: %d\n", rc);
901                 sai->sai_thread.t_flags = SVC_STOPPED;
902                 ll_sai_put(sai);
903                 LASSERT(lli->lli_sai == NULL);
904                 RETURN(rc);
905         }
906
907         l_wait_event(sai->sai_thread.t_ctl_waitq, 
908                      sai->sai_thread.t_flags & (SVC_RUNNING | SVC_STOPPED),
909                      &lwi);
910
911         /* We don't stat-ahead for the first dirent since we are already in
912          * lookup, and -EEXIST also indicates that this is the first dirent. */
913         RETURN(-EEXIST);
914 }
915
916 /* update hit/miss count */
917 void ll_statahead_exit(struct dentry *dentry, int result)
918 {
919         struct dentry         *parent = dentry->d_parent;
920         struct ll_inode_info  *lli = ll_i2info(parent->d_inode);
921         struct ll_sb_info     *sbi = ll_i2sbi(parent->d_inode);
922         struct ll_dentry_data *ldd = ll_d2d(dentry);
923
924         if (lli->lli_opendir_pid != cfs_curproc_pid())
925                 return;
926
927         if (lli->lli_sai) {
928                 struct ll_statahead_info *sai = lli->lli_sai;
929
930                 if (result == 1) {
931                         sbi->ll_sa_hit++;
932                         sai->sai_hit++;
933                         sai->sai_consecutive_miss = 0;
934                         sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
935                 } else {
936                         sbi->ll_sa_miss++;
937                         sai->sai_miss++;
938                         sai->sai_consecutive_miss++;
939                         if (sa_low_hit(sai) &&
940                             sai->sai_thread.t_flags & SVC_RUNNING) {
941                                 sbi->ll_sa_wrong++;
942                                 CDEBUG(D_READA, "statahead for dir %.*s hit "
943                                        "ratio too low: hit/miss %u/%u, "
944                                        "sent/replied %u/%u. stopping statahead "
945                                        "thread: pid %d\n",
946                                        parent->d_name.len, parent->d_name.name,
947                                        sai->sai_hit, sai->sai_miss,
948                                        sai->sai_sent, sai->sai_replied,
949                                        cfs_curproc_pid());
950                                 spin_lock(&lli->lli_lock);
951                                 if (!(sai->sai_thread.t_flags & SVC_STOPPED))
952                                         sai->sai_thread.t_flags = SVC_STOPPING;
953                                 spin_unlock(&lli->lli_lock);
954                         }
955                 }
956
957                 cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
958                 ll_sai_entry_put(sai);
959
960                 if (likely(ldd != NULL))
961                         ldd->lld_sa_generation = sai->sai_generation;
962         }
963 }