Whamcloud - gitweb
On a server, a file system object is uniquely identified by a FID (file identifier), which is a 128-bit value made up of a sequence number, an object ID and a version.
[fs/lustre-release.git] / lustre / llite / statahead.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #include <linux/fs.h>
38 #include <linux/sched.h>
39 #include <linux/mm.h>
40 #include <linux/smp_lock.h>
41 #include <linux/highmem.h>
42 #include <linux/pagemap.h>
43
44 #define DEBUG_SUBSYSTEM S_LLITE
45
46 #include <obd_support.h>
47 #include <lustre_lite.h>
48 #include <lustre_dlm.h>
49 #include <linux/lustre_version.h>
50 #include "llite_internal.h"
51
52 struct ll_sai_entry {
53         struct list_head        se_list;
54         unsigned int            se_index;
55         int                     se_stat;
56         struct ptlrpc_request  *se_req;
57         struct md_enqueue_info *se_minfo;
58 };
59
/* States kept in ll_sai_entry::se_stat. */
enum {
        /* no usable reply has been recorded for the entry */
        SA_ENTRY_UNSTATED = 0,
        /* a successful reply has been recorded for the entry */
        SA_ENTRY_STATED   = 1
};
64
/*
 * Generation number handed to each ll_statahead_info by ll_sai_alloc();
 * ll_statahead_interpret() compares it with md_enqueue_info::mi_generation
 * to detect replies that belong to an older statahead instance.
 * Statics are zero-initialized, so no explicit initializer is needed.
 */
static unsigned int sai_generation;
/*
 * Serializes updates of sai_generation.  DEFINE_SPINLOCK() is used rather
 * than the deprecated SPIN_LOCK_UNLOCKED initializer so that each lock gets
 * its own lock class for lock debugging.
 */
static DEFINE_SPINLOCK(sai_generation_lock);
67
68 /**
69  * Check whether first entry was stated already or not.
70  * No need to hold lli_lock, for:
71  * (1) it is me that remove entry from the list
72  * (2) the statahead thread only add new entry to the list
73  */
74 static int ll_sai_entry_stated(struct ll_statahead_info *sai)
75 {
76         struct ll_sai_entry  *entry;
77         int                   rc = 0;
78
79         if (!list_empty(&sai->sai_entries_stated)) {
80                 entry = list_entry(sai->sai_entries_stated.next,
81                                    struct ll_sai_entry, se_list);
82                 if (entry->se_index == sai->sai_index_next)
83                         rc = 1;
84         }
85         return rc;
86 }
87
88 static inline int sa_received_empty(struct ll_statahead_info *sai)
89 {
90         return list_empty(&sai->sai_entries_received);
91 }
92
93 static inline int sa_not_full(struct ll_statahead_info *sai)
94 {
95         return (sai->sai_index < sai->sai_hit + sai->sai_miss + sai->sai_max);
96 }
97
98 static inline int sa_is_running(struct ll_statahead_info *sai)
99 {
100         return !!(sai->sai_thread.t_flags & SVC_RUNNING);
101 }
102
103 static inline int sa_is_stopping(struct ll_statahead_info *sai)
104 {
105         return !!(sai->sai_thread.t_flags & SVC_STOPPING);
106 }
107
108 static inline int sa_is_stopped(struct ll_statahead_info *sai)
109 {
110         return !!(sai->sai_thread.t_flags & SVC_STOPPED);
111 }
112
113 /**
114  * (1) hit ratio less than 80%
115  * or
116  * (2) consecutive miss more than 8
117  */
118 static inline int sa_low_hit(struct ll_statahead_info *sai)
119 {
120         return ((sai->sai_hit < 4 * sai->sai_miss && sai->sai_hit > 7) ||
121                 (sai->sai_consecutive_miss > 8));
122 }
123
124 /**
125  * process the deleted entry's member and free the entry.
126  * (1) release intent
127  * (2) free md_enqueue_info
128  * (3) drop dentry's ref count
129  * (4) release request's ref count
130  */
131 static void ll_sai_entry_cleanup(struct ll_sai_entry *entry, int free)
132 {
133         struct ptlrpc_request  *req = entry->se_req;
134         struct md_enqueue_info *minfo = entry->se_minfo;
135         ENTRY;
136
137         if (minfo) {
138                 struct dentry        *dentry = minfo->mi_dentry;
139                 struct lookup_intent *it = &minfo->mi_it;
140
141                 entry->se_minfo = NULL;
142                 ll_intent_release(it);
143                 OBD_FREE_PTR(minfo);
144                 dput(dentry);
145         }
146         if (req) {
147                 entry->se_req = NULL;
148                 ptlrpc_req_finished(req);
149         }
150         if (free) {
151                 LASSERT(list_empty(&entry->se_list));
152                 OBD_FREE_PTR(entry);
153         }
154
155         EXIT;
156 }
157
158 static struct ll_statahead_info *ll_sai_alloc(void)
159 {
160         struct ll_statahead_info *sai;
161
162         OBD_ALLOC_PTR(sai);
163         if (!sai)
164                 return NULL;
165
166         spin_lock(&sai_generation_lock);
167         sai->sai_generation = ++sai_generation;
168         if (unlikely(sai_generation == 0))
169                 sai->sai_generation = ++sai_generation;
170         spin_unlock(&sai_generation_lock);
171         atomic_set(&sai->sai_refcount, 1);
172         sai->sai_max = LL_SA_RPC_MIN;
173         cfs_waitq_init(&sai->sai_waitq);
174         cfs_waitq_init(&sai->sai_thread.t_ctl_waitq);
175         CFS_INIT_LIST_HEAD(&sai->sai_entries_sent);
176         CFS_INIT_LIST_HEAD(&sai->sai_entries_received);
177         CFS_INIT_LIST_HEAD(&sai->sai_entries_stated);
178         return sai;
179 }
180
181 static inline 
182 struct ll_statahead_info *ll_sai_get(struct ll_statahead_info *sai)
183 {
184         LASSERT(sai);
185         atomic_inc(&sai->sai_refcount);
186         return sai;
187 }
188
189 static void ll_sai_put(struct ll_statahead_info *sai)
190 {
191         struct inode         *inode = sai->sai_inode;
192         struct ll_inode_info *lli;
193         ENTRY;
194
195         LASSERT(inode != NULL);
196         lli = ll_i2info(inode);
197         if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_lock)) {
198                 struct ll_sai_entry *entry, *next;
199
200                 LASSERT(lli->lli_opendir_key == NULL);
201                 lli->lli_sai = NULL;
202                 lli->lli_opendir_pid = 0;
203                 spin_unlock(&lli->lli_lock);
204
205                 LASSERT(sa_is_stopped(sai));
206
207                 if (sai->sai_sent > sai->sai_replied)
208                         CDEBUG(D_READA,"statahead for dir "DFID" does not "
209                               "finish: [sent:%u] [replied:%u]\n",
210                               PFID(&lli->lli_fid),
211                               sai->sai_sent, sai->sai_replied);
212
213                 list_for_each_entry_safe(entry, next, &sai->sai_entries_sent,
214                                          se_list) {
215                         list_del_init(&entry->se_list);
216                         ll_sai_entry_cleanup(entry, 1);
217                 }
218                 list_for_each_entry_safe(entry, next, &sai->sai_entries_received,
219                                          se_list) {
220                         list_del_init(&entry->se_list);
221                         ll_sai_entry_cleanup(entry, 1);
222                 }
223                 list_for_each_entry_safe(entry, next, &sai->sai_entries_stated,
224                                          se_list) {
225                         list_del_init(&entry->se_list);
226                         ll_sai_entry_cleanup(entry, 1);
227                 }
228                 OBD_FREE_PTR(sai);
229                 iput(inode);
230         }
231         EXIT;
232 }
233
234 /**
235  * insert it into sai_entries_sent tail when init.
236  */
237 static struct ll_sai_entry *
238 ll_sai_entry_init(struct ll_statahead_info *sai, unsigned int index)
239 {
240         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
241         struct ll_sai_entry  *entry;
242         ENTRY;
243
244         OBD_ALLOC_PTR(entry);
245         if (entry == NULL)
246                 RETURN(ERR_PTR(-ENOMEM));
247
248         CDEBUG(D_READA, "alloc sai entry %p index %u\n",
249                entry, index);
250         entry->se_index = index;
251         entry->se_stat  = SA_ENTRY_UNSTATED;
252
253         spin_lock(&lli->lli_lock);
254         list_add_tail(&entry->se_list, &sai->sai_entries_sent);
255         spin_unlock(&lli->lli_lock);
256
257         RETURN(entry);
258 }
259
260 /**
261  * delete it from sai_entries_stated head when fini, it need not
262  * to process entry's member.
263  */
264 static void ll_sai_entry_fini(struct ll_statahead_info *sai)
265 {
266         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
267         struct ll_sai_entry  *entry;
268         ENTRY;
269         
270         spin_lock(&lli->lli_lock);
271         sai->sai_index_next++;
272         if (likely(!list_empty(&sai->sai_entries_stated))) {
273                 entry = list_entry(sai->sai_entries_stated.next,
274                                    struct ll_sai_entry, se_list);
275                 if (entry->se_index < sai->sai_index_next) {
276                         list_del(&entry->se_list);
277                         OBD_FREE_PTR(entry);
278                 }
279         } else
280                 LASSERT(sa_is_stopped(sai));
281         spin_unlock(&lli->lli_lock);
282
283         EXIT;
284 }
285
286 /**
287  * inside lli_lock.
288  * \retval NULL : can not find the entry in sai_entries_sent with the index
289  * \retval entry: find the entry in sai_entries_sent with the index
290  */
291 static struct ll_sai_entry *
292 ll_sai_entry_set(struct ll_statahead_info *sai, unsigned int index, int stat,
293                  struct ptlrpc_request *req, struct md_enqueue_info *minfo)
294 {
295         struct ll_sai_entry *entry;
296         ENTRY;
297
298         if (!list_empty(&sai->sai_entries_sent)) {
299                 list_for_each_entry(entry, &sai->sai_entries_sent, se_list) {
300                         if (entry->se_index == index) {
301                                 entry->se_stat = stat;
302                                 entry->se_req = ptlrpc_request_addref(req);
303                                 entry->se_minfo = minfo;
304                                 RETURN(entry);
305                         } else if (entry->se_index > index)
306                                 RETURN(NULL);
307                 }
308         }
309         RETURN(NULL);
310 }
311
312 /**
313  * inside lli_lock.
314  * Move entry to sai_entries_received and
315  * insert it into sai_entries_received tail.
316  */
317 static inline void
318 ll_sai_entry_to_received(struct ll_statahead_info *sai, struct ll_sai_entry *entry)
319 {
320         if (!list_empty(&entry->se_list))
321                 list_del_init(&entry->se_list);
322         list_add_tail(&entry->se_list, &sai->sai_entries_received);
323 }
324
325 /**
326  * Move entry to sai_entries_stated and
327  * sort with the index.
328  */
329 static int
330 ll_sai_entry_to_stated(struct ll_statahead_info *sai, struct ll_sai_entry *entry)
331 {
332         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
333         struct ll_sai_entry  *se;
334         ENTRY;
335
336         ll_sai_entry_cleanup(entry, 0);
337
338         spin_lock(&lli->lli_lock);
339         if (!list_empty(&entry->se_list))
340                 list_del_init(&entry->se_list);
341
342         if (unlikely(entry->se_index < sai->sai_index_next)) {
343                 spin_unlock(&lli->lli_lock);
344                 OBD_FREE_PTR(entry);
345                 RETURN(0);
346         }
347
348         list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) {
349                 if (se->se_index < entry->se_index) {
350                         list_add(&entry->se_list, &se->se_list);
351                         spin_unlock(&lli->lli_lock);
352                         RETURN(1);
353                 }
354         }
355
356         /*
357          * I am the first entry.
358          */
359         list_add(&entry->se_list, &sai->sai_entries_stated);
360         spin_unlock(&lli->lli_lock);
361         RETURN(1);
362 }
363
364 /**
365  * finish lookup/revalidate.
366  */
367 static int do_statahead_interpret(struct ll_statahead_info *sai)
368 {
369         struct ll_inode_info   *lli = ll_i2info(sai->sai_inode);
370         struct ll_sai_entry    *entry;
371         struct ptlrpc_request  *req;
372         struct md_enqueue_info *minfo;
373         struct dentry          *dentry;
374         struct lookup_intent   *it;
375         int                     rc = 0;
376         struct mdt_body        *body;
377         ENTRY;
378
379         spin_lock(&lli->lli_lock);
380         LASSERT(!sa_received_empty(sai));
381         entry = list_entry(sai->sai_entries_received.next, struct ll_sai_entry,
382                            se_list);
383         list_del_init(&entry->se_list);
384         spin_unlock(&lli->lli_lock);
385
386         if (unlikely(entry->se_index < sai->sai_index_next)) {
387                 ll_sai_entry_cleanup(entry, 1);
388                 RETURN(0);
389         }
390
391         if (entry->se_stat != SA_ENTRY_STATED)
392                 GOTO(out, rc = entry->se_stat);
393
394         req = entry->se_req;
395         minfo = entry->se_minfo;
396         dentry = minfo->mi_dentry;
397         it = &minfo->mi_it;
398
399         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
400         if (body == NULL)
401                 GOTO(out, rc = -EFAULT);
402
403         if (dentry->d_inode == NULL) {
404                 /*
405                  * lookup.
406                  */
407                 struct dentry    *save = dentry;
408                 struct it_cb_data icbd = {
409                         .icbd_parent   = dentry->d_parent->d_inode,
410                         .icbd_childp   = &dentry
411                 };
412
413                 LASSERT(fid_is_zero(&minfo->mi_data.op_fid2));
414
415                 /*
416                  * XXX: No fid in reply, this is probaly cross-ref case.
417                  * SA can't handle it yet.
418                  */
419                 if (body->valid & OBD_MD_MDS)
420                         GOTO(out, rc = -EAGAIN);
421
422                 rc = ll_lookup_it_finish(req, it, &icbd);
423                 if (!rc)
424                         /*
425                          * Here dentry->d_inode might be NULL,
426                          * because the entry may have been removed before
427                          * we start doing stat ahead.
428                          */
429                         ll_lookup_finish_locks(it, dentry);
430
431                 if (dentry != save) {
432                         minfo->mi_dentry = dentry;
433                         dput(save);
434                 }
435         } else {
436                 /*
437                  * revalidate.
438                  */
439                 if (!lu_fid_eq(&minfo->mi_data.op_fid2, &body->fid1)) {
440                         ll_unhash_aliases(dentry->d_inode);
441                         GOTO(out, rc = -EAGAIN);
442                 }
443
444                 rc = ll_revalidate_it_finish(req, it, dentry);
445                 if (rc) {
446                         ll_unhash_aliases(dentry->d_inode);
447                         GOTO(out, rc);
448                 }
449
450                 spin_lock(&ll_lookup_lock);
451                 spin_lock(&dcache_lock);
452                 lock_dentry(dentry);
453                 __d_drop(dentry);
454 #ifdef DCACHE_LUSTRE_INVALID
455                 dentry->d_flags &= ~DCACHE_LUSTRE_INVALID;
456 #endif
457                 unlock_dentry(dentry);
458                 d_rehash_cond(dentry, 0);
459                 spin_unlock(&dcache_lock);
460                 spin_unlock(&ll_lookup_lock);
461
462                 ll_lookup_finish_locks(it, dentry);
463         }
464         EXIT;
465
466 out:
467         if (likely(ll_sai_entry_to_stated(sai, entry)))
468                 cfs_waitq_signal(&sai->sai_waitq);
469         return rc;
470 }
471
472 static int ll_statahead_interpret(struct ptlrpc_request *req,
473                                   struct md_enqueue_info *minfo,
474                                   int rc)
475 {
476         struct dentry            *dentry = minfo->mi_dentry;
477         struct lookup_intent     *it = &minfo->mi_it;
478         struct inode             *dir = dentry->d_parent->d_inode;
479         struct ll_inode_info     *lli = ll_i2info(dir);
480         struct ll_statahead_info *sai;
481         struct ll_sai_entry      *entry;
482         ENTRY;
483
484         CDEBUG(D_READA, "interpret statahead %.*s rc %d\n",
485                dentry->d_name.len, dentry->d_name.name, rc);
486
487         spin_lock(&lli->lli_lock);
488         if (unlikely(lli->lli_sai == NULL ||
489             lli->lli_sai->sai_generation != minfo->mi_generation)) {
490                 spin_unlock(&lli->lli_lock);
491                 ll_intent_release(it);
492                 dput(dentry);
493                 OBD_FREE_PTR(minfo);
494                 RETURN(-ESTALE);
495         } else {
496                 sai = ll_sai_get(lli->lli_sai);
497                 if (rc || dir == NULL)
498                         rc = -ESTALE;
499
500                 entry = ll_sai_entry_set(sai,
501                                          (unsigned int)(long)minfo->mi_cbdata,
502                                          rc ? SA_ENTRY_UNSTATED :
503                                          SA_ENTRY_STATED, req, minfo);
504                 LASSERT(entry != NULL);
505                 if (likely(sa_is_running(sai))) {
506                         ll_sai_entry_to_received(sai, entry);
507                         sai->sai_replied++;
508                         spin_unlock(&lli->lli_lock);
509                         cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
510                 } else {
511                         if (!list_empty(&entry->se_list))
512                                 list_del_init(&entry->se_list);
513                         sai->sai_replied++;
514                         spin_unlock(&lli->lli_lock);
515                         ll_sai_entry_cleanup(entry, 1);
516                 }
517                 ll_sai_put(sai);
518                 RETURN(rc);
519         }
520 }
521
522 static void sa_args_fini(struct md_enqueue_info *minfo,
523                          struct ldlm_enqueue_info *einfo)
524 {
525         LASSERT(minfo && einfo);
526         capa_put(minfo->mi_data.op_capa1);
527         capa_put(minfo->mi_data.op_capa2);
528         OBD_FREE_PTR(minfo);
529         OBD_FREE_PTR(einfo);
530 }
531
532 /**
533  * There is race condition between "capa_put" and "ll_statahead_interpret" for
534  * accessing "op_data.op_capa[1,2]" as following:
535  * "capa_put" releases "op_data.op_capa[1,2]"'s reference count after calling
536  * "md_intent_getattr_async". But "ll_statahead_interpret" maybe run first, and
537  * fill "op_data.op_capa[1,2]" as POISON, then cause "capa_put" access invalid
538  * "ocapa". So here reserve "op_data.op_capa[1,2]" in "pcapa" before calling
539  * "md_intent_getattr_async".
540  */
541 static int sa_args_init(struct inode *dir, struct dentry *dentry,
542                         struct md_enqueue_info **pmi,
543                         struct ldlm_enqueue_info **pei,
544                         struct obd_capa **pcapa)
545 {
546         struct ll_inode_info     *lli = ll_i2info(dir);
547         struct md_enqueue_info   *minfo;
548         struct ldlm_enqueue_info *einfo;
549         struct md_op_data        *op_data;
550
551         OBD_ALLOC_PTR(einfo);
552         if (einfo == NULL)
553                 return -ENOMEM;
554
555         OBD_ALLOC_PTR(minfo);
556         if (minfo == NULL) {
557                 OBD_FREE_PTR(einfo);
558                 return -ENOMEM;
559         }
560
561         op_data = ll_prep_md_op_data(&minfo->mi_data, dir, dentry->d_inode,
562                                      dentry->d_name.name, dentry->d_name.len,
563                                      0, LUSTRE_OPC_ANY, NULL);
564         if (IS_ERR(op_data)) {
565                 OBD_FREE_PTR(einfo);
566                 OBD_FREE_PTR(minfo);
567                 return PTR_ERR(op_data);
568         }
569
570         minfo->mi_it.it_op = IT_GETATTR;
571         minfo->mi_dentry = dentry;
572         minfo->mi_cb = ll_statahead_interpret;
573         minfo->mi_generation = lli->lli_sai->sai_generation;
574         minfo->mi_cbdata = (void *)(long)lli->lli_sai->sai_index;
575
576         einfo->ei_type   = LDLM_IBITS;
577         einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
578         einfo->ei_cb_bl  = ll_md_blocking_ast;
579         einfo->ei_cb_cp  = ldlm_completion_ast;
580         einfo->ei_cb_gl  = NULL;
581         einfo->ei_cbdata = NULL;
582
583         *pmi = minfo;
584         *pei = einfo;
585         pcapa[0] = op_data->op_capa1;
586         pcapa[1] = op_data->op_capa2;
587
588         return 0;
589 }
590
591 /**
592  * similar to ll_lookup_it().
593  */
594 static int do_sa_lookup(struct inode *dir, struct dentry *dentry)
595 {
596         struct md_enqueue_info   *minfo;
597         struct ldlm_enqueue_info *einfo;
598         struct obd_capa          *capas[2];
599         int                       rc;
600         ENTRY;
601
602         rc = sa_args_init(dir, dentry, &minfo, &einfo, capas);
603         if (rc)
604                 RETURN(rc);
605
606         rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
607         if (!rc) {
608                 capa_put(capas[0]);
609                 capa_put(capas[1]);
610         } else {
611                 sa_args_fini(minfo, einfo);
612         }
613
614         RETURN(rc);
615 }
616
617 /**
618  * similar to ll_revalidate_it().
619  * \retval      1 -- dentry valid
620  * \retval      0 -- will send stat-ahead request
621  * \retval others -- prepare stat-ahead request failed
622  */
623 static int do_sa_revalidate(struct dentry *dentry)
624 {
625         struct inode             *inode = dentry->d_inode;
626         struct inode             *dir = dentry->d_parent->d_inode;
627         struct lookup_intent      it = { .it_op = IT_GETATTR };
628         struct md_enqueue_info   *minfo;
629         struct ldlm_enqueue_info *einfo;
630         struct obd_capa          *capas[2];
631         int rc;
632         ENTRY;
633
634         if (inode == NULL)
635                 RETURN(1);
636
637         if (d_mountpoint(dentry))
638                 RETURN(1);
639
640         if (dentry == dentry->d_sb->s_root)
641                 RETURN(1);
642
643         rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode));
644         if (rc == 1) {
645                 ll_intent_release(&it);
646                 RETURN(1);
647         }
648
649         rc = sa_args_init(dir, dentry, &minfo, &einfo, capas);
650         if (rc)
651                 RETURN(rc);
652
653         rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
654         if (!rc) {
655                 capa_put(capas[0]);
656                 capa_put(capas[1]);
657         } else {
658                 sa_args_fini(minfo, einfo);
659         }
660
661         RETURN(rc);
662 }
663
664 static inline void ll_name2qstr(struct qstr *q, const char *name, int namelen)
665 {
666         q->name = name;
667         q->len  = namelen;
668         q->hash = full_name_hash(name, namelen);
669 }
670
671 static int ll_statahead_one(struct dentry *parent, const char* entry_name,
672                             int entry_name_len)
673 {
674         struct inode             *dir = parent->d_inode;
675         struct ll_inode_info     *lli = ll_i2info(dir);
676         struct ll_statahead_info *sai = lli->lli_sai;
677         struct qstr               name;
678         struct dentry            *dentry;
679         struct ll_sai_entry      *se;
680         int                       rc;
681         ENTRY;
682
683 #ifdef DCACHE_LUSTRE_INVALID
684         if (parent->d_flags & DCACHE_LUSTRE_INVALID) {
685 #else
686         if (d_unhashed(parent)) {
687 #endif
688                 CDEBUG(D_READA, "parent dentry@%p %.*s is "
689                        "invalid, skip statahead\n",
690                        parent, parent->d_name.len, parent->d_name.name);
691                 RETURN(-EINVAL);
692         }
693
694         se = ll_sai_entry_init(sai, sai->sai_index);
695         if (IS_ERR(se))
696                 RETURN(PTR_ERR(se));
697
698         ll_name2qstr(&name, entry_name, entry_name_len);
699         dentry = d_lookup(parent, &name);
700         if (!dentry) {
701                 dentry = d_alloc(parent, &name);
702                 if (dentry) {
703                         rc = do_sa_lookup(dir, dentry);
704                         if (rc)
705                                 dput(dentry);
706                 } else {
707                         GOTO(out, rc = -ENOMEM);
708                 }
709         } else {
710                 rc = do_sa_revalidate(dentry);
711                 if (rc)
712                         dput(dentry);
713         }
714
715         EXIT;
716
717 out:
718         if (rc) {
719                 CDEBUG(D_READA, "set sai entry %p index %u stat %d rc %d\n",
720                        se, se->se_index, se->se_stat, rc);
721                 se->se_stat = rc;
722                 if (ll_sai_entry_to_stated(sai, se))
723                         cfs_waitq_signal(&sai->sai_waitq);
724         } else {
725                 sai->sai_sent++;
726         }
727
728         sai->sai_index++;
729         return rc;
730 }
731
732 struct ll_sa_thread_args {
733         struct dentry   *sta_parent;
734         pid_t            sta_pid;
735 };
736
737 static int ll_statahead_thread(void *arg)
738 {
739         struct ll_sa_thread_args *sta = arg;
740         struct dentry            *parent = dget(sta->sta_parent);
741         struct inode             *dir = parent->d_inode;
742         struct ll_inode_info     *lli = ll_i2info(dir);
743         struct ll_sb_info        *sbi = ll_i2sbi(dir);
744         struct ll_statahead_info *sai = ll_sai_get(lli->lli_sai);
745         struct ptlrpc_thread     *thread = &sai->sai_thread;
746         struct page              *page;
747         __u64                     pos = 0;
748         int                       first = 0;
749         int                       rc = 0;
750         struct ll_dir_chain       chain;
751         ENTRY;
752
753         {
754                 char pname[16];
755                 snprintf(pname, 15, "ll_sa_%u", sta->sta_pid);
756                 cfs_daemonize(pname);
757         }
758
759         sbi->ll_sa_total++;
760         spin_lock(&lli->lli_lock);
761         thread->t_flags = SVC_RUNNING;
762         spin_unlock(&lli->lli_lock);
763         cfs_waitq_signal(&thread->t_ctl_waitq);
764         CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name);
765
766         ll_dir_chain_init(&chain);
767         page = ll_get_dir_page(dir, pos, 0, &chain);
768
769         while (1) {
770                 struct l_wait_info lwi = { 0 };
771                 struct lu_dirpage *dp;
772                 struct lu_dirent  *ent;
773
774                 if (IS_ERR(page)) {
775                         rc = PTR_ERR(page);
776                         CERROR("error reading dir "DFID" at "LPU64"/%u: rc %d\n",
777                                PFID(ll_inode2fid(dir)), pos,
778                                sai->sai_index, rc);
779                         break;
780                 }
781
782                 dp = page_address(page);
783                 for (ent = lu_dirent_start(dp); ent != NULL;
784                      ent = lu_dirent_next(ent)) {
785                         char *name = ent->lde_name;
786                         int namelen = le16_to_cpu(ent->lde_namelen);
787
788                         if (namelen == 0)
789                                 /*
790                                  * Skip dummy record.
791                                  */
792                                 continue;
793
794                         if (name[0] == '.') {
795                                 if (namelen == 1) {
796                                         /*
797                                          * skip "."
798                                          */
799                                         continue;
800                                 } else if (name[1] == '.' && namelen == 2) {
801                                         /*
802                                          * skip ".."
803                                          */
804                                         continue;
805                                 } else if (!sai->sai_ls_all) {
806                                         /*
807                                          * skip hidden files.
808                                          */
809                                         sai->sai_skip_hidden++;
810                                         continue;
811                                 }
812                         }
813
814                         /*
815                          * don't stat-ahead first entry.
816                          */
817                         if (unlikely(!first)) {
818                                 first++;
819                                 continue;
820                         }
821
822 keep_de:
823                         l_wait_event(thread->t_ctl_waitq,
824                                      !sa_is_running(sai) || sa_not_full(sai) ||
825                                      !sa_received_empty(sai),
826                                      &lwi);
827
828                         while (!sa_received_empty(sai) && sa_is_running(sai))
829                                 do_statahead_interpret(sai);
830
831                         if (unlikely(!sa_is_running(sai))) {
832                                 ll_put_page(page);
833                                 GOTO(out, rc);
834                         }
835
836                         if (!sa_not_full(sai))
837                                 /*
838                                  * do not skip the current de.
839                                  */
840                                 goto keep_de;
841
842                         rc = ll_statahead_one(parent, name, namelen);
843                         if (rc < 0) {
844                                 ll_put_page(page);
845                                 GOTO(out, rc);
846                         }
847                 }
848                 pos = le64_to_cpu(dp->ldp_hash_end);
849                 ll_put_page(page);
850                 if (pos == DIR_END_OFF) {
851                         /*
852                          * End of directory reached.
853                          */
854                         while (1) {
855                                 l_wait_event(thread->t_ctl_waitq,
856                                              !sa_is_running(sai) ||
857                                              !sa_received_empty(sai) ||
858                                              sai->sai_sent == sai->sai_replied,
859                                              &lwi);
860                                 if (!sa_received_empty(sai) &&
861                                     sa_is_running(sai))
862                                         do_statahead_interpret(sai);
863                                 else
864                                         GOTO(out, rc);
865                         }
866                 } else if (1) {
867                         /*
868                          * chain is exhausted.
869                          * Normal case: continue to the next page.
870                          */
871                         page = ll_get_dir_page(dir, pos, 1, &chain);
872                 } else {
873                         /*
874                          * go into overflow page.
875                          */
876                 }
877         }
878         EXIT;
879
880 out:
881         ll_dir_chain_fini(&chain);
882         spin_lock(&lli->lli_lock);
883         thread->t_flags = SVC_STOPPED;
884         spin_unlock(&lli->lli_lock);
885         cfs_waitq_signal(&sai->sai_waitq);
886         cfs_waitq_signal(&thread->t_ctl_waitq);
887         ll_sai_put(sai);
888         dput(parent);
889         CDEBUG(D_READA, "statahead thread stopped, pid %d\n",
890                cfs_curproc_pid());
891         return rc;
892 }
893
894 /**
895  * called in ll_file_release().
896  */
897 void ll_stop_statahead(struct inode *inode, void *key)
898 {
899         struct ll_inode_info *lli = ll_i2info(inode);
900
901         if (unlikely(key == NULL))
902                 return;
903
904         spin_lock(&lli->lli_lock);
905         if (lli->lli_opendir_key != key || lli->lli_opendir_pid == 0) {
906                 spin_unlock(&lli->lli_lock);
907                 return;
908         }
909
910         lli->lli_opendir_key = NULL;
911
912         if (lli->lli_sai) {
913                 struct l_wait_info lwi = { 0 };
914                 struct ptlrpc_thread *thread = &lli->lli_sai->sai_thread;
915
916                 if (!sa_is_stopped(lli->lli_sai)) {
917                         thread->t_flags = SVC_STOPPING;
918                         spin_unlock(&lli->lli_lock);
919                         cfs_waitq_signal(&thread->t_ctl_waitq);
920
921                         CDEBUG(D_READA, "stopping statahead thread, pid %d\n",
922                                cfs_curproc_pid());
923                         l_wait_event(thread->t_ctl_waitq,
924                                      sa_is_stopped(lli->lli_sai),
925                                      &lwi);
926                 } else {
927                         spin_unlock(&lli->lli_lock);
928                 }
929
930                 /*
931                  * Put the ref which was held when first statahead_enter.
932                  * It maybe not the last ref for some statahead requests
933                  * maybe inflight.
934                  */
935                 ll_sai_put(lli->lli_sai);
936         } else {
937                 lli->lli_opendir_pid = 0;
938                 spin_unlock(&lli->lli_lock);
939         }
940 }
941
/*
 * Classification produced by is_first_dirent().  The values are spelled out
 * because is_first_dirent() computes its result as "LS_FIRST_DE + dot_de",
 * which requires LS_FIRST_DOT_DE == LS_FIRST_DE + 1.
 */
enum {
        /**
         * not the first dirent, or it is "."
         */
        LS_NONE_FIRST_DE = 0,
        /**
         * the first non-hidden dirent
         */
        LS_FIRST_DE      = 1,
        /**
         * the first hidden dirent, that is ".xxx"
         */
        LS_FIRST_DOT_DE  = 2
};
956
957 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
958 {
959         struct ll_dir_chain chain;
960         struct qstr        *target = &dentry->d_name;
961         struct page        *page;
962         __u64               pos = 0;
963         int                 dot_de;
964         int                 rc = LS_NONE_FIRST_DE;
965         ENTRY;
966
967         ll_dir_chain_init(&chain);
968         page = ll_get_dir_page(dir, pos, 0, &chain);
969
970         while (1) {
971                 struct lu_dirpage *dp;
972                 struct lu_dirent  *ent;
973
974                 if (IS_ERR(page)) {
975                         rc = PTR_ERR(page);
976                         CERROR("error reading dir "DFID" at "LPU64": rc %d\n",
977                                PFID(ll_inode2fid(dir)), pos, rc);
978                         break;
979                 }
980
981                 dp = page_address(page);
982                 for (ent = lu_dirent_start(dp); ent != NULL;
983                      ent = lu_dirent_next(ent)) {
984                         char *name = ent->lde_name;
985                         int namelen = le16_to_cpu(ent->lde_namelen);
986
987                         if (namelen == 0)
988                                 /*
989                                  * skip dummy record.
990                                  */
991                                 continue;
992
993                         if (name[0] == '.') {
994                                 if (namelen == 1)
995                                         /*
996                                          * skip "."
997                                          */
998                                         continue;
999                                 else if (name[1] == '.' && namelen == 2)
1000                                         /*
1001                                          * skip ".."
1002                                          */
1003                                         continue;
1004                                 else
1005                                         dot_de = 1;
1006                         } else {
1007                                 dot_de = 0;
1008                         }
1009
1010                         if (dot_de && target->name[0] != '.') {
1011                                 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
1012                                        target->len, target->name,
1013                                        namelen, name);
1014                                 continue;
1015                         }
1016
1017                         if (target->len == namelen &&
1018                             memcmp(target->name, name, namelen) == 0)
1019                                 rc = LS_FIRST_DE + dot_de;
1020                         else
1021                                 rc = LS_NONE_FIRST_DE;
1022                         ll_put_page(page);
1023                         GOTO(out, rc);
1024                 }
1025                 pos = le64_to_cpu(dp->ldp_hash_end);
1026                 ll_put_page(page);
1027                 if (pos == DIR_END_OFF) {
1028                         /*
1029                          * End of directory reached.
1030                          */
1031                         break;
1032                 } else if (1) {
1033                         /*
1034                          * chain is exhausted 
1035                          * Normal case: continue to the next page.
1036                          */
1037                         page = ll_get_dir_page(dir, pos, 1, &chain);
1038                 } else {
1039                         /*
1040                          * go into overflow page.
1041                          */
1042                 }
1043         }
1044         EXIT;
1045
1046 out:
1047         ll_dir_chain_fini(&chain);
1048         return rc;
1049 }
1050
1051 /**
1052  * Start statahead thread if this is the first dir entry.
1053  * Otherwise if a thread is started already, wait it until it is ahead of me.
1054  * \retval 0       -- stat ahead thread process such dentry, for lookup, it miss
1055  * \retval 1       -- stat ahead thread process such dentry, for lookup, it hit
1056  * \retval -EEXIST -- stat ahead thread started, and this is the first dentry
1057  * \retval -EBADFD -- statahead thread exit and not dentry available
1058  * \retval others  -- error
1059  */
1060 int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
1061 {
1062         struct ll_sb_info        *sbi = ll_i2sbi(dir);
1063         struct ll_inode_info     *lli = ll_i2info(dir);
1064         struct ll_statahead_info *sai = lli->lli_sai;
1065         struct ll_sa_thread_args  sta;
1066         struct l_wait_info        lwi = { 0 };
1067         int                       rc;
1068         ENTRY;
1069
1070         LASSERT(lli->lli_opendir_pid == cfs_curproc_pid());
1071
1072         if (sai) {
1073                 if (unlikely(sa_is_stopped(sai) &&
1074                              list_empty(&sai->sai_entries_stated)))
1075                         RETURN(-EBADFD);
1076
1077                 if ((*dentryp)->d_name.name[0] == '.') {
1078                         if (likely(sai->sai_ls_all ||
1079                             sai->sai_miss_hidden >= sai->sai_skip_hidden)) {
1080                                 /*
1081                                  * Hidden dentry is the first one, or statahead
1082                                  * thread does not skip so many hidden dentries
1083                                  * before "sai_ls_all" enabled as below.
1084                                  */
1085                         } else {
1086                                 if (!sai->sai_ls_all)
1087                                         /*
1088                                          * It maybe because hidden dentry is not
1089                                          * the first one, "sai_ls_all" was not
1090                                          * set, then "ls -al" missed. Enable
1091                                          * "sai_ls_all" for such case.
1092                                          */
1093                                         sai->sai_ls_all = 1;
1094
1095                                 /*
1096                                  * Such "getattr" has been skipped before
1097                                  * "sai_ls_all" enabled as above.
1098                                  */
1099                                 sai->sai_miss_hidden++;
1100                                 RETURN(-ENOENT);
1101                         }
1102                 }
1103
1104                 if (ll_sai_entry_stated(sai)) {
1105                         sbi->ll_sa_cached++;
1106                 } else {
1107                         sbi->ll_sa_blocked++;
1108                         /*
1109                          * thread started already, avoid double-stat.
1110                          */
1111                         l_wait_event(sai->sai_waitq,
1112                                      ll_sai_entry_stated(sai) || sa_is_stopped(sai),
1113                                      &lwi);
1114                 }
1115
1116                 if (lookup) {
1117                         struct dentry *result;
1118
1119                         result = d_lookup((*dentryp)->d_parent,
1120                                           &(*dentryp)->d_name);
1121                         if (result) {
1122                                 LASSERT(result != *dentryp);
1123                                 /* BUG 16303: do not drop reference count for
1124                                  * "*dentryp", VFS will do that by itself. */
1125                                 *dentryp = result;
1126                                 RETURN(1);
1127                         }
1128                 }
1129                 /*
1130                  * do nothing for revalidate.
1131                  */
1132                 RETURN(0);
1133         }
1134
1135          /*
1136           * I am the "lli_opendir_pid" owner, only me can set "lli_sai".
1137           */ 
1138         LASSERT(lli->lli_sai == NULL);
1139
1140         rc = is_first_dirent(dir, *dentryp);
1141         if (rc == LS_NONE_FIRST_DE) {
1142                 /*
1143                  * It is not "ls -{a}l" operation, no need statahead for it.
1144                  */
1145                 spin_lock(&lli->lli_lock);
1146                 lli->lli_opendir_key = NULL;
1147                 lli->lli_opendir_pid = 0;
1148                 spin_unlock(&lli->lli_lock);
1149                 RETURN(-EBADF);
1150         }
1151
1152         sai = ll_sai_alloc();
1153         if (sai == NULL)
1154                 RETURN(-ENOMEM);
1155
1156         sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
1157         sai->sai_inode = igrab(dir);
1158         if (unlikely(sai->sai_inode == NULL)) {
1159                 CWARN("Do not start stat ahead on dying inode "DFID" .\n",
1160                       PFID(&lli->lli_fid));
1161                 OBD_FREE_PTR(sai);
1162                 RETURN(-ESTALE);
1163         }
1164
1165         LASSERT(sai->sai_inode == (*dentryp)->d_parent->d_inode);
1166
1167         sta.sta_parent = (*dentryp)->d_parent;
1168         sta.sta_pid    = cfs_curproc_pid();
1169
1170         lli->lli_sai = sai;
1171         rc = cfs_kernel_thread(ll_statahead_thread, &sta, 0);
1172         if (rc < 0) {
1173                 CERROR("can't start ll_sa thread, rc: %d\n", rc);
1174                 lli->lli_opendir_key = NULL;
1175                 sai->sai_thread.t_flags = SVC_STOPPED;
1176                 ll_sai_put(sai);
1177                 LASSERT(lli->lli_sai == NULL);
1178                 RETURN(rc);
1179         }
1180
1181         l_wait_event(sai->sai_thread.t_ctl_waitq, 
1182                      sa_is_running(sai) || sa_is_stopped(sai),
1183                      &lwi);
1184
1185         /*
1186          * We don't stat-ahead for the first dirent since we are already in
1187          * lookup, and -EEXIST also indicates that this is the first dirent.
1188          */
1189         RETURN(-EEXIST);
1190 }
1191
1192 /**
1193  * update hit/miss count.
1194  */
1195 int ll_statahead_exit(struct dentry *dentry, int result)
1196 {
1197         struct dentry         *parent = dentry->d_parent;
1198         struct ll_inode_info  *lli = ll_i2info(parent->d_inode);
1199         struct ll_sb_info     *sbi = ll_i2sbi(parent->d_inode);
1200         int                    rc = 0;
1201         ENTRY;
1202
1203         if (lli->lli_opendir_pid != cfs_curproc_pid())
1204                 RETURN(-EBADFD);
1205
1206         if (lli->lli_sai) {
1207                 struct ll_statahead_info *sai = lli->lli_sai;
1208
1209                 if (result >= 1) {
1210                         sbi->ll_sa_hit++;
1211                         sai->sai_hit++;
1212                         sai->sai_consecutive_miss = 0;
1213                         sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
1214                 } else {
1215                         sbi->ll_sa_miss++;
1216                         sai->sai_miss++;
1217                         sai->sai_consecutive_miss++;
1218                         if (sa_low_hit(sai) && sa_is_running(sai)) {
1219                                 sbi->ll_sa_wrong++;
1220                                 CDEBUG(D_READA, "statahead for dir %.*s hit "
1221                                        "ratio too low: hit/miss %u/%u, "
1222                                        "sent/replied %u/%u. stopping statahead "
1223                                        "thread: pid %d\n",
1224                                        parent->d_name.len, parent->d_name.name,
1225                                        sai->sai_hit, sai->sai_miss,
1226                                        sai->sai_sent, sai->sai_replied,
1227                                        cfs_curproc_pid());
1228                                 spin_lock(&lli->lli_lock);
1229                                 if (!sa_is_stopped(sai))
1230                                         sai->sai_thread.t_flags = SVC_STOPPING;
1231                                 spin_unlock(&lli->lli_lock);
1232                         }
1233                 }
1234
1235                 if (!sa_is_stopped(sai))
1236                         cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
1237                 ll_sai_entry_fini(sai);
1238                 rc = ll_statahead_mark(dentry);
1239         }
1240         RETURN(rc);
1241 }