Whamcloud - gitweb
LU-14361 statahead: add tunable for fname pattern detection 92/51592/39
authorQian Yingjin <qian@ddn.com>
Thu, 6 Jul 2023 13:21:40 +0000 (09:21 -0400)
committerOleg Drokin <green@whamcloud.com>
Mon, 2 Dec 2024 05:41:19 +0000 (05:41 +0000)
This patch adds two tunable parameters for the detection of the
fname pattern statahead:
- llite.*.statahead_fname_predict_hit: when the names of the files
  being stat()'ed under a directory roughly follow a certain naming
  rule more than this many times, the directory is considered to
  meet the first requirement for statahead. For example, if the file
  naming rule is mdtest.$rank.$i, the suffix of the stat() dentry
  name is a number, and stat() must be called on dentries whose
  names end with a number more than this many times.
- llite.*.statahead_fname_match_hit: after the first requirement is
  met, if the names of the files being stat()'ed under the directory
  continuously and strictly satisfy the naming rule more than this
  many times, a statahead thread is started to prefetch attributes
  under the directory.

This patch also fixes the following panic:
IP: _atomic_dec_and_lock+0xc/0x70
->ll_sax_put [lustre]
->ll_statahead_thread [lustre]
->kthread

The reason is that @lli_sax is set to NULL by an incorrect
statahead context (sax) put.

This patch also fixes the possible deadlock between hardlink and
batch stat-ahead operations.
Fix the test failure on lustre-rsync-test/test_6.

During umount, it must wait for all in-use inodes held by statahead
to be released.
Otherwise, it may cause the following panic:
BUG: Dentry 0000000033ca4f3e{i=280001b840002c7,n=l4}  still in use
(1) [unmount of lustre lustre]
RIP: 0010:umount_check.cold.52+0x2f/0x3b
d_walk+0xe7/0x290
do_one_tree+0x20/0x40
shrink_dcache_for_umount+0x28/0x90
generic_shutdown_super+0x1a/0x110
kill_anon_super+0x14/0x30
deactivate_locked_super+0x34/0x70
cleanup_mnt+0x3b/0x70

RIP: 0010:ll_prep_md_op_data+0x73/0x870 [lustre]
sa_prep_data+0xde/0x350 [lustre]
sa_statahead+0x3b9/0xd20 [lustre]
ll_statahead_thread+0x1507/0x21f0 [lustre]
kthread+0x134/0x150

Test-Parameters: clientdistro=el8.10 testlist=sanity
Test-Parameters: clientdistro=el8.10 testlist=sanity
Test-Parameters: clientdistro=el8.10 testlist=sanity
Test-Parameters: clientdistro=el8.10 testlist=sanity
Test-Parameters: clientdistro=el8.10 testlist=sanity
Signed-off-by: Qian Yingjin <qian@ddn.com>
Change-Id: I42d9478e796918d9f2498ab64cf7c20b61334144
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/51592
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Lai Siyao <lai.siyao@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/lproc_llite.c
lustre/llite/statahead.c
lustre/mdt/mdt_handler.c

index 15a981f..ada9b45 100644 (file)
@@ -148,6 +148,8 @@ enum ll_sa_pattern {
        LSA_PATTERN_MASK                = (LSA_PATTERN_LIST |
                                           LSA_PATTERN_FNAME |
                                           LSA_PATTERN_ADVISE),
+       LSA_PATTERN_SFNAME              = (LSA_PATTERN_FNAME |
+                                          LSA_PATTERN_FN_SHARED),
        LSA_PATTERN_MAX,
 };
 
@@ -964,6 +966,7 @@ struct ll_sb_info {
        atomic_t                  ll_sa_total;   /* sa thread started count */
        atomic_t                  ll_sa_wrong;   /* sa stopped low hit ratio */
        atomic_t                  ll_sa_running; /* running sa thread count */
+       atomic_t                  ll_sa_refcnt;  /* inuse reference count */
        atomic_t                  ll_agl_total;  /* AGL thread started count */
        atomic_t                  ll_sa_hit_total;  /* total hit count */
        atomic_t                  ll_sa_miss_total; /* total miss count */
@@ -977,6 +980,8 @@ struct ll_sb_info {
         * the user is no longer using this directory.
         */
        unsigned long             ll_sa_timeout;
+       unsigned int              ll_sa_fname_predict_hit;
+       unsigned int              ll_sa_fname_match_hit;
        /* save s_dev before assign for clustred nfs */
        dev_t                     ll_sdev_orig;
        /* root squash */
@@ -1798,8 +1803,8 @@ void ll_ra_stats_inc(struct inode *inode, enum ra_stat which);
 #define LL_SA_CACHE_SIZE        (1 << LL_SA_CACHE_BIT)
 #define LL_SA_CACHE_MASK        (LL_SA_CACHE_SIZE - 1)
 
-#define LSA_FN_PREDICT_HIT     2
-#define LSA_FN_MATCH_HIT       4
+#define LSA_FN_PREDICT_HIT_DEF 2
+#define LSA_FN_MATCH_HIT_DEF   4
 
 /* statahead controller, per process struct, for dir only */
 struct ll_statahead_info {
index 00ad633..23b5e0d 100644 (file)
@@ -189,14 +189,18 @@ static struct ll_sb_info *ll_init_sbi(struct lustre_sb_info *lsi)
        set_bit(LL_SBI_LAZYSTATFS, sbi->ll_flags);
 
        /* metadata statahead is enabled by default */
+       sbi->ll_enable_statahead_fname = 1;
        sbi->ll_sa_running_max = LL_SA_RUNNING_DEF;
        sbi->ll_sa_batch_max = LL_SA_BATCH_DEF;
        sbi->ll_sa_max = LL_SA_REQ_MAX_DEF;
        sbi->ll_sa_min = LL_SA_REQ_MIN_DEF;
        sbi->ll_sa_timeout = LL_SA_TIMEOUT_DEF;
+       sbi->ll_sa_fname_predict_hit = LSA_FN_PREDICT_HIT_DEF;
+       sbi->ll_sa_fname_match_hit = LSA_FN_MATCH_HIT_DEF;
        atomic_set(&sbi->ll_sa_total, 0);
        atomic_set(&sbi->ll_sa_wrong, 0);
        atomic_set(&sbi->ll_sa_running, 0);
+       atomic_set(&sbi->ll_sa_refcnt, 0);
        atomic_set(&sbi->ll_agl_total, 0);
        atomic_set(&sbi->ll_sa_hit_total, 0);
        atomic_set(&sbi->ll_sa_miss_total, 0);
@@ -987,7 +991,8 @@ void ll_kill_super(struct super_block *sb)
                sb->s_dev = sbi->ll_sdev_orig;
 
                /* wait running statahead threads to quit */
-               while (atomic_read(&sbi->ll_sa_running) > 0)
+               while (atomic_read(&sbi->ll_sa_running) > 0 ||
+                      atomic_read(&sbi->ll_sa_refcnt) > 0)
                        schedule_timeout_uninterruptible(
                                cfs_time_seconds(1) >> 3);
        }
@@ -3941,6 +3946,8 @@ struct md_op_data *ll_prep_md_op_data(struct md_op_data *op_data,
                        pfid = &fid;
                rc = ll_setup_filename(dir, &dname, lookup, &fname, pfid);
                if (rc) {
+                       CERROR("%s: failed to setup filename: rc = %d\n",
+                              ll_i2sbi(i1)->ll_fsname, rc);
                        ll_finish_md_op_data(op_data);
                        return ERR_PTR(rc);
                }
index 790a389..7afeee9 100644 (file)
@@ -1131,6 +1131,63 @@ static ssize_t statahead_timeout_store(struct kobject *kobj,
 }
 LUSTRE_RW_ATTR(statahead_timeout);
 
+/*
+ * sysfs "show" handler for statahead_fname_predict_hit: prints the
+ * current ll_sa_fname_predict_hit value of the owning ll_sb_info as an
+ * unsigned decimal followed by a newline.
+ */
+static ssize_t
+statahead_fname_predict_hit_show(struct kobject *kobj, struct attribute *attr,
+                                char *buf)
+{
+       struct ll_sb_info *sbi;
+       unsigned int hits;
+
+       sbi = container_of(kobj, struct ll_sb_info, ll_kset.kobj);
+       hits = sbi->ll_sa_fname_predict_hit;
+
+       return snprintf(buf, PAGE_SIZE, "%u\n", hits);
+}
+
+/*
+ * sysfs "store" handler for statahead_fname_predict_hit.
+ *
+ * Parses @buffer as an unsigned integer (base 0 is passed to the parser)
+ * and stores it in ll_sa_fname_predict_hit of the owning ll_sb_info.
+ * The value is parsed directly as an unsigned int so that input which
+ * does not fit the field is rejected by kstrtouint() instead of being
+ * silently truncated on assignment from a wider type.
+ *
+ * Returns @count on success, or the negative error code returned by
+ * kstrtouint() on malformed or out-of-range input.
+ */
+static ssize_t
+statahead_fname_predict_hit_store(struct kobject *kobj, struct attribute *attr,
+                                 const char *buffer, size_t count)
+{
+       struct ll_sb_info *sbi = container_of(kobj, struct ll_sb_info,
+                                             ll_kset.kobj);
+       unsigned int val;
+       int rc;
+
+       rc = kstrtouint(buffer, 0, &val);
+       if (rc)
+               return rc;
+
+       sbi->ll_sa_fname_predict_hit = val;
+       return count;
+}
+LUSTRE_RW_ATTR(statahead_fname_predict_hit);
+
+
+/*
+ * sysfs "show" handler for statahead_fname_match_hit: prints the
+ * current ll_sa_fname_match_hit value of the owning ll_sb_info as an
+ * unsigned decimal followed by a newline.
+ */
+static ssize_t
+statahead_fname_match_hit_show(struct kobject *kobj, struct attribute *attr,
+                              char *buf)
+{
+       struct ll_sb_info *sbi;
+       unsigned int hits;
+
+       sbi = container_of(kobj, struct ll_sb_info, ll_kset.kobj);
+       hits = sbi->ll_sa_fname_match_hit;
+
+       return snprintf(buf, PAGE_SIZE, "%u\n", hits);
+}
+
+/*
+ * sysfs "store" handler for statahead_fname_match_hit.
+ *
+ * Parses @buffer as an unsigned integer (base 0 is passed to the parser)
+ * and stores it in ll_sa_fname_match_hit of the owning ll_sb_info.
+ * The value is parsed directly as an unsigned int so that input which
+ * does not fit the field is rejected by kstrtouint() instead of being
+ * silently truncated on assignment from a wider type.
+ *
+ * Returns @count on success, or the negative error code returned by
+ * kstrtouint() on malformed or out-of-range input.
+ */
+static ssize_t
+statahead_fname_match_hit_store(struct kobject *kobj, struct attribute *attr,
+                               const char *buffer, size_t count)
+{
+       struct ll_sb_info *sbi = container_of(kobj, struct ll_sb_info,
+                                             ll_kset.kobj);
+       unsigned int val;
+       int rc;
+
+       rc = kstrtouint(buffer, 0, &val);
+       if (rc)
+               return rc;
+
+       sbi->ll_sa_fname_match_hit = val;
+       return count;
+}
+LUSTRE_RW_ATTR(statahead_fname_match_hit);
+
 static ssize_t statahead_agl_show(struct kobject *kobj,
                                  struct attribute *attr,
                                  char *buf)
@@ -2410,6 +2467,8 @@ static struct attribute *llite_attrs[] = {
        &lustre_attr_statahead_max.attr,
        &lustre_attr_statahead_min.attr,
        &lustre_attr_statahead_timeout.attr,
+       &lustre_attr_statahead_fname_predict_hit.attr,
+       &lustre_attr_statahead_fname_match_hit.attr,
        &lustre_attr_statahead_agl.attr,
        &lustre_attr_lazystatfs.attr,
        &lustre_attr_statfs_max_age.attr,
index 5746de2..d9127bd 100644 (file)
@@ -551,13 +551,18 @@ static struct ll_statahead_context *ll_sax_alloc(struct inode *dir)
                spin_lock_init(&ctx->sax_cache_lock[i]);
        }
 
+       atomic_inc(&ll_i2sbi(dir)->ll_sa_refcnt);
        RETURN(ctx);
 }
 
 static inline void ll_sax_free(struct ll_statahead_context *ctx)
 {
+       struct ll_sb_info *sbi;
+
        LASSERT(ctx->sax_inode != NULL);
+       sbi = ll_i2sbi(ctx->sax_inode); /* fetch sbi before the inode reference is dropped */
        iput(ctx->sax_inode);
+       atomic_dec(&sbi->ll_sa_refcnt); /* drop the in-use count only after iput() */
        OBD_FREE_PTR(ctx);
 }
 
@@ -825,9 +830,14 @@ static void ll_statahead_interpret_work(struct work_struct *work)
 
        rc = ll_prep_inode(&child, pill, dir->i_sb, it);
        if (rc) {
-               CERROR("%s: getattr callback for %.*s "DFID": rc = %d\n",
+               CERROR("%s: failed to prep %.*s "DFID" inode@%pK: rc = %d\n",
                       ll_i2sbi(dir)->ll_fsname, entry->se_qstr.len,
-                      entry->se_qstr.name, PFID(&entry->se_fid), rc);
+                      entry->se_qstr.name, PFID(&entry->se_fid),
+                      entry->se_inode, rc);
+               if (entry->se_inode) {
+                       iput(entry->se_inode);
+                       entry->se_inode = NULL;
+               }
                GOTO(out, rc);
        }
 
@@ -1166,9 +1176,11 @@ static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
 
        plli = ll_i2info(parent->d_inode);
        task = kthread_create_on_node(ll_agl_thread, sai, node, "ll_agl_%d",
-                                     plli->lli_stat_pid);
+                                     sai->sai_pid);
        if (IS_ERR(task)) {
-               CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));
+               CERROR("%s: cannot start ll_agl_%u thread for '%pd': rc=%ld\n",
+                      ll_i2sbi(d_inode(parent))->ll_fsname, sai->sai_pid,
+                      sai->sai_dentry, PTR_ERR(task));
                RETURN_EXIT;
        }
        sai->sai_agl_task = task;
@@ -1181,11 +1193,11 @@ static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
        EXIT;
 }
 
-static int ll_statahead_by_list(struct dentry *parent)
+static int ll_statahead_by_list(struct ll_statahead_info *sai,
+                               struct dentry *parent)
 {
        struct inode *dir = parent->d_inode;
        struct ll_inode_info *lli = ll_i2info(dir);
-       struct ll_statahead_info *sai = lli->lli_sai;
        struct ll_sb_info *sbi = ll_i2sbi(dir);
        struct md_op_data *op_data;
        struct page *page = NULL;
@@ -1195,6 +1207,7 @@ static int ll_statahead_by_list(struct dentry *parent)
 
        ENTRY;
 
+       LASSERTF(sai != NULL, "Pattern %#X\n", lli->lli_sa_pattern);
        CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
               sai, parent);
 
@@ -1523,6 +1536,7 @@ static int ll_statahead_thread(void *arg)
        struct inode *dir = parent->d_inode;
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_sb_info *sbi = ll_i2sbi(dir);
+       enum ll_sa_pattern pattern;
        struct lu_batch *bh = NULL;
        struct sa_entry *entry;
        int tries = 0;
@@ -1546,10 +1560,10 @@ static int ll_statahead_thread(void *arg)
        }
 
        sai->sai_bh = bh;
-
-       switch (lli->lli_sa_pattern & LSA_PATTERN_MASK) {
+       pattern = lli->lli_sa_pattern & LSA_PATTERN_MASK;
+       switch (pattern) {
        case LSA_PATTERN_LIST:
-               rc = ll_statahead_by_list(parent);
+               rc = ll_statahead_by_list(sai, parent);
                break;
        case LSA_PATTERN_ADVISE:
                rc = ll_statahead_by_advise(sai, parent);
@@ -1577,7 +1591,14 @@ static int ll_statahead_thread(void *arg)
        while (({set_current_state(TASK_IDLE);
                /* matches smp_store_release() in ll_deauthorize_statahead() */
                smp_load_acquire(&sai->sai_task) && lli->lli_sa_enabled; })) {
-               schedule();
+               long timeout;
+
+               timeout = schedule_timeout(
+                               cfs_time_seconds(sbi->ll_sa_timeout));
+               if (timeout == 0 && pattern == LSA_PATTERN_LIST &&
+                   lli->lli_sai == NULL)
+                       CERROR("%s: ll_sa_%u LIST => FNAME no wakeup.\n",
+                              sbi->ll_fsname, sai->sai_pid);
        }
        __set_current_state(TASK_RUNNING);
 
@@ -1902,30 +1923,35 @@ static int ll_shared_statahead_check(struct inode *dir, struct dentry *dentry,
 {
        struct ll_inode_info *lli = ll_i2info(dir);
        struct ll_statahead_info *sai;
+       int rc;
 
        ENTRY;
 
        spin_lock(&lli->lli_sa_lock);
+       if (!(lli->lli_sa_pattern & LSA_PATTERN_FNAME))
+               GOTO(out_unlock, rc = 0);
+
        sai = lli->lli_sai;
        if (sai) {
-               if (sai->sai_pid == current->pid) {
-                       spin_unlock(&lli->lli_sa_lock);
-                       RETURN(0);
-               }
+               if (sai->sai_pid == current->pid)
+                       GOTO(out_unlock, rc = 0);
+
                lli->lli_sai = NULL;
                lli->lli_sa_pattern |= LSA_PATTERN_FN_SHARED;
        }
 
        sai = ll_find_sai_locked(ctx, current->pid);
-       if (sai) {
-               spin_unlock(&lli->lli_sa_lock);
-               RETURN(-EEXIST);
-       }
+       if (sai)
+               GOTO(out_unlock, rc = -EEXIST);
 
        lli->lli_sa_pattern |= LSA_PATTERN_FN_SHARED;
        spin_unlock(&lli->lli_sa_lock);
 
        RETURN(start_statahead_thread(dir, dentry, true));
+
+out_unlock:
+       spin_unlock(&lli->lli_sa_lock);
+       RETURN(rc);
 }
 
 /**
@@ -1941,6 +1967,7 @@ static int ll_shared_statahead_check(struct inode *dir, struct dentry *dentry,
  * \retval             negative number upon error
  */
 static int revalidate_statahead_dentry(struct inode *dir,
+                                      struct ll_statahead_info *sai,
                                       struct ll_statahead_context *ctx,
                                       struct dentry **dentryp,
                                       bool unplug)
@@ -1948,7 +1975,6 @@ static int revalidate_statahead_dentry(struct inode *dir,
        struct sa_entry *entry = NULL;
        struct ll_dentry_data *lld;
        struct ll_inode_info *lli = ll_i2info(dir);
-       struct ll_statahead_info *sai = lli->lli_sai;
        struct ll_statahead_info *info = NULL;
        int rc = 0;
 
@@ -2018,6 +2044,7 @@ static int revalidate_statahead_dentry(struct inode *dir,
 
        LASSERTF(sai != NULL, "pattern %#X entry %p se_sai %p %pd lli %p\n",
                 lli->lli_sa_pattern, entry, entry->se_sai, *dentryp, lli);
+
        if (!sa_ready(entry)) {
                spin_lock(&lli->lli_sa_lock);
                sai->sai_index_wait = entry->se_index;
@@ -2145,12 +2172,13 @@ static inline bool
 sa_pattern_fname_detect(struct inode *dir, struct dentry *dchild)
 {
        struct ll_inode_info *lli = ll_i2info(dir);
+       struct ll_sb_info *sbi = ll_i2sbi(dir);
        struct qstr *dname = &dchild->d_name;
        const unsigned char *name = dname->name;
        bool rc = false;
        int i;
 
-       if (ll_i2sbi(dir)->ll_enable_statahead_fname == 0)
+       if (sbi->ll_enable_statahead_fname == 0)
                return false;
        if (lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED)
                return true;
@@ -2205,7 +2233,8 @@ sa_pattern_fname_detect(struct inode *dir, struct dentry *dchild)
                        lli->lli_sa_match_count++;
                        lli->lli_sa_fname_index = num;
 
-                       if (lli->lli_sa_match_count > LSA_FN_MATCH_HIT)
+                       if (lli->lli_sa_match_count >
+                           sbi->ll_sa_fname_match_hit)
                                GOTO(out, rc = true);
 
                        return false;
@@ -2244,6 +2273,12 @@ static inline int ll_sax_add_sai(struct ll_statahead_context *ctx,
        return 0;
 }
 
+/* True only when every bit of LSA_PATTERN_SFNAME is set in the pattern. */
+static inline bool sa_pattern_shared_fname(struct ll_inode_info *lli)
+{
+       const enum ll_sa_pattern mask = LSA_PATTERN_SFNAME;
+
+       return (lli->lli_sa_pattern & mask) == mask;
+}
+
 /**
  * start statahead thread
  *
@@ -2268,6 +2303,7 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
        struct task_struct *task;
        struct ll_sb_info *sbi;
        int first = LS_FIRST_DE;
+       bool fail_free = true;
        int rc = 0;
 
        ENTRY;
@@ -2321,8 +2357,7 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
 
        /* The workload like directory listing or mdtest unique dir stat() */
        if (lli->lli_sa_pattern & LSA_PATTERN_LIST ||
-           (lli->lli_sa_pattern & (LSA_PATTERN_FN_SHARED |
-                                   LSA_PATTERN_FNAME)) == LSA_PATTERN_FNAME) {
+           (lli->lli_sa_pattern & LSA_PATTERN_SFNAME) == LSA_PATTERN_FNAME) {
                ctx = ll_sax_alloc(dir);
                if (!ctx)
                        GOTO(out, rc = -ENOMEM);
@@ -2336,8 +2371,8 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
                spin_lock(&lli->lli_sa_lock);
                if (unlikely(lli->lli_sai || lli->lli_sax ||
                             ((lli->lli_sa_pattern & LSA_PATTERN_LIST) &&
-                             !lli->lli_opendir_key &&
-                             lli->lli_stat_pid != current->pid))) {
+                             (!lli->lli_opendir_key ||
+                              lli->lli_stat_pid != current->pid)))) {
                        spin_unlock(&lli->lli_sa_lock);
                        GOTO(out, rc = -EPERM);
                }
@@ -2349,9 +2384,8 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
                lli->lli_sai = sai;
                lli->lli_sax = ctx;
                spin_unlock(&lli->lli_sa_lock);
-       } else if (lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED) {
+       } else if (sa_pattern_shared_fname(lli)) {
                /* For mdtest shared dir stat() workload */
-               LASSERT(lli->lli_sa_pattern & LSA_PATTERN_FNAME);
                ctx = ll_sax_get(dir);
                if (ctx == NULL) {
                        ctx = ll_sax_alloc(dir);
@@ -2359,39 +2393,51 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
                                GOTO(out, rc = -ENOMEM);
 
                        spin_lock(&lli->lli_sa_lock);
+                       if (!sa_pattern_shared_fname(lli)) {
+                               spin_unlock(&lli->lli_sa_lock);
+                               CERROR("%s: invalid pattern %#X.\n",
+                                      sbi->ll_fsname, lli->lli_sa_pattern);
+                               GOTO(out, rc = -EINVAL);
+                       }
+
                        if (lli->lli_sax) {
                                struct ll_statahead_context *tmp = ctx;
 
-                               if (lli->lli_sa_pattern &
-                                   LSA_PATTERN_FN_SHARED) {
-                                       ctx = lli->lli_sax;
-                                       __ll_sax_get(ctx);
-                                       rc = ll_sax_add_sai(ctx, sai);
-                               } else {
-                                       CWARN("%s: invalid pattern %#X.\n",
-                                             sbi->ll_fsname,
-                                             lli->lli_sa_pattern);
-                                       rc = -EINVAL;
-                               }
+                               ctx = lli->lli_sax;
+                               __ll_sax_get(ctx);
+                               rc = ll_sax_add_sai(ctx, sai);
 
                                spin_unlock(&lli->lli_sa_lock);
                                ll_sax_free(tmp);
-                               if (rc)
+                               if (rc) {
+                                       CERROR("%s: failed to add sai: rc=%d\n",
+                                              sbi->ll_fsname, rc);
+                                       ll_sax_put(dir, ctx);
+                                       ctx = NULL;
                                        GOTO(out, rc);
+                               }
                        } else {
                                lli->lli_sax = ctx;
                                rc = ll_sax_add_sai(ctx, sai);
+                               if (rc)
+                                       lli->lli_sax = NULL;
                                spin_unlock(&lli->lli_sa_lock);
                        }
                } else {
                        spin_lock(&lli->lli_sa_lock);
-                       if (!(lli->lli_sa_pattern & LSA_PATTERN_FN_SHARED)) {
-                               spin_unlock(&lli->lli_sa_lock);
-                               GOTO(out, rc = -EINVAL);
-                       }
-
-                       rc = ll_sax_add_sai(ctx, sai);
+                       if (sa_pattern_shared_fname(lli))
+                               rc = ll_sax_add_sai(ctx, sai);
+                       else
+                               rc = -EINVAL;
                        spin_unlock(&lli->lli_sa_lock);
+                       if (rc) {
+                               CERROR("%s: failed to add sai: pattern %#X pid=%d rc=%d\n",
+                                      sbi->ll_fsname, lli->lli_sa_pattern,
+                                      sai->sai_pid, rc);
+                               ll_sax_put(dir, ctx);
+                               ctx = NULL;
+                               GOTO(out, rc);
+                       }
                }
 
                if (rc)
@@ -2406,13 +2452,16 @@ static int start_statahead_thread(struct inode *dir, struct dentry *dentry,
               current->pid, parent);
 
        task = kthread_create_on_node(ll_statahead_thread, sai, node,
-                                     "ll_sa_%u", lli->lli_stat_pid);
+                                     "ll_sa_%u", sai->sai_pid);
        if (IS_ERR(task)) {
                spin_lock(&lli->lli_sa_lock);
                lli->lli_sai = NULL;
                spin_unlock(&lli->lli_sa_lock);
                rc = PTR_ERR(task);
-               CERROR("can't start ll_sa thread, rc: %d\n", rc);
+               CERROR("%s: cannot start ll_sa_%u thread for '%pd': rc = %d\n",
+                      sbi->ll_fsname, current->pid, sai->sai_dentry, rc);
+               /* Use @ll_sai/sax_put to release @sai and @ctx. */
+               fail_free = false;
                GOTO(out, rc);
        }
 
@@ -2443,15 +2492,29 @@ out:
                lli->lli_sa_enabled = 0;
        spin_unlock(&lli->lli_sa_lock);
 
-       if (sai)
-               ll_sai_put(sai);
-
-       if (ctx)
-               ll_sax_put(dir, ctx);
+       /*
+        * @ll_sai_free() and @ll_sax_free() will not set @lli_sai and @lli_sax
+        * with NULL. When these two values are not set by this process upon
+        * failure, we should free @sai and @ctx directly by using these two
+        * functions.
+        * If using ll_sax/sai_put(), it will change @lli_sai and @lli_sax into
+        * NULL and other statahead threads will use/access these two data
+        * structures (NULL) wrongly.
+        */
+       if (fail_free) {
+               if (sai)
+                       ll_sai_free(sai);
+               if (ctx)
+                       ll_sax_free(ctx);
+       } else {
+               if (sai)
+                       ll_sai_put(sai);
+               if (ctx)
+                       ll_sax_put(dir, ctx);
+       }
 
        if (rc)
                atomic_dec(&sbi->ll_sa_running);
-
        RETURN(rc);
 }
 
@@ -2534,7 +2597,8 @@ int ll_revalidate_statahead(struct inode *dir, struct dentry **dentryp,
        }
        spin_unlock(&lli->lli_sa_lock);
        if (ctx) {
-               rc = revalidate_statahead_dentry(dir, ctx, dentryp, unplug);
+               rc = revalidate_statahead_dentry(dir, sai, ctx,
+                                                dentryp, unplug);
                CDEBUG(D_READA, "revalidate statahead %pd: rc = %d.\n",
                       *dentryp, rc);
                if (sai)
@@ -2646,8 +2710,8 @@ int ll_ioctl_ahead(struct file *file, struct llapi_lu_ladvise2 *ladvise)
                                      "ll_sa_%u", current->pid);
        if (IS_ERR(task)) {
                rc = PTR_ERR(task);
-               CERROR("%s: cannot start ll_sa thread: rc = %d\n",
-                      sbi->ll_fsname, rc);
+               CERROR("%s: cannot start ll_sa_%u thread for '%pd': rc = %d\n",
+                      sbi->ll_fsname, sai->sai_pid, sai->sai_dentry, rc);
                GOTO(out, rc);
        }
 
@@ -2688,16 +2752,16 @@ out:
  */
 void ll_statahead_enter(struct inode *dir, struct dentry *dchild)
 {
-       struct ll_inode_info *lli;
+       struct ll_inode_info *lli = ll_i2info(dir);
+       struct ll_sb_info *sbi = ll_i2sbi(dir);
        struct qstr *dname = &dchild->d_name;
 
-       if (ll_i2sbi(dir)->ll_sa_max == 0)
+       if (sbi->ll_sa_max == 0)
                return;
 
-       if (ll_i2sbi(dir)->ll_enable_statahead_fname == 0)
+       if (sbi->ll_enable_statahead_fname == 0)
                return;
 
-       lli = ll_i2info(dir);
        if (lli->lli_sa_enabled)
                return;
 
@@ -2715,7 +2779,7 @@ void ll_statahead_enter(struct inode *dir, struct dentry *dchild)
        }
 
        lli->lli_sa_match_count++;
-       if (lli->lli_sa_match_count > LSA_FN_PREDICT_HIT) {
+       if (lli->lli_sa_match_count > sbi->ll_sa_fname_predict_hit) {
                spin_lock(&lli->lli_sa_lock);
                lli->lli_sa_pattern |= LSA_PATTERN_FN_PREDICT;
                spin_unlock(&lli->lli_sa_lock);
index 9dcae05..c5d8823 100644 (file)
@@ -2363,8 +2363,11 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
        if (rc < 0) {
                GOTO(out_child, rc);
        } else if (rc > 0) {
-               if (!(child_bits & MDS_INODELOCK_UPDATE) &&
-                   !mdt_object_remote(child)) {
+               bool hardlink_check = lhp && info->mti_batch_env &&
+                                     S_ISREG(lu_object_attr(&child->mot_obj));
+
+               if ((!(child_bits & MDS_INODELOCK_UPDATE) &&
+                    !mdt_object_remote(child)) || hardlink_check) {
                        struct md_attr *ma = &info->mti_attr;
 
                        ma->ma_valid = 0;
@@ -2373,6 +2376,35 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                        if (unlikely(rc != 0))
                                GOTO(out_child, rc);
 
+                       /*
+                        * There is a possible deadlock between link() and batch
+                        * stat-ahead on hardlinks.
+                        * link()
+                        * - Take parent DLM lock: mdt_parent_lock PW
+                        * - Take object DLM lock: mdt_object_lock EX
+                        * batch stat-ahead
+                        * - Already hold the DLM lock on one link of the
+                        *   object which will return to the client in previous
+                        *   stat operation on MDT.
+                        * - Take parent DLM lock: mdt_parent_lock PR
+                        *
+                        * Deadlock:
+                        * The link operation, which is holding the parent PW
+                        * lock, is waiting for the batch stat-ahead to release
+                        * the DLM lock on one link of the file.
+                        * The batch statahead, which is holding the DLM lock on
+                        * the file in the previous sub stat operation in the
+                        * batch RPC, currently is trying to acquire the PR DLM
+                        * lock on the parent.
+                        * To avoid this deadlock, we simply cancel the
+                        * statahead on the hardlink in a batch RPC.
+                        * Without this fix, it failed lustre-rsync-test/test_6.
+                        */
+                       if (hardlink_check && (ma->ma_valid & MA_INODE) &&
+                           (ma->ma_attr.la_valid & LA_NLINK) &&
+                           ma->ma_attr.la_nlink > 1)
+                               GOTO(out_child, rc = -ECANCELED);
+
                        /* If the file has not been changed for some time, we
                         * return not only a LOOKUP lock, but also an UPDATE
                         * lock and this might save us RPC on later STAT. For