Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / llite / statahead.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2007 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #include <linux/fs.h>
23 #include <linux/sched.h>
24 #include <linux/mm.h>
25 #include <linux/smp_lock.h>
26 #include <linux/highmem.h>
27 #include <linux/pagemap.h>
28
29 #define DEBUG_SUBSYSTEM S_LLITE
30
31 #include <obd_support.h>
32 #include <lustre_lite.h>
33 #include <lustre_dlm.h>
34 #include <linux/lustre_version.h>
35 #include "llite_internal.h"
36
37 struct ll_sai_entry {
38         struct list_head        se_list;
39         unsigned int            se_index;
40         int                     se_stat;
41         struct ptlrpc_request  *se_req;
42         struct md_enqueue_info *se_minfo;
43 };
44
/* Values stored in ll_sai_entry::se_stat. */
enum {
        SA_ENTRY_UNSTATED = 0,
        SA_ENTRY_STATED   = 1
};
49
50 struct dentry_operations ll_sai_d_ops = {
51         .d_release = ll_release,
52 };
53
54 static unsigned int sai_generation = 0;
55 static spinlock_t sai_generation_lock = SPIN_LOCK_UNLOCKED;
56
57 /**
58  * Check whether first entry was stated already or not.
59  * No need to hold lli_lock, for:
60  * (1) it is me that remove entry from the list
61  * (2) the statahead thread only add new entry to the list
62  */
63 static int ll_sai_entry_stated(struct ll_statahead_info *sai)
64 {
65         struct ll_sai_entry  *entry;
66         int                   rc = 0;
67
68         if (!list_empty(&sai->sai_entries_stated)) {
69                 entry = list_entry(sai->sai_entries_stated.next,
70                                    struct ll_sai_entry, se_list);
71                 if (entry->se_index == sai->sai_index_next)
72                         rc = 1;
73         }
74         return rc;
75 }
76
77 static inline int sa_received_empty(struct ll_statahead_info *sai)
78 {
79         return list_empty(&sai->sai_entries_received);
80 }
81
82 static inline int sa_not_full(struct ll_statahead_info *sai)
83 {
84         return sai->sai_index < sai->sai_hit + sai->sai_miss + sai->sai_max;
85 }
86
87 static inline int sa_is_running(struct ll_statahead_info *sai)
88 {
89         return !!(sai->sai_thread.t_flags & SVC_RUNNING);
90 }
91
92 static inline int sa_is_stopping(struct ll_statahead_info *sai)
93 {
94         return !!(sai->sai_thread.t_flags & SVC_STOPPING);
95 }
96
97 static inline int sa_is_stopped(struct ll_statahead_info *sai)
98 {
99         return !!(sai->sai_thread.t_flags & SVC_STOPPED);
100 }
101
102 /**
103  * (1) hit ratio less than 80%
104  * or
105  * (2) consecutive miss more than 8
106  */
107 static inline int sa_low_hit(struct ll_statahead_info *sai)
108 {
109         return ((sai->sai_hit < 4 * sai->sai_miss && sai->sai_hit > 7) ||
110                 (sai->sai_consecutive_miss > 8));
111 }
112
113 /**
114  * process the deleted entry's member and free the entry.
115  * (1) release intent
116  * (2) free md_enqueue_info
117  * (3) drop dentry's ref count
118  * (4) release request's ref count
119  */
120 static void ll_sai_entry_cleanup(struct ll_sai_entry *entry)
121 {
122         struct ptlrpc_request  *req = entry->se_req;
123         struct md_enqueue_info *minfo = entry->se_minfo;
124         ENTRY;
125
126         if (minfo) {
127                 struct dentry        *dentry = minfo->mi_dentry;
128                 struct lookup_intent *it = &minfo->mi_it;
129
130                 entry->se_minfo = NULL;
131                 ll_intent_release(it);
132                 OBD_FREE_PTR(minfo);
133                 dput(dentry);
134         }
135         if (req) {
136                 entry->se_req = NULL;
137                 ptlrpc_req_finished(req);
138         }
139         OBD_FREE_PTR(entry);
140
141         EXIT;
142 }
143
144 static struct ll_statahead_info *ll_sai_alloc(void)
145 {
146         struct ll_statahead_info *sai;
147
148         OBD_ALLOC_PTR(sai);
149         if (!sai)
150                 return NULL;
151
152         spin_lock(&sai_generation_lock);
153         sai->sai_generation = ++sai_generation;
154         if (unlikely(sai_generation == 0))
155                 sai->sai_generation = ++sai_generation;
156         spin_unlock(&sai_generation_lock);
157         atomic_set(&sai->sai_refcount, 1);
158         sai->sai_max = LL_SA_RPC_MIN;
159         cfs_waitq_init(&sai->sai_waitq);
160         cfs_waitq_init(&sai->sai_thread.t_ctl_waitq);
161         CFS_INIT_LIST_HEAD(&sai->sai_entries_sent);
162         CFS_INIT_LIST_HEAD(&sai->sai_entries_received);
163         CFS_INIT_LIST_HEAD(&sai->sai_entries_stated);
164         return sai;
165 }
166
167 static inline 
168 struct ll_statahead_info *ll_sai_get(struct ll_statahead_info *sai)
169 {
170         LASSERT(sai);
171         atomic_inc(&sai->sai_refcount);
172         return sai;
173 }
174
175 static void ll_sai_put(struct ll_statahead_info *sai)
176 {
177         struct inode         *inode = sai->sai_inode;
178         struct ll_inode_info *lli = ll_i2info(inode);
179         ENTRY;
180
181         if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_lock)) {
182                 struct ll_sai_entry *entry, *next;
183
184                 lli->lli_sai = NULL;
185                 spin_unlock(&lli->lli_lock);
186
187                 LASSERT(sa_is_stopped(sai));
188
189                 if (sai->sai_sent > sai->sai_replied)
190                         CDEBUG(D_READA,"statahead for dir "DFID" does not "
191                               "finish: [sent:%u] [replied:%u]\n",
192                               PFID(&lli->lli_fid),
193                               sai->sai_sent, sai->sai_replied);
194
195                 list_for_each_entry_safe(entry, next, &sai->sai_entries_sent,
196                                          se_list) {
197                         list_del(&entry->se_list);
198                         ll_sai_entry_cleanup(entry);
199                 }
200                 list_for_each_entry_safe(entry, next, &sai->sai_entries_received,
201                                          se_list) {
202                         list_del(&entry->se_list);
203                         ll_sai_entry_cleanup(entry);
204                 }
205                 list_for_each_entry_safe(entry, next, &sai->sai_entries_stated,
206                                          se_list) {
207                         list_del(&entry->se_list);
208                         ll_sai_entry_cleanup(entry);
209                 }
210                 dput(sai->sai_first);
211                 OBD_FREE_PTR(sai);
212                 iput(inode);
213         }
214         EXIT;
215 }
216
217 /**
218  * insert it into sai_entries_sent tail when init.
219  */
220 static struct ll_sai_entry *
221 ll_sai_entry_init(struct ll_statahead_info *sai, unsigned int index)
222 {
223         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
224         struct ll_sai_entry  *entry;
225         ENTRY;
226
227         OBD_ALLOC_PTR(entry);
228         if (entry == NULL)
229                 RETURN(ERR_PTR(-ENOMEM));
230
231         CDEBUG(D_READA, "alloc sai entry %p index %u\n",
232                entry, index);
233         entry->se_index = index;
234         entry->se_stat  = SA_ENTRY_UNSTATED;
235
236         spin_lock(&lli->lli_lock);
237         list_add_tail(&entry->se_list, &sai->sai_entries_sent);
238         spin_unlock(&lli->lli_lock);
239
240         RETURN(entry);
241 }
242
243 /**
244  * delete it from sai_entries_stated head when fini, it need not
245  * to process entry's member.
246  */
247 static void ll_sai_entry_fini(struct ll_statahead_info *sai)
248 {
249         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
250         struct ll_sai_entry  *entry;
251         ENTRY;
252         
253         spin_lock(&lli->lli_lock);
254         sai->sai_index_next++;
255         if (likely(!list_empty(&sai->sai_entries_stated))) {
256                 entry = list_entry(sai->sai_entries_stated.next,
257                                    struct ll_sai_entry, se_list);
258                 if (entry->se_index < sai->sai_index_next) {
259                         list_del(&entry->se_list);
260                         OBD_FREE_PTR(entry);
261                 }
262         } else
263                 LASSERT(sa_is_stopped(sai));
264         spin_unlock(&lli->lli_lock);
265
266         EXIT;
267 }
268
269 /**
270  * inside lli_lock.
271  * \retval NULL : can not find the entry in sai_entries_sent with the index
272  * \retval entry: find the entry in sai_entries_sent with the index
273  */
274 static struct ll_sai_entry *
275 ll_sai_entry_set(struct ll_statahead_info *sai, unsigned int index, int stat,
276                  struct ptlrpc_request *req, struct md_enqueue_info *minfo)
277 {
278         struct ll_sai_entry *entry;
279         ENTRY;
280
281         if (!list_empty(&sai->sai_entries_sent)) {
282                 list_for_each_entry(entry, &sai->sai_entries_sent,
283                                     se_list) {
284                         if (entry->se_index == index) {
285                                 entry->se_stat = stat;
286                                 entry->se_req = ptlrpc_request_addref(req);
287                                 entry->se_minfo = minfo;
288                                 RETURN(entry);
289                         } else if (entry->se_index > index)
290                                 RETURN(NULL);
291                 }
292         }
293         RETURN(NULL);
294 }
295
296 /**
297  * inside lli_lock.
298  * Move entry to sai_entries_received and
299  * insert it into sai_entries_received tail.
300  */
301 static inline void
302 ll_sai_entry_to_received(struct ll_statahead_info *sai, struct ll_sai_entry *entry)
303 {
304         if (!list_empty(&entry->se_list))
305                 list_del_init(&entry->se_list);
306         list_add_tail(&entry->se_list, &sai->sai_entries_received);
307 }
308
309 /**
310  * Move entry to sai_entries_stated and
311  * sort with the index.
312  */
313 static int
314 ll_sai_entry_to_stated(struct ll_statahead_info *sai, struct ll_sai_entry *entry)
315 {
316         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
317         struct ll_sai_entry  *se;
318         ENTRY;
319
320         spin_lock(&lli->lli_lock);
321         if (!list_empty(&entry->se_list))
322                 list_del_init(&entry->se_list);
323
324         if (unlikely(entry->se_index < sai->sai_index_next)) {
325                 spin_unlock(&lli->lli_lock);
326                 ll_sai_entry_cleanup(entry);
327                 RETURN(0);
328         }
329
330         list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) {
331                 if (se->se_index < entry->se_index) {
332                         list_add(&entry->se_list, &se->se_list);
333                         spin_unlock(&lli->lli_lock);
334                         RETURN(1);
335                 }
336         }
337
338         /*
339          * I am the first entry.
340          */
341         list_add(&entry->se_list, &sai->sai_entries_stated);
342         spin_unlock(&lli->lli_lock);
343         RETURN(1);
344 }
345
346 /**
347  * finish lookup/revalidate.
348  */
349 static int do_statahead_interpret(struct ll_statahead_info *sai)
350 {
351         struct ll_inode_info   *lli = ll_i2info(sai->sai_inode);
352         struct ll_sai_entry    *entry;
353         struct ptlrpc_request  *req;
354         struct md_enqueue_info *minfo;
355         struct dentry          *dentry;
356         struct lookup_intent   *it;
357         int                     rc = 0;
358         ENTRY;
359
360         spin_lock(&lli->lli_lock);
361         LASSERT(!sa_received_empty(sai));
362         entry = list_entry(sai->sai_entries_received.next, struct ll_sai_entry,
363                            se_list);
364         list_del_init(&entry->se_list);
365         spin_unlock(&lli->lli_lock);
366
367         if (unlikely(entry->se_index < sai->sai_index_next)) {
368                 ll_sai_entry_cleanup(entry);
369                 RETURN(0);
370         }
371
372         req = entry->se_req;
373         minfo = entry->se_minfo;
374         dentry = minfo->mi_dentry;
375         it = &minfo->mi_it;
376
377         if (entry->se_stat != SA_ENTRY_STATED)
378                 GOTO(out, rc = entry->se_stat);
379
380         if (dentry->d_inode == NULL) {
381                 /*
382                  * lookup.
383                  */
384                 struct dentry    *save = dentry;
385                 struct it_cb_data icbd = {
386                         .icbd_parent   = dentry->d_parent->d_inode,
387                         .icbd_childp   = &dentry
388                 };
389
390                 LASSERT(fid_is_zero(&minfo->mi_data.op_fid2));
391
392                 rc = ll_lookup_it_finish(req, it, &icbd);
393                 if (!rc)
394                         /*
395                          * Here dentry->d_inode might be NULL,
396                          * because the entry may have been removed before
397                          * we start doing stat ahead.
398                          */
399                         ll_lookup_finish_locks(it, dentry);
400
401                 if (dentry != save) {
402                         minfo->mi_dentry = dentry;
403                         dput(save);
404                 }
405         } else {
406                 /*
407                  * revalidate.
408                  */
409                 struct mdt_body *body;
410
411                 body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
412                                       sizeof(*body));
413                 if (!lu_fid_eq(&minfo->mi_data.op_fid2, &body->fid1)) {
414                         ll_unhash_aliases(dentry->d_inode);
415                         GOTO(out, rc = -EAGAIN);
416                 }
417
418                 rc = ll_revalidate_it_finish(req, it, dentry);
419                 if (rc) {
420                         ll_unhash_aliases(dentry->d_inode);
421                         GOTO(out, rc);
422                 }
423
424                 spin_lock(&dcache_lock);
425                 lock_dentry(dentry);
426                 __d_drop(dentry);
427 #ifdef DCACHE_LUSTRE_INVALID
428                 dentry->d_flags &= ~DCACHE_LUSTRE_INVALID;
429 #endif
430                 unlock_dentry(dentry);
431                 d_rehash_cond(dentry, 0);
432                 spin_unlock(&dcache_lock);
433
434                 ll_lookup_finish_locks(it, dentry);
435         }
436         EXIT;
437
438 out:
439         if (likely(ll_sai_entry_to_stated(sai, entry))) {
440                 entry->se_minfo = NULL;
441                 entry->se_req = NULL;
442                 cfs_waitq_signal(&sai->sai_waitq);
443                 ll_intent_release(it);
444                 OBD_FREE_PTR(minfo);
445                 dput(dentry);
446                 ptlrpc_req_finished(req);
447         }
448         return rc;
449 }
450
451 static int ll_statahead_interpret(struct ptlrpc_request *req,
452                                   struct md_enqueue_info *minfo,
453                                   int rc)
454 {
455         struct dentry            *dentry = minfo->mi_dentry;
456         struct lookup_intent     *it = &minfo->mi_it;
457         struct inode             *dir = dentry->d_parent->d_inode;
458         struct ll_inode_info     *lli = ll_i2info(dir);
459         struct ll_statahead_info *sai;
460         struct ll_sai_entry      *entry;
461         ENTRY;
462
463         CDEBUG(D_READA, "interpret statahead %.*s rc %d\n",
464                dentry->d_name.len, dentry->d_name.name, rc);
465
466         spin_lock(&lli->lli_lock);
467         if (unlikely(lli->lli_sai == NULL ||
468             lli->lli_sai->sai_generation != minfo->mi_generation)) {
469                 spin_unlock(&lli->lli_lock);
470                 ll_intent_release(it);
471                 dput(dentry);
472                 OBD_FREE_PTR(minfo);
473                 RETURN(-ESTALE);
474         } else {
475                 sai = lli->lli_sai;
476                 if (rc || dir == NULL)
477                         rc = -ESTALE;
478
479                 entry = ll_sai_entry_set(sai,
480                                          (unsigned int)(long)minfo->mi_cbdata,
481                                          rc ? SA_ENTRY_UNSTATED :
482                                          SA_ENTRY_STATED, req, minfo);
483                 LASSERT(entry != NULL);
484                 if (likely(sa_is_running(sai))) {
485                         ll_sai_entry_to_received(sai, entry);
486                         sai->sai_replied++;
487                         spin_unlock(&lli->lli_lock);
488                         cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
489                 } else {
490                         if (!list_empty(&entry->se_list))
491                                 list_del_init(&entry->se_list);
492                         sai->sai_replied++;
493                         spin_unlock(&lli->lli_lock);
494                         ll_sai_entry_cleanup(entry);
495                 }
496                 RETURN(rc);
497         }
498 }
499
500 static void sa_args_fini(struct md_enqueue_info *minfo,
501                          struct ldlm_enqueue_info *einfo)
502 {
503         LASSERT(minfo && einfo);
504         capa_put(minfo->mi_data.op_capa1);
505         capa_put(minfo->mi_data.op_capa2);
506         OBD_FREE_PTR(minfo);
507         OBD_FREE_PTR(einfo);
508 }
509
510 /**
511  * There is race condition between "capa_put" and "ll_statahead_interpret" for
512  * accessing "op_data.op_capa[1,2]" as following:
513  * "capa_put" releases "op_data.op_capa[1,2]"'s reference count after calling
514  * "md_intent_getattr_async". But "ll_statahead_interpret" maybe run first, and
515  * fill "op_data.op_capa[1,2]" as POISON, then cause "capa_put" access invalid
516  * "ocapa". So here reserve "op_data.op_capa[1,2]" in "pcapa" before calling
517  * "md_intent_getattr_async".
518  */
519 static int sa_args_init(struct inode *dir, struct dentry *dentry,
520                         struct md_enqueue_info **pmi,
521                         struct ldlm_enqueue_info **pei,
522                         struct obd_capa **pcapa)
523 {
524         struct ll_inode_info     *lli = ll_i2info(dir);
525         struct md_enqueue_info   *minfo;
526         struct ldlm_enqueue_info *einfo;
527         struct md_op_data        *op_data;
528
529         OBD_ALLOC_PTR(einfo);
530         if (einfo == NULL)
531                 return -ENOMEM;
532
533         OBD_ALLOC_PTR(minfo);
534         if (minfo == NULL) {
535                 OBD_FREE_PTR(einfo);
536                 return -ENOMEM;
537         }
538
539         op_data = ll_prep_md_op_data(&minfo->mi_data, dir, dentry->d_inode,
540                                      dentry->d_name.name, dentry->d_name.len,
541                                      0, LUSTRE_OPC_ANY, NULL);
542         if (IS_ERR(op_data)) {
543                 OBD_FREE_PTR(einfo);
544                 OBD_FREE_PTR(minfo);
545                 return PTR_ERR(op_data);
546         }
547
548         minfo->mi_it.it_op = IT_GETATTR;
549         minfo->mi_dentry = dentry;
550         minfo->mi_cb = ll_statahead_interpret;
551         minfo->mi_generation = lli->lli_sai->sai_generation;
552         minfo->mi_cbdata = (void *)(long)lli->lli_sai->sai_index;
553
554         einfo->ei_type   = LDLM_IBITS;
555         einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
556         einfo->ei_cb_bl  = ll_md_blocking_ast;
557         einfo->ei_cb_cp  = ldlm_completion_ast;
558         einfo->ei_cb_gl  = NULL;
559         einfo->ei_cbdata = NULL;
560
561         *pmi = minfo;
562         *pei = einfo;
563         pcapa[0] = op_data->op_capa1;
564         pcapa[1] = op_data->op_capa2;
565
566         return 0;
567 }
568
569 /**
570  * similar to ll_lookup_it().
571  */
572 static int do_sa_lookup(struct inode *dir, struct dentry *dentry)
573 {
574         struct md_enqueue_info   *minfo;
575         struct ldlm_enqueue_info *einfo;
576         struct obd_capa          *capas[2];
577         int                       rc;
578         ENTRY;
579
580         rc = sa_args_init(dir, dentry, &minfo, &einfo, capas);
581         if (rc)
582                 RETURN(rc);
583
584         rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
585         if (!rc) {
586                 capa_put(capas[0]);
587                 capa_put(capas[1]);
588         } else {
589                 sa_args_fini(minfo, einfo);
590         }
591
592         RETURN(rc);
593 }
594
595 /**
596  * similar to ll_revalidate_it().
597  * \retval      1 -- dentry valid
598  * \retval      0 -- will send stat-ahead request
599  * \retval others -- prepare stat-ahead request failed
600  */
601 static int do_sa_revalidate(struct dentry *dentry)
602 {
603         struct inode             *inode = dentry->d_inode;
604         struct inode             *dir = dentry->d_parent->d_inode;
605         struct lookup_intent      it = { .it_op = IT_GETATTR };
606         struct md_enqueue_info   *minfo;
607         struct ldlm_enqueue_info *einfo;
608         struct obd_capa          *capas[2];
609         int rc;
610         ENTRY;
611
612         if (inode == NULL)
613                 RETURN(1);
614
615         if (d_mountpoint(dentry))
616                 RETURN(1);
617
618         if (dentry == dentry->d_sb->s_root)
619                 RETURN(1);
620
621         rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode));
622         if (rc == 1) {
623                 ll_intent_release(&it);
624                 RETURN(1);
625         }
626
627         rc = sa_args_init(dir, dentry, &minfo, &einfo, capas);
628         if (rc)
629                 RETURN(rc);
630
631         rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
632         if (!rc) {
633                 capa_put(capas[0]);
634                 capa_put(capas[1]);
635         } else {
636                 sa_args_fini(minfo, einfo);
637         }
638
639         RETURN(rc);
640 }
641
642 static inline void ll_name2qstr(struct qstr *this, const char *name, int namelen)
643 {
644         unsigned long hash = init_name_hash();
645         unsigned int  c;
646
647         this->name = name;
648         this->len  = namelen;
649         for (; namelen > 0; namelen--, name++) {
650                 c = *(const unsigned char *)name;
651                 hash = partial_name_hash(c, hash);
652         }
653         this->hash = end_name_hash(hash);
654 }
655
656 static int ll_statahead_one(struct dentry *parent, const char* entry_name,
657                             int entry_name_len)
658 {
659         struct inode             *dir = parent->d_inode;
660         struct ll_inode_info     *lli = ll_i2info(dir);
661         struct ll_statahead_info *sai = lli->lli_sai;
662         struct qstr               name;
663         struct dentry            *dentry;
664         struct ll_sai_entry      *se;
665         int                       rc;
666         ENTRY;
667
668 #ifdef DCACHE_LUSTRE_INVALID
669         if (parent->d_flags & DCACHE_LUSTRE_INVALID) {
670 #else
671         if (d_unhashed(parent)) {
672 #endif
673                 CDEBUG(D_READA, "parent dentry@%p %.*s is "
674                        "invalid, skip statahead\n",
675                        parent, parent->d_name.len, parent->d_name.name);
676                 RETURN(-EINVAL);
677         }
678
679         se = ll_sai_entry_init(sai, sai->sai_index);
680         if (IS_ERR(se))
681                 RETURN(PTR_ERR(se));
682
683         ll_name2qstr(&name, entry_name, entry_name_len);
684         dentry = d_lookup(parent, &name);
685         if (!dentry) {
686                 dentry = d_alloc(parent, &name);
687                 if (dentry) {
688                         rc = do_sa_lookup(dir, dentry);
689                         if (rc)
690                                 dput(dentry);
691                 } else {
692                         GOTO(out, rc = -ENOMEM);
693                 }
694         } else {
695                 rc = do_sa_revalidate(dentry);
696                 if (rc)
697                         dput(dentry);
698         }
699
700         EXIT;
701
702 out:
703         if (rc) {
704                 CDEBUG(D_READA, "set sai entry %p index %u stat %d rc %d\n",
705                        se, se->se_index, se->se_stat, rc);
706                 se->se_stat = rc;
707                 if (ll_sai_entry_to_stated(sai, se))
708                         cfs_waitq_signal(&sai->sai_waitq);
709         } else {
710                 sai->sai_sent++;
711         }
712
713         sai->sai_index++;
714         return rc;
715 }
716
717 struct ll_sa_thread_args {
718         struct dentry   *sta_parent;
719         pid_t            sta_pid;
720 };
721
722 static int ll_statahead_thread(void *arg)
723 {
724         struct ll_sa_thread_args *sta = arg;
725         struct dentry            *parent = dget(sta->sta_parent);
726         struct inode             *dir = parent->d_inode;
727         struct ll_inode_info     *lli = ll_i2info(dir);
728         struct ll_sb_info        *sbi = ll_i2sbi(dir);
729         struct ll_statahead_info *sai = ll_sai_get(lli->lli_sai);
730         struct ptlrpc_thread     *thread = &sai->sai_thread;
731         struct page              *page;
732         __u64                     pos = 0;
733         int                       first = 0;
734         int                       rc = 0;
735         struct ll_dir_chain       chain;
736         ENTRY;
737
738         {
739                 char pname[16];
740                 snprintf(pname, 15, "ll_sa_%u", sta->sta_pid);
741                 cfs_daemonize(pname);
742         }
743
744         sbi->ll_sa_total++;
745         spin_lock(&lli->lli_lock);
746         thread->t_flags = SVC_RUNNING;
747         spin_unlock(&lli->lli_lock);
748         cfs_waitq_signal(&thread->t_ctl_waitq);
749         CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name);
750
751         ll_dir_chain_init(&chain);
752         page = ll_get_dir_page(dir, pos, 0, &chain);
753
754         while (1) {
755                 struct l_wait_info lwi = { 0 };
756                 struct lu_dirpage *dp;
757                 struct lu_dirent  *ent;
758
759                 if (IS_ERR(page)) {
760                         rc = PTR_ERR(page);
761                         CERROR("error reading dir "DFID" at %llu/%u: rc %d\n",
762                                PFID(ll_inode2fid(dir)), pos,
763                                sai->sai_index, rc);
764                         break;
765                 }
766
767                 dp = page_address(page);
768                 for (ent = lu_dirent_start(dp); ent != NULL;
769                      ent = lu_dirent_next(ent)) {
770                         char *name = ent->lde_name;
771                         int namelen = le16_to_cpu(ent->lde_namelen);
772
773                         if (namelen == 0)
774                                 /*
775                                  * Skip dummy record.
776                                  */
777                                 continue;
778
779                         if (name[0] == '.') {
780                                 if (namelen == 1) {
781                                         /*
782                                          * skip "."
783                                          */
784                                         continue;
785                                 } else if (name[1] == '.' && namelen == 2) {
786                                         /*
787                                          * skip ".."
788                                          */
789                                         continue;
790                                 } else if (!sai->sai_ls_all) {
791                                         /*
792                                          * skip hidden files.
793                                          */
794                                         sai->sai_skip_hidden++;
795                                         continue;
796                                 }
797                         }
798
799                         /*
800                          * don't stat-ahead first entry.
801                          */
802                         if (unlikely(!first)) {
803                                 first++;
804                                 continue;
805                         }
806
807 keep_de:
808                         l_wait_event(thread->t_ctl_waitq,
809                                      !sa_is_running(sai) || sa_not_full(sai) ||
810                                      !sa_received_empty(sai),
811                                      &lwi);
812
813                         while (!sa_received_empty(sai) && sa_is_running(sai))
814                                 do_statahead_interpret(sai);
815
816                         if (unlikely(!sa_is_running(sai))) {
817                                 ll_put_page(page);
818                                 GOTO(out, rc);
819                         }
820
821                         if (!sa_not_full(sai))
822                                 /*
823                                  * do not skip the current de.
824                                  */
825                                 goto keep_de;
826
827                         rc = ll_statahead_one(parent, name, namelen);
828                         if (rc < 0) {
829                                 ll_put_page(page);
830                                 GOTO(out, rc);
831                         }
832                 }
833                 pos = le64_to_cpu(dp->ldp_hash_end);
834                 ll_put_page(page);
835                 if (pos == DIR_END_OFF) {
836                         /*
837                          * End of directory reached.
838                          */
839                         while (1) {
840                                 l_wait_event(thread->t_ctl_waitq,
841                                              !sa_is_running(sai) ||
842                                              !sa_received_empty(sai) ||
843                                              sai->sai_sent == sai->sai_replied,
844                                              &lwi);
845                                 if (!sa_received_empty(sai) &&
846                                     sa_is_running(sai))
847                                         do_statahead_interpret(sai);
848                                 else
849                                         GOTO(out, rc);
850                         }
851                 } else if (1) {
852                         /*
853                          * chain is exhausted.
854                          * Normal case: continue to the next page.
855                          */
856                         page = ll_get_dir_page(dir, pos, 1, &chain);
857                 } else {
858                         /*
859                          * go into overflow page.
860                          */
861                 }
862         }
863         EXIT;
864
865 out:
866         ll_dir_chain_fini(&chain);
867         spin_lock(&lli->lli_lock);
868         thread->t_flags = SVC_STOPPED;
869         spin_unlock(&lli->lli_lock);
870         cfs_waitq_signal(&sai->sai_waitq);
871         cfs_waitq_signal(&thread->t_ctl_waitq);
872         ll_sai_put(sai);
873         dput(parent);
874         CDEBUG(D_READA, "statahead thread stopped, pid %d\n",
875                cfs_curproc_pid());
876         return rc;
877 }
878
879 /**
880  * called in ll_file_release().
881  */
882 void ll_stop_statahead(struct inode *inode, void *key)
883 {
884         struct ll_inode_info *lli = ll_i2info(inode);
885         struct ptlrpc_thread *thread;
886
887         spin_lock(&lli->lli_lock);
888         if (lli->lli_opendir_pid == 0 ||
889             unlikely(lli->lli_opendir_key != key)) {
890                 spin_unlock(&lli->lli_lock);
891                 return;
892         }
893
894         lli->lli_opendir_key = NULL;
895         lli->lli_opendir_pid = 0;
896
897         if (lli->lli_sai) {
898                 struct l_wait_info lwi = { 0 };
899
900                 thread = &lli->lli_sai->sai_thread;
901                 if (!sa_is_stopped(lli->lli_sai)) {
902                         thread->t_flags = SVC_STOPPING;
903                         spin_unlock(&lli->lli_lock);
904                         cfs_waitq_signal(&thread->t_ctl_waitq);
905
906                         CDEBUG(D_READA, "stopping statahead thread, pid %d\n",
907                                cfs_curproc_pid());
908                         l_wait_event(thread->t_ctl_waitq,
909                                      sa_is_stopped(lli->lli_sai),
910                                      &lwi);
911                 } else {
912                         spin_unlock(&lli->lli_lock);
913                 }
914
915                 /*
916                  * Put the ref which was held when first statahead_enter.
917                  * It maybe not the last ref for some statahead requests
918                  * maybe inflight.
919                  */
920                 ll_sai_put(lli->lli_sai);
921                 return;
922         }
923         spin_unlock(&lli->lli_lock);
924 }
925
/*
 * Result of comparing a dentry with the first entry of its directory.
 * LS_FIRST_DOT_DE has to stay equal to LS_FIRST_DE + 1, because
 * is_first_dirent() computes its result as "LS_FIRST_DE + dot_de".
 */
enum {
        /**
         * not the first dirent, or is "."
         */
        LS_NONE_FIRST_DE = 0,
        /**
         * the first dirent, and it is not hidden
         */
        LS_FIRST_DE      = 1,
        /**
         * the first dirent, and it is hidden, that is ".xxx"
         */
        LS_FIRST_DOT_DE  = 2
};
940
941 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
942 {
943         struct ll_dir_chain chain;
944         struct qstr        *target = &dentry->d_name;
945         struct page        *page;
946         __u64               pos = 0;
947         int                 dot_de;
948         int                 rc = LS_NONE_FIRST_DE;
949         ENTRY;
950
951         ll_dir_chain_init(&chain);
952         page = ll_get_dir_page(dir, pos, 0, &chain);
953
954         while (1) {
955                 struct lu_dirpage *dp;
956                 struct lu_dirent  *ent;
957
958                 if (IS_ERR(page)) {
959                         rc = PTR_ERR(page);
960                         CERROR("error reading dir "DFID" at %llu: rc %d\n",
961                                PFID(ll_inode2fid(dir)), pos, rc);
962                         break;
963                 }
964
965                 dp = page_address(page);
966                 for (ent = lu_dirent_start(dp); ent != NULL;
967                      ent = lu_dirent_next(ent)) {
968                         char *name = ent->lde_name;
969                         int namelen = le16_to_cpu(ent->lde_namelen);
970
971                         if (namelen == 0)
972                                 /*
973                                  * skip dummy record.
974                                  */
975                                 continue;
976
977                         if (name[0] == '.') {
978                                 if (namelen == 1)
979                                         /*
980                                          * skip "."
981                                          */
982                                         continue;
983                                 else if (name[1] == '.' && namelen == 2)
984                                         /*
985                                          * skip ".."
986                                          */
987                                         continue;
988                                 else
989                                         dot_de = 1;
990                         } else {
991                                 dot_de = 0;
992                         }
993
994                         if (dot_de && target->name[0] != '.') {
995                                 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
996                                        target->len, target->name,
997                                        namelen, name);
998                                 continue;
999                         }
1000
1001                         if (target->len == namelen &&
1002                             !strncmp(target->name, name, target->len))
1003                                 rc = LS_FIRST_DE + dot_de;
1004                         else
1005                                 rc = LS_NONE_FIRST_DE;
1006                         ll_put_page(page);
1007                         GOTO(out, rc);
1008                 }
1009                 pos = le64_to_cpu(dp->ldp_hash_end);
1010                 ll_put_page(page);
1011                 if (pos == DIR_END_OFF) {
1012                         /*
1013                          * End of directory reached.
1014                          */
1015                         break;
1016                 } else if (1) {
1017                         /*
1018                          * chain is exhausted 
1019                          * Normal case: continue to the next page.
1020                          */
1021                         page = ll_get_dir_page(dir, pos, 1, &chain);
1022                 } else {
1023                         /*
1024                          * go into overflow page.
1025                          */
1026                 }
1027         }
1028         EXIT;
1029
1030 out:
1031         ll_dir_chain_fini(&chain);
1032         return rc;
1033 }
1034
1035 /**
1036  * Start statahead thread if this is the first dir entry.
1037  * Otherwise if a thread is started already, wait it until it is ahead of me.
1038  * \retval 0       -- stat ahead thread process such dentry, for lookup, it miss
1039  * \retval 1       -- stat ahead thread process such dentry, for lookup, it hit
1040  * \retval -EEXIST -- stat ahead thread started, and this is the first dentry
1041  * \retval -EBADFD -- statahead thread exit and not dentry available
1042  * \retval others  -- error
1043  */
1044 int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
1045 {
1046         struct ll_sb_info        *sbi = ll_i2sbi(dir);
1047         struct ll_inode_info     *lli = ll_i2info(dir);
1048         struct ll_statahead_info *sai = lli->lli_sai;
1049         struct ll_sa_thread_args  sta;
1050         struct l_wait_info        lwi = { 0 };
1051         int                       rc;
1052         ENTRY;
1053
1054         LASSERT(lli->lli_opendir_pid == cfs_curproc_pid());
1055
1056         if (sai) {
1057                 if (unlikely(sa_is_stopped(sai) &&
1058                              list_empty(&sai->sai_entries_stated)))
1059                         RETURN(-EBADFD);
1060
1061                 /*
1062                  * skip the first dentry.
1063                  */
1064                 if (unlikely((*dentryp)->d_name.len ==
1065                              sai->sai_first->d_name.len &&
1066                              !strncmp((*dentryp)->d_name.name,
1067                                       sai->sai_first->d_name.name,
1068                                       sai->sai_first->d_name.len)))
1069                         RETURN(-EEXIST);
1070
1071                 if ((*dentryp)->d_name.name[0] == '.') {
1072                         if (likely(sai->sai_ls_all ||
1073                             sai->sai_miss_hidden >= sai->sai_skip_hidden)) {
1074                                 /*
1075                                  * Hidden dentry is the first one, or statahead
1076                                  * thread does not skip so many hidden dentries
1077                                  * before "sai_ls_all" enabled as below.
1078                                  */
1079                         } else {
1080                                 if (!sai->sai_ls_all)
1081                                         /*
1082                                          * It maybe because hidden dentry is not
1083                                          * the first one, "sai_ls_all" was not
1084                                          * set, then "ls -al" missed. Enable
1085                                          * "sai_ls_all" for such case.
1086                                          */
1087                                         sai->sai_ls_all = 1;
1088
1089                                 /*
1090                                  * Such "getattr" has been skipped before
1091                                  * "sai_ls_all" enabled as above.
1092                                  */
1093                                 sai->sai_miss_hidden++;
1094                                 RETURN(-ENOENT);
1095                         }
1096                 }
1097
1098                 if (ll_sai_entry_stated(sai)) {
1099                         sbi->ll_sa_cached++;
1100                 } else {
1101                         sbi->ll_sa_blocked++;
1102                         /*
1103                          * thread started already, avoid double-stat.
1104                          */
1105                         l_wait_event(sai->sai_waitq,
1106                                      ll_sai_entry_stated(sai) || sa_is_stopped(sai),
1107                                      &lwi);
1108                 }
1109
1110                 if (lookup) {
1111                         struct dentry *result;
1112
1113                         result = d_lookup((*dentryp)->d_parent,
1114                                           &(*dentryp)->d_name);
1115                         if (result) {
1116                                 LASSERT(result != *dentryp);
1117                                 dput(*dentryp);
1118                                 *dentryp = result;
1119                                 RETURN(1);
1120                         }
1121                 }
1122                 /*
1123                  * do nothing for revalidate.
1124                  */
1125                 RETURN(0);
1126         }
1127
1128          /*
1129           * I am the "lli_opendir_pid" owner, only me can set "lli_sai".
1130           */ 
1131         LASSERT(lli->lli_sai == NULL);
1132
1133         rc = is_first_dirent(dir, *dentryp);
1134         if (rc == LS_NONE_FIRST_DE) {
1135                 /*
1136                  * It is not "ls -{a}l" operation, no need statahead for it.
1137                  */
1138                 spin_lock(&lli->lli_lock);
1139                 lli->lli_opendir_key = NULL;
1140                 lli->lli_opendir_pid = 0;
1141                 spin_unlock(&lli->lli_lock);
1142                 RETURN(-EBADF);
1143         }
1144
1145         sai = ll_sai_alloc();
1146         if (sai == NULL)
1147                 RETURN(-ENOMEM);
1148
1149         sai->sai_inode  = igrab(dir);
1150         sai->sai_first = dget(*dentryp);
1151         sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
1152
1153         sta.sta_parent = (*dentryp)->d_parent;
1154         sta.sta_pid    = cfs_curproc_pid();
1155
1156         lli->lli_sai = sai;
1157         rc = cfs_kernel_thread(ll_statahead_thread, &sta, 0);
1158         if (rc < 0) {
1159                 CERROR("can't start ll_sa thread, rc: %d\n", rc);
1160                 sai->sai_thread.t_flags = SVC_STOPPED;
1161                 ll_sai_put(sai);
1162                 LASSERT(lli->lli_sai == NULL);
1163                 RETURN(rc);
1164         }
1165
1166         l_wait_event(sai->sai_thread.t_ctl_waitq, 
1167                      sa_is_running(sai) || sa_is_stopped(sai),
1168                      &lwi);
1169
1170         /*
1171          * We don't stat-ahead for the first dirent since we are already in
1172          * lookup, and -EEXIST also indicates that this is the first dirent.
1173          */
1174         RETURN(-EEXIST);
1175 }
1176
1177 /**
1178  * update hit/miss count.
1179  */
1180 int ll_statahead_exit(struct dentry *dentry, int result)
1181 {
1182         struct dentry         *parent = dentry->d_parent;
1183         struct ll_inode_info  *lli = ll_i2info(parent->d_inode);
1184         struct ll_sb_info     *sbi = ll_i2sbi(parent->d_inode);
1185         struct ll_dentry_data *ldd = ll_d2d(dentry);
1186         ENTRY;
1187
1188         if (lli->lli_opendir_pid != cfs_curproc_pid())
1189                 RETURN(-EBADFD);
1190
1191         if (lli->lli_sai) {
1192                 struct ll_statahead_info *sai = lli->lli_sai;
1193
1194                 if (result >= 1) {
1195                         sbi->ll_sa_hit++;
1196                         sai->sai_hit++;
1197                         sai->sai_consecutive_miss = 0;
1198                         sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
1199                 } else {
1200                         sbi->ll_sa_miss++;
1201                         sai->sai_miss++;
1202                         sai->sai_consecutive_miss++;
1203                         if (sa_low_hit(sai) && sa_is_running(sai)) {
1204                                 sbi->ll_sa_wrong++;
1205                                 CDEBUG(D_READA, "statahead for dir %.*s hit "
1206                                        "ratio too low: hit/miss %u/%u, "
1207                                        "sent/replied %u/%u. stopping statahead "
1208                                        "thread: pid %d\n",
1209                                        parent->d_name.len, parent->d_name.name,
1210                                        sai->sai_hit, sai->sai_miss,
1211                                        sai->sai_sent, sai->sai_replied,
1212                                        cfs_curproc_pid());
1213                                 spin_lock(&lli->lli_lock);
1214                                 if (!sa_is_stopped(sai))
1215                                         sai->sai_thread.t_flags = SVC_STOPPING;
1216                                 spin_unlock(&lli->lli_lock);
1217                         }
1218                 }
1219
1220                 if (!sa_is_stopped(sai))
1221                         cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
1222                 ll_sai_entry_fini(sai);
1223
1224                 if (unlikely(ldd == NULL)) {
1225                         ll_set_dd(dentry);
1226                         ldd = ll_d2d(dentry);
1227                         if (ldd != NULL && dentry->d_op == NULL) {
1228                                 lock_dentry(dentry);
1229                                 dentry->d_op = dentry->d_op ? : &ll_sai_d_ops;
1230                                 unlock_dentry(dentry);
1231                         }
1232                 }
1233
1234                 if (likely(ldd != NULL))
1235                         ldd->lld_sa_generation = sai->sai_generation;
1236                 else
1237                         RETURN(-ENOMEM);
1238         }
1239         RETURN(0);
1240 }