4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
33 #include <linux/sched.h>
34 #include <linux/kthread.h>
36 #include <linux/highmem.h>
37 #include <linux/pagemap.h>
38 #include <linux/delay.h>
40 #define DEBUG_SUBSYSTEM S_LLITE
42 #include <obd_support.h>
43 #include <lustre_dlm.h>
44 #include "llite_internal.h"
46 #define SA_OMITTED_ENTRY_MAX 8ULL
49 /** negative values are for error cases */
50 SA_ENTRY_INIT = 0, /** init entry */
51 SA_ENTRY_SUCC = 1, /** stat succeed */
52 SA_ENTRY_INVA = 2, /** invalid entry */
56 * sa_entry is not refcounted: statahead thread allocates it and do async stat,
57 * and in async stat callback ll_statahead_interpret() will prepare the inode
58 * and set lock data in the ptlrpcd context. Then the scanner process will be
59 * woken up if this entry is the waiting one, can access and free it.
62 /* link into sai_entries */
63 struct list_head se_list;
64 /* link into sai hash table locally */
65 struct list_head se_hash;
66 /* entry index in the sai */
68 /* low layer ldlm lock handle */
72 /* entry size, contains name */
74 /* pointer to the target inode */
75 struct inode *se_inode;
76 /* pointer to @sai per process struct */
77 struct ll_statahead_info *se_sai;
/*
 * Generation counter for statahead instances on this client; bumped each
 * time a new sai is created (see ll_sai_alloc()) so stale entries can be
 * detected.  Protected by sai_generation_lock.
 */
static unsigned int sai_generation;
static DEFINE_SPINLOCK(sai_generation_lock);
87 static inline int sa_unhashed(struct sa_entry *entry)
89 return list_empty(&entry->se_hash);
/* sa_entry is ready to use */
static inline int sa_ready(struct sa_entry *entry)
	/* Make sure sa_entry is updated and ready to use.
	 * NOTE(review): this read of se_state pairs with the
	 * smp_store_release() in __sa_make_ready(); a read barrier is
	 * presumably issued just before it — confirm against full source.
	 */
	return (entry->se_state != SA_ENTRY_INIT);
100 /* hash value to put in sai_cache */
101 static inline int sa_hash(int val)
103 return val & LL_SA_CACHE_MASK;
106 /* hash entry into sax_cache */
108 sa_rehash(struct ll_statahead_context *ctx, struct sa_entry *entry)
110 int i = sa_hash(entry->se_qstr.hash);
112 spin_lock(&ctx->sax_cache_lock[i]);
113 list_add_tail(&entry->se_hash, &ctx->sax_cache[i]);
114 spin_unlock(&ctx->sax_cache_lock[i]);
117 /* unhash entry from sai_cache */
119 sa_unhash(struct ll_statahead_context *ctx, struct sa_entry *entry)
121 int i = sa_hash(entry->se_qstr.hash);
123 spin_lock(&ctx->sax_cache_lock[i]);
124 list_del_init(&entry->se_hash);
125 spin_unlock(&ctx->sax_cache_lock[i]);
128 static inline int agl_should_run(struct ll_statahead_info *sai,
131 return inode && S_ISREG(inode->i_mode) && sai->sai_agl_task;
134 static inline struct ll_inode_info *
135 agl_first_entry(struct ll_statahead_info *sai)
137 return list_first_entry(&sai->sai_agls, struct ll_inode_info,
141 /* statahead window is full */
142 static inline int sa_sent_full(struct ll_statahead_info *sai)
144 return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
147 /* Batch metadata handle */
148 static inline bool sa_has_batch_handle(struct ll_statahead_info *sai)
150 return sai->sai_bh != NULL;
/*
 * Push out any getattr RPCs accumulated in the batch handle without
 * waiting for their replies; records the last index covered by the flush.
 */
static inline void ll_statahead_flush_nowait(struct ll_statahead_info *sai)
	if (sa_has_batch_handle(sai)) {
		/* everything up to the previous index is now being sent */
		sai->sai_index_end = sai->sai_index - 1;
		(void) md_batch_flush(ll_i2mdexp(sai->sai_dentry->d_inode),
162 static inline int agl_list_empty(struct ll_statahead_info *sai)
164 return list_empty(&sai->sai_agls);
168 * (1) hit ratio less than 80%
170 * (2) consecutive miss more than 8
171 * then means low hit.
173 static inline int sa_low_hit(struct ll_statahead_info *sai)
175 return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) ||
176 (sai->sai_consecutive_miss > 8));
180 * if the given index is behind of statahead window more than
181 * SA_OMITTED_ENTRY_MAX, then it is old.
183 static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
185 return ((__u64)sai->sai_max + index + SA_OMITTED_ENTRY_MAX <
/* allocate sa_entry and hash it to allow scanner process to find it */
static struct sa_entry *
sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
	 const char *name, int len, const struct lu_fid *fid)
	struct ll_inode_info *lli;
	struct sa_entry *entry;
	/* entry and its name copy share one allocation: the name is placed
	 * right after the struct, rounded up to 4 bytes for alignment
	 */
	entry_size = sizeof(struct sa_entry) +
		     round_up(len + 1 /* for trailing NUL */, 4);
	OBD_ALLOC(entry, entry_size);
	if (unlikely(!entry))
		RETURN(ERR_PTR(-ENOMEM));

	CDEBUG(D_READA, "alloc sa entry %.*s(%p) index %llu\n",
	       len, name, entry, index);

	entry->se_index = index;
	entry->se_state = SA_ENTRY_INIT;
	entry->se_size = entry_size;
	dname = (char *)entry + sizeof(struct sa_entry);
	memcpy(dname, name, len);
	entry->se_qstr.hash = ll_full_name_hash(parent, name, len);
	entry->se_qstr.len = len;
	entry->se_qstr.name = dname;
	entry->se_fid = *fid;

	lli = ll_i2info(sai->sai_dentry->d_inode);
	/* hash under lli_sa_lock so the scanner only ever sees a fully
	 * initialized entry
	 */
	spin_lock(&lli->lli_sa_lock);
	INIT_LIST_HEAD(&entry->se_list);
	sa_rehash(lli->lli_sax, entry);
	spin_unlock(&lli->lli_sa_lock);

	atomic_inc(&sai->sai_cache_count);
/* free sa_entry, which should have been unhashed and not in any list */
static void sa_free(struct ll_statahead_context *ctx, struct sa_entry *entry)
	CDEBUG(D_READA, "free sa entry %.*s(%p) index %llu\n",
	       entry->se_qstr.len, entry->se_qstr.name, entry,
	/* must already be off both the hash chain and sai_entries */
	LASSERT(list_empty(&entry->se_list));
	LASSERT(sa_unhashed(entry));

	/* entry and its embedded name were a single allocation */
	OBD_FREE(entry, entry->se_size);
/*
 * find sa_entry by name, used by directory scanner; a reference on the
 * returned entry is not needed because only the scanner itself can remove
 * the entry from the cache.
 */
static struct sa_entry *
sa_get(struct ll_statahead_context *ctx, const struct qstr *qstr)
	struct sa_entry *entry;
	int i = sa_hash(qstr->hash);

	spin_lock(&ctx->sax_cache_lock[i]);
	list_for_each_entry(entry, &ctx->sax_cache[i], se_hash) {
		/* cheap comparisons (hash, length) first, then the bytes */
		if (entry->se_qstr.hash == qstr->hash &&
		    entry->se_qstr.len == qstr->len &&
		    memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0) {
			spin_unlock(&ctx->sax_cache_lock[i]);
	spin_unlock(&ctx->sax_cache_lock[i]);
/*
 * unhash and unlink sa_entry, and then free it.  @locked tells whether the
 * caller already holds lli_sa_lock; the lock is re-taken before return so
 * the caller's locking state is preserved.
 */
sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry, bool locked)
	struct inode *dir = sai->sai_dentry->d_inode;
	struct ll_inode_info *lli = ll_i2info(dir);
	struct ll_statahead_context *ctx = lli->lli_sax;

	/* entry must still be hashed, on sai_entries, and already completed */
	LASSERT(!sa_unhashed(entry));
	LASSERT(!list_empty(&entry->se_list));
	LASSERT(sa_ready(entry));

	sa_unhash(ctx, entry);

	spin_lock(&lli->lli_sa_lock);
	list_del_init(&entry->se_list);
	spin_unlock(&lli->lli_sa_lock);

	/* drop the inode reference taken when the async stat prepared it */
	iput(entry->se_inode);
	atomic_dec(&sai->sai_cache_count);

	spin_lock(&lli->lli_sa_lock);
/*
 * called by scanner after use: the sa_entry will be killed, the statahead
 * window is grown on a hit, and entries that fell too far behind the window
 * are reaped.
 */
sa_put(struct inode *dir, struct ll_statahead_info *sai, struct sa_entry *entry)
	struct ll_inode_info *lli = ll_i2info(dir);
	struct sa_entry *tmp;

	if (entry && entry->se_state == SA_ENTRY_SUCC) {
		struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);

		/* hit: reset the miss streak and widen the window (capped) */
		sai->sai_consecutive_miss = 0;
		if (sai->sai_max < sbi->ll_sa_max) {
			sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
		} else if (sai->sai_max_batch_count > 0) {
			/* batching: flush on batch boundaries or at the end */
			if (sai->sai_max >= sai->sai_max_batch_count &&
			    (sai->sai_index_end - entry->se_index) %
			    sai->sai_max_batch_count == 0) {
			} else if (entry->se_index == sai->sai_index_end) {
		sai->sai_consecutive_miss++;
	sa_kill(sai, entry, false);

	/*
	 * Kill old completed entries; kicking out stale entries keeps the
	 * window covering what the scanner is actually reading.
	 */
	spin_lock(&lli->lli_sa_lock);
	while ((tmp = list_first_entry_or_null(&sai->sai_entries,
					       struct sa_entry, se_list))) {
		if (!is_omitted_entry(sai, tmp->se_index))
		/* ll_sa_lock is dropped by sa_kill(), restart list */
		sa_kill(sai, tmp, true);
	spin_unlock(&lli->lli_sa_lock);

	spin_lock(&lli->lli_sa_lock);
	if (wakeup && sai->sai_task)
		wake_up_process(sai->sai_task);
	spin_unlock(&lli->lli_sa_lock);
/*
 * update state and sorted-add entry to sai_entries by index; return true if
 * the scanner is currently waiting on this entry.
 */
__sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
	struct list_head *pos = &sai->sai_entries;
	__u64 index = entry->se_index;

	LASSERT(!sa_ready(entry));
	LASSERT(list_empty(&entry->se_list));

	/* walk backwards: new entries usually have the largest index */
	list_for_each_entry_reverse(se, &sai->sai_entries, se_list) {
		if (se->se_index < entry->se_index) {
	list_add(&entry->se_list, pos);
	/*
	 * LU-9210: ll_statahead_interpret() must be able to see the list
	 * insertion above before se_state is published; release semantics
	 * pair with the state check in sa_ready().
	 */
	smp_store_release(&entry->se_state,
			  ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);

	return (index == sai->sai_index_wait);
/* release the resources held by async stat RPC arguments */
static void sa_fini_data(struct md_op_item *item)
	struct md_op_data *op_data = &item->mop_data;

	if (op_data->op_flags & MF_OPNAME_KMALLOCED)
		/* allocated via ll_setup_filename called from sa_prep_data */
		kfree(op_data->op_name);
	ll_unlock_md_op_lsm(&item->mop_data);
	/* pill was allocated separately when the default one was in use */
	if (item->mop_subpill_allocated)
		OBD_FREE_PTR(item->mop_pill);
403 static int ll_statahead_interpret(struct md_op_item *item, int rc);
/*
 * prepare arguments for an async stat RPC: fill in the md_op_data for
 * @entry's name, hook up ll_statahead_interpret as the completion callback,
 * and set up the ldlm enqueue info for an IBITS getattr intent.
 */
static struct md_op_item *
sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
	struct md_op_item *item;
	struct ldlm_enqueue_info *einfo;
	struct md_op_data *op_data;
		return ERR_PTR(-ENOMEM);
	op_data = ll_prep_md_op_data(&item->mop_data, dir, child,
				     entry->se_qstr.name, entry->se_qstr.len, 0,
				     LUSTRE_OPC_ANY, NULL);
	if (IS_ERR(op_data)) {
		return (struct md_op_item *)op_data;
	op_data->op_fid2 = entry->se_fid;

	item->mop_opc = MD_OP_GETATTR;
	item->mop_it.it_op = IT_GETATTR;
	/* hold the directory so the callback can safely use it */
	item->mop_dir = igrab(dir);
	item->mop_cb = ll_statahead_interpret;
	item->mop_cbdata = entry;

	einfo = &item->mop_einfo;
	einfo->ei_type = LDLM_IBITS;
	einfo->ei_mode = it_to_lock_mode(&item->mop_it);
	einfo->ei_cb_bl = ll_md_blocking_ast;
	einfo->ei_cb_cp = ldlm_completion_ast;
	einfo->ei_cb_gl = NULL;
	einfo->ei_cbdata = NULL;
	einfo->ei_req_slot = 1;
/*
 * release resources used in the async stat RPC, update entry state and wake
 * up the scanner process if it is waiting on this entry.
 */
sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
	struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);

	spin_lock(&lli->lli_sa_lock);
	wakeup = __sa_make_ready(sai, entry, ret);
	spin_unlock(&lli->lli_sa_lock);

		wake_up(&sai->sai_waitq);
/* insert inode into the list of sai_agls (async glimpse queue) */
static void ll_agl_add(struct ll_statahead_info *sai,
		       struct inode *inode, int index)
	struct ll_inode_info *child = ll_i2info(inode);
	struct ll_inode_info *parent = ll_i2info(sai->sai_dentry->d_inode);

	spin_lock(&child->lli_agl_lock);
	/* lli_agl_index == 0 means the inode is not queued yet */
	if (child->lli_agl_index == 0) {
		child->lli_agl_index = index;
		spin_unlock(&child->lli_agl_lock);

		LASSERT(list_empty(&child->lli_agl_list));

		spin_lock(&parent->lli_agl_lock);
		/* Re-check under the lock */
		if (agl_should_run(sai, inode)) {
			/* wake the AGL thread only on an empty->non-empty
			 * transition
			 */
			if (agl_list_empty(sai))
				wake_up_process(sai->sai_agl_task);
			list_add_tail(&child->lli_agl_list, &sai->sai_agls);
			child->lli_agl_index = 0;
		spin_unlock(&parent->lli_agl_lock);
		spin_unlock(&child->lli_agl_lock);
/* allocate and initialize a statahead context for directory @dir */
static struct ll_statahead_context *ll_sax_alloc(struct inode *dir)
	struct ll_statahead_context *ctx;
	/* hold the directory inode for the lifetime of the context */
	ctx->sax_inode = igrab(dir);
	atomic_set(&ctx->sax_refcount, 1);
	INIT_LIST_HEAD(&ctx->sax_sai_list);
	for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
		INIT_LIST_HEAD(&ctx->sax_cache[i]);
		spin_lock_init(&ctx->sax_cache_lock[i]);
/* release the directory reference and free the statahead context */
static inline void ll_sax_free(struct ll_statahead_context *ctx)
	LASSERT(ctx->sax_inode != NULL);
	iput(ctx->sax_inode);
525 static inline void __ll_sax_get(struct ll_statahead_context *ctx)
527 atomic_inc(&ctx->sax_refcount);
/*
 * look up the statahead context of @dir under lli_sa_lock, taking a
 * reference if one exists; returns NULL otherwise.
 */
static inline struct ll_statahead_context *ll_sax_get(struct inode *dir)
	struct ll_inode_info *lli = ll_i2info(dir);
	struct ll_statahead_context *ctx = NULL;

	spin_lock(&lli->lli_sa_lock);
	spin_unlock(&lli->lli_sa_lock);
/*
 * drop a reference on the statahead context; on last put, detach it from
 * the directory's ll_inode_info and reset the statahead pattern state.
 */
static inline void ll_sax_put(struct inode *dir,
			      struct ll_statahead_context *ctx)
	struct ll_inode_info *lli = ll_i2info(dir);

	if (atomic_dec_and_lock(&ctx->sax_refcount, &lli->lli_sa_lock)) {
		LASSERT(list_empty(&ctx->sax_sai_list));
		/* advise/fname patterns own the opendir key and pid here */
		if (lli->lli_sa_pattern & (LSA_PATTERN_ADVISE |
					   LSA_PATTERN_FNAME)) {
			lli->lli_opendir_key = NULL;
			lli->lli_stat_pid = 0;
			lli->lli_sa_enabled = 0;
		lli->lli_sa_pattern = LSA_PATTERN_NONE;
		spin_unlock(&lli->lli_sa_lock);
/* allocate and initialize a statahead info instance for @dentry */
static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
	struct ll_statahead_info *sai;
	struct ll_inode_info *lli = ll_i2info(dentry->d_inode);

	sai->sai_dentry = dget(dentry);
	atomic_set(&sai->sai_refcount, 1);
	/* window starts at the configured minimum and grows on hits */
	sai->sai_max = ll_i2sbi(dentry->d_inode)->ll_sa_min;

	init_waitqueue_head(&sai->sai_waitq);

	INIT_LIST_HEAD(&sai->sai_item);
	INIT_LIST_HEAD(&sai->sai_entries);
	INIT_LIST_HEAD(&sai->sai_agls);

	atomic_set(&sai->sai_cache_count, 0);

	/* a fresh generation; skip 0 which means "no generation" */
	spin_lock(&sai_generation_lock);
	lli->lli_sa_generation = ++sai_generation;
	if (unlikely(sai_generation == 0))
		lli->lli_sa_generation = ++sai_generation;
	spin_unlock(&sai_generation_lock);
/* drop the dentry reference and free the statahead info */
static inline void ll_sai_free(struct ll_statahead_info *sai)
	LASSERT(sai->sai_dentry != NULL);
	dput(sai->sai_dentry);
/* take an extra reference on @sai */
static inline struct ll_statahead_info *
__ll_sai_get(struct ll_statahead_info *sai)
	atomic_inc(&sai->sai_refcount);
/*
 * put sai refcount after use; when the refcount reaches zero, free sai and
 * all remaining sa_entries, and account the running statahead instance.
 */
static void ll_sai_put(struct ll_statahead_info *sai)
	struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);

	if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
		struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
		struct sa_entry *entry, *next;

		list_del_init(&sai->sai_item);
		spin_unlock(&lli->lli_sa_lock);

		/* both worker threads must be gone and all RPCs replied */
		LASSERT(!sai->sai_task);
		LASSERT(!sai->sai_agl_task);
		LASSERT(sai->sai_sent == sai->sai_replied);

		list_for_each_entry_safe(entry, next, &sai->sai_entries,
			sa_kill(sai, entry, false);

		LASSERT(atomic_read(&sai->sai_cache_count) == 0);
		LASSERT(agl_list_empty(sai));

		atomic_dec(&sbi->ll_sa_running);
/* Do NOT forget to drop inode refcount when into sai_agls. */
static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
	struct ll_inode_info *lli = ll_i2info(inode);
	u64 index = lli->lli_agl_index;

	LASSERT(list_empty(&lli->lli_agl_list));

	/* AGL may fall behind statahead by at most one entry */
	if (is_omitted_entry(sai, index + 1)) {
		lli->lli_agl_index = 0;

	/*
	 * In case of restore, the MDT has the right size and has already
	 * sent it back without granting the layout lock, inode is up-to-date.
	 * Then AGL (async glimpse lock) is useless.
	 * Also to glimpse we need the layout, in case of a running restore
	 * the MDT holds the layout lock so the glimpse will block up to the
	 * end of restore (statahead/agl will block)
	 */
	if (test_bit(LLIF_FILE_RESTORING, &lli->lli_flags)) {
		lli->lli_agl_index = 0;

	/* Someone is in glimpse (sync or async), do nothing. */
	rc = down_write_trylock(&lli->lli_glimpse_sem);
		lli->lli_agl_index = 0;

	/*
	 * Someone triggered glimpse within 1 sec before.
	 * 1) The former glimpse succeeded with glimpse lock granted by OST,
	 *    and if the lock is still cached on client, AGL needs to do
	 *    nothing. If it is cancelled by other client, AGL maybe cannot
	 *    obtain new lock for no glimpse callback triggered by AGL.
	 * 2) The former glimpse succeeded, but OST did not grant glimpse
	 *    lock. Under such case, it is quite possible that the OST will
	 *    not grant glimpse lock for AGL also.
	 * 3) The former glimpse failed; compared with other two cases, it is
	 *    relatively rare. AGL can ignore such case, and it will not much
	 *    affect the performance.
	 */
	expire = ktime_sub_ns(ktime_get(), NSEC_PER_SEC);
	if (ktime_to_ns(lli->lli_glimpse_time) &&
	    ktime_before(expire, lli->lli_glimpse_time)) {
		up_write(&lli->lli_glimpse_sem);
		lli->lli_agl_index = 0;

	       "Handling (init) async glimpse: inode = " DFID", idx = %llu\n",
	       PFID(&lli->lli_fid), index);

	lli->lli_agl_index = 0;
	lli->lli_glimpse_time = ktime_get();
	up_write(&lli->lli_glimpse_sem);

	       "Handled (init) async glimpse: inode= " DFID", idx = %llu, rc = %d\n",
	       PFID(&lli->lli_fid), index, rc);
/*
 * common completion path for the async stat callback: release the intent
 * and request, publish the entry's final state, and update sai accounting
 * under lli_sa_lock.
 */
static void ll_statahead_interpret_fini(struct ll_inode_info *lli,
					struct ll_statahead_info *sai,
					struct md_op_item *item,
					struct sa_entry *entry,
					struct ptlrpc_request *req,
	/*
	 * First it will drop ldlm ibits lock refcount by calling
	 * ll_intent_drop_lock() in spite of failures. Do not worry about
	 * calling ll_intent_drop_lock() more than once.
	 */
	ll_intent_release(&item->mop_it);
		ptlrpc_req_finished(req);
	sa_make_ready(sai, entry, rc);

	spin_lock(&lli->lli_sa_lock);
	spin_unlock(&lli->lli_sa_lock);
/*
 * workqueue half of the statahead callback: runs outside ptlrpcd so it may
 * issue the extra RPCs that ll_prep_inode() can require (striped dirs,
 * layout changes); revalidates the lock saved by ll_statahead_interpret()
 * and finishes the entry.
 */
static void ll_statahead_interpret_work(struct work_struct *work)
	struct md_op_item *item = container_of(work, struct md_op_item,
	struct req_capsule *pill = item->mop_pill;
	struct inode *dir = item->mop_dir;
	struct ll_inode_info *lli = ll_i2info(dir);
	struct ll_statahead_info *sai;
	struct lookup_intent *it;
	struct sa_entry *entry;
	struct mdt_body *body;

	entry = (struct sa_entry *)item->mop_cbdata;
	/* the ptlrpcd half stashed the lock handle before queueing us */
	LASSERT(entry->se_handle != 0);

	body = req_capsule_server_get(pill, &RMF_MDT_BODY);
		GOTO(out, rc = -EFAULT);

	child = entry->se_inode;
	/* revalidate; unlinked and re-created with the same name */
	if (unlikely(!fid_is_zero(&item->mop_data.op_fid2) &&
		     !lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
		entry->se_inode = NULL;
		/* The mdt_body is invalid. Skip this entry */
		GOTO(out, rc = -EAGAIN);

	it->it_lock_handle = entry->se_handle;
	rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
		GOTO(out, rc = -EAGAIN);

	rc = ll_prep_inode(&child, pill, dir->i_sb, it);
		CERROR("%s: getattr callback for %.*s "DFID": rc = %d\n",
		       ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
		       entry->se_qstr.name, PFID(&entry->se_fid), rc);

	/* If encryption context was returned by MDT, put it in
	 * inode now to save an extra getxattr.
	 */
	if (body->mbo_valid & OBD_MD_ENCCTX) {
		void *encctx = req_capsule_server_get(pill, &RMF_FILE_ENCCTX);
		__u32 encctxlen = req_capsule_get_size(pill, &RMF_FILE_ENCCTX,
		       "server returned encryption ctx for "DFID"\n",
		       PFID(ll_inode2fid(child)));
		rc = ll_xattr_cache_insert(child,
					   xattr_for_enc(child),
			CWARN("%s: cannot set enc ctx for "DFID": rc = %d\n",
			      ll_i2sbi(child)->ll_fsname,
			      PFID(ll_inode2fid(child)), rc);

	CDEBUG(D_READA, "%s: setting %.*s"DFID" l_data to inode %p\n",
	       ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
	       entry->se_qstr.name, PFID(ll_inode2fid(child)), child);
	ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);

	entry->se_inode = child;

	if (agl_should_run(sai, child))
		ll_agl_add(sai, child, entry->se_index);

	ll_statahead_interpret_fini(lli, sai, item, entry, pill->rc_req, rc);
/*
 * Callback for async stat RPC; this is called in ptlrpcd context. It
 * prepares the inode and sets lock data directly in the ptlrpcd context,
 * and will wake up the directory listing process if the dentry is the
 * waiting one; heavyweight cases are punted to a workqueue.
 */
static int ll_statahead_interpret(struct md_op_item *item, int rc)
	struct req_capsule *pill = item->mop_pill;
	struct lookup_intent *it = &item->mop_it;
	struct inode *dir = item->mop_dir;
	struct ll_inode_info *lli = ll_i2info(dir);
	struct sa_entry *entry = (struct sa_entry *)item->mop_cbdata;
	struct work_struct *work = &item->mop_work;
	struct ll_statahead_info *sai;
	struct mdt_body *body;

	if (it_disposition(it, DISP_LOOKUP_NEG))
	/*
	 * because statahead thread will wait for all inflight RPC to finish,
	 * sai should be always valid, no need to refcount
	 */
	LASSERT(entry != NULL);
	LASSERT(sai != NULL);

	CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
	       entry->se_qstr.len, entry->se_qstr.name, rc);

	body = req_capsule_server_get(pill, &RMF_MDT_BODY);
		GOTO(out, rc = -EFAULT);

	child = entry->se_inode;
	/*
	 * revalidate; unlinked and re-created with the same name.
	 * exclude the case where FID is zero as it was from statahead with
	 * regularized file name pattern and had no idea for the FID of the
	 * target.
	 */
	if (unlikely(!fid_is_zero(&item->mop_data.op_fid2) &&
		     !lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
		entry->se_inode = NULL;
		/* The mdt_body is invalid. Skip this entry */
		GOTO(out, rc = -EAGAIN);

	/* stash the handle so the workqueue half can revalidate it */
	entry->se_handle = it->it_lock_handle;
	/*
	 * In ptlrpcd context, it is not allowed to generate new RPCs
	 * especially for striped directories or regular files with layout
	 * change.
	 *
	 * Release ibits lock ASAP to avoid deadlock when statahead
	 * thread enqueues lock on parent in readdir and another
	 * process enqueues lock on child with parent lock held, eg.
	 * unlink.
	 */
	handle = it->it_lock_handle;
	ll_intent_drop_lock(it);
	ll_unlock_md_op_lsm(&item->mop_data);

	/*
	 * If the statahead entry is a striped directory or regular file with
	 * layout change, it will generate a new RPC and long wait in the
	 * ptlrpcd context.
	 * However, it is dangerous of blocking in ptlrpcd thread.
	 * Here we use work queue or the separate statahead thread to handle
	 * the extra RPC and long wait:
	 * (@ll_prep_inode->@lmv_revalidate_slaves);
	 * (@ll_prep_inode->@lov_layout_change->osc_cache_wait_range);
	 */
	INIT_WORK(work, ll_statahead_interpret_work);
	ptlrpc_request_addref(pill->rc_req);

	ll_statahead_interpret_fini(lli, sai, item, entry, NULL, rc);
/*
 * issue the async getattr for @item: add it to the batch handle when
 * batching is active, otherwise send an individual intent getattr RPC.
 */
static inline int sa_getattr(struct ll_statahead_info *sai, struct inode *dir,
			     struct md_op_item *item)
	if (sa_has_batch_handle(sai))
		rc = md_batch_add(ll_i2mdexp(dir), sai->sai_bh, item);
		/* non-batched path */
		rc = md_intent_getattr_async(ll_i2mdexp(dir), item);
/* async stat for file not found in dcache */
static int sa_lookup(struct inode *dir, struct sa_entry *entry)
	struct md_op_item *item;

	item = sa_prep_data(dir, NULL, entry);
		RETURN(PTR_ERR(item));

	rc = sa_getattr(entry->se_sai, dir, item);
/*
 * async stat for file found in dcache, similar to .revalidate
 *
 * \retval	1 dentry valid, no RPC sent
 * \retval	0 dentry invalid, will send async stat RPC
 * \retval	negative number upon error
 */
static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
			 struct dentry *dentry)
	struct inode *inode = dentry->d_inode;
	struct lookup_intent it = { .it_op = IT_GETATTR,
				    .it_lock_handle = 0 };
	struct md_op_item *item;

	if (unlikely(!inode))

	if (d_mountpoint(dentry))

	item = sa_prep_data(dir, inode, entry);
		RETURN(PTR_ERR(item));

	entry->se_inode = igrab(inode);
	rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
		/* dentry is still valid: remember the handle, no RPC */
		entry->se_handle = it.it_lock_handle;
		ll_intent_release(&it);

	rc = sa_getattr(entry->se_sai, dir, item);
		/* on failure drop the inode reference taken above */
		entry->se_inode = NULL;
/* issue an async stat for the file with @name under @parent */
static void sa_statahead(struct ll_statahead_info *sai, struct dentry *parent,
			 const char *name, int len, const struct lu_fid *fid)
	struct inode *dir = parent->d_inode;
	struct dentry *dentry = NULL;
	struct sa_entry *entry;

	entry = sa_alloc(parent, sai, sai->sai_index, name, len, fid);

	/* choose lookup vs revalidate based on dcache presence */
	dentry = d_lookup(parent, &entry->se_qstr);
		rc = sa_lookup(dir, entry);
		rc = sa_revalidate(dir, entry, dentry);
		/* valid dentry: no RPC was sent, maybe queue a glimpse */
		if (rc == 1 && agl_should_run(sai, dentry->d_inode))
			ll_agl_add(sai, dentry->d_inode, entry->se_index);

		sa_make_ready(sai, entry, rc);

	if (sa_sent_full(sai))
		ll_statahead_flush_nowait(sai);
/* async glimpse (agl) thread main function */
static int ll_agl_thread(void *arg)
	/*
	 * We already own this reference, so it is safe to take it
	 * without a lock.
	 */
	struct ll_statahead_info *sai = (struct ll_statahead_info *)arg;
	struct dentry *parent = sai->sai_dentry;
	struct inode *dir = parent->d_inode;
	struct ll_inode_info *plli = ll_i2info(dir);
	struct ll_inode_info *clli;

	CDEBUG(D_READA, "agl thread started: sai %p, parent %pd\n",

	/* sleep in TASK_IDLE until there is work or we are told to stop */
	while (({set_current_state(TASK_IDLE);
		 !kthread_should_stop(); })) {
		spin_lock(&plli->lli_agl_lock);
		clli = list_first_entry_or_null(&sai->sai_agls,
						struct ll_inode_info,
			__set_current_state(TASK_RUNNING);
			list_del_init(&clli->lli_agl_list);
			spin_unlock(&plli->lli_agl_lock);
			ll_agl_trigger(&clli->lli_vfs_inode, sai);
			spin_unlock(&plli->lli_agl_lock);

	__set_current_state(TASK_RUNNING);
/*
 * stop the async glimpse thread and drain the sai_agls queue, dropping the
 * inode references held by queued entries.
 */
static void ll_stop_agl(struct ll_statahead_info *sai)
	struct dentry *parent = sai->sai_dentry;
	struct ll_inode_info *plli = ll_i2info(parent->d_inode);
	struct ll_inode_info *clli;
	struct task_struct *agl_task;

	/* detach the task pointer under the lock so only one caller stops it */
	spin_lock(&plli->lli_agl_lock);
	agl_task = sai->sai_agl_task;
	sai->sai_agl_task = NULL;
	spin_unlock(&plli->lli_agl_lock);

	CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n",
	       sai, (unsigned int)agl_task->pid);
	kthread_stop(agl_task);

	spin_lock(&plli->lli_agl_lock);
	while ((clli = list_first_entry_or_null(&sai->sai_agls,
						struct ll_inode_info,
						lli_agl_list)) != NULL) {
		list_del_init(&clli->lli_agl_list);
		/* drop the lock around iput(); it may sleep */
		spin_unlock(&plli->lli_agl_lock);
		clli->lli_agl_index = 0;
		iput(&clli->lli_vfs_inode);
		spin_lock(&plli->lli_agl_lock);
	spin_unlock(&plli->lli_agl_lock);
	CDEBUG(D_READA, "agl thread stopped: sai %p, parent %pd\n",
/* start the async glimpse (agl) thread for @sai */
static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
	int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
	struct ll_inode_info *plli;
	struct task_struct *task;

	CDEBUG(D_READA, "start agl thread: sai %p, parent %pd\n",

	plli = ll_i2info(parent->d_inode);
	task = kthread_create_on_node(ll_agl_thread, sai, node, "ll_agl_%d",
				      plli->lli_stat_pid);
		/* statahead still works without AGL; just log and return */
		CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));

	sai->sai_agl_task = task;
	atomic_inc(&ll_i2sbi(d_inode(parent))->ll_agl_total);
	/* Get an extra reference that the thread holds */

	wake_up_process(task);
/*
 * statahead thread body for the classic readdir-driven pattern: walk the
 * directory pages and issue an async stat for each entry ahead of the
 * scanner, throttled by the statahead window and the hit ratio.
 */
static int ll_statahead_by_list(struct dentry *parent)
	struct inode *dir = parent->d_inode;
	struct ll_inode_info *lli = ll_i2info(dir);
	struct ll_statahead_info *sai = lli->lli_sai;
	struct ll_sb_info *sbi = ll_i2sbi(dir);
	struct md_op_data *op_data;
	struct page *page = NULL;

	CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",

	OBD_ALLOC_PTR(op_data);

	/* iterate directory pages until the end, stop request, or disable */
	while (pos != MDS_DIR_END_OFF &&
	       /* matches smp_store_release() in ll_deauthorize_statahead() */
	       smp_load_acquire(&sai->sai_task) &&
	       lli->lli_sa_enabled) {
		struct lu_dirpage *dp;
		struct lu_dirent *ent;

		op_data = ll_prep_md_op_data(op_data, dir, dir, NULL, 0, 0,
					     LUSTRE_OPC_ANY, dir);
		if (IS_ERR(op_data)) {
			rc = PTR_ERR(op_data);

		page = ll_get_dir_page(dir, op_data, pos, NULL);
		ll_unlock_md_op_lsm(op_data);
			       "error reading dir "DFID" at %llu /%llu stat_pid = %u: rc = %d\n",
			       PFID(ll_inode2fid(dir)), pos, sai->sai_index,
			       lli->lli_stat_pid, rc);

		dp = page_address(page);
		for (ent = lu_dirent_start(dp);
		     /* matches smp_store_release() in ll_deauthorize_statahead() */
		     ent != NULL && smp_load_acquire(&sai->sai_task) &&
		     !sa_low_hit(sai) && lli->lli_sa_enabled;
		     ent = lu_dirent_next(ent)) {
			struct llcrypt_str lltr = LLTR_INIT(NULL, 0);

			hash = le64_to_cpu(ent->lde_hash);
			if (unlikely(hash < pos))
				/*
				 * Skip until we find target hash value.
				 */

			namelen = le16_to_cpu(ent->lde_namelen);
			if (unlikely(namelen == 0))
				/*
				 * Skip dummy record.
				 */

			name = ent->lde_name;
			/* skip "." and ".." and, unless ls -a, hidden files */
			if (name[0] == '.') {
				} else if (name[1] == '.' && namelen == 2) {
				} else if (!sai->sai_ls_all) {
					/*
					 * skip hidden files.
					 */
					sai->sai_skip_hidden++;

			/*
			 * don't stat-ahead first entry.
			 */
			if (unlikely(++first == 1))

			fid_le_to_cpu(&fid, &ent->lde_fid);

			/* wait for window space, servicing AGL meanwhile */
			while (({set_current_state(TASK_IDLE);
				 /* matches smp_store_release() in
				  * ll_deauthorize_statahead()
				  */
				 smp_load_acquire(&sai->sai_task); })) {
				spin_lock(&lli->lli_agl_lock);
				while (sa_sent_full(sai) &&
				       !agl_list_empty(sai)) {
					struct ll_inode_info *clli;

					__set_current_state(TASK_RUNNING);
					clli = agl_first_entry(sai);
					list_del_init(&clli->lli_agl_list);
					spin_unlock(&lli->lli_agl_lock);

					ll_agl_trigger(&clli->lli_vfs_inode,

					spin_lock(&lli->lli_agl_lock);
				spin_unlock(&lli->lli_agl_lock);

				if (!sa_sent_full(sai))

				/*
				 * If the thread is not doing stat in
				 * @sbi->ll_sa_timeout (30s) then it probably
				 * does not care too much about performance,
				 * or is no longer using this directory.
				 * Stop the statahead thread in this case.
				 */
				timeout = schedule_timeout(
					cfs_time_seconds(sbi->ll_sa_timeout));
					lli->lli_sa_enabled = 0;
			__set_current_state(TASK_RUNNING);

			if (IS_ENCRYPTED(dir)) {
				struct llcrypt_str de_name =
					LLTR_INIT(ent->lde_name, namelen);

				rc = llcrypt_fname_alloc_buffer(dir, NAME_MAX,

				fid_le_to_cpu(&fid, &ent->lde_fid);
				if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
					llcrypt_fname_free_buffer(&lltr);

			sa_statahead(sai, parent, name, namelen, &fid);
			llcrypt_fname_free_buffer(&lltr);

		pos = le64_to_cpu(dp->ldp_hash_end);
		ll_release_page(dir, page,
				le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);

		if (sa_low_hit(sai)) {
			atomic_inc(&sbi->ll_sa_wrong);
			       "Statahead for dir "DFID" hit ratio too low: hit/miss %llu/%llu, sent/replied %llu/%llu, stoppingstatahead thread: pid %d\n",
			       PFID(&lli->lli_fid), sai->sai_hit,
			       sai->sai_miss, sai->sai_sent,
			       sai->sai_replied, current->pid);

	ll_finish_md_op_data(op_data);
/*
 * throttle helper shared by the advise/fname patterns: wait for statahead
 * window space (servicing AGL entries while waiting), then issue the async
 * stat for @name; stops statahead if idle longer than ll_sa_timeout.
 */
static void ll_statahead_handle(struct ll_statahead_info *sai,
				struct dentry *parent, const char *name,
				int len, const struct lu_fid *fid)
	struct inode *dir = parent->d_inode;
	struct ll_inode_info *lli = ll_i2info(dir);
	struct ll_sb_info *sbi = ll_i2sbi(dir);

	while (({set_current_state(TASK_IDLE);
		 /* matches smp_store_release() in ll_deauthorize_statahead() */
		 smp_load_acquire(&sai->sai_task); })) {
		spin_lock(&lli->lli_agl_lock);
		while (sa_sent_full(sai) && !agl_list_empty(sai)) {
			struct ll_inode_info *clli;

			__set_current_state(TASK_RUNNING);
			clli = agl_first_entry(sai);
			list_del_init(&clli->lli_agl_list);
			spin_unlock(&lli->lli_agl_lock);

			ll_agl_trigger(&clli->lli_vfs_inode, sai);

			spin_lock(&lli->lli_agl_lock);
		spin_unlock(&lli->lli_agl_lock);

		if (!sa_sent_full(sai))

		/*
		 * If the thread is not doing a stat in 30s then it probably
		 * does not care too much about performance, or is no longer
		 * using this directory. Stop the statahead thread in this case.
		 */
		timeout = schedule_timeout(
			cfs_time_seconds(sbi->ll_sa_timeout));
			lli->lli_sa_enabled = 0;
	__set_current_state(TASK_RUNNING);

	sa_statahead(sai, parent, name, len, fid);
/*
 * ADVISE-pattern statahead: stat files named "<prefix><index>" for indices
 * in [sai_fstart, sai_fend], as requested via the ladvise ioctl.
 *
 * NOTE(review): elided listing — error paths and some statements between
 * the visible lines are not shown.
 */
1386 static int ll_statahead_by_advise(struct ll_statahead_info *sai,
1387 struct dentry *parent)
1389 struct inode *dir = parent->d_inode;
1390 struct ll_inode_info *lli = ll_i2info(dir);
1391 struct ll_sb_info *sbi = ll_i2sbi(dir);
1401 CDEBUG(D_READA, "%s: ADVISE statahead: parent %pd fname prefix %s\n",
1402 sbi->ll_fsname, parent, sai->sai_fname);
/* Scratch buffer for building "<prefix><number>" names. */
1404 OBD_ALLOC(fname, NAME_MAX);
1408 len = strlen(sai->sai_fname);
1409 memcpy(fname, sai->sai_fname, len);
/* max_len bounds the numeric suffix; sized from sai_fname capacity. */
1410 max_len = sizeof(sai->sai_fname) - len;
/* Loop until deauthorized (release pairing below) or range exhausted. */
1413 /* matches smp_store_release() in ll_deauthorize_statahead() */
1414 while (smp_load_acquire(&sai->sai_task) && lli->lli_sa_enabled) {
1417 numlen = snprintf(ptr, max_len, "%llu",
1418 sai->sai_fstart + i);
/* fid is NULL: the name alone identifies the target for lookup. */
1420 ll_statahead_handle(sai, parent, fname, len + numlen, NULL);
1421 if (++i >= sai->sai_fend)
1425 OBD_FREE(fname, NAME_MAX);
/*
 * FNAME-pattern statahead: predict the next file names by incrementing a
 * numeric suffix on a detected prefix (e.g. mdtest.<rank>.<i>) and stat
 * them ahead. Stops when deauthorized or when the hit ratio is too low.
 *
 * NOTE(review): elided listing — some statements between visible lines
 * are not shown.
 */
1429 static int ll_statahead_by_fname(struct ll_statahead_info *sai,
1430 struct dentry *parent)
1432 struct inode *dir = parent->d_inode;
1433 struct ll_inode_info *lli = ll_i2info(dir);
1434 struct ll_sb_info *sbi = ll_i2sbi(dir);
1443 CDEBUG(D_READA, "%s: FNAME statahead: parent %pd fname prefix %s\n",
1444 sbi->ll_fsname, parent, sai->sai_fname);
1446 OBD_ALLOC(fname, NAME_MAX);
1450 len = strlen(sai->sai_fname);
1451 memcpy(fname, sai->sai_fname, len);
1452 max_len = sizeof(sai->sai_fname) - len;
1455 /* matches smp_store_release() in ll_deauthorize_statahead() */
1456 while (smp_load_acquire(&sai->sai_task) && lli->lli_sa_enabled) {
/* Preserve zero padding ("%0*llu") when the observed names had it. */
1459 if (sai->sai_fname_zeroed_len)
1460 numlen = snprintf(ptr, max_len, "%0*llu",
1461 sai->sai_fname_zeroed_len,
1462 ++sai->sai_fname_index)
1464 numlen = snprintf(ptr, max_len, "%llu",
1465 ++sai->sai_fname_index);
1467 ll_statahead_handle(sai, parent, fname, len + numlen, NULL);
/* Bad prediction: account it and stop wasting RPCs. */
1469 if (sa_low_hit(sai)) {
1471 atomic_inc(&sbi->ll_sa_wrong);
1472 CDEBUG(D_CACHE, "%s: low hit ratio for %pd "DFID": hit=%llu miss=%llu sent=%llu replied=%llu, stopping PID %d\n",
1473 sbi->ll_fsname, parent, PFID(ll_inode2fid(dir)),
1474 sai->sai_hit, sai->sai_miss, sai->sai_sent,
1475 sai->sai_replied, current->pid);
1480 OBD_FREE(fname, NAME_MAX);
1484 /* statahead thread main function */
/*
 * Dispatches to the per-pattern worker (LIST/ADVISE/FNAME), then lingers
 * so cached entries remain usable, waits for inflight RPC replies, and
 * publishes hit/miss totals before tearing down.
 *
 * NOTE(review): elided listing — error labels (e.g. out_stop_agl) and
 * several statements are not visible between the shown lines.
 */
1485 static int ll_statahead_thread(void *arg)
1487 struct ll_statahead_info *sai = (struct ll_statahead_info *)arg;
1488 struct dentry *parent = sai->sai_dentry;
1489 struct inode *dir = parent->d_inode;
1490 struct ll_inode_info *lli = ll_i2info(dir);
1491 struct ll_sb_info *sbi = ll_i2sbi(dir);
1492 struct lu_batch *bh = NULL;
1497 CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
/* Optional MD batching of getattr RPCs, tuned by ll_sa_batch_max. */
1500 sai->sai_max_batch_count = sbi->ll_sa_batch_max;
1501 if (sai->sai_max_batch_count) {
1502 bh = md_batch_create(ll_i2mdexp(dir), BATCH_FL_RDONLY,
1503 sai->sai_max_batch_count);
1505 GOTO(out_stop_agl, rc = PTR_ERR(bh));
/* Dispatch on the detected access pattern. */
1510 switch (lli->lli_sa_pattern & LSA_PATTERN_MASK) {
1511 case LSA_PATTERN_LIST:
1512 rc = ll_statahead_by_list(parent);
1514 case LSA_PATTERN_ADVISE:
1515 rc = ll_statahead_by_advise(sai, parent);
1517 case LSA_PATTERN_FNAME:
1518 rc = ll_statahead_by_fname(sai, parent);
/* Clear sai_task under the lock so waiters see a consistent state. */
1526 spin_lock(&lli->lli_sa_lock);
1527 sai->sai_task = NULL;
1528 spin_unlock(&lli->lli_sa_lock);
1531 ll_statahead_flush_nowait(sai);
1534 * statahead is finished, but statahead entries need to be cached, wait
1535 * for file release closedir() call to stop me.
1537 while (({set_current_state(TASK_IDLE);
1538 /* matches smp_store_release() in ll_deauthorize_statahead() */
1539 smp_load_acquire(&sai->sai_task) && lli->lli_sa_enabled; })) {
1542 __set_current_state(TASK_RUNNING);
1547 rc = md_batch_stop(ll_i2mdexp(dir), sai->sai_bh);
1555 * wait for inflight statahead RPCs to finish, and then we can free sai
1556 * safely because statahead RPC will access sai data
1558 while (sai->sai_sent != sai->sai_replied)
1559 /* in case we're not woken up, timeout wait */
1562 CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd hit %llu miss %llu\n",
1563 sbi->ll_fsname, sai, parent, sai->sai_hit, sai->sai_miss);
1565 spin_lock(&lli->lli_sa_lock);
1566 sai->sai_task = NULL;
1567 spin_unlock(&lli->lli_sa_lock);
/* Wake any scanner blocked in revalidate waiting for entries. */
1568 wake_up(&sai->sai_waitq);
/* Fold per-run statistics into the superblock-wide counters. */
1570 atomic_add(sai->sai_hit, &sbi->ll_sa_hit_total);
1571 atomic_add(sai->sai_miss, &sbi->ll_sa_miss_total);
1574 ll_sax_put(dir, lli->lli_sax);
1579 /* authorize opened dir handle @key to statahead */
/*
 * Called on opendir: record the opener (@key, pid) and enable statahead
 * for @dir, unless a previous statahead instance is still winding down.
 */
1580 void ll_authorize_statahead(struct inode *dir, void *key)
1582 struct ll_inode_info *lli = ll_i2info(dir);
1584 spin_lock(&lli->lli_sa_lock);
/* Only the first opener with no live sai may authorize. */
1585 if (!lli->lli_opendir_key && !lli->lli_sai) {
1587 * if lli_sai is not NULL, it means previous statahead is not
1588 * finished yet, we'd better not start a new statahead for now.
1590 lli->lli_opendir_key = key;
1591 lli->lli_stat_pid = current->pid;
1592 lli->lli_sa_enabled = 1;
/* Remember that opendir() happened; LIST-pattern detection needs it. */
1593 lli->lli_sa_pattern |= LSA_PATTERN_OPENDIR;
1595 spin_unlock(&lli->lli_sa_lock);
/*
 * Deauthorize an ADVISE-pattern statahead: the sai hangs off the file
 * descriptor (@key is the ll_file_data), not off lli_sai. Tell the
 * thread to quit and drop the context reference.
 */
1598 static void ll_deauthorize_statahead_advise(struct inode *dir, void *key)
1600 struct ll_inode_info *lli = ll_i2info(dir);
1601 struct ll_file_data *fd = (struct ll_file_data *)key;
1602 struct ll_statahead_info *sai = fd->fd_sai;
1607 spin_lock(&lli->lli_sa_lock);
1608 if (sai->sai_task) {
1609 struct task_struct *task = sai->sai_task;
/* Release pairs with smp_load_acquire() in the thread's wait loops;
 * clear before waking so the thread observes the stop request. */
1611 /* matches smp_load_acquire() in ll_statahead_thread() */
1612 smp_store_release(&sai->sai_task, NULL);
1613 wake_up_process(task);
1616 spin_unlock(&lli->lli_sa_lock);
1618 LASSERT(lli->lli_sax != NULL);
1619 ll_sax_put(dir, lli->lli_sax);
1623 * deauthorize opened dir handle @key to statahead, and notify statahead thread
1624 * to quit if it's running.
/*
 * Called on closedir/release: reset all statahead pattern state for @dir
 * and stop the thread. ADVISE-pattern teardown is delegated because its
 * sai lives on the file handle rather than on the inode.
 */
1626 void ll_deauthorize_statahead(struct inode *dir, void *key)
1628 struct ll_inode_info *lli = ll_i2info(dir);
1629 struct ll_statahead_info *sai;
1631 CDEBUG(D_READA, "deauthorize statahead for "DFID"\n",
1632 PFID(&lli->lli_fid));
1634 if (lli->lli_sa_pattern & LSA_PATTERN_ADVISE) {
1635 ll_deauthorize_statahead_advise(dir, key);
1639 LASSERT(lli->lli_stat_pid != 0);
1640 LASSERT(lli->lli_opendir_key == key);
1641 spin_lock(&lli->lli_sa_lock);
/* Wipe authorization and pattern-detection state under the lock. */
1642 lli->lli_opendir_key = NULL;
1643 lli->lli_stat_pid = 0;
1644 lli->lli_sa_enabled = 0;
1645 lli->lli_sa_pattern = LSA_PATTERN_NONE;
1646 lli->lli_sa_fname_index = 0;
1647 lli->lli_sa_match_count = 0;
1649 if (sai && sai->sai_task) {
1651 * statahead thread may not have quit yet because it needs to
1652 * cache entries, now it's time to tell it to quit.
1654 * wake_up_process() provides the necessary barriers
1655 * to pair with set_current_state().
1657 struct task_struct *task = sai->sai_task;
1659 /* matches smp_load_acquire() in ll_statahead_thread() */
1660 smp_store_release(&sai->sai_task, NULL);
1661 wake_up_process(task);
1663 spin_unlock(&lli->lli_sa_lock);
/* Classification of @dentry relative to the first entry of its directory;
 * used by is_first_dirent() below to decide whether to start statahead.
 * NOTE(review): the enum opener and remaining enumerators are elided from
 * this listing — only LS_NOT_FIRST_DE's value (0) is visible. */
1668 * not first dirent, or is "."
1670 LS_NOT_FIRST_DE = 0,
1672 * the first non-hidden dirent
1676 * the first hidden dirent, that is "."
1681 /* file is first dirent under @dir */
/*
 * Walk @dir's readdir pages from hash 0 and decide whether @dentry names
 * the first (possibly hidden) entry. Returns an LS_* value (or negative
 * errno from page reads, per the visible error path).
 *
 * NOTE(review): elided listing — loop braces, some branches and the
 * final RETURN are not visible between the shown lines.
 */
1682 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
1684 struct qstr *target = &dentry->d_name;
1685 struct md_op_data *op_data;
1687 struct page *page = NULL;
1688 int rc = LS_NOT_FIRST_DE;
1690 struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
1694 op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
1695 LUSTRE_OPC_ANY, dir);
1696 if (IS_ERR(op_data))
1697 RETURN(PTR_ERR(op_data));
/* Encrypted dirs need a decode buffer for on-disk -> user names. */
1699 if (IS_ENCRYPTED(dir)) {
1700 int rc2 = llcrypt_fname_alloc_buffer(dir, NAME_MAX, &lltr);
1707 *FIXME choose the start offset of the readdir
1710 page = ll_get_dir_page(dir, op_data, 0, NULL);
1713 struct lu_dirpage *dp;
1714 struct lu_dirent *ent;
1717 struct ll_inode_info *lli = ll_i2info(dir);
1720 CERROR("%s: reading dir "DFID" at %llu stat_pid = %u : rc = %d\n",
1721 ll_i2sbi(dir)->ll_fsname,
1722 PFID(ll_inode2fid(dir)), pos,
1723 lli->lli_stat_pid, rc);
1727 dp = page_address(page);
/* Scan every dirent on this page. */
1728 for (ent = lu_dirent_start(dp); ent != NULL;
1729 ent = lu_dirent_next(ent)) {
1734 hash = le64_to_cpu(ent->lde_hash);
1736 * The ll_get_dir_page() can return any page containing
1737 * the given hash which may be not the start hash.
1739 if (unlikely(hash < pos))
1742 namelen = le16_to_cpu(ent->lde_namelen);
1743 if (unlikely(namelen == 0))
1745 * skip dummy record.
1749 name = ent->lde_name;
/* Classify "." / ".." and other dot-files. */
1750 if (name[0] == '.') {
1756 else if (name[1] == '.' && namelen == 2)
/* Hidden entry but target is not hidden: skip it, keep scanning. */
1767 if (dot_de && target->name[0] != '.') {
1768 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
1769 target->len, target->name,
/* Decode the on-disk name before comparing, for encrypted dirs. */
1774 if (IS_ENCRYPTED(dir)) {
1775 struct llcrypt_str de_name =
1776 LLTR_INIT(ent->lde_name, namelen);
1779 fid_le_to_cpu(&fid, &ent->lde_fid);
1780 if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
/* First real entry found: compare against the target name. */
1787 if (target->len != namelen ||
1788 memcmp(target->name, name, namelen) != 0)
1789 rc = LS_NOT_FIRST_DE;
1793 rc = LS_FIRST_DOT_DE;
1795 ll_release_page(dir, page, false);
1798 pos = le64_to_cpu(dp->ldp_hash_end);
1799 if (pos == MDS_DIR_END_OFF) {
1801 * End of directory reached.
1803 ll_release_page(dir, page, false);
1807 * chain is exhausted
1808 * Normal case: continue to the next page.
1810 ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) &
1812 page = ll_get_dir_page(dir, op_data, pos, NULL);
1817 llcrypt_fname_free_buffer(&lltr);
1818 ll_finish_md_op_data(op_data);
/*
 * Find the sai started by process @pid on @ctx's sai list.
 * Caller must hold lli_sa_lock (per the "_locked" suffix and the callers
 * visible below). NOTE(review): the return statements are elided here.
 */
1823 static struct ll_statahead_info *
1824 ll_find_sai_locked(struct ll_statahead_context *ctx, pid_t pid)
1826 struct ll_statahead_info *sai;
1828 list_for_each_entry(sai, &ctx->sax_sai_list, sai_item) {
1829 if (sai->sai_pid == pid)
/* Forward declaration: needed because ll_shared_statahead_check() below
 * (re)starts a statahead thread for the shared-dir case. */
1835 static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
/*
 * Shared-dir (multi-process FNAME) statahead check: if the current pid
 * has no sai on @ctx yet, switch the dir to the FN_SHARED pattern and
 * spin up a statahead thread for this process.
 *
 * NOTE(review): elided listing — some branches/returns are not visible.
 */
1838 static int ll_shared_statahead_check(struct inode *dir, struct dentry *dentry,
1839 struct ll_statahead_context *ctx)
1841 struct ll_inode_info *lli = ll_i2info(dir);
1842 struct ll_statahead_info *sai;
1846 spin_lock(&lli->lli_sa_lock);
/* Our own thread already runs: nothing to do. */
1849 if (sai->sai_pid == current->pid) {
1850 spin_unlock(&lli->lli_sa_lock);
1853 lli->lli_sai = NULL;
1854 lli->lli_sa_pattern |= LSA_PATTERN_FN_SHARED;
1857 LASSERT(lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED);
1858 sai = ll_find_sai_locked(ctx, current->pid);
1860 spin_unlock(&lli->lli_sa_lock);
1864 spin_unlock(&lli->lli_sa_lock);
1866 RETURN(start_statahead_thread(dir, dentry, true));
1870 * revalidate @dentryp from statahead cache
1872 * \param[in] dir parent directory
1873 * \param[in] sai sai structure
1874 * \param[out] dentryp pointer to dentry which will be revalidated
1875 * \param[in] unplug unplug statahead window only (normally for negative
1877 * \retval 1 on success, dentry is saved in @dentryp
1878 * \retval 0 if revalidation failed (no proper lock on client)
1879 * \retval negative number upon error
/* NOTE(review): elided listing — labels (e.g. "out"), braces and several
 * statements are not visible between the shown lines. The doc above says
 * "sai" but the implementation takes the statahead context @ctx and
 * resolves the sai from the matched entry. */
1881 static int revalidate_statahead_dentry(struct inode *dir,
1882 struct ll_statahead_context *ctx,
1883 struct dentry **dentryp,
1886 struct sa_entry *entry = NULL;
1887 struct ll_inode_info *lli = ll_i2info(dir);
1888 struct ll_statahead_info *sai = lli->lli_sai;
/* Hidden-entry bookkeeping: "ls -a" style scans toggle sai_ls_all. */
1893 if (sai && (*dentryp)->d_name.name[0] == '.') {
1894 if (sai->sai_ls_all ||
1895 sai->sai_miss_hidden >= sai->sai_skip_hidden) {
1897 * Hidden dentry is the first one, or statahead
1898 * thread does not skip so many hidden dentries
1899 * before "sai_ls_all" enabled as below.
1902 if (!sai->sai_ls_all)
1904 * It maybe because hidden dentry is not
1905 * the first one, "sai_ls_all" was not
1906 * set, then "ls -al" missed. Enable
1907 * "sai_ls_all" for such case.
1909 sai->sai_ls_all = 1;
1912 * Such "getattr" has been skipped before
1913 * "sai_ls_all" enabled as above.
1915 sai->sai_miss_hidden++;
/* Look the name up in the statahead cache. */
1923 entry = sa_get(ctx, &(*dentryp)->d_name);
/* Miss: for FNAME pattern, maybe restart as a shared-dir statahead. */
1925 if (lli->lli_sa_pattern & LSA_PATTERN_FNAME)
1926 rc = ll_shared_statahead_check(dir, *dentryp, ctx);
1927 GOTO(out, rc = rc == 0 ? -EAGAIN : rc);
/* Resolve which sai owns this entry, depending on the pattern. */
1930 if (lli->lli_sa_pattern & LSA_PATTERN_LIST)
1931 LASSERT(sai == entry->se_sai);
1932 else if (lli->lli_sa_pattern & LSA_PATTERN_FNAME ||
1933 lli->lli_sa_pattern == LSA_PATTERN_ADVISE)
1934 sai = entry->se_sai;
1936 LASSERTF(sai != NULL, "pattern %#X entry %p se_sai %p %pd lli %p\n",
1937 lli->lli_sa_pattern, entry, entry->se_sai, *dentryp, lli);
/* Entry still inflight: record interest and wait up to 30s. */
1938 if (!sa_ready(entry)) {
1939 spin_lock(&lli->lli_sa_lock);
1940 sai->sai_index_wait = entry->se_index;
1941 spin_unlock(&lli->lli_sa_lock);
1942 rc = wait_event_idle_timeout(sai->sai_waitq, sa_ready(entry),
1943 cfs_time_seconds(30));
1946 * entry may not be ready, so it may be used by inflight
1947 * statahead RPC, don't free it.
1950 GOTO(out, rc = -EAGAIN);
1955 * We need to see the value that was set immediately before we
/* Acquire pairs with the release that publishes the entry state. */
1958 if (smp_load_acquire(&entry->se_state) == SA_ENTRY_SUCC &&
1960 struct inode *inode = entry->se_inode;
1961 struct lookup_intent it = { .it_op = IT_GETATTR,
1966 rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
1967 ll_inode2fid(inode), &bits);
1969 if (!(*dentryp)->d_inode) {
1970 struct dentry *alias;
/* Negative dentry: attach the statahead-prepared inode. */
1972 alias = ll_splice_alias(inode, *dentryp);
1973 if (IS_ERR(alias)) {
1974 ll_intent_release(&it);
1975 GOTO(out, rc = PTR_ERR(alias));
1979 * statahead prepared this inode, transfer inode
1980 * refcount from sa_entry to dentry
1982 entry->se_inode = NULL;
1983 } else if ((*dentryp)->d_inode != inode) {
1984 /* revalidate, but inode is recreated */
1986 "%s: stale dentry %pd inode " DFID", statahead inode "DFID "\n",
1987 ll_i2sbi(inode)->ll_fsname, *dentryp,
1988 PFID(ll_inode2fid((*dentryp)->d_inode)),
1989 PFID(ll_inode2fid(inode)));
1990 ll_intent_release(&it);
1991 GOTO(out, rc = -ESTALE);
/* A LOOKUP lock lets us mark the dentry valid in the dcache. */
1994 if (bits & MDS_INODELOCK_LOOKUP) {
1995 d_lustre_revalidate(*dentryp);
1996 if (S_ISDIR(inode->i_mode))
1997 ll_update_dir_depth_dmv(dir, *dentryp);
2000 ll_intent_release(&it);
2005 * statahead cached sa_entry can be used only once, and will be killed
2006 * right after use, so if lookup/revalidate accessed statahead cache,
2007 * set dentry ldd_sa_generation to parent lli_sa_generation, later if we
2008 * stat this file again, we know we've done statahead before, see
2009 * dentry_may_statahead().
2011 if (lld_is_init(*dentryp))
2012 ll_d2d(*dentryp)->lld_sa_generation = lli->lli_sa_generation;
2013 sa_put(dir, sai, entry);
/*
 * Detect the directory-listing ("ls -l") pattern: requires a prior
 * opendir() (LSA_PATTERN_OPENDIR) and @dchild being the first dirent.
 * On match, sets LSA_PATTERN_LIST; on a definite non-match, disables
 * statahead for the opener pid. *first receives the LS_* classification.
 * NOTE(review): return type/statements are elided from this listing.
 */
2019 sa_pattern_list_detect(struct inode *dir, struct dentry *dchild, int *first)
2021 struct ll_inode_info *lli = ll_i2info(dir);
2023 if (lli->lli_stat_pid == 0)
2026 /* Directory listing needs to call opendir()/readdir()/stat(). */
2027 if (!(lli->lli_sa_pattern & LSA_PATTERN_OPENDIR))
2030 if (lli->lli_sa_enabled == 0)
/* Already decided this is not a listing: don't rescan the dir. */
2033 if (lli->lli_sa_pattern & LSA_PATTERN_LS_NOT_FIRST_DE)
2036 *first = is_first_dirent(dir, dchild);
2037 if (*first == LS_NOT_FIRST_DE) {
2039 * It is not "ls -{a}l" operation, no need statahead for it.
2040 * Disable statahead so that subsequent stat() won't waste
2043 spin_lock(&lli->lli_sa_lock);
/* Only the authorizing pid may disable; another opener may differ. */
2044 if (lli->lli_stat_pid == current->pid) {
2045 lli->lli_sa_enabled = 0;
2046 lli->lli_sa_pattern |= LSA_PATTERN_LS_NOT_FIRST_DE;
2048 spin_unlock(&lli->lli_sa_lock);
2052 spin_lock(&lli->lli_sa_lock);
2053 lli->lli_sa_pattern |= LSA_PATTERN_LIST;
2054 spin_unlock(&lli->lli_sa_lock);
/*
 * Detect the numeric-suffix file-name pattern (e.g. mdtest.$rank.$i):
 * count consecutive stat() calls whose names end in an incrementing
 * number; after LSA_FN_MATCH_HIT matches, enable LSA_PATTERN_FNAME.
 * A second pid stating the same dir flips it to FN_SHARED instead.
 * NOTE(review): return type and several statements are elided here.
 */
2059 sa_pattern_fname_detect(struct inode *dir, struct dentry *dchild)
2061 struct ll_inode_info *lli = ll_i2info(dir);
2062 struct qstr *dname = &dchild->d_name;
2063 const unsigned char *name = dname->name;
/* Feature gate: llite.*.enable_statahead_fname tunable. */
2067 if (ll_i2sbi(dir)->ll_enable_statahead_fname == 0)
2069 if (lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED)
2073 * Parse the format of the file name to determine whether it matches
2074 * the supported file name pattern for statahead (i.e. mdtest.$rank.$i).
2077 if (isdigit(name[i])) {
2081 if (lli->lli_stat_pid == 0) {
2082 lli->lli_stat_pid = current->pid;
2083 } else if (lli->lli_stat_pid != current->pid) {
2085 * More than two processes (MPI ranks) doing stat()
2086 * calls under this directory, consider it as a mdtest
2087 * shared dir stat() workload.
2089 spin_lock(&lli->lli_sa_lock);
2090 lli->lli_stat_pid = current->pid;
2091 if (lli->lli_sa_pattern & LSA_PATTERN_FNAME) {
2092 lli->lli_sai = NULL;
2095 lli->lli_sa_pattern |= LSA_PATTERN_FNAME;
2098 lli->lli_sa_pattern |= LSA_PATTERN_FN_SHARED;
2099 spin_unlock(&lli->lli_sa_lock);
/* Walk back over the trailing digits to find the numeric suffix. */
2103 while (--i >= 0 && isdigit(name[i]))
2106 ret = kstrtol(&name[i], 0, &num);
2111 * The traversing program do multiple stat() calls on the same
2112 * children entry. i.e. ls $dir*.
2114 if (lli->lli_sa_fname_index == num)
/* First observation, or suffix incremented by one: count a match. */
2117 if (lli->lli_sa_match_count == 0 ||
2118 num == lli->lli_sa_fname_index + 1) {
2119 lli->lli_sa_match_count++;
2120 lli->lli_sa_fname_index = num;
2122 if (lli->lli_sa_match_count > LSA_FN_MATCH_HIT)
2123 GOTO(out, rc = true);
2129 spin_lock(&lli->lli_sa_lock);
2131 lli->lli_sa_pattern |= LSA_PATTERN_FNAME;
/* Non-match path: reset all detection state and disable statahead. */
2133 lli->lli_sa_pattern = LSA_PATTERN_NONE;
2134 lli->lli_sa_match_count = 0;
2135 lli->lli_sa_fname_index = 0;
2136 lli->lli_sa_enabled = 0;
2138 spin_unlock(&lli->lli_sa_lock);
2143 /* detect the statahead pattern. */
/* Try the listing pattern first, then the numeric-fname pattern;
 * short-circuits on the first detector that matches. */
2145 sa_pattern_detect(struct inode *dir, struct dentry *dchild, int *first)
2147 return sa_pattern_list_detect(dir, dchild, first) ||
2148 sa_pattern_fname_detect(dir, dchild);
/*
 * Add @sai to context @ctx's sai list; fails if a sai for the same pid
 * is already present. Callers visible below hold lli_sa_lock around it.
 * NOTE(review): the failure return value is elided from this listing.
 */
2151 static inline int ll_sax_add_sai(struct ll_statahead_context *ctx,
2152 struct ll_statahead_info *sai)
2154 if (ll_find_sai_locked(ctx, sai->sai_pid) != NULL)
2157 list_add_tail(&sai->sai_item, &ctx->sax_sai_list);
2162 * start statahead thread
2164 * \param[in] dir parent directory
2165 * \param[in] dentry dentry that triggers statahead, normally the first
2167 * \param[in] agl indicate whether AGL is needed
2168 * \retval -EAGAIN on success, because when this function is
2169 * called, it's already in lookup call, so client should
2170 * do it itself instead of waiting for statahead thread
2171 * to do it asynchronously.
2172 * \retval negative number upon error
/* NOTE(review): elided listing — GOTO labels ("out" etc.), braces and
 * several statements between the visible lines are not shown. */
2174 static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
2177 int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
2178 struct ll_inode_info *lli = ll_i2info(dir);
2179 struct ll_statahead_info *sai = NULL;
2180 struct ll_statahead_context *ctx = NULL;
2181 struct dentry *parent;
2182 struct task_struct *task;
2183 struct ll_sb_info *sbi;
2184 int first = LS_FIRST_DE;
/* Only proceed when a supported access pattern has been detected. */
2189 if (sa_pattern_detect(dir, dentry, &first) == false)
2192 parent = dget_parent(dentry);
2193 sbi = ll_i2sbi(d_inode(parent));
/* Global throttle on concurrent statahead threads per superblock. */
2194 if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
2195 sbi->ll_sa_running_max)) {
2197 "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n");
2199 GOTO(out, rc = -EMFILE);
2202 /* on success ll_sai_alloc holds a ref on parent */
2203 sai = ll_sai_alloc(parent);
2206 GOTO(out, rc = -ENOMEM);
2208 sai->sai_ls_all = (first == LS_FIRST_DOT_DE);
2209 sai->sai_pid = current->pid;
/* FNAME pattern: split the triggering name into prefix + numeric
 * suffix, recording zero-padding width for later regeneration. */
2211 if (lli->lli_sa_pattern & LSA_PATTERN_FNAME) {
2212 struct qstr *dname = &dentry->d_name;
2213 const unsigned char *name = dname->name;
2217 if (dname->len >= sizeof(sai->sai_fname))
2218 GOTO(out, rc = -ERANGE);
2221 while (--i >= 0 && isdigit(name[i]))
2224 rc = kstrtol(&name[i], 0, &num);
2228 memcpy(sai->sai_fname, dname->name, i);
2229 sai->sai_fname[i] = '\0';
2230 sai->sai_fname_index = num;
2231 /* The front part of the file name is zeroed padding. */
2233 sai->sai_fname_zeroed_len = dname->len - i;
2236 /* The workload like directory listing or mdtest unique dir stat() */
2237 if (lli->lli_sa_pattern & LSA_PATTERN_LIST ||
2238 (lli->lli_sa_pattern & (LSA_PATTERN_FN_SHARED |
2239 LSA_PATTERN_FNAME)) == LSA_PATTERN_FNAME) {
2240 ctx = ll_sax_alloc(dir);
2242 GOTO(out, rc = -ENOMEM);
2245 * if current lli_opendir_key was deauthorized, or dir
2246 * re-opened by another process, don't start statahead,
2247 * otherwise the newly spawned statahead thread won't be
2250 spin_lock(&lli->lli_sa_lock);
2251 if (unlikely(lli->lli_sai || lli->lli_sax ||
2252 ((lli->lli_sa_pattern & LSA_PATTERN_LIST) &&
2253 !lli->lli_opendir_key &&
2254 lli->lli_stat_pid != current->pid))) {
2255 spin_unlock(&lli->lli_sa_lock);
2256 GOTO(out, rc = -EPERM);
2258 rc = ll_sax_add_sai(ctx, sai);
2260 spin_unlock(&lli->lli_sa_lock);
2265 spin_unlock(&lli->lli_sa_lock);
2266 } else if (lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED) {
2267 /* For mdtest shared dir stat() workload */
2268 LASSERT(lli->lli_sa_pattern & LSA_PATTERN_FNAME);
/* Reuse an existing context if another rank already made one. */
2269 ctx = ll_sax_get(dir);
2271 ctx = ll_sax_alloc(dir);
2273 GOTO(out, rc = -ENOMEM);
2275 spin_lock(&lli->lli_sa_lock);
2277 struct ll_statahead_context *tmp = ctx;
2279 if (lli->lli_sa_pattern &
2280 LSA_PATTERN_FN_SHARED) {
2283 rc = ll_sax_add_sai(ctx, sai);
2285 CWARN("%s: invalid pattern %#X.\n",
2287 lli->lli_sa_pattern);
2291 spin_unlock(&lli->lli_sa_lock);
2297 rc = ll_sax_add_sai(ctx, sai);
2298 spin_unlock(&lli->lli_sa_lock);
/* Pattern could have changed since ll_sax_get(): recheck. */
2301 spin_lock(&lli->lli_sa_lock);
2302 if (!(lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED)) {
2303 spin_unlock(&lli->lli_sa_lock);
2304 GOTO(out, rc = -EINVAL);
2307 rc = ll_sax_add_sai(ctx, sai);
2308 spin_unlock(&lli->lli_sa_lock);
2314 CERROR("%s: unsupported statahead pattern %#X.\n",
2315 sbi->ll_fsname, lli->lli_sa_pattern);
2316 GOTO(out, rc = -EOPNOTSUPP);
2319 CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n",
2320 current->pid, parent);
/* Spawn the worker on a CPT-spread node; it runs ll_statahead_thread. */
2322 task = kthread_create_on_node(ll_statahead_thread, sai, node,
2323 "ll_sa_%u", lli->lli_stat_pid);
2325 spin_lock(&lli->lli_sa_lock);
2326 lli->lli_sai = NULL;
2327 spin_unlock(&lli->lli_sa_lock);
2329 CERROR("can't start ll_sa thread, rc: %d\n", rc);
2333 if (test_bit(LL_SBI_AGL_ENABLED, sbi->ll_flags) && agl)
2334 ll_start_agl(parent, sai);
2336 atomic_inc(&sbi->ll_sa_total);
2337 if (lli->lli_sa_pattern & LSA_PATTERN_LIST)
2338 atomic_inc(&sbi->ll_sa_list_total);
2339 else if (lli->lli_sa_pattern & LSA_PATTERN_FNAME)
2340 atomic_inc(&sbi->ll_sa_fname_total);
/* Publish the task and kick it; the thread's acquire loads pair with
 * the deauthorize-side release stores. */
2342 sai->sai_task = task;
2343 wake_up_process(task);
2345 * We don't stat-ahead for the first dirent since we are already in
2352 * once we start statahead thread failed, disable statahead so that
2353 * subsequent stat won't waste time to try it.
2355 spin_lock(&lli->lli_sa_lock);
2356 if (lli->lli_stat_pid == current->pid)
2357 lli->lli_sa_enabled = 0;
2358 spin_unlock(&lli->lli_sa_lock);
2367 atomic_dec(&sbi->ll_sa_running);
2373 * Check whether statahead for @dir was started.
/* Warns if the caller's AGL hint differs from the running instance's.
 * NOTE(review): the return statement is elided from this listing. */
2375 static inline bool ll_statahead_started(struct inode *dir, bool agl)
2377 struct ll_inode_info *lli = ll_i2info(dir);
2378 struct ll_statahead_context *ctx;
2379 struct ll_statahead_info *sai;
2381 spin_lock(&lli->lli_sa_lock);
2384 if (sai && (sai->sai_agl_task != NULL) != agl)
2386 "%s: Statahead AGL hint changed from %d to %d\n",
2387 ll_i2sbi(dir)->ll_fsname,
2388 sai->sai_agl_task != NULL, agl);
2389 spin_unlock(&lli->lli_sa_lock);
2395 * statahead entry function, this is called when client getattr on a file, it
2396 * will start statahead thread if this is the first dir entry, else revalidate
2397 * dentry from statahead cache.
2399 * \param[in] dir parent directory
2400 * \param[out] dentryp dentry to getattr
2401 * \param[in] agl whether start the agl thread
2403 * \retval 1 on success
2404 * \retval 0 revalidation from statahead cache failed, caller needs
2405 * to getattr from server directly
2406 * \retval negative number on error, caller often ignores this and
2407 * then getattr from server
/* NOTE(review): the not-yet-started branch's alternative return is
 * elided from this listing. */
2409 int ll_start_statahead(struct inode *dir, struct dentry *dentry, bool agl)
2411 if (!ll_statahead_started(dir, agl))
2412 return start_statahead_thread(dir, dentry, agl);
2417 * revalidate dentry from statahead cache.
2419 * \param[in] dir parent directory
2420 * \param[out] dentryp dentry to getattr
2421 * \param[in] unplug unplug statahead window only (normally for negative
2423 * \retval 1 on success
2424 * \retval 0 revalidation from statahead cache failed, caller needs
2425 * to getattr from server directly
2426 * \retval negative number on error, caller often ignores this and
2427 * then getattr from server
/* NOTE(review): elided listing — the context lookup and some returns
 * between the visible lines are not shown. */
2429 int ll_revalidate_statahead(struct inode *dir, struct dentry **dentryp,
2432 struct ll_inode_info *lli = ll_i2info(dir);
2433 struct ll_statahead_context *ctx;
2434 struct ll_statahead_info *sai = NULL;
2437 spin_lock(&lli->lli_sa_lock);
/* Pin the sai while we revalidate outside the lock. */
2442 atomic_inc(&sai->sai_refcount);
2443 } else if (lli->lli_sa_pattern & LSA_PATTERN_LIST) {
2444 spin_unlock(&lli->lli_sa_lock);
2449 spin_unlock(&lli->lli_sa_lock);
2451 rc = revalidate_statahead_dentry(dir, ctx, dentryp, unplug);
2452 CDEBUG(D_READA, "revalidate statahead %pd: rc = %d.\n",
/* Drop the context reference taken above. */
2456 ll_sax_put(dir, ctx);
/*
 * Handle the LU_LADVISE2 "ahead" ioctl: start an ADVISE-pattern statahead
 * thread that pre-stats "<lla_fname><i>" for i in [lla_start, lla_end].
 * The sai is attached to the file handle (fd->fd_sai) so it can be torn
 * down via ll_deauthorize_statahead_advise() on close.
 *
 * NOTE(review): elided listing — GOTO labels, braces and several
 * statements between the visible lines are not shown.
 */
2461 int ll_ioctl_ahead(struct file *file, struct llapi_lu_ladvise2 *ladvise)
2463 int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
2464 struct ll_file_data *fd = file->private_data;
2465 struct dentry *dentry = file_dentry(file);
2466 struct inode *dir = dentry->d_inode;
2467 struct ll_inode_info *lli = ll_i2info(dir);
2468 struct ll_sb_info *sbi = ll_i2sbi(dir);
2469 struct ll_statahead_info *sai = NULL;
2470 struct ll_statahead_context *ctx = NULL;
2471 struct task_struct *task;
/* Statahead disabled entirely when ll_sa_max is 0. */
2477 if (sbi->ll_sa_max == 0)
2480 if (!S_ISDIR(dir->i_mode))
/* One advise hint per open dir handle. */
2485 CWARN("%s: already set statahead hint for dir %pd: rc = %d\n",
2486 sbi->ll_fsname, dentry, rc);
2490 if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
2491 sbi->ll_sa_running_max)) {
2493 "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n");
2494 GOTO(out, rc = -EMFILE);
2497 sai = ll_sai_alloc(dentry);
2499 GOTO(out, rc = -ENOMEM);
/* Copy the advised range and name prefix from userspace's request. */
2501 sai->sai_fstart = ladvise->lla_start;
2502 sai->sai_fend = ladvise->lla_end;
2503 sai->sai_ls_all = 0;
2504 sai->sai_max = sbi->ll_sa_max;
2505 strncpy(sai->sai_fname, ladvise->lla_fname, sizeof(sai->sai_fname));
2507 ctx = ll_sax_get(dir);
2509 ctx = ll_sax_alloc(dir);
2511 GOTO(out, rc = -ENOMEM);
2513 spin_lock(&lli->lli_sa_lock);
/* Racing context creation: keep ours only if pattern is compatible. */
2514 if (unlikely(lli->lli_sax)) {
2515 struct ll_statahead_context *tmp = ctx;
2517 if (lli->lli_sa_pattern == LSA_PATTERN_NONE ||
2518 lli->lli_sa_pattern == LSA_PATTERN_ADVISE) {
2519 lli->lli_sa_pattern = LSA_PATTERN_ADVISE;
2522 fd->fd_sai = __ll_sai_get(sai);
2526 CWARN("%s: pattern %X is not ADVISE: rc = %d\n",
2527 sbi->ll_fsname, lli->lli_sa_pattern, rc);
2530 spin_unlock(&lli->lli_sa_lock);
2535 lli->lli_sa_pattern = LSA_PATTERN_ADVISE;
/* The fd holds its own sai reference for later deauthorize. */
2537 fd->fd_sai = __ll_sai_get(sai);
2538 spin_unlock(&lli->lli_sa_lock);
2541 spin_lock(&lli->lli_sa_lock);
2542 if (!(lli->lli_sa_pattern == LSA_PATTERN_ADVISE ||
2543 lli->lli_sa_pattern == LSA_PATTERN_NONE)) {
2544 spin_unlock(&lli->lli_sa_lock);
2545 GOTO(out, rc = -EINVAL);
2548 lli->lli_sa_pattern = LSA_PATTERN_ADVISE;
2549 fd->fd_sai = __ll_sai_get(sai);
2550 spin_unlock(&lli->lli_sa_lock);
2555 "start statahead thread: [pid %d] [parent %pd] sai %p ctx %p\n",
2556 current->pid, dentry, sai, ctx);
2558 task = kthread_create_on_node(ll_statahead_thread, sai, node,
2559 "ll_sa_%u", current->pid);
2562 CERROR("%s: cannot start ll_sa thread: rc = %d\n",
2563 sbi->ll_fsname, rc);
2567 if (test_bit(LL_SBI_AGL_ENABLED, sbi->ll_flags) && agl)
2568 ll_start_agl(dentry, sai);
2570 atomic_inc(&sbi->ll_sa_total);
2571 sai->sai_task = task;
2572 wake_up_process(task);
2578 ll_sax_put(dir, ctx);
2588 atomic_dec(&sbi->ll_sa_running);
2593 * This function is called in each stat() system call to do statahead check.
2594 * When the files' naming of stat() call sequence under a directory follows
2595 * a certain name rule roughly, this directory is considered as an condicant
2597 * For an example, the file naming rule is mdtest.$rank.$i, the suffix of
2598 * the stat() dentry name is number and do stat() for dentries with name
2599 * ending with number more than @LSA_FN_PREDICT_HIT, then the corresponding
2600 * directory is met the requrirement for statahead.
2602 void ll_statahead_enter(struct inode *dir, struct dentry *dchild)
2604 struct ll_inode_info *lli;
2605 struct qstr *dname = &dchild->d_name;
2607 if (ll_i2sbi(dir)->ll_sa_max == 0)
2610 if (ll_i2sbi(dir)->ll_enable_statahead_fname == 0)
2613 lli = ll_i2info(dir);
2614 if (lli->lli_sa_enabled)
2617 if (lli->lli_sa_pattern & (LSA_PATTERN_FN_PREDICT | LSA_PATTERN_LIST))
2621 * Now support number indexing regularized statahead pattern only.
2622 * Quick check whether the last character is digit.
2624 if (!isdigit(dname->name[dname->len - 1])) {
2625 lli->lli_sa_match_count = 0;
2629 lli->lli_sa_match_count++;
2630 if (lli->lli_sa_match_count > LSA_FN_PREDICT_HIT) {
2631 spin_lock(&lli->lli_sa_lock);
2632 lli->lli_sa_pattern |= LSA_PATTERN_FN_PREDICT;
2633 spin_unlock(&lli->lli_sa_lock);
2634 lli->lli_sa_enabled = 1;
2635 lli->lli_sa_match_count = 0;