--- /dev/null
+From 1a0f7f0b9c13ef0aa86e125f350b6733bff8db3c Mon Sep 17 00:00:00 2001
+From: Liang Zhen <liang.zhen@intel.com>
+Date: Wed, 15 Jan 2020 07:35:13 -0600
+Subject: [PATCH] LU-50 ldiskfs: parallel directory operations for ext4
+
+ In a typical use case an application creates a separate output file for each
+ node and task in a job. As nodes and tasks increase, hundreds of thousands of
+ files may be created in a single directory within a short window of time.
+ Today, both filename lookup and filesystem-modifying operations (such as
+ create and unlink) are protected by a single lock for an entire ldiskfs
+ directory. The PDO (parallel directory operations) project removes this
+ bottleneck by introducing a parallel locking mechanism across an entire
+ ldiskfs directory, enabling multiple application threads to look up, create
+ and unlink entries in the same directory concurrently.
+
+This patch contains:
+ - pdirops support for ldiskfs
+ - integration with osd-ldiskfs
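+
+ The sketch below (not part of the patch itself) shows how a caller such as
+ osd-ldiskfs is expected to drive the new locking API for a lookup. In real
+ use the lock head lives with the directory object and a lock handle is kept
+ per thread; both are allocated inline here, and the function name and the
+ hash-bit count are illustrative only:
+
+   static struct buffer_head *pdo_lookup_example(struct inode *dir,
+                                                 const struct qstr *name)
+   {
+           struct htree_lock_head *lhead;
+           struct htree_lock *lck;
+           struct ext4_dir_entry_2 *de;
+           struct buffer_head *bh = NULL;
+
+           /* one shared lock head per directory; 8 hash bits is illustrative */
+           lhead = ext4_htree_lock_head_alloc(8);
+           /* one lock handle per thread doing directory operations */
+           lck = ext4_htree_lock_alloc();
+           if (lhead == NULL || lck == NULL)
+                   goto out;
+
+           /* lock the directory in lookup mode, search under it, unlock */
+           ext4_htree_lock(lck, lhead, dir, EXT4_HLOCK_LOOKUP);
+           bh = ext4_find_entry_locked(dir, name, &de, NULL, lck);
+           ext4_htree_unlock(lck);
+   out:
+           if (lck != NULL)
+                   ext4_htree_lock_free(lck);
+           if (lhead != NULL)
+                   ext4_htree_lock_head_free(lhead);
+           /* bh may be NULL or an ERR_PTR; callers must check both */
+           return bh;
+   }
+
+ The remaining external flags (EXT4_HLOCK_READDIR, EXT4_HLOCK_ADD,
+ EXT4_HLOCK_DEL) are taken the same way around the corresponding operations,
+ e.g. ext4_add_entry_locked().
+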
+Signed-off-by: Liang Zhen <liang.zhen@intel.com>
+Change-Id: I269c0e3112e68f3acd79e860dab052a68c7d7aaa
+---
+ fs/ext4/Makefile | 1 +
+ fs/ext4/ext4.h | 78 ++++
+ fs/ext4/htree_lock.c | 891 +++++++++++++++++++++++++++++++++++++
+ fs/ext4/namei.c | 454 +++++++++++++++++--
+ fs/ext4/super.c | 1 +
+ include/linux/htree_lock.h | 187 ++++++++
+ 6 files changed, 1572 insertions(+), 40 deletions(-)
+ create mode 100644 fs/ext4/htree_lock.c
+ create mode 100644 include/linux/htree_lock.h
+
+diff -wur a/fs/ext4/ext4.h b/fs/ext4/ext4.h
+--- a/fs/ext4/ext4.h 2020-08-30 12:06:02.782523259 -0600
++++ b/fs/ext4/ext4.h 2020-08-30 12:09:18.997212399 -0600
+@@ -29,6 +29,7 @@
+ #include <linux/timer.h>
+ #include <linux/version.h>
+ #include <linux/wait.h>
++#include <linux/htree_lock.h>
+ #include <linux/sched/signal.h>
+ #include <linux/blockgroup_lock.h>
+ #include <linux/percpu_counter.h>
+@@ -961,6 +962,9 @@
+ __u32 i_dtime;
+ ext4_fsblk_t i_file_acl;
+
++ /* the following field is for parallel directory operations -bzzz */
++ struct semaphore i_append_sem;
++
+ /*
+ * i_block_group is the number of the block group which contains
+ * this file's inode. Constant across the lifetime of the inode,
+@@ -2206,6 +2210,72 @@
+ */
+ #define HASH_NB_ALWAYS 1
+
++/* assume name-hash is protected by upper layer */
++#define EXT4_HTREE_LOCK_HASH 0
++
++enum ext4_pdo_lk_types {
++#if EXT4_HTREE_LOCK_HASH
++ EXT4_LK_HASH,
++#endif
++ EXT4_LK_DX, /* index block */
++ EXT4_LK_DE, /* directory entry block */
++ EXT4_LK_SPIN, /* spinlock */
++ EXT4_LK_MAX,
++};
++
++/* read-only bit */
++#define EXT4_LB_RO(b) (1 << (b))
++/* read + write, high bits for writer */
++#define EXT4_LB_RW(b) ((1 << (b)) | (1 << (EXT4_LK_MAX + (b))))
++
++enum ext4_pdo_lock_bits {
++ /* DX lock bits */
++ EXT4_LB_DX_RO = EXT4_LB_RO(EXT4_LK_DX),
++ EXT4_LB_DX = EXT4_LB_RW(EXT4_LK_DX),
++ /* DE lock bits */
++ EXT4_LB_DE_RO = EXT4_LB_RO(EXT4_LK_DE),
++ EXT4_LB_DE = EXT4_LB_RW(EXT4_LK_DE),
++ /* DX spinlock bits */
++ EXT4_LB_SPIN_RO = EXT4_LB_RO(EXT4_LK_SPIN),
++ EXT4_LB_SPIN = EXT4_LB_RW(EXT4_LK_SPIN),
++ /* accurate searching */
++ EXT4_LB_EXACT = EXT4_LB_RO(EXT4_LK_MAX << 1),
++};
++
++enum ext4_pdo_lock_opc {
++ /* external */
++ EXT4_HLOCK_READDIR = (EXT4_LB_DE_RO | EXT4_LB_DX_RO),
++ EXT4_HLOCK_LOOKUP = (EXT4_LB_DE_RO | EXT4_LB_SPIN_RO |
++ EXT4_LB_EXACT),
++ EXT4_HLOCK_DEL = (EXT4_LB_DE | EXT4_LB_SPIN_RO |
++ EXT4_LB_EXACT),
++ EXT4_HLOCK_ADD = (EXT4_LB_DE | EXT4_LB_SPIN_RO),
++
++ /* internal */
++ EXT4_HLOCK_LOOKUP_SAFE = (EXT4_LB_DE_RO | EXT4_LB_DX_RO |
++ EXT4_LB_EXACT),
++ EXT4_HLOCK_DEL_SAFE = (EXT4_LB_DE | EXT4_LB_DX_RO | EXT4_LB_EXACT),
++ EXT4_HLOCK_SPLIT = (EXT4_LB_DE | EXT4_LB_DX | EXT4_LB_SPIN),
++};
++
++extern struct htree_lock_head *ext4_htree_lock_head_alloc(unsigned hbits);
++#define ext4_htree_lock_head_free(lhead) htree_lock_head_free(lhead)
++
++extern struct htree_lock *ext4_htree_lock_alloc(void);
++#define ext4_htree_lock_free(lck) htree_lock_free(lck)
++
++extern void ext4_htree_lock(struct htree_lock *lck,
++ struct htree_lock_head *lhead,
++ struct inode *dir, unsigned flags);
++#define ext4_htree_unlock(lck) htree_unlock(lck)
++
++extern struct buffer_head *ext4_find_entry_locked(struct inode *dir,
++ const struct qstr *d_name,
++ struct ext4_dir_entry_2 **res_dir,
++ int *inlined, struct htree_lock *lck);
++extern int ext4_add_entry_locked(handle_t *handle, struct dentry *dentry,
++ struct inode *inode, struct htree_lock *lck);
++
+ struct ext4_filename {
+ const struct qstr *usr_fname;
+ struct fscrypt_str disk_name;
+@@ -2573,11 +2643,20 @@
+ struct ext4_filename *fname, void *data);
+ static inline void ext4_update_dx_flag(struct inode *inode)
+ {
++ /* Disable this for ldiskfs, because converting a DX directory to
++ * a non-DX directory while it is in use would completely break
++ * the htree locking.
++ * If we really want to support this operation in the future,
++ * we would need to lock the directory exclusively here, which
++ * would increase code complexity.
++ */
++#if 0
+ if (!ext4_has_feature_dir_index(inode->i_sb)) {
+ /* ext4_iget() should have caught this... */
+ WARN_ON_ONCE(ext4_has_feature_metadata_csum(inode->i_sb));
+ ext4_clear_inode_flag(inode, EXT4_INODE_INDEX);
+ }
++#endif
+ }
+ static const unsigned char ext4_filetype_table[] = {
+ DT_UNKNOWN, DT_REG, DT_DIR, DT_CHR, DT_BLK, DT_FIFO, DT_SOCK, DT_LNK
+diff -wur a/fs/ext4/Makefile b/fs/ext4/Makefile
+--- a/fs/ext4/Makefile 2020-08-30 12:06:02.378525933 -0600
++++ b/fs/ext4/Makefile 2020-08-30 12:07:32.337927838 -0600
+@@ -7,6 +7,7 @@
+
+ ext4-y := balloc.o bitmap.o block_validity.o dir.o ext4_jbd2.o extents.o \
+ extents_status.o file.o fsmap.o fsync.o hash.o ialloc.o \
++ htree_lock.o \
+ indirect.o inline.o inode.o ioctl.o mballoc.o migrate.o \
+ mmp.o move_extent.o namei.o page-io.o readpage.o resize.o \
+ super.o symlink.o sysfs.o xattr.o xattr_trusted.o xattr_user.o
+diff -wur a/fs/ext4/namei.c b/fs/ext4/namei.c
+--- a/fs/ext4/namei.c 2020-08-30 12:06:02.746523498 -0600
++++ b/fs/ext4/namei.c 2020-08-30 12:11:25.136359125 -0600
+@@ -55,6 +55,7 @@
+ ext4_lblk_t *block)
+ {
+ struct buffer_head *bh;
++ struct ext4_inode_info *ei = EXT4_I(inode);
+ int err;
+
+ if (unlikely(EXT4_SB(inode->i_sb)->s_max_dir_size_kb &&
+@@ -62,15 +63,22 @@
+ EXT4_SB(inode->i_sb)->s_max_dir_size_kb)))
+ return ERR_PTR(-ENOSPC);
+
++ /* with parallel dir operations all appends
++ * have to be serialized -bzzz */
++ down(&ei->i_append_sem);
++
+ *block = inode->i_size >> inode->i_sb->s_blocksize_bits;
+
+ bh = ext4_bread(handle, inode, *block, EXT4_GET_BLOCKS_CREATE);
+- if (IS_ERR(bh))
++ if (IS_ERR(bh)) {
++ up(&ei->i_append_sem);
+ return bh;
++ }
+ inode->i_size += inode->i_sb->s_blocksize;
+ EXT4_I(inode)->i_disksize = inode->i_size;
+ BUFFER_TRACE(bh, "get_write_access");
+ err = ext4_journal_get_write_access(handle, bh);
++ up(&ei->i_append_sem);
+ if (err) {
+ brelse(bh);
+ ext4_std_error(inode->i_sb, err);
+@@ -264,7 +272,8 @@
+ static struct dx_frame *dx_probe(struct ext4_filename *fname,
+ struct inode *dir,
+ struct dx_hash_info *hinfo,
+- struct dx_frame *frame);
++ struct dx_frame *frame,
++ struct htree_lock *lck);
+ static void dx_release(struct dx_frame *frames);
+ static int dx_make_map(struct inode *dir, struct ext4_dir_entry_2 *de,
+ unsigned blocksize, struct dx_hash_info *hinfo,
+@@ -278,12 +287,13 @@
+ static int ext4_htree_next_block(struct inode *dir, __u32 hash,
+ struct dx_frame *frame,
+ struct dx_frame *frames,
+- __u32 *start_hash);
++ __u32 *start_hash, struct htree_lock *lck);
+ static struct buffer_head * ext4_dx_find_entry(struct inode *dir,
+ struct ext4_filename *fname,
+- struct ext4_dir_entry_2 **res_dir);
++ struct ext4_dir_entry_2 **res_dir, struct htree_lock *lck);
+ static int ext4_dx_add_entry(handle_t *handle, struct ext4_filename *fname,
+- struct inode *dir, struct inode *inode);
++ struct inode *dir, struct inode *inode,
++ struct htree_lock *lck);
+
+ /* checksumming functions */
+ void ext4_initialize_dirent_tail(struct buffer_head *bh,
+@@ -748,6 +758,227 @@
+ }
+ #endif /* DX_DEBUG */
+
++/* private data for htree_lock */
++struct ext4_dir_lock_data {
++ unsigned ld_flags; /* bits-map for lock types */
++ unsigned ld_count; /* # entries of the last DX block */
++ struct dx_entry ld_at_entry; /* copy of leaf dx_entry */
++ struct dx_entry *ld_at; /* position of leaf dx_entry */
++};
++
++#define ext4_htree_lock_data(l) ((struct ext4_dir_lock_data *)(l)->lk_private)
++#define ext4_find_entry(dir, name, dirent, inline) \
++ ext4_find_entry_locked(dir, name, dirent, inline, NULL)
++#define ext4_add_entry(handle, dentry, inode) \
++ ext4_add_entry_locked(handle, dentry, inode, NULL)
++
++/* NB: ext4_lblk_t is 32 bits so we use high bits to identify invalid blk */
++#define EXT4_HTREE_NODE_CHANGED (0xcafeULL << 32)
++
++static void ext4_htree_event_cb(void *target, void *event)
++{
++ u64 *block = (u64 *)target;
++
++ if (*block == dx_get_block((struct dx_entry *)event))
++ *block = EXT4_HTREE_NODE_CHANGED;
++}
++
++struct htree_lock_head *ext4_htree_lock_head_alloc(unsigned hbits)
++{
++ struct htree_lock_head *lhead;
++
++ lhead = htree_lock_head_alloc(EXT4_LK_MAX, hbits, 0);
++ if (lhead != NULL) {
++ htree_lock_event_attach(lhead, EXT4_LK_SPIN, HTREE_EVENT_WR,
++ ext4_htree_event_cb);
++ }
++ return lhead;
++}
++EXPORT_SYMBOL(ext4_htree_lock_head_alloc);
++
++struct htree_lock *ext4_htree_lock_alloc(void)
++{
++ return htree_lock_alloc(EXT4_LK_MAX,
++ sizeof(struct ext4_dir_lock_data));
++}
++EXPORT_SYMBOL(ext4_htree_lock_alloc);
++
++static htree_lock_mode_t ext4_htree_mode(unsigned flags)
++{
++ switch (flags) {
++ default: /* 0 or unknown flags require EX lock */
++ return HTREE_LOCK_EX;
++ case EXT4_HLOCK_READDIR:
++ return HTREE_LOCK_PR;
++ case EXT4_HLOCK_LOOKUP:
++ return HTREE_LOCK_CR;
++ case EXT4_HLOCK_DEL:
++ case EXT4_HLOCK_ADD:
++ return HTREE_LOCK_CW;
++ }
++}
++
++/* return PR for read-only operations, otherwise return EX */
++static inline htree_lock_mode_t ext4_htree_safe_mode(unsigned flags)
++{
++ int writer = (flags & EXT4_LB_DE) == EXT4_LB_DE;
++
++ /* 0 requires EX lock */
++ return (flags == 0 || writer) ? HTREE_LOCK_EX : HTREE_LOCK_PR;
++}
++
++static int ext4_htree_safe_locked(struct htree_lock *lck)
++{
++ int writer;
++
++ if (lck == NULL || lck->lk_mode == HTREE_LOCK_EX)
++ return 1;
++
++ writer = (ext4_htree_lock_data(lck)->ld_flags & EXT4_LB_DE) ==
++ EXT4_LB_DE;
++ if (writer) /* all readers & writers are excluded? */
++ return lck->lk_mode == HTREE_LOCK_EX;
++
++ /* all writers are excluded? */
++ return lck->lk_mode == HTREE_LOCK_PR ||
++ lck->lk_mode == HTREE_LOCK_PW ||
++ lck->lk_mode == HTREE_LOCK_EX;
++}
++
++/* relock htree_lock with EX mode if it's a change operation, otherwise
++ * relock it with PR mode. This is a no-op if PDO is disabled. */
++static void ext4_htree_safe_relock(struct htree_lock *lck)
++{
++ if (!ext4_htree_safe_locked(lck)) {
++ unsigned flags = ext4_htree_lock_data(lck)->ld_flags;
++
++ htree_change_lock(lck, ext4_htree_safe_mode(flags));
++ }
++}
++
++void ext4_htree_lock(struct htree_lock *lck, struct htree_lock_head *lhead,
++ struct inode *dir, unsigned flags)
++{
++ htree_lock_mode_t mode = is_dx(dir) ? ext4_htree_mode(flags) :
++ ext4_htree_safe_mode(flags);
++
++ ext4_htree_lock_data(lck)->ld_flags = flags;
++ htree_lock(lck, lhead, mode);
++ if (!is_dx(dir))
++ ext4_htree_safe_relock(lck); /* make sure it's safe locked */
++}
++EXPORT_SYMBOL(ext4_htree_lock);
++
++static int ext4_htree_node_lock(struct htree_lock *lck, struct dx_entry *at,
++ unsigned lmask, int wait, void *ev)
++{
++ u32 key = (at == NULL) ? 0 : dx_get_block(at);
++ u32 mode;
++
++ /* NOOP if htree is well protected or caller doesn't require the lock */
++ if (ext4_htree_safe_locked(lck) ||
++ !(ext4_htree_lock_data(lck)->ld_flags & lmask))
++ return 1;
++
++ mode = (ext4_htree_lock_data(lck)->ld_flags & lmask) == lmask ?
++ HTREE_LOCK_PW : HTREE_LOCK_PR;
++ while (1) {
++ if (htree_node_lock_try(lck, mode, key, ffz(~lmask), wait, ev))
++ return 1;
++ if (!(lmask & EXT4_LB_SPIN)) /* not a spinlock */
++ return 0;
++ cpu_relax(); /* spin until granted */
++ }
++}
++
++static int ext4_htree_node_locked(struct htree_lock *lck, unsigned lmask)
++{
++ return ext4_htree_safe_locked(lck) ||
++ htree_node_is_granted(lck, ffz(~lmask));
++}
++
++static void ext4_htree_node_unlock(struct htree_lock *lck,
++ unsigned lmask, void *buf)
++{
++ /* NB: it's safe to call multiple times, even if it's not locked */
++ if (!ext4_htree_safe_locked(lck) &&
++ htree_node_is_granted(lck, ffz(~lmask)))
++ htree_node_unlock(lck, ffz(~lmask), buf);
++}
++
++#define ext4_htree_dx_lock(lck, key) \
++ ext4_htree_node_lock(lck, key, EXT4_LB_DX, 1, NULL)
++#define ext4_htree_dx_lock_try(lck, key) \
++ ext4_htree_node_lock(lck, key, EXT4_LB_DX, 0, NULL)
++#define ext4_htree_dx_unlock(lck) \
++ ext4_htree_node_unlock(lck, EXT4_LB_DX, NULL)
++#define ext4_htree_dx_locked(lck) \
++ ext4_htree_node_locked(lck, EXT4_LB_DX)
++
++static void ext4_htree_dx_need_lock(struct htree_lock *lck)
++{
++ struct ext4_dir_lock_data *ld;
++
++ if (ext4_htree_safe_locked(lck))
++ return;
++
++ ld = ext4_htree_lock_data(lck);
++ switch (ld->ld_flags) {
++ default:
++ return;
++ case EXT4_HLOCK_LOOKUP:
++ ld->ld_flags = EXT4_HLOCK_LOOKUP_SAFE;
++ return;
++ case EXT4_HLOCK_DEL:
++ ld->ld_flags = EXT4_HLOCK_DEL_SAFE;
++ return;
++ case EXT4_HLOCK_ADD:
++ ld->ld_flags = EXT4_HLOCK_SPLIT;
++ return;
++ }
++}
++
++#define ext4_htree_de_lock(lck, key) \
++ ext4_htree_node_lock(lck, key, EXT4_LB_DE, 1, NULL)
++#define ext4_htree_de_unlock(lck) \
++ ext4_htree_node_unlock(lck, EXT4_LB_DE, NULL)
++
++#define ext4_htree_spin_lock(lck, key, event) \
++ ext4_htree_node_lock(lck, key, EXT4_LB_SPIN, 0, event)
++#define ext4_htree_spin_unlock(lck) \
++ ext4_htree_node_unlock(lck, EXT4_LB_SPIN, NULL)
++#define ext4_htree_spin_unlock_listen(lck, p) \
++ ext4_htree_node_unlock(lck, EXT4_LB_SPIN, p)
++
++static void ext4_htree_spin_stop_listen(struct htree_lock *lck)
++{
++ if (!ext4_htree_safe_locked(lck) &&
++ htree_node_is_listening(lck, ffz(~EXT4_LB_SPIN)))
++ htree_node_stop_listen(lck, ffz(~EXT4_LB_SPIN));
++}
++
++enum {
++ DX_HASH_COL_IGNORE, /* ignore collision while probing frames */
++ DX_HASH_COL_YES, /* there is collision and it does matter */
++ DX_HASH_COL_NO, /* there is no collision */
++};
++
++static int dx_probe_hash_collision(struct htree_lock *lck,
++ struct dx_entry *entries,
++ struct dx_entry *at, u32 hash)
++{
++ if (!(lck && ext4_htree_lock_data(lck)->ld_flags & EXT4_LB_EXACT)) {
++ return DX_HASH_COL_IGNORE; /* don't care about collision */
++
++ } else if (at == entries + dx_get_count(entries) - 1) {
++ return DX_HASH_COL_IGNORE; /* not in any leaf of this DX */
++
++ } else { /* hash collision? */
++ return ((dx_get_hash(at + 1) & ~1) == hash) ?
++ DX_HASH_COL_YES : DX_HASH_COL_NO;
++ }
++}
++
+ /*
+ * Probe for a directory leaf block to search.
+ *
+@@ -759,10 +990,11 @@
+ */
+ static struct dx_frame *
+ dx_probe(struct ext4_filename *fname, struct inode *dir,
+- struct dx_hash_info *hinfo, struct dx_frame *frame_in)
++ struct dx_hash_info *hinfo, struct dx_frame *frame_in,
++ struct htree_lock *lck)
+ {
+ unsigned count, indirect;
+- struct dx_entry *at, *entries, *p, *q, *m;
++ struct dx_entry *at, *entries, *p, *q, *m, *dx = NULL;
+ struct dx_root_info *info;
+ struct dx_frame *frame = frame_in;
+ struct dx_frame *ret_err = ERR_PTR(ERR_BAD_DX_DIR);
+@@ -824,8 +1056,15 @@
+
+ dxtrace(printk("Look up %x", hash));
+ while (1) {
++ if (indirect == 0) { /* the last index level */
++ /* NB: ext4_htree_dx_lock() could be a no-op if the
++ * DX-lock flag is not set for the current operation */
++ ext4_htree_dx_lock(lck, dx);
++ ext4_htree_spin_lock(lck, dx, NULL);
++ }
+ count = dx_get_count(entries);
+- if (!count || count > dx_get_limit(entries)) {
++ if (count == 0 || count > dx_get_limit(entries)) {
++ ext4_htree_spin_unlock(lck); /* release spin */
+ ext4_warning_inode(dir,
+ "dx entry: count %u beyond limit %u",
+ count, dx_get_limit(entries));
+@@ -864,8 +1103,70 @@
+ dx_get_block(at)));
+ frame->entries = entries;
+ frame->at = at;
+- if (!indirect--)
++
++ if (indirect == 0) { /* the last index level */
++ struct ext4_dir_lock_data *ld;
++ u64 myblock;
++
++ /* By default we only lock the DE-block; however, we will
++ * also lock the last-level DX-block if:
++ * a) there is a hash collision:
++ * we set the DX-lock flag (a few lines below)
++ * and retry to take the DX-block lock,
++ * see details in dx_probe_hash_collision()
++ * b) it's a retry from splitting:
++ * we need to lock the last-level DX-block so nobody
++ * else can split any leaf blocks under the same
++ * DX-block, see details in ext4_dx_add_entry()
++ */
++ if (ext4_htree_dx_locked(lck)) {
++ /* DX-block is locked, just lock DE-block
++ * and return */
++ ext4_htree_spin_unlock(lck);
++ if (!ext4_htree_safe_locked(lck))
++ ext4_htree_de_lock(lck, frame->at);
++ return frame;
++ }
++ /* it's pdirop and no DX lock */
++ if (dx_probe_hash_collision(lck, entries, at, hash) ==
++ DX_HASH_COL_YES) {
++ /* found hash collision, set DX-lock flag
++ * and retry to obtain DX-lock */
++ ext4_htree_spin_unlock(lck);
++ ext4_htree_dx_need_lock(lck);
++ continue;
++ }
++ ld = ext4_htree_lock_data(lck);
++ /* because I don't hold the DX lock, @at can't be trusted
++ * after I release the spinlock, so I have to save a copy */
++ ld->ld_at = at;
++ ld->ld_at_entry = *at;
++ ld->ld_count = dx_get_count(entries);
++
++ frame->at = &ld->ld_at_entry;
++ myblock = dx_get_block(at);
++
++ /* NB: the ordering of these locking steps matters */
++ ext4_htree_spin_unlock_listen(lck, &myblock);
++ /* another thread can split this DE-block because:
++ * a) I don't hold the lock for the DE-block yet
++ * b) I released the spinlock on the DX-block
++ * if that happens I can detect it by listening for
++ * a splitting event on this DE-block */
++ ext4_htree_de_lock(lck, frame->at);
++ ext4_htree_spin_stop_listen(lck);
++
++ if (myblock == EXT4_HTREE_NODE_CHANGED) {
++ /* someone split this DE-block before
++ * I locked it, so I need to retry and lock
++ * the valid DE-block */
++ ext4_htree_de_unlock(lck);
++ continue;
++ }
+ return frame;
++ }
++ dx = at;
++ indirect--;
+ frame++;
+ frame->bh = ext4_read_dirblock(dir, dx_get_block(at), INDEX);
+ if (IS_ERR(frame->bh)) {
+@@ -934,7 +1235,7 @@
+ static int ext4_htree_next_block(struct inode *dir, __u32 hash,
+ struct dx_frame *frame,
+ struct dx_frame *frames,
+- __u32 *start_hash)
++ __u32 *start_hash, struct htree_lock *lck)
+ {
+ struct dx_frame *p;
+ struct buffer_head *bh;
+@@ -949,12 +1250,22 @@
+ * this loop, num_frames indicates the number of interior
+ * nodes need to be read.
+ */
++ ext4_htree_de_unlock(lck);
+ while (1) {
++ if (num_frames > 0 || ext4_htree_dx_locked(lck)) {
++ /* num_frames > 0:
++ * p is at an interior DX block
++ * ext4_htree_dx_locked:
++ * frame->at is a reliable pointer returned by dx_probe,
++ * otherwise dx_probe already knew there was no collision */
+ if (++(p->at) < p->entries + dx_get_count(p->entries))
+ break;
++ }
+ if (p == frames)
+ return 0;
+ num_frames++;
++ if (num_frames == 1)
++ ext4_htree_dx_unlock(lck);
+ p--;
+ }
+
+@@ -977,6 +1288,13 @@
+ * block so no check is necessary
+ */
+ while (num_frames--) {
++ if (num_frames == 0) {
++ /* this is not always necessary; we just don't want to
++ * detect hash collisions again */
++ ext4_htree_dx_need_lock(lck);
++ ext4_htree_dx_lock(lck, p->at);
++ }
++
+ bh = ext4_read_dirblock(dir, dx_get_block(p->at), INDEX);
+ if (IS_ERR(bh))
+ return PTR_ERR(bh);
+@@ -985,6 +1303,7 @@
+ p->bh = bh;
+ p->at = p->entries = ((struct dx_node *) bh->b_data)->entries;
+ }
++ ext4_htree_de_lock(lck, p->at);
+ return 1;
+ }
+
+@@ -1132,10 +1451,10 @@
+ }
+ hinfo.hash = start_hash;
+ hinfo.minor_hash = 0;
+- frame = dx_probe(NULL, dir, &hinfo, frames);
++ /* assume it's PR locked */
++ frame = dx_probe(NULL, dir, &hinfo, frames, NULL);
+ if (IS_ERR(frame))
+ return PTR_ERR(frame);
+-
+ /* Add '.' and '..' from the htree header */
+ if (!start_hash && !start_minor_hash) {
+ de = (struct ext4_dir_entry_2 *) frames[0].bh->b_data;
+@@ -1175,7 +1494,7 @@
+ count += ret;
+ hashval = ~0;
+ ret = ext4_htree_next_block(dir, HASH_NB_ALWAYS,
+- frame, frames, &hashval);
++ frame, frames, &hashval, NULL);
+ *next_hash = hashval;
+ if (ret < 0) {
+ err = ret;
+@@ -1451,7 +1770,7 @@
+ static struct buffer_head *__ext4_find_entry(struct inode *dir,
+ struct ext4_filename *fname,
+ struct ext4_dir_entry_2 **res_dir,
+- int *inlined)
++ int *inlined, struct htree_lock *lck)
+ {
+ struct super_block *sb;
+ struct buffer_head *bh_use[NAMEI_RA_SIZE];
+@@ -1493,7 +1812,7 @@
+ goto restart;
+ }
+ if (is_dx(dir)) {
+- ret = ext4_dx_find_entry(dir, fname, res_dir);
++ ret = ext4_dx_find_entry(dir, fname, res_dir, lck);
+ /*
+ * On success, or if the error was file not found,
+ * return. Otherwise, fall back to doing a search the
+@@ -1503,6 +1822,7 @@
+ goto cleanup_and_exit;
+ dxtrace(printk(KERN_DEBUG "ext4_find_entry: dx failed, "
+ "falling back\n"));
++ ext4_htree_safe_relock(lck);
+ ret = NULL;
+ }
+ nblocks = dir->i_size >> EXT4_BLOCK_SIZE_BITS(sb);
+@@ -1591,10 +1911,10 @@
+ return ret;
+ }
+
+-static struct buffer_head *ext4_find_entry(struct inode *dir,
++struct buffer_head *ext4_find_entry_locked(struct inode *dir,
+ const struct qstr *d_name,
+ struct ext4_dir_entry_2 **res_dir,
+- int *inlined)
++ int *inlined, struct htree_lock *lck)
+ {
+ int err;
+ struct ext4_filename fname;
+@@ -1606,12 +1926,14 @@
+ if (err)
+ return ERR_PTR(err);
+
+- bh = __ext4_find_entry(dir, &fname, res_dir, inlined);
++ bh = __ext4_find_entry(dir, &fname, res_dir, inlined, lck);
+
+ ext4_fname_free_filename(&fname);
+ return bh;
+ }
+
++EXPORT_SYMBOL(ext4_find_entry_locked);
++
+ static struct buffer_head *ext4_lookup_entry(struct inode *dir,
+ struct dentry *dentry,
+ struct ext4_dir_entry_2 **res_dir)
+@@ -1626,7 +1948,7 @@
+ if (err)
+ return ERR_PTR(err);
+
+- bh = __ext4_find_entry(dir, &fname, res_dir, NULL);
++ bh = __ext4_find_entry(dir, &fname, res_dir, NULL, NULL);
+
+ ext4_fname_free_filename(&fname);
+ return bh;
+@@ -1634,7 +1956,8 @@
+
+ static struct buffer_head * ext4_dx_find_entry(struct inode *dir,
+ struct ext4_filename *fname,
+- struct ext4_dir_entry_2 **res_dir)
++ struct ext4_dir_entry_2 **res_dir,
++ struct htree_lock *lck)
+ {
+ struct super_block * sb = dir->i_sb;
+ struct dx_frame frames[EXT4_HTREE_LEVEL], *frame;
+@@ -1645,7 +1968,7 @@
+ #ifdef CONFIG_FS_ENCRYPTION
+ *res_dir = NULL;
+ #endif
+- frame = dx_probe(fname, dir, NULL, frames);
++ frame = dx_probe(fname, dir, NULL, frames, lck);
+ if (IS_ERR(frame))
+ return (struct buffer_head *) frame;
+ do {
+@@ -1667,7 +1990,7 @@
+
+ /* Check to see if we should continue to search */
+ retval = ext4_htree_next_block(dir, fname->hinfo.hash, frame,
+- frames, NULL);
++ frames, NULL, lck);
+ if (retval < 0) {
+ ext4_warning_inode(dir,
+ "error %d reading directory index block",
+@@ -1847,8 +2170,9 @@
+ * Returns pointer to de in block into which the new entry will be inserted.
+ */
+ static struct ext4_dir_entry_2 *do_split(handle_t *handle, struct inode *dir,
+- struct buffer_head **bh,struct dx_frame *frame,
+- struct dx_hash_info *hinfo)
++ struct buffer_head **bh, struct dx_frame *frames,
++ struct dx_frame *frame, struct dx_hash_info *hinfo,
++ struct htree_lock *lck)
+ {
+ unsigned blocksize = dir->i_sb->s_blocksize;
+ unsigned count, continued;
+@@ -1909,8 +2233,14 @@
+ hash2, split, count-split));
+
+ /* Fancy dance to stay within two buffers */
+- de2 = dx_move_dirents(data1, data2, map + split, count - split,
+- blocksize);
++ if (hinfo->hash < hash2) {
++ de2 = dx_move_dirents(data1, data2, map + split,
++ count - split, blocksize);
++ } else {
++ /* make sure we will add the entry to the same block that
++ * we have already locked */
++ de2 = dx_move_dirents(data1, data2, map, split, blocksize);
++ }
+ de = dx_pack_dirents(data1, blocksize);
+ de->rec_len = ext4_rec_len_to_disk(data1 + (blocksize - csum_size) -
+ (char *) de,
+@@ -1928,12 +2258,21 @@
+ dxtrace(dx_show_leaf(dir, hinfo, (struct ext4_dir_entry_2 *) data2,
+ blocksize, 1));
+
+- /* Which block gets the new entry? */
+- if (hinfo->hash >= hash2) {
+- swap(*bh, bh2);
+- de = de2;
+- }
++ ext4_htree_spin_lock(lck, frame > frames ? (frame - 1)->at : NULL,
++ frame->at); /* notify block is being split */
++ if (hinfo->hash < hash2) {
+ dx_insert_block(frame, hash2 + continued, newblock);
++
++ } else {
++ /* switch block number */
++ dx_insert_block(frame, hash2 + continued,
++ dx_get_block(frame->at));
++ dx_set_block(frame->at, newblock);
++ (frame->at)++;
++ }
++ ext4_htree_spin_unlock(lck);
++ ext4_htree_dx_unlock(lck);
++
+ err = ext4_handle_dirty_dirblock(handle, dir, bh2);
+ if (err)
+ goto journal_error;
+@@ -2203,7 +2542,7 @@
+ if (retval)
+ goto out_frames;
+
+- de = do_split(handle,dir, &bh2, frame, &fname->hinfo);
++ de = do_split(handle, dir, &bh2, frames, frame, &fname->hinfo, NULL);
+ if (IS_ERR(de)) {
+ retval = PTR_ERR(de);
+ goto out_frames;
+@@ -2313,8 +2652,8 @@
+ * may not sleep between calling this and putting something into
+ * the entry, as someone else might have used it while you slept.
+ */
+-static int ext4_add_entry(handle_t *handle, struct dentry *dentry,
+- struct inode *inode)
++int ext4_add_entry_locked(handle_t *handle, struct dentry *dentry,
++ struct inode *inode, struct htree_lock *lck)
+ {
+ struct inode *dir = d_inode(dentry->d_parent);
+ struct buffer_head *bh = NULL;
+@@ -2362,9 +2701,10 @@
+ if (dentry->d_name.len == 2 &&
+ memcmp(dentry->d_name.name, "..", 2) == 0)
+ return ext4_update_dotdot(handle, dentry, inode);
+- retval = ext4_dx_add_entry(handle, &fname, dir, inode);
++ retval = ext4_dx_add_entry(handle, &fname, dir, inode, lck);
+ if (!retval || (retval != ERR_BAD_DX_DIR))
+ goto out;
++ ext4_htree_safe_relock(lck);
+ /* Can we just ignore htree data? */
+ if (ext4_has_metadata_csum(sb)) {
+ EXT4_ERROR_INODE(dir,
+@@ -2425,12 +2765,14 @@
+ ext4_set_inode_state(inode, EXT4_STATE_NEWENTRY);
+ return retval;
+ }
++EXPORT_SYMBOL(ext4_add_entry_locked);
+
+ /*
+ * Returns 0 for success, or a negative error value
+ */
+ static int ext4_dx_add_entry(handle_t *handle, struct ext4_filename *fname,
+- struct inode *dir, struct inode *inode)
++ struct inode *dir, struct inode *inode,
++ struct htree_lock *lck)
+ {
+ struct dx_frame frames[EXT4_HTREE_LEVEL], *frame;
+ struct dx_entry *entries, *at;
+@@ -2442,7 +2784,7 @@
+
+ again:
+ restart = 0;
+- frame = dx_probe(fname, dir, NULL, frames);
++ frame = dx_probe(fname, dir, NULL, frames, lck);
+ if (IS_ERR(frame))
+ return PTR_ERR(frame);
+ entries = frame->entries;
+@@ -2477,6 +2819,12 @@
+ struct dx_node *node2;
+ struct buffer_head *bh2;
+
++ if (!ext4_htree_safe_locked(lck)) { /* retry with EX lock */
++ ext4_htree_safe_relock(lck);
++ restart = 1;
++ goto cleanup;
++ }
++
+ while (frame > frames) {
+ if (dx_get_count((frame - 1)->entries) <
+ dx_get_limit((frame - 1)->entries)) {
+@@ -2579,8 +2927,32 @@
+ restart = 1;
+ goto journal_error;
+ }
++ } else if (!ext4_htree_dx_locked(lck)) {
++ struct ext4_dir_lock_data *ld = ext4_htree_lock_data(lck);
++
++ /* not well protected, require DX lock */
++ ext4_htree_dx_need_lock(lck);
++ at = frame > frames ? (frame - 1)->at : NULL;
++
++ /* NB: no risk of deadlock because it's just a try.
++ *
++ * NB: we check ld_count twice, the first time before
++ * taking the DX lock, the second time while holding it.
++ *
++ * NB: we never free directory blocks so far, which
++ * means the value returned by dx_get_count() should equal
++ * ld->ld_count if nobody split any DE-block under @at,
++ * and ld->ld_at still points to a valid dx_entry. */
++ if ((ld->ld_count != dx_get_count(entries)) ||
++ !ext4_htree_dx_lock_try(lck, at) ||
++ (ld->ld_count != dx_get_count(entries))) {
++ restart = 1;
++ goto cleanup;
++ }
++ /* OK, I've got DX lock and nothing changed */
++ frame->at = ld->ld_at;
+ }
+- de = do_split(handle, dir, &bh, frame, &fname->hinfo);
++ de = do_split(handle, dir, &bh, frames, frame, &fname->hinfo, lck);
+ if (IS_ERR(de)) {
+ err = PTR_ERR(de);
+ goto cleanup;
+@@ -2591,6 +2963,8 @@
+ journal_error:
+ ext4_std_error(dir->i_sb, err); /* this is a no-op if err == 0 */
+ cleanup:
++ ext4_htree_dx_unlock(lck);
++ ext4_htree_de_unlock(lck);
+ brelse(bh);
+ dx_release(frames);
+ /* @restart is true means htree-path has been changed, we need to
+diff -wur a/fs/ext4/super.c b/fs/ext4/super.c
+--- a/fs/ext4/super.c 2020-08-30 12:06:02.746523498 -0600
++++ b/fs/ext4/super.c 2020-08-30 12:07:32.345927785 -0600
+@@ -1087,6 +1087,7 @@
+
+ inode_set_iversion(&ei->vfs_inode, 1);
+ spin_lock_init(&ei->i_raw_lock);
++ sema_init(&ei->i_append_sem, 1);
+ INIT_LIST_HEAD(&ei->i_prealloc_list);
+ spin_lock_init(&ei->i_prealloc_lock);
+ ext4_es_init_tree(&ei->i_es_tree);