1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  */
31
32 #include <linux/fs.h>
33 #include <linux/sched.h>
34 #include <linux/kthread.h>
35 #include <linux/mm.h>
36 #include <linux/highmem.h>
37 #include <linux/pagemap.h>
38 #include <linux/delay.h>
39
40 #define DEBUG_SUBSYSTEM S_LLITE
41
42 #include <obd_support.h>
43 #include <lustre_dlm.h>
44 #include "llite_internal.h"
45
46 #define SA_OMITTED_ENTRY_MAX 8ULL
47
48 typedef enum {
49         /** negative values are for error cases */
50         SA_ENTRY_INIT = 0,      /** init entry */
51         SA_ENTRY_SUCC = 1,      /** stat succeeded */
52         SA_ENTRY_INVA = 2,      /** invalid entry */
53 } se_state_t;
54
55 /*
56  * sa_entry is not refcounted: the statahead thread allocates it and does the
57  * async stat, the async stat callback ll_statahead_interpret() adds it to
58  * sai_interim_entries, the statahead thread later calls sa_handle_callback()
59  * to instantiate the entry and move it to sai_entries, and from then on only
60  * the scanner process can access and free it.
61  */
62 struct sa_entry {
63         /* link into sai_interim_entries or sai_entries */
64         struct list_head        se_list;
65         /* link into sai hash table locally */
66         struct list_head        se_hash;
67         /* entry index in the sai */
68         __u64                   se_index;
69         /* low layer ldlm lock handle */
70         __u64                   se_handle;
71         /* entry status */
72         se_state_t              se_state;
73         /* entry size, contains name */
74         int                     se_size;
75         /* pointer to async getattr enqueue info */
76         struct md_enqueue_info *se_minfo;
77         /* pointer to the async getattr request */
78         struct ptlrpc_request  *se_req;
79         /* pointer to the target inode */
80         struct inode           *se_inode;
81         /* entry name */
82         struct qstr             se_qstr;
83         /* entry fid */
84         struct lu_fid           se_fid;
85 };
86
87 static unsigned int sai_generation;
88 static DEFINE_SPINLOCK(sai_generation_lock);
89
90 static inline int sa_unhashed(struct sa_entry *entry)
91 {
92         return list_empty(&entry->se_hash);
93 }
94
95 /* sa_entry is ready to use */
96 static inline int sa_ready(struct sa_entry *entry)
97 {
98         /* Make sure sa_entry is updated and ready to use */
99         smp_rmb();
100         return (entry->se_state != SA_ENTRY_INIT);
101 }
102
103 /* hash value to put in sai_cache */
104 static inline int sa_hash(int val)
105 {
106         return val & LL_SA_CACHE_MASK;
107 }
108
109 /* hash entry into sai_cache */
110 static inline void
111 sa_rehash(struct ll_statahead_info *sai, struct sa_entry *entry)
112 {
113         int i = sa_hash(entry->se_qstr.hash);
114
115         spin_lock(&sai->sai_cache_lock[i]);
116         list_add_tail(&entry->se_hash, &sai->sai_cache[i]);
117         spin_unlock(&sai->sai_cache_lock[i]);
118 }
119
120 /* unhash entry from sai_cache */
121 static inline void
122 sa_unhash(struct ll_statahead_info *sai, struct sa_entry *entry)
123 {
124         int i = sa_hash(entry->se_qstr.hash);
125
126         spin_lock(&sai->sai_cache_lock[i]);
127         list_del_init(&entry->se_hash);
128         spin_unlock(&sai->sai_cache_lock[i]);
129 }
130
131 static inline int agl_should_run(struct ll_statahead_info *sai,
132                                  struct inode *inode)
133 {
134         return inode && S_ISREG(inode->i_mode) && sai->sai_agl_task;
135 }
136
137 static inline struct ll_inode_info *
138 agl_first_entry(struct ll_statahead_info *sai)
139 {
140         return list_first_entry(&sai->sai_agls, struct ll_inode_info,
141                                 lli_agl_list);
142 }
143
144 /* statahead window is full */
145 static inline int sa_sent_full(struct ll_statahead_info *sai)
146 {
147         return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
148 }
149
150 /* got async stat replies */
151 static inline int sa_has_callback(struct ll_statahead_info *sai)
152 {
153         return !list_empty(&sai->sai_interim_entries);
154 }
155
156 static inline int agl_list_empty(struct ll_statahead_info *sai)
157 {
158         return list_empty(&sai->sai_agls);
159 }
160
161 /**
162  * Consider the hit rate low when either
163  * (1) the hit ratio is less than 80%, or
164  * (2) there have been more than 8 consecutive misses;
165  * either condition means statahead is not helping.
166  */
167 static inline int sa_low_hit(struct ll_statahead_info *sai)
168 {
169         return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) ||
170                 (sai->sai_consecutive_miss > 8));
171 }
172
173 /*
174  * if the given index is behind the statahead window by more than
175  * SA_OMITTED_ENTRY_MAX, then it is old.
176  */
177 static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
178 {
179         return ((__u64)sai->sai_max + index + SA_OMITTED_ENTRY_MAX <
180                 sai->sai_index);
181 }
182
183 /* allocate sa_entry and hash it to allow scanner process to find it */
184 static struct sa_entry *
185 sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
186          const char *name, int len, const struct lu_fid *fid)
187 {
188         struct ll_inode_info *lli;
189         struct sa_entry *entry;
190         int entry_size;
191         char *dname;
192
193         ENTRY;
194
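        /*
         * Size of the name buffer below: round len down to a multiple of 4
         * and add 4, which always leaves room for the name plus its trailing
         * NUL.
         */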
195         entry_size = sizeof(struct sa_entry) + (len & ~3) + 4;
196         OBD_ALLOC(entry, entry_size);
197         if (unlikely(!entry))
198                 RETURN(ERR_PTR(-ENOMEM));
199
200         CDEBUG(D_READA, "alloc sa entry %.*s(%p) index %llu\n",
201                len, name, entry, index);
202
203         entry->se_index = index;
204
205         entry->se_state = SA_ENTRY_INIT;
206         entry->se_size = entry_size;
207         dname = (char *)entry + sizeof(struct sa_entry);
208         memcpy(dname, name, len);
209         dname[len] = 0;
210         entry->se_qstr.hash = ll_full_name_hash(parent, name, len);
211         entry->se_qstr.len = len;
212         entry->se_qstr.name = dname;
213         entry->se_fid = *fid;
214
215         lli = ll_i2info(sai->sai_dentry->d_inode);
216
217         spin_lock(&lli->lli_sa_lock);
218         INIT_LIST_HEAD(&entry->se_list);
219         sa_rehash(sai, entry);
220         spin_unlock(&lli->lli_sa_lock);
221
222         atomic_inc(&sai->sai_cache_count);
223
224         RETURN(entry);
225 }
226
227 /* free sa_entry, which should have been unhashed and not in any list */
228 static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry)
229 {
230         CDEBUG(D_READA, "free sa entry %.*s(%p) index %llu\n",
231                entry->se_qstr.len, entry->se_qstr.name, entry,
232                entry->se_index);
233
234         LASSERT(list_empty(&entry->se_list));
235         LASSERT(sa_unhashed(entry));
236
237         OBD_FREE(entry, entry->se_size);
238         atomic_dec(&sai->sai_cache_count);
239 }
240
241 /*
242  * find sa_entry by name, used by directory scanner, lock is not needed because
243  * only scanner can remove the entry from cache.
244  */
245 static struct sa_entry *
246 sa_get(struct ll_statahead_info *sai, const struct qstr *qstr)
247 {
248         struct sa_entry *entry;
249         int i = sa_hash(qstr->hash);
250
251         list_for_each_entry(entry, &sai->sai_cache[i], se_hash) {
252                 if (entry->se_qstr.hash == qstr->hash &&
253                     entry->se_qstr.len == qstr->len &&
254                     memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0)
255                         return entry;
256         }
257         return NULL;
258 }
259
260 /* unhash and unlink sa_entry, and then free it */
261 static inline void
262 sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry)
263 {
264         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
265
266         LASSERT(!sa_unhashed(entry));
267         LASSERT(!list_empty(&entry->se_list));
268         LASSERT(sa_ready(entry));
269
270         sa_unhash(sai, entry);
271
272         spin_lock(&lli->lli_sa_lock);
273         list_del_init(&entry->se_list);
274         spin_unlock(&lli->lli_sa_lock);
275
276         iput(entry->se_inode);
277
278         sa_free(sai, entry);
279 }
280
281 /* called by scanner after use, sa_entry will be killed */
282 static void
283 sa_put(struct ll_statahead_info *sai, struct sa_entry *entry)
284 {
285         struct sa_entry *tmp, *next;
286
287         if (entry && entry->se_state == SA_ENTRY_SUCC) {
288                 struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
289
290                 sai->sai_hit++;
291                 sai->sai_consecutive_miss = 0;
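                /* on a hit, grow the statahead window up to ll_sa_max */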
292                 sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
293         } else {
294                 sai->sai_miss++;
295                 sai->sai_consecutive_miss++;
296         }
297
298         if (entry)
299                 sa_kill(sai, entry);
300
301         /*
302          * kill old completed entries, only scanner process does this, no need
303          * to lock
304          */
305         list_for_each_entry_safe(tmp, next, &sai->sai_entries, se_list) {
306                 if (!is_omitted_entry(sai, tmp->se_index))
307                         break;
308                 sa_kill(sai, tmp);
309         }
310 }
311
312 /*
313  * update state and insert the entry into sai_entries sorted by index;
314  * return true if the scanner is waiting on this entry.
315  */
316 static bool
317 __sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
318 {
319         struct sa_entry *se;
320         struct list_head *pos = &sai->sai_entries;
321         __u64 index = entry->se_index;
322
323         LASSERT(!sa_ready(entry));
324         LASSERT(list_empty(&entry->se_list));
325
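        /*
         * walk sai_entries backwards to find the last entry with a smaller
         * index, so the list stays sorted by se_index
         */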
326         list_for_each_entry_reverse(se, &sai->sai_entries, se_list) {
327                 if (se->se_index < entry->se_index) {
328                         pos = &se->se_list;
329                         break;
330                 }
331         }
332         list_add(&entry->se_list, pos);
333         /*
334          * LU-9210: ll_statahead_interpret must be able to see this before
335          * we wake it up
336          */
337         smp_store_release(&entry->se_state,
338                           ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);
339
340         return (index == sai->sai_index_wait);
341 }
342
343 /* finish async stat RPC arguments */
344 static void sa_fini_data(struct md_enqueue_info *minfo)
345 {
346         struct md_op_data *op_data = &minfo->mi_data;
347
348         if (op_data->op_flags & MF_OPNAME_KMALLOCED)
349                 /* allocated via ll_setup_filename called from sa_prep_data */
350                 kfree(op_data->op_name);
351         ll_unlock_md_op_lsm(&minfo->mi_data);
352         iput(minfo->mi_dir);
353         OBD_FREE_PTR(minfo);
354 }
355
356 static int ll_statahead_interpret(struct ptlrpc_request *req,
357                                   struct md_enqueue_info *minfo, int rc);
358
359 /*
360  * prepare arguments for async stat RPC.
361  */
362 static struct md_enqueue_info *
363 sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
364 {
365         struct md_enqueue_info   *minfo;
366         struct ldlm_enqueue_info *einfo;
367         struct md_op_data        *op_data;
368
369         OBD_ALLOC_PTR(minfo);
370         if (!minfo)
371                 return ERR_PTR(-ENOMEM);
372
373         op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child,
374                                      entry->se_qstr.name, entry->se_qstr.len, 0,
375                                      LUSTRE_OPC_ANY, NULL);
376         if (IS_ERR(op_data)) {
377                 OBD_FREE_PTR(minfo);
378                 return (struct md_enqueue_info *)op_data;
379         }
380
381         if (!child)
382                 op_data->op_fid2 = entry->se_fid;
383
384         minfo->mi_it.it_op = IT_GETATTR;
385         minfo->mi_dir = igrab(dir);
386         minfo->mi_cb = ll_statahead_interpret;
387         minfo->mi_cbdata = entry;
388
389         einfo = &minfo->mi_einfo;
390         einfo->ei_type   = LDLM_IBITS;
391         einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
392         einfo->ei_cb_bl  = ll_md_blocking_ast;
393         einfo->ei_cb_cp  = ldlm_completion_ast;
394         einfo->ei_cb_gl  = NULL;
395         einfo->ei_cbdata = NULL;
396
397         return minfo;
398 }
399
400 /*
401  * release resources used in the async stat RPC, update the entry state and
402  * wake up the scanner process if it is waiting on this entry.
403  */
404 static void
405 sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
406 {
407         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
408         struct md_enqueue_info *minfo = entry->se_minfo;
409         struct ptlrpc_request *req = entry->se_req;
410         bool wakeup;
411
412         /* release resources used in RPC */
413         if (minfo) {
414                 entry->se_minfo = NULL;
415                 ll_intent_release(&minfo->mi_it);
416                 sa_fini_data(minfo);
417         }
418
419         if (req) {
420                 entry->se_req = NULL;
421                 ptlrpc_req_finished(req);
422         }
423
424         spin_lock(&lli->lli_sa_lock);
425         wakeup = __sa_make_ready(sai, entry, ret);
426         spin_unlock(&lli->lli_sa_lock);
427
428         if (wakeup)
429                 wake_up(&sai->sai_waitq);
430 }
431
432 /* insert inode into the list of sai_agls */
433 static void ll_agl_add(struct ll_statahead_info *sai,
434                        struct inode *inode, int index)
435 {
436         struct ll_inode_info *child  = ll_i2info(inode);
437         struct ll_inode_info *parent = ll_i2info(sai->sai_dentry->d_inode);
438
439         spin_lock(&child->lli_agl_lock);
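        /* a zero lli_agl_index means this inode is not queued for AGL yet */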
440         if (child->lli_agl_index == 0) {
441                 child->lli_agl_index = index;
442                 spin_unlock(&child->lli_agl_lock);
443
444                 LASSERT(list_empty(&child->lli_agl_list));
445
446                 spin_lock(&parent->lli_agl_lock);
447                 /* Re-check under the lock */
448                 if (agl_should_run(sai, inode)) {
449                         if (agl_list_empty(sai))
450                                 wake_up_process(sai->sai_agl_task);
451                         igrab(inode);
452                         list_add_tail(&child->lli_agl_list, &sai->sai_agls);
453                 } else
454                         child->lli_agl_index = 0;
455                 spin_unlock(&parent->lli_agl_lock);
456         } else {
457                 spin_unlock(&child->lli_agl_lock);
458         }
459 }
460
461 /* allocate sai */
462 static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
463 {
464         struct ll_statahead_info *sai;
465         struct ll_inode_info *lli = ll_i2info(dentry->d_inode);
466         int i;
467
468         ENTRY;
469
470         OBD_ALLOC_PTR(sai);
471         if (!sai)
472                 RETURN(NULL);
473
474         sai->sai_dentry = dget(dentry);
475         atomic_set(&sai->sai_refcount, 1);
476         sai->sai_max = LL_SA_RPC_MIN;
477         sai->sai_index = 1;
478         init_waitqueue_head(&sai->sai_waitq);
479
480         INIT_LIST_HEAD(&sai->sai_interim_entries);
481         INIT_LIST_HEAD(&sai->sai_entries);
482         INIT_LIST_HEAD(&sai->sai_agls);
483
484         for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
485                 INIT_LIST_HEAD(&sai->sai_cache[i]);
486                 spin_lock_init(&sai->sai_cache_lock[i]);
487         }
488         atomic_set(&sai->sai_cache_count, 0);
489
490         spin_lock(&sai_generation_lock);
491         lli->lli_sa_generation = ++sai_generation;
492         if (unlikely(sai_generation == 0))
493                 lli->lli_sa_generation = ++sai_generation;
494         spin_unlock(&sai_generation_lock);
495
496         RETURN(sai);
497 }
498
499 /* free sai */
500 static inline void ll_sai_free(struct ll_statahead_info *sai)
501 {
502         LASSERT(sai->sai_dentry != NULL);
503         dput(sai->sai_dentry);
504         OBD_FREE_PTR(sai);
505 }
506
507 /*
508  * take refcount of sai if sai for @dir exists, which means statahead is on for
509  * this directory.
510  */
511 static inline struct ll_statahead_info *ll_sai_get(struct inode *dir)
512 {
513         struct ll_inode_info *lli = ll_i2info(dir);
514         struct ll_statahead_info *sai = NULL;
515
516         spin_lock(&lli->lli_sa_lock);
517         sai = lli->lli_sai;
518         if (sai)
519                 atomic_inc(&sai->sai_refcount);
520         spin_unlock(&lli->lli_sa_lock);
521
522         return sai;
523 }
524
525 /*
526  * put sai refcount after use, if refcount reaches zero, free sai and sa_entries
527  * attached to it.
528  */
529 static void ll_sai_put(struct ll_statahead_info *sai)
530 {
531         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
532
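        /*
         * atomic_dec_and_lock() takes lli_sa_lock only when the refcount
         * drops to zero, so clearing lli_sai below is atomic with respect to
         * ll_sai_get()
         */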
533         if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
534                 struct sa_entry *entry, *next;
535                 struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
536
537                 lli->lli_sai = NULL;
538                 spin_unlock(&lli->lli_sa_lock);
539
540                 LASSERT(!sai->sai_task);
541                 LASSERT(!sai->sai_agl_task);
542                 LASSERT(sai->sai_sent == sai->sai_replied);
543                 LASSERT(!sa_has_callback(sai));
544
545                 list_for_each_entry_safe(entry, next, &sai->sai_entries,
546                                          se_list)
547                         sa_kill(sai, entry);
548
549                 LASSERT(atomic_read(&sai->sai_cache_count) == 0);
550                 LASSERT(agl_list_empty(sai));
551
552                 ll_sai_free(sai);
553                 atomic_dec(&sbi->ll_sa_running);
554         }
555 }
556
557 /* Do NOT forget to drop the inode refcount taken when it was added to sai_agls. */
558 static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
559 {
560         struct ll_inode_info *lli = ll_i2info(inode);
561         u64 index = lli->lli_agl_index;
562         ktime_t expire;
563         int rc;
564
565         ENTRY;
566
567         LASSERT(list_empty(&lli->lli_agl_list));
568
569         /* AGL may fall behind statahead by one entry */
570         if (is_omitted_entry(sai, index + 1)) {
571                 lli->lli_agl_index = 0;
572                 iput(inode);
573                 RETURN_EXIT;
574         }
575
576         /*
577          * In case of restore, the MDT has the right size and has already
578          * sent it back without granting the layout lock, so the inode is
579          * up-to-date and AGL (async glimpse lock) is useless.
580          * Also, a glimpse needs the layout; while a restore is running the
581          * MDT holds the layout lock, so the glimpse would block until the
582          * end of the restore (statahead/agl would block).
583          */
584         if (test_bit(LLIF_FILE_RESTORING, &lli->lli_flags)) {
585                 lli->lli_agl_index = 0;
586                 iput(inode);
587                 RETURN_EXIT;
588         }
589
590         /* Someone is in glimpse (sync or async), do nothing. */
591         rc = down_write_trylock(&lli->lli_glimpse_sem);
592         if (rc == 0) {
593                 lli->lli_agl_index = 0;
594                 iput(inode);
595                 RETURN_EXIT;
596         }
597
598         /*
599          * Someone triggered a glimpse within the last second.
600          * 1) The former glimpse succeeded with a glimpse lock granted by the
601          *    OST; if the lock is still cached on the client, AGL needs to do
602          *    nothing. If it was cancelled by another client, AGL may not be
603          *    able to obtain a new lock, since AGL triggers no glimpse callback.
604          * 2) The former glimpse succeeded, but the OST did not grant a glimpse
605          *    lock. In that case it is quite likely that the OST will not grant
606          *    one for AGL either.
607          * 3) The former glimpse failed; compared with the other two cases this
608          *    is relatively rare. AGL can ignore it without much impact on
609          *    performance.
610          */
611         expire = ktime_sub_ns(ktime_get(), NSEC_PER_SEC);
612         if (ktime_to_ns(lli->lli_glimpse_time) &&
613             ktime_before(expire, lli->lli_glimpse_time)) {
614                 up_write(&lli->lli_glimpse_sem);
615                 lli->lli_agl_index = 0;
616                 iput(inode);
617                 RETURN_EXIT;
618         }
619
620         CDEBUG(D_READA,
621                "Handling (init) async glimpse: inode = " DFID", idx = %llu\n",
622                PFID(&lli->lli_fid), index);
623
624         cl_agl(inode);
625         lli->lli_agl_index = 0;
626         lli->lli_glimpse_time = ktime_get();
627         up_write(&lli->lli_glimpse_sem);
628
629         CDEBUG(D_READA,
630                "Handled (init) async glimpse: inode= " DFID", idx = %llu, rc = %d\n",
631                PFID(&lli->lli_fid), index, rc);
632
633         iput(inode);
634
635         EXIT;
636 }
637
638 /*
639  * prepare the inode for the sa_entry and add it to the AGL list; the
640  * sa_entry is then ready to be used by the scanner process.
641  */
642 static void sa_instantiate(struct ll_statahead_info *sai,
643                            struct sa_entry *entry)
644 {
645         struct inode *dir = sai->sai_dentry->d_inode;
646         struct inode *child;
647         struct md_enqueue_info *minfo;
648         struct lookup_intent *it;
649         struct ptlrpc_request *req;
650         struct mdt_body *body;
651         int rc = 0;
652
653         ENTRY;
654
655         LASSERT(entry->se_handle != 0);
656
657         minfo = entry->se_minfo;
658         it = &minfo->mi_it;
659         req = entry->se_req;
660         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
661         if (!body)
662                 GOTO(out, rc = -EFAULT);
663
664         child = entry->se_inode;
665         /* revalidate; unlinked and re-created with the same name */
666         if (unlikely(!lu_fid_eq(&minfo->mi_data.op_fid2, &body->mbo_fid1))) {
667                 if (child) {
668                         entry->se_inode = NULL;
669                         iput(child);
670                 }
671                 /* The mdt_body is invalid. Skip this entry */
672                 GOTO(out, rc = -EAGAIN);
673         }
674
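        /*
         * reuse the ibits lock obtained by the async getattr: stash its
         * handle in the intent so md_revalidate_lock() can find it
         */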
675         it->it_lock_handle = entry->se_handle;
676         rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
677         if (rc != 1)
678                 GOTO(out, rc = -EAGAIN);
679
680         rc = ll_prep_inode(&child, &req->rq_pill, dir->i_sb, it);
681         if (rc)
682                 GOTO(out, rc);
683
684         /* If encryption context was returned by MDT, put it in
685          * inode now to save an extra getxattr.
686          */
687         if (body->mbo_valid & OBD_MD_ENCCTX) {
688                 void *encctx = req_capsule_server_get(&req->rq_pill,
689                                                       &RMF_FILE_ENCCTX);
690                 __u32 encctxlen = req_capsule_get_size(&req->rq_pill,
691                                                        &RMF_FILE_ENCCTX,
692                                                        RCL_SERVER);
693
694                 if (encctxlen) {
695                         CDEBUG(D_SEC,
696                                "server returned encryption ctx for "DFID"\n",
697                                PFID(ll_inode2fid(child)));
698                         rc = ll_xattr_cache_insert(child,
699                                                LL_XATTR_NAME_ENCRYPTION_CONTEXT,
700                                                encctx, encctxlen);
701                         if (rc)
702                                 CWARN("%s: cannot set enc ctx for "DFID": rc = %d\n",
703                                       ll_i2sbi(child)->ll_fsname,
704                                       PFID(ll_inode2fid(child)), rc);
705                 }
706         }
707
708         CDEBUG(D_READA, "%s: setting %.*s"DFID" l_data to inode %p\n",
709                ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
710                entry->se_qstr.name, PFID(ll_inode2fid(child)), child);
711         ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);
712
713         entry->se_inode = child;
714
715         if (agl_should_run(sai, child))
716                 ll_agl_add(sai, child, entry->se_index);
717
718         EXIT;
719
720 out:
721         /*
722          * sa_make_ready() will drop ldlm ibits lock refcount by calling
723          * ll_intent_drop_lock() in spite of failures. Do not worry about
724          * calling ll_intent_drop_lock() more than once.
725          */
726         sa_make_ready(sai, entry, rc);
727 }
728
729 /* once there are async stat replies, instantiate sa_entry from replies */
730 static void sa_handle_callback(struct ll_statahead_info *sai)
731 {
732         struct ll_inode_info *lli;
733
734         lli = ll_i2info(sai->sai_dentry->d_inode);
735
736         spin_lock(&lli->lli_sa_lock);
737         while (sa_has_callback(sai)) {
738                 struct sa_entry *entry;
739
740                 entry = list_entry(sai->sai_interim_entries.next,
741                                    struct sa_entry, se_list);
742                 list_del_init(&entry->se_list);
743                 spin_unlock(&lli->lli_sa_lock);
744
745                 sa_instantiate(sai, entry);
746                 spin_lock(&lli->lli_sa_lock);
747         }
748         spin_unlock(&lli->lli_sa_lock);
749 }
750
751 /*
752  * callback for the async stat RPC; because this is called in ptlrpcd
753  * context, we only put the sa_entry on sai_interim_entries and wake up the
754  * statahead thread to prepare the inode and instantiate the sa_entry later.
755  */
756 static int ll_statahead_interpret(struct ptlrpc_request *req,
757                                   struct md_enqueue_info *minfo, int rc)
758 {
759         struct lookup_intent *it = &minfo->mi_it;
760         struct inode *dir = minfo->mi_dir;
761         struct ll_inode_info *lli = ll_i2info(dir);
762         struct ll_statahead_info *sai = lli->lli_sai;
763         struct sa_entry *entry = (struct sa_entry *)minfo->mi_cbdata;
764         __u64 handle = 0;
765
766         ENTRY;
767
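        /* a negative lookup disposition means the name does not exist on MDT */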
768         if (it_disposition(it, DISP_LOOKUP_NEG))
769                 rc = -ENOENT;
770
771         /*
772          * because the statahead thread waits for all inflight RPCs to finish,
773          * sai is always valid here, no need to take a refcount
774          */
775         LASSERT(sai != NULL);
776         LASSERT(entry != NULL);
777
778         CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
779                entry->se_qstr.len, entry->se_qstr.name, rc);
780
781         if (rc != 0) {
782                 ll_intent_release(it);
783                 sa_fini_data(minfo);
784         } else {
785                 /*
786                  * release ibits lock ASAP to avoid deadlock when statahead
787                  * thread enqueues lock on parent in readdir and another
788                  * process enqueues lock on child with parent lock held, eg.
789                  * unlink.
790                  */
791                 handle = it->it_lock_handle;
792                 ll_intent_drop_lock(it);
793                 ll_unlock_md_op_lsm(&minfo->mi_data);
794         }
795
796         spin_lock(&lli->lli_sa_lock);
797         if (rc != 0) {
798                 if (__sa_make_ready(sai, entry, rc))
799                         wake_up(&sai->sai_waitq);
800         } else {
801                 int first = 0;
802
803                 entry->se_minfo = minfo;
804                 entry->se_req = ptlrpc_request_addref(req);
805                 /*
806                  * Release the async ibits lock ASAP to avoid deadlock
807                  * when statahead thread tries to enqueue lock on parent
808                  * for readpage and other tries to enqueue lock on child
809                  * with parent's lock held, for example: unlink.
810                  */
811                 entry->se_handle = handle;
812                 if (!sa_has_callback(sai))
813                         first = 1;
814
815                 list_add_tail(&entry->se_list, &sai->sai_interim_entries);
816                 if (first && sai->sai_task)
817                         wake_up_process(sai->sai_task);
818         }
819         sai->sai_replied++;
820
821         spin_unlock(&lli->lli_sa_lock);
822
823         RETURN(rc);
824 }
825
826 /* async stat for file not found in dcache */
827 static int sa_lookup(struct inode *dir, struct sa_entry *entry)
828 {
829         struct md_enqueue_info   *minfo;
830         int                       rc;
831
832         ENTRY;
833
834         minfo = sa_prep_data(dir, NULL, entry);
835         if (IS_ERR(minfo))
836                 RETURN(PTR_ERR(minfo));
837
838         rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
839         if (rc < 0)
840                 sa_fini_data(minfo);
841
842         RETURN(rc);
843 }
844
845 /**
846  * async stat for file found in dcache, similar to .revalidate
847  *
848  * \retval      1 dentry valid, no RPC sent
849  * \retval      0 dentry invalid, will send async stat RPC
850  * \retval      negative number upon error
851  */
852 static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
853                          struct dentry *dentry)
854 {
855         struct inode *inode = dentry->d_inode;
856         struct lookup_intent it = { .it_op = IT_GETATTR,
857                                     .it_lock_handle = 0 };
858         struct md_enqueue_info *minfo;
859         int rc;
860
861         ENTRY;
862
863         if (unlikely(!inode))
864                 RETURN(1);
865
866         if (d_mountpoint(dentry))
867                 RETURN(1);
868
869         minfo = sa_prep_data(dir, inode, entry);
870         if (IS_ERR(minfo))
871                 RETURN(PTR_ERR(minfo));
872
873         entry->se_inode = igrab(inode);
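        /*
         * if a matching ibits lock is already cached on the client, the
         * dentry is still valid and no async stat RPC is needed
         */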
874         rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
875                                 NULL);
876         if (rc == 1) {
877                 entry->se_handle = it.it_lock_handle;
878                 ll_intent_release(&it);
879                 sa_fini_data(minfo);
880                 RETURN(1);
881         }
882
883         rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
884         if (rc < 0) {
885                 entry->se_inode = NULL;
886                 iput(inode);
887                 sa_fini_data(minfo);
888         }
889
890         RETURN(rc);
891 }
892
893 /* async stat for file with @name */
894 static void sa_statahead(struct dentry *parent, const char *name, int len,
895                          const struct lu_fid *fid)
896 {
897         struct inode *dir = parent->d_inode;
898         struct ll_inode_info *lli = ll_i2info(dir);
899         struct ll_statahead_info *sai = lli->lli_sai;
900         struct dentry *dentry = NULL;
901         struct sa_entry *entry;
902         int rc;
903
904         ENTRY;
905
906         entry = sa_alloc(parent, sai, sai->sai_index, name, len, fid);
907         if (IS_ERR(entry))
908                 RETURN_EXIT;
909
910         dentry = d_lookup(parent, &entry->se_qstr);
911         if (!dentry) {
912                 rc = sa_lookup(dir, entry);
913         } else {
914                 rc = sa_revalidate(dir, entry, dentry);
915                 if (rc == 1 && agl_should_run(sai, dentry->d_inode))
916                         ll_agl_add(sai, dentry->d_inode, entry->se_index);
917         }
918
919         if (dentry)
920                 dput(dentry);
921
922         if (rc != 0)
923                 sa_make_ready(sai, entry, rc);
924         else
925                 sai->sai_sent++;
926
927         sai->sai_index++;
928
929         EXIT;
930 }
931
932 /* async glimpse (agl) thread main function */
933 static int ll_agl_thread(void *arg)
934 {
935         struct dentry *parent = (struct dentry *)arg;
936         struct inode *dir = parent->d_inode;
937         struct ll_inode_info *plli = ll_i2info(dir);
938         struct ll_inode_info *clli;
939         /*
940          * We already own this reference, so it is safe to take it
941          * without a lock.
942          */
943         struct ll_statahead_info *sai = plli->lli_sai;
944
945         ENTRY;
946
947         CDEBUG(D_READA, "agl thread started: sai %p, parent %pd\n",
948                sai, parent);
949
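        /*
         * set TASK_IDLE before checking the stop condition so a concurrent
         * wake_up_process()/kthread_stop() is not missed between the check
         * and schedule()
         */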
950         while (({set_current_state(TASK_IDLE);
951                  !kthread_should_stop(); })) {
952                 spin_lock(&plli->lli_agl_lock);
953                 clli = list_first_entry_or_null(&sai->sai_agls,
954                                                 struct ll_inode_info,
955                                                 lli_agl_list);
956                 if (clli) {
957                         __set_current_state(TASK_RUNNING);
958                         list_del_init(&clli->lli_agl_list);
959                         spin_unlock(&plli->lli_agl_lock);
960                         ll_agl_trigger(&clli->lli_vfs_inode, sai);
961                         cond_resched();
962                 } else {
963                         spin_unlock(&plli->lli_agl_lock);
964                         schedule();
965                 }
966         }
967         __set_current_state(TASK_RUNNING);
968         RETURN(0);
969 }
970
971 static void ll_stop_agl(struct ll_statahead_info *sai)
972 {
973         struct dentry *parent = sai->sai_dentry;
974         struct ll_inode_info *plli = ll_i2info(parent->d_inode);
975         struct ll_inode_info *clli;
976         struct task_struct *agl_task;
977
978         spin_lock(&plli->lli_agl_lock);
979         agl_task = sai->sai_agl_task;
980         sai->sai_agl_task = NULL;
981         spin_unlock(&plli->lli_agl_lock);
982         if (!agl_task)
983                 return;
984
985         CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n",
986                sai, (unsigned int)agl_task->pid);
987         kthread_stop(agl_task);
988
989         spin_lock(&plli->lli_agl_lock);
990         while ((clli = list_first_entry_or_null(&sai->sai_agls,
991                                                 struct ll_inode_info,
992                                                 lli_agl_list)) != NULL) {
993                 list_del_init(&clli->lli_agl_list);
994                 spin_unlock(&plli->lli_agl_lock);
995                 clli->lli_agl_index = 0;
996                 iput(&clli->lli_vfs_inode);
997                 spin_lock(&plli->lli_agl_lock);
998         }
999         spin_unlock(&plli->lli_agl_lock);
1000         CDEBUG(D_READA, "agl thread stopped: sai %p, parent %pd\n",
1001                sai, parent);
1002         ll_sai_put(sai);
1003 }
1004
1005 /* start agl thread */
1006 static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
1007 {
1008         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
1009         struct ll_inode_info *plli;
1010         struct task_struct *task;
1011
1012         ENTRY;
1013
1014         CDEBUG(D_READA, "start agl thread: sai %p, parent %pd\n",
1015                sai, parent);
1016
1017         plli = ll_i2info(parent->d_inode);
1018         task = kthread_create_on_node(ll_agl_thread, parent, node, "ll_agl_%d",
1019                                       plli->lli_opendir_pid);
1020         if (IS_ERR(task)) {
1021                 CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));
1022                 RETURN_EXIT;
1023         }
1024         sai->sai_agl_task = task;
1025         atomic_inc(&ll_i2sbi(d_inode(parent))->ll_agl_total);
1026         /* Get an extra reference that the thread holds */
1027         ll_sai_get(d_inode(parent));
1028
1029         wake_up_process(task);
1030
1031         EXIT;
1032 }
1033
1034 /* statahead thread main function */
1035 static int ll_statahead_thread(void *arg)
1036 {
1037         struct dentry *parent = (struct dentry *)arg;
1038         struct inode *dir = parent->d_inode;
1039         struct ll_inode_info *lli = ll_i2info(dir);
1040         struct ll_sb_info *sbi = ll_i2sbi(dir);
1041         struct ll_statahead_info *sai = lli->lli_sai;
1042         int first = 0;
1043         struct md_op_data *op_data;
1044         struct page *page = NULL;
1045         __u64 pos = 0;
1046         int rc = 0;
1047
1048         ENTRY;
1049
1050         CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
1051                sai, parent);
1052
1053         OBD_ALLOC_PTR(op_data);
1054         if (!op_data)
1055                 GOTO(out, rc = -ENOMEM);
1056
1057         while (pos != MDS_DIR_END_OFF && sai->sai_task) {
1058                 struct lu_dirpage *dp;
1059                 struct lu_dirent  *ent;
1060
1061                 op_data = ll_prep_md_op_data(op_data, dir, dir, NULL, 0, 0,
1062                                              LUSTRE_OPC_ANY, dir);
1063                 if (IS_ERR(op_data)) {
1064                         rc = PTR_ERR(op_data);
1065                         break;
1066                 }
1067
1068                 sai->sai_in_readpage = 1;
1069                 page = ll_get_dir_page(dir, op_data, pos);
1070                 ll_unlock_md_op_lsm(op_data);
1071                 sai->sai_in_readpage = 0;
1072                 if (IS_ERR(page)) {
1073                         rc = PTR_ERR(page);
1074                         CDEBUG(D_READA,
1075                                "error reading dir "DFID" at %llu /%llu opendir_pid = %u: rc = %d\n",
1076                                PFID(ll_inode2fid(dir)), pos, sai->sai_index,
1077                                lli->lli_opendir_pid, rc);
1078                         break;
1079                 }
1080
1081                 dp = page_address(page);
1082                 for (ent = lu_dirent_start(dp);
1083                      ent != NULL && sai->sai_task &&
1084                      !sa_low_hit(sai);
1085                      ent = lu_dirent_next(ent)) {
1086                         __u64 hash;
1087                         int namelen;
1088                         char *name;
1089                         struct lu_fid fid;
1090                         struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
1091
1092                         hash = le64_to_cpu(ent->lde_hash);
1093                         if (unlikely(hash < pos))
1094                                 /*
1095                                  * Skip until we find target hash value.
1096                                  */
1097                                 continue;
1098
1099                         namelen = le16_to_cpu(ent->lde_namelen);
1100                         if (unlikely(namelen == 0))
1101                                 /*
1102                                  * Skip dummy record.
1103                                  */
1104                                 continue;
1105
1106                         name = ent->lde_name;
1107                         if (name[0] == '.') {
1108                                 if (namelen == 1) {
1109                                         /*
1110                                          * skip "."
1111                                          */
1112                                         continue;
1113                                 } else if (name[1] == '.' && namelen == 2) {
1114                                         /*
1115                                          * skip ".."
1116                                          */
1117                                         continue;
1118                                 } else if (!sai->sai_ls_all) {
1119                                         /*
1120                                          * skip hidden files.
1121                                          */
1122                                         sai->sai_skip_hidden++;
1123                                         continue;
1124                                 }
1125                         }
1126
1127                         /*
1128                          * don't stat-ahead first entry.
1129                          */
1130                         if (unlikely(++first == 1))
1131                                 continue;
1132
1133                         fid_le_to_cpu(&fid, &ent->lde_fid);
1134
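                        /*
                         * throttle: while the statahead window is full, handle
                         * completed replies and pending AGL work, otherwise
                         * sleep until woken by a reply or by
                         * ll_deauthorize_statahead()
                         */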
1135                         while (({set_current_state(TASK_IDLE);
1136                                  sai->sai_task; })) {
1137                                 if (sa_has_callback(sai)) {
1138                                         __set_current_state(TASK_RUNNING);
1139                                         sa_handle_callback(sai);
1140                                 }
1141
1142                                 spin_lock(&lli->lli_agl_lock);
1143                                 while (sa_sent_full(sai) &&
1144                                        !agl_list_empty(sai)) {
1145                                         struct ll_inode_info *clli;
1146
1147                                         __set_current_state(TASK_RUNNING);
1148                                         clli = agl_first_entry(sai);
1149                                         list_del_init(&clli->lli_agl_list);
1150                                         spin_unlock(&lli->lli_agl_lock);
1151
1152                                         ll_agl_trigger(&clli->lli_vfs_inode,
1153                                                        sai);
1154                                         cond_resched();
1155                                         spin_lock(&lli->lli_agl_lock);
1156                                 }
1157                                 spin_unlock(&lli->lli_agl_lock);
1158
1159                                 if (!sa_sent_full(sai))
1160                                         break;
1161                                 schedule();
1162                         }
1163                         __set_current_state(TASK_RUNNING);
1164
1165                         if (IS_ENCRYPTED(dir)) {
1166                                 struct llcrypt_str de_name =
1167                                         LLTR_INIT(ent->lde_name, namelen);
1168                                 struct lu_fid fid;
1169
1170                                 rc = llcrypt_fname_alloc_buffer(dir, NAME_MAX,
1171                                                                 &lltr);
1172                                 if (rc < 0)
1173                                         continue;
1174
1175                                 fid_le_to_cpu(&fid, &ent->lde_fid);
1176                                 if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
1177                                                          &lltr, &fid)) {
1178                                         llcrypt_fname_free_buffer(&lltr);
1179                                         continue;
1180                                 }
1181
1182                                 name = lltr.name;
1183                                 namelen = lltr.len;
1184                         }
1185
1186                         sa_statahead(parent, name, namelen, &fid);
1187                         llcrypt_fname_free_buffer(&lltr);
1188                 }
1189
1190                 pos = le64_to_cpu(dp->ldp_hash_end);
1191                 ll_release_page(dir, page,
1192                                 le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
1193
1194                 if (sa_low_hit(sai)) {
1195                         rc = -EFAULT;
1196                         atomic_inc(&sbi->ll_sa_wrong);
1197                         CDEBUG(D_READA,
1198                                "Statahead for dir "DFID" hit ratio too low: hit/miss %llu/%llu, sent/replied %llu/%llu, stopping statahead thread: pid %d\n",
1199                                PFID(&lli->lli_fid), sai->sai_hit,
1200                                sai->sai_miss, sai->sai_sent,
1201                                sai->sai_replied, current->pid);
1202                         break;
1203                 }
1204         }
1205         ll_finish_md_op_data(op_data);
1206
1207         if (rc < 0) {
1208                 spin_lock(&lli->lli_sa_lock);
1209                 sai->sai_task = NULL;
1210                 lli->lli_sa_enabled = 0;
1211                 spin_unlock(&lli->lli_sa_lock);
1212         }
1213
1214         /*
1215          * statahead is finished, but statahead entries still need to be cached;
1216          * wait for file release to stop this thread.
1217          */
1218         while (({set_current_state(TASK_IDLE);
1219                  sai->sai_task; })) {
1220                 if (sa_has_callback(sai)) {
1221                         __set_current_state(TASK_RUNNING);
1222                         sa_handle_callback(sai);
1223                 } else {
1224                         schedule();
1225                 }
1226         }
1227         __set_current_state(TASK_RUNNING);
1228
1229         EXIT;
1230 out:
1231         ll_stop_agl(sai);
1232
1233         /*
1234          * wait for inflight statahead RPCs to finish, since they access sai
1235          * data; only then can sai be freed safely
1236          */
1237         while (sai->sai_sent != sai->sai_replied)
1238                 /* in case we're not woken up, timeout wait */
1239                 msleep(125);
1240
1241         /* release resources held by statahead RPCs */
1242         sa_handle_callback(sai);
1243
1244         CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd\n",
1245                sbi->ll_fsname, sai, parent);
1246
1247         spin_lock(&lli->lli_sa_lock);
1248         sai->sai_task = NULL;
1249         spin_unlock(&lli->lli_sa_lock);
1250         wake_up(&sai->sai_waitq);
1251
1252         ll_sai_put(sai);
1253
1254         return rc;
1255 }
1256
1257 /* authorize opened dir handle @key to statahead */
1258 void ll_authorize_statahead(struct inode *dir, void *key)
1259 {
1260         struct ll_inode_info *lli = ll_i2info(dir);
1261
1262         spin_lock(&lli->lli_sa_lock);
1263         if (!lli->lli_opendir_key && !lli->lli_sai) {
1264                 /*
1265                  * if lli_sai is not NULL, it means previous statahead is not
1266                  * finished yet, we'd better not start a new statahead for now.
1267                  */
1268                 LASSERT(lli->lli_opendir_pid == 0);
1269                 lli->lli_opendir_key = key;
1270                 lli->lli_opendir_pid = current->pid;
1271                 lli->lli_sa_enabled = 1;
1272         }
1273         spin_unlock(&lli->lli_sa_lock);
1274 }
1275
1276 /*
1277  * deauthorize opened dir handle @key to statahead, and notify statahead thread
1278  * to quit if it's running.
1279  */
1280 void ll_deauthorize_statahead(struct inode *dir, void *key)
1281 {
1282         struct ll_inode_info *lli = ll_i2info(dir);
1283         struct ll_statahead_info *sai;
1284
1285         LASSERT(lli->lli_opendir_key == key);
1286         LASSERT(lli->lli_opendir_pid != 0);
1287
1288         CDEBUG(D_READA, "deauthorize statahead for "DFID"\n",
1289                PFID(&lli->lli_fid));
1290
1291         spin_lock(&lli->lli_sa_lock);
1292         lli->lli_opendir_key = NULL;
1293         lli->lli_opendir_pid = 0;
1294         lli->lli_sa_enabled = 0;
1295         sai = lli->lli_sai;
1296         if (sai && sai->sai_task) {
1297                 /*
1298                  * statahead thread may not have quit yet because it needs to
1299                  * cache entries, now it's time to tell it to quit.
1300                  *
1301                  * wake_up_process() provides the necessary barriers
1302                  * to pair with set_current_state().
1303                  */
1304                 struct task_struct *task = sai->sai_task;
1305
1306                 sai->sai_task = NULL;
1307                 wake_up_process(task);
1308         }
1309         spin_unlock(&lli->lli_sa_lock);
1310 }
1311
1312 enum {
1313         /**
1314          * not first dirent, or is "."
1315          */
1316         LS_NOT_FIRST_DE = 0,
1317         /**
1318          * the first non-hidden dirent
1319          */
1320         LS_FIRST_DE,
1321         /**
1322          * the first hidden dirent, that is "."
1323          */
1324         LS_FIRST_DOT_DE
1325 };
1326
1327 /* file is first dirent under @dir */
1328 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
1329 {
1330         struct qstr *target = &dentry->d_name;
1331         struct md_op_data *op_data;
1332         int dot_de;
1333         struct page *page = NULL;
1334         int rc = LS_NOT_FIRST_DE;
1335         __u64 pos = 0;
1336         struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
1337
1338         ENTRY;
1339
1340         op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
1341                                      LUSTRE_OPC_ANY, dir);
1342         if (IS_ERR(op_data))
1343                 RETURN(PTR_ERR(op_data));
1344
1345         if (IS_ENCRYPTED(dir)) {
1346                 int rc2 = llcrypt_fname_alloc_buffer(dir, NAME_MAX, &lltr);
1347
1348                 if (rc2 < 0)
1349                         RETURN(rc2);
1350         }
1351
1352         /**
1353          * FIXME: choose the start offset of the readdir
1354          */
1355
1356         page = ll_get_dir_page(dir, op_data, 0);
1357
1358         while (1) {
1359                 struct lu_dirpage *dp;
1360                 struct lu_dirent  *ent;
1361
1362                 if (IS_ERR(page)) {
1363                         struct ll_inode_info *lli = ll_i2info(dir);
1364
1365                         rc = PTR_ERR(page);
1366                         CERROR("%s: reading dir "DFID" at %llu opendir_pid = %u : rc = %d\n",
1367                                ll_i2sbi(dir)->ll_fsname,
1368                                PFID(ll_inode2fid(dir)), pos,
1369                                lli->lli_opendir_pid, rc);
1370                         break;
1371                 }
1372
1373                 dp = page_address(page);
1374                 for (ent = lu_dirent_start(dp); ent != NULL;
1375                      ent = lu_dirent_next(ent)) {
1376                         __u64 hash;
1377                         int namelen;
1378                         char *name;
1379
1380                         hash = le64_to_cpu(ent->lde_hash);
1381                         /*
1382                          * The ll_get_dir_page() can return any page containing
1383                          * the given hash, which may not be the start hash.
1384                          */
1385                         if (unlikely(hash < pos))
1386                                 continue;
1387
1388                         namelen = le16_to_cpu(ent->lde_namelen);
1389                         if (unlikely(namelen == 0))
1390                                 /*
1391                                  * skip dummy record.
1392                                  */
1393                                 continue;
1394
1395                         name = ent->lde_name;
1396                         if (name[0] == '.') {
1397                                 if (namelen == 1)
1398                                         /*
1399                                          * skip "."
1400                                          */
1401                                         continue;
1402                                 else if (name[1] == '.' && namelen == 2)
1403                                         /*
1404                                          * skip ".."
1405                                          */
1406                                         continue;
1407                                 else
1408                                         dot_de = 1;
1409                         } else {
1410                                 dot_de = 0;
1411                         }
1412
1413                         if (dot_de && target->name[0] != '.') {
1414                                 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
1415                                        target->len, target->name,
1416                                        namelen, name);
1417                                 continue;
1418                         }
1419
1420                         if (IS_ENCRYPTED(dir)) {
1421                                 struct llcrypt_str de_name =
1422                                         LLTR_INIT(ent->lde_name, namelen);
1423                                 struct lu_fid fid;
1424
1425                                 fid_le_to_cpu(&fid, &ent->lde_fid);
1426                                 if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
1427                                                          &lltr, &fid))
1428                                         continue;
1429                                 name = lltr.name;
1430                                 namelen = lltr.len;
1431                         }
1432
1433                         if (target->len != namelen ||
1434                             memcmp(target->name, name, namelen) != 0)
1435                                 rc = LS_NOT_FIRST_DE;
1436                         else if (!dot_de)
1437                                 rc = LS_FIRST_DE;
1438                         else
1439                                 rc = LS_FIRST_DOT_DE;
1440
1441                         ll_release_page(dir, page, false);
1442                         GOTO(out, rc);
1443                 }
1444                 pos = le64_to_cpu(dp->ldp_hash_end);
1445                 if (pos == MDS_DIR_END_OFF) {
1446                         /*
1447                          * End of directory reached.
1448                          */
1449                         ll_release_page(dir, page, false);
1450                         GOTO(out, rc);
1451                 } else {
1452                         /*
1453                          * chain is exhausted
1454                          * Normal case: continue to the next page.
1455                          */
1456                         ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) &
1457                                               LDF_COLLIDE);
1458                         page = ll_get_dir_page(dir, op_data, pos);
1459                 }
1460         }
1461         EXIT;
1462 out:
1463         llcrypt_fname_free_buffer(&lltr);
1464         ll_finish_md_op_data(op_data);
1465
1466         return rc;
1467 }
1468
1469 /**
1470  * revalidate @dentryp from statahead cache
1471  *
1472  * \param[in] dir       parent directory
1473  * \param[in] sai       sai structure
1474  * \param[out] dentryp  pointer to dentry which will be revalidated
1475  * \param[in] unplug    unplug statahead window only (normally for negative
1476  *                      dentry)
1477  * \retval              1 on success, dentry is saved in @dentryp
1478  * \retval              0 if revalidation failed (no proper lock on client)
1479  * \retval              negative number upon error
1480  */
1481 static int revalidate_statahead_dentry(struct inode *dir,
1482                                        struct ll_statahead_info *sai,
1483                                        struct dentry **dentryp,
1484                                        bool unplug)
1485 {
1486         struct sa_entry *entry = NULL;
1487         struct ll_inode_info *lli = ll_i2info(dir);
1488         int rc = 0;
1489
1490         ENTRY;
1491
1492         if ((*dentryp)->d_name.name[0] == '.') {
1493                 if (sai->sai_ls_all ||
1494                     sai->sai_miss_hidden >= sai->sai_skip_hidden) {
1495                         /*
1496                          * The hidden dentry is the first one, or the
1497                          * statahead thread did not skip so many hidden
1498                          * dentries before "sai_ls_all" was enabled below.
1499                          */
1500                 } else {
1501                         if (!sai->sai_ls_all)
1502                                 /*
1503                                  * Probably the hidden dentry is not the
1504                                  * first one, so "sai_ls_all" was not set
1505                                  * and "ls -al" missed it. Enable
1506                                  * "sai_ls_all" for this case.
1507                                  */
1508                                 sai->sai_ls_all = 1;
1509
1510                         /*
1511                          * Such a "getattr" was skipped before
1512                          * "sai_ls_all" was enabled above.
1513                          */
1514                         sai->sai_miss_hidden++;
1515                         RETURN(-EAGAIN);
1516                 }
1517         }
1518
1519         if (unplug)
1520                 GOTO(out, rc = 1);
1521
1522         entry = sa_get(sai, &(*dentryp)->d_name);
1523         if (!entry)
1524                 GOTO(out, rc = -EAGAIN);
1525
1526         /* if statahead is busy in readdir, help it do post-work */
1527         if (!sa_ready(entry) && sai->sai_in_readpage)
1528                 sa_handle_callback(sai);
1529
1530         if (!sa_ready(entry)) {
1531                 spin_lock(&lli->lli_sa_lock);
1532                 sai->sai_index_wait = entry->se_index;
1533                 spin_unlock(&lli->lli_sa_lock);
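                     /*
                      * Wait up to 30 seconds on sai_waitq for the statahead
                      * thread to make this entry ready.
                      */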
1534                 rc = wait_event_idle_timeout(sai->sai_waitq, sa_ready(entry),
1535                                              cfs_time_seconds(30));
1536                 if (rc == 0) {
1537                         /*
1538                          * The entry is still not ready; it may be in use by
1539                          * an in-flight statahead RPC, so don't free it.
1540                          */
1541                         entry = NULL;
1542                         GOTO(out, rc = -EAGAIN);
1543                 }
1544         }
1545
1546         /*
1547          * We need to see the value that was set immediately before we
1548          * were woken up.
1549          */
1550         if (smp_load_acquire(&entry->se_state) == SA_ENTRY_SUCC &&
1551             entry->se_inode) {
1552                 struct inode *inode = entry->se_inode;
1553                 struct lookup_intent it = { .it_op = IT_GETATTR,
1554                                             .it_lock_handle =
1555                                                 entry->se_handle };
1556                 __u64 bits;
1557
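                     /*
                      * Check whether the ldlm lock obtained by the async
                      * statahead RPC (se_handle) still covers this inode;
                      * rc == 1 means the cached attributes are protected by a
                      * valid lock.
                      */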
1558                 rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
1559                                         ll_inode2fid(inode), &bits);
1560                 if (rc == 1) {
1561                         if (!(*dentryp)->d_inode) {
1562                                 struct dentry *alias;
1563
1564                                 alias = ll_splice_alias(inode, *dentryp);
1565                                 if (IS_ERR(alias)) {
1566                                         ll_intent_release(&it);
1567                                         GOTO(out, rc = PTR_ERR(alias));
1568                                 }
1569                                 *dentryp = alias;
1570                                 /*
1571                                  * statahead prepared this inode, transfer inode
1572                                  * refcount from sa_entry to dentry
1573                                  */
1574                                 entry->se_inode = NULL;
1575                         } else if ((*dentryp)->d_inode != inode) {
1576                                 /* revalidate, but inode is recreated */
1577                                 CDEBUG(D_READA,
1578                                        "%s: stale dentry %pd inode " DFID", statahead inode "DFID "\n",
1579                                        ll_i2sbi(inode)->ll_fsname, *dentryp,
1580                                        PFID(ll_inode2fid((*dentryp)->d_inode)),
1581                                        PFID(ll_inode2fid(inode)));
1582                                 ll_intent_release(&it);
1583                                 GOTO(out, rc = -ESTALE);
1584                         }
1585
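                             /*
                              * The LOOKUP lock bit is held, so the dentry can
                              * be marked valid for the dcache again.
                              */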
1586                         if ((bits & MDS_INODELOCK_LOOKUP) &&
1587                             d_lustre_invalid(*dentryp)) {
1588                                 d_lustre_revalidate(*dentryp);
1589                                 ll_update_dir_depth(dir, (*dentryp)->d_inode);
1590                         }
1591
1592                         ll_intent_release(&it);
1593                 }
1594         }
1595 out:
1596         /*
1597          * A statahead-cached sa_entry can be used only once and is freed
1598          * right after use, so if lookup/revalidate accessed the statahead
1599          * cache, set the dentry's ldd_sa_generation to the parent's
1600          * lli_sa_generation; if we stat this file again later, we know
1601          * statahead has already been done, see dentry_may_statahead().
1602          */
1603         if (lld_is_init(*dentryp))
1604                 ll_d2d(*dentryp)->lld_sa_generation = lli->lli_sa_generation;
1605         sa_put(sai, entry);
1606         spin_lock(&lli->lli_sa_lock);
1607         if (sai->sai_task)
1608                 wake_up_process(sai->sai_task);
1609         spin_unlock(&lli->lli_sa_lock);
1610
1611         RETURN(rc);
1612 }
1613
1614 /**
1615  * start statahead thread
1616  *
1617  * \param[in] dir       parent directory
1618  * \param[in] dentry    dentry that triggers statahead, normally the first
1619  *                      dirent under @dir
1620  * \param[in] agl       indicate whether AGL is needed
1621  * \retval              -EAGAIN on success, because by the time this function
1622  *                      is called we are already in the lookup path, so the
1623  *                      client should do the getattr itself instead of waiting
1624  *                      for the statahead thread to do it asynchronously.
1625  * \retval              negative number upon error
1626  */
1627 static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
1628                                   bool agl)
1629 {
1630         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
1631         struct ll_inode_info *lli = ll_i2info(dir);
1632         struct ll_statahead_info *sai = NULL;
1633         struct dentry *parent = dentry->d_parent;
1634         struct task_struct *task;
1635         struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode);
1636         int first = LS_FIRST_DE;
1637         int rc = 0;
1638
1639         ENTRY;
1640
1641         /* I am the "lli_opendir_pid" owner; only I can set "lli_sai". */
1642         first = is_first_dirent(dir, dentry);
1643         if (first == LS_NOT_FIRST_DE)
1644                 /* It is not an "ls -{a}l" operation, no need for statahead. */
1645                 GOTO(out, rc = -EFAULT);
1646
1647         if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
1648                                        sbi->ll_sa_running_max)) {
1649                 CDEBUG(D_READA,
1650                        "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n");
1651                 GOTO(out, rc = -EMFILE);
1652         }
1653
1654         sai = ll_sai_alloc(parent);
1655         if (!sai)
1656                 GOTO(out, rc = -ENOMEM);
1657
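             /*
              * If the very first dirent is a hidden (dot) entry, assume an
              * "ls -a"-style scan and stat hidden entries as well.
              */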
1658         sai->sai_ls_all = (first == LS_FIRST_DOT_DE);
1659
1660         /*
1661          * if the current lli_opendir_key was deauthorized, or the dir was
1662          * re-opened by another process, don't start statahead; otherwise the
1663          * newly spawned statahead thread won't be notified to quit.
1664          */
1665         spin_lock(&lli->lli_sa_lock);
1666         if (unlikely(lli->lli_sai || !lli->lli_opendir_key ||
1667                      lli->lli_opendir_pid != current->pid)) {
1668                 spin_unlock(&lli->lli_sa_lock);
1669                 GOTO(out, rc = -EPERM);
1670         }
1671         lli->lli_sai = sai;
1672         spin_unlock(&lli->lli_sa_lock);
1673
1674         CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n",
1675                current->pid, parent);
1676
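             /*
              * Create the statahead kthread, named "ll_sa_<pid>" after the
              * process that opened the directory; it is only woken up below
              * once sai_task has been set.
              */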
1677         task = kthread_create_on_node(ll_statahead_thread, parent, node,
1678                                       "ll_sa_%u", lli->lli_opendir_pid);
1679         if (IS_ERR(task)) {
1680                 spin_lock(&lli->lli_sa_lock);
1681                 lli->lli_sai = NULL;
1682                 spin_unlock(&lli->lli_sa_lock);
1683                 rc = PTR_ERR(task);
1684                 CERROR("can't start ll_sa thread, rc: %d\n", rc);
1685                 GOTO(out, rc);
1686         }
1687
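             /* start asynchronous glimpse locking (AGL) if enabled and requested */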
1688         if (ll_i2sbi(parent->d_inode)->ll_flags & LL_SBI_AGL_ENABLED && agl)
1689                 ll_start_agl(parent, sai);
1690
1691         atomic_inc(&ll_i2sbi(parent->d_inode)->ll_sa_total);
1692         sai->sai_task = task;
1693
1694         wake_up_process(task);
1695         /*
1696          * We don't stat-ahead for the first dirent since we are already in
1697          * lookup.
1698          */
1699         RETURN(-EAGAIN);
1700
1701 out:
1702         /*
1703          * once starting the statahead thread has failed, disable statahead so
1704          * that subsequent stats won't waste time trying it again.
1705          */
1706         spin_lock(&lli->lli_sa_lock);
1707         if (lli->lli_opendir_pid == current->pid)
1708                 lli->lli_sa_enabled = 0;
1709         spin_unlock(&lli->lli_sa_lock);
1710
1711         if (sai)
1712                 ll_sai_free(sai);
1713         if (first != LS_NOT_FIRST_DE)
1714                 atomic_dec(&sbi->ll_sa_running);
1715
1716         RETURN(rc);
1717 }
1718
1719 /*
1720  * Check whether statahead for @dir was started.
1721  */
1722 static inline bool ll_statahead_started(struct inode *dir, bool agl)
1723 {
1724         struct ll_inode_info *lli = ll_i2info(dir);
1725         struct ll_statahead_info *sai;
1726
1727         spin_lock(&lli->lli_sa_lock);
1728         sai = lli->lli_sai;
1729         if (sai && (sai->sai_agl_task != NULL) != agl)
1730                 CDEBUG(D_READA,
1731                        "%s: Statahead AGL hint changed from %d to %d\n",
1732                        ll_i2sbi(dir)->ll_fsname,
1733                        sai->sai_agl_task != NULL, agl);
1734         spin_unlock(&lli->lli_sa_lock);
1735
1736         return !!sai;
1737 }
1738
1739 /**
1740  * statahead entry function, called when the client does getattr on a file; it
1741  * starts the statahead thread if this is the first dir entry, otherwise it
1742  * revalidates the dentry from the statahead cache.
1743  *
1744  * \param[in]  dir      parent directory
1745  * \param[out] dentryp  dentry to getattr
1746  * \param[in]  agl      whether to start the AGL thread
1747  *
1748  * \retval              1 on success
1749  * \retval              0 if revalidation from the statahead cache failed; the
1750  *                      caller needs to do getattr from the server directly
1751  * \retval              negative number on error; the caller often ignores this
1752  *                      and then does getattr from the server
1753  */
1754 int ll_start_statahead(struct inode *dir, struct dentry *dentry, bool agl)
1755 {
1756         if (!ll_statahead_started(dir, agl))
1757                 return start_statahead_thread(dir, dentry, agl);
1758         return 0;
1759 }
1760
1761 /**
1762  * revalidate dentry from statahead cache.
1763  *
1764  * \param[in]  dir      parent directory
1765  * \param[out] dentryp  dentry to getattr
1766  * \param[in]  unplug   unplug statahead window only (normally for negative
1767  *                      dentry)
1768  * \retval              1 on success
1769  * \retval              0 if revalidation from the statahead cache failed; the
1770  *                      caller needs to do getattr from the server directly
1771  * \retval              negative number on error; the caller often ignores this
1772  *                      and then does getattr from the server
1773  */
1774 int ll_revalidate_statahead(struct inode *dir, struct dentry **dentryp,
1775                             bool unplug)
1776 {
1777         struct ll_statahead_info *sai;
1778         int rc = 0;
1779
1780         sai = ll_sai_get(dir);
1781         if (sai) {
1782                 rc = revalidate_statahead_dentry(dir, sai, dentryp, unplug);
1783                 CDEBUG(D_READA, "revalidate statahead %pd: rc = %d.\n",
1784                        *dentryp, rc);
1785                 ll_sai_put(sai);
1786         }
1787         return rc;
1788 }
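
/*
 * A minimal, hypothetical usage sketch of the two entry points above; the
 * actual dispatch helper lives outside this file (e.g. in llite_internal.h)
 * and may differ in detail:
 *
 *      rc = ll_revalidate_statahead(dir, &dentry, unplug);
 *      if (rc)
 *              return rc;
 *      return ll_start_statahead(dir, dentry, true);
 *
 * i.e. consult the statahead cache first; only when nothing useful was cached
 * (rc == 0) may a new statahead thread be started for this directory scan.
 */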