LU-2675 lustre: remove lustre_lite.h
[fs/lustre-release.git] / lustre / llite / statahead.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #include <linux/fs.h>
38 #include <linux/sched.h>
39 #include <linux/mm.h>
40 #include <linux/highmem.h>
41 #include <linux/pagemap.h>
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44
45 #include <obd_support.h>
46 #include <lustre_dlm.h>
47 #include "llite_internal.h"
48
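/* an entry whose index falls this far behind the statahead window is
 * considered old and may be dropped, see is_omitted_entry() */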
49 #define SA_OMITTED_ENTRY_MAX 8ULL
50
51 typedef enum {
52         /** negative values are for error cases */
53         SA_ENTRY_INIT = 0,      /** init entry */
54         SA_ENTRY_SUCC = 1,      /** stat succeeded */
55         SA_ENTRY_INVA = 2,      /** invalid entry */
56         SA_ENTRY_DEST = 3,      /** entry to be destroyed */
57 } se_stat_t;
58
59 struct ll_sa_entry {
60         /* link into sai->sai_entries */
61         struct list_head        se_link;
62         /* link into sai->sai_entries_{received,stated} */
63         struct list_head        se_list;
64         /* link into sai hash table locally */
65         struct list_head        se_hash;
66         /* entry reference count */
67         atomic_t                se_refcount;
68         /* entry index in the sai */
69         __u64                   se_index;
70         /* low layer ldlm lock handle */
71         __u64                   se_handle;
72         /* entry status */
73         se_stat_t               se_stat;
74         /* entry size, contains name */
75         int                     se_size;
76         /* pointer to async getattr enqueue info */
77         struct md_enqueue_info *se_minfo;
78         /* pointer to the async getattr request */
79         struct ptlrpc_request  *se_req;
80         /* pointer to the target inode */
81         struct inode           *se_inode;
82         /* entry name */
83         struct qstr             se_qstr;
84 };
85
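/* generation number assigned to each new statahead info instance,
 * protected by sai_generation_lock; zero is never used */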
86 static unsigned int sai_generation = 0;
87 static DEFINE_SPINLOCK(sai_generation_lock);
88
89 static inline int ll_sa_entry_unhashed(struct ll_sa_entry *entry)
90 {
91         return list_empty(&entry->se_hash);
92 }
93
94 /*
95  * The entry can only be released by the caller; it is necessary to hold the lock.
96  */
97 static inline int ll_sa_entry_stated(struct ll_sa_entry *entry)
98 {
99         smp_rmb();
100         return (entry->se_stat != SA_ENTRY_INIT);
101 }
102
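/* map a name hash value to a statahead cache bucket index */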
103 static inline int ll_sa_entry_hash(int val)
104 {
105         return val & LL_SA_CACHE_MASK;
106 }
107
108 /*
109  * Insert entry to hash SA table.
110  */
111 static inline void
112 ll_sa_entry_enhash(struct ll_statahead_info *sai, struct ll_sa_entry *entry)
113 {
114         int i = ll_sa_entry_hash(entry->se_qstr.hash);
115
116         spin_lock(&sai->sai_cache_lock[i]);
117         list_add_tail(&entry->se_hash, &sai->sai_cache[i]);
118         spin_unlock(&sai->sai_cache_lock[i]);
119 }
120
121 /*
122  * Remove entry from SA table.
123  */
124 static inline void
125 ll_sa_entry_unhash(struct ll_statahead_info *sai, struct ll_sa_entry *entry)
126 {
127         int i = ll_sa_entry_hash(entry->se_qstr.hash);
128
129         spin_lock(&sai->sai_cache_lock[i]);
130         list_del_init(&entry->se_hash);
131         spin_unlock(&sai->sai_cache_lock[i]);
132 }
133
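/* async glimpse lock (AGL) is only triggered for regular files,
 * and only while sai_agl_valid is set */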
134 static inline int agl_should_run(struct ll_statahead_info *sai,
135                                  struct inode *inode)
136 {
137         return (inode != NULL && S_ISREG(inode->i_mode) && sai->sai_agl_valid);
138 }
139
140 static inline struct ll_sa_entry *
141 sa_first_received_entry(struct ll_statahead_info *sai)
142 {
143         return list_entry(sai->sai_entries_received.next,
144                           struct ll_sa_entry, se_list);
145 }
146
147 static inline struct ll_inode_info *
148 agl_first_entry(struct ll_statahead_info *sai)
149 {
150         return list_entry(sai->sai_entries_agl.next,
151                           struct ll_inode_info, lli_agl_list);
152 }
153
154 static inline int sa_sent_full(struct ll_statahead_info *sai)
155 {
156         return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
157 }
158
159 static inline int sa_received_empty(struct ll_statahead_info *sai)
160 {
161         return list_empty(&sai->sai_entries_received);
162 }
163
164 static inline int agl_list_empty(struct ll_statahead_info *sai)
165 {
166         return list_empty(&sai->sai_entries_agl);
167 }
168
169 /**
170  * The hit rate is considered low if either:
171  * (1) the hit ratio is less than 80%,
172  * or
173  * (2) there have been more than 8 consecutive misses.
174  */
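/* note: for sai_hit > 7, "sai_hit < 4 * sai_miss" is equivalent to a hit
 * ratio sai_hit / (sai_hit + sai_miss) below 80% */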
175 static inline int sa_low_hit(struct ll_statahead_info *sai)
176 {
177         return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) ||
178                 (sai->sai_consecutive_miss > 8));
179 }
180
181 /*
182  * If the given index falls behind the statahead window by more than
183  * SA_OMITTED_ENTRY_MAX, then it is old.
184  */
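/* the comparison below keeps all additions on the left-hand side,
 * presumably to avoid unsigned underflow while sai_index is still small */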
185 static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
186 {
187         return ((__u64)sai->sai_max + index + SA_OMITTED_ENTRY_MAX <
188                  sai->sai_index);
189 }
190
191 /*
192  * Allocate a statahead entry and insert it at the tail of sai_entries.
193  */
194 static struct ll_sa_entry *
195 ll_sa_entry_alloc(struct ll_statahead_info *sai, __u64 index, const char *name,
196                  int len)
197 {
198         struct ll_inode_info *lli;
199         struct ll_sa_entry   *entry;
200         int                   entry_size;
201         char                 *dname;
202         ENTRY;
203
204         entry_size = sizeof(struct ll_sa_entry) + (len & ~3) + 4;
205         OBD_ALLOC(entry, entry_size);
206         if (unlikely(entry == NULL))
207                 RETURN(ERR_PTR(-ENOMEM));
208
209         CDEBUG(D_READA, "alloc sa entry %.*s(%p) index "LPU64"\n",
210                len, name, entry, index);
211
212         entry->se_index = index;
213
214         /*
215          * Statahead entry reference rules:
216          *
217          * 1) When a statahead entry is initialized, its reference count is
218          *    set to 2. One reference is used by the directory scanner: when
219          *    the scanner searches the statahead cache for a given name, it
220          *    can do a lockless hash lookup (only the scanner can remove the
221          *    entry from the hash list), and once the entry is found it does
222          *    not need to call "atomic_inc()" on the entry reference, which
223          *    improves performance. After using the statahead entry, the
224          *    scanner calls "atomic_dec()" to drop the reference taken at
225          *    initialization. If it is the last reference, the entry is freed.
226          *
227          * 2) All other threads, including the statahead thread and ptlrpcd
228          *    threads, must hold a reference on the entry while processing it,
229          *    to guarantee that the entry is not released by the directory
230          *    scanner. After processing the entry, these threads drop their
231          *    reference. If it is the last reference, the entry is freed.
232          *
233          *    The second reference taken when the statahead entry is
234          *    initialized is used by the statahead thread, following
235          *    rule 2).
236          */
237         atomic_set(&entry->se_refcount, 2);
238         entry->se_stat = SA_ENTRY_INIT;
239         entry->se_size = entry_size;
240         dname = (char *)entry + sizeof(struct ll_sa_entry);
241         memcpy(dname, name, len);
242         dname[len] = 0;
243         entry->se_qstr.hash = full_name_hash(name, len);
244         entry->se_qstr.len = len;
245         entry->se_qstr.name = dname;
246
247         lli = ll_i2info(sai->sai_inode);
248         spin_lock(&lli->lli_sa_lock);
249         list_add_tail(&entry->se_link, &sai->sai_entries);
250         INIT_LIST_HEAD(&entry->se_list);
251         ll_sa_entry_enhash(sai, entry);
252         spin_unlock(&lli->lli_sa_lock);
253
254         atomic_inc(&sai->sai_cache_count);
255
256         RETURN(entry);
257 }
258
259 /*
260  * Used by the directory scanner to search for an entry by name.
261  *
262  * Only the caller can remove the entry from the hash, so it is unnecessary to
263  * hold the hash lock. It is the caller's duty to release the initial refcount
264  * on the entry, so it is also unnecessary to increase the refcount on it.
265  */
266 static struct ll_sa_entry *
267 ll_sa_entry_get_byname(struct ll_statahead_info *sai, const struct qstr *qstr)
268 {
269         struct ll_sa_entry *entry;
270         int i = ll_sa_entry_hash(qstr->hash);
271
272         list_for_each_entry(entry, &sai->sai_cache[i], se_hash) {
273                 if (entry->se_qstr.hash == qstr->hash &&
274                     entry->se_qstr.len == qstr->len &&
275                     memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0)
276                         return entry;
277         }
278         return NULL;
279 }
280
281 /*
282  * Used by the async getattr request callback to find an entry by index.
283  *
284  * Called inside lli_sa_lock to prevent others from changing the list during
285  * the search. It increases the entry refcount before returning, to guarantee
286  * that the entry cannot be freed by others.
287  */
288 static struct ll_sa_entry *
289 ll_sa_entry_get_byindex(struct ll_statahead_info *sai, __u64 index)
290 {
291         struct ll_sa_entry *entry;
292
293         list_for_each_entry(entry, &sai->sai_entries, se_link) {
294                 if (entry->se_index == index) {
295                         LASSERT(atomic_read(&entry->se_refcount) > 0);
296                         atomic_inc(&entry->se_refcount);
297                         return entry;
298                 }
299                 if (entry->se_index > index)
300                         break;
301         }
302         return NULL;
303 }
304
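/* drop one reference on the entry; the last reference frees the entry
 * and releases its inode */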
305 static void ll_sa_entry_put(struct ll_statahead_info *sai,
306                              struct ll_sa_entry *entry)
307 {
308         if (atomic_dec_and_test(&entry->se_refcount)) {
309                 CDEBUG(D_READA, "free sa entry %.*s(%p) index "LPU64"\n",
310                        entry->se_qstr.len, entry->se_qstr.name, entry,
311                        entry->se_index);
312
313                 LASSERT(list_empty(&entry->se_link));
314                 LASSERT(list_empty(&entry->se_list));
315                 LASSERT(ll_sa_entry_unhashed(entry));
316
317                 if (entry->se_inode)
318                         iput(entry->se_inode);
319
320                 OBD_FREE(entry, entry->se_size);
321                 atomic_dec(&sai->sai_cache_count);
322         }
323 }
324
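/* unhash the entry, unlink it from the sai lists, mark it SA_ENTRY_DEST
 * and drop one reference */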
325 static inline void
326 do_sa_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry)
327 {
328         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
329
330         LASSERT(!ll_sa_entry_unhashed(entry));
331         LASSERT(!list_empty(&entry->se_link));
332
333         ll_sa_entry_unhash(sai, entry);
334
335         spin_lock(&lli->lli_sa_lock);
336         entry->se_stat = SA_ENTRY_DEST;
337         list_del_init(&entry->se_link);
338         if (likely(!list_empty(&entry->se_list)))
339                 list_del_init(&entry->se_list);
340         spin_unlock(&lli->lli_sa_lock);
341
342         ll_sa_entry_put(sai, entry);
343 }
344
345 /*
346  * Delete the entry from the sai_entries_stated list when finished.
347  */
348 static void
349 ll_sa_entry_fini(struct ll_statahead_info *sai, struct ll_sa_entry *entry)
350 {
351         struct ll_sa_entry *pos, *next;
352
353         if (entry)
354                 do_sa_entry_fini(sai, entry);
355
356         /* drop old entries; only the 'scanner' process does this, no need to lock */
357         list_for_each_entry_safe(pos, next, &sai->sai_entries, se_link) {
358                 if (!is_omitted_entry(sai, pos->se_index))
359                         break;
360                 /* keep entries whose statahead RPC has not finished */
361                 if (pos->se_stat == SA_ENTRY_SUCC ||
362                     pos->se_stat == SA_ENTRY_INVA)
363                         do_sa_entry_fini(sai, pos);
364         }
365 }
366
367 /*
368  * Inside lli_sa_lock.
369  */
370 static void
371 __sa_entry_post_stat(struct ll_statahead_info *sai, struct ll_sa_entry *entry,
372                      se_stat_t stat)
373 {
374         struct ll_sa_entry *se;
375         struct list_head *pos = &sai->sai_entries_stated;
376
377         LASSERT(entry->se_stat == SA_ENTRY_INIT);
378
379         if (!list_empty(&entry->se_list))
380                 list_del_init(&entry->se_list);
381
382         list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) {
383                 if (se->se_index < entry->se_index) {
384                         pos = &se->se_list;
385                         break;
386                 }
387         }
388
389         list_add(&entry->se_list, pos);
390         entry->se_stat = stat;
391 }
392
393 /*
394  * Move the entry to sai_entries_stated, keeping the list sorted by index,
395  * after releasing the resources (enqueue info and request) used by the
396  * async getattr RPC.
397  */
398 static void
399 sa_entry_post_stat(struct ll_statahead_info *sai, struct ll_sa_entry *entry,
400                    se_stat_t stat)
401 {
402         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
403         struct md_enqueue_info *minfo = entry->se_minfo;
404         struct ptlrpc_request *req = entry->se_req;
405
406         /* release resources used in RPC */
407         if (minfo) {
408                 entry->se_minfo = NULL;
409                 ll_intent_release(&minfo->mi_it);
410                 iput(minfo->mi_dir);
411                 OBD_FREE_PTR(minfo);
412         }
413
414         if (req) {
415                 entry->se_req = NULL;
416                 ptlrpc_req_finished(req);
417         }
418
419         spin_lock(&lli->lli_sa_lock);
420         __sa_entry_post_stat(sai, entry, stat);
421         spin_unlock(&lli->lli_sa_lock);
422 }
423
424 /*
425  * Insert inode into the list of sai_entries_agl.
426  */
427 static void ll_agl_add(struct ll_statahead_info *sai,
428                        struct inode *inode, int index)
429 {
430         struct ll_inode_info *child  = ll_i2info(inode);
431         struct ll_inode_info *parent = ll_i2info(sai->sai_inode);
432         int                   added  = 0;
433
434         spin_lock(&child->lli_agl_lock);
435         if (child->lli_agl_index == 0) {
436                 child->lli_agl_index = index;
437                 spin_unlock(&child->lli_agl_lock);
438
439                 LASSERT(list_empty(&child->lli_agl_list));
440
441                 igrab(inode);
442                 spin_lock(&parent->lli_agl_lock);
443                 if (agl_list_empty(sai))
444                         added = 1;
445                 list_add_tail(&child->lli_agl_list, &sai->sai_entries_agl);
446                 spin_unlock(&parent->lli_agl_lock);
447         } else {
448                 spin_unlock(&child->lli_agl_lock);
449         }
450
451         if (added > 0)
452                 wake_up(&sai->sai_agl_thread.t_ctl_waitq);
453 }
454
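/* allocate a statahead info instance with one reference held and all
 * lists, locks and wait queues initialized */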
455 static struct ll_statahead_info *ll_sai_alloc(void)
456 {
457         struct ll_statahead_info *sai;
458         int                       i;
459         ENTRY;
460
461         OBD_ALLOC_PTR(sai);
462         if (!sai)
463                 RETURN(NULL);
464
465         atomic_set(&sai->sai_refcount, 1);
466
467         spin_lock(&sai_generation_lock);
468         sai->sai_generation = ++sai_generation;
469         if (unlikely(sai_generation == 0))
470                 sai->sai_generation = ++sai_generation;
471         spin_unlock(&sai_generation_lock);
472
473         sai->sai_max = LL_SA_RPC_MIN;
474         sai->sai_index = 1;
475         init_waitqueue_head(&sai->sai_waitq);
476         init_waitqueue_head(&sai->sai_thread.t_ctl_waitq);
477         init_waitqueue_head(&sai->sai_agl_thread.t_ctl_waitq);
478
479         INIT_LIST_HEAD(&sai->sai_entries);
480         INIT_LIST_HEAD(&sai->sai_entries_received);
481         INIT_LIST_HEAD(&sai->sai_entries_stated);
482         INIT_LIST_HEAD(&sai->sai_entries_agl);
483
484         for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
485                 INIT_LIST_HEAD(&sai->sai_cache[i]);
486                 spin_lock_init(&sai->sai_cache_lock[i]);
487         }
488         atomic_set(&sai->sai_cache_count, 0);
489
490         RETURN(sai);
491 }
492
493 static inline struct ll_statahead_info *ll_sai_get(struct inode *dir)
494 {
495         struct ll_inode_info *lli = ll_i2info(dir);
496         struct ll_statahead_info *sai = NULL;
497
498         spin_lock(&lli->lli_sa_lock);
499         sai = lli->lli_sai;
500         if (sai != NULL)
501                 atomic_inc(&sai->sai_refcount);
502         spin_unlock(&lli->lli_sa_lock);
503
504         return sai;
505 }
506
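/* drop a reference on sai; the last put finalizes all remaining entries,
 * releases the directory inode and frees sai */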
507 static void ll_sai_put(struct ll_statahead_info *sai)
508 {
509         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
510
511         if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
512                 struct ll_sa_entry *entry, *next;
513                 struct ll_sb_info *sbi = ll_i2sbi(sai->sai_inode);
514
515                 lli->lli_sai = NULL;
516                 spin_unlock(&lli->lli_sa_lock);
517
518                 LASSERT(thread_is_stopped(&sai->sai_thread));
519                 LASSERT(thread_is_stopped(&sai->sai_agl_thread));
520                 LASSERT(sai->sai_sent == sai->sai_replied);
521
522                 list_for_each_entry_safe(entry, next, &sai->sai_entries,
523                                          se_link)
524                         do_sa_entry_fini(sai, entry);
525
526                 LASSERT(atomic_read(&sai->sai_cache_count) == 0);
527                 LASSERT(agl_list_empty(sai));
528                 LASSERT(atomic_read(&sai->sai_refcount) == 0);
529
530                 iput(sai->sai_inode);
531                 OBD_FREE_PTR(sai);
532                 atomic_dec(&sbi->ll_sa_running);
533         }
534 }
535
536 /* Do NOT forget to drop the inode refcount taken when it was added into sai_entries_agl. */
537 static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
538 {
539         struct ll_inode_info *lli   = ll_i2info(inode);
540         __u64                 index = lli->lli_agl_index;
541         int                   rc;
542         ENTRY;
543
544         LASSERT(list_empty(&lli->lli_agl_list));
545
546         /* AGL may fall behind statahead by one entry */
547         if (is_omitted_entry(sai, index + 1)) {
548                 lli->lli_agl_index = 0;
549                 iput(inode);
550                 RETURN_EXIT;
551         }
552
553         /* Someone is in glimpse (sync or async), do nothing. */
554         rc = down_write_trylock(&lli->lli_glimpse_sem);
555         if (rc == 0) {
556                 lli->lli_agl_index = 0;
557                 iput(inode);
558                 RETURN_EXIT;
559         }
560
561         /*
562          * Someone triggered a glimpse within the last second.
563          * 1) The former glimpse succeeded with a glimpse lock granted by the
564          *    OST. If the lock is still cached on the client, AGL needs to do
565          *    nothing. If it has been cancelled by another client, AGL may be
566          *    unable to obtain a new lock, since no glimpse callback is
567          *    triggered by AGL.
568          * 2) The former glimpse succeeded, but the OST did not grant a
569          *    glimpse lock. In such a case, it is quite possible that the OST
570          *    will not grant a glimpse lock for AGL either.
571          * 3) The former glimpse failed. Compared with the other two cases, it
572          *    is relatively rare; AGL can ignore it with little performance impact.
573          */
574         if (lli->lli_glimpse_time != 0 &&
575             cfs_time_before(cfs_time_shift(-1), lli->lli_glimpse_time)) {
576                 up_write(&lli->lli_glimpse_sem);
577                 lli->lli_agl_index = 0;
578                 iput(inode);
579                 RETURN_EXIT;
580         }
581
582         CDEBUG(D_READA, "Handling (init) async glimpse: inode = "
583                DFID", idx = "LPU64"\n", PFID(&lli->lli_fid), index);
584
585         cl_agl(inode);
586         lli->lli_agl_index = 0;
587         lli->lli_glimpse_time = cfs_time_current();
588         up_write(&lli->lli_glimpse_sem);
589
590         CDEBUG(D_READA, "Handled (init) async glimpse: inode= "
591                DFID", idx = "LPU64", rc = %d\n",
592                PFID(&lli->lli_fid), index, rc);
593
594         iput(inode);
595
596         EXIT;
597 }
598
599 /* prepare inode for received statahead entry, and add it into agl list */
600 static void sa_post_one(struct ll_statahead_info *sai,
601                         struct ll_sa_entry *entry)
602 {
603         struct inode           *dir   = sai->sai_inode;
604         struct inode           *child;
605         struct md_enqueue_info *minfo;
606         struct lookup_intent   *it;
607         struct ptlrpc_request  *req;
608         struct mdt_body        *body;
609         int                     rc    = 0;
610         ENTRY;
611
612         LASSERT(entry->se_handle != 0);
613
614         minfo = entry->se_minfo;
615         it = &minfo->mi_it;
616         req = entry->se_req;
617         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
618         if (body == NULL)
619                 GOTO(out, rc = -EFAULT);
620
621         child = entry->se_inode;
622         if (child == NULL) {
623                 /*
624                  * lookup.
625                  */
626                 LASSERT(fid_is_zero(&minfo->mi_data.op_fid2));
627
628                 /* XXX: No fid in reply, this is probably a cross-ref case.
629                  * SA can't handle it yet. */
630                 if (body->mbo_valid & OBD_MD_MDS)
631                         GOTO(out, rc = -EAGAIN);
632         } else {
633                 /*
634                  * revalidate.
635                  */
636                 /* unlinked and re-created with the same name */
637                 if (unlikely(!lu_fid_eq(&minfo->mi_data.op_fid2,
638                                         &body->mbo_fid1))) {
639                         entry->se_inode = NULL;
640                         iput(child);
641                         child = NULL;
642                 }
643         }
644
645         it->d.lustre.it_lock_handle = entry->se_handle;
646         rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
647         if (rc != 1)
648                 GOTO(out, rc = -EAGAIN);
649
650         rc = ll_prep_inode(&child, req, dir->i_sb, it);
651         if (rc)
652                 GOTO(out, rc);
653
654         CDEBUG(D_DLMTRACE, "%s: setting l_data to inode "DFID"(%p)\n",
655                ll_get_fsname(child->i_sb, NULL, 0),
656                PFID(ll_inode2fid(child)), child);
657         ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);
658
659         entry->se_inode = child;
660
661         if (agl_should_run(sai, child))
662                 ll_agl_add(sai, child, entry->se_index);
663
664         EXIT;
665
666 out:
667         /* "sa_entry_post_stat()" will drop the related ldlm ibits lock
668          * reference count by calling "ll_intent_drop_lock()", regardless of
669          * whether the above operations failed or not. Do not worry about
670          * calling "ll_intent_drop_lock()" more than once. */
671         sa_entry_post_stat(sai, entry, rc < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);
672         if (entry->se_index == sai->sai_index_wait)
673                 wake_up(&sai->sai_waitq);
674         ll_sa_entry_put(sai, entry);
675 }
676
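/* process deferred statahead work: instantiate inodes for all received
 * replies and trigger async glimpse for inodes queued on the AGL list */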
677 static void ll_post_statahead(struct ll_statahead_info *sai)
678 {
679         struct ll_inode_info *lli;
680
681         lli = ll_i2info(sai->sai_inode);
682
683         while (!sa_received_empty(sai)) {
684                 struct ll_sa_entry *entry;
685
686                 spin_lock(&lli->lli_sa_lock);
687                 if (unlikely(sa_received_empty(sai))) {
688                         spin_unlock(&lli->lli_sa_lock);
689                         break;
690                 }
691                 entry = sa_first_received_entry(sai);
692                 atomic_inc(&entry->se_refcount);
693                 list_del_init(&entry->se_list);
694                 spin_unlock(&lli->lli_sa_lock);
695
696                 sa_post_one(sai, entry);
697         }
698
699         spin_lock(&lli->lli_agl_lock);
700         while (!agl_list_empty(sai)) {
701                 struct ll_inode_info *clli;
702
703                 clli = agl_first_entry(sai);
704                 list_del_init(&clli->lli_agl_list);
705                 spin_unlock(&lli->lli_agl_lock);
706
707                 ll_agl_trigger(&clli->lli_vfs_inode, sai);
708
709                 spin_lock(&lli->lli_agl_lock);
710         }
711         spin_unlock(&lli->lli_agl_lock);
712 }
713
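/* callback of the async getattr RPC: on success stash the reply on
 * sai_entries_received for the statahead thread, otherwise mark the
 * entry invalid; wake up waiters as needed */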
714 static int ll_statahead_interpret(struct ptlrpc_request *req,
715                                   struct md_enqueue_info *minfo, int rc)
716 {
717         struct lookup_intent *it = &minfo->mi_it;
718         struct inode *dir = minfo->mi_dir;
719         struct ll_inode_info *lli = ll_i2info(dir);
720         struct ll_statahead_info *sai;
721         struct ll_sa_entry *entry;
722         int wakeup;
723         ENTRY;
724
725         if (it_disposition(it, DISP_LOOKUP_NEG))
726                 rc = -ENOENT;
727
728         sai = ll_sai_get(dir);
729         LASSERT(sai != NULL);
730         LASSERT(!thread_is_stopped(&sai->sai_thread));
731
732         spin_lock(&lli->lli_sa_lock);
733         entry = ll_sa_entry_get_byindex(sai, minfo->mi_cbdata);
734         LASSERT(entry != NULL);
735         if (rc != 0) {
736                 __sa_entry_post_stat(sai, entry, SA_ENTRY_INVA);
737                 wakeup = (entry->se_index == sai->sai_index_wait);
738         } else {
739                 entry->se_minfo = minfo;
740                 entry->se_req = ptlrpc_request_addref(req);
741                 /* Release the async ibits lock ASAP to avoid deadlock when
742                  * the statahead thread tries to enqueue a lock on the parent
743                  * for readpage while another thread tries to enqueue a lock
744                  * on the child with the parent's lock held, e.g. unlink. */
745                 entry->se_handle = it->d.lustre.it_lock_handle;
746                 ll_intent_drop_lock(it);
747                 wakeup = sa_received_empty(sai);
748                 list_add_tail(&entry->se_list, &sai->sai_entries_received);
749         }
750         sai->sai_replied++;
751         spin_unlock(&lli->lli_sa_lock);
752
753         ll_sa_entry_put(sai, entry);
754         if (wakeup)
755                 wake_up(&sai->sai_thread.t_ctl_waitq);
756
757         if (rc != 0) {
758                 ll_intent_release(it);
759                 iput(dir);
760                 OBD_FREE_PTR(minfo);
761         }
762         ll_sai_put(sai);
763         RETURN(rc);
764 }
765
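/* free the enqueue info and drop the references taken in sa_args_init() */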
766 static void sa_args_fini(struct md_enqueue_info *minfo,
767                          struct ldlm_enqueue_info *einfo)
768 {
769         LASSERT(minfo && einfo);
770         iput(minfo->mi_dir);
771         capa_put(minfo->mi_data.op_capa1);
772         capa_put(minfo->mi_data.op_capa2);
773         OBD_FREE_PTR(minfo);
774         OBD_FREE_PTR(einfo);
775 }
776
777 /**
778  * There is a race condition between "capa_put" and "ll_statahead_interpret"
779  * when accessing "op_data.op_capa[1,2]":
780  * "capa_put" releases the "op_data.op_capa[1,2]" reference count after calling
781  * "md_intent_getattr_async". But "ll_statahead_interpret" may run first and
782  * fill "op_data.op_capa[1,2]" with POISON, causing "capa_put" to access an
783  * invalid "ocapa". So reserve "op_data.op_capa[1,2]" in "pcapa" here before
784  * calling "md_intent_getattr_async".
785  */
786 static int sa_args_init(struct inode *dir, struct inode *child,
787                         struct ll_sa_entry *entry, struct md_enqueue_info **pmi,
788                         struct ldlm_enqueue_info **pei,
789                         struct obd_capa **pcapa)
790 {
791         struct qstr              *qstr = &entry->se_qstr;
792         struct md_enqueue_info   *minfo;
793         struct ldlm_enqueue_info *einfo;
794         struct md_op_data        *op_data;
795
796         OBD_ALLOC_PTR(einfo);
797         if (einfo == NULL)
798                 return -ENOMEM;
799
800         OBD_ALLOC_PTR(minfo);
801         if (minfo == NULL) {
802                 OBD_FREE_PTR(einfo);
803                 return -ENOMEM;
804         }
805
806         op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child, qstr->name,
807                                      qstr->len, 0, LUSTRE_OPC_ANY, NULL);
808         if (IS_ERR(op_data)) {
809                 OBD_FREE_PTR(einfo);
810                 OBD_FREE_PTR(minfo);
811                 return PTR_ERR(op_data);
812         }
813
814         minfo->mi_it.it_op = IT_GETATTR;
815         minfo->mi_dir = igrab(dir);
816         minfo->mi_cb = ll_statahead_interpret;
817         minfo->mi_cbdata = entry->se_index;
818
819         einfo->ei_type   = LDLM_IBITS;
820         einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
821         einfo->ei_cb_bl  = ll_md_blocking_ast;
822         einfo->ei_cb_cp  = ldlm_completion_ast;
823         einfo->ei_cb_gl  = NULL;
824         einfo->ei_cbdata = NULL;
825
826         *pmi = minfo;
827         *pei = einfo;
828         pcapa[0] = op_data->op_capa1;
829         pcapa[1] = op_data->op_capa2;
830
831         return 0;
832 }
833
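/* send an async getattr RPC for an entry that has no dentry in the dcache */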
834 static int do_sa_lookup(struct inode *dir, struct ll_sa_entry *entry)
835 {
836         struct md_enqueue_info   *minfo;
837         struct ldlm_enqueue_info *einfo;
838         struct obd_capa          *capas[2];
839         int                       rc;
840         ENTRY;
841
842         rc = sa_args_init(dir, NULL, entry, &minfo, &einfo, capas);
843         if (rc)
844                 RETURN(rc);
845
846         rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
847         if (!rc) {
848                 capa_put(capas[0]);
849                 capa_put(capas[1]);
850         } else {
851                 sa_args_fini(minfo, einfo);
852         }
853
854         RETURN(rc);
855 }
856
857 /**
858  * similar to ll_revalidate_it().
859  * \retval      1 -- dentry valid
860  * \retval      0 -- will send stat-ahead request
861  * \retval others -- prepare stat-ahead request failed
862  */
863 static int do_sa_revalidate(struct inode *dir, struct ll_sa_entry *entry,
864                             struct dentry *dentry)
865 {
866         struct inode             *inode = dentry->d_inode;
867         struct lookup_intent      it = { .it_op = IT_GETATTR,
868                                          .d.lustre.it_lock_handle = 0 };
869         struct md_enqueue_info   *minfo;
870         struct ldlm_enqueue_info *einfo;
871         struct obd_capa          *capas[2];
872         int rc;
873         ENTRY;
874
875         if (unlikely(inode == NULL))
876                 RETURN(1);
877
878         if (d_mountpoint(dentry))
879                 RETURN(1);
880
881         entry->se_inode = igrab(inode);
882         rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),NULL);
883         if (rc == 1) {
884                 entry->se_handle = it.d.lustre.it_lock_handle;
885                 ll_intent_release(&it);
886                 RETURN(1);
887         }
888
889         rc = sa_args_init(dir, inode, entry, &minfo, &einfo, capas);
890         if (rc) {
891                 entry->se_inode = NULL;
892                 iput(inode);
893                 RETURN(rc);
894         }
895
896         rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
897         if (!rc) {
898                 capa_put(capas[0]);
899                 capa_put(capas[1]);
900         } else {
901                 entry->se_inode = NULL;
902                 iput(inode);
903                 sa_args_fini(minfo, einfo);
904         }
905
906         RETURN(rc);
907 }
908
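/* stat-ahead a single directory entry: revalidate an existing dentry if
 * one is cached, otherwise send an async lookup; account the result and
 * advance sai_index */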
909 static void ll_statahead_one(struct dentry *parent, const char *name,
910                              const int namelen)
911 {
912         struct inode             *dir    = parent->d_inode;
913         struct ll_inode_info     *lli    = ll_i2info(dir);
914         struct ll_statahead_info *sai    = lli->lli_sai;
915         struct dentry            *dentry = NULL;
916         struct ll_sa_entry       *entry;
917         int                       rc;
918         ENTRY;
919
920         entry = ll_sa_entry_alloc(sai, sai->sai_index, name, namelen);
921         if (IS_ERR(entry))
922                 RETURN_EXIT;
923
924         dentry = d_lookup(parent, &entry->se_qstr);
925         if (!dentry) {
926                 rc = do_sa_lookup(dir, entry);
927         } else {
928                 rc = do_sa_revalidate(dir, entry, dentry);
929                 if (rc == 1 && agl_should_run(sai, dentry->d_inode))
930                         ll_agl_add(sai, dentry->d_inode, entry->se_index);
931         }
932
933         if (dentry != NULL)
934                 dput(dentry);
935
936         if (rc) {
937                 sa_entry_post_stat(sai, entry,
938                                    rc < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC);
939                 if (entry->se_index == sai->sai_index_wait)
940                         wake_up(&sai->sai_waitq);
941         } else {
942                 sai->sai_sent++;
943         }
944
945         sai->sai_index++;
946         /* drop one refcount on the entry taken in ll_sa_entry_alloc() */
947         ll_sa_entry_put(sai, entry);
948
949         EXIT;
950 }
951
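/* AGL thread: pull inodes off sai_entries_agl and trigger an async
 * glimpse on each until told to stop */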
952 static int ll_agl_thread(void *arg)
953 {
954         struct dentry *parent = (struct dentry *)arg;
955         struct inode *dir = parent->d_inode;
956         struct ll_inode_info *plli = ll_i2info(dir);
957         struct ll_inode_info *clli;
958         struct ll_sb_info *sbi = ll_i2sbi(dir);
959         struct ll_statahead_info *sai;
960         struct ptlrpc_thread *thread;
961         struct l_wait_info lwi = { 0 };
962         ENTRY;
963
964
965         sai = ll_sai_get(dir);
966         thread = &sai->sai_agl_thread;
967         thread->t_pid = current_pid();
968         CDEBUG(D_READA, "agl thread started: sai %p, parent %.*s\n",
969                sai, parent->d_name.len, parent->d_name.name);
970
971         atomic_inc(&sbi->ll_agl_total);
972         spin_lock(&plli->lli_agl_lock);
973         sai->sai_agl_valid = 1;
974         if (thread_is_init(thread))
975                 /* If someone else has changed the thread state
976                  * (e.g. already changed to SVC_STOPPING), we can't just
977                  * blindly overwrite that setting. */
978                 thread_set_flags(thread, SVC_RUNNING);
979         spin_unlock(&plli->lli_agl_lock);
980         wake_up(&thread->t_ctl_waitq);
981
982         while (1) {
983                 l_wait_event(thread->t_ctl_waitq,
984                              !agl_list_empty(sai) ||
985                              !thread_is_running(thread),
986                              &lwi);
987
988                 if (!thread_is_running(thread))
989                         break;
990
991                 spin_lock(&plli->lli_agl_lock);
992                 /* The statahead thread may help to process AGL entries,
993                  * so check whether the list is empty again. */
994                 if (!agl_list_empty(sai)) {
995                         clli = agl_first_entry(sai);
996                         list_del_init(&clli->lli_agl_list);
997                         spin_unlock(&plli->lli_agl_lock);
998                         ll_agl_trigger(&clli->lli_vfs_inode, sai);
999                 } else {
1000                         spin_unlock(&plli->lli_agl_lock);
1001                 }
1002         }
1003
1004         spin_lock(&plli->lli_agl_lock);
1005         sai->sai_agl_valid = 0;
1006         while (!agl_list_empty(sai)) {
1007                 clli = agl_first_entry(sai);
1008                 list_del_init(&clli->lli_agl_list);
1009                 spin_unlock(&plli->lli_agl_lock);
1010                 clli->lli_agl_index = 0;
1011                 iput(&clli->lli_vfs_inode);
1012                 spin_lock(&plli->lli_agl_lock);
1013         }
1014         thread_set_flags(thread, SVC_STOPPED);
1015         spin_unlock(&plli->lli_agl_lock);
1016         wake_up(&thread->t_ctl_waitq);
1017         ll_sai_put(sai);
1018         CDEBUG(D_READA, "agl thread stopped: sai %p, parent %.*s\n",
1019                sai, parent->d_name.len, parent->d_name.name);
1020         RETURN(0);
1021 }
1022
1023 static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
1024 {
1025         struct ptlrpc_thread *thread = &sai->sai_agl_thread;
1026         struct l_wait_info    lwi    = { 0 };
1027         struct ll_inode_info  *plli;
1028         struct task_struct            *task;
1029         ENTRY;
1030
1031         CDEBUG(D_READA, "start agl thread: sai %p, parent %.*s\n",
1032                sai, parent->d_name.len, parent->d_name.name);
1033
1034         plli = ll_i2info(parent->d_inode);
1035         task = kthread_run(ll_agl_thread, parent,
1036                                "ll_agl_%u", plli->lli_opendir_pid);
1037         if (IS_ERR(task)) {
1038                 CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));
1039                 thread_set_flags(thread, SVC_STOPPED);
1040                 RETURN_EXIT;
1041         }
1042
1043         l_wait_event(thread->t_ctl_waitq,
1044                      thread_is_running(thread) || thread_is_stopped(thread),
1045                      &lwi);
1046         EXIT;
1047 }
1048
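/* statahead thread: walk the directory pages and issue an async getattr
 * for each entry, throttled by the statahead window; stop early if the
 * hit ratio is too low, then keep serving cached entries until release */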
1049 static int ll_statahead_thread(void *arg)
1050 {
1051         struct dentry *parent = (struct dentry *)arg;
1052         struct inode *dir = parent->d_inode;
1053         struct ll_inode_info *lli = ll_i2info(dir);
1054         struct ll_sb_info *sbi = ll_i2sbi(dir);
1055         struct ll_statahead_info *sai;
1056         struct ptlrpc_thread *thread;
1057         struct ptlrpc_thread *agl_thread;
1058         int first = 0;
1059         struct md_op_data *op_data;
1060         struct ll_dir_chain chain;
1061         struct l_wait_info lwi = { 0 };
1062         struct page *page = NULL;
1063         __u64 pos = 0;
1064         int rc = 0;
1065         ENTRY;
1066
1067         sai = ll_sai_get(dir);
1068         thread = &sai->sai_thread;
1069         agl_thread = &sai->sai_agl_thread;
1070         thread->t_pid = current_pid();
1071         CDEBUG(D_READA, "statahead thread starting: sai %p, parent %.*s\n",
1072                sai, parent->d_name.len, parent->d_name.name);
1073
1074         op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
1075                                      LUSTRE_OPC_ANY, dir);
1076         if (IS_ERR(op_data))
1077                 GOTO(out, rc = PTR_ERR(op_data));
1078
1079         op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages;
1080
1081         if (sbi->ll_flags & LL_SBI_AGL_ENABLED)
1082                 ll_start_agl(parent, sai);
1083
1084         atomic_inc(&sbi->ll_sa_total);
1085         spin_lock(&lli->lli_sa_lock);
1086         if (thread_is_init(thread))
1087                 /* If someone else has changed the thread state
1088                  * (e.g. already changed to SVC_STOPPING), we can't just
1089                  * blindly overwrite that setting. */
1090                 thread_set_flags(thread, SVC_RUNNING);
1091         spin_unlock(&lli->lli_sa_lock);
1092         wake_up(&thread->t_ctl_waitq);
1093
1094         ll_dir_chain_init(&chain);
1095         while (pos != MDS_DIR_END_OFF && thread_is_running(thread)) {
1096                 struct lu_dirpage *dp;
1097                 struct lu_dirent  *ent;
1098
1099                 sai->sai_in_readpage = 1;
1100                 page = ll_get_dir_page(dir, op_data, pos, &chain);
1101                 sai->sai_in_readpage = 0;
1102                 if (IS_ERR(page)) {
1103                         rc = PTR_ERR(page);
1104                         CDEBUG(D_READA, "error reading dir "DFID" at "LPU64
1105                                "/"LPU64" opendir_pid = %u: rc = %d\n",
1106                                PFID(ll_inode2fid(dir)), pos, sai->sai_index,
1107                                lli->lli_opendir_pid, rc);
1108                         break;
1109                 }
1110
1111                 dp = page_address(page);
1112                 for (ent = lu_dirent_start(dp);
1113                      ent != NULL && thread_is_running(thread) &&
1114                      !sa_low_hit(sai);
1115                      ent = lu_dirent_next(ent)) {
1116                         __u64 hash;
1117                         int namelen;
1118                         char *name;
1119
1120                         hash = le64_to_cpu(ent->lde_hash);
1121                         if (unlikely(hash < pos))
1122                                 /*
1123                                  * Skip until we find target hash value.
1124                                  */
1125                                 continue;
1126
1127                         namelen = le16_to_cpu(ent->lde_namelen);
1128                         if (unlikely(namelen == 0))
1129                                 /*
1130                                  * Skip dummy record.
1131                                  */
1132                                 continue;
1133
1134                         name = ent->lde_name;
1135                         if (name[0] == '.') {
1136                                 if (namelen == 1) {
1137                                         /*
1138                                          * skip "."
1139                                          */
1140                                         continue;
1141                                 } else if (name[1] == '.' && namelen == 2) {
1142                                         /*
1143                                          * skip ".."
1144                                          */
1145                                         continue;
1146                                 } else if (!sai->sai_ls_all) {
1147                                         /*
1148                                          * skip hidden files.
1149                                          */
1150                                         sai->sai_skip_hidden++;
1151                                         continue;
1152                                 }
1153                         }
1154
1155                         /*
1156                          * don't stat-ahead first entry.
1157                          */
1158                         if (unlikely(++first == 1))
1159                                 continue;
1160
1161                         /* wait for spare statahead window */
1162                         do {
1163                                 l_wait_event(thread->t_ctl_waitq,
1164                                              !sa_sent_full(sai) ||
1165                                              !sa_received_empty(sai) ||
1166                                              !agl_list_empty(sai) ||
1167                                              !thread_is_running(thread),
1168                                              &lwi);
1169
1170                                 ll_post_statahead(sai);
1171                         } while (sa_sent_full(sai) &&
1172                                  thread_is_running(thread));
1173
1174                         ll_statahead_one(parent, name, namelen);
1175                 }
1176
1177                 pos = le64_to_cpu(dp->ldp_hash_end);
1178                 ll_release_page(dir, page,
1179                                 le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
1180
1181                 if (sa_low_hit(sai)) {
1182                         rc = -EFAULT;
1183                         atomic_inc(&sbi->ll_sa_wrong);
1184                         CDEBUG(D_READA, "Statahead for dir "DFID" hit "
1185                                "ratio too low: hit/miss "LPU64"/"LPU64
1186                                ", sent/replied "LPU64"/"LPU64", stopping "
1187                                "statahead thread: pid %d\n",
1188                                PFID(&lli->lli_fid), sai->sai_hit,
1189                                sai->sai_miss, sai->sai_sent,
1190                                sai->sai_replied, current_pid());
1191                         break;
1192                 }
1193         }
1194         ll_dir_chain_fini(&chain);
1195         ll_finish_md_op_data(op_data);
1196
1197         if (rc < 0) {
1198                 spin_lock(&lli->lli_sa_lock);
1199                 thread_set_flags(thread, SVC_STOPPING);
1200                 lli->lli_sa_enabled = 0;
1201                 spin_unlock(&lli->lli_sa_lock);
1202         }
1203
1204         /* statahead is finished, but statahead entries still need to be
1205          * cached; wait for file release to stop me. */
1206         while (thread_is_running(thread)) {
1207                 l_wait_event(thread->t_ctl_waitq,
1208                              !sa_received_empty(sai) ||
1209                              !agl_list_empty(sai) ||
1210                              !thread_is_running(thread),
1211                              &lwi);
1212
1213                 ll_post_statahead(sai);
1214         }
1215
1216         EXIT;
1217 out:
1218         if (sai->sai_agl_valid) {
1219                 spin_lock(&lli->lli_agl_lock);
1220                 thread_set_flags(agl_thread, SVC_STOPPING);
1221                 spin_unlock(&lli->lli_agl_lock);
1222                 wake_up(&agl_thread->t_ctl_waitq);
1223
1224                 CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n",
1225                        sai, (unsigned int)agl_thread->t_pid);
1226                 l_wait_event(agl_thread->t_ctl_waitq,
1227                              thread_is_stopped(agl_thread),
1228                              &lwi);
1229         } else {
1230                 /* Set agl_thread flags anyway. */
1231                 thread_set_flags(agl_thread, SVC_STOPPED);
1232         }
1233
1234         /* wait for inflight statahead RPCs to finish, and then we can free
1235          * sai safely, because the statahead RPCs will access sai data */
1236         while (sai->sai_sent != sai->sai_replied) {
1237                 /* in case we're not woken up, timeout wait */
1238                 lwi = LWI_TIMEOUT(HZ >> 3, NULL, NULL);
1239                 l_wait_event(thread->t_ctl_waitq,
1240                         sai->sai_sent == sai->sai_replied, &lwi);
1241         }
1242
1243         /* release resources held by received entries. */
1244         ll_post_statahead(sai);
1245
1246         spin_lock(&lli->lli_sa_lock);
1247         thread_set_flags(thread, SVC_STOPPED);
1248         spin_unlock(&lli->lli_sa_lock);
1249
1250         wake_up(&sai->sai_waitq);
1251         wake_up(&thread->t_ctl_waitq);
1252         ll_sai_put(sai);
1253         CDEBUG(D_READA, "statahead thread stopped: sai %p, parent %.*s\n",
1254                sai, parent->d_name.len, parent->d_name.name);
1255         dput(parent);
1256         return rc;
1257 }
1258
1259 /* authorize opened dir handle @key to statahead later */
1260 void ll_authorize_statahead(struct inode *dir, void *key)
1261 {
1262         struct ll_inode_info *lli = ll_i2info(dir);
1263
1264         spin_lock(&lli->lli_sa_lock);
1265         if (lli->lli_opendir_key == NULL && lli->lli_sai == NULL) {
1266                 /*
1267                  * if lli_sai is not NULL, it means the previous statahead is not
1268                  * finished yet; we'd better not start a new statahead for now.
1269                  */
1270                 LASSERT(lli->lli_opendir_pid == 0);
1271                 lli->lli_opendir_key = key;
1272                 lli->lli_opendir_pid = current_pid();
1273                 lli->lli_sa_enabled = 1;
1274         }
1275         spin_unlock(&lli->lli_sa_lock);
1276 }
1277
1278 /* deauthorize the opened dir handle @key for statahead; the statahead thread
1279  * may still be running, so notify it to quit. */
1280 void ll_deauthorize_statahead(struct inode *dir, void *key)
1281 {
1282         struct ll_inode_info *lli = ll_i2info(dir);
1283         struct ll_statahead_info *sai;
1284
1285         LASSERT(lli->lli_opendir_key == key);
1286         LASSERT(lli->lli_opendir_pid != 0);
1287
1288         CDEBUG(D_READA, "deauthorize statahead for "DFID"\n",
1289                 PFID(&lli->lli_fid));
1290
1291         spin_lock(&lli->lli_sa_lock);
1292         lli->lli_opendir_key = NULL;
1293         lli->lli_opendir_pid = 0;
1294         lli->lli_sa_enabled = 0;
1295         sai = lli->lli_sai;
1296         if (sai != NULL && thread_is_running(&sai->sai_thread)) {
1297                 /*
1298                  * the statahead thread may not have quit yet because it needs to
1299                  * cache stated entries; now it's time to tell it to quit.
1300                  */
1301                 thread_set_flags(&sai->sai_thread, SVC_STOPPING);
1302                 wake_up(&sai->sai_thread.t_ctl_waitq);
1303         }
1304         spin_unlock(&lli->lli_sa_lock);
1305 }
1306
1307 enum {
1308         /**
1309          * not first dirent, or is "."
1310          */
1311         LS_NONE_FIRST_DE = 0,
1312         /**
1313          * the first non-hidden dirent
1314          */
1315         LS_FIRST_DE,
1316         /**
1317          * the first hidden dirent, that is "."
1318          */
1319         LS_FIRST_DOT_DE
1320 };
1321
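/* check whether @dentry is the first dirent in @dir; returns LS_FIRST_DE,
 * LS_FIRST_DOT_DE or LS_NONE_FIRST_DE */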
1322 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
1323 {
1324         struct ll_dir_chain   chain;
1325         struct qstr          *target = &dentry->d_name;
1326         struct md_op_data    *op_data;
1327         int                   dot_de;
1328         struct page          *page = NULL;
1329         int                   rc     = LS_NONE_FIRST_DE;
1330         __u64                 pos = 0;
1331         ENTRY;
1332
1333         op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
1334                                      LUSTRE_OPC_ANY, dir);
1335         if (IS_ERR(op_data))
1336                 RETURN(PTR_ERR(op_data));
1337         /**
1338          * FIXME: choose the start offset of the readdir
1339          */
1340         op_data->op_stripe_offset = 0;
1341         op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages;
1342
1343         ll_dir_chain_init(&chain);
1344         page = ll_get_dir_page(dir, op_data, 0, &chain);
1345
1346         while (1) {
1347                 struct lu_dirpage *dp;
1348                 struct lu_dirent  *ent;
1349
1350                 if (IS_ERR(page)) {
1351                         struct ll_inode_info *lli = ll_i2info(dir);
1352
1353                         rc = PTR_ERR(page);
1354                         CERROR("%s: reading dir "DFID" at "LPU64
1355                                " opendir_pid = %u: rc = %d\n",
1356                                ll_get_fsname(dir->i_sb, NULL, 0),
1357                                PFID(ll_inode2fid(dir)), pos,
1358                                lli->lli_opendir_pid, rc);
1359                         break;
1360                 }
1361
1362                 dp = page_address(page);
1363                 for (ent = lu_dirent_start(dp); ent != NULL;
1364                      ent = lu_dirent_next(ent)) {
1365                         __u64 hash;
1366                         int namelen;
1367                         char *name;
1368
1369                         hash = le64_to_cpu(ent->lde_hash);
1370                         /* ll_get_dir_page() can return any page containing
1371                          * the given hash, which may not be the start hash. */
1372                         if (unlikely(hash < pos))
1373                                 continue;
1374
1375                         namelen = le16_to_cpu(ent->lde_namelen);
1376                         if (unlikely(namelen == 0))
1377                                 /*
1378                                  * skip dummy record.
1379                                  */
1380                                 continue;
1381
1382                         name = ent->lde_name;
1383                         if (name[0] == '.') {
1384                                 if (namelen == 1)
1385                                         /*
1386                                          * skip "."
1387                                          */
1388                                         continue;
1389                                 else if (name[1] == '.' && namelen == 2)
1390                                         /*
1391                                          * skip ".."
1392                                          */
1393                                         continue;
1394                                 else
1395                                         dot_de = 1;
1396                         } else {
1397                                 dot_de = 0;
1398                         }
1399
1400                         if (dot_de && target->name[0] != '.') {
1401                                 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
1402                                        target->len, target->name,
1403                                        namelen, name);
1404                                 continue;
1405                         }
1406
1407                         if (target->len != namelen ||
1408                             memcmp(target->name, name, namelen) != 0)
1409                                 rc = LS_NONE_FIRST_DE;
1410                         else if (!dot_de)
1411                                 rc = LS_FIRST_DE;
1412                         else
1413                                 rc = LS_FIRST_DOT_DE;
1414
1415                         ll_release_page(dir, page, false);
1416                         GOTO(out, rc);
1417                 }
1418                 pos = le64_to_cpu(dp->ldp_hash_end);
1419                 if (pos == MDS_DIR_END_OFF) {
1420                         /*
1421                          * End of directory reached.
1422                          */
1423                         ll_release_page(dir, page, false);
1424                         GOTO(out, rc);
1425                 } else {
1426                         /*
1427                          * chain is exhausted
1428                          * Normal case: continue to the next page.
1429                          */
1430                         ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) &
1431                                               LDF_COLLIDE);
1432                         page = ll_get_dir_page(dir, op_data, pos, &chain);
1433                 }
1434         }
1435         EXIT;
1436 out:
1437         ll_dir_chain_fini(&chain);
1438         ll_finish_md_op_data(op_data);
1439         return rc;
1440 }
1441
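/* account a statahead cache hit or miss, adjust the statahead window,
 * release the entry and wake up the statahead thread */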
static void
ll_sai_unplug(struct ll_statahead_info *sai, struct ll_sa_entry *entry)
{
        if (entry != NULL && entry->se_stat == SA_ENTRY_SUCC) {
                struct ll_sb_info *sbi = ll_i2sbi(sai->sai_inode);

                sai->sai_hit++;
                sai->sai_consecutive_miss = 0;
                sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
        } else {
                sai->sai_miss++;
                sai->sai_consecutive_miss++;
        }
        ll_sa_entry_fini(sai, entry);
        wake_up(&sai->sai_thread.t_ctl_waitq);
}

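/*
 * Try to revalidate a dentry from the statahead cache.
 *
 * Look the name up among the entries prefetched by the statahead thread,
 * waiting for the pending async getattr to complete if necessary, then
 * revalidate the cached MDS lock and splice the prefetched inode into the
 * dentry.
 *
 * \retval 1        entry with a valid lock found in cache
 * \retval 0        entry found in cache, but without a valid lock
 * \retval negative fall back to the regular lookup path
 */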
static int revalidate_statahead_dentry(struct inode *dir,
                                        struct ll_statahead_info *sai,
                                        struct dentry **dentryp,
                                        int only_unplug)
{
        struct ll_sa_entry *entry = NULL;
        struct l_wait_info lwi = { 0 };
        int rc = 0;
        ENTRY;

        if ((*dentryp)->d_name.name[0] == '.') {
                if (sai->sai_ls_all ||
                    sai->sai_miss_hidden >= sai->sai_skip_hidden) {
                        /*
                         * The hidden dentry is the first one, or the
                         * statahead thread has not skipped too many hidden
                         * dentries before "sai_ls_all" was enabled below.
                         */
                } else {
                        if (!sai->sai_ls_all)
                                /*
                                 * The hidden dentry is not the first one and
                                 * "sai_ls_all" was not set, so this "ls -al"
                                 * missed it.  Enable "sai_ls_all" for such
                                 * cases.
                                 */
                                sai->sai_ls_all = 1;

                        /*
                         * This "getattr" was skipped before "sai_ls_all"
                         * was enabled above.
                         */
                        sai->sai_miss_hidden++;
                        RETURN(-EAGAIN);
                }
        }

        entry = ll_sa_entry_get_byname(sai, &(*dentryp)->d_name);
        if (entry == NULL || only_unplug) {
                ll_sai_unplug(sai, entry);
                RETURN(entry ? 1 : -EAGAIN);
        }

        /* if statahead is busy in readdir, help it do post-work */
        if (!ll_sa_entry_stated(entry) && sai->sai_in_readpage)
                ll_post_statahead(sai);

        if (!ll_sa_entry_stated(entry)) {
                sai->sai_index_wait = entry->se_index;
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(30), NULL,
                                        LWI_ON_SIGNAL_NOOP, NULL);
                rc = l_wait_event(sai->sai_waitq,
                                ll_sa_entry_stated(entry) ||
                                thread_is_stopped(&sai->sai_thread),
                                &lwi);
                if (rc < 0) {
                        ll_sai_unplug(sai, entry);
                        RETURN(-EAGAIN);
                }
        }

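        /*
         * The async getattr succeeded: revalidate the lock handle cached by
         * the statahead thread for this entry and, if it is still valid,
         * attach the prefetched inode to the dentry.
         */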
        if (entry->se_stat == SA_ENTRY_SUCC && entry->se_inode != NULL) {
                struct inode *inode = entry->se_inode;
                struct lookup_intent it = { .it_op = IT_GETATTR,
                                            .d.lustre.it_lock_handle =
                                                entry->se_handle };
                __u64 bits;

                rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
                                        ll_inode2fid(inode), &bits);
                if (rc == 1) {
                        if ((*dentryp)->d_inode == NULL) {
                                struct dentry *alias;

                                alias = ll_splice_alias(inode, *dentryp);
                                if (IS_ERR(alias)) {
                                        ll_sai_unplug(sai, entry);
                                        RETURN(PTR_ERR(alias));
                                }
                                *dentryp = alias;
                        } else if ((*dentryp)->d_inode != inode) {
                                /* revalidate, but inode is recreated */
                                CDEBUG(D_READA,
                                        "%s: stale dentry %.*s inode "
                                        DFID", statahead inode "DFID
                                        "\n",
                                        ll_get_fsname((*dentryp)->d_inode->i_sb,
                                                      NULL, 0),
                                        (*dentryp)->d_name.len,
                                        (*dentryp)->d_name.name,
                                        PFID(ll_inode2fid((*dentryp)->d_inode)),
                                        PFID(ll_inode2fid(inode)));
                                ll_sai_unplug(sai, entry);
                                RETURN(-ESTALE);
                        } else {
                                iput(inode);
                        }
                        entry->se_inode = NULL;

                        if ((bits & MDS_INODELOCK_LOOKUP) &&
                            d_lustre_invalid(*dentryp))
                                d_lustre_revalidate(*dentryp);
                        ll_intent_release(&it);
                }
        }

        ll_sai_unplug(sai, entry);
        RETURN(rc);
}

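/*
 * Allocate the statahead info and start the "ll_sa_%u" thread for this
 * directory if the dentry is its first entry (as reported by
 * is_first_dirent()); otherwise disable statahead for this open instance.
 * -EAGAIN is returned even on success, since the first dirent is handled
 * by the regular lookup path rather than by statahead.
 */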
static int start_statahead_thread(struct inode *dir, struct dentry *dentry)
{
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai = NULL;
        struct dentry *parent;
        struct ptlrpc_thread *thread;
        struct l_wait_info lwi = { 0 };
        struct task_struct *task;
        int rc;
        ENTRY;

        /* We are the "lli_opendir_pid" owner; only we may set "lli_sai". */
        rc = is_first_dirent(dir, dentry);
        if (rc == LS_NONE_FIRST_DE)
                /* Not an "ls -{a}l" style scan, no need to statahead it. */
                GOTO(out, rc = -EAGAIN);

        sai = ll_sai_alloc();
        if (sai == NULL)
                GOTO(out, rc = -ENOMEM);

        sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
        sai->sai_inode = igrab(dir);
        if (unlikely(sai->sai_inode == NULL)) {
                CWARN("Do not start stat ahead on dying inode "DFID"\n",
                        PFID(&lli->lli_fid));
                GOTO(out, rc = -ESTALE);
        }

        /* Get a parent reference here; it is put in ll_statahead_thread(). */
        parent = dget(dentry->d_parent);
        if (unlikely(sai->sai_inode != parent->d_inode)) {
                struct ll_inode_info *nlli = ll_i2info(parent->d_inode);

                CWARN("Race condition, someone changed %.*s just now: "
                        "old parent "DFID", new parent "DFID"\n",
                        dentry->d_name.len, dentry->d_name.name,
                        PFID(&lli->lli_fid), PFID(&nlli->lli_fid));
                dput(parent);
                iput(sai->sai_inode);
                GOTO(out, rc = -EAGAIN);
        }

        CDEBUG(D_READA, "start statahead thread: sai %p, parent %.*s\n",
               sai, parent->d_name.len, parent->d_name.name);

        lli->lli_sai = sai;

        task = kthread_run(ll_statahead_thread, parent, "ll_sa_%u",
                           lli->lli_opendir_pid);
        thread = &sai->sai_thread;
        if (IS_ERR(task)) {
                rc = PTR_ERR(task);
                CERROR("cannot start ll_sa thread: rc = %d\n", rc);
                dput(parent);
                lli->lli_opendir_key = NULL;
                thread_set_flags(thread, SVC_STOPPED);
                thread_set_flags(&sai->sai_agl_thread, SVC_STOPPED);
                ll_sai_put(sai);
                LASSERT(lli->lli_sai == NULL);
                RETURN(-EAGAIN);
        }

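        /*
         * Wait until the new thread reports that it is running (or has
         * already stopped) before dropping our reference on the sai.
         */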
        l_wait_event(thread->t_ctl_waitq,
                     thread_is_running(thread) || thread_is_stopped(thread),
                     &lwi);
        atomic_inc(&ll_i2sbi(parent->d_inode)->ll_sa_running);
        ll_sai_put(sai);

        /*
         * We don't stat-ahead for the first dirent since we are already in
         * lookup.
         */
        RETURN(-EAGAIN);

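/*
 * Error path: free the unused sai and clear the opendir state so that
 * statahead is not attempted again for this open instance.
 */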
out:
        if (sai != NULL)
                OBD_FREE_PTR(sai);
        spin_lock(&lli->lli_sa_lock);
        lli->lli_opendir_key = NULL;
        lli->lli_opendir_pid = 0;
        lli->lli_sa_enabled = 0;
        spin_unlock(&lli->lli_sa_lock);

        RETURN(rc);
}

/**
 * Start the statahead thread if this is the first directory entry.
 * Otherwise, if a thread has already been started, wait until it is ahead
 * of us.
 * \retval 1       -- entry with lock found in cache, the caller needs to do
 *                    nothing.
 * \retval 0       -- entry found in cache but without lock, the caller needs
 *                    to refresh it from the MDS.
 * \retval others  -- the caller needs to proceed as for a non-statahead
 *                    lookup.
 */
int do_statahead_enter(struct inode *dir, struct dentry **dentryp,
                       int only_unplug)
{
        struct ll_statahead_info *sai;

        sai = ll_sai_get(dir);
        if (sai != NULL) {
                int rc;

                rc = revalidate_statahead_dentry(dir, sai, dentryp,
                                                 only_unplug);
                CDEBUG(D_READA, "revalidate statahead %.*s: %d.\n",
                        (*dentryp)->d_name.len, (*dentryp)->d_name.name, rc);
                ll_sai_put(sai);
                return rc;
        }

        return start_statahead_thread(dir, *dentryp);
}