Whamcloud - gitweb
e8e5572017dd17cd462368323da31f45fe157be1
[fs/lustre-release.git] / lustre / llite / statahead.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #include <linux/fs.h>
38 #include <linux/sched.h>
39 #include <linux/mm.h>
40 #include <linux/smp_lock.h>
41 #include <linux/highmem.h>
42 #include <linux/pagemap.h>
43
44 #define DEBUG_SUBSYSTEM S_LLITE
45
46 #include <obd_support.h>
47 #include <lustre_lite.h>
48 #include <lustre_dlm.h>
49 #include <linux/lustre_version.h>
50 #include "llite_internal.h"
51
/**
 * One stat-ahead request tracked by a ll_statahead_info.
 * An entry is linked into exactly one of sai_entries_sent,
 * sai_entries_received or sai_entries_stated at a time.
 */
52 struct ll_sai_entry {
/* linkage into one of the three per-sai entry lists */
53         struct list_head        se_list;
/* position of the entry, taken from sai_index when it is created */
54         unsigned int            se_index;
/* SA_ENTRY_UNSTATED / SA_ENTRY_STATED, or the rc stored by ll_statahead_one() */
55         int                     se_stat;
/* reply request, referenced by ll_sai_entry_set(); NULL once released */
56         struct ptlrpc_request  *se_req;
/* enqueue info handed over by the reply callback; NULL once released */
57         struct md_enqueue_info *se_minfo;
58 };
59
/**
 * Values stored in ll_sai_entry::se_stat.
 */
60 enum {
/* initial state; also recorded when the reply carried an error */
61         SA_ENTRY_UNSTATED = 0,
/* recorded by ll_statahead_interpret() when the reply rc was 0 */
62         SA_ENTRY_STATED
63 };
64
/*
 * Dentry operations that only provide a release hook (ll_release).
 * NOTE(review): no user is visible in this part of the file; presumably it
 * is attached to dentries created for stat-ahead -- confirm at the caller.
 */
65 struct dentry_operations ll_sai_d_ops = {
66         .d_release = ll_release,
67 };
68
/*
 * Generation counter for ll_statahead_info instances. ll_sai_alloc() bumps
 * it under sai_generation_lock and never hands out the value 0.
 */
69 static unsigned int sai_generation = 0;
70 static spinlock_t sai_generation_lock = SPIN_LOCK_UNLOCKED;
71
72 /**
73  * Check whether first entry was stated already or not.
74  * No need to hold lli_lock, for:
75  * (1) it is me that remove entry from the list
76  * (2) the statahead thread only add new entry to the list
77  */
78 static int ll_sai_entry_stated(struct ll_statahead_info *sai)
79 {
80         struct ll_sai_entry  *entry;
81         int                   rc = 0;
82
83         if (!list_empty(&sai->sai_entries_stated)) {
84                 entry = list_entry(sai->sai_entries_stated.next,
85                                    struct ll_sai_entry, se_list);
86                 if (entry->se_index == sai->sai_index_next)
87                         rc = 1;
88         }
89         return rc;
90 }
91
92 static inline int sa_received_empty(struct ll_statahead_info *sai)
93 {
94         return list_empty(&sai->sai_entries_received);
95 }
96
97 static inline int sa_not_full(struct ll_statahead_info *sai)
98 {
99         return sai->sai_index < sai->sai_hit + sai->sai_miss + sai->sai_max;
100 }
101
102 static inline int sa_is_running(struct ll_statahead_info *sai)
103 {
104         return !!(sai->sai_thread.t_flags & SVC_RUNNING);
105 }
106
107 static inline int sa_is_stopping(struct ll_statahead_info *sai)
108 {
109         return !!(sai->sai_thread.t_flags & SVC_STOPPING);
110 }
111
112 static inline int sa_is_stopped(struct ll_statahead_info *sai)
113 {
114         return !!(sai->sai_thread.t_flags & SVC_STOPPED);
115 }
116
117 /**
118  * (1) hit ratio less than 80%
119  * or
120  * (2) consecutive miss more than 8
121  */
122 static inline int sa_low_hit(struct ll_statahead_info *sai)
123 {
124         return ((sai->sai_hit < 4 * sai->sai_miss && sai->sai_hit > 7) ||
125                 (sai->sai_consecutive_miss > 8));
126 }
127
128 /**
129  * process the deleted entry's member and free the entry.
130  * (1) release intent
131  * (2) free md_enqueue_info
132  * (3) drop dentry's ref count
133  * (4) release request's ref count
134  */
135 static void ll_sai_entry_cleanup(struct ll_sai_entry *entry)
136 {
137         struct ptlrpc_request  *req = entry->se_req;
138         struct md_enqueue_info *minfo = entry->se_minfo;
139         ENTRY;
140
141         if (minfo) {
142                 struct dentry        *dentry = minfo->mi_dentry;
143                 struct lookup_intent *it = &minfo->mi_it;
144
145                 entry->se_minfo = NULL;
146                 ll_intent_release(it);
147                 OBD_FREE_PTR(minfo);
148                 dput(dentry);
149         }
150         if (req) {
151                 entry->se_req = NULL;
152                 ptlrpc_req_finished(req);
153         }
154         OBD_FREE_PTR(entry);
155
156         EXIT;
157 }
158
159 static struct ll_statahead_info *ll_sai_alloc(void)
160 {
161         struct ll_statahead_info *sai;
162
163         OBD_ALLOC_PTR(sai);
164         if (!sai)
165                 return NULL;
166
167         spin_lock(&sai_generation_lock);
168         sai->sai_generation = ++sai_generation;
169         if (unlikely(sai_generation == 0))
170                 sai->sai_generation = ++sai_generation;
171         spin_unlock(&sai_generation_lock);
172         atomic_set(&sai->sai_refcount, 1);
173         sai->sai_max = LL_SA_RPC_MIN;
174         cfs_waitq_init(&sai->sai_waitq);
175         cfs_waitq_init(&sai->sai_thread.t_ctl_waitq);
176         CFS_INIT_LIST_HEAD(&sai->sai_entries_sent);
177         CFS_INIT_LIST_HEAD(&sai->sai_entries_received);
178         CFS_INIT_LIST_HEAD(&sai->sai_entries_stated);
179         return sai;
180 }
181
182 static inline 
183 struct ll_statahead_info *ll_sai_get(struct ll_statahead_info *sai)
184 {
185         LASSERT(sai);
186         atomic_inc(&sai->sai_refcount);
187         return sai;
188 }
189
190 static void ll_sai_put(struct ll_statahead_info *sai)
191 {
192         struct inode         *inode = sai->sai_inode;
193         struct ll_inode_info *lli = ll_i2info(inode);
194         ENTRY;
195
196         if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_lock)) {
197                 struct ll_sai_entry *entry, *next;
198
199                 lli->lli_sai = NULL;
200                 spin_unlock(&lli->lli_lock);
201
202                 LASSERT(sa_is_stopped(sai));
203
204                 if (sai->sai_sent > sai->sai_replied)
205                         CDEBUG(D_READA,"statahead for dir "DFID" does not "
206                               "finish: [sent:%u] [replied:%u]\n",
207                               PFID(&lli->lli_fid),
208                               sai->sai_sent, sai->sai_replied);
209
210                 list_for_each_entry_safe(entry, next, &sai->sai_entries_sent,
211                                          se_list) {
212                         list_del(&entry->se_list);
213                         ll_sai_entry_cleanup(entry);
214                 }
215                 list_for_each_entry_safe(entry, next, &sai->sai_entries_received,
216                                          se_list) {
217                         list_del(&entry->se_list);
218                         ll_sai_entry_cleanup(entry);
219                 }
220                 list_for_each_entry_safe(entry, next, &sai->sai_entries_stated,
221                                          se_list) {
222                         list_del(&entry->se_list);
223                         ll_sai_entry_cleanup(entry);
224                 }
225                 dput(sai->sai_first);
226                 OBD_FREE_PTR(sai);
227                 iput(inode);
228         }
229         EXIT;
230 }
231
232 /**
233  * insert it into sai_entries_sent tail when init.
234  */
235 static struct ll_sai_entry *
236 ll_sai_entry_init(struct ll_statahead_info *sai, unsigned int index)
237 {
238         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
239         struct ll_sai_entry  *entry;
240         ENTRY;
241
242         OBD_ALLOC_PTR(entry);
243         if (entry == NULL)
244                 RETURN(ERR_PTR(-ENOMEM));
245
246         CDEBUG(D_READA, "alloc sai entry %p index %u\n",
247                entry, index);
248         entry->se_index = index;
249         entry->se_stat  = SA_ENTRY_UNSTATED;
250
251         spin_lock(&lli->lli_lock);
252         list_add_tail(&entry->se_list, &sai->sai_entries_sent);
253         spin_unlock(&lli->lli_lock);
254
255         RETURN(entry);
256 }
257
258 /**
259  * delete it from sai_entries_stated head when fini, it need not
260  * to process entry's member.
261  */
262 static void ll_sai_entry_fini(struct ll_statahead_info *sai)
263 {
264         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
265         struct ll_sai_entry  *entry;
266         ENTRY;
267         
268         spin_lock(&lli->lli_lock);
269         sai->sai_index_next++;
270         if (likely(!list_empty(&sai->sai_entries_stated))) {
271                 entry = list_entry(sai->sai_entries_stated.next,
272                                    struct ll_sai_entry, se_list);
273                 if (entry->se_index < sai->sai_index_next) {
274                         list_del(&entry->se_list);
275                         OBD_FREE_PTR(entry);
276                 }
277         } else
278                 LASSERT(sa_is_stopped(sai));
279         spin_unlock(&lli->lli_lock);
280
281         EXIT;
282 }
283
284 /**
285  * inside lli_lock.
286  * \retval NULL : can not find the entry in sai_entries_sent with the index
287  * \retval entry: find the entry in sai_entries_sent with the index
288  */
289 static struct ll_sai_entry *
290 ll_sai_entry_set(struct ll_statahead_info *sai, unsigned int index, int stat,
291                  struct ptlrpc_request *req, struct md_enqueue_info *minfo)
292 {
293         struct ll_sai_entry *entry;
294         ENTRY;
295
296         if (!list_empty(&sai->sai_entries_sent)) {
297                 list_for_each_entry(entry, &sai->sai_entries_sent,
298                                     se_list) {
299                         if (entry->se_index == index) {
300                                 entry->se_stat = stat;
301                                 entry->se_req = ptlrpc_request_addref(req);
302                                 entry->se_minfo = minfo;
303                                 RETURN(entry);
304                         } else if (entry->se_index > index)
305                                 RETURN(NULL);
306                 }
307         }
308         RETURN(NULL);
309 }
310
311 /**
312  * inside lli_lock.
313  * Move entry to sai_entries_received and
314  * insert it into sai_entries_received tail.
315  */
316 static inline void
317 ll_sai_entry_to_received(struct ll_statahead_info *sai, struct ll_sai_entry *entry)
318 {
319         if (!list_empty(&entry->se_list))
320                 list_del_init(&entry->se_list);
321         list_add_tail(&entry->se_list, &sai->sai_entries_received);
322 }
323
324 /**
325  * Move entry to sai_entries_stated and
326  * sort with the index.
327  */
328 static int
329 ll_sai_entry_to_stated(struct ll_statahead_info *sai, struct ll_sai_entry *entry)
330 {
331         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
332         struct ll_sai_entry  *se;
333         ENTRY;
334
335         spin_lock(&lli->lli_lock);
336         if (!list_empty(&entry->se_list))
337                 list_del_init(&entry->se_list);
338
339         if (unlikely(entry->se_index < sai->sai_index_next)) {
340                 spin_unlock(&lli->lli_lock);
341                 ll_sai_entry_cleanup(entry);
342                 RETURN(0);
343         }
344
345         list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) {
346                 if (se->se_index < entry->se_index) {
347                         list_add(&entry->se_list, &se->se_list);
348                         spin_unlock(&lli->lli_lock);
349                         RETURN(1);
350                 }
351         }
352
353         /*
354          * I am the first entry.
355          */
356         list_add(&entry->se_list, &sai->sai_entries_stated);
357         spin_unlock(&lli->lli_lock);
358         RETURN(1);
359 }
360
361 /**
362  * finish lookup/revalidate.
363  */
364 static int do_statahead_interpret(struct ll_statahead_info *sai)
365 {
366         struct ll_inode_info   *lli = ll_i2info(sai->sai_inode);
367         struct ll_sai_entry    *entry;
368         struct ptlrpc_request  *req;
369         struct md_enqueue_info *minfo;
370         struct dentry          *dentry;
371         struct lookup_intent   *it;
372         int                     rc = 0;
373         struct mdt_body        *body;
374         ENTRY;
375
376         spin_lock(&lli->lli_lock);
377         LASSERT(!sa_received_empty(sai));
378         entry = list_entry(sai->sai_entries_received.next, struct ll_sai_entry,
379                            se_list);
380         list_del_init(&entry->se_list);
381         spin_unlock(&lli->lli_lock);
382
383         if (unlikely(entry->se_index < sai->sai_index_next)) {
384                 ll_sai_entry_cleanup(entry);
385                 RETURN(0);
386         }
387
388         req = entry->se_req;
389         minfo = entry->se_minfo;
390         dentry = minfo->mi_dentry;
391         it = &minfo->mi_it;
392
393         if (entry->se_stat != SA_ENTRY_STATED)
394                 GOTO(out, rc = entry->se_stat);
395
396         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
397         if (body == NULL)
398                 GOTO(out, rc = -EFAULT);
399
400         if (dentry->d_inode == NULL) {
401                 /*
402                  * lookup.
403                  */
404                 struct dentry    *save = dentry;
405                 struct it_cb_data icbd = {
406                         .icbd_parent   = dentry->d_parent->d_inode,
407                         .icbd_childp   = &dentry
408                 };
409
410                 LASSERT(fid_is_zero(&minfo->mi_data.op_fid2));
411
412                 /*
413                  * XXX: No fid in reply, this is probaly cross-ref case.
414                  * SA can't handle it yet.
415                  */
416                 if (body->valid & OBD_MD_MDS)
417                         GOTO(out, rc = -EAGAIN);
418
419                 rc = ll_lookup_it_finish(req, it, &icbd);
420                 if (!rc)
421                         /*
422                          * Here dentry->d_inode might be NULL,
423                          * because the entry may have been removed before
424                          * we start doing stat ahead.
425                          */
426                         ll_lookup_finish_locks(it, dentry);
427
428                 if (dentry != save) {
429                         minfo->mi_dentry = dentry;
430                         dput(save);
431                 }
432         } else {
433                 /*
434                  * revalidate.
435                  */
436                 if (!lu_fid_eq(&minfo->mi_data.op_fid2, &body->fid1)) {
437                         ll_unhash_aliases(dentry->d_inode);
438                         GOTO(out, rc = -EAGAIN);
439                 }
440
441                 rc = ll_revalidate_it_finish(req, it, dentry);
442                 if (rc) {
443                         ll_unhash_aliases(dentry->d_inode);
444                         GOTO(out, rc);
445                 }
446
447                 spin_lock(&dcache_lock);
448                 lock_dentry(dentry);
449                 __d_drop(dentry);
450 #ifdef DCACHE_LUSTRE_INVALID
451                 dentry->d_flags &= ~DCACHE_LUSTRE_INVALID;
452 #endif
453                 unlock_dentry(dentry);
454                 d_rehash_cond(dentry, 0);
455                 spin_unlock(&dcache_lock);
456
457                 ll_lookup_finish_locks(it, dentry);
458         }
459         EXIT;
460
461 out:
462         if (likely(ll_sai_entry_to_stated(sai, entry))) {
463                 entry->se_minfo = NULL;
464                 entry->se_req = NULL;
465                 cfs_waitq_signal(&sai->sai_waitq);
466                 ll_intent_release(it);
467                 OBD_FREE_PTR(minfo);
468                 dput(dentry);
469                 ptlrpc_req_finished(req);
470         }
471         return rc;
472 }
473
474 static int ll_statahead_interpret(struct ptlrpc_request *req,
475                                   struct md_enqueue_info *minfo,
476                                   int rc)
477 {
478         struct dentry            *dentry = minfo->mi_dentry;
479         struct lookup_intent     *it = &minfo->mi_it;
480         struct inode             *dir = dentry->d_parent->d_inode;
481         struct ll_inode_info     *lli = ll_i2info(dir);
482         struct ll_statahead_info *sai;
483         struct ll_sai_entry      *entry;
484         ENTRY;
485
486         CDEBUG(D_READA, "interpret statahead %.*s rc %d\n",
487                dentry->d_name.len, dentry->d_name.name, rc);
488
489         spin_lock(&lli->lli_lock);
490         if (unlikely(lli->lli_sai == NULL ||
491             lli->lli_sai->sai_generation != minfo->mi_generation)) {
492                 spin_unlock(&lli->lli_lock);
493                 ll_intent_release(it);
494                 dput(dentry);
495                 OBD_FREE_PTR(minfo);
496                 RETURN(-ESTALE);
497         } else {
498                 sai = lli->lli_sai;
499                 if (rc || dir == NULL)
500                         rc = -ESTALE;
501
502                 entry = ll_sai_entry_set(sai,
503                                          (unsigned int)(long)minfo->mi_cbdata,
504                                          rc ? SA_ENTRY_UNSTATED :
505                                          SA_ENTRY_STATED, req, minfo);
506                 LASSERT(entry != NULL);
507                 if (likely(sa_is_running(sai))) {
508                         ll_sai_entry_to_received(sai, entry);
509                         sai->sai_replied++;
510                         spin_unlock(&lli->lli_lock);
511                         cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
512                 } else {
513                         if (!list_empty(&entry->se_list))
514                                 list_del_init(&entry->se_list);
515                         sai->sai_replied++;
516                         spin_unlock(&lli->lli_lock);
517                         ll_sai_entry_cleanup(entry);
518                 }
519                 RETURN(rc);
520         }
521 }
522
523 static void sa_args_fini(struct md_enqueue_info *minfo,
524                          struct ldlm_enqueue_info *einfo)
525 {
526         LASSERT(minfo && einfo);
527         capa_put(minfo->mi_data.op_capa1);
528         capa_put(minfo->mi_data.op_capa2);
529         OBD_FREE_PTR(minfo);
530         OBD_FREE_PTR(einfo);
531 }
532
533 /**
534  * There is race condition between "capa_put" and "ll_statahead_interpret" for
535  * accessing "op_data.op_capa[1,2]" as following:
536  * "capa_put" releases "op_data.op_capa[1,2]"'s reference count after calling
537  * "md_intent_getattr_async". But "ll_statahead_interpret" maybe run first, and
538  * fill "op_data.op_capa[1,2]" as POISON, then cause "capa_put" access invalid
539  * "ocapa". So here reserve "op_data.op_capa[1,2]" in "pcapa" before calling
540  * "md_intent_getattr_async".
541  */
542 static int sa_args_init(struct inode *dir, struct dentry *dentry,
543                         struct md_enqueue_info **pmi,
544                         struct ldlm_enqueue_info **pei,
545                         struct obd_capa **pcapa)
546 {
547         struct ll_inode_info     *lli = ll_i2info(dir);
548         struct md_enqueue_info   *minfo;
549         struct ldlm_enqueue_info *einfo;
550         struct md_op_data        *op_data;
551
552         OBD_ALLOC_PTR(einfo);
553         if (einfo == NULL)
554                 return -ENOMEM;
555
556         OBD_ALLOC_PTR(minfo);
557         if (minfo == NULL) {
558                 OBD_FREE_PTR(einfo);
559                 return -ENOMEM;
560         }
561
562         op_data = ll_prep_md_op_data(&minfo->mi_data, dir, dentry->d_inode,
563                                      dentry->d_name.name, dentry->d_name.len,
564                                      0, LUSTRE_OPC_ANY, NULL);
565         if (IS_ERR(op_data)) {
566                 OBD_FREE_PTR(einfo);
567                 OBD_FREE_PTR(minfo);
568                 return PTR_ERR(op_data);
569         }
570
571         minfo->mi_it.it_op = IT_GETATTR;
572         minfo->mi_dentry = dentry;
573         minfo->mi_cb = ll_statahead_interpret;
574         minfo->mi_generation = lli->lli_sai->sai_generation;
575         minfo->mi_cbdata = (void *)(long)lli->lli_sai->sai_index;
576
577         einfo->ei_type   = LDLM_IBITS;
578         einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
579         einfo->ei_cb_bl  = ll_md_blocking_ast;
580         einfo->ei_cb_cp  = ldlm_completion_ast;
581         einfo->ei_cb_gl  = NULL;
582         einfo->ei_cbdata = NULL;
583
584         *pmi = minfo;
585         *pei = einfo;
586         pcapa[0] = op_data->op_capa1;
587         pcapa[1] = op_data->op_capa2;
588
589         return 0;
590 }
591
592 /**
593  * similar to ll_lookup_it().
594  */
595 static int do_sa_lookup(struct inode *dir, struct dentry *dentry)
596 {
597         struct md_enqueue_info   *minfo;
598         struct ldlm_enqueue_info *einfo;
599         struct obd_capa          *capas[2];
600         int                       rc;
601         ENTRY;
602
603         rc = sa_args_init(dir, dentry, &minfo, &einfo, capas);
604         if (rc)
605                 RETURN(rc);
606
607         rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
608         if (!rc) {
609                 capa_put(capas[0]);
610                 capa_put(capas[1]);
611         } else {
612                 sa_args_fini(minfo, einfo);
613         }
614
615         RETURN(rc);
616 }
617
618 /**
619  * similar to ll_revalidate_it().
620  * \retval      1 -- dentry valid
621  * \retval      0 -- will send stat-ahead request
622  * \retval others -- prepare stat-ahead request failed
623  */
624 static int do_sa_revalidate(struct dentry *dentry)
625 {
626         struct inode             *inode = dentry->d_inode;
627         struct inode             *dir = dentry->d_parent->d_inode;
628         struct lookup_intent      it = { .it_op = IT_GETATTR };
629         struct md_enqueue_info   *minfo;
630         struct ldlm_enqueue_info *einfo;
631         struct obd_capa          *capas[2];
632         int rc;
633         ENTRY;
634
635         if (inode == NULL)
636                 RETURN(1);
637
638         if (d_mountpoint(dentry))
639                 RETURN(1);
640
641         if (dentry == dentry->d_sb->s_root)
642                 RETURN(1);
643
644         rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode));
645         if (rc == 1) {
646                 ll_intent_release(&it);
647                 RETURN(1);
648         }
649
650         rc = sa_args_init(dir, dentry, &minfo, &einfo, capas);
651         if (rc)
652                 RETURN(rc);
653
654         rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
655         if (!rc) {
656                 capa_put(capas[0]);
657                 capa_put(capas[1]);
658         } else {
659                 sa_args_fini(minfo, einfo);
660         }
661
662         RETURN(rc);
663 }
664
665 static inline void ll_name2qstr(struct qstr *this, const char *name, int namelen)
666 {
667         unsigned long hash = init_name_hash();
668         unsigned int  c;
669
670         this->name = name;
671         this->len  = namelen;
672         for (; namelen > 0; namelen--, name++) {
673                 c = *(const unsigned char *)name;
674                 hash = partial_name_hash(c, hash);
675         }
676         this->hash = end_name_hash(hash);
677 }
678
679 static int ll_statahead_one(struct dentry *parent, const char* entry_name,
680                             int entry_name_len)
681 {
682         struct inode             *dir = parent->d_inode;
683         struct ll_inode_info     *lli = ll_i2info(dir);
684         struct ll_statahead_info *sai = lli->lli_sai;
685         struct qstr               name;
686         struct dentry            *dentry;
687         struct ll_sai_entry      *se;
688         int                       rc;
689         ENTRY;
690
691 #ifdef DCACHE_LUSTRE_INVALID
692         if (parent->d_flags & DCACHE_LUSTRE_INVALID) {
693 #else
694         if (d_unhashed(parent)) {
695 #endif
696                 CDEBUG(D_READA, "parent dentry@%p %.*s is "
697                        "invalid, skip statahead\n",
698                        parent, parent->d_name.len, parent->d_name.name);
699                 RETURN(-EINVAL);
700         }
701
702         se = ll_sai_entry_init(sai, sai->sai_index);
703         if (IS_ERR(se))
704                 RETURN(PTR_ERR(se));
705
706         ll_name2qstr(&name, entry_name, entry_name_len);
707         dentry = d_lookup(parent, &name);
708         if (!dentry) {
709                 dentry = d_alloc(parent, &name);
710                 if (dentry) {
711                         rc = do_sa_lookup(dir, dentry);
712                         if (rc)
713                                 dput(dentry);
714                 } else {
715                         GOTO(out, rc = -ENOMEM);
716                 }
717         } else {
718                 rc = do_sa_revalidate(dentry);
719                 if (rc)
720                         dput(dentry);
721         }
722
723         EXIT;
724
725 out:
726         if (rc) {
727                 CDEBUG(D_READA, "set sai entry %p index %u stat %d rc %d\n",
728                        se, se->se_index, se->se_stat, rc);
729                 se->se_stat = rc;
730                 if (ll_sai_entry_to_stated(sai, se))
731                         cfs_waitq_signal(&sai->sai_waitq);
732         } else {
733                 sai->sai_sent++;
734         }
735
736         sai->sai_index++;
737         return rc;
738 }
739
/**
 * Argument block handed to ll_statahead_thread().
 */
740 struct ll_sa_thread_args {
/* directory dentry to scan; the thread takes its own reference with dget() */
741         struct dentry   *sta_parent;
/* pid used to build the thread name "ll_sa_<pid>" */
742         pid_t            sta_pid;
743 };
744
/**
 * Body of the stat-ahead thread for one directory.
 *
 * \a arg points to a struct ll_sa_thread_args. The thread takes its own
 * references on the parent dentry and on the directory's statahead
 * descriptor, marks itself SVC_RUNNING and then walks the directory pages.
 * "." and ".." are always skipped, names starting with '.' are skipped
 * unless sai_ls_all is set, and the first remaining entry is skipped as
 * well. For every other entry it waits until there is room in the window,
 * processes replies queued on sai_entries_received and calls
 * ll_statahead_one(). After the last page it keeps processing replies until
 * all sent requests were replied or the thread is no longer running.
 *
 * On exit the thread marks itself SVC_STOPPED, wakes both wait queues and
 * drops the references it took.
 *
 * \retval 0      normal termination
 * \retval others error from reading a directory page or from
 *                ll_statahead_one()
 */
745 static int ll_statahead_thread(void *arg)
746 {
747         struct ll_sa_thread_args *sta = arg;
748         struct dentry            *parent = dget(sta->sta_parent);
749         struct inode             *dir = parent->d_inode;
750         struct ll_inode_info     *lli = ll_i2info(dir);
751         struct ll_sb_info        *sbi = ll_i2sbi(dir);
752         struct ll_statahead_info *sai = ll_sai_get(lli->lli_sai);
753         struct ptlrpc_thread     *thread = &sai->sai_thread;
754         struct page              *page;
755         __u64                     pos = 0;
756         int                       first = 0;
757         int                       rc = 0;
758         struct ll_dir_chain       chain;
759         ENTRY;
760
761         {
762                 char pname[16];
763                 snprintf(pname, 15, "ll_sa_%u", sta->sta_pid);
764                 cfs_daemonize(pname);
765         }
766
767         sbi->ll_sa_total++;
/* publish the running state, then wake whoever waits on t_ctl_waitq */
768         spin_lock(&lli->lli_lock);
769         thread->t_flags = SVC_RUNNING;
770         spin_unlock(&lli->lli_lock);
771         cfs_waitq_signal(&thread->t_ctl_waitq);
772         CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name);
773
774         ll_dir_chain_init(&chain);
775         page = ll_get_dir_page(dir, pos, 0, &chain);
776
/* one iteration per directory page */
777         while (1) {
778                 struct l_wait_info lwi = { 0 };
779                 struct lu_dirpage *dp;
780                 struct lu_dirent  *ent;
781
782                 if (IS_ERR(page)) {
783                         rc = PTR_ERR(page);
784                         CERROR("error reading dir "DFID" at "LPU64"/%u: rc %d\n",
785                                PFID(ll_inode2fid(dir)), pos,
786                                sai->sai_index, rc);
787                         break;
788                 }
789
790                 dp = page_address(page);
791                 for (ent = lu_dirent_start(dp); ent != NULL;
792                      ent = lu_dirent_next(ent)) {
793                         char *name = ent->lde_name;
794                         int namelen = le16_to_cpu(ent->lde_namelen);
795
796                         if (namelen == 0)
797                                 /*
798                                  * Skip dummy record.
799                                  */
800                                 continue;
801
802                         if (name[0] == '.') {
803                                 if (namelen == 1) {
804                                         /*
805                                          * skip "."
806                                          */
807                                         continue;
808                                 } else if (name[1] == '.' && namelen == 2) {
809                                         /*
810                                          * skip ".."
811                                          */
812                                         continue;
813                                 } else if (!sai->sai_ls_all) {
814                                         /*
815                                          * skip hidden files.
816                                          */
817                                         sai->sai_skip_hidden++;
818                                         continue;
819                                 }
820                         }
821
822                         /*
823                          * don't stat-ahead first entry.
824                          */
825                         if (unlikely(!first)) {
826                                 first++;
827                                 continue;
828                         }
829
/*
 * Sleep until the thread is no longer running, the window has room, or
 * replies are waiting to be processed.
 */
830 keep_de:
831                         l_wait_event(thread->t_ctl_waitq,
832                                      !sa_is_running(sai) || sa_not_full(sai) ||
833                                      !sa_received_empty(sai),
834                                      &lwi);
835
/* drain the replies that have arrived so far */
836                         while (!sa_received_empty(sai) && sa_is_running(sai))
837                                 do_statahead_interpret(sai);
838
839                         if (unlikely(!sa_is_running(sai))) {
840                                 ll_put_page(page);
841                                 GOTO(out, rc);
842                         }
843
844                         if (!sa_not_full(sai))
845                                 /*
846                                  * do not skip the current de.
847                                  */
848                                 goto keep_de;
849
850                         rc = ll_statahead_one(parent, name, namelen);
851                         if (rc < 0) {
852                                 ll_put_page(page);
853                                 GOTO(out, rc);
854                         }
855                 }
856                 pos = le64_to_cpu(dp->ldp_hash_end);
857                 ll_put_page(page);
858                 if (pos == DIR_END_OFF) {
859                         /*
860                          * End of directory reached.
861                          */
/*
 * Keep interpreting replies; leave once the wait ends with nothing queued
 * or with the thread no longer running.
 */
862                         while (1) {
863                                 l_wait_event(thread->t_ctl_waitq,
864                                              !sa_is_running(sai) ||
865                                              !sa_received_empty(sai) ||
866                                              sai->sai_sent == sai->sai_replied,
867                                              &lwi);
868                                 if (!sa_received_empty(sai) &&
869                                     sa_is_running(sai))
870                                         do_statahead_interpret(sai);
871                                 else
872                                         GOTO(out, rc);
873                         }
874                 } else if (1) {
875                         /*
876                          * chain is exhausted.
877                          * Normal case: continue to the next page.
878                          */
/* NOTE(review): the condition is constant, so the else branch below is dead */
879                         page = ll_get_dir_page(dir, pos, 1, &chain);
880                 } else {
881                         /*
882                          * go into overflow page.
883                          */
884                 }
885         }
886         EXIT;
887
/* common exit: publish SVC_STOPPED, wake both wait queues, drop references */
888 out:
889         ll_dir_chain_fini(&chain);
890         spin_lock(&lli->lli_lock);
891         thread->t_flags = SVC_STOPPED;
892         spin_unlock(&lli->lli_lock);
893         cfs_waitq_signal(&sai->sai_waitq);
894         cfs_waitq_signal(&thread->t_ctl_waitq);
895         ll_sai_put(sai);
896         dput(parent);
897         CDEBUG(D_READA, "statahead thread stopped, pid %d\n",
898                cfs_curproc_pid());
899         return rc;
900 }
901
902 /**
903  * called in ll_file_release().
904  */
905 void ll_stop_statahead(struct inode *inode, void *key)
906 {
907         struct ll_inode_info *lli = ll_i2info(inode);
908         struct ptlrpc_thread *thread;
909
910         spin_lock(&lli->lli_lock);
911         if (lli->lli_opendir_pid == 0 ||
912             unlikely(lli->lli_opendir_key != key)) {
913                 spin_unlock(&lli->lli_lock);
914                 return;
915         }
916
917         lli->lli_opendir_key = NULL;
918         lli->lli_opendir_pid = 0;
919
920         if (lli->lli_sai) {
921                 struct l_wait_info lwi = { 0 };
922
923                 thread = &lli->lli_sai->sai_thread;
924                 if (!sa_is_stopped(lli->lli_sai)) {
925                         thread->t_flags = SVC_STOPPING;
926                         spin_unlock(&lli->lli_lock);
927                         cfs_waitq_signal(&thread->t_ctl_waitq);
928
929                         CDEBUG(D_READA, "stopping statahead thread, pid %d\n",
930                                cfs_curproc_pid());
931                         l_wait_event(thread->t_ctl_waitq,
932                                      sa_is_stopped(lli->lli_sai),
933                                      &lwi);
934                 } else {
935                         spin_unlock(&lli->lli_lock);
936                 }
937
938                 /*
939                  * Put the ref which was held when first statahead_enter.
940                  * It maybe not the last ref for some statahead requests
941                  * maybe inflight.
942                  */
943                 ll_sai_put(lli->lli_sai);
944                 return;
945         }
946         spin_unlock(&lli->lli_lock);
947 }
948
/**
 * Classification returned by is_first_dirent() for a looked-up name.
 */
enum {
        /**
         * the name is not the first dirent, or it is "."
         */
        LS_NONE_FIRST_DE = 0,
        /**
         * the name is the first non-hidden dirent
         */
        LS_FIRST_DE      = 1,
        /**
         * the name is the first hidden dirent, that is ".xxx";
         * must stay equal to LS_FIRST_DE + 1 because is_first_dirent()
         * computes its result as LS_FIRST_DE + dot_de
         */
        LS_FIRST_DOT_DE  = 2
};
963
964 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
965 {
966         struct ll_dir_chain chain;
967         struct qstr        *target = &dentry->d_name;
968         struct page        *page;
969         __u64               pos = 0;
970         int                 dot_de;
971         int                 rc = LS_NONE_FIRST_DE;
972         ENTRY;
973
974         ll_dir_chain_init(&chain);
975         page = ll_get_dir_page(dir, pos, 0, &chain);
976
977         while (1) {
978                 struct lu_dirpage *dp;
979                 struct lu_dirent  *ent;
980
981                 if (IS_ERR(page)) {
982                         rc = PTR_ERR(page);
983                         CERROR("error reading dir "DFID" at "LPU64": rc %d\n",
984                                PFID(ll_inode2fid(dir)), pos, rc);
985                         break;
986                 }
987
988                 dp = page_address(page);
989                 for (ent = lu_dirent_start(dp); ent != NULL;
990                      ent = lu_dirent_next(ent)) {
991                         char *name = ent->lde_name;
992                         int namelen = le16_to_cpu(ent->lde_namelen);
993
994                         if (namelen == 0)
995                                 /*
996                                  * skip dummy record.
997                                  */
998                                 continue;
999
1000                         if (name[0] == '.') {
1001                                 if (namelen == 1)
1002                                         /*
1003                                          * skip "."
1004                                          */
1005                                         continue;
1006                                 else if (name[1] == '.' && namelen == 2)
1007                                         /*
1008                                          * skip ".."
1009                                          */
1010                                         continue;
1011                                 else
1012                                         dot_de = 1;
1013                         } else {
1014                                 dot_de = 0;
1015                         }
1016
1017                         if (dot_de && target->name[0] != '.') {
1018                                 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
1019                                        target->len, target->name,
1020                                        namelen, name);
1021                                 continue;
1022                         }
1023
1024                         if (target->len == namelen &&
1025                             !strncmp(target->name, name, target->len))
1026                                 rc = LS_FIRST_DE + dot_de;
1027                         else
1028                                 rc = LS_NONE_FIRST_DE;
1029                         ll_put_page(page);
1030                         GOTO(out, rc);
1031                 }
1032                 pos = le64_to_cpu(dp->ldp_hash_end);
1033                 ll_put_page(page);
1034                 if (pos == DIR_END_OFF) {
1035                         /*
1036                          * End of directory reached.
1037                          */
1038                         break;
1039                 } else if (1) {
1040                         /*
1041                          * chain is exhausted 
1042                          * Normal case: continue to the next page.
1043                          */
1044                         page = ll_get_dir_page(dir, pos, 1, &chain);
1045                 } else {
1046                         /*
1047                          * go into overflow page.
1048                          */
1049                 }
1050         }
1051         EXIT;
1052
1053 out:
1054         ll_dir_chain_fini(&chain);
1055         return rc;
1056 }
1057
1058 /**
1059  * Start statahead thread if this is the first dir entry.
1060  * Otherwise if a thread is started already, wait it until it is ahead of me.
1061  * \retval 0       -- stat ahead thread process such dentry, for lookup, it miss
1062  * \retval 1       -- stat ahead thread process such dentry, for lookup, it hit
1063  * \retval -EEXIST -- stat ahead thread started, and this is the first dentry
1064  * \retval -EBADFD -- statahead thread exit and not dentry available
1065  * \retval others  -- error
1066  */
1067 int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
1068 {
1069         struct ll_sb_info        *sbi = ll_i2sbi(dir);
1070         struct ll_inode_info     *lli = ll_i2info(dir);
1071         struct ll_statahead_info *sai = lli->lli_sai;
1072         struct ll_sa_thread_args  sta;
1073         struct l_wait_info        lwi = { 0 };
1074         int                       rc;
1075         ENTRY;
1076
1077         LASSERT(lli->lli_opendir_pid == cfs_curproc_pid());
1078
1079         if (sai) {
1080                 if (unlikely(sa_is_stopped(sai) &&
1081                              list_empty(&sai->sai_entries_stated)))
1082                         RETURN(-EBADFD);
1083
1084                 /*
1085                  * skip the first dentry.
1086                  */
1087                 if (unlikely((*dentryp)->d_name.len ==
1088                              sai->sai_first->d_name.len &&
1089                              !strncmp((*dentryp)->d_name.name,
1090                                       sai->sai_first->d_name.name,
1091                                       sai->sai_first->d_name.len)))
1092                         RETURN(-EEXIST);
1093
1094                 if ((*dentryp)->d_name.name[0] == '.') {
1095                         if (likely(sai->sai_ls_all ||
1096                             sai->sai_miss_hidden >= sai->sai_skip_hidden)) {
1097                                 /*
1098                                  * Hidden dentry is the first one, or statahead
1099                                  * thread does not skip so many hidden dentries
1100                                  * before "sai_ls_all" enabled as below.
1101                                  */
1102                         } else {
1103                                 if (!sai->sai_ls_all)
1104                                         /*
1105                                          * It maybe because hidden dentry is not
1106                                          * the first one, "sai_ls_all" was not
1107                                          * set, then "ls -al" missed. Enable
1108                                          * "sai_ls_all" for such case.
1109                                          */
1110                                         sai->sai_ls_all = 1;
1111
1112                                 /*
1113                                  * Such "getattr" has been skipped before
1114                                  * "sai_ls_all" enabled as above.
1115                                  */
1116                                 sai->sai_miss_hidden++;
1117                                 RETURN(-ENOENT);
1118                         }
1119                 }
1120
1121                 if (ll_sai_entry_stated(sai)) {
1122                         sbi->ll_sa_cached++;
1123                 } else {
1124                         sbi->ll_sa_blocked++;
1125                         /*
1126                          * thread started already, avoid double-stat.
1127                          */
1128                         l_wait_event(sai->sai_waitq,
1129                                      ll_sai_entry_stated(sai) || sa_is_stopped(sai),
1130                                      &lwi);
1131                 }
1132
1133                 if (lookup) {
1134                         struct dentry *result;
1135
1136                         result = d_lookup((*dentryp)->d_parent,
1137                                           &(*dentryp)->d_name);
1138                         if (result) {
1139                                 LASSERT(result != *dentryp);
1140                                 /* BUG 16303: do not drop reference count for
1141                                  * "*dentryp", VFS will do that by itself. */
1142                                 *dentryp = result;
1143                                 RETURN(1);
1144                         }
1145                 }
1146                 /*
1147                  * do nothing for revalidate.
1148                  */
1149                 RETURN(0);
1150         }
1151
1152          /*
1153           * I am the "lli_opendir_pid" owner, only me can set "lli_sai".
1154           */ 
1155         LASSERT(lli->lli_sai == NULL);
1156
1157         rc = is_first_dirent(dir, *dentryp);
1158         if (rc == LS_NONE_FIRST_DE) {
1159                 /*
1160                  * It is not "ls -{a}l" operation, no need statahead for it.
1161                  */
1162                 spin_lock(&lli->lli_lock);
1163                 lli->lli_opendir_key = NULL;
1164                 lli->lli_opendir_pid = 0;
1165                 spin_unlock(&lli->lli_lock);
1166                 RETURN(-EBADF);
1167         }
1168
1169         sai = ll_sai_alloc();
1170         if (sai == NULL)
1171                 RETURN(-ENOMEM);
1172
1173         sai->sai_inode  = igrab(dir);
1174         sai->sai_first = dget(*dentryp);
1175         sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
1176
1177         sta.sta_parent = (*dentryp)->d_parent;
1178         sta.sta_pid    = cfs_curproc_pid();
1179
1180         lli->lli_sai = sai;
1181         rc = cfs_kernel_thread(ll_statahead_thread, &sta, 0);
1182         if (rc < 0) {
1183                 CERROR("can't start ll_sa thread, rc: %d\n", rc);
1184                 sai->sai_thread.t_flags = SVC_STOPPED;
1185                 ll_sai_put(sai);
1186                 LASSERT(lli->lli_sai == NULL);
1187                 RETURN(rc);
1188         }
1189
1190         l_wait_event(sai->sai_thread.t_ctl_waitq, 
1191                      sa_is_running(sai) || sa_is_stopped(sai),
1192                      &lwi);
1193
1194         /*
1195          * We don't stat-ahead for the first dirent since we are already in
1196          * lookup, and -EEXIST also indicates that this is the first dirent.
1197          */
1198         RETURN(-EEXIST);
1199 }
1200
1201 /**
1202  * update hit/miss count.
1203  */
1204 int ll_statahead_exit(struct dentry *dentry, int result)
1205 {
1206         struct dentry         *parent = dentry->d_parent;
1207         struct ll_inode_info  *lli = ll_i2info(parent->d_inode);
1208         struct ll_sb_info     *sbi = ll_i2sbi(parent->d_inode);
1209         struct ll_dentry_data *ldd = ll_d2d(dentry);
1210         ENTRY;
1211
1212         if (lli->lli_opendir_pid != cfs_curproc_pid())
1213                 RETURN(-EBADFD);
1214
1215         if (lli->lli_sai) {
1216                 struct ll_statahead_info *sai = lli->lli_sai;
1217
1218                 if (result >= 1) {
1219                         sbi->ll_sa_hit++;
1220                         sai->sai_hit++;
1221                         sai->sai_consecutive_miss = 0;
1222                         sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
1223                 } else {
1224                         sbi->ll_sa_miss++;
1225                         sai->sai_miss++;
1226                         sai->sai_consecutive_miss++;
1227                         if (sa_low_hit(sai) && sa_is_running(sai)) {
1228                                 sbi->ll_sa_wrong++;
1229                                 CDEBUG(D_READA, "statahead for dir %.*s hit "
1230                                        "ratio too low: hit/miss %u/%u, "
1231                                        "sent/replied %u/%u. stopping statahead "
1232                                        "thread: pid %d\n",
1233                                        parent->d_name.len, parent->d_name.name,
1234                                        sai->sai_hit, sai->sai_miss,
1235                                        sai->sai_sent, sai->sai_replied,
1236                                        cfs_curproc_pid());
1237                                 spin_lock(&lli->lli_lock);
1238                                 if (!sa_is_stopped(sai))
1239                                         sai->sai_thread.t_flags = SVC_STOPPING;
1240                                 spin_unlock(&lli->lli_lock);
1241                         }
1242                 }
1243
1244                 if (!sa_is_stopped(sai))
1245                         cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
1246                 ll_sai_entry_fini(sai);
1247
1248                 if (unlikely(ldd == NULL)) {
1249                         ll_set_dd(dentry);
1250                         ldd = ll_d2d(dentry);
1251                         if (ldd != NULL && dentry->d_op == NULL) {
1252                                 lock_dentry(dentry);
1253                                 dentry->d_op = dentry->d_op ? : &ll_sai_d_ops;
1254                                 unlock_dentry(dentry);
1255                         }
1256                 }
1257
1258                 if (likely(ldd != NULL))
1259                         ldd->lld_sa_generation = sai->sai_generation;
1260                 else
1261                         RETURN(-ENOMEM);
1262         }
1263         RETURN(0);
1264 }