Whamcloud - gitweb
LU-16335 test: add fail_abort_cleanup()
[fs/lustre-release.git] / lustre / llite / statahead.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  */
31
32 #include <linux/fs.h>
33 #include <linux/sched.h>
34 #include <linux/kthread.h>
35 #include <linux/mm.h>
36 #include <linux/highmem.h>
37 #include <linux/pagemap.h>
38 #include <linux/delay.h>
39
40 #define DEBUG_SUBSYSTEM S_LLITE
41
42 #include <obd_support.h>
43 #include <lustre_dlm.h>
44 #include "llite_internal.h"
45
46 #define SA_OMITTED_ENTRY_MAX 8ULL
47
/* state of an async stat entry; negative values are reserved for error cases */
typedef enum {
        SA_ENTRY_INIT = 0,      /* entry allocated, async stat not finished */
        SA_ENTRY_SUCC = 1,      /* async stat succeeded */
        SA_ENTRY_INVA = 2,      /* entry is invalid (stat failed or stale) */
} se_state_t;
54
/*
 * sa_entry is not refcounted: the statahead thread allocates it and does the
 * async stat, and in the async stat callback ll_statahead_interpret() will
 * prepare the inode and set lock data in the ptlrpcd context. Then the
 * scanner process will be woken up if this entry is the waiting one, and can
 * access and free it.
 */
struct sa_entry {
        /* link into sai_entries, kept sorted by se_index */
        struct list_head        se_list;
        /* link into one sai_cache hash bucket */
        struct list_head        se_hash;
        /* entry index in the sai, assigned from sai_index at submission */
        __u64                   se_index;
        /* low layer ldlm lock handle */
        __u64                   se_handle;
        /* entry status, see se_state_t */
        se_state_t              se_state;
        /* total allocation size, including the name copied after the struct */
        int                     se_size;
        /* pointer to the target inode; NULL until the stat provides one */
        struct inode           *se_inode;
        /* entry name; se_qstr.name points at the copy after the struct */
        struct qstr             se_qstr;
        /* entry fid */
        struct lu_fid           se_fid;
};
81
/* generation counter for sai instances, protected by sai_generation_lock */
static unsigned int sai_generation;
static DEFINE_SPINLOCK(sai_generation_lock);
84
/* entry is not (or no longer) linked into the sai_cache hash table */
static inline int sa_unhashed(struct sa_entry *entry)
{
        return list_empty(&entry->se_hash);
}
89
90 /* sa_entry is ready to use */
91 static inline int sa_ready(struct sa_entry *entry)
92 {
93         /* Make sure sa_entry is updated and ready to use */
94         smp_rmb();
95         return (entry->se_state != SA_ENTRY_INIT);
96 }
97
/* map a name hash value to a sai_cache bucket index */
static inline int sa_hash(int val)
{
        return val & LL_SA_CACHE_MASK;
}
103
104 /* hash entry into sai_cache */
105 static inline void
106 sa_rehash(struct ll_statahead_info *sai, struct sa_entry *entry)
107 {
108         int i = sa_hash(entry->se_qstr.hash);
109
110         spin_lock(&sai->sai_cache_lock[i]);
111         list_add_tail(&entry->se_hash, &sai->sai_cache[i]);
112         spin_unlock(&sai->sai_cache_lock[i]);
113 }
114
115 /* unhash entry from sai_cache */
116 static inline void
117 sa_unhash(struct ll_statahead_info *sai, struct sa_entry *entry)
118 {
119         int i = sa_hash(entry->se_qstr.hash);
120
121         spin_lock(&sai->sai_cache_lock[i]);
122         list_del_init(&entry->se_hash);
123         spin_unlock(&sai->sai_cache_lock[i]);
124 }
125
126 static inline int agl_should_run(struct ll_statahead_info *sai,
127                                  struct inode *inode)
128 {
129         return inode && S_ISREG(inode->i_mode) && sai->sai_agl_task;
130 }
131
/* return the first inode queued on sai_agls; the list must not be empty */
static inline struct ll_inode_info *
agl_first_entry(struct ll_statahead_info *sai)
{
        return list_first_entry(&sai->sai_agls, struct ll_inode_info,
                                lli_agl_list);
}
138
/* statahead window is full: cached entry count has reached sai_max */
static inline int sa_sent_full(struct ll_statahead_info *sai)
{
        return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
}
144
/* no inodes are currently queued for async glimpse */
static inline int agl_list_empty(struct ll_statahead_info *sai)
{
        return list_empty(&sai->sai_agls);
}
149
150 /**
151  * (1) hit ratio less than 80%
152  * or
153  * (2) consecutive miss more than 8
154  * then means low hit.
155  */
156 static inline int sa_low_hit(struct ll_statahead_info *sai)
157 {
158         return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) ||
159                 (sai->sai_consecutive_miss > 8));
160 }
161
162 /*
163  * if the given index is behind of statahead window more than
164  * SA_OMITTED_ENTRY_MAX, then it is old.
165  */
166 static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
167 {
168         return ((__u64)sai->sai_max + index + SA_OMITTED_ENTRY_MAX <
169                 sai->sai_index);
170 }
171
/* allocate sa_entry and hash it to allow scanner process to find it */
static struct sa_entry *
sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
         const char *name, int len, const struct lu_fid *fid)
{
        struct ll_inode_info *lli;
        struct sa_entry *entry;
        int entry_size;
        char *dname;

        ENTRY;

        /* the name is stored right after the struct; (len & ~3) + 4 rounds
         * the name storage up to a multiple of 4 bytes while always leaving
         * room for the trailing NUL */
        entry_size = sizeof(struct sa_entry) + (len & ~3) + 4;
        OBD_ALLOC(entry, entry_size);
        if (unlikely(!entry))
                RETURN(ERR_PTR(-ENOMEM));

        CDEBUG(D_READA, "alloc sa entry %.*s(%p) index %llu\n",
               len, name, entry, index);

        entry->se_index = index;

        entry->se_state = SA_ENTRY_INIT;
        entry->se_size = entry_size;
        /* copy the name after the struct and NUL-terminate it */
        dname = (char *)entry + sizeof(struct sa_entry);
        memcpy(dname, name, len);
        dname[len] = 0;
        entry->se_qstr.hash = ll_full_name_hash(parent, name, len);
        entry->se_qstr.len = len;
        entry->se_qstr.name = dname;
        entry->se_fid = *fid;

        lli = ll_i2info(sai->sai_dentry->d_inode);

        /* publish the entry in sai_cache so the scanner can look it up */
        spin_lock(&lli->lli_sa_lock);
        INIT_LIST_HEAD(&entry->se_list);
        sa_rehash(sai, entry);
        spin_unlock(&lli->lli_sa_lock);

        atomic_inc(&sai->sai_cache_count);

        RETURN(entry);
}
215
/* free sa_entry, which should have been unhashed and not in any list */
static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry)
{
        CDEBUG(D_READA, "free sa entry %.*s(%p) index %llu\n",
               entry->se_qstr.len, entry->se_qstr.name, entry,
               entry->se_index);

        LASSERT(list_empty(&entry->se_list));
        LASSERT(sa_unhashed(entry));

        /* se_size covers the struct plus the name copied after it */
        OBD_FREE(entry, entry->se_size);
        atomic_dec(&sai->sai_cache_count);
}
229
/*
 * find sa_entry by name, used by directory scanner; no lock is needed because
 * only the scanner can remove an entry from the cache.
 */
static struct sa_entry *
sa_get(struct ll_statahead_info *sai, const struct qstr *qstr)
{
        struct sa_entry *entry;
        int i = sa_hash(qstr->hash);

        /* compare hash, then length, then the name bytes themselves */
        list_for_each_entry(entry, &sai->sai_cache[i], se_hash) {
                if (entry->se_qstr.hash == qstr->hash &&
                    entry->se_qstr.len == qstr->len &&
                    memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0)
                        return entry;
        }
        return NULL;
}
248
/* unhash and unlink sa_entry, and then free it */
static inline void
sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry)
{
        struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);

        LASSERT(!sa_unhashed(entry));
        LASSERT(!list_empty(&entry->se_list));
        LASSERT(sa_ready(entry));

        /* drop it from the hash first (takes the per-bucket cache lock) */
        sa_unhash(sai, entry);

        /* then unlink from sai_entries under lli_sa_lock */
        spin_lock(&lli->lli_sa_lock);
        list_del_init(&entry->se_list);
        spin_unlock(&lli->lli_sa_lock);

        /* se_inode may still be NULL if the stat failed; iput(NULL) is a
         * no-op */
        iput(entry->se_inode);

        sa_free(sai, entry);
}
269
/* called by scanner after use, sa_entry will be killed */
static void
sa_put(struct ll_statahead_info *sai, struct sa_entry *entry)
{
        struct sa_entry *tmp, *next;

        if (entry && entry->se_state == SA_ENTRY_SUCC) {
                struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);

                /* hit: reset the miss streak and widen the statahead window,
                 * doubling it up to the per-fs limit ll_sa_max */
                sai->sai_hit++;
                sai->sai_consecutive_miss = 0;
                sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
        } else {
                sai->sai_miss++;
                sai->sai_consecutive_miss++;
        }

        if (entry)
                sa_kill(sai, entry);

        /*
         * kill old completed entries, only scanner process does this, no need
         * to lock; sai_entries is sorted by index, so stop at the first
         * entry that is not yet omitted
         */
        list_for_each_entry_safe(tmp, next, &sai->sai_entries, se_list) {
                if (!is_omitted_entry(sai, tmp->se_index))
                        break;
                sa_kill(sai, tmp);
        }
}
300
/*
 * update state and sorted-insert the entry into sai_entries by index; return
 * true if the scanner is waiting on this entry. Caller holds lli_sa_lock.
 */
static bool
__sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
{
        struct sa_entry *se;
        struct list_head *pos = &sai->sai_entries;
        __u64 index = entry->se_index;

        LASSERT(!sa_ready(entry));
        LASSERT(list_empty(&entry->se_list));

        /* walk backwards to find the last entry with a smaller index and
         * insert after it, keeping sai_entries sorted by se_index */
        list_for_each_entry_reverse(se, &sai->sai_entries, se_list) {
                if (se->se_index < entry->se_index) {
                        pos = &se->se_list;
                        break;
                }
        }
        list_add(&entry->se_list, pos);
        /*
         * LU-9210: ll_statahead_interpret must be able to see this before
         * we wake it up; the release pairs with smp_rmb() in sa_ready()
         */
        smp_store_release(&entry->se_state,
                          ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);

        return (index == sai->sai_index_wait);
}
331
/* finish async stat RPC arguments: release all resources held by @item */
static void sa_fini_data(struct md_op_item *item)
{
        struct md_op_data *op_data = &item->mop_data;

        if (op_data->op_flags & MF_OPNAME_KMALLOCED)
                /* allocated via ll_setup_filename called from sa_prep_data */
                kfree(op_data->op_name);
        ll_unlock_md_op_lsm(&item->mop_data);
        /* drop the dir reference taken by igrab() in sa_prep_data() */
        iput(item->mop_dir);
        OBD_FREE_PTR(item);
}
344
345 static int ll_statahead_interpret(struct md_op_item *item, int rc);
346
/*
 * prepare arguments for async stat RPC.
 *
 * \param dir    parent directory inode
 * \param child  inode of the name if found in dcache, NULL otherwise
 * \param entry  sa_entry the RPC is issued for
 *
 * \retval allocated md_op_item, or ERR_PTR on failure
 */
static struct md_op_item *
sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
{
        struct md_op_item *item;
        struct ldlm_enqueue_info *einfo;
        struct md_op_data *op_data;

        OBD_ALLOC_PTR(item);
        if (!item)
                return ERR_PTR(-ENOMEM);

        op_data = ll_prep_md_op_data(&item->mop_data, dir, child,
                                     entry->se_qstr.name, entry->se_qstr.len, 0,
                                     LUSTRE_OPC_ANY, NULL);
        if (IS_ERR(op_data)) {
                OBD_FREE_PTR(item);
                return (struct md_op_item *)op_data;
        }

        /* no cached inode: stat by the fid recorded at sa_alloc() time */
        if (!child)
                op_data->op_fid2 = entry->se_fid;

        item->mop_it.it_op = IT_GETATTR;
        item->mop_dir = igrab(dir);
        item->mop_cb = ll_statahead_interpret;
        item->mop_cbdata = entry;

        einfo = &item->mop_einfo;
        einfo->ei_type = LDLM_IBITS;
        einfo->ei_mode = it_to_lock_mode(&item->mop_it);
        einfo->ei_cb_bl = ll_md_blocking_ast;
        einfo->ei_cb_cp = ldlm_completion_ast;
        einfo->ei_cb_gl = NULL;
        einfo->ei_cbdata = NULL;
        einfo->ei_req_slot = 1;

        return item;
}
388
/*
 * release resources used in async stat RPC, update entry state and wake up
 * the scanner process if it is waiting on this entry.
 */
static void
sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
{
        struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
        bool wakeup;

        spin_lock(&lli->lli_sa_lock);
        wakeup = __sa_make_ready(sai, entry, ret);
        spin_unlock(&lli->lli_sa_lock);

        if (wakeup)
                wake_up(&sai->sai_waitq);
}
406
/* insert inode into the list of sai_agls */
static void ll_agl_add(struct ll_statahead_info *sai,
                       struct inode *inode, int index)
{
        struct ll_inode_info *child  = ll_i2info(inode);
        struct ll_inode_info *parent = ll_i2info(sai->sai_dentry->d_inode);

        /* lli_agl_index != 0 means the inode is already queued for AGL */
        spin_lock(&child->lli_agl_lock);
        if (child->lli_agl_index == 0) {
                child->lli_agl_index = index;
                spin_unlock(&child->lli_agl_lock);

                LASSERT(list_empty(&child->lli_agl_list));

                spin_lock(&parent->lli_agl_lock);
                /* Re-check under the lock */
                if (agl_should_run(sai, inode)) {
                        /* only wake the AGL thread if the queue was empty;
                         * otherwise it is already draining the list */
                        if (agl_list_empty(sai))
                                wake_up_process(sai->sai_agl_task);
                        /* hold a reference while queued on sai_agls; it is
                         * dropped by ll_agl_trigger() */
                        igrab(inode);
                        list_add_tail(&child->lli_agl_list, &sai->sai_agls);
                } else
                        child->lli_agl_index = 0;
                spin_unlock(&parent->lli_agl_lock);
        } else {
                spin_unlock(&child->lli_agl_lock);
        }
}
435
/* allocate and initialize a statahead info (sai) for @dentry's directory */
static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
{
        struct ll_statahead_info *sai;
        struct ll_inode_info *lli = ll_i2info(dentry->d_inode);
        int i;

        ENTRY;

        OBD_ALLOC_PTR(sai);
        if (!sai)
                RETURN(NULL);

        sai->sai_dentry = dget(dentry);
        atomic_set(&sai->sai_refcount, 1);
        sai->sai_max = LL_SA_RPC_MIN;
        sai->sai_index = 1;
        init_waitqueue_head(&sai->sai_waitq);

        INIT_LIST_HEAD(&sai->sai_entries);
        INIT_LIST_HEAD(&sai->sai_agls);

        for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
                INIT_LIST_HEAD(&sai->sai_cache[i]);
                spin_lock_init(&sai->sai_cache_lock[i]);
        }
        atomic_set(&sai->sai_cache_count, 0);

        /* assign a fresh generation, skipping 0 on wraparound — presumably
         * generation 0 is reserved to mean "no statahead yet"; verify against
         * users of lli_sa_generation */
        spin_lock(&sai_generation_lock);
        lli->lli_sa_generation = ++sai_generation;
        if (unlikely(sai_generation == 0))
                lli->lli_sa_generation = ++sai_generation;
        spin_unlock(&sai_generation_lock);

        RETURN(sai);
}
472
/* free sai; called once the last reference is gone */
static inline void ll_sai_free(struct ll_statahead_info *sai)
{
        LASSERT(sai->sai_dentry != NULL);
        /* drop the dentry reference taken by dget() in ll_sai_alloc() */
        dput(sai->sai_dentry);
        OBD_FREE_PTR(sai);
}
480
/*
 * take refcount of sai if sai for @dir exists, which means statahead is on for
 * this directory.
 */
static inline struct ll_statahead_info *ll_sai_get(struct inode *dir)
{
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai = NULL;

        /* lli_sa_lock serializes against ll_sai_put() clearing lli_sai */
        spin_lock(&lli->lli_sa_lock);
        sai = lli->lli_sai;
        if (sai)
                atomic_inc(&sai->sai_refcount);
        spin_unlock(&lli->lli_sa_lock);

        return sai;
}
498
/*
 * put sai refcount after use, if refcount reaches zero, free sai and sa_entries
 * attached to it.
 */
static void ll_sai_put(struct ll_statahead_info *sai)
{
        struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);

        /* atomic_dec_and_lock() takes lli_sa_lock only when the refcount
         * drops to zero, so lli_sai is cleared under the same lock that
         * ll_sai_get() uses to take new references */
        if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
                struct sa_entry *entry, *next;
                struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);

                lli->lli_sai = NULL;
                spin_unlock(&lli->lli_sa_lock);

                /* statahead/AGL threads must be gone and all replies in */
                LASSERT(!sai->sai_task);
                LASSERT(!sai->sai_agl_task);
                LASSERT(sai->sai_sent == sai->sai_replied);

                list_for_each_entry_safe(entry, next, &sai->sai_entries,
                                         se_list)
                        sa_kill(sai, entry);

                LASSERT(atomic_read(&sai->sai_cache_count) == 0);
                LASSERT(agl_list_empty(sai));

                ll_sai_free(sai);
                atomic_dec(&sbi->ll_sa_running);
        }
}
529
/* Do NOT forget to drop inode refcount when into sai_agls. */
static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        u64 index = lli->lli_agl_index;
        ktime_t expire;
        int rc;

        ENTRY;

        LASSERT(list_empty(&lli->lli_agl_list));

        /* AGL maybe fall behind statahead with one entry */
        if (is_omitted_entry(sai, index + 1)) {
                lli->lli_agl_index = 0;
                iput(inode);
                RETURN_EXIT;
        }

        /*
         * In case of restore, the MDT has the right size and has already
         * sent it back without granting the layout lock, inode is up-to-date.
         * Then AGL (async glimpse lock) is useless.
         * Also to glimpse we need the layout, in case of a running restore
         * the MDT holds the layout lock so the glimpse will block up to the
         * end of restore (statahead/agl will block)
         */
        if (test_bit(LLIF_FILE_RESTORING, &lli->lli_flags)) {
                lli->lli_agl_index = 0;
                iput(inode);
                RETURN_EXIT;
        }

        /* Someone is in glimpse (sync or async), do nothing. */
        rc = down_write_trylock(&lli->lli_glimpse_sem);
        if (rc == 0) {
                lli->lli_agl_index = 0;
                iput(inode);
                RETURN_EXIT;
        }

        /*
         * Someone triggered glimpse within 1 sec before.
         * 1) The former glimpse succeeded with glimpse lock granted by OST, and
         *    if the lock is still cached on client, AGL needs to do nothing. If
         *    it is cancelled by other client, AGL maybe cannot obtain new lock
         *    for no glimpse callback triggered by AGL.
         * 2) The former glimpse succeeded, but OST did not grant glimpse lock.
         *    Under such case, it is quite possible that the OST will not grant
         *    glimpse lock for AGL also.
         * 3) The former glimpse failed, compared with other two cases, it is
         *    relatively rare. AGL can ignore such case, and it will not much
         *    affect the performance.
         */
        expire = ktime_sub_ns(ktime_get(), NSEC_PER_SEC);
        if (ktime_to_ns(lli->lli_glimpse_time) &&
            ktime_before(expire, lli->lli_glimpse_time)) {
                up_write(&lli->lli_glimpse_sem);
                lli->lli_agl_index = 0;
                iput(inode);
                RETURN_EXIT;
        }

        CDEBUG(D_READA,
               "Handling (init) async glimpse: inode = " DFID", idx = %llu\n",
               PFID(&lli->lli_fid), index);

        cl_agl(inode);
        lli->lli_agl_index = 0;
        lli->lli_glimpse_time = ktime_get();
        up_write(&lli->lli_glimpse_sem);

        CDEBUG(D_READA,
               "Handled (init) async glimpse: inode= " DFID", idx = %llu, rc = %d\n",
               PFID(&lli->lli_fid), index, rc);

        /* drop the reference taken when the inode was queued on sai_agls */
        iput(inode);

        EXIT;
}
610
/*
 * common completion path for an async stat: release intent and RPC resources,
 * mark the entry ready (waking the scanner if needed) and account the reply.
 */
static void ll_statahead_interpret_fini(struct ll_inode_info *lli,
                                        struct ll_statahead_info *sai,
                                        struct md_op_item *item,
                                        struct sa_entry *entry,
                                        struct ptlrpc_request *req,
                                        int rc)
{
        /*
         * First it will drop ldlm ibits lock refcount by calling
         * ll_intent_drop_lock() in spite of failures. Do not worry about
         * calling ll_intent_drop_lock() more than once.
         */
        ll_intent_release(&item->mop_it);
        sa_fini_data(item);
        /* req is only non-NULL on the work-queue path, which took an extra
         * reference before scheduling */
        if (req)
                ptlrpc_req_finished(req);
        sa_make_ready(sai, entry, rc);

        spin_lock(&lli->lli_sa_lock);
        sai->sai_replied++;
        spin_unlock(&lli->lli_sa_lock);
}
633
/*
 * work-queue handler for a statahead reply whose processing may generate
 * extra RPCs (see ll_statahead_interpret()): prepare the inode and set lock
 * data outside of the ptlrpcd context.
 */
static void ll_statahead_interpret_work(struct work_struct *work)
{
        struct md_op_item *item = container_of(work, struct md_op_item,
                                               mop_work);
        struct req_capsule *pill = item->mop_pill;
        struct inode *dir = item->mop_dir;
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai = lli->lli_sai;
        struct lookup_intent *it;
        struct sa_entry *entry;
        struct mdt_body *body;
        struct inode *child;
        int rc;

        ENTRY;

        entry = (struct sa_entry *)item->mop_cbdata;
        /* ll_statahead_interpret() saved the lock handle before scheduling */
        LASSERT(entry->se_handle != 0);

        it = &item->mop_it;
        body = req_capsule_server_get(pill, &RMF_MDT_BODY);
        if (!body)
                GOTO(out, rc = -EFAULT);

        child = entry->se_inode;
        /* revalidate; unlinked and re-created with the same name */
        if (unlikely(!lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
                if (child) {
                        entry->se_inode = NULL;
                        iput(child);
                }
                /* The mdt_body is invalid. Skip this entry */
                GOTO(out, rc = -EAGAIN);
        }

        /* re-take the ibits lock that was dropped in the ptlrpcd callback */
        it->it_lock_handle = entry->se_handle;
        rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
        if (rc != 1)
                GOTO(out, rc = -EAGAIN);

        rc = ll_prep_inode(&child, pill, dir->i_sb, it);
        if (rc)
                GOTO(out, rc);

        /* If encryption context was returned by MDT, put it in
         * inode now to save an extra getxattr.
         */
        if (body->mbo_valid & OBD_MD_ENCCTX) {
                void *encctx = req_capsule_server_get(pill, &RMF_FILE_ENCCTX);
                __u32 encctxlen = req_capsule_get_size(pill, &RMF_FILE_ENCCTX,
                                                       RCL_SERVER);

                if (encctxlen) {
                        CDEBUG(D_SEC,
                               "server returned encryption ctx for "DFID"\n",
                               PFID(ll_inode2fid(child)));
                        rc = ll_xattr_cache_insert(child,
                                                   xattr_for_enc(child),
                                                   encctx, encctxlen);
                        if (rc)
                                CWARN("%s: cannot set enc ctx for "DFID": rc = %d\n",
                                      ll_i2sbi(child)->ll_fsname,
                                      PFID(ll_inode2fid(child)), rc);
                }
        }

        CDEBUG(D_READA, "%s: setting %.*s"DFID" l_data to inode %p\n",
               ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
               entry->se_qstr.name, PFID(ll_inode2fid(child)), child);
        ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);

        entry->se_inode = child;

        if (agl_should_run(sai, child))
                ll_agl_add(sai, child, entry->se_index);
out:
        /* pass rc_req so the extra reference taken before scheduling is put */
        ll_statahead_interpret_fini(lli, sai, item, entry, pill->rc_req, rc);
}
712
/*
 * Callback for async stat RPC, this is called in ptlrpcd context. It prepares
 * the inode and sets lock data directly in the ptlrpcd context. It will wake
 * up the directory listing process if the dentry is the waiting one.
 */
static int ll_statahead_interpret(struct md_op_item *item, int rc)
{
        struct req_capsule *pill = item->mop_pill;
        struct lookup_intent *it = &item->mop_it;
        struct inode *dir = item->mop_dir;
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai = lli->lli_sai;
        struct sa_entry *entry = (struct sa_entry *)item->mop_cbdata;
        struct work_struct *work = &item->mop_work;
        struct mdt_body *body;
        struct inode *child;
        __u64 handle = 0;

        ENTRY;

        /* negative lookup: the name does not exist on the MDT */
        if (it_disposition(it, DISP_LOOKUP_NEG))
                rc = -ENOENT;

        /*
         * because statahead thread will wait for all inflight RPC to finish,
         * sai should be always valid, no need to refcount
         */
        LASSERT(sai != NULL);
        LASSERT(entry != NULL);

        CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
               entry->se_qstr.len, entry->se_qstr.name, rc);

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(pill, &RMF_MDT_BODY);
        if (!body)
                GOTO(out, rc = -EFAULT);

        child = entry->se_inode;
        /* revalidate; unlinked and re-created with the same name */
        if (unlikely(!lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
                if (child) {
                        entry->se_inode = NULL;
                        iput(child);
                }
                /* The mdt_body is invalid. Skip this entry */
                GOTO(out, rc = -EAGAIN);
        }

        /* save the lock handle so ll_statahead_interpret_work() can
         * revalidate it after the ibits lock is dropped below */
        entry->se_handle = it->it_lock_handle;
        /*
         * In ptlrpcd context, it is not allowed to generate new RPCs
         * especially for striped directories or regular files with layout
         * change.
         */
        /*
         * release ibits lock ASAP to avoid deadlock when statahead
         * thread enqueues lock on parent in readdir and another
         * process enqueues lock on child with parent lock held, eg.
         * unlink.
         */
        handle = it->it_lock_handle;
        ll_intent_drop_lock(it);
        ll_unlock_md_op_lsm(&item->mop_data);

        /*
         * If the statahead entry is a striped directory or regular file with
         * layout change, it will generate a new RPC and long wait in the
         * ptlrpcd context.
         * However, it is dangerous of blocking in ptlrpcd thread.
         * Here we use work queue or the separate statahead thread to handle
         * the extra RPC and long wait:
         *      (@ll_prep_inode->@lmv_revalidate_slaves);
         *      (@ll_prep_inode->@lov_layout_change->osc_cache_wait_range);
         */
        INIT_WORK(work, ll_statahead_interpret_work);
        /* hold the request until the work item has consumed it */
        ptlrpc_request_addref(pill->rc_req);
        schedule_work(work);
        RETURN(0);
out:
        ll_statahead_interpret_fini(lli, sai, item, entry, NULL, rc);
        RETURN(rc);
}
798
/* async stat for file not found in dcache */
static int sa_lookup(struct inode *dir, struct sa_entry *entry)
{
        struct md_op_item *item;
        int rc;

        ENTRY;

        item = sa_prep_data(dir, NULL, entry);
        if (IS_ERR(item))
                RETURN(PTR_ERR(item));

        rc = md_intent_getattr_async(ll_i2mdexp(dir), item);
        /* on error the interpret callback will never run, so release item
         * here instead */
        if (rc < 0)
                sa_fini_data(item);

        RETURN(rc);
}
817
/**
 * async stat for file found in dcache, similar to .revalidate
 *
 * \retval      1 dentry valid, no RPC sent
 * \retval      0 dentry invalid, will send async stat RPC
 * \retval      negative number upon error
 */
static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
                         struct dentry *dentry)
{
        struct inode *inode = dentry->d_inode;
        struct lookup_intent it = { .it_op = IT_GETATTR,
                                    .it_lock_handle = 0 };
        struct md_op_item *item;
        int rc;

        ENTRY;

        /* negative dentry: nothing to stat, treat as valid */
        if (unlikely(!inode))
                RETURN(1);

        if (d_mountpoint(dentry))
                RETURN(1);

        item = sa_prep_data(dir, inode, entry);
        if (IS_ERR(item))
                RETURN(PTR_ERR(item));

        entry->se_inode = igrab(inode);
        /* check whether a usable lock for the getattr intent is already
         * cached locally — see md_revalidate_lock() */
        rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
                                NULL);
        if (rc == 1) {
                entry->se_handle = it.it_lock_handle;
                ll_intent_release(&it);
                sa_fini_data(item);
                RETURN(1);
        }

        rc = md_intent_getattr_async(ll_i2mdexp(dir), item);
        if (rc < 0) {
                /* callback will never run: undo the igrab() and free item */
                entry->se_inode = NULL;
                iput(inode);
                sa_fini_data(item);
        }

        RETURN(rc);
}
865
/* async stat for file with @name, allocating its sa_entry first */
static void sa_statahead(struct dentry *parent, const char *name, int len,
                         const struct lu_fid *fid)
{
        struct inode *dir = parent->d_inode;
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai = lli->lli_sai;
        struct dentry *dentry = NULL;
        struct sa_entry *entry;
        int rc;

        ENTRY;

        entry = sa_alloc(parent, sai, sai->sai_index, name, len, fid);
        if (IS_ERR(entry))
                RETURN_EXIT;

        /* found in dcache: revalidate; otherwise do a lookup-style stat */
        dentry = d_lookup(parent, &entry->se_qstr);
        if (!dentry) {
                rc = sa_lookup(dir, entry);
        } else {
                rc = sa_revalidate(dir, entry, dentry);
                if (rc == 1 && agl_should_run(sai, dentry->d_inode))
                        ll_agl_add(sai, dentry->d_inode, entry->se_index);
        }

        if (dentry)
                dput(dentry);

        /* rc != 0: no RPC in flight (valid dentry or error), mark the entry
         * ready now; rc == 0: the interpret callback will mark it ready */
        if (rc != 0)
                sa_make_ready(sai, entry, rc);
        else
                sai->sai_sent++;

        sai->sai_index++;

        EXIT;
}
904
/* async glimpse (agl) thread main function */
static int ll_agl_thread(void *arg)
{
	struct dentry *parent = (struct dentry *)arg;
	struct inode *dir = parent->d_inode;
	struct ll_inode_info *plli = ll_i2info(dir);
	struct ll_inode_info *clli;
	/*
	 * We already own this reference, so it is safe to take it
	 * without a lock.
	 */
	struct ll_statahead_info *sai = plli->lli_sai;

	ENTRY;

	CDEBUG(D_READA, "agl thread started: sai %p, parent %pd\n",
	       sai, parent);

	/*
	 * Standard kthread pattern: set TASK_IDLE before testing
	 * kthread_should_stop() so a concurrent wake-up cannot be lost.
	 */
	while (({set_current_state(TASK_IDLE);
		 !kthread_should_stop(); })) {
		spin_lock(&plli->lli_agl_lock);
		clli = list_first_entry_or_null(&sai->sai_agls,
						struct ll_inode_info,
						lli_agl_list);
		if (clli) {
			/* dequeue one inode and glimpse it outside the lock */
			__set_current_state(TASK_RUNNING);
			list_del_init(&clli->lli_agl_list);
			spin_unlock(&plli->lli_agl_lock);
			ll_agl_trigger(&clli->lli_vfs_inode, sai);
			cond_resched();
		} else {
			/* queue empty: sleep until new work or stop request */
			spin_unlock(&plli->lli_agl_lock);
			schedule();
		}
	}
	__set_current_state(TASK_RUNNING);
	RETURN(0);
}
943
/*
 * Stop the agl thread (if one was started) and drain the inodes still
 * queued for glimpse-ahead, dropping the references they hold.
 */
static void ll_stop_agl(struct ll_statahead_info *sai)
{
	struct dentry *parent = sai->sai_dentry;
	struct ll_inode_info *plli = ll_i2info(parent->d_inode);
	struct ll_inode_info *clli;
	struct task_struct *agl_task;

	/* claim the task pointer under the lock so only one caller stops it */
	spin_lock(&plli->lli_agl_lock);
	agl_task = sai->sai_agl_task;
	sai->sai_agl_task = NULL;
	spin_unlock(&plli->lli_agl_lock);
	if (!agl_task)
		return;

	CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n",
	       sai, (unsigned int)agl_task->pid);
	kthread_stop(agl_task);

	/*
	 * Flush the remaining agl queue; drop the lock around iput() (it may
	 * sleep), then re-take it to fetch the next entry.
	 */
	spin_lock(&plli->lli_agl_lock);
	while ((clli = list_first_entry_or_null(&sai->sai_agls,
						struct ll_inode_info,
						lli_agl_list)) != NULL) {
		list_del_init(&clli->lli_agl_list);
		spin_unlock(&plli->lli_agl_lock);
		clli->lli_agl_index = 0;
		iput(&clli->lli_vfs_inode);
		spin_lock(&plli->lli_agl_lock);
	}
	spin_unlock(&plli->lli_agl_lock);
	CDEBUG(D_READA, "agl thread stopped: sai %p, parent %pd\n",
	       sai, parent);
	/* drop the sai reference that the agl thread held */
	ll_sai_put(sai);
}
977
978 /* start agl thread */
979 static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
980 {
981         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
982         struct ll_inode_info *plli;
983         struct task_struct *task;
984
985         ENTRY;
986
987         CDEBUG(D_READA, "start agl thread: sai %p, parent %pd\n",
988                sai, parent);
989
990         plli = ll_i2info(parent->d_inode);
991         task = kthread_create_on_node(ll_agl_thread, parent, node, "ll_agl_%d",
992                                       plli->lli_opendir_pid);
993         if (IS_ERR(task)) {
994                 CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));
995                 RETURN_EXIT;
996         }
997         sai->sai_agl_task = task;
998         atomic_inc(&ll_i2sbi(d_inode(parent))->ll_agl_total);
999         /* Get an extra reference that the thread holds */
1000         ll_sai_get(d_inode(parent));
1001
1002         wake_up_process(task);
1003
1004         EXIT;
1005 }
1006
/*
 * statahead thread main function
 *
 * Scans @parent's directory pages and fires async stats for each entry,
 * throttled by the statahead window (sa_sent_full()). Quits on directory
 * end, error, poor hit ratio, or when sai_task is cleared by closedir.
 */
static int ll_statahead_thread(void *arg)
{
	struct dentry *parent = (struct dentry *)arg;
	struct inode *dir = parent->d_inode;
	struct ll_inode_info *lli = ll_i2info(dir);
	struct ll_sb_info *sbi = ll_i2sbi(dir);
	struct ll_statahead_info *sai = lli->lli_sai;
	int first = 0;
	struct md_op_data *op_data;
	struct page *page = NULL;
	__u64 pos = 0;
	int rc = 0;

	ENTRY;

	CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
	       sai, parent);

	OBD_ALLOC_PTR(op_data);
	if (!op_data)
		GOTO(out, rc = -ENOMEM);

	/* walk the directory one page of dirents at a time until the end,
	 * or until we are told to stop (sai->sai_task cleared) */
	while (pos != MDS_DIR_END_OFF && sai->sai_task) {
		struct lu_dirpage *dp;
		struct lu_dirent  *ent;

		op_data = ll_prep_md_op_data(op_data, dir, dir, NULL, 0, 0,
					     LUSTRE_OPC_ANY, dir);
		if (IS_ERR(op_data)) {
			rc = PTR_ERR(op_data);
			break;
		}

		page = ll_get_dir_page(dir, op_data, pos, NULL);
		ll_unlock_md_op_lsm(op_data);
		if (IS_ERR(page)) {
			rc = PTR_ERR(page);
			CDEBUG(D_READA,
			       "error reading dir "DFID" at %llu /%llu opendir_pid = %u: rc = %d\n",
			       PFID(ll_inode2fid(dir)), pos, sai->sai_index,
			       lli->lli_opendir_pid, rc);
			break;
		}

		dp = page_address(page);
		for (ent = lu_dirent_start(dp);
		     ent != NULL && sai->sai_task &&
		     !sa_low_hit(sai);
		     ent = lu_dirent_next(ent)) {
			__u64 hash;
			int namelen;
			char *name;
			struct lu_fid fid;
			struct llcrypt_str lltr = LLTR_INIT(NULL, 0);

			hash = le64_to_cpu(ent->lde_hash);
			if (unlikely(hash < pos))
				/*
				 * Skip until we find target hash value.
				 */
				continue;

			namelen = le16_to_cpu(ent->lde_namelen);
			if (unlikely(namelen == 0))
				/*
				 * Skip dummy record.
				 */
				continue;

			name = ent->lde_name;
			if (name[0] == '.') {
				if (namelen == 1) {
					/*
					 * skip "."
					 */
					continue;
				} else if (name[1] == '.' && namelen == 2) {
					/*
					 * skip ".."
					 */
					continue;
				} else if (!sai->sai_ls_all) {
					/*
					 * skip hidden files.
					 */
					sai->sai_skip_hidden++;
					continue;
				}
			}

			/*
			 * don't stat-ahead first entry.
			 */
			if (unlikely(++first == 1))
				continue;

			fid_le_to_cpu(&fid, &ent->lde_fid);

			/*
			 * Throttle: while the async-stat window is full, help
			 * out by running queued agl glimpses; otherwise sleep
			 * until a reply frees a slot or we are stopped.
			 */
			while (({set_current_state(TASK_IDLE);
				 sai->sai_task; })) {
				spin_lock(&lli->lli_agl_lock);
				while (sa_sent_full(sai) &&
				       !agl_list_empty(sai)) {
					struct ll_inode_info *clli;

					__set_current_state(TASK_RUNNING);
					clli = agl_first_entry(sai);
					list_del_init(&clli->lli_agl_list);
					spin_unlock(&lli->lli_agl_lock);

					ll_agl_trigger(&clli->lli_vfs_inode,
						       sai);
					cond_resched();
					spin_lock(&lli->lli_agl_lock);
				}
				spin_unlock(&lli->lli_agl_lock);

				if (!sa_sent_full(sai))
					break;
				schedule();
			}
			__set_current_state(TASK_RUNNING);

			if (IS_ENCRYPTED(dir)) {
				struct llcrypt_str de_name =
					LLTR_INIT(ent->lde_name, namelen);
				/* NOTE(review): this shadows the outer @fid;
				 * both are filled from lde_fid so the value is
				 * the same, but the shadowing is fragile --
				 * consider reusing the outer variable */
				struct lu_fid fid;

				rc = llcrypt_fname_alloc_buffer(dir, NAME_MAX,
								&lltr);
				if (rc < 0)
					continue;

				fid_le_to_cpu(&fid, &ent->lde_fid);
				if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
							 &lltr, &fid)) {
					llcrypt_fname_free_buffer(&lltr);
					continue;
				}

				/* use the decrypted user-visible name */
				name = lltr.name;
				namelen = lltr.len;
			}

			sa_statahead(parent, name, namelen, &fid);
			llcrypt_fname_free_buffer(&lltr);
		}

		pos = le64_to_cpu(dp->ldp_hash_end);
		down_read(&lli->lli_lsm_sem);
		ll_release_page(dir, page,
				le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
		up_read(&lli->lli_lsm_sem);

		if (sa_low_hit(sai)) {
			/* callers are not consuming what we prefetch:
			 * give up and account the wasted run */
			rc = -EFAULT;
			atomic_inc(&sbi->ll_sa_wrong);
			CDEBUG(D_READA,
			       "Statahead for dir "DFID" hit ratio too low: hit/miss %llu/%llu, sent/replied %llu/%llu, stoppingstatahead thread: pid %d\n",
			       PFID(&lli->lli_fid), sai->sai_hit,
			       sai->sai_miss, sai->sai_sent,
			       sai->sai_replied, current->pid);
			break;
		}
	}
	ll_finish_md_op_data(op_data);

	if (rc < 0) {
		/* on error, also disable further statahead on this dir */
		spin_lock(&lli->lli_sa_lock);
		sai->sai_task = NULL;
		lli->lli_sa_enabled = 0;
		spin_unlock(&lli->lli_sa_lock);
	}

	/*
	 * statahead is finished, but statahead entries need to be cached, wait
	 * for file release closedir() call to stop me.
	 */
	while (({set_current_state(TASK_IDLE);
		 sai->sai_task; })) {
		schedule();
	}
	__set_current_state(TASK_RUNNING);

	EXIT;
out:
	ll_stop_agl(sai);

	/*
	 * wait for inflight statahead RPCs to finish, and then we can free sai
	 * safely because statahead RPC will access sai data
	 */
	while (sai->sai_sent != sai->sai_replied)
		/* in case we're not woken up, timeout wait */
		msleep(125);

	CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd\n",
	       sbi->ll_fsname, sai, parent);

	spin_lock(&lli->lli_sa_lock);
	sai->sai_task = NULL;
	spin_unlock(&lli->lli_sa_lock);
	/* wake any stat caller still blocked in revalidate_statahead_dentry */
	wake_up(&sai->sai_waitq);

	atomic_add(sai->sai_hit, &sbi->ll_sa_hit_total);
	atomic_add(sai->sai_miss, &sbi->ll_sa_miss_total);

	ll_sai_put(sai);

	return rc;
}
1219
1220 /* authorize opened dir handle @key to statahead */
1221 void ll_authorize_statahead(struct inode *dir, void *key)
1222 {
1223         struct ll_inode_info *lli = ll_i2info(dir);
1224
1225         spin_lock(&lli->lli_sa_lock);
1226         if (!lli->lli_opendir_key && !lli->lli_sai) {
1227                 /*
1228                  * if lli_sai is not NULL, it means previous statahead is not
1229                  * finished yet, we'd better not start a new statahead for now.
1230                  */
1231                 LASSERT(lli->lli_opendir_pid == 0);
1232                 lli->lli_opendir_key = key;
1233                 lli->lli_opendir_pid = current->pid;
1234                 lli->lli_sa_enabled = 1;
1235         }
1236         spin_unlock(&lli->lli_sa_lock);
1237 }
1238
1239 /*
1240  * deauthorize opened dir handle @key to statahead, and notify statahead thread
1241  * to quit if it's running.
1242  */
1243 void ll_deauthorize_statahead(struct inode *dir, void *key)
1244 {
1245         struct ll_inode_info *lli = ll_i2info(dir);
1246         struct ll_statahead_info *sai;
1247
1248         LASSERT(lli->lli_opendir_key == key);
1249         LASSERT(lli->lli_opendir_pid != 0);
1250
1251         CDEBUG(D_READA, "deauthorize statahead for "DFID"\n",
1252                PFID(&lli->lli_fid));
1253
1254         spin_lock(&lli->lli_sa_lock);
1255         lli->lli_opendir_key = NULL;
1256         lli->lli_opendir_pid = 0;
1257         lli->lli_sa_enabled = 0;
1258         sai = lli->lli_sai;
1259         if (sai && sai->sai_task) {
1260                 /*
1261                  * statahead thread may not have quit yet because it needs to
1262                  * cache entries, now it's time to tell it to quit.
1263                  *
1264                  * wake_up_process() provides the necessary barriers
1265                  * to pair with set_current_state().
1266                  */
1267                 struct task_struct *task = sai->sai_task;
1268
1269                 sai->sai_task = NULL;
1270                 wake_up_process(task);
1271         }
1272         spin_unlock(&lli->lli_sa_lock);
1273 }
1274
/* classification returned by is_first_dirent() */
enum {
	/**
	 * not first dirent, or is "."
	 */
	LS_NOT_FIRST_DE = 0,
	/**
	 * the first non-hidden dirent
	 */
	LS_FIRST_DE,
	/**
	 * the first hidden dirent, that is "."
	 */
	LS_FIRST_DOT_DE
};
1289
1290 /* file is first dirent under @dir */
1291 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
1292 {
1293         struct qstr *target = &dentry->d_name;
1294         struct md_op_data *op_data;
1295         int dot_de;
1296         struct page *page = NULL;
1297         int rc = LS_NOT_FIRST_DE;
1298         __u64 pos = 0;
1299         struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
1300
1301         ENTRY;
1302
1303         op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
1304                                      LUSTRE_OPC_ANY, dir);
1305         if (IS_ERR(op_data))
1306                 RETURN(PTR_ERR(op_data));
1307
1308         if (IS_ENCRYPTED(dir)) {
1309                 int rc2 = llcrypt_fname_alloc_buffer(dir, NAME_MAX, &lltr);
1310
1311                 if (rc2 < 0)
1312                         RETURN(rc2);
1313         }
1314
1315         /**
1316          *FIXME choose the start offset of the readdir
1317          */
1318
1319         page = ll_get_dir_page(dir, op_data, 0, NULL);
1320
1321         while (1) {
1322                 struct lu_dirpage *dp;
1323                 struct lu_dirent  *ent;
1324
1325                 if (IS_ERR(page)) {
1326                         struct ll_inode_info *lli = ll_i2info(dir);
1327
1328                         rc = PTR_ERR(page);
1329                         CERROR("%s: reading dir "DFID" at %llu opendir_pid = %u : rc = %d\n",
1330                                ll_i2sbi(dir)->ll_fsname,
1331                                PFID(ll_inode2fid(dir)), pos,
1332                                lli->lli_opendir_pid, rc);
1333                         break;
1334                 }
1335
1336                 dp = page_address(page);
1337                 for (ent = lu_dirent_start(dp); ent != NULL;
1338                      ent = lu_dirent_next(ent)) {
1339                         __u64 hash;
1340                         int namelen;
1341                         char *name;
1342
1343                         hash = le64_to_cpu(ent->lde_hash);
1344                         /*
1345                          * The ll_get_dir_page() can return any page containing
1346                          * the given hash which may be not the start hash.
1347                          */
1348                         if (unlikely(hash < pos))
1349                                 continue;
1350
1351                         namelen = le16_to_cpu(ent->lde_namelen);
1352                         if (unlikely(namelen == 0))
1353                                 /*
1354                                  * skip dummy record.
1355                                  */
1356                                 continue;
1357
1358                         name = ent->lde_name;
1359                         if (name[0] == '.') {
1360                                 if (namelen == 1)
1361                                         /*
1362                                          * skip "."
1363                                          */
1364                                         continue;
1365                                 else if (name[1] == '.' && namelen == 2)
1366                                         /*
1367                                          * skip ".."
1368                                          */
1369                                         continue;
1370                                 else
1371                                         dot_de = 1;
1372                         } else {
1373                                 dot_de = 0;
1374                         }
1375
1376                         if (dot_de && target->name[0] != '.') {
1377                                 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
1378                                        target->len, target->name,
1379                                        namelen, name);
1380                                 continue;
1381                         }
1382
1383                         if (IS_ENCRYPTED(dir)) {
1384                                 struct llcrypt_str de_name =
1385                                         LLTR_INIT(ent->lde_name, namelen);
1386                                 struct lu_fid fid;
1387
1388                                 fid_le_to_cpu(&fid, &ent->lde_fid);
1389                                 if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
1390                                                          &lltr, &fid))
1391                                         continue;
1392                                 name = lltr.name;
1393                                 namelen = lltr.len;
1394                         }
1395
1396                         if (target->len != namelen ||
1397                             memcmp(target->name, name, namelen) != 0)
1398                                 rc = LS_NOT_FIRST_DE;
1399                         else if (!dot_de)
1400                                 rc = LS_FIRST_DE;
1401                         else
1402                                 rc = LS_FIRST_DOT_DE;
1403
1404                         ll_release_page(dir, page, false);
1405                         GOTO(out, rc);
1406                 }
1407                 pos = le64_to_cpu(dp->ldp_hash_end);
1408                 if (pos == MDS_DIR_END_OFF) {
1409                         /*
1410                          * End of directory reached.
1411                          */
1412                         ll_release_page(dir, page, false);
1413                         GOTO(out, rc);
1414                 } else {
1415                         /*
1416                          * chain is exhausted
1417                          * Normal case: continue to the next page.
1418                          */
1419                         ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) &
1420                                               LDF_COLLIDE);
1421                         page = ll_get_dir_page(dir, op_data, pos, NULL);
1422                 }
1423         }
1424         EXIT;
1425 out:
1426         llcrypt_fname_free_buffer(&lltr);
1427         ll_finish_md_op_data(op_data);
1428
1429         return rc;
1430 }
1431
1432 /**
1433  * revalidate @dentryp from statahead cache
1434  *
1435  * \param[in] dir       parent directory
1436  * \param[in] sai       sai structure
1437  * \param[out] dentryp  pointer to dentry which will be revalidated
1438  * \param[in] unplug    unplug statahead window only (normally for negative
1439  *                      dentry)
1440  * \retval              1 on success, dentry is saved in @dentryp
1441  * \retval              0 if revalidation failed (no proper lock on client)
1442  * \retval              negative number upon error
1443  */
static int revalidate_statahead_dentry(struct inode *dir,
				       struct ll_statahead_info *sai,
				       struct dentry **dentryp,
				       bool unplug)
{
	struct sa_entry *entry = NULL;
	struct ll_inode_info *lli = ll_i2info(dir);
	int rc = 0;

	ENTRY;

	if ((*dentryp)->d_name.name[0] == '.') {
		if (sai->sai_ls_all ||
		    sai->sai_miss_hidden >= sai->sai_skip_hidden) {
			/*
			 * Hidden dentry is the first one, or statahead
			 * thread does not skip so many hidden dentries
			 * before "sai_ls_all" enabled as below.
			 */
		} else {
			if (!sai->sai_ls_all)
				/*
				 * It maybe because hidden dentry is not
				 * the first one, "sai_ls_all" was not
				 * set, then "ls -al" missed. Enable
				 * "sai_ls_all" for such case.
				 */
				sai->sai_ls_all = 1;

			/*
			 * Such "getattr" has been skipped before
			 * "sai_ls_all" enabled as above.
			 */
			sai->sai_miss_hidden++;
			RETURN(-EAGAIN);
		}
	}

	if (unplug)
		GOTO(out, rc = 1);

	entry = sa_get(sai, &(*dentryp)->d_name);
	if (!entry)
		GOTO(out, rc = -EAGAIN);

	if (!sa_ready(entry)) {
		/* record which entry we wait on, then wait (bounded to 30s)
		 * for the statahead thread to finish its async stat */
		spin_lock(&lli->lli_sa_lock);
		sai->sai_index_wait = entry->se_index;
		spin_unlock(&lli->lli_sa_lock);
		rc = wait_event_idle_timeout(sai->sai_waitq, sa_ready(entry),
					     cfs_time_seconds(30));
		if (rc == 0) {
			/*
			 * entry may not be ready, so it may be used by inflight
			 * statahead RPC, don't free it.
			 */
			entry = NULL;
			GOTO(out, rc = -EAGAIN);
		}
	}

	/*
	 * We need to see the value that was set immediately before we
	 * were woken up.
	 */
	if (smp_load_acquire(&entry->se_state) == SA_ENTRY_SUCC &&
	    entry->se_inode) {
		struct inode *inode = entry->se_inode;
		struct lookup_intent it = { .it_op = IT_GETATTR,
					    .it_lock_handle =
						entry->se_handle };
		__u64 bits;

		/* try to reuse the lock handle the statahead RPC obtained */
		rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
					ll_inode2fid(inode), &bits);
		if (rc == 1) {
			if (!(*dentryp)->d_inode) {
				struct dentry *alias;

				alias = ll_splice_alias(inode, *dentryp);
				if (IS_ERR(alias)) {
					ll_intent_release(&it);
					GOTO(out, rc = PTR_ERR(alias));
				}
				*dentryp = alias;
				/*
				 * statahead prepared this inode, transfer inode
				 * refcount from sa_entry to dentry
				 */
				entry->se_inode = NULL;
			} else if ((*dentryp)->d_inode != inode) {
				/* revalidate, but inode is recreated */
				CDEBUG(D_READA,
				       "%s: stale dentry %pd inode " DFID", statahead inode "DFID "\n",
				       ll_i2sbi(inode)->ll_fsname, *dentryp,
				       PFID(ll_inode2fid((*dentryp)->d_inode)),
				       PFID(ll_inode2fid(inode)));
				ll_intent_release(&it);
				GOTO(out, rc = -ESTALE);
			}

			if ((bits & MDS_INODELOCK_LOOKUP) &&
			    d_lustre_invalid(*dentryp)) {
				d_lustre_revalidate(*dentryp);
				ll_update_dir_depth(dir, (*dentryp)->d_inode);
			}

			ll_intent_release(&it);
		}
	}
out:
	/*
	 * statahead cached sa_entry can be used only once, and will be killed
	 * right after use, so if lookup/revalidate accessed statahead cache,
	 * set dentry ldd_sa_generation to parent lli_sa_generation, later if we
	 * stat this file again, we know we've done statahead before, see
	 * dentry_may_statahead().
	 */
	if (lld_is_init(*dentryp))
		ll_d2d(*dentryp)->lld_sa_generation = lli->lli_sa_generation;
	sa_put(sai, entry);
	spin_lock(&lli->lli_sa_lock);
	/* a window slot may have freed up: kick the statahead thread */
	if (sai->sai_task)
		wake_up_process(sai->sai_task);
	spin_unlock(&lli->lli_sa_lock);

	RETURN(rc);
}
1572
1573 /**
1574  * start statahead thread
1575  *
1576  * \param[in] dir       parent directory
1577  * \param[in] dentry    dentry that triggers statahead, normally the first
1578  *                      dirent under @dir
1579  * \param[in] agl       indicate whether AGL is needed
1580  * \retval              -EAGAIN on success, because when this function is
1581  *                      called, it's already in lookup call, so client should
1582  *                      do it itself instead of waiting for statahead thread
1583  *                      to do it asynchronously.
1584  * \retval              negative number upon error
1585  */
static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
                                  bool agl)
{
	int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
	struct ll_inode_info *lli = ll_i2info(dir);
	struct ll_statahead_info *sai = NULL;
	struct dentry *parent = dentry->d_parent;
	struct task_struct *task;
	struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode);
	int first = LS_FIRST_DE;
	int rc = 0;

	ENTRY;

	/* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */
	first = is_first_dirent(dir, dentry);
	if (first == LS_NOT_FIRST_DE)
		/* It is not "ls -{a}l" operation, no need statahead for it. */
		GOTO(out, rc = -EFAULT);
	/* NOTE(review): a negative error from is_first_dirent() is not
	 * LS_NOT_FIRST_DE, so we fall through and still try to start
	 * statahead -- confirm that is intended */

	/* throttle the number of concurrent statahead instances per sb */
	if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
				       sbi->ll_sa_running_max)) {
		CDEBUG(D_READA,
		       "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n");
		GOTO(out, rc = -EMFILE);
	}

	sai = ll_sai_alloc(parent);
	if (!sai)
		GOTO(out, rc = -ENOMEM);

	/* "ls -a" style scan if the triggering dirent was hidden */
	sai->sai_ls_all = (first == LS_FIRST_DOT_DE);

	/*
	 * if current lli_opendir_key was deauthorized, or dir re-opened by
	 * another process, don't start statahead, otherwise the newly spawned
	 * statahead thread won't be notified to quit.
	 */
	spin_lock(&lli->lli_sa_lock);
	if (unlikely(lli->lli_sai || !lli->lli_opendir_key ||
		     lli->lli_opendir_pid != current->pid)) {
		spin_unlock(&lli->lli_sa_lock);
		GOTO(out, rc = -EPERM);
	}
	lli->lli_sai = sai;
	spin_unlock(&lli->lli_sa_lock);

	CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n",
	       current->pid, parent);

	task = kthread_create_on_node(ll_statahead_thread, parent, node,
				      "ll_sa_%u", lli->lli_opendir_pid);
	if (IS_ERR(task)) {
		/* undo the lli_sai publication done above */
		spin_lock(&lli->lli_sa_lock);
		lli->lli_sai = NULL;
		spin_unlock(&lli->lli_sa_lock);
		rc = PTR_ERR(task);
		CERROR("can't start ll_sa thread, rc: %d\n", rc);
		GOTO(out, rc);
	}

	if (test_bit(LL_SBI_AGL_ENABLED, ll_i2sbi(parent->d_inode)->ll_flags) &&
	    agl)
		ll_start_agl(parent, sai);

	atomic_inc(&ll_i2sbi(parent->d_inode)->ll_sa_total);
	/* publish the task last; it must be set before the thread runs */
	sai->sai_task = task;

	wake_up_process(task);
	/*
	 * We don't stat-ahead for the first dirent since we are already in
	 * lookup.
	 */
	RETURN(-EAGAIN);

out:
	/*
	 * once we start statahead thread failed, disable statahead so that
	 * subsequent stat won't waste time to try it.
	 */
	spin_lock(&lli->lli_sa_lock);
	if (lli->lli_opendir_pid == current->pid)
		lli->lli_sa_enabled = 0;
	spin_unlock(&lli->lli_sa_lock);

	if (sai)
		ll_sai_free(sai);
	/* balance the ll_sa_running increment taken after the first check */
	if (first != LS_NOT_FIRST_DE)
		atomic_dec(&sbi->ll_sa_running);

	RETURN(rc);
}
1678
1679 /*
1680  * Check whether statahead for @dir was started.
1681  */
1682 static inline bool ll_statahead_started(struct inode *dir, bool agl)
1683 {
1684         struct ll_inode_info *lli = ll_i2info(dir);
1685         struct ll_statahead_info *sai;
1686
1687         spin_lock(&lli->lli_sa_lock);
1688         sai = lli->lli_sai;
1689         if (sai && (sai->sai_agl_task != NULL) != agl)
1690                 CDEBUG(D_READA,
1691                        "%s: Statahead AGL hint changed from %d to %d\n",
1692                        ll_i2sbi(dir)->ll_fsname,
1693                        sai->sai_agl_task != NULL, agl);
1694         spin_unlock(&lli->lli_sa_lock);
1695
1696         return !!sai;
1697 }
1698
1699 /**
1700  * statahead entry function, this is called when client getattr on a file, it
1701  * will start statahead thread if this is the first dir entry, else revalidate
1702  * dentry from statahead cache.
1703  *
1704  * \param[in]  dir      parent directory
1705  * \param[out] dentryp  dentry to getattr
1706  * \param[in]  agl      whether start the agl thread
1707  *
1708  * \retval              1 on success
1709  * \retval              0 revalidation from statahead cache failed, caller needs
1710  *                      to getattr from server directly
1711  * \retval              negative number on error, caller often ignores this and
1712  *                      then getattr from server
1713  */
1714 int ll_start_statahead(struct inode *dir, struct dentry *dentry, bool agl)
1715 {
1716         if (!ll_statahead_started(dir, agl))
1717                 return start_statahead_thread(dir, dentry, agl);
1718         return 0;
1719 }
1720
1721 /**
1722  * revalidate dentry from statahead cache.
1723  *
1724  * \param[in]  dir      parent directory
1725  * \param[out] dentryp  dentry to getattr
1726  * \param[in]  unplug   unplug statahead window only (normally for negative
1727  *                      dentry)
1728  * \retval              1 on success
1729  * \retval              0 revalidation from statahead cache failed, caller needs
1730  *                      to getattr from server directly
1731  * \retval              negative number on error, caller often ignores this and
1732  *                      then getattr from server
1733  */
1734 int ll_revalidate_statahead(struct inode *dir, struct dentry **dentryp,
1735                             bool unplug)
1736 {
1737         struct ll_statahead_info *sai;
1738         int rc = 0;
1739
1740         sai = ll_sai_get(dir);
1741         if (sai) {
1742                 rc = revalidate_statahead_dentry(dir, sai, dentryp, unplug);
1743                 CDEBUG(D_READA, "revalidate statahead %pd: rc = %d.\n",
1744                        *dentryp, rc);
1745                 ll_sai_put(sai);
1746         }
1747         return rc;
1748 }