lustre/llite/statahead.c [fs/lustre-release.git]
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  */
31
32 #include <linux/fs.h>
33 #include <linux/sched.h>
34 #include <linux/kthread.h>
35 #include <linux/mm.h>
36 #include <linux/highmem.h>
37 #include <linux/pagemap.h>
38 #include <linux/delay.h>
39
40 #define DEBUG_SUBSYSTEM S_LLITE
41
42 #include <obd_support.h>
43 #include <lustre_dlm.h>
44 #include "llite_internal.h"
45
46 #define SA_OMITTED_ENTRY_MAX 8ULL
47
48 enum sa_entry_state {
49         /** negative values are for error cases */
50         SA_ENTRY_INIT = 0,      /** init entry */
51         SA_ENTRY_SUCC = 1,      /** stat succeed */
52         SA_ENTRY_INVA = 2,      /** invalid entry */
53 };
54
55 /*
56  * sa_entry is not refcounted: the statahead thread allocates it and does the
57  * async stat; the async stat callback ll_statahead_interpret() prepares the
58  * inode and sets the lock data in the ptlrpcd context. The scanner process is
59  * then woken up if this is the entry it waits on, and can access and free it.
60  */
61 struct sa_entry {
62         /* link into sai_entries */
63         struct list_head                 se_list;
64         /* link into sai hash table locally */
65         struct list_head                 se_hash;
66         /* entry index in the sai */
67         __u64                            se_index;
68         /* low layer ldlm lock handle */
69         __u64                            se_handle;
70         /* entry status */
71         enum sa_entry_state              se_state;
72         /* entry size, contains name */
73         int                              se_size;
74         /* pointer to the target inode */
75         struct inode                    *se_inode;
76         /* pointer to @sai per process struct */
77         struct ll_statahead_info        *se_sai;
78         /* entry name */
79         struct qstr                      se_qstr;
80         /* entry fid */
81         struct lu_fid                    se_fid;
82 };
83
84 static unsigned int sai_generation;
85 static DEFINE_SPINLOCK(sai_generation_lock);
86
87 static inline int sa_unhashed(struct sa_entry *entry)
88 {
89         return list_empty(&entry->se_hash);
90 }
91
92 /* sa_entry is ready to use */
93 static inline int sa_ready(struct sa_entry *entry)
94 {
95         /* Make sure sa_entry is updated and ready to use */
96         smp_rmb();
97         return (entry->se_state != SA_ENTRY_INIT);
98 }
99
100 /* hash value used to index the sax_cache */
101 static inline int sa_hash(int val)
102 {
103         return val & LL_SA_CACHE_MASK;
104 }
105
106 /* hash entry into sax_cache */
107 static inline void
108 sa_rehash(struct ll_statahead_context *ctx, struct sa_entry *entry)
109 {
110         int i = sa_hash(entry->se_qstr.hash);
111
112         spin_lock(&ctx->sax_cache_lock[i]);
113         list_add_tail(&entry->se_hash, &ctx->sax_cache[i]);
114         spin_unlock(&ctx->sax_cache_lock[i]);
115 }
116
117 /* unhash entry from sax_cache */
118 static inline int sa_unhash(struct ll_statahead_context *ctx,
119                             struct sa_entry *entry, bool inuse_check)
120 {
121         struct ll_statahead_info *sai = entry->se_sai;
122         int i = sa_hash(entry->se_qstr.hash);
123         int rc = 0;
124
125         if (inuse_check && atomic_read(&sai->sai_inuse_count) > 0)
126                 return -EAGAIN;
127
128         spin_lock(&ctx->sax_cache_lock[i]);
129         if (inuse_check && atomic_read(&sai->sai_inuse_count) > 0)
130                 rc = -EAGAIN;
131         else
132                 list_del_init(&entry->se_hash);
133         spin_unlock(&ctx->sax_cache_lock[i]);
134
135         return rc;
136 }
137
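/* AGL (async glimpse lock) should only run for regular files, and only while
 * the AGL thread is alive */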
138 static inline int agl_should_run(struct ll_statahead_info *sai,
139                                  struct inode *inode)
140 {
141         return inode && S_ISREG(inode->i_mode) && sai->sai_agl_task;
142 }
143
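/* first inode queued on sai_agls for async glimpse */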
144 static inline struct ll_inode_info *
145 agl_first_entry(struct ll_statahead_info *sai)
146 {
147         return list_first_entry(&sai->sai_agls, struct ll_inode_info,
148                                 lli_agl_list);
149 }
150
151 /* statahead window is full */
152 static inline int sa_sent_full(struct ll_statahead_info *sai)
153 {
154         return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
155 }
156
157 /* whether a batch metadata handle is in use */
158 static inline bool sa_has_batch_handle(struct ll_statahead_info *sai)
159 {
160         return sai->sai_bh != NULL;
161 }
162
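/* send out the getattr requests batched so far without waiting for replies */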
163 static inline void ll_statahead_flush_nowait(struct ll_statahead_info *sai)
164 {
165         if (sa_has_batch_handle(sai)) {
166                 sai->sai_index_end = sai->sai_index - 1;
167                 (void) md_batch_flush(ll_i2mdexp(sai->sai_dentry->d_inode),
168                                       sai->sai_bh, false);
169         }
170 }
171
172 static inline int agl_list_empty(struct ll_statahead_info *sai)
173 {
174         return list_empty(&sai->sai_agls);
175 }
176
177 /**
178  * The hit ratio is considered low when either:
179  * (1) the hit ratio is less than 80%,
180  * or
181  * (2) there are more than 32 consecutive misses.
182  */
183 static inline int sa_low_hit(struct ll_statahead_info *sai)
184 {
185         return ((sai->sai_hit > 32 && sai->sai_hit < 4 * sai->sai_miss) ||
186                 (sai->sai_consecutive_miss > 32));
187 }
188
189 /*
190  * if the given index falls behind the statahead window by more than
191  * SA_OMITTED_ENTRY_MAX, then it is old.
192  */
193 static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
194 {
195         return ((__u64)sai->sai_max + index + SA_OMITTED_ENTRY_MAX <
196                 sai->sai_index);
197 }
198
199 /* allocate sa_entry and hash it to allow scanner process to find it */
200 static struct sa_entry *
201 sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
202          const char *name, int len, const struct lu_fid *fid)
203 {
204         struct ll_inode_info *lli;
205         struct sa_entry *entry;
206         int entry_size;
207         char *dname;
208
209         ENTRY;
210
211         entry_size = sizeof(struct sa_entry) +
212                      round_up(len + 1 /* for trailing NUL */, 4);
213         OBD_ALLOC(entry, entry_size);
214         if (unlikely(!entry))
215                 RETURN(ERR_PTR(-ENOMEM));
216
217         CDEBUG(D_READA, "alloc sa entry %.*s(%p) index %llu\n",
218                len, name, entry, index);
219
220         entry->se_index = index;
221         entry->se_sai = sai;
222
223         entry->se_state = SA_ENTRY_INIT;
224         entry->se_size = entry_size;
225         dname = (char *)entry + sizeof(struct sa_entry);
226         memcpy(dname, name, len);
227         dname[len] = 0;
228         entry->se_qstr.hash = ll_full_name_hash(parent, name, len);
229         entry->se_qstr.len = len;
230         entry->se_qstr.name = dname;
231
232         if (fid)
233                 entry->se_fid = *fid;
234
235         lli = ll_i2info(sai->sai_dentry->d_inode);
236         spin_lock(&lli->lli_sa_lock);
237         INIT_LIST_HEAD(&entry->se_list);
238         sa_rehash(lli->lli_sax, entry);
239         spin_unlock(&lli->lli_sa_lock);
240
241         atomic_inc(&sai->sai_cache_count);
242
243         RETURN(entry);
244 }
245
246 /* free sa_entry, which should have been unhashed and not in any list */
247 static void sa_free(struct ll_statahead_context *ctx, struct sa_entry *entry)
248 {
249         CDEBUG(D_READA, "free sa entry %.*s(%p) index %llu\n",
250                entry->se_qstr.len, entry->se_qstr.name, entry,
251                entry->se_index);
252
253         LASSERT(list_empty(&entry->se_list));
254         LASSERT(sa_unhashed(entry));
255
256         OBD_FREE(entry, entry->se_size);
257 }
258
259 /*
260  * Find sa_entry by name, used by the directory scanner. If @sai_pid is not
261  * the PID of the scanner (which means statahead may have been triggered by
262  * the wrong process), return -EINVAL immediately.
263  */
264 static struct sa_entry *sa_get(struct ll_statahead_context *ctx,
265                                const struct qstr *qstr,
266                                struct ll_statahead_info **info)
267 {
268         struct sa_entry *entry;
269         int i = sa_hash(qstr->hash);
270
271         spin_lock(&ctx->sax_cache_lock[i]);
272         list_for_each_entry(entry, &ctx->sax_cache[i], se_hash) {
273                 if (entry->se_qstr.hash == qstr->hash &&
274                     entry->se_qstr.len == qstr->len &&
275                     memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0) {
276                         struct ll_statahead_info *sai = entry->se_sai;
277
278                         if (sai->sai_pid != current->pid) {
279                                 CDEBUG(D_CACHE,
280                                        "%s: wrong pid=%d:%d for entry %.*s\n",
281                                        ll_i2sbi(ctx->sax_inode)->ll_fsname,
282                                        sai->sai_pid, current->pid,
283                                        entry->se_qstr.len, entry->se_qstr.name);
284                                 entry = ERR_PTR(-EINVAL);
285                                 *info = sai;
286                         }
287
288                         atomic_inc(&sai->sai_inuse_count);
289                         spin_unlock(&ctx->sax_cache_lock[i]);
290                         return entry;
291                 }
292         }
293         spin_unlock(&ctx->sax_cache_lock[i]);
294         return NULL;
295 }
296
297 /* unhash and unlink sa_entry, and then free it */
298 static inline int sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry,
299                           bool locked, bool inuse_check)
300 {
301         struct inode *dir = sai->sai_dentry->d_inode;
302         struct ll_inode_info *lli = ll_i2info(dir);
303         struct ll_statahead_context *ctx = lli->lli_sax;
304         int rc;
305
306         LASSERT(!list_empty(&entry->se_list));
307         LASSERT(sa_ready(entry));
308
309         rc = sa_unhash(ctx, entry, inuse_check);
310         if (rc)
311                 return rc;
312
313         if (!locked)
314                 spin_lock(&lli->lli_sa_lock);
315         list_del_init(&entry->se_list);
316         spin_unlock(&lli->lli_sa_lock);
317
318         iput(entry->se_inode);
319         atomic_dec(&sai->sai_cache_count);
320
321         sa_free(ctx, entry);
322         if (locked)
323                 spin_lock(&lli->lli_sa_lock);
324
325         return 0;
326 }
327
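/* like sa_kill(), but back off with -EAGAIN if the entry is still in use */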
328 static inline int sa_kill_try(struct ll_statahead_info *sai,
329                               struct sa_entry *entry, bool locked)
330 {
331         return sa_kill(sai, entry, locked, true);
332 }
333
334 /* called by scanner after use, sa_entry will be killed */
335 static void
336 sa_put(struct inode *dir, struct ll_statahead_info *sai, struct sa_entry *entry)
337 {
338         struct ll_inode_info *lli = ll_i2info(dir);
339         struct sa_entry *tmp;
340         bool wakeup = false;
341         bool inuse = false;
342
343         if (entry && entry->se_state == SA_ENTRY_SUCC) {
344                 struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
345
346                 sai->sai_hit++;
347                 sai->sai_consecutive_miss = 0;
348                 if (sai->sai_max < sbi->ll_sa_max) {
349                         sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
350                         wakeup = true;
351                 } else if (sai->sai_max_batch_count > 0) {
352                         if (sai->sai_max >= sai->sai_max_batch_count &&
353                            (sai->sai_index_end - entry->se_index) %
354                            sai->sai_max_batch_count == 0) {
355                                 wakeup = true;
356                         } else if (entry->se_index == sai->sai_index_end) {
357                                 wakeup = true;
358                         }
359                 } else {
360                         wakeup = true;
361                 }
362         } else if (sai) {
363                 sai->sai_miss++;
364                 sai->sai_consecutive_miss++;
365                 wakeup = true;
366         }
367
368         if (entry) {
369                 inuse = true;
370                 sa_kill(sai, entry, false, false);
371                 CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_STATAHEAD_PAUSE, cfs_fail_val);
372         }
373
374         spin_lock(&lli->lli_sa_lock);
375         if (inuse) {
376                 /*
377                  * kill old completed entries. Maybe kicking old entries can
378                  * be ignored?
379                  */
380                 while ((tmp = list_first_entry_or_null(&sai->sai_entries,
381                                 struct sa_entry, se_list))) {
382                         if (!is_omitted_entry(sai, tmp->se_index))
383                                 break;
384
385                         /* ll_sa_lock is dropped by sa_kill(), restart list */
386                         sa_kill(sai, tmp, true, false);
387                 }
388         }
389         if (wakeup && sai->sai_task)
390                 wake_up_process(sai->sai_task);
391         if (inuse)
392                 atomic_dec(&sai->sai_inuse_count);
393         spin_unlock(&lli->lli_sa_lock);
394 }
395
396 /*
397  * update the state and insert the entry into sai_entries sorted by index,
398  * return true if the scanner is waiting on this entry.
399  */
400 static bool
401 __sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
402 {
403         struct sa_entry *se;
404         struct list_head *pos = &sai->sai_entries;
405         __u64 index = entry->se_index;
406
407         LASSERT(!sa_ready(entry));
408         LASSERT(list_empty(&entry->se_list));
409
410         list_for_each_entry_reverse(se, &sai->sai_entries, se_list) {
411                 if (se->se_index < entry->se_index) {
412                         pos = &se->se_list;
413                         break;
414                 }
415         }
416         list_add(&entry->se_list, pos);
417         /*
418          * LU-9210: ll_statahead_interpret must be able to see this before
419          * we wake it up
420          */
421         smp_store_release(&entry->se_state,
422                           ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);
423
424         return (index == sai->sai_index_wait);
425 }
426
427 /* release the arguments prepared for the async stat RPC */
428 static void sa_fini_data(struct md_op_item *item)
429 {
430         struct md_op_data *op_data = &item->mop_data;
431
432         if (op_data->op_flags & MF_OPNAME_KMALLOCED)
433                 /* allocated via ll_setup_filename called from sa_prep_data */
434                 kfree(op_data->op_name);
435         ll_unlock_md_op_lsm(&item->mop_data);
436         iput(item->mop_dir);
437         if (item->mop_subpill_allocated)
438                 OBD_FREE_PTR(item->mop_pill);
439         OBD_FREE_PTR(item);
440 }
441
442 static int ll_statahead_interpret(struct md_op_item *item, int rc);
443
444 /*
445  * prepare arguments for async stat RPC.
446  */
447 static struct md_op_item *
448 sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
449 {
450         struct md_op_item *item;
451         struct ldlm_enqueue_info *einfo;
452         struct md_op_data *op_data;
453
454         OBD_ALLOC_PTR(item);
455         if (!item)
456                 return ERR_PTR(-ENOMEM);
457
458         op_data = ll_prep_md_op_data(&item->mop_data, dir, child,
459                                      entry->se_qstr.name, entry->se_qstr.len, 0,
460                                      LUSTRE_OPC_ANY, NULL);
461         if (IS_ERR(op_data)) {
462                 OBD_FREE_PTR(item);
463                 return (struct md_op_item *)op_data;
464         }
465
466         if (!child)
467                 op_data->op_fid2 = entry->se_fid;
468
469         item->mop_opc = MD_OP_GETATTR;
470         item->mop_it.it_op = IT_GETATTR;
471         item->mop_dir = igrab(dir);
472         item->mop_cb = ll_statahead_interpret;
473         item->mop_cbdata = entry;
474
475         einfo = &item->mop_einfo;
476         einfo->ei_type = LDLM_IBITS;
477         einfo->ei_mode = it_to_lock_mode(&item->mop_it);
478         einfo->ei_cb_bl = ll_md_blocking_ast;
479         einfo->ei_cb_cp = ldlm_completion_ast;
480         einfo->ei_cb_gl = NULL;
481         einfo->ei_cbdata = NULL;
482         einfo->ei_req_slot = 1;
483
484         return item;
485 }
486
487 /*
488  * release resources used in the async stat RPC, update the entry state and
489  * wake up the scanner process if it is waiting on this entry.
490  */
491 static void
492 sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
493 {
494         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
495         bool wakeup;
496
497         spin_lock(&lli->lli_sa_lock);
498         wakeup = __sa_make_ready(sai, entry, ret);
499         spin_unlock(&lli->lli_sa_lock);
500
501         if (wakeup)
502                 wake_up(&sai->sai_waitq);
503 }
504
505 /* insert inode into the list of sai_agls */
506 static void ll_agl_add(struct ll_statahead_info *sai,
507                        struct inode *inode, int index)
508 {
509         struct ll_inode_info *child  = ll_i2info(inode);
510         struct ll_inode_info *parent = ll_i2info(sai->sai_dentry->d_inode);
511
512         spin_lock(&child->lli_agl_lock);
513         if (child->lli_agl_index == 0) {
514                 child->lli_agl_index = index;
515                 spin_unlock(&child->lli_agl_lock);
516
517                 LASSERT(list_empty(&child->lli_agl_list));
518
519                 spin_lock(&parent->lli_agl_lock);
520                 /* Re-check under the lock */
521                 if (agl_should_run(sai, inode)) {
522                         if (agl_list_empty(sai))
523                                 wake_up_process(sai->sai_agl_task);
524                         igrab(inode);
525                         list_add_tail(&child->lli_agl_list, &sai->sai_agls);
526                 } else
527                         child->lli_agl_index = 0;
528                 spin_unlock(&parent->lli_agl_lock);
529         } else {
530                 spin_unlock(&child->lli_agl_lock);
531         }
532 }
533
534 /* Allocate a statahead context (sax) for directory @dir */
535 static struct ll_statahead_context *ll_sax_alloc(struct inode *dir)
536 {
537         struct ll_statahead_context *ctx;
538         int i;
539
540         ENTRY;
541
542         OBD_ALLOC_PTR(ctx);
543         if (ctx == NULL)
544                 RETURN(NULL);
545
546         ctx->sax_inode = igrab(dir);
547         atomic_set(&ctx->sax_refcount, 1);
548         INIT_LIST_HEAD(&ctx->sax_sai_list);
549         for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
550                 INIT_LIST_HEAD(&ctx->sax_cache[i]);
551                 spin_lock_init(&ctx->sax_cache_lock[i]);
552         }
553
554         RETURN(ctx);
555 }
556
557 static inline void ll_sax_free(struct ll_statahead_context *ctx)
558 {
559         LASSERT(ctx->sax_inode != NULL);
560         iput(ctx->sax_inode);
561         OBD_FREE_PTR(ctx);
562 }
563
564 static inline void __ll_sax_get(struct ll_statahead_context *ctx)
565 {
566         atomic_inc(&ctx->sax_refcount);
567 }
568
569 static inline struct ll_statahead_context *ll_sax_get(struct inode *dir)
570 {
571         struct ll_inode_info *lli = ll_i2info(dir);
572         struct ll_statahead_context *ctx = NULL;
573
574         spin_lock(&lli->lli_sa_lock);
575         ctx = lli->lli_sax;
576         if (ctx)
577                 __ll_sax_get(ctx);
578         spin_unlock(&lli->lli_sa_lock);
579
580         return ctx;
581 }
582
583 static inline void ll_sax_put(struct inode *dir,
584                               struct ll_statahead_context *ctx)
585 {
586         struct ll_inode_info *lli = ll_i2info(dir);
587
588         if (atomic_dec_and_lock(&ctx->sax_refcount, &lli->lli_sa_lock)) {
589                 LASSERT(list_empty(&ctx->sax_sai_list));
590                 lli->lli_sai = NULL;
591                 lli->lli_sax = NULL;
592                 if (lli->lli_sa_pattern & (LSA_PATTERN_ADVISE |
593                                            LSA_PATTERN_FNAME)) {
594                         lli->lli_opendir_key = NULL;
595                         lli->lli_stat_pid = 0;
596                         lli->lli_sa_enabled = 0;
597                 }
598                 lli->lli_sa_pattern = LSA_PATTERN_NONE;
599                 spin_unlock(&lli->lli_sa_lock);
600
601                 ll_sax_free(ctx);
602         }
603 }
604
605 /* allocate sai */
606 static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
607 {
608         struct ll_statahead_info *sai;
609         struct ll_inode_info *lli = ll_i2info(dentry->d_inode);
610
611         ENTRY;
612
613         OBD_ALLOC_PTR(sai);
614         if (!sai)
615                 RETURN(NULL);
616
617         sai->sai_dentry = dget(dentry);
618         atomic_set(&sai->sai_refcount, 1);
619         sai->sai_max = ll_i2sbi(dentry->d_inode)->ll_sa_min;
620         sai->sai_index = 1;
621         init_waitqueue_head(&sai->sai_waitq);
622
623         INIT_LIST_HEAD(&sai->sai_item);
624         INIT_LIST_HEAD(&sai->sai_entries);
625         INIT_LIST_HEAD(&sai->sai_agls);
626
627         atomic_set(&sai->sai_cache_count, 0);
628         atomic_set(&sai->sai_inuse_count, 0);
629         spin_lock(&sai_generation_lock);
630         lli->lli_sa_generation = ++sai_generation;
631         if (unlikely(sai_generation == 0))
632                 lli->lli_sa_generation = ++sai_generation;
633         spin_unlock(&sai_generation_lock);
634
635         RETURN(sai);
636 }
637
638 /* free sai */
639 static inline void ll_sai_free(struct ll_statahead_info *sai)
640 {
641         LASSERT(sai->sai_dentry != NULL);
642         dput(sai->sai_dentry);
643         OBD_FREE_PTR(sai);
644 }
645
646 static inline struct ll_statahead_info *
647 __ll_sai_get(struct ll_statahead_info *sai)
648 {
649         atomic_inc(&sai->sai_refcount);
650         return sai;
651 }
652
653 /*
654  * put the sai refcount after use; if the refcount reaches zero, free the sai
655  * and the sa_entries attached to it.
656  */
657 static void ll_sai_put(struct ll_statahead_info *sai)
658 {
659         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
660
661         if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
662                 struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
663
664                 lli->lli_sai = NULL;
665                 list_del_init(&sai->sai_item);
666                 spin_unlock(&lli->lli_sa_lock);
667
668                 LASSERT(!sai->sai_task);
669                 LASSERT(!sai->sai_agl_task);
670                 LASSERT(sai->sai_sent == sai->sai_replied);
671
672                 LASSERT(atomic_read(&sai->sai_cache_count) == 0);
673                 LASSERT(agl_list_empty(sai));
674
675                 ll_sai_free(sai);
676                 atomic_dec(&sbi->ll_sa_running);
677         }
678 }
679
680 /* Do NOT forget to drop the inode refcount taken when adding it to sai_agls. */
681 static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
682 {
683         struct ll_inode_info *lli = ll_i2info(inode);
684         u64 index = lli->lli_agl_index;
685         ktime_t expire;
686         int rc;
687
688         ENTRY;
689
690         LASSERT(list_empty(&lli->lli_agl_list));
691
692         /* AGL may fall behind statahead by one entry */
693         if (is_omitted_entry(sai, index + 1)) {
694                 lli->lli_agl_index = 0;
695                 iput(inode);
696                 RETURN_EXIT;
697         }
698
699         /*
700          * In case of restore, the MDT has the right size and has already
701          * sent it back without granting the layout lock, so the inode is
702          * up-to-date and AGL (async glimpse lock) is useless.
703          * Also, glimpsing needs the layout; while a restore is running the
704          * MDT holds the layout lock, so the glimpse would block until the
705          * end of the restore (statahead/AGL would block).
706          */
707         if (test_bit(LLIF_FILE_RESTORING, &lli->lli_flags)) {
708                 lli->lli_agl_index = 0;
709                 iput(inode);
710                 RETURN_EXIT;
711         }
712
713         /* Someone is in glimpse (sync or async), do nothing. */
714         rc = down_write_trylock(&lli->lli_glimpse_sem);
715         if (rc == 0) {
716                 lli->lli_agl_index = 0;
717                 iput(inode);
718                 RETURN_EXIT;
719         }
720
721         /*
722          * Someone triggered a glimpse within the last second.
723          * 1) The former glimpse succeeded with a glimpse lock granted by the
724          *    OST; if the lock is still cached on the client, AGL needs to do
725          *    nothing. If it was cancelled by another client, AGL may not be
726          *    able to obtain a new lock, since AGL triggers no glimpse callback.
727          * 2) The former glimpse succeeded, but the OST did not grant a glimpse
728          *    lock. In that case it is quite possible that the OST will not
729          *    grant a glimpse lock for AGL either.
730          * 3) The former glimpse failed; compared with the other two cases it
731          *    is relatively rare. AGL can ignore it without much impact on
732          *    performance.
733          */
734         expire = ktime_sub_ns(ktime_get(), NSEC_PER_SEC);
735         if (ktime_to_ns(lli->lli_glimpse_time) &&
736             ktime_before(expire, lli->lli_glimpse_time)) {
737                 up_write(&lli->lli_glimpse_sem);
738                 lli->lli_agl_index = 0;
739                 iput(inode);
740                 RETURN_EXIT;
741         }
742
743         CDEBUG(D_READA,
744                "Handling (init) async glimpse: inode = " DFID", idx = %llu\n",
745                PFID(&lli->lli_fid), index);
746
747         cl_agl(inode);
748         lli->lli_agl_index = 0;
749         lli->lli_glimpse_time = ktime_get();
750         up_write(&lli->lli_glimpse_sem);
751
752         CDEBUG(D_READA,
753                "Handled (init) async glimpse: inode= " DFID", idx = %llu, rc = %d\n",
754                PFID(&lli->lli_fid), index, rc);
755
756         iput(inode);
757
758         EXIT;
759 }
760
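/* common cleanup for the getattr callback: release the intent and request,
 * mark the entry ready, and account the reply */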
761 static void ll_statahead_interpret_fini(struct ll_inode_info *lli,
762                                         struct ll_statahead_info *sai,
763                                         struct md_op_item *item,
764                                         struct sa_entry *entry,
765                                         struct ptlrpc_request *req,
766                                         int rc)
767 {
768         /*
769          * First it will drop ldlm ibits lock refcount by calling
770          * ll_intent_drop_lock() in spite of failures. Do not worry about
771          * calling ll_intent_drop_lock() more than once.
772          */
773         ll_intent_release(&item->mop_it);
774         sa_fini_data(item);
775         if (req)
776                 ptlrpc_req_finished(req);
777         sa_make_ready(sai, entry, rc);
778
779         spin_lock(&lli->lli_sa_lock);
780         sai->sai_replied++;
781         spin_unlock(&lli->lli_sa_lock);
782 }
783
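/*
 * Deferred part of the getattr reply handling, run from a work queue where it
 * is safe to block: ll_prep_inode() may itself send RPCs (striped directories,
 * layout change), which must not be done in ptlrpcd context.
 */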
784 static void ll_statahead_interpret_work(struct work_struct *work)
785 {
786         struct md_op_item *item = container_of(work, struct md_op_item,
787                                                mop_work);
788         struct req_capsule *pill = item->mop_pill;
789         struct inode *dir = item->mop_dir;
790         struct ll_inode_info *lli = ll_i2info(dir);
791         struct ll_statahead_info *sai;
792         struct lookup_intent *it;
793         struct sa_entry *entry;
794         struct mdt_body *body;
795         struct inode *child;
796         int rc;
797
798         ENTRY;
799
800         entry = (struct sa_entry *)item->mop_cbdata;
801         LASSERT(entry->se_handle != 0);
802
803         sai = entry->se_sai;
804         it = &item->mop_it;
805         body = req_capsule_server_get(pill, &RMF_MDT_BODY);
806         if (!body)
807                 GOTO(out, rc = -EFAULT);
808
809         child = entry->se_inode;
810         /* revalidate; unlinked and re-created with the same name */
811         if (unlikely(!fid_is_zero(&item->mop_data.op_fid2) &&
812                      !lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
813                 if (child) {
814                         entry->se_inode = NULL;
815                         iput(child);
816                 }
817                 /* The mdt_body is invalid. Skip this entry */
818                 GOTO(out, rc = -EAGAIN);
819         }
820
821         it->it_lock_handle = entry->se_handle;
822         rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
823         if (rc != 1)
824                 GOTO(out, rc = -EAGAIN);
825
826         rc = ll_prep_inode(&child, pill, dir->i_sb, it);
827         if (rc) {
828                 CERROR("%s: getattr callback for %.*s "DFID": rc = %d\n",
829                        ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
830                        entry->se_qstr.name, PFID(&entry->se_fid), rc);
831                 GOTO(out, rc);
832         }
833
834         /* If encryption context was returned by MDT, put it in
835          * inode now to save an extra getxattr.
836          */
837         if (body->mbo_valid & OBD_MD_ENCCTX) {
838                 void *encctx = req_capsule_server_get(pill, &RMF_FILE_ENCCTX);
839                 __u32 encctxlen = req_capsule_get_size(pill, &RMF_FILE_ENCCTX,
840                                                        RCL_SERVER);
841
842                 if (encctxlen) {
843                         CDEBUG(D_SEC,
844                                "server returned encryption ctx for "DFID"\n",
845                                PFID(ll_inode2fid(child)));
846                         rc = ll_xattr_cache_insert(child,
847                                                    xattr_for_enc(child),
848                                                    encctx, encctxlen);
849                         if (rc)
850                                 CWARN("%s: cannot set enc ctx for "DFID": rc = %d\n",
851                                       ll_i2sbi(child)->ll_fsname,
852                                       PFID(ll_inode2fid(child)), rc);
853                 }
854         }
855
856         CDEBUG(D_READA, "%s: setting %.*s"DFID" l_data to inode %p\n",
857                ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
858                entry->se_qstr.name, PFID(ll_inode2fid(child)), child);
859         ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);
860
861         entry->se_inode = child;
862
863         if (agl_should_run(sai, child))
864                 ll_agl_add(sai, child, entry->se_index);
865 out:
866         ll_statahead_interpret_fini(lli, sai, item, entry, pill->rc_req, rc);
867 }
868
869 /*
870  * Callback for the async stat RPC, called in ptlrpcd context. It prepares the
871  * inode and sets the lock data directly in the ptlrpcd context, and wakes up
872  * the directory listing process if the dentry is the one being waited on.
873  */
874 static int ll_statahead_interpret(struct md_op_item *item, int rc)
875 {
876         struct req_capsule *pill = item->mop_pill;
877         struct lookup_intent *it = &item->mop_it;
878         struct inode *dir = item->mop_dir;
879         struct ll_inode_info *lli = ll_i2info(dir);
880         struct sa_entry *entry = (struct sa_entry *)item->mop_cbdata;
881         struct work_struct *work = &item->mop_work;
882         struct ll_statahead_info *sai;
883         struct mdt_body *body;
884         struct inode *child;
885         __u64 handle = 0;
886
887         ENTRY;
888
889         if (it_disposition(it, DISP_LOOKUP_NEG))
890                 rc = -ENOENT;
891
892         /*
893          * because the statahead thread will wait for all inflight RPCs to
894          * finish, sai should always be valid, no need to refcount
895          */
896         LASSERT(entry != NULL);
897         sai = entry->se_sai;
898         LASSERT(sai != NULL);
899
900         CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
901                entry->se_qstr.len, entry->se_qstr.name, rc);
902
903         if (rc != 0)
904                 GOTO(out, rc);
905
906         body = req_capsule_server_get(pill, &RMF_MDT_BODY);
907         if (!body)
908                 GOTO(out, rc = -EFAULT);
909
910         child = entry->se_inode;
911         /*
912          * revalidate; unlinked and re-created with the same name.
913          * Exclude the case where the FID is zero, as the request came from
914          * statahead with a regularized file name pattern and the FID of the
915          * child file was unknown.
916          */
917         if (unlikely(!fid_is_zero(&item->mop_data.op_fid2) &&
918                      !lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
919                 if (child) {
920                         entry->se_inode = NULL;
921                         iput(child);
922                 }
923                 /* The mdt_body is invalid. Skip this entry */
924                 GOTO(out, rc = -EAGAIN);
925         }
926
927         entry->se_handle = it->it_lock_handle;
928         /*
929          * In ptlrpcd context, it is not allowed to generate new RPCs
930          * especially for striped directories or regular files with layout
931          * change.
932          */
933         /*
934          * release ibits lock ASAP to avoid deadlock when statahead
935          * thread enqueues lock on parent in readdir and another
936          * process enqueues lock on child with parent lock held, eg.
937          * unlink.
938          */
939         handle = it->it_lock_handle;
940         ll_intent_drop_lock(it);
941         ll_unlock_md_op_lsm(&item->mop_data);
942
943         /*
944          * If the statahead entry is a striped directory or a regular file
945          * with a layout change, it will generate a new RPC and a long wait
946          * in the ptlrpcd context.
947          * However, it is dangerous to block in a ptlrpcd thread.
948          * Here we use a work queue or the separate statahead thread to
949          * handle the extra RPC and long wait:
950          *      (@ll_prep_inode->@lmv_revalidate_slaves);
951          *      (@ll_prep_inode->@lov_layout_change->osc_cache_wait_range);
952          */
953         INIT_WORK(work, ll_statahead_interpret_work);
954         ptlrpc_request_addref(pill->rc_req);
955         schedule_work(work);
956         RETURN(0);
957 out:
958         ll_statahead_interpret_fini(lli, sai, item, entry, NULL, rc);
959         RETURN(rc);
960 }
961
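/* send the async getattr, either through the batch handle if one is present
 * or as a standalone intent getattr RPC */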
962 static inline int sa_getattr(struct ll_statahead_info *sai, struct inode *dir,
963                              struct md_op_item *item)
964 {
965         int rc;
966
967         if (sa_has_batch_handle(sai))
968                 rc = md_batch_add(ll_i2mdexp(dir), sai->sai_bh, item);
969         else
970                 rc = md_intent_getattr_async(ll_i2mdexp(dir), item);
971
972         return rc;
973 }
974
975 /* async stat for file not found in dcache */
976 static int sa_lookup(struct inode *dir, struct sa_entry *entry)
977 {
978         struct md_op_item *item;
979         int rc;
980
981         ENTRY;
982
983         item = sa_prep_data(dir, NULL, entry);
984         if (IS_ERR(item))
985                 RETURN(PTR_ERR(item));
986
987         rc = sa_getattr(entry->se_sai, dir, item);
988         if (rc < 0)
989                 sa_fini_data(item);
990
991         RETURN(rc);
992 }
993
994 /**
995  * async stat for file found in dcache, similar to .revalidate
996  *
997  * \retval      1 dentry valid, no RPC sent
998  * \retval      0 dentry invalid, will send async stat RPC
999  * \retval      negative number upon error
1000  */
1001 static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
1002                          struct dentry *dentry)
1003 {
1004         struct inode *inode = dentry->d_inode;
1005         struct lookup_intent it = { .it_op = IT_GETATTR,
1006                                     .it_lock_handle = 0 };
1007         struct md_op_item *item;
1008         int rc;
1009
1010         ENTRY;
1011
1012         if (unlikely(!inode))
1013                 RETURN(1);
1014
1015         if (d_mountpoint(dentry))
1016                 RETURN(1);
1017
1018         item = sa_prep_data(dir, inode, entry);
1019         if (IS_ERR(item))
1020                 RETURN(PTR_ERR(item));
1021
1022         entry->se_inode = igrab(inode);
1023         rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
1024                                 NULL);
1025         if (rc == 1) {
1026                 entry->se_handle = it.it_lock_handle;
1027                 ll_intent_release(&it);
1028                 sa_fini_data(item);
1029                 RETURN(1);
1030         }
1031
1032         rc = sa_getattr(entry->se_sai, dir, item);
1033         if (rc < 0) {
1034                 entry->se_inode = NULL;
1035                 iput(inode);
1036                 sa_fini_data(item);
1037         }
1038
1039         RETURN(rc);
1040 }
1041
1042 /* async stat for file with @name */
1043 static void sa_statahead(struct ll_statahead_info *sai, struct dentry *parent,
1044                          const char *name, int len, const struct lu_fid *fid)
1045 {
1046         struct inode *dir = parent->d_inode;
1047         struct dentry *dentry = NULL;
1048         struct sa_entry *entry;
1049         int rc;
1050
1051         ENTRY;
1052
1053         entry = sa_alloc(parent, sai, sai->sai_index, name, len, fid);
1054         if (IS_ERR(entry))
1055                 RETURN_EXIT;
1056
1057         dentry = d_lookup(parent, &entry->se_qstr);
1058         if (!dentry) {
1059                 rc = sa_lookup(dir, entry);
1060         } else {
1061                 rc = sa_revalidate(dir, entry, dentry);
1062                 if (rc == 1 && agl_should_run(sai, dentry->d_inode))
1063                         ll_agl_add(sai, dentry->d_inode, entry->se_index);
1064         }
1065
1066         if (dentry)
1067                 dput(dentry);
1068
1069         if (rc != 0)
1070                 sa_make_ready(sai, entry, rc);
1071         else
1072                 sai->sai_sent++;
1073
1074         sai->sai_index++;
1075
1076         if (sa_sent_full(sai))
1077                 ll_statahead_flush_nowait(sai);
1078
1079         EXIT;
1080 }
1081
1082 /* async glimpse (agl) thread main function */
1083 static int ll_agl_thread(void *arg)
1084 {
1085         /*
1086          * We already own this reference, so it is safe to take it
1087          * without a lock.
1088          */
1089         struct ll_statahead_info *sai = (struct ll_statahead_info *)arg;
1090         struct dentry *parent = sai->sai_dentry;
1091         struct inode *dir = parent->d_inode;
1092         struct ll_inode_info *plli = ll_i2info(dir);
1093         struct ll_inode_info *clli;
1094
1095         ENTRY;
1096
1097         CDEBUG(D_READA, "agl thread started: sai %p, parent %pd\n",
1098                sai, parent);
1099
1100         while (({set_current_state(TASK_IDLE);
1101                  !kthread_should_stop(); })) {
1102                 spin_lock(&plli->lli_agl_lock);
1103                 clli = list_first_entry_or_null(&sai->sai_agls,
1104                                                 struct ll_inode_info,
1105                                                 lli_agl_list);
1106                 if (clli) {
1107                         __set_current_state(TASK_RUNNING);
1108                         list_del_init(&clli->lli_agl_list);
1109                         spin_unlock(&plli->lli_agl_lock);
1110                         ll_agl_trigger(&clli->lli_vfs_inode, sai);
1111                         cond_resched();
1112                 } else {
1113                         spin_unlock(&plli->lli_agl_lock);
1114                         schedule();
1115                 }
1116         }
1117         __set_current_state(TASK_RUNNING);
1118         RETURN(0);
1119 }
1120
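/* stop the AGL thread and drop the inodes still queued on sai_agls */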
1121 static void ll_stop_agl(struct ll_statahead_info *sai)
1122 {
1123         struct dentry *parent = sai->sai_dentry;
1124         struct ll_inode_info *plli = ll_i2info(parent->d_inode);
1125         struct ll_inode_info *clli;
1126         struct task_struct *agl_task;
1127
1128         spin_lock(&plli->lli_agl_lock);
1129         agl_task = sai->sai_agl_task;
1130         sai->sai_agl_task = NULL;
1131         spin_unlock(&plli->lli_agl_lock);
1132         if (!agl_task)
1133                 return;
1134
1135         CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n",
1136                sai, (unsigned int)agl_task->pid);
1137         kthread_stop(agl_task);
1138
1139         spin_lock(&plli->lli_agl_lock);
1140         while ((clli = list_first_entry_or_null(&sai->sai_agls,
1141                                                 struct ll_inode_info,
1142                                                 lli_agl_list)) != NULL) {
1143                 list_del_init(&clli->lli_agl_list);
1144                 spin_unlock(&plli->lli_agl_lock);
1145                 clli->lli_agl_index = 0;
1146                 iput(&clli->lli_vfs_inode);
1147                 spin_lock(&plli->lli_agl_lock);
1148         }
1149         spin_unlock(&plli->lli_agl_lock);
1150         CDEBUG(D_READA, "agl thread stopped: sai %p, parent %pd\n",
1151                sai, parent);
1152         ll_sai_put(sai);
1153 }
1154
1155 /* start agl thread */
1156 static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
1157 {
1158         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
1159         struct ll_inode_info *plli;
1160         struct task_struct *task;
1161
1162         ENTRY;
1163
1164         CDEBUG(D_READA, "start agl thread: sai %p, parent %pd\n",
1165                sai, parent);
1166
1167         plli = ll_i2info(parent->d_inode);
1168         task = kthread_create_on_node(ll_agl_thread, sai, node, "ll_agl_%d",
1169                                       plli->lli_stat_pid);
1170         if (IS_ERR(task)) {
1171                 CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));
1172                 RETURN_EXIT;
1173         }
1174         sai->sai_agl_task = task;
1175         atomic_inc(&ll_i2sbi(d_inode(parent))->ll_agl_total);
1176         /* Get an extra reference that the thread holds */
1177         __ll_sai_get(sai);
1178
1179         wake_up_process(task);
1180
1181         EXIT;
1182 }
1183
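/*
 * Statahead driven by directory contents: walk the readdir pages and issue an
 * async stat for each entry until the scanner stops, statahead is disabled,
 * or the hit ratio drops too low.
 */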
1184 static int ll_statahead_by_list(struct dentry *parent)
1185 {
1186         struct inode *dir = parent->d_inode;
1187         struct ll_inode_info *lli = ll_i2info(dir);
1188         struct ll_statahead_info *sai = lli->lli_sai;
1189         struct ll_sb_info *sbi = ll_i2sbi(dir);
1190         struct md_op_data *op_data;
1191         struct page *page = NULL;
1192         __u64 pos = 0;
1193         int first = 0;
1194         int rc = 0;
1195
1196         ENTRY;
1197
1198         CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
1199                sai, parent);
1200
1201         OBD_ALLOC_PTR(op_data);
1202         if (!op_data)
1203                 RETURN(-ENOMEM);
1204
1205         while (pos != MDS_DIR_END_OFF &&
1206                /* matches smp_store_release() in ll_deauthorize_statahead() */
1207                smp_load_acquire(&sai->sai_task) &&
1208                lli->lli_sa_enabled) {
1209                 struct lu_dirpage *dp;
1210                 struct lu_dirent  *ent;
1211
1212                 op_data = ll_prep_md_op_data(op_data, dir, dir, NULL, 0, 0,
1213                                              LUSTRE_OPC_ANY, dir);
1214                 if (IS_ERR(op_data)) {
1215                         rc = PTR_ERR(op_data);
1216                         break;
1217                 }
1218
1219                 page = ll_get_dir_page(dir, op_data, pos, NULL);
1220                 ll_unlock_md_op_lsm(op_data);
1221                 if (IS_ERR(page)) {
1222                         rc = PTR_ERR(page);
1223                         CDEBUG(D_READA,
1224                                "error reading dir "DFID" at %llu /%llu stat_pid = %u: rc = %d\n",
1225                                PFID(ll_inode2fid(dir)), pos, sai->sai_index,
1226                                lli->lli_stat_pid, rc);
1227                         break;
1228                 }
1229
1230                 dp = page_address(page);
1231                 for (ent = lu_dirent_start(dp);
1232                      /* matches smp_store_release() in ll_deauthorize_statahead() */
1233                      ent != NULL && smp_load_acquire(&sai->sai_task) &&
1234                      !sa_low_hit(sai) && lli->lli_sa_enabled;
1235                      ent = lu_dirent_next(ent)) {
1236                         __u64 hash;
1237                         int namelen;
1238                         char *name;
1239                         struct lu_fid fid;
1240                         struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
1241
1242                         hash = le64_to_cpu(ent->lde_hash);
1243                         if (unlikely(hash < pos))
1244                                 /*
1245                                  * Skip until we find target hash value.
1246                                  */
1247                                 continue;
1248
1249                         namelen = le16_to_cpu(ent->lde_namelen);
1250                         if (unlikely(namelen == 0))
1251                                 /*
1252                                  * Skip dummy record.
1253                                  */
1254                                 continue;
1255
1256                         name = ent->lde_name;
1257                         if (name[0] == '.') {
1258                                 if (namelen == 1) {
1259                                         /*
1260                                          * skip "."
1261                                          */
1262                                         continue;
1263                                 } else if (name[1] == '.' && namelen == 2) {
1264                                         /*
1265                                          * skip ".."
1266                                          */
1267                                         continue;
1268                                 } else if (!sai->sai_ls_all) {
1269                                         /*
1270                                          * skip hidden files.
1271                                          */
1272                                         sai->sai_skip_hidden++;
1273                                         continue;
1274                                 }
1275                         }
1276
1277                         /*
1278                          * don't stat-ahead first entry.
1279                          */
1280                         if (unlikely(++first == 1))
1281                                 continue;
1282
1283                         fid_le_to_cpu(&fid, &ent->lde_fid);
1284
1285                         while (({set_current_state(TASK_IDLE);
1286                                  /* matches smp_store_release() in
1287                                   * ll_deauthorize_statahead()
1288                                   */
1289                                  smp_load_acquire(&sai->sai_task); })) {
1290                                 long timeout;
1291
1292                                 spin_lock(&lli->lli_agl_lock);
1293                                 while (sa_sent_full(sai) &&
1294                                        !agl_list_empty(sai)) {
1295                                         struct ll_inode_info *clli;
1296
1297                                         __set_current_state(TASK_RUNNING);
1298                                         clli = agl_first_entry(sai);
1299                                         list_del_init(&clli->lli_agl_list);
1300                                         spin_unlock(&lli->lli_agl_lock);
1301
1302                                         ll_agl_trigger(&clli->lli_vfs_inode,
1303                                                        sai);
1304                                         cond_resched();
1305                                         spin_lock(&lli->lli_agl_lock);
1306                                 }
1307                                 spin_unlock(&lli->lli_agl_lock);
1308
1309                                 if (!sa_sent_full(sai))
1310                                         break;
1311
1312                                 /*
1313                                  * If the thread is not doing stat in
1314                                  * @sbi->ll_sa_timeout (30s) then it probably
1315                                  * does not care too much about performance,
1316                                  * or is no longer using this directory.
1317                                  * Stop the statahead thread in this case.
1318                                  */
1319                                 timeout = schedule_timeout(
1320                                         cfs_time_seconds(sbi->ll_sa_timeout));
1321                                 if (timeout == 0) {
1322                                         lli->lli_sa_enabled = 0;
1323                                         break;
1324                                 }
1325                         }
1326                         __set_current_state(TASK_RUNNING);
1327
1328                         if (IS_ENCRYPTED(dir)) {
1329                                 struct llcrypt_str de_name =
1330                                         LLTR_INIT(ent->lde_name, namelen);
1331                                 struct lu_fid fid;
1332
1333                                 rc = llcrypt_fname_alloc_buffer(dir, NAME_MAX,
1334                                                                 &lltr);
1335                                 if (rc < 0)
1336                                         continue;
1337
1338                                 fid_le_to_cpu(&fid, &ent->lde_fid);
1339                                 if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
1340                                                          &lltr, &fid)) {
1341                                         llcrypt_fname_free_buffer(&lltr);
1342                                         continue;
1343                                 }
1344
1345                                 name = lltr.name;
1346                                 namelen = lltr.len;
1347                         }
1348
1349                         sa_statahead(sai, parent, name, namelen, &fid);
1350                         llcrypt_fname_free_buffer(&lltr);
1351                 }
1352
1353                 pos = le64_to_cpu(dp->ldp_hash_end);
1354                 ll_release_page(dir, page,
1355                                 le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
1356
1357                 if (sa_low_hit(sai)) {
1358                         rc = -EFAULT;
1359                         atomic_inc(&sbi->ll_sa_wrong);
1360                         CDEBUG(D_READA,
1361                "Statahead for dir "DFID" hit ratio too low: hit/miss %llu/%llu, sent/replied %llu/%llu, stopping statahead thread: pid %d\n",
1362                                PFID(&lli->lli_fid), sai->sai_hit,
1363                                sai->sai_miss, sai->sai_sent,
1364                                sai->sai_replied, current->pid);
1365                         break;
1366                 }
1367         }
1368         ll_finish_md_op_data(op_data);
1369
1370         RETURN(rc);
1371 }
1372
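/* wait until the statahead window has room (triggering queued AGL entries in
 * the meantime), then issue the async stat for @name */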
1373 static void ll_statahead_handle(struct ll_statahead_info *sai,
1374                                 struct dentry *parent, const char *name,
1375                                 int len, const struct lu_fid *fid)
1376 {
1377         struct inode *dir = parent->d_inode;
1378         struct ll_inode_info *lli = ll_i2info(dir);
1379         struct ll_sb_info *sbi = ll_i2sbi(dir);
1380         long timeout;
1381
1382         while (({set_current_state(TASK_IDLE);
1383                 /* matches smp_store_release() in ll_deauthorize_statahead() */
1384                  smp_load_acquire(&sai->sai_task); })) {
1385                 spin_lock(&lli->lli_agl_lock);
1386                 while (sa_sent_full(sai) && !agl_list_empty(sai)) {
1387                         struct ll_inode_info *clli;
1388
1389                         __set_current_state(TASK_RUNNING);
1390                         clli = agl_first_entry(sai);
1391                         list_del_init(&clli->lli_agl_list);
1392                         spin_unlock(&lli->lli_agl_lock);
1393
1394                         ll_agl_trigger(&clli->lli_vfs_inode, sai);
1395                         cond_resched();
1396                         spin_lock(&lli->lli_agl_lock);
1397                 }
1398                 spin_unlock(&lli->lli_agl_lock);
1399
1400                 if (!sa_sent_full(sai))
1401                         break;
1402
1403                 /*
1404                  * If the thread is not doing a stat in 30s then it probably
1405                  * does not care too much about performance, or is no longer
1406                  * using this directory. Stop the statahead thread in this case.
1407                  */
1408                 timeout = schedule_timeout(
1409                                 cfs_time_seconds(sbi->ll_sa_timeout));
1410                 if (timeout == 0) {
1411                         lli->lli_sa_enabled = 0;
1412                         break;
1413                 }
1414         }
1415         __set_current_state(TASK_RUNNING);
1416
1417         sa_statahead(sai, parent, name, len, fid);
1418 }
1419
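/* Statahead advised via ioctl: generate names from the sai_fname prefix and
 * consecutive indices starting at sai_fstart */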
1420 static int ll_statahead_by_advise(struct ll_statahead_info *sai,
1421                                   struct dentry *parent)
1422 {
1423         struct inode *dir = parent->d_inode;
1424         struct ll_inode_info *lli = ll_i2info(dir);
1425         struct ll_sb_info *sbi = ll_i2sbi(dir);
1426         size_t max_len;
1427         size_t len;
1428         char *fname;
1429         char *ptr;
1430         int rc = 0;
1431         __u64 i = 0;
1432
1433         ENTRY;
1434
1435         CDEBUG(D_READA, "%s: ADVISE statahead: parent %pd fname prefix %s\n",
1436                sbi->ll_fsname, parent, sai->sai_fname);
1437
1438         OBD_ALLOC(fname, NAME_MAX);
1439         if (fname == NULL)
1440                 RETURN(-ENOMEM);
1441
1442         len = strlen(sai->sai_fname);
1443         memcpy(fname, sai->sai_fname, len);
1444         max_len = sizeof(sai->sai_fname) - len;
1445         ptr = fname + len;
1446
1447         /* matches smp_store_release() in ll_deauthorize_statahead() */
1448         while (smp_load_acquire(&sai->sai_task) && lli->lli_sa_enabled) {
1449                 size_t numlen;
1450
1451                 numlen = snprintf(ptr, max_len, "%llu",
1452                                   sai->sai_fstart + i);
1453
1454                 ll_statahead_handle(sai, parent, fname, len + numlen, NULL);
1455                 if (++i >= sai->sai_fend)
1456                         break;
1457         }
1458
1459         OBD_FREE(fname, NAME_MAX);
1460         RETURN(rc);
1461 }
1462
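/* Statahead for a detected file name pattern: append an incrementing
 * (optionally zero-padded) index to the sai_fname prefix */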
1463 static int ll_statahead_by_fname(struct ll_statahead_info *sai,
1464                                  struct dentry *parent)
1465 {
1466         struct inode *dir = parent->d_inode;
1467         struct ll_inode_info *lli = ll_i2info(dir);
1468         struct ll_sb_info *sbi = ll_i2sbi(dir);
1469         size_t max_len;
1470         size_t len;
1471         char *fname;
1472         char *ptr;
1473         int rc = 0;
1474
1475         ENTRY;
1476
1477         CDEBUG(D_READA, "%s: FNAME statahead: parent %pd fname prefix %s\n",
1478                sbi->ll_fsname, parent, sai->sai_fname);
1479
1480         OBD_ALLOC(fname, NAME_MAX);
1481         if (fname == NULL)
1482                 RETURN(-ENOMEM);
1483
1484         len = strlen(sai->sai_fname);
1485         memcpy(fname, sai->sai_fname, len);
1486         max_len = sizeof(sai->sai_fname) - len;
1487         ptr = fname + len;
1488
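             /*
              * Generate the next expected file name (prefix plus an
              * increasing, optionally zero-padded index) and issue a
              * statahead for each, until statahead is deauthorized or the hit
              * ratio drops too low.
              */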
1489         /* matches smp_store_release() in ll_deauthorize_statahead() */
1490         while (smp_load_acquire(&sai->sai_task) && lli->lli_sa_enabled) {
1491                 size_t numlen;
1492
1493                 if (sai->sai_fname_zeroed_len)
1494                         numlen = snprintf(ptr, max_len, "%0*llu",
1495                                           sai->sai_fname_zeroed_len,
1496                                           ++sai->sai_fname_index);
1497                 else
1498                         numlen = snprintf(ptr, max_len, "%llu",
1499                                           ++sai->sai_fname_index);
1500
1501                 ll_statahead_handle(sai, parent, fname, len + numlen, NULL);
1502
1503                 if (sa_low_hit(sai)) {
1504                         rc = -EFAULT;
1505                         atomic_inc(&sbi->ll_sa_wrong);
1506                         CDEBUG(D_CACHE, "%s: low hit ratio for %pd "DFID": hit=%llu miss=%llu sent=%llu replied=%llu, stopping PID %d\n",
1507                                sbi->ll_fsname, parent, PFID(ll_inode2fid(dir)),
1508                                sai->sai_hit, sai->sai_miss, sai->sai_sent,
1509                                sai->sai_replied, current->pid);
1510                         break;
1511                 }
1512         }
1513
1514         OBD_FREE(fname, NAME_MAX);
1515         RETURN(rc);
1516 }
1517
1518 /* statahead thread main function */
1519 static int ll_statahead_thread(void *arg)
1520 {
1521         struct ll_statahead_info *sai = (struct ll_statahead_info *)arg;
1522         struct dentry *parent = sai->sai_dentry;
1523         struct inode *dir = parent->d_inode;
1524         struct ll_inode_info *lli = ll_i2info(dir);
1525         struct ll_sb_info *sbi = ll_i2sbi(dir);
1526         struct lu_batch *bh = NULL;
1527         struct sa_entry *entry;
1528         int tries = 0;
1529         int rc = 0;
1530
1531         ENTRY;
1532
1533         CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
1534                sai, parent);
1535
1536         sai->sai_max_batch_count = sbi->ll_sa_batch_max;
1537         if (sai->sai_max_batch_count) {
1538                 bh = md_batch_create(ll_i2mdexp(dir), BATCH_FL_RDONLY,
1539                                      sai->sai_max_batch_count);
1540                 if (IS_ERR(bh))
1541                         GOTO(out_stop_agl, rc = PTR_ERR(bh));
1542         }
1543
1544         sai->sai_bh = bh;
1545
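             /* Dispatch to the scanner matching the detected statahead pattern. */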
1546         switch (lli->lli_sa_pattern & LSA_PATTERN_MASK) {
1547         case LSA_PATTERN_LIST:
1548                 rc = ll_statahead_by_list(parent);
1549                 break;
1550         case LSA_PATTERN_ADVISE:
1551                 rc = ll_statahead_by_advise(sai, parent);
1552                 break;
1553         case LSA_PATTERN_FNAME:
1554                 rc = ll_statahead_by_fname(sai, parent);
1555                 break;
1556         default:
1557                 rc = -EFAULT;
1558                 break;
1559         }
1560
1561         if (rc < 0) {
1562                 spin_lock(&lli->lli_sa_lock);
1563                 sai->sai_task = NULL;
1564                 spin_unlock(&lli->lli_sa_lock);
1565         }
1566
1567         ll_statahead_flush_nowait(sai);
1568
1569         /*
1570          * statahead is finished, but the entries need to stay cached; wait
1571          * for the file release (closedir()) to stop this thread.
1572          */
1573         while (({set_current_state(TASK_IDLE);
1574                 /* matches smp_store_release() in ll_deauthorize_statahead() */
1575                 smp_load_acquire(&sai->sai_task) && lli->lli_sa_enabled; })) {
1576                 schedule();
1577         }
1578         __set_current_state(TASK_RUNNING);
1579
1580         EXIT;
1581
1582         if (bh) {
1583                 rc = md_batch_stop(ll_i2mdexp(dir), sai->sai_bh);
1584                 sai->sai_bh = NULL;
1585         }
1586
1587 out_stop_agl:
1588         ll_stop_agl(sai);
1589
1590         /*
1591          * wait for in-flight statahead RPCs to finish; only then is it safe
1592          * to free sai, since the RPC callbacks still access its data
1593          */
1594         while (sai->sai_sent != sai->sai_replied)
1595                 /* in case we're not woken up, timeout wait */
1596                 msleep(125);
1597
1598         CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd hit %llu miss %llu\n",
1599                sbi->ll_fsname, sai, parent, sai->sai_hit, sai->sai_miss);
1600
1601         spin_lock(&lli->lli_sa_lock);
1602         sai->sai_task = NULL;
1603         spin_unlock(&lli->lli_sa_lock);
1604         wake_up(&sai->sai_waitq);
1605
1606         atomic_add(sai->sai_hit, &sbi->ll_sa_hit_total);
1607         atomic_add(sai->sai_miss, &sbi->ll_sa_miss_total);
1608
1609         /* Kill all locally cached entries. */
1610         spin_lock(&lli->lli_sa_lock);
1611         while ((entry = list_first_entry_or_null(&sai->sai_entries,
1612                                                  struct sa_entry, se_list))) {
1613                 /*
1614                  * If the entry is in use by the scanning process, wait for
1615                  * it to be released and then restart killing the locally
1616                  * cached entries.
1617                  */
1618                 if (sa_kill_try(sai, entry, true)) {
1619                         spin_unlock(&lli->lli_sa_lock);
1620                         msleep(125);
1621                         if (++tries % 1024 == 0) {
1622                                 CWARN("%s: statahead thread waited %lus for inuse entry "DFID" to be finished\n",
1623                                       sbi->ll_fsname, tries * 125 / MSEC_PER_SEC,
1624                                       PFID(&entry->se_fid));
1625                         }
1626                         spin_lock(&lli->lli_sa_lock);
1627                 }
1628         }
1629         spin_unlock(&lli->lli_sa_lock);
1630
1631         ll_sai_put(sai);
1632         ll_sax_put(dir, lli->lli_sax);
1633
1634         return rc;
1635 }
1636
1637 /* authorize opened dir handle @key to statahead */
1638 void ll_authorize_statahead(struct inode *dir, void *key)
1639 {
1640         struct ll_inode_info *lli = ll_i2info(dir);
1641
1642         spin_lock(&lli->lli_sa_lock);
1643         if (!lli->lli_opendir_key && !lli->lli_sai) {
1644                 /*
1645                  * if lli_sai is not NULL, the previous statahead has not
1646                  * finished yet; don't start a new one for now.
1647                  */
1648                 lli->lli_opendir_key = key;
1649                 lli->lli_stat_pid = current->pid;
1650                 lli->lli_sa_enabled = 1;
1651                 lli->lli_sa_pattern |= LSA_PATTERN_OPENDIR;
1652         }
1653         spin_unlock(&lli->lli_sa_lock);
1654 }
1655
1656 static void ll_deauthorize_statahead_advise(struct inode *dir, void *key)
1657 {
1658         struct ll_inode_info *lli = ll_i2info(dir);
1659         struct ll_file_data *fd = (struct ll_file_data *)key;
1660         struct ll_statahead_info *sai = fd->fd_sai;
1661
1662         if (sai == NULL)
1663                 return;
1664
1665         spin_lock(&lli->lli_sa_lock);
1666         if (sai->sai_task) {
1667                 struct task_struct *task = sai->sai_task;
1668
1669                 /* matches smp_load_acquire() in ll_statahead_thread() */
1670                 smp_store_release(&sai->sai_task, NULL);
1671                 wake_up_process(task);
1672         }
1673         fd->fd_sai = NULL;
1674         spin_unlock(&lli->lli_sa_lock);
1675         ll_sai_put(sai);
1676         LASSERT(lli->lli_sax != NULL);
1677         ll_sax_put(dir, lli->lli_sax);
1678 }
1679
1680 /*
1681  * deauthorize opened dir handle @key to statahead, and notify statahead thread
1682  * to quit if it's running.
1683  */
1684 void ll_deauthorize_statahead(struct inode *dir, void *key)
1685 {
1686         struct ll_inode_info *lli = ll_i2info(dir);
1687         struct ll_statahead_info *sai;
1688
1689         CDEBUG(D_READA, "deauthorize statahead for "DFID"\n",
1690                PFID(&lli->lli_fid));
1691
1692         if (lli->lli_sa_pattern & LSA_PATTERN_ADVISE) {
1693                 ll_deauthorize_statahead_advise(dir, key);
1694                 return;
1695         }
1696
1697         LASSERT(lli->lli_stat_pid != 0);
1698         LASSERT(lli->lli_opendir_key == key);
1699         spin_lock(&lli->lli_sa_lock);
1700         lli->lli_opendir_key = NULL;
1701         lli->lli_stat_pid = 0;
1702         lli->lli_sa_enabled = 0;
1703         lli->lli_sa_pattern = LSA_PATTERN_NONE;
1704         lli->lli_sa_fname_index = 0;
1705         lli->lli_sa_match_count = 0;
1706         sai = lli->lli_sai;
1707         if (sai && sai->sai_task) {
1708                 /*
1709                  * statahead thread may not have quit yet because it needs to
1710                  * cache entries, now it's time to tell it to quit.
1711                  *
1712                  * wake_up_process() provides the necessary barriers
1713                  * to pair with set_current_state().
1714                  */
1715                 struct task_struct *task = sai->sai_task;
1716
1717                 /* matches smp_load_acquire() in ll_statahead_thread() */
1718                 smp_store_release(&sai->sai_task, NULL);
1719                 wake_up_process(task);
1720         }
1721         spin_unlock(&lli->lli_sa_lock);
1722 }
1723
1724 enum {
1725         /**
1726          * not first dirent, or is "."
1727          */
1728         LS_NOT_FIRST_DE = 0,
1729         /**
1730          * the first non-hidden dirent
1731          */
1732         LS_FIRST_DE,
1733         /**
1734          * the first dirent is hidden, i.e. a dot file other than "." and ".."
1735          */
1736         LS_FIRST_DOT_DE
1737 };
1738
1739 /* file is first dirent under @dir */
1740 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
1741 {
1742         struct qstr *target = &dentry->d_name;
1743         struct md_op_data *op_data;
1744         int dot_de;
1745         struct page *page = NULL;
1746         int rc = LS_NOT_FIRST_DE;
1747         __u64 pos = 0;
1748         struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
1749
1750         ENTRY;
1751
1752         op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
1753                                      LUSTRE_OPC_ANY, dir);
1754         if (IS_ERR(op_data))
1755                 RETURN(PTR_ERR(op_data));
1756
1757         if (IS_ENCRYPTED(dir)) {
1758                 int rc2 = llcrypt_fname_alloc_buffer(dir, NAME_MAX, &lltr);
1759
1760                 if (rc2 < 0)
1761                         GOTO(out, rc = rc2);
1762         }
1763
1764         /**
1765          * FIXME: choose the start offset of the readdir
1766          */
1767
1768         page = ll_get_dir_page(dir, op_data, 0, NULL);
1769
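             /*
              * Walk the directory pages in hash order; the first relevant
              * entry found (skipping "." and "..") decides whether @dentry is
              * the first dirent.
              */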
1770         while (1) {
1771                 struct lu_dirpage *dp;
1772                 struct lu_dirent  *ent;
1773
1774                 if (IS_ERR(page)) {
1775                         struct ll_inode_info *lli = ll_i2info(dir);
1776
1777                         rc = PTR_ERR(page);
1778                         CERROR("%s: reading dir "DFID" at %llu stat_pid = %u : rc = %d\n",
1779                                ll_i2sbi(dir)->ll_fsname,
1780                                PFID(ll_inode2fid(dir)), pos,
1781                                lli->lli_stat_pid, rc);
1782                         break;
1783                 }
1784
1785                 dp = page_address(page);
1786                 for (ent = lu_dirent_start(dp); ent != NULL;
1787                      ent = lu_dirent_next(ent)) {
1788                         __u64 hash;
1789                         int namelen;
1790                         char *name;
1791
1792                         hash = le64_to_cpu(ent->lde_hash);
1793                         /*
1794                          * ll_get_dir_page() can return any page containing
1795                          * the given hash, which may not be the start hash.
1796                          */
1797                         if (unlikely(hash < pos))
1798                                 continue;
1799
1800                         namelen = le16_to_cpu(ent->lde_namelen);
1801                         if (unlikely(namelen == 0))
1802                                 /*
1803                                  * skip dummy record.
1804                                  */
1805                                 continue;
1806
1807                         name = ent->lde_name;
1808                         if (name[0] == '.') {
1809                                 if (namelen == 1)
1810                                         /*
1811                                          * skip "."
1812                                          */
1813                                         continue;
1814                                 else if (name[1] == '.' && namelen == 2)
1815                                         /*
1816                                          * skip ".."
1817                                          */
1818                                         continue;
1819                                 else
1820                                         dot_de = 1;
1821                         } else {
1822                                 dot_de = 0;
1823                         }
1824
1825                         if (dot_de && target->name[0] != '.') {
1826                                 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
1827                                        target->len, target->name,
1828                                        namelen, name);
1829                                 continue;
1830                         }
1831
1832                         if (IS_ENCRYPTED(dir)) {
1833                                 struct llcrypt_str de_name =
1834                                         LLTR_INIT(ent->lde_name, namelen);
1835                                 struct lu_fid fid;
1836
1837                                 fid_le_to_cpu(&fid, &ent->lde_fid);
1838                                 if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
1839                                                          &lltr, &fid))
1840                                         continue;
1841                                 name = lltr.name;
1842                                 namelen = lltr.len;
1843                         }
1844
1845                         if (target->len != namelen ||
1846                             memcmp(target->name, name, namelen) != 0)
1847                                 rc = LS_NOT_FIRST_DE;
1848                         else if (!dot_de)
1849                                 rc = LS_FIRST_DE;
1850                         else
1851                                 rc = LS_FIRST_DOT_DE;
1852
1853                         ll_release_page(dir, page, false);
1854                         GOTO(out, rc);
1855                 }
1856                 pos = le64_to_cpu(dp->ldp_hash_end);
1857                 if (pos == MDS_DIR_END_OFF) {
1858                         /*
1859                          * End of directory reached.
1860                          */
1861                         ll_release_page(dir, page, false);
1862                         GOTO(out, rc);
1863                 } else {
1864                         /*
1865                          * chain is exhausted
1866                          * Normal case: continue to the next page.
1867                          */
1868                         ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) &
1869                                               LDF_COLLIDE);
1870                         page = ll_get_dir_page(dir, op_data, pos, NULL);
1871                 }
1872         }
1873         EXIT;
1874 out:
1875         llcrypt_fname_free_buffer(&lltr);
1876         ll_finish_md_op_data(op_data);
1877
1878         return rc;
1879 }
1880
1881 static struct ll_statahead_info *
1882 ll_find_sai_locked(struct ll_statahead_context *ctx, pid_t pid)
1883 {
1884         struct ll_statahead_info *sai;
1885
1886         list_for_each_entry(sai, &ctx->sax_sai_list, sai_item) {
1887                 if (sai->sai_pid == pid)
1888                         return sai;
1889         }
1890         return NULL;
1891 }
1892
1893 static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
1894                                   bool agl);
1895
1896 static int ll_shared_statahead_check(struct inode *dir, struct dentry *dentry,
1897                                      struct ll_statahead_context *ctx)
1898 {
1899         struct ll_inode_info *lli = ll_i2info(dir);
1900         struct ll_statahead_info *sai;
1901
1902         ENTRY;
1903
1904         spin_lock(&lli->lli_sa_lock);
1905         sai = lli->lli_sai;
1906         if (sai) {
1907                 if (sai->sai_pid == current->pid) {
1908                         spin_unlock(&lli->lli_sa_lock);
1909                         RETURN(0);
1910                 }
1911                 lli->lli_sai = NULL;
1912                 lli->lli_sa_pattern |= LSA_PATTERN_FN_SHARED;
1913         }
1914
1915         sai = ll_find_sai_locked(ctx, current->pid);
1916         if (sai) {
1917                 spin_unlock(&lli->lli_sa_lock);
1918                 RETURN(-EEXIST);
1919         }
1920
1921         lli->lli_sa_pattern |= LSA_PATTERN_FN_SHARED;
1922         spin_unlock(&lli->lli_sa_lock);
1923
1924         RETURN(start_statahead_thread(dir, dentry, true));
1925 }
1926
1927 /**
1928  * revalidate @dentryp from statahead cache
1929  *
1930  * \param[in] dir       parent directory
1931  * \param[in] ctx       statahead context structure
1932  * \param[out] dentryp  pointer to dentry which will be revalidated
1933  * \param[in] unplug    unplug statahead window only (normally for negative
1934  *                      dentry)
1935  * \retval              1 on success, dentry is saved in @dentryp
1936  * \retval              0 if revalidation failed (no proper lock on client)
1937  * \retval              negative number upon error
1938  */
1939 static int revalidate_statahead_dentry(struct inode *dir,
1940                                        struct ll_statahead_context *ctx,
1941                                        struct dentry **dentryp,
1942                                        bool unplug)
1943 {
1944         struct sa_entry *entry = NULL;
1945         struct ll_inode_info *lli = ll_i2info(dir);
1946         struct ll_statahead_info *sai = lli->lli_sai;
1947         struct ll_statahead_info *info = NULL;
1948         int rc = 0;
1949
1950         ENTRY;
1951
1952         if (sai && (*dentryp)->d_name.name[0] == '.') {
1953                 if (sai->sai_ls_all ||
1954                     sai->sai_miss_hidden >= sai->sai_skip_hidden) {
1955                         /*
1956                          * The hidden dentry is the first one, or the
1957                          * statahead thread has not skipped this many hidden
1958                          * dentries before "sai_ls_all" was enabled below.
1959                          */
1960                 } else {
1961                         if (!sai->sai_ls_all)
1962                                 /*
1963                                  * It may be because the hidden dentry is
1964                                  * not the first one, so "sai_ls_all" was
1965                                  * not set and "ls -al" missed it. Enable
1966                                  * "sai_ls_all" for such a case.
1967                                  */
1968                                 sai->sai_ls_all = 1;
1969
1970                         /*
1971                          * Such "getattr" has been skipped before
1972                          * "sai_ls_all" was enabled as above.
1973                          */
1974                         sai->sai_miss_hidden++;
1975                         RETURN(-EAGAIN);
1976                 }
1977         }
1978
1979         if (unplug)
1980                 GOTO(out, rc = 1);
1981
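             /*
              * Look this name up in the statahead cache; if sa_get() returns
              * ERR_PTR(-EINVAL), stop the statahead instance returned in
              * @info and fail the revalidation.
              */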
1982         entry = sa_get(ctx, &(*dentryp)->d_name, &info);
1983         if (entry == ERR_PTR(-EINVAL)) {
1984                 sai = info;
1985                 spin_lock(&lli->lli_sa_lock);
1986                 if (sai->sai_task) {
1987                         struct task_struct *task = sai->sai_task;
1988
1989                         /*
1990                          * matches smp_load_acquire() in
1991                          * ll_statahead_thread().
1992                          * Notify to stop statahead thread immediately.
1993                          */
1994                         smp_store_release(&sai->sai_task, NULL);
1995                         wake_up_process(task);
1996                 }
1997                 atomic_dec(&sai->sai_inuse_count);
1998                 spin_unlock(&lli->lli_sa_lock);
1999                 RETURN(-EINVAL);
2000         } else if (entry == NULL) {
2001                 if (lli->lli_sa_pattern & LSA_PATTERN_FNAME)
2002                         rc = ll_shared_statahead_check(dir, *dentryp, ctx);
2003                 GOTO(out, rc = rc == 0 ? -EAGAIN : rc);
2004         }
2005
2006         if (lli->lli_sa_pattern & LSA_PATTERN_LIST)
2007                 LASSERT(sai == entry->se_sai);
2008         else
2009                 /*
2010                  * FNAME and ADVISE patterns: use the sai owning this entry.
2011                  */
2012                 sai = entry->se_sai;
2013
2014         LASSERTF(sai != NULL, "pattern %#X entry %p se_sai %p %pd lli %p\n",
2015                  lli->lli_sa_pattern, entry, entry->se_sai, *dentryp, lli);
2016         if (!sa_ready(entry)) {
2017                 spin_lock(&lli->lli_sa_lock);
2018                 sai->sai_index_wait = entry->se_index;
2019                 spin_unlock(&lli->lli_sa_lock);
2020                 rc = wait_event_idle_timeout(sai->sai_waitq, sa_ready(entry),
2021                                              cfs_time_seconds(30));
2022                 if (rc == 0) {
2023                         /*
2024                          * the entry is still not ready and may be in use by
2025                          * an inflight statahead RPC; don't free it.
2026                          */
2027                         entry = NULL;
2028                         GOTO(out, rc = -EAGAIN);
2029                 }
2030         }
2031
2032         /*
2033          * We need to see the value that was set immediately before we
2034          * were woken up.
2035          */
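             /*
              * The async stat succeeded: revalidate the prefetched lock,
              * splice the statahead-prepared inode into a negative dentry,
              * and mark the dentry valid when the LOOKUP lock bit is granted.
              */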
2036         if (smp_load_acquire(&entry->se_state) == SA_ENTRY_SUCC &&
2037             entry->se_inode) {
2038                 struct inode *inode = entry->se_inode;
2039                 struct lookup_intent it = { .it_op = IT_GETATTR,
2040                                             .it_lock_handle =
2041                                                 entry->se_handle };
2042                 __u64 bits;
2043
2044                 rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
2045                                         ll_inode2fid(inode), &bits);
2046                 if (rc == 1) {
2047                         if (!(*dentryp)->d_inode) {
2048                                 struct dentry *alias;
2049
2050                                 alias = ll_splice_alias(inode, *dentryp);
2051                                 if (IS_ERR(alias)) {
2052                                         ll_intent_release(&it);
2053                                         GOTO(out, rc = PTR_ERR(alias));
2054                                 }
2055                                 *dentryp = alias;
2056                                 /*
2057                                  * statahead prepared this inode, transfer inode
2058                                  * refcount from sa_entry to dentry
2059                                  */
2060                                 entry->se_inode = NULL;
2061                         } else if ((*dentryp)->d_inode != inode) {
2062                                 /* revalidate, but inode is recreated */
2063                                 CDEBUG(D_READA,
2064                                        "%s: stale dentry %pd inode " DFID", statahead inode "DFID "\n",
2065                                        ll_i2sbi(inode)->ll_fsname, *dentryp,
2066                                        PFID(ll_inode2fid((*dentryp)->d_inode)),
2067                                        PFID(ll_inode2fid(inode)));
2068                                 ll_intent_release(&it);
2069                                 GOTO(out, rc = -ESTALE);
2070                         }
2071
2072                         if (bits & MDS_INODELOCK_LOOKUP) {
2073                                 d_lustre_revalidate(*dentryp);
2074                                 if (S_ISDIR(inode->i_mode))
2075                                         ll_update_dir_depth_dmv(dir, *dentryp);
2076                         }
2077
2078                         ll_intent_release(&it);
2079                 }
2080         }
2081 out:
2082         /*
2083          * statahead cached sa_entry can be used only once, and will be killed
2084          * right after use, so if lookup/revalidate accessed statahead cache,
2085          * set dentry ldd_sa_generation to parent lli_sa_generation, later if we
2086          * stat this file again, we know we've done statahead before, see
2087          * dentry_may_statahead().
2088          */
2089         if (lld_is_init(*dentryp))
2090                 ll_d2d(*dentryp)->lld_sa_generation = lli->lli_sa_generation;
2091         sa_put(dir, sai, entry);
2092
2093         RETURN(rc);
2094 }
2095
2096 static inline bool
2097 sa_pattern_list_detect(struct inode *dir, struct dentry *dchild, int *first)
2098 {
2099         struct ll_inode_info *lli = ll_i2info(dir);
2100
2101         if (lli->lli_stat_pid == 0)
2102                 return false;
2103
2104         /* Directory listing needs to call opendir()/readdir()/stat(). */
2105         if (!(lli->lli_sa_pattern & LSA_PATTERN_OPENDIR))
2106                 return false;
2107
2108         if (lli->lli_sa_enabled == 0)
2109                 return false;
2110
2111         if (lli->lli_sa_pattern & LSA_PATTERN_LS_NOT_FIRST_DE)
2112                 return false;
2113
2114         *first = is_first_dirent(dir, dchild);
2115         if (*first == LS_NOT_FIRST_DE) {
2116                 /*
2117                  * This is not an "ls -{a}l" operation, so statahead is not
2118                  * needed. Disable it so that subsequent stat() calls won't
2119                  * waste time trying it.
2120                  */
2121                 spin_lock(&lli->lli_sa_lock);
2122                 if (lli->lli_stat_pid == current->pid) {
2123                         lli->lli_sa_enabled = 0;
2124                         lli->lli_sa_pattern |= LSA_PATTERN_LS_NOT_FIRST_DE;
2125                 }
2126                 spin_unlock(&lli->lli_sa_lock);
2127                 return false;
2128         }
2129
2130         spin_lock(&lli->lli_sa_lock);
2131         lli->lli_sa_pattern |= LSA_PATTERN_LIST;
2132         spin_unlock(&lli->lli_sa_lock);
2133         return true;
2134 }
2135
2136 static inline bool
2137 sa_pattern_fname_detect(struct inode *dir, struct dentry *dchild)
2138 {
2139         struct ll_inode_info *lli = ll_i2info(dir);
2140         struct qstr *dname = &dchild->d_name;
2141         const unsigned char *name = dname->name;
2142         bool rc = false;
2143         int i;
2144
2145         if (ll_i2sbi(dir)->ll_enable_statahead_fname == 0)
2146                 return false;
2147         if (lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED)
2148                 return true;
2149
2150         /*
2151          * Parse the format of the file name to determine whether it matches
2152          * the supported file name pattern for statahead (i.e. mdtest.$rank.$i).
2153          */
2154         i = dname->len - 1;
2155         if (isdigit(name[i])) {
2156                 long num;
2157                 int ret;
2158
2159                 if (lli->lli_stat_pid == 0) {
2160                         lli->lli_stat_pid = current->pid;
2161                 } else if (lli->lli_stat_pid != current->pid) {
2162                         /*
2163                          * Two or more processes (MPI ranks) are doing stat()
2164                          * calls under this directory; treat it as an mdtest
2165                          * shared dir stat() workload.
2166                          */
2167                         spin_lock(&lli->lli_sa_lock);
2168                         lli->lli_stat_pid = current->pid;
2169                         if (lli->lli_sa_pattern & LSA_PATTERN_FNAME) {
2170                                 lli->lli_sai = NULL;
2171                                 rc = false;
2172                         } else {
2173                                 lli->lli_sa_pattern |= LSA_PATTERN_FNAME;
2174                                 rc = true;
2175                         }
2176                         lli->lli_sa_pattern |= LSA_PATTERN_FN_SHARED;
2177                         spin_unlock(&lli->lli_sa_lock);
2178                         return rc;
2179                 }
2180
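                     /*
                      * Find the start of the trailing digit run and parse it
                      * as the file name index.
                      */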
2181                 while (--i >= 0 && isdigit(name[i]))
2182                         ; /* do nothing */
2183                 i++;
2184                 ret = kstrtol(&name[i], 0, &num);
2185                 if (ret)
2186                         GOTO(out, rc);
2187
2188                 /*
2189                  * The traversing program does multiple stat() calls on the
2190                  * same child entry, e.g. "ls $dir*".
2191                  */
2192                 if (lli->lli_sa_fname_index == num)
2193                         return false;
2194
2195                 if (lli->lli_sa_match_count == 0 ||
2196                     num == lli->lli_sa_fname_index + 1) {
2197                         lli->lli_sa_match_count++;
2198                         lli->lli_sa_fname_index = num;
2199
2200                         if (lli->lli_sa_match_count > LSA_FN_MATCH_HIT)
2201                                 GOTO(out, rc = true);
2202
2203                         return false;
2204                 }
2205         }
2206 out:
2207         spin_lock(&lli->lli_sa_lock);
2208         if (rc) {
2209                 lli->lli_sa_pattern |= LSA_PATTERN_FNAME;
2210         } else {
2211                 lli->lli_sa_pattern = LSA_PATTERN_NONE;
2212                 lli->lli_sa_match_count = 0;
2213                 lli->lli_sa_fname_index = 0;
2214                 lli->lli_sa_enabled = 0;
2215         }
2216         spin_unlock(&lli->lli_sa_lock);
2217
2218         return rc;
2219 }
2220
2221 /* detect the statahead pattern. */
2222 static inline bool
2223 sa_pattern_detect(struct inode *dir, struct dentry *dchild, int *first)
2224 {
2225         return sa_pattern_list_detect(dir, dchild, first) ||
2226                sa_pattern_fname_detect(dir, dchild);
2227 }
2228
2229 static inline int ll_sax_add_sai(struct ll_statahead_context *ctx,
2230                                  struct ll_statahead_info *sai)
2231 {
2232         if (ll_find_sai_locked(ctx, sai->sai_pid) != NULL)
2233                 return -EEXIST;
2234
2235         list_add_tail(&sai->sai_item, &ctx->sax_sai_list);
2236         return 0;
2237 }
2238
2239 /**
2240  * start statahead thread
2241  *
2242  * \param[in] dir       parent directory
2243  * \param[in] dentry    dentry that triggers statahead, normally the first
2244  *                      dirent under @dir
2245  * \param[in] agl       indicate whether AGL is needed
2246  * \retval              -EAGAIN on success, because when this function is
2247  *                      called we are already in a lookup call, so the client
2248  *                      should do the stat itself instead of waiting for the
2249  *                      statahead thread to do it asynchronously.
2250  * \retval              negative number upon error
2251  */
2252 static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
2253                                   bool agl)
2254 {
2255         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
2256         struct ll_inode_info *lli = ll_i2info(dir);
2257         struct ll_statahead_info *sai = NULL;
2258         struct ll_statahead_context *ctx = NULL;
2259         struct dentry *parent;
2260         struct task_struct *task;
2261         struct ll_sb_info *sbi;
2262         int first = LS_FIRST_DE;
2263         int rc = 0;
2264
2265         ENTRY;
2266
2267         if (sa_pattern_detect(dir, dentry, &first) == false)
2268                 RETURN(0);
2269
2270         parent = dget_parent(dentry);
2271         sbi = ll_i2sbi(d_inode(parent));
2272         if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
2273                                        sbi->ll_sa_running_max)) {
2274                 CDEBUG(D_READA,
2275                        "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n");
2276                 dput(parent);
2277                 GOTO(out, rc = -EMFILE);
2278         }
2279
2280         /* on success ll_sai_alloc holds a ref on parent */
2281         sai = ll_sai_alloc(parent);
2282         dput(parent);
2283         if (!sai)
2284                 GOTO(out, rc = -ENOMEM);
2285
2286         sai->sai_ls_all = (first == LS_FIRST_DOT_DE);
2287         sai->sai_pid = current->pid;
2288
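             /*
              * For the FNAME pattern, record the non-numeric prefix of the
              * triggering name and its numeric index; the statahead thread
              * will continue generating names from there.
              */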
2289         if (lli->lli_sa_pattern & LSA_PATTERN_FNAME) {
2290                 struct qstr *dname = &dentry->d_name;
2291                 const unsigned char *name = dname->name;
2292                 long num;
2293                 int i;
2294
2295                 if (dname->len >= sizeof(sai->sai_fname))
2296                         GOTO(out, rc = -ERANGE);
2297
2298                 i = dname->len;
2299                 while (--i >= 0 && isdigit(name[i]))
2300                         ; /* do nothing */
2301                 i++;
2302                 rc = kstrtol(&name[i], 0, &num);
2303                 if (rc)
2304                         GOTO(out, rc);
2305
2306                 memcpy(sai->sai_fname, dname->name, i);
2307                 sai->sai_fname[i] = '\0';
2308                 sai->sai_fname_index = num;
2309                 /* The front part of the file name is zeroed padding. */
2310                 if (name[i] == '0')
2311                         sai->sai_fname_zeroed_len = dname->len - i;
2312         }
2313
2314         /* Workloads like directory listing or mdtest unique dir stat() */
2315         if (lli->lli_sa_pattern & LSA_PATTERN_LIST ||
2316             (lli->lli_sa_pattern & (LSA_PATTERN_FN_SHARED |
2317                                     LSA_PATTERN_FNAME)) == LSA_PATTERN_FNAME) {
2318                 ctx = ll_sax_alloc(dir);
2319                 if (!ctx)
2320                         GOTO(out, rc = -ENOMEM);
2321
2322                 /*
2323                  * if current lli_opendir_key was deauthorized, or dir
2324                  * re-opened by another process, don't start statahead,
2325                  * otherwise the newly spawned statahead thread won't be
2326                  * notified to quit.
2327                  */
2328                 spin_lock(&lli->lli_sa_lock);
2329                 if (unlikely(lli->lli_sai || lli->lli_sax ||
2330                              ((lli->lli_sa_pattern & LSA_PATTERN_LIST) &&
2331                               !lli->lli_opendir_key &&
2332                               lli->lli_stat_pid != current->pid))) {
2333                         spin_unlock(&lli->lli_sa_lock);
2334                         GOTO(out, rc = -EPERM);
2335                 }
2336                 rc = ll_sax_add_sai(ctx, sai);
2337                 if (rc) {
2338                         spin_unlock(&lli->lli_sa_lock);
2339                         GOTO(out, rc);
2340                 }
2341                 lli->lli_sai = sai;
2342                 lli->lli_sax = ctx;
2343                 spin_unlock(&lli->lli_sa_lock);
2344         } else if (lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED) {
2345                 /* For mdtest shared dir stat() workload */
2346                 LASSERT(lli->lli_sa_pattern & LSA_PATTERN_FNAME);
2347                 ctx = ll_sax_get(dir);
2348                 if (ctx == NULL) {
2349                         ctx = ll_sax_alloc(dir);
2350                         if (ctx == NULL)
2351                                 GOTO(out, rc = -ENOMEM);
2352
2353                         spin_lock(&lli->lli_sa_lock);
2354                         if (lli->lli_sax) {
2355                                 struct ll_statahead_context *tmp = ctx;
2356
2357                                 if (lli->lli_sa_pattern &
2358                                     LSA_PATTERN_FN_SHARED) {
2359                                         ctx = lli->lli_sax;
2360                                         __ll_sax_get(ctx);
2361                                         rc = ll_sax_add_sai(ctx, sai);
2362                                 } else {
2363                                         CWARN("%s: invalid pattern %#X.\n",
2364                                               sbi->ll_fsname,
2365                                               lli->lli_sa_pattern);
2366                                         rc = -EINVAL;
2367                                 }
2368
2369                                 spin_unlock(&lli->lli_sa_lock);
2370                                 ll_sax_free(tmp);
2371                                 if (rc)
2372                                         GOTO(out, rc);
2373                         } else {
2374                                 lli->lli_sax = ctx;
2375                                 rc = ll_sax_add_sai(ctx, sai);
2376                                 spin_unlock(&lli->lli_sa_lock);
2377                         }
2378                 } else {
2379                         spin_lock(&lli->lli_sa_lock);
2380                         if (!(lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED)) {
2381                                 spin_unlock(&lli->lli_sa_lock);
2382                                 GOTO(out, rc = -EINVAL);
2383                         }
2384
2385                         rc = ll_sax_add_sai(ctx, sai);
2386                         spin_unlock(&lli->lli_sa_lock);
2387                 }
2388
2389                 if (rc)
2390                         GOTO(out, rc);
2391         } else {
2392                 CERROR("%s: unsupported statahead pattern %#X.\n",
2393                        sbi->ll_fsname, lli->lli_sa_pattern);
2394                 GOTO(out, rc = -EOPNOTSUPP);
2395         }
2396
2397         CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n",
2398                current->pid, parent);
2399
2400         task = kthread_create_on_node(ll_statahead_thread, sai, node,
2401                                       "ll_sa_%u", lli->lli_stat_pid);
2402         if (IS_ERR(task)) {
2403                 spin_lock(&lli->lli_sa_lock);
2404                 lli->lli_sai = NULL;
2405                 spin_unlock(&lli->lli_sa_lock);
2406                 rc = PTR_ERR(task);
2407                 CERROR("can't start ll_sa thread, rc: %d\n", rc);
2408                 GOTO(out, rc);
2409         }
2410
2411         if (test_bit(LL_SBI_AGL_ENABLED, sbi->ll_flags) && agl)
2412                 ll_start_agl(parent, sai);
2413
2414         atomic_inc(&sbi->ll_sa_total);
2415         if (lli->lli_sa_pattern & LSA_PATTERN_LIST)
2416                 atomic_inc(&sbi->ll_sa_list_total);
2417         else if (lli->lli_sa_pattern & LSA_PATTERN_FNAME)
2418                 atomic_inc(&sbi->ll_sa_fname_total);
2419
2420         sai->sai_task = task;
2421         wake_up_process(task);
2422         /*
2423          * We don't stat-ahead for the first dirent since we are already in
2424          * lookup.
2425          */
2426         RETURN(-EAGAIN);
2427
2428 out:
2429         /*
2430          * if starting the statahead thread failed, disable statahead so
2431          * that subsequent stat() calls won't waste time trying it.
2432          */
2433         spin_lock(&lli->lli_sa_lock);
2434         if (lli->lli_stat_pid == current->pid)
2435                 lli->lli_sa_enabled = 0;
2436         spin_unlock(&lli->lli_sa_lock);
2437
2438         if (sai)
2439                 ll_sai_put(sai);
2440
2441         if (ctx)
2442                 ll_sax_put(dir, ctx);
2443
2444         if (rc)
2445                 atomic_dec(&sbi->ll_sa_running);
2446
2447         RETURN(rc);
2448 }
2449
2450 /*
2451  * Check whether statahead for @dir was started.
2452  */
2453 static inline bool ll_statahead_started(struct inode *dir, bool agl)
2454 {
2455         struct ll_inode_info *lli = ll_i2info(dir);
2456         struct ll_statahead_context *ctx;
2457         struct ll_statahead_info *sai;
2458
2459         spin_lock(&lli->lli_sa_lock);
2460         ctx = lli->lli_sax;
2461         sai = lli->lli_sai;
2462         if (sai && (sai->sai_agl_task != NULL) != agl)
2463                 CDEBUG(D_READA,
2464                        "%s: Statahead AGL hint changed from %d to %d\n",
2465                        ll_i2sbi(dir)->ll_fsname,
2466                        sai->sai_agl_task != NULL, agl);
2467         spin_unlock(&lli->lli_sa_lock);
2468
2469         return !!ctx;
2470 }
2471
2472 /**
2473  * statahead entry function, called when the client does getattr on a file.
2474  * It starts a statahead thread for @dir if one has not been started yet.
2475  *
2476  * \param[in] dir       parent directory
2477  * \param[in] dentry    dentry that triggers statahead, normally the first
2478  *                      dirent under @dir
2479  * \param[in] agl       whether to start the agl thread
2480  *
2481  * \retval              0 statahead is already running or was not started,
2482  *                      caller should getattr from the server itself
2483  * \retval              -EAGAIN a statahead thread was started, caller should
2484  *                      still do this getattr itself
2485  * \retval              negative number upon error
2486  */
2487 int ll_start_statahead(struct inode *dir, struct dentry *dentry, bool agl)
2488 {
2489         if (!ll_statahead_started(dir, agl))
2490                 return start_statahead_thread(dir, dentry, agl);
2491         return 0;
2492 }
2493
2494 /**
2495  * revalidate dentry from statahead cache.
2496  *
2497  * \param[in]  dir      parent directory
2498  * \param[out] dentryp  dentry to getattr
2499  * \param[in]  unplug   unplug statahead window only (normally for negative
2500  *                      dentry)
2501  * \retval              1 on success
2502  * \retval              0 revalidation from statahead cache failed, caller needs
2503  *                      to getattr from server directly
2504  * \retval              negative number on error, caller often ignores this and
2505  *                      then getattr from server
2506  */
2507 int ll_revalidate_statahead(struct inode *dir, struct dentry **dentryp,
2508                             bool unplug)
2509 {
2510         struct ll_inode_info *lli = ll_i2info(dir);
2511         struct ll_statahead_context *ctx;
2512         struct ll_statahead_info *sai = NULL;
2513         int rc = 0;
2514
2515         spin_lock(&lli->lli_sa_lock);
2516         ctx = lli->lli_sax;
2517         if (ctx) {
2518                 sai = lli->lli_sai;
2519                 if (sai) {
2520                         atomic_inc(&sai->sai_refcount);
2521                 } else if (lli->lli_sa_pattern & LSA_PATTERN_LIST) {
2522                         spin_unlock(&lli->lli_sa_lock);
2523                         return 0;
2524                 }
2525                 __ll_sax_get(ctx);
2526         }
2527         spin_unlock(&lli->lli_sa_lock);
2528         if (ctx) {
2529                 rc = revalidate_statahead_dentry(dir, ctx, dentryp, unplug);
2530                 CDEBUG(D_READA, "revalidate statahead %pd: rc = %d.\n",
2531                        *dentryp, rc);
2532                 if (sai)
2533                         ll_sai_put(sai);
2534                 ll_sax_put(dir, ctx);
2535         }
2536         return rc;
2537 }
2538
2539 int ll_ioctl_ahead(struct file *file, struct llapi_lu_ladvise2 *ladvise)
2540 {
2541         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
2542         struct ll_file_data *fd = file->private_data;
2543         struct dentry *dentry = file_dentry(file);
2544         struct inode *dir = dentry->d_inode;
2545         struct ll_inode_info *lli = ll_i2info(dir);
2546         struct ll_sb_info *sbi = ll_i2sbi(dir);
2547         struct ll_statahead_info *sai = NULL;
2548         struct ll_statahead_context *ctx = NULL;
2549         struct task_struct *task;
2550         bool agl = true;
2551         int rc;
2552
2553         ENTRY;
2554
2555         if (sbi->ll_sa_max == 0)
2556                 RETURN(0);
2557
2558         if (!S_ISDIR(dir->i_mode))
2559                 RETURN(-EINVAL);
2560
2561         if (fd->fd_sai) {
2562                 rc = -EALREADY;
2563                 CWARN("%s: already set statahead hint for dir %pd: rc = %d\n",
2564                       sbi->ll_fsname, dentry, rc);
2565                 RETURN(rc);
2566         }
2567
2568         if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
2569                                        sbi->ll_sa_running_max)) {
2570                 CDEBUG(D_READA,
2571                        "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n");
2572                 GOTO(out, rc = -EMFILE);
2573         }
2574
2575         sai = ll_sai_alloc(dentry);
2576         if (sai == NULL)
2577                 GOTO(out, rc = -ENOMEM);
2578
2579         sai->sai_fstart = ladvise->lla_start;
2580         sai->sai_fend = ladvise->lla_end;
2581         sai->sai_ls_all = 0;
2582         sai->sai_max = sbi->ll_sa_max;
2583         strncpy(sai->sai_fname, ladvise->lla_fname, sizeof(sai->sai_fname));
2584         sai->sai_pid = current->pid;
2585
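             /*
              * Attach a statahead context to the directory (allocating one if
              * none exists yet) and record this sai in the file handle so
              * that it can be deauthorized when the file is closed.
              */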
2586         ctx = ll_sax_get(dir);
2587         if (ctx == NULL) {
2588                 ctx = ll_sax_alloc(dir);
2589                 if (ctx == NULL)
2590                         GOTO(out, rc = -ENOMEM);
2591
2592                 spin_lock(&lli->lli_sa_lock);
2593                 if (unlikely(lli->lli_sax)) {
2594                         struct ll_statahead_context *tmp = ctx;
2595
2596                         if (lli->lli_sa_pattern == LSA_PATTERN_NONE ||
2597                             lli->lli_sa_pattern == LSA_PATTERN_ADVISE) {
2598                                 lli->lli_sa_pattern = LSA_PATTERN_ADVISE;
2599                                 ctx = lli->lli_sax;
2600                                 __ll_sax_get(ctx);
2601                                 fd->fd_sai = __ll_sai_get(sai);
2602                                 rc = 0;
2603                         } else {
2604                                 rc = -EINVAL;
2605                                 CWARN("%s: pattern %X is not ADVISE: rc = %d\n",
2606                                       sbi->ll_fsname, lli->lli_sa_pattern, rc);
2607                         }
2608
2609                         spin_unlock(&lli->lli_sa_lock);
2610                         ll_sax_free(tmp);
2611                         if (rc)
2612                                 GOTO(out, rc);
2613                 } else {
2614                         lli->lli_sa_pattern = LSA_PATTERN_ADVISE;
2615                         lli->lli_sax = ctx;
2616                         fd->fd_sai = __ll_sai_get(sai);
2617                         spin_unlock(&lli->lli_sa_lock);
2618                 }
2619         } else {
2620                 spin_lock(&lli->lli_sa_lock);
2621                 if (!(lli->lli_sa_pattern == LSA_PATTERN_ADVISE ||
2622                       lli->lli_sa_pattern == LSA_PATTERN_NONE)) {
2623                         spin_unlock(&lli->lli_sa_lock);
2624                         GOTO(out, rc = -EINVAL);
2625                 }
2626
2627                 lli->lli_sa_pattern = LSA_PATTERN_ADVISE;
2628                 fd->fd_sai = __ll_sai_get(sai);
2629                 spin_unlock(&lli->lli_sa_lock);
2630         }
2631
2632         __ll_sax_get(ctx);
2633         CDEBUG(D_READA,
2634                "start statahead thread: [pid %d] [parent %pd] sai %p ctx %p\n",
2635                current->pid, dentry, sai, ctx);
2636
2637         task = kthread_create_on_node(ll_statahead_thread, sai, node,
2638                                       "ll_sa_%u", current->pid);
2639         if (IS_ERR(task)) {
2640                 rc = PTR_ERR(task);
2641                 CERROR("%s: cannot start ll_sa thread: rc = %d\n",
2642                        sbi->ll_fsname, rc);
2643                 GOTO(out, rc);
2644         }
2645
2646         if (test_bit(LL_SBI_AGL_ENABLED, sbi->ll_flags) && agl)
2647                 ll_start_agl(dentry, sai);
2648
2649         atomic_inc(&sbi->ll_sa_total);
2650         sai->sai_task = task;
2651         wake_up_process(task);
2652
2653         RETURN(0);
2654 out:
2655         if (fd->fd_sai) {
2656                 ll_sai_put(sai);
2657                 ll_sax_put(dir, ctx);
2658                 fd->fd_sai = NULL;
2659         }
2660
2661         if (sai)
2662                 ll_sai_put(sai);
2663
2664         if (ctx)
2665                 ll_sax_put(dir, ctx);
2666
2667         atomic_dec(&sbi->ll_sa_running);
2668         RETURN(rc);
2669 }
2670
2671 /*
2672  * This function is called on each stat() system call to do a statahead
2673  * check. When the names of the files stat()ed under a directory roughly
2674  * follow a common naming rule, the directory is considered a candidate
2675  * for statahead.
2676  * For example, with the naming rule mdtest.$rank.$i the stat()ed names
2677  * end in a number; once more than @LSA_FN_PREDICT_HIT stat() calls on
2678  * names ending with a number are seen, the corresponding directory meets
2679  * the requirement for statahead.
2680  */
2681 void ll_statahead_enter(struct inode *dir, struct dentry *dchild)
2682 {
2683         struct ll_inode_info *lli;
2684         struct qstr *dname = &dchild->d_name;
2685
2686         if (ll_i2sbi(dir)->ll_sa_max == 0)
2687                 return;
2688
2689         if (ll_i2sbi(dir)->ll_enable_statahead_fname == 0)
2690                 return;
2691
2692         lli = ll_i2info(dir);
2693         if (lli->lli_sa_enabled)
2694                 return;
2695
2696         if (lli->lli_sa_pattern & (LSA_PATTERN_FN_PREDICT | LSA_PATTERN_LIST))
2697                 return;
2698
2699         /*
2700          * Only the numerically indexed file name pattern is supported now.
2701          * Quickly check whether the last character is a digit.
2702          */
2703         if (!isdigit(dname->name[dname->len - 1])) {
2704                 lli->lli_sa_pattern &= ~LSA_PATTERN_FN_PREDICT;
2705                 lli->lli_sa_match_count = 0;
2706                 return;
2707         }
2708
2709         lli->lli_sa_match_count++;
2710         if (lli->lli_sa_match_count > LSA_FN_PREDICT_HIT) {
2711                 spin_lock(&lli->lli_sa_lock);
2712                 lli->lli_sa_pattern |= LSA_PATTERN_FN_PREDICT;
2713                 spin_unlock(&lli->lli_sa_lock);
2714                 lli->lli_sa_enabled = 1;
2715                 lli->lli_sa_match_count = 0;
2716         }
2717 }