LU-14361 statahead: wait inuse entry finished during cleanup
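The core of the change is a per-sai in-use counter handshake: the scanner raises the counter in sa_get() and drops it in sa_put(), while the statahead thread's cleanup path refuses to tear down an entry as long as the counter is raised, backing off and retrying instead. Below is a minimal sketch of that handshake, using hypothetical toy_* names and a bare atomic counter instead of the real struct ll_statahead_info / sa_entry plumbing shown in the diff; it is an illustration of the pattern, not the patch itself.

#include <linux/atomic.h>
#include <linux/delay.h>

/* Simplified stand-in for the statahead state this patch extends; the real
 * code keeps the counter in struct ll_statahead_info (sai_inuse_count). */
struct toy_sai {
	atomic_t	sai_inuse_count;	/* raised while the scanner holds an entry */
};

/* Scanner side: roughly what sa_get()/sa_put() do around entry use. */
static void toy_scanner_use_entry(struct toy_sai *sai)
{
	atomic_inc(&sai->sai_inuse_count);	/* entry handed out to the scanner */
	/* ... consume the statahead result ... */
	atomic_dec(&sai->sai_inuse_count);	/* release; cleanup may proceed */
}

/* Cleanup side: the shape of the new wait loop at the end of
 * ll_statahead_thread(): back off while any entry is still in use. */
static void toy_cleanup_wait_for_inuse(struct toy_sai *sai)
{
	while (atomic_read(&sai->sai_inuse_count) > 0)
		msleep(125);	/* retry interval used by the patch */
	/* now safe to unhash and free cached sa_entry objects */
}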
[fs/lustre-release.git] lustre/llite/statahead.c
index f41b4b6..94923a3 100644
 
 #define SA_OMITTED_ENTRY_MAX 8ULL
 
-typedef enum {
+enum sa_entry_state {
        /** negative values are for error cases */
        SA_ENTRY_INIT = 0,      /** init entry */
        SA_ENTRY_SUCC = 1,      /** stat succeed */
        SA_ENTRY_INVA = 2,      /** invalid entry */
-} se_state_t;
+};
 
 /*
  * sa_entry is not refcounted: statahead thread allocates it and do async stat,
@@ -68,7 +68,7 @@ struct sa_entry {
        /* low layer ldlm lock handle */
        __u64                            se_handle;
        /* entry status */
-       se_state_t                       se_state;
+       enum sa_entry_state              se_state;
        /* entry size, contains name */
        int                              se_size;
        /* pointer to the target inode */
@@ -115,14 +115,24 @@ sa_rehash(struct ll_statahead_context *ctx, struct sa_entry *entry)
 }
 
 /* unhash entry from sai_cache */
-static inline void
-sa_unhash(struct ll_statahead_context *ctx, struct sa_entry *entry)
+static inline int sa_unhash(struct ll_statahead_context *ctx,
+                           struct sa_entry *entry, bool inuse_check)
 {
+       struct ll_statahead_info *sai = entry->se_sai;
        int i = sa_hash(entry->se_qstr.hash);
+       int rc = 0;
+
+       if (inuse_check && atomic_read(&sai->sai_inuse_count) > 0)
+               return -EAGAIN;
 
        spin_lock(&ctx->sax_cache_lock[i]);
-       list_del_init(&entry->se_hash);
+       if (inuse_check && atomic_read(&sai->sai_inuse_count) > 0)
+               rc = -EAGAIN;
+       else
+               list_del_init(&entry->se_hash);
        spin_unlock(&ctx->sax_cache_lock[i]);
+
+       return rc;
 }
 
 static inline int agl_should_run(struct ll_statahead_info *sai,
@@ -167,13 +177,13 @@ static inline int agl_list_empty(struct ll_statahead_info *sai)
 /**
  * (1) hit ratio less than 80%
  * or
- * (2) consecutive miss more than 8
+ * (2) consecutive miss more than 32
  * then means low hit.
  */
 static inline int sa_low_hit(struct ll_statahead_info *sai)
 {
-       return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) ||
-               (sai->sai_consecutive_miss > 8));
+       return ((sai->sai_hit > 32 && sai->sai_hit < 4 * sai->sai_miss) ||
+               (sai->sai_consecutive_miss > 32));
 }
 
 /*
@@ -247,11 +257,13 @@ static void sa_free(struct ll_statahead_context *ctx, struct sa_entry *entry)
 }
 
 /*
- * find sa_entry by name, used by directory scanner, lock is not needed because
- * only scanner can remove the entry from cache.
+ * Find sa_entry by name, used by directory scanner. If @sai_pid is not the PID
+ * of the scanner (which means it may do statahead wrongly), return -EINVAL
+ * immediately.
  */
-static struct sa_entry *
-sa_get(struct ll_statahead_context *ctx, const struct qstr *qstr)
+static struct sa_entry *sa_get(struct ll_statahead_context *ctx,
+                              const struct qstr *qstr,
+                              struct ll_statahead_info **info)
 {
        struct sa_entry *entry;
        int i = sa_hash(qstr->hash);
@@ -261,6 +273,19 @@ sa_get(struct ll_statahead_context *ctx, const struct qstr *qstr)
                if (entry->se_qstr.hash == qstr->hash &&
                    entry->se_qstr.len == qstr->len &&
                    memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0) {
+                       struct ll_statahead_info *sai = entry->se_sai;
+
+                       if (sai->sai_pid != current->pid) {
+                               CDEBUG(D_CACHE,
+                                      "%s: wrong pid=%d:%d for entry %.*s\n",
+                                      ll_i2sbi(ctx->sax_inode)->ll_fsname,
+                                      sai->sai_pid, current->pid,
+                                      entry->se_qstr.len, entry->se_qstr.name);
+                               entry = ERR_PTR(-EINVAL);
+                               *info = sai;
+                       }
+
+                       atomic_inc(&sai->sai_inuse_count);
                        spin_unlock(&ctx->sax_cache_lock[i]);
                        return entry;
                }
@@ -270,18 +295,20 @@ sa_get(struct ll_statahead_context *ctx, const struct qstr *qstr)
 }
 
 /* unhash and unlink sa_entry, and then free it */
-static inline void
-sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry, bool locked)
+static inline int sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry,
+                         bool locked, bool inuse_check)
 {
        struct inode *dir = sai->sai_dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_context *ctx = lli->lli_sax;
+       int rc;
 
-       LASSERT(!sa_unhashed(entry));
        LASSERT(!list_empty(&entry->se_list));
        LASSERT(sa_ready(entry));
 
-       sa_unhash(ctx, entry);
+       rc = sa_unhash(ctx, entry, inuse_check);
+       if (rc)
+               return rc;
 
        if (!locked)
                spin_lock(&lli->lli_sa_lock);
@@ -290,9 +317,18 @@ sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry, bool locked)
 
        iput(entry->se_inode);
        atomic_dec(&sai->sai_cache_count);
+
        sa_free(ctx, entry);
        if (locked)
                spin_lock(&lli->lli_sa_lock);
+
+       return 0;
+}
+
+static inline int sa_kill_try(struct ll_statahead_info *sai,
+                             struct sa_entry *entry, bool locked)
+{
+       return sa_kill(sai, entry, locked, true);
 }
 
 /* called by scanner after use, sa_entry will be killed */
@@ -302,6 +338,7 @@ sa_put(struct inode *dir, struct ll_statahead_info *sai, struct sa_entry *entry)
        struct ll_inode_info *lli = ll_i2info(dir);
        struct sa_entry *tmp;
        bool wakeup = false;
+       bool inuse = false;
 
        if (entry && entry->se_state == SA_ENTRY_SUCC) {
                struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
@@ -328,29 +365,31 @@ sa_put(struct inode *dir, struct ll_statahead_info *sai, struct sa_entry *entry)
                wakeup = true;
        }
 
-       if (entry)
-               sa_kill(sai, entry, false);
+       if (entry) {
+               inuse = true;
+               sa_kill(sai, entry, false, false);
+               CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_STATAHEAD_PAUSE, cfs_fail_val);
+       }
 
-       if (sai) {
+       spin_lock(&lli->lli_sa_lock);
+       if (inuse) {
                /*
                 * kill old completed entries. Maybe kicking old entries can
                 * be ignored?
                 */
-               spin_lock(&lli->lli_sa_lock);
                while ((tmp = list_first_entry_or_null(&sai->sai_entries,
                                struct sa_entry, se_list))) {
                        if (!is_omitted_entry(sai, tmp->se_index))
                                break;
 
                        /* ll_sa_lock is dropped by sa_kill(), restart list */
-                       sa_kill(sai, tmp, true);
+                       sa_kill(sai, tmp, true, false);
                }
-               spin_unlock(&lli->lli_sa_lock);
        }
-
-       spin_lock(&lli->lli_sa_lock);
        if (wakeup && sai->sai_task)
                wake_up_process(sai->sai_task);
+       if (inuse)
+               atomic_dec(&sai->sai_inuse_count);
        spin_unlock(&lli->lli_sa_lock);
 }
 
@@ -586,7 +625,7 @@ static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
        INIT_LIST_HEAD(&sai->sai_agls);
 
        atomic_set(&sai->sai_cache_count, 0);
-
+       atomic_set(&sai->sai_inuse_count, 0);
        spin_lock(&sai_generation_lock);
        lli->lli_sa_generation = ++sai_generation;
        if (unlikely(sai_generation == 0))
@@ -621,7 +660,6 @@ static void ll_sai_put(struct ll_statahead_info *sai)
 
        if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
                struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
-               struct sa_entry *entry, *next;
 
                lli->lli_sai = NULL;
                list_del_init(&sai->sai_item);
@@ -631,10 +669,6 @@ static void ll_sai_put(struct ll_statahead_info *sai)
                LASSERT(!sai->sai_agl_task);
                LASSERT(sai->sai_sent == sai->sai_replied);
 
-               list_for_each_entry_safe(entry, next, &sai->sai_entries,
-                                        se_list)
-                       sa_kill(sai, entry, false);
-
                LASSERT(atomic_read(&sai->sai_cache_count) == 0);
                LASSERT(agl_list_empty(sai));
 
@@ -1490,6 +1524,8 @@ static int ll_statahead_thread(void *arg)
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_sb_info *sbi = ll_i2sbi(dir);
        struct lu_batch *bh = NULL;
+       struct sa_entry *entry;
+       int tries = 0;
        int rc = 0;
 
        ENTRY;
@@ -1570,6 +1606,28 @@ out_stop_agl:
        atomic_add(sai->sai_hit, &sbi->ll_sa_hit_total);
        atomic_add(sai->sai_miss, &sbi->ll_sa_miss_total);
 
+       /* Kill all locally cached entries. */
+       spin_lock(&lli->lli_sa_lock);
+       while ((entry = list_first_entry_or_null(&sai->sai_entries,
+                                                struct sa_entry, se_list))) {
+               /*
+                * If the entry is being used by the user process, wait until
+                * the in-use entry is finished and then restart killing the
+                * locally cached entries.
+                */
+               if (sa_kill_try(sai, entry, true)) {
+                       spin_unlock(&lli->lli_sa_lock);
+                       msleep(125);
+                       if (++tries % 1024 == 0) {
+                               CWARN("%s: statahead thread waited %lus for inuse entry "DFID" to finish\n",
+                                     sbi->ll_fsname, tries * 125 / MSEC_PER_SEC,
+                                     PFID(&entry->se_fid));
+                       }
+                       spin_lock(&lli->lli_sa_lock);
+               }
+       }
+       spin_unlock(&lli->lli_sa_lock);
+
        ll_sai_put(sai);
        ll_sax_put(dir, lli->lli_sax);
 
@@ -1854,13 +1912,13 @@ static int ll_shared_statahead_check(struct inode *dir, struct dentry *dentry,
                lli->lli_sa_pattern |= LSA_PATTERN_FN_SHARED;
        }
 
-       LASSERT(lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED);
        sai = ll_find_sai_locked(ctx, current->pid);
        if (sai) {
                spin_unlock(&lli->lli_sa_lock);
                RETURN(-EEXIST);
        }
 
+       lli->lli_sa_pattern |= LSA_PATTERN_FN_SHARED;
        spin_unlock(&lli->lli_sa_lock);
 
        RETURN(start_statahead_thread(dir, dentry, true));
@@ -1886,6 +1944,7 @@ static int revalidate_statahead_dentry(struct inode *dir,
        struct sa_entry *entry = NULL;
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai = lli->lli_sai;
+       struct ll_statahead_info *info = NULL;
        int rc = 0;
 
        ENTRY;
@@ -1920,8 +1979,25 @@ static int revalidate_statahead_dentry(struct inode *dir,
        if (unplug)
                GOTO(out, rc = 1);
 
-       entry = sa_get(ctx, &(*dentryp)->d_name);
-       if (!entry) {
+       entry = sa_get(ctx, &(*dentryp)->d_name, &info);
+       if (entry == ERR_PTR(-EINVAL)) {
+               sai = info;
+               spin_lock(&lli->lli_sa_lock);
+               if (sai->sai_task) {
+                       struct task_struct *task = sai->sai_task;
+
+                       /*
+                        * matches smp_load_acquire() in
+                        * ll_statahead_thread().
+                        * Notify the statahead thread to stop immediately.
+                        */
+                       smp_store_release(&sai->sai_task, NULL);
+                       wake_up_process(task);
+               }
+               atomic_dec(&sai->sai_inuse_count);
+               spin_unlock(&lli->lli_sa_lock);
+               RETURN(-EINVAL);
+       } else if (entry == NULL) {
                if (lli->lli_sa_pattern & LSA_PATTERN_FNAME)
                        rc = ll_shared_statahead_check(dir, *dentryp, ctx);
                GOTO(out, rc = rc == 0 ? -EAGAIN : rc);
@@ -1932,6 +2008,8 @@ static int revalidate_statahead_dentry(struct inode *dir,
        else if (lli->lli_sa_pattern & LSA_PATTERN_FNAME ||
                 lli->lli_sa_pattern == LSA_PATTERN_ADVISE)
                sai = entry->se_sai;
+       else
+               sai = entry->se_sai;
 
        LASSERTF(sai != NULL, "pattern %#X entry %p se_sai %p %pd lli %p\n",
                 lli->lli_sa_pattern, entry, entry->se_sai, *dentryp, lli);
@@ -2503,6 +2581,7 @@ int ll_ioctl_ahead(struct file *file, struct llapi_lu_ladvise2 *ladvise)
        sai->sai_ls_all = 0;
        sai->sai_max = sbi->ll_sa_max;
        strncpy(sai->sai_fname, ladvise->lla_fname, sizeof(sai->sai_fname));
+       sai->sai_pid = current->pid;
 
        ctx = ll_sax_get(dir);
        if (ctx == NULL) {
@@ -2622,6 +2701,7 @@ void ll_statahead_enter(struct inode *dir, struct dentry *dchild)
         * Quick check whether the last character is digit.
         */
        if (!isdigit(dname->name[dname->len - 1])) {
+               lli->lli_sa_pattern &= ~LSA_PATTERN_FN_PREDICT;
                lli->lli_sa_match_count = 0;
                return;
        }