LU-17383 statahead: quit statahead with a long time wait
[fs/lustre-release.git] / lustre / llite / statahead.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  */
31
32 #include <linux/fs.h>
33 #include <linux/sched.h>
34 #include <linux/kthread.h>
35 #include <linux/mm.h>
36 #include <linux/highmem.h>
37 #include <linux/pagemap.h>
38 #include <linux/delay.h>
39
40 #define DEBUG_SUBSYSTEM S_LLITE
41
42 #include <obd_support.h>
43 #include <lustre_dlm.h>
44 #include "llite_internal.h"
45
46 #define SA_OMITTED_ENTRY_MAX 8ULL
47
48 typedef enum {
49         /** negative values are for error cases */
50         SA_ENTRY_INIT = 0,      /** init entry */
51         SA_ENTRY_SUCC = 1,      /** stat succeed */
52         SA_ENTRY_INVA = 2,      /** invalid entry */
53 } se_state_t;
54
55 /*
56  * sa_entry is not refcounted: the statahead thread allocates it and does the
57  * async stat; the callback ll_statahead_interpret() then prepares the inode
58  * and sets the lock data in ptlrpcd context. The scanner process is woken up
59  * if this entry is the one it is waiting on, and can then access and free it.
60  */
61 struct sa_entry {
62         /* link into sai_entries */
63         struct list_head                 se_list;
64         /* link into the per-directory sax_cache hash table */
65         struct list_head                 se_hash;
66         /* entry index in the sai */
67         __u64                            se_index;
68         /* low layer ldlm lock handle */
69         __u64                            se_handle;
70         /* entry status */
71         se_state_t                       se_state;
72         /* entry size, contains name */
73         int                              se_size;
74         /* pointer to the target inode */
75         struct inode                    *se_inode;
76         /* pointer to @sai per process struct */
77         struct ll_statahead_info        *se_sai;
78         /* entry name */
79         struct qstr                      se_qstr;
80         /* entry fid */
81         struct lu_fid                    se_fid;
82 };
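/*
 * Note: sa_alloc() stores the entry name inline, immediately after
 * struct sa_entry; se_size records the size of the whole allocation so
 * that sa_free() can release it with a single OBD_FREE().
 */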
83
84 static unsigned int sai_generation;
85 static DEFINE_SPINLOCK(sai_generation_lock);
86
87 static inline int sa_unhashed(struct sa_entry *entry)
88 {
89         return list_empty(&entry->se_hash);
90 }
91
92 /* sa_entry is ready to use */
93 static inline int sa_ready(struct sa_entry *entry)
94 {
95         /* Make sure sa_entry is updated and ready to use */
96         smp_rmb();
97         return (entry->se_state != SA_ENTRY_INIT);
98 }
99
100 /* hash value used to pick a sax_cache bucket */
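/*
 * sa_rehash(), sa_unhash() and sa_get() all use sa_hash() to pick the
 * sax_cache bucket for a name hash, so the statahead thread and the
 * directory scanner always agree on the bucket for a given entry.
 */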
101 static inline int sa_hash(int val)
102 {
103         return val & LL_SA_CACHE_MASK;
104 }
105
106 /* hash entry into sax_cache */
107 static inline void
108 sa_rehash(struct ll_statahead_context *ctx, struct sa_entry *entry)
109 {
110         int i = sa_hash(entry->se_qstr.hash);
111
112         spin_lock(&ctx->sax_cache_lock[i]);
113         list_add_tail(&entry->se_hash, &ctx->sax_cache[i]);
114         spin_unlock(&ctx->sax_cache_lock[i]);
115 }
116
117 /* unhash entry from sax_cache */
118 static inline void
119 sa_unhash(struct ll_statahead_context *ctx, struct sa_entry *entry)
120 {
121         int i = sa_hash(entry->se_qstr.hash);
122
123         spin_lock(&ctx->sax_cache_lock[i]);
124         list_del_init(&entry->se_hash);
125         spin_unlock(&ctx->sax_cache_lock[i]);
126 }
127
128 static inline int agl_should_run(struct ll_statahead_info *sai,
129                                  struct inode *inode)
130 {
131         return inode && S_ISREG(inode->i_mode) && sai->sai_agl_task;
132 }
133
134 static inline struct ll_inode_info *
135 agl_first_entry(struct ll_statahead_info *sai)
136 {
137         return list_first_entry(&sai->sai_agls, struct ll_inode_info,
138                                 lli_agl_list);
139 }
140
141 /* statahead window is full */
142 static inline int sa_sent_full(struct ll_statahead_info *sai)
143 {
144         return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
145 }
146
147 /* Batch metadata handle */
148 static inline bool sa_has_batch_handle(struct ll_statahead_info *sai)
149 {
150         return sai->sai_bh != NULL;
151 }
152
153 static inline void ll_statahead_flush_nowait(struct ll_statahead_info *sai)
154 {
155         if (sa_has_batch_handle(sai)) {
156                 sai->sai_index_end = sai->sai_index - 1;
157                 (void) md_batch_flush(ll_i2mdexp(sai->sai_dentry->d_inode),
158                                       sai->sai_bh, false);
159         }
160 }
161
162 static inline int agl_list_empty(struct ll_statahead_info *sai)
163 {
164         return list_empty(&sai->sai_agls);
165 }
166
167 /**
168  * The hit rate is considered low when either:
169  * (1) the hit ratio is less than 80%,
170  * or
171  * (2) there have been more than 8 consecutive misses.
172  */
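/*
 * For example, sai_hit = 10 and sai_miss = 3 gives a hit ratio of 10/13
 * (~77%) and 10 < 4 * 3, so the hit rate counts as low; sai_hit = 20 and
 * sai_miss = 4 (~83%) does not, unless sai_consecutive_miss exceeds 8.
 */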
173 static inline int sa_low_hit(struct ll_statahead_info *sai)
174 {
175         return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) ||
176                 (sai->sai_consecutive_miss > 8));
177 }
178
179 /*
180  * If the given index falls behind the statahead window by more than
181  * SA_OMITTED_ENTRY_MAX entries, it is considered old.
182  */
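/*
 * For example, with sai_max = 32 and sai_index = 100, any index <= 59
 * satisfies 32 + index + 8 < 100 and is therefore treated as old.
 */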
183 static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
184 {
185         return ((__u64)sai->sai_max + index + SA_OMITTED_ENTRY_MAX <
186                 sai->sai_index);
187 }
188
189 /* allocate sa_entry and hash it to allow scanner process to find it */
190 static struct sa_entry *
191 sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
192          const char *name, int len, const struct lu_fid *fid)
193 {
194         struct ll_inode_info *lli;
195         struct sa_entry *entry;
196         int entry_size;
197         char *dname;
198
199         ENTRY;
200
201         entry_size = sizeof(struct sa_entry) +
202                      round_up(len + 1 /* for trailing NUL */, 4);
203         OBD_ALLOC(entry, entry_size);
204         if (unlikely(!entry))
205                 RETURN(ERR_PTR(-ENOMEM));
206
207         CDEBUG(D_READA, "alloc sa entry %.*s(%p) index %llu\n",
208                len, name, entry, index);
209
210         entry->se_index = index;
211         entry->se_sai = sai;
212
213         entry->se_state = SA_ENTRY_INIT;
214         entry->se_size = entry_size;
215         dname = (char *)entry + sizeof(struct sa_entry);
216         memcpy(dname, name, len);
217         dname[len] = 0;
218         entry->se_qstr.hash = ll_full_name_hash(parent, name, len);
219         entry->se_qstr.len = len;
220         entry->se_qstr.name = dname;
221
222         if (fid)
223                 entry->se_fid = *fid;
224
225         lli = ll_i2info(sai->sai_dentry->d_inode);
226         spin_lock(&lli->lli_sa_lock);
227         INIT_LIST_HEAD(&entry->se_list);
228         sa_rehash(lli->lli_sax, entry);
229         spin_unlock(&lli->lli_sa_lock);
230
231         atomic_inc(&sai->sai_cache_count);
232
233         RETURN(entry);
234 }
235
236 /* free sa_entry, which should have been unhashed and not in any list */
237 static void sa_free(struct ll_statahead_context *ctx, struct sa_entry *entry)
238 {
239         CDEBUG(D_READA, "free sa entry %.*s(%p) index %llu\n",
240                entry->se_qstr.len, entry->se_qstr.name, entry,
241                entry->se_index);
242
243         LASSERT(list_empty(&entry->se_list));
244         LASSERT(sa_unhashed(entry));
245
246         OBD_FREE(entry, entry->se_size);
247 }
248
249 /*
250  * find sa_entry by name; used by the directory scanner. No lock is needed
251  * because only the scanner can remove the entry from the cache.
252  */
253 static struct sa_entry *
254 sa_get(struct ll_statahead_context *ctx, const struct qstr *qstr)
255 {
256         struct sa_entry *entry;
257         int i = sa_hash(qstr->hash);
258
259         spin_lock(&ctx->sax_cache_lock[i]);
260         list_for_each_entry(entry, &ctx->sax_cache[i], se_hash) {
261                 if (entry->se_qstr.hash == qstr->hash &&
262                     entry->se_qstr.len == qstr->len &&
263                     memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0) {
264                         spin_unlock(&ctx->sax_cache_lock[i]);
265                         return entry;
266                 }
267         }
268         spin_unlock(&ctx->sax_cache_lock[i]);
269         return NULL;
270 }
271
272 /* unhash and unlink sa_entry, and then free it */
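/*
 * Locking: with @locked == true the caller holds lli_sa_lock; it is dropped
 * across the iput()/free below and re-acquired before returning. With
 * @locked == false the lock is taken only around the removal from the
 * sai_entries list.
 */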
273 static inline void
274 sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry, bool locked)
275 {
276         struct inode *dir = sai->sai_dentry->d_inode;
277         struct ll_inode_info *lli = ll_i2info(dir);
278         struct ll_statahead_context *ctx = lli->lli_sax;
279
280         LASSERT(!sa_unhashed(entry));
281         LASSERT(!list_empty(&entry->se_list));
282         LASSERT(sa_ready(entry));
283
284         sa_unhash(ctx, entry);
285
286         if (!locked)
287                 spin_lock(&lli->lli_sa_lock);
288         list_del_init(&entry->se_list);
289         spin_unlock(&lli->lli_sa_lock);
290
291         iput(entry->se_inode);
292         atomic_dec(&sai->sai_cache_count);
293         sa_free(ctx, entry);
294         if (locked)
295                 spin_lock(&lli->lli_sa_lock);
296 }
297
298 /* called by scanner after use, sa_entry will be killed */
299 static void
300 sa_put(struct inode *dir, struct ll_statahead_info *sai, struct sa_entry *entry)
301 {
302         struct ll_inode_info *lli = ll_i2info(dir);
303         struct sa_entry *tmp;
304         bool wakeup = false;
305
306         if (entry && entry->se_state == SA_ENTRY_SUCC) {
307                 struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
308
309                 sai->sai_hit++;
310                 sai->sai_consecutive_miss = 0;
311                 if (sai->sai_max < sbi->ll_sa_max) {
312                         sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
313                         wakeup = true;
314                 } else if (sai->sai_max_batch_count > 0) {
315                         if (sai->sai_max >= sai->sai_max_batch_count &&
316                            (sai->sai_index_end - entry->se_index) %
317                            sai->sai_max_batch_count == 0) {
318                                 wakeup = true;
319                         } else if (entry->se_index == sai->sai_index_end) {
320                                 wakeup = true;
321                         }
322                 } else {
323                         wakeup = true;
324                 }
325         } else if (sai) {
326                 sai->sai_miss++;
327                 sai->sai_consecutive_miss++;
328                 wakeup = true;
329         }
330
331         if (entry)
332                 sa_kill(sai, entry, false);
333
334         if (sai) {
335                 /*
336                  * kill old completed entries. Maybe kicking old entries can
337                  * be ignored?
338                  */
339                 spin_lock(&lli->lli_sa_lock);
340                 while ((tmp = list_first_entry_or_null(&sai->sai_entries,
341                                 struct sa_entry, se_list))) {
342                         if (!is_omitted_entry(sai, tmp->se_index))
343                                 break;
344
345                         /* ll_sa_lock is dropped by sa_kill(), restart list */
346                         sa_kill(sai, tmp, true);
347                 }
348                 spin_unlock(&lli->lli_sa_lock);
349         }
350
351         spin_lock(&lli->lli_sa_lock);
352         if (wakeup && sai->sai_task)
353                 wake_up_process(sai->sai_task);
354         spin_unlock(&lli->lli_sa_lock);
355 }
356
357 /*
358  * update the entry state and insert it into sai_entries sorted by index;
359  * return true if the scanner is waiting on this entry.
360  */
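/*
 * For example, if sai_entries currently holds entries with indexes 3, 5
 * and 9 and entry->se_index is 7, the reverse walk stops at 5 and
 * list_add() links the new entry right after it, keeping the list ordered
 * by index: 3, 5, 7, 9.
 */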
361 static bool
362 __sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
363 {
364         struct sa_entry *se;
365         struct list_head *pos = &sai->sai_entries;
366         __u64 index = entry->se_index;
367
368         LASSERT(!sa_ready(entry));
369         LASSERT(list_empty(&entry->se_list));
370
371         list_for_each_entry_reverse(se, &sai->sai_entries, se_list) {
372                 if (se->se_index < entry->se_index) {
373                         pos = &se->se_list;
374                         break;
375                 }
376         }
377         list_add(&entry->se_list, pos);
378         /*
379          * LU-9210: ll_statahead_interpret must be able to see this before
380          * we wake it up
381          */
382         smp_store_release(&entry->se_state,
383                           ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);
384
385         return (index == sai->sai_index_wait);
386 }
387
388 /* finish async stat RPC arguments */
389 static void sa_fini_data(struct md_op_item *item)
390 {
391         struct md_op_data *op_data = &item->mop_data;
392
393         if (op_data->op_flags & MF_OPNAME_KMALLOCED)
394                 /* allocated via ll_setup_filename called from sa_prep_data */
395                 kfree(op_data->op_name);
396         ll_unlock_md_op_lsm(&item->mop_data);
397         iput(item->mop_dir);
398         if (item->mop_subpill_allocated)
399                 OBD_FREE_PTR(item->mop_pill);
400         OBD_FREE_PTR(item);
401 }
402
403 static int ll_statahead_interpret(struct md_op_item *item, int rc);
404
405 /*
406  * prepare arguments for async stat RPC.
407  */
408 static struct md_op_item *
409 sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
410 {
411         struct md_op_item *item;
412         struct ldlm_enqueue_info *einfo;
413         struct md_op_data *op_data;
414
415         OBD_ALLOC_PTR(item);
416         if (!item)
417                 return ERR_PTR(-ENOMEM);
418
419         op_data = ll_prep_md_op_data(&item->mop_data, dir, child,
420                                      entry->se_qstr.name, entry->se_qstr.len, 0,
421                                      LUSTRE_OPC_ANY, NULL);
422         if (IS_ERR(op_data)) {
423                 OBD_FREE_PTR(item);
424                 return (struct md_op_item *)op_data;
425         }
426
427         if (!child)
428                 op_data->op_fid2 = entry->se_fid;
429
430         item->mop_opc = MD_OP_GETATTR;
431         item->mop_it.it_op = IT_GETATTR;
432         item->mop_dir = igrab(dir);
433         item->mop_cb = ll_statahead_interpret;
434         item->mop_cbdata = entry;
435
436         einfo = &item->mop_einfo;
437         einfo->ei_type = LDLM_IBITS;
438         einfo->ei_mode = it_to_lock_mode(&item->mop_it);
439         einfo->ei_cb_bl = ll_md_blocking_ast;
440         einfo->ei_cb_cp = ldlm_completion_ast;
441         einfo->ei_cb_gl = NULL;
442         einfo->ei_cbdata = NULL;
443         einfo->ei_req_slot = 1;
444
445         return item;
446 }
447
448 /*
449  * release resources used in the async stat RPC, update the entry state, and
450  * wake up the scanner process if it is waiting on this entry.
451  */
452 static void
453 sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
454 {
455         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
456         bool wakeup;
457
458         spin_lock(&lli->lli_sa_lock);
459         wakeup = __sa_make_ready(sai, entry, ret);
460         spin_unlock(&lli->lli_sa_lock);
461
462         if (wakeup)
463                 wake_up(&sai->sai_waitq);
464 }
465
466 /* insert inode into the list of sai_agls */
467 static void ll_agl_add(struct ll_statahead_info *sai,
468                        struct inode *inode, int index)
469 {
470         struct ll_inode_info *child  = ll_i2info(inode);
471         struct ll_inode_info *parent = ll_i2info(sai->sai_dentry->d_inode);
472
473         spin_lock(&child->lli_agl_lock);
474         if (child->lli_agl_index == 0) {
475                 child->lli_agl_index = index;
476                 spin_unlock(&child->lli_agl_lock);
477
478                 LASSERT(list_empty(&child->lli_agl_list));
479
480                 spin_lock(&parent->lli_agl_lock);
481                 /* Re-check under the lock */
482                 if (agl_should_run(sai, inode)) {
483                         if (agl_list_empty(sai))
484                                 wake_up_process(sai->sai_agl_task);
485                         igrab(inode);
486                         list_add_tail(&child->lli_agl_list, &sai->sai_agls);
487                 } else
488                         child->lli_agl_index = 0;
489                 spin_unlock(&parent->lli_agl_lock);
490         } else {
491                 spin_unlock(&child->lli_agl_lock);
492         }
493 }
494
495 /* Allocate the statahead context (sax) */
496 static struct ll_statahead_context *ll_sax_alloc(struct inode *dir)
497 {
498         struct ll_statahead_context *ctx;
499         int i;
500
501         ENTRY;
502
503         OBD_ALLOC_PTR(ctx);
504         if (ctx == NULL)
505                 RETURN(NULL);
506
507         ctx->sax_inode = igrab(dir);
508         atomic_set(&ctx->sax_refcount, 1);
509         for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
510                 INIT_LIST_HEAD(&ctx->sax_cache[i]);
511                 spin_lock_init(&ctx->sax_cache_lock[i]);
512         }
513
514         RETURN(ctx);
515 }
516
517 static inline void ll_sax_free(struct ll_statahead_context *ctx)
518 {
519         LASSERT(ctx->sax_inode != NULL);
520         iput(ctx->sax_inode);
521         OBD_FREE_PTR(ctx);
522 }
523
524 static inline void __ll_sax_get(struct ll_statahead_context *ctx)
525 {
526         atomic_inc(&ctx->sax_refcount);
527 }
528
529 static inline struct ll_statahead_context *ll_sax_get(struct inode *dir)
530 {
531         struct ll_inode_info *lli = ll_i2info(dir);
532         struct ll_statahead_context *ctx = NULL;
533
534         spin_lock(&lli->lli_sa_lock);
535         ctx = lli->lli_sax;
536         if (ctx)
537                 __ll_sax_get(ctx);
538         spin_unlock(&lli->lli_sa_lock);
539
540         return ctx;
541 }
542
543 static inline void ll_sax_put(struct inode *dir,
544                               struct ll_statahead_context *ctx)
545 {
546         struct ll_inode_info *lli = ll_i2info(dir);
547
548         if (atomic_dec_and_lock(&ctx->sax_refcount, &lli->lli_sa_lock)) {
549                 lli->lli_sai = NULL;
550                 lli->lli_sax = NULL;
551                 if (lli->lli_sa_pattern & (LSA_PATTERN_ADVISE |
552                                            LSA_PATTERN_FNAME)) {
553                         lli->lli_opendir_key = NULL;
554                         lli->lli_opendir_pid = 0;
555                         lli->lli_sa_enabled = 0;
556                 }
557                 lli->lli_sa_pattern = LSA_PATTERN_NONE;
558                 spin_unlock(&lli->lli_sa_lock);
559
560                 ll_sax_free(ctx);
561         }
562 }
563
564 /* allocate sai */
565 static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
566 {
567         struct ll_statahead_info *sai;
568         struct ll_inode_info *lli = ll_i2info(dentry->d_inode);
569
570         ENTRY;
571
572         OBD_ALLOC_PTR(sai);
573         if (!sai)
574                 RETURN(NULL);
575
576         sai->sai_dentry = dget(dentry);
577         atomic_set(&sai->sai_refcount, 1);
578         sai->sai_max = ll_i2sbi(dentry->d_inode)->ll_sa_min;
579         sai->sai_index = 1;
580         init_waitqueue_head(&sai->sai_waitq);
581
582         INIT_LIST_HEAD(&sai->sai_entries);
583         INIT_LIST_HEAD(&sai->sai_agls);
584
585         atomic_set(&sai->sai_cache_count, 0);
586
587         spin_lock(&sai_generation_lock);
588         lli->lli_sa_generation = ++sai_generation;
589         if (unlikely(sai_generation == 0))
590                 lli->lli_sa_generation = ++sai_generation;
591         spin_unlock(&sai_generation_lock);
592
593         RETURN(sai);
594 }
595
596 /* free sai */
597 static inline void ll_sai_free(struct ll_statahead_info *sai)
598 {
599         LASSERT(sai->sai_dentry != NULL);
600         dput(sai->sai_dentry);
601         OBD_FREE_PTR(sai);
602 }
603
604 static inline struct ll_statahead_info *
605 __ll_sai_get(struct ll_statahead_info *sai)
606 {
607         atomic_inc(&sai->sai_refcount);
608         return sai;
609 }
610
611 /*
612  * put sai refcount after use, if refcount reaches zero, free sai and sa_entries
613  * attached to it.
614  */
615 static void ll_sai_put(struct ll_statahead_info *sai)
616 {
617         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
618
619         if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
620                 struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
621                 struct sa_entry *entry, *next;
622
623                 lli->lli_sai = NULL;
624                 spin_unlock(&lli->lli_sa_lock);
625
626                 LASSERT(!sai->sai_task);
627                 LASSERT(!sai->sai_agl_task);
628                 LASSERT(sai->sai_sent == sai->sai_replied);
629
630                 list_for_each_entry_safe(entry, next, &sai->sai_entries,
631                                          se_list)
632                         sa_kill(sai, entry, false);
633
634                 LASSERT(atomic_read(&sai->sai_cache_count) == 0);
635                 LASSERT(agl_list_empty(sai));
636
637                 ll_sai_free(sai);
638                 atomic_dec(&sbi->ll_sa_running);
639         }
640 }
641
642 /* Do NOT forget to drop the inode refcount taken when it was added to sai_agls. */
643 static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
644 {
645         struct ll_inode_info *lli = ll_i2info(inode);
646         u64 index = lli->lli_agl_index;
647         ktime_t expire;
648         int rc;
649
650         ENTRY;
651
652         LASSERT(list_empty(&lli->lli_agl_list));
653
654         /* AGL may fall behind statahead by one entry */
655         if (is_omitted_entry(sai, index + 1)) {
656                 lli->lli_agl_index = 0;
657                 iput(inode);
658                 RETURN_EXIT;
659         }
660
661         /*
662          * In case of restore, the MDT has the right size and has already
663          * sent it back without granting the layout lock, so the inode is
664          * up-to-date and AGL (async glimpse lock) is useless.
665          * Also, glimpsing requires the layout; during a running restore the
666          * MDT holds the layout lock, so the glimpse would block until the
667          * end of the restore (statahead/agl would block).
668          */
669         if (test_bit(LLIF_FILE_RESTORING, &lli->lli_flags)) {
670                 lli->lli_agl_index = 0;
671                 iput(inode);
672                 RETURN_EXIT;
673         }
674
675         /* Someone is in glimpse (sync or async), do nothing. */
676         rc = down_write_trylock(&lli->lli_glimpse_sem);
677         if (rc == 0) {
678                 lli->lli_agl_index = 0;
679                 iput(inode);
680                 RETURN_EXIT;
681         }
682
683         /*
684          * Someone triggered a glimpse within the last second.
685          * 1) The former glimpse succeeded with a glimpse lock granted by the
686          *    OST; if the lock is still cached on the client, AGL needs to do
687          *    nothing. If it was cancelled by another client, AGL may not be
688          *    able to obtain a new lock, since AGL triggers no glimpse callback.
689          * 2) The former glimpse succeeded, but the OST did not grant a
690          *    glimpse lock. In that case the OST quite possibly will not grant
691          *    a glimpse lock for AGL either.
692          * 3) The former glimpse failed; compared with the other two cases it
693          *    is relatively rare. AGL can ignore it without much impact on
694          *    performance.
695          */
696         expire = ktime_sub_ns(ktime_get(), NSEC_PER_SEC);
697         if (ktime_to_ns(lli->lli_glimpse_time) &&
698             ktime_before(expire, lli->lli_glimpse_time)) {
699                 up_write(&lli->lli_glimpse_sem);
700                 lli->lli_agl_index = 0;
701                 iput(inode);
702                 RETURN_EXIT;
703         }
704
705         CDEBUG(D_READA,
706                "Handling (init) async glimpse: inode = " DFID", idx = %llu\n",
707                PFID(&lli->lli_fid), index);
708
709         cl_agl(inode);
710         lli->lli_agl_index = 0;
711         lli->lli_glimpse_time = ktime_get();
712         up_write(&lli->lli_glimpse_sem);
713
714         CDEBUG(D_READA,
715                "Handled (init) async glimpse: inode= " DFID", idx = %llu, rc = %d\n",
716                PFID(&lli->lli_fid), index, rc);
717
718         iput(inode);
719
720         EXIT;
721 }
722
723 static void ll_statahead_interpret_fini(struct ll_inode_info *lli,
724                                         struct ll_statahead_info *sai,
725                                         struct md_op_item *item,
726                                         struct sa_entry *entry,
727                                         struct ptlrpc_request *req,
728                                         int rc)
729 {
730         /*
731          * ll_intent_release() first drops the ldlm ibits lock refcount by
732          * calling ll_intent_drop_lock(), regardless of failures. Do not worry
733          * about calling ll_intent_drop_lock() more than once.
734          */
735         ll_intent_release(&item->mop_it);
736         sa_fini_data(item);
737         if (req)
738                 ptlrpc_req_finished(req);
739         sa_make_ready(sai, entry, rc);
740
741         spin_lock(&lli->lli_sa_lock);
742         sai->sai_replied++;
743         spin_unlock(&lli->lli_sa_lock);
744 }
745
746 static void ll_statahead_interpret_work(struct work_struct *work)
747 {
748         struct md_op_item *item = container_of(work, struct md_op_item,
749                                                mop_work);
750         struct req_capsule *pill = item->mop_pill;
751         struct inode *dir = item->mop_dir;
752         struct ll_inode_info *lli = ll_i2info(dir);
753         struct ll_statahead_info *sai;
754         struct lookup_intent *it;
755         struct sa_entry *entry;
756         struct mdt_body *body;
757         struct inode *child;
758         int rc;
759
760         ENTRY;
761
762         entry = (struct sa_entry *)item->mop_cbdata;
763         LASSERT(entry->se_handle != 0);
764
765         sai = entry->se_sai;
766         it = &item->mop_it;
767         body = req_capsule_server_get(pill, &RMF_MDT_BODY);
768         if (!body)
769                 GOTO(out, rc = -EFAULT);
770
771         child = entry->se_inode;
772         /* revalidate; unlinked and re-created with the same name */
773         if (unlikely(!fid_is_zero(&item->mop_data.op_fid2) &&
774                      !lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
775                 if (child) {
776                         entry->se_inode = NULL;
777                         iput(child);
778                 }
779                 /* The mdt_body is invalid. Skip this entry */
780                 GOTO(out, rc = -EAGAIN);
781         }
782
783         it->it_lock_handle = entry->se_handle;
784         rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
785         if (rc != 1)
786                 GOTO(out, rc = -EAGAIN);
787
788         rc = ll_prep_inode(&child, pill, dir->i_sb, it);
789         if (rc) {
790                 CERROR("%s: getattr callback for %.*s "DFID": rc = %d\n",
791                        ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
792                        entry->se_qstr.name, PFID(&entry->se_fid), rc);
793                 GOTO(out, rc);
794         }
795
796         /* If encryption context was returned by MDT, put it in
797          * inode now to save an extra getxattr.
798          */
799         if (body->mbo_valid & OBD_MD_ENCCTX) {
800                 void *encctx = req_capsule_server_get(pill, &RMF_FILE_ENCCTX);
801                 __u32 encctxlen = req_capsule_get_size(pill, &RMF_FILE_ENCCTX,
802                                                        RCL_SERVER);
803
804                 if (encctxlen) {
805                         CDEBUG(D_SEC,
806                                "server returned encryption ctx for "DFID"\n",
807                                PFID(ll_inode2fid(child)));
808                         rc = ll_xattr_cache_insert(child,
809                                                    xattr_for_enc(child),
810                                                    encctx, encctxlen);
811                         if (rc)
812                                 CWARN("%s: cannot set enc ctx for "DFID": rc = %d\n",
813                                       ll_i2sbi(child)->ll_fsname,
814                                       PFID(ll_inode2fid(child)), rc);
815                 }
816         }
817
818         CDEBUG(D_READA, "%s: setting %.*s"DFID" l_data to inode %p\n",
819                ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
820                entry->se_qstr.name, PFID(ll_inode2fid(child)), child);
821         ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);
822
823         entry->se_inode = child;
824
825         if (agl_should_run(sai, child))
826                 ll_agl_add(sai, child, entry->se_index);
827 out:
828         ll_statahead_interpret_fini(lli, sai, item, entry, pill->rc_req, rc);
829 }
830
831 /*
832  * Callback for the async stat RPC, called in ptlrpcd context. It prepares the
833  * inode and sets the lock data directly in the ptlrpcd context, and wakes up
834  * the directory listing process if the dentry is the one being waited on.
835  */
836 static int ll_statahead_interpret(struct md_op_item *item, int rc)
837 {
838         struct req_capsule *pill = item->mop_pill;
839         struct lookup_intent *it = &item->mop_it;
840         struct inode *dir = item->mop_dir;
841         struct ll_inode_info *lli = ll_i2info(dir);
842         struct sa_entry *entry = (struct sa_entry *)item->mop_cbdata;
843         struct work_struct *work = &item->mop_work;
844         struct ll_statahead_info *sai;
845         struct mdt_body *body;
846         struct inode *child;
847         __u64 handle = 0;
848
849         ENTRY;
850
851         if (it_disposition(it, DISP_LOOKUP_NEG))
852                 rc = -ENOENT;
853
854         /*
855          * because the statahead thread waits for all inflight RPCs to finish,
856          * sai is always valid here, so no refcount is needed
857          */
858         LASSERT(entry != NULL);
859         sai = entry->se_sai;
860         LASSERT(sai != NULL);
861
862         CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
863                entry->se_qstr.len, entry->se_qstr.name, rc);
864
865         if (rc != 0)
866                 GOTO(out, rc);
867
868         body = req_capsule_server_get(pill, &RMF_MDT_BODY);
869         if (!body)
870                 GOTO(out, rc = -EFAULT);
871
872         child = entry->se_inode;
873         /*
874          * revalidate; the file may have been unlinked and re-created with the
875          * same name. Exclude the case where the FID is zero: that happens for
876          * statahead with a regularized file name pattern, where the FID of
877          * the child file is not known.
878          */
879         if (unlikely(!fid_is_zero(&item->mop_data.op_fid2) &&
880                      !lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
881                 if (child) {
882                         entry->se_inode = NULL;
883                         iput(child);
884                 }
885                 /* The mdt_body is invalid. Skip this entry */
886                 GOTO(out, rc = -EAGAIN);
887         }
888
889         entry->se_handle = it->it_lock_handle;
890         /*
891          * In ptlrpcd context, it is not allowed to generate new RPCs
892          * especially for striped directories or regular files with layout
893          * change.
894          */
895         /*
896          * release ibits lock ASAP to avoid deadlock when statahead
897          * thread enqueues lock on parent in readdir and another
898          * process enqueues lock on child with parent lock held, eg.
899          * unlink.
900          */
901         handle = it->it_lock_handle;
902         ll_intent_drop_lock(it);
903         ll_unlock_md_op_lsm(&item->mop_data);
904
905         /*
906          * If the statahead entry is a striped directory or a regular file
907          * with a layout change, it will generate a new RPC and a long wait
908          * in the ptlrpcd context.
909          * However, blocking in a ptlrpcd thread is dangerous.
910          * Here we use a work queue or the separate statahead thread to
911          * handle the extra RPC and the long wait:
912          *      (@ll_prep_inode->@lmv_revalidate_slaves);
913          *      (@ll_prep_inode->@lov_layout_change->osc_cache_wait_range);
914          */
915         INIT_WORK(work, ll_statahead_interpret_work);
916         ptlrpc_request_addref(pill->rc_req);
917         schedule_work(work);
918         RETURN(0);
919 out:
920         ll_statahead_interpret_fini(lli, sai, item, entry, NULL, rc);
921         RETURN(rc);
922 }
923
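/*
 * Dispatch one async GETATTR: with a batch handle the item is queued into
 * sai->sai_bh and sent together with other queued requests (flushed via
 * ll_statahead_flush_nowait(), see sa_statahead()); otherwise a standalone
 * async intent-getattr RPC is sent immediately.
 */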
924 static inline int sa_getattr(struct ll_statahead_info *sai, struct inode *dir,
925                              struct md_op_item *item)
926 {
927         int rc;
928
929         if (sa_has_batch_handle(sai))
930                 rc = md_batch_add(ll_i2mdexp(dir), sai->sai_bh, item);
931         else
932                 rc = md_intent_getattr_async(ll_i2mdexp(dir), item);
933
934         return rc;
935 }
936
937 /* async stat for file not found in dcache */
938 static int sa_lookup(struct inode *dir, struct sa_entry *entry)
939 {
940         struct md_op_item *item;
941         int rc;
942
943         ENTRY;
944
945         item = sa_prep_data(dir, NULL, entry);
946         if (IS_ERR(item))
947                 RETURN(PTR_ERR(item));
948
949         rc = sa_getattr(entry->se_sai, dir, item);
950         if (rc < 0)
951                 sa_fini_data(item);
952
953         RETURN(rc);
954 }
955
956 /**
957  * async stat for file found in dcache, similar to .revalidate
958  *
959  * \retval      1 dentry valid, no RPC sent
960  * \retval      0 dentry invalid, will send async stat RPC
961  * \retval      negative number upon error
962  */
963 static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
964                          struct dentry *dentry)
965 {
966         struct inode *inode = dentry->d_inode;
967         struct lookup_intent it = { .it_op = IT_GETATTR,
968                                     .it_lock_handle = 0 };
969         struct md_op_item *item;
970         int rc;
971
972         ENTRY;
973
974         if (unlikely(!inode))
975                 RETURN(1);
976
977         if (d_mountpoint(dentry))
978                 RETURN(1);
979
980         item = sa_prep_data(dir, inode, entry);
981         if (IS_ERR(item))
982                 RETURN(PTR_ERR(item));
983
984         entry->se_inode = igrab(inode);
985         rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
986                                 NULL);
987         if (rc == 1) {
988                 entry->se_handle = it.it_lock_handle;
989                 ll_intent_release(&it);
990                 sa_fini_data(item);
991                 RETURN(1);
992         }
993
994         rc = sa_getattr(entry->se_sai, dir, item);
995         if (rc < 0) {
996                 entry->se_inode = NULL;
997                 iput(inode);
998                 sa_fini_data(item);
999         }
1000
1001         RETURN(rc);
1002 }
1003
1004 /* async stat for file with @name */
1005 static void sa_statahead(struct ll_statahead_info *sai, struct dentry *parent,
1006                          const char *name, int len, const struct lu_fid *fid)
1007 {
1008         struct inode *dir = parent->d_inode;
1009         struct dentry *dentry = NULL;
1010         struct sa_entry *entry;
1011         int rc;
1012
1013         ENTRY;
1014
1015         entry = sa_alloc(parent, sai, sai->sai_index, name, len, fid);
1016         if (IS_ERR(entry))
1017                 RETURN_EXIT;
1018
1019         dentry = d_lookup(parent, &entry->se_qstr);
1020         if (!dentry) {
1021                 rc = sa_lookup(dir, entry);
1022         } else {
1023                 rc = sa_revalidate(dir, entry, dentry);
1024                 if (rc == 1 && agl_should_run(sai, dentry->d_inode))
1025                         ll_agl_add(sai, dentry->d_inode, entry->se_index);
1026         }
1027
1028         if (dentry)
1029                 dput(dentry);
1030
1031         if (rc != 0)
1032                 sa_make_ready(sai, entry, rc);
1033         else
1034                 sai->sai_sent++;
1035
1036         sai->sai_index++;
1037
1038         if (sa_sent_full(sai))
1039                 ll_statahead_flush_nowait(sai);
1040
1041         EXIT;
1042 }
1043
1044 /* async glimpse (agl) thread main function */
1045 static int ll_agl_thread(void *arg)
1046 {
1047         /*
1048          * We already own this reference, so it is safe to take it
1049          * without a lock.
1050          */
1051         struct ll_statahead_info *sai = (struct ll_statahead_info *)arg;
1052         struct dentry *parent = sai->sai_dentry;
1053         struct inode *dir = parent->d_inode;
1054         struct ll_inode_info *plli = ll_i2info(dir);
1055         struct ll_inode_info *clli;
1056
1057         ENTRY;
1058
1059         CDEBUG(D_READA, "agl thread started: sai %p, parent %pd\n",
1060                sai, parent);
1061
1062         while (({set_current_state(TASK_IDLE);
1063                  !kthread_should_stop(); })) {
1064                 spin_lock(&plli->lli_agl_lock);
1065                 clli = list_first_entry_or_null(&sai->sai_agls,
1066                                                 struct ll_inode_info,
1067                                                 lli_agl_list);
1068                 if (clli) {
1069                         __set_current_state(TASK_RUNNING);
1070                         list_del_init(&clli->lli_agl_list);
1071                         spin_unlock(&plli->lli_agl_lock);
1072                         ll_agl_trigger(&clli->lli_vfs_inode, sai);
1073                         cond_resched();
1074                 } else {
1075                         spin_unlock(&plli->lli_agl_lock);
1076                         schedule();
1077                 }
1078         }
1079         __set_current_state(TASK_RUNNING);
1080         RETURN(0);
1081 }
1082
1083 static void ll_stop_agl(struct ll_statahead_info *sai)
1084 {
1085         struct dentry *parent = sai->sai_dentry;
1086         struct ll_inode_info *plli = ll_i2info(parent->d_inode);
1087         struct ll_inode_info *clli;
1088         struct task_struct *agl_task;
1089
1090         spin_lock(&plli->lli_agl_lock);
1091         agl_task = sai->sai_agl_task;
1092         sai->sai_agl_task = NULL;
1093         spin_unlock(&plli->lli_agl_lock);
1094         if (!agl_task)
1095                 return;
1096
1097         CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n",
1098                sai, (unsigned int)agl_task->pid);
1099         kthread_stop(agl_task);
1100
1101         spin_lock(&plli->lli_agl_lock);
1102         while ((clli = list_first_entry_or_null(&sai->sai_agls,
1103                                                 struct ll_inode_info,
1104                                                 lli_agl_list)) != NULL) {
1105                 list_del_init(&clli->lli_agl_list);
1106                 spin_unlock(&plli->lli_agl_lock);
1107                 clli->lli_agl_index = 0;
1108                 iput(&clli->lli_vfs_inode);
1109                 spin_lock(&plli->lli_agl_lock);
1110         }
1111         spin_unlock(&plli->lli_agl_lock);
1112         CDEBUG(D_READA, "agl thread stopped: sai %p, parent %pd\n",
1113                sai, parent);
1114         ll_sai_put(sai);
1115 }
1116
1117 /* start agl thread */
1118 static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
1119 {
1120         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
1121         struct ll_inode_info *plli;
1122         struct task_struct *task;
1123
1124         ENTRY;
1125
1126         CDEBUG(D_READA, "start agl thread: sai %p, parent %pd\n",
1127                sai, parent);
1128
1129         plli = ll_i2info(parent->d_inode);
1130         task = kthread_create_on_node(ll_agl_thread, sai, node, "ll_agl_%d",
1131                                       plli->lli_opendir_pid);
1132         if (IS_ERR(task)) {
1133                 CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));
1134                 RETURN_EXIT;
1135         }
1136         sai->sai_agl_task = task;
1137         atomic_inc(&ll_i2sbi(d_inode(parent))->ll_agl_total);
1138         /* Get an extra reference that the thread holds */
1139         __ll_sai_get(sai);
1140
1141         wake_up_process(task);
1142
1143         EXIT;
1144 }
1145
1146 static int ll_statahead_by_list(struct dentry *parent)
1147 {
1148         struct inode *dir = parent->d_inode;
1149         struct ll_inode_info *lli = ll_i2info(dir);
1150         struct ll_statahead_info *sai = lli->lli_sai;
1151         struct ll_sb_info *sbi = ll_i2sbi(dir);
1152         struct md_op_data *op_data;
1153         struct page *page = NULL;
1154         __u64 pos = 0;
1155         int first = 0;
1156         int rc = 0;
1157
1158         ENTRY;
1159
1160         CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
1161                sai, parent);
1162
1163         OBD_ALLOC_PTR(op_data);
1164         if (!op_data)
1165                 RETURN(-ENOMEM);
1166
1167         while (pos != MDS_DIR_END_OFF &&
1168                /* matches smp_store_release() in ll_deauthorize_statahead() */
1169                smp_load_acquire(&sai->sai_task) &&
1170                lli->lli_sa_enabled) {
1171                 struct lu_dirpage *dp;
1172                 struct lu_dirent  *ent;
1173
1174                 op_data = ll_prep_md_op_data(op_data, dir, dir, NULL, 0, 0,
1175                                              LUSTRE_OPC_ANY, dir);
1176                 if (IS_ERR(op_data)) {
1177                         rc = PTR_ERR(op_data);
1178                         break;
1179                 }
1180
1181                 page = ll_get_dir_page(dir, op_data, pos, NULL);
1182                 ll_unlock_md_op_lsm(op_data);
1183                 if (IS_ERR(page)) {
1184                         rc = PTR_ERR(page);
1185                         CDEBUG(D_READA,
1186                                "error reading dir "DFID" at %llu /%llu opendir_pid = %u: rc = %d\n",
1187                                PFID(ll_inode2fid(dir)), pos, sai->sai_index,
1188                                lli->lli_opendir_pid, rc);
1189                         break;
1190                 }
1191
1192                 dp = page_address(page);
1193                 for (ent = lu_dirent_start(dp);
1194                      /* matches smp_store_release() in ll_deauthorize_statahead() */
1195                      ent != NULL && smp_load_acquire(&sai->sai_task) &&
1196                      !sa_low_hit(sai) && lli->lli_sa_enabled;
1197                      ent = lu_dirent_next(ent)) {
1198                         __u64 hash;
1199                         int namelen;
1200                         char *name;
1201                         struct lu_fid fid;
1202                         struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
1203
1204                         hash = le64_to_cpu(ent->lde_hash);
1205                         if (unlikely(hash < pos))
1206                                 /*
1207                                  * Skip until we find target hash value.
1208                                  */
1209                                 continue;
1210
1211                         namelen = le16_to_cpu(ent->lde_namelen);
1212                         if (unlikely(namelen == 0))
1213                                 /*
1214                                  * Skip dummy record.
1215                                  */
1216                                 continue;
1217
1218                         name = ent->lde_name;
1219                         if (name[0] == '.') {
1220                                 if (namelen == 1) {
1221                                         /*
1222                                          * skip "."
1223                                          */
1224                                         continue;
1225                                 } else if (name[1] == '.' && namelen == 2) {
1226                                         /*
1227                                          * skip ".."
1228                                          */
1229                                         continue;
1230                                 } else if (!sai->sai_ls_all) {
1231                                         /*
1232                                          * skip hidden files.
1233                                          */
1234                                         sai->sai_skip_hidden++;
1235                                         continue;
1236                                 }
1237                         }
1238
1239                         /*
1240                          * don't stat-ahead first entry.
1241                          */
1242                         if (unlikely(++first == 1))
1243                                 continue;
1244
1245                         fid_le_to_cpu(&fid, &ent->lde_fid);
1246
1247                         while (({set_current_state(TASK_IDLE);
1248                                  /* matches smp_store_release() in
1249                                   * ll_deauthorize_statahead() */
1250                                  smp_load_acquire(&sai->sai_task); })) {
1251                                 long timeout;
1252
1253                                 spin_lock(&lli->lli_agl_lock);
1254                                 while (sa_sent_full(sai) &&
1255                                        !agl_list_empty(sai)) {
1256                                         struct ll_inode_info *clli;
1257
1258                                         __set_current_state(TASK_RUNNING);
1259                                         clli = agl_first_entry(sai);
1260                                         list_del_init(&clli->lli_agl_list);
1261                                         spin_unlock(&lli->lli_agl_lock);
1262
1263                                         ll_agl_trigger(&clli->lli_vfs_inode,
1264                                                        sai);
1265                                         cond_resched();
1266                                         spin_lock(&lli->lli_agl_lock);
1267                                 }
1268                                 spin_unlock(&lli->lli_agl_lock);
1269
1270                                 if (!sa_sent_full(sai))
1271                                         break;
1272
1273                                 /*
1274                                  * If the thread is not doing a stat within
1275                                  * @sbi->ll_sa_timeout (30s) then it probably
1276                                  * does not care too much about performance,
1277                                  * or is no longer using this directory.
1278                                  * Stop the statahead thread in this case.
1279                                  */
1280                                 timeout = schedule_timeout(
1281                                         cfs_time_seconds(sbi->ll_sa_timeout));
1282                                 if (timeout == 0) {
1283                                         lli->lli_sa_enabled = 0;
1284                                         break;
1285                                 }
1286                         }
1287                         __set_current_state(TASK_RUNNING);
1288
1289                         if (IS_ENCRYPTED(dir)) {
1290                                 struct llcrypt_str de_name =
1291                                         LLTR_INIT(ent->lde_name, namelen);
1292                                 struct lu_fid fid;
1293
1294                                 rc = llcrypt_fname_alloc_buffer(dir, NAME_MAX,
1295                                                                 &lltr);
1296                                 if (rc < 0)
1297                                         continue;
1298
1299                                 fid_le_to_cpu(&fid, &ent->lde_fid);
1300                                 if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
1301                                                          &lltr, &fid)) {
1302                                         llcrypt_fname_free_buffer(&lltr);
1303                                         continue;
1304                                 }
1305
1306                                 name = lltr.name;
1307                                 namelen = lltr.len;
1308                         }
1309
1310                         sa_statahead(sai, parent, name, namelen, &fid);
1311                         llcrypt_fname_free_buffer(&lltr);
1312                 }
1313
1314                 pos = le64_to_cpu(dp->ldp_hash_end);
1315                 ll_release_page(dir, page,
1316                                 le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
1317
1318                 if (sa_low_hit(sai)) {
1319                         rc = -EFAULT;
1320                         atomic_inc(&sbi->ll_sa_wrong);
1321                         CDEBUG(D_READA,
1322                                "Statahead for dir "DFID" hit ratio too low: hit/miss %llu/%llu, sent/replied %llu/%llu, stopping statahead thread: pid %d\n",
1323                                PFID(&lli->lli_fid), sai->sai_hit,
1324                                sai->sai_miss, sai->sai_sent,
1325                                sai->sai_replied, current->pid);
1326                         break;
1327                 }
1328         }
1329         ll_finish_md_op_data(op_data);
1330
1331         RETURN(rc);
1332 }
1333
1334 static void ll_statahead_handle(struct ll_statahead_info *sai,
1335                                 struct dentry *parent, const char *name,
1336                                 int len, const struct lu_fid *fid)
1337 {
1338         struct inode *dir = parent->d_inode;
1339         struct ll_inode_info *lli = ll_i2info(dir);
1340         struct ll_sb_info *sbi = ll_i2sbi(dir);
1341         long timeout;
1342
1343         while (({set_current_state(TASK_IDLE);
1344                 /* matches smp_store_release() in ll_deauthorize_statahead() */
1345                  smp_load_acquire(&sai->sai_task); })) {
1346                 spin_lock(&lli->lli_agl_lock);
1347                 while (sa_sent_full(sai) && !agl_list_empty(sai)) {
1348                         struct ll_inode_info *clli;
1349
1350                         __set_current_state(TASK_RUNNING);
1351                         clli = agl_first_entry(sai);
1352                         list_del_init(&clli->lli_agl_list);
1353                         spin_unlock(&lli->lli_agl_lock);
1354
1355                         ll_agl_trigger(&clli->lli_vfs_inode, sai);
1356                         cond_resched();
1357                         spin_lock(&lli->lli_agl_lock);
1358                 }
1359                 spin_unlock(&lli->lli_agl_lock);
1360
1361                 if (!sa_sent_full(sai))
1362                         break;
1363
1364                 /*
1365                  * If the thread is not doing a stat in 30s then it probably
1366                  * does not care too much about performance, or is no longer
1367                  * using this directory. Stop the statahead thread in this case.
1368                  */
1369                 timeout = schedule_timeout(
1370                                 cfs_time_seconds(sbi->ll_sa_timeout));
1371                 if (timeout == 0) {
1372                         lli->lli_sa_enabled = 0;
1373                         break;
1374                 }
1375         }
1376         __set_current_state(TASK_RUNNING);
1377
1378         sa_statahead(sai, parent, name, len, fid);
1379 }
1380
1381 static int ll_statahead_by_advise(struct ll_statahead_info *sai,
1382                                   struct dentry *parent)
1383 {
1384         struct inode *dir = parent->d_inode;
1385         struct ll_inode_info *lli = ll_i2info(dir);
1386         struct ll_sb_info *sbi = ll_i2sbi(dir);
1387         size_t max_len;
1388         size_t len;
1389         char *fname;
1390         char *ptr;
1391         int rc = 0;
1392         __u64 i = 0;
1393
1394         ENTRY;
1395
1396         CDEBUG(D_READA, "%s: ADVISE statahead: parent %pd fname prefix %s\n",
1397                sbi->ll_fsname, parent, sai->sai_fname);
1398
1399         OBD_ALLOC(fname, NAME_MAX);
1400         if (fname == NULL)
1401                 RETURN(-ENOMEM);
1402
1403         len = strlen(sai->sai_fname);
1404         memcpy(fname, sai->sai_fname, len);
1405         max_len = sizeof(sai->sai_fname) - len;
1406         ptr = fname + len;
1407
1408         /* matches smp_store_release() in ll_deauthorize_statahead() */
1409         while (smp_load_acquire(&sai->sai_task) && lli->lli_sa_enabled) {
1410                 size_t numlen;
1411
1412                 numlen = snprintf(ptr, max_len, "%llu",
1413                                   sai->sai_fstart + i);
1414
1415                 ll_statahead_handle(sai, parent, fname, len + numlen, NULL);
1416                 if (++i >= sai->sai_fend)
1417                         break;
1418         }
1419
1420         OBD_FREE(fname, NAME_MAX);
1421         RETURN(rc);
1422 }
1423
1424 static int ll_statahead_by_fname(struct ll_statahead_info *sai,
1425                                  struct dentry *parent)
1426 {
1427         struct inode *dir = parent->d_inode;
1428         struct ll_inode_info *lli = ll_i2info(dir);
1429         struct ll_sb_info *sbi = ll_i2sbi(dir);
1430         size_t max_len;
1431         size_t len;
1432         char *fname;
1433         char *ptr;
1434         int rc = 0;
1435
1436         ENTRY;
1437
1438         CDEBUG(D_READA, "%s: FNAME statahead: parent %pd fname prefix %s\n",
1439                sbi->ll_fsname, parent, sai->sai_fname);
1440
1441         OBD_ALLOC(fname, NAME_MAX);
1442         if (fname == NULL)
1443                 RETURN(-ENOMEM);
1444
1445         len = strlen(sai->sai_fname);
1446         memcpy(fname, sai->sai_fname, len);
1447         max_len = sizeof(sai->sai_fname) - len;
1448         ptr = fname + len;
1449
1450         /* matches smp_store_release() in ll_deauthorize_statahead() */
1451         while (smp_load_acquire(&sai->sai_task) && lli->lli_sa_enabled) {
1452                 size_t numlen;
1453
1454                 if (sai->sai_fname_zeroed_len)
1455                         numlen = snprintf(ptr, max_len, "%0*llu",
1456                                           sai->sai_fname_zeroed_len,
1457                                           ++sai->sai_fname_index);
1458                 else
1459                         numlen = snprintf(ptr, max_len, "%llu",
1460                                           ++sai->sai_fname_index);
1461
1462                 ll_statahead_handle(sai, parent, fname, len + numlen, NULL);
1463
1464                 if (sa_low_hit(sai)) {
1465                         rc = -EFAULT;
1466                         atomic_inc(&sbi->ll_sa_wrong);
1467                         CDEBUG(D_CACHE, "%s: low hit ratio for %pd "DFID": hit=%llu miss=%llu sent=%llu replied=%llu, stopping PID %d\n",
1468                                sbi->ll_fsname, parent, PFID(ll_inode2fid(dir)),
1469                                sai->sai_hit, sai->sai_miss, sai->sai_sent,
1470                                sai->sai_replied, current->pid);
1471                         break;
1472                 }
1473         }
1474
1475         OBD_FREE(fname, NAME_MAX);
1476         RETURN(rc);
1477 }
1478
1479 /* statahead thread main function */
1480 static int ll_statahead_thread(void *arg)
1481 {
1482         struct ll_statahead_info *sai = (struct ll_statahead_info *)arg;
1483         struct dentry *parent = sai->sai_dentry;
1484         struct inode *dir = parent->d_inode;
1485         struct ll_inode_info *lli = ll_i2info(dir);
1486         struct ll_sb_info *sbi = ll_i2sbi(dir);
1487         struct lu_batch *bh = NULL;
1488         int rc = 0;
1489
1490         ENTRY;
1491
1492         CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
1493                sai, parent);
1494
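             /* if batched statahead is configured, create a read-only RPC batch */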
1495         sai->sai_max_batch_count = sbi->ll_sa_batch_max;
1496         if (sai->sai_max_batch_count) {
1497                 bh = md_batch_create(ll_i2mdexp(dir), BATCH_FL_RDONLY,
1498                                      sai->sai_max_batch_count);
1499                 if (IS_ERR(bh))
1500                         GOTO(out_stop_agl, rc = PTR_ERR(bh));
1501         }
1502
1503         sai->sai_bh = bh;
1504
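             /* dispatch to the handler for the detected statahead pattern */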
1505         switch (lli->lli_sa_pattern & LSA_PATTERN_MASK) {
1506         case LSA_PATTERN_LIST:
1507                 rc = ll_statahead_by_list(parent);
1508                 break;
1509         case LSA_PATTERN_ADVISE:
1510                 rc = ll_statahead_by_advise(sai, parent);
1511                 break;
1512         case LSA_PATTERN_FNAME:
1513                 rc = ll_statahead_by_fname(sai, parent);
1514                 break;
1515         default:
1516                 rc = -EFAULT;
1517                 break;
1518         }
1519
1520         if (rc < 0) {
1521                 spin_lock(&lli->lli_sa_lock);
1522                 sai->sai_task = NULL;
1523                 lli->lli_sa_enabled = 0;
1524                 spin_unlock(&lli->lli_sa_lock);
1525         }
1526
1527         ll_statahead_flush_nowait(sai);
1528
1529         /*
1530          * statahead is finished, but statahead entries still need to be cached;
1531          * wait for the file release (closedir()) call to stop this thread.
1532          */
1533         while (({set_current_state(TASK_IDLE);
1534                 /* matches smp_store_release() in ll_deauthorize_statahead() */
1535                 smp_load_acquire(&sai->sai_task) && lli->lli_sa_enabled; })) {
1536                 schedule();
1537         }
1538         __set_current_state(TASK_RUNNING);
1539
1540         EXIT;
1541
1542         if (bh) {
1543                 rc = md_batch_stop(ll_i2mdexp(dir), sai->sai_bh);
1544                 sai->sai_bh = NULL;
1545         }
1546
1547 out_stop_agl:
1548         ll_stop_agl(sai);
1549
1550         /*
1551          * wait for inflight statahead RPCs to finish before freeing sai,
1552          * because those RPCs still access sai data
1553          */
1554         while (sai->sai_sent != sai->sai_replied)
1555                 /* wait with a timeout in case we're not woken up */
1556                 msleep(125);
1557
1558         CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd hit %llu miss %llu\n",
1559                sbi->ll_fsname, sai, parent, sai->sai_hit, sai->sai_miss);
1560
1561         spin_lock(&lli->lli_sa_lock);
1562         sai->sai_task = NULL;
1563         spin_unlock(&lli->lli_sa_lock);
1564         wake_up(&sai->sai_waitq);
1565
1566         atomic_add(sai->sai_hit, &sbi->ll_sa_hit_total);
1567         atomic_add(sai->sai_miss, &sbi->ll_sa_miss_total);
1568
1569         ll_sai_put(sai);
1570         ll_sax_put(dir, lli->lli_sax);
1571
1572         return rc;
1573 }
1574
1575 /* authorize opened dir handle @key to statahead */
1576 void ll_authorize_statahead(struct inode *dir, void *key)
1577 {
1578         struct ll_inode_info *lli = ll_i2info(dir);
1579
1580         spin_lock(&lli->lli_sa_lock);
1581         if (!lli->lli_opendir_key && !lli->lli_sai) {
1582                 /*
1583                  * if lli_sai is not NULL, the previous statahead has not
1584                  * finished yet; don't start a new one for now.
1585                  */
1586                 lli->lli_opendir_key = key;
1587                 lli->lli_opendir_pid = current->pid;
1588                 lli->lli_sa_enabled = 1;
1589         }
1590         spin_unlock(&lli->lli_sa_lock);
1591 }
1592
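     /*
      * Deauthorize an ADVISE-pattern statahead started via the opened file
      * handle @key: wake up the statahead thread so it can quit, then drop the
      * references on the per-handle sai and the per-directory statahead
      * context.
      */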
1593 static void ll_deauthorize_statahead_advise(struct inode *dir, void *key)
1594 {
1595         struct ll_inode_info *lli = ll_i2info(dir);
1596         struct ll_file_data *fd = (struct ll_file_data *)key;
1597         struct ll_statahead_info *sai = fd->fd_sai;
1598
1599         if (sai == NULL)
1600                 return;
1601
1602         spin_lock(&lli->lli_sa_lock);
1603         if (sai->sai_task) {
1604                 struct task_struct *task = sai->sai_task;
1605
1606                 /* matches smp_load_acquire() in ll_statahead_thread() */
1607                 smp_store_release(&sai->sai_task, NULL);
1608                 wake_up_process(task);
1609         }
1610         fd->fd_sai = NULL;
1611         spin_unlock(&lli->lli_sa_lock);
1612         ll_sai_put(sai);
1613         LASSERT(lli->lli_sax != NULL);
1614         ll_sax_put(dir, lli->lli_sax);
1615 }
1616
1617 /*
1618  * deauthorize opened dir handle @key to statahead, and notify statahead thread
1619  * to quit if it's running.
1620  */
1621 void ll_deauthorize_statahead(struct inode *dir, void *key)
1622 {
1623         struct ll_inode_info *lli = ll_i2info(dir);
1624         struct ll_statahead_info *sai;
1625
1626         CDEBUG(D_READA, "deauthorize statahead for "DFID"\n",
1627                PFID(&lli->lli_fid));
1628
1629         if (lli->lli_sa_pattern & LSA_PATTERN_ADVISE) {
1630                 ll_deauthorize_statahead_advise(dir, key);
1631                 return;
1632         }
1633
1634         LASSERT(lli->lli_opendir_pid != 0);
1635         LASSERT(lli->lli_opendir_key == key);
1636         spin_lock(&lli->lli_sa_lock);
1637         lli->lli_opendir_key = NULL;
1638         lli->lli_opendir_pid = 0;
1639         lli->lli_sa_enabled = 0;
1640         lli->lli_sa_pattern = LSA_PATTERN_NONE;
1641         lli->lli_sa_fname_index = 0;
1642         lli->lli_sa_match_count = 0;
1643         sai = lli->lli_sai;
1644         if (sai && sai->sai_task) {
1645                 /*
1646                  * the statahead thread may not have quit yet because it needs
1647                  * to cache entries; now it's time to tell it to quit.
1648                  *
1649                  * wake_up_process() provides the necessary barriers
1650                  * to pair with set_current_state().
1651                  */
1652                 struct task_struct *task = sai->sai_task;
1653
1654                 /* matches smp_load_acquire() in ll_statahead_thread() */
1655                 smp_store_release(&sai->sai_task, NULL);
1656                 wake_up_process(task);
1657         }
1658         spin_unlock(&lli->lli_sa_lock);
1659 }
1660
1661 enum {
1662         /**
1663          * not first dirent, or is "."
1664          */
1665         LS_NOT_FIRST_DE = 0,
1666         /**
1667          * the first non-hidden dirent
1668          */
1669         LS_FIRST_DE,
1670         /**
1671          * the first hidden dirent, that is "."
1672          */
1673         LS_FIRST_DOT_DE
1674 };
1675
1676 /* file is first dirent under @dir */
1677 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
1678 {
1679         struct qstr *target = &dentry->d_name;
1680         struct md_op_data *op_data;
1681         int dot_de;
1682         struct page *page = NULL;
1683         int rc = LS_NOT_FIRST_DE;
1684         __u64 pos = 0;
1685         struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
1686
1687         ENTRY;
1688
1689         op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
1690                                      LUSTRE_OPC_ANY, dir);
1691         if (IS_ERR(op_data))
1692                 RETURN(PTR_ERR(op_data));
1693
1694         if (IS_ENCRYPTED(dir)) {
1695                 int rc2 = llcrypt_fname_alloc_buffer(dir, NAME_MAX, &lltr);
1696
1697                 if (rc2 < 0)
1698                         GOTO(out, rc = rc2);
1699         }
1700
1701         /**
1702          * FIXME: choose the start offset of the readdir
1703          */
1704
1705         page = ll_get_dir_page(dir, op_data, 0, NULL);
1706
1707         while (1) {
1708                 struct lu_dirpage *dp;
1709                 struct lu_dirent  *ent;
1710
1711                 if (IS_ERR(page)) {
1712                         struct ll_inode_info *lli = ll_i2info(dir);
1713
1714                         rc = PTR_ERR(page);
1715                         CERROR("%s: reading dir "DFID" at %llu opendir_pid = %u : rc = %d\n",
1716                                ll_i2sbi(dir)->ll_fsname,
1717                                PFID(ll_inode2fid(dir)), pos,
1718                                lli->lli_opendir_pid, rc);
1719                         break;
1720                 }
1721
1722                 dp = page_address(page);
1723                 for (ent = lu_dirent_start(dp); ent != NULL;
1724                      ent = lu_dirent_next(ent)) {
1725                         __u64 hash;
1726                         int namelen;
1727                         char *name;
1728
1729                         hash = le64_to_cpu(ent->lde_hash);
1730                         /*
1731                          * ll_get_dir_page() can return any page containing
1732                          * the given hash, which may not be the start hash.
1733                          */
1734                         if (unlikely(hash < pos))
1735                                 continue;
1736
1737                         namelen = le16_to_cpu(ent->lde_namelen);
1738                         if (unlikely(namelen == 0))
1739                                 /*
1740                                  * skip dummy record.
1741                                  */
1742                                 continue;
1743
1744                         name = ent->lde_name;
1745                         if (name[0] == '.') {
1746                                 if (namelen == 1)
1747                                         /*
1748                                          * skip "."
1749                                          */
1750                                         continue;
1751                                 else if (name[1] == '.' && namelen == 2)
1752                                         /*
1753                                          * skip ".."
1754                                          */
1755                                         continue;
1756                                 else
1757                                         dot_de = 1;
1758                         } else {
1759                                 dot_de = 0;
1760                         }
1761
1762                         if (dot_de && target->name[0] != '.') {
1763                                 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
1764                                        target->len, target->name,
1765                                        namelen, name);
1766                                 continue;
1767                         }
1768
1769                         if (IS_ENCRYPTED(dir)) {
1770                                 struct llcrypt_str de_name =
1771                                         LLTR_INIT(ent->lde_name, namelen);
1772                                 struct lu_fid fid;
1773
1774                                 fid_le_to_cpu(&fid, &ent->lde_fid);
1775                                 if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
1776                                                          &lltr, &fid))
1777                                         continue;
1778                                 name = lltr.name;
1779                                 namelen = lltr.len;
1780                         }
1781
1782                         if (target->len != namelen ||
1783                             memcmp(target->name, name, namelen) != 0)
1784                                 rc = LS_NOT_FIRST_DE;
1785                         else if (!dot_de)
1786                                 rc = LS_FIRST_DE;
1787                         else
1788                                 rc = LS_FIRST_DOT_DE;
1789
1790                         ll_release_page(dir, page, false);
1791                         GOTO(out, rc);
1792                 }
1793                 pos = le64_to_cpu(dp->ldp_hash_end);
1794                 if (pos == MDS_DIR_END_OFF) {
1795                         /*
1796                          * End of directory reached.
1797                          */
1798                         ll_release_page(dir, page, false);
1799                         GOTO(out, rc);
1800                 } else {
1801                         /*
1802                          * chain is exhausted
1803                          * Normal case: continue to the next page.
1804                          */
1805                         ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) &
1806                                               LDF_COLLIDE);
1807                         page = ll_get_dir_page(dir, op_data, pos, NULL);
1808                 }
1809         }
1810         EXIT;
1811 out:
1812         llcrypt_fname_free_buffer(&lltr);
1813         ll_finish_md_op_data(op_data);
1814
1815         return rc;
1816 }
1817
1818 /**
1819  * revalidate @dentryp from statahead cache
1820  *
1821  * \param[in] dir       parent directory
1822  * \param[in] sai       sai structure
1823  * \param[in] ctx       statahead context structure
1824  * \param[in] unplug    unplug statahead window only (normally for negative
1825  *                      dentry)
1826  * \retval              1 on success, dentry is saved in @dentryp
1827  * \retval              0 if revalidation failed (no proper lock on client)
1828  * \retval              negative number upon error
1829  */
1830 static int revalidate_statahead_dentry(struct inode *dir,
1831                                        struct ll_statahead_context *ctx,
1832                                        struct dentry **dentryp,
1833                                        bool unplug)
1834 {
1835         struct sa_entry *entry = NULL;
1836         struct ll_inode_info *lli = ll_i2info(dir);
1837         struct ll_statahead_info *sai = lli->lli_sai;
1838         int rc = 0;
1839
1840         ENTRY;
1841
1842         if (sai && (*dentryp)->d_name.name[0] == '.') {
1843                 if (sai->sai_ls_all ||
1844                     sai->sai_miss_hidden >= sai->sai_skip_hidden) {
1845                         /*
1846                          * The hidden dentry is the first one, or the statahead
1847                          * thread has not skipped that many hidden dentries
1848                          * before "sai_ls_all" was enabled below.
1849                          */
1850                 } else {
1851                         if (!sai->sai_ls_all)
1852                                 /*
1853                                  * It may be that the hidden dentry is not
1854                                  * the first one and "sai_ls_all" was not
1855                                  * set, so "ls -al" missed it. Enable
1856                                  * "sai_ls_all" for such a case.
1857                                  */
1858                                 sai->sai_ls_all = 1;
1859
1860                         /*
1861                          * Such a "getattr" has been skipped before
1862                          * "sai_ls_all" was enabled as above.
1863                          */
1864                         sai->sai_miss_hidden++;
1865                         RETURN(-EAGAIN);
1866                 }
1867         }
1868
1869         if (unplug)
1870                 GOTO(out, rc = 1);
1871
1872         entry = sa_get(ctx, &(*dentryp)->d_name);
1873         if (!entry)
1874                 GOTO(out, rc = -EAGAIN);
1875
1876         if (lli->lli_sa_pattern & LSA_PATTERN_LIST ||
1877             lli->lli_sa_pattern & LSA_PATTERN_FNAME)
1878                 LASSERT(sai == entry->se_sai);
1879         else if (lli->lli_sa_pattern == LSA_PATTERN_ADVISE)
1880                 sai = entry->se_sai;
1881
1882         LASSERT(sai != NULL);
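             /*
              * If the entry is not ready yet, wait up to 30 seconds for the
              * statahead RPC to finish; on timeout give up (-EAGAIN) and fall
              * back to a regular getattr instead of blocking indefinitely.
              */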
1883         if (!sa_ready(entry)) {
1884                 spin_lock(&lli->lli_sa_lock);
1885                 sai->sai_index_wait = entry->se_index;
1886                 spin_unlock(&lli->lli_sa_lock);
1887                 rc = wait_event_idle_timeout(sai->sai_waitq, sa_ready(entry),
1888                                              cfs_time_seconds(30));
1889                 if (rc == 0) {
1890                         /*
1891                          * the entry may still be used by an inflight
1892                          * statahead RPC, so don't free it.
1893                          */
1894                         entry = NULL;
1895                         GOTO(out, rc = -EAGAIN);
1896                 }
1897         }
1898
1899         /*
1900          * We need to see the value that was set immediately before we
1901          * were woken up.
1902          */
1903         if (smp_load_acquire(&entry->se_state) == SA_ENTRY_SUCC &&
1904             entry->se_inode) {
1905                 struct inode *inode = entry->se_inode;
1906                 struct lookup_intent it = { .it_op = IT_GETATTR,
1907                                             .it_lock_handle =
1908                                                 entry->se_handle };
1909                 __u64 bits;
1910
1911                 rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
1912                                         ll_inode2fid(inode), &bits);
1913                 if (rc == 1) {
1914                         if (!(*dentryp)->d_inode) {
1915                                 struct dentry *alias;
1916
1917                                 alias = ll_splice_alias(inode, *dentryp);
1918                                 if (IS_ERR(alias)) {
1919                                         ll_intent_release(&it);
1920                                         GOTO(out, rc = PTR_ERR(alias));
1921                                 }
1922                                 *dentryp = alias;
1923                                 /*
1924                                  * statahead prepared this inode, transfer inode
1925                                  * refcount from sa_entry to dentry
1926                                  */
1927                                 entry->se_inode = NULL;
1928                         } else if ((*dentryp)->d_inode != inode) {
1929                                 /* revalidate, but inode is recreated */
1930                                 CDEBUG(D_READA,
1931                                        "%s: stale dentry %pd inode " DFID", statahead inode "DFID "\n",
1932                                        ll_i2sbi(inode)->ll_fsname, *dentryp,
1933                                        PFID(ll_inode2fid((*dentryp)->d_inode)),
1934                                        PFID(ll_inode2fid(inode)));
1935                                 ll_intent_release(&it);
1936                                 GOTO(out, rc = -ESTALE);
1937                         }
1938
1939                         if (bits & MDS_INODELOCK_LOOKUP) {
1940                                 d_lustre_revalidate(*dentryp);
1941                                 if (S_ISDIR(inode->i_mode))
1942                                         ll_update_dir_depth_dmv(dir, *dentryp);
1943                         }
1944
1945                         ll_intent_release(&it);
1946                 }
1947         }
1948 out:
1949         /*
1950          * A statahead-cached sa_entry can be used only once and is killed right
1951          * after use, so if lookup/revalidate accessed the statahead cache, set
1952          * the dentry ldd_sa_generation to the parent lli_sa_generation. If we
1953          * stat this file again later, we then know statahead was done before;
1954          * see dentry_may_statahead().
1955          */
1956         if (lld_is_init(*dentryp))
1957                 ll_d2d(*dentryp)->lld_sa_generation = lli->lli_sa_generation;
1958         sa_put(dir, sai, entry);
1959
1960         RETURN(rc);
1961 }
1962
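     /*
      * Detect the LIST ("ls -l") statahead pattern: only worth doing when the
      * directory handle is still authorized for statahead and the stat()'ed
      * entry is the first dirent of @dir (result returned in @first). If it is
      * not the first dirent, disable statahead so later stat() calls don't
      * retry the detection.
      */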
1963 static inline bool
1964 sa_pattern_list_detect(struct inode *dir, struct dentry *dchild, int *first)
1965 {
1966         struct ll_inode_info *lli = ll_i2info(dir);
1967
1968         if (lli->lli_opendir_pid == 0)
1969                 return false;
1970
1971         if (lli->lli_sa_enabled == 0)
1972                 return false;
1973
1974         if (lli->lli_sa_pattern & LSA_PATTERN_LS_NOT_FIRST_DE)
1975                 return false;
1976
1977         *first = is_first_dirent(dir, dchild);
1978         if (*first == LS_NOT_FIRST_DE) {
1979                 /*
1980                  * It is not an "ls -{a}l" operation, so there is no need to do
1981                  * statahead for it. Disable statahead so that subsequent stat()
1982                  * calls won't waste time trying it.
1983                  */
1984                 spin_lock(&lli->lli_sa_lock);
1985                 if (lli->lli_opendir_pid == current->pid) {
1986                         lli->lli_sa_enabled = 0;
1987                         lli->lli_sa_pattern |= LSA_PATTERN_LS_NOT_FIRST_DE;
1988                 }
1989                 spin_unlock(&lli->lli_sa_lock);
1990                 return false;
1991         }
1992
1993         spin_lock(&lli->lli_sa_lock);
1994         lli->lli_sa_pattern |= LSA_PATTERN_LIST;
1995         spin_unlock(&lli->lli_sa_lock);
1996         return true;
1997 }
1998
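     /*
      * Detect the FNAME statahead pattern: the stat()'ed names end with a
      * number (e.g. mdtest.$i). Count stat() calls whose numeric suffix
      * follows the previous one; once more than LSA_FN_MATCH_HIT such calls
      * are seen, set LSA_PATTERN_FNAME (and FN_UNIQUE) so that a statahead
      * thread can be started. On a mismatch, reset the detection state and
      * disable statahead for this directory.
      */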
1999 static inline bool
2000 sa_pattern_fname_detect(struct inode *dir, struct dentry *dchild)
2001 {
2002         struct ll_inode_info *lli = ll_i2info(dir);
2003         struct qstr *dname = &dchild->d_name;
2004         const unsigned char *name = dname->name;
2005         bool rc = false;
2006         int i;
2007
2008         if (ll_i2sbi(dir)->ll_enable_statahead_fname == 0)
2009                 return false;
2010
2011         /*
2012          * Parse the format of the file name to determine whether it matches
2013          * the supported file name pattern for statahead (e.g. mdtest.$i).
2014          */
2015         i = dname->len - 1;
2016         if (isdigit(name[i])) {
2017                 long num;
2018                 int ret;
2019
2020                 while (--i >= 0 && isdigit(name[i]))
2021                         /* do nothing */;
2022                 i++;
2023                 ret = kstrtol(&name[i], 0, &num);
2024                 if (ret)
2025                         GOTO(out, rc);
2026
2027                 /*
2028                  * The traversing program may do multiple stat() calls on the
2029                  * same child entry, e.g. ls $dir*.
2030                  */
2031                 if (lli->lli_sa_fname_index == num)
2032                         return false;
2033
2034                 if (lli->lli_sa_match_count == 0 ||
2035                     num == lli->lli_sa_fname_index + 1) {
2036                         lli->lli_sa_match_count++;
2037                         lli->lli_sa_fname_index = num;
2038
2039                         if (lli->lli_sa_match_count > LSA_FN_MATCH_HIT) {
2040                                 spin_lock(&lli->lli_sa_lock);
2041                                 lli->lli_sa_pattern |= LSA_PATTERN_FN_UNIQUE;
2042                                 spin_unlock(&lli->lli_sa_lock);
2043                                 GOTO(out, rc = true);
2044                         }
2045
2046                         return false;
2047                 }
2048         }
2049 out:
2050         spin_lock(&lli->lli_sa_lock);
2051         if (rc) {
2052                 lli->lli_sa_pattern |= LSA_PATTERN_FNAME;
2053         } else {
2054                 lli->lli_sa_pattern = LSA_PATTERN_NONE;
2055                 lli->lli_sa_match_count = 0;
2056                 lli->lli_sa_fname_index = 0;
2057                 lli->lli_sa_enabled = 0;
2058         }
2059         spin_unlock(&lli->lli_sa_lock);
2060
2061         return rc;
2062 }
2063
2064 /* detect the statahead pattern. */
2065 static inline bool
2066 sa_pattern_detect(struct inode *dir, struct dentry *dchild, int *first)
2067 {
2068         return sa_pattern_list_detect(dir, dchild, first) ||
2069                sa_pattern_fname_detect(dir, dchild);
2070 }
2071
2072 /**
2073  * start statahead thread
2074  *
2075  * \param[in] dir       parent directory
2076  * \param[in] dentry    dentry that triggers statahead, normally the first
2077  *                      dirent under @dir
2078  * \param[in] agl       indicate whether AGL is needed
2079  * \retval              -EAGAIN on success, because when this function is
2080  *                      called, we are already in a lookup call, so the client
2081  *                      should do the stat itself instead of waiting for the
2082  *                      statahead thread to do it asynchronously.
2083  * \retval              negative number upon error
2084  */
2085 static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
2086                                   bool agl)
2087 {
2088         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
2089         struct ll_inode_info *lli = ll_i2info(dir);
2090         struct ll_statahead_info *sai = NULL;
2091         struct ll_statahead_context *ctx = NULL;
2092         struct dentry *parent = dentry->d_parent;
2093         struct task_struct *task;
2094         struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode);
2095         int first = LS_FIRST_DE;
2096         int rc = 0;
2097
2098         ENTRY;
2099
2100         if (sa_pattern_detect(dir, dentry, &first) == false)
2101                 RETURN(0);
2102
2103         if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
2104                                        sbi->ll_sa_running_max)) {
2105                 CDEBUG(D_READA,
2106                        "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n");
2107                 GOTO(out, rc = -EMFILE);
2108         }
2109
2110         sai = ll_sai_alloc(parent);
2111         if (!sai)
2112                 GOTO(out, rc = -ENOMEM);
2113
2114         ctx = ll_sax_alloc(dir);
2115         if (!ctx)
2116                 GOTO(out, rc = -ENOMEM);
2117
2118         sai->sai_ls_all = (first == LS_FIRST_DOT_DE);
2119
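             /*
              * For the FNAME pattern, split the trigger name into its common
              * prefix and the trailing digits; e.g. a (hypothetical) name
              * "f.0000123" yields prefix "f." and a zero-padded index width of 7.
              */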
2120         if (lli->lli_sa_pattern & LSA_PATTERN_FNAME) {
2121                 struct qstr *dname = &dentry->d_name;
2122                 const unsigned char *name = dname->name;
2124                 int i;
2125
2126                 if (dname->len >= sizeof(sai->sai_fname))
2127                         GOTO(out, rc = -ERANGE);
2128
2129                 i = dname->len;
2130                 while (--i >= 0 && isdigit(name[i]))
2131                         /* do nothing */;
2132                 i++;
2133
2134                 memcpy(sai->sai_fname, dname->name, i);
2135                 sai->sai_fname[i] = '\0';
2136                 sai->sai_fname_index = lli->lli_sa_fname_index;
2137                 /* The numeric part of the file name is zero-padded. */
2138                 if (name[i] == '0')
2139                         sai->sai_fname_zeroed_len = dname->len - i;
2140         }
2141
2142         /*
2143          * if the current lli_opendir_key was deauthorized, or the dir was
2144          * re-opened by another process, don't start statahead; otherwise the
2145          * newly spawned statahead thread won't be notified to quit.
2146          */
2147         spin_lock(&lli->lli_sa_lock);
2148         if (unlikely(lli->lli_sai ||
2149                      ((lli->lli_sa_pattern & LSA_PATTERN_LIST) &&
2150                       !lli->lli_opendir_key &&
2151                       lli->lli_opendir_pid != current->pid))) {
2152                 spin_unlock(&lli->lli_sa_lock);
2153                 GOTO(out, rc = -EPERM);
2154         }
2155         lli->lli_sai = sai;
2156         lli->lli_sax = ctx;
2157         spin_unlock(&lli->lli_sa_lock);
2158
2159         CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n",
2160                current->pid, parent);
2161
2162         task = kthread_create_on_node(ll_statahead_thread, sai, node,
2163                                       "ll_sa_%u", lli->lli_opendir_pid);
2164         if (IS_ERR(task)) {
2165                 spin_lock(&lli->lli_sa_lock);
2166                 lli->lli_sai = NULL;
2167                 spin_unlock(&lli->lli_sa_lock);
2168                 rc = PTR_ERR(task);
2169                 CERROR("can't start ll_sa thread, rc: %d\n", rc);
2170                 GOTO(out, rc);
2171         }
2172
2173         if (test_bit(LL_SBI_AGL_ENABLED, sbi->ll_flags) && agl)
2174                 ll_start_agl(parent, sai);
2175
2176         atomic_inc(&sbi->ll_sa_total);
2177         if (lli->lli_sa_pattern & LSA_PATTERN_LIST)
2178                 atomic_inc(&sbi->ll_sa_list_total);
2179         else if (lli->lli_sa_pattern & LSA_PATTERN_FNAME)
2180                 atomic_inc(&sbi->ll_sa_fname_total);
2181
2182         sai->sai_task = task;
2183         wake_up_process(task);
2184         /*
2185          * We don't stat-ahead for the first dirent since we are already in
2186          * lookup.
2187          */
2188         RETURN(-EAGAIN);
2189
2190 out:
2191         /*
2192          * once starting the statahead thread has failed, disable statahead so
2193          * that subsequent stat() calls won't waste time trying it.
2194          */
2195         spin_lock(&lli->lli_sa_lock);
2196         if (lli->lli_opendir_pid == current->pid)
2197                 lli->lli_sa_enabled = 0;
2198         spin_unlock(&lli->lli_sa_lock);
2199
2200         if (sai)
2201                 ll_sai_free(sai);
2202
2203         if (ctx)
2204                 ll_sax_free(ctx);
2205
2206         if (first != LS_NOT_FIRST_DE)
2207                 atomic_dec(&sbi->ll_sa_running);
2208
2209         RETURN(rc);
2210 }
2211
2212 /*
2213  * Check whether statahead for @dir was started.
2214  */
2215 static inline bool ll_statahead_started(struct inode *dir, bool agl)
2216 {
2217         struct ll_inode_info *lli = ll_i2info(dir);
2218         struct ll_statahead_context *ctx;
2219         struct ll_statahead_info *sai;
2220
2221         spin_lock(&lli->lli_sa_lock);
2222         ctx = lli->lli_sax;
2223         sai = lli->lli_sai;
2224         if (sai && (sai->sai_agl_task != NULL) != agl)
2225                 CDEBUG(D_READA,
2226                        "%s: Statahead AGL hint changed from %d to %d\n",
2227                        ll_i2sbi(dir)->ll_fsname,
2228                        sai->sai_agl_task != NULL, agl);
2229         spin_unlock(&lli->lli_sa_lock);
2230
2231         return !!ctx;
2232 }
2233
2234 /**
2235  * statahead entry function; this is called when the client does getattr on a
2236  * file, and it starts a statahead thread if statahead has not yet been
2237  * started for @dir.
2238  *
2239  * \param[in]  dir      parent directory
2240  * \param[in]  dentry   dentry that triggers statahead
2241  * \param[in]  agl      whether to start the agl thread
2242  *
2243  * \retval              0 statahead already started or not triggered
2244  * \retval              -EAGAIN statahead thread started successfully, caller
2245  *                      should do the current getattr itself
2246  * \retval              negative number on error, caller often ignores this and
2247  *                      then does getattr from the server
2248  */
2249 int ll_start_statahead(struct inode *dir, struct dentry *dentry, bool agl)
2250 {
2251         if (!ll_statahead_started(dir, agl))
2252                 return start_statahead_thread(dir, dentry, agl);
2253         return 0;
2254 }
2255
2256 /**
2257  * revalidate dentry from statahead cache.
2258  *
2259  * \param[in]  dir      parent directory
2260  * \param[out] dentryp  dentry to getattr
2261  * \param[in]  unplug   unplug statahead window only (normally for negative
2262  *                      dentry)
2263  * \retval              1 on success
2264  * \retval              0 revalidation from statahead cache failed, caller needs
2265  *                      to do getattr from the server directly
2266  * \retval              negative number on error, caller often ignores this and
2267  *                      then does getattr from the server
2268  */
2269 int ll_revalidate_statahead(struct inode *dir, struct dentry **dentryp,
2270                             bool unplug)
2271 {
2272         struct ll_inode_info *lli = ll_i2info(dir);
2273         struct ll_statahead_context *ctx;
2274         struct ll_statahead_info *sai = NULL;
2275         int rc = 0;
2276
2277         spin_lock(&lli->lli_sa_lock);
2278         ctx = lli->lli_sax;
2279         if (ctx) {
2280                 sai = lli->lli_sai;
2281                 if (sai) {
2282                         atomic_inc(&sai->sai_refcount);
2283                 } else if (lli->lli_sa_pattern & LSA_PATTERN_LIST) {
2284                         spin_unlock(&lli->lli_sa_lock);
2285                         return 0;
2286                 }
2287                 __ll_sax_get(ctx);
2288         }
2289         spin_unlock(&lli->lli_sa_lock);
2290         if (ctx) {
2291                 rc = revalidate_statahead_dentry(dir, ctx, dentryp, unplug);
2292                 CDEBUG(D_READA, "revalidate statahead %pd: rc = %d.\n",
2293                        *dentryp, rc);
2294                 if (sai)
2295                         ll_sai_put(sai);
2296                 ll_sax_put(dir, ctx);
2297         }
2298         return rc;
2299 }
2300
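     /*
      * Handle the statahead advise ioctl: set up an ADVISE-pattern statahead on
      * the directory behind @file, using the file name prefix and the
      * lla_start/lla_end index range from @ladvise, then spawn an "ll_sa_%u"
      * statahead thread (and AGL thread if enabled) to prefetch those entries.
      */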
2301 int ll_ioctl_ahead(struct file *file, struct llapi_lu_ladvise2 *ladvise)
2302 {
2303         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
2304         struct ll_file_data *fd = file->private_data;
2305         struct dentry *dentry = file_dentry(file);
2306         struct inode *dir = dentry->d_inode;
2307         struct ll_inode_info *lli = ll_i2info(dir);
2308         struct ll_sb_info *sbi = ll_i2sbi(dir);
2309         struct ll_statahead_info *sai = NULL;
2310         struct ll_statahead_context *ctx = NULL;
2311         struct task_struct *task;
2312         bool agl = true;
2313         int rc;
2314
2315         ENTRY;
2316
2317         if (sbi->ll_sa_max == 0)
2318                 RETURN(0);
2319
2320         if (!S_ISDIR(dir->i_mode))
2321                 RETURN(-EINVAL);
2322
2323         if (fd->fd_sai) {
2324                 rc = -EALREADY;
2325                 CWARN("%s: already set statahead hint for dir %pd: rc = %d\n",
2326                       sbi->ll_fsname, dentry, rc);
2327                 RETURN(rc);
2328         }
2329
2330         if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
2331                                        sbi->ll_sa_running_max)) {
2332                 CDEBUG(D_READA,
2333                        "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n");
2334                 GOTO(out, rc = -EMFILE);
2335         }
2336
2337         sai = ll_sai_alloc(dentry);
2338         if (sai == NULL)
2339                 GOTO(out, rc = -ENOMEM);
2340
2341         sai->sai_fstart = ladvise->lla_start;
2342         sai->sai_fend = ladvise->lla_end;
2343         sai->sai_ls_all = 0;
2344         sai->sai_max = sbi->ll_sa_max;
2345         strncpy(sai->sai_fname, ladvise->lla_fname, sizeof(sai->sai_fname));
2346
2347         ctx = ll_sax_get(dir);
2348         if (ctx == NULL) {
2349                 ctx = ll_sax_alloc(dir);
2350                 if (ctx == NULL)
2351                         GOTO(out, rc = -ENOMEM);
2352
2353                 spin_lock(&lli->lli_sa_lock);
2354                 if (unlikely(lli->lli_sax)) {
2355                         struct ll_statahead_context *tmp = ctx;
2356
2357                         if (lli->lli_sa_pattern == LSA_PATTERN_NONE ||
2358                             lli->lli_sa_pattern == LSA_PATTERN_ADVISE) {
2359                                 lli->lli_sa_pattern = LSA_PATTERN_ADVISE;
2360                                 ctx = lli->lli_sax;
2361                                 __ll_sax_get(ctx);
2362                                 fd->fd_sai = __ll_sai_get(sai);
2363                                 rc = 0;
2364                         } else {
2365                                 rc = -EINVAL;
2366                                 CWARN("%s: pattern %X is not ADVISE: rc = %d\n",
2367                                       sbi->ll_fsname, lli->lli_sa_pattern, rc);
2368                         }
2369
2370                         spin_unlock(&lli->lli_sa_lock);
2371                         ll_sax_free(tmp);
2372                         if (rc)
2373                                 GOTO(out, rc);
2374                 } else {
2375                         lli->lli_sa_pattern = LSA_PATTERN_ADVISE;
2376                         lli->lli_sax = ctx;
2377                         fd->fd_sai = __ll_sai_get(sai);
2378                         spin_unlock(&lli->lli_sa_lock);
2379                 }
2380         } else {
2381                 spin_lock(&lli->lli_sa_lock);
2382                 if (!(lli->lli_sa_pattern == LSA_PATTERN_ADVISE ||
2383                       lli->lli_sa_pattern == LSA_PATTERN_NONE)) {
2384                         spin_unlock(&lli->lli_sa_lock);
2385                         GOTO(out, rc = -EINVAL);
2386                 }
2387
2388                 lli->lli_sa_pattern = LSA_PATTERN_ADVISE;
2389                 fd->fd_sai = __ll_sai_get(sai);
2390                 spin_unlock(&lli->lli_sa_lock);
2391         }
2392
2393         __ll_sax_get(ctx);
2394         CDEBUG(D_READA,
2395                "start statahead thread: [pid %d] [parent %pd] sai %p ctx %p\n",
2396                current->pid, dentry, sai, ctx);
2397
2398         task = kthread_create_on_node(ll_statahead_thread, sai, node,
2399                                       "ll_sa_%u", current->pid);
2400         if (IS_ERR(task)) {
2401                 rc = PTR_ERR(task);
2402                 CERROR("%s: cannot start ll_sa thread: rc = %d\n",
2403                        sbi->ll_fsname, rc);
2404                 GOTO(out, rc);
2405         }
2406
2407         if (test_bit(LL_SBI_AGL_ENABLED, sbi->ll_flags) && agl)
2408                 ll_start_agl(dentry, sai);
2409
2410         atomic_inc(&sbi->ll_sa_total);
2411         sai->sai_task = task;
2412         wake_up_process(task);
2413
2414         RETURN(0);
2415 out:
2416         if (fd->fd_sai) {
2417                 ll_sai_put(sai);
2418                 ll_sax_put(dir, ctx);
2419                 fd->fd_sai = NULL;
2420         }
2421
2422         if (sai)
2423                 ll_sai_free(sai);
2424
2425         if (ctx)
2426                 ll_sax_free(ctx);
2427
2428         atomic_dec(&sbi->ll_sa_running);
2429         RETURN(rc);
2430 }
2431
2432 /*
2433  * This function is called on each stat() system call to do a statahead check.
2434  * When the names in a sequence of stat() calls under a directory roughly
2435  * follow a common naming rule, the directory is considered a candidate for
2436  * statahead.
2437  * For example, with the file naming rule mdtest.$rank.$i the suffix of each
2438  * stat()'ed dentry name is a number; once stat() has been called on more than
2439  * @LSA_FN_PREDICT_HIT dentries whose names end in a number, the corresponding
2440  * directory meets the requirement for statahead.
2441  */
2442 void ll_statahead_enter(struct inode *dir, struct dentry *dchild)
2443 {
2444         struct ll_inode_info *lli;
2445         struct qstr *dname = &dchild->d_name;
2446
2447         if (ll_i2sbi(dir)->ll_sa_max == 0)
2448                 return;
2449
2450         if (ll_i2sbi(dir)->ll_enable_statahead_fname == 0)
2451                 return;
2452
2453         lli = ll_i2info(dir);
2454         if (lli->lli_sa_enabled)
2455                 return;
2456
2457         if (lli->lli_sa_pattern & (LSA_PATTERN_FN_PREDICT | LSA_PATTERN_LIST))
2458                 return;
2459
2460         /*
2461          * Only a number-indexed file name pattern is supported for now.
2462          * Quick check: is the last character a digit?
2463          */
2464         if (!isdigit(dname->name[dname->len - 1])) {
2465                 lli->lli_sa_match_count = 0;
2466                 return;
2467         }
2468
2469         lli->lli_sa_match_count++;
2470         if (lli->lli_sa_match_count > LSA_FN_PREDICT_HIT) {
2471                 spin_lock(&lli->lli_sa_lock);
2472                 lli->lli_sa_pattern |= LSA_PATTERN_FN_PREDICT;
2473                 spin_unlock(&lli->lli_sa_lock);
2474                 lli->lli_sa_enabled = 1;
2475                 lli->lli_sa_match_count = 0;
2476         }
2477 }