Whamcloud - gitweb
1db80aec908478fe6ba85ac88134921a9d50ddbb
[fs/lustre-release.git] / lustre / llite / statahead.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  */
31
32 #include <linux/fs.h>
33 #include <linux/sched.h>
34 #include <linux/kthread.h>
35 #include <linux/mm.h>
36 #include <linux/highmem.h>
37 #include <linux/pagemap.h>
38 #include <linux/delay.h>
39
40 #define DEBUG_SUBSYSTEM S_LLITE
41
42 #include <obd_support.h>
43 #include <lustre_dlm.h>
44 #include "llite_internal.h"
45
46 #define SA_OMITTED_ENTRY_MAX 8ULL
47
48 typedef enum {
49         /** negative values are for error cases */
50         SA_ENTRY_INIT = 0,      /** init entry */
51         SA_ENTRY_SUCC = 1,      /** stat succeed */
52         SA_ENTRY_INVA = 2,      /** invalid entry */
53 } se_state_t;
54
55 /*
56  * sa_entry is not refcounted: statahead thread allocates it and do async stat,
57  * and in async stat callback ll_statahead_interpret() will prepare the inode
58  * and set lock data in the ptlrpcd context. Then the scanner process will be
59  * woken up if this entry is the waiting one, can access and free it.
60  */
61 struct sa_entry {
62         /* link into sai_entries */
63         struct list_head        se_list;
64         /* link into sai hash table locally */
65         struct list_head        se_hash;
66         /* entry index in the sai */
67         __u64                   se_index;
68         /* low layer ldlm lock handle */
69         __u64                   se_handle;
70         /* entry status */
71         se_state_t              se_state;
72         /* entry size, contains name */
73         int                     se_size;
74         /* pointer to the target inode */
75         struct inode           *se_inode;
76         /* entry name */
77         struct qstr             se_qstr;
78         /* entry fid */
79         struct lu_fid           se_fid;
80 };
81
82 static unsigned int sai_generation;
83 static DEFINE_SPINLOCK(sai_generation_lock);
84
85 static inline int sa_unhashed(struct sa_entry *entry)
86 {
87         return list_empty(&entry->se_hash);
88 }
89
90 /* sa_entry is ready to use */
91 static inline int sa_ready(struct sa_entry *entry)
92 {
93         /* Make sure sa_entry is updated and ready to use */
94         smp_rmb();
95         return (entry->se_state != SA_ENTRY_INIT);
96 }
97
98 /* hash value to put in sai_cache */
99 static inline int sa_hash(int val)
100 {
101         return val & LL_SA_CACHE_MASK;
102 }
103
104 /* hash entry into sai_cache */
105 static inline void
106 sa_rehash(struct ll_statahead_info *sai, struct sa_entry *entry)
107 {
108         int i = sa_hash(entry->se_qstr.hash);
109
110         spin_lock(&sai->sai_cache_lock[i]);
111         list_add_tail(&entry->se_hash, &sai->sai_cache[i]);
112         spin_unlock(&sai->sai_cache_lock[i]);
113 }
114
115 /* unhash entry from sai_cache */
116 static inline void
117 sa_unhash(struct ll_statahead_info *sai, struct sa_entry *entry)
118 {
119         int i = sa_hash(entry->se_qstr.hash);
120
121         spin_lock(&sai->sai_cache_lock[i]);
122         list_del_init(&entry->se_hash);
123         spin_unlock(&sai->sai_cache_lock[i]);
124 }
125
126 static inline int agl_should_run(struct ll_statahead_info *sai,
127                                  struct inode *inode)
128 {
129         return inode && S_ISREG(inode->i_mode) && sai->sai_agl_task;
130 }
131
132 static inline struct ll_inode_info *
133 agl_first_entry(struct ll_statahead_info *sai)
134 {
135         return list_entry(sai->sai_agls.next, struct ll_inode_info,
136                           lli_agl_list);
137 }
138
139 /* statahead window is full */
140 static inline int sa_sent_full(struct ll_statahead_info *sai)
141 {
142         return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
143 }
144
145 static inline int agl_list_empty(struct ll_statahead_info *sai)
146 {
147         return list_empty(&sai->sai_agls);
148 }
149
150 /**
151  * (1) hit ratio less than 80%
152  * or
153  * (2) consecutive miss more than 8
154  * then means low hit.
155  */
156 static inline int sa_low_hit(struct ll_statahead_info *sai)
157 {
158         return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) ||
159                 (sai->sai_consecutive_miss > 8));
160 }
161
162 /*
163  * if the given index is behind of statahead window more than
164  * SA_OMITTED_ENTRY_MAX, then it is old.
165  */
166 static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
167 {
168         return ((__u64)sai->sai_max + index + SA_OMITTED_ENTRY_MAX <
169                 sai->sai_index);
170 }
171
172 /* allocate sa_entry and hash it to allow scanner process to find it */
173 static struct sa_entry *
174 sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
175          const char *name, int len, const struct lu_fid *fid)
176 {
177         struct ll_inode_info *lli;
178         struct sa_entry *entry;
179         int entry_size;
180         char *dname;
181
182         ENTRY;
183
184         entry_size = sizeof(struct sa_entry) + (len & ~3) + 4;
185         OBD_ALLOC(entry, entry_size);
186         if (unlikely(!entry))
187                 RETURN(ERR_PTR(-ENOMEM));
188
189         CDEBUG(D_READA, "alloc sa entry %.*s(%p) index %llu\n",
190                len, name, entry, index);
191
192         entry->se_index = index;
193
194         entry->se_state = SA_ENTRY_INIT;
195         entry->se_size = entry_size;
196         dname = (char *)entry + sizeof(struct sa_entry);
197         memcpy(dname, name, len);
198         dname[len] = 0;
199         entry->se_qstr.hash = ll_full_name_hash(parent, name, len);
200         entry->se_qstr.len = len;
201         entry->se_qstr.name = dname;
202         entry->se_fid = *fid;
203
204         lli = ll_i2info(sai->sai_dentry->d_inode);
205
206         spin_lock(&lli->lli_sa_lock);
207         INIT_LIST_HEAD(&entry->se_list);
208         sa_rehash(sai, entry);
209         spin_unlock(&lli->lli_sa_lock);
210
211         atomic_inc(&sai->sai_cache_count);
212
213         RETURN(entry);
214 }
215
216 /* free sa_entry, which should have been unhashed and not in any list */
217 static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry)
218 {
219         CDEBUG(D_READA, "free sa entry %.*s(%p) index %llu\n",
220                entry->se_qstr.len, entry->se_qstr.name, entry,
221                entry->se_index);
222
223         LASSERT(list_empty(&entry->se_list));
224         LASSERT(sa_unhashed(entry));
225
226         OBD_FREE(entry, entry->se_size);
227         atomic_dec(&sai->sai_cache_count);
228 }
229
230 /*
231  * find sa_entry by name, used by directory scanner, lock is not needed because
232  * only scanner can remove the entry from cache.
233  */
234 static struct sa_entry *
235 sa_get(struct ll_statahead_info *sai, const struct qstr *qstr)
236 {
237         struct sa_entry *entry;
238         int i = sa_hash(qstr->hash);
239
240         list_for_each_entry(entry, &sai->sai_cache[i], se_hash) {
241                 if (entry->se_qstr.hash == qstr->hash &&
242                     entry->se_qstr.len == qstr->len &&
243                     memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0)
244                         return entry;
245         }
246         return NULL;
247 }
248
249 /* unhash and unlink sa_entry, and then free it */
250 static inline void
251 sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry)
252 {
253         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
254
255         LASSERT(!sa_unhashed(entry));
256         LASSERT(!list_empty(&entry->se_list));
257         LASSERT(sa_ready(entry));
258
259         sa_unhash(sai, entry);
260
261         spin_lock(&lli->lli_sa_lock);
262         list_del_init(&entry->se_list);
263         spin_unlock(&lli->lli_sa_lock);
264
265         iput(entry->se_inode);
266
267         sa_free(sai, entry);
268 }
269
270 /* called by scanner after use, sa_entry will be killed */
271 static void
272 sa_put(struct ll_statahead_info *sai, struct sa_entry *entry)
273 {
274         struct sa_entry *tmp, *next;
275
276         if (entry && entry->se_state == SA_ENTRY_SUCC) {
277                 struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
278
279                 sai->sai_hit++;
280                 sai->sai_consecutive_miss = 0;
281                 sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
282         } else {
283                 sai->sai_miss++;
284                 sai->sai_consecutive_miss++;
285         }
286
287         if (entry)
288                 sa_kill(sai, entry);
289
290         /*
291          * kill old completed entries, only scanner process does this, no need
292          * to lock
293          */
294         list_for_each_entry_safe(tmp, next, &sai->sai_entries, se_list) {
295                 if (!is_omitted_entry(sai, tmp->se_index))
296                         break;
297                 sa_kill(sai, tmp);
298         }
299 }
300
301 /*
302  * update state and sort add entry to sai_entries by index, return true if
303  * scanner is waiting on this entry.
304  */
305 static bool
306 __sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
307 {
308         struct sa_entry *se;
309         struct list_head *pos = &sai->sai_entries;
310         __u64 index = entry->se_index;
311
312         LASSERT(!sa_ready(entry));
313         LASSERT(list_empty(&entry->se_list));
314
315         list_for_each_entry_reverse(se, &sai->sai_entries, se_list) {
316                 if (se->se_index < entry->se_index) {
317                         pos = &se->se_list;
318                         break;
319                 }
320         }
321         list_add(&entry->se_list, pos);
322         /*
323          * LU-9210: ll_statahead_interpet must be able to see this before
324          * we wake it up
325          */
326         smp_store_release(&entry->se_state,
327                           ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);
328
329         return (index == sai->sai_index_wait);
330 }
331
332 /* finish async stat RPC arguments */
333 static void sa_fini_data(struct md_op_item *item)
334 {
335         ll_unlock_md_op_lsm(&item->mop_data);
336         iput(item->mop_dir);
337         OBD_FREE_PTR(item);
338 }
339
340 static int ll_statahead_interpret(struct req_capsule *pill,
341                                   struct md_op_item *item, int rc);
342
343 /*
344  * prepare arguments for async stat RPC.
345  */
346 static struct md_op_item *
347 sa_prep_data(struct inode *dir, struct inode *child, struct sa_entry *entry)
348 {
349         struct md_op_item *item;
350         struct ldlm_enqueue_info *einfo;
351         struct md_op_data *op_data;
352
353         OBD_ALLOC_PTR(item);
354         if (!item)
355                 return ERR_PTR(-ENOMEM);
356
357         op_data = ll_prep_md_op_data(&item->mop_data, dir, child,
358                                      entry->se_qstr.name, entry->se_qstr.len, 0,
359                                      LUSTRE_OPC_ANY, NULL);
360         if (IS_ERR(op_data)) {
361                 OBD_FREE_PTR(item);
362                 return (struct md_op_item *)op_data;
363         }
364
365         if (!child)
366                 op_data->op_fid2 = entry->se_fid;
367
368         item->mop_it.it_op = IT_GETATTR;
369         item->mop_dir = igrab(dir);
370         item->mop_cb = ll_statahead_interpret;
371         item->mop_cbdata = entry;
372
373         einfo = &item->mop_einfo;
374         einfo->ei_type = LDLM_IBITS;
375         einfo->ei_mode = it_to_lock_mode(&item->mop_it);
376         einfo->ei_cb_bl = ll_md_blocking_ast;
377         einfo->ei_cb_cp = ldlm_completion_ast;
378         einfo->ei_cb_gl = NULL;
379         einfo->ei_cbdata = NULL;
380
381         return item;
382 }
383
384 /*
385  * release resources used in async stat RPC, update entry state and wakeup if
386  * scanner process it waiting on this entry.
387  */
388 static void
389 sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
390 {
391         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
392         bool wakeup;
393
394         spin_lock(&lli->lli_sa_lock);
395         wakeup = __sa_make_ready(sai, entry, ret);
396         spin_unlock(&lli->lli_sa_lock);
397
398         if (wakeup)
399                 wake_up(&sai->sai_waitq);
400 }
401
402 /* insert inode into the list of sai_agls */
403 static void ll_agl_add(struct ll_statahead_info *sai,
404                        struct inode *inode, int index)
405 {
406         struct ll_inode_info *child  = ll_i2info(inode);
407         struct ll_inode_info *parent = ll_i2info(sai->sai_dentry->d_inode);
408
409         spin_lock(&child->lli_agl_lock);
410         if (child->lli_agl_index == 0) {
411                 child->lli_agl_index = index;
412                 spin_unlock(&child->lli_agl_lock);
413
414                 LASSERT(list_empty(&child->lli_agl_list));
415
416                 spin_lock(&parent->lli_agl_lock);
417                 /* Re-check under the lock */
418                 if (agl_should_run(sai, inode)) {
419                         if (agl_list_empty(sai))
420                                 wake_up_process(sai->sai_agl_task);
421                         igrab(inode);
422                         list_add_tail(&child->lli_agl_list, &sai->sai_agls);
423                 } else
424                         child->lli_agl_index = 0;
425                 spin_unlock(&parent->lli_agl_lock);
426         } else {
427                 spin_unlock(&child->lli_agl_lock);
428         }
429 }
430
431 /* allocate sai */
432 static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
433 {
434         struct ll_statahead_info *sai;
435         struct ll_inode_info *lli = ll_i2info(dentry->d_inode);
436         int i;
437
438         ENTRY;
439
440         OBD_ALLOC_PTR(sai);
441         if (!sai)
442                 RETURN(NULL);
443
444         sai->sai_dentry = dget(dentry);
445         atomic_set(&sai->sai_refcount, 1);
446         sai->sai_max = LL_SA_RPC_MIN;
447         sai->sai_index = 1;
448         init_waitqueue_head(&sai->sai_waitq);
449
450         INIT_LIST_HEAD(&sai->sai_entries);
451         INIT_LIST_HEAD(&sai->sai_agls);
452
453         for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
454                 INIT_LIST_HEAD(&sai->sai_cache[i]);
455                 spin_lock_init(&sai->sai_cache_lock[i]);
456         }
457         atomic_set(&sai->sai_cache_count, 0);
458
459         spin_lock(&sai_generation_lock);
460         lli->lli_sa_generation = ++sai_generation;
461         if (unlikely(sai_generation == 0))
462                 lli->lli_sa_generation = ++sai_generation;
463         spin_unlock(&sai_generation_lock);
464
465         RETURN(sai);
466 }
467
468 /* free sai */
469 static inline void ll_sai_free(struct ll_statahead_info *sai)
470 {
471         LASSERT(sai->sai_dentry != NULL);
472         dput(sai->sai_dentry);
473         OBD_FREE_PTR(sai);
474 }
475
476 /*
477  * take refcount of sai if sai for @dir exists, which means statahead is on for
478  * this directory.
479  */
480 static inline struct ll_statahead_info *ll_sai_get(struct inode *dir)
481 {
482         struct ll_inode_info *lli = ll_i2info(dir);
483         struct ll_statahead_info *sai = NULL;
484
485         spin_lock(&lli->lli_sa_lock);
486         sai = lli->lli_sai;
487         if (sai)
488                 atomic_inc(&sai->sai_refcount);
489         spin_unlock(&lli->lli_sa_lock);
490
491         return sai;
492 }
493
494 /*
495  * put sai refcount after use, if refcount reaches zero, free sai and sa_entries
496  * attached to it.
497  */
498 static void ll_sai_put(struct ll_statahead_info *sai)
499 {
500         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
501
502         if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
503                 struct sa_entry *entry, *next;
504                 struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
505
506                 lli->lli_sai = NULL;
507                 spin_unlock(&lli->lli_sa_lock);
508
509                 LASSERT(!sai->sai_task);
510                 LASSERT(!sai->sai_agl_task);
511                 LASSERT(sai->sai_sent == sai->sai_replied);
512
513                 list_for_each_entry_safe(entry, next, &sai->sai_entries,
514                                          se_list)
515                         sa_kill(sai, entry);
516
517                 LASSERT(atomic_read(&sai->sai_cache_count) == 0);
518                 LASSERT(agl_list_empty(sai));
519
520                 ll_sai_free(sai);
521                 atomic_dec(&sbi->ll_sa_running);
522         }
523 }
524
525 /* Do NOT forget to drop inode refcount when into sai_agls. */
526 static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
527 {
528         struct ll_inode_info *lli = ll_i2info(inode);
529         u64 index = lli->lli_agl_index;
530         ktime_t expire;
531         int rc;
532
533         ENTRY;
534
535         LASSERT(list_empty(&lli->lli_agl_list));
536
537         /* AGL maybe fall behind statahead with one entry */
538         if (is_omitted_entry(sai, index + 1)) {
539                 lli->lli_agl_index = 0;
540                 iput(inode);
541                 RETURN_EXIT;
542         }
543
544         /*
545          * In case of restore, the MDT has the right size and has already
546          * sent it back without granting the layout lock, inode is up-to-date.
547          * Then AGL (async glimpse lock) is useless.
548          * Also to glimpse we need the layout, in case of a runninh restore
549          * the MDT holds the layout lock so the glimpse will block up to the
550          * end of restore (statahead/agl will block)
551          */
552         if (test_bit(LLIF_FILE_RESTORING, &lli->lli_flags)) {
553                 lli->lli_agl_index = 0;
554                 iput(inode);
555                 RETURN_EXIT;
556         }
557
558         /* Someone is in glimpse (sync or async), do nothing. */
559         rc = down_write_trylock(&lli->lli_glimpse_sem);
560         if (rc == 0) {
561                 lli->lli_agl_index = 0;
562                 iput(inode);
563                 RETURN_EXIT;
564         }
565
566         /*
567          * Someone triggered glimpse within 1 sec before.
568          * 1) The former glimpse succeeded with glimpse lock granted by OST, and
569          *    if the lock is still cached on client, AGL needs to do nothing. If
570          *    it is cancelled by other client, AGL maybe cannot obtaion new lock
571          *    for no glimpse callback triggered by AGL.
572          * 2) The former glimpse succeeded, but OST did not grant glimpse lock.
573          *    Under such case, it is quite possible that the OST will not grant
574          *    glimpse lock for AGL also.
575          * 3) The former glimpse failed, compared with other two cases, it is
576          *    relative rare. AGL can ignore such case, and it will not muchly
577          *    affect the performance.
578          */
579         expire = ktime_sub_ns(ktime_get(), NSEC_PER_SEC);
580         if (ktime_to_ns(lli->lli_glimpse_time) &&
581             ktime_before(expire, lli->lli_glimpse_time)) {
582                 up_write(&lli->lli_glimpse_sem);
583                 lli->lli_agl_index = 0;
584                 iput(inode);
585                 RETURN_EXIT;
586         }
587
588         CDEBUG(D_READA,
589                "Handling (init) async glimpse: inode = " DFID", idx = %llu\n",
590                PFID(&lli->lli_fid), index);
591
592         cl_agl(inode);
593         lli->lli_agl_index = 0;
594         lli->lli_glimpse_time = ktime_get();
595         up_write(&lli->lli_glimpse_sem);
596
597         CDEBUG(D_READA,
598                "Handled (init) async glimpse: inode= " DFID", idx = %llu, rc = %d\n",
599                PFID(&lli->lli_fid), index, rc);
600
601         iput(inode);
602
603         EXIT;
604 }
605
606 /*
607  * Callback for async stat RPC, this is called in ptlrpcd context. It prepares
608  * the inode and set lock data directly in the ptlrpcd context. It will wake up
609  * the directory listing process if the dentry is the waiting one.
610  */
611 static int ll_statahead_interpret(struct req_capsule *pill,
612                                   struct md_op_item *item, int rc)
613 {
614         struct lookup_intent *it = &item->mop_it;
615         struct inode *dir = item->mop_dir;
616         struct ll_inode_info *lli = ll_i2info(dir);
617         struct ll_statahead_info *sai = lli->lli_sai;
618         struct sa_entry *entry = (struct sa_entry *)item->mop_cbdata;
619         struct mdt_body *body;
620         struct inode *child;
621         __u64 handle = 0;
622
623         ENTRY;
624
625         if (it_disposition(it, DISP_LOOKUP_NEG))
626                 rc = -ENOENT;
627
628         /*
629          * because statahead thread will wait for all inflight RPC to finish,
630          * sai should be always valid, no need to refcount
631          */
632         LASSERT(sai != NULL);
633         LASSERT(entry != NULL);
634
635         CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
636                entry->se_qstr.len, entry->se_qstr.name, rc);
637
638         if (rc != 0) {
639                 ll_intent_release(it);
640                 sa_fini_data(item);
641         } else {
642                 /*
643                  * release ibits lock ASAP to avoid deadlock when statahead
644                  * thread enqueues lock on parent in readdir and another
645                  * process enqueues lock on child with parent lock held, eg.
646                  * unlink.
647                  */
648                 handle = it->it_lock_handle;
649                 ll_intent_drop_lock(it);
650                 ll_unlock_md_op_lsm(&item->mop_data);
651         }
652
653         if (rc != 0) {
654                 spin_lock(&lli->lli_sa_lock);
655                 if (__sa_make_ready(sai, entry, rc))
656                         wake_up(&sai->sai_waitq);
657
658                 sai->sai_replied++;
659                 spin_unlock(&lli->lli_sa_lock);
660
661                 RETURN(rc);
662         }
663
664         entry->se_handle = handle;
665         body = req_capsule_server_get(pill, &RMF_MDT_BODY);
666         if (!body)
667                 GOTO(out, rc = -EFAULT);
668
669         child = entry->se_inode;
670         /* revalidate; unlinked and re-created with the same name */
671         if (unlikely(!lu_fid_eq(&item->mop_data.op_fid2, &body->mbo_fid1))) {
672                 if (child) {
673                         entry->se_inode = NULL;
674                         iput(child);
675                 }
676                 /* The mdt_body is invalid. Skip this entry */
677                 GOTO(out, rc = -EAGAIN);
678         }
679
680         it->it_lock_handle = entry->se_handle;
681         rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
682         if (rc != 1)
683                 GOTO(out, rc = -EAGAIN);
684
685         rc = ll_prep_inode(&child, pill, dir->i_sb, it);
686         if (rc)
687                 GOTO(out, rc);
688
689         CDEBUG(D_READA, "%s: setting %.*s"DFID" l_data to inode %p\n",
690                ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
691                entry->se_qstr.name, PFID(ll_inode2fid(child)), child);
692         ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);
693
694         entry->se_inode = child;
695
696         if (agl_should_run(sai, child))
697                 ll_agl_add(sai, child, entry->se_index);
698
699 out:
700         /*
701          * First it will drop ldlm ibits lock refcount by calling
702          * ll_intent_drop_lock() in spite of failures. Do not worry about
703          * calling ll_intent_drop_lock() more than once.
704          */
705         ll_intent_release(&item->mop_it);
706         sa_fini_data(item);
707         sa_make_ready(sai, entry, rc);
708
709         spin_lock(&lli->lli_sa_lock);
710         sai->sai_replied++;
711         spin_unlock(&lli->lli_sa_lock);
712
713         RETURN(rc);
714 }
715
716 /* async stat for file not found in dcache */
717 static int sa_lookup(struct inode *dir, struct sa_entry *entry)
718 {
719         struct md_op_item *item;
720         int rc;
721
722         ENTRY;
723
724         item = sa_prep_data(dir, NULL, entry);
725         if (IS_ERR(item))
726                 RETURN(PTR_ERR(item));
727
728         rc = md_intent_getattr_async(ll_i2mdexp(dir), item);
729         if (rc < 0)
730                 sa_fini_data(item);
731
732         RETURN(rc);
733 }
734
735 /**
736  * async stat for file found in dcache, similar to .revalidate
737  *
738  * \retval      1 dentry valid, no RPC sent
739  * \retval      0 dentry invalid, will send async stat RPC
740  * \retval      negative number upon error
741  */
742 static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
743                          struct dentry *dentry)
744 {
745         struct inode *inode = dentry->d_inode;
746         struct lookup_intent it = { .it_op = IT_GETATTR,
747                                     .it_lock_handle = 0 };
748         struct md_op_item *item;
749         int rc;
750
751         ENTRY;
752
753         if (unlikely(!inode))
754                 RETURN(1);
755
756         if (d_mountpoint(dentry))
757                 RETURN(1);
758
759         item = sa_prep_data(dir, inode, entry);
760         if (IS_ERR(item))
761                 RETURN(PTR_ERR(item));
762
763         entry->se_inode = igrab(inode);
764         rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
765                                 NULL);
766         if (rc == 1) {
767                 entry->se_handle = it.it_lock_handle;
768                 ll_intent_release(&it);
769                 sa_fini_data(item);
770                 RETURN(1);
771         }
772
773         rc = md_intent_getattr_async(ll_i2mdexp(dir), item);
774         if (rc < 0) {
775                 entry->se_inode = NULL;
776                 iput(inode);
777                 sa_fini_data(item);
778         }
779
780         RETURN(rc);
781 }
782
783 /* async stat for file with @name */
784 static void sa_statahead(struct dentry *parent, const char *name, int len,
785                          const struct lu_fid *fid)
786 {
787         struct inode *dir = parent->d_inode;
788         struct ll_inode_info *lli = ll_i2info(dir);
789         struct ll_statahead_info *sai = lli->lli_sai;
790         struct dentry *dentry = NULL;
791         struct sa_entry *entry;
792         int rc;
793
794         ENTRY;
795
796         entry = sa_alloc(parent, sai, sai->sai_index, name, len, fid);
797         if (IS_ERR(entry))
798                 RETURN_EXIT;
799
800         dentry = d_lookup(parent, &entry->se_qstr);
801         if (!dentry) {
802                 rc = sa_lookup(dir, entry);
803         } else {
804                 rc = sa_revalidate(dir, entry, dentry);
805                 if (rc == 1 && agl_should_run(sai, dentry->d_inode))
806                         ll_agl_add(sai, dentry->d_inode, entry->se_index);
807         }
808
809         if (dentry)
810                 dput(dentry);
811
812         if (rc != 0)
813                 sa_make_ready(sai, entry, rc);
814         else
815                 sai->sai_sent++;
816
817         sai->sai_index++;
818
819         EXIT;
820 }
821
822 /* async glimpse (agl) thread main function */
823 static int ll_agl_thread(void *arg)
824 {
825         struct dentry *parent = (struct dentry *)arg;
826         struct inode *dir = parent->d_inode;
827         struct ll_inode_info *plli = ll_i2info(dir);
828         struct ll_inode_info *clli;
829         /*
830          * We already own this reference, so it is safe to take it
831          * without a lock.
832          */
833         struct ll_statahead_info *sai = plli->lli_sai;
834
835         ENTRY;
836
837         CDEBUG(D_READA, "agl thread started: sai %p, parent %pd\n",
838                sai, parent);
839
840         while (({set_current_state(TASK_IDLE);
841                  !kthread_should_stop(); })) {
842                 spin_lock(&plli->lli_agl_lock);
843                 if (!agl_list_empty(sai)) {
844                         __set_current_state(TASK_RUNNING);
845                         clli = agl_first_entry(sai);
846                         list_del_init(&clli->lli_agl_list);
847                         spin_unlock(&plli->lli_agl_lock);
848                         ll_agl_trigger(&clli->lli_vfs_inode, sai);
849                         cond_resched();
850                 } else {
851                         spin_unlock(&plli->lli_agl_lock);
852                         schedule();
853                 }
854         }
855         __set_current_state(TASK_RUNNING);
856         RETURN(0);
857 }
858
859 static void ll_stop_agl(struct ll_statahead_info *sai)
860 {
861         struct dentry *parent = sai->sai_dentry;
862         struct ll_inode_info *plli = ll_i2info(parent->d_inode);
863         struct ll_inode_info *clli;
864         struct task_struct *agl_task;
865
866         spin_lock(&plli->lli_agl_lock);
867         agl_task = sai->sai_agl_task;
868         sai->sai_agl_task = NULL;
869         spin_unlock(&plli->lli_agl_lock);
870         if (!agl_task)
871                 return;
872
873         CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n",
874                sai, (unsigned int)agl_task->pid);
875         kthread_stop(agl_task);
876
877         spin_lock(&plli->lli_agl_lock);
878         while (!agl_list_empty(sai)) {
879                 clli = agl_first_entry(sai);
880                 list_del_init(&clli->lli_agl_list);
881                 spin_unlock(&plli->lli_agl_lock);
882                 clli->lli_agl_index = 0;
883                 iput(&clli->lli_vfs_inode);
884                 spin_lock(&plli->lli_agl_lock);
885         }
886         spin_unlock(&plli->lli_agl_lock);
887         CDEBUG(D_READA, "agl thread stopped: sai %p, parent %pd\n",
888                sai, parent);
889         ll_sai_put(sai);
890 }
891
892 /* start agl thread */
893 static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
894 {
895         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
896         struct ll_inode_info *plli;
897         struct task_struct *task;
898
899         ENTRY;
900
901         CDEBUG(D_READA, "start agl thread: sai %p, parent %pd\n",
902                sai, parent);
903
904         plli = ll_i2info(parent->d_inode);
905         task = kthread_create_on_node(ll_agl_thread, parent, node, "ll_agl_%d",
906                                       plli->lli_opendir_pid);
907         if (IS_ERR(task)) {
908                 CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));
909                 RETURN_EXIT;
910         }
911         sai->sai_agl_task = task;
912         atomic_inc(&ll_i2sbi(d_inode(parent))->ll_agl_total);
913         /* Get an extra reference that the thread holds */
914         ll_sai_get(d_inode(parent));
915
916         wake_up_process(task);
917
918         EXIT;
919 }
920
921 /* statahead thread main function */
922 static int ll_statahead_thread(void *arg)
923 {
924         struct dentry *parent = (struct dentry *)arg;
925         struct inode *dir = parent->d_inode;
926         struct ll_inode_info *lli = ll_i2info(dir);
927         struct ll_sb_info *sbi = ll_i2sbi(dir);
928         struct ll_statahead_info *sai = lli->lli_sai;
929         int first = 0;
930         struct md_op_data *op_data;
931         struct page *page = NULL;
932         __u64 pos = 0;
933         int rc = 0;
934
935         ENTRY;
936
937         CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
938                sai, parent);
939
940         OBD_ALLOC_PTR(op_data);
941         if (!op_data)
942                 GOTO(out, rc = -ENOMEM);
943
944         while (pos != MDS_DIR_END_OFF && sai->sai_task) {
945                 struct lu_dirpage *dp;
946                 struct lu_dirent  *ent;
947
948                 op_data = ll_prep_md_op_data(op_data, dir, dir, NULL, 0, 0,
949                                              LUSTRE_OPC_ANY, dir);
950                 if (IS_ERR(op_data)) {
951                         rc = PTR_ERR(op_data);
952                         break;
953                 }
954
955                 page = ll_get_dir_page(dir, op_data, pos);
956                 ll_unlock_md_op_lsm(op_data);
957                 if (IS_ERR(page)) {
958                         rc = PTR_ERR(page);
959                         CDEBUG(D_READA,
960                                "error reading dir "DFID" at %llu /%llu opendir_pid = %u: rc = %d\n",
961                                PFID(ll_inode2fid(dir)), pos, sai->sai_index,
962                                lli->lli_opendir_pid, rc);
963                         break;
964                 }
965
966                 dp = page_address(page);
967                 for (ent = lu_dirent_start(dp);
968                      ent != NULL && sai->sai_task &&
969                      !sa_low_hit(sai);
970                      ent = lu_dirent_next(ent)) {
971                         __u64 hash;
972                         int namelen;
973                         char *name;
974                         struct lu_fid fid;
975
976                         hash = le64_to_cpu(ent->lde_hash);
977                         if (unlikely(hash < pos))
978                                 /*
979                                  * Skip until we find target hash value.
980                                  */
981                                 continue;
982
983                         namelen = le16_to_cpu(ent->lde_namelen);
984                         if (unlikely(namelen == 0))
985                                 /*
986                                  * Skip dummy record.
987                                  */
988                                 continue;
989
990                         name = ent->lde_name;
991                         if (name[0] == '.') {
992                                 if (namelen == 1) {
993                                         /*
994                                          * skip "."
995                                          */
996                                         continue;
997                                 } else if (name[1] == '.' && namelen == 2) {
998                                         /*
999                                          * skip ".."
1000                                          */
1001                                         continue;
1002                                 } else if (!sai->sai_ls_all) {
1003                                         /*
1004                                          * skip hidden files.
1005                                          */
1006                                         sai->sai_skip_hidden++;
1007                                         continue;
1008                                 }
1009                         }
1010
1011                         /*
1012                          * don't stat-ahead first entry.
1013                          */
1014                         if (unlikely(++first == 1))
1015                                 continue;
1016
1017                         fid_le_to_cpu(&fid, &ent->lde_fid);
1018
1019                         while (({set_current_state(TASK_IDLE);
1020                                  sai->sai_task; })) {
1021                                 spin_lock(&lli->lli_agl_lock);
1022                                 while (sa_sent_full(sai) &&
1023                                        !agl_list_empty(sai)) {
1024                                         struct ll_inode_info *clli;
1025
1026                                         __set_current_state(TASK_RUNNING);
1027                                         clli = agl_first_entry(sai);
1028                                         list_del_init(&clli->lli_agl_list);
1029                                         spin_unlock(&lli->lli_agl_lock);
1030
1031                                         ll_agl_trigger(&clli->lli_vfs_inode,
1032                                                        sai);
1033                                         cond_resched();
1034                                         spin_lock(&lli->lli_agl_lock);
1035                                 }
1036                                 spin_unlock(&lli->lli_agl_lock);
1037
1038                                 if (!sa_sent_full(sai))
1039                                         break;
1040                                 schedule();
1041                         }
1042                         __set_current_state(TASK_RUNNING);
1043
1044                         sa_statahead(parent, name, namelen, &fid);
1045                 }
1046
1047                 pos = le64_to_cpu(dp->ldp_hash_end);
1048                 ll_release_page(dir, page,
1049                                 le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
1050
1051                 if (sa_low_hit(sai)) {
1052                         rc = -EFAULT;
1053                         atomic_inc(&sbi->ll_sa_wrong);
1054                         CDEBUG(D_READA,
1055                                "Statahead for dir "DFID" hit ratio too low: hit/miss %llu/%llu, sent/replied %llu/%llu, stoppingstatahead thread: pid %d\n",
1056                                PFID(&lli->lli_fid), sai->sai_hit,
1057                                sai->sai_miss, sai->sai_sent,
1058                                sai->sai_replied, current->pid);
1059                         break;
1060                 }
1061         }
1062         ll_finish_md_op_data(op_data);
1063
1064         if (rc < 0) {
1065                 spin_lock(&lli->lli_sa_lock);
1066                 sai->sai_task = NULL;
1067                 lli->lli_sa_enabled = 0;
1068                 spin_unlock(&lli->lli_sa_lock);
1069         }
1070
1071         /*
1072          * statahead is finished, but statahead entries need to be cached, wait
1073          * for file release closedir() call to stop me.
1074          */
1075         while (({set_current_state(TASK_IDLE);
1076                  sai->sai_task; })) {
1077                 schedule();
1078         }
1079         __set_current_state(TASK_RUNNING);
1080
1081         EXIT;
1082 out:
1083         ll_stop_agl(sai);
1084
1085         /*
1086          * wait for inflight statahead RPCs to finish, and then we can free sai
1087          * safely because statahead RPC will access sai data
1088          */
1089         while (sai->sai_sent != sai->sai_replied)
1090                 /* in case we're not woken up, timeout wait */
1091                 msleep(125);
1092
1093         CDEBUG(D_READA, "%s: statahead thread stopped: sai %p, parent %pd\n",
1094                sbi->ll_fsname, sai, parent);
1095
1096         spin_lock(&lli->lli_sa_lock);
1097         sai->sai_task = NULL;
1098         spin_unlock(&lli->lli_sa_lock);
1099         wake_up(&sai->sai_waitq);
1100
1101         ll_sai_put(sai);
1102
1103         return rc;
1104 }
1105
1106 /* authorize opened dir handle @key to statahead */
1107 void ll_authorize_statahead(struct inode *dir, void *key)
1108 {
1109         struct ll_inode_info *lli = ll_i2info(dir);
1110
1111         spin_lock(&lli->lli_sa_lock);
1112         if (!lli->lli_opendir_key && !lli->lli_sai) {
1113                 /*
1114                  * if lli_sai is not NULL, it means previous statahead is not
1115                  * finished yet, we'd better not start a new statahead for now.
1116                  */
1117                 LASSERT(lli->lli_opendir_pid == 0);
1118                 lli->lli_opendir_key = key;
1119                 lli->lli_opendir_pid = current->pid;
1120                 lli->lli_sa_enabled = 1;
1121         }
1122         spin_unlock(&lli->lli_sa_lock);
1123 }
1124
1125 /*
1126  * deauthorize opened dir handle @key to statahead, and notify statahead thread
1127  * to quit if it's running.
1128  */
1129 void ll_deauthorize_statahead(struct inode *dir, void *key)
1130 {
1131         struct ll_inode_info *lli = ll_i2info(dir);
1132         struct ll_statahead_info *sai;
1133
1134         LASSERT(lli->lli_opendir_key == key);
1135         LASSERT(lli->lli_opendir_pid != 0);
1136
1137         CDEBUG(D_READA, "deauthorize statahead for "DFID"\n",
1138                PFID(&lli->lli_fid));
1139
1140         spin_lock(&lli->lli_sa_lock);
1141         lli->lli_opendir_key = NULL;
1142         lli->lli_opendir_pid = 0;
1143         lli->lli_sa_enabled = 0;
1144         sai = lli->lli_sai;
1145         if (sai && sai->sai_task) {
1146                 /*
1147                  * statahead thread may not have quit yet because it needs to
1148                  * cache entries, now it's time to tell it to quit.
1149                  *
1150                  * wake_up_process() provides the necessary barriers
1151                  * to pair with set_current_state().
1152                  */
1153                 struct task_struct *task = sai->sai_task;
1154
1155                 sai->sai_task = NULL;
1156                 wake_up_process(task);
1157         }
1158         spin_unlock(&lli->lli_sa_lock);
1159 }
1160
1161 enum {
1162         /**
1163          * not first dirent, or is "."
1164          */
1165         LS_NOT_FIRST_DE = 0,
1166         /**
1167          * the first non-hidden dirent
1168          */
1169         LS_FIRST_DE,
1170         /**
1171          * the first hidden dirent, that is "."
1172          */
1173         LS_FIRST_DOT_DE
1174 };
1175
1176 /* file is first dirent under @dir */
1177 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
1178 {
1179         struct qstr          *target = &dentry->d_name;
1180         struct md_op_data    *op_data;
1181         int                   dot_de;
1182         struct page          *page = NULL;
1183         int                   rc = LS_NOT_FIRST_DE;
1184         __u64                 pos = 0;
1185
1186         ENTRY;
1187
1188         op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
1189                                      LUSTRE_OPC_ANY, dir);
1190         if (IS_ERR(op_data))
1191                 RETURN(PTR_ERR(op_data));
1192         /**
1193          *FIXME choose the start offset of the readdir
1194          */
1195
1196         page = ll_get_dir_page(dir, op_data, 0);
1197
1198         while (1) {
1199                 struct lu_dirpage *dp;
1200                 struct lu_dirent  *ent;
1201
1202                 if (IS_ERR(page)) {
1203                         struct ll_inode_info *lli = ll_i2info(dir);
1204
1205                         rc = PTR_ERR(page);
1206                         CERROR("%s: reading dir "DFID" at %llu opendir_pid = %u : rc = %d\n",
1207                                ll_i2sbi(dir)->ll_fsname,
1208                                PFID(ll_inode2fid(dir)), pos,
1209                                lli->lli_opendir_pid, rc);
1210                         break;
1211                 }
1212
1213                 dp = page_address(page);
1214                 for (ent = lu_dirent_start(dp); ent != NULL;
1215                      ent = lu_dirent_next(ent)) {
1216                         __u64 hash;
1217                         int namelen;
1218                         char *name;
1219
1220                         hash = le64_to_cpu(ent->lde_hash);
1221                         /*
1222                          * The ll_get_dir_page() can return any page containing
1223                          * the given hash which may be not the start hash.
1224                          */
1225                         if (unlikely(hash < pos))
1226                                 continue;
1227
1228                         namelen = le16_to_cpu(ent->lde_namelen);
1229                         if (unlikely(namelen == 0))
1230                                 /*
1231                                  * skip dummy record.
1232                                  */
1233                                 continue;
1234
1235                         name = ent->lde_name;
1236                         if (name[0] == '.') {
1237                                 if (namelen == 1)
1238                                         /*
1239                                          * skip "."
1240                                          */
1241                                         continue;
1242                                 else if (name[1] == '.' && namelen == 2)
1243                                         /*
1244                                          * skip ".."
1245                                          */
1246                                         continue;
1247                                 else
1248                                         dot_de = 1;
1249                         } else {
1250                                 dot_de = 0;
1251                         }
1252
1253                         if (dot_de && target->name[0] != '.') {
1254                                 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
1255                                        target->len, target->name,
1256                                        namelen, name);
1257                                 continue;
1258                         }
1259
1260                         if (target->len != namelen ||
1261                             memcmp(target->name, name, namelen) != 0)
1262                                 rc = LS_NOT_FIRST_DE;
1263                         else if (!dot_de)
1264                                 rc = LS_FIRST_DE;
1265                         else
1266                                 rc = LS_FIRST_DOT_DE;
1267
1268                         ll_release_page(dir, page, false);
1269                         GOTO(out, rc);
1270                 }
1271                 pos = le64_to_cpu(dp->ldp_hash_end);
1272                 if (pos == MDS_DIR_END_OFF) {
1273                         /*
1274                          * End of directory reached.
1275                          */
1276                         ll_release_page(dir, page, false);
1277                         GOTO(out, rc);
1278                 } else {
1279                         /*
1280                          * chain is exhausted
1281                          * Normal case: continue to the next page.
1282                          */
1283                         ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) &
1284                                               LDF_COLLIDE);
1285                         page = ll_get_dir_page(dir, op_data, pos);
1286                 }
1287         }
1288         EXIT;
1289 out:
1290         ll_finish_md_op_data(op_data);
1291
1292         return rc;
1293 }
1294
1295 /**
1296  * revalidate @dentryp from statahead cache
1297  *
1298  * \param[in] dir       parent directory
1299  * \param[in] sai       sai structure
1300  * \param[out] dentryp  pointer to dentry which will be revalidated
1301  * \param[in] unplug    unplug statahead window only (normally for negative
1302  *                      dentry)
1303  * \retval              1 on success, dentry is saved in @dentryp
1304  * \retval              0 if revalidation failed (no proper lock on client)
1305  * \retval              negative number upon error
1306  */
1307 static int revalidate_statahead_dentry(struct inode *dir,
1308                                        struct ll_statahead_info *sai,
1309                                        struct dentry **dentryp,
1310                                        bool unplug)
1311 {
1312         struct sa_entry *entry = NULL;
1313         struct ll_dentry_data *ldd;
1314         struct ll_inode_info *lli = ll_i2info(dir);
1315         int rc = 0;
1316
1317         ENTRY;
1318
1319         if ((*dentryp)->d_name.name[0] == '.') {
1320                 if (sai->sai_ls_all ||
1321                     sai->sai_miss_hidden >= sai->sai_skip_hidden) {
1322                         /*
1323                          * Hidden dentry is the first one, or statahead
1324                          * thread does not skip so many hidden dentries
1325                          * before "sai_ls_all" enabled as below.
1326                          */
1327                 } else {
1328                         if (!sai->sai_ls_all)
1329                                 /*
1330                                  * It maybe because hidden dentry is not
1331                                  * the first one, "sai_ls_all" was not
1332                                  * set, then "ls -al" missed. Enable
1333                                  * "sai_ls_all" for such case.
1334                                  */
1335                                 sai->sai_ls_all = 1;
1336
1337                         /*
1338                          * Such "getattr" has been skipped before
1339                          * "sai_ls_all" enabled as above.
1340                          */
1341                         sai->sai_miss_hidden++;
1342                         RETURN(-EAGAIN);
1343                 }
1344         }
1345
1346         if (unplug)
1347                 GOTO(out, rc = 1);
1348
1349         entry = sa_get(sai, &(*dentryp)->d_name);
1350         if (!entry)
1351                 GOTO(out, rc = -EAGAIN);
1352
1353         if (!sa_ready(entry)) {
1354                 spin_lock(&lli->lli_sa_lock);
1355                 sai->sai_index_wait = entry->se_index;
1356                 spin_unlock(&lli->lli_sa_lock);
1357                 rc = wait_event_idle_timeout(sai->sai_waitq, sa_ready(entry),
1358                                              cfs_time_seconds(30));
1359                 if (rc == 0) {
1360                         /*
1361                          * entry may not be ready, so it may be used by inflight
1362                          * statahead RPC, don't free it.
1363                          */
1364                         entry = NULL;
1365                         GOTO(out, rc = -EAGAIN);
1366                 }
1367         }
1368
1369         /*
1370          * We need to see the value that was set immediately before we
1371          * were woken up.
1372          */
1373         if (smp_load_acquire(&entry->se_state) == SA_ENTRY_SUCC &&
1374             entry->se_inode) {
1375                 struct inode *inode = entry->se_inode;
1376                 struct lookup_intent it = { .it_op = IT_GETATTR,
1377                                             .it_lock_handle =
1378                                                 entry->se_handle };
1379                 __u64 bits;
1380
1381                 rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
1382                                         ll_inode2fid(inode), &bits);
1383                 if (rc == 1) {
1384                         if (!(*dentryp)->d_inode) {
1385                                 struct dentry *alias;
1386
1387                                 alias = ll_splice_alias(inode, *dentryp);
1388                                 if (IS_ERR(alias)) {
1389                                         ll_intent_release(&it);
1390                                         GOTO(out, rc = PTR_ERR(alias));
1391                                 }
1392                                 *dentryp = alias;
1393                                 /*
1394                                  * statahead prepared this inode, transfer inode
1395                                  * refcount from sa_entry to dentry
1396                                  */
1397                                 entry->se_inode = NULL;
1398                         } else if ((*dentryp)->d_inode != inode) {
1399                                 /* revalidate, but inode is recreated */
1400                                 CDEBUG(D_READA,
1401                                        "%s: stale dentry %pd inode " DFID", statahead inode "DFID "\n",
1402                                        ll_i2sbi(inode)->ll_fsname, *dentryp,
1403                                        PFID(ll_inode2fid((*dentryp)->d_inode)),
1404                                        PFID(ll_inode2fid(inode)));
1405                                 ll_intent_release(&it);
1406                                 GOTO(out, rc = -ESTALE);
1407                         }
1408
1409                         if ((bits & MDS_INODELOCK_LOOKUP) &&
1410                             d_lustre_invalid(*dentryp))
1411                                 d_lustre_revalidate(*dentryp);
1412                         ll_intent_release(&it);
1413                 }
1414         }
1415 out:
1416         /*
1417          * statahead cached sa_entry can be used only once, and will be killed
1418          * right after use, so if lookup/revalidate accessed statahead cache,
1419          * set dentry ldd_sa_generation to parent lli_sa_generation, later if we
1420          * stat this file again, we know we've done statahead before, see
1421          * dentry_may_statahead().
1422          */
1423         ldd = ll_d2d(*dentryp);
1424         /* ldd can be NULL if llite lookup failed. */
1425         if (ldd)
1426                 ldd->lld_sa_generation = lli->lli_sa_generation;
1427         sa_put(sai, entry);
1428         spin_lock(&lli->lli_sa_lock);
1429         if (sai->sai_task)
1430                 wake_up_process(sai->sai_task);
1431         spin_unlock(&lli->lli_sa_lock);
1432
1433         RETURN(rc);
1434 }
1435
1436 /**
1437  * start statahead thread
1438  *
1439  * \param[in] dir       parent directory
1440  * \param[in] dentry    dentry that triggers statahead, normally the first
1441  *                      dirent under @dir
1442  * \param[in] agl       indicate whether AGL is needed
1443  * \retval              -EAGAIN on success, because when this function is
1444  *                      called, it's already in lookup call, so client should
1445  *                      do it itself instead of waiting for statahead thread
1446  *                      to do it asynchronously.
1447  * \retval              negative number upon error
1448  */
1449 static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
1450                                   bool agl)
1451 {
1452         int node = cfs_cpt_spread_node(cfs_cpt_tab, CFS_CPT_ANY);
1453         struct ll_inode_info *lli = ll_i2info(dir);
1454         struct ll_statahead_info *sai = NULL;
1455         struct dentry *parent = dentry->d_parent;
1456         struct task_struct *task;
1457         struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode);
1458         int first = LS_FIRST_DE;
1459         int rc = 0;
1460
1461         ENTRY;
1462
1463         /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */
1464         first = is_first_dirent(dir, dentry);
1465         if (first == LS_NOT_FIRST_DE)
1466                 /* It is not "ls -{a}l" operation, no need statahead for it. */
1467                 GOTO(out, rc = -EFAULT);
1468
1469         if (unlikely(atomic_inc_return(&sbi->ll_sa_running) >
1470                                        sbi->ll_sa_running_max)) {
1471                 CDEBUG(D_READA,
1472                        "Too many concurrent statahead instances, avoid new statahead instance temporarily.\n");
1473                 GOTO(out, rc = -EMFILE);
1474         }
1475
1476         sai = ll_sai_alloc(parent);
1477         if (!sai)
1478                 GOTO(out, rc = -ENOMEM);
1479
1480         sai->sai_ls_all = (first == LS_FIRST_DOT_DE);
1481
1482         /*
1483          * if current lli_opendir_key was deauthorized, or dir re-opened by
1484          * another process, don't start statahead, otherwise the newly spawned
1485          * statahead thread won't be notified to quit.
1486          */
1487         spin_lock(&lli->lli_sa_lock);
1488         if (unlikely(lli->lli_sai || !lli->lli_opendir_key ||
1489                      lli->lli_opendir_pid != current->pid)) {
1490                 spin_unlock(&lli->lli_sa_lock);
1491                 GOTO(out, rc = -EPERM);
1492         }
1493         lli->lli_sai = sai;
1494         spin_unlock(&lli->lli_sa_lock);
1495
1496         CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n",
1497                current->pid, parent);
1498
1499         task = kthread_create_on_node(ll_statahead_thread, parent, node,
1500                                       "ll_sa_%u", lli->lli_opendir_pid);
1501         if (IS_ERR(task)) {
1502                 spin_lock(&lli->lli_sa_lock);
1503                 lli->lli_sai = NULL;
1504                 spin_unlock(&lli->lli_sa_lock);
1505                 rc = PTR_ERR(task);
1506                 CERROR("can't start ll_sa thread, rc: %d\n", rc);
1507                 GOTO(out, rc);
1508         }
1509
1510         if (ll_i2sbi(parent->d_inode)->ll_flags & LL_SBI_AGL_ENABLED && agl)
1511                 ll_start_agl(parent, sai);
1512
1513         atomic_inc(&ll_i2sbi(parent->d_inode)->ll_sa_total);
1514         sai->sai_task = task;
1515
1516         wake_up_process(task);
1517         /*
1518          * We don't stat-ahead for the first dirent since we are already in
1519          * lookup.
1520          */
1521         RETURN(-EAGAIN);
1522
1523 out:
1524         /*
1525          * once we start statahead thread failed, disable statahead so that
1526          * subsequent stat won't waste time to try it.
1527          */
1528         spin_lock(&lli->lli_sa_lock);
1529         if (lli->lli_opendir_pid == current->pid)
1530                 lli->lli_sa_enabled = 0;
1531         spin_unlock(&lli->lli_sa_lock);
1532
1533         if (sai)
1534                 ll_sai_free(sai);
1535         if (first != LS_NOT_FIRST_DE)
1536                 atomic_dec(&sbi->ll_sa_running);
1537
1538         RETURN(rc);
1539 }
1540
1541 /*
1542  * Check whether statahead for @dir was started.
1543  */
1544 static inline bool ll_statahead_started(struct inode *dir, bool agl)
1545 {
1546         struct ll_inode_info *lli = ll_i2info(dir);
1547         struct ll_statahead_info *sai;
1548
1549         spin_lock(&lli->lli_sa_lock);
1550         sai = lli->lli_sai;
1551         if (sai && (sai->sai_agl_task != NULL) != agl)
1552                 CDEBUG(D_READA,
1553                        "%s: Statahead AGL hint changed from %d to %d\n",
1554                        ll_i2sbi(dir)->ll_fsname,
1555                        sai->sai_agl_task != NULL, agl);
1556         spin_unlock(&lli->lli_sa_lock);
1557
1558         return !!sai;
1559 }
1560
1561 /**
1562  * statahead entry function, this is called when client getattr on a file, it
1563  * will start statahead thread if this is the first dir entry, else revalidate
1564  * dentry from statahead cache.
1565  *
1566  * \param[in]  dir      parent directory
1567  * \param[out] dentryp  dentry to getattr
1568  * \param[in]  agl      whether start the agl thread
1569  *
1570  * \retval              1 on success
1571  * \retval              0 revalidation from statahead cache failed, caller needs
1572  *                      to getattr from server directly
1573  * \retval              negative number on error, caller often ignores this and
1574  *                      then getattr from server
1575  */
1576 int ll_start_statahead(struct inode *dir, struct dentry *dentry, bool agl)
1577 {
1578         if (!ll_statahead_started(dir, agl))
1579                 return start_statahead_thread(dir, dentry, agl);
1580         return 0;
1581 }
1582
1583 /**
1584  * revalidate dentry from statahead cache.
1585  *
1586  * \param[in]  dir      parent directory
1587  * \param[out] dentryp  dentry to getattr
1588  * \param[in]  unplug   unplug statahead window only (normally for negative
1589  *                      dentry)
1590  * \retval              1 on success
1591  * \retval              0 revalidation from statahead cache failed, caller needs
1592  *                      to getattr from server directly
1593  * \retval              negative number on error, caller often ignores this and
1594  *                      then getattr from server
1595  */
1596 int ll_revalidate_statahead(struct inode *dir, struct dentry **dentryp,
1597                             bool unplug)
1598 {
1599         struct ll_statahead_info *sai;
1600         int rc = 0;
1601
1602         sai = ll_sai_get(dir);
1603         if (sai) {
1604                 rc = revalidate_statahead_dentry(dir, sai, dentryp, unplug);
1605                 CDEBUG(D_READA, "revalidate statahead %pd: rc = %d.\n",
1606                        *dentryp, rc);
1607                 ll_sai_put(sai);
1608         }
1609         return rc;
1610 }