Whamcloud - gitweb
b=18721
[fs/lustre-release.git] / lustre / llite / statahead.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #include <linux/fs.h>
38 #include <linux/sched.h>
39 #include <linux/mm.h>
40 #include <linux/smp_lock.h>
41 #include <linux/highmem.h>
42 #include <linux/pagemap.h>
43
44 #define DEBUG_SUBSYSTEM S_LLITE
45
46 #include <obd_support.h>
47 #include <lustre_lite.h>
48 #include <lustre_dlm.h>
49 #include <linux/lustre_version.h>
50 #include "llite_internal.h"
51
/**
 * One statahead entry: bookkeeping for a single directory entry that the
 * statahead thread has picked up.
 */
struct ll_sai_entry {
        /* linkage into one of sai_entries_sent/received/stated */
        struct list_head        se_list;
        /* index given to the entry when it is created (from sai_index) */
        unsigned int            se_index;
        /* SA_ENTRY_UNSTATED / SA_ENTRY_STATED, or the rc stored by
         * ll_statahead_one() when no request could be sent */
        int                     se_stat;
        /* reply request, referenced in ll_sai_entry_set() */
        struct ptlrpc_request  *se_req;
        /* enqueue info recorded in ll_sai_entry_set() */
        struct md_enqueue_info *se_minfo;
};
59
/* Values stored in ll_sai_entry::se_stat by the interpret callback. */
enum {
        /* initial state, also used when the async request failed */
        SA_ENTRY_UNSTATED = 0,
        /* the async request completed with rc == 0 */
        SA_ENTRY_STATED
};
64
/* Counter handed out to each new ll_statahead_info in ll_sai_alloc();
 * zero is skipped on wrap-around. */
static unsigned int sai_generation = 0;
/* Serializes updates of sai_generation. */
static spinlock_t sai_generation_lock = SPIN_LOCK_UNLOCKED;
67
68 /**
69  * Check whether first entry was stated already or not.
70  * No need to hold lli_lock, for:
71  * (1) it is me that remove entry from the list
72  * (2) the statahead thread only add new entry to the list
73  */
74 static int ll_sai_entry_stated(struct ll_statahead_info *sai)
75 {
76         struct ll_sai_entry  *entry;
77         int                   rc = 0;
78
79         if (!list_empty(&sai->sai_entries_stated)) {
80                 entry = list_entry(sai->sai_entries_stated.next,
81                                    struct ll_sai_entry, se_list);
82                 if (entry->se_index == sai->sai_index_next)
83                         rc = 1;
84         }
85         return rc;
86 }
87
88 static inline int sa_received_empty(struct ll_statahead_info *sai)
89 {
90         return list_empty(&sai->sai_entries_received);
91 }
92
93 static inline int sa_not_full(struct ll_statahead_info *sai)
94 {
95         return (sai->sai_index < sai->sai_hit + sai->sai_miss + sai->sai_max);
96 }
97
98 static inline int sa_is_running(struct ll_statahead_info *sai)
99 {
100         return !!(sai->sai_thread.t_flags & SVC_RUNNING);
101 }
102
103 static inline int sa_is_stopping(struct ll_statahead_info *sai)
104 {
105         return !!(sai->sai_thread.t_flags & SVC_STOPPING);
106 }
107
108 static inline int sa_is_stopped(struct ll_statahead_info *sai)
109 {
110         return !!(sai->sai_thread.t_flags & SVC_STOPPED);
111 }
112
113 /**
114  * (1) hit ratio less than 80%
115  * or
116  * (2) consecutive miss more than 8
117  */
118 static inline int sa_low_hit(struct ll_statahead_info *sai)
119 {
120         return ((sai->sai_hit < 4 * sai->sai_miss && sai->sai_hit > 7) ||
121                 (sai->sai_consecutive_miss > 8));
122 }
123
124 /**
125  * process the deleted entry's member and free the entry.
126  * (1) release intent
127  * (2) free md_enqueue_info
128  * (3) drop dentry's ref count
129  * (4) release request's ref count
130  */
131 static void ll_sai_entry_cleanup(struct ll_sai_entry *entry, int free)
132 {
133         struct ptlrpc_request  *req = entry->se_req;
134         struct md_enqueue_info *minfo = entry->se_minfo;
135         ENTRY;
136
137         if (minfo) {
138                 struct dentry        *dentry = minfo->mi_dentry;
139                 struct lookup_intent *it = &minfo->mi_it;
140
141                 entry->se_minfo = NULL;
142                 ll_intent_release(it);
143                 OBD_FREE_PTR(minfo);
144                 dput(dentry);
145         }
146         if (req) {
147                 entry->se_req = NULL;
148                 ptlrpc_req_finished(req);
149         }
150         if (free) {
151                 LASSERT(list_empty(&entry->se_list));
152                 OBD_FREE_PTR(entry);
153         }
154
155         EXIT;
156 }
157
158 static struct ll_statahead_info *ll_sai_alloc(void)
159 {
160         struct ll_statahead_info *sai;
161
162         OBD_ALLOC_PTR(sai);
163         if (!sai)
164                 return NULL;
165
166         spin_lock(&sai_generation_lock);
167         sai->sai_generation = ++sai_generation;
168         if (unlikely(sai_generation == 0))
169                 sai->sai_generation = ++sai_generation;
170         spin_unlock(&sai_generation_lock);
171         atomic_set(&sai->sai_refcount, 1);
172         sai->sai_max = LL_SA_RPC_MIN;
173         cfs_waitq_init(&sai->sai_waitq);
174         cfs_waitq_init(&sai->sai_thread.t_ctl_waitq);
175         CFS_INIT_LIST_HEAD(&sai->sai_entries_sent);
176         CFS_INIT_LIST_HEAD(&sai->sai_entries_received);
177         CFS_INIT_LIST_HEAD(&sai->sai_entries_stated);
178         return sai;
179 }
180
181 static inline 
182 struct ll_statahead_info *ll_sai_get(struct ll_statahead_info *sai)
183 {
184         LASSERT(sai);
185         atomic_inc(&sai->sai_refcount);
186         return sai;
187 }
188
189 static void ll_sai_put(struct ll_statahead_info *sai)
190 {
191         struct inode         *inode = sai->sai_inode;
192         struct ll_inode_info *lli;
193         ENTRY;
194
195         LASSERT(inode != NULL);
196         lli = ll_i2info(inode);
197         if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_lock)) {
198                 struct ll_sai_entry *entry, *next;
199
200                 LASSERT(lli->lli_opendir_key == NULL);
201                 lli->lli_sai = NULL;
202                 lli->lli_opendir_pid = 0;
203                 spin_unlock(&lli->lli_lock);
204
205                 LASSERT(sa_is_stopped(sai));
206
207                 if (sai->sai_sent > sai->sai_replied)
208                         CDEBUG(D_READA,"statahead for dir "DFID" does not "
209                               "finish: [sent:%u] [replied:%u]\n",
210                               PFID(&lli->lli_fid),
211                               sai->sai_sent, sai->sai_replied);
212
213                 list_for_each_entry_safe(entry, next, &sai->sai_entries_sent,
214                                          se_list) {
215                         list_del_init(&entry->se_list);
216                         ll_sai_entry_cleanup(entry, 1);
217                 }
218                 list_for_each_entry_safe(entry, next, &sai->sai_entries_received,
219                                          se_list) {
220                         list_del_init(&entry->se_list);
221                         ll_sai_entry_cleanup(entry, 1);
222                 }
223                 list_for_each_entry_safe(entry, next, &sai->sai_entries_stated,
224                                          se_list) {
225                         list_del_init(&entry->se_list);
226                         ll_sai_entry_cleanup(entry, 1);
227                 }
228                 OBD_FREE_PTR(sai);
229                 iput(inode);
230         }
231         EXIT;
232 }
233
234 /**
235  * insert it into sai_entries_sent tail when init.
236  */
237 static struct ll_sai_entry *
238 ll_sai_entry_init(struct ll_statahead_info *sai, unsigned int index)
239 {
240         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
241         struct ll_sai_entry  *entry;
242         ENTRY;
243
244         OBD_ALLOC_PTR(entry);
245         if (entry == NULL)
246                 RETURN(ERR_PTR(-ENOMEM));
247
248         CDEBUG(D_READA, "alloc sai entry %p index %u\n",
249                entry, index);
250         entry->se_index = index;
251         entry->se_stat  = SA_ENTRY_UNSTATED;
252
253         spin_lock(&lli->lli_lock);
254         list_add_tail(&entry->se_list, &sai->sai_entries_sent);
255         spin_unlock(&lli->lli_lock);
256
257         RETURN(entry);
258 }
259
260 /**
261  * delete it from sai_entries_stated head when fini, it need not
262  * to process entry's member.
263  */
264 static void ll_sai_entry_fini(struct ll_statahead_info *sai)
265 {
266         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
267         struct ll_sai_entry  *entry;
268         ENTRY;
269         
270         spin_lock(&lli->lli_lock);
271         sai->sai_index_next++;
272         if (likely(!list_empty(&sai->sai_entries_stated))) {
273                 entry = list_entry(sai->sai_entries_stated.next,
274                                    struct ll_sai_entry, se_list);
275                 if (entry->se_index < sai->sai_index_next) {
276                         list_del(&entry->se_list);
277                         OBD_FREE_PTR(entry);
278                 }
279         } else
280                 LASSERT(sa_is_stopped(sai));
281         spin_unlock(&lli->lli_lock);
282
283         EXIT;
284 }
285
286 /**
287  * inside lli_lock.
288  * \retval NULL : can not find the entry in sai_entries_sent with the index
289  * \retval entry: find the entry in sai_entries_sent with the index
290  */
291 static struct ll_sai_entry *
292 ll_sai_entry_set(struct ll_statahead_info *sai, unsigned int index, int stat,
293                  struct ptlrpc_request *req, struct md_enqueue_info *minfo)
294 {
295         struct ll_sai_entry *entry;
296         ENTRY;
297
298         if (!list_empty(&sai->sai_entries_sent)) {
299                 list_for_each_entry(entry, &sai->sai_entries_sent, se_list) {
300                         if (entry->se_index == index) {
301                                 entry->se_stat = stat;
302                                 entry->se_req = ptlrpc_request_addref(req);
303                                 entry->se_minfo = minfo;
304                                 RETURN(entry);
305                         } else if (entry->se_index > index)
306                                 RETURN(NULL);
307                 }
308         }
309         RETURN(NULL);
310 }
311
312 /**
313  * inside lli_lock.
314  * Move entry to sai_entries_received and
315  * insert it into sai_entries_received tail.
316  */
317 static inline void
318 ll_sai_entry_to_received(struct ll_statahead_info *sai, struct ll_sai_entry *entry)
319 {
320         if (!list_empty(&entry->se_list))
321                 list_del_init(&entry->se_list);
322         list_add_tail(&entry->se_list, &sai->sai_entries_received);
323 }
324
325 /**
326  * Move entry to sai_entries_stated and
327  * sort with the index.
328  */
329 static int
330 ll_sai_entry_to_stated(struct ll_statahead_info *sai, struct ll_sai_entry *entry)
331 {
332         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
333         struct ll_sai_entry  *se;
334         ENTRY;
335
336         ll_sai_entry_cleanup(entry, 0);
337
338         spin_lock(&lli->lli_lock);
339         if (!list_empty(&entry->se_list))
340                 list_del_init(&entry->se_list);
341
342         if (unlikely(entry->se_index < sai->sai_index_next)) {
343                 spin_unlock(&lli->lli_lock);
344                 OBD_FREE_PTR(entry);
345                 RETURN(0);
346         }
347
348         list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) {
349                 if (se->se_index < entry->se_index) {
350                         list_add(&entry->se_list, &se->se_list);
351                         spin_unlock(&lli->lli_lock);
352                         RETURN(1);
353                 }
354         }
355
356         /*
357          * I am the first entry.
358          */
359         list_add(&entry->se_list, &sai->sai_entries_stated);
360         spin_unlock(&lli->lli_lock);
361         RETURN(1);
362 }
363
/**
 * Finish lookup/revalidate for the first entry on sai_entries_received.
 *
 * The entry is unlinked under lli_lock. A stale entry (index behind
 * sai_index_next) is destroyed right away. Otherwise the reply is
 * processed and the entry is moved to sai_entries_stated, waking up
 * sai_waitq when it was inserted there.
 *
 * \retval 0      : processed successfully, or the entry was stale
 * \retval others : se_stat of an entry that is not SA_ENTRY_STATED, or the
 *                  error met while finishing the lookup/revalidate
 */
static int do_statahead_interpret(struct ll_statahead_info *sai)
{
        struct ll_inode_info   *lli = ll_i2info(sai->sai_inode);
        struct ll_sai_entry    *entry;
        struct ptlrpc_request  *req;
        struct md_enqueue_info *minfo;
        struct dentry          *dentry;
        struct lookup_intent   *it;
        int                     rc = 0;
        struct mdt_body        *body;
        ENTRY;

        /* callers only get here with a non-empty received list */
        spin_lock(&lli->lli_lock);
        LASSERT(!sa_received_empty(sai));
        entry = list_entry(sai->sai_entries_received.next, struct ll_sai_entry,
                           se_list);
        list_del_init(&entry->se_list);
        spin_unlock(&lli->lli_lock);

        /* the index was already passed by sai_index_next: drop the entry */
        if (unlikely(entry->se_index < sai->sai_index_next)) {
                ll_sai_entry_cleanup(entry, 1);
                RETURN(0);
        }

        if (entry->se_stat != SA_ENTRY_STATED)
                GOTO(out, rc = entry->se_stat);

        req = entry->se_req;
        minfo = entry->se_minfo;
        dentry = minfo->mi_dentry;
        it = &minfo->mi_it;

        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
        if (body == NULL)
                GOTO(out, rc = -EFAULT);

        if (dentry->d_inode == NULL) {
                /*
                 * lookup.
                 */
                struct dentry    *save = dentry;
                struct it_cb_data icbd = {
                        .icbd_parent   = dentry->d_parent->d_inode,
                        .icbd_childp   = &dentry
                };

                LASSERT(fid_is_zero(&minfo->mi_data.op_fid2));

                /*
                 * XXX: No fid in reply, this is probably cross-ref case.
                 * SA can't handle it yet.
                 */
                if (body->valid & OBD_MD_MDS)
                        GOTO(out, rc = -EAGAIN);

                rc = ll_lookup_it_finish(req, it, &icbd);
                if (!rc)
                        /*
                         * Here dentry->d_inode might be NULL,
                         * because the entry may have been removed before
                         * we start doing stat ahead.
                         */
                        ll_lookup_finish_locks(it, dentry);

                /*
                 * icbd_childp points at the local "dentry", so it may have
                 * been replaced above; keep minfo in sync and drop the
                 * reference on the original one.
                 */
                if (dentry != save) {
                        minfo->mi_dentry = dentry;
                        dput(save);
                }
        } else {
                /*
                 * revalidate.
                 */
                /* the fid in the reply must match the one that was asked for */
                if (!lu_fid_eq(&minfo->mi_data.op_fid2, &body->fid1)) {
                        ll_unhash_aliases(dentry->d_inode);
                        GOTO(out, rc = -EAGAIN);
                }

                rc = ll_revalidate_it_finish(req, it, dentry);
                if (rc) {
                        ll_unhash_aliases(dentry->d_inode);
                        GOTO(out, rc);
                }

                /*
                 * Unhash the dentry, clear DCACHE_LUSTRE_INVALID where that
                 * flag exists, and rehash it.
                 */
                spin_lock(&ll_lookup_lock);
                spin_lock(&dcache_lock);
                lock_dentry(dentry);
                __d_drop(dentry);
#ifdef DCACHE_LUSTRE_INVALID
                dentry->d_flags &= ~DCACHE_LUSTRE_INVALID;
#endif
                unlock_dentry(dentry);
                d_rehash_cond(dentry, 0);
                spin_unlock(&dcache_lock);
                spin_unlock(&ll_lookup_lock);

                ll_lookup_finish_locks(it, dentry);
        }
        EXIT;

out:
        /* releases the entry's members; wake the waiter if it was queued */
        if (likely(ll_sai_entry_to_stated(sai, entry)))
                cfs_waitq_signal(&sai->sai_waitq);
        return rc;
}
471
472 static int ll_statahead_interpret(struct ptlrpc_request *req,
473                                   struct md_enqueue_info *minfo,
474                                   int rc)
475 {
476         struct dentry            *dentry = minfo->mi_dentry;
477         struct lookup_intent     *it = &minfo->mi_it;
478         struct inode             *dir = dentry->d_parent->d_inode;
479         struct ll_inode_info     *lli = ll_i2info(dir);
480         struct ll_statahead_info *sai;
481         struct ll_sai_entry      *entry;
482         ENTRY;
483
484         CDEBUG(D_READA, "interpret statahead %.*s rc %d\n",
485                dentry->d_name.len, dentry->d_name.name, rc);
486
487         spin_lock(&lli->lli_lock);
488         if (unlikely(lli->lli_sai == NULL ||
489             lli->lli_sai->sai_generation != minfo->mi_generation)) {
490                 spin_unlock(&lli->lli_lock);
491                 ll_intent_release(it);
492                 dput(dentry);
493                 OBD_FREE_PTR(minfo);
494                 RETURN(-ESTALE);
495         } else {
496                 sai = ll_sai_get(lli->lli_sai);
497                 if (rc || dir == NULL)
498                         rc = -ESTALE;
499
500                 entry = ll_sai_entry_set(sai,
501                                          (unsigned int)(long)minfo->mi_cbdata,
502                                          rc ? SA_ENTRY_UNSTATED :
503                                          SA_ENTRY_STATED, req, minfo);
504                 LASSERT(entry != NULL);
505                 if (likely(sa_is_running(sai))) {
506                         ll_sai_entry_to_received(sai, entry);
507                         sai->sai_replied++;
508                         spin_unlock(&lli->lli_lock);
509                         cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
510                 } else {
511                         if (!list_empty(&entry->se_list))
512                                 list_del_init(&entry->se_list);
513                         sai->sai_replied++;
514                         spin_unlock(&lli->lli_lock);
515                         ll_sai_entry_cleanup(entry, 1);
516                 }
517                 ll_sai_put(sai);
518                 RETURN(rc);
519         }
520 }
521
522 static void sa_args_fini(struct md_enqueue_info *minfo,
523                          struct ldlm_enqueue_info *einfo)
524 {
525         LASSERT(minfo && einfo);
526         capa_put(minfo->mi_data.op_capa1);
527         capa_put(minfo->mi_data.op_capa2);
528         OBD_FREE_PTR(minfo);
529         OBD_FREE_PTR(einfo);
530 }
531
532 /**
533  * There is race condition between "capa_put" and "ll_statahead_interpret" for
534  * accessing "op_data.op_capa[1,2]" as following:
535  * "capa_put" releases "op_data.op_capa[1,2]"'s reference count after calling
536  * "md_intent_getattr_async". But "ll_statahead_interpret" maybe run first, and
537  * fill "op_data.op_capa[1,2]" as POISON, then cause "capa_put" access invalid
538  * "ocapa". So here reserve "op_data.op_capa[1,2]" in "pcapa" before calling
539  * "md_intent_getattr_async".
540  */
541 static int sa_args_init(struct inode *dir, struct dentry *dentry,
542                         struct md_enqueue_info **pmi,
543                         struct ldlm_enqueue_info **pei,
544                         struct obd_capa **pcapa)
545 {
546         struct ll_inode_info     *lli = ll_i2info(dir);
547         struct md_enqueue_info   *minfo;
548         struct ldlm_enqueue_info *einfo;
549         struct md_op_data        *op_data;
550
551         OBD_ALLOC_PTR(einfo);
552         if (einfo == NULL)
553                 return -ENOMEM;
554
555         OBD_ALLOC_PTR(minfo);
556         if (minfo == NULL) {
557                 OBD_FREE_PTR(einfo);
558                 return -ENOMEM;
559         }
560
561         op_data = ll_prep_md_op_data(&minfo->mi_data, dir, dentry->d_inode,
562                                      dentry->d_name.name, dentry->d_name.len,
563                                      0, LUSTRE_OPC_ANY, NULL);
564         if (IS_ERR(op_data)) {
565                 OBD_FREE_PTR(einfo);
566                 OBD_FREE_PTR(minfo);
567                 return PTR_ERR(op_data);
568         }
569
570         minfo->mi_it.it_op = IT_GETATTR;
571         minfo->mi_dentry = dentry;
572         minfo->mi_cb = ll_statahead_interpret;
573         minfo->mi_generation = lli->lli_sai->sai_generation;
574         minfo->mi_cbdata = (void *)(long)lli->lli_sai->sai_index;
575
576         einfo->ei_type   = LDLM_IBITS;
577         einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
578         einfo->ei_cb_bl  = ll_md_blocking_ast;
579         einfo->ei_cb_cp  = ldlm_completion_ast;
580         einfo->ei_cb_gl  = NULL;
581         einfo->ei_cbdata = NULL;
582
583         *pmi = minfo;
584         *pei = einfo;
585         pcapa[0] = op_data->op_capa1;
586         pcapa[1] = op_data->op_capa2;
587
588         return 0;
589 }
590
591 /**
592  * similar to ll_lookup_it().
593  */
594 static int do_sa_lookup(struct inode *dir, struct dentry *dentry)
595 {
596         struct md_enqueue_info   *minfo;
597         struct ldlm_enqueue_info *einfo;
598         struct obd_capa          *capas[2];
599         int                       rc;
600         ENTRY;
601
602         rc = sa_args_init(dir, dentry, &minfo, &einfo, capas);
603         if (rc)
604                 RETURN(rc);
605
606         rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
607         if (!rc) {
608                 capa_put(capas[0]);
609                 capa_put(capas[1]);
610         } else {
611                 sa_args_fini(minfo, einfo);
612         }
613
614         RETURN(rc);
615 }
616
617 /**
618  * similar to ll_revalidate_it().
619  * \retval      1 -- dentry valid
620  * \retval      0 -- will send stat-ahead request
621  * \retval others -- prepare stat-ahead request failed
622  */
623 static int do_sa_revalidate(struct dentry *dentry)
624 {
625         struct inode             *inode = dentry->d_inode;
626         struct inode             *dir = dentry->d_parent->d_inode;
627         struct lookup_intent      it = { .it_op = IT_GETATTR };
628         struct md_enqueue_info   *minfo;
629         struct ldlm_enqueue_info *einfo;
630         struct obd_capa          *capas[2];
631         int rc;
632         ENTRY;
633
634         if (inode == NULL)
635                 RETURN(1);
636
637         if (d_mountpoint(dentry))
638                 RETURN(1);
639
640         if (dentry == dentry->d_sb->s_root)
641                 RETURN(1);
642
643         rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode));
644         if (rc == 1) {
645                 ll_intent_release(&it);
646                 RETURN(1);
647         }
648
649         rc = sa_args_init(dir, dentry, &minfo, &einfo, capas);
650         if (rc)
651                 RETURN(rc);
652
653         rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
654         if (!rc) {
655                 capa_put(capas[0]);
656                 capa_put(capas[1]);
657         } else {
658                 sa_args_fini(minfo, einfo);
659         }
660
661         RETURN(rc);
662 }
663
664 static inline void ll_name2qstr(struct qstr *q, const char *name, int namelen)
665 {
666         q->name = name;
667         q->len  = namelen;
668         q->hash = full_name_hash(name, namelen);
669 }
670
671 static int ll_statahead_one(struct dentry *parent, const char* entry_name,
672                             int entry_name_len)
673 {
674         struct inode             *dir = parent->d_inode;
675         struct ll_inode_info     *lli = ll_i2info(dir);
676         struct ll_statahead_info *sai = lli->lli_sai;
677         struct qstr               name;
678         struct dentry            *dentry;
679         struct ll_sai_entry      *se;
680         int                       rc;
681         ENTRY;
682
683 #ifdef DCACHE_LUSTRE_INVALID
684         if (parent->d_flags & DCACHE_LUSTRE_INVALID) {
685 #else
686         if (d_unhashed(parent)) {
687 #endif
688                 CDEBUG(D_READA, "parent dentry@%p %.*s is "
689                        "invalid, skip statahead\n",
690                        parent, parent->d_name.len, parent->d_name.name);
691                 RETURN(-EINVAL);
692         }
693
694         se = ll_sai_entry_init(sai, sai->sai_index);
695         if (IS_ERR(se))
696                 RETURN(PTR_ERR(se));
697
698         ll_name2qstr(&name, entry_name, entry_name_len);
699         dentry = d_lookup(parent, &name);
700         if (!dentry) {
701                 dentry = d_alloc(parent, &name);
702                 if (dentry) {
703                         rc = do_sa_lookup(dir, dentry);
704                         if (rc)
705                                 dput(dentry);
706                 } else {
707                         GOTO(out, rc = -ENOMEM);
708                 }
709         } else {
710                 rc = do_sa_revalidate(dentry);
711                 if (rc)
712                         dput(dentry);
713         }
714
715         EXIT;
716
717 out:
718         if (rc) {
719                 CDEBUG(D_READA, "set sai entry %p index %u stat %d rc %d\n",
720                        se, se->se_index, se->se_stat, rc);
721                 se->se_stat = rc;
722                 if (ll_sai_entry_to_stated(sai, se))
723                         cfs_waitq_signal(&sai->sai_waitq);
724         } else {
725                 sai->sai_sent++;
726         }
727
728         sai->sai_index++;
729         return rc;
730 }
731
/* Arguments handed to ll_statahead_thread(). */
struct ll_sa_thread_args {
        /* directory dentry to scan; the thread takes its own reference */
        struct dentry   *sta_parent;
        /* pid used to build the thread name "ll_sa_<pid>" */
        pid_t            sta_pid;
};
736
/**
 * Body of the statahead thread.
 *
 * Takes a reference on the parent dentry and on the directory's sai, marks
 * the thread SVC_RUNNING and walks the directory pages. For every name
 * that is not skipped it waits until a request may be sent, handles the
 * replies that arrived meanwhile and calls ll_statahead_one(). Once the
 * end of the directory is reached it keeps handling replies until all of
 * them have arrived or the thread is told to stop. On exit the thread is
 * marked SVC_STOPPED, both wait queues are signalled and the references
 * are dropped.
 *
 * \param arg  struct ll_sa_thread_args
 *
 * \retval -EAGAIN : the directory has no sai
 * \retval others  : 0, the error from reading a directory page, or the
 *                   last value returned by ll_statahead_one()
 */
static int ll_statahead_thread(void *arg)
{
        struct ll_sa_thread_args *sta = arg;
        struct dentry            *parent = dget(sta->sta_parent);
        struct inode             *dir = parent->d_inode;
        struct ll_inode_info     *lli = ll_i2info(dir);
        struct ll_sb_info        *sbi = ll_i2sbi(dir);
        struct ll_statahead_info *sai;
        struct ptlrpc_thread     *thread;
        struct page              *page;
        __u64                     pos = 0;
        int                       first = 0;
        int                       rc = 0;
        struct ll_dir_chain       chain;
        ENTRY;

        /* pin the sai for the lifetime of the thread */
        spin_lock(&lli->lli_lock);
        if (unlikely(lli->lli_sai == NULL)) {
                spin_unlock(&lli->lli_lock);
                dput(parent);
                RETURN(-EAGAIN);
        } else {
                sai = ll_sai_get(lli->lli_sai);
                spin_unlock(&lli->lli_lock);
        }

        {
                /*
                 * NOTE(review): the size passed is 15 although the buffer
                 * holds 16 bytes -- confirm whether that is intended.
                 */
                char pname[16];
                snprintf(pname, 15, "ll_sa_%u", sta->sta_pid);
                cfs_daemonize(pname);
        }

        thread = &sai->sai_thread;
        sbi->ll_sa_total++;
        /* announce that the thread is up */
        spin_lock(&lli->lli_lock);
        thread->t_flags = SVC_RUNNING;
        spin_unlock(&lli->lli_lock);
        cfs_waitq_signal(&thread->t_ctl_waitq);
        CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name);

        ll_dir_chain_init(&chain);
        page = ll_get_dir_page(dir, pos, 0, &chain);

        /* one iteration per directory page */
        while (1) {
                struct l_wait_info lwi = { 0 };
                struct lu_dirpage *dp;
                struct lu_dirent  *ent;

                if (IS_ERR(page)) {
                        rc = PTR_ERR(page);
                        CERROR("error reading dir "DFID" at "LPU64"/%u: rc %d\n",
                               PFID(ll_inode2fid(dir)), pos,
                               sai->sai_index, rc);
                        break;
                }

                dp = page_address(page);
                for (ent = lu_dirent_start(dp); ent != NULL;
                     ent = lu_dirent_next(ent)) {
                        char *name = ent->lde_name;
                        int namelen = le16_to_cpu(ent->lde_namelen);

                        if (namelen == 0)
                                /*
                                 * Skip dummy record.
                                 */
                                continue;

                        if (name[0] == '.') {
                                if (namelen == 1) {
                                        /*
                                         * skip "."
                                         */
                                        continue;
                                } else if (name[1] == '.' && namelen == 2) {
                                        /*
                                         * skip ".."
                                         */
                                        continue;
                                } else if (!sai->sai_ls_all) {
                                        /*
                                         * skip hidden files.
                                         */
                                        sai->sai_skip_hidden++;
                                        continue;
                                }
                        }

                        /*
                         * don't stat-ahead first entry.
                         */
                        if (unlikely(!first)) {
                                first++;
                                continue;
                        }

keep_de:
                        /*
                         * Sleep until the thread is asked to stop, there
                         * is room for another request, or a reply is
                         * waiting to be handled.
                         */
                        l_wait_event(thread->t_ctl_waitq,
                                     !sa_is_running(sai) || sa_not_full(sai) ||
                                     !sa_received_empty(sai),
                                     &lwi);

                        /* handle every reply that has arrived so far */
                        while (!sa_received_empty(sai) && sa_is_running(sai))
                                do_statahead_interpret(sai);

                        if (unlikely(!sa_is_running(sai))) {
                                ll_put_page(page);
                                GOTO(out, rc);
                        }

                        if (!sa_not_full(sai))
                                /*
                                 * do not skip the current de.
                                 */
                                goto keep_de;

                        rc = ll_statahead_one(parent, name, namelen);
                        if (rc < 0) {
                                ll_put_page(page);
                                GOTO(out, rc);
                        }
                }
                pos = le64_to_cpu(dp->ldp_hash_end);
                ll_put_page(page);
                if (pos == DIR_END_OFF) {
                        /*
                         * End of directory reached.
                         * Keep handling replies until every request sent
                         * has been replied to or the thread is stopped.
                         */
                        while (1) {
                                l_wait_event(thread->t_ctl_waitq,
                                             !sa_is_running(sai) ||
                                             !sa_received_empty(sai) ||
                                             sai->sai_sent == sai->sai_replied,
                                             &lwi);
                                if (!sa_received_empty(sai) &&
                                    sa_is_running(sai))
                                        do_statahead_interpret(sai);
                                else
                                        GOTO(out, rc);
                        }
                } else if (1) {
                        /*
                         * chain is exhausted.
                         * Normal case: continue to the next page.
                         */
                        page = ll_get_dir_page(dir, pos, 1, &chain);
                } else {
                        /*
                         * go into overflow page.
                         * (never reached: the condition above is constant)
                         */
                }
        }
        EXIT;

out:
        ll_dir_chain_fini(&chain);
        /* mark the thread stopped and wake up anyone waiting on it */
        spin_lock(&lli->lli_lock);
        thread->t_flags = SVC_STOPPED;
        spin_unlock(&lli->lli_lock);
        cfs_waitq_signal(&sai->sai_waitq);
        cfs_waitq_signal(&thread->t_ctl_waitq);
        ll_sai_put(sai);
        dput(parent);
        CDEBUG(D_READA, "statahead thread stopped, pid %d\n",
               cfs_curproc_pid());
        return rc;
}
904
905 /**
906  * called in ll_file_release().
907  */
908 void ll_stop_statahead(struct inode *inode, void *key)
909 {
910         struct ll_inode_info *lli = ll_i2info(inode);
911
912         if (unlikely(key == NULL))
913                 return;
914
915         spin_lock(&lli->lli_lock);
916         if (lli->lli_opendir_key != key || lli->lli_opendir_pid == 0) {
917                 spin_unlock(&lli->lli_lock);
918                 return;
919         }
920
921         lli->lli_opendir_key = NULL;
922
923         if (lli->lli_sai) {
924                 struct l_wait_info lwi = { 0 };
925                 struct ptlrpc_thread *thread = &lli->lli_sai->sai_thread;
926
927                 if (!sa_is_stopped(lli->lli_sai)) {
928                         thread->t_flags = SVC_STOPPING;
929                         spin_unlock(&lli->lli_lock);
930                         cfs_waitq_signal(&thread->t_ctl_waitq);
931
932                         CDEBUG(D_READA, "stopping statahead thread, pid %d\n",
933                                cfs_curproc_pid());
934                         l_wait_event(thread->t_ctl_waitq,
935                                      sa_is_stopped(lli->lli_sai),
936                                      &lwi);
937                 } else {
938                         spin_unlock(&lli->lli_lock);
939                 }
940
941                 /*
942                  * Put the ref which was held when first statahead_enter.
943                  * It maybe not the last ref for some statahead requests
944                  * maybe inflight.
945                  */
946                 ll_sai_put(lli->lli_sai);
947         } else {
948                 lli->lli_opendir_pid = 0;
949                 spin_unlock(&lli->lli_lock);
950         }
951 }
952
/*
 * Result codes of is_first_dirent().
 *
 * is_first_dirent() builds its result as "LS_FIRST_DE + dot_de", so
 * LS_FIRST_DOT_DE has to stay equal to LS_FIRST_DE + 1.
 */
enum {
        /**
         * not the first dirent, or it is "."
         */
        LS_NONE_FIRST_DE = 0,
        /**
         * the first non-hidden dirent
         */
        LS_FIRST_DE      = 1,
        /**
         * the first hidden dirent, that is ".xxx"
         */
        LS_FIRST_DOT_DE  = 2
};
967
968 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
969 {
970         struct ll_dir_chain chain;
971         struct qstr        *target = &dentry->d_name;
972         struct page        *page;
973         __u64               pos = 0;
974         int                 dot_de;
975         int                 rc = LS_NONE_FIRST_DE;
976         ENTRY;
977
978         ll_dir_chain_init(&chain);
979         page = ll_get_dir_page(dir, pos, 0, &chain);
980
981         while (1) {
982                 struct lu_dirpage *dp;
983                 struct lu_dirent  *ent;
984
985                 if (IS_ERR(page)) {
986                         rc = PTR_ERR(page);
987                         CERROR("error reading dir "DFID" at "LPU64": rc %d\n",
988                                PFID(ll_inode2fid(dir)), pos, rc);
989                         break;
990                 }
991
992                 dp = page_address(page);
993                 for (ent = lu_dirent_start(dp); ent != NULL;
994                      ent = lu_dirent_next(ent)) {
995                         char *name = ent->lde_name;
996                         int namelen = le16_to_cpu(ent->lde_namelen);
997
998                         if (namelen == 0)
999                                 /*
1000                                  * skip dummy record.
1001                                  */
1002                                 continue;
1003
1004                         if (name[0] == '.') {
1005                                 if (namelen == 1)
1006                                         /*
1007                                          * skip "."
1008                                          */
1009                                         continue;
1010                                 else if (name[1] == '.' && namelen == 2)
1011                                         /*
1012                                          * skip ".."
1013                                          */
1014                                         continue;
1015                                 else
1016                                         dot_de = 1;
1017                         } else {
1018                                 dot_de = 0;
1019                         }
1020
1021                         if (dot_de && target->name[0] != '.') {
1022                                 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
1023                                        target->len, target->name,
1024                                        namelen, name);
1025                                 continue;
1026                         }
1027
1028                         if (target->len == namelen &&
1029                             memcmp(target->name, name, namelen) == 0)
1030                                 rc = LS_FIRST_DE + dot_de;
1031                         else
1032                                 rc = LS_NONE_FIRST_DE;
1033                         ll_put_page(page);
1034                         GOTO(out, rc);
1035                 }
1036                 pos = le64_to_cpu(dp->ldp_hash_end);
1037                 ll_put_page(page);
1038                 if (pos == DIR_END_OFF) {
1039                         /*
1040                          * End of directory reached.
1041                          */
1042                         break;
1043                 } else if (1) {
1044                         /*
1045                          * chain is exhausted 
1046                          * Normal case: continue to the next page.
1047                          */
1048                         page = ll_get_dir_page(dir, pos, 1, &chain);
1049                 } else {
1050                         /*
1051                          * go into overflow page.
1052                          */
1053                 }
1054         }
1055         EXIT;
1056
1057 out:
1058         ll_dir_chain_fini(&chain);
1059         return rc;
1060 }
1061
1062 /**
1063  * Start statahead thread if this is the first dir entry.
1064  * Otherwise if a thread is started already, wait it until it is ahead of me.
1065  * \retval 0       -- stat ahead thread process such dentry, for lookup, it miss
1066  * \retval 1       -- stat ahead thread process such dentry, for lookup, it hit
1067  * \retval -EEXIST -- stat ahead thread started, and this is the first dentry
1068  * \retval -EBADFD -- statahead thread exit and not dentry available
1069  * \retval -EAGAIN -- try to stat by caller
1070  * \retval others  -- error
1071  */
1072 int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
1073 {
1074         struct ll_sb_info        *sbi = ll_i2sbi(dir);
1075         struct ll_inode_info     *lli = ll_i2info(dir);
1076         struct ll_statahead_info *sai = lli->lli_sai;
1077         struct ll_sa_thread_args  sta;
1078         struct l_wait_info        lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1079         int                       rc = 0;
1080         ENTRY;
1081
1082         LASSERT(lli->lli_opendir_pid == cfs_curproc_pid());
1083
1084         if (sai) {
1085                 if (unlikely(sa_is_stopped(sai) &&
1086                              list_empty(&sai->sai_entries_stated)))
1087                         RETURN(-EBADFD);
1088
1089                 if ((*dentryp)->d_name.name[0] == '.') {
1090                         if (likely(sai->sai_ls_all ||
1091                             sai->sai_miss_hidden >= sai->sai_skip_hidden)) {
1092                                 /*
1093                                  * Hidden dentry is the first one, or statahead
1094                                  * thread does not skip so many hidden dentries
1095                                  * before "sai_ls_all" enabled as below.
1096                                  */
1097                         } else {
1098                                 if (!sai->sai_ls_all)
1099                                         /*
1100                                          * It maybe because hidden dentry is not
1101                                          * the first one, "sai_ls_all" was not
1102                                          * set, then "ls -al" missed. Enable
1103                                          * "sai_ls_all" for such case.
1104                                          */
1105                                         sai->sai_ls_all = 1;
1106
1107                                 /*
1108                                  * Such "getattr" has been skipped before
1109                                  * "sai_ls_all" enabled as above.
1110                                  */
1111                                 sai->sai_miss_hidden++;
1112                                 RETURN(-ENOENT);
1113                         }
1114                 }
1115
1116                 if (ll_sai_entry_stated(sai)) {
1117                         sbi->ll_sa_cached++;
1118                 } else {
1119                         sbi->ll_sa_blocked++;
1120                         /*
1121                          * thread started already, avoid double-stat.
1122                          */
1123                         rc = l_wait_event(sai->sai_waitq,
1124                                           ll_sai_entry_stated(sai) ||
1125                                           sa_is_stopped(sai),
1126                                           &lwi);
1127                 }
1128
1129                 if (lookup) {
1130                         struct dentry *result;
1131
1132                         result = d_lookup((*dentryp)->d_parent,
1133                                           &(*dentryp)->d_name);
1134                         if (result) {
1135                                 LASSERT(result != *dentryp);
1136                                 /* BUG 16303: do not drop reference count for
1137                                  * "*dentryp", VFS will do that by itself. */
1138                                 *dentryp = result;
1139                                 RETURN(1);
1140                         }
1141                 }
1142                 /*
1143                  * do nothing for revalidate.
1144                  */
1145                 RETURN(rc);
1146         }
1147
1148          /*
1149           * I am the "lli_opendir_pid" owner, only me can set "lli_sai".
1150           */ 
1151         LASSERT(lli->lli_sai == NULL);
1152
1153         rc = is_first_dirent(dir, *dentryp);
1154         if (rc == LS_NONE_FIRST_DE) {
1155                 /*
1156                  * It is not "ls -{a}l" operation, no need statahead for it.
1157                  */
1158                 spin_lock(&lli->lli_lock);
1159                 lli->lli_opendir_key = NULL;
1160                 lli->lli_opendir_pid = 0;
1161                 spin_unlock(&lli->lli_lock);
1162                 RETURN(-EBADF);
1163         }
1164
1165         sai = ll_sai_alloc();
1166         if (sai == NULL)
1167                 RETURN(-ENOMEM);
1168
1169         sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
1170         sai->sai_inode = igrab(dir);
1171         if (unlikely(sai->sai_inode == NULL)) {
1172                 CWARN("Do not start stat ahead on dying inode "DFID" .\n",
1173                       PFID(&lli->lli_fid));
1174                 OBD_FREE_PTR(sai);
1175                 RETURN(-ESTALE);
1176         }
1177
1178         LASSERT(sai->sai_inode == (*dentryp)->d_parent->d_inode);
1179
1180         sta.sta_parent = (*dentryp)->d_parent;
1181         sta.sta_pid    = cfs_curproc_pid();
1182
1183         lli->lli_sai = sai;
1184         rc = cfs_kernel_thread(ll_statahead_thread, &sta, 0);
1185         if (rc < 0) {
1186                 CERROR("can't start ll_sa thread, rc: %d\n", rc);
1187                 lli->lli_opendir_key = NULL;
1188                 sai->sai_thread.t_flags = SVC_STOPPED;
1189                 ll_sai_put(sai);
1190                 LASSERT(lli->lli_sai == NULL);
1191                 RETURN(-EAGAIN);
1192         }
1193
1194         l_wait_event(sai->sai_thread.t_ctl_waitq, 
1195                      sa_is_running(sai) || sa_is_stopped(sai),
1196                      &lwi);
1197
1198         /*
1199          * We don't stat-ahead for the first dirent since we are already in
1200          * lookup, and -EEXIST also indicates that this is the first dirent.
1201          */
1202         RETURN(-EEXIST);
1203 }
1204
1205 /**
1206  * update hit/miss count.
1207  */
1208 void ll_statahead_exit(struct dentry *dentry, int result)
1209 {
1210         struct dentry            *parent = dentry->d_parent;
1211         struct ll_inode_info     *lli = ll_i2info(parent->d_inode);
1212         struct ll_sb_info        *sbi = ll_i2sbi(parent->d_inode);
1213         struct ll_statahead_info *sai = lli->lli_sai;
1214         struct ll_dentry_data    *ldd = ll_d2d(dentry);
1215         ENTRY;
1216
1217         if (lli->lli_opendir_pid == cfs_curproc_pid() && sai) {
1218                 if (result >= 1) {
1219                         sbi->ll_sa_hit++;
1220                         sai->sai_hit++;
1221                         sai->sai_consecutive_miss = 0;
1222                         sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
1223                 } else {
1224                         sbi->ll_sa_miss++;
1225                         sai->sai_miss++;
1226                         sai->sai_consecutive_miss++;
1227                         if (sa_low_hit(sai) && sa_is_running(sai)) {
1228                                 sbi->ll_sa_wrong++;
1229                                 CDEBUG(D_READA, "statahead for dir %.*s hit "
1230                                        "ratio too low: hit/miss %u/%u, "
1231                                        "sent/replied %u/%u. stopping statahead "
1232                                        "thread: pid %d\n",
1233                                        parent->d_name.len, parent->d_name.name,
1234                                        sai->sai_hit, sai->sai_miss,
1235                                        sai->sai_sent, sai->sai_replied,
1236                                        cfs_curproc_pid());
1237                                 spin_lock(&lli->lli_lock);
1238                                 if (!sa_is_stopped(sai))
1239                                         sai->sai_thread.t_flags = SVC_STOPPING;
1240                                 spin_unlock(&lli->lli_lock);
1241                         }
1242                 }
1243
1244                 if (!sa_is_stopped(sai))
1245                         cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
1246                 ll_sai_entry_fini(sai);
1247                 if (likely(ldd != NULL))
1248                         ldd->lld_sa_generation = sai->sai_generation;
1249         }
1250         EXIT;
1251 }