Whamcloud - gitweb
e0aef8b63c5964082bdddee96a8d6e98b68431f7
[fs/lustre-release.git] / lustre / llite / statahead.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  */
31
32 #include <linux/fs.h>
33 #include <linux/sched.h>
34 #include <linux/kthread.h>
35 #include <linux/mm.h>
36 #include <linux/highmem.h>
37 #include <linux/pagemap.h>
38 #include <linux/delay.h>
39
40 #define DEBUG_SUBSYSTEM S_LLITE
41
42 #include <obd_support.h>
43 #include <lustre_dlm.h>
44 #include "llite_internal.h"
45
46 #define SA_OMITTED_ENTRY_MAX 8ULL
47
/* state of a sa_entry; negative values are reserved for error cases */
typedef enum {
        SA_ENTRY_INIT = 0,      /* initial state, async stat not replied yet */
        SA_ENTRY_SUCC = 1,      /* async stat succeeded, entry is usable */
        SA_ENTRY_INVA = 2,      /* entry is invalid (stat failed or stale) */
} se_state_t;
54
/*
 * sa_entry is not refcounted: the statahead thread allocates it and does the
 * async stat, and in the async stat callback ll_statahead_interpret() will
 * add it into sai_interim_entries; later the statahead thread calls
 * sa_handle_callback() to instantiate the entry and move it into sai_entries,
 * and from then on only the scanner process can access and free it.
 */
struct sa_entry {
        /* link into sai_interim_entries or sai_entries */
        struct list_head        se_list;
        /* link into sai hash table locally */
        struct list_head        se_hash;
        /* entry index in the sai */
        __u64                   se_index;
        /* low layer ldlm lock handle */
        __u64                   se_handle;
        /* entry status */
        se_state_t              se_state;
        /* entry size, contains name */
        int                     se_size;
        /* pointer to async getattr enqueue info */
        struct md_enqueue_info *se_minfo;
        /* pointer to the async getattr request */
        struct ptlrpc_request  *se_req;
        /* pointer to the target inode */
        struct inode           *se_inode;
        /* entry name */
        struct qstr             se_qstr;
        /* entry fid */
        struct lu_fid           se_fid;
};
86
87 static unsigned int sai_generation;
88 static DEFINE_SPINLOCK(sai_generation_lock);
89
90 static inline int sa_unhashed(struct sa_entry *entry)
91 {
92         return list_empty(&entry->se_hash);
93 }
94
/* sa_entry is ready to use */
static inline int sa_ready(struct sa_entry *entry)
{
        /*
         * Pairs with the smp_store_release() in __sa_make_ready(): the
         * barrier ensures all sa_entry fields written before the state
         * change are visible once a non-INIT state is observed.
         */
        smp_rmb();
        return (entry->se_state != SA_ENTRY_INIT);
}
102
103 /* hash value to put in sai_cache */
104 static inline int sa_hash(int val)
105 {
106         return val & LL_SA_CACHE_MASK;
107 }
108
109 /* hash entry into sai_cache */
110 static inline void
111 sa_rehash(struct ll_statahead_info *sai, struct sa_entry *entry)
112 {
113         int i = sa_hash(entry->se_qstr.hash);
114
115         spin_lock(&sai->sai_cache_lock[i]);
116         list_add_tail(&entry->se_hash, &sai->sai_cache[i]);
117         spin_unlock(&sai->sai_cache_lock[i]);
118 }
119
120 /* unhash entry from sai_cache */
121 static inline void
122 sa_unhash(struct ll_statahead_info *sai, struct sa_entry *entry)
123 {
124         int i = sa_hash(entry->se_qstr.hash);
125
126         spin_lock(&sai->sai_cache_lock[i]);
127         list_del_init(&entry->se_hash);
128         spin_unlock(&sai->sai_cache_lock[i]);
129 }
130
131 static inline int agl_should_run(struct ll_statahead_info *sai,
132                                  struct inode *inode)
133 {
134         return inode && S_ISREG(inode->i_mode) && sai->sai_agl_task;
135 }
136
137 static inline struct ll_inode_info *
138 agl_first_entry(struct ll_statahead_info *sai)
139 {
140         return list_first_entry(&sai->sai_agls, struct ll_inode_info,
141                                 lli_agl_list);
142 }
143
144 /* statahead window is full */
145 static inline int sa_sent_full(struct ll_statahead_info *sai)
146 {
147         return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
148 }
149
150 /* got async stat replies */
151 static inline int sa_has_callback(struct ll_statahead_info *sai)
152 {
153         return !list_empty(&sai->sai_interim_entries);
154 }
155
156 static inline int agl_list_empty(struct ll_statahead_info *sai)
157 {
158         return list_empty(&sai->sai_agls);
159 }
160
/**
 * Hit rate is considered low when either:
 * (1) the overall hit ratio drops below 80% (hit < 4 * miss), checked only
 *     after the first 8 hits so a small sample does not trigger it,
 * or
 * (2) more than 8 consecutive misses occurred.
 */
static inline int sa_low_hit(struct ll_statahead_info *sai)
{
        return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) ||
                (sai->sai_consecutive_miss > 8));
}
172
/*
 * if the given index is behind of statahead window more than
 * SA_OMITTED_ENTRY_MAX, then it is old.
 */
static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
{
        /* equivalent to index < sai_index - sai_max - SA_OMITTED_ENTRY_MAX,
         * rearranged so the unsigned arithmetic cannot underflow */
        return ((__u64)sai->sai_max + index + SA_OMITTED_ENTRY_MAX <
                sai->sai_index);
}
182
/*
 * Allocate a sa_entry for @name and hash it into sai_cache so the scanner
 * process can find it while the async stat is in flight.
 *
 * \param parent  dentry of the directory being statahead
 * \param sai     statahead info of @parent
 * \param index   statahead index assigned to this entry
 * \param name    entry name (need not be NUL-terminated)
 * \param len     length of @name in bytes
 * \param fid     fid of the entry
 *
 * \retval  new entry, or ERR_PTR(-ENOMEM) on allocation failure
 */
static struct sa_entry *
sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
         const char *name, int len, const struct lu_fid *fid)
{
        struct ll_inode_info *lli;
        struct sa_entry *entry;
        int entry_size;
        char *dname;

        ENTRY;

        /* the name is stored inline right after the struct; round the name
         * storage up so there is always room for the trailing NUL */
        entry_size = sizeof(struct sa_entry) + (len & ~3) + 4;
        OBD_ALLOC(entry, entry_size);
        if (unlikely(!entry))
                RETURN(ERR_PTR(-ENOMEM));

        CDEBUG(D_READA, "alloc sa entry %.*s(%p) index %llu\n",
               len, name, entry, index);

        entry->se_index = index;

        entry->se_state = SA_ENTRY_INIT;
        entry->se_size = entry_size;
        dname = (char *)entry + sizeof(struct sa_entry);
        memcpy(dname, name, len);
        dname[len] = 0;
        entry->se_qstr.hash = ll_full_name_hash(parent, name, len);
        entry->se_qstr.len = len;
        entry->se_qstr.name = dname;
        entry->se_fid = *fid;

        lli = ll_i2info(sai->sai_dentry->d_inode);

        /* publish the entry: once hashed the scanner can look it up */
        spin_lock(&lli->lli_sa_lock);
        INIT_LIST_HEAD(&entry->se_list);
        sa_rehash(sai, entry);
        spin_unlock(&lli->lli_sa_lock);

        atomic_inc(&sai->sai_cache_count);

        RETURN(entry);
}
226
/* free sa_entry, which should have been unhashed and not in any list */
static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry)
{
        CDEBUG(D_READA, "free sa entry %.*s(%p) index %llu\n",
               entry->se_qstr.len, entry->se_qstr.name, entry,
               entry->se_index);

        LASSERT(list_empty(&entry->se_list));
        LASSERT(sa_unhashed(entry));

        /* se_size covers the struct plus the inline name, see sa_alloc() */
        OBD_FREE(entry, entry->se_size);
        atomic_dec(&sai->sai_cache_count);
}
240
241 /*
242  * find sa_entry by name, used by directory scanner, lock is not needed because
243  * only scanner can remove the entry from cache.
244  */
245 static struct sa_entry *
246 sa_get(struct ll_statahead_info *sai, const struct qstr *qstr)
247 {
248         struct sa_entry *entry;
249         int i = sa_hash(qstr->hash);
250
251         list_for_each_entry(entry, &sai->sai_cache[i], se_hash) {
252                 if (entry->se_qstr.hash == qstr->hash &&
253                     entry->se_qstr.len == qstr->len &&
254                     memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0)
255                         return entry;
256         }
257         return NULL;
258 }
259
/* unhash and unlink sa_entry, and then free it */
static inline void
sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry)
{
        struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);

        /* must be hashed, linked and ready before it can be killed */
        LASSERT(!sa_unhashed(entry));
        LASSERT(!list_empty(&entry->se_list));
        LASSERT(sa_ready(entry));

        sa_unhash(sai, entry);

        /* se_list is also manipulated under lli_sa_lock by the RPC
         * callback, so the unlink must hold it too */
        spin_lock(&lli->lli_sa_lock);
        list_del_init(&entry->se_list);
        spin_unlock(&lli->lli_sa_lock);

        /* drop the inode reference taken when the entry was instantiated */
        iput(entry->se_inode);

        sa_free(sai, entry);
}
280
/* called by scanner after use, sa_entry will be killed */
static void
sa_put(struct ll_statahead_info *sai, struct sa_entry *entry)
{
        struct sa_entry *tmp, *next;

        if (entry && entry->se_state == SA_ENTRY_SUCC) {
                struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);

                /* hit: double the statahead window, capped by ll_sa_max */
                sai->sai_hit++;
                sai->sai_consecutive_miss = 0;
                sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
        } else {
                /* no entry, or the async stat failed/was invalidated */
                sai->sai_miss++;
                sai->sai_consecutive_miss++;
        }

        if (entry)
                sa_kill(sai, entry);

        /*
         * kill old completed entries, only scanner process does this, no need
         * to lock
         */
        list_for_each_entry_safe(tmp, next, &sai->sai_entries, se_list) {
                if (!is_omitted_entry(sai, tmp->se_index))
                        break;
                sa_kill(sai, tmp);
        }
}
311
/*
 * Update state and insert the entry into sai_entries sorted by index; return
 * true if the scanner is waiting on this entry.
 *
 * Both callers hold lli_sa_lock around this function.
 */
static bool
__sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
{
        struct sa_entry *se;
        struct list_head *pos = &sai->sai_entries;
        __u64 index = entry->se_index;

        LASSERT(!sa_ready(entry));
        LASSERT(list_empty(&entry->se_list));

        /* walk backwards: the insertion point is typically near the tail
         * since entries mostly complete in increasing index order */
        list_for_each_entry_reverse(se, &sai->sai_entries, se_list) {
                if (se->se_index < entry->se_index) {
                        pos = &se->se_list;
                        break;
                }
        }
        list_add(&entry->se_list, pos);
        /*
         * LU-9210: release store pairs with the smp_rmb() in sa_ready(), so
         * ll_statahead_interpret must be able to see all entry updates
         * before we wake it up
         */
        smp_store_release(&entry->se_state,
                          ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);

        return (index == sai->sai_index_wait);
}
342
/* finish async stat RPC arguments: free everything sa_prep_data() set up */
static void sa_fini_data(struct md_enqueue_info *minfo)
{
        struct md_op_data *op_data = &minfo->mi_data;

        if (op_data->op_flags & MF_OPNAME_KMALLOCED)
                /* allocated via ll_setup_filename called from sa_prep_data */
                kfree(op_data->op_name);
        ll_unlock_md_op_lsm(&minfo->mi_data);
        /* drop the directory reference taken by igrab() in sa_prep_data() */
        iput(minfo->mi_dir);
        OBD_FREE_PTR(minfo);
}
355
356 static int ll_statahead_interpret(struct ptlrpc_request *req,
357                                   struct md_enqueue_info *minfo, int rc);
358
/*
 * Prepare arguments for an async stat RPC: allocate the md_enqueue_info,
 * fill op_data for @entry, and set up the getattr intent plus ldlm enqueue
 * callbacks.
 *
 * \param dir    parent directory inode
 * \param child  inode of the entry if cached (revalidate path), else NULL
 * \param entry  sa_entry the RPC is issued for
 *
 * \retval  new md_enqueue_info (free with sa_fini_data()), or ERR_PTR
 */
static struct md_enqueue_info *
sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
{
        struct md_enqueue_info   *minfo;
        struct ldlm_enqueue_info *einfo;
        struct md_op_data        *op_data;

        OBD_ALLOC_PTR(minfo);
        if (!minfo)
                return ERR_PTR(-ENOMEM);

        op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child,
                                     entry->se_qstr.name, entry->se_qstr.len, 0,
                                     LUSTRE_OPC_ANY, NULL);
        if (IS_ERR(op_data)) {
                OBD_FREE_PTR(minfo);
                return (struct md_enqueue_info *)op_data;
        }

        /* no cached inode: target the fid obtained from readdir */
        if (!child)
                op_data->op_fid2 = entry->se_fid;

        minfo->mi_it.it_op = IT_GETATTR;
        /* pin the directory while the RPC is in flight, see sa_fini_data() */
        minfo->mi_dir = igrab(dir);
        minfo->mi_cb = ll_statahead_interpret;
        minfo->mi_cbdata = entry;

        einfo = &minfo->mi_einfo;
        einfo->ei_type   = LDLM_IBITS;
        einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
        einfo->ei_cb_bl  = ll_md_blocking_ast;
        einfo->ei_cb_cp  = ldlm_completion_ast;
        einfo->ei_cb_gl  = NULL;
        einfo->ei_cbdata = NULL;

        return minfo;
}
399
/*
 * release resources used in async stat RPC, update entry state and wakeup if
 * scanner process is waiting on this entry.
 */
static void
sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
{
        struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
        struct md_enqueue_info *minfo = entry->se_minfo;
        struct ptlrpc_request *req = entry->se_req;
        bool wakeup;

        /* release resources used in RPC */
        if (minfo) {
                entry->se_minfo = NULL;
                ll_intent_release(&minfo->mi_it);
                sa_fini_data(minfo);
        }

        if (req) {
                entry->se_req = NULL;
                ptlrpc_req_finished(req);
        }

        spin_lock(&lli->lli_sa_lock);
        wakeup = __sa_make_ready(sai, entry, ret);
        spin_unlock(&lli->lli_sa_lock);

        /* wake the scanner outside the lock */
        if (wakeup)
                wake_up(&sai->sai_waitq);
}
431
/* insert inode into the list of sai_agls */
static void ll_agl_add(struct ll_statahead_info *sai,
                       struct inode *inode, int index)
{
        struct ll_inode_info *child  = ll_i2info(inode);
        struct ll_inode_info *parent = ll_i2info(sai->sai_dentry->d_inode);

        /* lli_agl_index == 0 means the inode is not queued for AGL yet;
         * claiming the index under the child lock prevents double-queueing */
        spin_lock(&child->lli_agl_lock);
        if (child->lli_agl_index == 0) {
                child->lli_agl_index = index;
                spin_unlock(&child->lli_agl_lock);

                LASSERT(list_empty(&child->lli_agl_list));

                spin_lock(&parent->lli_agl_lock);
                /* Re-check under the lock */
                if (agl_should_run(sai, inode)) {
                        /* wake the AGL thread only on the empty->non-empty
                         * transition; it drains the whole list once awake */
                        if (agl_list_empty(sai))
                                wake_up_process(sai->sai_agl_task);
                        /* reference dropped by ll_agl_trigger() */
                        igrab(inode);
                        list_add_tail(&child->lli_agl_list, &sai->sai_agls);
                } else
                        child->lli_agl_index = 0;
                spin_unlock(&parent->lli_agl_lock);
        } else {
                spin_unlock(&child->lli_agl_lock);
        }
}
460
/*
 * Allocate and initialize a statahead info for directory @dentry; the caller
 * holds the single initial reference.
 */
static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
{
        struct ll_statahead_info *sai;
        struct ll_inode_info *lli = ll_i2info(dentry->d_inode);
        int i;

        ENTRY;

        OBD_ALLOC_PTR(sai);
        if (!sai)
                RETURN(NULL);

        sai->sai_dentry = dget(dentry);
        atomic_set(&sai->sai_refcount, 1);
        /* the window starts small and grows on hits, see sa_put() */
        sai->sai_max = LL_SA_RPC_MIN;
        sai->sai_index = 1;
        init_waitqueue_head(&sai->sai_waitq);

        INIT_LIST_HEAD(&sai->sai_interim_entries);
        INIT_LIST_HEAD(&sai->sai_entries);
        INIT_LIST_HEAD(&sai->sai_agls);

        for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
                INIT_LIST_HEAD(&sai->sai_cache[i]);
                spin_lock_init(&sai->sai_cache_lock[i]);
        }
        atomic_set(&sai->sai_cache_count, 0);

        /* tag this sai with a fresh generation; 0 is skipped on wraparound */
        spin_lock(&sai_generation_lock);
        lli->lli_sa_generation = ++sai_generation;
        if (unlikely(sai_generation == 0))
                lli->lli_sa_generation = ++sai_generation;
        spin_unlock(&sai_generation_lock);

        RETURN(sai);
}
498
499 /* free sai */
500 static inline void ll_sai_free(struct ll_statahead_info *sai)
501 {
502         LASSERT(sai->sai_dentry != NULL);
503         dput(sai->sai_dentry);
504         OBD_FREE_PTR(sai);
505 }
506
507 /*
508  * take refcount of sai if sai for @dir exists, which means statahead is on for
509  * this directory.
510  */
511 static inline struct ll_statahead_info *ll_sai_get(struct inode *dir)
512 {
513         struct ll_inode_info *lli = ll_i2info(dir);
514         struct ll_statahead_info *sai = NULL;
515
516         spin_lock(&lli->lli_sa_lock);
517         sai = lli->lli_sai;
518         if (sai)
519                 atomic_inc(&sai->sai_refcount);
520         spin_unlock(&lli->lli_sa_lock);
521
522         return sai;
523 }
524
/*
 * put sai refcount after use, if refcount reaches zero, free sai and sa_entries
 * attached to it.
 */
static void ll_sai_put(struct ll_statahead_info *sai)
{
        struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);

        /* take lli_sa_lock only on the final put, so lli->lli_sai is
         * cleared atomically with dropping the last reference */
        if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
                struct sa_entry *entry, *next;
                struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);

                lli->lli_sai = NULL;
                spin_unlock(&lli->lli_sa_lock);

                /* by now both threads have stopped, every sent RPC has been
                 * replied, and no interim entries are pending */
                LASSERT(!sai->sai_task);
                LASSERT(!sai->sai_agl_task);
                LASSERT(sai->sai_sent == sai->sai_replied);
                LASSERT(!sa_has_callback(sai));

                list_for_each_entry_safe(entry, next, &sai->sai_entries,
                                         se_list)
                        sa_kill(sai, entry);

                LASSERT(atomic_read(&sai->sai_cache_count) == 0);
                LASSERT(agl_list_empty(sai));

                ll_sai_free(sai);
                atomic_dec(&sbi->ll_sa_running);
        }
}
556
/*
 * Issue an async glimpse for @inode, queued on sai_agls by ll_agl_add().
 * Do NOT forget to drop inode refcount when into sai_agls: every exit path
 * below calls iput() on the reference taken by ll_agl_add().
 */
static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
{
        struct ll_inode_info *lli = ll_i2info(inode);
        u64 index = lli->lli_agl_index;
        ktime_t expire;
        int rc;

        ENTRY;

        LASSERT(list_empty(&lli->lli_agl_list));

        /* AGL maybe fall behind statahead with one entry */
        if (is_omitted_entry(sai, index + 1)) {
                lli->lli_agl_index = 0;
                iput(inode);
                RETURN_EXIT;
        }

        /*
         * In case of restore, the MDT has the right size and has already
         * sent it back without granting the layout lock, inode is up-to-date.
         * Then AGL (async glimpse lock) is useless.
         * Also to glimpse we need the layout, in case of a running restore
         * the MDT holds the layout lock so the glimpse will block up to the
         * end of restore (statahead/agl will block)
         */
        if (test_bit(LLIF_FILE_RESTORING, &lli->lli_flags)) {
                lli->lli_agl_index = 0;
                iput(inode);
                RETURN_EXIT;
        }

        /* Someone is in glimpse (sync or async), do nothing. */
        rc = down_write_trylock(&lli->lli_glimpse_sem);
        if (rc == 0) {
                lli->lli_agl_index = 0;
                iput(inode);
                RETURN_EXIT;
        }

        /*
         * Someone triggered glimpse within 1 sec before.
         * 1) The former glimpse succeeded with glimpse lock granted by OST, and
         *    if the lock is still cached on client, AGL needs to do nothing. If
         *    it is cancelled by other client, AGL maybe cannot obtain new lock
         *    for no glimpse callback triggered by AGL.
         * 2) The former glimpse succeeded, but OST did not grant glimpse lock.
         *    Under such case, it is quite possible that the OST will not grant
         *    glimpse lock for AGL also.
         * 3) The former glimpse failed, compared with other two cases, it is
         *    relatively rare. AGL can ignore such case, and it will not much
         *    affect the performance.
         */
        expire = ktime_sub_ns(ktime_get(), NSEC_PER_SEC);
        if (ktime_to_ns(lli->lli_glimpse_time) &&
            ktime_before(expire, lli->lli_glimpse_time)) {
                up_write(&lli->lli_glimpse_sem);
                lli->lli_agl_index = 0;
                iput(inode);
                RETURN_EXIT;
        }

        CDEBUG(D_READA,
               "Handling (init) async glimpse: inode = " DFID", idx = %llu\n",
               PFID(&lli->lli_fid), index);

        cl_agl(inode);
        lli->lli_agl_index = 0;
        lli->lli_glimpse_time = ktime_get();
        up_write(&lli->lli_glimpse_sem);

        CDEBUG(D_READA,
               "Handled (init) async glimpse: inode= " DFID", idx = %llu, rc = %d\n",
               PFID(&lli->lli_fid), index, rc);

        iput(inode);

        EXIT;
}
637
/*
 * prepare inode for sa entry from the getattr reply, add it into the agl
 * list if async glimpse should run; after sa_make_ready() the sa_entry is
 * ready to be used by the scanner process.
 */
static void sa_instantiate(struct ll_statahead_info *sai,
                           struct sa_entry *entry)
{
        struct inode *dir = sai->sai_dentry->d_inode;
        struct inode *child;
        struct md_enqueue_info *minfo;
        struct lookup_intent *it;
        struct ptlrpc_request *req;
        struct mdt_body *body;
        int rc = 0;

        ENTRY;

        /* se_handle was saved from the reply in ll_statahead_interpret() */
        LASSERT(entry->se_handle != 0);

        minfo = entry->se_minfo;
        it = &minfo->mi_it;
        req = entry->se_req;
        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
        if (!body)
                GOTO(out, rc = -EFAULT);

        child = entry->se_inode;
        /* revalidate; unlinked and re-created with the same name */
        if (unlikely(!lu_fid_eq(&minfo->mi_data.op_fid2, &body->mbo_fid1))) {
                if (child) {
                        entry->se_inode = NULL;
                        iput(child);
                }
                /* The mdt_body is invalid. Skip this entry */
                GOTO(out, rc = -EAGAIN);
        }

        /* revalidate the ibits lock whose handle was saved from the reply */
        it->it_lock_handle = entry->se_handle;
        rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
        if (rc != 1)
                GOTO(out, rc = -EAGAIN);

        rc = ll_prep_inode(&child, &req->rq_pill, dir->i_sb, it);
        if (rc)
                GOTO(out, rc);

        CDEBUG(D_READA, "%s: setting %.*s"DFID" l_data to inode %p\n",
               ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
               entry->se_qstr.name, PFID(ll_inode2fid(child)), child);
        ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);

        /* inode reference is dropped in sa_kill() */
        entry->se_inode = child;

        if (agl_should_run(sai, child))
                ll_agl_add(sai, child, entry->se_index);

        EXIT;

out:
        /*
         * sa_make_ready() will drop ldlm ibits lock refcount by calling
         * ll_intent_drop_lock() in spite of failures. Do not worry about
         * calling ll_intent_drop_lock() more than once.
         */
        sa_make_ready(sai, entry, rc);
}
704
/* once there are async stat replies, instantiate sa_entry from replies */
static void sa_handle_callback(struct ll_statahead_info *sai)
{
        struct ll_inode_info *lli;

        lli = ll_i2info(sai->sai_dentry->d_inode);

        /* lli_sa_lock protects sai_interim_entries; it is dropped around
         * sa_instantiate(), which may block */
        spin_lock(&lli->lli_sa_lock);
        while (sa_has_callback(sai)) {
                struct sa_entry *entry;

                entry = list_entry(sai->sai_interim_entries.next,
                                   struct sa_entry, se_list);
                list_del_init(&entry->se_list);
                spin_unlock(&lli->lli_sa_lock);

                sa_instantiate(sai, entry);
                spin_lock(&lli->lli_sa_lock);
        }
        spin_unlock(&lli->lli_sa_lock);
}
726
/*
 * callback for async stat RPC, because this is called in ptlrpcd context, we
 * only put sa_entry in sai_interim_entries, and wake up statahead thread to
 * really prepare inode and instantiate sa_entry later.
 */
static int ll_statahead_interpret(struct ptlrpc_request *req,
                                  struct md_enqueue_info *minfo, int rc)
{
        struct lookup_intent *it = &minfo->mi_it;
        struct inode *dir = minfo->mi_dir;
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai = lli->lli_sai;
        struct sa_entry *entry = (struct sa_entry *)minfo->mi_cbdata;
        __u64 handle = 0;

        ENTRY;

        if (it_disposition(it, DISP_LOOKUP_NEG))
                rc = -ENOENT;

        /*
         * because statahead thread will wait for all inflight RPC to finish,
         * sai should be always valid, no need to refcount
         */
        LASSERT(sai != NULL);
        LASSERT(entry != NULL);

        CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
               entry->se_qstr.len, entry->se_qstr.name, rc);

        if (rc != 0) {
                ll_intent_release(it);
                sa_fini_data(minfo);
        } else {
                /*
                 * release ibits lock ASAP to avoid deadlock when statahead
                 * thread enqueues lock on parent in readdir and another
                 * process enqueues lock on child with parent lock held, eg.
                 * unlink.
                 */
                handle = it->it_lock_handle;
                ll_intent_drop_lock(it);
                ll_unlock_md_op_lsm(&minfo->mi_data);
        }

        spin_lock(&lli->lli_sa_lock);
        if (rc != 0) {
                /* stat failed: make the entry ready with the error so a
                 * waiting scanner is unblocked */
                if (__sa_make_ready(sai, entry, rc))
                        wake_up(&sai->sai_waitq);
        } else {
                int first = 0;

                entry->se_minfo = minfo;
                entry->se_req = ptlrpc_request_addref(req);
                /*
                 * save the lock handle dropped above; sa_instantiate() uses
                 * it to revalidate the ibits lock for this entry.
                 */
                entry->se_handle = handle;
                if (!sa_has_callback(sai))
                        first = 1;

                list_add_tail(&entry->se_list, &sai->sai_interim_entries);
                /* wake the statahead thread only on the empty->non-empty
                 * transition; it drains all interim entries once awake */
                if (first && sai->sai_task)
                        wake_up_process(sai->sai_task);
        }
        sai->sai_replied++;

        spin_unlock(&lli->lli_sa_lock);

        RETURN(rc);
}
801
/*
 * async stat for file not found in dcache
 *
 * \retval 0 async getattr RPC sent; the reply is handled by
 *           ll_statahead_interpret()
 * \retval negative errno if preparing or sending the RPC failed
 */
static int sa_lookup(struct inode *dir, struct sa_entry *entry)
{
        struct md_enqueue_info   *minfo;
        int                       rc;

        ENTRY;

        minfo = sa_prep_data(dir, NULL, entry);
        if (IS_ERR(minfo))
                RETURN(PTR_ERR(minfo));

        rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
        if (rc < 0)
                sa_fini_data(minfo);

        RETURN(rc);
}
820
/**
 * async stat for file found in dcache, similar to .revalidate
 *
 * \retval      1 dentry valid, no RPC sent
 * \retval      0 dentry invalid, will send async stat RPC
 * \retval      negative number upon error
 */
static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
                         struct dentry *dentry)
{
        struct inode *inode = dentry->d_inode;
        struct lookup_intent it = { .it_op = IT_GETATTR,
                                    .it_lock_handle = 0 };
        struct md_enqueue_info *minfo;
        int rc;

        ENTRY;

        /* negative dentry: nothing to stat */
        if (unlikely(!inode))
                RETURN(1);

        if (d_mountpoint(dentry))
                RETURN(1);

        minfo = sa_prep_data(dir, inode, entry);
        if (IS_ERR(minfo))
                RETURN(PTR_ERR(minfo));

        /* pin the inode for the entry; dropped in sa_kill(), or below on
         * RPC send failure */
        entry->se_inode = igrab(inode);
        rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
                                NULL);
        if (rc == 1) {
                /* getattr ibits lock still cached: no RPC needed, keep the
                 * lock handle for later use */
                entry->se_handle = it.it_lock_handle;
                ll_intent_release(&it);
                sa_fini_data(minfo);
                RETURN(1);
        }

        rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
        if (rc < 0) {
                entry->se_inode = NULL;
                iput(inode);
                sa_fini_data(minfo);
        }

        RETURN(rc);
}
868
/* async stat for file with @name */
static void sa_statahead(struct dentry *parent, const char *name, int len,
                         const struct lu_fid *fid)
{
        struct inode *dir = parent->d_inode;
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai = lli->lli_sai;
        struct dentry *dentry = NULL;
        struct sa_entry *entry;
        int rc;

        ENTRY;

        entry = sa_alloc(parent, sai, sai->sai_index, name, len, fid);
        if (IS_ERR(entry))
                RETURN_EXIT;

        /* cached dentry gets a revalidate, otherwise a full async lookup */
        dentry = d_lookup(parent, &entry->se_qstr);
        if (!dentry) {
                rc = sa_lookup(dir, entry);
        } else {
                rc = sa_revalidate(dir, entry, dentry);
                /* rc == 1 means no RPC was sent, so no callback will queue
                 * the async glimpse; trigger it here instead */
                if (rc == 1 && agl_should_run(sai, dentry->d_inode))
                        ll_agl_add(sai, dentry->d_inode, entry->se_index);
        }

        if (dentry)
                dput(dentry);

        /* rc != 0 means no RPC is in flight (dentry valid, or error): make
         * the entry ready now; otherwise the RPC callback will do it */
        if (rc != 0)
                sa_make_ready(sai, entry, rc);
        else
                sai->sai_sent++;

        sai->sai_index++;

        EXIT;
}
907
/* async glimpse (agl) thread main function */
static int ll_agl_thread(void *arg)
{
        struct dentry *parent = (struct dentry *)arg;
        struct inode *dir = parent->d_inode;
        struct ll_inode_info *plli = ll_i2info(dir);
        struct ll_inode_info *clli;
        /*
         * We already own this reference, so it is safe to take it
         * without a lock.
         */
        struct ll_statahead_info *sai = plli->lli_sai;

        ENTRY;

        CDEBUG(D_READA, "agl thread started: sai %p, parent %pd\n",
               sai, parent);

        /*
         * Set TASK_IDLE before testing kthread_should_stop() so that a
         * concurrent kthread_stop()/wake_up cannot slip in between the
         * test and the schedule() below and be missed.
         */
        while (({set_current_state(TASK_IDLE);
                 !kthread_should_stop(); })) {
                spin_lock(&plli->lli_agl_lock);
                clli = list_first_entry_or_null(&sai->sai_agls,
                                                struct ll_inode_info,
                                                lli_agl_list);
                if (clli) {
                        __set_current_state(TASK_RUNNING);
                        list_del_init(&clli->lli_agl_list);
                        /* drop the lock while issuing the glimpse */
                        spin_unlock(&plli->lli_agl_lock);
                        ll_agl_trigger(&clli->lli_vfs_inode, sai);
                        cond_resched();
                } else {
                        /* queue empty: sleep until new work or stop request */
                        spin_unlock(&plli->lli_agl_lock);
                        schedule();
                }
        }
        __set_current_state(TASK_RUNNING);
        RETURN(0);
}
946
/*
 * Stop the AGL thread attached to @sai (if any) and drain inodes still
 * queued on sai_agls, dropping the inode references they hold.
 */
static void ll_stop_agl(struct ll_statahead_info *sai)
{
        struct dentry *parent = sai->sai_dentry;
        struct ll_inode_info *plli = ll_i2info(parent->d_inode);
        struct ll_inode_info *clli;
        struct task_struct *agl_task;

        /* claim the task pointer under the lock so only one caller stops it */
        spin_lock(&plli->lli_agl_lock);
        agl_task = sai->sai_agl_task;
        sai->sai_agl_task = NULL;
        spin_unlock(&plli->lli_agl_lock);
        if (!agl_task)
                return;

        CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n",
               sai, (unsigned int)agl_task->pid);
        kthread_stop(agl_task);

        /* drain leftover entries; drop the lock around iput() since it
         * may block */
        spin_lock(&plli->lli_agl_lock);
        while ((clli = list_first_entry_or_null(&sai->sai_agls,
                                                struct ll_inode_info,
                                                lli_agl_list)) != NULL) {
                list_del_init(&clli->lli_agl_list);
                spin_unlock(&plli->lli_agl_lock);
                clli->lli_agl_index = 0;
                iput(&clli->lli_vfs_inode);
                spin_lock(&plli->lli_agl_lock);
        }
        spin_unlock(&plli->lli_agl_lock);
        CDEBUG(D_READA, "agl thread stopped: sai %p, parent %pd\n",
               sai, parent);
        /* drop the sai reference taken for the agl thread in ll_start_agl() */
        ll_sai_put(sai);
}
980
981 /* start agl thread */
982 static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
983 {
984         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
985         struct ll_inode_info *plli;
986         struct task_struct *task;
987
988         ENTRY;
989
990         CDEBUG(D_READA, "start agl thread: sai %p, parent %pd\n",
991                sai, parent);
992
993         plli = ll_i2info(parent->d_inode);
994         task = kthread_create_on_node(ll_agl_thread, parent, node, "ll_agl_%d",
995                                       plli->lli_opendir_pid);
996         if (IS_ERR(task)) {
997                 CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));
998                 RETURN_EXIT;
999         }
1000         sai->sai_agl_task = task;
1001         atomic_inc(&ll_i2sbi(d_inode(parent))->ll_agl_total);
1002         /* Get an extra reference that the thread holds */
1003         ll_sai_get(d_inode(parent));
1004
1005         wake_up_process(task);
1006
1007         EXIT;
1008 }
1009
/* statahead thread main function */
static int ll_statahead_thread(void *arg)
{
        struct dentry *parent = (struct dentry *)arg;
        struct inode *dir = parent->d_inode;
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_sb_info *sbi = ll_i2sbi(dir);
        struct ll_statahead_info *sai = lli->lli_sai;
        int first = 0;
        struct md_op_data *op_data;
        struct page *page = NULL;
        __u64 pos = 0;
        int rc = 0;

        ENTRY;

        CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
               sai, parent);

        OBD_ALLOC_PTR(op_data);
        if (!op_data)
                GOTO(out, rc = -ENOMEM);

        /*
         * Walk directory pages until the end of the directory, or until
         * the user releases the dir handle (sai_task cleared).
         */
        while (pos != MDS_DIR_END_OFF && sai->sai_task) {
                struct lu_dirpage *dp;
                struct lu_dirent  *ent;

                op_data = ll_prep_md_op_data(op_data, dir, dir, NULL, 0, 0,
                                             LUSTRE_OPC_ANY, dir);
                if (IS_ERR(op_data)) {
                        rc = PTR_ERR(op_data);
                        break;
                }

                /* sai_in_readpage lets stat waiters help with callbacks */
                sai->sai_in_readpage = 1;
                page = ll_get_dir_page(dir, op_data, pos);
                ll_unlock_md_op_lsm(op_data);
                sai->sai_in_readpage = 0;
                if (IS_ERR(page)) {
                        rc = PTR_ERR(page);
                        CDEBUG(D_READA,
                               "error reading dir "DFID" at %llu /%llu opendir_pid = %u: rc = %d\n",
                               PFID(ll_inode2fid(dir)), pos, sai->sai_index,
                               lli->lli_opendir_pid, rc);
                        break;
                }

                dp = page_address(page);
                for (ent = lu_dirent_start(dp);
                     ent != NULL && sai->sai_task &&
                     !sa_low_hit(sai);
                     ent = lu_dirent_next(ent)) {
                        __u64 hash;
                        int namelen;
                        char *name;
                        struct lu_fid fid;
                        struct llcrypt_str lltr = LLTR_INIT(NULL, 0);

                        hash = le64_to_cpu(ent->lde_hash);
                        if (unlikely(hash < pos))
                                /*
                                 * Skip until we find target hash value.
                                 */
                                continue;

                        namelen = le16_to_cpu(ent->lde_namelen);
                        if (unlikely(namelen == 0))
                                /*
                                 * Skip dummy record.
                                 */
                                continue;

                        name = ent->lde_name;
                        if (name[0] == '.') {
                                if (namelen == 1) {
                                        /*
                                         * skip "."
                                         */
                                        continue;
                                } else if (name[1] == '.' && namelen == 2) {
                                        /*
                                         * skip ".."
                                         */
                                        continue;
                                } else if (!sai->sai_ls_all) {
                                        /*
                                         * skip hidden files.
                                         */
                                        sai->sai_skip_hidden++;
                                        continue;
                                }
                        }

                        /*
                         * don't stat-ahead first entry.
                         */
                        if (unlikely(++first == 1))
                                continue;

                        fid_le_to_cpu(&fid, &ent->lde_fid);

                        /*
                         * Throttle: while the stat pipeline is full, process
                         * completed callbacks and pending AGL work, otherwise
                         * sleep until woken.  TASK_IDLE is set before the
                         * sai_task test so a concurrent wake is not lost.
                         */
                        while (({set_current_state(TASK_IDLE);
                                 sai->sai_task; })) {
                                if (sa_has_callback(sai)) {
                                        __set_current_state(TASK_RUNNING);
                                        sa_handle_callback(sai);
                                }

                                spin_lock(&lli->lli_agl_lock);
                                while (sa_sent_full(sai) &&
                                       !agl_list_empty(sai)) {
                                        struct ll_inode_info *clli;

                                        __set_current_state(TASK_RUNNING);
                                        clli = agl_first_entry(sai);
                                        list_del_init(&clli->lli_agl_list);
                                        spin_unlock(&lli->lli_agl_lock);

                                        ll_agl_trigger(&clli->lli_vfs_inode,
                                                       sai);
                                        cond_resched();
                                        spin_lock(&lli->lli_agl_lock);
                                }
                                spin_unlock(&lli->lli_agl_lock);

                                if (!sa_sent_full(sai))
                                        break;
                                schedule();
                        }
                        __set_current_state(TASK_RUNNING);

                        if (IS_ENCRYPTED(dir)) {
                                struct llcrypt_str de_name =
                                        LLTR_INIT(ent->lde_name, namelen);

                                rc = llcrypt_fname_alloc_buffer(dir, NAME_MAX,
                                                                &lltr);
                                if (rc < 0)
                                        continue;

                                /* convert the on-disk name for lookup; skip
                                 * the entry if conversion fails */
                                if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
                                                         &lltr)) {
                                        llcrypt_fname_free_buffer(&lltr);
                                        continue;
                                }

                                name = lltr.name;
                                namelen = lltr.len;
                        }

                        sa_statahead(parent, name, namelen, &fid);
                        /* no-op when lltr was never allocated (not encrypted) */
                        llcrypt_fname_free_buffer(&lltr);
                }

                pos = le64_to_cpu(dp->ldp_hash_end);
                ll_release_page(dir, page,
                                le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);

                /* give up if the user's stats aren't hitting our cache */
                if (sa_low_hit(sai)) {
                        rc = -EFAULT;
                        atomic_inc(&sbi->ll_sa_wrong);
                        CDEBUG(D_READA,
                               "Statahead for dir "DFID" hit ratio too low: hit/miss %llu/%llu, sent/replied %llu/%llu, stoppingstatahead thread: pid %d\n",
                               PFID(&lli->lli_fid), sai->sai_hit,
                               sai->sai_miss, sai->sai_sent,
                               sai->sai_replied, current->pid);
                        break;
                }
        }
        ll_finish_md_op_data(op_data);

        if (rc < 0) {
                /* on error, also disable statahead for this dir */
                spin_lock(&lli->lli_sa_lock);
                sai->sai_task = NULL;
                lli->lli_sa_enabled = 0;
                spin_unlock(&lli->lli_sa_lock);
        }

        /*
         * statahead is finished, but statahead entries need to be cached, wait
         * for file release to stop me.
         */
        while (({set_current_state(TASK_IDLE);
                 sai->sai_task; })) {
                if (sa_has_callback(sai)) {
                        __set_current_state(TASK_RUNNING);
                        sa_handle_callback(sai);
                } else {
                        schedule();
                }
        }
        __set_current_state(TASK_RUNNING);

        EXIT;
out:
        ll_stop_agl(sai);

        /*
         * wait for inflight statahead RPCs to finish, and then we can free sai
         * safely because statahead RPC will access sai data
         */
        while (sai->sai_sent != sai->sai_replied)
                /* in case we're not woken up, timeout wait */
                msleep(125);

        /* release resources held by statahead RPCs */
        sa_handle_callback(sai);

        CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd\n",
               sbi->ll_fsname, sai, parent);

        spin_lock(&lli->lli_sa_lock);
        sai->sai_task = NULL;
        spin_unlock(&lli->lli_sa_lock);
        /* wake anyone blocked in revalidate_statahead_dentry() */
        wake_up(&sai->sai_waitq);

        ll_sai_put(sai);

        return rc;
}
1230
1231 /* authorize opened dir handle @key to statahead */
1232 void ll_authorize_statahead(struct inode *dir, void *key)
1233 {
1234         struct ll_inode_info *lli = ll_i2info(dir);
1235
1236         spin_lock(&lli->lli_sa_lock);
1237         if (!lli->lli_opendir_key && !lli->lli_sai) {
1238                 /*
1239                  * if lli_sai is not NULL, it means previous statahead is not
1240                  * finished yet, we'd better not start a new statahead for now.
1241                  */
1242                 LASSERT(lli->lli_opendir_pid == 0);
1243                 lli->lli_opendir_key = key;
1244                 lli->lli_opendir_pid = current->pid;
1245                 lli->lli_sa_enabled = 1;
1246         }
1247         spin_unlock(&lli->lli_sa_lock);
1248 }
1249
1250 /*
1251  * deauthorize opened dir handle @key to statahead, and notify statahead thread
1252  * to quit if it's running.
1253  */
1254 void ll_deauthorize_statahead(struct inode *dir, void *key)
1255 {
1256         struct ll_inode_info *lli = ll_i2info(dir);
1257         struct ll_statahead_info *sai;
1258
1259         LASSERT(lli->lli_opendir_key == key);
1260         LASSERT(lli->lli_opendir_pid != 0);
1261
1262         CDEBUG(D_READA, "deauthorize statahead for "DFID"\n",
1263                PFID(&lli->lli_fid));
1264
1265         spin_lock(&lli->lli_sa_lock);
1266         lli->lli_opendir_key = NULL;
1267         lli->lli_opendir_pid = 0;
1268         lli->lli_sa_enabled = 0;
1269         sai = lli->lli_sai;
1270         if (sai && sai->sai_task) {
1271                 /*
1272                  * statahead thread may not have quit yet because it needs to
1273                  * cache entries, now it's time to tell it to quit.
1274                  *
1275                  * wake_up_process() provides the necessary barriers
1276                  * to pair with set_current_state().
1277                  */
1278                 struct task_struct *task = sai->sai_task;
1279
1280                 sai->sai_task = NULL;
1281                 wake_up_process(task);
1282         }
1283         spin_unlock(&lli->lli_sa_lock);
1284 }
1285
/*
 * Classification of the trigger dentry's position in its parent directory,
 * returned by is_first_dirent() below.
 */
enum {
        /** not first dirent, or is "." */
        LS_NOT_FIRST_DE = 0,
        /** the first non-hidden dirent */
        LS_FIRST_DE,
        /** the first hidden dirent, that is "." */
        LS_FIRST_DOT_DE
};
1300
1301 /* file is first dirent under @dir */
1302 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
1303 {
1304         struct qstr *target = &dentry->d_name;
1305         struct md_op_data *op_data;
1306         int dot_de;
1307         struct page *page = NULL;
1308         int rc = LS_NOT_FIRST_DE;
1309         __u64 pos = 0;
1310         struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
1311
1312         ENTRY;
1313
1314         op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
1315                                      LUSTRE_OPC_ANY, dir);
1316         if (IS_ERR(op_data))
1317                 RETURN(PTR_ERR(op_data));
1318
1319         if (IS_ENCRYPTED(dir)) {
1320                 int rc2 = llcrypt_fname_alloc_buffer(dir, NAME_MAX, &lltr);
1321
1322                 if (rc2 < 0)
1323                         RETURN(rc2);
1324         }
1325
1326         /**
1327          *FIXME choose the start offset of the readdir
1328          */
1329
1330         page = ll_get_dir_page(dir, op_data, 0);
1331
1332         while (1) {
1333                 struct lu_dirpage *dp;
1334                 struct lu_dirent  *ent;
1335
1336                 if (IS_ERR(page)) {
1337                         struct ll_inode_info *lli = ll_i2info(dir);
1338
1339                         rc = PTR_ERR(page);
1340                         CERROR("%s: reading dir "DFID" at %llu opendir_pid = %u : rc = %d\n",
1341                                ll_i2sbi(dir)->ll_fsname,
1342                                PFID(ll_inode2fid(dir)), pos,
1343                                lli->lli_opendir_pid, rc);
1344                         break;
1345                 }
1346
1347                 dp = page_address(page);
1348                 for (ent = lu_dirent_start(dp); ent != NULL;
1349                      ent = lu_dirent_next(ent)) {
1350                         __u64 hash;
1351                         int namelen;
1352                         char *name;
1353
1354                         hash = le64_to_cpu(ent->lde_hash);
1355                         /*
1356                          * The ll_get_dir_page() can return any page containing
1357                          * the given hash which may be not the start hash.
1358                          */
1359                         if (unlikely(hash < pos))
1360                                 continue;
1361
1362                         namelen = le16_to_cpu(ent->lde_namelen);
1363                         if (unlikely(namelen == 0))
1364                                 /*
1365                                  * skip dummy record.
1366                                  */
1367                                 continue;
1368
1369                         name = ent->lde_name;
1370                         if (name[0] == '.') {
1371                                 if (namelen == 1)
1372                                         /*
1373                                          * skip "."
1374                                          */
1375                                         continue;
1376                                 else if (name[1] == '.' && namelen == 2)
1377                                         /*
1378                                          * skip ".."
1379                                          */
1380                                         continue;
1381                                 else
1382                                         dot_de = 1;
1383                         } else {
1384                                 dot_de = 0;
1385                         }
1386
1387                         if (dot_de && target->name[0] != '.') {
1388                                 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
1389                                        target->len, target->name,
1390                                        namelen, name);
1391                                 continue;
1392                         }
1393
1394                         if (IS_ENCRYPTED(dir)) {
1395                                 struct llcrypt_str de_name =
1396                                         LLTR_INIT(ent->lde_name, namelen);
1397
1398                                 if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
1399                                                           &lltr))
1400                                         continue;
1401                                 name = lltr.name;
1402                                 namelen = lltr.len;
1403                         }
1404
1405                         if (target->len != namelen ||
1406                             memcmp(target->name, name, namelen) != 0)
1407                                 rc = LS_NOT_FIRST_DE;
1408                         else if (!dot_de)
1409                                 rc = LS_FIRST_DE;
1410                         else
1411                                 rc = LS_FIRST_DOT_DE;
1412
1413                         ll_release_page(dir, page, false);
1414                         GOTO(out, rc);
1415                 }
1416                 pos = le64_to_cpu(dp->ldp_hash_end);
1417                 if (pos == MDS_DIR_END_OFF) {
1418                         /*
1419                          * End of directory reached.
1420                          */
1421                         ll_release_page(dir, page, false);
1422                         GOTO(out, rc);
1423                 } else {
1424                         /*
1425                          * chain is exhausted
1426                          * Normal case: continue to the next page.
1427                          */
1428                         ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) &
1429                                               LDF_COLLIDE);
1430                         page = ll_get_dir_page(dir, op_data, pos);
1431                 }
1432         }
1433         EXIT;
1434 out:
1435         llcrypt_fname_free_buffer(&lltr);
1436         ll_finish_md_op_data(op_data);
1437
1438         return rc;
1439 }
1440
/**
 * revalidate @dentryp from statahead cache
 *
 * \param[in] dir       parent directory
 * \param[in] sai       sai structure
 * \param[out] dentryp  pointer to dentry which will be revalidated
 * \param[in] unplug    unplug statahead window only (normally for negative
 *                      dentry)
 * \retval              1 on success, dentry is saved in @dentryp
 * \retval              0 if revalidation failed (no proper lock on client)
 * \retval              negative number upon error
 */
static int revalidate_statahead_dentry(struct inode *dir,
                                       struct ll_statahead_info *sai,
                                       struct dentry **dentryp,
                                       bool unplug)
{
        struct sa_entry *entry = NULL;
        struct ll_inode_info *lli = ll_i2info(dir);
        int rc = 0;

        ENTRY;

        if ((*dentryp)->d_name.name[0] == '.') {
                if (sai->sai_ls_all ||
                    sai->sai_miss_hidden >= sai->sai_skip_hidden) {
                        /*
                         * Hidden dentry is the first one, or statahead
                         * thread does not skip so many hidden dentries
                         * before "sai_ls_all" enabled as below.
                         */
                } else {
                        if (!sai->sai_ls_all)
                                /*
                                 * It maybe because hidden dentry is not
                                 * the first one, "sai_ls_all" was not
                                 * set, then "ls -al" missed. Enable
                                 * "sai_ls_all" for such case.
                                 */
                                sai->sai_ls_all = 1;

                        /*
                         * Such "getattr" has been skipped before
                         * "sai_ls_all" enabled as above.
                         */
                        sai->sai_miss_hidden++;
                        RETURN(-EAGAIN);
                }
        }

        if (unplug)
                GOTO(out, rc = 1);

        entry = sa_get(sai, &(*dentryp)->d_name);
        if (!entry)
                GOTO(out, rc = -EAGAIN);

        /* if statahead is busy in readdir, help it do post-work */
        if (!sa_ready(entry) && sai->sai_in_readpage)
                sa_handle_callback(sai);

        if (!sa_ready(entry)) {
                /* publish the index we're waiting on, then wait (with
                 * timeout) for the statahead thread to complete it */
                spin_lock(&lli->lli_sa_lock);
                sai->sai_index_wait = entry->se_index;
                spin_unlock(&lli->lli_sa_lock);
                rc = wait_event_idle_timeout(sai->sai_waitq, sa_ready(entry),
                                             cfs_time_seconds(30));
                if (rc == 0) {
                        /*
                         * entry may not be ready, so it may be used by inflight
                         * statahead RPC, don't free it.
                         */
                        entry = NULL;
                        GOTO(out, rc = -EAGAIN);
                }
        }

        /*
         * We need to see the value that was set immediately before we
         * were woken up.
         */
        if (smp_load_acquire(&entry->se_state) == SA_ENTRY_SUCC &&
            entry->se_inode) {
                struct inode *inode = entry->se_inode;
                struct lookup_intent it = { .it_op = IT_GETATTR,
                                            .it_lock_handle =
                                                entry->se_handle };
                __u64 bits;

                /* verify the DLM lock cached by statahead is still valid */
                rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
                                        ll_inode2fid(inode), &bits);
                if (rc == 1) {
                        if (!(*dentryp)->d_inode) {
                                struct dentry *alias;

                                alias = ll_splice_alias(inode, *dentryp);
                                if (IS_ERR(alias)) {
                                        ll_intent_release(&it);
                                        GOTO(out, rc = PTR_ERR(alias));
                                }
                                *dentryp = alias;
                                /*
                                 * statahead prepared this inode, transfer inode
                                 * refcount from sa_entry to dentry
                                 */
                                entry->se_inode = NULL;
                        } else if ((*dentryp)->d_inode != inode) {
                                /* revalidate, but inode is recreated */
                                CDEBUG(D_READA,
                                       "%s: stale dentry %pd inode " DFID", statahead inode "DFID "\n",
                                       ll_i2sbi(inode)->ll_fsname, *dentryp,
                                       PFID(ll_inode2fid((*dentryp)->d_inode)),
                                       PFID(ll_inode2fid(inode)));
                                ll_intent_release(&it);
                                GOTO(out, rc = -ESTALE);
                        }

                        if ((bits & MDS_INODELOCK_LOOKUP) &&
                            d_lustre_invalid(*dentryp)) {
                                d_lustre_revalidate(*dentryp);
                                ll_update_dir_depth(dir, (*dentryp)->d_inode);
                        }

                        ll_intent_release(&it);
                }
        }
out:
        /*
         * statahead cached sa_entry can be used only once, and will be killed
         * right after use, so if lookup/revalidate accessed statahead cache,
         * set dentry ldd_sa_generation to parent lli_sa_generation, later if we
         * stat this file again, we know we've done statahead before, see
         * dentry_may_statahead().
         */
        if (lld_is_init(*dentryp))
                ll_d2d(*dentryp)->lld_sa_generation = lli->lli_sa_generation;
        /* drop the entry (sa_put tolerates entry == NULL on the paths above) */
        sa_put(sai, entry);
        /* a slot freed up: kick the statahead thread to send more RPCs */
        spin_lock(&lli->lli_sa_lock);
        if (sai->sai_task)
                wake_up_process(sai->sai_task);
        spin_unlock(&lli->lli_sa_lock);

        RETURN(rc);
}
1585
/**
 * start statahead thread
 *
 * \param[in] dir       parent directory
 * \param[in] dentry    dentry that triggers statahead, normally the first
 *                      dirent under @dir
 * \param[in] agl       indicate whether AGL is needed
 * \retval              -EAGAIN on success, because when this function is
 *                      called, it's already in lookup call, so client should
 *                      do it itself instead of waiting for statahead thread
 *                      to do it asynchronously.
 * \retval              negative number upon error
 */
static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
                                  bool agl)
{
        int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai = NULL;
        struct dentry *parent = dentry->d_parent;
        struct task_struct *task;
        struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode);
        int first = LS_FIRST_DE;
        int rc = 0;

        ENTRY;

        /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */
        first = is_first_dirent(dir, dentry);
        /*
         * NOTE(review): a negative (error) return from is_first_dirent()
         * does not equal LS_NOT_FIRST_DE and therefore falls through and
         * still tries to start statahead — confirm this is intentional.
         */
        if (first == LS_NOT_FIRST_DE)
                /* It is not "ls -{a}l" operation, no need statahead for it. */
                GOTO(out, rc = -EFAULT);

        /*
         * The increment must be balanced: the -EMFILE path below and every
         * later failure go through "out", which decrements whenever
         * first != LS_NOT_FIRST_DE, i.e. whenever the increment happened.
         */
        if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
                                       sbi->ll_sa_running_max)) {
                CDEBUG(D_READA,
                       "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n");
                GOTO(out, rc = -EMFILE);
        }

        sai = ll_sai_alloc(parent);
        if (!sai)
                GOTO(out, rc = -ENOMEM);

        sai->sai_ls_all = (first == LS_FIRST_DOT_DE);

        /*
         * if current lli_opendir_key was deauthorized, or dir re-opened by
         * another process, don't start statahead, otherwise the newly spawned
         * statahead thread won't be notified to quit.
         */
        spin_lock(&lli->lli_sa_lock);
        if (unlikely(lli->lli_sai || !lli->lli_opendir_key ||
                     lli->lli_opendir_pid != current->pid)) {
                spin_unlock(&lli->lli_sa_lock);
                GOTO(out, rc = -EPERM);
        }
        lli->lli_sai = sai;
        spin_unlock(&lli->lli_sa_lock);

        CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n",
               current->pid, parent);

        task = kthread_create_on_node(ll_statahead_thread, parent, node,
                                      "ll_sa_%u", lli->lli_opendir_pid);
        if (IS_ERR(task)) {
                /* undo the lli_sai publication done above */
                spin_lock(&lli->lli_sa_lock);
                lli->lli_sai = NULL;
                spin_unlock(&lli->lli_sa_lock);
                rc = PTR_ERR(task);
                CERROR("can't start ll_sa thread, rc: %d\n", rc);
                GOTO(out, rc);
        }

        if (ll_i2sbi(parent->d_inode)->ll_flags & LL_SBI_AGL_ENABLED && agl)
                ll_start_agl(parent, sai);

        atomic_inc(&ll_i2sbi(parent->d_inode)->ll_sa_total);
        /* sai_task must be set before the thread runs; it is the thread's
         * keep-running flag */
        sai->sai_task = task;

        wake_up_process(task);
        /*
         * We don't stat-ahead for the first dirent since we are already in
         * lookup.
         */
        RETURN(-EAGAIN);

out:
        /*
         * once we start statahead thread failed, disable statahead so that
         * subsequent stat won't waste time to try it.
         */
        spin_lock(&lli->lli_sa_lock);
        if (lli->lli_opendir_pid == current->pid)
                lli->lli_sa_enabled = 0;
        spin_unlock(&lli->lli_sa_lock);

        if (sai)
                ll_sai_free(sai);
        if (first != LS_NOT_FIRST_DE)
                atomic_dec(&sbi->ll_sa_running);

        RETURN(rc);
}
1690
1691 /*
1692  * Check whether statahead for @dir was started.
1693  */
1694 static inline bool ll_statahead_started(struct inode *dir, bool agl)
1695 {
1696         struct ll_inode_info *lli = ll_i2info(dir);
1697         struct ll_statahead_info *sai;
1698
1699         spin_lock(&lli->lli_sa_lock);
1700         sai = lli->lli_sai;
1701         if (sai && (sai->sai_agl_task != NULL) != agl)
1702                 CDEBUG(D_READA,
1703                        "%s: Statahead AGL hint changed from %d to %d\n",
1704                        ll_i2sbi(dir)->ll_fsname,
1705                        sai->sai_agl_task != NULL, agl);
1706         spin_unlock(&lli->lli_sa_lock);
1707
1708         return !!sai;
1709 }
1710
1711 /**
1712  * statahead entry function, this is called when client getattr on a file, it
1713  * will start statahead thread if this is the first dir entry, else revalidate
1714  * dentry from statahead cache.
1715  *
1716  * \param[in]  dir      parent directory
1717  * \param[out] dentryp  dentry to getattr
1718  * \param[in]  agl      whether start the agl thread
1719  *
1720  * \retval              1 on success
1721  * \retval              0 revalidation from statahead cache failed, caller needs
1722  *                      to getattr from server directly
1723  * \retval              negative number on error, caller often ignores this and
1724  *                      then getattr from server
1725  */
1726 int ll_start_statahead(struct inode *dir, struct dentry *dentry, bool agl)
1727 {
1728         if (!ll_statahead_started(dir, agl))
1729                 return start_statahead_thread(dir, dentry, agl);
1730         return 0;
1731 }
1732
1733 /**
1734  * revalidate dentry from statahead cache.
1735  *
1736  * \param[in]  dir      parent directory
1737  * \param[out] dentryp  dentry to getattr
1738  * \param[in]  unplug   unplug statahead window only (normally for negative
1739  *                      dentry)
1740  * \retval              1 on success
1741  * \retval              0 revalidation from statahead cache failed, caller needs
1742  *                      to getattr from server directly
1743  * \retval              negative number on error, caller often ignores this and
1744  *                      then getattr from server
1745  */
1746 int ll_revalidate_statahead(struct inode *dir, struct dentry **dentryp,
1747                             bool unplug)
1748 {
1749         struct ll_statahead_info *sai;
1750         int rc = 0;
1751
1752         sai = ll_sai_get(dir);
1753         if (sai) {
1754                 rc = revalidate_statahead_dentry(dir, sai, dentryp, unplug);
1755                 CDEBUG(D_READA, "revalidate statahead %pd: rc = %d.\n",
1756                        *dentryp, rc);
1757                 ll_sai_put(sai);
1758         }
1759         return rc;
1760 }