Whamcloud - gitweb
Land b1_8_gate onto b1_8 (20081218_1708)
[fs/lustre-release.git] / lustre / llite / statahead.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #include <linux/fs.h>
38 #include <linux/sched.h>
39 #include <linux/mm.h>
40 #include <linux/smp_lock.h>
41 #include <linux/highmem.h>
42 #include <linux/pagemap.h>
43
44 #define DEBUG_SUBSYSTEM S_LLITE
45
46 #include <obd_support.h>
47 #include <lustre_lite.h>
48 #include <lustre_dlm.h>
49 #include <linux/lustre_version.h>
50 #include "llite_internal.h"
51
52 struct ll_sai_entry {
53         struct list_head        se_list;
54         unsigned int            se_index;
55         int                     se_stat;
56         struct ptlrpc_request  *se_req;
57         struct md_enqueue_info *se_minfo;
58 };
59
/* States kept in ll_sai_entry::se_stat. */
enum {
        /* initial state, also used when the reply carried an error */
        SA_ENTRY_UNSTATED = 0,
        /* the reply arrived without error */
        SA_ENTRY_STATED   = 1
};
64
65 static unsigned int sai_generation = 0;
66 static spinlock_t sai_generation_lock = SPIN_LOCK_UNLOCKED;
67
68 /**
69  * Check whether first entry was stated already or not.
70  * No need to hold lli_lock, for:
71  * (1) it is me that remove entry from the list
72  * (2) the statahead thread only add new entry to the list
73  */
74 static int ll_sai_entry_stated(struct ll_statahead_info *sai)
75 {
76         struct ll_sai_entry  *entry;
77         int                   rc = 0;
78
79         if (!list_empty(&sai->sai_entries_stated)) {
80                 entry = list_entry(sai->sai_entries_stated.next,
81                                    struct ll_sai_entry, se_list);
82                 if (entry->se_index == sai->sai_index_next)
83                         rc = 1;
84         }
85         return rc;
86 }
87
88 static inline int sa_received_empty(struct ll_statahead_info *sai)
89 {
90         return list_empty(&sai->sai_entries_received);
91 }
92
93 static inline int sa_not_full(struct ll_statahead_info *sai)
94 {
95         return (sai->sai_index < sai->sai_hit + sai->sai_miss + sai->sai_max);
96 }
97
98 static inline int sa_is_running(struct ll_statahead_info *sai)
99 {
100         return !!(sai->sai_thread.t_flags & SVC_RUNNING);
101 }
102
103 static inline int sa_is_stopping(struct ll_statahead_info *sai)
104 {
105         return !!(sai->sai_thread.t_flags & SVC_STOPPING);
106 }
107
108 static inline int sa_is_stopped(struct ll_statahead_info *sai)
109 {
110         return !!(sai->sai_thread.t_flags & SVC_STOPPED);
111 }
112
113 /**
114  * (1) hit ratio less than 80%
115  * or
116  * (2) consecutive miss more than 8
117  */
118 static inline int sa_low_hit(struct ll_statahead_info *sai)
119 {
120         return ((sai->sai_hit < 4 * sai->sai_miss && sai->sai_hit > 7) ||
121                 (sai->sai_consecutive_miss > 8));
122 }
123
124 /**
125  * process the deleted entry's member and free the entry.
126  * (1) release intent
127  * (2) free md_enqueue_info
128  * (3) drop dentry's ref count
129  * (4) release request's ref count
130  */
131 static void ll_sai_entry_cleanup(struct ll_sai_entry *entry, int free)
132 {
133         struct ptlrpc_request  *req = entry->se_req;
134         struct md_enqueue_info *minfo = entry->se_minfo;
135         ENTRY;
136
137         if (minfo) {
138                 struct dentry        *dentry = minfo->mi_dentry;
139                 struct lookup_intent *it = &minfo->mi_it;
140
141                 entry->se_minfo = NULL;
142                 ll_intent_release(it);
143                 OBD_FREE_PTR(minfo);
144                 dput(dentry);
145         }
146         if (req) {
147                 entry->se_req = NULL;
148                 ptlrpc_req_finished(req);
149         }
150         if (free) {
151                 LASSERT(list_empty(&entry->se_list));
152                 OBD_FREE_PTR(entry);
153         }
154
155         EXIT;
156 }
157
158 static struct ll_statahead_info *ll_sai_alloc(void)
159 {
160         struct ll_statahead_info *sai;
161
162         OBD_ALLOC_PTR(sai);
163         if (!sai)
164                 return NULL;
165
166         spin_lock(&sai_generation_lock);
167         sai->sai_generation = ++sai_generation;
168         if (unlikely(sai_generation == 0))
169                 sai->sai_generation = ++sai_generation;
170         spin_unlock(&sai_generation_lock);
171         atomic_set(&sai->sai_refcount, 1);
172         sai->sai_max = LL_SA_RPC_MIN;
173         cfs_waitq_init(&sai->sai_waitq);
174         cfs_waitq_init(&sai->sai_thread.t_ctl_waitq);
175         CFS_INIT_LIST_HEAD(&sai->sai_entries_sent);
176         CFS_INIT_LIST_HEAD(&sai->sai_entries_received);
177         CFS_INIT_LIST_HEAD(&sai->sai_entries_stated);
178         return sai;
179 }
180
181 static inline
182 struct ll_statahead_info *ll_sai_get(struct ll_statahead_info *sai)
183 {
184         LASSERT(sai);
185         atomic_inc(&sai->sai_refcount);
186         return sai;
187 }
188
189 static void ll_sai_put(struct ll_statahead_info *sai)
190 {
191         struct inode         *inode = sai->sai_inode;
192         struct ll_inode_info *lli;
193         ENTRY;
194
195         LASSERT(inode != NULL);
196         lli = ll_i2info(inode);
197         if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_lock)) {
198                 struct ll_sai_entry *entry, *next;
199
200                 LASSERT(lli->lli_opendir_key == NULL);
201                 lli->lli_sai = NULL;
202                 lli->lli_opendir_pid = 0;
203                 spin_unlock(&lli->lli_lock);
204
205                 LASSERT(sa_is_stopped(sai));
206
207                 if (sai->sai_sent > sai->sai_replied)
208                         CDEBUG(D_READA,"statahead for dir %lu/%u does not "
209                               "finish: [sent:%u] [replied:%u]\n",
210                               inode->i_ino, inode->i_generation,
211                               sai->sai_sent, sai->sai_replied);
212
213                 list_for_each_entry_safe(entry, next, &sai->sai_entries_sent,
214                                          se_list) {
215                         list_del_init(&entry->se_list);
216                         ll_sai_entry_cleanup(entry, 1);
217                 }
218                 list_for_each_entry_safe(entry, next, &sai->sai_entries_received,
219                                          se_list) {
220                         list_del_init(&entry->se_list);
221                         ll_sai_entry_cleanup(entry, 1);
222                 }
223                 list_for_each_entry_safe(entry, next, &sai->sai_entries_stated,
224                                          se_list) {
225                         list_del_init(&entry->se_list);
226                         ll_sai_entry_cleanup(entry, 1);
227                 }
228                 OBD_FREE_PTR(sai);
229                 iput(inode);
230         }
231         EXIT;
232 }
233
234 /**
235  * insert it into sai_entries_sent tail when init.
236  */
237 static struct ll_sai_entry *
238 ll_sai_entry_init(struct ll_statahead_info *sai, unsigned int index)
239 {
240         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
241         struct ll_sai_entry  *entry;
242         ENTRY;
243
244         OBD_ALLOC_PTR(entry);
245         if (entry == NULL)
246                 RETURN(ERR_PTR(-ENOMEM));
247
248         CDEBUG(D_READA, "alloc sai entry %p index %u\n",
249                entry, index);
250         entry->se_index = index;
251         entry->se_stat  = SA_ENTRY_UNSTATED;
252
253         spin_lock(&lli->lli_lock);
254         list_add_tail(&entry->se_list, &sai->sai_entries_sent);
255         spin_unlock(&lli->lli_lock);
256
257         RETURN(entry);
258 }
259
260 /**
261  * delete it from sai_entries_stated head when fini, it need not
262  * to process entry's member.
263  */
264 static void ll_sai_entry_fini(struct ll_statahead_info *sai)
265 {
266         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
267         struct ll_sai_entry  *entry;
268         ENTRY;
269
270         spin_lock(&lli->lli_lock);
271         sai->sai_index_next++;
272         if (likely(!list_empty(&sai->sai_entries_stated))) {
273                 entry = list_entry(sai->sai_entries_stated.next,
274                                    struct ll_sai_entry, se_list);
275                 if (entry->se_index < sai->sai_index_next) {
276                         list_del(&entry->se_list);
277                         OBD_FREE_PTR(entry);
278                 }
279         } else
280                 LASSERT(sa_is_stopped(sai));
281         spin_unlock(&lli->lli_lock);
282
283         EXIT;
284 }
285
286 /**
287  * inside lli_lock.
288  * \retval NULL : can not find the entry in sai_entries_sent with the index
289  * \retval entry: find the entry in sai_entries_sent with the index
290  */
291 static struct ll_sai_entry *
292 ll_sai_entry_set(struct ll_statahead_info *sai, unsigned int index, int stat,
293                  struct ptlrpc_request *req, struct md_enqueue_info *minfo)
294 {
295         struct ll_sai_entry *entry;
296         ENTRY;
297
298         if (!list_empty(&sai->sai_entries_sent)) {
299                 list_for_each_entry(entry, &sai->sai_entries_sent, se_list) {
300                         if (entry->se_index == index) {
301                                 entry->se_stat = stat;
302                                 entry->se_req = ptlrpc_request_addref(req);
303                                 entry->se_minfo = minfo;
304                                 RETURN(entry);
305                         } else if (entry->se_index > index)
306                                 RETURN(NULL);
307                 }
308         }
309         RETURN(NULL);
310 }
311
312 /**
313  * inside lli_lock.
314  * Move entry to sai_entries_received and
315  * insert it into sai_entries_received tail.
316  */
317 static inline void
318 ll_sai_entry_to_received(struct ll_statahead_info *sai, struct ll_sai_entry *entry)
319 {
320         if (!list_empty(&entry->se_list))
321                 list_del_init(&entry->se_list);
322         list_add_tail(&entry->se_list, &sai->sai_entries_received);
323 }
324
325 /**
326  * Move entry to sai_entries_stated and
327  * sort with the index.
328  */
329 static int
330 ll_sai_entry_to_stated(struct ll_statahead_info *sai, struct ll_sai_entry *entry)
331 {
332         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
333         struct ll_sai_entry  *se;
334         ENTRY;
335
336         ll_sai_entry_cleanup(entry, 0);
337
338         spin_lock(&lli->lli_lock);
339         if (!list_empty(&entry->se_list))
340                 list_del_init(&entry->se_list);
341
342         if (unlikely(entry->se_index < sai->sai_index_next)) {
343                 spin_unlock(&lli->lli_lock);
344                 OBD_FREE_PTR(entry);
345                 RETURN(0);
346         }
347
348         list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) {
349                 if (se->se_index < entry->se_index) {
350                         list_add(&entry->se_list, &se->se_list);
351                         spin_unlock(&lli->lli_lock);
352                         RETURN(1);
353                 }
354         }
355
356         /*
357          * I am the first entry.
358          */
359         list_add(&entry->se_list, &sai->sai_entries_stated);
360         spin_unlock(&lli->lli_lock);
361         RETURN(1);
362 }
363
364 /**
365  * finish lookup/revalidate.
366  */
367 static int do_statahead_interpret(struct ll_statahead_info *sai)
368 {
369         struct ll_inode_info   *lli = ll_i2info(sai->sai_inode);
370         struct ll_sai_entry    *entry;
371         struct ptlrpc_request  *req;
372         struct md_enqueue_info *minfo;
373         struct dentry          *dentry;
374         struct lookup_intent   *it;
375         int                     rc = 0;
376         ENTRY;
377
378         spin_lock(&lli->lli_lock);
379         LASSERT(!sa_received_empty(sai));
380         entry = list_entry(sai->sai_entries_received.next, struct ll_sai_entry,
381                            se_list);
382         list_del_init(&entry->se_list);
383         spin_unlock(&lli->lli_lock);
384
385         if (unlikely(entry->se_index < sai->sai_index_next)) {
386                 ll_sai_entry_cleanup(entry, 1);
387                 RETURN(0);
388         }
389
390         if (entry->se_stat != SA_ENTRY_STATED)
391                 GOTO(out, rc = entry->se_stat);
392
393         req = entry->se_req;
394         minfo = entry->se_minfo;
395         dentry = minfo->mi_dentry;
396         it = &minfo->mi_it;
397
398         if (dentry->d_inode == NULL) {
399                 /*
400                  * lookup.
401                  */
402                 struct dentry    *save = dentry;
403                 struct it_cb_data icbd = {
404                         .icbd_parent   = dentry->d_parent->d_inode,
405                         .icbd_childp   = &dentry
406                 };
407
408                 rc = lookup_it_finish(req, DLM_REPLY_REC_OFF, it, &icbd);
409                 if (!rc)
410                         /*
411                          * Here dentry->d_inode might be NULL,
412                          * because the entry may have been removed before
413                          * we start doing stat ahead.
414                          */
415                         ll_lookup_finish_locks(it, dentry);
416
417                 if (dentry != save) {
418                         minfo->mi_dentry = dentry;
419                         dput(save);
420                 }
421         } else {
422                 /*
423                  * revalidate.
424                  */
425                 struct mds_body *body;
426
427                 body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
428                                       sizeof(*body));
429                 if (memcmp(&minfo->mi_data.fid2, &body->fid1,
430                            sizeof(body->fid1))) {
431                         ll_unhash_aliases(dentry->d_inode);
432                         GOTO(out, rc = -EAGAIN);
433                 }
434
435                 rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, it, dentry);
436                 if (rc) {
437                         ll_unhash_aliases(dentry->d_inode);
438                         GOTO(out, rc);
439                 }
440
441                 spin_lock(&ll_lookup_lock);
442                 spin_lock(&dcache_lock);
443                 lock_dentry(dentry);
444                 __d_drop(dentry);
445 #ifdef DCACHE_LUSTRE_INVALID
446                 dentry->d_flags &= ~DCACHE_LUSTRE_INVALID;
447 #endif
448                 unlock_dentry(dentry);
449                 d_rehash_cond(dentry, 0);
450                 spin_unlock(&dcache_lock);
451                 spin_unlock(&ll_lookup_lock);
452
453                 ll_lookup_finish_locks(it, dentry);
454         }
455         EXIT;
456
457 out:
458         if (likely(ll_sai_entry_to_stated(sai, entry)))
459                 cfs_waitq_signal(&sai->sai_waitq);
460         return rc;
461 }
462
463 static int ll_statahead_interpret(struct obd_export *exp,
464                                   struct ptlrpc_request *req,
465                                   struct md_enqueue_info *minfo,
466                                   int rc)
467 {
468         struct dentry            *dentry = minfo->mi_dentry;
469         struct lookup_intent     *it = &minfo->mi_it;
470         struct inode             *dir = dentry->d_parent->d_inode;
471         struct ll_inode_info     *lli = ll_i2info(dir);
472         struct ll_statahead_info *sai;
473         struct ll_sai_entry      *entry;
474         ENTRY;
475
476         CDEBUG(D_READA, "interpret statahead %.*s rc %d\n",
477                dentry->d_name.len, dentry->d_name.name, rc);
478
479         spin_lock(&lli->lli_lock);
480         if (unlikely(lli->lli_sai == NULL ||
481                      lli->lli_sai->sai_generation != minfo->mi_generation)) {
482                 spin_unlock(&lli->lli_lock);
483                 ll_intent_release(it);
484                 dput(dentry);
485                 OBD_FREE_PTR(minfo);
486                 RETURN(-ESTALE);
487         } else {
488                 sai = ll_sai_get(lli->lli_sai);
489                 if (rc || dir == NULL)
490                         rc = -ESTALE;
491
492                 entry = ll_sai_entry_set(sai,
493                                          (unsigned int)(long)minfo->mi_cbdata,
494                                          rc ? SA_ENTRY_UNSTATED :
495                                          SA_ENTRY_STATED, req, minfo);
496                 LASSERT(entry != NULL);
497                 if (likely(sa_is_running(sai))) {
498                         ll_sai_entry_to_received(sai, entry);
499                         sai->sai_replied++;
500                         spin_unlock(&lli->lli_lock);
501                         cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
502                 } else {
503                         if (!list_empty(&entry->se_list))
504                                 list_del_init(&entry->se_list);
505                         sai->sai_replied++;
506                         spin_unlock(&lli->lli_lock);
507                         ll_sai_entry_cleanup(entry, 1);
508                 }
509                 ll_sai_put(sai);
510                 RETURN(rc);
511         }
512 }
513
/*
 * Free the argument pair built by sa_args_prep().  Both pointers must be
 * valid.
 */
static void sa_args_fini(struct md_enqueue_info *minfo,
                         struct ldlm_enqueue_info *einfo)
{
        LASSERT(minfo && einfo);

        OBD_FREE_PTR(einfo);
        OBD_FREE_PTR(minfo);
}
521
522 static int sa_args_prep(struct inode *dir, struct dentry *dentry,
523                         struct md_enqueue_info **pmi,
524                         struct ldlm_enqueue_info **pei)
525 {
526         struct ll_inode_info     *lli = ll_i2info(dir);
527         struct md_enqueue_info   *minfo;
528         struct ldlm_enqueue_info *einfo;
529
530         OBD_ALLOC_PTR(einfo);
531         if (einfo == NULL)
532                 return -ENOMEM;
533
534         OBD_ALLOC_PTR(minfo);
535         if (minfo == NULL) {
536                 OBD_FREE_PTR(einfo);
537                 return -ENOMEM;
538         }
539
540         minfo->mi_it.it_op = IT_GETATTR;
541         minfo->mi_dentry = dentry;
542         minfo->mi_cb = ll_statahead_interpret;
543         minfo->mi_generation = lli->lli_sai->sai_generation;
544         minfo->mi_cbdata = (void *)(long)lli->lli_sai->sai_index;
545
546         einfo->ei_type   = LDLM_IBITS;
547         einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
548         einfo->ei_cb_bl  = ll_mdc_blocking_ast;
549         einfo->ei_cb_cp  = ldlm_completion_ast;
550         einfo->ei_cb_gl  = NULL;
551         einfo->ei_cbdata = NULL;
552
553         *pmi = minfo;
554         *pei = einfo;
555
556         return 0;
557 }
558
559 /**
560  * similar to ll_lookup_it().
561  */
562 static int do_sa_lookup(struct inode *dir, struct dentry *dentry)
563 {
564         struct md_enqueue_info   *minfo;
565         struct ldlm_enqueue_info *einfo;
566         int                       rc;
567         ENTRY;
568
569         rc = sa_args_prep(dir, dentry, &minfo, &einfo);
570         if (rc)
571                 RETURN(rc);
572
573         rc = ll_prepare_mdc_op_data(&minfo->mi_data, dir, NULL,
574                                     dentry->d_name.name, dentry->d_name.len, 0,
575                                     NULL);
576         if (rc == 0)
577                 rc = mdc_intent_getattr_async(ll_i2mdcexp(dir), minfo, einfo);
578
579         if (rc)
580                 sa_args_fini(minfo, einfo);
581
582         RETURN(rc);
583 }
584
585 /**
586  * similar to ll_revalidate_it().
587  * \retval      1 -- dentry valid
588  * \retval      0 -- will send stat-ahead request
589  * \retval others -- prepare stat-ahead request failed
590  */
591 static int do_sa_revalidate(struct dentry *dentry)
592 {
593         struct inode             *inode = dentry->d_inode;
594         struct inode             *dir = dentry->d_parent->d_inode;
595         struct ll_fid             fid;
596         struct lookup_intent      it = { .it_op = IT_GETATTR };
597         struct md_enqueue_info   *minfo;
598         struct ldlm_enqueue_info *einfo;
599         int rc;
600         ENTRY;
601
602         if (inode == NULL)
603                 RETURN(1);
604
605         if (d_mountpoint(dentry))
606                 RETURN(1);
607
608         if (dentry == dentry->d_sb->s_root)
609                 RETURN(1);
610
611         ll_inode2fid(&fid, inode);
612
613         rc = mdc_revalidate_lock(ll_i2mdcexp(dir), &it, &fid);
614         if (rc == 1) {
615                 ll_intent_release(&it);
616                 RETURN(1);
617         }
618
619         rc = sa_args_prep(dir, dentry, &minfo, &einfo);
620         if (rc)
621                 RETURN(rc);
622
623         rc = ll_prepare_mdc_op_data(&minfo->mi_data, dentry->d_parent->d_inode,
624                                     inode, dentry->d_name.name,
625                                     dentry->d_name.len, 0, NULL);
626         if (rc == 0)
627                 rc = mdc_intent_getattr_async(ll_i2mdcexp(dir), minfo, einfo);
628
629         if (rc)
630                 sa_args_fini(minfo, einfo);
631
632         RETURN(rc);
633 }
634
635 static inline void ll_name2qstr(struct qstr *q, const char *name, int namelen)
636 {
637         q->name = name;
638         q->len  = namelen;
639         q->hash = full_name_hash(name, namelen);
640 }
641
642 static int ll_statahead_one(struct dentry *parent, struct ll_dir_entry *de)
643 {
644         struct inode             *dir = parent->d_inode;
645         struct ll_inode_info     *lli = ll_i2info(dir);
646         struct ll_statahead_info *sai = lli->lli_sai;
647         struct qstr               name;
648         struct dentry            *dentry;
649         struct ll_sai_entry      *se;
650         int                       rc;
651         ENTRY;
652
653 #ifdef DCACHE_LUSTRE_INVALID
654         if (parent->d_flags & DCACHE_LUSTRE_INVALID) {
655 #else
656         if (d_unhashed(parent)) {
657 #endif
658                 CDEBUG(D_READA, "parent dentry@%p %.*s is "
659                        "invalid, skip statahead\n",
660                        parent, parent->d_name.len, parent->d_name.name);
661                 RETURN(-EINVAL);
662         }
663
664         se = ll_sai_entry_init(sai, sai->sai_index);
665         if (IS_ERR(se))
666                 RETURN(PTR_ERR(se));
667
668         ll_name2qstr(&name, de->lde_name, de->lde_name_len);
669         dentry = d_lookup(parent, &name);
670         if (!dentry) {
671                 dentry = d_alloc(parent, &name);
672                 if (dentry) {
673                         rc = do_sa_lookup(dir, dentry);
674                         if (rc)
675                                 dput(dentry);
676                 } else {
677                         GOTO(out, rc = -ENOMEM);
678                 }
679         } else {
680                 rc = do_sa_revalidate(dentry);
681                 if (rc)
682                         dput(dentry);
683         }
684
685         EXIT;
686
687 out:
688         if (rc) {
689                 CDEBUG(D_READA, "set sai entry %p index %u stat %d rc %d\n",
690                        se, se->se_index, se->se_stat, rc);
691                 se->se_stat = rc;
692                 if (ll_sai_entry_to_stated(sai, se))
693                         cfs_waitq_signal(&sai->sai_waitq);
694         } else {
695                 sai->sai_sent++;
696         }
697
698         sai->sai_index++;
699         return rc;
700 }
701
702 struct ll_sa_thread_args {
703         struct dentry   *sta_parent;
704         pid_t            sta_pid;
705 };
706
707 static int ll_statahead_thread(void *arg)
708 {
709         struct ll_sa_thread_args *sta = arg;
710         struct dentry            *parent = dget(sta->sta_parent);
711         struct inode             *dir = parent->d_inode;
712         struct ll_inode_info     *lli = ll_i2info(dir);
713         struct ll_sb_info        *sbi = ll_i2sbi(dir);
714         struct ll_statahead_info *sai;
715         struct ptlrpc_thread     *thread;
716         unsigned long             index = 0;
717         int                       first = 0;
718         int                       rc = 0;
719         ENTRY;
720
721         spin_lock(&lli->lli_lock);
722         if (unlikely(lli->lli_sai == NULL)) {
723                 spin_unlock(&lli->lli_lock);
724                 dput(parent);
725                 RETURN(-EAGAIN);
726         } else {
727                 sai = ll_sai_get(lli->lli_sai);
728                 spin_unlock(&lli->lli_lock);
729         }
730
731         {
732                 char pname[16];
733                 snprintf(pname, 15, "ll_sa_%u", sta->sta_pid);
734                 cfs_daemonize(pname);
735         }
736
737         thread = &sai->sai_thread;
738         sbi->ll_sa_total++;
739         spin_lock(&lli->lli_lock);
740         thread->t_flags = SVC_RUNNING;
741         spin_unlock(&lli->lli_lock);
742         cfs_waitq_signal(&thread->t_ctl_waitq);
743         CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name);
744
745         while (1) {
746                 struct l_wait_info lwi = { 0 };
747                 unsigned long npages;
748                 char *kaddr, *limit;
749                 struct ll_dir_entry *de;
750                 struct page *page;
751
752                 npages = dir_pages(dir);
753                 /*
754                  * reach the end of dir.
755                  */
756                 if (index >= npages) {
757                         CDEBUG(D_READA, "reach end, index/npages %lu/%lu\n",
758                                index, npages);
759
760                         while (1) {
761                                 l_wait_event(thread->t_ctl_waitq,
762                                              !sa_is_running(sai) ||
763                                              !sa_received_empty(sai) ||
764                                              sai->sai_sent == sai->sai_replied,
765                                              &lwi);
766                                 if (!sa_received_empty(sai) &&
767                                     sa_is_running(sai))
768                                         do_statahead_interpret(sai);
769                                 else
770                                         GOTO(out, rc);
771                         }
772                 }
773
774                 page = ll_get_dir_page(dir, index);
775                 if (IS_ERR(page)) {
776                         rc = PTR_ERR(page);
777                         CERROR("error reading dir %lu/%u page %lu/%u: rc %d\n",
778                                dir->i_ino, dir->i_generation, index,
779                                sai->sai_index, rc);
780                         break;
781                 }
782
783                 kaddr = page_address(page);
784                 limit = kaddr + CFS_PAGE_SIZE - ll_dir_rec_len(1);
785                 de = (struct ll_dir_entry *)kaddr;
786                 if (!index) {
787                         /*
788                          * skip "."
789                          */
790                         de = ll_dir_next_entry(de);
791                         /*
792                          * skip ".."
793                          */
794                         de = ll_dir_next_entry(de);
795                 }
796
797                 for (; (char*)de <= limit; de = ll_dir_next_entry(de)) {
798                         if (de->lde_inode == 0)
799                                 continue;
800
801                         if (de->lde_name[0] == '.' && !sai->sai_ls_all) {
802                                 /*
803                                  * skip hidden files..
804                                  */
805                                 sai->sai_skip_hidden++;
806                                 continue;
807                         }
808
809                         /*
810                          * don't stat-ahead first entry.
811                          */
812                         if (unlikely(!first)) {
813                                 first++;
814                                 continue;
815                         }
816
817 keep_de:
818                         l_wait_event(thread->t_ctl_waitq,
819                                      !sa_is_running(sai) || sa_not_full(sai) ||
820                                      !sa_received_empty(sai),
821                                      &lwi);
822
823                         while (!sa_received_empty(sai) && sa_is_running(sai))
824                                 do_statahead_interpret(sai);
825
826                         if (unlikely(!sa_is_running(sai))) {
827                                 ll_put_page(page);
828                                 GOTO(out, rc);
829                         }
830
831                         if (!sa_not_full(sai))
832                                 /*
833                                  * do not skip the current de.
834                                  */
835                                 goto keep_de;
836
837                         rc = ll_statahead_one(parent, de);
838                         if (rc < 0) {
839                                 ll_put_page(page);
840                                 GOTO(out, rc);
841                         }
842                 }
843                 ll_put_page(page);
844                 index++;
845         }
846         EXIT;
847
848 out:
849         spin_lock(&lli->lli_lock);
850         thread->t_flags = SVC_STOPPED;
851         spin_unlock(&lli->lli_lock);
852         cfs_waitq_signal(&sai->sai_waitq);
853         cfs_waitq_signal(&thread->t_ctl_waitq);
854         ll_sai_put(sai);
855         dput(parent);
856         CDEBUG(D_READA, "statahead thread stopped, pid %d\n",
857                cfs_curproc_pid());
858         return rc;
859 }
860
861 /**
862  * called in ll_file_release().
863  */
864 void ll_stop_statahead(struct inode *inode, void *key)
865 {
866         struct ll_inode_info *lli = ll_i2info(inode);
867
868         if (unlikely(key == NULL))
869                 return;
870
871         spin_lock(&lli->lli_lock);
872         if (lli->lli_opendir_key != key || lli->lli_opendir_pid == 0) {
873                 spin_unlock(&lli->lli_lock);
874                 return;
875         }
876
877         lli->lli_opendir_key = NULL;
878
879         if (lli->lli_sai) {
880                 struct l_wait_info lwi = { 0 };
881                 struct ptlrpc_thread *thread = &lli->lli_sai->sai_thread;
882
883                 if (!sa_is_stopped(lli->lli_sai)) {
884                         thread->t_flags = SVC_STOPPING;
885                         spin_unlock(&lli->lli_lock);
886                         cfs_waitq_signal(&thread->t_ctl_waitq);
887
888                         CDEBUG(D_READA, "stopping statahead thread, pid %d\n",
889                                cfs_curproc_pid());
890                         l_wait_event(thread->t_ctl_waitq,
891                                      sa_is_stopped(lli->lli_sai),
892                                      &lwi);
893                 } else {
894                         spin_unlock(&lli->lli_lock);
895                 }
896
897                 /*
898                  * Put the ref which was held when first statahead_enter.
899                  * It maybe not the last ref for some statahead requests
900                  * maybe inflight.
901                  */
902                 ll_sai_put(lli->lli_sai);
903         } else {
904                 lli->lli_opendir_pid = 0;
905                 spin_unlock(&lli->lli_lock);
906         }
907 }
908
/*
 * Result of is_first_dirent().  The values are spelled out because
 * is_first_dirent() computes its result as "LS_FIRST_DE + dot_de", which
 * relies on LS_FIRST_DOT_DE == LS_FIRST_DE + 1.
 */
enum {
        /* not the first dirent, or it is "." */
        LS_NONE_FIRST_DE = 0,
        /* the first non-hidden dirent */
        LS_FIRST_DE      = 1,
        /* the first hidden dirent, that is ".xxx" */
        LS_FIRST_DOT_DE  = 2
};
923
924 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
925 {
926         struct qstr         *d_name = &dentry->d_name;
927         unsigned long        npages, index = 0;
928         struct page         *page;
929         struct ll_dir_entry *de;
930         char                *kaddr, *limit;
931         int                  rc = LS_NONE_FIRST_DE, dot_de;
932         ENTRY;
933
934         while (1) {
935                 npages = dir_pages(dir);
936                 /*
937                  * reach the end of dir.
938                  */
939                 if (index >= npages) {
940                         CDEBUG(D_READA, "reach end, index/npages %lu/%lu\n",
941                                index, npages);
942                         break;
943                 }
944
945                 page = ll_get_dir_page(dir, index);
946                 if (IS_ERR(page)) {
947                         rc = PTR_ERR(page);
948                         CERROR("error reading dir %lu/%u page %lu: rc %d\n",
949                                dir->i_ino, dir->i_generation, index, rc);
950                         break;
951                 }
952
953                 kaddr = page_address(page);
954                 limit = kaddr + CFS_PAGE_SIZE - ll_dir_rec_len(1);
955                 de = (struct ll_dir_entry *)kaddr;
956                 if (!index) {
957                         if (unlikely(!(de->lde_name_len == 1 &&
958                                        strncmp(de->lde_name, ".", 1) == 0)))
959                                 CWARN("Maybe got bad on-disk dir: %lu/%u\n",
960                                       dir->i_ino, dir->i_generation);
961                         /*
962                          * skip "." or ingore bad entry.
963                          */
964                         de = ll_dir_next_entry(de);
965
966                         if (unlikely(!(de->lde_name_len == 2 &&
967                                        strncmp(de->lde_name, "..", 2) == 0)))
968                                 CWARN("Maybe got bad on-disk dir: %lu/%u\n",
969                                       dir->i_ino, dir->i_generation);
970                         /*
971                          * skip ".." or ingore bad entry.
972                          */
973                         de = ll_dir_next_entry(de);
974                 }
975
976                 for (; (char*)de <= limit; de = ll_dir_next_entry(de)) {
977                         if (!de->lde_inode)
978                                 continue;
979
980                         if (de->lde_name[0] == '.')
981                                 dot_de = 1;
982                         else
983                                 dot_de = 0;
984
985                         if (dot_de && d_name->name[0] != '.') {
986                                 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
987                                        d_name->len, d_name->name,
988                                        de->lde_name_len, de->lde_name);
989                                 continue;
990                         }
991
992                         if (d_name->len == de->lde_name_len &&
993                             !strncmp(d_name->name, de->lde_name, d_name->len))
994                                 rc = LS_FIRST_DE + dot_de;
995                         else
996                                 rc = LS_NONE_FIRST_DE;
997                         ll_put_page(page);
998                         RETURN(rc);
999                 }
1000                 ll_put_page(page);
1001                 index++;
1002         }
1003         RETURN(rc);
1004 }
1005
1006 /**
1007  * Start statahead thread if this is the first dir entry.
1008  * Otherwise if a thread is started already, wait it until it is ahead of me.
1009  * \retval 0       -- stat ahead thread process such dentry, for lookup, it miss
1010  * \retval 1       -- stat ahead thread process such dentry, for lookup, it hit
1011  * \retval -EEXIST -- stat ahead thread started, and this is the first dentry
1012  * \retval -EBADFD -- statahead thread exit and not dentry available
1013  * \retval -EAGAIN -- try to stat by caller
1014  * \retval others  -- error
1015  */
1016 int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
1017 {
1018         struct ll_sb_info        *sbi = ll_i2sbi(dir);
1019         struct ll_inode_info     *lli = ll_i2info(dir);
1020         struct ll_statahead_info *sai = lli->lli_sai;
1021         struct ll_sa_thread_args  sta;
1022         struct l_wait_info        lwi = { 0 };
1023         int                       rc;
1024         ENTRY;
1025
1026         LASSERT(lli->lli_opendir_pid == cfs_curproc_pid());
1027
1028         if (sai) {
1029                 if (unlikely(sa_is_stopped(sai) &&
1030                              list_empty(&sai->sai_entries_stated)))
1031                         RETURN(-EBADFD);
1032
1033                 if ((*dentryp)->d_name.name[0] == '.') {
1034                         if (likely(sai->sai_ls_all ||
1035                             sai->sai_miss_hidden >= sai->sai_skip_hidden)) {
1036                                 /*
1037                                  * Hidden dentry is the first one, or statahead
1038                                  * thread does not skip so many hidden dentries
1039                                  * before "sai_ls_all" enabled as below.
1040                                  */
1041                         } else {
1042                                 if (!sai->sai_ls_all)
1043                                         /*
1044                                          * It maybe because hidden dentry is not
1045                                          * the first one, "sai_ls_all" was not
1046                                          * set, then "ls -al" missed. Enable
1047                                          * "sai_ls_all" for such case.
1048                                          */
1049                                         sai->sai_ls_all = 1;
1050
1051                                 /*
1052                                  * Such "getattr" has been skipped before
1053                                  * "sai_ls_all" enabled as above.
1054                                  */
1055                                 sai->sai_miss_hidden++;
1056                                 RETURN(-ENOENT);
1057                         }
1058                 }
1059
1060                 if (ll_sai_entry_stated(sai)) {
1061                         sbi->ll_sa_cached++;
1062                 } else {
1063                         sbi->ll_sa_blocked++;
1064                         /*
1065                          * thread started already, avoid double-stat.
1066                          */
1067                         l_wait_event(sai->sai_waitq,
1068                                      ll_sai_entry_stated(sai) || sa_is_stopped(sai),
1069                                      &lwi);
1070                 }
1071
1072                 if (lookup) {
1073                         struct dentry *result;
1074
1075                         result = d_lookup((*dentryp)->d_parent,
1076                                           &(*dentryp)->d_name);
1077                         if (result) {
1078                                 LASSERT(result != *dentryp);
1079                                 /* BUG 16303: do not drop reference count for
1080                                  * "*dentryp", VFS will do that by itself. */
1081                                 *dentryp = result;
1082                                 RETURN(1);
1083                         }
1084                 }
1085                 /*
1086                  * do nothing for revalidate.
1087                  */
1088                 RETURN(0);
1089         }
1090
1091          /*
1092           * I am the "lli_opendir_pid" owner, only me can set "lli_sai".
1093           */
1094         LASSERT(lli->lli_sai == NULL);
1095
1096         rc = is_first_dirent(dir, *dentryp);
1097         if (rc == LS_NONE_FIRST_DE) {
1098                 /*
1099                  * It is not "ls -{a}l" operation, no need statahead for it.
1100                  */
1101                 spin_lock(&lli->lli_lock);
1102                 lli->lli_opendir_key = NULL;
1103                 lli->lli_opendir_pid = 0;
1104                 spin_unlock(&lli->lli_lock);
1105                 RETURN(-EBADF);
1106         }
1107
1108         sai = ll_sai_alloc();
1109         if (sai == NULL)
1110                 RETURN(-ENOMEM);
1111
1112         sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
1113         sai->sai_inode = igrab(dir);
1114         if (unlikely(sai->sai_inode == NULL)) {
1115                 CWARN("Do not start stat ahead on dying inode %lu/%u.\n",
1116                       dir->i_ino, dir->i_generation);
1117                 OBD_FREE_PTR(sai);
1118                 RETURN(-ESTALE);
1119         }
1120
1121         LASSERT(sai->sai_inode == (*dentryp)->d_parent->d_inode);
1122
1123         sta.sta_parent = (*dentryp)->d_parent;
1124         sta.sta_pid    = cfs_curproc_pid();
1125
1126         lli->lli_sai = sai;
1127         rc = cfs_kernel_thread(ll_statahead_thread, &sta, 0);
1128         if (rc < 0) {
1129                 CERROR("can't start ll_sa thread, rc: %d\n", rc);
1130                 lli->lli_opendir_key = NULL;
1131                 sai->sai_thread.t_flags = SVC_STOPPED;
1132                 ll_sai_put(sai);
1133                 LASSERT(lli->lli_sai == NULL);
1134                 RETURN(-EAGAIN);
1135         }
1136
1137         l_wait_event(sai->sai_thread.t_ctl_waitq,
1138                      sa_is_running(sai) || sa_is_stopped(sai),
1139                      &lwi);
1140
1141         /*
1142          * We don't stat-ahead for the first dirent since we are already in
1143          * lookup, and -EEXIST also indicates that this is the first dirent.
1144          */
1145         RETURN(-EEXIST);
1146 }
1147
1148 /**
1149  * update hit/miss count.
1150  */
1151 void ll_statahead_exit(struct dentry *dentry, int result)
1152 {
1153         struct dentry            *parent = dentry->d_parent;
1154         struct ll_inode_info     *lli = ll_i2info(parent->d_inode);
1155         struct ll_sb_info        *sbi = ll_i2sbi(parent->d_inode);
1156         struct ll_statahead_info *sai = lli->lli_sai;
1157         struct ll_dentry_data    *ldd = ll_d2d(dentry);
1158         ENTRY;
1159
1160         if (lli->lli_opendir_pid == cfs_curproc_pid() && sai) {
1161                 if (result >= 1) {
1162                         sbi->ll_sa_hit++;
1163                         sai->sai_hit++;
1164                         sai->sai_consecutive_miss = 0;
1165                         sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
1166                 } else {
1167                         sbi->ll_sa_miss++;
1168                         sai->sai_miss++;
1169                         sai->sai_consecutive_miss++;
1170                         if (sa_low_hit(sai) && sa_is_running(sai)) {
1171                                 sbi->ll_sa_wrong++;
1172                                 CDEBUG(D_READA, "statahead for dir %.*s hit "
1173                                        "ratio too low: hit/miss %u/%u, "
1174                                        "sent/replied %u/%u. stopping statahead "
1175                                        "thread: pid %d\n",
1176                                        parent->d_name.len, parent->d_name.name,
1177                                        sai->sai_hit, sai->sai_miss,
1178                                        sai->sai_sent, sai->sai_replied,
1179                                        cfs_curproc_pid());
1180                                 spin_lock(&lli->lli_lock);
1181                                 if (!sa_is_stopped(sai))
1182                                         sai->sai_thread.t_flags = SVC_STOPPING;
1183                                 spin_unlock(&lli->lli_lock);
1184                         }
1185                 }
1186
1187                 if (!sa_is_stopped(sai))
1188                         cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
1189                 ll_sai_entry_fini(sai);
1190                 if (likely(ldd != NULL))
1191                         ldd->lld_sa_generation = sai->sai_generation;
1192         }
1193         EXIT;
1194 }