LU-14361 statahead: wait inuse entry finished during cleanup
author    Qian Yingjin <qian@ddn.com>
          Thu, 1 Dec 2022 02:43:50 +0000 (21:43 -0500)
committer Oleg Drokin <green@whamcloud.com>
          Tue, 30 Apr 2024 06:53:38 +0000 (06:53 +0000)
If an entry is still in use by a user process while statahead is
cleaning up and quitting, statahead must wait until the in-use entry
is finished and only then kill the locally cached entries in the
statahead context.

Add sanity/test_123{k,l} to verify it.

Signed-off-by: Qian Yingjin <qian@ddn.com>
Change-Id: I747badd85bd44cb20f7d37ca3126ca308a632371
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/49291
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Lai Siyao <lai.siyao@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
lustre/include/obd_support.h
lustre/llite/llite_internal.h
lustre/llite/statahead.c
lustre/ptlrpc/batch.c
lustre/tests/sanity.sh
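
Before the per-file diffs, a minimal user-space model of the protocol this
patch introduces, using C11 atomics in place of the kernel's atomic_t. The
struct and function names below are illustrative stand-ins, not the actual
Lustre code; in the real patch the counter is additionally serialized by
ctx->sax_cache_lock[] and lli->lli_sa_lock:

    #include <stdatomic.h>
    #include <stdbool.h>

    /* Stand-in for struct ll_statahead_info: only the in-use counter. */
    struct sai_model {
            atomic_int inuse_count;         /* models sai_inuse_count */
    };

    /* Lookup side (models sa_get()): pin the sai before using an entry. */
    static void sai_pin(struct sai_model *sai)
    {
            atomic_fetch_add(&sai->inuse_count, 1);
    }

    /* Consumer side (models the tail of sa_put()): unpin only after the
     * last access to the entry, so cleanup can never free it underneath. */
    static void sai_unpin(struct sai_model *sai)
    {
            atomic_fetch_sub(&sai->inuse_count, 1);
    }

    /* Cleanup side (models sa_unhash() with inuse_check=true): refuse to
     * tear down cached entries while any consumer holds a reference. */
    static bool sai_idle(struct sai_model *sai)
    {
            return atomic_load(&sai->inuse_count) == 0;
    }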

lustre/include/obd_support.h
index ce97791..787853d 100644
@@ -640,6 +640,7 @@ extern bool obd_enable_health_write;
 #define OBD_FAIL_LLITE_DELAY_TRUNCATE              0x1430
 #define OBD_FAIL_LLITE_READ_PAUSE                  0x1431
 #define OBD_FAIL_LLITE_FAULT_PAUSE                 0x1432
+#define OBD_FAIL_LLITE_STATAHEAD_PAUSE             0x1433
 
 #define OBD_FAIL_FID_INDIR     0x1501
 #define OBD_FAIL_FID_INLMA     0x1502
lustre/llite/llite_internal.h
index cd5fe30..8090c44 100644
@@ -1764,6 +1764,7 @@ struct ll_statahead_info {
        struct list_head        sai_item;
        struct dentry           *sai_dentry;
        atomic_t                sai_refcount;   /* On access, hold refcount */
+       atomic_t                sai_inuse_count;/* inuse entry count */
        unsigned int            sai_max;        /* max ahead of lookup */
        __u64                   sai_sent;       /* stat requests sent count */
        __u64                   sai_replied;    /* stat req which recv reply */
lustre/llite/statahead.c
index f41b4b6..94923a3 100644
 
 #define SA_OMITTED_ENTRY_MAX 8ULL
 
-typedef enum {
+enum sa_entry_state {
        /** negative values are for error cases */
        SA_ENTRY_INIT = 0,      /** init entry */
        SA_ENTRY_SUCC = 1,      /** stat succeed */
        SA_ENTRY_INVA = 2,      /** invalid entry */
-} se_state_t;
+};
 
 /*
  * sa_entry is not refcounted: statahead thread allocates it and do async stat,
@@ -68,7 +68,7 @@ struct sa_entry {
        /* low layer ldlm lock handle */
        __u64                            se_handle;
        /* entry status */
-       se_state_t                       se_state;
+       enum sa_entry_state              se_state;
        /* entry size, contains name */
        int                              se_size;
        /* pointer to the target inode */
@@ -115,14 +115,24 @@ sa_rehash(struct ll_statahead_context *ctx, struct sa_entry *entry)
 }
 
 /* unhash entry from sai_cache */
-static inline void
-sa_unhash(struct ll_statahead_context *ctx, struct sa_entry *entry)
+static inline int sa_unhash(struct ll_statahead_context *ctx,
+                           struct sa_entry *entry, bool inuse_check)
 {
+       struct ll_statahead_info *sai = entry->se_sai;
        int i = sa_hash(entry->se_qstr.hash);
+       int rc = 0;
+
+       if (inuse_check && atomic_read(&sai->sai_inuse_count) > 0)
+               return -EAGAIN;
 
        spin_lock(&ctx->sax_cache_lock[i]);
-       list_del_init(&entry->se_hash);
+       if (inuse_check && atomic_read(&sai->sai_inuse_count) > 0)
+               rc = -EAGAIN;
+       else
+               list_del_init(&entry->se_hash);
        spin_unlock(&ctx->sax_cache_lock[i]);
+
+       return rc;
 }
 
 static inline int agl_should_run(struct ll_statahead_info *sai,
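
The reworked sa_unhash() follows a check/lock/recheck shape: the unlocked
read of sai_inuse_count lets cleanup bail out cheaply, while the recheck
under the per-bucket cache lock closes the race against a concurrent
sa_get(), which increments the counter while holding that same lock. A
standalone sketch of the pattern (pthread mutex standing in for the
spinlock; names are illustrative, not Lustre's):

    #include <errno.h>
    #include <pthread.h>
    #include <stdatomic.h>

    struct bucket {
            pthread_mutex_t lock;   /* stands in for ctx->sax_cache_lock[i] */
    };

    /* Return 0 and unlink on success, -EAGAIN if a concurrent user holds
     * an in-use reference (check again later, as the cleanup loop does). */
    static int try_remove(struct bucket *b, atomic_int *inuse,
                          void (*unlink_fn)(void))
    {
            int rc = 0;

            /* Optimistic unlocked check: cheap early exit. */
            if (atomic_load(inuse) > 0)
                    return -EAGAIN;

            pthread_mutex_lock(&b->lock);
            /* Recheck under the lock: lookups increment the counter while
             * holding this same lock, so no new user can slip in here. */
            if (atomic_load(inuse) > 0)
                    rc = -EAGAIN;
            else
                    unlink_fn();
            pthread_mutex_unlock(&b->lock);

            return rc;
    }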
@@ -167,13 +177,13 @@ static inline int agl_list_empty(struct ll_statahead_info *sai)
 /**
  * (1) hit ratio less than 80%
  * or
- * (2) consecutive miss more than 8
+ * (2) consecutive miss more than 32
  * then means low hit.
  */
 static inline int sa_low_hit(struct ll_statahead_info *sai)
 {
-       return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) ||
-               (sai->sai_consecutive_miss > 8));
+       return ((sai->sai_hit > 32 && sai->sai_hit < 4 * sai->sai_miss) ||
+               (sai->sai_consecutive_miss > 32));
 }
 
 /*
@@ -247,11 +257,13 @@ static void sa_free(struct ll_statahead_context *ctx, struct sa_entry *entry)
 }
 
 /*
- * find sa_entry by name, used by directory scanner, lock is not needed because
- * only scanner can remove the entry from cache.
+ * Find sa_entry by name, used by directory scanner. If @sai_pid is not the PID
+ * of the scanner (which means it may do statahead wrongly), return -EINVAL
+ * immediately.
  */
-static struct sa_entry *
-sa_get(struct ll_statahead_context *ctx, const struct qstr *qstr)
+static struct sa_entry *sa_get(struct ll_statahead_context *ctx,
+                              const struct qstr *qstr,
+                              struct ll_statahead_info **info)
 {
        struct sa_entry *entry;
        int i = sa_hash(qstr->hash);
@@ -261,6 +273,19 @@ sa_get(struct ll_statahead_context *ctx, const struct qstr *qstr)
                if (entry->se_qstr.hash == qstr->hash &&
                    entry->se_qstr.len == qstr->len &&
                    memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0) {
+                       struct ll_statahead_info *sai = entry->se_sai;
+
+                       if (sai->sai_pid != current->pid) {
+                               CDEBUG(D_CACHE,
+                                      "%s: wrong pid=%d:%d for entry %.*s\n",
+                                      ll_i2sbi(ctx->sax_inode)->ll_fsname,
+                                      sai->sai_pid, current->pid,
+                                      entry->se_qstr.len, entry->se_qstr.name);
+                               entry = ERR_PTR(-EINVAL);
+                               *info = sai;
+                       }
+
+                       atomic_inc(&sai->sai_inuse_count);
                        spin_unlock(&ctx->sax_cache_lock[i]);
                        return entry;
                }
@@ -270,18 +295,20 @@ sa_get(struct ll_statahead_context *ctx, const struct qstr *qstr)
 }
 
 /* unhash and unlink sa_entry, and then free it */
-static inline void
-sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry, bool locked)
+static inline int sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry,
+                         bool locked, bool inuse_check)
 {
        struct inode *dir = sai->sai_dentry->d_inode;
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_context *ctx = lli->lli_sax;
+       int rc;
 
-       LASSERT(!sa_unhashed(entry));
        LASSERT(!list_empty(&entry->se_list));
        LASSERT(sa_ready(entry));
 
-       sa_unhash(ctx, entry);
+       rc = sa_unhash(ctx, entry, inuse_check);
+       if (rc)
+               return rc;
 
        if (!locked)
                spin_lock(&lli->lli_sa_lock);
@@ -290,9 +317,18 @@ sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry, bool locked)
 
        iput(entry->se_inode);
        atomic_dec(&sai->sai_cache_count);
+
        sa_free(ctx, entry);
        if (locked)
                spin_lock(&lli->lli_sa_lock);
+
+       return 0;
+}
+
+static inline int sa_kill_try(struct ll_statahead_info *sai,
+                             struct sa_entry *entry, bool locked)
+{
+       return sa_kill(sai, entry, locked, true);
 }
 
 /* called by scanner after use, sa_entry will be killed */
@@ -302,6 +338,7 @@ sa_put(struct inode *dir, struct ll_statahead_info *sai, struct sa_entry *entry)
        struct ll_inode_info *lli = ll_i2info(dir);
        struct sa_entry *tmp;
        bool wakeup = false;
+       bool inuse = false;
 
        if (entry && entry->se_state == SA_ENTRY_SUCC) {
                struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
@@ -328,29 +365,31 @@ sa_put(struct inode *dir, struct ll_statahead_info *sai, struct sa_entry *entry)
                wakeup = true;
        }
 
-       if (entry)
-               sa_kill(sai, entry, false);
+       if (entry) {
+               inuse = true;
+               sa_kill(sai, entry, false, false);
+               CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_STATAHEAD_PAUSE, cfs_fail_val);
+       }
 
-       if (sai) {
+       spin_lock(&lli->lli_sa_lock);
+       if (inuse) {
                /*
                 * kill old completed entries. Maybe kicking old entries can
                 * be ignored?
                 */
-               spin_lock(&lli->lli_sa_lock);
                while ((tmp = list_first_entry_or_null(&sai->sai_entries,
                                struct sa_entry, se_list))) {
                        if (!is_omitted_entry(sai, tmp->se_index))
                                break;
 
                        /* ll_sa_lock is dropped by sa_kill(), restart list */
-                       sa_kill(sai, tmp, true);
+                       sa_kill(sai, tmp, true, false);
                }
-               spin_unlock(&lli->lli_sa_lock);
        }
-
-       spin_lock(&lli->lli_sa_lock);
        if (wakeup && sai->sai_task)
                wake_up_process(sai->sai_task);
+       if (inuse)
+               atomic_dec(&sai->sai_inuse_count);
        spin_unlock(&lli->lli_sa_lock);
 }
 
@@ -586,7 +625,7 @@ static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
        INIT_LIST_HEAD(&sai->sai_agls);
 
        atomic_set(&sai->sai_cache_count, 0);
-
+       atomic_set(&sai->sai_inuse_count, 0);
        spin_lock(&sai_generation_lock);
        lli->lli_sa_generation = ++sai_generation;
        if (unlikely(sai_generation == 0))
@@ -621,7 +660,6 @@ static void ll_sai_put(struct ll_statahead_info *sai)
 
        if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
                struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
-               struct sa_entry *entry, *next;
 
                lli->lli_sai = NULL;
                list_del_init(&sai->sai_item);
@@ -631,10 +669,6 @@ static void ll_sai_put(struct ll_statahead_info *sai)
                LASSERT(!sai->sai_agl_task);
                LASSERT(sai->sai_sent == sai->sai_replied);
 
-               list_for_each_entry_safe(entry, next, &sai->sai_entries,
-                                        se_list)
-                       sa_kill(sai, entry, false);
-
                LASSERT(atomic_read(&sai->sai_cache_count) == 0);
                LASSERT(agl_list_empty(sai));
 
@@ -1490,6 +1524,8 @@ static int ll_statahead_thread(void *arg)
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_sb_info *sbi = ll_i2sbi(dir);
        struct lu_batch *bh = NULL;
+       struct sa_entry *entry;
+       int tries = 0;
        int rc = 0;
 
        ENTRY;
@@ -1570,6 +1606,28 @@ out_stop_agl:
        atomic_add(sai->sai_hit, &sbi->ll_sa_hit_total);
        atomic_add(sai->sai_miss, &sbi->ll_sa_miss_total);
 
+       /* Kill all locally cached entries. */
+       spin_lock(&lli->lli_sa_lock);
+       while ((entry = list_first_entry_or_null(&sai->sai_entries,
+                                                struct sa_entry, se_list))) {
+               /*
+                * If the entry is being used by a user process, wait until
+                * the in-use entry is finished, then restart killing the
+                * locally cached entries.
+                */
+               if (sa_kill_try(sai, entry, true)) {
+                       spin_unlock(&lli->lli_sa_lock);
+                       msleep(125);
+                       if (++tries % 1024 == 0) {
+                               CWARN("%s: statahead thread waited %lus for inuse entry "DFID" to be finished\n",
+                                     sbi->ll_fsname, tries * 125 / MSEC_PER_SEC,
+                                     PFID(&entry->se_fid));
+                       }
+                       spin_lock(&lli->lli_sa_lock);
+               }
+       }
+       spin_unlock(&lli->lli_sa_lock);
+
        ll_sai_put(sai);
        ll_sax_put(dir, lli->lli_sax);
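
The cleanup loop above retries every 125 ms and warns every 1024 attempts
(roughly every 128 seconds) so a stuck consumer shows up in the logs. A
user-space model of that poll-and-warn shape, with nanosleep() standing in
for msleep() and illustrative names throughout:

    #include <stdatomic.h>
    #include <stdio.h>
    #include <time.h>

    #define POLL_MS 125

    /* Poll until no in-use references remain, warning periodically so a
     * consumer that never drops its reference is visible to an admin. */
    static void wait_for_idle(atomic_int *inuse)
    {
            struct timespec ts = { .tv_sec = 0,
                                   .tv_nsec = POLL_MS * 1000000L };
            unsigned long tries = 0;

            while (atomic_load(inuse) > 0) {
                    nanosleep(&ts, NULL);
                    if (++tries % 1024 == 0)
                            fprintf(stderr, "waited %lus for in-use entries\n",
                                    tries * POLL_MS / 1000);
            }
    }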
 
@@ -1854,13 +1912,13 @@ static int ll_shared_statahead_check(struct inode *dir, struct dentry *dentry,
                lli->lli_sa_pattern |= LSA_PATTERN_FN_SHARED;
        }
 
-       LASSERT(lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED);
        sai = ll_find_sai_locked(ctx, current->pid);
        if (sai) {
                spin_unlock(&lli->lli_sa_lock);
                RETURN(-EEXIST);
        }
 
+       lli->lli_sa_pattern |= LSA_PATTERN_FN_SHARED;
        spin_unlock(&lli->lli_sa_lock);
 
        RETURN(start_statahead_thread(dir, dentry, true));
@@ -1886,6 +1944,7 @@ static int revalidate_statahead_dentry(struct inode *dir,
        struct sa_entry *entry = NULL;
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai = lli->lli_sai;
+       struct ll_statahead_info *info = NULL;
        int rc = 0;
 
        ENTRY;
@@ -1920,8 +1979,25 @@ static int revalidate_statahead_dentry(struct inode *dir,
        if (unplug)
                GOTO(out, rc = 1);
 
-       entry = sa_get(ctx, &(*dentryp)->d_name);
-       if (!entry) {
+       entry = sa_get(ctx, &(*dentryp)->d_name, &info);
+       if (entry == ERR_PTR(-EINVAL)) {
+               sai = info;
+               spin_lock(&lli->lli_sa_lock);
+               if (sai->sai_task) {
+                       struct task_struct *task = sai->sai_task;
+
+                       /*
+                        * matches smp_load_acquire() in
+                        * ll_statahead_thread().
+                        * Notify to stop statahead thread immediately.
+                        */
+                       smp_store_release(&sai->sai_task, NULL);
+                       wake_up_process(task);
+               }
+               atomic_dec(&sai->sai_inuse_count);
+               spin_unlock(&lli->lli_sa_lock);
+               RETURN(-EINVAL);
+       } else if (entry == NULL) {
                if (lli->lli_sa_pattern & LSA_PATTERN_FNAME)
                        rc = ll_shared_statahead_check(dir, *dentryp, ctx);
                GOTO(out, rc = rc == 0 ? -EAGAIN : rc);
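
The smp_store_release()/smp_load_acquire() pairing in this hunk is a stop
handshake: the revalidate path publishes NULL to sai_task with release
semantics before waking the thread, and ll_statahead_thread() re-reads the
pointer with acquire semantics before sleeping again, so the stop request
cannot be missed. A user-space analogue with C11 atomics and a condition
variable (hypothetical names; pthread_cond_signal() standing in for
wake_up_process()):

    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdbool.h>

    static atomic_bool running = true;
    static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

    /* Stopper: publish "stop" with release semantics, then wake the
     * worker - mirrors smp_store_release() + wake_up_process() above. */
    static void stop_worker(void)
    {
            atomic_store_explicit(&running, false, memory_order_release);
            pthread_mutex_lock(&mtx);
            pthread_cond_signal(&cond);
            pthread_mutex_unlock(&mtx);
    }

    /* Worker: recheck the flag with acquire semantics before each wait,
     * so a stop published just before the wait is never lost. */
    static void *worker(void *arg)
    {
            (void)arg;
            pthread_mutex_lock(&mtx);
            while (atomic_load_explicit(&running, memory_order_acquire))
                    pthread_cond_wait(&cond, &mtx);
            pthread_mutex_unlock(&mtx);
            return NULL;
    }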
@@ -1932,6 +2008,8 @@ static int revalidate_statahead_dentry(struct inode *dir,
        else if (lli->lli_sa_pattern & LSA_PATTERN_FNAME ||
                 lli->lli_sa_pattern == LSA_PATTERN_ADVISE)
                sai = entry->se_sai;
+       else
+               sai = entry->se_sai;
 
        LASSERTF(sai != NULL, "pattern %#X entry %p se_sai %p %pd lli %p\n",
                 lli->lli_sa_pattern, entry, entry->se_sai, *dentryp, lli);
@@ -2503,6 +2581,7 @@ int ll_ioctl_ahead(struct file *file, struct llapi_lu_ladvise2 *ladvise)
        sai->sai_ls_all = 0;
        sai->sai_max = sbi->ll_sa_max;
        strncpy(sai->sai_fname, ladvise->lla_fname, sizeof(sai->sai_fname));
+       sai->sai_pid = current->pid;
 
        ctx = ll_sax_get(dir);
        if (ctx == NULL) {
@@ -2622,6 +2701,7 @@ void ll_statahead_enter(struct inode *dir, struct dentry *dchild)
         * Quick check whether the last character is digit.
         */
        if (!isdigit(dname->name[dname->len - 1])) {
+               lli->lli_sa_pattern &= ~LSA_PATTERN_FN_PREDICT;
                lli->lli_sa_match_count = 0;
                return;
        }
lustre/ptlrpc/batch.c
index 1684b31..ecaa860 100644
@@ -483,10 +483,7 @@ static int batch_send_update_req(const struct lu_env *env,
        if (flags & BATCH_FL_SYNC) {
                rc = ptlrpc_queue_wait(req);
        } else {
-               if ((flags & (BATCH_FL_RDONLY | BATCH_FL_RQSET)) ==
-                   BATCH_FL_RDONLY) {
-                       ptlrpcd_add_req(req);
-               } else if (flags & BATCH_FL_RQSET) {
+               if (bh && (flags & BATCH_FL_RQSET)) {
                        ptlrpc_set_add_req(bh->lbt_rqset, req);
                        ptlrpc_check_set(env, bh->lbt_rqset);
                } else {
lustre/tests/sanity.sh
index d0ba156..c2ec956 100755
@@ -14906,6 +14906,89 @@ test_123j() {
 }
 run_test 123j "-ENOENT error from batched statahead be handled correctly"
 
+test_123k() {
+       MDTEST=${MDTEST:=$(which mdtest 2> /dev/null || true)}
+       [[ -n "$MDTEST" ]] || skip_env "mdtest not found"
+
+       $LCTL get_param -n mdc.*.connect_flags | grep -q batch_rpc ||
+               skip "Server does not support batch RPC"
+
+       local enabled
+
+       enabled=$($LCTL get_param -n llite.*.enable_statahead_fname | head -n 1)
+       stack_trap "$LCTL set_param llite.*.enable_statahead_fname=$enabled"
+       $LCTL set_param llite.*.enable_statahead_fname=1
+
+       local np=2
+
+       mpi_run -np $np $MDTEST -C -F -n 1000 -d $DIR/$tdir
+       cancel_lru_locks mdc
+       $LCTL set_param llite.*.statahead_stats=clear
+       $LCTL set_param mdc.*.batch_stats=0
+       mpi_run -np $np $MDTEST -T -F -n 1000 -d $DIR/$tdir
+       #umount_client $MOUNT || error "failed to umount client"
+       echo "Sleep to wait statahead thread (ll_sa_xxx) to exit ..."
+       wait_update_facet client "pgrep ll_sa" "" 35 ||
+               error "ll_sa thread is still running"
+
+       $LCTL get_param -n llite.*.statahead_stats
+       $LCTL get_param -n mdc.*.batch_stats
+       ps -el | grep ll_sa
+
+       local count=$($LCTL get_param -n llite.*.statahead_stats |
+                       awk '/fname.total:/ {print $2}')
+
+       [ $count == $np ] || error "file name pattern statahead not triggered"
+       count=$($LCTL get_param -n llite.*.statahead_stats |
+               awk '/hit.total:/ {print $2}')
+       # hit ratio should be above 75%
+       [ $count -gt $((np * 1000 * 75 / 100)) ] ||
+               error "hit total is too low: $count"
+}
+run_test 123k "Verify statahead works with mdtest shared stat() mode"
+
+test_123l() {
+       local dir=$DIR/$tdir
+       local cmd="touch $dir/$tfile.{000000..000100}"
+
+       $LCTL get_param -n mdc.*.connect_flags | grep -q batch_rpc ||
+               skip "Server does not support batch RPC"
+
+       stack_trap "rm -rf $dir"
+       mkdir -p $dir || error "failed to mkdir $dir"
+       eval $cmd
+
+       cancel_lru_locks mdc
+       $LCTL set_param llite.*.statahead_stats=clear
+       $LCTL set_param mdc.*.batch_stats=0
+
+       local max
+       local batch_max
+       local enabled
+
+       enabled=$($LCTL get_param -n llite.*.enable_statahead_fname | head -n 1)
+       max=$($LCTL get_param -n llite.*.statahead_max | head -n 1)
+       batch_max=$($LCTL get_param -n llite.*.statahead_batch_max | head -n 1)
+       stack_trap "$LCTL set_param llite.*.statahead_max=$max"
+       stack_trap "$LCTL set_param llite.*.statahead_batch_max=$batch_max"
+       stack_trap "$LCTL set_param llite.*.enable_statahead_fname=$enabled"
+       $LCTL set_param llite.*.enable_statahead_fname=1
+       $LCTL set_param llite.*.statahead_max=1024
+       $LCTL set_param llite.*.statahead_batch_max=32
+       $LCTL get_param -n llite.*.statahead_stats
+       #define OBD_FAIL_LLITE_STATAHEAD_PAUSE  0x1433
+       $LCTL set_param fail_loc=0x80001433 fail_val=35
+       ls $dir/* > /dev/null
+       $LCTL get_param -n llite.*.statahead_stats
+       $LCTL get_param -n mdc.*.batch_stats
+
+       echo "Sleep to wait the statahead thread (ll_sa_xxx) to exit ..."
+       wait_update_facet client "pgrep ll_sa" "" 35 ||
+               error "ll_sa thread is still running"
+       $LCTL get_param -n llite.*.statahead_stats
+}
+run_test 123l "Avoid panic when revalidating a locally cached entry"
+
 test_124a() {
        [ $PARALLEL == "yes" ] && skip "skip parallel run"
        $LCTL get_param -n mdc.*.connect_flags | grep -q lru_resize ||