4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
33 #include <linux/sched.h>
34 #include <linux/kthread.h>
36 #include <linux/highmem.h>
37 #include <linux/pagemap.h>
38 #include <linux/delay.h>
40 #define DEBUG_SUBSYSTEM S_LLITE
42 #include <obd_support.h>
43 #include <lustre_dlm.h>
44 #include "llite_internal.h"
46 #define SA_OMITTED_ENTRY_MAX 8ULL
49 /** negative values are for error cases */
50 SA_ENTRY_INIT = 0, /** init entry */
51 SA_ENTRY_SUCC = 1, /** stat succeed */
52 SA_ENTRY_INVA = 2, /** invalid entry */
56 * sa_entry is not refcounted: statahead thread allocates it and do async stat,
57 * and in async stat callback ll_statahead_interpret() will prepare the inode
58 * and set lock data in the ptlrpcd context. Then the scanner process will be
59 * woken up if this entry is the waiting one, can access and free it.
62 /* link into sai_entries */
63 struct list_head se_list;
64 /* link into sai hash table locally */
65 struct list_head se_hash;
66 /* entry index in the sai */
68 /* low layer ldlm lock handle */
71 enum sa_entry_state se_state;
72 /* entry size, contains name */
74 /* pointer to the target inode */
75 struct inode *se_inode;
76 /* pointer to @sai per process struct */
77 struct ll_statahead_info *se_sai;
84 static unsigned int sai_generation;
85 static DEFINE_SPINLOCK(sai_generation_lock);
/* true when @entry is not linked into any sax_cache hash bucket */
static inline int sa_unhashed(struct sa_entry *entry)
	return list_empty(&entry->se_hash);
/* sa_entry is ready to use */
static inline int sa_ready(struct sa_entry *entry)
	/* Make sure sa_entry is updated and ready to use; pairs with the
	 * smp_store_release() of se_state in __sa_make_ready()
	 */
	return (entry->se_state != SA_ENTRY_INIT);
/* hash value to put in sai_cache */
static inline int sa_hash(int val)
	/* low-order bits of the name hash select the bucket */
	return val & LL_SA_CACHE_MASK;
/* hash entry into sax_cache so the scanner can find it by name */
sa_rehash(struct ll_statahead_context *ctx, struct sa_entry *entry)
	/* bucket index derived from the entry's name hash */
	int i = sa_hash(entry->se_qstr.hash);

	spin_lock(&ctx->sax_cache_lock[i]);
	list_add_tail(&entry->se_hash, &ctx->sax_cache[i]);
	spin_unlock(&ctx->sax_cache_lock[i]);
/* unhash entry from sax_cache; with @inuse_check, refuse while the
 * scanner still holds a reference on the sai (checked before and again
 * under the bucket lock to close the race with sa_get())
 */
static inline int sa_unhash(struct ll_statahead_context *ctx,
			    struct sa_entry *entry, bool inuse_check)
	struct ll_statahead_info *sai = entry->se_sai;
	int i = sa_hash(entry->se_qstr.hash);

	if (inuse_check && atomic_read(&sai->sai_inuse_count) > 0)
	spin_lock(&ctx->sax_cache_lock[i]);
	if (inuse_check && atomic_read(&sai->sai_inuse_count) > 0)
	list_del_init(&entry->se_hash);
	spin_unlock(&ctx->sax_cache_lock[i]);
/* AGL (async glimpse lock) only applies to regular files while the
 * agl thread is alive
 */
static inline int agl_should_run(struct ll_statahead_info *sai,
	return inode && S_ISREG(inode->i_mode) && sai->sai_agl_task;
/* first inode queued for async glimpse; callers hold lli_agl_lock */
static inline struct ll_inode_info *
agl_first_entry(struct ll_statahead_info *sai)
	return list_first_entry(&sai->sai_agls, struct ll_inode_info,
/* statahead window is full */
static inline int sa_sent_full(struct ll_statahead_info *sai)
	/* sai_max is adaptive; see sa_put() for how it grows on hits */
	return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
/* Batch metadata handle: true when getattrs are sent via md_batch_add() */
static inline bool sa_has_batch_handle(struct ll_statahead_info *sai)
	return sai->sai_bh != NULL;
/* flush any batched getattr RPCs without waiting for completion */
static inline void ll_statahead_flush_nowait(struct ll_statahead_info *sai)
	if (sa_has_batch_handle(sai)) {
		sai->sai_index_end = sai->sai_index - 1;
		/* best effort: the flush result is intentionally ignored */
		(void) md_batch_flush(ll_i2mdexp(sai->sai_dentry->d_inode),
/* true when no inodes are queued for async glimpse */
static inline int agl_list_empty(struct ll_statahead_info *sai)
	return list_empty(&sai->sai_agls);
178 * (1) hit ratio less than 80%
180 * (2) consecutive miss more than 32
181 * then means low hit.
static inline int sa_low_hit(struct ll_statahead_info *sai)
	/* low hit: hit ratio below 80%, or more than 32 consecutive misses */
	return ((sai->sai_hit > 32 && sai->sai_hit < 4 * sai->sai_miss) ||
		(sai->sai_consecutive_miss > 32));
190 * if the given index is behind of statahead window more than
191 * SA_OMITTED_ENTRY_MAX, then it is old.
static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
	/* old entry: behind the statahead window by SA_OMITTED_ENTRY_MAX */
	return ((__u64)sai->sai_max + index + SA_OMITTED_ENTRY_MAX <
/* allocate sa_entry and hash it to allow scanner process to find it */
static struct sa_entry *
sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
	 const char *name, int len, const struct lu_fid *fid)
	struct ll_inode_info *lli;
	struct sa_entry *entry;

	/* the name is stored inline right after the struct, NUL-terminated
	 * and padded to a 4-byte boundary
	 */
	entry_size = sizeof(struct sa_entry) +
		     round_up(len + 1 /* for trailing NUL */, 4);
	OBD_ALLOC(entry, entry_size);
	if (unlikely(!entry))
		RETURN(ERR_PTR(-ENOMEM));

	CDEBUG(D_READA, "alloc sa entry %.*s(%p) index %llu\n",
	       len, name, entry, index);

	entry->se_index = index;
	entry->se_state = SA_ENTRY_INIT;
	entry->se_size = entry_size;
	dname = (char *)entry + sizeof(struct sa_entry);
	memcpy(dname, name, len);
	entry->se_qstr.hash = ll_full_name_hash(parent, name, len);
	entry->se_qstr.len = len;
	entry->se_qstr.name = dname;
	entry->se_fid = *fid;

	/* hash under lli_sa_lock so the scanner can find it via sa_get() */
	lli = ll_i2info(sai->sai_dentry->d_inode);
	spin_lock(&lli->lli_sa_lock);
	INIT_LIST_HEAD(&entry->se_list);
	sa_rehash(lli->lli_sax, entry);
	spin_unlock(&lli->lli_sa_lock);

	atomic_inc(&sai->sai_cache_count);
/* free sa_entry, which should have been unhashed and not in any list */
static void sa_free(struct ll_statahead_context *ctx, struct sa_entry *entry)
	CDEBUG(D_READA, "free sa entry %.*s(%p) index %llu\n",
	       entry->se_qstr.len, entry->se_qstr.name, entry,

	LASSERT(list_empty(&entry->se_list));
	LASSERT(sa_unhashed(entry));

	/* se_size covers the struct plus the inline name buffer */
	OBD_FREE(entry, entry->se_size);
260 * Find sa_entry by name, used by directory scanner. If @sai_pid is not the PID
261 * of the scanner (which means it may do statahead wrongly, return -EINVAL
static struct sa_entry *sa_get(struct ll_statahead_context *ctx,
			       const struct qstr *qstr,
			       struct ll_statahead_info **info)
	struct sa_entry *entry;
	int i = sa_hash(qstr->hash);

	spin_lock(&ctx->sax_cache_lock[i]);
	list_for_each_entry(entry, &ctx->sax_cache[i], se_hash) {
		/* compare hash first, then length, then the name bytes */
		if (entry->se_qstr.hash == qstr->hash &&
		    entry->se_qstr.len == qstr->len &&
		    memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0) {
			struct ll_statahead_info *sai = entry->se_sai;

			/* only the process that started this statahead may
			 * consume entries; mismatch is a usage error
			 */
			if (sai->sai_pid != current->pid) {
				     "%s: wrong pid=%d:%d for entry %.*s\n",
				     ll_i2sbi(ctx->sax_inode)->ll_fsname,
				     sai->sai_pid, current->pid,
				     entry->se_qstr.len, entry->se_qstr.name);
				entry = ERR_PTR(-EINVAL);

			/* pin the sai while this entry is in use;
			 * released in sa_put()
			 */
			atomic_inc(&sai->sai_inuse_count);
			spin_unlock(&ctx->sax_cache_lock[i]);

	spin_unlock(&ctx->sax_cache_lock[i]);
/* unhash and unlink sa_entry, and then free it */
static inline int sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry,
			  bool locked, bool inuse_check)
	struct inode *dir = sai->sai_dentry->d_inode;
	struct ll_inode_info *lli = ll_i2info(dir);
	struct ll_statahead_context *ctx = lli->lli_sax;

	LASSERT(!list_empty(&entry->se_list));
	LASSERT(sa_ready(entry));

	/* may refuse when @inuse_check and the scanner still uses the sai */
	rc = sa_unhash(ctx, entry, inuse_check);

	/* NOTE(review): when @locked, lli_sa_lock is dropped/re-taken here,
	 * so callers iterating sai_entries must restart their walk
	 */
	spin_lock(&lli->lli_sa_lock);
	list_del_init(&entry->se_list);
	spin_unlock(&lli->lli_sa_lock);

	/* drop the inode reference held by the entry (iput(NULL) is a no-op) */
	iput(entry->se_inode);
	atomic_dec(&sai->sai_cache_count);

	spin_lock(&lli->lli_sa_lock);
/* like sa_kill(), but refuses to kill an entry whose sai is still in use */
static inline int sa_kill_try(struct ll_statahead_info *sai,
			      struct sa_entry *entry, bool locked)
	return sa_kill(sai, entry, locked, true);
/* called by scanner after use, sa_entry will be killed */
sa_put(struct inode *dir, struct ll_statahead_info *sai, struct sa_entry *entry)
	struct ll_inode_info *lli = ll_i2info(dir);
	struct sa_entry *tmp;

	if (entry && entry->se_state == SA_ENTRY_SUCC) {
		struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);

		/* hit: reset the miss streak and widen the window, doubling
		 * up to the per-sb limit ll_sa_max
		 */
		sai->sai_consecutive_miss = 0;
		if (sai->sai_max < sbi->ll_sa_max) {
			sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
		} else if (sai->sai_max_batch_count > 0) {
			/* batch mode: presumably flushes on batch boundaries
			 * and at the end of the batched range - elided lines,
			 * TODO confirm
			 */
			if (sai->sai_max >= sai->sai_max_batch_count &&
			    (sai->sai_index_end - entry->se_index) %
			    sai->sai_max_batch_count == 0) {
			} else if (entry->se_index == sai->sai_index_end) {
		/* miss */
		sai->sai_consecutive_miss++;

	sa_kill(sai, entry, false, false);
	CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_STATAHEAD_PAUSE, cfs_fail_val);

	spin_lock(&lli->lli_sa_lock);
	/*
	 * kill old completed entries. Maybe kicking old entries can
	 */
	while ((tmp = list_first_entry_or_null(&sai->sai_entries,
					       struct sa_entry, se_list))) {
		if (!is_omitted_entry(sai, tmp->se_index))

		/* ll_sa_lock is dropped by sa_kill(), restart list */
		sa_kill(sai, tmp, true, false);

	if (wakeup && sai->sai_task)
		wake_up_process(sai->sai_task);
	/* release the sai reference taken in sa_get() */
	atomic_dec(&sai->sai_inuse_count);
	spin_unlock(&lli->lli_sa_lock);
397 * update state and sort add entry to sai_entries by index, return true if
398 * scanner is waiting on this entry.
__sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
	struct list_head *pos = &sai->sai_entries;
	__u64 index = entry->se_index;

	LASSERT(!sa_ready(entry));
	LASSERT(list_empty(&entry->se_list));

	/* find the insertion point: sai_entries is kept sorted by se_index */
	list_for_each_entry_reverse(se, &sai->sai_entries, se_list) {
		if (se->se_index < entry->se_index) {

	list_add(&entry->se_list, pos);
	/*
	 * LU-9210: ll_statahead_interpret must be able to see this before
	 */
	smp_store_release(&entry->se_state,
			  ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);

	/* true when the scanner is blocked waiting for exactly this index */
	return (index == sai->sai_index_wait);
/* finish async stat RPC arguments */
static void sa_fini_data(struct md_op_item *item)
	struct md_op_data *op_data = &item->mop_data;

	if (op_data->op_flags & MF_OPNAME_KMALLOCED)
		/* allocated via ll_setup_filename called from sa_prep_data */
		kfree(op_data->op_name);
	ll_unlock_md_op_lsm(&item->mop_data);

	/* pill was allocated separately for batched sub-requests */
	if (item->mop_subpill_allocated)
		OBD_FREE_PTR(item->mop_pill);
442 static int ll_statahead_interpret(struct md_op_item *item, int rc);
445 * prepare arguments for async stat RPC.
static struct md_op_item *
sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
	struct md_op_item *item;
	struct ldlm_enqueue_info *einfo;
	struct md_op_data *op_data;

	return ERR_PTR(-ENOMEM);

	op_data = ll_prep_md_op_data(&item->mop_data, dir, child,
				     entry->se_qstr.name, entry->se_qstr.len, 0,
				     LUSTRE_OPC_ANY, NULL);
	if (IS_ERR(op_data)) {
		/* propagate the error pointer to the caller */
		return (struct md_op_item *)op_data;

	op_data->op_fid2 = entry->se_fid;
	item->mop_opc = MD_OP_GETATTR;
	item->mop_it.it_op = IT_GETATTR;
	/* hold the dir inode while the async RPC is in flight */
	item->mop_dir = igrab(dir);
	item->mop_cb = ll_statahead_interpret;
	item->mop_cbdata = entry;

	/* ldlm enqueue info for the IBITS getattr intent lock */
	einfo = &item->mop_einfo;
	einfo->ei_type = LDLM_IBITS;
	einfo->ei_mode = it_to_lock_mode(&item->mop_it);
	einfo->ei_cb_bl = ll_md_blocking_ast;
	einfo->ei_cb_cp = ldlm_completion_ast;
	einfo->ei_cb_gl = NULL;
	einfo->ei_cbdata = NULL;
	einfo->ei_req_slot = 1;
488 * release resources used in async stat RPC, update entry state and wakeup if
489 * scanner process it waiting on this entry.
sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
	struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);

	spin_lock(&lli->lli_sa_lock);
	wakeup = __sa_make_ready(sai, entry, ret);
	spin_unlock(&lli->lli_sa_lock);

	/* wake the scanner if it is blocked on this entry */
	wake_up(&sai->sai_waitq);
/* insert inode into the list of sai_agls */
static void ll_agl_add(struct ll_statahead_info *sai,
		       struct inode *inode, int index)
	struct ll_inode_info *child = ll_i2info(inode);
	struct ll_inode_info *parent = ll_i2info(sai->sai_dentry->d_inode);

	spin_lock(&child->lli_agl_lock);
	/* lli_agl_index == 0 means the inode is not queued for AGL yet */
	if (child->lli_agl_index == 0) {
		child->lli_agl_index = index;
		spin_unlock(&child->lli_agl_lock);

		LASSERT(list_empty(&child->lli_agl_list));

		spin_lock(&parent->lli_agl_lock);
		/* Re-check under the lock */
		if (agl_should_run(sai, inode)) {
			/* wake the agl thread only on empty->non-empty */
			if (agl_list_empty(sai))
				wake_up_process(sai->sai_agl_task);
			list_add_tail(&child->lli_agl_list, &sai->sai_agls);
			child->lli_agl_index = 0;
		spin_unlock(&parent->lli_agl_lock);
		spin_unlock(&child->lli_agl_lock);
/* allocate and initialize a per-directory statahead context */
static struct ll_statahead_context *ll_sax_alloc(struct inode *dir)
	struct ll_statahead_context *ctx;

	/* holds a reference on the directory inode; dropped in ll_sax_free() */
	ctx->sax_inode = igrab(dir);
	atomic_set(&ctx->sax_refcount, 1);
	INIT_LIST_HEAD(&ctx->sax_sai_list);
	for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
		INIT_LIST_HEAD(&ctx->sax_cache[i]);
		spin_lock_init(&ctx->sax_cache_lock[i]);
/* free statahead context; drops the inode reference from ll_sax_alloc() */
static inline void ll_sax_free(struct ll_statahead_context *ctx)
	LASSERT(ctx->sax_inode != NULL);
	iput(ctx->sax_inode);
/* take an extra reference on @ctx; caller must already hold one */
static inline void __ll_sax_get(struct ll_statahead_context *ctx)
	atomic_inc(&ctx->sax_refcount);
/* look up the directory's statahead context under lli_sa_lock */
static inline struct ll_statahead_context *ll_sax_get(struct inode *dir)
	struct ll_inode_info *lli = ll_i2info(dir);
	struct ll_statahead_context *ctx = NULL;

	spin_lock(&lli->lli_sa_lock);
	spin_unlock(&lli->lli_sa_lock);
static inline void ll_sax_put(struct inode *dir,
			      struct ll_statahead_context *ctx)
	struct ll_inode_info *lli = ll_i2info(dir);

	/* last reference: tear down per-directory statahead state */
	if (atomic_dec_and_lock(&ctx->sax_refcount, &lli->lli_sa_lock)) {
		LASSERT(list_empty(&ctx->sax_sai_list));
		if (lli->lli_sa_pattern & (LSA_PATTERN_ADVISE |
					   LSA_PATTERN_FNAME)) {
			lli->lli_opendir_key = NULL;
			lli->lli_stat_pid = 0;
			lli->lli_sa_enabled = 0;
		lli->lli_sa_pattern = LSA_PATTERN_NONE;
		spin_unlock(&lli->lli_sa_lock);
/* allocate and initialize statahead info for the directory @dentry */
static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
	struct ll_statahead_info *sai;
	struct ll_inode_info *lli = ll_i2info(dentry->d_inode);

	sai->sai_dentry = dget(dentry);
	atomic_set(&sai->sai_refcount, 1);
	/* window starts at the configured minimum; grows on hits in sa_put() */
	sai->sai_max = ll_i2sbi(dentry->d_inode)->ll_sa_min;

	init_waitqueue_head(&sai->sai_waitq);

	INIT_LIST_HEAD(&sai->sai_item);
	INIT_LIST_HEAD(&sai->sai_entries);
	INIT_LIST_HEAD(&sai->sai_agls);

	atomic_set(&sai->sai_cache_count, 0);
	atomic_set(&sai->sai_inuse_count, 0);
	/* generation 0 is reserved; skip it if the counter wraps */
	spin_lock(&sai_generation_lock);
	lli->lli_sa_generation = ++sai_generation;
	if (unlikely(sai_generation == 0))
		lli->lli_sa_generation = ++sai_generation;
	spin_unlock(&sai_generation_lock);
/* free sai; drops the dentry reference taken in ll_sai_alloc() */
static inline void ll_sai_free(struct ll_statahead_info *sai)
	LASSERT(sai->sai_dentry != NULL);
	dput(sai->sai_dentry);
/* take an extra reference on @sai; caller must already hold one */
static inline struct ll_statahead_info *
__ll_sai_get(struct ll_statahead_info *sai)
	atomic_inc(&sai->sai_refcount);
654 * put sai refcount after use, if refcount reaches zero, free sai and sa_entries
static void ll_sai_put(struct ll_statahead_info *sai)
	struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);

	/* last reference: unlink from the context and tear down */
	if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
		struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);

		list_del_init(&sai->sai_item);
		spin_unlock(&lli->lli_sa_lock);

		/* both helper threads must have exited, all RPCs replied,
		 * and no entries remain
		 */
		LASSERT(!sai->sai_task);
		LASSERT(!sai->sai_agl_task);
		LASSERT(sai->sai_sent == sai->sai_replied);
		LASSERT(atomic_read(&sai->sai_cache_count) == 0);
		LASSERT(agl_list_empty(sai));

		atomic_dec(&sbi->ll_sa_running);
/* Do NOT forget to drop inode refcount when into sai_agls. */
static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
	struct ll_inode_info *lli = ll_i2info(inode);
	u64 index = lli->lli_agl_index;

	LASSERT(list_empty(&lli->lli_agl_list));

	/* AGL maybe fall behind statahead with one entry */
	if (is_omitted_entry(sai, index + 1)) {
		lli->lli_agl_index = 0;

	/*
	 * In case of restore, the MDT has the right size and has already
	 * sent it back without granting the layout lock, inode is up-to-date.
	 * Then AGL (async glimpse lock) is useless.
	 * Also to glimpse we need the layout, in case of a running restore
	 * the MDT holds the layout lock so the glimpse will block up to the
	 * end of restore (statahead/agl will block)
	 */
	if (test_bit(LLIF_FILE_RESTORING, &lli->lli_flags)) {
		lli->lli_agl_index = 0;

	/* Someone is in glimpse (sync or async), do nothing. */
	rc = down_write_trylock(&lli->lli_glimpse_sem);
		lli->lli_agl_index = 0;

	/*
	 * Someone triggered glimpse within 1 sec before.
	 * 1) The former glimpse succeeded with glimpse lock granted by OST, and
	 *    if the lock is still cached on client, AGL needs to do nothing. If
	 *    it is cancelled by other client, AGL maybe cannot obtain new lock
	 *    for no glimpse callback triggered by AGL.
	 * 2) The former glimpse succeeded, but OST did not grant glimpse lock.
	 *    Under such case, it is quite possible that the OST will not grant
	 *    glimpse lock for AGL also.
	 * 3) The former glimpse failed, compared with other two cases, it is
	 *    relative rare. AGL can ignore such case, and it will not much
	 *    affect the performance.
	 */
	expire = ktime_sub_ns(ktime_get(), NSEC_PER_SEC);
	if (ktime_to_ns(lli->lli_glimpse_time) &&
	    ktime_before(expire, lli->lli_glimpse_time)) {
		up_write(&lli->lli_glimpse_sem);
		lli->lli_agl_index = 0;

	     "Handling (init) async glimpse: inode = " DFID", idx = %llu\n",
	     PFID(&lli->lli_fid), index);

	lli->lli_agl_index = 0;
	lli->lli_glimpse_time = ktime_get();
	up_write(&lli->lli_glimpse_sem);

	     "Handled (init) async glimpse: inode= " DFID", idx = %llu, rc = %d\n",
	     PFID(&lli->lli_fid), index, rc);
/* common completion path for the statahead interpret callback */
static void ll_statahead_interpret_fini(struct ll_inode_info *lli,
					struct ll_statahead_info *sai,
					struct md_op_item *item,
					struct sa_entry *entry,
					struct ptlrpc_request *req,
	/*
	 * First it will drop ldlm ibits lock refcount by calling
	 * ll_intent_drop_lock() in spite of failures. Do not worry about
	 * calling ll_intent_drop_lock() more than once.
	 */
	ll_intent_release(&item->mop_it);
	/* drop the request reference (ptlrpc_req_finished(NULL) is a no-op) */
	ptlrpc_req_finished(req);
	/* publish the result and wake the scanner if it waits on this entry */
	sa_make_ready(sai, entry, rc);

	spin_lock(&lli->lli_sa_lock);
	spin_unlock(&lli->lli_sa_lock);
/* deferred part of the statahead interpret callback: runs in workqueue
 * context (queued from ll_statahead_interpret()) so that inode setup that
 * may generate new RPCs does not block the ptlrpcd thread
 */
static void ll_statahead_interpret_work(struct work_struct *work)
	struct md_op_item *item = container_of(work, struct md_op_item,
	struct req_capsule *pill = item->mop_pill;
	struct inode *dir = item->mop_dir;
	struct ll_inode_info *lli = ll_i2info(dir);
	struct ll_statahead_info *sai;
	struct lookup_intent *it;
	struct sa_entry *entry;
	struct mdt_body *body;

	entry = (struct sa_entry *)item->mop_cbdata;
	LASSERT(entry->se_handle != 0);

	body = req_capsule_server_get(pill, &RMF_MDT_BODY);
		GOTO(out, rc = -EFAULT);

	child = entry->se_inode;
	/* revalidate; unlinked and re-created with the same name */
	if (unlikely(!fid_is_zero(&item->mop_data.op_fid2) &&
		     !lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
		entry->se_inode = NULL;
		/* The mdt_body is invalid. Skip this entry */
		GOTO(out, rc = -EAGAIN);

	/* re-acquire the lock saved by ll_statahead_interpret() */
	it->it_lock_handle = entry->se_handle;
	rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
		GOTO(out, rc = -EAGAIN);

	rc = ll_prep_inode(&child, pill, dir->i_sb, it);
		CERROR("%s: getattr callback for %.*s "DFID": rc = %d\n",
		       ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
		       entry->se_qstr.name, PFID(&entry->se_fid), rc);

	/* If encryption context was returned by MDT, put it in
	 * inode now to save an extra getxattr.
	 */
	if (body->mbo_valid & OBD_MD_ENCCTX) {
		void *encctx = req_capsule_server_get(pill, &RMF_FILE_ENCCTX);
		__u32 encctxlen = req_capsule_get_size(pill, &RMF_FILE_ENCCTX,

		     "server returned encryption ctx for "DFID"\n",
		     PFID(ll_inode2fid(child)));
		rc = ll_xattr_cache_insert(child,
					   xattr_for_enc(child),
			CWARN("%s: cannot set enc ctx for "DFID": rc = %d\n",
			      ll_i2sbi(child)->ll_fsname,
			      PFID(ll_inode2fid(child)), rc);

	CDEBUG(D_READA, "%s: setting %.*s"DFID" l_data to inode %p\n",
	       ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
	       entry->se_qstr.name, PFID(ll_inode2fid(child)), child);
	ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);

	entry->se_inode = child;

	/* queue the child for async glimpse when AGL is running */
	if (agl_should_run(sai, child))
		ll_agl_add(sai, child, entry->se_index);

	ll_statahead_interpret_fini(lli, sai, item, entry, pill->rc_req, rc);
870 * Callback for async stat RPC, this is called in ptlrpcd context. It prepares
871 * the inode and set lock data directly in the ptlrpcd context. It will wake up
872 * the directory listing process if the dentry is the waiting one.
static int ll_statahead_interpret(struct md_op_item *item, int rc)
	struct req_capsule *pill = item->mop_pill;
	struct lookup_intent *it = &item->mop_it;
	struct inode *dir = item->mop_dir;
	struct ll_inode_info *lli = ll_i2info(dir);
	struct sa_entry *entry = (struct sa_entry *)item->mop_cbdata;
	struct work_struct *work = &item->mop_work;
	struct ll_statahead_info *sai;
	struct mdt_body *body;

	/* negative lookup: the name no longer exists on the MDT */
	if (it_disposition(it, DISP_LOOKUP_NEG))

	/*
	 * because statahead thread will wait for all inflight RPC to finish,
	 * sai should be always valid, no need to refcount
	 */
	LASSERT(entry != NULL);
	LASSERT(sai != NULL);

	CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
	       entry->se_qstr.len, entry->se_qstr.name, rc);

	body = req_capsule_server_get(pill, &RMF_MDT_BODY);
		GOTO(out, rc = -EFAULT);

	child = entry->se_inode;
	/*
	 * revalidate; unlinked and re-created with the same name.
	 * exclude the case where FID is zero as it was from statahead with
	 * regularized file name pattern and had no idea for the FID of the
	 */
	if (unlikely(!fid_is_zero(&item->mop_data.op_fid2) &&
		     !lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
		entry->se_inode = NULL;
		/* The mdt_body is invalid. Skip this entry */
		GOTO(out, rc = -EAGAIN);

	/* save the handle for the deferred work in
	 * ll_statahead_interpret_work()
	 */
	entry->se_handle = it->it_lock_handle;
	/*
	 * In ptlrpcd context, it is not allowed to generate new RPCs
	 * especially for striped directories or regular files with layout
	 */
	/*
	 * release ibits lock ASAP to avoid deadlock when statahead
	 * thread enqueues lock on parent in readdir and another
	 * process enqueues lock on child with parent lock held, eg.
	 */
	handle = it->it_lock_handle;
	ll_intent_drop_lock(it);
	ll_unlock_md_op_lsm(&item->mop_data);

	/*
	 * If the statahead entry is a striped directory or regular file with
	 * layout change, it will generate a new RPC and long wait in the
	 * However, it is dangerous of blocking in ptlrpcd thread.
	 * Here we use work queue or the separate statahead thread to handle
	 * the extra RPC and long wait:
	 * (@ll_prep_inode->@lmv_revalidate_slaves);
	 * (@ll_prep_inode->@lov_layout_change->osc_cache_wait_range);
	 */
	INIT_WORK(work, ll_statahead_interpret_work);
	/* extra ref dropped by the work function via pill->rc_req */
	ptlrpc_request_addref(pill->rc_req);

	ll_statahead_interpret_fini(lli, sai, item, entry, NULL, rc);
/* send the getattr through the batch handle when available,
 * otherwise as a single async intent getattr RPC
 */
static inline int sa_getattr(struct ll_statahead_info *sai, struct inode *dir,
			     struct md_op_item *item)
	if (sa_has_batch_handle(sai))
		rc = md_batch_add(ll_i2mdexp(dir), sai->sai_bh, item);
		rc = md_intent_getattr_async(ll_i2mdexp(dir), item);
/* async stat for file not found in dcache */
static int sa_lookup(struct inode *dir, struct sa_entry *entry)
	struct md_op_item *item;

	/* no child inode: stat purely by name under @dir */
	item = sa_prep_data(dir, NULL, entry);
		RETURN(PTR_ERR(item));

	rc = sa_getattr(entry->se_sai, dir, item);
995 * async stat for file found in dcache, similar to .revalidate
997 * \retval 1 dentry valid, no RPC sent
998 * \retval 0 dentry invalid, will send async stat RPC
999 * \retval negative number upon error
static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
			 struct dentry *dentry)
	struct inode *inode = dentry->d_inode;
	struct lookup_intent it = { .it_op = IT_GETATTR,
				    .it_lock_handle = 0 };
	struct md_op_item *item;

	/* negative dentry: nothing to revalidate */
	if (unlikely(!inode))

	/* don't stat across mountpoints */
	if (d_mountpoint(dentry))

	item = sa_prep_data(dir, inode, entry);
		RETURN(PTR_ERR(item));

	/* hold the inode while the async stat is in flight */
	entry->se_inode = igrab(inode);
	rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
	/* lock still cached: dentry valid, remember the handle, no RPC */
		entry->se_handle = it.it_lock_handle;
		ll_intent_release(&it);

	rc = sa_getattr(entry->se_sai, dir, item);
		entry->se_inode = NULL;
/* async stat for file with @name */
static void sa_statahead(struct ll_statahead_info *sai, struct dentry *parent,
			 const char *name, int len, const struct lu_fid *fid)
	struct inode *dir = parent->d_inode;
	struct dentry *dentry = NULL;
	struct sa_entry *entry;

	entry = sa_alloc(parent, sai, sai->sai_index, name, len, fid);

	dentry = d_lookup(parent, &entry->se_qstr);
		/* not in dcache: full async lookup+stat */
		rc = sa_lookup(dir, entry);
		rc = sa_revalidate(dir, entry, dentry);
		/* dentry valid with no RPC sent: still queue for AGL */
		if (rc == 1 && agl_should_run(sai, dentry->d_inode))
			ll_agl_add(sai, dentry->d_inode, entry->se_index);

	sa_make_ready(sai, entry, rc);

	/* keep the pipeline moving when the window fills up */
	if (sa_sent_full(sai))
		ll_statahead_flush_nowait(sai);
/* async glimpse (agl) thread main function */
static int ll_agl_thread(void *arg)
	/*
	 * We already own this reference, so it is safe to take it
	 */
	struct ll_statahead_info *sai = (struct ll_statahead_info *)arg;
	struct dentry *parent = sai->sai_dentry;
	struct inode *dir = parent->d_inode;
	struct ll_inode_info *plli = ll_i2info(dir);
	struct ll_inode_info *clli;

	CDEBUG(D_READA, "agl thread started: sai %p, parent %pd\n",

	/* sleep in TASK_IDLE until woken with queued work or told to stop */
	while (({set_current_state(TASK_IDLE);
		 !kthread_should_stop(); })) {
		spin_lock(&plli->lli_agl_lock);
		clli = list_first_entry_or_null(&sai->sai_agls,
						struct ll_inode_info,
			__set_current_state(TASK_RUNNING);
			list_del_init(&clli->lli_agl_list);
			spin_unlock(&plli->lli_agl_lock);

			/* trigger the glimpse outside lli_agl_lock */
			ll_agl_trigger(&clli->lli_vfs_inode, sai);
			spin_unlock(&plli->lli_agl_lock);

	__set_current_state(TASK_RUNNING);
static void ll_stop_agl(struct ll_statahead_info *sai)
	struct dentry *parent = sai->sai_dentry;
	struct ll_inode_info *plli = ll_i2info(parent->d_inode);
	struct ll_inode_info *clli;
	struct task_struct *agl_task;

	/* detach the task pointer under the lock so no new work is queued */
	spin_lock(&plli->lli_agl_lock);
	agl_task = sai->sai_agl_task;
	sai->sai_agl_task = NULL;
	spin_unlock(&plli->lli_agl_lock);

	CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n",
	       sai, (unsigned int)agl_task->pid);
	kthread_stop(agl_task);

	/* drain pending AGL inodes, dropping the reference each one holds */
	spin_lock(&plli->lli_agl_lock);
	while ((clli = list_first_entry_or_null(&sai->sai_agls,
						struct ll_inode_info,
						lli_agl_list)) != NULL) {
		list_del_init(&clli->lli_agl_list);
		spin_unlock(&plli->lli_agl_lock);
		clli->lli_agl_index = 0;
		iput(&clli->lli_vfs_inode);
		spin_lock(&plli->lli_agl_lock);
	spin_unlock(&plli->lli_agl_lock);
	CDEBUG(D_READA, "agl thread stopped: sai %p, parent %pd\n",
/* start agl thread */
static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
	/* spread agl threads across CPU partitions */
	int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
	struct ll_inode_info *plli;
	struct task_struct *task;

	CDEBUG(D_READA, "start agl thread: sai %p, parent %pd\n",

	plli = ll_i2info(parent->d_inode);
	task = kthread_create_on_node(ll_agl_thread, sai, node, "ll_agl_%d",
				      plli->lli_stat_pid);
		/* AGL is best-effort: log and carry on without it */
		CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));

	sai->sai_agl_task = task;
	atomic_inc(&ll_i2sbi(d_inode(parent))->ll_agl_total);
	/* Get an extra reference that the thread holds */

	wake_up_process(task);
1184 static int ll_statahead_by_list(struct dentry *parent)
1186 struct inode *dir = parent->d_inode;
1187 struct ll_inode_info *lli = ll_i2info(dir);
1188 struct ll_statahead_info *sai = lli->lli_sai;
1189 struct ll_sb_info *sbi = ll_i2sbi(dir);
1190 struct md_op_data *op_data;
1191 struct page *page = NULL;
1198 CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
1201 OBD_ALLOC_PTR(op_data);
1205 while (pos != MDS_DIR_END_OFF &&
1206 /* matches smp_store_release() in ll_deauthorize_statahead() */
1207 smp_load_acquire(&sai->sai_task) &&
1208 lli->lli_sa_enabled) {
1209 struct lu_dirpage *dp;
1210 struct lu_dirent *ent;
1212 op_data = ll_prep_md_op_data(op_data, dir, dir, NULL, 0, 0,
1213 LUSTRE_OPC_ANY, dir);
1214 if (IS_ERR(op_data)) {
1215 rc = PTR_ERR(op_data);
1219 page = ll_get_dir_page(dir, op_data, pos, NULL);
1220 ll_unlock_md_op_lsm(op_data);
1224 "error reading dir "DFID" at %llu /%llu stat_pid = %u: rc = %d\n",
1225 PFID(ll_inode2fid(dir)), pos, sai->sai_index,
1226 lli->lli_stat_pid, rc);
1230 dp = page_address(page);
1231 for (ent = lu_dirent_start(dp);
1232 /* matches smp_store_release() in ll_deauthorize_statahead() */
1233 ent != NULL && smp_load_acquire(&sai->sai_task) &&
1234 !sa_low_hit(sai) && lli->lli_sa_enabled;
1235 ent = lu_dirent_next(ent)) {
1240 struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
1242 hash = le64_to_cpu(ent->lde_hash);
1243 if (unlikely(hash < pos))
1245 * Skip until we find target hash value.
1249 namelen = le16_to_cpu(ent->lde_namelen);
1250 if (unlikely(namelen == 0))
1252 * Skip dummy record.
1256 name = ent->lde_name;
1257 if (name[0] == '.') {
1263 } else if (name[1] == '.' && namelen == 2) {
1268 } else if (!sai->sai_ls_all) {
1270 * skip hidden files.
1272 sai->sai_skip_hidden++;
1278 * don't stat-ahead first entry.
1280 if (unlikely(++first == 1))
1283 fid_le_to_cpu(&fid, &ent->lde_fid);
1285 while (({set_current_state(TASK_IDLE);
1286 /* matches smp_store_release() in
1287 * ll_deauthorize_statahead()
1289 smp_load_acquire(&sai->sai_task); })) {
1292 spin_lock(&lli->lli_agl_lock);
1293 while (sa_sent_full(sai) &&
1294 !agl_list_empty(sai)) {
1295 struct ll_inode_info *clli;
1297 __set_current_state(TASK_RUNNING);
1298 clli = agl_first_entry(sai);
1299 list_del_init(&clli->lli_agl_list);
1300 spin_unlock(&lli->lli_agl_lock);
1302 ll_agl_trigger(&clli->lli_vfs_inode,
1305 spin_lock(&lli->lli_agl_lock);
1307 spin_unlock(&lli->lli_agl_lock);
1309 if (!sa_sent_full(sai))
1313 * If the thread is not doing stat in
1314 * @sbi->ll_sa_timeout (30s) then it probably
1315 * does not care too much about performance,
1316 * or is no longer using this directory.
1317 * Stop the statahead thread in this case.
1319 timeout = schedule_timeout(
1320 cfs_time_seconds(sbi->ll_sa_timeout));
1322 lli->lli_sa_enabled = 0;
1326 __set_current_state(TASK_RUNNING);
1328 if (IS_ENCRYPTED(dir)) {
1329 struct llcrypt_str de_name =
1330 LLTR_INIT(ent->lde_name, namelen);
1333 rc = llcrypt_fname_alloc_buffer(dir, NAME_MAX,
1338 fid_le_to_cpu(&fid, &ent->lde_fid);
1339 if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
1341 llcrypt_fname_free_buffer(&lltr);
1349 sa_statahead(sai, parent, name, namelen, &fid);
1350 llcrypt_fname_free_buffer(&lltr);
1353 pos = le64_to_cpu(dp->ldp_hash_end);
1354 ll_release_page(dir, page,
1355 le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
1357 if (sa_low_hit(sai)) {
1359 atomic_inc(&sbi->ll_sa_wrong);
1361 "Statahead for dir "DFID" hit ratio too low: hit/miss %llu/%llu, sent/replied %llu/%llu, stoppingstatahead thread: pid %d\n",
1362 PFID(&lli->lli_fid), sai->sai_hit,
1363 sai->sai_miss, sai->sai_sent,
1364 sai->sai_replied, current->pid);
1368 ll_finish_md_op_data(op_data);
/* issue one async stat for @name, first waiting for a free slot in the
 * statahead window and draining queued AGL work while waiting
 */
static void ll_statahead_handle(struct ll_statahead_info *sai,
				struct dentry *parent, const char *name,
				int len, const struct lu_fid *fid)
	struct inode *dir = parent->d_inode;
	struct ll_inode_info *lli = ll_i2info(dir);
	struct ll_sb_info *sbi = ll_i2sbi(dir);

	while (({set_current_state(TASK_IDLE);
		 /* matches smp_store_release() in ll_deauthorize_statahead() */
		 smp_load_acquire(&sai->sai_task); })) {
		spin_lock(&lli->lli_agl_lock);
		while (sa_sent_full(sai) && !agl_list_empty(sai)) {
			struct ll_inode_info *clli;

			__set_current_state(TASK_RUNNING);
			clli = agl_first_entry(sai);
			list_del_init(&clli->lli_agl_list);
			spin_unlock(&lli->lli_agl_lock);

			/* glimpse outside the lock */
			ll_agl_trigger(&clli->lli_vfs_inode, sai);

			spin_lock(&lli->lli_agl_lock);
		spin_unlock(&lli->lli_agl_lock);

		if (!sa_sent_full(sai))

		/*
		 * If the thread is not doing a stat in 30s then it probably
		 * does not care too much about performance, or is no longer
		 * using this directory. Stop the statahead thread in this case.
		 */
		timeout = schedule_timeout(
			cfs_time_seconds(sbi->ll_sa_timeout));
			lli->lli_sa_enabled = 0;

	__set_current_state(TASK_RUNNING);

	sa_statahead(sai, parent, name, len, fid);
/*
 * ADVISE-pattern statahead: stat files named "<sai_fname><n>" for n in
 * [sai_fstart, sai_fend], as requested through the statahead-advise
 * ioctl (see ll_ioctl_ahead()).  Runs until the range is exhausted or
 * the thread is asked to stop (sai_task cleared / lli_sa_enabled = 0).
 * NOTE(review): source lines are elided in this excerpt; the fname
 * allocation-failure branch and final return are not visible.
 */
1420 static int ll_statahead_by_advise(struct ll_statahead_info *sai,
1421 struct dentry *parent)
1423 struct inode *dir = parent->d_inode;
1424 struct ll_inode_info *lli = ll_i2info(dir);
1425 struct ll_sb_info *sbi = ll_i2sbi(dir);
1435 CDEBUG(D_READA, "%s: ADVISE statahead: parent %pd fname prefix %s\n",
1436 sbi->ll_fsname, parent, sai->sai_fname);
/* scratch buffer for composing "<prefix><index>" names */
1438 OBD_ALLOC(fname, NAME_MAX);
1442 len = strlen(sai->sai_fname);
1443 memcpy(fname, sai->sai_fname, len);
1444 max_len = sizeof(sai->sai_fname) - len;
1447 /* matches smp_store_release() in ll_deauthorize_statahead() */
1448 while (smp_load_acquire(&sai->sai_task) && lli->lli_sa_enabled) {
1451 numlen = snprintf(ptr, max_len, "%llu",
1452 sai->sai_fstart + i);
1454 ll_statahead_handle(sai, parent, fname, len + numlen, NULL);
1455 if (++i >= sai->sai_fend)
1459 OBD_FREE(fname, NAME_MAX);
/*
 * FNAME-pattern statahead: predict and stat files whose names are the
 * common prefix sai_fname followed by an increasing number
 * (mdtest-style "prefix.$i"), optionally zero-padded when
 * sai_fname_zeroed_len is set.  Stops when asked to
 * (sai_task cleared / lli_sa_enabled = 0) or when the hit ratio drops
 * too low (sa_low_hit()), in which case ll_sa_wrong is bumped.
 * NOTE(review): source lines are elided in this excerpt; allocation
 * failure handling and the final return are not visible.
 */
1463 static int ll_statahead_by_fname(struct ll_statahead_info *sai,
1464 struct dentry *parent)
1466 struct inode *dir = parent->d_inode;
1467 struct ll_inode_info *lli = ll_i2info(dir);
1468 struct ll_sb_info *sbi = ll_i2sbi(dir);
1477 CDEBUG(D_READA, "%s: FNAME statahead: parent %pd fname prefix %s\n",
1478 sbi->ll_fsname, parent, sai->sai_fname);
1480 OBD_ALLOC(fname, NAME_MAX);
1484 len = strlen(sai->sai_fname);
1485 memcpy(fname, sai->sai_fname, len);
1486 max_len = sizeof(sai->sai_fname) - len;
1489 /* matches smp_store_release() in ll_deauthorize_statahead() */
1490 while (smp_load_acquire(&sai->sai_task) && lli->lli_sa_enabled) {
/* zero-padded vs plain numeric suffix, per detected name shape */
1493 if (sai->sai_fname_zeroed_len)
1494 numlen = snprintf(ptr, max_len, "%0*llu",
1495 sai->sai_fname_zeroed_len,
1496 ++sai->sai_fname_index);
1498 numlen = snprintf(ptr, max_len, "%llu",
1499 ++sai->sai_fname_index);
1501 ll_statahead_handle(sai, parent, fname, len + numlen, NULL);
/* prediction went wrong too often: give up to avoid wasted RPCs */
1503 if (sa_low_hit(sai)) {
1505 atomic_inc(&sbi->ll_sa_wrong);
1506 CDEBUG(D_CACHE, "%s: low hit ratio for %pd "DFID": hit=%llu miss=%llu sent=%llu replied=%llu, stopping PID %d\n",
1507 sbi->ll_fsname, parent, PFID(ll_inode2fid(dir)),
1508 sai->sai_hit, sai->sai_miss, sai->sai_sent,
1509 sai->sai_replied, current->pid);
1514 OBD_FREE(fname, NAME_MAX);
/*
 * Main body of the "ll_sa_%u" kthread: optionally set up batched MD RPCs,
 * dispatch to the per-pattern worker (LIST / ADVISE / FNAME), then linger
 * so cached entries stay usable until closedir() deauthorizes us.  On the
 * way out it waits for inflight RPCs (sai_sent == sai_replied) before
 * freeing, publishes hit/miss stats, kills remaining cached sa_entry
 * items, and drops the context with ll_sax_put().
 * NOTE(review): several source lines are elided in this excerpt
 * (declarations of @rc/@tries, break statements, some labels).
 */
1518 /* statahead thread main function */
1519 static int ll_statahead_thread(void *arg)
1521 struct ll_statahead_info *sai = (struct ll_statahead_info *)arg;
1522 struct dentry *parent = sai->sai_dentry;
1523 struct inode *dir = parent->d_inode;
1524 struct ll_inode_info *lli = ll_i2info(dir);
1525 struct ll_sb_info *sbi = ll_i2sbi(dir);
1526 struct lu_batch *bh = NULL;
1527 struct sa_entry *entry;
1533 CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
/* batching only if the MDS connection supports batched RPCs */
1536 if (exp_connect_batch_rpc(sbi->ll_md_exp))
1537 sai->sai_max_batch_count = sbi->ll_sa_batch_max;
1539 sai->sai_max_batch_count = 0;
1541 if (sai->sai_max_batch_count) {
1542 bh = md_batch_create(ll_i2mdexp(dir), BATCH_FL_RDONLY,
1543 sai->sai_max_batch_count);
1545 GOTO(out_stop_agl, rc = PTR_ERR(bh));
/* dispatch to the worker for the detected access pattern */
1550 switch (lli->lli_sa_pattern & LSA_PATTERN_MASK) {
1551 case LSA_PATTERN_LIST:
1552 rc = ll_statahead_by_list(parent);
1554 case LSA_PATTERN_ADVISE:
1555 rc = ll_statahead_by_advise(sai, parent);
1557 case LSA_PATTERN_FNAME:
1558 rc = ll_statahead_by_fname(sai, parent);
1566 spin_lock(&lli->lli_sa_lock);
1567 sai->sai_task = NULL;
1568 spin_unlock(&lli->lli_sa_lock);
1571 ll_statahead_flush_nowait(sai);
1574 * statahead is finished, but statahead entries need to be cached, wait
1575 * for file release closedir() call to stop me.
1577 while (({set_current_state(TASK_IDLE);
1578 /* matches smp_store_release() in ll_deauthorize_statahead() */
1579 smp_load_acquire(&sai->sai_task) && lli->lli_sa_enabled; })) {
1582 __set_current_state(TASK_RUNNING);
1587 rc = md_batch_stop(ll_i2mdexp(dir), sai->sai_bh);
1595 * wait for inflight statahead RPCs to finish, and then we can free sai
1596 * safely because statahead RPC will access sai data
1598 while (sai->sai_sent != sai->sai_replied)
1599 /* in case we're not woken up, timeout wait */
1602 CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd hit %llu miss %llu\n",
1603 sbi->ll_fsname, sai, parent, sai->sai_hit, sai->sai_miss);
1605 spin_lock(&lli->lli_sa_lock);
1606 sai->sai_task = NULL;
1607 spin_unlock(&lli->lli_sa_lock);
/* wake any process blocked in revalidate_statahead_dentry() */
1608 wake_up(&sai->sai_waitq);
1610 atomic_add(sai->sai_hit, &sbi->ll_sa_hit_total);
1611 atomic_add(sai->sai_miss, &sbi->ll_sa_miss_total);
1613 /* Kill all local cached entry. */
1614 spin_lock(&lli->lli_sa_lock);
1615 while ((entry = list_first_entry_or_null(&sai->sai_entries,
1616 struct sa_entry, se_list))) {
1618 * If the entry is being used by the user process, wait for
1619 * inuse entry finished and restart to kill local cached
1622 if (sa_kill_try(sai, entry, true)) {
1623 spin_unlock(&lli->lli_sa_lock);
/* warn periodically if an inuse entry takes unusually long */
1625 if (++tries % 1024 == 0) {
1626 CWARN("%s: statahead thread waited %lums for inuse entry "DFID" to be finished\n",
1627 sbi->ll_fsname, tries * 125/MSEC_PER_SEC,
1628 PFID(&entry->se_fid));
1630 spin_lock(&lli->lli_sa_lock);
1633 spin_unlock(&lli->lli_sa_lock);
1636 ll_sax_put(dir, lli->lli_sax);
/*
 * Called on opendir(): record @key (the opening file handle) as the
 * owner allowed to trigger statahead on @dir, remember the opener's pid,
 * enable statahead and mark the OPENDIR pattern bit.  Skipped when a
 * previous opendir key or a still-live sai exists.
 */
1641 /* authorize opened dir handle @key to statahead */
1642 void ll_authorize_statahead(struct inode *dir, void *key)
1644 struct ll_inode_info *lli = ll_i2info(dir);
1646 spin_lock(&lli->lli_sa_lock);
1647 if (!lli->lli_opendir_key && !lli->lli_sai) {
1649 * if lli_sai is not NULL, it means previous statahead is not
1650 * finished yet, we'd better not start a new statahead for now.
1652 lli->lli_opendir_key = key;
1653 lli->lli_stat_pid = current->pid;
1654 lli->lli_sa_enabled = 1;
1655 lli->lli_sa_pattern |= LSA_PATTERN_OPENDIR;
1657 spin_unlock(&lli->lli_sa_lock);
/*
 * ADVISE-pattern variant of deauthorize: stop the statahead thread tied
 * to this file handle's fd_sai (release-store NULL into sai_task, then
 * wake the thread) and drop the per-dir statahead context reference.
 * NOTE(review): elided lines likely include an early return when
 * fd->fd_sai is NULL — not visible in this excerpt.
 */
1660 static void ll_deauthorize_statahead_advise(struct inode *dir, void *key)
1662 struct ll_inode_info *lli = ll_i2info(dir);
1663 struct ll_file_data *fd = (struct ll_file_data *)key;
1664 struct ll_statahead_info *sai = fd->fd_sai;
1669 spin_lock(&lli->lli_sa_lock);
1670 if (sai->sai_task) {
1671 struct task_struct *task = sai->sai_task;
1673 /* matches smp_load_acquire() in ll_statahead_thread() */
1674 smp_store_release(&sai->sai_task, NULL);
1675 wake_up_process(task);
1678 spin_unlock(&lli->lli_sa_lock);
1680 LASSERT(lli->lli_sax != NULL);
1681 ll_sax_put(dir, lli->lli_sax);
/*
 * Called on closedir(): undo ll_authorize_statahead().  For the ADVISE
 * pattern this delegates to ll_deauthorize_statahead_advise(); otherwise
 * it clears the opendir key, pid, pattern and per-pattern counters, and
 * tells a still-running statahead thread to quit (release-store NULL to
 * sai_task paired with the acquire-load in the thread, then wake it).
 */
1685 * deauthorize opened dir handle @key to statahead, and notify statahead thread
1686 * to quit if it's running.
1688 void ll_deauthorize_statahead(struct inode *dir, void *key)
1690 struct ll_inode_info *lli = ll_i2info(dir);
1691 struct ll_statahead_info *sai;
1693 CDEBUG(D_READA, "deauthorize statahead for "DFID"\n",
1694 PFID(&lli->lli_fid));
1696 if (lli->lli_sa_pattern & LSA_PATTERN_ADVISE) {
1697 ll_deauthorize_statahead_advise(dir, key);
1701 LASSERT(lli->lli_stat_pid != 0);
1702 LASSERT(lli->lli_opendir_key == key);
1703 spin_lock(&lli->lli_sa_lock);
1704 lli->lli_opendir_key = NULL;
1705 lli->lli_stat_pid = 0;
1706 lli->lli_sa_enabled = 0;
1707 lli->lli_sa_pattern = LSA_PATTERN_NONE;
1708 lli->lli_sa_fname_index = 0;
1709 lli->lli_sa_match_count = 0;
1711 if (sai && sai->sai_task) {
1713 * statahead thread may not have quit yet because it needs to
1714 * cache entries, now it's time to tell it to quit.
1716 * wake_up_process() provides the necessary barriers
1717 * to pair with set_current_state().
1719 struct task_struct *task = sai->sai_task;
1721 /* matches smp_load_acquire() in ll_statahead_thread() */
1722 smp_store_release(&sai->sai_task, NULL);
1723 wake_up_process(task);
1725 spin_unlock(&lli->lli_sa_lock);
1730 * not first dirent, or is "."
1732 LS_NOT_FIRST_DE = 0,
1734 * the first non-hidden dirent
1738 * the first hidden dirent, that is "."
/*
 * Walk @dir's readdir pages from hash 0 and decide whether @dentry names
 * the first directory entry (ignoring "." and ".." and, unless @dentry
 * itself is hidden, leading dot-files).  Used by sa_pattern_list_detect()
 * to recognize an "ls -l"-style scan.  Returns LS_FIRST_DE /
 * LS_FIRST_DOT_DE / LS_NOT_FIRST_DE (or a negative errno on read error —
 * the exact error path is partly elided in this excerpt).  Handles
 * encrypted directories by converting on-disk names before comparison.
 */
1743 /* file is first dirent under @dir */
1744 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
1746 struct qstr *target = &dentry->d_name;
1747 struct md_op_data *op_data;
1749 struct page *page = NULL;
1750 int rc = LS_NOT_FIRST_DE;
1752 struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
1756 op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
1757 LUSTRE_OPC_ANY, dir);
1758 if (IS_ERR(op_data))
1759 RETURN(PTR_ERR(op_data));
1761 if (IS_ENCRYPTED(dir)) {
1762 int rc2 = llcrypt_fname_alloc_buffer(dir, NAME_MAX, &lltr);
1769 *FIXME choose the start offset of the readdir
1772 page = ll_get_dir_page(dir, op_data, 0, NULL);
1775 struct lu_dirpage *dp;
1776 struct lu_dirent *ent;
1779 struct ll_inode_info *lli = ll_i2info(dir);
1782 CERROR("%s: reading dir "DFID" at %llu stat_pid = %u : rc = %d\n",
1783 ll_i2sbi(dir)->ll_fsname,
1784 PFID(ll_inode2fid(dir)), pos,
1785 lli->lli_stat_pid, rc);
1789 dp = page_address(page);
1790 for (ent = lu_dirent_start(dp); ent != NULL;
1791 ent = lu_dirent_next(ent)) {
1796 hash = le64_to_cpu(ent->lde_hash);
1798 * The ll_get_dir_page() can return any page containing
1799 * the given hash which may be not the start hash.
1801 if (unlikely(hash < pos))
1804 namelen = le16_to_cpu(ent->lde_namelen);
1805 if (unlikely(namelen == 0))
1807 * skip dummy record.
1811 name = ent->lde_name;
/* classify "." / ".." / other dot-files (branches partly elided) */
1812 if (name[0] == '.') {
1818 else if (name[1] == '.' && namelen == 2)
1829 if (dot_de && target->name[0] != '.') {
1830 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
1831 target->len, target->name,
1836 if (IS_ENCRYPTED(dir)) {
1837 struct llcrypt_str de_name =
1838 LLTR_INIT(ent->lde_name, namelen);
1841 fid_le_to_cpu(&fid, &ent->lde_fid);
1842 if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
1849 if (target->len != namelen ||
1850 memcmp(target->name, name, namelen) != 0)
1851 rc = LS_NOT_FIRST_DE;
1855 rc = LS_FIRST_DOT_DE;
1857 ll_release_page(dir, page, false);
1860 pos = le64_to_cpu(dp->ldp_hash_end);
1861 if (pos == MDS_DIR_END_OFF) {
1863 * End of directory reached.
1865 ll_release_page(dir, page, false);
1869 * chain is exhausted
1870 * Normal case: continue to the next page.
1872 ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) &
1874 page = ll_get_dir_page(dir, op_data, pos, NULL);
1879 llcrypt_fname_free_buffer(&lltr);
1880 ll_finish_md_op_data(op_data);
/*
 * Find the sai owned by @pid on @ctx's sax_sai_list, or NULL.
 * Caller must hold the lock protecting sax_sai_list
 * (lli_sa_lock at the call sites visible in this file).
 */
1885 static struct ll_statahead_info *
1886 ll_find_sai_locked(struct ll_statahead_context *ctx, pid_t pid)
1888 struct ll_statahead_info *sai;
1890 list_for_each_entry(sai, &ctx->sax_sai_list, sai_item) {
1891 if (sai->sai_pid == pid)
1897 static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
/*
 * Detect the mdtest shared-dir case: several processes stat()ing under
 * the same directory.  If the current pid already owns a sai on @ctx we
 * bail out; otherwise mark the pattern FN_SHARED and spawn a statahead
 * thread for this process via start_statahead_thread().
 * NOTE(review): elided lines hide how @sai is initialized before the
 * first pid comparison — verify against the full source.
 */
1900 static int ll_shared_statahead_check(struct inode *dir, struct dentry *dentry,
1901 struct ll_statahead_context *ctx)
1903 struct ll_inode_info *lli = ll_i2info(dir);
1904 struct ll_statahead_info *sai;
1908 spin_lock(&lli->lli_sa_lock);
1911 if (sai->sai_pid == current->pid) {
1912 spin_unlock(&lli->lli_sa_lock);
1915 lli->lli_sai = NULL;
1916 lli->lli_sa_pattern |= LSA_PATTERN_FN_SHARED;
1919 sai = ll_find_sai_locked(ctx, current->pid);
1921 spin_unlock(&lli->lli_sa_lock);
1925 lli->lli_sa_pattern |= LSA_PATTERN_FN_SHARED;
1926 spin_unlock(&lli->lli_sa_lock);
1928 RETURN(start_statahead_thread(dir, dentry, true));
/*
 * Serve a getattr from the statahead cache: look the name up in @ctx,
 * wait (up to 30s) for an in-flight entry, then take the MD lock bits
 * via md_revalidate_lock() and splice/validate the prepared inode into
 * *dentryp.  On a hash mismatch (sa_get() == -EINVAL) the statahead
 * thread is told to stop.  The acquire-load of se_state pairs with the
 * statahead callback publishing the entry.
 */
1932 * revalidate @dentryp from statahead cache
1934 * \param[in] dir parent directory
1935 * \param[in] sai sai structure
1936 * \param[out] dentryp pointer to dentry which will be revalidated
1937 * \param[in] unplug unplug statahead window only (normally for negative
1939 * \retval 1 on success, dentry is saved in @dentryp
1940 * \retval 0 if revalidation failed (no proper lock on client)
1941 * \retval negative number upon error
1943 static int revalidate_statahead_dentry(struct inode *dir,
1944 struct ll_statahead_context *ctx,
1945 struct dentry **dentryp,
1948 struct sa_entry *entry = NULL;
1949 struct ll_inode_info *lli = ll_i2info(dir);
1950 struct ll_statahead_info *sai = lli->lli_sai;
1951 struct ll_statahead_info *info = NULL;
/* "ls -a" handling: hidden names may need sai_ls_all to be enabled */
1956 if (sai && (*dentryp)->d_name.name[0] == '.') {
1957 if (sai->sai_ls_all ||
1958 sai->sai_miss_hidden >= sai->sai_skip_hidden) {
1960 * Hidden dentry is the first one, or statahead
1961 * thread does not skip so many hidden dentries
1962 * before "sai_ls_all" enabled as below.
1965 if (!sai->sai_ls_all)
1967 * It maybe because hidden dentry is not
1968 * the first one, "sai_ls_all" was not
1969 * set, then "ls -al" missed. Enable
1970 * "sai_ls_all" for such case.
1972 sai->sai_ls_all = 1;
1975 * Such "getattr" has been skipped before
1976 * "sai_ls_all" enabled as above.
1978 sai->sai_miss_hidden++;
1986 entry = sa_get(ctx, &(*dentryp)->d_name, &info);
1987 if (entry == ERR_PTR(-EINVAL)) {
1989 spin_lock(&lli->lli_sa_lock);
1990 if (sai->sai_task) {
1991 struct task_struct *task = sai->sai_task;
1994 * matches smp_load_acquire() in
1995 * ll_statahead_thread().
1996 * Notify to stop statahead thread immediately.
1998 smp_store_release(&sai->sai_task, NULL);
1999 wake_up_process(task);
2001 atomic_dec(&sai->sai_inuse_count);
2002 spin_unlock(&lli->lli_sa_lock);
2004 } else if (entry == NULL) {
2005 if (lli->lli_sa_pattern & LSA_PATTERN_FNAME)
2006 rc = ll_shared_statahead_check(dir, *dentryp, ctx);
2007 GOTO(out, rc = rc == 0 ? -EAGAIN : rc);
/* resolve which sai owns this entry, depending on the pattern */
2010 if (lli->lli_sa_pattern & LSA_PATTERN_LIST)
2011 LASSERT(sai == entry->se_sai);
2012 else if (lli->lli_sa_pattern & LSA_PATTERN_FNAME ||
2013 lli->lli_sa_pattern == LSA_PATTERN_ADVISE)
2014 sai = entry->se_sai;
2016 sai = entry->se_sai;
2018 LASSERTF(sai != NULL, "pattern %#X entry %p se_sai %p %pd lli %p\n",
2019 lli->lli_sa_pattern, entry, entry->se_sai, *dentryp, lli);
2020 if (!sa_ready(entry)) {
2021 spin_lock(&lli->lli_sa_lock);
2022 sai->sai_index_wait = entry->se_index;
2023 spin_unlock(&lli->lli_sa_lock);
2024 rc = wait_event_idle_timeout(sai->sai_waitq, sa_ready(entry),
2025 cfs_time_seconds(30));
2028 * entry may not be ready, so it may be used by inflight
2029 * statahead RPC, don't free it.
2032 GOTO(out, rc = -EAGAIN);
2037 * We need to see the value that was set immediately before we
2040 if (smp_load_acquire(&entry->se_state) == SA_ENTRY_SUCC &&
2042 struct inode *inode = entry->se_inode;
2043 struct lookup_intent it = { .it_op = IT_GETATTR,
2048 rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
2049 ll_inode2fid(inode), &bits);
2051 if (!(*dentryp)->d_inode) {
2052 struct dentry *alias;
2054 alias = ll_splice_alias(inode, *dentryp);
2055 if (IS_ERR(alias)) {
2056 ll_intent_release(&it);
2057 GOTO(out, rc = PTR_ERR(alias));
2061 * statahead prepared this inode, transfer inode
2062 * refcount from sa_entry to dentry
2064 entry->se_inode = NULL;
2065 } else if ((*dentryp)->d_inode != inode) {
2066 /* revalidate, but inode is recreated */
2068 "%s: stale dentry %pd inode " DFID", statahead inode "DFID "\n",
2069 ll_i2sbi(inode)->ll_fsname, *dentryp,
2070 PFID(ll_inode2fid((*dentryp)->d_inode)),
2071 PFID(ll_inode2fid(inode)));
2072 ll_intent_release(&it);
2073 GOTO(out, rc = -ESTALE);
2076 if (bits & MDS_INODELOCK_LOOKUP) {
2077 d_lustre_revalidate(*dentryp);
2078 if (S_ISDIR(inode->i_mode))
2079 ll_update_dir_depth_dmv(dir, *dentryp);
2082 ll_intent_release(&it);
2087 * statahead cached sa_entry can be used only once, and will be killed
2088 * right after use, so if lookup/revalidate accessed statahead cache,
2089 * set dentry ldd_sa_generation to parent lli_sa_generation, later if we
2090 * stat this file again, we know we've done statahead before, see
2091 * dentry_may_statahead().
2093 if (lld_is_init(*dentryp))
2094 ll_d2d(*dentryp)->lld_sa_generation = lli->lli_sa_generation;
2095 sa_put(dir, sai, entry);
/*
 * Detect the directory-listing pattern: requires a prior opendir()
 * (LSA_PATTERN_OPENDIR), statahead still enabled, and @dchild being the
 * first real dirent (is_first_dirent()).  A non-first dentry disables
 * statahead for this opener and latches LS_NOT_FIRST_DE so later stats
 * skip the (expensive) readdir probe.  On match, sets LSA_PATTERN_LIST.
 * Return type/value lines are elided in this excerpt; callers treat the
 * result as boolean (see sa_pattern_detect()).
 */
2101 sa_pattern_list_detect(struct inode *dir, struct dentry *dchild, int *first)
2103 struct ll_inode_info *lli = ll_i2info(dir);
2105 if (lli->lli_stat_pid == 0)
2108 /* Directory listing needs to call opendir()/readdir()/stat(). */
2109 if (!(lli->lli_sa_pattern & LSA_PATTERN_OPENDIR))
2112 if (lli->lli_sa_enabled == 0)
2115 if (lli->lli_sa_pattern & LSA_PATTERN_LS_NOT_FIRST_DE)
2118 *first = is_first_dirent(dir, dchild);
2119 if (*first == LS_NOT_FIRST_DE) {
2121 * It is not "ls -{a}l" operation, no need statahead for it.
2122 * Disable statahead so that subsequent stat() won't waste
2125 spin_lock(&lli->lli_sa_lock);
2126 if (lli->lli_stat_pid == current->pid) {
2127 lli->lli_sa_enabled = 0;
2128 lli->lli_sa_pattern |= LSA_PATTERN_LS_NOT_FIRST_DE;
2130 spin_unlock(&lli->lli_sa_lock);
2134 spin_lock(&lli->lli_sa_lock);
2135 lli->lli_sa_pattern |= LSA_PATTERN_LIST;
2136 spin_unlock(&lli->lli_sa_lock);
/*
 * Detect the FNAME pattern (names ending in an increasing number, e.g.
 * mdtest.$rank.$i): parse the numeric suffix of @dchild's name, track
 * consecutive-index matches in lli_sa_match_count, and declare a match
 * after LSA_FN_MATCH_HIT hits.  A second pid stat()ing here switches the
 * directory to the FN_SHARED (shared-dir) variant.  On failure the
 * pattern state and counters are reset and statahead disabled.
 * NOTE(review): the declarations of @i/@num/@ret and some loop framing
 * are elided in this excerpt.
 */
2141 sa_pattern_fname_detect(struct inode *dir, struct dentry *dchild)
2143 struct ll_inode_info *lli = ll_i2info(dir);
2144 struct qstr *dname = &dchild->d_name;
2145 const unsigned char *name = dname->name;
2149 if (ll_i2sbi(dir)->ll_enable_statahead_fname == 0)
2151 if (lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED)
2155 * Parse the format of the file name to determine whether it matches
2156 * the supported file name pattern for statahead (i.e. mdtest.$rank.$i).
2159 if (isdigit(name[i])) {
2163 if (lli->lli_stat_pid == 0) {
2164 lli->lli_stat_pid = current->pid;
2165 } else if (lli->lli_stat_pid != current->pid) {
2167 * More than two processes (MPI ranks) doing stat()
2168 * calls under this directory, consider it as a mdtest
2169 * shared dir stat() workload.
2171 spin_lock(&lli->lli_sa_lock);
2172 lli->lli_stat_pid = current->pid;
2173 if (lli->lli_sa_pattern & LSA_PATTERN_FNAME) {
2174 lli->lli_sai = NULL;
2177 lli->lli_sa_pattern |= LSA_PATTERN_FNAME;
2180 lli->lli_sa_pattern |= LSA_PATTERN_FN_SHARED;
2181 spin_unlock(&lli->lli_sa_lock);
/* scan back over the trailing digits to find the numeric suffix */
2185 while (--i >= 0 && isdigit(name[i]))
2188 ret = kstrtol(&name[i], 0, &num);
2193 * The traversing program do multiple stat() calls on the same
2194 * children entry. i.e. ls $dir*.
2196 if (lli->lli_sa_fname_index == num)
2199 if (lli->lli_sa_match_count == 0 ||
2200 num == lli->lli_sa_fname_index + 1) {
2201 lli->lli_sa_match_count++;
2202 lli->lli_sa_fname_index = num;
2204 if (lli->lli_sa_match_count > LSA_FN_MATCH_HIT)
2205 GOTO(out, rc = true);
2211 spin_lock(&lli->lli_sa_lock);
2213 lli->lli_sa_pattern |= LSA_PATTERN_FNAME;
2215 lli->lli_sa_pattern = LSA_PATTERN_NONE;
2216 lli->lli_sa_match_count = 0;
2217 lli->lli_sa_fname_index = 0;
2218 lli->lli_sa_enabled = 0;
2220 spin_unlock(&lli->lli_sa_lock);
/*
 * Try each supported pattern detector in order; LIST detection runs
 * first and short-circuits FNAME detection on success.  @first receives
 * the is_first_dirent() classification from the LIST probe.
 */
2225 /* detect the statahead pattern. */
2227 sa_pattern_detect(struct inode *dir, struct dentry *dchild, int *first)
2229 return sa_pattern_list_detect(dir, dchild, first) ||
2230 sa_pattern_fname_detect(dir, dchild);
/*
 * Add @sai to @ctx's list unless a sai for the same pid already exists
 * (the error value returned in that case is elided in this excerpt).
 * Caller must hold the lock guarding sax_sai_list, as with
 * ll_find_sai_locked().
 */
2233 static inline int ll_sax_add_sai(struct ll_statahead_context *ctx,
2234 struct ll_statahead_info *sai)
2236 if (ll_find_sai_locked(ctx, sai->sai_pid) != NULL)
2239 list_add_tail(&sai->sai_item, &ctx->sax_sai_list);
/*
 * Allocate a sai (and a per-dir statahead context where needed), attach
 * it under lli_sa_lock according to the detected pattern (LIST /
 * unique-dir FNAME / shared-dir FN_SHARED), then spawn the "ll_sa_%u"
 * kthread and optionally the AGL thread.  Concurrency is capped by
 * sbi->ll_sa_running_max.  On any failure statahead is disabled for the
 * current opener so later stats don't retry.
 * NOTE(review): many source lines are elided in this excerpt (FNAME
 * suffix-scan framing, context-race retry branches, error labels).
 */
2244 * start statahead thread
2246 * \param[in] dir parent directory
2247 * \param[in] dentry dentry that triggers statahead, normally the first
2249 * \param[in] agl indicate whether AGL is needed
2250 * \retval -EAGAIN on success, because when this function is
2251 * called, it's already in lookup call, so client should
2252 * do it itself instead of waiting for statahead thread
2253 * to do it asynchronously.
2254 * \retval negative number upon error
2256 static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
2259 int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
2260 struct ll_inode_info *lli = ll_i2info(dir);
2261 struct ll_statahead_info *sai = NULL;
2262 struct ll_statahead_context *ctx = NULL;
2263 struct dentry *parent;
2264 struct task_struct *task;
2265 struct ll_sb_info *sbi;
2266 int first = LS_FIRST_DE;
2271 if (sa_pattern_detect(dir, dentry, &first) == false)
2274 parent = dget_parent(dentry);
2275 sbi = ll_i2sbi(d_inode(parent));
/* global throttle on concurrent statahead instances */
2276 if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
2277 sbi->ll_sa_running_max)) {
2279 "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n");
2281 GOTO(out, rc = -EMFILE);
2284 /* on success ll_sai_alloc holds a ref on parent */
2285 sai = ll_sai_alloc(parent);
2288 GOTO(out, rc = -ENOMEM);
2290 sai->sai_ls_all = (first == LS_FIRST_DOT_DE);
2291 sai->sai_pid = current->pid;
/* FNAME pattern: split the trigger name into prefix + numeric suffix */
2293 if (lli->lli_sa_pattern & LSA_PATTERN_FNAME) {
2294 struct qstr *dname = &dentry->d_name;
2295 const unsigned char *name = dname->name;
2299 if (dname->len >= sizeof(sai->sai_fname))
2300 GOTO(out, rc = -ERANGE);
2303 while (--i >= 0 && isdigit(name[i]))
2306 rc = kstrtol(&name[i], 0, &num);
2310 memcpy(sai->sai_fname, dname->name, i);
2311 sai->sai_fname[i] = '\0';
2312 sai->sai_fname_index = num;
2313 /* The front part of the file name is zeroed padding. */
2315 sai->sai_fname_zeroed_len = dname->len - i;
2318 /* The workload like directory listing or mdtest unique dir stat() */
2319 if (lli->lli_sa_pattern & LSA_PATTERN_LIST ||
2320 (lli->lli_sa_pattern & (LSA_PATTERN_FN_SHARED |
2321 LSA_PATTERN_FNAME)) == LSA_PATTERN_FNAME) {
2322 ctx = ll_sax_alloc(dir);
2324 GOTO(out, rc = -ENOMEM);
2327 * if current lli_opendir_key was deauthorized, or dir
2328 * re-opened by another process, don't start statahead,
2329 * otherwise the newly spawned statahead thread won't be
2332 spin_lock(&lli->lli_sa_lock);
2333 if (unlikely(lli->lli_sai || lli->lli_sax ||
2334 ((lli->lli_sa_pattern & LSA_PATTERN_LIST) &&
2335 !lli->lli_opendir_key &&
2336 lli->lli_stat_pid != current->pid))) {
2337 spin_unlock(&lli->lli_sa_lock);
2338 GOTO(out, rc = -EPERM);
2340 rc = ll_sax_add_sai(ctx, sai);
2342 spin_unlock(&lli->lli_sa_lock);
2347 spin_unlock(&lli->lli_sa_lock);
2348 } else if (lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED) {
2349 /* For mdtest shared dir stat() workload */
2350 LASSERT(lli->lli_sa_pattern & LSA_PATTERN_FNAME);
2351 ctx = ll_sax_get(dir);
2353 ctx = ll_sax_alloc(dir);
2355 GOTO(out, rc = -ENOMEM);
2357 spin_lock(&lli->lli_sa_lock);
/* another process may have installed a context concurrently */
2359 struct ll_statahead_context *tmp = ctx;
2361 if (lli->lli_sa_pattern &
2362 LSA_PATTERN_FN_SHARED) {
2365 rc = ll_sax_add_sai(ctx, sai);
2367 CWARN("%s: invalid pattern %#X.\n",
2369 lli->lli_sa_pattern);
2373 spin_unlock(&lli->lli_sa_lock);
2379 rc = ll_sax_add_sai(ctx, sai);
2380 spin_unlock(&lli->lli_sa_lock);
2383 spin_lock(&lli->lli_sa_lock);
2384 if (!(lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED)) {
2385 spin_unlock(&lli->lli_sa_lock);
2386 GOTO(out, rc = -EINVAL);
2389 rc = ll_sax_add_sai(ctx, sai);
2390 spin_unlock(&lli->lli_sa_lock);
2396 CERROR("%s: unsupported statahead pattern %#X.\n",
2397 sbi->ll_fsname, lli->lli_sa_pattern);
2398 GOTO(out, rc = -EOPNOTSUPP);
2401 CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n",
2402 current->pid, parent);
2404 task = kthread_create_on_node(ll_statahead_thread, sai, node,
2405 "ll_sa_%u", lli->lli_stat_pid);
2407 spin_lock(&lli->lli_sa_lock);
2408 lli->lli_sai = NULL;
2409 spin_unlock(&lli->lli_sa_lock);
2411 CERROR("can't start ll_sa thread, rc: %d\n", rc);
2415 if (test_bit(LL_SBI_AGL_ENABLED, sbi->ll_flags) && agl)
2416 ll_start_agl(parent, sai);
2418 atomic_inc(&sbi->ll_sa_total);
2419 if (lli->lli_sa_pattern & LSA_PATTERN_LIST)
2420 atomic_inc(&sbi->ll_sa_list_total);
2421 else if (lli->lli_sa_pattern & LSA_PATTERN_FNAME)
2422 atomic_inc(&sbi->ll_sa_fname_total);
2424 sai->sai_task = task;
2425 wake_up_process(task);
2427 * We don't stat-ahead for the first dirent since we are already in
2434 * once we start statahead thread failed, disable statahead so that
2435 * subsequent stat won't waste time to try it.
2437 spin_lock(&lli->lli_sa_lock);
2438 if (lli->lli_stat_pid == current->pid)
2439 lli->lli_sa_enabled = 0;
2440 spin_unlock(&lli->lli_sa_lock);
2446 ll_sax_put(dir, ctx);
2449 atomic_dec(&sbi->ll_sa_running);
/*
 * Return whether statahead is already running for @dir (the exact
 * return expression is elided in this excerpt).  Also warns when the
 * caller's AGL hint differs from the running instance's AGL state —
 * a changed hint does not restart the thread.
 */
2455 * Check whether statahead for @dir was started.
2457 static inline bool ll_statahead_started(struct inode *dir, bool agl)
2459 struct ll_inode_info *lli = ll_i2info(dir);
2460 struct ll_statahead_context *ctx;
2461 struct ll_statahead_info *sai;
2463 spin_lock(&lli->lli_sa_lock);
2466 if (sai && (sai->sai_agl_task != NULL) != agl)
2468 "%s: Statahead AGL hint changed from %d to %d\n",
2469 ll_i2sbi(dir)->ll_fsname,
2470 sai->sai_agl_task != NULL, agl);
2471 spin_unlock(&lli->lli_sa_lock);
/*
 * Public entry point from getattr/lookup: spawn the statahead thread
 * only if one is not already running for @dir; otherwise fall through
 * (the non-started return path is elided in this excerpt).
 */
2477 * statahead entry function, this is called when client getattr on a file, it
2478 * will start statahead thread if this is the first dir entry, else revalidate
2479 * dentry from statahead cache.
2481 * \param[in] dir parent directory
2482 * \param[out] dentryp dentry to getattr
2483 * \param[in] agl whether start the agl thread
2485 * \retval 1 on success
2486 * \retval 0 revalidation from statahead cache failed, caller needs
2487 * to getattr from server directly
2488 * \retval negative number on error, caller often ignores this and
2489 * then getattr from server
2491 int ll_start_statahead(struct inode *dir, struct dentry *dentry, bool agl)
2493 if (!ll_statahead_started(dir, agl))
2494 return start_statahead_thread(dir, dentry, agl);
/*
 * Public entry point: pin the directory's statahead context (and the
 * current sai's refcount) under lli_sa_lock, delegate to
 * revalidate_statahead_dentry(), then drop the references.
 * NOTE(review): the lines taking the @ctx reference and the sai-absent
 * early return are elided in this excerpt.
 */
2499 * revalidate dentry from statahead cache.
2501 * \param[in] dir parent directory
2502 * \param[out] dentryp dentry to getattr
2503 * \param[in] unplug unplug statahead window only (normally for negative
2505 * \retval 1 on success
2506 * \retval 0 revalidation from statahead cache failed, caller needs
2507 * to getattr from server directly
2508 * \retval negative number on error, caller often ignores this and
2509 * then getattr from server
2511 int ll_revalidate_statahead(struct inode *dir, struct dentry **dentryp,
2514 struct ll_inode_info *lli = ll_i2info(dir);
2515 struct ll_statahead_context *ctx;
2516 struct ll_statahead_info *sai = NULL;
2519 spin_lock(&lli->lli_sa_lock);
2524 atomic_inc(&sai->sai_refcount);
2525 } else if (lli->lli_sa_pattern & LSA_PATTERN_LIST) {
2526 spin_unlock(&lli->lli_sa_lock);
2531 spin_unlock(&lli->lli_sa_lock);
2533 rc = revalidate_statahead_dentry(dir, ctx, dentryp, unplug);
2534 CDEBUG(D_READA, "revalidate statahead %pd: rc = %d.\n",
2538 ll_sax_put(dir, ctx);
/*
 * Handle the statahead-advise ladvise ioctl: build an ADVISE-pattern sai
 * from the user-supplied fname prefix and [lla_start, lla_end] index
 * range, install it in the directory's statahead context (creating one
 * if needed, retrying if another process raced us), stash a reference in
 * the file handle's fd_sai, and spawn the "ll_sa_%u" kthread.  Subject to
 * the same ll_sa_running_max throttle as start_statahead_thread().
 * NOTE(review): guard returns (sbi->ll_sa_max == 0, non-dir, already-set
 * hint), the @agl flag setup and error labels are elided in this excerpt.
 */
2543 int ll_ioctl_ahead(struct file *file, struct llapi_lu_ladvise2 *ladvise)
2545 int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
2546 struct ll_file_data *fd = file->private_data;
2547 struct dentry *dentry = file_dentry(file);
2548 struct inode *dir = dentry->d_inode;
2549 struct ll_inode_info *lli = ll_i2info(dir);
2550 struct ll_sb_info *sbi = ll_i2sbi(dir);
2551 struct ll_statahead_info *sai = NULL;
2552 struct ll_statahead_context *ctx = NULL;
2553 struct task_struct *task;
2559 if (sbi->ll_sa_max == 0)
2562 if (!S_ISDIR(dir->i_mode))
2567 CWARN("%s: already set statahead hint for dir %pd: rc = %d\n",
2568 sbi->ll_fsname, dentry, rc);
2572 if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
2573 sbi->ll_sa_running_max)) {
2575 "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n");
2576 GOTO(out, rc = -EMFILE);
2579 sai = ll_sai_alloc(dentry);
2581 GOTO(out, rc = -ENOMEM);
/* populate the sai from the user's advise parameters */
2583 sai->sai_fstart = ladvise->lla_start;
2584 sai->sai_fend = ladvise->lla_end;
2585 sai->sai_ls_all = 0;
2586 sai->sai_max = sbi->ll_sa_max;
2587 strncpy(sai->sai_fname, ladvise->lla_fname, sizeof(sai->sai_fname));
2588 sai->sai_pid = current->pid;
2590 ctx = ll_sax_get(dir);
2592 ctx = ll_sax_alloc(dir);
2594 GOTO(out, rc = -ENOMEM);
2596 spin_lock(&lli->lli_sa_lock);
/* lost the race: another context exists; only ADVISE may coexist */
2597 if (unlikely(lli->lli_sax)) {
2598 struct ll_statahead_context *tmp = ctx;
2600 if (lli->lli_sa_pattern == LSA_PATTERN_NONE ||
2601 lli->lli_sa_pattern == LSA_PATTERN_ADVISE) {
2602 lli->lli_sa_pattern = LSA_PATTERN_ADVISE;
2605 fd->fd_sai = __ll_sai_get(sai);
2609 CWARN("%s: pattern %X is not ADVISE: rc = %d\n",
2610 sbi->ll_fsname, lli->lli_sa_pattern, rc);
2613 spin_unlock(&lli->lli_sa_lock);
2618 lli->lli_sa_pattern = LSA_PATTERN_ADVISE;
2620 fd->fd_sai = __ll_sai_get(sai);
2621 spin_unlock(&lli->lli_sa_lock);
2624 spin_lock(&lli->lli_sa_lock);
2625 if (!(lli->lli_sa_pattern == LSA_PATTERN_ADVISE ||
2626 lli->lli_sa_pattern == LSA_PATTERN_NONE)) {
2627 spin_unlock(&lli->lli_sa_lock);
2628 GOTO(out, rc = -EINVAL);
2631 lli->lli_sa_pattern = LSA_PATTERN_ADVISE;
2632 fd->fd_sai = __ll_sai_get(sai);
2633 spin_unlock(&lli->lli_sa_lock);
2638 "start statahead thread: [pid %d] [parent %pd] sai %p ctx %p\n",
2639 current->pid, dentry, sai, ctx);
2641 task = kthread_create_on_node(ll_statahead_thread, sai, node,
2642 "ll_sa_%u", current->pid);
2645 CERROR("%s: cannot start ll_sa thread: rc = %d\n",
2646 sbi->ll_fsname, rc);
2650 if (test_bit(LL_SBI_AGL_ENABLED, sbi->ll_flags) && agl)
2651 ll_start_agl(dentry, sai);
2653 atomic_inc(&sbi->ll_sa_total);
2654 sai->sai_task = task;
2655 wake_up_process(task);
2661 ll_sax_put(dir, ctx);
2669 ll_sax_put(dir, ctx);
2671 atomic_dec(&sbi->ll_sa_running);
2676 * This function is called in each stat() system call to do statahead check.
2677 * When the files' naming of stat() call sequence under a directory follows
2678 a certain name rule roughly, this directory is considered as a candidate
2680 * For an example, the file naming rule is mdtest.$rank.$i, the suffix of
2681 * the stat() dentry name is number and do stat() for dentries with name
2682 * ending with number more than @LSA_FN_PREDICT_HIT, then the corresponding
2683 directory meets the requirement for statahead.
2685 void ll_statahead_enter(struct inode *dir, struct dentry *dchild)
2687 struct ll_inode_info *lli;
2688 struct qstr *dname = &dchild->d_name;
2690 if (ll_i2sbi(dir)->ll_sa_max == 0)
2693 if (ll_i2sbi(dir)->ll_enable_statahead_fname == 0)
2696 lli = ll_i2info(dir);
2697 if (lli->lli_sa_enabled)
2700 if (lli->lli_sa_pattern & (LSA_PATTERN_FN_PREDICT | LSA_PATTERN_LIST))
2704 * Now support number indexing regularized statahead pattern only.
2705 * Quick check whether the last character is digit.
2707 if (!isdigit(dname->name[dname->len - 1])) {
2708 lli->lli_sa_pattern &= ~LSA_PATTERN_FN_PREDICT;
2709 lli->lli_sa_match_count = 0;
2713 lli->lli_sa_match_count++;
2714 if (lli->lli_sa_match_count > LSA_FN_PREDICT_HIT) {
2715 spin_lock(&lli->lli_sa_lock);
2716 lli->lli_sa_pattern |= LSA_PATTERN_FN_PREDICT;
2717 spin_unlock(&lli->lli_sa_lock);
2718 lli->lli_sa_enabled = 1;
2719 lli->lli_sa_match_count = 0;