lustre/llite/statahead.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  */
31
32 #include <linux/fs.h>
33 #include <linux/sched.h>
34 #include <linux/kthread.h>
35 #include <linux/mm.h>
36 #include <linux/highmem.h>
37 #include <linux/pagemap.h>
38 #include <linux/delay.h>
39
40 #define DEBUG_SUBSYSTEM S_LLITE
41
42 #include <obd_support.h>
43 #include <lustre_dlm.h>
44 #include "llite_internal.h"
45
46 #define SA_OMITTED_ENTRY_MAX 8ULL
47
48 typedef enum {
49         /** negative values are for error cases */
50         SA_ENTRY_INIT = 0,      /** init entry */
51         SA_ENTRY_SUCC = 1,      /** stat succeed */
52         SA_ENTRY_INVA = 2,      /** invalid entry */
53 } se_state_t;
54
55 /*
56  * sa_entry is not refcounted: the statahead thread allocates it and does the
57  * async stat; the async stat callback ll_statahead_interpret() adds it to
58  * sai_interim_entries, then the statahead thread calls sa_handle_callback() to
59  * instantiate the entry and move it to sai_entries, after which only the
60  * scanner process can access and free it.
61  */
62 struct sa_entry {
63         /* link into sai_interim_entries or sai_entries */
64         struct list_head        se_list;
65         /* link into sai hash table locally */
66         struct list_head        se_hash;
67         /* entry index in the sai */
68         __u64                   se_index;
69         /* low layer ldlm lock handle */
70         __u64                   se_handle;
71         /* entry status */
72         se_state_t              se_state;
73         /* entry size, contains name */
74         int                     se_size;
75         /* pointer to async getattr enqueue info */
76         struct md_enqueue_info *se_minfo;
77         /* pointer to the async getattr request */
78         struct ptlrpc_request  *se_req;
79         /* pointer to the target inode */
80         struct inode           *se_inode;
81         /* entry name */
82         struct qstr             se_qstr;
83         /* entry fid */
84         struct lu_fid           se_fid;
85 };
86
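/*
 * global generation counter for statahead instances; incremented under
 * sai_generation_lock each time a new sai is allocated (see ll_sai_alloc())
 */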
87 static unsigned int sai_generation;
88 static DEFINE_SPINLOCK(sai_generation_lock);
89
90 static inline int sa_unhashed(struct sa_entry *entry)
91 {
92         return list_empty(&entry->se_hash);
93 }
94
95 /* sa_entry is ready to use */
96 static inline int sa_ready(struct sa_entry *entry)
97 {
98         /* make sure sa_entry is fully updated and ready to use; pairs with smp_store_release() in __sa_make_ready() */
99         smp_rmb();
100         return (entry->se_state != SA_ENTRY_INIT);
101 }
102
103 /* hash value to put in sai_cache */
104 static inline int sa_hash(int val)
105 {
106         return val & LL_SA_CACHE_MASK;
107 }
108
109 /* hash entry into sai_cache */
110 static inline void
111 sa_rehash(struct ll_statahead_info *sai, struct sa_entry *entry)
112 {
113         int i = sa_hash(entry->se_qstr.hash);
114
115         spin_lock(&sai->sai_cache_lock[i]);
116         list_add_tail(&entry->se_hash, &sai->sai_cache[i]);
117         spin_unlock(&sai->sai_cache_lock[i]);
118 }
119
120 /* unhash entry from sai_cache */
121 static inline void
122 sa_unhash(struct ll_statahead_info *sai, struct sa_entry *entry)
123 {
124         int i = sa_hash(entry->se_qstr.hash);
125
126         spin_lock(&sai->sai_cache_lock[i]);
127         list_del_init(&entry->se_hash);
128         spin_unlock(&sai->sai_cache_lock[i]);
129 }
130
131 static inline int agl_should_run(struct ll_statahead_info *sai,
132                                  struct inode *inode)
133 {
134         return inode && S_ISREG(inode->i_mode) && sai->sai_agl_task;
135 }
136
137 static inline struct ll_inode_info *
138 agl_first_entry(struct ll_statahead_info *sai)
139 {
140         return list_first_entry(&sai->sai_agls, struct ll_inode_info,
141                                 lli_agl_list);
142 }
143
144 /* statahead window is full */
145 static inline int sa_sent_full(struct ll_statahead_info *sai)
146 {
147         return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
148 }
149
150 /* got async stat replies */
151 static inline int sa_has_callback(struct ll_statahead_info *sai)
152 {
153         return !list_empty(&sai->sai_interim_entries);
154 }
155
156 static inline int agl_list_empty(struct ll_statahead_info *sai)
157 {
158         return list_empty(&sai->sai_agls);
159 }
160
161 /**
162  * The statahead hit rate is considered low if either:
163  * (1) the hit ratio is less than 80%,
164  * or
165  * (2) there have been more than 8 consecutive misses.
166  */
167 static inline int sa_low_hit(struct ll_statahead_info *sai)
168 {
169         return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) ||
170                 (sai->sai_consecutive_miss > 8));
171 }
172
173 /*
174  * if the given index is behind the statahead window by more than
175  * SA_OMITTED_ENTRY_MAX, then it is old.
176  */
177 static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
178 {
179         return ((__u64)sai->sai_max + index + SA_OMITTED_ENTRY_MAX <
180                 sai->sai_index);
181 }
182
183 /* allocate sa_entry and hash it to allow scanner process to find it */
184 static struct sa_entry *
185 sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
186          const char *name, int len, const struct lu_fid *fid)
187 {
188         struct ll_inode_info *lli;
189         struct sa_entry *entry;
190         int entry_size;
191         char *dname;
192
193         ENTRY;
194
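        /*
         * the name is stored inline after the entry; round the allocation up
         * to a 4-byte boundary with room for the trailing NUL
         */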
195         entry_size = sizeof(struct sa_entry) + (len & ~3) + 4;
196         OBD_ALLOC(entry, entry_size);
197         if (unlikely(!entry))
198                 RETURN(ERR_PTR(-ENOMEM));
199
200         CDEBUG(D_READA, "alloc sa entry %.*s(%p) index %llu\n",
201                len, name, entry, index);
202
203         entry->se_index = index;
204
205         entry->se_state = SA_ENTRY_INIT;
206         entry->se_size = entry_size;
207         dname = (char *)entry + sizeof(struct sa_entry);
208         memcpy(dname, name, len);
209         dname[len] = 0;
210         entry->se_qstr.hash = ll_full_name_hash(parent, name, len);
211         entry->se_qstr.len = len;
212         entry->se_qstr.name = dname;
213         entry->se_fid = *fid;
214
215         lli = ll_i2info(sai->sai_dentry->d_inode);
216
217         spin_lock(&lli->lli_sa_lock);
218         INIT_LIST_HEAD(&entry->se_list);
219         sa_rehash(sai, entry);
220         spin_unlock(&lli->lli_sa_lock);
221
222         atomic_inc(&sai->sai_cache_count);
223
224         RETURN(entry);
225 }
226
227 /* free sa_entry, which should have been unhashed and not in any list */
228 static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry)
229 {
230         CDEBUG(D_READA, "free sa entry %.*s(%p) index %llu\n",
231                entry->se_qstr.len, entry->se_qstr.name, entry,
232                entry->se_index);
233
234         LASSERT(list_empty(&entry->se_list));
235         LASSERT(sa_unhashed(entry));
236
237         OBD_FREE(entry, entry->se_size);
238         atomic_dec(&sai->sai_cache_count);
239 }
240
241 /*
242  * find sa_entry by name, used by the directory scanner; no lock is needed
243  * because only the scanner can remove entries from the cache.
244  */
245 static struct sa_entry *
246 sa_get(struct ll_statahead_info *sai, const struct qstr *qstr)
247 {
248         struct sa_entry *entry;
249         int i = sa_hash(qstr->hash);
250
251         list_for_each_entry(entry, &sai->sai_cache[i], se_hash) {
252                 if (entry->se_qstr.hash == qstr->hash &&
253                     entry->se_qstr.len == qstr->len &&
254                     memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0)
255                         return entry;
256         }
257         return NULL;
258 }
259
260 /* unhash and unlink sa_entry, and then free it */
261 static inline void
262 sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry)
263 {
264         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
265
266         LASSERT(!sa_unhashed(entry));
267         LASSERT(!list_empty(&entry->se_list));
268         LASSERT(sa_ready(entry));
269
270         sa_unhash(sai, entry);
271
272         spin_lock(&lli->lli_sa_lock);
273         list_del_init(&entry->se_list);
274         spin_unlock(&lli->lli_sa_lock);
275
276         iput(entry->se_inode);
277
278         sa_free(sai, entry);
279 }
280
281 /* called by scanner after use, sa_entry will be killed */
282 static void
283 sa_put(struct ll_statahead_info *sai, struct sa_entry *entry)
284 {
285         struct sa_entry *tmp, *next;
286
287         if (entry && entry->se_state == SA_ENTRY_SUCC) {
288                 struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
289
290                 sai->sai_hit++;
291                 sai->sai_consecutive_miss = 0;
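                /* on a hit, enlarge the statahead window, capped by ll_sa_max */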
292                 sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
293         } else {
294                 sai->sai_miss++;
295                 sai->sai_consecutive_miss++;
296         }
297
298         if (entry)
299                 sa_kill(sai, entry);
300
301         /*
302          * kill old completed entries, only scanner process does this, no need
303          * to lock
304          */
305         list_for_each_entry_safe(tmp, next, &sai->sai_entries, se_list) {
306                 if (!is_omitted_entry(sai, tmp->se_index))
307                         break;
308                 sa_kill(sai, tmp);
309         }
310 }
311
312 /*
313  * update state and insert the entry into sai_entries sorted by index; return
314  * true if the scanner is waiting on this entry.
315  */
316 static bool
317 __sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
318 {
319         struct sa_entry *se;
320         struct list_head *pos = &sai->sai_entries;
321         __u64 index = entry->se_index;
322
323         LASSERT(!sa_ready(entry));
324         LASSERT(list_empty(&entry->se_list));
325
326         list_for_each_entry_reverse(se, &sai->sai_entries, se_list) {
327                 if (se->se_index < entry->se_index) {
328                         pos = &se->se_list;
329                         break;
330                 }
331         }
332         list_add(&entry->se_list, pos);
333         /*
334          * LU-9210: ll_statahead_interpret must be able to see this before
335          * we wake it up
336          */
337         smp_store_release(&entry->se_state,
338                           ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);
339
340         return (index == sai->sai_index_wait);
341 }
342
343 /* finish async stat RPC arguments */
344 static void sa_fini_data(struct md_enqueue_info *minfo)
345 {
346         struct md_op_data *op_data = &minfo->mi_data;
347
348         if (op_data->op_flags & MF_OPNAME_KMALLOCED)
349                 /* allocated via ll_setup_filename called from sa_prep_data */
350                 kfree(op_data->op_name);
351         ll_unlock_md_op_lsm(&minfo->mi_data);
352         iput(minfo->mi_dir);
353         OBD_FREE_PTR(minfo);
354 }
355
356 static int ll_statahead_interpret(struct ptlrpc_request *req,
357                                   struct md_enqueue_info *minfo, int rc);
358
359 /*
360  * prepare arguments for async stat RPC.
361  */
362 static struct md_enqueue_info *
363 sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
364 {
365         struct md_enqueue_info   *minfo;
366         struct ldlm_enqueue_info *einfo;
367         struct md_op_data        *op_data;
368
369         OBD_ALLOC_PTR(minfo);
370         if (!minfo)
371                 return ERR_PTR(-ENOMEM);
372
373         op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child,
374                                      entry->se_qstr.name, entry->se_qstr.len, 0,
375                                      LUSTRE_OPC_ANY, NULL);
376         if (IS_ERR(op_data)) {
377                 OBD_FREE_PTR(minfo);
378                 return (struct md_enqueue_info *)op_data;
379         }
380
381         if (!child)
382                 op_data->op_fid2 = entry->se_fid;
383
384         minfo->mi_it.it_op = IT_GETATTR;
385         minfo->mi_dir = igrab(dir);
386         minfo->mi_cb = ll_statahead_interpret;
387         minfo->mi_cbdata = entry;
388
389         einfo = &minfo->mi_einfo;
390         einfo->ei_type   = LDLM_IBITS;
391         einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
392         einfo->ei_cb_bl  = ll_md_blocking_ast;
393         einfo->ei_cb_cp  = ldlm_completion_ast;
394         einfo->ei_cb_gl  = NULL;
395         einfo->ei_cbdata = NULL;
396         einfo->ei_req_slot = 1;
397
398         return minfo;
399 }
400
401 /*
402  * release resources used in the async stat RPC, update entry state, and wake
403  * up the scanner process if it is waiting on this entry.
404  */
405 static void
406 sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
407 {
408         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
409         struct md_enqueue_info *minfo = entry->se_minfo;
410         struct ptlrpc_request *req = entry->se_req;
411         bool wakeup;
412
413         /* release resources used in RPC */
414         if (minfo) {
415                 entry->se_minfo = NULL;
416                 ll_intent_release(&minfo->mi_it);
417                 sa_fini_data(minfo);
418         }
419
420         if (req) {
421                 entry->se_req = NULL;
422                 ptlrpc_req_finished(req);
423         }
424
425         spin_lock(&lli->lli_sa_lock);
426         wakeup = __sa_make_ready(sai, entry, ret);
427         spin_unlock(&lli->lli_sa_lock);
428
429         if (wakeup)
430                 wake_up(&sai->sai_waitq);
431 }
432
433 /* insert inode into the list of sai_agls */
434 static void ll_agl_add(struct ll_statahead_info *sai,
435                        struct inode *inode, int index)
436 {
437         struct ll_inode_info *child  = ll_i2info(inode);
438         struct ll_inode_info *parent = ll_i2info(sai->sai_dentry->d_inode);
439
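        /* lli_agl_index == 0 means this inode is not queued for AGL yet */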
440         spin_lock(&child->lli_agl_lock);
441         if (child->lli_agl_index == 0) {
442                 child->lli_agl_index = index;
443                 spin_unlock(&child->lli_agl_lock);
444
445                 LASSERT(list_empty(&child->lli_agl_list));
446
447                 spin_lock(&parent->lli_agl_lock);
448                 /* Re-check under the lock */
449                 if (agl_should_run(sai, inode)) {
450                         if (agl_list_empty(sai))
451                                 wake_up_process(sai->sai_agl_task);
452                         igrab(inode);
453                         list_add_tail(&child->lli_agl_list, &sai->sai_agls);
454                 } else
455                         child->lli_agl_index = 0;
456                 spin_unlock(&parent->lli_agl_lock);
457         } else {
458                 spin_unlock(&child->lli_agl_lock);
459         }
460 }
461
462 /* allocate sai */
463 static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
464 {
465         struct ll_statahead_info *sai;
466         struct ll_inode_info *lli = ll_i2info(dentry->d_inode);
467         int i;
468
469         ENTRY;
470
471         OBD_ALLOC_PTR(sai);
472         if (!sai)
473                 RETURN(NULL);
474
475         sai->sai_dentry = dget(dentry);
476         atomic_set(&sai->sai_refcount, 1);
477         sai->sai_max = LL_SA_RPC_MIN;
478         sai->sai_index = 1;
479         init_waitqueue_head(&sai->sai_waitq);
480
481         INIT_LIST_HEAD(&sai->sai_interim_entries);
482         INIT_LIST_HEAD(&sai->sai_entries);
483         INIT_LIST_HEAD(&sai->sai_agls);
484
485         for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
486                 INIT_LIST_HEAD(&sai->sai_cache[i]);
487                 spin_lock_init(&sai->sai_cache_lock[i]);
488         }
489         atomic_set(&sai->sai_cache_count, 0);
490
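        /* take a fresh generation for this directory; 0 is deliberately skipped */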
491         spin_lock(&sai_generation_lock);
492         lli->lli_sa_generation = ++sai_generation;
493         if (unlikely(sai_generation == 0))
494                 lli->lli_sa_generation = ++sai_generation;
495         spin_unlock(&sai_generation_lock);
496
497         RETURN(sai);
498 }
499
500 /* free sai */
501 static inline void ll_sai_free(struct ll_statahead_info *sai)
502 {
503         LASSERT(sai->sai_dentry != NULL);
504         dput(sai->sai_dentry);
505         OBD_FREE_PTR(sai);
506 }
507
508 /*
509  * take refcount of sai if sai for @dir exists, which means statahead is on for
510  * this directory.
511  */
512 static inline struct ll_statahead_info *ll_sai_get(struct inode *dir)
513 {
514         struct ll_inode_info *lli = ll_i2info(dir);
515         struct ll_statahead_info *sai = NULL;
516
517         spin_lock(&lli->lli_sa_lock);
518         sai = lli->lli_sai;
519         if (sai)
520                 atomic_inc(&sai->sai_refcount);
521         spin_unlock(&lli->lli_sa_lock);
522
523         return sai;
524 }
525
526 /*
527  * put sai refcount after use, if refcount reaches zero, free sai and sa_entries
528  * attached to it.
529  */
530 static void ll_sai_put(struct ll_statahead_info *sai)
531 {
532         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
533
534         if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
535                 struct sa_entry *entry, *next;
536                 struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
537
538                 lli->lli_sai = NULL;
539                 spin_unlock(&lli->lli_sa_lock);
540
541                 LASSERT(!sai->sai_task);
542                 LASSERT(!sai->sai_agl_task);
543                 LASSERT(sai->sai_sent == sai->sai_replied);
544                 LASSERT(!sa_has_callback(sai));
545
546                 list_for_each_entry_safe(entry, next, &sai->sai_entries,
547                                          se_list)
548                         sa_kill(sai, entry);
549
550                 LASSERT(atomic_read(&sai->sai_cache_count) == 0);
551                 LASSERT(agl_list_empty(sai));
552
553                 ll_sai_free(sai);
554                 atomic_dec(&sbi->ll_sa_running);
555         }
556 }
557
558 /* Do NOT forget to drop the inode refcount taken when it was added into sai_agls. */
559 static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
560 {
561         struct ll_inode_info *lli = ll_i2info(inode);
562         u64 index = lli->lli_agl_index;
563         ktime_t expire;
564         int rc;
565
566         ENTRY;
567
568         LASSERT(list_empty(&lli->lli_agl_list));
569
570         /* AGL may fall behind statahead by one entry */
571         if (is_omitted_entry(sai, index + 1)) {
572                 lli->lli_agl_index = 0;
573                 iput(inode);
574                 RETURN_EXIT;
575         }
576
577         /*
578          * In case of restore, the MDT has the right size and has already
579          * sent it back without granting the layout lock, so the inode is
580          * up-to-date and AGL (async glimpse lock) is useless.
581          * Also, to glimpse we need the layout; during a running restore
582          * the MDT holds the layout lock, so the glimpse would block until
583          * the end of the restore (statahead/agl would block).
584          */
585         if (test_bit(LLIF_FILE_RESTORING, &lli->lli_flags)) {
586                 lli->lli_agl_index = 0;
587                 iput(inode);
588                 RETURN_EXIT;
589         }
590
591         /* Someone is in glimpse (sync or async), do nothing. */
592         rc = down_write_trylock(&lli->lli_glimpse_sem);
593         if (rc == 0) {
594                 lli->lli_agl_index = 0;
595                 iput(inode);
596                 RETURN_EXIT;
597         }
598
599         /*
600          * Someone triggered a glimpse within the last second.
601          * 1) The former glimpse succeeded with a glimpse lock granted by the
602          *    OST; if the lock is still cached on the client, AGL needs to do
603          *    nothing. If it was cancelled by another client, AGL may not be
604          *    able to obtain a new lock since AGL triggers no glimpse callback.
605          * 2) The former glimpse succeeded, but the OST did not grant a glimpse
606          *    lock. In that case, it is quite possible that the OST will not
607          *    grant a glimpse lock to AGL either.
608          * 3) The former glimpse failed; compared with the other two cases it
609          *    is relatively rare. AGL can ignore such a case, and it will not
610          *    noticeably affect performance.
611          */
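        /* expire = now - 1s: skip AGL if the last glimpse is newer than that */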
612         expire = ktime_sub_ns(ktime_get(), NSEC_PER_SEC);
613         if (ktime_to_ns(lli->lli_glimpse_time) &&
614             ktime_before(expire, lli->lli_glimpse_time)) {
615                 up_write(&lli->lli_glimpse_sem);
616                 lli->lli_agl_index = 0;
617                 iput(inode);
618                 RETURN_EXIT;
619         }
620
621         CDEBUG(D_READA,
622                "Handling (init) async glimpse: inode = " DFID", idx = %llu\n",
623                PFID(&lli->lli_fid), index);
624
625         cl_agl(inode);
626         lli->lli_agl_index = 0;
627         lli->lli_glimpse_time = ktime_get();
628         up_write(&lli->lli_glimpse_sem);
629
630         CDEBUG(D_READA,
631                "Handled (init) async glimpse: inode= " DFID", idx = %llu, rc = %d\n",
632                PFID(&lli->lli_fid), index, rc);
633
634         iput(inode);
635
636         EXIT;
637 }
638
639 /*
640  * prepare the inode for the sa_entry and add it to the agl list; the sa_entry
641  * is then ready to be used by the scanner process.
642  */
643 static void sa_instantiate(struct ll_statahead_info *sai,
644                            struct sa_entry *entry)
645 {
646         struct inode *dir = sai->sai_dentry->d_inode;
647         struct inode *child;
648         struct md_enqueue_info *minfo;
649         struct lookup_intent *it;
650         struct ptlrpc_request *req;
651         struct mdt_body *body;
652         int rc = 0;
653
654         ENTRY;
655
656         LASSERT(entry->se_handle != 0);
657
658         minfo = entry->se_minfo;
659         it = &minfo->mi_it;
660         req = entry->se_req;
661         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
662         if (!body)
663                 GOTO(out, rc = -EFAULT);
664
665         child = entry->se_inode;
666         /* revalidate; unlinked and re-created with the same name */
667         if (unlikely(!lu_fid_eq(&minfo->mi_data.op_fid2, &body->mbo_fid1))) {
668                 if (child) {
669                         entry->se_inode = NULL;
670                         iput(child);
671                 }
672                 /* The mdt_body is invalid. Skip this entry */
673                 GOTO(out, rc = -EAGAIN);
674         }
675
676         it->it_lock_handle = entry->se_handle;
677         rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
678         if (rc != 1)
679                 GOTO(out, rc = -EAGAIN);
680
681         rc = ll_prep_inode(&child, &req->rq_pill, dir->i_sb, it);
682         if (rc)
683                 GOTO(out, rc);
684
685         /* If encryption context was returned by MDT, put it in
686          * inode now to save an extra getxattr.
687          */
688         if (body->mbo_valid & OBD_MD_ENCCTX) {
689                 void *encctx = req_capsule_server_get(&req->rq_pill,
690                                                       &RMF_FILE_ENCCTX);
691                 __u32 encctxlen = req_capsule_get_size(&req->rq_pill,
692                                                        &RMF_FILE_ENCCTX,
693                                                        RCL_SERVER);
694
695                 if (encctxlen) {
696                         CDEBUG(D_SEC,
697                                "server returned encryption ctx for "DFID"\n",
698                                PFID(ll_inode2fid(child)));
699                         rc = ll_xattr_cache_insert(child,
700                                                    xattr_for_enc(child),
701                                                    encctx, encctxlen);
702                         if (rc)
703                                 CWARN("%s: cannot set enc ctx for "DFID": rc = %d\n",
704                                       ll_i2sbi(child)->ll_fsname,
705                                       PFID(ll_inode2fid(child)), rc);
706                 }
707         }
708
709         CDEBUG(D_READA, "%s: setting %.*s"DFID" l_data to inode %p\n",
710                ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
711                entry->se_qstr.name, PFID(ll_inode2fid(child)), child);
712         ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);
713
714         entry->se_inode = child;
715
716         if (agl_should_run(sai, child))
717                 ll_agl_add(sai, child, entry->se_index);
718
719         EXIT;
720
721 out:
722         /*
723          * sa_make_ready() will drop the ldlm ibits lock refcount by calling
724          * ll_intent_drop_lock() even on failure. Do not worry about
725          * calling ll_intent_drop_lock() more than once.
726          */
727         sa_make_ready(sai, entry, rc);
728 }
729
730 /* once there are async stat replies, instantiate sa_entry from replies */
731 static void sa_handle_callback(struct ll_statahead_info *sai)
732 {
733         struct ll_inode_info *lli;
734
735         lli = ll_i2info(sai->sai_dentry->d_inode);
736
737         spin_lock(&lli->lli_sa_lock);
738         while (sa_has_callback(sai)) {
739                 struct sa_entry *entry;
740
741                 entry = list_entry(sai->sai_interim_entries.next,
742                                    struct sa_entry, se_list);
743                 list_del_init(&entry->se_list);
744                 spin_unlock(&lli->lli_sa_lock);
745
746                 sa_instantiate(sai, entry);
747                 spin_lock(&lli->lli_sa_lock);
748         }
749         spin_unlock(&lli->lli_sa_lock);
750 }
751
752 /*
753  * callback for async stat RPC. Because this is called in ptlrpcd context, we
754  * only put the sa_entry on sai_interim_entries and wake up the statahead
755  * thread, which will prepare the inode and instantiate the sa_entry later.
756  */
757 static int ll_statahead_interpret(struct ptlrpc_request *req,
758                                   struct md_enqueue_info *minfo, int rc)
759 {
760         struct lookup_intent *it = &minfo->mi_it;
761         struct inode *dir = minfo->mi_dir;
762         struct ll_inode_info *lli = ll_i2info(dir);
763         struct ll_statahead_info *sai = lli->lli_sai;
764         struct sa_entry *entry = (struct sa_entry *)minfo->mi_cbdata;
765         __u64 handle = 0;
766
767         ENTRY;
768
769         if (it_disposition(it, DISP_LOOKUP_NEG))
770                 rc = -ENOENT;
771
772         /*
773          * because the statahead thread waits for all in-flight RPCs to finish,
774          * sai is always valid here, no need to take a refcount
775          */
776         LASSERT(sai != NULL);
777         LASSERT(entry != NULL);
778
779         CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
780                entry->se_qstr.len, entry->se_qstr.name, rc);
781
782         if (rc != 0) {
783                 ll_intent_release(it);
784                 sa_fini_data(minfo);
785         } else {
786                 /*
787                  * release ibits lock ASAP to avoid deadlock when statahead
788                  * thread enqueues lock on parent in readdir and another
789                  * process enqueues lock on child with parent lock held, e.g.
790                  * unlink.
791                  */
792                 handle = it->it_lock_handle;
793                 ll_intent_drop_lock(it);
794                 ll_unlock_md_op_lsm(&minfo->mi_data);
795         }
796
797         spin_lock(&lli->lli_sa_lock);
798         if (rc != 0) {
799                 if (__sa_make_ready(sai, entry, rc))
800                         wake_up(&sai->sai_waitq);
801         } else {
802                 int first = 0;
803
804                 entry->se_minfo = minfo;
805                 entry->se_req = ptlrpc_request_addref(req);
806                 /*
807                  * Release the async ibits lock ASAP to avoid deadlock
808                  * when the statahead thread tries to enqueue a lock on the
809                  * parent for readpage and another process tries to enqueue a
810                  * lock on a child with the parent's lock held, e.g. unlink.
811                  */
812                 entry->se_handle = handle;
813                 if (!sa_has_callback(sai))
814                         first = 1;
815
816                 list_add_tail(&entry->se_list, &sai->sai_interim_entries);
817                 if (first && sai->sai_task)
818                         wake_up_process(sai->sai_task);
819         }
820         sai->sai_replied++;
821
822         spin_unlock(&lli->lli_sa_lock);
823
824         RETURN(rc);
825 }
826
827 /* async stat for file not found in dcache */
828 static int sa_lookup(struct inode *dir, struct sa_entry *entry)
829 {
830         struct md_enqueue_info   *minfo;
831         int                       rc;
832
833         ENTRY;
834
835         minfo = sa_prep_data(dir, NULL, entry);
836         if (IS_ERR(minfo))
837                 RETURN(PTR_ERR(minfo));
838
839         rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
840         if (rc < 0)
841                 sa_fini_data(minfo);
842
843         RETURN(rc);
844 }
845
846 /**
847  * async stat for file found in dcache, similar to .revalidate
848  *
849  * \retval      1 dentry valid, no RPC sent
850  * \retval      0 dentry invalid, will send async stat RPC
851  * \retval      negative number upon error
852  */
853 static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
854                          struct dentry *dentry)
855 {
856         struct inode *inode = dentry->d_inode;
857         struct lookup_intent it = { .it_op = IT_GETATTR,
858                                     .it_lock_handle = 0 };
859         struct md_enqueue_info *minfo;
860         int rc;
861
862         ENTRY;
863
864         if (unlikely(!inode))
865                 RETURN(1);
866
867         if (d_mountpoint(dentry))
868                 RETURN(1);
869
870         minfo = sa_prep_data(dir, inode, entry);
871         if (IS_ERR(minfo))
872                 RETURN(PTR_ERR(minfo));
873
874         entry->se_inode = igrab(inode);
875         rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
876                                 NULL);
877         if (rc == 1) {
878                 entry->se_handle = it.it_lock_handle;
879                 ll_intent_release(&it);
880                 sa_fini_data(minfo);
881                 RETURN(1);
882         }
883
884         rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo);
885         if (rc < 0) {
886                 entry->se_inode = NULL;
887                 iput(inode);
888                 sa_fini_data(minfo);
889         }
890
891         RETURN(rc);
892 }
893
894 /* async stat for file with @name */
895 static void sa_statahead(struct dentry *parent, const char *name, int len,
896                          const struct lu_fid *fid)
897 {
898         struct inode *dir = parent->d_inode;
899         struct ll_inode_info *lli = ll_i2info(dir);
900         struct ll_statahead_info *sai = lli->lli_sai;
901         struct dentry *dentry = NULL;
902         struct sa_entry *entry;
903         int rc;
904
905         ENTRY;
906
907         entry = sa_alloc(parent, sai, sai->sai_index, name, len, fid);
908         if (IS_ERR(entry))
909                 RETURN_EXIT;
910
911         dentry = d_lookup(parent, &entry->se_qstr);
912         if (!dentry) {
913                 rc = sa_lookup(dir, entry);
914         } else {
915                 rc = sa_revalidate(dir, entry, dentry);
916                 if (rc == 1 && agl_should_run(sai, dentry->d_inode))
917                         ll_agl_add(sai, dentry->d_inode, entry->se_index);
918         }
919
920         if (dentry)
921                 dput(dentry);
922
923         if (rc != 0)
924                 sa_make_ready(sai, entry, rc);
925         else
926                 sai->sai_sent++;
927
928         sai->sai_index++;
929
930         EXIT;
931 }
932
933 /* async glimpse (agl) thread main function */
934 static int ll_agl_thread(void *arg)
935 {
936         struct dentry *parent = (struct dentry *)arg;
937         struct inode *dir = parent->d_inode;
938         struct ll_inode_info *plli = ll_i2info(dir);
939         struct ll_inode_info *clli;
940         /*
941          * We already own this reference, so it is safe to take it
942          * without a lock.
943          */
944         struct ll_statahead_info *sai = plli->lli_sai;
945
946         ENTRY;
947
948         CDEBUG(D_READA, "agl thread started: sai %p, parent %pd\n",
949                sai, parent);
950
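        /*
         * set TASK_IDLE before checking kthread_should_stop() so that a
         * concurrent wake_up_process()/kthread_stop() is not missed
         */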
951         while (({set_current_state(TASK_IDLE);
952                  !kthread_should_stop(); })) {
953                 spin_lock(&plli->lli_agl_lock);
954                 clli = list_first_entry_or_null(&sai->sai_agls,
955                                                 struct ll_inode_info,
956                                                 lli_agl_list);
957                 if (clli) {
958                         __set_current_state(TASK_RUNNING);
959                         list_del_init(&clli->lli_agl_list);
960                         spin_unlock(&plli->lli_agl_lock);
961                         ll_agl_trigger(&clli->lli_vfs_inode, sai);
962                         cond_resched();
963                 } else {
964                         spin_unlock(&plli->lli_agl_lock);
965                         schedule();
966                 }
967         }
968         __set_current_state(TASK_RUNNING);
969         RETURN(0);
970 }
971
972 static void ll_stop_agl(struct ll_statahead_info *sai)
973 {
974         struct dentry *parent = sai->sai_dentry;
975         struct ll_inode_info *plli = ll_i2info(parent->d_inode);
976         struct ll_inode_info *clli;
977         struct task_struct *agl_task;
978
979         spin_lock(&plli->lli_agl_lock);
980         agl_task = sai->sai_agl_task;
981         sai->sai_agl_task = NULL;
982         spin_unlock(&plli->lli_agl_lock);
983         if (!agl_task)
984                 return;
985
986         CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n",
987                sai, (unsigned int)agl_task->pid);
988         kthread_stop(agl_task);
989
990         spin_lock(&plli->lli_agl_lock);
991         while ((clli = list_first_entry_or_null(&sai->sai_agls,
992                                                 struct ll_inode_info,
993                                                 lli_agl_list)) != NULL) {
994                 list_del_init(&clli->lli_agl_list);
995                 spin_unlock(&plli->lli_agl_lock);
996                 clli->lli_agl_index = 0;
997                 iput(&clli->lli_vfs_inode);
998                 spin_lock(&plli->lli_agl_lock);
999         }
1000         spin_unlock(&plli->lli_agl_lock);
1001         CDEBUG(D_READA, "agl thread stopped: sai %p, parent %pd\n",
1002                sai, parent);
1003         ll_sai_put(sai);
1004 }
1005
1006 /* start agl thread */
1007 static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
1008 {
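        /* pick a NUMA node from the CPU partition table to spread AGL threads */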
1009         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
1010         struct ll_inode_info *plli;
1011         struct task_struct *task;
1012
1013         ENTRY;
1014
1015         CDEBUG(D_READA, "start agl thread: sai %p, parent %pd\n",
1016                sai, parent);
1017
1018         plli = ll_i2info(parent->d_inode);
1019         task = kthread_create_on_node(ll_agl_thread, parent, node, "ll_agl_%d",
1020                                       plli->lli_opendir_pid);
1021         if (IS_ERR(task)) {
1022                 CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));
1023                 RETURN_EXIT;
1024         }
1025         sai->sai_agl_task = task;
1026         atomic_inc(&ll_i2sbi(d_inode(parent))->ll_agl_total);
1027         /* Get an extra reference that the thread holds */
1028         ll_sai_get(d_inode(parent));
1029
1030         wake_up_process(task);
1031
1032         EXIT;
1033 }
1034
1035 /* statahead thread main function */
1036 static int ll_statahead_thread(void *arg)
1037 {
1038         struct dentry *parent = (struct dentry *)arg;
1039         struct inode *dir = parent->d_inode;
1040         struct ll_inode_info *lli = ll_i2info(dir);
1041         struct ll_sb_info *sbi = ll_i2sbi(dir);
1042         struct ll_statahead_info *sai = lli->lli_sai;
1043         int first = 0;
1044         struct md_op_data *op_data;
1045         struct page *page = NULL;
1046         __u64 pos = 0;
1047         int rc = 0;
1048
1049         ENTRY;
1050
1051         CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
1052                sai, parent);
1053
1054         OBD_ALLOC_PTR(op_data);
1055         if (!op_data)
1056                 GOTO(out, rc = -ENOMEM);
1057
1058         while (pos != MDS_DIR_END_OFF && sai->sai_task) {
1059                 struct lu_dirpage *dp;
1060                 struct lu_dirent  *ent;
1061
1062                 op_data = ll_prep_md_op_data(op_data, dir, dir, NULL, 0, 0,
1063                                              LUSTRE_OPC_ANY, dir);
1064                 if (IS_ERR(op_data)) {
1065                         rc = PTR_ERR(op_data);
1066                         break;
1067                 }
1068
1069                 sai->sai_in_readpage = 1;
1070                 page = ll_get_dir_page(dir, op_data, pos, NULL);
1071                 ll_unlock_md_op_lsm(op_data);
1072                 sai->sai_in_readpage = 0;
1073                 if (IS_ERR(page)) {
1074                         rc = PTR_ERR(page);
1075                         CDEBUG(D_READA,
1076                                "error reading dir "DFID" at %llu /%llu opendir_pid = %u: rc = %d\n",
1077                                PFID(ll_inode2fid(dir)), pos, sai->sai_index,
1078                                lli->lli_opendir_pid, rc);
1079                         break;
1080                 }
1081
1082                 dp = page_address(page);
1083                 for (ent = lu_dirent_start(dp);
1084                      ent != NULL && sai->sai_task &&
1085                      !sa_low_hit(sai);
1086                      ent = lu_dirent_next(ent)) {
1087                         __u64 hash;
1088                         int namelen;
1089                         char *name;
1090                         struct lu_fid fid;
1091                         struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
1092
1093                         hash = le64_to_cpu(ent->lde_hash);
1094                         if (unlikely(hash < pos))
1095                                 /*
1096                                  * Skip until we find target hash value.
1097                                  */
1098                                 continue;
1099
1100                         namelen = le16_to_cpu(ent->lde_namelen);
1101                         if (unlikely(namelen == 0))
1102                                 /*
1103                                  * Skip dummy record.
1104                                  */
1105                                 continue;
1106
1107                         name = ent->lde_name;
1108                         if (name[0] == '.') {
1109                                 if (namelen == 1) {
1110                                         /*
1111                                          * skip "."
1112                                          */
1113                                         continue;
1114                                 } else if (name[1] == '.' && namelen == 2) {
1115                                         /*
1116                                          * skip ".."
1117                                          */
1118                                         continue;
1119                                 } else if (!sai->sai_ls_all) {
1120                                         /*
1121                                          * skip hidden files.
1122                                          */
1123                                         sai->sai_skip_hidden++;
1124                                         continue;
1125                                 }
1126                         }
1127
1128                         /*
1129                          * don't stat-ahead first entry.
1130                          */
1131                         if (unlikely(++first == 1))
1132                                 continue;
1133
1134                         fid_le_to_cpu(&fid, &ent->lde_fid);
1135
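                        /*
                         * throttle: wait (TASK_IDLE) until the statahead window
                         * has room, handling completed replies and pending AGL
                         * work while waiting
                         */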
1136                         while (({set_current_state(TASK_IDLE);
1137                                  sai->sai_task; })) {
1138                                 if (sa_has_callback(sai)) {
1139                                         __set_current_state(TASK_RUNNING);
1140                                         sa_handle_callback(sai);
1141                                 }
1142
1143                                 spin_lock(&lli->lli_agl_lock);
1144                                 while (sa_sent_full(sai) &&
1145                                        !agl_list_empty(sai)) {
1146                                         struct ll_inode_info *clli;
1147
1148                                         __set_current_state(TASK_RUNNING);
1149                                         clli = agl_first_entry(sai);
1150                                         list_del_init(&clli->lli_agl_list);
1151                                         spin_unlock(&lli->lli_agl_lock);
1152
1153                                         ll_agl_trigger(&clli->lli_vfs_inode,
1154                                                        sai);
1155                                         cond_resched();
1156                                         spin_lock(&lli->lli_agl_lock);
1157                                 }
1158                                 spin_unlock(&lli->lli_agl_lock);
1159
1160                                 if (!sa_sent_full(sai))
1161                                         break;
1162                                 schedule();
1163                         }
1164                         __set_current_state(TASK_RUNNING);
1165
1166                         if (IS_ENCRYPTED(dir)) {
1167                                 struct llcrypt_str de_name =
1168                                         LLTR_INIT(ent->lde_name, namelen);
1169                                 struct lu_fid fid;
1170
1171                                 rc = llcrypt_fname_alloc_buffer(dir, NAME_MAX,
1172                                                                 &lltr);
1173                                 if (rc < 0)
1174                                         continue;
1175
1176                                 fid_le_to_cpu(&fid, &ent->lde_fid);
1177                                 if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
1178                                                          &lltr, &fid)) {
1179                                         llcrypt_fname_free_buffer(&lltr);
1180                                         continue;
1181                                 }
1182
1183                                 name = lltr.name;
1184                                 namelen = lltr.len;
1185                         }
1186
1187                         sa_statahead(parent, name, namelen, &fid);
1188                         llcrypt_fname_free_buffer(&lltr);
1189                 }
1190
1191                 pos = le64_to_cpu(dp->ldp_hash_end);
1192                 down_read(&lli->lli_lsm_sem);
1193                 ll_release_page(dir, page,
1194                                 le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
1195                 up_read(&lli->lli_lsm_sem);
1196
1197                 if (sa_low_hit(sai)) {
1198                         rc = -EFAULT;
1199                         atomic_inc(&sbi->ll_sa_wrong);
1200                         CDEBUG(D_READA,
1201                                "Statahead for dir "DFID" hit ratio too low: hit/miss %llu/%llu, sent/replied %llu/%llu, stopping statahead thread: pid %d\n",
1202                                PFID(&lli->lli_fid), sai->sai_hit,
1203                                sai->sai_miss, sai->sai_sent,
1204                                sai->sai_replied, current->pid);
1205                         break;
1206                 }
1207         }
1208         ll_finish_md_op_data(op_data);
1209
1210         if (rc < 0) {
1211                 spin_lock(&lli->lli_sa_lock);
1212                 sai->sai_task = NULL;
1213                 lli->lli_sa_enabled = 0;
1214                 spin_unlock(&lli->lli_sa_lock);
1215         }
1216
1217         /*
1218          * statahead is finished, but statahead entries still need to be
1219          * cached; wait for file release to stop this thread.
1220          */
1221         while (({set_current_state(TASK_IDLE);
1222                  sai->sai_task; })) {
1223                 if (sa_has_callback(sai)) {
1224                         __set_current_state(TASK_RUNNING);
1225                         sa_handle_callback(sai);
1226                 } else {
1227                         schedule();
1228                 }
1229         }
1230         __set_current_state(TASK_RUNNING);
1231
1232         EXIT;
1233 out:
1234         ll_stop_agl(sai);
1235
1236         /*
1237          * wait for in-flight statahead RPCs to finish; only then can sai be
1238          * freed safely, because the statahead RPC callbacks access sai data
1239          */
1240         while (sai->sai_sent != sai->sai_replied)
1241                 /* in case we're not woken up, timeout wait */
1242                 msleep(125);
1243
1244         /* release resources held by statahead RPCs */
1245         sa_handle_callback(sai);
1246
1247         CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd\n",
1248                sbi->ll_fsname, sai, parent);
1249
1250         spin_lock(&lli->lli_sa_lock);
1251         sai->sai_task = NULL;
1252         spin_unlock(&lli->lli_sa_lock);
1253         wake_up(&sai->sai_waitq);
1254
1255         ll_sai_put(sai);
1256
1257         return rc;
1258 }
1259
1260 /* authorize opened dir handle @key to statahead */
1261 void ll_authorize_statahead(struct inode *dir, void *key)
1262 {
1263         struct ll_inode_info *lli = ll_i2info(dir);
1264
1265         spin_lock(&lli->lli_sa_lock);
1266         if (!lli->lli_opendir_key && !lli->lli_sai) {
1267                 /*
1268                  * if lli_sai is not NULL, the previous statahead has not
1269                  * finished yet; do not start a new statahead for now.
1270                  */
1271                 LASSERT(lli->lli_opendir_pid == 0);
1272                 lli->lli_opendir_key = key;
1273                 lli->lli_opendir_pid = current->pid;
1274                 lli->lli_sa_enabled = 1;
1275         }
1276         spin_unlock(&lli->lli_sa_lock);
1277 }
1278
1279 /*
1280  * deauthorize opened dir handle @key to statahead, and notify statahead thread
1281  * to quit if it's running.
1282  */
1283 void ll_deauthorize_statahead(struct inode *dir, void *key)
1284 {
1285         struct ll_inode_info *lli = ll_i2info(dir);
1286         struct ll_statahead_info *sai;
1287
1288         LASSERT(lli->lli_opendir_key == key);
1289         LASSERT(lli->lli_opendir_pid != 0);
1290
1291         CDEBUG(D_READA, "deauthorize statahead for "DFID"\n",
1292                PFID(&lli->lli_fid));
1293
1294         spin_lock(&lli->lli_sa_lock);
1295         lli->lli_opendir_key = NULL;
1296         lli->lli_opendir_pid = 0;
1297         lli->lli_sa_enabled = 0;
1298         sai = lli->lli_sai;
1299         if (sai && sai->sai_task) {
1300                 /*
1301                  * statahead thread may not have quit yet because it needs to
1302                  * cache entries; now it's time to tell it to quit.
1303                  *
1304                  * wake_up_process() provides the necessary barriers
1305                  * to pair with set_current_state().
1306                  */
1307                 struct task_struct *task = sai->sai_task;
1308
1309                 sai->sai_task = NULL;
1310                 wake_up_process(task);
1311         }
1312         spin_unlock(&lli->lli_sa_lock);
1313 }
1314
1315 enum {
1316         /**
1317          * not first dirent, or is "."
1318          */
1319         LS_NOT_FIRST_DE = 0,
1320         /**
1321          * the first non-hidden dirent
1322          */
1323         LS_FIRST_DE,
1324         /**
1325          * the first hidden dirent, that is "."
1326          */
1327         LS_FIRST_DOT_DE
1328 };
1329
1330 /* file is first dirent under @dir */
1331 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
1332 {
1333         struct qstr *target = &dentry->d_name;
1334         struct md_op_data *op_data;
1335         int dot_de;
1336         struct page *page = NULL;
1337         int rc = LS_NOT_FIRST_DE;
1338         __u64 pos = 0;
1339         struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
1340
1341         ENTRY;
1342
1343         op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
1344                                      LUSTRE_OPC_ANY, dir);
1345         if (IS_ERR(op_data))
1346                 RETURN(PTR_ERR(op_data));
1347
1348         if (IS_ENCRYPTED(dir)) {
1349                 int rc2 = llcrypt_fname_alloc_buffer(dir, NAME_MAX, &lltr);
1350
1351                 if (rc2 < 0)
1352                         RETURN(rc2);
1353         }
1354
1355         /**
1356          * FIXME: choose the start offset of the readdir
1357          */
1358
1359         page = ll_get_dir_page(dir, op_data, 0, NULL);
1360
1361         while (1) {
1362                 struct lu_dirpage *dp;
1363                 struct lu_dirent  *ent;
1364
1365                 if (IS_ERR(page)) {
1366                         struct ll_inode_info *lli = ll_i2info(dir);
1367
1368                         rc = PTR_ERR(page);
1369                         CERROR("%s: reading dir "DFID" at %llu opendir_pid = %u : rc = %d\n",
1370                                ll_i2sbi(dir)->ll_fsname,
1371                                PFID(ll_inode2fid(dir)), pos,
1372                                lli->lli_opendir_pid, rc);
1373                         break;
1374                 }
1375
1376                 dp = page_address(page);
1377                 for (ent = lu_dirent_start(dp); ent != NULL;
1378                      ent = lu_dirent_next(ent)) {
1379                         __u64 hash;
1380                         int namelen;
1381                         char *name;
1382
1383                         hash = le64_to_cpu(ent->lde_hash);
1384                         /*
1385                          * The ll_get_dir_page() can return any page containing
1386                          * the given hash which may be not the start hash.
1387                          */
1388                         if (unlikely(hash < pos))
1389                                 continue;
1390
1391                         namelen = le16_to_cpu(ent->lde_namelen);
1392                         if (unlikely(namelen == 0))
1393                                 /*
1394                                  * skip dummy record.
1395                                  */
1396                                 continue;
1397
1398                         name = ent->lde_name;
1399                         if (name[0] == '.') {
1400                                 if (namelen == 1)
1401                                         /*
1402                                          * skip "."
1403                                          */
1404                                         continue;
1405                                 else if (name[1] == '.' && namelen == 2)
1406                                         /*
1407                                          * skip ".."
1408                                          */
1409                                         continue;
1410                                 else
1411                                         dot_de = 1;
1412                         } else {
1413                                 dot_de = 0;
1414                         }
1415
1416                         if (dot_de && target->name[0] != '.') {
1417                                 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
1418                                        target->len, target->name,
1419                                        namelen, name);
1420                                 continue;
1421                         }
1422
1423                         if (IS_ENCRYPTED(dir)) {
1424                                 struct llcrypt_str de_name =
1425                                         LLTR_INIT(ent->lde_name, namelen);
1426                                 struct lu_fid fid;
1427
1428                                 fid_le_to_cpu(&fid, &ent->lde_fid);
1429                                 if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
1430                                                          &lltr, &fid))
1431                                         continue;
1432                                 name = lltr.name;
1433                                 namelen = lltr.len;
1434                         }
1435
1436                         if (target->len != namelen ||
1437                             memcmp(target->name, name, namelen) != 0)
1438                                 rc = LS_NOT_FIRST_DE;
1439                         else if (!dot_de)
1440                                 rc = LS_FIRST_DE;
1441                         else
1442                                 rc = LS_FIRST_DOT_DE;
1443
1444                         ll_release_page(dir, page, false);
1445                         GOTO(out, rc);
1446                 }
1447                 pos = le64_to_cpu(dp->ldp_hash_end);
1448                 if (pos == MDS_DIR_END_OFF) {
1449                         /*
1450                          * End of directory reached.
1451                          */
1452                         ll_release_page(dir, page, false);
1453                         GOTO(out, rc);
1454                 } else {
1455                         /*
1456                          * chain is exhausted
1457                          * Normal case: continue to the next page.
1458                          */
1459                         ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) &
1460                                               LDF_COLLIDE);
1461                         page = ll_get_dir_page(dir, op_data, pos, NULL);
1462                 }
1463         }
1464         EXIT;
1465 out:
1466         llcrypt_fname_free_buffer(&lltr);
1467         ll_finish_md_op_data(op_data);
1468
1469         return rc;
1470 }
1471
1472 /**
1473  * revalidate @dentryp from statahead cache
1474  *
1475  * \param[in] dir       parent directory
1476  * \param[in] sai       sai structure
1477  * \param[out] dentryp  pointer to dentry which will be revalidated
1478  * \param[in] unplug    unplug statahead window only (normally for negative
1479  *                      dentry)
1480  * \retval              1 on success, dentry is saved in @dentryp
1481  * \retval              0 if revalidation failed (no proper lock on client)
1482  * \retval              negative number upon error
1483  */
1484 static int revalidate_statahead_dentry(struct inode *dir,
1485                                        struct ll_statahead_info *sai,
1486                                        struct dentry **dentryp,
1487                                        bool unplug)
1488 {
1489         struct sa_entry *entry = NULL;
1490         struct ll_inode_info *lli = ll_i2info(dir);
1491         int rc = 0;
1492
1493         ENTRY;
1494
1495         if ((*dentryp)->d_name.name[0] == '.') {
1496                 if (sai->sai_ls_all ||
1497                     sai->sai_miss_hidden >= sai->sai_skip_hidden) {
1498                         /*
1499                          * The hidden dentry is the first one, or the
1500                          * statahead thread has not skipped this many hidden
1501                          * dentries before "sai_ls_all" was enabled below.
1502                          */
1503                 } else {
1504                         if (!sai->sai_ls_all)
1505                                 /*
1506                                  * Probably the hidden dentry is not the
1507                                  * first one and "sai_ls_all" was not set,
1508                                  * so "ls -al" missed it. Enable
1509                                  * "sai_ls_all" for this case.
1510                                  */
1511                                 sai->sai_ls_all = 1;
1512
1513                         /*
1514                          * This "getattr" was skipped before "sai_ls_all"
1515                          * was enabled above.
1516                          */
1517                         sai->sai_miss_hidden++;
1518                         RETURN(-EAGAIN);
1519                 }
1520         }
1521
1522         if (unplug)
1523                 GOTO(out, rc = 1);
1524
1525         entry = sa_get(sai, &(*dentryp)->d_name);
1526         if (!entry)
1527                 GOTO(out, rc = -EAGAIN);
1528
1529         /* if statahead is busy in readdir, help it do post-work */
1530         if (!sa_ready(entry) && sai->sai_in_readpage)
1531                 sa_handle_callback(sai);
1532
1533         if (!sa_ready(entry)) {
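                /*
                 * Record which entry index we are waiting for, then wait up
                 * to 30 seconds for the async stat on it to complete; on
                 * timeout return -EAGAIN so the caller does the getattr
                 * itself.
                 */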
1534                 spin_lock(&lli->lli_sa_lock);
1535                 sai->sai_index_wait = entry->se_index;
1536                 spin_unlock(&lli->lli_sa_lock);
1537                 rc = wait_event_idle_timeout(sai->sai_waitq, sa_ready(entry),
1538                                              cfs_time_seconds(30));
1539                 if (rc == 0) {
1540                         /*
1541                          * The entry is still not ready; it may be in use by
1542                          * an inflight statahead RPC, so don't free it here.
1543                          */
1544                         entry = NULL;
1545                         GOTO(out, rc = -EAGAIN);
1546                 }
1547         }
1548
1549         /*
1550          * We need to see the value that was set immediately before we
1551          * were woken up.
1552          */
1553         if (smp_load_acquire(&entry->se_state) == SA_ENTRY_SUCC &&
1554             entry->se_inode) {
1555                 struct inode *inode = entry->se_inode;
1556                 struct lookup_intent it = { .it_op = IT_GETATTR,
1557                                             .it_lock_handle =
1558                                                 entry->se_handle };
1559                 __u64 bits;
1560
1561                 rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
1562                                         ll_inode2fid(inode), &bits);
1563                 if (rc == 1) {
1564                         if (!(*dentryp)->d_inode) {
1565                                 struct dentry *alias;
1566
1567                                 alias = ll_splice_alias(inode, *dentryp);
1568                                 if (IS_ERR(alias)) {
1569                                         ll_intent_release(&it);
1570                                         GOTO(out, rc = PTR_ERR(alias));
1571                                 }
1572                                 *dentryp = alias;
1573                                 /*
1574                                  * statahead prepared this inode, transfer inode
1575                                  * refcount from sa_entry to dentry
1576                                  */
1577                                 entry->se_inode = NULL;
1578                         } else if ((*dentryp)->d_inode != inode) {
1579                                 /* revalidated, but the inode was recreated */
1580                                 CDEBUG(D_READA,
1581                                        "%s: stale dentry %pd inode " DFID", statahead inode "DFID "\n",
1582                                        ll_i2sbi(inode)->ll_fsname, *dentryp,
1583                                        PFID(ll_inode2fid((*dentryp)->d_inode)),
1584                                        PFID(ll_inode2fid(inode)));
1585                                 ll_intent_release(&it);
1586                                 GOTO(out, rc = -ESTALE);
1587                         }
1588
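                        /*
                         * Only mark the dentry valid again if the LOOKUP lock
                         * bit was granted, since that bit is what protects the
                         * name-to-inode lookup on the client.
                         */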
1589                         if ((bits & MDS_INODELOCK_LOOKUP) &&
1590                             d_lustre_invalid(*dentryp)) {
1591                                 d_lustre_revalidate(*dentryp);
1592                                 ll_update_dir_depth(dir, (*dentryp)->d_inode);
1593                         }
1594
1595                         ll_intent_release(&it);
1596                 }
1597         }
1598 out:
1599         /*
1600          * A statahead-cached sa_entry can be used only once and is freed right
1601          * after use, so if lookup/revalidate accessed the statahead cache, set
1602          * the dentry's ldd_sa_generation to the parent's lli_sa_generation.
1603          * If this file is stat'd again later, we know statahead has already
1604          * been done for it; see dentry_may_statahead().
1605          */
1606         if (lld_is_init(*dentryp))
1607                 ll_d2d(*dentryp)->lld_sa_generation = lli->lli_sa_generation;
1608         sa_put(sai, entry);
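        /*
         * Wake up the statahead thread in case it is waiting for this entry
         * to be consumed before it continues filling the statahead window.
         */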
1609         spin_lock(&lli->lli_sa_lock);
1610         if (sai->sai_task)
1611                 wake_up_process(sai->sai_task);
1612         spin_unlock(&lli->lli_sa_lock);
1613
1614         RETURN(rc);
1615 }
1616
1617 /**
1618  * start statahead thread
1619  *
1620  * \param[in] dir       parent directory
1621  * \param[in] dentry    dentry that triggers statahead, normally the first
1622  *                      dirent under @dir
1623  * \param[in] agl       indicate whether AGL is needed
1624  * \retval              -EAGAIN on success, because this function is called
1625  *                      from within lookup, so the client should do this
1626  *                      getattr itself instead of waiting for the statahead
1627  *                      thread to do it asynchronously.
1628  * \retval              negative number upon error
1629  */
1630 static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
1631                                   bool agl)
1632 {
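        /*
         * Pick a node from the CPU partition table so that ll_sa threads are
         * spread across CPTs; used by kthread_create_on_node() below.
         */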
1633         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
1634         struct ll_inode_info *lli = ll_i2info(dir);
1635         struct ll_statahead_info *sai = NULL;
1636         struct dentry *parent = dentry->d_parent;
1637         struct task_struct *task;
1638         struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode);
1639         int first = LS_FIRST_DE;
1640         int rc = 0;
1641
1642         ENTRY;
1643
1644         /* I am the "lli_opendir_pid" owner, so only I can set "lli_sai". */
1645         first = is_first_dirent(dir, dentry);
1646         if (first == LS_NOT_FIRST_DE)
1647                 /* Not an "ls -{a}l" operation, so no need for statahead. */
1648                 GOTO(out, rc = -EFAULT);
1649
1650         if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
1651                                        sbi->ll_sa_running_max)) {
1652                 CDEBUG(D_READA,
1653                        "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n");
1654                 GOTO(out, rc = -EMFILE);
1655         }
1656
1657         sai = ll_sai_alloc(parent);
1658         if (!sai)
1659                 GOTO(out, rc = -ENOMEM);
1660
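        /*
         * If the very first dirent is a hidden ("dot") entry, this looks like
         * an "ls -a" style listing, so stat-ahead hidden entries as well.
         */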
1661         sai->sai_ls_all = (first == LS_FIRST_DOT_DE);
1662
1663         /*
1664          * If the current lli_opendir_key was deauthorized, or the dir was
1665          * re-opened by another process, don't start statahead; otherwise the
1666          * newly spawned statahead thread won't be notified to quit.
1667          */
1668         spin_lock(&lli->lli_sa_lock);
1669         if (unlikely(lli->lli_sai || !lli->lli_opendir_key ||
1670                      lli->lli_opendir_pid != current->pid)) {
1671                 spin_unlock(&lli->lli_sa_lock);
1672                 GOTO(out, rc = -EPERM);
1673         }
1674         lli->lli_sai = sai;
1675         spin_unlock(&lli->lli_sa_lock);
1676
1677         CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n",
1678                current->pid, parent);
1679
1680         task = kthread_create_on_node(ll_statahead_thread, parent, node,
1681                                       "ll_sa_%u", lli->lli_opendir_pid);
1682         if (IS_ERR(task)) {
1683                 spin_lock(&lli->lli_sa_lock);
1684                 lli->lli_sai = NULL;
1685                 spin_unlock(&lli->lli_sa_lock);
1686                 rc = PTR_ERR(task);
1687                 CERROR("can't start ll_sa thread, rc: %d\n", rc);
1688                 GOTO(out, rc);
1689         }
1690
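        /*
         * Also start the async glimpse (AGL) thread if requested, so that
         * file attributes such as size can be prefetched from the OSTs.
         */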
1691         if (test_bit(LL_SBI_AGL_ENABLED, ll_i2sbi(parent->d_inode)->ll_flags) &&
1692             agl)
1693                 ll_start_agl(parent, sai);
1694
1695         atomic_inc(&ll_i2sbi(parent->d_inode)->ll_sa_total);
1696         sai->sai_task = task;
1697
1698         wake_up_process(task);
1699         /*
1700          * We don't stat-ahead for the first dirent since we are already in
1701          * lookup.
1702          */
1703         RETURN(-EAGAIN);
1704
1705 out:
1706         /*
1707          * Once starting the statahead thread has failed, disable statahead so
1708          * that subsequent stats won't waste time trying it again.
1709          */
1710         spin_lock(&lli->lli_sa_lock);
1711         if (lli->lli_opendir_pid == current->pid)
1712                 lli->lli_sa_enabled = 0;
1713         spin_unlock(&lli->lli_sa_lock);
1714
1715         if (sai)
1716                 ll_sai_free(sai);
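        /*
         * ll_sa_running was only incremented after the LS_NOT_FIRST_DE check
         * above, so only drop it if we actually took that reference.
         */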
1717         if (first != LS_NOT_FIRST_DE)
1718                 atomic_dec(&sbi->ll_sa_running);
1719
1720         RETURN(rc);
1721 }
1722
1723 /*
1724  * Check whether statahead for @dir was started.
1725  */
1726 static inline bool ll_statahead_started(struct inode *dir, bool agl)
1727 {
1728         struct ll_inode_info *lli = ll_i2info(dir);
1729         struct ll_statahead_info *sai;
1730
1731         spin_lock(&lli->lli_sa_lock);
1732         sai = lli->lli_sai;
1733         if (sai && (sai->sai_agl_task != NULL) != agl)
1734                 CDEBUG(D_READA,
1735                        "%s: Statahead AGL hint changed from %d to %d\n",
1736                        ll_i2sbi(dir)->ll_fsname,
1737                        sai->sai_agl_task != NULL, agl);
1738         spin_unlock(&lli->lli_sa_lock);
1739
1740         return !!sai;
1741 }
1742
1743 /**
1744  * statahead entry function, called when the client does a getattr on a
1745  * file; it starts the statahead thread if statahead has not been started
1746  * for the parent directory yet.
1747  *
1748  * \param[in]  dir      parent directory
1749  * \param[in]  dentry   dentry to getattr, normally the first dirent under @dir
1750  * \param[in]  agl      whether to start the AGL thread
1751  *
1752  * \retval              0 statahead is already running for @dir
1753  * \retval              -EAGAIN statahead thread started successfully, the
1754  *                      caller should do this getattr itself
1755  * \retval              negative number on error, caller often ignores this
1756  *                      and then does the getattr from the server
1757  */
1758 int ll_start_statahead(struct inode *dir, struct dentry *dentry, bool agl)
1759 {
1760         if (!ll_statahead_started(dir, agl))
1761                 return start_statahead_thread(dir, dentry, agl);
1762         return 0;
1763 }
1764
1765 /**
1766  * revalidate dentry from statahead cache.
1767  *
1768  * \param[in]  dir      parent directory
1769  * \param[out] dentryp  dentry to getattr
1770  * \param[in]  unplug   unplug statahead window only (normally for negative
1771  *                      dentry)
1772  * \retval              1 on success
1773  * \retval              0 revalidation from statahead cache failed, caller needs
1774  *                      to getattr from server directly
1775  * \retval              negative number on error, caller often ignores this and
1776  *                      then getattr from server
1777  */
1778 int ll_revalidate_statahead(struct inode *dir, struct dentry **dentryp,
1779                             bool unplug)
1780 {
1781         struct ll_statahead_info *sai;
1782         int rc = 0;
1783
1784         sai = ll_sai_get(dir);
1785         if (sai) {
1786                 rc = revalidate_statahead_dentry(dir, sai, dentryp, unplug);
1787                 CDEBUG(D_READA, "revalidate statahead %pd: rc = %d.\n",
1788                        *dentryp, rc);
1789                 ll_sai_put(sai);
1790         }
1791         return rc;
1792 }