Whamcloud - gitweb
LU-6142 llite: Fix style issues under lustre/llite
[fs/lustre-release.git] / lustre / llite / statahead.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  */
31
32 #include <linux/fs.h>
33 #include <linux/sched.h>
34 #include <linux/kthread.h>
35 #include <linux/mm.h>
36 #include <linux/highmem.h>
37 #include <linux/pagemap.h>
38 #include <linux/delay.h>
39
40 #define DEBUG_SUBSYSTEM S_LLITE
41
42 #include <obd_support.h>
43 #include <lustre_dlm.h>
44 #include "llite_internal.h"
45
/*
 * Extra slack, in entries, that is_omitted_entry() allows behind the
 * statahead window before an index is reported as old.
 */
#define SA_OMITTED_ENTRY_MAX 8ULL
47
/* state of a statahead entry; negative values are for error cases */
typedef enum {
        /* initial state, entry is not ready yet (see sa_ready()) */
        SA_ENTRY_INIT = 0,
        /* stat succeeded */
        SA_ENTRY_SUCC = 1,
        /* invalid entry */
        SA_ENTRY_INVA = 2,
} se_state_t;
54
/*
 * sa_entry is not refcounted: the statahead thread allocates it and issues
 * the async stat; the async stat callback ll_statahead_interpret() then
 * prepares the inode and sets the lock data in the ptlrpcd context. The
 * scanner process is woken up if this entry is the one it is waiting for,
 * and may then access and free it.
 *
 * The entry name is stored in the same allocation, directly behind this
 * structure (see sa_alloc()).
 */
struct sa_entry {
        /* link into sai_entries */
        struct list_head                 se_list;
        /* link into sai hash table locally */
        struct list_head                 se_hash;
        /* entry index in the sai */
        __u64                            se_index;
        /* low layer ldlm lock handle */
        __u64                            se_handle;
        /* entry status, one of the SA_ENTRY_* values */
        se_state_t                       se_state;
        /* size of the whole allocation: this structure plus the name */
        int                              se_size;
        /* pointer to the target inode */
        struct inode                    *se_inode;
        /* pointer to @sai per process struct */
        struct ll_statahead_info        *se_sai;
        /* entry name; se_qstr.name points at the bytes behind the struct */
        struct qstr                      se_qstr;
        /* entry fid */
        struct lu_fid                    se_fid;
};
83
/*
 * Global statahead generation counter. ll_sai_alloc() increments it under
 * sai_generation_lock and stores the new value in lli_sa_generation,
 * skipping 0 when the counter wraps around.
 */
static unsigned int sai_generation;
static DEFINE_SPINLOCK(sai_generation_lock);
86
87 static inline int sa_unhashed(struct sa_entry *entry)
88 {
89         return list_empty(&entry->se_hash);
90 }
91
92 /* sa_entry is ready to use */
93 static inline int sa_ready(struct sa_entry *entry)
94 {
95         /* Make sure sa_entry is updated and ready to use */
96         smp_rmb();
97         return (entry->se_state != SA_ENTRY_INIT);
98 }
99
100 /* hash value to put in sai_cache */
101 static inline int sa_hash(int val)
102 {
103         return val & LL_SA_CACHE_MASK;
104 }
105
106 /* hash entry into sax_cache */
107 static inline void
108 sa_rehash(struct ll_statahead_context *ctx, struct sa_entry *entry)
109 {
110         int i = sa_hash(entry->se_qstr.hash);
111
112         spin_lock(&ctx->sax_cache_lock[i]);
113         list_add_tail(&entry->se_hash, &ctx->sax_cache[i]);
114         spin_unlock(&ctx->sax_cache_lock[i]);
115 }
116
117 /* unhash entry from sai_cache */
118 static inline void
119 sa_unhash(struct ll_statahead_context *ctx, struct sa_entry *entry)
120 {
121         int i = sa_hash(entry->se_qstr.hash);
122
123         spin_lock(&ctx->sax_cache_lock[i]);
124         list_del_init(&entry->se_hash);
125         spin_unlock(&ctx->sax_cache_lock[i]);
126 }
127
128 static inline int agl_should_run(struct ll_statahead_info *sai,
129                                  struct inode *inode)
130 {
131         return inode && S_ISREG(inode->i_mode) && sai->sai_agl_task;
132 }
133
134 static inline struct ll_inode_info *
135 agl_first_entry(struct ll_statahead_info *sai)
136 {
137         return list_first_entry(&sai->sai_agls, struct ll_inode_info,
138                                 lli_agl_list);
139 }
140
141 /* statahead window is full */
142 static inline int sa_sent_full(struct ll_statahead_info *sai)
143 {
144         return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
145 }
146
147 /* Batch metadata handle */
148 static inline bool sa_has_batch_handle(struct ll_statahead_info *sai)
149 {
150         return sai->sai_bh != NULL;
151 }
152
153 static inline void ll_statahead_flush_nowait(struct ll_statahead_info *sai)
154 {
155         if (sa_has_batch_handle(sai)) {
156                 sai->sai_index_end = sai->sai_index - 1;
157                 (void) md_batch_flush(ll_i2mdexp(sai->sai_dentry->d_inode),
158                                       sai->sai_bh, false);
159         }
160 }
161
162 static inline int agl_list_empty(struct ll_statahead_info *sai)
163 {
164         return list_empty(&sai->sai_agls);
165 }
166
167 /**
168  * (1) hit ratio less than 80%
169  * or
170  * (2) consecutive miss more than 8
171  * then means low hit.
172  */
173 static inline int sa_low_hit(struct ll_statahead_info *sai)
174 {
175         return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) ||
176                 (sai->sai_consecutive_miss > 8));
177 }
178
179 /*
180  * if the given index is behind of statahead window more than
181  * SA_OMITTED_ENTRY_MAX, then it is old.
182  */
183 static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
184 {
185         return ((__u64)sai->sai_max + index + SA_OMITTED_ENTRY_MAX <
186                 sai->sai_index);
187 }
188
189 /* allocate sa_entry and hash it to allow scanner process to find it */
190 static struct sa_entry *
191 sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
192          const char *name, int len, const struct lu_fid *fid)
193 {
194         struct ll_inode_info *lli;
195         struct sa_entry *entry;
196         int entry_size;
197         char *dname;
198
199         ENTRY;
200
201         entry_size = sizeof(struct sa_entry) +
202                      round_up(len + 1 /* for trailing NUL */, 4);
203         OBD_ALLOC(entry, entry_size);
204         if (unlikely(!entry))
205                 RETURN(ERR_PTR(-ENOMEM));
206
207         CDEBUG(D_READA, "alloc sa entry %.*s(%p) index %llu\n",
208                len, name, entry, index);
209
210         entry->se_index = index;
211         entry->se_sai = sai;
212
213         entry->se_state = SA_ENTRY_INIT;
214         entry->se_size = entry_size;
215         dname = (char *)entry + sizeof(struct sa_entry);
216         memcpy(dname, name, len);
217         dname[len] = 0;
218         entry->se_qstr.hash = ll_full_name_hash(parent, name, len);
219         entry->se_qstr.len = len;
220         entry->se_qstr.name = dname;
221
222         if (fid)
223                 entry->se_fid = *fid;
224
225         lli = ll_i2info(sai->sai_dentry->d_inode);
226         spin_lock(&lli->lli_sa_lock);
227         INIT_LIST_HEAD(&entry->se_list);
228         sa_rehash(lli->lli_sax, entry);
229         spin_unlock(&lli->lli_sa_lock);
230
231         atomic_inc(&sai->sai_cache_count);
232
233         RETURN(entry);
234 }
235
236 /* free sa_entry, which should have been unhashed and not in any list */
237 static void sa_free(struct ll_statahead_context *ctx, struct sa_entry *entry)
238 {
239         CDEBUG(D_READA, "free sa entry %.*s(%p) index %llu\n",
240                entry->se_qstr.len, entry->se_qstr.name, entry,
241                entry->se_index);
242
243         LASSERT(list_empty(&entry->se_list));
244         LASSERT(sa_unhashed(entry));
245
246         OBD_FREE(entry, entry->se_size);
247 }
248
249 /*
250  * find sa_entry by name, used by directory scanner, lock is not needed because
251  * only scanner can remove the entry from cache.
252  */
253 static struct sa_entry *
254 sa_get(struct ll_statahead_context *ctx, const struct qstr *qstr)
255 {
256         struct sa_entry *entry;
257         int i = sa_hash(qstr->hash);
258
259         spin_lock(&ctx->sax_cache_lock[i]);
260         list_for_each_entry(entry, &ctx->sax_cache[i], se_hash) {
261                 if (entry->se_qstr.hash == qstr->hash &&
262                     entry->se_qstr.len == qstr->len &&
263                     memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0) {
264                         spin_unlock(&ctx->sax_cache_lock[i]);
265                         return entry;
266                 }
267         }
268         spin_unlock(&ctx->sax_cache_lock[i]);
269         return NULL;
270 }
271
/*
 * unhash and unlink sa_entry, and then free it
 *
 * The entry must be hashed, linked on se_list and ready (all asserted).
 *
 * @locked tells whether the caller already holds lli_sa_lock of the
 * directory. If false, the lock is taken here just around the removal from
 * se_list. If true, the caller's lock is dropped after the removal and
 * re-acquired before returning. Either way iput() and sa_free() run without
 * lli_sa_lock held.
 */
static inline void
sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry, bool locked)
{
        struct inode *dir = sai->sai_dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_context *ctx = lli->lli_sax;

        LASSERT(!sa_unhashed(entry));
        LASSERT(!list_empty(&entry->se_list));
        LASSERT(sa_ready(entry));

        /* the hash bucket has its own lock, taken inside sa_unhash() */
        sa_unhash(ctx, entry);

        if (!locked)
                spin_lock(&lli->lli_sa_lock);
        list_del_init(&entry->se_list);
        /* unconditional: drops either our own lock or the caller's */
        spin_unlock(&lli->lli_sa_lock);

        iput(entry->se_inode);
        atomic_dec(&sai->sai_cache_count);
        sa_free(ctx, entry);
        /* give the lock back to a caller that entered with it held */
        if (locked)
                spin_lock(&lli->lli_sa_lock);
}
297
/*
 * called by scanner after use, sa_entry will be killed
 *
 * Updates the hit/miss statistics of @sai, kills @entry (if any) and any
 * old completed entries at the head of sai_entries, and wakes up the
 * statahead task when the window may have opened.
 *
 * @entry may be NULL (counted as a miss when @sai is set). @sai is tested
 * for NULL on the miss and cleanup paths.
 * NOTE(review): the success path and sa_kill() dereference @sai without a
 * check, so a non-NULL @entry presumably implies a non-NULL @sai - confirm
 * against callers.
 */
static void
sa_put(struct inode *dir, struct ll_statahead_info *sai, struct sa_entry *entry)
{
        struct ll_inode_info *lli = ll_i2info(dir);
        struct sa_entry *tmp;
        bool wakeup = false;

        if (entry && entry->se_state == SA_ENTRY_SUCC) {
                struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);

                sai->sai_hit++;
                sai->sai_consecutive_miss = 0;
                if (sai->sai_max < sbi->ll_sa_max) {
                        /* double the window, capped at the per-sb maximum */
                        sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
                        wakeup = true;
                } else if (sai->sai_max_batch_count > 0) {
                        /*
                         * batching: wake up only when this entry sits a
                         * whole multiple of the batch count before
                         * sai_index_end, or is the entry at sai_index_end
                         */
                        if (sai->sai_max >= sai->sai_max_batch_count &&
                           (sai->sai_index_end - entry->se_index) %
                           sai->sai_max_batch_count == 0) {
                                wakeup = true;
                        } else if (entry->se_index == sai->sai_index_end) {
                                wakeup = true;
                        }
                } else {
                        wakeup = true;
                }
        } else if (sai) {
                sai->sai_miss++;
                sai->sai_consecutive_miss++;
                wakeup = true;
        }

        if (entry)
                sa_kill(sai, entry, false);

        if (sai) {
                /*
                 * kill old completed entries. Maybe kicking old entries can
                 * be ignored?
                 */
                spin_lock(&lli->lli_sa_lock);
                while ((tmp = list_first_entry_or_null(&sai->sai_entries,
                                struct sa_entry, se_list))) {
                        if (!is_omitted_entry(sai, tmp->se_index))
                                break;

                        /* lli_sa_lock is dropped by sa_kill(), restart list */
                        sa_kill(sai, tmp, true);
                }
                spin_unlock(&lli->lli_sa_lock);
        }

        /*
         * sai->sai_task is read under lli_sa_lock; it is only evaluated when
         * wakeup is true, which is set only on paths that used sai above
         */
        spin_lock(&lli->lli_sa_lock);
        if (wakeup && sai->sai_task)
                wake_up_process(sai->sai_task);
        spin_unlock(&lli->lli_sa_lock);
}
356
357 /*
358  * update state and sort add entry to sai_entries by index, return true if
359  * scanner is waiting on this entry.
360  */
361 static bool
362 __sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
363 {
364         struct sa_entry *se;
365         struct list_head *pos = &sai->sai_entries;
366         __u64 index = entry->se_index;
367
368         LASSERT(!sa_ready(entry));
369         LASSERT(list_empty(&entry->se_list));
370
371         list_for_each_entry_reverse(se, &sai->sai_entries, se_list) {
372                 if (se->se_index < entry->se_index) {
373                         pos = &se->se_list;
374                         break;
375                 }
376         }
377         list_add(&entry->se_list, pos);
378         /*
379          * LU-9210: ll_statahead_interpet must be able to see this before
380          * we wake it up
381          */
382         smp_store_release(&entry->se_state,
383                           ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);
384
385         return (index == sai->sai_index_wait);
386 }
387
388 /* finish async stat RPC arguments */
389 static void sa_fini_data(struct md_op_item *item)
390 {
391         struct md_op_data *op_data = &item->mop_data;
392
393         if (op_data->op_flags & MF_OPNAME_KMALLOCED)
394                 /* allocated via ll_setup_filename called from sa_prep_data */
395                 kfree(op_data->op_name);
396         ll_unlock_md_op_lsm(&item->mop_data);
397         iput(item->mop_dir);
398         if (item->mop_subpill_allocated)
399                 OBD_FREE_PTR(item->mop_pill);
400         OBD_FREE_PTR(item);
401 }
402
/* forward declaration: installed as the mop_cb callback by sa_prep_data() */
static int ll_statahead_interpret(struct md_op_item *item, int rc);
404
405 /*
406  * prepare arguments for async stat RPC.
407  */
408 static struct md_op_item *
409 sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
410 {
411         struct md_op_item *item;
412         struct ldlm_enqueue_info *einfo;
413         struct md_op_data *op_data;
414
415         OBD_ALLOC_PTR(item);
416         if (!item)
417                 return ERR_PTR(-ENOMEM);
418
419         op_data = ll_prep_md_op_data(&item->mop_data, dir, child,
420                                      entry->se_qstr.name, entry->se_qstr.len, 0,
421                                      LUSTRE_OPC_ANY, NULL);
422         if (IS_ERR(op_data)) {
423                 OBD_FREE_PTR(item);
424                 return (struct md_op_item *)op_data;
425         }
426
427         if (!child)
428                 op_data->op_fid2 = entry->se_fid;
429
430         item->mop_opc = MD_OP_GETATTR;
431         item->mop_it.it_op = IT_GETATTR;
432         item->mop_dir = igrab(dir);
433         item->mop_cb = ll_statahead_interpret;
434         item->mop_cbdata = entry;
435
436         einfo = &item->mop_einfo;
437         einfo->ei_type = LDLM_IBITS;
438         einfo->ei_mode = it_to_lock_mode(&item->mop_it);
439         einfo->ei_cb_bl = ll_md_blocking_ast;
440         einfo->ei_cb_cp = ldlm_completion_ast;
441         einfo->ei_cb_gl = NULL;
442         einfo->ei_cbdata = NULL;
443         einfo->ei_req_slot = 1;
444
445         return item;
446 }
447
448 /*
449  * release resources used in async stat RPC, update entry state and wakeup if
450  * scanner process it waiting on this entry.
451  */
452 static void
453 sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
454 {
455         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
456         bool wakeup;
457
458         spin_lock(&lli->lli_sa_lock);
459         wakeup = __sa_make_ready(sai, entry, ret);
460         spin_unlock(&lli->lli_sa_lock);
461
462         if (wakeup)
463                 wake_up(&sai->sai_waitq);
464 }
465
466 /* insert inode into the list of sai_agls */
467 static void ll_agl_add(struct ll_statahead_info *sai,
468                        struct inode *inode, int index)
469 {
470         struct ll_inode_info *child  = ll_i2info(inode);
471         struct ll_inode_info *parent = ll_i2info(sai->sai_dentry->d_inode);
472
473         spin_lock(&child->lli_agl_lock);
474         if (child->lli_agl_index == 0) {
475                 child->lli_agl_index = index;
476                 spin_unlock(&child->lli_agl_lock);
477
478                 LASSERT(list_empty(&child->lli_agl_list));
479
480                 spin_lock(&parent->lli_agl_lock);
481                 /* Re-check under the lock */
482                 if (agl_should_run(sai, inode)) {
483                         if (agl_list_empty(sai))
484                                 wake_up_process(sai->sai_agl_task);
485                         igrab(inode);
486                         list_add_tail(&child->lli_agl_list, &sai->sai_agls);
487                 } else
488                         child->lli_agl_index = 0;
489                 spin_unlock(&parent->lli_agl_lock);
490         } else {
491                 spin_unlock(&child->lli_agl_lock);
492         }
493 }
494
495 /* Allocate sax */
496 static struct ll_statahead_context *ll_sax_alloc(struct inode *dir)
497 {
498         struct ll_statahead_context *ctx;
499         int i;
500
501         ENTRY;
502
503         OBD_ALLOC_PTR(ctx);
504         if (ctx == NULL)
505                 RETURN(NULL);
506
507         ctx->sax_inode = igrab(dir);
508         atomic_set(&ctx->sax_refcount, 1);
509         for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
510                 INIT_LIST_HEAD(&ctx->sax_cache[i]);
511                 spin_lock_init(&ctx->sax_cache_lock[i]);
512         }
513
514         RETURN(ctx);
515 }
516
517 static inline void ll_sax_free(struct ll_statahead_context *ctx)
518 {
519         LASSERT(ctx->sax_inode != NULL);
520         iput(ctx->sax_inode);
521         OBD_FREE_PTR(ctx);
522 }
523
524 static inline void __ll_sax_get(struct ll_statahead_context *ctx)
525 {
526         atomic_inc(&ctx->sax_refcount);
527 }
528
529 static inline struct ll_statahead_context *ll_sax_get(struct inode *dir)
530 {
531         struct ll_inode_info *lli = ll_i2info(dir);
532         struct ll_statahead_context *ctx = NULL;
533
534         spin_lock(&lli->lli_sa_lock);
535         ctx = lli->lli_sax;
536         if (ctx)
537                 __ll_sax_get(ctx);
538         spin_unlock(&lli->lli_sa_lock);
539
540         return ctx;
541 }
542
543 static inline void ll_sax_put(struct inode *dir,
544                               struct ll_statahead_context *ctx)
545 {
546         struct ll_inode_info *lli = ll_i2info(dir);
547
548         if (atomic_dec_and_lock(&ctx->sax_refcount, &lli->lli_sa_lock)) {
549                 lli->lli_sai = NULL;
550                 lli->lli_sax = NULL;
551                 if (lli->lli_sa_pattern & (LSA_PATTERN_ADVISE |
552                                            LSA_PATTERN_FNAME)) {
553                         lli->lli_opendir_key = NULL;
554                         lli->lli_opendir_pid = 0;
555                         lli->lli_sa_enabled = 0;
556                 }
557                 lli->lli_sa_pattern = LSA_PATTERN_NONE;
558                 spin_unlock(&lli->lli_sa_lock);
559
560                 ll_sax_free(ctx);
561         }
562 }
563
564 /* allocate sai */
565 static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
566 {
567         struct ll_statahead_info *sai;
568         struct ll_inode_info *lli = ll_i2info(dentry->d_inode);
569
570         ENTRY;
571
572         OBD_ALLOC_PTR(sai);
573         if (!sai)
574                 RETURN(NULL);
575
576         sai->sai_dentry = dget(dentry);
577         atomic_set(&sai->sai_refcount, 1);
578         sai->sai_max = ll_i2sbi(dentry->d_inode)->ll_sa_min;
579         sai->sai_index = 1;
580         init_waitqueue_head(&sai->sai_waitq);
581
582         INIT_LIST_HEAD(&sai->sai_entries);
583         INIT_LIST_HEAD(&sai->sai_agls);
584
585         atomic_set(&sai->sai_cache_count, 0);
586
587         spin_lock(&sai_generation_lock);
588         lli->lli_sa_generation = ++sai_generation;
589         if (unlikely(sai_generation == 0))
590                 lli->lli_sa_generation = ++sai_generation;
591         spin_unlock(&sai_generation_lock);
592
593         RETURN(sai);
594 }
595
596 /* free sai */
597 static inline void ll_sai_free(struct ll_statahead_info *sai)
598 {
599         LASSERT(sai->sai_dentry != NULL);
600         dput(sai->sai_dentry);
601         OBD_FREE_PTR(sai);
602 }
603
604 static inline struct ll_statahead_info *
605 __ll_sai_get(struct ll_statahead_info *sai)
606 {
607         atomic_inc(&sai->sai_refcount);
608         return sai;
609 }
610
611 /*
612  * put sai refcount after use, if refcount reaches zero, free sai and sa_entries
613  * attached to it.
614  */
615 static void ll_sai_put(struct ll_statahead_info *sai)
616 {
617         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
618
619         if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
620                 struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
621                 struct sa_entry *entry, *next;
622
623                 lli->lli_sai = NULL;
624                 spin_unlock(&lli->lli_sa_lock);
625
626                 LASSERT(!sai->sai_task);
627                 LASSERT(!sai->sai_agl_task);
628                 LASSERT(sai->sai_sent == sai->sai_replied);
629
630                 list_for_each_entry_safe(entry, next, &sai->sai_entries,
631                                          se_list)
632                         sa_kill(sai, entry, false);
633
634                 LASSERT(atomic_read(&sai->sai_cache_count) == 0);
635                 LASSERT(agl_list_empty(sai));
636
637                 ll_sai_free(sai);
638                 atomic_dec(&sbi->ll_sa_running);
639         }
640 }
641
642 /* Do NOT forget to drop inode refcount when into sai_agls. */
643 static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
644 {
645         struct ll_inode_info *lli = ll_i2info(inode);
646         u64 index = lli->lli_agl_index;
647         ktime_t expire;
648         int rc;
649
650         ENTRY;
651
652         LASSERT(list_empty(&lli->lli_agl_list));
653
654         /* AGL maybe fall behind statahead with one entry */
655         if (is_omitted_entry(sai, index + 1)) {
656                 lli->lli_agl_index = 0;
657                 iput(inode);
658                 RETURN_EXIT;
659         }
660
661         /*
662          * In case of restore, the MDT has the right size and has already
663          * sent it back without granting the layout lock, inode is up-to-date.
664          * Then AGL (async glimpse lock) is useless.
665          * Also to glimpse we need the layout, in case of a runninh restore
666          * the MDT holds the layout lock so the glimpse will block up to the
667          * end of restore (statahead/agl will block)
668          */
669         if (test_bit(LLIF_FILE_RESTORING, &lli->lli_flags)) {
670                 lli->lli_agl_index = 0;
671                 iput(inode);
672                 RETURN_EXIT;
673         }
674
675         /* Someone is in glimpse (sync or async), do nothing. */
676         rc = down_write_trylock(&lli->lli_glimpse_sem);
677         if (rc == 0) {
678                 lli->lli_agl_index = 0;
679                 iput(inode);
680                 RETURN_EXIT;
681         }
682
683         /*
684          * Someone triggered glimpse within 1 sec before.
685          * 1) The former glimpse succeeded with glimpse lock granted by OST, and
686          *    if the lock is still cached on client, AGL needs to do nothing. If
687          *    it is cancelled by other client, AGL maybe cannot obtaion new lock
688          *    for no glimpse callback triggered by AGL.
689          * 2) The former glimpse succeeded, but OST did not grant glimpse lock.
690          *    Under such case, it is quite possible that the OST will not grant
691          *    glimpse lock for AGL also.
692          * 3) The former glimpse failed, compared with other two cases, it is
693          *    relative rare. AGL can ignore such case, and it will not muchly
694          *    affect the performance.
695          */
696         expire = ktime_sub_ns(ktime_get(), NSEC_PER_SEC);
697         if (ktime_to_ns(lli->lli_glimpse_time) &&
698             ktime_before(expire, lli->lli_glimpse_time)) {
699                 up_write(&lli->lli_glimpse_sem);
700                 lli->lli_agl_index = 0;
701                 iput(inode);
702                 RETURN_EXIT;
703         }
704
705         CDEBUG(D_READA,
706                "Handling (init) async glimpse: inode = " DFID", idx = %llu\n",
707                PFID(&lli->lli_fid), index);
708
709         cl_agl(inode);
710         lli->lli_agl_index = 0;
711         lli->lli_glimpse_time = ktime_get();
712         up_write(&lli->lli_glimpse_sem);
713
714         CDEBUG(D_READA,
715                "Handled (init) async glimpse: inode= " DFID", idx = %llu, rc = %d\n",
716                PFID(&lli->lli_fid), index, rc);
717
718         iput(inode);
719
720         EXIT;
721 }
722
/*
 * Common tail of the async stat callback path: release the intent and the
 * RPC arguments, finish the request (if any), publish the entry state derived
 * from @rc and account the reply in sai_replied.
 *
 * @item is freed by sa_fini_data() and must not be touched afterwards; this
 * is why @req is passed in separately.
 */
static void ll_statahead_interpret_fini(struct ll_inode_info *lli,
                                        struct ll_statahead_info *sai,
                                        struct md_op_item *item,
                                        struct sa_entry *entry,
                                        struct ptlrpc_request *req,
                                        int rc)
{
        /*
         * First it will drop ldlm ibits lock refcount by calling
         * ll_intent_drop_lock() in spite of failures. Do not worry about
         * calling ll_intent_drop_lock() more than once.
         */
        ll_intent_release(&item->mop_it);
        sa_fini_data(item);
        if (req)
                ptlrpc_req_finished(req);
        /* marks the entry ready and wakes the scanner if it waits on it */
        sa_make_ready(sai, entry, rc);

        /* sai_replied is updated under lli_sa_lock */
        spin_lock(&lli->lli_sa_lock);
        sai->sai_replied++;
        spin_unlock(&lli->lli_sa_lock);
}
745
/*
 * Work-queue handler that finishes an async stat reply for the md_op_item
 * embedding @work: it revalidates the lock saved in entry->se_handle,
 * prepares the child inode from the reply, caches a returned encryption
 * context, attaches the lock data to the inode and optionally queues the
 * inode for AGL. Every path ends in ll_statahead_interpret_fini(), which
 * frees the item and publishes the entry state according to rc.
 */
static void ll_statahead_interpret_work(struct work_struct *work)
{
        struct md_op_item *item = container_of(work, struct md_op_item,
                                               mop_work);
        struct req_capsule *pill = item->mop_pill;
        struct inode *dir = item->mop_dir;
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai;
        struct lookup_intent *it;
        struct sa_entry *entry;
        struct mdt_body *body;
        struct inode *child;
        int rc;

        ENTRY;

        entry = (struct sa_entry *)item->mop_cbdata;
        /* a lock handle must have been stored in the entry beforehand */
        LASSERT(entry->se_handle != 0);

        sai = entry->se_sai;
        it = &item->mop_it;
        body = req_capsule_server_get(pill, &RMF_MDT_BODY);
        if (!body)
                GOTO(out, rc = -EFAULT);

        child = entry->se_inode;
        /* revalidate; unlinked and re-created with the same name */
        if (unlikely(!fid_is_zero(&item->mop_data.op_fid2) &&
                     !lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
                if (child) {
                        entry->se_inode = NULL;
                        iput(child);
                }
                /* The mdt_body is invalid. Skip this entry */
                GOTO(out, rc = -EAGAIN);
        }

        /* anything but 1 from md_revalidate_lock() is treated as failure */
        it->it_lock_handle = entry->se_handle;
        rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
        if (rc != 1)
                GOTO(out, rc = -EAGAIN);

        rc = ll_prep_inode(&child, pill, dir->i_sb, it);
        if (rc) {
                CERROR("%s: getattr callback for %.*s "DFID": rc = %d\n",
                       ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
                       entry->se_qstr.name, PFID(&entry->se_fid), rc);
                GOTO(out, rc);
        }

        /* If encryption context was returned by MDT, put it in
         * inode now to save an extra getxattr.
         */
        if (body->mbo_valid & OBD_MD_ENCCTX) {
                void *encctx = req_capsule_server_get(pill, &RMF_FILE_ENCCTX);
                __u32 encctxlen = req_capsule_get_size(pill, &RMF_FILE_ENCCTX,
                                                       RCL_SERVER);

                if (encctxlen) {
                        CDEBUG(D_SEC,
                               "server returned encryption ctx for "DFID"\n",
                               PFID(ll_inode2fid(child)));
                        /*
                         * NOTE(review): a failure here is only warned about,
                         * yet rc is not reset, so it reaches the out label
                         * and a negative value marks the entry invalid -
                         * confirm this is intended.
                         */
                        rc = ll_xattr_cache_insert(child,
                                                   xattr_for_enc(child),
                                                   encctx, encctxlen);
                        if (rc)
                                CWARN("%s: cannot set enc ctx for "DFID": rc = %d\n",
                                      ll_i2sbi(child)->ll_fsname,
                                      PFID(ll_inode2fid(child)), rc);
                }
        }

        CDEBUG(D_READA, "%s: setting %.*s"DFID" l_data to inode %p\n",
               ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
               entry->se_qstr.name, PFID(ll_inode2fid(child)), child);
        ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);

        entry->se_inode = child;

        if (agl_should_run(sai, child))
                ll_agl_add(sai, child, entry->se_index);
out:
        /* pill->rc_req is read here, before the callee frees the item */
        ll_statahead_interpret_fini(lli, sai, item, entry, pill->rc_req, rc);
}
830
831 /*
832  * Callback for async stat RPC, this is called in ptlrpcd context. It prepares
833  * the inode and set lock data directly in the ptlrpcd context. It will wake up
834  * the directory listing process if the dentry is the waiting one.
835  */
836 static int ll_statahead_interpret(struct md_op_item *item, int rc)
837 {
838         struct req_capsule *pill = item->mop_pill;
839         struct lookup_intent *it = &item->mop_it;
840         struct inode *dir = item->mop_dir;
841         struct ll_inode_info *lli = ll_i2info(dir);
842         struct sa_entry *entry = (struct sa_entry *)item->mop_cbdata;
843         struct work_struct *work = &item->mop_work;
844         struct ll_statahead_info *sai;
845         struct mdt_body *body;
846         struct inode *child;
847         __u64 handle = 0;
848
849         ENTRY;
850
851         if (it_disposition(it, DISP_LOOKUP_NEG))
852                 rc = -ENOENT;
853
854         /*
855          * because statahead thread will wait for all inflight RPC to finish,
856          * sai should be always valid, no need to refcount
857          */
858         LASSERT(entry != NULL);
859         sai = entry->se_sai;
860         LASSERT(sai != NULL);
861
862         CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
863                entry->se_qstr.len, entry->se_qstr.name, rc);
864
865         if (rc != 0)
866                 GOTO(out, rc);
867
868         body = req_capsule_server_get(pill, &RMF_MDT_BODY);
869         if (!body)
870                 GOTO(out, rc = -EFAULT);
871
872         child = entry->se_inode;
873         /*
874          * revalidate; unlinked and re-created with the same name.
875          * exclude the case where FID is zero as it was from statahead with
876          * regularized file name pattern and had no idea for the FID of the
877          * children file.
878          */
879         if (unlikely(!fid_is_zero(&item->mop_data.op_fid2) &&
880                      !lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
881                 if (child) {
882                         entry->se_inode = NULL;
883                         iput(child);
884                 }
885                 /* The mdt_body is invalid. Skip this entry */
886                 GOTO(out, rc = -EAGAIN);
887         }
888
889         entry->se_handle = it->it_lock_handle;
890         /*
891          * In ptlrpcd context, it is not allowed to generate new RPCs
892          * especially for striped directories or regular files with layout
893          * change.
894          */
895         /*
896          * release ibits lock ASAP to avoid deadlock when statahead
897          * thread enqueues lock on parent in readdir and another
898          * process enqueues lock on child with parent lock held, eg.
899          * unlink.
900          */
901         handle = it->it_lock_handle;
902         ll_intent_drop_lock(it);
903         ll_unlock_md_op_lsm(&item->mop_data);
904
905         /*
906          * If the statahead entry is a striped directory or regular file with
907          * layout change, it will generate a new RPC and long wait in the
908          * ptlrpcd context.
909          * However, it is dangerous of blocking in ptlrpcd thread.
910          * Here we use work queue or the separate statahead thread to handle
911          * the extra RPC and long wait:
912          *      (@ll_prep_inode->@lmv_revalidate_slaves);
913          *      (@ll_prep_inode->@lov_layout_change->osc_cache_wait_range);
914          */
915         INIT_WORK(work, ll_statahead_interpret_work);
916         ptlrpc_request_addref(pill->rc_req);
917         schedule_work(work);
918         RETURN(0);
919 out:
920         ll_statahead_interpret_fini(lli, sai, item, entry, NULL, rc);
921         RETURN(rc);
922 }
923
924 static inline int sa_getattr(struct ll_statahead_info *sai, struct inode *dir,
925                              struct md_op_item *item)
926 {
927         int rc;
928
929         if (sa_has_batch_handle(sai))
930                 rc = md_batch_add(ll_i2mdexp(dir), sai->sai_bh, item);
931         else
932                 rc = md_intent_getattr_async(ll_i2mdexp(dir), item);
933
934         return rc;
935 }
936
937 /* async stat for file not found in dcache */
938 static int sa_lookup(struct inode *dir, struct sa_entry *entry)
939 {
940         struct md_op_item *item;
941         int rc;
942
943         ENTRY;
944
945         item = sa_prep_data(dir, NULL, entry);
946         if (IS_ERR(item))
947                 RETURN(PTR_ERR(item));
948
949         rc = sa_getattr(entry->se_sai, dir, item);
950         if (rc < 0)
951                 sa_fini_data(item);
952
953         RETURN(rc);
954 }
955
956 /**
957  * async stat for file found in dcache, similar to .revalidate
958  *
959  * \retval      1 dentry valid, no RPC sent
960  * \retval      0 dentry invalid, will send async stat RPC
961  * \retval      negative number upon error
962  */
963 static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
964                          struct dentry *dentry)
965 {
966         struct inode *inode = dentry->d_inode;
967         struct lookup_intent it = { .it_op = IT_GETATTR,
968                                     .it_lock_handle = 0 };
969         struct md_op_item *item;
970         int rc;
971
972         ENTRY;
973
974         if (unlikely(!inode))
975                 RETURN(1);
976
977         if (d_mountpoint(dentry))
978                 RETURN(1);
979
980         item = sa_prep_data(dir, inode, entry);
981         if (IS_ERR(item))
982                 RETURN(PTR_ERR(item));
983
984         entry->se_inode = igrab(inode);
985         rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
986                                 NULL);
987         if (rc == 1) {
988                 entry->se_handle = it.it_lock_handle;
989                 ll_intent_release(&it);
990                 sa_fini_data(item);
991                 RETURN(1);
992         }
993
994         rc = sa_getattr(entry->se_sai, dir, item);
995         if (rc < 0) {
996                 entry->se_inode = NULL;
997                 iput(inode);
998                 sa_fini_data(item);
999         }
1000
1001         RETURN(rc);
1002 }
1003
1004 /* async stat for file with @name */
1005 static void sa_statahead(struct ll_statahead_info *sai, struct dentry *parent,
1006                          const char *name, int len, const struct lu_fid *fid)
1007 {
1008         struct inode *dir = parent->d_inode;
1009         struct dentry *dentry = NULL;
1010         struct sa_entry *entry;
1011         int rc;
1012
1013         ENTRY;
1014
1015         entry = sa_alloc(parent, sai, sai->sai_index, name, len, fid);
1016         if (IS_ERR(entry))
1017                 RETURN_EXIT;
1018
1019         dentry = d_lookup(parent, &entry->se_qstr);
1020         if (!dentry) {
1021                 rc = sa_lookup(dir, entry);
1022         } else {
1023                 rc = sa_revalidate(dir, entry, dentry);
1024                 if (rc == 1 && agl_should_run(sai, dentry->d_inode))
1025                         ll_agl_add(sai, dentry->d_inode, entry->se_index);
1026         }
1027
1028         if (dentry)
1029                 dput(dentry);
1030
1031         if (rc != 0)
1032                 sa_make_ready(sai, entry, rc);
1033         else
1034                 sai->sai_sent++;
1035
1036         sai->sai_index++;
1037
1038         if (sa_sent_full(sai))
1039                 ll_statahead_flush_nowait(sai);
1040
1041         EXIT;
1042 }
1043
1044 /* async glimpse (agl) thread main function */
1045 static int ll_agl_thread(void *arg)
1046 {
1047         /*
1048          * We already own this reference, so it is safe to take it
1049          * without a lock.
1050          */
1051         struct ll_statahead_info *sai = (struct ll_statahead_info *)arg;
1052         struct dentry *parent = sai->sai_dentry;
1053         struct inode *dir = parent->d_inode;
1054         struct ll_inode_info *plli = ll_i2info(dir);
1055         struct ll_inode_info *clli;
1056
1057         ENTRY;
1058
1059         CDEBUG(D_READA, "agl thread started: sai %p, parent %pd\n",
1060                sai, parent);
1061
1062         while (({set_current_state(TASK_IDLE);
1063                  !kthread_should_stop(); })) {
1064                 spin_lock(&plli->lli_agl_lock);
1065                 clli = list_first_entry_or_null(&sai->sai_agls,
1066                                                 struct ll_inode_info,
1067                                                 lli_agl_list);
1068                 if (clli) {
1069                         __set_current_state(TASK_RUNNING);
1070                         list_del_init(&clli->lli_agl_list);
1071                         spin_unlock(&plli->lli_agl_lock);
1072                         ll_agl_trigger(&clli->lli_vfs_inode, sai);
1073                         cond_resched();
1074                 } else {
1075                         spin_unlock(&plli->lli_agl_lock);
1076                         schedule();
1077                 }
1078         }
1079         __set_current_state(TASK_RUNNING);
1080         RETURN(0);
1081 }
1082
1083 static void ll_stop_agl(struct ll_statahead_info *sai)
1084 {
1085         struct dentry *parent = sai->sai_dentry;
1086         struct ll_inode_info *plli = ll_i2info(parent->d_inode);
1087         struct ll_inode_info *clli;
1088         struct task_struct *agl_task;
1089
1090         spin_lock(&plli->lli_agl_lock);
1091         agl_task = sai->sai_agl_task;
1092         sai->sai_agl_task = NULL;
1093         spin_unlock(&plli->lli_agl_lock);
1094         if (!agl_task)
1095                 return;
1096
1097         CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n",
1098                sai, (unsigned int)agl_task->pid);
1099         kthread_stop(agl_task);
1100
1101         spin_lock(&plli->lli_agl_lock);
1102         while ((clli = list_first_entry_or_null(&sai->sai_agls,
1103                                                 struct ll_inode_info,
1104                                                 lli_agl_list)) != NULL) {
1105                 list_del_init(&clli->lli_agl_list);
1106                 spin_unlock(&plli->lli_agl_lock);
1107                 clli->lli_agl_index = 0;
1108                 iput(&clli->lli_vfs_inode);
1109                 spin_lock(&plli->lli_agl_lock);
1110         }
1111         spin_unlock(&plli->lli_agl_lock);
1112         CDEBUG(D_READA, "agl thread stopped: sai %p, parent %pd\n",
1113                sai, parent);
1114         ll_sai_put(sai);
1115 }
1116
1117 /* start agl thread */
1118 static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
1119 {
1120         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
1121         struct ll_inode_info *plli;
1122         struct task_struct *task;
1123
1124         ENTRY;
1125
1126         CDEBUG(D_READA, "start agl thread: sai %p, parent %pd\n",
1127                sai, parent);
1128
1129         plli = ll_i2info(parent->d_inode);
1130         task = kthread_create_on_node(ll_agl_thread, sai, node, "ll_agl_%d",
1131                                       plli->lli_opendir_pid);
1132         if (IS_ERR(task)) {
1133                 CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));
1134                 RETURN_EXIT;
1135         }
1136         sai->sai_agl_task = task;
1137         atomic_inc(&ll_i2sbi(d_inode(parent))->ll_agl_total);
1138         /* Get an extra reference that the thread holds */
1139         __ll_sai_get(sai);
1140
1141         wake_up_process(task);
1142
1143         EXIT;
1144 }
1145
1146 static int ll_statahead_by_list(struct dentry *parent)
1147 {
1148         struct inode *dir = parent->d_inode;
1149         struct ll_inode_info *lli = ll_i2info(dir);
1150         struct ll_statahead_info *sai = lli->lli_sai;
1151         struct ll_sb_info *sbi = ll_i2sbi(dir);
1152         struct md_op_data *op_data;
1153         struct page *page = NULL;
1154         __u64 pos = 0;
1155         int first = 0;
1156         int rc = 0;
1157
1158         ENTRY;
1159
1160         CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
1161                sai, parent);
1162
1163         OBD_ALLOC_PTR(op_data);
1164         if (!op_data)
1165                 RETURN(-ENOMEM);
1166
1167         while (pos != MDS_DIR_END_OFF &&
1168                /* matches smp_store_release() in ll_deauthorize_statahead() */
1169                smp_load_acquire(&sai->sai_task) &&
1170                lli->lli_sa_enabled) {
1171                 struct lu_dirpage *dp;
1172                 struct lu_dirent  *ent;
1173
1174                 op_data = ll_prep_md_op_data(op_data, dir, dir, NULL, 0, 0,
1175                                              LUSTRE_OPC_ANY, dir);
1176                 if (IS_ERR(op_data)) {
1177                         rc = PTR_ERR(op_data);
1178                         break;
1179                 }
1180
1181                 page = ll_get_dir_page(dir, op_data, pos, NULL);
1182                 ll_unlock_md_op_lsm(op_data);
1183                 if (IS_ERR(page)) {
1184                         rc = PTR_ERR(page);
1185                         CDEBUG(D_READA,
1186                                "error reading dir "DFID" at %llu /%llu opendir_pid = %u: rc = %d\n",
1187                                PFID(ll_inode2fid(dir)), pos, sai->sai_index,
1188                                lli->lli_opendir_pid, rc);
1189                         break;
1190                 }
1191
1192                 dp = page_address(page);
1193                 for (ent = lu_dirent_start(dp);
1194                      /* matches smp_store_release() in ll_deauthorize_statahead() */
1195                      ent != NULL && smp_load_acquire(&sai->sai_task) &&
1196                      !sa_low_hit(sai) && lli->lli_sa_enabled;
1197                      ent = lu_dirent_next(ent)) {
1198                         __u64 hash;
1199                         int namelen;
1200                         char *name;
1201                         struct lu_fid fid;
1202                         struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
1203
1204                         hash = le64_to_cpu(ent->lde_hash);
1205                         if (unlikely(hash < pos))
1206                                 /*
1207                                  * Skip until we find target hash value.
1208                                  */
1209                                 continue;
1210
1211                         namelen = le16_to_cpu(ent->lde_namelen);
1212                         if (unlikely(namelen == 0))
1213                                 /*
1214                                  * Skip dummy record.
1215                                  */
1216                                 continue;
1217
1218                         name = ent->lde_name;
1219                         if (name[0] == '.') {
1220                                 if (namelen == 1) {
1221                                         /*
1222                                          * skip "."
1223                                          */
1224                                         continue;
1225                                 } else if (name[1] == '.' && namelen == 2) {
1226                                         /*
1227                                          * skip ".."
1228                                          */
1229                                         continue;
1230                                 } else if (!sai->sai_ls_all) {
1231                                         /*
1232                                          * skip hidden files.
1233                                          */
1234                                         sai->sai_skip_hidden++;
1235                                         continue;
1236                                 }
1237                         }
1238
1239                         /*
1240                          * don't stat-ahead first entry.
1241                          */
1242                         if (unlikely(++first == 1))
1243                                 continue;
1244
1245                         fid_le_to_cpu(&fid, &ent->lde_fid);
1246
1247                         while (({set_current_state(TASK_IDLE);
1248                                  /* matches smp_store_release() in
1249                                   * ll_deauthorize_statahead()
1250                                   */
1251                                  smp_load_acquire(&sai->sai_task); })) {
1252                                 long timeout;
1253
1254                                 spin_lock(&lli->lli_agl_lock);
1255                                 while (sa_sent_full(sai) &&
1256                                        !agl_list_empty(sai)) {
1257                                         struct ll_inode_info *clli;
1258
1259                                         __set_current_state(TASK_RUNNING);
1260                                         clli = agl_first_entry(sai);
1261                                         list_del_init(&clli->lli_agl_list);
1262                                         spin_unlock(&lli->lli_agl_lock);
1263
1264                                         ll_agl_trigger(&clli->lli_vfs_inode,
1265                                                        sai);
1266                                         cond_resched();
1267                                         spin_lock(&lli->lli_agl_lock);
1268                                 }
1269                                 spin_unlock(&lli->lli_agl_lock);
1270
1271                                 if (!sa_sent_full(sai))
1272                                         break;
1273
1274                                 /*
1275                                  * If the thread is not doing stat in
1276                                  * @sbi->ll_sa_timeout (30s) then it probably
1277                                  * does not care too much about performance,
1278                                  * or is no longer using this directory.
1279                                  * Stop the statahead thread in this case.
1280                                  */
1281                                 timeout = schedule_timeout(
1282                                         cfs_time_seconds(sbi->ll_sa_timeout));
1283                                 if (timeout == 0) {
1284                                         lli->lli_sa_enabled = 0;
1285                                         break;
1286                                 }
1287                         }
1288                         __set_current_state(TASK_RUNNING);
1289
1290                         if (IS_ENCRYPTED(dir)) {
1291                                 struct llcrypt_str de_name =
1292                                         LLTR_INIT(ent->lde_name, namelen);
1293                                 struct lu_fid fid;
1294
1295                                 rc = llcrypt_fname_alloc_buffer(dir, NAME_MAX,
1296                                                                 &lltr);
1297                                 if (rc < 0)
1298                                         continue;
1299
1300                                 fid_le_to_cpu(&fid, &ent->lde_fid);
1301                                 if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
1302                                                          &lltr, &fid)) {
1303                                         llcrypt_fname_free_buffer(&lltr);
1304                                         continue;
1305                                 }
1306
1307                                 name = lltr.name;
1308                                 namelen = lltr.len;
1309                         }
1310
1311                         sa_statahead(sai, parent, name, namelen, &fid);
1312                         llcrypt_fname_free_buffer(&lltr);
1313                 }
1314
1315                 pos = le64_to_cpu(dp->ldp_hash_end);
1316                 ll_release_page(dir, page,
1317                                 le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
1318
1319                 if (sa_low_hit(sai)) {
1320                         rc = -EFAULT;
1321                         atomic_inc(&sbi->ll_sa_wrong);
1322                         CDEBUG(D_READA,
1323                                "Statahead for dir "DFID" hit ratio too low: hit/miss %llu/%llu, sent/replied %llu/%llu, stoppingstatahead thread: pid %d\n",
1324                                PFID(&lli->lli_fid), sai->sai_hit,
1325                                sai->sai_miss, sai->sai_sent,
1326                                sai->sai_replied, current->pid);
1327                         break;
1328                 }
1329         }
1330         ll_finish_md_op_data(op_data);
1331
1332         RETURN(rc);
1333 }
1334
1335 static void ll_statahead_handle(struct ll_statahead_info *sai,
1336                                 struct dentry *parent, const char *name,
1337                                 int len, const struct lu_fid *fid)
1338 {
1339         struct inode *dir = parent->d_inode;
1340         struct ll_inode_info *lli = ll_i2info(dir);
1341         struct ll_sb_info *sbi = ll_i2sbi(dir);
1342         long timeout;
1343
1344         while (({set_current_state(TASK_IDLE);
1345                 /* matches smp_store_release() in ll_deauthorize_statahead() */
1346                  smp_load_acquire(&sai->sai_task); })) {
1347                 spin_lock(&lli->lli_agl_lock);
1348                 while (sa_sent_full(sai) && !agl_list_empty(sai)) {
1349                         struct ll_inode_info *clli;
1350
1351                         __set_current_state(TASK_RUNNING);
1352                         clli = agl_first_entry(sai);
1353                         list_del_init(&clli->lli_agl_list);
1354                         spin_unlock(&lli->lli_agl_lock);
1355
1356                         ll_agl_trigger(&clli->lli_vfs_inode, sai);
1357                         cond_resched();
1358                         spin_lock(&lli->lli_agl_lock);
1359                 }
1360                 spin_unlock(&lli->lli_agl_lock);
1361
1362                 if (!sa_sent_full(sai))
1363                         break;
1364
1365                 /*
1366                  * If the thread is not doing a stat in 30s then it probably
1367                  * does not care too much about performance, or is no longer
1368                  * using this directory. Stop the statahead thread in this case.
1369                  */
1370                 timeout = schedule_timeout(
1371                                 cfs_time_seconds(sbi->ll_sa_timeout));
1372                 if (timeout == 0) {
1373                         lli->lli_sa_enabled = 0;
1374                         break;
1375                 }
1376         }
1377         __set_current_state(TASK_RUNNING);
1378
1379         sa_statahead(sai, parent, name, len, fid);
1380 }
1381
1382 static int ll_statahead_by_advise(struct ll_statahead_info *sai,
1383                                   struct dentry *parent)
1384 {
1385         struct inode *dir = parent->d_inode;
1386         struct ll_inode_info *lli = ll_i2info(dir);
1387         struct ll_sb_info *sbi = ll_i2sbi(dir);
1388         size_t max_len;
1389         size_t len;
1390         char *fname;
1391         char *ptr;
1392         int rc = 0;
1393         __u64 i = 0;
1394
1395         ENTRY;
1396
1397         CDEBUG(D_READA, "%s: ADVISE statahead: parent %pd fname prefix %s\n",
1398                sbi->ll_fsname, parent, sai->sai_fname);
1399
1400         OBD_ALLOC(fname, NAME_MAX);
1401         if (fname == NULL)
1402                 RETURN(-ENOMEM);
1403
1404         len = strlen(sai->sai_fname);
1405         memcpy(fname, sai->sai_fname, len);
1406         max_len = sizeof(sai->sai_fname) - len;
1407         ptr = fname + len;
1408
1409         /* matches smp_store_release() in ll_deauthorize_statahead() */
1410         while (smp_load_acquire(&sai->sai_task) && lli->lli_sa_enabled) {
1411                 size_t numlen;
1412
1413                 numlen = snprintf(ptr, max_len, "%llu",
1414                                   sai->sai_fstart + i);
1415
1416                 ll_statahead_handle(sai, parent, fname, len + numlen, NULL);
1417                 if (++i >= sai->sai_fend)
1418                         break;
1419         }
1420
1421         OBD_FREE(fname, NAME_MAX);
1422         RETURN(rc);
1423 }
1424
1425 static int ll_statahead_by_fname(struct ll_statahead_info *sai,
1426                                  struct dentry *parent)
1427 {
1428         struct inode *dir = parent->d_inode;
1429         struct ll_inode_info *lli = ll_i2info(dir);
1430         struct ll_sb_info *sbi = ll_i2sbi(dir);
1431         size_t max_len;
1432         size_t len;
1433         char *fname;
1434         char *ptr;
1435         int rc = 0;
1436
1437         ENTRY;
1438
1439         CDEBUG(D_READA, "%s: FNAME statahead: parent %pd fname prefix %s\n",
1440                sbi->ll_fsname, parent, sai->sai_fname);
1441
1442         OBD_ALLOC(fname, NAME_MAX);
1443         if (fname == NULL)
1444                 RETURN(-ENOMEM);
1445
1446         len = strlen(sai->sai_fname);
1447         memcpy(fname, sai->sai_fname, len);
1448         max_len = sizeof(sai->sai_fname) - len;
1449         ptr = fname + len;
1450
1451         /* matches smp_store_release() in ll_deauthorize_statahead() */
1452         while (smp_load_acquire(&sai->sai_task) && lli->lli_sa_enabled) {
1453                 size_t numlen;
1454
1455                 if (sai->sai_fname_zeroed_len)
1456                         numlen = snprintf(ptr, max_len, "%0*llu",
1457                                           sai->sai_fname_zeroed_len,
1458                                           ++sai->sai_fname_index);
1459                 else
1460                         numlen = snprintf(ptr, max_len, "%llu",
1461                                           ++sai->sai_fname_index);
1462
1463                 ll_statahead_handle(sai, parent, fname, len + numlen, NULL);
1464
1465                 if (sa_low_hit(sai)) {
1466                         rc = -EFAULT;
1467                         atomic_inc(&sbi->ll_sa_wrong);
1468                         CDEBUG(D_CACHE, "%s: low hit ratio for %pd "DFID": hit=%llu miss=%llu sent=%llu replied=%llu, stopping PID %d\n",
1469                                sbi->ll_fsname, parent, PFID(ll_inode2fid(dir)),
1470                                sai->sai_hit, sai->sai_miss, sai->sai_sent,
1471                                sai->sai_replied, current->pid);
1472                         break;
1473                 }
1474         }
1475
1476         OBD_FREE(fname, NAME_MAX);
1477         RETURN(rc);
1478 }
1479
1480 /* statahead thread main function */
1481 static int ll_statahead_thread(void *arg)
1482 {
1483         struct ll_statahead_info *sai = (struct ll_statahead_info *)arg;
1484         struct dentry *parent = sai->sai_dentry;
1485         struct inode *dir = parent->d_inode;
1486         struct ll_inode_info *lli = ll_i2info(dir);
1487         struct ll_sb_info *sbi = ll_i2sbi(dir);
1488         struct lu_batch *bh = NULL;
1489         int rc = 0;
1490
1491         ENTRY;
1492
1493         CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
1494                sai, parent);
1495
1496         sai->sai_max_batch_count = sbi->ll_sa_batch_max;
1497         if (sai->sai_max_batch_count) {
1498                 bh = md_batch_create(ll_i2mdexp(dir), BATCH_FL_RDONLY,
1499                                      sai->sai_max_batch_count);
1500                 if (IS_ERR(bh))
1501                         GOTO(out_stop_agl, rc = PTR_ERR(bh));
1502         }
1503
1504         sai->sai_bh = bh;
1505
1506         switch (lli->lli_sa_pattern & LSA_PATTERN_MASK) {
1507         case LSA_PATTERN_LIST:
1508                 rc = ll_statahead_by_list(parent);
1509                 break;
1510         case LSA_PATTERN_ADVISE:
1511                 rc = ll_statahead_by_advise(sai, parent);
1512                 break;
1513         case LSA_PATTERN_FNAME:
1514                 rc = ll_statahead_by_fname(sai, parent);
1515                 break;
1516         default:
1517                 rc = -EFAULT;
1518                 break;
1519         }
1520
1521         if (rc < 0) {
1522                 spin_lock(&lli->lli_sa_lock);
1523                 sai->sai_task = NULL;
1524                 lli->lli_sa_enabled = 0;
1525                 spin_unlock(&lli->lli_sa_lock);
1526         }
1527
1528         ll_statahead_flush_nowait(sai);
1529
1530         /*
1531          * statahead is finished, but statahead entries need to be cached, wait
1532          * for file release closedir() call to stop me.
1533          */
1534         while (({set_current_state(TASK_IDLE);
1535                 /* matches smp_store_release() in ll_deauthorize_statahead() */
1536                 smp_load_acquire(&sai->sai_task) && lli->lli_sa_enabled; })) {
1537                 schedule();
1538         }
1539         __set_current_state(TASK_RUNNING);
1540
1541         EXIT;
1542
1543         if (bh) {
1544                 rc = md_batch_stop(ll_i2mdexp(dir), sai->sai_bh);
1545                 sai->sai_bh = NULL;
1546         }
1547
1548 out_stop_agl:
1549         ll_stop_agl(sai);
1550
1551         /*
1552          * wait for inflight statahead RPCs to finish, and then we can free sai
1553          * safely because statahead RPC will access sai data
1554          */
1555         while (sai->sai_sent != sai->sai_replied)
1556                 /* in case we're not woken up, timeout wait */
1557                 msleep(125);
1558
1559         CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd hit %llu miss %llu\n",
1560                sbi->ll_fsname, sai, parent, sai->sai_hit, sai->sai_miss);
1561
1562         spin_lock(&lli->lli_sa_lock);
1563         sai->sai_task = NULL;
1564         spin_unlock(&lli->lli_sa_lock);
1565         wake_up(&sai->sai_waitq);
1566
1567         atomic_add(sai->sai_hit, &sbi->ll_sa_hit_total);
1568         atomic_add(sai->sai_miss, &sbi->ll_sa_miss_total);
1569
1570         ll_sai_put(sai);
1571         ll_sax_put(dir, lli->lli_sax);
1572
1573         return rc;
1574 }
1575
1576 /* authorize opened dir handle @key to statahead */
1577 void ll_authorize_statahead(struct inode *dir, void *key)
1578 {
1579         struct ll_inode_info *lli = ll_i2info(dir);
1580
1581         spin_lock(&lli->lli_sa_lock);
1582         if (!lli->lli_opendir_key && !lli->lli_sai) {
1583                 /*
1584                  * if lli_sai is not NULL, it means previous statahead is not
1585                  * finished yet, we'd better not start a new statahead for now.
1586                  */
1587                 lli->lli_opendir_key = key;
1588                 lli->lli_opendir_pid = current->pid;
1589                 lli->lli_sa_enabled = 1;
1590         }
1591         spin_unlock(&lli->lli_sa_lock);
1592 }
1593
1594 static void ll_deauthorize_statahead_advise(struct inode *dir, void *key)
1595 {
1596         struct ll_inode_info *lli = ll_i2info(dir);
1597         struct ll_file_data *fd = (struct ll_file_data *)key;
1598         struct ll_statahead_info *sai = fd->fd_sai;
1599
1600         if (sai == NULL)
1601                 return;
1602
1603         spin_lock(&lli->lli_sa_lock);
1604         if (sai->sai_task) {
1605                 struct task_struct *task = sai->sai_task;
1606
1607                 /* matches smp_load_acquire() in ll_statahead_thread() */
1608                 smp_store_release(&sai->sai_task, NULL);
1609                 wake_up_process(task);
1610         }
1611         fd->fd_sai = NULL;
1612         spin_unlock(&lli->lli_sa_lock);
1613         ll_sai_put(sai);
1614         LASSERT(lli->lli_sax != NULL);
1615         ll_sax_put(dir, lli->lli_sax);
1616 }
1617
1618 /*
1619  * deauthorize opened dir handle @key to statahead, and notify statahead thread
1620  * to quit if it's running.
1621  */
1622 void ll_deauthorize_statahead(struct inode *dir, void *key)
1623 {
1624         struct ll_inode_info *lli = ll_i2info(dir);
1625         struct ll_statahead_info *sai;
1626
1627         CDEBUG(D_READA, "deauthorize statahead for "DFID"\n",
1628                PFID(&lli->lli_fid));
1629
1630         if (lli->lli_sa_pattern & LSA_PATTERN_ADVISE) {
1631                 ll_deauthorize_statahead_advise(dir, key);
1632                 return;
1633         }
1634
1635         LASSERT(lli->lli_opendir_pid != 0);
1636         LASSERT(lli->lli_opendir_key == key);
1637         spin_lock(&lli->lli_sa_lock);
1638         lli->lli_opendir_key = NULL;
1639         lli->lli_opendir_pid = 0;
1640         lli->lli_sa_enabled = 0;
1641         lli->lli_sa_pattern = LSA_PATTERN_NONE;
1642         lli->lli_sa_fname_index = 0;
1643         lli->lli_sa_match_count = 0;
1644         sai = lli->lli_sai;
1645         if (sai && sai->sai_task) {
1646                 /*
1647                  * statahead thread may not have quit yet because it needs to
1648                  * cache entries, now it's time to tell it to quit.
1649                  *
1650                  * wake_up_process() provides the necessary barriers
1651                  * to pair with set_current_state().
1652                  */
1653                 struct task_struct *task = sai->sai_task;
1654
1655                 /* matches smp_load_acquire() in ll_statahead_thread() */
1656                 smp_store_release(&sai->sai_task, NULL);
1657                 wake_up_process(task);
1658         }
1659         spin_unlock(&lli->lli_sa_lock);
1660 }
1661
/* Result codes of is_first_dirent(); negative values are errors. */
enum {
        /**
         * the name is not the first eligible dirent ("." and ".." are
         * never eligible, they are skipped by is_first_dirent())
         */
        LS_NOT_FIRST_DE = 0,
        /**
         * the first non-hidden dirent
         */
        LS_FIRST_DE,
        /**
         * the first hidden dirent, that is a name starting with '.'
         * other than "." and ".."
         */
        LS_FIRST_DOT_DE
};
1676
1677 /* file is first dirent under @dir */
1678 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
1679 {
1680         struct qstr *target = &dentry->d_name;
1681         struct md_op_data *op_data;
1682         int dot_de;
1683         struct page *page = NULL;
1684         int rc = LS_NOT_FIRST_DE;
1685         __u64 pos = 0;
1686         struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
1687
1688         ENTRY;
1689
1690         op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
1691                                      LUSTRE_OPC_ANY, dir);
1692         if (IS_ERR(op_data))
1693                 RETURN(PTR_ERR(op_data));
1694
1695         if (IS_ENCRYPTED(dir)) {
1696                 int rc2 = llcrypt_fname_alloc_buffer(dir, NAME_MAX, &lltr);
1697
1698                 if (rc2 < 0)
1699                         RETURN(rc2);
1700         }
1701
1702         /**
1703          *FIXME choose the start offset of the readdir
1704          */
1705
1706         page = ll_get_dir_page(dir, op_data, 0, NULL);
1707
1708         while (1) {
1709                 struct lu_dirpage *dp;
1710                 struct lu_dirent  *ent;
1711
1712                 if (IS_ERR(page)) {
1713                         struct ll_inode_info *lli = ll_i2info(dir);
1714
1715                         rc = PTR_ERR(page);
1716                         CERROR("%s: reading dir "DFID" at %llu opendir_pid = %u : rc = %d\n",
1717                                ll_i2sbi(dir)->ll_fsname,
1718                                PFID(ll_inode2fid(dir)), pos,
1719                                lli->lli_opendir_pid, rc);
1720                         break;
1721                 }
1722
1723                 dp = page_address(page);
1724                 for (ent = lu_dirent_start(dp); ent != NULL;
1725                      ent = lu_dirent_next(ent)) {
1726                         __u64 hash;
1727                         int namelen;
1728                         char *name;
1729
1730                         hash = le64_to_cpu(ent->lde_hash);
1731                         /*
1732                          * The ll_get_dir_page() can return any page containing
1733                          * the given hash which may be not the start hash.
1734                          */
1735                         if (unlikely(hash < pos))
1736                                 continue;
1737
1738                         namelen = le16_to_cpu(ent->lde_namelen);
1739                         if (unlikely(namelen == 0))
1740                                 /*
1741                                  * skip dummy record.
1742                                  */
1743                                 continue;
1744
1745                         name = ent->lde_name;
1746                         if (name[0] == '.') {
1747                                 if (namelen == 1)
1748                                         /*
1749                                          * skip "."
1750                                          */
1751                                         continue;
1752                                 else if (name[1] == '.' && namelen == 2)
1753                                         /*
1754                                          * skip ".."
1755                                          */
1756                                         continue;
1757                                 else
1758                                         dot_de = 1;
1759                         } else {
1760                                 dot_de = 0;
1761                         }
1762
1763                         if (dot_de && target->name[0] != '.') {
1764                                 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
1765                                        target->len, target->name,
1766                                        namelen, name);
1767                                 continue;
1768                         }
1769
1770                         if (IS_ENCRYPTED(dir)) {
1771                                 struct llcrypt_str de_name =
1772                                         LLTR_INIT(ent->lde_name, namelen);
1773                                 struct lu_fid fid;
1774
1775                                 fid_le_to_cpu(&fid, &ent->lde_fid);
1776                                 if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
1777                                                          &lltr, &fid))
1778                                         continue;
1779                                 name = lltr.name;
1780                                 namelen = lltr.len;
1781                         }
1782
1783                         if (target->len != namelen ||
1784                             memcmp(target->name, name, namelen) != 0)
1785                                 rc = LS_NOT_FIRST_DE;
1786                         else if (!dot_de)
1787                                 rc = LS_FIRST_DE;
1788                         else
1789                                 rc = LS_FIRST_DOT_DE;
1790
1791                         ll_release_page(dir, page, false);
1792                         GOTO(out, rc);
1793                 }
1794                 pos = le64_to_cpu(dp->ldp_hash_end);
1795                 if (pos == MDS_DIR_END_OFF) {
1796                         /*
1797                          * End of directory reached.
1798                          */
1799                         ll_release_page(dir, page, false);
1800                         GOTO(out, rc);
1801                 } else {
1802                         /*
1803                          * chain is exhausted
1804                          * Normal case: continue to the next page.
1805                          */
1806                         ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) &
1807                                               LDF_COLLIDE);
1808                         page = ll_get_dir_page(dir, op_data, pos, NULL);
1809                 }
1810         }
1811         EXIT;
1812 out:
1813         llcrypt_fname_free_buffer(&lltr);
1814         ll_finish_md_op_data(op_data);
1815
1816         return rc;
1817 }
1818
1819 /**
1820  * revalidate @dentryp from statahead cache
1821  *
1822  * \param[in] dir       parent directory
1823  * \param[in] sai       sai structure
1824  * \param[out] dentryp  pointer to dentry which will be revalidated
1825  * \param[in] unplug    unplug statahead window only (normally for negative
1826  *                      dentry)
1827  * \retval              1 on success, dentry is saved in @dentryp
1828  * \retval              0 if revalidation failed (no proper lock on client)
1829  * \retval              negative number upon error
1830  */
1831 static int revalidate_statahead_dentry(struct inode *dir,
1832                                        struct ll_statahead_context *ctx,
1833                                        struct dentry **dentryp,
1834                                        bool unplug)
1835 {
1836         struct sa_entry *entry = NULL;
1837         struct ll_inode_info *lli = ll_i2info(dir);
1838         struct ll_statahead_info *sai = lli->lli_sai;
1839         int rc = 0;
1840
1841         ENTRY;
1842
1843         if (sai && (*dentryp)->d_name.name[0] == '.') {
1844                 if (sai->sai_ls_all ||
1845                     sai->sai_miss_hidden >= sai->sai_skip_hidden) {
1846                         /*
1847                          * Hidden dentry is the first one, or statahead
1848                          * thread does not skip so many hidden dentries
1849                          * before "sai_ls_all" enabled as below.
1850                          */
1851                 } else {
1852                         if (!sai->sai_ls_all)
1853                                 /*
1854                                  * It maybe because hidden dentry is not
1855                                  * the first one, "sai_ls_all" was not
1856                                  * set, then "ls -al" missed. Enable
1857                                  * "sai_ls_all" for such case.
1858                                  */
1859                                 sai->sai_ls_all = 1;
1860
1861                         /*
1862                          * Such "getattr" has been skipped before
1863                          * "sai_ls_all" enabled as above.
1864                          */
1865                         sai->sai_miss_hidden++;
1866                         RETURN(-EAGAIN);
1867                 }
1868         }
1869
1870         if (unplug)
1871                 GOTO(out, rc = 1);
1872
1873         entry = sa_get(ctx, &(*dentryp)->d_name);
1874         if (!entry)
1875                 GOTO(out, rc = -EAGAIN);
1876
1877         if (lli->lli_sa_pattern & LSA_PATTERN_LIST ||
1878             lli->lli_sa_pattern & LSA_PATTERN_FNAME)
1879                 LASSERT(sai == entry->se_sai);
1880         else if (lli->lli_sa_pattern == LSA_PATTERN_ADVISE)
1881                 sai = entry->se_sai;
1882
1883         LASSERT(sai != NULL);
1884         if (!sa_ready(entry)) {
1885                 spin_lock(&lli->lli_sa_lock);
1886                 sai->sai_index_wait = entry->se_index;
1887                 spin_unlock(&lli->lli_sa_lock);
1888                 rc = wait_event_idle_timeout(sai->sai_waitq, sa_ready(entry),
1889                                              cfs_time_seconds(30));
1890                 if (rc == 0) {
1891                         /*
1892                          * entry may not be ready, so it may be used by inflight
1893                          * statahead RPC, don't free it.
1894                          */
1895                         entry = NULL;
1896                         GOTO(out, rc = -EAGAIN);
1897                 }
1898         }
1899
1900         /*
1901          * We need to see the value that was set immediately before we
1902          * were woken up.
1903          */
1904         if (smp_load_acquire(&entry->se_state) == SA_ENTRY_SUCC &&
1905             entry->se_inode) {
1906                 struct inode *inode = entry->se_inode;
1907                 struct lookup_intent it = { .it_op = IT_GETATTR,
1908                                             .it_lock_handle =
1909                                                 entry->se_handle };
1910                 __u64 bits;
1911
1912                 rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
1913                                         ll_inode2fid(inode), &bits);
1914                 if (rc == 1) {
1915                         if (!(*dentryp)->d_inode) {
1916                                 struct dentry *alias;
1917
1918                                 alias = ll_splice_alias(inode, *dentryp);
1919                                 if (IS_ERR(alias)) {
1920                                         ll_intent_release(&it);
1921                                         GOTO(out, rc = PTR_ERR(alias));
1922                                 }
1923                                 *dentryp = alias;
1924                                 /*
1925                                  * statahead prepared this inode, transfer inode
1926                                  * refcount from sa_entry to dentry
1927                                  */
1928                                 entry->se_inode = NULL;
1929                         } else if ((*dentryp)->d_inode != inode) {
1930                                 /* revalidate, but inode is recreated */
1931                                 CDEBUG(D_READA,
1932                                        "%s: stale dentry %pd inode " DFID", statahead inode "DFID "\n",
1933                                        ll_i2sbi(inode)->ll_fsname, *dentryp,
1934                                        PFID(ll_inode2fid((*dentryp)->d_inode)),
1935                                        PFID(ll_inode2fid(inode)));
1936                                 ll_intent_release(&it);
1937                                 GOTO(out, rc = -ESTALE);
1938                         }
1939
1940                         if (bits & MDS_INODELOCK_LOOKUP) {
1941                                 d_lustre_revalidate(*dentryp);
1942                                 if (S_ISDIR(inode->i_mode))
1943                                         ll_update_dir_depth_dmv(dir, *dentryp);
1944                         }
1945
1946                         ll_intent_release(&it);
1947                 }
1948         }
1949 out:
1950         /*
1951          * statahead cached sa_entry can be used only once, and will be killed
1952          * right after use, so if lookup/revalidate accessed statahead cache,
1953          * set dentry ldd_sa_generation to parent lli_sa_generation, later if we
1954          * stat this file again, we know we've done statahead before, see
1955          * dentry_may_statahead().
1956          */
1957         if (lld_is_init(*dentryp))
1958                 ll_d2d(*dentryp)->lld_sa_generation = lli->lli_sa_generation;
1959         sa_put(dir, sai, entry);
1960
1961         RETURN(rc);
1962 }
1963
1964 static inline bool
1965 sa_pattern_list_detect(struct inode *dir, struct dentry *dchild, int *first)
1966 {
1967         struct ll_inode_info *lli = ll_i2info(dir);
1968
1969         if (lli->lli_opendir_pid == 0)
1970                 return false;
1971
1972         if (lli->lli_sa_enabled == 0)
1973                 return false;
1974
1975         if (lli->lli_sa_pattern & LSA_PATTERN_LS_NOT_FIRST_DE)
1976                 return false;
1977
1978         *first = is_first_dirent(dir, dchild);
1979         if (*first == LS_NOT_FIRST_DE) {
1980                 /*
1981                  * It is not "ls -{a}l" operation, no need statahead for it.
1982                  * Disable statahead so that subsequent stat() won't waste
1983                  * time to try it.
1984                  */
1985                 spin_lock(&lli->lli_sa_lock);
1986                 if (lli->lli_opendir_pid == current->pid) {
1987                         lli->lli_sa_enabled = 0;
1988                         lli->lli_sa_pattern |= LSA_PATTERN_LS_NOT_FIRST_DE;
1989                 }
1990                 spin_unlock(&lli->lli_sa_lock);
1991                 return false;
1992         }
1993
1994         spin_lock(&lli->lli_sa_lock);
1995         lli->lli_sa_pattern |= LSA_PATTERN_LIST;
1996         spin_unlock(&lli->lli_sa_lock);
1997         return true;
1998 }
1999
2000 static inline bool
2001 sa_pattern_fname_detect(struct inode *dir, struct dentry *dchild)
2002 {
2003         struct ll_inode_info *lli = ll_i2info(dir);
2004         struct qstr *dname = &dchild->d_name;
2005         const unsigned char *name = dname->name;
2006         bool rc = false;
2007         int i;
2008
2009         if (ll_i2sbi(dir)->ll_enable_statahead_fname == 0)
2010                 return false;
2011
2012         /*
2013          * Parse the format of the file name to determine whether it matches
2014          * the supported file name pattern for statahead (i.e. mdtest.$i).
2015          */
2016         i = dname->len - 1;
2017         if (isdigit(name[i])) {
2018                 long num;
2019                 int ret;
2020
2021                 while (--i >= 0 && isdigit(name[i]))
2022                         ; /* do nothing */
2023                 i++;
2024                 ret = kstrtol(&name[i], 0, &num);
2025                 if (ret)
2026                         GOTO(out, rc);
2027
2028                 /*
2029                  * The traversing program do multiple stat() calls on the same
2030                  * children entry. i.e. ls $dir*.
2031                  */
2032                 if (lli->lli_sa_fname_index == num)
2033                         return false;
2034
2035                 if (lli->lli_sa_match_count == 0 ||
2036                     num == lli->lli_sa_fname_index + 1) {
2037                         lli->lli_sa_match_count++;
2038                         lli->lli_sa_fname_index = num;
2039
2040                         if (lli->lli_sa_match_count > LSA_FN_MATCH_HIT) {
2041                                 spin_lock(&lli->lli_sa_lock);
2042                                 lli->lli_sa_pattern |= LSA_PATTERN_FN_UNIQUE;
2043                                 spin_unlock(&lli->lli_sa_lock);
2044                                 GOTO(out, rc = true);
2045                         }
2046
2047                         return false;
2048                 }
2049         }
2050 out:
2051         spin_lock(&lli->lli_sa_lock);
2052         if (rc) {
2053                 lli->lli_sa_pattern |= LSA_PATTERN_FNAME;
2054         } else {
2055                 lli->lli_sa_pattern = LSA_PATTERN_NONE;
2056                 lli->lli_sa_match_count = 0;
2057                 lli->lli_sa_fname_index = 0;
2058                 lli->lli_sa_enabled = 0;
2059         }
2060         spin_unlock(&lli->lli_sa_lock);
2061
2062         return rc;
2063 }
2064
2065 /* detect the statahead pattern. */
2066 static inline bool
2067 sa_pattern_detect(struct inode *dir, struct dentry *dchild, int *first)
2068 {
2069         return sa_pattern_list_detect(dir, dchild, first) ||
2070                sa_pattern_fname_detect(dir, dchild);
2071 }
2072
2073 /**
2074  * start statahead thread
2075  *
2076  * \param[in] dir       parent directory
2077  * \param[in] dentry    dentry that triggers statahead, normally the first
2078  *                      dirent under @dir
2079  * \param[in] agl       indicate whether AGL is needed
2080  * \retval              -EAGAIN on success, because when this function is
2081  *                      called, it's already in lookup call, so client should
2082  *                      do it itself instead of waiting for statahead thread
2083  *                      to do it asynchronously.
2084  * \retval              negative number upon error
2085  */
2086 static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
2087                                   bool agl)
2088 {
2089         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
2090         struct ll_inode_info *lli = ll_i2info(dir);
2091         struct ll_statahead_info *sai = NULL;
2092         struct ll_statahead_context *ctx = NULL;
2093         struct dentry *parent;
2094         struct task_struct *task;
2095         struct ll_sb_info *sbi;
2096         int first = LS_FIRST_DE;
2097         int rc = 0;
2098
2099         ENTRY;
2100
2101         if (sa_pattern_detect(dir, dentry, &first) == false)
2102                 RETURN(0);
2103
2104         parent = dget_parent(dentry);
2105         sbi = ll_i2sbi(d_inode(parent));
2106         if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
2107                                        sbi->ll_sa_running_max)) {
2108                 CDEBUG(D_READA,
2109                        "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n");
2110                 dput(parent);
2111                 GOTO(out, rc = -EMFILE);
2112         }
2113
2114         /* on success ll_sai_alloc holds a ref on parent */
2115         sai = ll_sai_alloc(parent);
2116         dput(parent);
2117         if (!sai)
2118                 GOTO(out, rc = -ENOMEM);
2119
2120         ctx = ll_sax_alloc(dir);
2121         if (!ctx)
2122                 GOTO(out, rc = -ENOMEM);
2123
2124         sai->sai_ls_all = (first == LS_FIRST_DOT_DE);
2125
2126         if (lli->lli_sa_pattern & LSA_PATTERN_FNAME) {
2127                 struct qstr *dname = &dentry->d_name;
2128                 const unsigned char *name = dname->name;
2129                 int rc;
2130                 int i;
2131
2132                 if (dname->len >= sizeof(sai->sai_fname))
2133                         GOTO(out, rc = -ERANGE);
2134
2135                 i = dname->len;
2136                 while (--i >= 0 && isdigit(name[i]))
2137                         ; /* do nothing */
2138                 i++;
2139
2140                 memcpy(sai->sai_fname, dname->name, i);
2141                 sai->sai_fname[i] = '\0';
2142                 sai->sai_fname_index = lli->lli_sa_fname_index;
2143                 /* The front part of the file name is zeroed padding. */
2144                 if (name[i] == '0')
2145                         sai->sai_fname_zeroed_len = dname->len - i;
2146         }
2147
2148         /*
2149          * if current lli_opendir_key was deauthorized, or dir re-opened by
2150          * another process, don't start statahead, otherwise the newly spawned
2151          * statahead thread won't be notified to quit.
2152          */
2153         spin_lock(&lli->lli_sa_lock);
2154         if (unlikely(lli->lli_sai ||
2155                      ((lli->lli_sa_pattern & LSA_PATTERN_LIST) &&
2156                       !lli->lli_opendir_key &&
2157                       lli->lli_opendir_pid != current->pid))) {
2158                 spin_unlock(&lli->lli_sa_lock);
2159                 GOTO(out, rc = -EPERM);
2160         }
2161         lli->lli_sai = sai;
2162         lli->lli_sax = ctx;
2163         spin_unlock(&lli->lli_sa_lock);
2164
2165         CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n",
2166                current->pid, parent);
2167
2168         task = kthread_create_on_node(ll_statahead_thread, sai, node,
2169                                       "ll_sa_%u", lli->lli_opendir_pid);
2170         if (IS_ERR(task)) {
2171                 spin_lock(&lli->lli_sa_lock);
2172                 lli->lli_sai = NULL;
2173                 spin_unlock(&lli->lli_sa_lock);
2174                 rc = PTR_ERR(task);
2175                 CERROR("can't start ll_sa thread, rc: %d\n", rc);
2176                 GOTO(out, rc);
2177         }
2178
2179         if (test_bit(LL_SBI_AGL_ENABLED, sbi->ll_flags) && agl)
2180                 ll_start_agl(parent, sai);
2181
2182         atomic_inc(&sbi->ll_sa_total);
2183         if (lli->lli_sa_pattern & LSA_PATTERN_LIST)
2184                 atomic_inc(&sbi->ll_sa_list_total);
2185         else if (lli->lli_sa_pattern & LSA_PATTERN_FNAME)
2186                 atomic_inc(&sbi->ll_sa_fname_total);
2187
2188         sai->sai_task = task;
2189         wake_up_process(task);
2190         /*
2191          * We don't stat-ahead for the first dirent since we are already in
2192          * lookup.
2193          */
2194         RETURN(-EAGAIN);
2195
2196 out:
2197         /*
2198          * once we start statahead thread failed, disable statahead so that
2199          * subsequent stat won't waste time to try it.
2200          */
2201         spin_lock(&lli->lli_sa_lock);
2202         if (lli->lli_opendir_pid == current->pid)
2203                 lli->lli_sa_enabled = 0;
2204         spin_unlock(&lli->lli_sa_lock);
2205
2206         if (sai)
2207                 ll_sai_free(sai);
2208
2209         if (ctx)
2210                 ll_sax_free(ctx);
2211
2212         if (first != LS_NOT_FIRST_DE)
2213                 atomic_dec(&sbi->ll_sa_running);
2214
2215         RETURN(rc);
2216 }
2217
2218 /*
2219  * Check whether statahead for @dir was started.
2220  */
2221 static inline bool ll_statahead_started(struct inode *dir, bool agl)
2222 {
2223         struct ll_inode_info *lli = ll_i2info(dir);
2224         struct ll_statahead_context *ctx;
2225         struct ll_statahead_info *sai;
2226
2227         spin_lock(&lli->lli_sa_lock);
2228         ctx = lli->lli_sax;
2229         sai = lli->lli_sai;
2230         if (sai && (sai->sai_agl_task != NULL) != agl)
2231                 CDEBUG(D_READA,
2232                        "%s: Statahead AGL hint changed from %d to %d\n",
2233                        ll_i2sbi(dir)->ll_fsname,
2234                        sai->sai_agl_task != NULL, agl);
2235         spin_unlock(&lli->lli_sa_lock);
2236
2237         return !!ctx;
2238 }
2239
2240 /**
2241  * statahead entry function, this is called when client getattr on a file, it
2242  * will start statahead thread if this is the first dir entry, else revalidate
2243  * dentry from statahead cache.
2244  *
2245  * \param[in]  dir      parent directory
2246  * \param[out] dentryp  dentry to getattr
2247  * \param[in]  agl      whether start the agl thread
2248  *
2249  * \retval              1 on success
2250  * \retval              0 revalidation from statahead cache failed, caller needs
2251  *                      to getattr from server directly
2252  * \retval              negative number on error, caller often ignores this and
2253  *                      then getattr from server
2254  */
2255 int ll_start_statahead(struct inode *dir, struct dentry *dentry, bool agl)
2256 {
2257         if (!ll_statahead_started(dir, agl))
2258                 return start_statahead_thread(dir, dentry, agl);
2259         return 0;
2260 }
2261
2262 /**
2263  * revalidate dentry from statahead cache.
2264  *
2265  * \param[in]  dir      parent directory
2266  * \param[out] dentryp  dentry to getattr
2267  * \param[in]  unplug   unplug statahead window only (normally for negative
2268  *                      dentry)
2269  * \retval              1 on success
2270  * \retval              0 revalidation from statahead cache failed, caller needs
2271  *                      to getattr from server directly
2272  * \retval              negative number on error, caller often ignores this and
2273  *                      then getattr from server
2274  */
2275 int ll_revalidate_statahead(struct inode *dir, struct dentry **dentryp,
2276                             bool unplug)
2277 {
2278         struct ll_inode_info *lli = ll_i2info(dir);
2279         struct ll_statahead_context *ctx;
2280         struct ll_statahead_info *sai = NULL;
2281         int rc = 0;
2282
2283         spin_lock(&lli->lli_sa_lock);
2284         ctx = lli->lli_sax;
2285         if (ctx) {
2286                 sai = lli->lli_sai;
2287                 if (sai) {
2288                         atomic_inc(&sai->sai_refcount);
2289                 } else if (lli->lli_sa_pattern & LSA_PATTERN_LIST) {
2290                         spin_unlock(&lli->lli_sa_lock);
2291                         return 0;
2292                 }
2293                 __ll_sax_get(ctx);
2294         }
2295         spin_unlock(&lli->lli_sa_lock);
2296         if (ctx) {
2297                 rc = revalidate_statahead_dentry(dir, ctx, dentryp, unplug);
2298                 CDEBUG(D_READA, "revalidate statahead %pd: rc = %d.\n",
2299                        *dentryp, rc);
2300                 if (sai)
2301                         ll_sai_put(sai);
2302                 ll_sax_put(dir, ctx);
2303         }
2304         return rc;
2305 }
2306
2307 int ll_ioctl_ahead(struct file *file, struct llapi_lu_ladvise2 *ladvise)
2308 {
2309         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
2310         struct ll_file_data *fd = file->private_data;
2311         struct dentry *dentry = file_dentry(file);
2312         struct inode *dir = dentry->d_inode;
2313         struct ll_inode_info *lli = ll_i2info(dir);
2314         struct ll_sb_info *sbi = ll_i2sbi(dir);
2315         struct ll_statahead_info *sai = NULL;
2316         struct ll_statahead_context *ctx = NULL;
2317         struct task_struct *task;
2318         bool agl = true;
2319         int rc;
2320
2321         ENTRY;
2322
2323         if (sbi->ll_sa_max == 0)
2324                 RETURN(0);
2325
2326         if (!S_ISDIR(dir->i_mode))
2327                 RETURN(-EINVAL);
2328
2329         if (fd->fd_sai) {
2330                 rc = -EALREADY;
2331                 CWARN("%s: already set statahead hint for dir %pd: rc = %d\n",
2332                       sbi->ll_fsname, dentry, rc);
2333                 RETURN(rc);
2334         }
2335
2336         if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
2337                                        sbi->ll_sa_running_max)) {
2338                 CDEBUG(D_READA,
2339                        "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n");
2340                 GOTO(out, rc = -EMFILE);
2341         }
2342
2343         sai = ll_sai_alloc(dentry);
2344         if (sai == NULL)
2345                 GOTO(out, rc = -ENOMEM);
2346
2347         sai->sai_fstart = ladvise->lla_start;
2348         sai->sai_fend = ladvise->lla_end;
2349         sai->sai_ls_all = 0;
2350         sai->sai_max = sbi->ll_sa_max;
2351         strncpy(sai->sai_fname, ladvise->lla_fname, sizeof(sai->sai_fname));
2352
2353         ctx = ll_sax_get(dir);
2354         if (ctx == NULL) {
2355                 ctx = ll_sax_alloc(dir);
2356                 if (ctx == NULL)
2357                         GOTO(out, rc = -ENOMEM);
2358
2359                 spin_lock(&lli->lli_sa_lock);
2360                 if (unlikely(lli->lli_sax)) {
2361                         struct ll_statahead_context *tmp = ctx;
2362
2363                         if (lli->lli_sa_pattern == LSA_PATTERN_NONE ||
2364                             lli->lli_sa_pattern == LSA_PATTERN_ADVISE) {
2365                                 lli->lli_sa_pattern = LSA_PATTERN_ADVISE;
2366                                 ctx = lli->lli_sax;
2367                                 __ll_sax_get(ctx);
2368                                 fd->fd_sai = __ll_sai_get(sai);
2369                                 rc = 0;
2370                         } else {
2371                                 rc = -EINVAL;
2372                                 CWARN("%s: pattern %X is not ADVISE: rc = %d\n",
2373                                       sbi->ll_fsname, lli->lli_sa_pattern, rc);
2374                         }
2375
2376                         spin_unlock(&lli->lli_sa_lock);
2377                         ll_sax_free(tmp);
2378                         if (rc)
2379                                 GOTO(out, rc);
2380                 } else {
2381                         lli->lli_sa_pattern = LSA_PATTERN_ADVISE;
2382                         lli->lli_sax = ctx;
2383                         fd->fd_sai = __ll_sai_get(sai);
2384                         spin_unlock(&lli->lli_sa_lock);
2385                 }
2386         } else {
2387                 spin_lock(&lli->lli_sa_lock);
2388                 if (!(lli->lli_sa_pattern == LSA_PATTERN_ADVISE ||
2389                       lli->lli_sa_pattern == LSA_PATTERN_NONE)) {
2390                         spin_unlock(&lli->lli_sa_lock);
2391                         GOTO(out, rc = -EINVAL);
2392                 }
2393
2394                 lli->lli_sa_pattern = LSA_PATTERN_ADVISE;
2395                 fd->fd_sai = __ll_sai_get(sai);
2396                 spin_unlock(&lli->lli_sa_lock);
2397         }
2398
2399         __ll_sax_get(ctx);
2400         CDEBUG(D_READA,
2401                "start statahead thread: [pid %d] [parent %pd] sai %p ctx %p\n",
2402                current->pid, dentry, sai, ctx);
2403
2404         task = kthread_create_on_node(ll_statahead_thread, sai, node,
2405                                       "ll_sa_%u", current->pid);
2406         if (IS_ERR(task)) {
2407                 rc = PTR_ERR(task);
2408                 CERROR("%s: cannot start ll_sa thread: rc = %d\n",
2409                        sbi->ll_fsname, rc);
2410                 GOTO(out, rc);
2411         }
2412
2413         if (test_bit(LL_SBI_AGL_ENABLED, sbi->ll_flags) && agl)
2414                 ll_start_agl(dentry, sai);
2415
2416         atomic_inc(&sbi->ll_sa_total);
2417         sai->sai_task = task;
2418         wake_up_process(task);
2419
2420         RETURN(0);
2421 out:
2422         if (fd->fd_sai) {
2423                 ll_sai_put(sai);
2424                 ll_sax_put(dir, ctx);
2425                 fd->fd_sai = NULL;
2426         }
2427
2428         if (sai)
2429                 ll_sai_free(sai);
2430
2431         if (ctx)
2432                 ll_sax_free(ctx);
2433
2434         atomic_dec(&sbi->ll_sa_running);
2435         RETURN(rc);
2436 }
2437
2438 /*
2439  * This function is called in each stat() system call to do statahead check.
2440  * When the files' naming of stat() call sequence under a directory follows
2441  * a certain name rule roughly, this directory is considered as an condicant
2442  * to do statahead.
2443  * For an example, the file naming rule is mdtest.$rank.$i, the suffix of
2444  * the stat() dentry name is number and do stat() for dentries with name
2445  * ending with number more than @LSA_FN_PREDICT_HIT, then the corresponding
2446  * directory is met the requrirement for statahead.
2447  */
2448 void ll_statahead_enter(struct inode *dir, struct dentry *dchild)
2449 {
2450         struct ll_inode_info *lli;
2451         struct qstr *dname = &dchild->d_name;
2452
2453         if (ll_i2sbi(dir)->ll_sa_max == 0)
2454                 return;
2455
2456         if (ll_i2sbi(dir)->ll_enable_statahead_fname == 0)
2457                 return;
2458
2459         lli = ll_i2info(dir);
2460         if (lli->lli_sa_enabled)
2461                 return;
2462
2463         if (lli->lli_sa_pattern & (LSA_PATTERN_FN_PREDICT | LSA_PATTERN_LIST))
2464                 return;
2465
2466         /*
2467          * Now support number indexing regularized statahead pattern only.
2468          * Quick check whether the last character is digit.
2469          */
2470         if (!isdigit(dname->name[dname->len - 1])) {
2471                 lli->lli_sa_match_count = 0;
2472                 return;
2473         }
2474
2475         lli->lli_sa_match_count++;
2476         if (lli->lli_sa_match_count > LSA_FN_PREDICT_HIT) {
2477                 spin_lock(&lli->lli_sa_lock);
2478                 lli->lli_sa_pattern |= LSA_PATTERN_FN_PREDICT;
2479                 spin_unlock(&lli->lli_sa_lock);
2480                 lli->lli_sa_enabled = 1;
2481                 lli->lli_sa_match_count = 0;
2482         }
2483 }