lustre/llite/statahead.c (fs/lustre-release.git)
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  */
31
32 #include <linux/fs.h>
33 #include <linux/sched.h>
34 #include <linux/kthread.h>
35 #include <linux/mm.h>
36 #include <linux/highmem.h>
37 #include <linux/pagemap.h>
38 #include <linux/delay.h>
39
40 #define DEBUG_SUBSYSTEM S_LLITE
41
42 #include <obd_support.h>
43 #include <lustre_dlm.h>
44 #include "llite_internal.h"
45
46 #define SA_OMITTED_ENTRY_MAX 8ULL
47
48 typedef enum {
49         /** negative values are for error cases */
50         SA_ENTRY_INIT = 0,      /** init entry */
51         SA_ENTRY_SUCC = 1,      /** stat succeeded */
52         SA_ENTRY_INVA = 2,      /** invalid entry */
53 } se_state_t;
54
55 /*
56  * sa_entry is not refcounted: the statahead thread allocates it and does the
57  * async stat; the async stat callback ll_statahead_interpret() prepares the
58  * inode and sets the lock data in the ptlrpcd context. Then the scanner
59  * process is woken up if this entry is the one it is waiting on, and it can access and free it.
60  */
61 struct sa_entry {
62         /* link into sai_entries */
63         struct list_head                 se_list;
64         /* link into sai hash table locally */
65         struct list_head                 se_hash;
66         /* entry index in the sai */
67         __u64                            se_index;
68         /* low layer ldlm lock handle */
69         __u64                            se_handle;
70         /* entry status */
71         se_state_t                       se_state;
72         /* entry size, contains name */
73         int                              se_size;
74         /* pointer to the target inode */
75         struct inode                    *se_inode;
76         /* pointer to @sai per process struct */
77         struct ll_statahead_info        *se_sai;
78         /* entry name */
79         struct qstr                      se_qstr;
80         /* entry fid */
81         struct lu_fid                    se_fid;
82 };
83
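/* global generation counter, copied into lli_sa_generation by ll_sai_alloc() */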
84 static unsigned int sai_generation;
85 static DEFINE_SPINLOCK(sai_generation_lock);
86
87 static inline int sa_unhashed(struct sa_entry *entry)
88 {
89         return list_empty(&entry->se_hash);
90 }
91
92 /* sa_entry is ready to use */
93 static inline int sa_ready(struct sa_entry *entry)
94 {
95         /* Make sure sa_entry is updated and ready to use */
96         smp_rmb();
97         return (entry->se_state != SA_ENTRY_INIT);
98 }
99
100 /* hash value used to index into sax_cache */
101 static inline int sa_hash(int val)
102 {
103         return val & LL_SA_CACHE_MASK;
104 }
105
106 /* hash entry into sax_cache */
107 static inline void
108 sa_rehash(struct ll_statahead_context *ctx, struct sa_entry *entry)
109 {
110         int i = sa_hash(entry->se_qstr.hash);
111
112         spin_lock(&ctx->sax_cache_lock[i]);
113         list_add_tail(&entry->se_hash, &ctx->sax_cache[i]);
114         spin_unlock(&ctx->sax_cache_lock[i]);
115 }
116
117 /* unhash entry from sax_cache */
118 static inline void
119 sa_unhash(struct ll_statahead_context *ctx, struct sa_entry *entry)
120 {
121         int i = sa_hash(entry->se_qstr.hash);
122
123         spin_lock(&ctx->sax_cache_lock[i]);
124         list_del_init(&entry->se_hash);
125         spin_unlock(&ctx->sax_cache_lock[i]);
126 }
127
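/* async glimpse-ahead only applies to regular files while the AGL thread runs */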
128 static inline int agl_should_run(struct ll_statahead_info *sai,
129                                  struct inode *inode)
130 {
131         return inode && S_ISREG(inode->i_mode) && sai->sai_agl_task;
132 }
133
134 static inline struct ll_inode_info *
135 agl_first_entry(struct ll_statahead_info *sai)
136 {
137         return list_first_entry(&sai->sai_agls, struct ll_inode_info,
138                                 lli_agl_list);
139 }
140
141 /* statahead window is full */
142 static inline int sa_sent_full(struct ll_statahead_info *sai)
143 {
144         return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
145 }
146
147 /* Batch metadata handle */
148 static inline bool sa_has_batch_handle(struct ll_statahead_info *sai)
149 {
150         return sai->sai_bh != NULL;
151 }
152
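/* record the last batched index and flush the batch without waiting for completion */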
153 static inline void ll_statahead_flush_nowait(struct ll_statahead_info *sai)
154 {
155         if (sa_has_batch_handle(sai)) {
156                 sai->sai_index_end = sai->sai_index - 1;
157                 (void) md_batch_flush(ll_i2mdexp(sai->sai_dentry->d_inode),
158                                       sai->sai_bh, false);
159         }
160 }
161
162 static inline int agl_list_empty(struct ll_statahead_info *sai)
163 {
164         return list_empty(&sai->sai_agls);
165 }
166
167 /**
168  * (1) the hit ratio is less than 80%,
169  * or
170  * (2) there are more than 8 consecutive misses;
171  * either condition means a low hit rate.
172  */
173 static inline int sa_low_hit(struct ll_statahead_info *sai)
174 {
175         return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) ||
176                 (sai->sai_consecutive_miss > 8));
177 }
178
179 /*
180  * if the given index is behind the statahead window by more than
181  * SA_OMITTED_ENTRY_MAX, then it is old.
182  */
183 static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
184 {
185         return ((__u64)sai->sai_max + index + SA_OMITTED_ENTRY_MAX <
186                 sai->sai_index);
187 }
188
189 /* allocate sa_entry and hash it to allow scanner process to find it */
190 static struct sa_entry *
191 sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
192          const char *name, int len, const struct lu_fid *fid)
193 {
194         struct ll_inode_info *lli;
195         struct sa_entry *entry;
196         int entry_size;
197         char *dname;
198
199         ENTRY;
200
201         entry_size = sizeof(struct sa_entry) +
202                      round_up(len + 1 /* for trailing NUL */, 4);
203         OBD_ALLOC(entry, entry_size);
204         if (unlikely(!entry))
205                 RETURN(ERR_PTR(-ENOMEM));
206
207         CDEBUG(D_READA, "alloc sa entry %.*s(%p) index %llu\n",
208                len, name, entry, index);
209
210         entry->se_index = index;
211         entry->se_sai = sai;
212
213         entry->se_state = SA_ENTRY_INIT;
214         entry->se_size = entry_size;
215         dname = (char *)entry + sizeof(struct sa_entry);
216         memcpy(dname, name, len);
217         dname[len] = 0;
218         entry->se_qstr.hash = ll_full_name_hash(parent, name, len);
219         entry->se_qstr.len = len;
220         entry->se_qstr.name = dname;
221
222         if (fid)
223                 entry->se_fid = *fid;
224
225         lli = ll_i2info(sai->sai_dentry->d_inode);
226         spin_lock(&lli->lli_sa_lock);
227         INIT_LIST_HEAD(&entry->se_list);
228         sa_rehash(lli->lli_sax, entry);
229         spin_unlock(&lli->lli_sa_lock);
230
231         atomic_inc(&sai->sai_cache_count);
232
233         RETURN(entry);
234 }
235
236 /* free sa_entry, which should have been unhashed and not in any list */
237 static void sa_free(struct ll_statahead_context *ctx, struct sa_entry *entry)
238 {
239         CDEBUG(D_READA, "free sa entry %.*s(%p) index %llu\n",
240                entry->se_qstr.len, entry->se_qstr.name, entry,
241                entry->se_index);
242
243         LASSERT(list_empty(&entry->se_list));
244         LASSERT(sa_unhashed(entry));
245
246         OBD_FREE(entry, entry->se_size);
247 }
248
249 /*
250  * find sa_entry by name; used by the directory scanner. The caller needs no
251  * lock because only the scanner can remove the entry from the cache.
252  */
253 static struct sa_entry *
254 sa_get(struct ll_statahead_context *ctx, const struct qstr *qstr)
255 {
256         struct sa_entry *entry;
257         int i = sa_hash(qstr->hash);
258
259         spin_lock(&ctx->sax_cache_lock[i]);
260         list_for_each_entry(entry, &ctx->sax_cache[i], se_hash) {
261                 if (entry->se_qstr.hash == qstr->hash &&
262                     entry->se_qstr.len == qstr->len &&
263                     memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0) {
264                         spin_unlock(&ctx->sax_cache_lock[i]);
265                         return entry;
266                 }
267         }
268         spin_unlock(&ctx->sax_cache_lock[i]);
269         return NULL;
270 }
271
272 /* unhash and unlink sa_entry, and then free it */
273 static inline void
274 sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry, bool locked)
275 {
276         struct inode *dir = sai->sai_dentry->d_inode;
277         struct ll_inode_info *lli = ll_i2info(dir);
278         struct ll_statahead_context *ctx = lli->lli_sax;
279
280         LASSERT(!sa_unhashed(entry));
281         LASSERT(!list_empty(&entry->se_list));
282         LASSERT(sa_ready(entry));
283
284         sa_unhash(ctx, entry);
285
286         if (!locked)
287                 spin_lock(&lli->lli_sa_lock);
288         list_del_init(&entry->se_list);
289         spin_unlock(&lli->lli_sa_lock);
290
291         iput(entry->se_inode);
292         atomic_dec(&sai->sai_cache_count);
293         sa_free(ctx, entry);
294         if (locked)
295                 spin_lock(&lli->lli_sa_lock);
296 }
297
298 /* called by scanner after use, sa_entry will be killed */
299 static void
300 sa_put(struct inode *dir, struct ll_statahead_info *sai, struct sa_entry *entry)
301 {
302         struct ll_inode_info *lli = ll_i2info(dir);
303         struct sa_entry *tmp;
304         bool wakeup = false;
305
306         if (entry && entry->se_state == SA_ENTRY_SUCC) {
307                 struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
308
309                 sai->sai_hit++;
310                 sai->sai_consecutive_miss = 0;
311                 if (sai->sai_max < sbi->ll_sa_max) {
312                         sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
313                         wakeup = true;
314                 } else if (sai->sai_max_batch_count > 0) {
315                         if (sai->sai_max >= sai->sai_max_batch_count &&
316                            (sai->sai_index_end - entry->se_index) %
317                            sai->sai_max_batch_count == 0) {
318                                 wakeup = true;
319                         } else if (entry->se_index == sai->sai_index_end) {
320                                 wakeup = true;
321                         }
322                 } else {
323                         wakeup = true;
324                 }
325         } else if (sai) {
326                 sai->sai_miss++;
327                 sai->sai_consecutive_miss++;
328                 wakeup = true;
329         }
330
331         if (entry)
332                 sa_kill(sai, entry, false);
333
334         if (sai) {
335                 /*
336                  * kill old completed entries. Maybe kicking old entries can
337                  * be ignored?
338                  */
339                 spin_lock(&lli->lli_sa_lock);
340                 while ((tmp = list_first_entry_or_null(&sai->sai_entries,
341                                 struct sa_entry, se_list))) {
342                         if (!is_omitted_entry(sai, tmp->se_index))
343                                 break;
344
345                         /* ll_sa_lock is dropped by sa_kill(), restart list */
346                         sa_kill(sai, tmp, true);
347                 }
348                 spin_unlock(&lli->lli_sa_lock);
349         }
350
351         spin_lock(&lli->lli_sa_lock);
352         if (wakeup && sai->sai_task)
353                 wake_up_process(sai->sai_task);
354         spin_unlock(&lli->lli_sa_lock);
355 }
356
357 /*
358  * update state and insert the entry into sai_entries sorted by index; return
359  * true if the scanner is waiting on this entry.
360  */
361 static bool
362 __sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
363 {
364         struct sa_entry *se;
365         struct list_head *pos = &sai->sai_entries;
366         __u64 index = entry->se_index;
367
368         LASSERT(!sa_ready(entry));
369         LASSERT(list_empty(&entry->se_list));
370
371         list_for_each_entry_reverse(se, &sai->sai_entries, se_list) {
372                 if (se->se_index < entry->se_index) {
373                         pos = &se->se_list;
374                         break;
375                 }
376         }
377         list_add(&entry->se_list, pos);
378         /*
379          * LU-9210: ll_statahead_interpret() must be able to see this before
380          * we wake it up
381          */
382         smp_store_release(&entry->se_state,
383                           ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);
384
385         return (index == sai->sai_index_wait);
386 }
387
388 /* finish async stat RPC arguments */
389 static void sa_fini_data(struct md_op_item *item)
390 {
391         struct md_op_data *op_data = &item->mop_data;
392
393         if (op_data->op_flags & MF_OPNAME_KMALLOCED)
394                 /* allocated via ll_setup_filename called from sa_prep_data */
395                 kfree(op_data->op_name);
396         ll_unlock_md_op_lsm(&item->mop_data);
397         iput(item->mop_dir);
398         if (item->mop_subpill_allocated)
399                 OBD_FREE_PTR(item->mop_pill);
400         OBD_FREE_PTR(item);
401 }
402
403 static int ll_statahead_interpret(struct md_op_item *item, int rc);
404
405 /*
406  * prepare arguments for async stat RPC.
407  */
408 static struct md_op_item *
409 sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
410 {
411         struct md_op_item *item;
412         struct ldlm_enqueue_info *einfo;
413         struct md_op_data *op_data;
414
415         OBD_ALLOC_PTR(item);
416         if (!item)
417                 return ERR_PTR(-ENOMEM);
418
419         op_data = ll_prep_md_op_data(&item->mop_data, dir, child,
420                                      entry->se_qstr.name, entry->se_qstr.len, 0,
421                                      LUSTRE_OPC_ANY, NULL);
422         if (IS_ERR(op_data)) {
423                 OBD_FREE_PTR(item);
424                 return (struct md_op_item *)op_data;
425         }
426
427         if (!child)
428                 op_data->op_fid2 = entry->se_fid;
429
430         item->mop_opc = MD_OP_GETATTR;
431         item->mop_it.it_op = IT_GETATTR;
432         item->mop_dir = igrab(dir);
433         item->mop_cb = ll_statahead_interpret;
434         item->mop_cbdata = entry;
435
436         einfo = &item->mop_einfo;
437         einfo->ei_type = LDLM_IBITS;
438         einfo->ei_mode = it_to_lock_mode(&item->mop_it);
439         einfo->ei_cb_bl = ll_md_blocking_ast;
440         einfo->ei_cb_cp = ldlm_completion_ast;
441         einfo->ei_cb_gl = NULL;
442         einfo->ei_cbdata = NULL;
443         einfo->ei_req_slot = 1;
444
445         return item;
446 }
447
448 /*
449  * release resources used in the async stat RPC, update the entry state and
450  * wake up the scanner process if it is waiting on this entry.
451  */
452 static void
453 sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
454 {
455         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
456         bool wakeup;
457
458         spin_lock(&lli->lli_sa_lock);
459         wakeup = __sa_make_ready(sai, entry, ret);
460         spin_unlock(&lli->lli_sa_lock);
461
462         if (wakeup)
463                 wake_up(&sai->sai_waitq);
464 }
465
466 /* insert inode into the list of sai_agls */
467 static void ll_agl_add(struct ll_statahead_info *sai,
468                        struct inode *inode, int index)
469 {
470         struct ll_inode_info *child  = ll_i2info(inode);
471         struct ll_inode_info *parent = ll_i2info(sai->sai_dentry->d_inode);
472
473         spin_lock(&child->lli_agl_lock);
474         if (child->lli_agl_index == 0) {
475                 child->lli_agl_index = index;
476                 spin_unlock(&child->lli_agl_lock);
477
478                 LASSERT(list_empty(&child->lli_agl_list));
479
480                 spin_lock(&parent->lli_agl_lock);
481                 /* Re-check under the lock */
482                 if (agl_should_run(sai, inode)) {
483                         if (agl_list_empty(sai))
484                                 wake_up_process(sai->sai_agl_task);
485                         igrab(inode);
486                         list_add_tail(&child->lli_agl_list, &sai->sai_agls);
487                 } else
488                         child->lli_agl_index = 0;
489                 spin_unlock(&parent->lli_agl_lock);
490         } else {
491                 spin_unlock(&child->lli_agl_lock);
492         }
493 }
494
495 /* Allocate sax */
496 static struct ll_statahead_context *ll_sax_alloc(struct inode *dir)
497 {
498         struct ll_statahead_context *ctx;
499         int i;
500
501         ENTRY;
502
503         OBD_ALLOC_PTR(ctx);
504         if (ctx == NULL)
505                 RETURN(NULL);
506
507         ctx->sax_inode = igrab(dir);
508         atomic_set(&ctx->sax_refcount, 1);
509         for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
510                 INIT_LIST_HEAD(&ctx->sax_cache[i]);
511                 spin_lock_init(&ctx->sax_cache_lock[i]);
512         }
513
514         RETURN(ctx);
515 }
516
517 static inline void ll_sax_free(struct ll_statahead_context *ctx)
518 {
519         LASSERT(ctx->sax_inode != NULL);
520         iput(ctx->sax_inode);
521         OBD_FREE_PTR(ctx);
522 }
523
524 static inline void __ll_sax_get(struct ll_statahead_context *ctx)
525 {
526         atomic_inc(&ctx->sax_refcount);
527 }
528
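/* get the statahead context attached to the directory, taking an extra reference; returns NULL if none */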
529 static inline struct ll_statahead_context *ll_sax_get(struct inode *dir)
530 {
531         struct ll_inode_info *lli = ll_i2info(dir);
532         struct ll_statahead_context *ctx = NULL;
533
534         spin_lock(&lli->lli_sa_lock);
535         ctx = lli->lli_sax;
536         if (ctx)
537                 __ll_sax_get(ctx);
538         spin_unlock(&lli->lli_sa_lock);
539
540         return ctx;
541 }
542
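/* put the statahead context; the final put detaches it from the directory and frees it */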
543 static inline void ll_sax_put(struct inode *dir,
544                               struct ll_statahead_context *ctx)
545 {
546         struct ll_inode_info *lli = ll_i2info(dir);
547
548         if (atomic_dec_and_lock(&ctx->sax_refcount, &lli->lli_sa_lock)) {
549                 lli->lli_sai = NULL;
550                 lli->lli_sax = NULL;
551                 if (lli->lli_sa_pattern == LSA_PATTERN_FNAME) {
552                         lli->lli_opendir_key = NULL;
553                         lli->lli_opendir_pid = 0;
554                         lli->lli_sa_enabled = 0;
555                 }
556                 lli->lli_sa_pattern = LSA_PATTERN_NONE;
557                 spin_unlock(&lli->lli_sa_lock);
558
559                 ll_sax_free(ctx);
560         }
561 }
562
563 /* allocate sai */
564 static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
565 {
566         struct ll_statahead_info *sai;
567         struct ll_inode_info *lli = ll_i2info(dentry->d_inode);
568
569         ENTRY;
570
571         OBD_ALLOC_PTR(sai);
572         if (!sai)
573                 RETURN(NULL);
574
575         sai->sai_dentry = dget(dentry);
576         atomic_set(&sai->sai_refcount, 1);
577         sai->sai_max = LL_SA_RPC_MIN;
578         sai->sai_index = 1;
579         init_waitqueue_head(&sai->sai_waitq);
580
581         INIT_LIST_HEAD(&sai->sai_entries);
582         INIT_LIST_HEAD(&sai->sai_agls);
583
584         atomic_set(&sai->sai_cache_count, 0);
585
586         spin_lock(&sai_generation_lock);
587         lli->lli_sa_generation = ++sai_generation;
588         if (unlikely(sai_generation == 0))
589                 lli->lli_sa_generation = ++sai_generation;
590         spin_unlock(&sai_generation_lock);
591
592         RETURN(sai);
593 }
594
595 /* free sai */
596 static inline void ll_sai_free(struct ll_statahead_info *sai)
597 {
598         LASSERT(sai->sai_dentry != NULL);
599         dput(sai->sai_dentry);
600         OBD_FREE_PTR(sai);
601 }
602
603 static inline struct ll_statahead_info *
604 __ll_sai_get(struct ll_statahead_info *sai)
605 {
606         atomic_inc(&sai->sai_refcount);
607         return sai;
608 }
609
610 /*
611  * put sai refcount after use, if refcount reaches zero, free sai and sa_entries
612  * attached to it.
613  */
614 static void ll_sai_put(struct ll_statahead_info *sai)
615 {
616         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
617
618         if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
619                 struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
620                 struct sa_entry *entry, *next;
621
622                 lli->lli_sai = NULL;
623                 spin_unlock(&lli->lli_sa_lock);
624
625                 LASSERT(!sai->sai_task);
626                 LASSERT(!sai->sai_agl_task);
627                 LASSERT(sai->sai_sent == sai->sai_replied);
628
629                 list_for_each_entry_safe(entry, next, &sai->sai_entries,
630                                          se_list)
631                         sa_kill(sai, entry, false);
632
633                 LASSERT(atomic_read(&sai->sai_cache_count) == 0);
634                 LASSERT(agl_list_empty(sai));
635
636                 ll_sai_free(sai);
637                 atomic_dec(&sbi->ll_sa_running);
638         }
639 }
640
641 /* Do NOT forget to drop the inode refcount taken when adding it to sai_agls. */
642 static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
643 {
644         struct ll_inode_info *lli = ll_i2info(inode);
645         u64 index = lli->lli_agl_index;
646         ktime_t expire;
647         int rc;
648
649         ENTRY;
650
651         LASSERT(list_empty(&lli->lli_agl_list));
652
653         /* AGL may fall behind statahead by one entry */
654         if (is_omitted_entry(sai, index + 1)) {
655                 lli->lli_agl_index = 0;
656                 iput(inode);
657                 RETURN_EXIT;
658         }
659
660         /*
661          * In case of restore, the MDT has the right size and has already
662          * sent it back without granting the layout lock, so the inode is
663          * up-to-date and AGL (async glimpse lock) is useless.
664          * Also, to glimpse we need the layout; while a restore is running
665          * the MDT holds the layout lock, so the glimpse would block until
666          * the end of the restore (statahead/agl would block).
667          */
668         if (test_bit(LLIF_FILE_RESTORING, &lli->lli_flags)) {
669                 lli->lli_agl_index = 0;
670                 iput(inode);
671                 RETURN_EXIT;
672         }
673
674         /* Someone is in glimpse (sync or async), do nothing. */
675         rc = down_write_trylock(&lli->lli_glimpse_sem);
676         if (rc == 0) {
677                 lli->lli_agl_index = 0;
678                 iput(inode);
679                 RETURN_EXIT;
680         }
681
682         /*
683          * Someone triggered a glimpse within the last second.
684          * 1) The former glimpse succeeded with a glimpse lock granted by the
685          *    OST; if the lock is still cached on the client, AGL needs to do
686          *    nothing. If it was cancelled by another client, AGL may not be
687          *    able to obtain a new lock, since AGL triggers no glimpse callback.
688          * 2) The former glimpse succeeded, but the OST did not grant a glimpse
689          *    lock. In that case it is quite possible that the OST will not
690          *    grant a glimpse lock for AGL either.
691          * 3) The former glimpse failed; compared with the other two cases this
692          *    is relatively rare. AGL can ignore such a case, it will not
693          *    affect performance much.
694          */
695         expire = ktime_sub_ns(ktime_get(), NSEC_PER_SEC);
696         if (ktime_to_ns(lli->lli_glimpse_time) &&
697             ktime_before(expire, lli->lli_glimpse_time)) {
698                 up_write(&lli->lli_glimpse_sem);
699                 lli->lli_agl_index = 0;
700                 iput(inode);
701                 RETURN_EXIT;
702         }
703
704         CDEBUG(D_READA,
705                "Handling (init) async glimpse: inode = " DFID", idx = %llu\n",
706                PFID(&lli->lli_fid), index);
707
708         cl_agl(inode);
709         lli->lli_agl_index = 0;
710         lli->lli_glimpse_time = ktime_get();
711         up_write(&lli->lli_glimpse_sem);
712
713         CDEBUG(D_READA,
714                "Handled (init) async glimpse: inode= " DFID", idx = %llu, rc = %d\n",
715                PFID(&lli->lli_fid), index, rc);
716
717         iput(inode);
718
719         EXIT;
720 }
721
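/* common completion for an async stat: release intent and RPC resources, mark the entry ready and count the reply */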
722 static void ll_statahead_interpret_fini(struct ll_inode_info *lli,
723                                         struct ll_statahead_info *sai,
724                                         struct md_op_item *item,
725                                         struct sa_entry *entry,
726                                         struct ptlrpc_request *req,
727                                         int rc)
728 {
729         /*
730          * First it will drop ldlm ibits lock refcount by calling
731          * ll_intent_drop_lock() in spite of failures. Do not worry about
732          * calling ll_intent_drop_lock() more than once.
733          */
734         ll_intent_release(&item->mop_it);
735         sa_fini_data(item);
736         if (req)
737                 ptlrpc_req_finished(req);
738         sa_make_ready(sai, entry, rc);
739
740         spin_lock(&lli->lli_sa_lock);
741         sai->sai_replied++;
742         spin_unlock(&lli->lli_sa_lock);
743 }
744
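/*
 * workqueue part of the getattr callback, run outside of ptlrpcd context so
 * that preparing the inode may issue extra RPCs or wait for a long time
 */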
745 static void ll_statahead_interpret_work(struct work_struct *work)
746 {
747         struct md_op_item *item = container_of(work, struct md_op_item,
748                                                mop_work);
749         struct req_capsule *pill = item->mop_pill;
750         struct inode *dir = item->mop_dir;
751         struct ll_inode_info *lli = ll_i2info(dir);
752         struct ll_statahead_info *sai;
753         struct lookup_intent *it;
754         struct sa_entry *entry;
755         struct mdt_body *body;
756         struct inode *child;
757         int rc;
758
759         ENTRY;
760
761         entry = (struct sa_entry *)item->mop_cbdata;
762         LASSERT(entry->se_handle != 0);
763
764         sai = entry->se_sai;
765         it = &item->mop_it;
766         body = req_capsule_server_get(pill, &RMF_MDT_BODY);
767         if (!body)
768                 GOTO(out, rc = -EFAULT);
769
770         child = entry->se_inode;
771         /* revalidate; unlinked and re-created with the same name */
772         if (unlikely(!fid_is_zero(&item->mop_data.op_fid2) &&
773                      !lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
774                 if (child) {
775                         entry->se_inode = NULL;
776                         iput(child);
777                 }
778                 /* The mdt_body is invalid. Skip this entry */
779                 GOTO(out, rc = -EAGAIN);
780         }
781
782         it->it_lock_handle = entry->se_handle;
783         rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
784         if (rc != 1)
785                 GOTO(out, rc = -EAGAIN);
786
787         rc = ll_prep_inode(&child, pill, dir->i_sb, it);
788         if (rc) {
789                 CERROR("%s: getattr callback for %.*s "DFID": rc = %d\n",
790                        ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
791                        entry->se_qstr.name, PFID(&entry->se_fid), rc);
792                 GOTO(out, rc);
793         }
794
795         /* If encryption context was returned by MDT, put it in
796          * inode now to save an extra getxattr.
797          */
798         if (body->mbo_valid & OBD_MD_ENCCTX) {
799                 void *encctx = req_capsule_server_get(pill, &RMF_FILE_ENCCTX);
800                 __u32 encctxlen = req_capsule_get_size(pill, &RMF_FILE_ENCCTX,
801                                                        RCL_SERVER);
802
803                 if (encctxlen) {
804                         CDEBUG(D_SEC,
805                                "server returned encryption ctx for "DFID"\n",
806                                PFID(ll_inode2fid(child)));
807                         rc = ll_xattr_cache_insert(child,
808                                                    xattr_for_enc(child),
809                                                    encctx, encctxlen);
810                         if (rc)
811                                 CWARN("%s: cannot set enc ctx for "DFID": rc = %d\n",
812                                       ll_i2sbi(child)->ll_fsname,
813                                       PFID(ll_inode2fid(child)), rc);
814                 }
815         }
816
817         CDEBUG(D_READA, "%s: setting %.*s"DFID" l_data to inode %p\n",
818                ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
819                entry->se_qstr.name, PFID(ll_inode2fid(child)), child);
820         ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);
821
822         entry->se_inode = child;
823
824         if (agl_should_run(sai, child))
825                 ll_agl_add(sai, child, entry->se_index);
826 out:
827         ll_statahead_interpret_fini(lli, sai, item, entry, pill->rc_req, rc);
828 }
829
830 /*
831  * Callback for the async stat RPC, called in ptlrpcd context. It saves the
832  * lock handle and defers inode setup to a work item, which will wake up
833  * the directory listing process if the dentry is the waiting one.
834  */
835 static int ll_statahead_interpret(struct md_op_item *item, int rc)
836 {
837         struct req_capsule *pill = item->mop_pill;
838         struct lookup_intent *it = &item->mop_it;
839         struct inode *dir = item->mop_dir;
840         struct ll_inode_info *lli = ll_i2info(dir);
841         struct sa_entry *entry = (struct sa_entry *)item->mop_cbdata;
842         struct work_struct *work = &item->mop_work;
843         struct ll_statahead_info *sai;
844         struct mdt_body *body;
845         struct inode *child;
846         __u64 handle = 0;
847
848         ENTRY;
849
850         if (it_disposition(it, DISP_LOOKUP_NEG))
851                 rc = -ENOENT;
852
853         /*
854          * because the statahead thread waits for all inflight RPCs to finish,
855          * sai is always valid here, no need to take a refcount
856          */
857         LASSERT(entry != NULL);
858         sai = entry->se_sai;
859         LASSERT(sai != NULL);
860
861         CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
862                entry->se_qstr.len, entry->se_qstr.name, rc);
863
864         if (rc != 0)
865                 GOTO(out, rc);
866
867         body = req_capsule_server_get(pill, &RMF_MDT_BODY);
868         if (!body)
869                 GOTO(out, rc = -EFAULT);
870
871         child = entry->se_inode;
872         /*
873          * revalidate; unlinked and re-created with the same name.
874          * Exclude the case where the FID is zero: such an entry came from
875          * statahead with a regularized file name pattern, so the FID of the
876          * child file was not known in advance.
877          */
878         if (unlikely(!fid_is_zero(&item->mop_data.op_fid2) &&
879                      !lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
880                 if (child) {
881                         entry->se_inode = NULL;
882                         iput(child);
883                 }
884                 /* The mdt_body is invalid. Skip this entry */
885                 GOTO(out, rc = -EAGAIN);
886         }
887
888         entry->se_handle = it->it_lock_handle;
889         /*
890          * In ptlrpcd context, it is not allowed to generate new RPCs
891          * especially for striped directories or regular files with layout
892          * change.
893          */
894         /*
895          * release ibits lock ASAP to avoid deadlock when statahead
896          * thread enqueues lock on parent in readdir and another
897          * process enqueues lock on child with parent lock held, eg.
898          * unlink.
899          */
900         handle = it->it_lock_handle;
901         ll_intent_drop_lock(it);
902         ll_unlock_md_op_lsm(&item->mop_data);
903
904         /*
905          * If the statahead entry is a striped directory or a regular file
906          * with a layout change, preparing it will generate a new RPC and a
907          * long wait in the ptlrpcd context.
908          * However, blocking in a ptlrpcd thread is dangerous.
909          * Here we use a work queue or the separate statahead thread to handle
910          * the extra RPC and long wait:
911          *      (@ll_prep_inode->@lmv_revalidate_slaves);
912          *      (@ll_prep_inode->@lov_layout_change->osc_cache_wait_range);
913          */
914         INIT_WORK(work, ll_statahead_interpret_work);
915         ptlrpc_request_addref(pill->rc_req);
916         schedule_work(work);
917         RETURN(0);
918 out:
919         ll_statahead_interpret_fini(lli, sai, item, entry, NULL, rc);
920         RETURN(rc);
921 }
922
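/* issue the async getattr, via the batch handle when batching is enabled */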
923 static inline int sa_getattr(struct ll_statahead_info *sai, struct inode *dir,
924                              struct md_op_item *item)
925 {
926         int rc;
927
928         if (sa_has_batch_handle(sai))
929                 rc = md_batch_add(ll_i2mdexp(dir), sai->sai_bh, item);
930         else
931                 rc = md_intent_getattr_async(ll_i2mdexp(dir), item);
932
933         return rc;
934 }
935
936 /* async stat for file not found in dcache */
937 static int sa_lookup(struct inode *dir, struct sa_entry *entry)
938 {
939         struct md_op_item *item;
940         int rc;
941
942         ENTRY;
943
944         item = sa_prep_data(dir, NULL, entry);
945         if (IS_ERR(item))
946                 RETURN(PTR_ERR(item));
947
948         rc = sa_getattr(entry->se_sai, dir, item);
949         if (rc < 0)
950                 sa_fini_data(item);
951
952         RETURN(rc);
953 }
954
955 /**
956  * async stat for file found in dcache, similar to .revalidate
957  *
958  * \retval      1 dentry valid, no RPC sent
959  * \retval      0 dentry invalid, will send async stat RPC
960  * \retval      negative number upon error
961  */
962 static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
963                          struct dentry *dentry)
964 {
965         struct inode *inode = dentry->d_inode;
966         struct lookup_intent it = { .it_op = IT_GETATTR,
967                                     .it_lock_handle = 0 };
968         struct md_op_item *item;
969         int rc;
970
971         ENTRY;
972
973         if (unlikely(!inode))
974                 RETURN(1);
975
976         if (d_mountpoint(dentry))
977                 RETURN(1);
978
979         item = sa_prep_data(dir, inode, entry);
980         if (IS_ERR(item))
981                 RETURN(PTR_ERR(item));
982
983         entry->se_inode = igrab(inode);
984         rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
985                                 NULL);
986         if (rc == 1) {
987                 entry->se_handle = it.it_lock_handle;
988                 ll_intent_release(&it);
989                 sa_fini_data(item);
990                 RETURN(1);
991         }
992
993         rc = sa_getattr(entry->se_sai, dir, item);
994         if (rc < 0) {
995                 entry->se_inode = NULL;
996                 iput(inode);
997                 sa_fini_data(item);
998         }
999
1000         RETURN(rc);
1001 }
1002
1003 /* async stat for file with @name */
1004 static void sa_statahead(struct ll_statahead_info *sai, struct dentry *parent,
1005                          const char *name, int len, const struct lu_fid *fid)
1006 {
1007         struct inode *dir = parent->d_inode;
1008         struct dentry *dentry = NULL;
1009         struct sa_entry *entry;
1010         int rc;
1011
1012         ENTRY;
1013
1014         entry = sa_alloc(parent, sai, sai->sai_index, name, len, fid);
1015         if (IS_ERR(entry))
1016                 RETURN_EXIT;
1017
1018         dentry = d_lookup(parent, &entry->se_qstr);
1019         if (!dentry) {
1020                 rc = sa_lookup(dir, entry);
1021         } else {
1022                 rc = sa_revalidate(dir, entry, dentry);
1023                 if (rc == 1 && agl_should_run(sai, dentry->d_inode))
1024                         ll_agl_add(sai, dentry->d_inode, entry->se_index);
1025         }
1026
1027         if (dentry)
1028                 dput(dentry);
1029
1030         if (rc != 0)
1031                 sa_make_ready(sai, entry, rc);
1032         else
1033                 sai->sai_sent++;
1034
1035         sai->sai_index++;
1036
1037         if (sa_sent_full(sai))
1038                 ll_statahead_flush_nowait(sai);
1039
1040         EXIT;
1041 }
1042
1043 /* async glimpse (agl) thread main function */
1044 static int ll_agl_thread(void *arg)
1045 {
1046         /*
1047          * We already own this reference, so it is safe to take it
1048          * without a lock.
1049          */
1050         struct ll_statahead_info *sai = (struct ll_statahead_info *)arg;
1051         struct dentry *parent = sai->sai_dentry;
1052         struct inode *dir = parent->d_inode;
1053         struct ll_inode_info *plli = ll_i2info(dir);
1054         struct ll_inode_info *clli;
1055
1056         ENTRY;
1057
1058         CDEBUG(D_READA, "agl thread started: sai %p, parent %pd\n",
1059                sai, parent);
1060
1061         while (({set_current_state(TASK_IDLE);
1062                  !kthread_should_stop(); })) {
1063                 spin_lock(&plli->lli_agl_lock);
1064                 clli = list_first_entry_or_null(&sai->sai_agls,
1065                                                 struct ll_inode_info,
1066                                                 lli_agl_list);
1067                 if (clli) {
1068                         __set_current_state(TASK_RUNNING);
1069                         list_del_init(&clli->lli_agl_list);
1070                         spin_unlock(&plli->lli_agl_lock);
1071                         ll_agl_trigger(&clli->lli_vfs_inode, sai);
1072                         cond_resched();
1073                 } else {
1074                         spin_unlock(&plli->lli_agl_lock);
1075                         schedule();
1076                 }
1077         }
1078         __set_current_state(TASK_RUNNING);
1079         RETURN(0);
1080 }
1081
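/* stop the AGL thread and drop the inodes still queued on sai_agls */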
1082 static void ll_stop_agl(struct ll_statahead_info *sai)
1083 {
1084         struct dentry *parent = sai->sai_dentry;
1085         struct ll_inode_info *plli = ll_i2info(parent->d_inode);
1086         struct ll_inode_info *clli;
1087         struct task_struct *agl_task;
1088
1089         spin_lock(&plli->lli_agl_lock);
1090         agl_task = sai->sai_agl_task;
1091         sai->sai_agl_task = NULL;
1092         spin_unlock(&plli->lli_agl_lock);
1093         if (!agl_task)
1094                 return;
1095
1096         CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n",
1097                sai, (unsigned int)agl_task->pid);
1098         kthread_stop(agl_task);
1099
1100         spin_lock(&plli->lli_agl_lock);
1101         while ((clli = list_first_entry_or_null(&sai->sai_agls,
1102                                                 struct ll_inode_info,
1103                                                 lli_agl_list)) != NULL) {
1104                 list_del_init(&clli->lli_agl_list);
1105                 spin_unlock(&plli->lli_agl_lock);
1106                 clli->lli_agl_index = 0;
1107                 iput(&clli->lli_vfs_inode);
1108                 spin_lock(&plli->lli_agl_lock);
1109         }
1110         spin_unlock(&plli->lli_agl_lock);
1111         CDEBUG(D_READA, "agl thread stopped: sai %p, parent %pd\n",
1112                sai, parent);
1113         ll_sai_put(sai);
1114 }
1115
1116 /* start agl thread */
1117 static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
1118 {
1119         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
1120         struct ll_inode_info *plli;
1121         struct task_struct *task;
1122
1123         ENTRY;
1124
1125         CDEBUG(D_READA, "start agl thread: sai %p, parent %pd\n",
1126                sai, parent);
1127
1128         plli = ll_i2info(parent->d_inode);
1129         task = kthread_create_on_node(ll_agl_thread, sai, node, "ll_agl_%d",
1130                                       plli->lli_opendir_pid);
1131         if (IS_ERR(task)) {
1132                 CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));
1133                 RETURN_EXIT;
1134         }
1135         sai->sai_agl_task = task;
1136         atomic_inc(&ll_i2sbi(d_inode(parent))->ll_agl_total);
1137         /* Get an extra reference that the thread holds */
1138         __ll_sai_get(sai);
1139
1140         wake_up_process(task);
1141
1142         EXIT;
1143 }
1144
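/* statahead by directory listing: walk the directory pages and issue an async stat for each entry */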
1145 static int ll_statahead_by_list(struct dentry *parent)
1146 {
1147         struct inode *dir = parent->d_inode;
1148         struct ll_inode_info *lli = ll_i2info(dir);
1149         struct ll_statahead_info *sai = lli->lli_sai;
1150         struct ll_sb_info *sbi = ll_i2sbi(dir);
1151         struct md_op_data *op_data;
1152         struct page *page = NULL;
1153         __u64 pos = 0;
1154         int first = 0;
1155         int rc = 0;
1156
1157         ENTRY;
1158
1159         CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
1160                sai, parent);
1161
1162         OBD_ALLOC_PTR(op_data);
1163         if (!op_data)
1164                 RETURN(-ENOMEM);
1165
1166         /* matches smp_store_release() in ll_deauthorize_statahead() */
1167         while (pos != MDS_DIR_END_OFF && smp_load_acquire(&sai->sai_task)) {
1168                 struct lu_dirpage *dp;
1169                 struct lu_dirent  *ent;
1170
1171                 op_data = ll_prep_md_op_data(op_data, dir, dir, NULL, 0, 0,
1172                                              LUSTRE_OPC_ANY, dir);
1173                 if (IS_ERR(op_data)) {
1174                         rc = PTR_ERR(op_data);
1175                         break;
1176                 }
1177
1178                 page = ll_get_dir_page(dir, op_data, pos, NULL);
1179                 ll_unlock_md_op_lsm(op_data);
1180                 if (IS_ERR(page)) {
1181                         rc = PTR_ERR(page);
1182                         CDEBUG(D_READA,
1183                                "error reading dir "DFID" at %llu /%llu opendir_pid = %u: rc = %d\n",
1184                                PFID(ll_inode2fid(dir)), pos, sai->sai_index,
1185                                lli->lli_opendir_pid, rc);
1186                         break;
1187                 }
1188
1189                 dp = page_address(page);
1190                 for (ent = lu_dirent_start(dp);
1191                      /* matches smp_store_release() in ll_deauthorize_statahead() */
1192                      ent != NULL && smp_load_acquire(&sai->sai_task) &&
1193                      !sa_low_hit(sai);
1194                      ent = lu_dirent_next(ent)) {
1195                         __u64 hash;
1196                         int namelen;
1197                         char *name;
1198                         struct lu_fid fid;
1199                         struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
1200
1201                         hash = le64_to_cpu(ent->lde_hash);
1202                         if (unlikely(hash < pos))
1203                                 /*
1204                                  * Skip until we find target hash value.
1205                                  */
1206                                 continue;
1207
1208                         namelen = le16_to_cpu(ent->lde_namelen);
1209                         if (unlikely(namelen == 0))
1210                                 /*
1211                                  * Skip dummy record.
1212                                  */
1213                                 continue;
1214
1215                         name = ent->lde_name;
1216                         if (name[0] == '.') {
1217                                 if (namelen == 1) {
1218                                         /*
1219                                          * skip "."
1220                                          */
1221                                         continue;
1222                                 } else if (name[1] == '.' && namelen == 2) {
1223                                         /*
1224                                          * skip ".."
1225                                          */
1226                                         continue;
1227                                 } else if (!sai->sai_ls_all) {
1228                                         /*
1229                                          * skip hidden files.
1230                                          */
1231                                         sai->sai_skip_hidden++;
1232                                         continue;
1233                                 }
1234                         }
1235
1236                         /*
1237                          * don't stat-ahead first entry.
1238                          */
1239                         if (unlikely(++first == 1))
1240                                 continue;
1241
1242                         fid_le_to_cpu(&fid, &ent->lde_fid);
1243
1244                         while (({set_current_state(TASK_IDLE);
1245                                  /* matches smp_store_release() in
1246                                   * ll_deauthorize_statahead() */
1247                                  smp_load_acquire(&sai->sai_task); })) {
1248                                 spin_lock(&lli->lli_agl_lock);
1249                                 while (sa_sent_full(sai) &&
1250                                        !agl_list_empty(sai)) {
1251                                         struct ll_inode_info *clli;
1252
1253                                         __set_current_state(TASK_RUNNING);
1254                                         clli = agl_first_entry(sai);
1255                                         list_del_init(&clli->lli_agl_list);
1256                                         spin_unlock(&lli->lli_agl_lock);
1257
1258                                         ll_agl_trigger(&clli->lli_vfs_inode,
1259                                                        sai);
1260                                         cond_resched();
1261                                         spin_lock(&lli->lli_agl_lock);
1262                                 }
1263                                 spin_unlock(&lli->lli_agl_lock);
1264
1265                                 if (!sa_sent_full(sai))
1266                                         break;
1267                                 schedule();
1268                         }
1269                         __set_current_state(TASK_RUNNING);
1270
1271                         if (IS_ENCRYPTED(dir)) {
1272                                 struct llcrypt_str de_name =
1273                                         LLTR_INIT(ent->lde_name, namelen);
1274                                 struct lu_fid fid;
1275
1276                                 rc = llcrypt_fname_alloc_buffer(dir, NAME_MAX,
1277                                                                 &lltr);
1278                                 if (rc < 0)
1279                                         continue;
1280
1281                                 fid_le_to_cpu(&fid, &ent->lde_fid);
1282                                 if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
1283                                                          &lltr, &fid)) {
1284                                         llcrypt_fname_free_buffer(&lltr);
1285                                         continue;
1286                                 }
1287
1288                                 name = lltr.name;
1289                                 namelen = lltr.len;
1290                         }
1291
1292                         sa_statahead(sai, parent, name, namelen, &fid);
1293                         llcrypt_fname_free_buffer(&lltr);
1294                 }
1295
1296                 pos = le64_to_cpu(dp->ldp_hash_end);
1297                 ll_release_page(dir, page,
1298                                 le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
1299
1300                 if (sa_low_hit(sai)) {
1301                         rc = -EFAULT;
1302                         atomic_inc(&sbi->ll_sa_wrong);
1303                         CDEBUG(D_READA,
1304                                "Statahead for dir "DFID" hit ratio too low: hit/miss %llu/%llu, sent/replied %llu/%llu, stopping statahead thread: pid %d\n",
1305                                PFID(&lli->lli_fid), sai->sai_hit,
1306                                sai->sai_miss, sai->sai_sent,
1307                                sai->sai_replied, current->pid);
1308                         break;
1309                 }
1310         }
1311         ll_finish_md_op_data(op_data);
1312
1313         RETURN(rc);
1314 }
1315
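/*
 * statahead by file name pattern: stat names built from the sai_fname prefix
 * plus an increasing numeric suffix, from sai_fstart up to sai_fend
 */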
1316 static int ll_statahead_by_fname(struct ll_statahead_info *sai,
1317                                  struct dentry *parent)
1318 {
1319         struct inode *dir = parent->d_inode;
1320         struct ll_inode_info *lli = ll_i2info(dir);
1321         struct ll_sb_info *sbi = ll_i2sbi(dir);
1322         size_t max_len;
1323         size_t len;
1324         char *fname;
1325         char *ptr;
1326         int rc = 0;
1327         __u64 i = 0;
1328
1329         ENTRY;
1330
1331         CDEBUG(D_READA, "%s: FNAME statahead: parent %pd fname prefix %s\n",
1332                sbi->ll_fsname, parent, sai->sai_fname);
1333
1334         OBD_ALLOC(fname, NAME_MAX);
1335         if (fname == NULL)
1336                 RETURN(-ENOMEM);
1337
1338         len = strlen(sai->sai_fname);
1339         memcpy(fname, sai->sai_fname, len);
1340         max_len = sizeof(sai->sai_fname) - len;
1341         ptr = fname + len;
1342
1343         /* matches smp_store_release() in ll_deauthorize_statahead() */
1344         while (smp_load_acquire(&sai->sai_task)) {
1345                 size_t numlen;
1346
1347                 numlen = snprintf(ptr, max_len, "%llu",
1348                                   sai->sai_fstart + i);
1349
1350                 while (({set_current_state(TASK_IDLE);
1351                          /*
1352                           * matches smp_store_release() in
1353                           * ll_deauthorize_statahead()
1354                           */
1355                          smp_load_acquire(&sai->sai_task); })) {
1356                         spin_lock(&lli->lli_agl_lock);
1357                         while (sa_sent_full(sai) && !agl_list_empty(sai)) {
1358                                 struct ll_inode_info *clli;
1359
1360                                 __set_current_state(TASK_RUNNING);
1361                                 clli = agl_first_entry(sai);
1362                                 list_del_init(&clli->lli_agl_list);
1363                                 spin_unlock(&lli->lli_agl_lock);
1364
1365                                 ll_agl_trigger(&clli->lli_vfs_inode, sai);
1366                                 cond_resched();
1367                                 spin_lock(&lli->lli_agl_lock);
1368                         }
1369                         spin_unlock(&lli->lli_agl_lock);
1370
1371                         if (!sa_sent_full(sai))
1372                                 break;
1373                         schedule();
1374                 }
1375                 __set_current_state(TASK_RUNNING);
1376
1377                 sa_statahead(sai, parent, fname, len + numlen, NULL);
1378                 if (++i >= sai->sai_fend)
1379                         break;
1380         }
1381
1382         OBD_FREE(fname, NAME_MAX);
1383         RETURN(rc);
1384 }
1385
1386 /* statahead thread main function */
1387 static int ll_statahead_thread(void *arg)
1388 {
1389         struct ll_statahead_info *sai = (struct ll_statahead_info *)arg;
1390         struct dentry *parent = sai->sai_dentry;
1391         struct inode *dir = parent->d_inode;
1392         struct ll_inode_info *lli = ll_i2info(dir);
1393         struct ll_sb_info *sbi = ll_i2sbi(dir);
1394         struct lu_batch *bh = NULL;
1395         int rc = 0;
1396
1397         ENTRY;
1398
1399         CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
1400                sai, parent);
1401
1402         sai->sai_max_batch_count = sbi->ll_sa_batch_max;
1403         if (sai->sai_max_batch_count) {
1404                 bh = md_batch_create(ll_i2mdexp(dir), BATCH_FL_RDONLY,
1405                                      sai->sai_max_batch_count);
1406                 if (IS_ERR(bh))
1407                         GOTO(out_stop_agl, rc = PTR_ERR(bh));
1408         }
1409
1410         sai->sai_bh = bh;
1411
1412         switch (lli->lli_sa_pattern) {
1413         case LSA_PATTERN_LIST:
1414                 rc = ll_statahead_by_list(parent);
1415                 break;
1416         case LSA_PATTERN_FNAME:
1417                 rc = ll_statahead_by_fname(sai, parent);
1418                 break;
1419         default:
1420                 rc = -EFAULT;
1421                 break;
1422         }
1423
1424         if (rc < 0) {
1425                 spin_lock(&lli->lli_sa_lock);
1426                 sai->sai_task = NULL;
1427                 lli->lli_sa_enabled = 0;
1428                 spin_unlock(&lli->lli_sa_lock);
1429         }
1430
1431         ll_statahead_flush_nowait(sai);
1432
1433         /*
1434          * statahead is finished, but the statahead entries still need to be
1435          * cached; wait for the closedir() call at file release to stop us.
1436          */
1437         while (({set_current_state(TASK_IDLE);
1438                 /* matches smp_store_release() in ll_deauthorize_statahead() */
1439                 smp_load_acquire(&sai->sai_task); })) {
1440                 schedule();
1441         }
1442         __set_current_state(TASK_RUNNING);
1443
1444         EXIT;
1445
1446         if (bh) {
1447                 rc = md_batch_stop(ll_i2mdexp(dir), sai->sai_bh);
1448                 sai->sai_bh = NULL;
1449         }
1450
1451 out_stop_agl:
1452         ll_stop_agl(sai);
1453
1454         /*
1455          * wait for inflight statahead RPCs to finish, then sai can be freed
1456          * safely because the statahead RPCs access sai data
1457          */
1458         while (sai->sai_sent != sai->sai_replied)
1459                 /* in case we're not woken up, timeout wait */
1460                 msleep(125);
1461
1462         CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd hit %llu miss %llu\n",
1463                sbi->ll_fsname, sai, parent, sai->sai_hit, sai->sai_miss);
1464
1465         spin_lock(&lli->lli_sa_lock);
1466         sai->sai_task = NULL;
1467         spin_unlock(&lli->lli_sa_lock);
1468         wake_up(&sai->sai_waitq);
1469
1470         atomic_add(sai->sai_hit, &sbi->ll_sa_hit_total);
1471         atomic_add(sai->sai_miss, &sbi->ll_sa_miss_total);
1472
1473         ll_sai_put(sai);
1474         ll_sax_put(dir, lli->lli_sax);
1475
1476         return rc;
1477 }
1478
1479 /* authorize opened dir handle @key to statahead */
1480 void ll_authorize_statahead(struct inode *dir, void *key)
1481 {
1482         struct ll_inode_info *lli = ll_i2info(dir);
1483
1484         spin_lock(&lli->lli_sa_lock);
1485         if (!lli->lli_opendir_key && !lli->lli_sai) {
1486                 /*
1487                  * if lli_sai is not NULL, the previous statahead has not
1488                  * finished yet; we'd better not start a new statahead for now.
1489                  */
1490                 lli->lli_opendir_key = key;
1491                 lli->lli_opendir_pid = current->pid;
1492                 lli->lli_sa_enabled = 1;
1493         }
1494         spin_unlock(&lli->lli_sa_lock);
1495 }
1496
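/*
 * Deauthorize fname-pattern statahead for @dir: @key is the ll_file_data of
 * the opened directory. Wake up the statahead thread if it is still running,
 * then drop the references held on fd->fd_sai and the statahead context.
 */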
1497 static void ll_deauthorize_statahead_fname(struct inode *dir, void *key)
1498 {
1499         struct ll_inode_info *lli = ll_i2info(dir);
1500         struct ll_file_data *fd = (struct ll_file_data *)key;
1501         struct ll_statahead_info *sai = fd->fd_sai;
1502
1503         if (sai == NULL)
1504                 return;
1505
1506         spin_lock(&lli->lli_sa_lock);
1507         if (sai->sai_task) {
1508                 struct task_struct *task = sai->sai_task;
1509
1510                 sai->sai_task = NULL;
1511                 wake_up_process(task);
1512         }
1513         fd->fd_sai = NULL;
1514         spin_unlock(&lli->lli_sa_lock);
1515         ll_sai_put(sai);
1516         LASSERT(lli->lli_sax != NULL);
1517         ll_sax_put(dir, lli->lli_sax);
1518 }
1519
1520 /*
1521  * deauthorize the opened dir handle @key for statahead, and notify the
1522  * statahead thread to quit if it's running.
1523  */
1524 void ll_deauthorize_statahead(struct inode *dir, void *key)
1525 {
1526         struct ll_inode_info *lli = ll_i2info(dir);
1527         struct ll_statahead_info *sai;
1528
1529         LASSERT(lli->lli_opendir_pid != 0);
1530
1531         CDEBUG(D_READA, "deauthorize statahead for "DFID"\n",
1532                PFID(&lli->lli_fid));
1533
1534         if (lli->lli_sa_pattern == LSA_PATTERN_FNAME) {
1535                 ll_deauthorize_statahead_fname(dir, key);
1536                 return;
1537         }
1538
1539         LASSERT(lli->lli_opendir_key == key);
1540         spin_lock(&lli->lli_sa_lock);
1541         lli->lli_opendir_key = NULL;
1542         lli->lli_opendir_pid = 0;
1543         lli->lli_sa_enabled = 0;
1544         sai = lli->lli_sai;
1545         if (sai && sai->sai_task) {
1546                 /*
1547                  * the statahead thread may not have quit yet because it needs
1548                  * to cache entries; now it's time to tell it to quit.
1549                  *
1550                  * wake_up_process() provides the necessary barriers
1551                  * to pair with set_current_state().
1552                  */
1553                 struct task_struct *task = sai->sai_task;
1554
1555                 /* matches smp_load_acquire() in ll_statahead_thread() */
1556                 smp_store_release(&sai->sai_task, NULL);
1557                 wake_up_process(task);
1558         }
1559         spin_unlock(&lli->lli_sa_lock);
1560 }
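/*
 * A minimal sketch of the stop handshake used above, between the sleeper in
 * ll_statahead_thread() and the waker in ll_deauthorize_statahead():
 *
 *   statahead thread (sleeper)                closedir path (waker)
 *   --------------------------                ---------------------
 *   set_current_state(TASK_IDLE);             task = sai->sai_task;
 *   if (smp_load_acquire(&sai->sai_task))     smp_store_release(&sai->sai_task,
 *           schedule();                                         NULL);
 *   __set_current_state(TASK_RUNNING);        wake_up_process(task);
 *
 * The release/acquire pair guarantees the sleeper observes the NULL store,
 * and wake_up_process() provides the barrier that pairs with
 * set_current_state(), so a wakeup between the state change and schedule()
 * cannot be lost.
 */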
1561
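/*
 * List-pattern statahead is triggered heuristically: start_statahead_thread()
 * below only launches the thread when the entry being looked up is the first
 * dirent returned by readdir of the parent directory, i.e. the access pattern
 * looks like "ls -l" (or "ls -al" when the first dirent is a dot file).
 */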
1562 enum {
1563         /**
1564          * not first dirent, or is "."
1565          */
1566         LS_NOT_FIRST_DE = 0,
1567         /**
1568          * the first non-hidden dirent
1569          */
1570         LS_FIRST_DE,
1571         /**
1572          * the first hidden dirent, that is, a dot file
1573          */
1574         LS_FIRST_DOT_DE
1575 };
1576
1577 /* check whether the file is the first dirent under @dir */
1578 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
1579 {
1580         struct qstr *target = &dentry->d_name;
1581         struct md_op_data *op_data;
1582         int dot_de;
1583         struct page *page = NULL;
1584         int rc = LS_NOT_FIRST_DE;
1585         __u64 pos = 0;
1586         struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
1587
1588         ENTRY;
1589
1590         op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
1591                                      LUSTRE_OPC_ANY, dir);
1592         if (IS_ERR(op_data))
1593                 RETURN(PTR_ERR(op_data));
1594
1595         if (IS_ENCRYPTED(dir)) {
1596                 int rc2 = llcrypt_fname_alloc_buffer(dir, NAME_MAX, &lltr);
1597
1598                 if (rc2 < 0)
1599                         GOTO(out, rc = rc2);
1600         }
1601
1602         /*
1603          * FIXME: choose the start offset of the readdir
1604          */
1605
1606         page = ll_get_dir_page(dir, op_data, 0, NULL);
1607
1608         while (1) {
1609                 struct lu_dirpage *dp;
1610                 struct lu_dirent  *ent;
1611
1612                 if (IS_ERR(page)) {
1613                         struct ll_inode_info *lli = ll_i2info(dir);
1614
1615                         rc = PTR_ERR(page);
1616                         CERROR("%s: reading dir "DFID" at %llu opendir_pid = %u : rc = %d\n",
1617                                ll_i2sbi(dir)->ll_fsname,
1618                                PFID(ll_inode2fid(dir)), pos,
1619                                lli->lli_opendir_pid, rc);
1620                         break;
1621                 }
1622
1623                 dp = page_address(page);
1624                 for (ent = lu_dirent_start(dp); ent != NULL;
1625                      ent = lu_dirent_next(ent)) {
1626                         __u64 hash;
1627                         int namelen;
1628                         char *name;
1629
1630                         hash = le64_to_cpu(ent->lde_hash);
1631                         /*
1632                          * ll_get_dir_page() can return any page containing
1633                          * the given hash, which may not be the start hash.
1634                          */
1635                         if (unlikely(hash < pos))
1636                                 continue;
1637
1638                         namelen = le16_to_cpu(ent->lde_namelen);
1639                         if (unlikely(namelen == 0))
1640                                 /*
1641                                  * skip dummy record.
1642                                  */
1643                                 continue;
1644
1645                         name = ent->lde_name;
1646                         if (name[0] == '.') {
1647                                 if (namelen == 1)
1648                                         /*
1649                                          * skip "."
1650                                          */
1651                                         continue;
1652                                 else if (name[1] == '.' && namelen == 2)
1653                                         /*
1654                                          * skip ".."
1655                                          */
1656                                         continue;
1657                                 else
1658                                         dot_de = 1;
1659                         } else {
1660                                 dot_de = 0;
1661                         }
1662
1663                         if (dot_de && target->name[0] != '.') {
1664                                 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
1665                                        target->len, target->name,
1666                                        namelen, name);
1667                                 continue;
1668                         }
1669
1670                         if (IS_ENCRYPTED(dir)) {
1671                                 struct llcrypt_str de_name =
1672                                         LLTR_INIT(ent->lde_name, namelen);
1673                                 struct lu_fid fid;
1674
1675                                 fid_le_to_cpu(&fid, &ent->lde_fid);
1676                                 if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
1677                                                          &lltr, &fid))
1678                                         continue;
1679                                 name = lltr.name;
1680                                 namelen = lltr.len;
1681                         }
1682
1683                         if (target->len != namelen ||
1684                             memcmp(target->name, name, namelen) != 0)
1685                                 rc = LS_NOT_FIRST_DE;
1686                         else if (!dot_de)
1687                                 rc = LS_FIRST_DE;
1688                         else
1689                                 rc = LS_FIRST_DOT_DE;
1690
1691                         ll_release_page(dir, page, false);
1692                         GOTO(out, rc);
1693                 }
1694                 pos = le64_to_cpu(dp->ldp_hash_end);
1695                 if (pos == MDS_DIR_END_OFF) {
1696                         /*
1697                          * End of directory reached.
1698                          */
1699                         ll_release_page(dir, page, false);
1700                         GOTO(out, rc);
1701                 } else {
1702                         /*
1703                          * The hash chain in this page is exhausted.
1704                          * Normal case: continue to the next page.
1705                          */
1706                         ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) &
1707                                               LDF_COLLIDE);
1708                         page = ll_get_dir_page(dir, op_data, pos, NULL);
1709                 }
1710         }
1711         EXIT;
1712 out:
1713         llcrypt_fname_free_buffer(&lltr);
1714         ll_finish_md_op_data(op_data);
1715
1716         return rc;
1717 }
1718
1719 /**
1720  * revalidate @dentryp from statahead cache
1721  *
1722  * \param[in] dir       parent directory
1723  * \param[in] ctx       statahead context structure
1724  * \param[out] dentryp  pointer to dentry which will be revalidated
1725  * \param[in] unplug    unplug statahead window only (normally for negative
1726  *                      dentry)
1727  * \retval              1 on success, dentry is saved in @dentryp
1728  * \retval              0 if revalidation failed (no proper lock on client)
1729  * \retval              negative number upon error
1730  */
1731 static int revalidate_statahead_dentry(struct inode *dir,
1732                                        struct ll_statahead_context *ctx,
1733                                        struct dentry **dentryp,
1734                                        bool unplug)
1735 {
1736         struct sa_entry *entry = NULL;
1737         struct ll_inode_info *lli = ll_i2info(dir);
1738         struct ll_statahead_info *sai = lli->lli_sai;
1739         int rc = 0;
1740
1741         ENTRY;
1742
1743         if (sai && (*dentryp)->d_name.name[0] == '.') {
1744                 if (sai->sai_ls_all ||
1745                     sai->sai_miss_hidden >= sai->sai_skip_hidden) {
1746                         /*
1747                          * The hidden dentry is the first one, or the
1748                          * statahead thread did not skip this many hidden
1749                          * dentries before "sai_ls_all" was enabled as below.
1750                          */
1751                 } else {
1752                         if (!sai->sai_ls_all)
1753                                 /*
1754                                  * Maybe the hidden dentry is not
1755                                  * the first one and "sai_ls_all" was
1756                                  * not set, so "ls -al" missed it.
1757                                  * Enable "sai_ls_all" for such a case.
1758                                  */
1759                                 sai->sai_ls_all = 1;
1760
1761                         /*
1762                          * Such a "getattr" was skipped before
1763                          * "sai_ls_all" was enabled as above.
1764                          */
1765                         sai->sai_miss_hidden++;
1766                         RETURN(-EAGAIN);
1767                 }
1768         }
1769
1770         if (unplug)
1771                 GOTO(out, rc = 1);
1772
1773         entry = sa_get(ctx, &(*dentryp)->d_name);
1774         if (!entry)
1775                 GOTO(out, rc = -EAGAIN);
1776
1777         if (lli->lli_sa_pattern == LSA_PATTERN_LIST)
1778                 LASSERT(sai == entry->se_sai);
1779         else if (lli->lli_sa_pattern == LSA_PATTERN_FNAME)
1780                 sai = entry->se_sai;
1781
1782         LASSERT(sai != NULL);
1783         if (!sa_ready(entry)) {
1784                 spin_lock(&lli->lli_sa_lock);
1785                 sai->sai_index_wait = entry->se_index;
1786                 spin_unlock(&lli->lli_sa_lock);
1787                 rc = wait_event_idle_timeout(sai->sai_waitq, sa_ready(entry),
1788                                              cfs_time_seconds(30));
1789                 if (rc == 0) {
1790                         /*
1791                          * the entry may not be ready and may still be used by an
1792                          * inflight statahead RPC, so don't free it.
1793                          */
1794                         entry = NULL;
1795                         GOTO(out, rc = -EAGAIN);
1796                 }
1797         }
1798
1799         /*
1800          * We need to see the value that was set immediately before we
1801          * were woken up.
1802          */
1803         if (smp_load_acquire(&entry->se_state) == SA_ENTRY_SUCC &&
1804             entry->se_inode) {
1805                 struct inode *inode = entry->se_inode;
1806                 struct lookup_intent it = { .it_op = IT_GETATTR,
1807                                             .it_lock_handle =
1808                                                 entry->se_handle };
1809                 __u64 bits;
1810
1811                 rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
1812                                         ll_inode2fid(inode), &bits);
1813                 if (rc == 1) {
1814                         if (!(*dentryp)->d_inode) {
1815                                 struct dentry *alias;
1816
1817                                 alias = ll_splice_alias(inode, *dentryp);
1818                                 if (IS_ERR(alias)) {
1819                                         ll_intent_release(&it);
1820                                         GOTO(out, rc = PTR_ERR(alias));
1821                                 }
1822                                 *dentryp = alias;
1823                                 /*
1824                                  * statahead prepared this inode, transfer inode
1825                                  * refcount from sa_entry to dentry
1826                                  */
1827                                 entry->se_inode = NULL;
1828                         } else if ((*dentryp)->d_inode != inode) {
1829                                 /* revalidate, but inode is recreated */
1830                                 CDEBUG(D_READA,
1831                                        "%s: stale dentry %pd inode " DFID", statahead inode "DFID "\n",
1832                                        ll_i2sbi(inode)->ll_fsname, *dentryp,
1833                                        PFID(ll_inode2fid((*dentryp)->d_inode)),
1834                                        PFID(ll_inode2fid(inode)));
1835                                 ll_intent_release(&it);
1836                                 GOTO(out, rc = -ESTALE);
1837                         }
1838
1839                         if (bits & MDS_INODELOCK_LOOKUP) {
1840                                 d_lustre_revalidate(*dentryp);
1841                                 if (S_ISDIR(inode->i_mode))
1842                                         ll_update_dir_depth_dmv(dir, *dentryp);
1843                         }
1844
1845                         ll_intent_release(&it);
1846                 }
1847         }
1848 out:
1849         /*
1850          * A statahead-cached sa_entry can be used only once and is killed
1851          * right after use, so if lookup/revalidate accessed the statahead
1852          * cache, set the dentry's ldd_sa_generation to the parent's
1853          * lli_sa_generation. If we stat this file again later, we then know
1854          * statahead has already been done, see dentry_may_statahead().
1855          */
1856         if (lld_is_init(*dentryp))
1857                 ll_d2d(*dentryp)->lld_sa_generation = lli->lli_sa_generation;
1858         sa_put(dir, sai, entry);
1859
1860         RETURN(rc);
1861 }
1862
1863 /**
1864  * start statahead thread
1865  *
1866  * \param[in] dir       parent directory
1867  * \param[in] dentry    dentry that triggers statahead, normally the first
1868  *                      dirent under @dir
1869  * \param[in] agl       indicate whether AGL is needed
1870  * \retval              -EAGAIN on success, because when this function is
1871  *                      called we are already in the lookup path, so the
1872  *                      client should do the stat itself instead of waiting
1873  *                      for the statahead thread to do it asynchronously.
1874  * \retval              negative number upon error
1875  */
1876 static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
1877                                   bool agl)
1878 {
1879         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
1880         struct ll_inode_info *lli = ll_i2info(dir);
1881         struct ll_statahead_info *sai = NULL;
1882         struct ll_statahead_context *ctx = NULL;
1883         struct dentry *parent = dentry->d_parent;
1884         struct task_struct *task;
1885         struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode);
1886         int first = LS_FIRST_DE;
1887         int rc = 0;
1888
1889         ENTRY;
1890
1891         /* I am the "lli_opendir_pid" owner, so only I can set "lli_sai". */
1892         first = is_first_dirent(dir, dentry);
1893         if (first == LS_NOT_FIRST_DE)
1894                 /* It is not an "ls -{a}l" operation, so statahead is not needed. */
1895                 GOTO(out, rc = -EFAULT);
1896
1897         if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
1898                                        sbi->ll_sa_running_max)) {
1899                 CDEBUG(D_READA,
1900                        "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n");
1901                 GOTO(out, rc = -EMFILE);
1902         }
1903
1904         sai = ll_sai_alloc(parent);
1905         if (!sai)
1906                 GOTO(out, rc = -ENOMEM);
1907
1908         ctx = ll_sax_alloc(dir);
1909         if (!ctx)
1910                 GOTO(out, rc = -ENOMEM);
1911
1912         sai->sai_ls_all = (first == LS_FIRST_DOT_DE);
1913
1914         /*
1915          * if current lli_opendir_key was deauthorized, or dir re-opened by
1916          * another process, don't start statahead, otherwise the newly spawned
1917          * statahead thread won't be notified to quit.
1918          */
1919         spin_lock(&lli->lli_sa_lock);
1920         if (unlikely(lli->lli_sai || !lli->lli_opendir_key ||
1921                      lli->lli_opendir_pid != current->pid ||
1922                      lli->lli_sa_pattern != LSA_PATTERN_NONE)) {
1923                 spin_unlock(&lli->lli_sa_lock);
1924                 GOTO(out, rc = -EPERM);
1925         }
1926         lli->lli_sai = sai;
1927         lli->lli_sax = ctx;
1928         lli->lli_sa_pattern = LSA_PATTERN_LIST;
1929         spin_unlock(&lli->lli_sa_lock);
1930
1931         CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n",
1932                current->pid, parent);
1933
1934         task = kthread_create_on_node(ll_statahead_thread, sai, node,
1935                                       "ll_sa_%u", lli->lli_opendir_pid);
1936         if (IS_ERR(task)) {
1937                 spin_lock(&lli->lli_sa_lock);
1938                 lli->lli_sai = NULL;
1939                 spin_unlock(&lli->lli_sa_lock);
1940                 rc = PTR_ERR(task);
1941                 CERROR("can't start ll_sa thread, rc: %d\n", rc);
1942                 GOTO(out, rc);
1943         }
1944
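        /* start the asynchronous glimpse lock (AGL) thread if enabled */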
1945         if (test_bit(LL_SBI_AGL_ENABLED, sbi->ll_flags) && agl)
1946                 ll_start_agl(parent, sai);
1947
1948         atomic_inc(&sbi->ll_sa_total);
1949         sai->sai_task = task;
1950
1951         wake_up_process(task);
1952         /*
1953          * We don't stat-ahead for the first dirent since we are already in
1954          * lookup.
1955          */
1956         RETURN(-EAGAIN);
1957
1958 out:
1959         /*
1960          * once starting the statahead thread has failed, disable statahead so
1961          * that subsequent stats won't waste time trying it.
1962          */
1963         spin_lock(&lli->lli_sa_lock);
1964         if (lli->lli_opendir_pid == current->pid)
1965                 lli->lli_sa_enabled = 0;
1966         spin_unlock(&lli->lli_sa_lock);
1967
1968         if (sai)
1969                 ll_sai_free(sai);
1970
1971         if (ctx)
1972                 ll_sax_free(ctx);
1973
1974         if (first != LS_NOT_FIRST_DE)
1975                 atomic_dec(&sbi->ll_sa_running);
1976
1977         RETURN(rc);
1978 }
1979
1980 /*
1981  * Check whether statahead for @dir was started.
1982  */
1983 static inline bool ll_statahead_started(struct inode *dir, bool agl)
1984 {
1985         struct ll_inode_info *lli = ll_i2info(dir);
1986         struct ll_statahead_context *ctx;
1987         struct ll_statahead_info *sai;
1988
1989         spin_lock(&lli->lli_sa_lock);
1990         ctx = lli->lli_sax;
1991         sai = lli->lli_sai;
1992         if (sai && (sai->sai_agl_task != NULL) != agl)
1993                 CDEBUG(D_READA,
1994                        "%s: Statahead AGL hint changed from %d to %d\n",
1995                        ll_i2sbi(dir)->ll_fsname,
1996                        sai->sai_agl_task != NULL, agl);
1997         spin_unlock(&lli->lli_sa_lock);
1998
1999         return !!ctx;
2000 }
2001
2002 /**
2003  * statahead entry function; this is called when the client does getattr on a
2004  * file. It starts the statahead thread for the parent directory if statahead
2005  * has not been started yet (see ll_revalidate_statahead() for cache lookup).
2006  *
2007  * \param[in]  dir      parent directory
2008  * \param[in]  dentry   dentry to getattr
2009  * \param[in]  agl      whether to start the AGL thread
2010  *
2011  * \retval              0 statahead already started, caller should revalidate
2012  *                      the dentry from the statahead cache
2013  * \retval              -EAGAIN statahead thread started; caller should do this
2014  *                      getattr from the server itself
2015  * \retval              negative number on other errors, caller often ignores it
2016  */
2017 int ll_start_statahead(struct inode *dir, struct dentry *dentry, bool agl)
2018 {
2019         if (!ll_statahead_started(dir, agl))
2020                 return start_statahead_thread(dir, dentry, agl);
2021         return 0;
2022 }
2023
2024 /**
2025  * revalidate dentry from statahead cache.
2026  *
2027  * \param[in]  dir      parent directory
2028  * \param[out] dentryp  dentry to getattr
2029  * \param[in]  unplug   unplug statahead window only (normally for negative
2030  *                      dentry)
2031  * \retval              1 on success
2032  * \retval              0 revalidation from statahead cache failed, caller needs
2033  *                      to getattr from server directly
2034  * \retval              negative number on error, caller often ignores this and
2035  *                      then getattr from server
2036  */
2037 int ll_revalidate_statahead(struct inode *dir, struct dentry **dentryp,
2038                             bool unplug)
2039 {
2040         struct ll_inode_info *lli = ll_i2info(dir);
2041         struct ll_statahead_context *ctx;
2042         struct ll_statahead_info *sai = NULL;
2043         int rc = 0;
2044
2045         spin_lock(&lli->lli_sa_lock);
2046         ctx = lli->lli_sax;
2047         if (ctx) {
2048                 sai = lli->lli_sai;
2049                 if (sai) {
2050                         atomic_inc(&sai->sai_refcount);
2051                 } else if (lli->lli_sa_pattern & LSA_PATTERN_LIST) {
2052                         spin_unlock(&lli->lli_sa_lock);
2053                         return 0;
2054                 }
2055                 __ll_sax_get(ctx);
2056         }
2057         spin_unlock(&lli->lli_sa_lock);
2058         if (ctx) {
2059                 rc = revalidate_statahead_dentry(dir, ctx, dentryp, unplug);
2060                 CDEBUG(D_READA, "revalidate statahead %pd: rc = %d.\n",
2061                        *dentryp, rc);
2062                 if (sai)
2063                         ll_sai_put(sai);
2064                 ll_sax_put(dir, ctx);
2065         }
2066         return rc;
2067 }
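/*
 * Illustrative sketch (not copied from the llite call sites) of how a
 * lookup/revalidate caller combines the two entry points above:
 *
 *      rc = ll_revalidate_statahead(dir, &dentry, unplug);
 *      if (rc == 1)
 *              return ...;             // revalidated from statahead cache
 *      // otherwise fall back to a regular getattr RPC; ll_start_statahead()
 *      // may be called to kick off the statahead thread for later entries.
 */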
2068
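/*
 * Start statahead from an explicit userspace hint (struct llapi_lu_ladvise2):
 * set up fname-pattern (LSA_PATTERN_FNAME) statahead for the directory opened
 * as @file, copying lla_fname, lla_start and lla_end from the hint into
 * sai_fname, sai_fstart and sai_fend, then spawn the ll_sa thread.
 */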
2069 int ll_ioctl_ahead(struct file *file, struct llapi_lu_ladvise2 *ladvise)
2070 {
2071         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
2072         struct ll_file_data *fd = file->private_data;
2073         struct dentry *dentry = file_dentry(file);
2074         struct inode *dir = dentry->d_inode;
2075         struct ll_inode_info *lli = ll_i2info(dir);
2076         struct ll_sb_info *sbi = ll_i2sbi(dir);
2077         struct ll_statahead_info *sai = NULL;
2078         struct ll_statahead_context *ctx = NULL;
2079         struct task_struct *task;
2080         bool agl = true;
2081         int rc;
2082
2083         ENTRY;
2084
2085         if (sbi->ll_sa_max == 0)
2086                 RETURN(0);
2087
2088         if (!S_ISDIR(dir->i_mode))
2089                 RETURN(-EINVAL);
2090
2091         if (fd->fd_sai) {
2092                 rc = -EALREADY;
2093                 CWARN("%s: already set statahead hint for dir %pd: rc = %d\n",
2094                       sbi->ll_fsname, dentry, rc);
2095                 RETURN(rc);
2096         }
2097
2098         if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
2099                                        sbi->ll_sa_running_max)) {
2100                 CDEBUG(D_READA,
2101                        "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n");
2102                 GOTO(out, rc = -EMFILE);
2103         }
2104
2105         sai = ll_sai_alloc(dentry);
2106         if (sai == NULL)
2107                 GOTO(out, rc = -ENOMEM);
2108
2109         sai->sai_fstart = ladvise->lla_start;
2110         sai->sai_fend = ladvise->lla_end;
2111         sai->sai_ls_all = 0;
2112         sai->sai_max = sbi->ll_sa_max;
2113         strncpy(sai->sai_fname, ladvise->lla_fname, sizeof(sai->sai_fname));
2114
2115         ctx = ll_sax_get(dir);
2116         if (ctx == NULL) {
2117                 ctx = ll_sax_alloc(dir);
2118                 if (ctx == NULL)
2119                         GOTO(out, rc = -ENOMEM);
2120
2121                 spin_lock(&lli->lli_sa_lock);
2122                 if (unlikely(lli->lli_sax)) {
2123                         struct ll_statahead_context *tmp = ctx;
2124
2125                         if (lli->lli_sa_pattern == LSA_PATTERN_NONE ||
2126                             lli->lli_sa_pattern == LSA_PATTERN_FNAME) {
2127                                 lli->lli_sa_pattern = LSA_PATTERN_FNAME;
2128                                 ctx = lli->lli_sax;
2129                                 __ll_sax_get(ctx);
2130                                 fd->fd_sai = __ll_sai_get(sai);
2131                                 rc = 0;
2132                         } else {
2133                                 rc = -EINVAL;
2134                                 CWARN("%s: pattern %X is not FNAME: rc = %d\n",
2135                                       sbi->ll_fsname, lli->lli_sa_pattern, rc);
2136                         }
2137
2138                         spin_unlock(&lli->lli_sa_lock);
2139                         ll_sax_free(tmp);
2140                         if (rc)
2141                                 GOTO(out, rc);
2142                 } else {
2143                         lli->lli_sa_pattern = LSA_PATTERN_FNAME;
2144                         lli->lli_sax = ctx;
2145                         fd->fd_sai = __ll_sai_get(sai);
2146                         spin_unlock(&lli->lli_sa_lock);
2147                 }
2148         } else {
2149                 spin_lock(&lli->lli_sa_lock);
2150                 if (!(lli->lli_sa_pattern == LSA_PATTERN_FNAME ||
2151                       lli->lli_sa_pattern == LSA_PATTERN_NONE)) {
2152                         spin_unlock(&lli->lli_sa_lock);
2153                         GOTO(out, rc = -EINVAL);
2154                 }
2155
2156                 lli->lli_sa_pattern = LSA_PATTERN_FNAME;
2157                 fd->fd_sai = __ll_sai_get(sai);
2158                 spin_unlock(&lli->lli_sa_lock);
2159         }
2160
2161         __ll_sax_get(ctx);
2162         CDEBUG(D_READA,
2163                "start statahead thread: [pid %d] [parent %pd] sai %p ctx %p\n",
2164                current->pid, dentry, sai, ctx);
2165
2166         task = kthread_create_on_node(ll_statahead_thread, sai, node,
2167                                       "ll_sa_%u", current->pid);
2168         if (IS_ERR(task)) {
2169                 rc = PTR_ERR(task);
2170                 CERROR("%s: cannot start ll_sa thread: rc = %d\n",
2171                        sbi->ll_fsname, rc);
2172                 GOTO(out, rc);
2173         }
2174
2175         if (test_bit(LL_SBI_AGL_ENABLED, sbi->ll_flags) && agl)
2176                 ll_start_agl(dentry, sai);
2177
2178         atomic_inc(&sbi->ll_sa_total);
2179         sai->sai_task = task;
2180         wake_up_process(task);
2181
2182         RETURN(0);
2183 out:
2184         if (fd->fd_sai) {
2185                 ll_sai_put(sai);
2186                 ll_sax_put(dir, ctx);
2187                 fd->fd_sai = NULL;
2188         }
2189
2190         if (sai)
2191                 ll_sai_free(sai);
2192
2193         if (ctx)
2194                 ll_sax_free(ctx);
2195
2196         atomic_dec(&sbi->ll_sa_running);
2197         RETURN(rc);
2198 }