Whamcloud - gitweb
e3f0662febc01ca7aee8cc635cfa89266344e9b1
[fs/lustre-release.git] / lustre / llite / statahead.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #include <linux/fs.h>
38 #include <linux/sched.h>
39 #include <linux/mm.h>
40 #include <linux/smp_lock.h>
41 #include <linux/highmem.h>
42 #include <linux/pagemap.h>
43
44 #define DEBUG_SUBSYSTEM S_LLITE
45
46 #include <obd_support.h>
47 #include <lustre_lite.h>
48 #include <lustre_dlm.h>
49 #include <linux/lustre_version.h>
50 #include "llite_internal.h"
51
52 struct ll_sai_entry {
53         struct list_head        se_list;
54         unsigned int            se_index;
55         int                     se_stat;
56         struct ptlrpc_request  *se_req;
57         struct md_enqueue_info *se_minfo;
58 };
59
/* Values kept in ll_sai_entry::se_stat. */
enum {
        /* state given to a freshly initialised entry */
        SA_ENTRY_UNSTATED = 0,
        /* the interpret callback saw a reply without error */
        SA_ENTRY_STATED   = 1
};
64
65 struct dentry_operations ll_sai_d_ops = {
66         .d_release = ll_release,
67 };
68
69 static unsigned int sai_generation = 0;
70 static spinlock_t sai_generation_lock = SPIN_LOCK_UNLOCKED;
71
72 /**
73  * Check whether first entry was stated already or not.
74  * No need to hold lli_lock, for:
75  * (1) it is me that remove entry from the list
76  * (2) the statahead thread only add new entry to the list
77  */
78 static int ll_sai_entry_stated(struct ll_statahead_info *sai)
79 {
80         struct ll_sai_entry  *entry;
81         int                   rc = 0;
82
83         if (!list_empty(&sai->sai_entries_stated)) {
84                 entry = list_entry(sai->sai_entries_stated.next,
85                                    struct ll_sai_entry, se_list);
86                 if (entry->se_index == sai->sai_index_next)
87                         rc = 1;
88         }
89         return rc;
90 }
91
92 static inline int sa_received_empty(struct ll_statahead_info *sai)
93 {
94         return list_empty(&sai->sai_entries_received);
95 }
96
97 static inline int sa_not_full(struct ll_statahead_info *sai)
98 {
99         return sai->sai_index < sai->sai_hit + sai->sai_miss + sai->sai_max;
100 }
101
102 static inline int sa_is_running(struct ll_statahead_info *sai)
103 {
104         return !!(sai->sai_thread.t_flags & SVC_RUNNING);
105 }
106
107 static inline int sa_is_stopping(struct ll_statahead_info *sai)
108 {
109         return !!(sai->sai_thread.t_flags & SVC_STOPPING);
110 }
111
112 static inline int sa_is_stopped(struct ll_statahead_info *sai)
113 {
114         return !!(sai->sai_thread.t_flags & SVC_STOPPED);
115 }
116
117 /**
118  * (1) hit ratio less than 80%
119  * or
120  * (2) consecutive miss more than 8
121  */
122 static inline int sa_low_hit(struct ll_statahead_info *sai)
123 {
124         return ((sai->sai_hit < 4 * sai->sai_miss && sai->sai_hit > 7) ||
125                 (sai->sai_consecutive_miss > 8));
126 }
127
128 /**
129  * process the deleted entry's member and free the entry.
130  * (1) release intent
131  * (2) free md_enqueue_info
132  * (3) drop dentry's ref count
133  * (4) release request's ref count
134  */
135 static void ll_sai_entry_cleanup(struct ll_sai_entry *entry)
136 {
137         struct ptlrpc_request  *req = entry->se_req;
138         struct md_enqueue_info *minfo = entry->se_minfo;
139         ENTRY;
140
141         if (minfo) {
142                 struct dentry        *dentry = minfo->mi_dentry;
143                 struct lookup_intent *it = &minfo->mi_it;
144
145                 entry->se_minfo = NULL;
146                 ll_intent_release(it);
147                 OBD_FREE_PTR(minfo);
148                 dput(dentry);
149         }
150         if (req) {
151                 entry->se_req = NULL;
152                 ptlrpc_req_finished(req);
153         }
154         OBD_FREE_PTR(entry);
155
156         EXIT;
157 }
158
159 static struct ll_statahead_info *ll_sai_alloc(void)
160 {
161         struct ll_statahead_info *sai;
162
163         OBD_ALLOC_PTR(sai);
164         if (!sai)
165                 return NULL;
166
167         spin_lock(&sai_generation_lock);
168         sai->sai_generation = ++sai_generation;
169         if (unlikely(sai_generation == 0))
170                 sai->sai_generation = ++sai_generation;
171         spin_unlock(&sai_generation_lock);
172         atomic_set(&sai->sai_refcount, 1);
173         sai->sai_max = LL_SA_RPC_MIN;
174         cfs_waitq_init(&sai->sai_waitq);
175         cfs_waitq_init(&sai->sai_thread.t_ctl_waitq);
176         CFS_INIT_LIST_HEAD(&sai->sai_entries_sent);
177         CFS_INIT_LIST_HEAD(&sai->sai_entries_received);
178         CFS_INIT_LIST_HEAD(&sai->sai_entries_stated);
179         return sai;
180 }
181
182 static inline 
183 struct ll_statahead_info *ll_sai_get(struct ll_statahead_info *sai)
184 {
185         LASSERT(sai);
186         atomic_inc(&sai->sai_refcount);
187         return sai;
188 }
189
190 static void ll_sai_put(struct ll_statahead_info *sai)
191 {
192         struct inode         *inode = sai->sai_inode;
193         struct ll_inode_info *lli = ll_i2info(inode);
194         ENTRY;
195
196         if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_lock)) {
197                 struct ll_sai_entry *entry, *next;
198
199                 lli->lli_sai = NULL;
200                 spin_unlock(&lli->lli_lock);
201
202                 LASSERT(sa_is_stopped(sai));
203
204                 if (sai->sai_sent > sai->sai_replied)
205                         CDEBUG(D_READA,"statahead for dir "DFID" does not "
206                               "finish: [sent:%u] [replied:%u]\n",
207                               PFID(&lli->lli_fid),
208                               sai->sai_sent, sai->sai_replied);
209
210                 list_for_each_entry_safe(entry, next, &sai->sai_entries_sent,
211                                          se_list) {
212                         list_del(&entry->se_list);
213                         ll_sai_entry_cleanup(entry);
214                 }
215                 list_for_each_entry_safe(entry, next, &sai->sai_entries_received,
216                                          se_list) {
217                         list_del(&entry->se_list);
218                         ll_sai_entry_cleanup(entry);
219                 }
220                 list_for_each_entry_safe(entry, next, &sai->sai_entries_stated,
221                                          se_list) {
222                         list_del(&entry->se_list);
223                         ll_sai_entry_cleanup(entry);
224                 }
225                 dput(sai->sai_first);
226                 OBD_FREE_PTR(sai);
227                 iput(inode);
228         }
229         EXIT;
230 }
231
232 /**
233  * insert it into sai_entries_sent tail when init.
234  */
235 static struct ll_sai_entry *
236 ll_sai_entry_init(struct ll_statahead_info *sai, unsigned int index)
237 {
238         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
239         struct ll_sai_entry  *entry;
240         ENTRY;
241
242         OBD_ALLOC_PTR(entry);
243         if (entry == NULL)
244                 RETURN(ERR_PTR(-ENOMEM));
245
246         CDEBUG(D_READA, "alloc sai entry %p index %u\n",
247                entry, index);
248         entry->se_index = index;
249         entry->se_stat  = SA_ENTRY_UNSTATED;
250
251         spin_lock(&lli->lli_lock);
252         list_add_tail(&entry->se_list, &sai->sai_entries_sent);
253         spin_unlock(&lli->lli_lock);
254
255         RETURN(entry);
256 }
257
258 /**
259  * delete it from sai_entries_stated head when fini, it need not
260  * to process entry's member.
261  */
262 static void ll_sai_entry_fini(struct ll_statahead_info *sai)
263 {
264         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
265         struct ll_sai_entry  *entry;
266         ENTRY;
267         
268         spin_lock(&lli->lli_lock);
269         sai->sai_index_next++;
270         if (likely(!list_empty(&sai->sai_entries_stated))) {
271                 entry = list_entry(sai->sai_entries_stated.next,
272                                    struct ll_sai_entry, se_list);
273                 if (entry->se_index < sai->sai_index_next) {
274                         list_del(&entry->se_list);
275                         OBD_FREE_PTR(entry);
276                 }
277         } else
278                 LASSERT(sa_is_stopped(sai));
279         spin_unlock(&lli->lli_lock);
280
281         EXIT;
282 }
283
284 /**
285  * inside lli_lock.
286  * \retval NULL : can not find the entry in sai_entries_sent with the index
287  * \retval entry: find the entry in sai_entries_sent with the index
288  */
289 static struct ll_sai_entry *
290 ll_sai_entry_set(struct ll_statahead_info *sai, unsigned int index, int stat,
291                  struct ptlrpc_request *req, struct md_enqueue_info *minfo)
292 {
293         struct ll_sai_entry *entry;
294         ENTRY;
295
296         if (!list_empty(&sai->sai_entries_sent)) {
297                 list_for_each_entry(entry, &sai->sai_entries_sent,
298                                     se_list) {
299                         if (entry->se_index == index) {
300                                 entry->se_stat = stat;
301                                 entry->se_req = ptlrpc_request_addref(req);
302                                 entry->se_minfo = minfo;
303                                 RETURN(entry);
304                         } else if (entry->se_index > index)
305                                 RETURN(NULL);
306                 }
307         }
308         RETURN(NULL);
309 }
310
311 /**
312  * inside lli_lock.
313  * Move entry to sai_entries_received and
314  * insert it into sai_entries_received tail.
315  */
316 static inline void
317 ll_sai_entry_to_received(struct ll_statahead_info *sai, struct ll_sai_entry *entry)
318 {
319         if (!list_empty(&entry->se_list))
320                 list_del_init(&entry->se_list);
321         list_add_tail(&entry->se_list, &sai->sai_entries_received);
322 }
323
324 /**
325  * Move entry to sai_entries_stated and
326  * sort with the index.
327  */
328 static int
329 ll_sai_entry_to_stated(struct ll_statahead_info *sai, struct ll_sai_entry *entry)
330 {
331         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
332         struct ll_sai_entry  *se;
333         ENTRY;
334
335         spin_lock(&lli->lli_lock);
336         if (!list_empty(&entry->se_list))
337                 list_del_init(&entry->se_list);
338
339         if (unlikely(entry->se_index < sai->sai_index_next)) {
340                 spin_unlock(&lli->lli_lock);
341                 ll_sai_entry_cleanup(entry);
342                 RETURN(0);
343         }
344
345         list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) {
346                 if (se->se_index < entry->se_index) {
347                         list_add(&entry->se_list, &se->se_list);
348                         spin_unlock(&lli->lli_lock);
349                         RETURN(1);
350                 }
351         }
352
353         /*
354          * I am the first entry.
355          */
356         list_add(&entry->se_list, &sai->sai_entries_stated);
357         spin_unlock(&lli->lli_lock);
358         RETURN(1);
359 }
360
361 /**
362  * finish lookup/revalidate.
363  */
364 static int do_statahead_interpret(struct ll_statahead_info *sai)
365 {
366         struct ll_inode_info   *lli = ll_i2info(sai->sai_inode);
367         struct ll_sai_entry    *entry;
368         struct ptlrpc_request  *req;
369         struct md_enqueue_info *minfo;
370         struct dentry          *dentry;
371         struct lookup_intent   *it;
372         int                     rc = 0;
373         ENTRY;
374
375         spin_lock(&lli->lli_lock);
376         LASSERT(!sa_received_empty(sai));
377         entry = list_entry(sai->sai_entries_received.next, struct ll_sai_entry,
378                            se_list);
379         list_del_init(&entry->se_list);
380         spin_unlock(&lli->lli_lock);
381
382         if (unlikely(entry->se_index < sai->sai_index_next)) {
383                 ll_sai_entry_cleanup(entry);
384                 RETURN(0);
385         }
386
387         req = entry->se_req;
388         minfo = entry->se_minfo;
389         dentry = minfo->mi_dentry;
390         it = &minfo->mi_it;
391
392         if (entry->se_stat != SA_ENTRY_STATED)
393                 GOTO(out, rc = entry->se_stat);
394
395         if (dentry->d_inode == NULL) {
396                 /*
397                  * lookup.
398                  */
399                 struct dentry    *save = dentry;
400                 struct it_cb_data icbd = {
401                         .icbd_parent   = dentry->d_parent->d_inode,
402                         .icbd_childp   = &dentry
403                 };
404
405                 LASSERT(fid_is_zero(&minfo->mi_data.op_fid2));
406
407                 rc = ll_lookup_it_finish(req, it, &icbd);
408                 if (!rc)
409                         /*
410                          * Here dentry->d_inode might be NULL,
411                          * because the entry may have been removed before
412                          * we start doing stat ahead.
413                          */
414                         ll_lookup_finish_locks(it, dentry);
415
416                 if (dentry != save) {
417                         minfo->mi_dentry = dentry;
418                         dput(save);
419                 }
420         } else {
421                 /*
422                  * revalidate.
423                  */
424                 struct mdt_body *body;
425
426                 body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
427                                       sizeof(*body));
428                 if (!lu_fid_eq(&minfo->mi_data.op_fid2, &body->fid1)) {
429                         ll_unhash_aliases(dentry->d_inode);
430                         GOTO(out, rc = -EAGAIN);
431                 }
432
433                 rc = ll_revalidate_it_finish(req, it, dentry);
434                 if (rc) {
435                         ll_unhash_aliases(dentry->d_inode);
436                         GOTO(out, rc);
437                 }
438
439                 spin_lock(&dcache_lock);
440                 lock_dentry(dentry);
441                 __d_drop(dentry);
442 #ifdef DCACHE_LUSTRE_INVALID
443                 dentry->d_flags &= ~DCACHE_LUSTRE_INVALID;
444 #endif
445                 unlock_dentry(dentry);
446                 d_rehash_cond(dentry, 0);
447                 spin_unlock(&dcache_lock);
448
449                 ll_lookup_finish_locks(it, dentry);
450         }
451         EXIT;
452
453 out:
454         if (likely(ll_sai_entry_to_stated(sai, entry))) {
455                 entry->se_minfo = NULL;
456                 entry->se_req = NULL;
457                 cfs_waitq_signal(&sai->sai_waitq);
458                 ll_intent_release(it);
459                 OBD_FREE_PTR(minfo);
460                 dput(dentry);
461                 ptlrpc_req_finished(req);
462         }
463         return rc;
464 }
465
466 static int ll_statahead_interpret(struct ptlrpc_request *req,
467                                   struct md_enqueue_info *minfo,
468                                   int rc)
469 {
470         struct dentry            *dentry = minfo->mi_dentry;
471         struct lookup_intent     *it = &minfo->mi_it;
472         struct inode             *dir = dentry->d_parent->d_inode;
473         struct ll_inode_info     *lli = ll_i2info(dir);
474         struct ll_statahead_info *sai;
475         struct ll_sai_entry      *entry;
476         ENTRY;
477
478         CDEBUG(D_READA, "interpret statahead %.*s rc %d\n",
479                dentry->d_name.len, dentry->d_name.name, rc);
480
481         spin_lock(&lli->lli_lock);
482         if (unlikely(lli->lli_sai == NULL ||
483             lli->lli_sai->sai_generation != minfo->mi_generation)) {
484                 spin_unlock(&lli->lli_lock);
485                 ll_intent_release(it);
486                 dput(dentry);
487                 OBD_FREE_PTR(minfo);
488                 RETURN(-ESTALE);
489         } else {
490                 sai = lli->lli_sai;
491                 if (rc || dir == NULL)
492                         rc = -ESTALE;
493
494                 entry = ll_sai_entry_set(sai,
495                                          (unsigned int)(long)minfo->mi_cbdata,
496                                          rc ? SA_ENTRY_UNSTATED :
497                                          SA_ENTRY_STATED, req, minfo);
498                 LASSERT(entry != NULL);
499                 if (likely(sa_is_running(sai))) {
500                         ll_sai_entry_to_received(sai, entry);
501                         sai->sai_replied++;
502                         spin_unlock(&lli->lli_lock);
503                         cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
504                 } else {
505                         if (!list_empty(&entry->se_list))
506                                 list_del_init(&entry->se_list);
507                         sai->sai_replied++;
508                         spin_unlock(&lli->lli_lock);
509                         ll_sai_entry_cleanup(entry);
510                 }
511                 RETURN(rc);
512         }
513 }
514
515 static void sa_args_fini(struct md_enqueue_info *minfo,
516                          struct ldlm_enqueue_info *einfo)
517 {
518         LASSERT(minfo && einfo);
519         capa_put(minfo->mi_data.op_capa1);
520         capa_put(minfo->mi_data.op_capa2);
521         OBD_FREE_PTR(minfo);
522         OBD_FREE_PTR(einfo);
523 }
524
525 /**
526  * There is race condition between "capa_put" and "ll_statahead_interpret" for
527  * accessing "op_data.op_capa[1,2]" as following:
528  * "capa_put" releases "op_data.op_capa[1,2]"'s reference count after calling
529  * "md_intent_getattr_async". But "ll_statahead_interpret" maybe run first, and
530  * fill "op_data.op_capa[1,2]" as POISON, then cause "capa_put" access invalid
531  * "ocapa". So here reserve "op_data.op_capa[1,2]" in "pcapa" before calling
532  * "md_intent_getattr_async".
533  */
534 static int sa_args_init(struct inode *dir, struct dentry *dentry,
535                         struct md_enqueue_info **pmi,
536                         struct ldlm_enqueue_info **pei,
537                         struct obd_capa **pcapa)
538 {
539         struct ll_inode_info     *lli = ll_i2info(dir);
540         struct md_enqueue_info   *minfo;
541         struct ldlm_enqueue_info *einfo;
542         struct md_op_data        *op_data;
543
544         OBD_ALLOC_PTR(einfo);
545         if (einfo == NULL)
546                 return -ENOMEM;
547
548         OBD_ALLOC_PTR(minfo);
549         if (minfo == NULL) {
550                 OBD_FREE_PTR(einfo);
551                 return -ENOMEM;
552         }
553
554         op_data = ll_prep_md_op_data(&minfo->mi_data, dir, dentry->d_inode,
555                                      dentry->d_name.name, dentry->d_name.len,
556                                      0, LUSTRE_OPC_ANY, NULL);
557         if (IS_ERR(op_data)) {
558                 OBD_FREE_PTR(einfo);
559                 OBD_FREE_PTR(minfo);
560                 return PTR_ERR(op_data);
561         }
562
563         minfo->mi_it.it_op = IT_GETATTR;
564         minfo->mi_dentry = dentry;
565         minfo->mi_cb = ll_statahead_interpret;
566         minfo->mi_generation = lli->lli_sai->sai_generation;
567         minfo->mi_cbdata = (void *)(long)lli->lli_sai->sai_index;
568
569         einfo->ei_type   = LDLM_IBITS;
570         einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
571         einfo->ei_cb_bl  = ll_md_blocking_ast;
572         einfo->ei_cb_cp  = ldlm_completion_ast;
573         einfo->ei_cb_gl  = NULL;
574         einfo->ei_cbdata = NULL;
575
576         *pmi = minfo;
577         *pei = einfo;
578         pcapa[0] = op_data->op_capa1;
579         pcapa[1] = op_data->op_capa2;
580
581         return 0;
582 }
583
584 /**
585  * similar to ll_lookup_it().
586  */
587 static int do_sa_lookup(struct inode *dir, struct dentry *dentry)
588 {
589         struct md_enqueue_info   *minfo;
590         struct ldlm_enqueue_info *einfo;
591         struct obd_capa          *capas[2];
592         int                       rc;
593         ENTRY;
594
595         rc = sa_args_init(dir, dentry, &minfo, &einfo, capas);
596         if (rc)
597                 RETURN(rc);
598
599         rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
600         if (!rc) {
601                 capa_put(capas[0]);
602                 capa_put(capas[1]);
603         } else {
604                 sa_args_fini(minfo, einfo);
605         }
606
607         RETURN(rc);
608 }
609
610 /**
611  * similar to ll_revalidate_it().
612  * \retval      1 -- dentry valid
613  * \retval      0 -- will send stat-ahead request
614  * \retval others -- prepare stat-ahead request failed
615  */
616 static int do_sa_revalidate(struct dentry *dentry)
617 {
618         struct inode             *inode = dentry->d_inode;
619         struct inode             *dir = dentry->d_parent->d_inode;
620         struct lookup_intent      it = { .it_op = IT_GETATTR };
621         struct md_enqueue_info   *minfo;
622         struct ldlm_enqueue_info *einfo;
623         struct obd_capa          *capas[2];
624         int rc;
625         ENTRY;
626
627         if (inode == NULL)
628                 RETURN(1);
629
630         if (d_mountpoint(dentry))
631                 RETURN(1);
632
633         if (dentry == dentry->d_sb->s_root)
634                 RETURN(1);
635
636         rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode));
637         if (rc == 1) {
638                 ll_intent_release(&it);
639                 RETURN(1);
640         }
641
642         rc = sa_args_init(dir, dentry, &minfo, &einfo, capas);
643         if (rc)
644                 RETURN(rc);
645
646         rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
647         if (!rc) {
648                 capa_put(capas[0]);
649                 capa_put(capas[1]);
650         } else {
651                 sa_args_fini(minfo, einfo);
652         }
653
654         RETURN(rc);
655 }
656
657 static inline void ll_name2qstr(struct qstr *this, const char *name, int namelen)
658 {
659         unsigned long hash = init_name_hash();
660         unsigned int  c;
661
662         this->name = name;
663         this->len  = namelen;
664         for (; namelen > 0; namelen--, name++) {
665                 c = *(const unsigned char *)name;
666                 hash = partial_name_hash(c, hash);
667         }
668         this->hash = end_name_hash(hash);
669 }
670
671 static int ll_statahead_one(struct dentry *parent, const char* entry_name,
672                             int entry_name_len)
673 {
674         struct inode             *dir = parent->d_inode;
675         struct ll_inode_info     *lli = ll_i2info(dir);
676         struct ll_statahead_info *sai = lli->lli_sai;
677         struct qstr               name;
678         struct dentry            *dentry;
679         struct ll_sai_entry      *se;
680         int                       rc;
681         ENTRY;
682
683 #ifdef DCACHE_LUSTRE_INVALID
684         if (parent->d_flags & DCACHE_LUSTRE_INVALID) {
685 #else
686         if (d_unhashed(parent)) {
687 #endif
688                 CDEBUG(D_READA, "parent dentry@%p %.*s is "
689                        "invalid, skip statahead\n",
690                        parent, parent->d_name.len, parent->d_name.name);
691                 RETURN(-EINVAL);
692         }
693
694         se = ll_sai_entry_init(sai, sai->sai_index);
695         if (IS_ERR(se))
696                 RETURN(PTR_ERR(se));
697
698         ll_name2qstr(&name, entry_name, entry_name_len);
699         dentry = d_lookup(parent, &name);
700         if (!dentry) {
701                 dentry = d_alloc(parent, &name);
702                 if (dentry) {
703                         rc = do_sa_lookup(dir, dentry);
704                         if (rc)
705                                 dput(dentry);
706                 } else {
707                         GOTO(out, rc = -ENOMEM);
708                 }
709         } else {
710                 rc = do_sa_revalidate(dentry);
711                 if (rc)
712                         dput(dentry);
713         }
714
715         EXIT;
716
717 out:
718         if (rc) {
719                 CDEBUG(D_READA, "set sai entry %p index %u stat %d rc %d\n",
720                        se, se->se_index, se->se_stat, rc);
721                 se->se_stat = rc;
722                 if (ll_sai_entry_to_stated(sai, se))
723                         cfs_waitq_signal(&sai->sai_waitq);
724         } else {
725                 sai->sai_sent++;
726         }
727
728         sai->sai_index++;
729         return rc;
730 }
731
732 struct ll_sa_thread_args {
733         struct dentry   *sta_parent;
734         pid_t            sta_pid;
735 };
736
737 static int ll_statahead_thread(void *arg)
738 {
739         struct ll_sa_thread_args *sta = arg;
740         struct dentry            *parent = dget(sta->sta_parent);
741         struct inode             *dir = parent->d_inode;
742         struct ll_inode_info     *lli = ll_i2info(dir);
743         struct ll_sb_info        *sbi = ll_i2sbi(dir);
744         struct ll_statahead_info *sai = ll_sai_get(lli->lli_sai);
745         struct ptlrpc_thread     *thread = &sai->sai_thread;
746         struct page              *page;
747         __u64                     pos = 0;
748         int                       first = 0;
749         int                       rc = 0;
750         struct ll_dir_chain       chain;
751         ENTRY;
752
753         {
754                 char pname[16];
755                 snprintf(pname, 15, "ll_sa_%u", sta->sta_pid);
756                 cfs_daemonize(pname);
757         }
758
759         sbi->ll_sa_total++;
760         spin_lock(&lli->lli_lock);
761         thread->t_flags = SVC_RUNNING;
762         spin_unlock(&lli->lli_lock);
763         cfs_waitq_signal(&thread->t_ctl_waitq);
764         CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name);
765
766         ll_dir_chain_init(&chain);
767         page = ll_get_dir_page(dir, pos, 0, &chain);
768
769         while (1) {
770                 struct l_wait_info lwi = { 0 };
771                 struct lu_dirpage *dp;
772                 struct lu_dirent  *ent;
773
774                 if (IS_ERR(page)) {
775                         rc = PTR_ERR(page);
776                         CERROR("error reading dir "DFID" at "LPU64"/%u: rc %d\n",
777                                PFID(ll_inode2fid(dir)), pos,
778                                sai->sai_index, rc);
779                         break;
780                 }
781
782                 dp = page_address(page);
783                 for (ent = lu_dirent_start(dp); ent != NULL;
784                      ent = lu_dirent_next(ent)) {
785                         char *name = ent->lde_name;
786                         int namelen = le16_to_cpu(ent->lde_namelen);
787
788                         if (namelen == 0)
789                                 /*
790                                  * Skip dummy record.
791                                  */
792                                 continue;
793
794                         if (name[0] == '.') {
795                                 if (namelen == 1) {
796                                         /*
797                                          * skip "."
798                                          */
799                                         continue;
800                                 } else if (name[1] == '.' && namelen == 2) {
801                                         /*
802                                          * skip ".."
803                                          */
804                                         continue;
805                                 } else if (!sai->sai_ls_all) {
806                                         /*
807                                          * skip hidden files.
808                                          */
809                                         sai->sai_skip_hidden++;
810                                         continue;
811                                 }
812                         }
813
814                         /*
815                          * don't stat-ahead first entry.
816                          */
817                         if (unlikely(!first)) {
818                                 first++;
819                                 continue;
820                         }
821
822 keep_de:
823                         l_wait_event(thread->t_ctl_waitq,
824                                      !sa_is_running(sai) || sa_not_full(sai) ||
825                                      !sa_received_empty(sai),
826                                      &lwi);
827
828                         while (!sa_received_empty(sai) && sa_is_running(sai))
829                                 do_statahead_interpret(sai);
830
831                         if (unlikely(!sa_is_running(sai))) {
832                                 ll_put_page(page);
833                                 GOTO(out, rc);
834                         }
835
836                         if (!sa_not_full(sai))
837                                 /*
838                                  * do not skip the current de.
839                                  */
840                                 goto keep_de;
841
842                         rc = ll_statahead_one(parent, name, namelen);
843                         if (rc < 0) {
844                                 ll_put_page(page);
845                                 GOTO(out, rc);
846                         }
847                 }
848                 pos = le64_to_cpu(dp->ldp_hash_end);
849                 ll_put_page(page);
850                 if (pos == DIR_END_OFF) {
851                         /*
852                          * End of directory reached.
853                          */
854                         while (1) {
855                                 l_wait_event(thread->t_ctl_waitq,
856                                              !sa_is_running(sai) ||
857                                              !sa_received_empty(sai) ||
858                                              sai->sai_sent == sai->sai_replied,
859                                              &lwi);
860                                 if (!sa_received_empty(sai) &&
861                                     sa_is_running(sai))
862                                         do_statahead_interpret(sai);
863                                 else
864                                         GOTO(out, rc);
865                         }
866                 } else if (1) {
867                         /*
868                          * chain is exhausted.
869                          * Normal case: continue to the next page.
870                          */
871                         page = ll_get_dir_page(dir, pos, 1, &chain);
872                 } else {
873                         /*
874                          * go into overflow page.
875                          */
876                 }
877         }
878         EXIT;
879
880 out:
881         ll_dir_chain_fini(&chain);
882         spin_lock(&lli->lli_lock);
883         thread->t_flags = SVC_STOPPED;
884         spin_unlock(&lli->lli_lock);
885         cfs_waitq_signal(&sai->sai_waitq);
886         cfs_waitq_signal(&thread->t_ctl_waitq);
887         ll_sai_put(sai);
888         dput(parent);
889         CDEBUG(D_READA, "statahead thread stopped, pid %d\n",
890                cfs_curproc_pid());
891         return rc;
892 }
893
894 /**
895  * called in ll_file_release().
896  */
897 void ll_stop_statahead(struct inode *inode, void *key)
898 {
899         struct ll_inode_info *lli = ll_i2info(inode);
900         struct ptlrpc_thread *thread;
901
902         spin_lock(&lli->lli_lock);
903         if (lli->lli_opendir_pid == 0 ||
904             unlikely(lli->lli_opendir_key != key)) {
905                 spin_unlock(&lli->lli_lock);
906                 return;
907         }
908
909         lli->lli_opendir_key = NULL;
910         lli->lli_opendir_pid = 0;
911
912         if (lli->lli_sai) {
913                 struct l_wait_info lwi = { 0 };
914
915                 thread = &lli->lli_sai->sai_thread;
916                 if (!sa_is_stopped(lli->lli_sai)) {
917                         thread->t_flags = SVC_STOPPING;
918                         spin_unlock(&lli->lli_lock);
919                         cfs_waitq_signal(&thread->t_ctl_waitq);
920
921                         CDEBUG(D_READA, "stopping statahead thread, pid %d\n",
922                                cfs_curproc_pid());
923                         l_wait_event(thread->t_ctl_waitq,
924                                      sa_is_stopped(lli->lli_sai),
925                                      &lwi);
926                 } else {
927                         spin_unlock(&lli->lli_lock);
928                 }
929
930                 /*
931                  * Put the ref which was held when first statahead_enter.
932                  * It maybe not the last ref for some statahead requests
933                  * maybe inflight.
934                  */
935                 ll_sai_put(lli->lli_sai);
936                 return;
937         }
938         spin_unlock(&lli->lli_lock);
939 }
940
/*
 * Classification of a dentry against the first entry of its directory, as
 * returned by is_first_dirent().
 *
 * The values are consecutive on purpose: is_first_dirent() computes
 * "LS_FIRST_DE + dot_de", so LS_FIRST_DOT_DE must equal LS_FIRST_DE + 1.
 */
enum {
        /** not the first dirent, or it is "." */
        LS_NONE_FIRST_DE = 0,
        /** the first non-hidden dirent */
        LS_FIRST_DE      = 1,
        /** the first hidden dirent, that is ".xxx" */
        LS_FIRST_DOT_DE  = 2
};
955
956 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
957 {
958         struct ll_dir_chain chain;
959         struct qstr        *target = &dentry->d_name;
960         struct page        *page;
961         __u64               pos = 0;
962         int                 dot_de;
963         int                 rc = LS_NONE_FIRST_DE;
964         ENTRY;
965
966         ll_dir_chain_init(&chain);
967         page = ll_get_dir_page(dir, pos, 0, &chain);
968
969         while (1) {
970                 struct lu_dirpage *dp;
971                 struct lu_dirent  *ent;
972
973                 if (IS_ERR(page)) {
974                         rc = PTR_ERR(page);
975                         CERROR("error reading dir "DFID" at "LPU64": rc %d\n",
976                                PFID(ll_inode2fid(dir)), pos, rc);
977                         break;
978                 }
979
980                 dp = page_address(page);
981                 for (ent = lu_dirent_start(dp); ent != NULL;
982                      ent = lu_dirent_next(ent)) {
983                         char *name = ent->lde_name;
984                         int namelen = le16_to_cpu(ent->lde_namelen);
985
986                         if (namelen == 0)
987                                 /*
988                                  * skip dummy record.
989                                  */
990                                 continue;
991
992                         if (name[0] == '.') {
993                                 if (namelen == 1)
994                                         /*
995                                          * skip "."
996                                          */
997                                         continue;
998                                 else if (name[1] == '.' && namelen == 2)
999                                         /*
1000                                          * skip ".."
1001                                          */
1002                                         continue;
1003                                 else
1004                                         dot_de = 1;
1005                         } else {
1006                                 dot_de = 0;
1007                         }
1008
1009                         if (dot_de && target->name[0] != '.') {
1010                                 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
1011                                        target->len, target->name,
1012                                        namelen, name);
1013                                 continue;
1014                         }
1015
1016                         if (target->len == namelen &&
1017                             !strncmp(target->name, name, target->len))
1018                                 rc = LS_FIRST_DE + dot_de;
1019                         else
1020                                 rc = LS_NONE_FIRST_DE;
1021                         ll_put_page(page);
1022                         GOTO(out, rc);
1023                 }
1024                 pos = le64_to_cpu(dp->ldp_hash_end);
1025                 ll_put_page(page);
1026                 if (pos == DIR_END_OFF) {
1027                         /*
1028                          * End of directory reached.
1029                          */
1030                         break;
1031                 } else if (1) {
1032                         /*
1033                          * chain is exhausted 
1034                          * Normal case: continue to the next page.
1035                          */
1036                         page = ll_get_dir_page(dir, pos, 1, &chain);
1037                 } else {
1038                         /*
1039                          * go into overflow page.
1040                          */
1041                 }
1042         }
1043         EXIT;
1044
1045 out:
1046         ll_dir_chain_fini(&chain);
1047         return rc;
1048 }
1049
1050 /**
1051  * Start statahead thread if this is the first dir entry.
1052  * Otherwise if a thread is started already, wait it until it is ahead of me.
1053  * \retval 0       -- stat ahead thread process such dentry, for lookup, it miss
1054  * \retval 1       -- stat ahead thread process such dentry, for lookup, it hit
1055  * \retval -EEXIST -- stat ahead thread started, and this is the first dentry
1056  * \retval -EBADFD -- statahead thread exit and not dentry available
1057  * \retval others  -- error
1058  */
1059 int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
1060 {
1061         struct ll_sb_info        *sbi = ll_i2sbi(dir);
1062         struct ll_inode_info     *lli = ll_i2info(dir);
1063         struct ll_statahead_info *sai = lli->lli_sai;
1064         struct ll_sa_thread_args  sta;
1065         struct l_wait_info        lwi = { 0 };
1066         int                       rc;
1067         ENTRY;
1068
1069         LASSERT(lli->lli_opendir_pid == cfs_curproc_pid());
1070
1071         if (sai) {
1072                 if (unlikely(sa_is_stopped(sai) &&
1073                              list_empty(&sai->sai_entries_stated)))
1074                         RETURN(-EBADFD);
1075
1076                 /*
1077                  * skip the first dentry.
1078                  */
1079                 if (unlikely((*dentryp)->d_name.len ==
1080                              sai->sai_first->d_name.len &&
1081                              !strncmp((*dentryp)->d_name.name,
1082                                       sai->sai_first->d_name.name,
1083                                       sai->sai_first->d_name.len)))
1084                         RETURN(-EEXIST);
1085
1086                 if ((*dentryp)->d_name.name[0] == '.') {
1087                         if (likely(sai->sai_ls_all ||
1088                             sai->sai_miss_hidden >= sai->sai_skip_hidden)) {
1089                                 /*
1090                                  * Hidden dentry is the first one, or statahead
1091                                  * thread does not skip so many hidden dentries
1092                                  * before "sai_ls_all" enabled as below.
1093                                  */
1094                         } else {
1095                                 if (!sai->sai_ls_all)
1096                                         /*
1097                                          * It maybe because hidden dentry is not
1098                                          * the first one, "sai_ls_all" was not
1099                                          * set, then "ls -al" missed. Enable
1100                                          * "sai_ls_all" for such case.
1101                                          */
1102                                         sai->sai_ls_all = 1;
1103
1104                                 /*
1105                                  * Such "getattr" has been skipped before
1106                                  * "sai_ls_all" enabled as above.
1107                                  */
1108                                 sai->sai_miss_hidden++;
1109                                 RETURN(-ENOENT);
1110                         }
1111                 }
1112
1113                 if (ll_sai_entry_stated(sai)) {
1114                         sbi->ll_sa_cached++;
1115                 } else {
1116                         sbi->ll_sa_blocked++;
1117                         /*
1118                          * thread started already, avoid double-stat.
1119                          */
1120                         l_wait_event(sai->sai_waitq,
1121                                      ll_sai_entry_stated(sai) || sa_is_stopped(sai),
1122                                      &lwi);
1123                 }
1124
1125                 if (lookup) {
1126                         struct dentry *result;
1127
1128                         result = d_lookup((*dentryp)->d_parent,
1129                                           &(*dentryp)->d_name);
1130                         if (result) {
1131                                 LASSERT(result != *dentryp);
1132                                 /* BUG 16303: do not drop reference count for
1133                                  * "*dentryp", VFS will do that by itself. */
1134                                 *dentryp = result;
1135                                 RETURN(1);
1136                         }
1137                 }
1138                 /*
1139                  * do nothing for revalidate.
1140                  */
1141                 RETURN(0);
1142         }
1143
1144          /*
1145           * I am the "lli_opendir_pid" owner, only me can set "lli_sai".
1146           */ 
1147         LASSERT(lli->lli_sai == NULL);
1148
1149         rc = is_first_dirent(dir, *dentryp);
1150         if (rc == LS_NONE_FIRST_DE) {
1151                 /*
1152                  * It is not "ls -{a}l" operation, no need statahead for it.
1153                  */
1154                 spin_lock(&lli->lli_lock);
1155                 lli->lli_opendir_key = NULL;
1156                 lli->lli_opendir_pid = 0;
1157                 spin_unlock(&lli->lli_lock);
1158                 RETURN(-EBADF);
1159         }
1160
1161         sai = ll_sai_alloc();
1162         if (sai == NULL)
1163                 RETURN(-ENOMEM);
1164
1165         sai->sai_inode  = igrab(dir);
1166         sai->sai_first = dget(*dentryp);
1167         sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
1168
1169         sta.sta_parent = (*dentryp)->d_parent;
1170         sta.sta_pid    = cfs_curproc_pid();
1171
1172         lli->lli_sai = sai;
1173         rc = cfs_kernel_thread(ll_statahead_thread, &sta, 0);
1174         if (rc < 0) {
1175                 CERROR("can't start ll_sa thread, rc: %d\n", rc);
1176                 sai->sai_thread.t_flags = SVC_STOPPED;
1177                 ll_sai_put(sai);
1178                 LASSERT(lli->lli_sai == NULL);
1179                 RETURN(rc);
1180         }
1181
1182         l_wait_event(sai->sai_thread.t_ctl_waitq, 
1183                      sa_is_running(sai) || sa_is_stopped(sai),
1184                      &lwi);
1185
1186         /*
1187          * We don't stat-ahead for the first dirent since we are already in
1188          * lookup, and -EEXIST also indicates that this is the first dirent.
1189          */
1190         RETURN(-EEXIST);
1191 }
1192
1193 /**
1194  * update hit/miss count.
1195  */
1196 int ll_statahead_exit(struct dentry *dentry, int result)
1197 {
1198         struct dentry         *parent = dentry->d_parent;
1199         struct ll_inode_info  *lli = ll_i2info(parent->d_inode);
1200         struct ll_sb_info     *sbi = ll_i2sbi(parent->d_inode);
1201         struct ll_dentry_data *ldd = ll_d2d(dentry);
1202         ENTRY;
1203
1204         if (lli->lli_opendir_pid != cfs_curproc_pid())
1205                 RETURN(-EBADFD);
1206
1207         if (lli->lli_sai) {
1208                 struct ll_statahead_info *sai = lli->lli_sai;
1209
1210                 if (result >= 1) {
1211                         sbi->ll_sa_hit++;
1212                         sai->sai_hit++;
1213                         sai->sai_consecutive_miss = 0;
1214                         sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
1215                 } else {
1216                         sbi->ll_sa_miss++;
1217                         sai->sai_miss++;
1218                         sai->sai_consecutive_miss++;
1219                         if (sa_low_hit(sai) && sa_is_running(sai)) {
1220                                 sbi->ll_sa_wrong++;
1221                                 CDEBUG(D_READA, "statahead for dir %.*s hit "
1222                                        "ratio too low: hit/miss %u/%u, "
1223                                        "sent/replied %u/%u. stopping statahead "
1224                                        "thread: pid %d\n",
1225                                        parent->d_name.len, parent->d_name.name,
1226                                        sai->sai_hit, sai->sai_miss,
1227                                        sai->sai_sent, sai->sai_replied,
1228                                        cfs_curproc_pid());
1229                                 spin_lock(&lli->lli_lock);
1230                                 if (!sa_is_stopped(sai))
1231                                         sai->sai_thread.t_flags = SVC_STOPPING;
1232                                 spin_unlock(&lli->lli_lock);
1233                         }
1234                 }
1235
1236                 if (!sa_is_stopped(sai))
1237                         cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
1238                 ll_sai_entry_fini(sai);
1239
1240                 if (unlikely(ldd == NULL)) {
1241                         ll_set_dd(dentry);
1242                         ldd = ll_d2d(dentry);
1243                         if (ldd != NULL && dentry->d_op == NULL) {
1244                                 lock_dentry(dentry);
1245                                 dentry->d_op = dentry->d_op ? : &ll_sai_d_ops;
1246                                 unlock_dentry(dentry);
1247                         }
1248                 }
1249
1250                 if (likely(ldd != NULL))
1251                         ldd->lld_sa_generation = sai->sai_generation;
1252                 else
1253                         RETURN(-ENOMEM);
1254         }
1255         RETURN(0);
1256 }