Whamcloud - gitweb
503235aa9374f6b161272d52678922f5530f0314
[fs/lustre-release.git] / lustre / llite / statahead.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #include <linux/fs.h>
38 #include <linux/sched.h>
39 #include <linux/mm.h>
40 #include <linux/smp_lock.h>
41 #include <linux/highmem.h>
42 #include <linux/pagemap.h>
43
44 #define DEBUG_SUBSYSTEM S_LLITE
45
46 #include <obd_support.h>
47 #include <lustre_lite.h>
48 #include <lustre_dlm.h>
49 #include <linux/lustre_version.h>
50 #include "llite_internal.h"
51
52 struct ll_sai_entry {
53         cfs_list_t              se_list;
54         unsigned int            se_index;
55         int                     se_stat;
56         struct ptlrpc_request  *se_req;
57         struct md_enqueue_info *se_minfo;
58         struct dentry          *se_dentry;
59         struct inode           *se_inode;
60 };
61
/* Non-error values of ll_sai_entry::se_stat. */
enum {
        SA_ENTRY_UNSTATED = 0,
        SA_ENTRY_STATED   = 1
};
66
67 static unsigned int sai_generation = 0;
68 static cfs_spinlock_t sai_generation_lock = CFS_SPIN_LOCK_UNLOCKED;
69
70 /**
71  * Check whether first entry was stated already or not.
72  * No need to hold lli_lock, for:
73  * (1) it is me that remove entry from the list
74  * (2) the statahead thread only add new entry to the list
75  */
76 static int ll_sai_entry_stated(struct ll_statahead_info *sai)
77 {
78         struct ll_sai_entry  *entry;
79         int                   rc = 0;
80
81         if (!cfs_list_empty(&sai->sai_entries_stated)) {
82                 entry = cfs_list_entry(sai->sai_entries_stated.next,
83                                        struct ll_sai_entry, se_list);
84                 if (entry->se_index == sai->sai_index_next)
85                         rc = 1;
86         }
87         return rc;
88 }
89
90 static inline int sa_received_empty(struct ll_statahead_info *sai)
91 {
92         return cfs_list_empty(&sai->sai_entries_received);
93 }
94
95 static inline int sa_not_full(struct ll_statahead_info *sai)
96 {
97         return (sai->sai_index < sai->sai_hit + sai->sai_miss + sai->sai_max);
98 }
99
100 static inline int sa_is_running(struct ll_statahead_info *sai)
101 {
102         return !!(sai->sai_thread.t_flags & SVC_RUNNING);
103 }
104
105 static inline int sa_is_stopping(struct ll_statahead_info *sai)
106 {
107         return !!(sai->sai_thread.t_flags & SVC_STOPPING);
108 }
109
110 static inline int sa_is_stopped(struct ll_statahead_info *sai)
111 {
112         return !!(sai->sai_thread.t_flags & SVC_STOPPED);
113 }
114
115 /**
116  * (1) hit ratio less than 80%
117  * or
118  * (2) consecutive miss more than 8
119  */
120 static inline int sa_low_hit(struct ll_statahead_info *sai)
121 {
122         return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) ||
123                 (sai->sai_consecutive_miss > 8));
124 }
125
126 static void ll_sai_entry_free(struct ll_sai_entry *entry)
127 {
128         struct dentry *dentry = entry->se_dentry;
129         struct inode  *inode  = entry->se_inode;
130
131         if (dentry) {
132                 entry->se_dentry = NULL;
133                 dput(dentry);
134         }
135         if (inode) {
136                 entry->se_inode = NULL;
137                 iput(inode);
138         }
139         LASSERT(cfs_list_empty(&entry->se_list));
140         OBD_FREE_PTR(entry);
141 }
142
143 /**
144  * process the deleted entry's member and free the entry.
145  * (1) release intent
146  * (2) free md_enqueue_info
147  * (3) drop dentry's ref count
148  * (4) release request's ref count
149  */
150 static void ll_sai_entry_cleanup(struct ll_sai_entry *entry, int free)
151 {
152         struct md_enqueue_info *minfo = entry->se_minfo;
153         struct ptlrpc_request  *req   = entry->se_req;
154         ENTRY;
155
156         if (minfo) {
157                 entry->se_minfo = NULL;
158                 ll_intent_release(&minfo->mi_it);
159                 dput(minfo->mi_dentry);
160                 iput(minfo->mi_dir);
161                 OBD_FREE_PTR(minfo);
162         }
163         if (req) {
164                 entry->se_req = NULL;
165                 ptlrpc_req_finished(req);
166         }
167         if (free)
168                 ll_sai_entry_free(entry);
169
170         EXIT;
171 }
172
173 static struct ll_statahead_info *ll_sai_alloc(void)
174 {
175         struct ll_statahead_info *sai;
176
177         OBD_ALLOC_PTR(sai);
178         if (!sai)
179                 return NULL;
180
181         cfs_spin_lock(&sai_generation_lock);
182         sai->sai_generation = ++sai_generation;
183         if (unlikely(sai_generation == 0))
184                 sai->sai_generation = ++sai_generation;
185         cfs_spin_unlock(&sai_generation_lock);
186         cfs_atomic_set(&sai->sai_refcount, 1);
187         sai->sai_max = LL_SA_RPC_MIN;
188         cfs_waitq_init(&sai->sai_waitq);
189         cfs_waitq_init(&sai->sai_thread.t_ctl_waitq);
190         CFS_INIT_LIST_HEAD(&sai->sai_entries_sent);
191         CFS_INIT_LIST_HEAD(&sai->sai_entries_received);
192         CFS_INIT_LIST_HEAD(&sai->sai_entries_stated);
193         return sai;
194 }
195
196 static inline
197 struct ll_statahead_info *ll_sai_get(struct ll_statahead_info *sai)
198 {
199         LASSERT(sai);
200         cfs_atomic_inc(&sai->sai_refcount);
201         return sai;
202 }
203
204 static void ll_sai_put(struct ll_statahead_info *sai)
205 {
206         struct inode         *inode = sai->sai_inode;
207         struct ll_inode_info *lli;
208         ENTRY;
209
210         LASSERT(inode != NULL);
211         lli = ll_i2info(inode);
212         LASSERT(lli->lli_sai == sai);
213
214         if (cfs_atomic_dec_and_test(&sai->sai_refcount)) {
215                 struct ll_sai_entry *entry, *next;
216
217                 cfs_spin_lock(&lli->lli_lock);
218                 if (unlikely(cfs_atomic_read(&sai->sai_refcount) > 0)) {
219                         /* It is race case, the interpret callback just hold
220                          * a reference count */
221                         cfs_spin_unlock(&lli->lli_lock);
222                         EXIT;
223                         return;
224                 }
225
226                 LASSERT(lli->lli_opendir_key == NULL);
227                 lli->lli_sai = NULL;
228                 lli->lli_opendir_pid = 0;
229                 cfs_spin_unlock(&lli->lli_lock);
230
231                 LASSERT(sa_is_stopped(sai));
232
233                 if (sai->sai_sent > sai->sai_replied)
234                         CDEBUG(D_READA,"statahead for dir "DFID" does not "
235                               "finish: [sent:%u] [replied:%u]\n",
236                               PFID(&lli->lli_fid),
237                               sai->sai_sent, sai->sai_replied);
238
239                 cfs_list_for_each_entry_safe(entry, next,
240                                              &sai->sai_entries_sent, se_list) {
241                         cfs_list_del_init(&entry->se_list);
242                         ll_sai_entry_cleanup(entry, 1);
243                 }
244                 cfs_list_for_each_entry_safe(entry, next,
245                                              &sai->sai_entries_received,
246                                              se_list) {
247                         cfs_list_del_init(&entry->se_list);
248                         ll_sai_entry_cleanup(entry, 1);
249                 }
250                 cfs_list_for_each_entry_safe(entry, next,
251                                              &sai->sai_entries_stated,
252                                              se_list) {
253                         cfs_list_del_init(&entry->se_list);
254                         ll_sai_entry_cleanup(entry, 1);
255                 }
256                 iput(inode);
257                 OBD_FREE_PTR(sai);
258         }
259         EXIT;
260 }
261
262 /**
263  * insert it into sai_entries_sent tail when init.
264  */
265 static struct ll_sai_entry *
266 ll_sai_entry_init(struct ll_statahead_info *sai, unsigned int index)
267 {
268         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
269         struct ll_sai_entry  *entry;
270         ENTRY;
271
272         OBD_ALLOC_PTR(entry);
273         if (entry == NULL)
274                 RETURN(ERR_PTR(-ENOMEM));
275
276         CDEBUG(D_READA, "alloc sai entry %p index %u\n",
277                entry, index);
278         entry->se_index = index;
279         entry->se_stat = SA_ENTRY_UNSTATED;
280
281         cfs_spin_lock(&lli->lli_lock);
282         cfs_list_add_tail(&entry->se_list, &sai->sai_entries_sent);
283         cfs_spin_unlock(&lli->lli_lock);
284
285         RETURN(entry);
286 }
287
288 /**
289  * delete it from sai_entries_stated head when fini, it need not
290  * to process entry's member.
291  */
292 static int ll_sai_entry_fini(struct ll_statahead_info *sai)
293 {
294         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
295         struct ll_sai_entry  *entry;
296         int rc = 0;
297         ENTRY;
298
299         cfs_spin_lock(&lli->lli_lock);
300         sai->sai_index_next++;
301         if (likely(!cfs_list_empty(&sai->sai_entries_stated))) {
302                 entry = cfs_list_entry(sai->sai_entries_stated.next,
303                                        struct ll_sai_entry, se_list);
304                 if (entry->se_index < sai->sai_index_next) {
305                         cfs_list_del_init(&entry->se_list);
306                         rc = entry->se_stat;
307                         ll_sai_entry_free(entry);
308                 }
309         } else {
310                 LASSERT(sa_is_stopped(sai));
311         }
312         cfs_spin_unlock(&lli->lli_lock);
313
314         RETURN(rc);
315 }
316
317 /**
318  * inside lli_lock.
319  * \retval NULL : can not find the entry in sai_entries_sent with the index
320  * \retval entry: find the entry in sai_entries_sent with the index
321  */
322 static struct ll_sai_entry *
323 ll_sai_entry_set(struct ll_statahead_info *sai, unsigned int index, int stat,
324                  struct ptlrpc_request *req, struct md_enqueue_info *minfo)
325 {
326         struct ll_sai_entry *entry;
327         ENTRY;
328
329         if (!cfs_list_empty(&sai->sai_entries_sent)) {
330                 cfs_list_for_each_entry(entry, &sai->sai_entries_sent,
331                                         se_list) {
332                         if (entry->se_index == index) {
333                                 entry->se_stat = stat;
334                                 entry->se_req = ptlrpc_request_addref(req);
335                                 entry->se_minfo = minfo;
336                                 RETURN(entry);
337                         } else if (entry->se_index > index) {
338                                 RETURN(NULL);
339                         }
340                 }
341         }
342         RETURN(NULL);
343 }
344
345 /**
346  * inside lli_lock.
347  * Move entry to sai_entries_received and
348  * insert it into sai_entries_received tail.
349  */
350 static inline void
351 ll_sai_entry_to_received(struct ll_statahead_info *sai, struct ll_sai_entry *entry)
352 {
353         if (!cfs_list_empty(&entry->se_list))
354                 cfs_list_del_init(&entry->se_list);
355         cfs_list_add_tail(&entry->se_list, &sai->sai_entries_received);
356 }
357
358 /**
359  * Move entry to sai_entries_stated and
360  * sort with the index.
361  */
362 static int
363 ll_sai_entry_to_stated(struct ll_statahead_info *sai, struct ll_sai_entry *entry)
364 {
365         struct ll_inode_info *lli = ll_i2info(sai->sai_inode);
366         struct ll_sai_entry  *se;
367         ENTRY;
368
369         ll_sai_entry_cleanup(entry, 0);
370
371         cfs_spin_lock(&lli->lli_lock);
372         if (!cfs_list_empty(&entry->se_list))
373                 cfs_list_del_init(&entry->se_list);
374
375         /* stale entry */
376         if (unlikely(entry->se_index < sai->sai_index_next)) {
377                 cfs_spin_unlock(&lli->lli_lock);
378                 ll_sai_entry_free(entry);
379                 RETURN(0);
380         }
381
382         cfs_list_for_each_entry_reverse(se, &sai->sai_entries_stated, se_list) {
383                 if (se->se_index < entry->se_index) {
384                         cfs_list_add(&entry->se_list, &se->se_list);
385                         cfs_spin_unlock(&lli->lli_lock);
386                         RETURN(1);
387                 }
388         }
389
390         /*
391          * I am the first entry.
392          */
393         cfs_list_add(&entry->se_list, &sai->sai_entries_stated);
394         cfs_spin_unlock(&lli->lli_lock);
395         RETURN(1);
396 }
397
398 /**
399  * finish lookup/revalidate.
400  */
401 static int do_statahead_interpret(struct ll_statahead_info *sai)
402 {
403         struct ll_inode_info   *lli = ll_i2info(sai->sai_inode);
404         struct ll_sai_entry    *entry;
405         struct ptlrpc_request  *req;
406         struct md_enqueue_info *minfo;
407         struct lookup_intent   *it;
408         struct dentry          *dentry;
409         int                     rc = 0;
410         struct mdt_body        *body;
411         ENTRY;
412
413         cfs_spin_lock(&lli->lli_lock);
414         LASSERT(!sa_received_empty(sai));
415         entry = cfs_list_entry(sai->sai_entries_received.next,
416                                struct ll_sai_entry, se_list);
417         cfs_list_del_init(&entry->se_list);
418         cfs_spin_unlock(&lli->lli_lock);
419
420         if (unlikely(entry->se_index < sai->sai_index_next)) {
421                 CWARN("Found stale entry: [index %u] [next %u]\n",
422                       entry->se_index, sai->sai_index_next);
423                 ll_sai_entry_cleanup(entry, 1);
424                 RETURN(0);
425         }
426
427         if (entry->se_stat != SA_ENTRY_STATED)
428                 GOTO(out, rc = entry->se_stat);
429
430         req = entry->se_req;
431         minfo = entry->se_minfo;
432         it = &minfo->mi_it;
433         dentry = minfo->mi_dentry;
434
435         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
436         if (body == NULL)
437                 GOTO(out, rc = -EFAULT);
438
439         if (dentry->d_inode == NULL) {
440                 /*
441                  * lookup.
442                  */
443                 struct dentry    *save = dentry;
444                 struct it_cb_data icbd = {
445                         .icbd_parent   = minfo->mi_dir,
446                         .icbd_childp   = &dentry
447                 };
448
449                 LASSERT(fid_is_zero(&minfo->mi_data.op_fid2));
450
451                 /* XXX: No fid in reply, this is probaly cross-ref case.
452                  * SA can't handle it yet. */
453                 if (body->valid & OBD_MD_MDS)
454                         GOTO(out, rc = -EAGAIN);
455
456                 /* Here dentry->d_inode might be NULL, because the entry may
457                  * have been removed before we start doing stat ahead. */
458
459                 /* BUG 15962, 21739: since statahead thread does not hold
460                  * parent's i_mutex, it can not alias the dentry to inode.
461                  * Here we just create/update inode in memory, and let the
462                  * main "ls -l" thread to alias such dentry to the inode with
463                  * parent's i_mutex held.
464                  * On the other hand, we hold ldlm ibits lock for the inode
465                  * yet, to allow other operations to cancel such lock in time,
466                  * we should drop the ldlm lock reference count, then the main
467                  * "ls -l" thread should check/get such ldlm ibits lock before
468                  * aliasing such dentry to the inode later. If we don't do such
469                  * drop here, it maybe cause deadlock with i_muext held by
470                  * others, just like bug 21739. */
471                 rc = ll_lookup_it_finish(req, it, &icbd, &entry->se_inode);
472                 if (entry->se_inode != NULL)
473                         entry->se_dentry = dget(dentry);
474                 LASSERT(dentry == save);
475                 ll_intent_drop_lock(it);
476         } else {
477                 /*
478                  * revalidate.
479                  */
480                 if (!lu_fid_eq(&minfo->mi_data.op_fid2, &body->fid1)) {
481                         ll_unhash_aliases(dentry->d_inode);
482                         GOTO(out, rc = -EAGAIN);
483                 }
484
485                 rc = ll_revalidate_it_finish(req, it, dentry);
486                 if (rc) {
487                         ll_unhash_aliases(dentry->d_inode);
488                         GOTO(out, rc);
489                 }
490
491                 cfs_spin_lock(&ll_lookup_lock);
492                 spin_lock(&dcache_lock);
493                 lock_dentry(dentry);
494                 __d_drop(dentry);
495                 dentry->d_flags &= ~DCACHE_LUSTRE_INVALID;
496                 unlock_dentry(dentry);
497                 d_rehash_cond(dentry, 0);
498                 spin_unlock(&dcache_lock);
499                 cfs_spin_unlock(&ll_lookup_lock);
500
501                 ll_lookup_finish_locks(it, dentry);
502         }
503         EXIT;
504
505 out:
506         /* The "ll_sai_entry_to_stated()" will drop related ldlm ibits lock
507          * reference count with ll_intent_drop_lock() called in spite of the
508          * above operations failed or not. Do not worry about calling
509          * "ll_intent_drop_lock()" more than once. */
510         if (likely(ll_sai_entry_to_stated(sai, entry)))
511                 cfs_waitq_signal(&sai->sai_waitq);
512         return rc;
513 }
514
515 static int ll_statahead_interpret(struct ptlrpc_request *req,
516                                   struct md_enqueue_info *minfo,
517                                   int rc)
518 {
519         struct lookup_intent     *it = &minfo->mi_it;
520         struct dentry            *dentry = minfo->mi_dentry;
521         struct inode             *dir = minfo->mi_dir;
522         struct ll_inode_info     *lli = ll_i2info(dir);
523         struct ll_statahead_info *sai;
524         struct ll_sai_entry      *entry;
525         ENTRY;
526
527         CDEBUG(D_READA, "interpret statahead %.*s rc %d\n",
528                dentry->d_name.len, dentry->d_name.name, rc);
529
530         cfs_spin_lock(&lli->lli_lock);
531         /* stale entry */
532         if (unlikely(lli->lli_sai == NULL ||
533             lli->lli_sai->sai_generation != minfo->mi_generation)) {
534                 cfs_spin_unlock(&lli->lli_lock);
535                 ll_intent_release(it);
536                 dput(dentry);
537                 iput(dir);
538                 OBD_FREE_PTR(minfo);
539                 RETURN(-ESTALE);
540         } else {
541                 sai = ll_sai_get(lli->lli_sai);
542                 entry = ll_sai_entry_set(sai,
543                                          (unsigned int)(long)minfo->mi_cbdata,
544                                          rc < 0 ? rc : SA_ENTRY_STATED, req,
545                                          minfo);
546                 LASSERT(entry != NULL);
547                 if (likely(sa_is_running(sai))) {
548                         ll_sai_entry_to_received(sai, entry);
549                         sai->sai_replied++;
550                         cfs_spin_unlock(&lli->lli_lock);
551                         cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
552                 } else {
553                         if (!cfs_list_empty(&entry->se_list))
554                                 cfs_list_del_init(&entry->se_list);
555                         sai->sai_replied++;
556                         cfs_spin_unlock(&lli->lli_lock);
557                         ll_sai_entry_cleanup(entry, 1);
558                 }
559                 ll_sai_put(sai);
560                 RETURN(rc);
561         }
562 }
563
564 static void sa_args_fini(struct md_enqueue_info *minfo,
565                          struct ldlm_enqueue_info *einfo)
566 {
567         LASSERT(minfo && einfo);
568         iput(minfo->mi_dir);
569         capa_put(minfo->mi_data.op_capa1);
570         capa_put(minfo->mi_data.op_capa2);
571         OBD_FREE_PTR(minfo);
572         OBD_FREE_PTR(einfo);
573 }
574
575 /**
576  * There is race condition between "capa_put" and "ll_statahead_interpret" for
577  * accessing "op_data.op_capa[1,2]" as following:
578  * "capa_put" releases "op_data.op_capa[1,2]"'s reference count after calling
579  * "md_intent_getattr_async". But "ll_statahead_interpret" maybe run first, and
580  * fill "op_data.op_capa[1,2]" as POISON, then cause "capa_put" access invalid
581  * "ocapa". So here reserve "op_data.op_capa[1,2]" in "pcapa" before calling
582  * "md_intent_getattr_async".
583  */
584 static int sa_args_init(struct inode *dir, struct dentry *dentry,
585                         struct md_enqueue_info **pmi,
586                         struct ldlm_enqueue_info **pei,
587                         struct obd_capa **pcapa)
588 {
589         struct ll_inode_info     *lli = ll_i2info(dir);
590         struct md_enqueue_info   *minfo;
591         struct ldlm_enqueue_info *einfo;
592         struct md_op_data        *op_data;
593
594         OBD_ALLOC_PTR(einfo);
595         if (einfo == NULL)
596                 return -ENOMEM;
597
598         OBD_ALLOC_PTR(minfo);
599         if (minfo == NULL) {
600                 OBD_FREE_PTR(einfo);
601                 return -ENOMEM;
602         }
603
604         op_data = ll_prep_md_op_data(&minfo->mi_data, dir, dentry->d_inode,
605                                      dentry->d_name.name, dentry->d_name.len,
606                                      0, LUSTRE_OPC_ANY, NULL);
607         if (IS_ERR(op_data)) {
608                 OBD_FREE_PTR(einfo);
609                 OBD_FREE_PTR(minfo);
610                 return PTR_ERR(op_data);
611         }
612
613         minfo->mi_it.it_op = IT_GETATTR;
614         minfo->mi_dentry = dentry;
615         minfo->mi_dir = igrab(dir);
616         minfo->mi_cb = ll_statahead_interpret;
617         minfo->mi_generation = lli->lli_sai->sai_generation;
618         minfo->mi_cbdata = (void *)(long)lli->lli_sai->sai_index;
619
620         einfo->ei_type   = LDLM_IBITS;
621         einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
622         einfo->ei_cb_bl  = ll_md_blocking_ast;
623         einfo->ei_cb_cp  = ldlm_completion_ast;
624         einfo->ei_cb_gl  = NULL;
625         einfo->ei_cbdata = NULL;
626
627         *pmi = minfo;
628         *pei = einfo;
629         pcapa[0] = op_data->op_capa1;
630         pcapa[1] = op_data->op_capa2;
631
632         return 0;
633 }
634
635 /**
636  * similar to ll_lookup_it().
637  */
638 static int do_sa_lookup(struct inode *dir, struct dentry *dentry)
639 {
640         struct md_enqueue_info   *minfo;
641         struct ldlm_enqueue_info *einfo;
642         struct obd_capa          *capas[2];
643         int                       rc;
644         ENTRY;
645
646         rc = sa_args_init(dir, dentry, &minfo, &einfo, capas);
647         if (rc)
648                 RETURN(rc);
649
650         rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
651         if (!rc) {
652                 capa_put(capas[0]);
653                 capa_put(capas[1]);
654         } else {
655                 sa_args_fini(minfo, einfo);
656         }
657
658         RETURN(rc);
659 }
660
661 /**
662  * similar to ll_revalidate_it().
663  * \retval      1 -- dentry valid
664  * \retval      0 -- will send stat-ahead request
665  * \retval others -- prepare stat-ahead request failed
666  */
667 static int do_sa_revalidate(struct inode *dir, struct dentry *dentry)
668 {
669         struct inode             *inode = dentry->d_inode;
670         struct lookup_intent      it = { .it_op = IT_GETATTR };
671         struct md_enqueue_info   *minfo;
672         struct ldlm_enqueue_info *einfo;
673         struct obd_capa          *capas[2];
674         int rc;
675         ENTRY;
676
677         if (unlikely(inode == NULL))
678                 RETURN(1);
679
680         if (d_mountpoint(dentry))
681                 RETURN(1);
682
683         if (unlikely(dentry == dentry->d_sb->s_root))
684                 RETURN(1);
685
686         rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
687                                 NULL);
688         if (rc == 1) {
689                 ll_intent_release(&it);
690                 RETURN(1);
691         }
692
693         rc = sa_args_init(dir, dentry, &minfo, &einfo, capas);
694         if (rc)
695                 RETURN(rc);
696
697         rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
698         if (!rc) {
699                 capa_put(capas[0]);
700                 capa_put(capas[1]);
701         } else {
702                 sa_args_fini(minfo, einfo);
703         }
704
705         RETURN(rc);
706 }
707
708 static inline void ll_name2qstr(struct qstr *q, const char *name, int namelen)
709 {
710         q->name = name;
711         q->len  = namelen;
712         q->hash = full_name_hash(name, namelen);
713 }
714
715 static int ll_statahead_one(struct dentry *parent, const char* entry_name,
716                             int entry_name_len)
717 {
718         struct inode             *dir = parent->d_inode;
719         struct ll_inode_info     *lli = ll_i2info(dir);
720         struct ll_statahead_info *sai = lli->lli_sai;
721         struct qstr               name;
722         struct dentry            *dentry;
723         struct ll_sai_entry      *se;
724         int                       rc;
725         ENTRY;
726
727         if (parent->d_flags & DCACHE_LUSTRE_INVALID) {
728                 CDEBUG(D_READA, "parent dentry@%p %.*s is "
729                        "invalid, skip statahead\n",
730                        parent, parent->d_name.len, parent->d_name.name);
731                 RETURN(-EINVAL);
732         }
733
734         se = ll_sai_entry_init(sai, sai->sai_index);
735         if (IS_ERR(se))
736                 RETURN(PTR_ERR(se));
737
738         ll_name2qstr(&name, entry_name, entry_name_len);
739         dentry = d_lookup(parent, &name);
740         if (!dentry) {
741                 dentry = d_alloc(parent, &name);
742                 if (dentry) {
743                         rc = do_sa_lookup(dir, dentry);
744                         if (rc)
745                                 dput(dentry);
746                 } else {
747                         GOTO(out, rc = -ENOMEM);
748                 }
749         } else {
750                 rc = do_sa_revalidate(dir, dentry);
751                 if (rc)
752                         dput(dentry);
753         }
754
755         EXIT;
756
757 out:
758         if (rc) {
759                 CDEBUG(D_READA, "set sai entry %p index %u stat %d rc %d\n",
760                        se, se->se_index, se->se_stat, rc);
761                 se->se_stat = rc < 0 ? rc : SA_ENTRY_STATED;
762                 if (ll_sai_entry_to_stated(sai, se))
763                         cfs_waitq_signal(&sai->sai_waitq);
764         } else {
765                 sai->sai_sent++;
766         }
767
768         sai->sai_index++;
769         return rc;
770 }
771
772 static int ll_statahead_thread(void *arg)
773 {
774         struct dentry            *parent = (struct dentry *)arg;
775         struct inode             *dir = parent->d_inode;
776         struct ll_inode_info     *lli = ll_i2info(dir);
777         struct ll_sb_info        *sbi = ll_i2sbi(dir);
778         struct ll_statahead_info *sai = ll_sai_get(lli->lli_sai);
779         struct ptlrpc_thread     *thread = &sai->sai_thread;
780         struct page              *page;
781         __u64                     pos = 0;
782         int                       first = 0;
783         int                       rc = 0;
784         struct ll_dir_chain       chain;
785         ENTRY;
786
787         {
788                 char pname[16];
789                 snprintf(pname, 15, "ll_sa_%u", lli->lli_opendir_pid);
790                 cfs_daemonize(pname);
791         }
792
793         atomic_inc(&sbi->ll_sa_total);
794         cfs_spin_lock(&lli->lli_lock);
795         thread->t_flags = SVC_RUNNING;
796         cfs_spin_unlock(&lli->lli_lock);
797         cfs_waitq_signal(&thread->t_ctl_waitq);
798         CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name);
799
800         ll_dir_chain_init(&chain);
801         page = ll_get_dir_page(dir, pos, 0, &chain);
802
803         while (1) {
804                 struct l_wait_info lwi = { 0 };
805                 struct lu_dirpage *dp;
806                 struct lu_dirent  *ent;
807
808                 if (IS_ERR(page)) {
809                         rc = PTR_ERR(page);
810                         CDEBUG(D_READA, "error reading dir "DFID" at "LPU64
811                                "/%u: [rc %d] [parent %u]\n",
812                                PFID(ll_inode2fid(dir)), pos, sai->sai_index,
813                                rc, lli->lli_opendir_pid);
814                         break;
815                 }
816
817                 dp = page_address(page);
818                 for (ent = lu_dirent_start(dp); ent != NULL;
819                      ent = lu_dirent_next(ent)) {
820                         char *name = ent->lde_name;
821                         int namelen = le16_to_cpu(ent->lde_namelen);
822
823                         if (unlikely(namelen == 0))
824                                 /*
825                                  * Skip dummy record.
826                                  */
827                                 continue;
828
829                         if (name[0] == '.') {
830                                 if (namelen == 1) {
831                                         /*
832                                          * skip "."
833                                          */
834                                         continue;
835                                 } else if (name[1] == '.' && namelen == 2) {
836                                         /*
837                                          * skip ".."
838                                          */
839                                         continue;
840                                 } else if (!sai->sai_ls_all) {
841                                         /*
842                                          * skip hidden files.
843                                          */
844                                         sai->sai_skip_hidden++;
845                                         continue;
846                                 }
847                         }
848
849                         /*
850                          * don't stat-ahead first entry.
851                          */
852                         if (unlikely(!first)) {
853                                 first++;
854                                 continue;
855                         }
856
857 keep_de:
858                         l_wait_event(thread->t_ctl_waitq,
859                                      !sa_is_running(sai) || sa_not_full(sai) ||
860                                      !sa_received_empty(sai),
861                                      &lwi);
862
863                         while (!sa_received_empty(sai) && sa_is_running(sai))
864                                 do_statahead_interpret(sai);
865
866                         if (unlikely(!sa_is_running(sai))) {
867                                 ll_put_page(page);
868                                 GOTO(out, rc);
869                         }
870
871                         if (!sa_not_full(sai))
872                                 /*
873                                  * do not skip the current de.
874                                  */
875                                 goto keep_de;
876
877                         rc = ll_statahead_one(parent, name, namelen);
878                         if (rc < 0) {
879                                 ll_put_page(page);
880                                 GOTO(out, rc);
881                         }
882                 }
883                 pos = le64_to_cpu(dp->ldp_hash_end);
884                 ll_put_page(page);
885                 if (pos == DIR_END_OFF) {
886                         /*
887                          * End of directory reached.
888                          */
889                         while (1) {
890                                 l_wait_event(thread->t_ctl_waitq,
891                                              !sa_is_running(sai) ||
892                                              !sa_received_empty(sai) ||
893                                              sai->sai_sent == sai->sai_replied,
894                                              &lwi);
895                                 if (!sa_received_empty(sai) &&
896                                     sa_is_running(sai))
897                                         do_statahead_interpret(sai);
898                                 else
899                                         GOTO(out, rc);
900                         }
901                 } else if (1) {
902                         /*
903                          * chain is exhausted.
904                          * Normal case: continue to the next page.
905                          */
906                         page = ll_get_dir_page(dir, pos, 1, &chain);
907                 } else {
908                         /*
909                          * go into overflow page.
910                          */
911                 }
912         }
913         EXIT;
914
915 out:
916         ll_dir_chain_fini(&chain);
917         cfs_spin_lock(&lli->lli_lock);
918         thread->t_flags = SVC_STOPPED;
919         cfs_spin_unlock(&lli->lli_lock);
920         cfs_waitq_signal(&sai->sai_waitq);
921         cfs_waitq_signal(&thread->t_ctl_waitq);
922         ll_sai_put(sai);
923         dput(parent);
924         CDEBUG(D_READA, "statahead thread stopped, pid %d\n",
925                cfs_curproc_pid());
926         return rc;
927 }
928
929 /**
930  * called in ll_file_release().
931  */
932 void ll_stop_statahead(struct inode *inode, void *key)
933 {
934         struct ll_inode_info *lli = ll_i2info(inode);
935
936         if (unlikely(key == NULL))
937                 return;
938
939         cfs_spin_lock(&lli->lli_lock);
940         if (lli->lli_opendir_key != key || lli->lli_opendir_pid == 0) {
941                 cfs_spin_unlock(&lli->lli_lock);
942                 return;
943         }
944
945         lli->lli_opendir_key = NULL;
946
947         if (lli->lli_sai) {
948                 struct l_wait_info lwi = { 0 };
949                 struct ptlrpc_thread *thread = &lli->lli_sai->sai_thread;
950
951                 if (!sa_is_stopped(lli->lli_sai)) {
952                         thread->t_flags = SVC_STOPPING;
953                         cfs_spin_unlock(&lli->lli_lock);
954                         cfs_waitq_signal(&thread->t_ctl_waitq);
955
956                         CDEBUG(D_READA, "stopping statahead thread, pid %d\n",
957                                cfs_curproc_pid());
958                         l_wait_event(thread->t_ctl_waitq,
959                                      sa_is_stopped(lli->lli_sai),
960                                      &lwi);
961                 } else {
962                         cfs_spin_unlock(&lli->lli_lock);
963                 }
964
965                 /*
966                  * Put the ref which was held when first statahead_enter.
967                  * It maybe not the last ref for some statahead requests
968                  * maybe inflight.
969                  */
970                 ll_sai_put(lli->lli_sai);
971         } else {
972                 lli->lli_opendir_pid = 0;
973                 cfs_spin_unlock(&lli->lli_lock);
974         }
975 }
976
/* Result codes of is_first_dirent(). */
enum {
        /** the dentry is not the first usable dirent of the directory */
        LS_NONE_FIRST_DE = 0,
        /** the dentry is the first dirent and its name is not hidden */
        LS_FIRST_DE      = 1,
        /** the dentry is the first dirent and its name starts with '.' */
        LS_FIRST_DOT_DE  = 2
};
991
992 static int is_first_dirent(struct inode *dir, struct dentry *dentry)
993 {
994         struct ll_dir_chain chain;
995         struct qstr        *target = &dentry->d_name;
996         struct page        *page;
997         __u64               pos = 0;
998         int                 dot_de;
999         int                 rc = LS_NONE_FIRST_DE;
1000         ENTRY;
1001
1002         ll_dir_chain_init(&chain);
1003         page = ll_get_dir_page(dir, pos, 0, &chain);
1004
1005         while (1) {
1006                 struct lu_dirpage *dp;
1007                 struct lu_dirent  *ent;
1008
1009                 if (IS_ERR(page)) {
1010                         struct ll_inode_info *lli = ll_i2info(dir);
1011
1012                         rc = PTR_ERR(page);
1013                         CERROR("error reading dir "DFID" at "LPU64": "
1014                                "[rc %d] [parent %u]\n",
1015                                PFID(ll_inode2fid(dir)), pos,
1016                                rc, lli->lli_opendir_pid);
1017                         break;
1018                 }
1019
1020                 dp = page_address(page);
1021                 for (ent = lu_dirent_start(dp); ent != NULL;
1022                      ent = lu_dirent_next(ent)) {
1023                         char *name = ent->lde_name;
1024                         int namelen = le16_to_cpu(ent->lde_namelen);
1025
1026                         if (namelen == 0)
1027                                 /*
1028                                  * skip dummy record.
1029                                  */
1030                                 continue;
1031
1032                         if (name[0] == '.') {
1033                                 if (namelen == 1)
1034                                         /*
1035                                          * skip "."
1036                                          */
1037                                         continue;
1038                                 else if (name[1] == '.' && namelen == 2)
1039                                         /*
1040                                          * skip ".."
1041                                          */
1042                                         continue;
1043                                 else
1044                                         dot_de = 1;
1045                         } else {
1046                                 dot_de = 0;
1047                         }
1048
1049                         if (dot_de && target->name[0] != '.') {
1050                                 CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
1051                                        target->len, target->name,
1052                                        namelen, name);
1053                                 continue;
1054                         }
1055
1056                         if (target->len != namelen ||
1057                             memcmp(target->name, name, namelen) != 0)
1058                                 rc = LS_NONE_FIRST_DE;
1059                         else if (!dot_de)
1060                                 rc = LS_FIRST_DE;
1061                         else
1062                                 rc = LS_FIRST_DOT_DE;
1063
1064                         ll_put_page(page);
1065                         GOTO(out, rc);
1066                 }
1067                 pos = le64_to_cpu(dp->ldp_hash_end);
1068                 ll_put_page(page);
1069                 if (pos == DIR_END_OFF) {
1070                         /*
1071                          * End of directory reached.
1072                          */
1073                         break;
1074                 } else if (1) {
1075                         /*
1076                          * chain is exhausted
1077                          * Normal case: continue to the next page.
1078                          */
1079                         page = ll_get_dir_page(dir, pos, 1, &chain);
1080                 } else {
1081                         /*
1082                          * go into overflow page.
1083                          */
1084                 }
1085         }
1086         EXIT;
1087
1088 out:
1089         ll_dir_chain_fini(&chain);
1090         return rc;
1091 }
1092
1093 static int is_same_dentry(struct dentry *d1, struct dentry *d2)
1094 {
1095         if (unlikely(d1 == d2))
1096                 return 1;
1097         if (d1->d_parent == d2->d_parent &&
1098             d1->d_name.hash == d2->d_name.hash &&
1099             d1->d_name.len == d2->d_name.len &&
1100             memcmp(d1->d_name.name, d2->d_name.name, d1->d_name.len) == 0)
1101                 return 1;
1102         return 0;
1103 }
1104
1105 /**
1106  * Start statahead thread if this is the first dir entry.
1107  * Otherwise if a thread is started already, wait it until it is ahead of me.
1108  * \retval 0       -- stat ahead thread process such dentry, miss for lookup
1109  * \retval 1       -- stat ahead thread process such dentry, hit for any case
1110  * \retval -EEXIST -- stat ahead thread started, and this is the first dentry
1111  * \retval -EBADFD -- statahead thread exit and not dentry available
1112  * \retval -EAGAIN -- try to stat by caller
1113  * \retval others  -- error
1114  */
1115 int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup)
1116 {
1117         struct ll_inode_info     *lli;
1118         struct ll_statahead_info *sai;
1119         struct dentry            *parent;
1120         struct l_wait_info        lwi = { 0 };
1121         int                       rc = 0;
1122         ENTRY;
1123
1124         LASSERT(dir != NULL);
1125         lli = ll_i2info(dir);
1126         LASSERT(lli->lli_opendir_pid == cfs_curproc_pid());
1127         sai = lli->lli_sai;
1128
1129         if (sai) {
1130                 if (unlikely(sa_is_stopped(sai) &&
1131                              cfs_list_empty(&sai->sai_entries_stated)))
1132                         RETURN(-EBADFD);
1133
1134                 if ((*dentryp)->d_name.name[0] == '.') {
1135                         if (likely(sai->sai_ls_all ||
1136                             sai->sai_miss_hidden >= sai->sai_skip_hidden)) {
1137                                 /*
1138                                  * Hidden dentry is the first one, or statahead
1139                                  * thread does not skip so many hidden dentries
1140                                  * before "sai_ls_all" enabled as below.
1141                                  */
1142                         } else {
1143                                 if (!sai->sai_ls_all)
1144                                         /*
1145                                          * It maybe because hidden dentry is not
1146                                          * the first one, "sai_ls_all" was not
1147                                          * set, then "ls -al" missed. Enable
1148                                          * "sai_ls_all" for such case.
1149                                          */
1150                                         sai->sai_ls_all = 1;
1151
1152                                 /*
1153                                  * Such "getattr" has been skipped before
1154                                  * "sai_ls_all" enabled as above.
1155                                  */
1156                                 sai->sai_miss_hidden++;
1157                                 RETURN(-ENOENT);
1158                         }
1159                 }
1160
1161                 if (!ll_sai_entry_stated(sai)) {
1162                         /*
1163                          * thread started already, avoid double-stat.
1164                          */
1165                         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1166                         rc = l_wait_event(sai->sai_waitq,
1167                                           ll_sai_entry_stated(sai) ||
1168                                           sa_is_stopped(sai),
1169                                           &lwi);
1170                         if (unlikely(rc == -EINTR))
1171                                 RETURN(rc);
1172                 }
1173
1174                 if (ll_sai_entry_stated(sai)) {
1175                         struct ll_sai_entry  *entry;
1176
1177                         entry = cfs_list_entry(sai->sai_entries_stated.next,
1178                                                struct ll_sai_entry, se_list);
1179                         /* This is for statahead lookup */
1180                         if (entry->se_inode != NULL) {
1181                                 struct lookup_intent it = {.it_op = IT_GETATTR};
1182                                 struct dentry *dchild = entry->se_dentry;
1183                                 struct inode *ichild = entry->se_inode;
1184                                 int found = 0;
1185                                 __u32 bits;
1186
1187                                 LASSERT(dchild != *dentryp);
1188
1189                                 if (!lookup)
1190                                         mutex_lock(&dir->i_mutex);
1191
1192                                 rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
1193                                                         ll_inode2fid(ichild),
1194                                                         &bits);
1195                                 if (rc == 1) {
1196                                         struct dentry *save = dchild;
1197
1198                                         ll_lookup_it_alias(&dchild, ichild,
1199                                                            bits);
1200                                         ll_lookup_finish_locks(&it, dchild);
1201                                         if (dchild != save)
1202                                                 dput(save);
1203                                         found = is_same_dentry(dchild,
1204                                                                *dentryp);
1205                                 } else {
1206                                         /* Someone has canceled related ldlm
1207                                          * lock before the real "revalidate"
1208                                          * using it.
1209                                          * Drop the inode reference count held
1210                                          * by interpreter. */
1211                                         iput(ichild);
1212                                 }
1213
1214                                 if (!lookup)
1215                                         mutex_unlock(&dir->i_mutex);
1216
1217                                 entry->se_dentry = NULL;
1218                                 entry->se_inode = NULL;
1219                                 if (found) {
1220                                         if (lookup) {
1221                                                 LASSERT(dchild != *dentryp);
1222                                                 /* VFS will drop the reference
1223                                                  * count for dchild and *dentryp
1224                                                  * by itself. */
1225                                                 *dentryp = dchild;
1226                                         } else {
1227                                                 LASSERT(dchild == *dentryp);
1228                                                 /* Drop the dentry reference
1229                                                  * count held by statahead. */
1230                                                 dput(dchild);
1231                                         }
1232                                         RETURN(1);
1233                                 } else {
1234                                         /* Drop the dentry reference count held
1235                                          * by statahead. */
1236                                         dput(dchild);
1237                                 }
1238                         }
1239                 }
1240
1241                 if (lookup) {
1242                         struct dentry *result;
1243
1244                         result = d_lookup((*dentryp)->d_parent,
1245                                           &(*dentryp)->d_name);
1246                         if (result) {
1247                                 LASSERT(result != *dentryp);
1248                                 /* BUG 16303: do not drop reference count for
1249                                  * "*dentryp", VFS will do that by itself. */
1250                                 *dentryp = result;
1251                                 RETURN(1);
1252                         }
1253                 }
1254                 /*
1255                  * do nothing for revalidate.
1256                  */
1257                 RETURN(0);
1258         }
1259
1260         /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */
1261         rc = is_first_dirent(dir, *dentryp);
1262         if (rc == LS_NONE_FIRST_DE)
1263                 /* It is not "ls -{a}l" operation, no need statahead for it. */
1264                 GOTO(out, rc = -EAGAIN);
1265
1266         sai = ll_sai_alloc();
1267         if (sai == NULL)
1268                 GOTO(out, rc = -ENOMEM);
1269
1270         sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
1271         sai->sai_inode = igrab(dir);
1272         if (unlikely(sai->sai_inode == NULL)) {
1273                 CWARN("Do not start stat ahead on dying inode "DFID"\n",
1274                       PFID(&lli->lli_fid));
1275                 OBD_FREE_PTR(sai);
1276                 GOTO(out, rc = -ESTALE);
1277         }
1278
1279         /* get parent reference count here, and put it in ll_statahead_thread */
1280         parent = dget((*dentryp)->d_parent);
1281         if (unlikely(sai->sai_inode != parent->d_inode)) {
1282                 struct ll_inode_info *nlli = ll_i2info(parent->d_inode);
1283
1284                 CWARN("Race condition, someone changed %.*s just now: "
1285                       "old parent "DFID", new parent "DFID"\n",
1286                       (*dentryp)->d_name.len, (*dentryp)->d_name.name,
1287                       PFID(&lli->lli_fid), PFID(&nlli->lli_fid));
1288                 dput(parent);
1289                 iput(sai->sai_inode);
1290                 OBD_FREE_PTR(sai);
1291                 RETURN(-EAGAIN);
1292         }
1293
1294         lli->lli_sai = sai;
1295         rc = cfs_kernel_thread(ll_statahead_thread, parent, 0);
1296         if (rc < 0) {
1297                 CERROR("can't start ll_sa thread, rc: %d\n", rc);
1298                 dput(parent);
1299                 lli->lli_opendir_key = NULL;
1300                 sai->sai_thread.t_flags = SVC_STOPPED;
1301                 ll_sai_put(sai);
1302                 LASSERT(lli->lli_sai == NULL);
1303                 RETURN(-EAGAIN);
1304         }
1305
1306         l_wait_event(sai->sai_thread.t_ctl_waitq,
1307                      sa_is_running(sai) || sa_is_stopped(sai),
1308                      &lwi);
1309
1310         /*
1311          * We don't stat-ahead for the first dirent since we are already in
1312          * lookup, and -EEXIST also indicates that this is the first dirent.
1313          */
1314         RETURN(-EEXIST);
1315
1316 out:
1317         cfs_spin_lock(&lli->lli_lock);
1318         lli->lli_opendir_key = NULL;
1319         lli->lli_opendir_pid = 0;
1320         cfs_spin_unlock(&lli->lli_lock);
1321         return rc;
1322 }
1323
1324 /**
1325  * update hit/miss count.
1326  */
1327 void ll_statahead_exit(struct inode *dir, struct dentry *dentry, int result)
1328 {
1329         struct ll_inode_info     *lli;
1330         struct ll_statahead_info *sai;
1331         struct ll_sb_info        *sbi;
1332         struct ll_dentry_data    *ldd = ll_d2d(dentry);
1333         int                       rc;
1334         ENTRY;
1335
1336         LASSERT(dir != NULL);
1337         lli = ll_i2info(dir);
1338         LASSERT(lli->lli_opendir_pid == cfs_curproc_pid());
1339         sai = lli->lli_sai;
1340         LASSERT(sai != NULL);
1341         sbi = ll_i2sbi(dir);
1342
1343         rc = ll_sai_entry_fini(sai);
1344         /* rc == -ENOENT means such dentry was removed just between statahead
1345          * readdir and pre-fetched, count it as hit.
1346          *
1347          * result == -ENOENT has two meanings:
1348          * 1. such dentry was removed just between statahead pre-fetched and
1349          *    main process stat such dentry.
1350          * 2. main process stat non-exist dentry.
1351          * We can not distinguish such two cases, just count them as miss. */
1352         if (result >= 1 || unlikely(rc == -ENOENT)) {
1353                 sai->sai_hit++;
1354                 sai->sai_consecutive_miss = 0;
1355                 sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
1356         } else {
1357                 sai->sai_miss++;
1358                 sai->sai_consecutive_miss++;
1359                 if (sa_low_hit(sai) && sa_is_running(sai)) {
1360                         atomic_inc(&sbi->ll_sa_wrong);
1361                         CDEBUG(D_READA, "Statahead for dir "DFID" hit ratio "
1362                                "too low: hit/miss %u/%u, sent/replied %u/%u, "
1363                                "stopping statahead thread: pid %d\n",
1364                                PFID(&lli->lli_fid), sai->sai_hit,
1365                                sai->sai_miss, sai->sai_sent,
1366                                sai->sai_replied, cfs_curproc_pid());
1367                         cfs_spin_lock(&lli->lli_lock);
1368                         if (!sa_is_stopped(sai))
1369                                 sai->sai_thread.t_flags = SVC_STOPPING;
1370                         cfs_spin_unlock(&lli->lli_lock);
1371                 }
1372         }
1373
1374         if (!sa_is_stopped(sai))
1375                 cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq);
1376         if (likely(ldd != NULL))
1377                 ldd->lld_sa_generation = sai->sai_generation;
1378
1379         EXIT;
1380 }