Whamcloud - gitweb
LU-14361 statahead: add support for mdtest shared dir workload
[fs/lustre-release.git] / lustre / llite / statahead.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  */
31
32 #include <linux/fs.h>
33 #include <linux/sched.h>
34 #include <linux/kthread.h>
35 #include <linux/mm.h>
36 #include <linux/highmem.h>
37 #include <linux/pagemap.h>
38 #include <linux/delay.h>
39
40 #define DEBUG_SUBSYSTEM S_LLITE
41
42 #include <obd_support.h>
43 #include <lustre_dlm.h>
44 #include "llite_internal.h"
45
/*
 * Slack, in entries, added on top of the statahead window by
 * is_omitted_entry() before an entry is considered too old.
 */
#define SA_OMITTED_ENTRY_MAX 8ULL
47
/*
 * State of a statahead entry. An entry starts in SA_ENTRY_INIT and is moved
 * to one of the other two states by __sa_make_ready() once the outcome of its
 * async stat is known.
 */
typedef enum {
        /* freshly allocated, outcome of the async stat not known yet */
        SA_ENTRY_INIT = 0,
        /* async stat succeeded */
        SA_ENTRY_SUCC,
        /* async stat did not produce a usable result */
        SA_ENTRY_INVA,
} se_state_t;
54
/*
 * sa_entry is not refcounted: the statahead thread allocates it and issues
 * the async stat, and the async stat callback ll_statahead_interpret()
 * prepares the inode and sets the lock data in the ptlrpcd context. The
 * scanner process is then woken up if this entry is the one it is waiting
 * for, and can access and free it.
 *
 * The entry name is stored inline, directly behind this structure (see
 * sa_alloc()), and se_qstr.name points at that storage.
 */
struct sa_entry {
        /* link into sai_entries */
        struct list_head                 se_list;
        /* link into sai hash table locally */
        struct list_head                 se_hash;
        /* entry index in the sai */
        __u64                            se_index;
        /* low layer ldlm lock handle */
        __u64                            se_handle;
        /* entry status */
        se_state_t                       se_state;
        /* size of the whole allocation: this structure plus the name buffer */
        int                              se_size;
        /* pointer to the target inode */
        struct inode                    *se_inode;
        /* pointer to @sai per process struct */
        struct ll_statahead_info        *se_sai;
        /* entry name */
        struct qstr                      se_qstr;
        /* entry fid */
        struct lu_fid                    se_fid;
};
83
/*
 * File-wide counter from which ll_sai_alloc() draws the value it stores in
 * lli_sa_generation; it is only advanced with sai_generation_lock held.
 */
static unsigned int sai_generation;
static DEFINE_SPINLOCK(sai_generation_lock);
86
87 static inline int sa_unhashed(struct sa_entry *entry)
88 {
89         return list_empty(&entry->se_hash);
90 }
91
92 /* sa_entry is ready to use */
93 static inline int sa_ready(struct sa_entry *entry)
94 {
95         /* Make sure sa_entry is updated and ready to use */
96         smp_rmb();
97         return (entry->se_state != SA_ENTRY_INIT);
98 }
99
100 /* hash value to put in sai_cache */
101 static inline int sa_hash(int val)
102 {
103         return val & LL_SA_CACHE_MASK;
104 }
105
106 /* hash entry into sax_cache */
107 static inline void
108 sa_rehash(struct ll_statahead_context *ctx, struct sa_entry *entry)
109 {
110         int i = sa_hash(entry->se_qstr.hash);
111
112         spin_lock(&ctx->sax_cache_lock[i]);
113         list_add_tail(&entry->se_hash, &ctx->sax_cache[i]);
114         spin_unlock(&ctx->sax_cache_lock[i]);
115 }
116
117 /* unhash entry from sai_cache */
118 static inline void
119 sa_unhash(struct ll_statahead_context *ctx, struct sa_entry *entry)
120 {
121         int i = sa_hash(entry->se_qstr.hash);
122
123         spin_lock(&ctx->sax_cache_lock[i]);
124         list_del_init(&entry->se_hash);
125         spin_unlock(&ctx->sax_cache_lock[i]);
126 }
127
128 static inline int agl_should_run(struct ll_statahead_info *sai,
129                                  struct inode *inode)
130 {
131         return inode && S_ISREG(inode->i_mode) && sai->sai_agl_task;
132 }
133
134 static inline struct ll_inode_info *
135 agl_first_entry(struct ll_statahead_info *sai)
136 {
137         return list_first_entry(&sai->sai_agls, struct ll_inode_info,
138                                 lli_agl_list);
139 }
140
141 /* statahead window is full */
142 static inline int sa_sent_full(struct ll_statahead_info *sai)
143 {
144         return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
145 }
146
147 /* Batch metadata handle */
148 static inline bool sa_has_batch_handle(struct ll_statahead_info *sai)
149 {
150         return sai->sai_bh != NULL;
151 }
152
153 static inline void ll_statahead_flush_nowait(struct ll_statahead_info *sai)
154 {
155         if (sa_has_batch_handle(sai)) {
156                 sai->sai_index_end = sai->sai_index - 1;
157                 (void) md_batch_flush(ll_i2mdexp(sai->sai_dentry->d_inode),
158                                       sai->sai_bh, false);
159         }
160 }
161
162 static inline int agl_list_empty(struct ll_statahead_info *sai)
163 {
164         return list_empty(&sai->sai_agls);
165 }
166
167 /**
168  * (1) hit ratio less than 80%
169  * or
170  * (2) consecutive miss more than 8
171  * then means low hit.
172  */
173 static inline int sa_low_hit(struct ll_statahead_info *sai)
174 {
175         return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) ||
176                 (sai->sai_consecutive_miss > 8));
177 }
178
179 /*
180  * if the given index is behind of statahead window more than
181  * SA_OMITTED_ENTRY_MAX, then it is old.
182  */
183 static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
184 {
185         return ((__u64)sai->sai_max + index + SA_OMITTED_ENTRY_MAX <
186                 sai->sai_index);
187 }
188
189 /* allocate sa_entry and hash it to allow scanner process to find it */
190 static struct sa_entry *
191 sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
192          const char *name, int len, const struct lu_fid *fid)
193 {
194         struct ll_inode_info *lli;
195         struct sa_entry *entry;
196         int entry_size;
197         char *dname;
198
199         ENTRY;
200
201         entry_size = sizeof(struct sa_entry) +
202                      round_up(len + 1 /* for trailing NUL */, 4);
203         OBD_ALLOC(entry, entry_size);
204         if (unlikely(!entry))
205                 RETURN(ERR_PTR(-ENOMEM));
206
207         CDEBUG(D_READA, "alloc sa entry %.*s(%p) index %llu\n",
208                len, name, entry, index);
209
210         entry->se_index = index;
211         entry->se_sai = sai;
212
213         entry->se_state = SA_ENTRY_INIT;
214         entry->se_size = entry_size;
215         dname = (char *)entry + sizeof(struct sa_entry);
216         memcpy(dname, name, len);
217         dname[len] = 0;
218         entry->se_qstr.hash = ll_full_name_hash(parent, name, len);
219         entry->se_qstr.len = len;
220         entry->se_qstr.name = dname;
221
222         if (fid)
223                 entry->se_fid = *fid;
224
225         lli = ll_i2info(sai->sai_dentry->d_inode);
226         spin_lock(&lli->lli_sa_lock);
227         INIT_LIST_HEAD(&entry->se_list);
228         sa_rehash(lli->lli_sax, entry);
229         spin_unlock(&lli->lli_sa_lock);
230
231         atomic_inc(&sai->sai_cache_count);
232
233         RETURN(entry);
234 }
235
236 /* free sa_entry, which should have been unhashed and not in any list */
237 static void sa_free(struct ll_statahead_context *ctx, struct sa_entry *entry)
238 {
239         CDEBUG(D_READA, "free sa entry %.*s(%p) index %llu\n",
240                entry->se_qstr.len, entry->se_qstr.name, entry,
241                entry->se_index);
242
243         LASSERT(list_empty(&entry->se_list));
244         LASSERT(sa_unhashed(entry));
245
246         OBD_FREE(entry, entry->se_size);
247 }
248
249 /*
250  * find sa_entry by name, used by directory scanner, lock is not needed because
251  * only scanner can remove the entry from cache.
252  */
253 static struct sa_entry *
254 sa_get(struct ll_statahead_context *ctx, const struct qstr *qstr)
255 {
256         struct sa_entry *entry;
257         int i = sa_hash(qstr->hash);
258
259         spin_lock(&ctx->sax_cache_lock[i]);
260         list_for_each_entry(entry, &ctx->sax_cache[i], se_hash) {
261                 if (entry->se_qstr.hash == qstr->hash &&
262                     entry->se_qstr.len == qstr->len &&
263                     memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0) {
264                         spin_unlock(&ctx->sax_cache_lock[i]);
265                         return entry;
266                 }
267         }
268         spin_unlock(&ctx->sax_cache_lock[i]);
269         return NULL;
270 }
271
272 /* unhash and unlink sa_entry, and then free it */
273 static inline void
274 sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry, bool locked)
275 {
276         struct inode *dir = sai->sai_dentry->d_inode;
277         struct ll_inode_info *lli = ll_i2info(dir);
278         struct ll_statahead_context *ctx = lli->lli_sax;
279
280         LASSERT(!sa_unhashed(entry));
281         LASSERT(!list_empty(&entry->se_list));
282         LASSERT(sa_ready(entry));
283
284         sa_unhash(ctx, entry);
285
286         if (!locked)
287                 spin_lock(&lli->lli_sa_lock);
288         list_del_init(&entry->se_list);
289         spin_unlock(&lli->lli_sa_lock);
290
291         iput(entry->se_inode);
292         atomic_dec(&sai->sai_cache_count);
293         sa_free(ctx, entry);
294         if (locked)
295                 spin_lock(&lli->lli_sa_lock);
296 }
297
/*
 * Called by the scanner once it is done with @entry: account the lookup as a
 * statahead hit or miss, kill the entry, reap old entries and, when
 * warranted, wake the task recorded in sai->sai_task.
 *
 * @dir supplies the ll_inode_info whose lli_sa_lock is used here. @entry may
 * be NULL. @sai is only NULL-checked on the miss/reap paths; a non-NULL
 * @entry is handed to sa_kill() together with @sai, so @sai must be valid
 * whenever @entry is.
 */
static void
sa_put(struct inode *dir, struct ll_statahead_info *sai, struct sa_entry *entry)
{
        struct ll_inode_info *lli = ll_i2info(dir);
        struct sa_entry *tmp;
        bool wakeup = false;

        if (entry && entry->se_state == SA_ENTRY_SUCC) {
                struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);

                sai->sai_hit++;
                sai->sai_consecutive_miss = 0;
                if (sai->sai_max < sbi->ll_sa_max) {
                        /* window below the superblock limit: double it,
                         * capped at ll_sa_max
                         */
                        sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
                        wakeup = true;
                } else if (sai->sai_max_batch_count > 0) {
                        /*
                         * Batching in use: wake only when the window is at
                         * least one batch wide and this entry lies a whole
                         * number of batches before sai_index_end, or when it
                         * is the sai_index_end entry itself.
                         */
                        if (sai->sai_max >= sai->sai_max_batch_count &&
                           (sai->sai_index_end - entry->se_index) %
                           sai->sai_max_batch_count == 0) {
                                wakeup = true;
                        } else if (entry->se_index == sai->sai_index_end) {
                                wakeup = true;
                        }
                } else {
                        wakeup = true;
                }
        } else if (sai) {
                /* no entry, or one that did not end in SA_ENTRY_SUCC */
                sai->sai_miss++;
                sai->sai_consecutive_miss++;
                wakeup = true;
        }

        if (entry)
                sa_kill(sai, entry, false);

        if (sai) {
                /*
                 * kill old completed entries. Maybe kicking old entries can
                 * be ignored?
                 */
                spin_lock(&lli->lli_sa_lock);
                while ((tmp = list_first_entry_or_null(&sai->sai_entries,
                                struct sa_entry, se_list))) {
                        if (!is_omitted_entry(sai, tmp->se_index))
                                break;

                        /*
                         * lli_sa_lock is dropped and re-taken by sa_kill(),
                         * so restart from the head of the list each time
                         */
                        sa_kill(sai, tmp, true);
                }
                spin_unlock(&lli->lli_sa_lock);
        }

        spin_lock(&lli->lli_sa_lock);
        /*
         * wakeup is only ever set on paths that dereferenced or checked sai,
         * so the short-circuit keeps the sai_task read safe for a NULL sai
         */
        if (wakeup && sai->sai_task)
                wake_up_process(sai->sai_task);
        spin_unlock(&lli->lli_sa_lock);
}
356
357 /*
358  * update state and sort add entry to sai_entries by index, return true if
359  * scanner is waiting on this entry.
360  */
361 static bool
362 __sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
363 {
364         struct sa_entry *se;
365         struct list_head *pos = &sai->sai_entries;
366         __u64 index = entry->se_index;
367
368         LASSERT(!sa_ready(entry));
369         LASSERT(list_empty(&entry->se_list));
370
371         list_for_each_entry_reverse(se, &sai->sai_entries, se_list) {
372                 if (se->se_index < entry->se_index) {
373                         pos = &se->se_list;
374                         break;
375                 }
376         }
377         list_add(&entry->se_list, pos);
378         /*
379          * LU-9210: ll_statahead_interpet must be able to see this before
380          * we wake it up
381          */
382         smp_store_release(&entry->se_state,
383                           ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);
384
385         return (index == sai->sai_index_wait);
386 }
387
388 /* finish async stat RPC arguments */
389 static void sa_fini_data(struct md_op_item *item)
390 {
391         struct md_op_data *op_data = &item->mop_data;
392
393         if (op_data->op_flags & MF_OPNAME_KMALLOCED)
394                 /* allocated via ll_setup_filename called from sa_prep_data */
395                 kfree(op_data->op_name);
396         ll_unlock_md_op_lsm(&item->mop_data);
397         iput(item->mop_dir);
398         if (item->mop_subpill_allocated)
399                 OBD_FREE_PTR(item->mop_pill);
400         OBD_FREE_PTR(item);
401 }
402
/* forward declaration: sa_prep_data() installs this as the item callback */
static int ll_statahead_interpret(struct md_op_item *item, int rc);
404
405 /*
406  * prepare arguments for async stat RPC.
407  */
408 static struct md_op_item *
409 sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
410 {
411         struct md_op_item *item;
412         struct ldlm_enqueue_info *einfo;
413         struct md_op_data *op_data;
414
415         OBD_ALLOC_PTR(item);
416         if (!item)
417                 return ERR_PTR(-ENOMEM);
418
419         op_data = ll_prep_md_op_data(&item->mop_data, dir, child,
420                                      entry->se_qstr.name, entry->se_qstr.len, 0,
421                                      LUSTRE_OPC_ANY, NULL);
422         if (IS_ERR(op_data)) {
423                 OBD_FREE_PTR(item);
424                 return (struct md_op_item *)op_data;
425         }
426
427         if (!child)
428                 op_data->op_fid2 = entry->se_fid;
429
430         item->mop_opc = MD_OP_GETATTR;
431         item->mop_it.it_op = IT_GETATTR;
432         item->mop_dir = igrab(dir);
433         item->mop_cb = ll_statahead_interpret;
434         item->mop_cbdata = entry;
435
436         einfo = &item->mop_einfo;
437         einfo->ei_type = LDLM_IBITS;
438         einfo->ei_mode = it_to_lock_mode(&item->mop_it);
439         einfo->ei_cb_bl = ll_md_blocking_ast;
440         einfo->ei_cb_cp = ldlm_completion_ast;
441         einfo->ei_cb_gl = NULL;
442         einfo->ei_cbdata = NULL;
443         einfo->ei_req_slot = 1;
444
445         return item;
446 }
447
448 /*
449  * release resources used in async stat RPC, update entry state and wakeup if
450  * scanner process it waiting on this entry.
451  */
452 static void
453 sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
454 {
455         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
456         bool wakeup;
457
458         spin_lock(&lli->lli_sa_lock);
459         wakeup = __sa_make_ready(sai, entry, ret);
460         spin_unlock(&lli->lli_sa_lock);
461
462         if (wakeup)
463                 wake_up(&sai->sai_waitq);
464 }
465
466 /* insert inode into the list of sai_agls */
467 static void ll_agl_add(struct ll_statahead_info *sai,
468                        struct inode *inode, int index)
469 {
470         struct ll_inode_info *child  = ll_i2info(inode);
471         struct ll_inode_info *parent = ll_i2info(sai->sai_dentry->d_inode);
472
473         spin_lock(&child->lli_agl_lock);
474         if (child->lli_agl_index == 0) {
475                 child->lli_agl_index = index;
476                 spin_unlock(&child->lli_agl_lock);
477
478                 LASSERT(list_empty(&child->lli_agl_list));
479
480                 spin_lock(&parent->lli_agl_lock);
481                 /* Re-check under the lock */
482                 if (agl_should_run(sai, inode)) {
483                         if (agl_list_empty(sai))
484                                 wake_up_process(sai->sai_agl_task);
485                         igrab(inode);
486                         list_add_tail(&child->lli_agl_list, &sai->sai_agls);
487                 } else
488                         child->lli_agl_index = 0;
489                 spin_unlock(&parent->lli_agl_lock);
490         } else {
491                 spin_unlock(&child->lli_agl_lock);
492         }
493 }
494
495 /* Allocate sax */
496 static struct ll_statahead_context *ll_sax_alloc(struct inode *dir)
497 {
498         struct ll_statahead_context *ctx;
499         int i;
500
501         ENTRY;
502
503         OBD_ALLOC_PTR(ctx);
504         if (ctx == NULL)
505                 RETURN(NULL);
506
507         ctx->sax_inode = igrab(dir);
508         atomic_set(&ctx->sax_refcount, 1);
509         INIT_LIST_HEAD(&ctx->sax_sai_list);
510         for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
511                 INIT_LIST_HEAD(&ctx->sax_cache[i]);
512                 spin_lock_init(&ctx->sax_cache_lock[i]);
513         }
514
515         RETURN(ctx);
516 }
517
518 static inline void ll_sax_free(struct ll_statahead_context *ctx)
519 {
520         LASSERT(ctx->sax_inode != NULL);
521         iput(ctx->sax_inode);
522         OBD_FREE_PTR(ctx);
523 }
524
525 static inline void __ll_sax_get(struct ll_statahead_context *ctx)
526 {
527         atomic_inc(&ctx->sax_refcount);
528 }
529
530 static inline struct ll_statahead_context *ll_sax_get(struct inode *dir)
531 {
532         struct ll_inode_info *lli = ll_i2info(dir);
533         struct ll_statahead_context *ctx = NULL;
534
535         spin_lock(&lli->lli_sa_lock);
536         ctx = lli->lli_sax;
537         if (ctx)
538                 __ll_sax_get(ctx);
539         spin_unlock(&lli->lli_sa_lock);
540
541         return ctx;
542 }
543
544 static inline void ll_sax_put(struct inode *dir,
545                               struct ll_statahead_context *ctx)
546 {
547         struct ll_inode_info *lli = ll_i2info(dir);
548
549         if (atomic_dec_and_lock(&ctx->sax_refcount, &lli->lli_sa_lock)) {
550                 LASSERT(list_empty(&ctx->sax_sai_list));
551                 lli->lli_sai = NULL;
552                 lli->lli_sax = NULL;
553                 if (lli->lli_sa_pattern & (LSA_PATTERN_ADVISE |
554                                            LSA_PATTERN_FNAME)) {
555                         lli->lli_opendir_key = NULL;
556                         lli->lli_stat_pid = 0;
557                         lli->lli_sa_enabled = 0;
558                 }
559                 lli->lli_sa_pattern = LSA_PATTERN_NONE;
560                 spin_unlock(&lli->lli_sa_lock);
561
562                 ll_sax_free(ctx);
563         }
564 }
565
566 /* allocate sai */
567 static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
568 {
569         struct ll_statahead_info *sai;
570         struct ll_inode_info *lli = ll_i2info(dentry->d_inode);
571
572         ENTRY;
573
574         OBD_ALLOC_PTR(sai);
575         if (!sai)
576                 RETURN(NULL);
577
578         sai->sai_dentry = dget(dentry);
579         atomic_set(&sai->sai_refcount, 1);
580         sai->sai_max = ll_i2sbi(dentry->d_inode)->ll_sa_min;
581         sai->sai_index = 1;
582         init_waitqueue_head(&sai->sai_waitq);
583
584         INIT_LIST_HEAD(&sai->sai_item);
585         INIT_LIST_HEAD(&sai->sai_entries);
586         INIT_LIST_HEAD(&sai->sai_agls);
587
588         atomic_set(&sai->sai_cache_count, 0);
589
590         spin_lock(&sai_generation_lock);
591         lli->lli_sa_generation = ++sai_generation;
592         if (unlikely(sai_generation == 0))
593                 lli->lli_sa_generation = ++sai_generation;
594         spin_unlock(&sai_generation_lock);
595
596         RETURN(sai);
597 }
598
599 /* free sai */
600 static inline void ll_sai_free(struct ll_statahead_info *sai)
601 {
602         LASSERT(sai->sai_dentry != NULL);
603         dput(sai->sai_dentry);
604         OBD_FREE_PTR(sai);
605 }
606
607 static inline struct ll_statahead_info *
608 __ll_sai_get(struct ll_statahead_info *sai)
609 {
610         atomic_inc(&sai->sai_refcount);
611         return sai;
612 }
613
614 /*
615  * put sai refcount after use, if refcount reaches zero, free sai and sa_entries
616  * attached to it.
617  */
618 static void ll_sai_put(struct ll_statahead_info *sai)
619 {
620         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
621
622         if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
623                 struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
624                 struct sa_entry *entry, *next;
625
626                 lli->lli_sai = NULL;
627                 list_del_init(&sai->sai_item);
628                 spin_unlock(&lli->lli_sa_lock);
629
630                 LASSERT(!sai->sai_task);
631                 LASSERT(!sai->sai_agl_task);
632                 LASSERT(sai->sai_sent == sai->sai_replied);
633
634                 list_for_each_entry_safe(entry, next, &sai->sai_entries,
635                                          se_list)
636                         sa_kill(sai, entry, false);
637
638                 LASSERT(atomic_read(&sai->sai_cache_count) == 0);
639                 LASSERT(agl_list_empty(sai));
640
641                 ll_sai_free(sai);
642                 atomic_dec(&sbi->ll_sa_running);
643         }
644 }
645
646 /* Do NOT forget to drop inode refcount when into sai_agls. */
647 static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
648 {
649         struct ll_inode_info *lli = ll_i2info(inode);
650         u64 index = lli->lli_agl_index;
651         ktime_t expire;
652         int rc;
653
654         ENTRY;
655
656         LASSERT(list_empty(&lli->lli_agl_list));
657
658         /* AGL maybe fall behind statahead with one entry */
659         if (is_omitted_entry(sai, index + 1)) {
660                 lli->lli_agl_index = 0;
661                 iput(inode);
662                 RETURN_EXIT;
663         }
664
665         /*
666          * In case of restore, the MDT has the right size and has already
667          * sent it back without granting the layout lock, inode is up-to-date.
668          * Then AGL (async glimpse lock) is useless.
669          * Also to glimpse we need the layout, in case of a runninh restore
670          * the MDT holds the layout lock so the glimpse will block up to the
671          * end of restore (statahead/agl will block)
672          */
673         if (test_bit(LLIF_FILE_RESTORING, &lli->lli_flags)) {
674                 lli->lli_agl_index = 0;
675                 iput(inode);
676                 RETURN_EXIT;
677         }
678
679         /* Someone is in glimpse (sync or async), do nothing. */
680         rc = down_write_trylock(&lli->lli_glimpse_sem);
681         if (rc == 0) {
682                 lli->lli_agl_index = 0;
683                 iput(inode);
684                 RETURN_EXIT;
685         }
686
687         /*
688          * Someone triggered glimpse within 1 sec before.
689          * 1) The former glimpse succeeded with glimpse lock granted by OST, and
690          *    if the lock is still cached on client, AGL needs to do nothing. If
691          *    it is cancelled by other client, AGL maybe cannot obtaion new lock
692          *    for no glimpse callback triggered by AGL.
693          * 2) The former glimpse succeeded, but OST did not grant glimpse lock.
694          *    Under such case, it is quite possible that the OST will not grant
695          *    glimpse lock for AGL also.
696          * 3) The former glimpse failed, compared with other two cases, it is
697          *    relative rare. AGL can ignore such case, and it will not muchly
698          *    affect the performance.
699          */
700         expire = ktime_sub_ns(ktime_get(), NSEC_PER_SEC);
701         if (ktime_to_ns(lli->lli_glimpse_time) &&
702             ktime_before(expire, lli->lli_glimpse_time)) {
703                 up_write(&lli->lli_glimpse_sem);
704                 lli->lli_agl_index = 0;
705                 iput(inode);
706                 RETURN_EXIT;
707         }
708
709         CDEBUG(D_READA,
710                "Handling (init) async glimpse: inode = " DFID", idx = %llu\n",
711                PFID(&lli->lli_fid), index);
712
713         cl_agl(inode);
714         lli->lli_agl_index = 0;
715         lli->lli_glimpse_time = ktime_get();
716         up_write(&lli->lli_glimpse_sem);
717
718         CDEBUG(D_READA,
719                "Handled (init) async glimpse: inode= " DFID", idx = %llu, rc = %d\n",
720                PFID(&lli->lli_fid), index, rc);
721
722         iput(inode);
723
724         EXIT;
725 }
726
/*
 * Common tail of the async stat completion: release the intent, free the
 * RPC arguments, finish the request if there is one, publish the result of
 * @entry via sa_make_ready() (which may wake the scanner) and finally count
 * the reply in sai_replied under lli_sa_lock.
 *
 * @item is freed by sa_fini_data() and must not be used by the caller
 * afterwards. @rc is passed to sa_make_ready() as the result for the entry.
 */
static void ll_statahead_interpret_fini(struct ll_inode_info *lli,
                                        struct ll_statahead_info *sai,
                                        struct md_op_item *item,
                                        struct sa_entry *entry,
                                        struct ptlrpc_request *req,
                                        int rc)
{
        /*
         * First it will drop ldlm ibits lock refcount by calling
         * ll_intent_drop_lock() in spite of failures. Do not worry about
         * calling ll_intent_drop_lock() more than once.
         */
        ll_intent_release(&item->mop_it);
        sa_fini_data(item);
        if (req)
                ptlrpc_req_finished(req);
        sa_make_ready(sai, entry, rc);

        /*
         * NOTE(review): sa_fini_data() above dropped the item's reference on
         * its directory inode; @lli is presumably kept alive by other
         * references (e.g. the sai's dentry) - confirm.
         */
        spin_lock(&lli->lli_sa_lock);
        sai->sai_replied++;
        spin_unlock(&lli->lli_sa_lock);
}
749
/*
 * Work item handler that finishes an async stat for the md_op_item embedding
 * @work: re-validate the lock saved in the entry's se_handle, prepare the
 * child inode from the reply, cache a returned encryption context, attach
 * the lock data to the inode and queue the inode for AGL when appropriate.
 *
 * Whatever the outcome, the entry is completed through
 * ll_statahead_interpret_fini(), which also frees the item.
 *
 * NOTE(review): the function starts with ENTRY but has no matching
 * EXIT/RETURN - confirm whether that is intentional.
 */
static void ll_statahead_interpret_work(struct work_struct *work)
{
        struct md_op_item *item = container_of(work, struct md_op_item,
                                               mop_work);
        struct req_capsule *pill = item->mop_pill;
        struct inode *dir = item->mop_dir;
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai;
        struct lookup_intent *it;
        struct sa_entry *entry;
        struct mdt_body *body;
        struct inode *child;
        int rc;

        ENTRY;

        entry = (struct sa_entry *)item->mop_cbdata;
        /* the lock handle must have been saved in the entry beforehand */
        LASSERT(entry->se_handle != 0);

        sai = entry->se_sai;
        it = &item->mop_it;
        body = req_capsule_server_get(pill, &RMF_MDT_BODY);
        if (!body)
                GOTO(out, rc = -EFAULT);

        child = entry->se_inode;
        /* revalidate; unlinked and re-created with the same name */
        if (unlikely(!fid_is_zero(&item->mop_data.op_fid2) &&
                     !lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
                if (child) {
                        entry->se_inode = NULL;
                        iput(child);
                }
                /* The mdt_body is invalid. Skip this entry */
                GOTO(out, rc = -EAGAIN);
        }

        /* restore the saved handle into the intent and re-validate it */
        it->it_lock_handle = entry->se_handle;
        rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
        if (rc != 1)
                GOTO(out, rc = -EAGAIN);

        rc = ll_prep_inode(&child, pill, dir->i_sb, it);
        if (rc) {
                CERROR("%s: getattr callback for %.*s "DFID": rc = %d\n",
                       ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
                       entry->se_qstr.name, PFID(&entry->se_fid), rc);
                GOTO(out, rc);
        }

        /* If encryption context was returned by MDT, put it in
         * inode now to save an extra getxattr.
         */
        if (body->mbo_valid & OBD_MD_ENCCTX) {
                void *encctx = req_capsule_server_get(pill, &RMF_FILE_ENCCTX);
                __u32 encctxlen = req_capsule_get_size(pill, &RMF_FILE_ENCCTX,
                                                       RCL_SERVER);

                if (encctxlen) {
                        CDEBUG(D_SEC,
                               "server returned encryption ctx for "DFID"\n",
                               PFID(ll_inode2fid(child)));
                        /*
                         * NOTE(review): a failure here is only warned about,
                         * but rc is not reset, so a negative value still
                         * reaches ll_statahead_interpret_fini() below and
                         * marks the entry SA_ENTRY_INVA - confirm that this
                         * is intended.
                         */
                        rc = ll_xattr_cache_insert(child,
                                                   xattr_for_enc(child),
                                                   encctx, encctxlen);
                        if (rc)
                                CWARN("%s: cannot set enc ctx for "DFID": rc = %d\n",
                                      ll_i2sbi(child)->ll_fsname,
                                      PFID(ll_inode2fid(child)), rc);
                }
        }

        CDEBUG(D_READA, "%s: setting %.*s"DFID" l_data to inode %p\n",
               ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
               entry->se_qstr.name, PFID(ll_inode2fid(child)), child);
        ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);

        /* the entry now carries the inode; sa_kill() puts it later */
        entry->se_inode = child;

        if (agl_should_run(sai, child))
                ll_agl_add(sai, child, entry->se_index);
out:
        /* pill->rc_req is read here, before the callee frees the item */
        ll_statahead_interpret_fini(lli, sai, item, entry, pill->rc_req, rc);
}
834
835 /*
836  * Callback for async stat RPC, this is called in ptlrpcd context. It prepares
837  * the inode and set lock data directly in the ptlrpcd context. It will wake up
838  * the directory listing process if the dentry is the waiting one.
839  */
840 static int ll_statahead_interpret(struct md_op_item *item, int rc)
841 {
842         struct req_capsule *pill = item->mop_pill;
843         struct lookup_intent *it = &item->mop_it;
844         struct inode *dir = item->mop_dir;
845         struct ll_inode_info *lli = ll_i2info(dir);
846         struct sa_entry *entry = (struct sa_entry *)item->mop_cbdata;
847         struct work_struct *work = &item->mop_work;
848         struct ll_statahead_info *sai;
849         struct mdt_body *body;
850         struct inode *child;
851         __u64 handle = 0;
852
853         ENTRY;
854
855         if (it_disposition(it, DISP_LOOKUP_NEG))
856                 rc = -ENOENT;
857
858         /*
859          * because statahead thread will wait for all inflight RPC to finish,
860          * sai should be always valid, no need to refcount
861          */
862         LASSERT(entry != NULL);
863         sai = entry->se_sai;
864         LASSERT(sai != NULL);
865
866         CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
867                entry->se_qstr.len, entry->se_qstr.name, rc);
868
869         if (rc != 0)
870                 GOTO(out, rc);
871
872         body = req_capsule_server_get(pill, &RMF_MDT_BODY);
873         if (!body)
874                 GOTO(out, rc = -EFAULT);
875
876         child = entry->se_inode;
877         /*
878          * revalidate; unlinked and re-created with the same name.
879          * exclude the case where FID is zero as it was from statahead with
880          * regularized file name pattern and had no idea for the FID of the
881          * children file.
882          */
883         if (unlikely(!fid_is_zero(&item->mop_data.op_fid2) &&
884                      !lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
885                 if (child) {
886                         entry->se_inode = NULL;
887                         iput(child);
888                 }
889                 /* The mdt_body is invalid. Skip this entry */
890                 GOTO(out, rc = -EAGAIN);
891         }
892
893         entry->se_handle = it->it_lock_handle;
894         /*
895          * In ptlrpcd context, it is not allowed to generate new RPCs
896          * especially for striped directories or regular files with layout
897          * change.
898          */
899         /*
900          * release ibits lock ASAP to avoid deadlock when statahead
901          * thread enqueues lock on parent in readdir and another
902          * process enqueues lock on child with parent lock held, eg.
903          * unlink.
904          */
905         handle = it->it_lock_handle;
906         ll_intent_drop_lock(it);
907         ll_unlock_md_op_lsm(&item->mop_data);
908
909         /*
910          * If the statahead entry is a striped directory or regular file with
911          * layout change, it will generate a new RPC and long wait in the
912          * ptlrpcd context.
913          * However, it is dangerous of blocking in ptlrpcd thread.
914          * Here we use work queue or the separate statahead thread to handle
915          * the extra RPC and long wait:
916          *      (@ll_prep_inode->@lmv_revalidate_slaves);
917          *      (@ll_prep_inode->@lov_layout_change->osc_cache_wait_range);
918          */
919         INIT_WORK(work, ll_statahead_interpret_work);
920         ptlrpc_request_addref(pill->rc_req);
921         schedule_work(work);
922         RETURN(0);
923 out:
924         ll_statahead_interpret_fini(lli, sai, item, entry, NULL, rc);
925         RETURN(rc);
926 }
927
928 static inline int sa_getattr(struct ll_statahead_info *sai, struct inode *dir,
929                              struct md_op_item *item)
930 {
931         int rc;
932
933         if (sa_has_batch_handle(sai))
934                 rc = md_batch_add(ll_i2mdexp(dir), sai->sai_bh, item);
935         else
936                 rc = md_intent_getattr_async(ll_i2mdexp(dir), item);
937
938         return rc;
939 }
940
941 /* async stat for file not found in dcache */
942 static int sa_lookup(struct inode *dir, struct sa_entry *entry)
943 {
944         struct md_op_item *item;
945         int rc;
946
947         ENTRY;
948
949         item = sa_prep_data(dir, NULL, entry);
950         if (IS_ERR(item))
951                 RETURN(PTR_ERR(item));
952
953         rc = sa_getattr(entry->se_sai, dir, item);
954         if (rc < 0)
955                 sa_fini_data(item);
956
957         RETURN(rc);
958 }
959
960 /**
961  * async stat for file found in dcache, similar to .revalidate
962  *
963  * \retval      1 dentry valid, no RPC sent
964  * \retval      0 dentry invalid, will send async stat RPC
965  * \retval      negative number upon error
966  */
967 static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
968                          struct dentry *dentry)
969 {
970         struct inode *inode = dentry->d_inode;
971         struct lookup_intent it = { .it_op = IT_GETATTR,
972                                     .it_lock_handle = 0 };
973         struct md_op_item *item;
974         int rc;
975
976         ENTRY;
977
978         if (unlikely(!inode))
979                 RETURN(1);
980
981         if (d_mountpoint(dentry))
982                 RETURN(1);
983
984         item = sa_prep_data(dir, inode, entry);
985         if (IS_ERR(item))
986                 RETURN(PTR_ERR(item));
987
988         entry->se_inode = igrab(inode);
989         rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
990                                 NULL);
991         if (rc == 1) {
992                 entry->se_handle = it.it_lock_handle;
993                 ll_intent_release(&it);
994                 sa_fini_data(item);
995                 RETURN(1);
996         }
997
998         rc = sa_getattr(entry->se_sai, dir, item);
999         if (rc < 0) {
1000                 entry->se_inode = NULL;
1001                 iput(inode);
1002                 sa_fini_data(item);
1003         }
1004
1005         RETURN(rc);
1006 }
1007
1008 /* async stat for file with @name */
1009 static void sa_statahead(struct ll_statahead_info *sai, struct dentry *parent,
1010                          const char *name, int len, const struct lu_fid *fid)
1011 {
1012         struct inode *dir = parent->d_inode;
1013         struct dentry *dentry = NULL;
1014         struct sa_entry *entry;
1015         int rc;
1016
1017         ENTRY;
1018
1019         entry = sa_alloc(parent, sai, sai->sai_index, name, len, fid);
1020         if (IS_ERR(entry))
1021                 RETURN_EXIT;
1022
1023         dentry = d_lookup(parent, &entry->se_qstr);
1024         if (!dentry) {
1025                 rc = sa_lookup(dir, entry);
1026         } else {
1027                 rc = sa_revalidate(dir, entry, dentry);
1028                 if (rc == 1 && agl_should_run(sai, dentry->d_inode))
1029                         ll_agl_add(sai, dentry->d_inode, entry->se_index);
1030         }
1031
1032         if (dentry)
1033                 dput(dentry);
1034
1035         if (rc != 0)
1036                 sa_make_ready(sai, entry, rc);
1037         else
1038                 sai->sai_sent++;
1039
1040         sai->sai_index++;
1041
1042         if (sa_sent_full(sai))
1043                 ll_statahead_flush_nowait(sai);
1044
1045         EXIT;
1046 }
1047
/*
 * async glimpse (agl) thread main function
 *
 * @arg is the ll_statahead_info this thread serves. The thread repeatedly
 * takes the first inode off sai->sai_agls and passes it to ll_agl_trigger(),
 * sleeping in TASK_IDLE while the list is empty. It runs until
 * kthread_should_stop() becomes true and always returns 0.
 */
static int ll_agl_thread(void *arg)
{
        /*
         * We already own this reference, so it is safe to take it
         * without a lock.
         */
        struct ll_statahead_info *sai = (struct ll_statahead_info *)arg;
        struct dentry *parent = sai->sai_dentry;
        struct inode *dir = parent->d_inode;
        struct ll_inode_info *plli = ll_i2info(dir);
        struct ll_inode_info *clli;

        ENTRY;

        CDEBUG(D_READA, "agl thread started: sai %p, parent %pd\n",
               sai, parent);

        /*
         * The task state is set to TASK_IDLE before the stop condition and
         * the list are checked, and schedule() is only called when the list
         * was found empty.
         */
        while (({set_current_state(TASK_IDLE);
                 !kthread_should_stop(); })) {
                spin_lock(&plli->lli_agl_lock);
                clli = list_first_entry_or_null(&sai->sai_agls,
                                                struct ll_inode_info,
                                                lli_agl_list);
                if (clli) {
                        /* work to do: go back to running before triggering */
                        __set_current_state(TASK_RUNNING);
                        list_del_init(&clli->lli_agl_list);
                        spin_unlock(&plli->lli_agl_lock);
                        ll_agl_trigger(&clli->lli_vfs_inode, sai);
                        cond_resched();
                } else {
                        /* nothing queued: sleep until woken */
                        spin_unlock(&plli->lli_agl_lock);
                        schedule();
                }
        }
        __set_current_state(TASK_RUNNING);
        RETURN(0);
}
1086
1087 static void ll_stop_agl(struct ll_statahead_info *sai)
1088 {
1089         struct dentry *parent = sai->sai_dentry;
1090         struct ll_inode_info *plli = ll_i2info(parent->d_inode);
1091         struct ll_inode_info *clli;
1092         struct task_struct *agl_task;
1093
1094         spin_lock(&plli->lli_agl_lock);
1095         agl_task = sai->sai_agl_task;
1096         sai->sai_agl_task = NULL;
1097         spin_unlock(&plli->lli_agl_lock);
1098         if (!agl_task)
1099                 return;
1100
1101         CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n",
1102                sai, (unsigned int)agl_task->pid);
1103         kthread_stop(agl_task);
1104
1105         spin_lock(&plli->lli_agl_lock);
1106         while ((clli = list_first_entry_or_null(&sai->sai_agls,
1107                                                 struct ll_inode_info,
1108                                                 lli_agl_list)) != NULL) {
1109                 list_del_init(&clli->lli_agl_list);
1110                 spin_unlock(&plli->lli_agl_lock);
1111                 clli->lli_agl_index = 0;
1112                 iput(&clli->lli_vfs_inode);
1113                 spin_lock(&plli->lli_agl_lock);
1114         }
1115         spin_unlock(&plli->lli_agl_lock);
1116         CDEBUG(D_READA, "agl thread stopped: sai %p, parent %pd\n",
1117                sai, parent);
1118         ll_sai_put(sai);
1119 }
1120
1121 /* start agl thread */
1122 static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
1123 {
1124         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
1125         struct ll_inode_info *plli;
1126         struct task_struct *task;
1127
1128         ENTRY;
1129
1130         CDEBUG(D_READA, "start agl thread: sai %p, parent %pd\n",
1131                sai, parent);
1132
1133         plli = ll_i2info(parent->d_inode);
1134         task = kthread_create_on_node(ll_agl_thread, sai, node, "ll_agl_%d",
1135                                       plli->lli_stat_pid);
1136         if (IS_ERR(task)) {
1137                 CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));
1138                 RETURN_EXIT;
1139         }
1140         sai->sai_agl_task = task;
1141         atomic_inc(&ll_i2sbi(d_inode(parent))->ll_agl_total);
1142         /* Get an extra reference that the thread holds */
1143         __ll_sai_get(sai);
1144
1145         wake_up_process(task);
1146
1147         EXIT;
1148 }
1149
1150 static int ll_statahead_by_list(struct dentry *parent)
1151 {
1152         struct inode *dir = parent->d_inode;
1153         struct ll_inode_info *lli = ll_i2info(dir);
1154         struct ll_statahead_info *sai = lli->lli_sai;
1155         struct ll_sb_info *sbi = ll_i2sbi(dir);
1156         struct md_op_data *op_data;
1157         struct page *page = NULL;
1158         __u64 pos = 0;
1159         int first = 0;
1160         int rc = 0;
1161
1162         ENTRY;
1163
1164         CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
1165                sai, parent);
1166
1167         OBD_ALLOC_PTR(op_data);
1168         if (!op_data)
1169                 RETURN(-ENOMEM);
1170
1171         while (pos != MDS_DIR_END_OFF &&
1172                /* matches smp_store_release() in ll_deauthorize_statahead() */
1173                smp_load_acquire(&sai->sai_task) &&
1174                lli->lli_sa_enabled) {
1175                 struct lu_dirpage *dp;
1176                 struct lu_dirent  *ent;
1177
1178                 op_data = ll_prep_md_op_data(op_data, dir, dir, NULL, 0, 0,
1179                                              LUSTRE_OPC_ANY, dir);
1180                 if (IS_ERR(op_data)) {
1181                         rc = PTR_ERR(op_data);
1182                         break;
1183                 }
1184
1185                 page = ll_get_dir_page(dir, op_data, pos, NULL);
1186                 ll_unlock_md_op_lsm(op_data);
1187                 if (IS_ERR(page)) {
1188                         rc = PTR_ERR(page);
1189                         CDEBUG(D_READA,
1190                                "error reading dir "DFID" at %llu /%llu stat_pid = %u: rc = %d\n",
1191                                PFID(ll_inode2fid(dir)), pos, sai->sai_index,
1192                                lli->lli_stat_pid, rc);
1193                         break;
1194                 }
1195
1196                 dp = page_address(page);
1197                 for (ent = lu_dirent_start(dp);
1198                      /* matches smp_store_release() in ll_deauthorize_statahead() */
1199                      ent != NULL && smp_load_acquire(&sai->sai_task) &&
1200                      !sa_low_hit(sai) && lli->lli_sa_enabled;
1201                      ent = lu_dirent_next(ent)) {
1202                         __u64 hash;
1203                         int namelen;
1204                         char *name;
1205                         struct lu_fid fid;
1206                         struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
1207
1208                         hash = le64_to_cpu(ent->lde_hash);
1209                         if (unlikely(hash < pos))
1210                                 /*
1211                                  * Skip until we find target hash value.
1212                                  */
1213                                 continue;
1214
1215                         namelen = le16_to_cpu(ent->lde_namelen);
1216                         if (unlikely(namelen == 0))
1217                                 /*
1218                                  * Skip dummy record.
1219                                  */
1220                                 continue;
1221
1222                         name = ent->lde_name;
1223                         if (name[0] == '.') {
1224                                 if (namelen == 1) {
1225                                         /*
1226                                          * skip "."
1227                                          */
1228                                         continue;
1229                                 } else if (name[1] == '.' && namelen == 2) {
1230                                         /*
1231                                          * skip ".."
1232                                          */
1233                                         continue;
1234                                 } else if (!sai->sai_ls_all) {
1235                                         /*
1236                                          * skip hidden files.
1237                                          */
1238                                         sai->sai_skip_hidden++;
1239                                         continue;
1240                                 }
1241                         }
1242
1243                         /*
1244                          * don't stat-ahead first entry.
1245                          */
1246                         if (unlikely(++first == 1))
1247                                 continue;
1248
1249                         fid_le_to_cpu(&fid, &ent->lde_fid);
1250
1251                         while (({set_current_state(TASK_IDLE);
1252                                  /* matches smp_store_release() in
1253                                   * ll_deauthorize_statahead()
1254                                   */
1255                                  smp_load_acquire(&sai->sai_task); })) {
1256                                 long timeout;
1257
1258                                 spin_lock(&lli->lli_agl_lock);
1259                                 while (sa_sent_full(sai) &&
1260                                        !agl_list_empty(sai)) {
1261                                         struct ll_inode_info *clli;
1262
1263                                         __set_current_state(TASK_RUNNING);
1264                                         clli = agl_first_entry(sai);
1265                                         list_del_init(&clli->lli_agl_list);
1266                                         spin_unlock(&lli->lli_agl_lock);
1267
1268                                         ll_agl_trigger(&clli->lli_vfs_inode,
1269                                                        sai);
1270                                         cond_resched();
1271                                         spin_lock(&lli->lli_agl_lock);
1272                                 }
1273                                 spin_unlock(&lli->lli_agl_lock);
1274
1275                                 if (!sa_sent_full(sai))
1276                                         break;
1277
1278                                 /*
1279                                  * If the thread is not doing stat in
1280                                  * @sbi->ll_sa_timeout (30s) then it probably
1281                                  * does not care too much about performance,
1282                                  * or is no longer using this directory.
1283                                  * Stop the statahead thread in this case.
1284                                  */
1285                                 timeout = schedule_timeout(
1286                                         cfs_time_seconds(sbi->ll_sa_timeout));
1287                                 if (timeout == 0) {
1288                                         lli->lli_sa_enabled = 0;
1289                                         break;
1290                                 }
1291                         }
1292                         __set_current_state(TASK_RUNNING);
1293
1294                         if (IS_ENCRYPTED(dir)) {
1295                                 struct llcrypt_str de_name =
1296                                         LLTR_INIT(ent->lde_name, namelen);
1297                                 struct lu_fid fid;
1298
1299                                 rc = llcrypt_fname_alloc_buffer(dir, NAME_MAX,
1300                                                                 &lltr);
1301                                 if (rc < 0)
1302                                         continue;
1303
1304                                 fid_le_to_cpu(&fid, &ent->lde_fid);
1305                                 if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
1306                                                          &lltr, &fid)) {
1307                                         llcrypt_fname_free_buffer(&lltr);
1308                                         continue;
1309                                 }
1310
1311                                 name = lltr.name;
1312                                 namelen = lltr.len;
1313                         }
1314
1315                         sa_statahead(sai, parent, name, namelen, &fid);
1316                         llcrypt_fname_free_buffer(&lltr);
1317                 }
1318
1319                 pos = le64_to_cpu(dp->ldp_hash_end);
1320                 ll_release_page(dir, page,
1321                                 le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
1322
1323                 if (sa_low_hit(sai)) {
1324                         rc = -EFAULT;
1325                         atomic_inc(&sbi->ll_sa_wrong);
1326                         CDEBUG(D_READA,
1327                                "Statahead for dir "DFID" hit ratio too low: hit/miss %llu/%llu, sent/replied %llu/%llu, stoppingstatahead thread: pid %d\n",
1328                                PFID(&lli->lli_fid), sai->sai_hit,
1329                                sai->sai_miss, sai->sai_sent,
1330                                sai->sai_replied, current->pid);
1331                         break;
1332                 }
1333         }
1334         ll_finish_md_op_data(op_data);
1335
1336         RETURN(rc);
1337 }
1338
/*
 * Wait until there is room to send another statahead request, then issue an
 * async stat for @name (length @len, optional @fid) under @parent.
 *
 * While the send window is full, AGL entries queued on @sai are triggered
 * from this thread. If the window is still full after sleeping for the whole
 * sbi->ll_sa_timeout period, statahead is disabled on the directory. Note
 * that sa_statahead() is called on every exit path of the wait loop.
 */
static void ll_statahead_handle(struct ll_statahead_info *sai,
                                struct dentry *parent, const char *name,
                                int len, const struct lu_fid *fid)
{
        struct inode *dir = parent->d_inode;
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_sb_info *sbi = ll_i2sbi(dir);
        long timeout;

        while (({set_current_state(TASK_IDLE);
                /* matches smp_store_release() in ll_deauthorize_statahead() */
                 smp_load_acquire(&sai->sai_task); })) {
                spin_lock(&lli->lli_agl_lock);
                while (sa_sent_full(sai) && !agl_list_empty(sai)) {
                        struct ll_inode_info *clli;

                        /* about to do real work, leave the idle state */
                        __set_current_state(TASK_RUNNING);
                        clli = agl_first_entry(sai);
                        list_del_init(&clli->lli_agl_list);
                        /* the lock is released around ll_agl_trigger() */
                        spin_unlock(&lli->lli_agl_lock);

                        ll_agl_trigger(&clli->lli_vfs_inode, sai);
                        cond_resched();
                        spin_lock(&lli->lli_agl_lock);
                }
                spin_unlock(&lli->lli_agl_lock);

                /* room in the send window: go send the request */
                if (!sa_sent_full(sai))
                        break;

                /*
                 * If the thread is not doing a stat in 30s then it probably
                 * does not care too much about performance, or is no longer
                 * using this directory. Stop the statahead thread in this case.
                 */
                timeout = schedule_timeout(
                                cfs_time_seconds(sbi->ll_sa_timeout));
                /* 0 means the whole timeout elapsed without a wakeup */
                if (timeout == 0) {
                        lli->lli_sa_enabled = 0;
                        break;
                }
        }
        __set_current_state(TASK_RUNNING);

        sa_statahead(sai, parent, name, len, fid);
}
1385
1386 static int ll_statahead_by_advise(struct ll_statahead_info *sai,
1387                                   struct dentry *parent)
1388 {
1389         struct inode *dir = parent->d_inode;
1390         struct ll_inode_info *lli = ll_i2info(dir);
1391         struct ll_sb_info *sbi = ll_i2sbi(dir);
1392         size_t max_len;
1393         size_t len;
1394         char *fname;
1395         char *ptr;
1396         int rc = 0;
1397         __u64 i = 0;
1398
1399         ENTRY;
1400
1401         CDEBUG(D_READA, "%s: ADVISE statahead: parent %pd fname prefix %s\n",
1402                sbi->ll_fsname, parent, sai->sai_fname);
1403
1404         OBD_ALLOC(fname, NAME_MAX);
1405         if (fname == NULL)
1406                 RETURN(-ENOMEM);
1407
1408         len = strlen(sai->sai_fname);
1409         memcpy(fname, sai->sai_fname, len);
1410         max_len = sizeof(sai->sai_fname) - len;
1411         ptr = fname + len;
1412
1413         /* matches smp_store_release() in ll_deauthorize_statahead() */
1414         while (smp_load_acquire(&sai->sai_task) && lli->lli_sa_enabled) {
1415                 size_t numlen;
1416
1417                 numlen = snprintf(ptr, max_len, "%llu",
1418                                   sai->sai_fstart + i);
1419
1420                 ll_statahead_handle(sai, parent, fname, len + numlen, NULL);
1421                 if (++i >= sai->sai_fend)
1422                         break;
1423         }
1424
1425         OBD_FREE(fname, NAME_MAX);
1426         RETURN(rc);
1427 }
1428
1429 static int ll_statahead_by_fname(struct ll_statahead_info *sai,
1430                                  struct dentry *parent)
1431 {
1432         struct inode *dir = parent->d_inode;
1433         struct ll_inode_info *lli = ll_i2info(dir);
1434         struct ll_sb_info *sbi = ll_i2sbi(dir);
1435         size_t max_len;
1436         size_t len;
1437         char *fname;
1438         char *ptr;
1439         int rc = 0;
1440
1441         ENTRY;
1442
1443         CDEBUG(D_READA, "%s: FNAME statahead: parent %pd fname prefix %s\n",
1444                sbi->ll_fsname, parent, sai->sai_fname);
1445
1446         OBD_ALLOC(fname, NAME_MAX);
1447         if (fname == NULL)
1448                 RETURN(-ENOMEM);
1449
1450         len = strlen(sai->sai_fname);
1451         memcpy(fname, sai->sai_fname, len);
1452         max_len = sizeof(sai->sai_fname) - len;
1453         ptr = fname + len;
1454
1455         /* matches smp_store_release() in ll_deauthorize_statahead() */
1456         while (smp_load_acquire(&sai->sai_task) && lli->lli_sa_enabled) {
1457                 size_t numlen;
1458
1459                 if (sai->sai_fname_zeroed_len)
1460                         numlen = snprintf(ptr, max_len, "%0*llu",
1461                                           sai->sai_fname_zeroed_len,
1462                                           ++sai->sai_fname_index);
1463                 else
1464                         numlen = snprintf(ptr, max_len, "%llu",
1465                                           ++sai->sai_fname_index);
1466
1467                 ll_statahead_handle(sai, parent, fname, len + numlen, NULL);
1468
1469                 if (sa_low_hit(sai)) {
1470                         rc = -EFAULT;
1471                         atomic_inc(&sbi->ll_sa_wrong);
1472                         CDEBUG(D_CACHE, "%s: low hit ratio for %pd "DFID": hit=%llu miss=%llu sent=%llu replied=%llu, stopping PID %d\n",
1473                                sbi->ll_fsname, parent, PFID(ll_inode2fid(dir)),
1474                                sai->sai_hit, sai->sai_miss, sai->sai_sent,
1475                                sai->sai_replied, current->pid);
1476                         break;
1477                 }
1478         }
1479
1480         OBD_FREE(fname, NAME_MAX);
1481         RETURN(rc);
1482 }
1483
/*
 * statahead thread main function
 *
 * @arg is the ll_statahead_info served by this thread. The thread:
 *  1. optionally creates a batch handle (when sbi->ll_sa_batch_max != 0);
 *  2. runs the statahead routine selected by lli->lli_sa_pattern;
 *  3. flushes pending requests and sleeps until it is deauthorized or
 *     statahead is disabled on the directory, so that entries stay cached;
 *  4. stops the batch and the agl thread, waits for in-flight RPCs, clears
 *     sai_task, wakes waiters, accumulates hit/miss totals and drops its
 *     references.
 *
 * Returns the last rc set: the pattern routine result, the md_batch_stop()
 * result when a batch handle was used, or the md_batch_create() error.
 */
static int ll_statahead_thread(void *arg)
{
        struct ll_statahead_info *sai = (struct ll_statahead_info *)arg;
        struct dentry *parent = sai->sai_dentry;
        struct inode *dir = parent->d_inode;
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_sb_info *sbi = ll_i2sbi(dir);
        struct lu_batch *bh = NULL;
        int rc = 0;

        ENTRY;

        CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
               sai, parent);

        /* a batch count of 0 leaves bh NULL, i.e. no batch handle */
        sai->sai_max_batch_count = sbi->ll_sa_batch_max;
        if (sai->sai_max_batch_count) {
                bh = md_batch_create(ll_i2mdexp(dir), BATCH_FL_RDONLY,
                                     sai->sai_max_batch_count);
                if (IS_ERR(bh))
                        GOTO(out_stop_agl, rc = PTR_ERR(bh));
        }

        sai->sai_bh = bh;

        switch (lli->lli_sa_pattern & LSA_PATTERN_MASK) {
        case LSA_PATTERN_LIST:
                rc = ll_statahead_by_list(parent);
                break;
        case LSA_PATTERN_ADVISE:
                rc = ll_statahead_by_advise(sai, parent);
                break;
        case LSA_PATTERN_FNAME:
                rc = ll_statahead_by_fname(sai, parent);
                break;
        default:
                rc = -EFAULT;
                break;
        }

        /* on failure clear sai_task so the wait loop below exits at once */
        if (rc < 0) {
                spin_lock(&lli->lli_sa_lock);
                sai->sai_task = NULL;
                spin_unlock(&lli->lli_sa_lock);
        }

        ll_statahead_flush_nowait(sai);

        /*
         * statahead is finished, but statahead entries need to be cached, wait
         * for file release closedir() call to stop me.
         */
        while (({set_current_state(TASK_IDLE);
                /* matches smp_store_release() in ll_deauthorize_statahead() */
                smp_load_acquire(&sai->sai_task) && lli->lli_sa_enabled; })) {
                schedule();
        }
        __set_current_state(TASK_RUNNING);

        /* NOTE(review): EXIT is logged here although cleanup continues below */
        EXIT;

        if (bh) {
                /* this overwrites the rc of the pattern routine */
                rc = md_batch_stop(ll_i2mdexp(dir), sai->sai_bh);
                sai->sai_bh = NULL;
        }

out_stop_agl:
        ll_stop_agl(sai);

        /*
         * wait for inflight statahead RPCs to finish, and then we can free sai
         * safely because statahead RPC will access sai data
         */
        while (sai->sai_sent != sai->sai_replied)
                /* in case we're not woken up, timeout wait */
                msleep(125);

        CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd hit %llu miss %llu\n",
               sbi->ll_fsname, sai, parent, sai->sai_hit, sai->sai_miss);

        spin_lock(&lli->lli_sa_lock);
        sai->sai_task = NULL;
        spin_unlock(&lli->lli_sa_lock);
        wake_up(&sai->sai_waitq);

        /* fold this run's counters into the per-superblock totals */
        atomic_add(sai->sai_hit, &sbi->ll_sa_hit_total);
        atomic_add(sai->sai_miss, &sbi->ll_sa_miss_total);

        ll_sai_put(sai);
        ll_sax_put(dir, lli->lli_sax);

        return rc;
}
1578
1579 /* authorize opened dir handle @key to statahead */
1580 void ll_authorize_statahead(struct inode *dir, void *key)
1581 {
1582         struct ll_inode_info *lli = ll_i2info(dir);
1583
1584         spin_lock(&lli->lli_sa_lock);
1585         if (!lli->lli_opendir_key && !lli->lli_sai) {
1586                 /*
1587                  * if lli_sai is not NULL, it means previous statahead is not
1588                  * finished yet, we'd better not start a new statahead for now.
1589                  */
1590                 lli->lli_opendir_key = key;
1591                 lli->lli_stat_pid = current->pid;
1592                 lli->lli_sa_enabled = 1;
1593                 lli->lli_sa_pattern |= LSA_PATTERN_OPENDIR;
1594         }
1595         spin_unlock(&lli->lli_sa_lock);
1596 }
1597
1598 static void ll_deauthorize_statahead_advise(struct inode *dir, void *key)
1599 {
1600         struct ll_inode_info *lli = ll_i2info(dir);
1601         struct ll_file_data *fd = (struct ll_file_data *)key;
1602         struct ll_statahead_info *sai = fd->fd_sai;
1603
1604         if (sai == NULL)
1605                 return;
1606
1607         spin_lock(&lli->lli_sa_lock);
1608         if (sai->sai_task) {
1609                 struct task_struct *task = sai->sai_task;
1610
1611                 /* matches smp_load_acquire() in ll_statahead_thread() */
1612                 smp_store_release(&sai->sai_task, NULL);
1613                 wake_up_process(task);
1614         }
1615         fd->fd_sai = NULL;
1616         spin_unlock(&lli->lli_sa_lock);
1617         ll_sai_put(sai);
1618         LASSERT(lli->lli_sax != NULL);
1619         ll_sax_put(dir, lli->lli_sax);
1620 }
1621
1622 /*
1623  * deauthorize opened dir handle @key to statahead, and notify statahead thread
1624  * to quit if it's running.
1625  */
1626 void ll_deauthorize_statahead(struct inode *dir, void *key)
1627 {
1628         struct ll_inode_info *lli = ll_i2info(dir);
1629         struct ll_statahead_info *sai;
1630
1631         CDEBUG(D_READA, "deauthorize statahead for "DFID"\n",
1632                PFID(&lli->lli_fid));
1633
1634         if (lli->lli_sa_pattern & LSA_PATTERN_ADVISE) {
1635                 ll_deauthorize_statahead_advise(dir, key);
1636                 return;
1637         }
1638
1639         LASSERT(lli->lli_stat_pid != 0);
1640         LASSERT(lli->lli_opendir_key == key);
1641         spin_lock(&lli->lli_sa_lock);
1642         lli->lli_opendir_key = NULL;
1643         lli->lli_stat_pid = 0;
1644         lli->lli_sa_enabled = 0;
1645         lli->lli_sa_pattern = LSA_PATTERN_NONE;
1646         lli->lli_sa_fname_index = 0;
1647         lli->lli_sa_match_count = 0;
1648         sai = lli->lli_sai;
1649         if (sai && sai->sai_task) {
1650                 /*
1651                  * statahead thread may not have quit yet because it needs to
1652                  * cache entries, now it's time to tell it to quit.
1653                  *
1654                  * wake_up_process() provides the necessary barriers
1655                  * to pair with set_current_state().
1656                  */
1657                 struct task_struct *task = sai->sai_task;
1658
1659                 /* matches smp_load_acquire() in ll_statahead_thread() */
1660                 smp_store_release(&sai->sai_task, NULL);
1661                 wake_up_process(task);
1662         }
1663         spin_unlock(&lli->lli_sa_lock);
1664 }
1665
/* Classification codes produced by is_first_dirent(). */
enum {
        /** the dentry does not match the first eligible dirent */
        LS_NOT_FIRST_DE = 0,
        /** the dentry is the first dirent and its name is not hidden */
        LS_FIRST_DE = 1,
        /** the dentry is the first dirent and its name starts with '.' */
        LS_FIRST_DOT_DE = 2
};
1680
1681 /* file is first dirent under @dir */
1682 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
1683 {
1684         struct qstr *target = &dentry->d_name;
1685         struct md_op_data *op_data;
1686         int dot_de;
1687         struct page *page = NULL;
1688         int rc = LS_NOT_FIRST_DE;
1689         __u64 pos = 0;
1690         struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
1691
1692         ENTRY;
1693
1694         op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
1695                                      LUSTRE_OPC_ANY, dir);
1696         if (IS_ERR(op_data))
1697                 RETURN(PTR_ERR(op_data));
1698
1699         if (IS_ENCRYPTED(dir)) {
1700                 int rc2 = llcrypt_fname_alloc_buffer(dir, NAME_MAX, &lltr);
1701
1702                 if (rc2 < 0)
1703                         RETURN(rc2);
1704         }
1705
1706         /**
1707          *FIXME choose the start offset of the readdir
1708          */
1709
1710         page = ll_get_dir_page(dir, op_data, 0, NULL);
1711
1712         while (1) {
1713                 struct lu_dirpage *dp;
1714                 struct lu_dirent  *ent;
1715
1716                 if (IS_ERR(page)) {
1717                         struct ll_inode_info *lli = ll_i2info(dir);
1718
1719                         rc = PTR_ERR(page);
1720                         CERROR("%s: reading dir "DFID" at %llu stat_pid = %u : rc = %d\n",
1721                                ll_i2sbi(dir)->ll_fsname,
1722                                PFID(ll_inode2fid(dir)), pos,
1723                                lli->lli_stat_pid, rc);
1724                         break;
1725                 }
1726
1727                 dp = page_address(page);
1728                 for (ent = lu_dirent_start(dp); ent != NULL;
1729                      ent = lu_dirent_next(ent)) {
1730                         __u64 hash;
1731                         int namelen;
1732                         char *name;
1733
1734                         hash = le64_to_cpu(ent->lde_hash);
1735                         /*
1736                          * The ll_get_dir_page() can return any page containing
1737                          * the given hash which may be not the start hash.
1738                          */
1739                         if (unlikely(hash < pos))
1740                                 continue;
1741
1742                         namelen = le16_to_cpu(ent->lde_namelen);
1743                         if (unlikely(namelen == 0))
1744                                 /*
1745                                  * skip dummy record.
1746                                  */
1747                                 continue;
1748
1749                         name = ent->lde_name;
1750                         if (name[0] == '.') {
1751                                 if (namelen == 1)
1752                                         /*
1753                                          * skip "."
1754                                          */
1755                                         continue;
1756                                 else if (name[1] == '.' && namelen == 2)
1757                                         /*
1758                                          * skip ".."
1759                                          */
1760                                         continue;
1761                                 else
1762                                         dot_de = 1;
1763                         } else {
1764                                 dot_de = 0;
1765                         }
1766
1767                         if (dot_de && target->name[0] != '.') {
1768                                 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
1769                                        target->len, target->name,
1770                                        namelen, name);
1771                                 continue;
1772                         }
1773
1774                         if (IS_ENCRYPTED(dir)) {
1775                                 struct llcrypt_str de_name =
1776                                         LLTR_INIT(ent->lde_name, namelen);
1777                                 struct lu_fid fid;
1778
1779                                 fid_le_to_cpu(&fid, &ent->lde_fid);
1780                                 if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
1781                                                          &lltr, &fid))
1782                                         continue;
1783                                 name = lltr.name;
1784                                 namelen = lltr.len;
1785                         }
1786
1787                         if (target->len != namelen ||
1788                             memcmp(target->name, name, namelen) != 0)
1789                                 rc = LS_NOT_FIRST_DE;
1790                         else if (!dot_de)
1791                                 rc = LS_FIRST_DE;
1792                         else
1793                                 rc = LS_FIRST_DOT_DE;
1794
1795                         ll_release_page(dir, page, false);
1796                         GOTO(out, rc);
1797                 }
1798                 pos = le64_to_cpu(dp->ldp_hash_end);
1799                 if (pos == MDS_DIR_END_OFF) {
1800                         /*
1801                          * End of directory reached.
1802                          */
1803                         ll_release_page(dir, page, false);
1804                         GOTO(out, rc);
1805                 } else {
1806                         /*
1807                          * chain is exhausted
1808                          * Normal case: continue to the next page.
1809                          */
1810                         ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) &
1811                                               LDF_COLLIDE);
1812                         page = ll_get_dir_page(dir, op_data, pos, NULL);
1813                 }
1814         }
1815         EXIT;
1816 out:
1817         llcrypt_fname_free_buffer(&lltr);
1818         ll_finish_md_op_data(op_data);
1819
1820         return rc;
1821 }
1822
1823 static struct ll_statahead_info *
1824 ll_find_sai_locked(struct ll_statahead_context *ctx, pid_t pid)
1825 {
1826         struct ll_statahead_info *sai;
1827
1828         list_for_each_entry(sai, &ctx->sax_sai_list, sai_item) {
1829                 if (sai->sai_pid == pid)
1830                         return sai;
1831         }
1832         return NULL;
1833 }
1834
1835 static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
1836                                   bool agl);
1837
1838 static int ll_shared_statahead_check(struct inode *dir, struct dentry *dentry,
1839                                      struct ll_statahead_context *ctx)
1840 {
1841         struct ll_inode_info *lli = ll_i2info(dir);
1842         struct ll_statahead_info *sai;
1843
1844         ENTRY;
1845
1846         spin_lock(&lli->lli_sa_lock);
1847         sai = lli->lli_sai;
1848         if (sai) {
1849                 if (sai->sai_pid == current->pid) {
1850                         spin_unlock(&lli->lli_sa_lock);
1851                         RETURN(0);
1852                 }
1853                 lli->lli_sai = NULL;
1854                 lli->lli_sa_pattern |= LSA_PATTERN_FN_SHARED;
1855         }
1856
1857         LASSERT(lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED);
1858         sai = ll_find_sai_locked(ctx, current->pid);
1859         if (sai) {
1860                 spin_unlock(&lli->lli_sa_lock);
1861                 RETURN(-EEXIST);
1862         }
1863
1864         spin_unlock(&lli->lli_sa_lock);
1865
1866         RETURN(start_statahead_thread(dir, dentry, true));
1867 }
1868
1869 /**
1870  * revalidate @dentryp from statahead cache
1871  *
1872  * \param[in] dir       parent directory
1873  * \param[in] sai       sai structure
1874  * \param[out] dentryp  pointer to dentry which will be revalidated
1875  * \param[in] unplug    unplug statahead window only (normally for negative
1876  *                      dentry)
1877  * \retval              1 on success, dentry is saved in @dentryp
1878  * \retval              0 if revalidation failed (no proper lock on client)
1879  * \retval              negative number upon error
1880  */
1881 static int revalidate_statahead_dentry(struct inode *dir,
1882                                        struct ll_statahead_context *ctx,
1883                                        struct dentry **dentryp,
1884                                        bool unplug)
1885 {
1886         struct sa_entry *entry = NULL;
1887         struct ll_inode_info *lli = ll_i2info(dir);
1888         struct ll_statahead_info *sai = lli->lli_sai;
1889         int rc = 0;
1890
1891         ENTRY;
1892
1893         if (sai && (*dentryp)->d_name.name[0] == '.') {
1894                 if (sai->sai_ls_all ||
1895                     sai->sai_miss_hidden >= sai->sai_skip_hidden) {
1896                         /*
1897                          * Hidden dentry is the first one, or statahead
1898                          * thread does not skip so many hidden dentries
1899                          * before "sai_ls_all" enabled as below.
1900                          */
1901                 } else {
1902                         if (!sai->sai_ls_all)
1903                                 /*
1904                                  * It maybe because hidden dentry is not
1905                                  * the first one, "sai_ls_all" was not
1906                                  * set, then "ls -al" missed. Enable
1907                                  * "sai_ls_all" for such case.
1908                                  */
1909                                 sai->sai_ls_all = 1;
1910
1911                         /*
1912                          * Such "getattr" has been skipped before
1913                          * "sai_ls_all" enabled as above.
1914                          */
1915                         sai->sai_miss_hidden++;
1916                         RETURN(-EAGAIN);
1917                 }
1918         }
1919
1920         if (unplug)
1921                 GOTO(out, rc = 1);
1922
1923         entry = sa_get(ctx, &(*dentryp)->d_name);
1924         if (!entry) {
1925                 if (lli->lli_sa_pattern & LSA_PATTERN_FNAME)
1926                         rc = ll_shared_statahead_check(dir, *dentryp, ctx);
1927                 GOTO(out, rc = rc == 0 ? -EAGAIN : rc);
1928         }
1929
1930         if (lli->lli_sa_pattern & LSA_PATTERN_LIST)
1931                 LASSERT(sai == entry->se_sai);
1932         else if (lli->lli_sa_pattern & LSA_PATTERN_FNAME ||
1933                  lli->lli_sa_pattern == LSA_PATTERN_ADVISE)
1934                 sai = entry->se_sai;
1935
1936         LASSERTF(sai != NULL, "pattern %#X entry %p se_sai %p %pd lli %p\n",
1937                  lli->lli_sa_pattern, entry, entry->se_sai, *dentryp, lli);
1938         if (!sa_ready(entry)) {
1939                 spin_lock(&lli->lli_sa_lock);
1940                 sai->sai_index_wait = entry->se_index;
1941                 spin_unlock(&lli->lli_sa_lock);
1942                 rc = wait_event_idle_timeout(sai->sai_waitq, sa_ready(entry),
1943                                              cfs_time_seconds(30));
1944                 if (rc == 0) {
1945                         /*
1946                          * entry may not be ready, so it may be used by inflight
1947                          * statahead RPC, don't free it.
1948                          */
1949                         entry = NULL;
1950                         GOTO(out, rc = -EAGAIN);
1951                 }
1952         }
1953
1954         /*
1955          * We need to see the value that was set immediately before we
1956          * were woken up.
1957          */
1958         if (smp_load_acquire(&entry->se_state) == SA_ENTRY_SUCC &&
1959             entry->se_inode) {
1960                 struct inode *inode = entry->se_inode;
1961                 struct lookup_intent it = { .it_op = IT_GETATTR,
1962                                             .it_lock_handle =
1963                                                 entry->se_handle };
1964                 __u64 bits;
1965
1966                 rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
1967                                         ll_inode2fid(inode), &bits);
1968                 if (rc == 1) {
1969                         if (!(*dentryp)->d_inode) {
1970                                 struct dentry *alias;
1971
1972                                 alias = ll_splice_alias(inode, *dentryp);
1973                                 if (IS_ERR(alias)) {
1974                                         ll_intent_release(&it);
1975                                         GOTO(out, rc = PTR_ERR(alias));
1976                                 }
1977                                 *dentryp = alias;
1978                                 /*
1979                                  * statahead prepared this inode, transfer inode
1980                                  * refcount from sa_entry to dentry
1981                                  */
1982                                 entry->se_inode = NULL;
1983                         } else if ((*dentryp)->d_inode != inode) {
1984                                 /* revalidate, but inode is recreated */
1985                                 CDEBUG(D_READA,
1986                                        "%s: stale dentry %pd inode " DFID", statahead inode "DFID "\n",
1987                                        ll_i2sbi(inode)->ll_fsname, *dentryp,
1988                                        PFID(ll_inode2fid((*dentryp)->d_inode)),
1989                                        PFID(ll_inode2fid(inode)));
1990                                 ll_intent_release(&it);
1991                                 GOTO(out, rc = -ESTALE);
1992                         }
1993
1994                         if (bits & MDS_INODELOCK_LOOKUP) {
1995                                 d_lustre_revalidate(*dentryp);
1996                                 if (S_ISDIR(inode->i_mode))
1997                                         ll_update_dir_depth_dmv(dir, *dentryp);
1998                         }
1999
2000                         ll_intent_release(&it);
2001                 }
2002         }
2003 out:
2004         /*
2005          * statahead cached sa_entry can be used only once, and will be killed
2006          * right after use, so if lookup/revalidate accessed statahead cache,
2007          * set dentry ldd_sa_generation to parent lli_sa_generation, later if we
2008          * stat this file again, we know we've done statahead before, see
2009          * dentry_may_statahead().
2010          */
2011         if (lld_is_init(*dentryp))
2012                 ll_d2d(*dentryp)->lld_sa_generation = lli->lli_sa_generation;
2013         sa_put(dir, sai, entry);
2014
2015         RETURN(rc);
2016 }
2017
2018 static inline bool
2019 sa_pattern_list_detect(struct inode *dir, struct dentry *dchild, int *first)
2020 {
2021         struct ll_inode_info *lli = ll_i2info(dir);
2022
2023         if (lli->lli_stat_pid == 0)
2024                 return false;
2025
2026         /* Directory listing needs to call opendir()/readdir()/stat(). */
2027         if (!(lli->lli_sa_pattern & LSA_PATTERN_OPENDIR))
2028                 return false;
2029
2030         if (lli->lli_sa_enabled == 0)
2031                 return false;
2032
2033         if (lli->lli_sa_pattern & LSA_PATTERN_LS_NOT_FIRST_DE)
2034                 return false;
2035
2036         *first = is_first_dirent(dir, dchild);
2037         if (*first == LS_NOT_FIRST_DE) {
2038                 /*
2039                  * It is not "ls -{a}l" operation, no need statahead for it.
2040                  * Disable statahead so that subsequent stat() won't waste
2041                  * time to try it.
2042                  */
2043                 spin_lock(&lli->lli_sa_lock);
2044                 if (lli->lli_stat_pid == current->pid) {
2045                         lli->lli_sa_enabled = 0;
2046                         lli->lli_sa_pattern |= LSA_PATTERN_LS_NOT_FIRST_DE;
2047                 }
2048                 spin_unlock(&lli->lli_sa_lock);
2049                 return false;
2050         }
2051
2052         spin_lock(&lli->lli_sa_lock);
2053         lli->lli_sa_pattern |= LSA_PATTERN_LIST;
2054         spin_unlock(&lli->lli_sa_lock);
2055         return true;
2056 }
2057
2058 static inline bool
2059 sa_pattern_fname_detect(struct inode *dir, struct dentry *dchild)
2060 {
2061         struct ll_inode_info *lli = ll_i2info(dir);
2062         struct qstr *dname = &dchild->d_name;
2063         const unsigned char *name = dname->name;
2064         bool rc = false;
2065         int i;
2066
2067         if (ll_i2sbi(dir)->ll_enable_statahead_fname == 0)
2068                 return false;
2069         if (lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED)
2070                 return true;
2071
2072         /*
2073          * Parse the format of the file name to determine whether it matches
2074          * the supported file name pattern for statahead (i.e. mdtest.$rank.$i).
2075          */
2076         i = dname->len - 1;
2077         if (isdigit(name[i])) {
2078                 long num;
2079                 int ret;
2080
2081                 if (lli->lli_stat_pid == 0) {
2082                         lli->lli_stat_pid = current->pid;
2083                 } else if (lli->lli_stat_pid != current->pid) {
2084                         /*
2085                          * More than two processes (MPI ranks) doing stat()
2086                          * calls under this directory, consider it as a mdtest
2087                          * shared dir stat() workload.
2088                          */
2089                         spin_lock(&lli->lli_sa_lock);
2090                         lli->lli_stat_pid = current->pid;
2091                         if (lli->lli_sa_pattern & LSA_PATTERN_FNAME) {
2092                                 lli->lli_sai = NULL;
2093                                 rc = false;
2094                         } else {
2095                                 lli->lli_sa_pattern |= LSA_PATTERN_FNAME;
2096                                 rc = true;
2097                         }
2098                         lli->lli_sa_pattern |= LSA_PATTERN_FN_SHARED;
2099                         spin_unlock(&lli->lli_sa_lock);
2100                         return rc;
2101                 }
2102
2103                 while (--i >= 0 && isdigit(name[i]))
2104                         ; /* do nothing */
2105                 i++;
2106                 ret = kstrtol(&name[i], 0, &num);
2107                 if (ret)
2108                         GOTO(out, rc);
2109
2110                 /*
2111                  * The traversing program do multiple stat() calls on the same
2112                  * children entry. i.e. ls $dir*.
2113                  */
2114                 if (lli->lli_sa_fname_index == num)
2115                         return false;
2116
2117                 if (lli->lli_sa_match_count == 0 ||
2118                     num == lli->lli_sa_fname_index + 1) {
2119                         lli->lli_sa_match_count++;
2120                         lli->lli_sa_fname_index = num;
2121
2122                         if (lli->lli_sa_match_count > LSA_FN_MATCH_HIT)
2123                                 GOTO(out, rc = true);
2124
2125                         return false;
2126                 }
2127         }
2128 out:
2129         spin_lock(&lli->lli_sa_lock);
2130         if (rc) {
2131                 lli->lli_sa_pattern |= LSA_PATTERN_FNAME;
2132         } else {
2133                 lli->lli_sa_pattern = LSA_PATTERN_NONE;
2134                 lli->lli_sa_match_count = 0;
2135                 lli->lli_sa_fname_index = 0;
2136                 lli->lli_sa_enabled = 0;
2137         }
2138         spin_unlock(&lli->lli_sa_lock);
2139
2140         return rc;
2141 }
2142
2143 /* detect the statahead pattern. */
2144 static inline bool
2145 sa_pattern_detect(struct inode *dir, struct dentry *dchild, int *first)
2146 {
2147         return sa_pattern_list_detect(dir, dchild, first) ||
2148                sa_pattern_fname_detect(dir, dchild);
2149 }
2150
2151 static inline int ll_sax_add_sai(struct ll_statahead_context *ctx,
2152                                  struct ll_statahead_info *sai)
2153 {
2154         if (ll_find_sai_locked(ctx, sai->sai_pid) != NULL)
2155                 return -EEXIST;
2156
2157         list_add_tail(&sai->sai_item, &ctx->sax_sai_list);
2158         return 0;
2159 }
2160
2161 /**
2162  * start statahead thread
2163  *
2164  * \param[in] dir       parent directory
2165  * \param[in] dentry    dentry that triggers statahead, normally the first
2166  *                      dirent under @dir
2167  * \param[in] agl       indicate whether AGL is needed
2168  * \retval              -EAGAIN on success, because when this function is
2169  *                      called, it's already in lookup call, so client should
2170  *                      do it itself instead of waiting for statahead thread
2171  *                      to do it asynchronously.
2172  * \retval              negative number upon error
2173  */
2174 static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
2175                                   bool agl)
2176 {
2177         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
2178         struct ll_inode_info *lli = ll_i2info(dir);
2179         struct ll_statahead_info *sai = NULL;
2180         struct ll_statahead_context *ctx = NULL;
2181         struct dentry *parent;
2182         struct task_struct *task;
2183         struct ll_sb_info *sbi;
2184         int first = LS_FIRST_DE;
2185         int rc = 0;
2186
2187         ENTRY;
2188
2189         if (sa_pattern_detect(dir, dentry, &first) == false)
2190                 RETURN(0);
2191
2192         parent = dget_parent(dentry);
2193         sbi = ll_i2sbi(d_inode(parent));
2194         if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
2195                                        sbi->ll_sa_running_max)) {
2196                 CDEBUG(D_READA,
2197                        "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n");
2198                 dput(parent);
2199                 GOTO(out, rc = -EMFILE);
2200         }
2201
2202         /* on success ll_sai_alloc holds a ref on parent */
2203         sai = ll_sai_alloc(parent);
2204         dput(parent);
2205         if (!sai)
2206                 GOTO(out, rc = -ENOMEM);
2207
2208         sai->sai_ls_all = (first == LS_FIRST_DOT_DE);
2209         sai->sai_pid = current->pid;
2210
2211         if (lli->lli_sa_pattern & LSA_PATTERN_FNAME) {
2212                 struct qstr *dname = &dentry->d_name;
2213                 const unsigned char *name = dname->name;
2214                 long num;
2215                 int i;
2216
2217                 if (dname->len >= sizeof(sai->sai_fname))
2218                         GOTO(out, rc = -ERANGE);
2219
2220                 i = dname->len;
2221                 while (--i >= 0 && isdigit(name[i]))
2222                         ; /* do nothing */
2223                 i++;
2224                 rc = kstrtol(&name[i], 0, &num);
2225                 if (rc)
2226                         GOTO(out, rc);
2227
2228                 memcpy(sai->sai_fname, dname->name, i);
2229                 sai->sai_fname[i] = '\0';
2230                 sai->sai_fname_index = num;
2231                 /* The front part of the file name is zeroed padding. */
2232                 if (name[i] == '0')
2233                         sai->sai_fname_zeroed_len = dname->len - i;
2234         }
2235
2236         /* The workload like directory listing or mdtest unique dir stat() */
2237         if (lli->lli_sa_pattern & LSA_PATTERN_LIST ||
2238             (lli->lli_sa_pattern & (LSA_PATTERN_FN_SHARED |
2239                                     LSA_PATTERN_FNAME)) == LSA_PATTERN_FNAME) {
2240                 ctx = ll_sax_alloc(dir);
2241                 if (!ctx)
2242                         GOTO(out, rc = -ENOMEM);
2243
2244                 /*
2245                  * if current lli_opendir_key was deauthorized, or dir
2246                  * re-opened by another process, don't start statahead,
2247                  * otherwise the newly spawned statahead thread won't be
2248                  * notified to quit.
2249                  */
2250                 spin_lock(&lli->lli_sa_lock);
2251                 if (unlikely(lli->lli_sai || lli->lli_sax ||
2252                              ((lli->lli_sa_pattern & LSA_PATTERN_LIST) &&
2253                               !lli->lli_opendir_key &&
2254                               lli->lli_stat_pid != current->pid))) {
2255                         spin_unlock(&lli->lli_sa_lock);
2256                         GOTO(out, rc = -EPERM);
2257                 }
2258                 rc = ll_sax_add_sai(ctx, sai);
2259                 if (rc) {
2260                         spin_unlock(&lli->lli_sa_lock);
2261                         GOTO(out, rc);
2262                 }
2263                 lli->lli_sai = sai;
2264                 lli->lli_sax = ctx;
2265                 spin_unlock(&lli->lli_sa_lock);
2266         } else if (lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED) {
2267                 /* For mdtest shared dir stat() workload */
2268                 LASSERT(lli->lli_sa_pattern & LSA_PATTERN_FNAME);
2269                 ctx = ll_sax_get(dir);
2270                 if (ctx == NULL) {
2271                         ctx = ll_sax_alloc(dir);
2272                         if (ctx == NULL)
2273                                 GOTO(out, rc = -ENOMEM);
2274
2275                         spin_lock(&lli->lli_sa_lock);
2276                         if (lli->lli_sax) {
2277                                 struct ll_statahead_context *tmp = ctx;
2278
2279                                 if (lli->lli_sa_pattern &
2280                                     LSA_PATTERN_FN_SHARED) {
2281                                         ctx = lli->lli_sax;
2282                                         __ll_sax_get(ctx);
2283                                         rc = ll_sax_add_sai(ctx, sai);
2284                                 } else {
2285                                         CWARN("%s: invalid pattern %#X.\n",
2286                                               sbi->ll_fsname,
2287                                               lli->lli_sa_pattern);
2288                                         rc = -EINVAL;
2289                                 }
2290
2291                                 spin_unlock(&lli->lli_sa_lock);
2292                                 ll_sax_free(tmp);
2293                                 if (rc)
2294                                         GOTO(out, rc);
2295                         } else {
2296                                 lli->lli_sax = ctx;
2297                                 rc = ll_sax_add_sai(ctx, sai);
2298                                 spin_unlock(&lli->lli_sa_lock);
2299                         }
2300                 } else {
2301                         spin_lock(&lli->lli_sa_lock);
2302                         if (!(lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED)) {
2303                                 spin_unlock(&lli->lli_sa_lock);
2304                                 GOTO(out, rc = -EINVAL);
2305                         }
2306
2307                         rc = ll_sax_add_sai(ctx, sai);
2308                         spin_unlock(&lli->lli_sa_lock);
2309                 }
2310
2311                 if (rc)
2312                         GOTO(out, rc);
2313         } else {
2314                 CERROR("%s: unsupported statahead pattern %#X.\n",
2315                        sbi->ll_fsname, lli->lli_sa_pattern);
2316                 GOTO(out, rc = -EOPNOTSUPP);
2317         }
2318
2319         CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n",
2320                current->pid, parent);
2321
2322         task = kthread_create_on_node(ll_statahead_thread, sai, node,
2323                                       "ll_sa_%u", lli->lli_stat_pid);
2324         if (IS_ERR(task)) {
2325                 spin_lock(&lli->lli_sa_lock);
2326                 lli->lli_sai = NULL;
2327                 spin_unlock(&lli->lli_sa_lock);
2328                 rc = PTR_ERR(task);
2329                 CERROR("can't start ll_sa thread, rc: %d\n", rc);
2330                 GOTO(out, rc);
2331         }
2332
2333         if (test_bit(LL_SBI_AGL_ENABLED, sbi->ll_flags) && agl)
2334                 ll_start_agl(parent, sai);
2335
2336         atomic_inc(&sbi->ll_sa_total);
2337         if (lli->lli_sa_pattern & LSA_PATTERN_LIST)
2338                 atomic_inc(&sbi->ll_sa_list_total);
2339         else if (lli->lli_sa_pattern & LSA_PATTERN_FNAME)
2340                 atomic_inc(&sbi->ll_sa_fname_total);
2341
2342         sai->sai_task = task;
2343         wake_up_process(task);
2344         /*
2345          * We don't stat-ahead for the first dirent since we are already in
2346          * lookup.
2347          */
2348         RETURN(-EAGAIN);
2349
2350 out:
2351         /*
2352          * once we start statahead thread failed, disable statahead so that
2353          * subsequent stat won't waste time to try it.
2354          */
2355         spin_lock(&lli->lli_sa_lock);
2356         if (lli->lli_stat_pid == current->pid)
2357                 lli->lli_sa_enabled = 0;
2358         spin_unlock(&lli->lli_sa_lock);
2359
2360         if (sai)
2361                 ll_sai_free(sai);
2362
2363         if (ctx)
2364                 ll_sax_free(ctx);
2365
2366         if (rc)
2367                 atomic_dec(&sbi->ll_sa_running);
2368
2369         RETURN(rc);
2370 }
2371
2372 /*
2373  * Check whether statahead for @dir was started.
2374  */
2375 static inline bool ll_statahead_started(struct inode *dir, bool agl)
2376 {
2377         struct ll_inode_info *lli = ll_i2info(dir);
2378         struct ll_statahead_context *ctx;
2379         struct ll_statahead_info *sai;
2380
2381         spin_lock(&lli->lli_sa_lock);
2382         ctx = lli->lli_sax;
2383         sai = lli->lli_sai;
2384         if (sai && (sai->sai_agl_task != NULL) != agl)
2385                 CDEBUG(D_READA,
2386                        "%s: Statahead AGL hint changed from %d to %d\n",
2387                        ll_i2sbi(dir)->ll_fsname,
2388                        sai->sai_agl_task != NULL, agl);
2389         spin_unlock(&lli->lli_sa_lock);
2390
2391         return !!ctx;
2392 }
2393
/**
 * Statahead start-up entry point: launch the statahead thread for \a dir
 * unless a statahead context is already attached to it.
 *
 * \param[in] dir	parent directory
 * \param[in] dentry	dentry handed to start_statahead_thread()
 * \param[in] agl	whether the AGL thread should be started as well
 *
 * \retval		0 if statahead was already started for \a dir
 * \retval		otherwise the value returned by
 *			start_statahead_thread()
 */
int ll_start_statahead(struct inode *dir, struct dentry *dentry, bool agl)
{
	if (ll_statahead_started(dir, agl))
		return 0;

	return start_statahead_thread(dir, dentry, agl);
}
2415
2416 /**
2417  * revalidate dentry from statahead cache.
2418  *
2419  * \param[in]  dir      parent directory
2420  * \param[out] dentryp  dentry to getattr
2421  * \param[in]  unplug   unplug statahead window only (normally for negative
2422  *                      dentry)
2423  * \retval              1 on success
2424  * \retval              0 revalidation from statahead cache failed, caller needs
2425  *                      to getattr from server directly
2426  * \retval              negative number on error, caller often ignores this and
2427  *                      then getattr from server
2428  */
2429 int ll_revalidate_statahead(struct inode *dir, struct dentry **dentryp,
2430                             bool unplug)
2431 {
2432         struct ll_inode_info *lli = ll_i2info(dir);
2433         struct ll_statahead_context *ctx;
2434         struct ll_statahead_info *sai = NULL;
2435         int rc = 0;
2436
2437         spin_lock(&lli->lli_sa_lock);
2438         ctx = lli->lli_sax;
2439         if (ctx) {
2440                 sai = lli->lli_sai;
2441                 if (sai) {
2442                         atomic_inc(&sai->sai_refcount);
2443                 } else if (lli->lli_sa_pattern & LSA_PATTERN_LIST) {
2444                         spin_unlock(&lli->lli_sa_lock);
2445                         return 0;
2446                 }
2447                 __ll_sax_get(ctx);
2448         }
2449         spin_unlock(&lli->lli_sa_lock);
2450         if (ctx) {
2451                 rc = revalidate_statahead_dentry(dir, ctx, dentryp, unplug);
2452                 CDEBUG(D_READA, "revalidate statahead %pd: rc = %d.\n",
2453                        *dentryp, rc);
2454                 if (sai)
2455                         ll_sai_put(sai);
2456                 ll_sax_put(dir, ctx);
2457         }
2458         return rc;
2459 }
2460
2461 int ll_ioctl_ahead(struct file *file, struct llapi_lu_ladvise2 *ladvise)
2462 {
2463         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
2464         struct ll_file_data *fd = file->private_data;
2465         struct dentry *dentry = file_dentry(file);
2466         struct inode *dir = dentry->d_inode;
2467         struct ll_inode_info *lli = ll_i2info(dir);
2468         struct ll_sb_info *sbi = ll_i2sbi(dir);
2469         struct ll_statahead_info *sai = NULL;
2470         struct ll_statahead_context *ctx = NULL;
2471         struct task_struct *task;
2472         bool agl = true;
2473         int rc;
2474
2475         ENTRY;
2476
2477         if (sbi->ll_sa_max == 0)
2478                 RETURN(0);
2479
2480         if (!S_ISDIR(dir->i_mode))
2481                 RETURN(-EINVAL);
2482
2483         if (fd->fd_sai) {
2484                 rc = -EALREADY;
2485                 CWARN("%s: already set statahead hint for dir %pd: rc = %d\n",
2486                       sbi->ll_fsname, dentry, rc);
2487                 RETURN(rc);
2488         }
2489
2490         if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
2491                                        sbi->ll_sa_running_max)) {
2492                 CDEBUG(D_READA,
2493                        "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n");
2494                 GOTO(out, rc = -EMFILE);
2495         }
2496
2497         sai = ll_sai_alloc(dentry);
2498         if (sai == NULL)
2499                 GOTO(out, rc = -ENOMEM);
2500
2501         sai->sai_fstart = ladvise->lla_start;
2502         sai->sai_fend = ladvise->lla_end;
2503         sai->sai_ls_all = 0;
2504         sai->sai_max = sbi->ll_sa_max;
2505         strncpy(sai->sai_fname, ladvise->lla_fname, sizeof(sai->sai_fname));
2506
2507         ctx = ll_sax_get(dir);
2508         if (ctx == NULL) {
2509                 ctx = ll_sax_alloc(dir);
2510                 if (ctx == NULL)
2511                         GOTO(out, rc = -ENOMEM);
2512
2513                 spin_lock(&lli->lli_sa_lock);
2514                 if (unlikely(lli->lli_sax)) {
2515                         struct ll_statahead_context *tmp = ctx;
2516
2517                         if (lli->lli_sa_pattern == LSA_PATTERN_NONE ||
2518                             lli->lli_sa_pattern == LSA_PATTERN_ADVISE) {
2519                                 lli->lli_sa_pattern = LSA_PATTERN_ADVISE;
2520                                 ctx = lli->lli_sax;
2521                                 __ll_sax_get(ctx);
2522                                 fd->fd_sai = __ll_sai_get(sai);
2523                                 rc = 0;
2524                         } else {
2525                                 rc = -EINVAL;
2526                                 CWARN("%s: pattern %X is not ADVISE: rc = %d\n",
2527                                       sbi->ll_fsname, lli->lli_sa_pattern, rc);
2528                         }
2529
2530                         spin_unlock(&lli->lli_sa_lock);
2531                         ll_sax_free(tmp);
2532                         if (rc)
2533                                 GOTO(out, rc);
2534                 } else {
2535                         lli->lli_sa_pattern = LSA_PATTERN_ADVISE;
2536                         lli->lli_sax = ctx;
2537                         fd->fd_sai = __ll_sai_get(sai);
2538                         spin_unlock(&lli->lli_sa_lock);
2539                 }
2540         } else {
2541                 spin_lock(&lli->lli_sa_lock);
2542                 if (!(lli->lli_sa_pattern == LSA_PATTERN_ADVISE ||
2543                       lli->lli_sa_pattern == LSA_PATTERN_NONE)) {
2544                         spin_unlock(&lli->lli_sa_lock);
2545                         GOTO(out, rc = -EINVAL);
2546                 }
2547
2548                 lli->lli_sa_pattern = LSA_PATTERN_ADVISE;
2549                 fd->fd_sai = __ll_sai_get(sai);
2550                 spin_unlock(&lli->lli_sa_lock);
2551         }
2552
2553         __ll_sax_get(ctx);
2554         CDEBUG(D_READA,
2555                "start statahead thread: [pid %d] [parent %pd] sai %p ctx %p\n",
2556                current->pid, dentry, sai, ctx);
2557
2558         task = kthread_create_on_node(ll_statahead_thread, sai, node,
2559                                       "ll_sa_%u", current->pid);
2560         if (IS_ERR(task)) {
2561                 rc = PTR_ERR(task);
2562                 CERROR("%s: cannot start ll_sa thread: rc = %d\n",
2563                        sbi->ll_fsname, rc);
2564                 GOTO(out, rc);
2565         }
2566
2567         if (test_bit(LL_SBI_AGL_ENABLED, sbi->ll_flags) && agl)
2568                 ll_start_agl(dentry, sai);
2569
2570         atomic_inc(&sbi->ll_sa_total);
2571         sai->sai_task = task;
2572         wake_up_process(task);
2573
2574         RETURN(0);
2575 out:
2576         if (fd->fd_sai) {
2577                 ll_sai_put(sai);
2578                 ll_sax_put(dir, ctx);
2579                 fd->fd_sai = NULL;
2580         }
2581
2582         if (sai)
2583                 ll_sai_free(sai);
2584
2585         if (ctx)
2586                 ll_sax_free(ctx);
2587
2588         atomic_dec(&sbi->ll_sa_running);
2589         RETURN(rc);
2590 }
2591
2592 /*
2593  * This function is called in each stat() system call to do statahead check.
2594  * When the files' naming of stat() call sequence under a directory follows
2595  * a certain name rule roughly, this directory is considered as an condicant
2596  * to do statahead.
2597  * For an example, the file naming rule is mdtest.$rank.$i, the suffix of
2598  * the stat() dentry name is number and do stat() for dentries with name
2599  * ending with number more than @LSA_FN_PREDICT_HIT, then the corresponding
2600  * directory is met the requrirement for statahead.
2601  */
2602 void ll_statahead_enter(struct inode *dir, struct dentry *dchild)
2603 {
2604         struct ll_inode_info *lli;
2605         struct qstr *dname = &dchild->d_name;
2606
2607         if (ll_i2sbi(dir)->ll_sa_max == 0)
2608                 return;
2609
2610         if (ll_i2sbi(dir)->ll_enable_statahead_fname == 0)
2611                 return;
2612
2613         lli = ll_i2info(dir);
2614         if (lli->lli_sa_enabled)
2615                 return;
2616
2617         if (lli->lli_sa_pattern & (LSA_PATTERN_FN_PREDICT | LSA_PATTERN_LIST))
2618                 return;
2619
2620         /*
2621          * Now support number indexing regularized statahead pattern only.
2622          * Quick check whether the last character is digit.
2623          */
2624         if (!isdigit(dname->name[dname->len - 1])) {
2625                 lli->lli_sa_match_count = 0;
2626                 return;
2627         }
2628
2629         lli->lli_sa_match_count++;
2630         if (lli->lli_sa_match_count > LSA_FN_PREDICT_HIT) {
2631                 spin_lock(&lli->lli_sa_lock);
2632                 lli->lli_sa_pattern |= LSA_PATTERN_FN_PREDICT;
2633                 spin_unlock(&lli->lli_sa_lock);
2634                 lli->lli_sa_enabled = 1;
2635                 lli->lli_sa_match_count = 0;
2636         }
2637 }