Whamcloud - gitweb
LU-15971 llite: implicit default LMV inherit
[fs/lustre-release.git] / lustre / llite / statahead.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  */
31
32 #include <linux/fs.h>
33 #include <linux/sched.h>
34 #include <linux/kthread.h>
35 #include <linux/mm.h>
36 #include <linux/highmem.h>
37 #include <linux/pagemap.h>
38 #include <linux/delay.h>
39
40 #define DEBUG_SUBSYSTEM S_LLITE
41
42 #include <obd_support.h>
43 #include <lustre_dlm.h>
44 #include "llite_internal.h"
45
46 #define SA_OMITTED_ENTRY_MAX 8ULL
47
/* life-cycle state of a sa_entry; negative values are reserved for errors */
typedef enum {
        SA_ENTRY_INIT = 0,      /* initial state, async stat not finished */
        SA_ENTRY_SUCC = 1,      /* async stat succeeded */
        SA_ENTRY_INVA = 2,      /* entry is invalid (stat failed or stale) */
} se_state_t;
54
/*
 * sa_entry is not refcounted: the statahead thread allocates it and issues the
 * async stat; the async stat callback ll_statahead_interpret() prepares the
 * inode and sets lock data in ptlrpcd context.  The scanner process is then
 * woken up if this entry is the one it waits on, and it alone may access and
 * free the entry afterwards.
 */
struct sa_entry {
        /* link into sai_entries (sorted by se_index, see __sa_make_ready) */
        struct list_head        se_list;
        /* link into one sai_cache[] hash bucket (keyed by name hash) */
        struct list_head        se_hash;
        /* entry index in the sai, assigned from sai_index at alloc time */
        __u64                   se_index;
        /* low layer ldlm lock handle saved for later md_revalidate_lock() */
        __u64                   se_handle;
        /* entry status, one of se_state_t */
        se_state_t              se_state;
        /* total allocation size, includes the inline name copy at the end */
        int                     se_size;
        /* pointer to the target inode (reference held until sa_kill) */
        struct inode           *se_inode;
        /* entry name; se_qstr.name points at the inline copy */
        struct qstr             se_qstr;
        /* entry fid */
        struct lu_fid           se_fid;
};
81
82 static unsigned int sai_generation;
83 static DEFINE_SPINLOCK(sai_generation_lock);
84
85 static inline int sa_unhashed(struct sa_entry *entry)
86 {
87         return list_empty(&entry->se_hash);
88 }
89
90 /* sa_entry is ready to use */
91 static inline int sa_ready(struct sa_entry *entry)
92 {
93         /* Make sure sa_entry is updated and ready to use */
94         smp_rmb();
95         return (entry->se_state != SA_ENTRY_INIT);
96 }
97
98 /* hash value to put in sai_cache */
99 static inline int sa_hash(int val)
100 {
101         return val & LL_SA_CACHE_MASK;
102 }
103
104 /* hash entry into sai_cache */
105 static inline void
106 sa_rehash(struct ll_statahead_info *sai, struct sa_entry *entry)
107 {
108         int i = sa_hash(entry->se_qstr.hash);
109
110         spin_lock(&sai->sai_cache_lock[i]);
111         list_add_tail(&entry->se_hash, &sai->sai_cache[i]);
112         spin_unlock(&sai->sai_cache_lock[i]);
113 }
114
115 /* unhash entry from sai_cache */
116 static inline void
117 sa_unhash(struct ll_statahead_info *sai, struct sa_entry *entry)
118 {
119         int i = sa_hash(entry->se_qstr.hash);
120
121         spin_lock(&sai->sai_cache_lock[i]);
122         list_del_init(&entry->se_hash);
123         spin_unlock(&sai->sai_cache_lock[i]);
124 }
125
126 static inline int agl_should_run(struct ll_statahead_info *sai,
127                                  struct inode *inode)
128 {
129         return inode && S_ISREG(inode->i_mode) && sai->sai_agl_task;
130 }
131
/* first inode queued for AGL processing; caller must know the list is
 * non-empty (see agl_list_empty()) */
static inline struct ll_inode_info *
agl_first_entry(struct ll_statahead_info *sai)
{
        return list_first_entry(&sai->sai_agls, struct ll_inode_info,
                                lli_agl_list);
}
138
139 /* statahead window is full */
140 static inline int sa_sent_full(struct ll_statahead_info *sai)
141 {
142         return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
143 }
144
145 /* Batch metadata handle */
146 static inline bool sa_has_batch_handle(struct ll_statahead_info *sai)
147 {
148         return sai->sai_bh != NULL;
149 }
150
/*
 * Flush any batched getattr RPCs without waiting for replies.  Records the
 * last issued index in sai_index_end so repliers can detect batch boundaries
 * (see sa_put()).  No-op when batching is not in use.
 */
static inline void ll_statahead_flush_nowait(struct ll_statahead_info *sai)
{
        if (sa_has_batch_handle(sai)) {
                /* sai_index is the NEXT index to use, so the last sent one
                 * is sai_index - 1 */
                sai->sai_index_end = sai->sai_index - 1;
                (void) md_batch_flush(ll_i2mdexp(sai->sai_dentry->d_inode),
                                      sai->sai_bh, false);
        }
}
159
160 static inline int agl_list_empty(struct ll_statahead_info *sai)
161 {
162         return list_empty(&sai->sai_agls);
163 }
164
/**
 * Statahead is considered to have a low hit rate when either:
 * (1) the hit ratio is below 80% (hit < 4 * miss, once there are enough
 *     samples, i.e. more than 7 hits), or
 * (2) there are more than 8 consecutive misses.
 */
static inline int sa_low_hit(struct ll_statahead_info *sai)
{
        return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) ||
                (sai->sai_consecutive_miss > 8));
}
176
/*
 * An index is "omitted" (too old to be useful) when it has fallen behind the
 * statahead window by more than SA_OMITTED_ENTRY_MAX entries, i.e.
 * index < sai_index - sai_max - SA_OMITTED_ENTRY_MAX (rearranged below to
 * avoid underflow).
 */
static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
{
        return ((__u64)sai->sai_max + index + SA_OMITTED_ENTRY_MAX <
                sai->sai_index);
}
186
/*
 * Allocate a sa_entry with the name stored inline after the struct, and hash
 * it into sai_cache so the scanner process can find it by name.
 *
 * \param parent  dentry of the directory being scanned (for name hashing)
 * \param sai     statahead info the entry belongs to
 * \param index   statahead index assigned to this entry
 * \param name    entry name (not necessarily NUL-terminated)
 * \param len     length of @name
 * \param fid     fid of the entry
 *
 * \retval  pointer to the new entry, or ERR_PTR(-ENOMEM)
 */
static struct sa_entry *
sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
         const char *name, int len, const struct lu_fid *fid)
{
        struct ll_inode_info *lli;
        struct sa_entry *entry;
        int entry_size;
        char *dname;

        ENTRY;

        /* round the inline name up to a 4-byte boundary, including room for
         * the NUL terminator */
        entry_size = sizeof(struct sa_entry) + (len & ~3) + 4;
        OBD_ALLOC(entry, entry_size);
        if (unlikely(!entry))
                RETURN(ERR_PTR(-ENOMEM));

        CDEBUG(D_READA, "alloc sa entry %.*s(%p) index %llu\n",
               len, name, entry, index);

        entry->se_index = index;

        entry->se_state = SA_ENTRY_INIT;
        entry->se_size = entry_size;
        /* copy the name into the space allocated just past the struct */
        dname = (char *)entry + sizeof(struct sa_entry);
        memcpy(dname, name, len);
        dname[len] = 0;
        entry->se_qstr.hash = ll_full_name_hash(parent, name, len);
        entry->se_qstr.len = len;
        entry->se_qstr.name = dname;
        entry->se_fid = *fid;

        lli = ll_i2info(sai->sai_dentry->d_inode);

        spin_lock(&lli->lli_sa_lock);
        INIT_LIST_HEAD(&entry->se_list);
        sa_rehash(sai, entry);
        spin_unlock(&lli->lli_sa_lock);

        /* counts toward the statahead window, see sa_sent_full() */
        atomic_inc(&sai->sai_cache_count);

        RETURN(entry);
}
230
/*
 * Free a sa_entry and decrement the cache count.  The entry must already be
 * unhashed and removed from sai_entries (asserted below).
 */
static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry)
{
        CDEBUG(D_READA, "free sa entry %.*s(%p) index %llu\n",
               entry->se_qstr.len, entry->se_qstr.name, entry,
               entry->se_index);

        LASSERT(list_empty(&entry->se_list));
        LASSERT(sa_unhashed(entry));

        OBD_FREE(entry, entry->se_size);
        atomic_dec(&sai->sai_cache_count);
}
244
/*
 * Find a sa_entry by name.  Used by the directory scanner only; no lock is
 * needed because only the scanner itself removes entries from the cache, so
 * the bucket list cannot change underneath it.
 *
 * \retval  matching entry, or NULL if not cached
 */
static struct sa_entry *
sa_get(struct ll_statahead_info *sai, const struct qstr *qstr)
{
        struct sa_entry *entry;
        int i = sa_hash(qstr->hash);

        list_for_each_entry(entry, &sai->sai_cache[i], se_hash) {
                /* compare hash first as a cheap filter before memcmp */
                if (entry->se_qstr.hash == qstr->hash &&
                    entry->se_qstr.len == qstr->len &&
                    memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0)
                        return entry;
        }
        return NULL;
}
263
/*
 * Unhash and unlink a completed sa_entry, drop its inode reference, then free
 * it.  Only valid for entries that are hashed, linked and ready (asserted).
 */
static inline void
sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry)
{
        struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);

        LASSERT(!sa_unhashed(entry));
        LASSERT(!list_empty(&entry->se_list));
        LASSERT(sa_ready(entry));

        sa_unhash(sai, entry);

        /* se_list is protected by the parent's lli_sa_lock */
        spin_lock(&lli->lli_sa_lock);
        list_del_init(&entry->se_list);
        spin_unlock(&lli->lli_sa_lock);

        /* drop the reference taken when the stat reply attached the inode */
        iput(entry->se_inode);

        sa_free(sai, entry);
}
284
/*
 * Called by the scanner after it has consumed @entry (may be NULL on a
 * cache miss).  Updates hit/miss statistics, adaptively grows the statahead
 * window on hits, kills the consumed entry plus any entries that have fallen
 * out of the window, and wakes the statahead thread when it should send more
 * RPCs.
 */
static void
sa_put(struct inode *dir, struct ll_statahead_info *sai, struct sa_entry *entry)
{
        struct ll_inode_info *lli = ll_i2info(dir);
        struct sa_entry *tmp, *next;
        bool wakeup = false;

        if (entry && entry->se_state == SA_ENTRY_SUCC) {
                struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);

                sai->sai_hit++;
                sai->sai_consecutive_miss = 0;
                if (sai->sai_max < sbi->ll_sa_max) {
                        /* window not yet at the tunable limit: double it */
                        sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
                        wakeup = true;
                } else if (sai->sai_max_batch_count > 0) {
                        /* with batching, only wake at batch boundaries or at
                         * the end of the flushed range to amortize wakeups */
                        if (sai->sai_max >= sai->sai_max_batch_count &&
                           (sai->sai_index_end - entry->se_index) %
                           sai->sai_max_batch_count == 0) {
                                wakeup = true;
                        } else if (entry->se_index == sai->sai_index_end) {
                                wakeup = true;
                        }
                } else {
                        wakeup = true;
                }
        } else {
                sai->sai_miss++;
                sai->sai_consecutive_miss++;
                wakeup = true;
        }

        if (entry)
                sa_kill(sai, entry);

        /*
         * kill old completed entries, only scanner process does this, no need
         * to lock
         */
        list_for_each_entry_safe(tmp, next, &sai->sai_entries, se_list) {
                if (!is_omitted_entry(sai, tmp->se_index))
                        break;
                sa_kill(sai, tmp);
        }

        spin_lock(&lli->lli_sa_lock);
        if (wakeup && sai->sai_task)
                wake_up_process(sai->sai_task);
        spin_unlock(&lli->lli_sa_lock);
}
336
/*
 * Update the entry state and insert it into sai_entries, keeping the list
 * sorted by se_index.  Caller must hold lli_sa_lock.
 *
 * \retval  true if the scanner is currently waiting on exactly this entry
 *          (caller should wake it up)
 */
static bool
__sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
{
        struct sa_entry *se;
        struct list_head *pos = &sai->sai_entries;
        __u64 index = entry->se_index;

        LASSERT(!sa_ready(entry));
        LASSERT(list_empty(&entry->se_list));

        /* walk backwards: replies usually arrive roughly in index order, so
         * the insertion point tends to be near the tail */
        list_for_each_entry_reverse(se, &sai->sai_entries, se_list) {
                if (se->se_index < entry->se_index) {
                        pos = &se->se_list;
                        break;
                }
        }
        list_add(&entry->se_list, pos);
        /*
         * LU-9210: ll_statahead_interpret must be able to see this before
         * we wake it up
         */
        smp_store_release(&entry->se_state,
                          ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);

        return (index == sai->sai_index_wait);
}
367
/*
 * Release the resources attached to an async stat RPC item: the op_name (if
 * it was kmalloc'ed by ll_setup_filename), the lsm lock, the directory inode
 * reference taken in sa_prep_data, the optional sub-pill and the item itself.
 */
static void sa_fini_data(struct md_op_item *item)
{
        struct md_op_data *op_data = &item->mop_data;

        if (op_data->op_flags & MF_OPNAME_KMALLOCED)
                /* allocated via ll_setup_filename called from sa_prep_data */
                kfree(op_data->op_name);
        ll_unlock_md_op_lsm(&item->mop_data);
        iput(item->mop_dir);
        if (item->mop_subpill_allocated)
                OBD_FREE_PTR(item->mop_pill);
        OBD_FREE_PTR(item);
}
382
383 static int ll_statahead_interpret(struct md_op_item *item, int rc);
384
/*
 * Prepare the md_op_item describing an async getattr RPC for @entry.
 *
 * \param dir    the directory being scanned
 * \param child  inode of the entry if already in dcache, NULL for a fresh
 *               lookup by fid
 * \param entry  the sa_entry being stated
 *
 * \retval  a new item (freed via sa_fini_data), or an ERR_PTR on failure
 */
static struct md_op_item *
sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
{
        struct md_op_item *item;
        struct ldlm_enqueue_info *einfo;
        struct md_op_data *op_data;

        OBD_ALLOC_PTR(item);
        if (!item)
                return ERR_PTR(-ENOMEM);

        op_data = ll_prep_md_op_data(&item->mop_data, dir, child,
                                     entry->se_qstr.name, entry->se_qstr.len, 0,
                                     LUSTRE_OPC_ANY, NULL);
        if (IS_ERR(op_data)) {
                OBD_FREE_PTR(item);
                return (struct md_op_item *)op_data;
        }

        /* no cached inode: target the RPC by the fid from the dir page */
        if (!child)
                op_data->op_fid2 = entry->se_fid;

        item->mop_opc = MD_OP_GETATTR;
        item->mop_it.it_op = IT_GETATTR;
        /* hold the dir across the async RPC; dropped in sa_fini_data */
        item->mop_dir = igrab(dir);
        item->mop_cb = ll_statahead_interpret;
        item->mop_cbdata = entry;

        einfo = &item->mop_einfo;
        einfo->ei_type = LDLM_IBITS;
        einfo->ei_mode = it_to_lock_mode(&item->mop_it);
        einfo->ei_cb_bl = ll_md_blocking_ast;
        einfo->ei_cb_cp = ldlm_completion_ast;
        einfo->ei_cb_gl = NULL;
        einfo->ei_cbdata = NULL;
        einfo->ei_req_slot = 1;

        return item;
}
427
/*
 * Mark @entry completed with result @ret, and wake the scanner process if it
 * is waiting on this entry.
 */
static void
sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
{
        struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
        bool wakeup;

        spin_lock(&lli->lli_sa_lock);
        wakeup = __sa_make_ready(sai, entry, ret);
        spin_unlock(&lli->lli_sa_lock);

        if (wakeup)
                wake_up(&sai->sai_waitq);
}
445
/*
 * Queue @inode on sai_agls for async glimpse lock processing.  A non-zero
 * lli_agl_index marks the inode as already queued; the child lock guards that
 * flag while the parent lock guards the list itself.  Takes an inode
 * reference which the AGL thread drops (see ll_agl_trigger).
 */
static void ll_agl_add(struct ll_statahead_info *sai,
                       struct inode *inode, int index)
{
        struct ll_inode_info *child  = ll_i2info(inode);
        struct ll_inode_info *parent = ll_i2info(sai->sai_dentry->d_inode);

        spin_lock(&child->lli_agl_lock);
        if (child->lli_agl_index == 0) {
                child->lli_agl_index = index;
                spin_unlock(&child->lli_agl_lock);

                LASSERT(list_empty(&child->lli_agl_list));

                spin_lock(&parent->lli_agl_lock);
                /* Re-check under the lock */
                if (agl_should_run(sai, inode)) {
                        /* wake the AGL thread only on empty->non-empty
                         * transition; otherwise it is already working */
                        if (agl_list_empty(sai))
                                wake_up_process(sai->sai_agl_task);
                        igrab(inode);
                        list_add_tail(&child->lli_agl_list, &sai->sai_agls);
                } else
                        child->lli_agl_index = 0;
                spin_unlock(&parent->lli_agl_lock);
        } else {
                spin_unlock(&child->lli_agl_lock);
        }
}
474
/*
 * Allocate and initialize a statahead info for @dentry's directory, with one
 * reference held by the caller.  Also bumps the global sai generation and
 * records it in the dir's lli so stale statahead state can be detected.
 *
 * \retval  new sai, or NULL on allocation failure
 */
static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
{
        struct ll_statahead_info *sai;
        struct ll_inode_info *lli = ll_i2info(dentry->d_inode);
        int i;

        ENTRY;

        OBD_ALLOC_PTR(sai);
        if (!sai)
                RETURN(NULL);

        sai->sai_dentry = dget(dentry);
        atomic_set(&sai->sai_refcount, 1);
        sai->sai_max = LL_SA_RPC_MIN;
        sai->sai_index = 1;
        init_waitqueue_head(&sai->sai_waitq);

        INIT_LIST_HEAD(&sai->sai_entries);
        INIT_LIST_HEAD(&sai->sai_agls);

        for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
                INIT_LIST_HEAD(&sai->sai_cache[i]);
                spin_lock_init(&sai->sai_cache_lock[i]);
        }
        atomic_set(&sai->sai_cache_count, 0);

        spin_lock(&sai_generation_lock);
        lli->lli_sa_generation = ++sai_generation;
        /* generation 0 is reserved as "no statahead yet"; skip it on wrap */
        if (unlikely(sai_generation == 0))
                lli->lli_sa_generation = ++sai_generation;
        spin_unlock(&sai_generation_lock);

        RETURN(sai);
}
511
/* drop the sai's dentry reference and free the sai itself */
static inline void ll_sai_free(struct ll_statahead_info *sai)
{
        LASSERT(sai->sai_dentry != NULL);
        dput(sai->sai_dentry);
        OBD_FREE_PTR(sai);
}
519
/*
 * Take a refcount on the sai of @dir if one exists, which means statahead is
 * currently running on this directory.
 *
 * \retval  referenced sai, or NULL when statahead is not active
 */
static inline struct ll_statahead_info *ll_sai_get(struct inode *dir)
{
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai = NULL;

        spin_lock(&lli->lli_sa_lock);
        sai = lli->lli_sai;
        if (sai)
                atomic_inc(&sai->sai_refcount);
        spin_unlock(&lli->lli_sa_lock);

        return sai;
}
537
/*
 * Put a sai refcount after use.  The last put detaches the sai from the dir
 * under lli_sa_lock (atomic_dec_and_lock), kills all remaining sa_entries
 * and frees the sai.  Both threads must already have exited, and all sent
 * RPCs must have been replied to (asserted).
 */
static void ll_sai_put(struct ll_statahead_info *sai)
{
        struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);

        if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
                struct sa_entry *entry, *next;
                struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);

                lli->lli_sai = NULL;
                spin_unlock(&lli->lli_sa_lock);

                LASSERT(!sai->sai_task);
                LASSERT(!sai->sai_agl_task);
                LASSERT(sai->sai_sent == sai->sai_replied);

                list_for_each_entry_safe(entry, next, &sai->sai_entries,
                                         se_list)
                        sa_kill(sai, entry);

                LASSERT(atomic_read(&sai->sai_cache_count) == 0);
                LASSERT(agl_list_empty(sai));

                ll_sai_free(sai);
                atomic_dec(&sbi->ll_sa_running);
        }
}
568
/*
 * Issue an async glimpse (cl_agl) for one inode popped from sai_agls.
 * Several early-out conditions skip the glimpse entirely.  On every path the
 * inode reference taken by ll_agl_add() is dropped and lli_agl_index is
 * cleared.  Do NOT forget to drop the inode refcount when exiting.
 */
static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        u64 index = lli->lli_agl_index;
        ktime_t expire;
        int rc;

        ENTRY;

        LASSERT(list_empty(&lli->lli_agl_list));

        /* AGL maybe fall behind statahead with one entry */
        if (is_omitted_entry(sai, index + 1)) {
                lli->lli_agl_index = 0;
                iput(inode);
                RETURN_EXIT;
        }

        /*
         * In case of restore, the MDT has the right size and has already
         * sent it back without granting the layout lock, inode is up-to-date.
         * Then AGL (async glimpse lock) is useless.
         * Also to glimpse we need the layout, in case of a running restore
         * the MDT holds the layout lock so the glimpse will block up to the
         * end of restore (statahead/agl will block)
         */
        if (test_bit(LLIF_FILE_RESTORING, &lli->lli_flags)) {
                lli->lli_agl_index = 0;
                iput(inode);
                RETURN_EXIT;
        }

        /* Someone is in glimpse (sync or async), do nothing. */
        rc = down_write_trylock(&lli->lli_glimpse_sem);
        if (rc == 0) {
                lli->lli_agl_index = 0;
                iput(inode);
                RETURN_EXIT;
        }

        /*
         * Someone triggered glimpse within 1 sec before.
         * 1) The former glimpse succeeded with glimpse lock granted by OST, and
         *    if the lock is still cached on client, AGL needs to do nothing. If
         *    it is cancelled by other client, AGL maybe cannot obtain new lock
         *    for no glimpse callback triggered by AGL.
         * 2) The former glimpse succeeded, but OST did not grant glimpse lock.
         *    Under such case, it is quite possible that the OST will not grant
         *    glimpse lock for AGL also.
         * 3) The former glimpse failed, compared with other two cases, it is
         *    relative rare. AGL can ignore such case, and it will not much
         *    affect the performance.
         */
        expire = ktime_sub_ns(ktime_get(), NSEC_PER_SEC);
        if (ktime_to_ns(lli->lli_glimpse_time) &&
            ktime_before(expire, lli->lli_glimpse_time)) {
                up_write(&lli->lli_glimpse_sem);
                lli->lli_agl_index = 0;
                iput(inode);
                RETURN_EXIT;
        }

        CDEBUG(D_READA,
               "Handling (init) async glimpse: inode = " DFID", idx = %llu\n",
               PFID(&lli->lli_fid), index);

        cl_agl(inode);
        lli->lli_agl_index = 0;
        lli->lli_glimpse_time = ktime_get();
        up_write(&lli->lli_glimpse_sem);

        CDEBUG(D_READA,
               "Handled (init) async glimpse: inode= " DFID", idx = %llu, rc = %d\n",
               PFID(&lli->lli_fid), index, rc);

        iput(inode);

        EXIT;
}
649
/*
 * Common tail of async stat interpretation: release the intent and item,
 * finish the request (if any), mark the entry ready with @rc, and account the
 * reply so the statahead thread can wait for all inflight RPCs.
 */
static void ll_statahead_interpret_fini(struct ll_inode_info *lli,
                                        struct ll_statahead_info *sai,
                                        struct md_op_item *item,
                                        struct sa_entry *entry,
                                        struct ptlrpc_request *req,
                                        int rc)
{
        /*
         * First it will drop ldlm ibits lock refcount by calling
         * ll_intent_drop_lock() in spite of failures. Do not worry about
         * calling ll_intent_drop_lock() more than once.
         */
        ll_intent_release(&item->mop_it);
        sa_fini_data(item);
        if (req)
                ptlrpc_req_finished(req);
        sa_make_ready(sai, entry, rc);

        spin_lock(&lli->lli_sa_lock);
        sai->sai_replied++;
        spin_unlock(&lli->lli_sa_lock);
}
672
/*
 * Workqueue half of async stat interpretation, deferred from
 * ll_statahead_interpret() because preparing the inode may generate new RPCs
 * (e.g. striped dirs, layout change) which must not block a ptlrpcd thread.
 * Revalidates the saved lock, prepares the child inode, stores a returned
 * encryption context, sets lock data, and finally completes the entry.
 */
static void ll_statahead_interpret_work(struct work_struct *work)
{
        struct md_op_item *item = container_of(work, struct md_op_item,
                                               mop_work);
        struct req_capsule *pill = item->mop_pill;
        struct inode *dir = item->mop_dir;
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai = lli->lli_sai;
        struct lookup_intent *it;
        struct sa_entry *entry;
        struct mdt_body *body;
        struct inode *child;
        int rc;

        ENTRY;

        entry = (struct sa_entry *)item->mop_cbdata;
        /* ll_statahead_interpret() only queues work after saving a handle */
        LASSERT(entry->se_handle != 0);

        it = &item->mop_it;
        body = req_capsule_server_get(pill, &RMF_MDT_BODY);
        if (!body)
                GOTO(out, rc = -EFAULT);

        child = entry->se_inode;
        /* revalidate; unlinked and re-created with the same name */
        if (unlikely(!lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
                if (child) {
                        entry->se_inode = NULL;
                        iput(child);
                }
                /* The mdt_body is invalid. Skip this entry */
                GOTO(out, rc = -EAGAIN);
        }

        /* re-acquire the ibits lock that was dropped in ptlrpcd context */
        it->it_lock_handle = entry->se_handle;
        rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
        if (rc != 1)
                GOTO(out, rc = -EAGAIN);

        rc = ll_prep_inode(&child, pill, dir->i_sb, it);
        if (rc) {
                CERROR("%s: getattr callback for %.*s "DFID": rc = %d\n",
                       ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
                       entry->se_qstr.name, PFID(&entry->se_fid), rc);
                GOTO(out, rc);
        }

        /* If encryption context was returned by MDT, put it in
         * inode now to save an extra getxattr.
         */
        if (body->mbo_valid & OBD_MD_ENCCTX) {
                void *encctx = req_capsule_server_get(pill, &RMF_FILE_ENCCTX);
                __u32 encctxlen = req_capsule_get_size(pill, &RMF_FILE_ENCCTX,
                                                       RCL_SERVER);

                if (encctxlen) {
                        CDEBUG(D_SEC,
                               "server returned encryption ctx for "DFID"\n",
                               PFID(ll_inode2fid(child)));
                        rc = ll_xattr_cache_insert(child,
                                                   xattr_for_enc(child),
                                                   encctx, encctxlen);
                        if (rc)
                                CWARN("%s: cannot set enc ctx for "DFID": rc = %d\n",
                                      ll_i2sbi(child)->ll_fsname,
                                      PFID(ll_inode2fid(child)), rc);
                }
        }

        CDEBUG(D_READA, "%s: setting %.*s"DFID" l_data to inode %p\n",
               ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
               entry->se_qstr.name, PFID(ll_inode2fid(child)), child);
        ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);

        entry->se_inode = child;

        if (agl_should_run(sai, child))
                ll_agl_add(sai, child, entry->se_index);
out:
        /* pass the request ref taken before schedule_work() to fini */
        ll_statahead_interpret_fini(lli, sai, item, entry, pill->rc_req, rc);
}
755
/*
 * Callback for async stat RPC, this is called in ptlrpcd context.  It saves
 * the lock handle, drops the ibits lock (to avoid a deadlock against lock
 * enqueues on the parent, e.g. unlink), and defers the potentially blocking
 * inode preparation to a workqueue (ll_statahead_interpret_work), which will
 * wake up the directory listing process if the dentry is the waiting one.
 *
 * \retval  0 when work was queued, negative errno when the entry failed
 */
static int ll_statahead_interpret(struct md_op_item *item, int rc)
{
        struct req_capsule *pill = item->mop_pill;
        struct lookup_intent *it = &item->mop_it;
        struct inode *dir = item->mop_dir;
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai = lli->lli_sai;
        struct sa_entry *entry = (struct sa_entry *)item->mop_cbdata;
        struct work_struct *work = &item->mop_work;
        struct mdt_body *body;
        struct inode *child;
        __u64 handle = 0;

        ENTRY;

        if (it_disposition(it, DISP_LOOKUP_NEG))
                rc = -ENOENT;

        /*
         * because statahead thread will wait for all inflight RPC to finish,
         * sai should be always valid, no need to refcount
         */
        LASSERT(sai != NULL);
        LASSERT(entry != NULL);

        CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
               entry->se_qstr.len, entry->se_qstr.name, rc);

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(pill, &RMF_MDT_BODY);
        if (!body)
                GOTO(out, rc = -EFAULT);

        child = entry->se_inode;
        /* revalidate; unlinked and re-created with the same name */
        if (unlikely(!lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
                if (child) {
                        entry->se_inode = NULL;
                        iput(child);
                }
                /* The mdt_body is invalid. Skip this entry */
                GOTO(out, rc = -EAGAIN);
        }

        entry->se_handle = it->it_lock_handle;
        /*
         * In ptlrpcd context, it is not allowed to generate new RPCs
         * especially for striped directories or regular files with layout
         * change.
         */
        /*
         * release ibits lock ASAP to avoid deadlock when statahead
         * thread enqueues lock on parent in readdir and another
         * process enqueues lock on child with parent lock held, eg.
         * unlink.
         */
        handle = it->it_lock_handle;
        ll_intent_drop_lock(it);
        ll_unlock_md_op_lsm(&item->mop_data);

        /*
         * If the statahead entry is a striped directory or regular file with
         * layout change, it will generate a new RPC and long wait in the
         * ptlrpcd context.
         * However, it is dangerous of blocking in ptlrpcd thread.
         * Here we use work queue or the separate statahead thread to handle
         * the extra RPC and long wait:
         *      (@ll_prep_inode->@lmv_revalidate_slaves);
         *      (@ll_prep_inode->@lov_layout_change->osc_cache_wait_range);
         */
        INIT_WORK(work, ll_statahead_interpret_work);
        /* keep the request alive for the work handler; dropped in fini */
        ptlrpc_request_addref(pill->rc_req);
        schedule_work(work);
        RETURN(0);
out:
        ll_statahead_interpret_fini(lli, sai, item, entry, NULL, rc);
        RETURN(rc);
}
841
842 static inline int sa_getattr(struct inode *dir, struct md_op_item *item)
843 {
844         struct ll_statahead_info *sai = ll_i2info(dir)->lli_sai;
845         int rc;
846
847         if (sa_has_batch_handle(sai))
848                 rc = md_batch_add(ll_i2mdexp(dir), sai->sai_bh, item);
849         else
850                 rc = md_intent_getattr_async(ll_i2mdexp(dir), item);
851
852         return rc;
853 }
854
/*
 * Issue an async stat for a file not found in the dcache.
 *
 * \retval  0 on success, negative errno on failure (item is cleaned up here)
 */
static int sa_lookup(struct inode *dir, struct sa_entry *entry)
{
        struct md_op_item *item;
        int rc;

        ENTRY;

        item = sa_prep_data(dir, NULL, entry);
        if (IS_ERR(item))
                RETURN(PTR_ERR(item));

        rc = sa_getattr(dir, item);
        /* on send failure the interpret callback will never run, so the
         * item must be freed here */
        if (rc < 0)
                sa_fini_data(item);

        RETURN(rc);
}
873
/**
 * async stat for file found in dcache, similar to .revalidate
 *
 * \param[in] dir	parent directory
 * \param[in] entry	sa_entry for the file being stated
 * \param[in] dentry	cached dentry found by d_lookup()
 *
 * \retval	1 dentry valid, no RPC sent
 * \retval	0 dentry invalid, will send async stat RPC
 * \retval	negative number upon error
 */
static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
			 struct dentry *dentry)
{
	struct inode *inode = dentry->d_inode;
	struct lookup_intent it = { .it_op = IT_GETATTR,
				    .it_lock_handle = 0 };
	struct md_op_item *item;
	int rc;

	ENTRY;

	/* negative dentry: nothing to stat, treat as valid */
	if (unlikely(!inode))
		RETURN(1);

	/* don't revalidate across a mountpoint */
	if (d_mountpoint(dentry))
		RETURN(1);

	item = sa_prep_data(dir, inode, entry);
	if (IS_ERR(item))
		RETURN(PTR_ERR(item));

	/* pin the inode for the duration of the async stat */
	entry->se_inode = igrab(inode);
	rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
				NULL);
	if (rc == 1) {
		/* a matching lock already exists on the client: record its
		 * handle for the eventual waiter and skip the RPC entirely
		 */
		entry->se_handle = it.it_lock_handle;
		ll_intent_release(&it);
		sa_fini_data(item);
		RETURN(1);
	}

	rc = sa_getattr(dir, item);
	if (rc < 0) {
		/* RPC not sent: undo the igrab() above */
		entry->se_inode = NULL;
		iput(inode);
		sa_fini_data(item);
	}

	RETURN(rc);
}
921
922 /* async stat for file with @name */
923 static void sa_statahead(struct dentry *parent, const char *name, int len,
924                          const struct lu_fid *fid)
925 {
926         struct inode *dir = parent->d_inode;
927         struct ll_inode_info *lli = ll_i2info(dir);
928         struct ll_statahead_info *sai = lli->lli_sai;
929         struct dentry *dentry = NULL;
930         struct sa_entry *entry;
931         int rc;
932
933         ENTRY;
934
935         entry = sa_alloc(parent, sai, sai->sai_index, name, len, fid);
936         if (IS_ERR(entry))
937                 RETURN_EXIT;
938
939         dentry = d_lookup(parent, &entry->se_qstr);
940         if (!dentry) {
941                 rc = sa_lookup(dir, entry);
942         } else {
943                 rc = sa_revalidate(dir, entry, dentry);
944                 if (rc == 1 && agl_should_run(sai, dentry->d_inode))
945                         ll_agl_add(sai, dentry->d_inode, entry->se_index);
946         }
947
948         if (dentry)
949                 dput(dentry);
950
951         if (rc != 0)
952                 sa_make_ready(sai, entry, rc);
953         else
954                 sai->sai_sent++;
955
956         sai->sai_index++;
957
958         if (sa_sent_full(sai))
959                 ll_statahead_flush_nowait(sai);
960
961         EXIT;
962 }
963
/* async glimpse (agl) thread main function */
static int ll_agl_thread(void *arg)
{
	struct dentry *parent = (struct dentry *)arg;
	struct inode *dir = parent->d_inode;
	struct ll_inode_info *plli = ll_i2info(dir);
	struct ll_inode_info *clli;
	/*
	 * We already own this reference, so it is safe to take it
	 * without a lock.
	 */
	struct ll_statahead_info *sai = plli->lli_sai;

	ENTRY;

	CDEBUG(D_READA, "agl thread started: sai %p, parent %pd\n",
	       sai, parent);

	/*
	 * Set TASK_IDLE *before* checking kthread_should_stop() so a
	 * concurrent kthread_stop() (which wakes us) cannot slip in
	 * between the check and the schedule() below.
	 */
	while (({set_current_state(TASK_IDLE);
		 !kthread_should_stop(); })) {
		spin_lock(&plli->lli_agl_lock);
		clli = list_first_entry_or_null(&sai->sai_agls,
						struct ll_inode_info,
						lli_agl_list);
		if (clli) {
			/* work queued: glimpse this child inode */
			__set_current_state(TASK_RUNNING);
			list_del_init(&clli->lli_agl_list);
			spin_unlock(&plli->lli_agl_lock);
			ll_agl_trigger(&clli->lli_vfs_inode, sai);
			cond_resched();
		} else {
			/* queue empty: sleep until woken with new work
			 * or told to stop
			 */
			spin_unlock(&plli->lli_agl_lock);
			schedule();
		}
	}
	__set_current_state(TASK_RUNNING);
	RETURN(0);
}
1002
/*
 * Stop the AGL thread, if one is running, and drain any child inodes
 * still queued for async glimpse, dropping the references they hold.
 */
static void ll_stop_agl(struct ll_statahead_info *sai)
{
	struct dentry *parent = sai->sai_dentry;
	struct ll_inode_info *plli = ll_i2info(parent->d_inode);
	struct ll_inode_info *clli;
	struct task_struct *agl_task;

	/* claim the task pointer under the lock so only one caller stops it */
	spin_lock(&plli->lli_agl_lock);
	agl_task = sai->sai_agl_task;
	sai->sai_agl_task = NULL;
	spin_unlock(&plli->lli_agl_lock);
	if (!agl_task)
		return;

	CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n",
	       sai, (unsigned int)agl_task->pid);
	kthread_stop(agl_task);

	/*
	 * The thread is gone; drain leftover queue entries. The lock is
	 * dropped around iput() (which may sleep) and retaken each pass.
	 */
	spin_lock(&plli->lli_agl_lock);
	while ((clli = list_first_entry_or_null(&sai->sai_agls,
						struct ll_inode_info,
						lli_agl_list)) != NULL) {
		list_del_init(&clli->lli_agl_list);
		spin_unlock(&plli->lli_agl_lock);
		clli->lli_agl_index = 0;
		iput(&clli->lli_vfs_inode);
		spin_lock(&plli->lli_agl_lock);
	}
	spin_unlock(&plli->lli_agl_lock);
	CDEBUG(D_READA, "agl thread stopped: sai %p, parent %pd\n",
	       sai, parent);
	/* drop the sai reference the AGL thread was holding */
	ll_sai_put(sai);
}
1036
1037 /* start agl thread */
1038 static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
1039 {
1040         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
1041         struct ll_inode_info *plli;
1042         struct task_struct *task;
1043
1044         ENTRY;
1045
1046         CDEBUG(D_READA, "start agl thread: sai %p, parent %pd\n",
1047                sai, parent);
1048
1049         plli = ll_i2info(parent->d_inode);
1050         task = kthread_create_on_node(ll_agl_thread, parent, node, "ll_agl_%d",
1051                                       plli->lli_opendir_pid);
1052         if (IS_ERR(task)) {
1053                 CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));
1054                 RETURN_EXIT;
1055         }
1056         sai->sai_agl_task = task;
1057         atomic_inc(&ll_i2sbi(d_inode(parent))->ll_agl_total);
1058         /* Get an extra reference that the thread holds */
1059         ll_sai_get(d_inode(parent));
1060
1061         wake_up_process(task);
1062
1063         EXIT;
1064 }
1065
1066 /* statahead thread main function */
1067 static int ll_statahead_thread(void *arg)
1068 {
1069         struct dentry *parent = (struct dentry *)arg;
1070         struct inode *dir = parent->d_inode;
1071         struct ll_inode_info *lli = ll_i2info(dir);
1072         struct ll_sb_info *sbi = ll_i2sbi(dir);
1073         struct ll_statahead_info *sai = lli->lli_sai;
1074         int first = 0;
1075         struct md_op_data *op_data;
1076         struct page *page = NULL;
1077         struct lu_batch *bh = NULL;
1078         __u64 pos = 0;
1079         int rc = 0;
1080
1081         ENTRY;
1082
1083         CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
1084                sai, parent);
1085
1086         sai->sai_max_batch_count = sbi->ll_sa_batch_max;
1087         if (sai->sai_max_batch_count) {
1088                 bh = md_batch_create(ll_i2mdexp(dir), BATCH_FL_RDONLY,
1089                                      sai->sai_max_batch_count);
1090                 if (IS_ERR(bh))
1091                         GOTO(out_stop_agl, rc = PTR_ERR(bh));
1092         }
1093
1094         sai->sai_bh = bh;
1095         OBD_ALLOC_PTR(op_data);
1096         if (!op_data)
1097                 GOTO(out, rc = -ENOMEM);
1098
1099         /* matches smp_store_release() in ll_deauthorize_statahead() */
1100         while (pos != MDS_DIR_END_OFF && smp_load_acquire(&sai->sai_task)) {
1101                 struct lu_dirpage *dp;
1102                 struct lu_dirent  *ent;
1103
1104                 op_data = ll_prep_md_op_data(op_data, dir, dir, NULL, 0, 0,
1105                                              LUSTRE_OPC_ANY, dir);
1106                 if (IS_ERR(op_data)) {
1107                         rc = PTR_ERR(op_data);
1108                         break;
1109                 }
1110
1111                 page = ll_get_dir_page(dir, op_data, pos, NULL);
1112                 ll_unlock_md_op_lsm(op_data);
1113                 if (IS_ERR(page)) {
1114                         rc = PTR_ERR(page);
1115                         CDEBUG(D_READA,
1116                                "error reading dir "DFID" at %llu /%llu opendir_pid = %u: rc = %d\n",
1117                                PFID(ll_inode2fid(dir)), pos, sai->sai_index,
1118                                lli->lli_opendir_pid, rc);
1119                         break;
1120                 }
1121
1122                 dp = page_address(page);
1123                 for (ent = lu_dirent_start(dp);
1124                      /* matches smp_store_release() in ll_deauthorize_statahead() */
1125                      ent != NULL && smp_load_acquire(&sai->sai_task) &&
1126                      !sa_low_hit(sai);
1127                      ent = lu_dirent_next(ent)) {
1128                         __u64 hash;
1129                         int namelen;
1130                         char *name;
1131                         struct lu_fid fid;
1132                         struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
1133
1134                         hash = le64_to_cpu(ent->lde_hash);
1135                         if (unlikely(hash < pos))
1136                                 /*
1137                                  * Skip until we find target hash value.
1138                                  */
1139                                 continue;
1140
1141                         namelen = le16_to_cpu(ent->lde_namelen);
1142                         if (unlikely(namelen == 0))
1143                                 /*
1144                                  * Skip dummy record.
1145                                  */
1146                                 continue;
1147
1148                         name = ent->lde_name;
1149                         if (name[0] == '.') {
1150                                 if (namelen == 1) {
1151                                         /*
1152                                          * skip "."
1153                                          */
1154                                         continue;
1155                                 } else if (name[1] == '.' && namelen == 2) {
1156                                         /*
1157                                          * skip ".."
1158                                          */
1159                                         continue;
1160                                 } else if (!sai->sai_ls_all) {
1161                                         /*
1162                                          * skip hidden files.
1163                                          */
1164                                         sai->sai_skip_hidden++;
1165                                         continue;
1166                                 }
1167                         }
1168
1169                         /*
1170                          * don't stat-ahead first entry.
1171                          */
1172                         if (unlikely(++first == 1))
1173                                 continue;
1174
1175                         fid_le_to_cpu(&fid, &ent->lde_fid);
1176
1177                         while (({set_current_state(TASK_IDLE);
1178                                  /* matches smp_store_release() in
1179                                   * ll_deauthorize_statahead() */
1180                                  smp_load_acquire(&sai->sai_task); })) {
1181                                 spin_lock(&lli->lli_agl_lock);
1182                                 while (sa_sent_full(sai) &&
1183                                        !agl_list_empty(sai)) {
1184                                         struct ll_inode_info *clli;
1185
1186                                         __set_current_state(TASK_RUNNING);
1187                                         clli = agl_first_entry(sai);
1188                                         list_del_init(&clli->lli_agl_list);
1189                                         spin_unlock(&lli->lli_agl_lock);
1190
1191                                         ll_agl_trigger(&clli->lli_vfs_inode,
1192                                                        sai);
1193                                         cond_resched();
1194                                         spin_lock(&lli->lli_agl_lock);
1195                                 }
1196                                 spin_unlock(&lli->lli_agl_lock);
1197
1198                                 if (!sa_sent_full(sai))
1199                                         break;
1200                                 schedule();
1201                         }
1202                         __set_current_state(TASK_RUNNING);
1203
1204                         if (IS_ENCRYPTED(dir)) {
1205                                 struct llcrypt_str de_name =
1206                                         LLTR_INIT(ent->lde_name, namelen);
1207                                 struct lu_fid fid;
1208
1209                                 rc = llcrypt_fname_alloc_buffer(dir, NAME_MAX,
1210                                                                 &lltr);
1211                                 if (rc < 0)
1212                                         continue;
1213
1214                                 fid_le_to_cpu(&fid, &ent->lde_fid);
1215                                 if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
1216                                                          &lltr, &fid)) {
1217                                         llcrypt_fname_free_buffer(&lltr);
1218                                         continue;
1219                                 }
1220
1221                                 name = lltr.name;
1222                                 namelen = lltr.len;
1223                         }
1224
1225                         sa_statahead(parent, name, namelen, &fid);
1226                         llcrypt_fname_free_buffer(&lltr);
1227                 }
1228
1229                 pos = le64_to_cpu(dp->ldp_hash_end);
1230                 down_read(&lli->lli_lsm_sem);
1231                 ll_release_page(dir, page,
1232                                 le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
1233                 up_read(&lli->lli_lsm_sem);
1234
1235                 if (sa_low_hit(sai)) {
1236                         rc = -EFAULT;
1237                         atomic_inc(&sbi->ll_sa_wrong);
1238                         CDEBUG(D_READA,
1239                                "Statahead for dir "DFID" hit ratio too low: hit/miss %llu/%llu, sent/replied %llu/%llu, stoppingstatahead thread: pid %d\n",
1240                                PFID(&lli->lli_fid), sai->sai_hit,
1241                                sai->sai_miss, sai->sai_sent,
1242                                sai->sai_replied, current->pid);
1243                         break;
1244                 }
1245         }
1246         ll_finish_md_op_data(op_data);
1247
1248         if (rc < 0) {
1249                 spin_lock(&lli->lli_sa_lock);
1250                 sai->sai_task = NULL;
1251                 lli->lli_sa_enabled = 0;
1252                 spin_unlock(&lli->lli_sa_lock);
1253         }
1254
1255         ll_statahead_flush_nowait(sai);
1256
1257         /*
1258          * statahead is finished, but statahead entries need to be cached, wait
1259          * for file release closedir() call to stop me.
1260          */
1261         while (({set_current_state(TASK_IDLE);
1262                 /* matches smp_store_release() in ll_deauthorize_statahead() */
1263                 smp_load_acquire(&sai->sai_task); })) {
1264                 schedule();
1265         }
1266         __set_current_state(TASK_RUNNING);
1267
1268         EXIT;
1269 out:
1270         if (bh) {
1271                 rc = md_batch_stop(ll_i2mdexp(dir), sai->sai_bh);
1272                 sai->sai_bh = NULL;
1273         }
1274
1275 out_stop_agl:
1276         ll_stop_agl(sai);
1277
1278         /*
1279          * wait for inflight statahead RPCs to finish, and then we can free sai
1280          * safely because statahead RPC will access sai data
1281          */
1282         while (sai->sai_sent != sai->sai_replied)
1283                 /* in case we're not woken up, timeout wait */
1284                 msleep(125);
1285
1286         CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd\n",
1287                sbi->ll_fsname, sai, parent);
1288
1289         spin_lock(&lli->lli_sa_lock);
1290         sai->sai_task = NULL;
1291         spin_unlock(&lli->lli_sa_lock);
1292         wake_up(&sai->sai_waitq);
1293
1294         atomic_add(sai->sai_hit, &sbi->ll_sa_hit_total);
1295         atomic_add(sai->sai_miss, &sbi->ll_sa_miss_total);
1296
1297         ll_sai_put(sai);
1298
1299         return rc;
1300 }
1301
1302 /* authorize opened dir handle @key to statahead */
1303 void ll_authorize_statahead(struct inode *dir, void *key)
1304 {
1305         struct ll_inode_info *lli = ll_i2info(dir);
1306
1307         spin_lock(&lli->lli_sa_lock);
1308         if (!lli->lli_opendir_key && !lli->lli_sai) {
1309                 /*
1310                  * if lli_sai is not NULL, it means previous statahead is not
1311                  * finished yet, we'd better not start a new statahead for now.
1312                  */
1313                 LASSERT(lli->lli_opendir_pid == 0);
1314                 lli->lli_opendir_key = key;
1315                 lli->lli_opendir_pid = current->pid;
1316                 lli->lli_sa_enabled = 1;
1317         }
1318         spin_unlock(&lli->lli_sa_lock);
1319 }
1320
/*
 * deauthorize opened dir handle @key to statahead, and notify statahead thread
 * to quit if it's running.
 */
void ll_deauthorize_statahead(struct inode *dir, void *key)
{
	struct ll_inode_info *lli = ll_i2info(dir);
	struct ll_statahead_info *sai;

	/* only the handle that authorized statahead may deauthorize it */
	LASSERT(lli->lli_opendir_key == key);
	LASSERT(lli->lli_opendir_pid != 0);

	CDEBUG(D_READA, "deauthorize statahead for "DFID"\n",
	       PFID(&lli->lli_fid));

	spin_lock(&lli->lli_sa_lock);
	lli->lli_opendir_key = NULL;
	lli->lli_opendir_pid = 0;
	lli->lli_sa_enabled = 0;
	sai = lli->lli_sai;
	if (sai && sai->sai_task) {
		/*
		 * statahead thread may not have quit yet because it needs to
		 * cache entries, now it's time to tell it to quit.
		 *
		 * wake_up_process() provides the necessary barriers
		 * to pair with set_current_state().
		 */
		struct task_struct *task = sai->sai_task;

		/* matches smp_load_acquire() in ll_statahead_thread() */
		smp_store_release(&sai->sai_task, NULL);
		wake_up_process(task);
	}
	spin_unlock(&lli->lli_sa_lock);
}
1357
/*
 * Classification of a dentry's position among its parent's dirents,
 * returned by is_first_dirent() below.
 */
enum {
	/** not first dirent, or is "." */
	LS_NOT_FIRST_DE = 0,
	/** the first non-hidden dirent */
	LS_FIRST_DE,
	/** the first hidden dirent, that is "." */
	LS_FIRST_DOT_DE
};
1372
1373 /* file is first dirent under @dir */
1374 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
1375 {
1376         struct qstr *target = &dentry->d_name;
1377         struct md_op_data *op_data;
1378         int dot_de;
1379         struct page *page = NULL;
1380         int rc = LS_NOT_FIRST_DE;
1381         __u64 pos = 0;
1382         struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
1383
1384         ENTRY;
1385
1386         op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
1387                                      LUSTRE_OPC_ANY, dir);
1388         if (IS_ERR(op_data))
1389                 RETURN(PTR_ERR(op_data));
1390
1391         if (IS_ENCRYPTED(dir)) {
1392                 int rc2 = llcrypt_fname_alloc_buffer(dir, NAME_MAX, &lltr);
1393
1394                 if (rc2 < 0)
1395                         RETURN(rc2);
1396         }
1397
1398         /**
1399          *FIXME choose the start offset of the readdir
1400          */
1401
1402         page = ll_get_dir_page(dir, op_data, 0, NULL);
1403
1404         while (1) {
1405                 struct lu_dirpage *dp;
1406                 struct lu_dirent  *ent;
1407
1408                 if (IS_ERR(page)) {
1409                         struct ll_inode_info *lli = ll_i2info(dir);
1410
1411                         rc = PTR_ERR(page);
1412                         CERROR("%s: reading dir "DFID" at %llu opendir_pid = %u : rc = %d\n",
1413                                ll_i2sbi(dir)->ll_fsname,
1414                                PFID(ll_inode2fid(dir)), pos,
1415                                lli->lli_opendir_pid, rc);
1416                         break;
1417                 }
1418
1419                 dp = page_address(page);
1420                 for (ent = lu_dirent_start(dp); ent != NULL;
1421                      ent = lu_dirent_next(ent)) {
1422                         __u64 hash;
1423                         int namelen;
1424                         char *name;
1425
1426                         hash = le64_to_cpu(ent->lde_hash);
1427                         /*
1428                          * The ll_get_dir_page() can return any page containing
1429                          * the given hash which may be not the start hash.
1430                          */
1431                         if (unlikely(hash < pos))
1432                                 continue;
1433
1434                         namelen = le16_to_cpu(ent->lde_namelen);
1435                         if (unlikely(namelen == 0))
1436                                 /*
1437                                  * skip dummy record.
1438                                  */
1439                                 continue;
1440
1441                         name = ent->lde_name;
1442                         if (name[0] == '.') {
1443                                 if (namelen == 1)
1444                                         /*
1445                                          * skip "."
1446                                          */
1447                                         continue;
1448                                 else if (name[1] == '.' && namelen == 2)
1449                                         /*
1450                                          * skip ".."
1451                                          */
1452                                         continue;
1453                                 else
1454                                         dot_de = 1;
1455                         } else {
1456                                 dot_de = 0;
1457                         }
1458
1459                         if (dot_de && target->name[0] != '.') {
1460                                 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
1461                                        target->len, target->name,
1462                                        namelen, name);
1463                                 continue;
1464                         }
1465
1466                         if (IS_ENCRYPTED(dir)) {
1467                                 struct llcrypt_str de_name =
1468                                         LLTR_INIT(ent->lde_name, namelen);
1469                                 struct lu_fid fid;
1470
1471                                 fid_le_to_cpu(&fid, &ent->lde_fid);
1472                                 if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
1473                                                          &lltr, &fid))
1474                                         continue;
1475                                 name = lltr.name;
1476                                 namelen = lltr.len;
1477                         }
1478
1479                         if (target->len != namelen ||
1480                             memcmp(target->name, name, namelen) != 0)
1481                                 rc = LS_NOT_FIRST_DE;
1482                         else if (!dot_de)
1483                                 rc = LS_FIRST_DE;
1484                         else
1485                                 rc = LS_FIRST_DOT_DE;
1486
1487                         ll_release_page(dir, page, false);
1488                         GOTO(out, rc);
1489                 }
1490                 pos = le64_to_cpu(dp->ldp_hash_end);
1491                 if (pos == MDS_DIR_END_OFF) {
1492                         /*
1493                          * End of directory reached.
1494                          */
1495                         ll_release_page(dir, page, false);
1496                         GOTO(out, rc);
1497                 } else {
1498                         /*
1499                          * chain is exhausted
1500                          * Normal case: continue to the next page.
1501                          */
1502                         ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) &
1503                                               LDF_COLLIDE);
1504                         page = ll_get_dir_page(dir, op_data, pos, NULL);
1505                 }
1506         }
1507         EXIT;
1508 out:
1509         llcrypt_fname_free_buffer(&lltr);
1510         ll_finish_md_op_data(op_data);
1511
1512         return rc;
1513 }
1514
/**
 * revalidate @dentryp from statahead cache
 *
 * \param[in] dir       parent directory
 * \param[in] sai       sai structure
 * \param[out] dentryp  pointer to dentry which will be revalidated
 * \param[in] unplug    unplug statahead window only (normally for negative
 *                      dentry)
 * \retval              1 on success, dentry is saved in @dentryp
 * \retval              0 if revalidation failed (no proper lock on client)
 * \retval              negative number upon error
 */
static int revalidate_statahead_dentry(struct inode *dir,
				       struct ll_statahead_info *sai,
				       struct dentry **dentryp,
				       bool unplug)
{
	struct sa_entry *entry = NULL;
	struct ll_inode_info *lli = ll_i2info(dir);
	int rc = 0;

	ENTRY;

	if ((*dentryp)->d_name.name[0] == '.') {
		if (sai->sai_ls_all ||
		    sai->sai_miss_hidden >= sai->sai_skip_hidden) {
			/*
			 * Hidden dentry is the first one, or statahead
			 * thread does not skip so many hidden dentries
			 * before "sai_ls_all" enabled as below.
			 */
		} else {
			if (!sai->sai_ls_all)
				/*
				 * It maybe because hidden dentry is not
				 * the first one, "sai_ls_all" was not
				 * set, then "ls -al" missed. Enable
				 * "sai_ls_all" for such case.
				 */
				sai->sai_ls_all = 1;

			/*
			 * Such "getattr" has been skipped before
			 * "sai_ls_all" enabled as above.
			 */
			sai->sai_miss_hidden++;
			RETURN(-EAGAIN);
		}
	}

	/* caller only wants to advance the statahead window */
	if (unplug)
		GOTO(out, rc = 1);

	entry = sa_get(sai, &(*dentryp)->d_name);
	if (!entry)
		GOTO(out, rc = -EAGAIN);

	if (!sa_ready(entry)) {
		/* record which entry we're waiting on, then wait (bounded)
		 * for the statahead thread to fill it in
		 */
		spin_lock(&lli->lli_sa_lock);
		sai->sai_index_wait = entry->se_index;
		spin_unlock(&lli->lli_sa_lock);
		rc = wait_event_idle_timeout(sai->sai_waitq, sa_ready(entry),
					     cfs_time_seconds(30));
		if (rc == 0) {
			/*
			 * entry may not be ready, so it may be used by inflight
			 * statahead RPC, don't free it.
			 */
			entry = NULL;
			GOTO(out, rc = -EAGAIN);
		}
	}

	/*
	 * We need to see the value that was set immediately before we
	 * were woken up.
	 */
	if (smp_load_acquire(&entry->se_state) == SA_ENTRY_SUCC &&
	    entry->se_inode) {
		struct inode *inode = entry->se_inode;
		struct lookup_intent it = { .it_op = IT_GETATTR,
					    .it_lock_handle =
						entry->se_handle };
		__u64 bits;

		rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
					ll_inode2fid(inode), &bits);
		if (rc == 1) {
			if (!(*dentryp)->d_inode) {
				struct dentry *alias;

				alias = ll_splice_alias(inode, *dentryp);
				if (IS_ERR(alias)) {
					ll_intent_release(&it);
					GOTO(out, rc = PTR_ERR(alias));
				}
				*dentryp = alias;
				/*
				 * statahead prepared this inode, transfer inode
				 * refcount from sa_entry to dentry
				 */
				entry->se_inode = NULL;
			} else if ((*dentryp)->d_inode != inode) {
				/* revalidate, but inode is recreated */
				CDEBUG(D_READA,
				       "%s: stale dentry %pd inode " DFID", statahead inode "DFID "\n",
				       ll_i2sbi(inode)->ll_fsname, *dentryp,
				       PFID(ll_inode2fid((*dentryp)->d_inode)),
				       PFID(ll_inode2fid(inode)));
				ll_intent_release(&it);
				GOTO(out, rc = -ESTALE);
			}

			if (bits & MDS_INODELOCK_LOOKUP) {
				d_lustre_revalidate(*dentryp);
				if (S_ISDIR(inode->i_mode))
					ll_update_dir_depth_dmv(dir, *dentryp);
			}

			ll_intent_release(&it);
		}
	}
out:
	/*
	 * statahead cached sa_entry can be used only once, and will be killed
	 * right after use, so if lookup/revalidate accessed statahead cache,
	 * set dentry ldd_sa_generation to parent lli_sa_generation, later if we
	 * stat this file again, we know we've done statahead before, see
	 * dentry_may_statahead().
	 */
	if (lld_is_init(*dentryp))
		ll_d2d(*dentryp)->lld_sa_generation = lli->lli_sa_generation;
	sa_put(dir, sai, entry);

	RETURN(rc);
}
1651
1652 /**
1653  * start statahead thread
1654  *
1655  * \param[in] dir       parent directory
1656  * \param[in] dentry    dentry that triggers statahead, normally the first
1657  *                      dirent under @dir
1658  * \param[in] agl       indicate whether AGL is needed
1659  * \retval              -EAGAIN on success, because when this function is
1660  *                      called, it's already in lookup call, so client should
1661  *                      do it itself instead of waiting for statahead thread
1662  *                      to do it asynchronously.
1663  * \retval              negative number upon error
1664  */
1665 static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
1666                                   bool agl)
1667 {
1668         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
1669         struct ll_inode_info *lli = ll_i2info(dir);
1670         struct ll_statahead_info *sai = NULL;
1671         struct dentry *parent = dentry->d_parent;
1672         struct task_struct *task;
1673         struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode);
1674         int first = LS_FIRST_DE;
1675         int rc = 0;
1676
1677         ENTRY;
1678
1679         /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */
1680         first = is_first_dirent(dir, dentry);
1681         if (first == LS_NOT_FIRST_DE)
1682                 /* It is not "ls -{a}l" operation, no need statahead for it. */
1683                 GOTO(out, rc = -EFAULT);
1684
1685         if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
1686                                        sbi->ll_sa_running_max)) {
1687                 CDEBUG(D_READA,
1688                        "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n");
1689                 GOTO(out, rc = -EMFILE);
1690         }
1691
1692         sai = ll_sai_alloc(parent);
1693         if (!sai)
1694                 GOTO(out, rc = -ENOMEM);
1695
1696         sai->sai_ls_all = (first == LS_FIRST_DOT_DE);
1697
1698         /*
1699          * if current lli_opendir_key was deauthorized, or dir re-opened by
1700          * another process, don't start statahead, otherwise the newly spawned
1701          * statahead thread won't be notified to quit.
1702          */
1703         spin_lock(&lli->lli_sa_lock);
1704         if (unlikely(lli->lli_sai || !lli->lli_opendir_key ||
1705                      lli->lli_opendir_pid != current->pid)) {
1706                 spin_unlock(&lli->lli_sa_lock);
1707                 GOTO(out, rc = -EPERM);
1708         }
1709         lli->lli_sai = sai;
1710         spin_unlock(&lli->lli_sa_lock);
1711
1712         CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n",
1713                current->pid, parent);
1714
1715         task = kthread_create_on_node(ll_statahead_thread, parent, node,
1716                                       "ll_sa_%u", lli->lli_opendir_pid);
1717         if (IS_ERR(task)) {
1718                 spin_lock(&lli->lli_sa_lock);
1719                 lli->lli_sai = NULL;
1720                 spin_unlock(&lli->lli_sa_lock);
1721                 rc = PTR_ERR(task);
1722                 CERROR("can't start ll_sa thread, rc: %d\n", rc);
1723                 GOTO(out, rc);
1724         }
1725
1726         if (test_bit(LL_SBI_AGL_ENABLED, sbi->ll_flags) && agl)
1727                 ll_start_agl(parent, sai);
1728
1729         atomic_inc(&sbi->ll_sa_total);
1730         sai->sai_task = task;
1731
1732         wake_up_process(task);
1733         /*
1734          * We don't stat-ahead for the first dirent since we are already in
1735          * lookup.
1736          */
1737         RETURN(-EAGAIN);
1738
1739 out:
1740         /*
1741          * once we start statahead thread failed, disable statahead so that
1742          * subsequent stat won't waste time to try it.
1743          */
1744         spin_lock(&lli->lli_sa_lock);
1745         if (lli->lli_opendir_pid == current->pid)
1746                 lli->lli_sa_enabled = 0;
1747         spin_unlock(&lli->lli_sa_lock);
1748
1749         if (sai)
1750                 ll_sai_free(sai);
1751         if (first != LS_NOT_FIRST_DE)
1752                 atomic_dec(&sbi->ll_sa_running);
1753
1754         RETURN(rc);
1755 }
1756
1757 /*
1758  * Check whether statahead for @dir was started.
1759  */
1760 static inline bool ll_statahead_started(struct inode *dir, bool agl)
1761 {
1762         struct ll_inode_info *lli = ll_i2info(dir);
1763         struct ll_statahead_info *sai;
1764
1765         spin_lock(&lli->lli_sa_lock);
1766         sai = lli->lli_sai;
1767         if (sai && (sai->sai_agl_task != NULL) != agl)
1768                 CDEBUG(D_READA,
1769                        "%s: Statahead AGL hint changed from %d to %d\n",
1770                        ll_i2sbi(dir)->ll_fsname,
1771                        sai->sai_agl_task != NULL, agl);
1772         spin_unlock(&lli->lli_sa_lock);
1773
1774         return !!sai;
1775 }
1776
1777 /**
1778  * statahead entry function, this is called when client getattr on a file, it
1779  * will start statahead thread if this is the first dir entry, else revalidate
1780  * dentry from statahead cache.
1781  *
1782  * \param[in]  dir      parent directory
1783  * \param[out] dentryp  dentry to getattr
1784  * \param[in]  agl      whether start the agl thread
1785  *
1786  * \retval              1 on success
1787  * \retval              0 revalidation from statahead cache failed, caller needs
1788  *                      to getattr from server directly
1789  * \retval              negative number on error, caller often ignores this and
1790  *                      then getattr from server
1791  */
1792 int ll_start_statahead(struct inode *dir, struct dentry *dentry, bool agl)
1793 {
1794         if (!ll_statahead_started(dir, agl))
1795                 return start_statahead_thread(dir, dentry, agl);
1796         return 0;
1797 }
1798
1799 /**
1800  * revalidate dentry from statahead cache.
1801  *
1802  * \param[in]  dir      parent directory
1803  * \param[out] dentryp  dentry to getattr
1804  * \param[in]  unplug   unplug statahead window only (normally for negative
1805  *                      dentry)
1806  * \retval              1 on success
1807  * \retval              0 revalidation from statahead cache failed, caller needs
1808  *                      to getattr from server directly
1809  * \retval              negative number on error, caller often ignores this and
1810  *                      then getattr from server
1811  */
1812 int ll_revalidate_statahead(struct inode *dir, struct dentry **dentryp,
1813                             bool unplug)
1814 {
1815         struct ll_statahead_info *sai;
1816         int rc = 0;
1817
1818         sai = ll_sai_get(dir);
1819         if (sai) {
1820                 rc = revalidate_statahead_dentry(dir, sai, dentryp, unplug);
1821                 CDEBUG(D_READA, "revalidate statahead %pd: rc = %d.\n",
1822                        *dentryp, rc);
1823                 ll_sai_put(sai);
1824         }
1825         return rc;
1826 }