lustre/llite/statahead.c (fs/lustre-release.git, commit f0d33d773089adacb3e575ef5235fe0604b883a8)
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #include <linux/fs.h>
34 #include <linux/sched.h>
35 #include <linux/kthread.h>
36 #include <linux/mm.h>
37 #include <linux/highmem.h>
38 #include <linux/pagemap.h>
39
40 #define DEBUG_SUBSYSTEM S_LLITE
41
42 #include <obd_support.h>
43 #include <lustre_dlm.h>
44 #include "llite_internal.h"
45
46 #define SA_OMITTED_ENTRY_MAX 8ULL
47
48 typedef enum {
49         /** negative values are for error cases */
50         SA_ENTRY_INIT = 0,      /** init entry */
51         SA_ENTRY_SUCC = 1,      /** stat succeeded */
52         SA_ENTRY_INVA = 2,      /** invalid entry */
53 } se_state_t;
54
55 /* sa_entry is not refcounted: the statahead thread allocates it and issues the
56  * async stat, the async stat callback ll_statahead_interpret() adds it to
57  * sai_interim_entries, then the statahead thread calls sa_handle_callback() to
58  * instantiate the entry and move it to sai_entries, after which only the
59  * scanner process can access and free it. */
60 struct sa_entry {
61         /* link into sai_interim_entries or sai_entries */
62         struct list_head        se_list;
63         /* link into sai hash table locally */
64         struct list_head        se_hash;
65         /* entry index in the sai */
66         __u64                   se_index;
67         /* low layer ldlm lock handle */
68         __u64                   se_handle;
69         /* entry status */
70         se_state_t              se_state;
71         /* entry size, contains name */
72         int                     se_size;
73         /* pointer to async getattr enqueue info */
74         struct md_enqueue_info *se_minfo;
75         /* pointer to the async getattr request */
76         struct ptlrpc_request  *se_req;
77         /* pointer to the target inode */
78         struct inode           *se_inode;
79         /* entry name */
80         struct qstr             se_qstr;
81         /* entry fid */
82         struct lu_fid           se_fid;
83 };
84
85 static unsigned int sai_generation = 0;
86 static DEFINE_SPINLOCK(sai_generation_lock);
87
88 static inline int sa_unhashed(struct sa_entry *entry)
89 {
90         return list_empty(&entry->se_hash);
91 }
92
93 /* sa_entry is ready to use */
94 static inline int sa_ready(struct sa_entry *entry)
95 {
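        /* pairs with the smp_store_release() in __sa_make_ready(): once se_state
         * is seen as non-INIT, the entry fields written before the state change
         * (e.g. se_inode, se_handle) are visible as well */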
96         smp_rmb();
97         return (entry->se_state != SA_ENTRY_INIT);
98 }
99
100 /* hash value to put in sai_cache */
101 static inline int sa_hash(int val)
102 {
103         return val & LL_SA_CACHE_MASK;
104 }
105
106 /* hash entry into sai_cache */
107 static inline void
108 sa_rehash(struct ll_statahead_info *sai, struct sa_entry *entry)
109 {
110         int i = sa_hash(entry->se_qstr.hash);
111
112         spin_lock(&sai->sai_cache_lock[i]);
113         list_add_tail(&entry->se_hash, &sai->sai_cache[i]);
114         spin_unlock(&sai->sai_cache_lock[i]);
115 }
116
117 /* unhash entry from sai_cache */
118 static inline void
119 sa_unhash(struct ll_statahead_info *sai, struct sa_entry *entry)
120 {
121         int i = sa_hash(entry->se_qstr.hash);
122
123         spin_lock(&sai->sai_cache_lock[i]);
124         list_del_init(&entry->se_hash);
125         spin_unlock(&sai->sai_cache_lock[i]);
126 }
127
128 static inline int agl_should_run(struct ll_statahead_info *sai,
129                                  struct inode *inode)
130 {
131         return (inode != NULL && S_ISREG(inode->i_mode) && sai->sai_agl_valid);
132 }
133
134 static inline struct ll_inode_info *
135 agl_first_entry(struct ll_statahead_info *sai)
136 {
137         return list_entry(sai->sai_agls.next, struct ll_inode_info,
138                           lli_agl_list);
139 }
140
141 /* statahead window is full */
142 static inline int sa_sent_full(struct ll_statahead_info *sai)
143 {
144         return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
145 }
146
147 /* got async stat replies */
148 static inline int sa_has_callback(struct ll_statahead_info *sai)
149 {
150         return !list_empty(&sai->sai_interim_entries);
151 }
152
153 static inline int agl_list_empty(struct ll_statahead_info *sai)
154 {
155         return list_empty(&sai->sai_agls);
156 }
157
158 /**
159  * The hit rate is considered low when either:
160  * (1) the hit ratio is less than 80%, or
161  * (2) there have been more than 8 consecutive misses;
162  * in that case statahead is stopped as not being useful.
163  */
164 static inline int sa_low_hit(struct ll_statahead_info *sai)
165 {
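        /* sai_hit < 4 * sai_miss is the same as a hit ratio below 80%; the
         * sai_hit > 7 check avoids judging the ratio on too small a sample */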
166         return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) ||
167                 (sai->sai_consecutive_miss > 8));
168 }
169
170 /*
171  * if the given index is more than SA_OMITTED_ENTRY_MAX entries behind the
172  * statahead window, then it is old.
173  */
174 static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
175 {
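        /* i.e. the index is more than sai_max + SA_OMITTED_ENTRY_MAX entries
         * behind sai_index; written with the additions on the left-hand side so
         * the unsigned arithmetic cannot underflow */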
176         return ((__u64)sai->sai_max + index + SA_OMITTED_ENTRY_MAX <
177                  sai->sai_index);
178 }
179
180 /* allocate sa_entry and hash it to allow scanner process to find it */
181 static struct sa_entry *
182 sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
183          const char *name, int len, const struct lu_fid *fid)
184 {
185         struct ll_inode_info *lli;
186         struct sa_entry *entry;
187         int entry_size;
188         char *dname;
189         ENTRY;
190
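        /* the entry name is stored right behind struct sa_entry; (len & ~3) + 4
         * rounds the name plus its trailing NUL up to a 4-byte boundary */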
191         entry_size = sizeof(struct sa_entry) + (len & ~3) + 4;
192         OBD_ALLOC(entry, entry_size);
193         if (unlikely(entry == NULL))
194                 RETURN(ERR_PTR(-ENOMEM));
195
196         CDEBUG(D_READA, "alloc sa entry %.*s(%p) index %llu\n",
197                len, name, entry, index);
198
199         entry->se_index = index;
200
201         entry->se_state = SA_ENTRY_INIT;
202         entry->se_size = entry_size;
203         dname = (char *)entry + sizeof(struct sa_entry);
204         memcpy(dname, name, len);
205         dname[len] = 0;
206         entry->se_qstr.hash = ll_full_name_hash(parent, name, len);
207         entry->se_qstr.len = len;
208         entry->se_qstr.name = dname;
209         entry->se_fid = *fid;
210
211         lli = ll_i2info(sai->sai_dentry->d_inode);
212
213         spin_lock(&lli->lli_sa_lock);
214         INIT_LIST_HEAD(&entry->se_list);
215         sa_rehash(sai, entry);
216         spin_unlock(&lli->lli_sa_lock);
217
218         atomic_inc(&sai->sai_cache_count);
219
220         RETURN(entry);
221 }
222
223 /* free sa_entry, which should have been unhashed and not in any list */
224 static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry)
225 {
226         CDEBUG(D_READA, "free sa entry %.*s(%p) index %llu\n",
227                entry->se_qstr.len, entry->se_qstr.name, entry,
228                entry->se_index);
229
230         LASSERT(list_empty(&entry->se_list));
231         LASSERT(sa_unhashed(entry));
232
233         OBD_FREE(entry, entry->se_size);
234         atomic_dec(&sai->sai_cache_count);
235 }
236
237 /*
238  * find sa_entry by name, used by directory scanner, lock is not needed because
239  * only scanner can remove the entry from cache.
240  */
241 static struct sa_entry *
242 sa_get(struct ll_statahead_info *sai, const struct qstr *qstr)
243 {
244         struct sa_entry *entry;
245         int i = sa_hash(qstr->hash);
246
247         list_for_each_entry(entry, &sai->sai_cache[i], se_hash) {
248                 if (entry->se_qstr.hash == qstr->hash &&
249                     entry->se_qstr.len == qstr->len &&
250                     memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0)
251                         return entry;
252         }
253         return NULL;
254 }
255
256 /* unhash and unlink sa_entry, and then free it */
257 static inline void
258 sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry)
259 {
260         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
261
262         LASSERT(!sa_unhashed(entry));
263         LASSERT(!list_empty(&entry->se_list));
264         LASSERT(sa_ready(entry));
265
266         sa_unhash(sai, entry);
267
268         spin_lock(&lli->lli_sa_lock);
269         list_del_init(&entry->se_list);
270         spin_unlock(&lli->lli_sa_lock);
271
272         if (entry->se_inode != NULL)
273                 iput(entry->se_inode);
274
275         sa_free(sai, entry);
276 }
277
278 /* called by scanner after use, sa_entry will be killed */
279 static void
280 sa_put(struct ll_statahead_info *sai, struct sa_entry *entry)
281 {
282         struct sa_entry *tmp, *next;
283
284         if (entry != NULL && entry->se_state == SA_ENTRY_SUCC) {
285                 struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
286
287                 sai->sai_hit++;
288                 sai->sai_consecutive_miss = 0;
289                 sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
290         } else {
291                 sai->sai_miss++;
292                 sai->sai_consecutive_miss++;
293         }
294
295         if (entry != NULL)
296                 sa_kill(sai, entry);
297
298         /* kill old completed entries, only scanner process does this, no need
299          * to lock */
300         list_for_each_entry_safe(tmp, next, &sai->sai_entries, se_list) {
301                 if (!is_omitted_entry(sai, tmp->se_index))
302                         break;
303                 sa_kill(sai, tmp);
304         }
305
306         wake_up(&sai->sai_thread.t_ctl_waitq);
307 }
308
309 /* update the entry state and insert it into sai_entries sorted by index;
310  * return true if the scanner is waiting on this entry. */
311 static bool
312 __sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
313 {
314         struct sa_entry *se;
315         struct list_head *pos = &sai->sai_entries;
316         __u64 index = entry->se_index;
317
318         LASSERT(!sa_ready(entry));
319         LASSERT(list_empty(&entry->se_list));
320
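        /* walk the list backwards to find the last entry with a smaller index,
         * so that sai_entries stays sorted by se_index */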
321         list_for_each_entry_reverse(se, &sai->sai_entries, se_list) {
322                 if (se->se_index < entry->se_index) {
323                         pos = &se->se_list;
324                         break;
325                 }
326         }
327         list_add(&entry->se_list, pos);
328         /*
329          * LU-9210: the scanner waiting in revalidate_statahead_dentry() must
330          * be able to see this store before we wake it up
331          */
332         smp_store_release(&entry->se_state, ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);
333
334         return (index == sai->sai_index_wait);
335 }
336
337 /* finish async stat RPC arguments */
338 static void sa_fini_data(struct md_enqueue_info *minfo)
339 {
340         ll_unlock_md_op_lsm(&minfo->mi_data);
341         iput(minfo->mi_dir);
342         OBD_FREE_PTR(minfo);
343 }
344
345 static int ll_statahead_interpret(struct ptlrpc_request *req,
346                                   struct md_enqueue_info *minfo, int rc);
347
348 /*
349  * prepare arguments for async stat RPC.
350  */
351 static struct md_enqueue_info *
352 sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
353 {
354         struct md_enqueue_info   *minfo;
355         struct ldlm_enqueue_info *einfo;
356         struct md_op_data        *op_data;
357
358         OBD_ALLOC_PTR(minfo);
359         if (minfo == NULL)
360                 return ERR_PTR(-ENOMEM);
361
362         op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child,
363                                      entry->se_qstr.name, entry->se_qstr.len, 0,
364                                      LUSTRE_OPC_ANY, NULL);
365         if (IS_ERR(op_data)) {
366                 OBD_FREE_PTR(minfo);
367                 return (struct md_enqueue_info *)op_data;
368         }
369
370         if (child == NULL)
371                 op_data->op_fid2 = entry->se_fid;
372
373         minfo->mi_it.it_op = IT_GETATTR;
374         minfo->mi_dir = igrab(dir);
375         minfo->mi_cb = ll_statahead_interpret;
376         minfo->mi_cbdata = entry;
377
378         einfo = &minfo->mi_einfo;
379         einfo->ei_type   = LDLM_IBITS;
380         einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
381         einfo->ei_cb_bl  = ll_md_blocking_ast;
382         einfo->ei_cb_cp  = ldlm_completion_ast;
383         einfo->ei_cb_gl  = NULL;
384         einfo->ei_cbdata = NULL;
385
386         return minfo;
387 }
388
389 /*
390  * release resources used in the async stat RPC, update the entry state, and
391  * wake up the scanner process if it is waiting on this entry.
392  */
393 static void
394 sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
395 {
396         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
397         struct md_enqueue_info *minfo = entry->se_minfo;
398         struct ptlrpc_request *req = entry->se_req;
399         bool wakeup;
400
401         /* release resources used in RPC */
402         if (minfo) {
403                 entry->se_minfo = NULL;
404                 ll_intent_release(&minfo->mi_it);
405                 sa_fini_data(minfo);
406         }
407
408         if (req) {
409                 entry->se_req = NULL;
410                 ptlrpc_req_finished(req);
411         }
412
413         spin_lock(&lli->lli_sa_lock);
414         wakeup = __sa_make_ready(sai, entry, ret);
415         spin_unlock(&lli->lli_sa_lock);
416
417         if (wakeup)
418                 wake_up(&sai->sai_waitq);
419 }
420
421 /* insert inode into the list of sai_agls */
422 static void ll_agl_add(struct ll_statahead_info *sai,
423                        struct inode *inode, int index)
424 {
425         struct ll_inode_info *child  = ll_i2info(inode);
426         struct ll_inode_info *parent = ll_i2info(sai->sai_dentry->d_inode);
427         int                   added  = 0;
428
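        /* lli_agl_index == 0 means the inode is not queued for AGL yet; entry
         * indices start at 1 (see ll_sai_alloc()), so 0 is never a valid index */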
429         spin_lock(&child->lli_agl_lock);
430         if (child->lli_agl_index == 0) {
431                 child->lli_agl_index = index;
432                 spin_unlock(&child->lli_agl_lock);
433
434                 LASSERT(list_empty(&child->lli_agl_list));
435
436                 igrab(inode);
437                 spin_lock(&parent->lli_agl_lock);
438                 if (agl_list_empty(sai))
439                         added = 1;
440                 list_add_tail(&child->lli_agl_list, &sai->sai_agls);
441                 if (added && sai->sai_agl_task)
442                         wake_up_process(sai->sai_agl_task);
443                 spin_unlock(&parent->lli_agl_lock);
444         } else {
445                 spin_unlock(&child->lli_agl_lock);
446         }
447 }
448
449 /* allocate sai */
450 static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
451 {
452         struct ll_statahead_info *sai;
453         struct ll_inode_info *lli = ll_i2info(dentry->d_inode);
454         int i;
455         ENTRY;
456
457         OBD_ALLOC_PTR(sai);
458         if (!sai)
459                 RETURN(NULL);
460
461         sai->sai_dentry = dget(dentry);
462         atomic_set(&sai->sai_refcount, 1);
463         sai->sai_max = LL_SA_RPC_MIN;
464         sai->sai_index = 1;
465         init_waitqueue_head(&sai->sai_waitq);
466         init_waitqueue_head(&sai->sai_thread.t_ctl_waitq);
467
468         INIT_LIST_HEAD(&sai->sai_interim_entries);
469         INIT_LIST_HEAD(&sai->sai_entries);
470         INIT_LIST_HEAD(&sai->sai_agls);
471
472         for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
473                 INIT_LIST_HEAD(&sai->sai_cache[i]);
474                 spin_lock_init(&sai->sai_cache_lock[i]);
475         }
476         atomic_set(&sai->sai_cache_count, 0);
477
478         spin_lock(&sai_generation_lock);
479         lli->lli_sa_generation = ++sai_generation;
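        /* skip generation 0 on wraparound: 0 is reserved to mark dentries that
         * have never been through statahead (see lld_sa_generation below) */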
480         if (unlikely(sai_generation == 0))
481                 lli->lli_sa_generation = ++sai_generation;
482         spin_unlock(&sai_generation_lock);
483
484         RETURN(sai);
485 }
486
487 /* free sai */
488 static inline void ll_sai_free(struct ll_statahead_info *sai)
489 {
490         LASSERT(sai->sai_dentry != NULL);
491         dput(sai->sai_dentry);
492         OBD_FREE_PTR(sai);
493 }
494
495 /*
496  * take refcount of sai if sai for @dir exists, which means statahead is on for
497  * this directory.
498  */
499 static inline struct ll_statahead_info *ll_sai_get(struct inode *dir)
500 {
501         struct ll_inode_info *lli = ll_i2info(dir);
502         struct ll_statahead_info *sai = NULL;
503
504         spin_lock(&lli->lli_sa_lock);
505         sai = lli->lli_sai;
506         if (sai != NULL)
507                 atomic_inc(&sai->sai_refcount);
508         spin_unlock(&lli->lli_sa_lock);
509
510         return sai;
511 }
512
513 /*
514  * put sai refcount after use, if refcount reaches zero, free sai and sa_entries
515  * attached to it.
516  */
517 static void ll_sai_put(struct ll_statahead_info *sai)
518 {
519         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
520
521         if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
522                 struct sa_entry *entry, *next;
523                 struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
524
525                 lli->lli_sai = NULL;
526                 spin_unlock(&lli->lli_sa_lock);
527
528                 LASSERT(thread_is_stopped(&sai->sai_thread));
529                 LASSERT(!sai->sai_agl_task);
530                 LASSERT(sai->sai_sent == sai->sai_replied);
531                 LASSERT(!sa_has_callback(sai));
532
533                 list_for_each_entry_safe(entry, next, &sai->sai_entries,
534                                          se_list)
535                         sa_kill(sai, entry);
536
537                 LASSERT(atomic_read(&sai->sai_cache_count) == 0);
538                 LASSERT(agl_list_empty(sai));
539
540                 ll_sai_free(sai);
541                 atomic_dec(&sbi->ll_sa_running);
542         }
543 }
544
545 /* Do NOT forget to drop the inode refcount taken when it was added to sai_agls. */
546 static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
547 {
548         struct ll_inode_info *lli = ll_i2info(inode);
549         u64 index = lli->lli_agl_index;
550         ktime_t expire;
551         int rc;
552
553         ENTRY;
554         LASSERT(list_empty(&lli->lli_agl_list));
555
556         /* AGL may fall behind statahead by one entry */
557         if (is_omitted_entry(sai, index + 1)) {
558                 lli->lli_agl_index = 0;
559                 iput(inode);
560                 RETURN_EXIT;
561         }
562
563         /* In case of restore, the MDT has the right size and has already
564          * sent it back without granting the layout lock, so the inode is
565          * up-to-date and AGL (async glimpse lock) is useless.
566          * Also, glimpsing requires the layout; during a running restore the
567          * MDT holds the layout lock, so the glimpse would block until the
568          * restore completes (statahead/agl would block) */
569         if (ll_file_test_flag(lli, LLIF_FILE_RESTORING)) {
570                 lli->lli_agl_index = 0;
571                 iput(inode);
572                 RETURN_EXIT;
573         }
574
575         /* Someone is in glimpse (sync or async), do nothing. */
576         rc = down_write_trylock(&lli->lli_glimpse_sem);
577         if (rc == 0) {
578                 lli->lli_agl_index = 0;
579                 iput(inode);
580                 RETURN_EXIT;
581         }
582
583         /*
584          * Someone triggered a glimpse within the last second.
585          * 1) The former glimpse succeeded with a glimpse lock granted by the
586          *    OST; if that lock is still cached on the client, AGL needs to do
587          *    nothing. If it was cancelled by another client, AGL may not be
588          *    able to obtain a new lock, because AGL triggers no glimpse callback.
589          * 2) The former glimpse succeeded, but the OST did not grant a glimpse
590          *    lock. In that case it is quite likely the OST will not grant one
591          *    for AGL either.
592          * 3) The former glimpse failed. Compared with the other two cases this
593          *    is relatively rare; AGL can ignore it without much effect on
594          *    performance.
595          */
596         expire = ktime_sub_ns(ktime_get(), NSEC_PER_SEC);
597         if (ktime_to_ns(lli->lli_glimpse_time) &&
598             ktime_before(expire, lli->lli_glimpse_time)) {
599                 up_write(&lli->lli_glimpse_sem);
600                 lli->lli_agl_index = 0;
601                 iput(inode);
602                 RETURN_EXIT;
603         }
604
605         CDEBUG(D_READA, "Handling (init) async glimpse: inode = "
606                DFID", idx = %llu\n", PFID(&lli->lli_fid), index);
607
608         cl_agl(inode);
609         lli->lli_agl_index = 0;
610         lli->lli_glimpse_time = ktime_get();
611         up_write(&lli->lli_glimpse_sem);
612
613         CDEBUG(D_READA, "Handled (init) async glimpse: inode= "
614                DFID", idx = %llu, rc = %d\n",
615                PFID(&lli->lli_fid), index, rc);
616
617         iput(inode);
618
619         EXIT;
620 }
621
622 /*
623  * prepare the inode for the sa_entry and add it to the agl list; after this
624  * the sa_entry is ready to be used by the scanner process.
625  */
626 static void sa_instantiate(struct ll_statahead_info *sai,
627                                  struct sa_entry *entry)
628 {
629         struct inode *dir = sai->sai_dentry->d_inode;
630         struct inode *child;
631         struct md_enqueue_info *minfo;
632         struct lookup_intent *it;
633         struct ptlrpc_request *req;
634         struct mdt_body *body;
635         int rc = 0;
636         ENTRY;
637
638         LASSERT(entry->se_handle != 0);
639
640         minfo = entry->se_minfo;
641         it = &minfo->mi_it;
642         req = entry->se_req;
643         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
644         if (body == NULL)
645                 GOTO(out, rc = -EFAULT);
646
647         child = entry->se_inode;
648         if (child != NULL) {
649                 /* revalidate; unlinked and re-created with the same name */
650                 if (unlikely(!lu_fid_eq(&minfo->mi_data.op_fid2,
651                                         &body->mbo_fid1))) {
652                         entry->se_inode = NULL;
653                         iput(child);
654                         child = NULL;
655                 }
656         }
657
658         it->it_lock_handle = entry->se_handle;
659         rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
660         if (rc != 1)
661                 GOTO(out, rc = -EAGAIN);
662
663         rc = ll_prep_inode(&child, req, dir->i_sb, it);
664         if (rc)
665                 GOTO(out, rc);
666
667         CDEBUG(D_READA, "%s: setting %.*s"DFID" l_data to inode %p\n",
668                ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
669                entry->se_qstr.name, PFID(ll_inode2fid(child)), child);
670         ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);
671
672         entry->se_inode = child;
673
674         if (agl_should_run(sai, child))
675                 ll_agl_add(sai, child, entry->se_index);
676
677         EXIT;
678
679 out:
680         /* sa_make_ready() will drop the ldlm ibits lock refcount by calling
681          * ll_intent_drop_lock() even on failure. It is safe to call
682          * ll_intent_drop_lock() more than once. */
683         sa_make_ready(sai, entry, rc);
684 }
685
686 /* once there are async stat replies, instantiate sa_entry from replies */
687 static void sa_handle_callback(struct ll_statahead_info *sai)
688 {
689         struct ll_inode_info *lli;
690
691         lli = ll_i2info(sai->sai_dentry->d_inode);
692
693         spin_lock(&lli->lli_sa_lock);
694         while (sa_has_callback(sai)) {
695                 struct sa_entry *entry;
696
697                 entry = list_entry(sai->sai_interim_entries.next,
698                                    struct sa_entry, se_list);
699                 list_del_init(&entry->se_list);
700                 spin_unlock(&lli->lli_sa_lock);
701
702                 sa_instantiate(sai, entry);
703                 spin_lock(&lli->lli_sa_lock);
704         }
705         spin_unlock(&lli->lli_sa_lock);
706 }
707
708 /*
709  * callback for the async stat RPC. Because this runs in ptlrpcd context, we
710  * only put the sa_entry on sai_interim_entries and wake up the statahead
711  * thread, which will prepare the inode and instantiate the sa_entry later.
712  */
713 static int ll_statahead_interpret(struct ptlrpc_request *req,
714                                   struct md_enqueue_info *minfo, int rc)
715 {
716         struct lookup_intent *it = &minfo->mi_it;
717         struct inode *dir = minfo->mi_dir;
718         struct ll_inode_info *lli = ll_i2info(dir);
719         struct ll_statahead_info *sai = lli->lli_sai;
720         struct sa_entry *entry = (struct sa_entry *)minfo->mi_cbdata;
721         __u64 handle = 0;
722         wait_queue_head_t *waitq = NULL;
723         ENTRY;
724
725         if (it_disposition(it, DISP_LOOKUP_NEG))
726                 rc = -ENOENT;
727
728         /* because the statahead thread waits for all inflight RPCs to finish,
729          * sai is always valid here, no need to take a refcount */
730         LASSERT(sai != NULL);
731         LASSERT(!thread_is_stopped(&sai->sai_thread));
732         LASSERT(entry != NULL);
733
734         CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
735                entry->se_qstr.len, entry->se_qstr.name, rc);
736
737         if (rc != 0) {
738                 ll_intent_release(it);
739                 sa_fini_data(minfo);
740         } else {
741                 /* release ibits lock ASAP to avoid deadlock when statahead
742                  * thread enqueues lock on parent in readdir and another
743                  * process enqueues lock on child with parent lock held, e.g.
744                  * unlink. */
745                 handle = it->it_lock_handle;
746                 ll_intent_drop_lock(it);
747                 ll_unlock_md_op_lsm(&minfo->mi_data);
748         }
749
750         spin_lock(&lli->lli_sa_lock);
751         if (rc != 0) {
752                 if (__sa_make_ready(sai, entry, rc))
753                         waitq = &sai->sai_waitq;
754         } else {
755                 entry->se_minfo = minfo;
756                 entry->se_req = ptlrpc_request_addref(req);
757                 /* Release the async ibits lock ASAP to avoid deadlock
758                  * when statahead thread tries to enqueue lock on parent
759                  * for readpage and another process tries to enqueue lock on child
760                  * with parent's lock held, for example: unlink. */
761                 entry->se_handle = handle;
762                 if (!sa_has_callback(sai))
763                         waitq = &sai->sai_thread.t_ctl_waitq;
764
765                 list_add_tail(&entry->se_list, &sai->sai_interim_entries);
766         }
767         sai->sai_replied++;
768
769         if (waitq != NULL)
770                 wake_up(waitq);
771         spin_unlock(&lli->lli_sa_lock);
772
773         RETURN(rc);
774 }
775
776 /* async stat for file not found in dcache */
777 static int sa_lookup(struct inode *dir, struct sa_entry *entry)
778 {
779         struct md_enqueue_info   *minfo;
780         int                       rc;
781         ENTRY;
782
783         minfo = sa_prep_data(dir, NULL, entry);
784         if (IS_ERR(minfo))
785                 RETURN(PTR_ERR(minfo));
786
787         rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
788         if (rc < 0)
789                 sa_fini_data(minfo);
790
791         RETURN(rc);
792 }
793
794 /**
795  * async stat for file found in dcache, similar to .revalidate
796  *
797  * \retval      1 dentry valid, no RPC sent
798  * \retval      0 dentry invalid, will send async stat RPC
799  * \retval      negative number upon error
800  */
801 static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
802                          struct dentry *dentry)
803 {
804         struct inode *inode = dentry->d_inode;
805         struct lookup_intent it = { .it_op = IT_GETATTR,
806                                     .it_lock_handle = 0 };
807         struct md_enqueue_info *minfo;
808         int rc;
809         ENTRY;
810
811         if (unlikely(inode == NULL))
812                 RETURN(1);
813
814         if (d_mountpoint(dentry))
815                 RETURN(1);
816
817         minfo = sa_prep_data(dir, inode, entry);
818         if (IS_ERR(minfo))
819                 RETURN(PTR_ERR(minfo));
820
821         entry->se_inode = igrab(inode);
822         rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
823                                 NULL);
824         if (rc == 1) {
825                 entry->se_handle = it.it_lock_handle;
826                 ll_intent_release(&it);
827                 sa_fini_data(minfo);
828                 RETURN(1);
829         }
830
831         rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
832         if (rc < 0) {
833                 entry->se_inode = NULL;
834                 iput(inode);
835                 sa_fini_data(minfo);
836         }
837
838         RETURN(rc);
839 }
840
841 /* async stat for file with @name */
842 static void sa_statahead(struct dentry *parent, const char *name, int len,
843                          const struct lu_fid *fid)
844 {
845         struct inode *dir = parent->d_inode;
846         struct ll_inode_info *lli = ll_i2info(dir);
847         struct ll_statahead_info *sai = lli->lli_sai;
848         struct dentry *dentry = NULL;
849         struct sa_entry *entry;
850         int rc;
851         ENTRY;
852
853         entry = sa_alloc(parent, sai, sai->sai_index, name, len, fid);
854         if (IS_ERR(entry))
855                 RETURN_EXIT;
856
857         dentry = d_lookup(parent, &entry->se_qstr);
858         if (!dentry) {
859                 rc = sa_lookup(dir, entry);
860         } else {
861                 rc = sa_revalidate(dir, entry, dentry);
862                 if (rc == 1 && agl_should_run(sai, dentry->d_inode))
863                         ll_agl_add(sai, dentry->d_inode, entry->se_index);
864         }
865
866         if (dentry != NULL)
867                 dput(dentry);
868
869         if (rc != 0)
870                 sa_make_ready(sai, entry, rc);
871         else
872                 sai->sai_sent++;
873
874         sai->sai_index++;
875
876         EXIT;
877 }
878
879 #ifndef TASK_IDLE
880 #define TASK_IDLE TASK_INTERRUPTIBLE
881 #endif
882
883 /* async glimpse (agl) thread main function */
884 static int ll_agl_thread(void *arg)
885 {
886         struct dentry *parent = (struct dentry *)arg;
887         struct inode *dir = parent->d_inode;
888         struct ll_inode_info *plli = ll_i2info(dir);
889         struct ll_inode_info *clli;
890         /* We already own this reference, so it is safe to take it
891          * without a lock.
892          */
893         struct ll_statahead_info *sai = plli->lli_sai;
894         ENTRY;
895
896         CDEBUG(D_READA, "agl thread started: sai %p, parent %.*s\n",
897                sai, parent->d_name.len, parent->d_name.name);
898
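        /* park in TASK_IDLE between work items: ll_agl_add() wakes this thread
         * when it queues an inode, and kthread_stop() ends the loop */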
899         while (({set_current_state(TASK_IDLE);
900                  !kthread_should_stop(); })) {
901                 spin_lock(&plli->lli_agl_lock);
902                 if (!agl_list_empty(sai)) {
903                         __set_current_state(TASK_RUNNING);
904                         clli = agl_first_entry(sai);
905                         list_del_init(&clli->lli_agl_list);
906                         spin_unlock(&plli->lli_agl_lock);
907                         ll_agl_trigger(&clli->lli_vfs_inode, sai);
908                         cond_resched();
909                 } else {
910                         spin_unlock(&plli->lli_agl_lock);
911                         schedule();
912                 }
913         }
914         __set_current_state(TASK_RUNNING);
915         RETURN(0);
916 }
917
918 static void ll_stop_agl(struct ll_statahead_info *sai)
919 {
920         struct dentry *parent = sai->sai_dentry;
921         struct ll_inode_info *plli = ll_i2info(parent->d_inode);
922         struct ll_inode_info *clli;
923         struct task_struct *agl_task;
924
925         spin_lock(&plli->lli_agl_lock);
926         agl_task = sai->sai_agl_task;
927         sai->sai_agl_task = NULL;
928         spin_unlock(&plli->lli_agl_lock);
929         if (!agl_task)
930                 return;
931
932         CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n",
933                sai, (unsigned int)agl_task->pid);
934         kthread_stop(agl_task);
935
936         spin_lock(&plli->lli_agl_lock);
937         sai->sai_agl_valid = 0;
938         while (!agl_list_empty(sai)) {
939                 clli = agl_first_entry(sai);
940                 list_del_init(&clli->lli_agl_list);
941                 spin_unlock(&plli->lli_agl_lock);
942                 clli->lli_agl_index = 0;
943                 iput(&clli->lli_vfs_inode);
944                 spin_lock(&plli->lli_agl_lock);
945         }
946         spin_unlock(&plli->lli_agl_lock);
947         CDEBUG(D_READA, "agl thread stopped: sai %p, parent %.*s\n",
948                sai, parent->d_name.len, parent->d_name.name);
949         ll_sai_put(sai);
950 }
951
952 /* start agl thread */
953 static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
954 {
955         struct ll_inode_info  *plli;
956         struct task_struct    *task;
957         ENTRY;
958
959         CDEBUG(D_READA, "start agl thread: sai %p, parent %.*s\n",
960                sai, parent->d_name.len, parent->d_name.name);
961
962         plli = ll_i2info(parent->d_inode);
963         task = kthread_create(ll_agl_thread, parent,
964                               "ll_agl_%u", plli->lli_opendir_pid);
965         if (IS_ERR(task)) {
966                 CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));
967                 RETURN_EXIT;
968         }
969         sai->sai_agl_task = task;
970         sai->sai_agl_valid = 1;
971         atomic_inc(&ll_i2sbi(d_inode(parent))->ll_agl_total);
972         /* Get an extra reference that the thread holds */
973         ll_sai_get(d_inode(parent));
974
975         wake_up_process(task);
976
977         EXIT;
978 }
979
980 /* statahead thread main function */
981 static int ll_statahead_thread(void *arg)
982 {
983         struct dentry *parent = (struct dentry *)arg;
984         struct inode *dir = parent->d_inode;
985         struct ll_inode_info *lli = ll_i2info(dir);
986         struct ll_sb_info *sbi = ll_i2sbi(dir);
987         struct ll_statahead_info *sai;
988         struct ptlrpc_thread *sa_thread;
989         int first = 0;
990         struct md_op_data *op_data;
991         struct ll_dir_chain chain;
992         struct page *page = NULL;
993         __u64 pos = 0;
994         int rc = 0;
995         ENTRY;
996
997         sai = ll_sai_get(dir);
998         sa_thread = &sai->sai_thread;
999         sa_thread->t_pid = current_pid();
1000         CDEBUG(D_READA, "statahead thread starting: sai %p, parent %.*s\n",
1001                sai, parent->d_name.len, parent->d_name.name);
1002
1003         OBD_ALLOC_PTR(op_data);
1004         if (!op_data)
1005                 GOTO(out, rc = -ENOMEM);
1006
1007         if (sbi->ll_flags & LL_SBI_AGL_ENABLED)
1008                 ll_start_agl(parent, sai);
1009
1010         atomic_inc(&sbi->ll_sa_total);
1011         spin_lock(&lli->lli_sa_lock);
1012         if (thread_is_init(sa_thread))
1013                 /* If someone else has changed the thread state
1014                  * (e.g. already changed to SVC_STOPPING), we can't just
1015                  * blindly overwrite that setting. */
1016                 thread_set_flags(sa_thread, SVC_RUNNING);
1017         spin_unlock(&lli->lli_sa_lock);
1018         wake_up(&sa_thread->t_ctl_waitq);
1019
1020         ll_dir_chain_init(&chain);
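        /* walk the directory pages in hash order; for each entry issue an async
         * stat, throttled by the statahead window and interleaved with handling
         * completed replies and pending AGL work */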
1021         while (pos != MDS_DIR_END_OFF && thread_is_running(sa_thread)) {
1022                 struct lu_dirpage *dp;
1023                 struct lu_dirent  *ent;
1024
1025                 op_data = ll_prep_md_op_data(op_data, dir, dir, NULL, 0, 0,
1026                                      LUSTRE_OPC_ANY, dir);
1027                 if (IS_ERR(op_data)) {
1028                         rc = PTR_ERR(op_data);
1029                         break;
1030                 }
1031
1032                 sai->sai_in_readpage = 1;
1033                 page = ll_get_dir_page(dir, op_data, pos, &chain);
1034                 ll_unlock_md_op_lsm(op_data);
1035                 sai->sai_in_readpage = 0;
1036                 if (IS_ERR(page)) {
1037                         rc = PTR_ERR(page);
1038                         CDEBUG(D_READA, "error reading dir "DFID" at %llu"
1039                                "/%llu opendir_pid = %u: rc = %d\n",
1040                                PFID(ll_inode2fid(dir)), pos, sai->sai_index,
1041                                lli->lli_opendir_pid, rc);
1042                         break;
1043                 }
1044
1045                 dp = page_address(page);
1046                 for (ent = lu_dirent_start(dp);
1047                      ent != NULL && thread_is_running(sa_thread) &&
1048                      !sa_low_hit(sai);
1049                      ent = lu_dirent_next(ent)) {
1050                         __u64 hash;
1051                         int namelen;
1052                         char *name;
1053                         struct lu_fid fid;
1054
1055                         hash = le64_to_cpu(ent->lde_hash);
1056                         if (unlikely(hash < pos))
1057                                 /*
1058                                  * Skip until we find target hash value.
1059                                  */
1060                                 continue;
1061
1062                         namelen = le16_to_cpu(ent->lde_namelen);
1063                         if (unlikely(namelen == 0))
1064                                 /*
1065                                  * Skip dummy record.
1066                                  */
1067                                 continue;
1068
1069                         name = ent->lde_name;
1070                         if (name[0] == '.') {
1071                                 if (namelen == 1) {
1072                                         /*
1073                                          * skip "."
1074                                          */
1075                                         continue;
1076                                 } else if (name[1] == '.' && namelen == 2) {
1077                                         /*
1078                                          * skip ".."
1079                                          */
1080                                         continue;
1081                                 } else if (!sai->sai_ls_all) {
1082                                         /*
1083                                          * skip hidden files.
1084                                          */
1085                                         sai->sai_skip_hidden++;
1086                                         continue;
1087                                 }
1088                         }
1089
1090                         /*
1091                          * don't stat-ahead first entry.
1092                          */
1093                         if (unlikely(++first == 1))
1094                                 continue;
1095
1096                         fid_le_to_cpu(&fid, &ent->lde_fid);
1097
1098                         /* wait for spare statahead window */
1099                         do {
1100                                 wait_event_idle(sa_thread->t_ctl_waitq,
1101                                                 !sa_sent_full(sai) ||
1102                                                 sa_has_callback(sai) ||
1103                                                 !agl_list_empty(sai) ||
1104                                                 !thread_is_running(sa_thread));
1105
1106                                 sa_handle_callback(sai);
1107
1108                                 spin_lock(&lli->lli_agl_lock);
1109                                 while (sa_sent_full(sai) &&
1110                                        !agl_list_empty(sai)) {
1111                                         struct ll_inode_info *clli;
1112
1113                                         clli = agl_first_entry(sai);
1114                                         list_del_init(&clli->lli_agl_list);
1115                                         spin_unlock(&lli->lli_agl_lock);
1116
1117                                         ll_agl_trigger(&clli->lli_vfs_inode,
1118                                                         sai);
1119                                         cond_resched();
1120                                         spin_lock(&lli->lli_agl_lock);
1121                                 }
1122                                 spin_unlock(&lli->lli_agl_lock);
1123                         } while (sa_sent_full(sai) &&
1124                                  thread_is_running(sa_thread));
1125
1126                         sa_statahead(parent, name, namelen, &fid);
1127                 }
1128
1129                 pos = le64_to_cpu(dp->ldp_hash_end);
1130                 ll_release_page(dir, page,
1131                                 le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
1132
1133                 if (sa_low_hit(sai)) {
1134                         rc = -EFAULT;
1135                         atomic_inc(&sbi->ll_sa_wrong);
1136                         CDEBUG(D_READA, "Statahead for dir "DFID" hit "
1137                                "ratio too low: hit/miss %llu/%llu"
1138                                ", sent/replied %llu/%llu, stopping "
1139                                "statahead thread: pid %d\n",
1140                                PFID(&lli->lli_fid), sai->sai_hit,
1141                                sai->sai_miss, sai->sai_sent,
1142                                sai->sai_replied, current_pid());
1143                         break;
1144                 }
1145         }
1146         ll_dir_chain_fini(&chain);
1147         ll_finish_md_op_data(op_data);
1148
1149         if (rc < 0) {
1150                 spin_lock(&lli->lli_sa_lock);
1151                 thread_set_flags(sa_thread, SVC_STOPPING);
1152                 lli->lli_sa_enabled = 0;
1153                 spin_unlock(&lli->lli_sa_lock);
1154         }
1155
1156         /* statahead is finished, but the statahead entries still need to be
1157          * cached; wait for the file release to stop this thread. */
1158         while (thread_is_running(sa_thread)) {
1159                 wait_event_idle(sa_thread->t_ctl_waitq,
1160                                 sa_has_callback(sai) ||
1161                                 !thread_is_running(sa_thread));
1162
1163                 sa_handle_callback(sai);
1164         }
1165
1166         EXIT;
1167 out:
1168         ll_stop_agl(sai);
1169
1170         /* wait for inflight statahead RPCs to finish; only then can sai be
1171          * freed safely, because the RPC callbacks access sai data */
1172         while (sai->sai_sent != sai->sai_replied) {
1173                 /* in case we're not woken up, timeout wait */
1174                 wait_event_idle_timeout(sa_thread->t_ctl_waitq,
1175                                         sai->sai_sent == sai->sai_replied,
1176                                         cfs_time_seconds(1) >> 3);
1177         }
1178
1179         /* release resources held by statahead RPCs */
1180         sa_handle_callback(sai);
1181
1182         spin_lock(&lli->lli_sa_lock);
1183         thread_set_flags(sa_thread, SVC_STOPPED);
1184         spin_unlock(&lli->lli_sa_lock);
1185
1186         CDEBUG(D_READA, "statahead thread stopped: sai %p, parent %.*s\n",
1187                sai, parent->d_name.len, parent->d_name.name);
1188
1189         wake_up(&sai->sai_waitq);
1190         wake_up(&sa_thread->t_ctl_waitq);
1191         ll_sai_put(sai);
1192
1193         return rc;
1194 }
1195
1196 /* authorize opened dir handle @key to statahead */
1197 void ll_authorize_statahead(struct inode *dir, void *key)
1198 {
1199         struct ll_inode_info *lli = ll_i2info(dir);
1200
1201         spin_lock(&lli->lli_sa_lock);
1202         if (lli->lli_opendir_key == NULL && lli->lli_sai == NULL) {
1203                 /*
1204                  * if lli_sai is not NULL, it means previous statahead is not
1205                  * finished yet, we'd better not start a new statahead for now.
1206                  */
1207                 LASSERT(lli->lli_opendir_pid == 0);
1208                 lli->lli_opendir_key = key;
1209                 lli->lli_opendir_pid = current_pid();
1210                 lli->lli_sa_enabled = 1;
1211         }
1212         spin_unlock(&lli->lli_sa_lock);
1213 }
1214
1215 /*
1216  * deauthorize opened dir handle @key to statahead, and notify statahead thread
1217  * to quit if it's running.
1218  */
1219 void ll_deauthorize_statahead(struct inode *dir, void *key)
1220 {
1221         struct ll_inode_info *lli = ll_i2info(dir);
1222         struct ll_statahead_info *sai;
1223
1224         LASSERT(lli->lli_opendir_key == key);
1225         LASSERT(lli->lli_opendir_pid != 0);
1226
1227         CDEBUG(D_READA, "deauthorize statahead for "DFID"\n",
1228                 PFID(&lli->lli_fid));
1229
1230         spin_lock(&lli->lli_sa_lock);
1231         lli->lli_opendir_key = NULL;
1232         lli->lli_opendir_pid = 0;
1233         lli->lli_sa_enabled = 0;
1234         sai = lli->lli_sai;
1235         if (sai != NULL && thread_is_running(&sai->sai_thread)) {
1236                 /*
1237                  * the statahead thread may not have quit yet because it still
1238                  * needs to cache entries, so now is the time to tell it to quit.
1239                  *
1240                  * In case sai is released, wake_up() is called inside the spinlock,
1241                  * so we have to call smp_mb() explicitly to serialize the operations.
1242                  */
1243                 thread_set_flags(&sai->sai_thread, SVC_STOPPING);
1244                 smp_mb();
1245                 wake_up(&sai->sai_thread.t_ctl_waitq);
1246         }
1247         spin_unlock(&lli->lli_sa_lock);
1248 }
1249
1250 enum {
1251         /**
1252          * not first dirent, or is "."
1253          */
1254         LS_NOT_FIRST_DE = 0,
1255         /**
1256          * the first non-hidden dirent
1257          */
1258         LS_FIRST_DE,
1259         /**
1260          * the first hidden dirent, that is "."
1261          */
1262         LS_FIRST_DOT_DE
1263 };
1264
1265 /* file is first dirent under @dir */
1266 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
1267 {
1268         struct ll_dir_chain   chain;
1269         struct qstr          *target = &dentry->d_name;
1270         struct md_op_data    *op_data;
1271         int                   dot_de;
1272         struct page          *page = NULL;
1273         int                   rc = LS_NOT_FIRST_DE;
1274         __u64                 pos = 0;
1275         ENTRY;
1276
1277         op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
1278                                      LUSTRE_OPC_ANY, dir);
1279         if (IS_ERR(op_data))
1280                 RETURN(PTR_ERR(op_data));
1281         /**
1282          * FIXME: choose the start offset of the readdir
1283          */
1284
1285         ll_dir_chain_init(&chain);
1286         page = ll_get_dir_page(dir, op_data, 0, &chain);
1287
1288         while (1) {
1289                 struct lu_dirpage *dp;
1290                 struct lu_dirent  *ent;
1291
1292                 if (IS_ERR(page)) {
1293                         struct ll_inode_info *lli = ll_i2info(dir);
1294
1295                         rc = PTR_ERR(page);
1296                         CERROR("%s: reading dir "DFID" at %llu opendir_pid = %u : rc = %d\n",
1297                                ll_i2sbi(dir)->ll_fsname,
1298                                PFID(ll_inode2fid(dir)), pos,
1299                                lli->lli_opendir_pid, rc);
1300                         break;
1301                 }
1302
1303                 dp = page_address(page);
1304                 for (ent = lu_dirent_start(dp); ent != NULL;
1305                      ent = lu_dirent_next(ent)) {
1306                         __u64 hash;
1307                         int namelen;
1308                         char *name;
1309
1310                         hash = le64_to_cpu(ent->lde_hash);
1311                         /* ll_get_dir_page() can return any page containing
1312                          * the given hash, which may not be the start hash. */
1313                         if (unlikely(hash < pos))
1314                                 continue;
1315
1316                         namelen = le16_to_cpu(ent->lde_namelen);
1317                         if (unlikely(namelen == 0))
1318                                 /*
1319                                  * skip dummy record.
1320                                  */
1321                                 continue;
1322
1323                         name = ent->lde_name;
1324                         if (name[0] == '.') {
1325                                 if (namelen == 1)
1326                                         /*
1327                                          * skip "."
1328                                          */
1329                                         continue;
1330                                 else if (name[1] == '.' && namelen == 2)
1331                                         /*
1332                                          * skip ".."
1333                                          */
1334                                         continue;
1335                                 else
1336                                         dot_de = 1;
1337                         } else {
1338                                 dot_de = 0;
1339                         }
1340
1341                         if (dot_de && target->name[0] != '.') {
1342                                 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
1343                                        target->len, target->name,
1344                                        namelen, name);
1345                                 continue;
1346                         }
1347
1348                         if (target->len != namelen ||
1349                             memcmp(target->name, name, namelen) != 0)
1350                                 rc = LS_NOT_FIRST_DE;
1351                         else if (!dot_de)
1352                                 rc = LS_FIRST_DE;
1353                         else
1354                                 rc = LS_FIRST_DOT_DE;
1355
1356                         ll_release_page(dir, page, false);
1357                         GOTO(out, rc);
1358                 }
1359                 pos = le64_to_cpu(dp->ldp_hash_end);
1360                 if (pos == MDS_DIR_END_OFF) {
1361                         /*
1362                          * End of directory reached.
1363                          */
1364                         ll_release_page(dir, page, false);
1365                         GOTO(out, rc);
1366                 } else {
1367                         /*
1368                          * chain is exhausted
1369                          * Normal case: continue to the next page.
1370                          */
1371                         ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) &
1372                                               LDF_COLLIDE);
1373                         page = ll_get_dir_page(dir, op_data, pos, &chain);
1374                 }
1375         }
1376         EXIT;
1377 out:
1378         ll_dir_chain_fini(&chain);
1379         ll_finish_md_op_data(op_data);
1380         return rc;
1381 }
1382
1383 /**
1384  * revalidate @dentryp from statahead cache
1385  *
1386  * \param[in] dir       parent directory
1387  * \param[in] sai       sai structure
1388  * \param[out] dentryp  pointer to dentry which will be revalidated
1389  * \param[in] unplug    unplug statahead window only (normally for negative
1390  *                      dentry)
1391  * \retval              1 on success, dentry is saved in @dentryp
1392  * \retval              0 if revalidation failed (no proper lock on client)
1393  * \retval              negative number upon error
1394  */
1395 static int revalidate_statahead_dentry(struct inode *dir,
1396                                         struct ll_statahead_info *sai,
1397                                         struct dentry **dentryp,
1398                                         bool unplug)
1399 {
1400         struct sa_entry *entry = NULL;
1401         struct ll_dentry_data *ldd;
1402         struct ll_inode_info *lli = ll_i2info(dir);
1403         int rc = 0;
1404         ENTRY;
1405
1406         if ((*dentryp)->d_name.name[0] == '.') {
1407                 if (sai->sai_ls_all ||
1408                     sai->sai_miss_hidden >= sai->sai_skip_hidden) {
1409                         /*
1410                          * Hidden dentry is the first one, or the statahead
1411                          * thread has not skipped that many hidden dentries
1412                          * before "sai_ls_all" was enabled as below.
1413                          */
1414                 } else {
1415                         if (!sai->sai_ls_all)
1416                                 /*
1417                                  * It may be because the hidden dentry is not
1418                                  * the first one, so "sai_ls_all" was not set
1419                                  * and "ls -al" missed it. Enable
1420                                  * "sai_ls_all" for such a case.
1421                                  */
1422                                 sai->sai_ls_all = 1;
1423
1424                         /*
1425                          * Such "getattr" has been skipped before
1426                          * "sai_ls_all" was enabled as above.
1427                          */
1428                         sai->sai_miss_hidden++;
1429                         RETURN(-EAGAIN);
1430                 }
1431         }
1432
1433         if (unplug)
1434                 GOTO(out, rc = 1);
1435
1436         entry = sa_get(sai, &(*dentryp)->d_name);
1437         if (entry == NULL)
1438                 GOTO(out, rc = -EAGAIN);
1439
1440         /* if statahead is busy in readdir, help it do post-work */
1441         if (!sa_ready(entry) && sai->sai_in_readpage)
1442                 sa_handle_callback(sai);
1443
1444         if (!sa_ready(entry)) {
1445                 spin_lock(&lli->lli_sa_lock);
1446                 sai->sai_index_wait = entry->se_index;
1447                 spin_unlock(&lli->lli_sa_lock);
1448                 rc = wait_event_idle_timeout(sai->sai_waitq, sa_ready(entry),
1449                                              cfs_time_seconds(30));
1450                 if (rc == 0) {
1451                         /*
1452                          * the entry may still not be ready, so it may be in use
1453                          * by an inflight statahead RPC; don't free it.
1454                          */
1455                         entry = NULL;
1456                         GOTO(out, rc = -EAGAIN);
1457                 }
1458         }
1459
1460         /*
1461          * We need to see the value that was set immediately before we
1462          * were woken up.
1463          */
1464         if (smp_load_acquire(&entry->se_state) == SA_ENTRY_SUCC &&
1465             entry->se_inode) {
1466                 struct inode *inode = entry->se_inode;
1467                 struct lookup_intent it = { .it_op = IT_GETATTR,
1468                                             .it_lock_handle =
1469                                                 entry->se_handle };
1470                 __u64 bits;
1471
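                     /*
                      * Revalidate the ldlm lock handle saved by the async
                      * statahead callback; rc == 1 means the client still holds
                      * a usable lock for this inode.
                      */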
1472                 rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
1473                                         ll_inode2fid(inode), &bits);
1474                 if (rc == 1) {
1475                         if ((*dentryp)->d_inode == NULL) {
1476                                 struct dentry *alias;
1477
1478                                 alias = ll_splice_alias(inode, *dentryp);
1479                                 if (IS_ERR(alias)) {
1480                                         ll_intent_release(&it);
1481                                         GOTO(out, rc = PTR_ERR(alias));
1482                                 }
1483                                 *dentryp = alias;
1484                                 /* statahead prepared this inode, transfer inode
1485                                  * refcount from sa_entry to dentry */
1486                                 entry->se_inode = NULL;
1487                         } else if ((*dentryp)->d_inode != inode) {
1488                                 /* revalidating, but the inode has been recreated */
1489                                 CDEBUG(D_READA,
1490                                         "%s: stale dentry %.*s inode "
1491                                         DFID", statahead inode "DFID
1492                                         "\n",
1493                                         ll_i2sbi(inode)->ll_fsname,
1494                                         (*dentryp)->d_name.len,
1495                                         (*dentryp)->d_name.name,
1496                                         PFID(ll_inode2fid((*dentryp)->d_inode)),
1497                                         PFID(ll_inode2fid(inode)));
1498                                 ll_intent_release(&it);
1499                                 GOTO(out, rc = -ESTALE);
1500                         }
1501
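                             /*
                              * With the LOOKUP lock bit held the dentry can be
                              * marked valid again.
                              */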
1502                         if ((bits & MDS_INODELOCK_LOOKUP) &&
1503                             d_lustre_invalid(*dentryp))
1504                                 d_lustre_revalidate(*dentryp);
1505                         ll_intent_release(&it);
1506                 }
1507         }
1508 out:
1509         /*
1510          * A statahead-cached sa_entry can be used only once and is killed
1511          * right after use, so if lookup/revalidate accessed the statahead
1512          * cache, set the dentry's ldd_sa_generation to the parent's
1513          * lli_sa_generation; if we stat this file again later, we then know
1514          * statahead has already been done, see dentry_may_statahead().
1515          */
1516         ldd = ll_d2d(*dentryp);
1517         /* ldd can be NULL if llite lookup failed. */
1518         if (ldd != NULL)
1519                 ldd->lld_sa_generation = lli->lli_sa_generation;
1520         sa_put(sai, entry);
1521
1522         RETURN(rc);
1523 }
1524
1525 /**
1526  * start statahead thread
1527  *
1528  * \param[in] dir       parent directory
1529  * \param[in] dentry    dentry that triggers statahead, normally the first
1530  *                      dirent under @dir
1531  * \retval              -EAGAIN on success, because when this function is
1532  *                      called we are already inside a lookup call, so the
1533  *                      client should do the stat itself instead of waiting
1534  *                      for the statahead thread to do it asynchronously.
1535  * \retval              negative number upon error
1536  */
1537 static int start_statahead_thread(struct inode *dir, struct dentry *dentry)
1538 {
1539         struct ll_inode_info *lli = ll_i2info(dir);
1540         struct ll_statahead_info *sai = NULL;
1541         struct dentry *parent = dentry->d_parent;
1542         struct ptlrpc_thread *thread;
1543         struct task_struct *task;
1544         struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode);
1545         int first = LS_FIRST_DE;
1546         int rc = 0;
1547         ENTRY;
1548
1549         /* I am the "lli_opendir_pid" owner; only I can set "lli_sai". */
1550         first = is_first_dirent(dir, dentry);
1551         if (first == LS_NOT_FIRST_DE)
1552                 /* Not an "ls -{a}l" style operation, no need to do statahead for it. */
1553                 GOTO(out, rc = -EFAULT);
1554
1555         if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
1556                                        sbi->ll_sa_running_max)) {
1557                 CDEBUG(D_READA,
1558                        "Too many concurrent statahead instances, "
1559                        "avoid new statahead instance temporarily.\n");
1560                 GOTO(out, rc = -EMFILE);
1561         }
1562
1563         sai = ll_sai_alloc(parent);
1564         if (sai == NULL)
1565                 GOTO(out, rc = -ENOMEM);
1566
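             /*
              * The first dirent is a hidden (dot) entry, so hidden entries need
              * to be stat-ahead as well (see the "sai_ls_all" handling above).
              */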
1567         sai->sai_ls_all = (first == LS_FIRST_DOT_DE);
1568
1569         /* if the current lli_opendir_key has been deauthorized, or the dir was
1570          * re-opened by another process, don't start statahead; otherwise the
1571          * newly spawned statahead thread won't be notified to quit. */
1572         spin_lock(&lli->lli_sa_lock);
1573         if (unlikely(lli->lli_sai != NULL ||
1574                      lli->lli_opendir_key == NULL ||
1575                      lli->lli_opendir_pid != current->pid)) {
1576                 spin_unlock(&lli->lli_sa_lock);
1577                 GOTO(out, rc = -EPERM);
1578         }
1579         lli->lli_sai = sai;
1580         spin_unlock(&lli->lli_sa_lock);
1581
1582         CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %.*s]\n",
1583                current_pid(), parent->d_name.len, parent->d_name.name);
1584
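             /*
              * Spawn the statahead thread; its name carries the pid of the
              * process that opened the directory (lli_opendir_pid).
              */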
1585         task = kthread_run(ll_statahead_thread, parent, "ll_sa_%u",
1586                            lli->lli_opendir_pid);
1587         thread = &sai->sai_thread;
1588         if (IS_ERR(task)) {
1589                 spin_lock(&lli->lli_sa_lock);
1590                 lli->lli_sai = NULL;
1591                 spin_unlock(&lli->lli_sa_lock);
1592                 rc = PTR_ERR(task);
1593                 CERROR("can't start ll_sa thread, rc: %d\n", rc);
1594                 GOTO(out, rc);
1595         }
1596
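             /*
              * Wait until the new thread is actually running (or has already
              * stopped) before dropping our reference on the sai.
              */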
1597         wait_event_idle(thread->t_ctl_waitq,
1598                         thread_is_running(thread) || thread_is_stopped(thread));
1599         ll_sai_put(sai);
1600
1601         /*
1602          * We don't stat-ahead for the first dirent since we are already in
1603          * lookup.
1604          */
1605         RETURN(-EAGAIN);
1606
1607 out:
1608         /* once starting the statahead thread has failed, disable statahead so
1609          * that subsequent stats won't waste time trying it again. */
1610         spin_lock(&lli->lli_sa_lock);
1611         if (lli->lli_opendir_pid == current->pid)
1612                 lli->lli_sa_enabled = 0;
1613         spin_unlock(&lli->lli_sa_lock);
1614
1615         if (sai != NULL)
1616                 ll_sai_free(sai);
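             /*
              * Balance the ll_sa_running increment above; it is only taken once
              * is_first_dirent() has said this dirent is a statahead candidate.
              */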
1617         if (first != LS_NOT_FIRST_DE)
1618                 atomic_dec(&sbi->ll_sa_running);
1619
1620         RETURN(rc);
1621 }
1622
1623 /**
1624  * statahead entry function, called when the client does a getattr on a file;
1625  * it starts the statahead thread if this is the first dir entry, otherwise it
1626  * revalidates the dentry from the statahead cache.
1627  *
1628  * \param[in]  dir      parent directory
1629  * \param[out] dentryp  dentry to getattr
1630  * \param[in]  unplug   unplug statahead window only (normally for negative
1631  *                      dentry)
1632  * \retval              1 on success
1633  * \retval              0 if revalidation from the statahead cache failed; the
1634  *                      caller needs to do getattr from the server directly
1635  * \retval              negative number on error; the caller often ignores this
1636  *                      and then does getattr from the server
1637  */
1638 int ll_statahead(struct inode *dir, struct dentry **dentryp, bool unplug)
1639 {
1640         struct ll_statahead_info *sai;
1641
1642         sai = ll_sai_get(dir);
1643         if (sai != NULL) {
1644                 int rc;
1645
1646                 rc = revalidate_statahead_dentry(dir, sai, dentryp, unplug);
1647                 CDEBUG(D_READA, "revalidate statahead %.*s: %d.\n",
1648                         (*dentryp)->d_name.len, (*dentryp)->d_name.name, rc);
1649                 ll_sai_put(sai);
1650                 return rc;
1651         }
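             /*
              * No statahead instance yet: try to start one; it returns -EAGAIN
              * on success so this first entry is stat'ed by the caller itself.
              */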
1652         return start_statahead_thread(dir, *dentryp);
1653 }