Whamcloud - gitweb
b=21804 make sure the request is protected by rq_refcount while
[fs/lustre-release.git] / lustre / llite / statahead.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #include <linux/fs.h>
38 #include <linux/sched.h>
39 #include <linux/mm.h>
40 #include <linux/smp_lock.h>
41 #include <linux/highmem.h>
42 #include <linux/pagemap.h>
43
44 #define DEBUG_SUBSYSTEM S_LLITE
45
46 #include <obd_support.h>
47 #include <lustre_lite.h>
48 #include <lustre_dlm.h>
49 #include <linux/lustre_version.h>
50 #include "llite_internal.h"
51
52 struct ll_sai_entry {
53         struct list_head        se_list;
54         unsigned int            se_index;
55         int                     se_stat;
56         struct ptlrpc_request  *se_req;
57         struct md_enqueue_info *se_minfo;
58 };
59
/* Values stored in ll_sai_entry::se_stat by the interpret callback. */
enum {
        /* no usable reply has been recorded for the entry */
        SA_ENTRY_UNSTATED = 0,
        /* a successful reply has been recorded for the entry */
        SA_ENTRY_STATED   = 1
};
64
65 static unsigned int sai_generation = 0;
66 static spinlock_t sai_generation_lock = SPIN_LOCK_UNLOCKED;
67
68 /**
69  * Check whether first entry was stated already or not.
70  * No need to hold lli_lock, for:
71  * (1) it is me that remove entry from the list
72  * (2) the statahead thread only add new entry to the list
73  */
74 static int ll_sai_entry_stated(struct ll_statahead_info *sai)
75 {
76         struct ll_sai_entry  *entry;
77         int                   rc = 0;
78
79         if (!list_empty(&sai->sai_entries_stated)) {
80                 entry = list_entry(sai->sai_entries_stated.next,
81                                    struct ll_sai_entry, se_list);
82                 if (entry->se_index == sai->sai_index_next)
83                         rc = 1;
84         }
85         return rc;
86 }
87
88 static inline int sa_received_empty(struct ll_statahead_info *sai)
89 {
90         return list_empty(&sai->sai_entries_received);
91 }
92
93 static inline int sa_not_full(struct ll_statahead_info *sai)
94 {
95         return (sai->sai_index < sai->sai_hit + sai->sai_miss + sai->sai_max);
96 }
97
98 static inline int sa_is_running(struct ll_statahead_info *sai)
99 {
100         return !!(sai->sai_thread.t_flags & SVC_RUNNING);
101 }
102
103 static inline int sa_is_stopping(struct ll_statahead_info *sai)
104 {
105         return !!(sai->sai_thread.t_flags & SVC_STOPPING);
106 }
107
108 static inline int sa_is_stopped(struct ll_statahead_info *sai)
109 {
110         return !!(sai->sai_thread.t_flags & SVC_STOPPED);
111 }
112
113 /**
114  * (1) hit ratio less than 80%
115  * or
116  * (2) consecutive miss more than 8
117  */
118 static inline int sa_low_hit(struct ll_statahead_info *sai)
119 {
120         return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) ||
121                 (sai->sai_consecutive_miss > 8));
122 }
123
124 /**
125  * process the deleted entry's member and free the entry.
126  * (1) release intent
127  * (2) free md_enqueue_info
128  * (3) drop dentry's ref count
129  * (4) release request's ref count
130  */
131 static void ll_sai_entry_cleanup(struct ll_sai_entry *entry, int free)
132 {
133         struct md_enqueue_info *minfo = entry->se_minfo;
134         struct ptlrpc_request  *req = entry->se_req;
135         ENTRY;
136
137         if (minfo) {
138                 entry->se_minfo = NULL;
139                 ll_intent_release(&minfo->mi_it);
140                 dput(minfo->mi_dentry);
141                 iput(minfo->mi_dir);
142                 OBD_FREE_PTR(minfo);
143         }
144         if (req) {
145                 entry->se_req = NULL;
146                 ptlrpc_req_finished(req);
147         }
148         if (free) {
149                 LASSERT(list_empty(&entry->se_list));
150                 OBD_FREE_PTR(entry);
151         }
152
153         EXIT;
154 }
155
156 static struct ll_statahead_info *ll_sai_alloc(void)
157 {
158         struct ll_statahead_info *sai;
159
160         OBD_ALLOC_PTR(sai);
161         if (!sai)
162                 return NULL;
163
164         spin_lock(&sai_generation_lock);
165         sai->sai_generation = ++sai_generation;
166         if (unlikely(sai_generation == 0))
167                 sai->sai_generation = ++sai_generation;
168         spin_unlock(&sai_generation_lock);
169         atomic_set(&sai->sai_refcount, 1);
170         sai->sai_max = LL_SA_RPC_MIN;
171         cfs_waitq_init(&sai->sai_waitq);
172         cfs_waitq_init(&sai->sai_thread.t_ctl_waitq);
173         CFS_INIT_LIST_HEAD(&sai->sai_entries_sent);
174         CFS_INIT_LIST_HEAD(&sai->sai_entries_received);
175         CFS_INIT_LIST_HEAD(&sai->sai_entries_stated);
176         return sai;
177 }
178
179 static inline
180 struct ll_statahead_info *ll_sai_get(struct ll_statahead_info *sai)
181 {
182         LASSERT(sai);
183         atomic_inc(&sai->sai_refcount);
184         return sai;
185 }
186
187 static void ll_sai_put(struct ll_statahead_info *sai)
188 {
189         struct inode         *inode = sai->sai_inode;
190         struct ll_inode_info *lli;
191         ENTRY;
192
193         LASSERT(inode != NULL);
194         lli = ll_i2info(inode);
195         LASSERT(lli->lli_sai == sai);
196
197         if (atomic_dec_and_test(&sai->sai_refcount)) {
198                 struct ll_sai_entry *entry, *next;
199
200                 spin_lock(&lli->lli_lock);
201                 if (unlikely(atomic_read(&sai->sai_refcount) > 0)) {
202                         /* It is race case, the interpret callback just hold
203                          * a reference count */
204                         spin_unlock(&lli->lli_lock);
205                         EXIT;
206                         return;
207                 }
208
209                 LASSERT(lli->lli_opendir_key == NULL);
210                 lli->lli_sai = NULL;
211                 lli->lli_opendir_pid = 0;
212                 spin_unlock(&lli->lli_lock);
213
214                 LASSERT(sa_is_stopped(sai));
215
216                 if (sai->sai_sent > sai->sai_replied)
217                         CDEBUG(D_READA,"statahead for dir %lu/%u does not "
218                               "finish: [sent:%u] [replied:%u]\n",
219                               inode->i_ino, inode->i_generation,
220                               sai->sai_sent, sai->sai_replied);
221
222                 list_for_each_entry_safe(entry, next, &sai->sai_entries_sent,
223                                          se_list) {
224                         list_del_init(&entry->se_list);
225                         ll_sai_entry_cleanup(entry, 1);
226                 }
227                 list_for_each_entry_safe(entry, next, &sai->sai_entries_received,
228                                          se_list) {
229                         list_del_init(&entry->se_list);
230                         ll_sai_entry_cleanup(entry, 1);
231                 }
232                 list_for_each_entry_safe(entry, next, &sai->sai_entries_stated,
233                                          se_list) {
234                         list_del_init(&entry->se_list);
235                         ll_sai_entry_cleanup(entry, 1);
236                 }
237                 iput(inode);
238                 OBD_FREE_PTR(sai);
239         }
240         EXIT;
241 }
242
243 /**
244  * insert it into sai_entries_sent tail when init.
245  */
246 static struct ll_sai_entry *
247 ll_sai_entry_init(struct ll_statahead_info *sai, unsigned int index)
248 {
249         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
250         struct ll_sai_entry  *entry;
251         ENTRY;
252
253         OBD_ALLOC_PTR(entry);
254         if (entry == NULL)
255                 RETURN(ERR_PTR(-ENOMEM));
256
257         CDEBUG(D_READA, "alloc sai entry %p index %u\n",
258                entry, index);
259         entry->se_index = index;
260         entry->se_stat  = SA_ENTRY_UNSTATED;
261
262         spin_lock(&lli->lli_lock);
263         list_add_tail(&entry->se_list, &sai->sai_entries_sent);
264         spin_unlock(&lli->lli_lock);
265
266         RETURN(entry);
267 }
268
269 /**
270  * delete it from sai_entries_stated head when fini, it need not
271  * to process entry's member.
272  */
273 static void ll_sai_entry_fini(struct ll_statahead_info *sai)
274 {
275         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
276         struct ll_sai_entry  *entry;
277         ENTRY;
278
279         spin_lock(&lli->lli_lock);
280         sai->sai_index_next++;
281         if (likely(!list_empty(&sai->sai_entries_stated))) {
282                 entry = list_entry(sai->sai_entries_stated.next,
283                                    struct ll_sai_entry, se_list);
284                 if (entry->se_index < sai->sai_index_next) {
285                         list_del(&entry->se_list);
286                         OBD_FREE_PTR(entry);
287                 }
288         } else
289                 LASSERT(sa_is_stopped(sai));
290         spin_unlock(&lli->lli_lock);
291
292         EXIT;
293 }
294
295 /**
296  * inside lli_lock.
297  * \retval NULL : can not find the entry in sai_entries_sent with the index
298  * \retval entry: find the entry in sai_entries_sent with the index
299  */
300 static struct ll_sai_entry *
301 ll_sai_entry_set(struct ll_statahead_info *sai, unsigned int index, int stat,
302                  struct ptlrpc_request *req, struct md_enqueue_info *minfo)
303 {
304         struct ll_sai_entry *entry;
305         ENTRY;
306
307         if (!list_empty(&sai->sai_entries_sent)) {
308                 list_for_each_entry(entry, &sai->sai_entries_sent, se_list) {
309                         if (entry->se_index == index) {
310                                 entry->se_stat = stat;
311                                 entry->se_req = ptlrpc_request_addref(req);
312                                 entry->se_minfo = minfo;
313                                 RETURN(entry);
314                         } else if (entry->se_index > index)
315                                 RETURN(NULL);
316                 }
317         }
318         RETURN(NULL);
319 }
320
321 /**
322  * inside lli_lock.
323  * Move entry to sai_entries_received and
324  * insert it into sai_entries_received tail.
325  */
326 static inline void
327 ll_sai_entry_to_received(struct ll_statahead_info *sai, struct ll_sai_entry *entry)
328 {
329         if (!list_empty(&entry->se_list))
330                 list_del_init(&entry->se_list);
331         list_add_tail(&entry->se_list, &sai->sai_entries_received);
332 }
333
334 /**
335  * Move entry to sai_entries_stated and
336  * sort with the index.
337  */
338 static int
339 ll_sai_entry_to_stated(struct ll_statahead_info *sai, struct ll_sai_entry *entry)
340 {
341         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
342         struct ll_sai_entry  *se;
343         ENTRY;
344
345         ll_sai_entry_cleanup(entry, 0);
346
347         spin_lock(&lli->lli_lock);
348         if (!list_empty(&entry->se_list))
349                 list_del_init(&entry->se_list);
350
351         if (unlikely(entry->se_index < sai->sai_index_next)) {
352                 spin_unlock(&lli->lli_lock);
353                 OBD_FREE_PTR(entry);
354                 RETURN(0);
355         }
356
357         list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) {
358                 if (se->se_index < entry->se_index) {
359                         list_add(&entry->se_list, &se->se_list);
360                         spin_unlock(&lli->lli_lock);
361                         RETURN(1);
362                 }
363         }
364
365         /*
366          * I am the first entry.
367          */
368         list_add(&entry->se_list, &sai->sai_entries_stated);
369         spin_unlock(&lli->lli_lock);
370         RETURN(1);
371 }
372
373 /**
374  * finish lookup/revalidate.
375  */
376 static int do_statahead_interpret(struct ll_statahead_info *sai)
377 {
378         struct ll_inode_info   *lli = ll_i2info(sai->sai_inode);
379         struct ll_sai_entry    *entry;
380         struct ptlrpc_request  *req;
381         struct md_enqueue_info *minfo;
382         struct lookup_intent   *it;
383         struct dentry          *dentry;
384         int                     rc = 0;
385         ENTRY;
386
387         spin_lock(&lli->lli_lock);
388         LASSERT(!sa_received_empty(sai));
389         entry = list_entry(sai->sai_entries_received.next, struct ll_sai_entry,
390                            se_list);
391         list_del_init(&entry->se_list);
392         spin_unlock(&lli->lli_lock);
393
394         if (unlikely(entry->se_index < sai->sai_index_next)) {
395                 CWARN("Found stale entry: [index %u] [next %u]\n",
396                       entry->se_index, sai->sai_index_next);
397                 ll_sai_entry_cleanup(entry, 1);
398                 RETURN(0);
399         }
400
401         if (entry->se_stat != SA_ENTRY_STATED)
402                 GOTO(out, rc = entry->se_stat);
403
404         req = entry->se_req;
405         minfo = entry->se_minfo;
406         it = &minfo->mi_it;
407         dentry = minfo->mi_dentry;
408
409         if (dentry->d_inode == NULL) {
410                 /*
411                  * lookup.
412                  */
413                 struct dentry    *save = dentry;
414                 struct it_cb_data icbd = {
415                         .icbd_parent   = minfo->mi_dir,
416                         .icbd_childp   = &dentry
417                 };
418
419                 rc = lookup_it_finish(req, DLM_REPLY_REC_OFF, it, &icbd);
420                 if (!rc)
421                         /*
422                          * Here dentry->d_inode might be NULL,
423                          * because the entry may have been removed before
424                          * we start doing stat ahead.
425                          */
426                         ll_lookup_finish_locks(it, dentry);
427
428                 if (dentry != save) {
429                         minfo->mi_dentry = dentry;
430                         dput(save);
431                 }
432         } else {
433                 /*
434                  * revalidate.
435                  */
436                 struct mds_body *body;
437
438                 body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
439                                       sizeof(*body));
440                 if (memcmp(&minfo->mi_data.fid2, &body->fid1,
441                            sizeof(body->fid1))) {
442                         ll_unhash_aliases(dentry->d_inode);
443                         GOTO(out, rc = -EAGAIN);
444                 }
445
446                 rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, it, dentry);
447                 if (rc) {
448                         ll_unhash_aliases(dentry->d_inode);
449                         GOTO(out, rc);
450                 }
451
452                 spin_lock(&ll_lookup_lock);
453                 spin_lock(&dcache_lock);
454                 lock_dentry(dentry);
455                 __d_drop(dentry);
456                 dentry->d_flags &= ~DCACHE_LUSTRE_INVALID;
457                 unlock_dentry(dentry);
458                 d_rehash_cond(dentry, 0);
459                 spin_unlock(&dcache_lock);
460                 spin_unlock(&ll_lookup_lock);
461
462                 ll_lookup_finish_locks(it, dentry);
463         }
464         EXIT;
465
466 out:
467         if (likely(ll_sai_entry_to_stated(sai, entry)))
468                 cfs_waitq_signal(&sai->sai_waitq);
469         return rc;
470 }
471
472 static int ll_statahead_interpret(struct obd_export *exp,
473                                   struct ptlrpc_request *req,
474                                   struct md_enqueue_info *minfo,
475                                   int rc)
476 {
477         struct lookup_intent     *it = &minfo->mi_it;
478         struct dentry            *dentry = minfo->mi_dentry;
479         struct inode             *dir = minfo->mi_dir;
480         struct ll_inode_info     *lli = ll_i2info(dir);
481         struct ll_statahead_info *sai;
482         struct ll_sai_entry      *entry;
483         ENTRY;
484
485         CDEBUG(D_READA, "interpret statahead %.*s rc %d\n",
486                dentry->d_name.len, dentry->d_name.name, rc);
487
488         spin_lock(&lli->lli_lock);
489         if (unlikely(lli->lli_sai == NULL ||
490                      lli->lli_sai->sai_generation != minfo->mi_generation)) {
491                 spin_unlock(&lli->lli_lock);
492                 ll_intent_release(it);
493                 dput(dentry);
494                 iput(dir);
495                 OBD_FREE_PTR(minfo);
496                 RETURN(-ESTALE);
497         } else {
498                 sai = ll_sai_get(lli->lli_sai);
499                 entry = ll_sai_entry_set(sai,
500                                          (unsigned int)(long)minfo->mi_cbdata,
501                                          rc ? SA_ENTRY_UNSTATED :
502                                          SA_ENTRY_STATED, req, minfo);
503                 LASSERT(entry != NULL);
504                 if (likely(sa_is_running(sai))) {
505                         ll_sai_entry_to_received(sai, entry);
506                         sai->sai_replied++;
507                         spin_unlock(&lli->lli_lock);
508                         cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
509                 } else {
510                         if (!list_empty(&entry->se_list))
511                                 list_del_init(&entry->se_list);
512                         sai->sai_replied++;
513                         spin_unlock(&lli->lli_lock);
514                         ll_sai_entry_cleanup(entry, 1);
515                 }
516                 ll_sai_put(sai);
517                 RETURN(rc);
518         }
519 }
520
521 static void sa_args_fini(struct md_enqueue_info *minfo,
522                          struct ldlm_enqueue_info *einfo)
523 {
524         LASSERT(minfo && einfo);
525         iput(minfo->mi_dir);
526         OBD_FREE_PTR(minfo);
527         OBD_FREE_PTR(einfo);
528 }
529
530 static int sa_args_prep(struct inode *dir, struct dentry *dentry,
531                         struct md_enqueue_info **pmi,
532                         struct ldlm_enqueue_info **pei)
533 {
534         struct ll_inode_info     *lli = ll_i2info(dir);
535         struct md_enqueue_info   *minfo;
536         struct ldlm_enqueue_info *einfo;
537
538         OBD_ALLOC_PTR(einfo);
539         if (einfo == NULL)
540                 return -ENOMEM;
541
542         OBD_ALLOC_PTR(minfo);
543         if (minfo == NULL) {
544                 OBD_FREE_PTR(einfo);
545                 return -ENOMEM;
546         }
547
548         minfo->mi_it.it_op = IT_GETATTR;
549         minfo->mi_dentry = dentry;
550         minfo->mi_dir = igrab(dir);
551         minfo->mi_cb = ll_statahead_interpret;
552         minfo->mi_generation = lli->lli_sai->sai_generation;
553         minfo->mi_cbdata = (void *)(long)lli->lli_sai->sai_index;
554
555         einfo->ei_type   = LDLM_IBITS;
556         einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
557         einfo->ei_cb_bl  = ll_mdc_blocking_ast;
558         einfo->ei_cb_cp  = ldlm_completion_ast;
559         einfo->ei_cb_gl  = NULL;
560         einfo->ei_cbdata = NULL;
561
562         *pmi = minfo;
563         *pei = einfo;
564
565         return 0;
566 }
567
568 /**
569  * similar to ll_lookup_it().
570  */
571 static int do_sa_lookup(struct inode *dir, struct dentry *dentry)
572 {
573         struct md_enqueue_info   *minfo;
574         struct ldlm_enqueue_info *einfo;
575         int                       rc;
576         ENTRY;
577
578         rc = sa_args_prep(dir, dentry, &minfo, &einfo);
579         if (rc)
580                 RETURN(rc);
581
582         rc = ll_prepare_mdc_op_data(&minfo->mi_data, dir, NULL,
583                                     dentry->d_name.name, dentry->d_name.len, 0,
584                                     NULL);
585         if (rc == 0)
586                 rc = mdc_intent_getattr_async(ll_i2mdcexp(dir), minfo, einfo);
587
588         if (rc)
589                 sa_args_fini(minfo, einfo);
590
591         RETURN(rc);
592 }
593
594 /**
595  * similar to ll_revalidate_it().
596  * \retval      1 -- dentry valid
597  * \retval      0 -- will send stat-ahead request
598  * \retval others -- prepare stat-ahead request failed
599  */
600 static int do_sa_revalidate(struct inode *dir, struct dentry *dentry)
601 {
602         struct inode             *inode = dentry->d_inode;
603         struct ll_fid             fid;
604         struct lookup_intent      it = { .it_op = IT_GETATTR };
605         struct md_enqueue_info   *minfo;
606         struct ldlm_enqueue_info *einfo;
607         int rc;
608         ENTRY;
609
610         if (inode == NULL)
611                 RETURN(1);
612
613         if (d_mountpoint(dentry))
614                 RETURN(1);
615
616         if (dentry == dentry->d_sb->s_root)
617                 RETURN(1);
618
619         ll_inode2fid(&fid, inode);
620
621         rc = mdc_revalidate_lock(ll_i2mdcexp(dir), &it, &fid);
622         if (rc == 1) {
623                 ll_intent_release(&it);
624                 RETURN(1);
625         }
626
627         rc = sa_args_prep(dir, dentry, &minfo, &einfo);
628         if (rc)
629                 RETURN(rc);
630
631         rc = ll_prepare_mdc_op_data(&minfo->mi_data, dir,
632                                     inode, dentry->d_name.name,
633                                     dentry->d_name.len, 0, NULL);
634         if (rc == 0)
635                 rc = mdc_intent_getattr_async(ll_i2mdcexp(dir), minfo, einfo);
636
637         if (rc)
638                 sa_args_fini(minfo, einfo);
639
640         RETURN(rc);
641 }
642
643 static inline void ll_name2qstr(struct qstr *q, const char *name, int namelen)
644 {
645         q->name = name;
646         q->len  = namelen;
647         q->hash = full_name_hash(name, namelen);
648 }
649
650 static int ll_statahead_one(struct dentry *parent, struct ll_dir_entry *de)
651 {
652         struct inode             *dir = parent->d_inode;
653         struct ll_inode_info     *lli = ll_i2info(dir);
654         struct ll_statahead_info *sai = lli->lli_sai;
655         struct qstr               name;
656         struct dentry            *dentry;
657         struct ll_sai_entry      *se;
658         int                       rc;
659         ENTRY;
660
661         if (parent->d_flags & DCACHE_LUSTRE_INVALID) {
662                 CDEBUG(D_READA, "parent dentry@%p %.*s is "
663                        "invalid, skip statahead\n",
664                        parent, parent->d_name.len, parent->d_name.name);
665                 RETURN(-EINVAL);
666         }
667
668         se = ll_sai_entry_init(sai, sai->sai_index);
669         if (IS_ERR(se))
670                 RETURN(PTR_ERR(se));
671
672         ll_name2qstr(&name, de->lde_name, de->lde_name_len);
673         dentry = d_lookup(parent, &name);
674         if (!dentry) {
675                 dentry = d_alloc(parent, &name);
676                 if (dentry) {
677                         rc = do_sa_lookup(dir, dentry);
678                         if (rc)
679                                 dput(dentry);
680                 } else {
681                         GOTO(out, rc = -ENOMEM);
682                 }
683         } else {
684                 rc = do_sa_revalidate(dir, dentry);
685                 if (rc)
686                         dput(dentry);
687         }
688
689         EXIT;
690
691 out:
692         if (rc) {
693                 CDEBUG(D_READA, "set sai entry %p index %u stat %d rc %d\n",
694                        se, se->se_index, se->se_stat, rc);
695                 se->se_stat = rc;
696                 if (ll_sai_entry_to_stated(sai, se))
697                         cfs_waitq_signal(&sai->sai_waitq);
698         } else {
699                 sai->sai_sent++;
700         }
701
702         sai->sai_index++;
703         return rc;
704 }
705
706 static int ll_statahead_thread(void *arg)
707 {
708         struct dentry            *parent = (struct dentry *)arg;
709         struct inode             *dir = parent->d_inode;
710         struct ll_inode_info     *lli = ll_i2info(dir);
711         struct ll_sb_info        *sbi = ll_i2sbi(dir);
712         struct ll_statahead_info *sai = ll_sai_get(lli->lli_sai);
713         struct ptlrpc_thread     *thread = &sai->sai_thread;
714         unsigned long             index = 0;
715         int                       first = 0;
716         int                       rc = 0;
717         ENTRY;
718
719         {
720                 char pname[16];
721                 snprintf(pname, 15, "ll_sa_%u", lli->lli_opendir_pid);
722                 cfs_daemonize(pname);
723         }
724
725         sbi->ll_sa_total++;
726         spin_lock(&lli->lli_lock);
727         thread->t_flags = SVC_RUNNING;
728         spin_unlock(&lli->lli_lock);
729         cfs_waitq_signal(&thread->t_ctl_waitq);
730         CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name);
731
732         while (1) {
733                 struct l_wait_info lwi = { 0 };
734                 unsigned long npages;
735                 char *kaddr, *limit;
736                 struct ll_dir_entry *de;
737                 struct page *page;
738
739                 npages = dir_pages(dir);
740                 /*
741                  * reach the end of dir.
742                  */
743                 if (index >= npages) {
744                         CDEBUG(D_READA, "reach end, index/npages %lu/%lu\n",
745                                index, npages);
746
747                         while (1) {
748                                 l_wait_event(thread->t_ctl_waitq,
749                                              !sa_is_running(sai) ||
750                                              !sa_received_empty(sai) ||
751                                              sai->sai_sent == sai->sai_replied,
752                                              &lwi);
753                                 if (!sa_received_empty(sai) &&
754                                     sa_is_running(sai))
755                                         do_statahead_interpret(sai);
756                                 else
757                                         GOTO(out, rc);
758                         }
759                 }
760
761                 page = ll_get_dir_page(dir, index);
762                 if (IS_ERR(page)) {
763                         rc = PTR_ERR(page);
764                         CERROR("error reading dir %lu/%u page %lu/%u: rc %d\n",
765                                dir->i_ino, dir->i_generation, index,
766                                sai->sai_index, rc);
767                         break;
768                 }
769
770                 kaddr = page_address(page);
771                 limit = kaddr + CFS_PAGE_SIZE - ll_dir_rec_len(1);
772                 de = (struct ll_dir_entry *)kaddr;
773                 if (!index) {
774                         /*
775                          * skip "."
776                          */
777                         de = ll_dir_next_entry(de);
778                         /*
779                          * skip ".."
780                          */
781                         de = ll_dir_next_entry(de);
782                 }
783
784                 for (; (char*)de <= limit; de = ll_dir_next_entry(de)) {
785                         if (de->lde_inode == 0)
786                                 continue;
787
788                         if (de->lde_name[0] == '.' && !sai->sai_ls_all) {
789                                 /*
790                                  * skip hidden files..
791                                  */
792                                 sai->sai_skip_hidden++;
793                                 continue;
794                         }
795
796                         /*
797                          * don't stat-ahead first entry.
798                          */
799                         if (unlikely(!first)) {
800                                 first++;
801                                 continue;
802                         }
803
804 keep_de:
805                         l_wait_event(thread->t_ctl_waitq,
806                                      !sa_is_running(sai) || sa_not_full(sai) ||
807                                      !sa_received_empty(sai),
808                                      &lwi);
809
810                         while (!sa_received_empty(sai) && sa_is_running(sai))
811                                 do_statahead_interpret(sai);
812
813                         if (unlikely(!sa_is_running(sai))) {
814                                 ll_put_page(page);
815                                 GOTO(out, rc);
816                         }
817
818                         if (!sa_not_full(sai))
819                                 /*
820                                  * do not skip the current de.
821                                  */
822                                 goto keep_de;
823
824                         rc = ll_statahead_one(parent, de);
825                         if (rc < 0) {
826                                 ll_put_page(page);
827                                 GOTO(out, rc);
828                         }
829                 }
830                 ll_put_page(page);
831                 index++;
832         }
833         EXIT;
834
835 out:
836         spin_lock(&lli->lli_lock);
837         thread->t_flags = SVC_STOPPED;
838         spin_unlock(&lli->lli_lock);
839         cfs_waitq_signal(&sai->sai_waitq);
840         cfs_waitq_signal(&thread->t_ctl_waitq);
841         ll_sai_put(sai);
842         dput(parent);
843         CDEBUG(D_READA, "statahead thread stopped, pid %d\n",
844                cfs_curproc_pid());
845         return rc;
846 }
847
848 /**
849  * called in ll_file_release().
850  */
851 void ll_stop_statahead(struct inode *inode, void *key)
852 {
853         struct ll_inode_info *lli = ll_i2info(inode);
854
855         if (unlikely(key == NULL))
856                 return;
857
858         spin_lock(&lli->lli_lock);
859         if (lli->lli_opendir_key != key || lli->lli_opendir_pid == 0) {
860                 spin_unlock(&lli->lli_lock);
861                 return;
862         }
863
864         lli->lli_opendir_key = NULL;
865
866         if (lli->lli_sai) {
867                 struct l_wait_info lwi = { 0 };
868                 struct ptlrpc_thread *thread = &lli->lli_sai->sai_thread;
869
870                 if (!sa_is_stopped(lli->lli_sai)) {
871                         thread->t_flags = SVC_STOPPING;
872                         spin_unlock(&lli->lli_lock);
873                         cfs_waitq_signal(&thread->t_ctl_waitq);
874
875                         CDEBUG(D_READA, "stopping statahead thread, pid %d\n",
876                                cfs_curproc_pid());
877                         l_wait_event(thread->t_ctl_waitq,
878                                      sa_is_stopped(lli->lli_sai),
879                                      &lwi);
880                 } else {
881                         spin_unlock(&lli->lli_lock);
882                 }
883
884                 /*
885                  * Put the ref which was held when first statahead_enter.
886                  * It maybe not the last ref for some statahead requests
887                  * maybe inflight.
888                  */
889                 ll_sai_put(lli->lli_sai);
890         } else {
891                 lli->lli_opendir_pid = 0;
892                 spin_unlock(&lli->lli_lock);
893         }
894 }
895
/* Result codes of is_first_dirent(). */
enum {
        /* the dentry is not the first dirent, or it is "." */
        LS_NONE_FIRST_DE = 0,
        /* the dentry is the first non-hidden dirent */
        LS_FIRST_DE      = 1,
        /* the dentry is the first hidden dirent, that is ".xxx" */
        LS_FIRST_DOT_DE  = 2
};
910
911 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
912 {
913         struct qstr         *d_name = &dentry->d_name;
914         unsigned long        npages, index = 0;
915         struct page         *page;
916         struct ll_dir_entry *de;
917         char                *kaddr, *limit;
918         int                  rc = LS_NONE_FIRST_DE, dot_de;
919         ENTRY;
920
921         while (1) {
922                 npages = dir_pages(dir);
923                 /*
924                  * reach the end of dir.
925                  */
926                 if (index >= npages) {
927                         CDEBUG(D_READA, "reach end, index/npages %lu/%lu\n",
928                                index, npages);
929                         break;
930                 }
931
932                 page = ll_get_dir_page(dir, index);
933                 if (IS_ERR(page)) {
934                         rc = PTR_ERR(page);
935                         CERROR("error reading dir %lu/%u page %lu: rc %d\n",
936                                dir->i_ino, dir->i_generation, index, rc);
937                         break;
938                 }
939
940                 kaddr = page_address(page);
941                 limit = kaddr + CFS_PAGE_SIZE - ll_dir_rec_len(1);
942                 de = (struct ll_dir_entry *)kaddr;
943                 if (!index) {
944                         if (unlikely(!(de->lde_name_len == 1 &&
945                                        strncmp(de->lde_name, ".", 1) == 0)))
946                                 CWARN("Maybe got bad on-disk dir: %lu/%u\n",
947                                       dir->i_ino, dir->i_generation);
948                         /*
949                          * skip "." or ingore bad entry.
950                          */
951                         de = ll_dir_next_entry(de);
952
953                         if (unlikely(!(de->lde_name_len == 2 &&
954                                        strncmp(de->lde_name, "..", 2) == 0)))
955                                 CWARN("Maybe got bad on-disk dir: %lu/%u\n",
956                                       dir->i_ino, dir->i_generation);
957                         /*
958                          * skip ".." or ingore bad entry.
959                          */
960                         de = ll_dir_next_entry(de);
961                 }
962
963                 for (; (char*)de <= limit; de = ll_dir_next_entry(de)) {
964                         if (!de->lde_inode)
965                                 continue;
966
967                         if (de->lde_name[0] == '.')
968                                 dot_de = 1;
969                         else
970                                 dot_de = 0;
971
972                         if (dot_de && d_name->name[0] != '.') {
973                                 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
974                                        d_name->len, d_name->name,
975                                        de->lde_name_len, de->lde_name);
976                                 continue;
977                         }
978
979                         if (d_name->len != de->lde_name_len ||
980                             strncmp(d_name->name, de->lde_name, d_name->len) != 0)
981                                 rc = LS_NONE_FIRST_DE;
982                         else if (!dot_de)
983                                 rc = LS_FIRST_DE;
984                         else
985                                 rc = LS_FIRST_DOT_DE;
986
987                         ll_put_page(page);
988                         RETURN(rc);
989                 }
990                 ll_put_page(page);
991                 index++;
992         }
993         RETURN(rc);
994 }
995
996 static int trigger_statahead(struct inode *dir, struct dentry **dentryp)
997 {
998         struct ll_inode_info     *lli = ll_i2info(dir);
999         struct l_wait_info        lwi = { 0 };
1000         struct ll_statahead_info *sai;
1001         struct dentry            *parent;
1002         int                       rc;
1003         ENTRY;
1004
1005          /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */
1006         rc = is_first_dirent(dir, *dentryp);
1007         if (rc == LS_NONE_FIRST_DE)
1008                 /* It is not "ls -{a}l" operation, no need statahead for it. */
1009                 GOTO(out, rc = -EAGAIN);
1010
1011         sai = ll_sai_alloc();
1012         if (sai == NULL)
1013                 GOTO(out, rc = -ENOMEM);
1014
1015         sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
1016         sai->sai_inode = igrab(dir);
1017         if (unlikely(sai->sai_inode == NULL)) {
1018                 CWARN("Do not start stat ahead on dying inode %lu/%u.\n",
1019                       dir->i_ino, dir->i_generation);
1020                 OBD_FREE_PTR(sai);
1021                 GOTO(out, rc = -ESTALE);
1022         }
1023
1024         /* get parent reference count here, and put it in ll_statahead_thread */
1025         parent = dget((*dentryp)->d_parent);
1026         if (unlikely(sai->sai_inode != parent->d_inode)) {
1027                 CWARN("Race condition, someone changed %.*s just now: "
1028                       "old parent "DFID", new parent "DFID" .\n",
1029                       (*dentryp)->d_name.len, (*dentryp)->d_name.name,
1030                       PFID(ll_inode_lu_fid(dir)),
1031                       PFID(ll_inode_lu_fid(parent->d_inode)));
1032                 dput(parent);
1033                 iput(sai->sai_inode);
1034                 OBD_FREE_PTR(sai);
1035                 RETURN(-EAGAIN);
1036         }
1037
1038         lli->lli_sai = sai;
1039         rc = cfs_kernel_thread(ll_statahead_thread, parent, 0);
1040         if (rc < 0) {
1041                 CERROR("can't start ll_sa thread, rc: %d\n", rc);
1042                 dput(parent);
1043                 lli->lli_opendir_key = NULL;
1044                 sai->sai_thread.t_flags = SVC_STOPPED;
1045                 ll_sai_put(sai);
1046                 LASSERT(lli->lli_sai == NULL);
1047                 RETURN(-EAGAIN);
1048         }
1049
1050         l_wait_event(sai->sai_thread.t_ctl_waitq,
1051                      sa_is_running(sai) || sa_is_stopped(sai), &lwi);
1052
1053         /* We don't stat-ahead for the first dirent since we are already in
1054          * lookup, and -EEXIST also indicates that this is the first dirent. */
1055         RETURN(-EEXIST);
1056
1057 out:
1058         spin_lock(&lli->lli_lock);
1059         lli->lli_opendir_key = NULL;
1060         lli->lli_opendir_pid = 0;
1061         spin_unlock(&lli->lli_lock);
1062         return rc;
1063 }
1064
1065 /**
1066  * Start statahead thread if this is the first dir entry.
1067  * Otherwise if a thread is started already, wait it until it is ahead of me.
1068  * \retval 0       -- stat ahead thread process such dentry, for lookup, it miss
1069  * \retval 1       -- stat ahead thread process such dentry, for lookup, it hit
1070  * \retval -EEXIST -- stat ahead thread started, and this is the first dentry
1071  * \retval -EBADFD -- statahead thread exit and not dentry available
1072  * \retval -EAGAIN -- try to stat by caller
1073  * \retval others  -- error
1074  */
1075 int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
1076 {
1077         struct ll_inode_info     *lli = ll_i2info(dir);
1078         struct ll_statahead_info *sai;
1079         struct ll_sb_info        *sbi;
1080         int                       rc  = 0;
1081         ENTRY;
1082
1083         spin_lock(&lli->lli_lock);
1084         if (unlikely(lli->lli_opendir_pid != cfs_curproc_pid())) {
1085                 spin_unlock(&lli->lli_lock);
1086                 RETURN(-EAGAIN);
1087         }
1088
1089         if (likely(lli->lli_sai)) {
1090                 sai = ll_sai_get(lli->lli_sai);
1091                 spin_unlock(&lli->lli_lock);
1092         } else {
1093                 spin_unlock(&lli->lli_lock);
1094                 RETURN(trigger_statahead(dir, dentryp));
1095         }
1096
1097         if (unlikely(sa_is_stopped(sai) &&
1098                      list_empty(&sai->sai_entries_stated)))
1099                 GOTO(out, rc = -EBADFD);
1100
1101         if ((*dentryp)->d_name.name[0] == '.') {
1102                 if (likely(sai->sai_ls_all ||
1103                            sai->sai_miss_hidden >= sai->sai_skip_hidden)) {
1104                         /* Hidden dentry is the first one, or statahead thread
1105                          * does not skip so many hidden dentries before
1106                          * "sai_ls_all" enabled as below. */
1107                 } else {
1108                         if (!sai->sai_ls_all)
1109                                 /* It maybe because hidden dentry is not the
1110                                  * first one, "sai_ls_all" was not set, then
1111                                  * "ls -al" missed. Enable "sai_ls_all" for
1112                                  * such case. */
1113                                 sai->sai_ls_all = 1;
1114
1115                         /* Such "getattr" has been skipped before "sai_ls_all"
1116                          * enabled as above. */
1117                         sai->sai_miss_hidden++;
1118                         GOTO(out, rc = -ENOENT);
1119                 }
1120         }
1121
1122         sbi = ll_i2sbi(dir);
1123         if (ll_sai_entry_stated(sai)) {
1124                 sbi->ll_sa_cached++;
1125         } else {
1126                 struct l_wait_info lwi =LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1127
1128                 sbi->ll_sa_blocked++;
1129                 /* thread started already, avoid double-stat. */
1130                 rc = l_wait_event(sai->sai_waitq,
1131                                   ll_sai_entry_stated(sai) || sa_is_stopped(sai),
1132                                   &lwi);
1133         }
1134
1135         if (lookup) {
1136                 struct dentry *result;
1137
1138                 result = d_lookup((*dentryp)->d_parent, &(*dentryp)->d_name);
1139                 if (result) {
1140                         LASSERT(result != *dentryp);
1141                         /* BUG 16303: do not drop reference count for "*dentryp",
1142                          * VFS will do that by itself. */
1143                         *dentryp = result;
1144                         GOTO(out, rc = 1);
1145                 }
1146         }
1147         /* do nothing for revalidate. */
1148         EXIT;
1149
1150 out:
1151         ll_sai_put(sai);
1152         return rc;
1153 }
1154
1155 /**
1156  * update hit/miss count.
1157  */
1158 void ll_statahead_exit(struct inode *dir, struct dentry *dentry, int result)
1159 {
1160         struct ll_inode_info     *lli = ll_i2info(dir);
1161         struct ll_statahead_info *sai;
1162         struct ll_sb_info        *sbi;
1163         struct ll_dentry_data    *ldd = ll_d2d(dentry);
1164         ENTRY;
1165
1166         spin_lock(&lli->lli_lock);
1167         if (unlikely(lli->lli_opendir_pid != cfs_curproc_pid())) {
1168                 spin_unlock(&lli->lli_lock);
1169                 EXIT;
1170                 return;
1171         } else {
1172                 sai = ll_sai_get(lli->lli_sai);
1173                 spin_unlock(&lli->lli_lock);
1174         }
1175         sbi = ll_i2sbi(dir);
1176
1177         if (result >= 1) {
1178                 sbi->ll_sa_hit++;
1179                 sai->sai_hit++;
1180                 sai->sai_consecutive_miss = 0;
1181                 sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
1182         } else {
1183                 sbi->ll_sa_miss++;
1184                 sai->sai_miss++;
1185                 sai->sai_consecutive_miss++;
1186                 if (sa_low_hit(sai) && sa_is_running(sai)) {
1187                         sbi->ll_sa_wrong++;
1188                         CDEBUG(D_READA, "Statahead for dir "DFID" hit ratio "
1189                                "too low: hit/miss %u/%u, sent/replied %u/%u, "
1190                                "stopping statahead thread: pid %d\n",
1191                                PFID(ll_inode_lu_fid(dir)), sai->sai_hit,
1192                                sai->sai_miss, sai->sai_sent,
1193                                sai->sai_replied, cfs_curproc_pid());
1194                         spin_lock(&lli->lli_lock);
1195                         if (!sa_is_stopped(sai))
1196                                 sai->sai_thread.t_flags = SVC_STOPPING;
1197                         spin_unlock(&lli->lli_lock);
1198                 }
1199         }
1200
1201         if (!sa_is_stopped(sai))
1202                 cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
1203         ll_sai_entry_fini(sai);
1204         if (likely(ldd != NULL))
1205                 ldd->lld_sa_generation = sai->sai_generation;
1206
1207         ll_sai_put(sai);
1208         EXIT;
1209 }