4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
33 #include <linux/sched.h>
34 #include <linux/kthread.h>
36 #include <linux/highmem.h>
37 #include <linux/pagemap.h>
38 #include <linux/delay.h>
40 #define DEBUG_SUBSYSTEM S_LLITE
42 #include <obd_support.h>
43 #include <lustre_dlm.h>
44 #include "llite_internal.h"
46 #define SA_OMITTED_ENTRY_MAX 8ULL
49 /** negative values are for error cases */
50 SA_ENTRY_INIT = 0, /** init entry */
51 SA_ENTRY_SUCC = 1, /** stat succeed */
52 SA_ENTRY_INVA = 2, /** invalid entry */
56 * sa_entry is not refcounted: statahead thread allocates it and do async stat,
57 * and in async stat callback ll_statahead_interpret() will prepare the inode
58 * and set lock data in the ptlrpcd context. Then the scanner process will be
59 * woken up if this entry is the waiting one, can access and free it.
62 /* link into sai_entries */
63 struct list_head se_list;
64 /* link into sai hash table locally */
65 struct list_head se_hash;
66 /* entry index in the sai */
68 /* low layer ldlm lock handle */
72 /* entry size, contains name */
74 /* pointer to the target inode */
75 struct inode *se_inode;
82 static unsigned int sai_generation;
83 static DEFINE_SPINLOCK(sai_generation_lock);
85 static inline int sa_unhashed(struct sa_entry *entry)
/* An empty se_hash list head means the entry is not linked into sai_cache. */
87 return list_empty(&entry->se_hash);
90 /* sa_entry is ready to use */
91 static inline int sa_ready(struct sa_entry *entry)
93 /* Make sure sa_entry is updated and ready to use */
/* NOTE(review): se_state is published with smp_store_release() in
 * __sa_make_ready(); a matching acquire read is presumably done here
 * (the exact read is not visible in this excerpt) — confirm. */
95 return (entry->se_state != SA_ENTRY_INIT);
98 /* hash value to put in sai_cache */
99 static inline int sa_hash(int val)
/* Masking maps an arbitrary name hash to a sai_cache bucket index. */
101 return val & LL_SA_CACHE_MASK;
104 /* hash entry into sai_cache */
106 sa_rehash(struct ll_statahead_info *sai, struct sa_entry *entry)
108 int i = sa_hash(entry->se_qstr.hash);
/* Per-bucket spinlock serializes hash-chain updates for this bucket only. */
110 spin_lock(&sai->sai_cache_lock[i]);
111 list_add_tail(&entry->se_hash, &sai->sai_cache[i]);
112 spin_unlock(&sai->sai_cache_lock[i]);
115 /* unhash entry from sai_cache */
117 sa_unhash(struct ll_statahead_info *sai, struct sa_entry *entry)
119 int i = sa_hash(entry->se_qstr.hash);
/* list_del_init() leaves se_hash empty so sa_unhashed() reports true. */
121 spin_lock(&sai->sai_cache_lock[i]);
122 list_del_init(&entry->se_hash);
123 spin_unlock(&sai->sai_cache_lock[i]);
126 static inline int agl_should_run(struct ll_statahead_info *sai,
/* AGL (async glimpse lock) only applies to regular files, and only while
 * the agl kthread (sai_agl_task) is running. */
129 return inode && S_ISREG(inode->i_mode) && sai->sai_agl_task;
132 static inline struct ll_inode_info *
133 agl_first_entry(struct ll_statahead_info *sai)
/* Caller must guarantee sai_agls is non-empty (see agl_list_empty()). */
135 return list_first_entry(&sai->sai_agls, struct ll_inode_info,
139 /* statahead window is full */
140 static inline int sa_sent_full(struct ll_statahead_info *sai)
/* sai_max is the current window size; it grows adaptively in sa_put(). */
142 return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
145 /* Batch metadata handle */
146 static inline bool sa_has_batch_handle(struct ll_statahead_info *sai)
/* True when batched RPCs are in use (sai_bh created by md_batch_create()). */
148 return sai->sai_bh != NULL;
151 static inline void ll_statahead_flush_nowait(struct ll_statahead_info *sai)
/* Push any queued batch getattr items to the MDT without waiting for the
 * replies; the flush return value is intentionally ignored. */
153 if (sa_has_batch_handle(sai)) {
154 sai->sai_index_end = sai->sai_index - 1;
155 (void) md_batch_flush(ll_i2mdexp(sai->sai_dentry->d_inode),
160 static inline int agl_list_empty(struct ll_statahead_info *sai)
/* True when no inode is queued for async glimpse locking. */
162 return list_empty(&sai->sai_agls);
166 * (1) hit ratio less than 80%
168 * (2) consecutive miss more than 8
169 * then means low hit.
171 static inline int sa_low_hit(struct ll_statahead_info *sai)
/* hit < 4 * miss <=> hit ratio below 80%; only judged after 8+ hits so a
 * fresh scan is not declared "low hit" prematurely. */
173 return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) ||
174 (sai->sai_consecutive_miss > 8));
178 * if the given index is behind of statahead window more than
179 * SA_OMITTED_ENTRY_MAX, then it is old.
181 static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
/* Rearranged comparison avoids subtraction so it cannot underflow for
 * small @index values (all terms are unsigned 64-bit). */
183 return ((__u64)sai->sai_max + index + SA_OMITTED_ENTRY_MAX <
187 /* allocate sa_entry and hash it to allow scanner process to find it */
188 static struct sa_entry *
189 sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
190 const char *name, int len, const struct lu_fid *fid)
192 struct ll_inode_info *lli;
193 struct sa_entry *entry;
/* Name is stored inline after the struct; (len & ~3) + 4 rounds the name
 * storage up to the next 4-byte multiple, leaving room for a terminator. */
199 entry_size = sizeof(struct sa_entry) + (len & ~3) + 4;
200 OBD_ALLOC(entry, entry_size);
201 if (unlikely(!entry))
202 RETURN(ERR_PTR(-ENOMEM));
204 CDEBUG(D_READA, "alloc sa entry %.*s(%p) index %llu\n",
205 len, name, entry, index);
207 entry->se_index = index;
/* SA_ENTRY_INIT marks the entry as not yet ready (see sa_ready()). */
209 entry->se_state = SA_ENTRY_INIT;
210 entry->se_size = entry_size;
211 dname = (char *)entry + sizeof(struct sa_entry);
212 memcpy(dname, name, len);
214 entry->se_qstr.hash = ll_full_name_hash(parent, name, len);
215 entry->se_qstr.len = len;
216 entry->se_qstr.name = dname;
217 entry->se_fid = *fid;
219 lli = ll_i2info(sai->sai_dentry->d_inode);
/* Hash under lli_sa_lock so the scanner can find the entry immediately. */
221 spin_lock(&lli->lli_sa_lock);
222 INIT_LIST_HEAD(&entry->se_list);
223 sa_rehash(sai, entry);
224 spin_unlock(&lli->lli_sa_lock);
226 atomic_inc(&sai->sai_cache_count);
231 /* free sa_entry, which should have been unhashed and not in any list */
232 static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry)
234 CDEBUG(D_READA, "free sa entry %.*s(%p) index %llu\n",
235 entry->se_qstr.len, entry->se_qstr.name, entry,
/* Sanity: entry must already be off sai_entries and out of sai_cache. */
238 LASSERT(list_empty(&entry->se_list));
239 LASSERT(sa_unhashed(entry));
/* se_size was recorded in sa_alloc() (struct + inline name storage). */
241 OBD_FREE(entry, entry->se_size);
242 atomic_dec(&sai->sai_cache_count);
246 * find sa_entry by name, used by directory scanner, lock is not needed because
247 * only scanner can remove the entry from cache.
249 static struct sa_entry *
250 sa_get(struct ll_statahead_info *sai, const struct qstr *qstr)
252 struct sa_entry *entry;
253 int i = sa_hash(qstr->hash);
/* Cheap hash/len comparison first; memcmp only on a matching candidate. */
255 list_for_each_entry(entry, &sai->sai_cache[i], se_hash) {
256 if (entry->se_qstr.hash == qstr->hash &&
257 entry->se_qstr.len == qstr->len &&
258 memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0)
264 /* unhash and unlink sa_entry, and then free it */
266 sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry)
268 struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
/* Only completed (ready) entries that are still hashed and linked may be
 * killed; the scanner is the only caller that removes entries. */
270 LASSERT(!sa_unhashed(entry));
271 LASSERT(!list_empty(&entry->se_list));
272 LASSERT(sa_ready(entry));
274 sa_unhash(sai, entry);
/* se_list removal is protected by the directory's lli_sa_lock. */
276 spin_lock(&lli->lli_sa_lock);
277 list_del_init(&entry->se_list);
278 spin_unlock(&lli->lli_sa_lock);
/* Drop the inode reference taken when the async stat prepared se_inode. */
280 iput(entry->se_inode);
285 /* called by scanner after use, sa_entry will be killed */
287 sa_put(struct inode *dir, struct ll_statahead_info *sai, struct sa_entry *entry)
289 struct ll_inode_info *lli = ll_i2info(dir);
290 struct sa_entry *tmp, *next;
/* A successful hit resets the consecutive-miss counter and lets the
 * statahead window grow (doubling, capped at ll_sa_max). */
293 if (entry && entry->se_state == SA_ENTRY_SUCC) {
294 struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
297 sai->sai_consecutive_miss = 0;
298 if (sai->sai_max < sbi->ll_sa_max) {
299 sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
/* NOTE(review): the batch-boundary handling below is partially elided in
 * this excerpt; it appears to decide wakeup on batch-count boundaries. */
301 } else if (sai->sai_max_batch_count > 0) {
302 if (sai->sai_max >= sai->sai_max_batch_count &&
303 (sai->sai_index_end - entry->se_index) %
304 sai->sai_max_batch_count == 0) {
306 } else if (entry->se_index == sai->sai_index_end) {
314 sai->sai_consecutive_miss++;
322 * kill old completed entries, only scanner process does this, no need
/* Entries older than the window (is_omitted_entry()) are reclaimed here. */
325 list_for_each_entry_safe(tmp, next, &sai->sai_entries, se_list) {
326 if (!is_omitted_entry(sai, tmp->se_index))
/* Wake the statahead thread (if still running) so it can refill the
 * window that was just drained. */
331 spin_lock(&lli->lli_sa_lock);
332 if (wakeup && sai->sai_task)
333 wake_up_process(sai->sai_task);
334 spin_unlock(&lli->lli_sa_lock);
338 * update state and sort add entry to sai_entries by index, return true if
339 * scanner is waiting on this entry.
342 __sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
345 struct list_head *pos = &sai->sai_entries;
346 __u64 index = entry->se_index;
348 LASSERT(!sa_ready(entry));
349 LASSERT(list_empty(&entry->se_list));
/* Walk backwards to find the insertion point keeping sai_entries sorted
 * by se_index; new entries usually have the highest index so this is
 * typically O(1). */
351 list_for_each_entry_reverse(se, &sai->sai_entries, se_list) {
352 if (se->se_index < entry->se_index) {
357 list_add(&entry->se_list, pos);
359 * LU-9210: ll_statahead_interpet must be able to see this before
/* Release ordering: list linkage above must be visible before the state
 * flips away from SA_ENTRY_INIT (pairs with sa_ready()). */
362 smp_store_release(&entry->se_state,
363 ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);
365 return (index == sai->sai_index_wait);
368 /* finish async stat RPC arguments */
369 static void sa_fini_data(struct md_op_item *item)
371 struct md_op_data *op_data = &item->mop_data;
/* op_name may have been kmalloc'ed by ll_setup_filename(); the
 * MF_OPNAME_KMALLOCED flag records that ownership. */
373 if (op_data->op_flags & MF_OPNAME_KMALLOCED)
374 /* allocated via ll_setup_filename called from sa_prep_data */
375 kfree(op_data->op_name);
376 ll_unlock_md_op_lsm(&item->mop_data);
/* Sub-pill is only allocated for batched requests; free it if so. */
378 if (item->mop_subpill_allocated)
379 OBD_FREE_PTR(item->mop_pill);
383 static int ll_statahead_interpret(struct md_op_item *item, int rc);
386 * prepare arguments for async stat RPC.
388 static struct md_op_item *
389 sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
391 struct md_op_item *item;
392 struct ldlm_enqueue_info *einfo;
393 struct md_op_data *op_data;
397 return ERR_PTR(-ENOMEM);
/* op_data carries the child name/FID for the getattr intent. */
399 op_data = ll_prep_md_op_data(&item->mop_data, dir, child,
400 entry->se_qstr.name, entry->se_qstr.len, 0,
401 LUSTRE_OPC_ANY, NULL);
402 if (IS_ERR(op_data)) {
/* Propagate the ERR_PTR from ll_prep_md_op_data() to the caller. */
404 return (struct md_op_item *)op_data;
408 op_data->op_fid2 = entry->se_fid;
/* Async getattr: completion is delivered to ll_statahead_interpret()
 * with the sa_entry as callback data; hold a dir reference meanwhile. */
410 item->mop_opc = MD_OP_GETATTR;
411 item->mop_it.it_op = IT_GETATTR;
412 item->mop_dir = igrab(dir);
413 item->mop_cb = ll_statahead_interpret;
414 item->mop_cbdata = entry;
416 einfo = &item->mop_einfo;
417 einfo->ei_type = LDLM_IBITS;
418 einfo->ei_mode = it_to_lock_mode(&item->mop_it);
419 einfo->ei_cb_bl = ll_md_blocking_ast;
420 einfo->ei_cb_cp = ldlm_completion_ast;
421 einfo->ei_cb_gl = NULL;
422 einfo->ei_cbdata = NULL;
423 einfo->ei_req_slot = 1;
429 * release resources used in async stat RPC, update entry state and wakeup if
430 * scanner process it waiting on this entry.
433 sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
435 struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
/* __sa_make_ready() links the entry and publishes its state under lock;
 * wake the scanner only if it was waiting on this particular index. */
438 spin_lock(&lli->lli_sa_lock);
439 wakeup = __sa_make_ready(sai, entry, ret);
440 spin_unlock(&lli->lli_sa_lock);
443 wake_up(&sai->sai_waitq);
446 /* insert inode into the list of sai_agls */
447 static void ll_agl_add(struct ll_statahead_info *sai,
448 struct inode *inode, int index)
450 struct ll_inode_info *child = ll_i2info(inode);
451 struct ll_inode_info *parent = ll_i2info(sai->sai_dentry->d_inode);
/* lli_agl_index == 0 means the child is not queued yet; claim it first
 * under the child lock, then queue under the parent lock. */
453 spin_lock(&child->lli_agl_lock);
454 if (child->lli_agl_index == 0) {
455 child->lli_agl_index = index;
456 spin_unlock(&child->lli_agl_lock);
458 LASSERT(list_empty(&child->lli_agl_list));
460 spin_lock(&parent->lli_agl_lock);
461 /* Re-check under the lock */
462 if (agl_should_run(sai, inode)) {
/* Wake the agl kthread only on the empty->non-empty transition. */
463 if (agl_list_empty(sai))
464 wake_up_process(sai->sai_agl_task);
466 list_add_tail(&child->lli_agl_list, &sai->sai_agls);
/* AGL no longer running: undo the claim made above. */
468 child->lli_agl_index = 0;
469 spin_unlock(&parent->lli_agl_lock);
471 spin_unlock(&child->lli_agl_lock);
/* Allocate and initialize a statahead info for @dentry's directory. */
476 static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
478 struct ll_statahead_info *sai;
479 struct ll_inode_info *lli = ll_i2info(dentry->d_inode);
/* Caller gets the initial reference; window starts at LL_SA_RPC_MIN and
 * grows adaptively in sa_put(). */
488 sai->sai_dentry = dget(dentry);
489 atomic_set(&sai->sai_refcount, 1);
490 sai->sai_max = LL_SA_RPC_MIN;
492 init_waitqueue_head(&sai->sai_waitq);
494 INIT_LIST_HEAD(&sai->sai_entries);
495 INIT_LIST_HEAD(&sai->sai_agls);
497 for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
498 INIT_LIST_HEAD(&sai->sai_cache[i]);
499 spin_lock_init(&sai->sai_cache_lock[i]);
501 atomic_set(&sai->sai_cache_count, 0);
/* Generation counter distinguishes sai instances; skip the 0 value
 * which is reserved to mean "no statahead yet". */
503 spin_lock(&sai_generation_lock);
504 lli->lli_sa_generation = ++sai_generation;
505 if (unlikely(sai_generation == 0))
506 lli->lli_sa_generation = ++sai_generation;
507 spin_unlock(&sai_generation_lock);
/* Release the sai's dentry reference taken in ll_sai_alloc(). */
513 static inline void ll_sai_free(struct ll_statahead_info *sai)
515 LASSERT(sai->sai_dentry != NULL);
516 dput(sai->sai_dentry);
521 * take refcount of sai if sai for @dir exists, which means statahead is on for
524 static inline struct ll_statahead_info *ll_sai_get(struct inode *dir)
526 struct ll_inode_info *lli = ll_i2info(dir);
527 struct ll_statahead_info *sai = NULL;
/* lli_sa_lock protects lli_sai; take a reference while it is held so the
 * sai cannot be freed underneath the caller. */
529 spin_lock(&lli->lli_sa_lock);
532 atomic_inc(&sai->sai_refcount);
533 spin_unlock(&lli->lli_sa_lock);
539 * put sai refcount after use, if refcount reaches zero, free sai and sa_entries
542 static void ll_sai_put(struct ll_statahead_info *sai)
544 struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
/* atomic_dec_and_lock() takes lli_sa_lock only when the count hits 0,
 * so the common put is lock-free. */
546 if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
547 struct sa_entry *entry, *next;
548 struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
551 spin_unlock(&lli->lli_sa_lock);
/* Both kthreads must already be stopped and every sent RPC replied
 * before the entries can be torn down safely. */
553 LASSERT(!sai->sai_task);
554 LASSERT(!sai->sai_agl_task);
555 LASSERT(sai->sai_sent == sai->sai_replied);
557 list_for_each_entry_safe(entry, next, &sai->sai_entries,
561 LASSERT(atomic_read(&sai->sai_cache_count) == 0);
562 LASSERT(agl_list_empty(sai));
565 atomic_dec(&sbi->ll_sa_running);
569 /* Do NOT forget to drop inode refcount when into sai_agls. */
570 static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
572 struct ll_inode_info *lli = ll_i2info(inode);
573 u64 index = lli->lli_agl_index;
579 LASSERT(list_empty(&lli->lli_agl_list));
581 /* AGL maybe fall behind statahead with one entry */
/* Resetting lli_agl_index to 0 releases the claim taken in ll_agl_add()
 * so the inode can be queued again later. */
582 if (is_omitted_entry(sai, index + 1)) {
583 lli->lli_agl_index = 0;
589 * In case of restore, the MDT has the right size and has already
590 * sent it back without granting the layout lock, inode is up-to-date.
591 * Then AGL (async glimpse lock) is useless.
592 * Also to glimpse we need the layout, in case of a runninh restore
593 * the MDT holds the layout lock so the glimpse will block up to the
594 * end of restore (statahead/agl will block)
596 if (test_bit(LLIF_FILE_RESTORING, &lli->lli_flags)) {
597 lli->lli_agl_index = 0;
602 /* Someone is in glimpse (sync or async), do nothing. */
603 rc = down_write_trylock(&lli->lli_glimpse_sem);
605 lli->lli_agl_index = 0;
611 * Someone triggered glimpse within 1 sec before.
612 * 1) The former glimpse succeeded with glimpse lock granted by OST, and
613 * if the lock is still cached on client, AGL needs to do nothing. If
614 * it is cancelled by other client, AGL maybe cannot obtaion new lock
615 * for no glimpse callback triggered by AGL.
616 * 2) The former glimpse succeeded, but OST did not grant glimpse lock.
617 * Under such case, it is quite possible that the OST will not grant
618 * glimpse lock for AGL also.
619 * 3) The former glimpse failed, compared with other two cases, it is
620 * relative rare. AGL can ignore such case, and it will not muchly
621 * affect the performance.
/* expire = now - 1s; skip if the last glimpse was within the last second. */
623 expire = ktime_sub_ns(ktime_get(), NSEC_PER_SEC);
624 if (ktime_to_ns(lli->lli_glimpse_time) &&
625 ktime_before(expire, lli->lli_glimpse_time)) {
626 up_write(&lli->lli_glimpse_sem);
627 lli->lli_agl_index = 0;
633 "Handling (init) async glimpse: inode = " DFID", idx = %llu\n",
634 PFID(&lli->lli_fid), index);
/* Record the glimpse time and release the semaphore after the (elided
 * in this excerpt) glimpse call completes. */
637 lli->lli_agl_index = 0;
638 lli->lli_glimpse_time = ktime_get();
639 up_write(&lli->lli_glimpse_sem);
642 "Handled (init) async glimpse: inode= " DFID", idx = %llu, rc = %d\n",
643 PFID(&lli->lli_fid), index, rc);
/* Common completion path for an async stat: release the intent, finish the
 * request, mark the entry ready, and wake the scanner if needed. */
650 static void ll_statahead_interpret_fini(struct ll_inode_info *lli,
651 struct ll_statahead_info *sai,
652 struct md_op_item *item,
653 struct sa_entry *entry,
654 struct ptlrpc_request *req,
658 * First it will drop ldlm ibits lock refcount by calling
659 * ll_intent_drop_lock() in spite of failures. Do not worry about
660 * calling ll_intent_drop_lock() more than once.
662 ll_intent_release(&item->mop_it);
/* @req may be NULL (direct-callback path); ptlrpc_req_finished() drops
 * the reference taken before the work item was queued. */
665 ptlrpc_req_finished(req);
666 sa_make_ready(sai, entry, rc);
668 spin_lock(&lli->lli_sa_lock);
670 spin_unlock(&lli->lli_sa_lock);
/* Deferred part of async stat completion, run from a workqueue (not the
 * ptlrpcd context) because it may generate new RPCs: revalidate the lock,
 * prepare the child inode, set lock data, and optionally queue AGL. */
673 static void ll_statahead_interpret_work(struct work_struct *work)
675 struct md_op_item *item = container_of(work, struct md_op_item,
677 struct req_capsule *pill = item->mop_pill;
678 struct inode *dir = item->mop_dir;
679 struct ll_inode_info *lli = ll_i2info(dir);
680 struct ll_statahead_info *sai = lli->lli_sai;
681 struct lookup_intent *it;
682 struct sa_entry *entry;
683 struct mdt_body *body;
689 entry = (struct sa_entry *)item->mop_cbdata;
690 LASSERT(entry->se_handle != 0);
693 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
695 GOTO(out, rc = -EFAULT);
697 child = entry->se_inode;
698 /* revalidate; unlinked and re-created with the same name */
699 if (unlikely(!lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
/* FID changed under us: drop the cached inode and skip the entry. */
701 entry->se_inode = NULL;
704 /* The mdt_body is invalid. Skip this entry */
705 GOTO(out, rc = -EAGAIN);
/* Re-install the lock handle saved by ll_statahead_interpret() before
 * the work item was queued, then revalidate it. */
708 it->it_lock_handle = entry->se_handle;
709 rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
711 GOTO(out, rc = -EAGAIN);
713 rc = ll_prep_inode(&child, pill, dir->i_sb, it);
715 CERROR("%s: getattr callback for %.*s "DFID": rc = %d\n",
716 ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
717 entry->se_qstr.name, PFID(&entry->se_fid), rc);
721 /* If encryption context was returned by MDT, put it in
722 * inode now to save an extra getxattr.
724 if (body->mbo_valid & OBD_MD_ENCCTX) {
725 void *encctx = req_capsule_server_get(pill, &RMF_FILE_ENCCTX);
726 __u32 encctxlen = req_capsule_get_size(pill, &RMF_FILE_ENCCTX,
731 "server returned encryption ctx for "DFID"\n",
732 PFID(ll_inode2fid(child)));
733 rc = ll_xattr_cache_insert(child,
734 xattr_for_enc(child),
/* Failure to cache the enc ctx is non-fatal: warn and continue. */
737 CWARN("%s: cannot set enc ctx for "DFID": rc = %d\n",
738 ll_i2sbi(child)->ll_fsname,
739 PFID(ll_inode2fid(child)), rc);
743 CDEBUG(D_READA, "%s: setting %.*s"DFID" l_data to inode %p\n",
744 ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
745 entry->se_qstr.name, PFID(ll_inode2fid(child)), child);
746 ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);
748 entry->se_inode = child;
750 if (agl_should_run(sai, child))
751 ll_agl_add(sai, child, entry->se_index);
/* Finish via the common path; pill->rc_req was ref'd before queueing. */
753 ll_statahead_interpret_fini(lli, sai, item, entry, pill->rc_req, rc);
757 * Callback for async stat RPC, this is called in ptlrpcd context. It prepares
758 * the inode and set lock data directly in the ptlrpcd context. It will wake up
759 * the directory listing process if the dentry is the waiting one.
761 static int ll_statahead_interpret(struct md_op_item *item, int rc)
763 struct req_capsule *pill = item->mop_pill;
764 struct lookup_intent *it = &item->mop_it;
765 struct inode *dir = item->mop_dir;
766 struct ll_inode_info *lli = ll_i2info(dir);
767 struct ll_statahead_info *sai = lli->lli_sai;
768 struct sa_entry *entry = (struct sa_entry *)item->mop_cbdata;
769 struct work_struct *work = &item->mop_work;
770 struct mdt_body *body;
/* Negative dentry on the server side: nothing to prepare. */
776 if (it_disposition(it, DISP_LOOKUP_NEG))
780 * because statahead thread will wait for all inflight RPC to finish,
781 * sai should be always valid, no need to refcount
783 LASSERT(sai != NULL);
784 LASSERT(entry != NULL);
786 CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
787 entry->se_qstr.len, entry->se_qstr.name, rc);
792 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
794 GOTO(out, rc = -EFAULT);
796 child = entry->se_inode;
797 /* revalidate; unlinked and re-created with the same name */
798 if (unlikely(!lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
799 entry->se_inode = NULL;
803 /* The mdt_body is invalid. Skip this entry */
804 GOTO(out, rc = -EAGAIN);
/* Save the lock handle so ll_statahead_interpret_work() can restore it
 * after the intent lock is dropped below. */
807 entry->se_handle = it->it_lock_handle;
809 * In ptlrpcd context, it is not allowed to generate new RPCs
810 * especially for striped directories or regular files with layout
814 * release ibits lock ASAP to avoid deadlock when statahead
815 * thread enqueues lock on parent in readdir and another
816 * process enqueues lock on child with parent lock held, eg.
819 handle = it->it_lock_handle;
820 ll_intent_drop_lock(it);
821 ll_unlock_md_op_lsm(&item->mop_data);
824 * If the statahead entry is a striped directory or regular file with
825 * layout change, it will generate a new RPC and long wait in the
827 * However, it is dangerous of blocking in ptlrpcd thread.
828 * Here we use work queue or the separate statahead thread to handle
829 * the extra RPC and long wait:
830 * (@ll_prep_inode->@lmv_revalidate_slaves);
831 * (@ll_prep_inode->@lov_layout_change->osc_cache_wait_range);
/* Hand off to the workqueue; take a request ref so the reply buffer
 * stays valid until ll_statahead_interpret_work() finishes it. */
833 INIT_WORK(work, ll_statahead_interpret_work);
834 ptlrpc_request_addref(pill->rc_req);
/* Error/negative paths finish directly; no request ref was taken here. */
838 ll_statahead_interpret_fini(lli, sai, item, entry, NULL, rc);
/* Dispatch one async getattr: add to the batch when batching is enabled,
 * otherwise send an individual intent getattr RPC. */
842 static inline int sa_getattr(struct inode *dir, struct md_op_item *item)
844 struct ll_statahead_info *sai = ll_i2info(dir)->lli_sai;
847 if (sa_has_batch_handle(sai))
848 rc = md_batch_add(ll_i2mdexp(dir), sai->sai_bh, item);
850 rc = md_intent_getattr_async(ll_i2mdexp(dir), item);
855 /* async stat for file not found in dcache */
856 static int sa_lookup(struct inode *dir, struct sa_entry *entry)
858 struct md_op_item *item;
/* No cached inode: prepare the op item with child == NULL and send. */
863 item = sa_prep_data(dir, NULL, entry);
865 RETURN(PTR_ERR(item));
867 rc = sa_getattr(dir, item);
875 * async stat for file found in dcache, similar to .revalidate
877 * \retval 1 dentry valid, no RPC sent
878 * \retval 0 dentry invalid, will send async stat RPC
879 * \retval negative number upon error
881 static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
882 struct dentry *dentry)
884 struct inode *inode = dentry->d_inode;
885 struct lookup_intent it = { .it_op = IT_GETATTR,
886 .it_lock_handle = 0 };
887 struct md_op_item *item;
/* Negative dentry or mountpoint: nothing to revalidate here. */
892 if (unlikely(!inode))
895 if (d_mountpoint(dentry))
898 item = sa_prep_data(dir, inode, entry);
900 RETURN(PTR_ERR(item));
/* Hold the inode while checking whether a cached UPDATE lock already
 * covers it; if so the entry is valid without an RPC. */
902 entry->se_inode = igrab(inode);
903 rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
906 entry->se_handle = it.it_lock_handle;
907 ll_intent_release(&it);
/* Lock not cached: drop the inode ref and fall back to an async RPC. */
912 rc = sa_getattr(dir, item);
914 entry->se_inode = NULL;
922 /* async stat for file with @name */
923 static void sa_statahead(struct dentry *parent, const char *name, int len,
924 const struct lu_fid *fid)
926 struct inode *dir = parent->d_inode;
927 struct ll_inode_info *lli = ll_i2info(dir);
928 struct ll_statahead_info *sai = lli->lli_sai;
929 struct dentry *dentry = NULL;
930 struct sa_entry *entry;
935 entry = sa_alloc(parent, sai, sai->sai_index, name, len, fid);
/* Cached dentry decides the path: lookup (no dentry) vs revalidate. */
939 dentry = d_lookup(parent, &entry->se_qstr);
941 rc = sa_lookup(dir, entry);
943 rc = sa_revalidate(dir, entry, dentry);
/* rc == 1: dentry valid without an RPC — still queue AGL if applicable. */
944 if (rc == 1 && agl_should_run(sai, dentry->d_inode))
945 ll_agl_add(sai, dentry->d_inode, entry->se_index);
/* On error the entry is marked ready (invalid) so the scanner won't
 * wait on it forever. */
952 sa_make_ready(sai, entry, rc);
958 if (sa_sent_full(sai))
959 ll_statahead_flush_nowait(sai);
964 /* async glimpse (agl) thread main function */
965 static int ll_agl_thread(void *arg)
967 struct dentry *parent = (struct dentry *)arg;
968 struct inode *dir = parent->d_inode;
969 struct ll_inode_info *plli = ll_i2info(dir);
970 struct ll_inode_info *clli;
972 * We already own this reference, so it is safe to take it
975 struct ll_statahead_info *sai = plli->lli_sai;
979 CDEBUG(D_READA, "agl thread started: sai %p, parent %pd\n",
/* Standard kthread loop: mark TASK_IDLE before the stop check so a
 * concurrent kthread_stop()/wake_up_process() is never missed. */
982 while (({set_current_state(TASK_IDLE);
983 !kthread_should_stop(); })) {
984 spin_lock(&plli->lli_agl_lock);
985 clli = list_first_entry_or_null(&sai->sai_agls,
986 struct ll_inode_info,
/* Dequeue one inode at a time and glimpse it outside the lock. */
989 __set_current_state(TASK_RUNNING);
990 list_del_init(&clli->lli_agl_list);
991 spin_unlock(&plli->lli_agl_lock);
992 ll_agl_trigger(&clli->lli_vfs_inode, sai);
995 spin_unlock(&plli->lli_agl_lock);
999 __set_current_state(TASK_RUNNING);
/* Stop the agl kthread (if running) and drain sai_agls, dropping the inode
 * references the queued entries hold. */
1003 static void ll_stop_agl(struct ll_statahead_info *sai)
1005 struct dentry *parent = sai->sai_dentry;
1006 struct ll_inode_info *plli = ll_i2info(parent->d_inode);
1007 struct ll_inode_info *clli;
1008 struct task_struct *agl_task;
/* Clear sai_agl_task under the lock so ll_agl_add()/agl_should_run()
 * stop queueing new work before we join the thread. */
1010 spin_lock(&plli->lli_agl_lock);
1011 agl_task = sai->sai_agl_task;
1012 sai->sai_agl_task = NULL;
1013 spin_unlock(&plli->lli_agl_lock);
1017 CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n",
1018 sai, (unsigned int)agl_task->pid);
1019 kthread_stop(agl_task);
/* Drain leftovers: drop the lock around iput() since it may sleep. */
1021 spin_lock(&plli->lli_agl_lock);
1022 while ((clli = list_first_entry_or_null(&sai->sai_agls,
1023 struct ll_inode_info,
1024 lli_agl_list)) != NULL) {
1025 list_del_init(&clli->lli_agl_list);
1026 spin_unlock(&plli->lli_agl_lock);
1027 clli->lli_agl_index = 0;
1028 iput(&clli->lli_vfs_inode);
1029 spin_lock(&plli->lli_agl_lock);
1031 spin_unlock(&plli->lli_agl_lock);
1032 CDEBUG(D_READA, "agl thread stopped: sai %p, parent %pd\n",
1032 CDEBUG(D_READA, "agl thread stopped: sai %p, parent %pd\n",
1037 /* start agl thread */
1038 static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
/* Spread the kthread across CPU partitions rather than pinning it. */
1040 int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
1041 struct ll_inode_info *plli;
1042 struct task_struct *task;
1046 CDEBUG(D_READA, "start agl thread: sai %p, parent %pd\n",
1049 plli = ll_i2info(parent->d_inode);
/* Create stopped, record the task, take the thread's sai reference, and
 * only then wake it — so the thread never runs with missing state. */
1050 task = kthread_create_on_node(ll_agl_thread, parent, node, "ll_agl_%d",
1051 plli->lli_opendir_pid);
1053 CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));
1056 sai->sai_agl_task = task;
1057 atomic_inc(&ll_i2sbi(d_inode(parent))->ll_agl_total);
1058 /* Get an extra reference that the thread holds */
1059 ll_sai_get(d_inode(parent));
1061 wake_up_process(task);
1066 /* statahead thread main function */
1067 static int ll_statahead_thread(void *arg)
1069 struct dentry *parent = (struct dentry *)arg;
1070 struct inode *dir = parent->d_inode;
1071 struct ll_inode_info *lli = ll_i2info(dir);
1072 struct ll_sb_info *sbi = ll_i2sbi(dir);
1073 struct ll_statahead_info *sai = lli->lli_sai;
1075 struct md_op_data *op_data;
1076 struct page *page = NULL;
1077 struct lu_batch *bh = NULL;
1083 CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
/* Optionally create a batch handle so getattrs are sent in batches. */
1086 sai->sai_max_batch_count = sbi->ll_sa_batch_max;
1087 if (sai->sai_max_batch_count) {
1088 bh = md_batch_create(ll_i2mdexp(dir), BATCH_FL_RDONLY,
1089 sai->sai_max_batch_count);
1091 GOTO(out_stop_agl, rc = PTR_ERR(bh));
1095 OBD_ALLOC_PTR(op_data);
1097 GOTO(out, rc = -ENOMEM);
1099 /* matches smp_store_release() in ll_deauthorize_statahead() */
/* Outer loop: walk directory pages until EOF or the thread is told to
 * stop (sai_task cleared via smp_store_release()). */
1100 while (pos != MDS_DIR_END_OFF && smp_load_acquire(&sai->sai_task)) {
1101 struct lu_dirpage *dp;
1102 struct lu_dirent *ent;
1104 op_data = ll_prep_md_op_data(op_data, dir, dir, NULL, 0, 0,
1105 LUSTRE_OPC_ANY, dir);
1106 if (IS_ERR(op_data)) {
1107 rc = PTR_ERR(op_data);
1111 page = ll_get_dir_page(dir, op_data, pos, NULL);
1112 ll_unlock_md_op_lsm(op_data);
1116 "error reading dir "DFID" at %llu /%llu opendir_pid = %u: rc = %d\n",
1117 PFID(ll_inode2fid(dir)), pos, sai->sai_index,
1118 lli->lli_opendir_pid, rc);
/* Inner loop: one iteration per directory entry on this page. */
1122 dp = page_address(page);
1123 for (ent = lu_dirent_start(dp);
1124 /* matches smp_store_release() in ll_deauthorize_statahead() */
1125 ent != NULL && smp_load_acquire(&sai->sai_task) &&
1127 ent = lu_dirent_next(ent)) {
1132 struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
1134 hash = le64_to_cpu(ent->lde_hash);
1135 if (unlikely(hash < pos))
1137 * Skip until we find target hash value.
1141 namelen = le16_to_cpu(ent->lde_namelen);
1142 if (unlikely(namelen == 0))
1144 * Skip dummy record.
/* Skip ".", "..", and (unless ls -a mode) hidden dot-files. */
1148 name = ent->lde_name;
1149 if (name[0] == '.') {
1155 } else if (name[1] == '.' && namelen == 2) {
1160 } else if (!sai->sai_ls_all) {
1162 * skip hidden files.
1164 sai->sai_skip_hidden++;
1170 * don't stat-ahead first entry.
1172 if (unlikely(++first == 1))
1175 fid_le_to_cpu(&fid, &ent->lde_fid);
/* Throttle: while the window is full, drain AGL work (if any) or
 * sleep until sa_put() wakes us with free window space. */
1177 while (({set_current_state(TASK_IDLE);
1178 /* matches smp_store_release() in
1179 * ll_deauthorize_statahead() */
1180 smp_load_acquire(&sai->sai_task); })) {
1181 spin_lock(&lli->lli_agl_lock);
1182 while (sa_sent_full(sai) &&
1183 !agl_list_empty(sai)) {
1184 struct ll_inode_info *clli;
1186 __set_current_state(TASK_RUNNING);
1187 clli = agl_first_entry(sai);
1188 list_del_init(&clli->lli_agl_list);
1189 spin_unlock(&lli->lli_agl_lock);
1191 ll_agl_trigger(&clli->lli_vfs_inode,
1194 spin_lock(&lli->lli_agl_lock);
1196 spin_unlock(&lli->lli_agl_lock);
1198 if (!sa_sent_full(sai))
1202 __set_current_state(TASK_RUNNING);
/* Encrypted dir: convert the on-disk name to the user-visible
 * form before issuing the stat-ahead. */
1204 if (IS_ENCRYPTED(dir)) {
1205 struct llcrypt_str de_name =
1206 LLTR_INIT(ent->lde_name, namelen);
1209 rc = llcrypt_fname_alloc_buffer(dir, NAME_MAX,
1214 fid_le_to_cpu(&fid, &ent->lde_fid);
1215 if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
1217 llcrypt_fname_free_buffer(&lltr);
1225 sa_statahead(parent, name, namelen, &fid);
1226 llcrypt_fname_free_buffer(&lltr);
/* Advance to the next hash page; release under lli_lsm_sem. */
1229 pos = le64_to_cpu(dp->ldp_hash_end);
1230 down_read(&lli->lli_lsm_sem);
1231 ll_release_page(dir, page,
1232 le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
1233 up_read(&lli->lli_lsm_sem);
/* Give up early when the hit ratio says statahead is not helping. */
1235 if (sa_low_hit(sai)) {
1237 atomic_inc(&sbi->ll_sa_wrong);
1239 "Statahead for dir "DFID" hit ratio too low: hit/miss %llu/%llu, sent/replied %llu/%llu, stoppingstatahead thread: pid %d\n",
1240 PFID(&lli->lli_fid), sai->sai_hit,
1241 sai->sai_miss, sai->sai_sent,
1242 sai->sai_replied, current->pid);
1246 ll_finish_md_op_data(op_data);
/* Scan finished (or aborted): disable further statahead for this dir. */
1249 spin_lock(&lli->lli_sa_lock);
1250 sai->sai_task = NULL;
1251 lli->lli_sa_enabled = 0;
1252 spin_unlock(&lli->lli_sa_lock);
1255 ll_statahead_flush_nowait(sai);
1258 * statahead is finished, but statahead entries need to be cached, wait
1259 * for file release closedir() call to stop me.
1261 while (({set_current_state(TASK_IDLE);
1262 /* matches smp_store_release() in ll_deauthorize_statahead() */
1263 smp_load_acquire(&sai->sai_task); })) {
1266 __set_current_state(TASK_RUNNING);
1271 rc = md_batch_stop(ll_i2mdexp(dir), sai->sai_bh);
1279 * wait for inflight statahead RPCs to finish, and then we can free sai
1280 * safely because statahead RPC will access sai data
1282 while (sai->sai_sent != sai->sai_replied)
1283 /* in case we're not woken up, timeout wait */
1286 CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd\n",
1287 sbi->ll_fsname, sai, parent);
1289 spin_lock(&lli->lli_sa_lock);
1290 sai->sai_task = NULL;
1291 spin_unlock(&lli->lli_sa_lock);
1292 wake_up(&sai->sai_waitq);
/* Publish per-sb hit/miss statistics before the sai goes away. */
1294 atomic_add(sai->sai_hit, &sbi->ll_sa_hit_total);
1295 atomic_add(sai->sai_miss, &sbi->ll_sa_miss_total);
1302 /* authorize opened dir handle @key to statahead */
1303 void ll_authorize_statahead(struct inode *dir, void *key)
1305 struct ll_inode_info *lli = ll_i2info(dir);
1307 spin_lock(&lli->lli_sa_lock);
/* Only the first opener (no key yet) with no lingering sai may become
 * the statahead owner for this directory. */
1308 if (!lli->lli_opendir_key && !lli->lli_sai) {
1310 * if lli_sai is not NULL, it means previous statahead is not
1311 * finished yet, we'd better not start a new statahead for now.
1313 LASSERT(lli->lli_opendir_pid == 0);
1314 lli->lli_opendir_key = key;
1315 lli->lli_opendir_pid = current->pid;
1316 lli->lli_sa_enabled = 1;
1318 spin_unlock(&lli->lli_sa_lock);
1322 * deauthorize opened dir handle @key to statahead, and notify statahead thread
1323 * to quit if it's running.
1325 void ll_deauthorize_statahead(struct inode *dir, void *key)
1327 struct ll_inode_info *lli = ll_i2info(dir);
1328 struct ll_statahead_info *sai;
1330 LASSERT(lli->lli_opendir_key == key);
1331 LASSERT(lli->lli_opendir_pid != 0);
1333 CDEBUG(D_READA, "deauthorize statahead for "DFID"\n",
1334 PFID(&lli->lli_fid));
1336 spin_lock(&lli->lli_sa_lock);
/* Clear ownership so no new statahead starts for this open handle. */
1337 lli->lli_opendir_key = NULL;
1338 lli->lli_opendir_pid = 0;
1339 lli->lli_sa_enabled = 0;
1341 if (sai && sai->sai_task) {
1343 * statahead thread may not have quit yet because it needs to
1344 * cache entries, now it's time to tell it to quit.
1346 * wake_up_process() provides the necessary barriers
1347 * to pair with set_current_state().
1349 struct task_struct *task = sai->sai_task;
1351 /* matches smp_load_acquire() in ll_statahead_thread() */
/* Release store: thread's acquire load of sai_task sees NULL and exits. */
1352 smp_store_release(&sai->sai_task, NULL);
1353 wake_up_process(task);
1355 spin_unlock(&lli->lli_sa_lock);
/* verdicts of is_first_dirent(): where @dentry sits in its parent dir */
enum {
	/*
	 * not first dirent, or is "."
	 */
	LS_NOT_FIRST_DE = 0,
	/*
	 * the first non-hidden dirent
	 */
	LS_FIRST_DE,
	/*
	 * the first hidden dirent, that is "."
	 */
	LS_FIRST_DOT_DE
};
/*
 * is_first_dirent(): walk @dir's pages from hash offset 0 and decide whether
 * @dentry names the first (hidden or non-hidden) entry of the directory.
 * Returns LS_NOT_FIRST_DE / LS_FIRST_DE / LS_FIRST_DOT_DE, or a negative
 * errno on setup/read failure.
 *
 * NOTE(review): elided listing — brace-only lines and several statements
 * (loop headers, break/continue, labels) are missing from this chunk;
 * comments below describe only what the visible lines establish.
 */
1373 /* file is first dirent under @dir */
1374 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
1376 struct qstr *target = &dentry->d_name;
1377 struct md_op_data *op_data;
1379 struct page *page = NULL;
/* default verdict: not the first entry */
1380 int rc = LS_NOT_FIRST_DE;
1382 struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
/* prepare MD op data for reading @dir; bail out on error */
1386 op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
1387 LUSTRE_OPC_ANY, dir);
1388 if (IS_ERR(op_data))
1389 RETURN(PTR_ERR(op_data));
/* encrypted dir: allocate a buffer for disk-to-user name conversion */
1391 if (IS_ENCRYPTED(dir)) {
1392 int rc2 = llcrypt_fname_alloc_buffer(dir, NAME_MAX, &lltr);
1399 *FIXME choose the start offset of the readdir
/* begin the scan at hash offset 0 */
1402 page = ll_get_dir_page(dir, op_data, 0, NULL);
1405 struct lu_dirpage *dp;
1406 struct lu_dirent *ent;
1409 struct ll_inode_info *lli = ll_i2info(dir);
/* directory page read failed: log it (error path elided here) */
1412 CERROR("%s: reading dir "DFID" at %llu opendir_pid = %u : rc = %d\n",
1413 ll_i2sbi(dir)->ll_fsname,
1414 PFID(ll_inode2fid(dir)), pos,
1415 lli->lli_opendir_pid, rc);
1419 dp = page_address(page);
/* iterate every dirent in this page */
1420 for (ent = lu_dirent_start(dp); ent != NULL;
1421 ent = lu_dirent_next(ent)) {
1426 hash = le64_to_cpu(ent->lde_hash);
1428 * The ll_get_dir_page() can return any page containing
1429 * the given hash which may be not the start hash.
1431 if (unlikely(hash < pos))
1434 namelen = le16_to_cpu(ent->lde_namelen);
1435 if (unlikely(namelen == 0))
1437 * skip dummy record.
1441 name = ent->lde_name;
/* classify "." / ".." / other hidden entries */
1442 if (name[0] == '.') {
1448 else if (name[1] == '.' && namelen == 2)
/* hidden entry found before a non-hidden target: skip over it */
1459 if (dot_de && target->name[0] != '.') {
1460 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
1461 target->len, target->name,
/* encrypted dir: decode the on-disk name before the comparison */
1466 if (IS_ENCRYPTED(dir)) {
1467 struct llcrypt_str de_name =
1468 LLTR_INIT(ent->lde_name, namelen);
1471 fid_le_to_cpu(&fid, &ent->lde_fid);
1472 if (ll_fname_disk_to_usr(dir, 0, 0, &de_name,
/* first candidate entry: name match against @target decides verdict */
1479 if (target->len != namelen ||
1480 memcmp(target->name, name, namelen) != 0)
1481 rc = LS_NOT_FIRST_DE;
1485 rc = LS_FIRST_DOT_DE;
1487 ll_release_page(dir, page, false);
/* advance to the next hash page of the directory */
1490 pos = le64_to_cpu(dp->ldp_hash_end);
1491 if (pos == MDS_DIR_END_OFF) {
1493 * End of directory reached.
1495 ll_release_page(dir, page, false);
1499 * chain is exhausted
1500 * Normal case: continue to the next page.
1502 ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) &
1504 page = ll_get_dir_page(dir, op_data, pos, NULL);
/* cleanup on all exits: fname buffer and MD op data */
1509 llcrypt_fname_free_buffer(&lltr);
1510 ll_finish_md_op_data(op_data);
/*
 * NOTE(review): elided listing — brace-only lines and some statements
 * (e.g. the NULL check after sa_get(), the timeout branch, GOTO labels)
 * are missing from this chunk; comments below describe only what the
 * visible lines establish.
 */
1516 * revalidate @dentryp from statahead cache
1518 * \param[in] dir parent directory
1519 * \param[in] sai sai structure
1520 * \param[out] dentryp pointer to dentry which will be revalidated
1521 * \param[in] unplug unplug statahead window only (normally for negative
1523 * \retval 1 on success, dentry is saved in @dentryp
1524 * \retval 0 if revalidation failed (no proper lock on client)
1525 * \retval negative number upon error
1527 static int revalidate_statahead_dentry(struct inode *dir,
1528 struct ll_statahead_info *sai,
1529 struct dentry **dentryp,
1532 struct sa_entry *entry = NULL;
1533 struct ll_inode_info *lli = ll_i2info(dir);
/* hidden-entry accounting: detect "ls -a"-style access and adjust flags */
1538 if ((*dentryp)->d_name.name[0] == '.') {
1539 if (sai->sai_ls_all ||
1540 sai->sai_miss_hidden >= sai->sai_skip_hidden) {
1542 * Hidden dentry is the first one, or statahead
1543 * thread does not skip so many hidden dentries
1544 * before "sai_ls_all" enabled as below.
1547 if (!sai->sai_ls_all)
1549 * It maybe because hidden dentry is not
1550 * the first one, "sai_ls_all" was not
1551 * set, then "ls -al" missed. Enable
1552 * "sai_ls_all" for such case.
1554 sai->sai_ls_all = 1;
1557 * Such "getattr" has been skipped before
1558 * "sai_ls_all" enabled as above.
1560 sai->sai_miss_hidden++;
/* look up the cached sa_entry by name; -EAGAIN when it is not cached */
1568 entry = sa_get(sai, &(*dentryp)->d_name);
1570 GOTO(out, rc = -EAGAIN);
/* entry exists but async stat not done: publish the awaited index and
 * wait (bounded) for the statahead thread to complete it */
1572 if (!sa_ready(entry)) {
1573 spin_lock(&lli->lli_sa_lock);
1574 sai->sai_index_wait = entry->se_index;
1575 spin_unlock(&lli->lli_sa_lock);
1576 rc = wait_event_idle_timeout(sai->sai_waitq, sa_ready(entry),
1577 cfs_time_seconds(30));
1580 * entry may not be ready, so it may be used by inflight
1581 * statahead RPC, don't free it.
1584 GOTO(out, rc = -EAGAIN);
1589 * We need to see the value that was set immediately before we
1592 if (smp_load_acquire(&entry->se_state) == SA_ENTRY_SUCC &&
/* async stat succeeded: take the prepared inode and re-check the
 * GETATTR DLM lock client-side */
1594 struct inode *inode = entry->se_inode;
1595 struct lookup_intent it = { .it_op = IT_GETATTR,
1600 rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
1601 ll_inode2fid(inode), &bits);
1603 if (!(*dentryp)->d_inode) {
1604 struct dentry *alias;
/* negative dentry: splice the statahead-prepared inode into it */
1606 alias = ll_splice_alias(inode, *dentryp);
1607 if (IS_ERR(alias)) {
1608 ll_intent_release(&it);
1609 GOTO(out, rc = PTR_ERR(alias));
1613 * statahead prepared this inode, transfer inode
1614 * refcount from sa_entry to dentry
1616 entry->se_inode = NULL;
1617 } else if ((*dentryp)->d_inode != inode) {
1618 /* revalidate, but inode is recreated */
1620 "%s: stale dentry %pd inode " DFID", statahead inode "DFID "\n",
1621 ll_i2sbi(inode)->ll_fsname, *dentryp,
1622 PFID(ll_inode2fid((*dentryp)->d_inode)),
1623 PFID(ll_inode2fid(inode)));
1624 ll_intent_release(&it);
1625 GOTO(out, rc = -ESTALE);
/* LOOKUP lock bit held: mark the dentry valid; refresh dir depth for
 * directories */
1628 if (bits & MDS_INODELOCK_LOOKUP) {
1629 d_lustre_revalidate(*dentryp);
1630 if (S_ISDIR(inode->i_mode))
1631 ll_update_dir_depth_dmv(dir, *dentryp);
1634 ll_intent_release(&it);
1639 * statahead cached sa_entry can be used only once, and will be killed
1640 * right after use, so if lookup/revalidate accessed statahead cache,
1641 * set dentry ldd_sa_generation to parent lli_sa_generation, later if we
1642 * stat this file again, we know we've done statahead before, see
1643 * dentry_may_statahead().
1645 if (lld_is_init(*dentryp))
1646 ll_d2d(*dentryp)->lld_sa_generation = lli->lli_sa_generation;
/* entry is single-use: release it regardless of the outcome */
1647 sa_put(dir, sai, entry);
/*
 * NOTE(review): elided listing — brace-only lines, the @agl parameter
 * line, the local @rc declaration and parts of the error-unwind labels
 * are missing from this chunk; comments below describe only what the
 * visible lines establish.
 */
1653 * start statahead thread
1655 * \param[in] dir parent directory
1656 * \param[in] dentry dentry that triggers statahead, normally the first
1658 * \param[in] agl indicate whether AGL is needed
1659 * \retval -EAGAIN on success, because when this function is
1660 * called, it's already in lookup call, so client should
1661 * do it itself instead of waiting for statahead thread
1662 * to do it asynchronously.
1663 * \retval negative number upon error
1665 static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
/* pick a CPU-partition node so statahead kthreads are spread out */
1668 int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
1669 struct ll_inode_info *lli = ll_i2info(dir);
1670 struct ll_statahead_info *sai = NULL;
1671 struct dentry *parent = dentry->d_parent;
1672 struct task_struct *task;
1673 struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode);
1674 int first = LS_FIRST_DE;
1679 /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */
1680 first = is_first_dirent(dir, dentry);
1681 if (first == LS_NOT_FIRST_DE)
1682 /* It is not "ls -{a}l" operation, no need statahead for it. */
1683 GOTO(out, rc = -EFAULT);
/* throttle: cap the number of concurrent statahead instances per sb */
1685 if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
1686 sbi->ll_sa_running_max)) {
1688 "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n");
1689 GOTO(out, rc = -EMFILE);
1692 sai = ll_sai_alloc(parent);
1694 GOTO(out, rc = -ENOMEM);
/* first entry was ".": an "ls -a" pattern, stat hidden entries too */
1696 sai->sai_ls_all = (first == LS_FIRST_DOT_DE);
1699 * if current lli_opendir_key was deauthorized, or dir re-opened by
1700 * another process, don't start statahead, otherwise the newly spawned
1701 * statahead thread won't be notified to quit.
1703 spin_lock(&lli->lli_sa_lock);
1704 if (unlikely(lli->lli_sai || !lli->lli_opendir_key ||
1705 lli->lli_opendir_pid != current->pid)) {
1706 spin_unlock(&lli->lli_sa_lock);
1707 GOTO(out, rc = -EPERM);
1710 spin_unlock(&lli->lli_sa_lock);
1712 CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n",
1713 current->pid, parent);
/* create the kthread without running it, so failure can be unwound */
1715 task = kthread_create_on_node(ll_statahead_thread, parent, node,
1716 "ll_sa_%u", lli->lli_opendir_pid);
/* kthread creation failed: clear lli_sai under the lock before bailing */
1718 spin_lock(&lli->lli_sa_lock);
1719 lli->lli_sai = NULL;
1720 spin_unlock(&lli->lli_sa_lock);
1722 CERROR("can't start ll_sa thread, rc: %d\n", rc);
/* optionally start the AGL (async glimpse lock) companion thread */
1726 if (test_bit(LL_SBI_AGL_ENABLED, sbi->ll_flags) && agl)
1727 ll_start_agl(parent, sai);
1729 atomic_inc(&sbi->ll_sa_total);
/* publish the task and let the statahead thread run */
1730 sai->sai_task = task;
1732 wake_up_process(task);
1734 * We don't stat-ahead for the first dirent since we are already in
1741 * once we start statahead thread failed, disable statahead so that
1742 * subsequent stat won't waste time to try it.
1744 spin_lock(&lli->lli_sa_lock);
1745 if (lli->lli_opendir_pid == current->pid)
1746 lli->lli_sa_enabled = 0;
1747 spin_unlock(&lli->lli_sa_lock);
/* undo the running-count bump taken after the first-dirent check */
1751 if (first != LS_NOT_FIRST_DE)
1752 atomic_dec(&sbi->ll_sa_running);
1758 * Check whether statahead for @dir was started.
1760 static inline bool ll_statahead_started(struct inode *dir, bool agl)
1762 struct ll_inode_info *lli = ll_i2info(dir);
1763 struct ll_statahead_info *sai;
1765 spin_lock(&lli->lli_sa_lock);
1767 if (sai && (sai->sai_agl_task != NULL) != agl)
1769 "%s: Statahead AGL hint changed from %d to %d\n",
1770 ll_i2sbi(dir)->ll_fsname,
1771 sai->sai_agl_task != NULL, agl);
1772 spin_unlock(&lli->lli_sa_lock);
1778 * statahead entry function, this is called when client getattr on a file, it
1779 * will start statahead thread if this is the first dir entry, else revalidate
1780 * dentry from statahead cache.
1782 * \param[in] dir parent directory
1783 * \param[out] dentryp dentry to getattr
1784 * \param[in] agl whether start the agl thread
1786 * \retval 1 on success
1787 * \retval 0 revalidation from statahead cache failed, caller needs
1788 * to getattr from server directly
1789 * \retval negative number on error, caller often ignores this and
1790 * then getattr from server
1792 int ll_start_statahead(struct inode *dir, struct dentry *dentry, bool agl)
1794 if (!ll_statahead_started(dir, agl))
1795 return start_statahead_thread(dir, dentry, agl);
1800 * revalidate dentry from statahead cache.
1802 * \param[in] dir parent directory
1803 * \param[out] dentryp dentry to getattr
1804 * \param[in] unplug unplug statahead window only (normally for negative
1806 * \retval 1 on success
1807 * \retval 0 revalidation from statahead cache failed, caller needs
1808 * to getattr from server directly
1809 * \retval negative number on error, caller often ignores this and
1810 * then getattr from server
1812 int ll_revalidate_statahead(struct inode *dir, struct dentry **dentryp,
1815 struct ll_statahead_info *sai;
1818 sai = ll_sai_get(dir);
1820 rc = revalidate_statahead_dentry(dir, sai, dentryp, unplug);
1821 CDEBUG(D_READA, "revalidate statahead %pd: rc = %d.\n",