Whamcloud - gitweb
15e49105c6465a4ba75941a561a11d7f62ce4c49
[fs/lustre-release.git] / ldiskfs / kernel_patches / patches / rhel9.2 / ext4-pdirop.patch
1 Subject: [PATCH] ext4-pdirop
2
3  In a typical use case an application creates a separate output file for each
4  node and task in a job. As nodes and tasks increase, hundreds of thousands of
5  files may be created in a single directory within a short window of time.
6  Today, both filename lookup and file system modifying operations (such as
7  create and unlink) are protected with a single lock for an entire ldiskfs
8  directory. The PDO project will remove this bottleneck by introducing a parallel
9  locking mechanism for entire ldiskfs directories. This work will enable
10  multiple application threads to simultaneously lookup, create and unlink in
11  parallel.
12
13 This patch contains:
14  - pdirops support for ldiskfs
15  - integrate with osd-ldiskfs
16 Signed-off-by: Liang Zhen <liang.zhen@intel.com>
17 Change-Id: I269c0e3112e68f3acd79e860dab052a68c7d7aaa
18 ---
19  fs/ext4/Makefile |   1 +
20  fs/ext4/ext4.h   |  78 ++++++++
21  fs/ext4/namei.c  | 467 ++++++++++++++++++++++++++++++++++++++++++-----
22  fs/ext4/super.c  |   1 +
23  4 files changed, 505 insertions(+), 42 deletions(-)
24
25 diff --git a/fs/ext4/Makefile b/fs/ext4/Makefile
26 index 72206a2..1d15a3a 100644
27 --- a/fs/ext4/Makefile
28 +++ b/fs/ext4/Makefile
29 @@ -7,6 +7,7 @@ obj-$(CONFIG_EXT4_FS) += ext4.o
30  
31  ext4-y := balloc.o bitmap.o block_validity.o dir.o ext4_jbd2.o extents.o \
32                 extents_status.o file.o fsmap.o fsync.o hash.o ialloc.o \
33 +               htree_lock.o \
34                 indirect.o inline.o inode.o ioctl.o mballoc.o migrate.o \
35                 mmp.o move_extent.o namei.o page-io.o readpage.o resize.o \
36                 super.o symlink.o sysfs.o xattr.o xattr_hurd.o xattr_trusted.o \
37 diff --git a/fs/ext4/ext4.h b/fs/ext4/ext4.h
38 index 9986f39..cbb58c7 100644
39 --- a/fs/ext4/ext4.h
40 +++ b/fs/ext4/ext4.h
41 @@ -29,6 +29,7 @@
42  #include <linux/mutex.h>
43  #include <linux/timer.h>
44  #include <linux/wait.h>
45 +#include <linux/htree_lock.h>
46  #include <linux/sched/signal.h>
47  #include <linux/blockgroup_lock.h>
48  #include <linux/percpu_counter.h>
49 @@ -1030,6 +1031,9 @@ struct ext4_inode_info {
50         __u32   i_dtime;
51         ext4_fsblk_t    i_file_acl;
52  
53 +       /* following fields for parallel directory operations -bzzz */
54 +       struct semaphore i_append_sem;
55 +
56         /*
57          * i_block_group is the number of the block group which contains
58          * this file's inode.  Constant across the lifetime of the inode,
59 @@ -2566,6 +2570,72 @@ struct dx_hash_info
60   */
61  #define HASH_NB_ALWAYS         1
62  
63 +/* assume name-hash is protected by upper layer */
64 +#define EXT4_HTREE_LOCK_HASH   0
65 +
66 +enum ext4_pdo_lk_types {
67 +#if EXT4_HTREE_LOCK_HASH
68 +       EXT4_LK_HASH,
69 +#endif
70 +       EXT4_LK_DX,             /* index block */
71 +       EXT4_LK_DE,             /* directory entry block */
72 +       EXT4_LK_SPIN,           /* spinlock */
73 +       EXT4_LK_MAX,
74 +};
75 +
76 +/* read-only bit */
77 +#define EXT4_LB_RO(b)          (1 << (b))
78 +/* read + write, high bits for writer */
79 +#define EXT4_LB_RW(b)          ((1 << (b)) | (1 << (EXT4_LK_MAX + (b))))
80 +
81 +enum ext4_pdo_lock_bits {
82 +       /* DX lock bits */
83 +       EXT4_LB_DX_RO           = EXT4_LB_RO(EXT4_LK_DX),
84 +       EXT4_LB_DX              = EXT4_LB_RW(EXT4_LK_DX),
85 +       /* DE lock bits */
86 +       EXT4_LB_DE_RO           = EXT4_LB_RO(EXT4_LK_DE),
87 +       EXT4_LB_DE              = EXT4_LB_RW(EXT4_LK_DE),
88 +       /* DX spinlock bits */
89 +       EXT4_LB_SPIN_RO         = EXT4_LB_RO(EXT4_LK_SPIN),
90 +       EXT4_LB_SPIN            = EXT4_LB_RW(EXT4_LK_SPIN),
91 +       /* accurate searching */
92 +       EXT4_LB_EXACT           = EXT4_LB_RO(EXT4_LK_MAX << 1),
93 +};
94 +
95 +enum ext4_pdo_lock_opc {
96 +       /* external */
97 +       EXT4_HLOCK_READDIR      = (EXT4_LB_DE_RO | EXT4_LB_DX_RO),
98 +       EXT4_HLOCK_LOOKUP       = (EXT4_LB_DE_RO | EXT4_LB_SPIN_RO |
99 +                                  EXT4_LB_EXACT),
100 +       EXT4_HLOCK_DEL          = (EXT4_LB_DE | EXT4_LB_SPIN_RO |
101 +                                  EXT4_LB_EXACT),
102 +       EXT4_HLOCK_ADD          = (EXT4_LB_DE | EXT4_LB_SPIN_RO),
103 +
104 +       /* internal */
105 +       EXT4_HLOCK_LOOKUP_SAFE  = (EXT4_LB_DE_RO | EXT4_LB_DX_RO |
106 +                                  EXT4_LB_EXACT),
107 +       EXT4_HLOCK_DEL_SAFE     = (EXT4_LB_DE | EXT4_LB_DX_RO | EXT4_LB_EXACT),
108 +       EXT4_HLOCK_SPLIT        = (EXT4_LB_DE | EXT4_LB_DX | EXT4_LB_SPIN),
109 +};
110 +
111 +extern struct htree_lock_head *ext4_htree_lock_head_alloc(unsigned hbits);
112 +#define ext4_htree_lock_head_free(lhead)       htree_lock_head_free(lhead)
113 +
114 +extern struct htree_lock *ext4_htree_lock_alloc(void);
115 +#define ext4_htree_lock_free(lck)              htree_lock_free(lck)
116 +
117 +extern void ext4_htree_lock(struct htree_lock *lck,
118 +                           struct htree_lock_head *lhead,
119 +                           struct inode *dir, unsigned flags);
120 +#define ext4_htree_unlock(lck)                  htree_unlock(lck)
121 +
122 +extern struct buffer_head *ext4_find_entry_locked(struct inode *dir,
123 +                                       const struct qstr *d_name,
124 +                                       struct ext4_dir_entry_2 **res_dir,
125 +                                       int *inlined, struct htree_lock *lck);
126 +extern int ext4_add_entry_locked(handle_t *handle, struct dentry *dentry,
127 +                     struct inode *inode, struct htree_lock *lck);
128 +
129  struct ext4_filename {
130         const struct qstr *usr_fname;
131         struct fscrypt_str disk_name;
132 @@ -2896,12 +2966,20 @@ void ext4_insert_dentry(struct inode *dir, struct inode *inode,
133                         void *data);
134  static inline void ext4_update_dx_flag(struct inode *inode)
135  {
136 +       /* Disable it for ldiskfs, because going from a DX directory to
137 +        * a non-DX directory while it is in use will completely break
138 +        * the htree-locking.
139 +        * If we really want to support this operation in the future,
140 +        * we need to exclusively lock the directory at here which will
141 +        * increase complexity of code */
142 +#if 0
143         if (!ext4_has_feature_dir_index(inode->i_sb) &&
144             ext4_test_inode_flag(inode, EXT4_INODE_INDEX)) {
145                 /* ext4_iget() should have caught this... */
146                 WARN_ON_ONCE(ext4_has_feature_metadata_csum(inode->i_sb));
147                 ext4_clear_inode_flag(inode, EXT4_INODE_INDEX);
148         }
149 +#endif
150  }
151  static const unsigned char ext4_filetype_table[] = {
152         DT_UNKNOWN, DT_REG, DT_DIR, DT_CHR, DT_BLK, DT_FIFO, DT_SOCK, DT_LNK
153 diff --git a/fs/ext4/namei.c b/fs/ext4/namei.c
154 index 54bbe22..9b20bc5 100644
155 --- a/fs/ext4/namei.c
156 +++ b/fs/ext4/namei.c
157 @@ -56,6 +56,7 @@ struct buffer_head *ext4_append(handle_t *handle,
158  {
159         struct ext4_map_blocks map;
160         struct buffer_head *bh;
161 +       struct ext4_inode_info *ei = EXT4_I(inode);
162         int err;
163  
164         if (unlikely(EXT4_SB(inode->i_sb)->s_max_dir_size_kb &&
165 @@ -63,6 +64,10 @@ struct buffer_head *ext4_append(handle_t *handle,
166                       EXT4_SB(inode->i_sb)->s_max_dir_size_kb)))
167                 return ERR_PTR(-ENOSPC);
168  
169 +       /* with parallel dir operations all appends
170 +       * have to be serialized -bzzz */
171 +       down(&ei->i_append_sem);
172 +
173         *block = inode->i_size >> inode->i_sb->s_blocksize_bits;
174         map.m_lblk = *block;
175         map.m_len = 1;
176 @@ -74,15 +79,18 @@ struct buffer_head *ext4_append(handle_t *handle,
177          */
178         err = ext4_map_blocks(NULL, inode, &map, 0);
179         if (err < 0)
180 -               return ERR_PTR(err);
181 +               goto err_unlock;
182         if (err) {
183                 EXT4_ERROR_INODE(inode, "Logical block already allocated");
184 -               return ERR_PTR(-EFSCORRUPTED);
185 +               err = -EFSCORRUPTED;
186 +               goto err_unlock;
187         }
188  
189         bh = ext4_bread(handle, inode, *block, EXT4_GET_BLOCKS_CREATE);
190 -       if (IS_ERR(bh))
191 +       if (IS_ERR(bh)) {
192 +               up(&ei->i_append_sem);
193                 return bh;
194 +       }
195         inode->i_size += inode->i_sb->s_blocksize;
196         EXT4_I(inode)->i_disksize = inode->i_size;
197         err = ext4_mark_inode_dirty(handle, inode);
198 @@ -93,11 +101,14 @@ struct buffer_head *ext4_append(handle_t *handle,
199                                             EXT4_JTR_NONE);
200         if (err)
201                 goto out;
202 +       up(&ei->i_append_sem);
203         return bh;
204  
205  out:
206         brelse(bh);
207         ext4_std_error(inode->i_sb, err);
208 +err_unlock:
209 +       up(&ei->i_append_sem);
210         return ERR_PTR(err);
211  }
212  
213 @@ -296,7 +307,8 @@ static unsigned dx_node_limit(struct inode *dir);
214  static struct dx_frame *dx_probe(struct ext4_filename *fname,
215                                  struct inode *dir,
216                                  struct dx_hash_info *hinfo,
217 -                                struct dx_frame *frame);
218 +                                struct dx_frame *frame,
219 +                                struct htree_lock *lck);
220  static void dx_release(struct dx_frame *frames, struct inode *dir);
221  static int dx_make_map(struct inode *dir, struct buffer_head *bh,
222                        struct dx_hash_info *hinfo,
223 @@ -312,12 +324,13 @@ static void dx_insert_block(struct dx_frame *frame,
224  static int ext4_htree_next_block(struct inode *dir, __u32 hash,
225                                  struct dx_frame *frame,
226                                  struct dx_frame *frames,
227 -                                __u32 *start_hash);
228 +                                __u32 *start_hash, struct htree_lock *lck);
229  static struct buffer_head * ext4_dx_find_entry(struct inode *dir,
230                 struct ext4_filename *fname,
231 -               struct ext4_dir_entry_2 **res_dir);
232 +               struct ext4_dir_entry_2 **res_dir, struct htree_lock *lck);
233  static int ext4_dx_add_entry(handle_t *handle, struct ext4_filename *fname,
234 -                            struct inode *dir, struct inode *inode);
235 +                            struct inode *dir, struct inode *inode,
236 +                            struct htree_lock *lck);
237  
238  /* checksumming functions */
239  void ext4_initialize_dirent_tail(struct buffer_head *bh,
240 @@ -802,6 +815,227 @@ static inline void htree_rep_invariant_check(struct dx_entry *at,
241  }
242  #endif /* DX_DEBUG */
243  
244 +/* private data for htree_lock */
245 +struct ext4_dir_lock_data {
246 +       unsigned                ld_flags;  /* bits-map for lock types */
247 +       unsigned                ld_count;  /* # entries of the last DX block */
248 +       struct dx_entry         ld_at_entry; /* copy of leaf dx_entry */
249 +       struct dx_entry         *ld_at;    /* position of leaf dx_entry */
250 +};
251 +
252 +#define ext4_htree_lock_data(l)        ((struct ext4_dir_lock_data *)(l)->lk_private)
253 +#define ext4_find_entry(dir, name, dirent, inline) \
254 +                       ext4_find_entry_locked(dir, name, dirent, inline, NULL)
255 +#define ext4_add_entry(handle, dentry, inode) \
256 +                       ext4_add_entry_locked(handle, dentry, inode, NULL)
257 +
258 +/* NB: ext4_lblk_t is 32 bits so we use high bits to identify invalid blk */
259 +#define EXT4_HTREE_NODE_CHANGED        (0xcafeULL << 32)
260 +
261 +static void ext4_htree_event_cb(void *target, void *event)
262 +{
263 +       u64 *block = (u64 *)target;
264 +
265 +       if (*block == dx_get_block((struct dx_entry *)event))
266 +               *block = EXT4_HTREE_NODE_CHANGED;
267 +}
268 +
269 +struct htree_lock_head *ext4_htree_lock_head_alloc(unsigned hbits)
270 +{
271 +       struct htree_lock_head *lhead;
272 +
273 +       lhead = htree_lock_head_alloc(EXT4_LK_MAX, hbits, 0);
274 +       if (lhead != NULL) {
275 +               htree_lock_event_attach(lhead, EXT4_LK_SPIN, HTREE_EVENT_WR,
276 +                                       ext4_htree_event_cb);
277 +       }
278 +       return lhead;
279 +}
280 +EXPORT_SYMBOL(ext4_htree_lock_head_alloc);
281 +
282 +struct htree_lock *ext4_htree_lock_alloc(void)
283 +{
284 +       return htree_lock_alloc(EXT4_LK_MAX,
285 +                               sizeof(struct ext4_dir_lock_data));
286 +}
287 +EXPORT_SYMBOL(ext4_htree_lock_alloc);
288 +
289 +static htree_lock_mode_t ext4_htree_mode(unsigned flags)
290 +{
291 +       switch (flags) {
292 +       default: /* 0 or unknown flags require EX lock */
293 +               return HTREE_LOCK_EX;
294 +       case EXT4_HLOCK_READDIR:
295 +               return HTREE_LOCK_PR;
296 +       case EXT4_HLOCK_LOOKUP:
297 +               return HTREE_LOCK_CR;
298 +       case EXT4_HLOCK_DEL:
299 +       case EXT4_HLOCK_ADD:
300 +               return HTREE_LOCK_CW;
301 +       }
302 +}
303 +
304 +/* return PR for read-only operations, otherwise return EX */
305 +static inline htree_lock_mode_t ext4_htree_safe_mode(unsigned flags)
306 +{
307 +       int writer = (flags & EXT4_LB_DE) == EXT4_LB_DE;
308 +
309 +       /* 0 requires EX lock */
310 +       return (flags == 0 || writer) ? HTREE_LOCK_EX : HTREE_LOCK_PR;
311 +}
312 +
313 +static int ext4_htree_safe_locked(struct htree_lock *lck)
314 +{
315 +       int writer;
316 +
317 +       if (lck == NULL || lck->lk_mode == HTREE_LOCK_EX)
318 +               return 1;
319 +
320 +       writer = (ext4_htree_lock_data(lck)->ld_flags & EXT4_LB_DE) ==
321 +                EXT4_LB_DE;
322 +       if (writer) /* all readers & writers are excluded? */
323 +               return lck->lk_mode == HTREE_LOCK_EX;
324 +
325 +       /* all writers are excluded? */
326 +       return lck->lk_mode == HTREE_LOCK_PR ||
327 +              lck->lk_mode == HTREE_LOCK_PW ||
328 +              lck->lk_mode == HTREE_LOCK_EX;
329 +}
330 +
331 +/* relock htree_lock with EX mode if it's a change operation, otherwise
332 + * relock it with PR mode. It's noop if PDO is disabled. */
333 +static void ext4_htree_safe_relock(struct htree_lock *lck)
334 +{
335 +       if (!ext4_htree_safe_locked(lck)) {
336 +               unsigned flags = ext4_htree_lock_data(lck)->ld_flags;
337 +
338 +               htree_change_lock(lck, ext4_htree_safe_mode(flags));
339 +       }
340 +}
341 +
342 +void ext4_htree_lock(struct htree_lock *lck, struct htree_lock_head *lhead,
343 +                    struct inode *dir, unsigned flags)
344 +{
345 +       htree_lock_mode_t mode = is_dx(dir) ? ext4_htree_mode(flags) :
346 +                                             ext4_htree_safe_mode(flags);
347 +
348 +       ext4_htree_lock_data(lck)->ld_flags = flags;
349 +       htree_lock(lck, lhead, mode);
350 +       if (!is_dx(dir))
351 +               ext4_htree_safe_relock(lck); /* make sure it's safe locked */
352 +}
353 +EXPORT_SYMBOL(ext4_htree_lock);
354 +
355 +static int ext4_htree_node_lock(struct htree_lock *lck, struct dx_entry *at,
356 +                               unsigned lmask, int wait, void *ev)
357 +{
358 +       u32     key = (at == NULL) ? 0 : dx_get_block(at);
359 +       u32     mode;
360 +
361 +       /* NOOP if htree is well protected or caller doesn't require the lock */
362 +       if (ext4_htree_safe_locked(lck) ||
363 +          !(ext4_htree_lock_data(lck)->ld_flags & lmask))
364 +               return 1;
365 +
366 +       mode = (ext4_htree_lock_data(lck)->ld_flags & lmask) == lmask ?
367 +               HTREE_LOCK_PW : HTREE_LOCK_PR;
368 +       while (1) {
369 +               if (htree_node_lock_try(lck, mode, key, ffz(~lmask), wait, ev))
370 +                       return 1;
371 +               if (!(lmask & EXT4_LB_SPIN)) /* not a spinlock */
372 +                       return 0;
373 +               cpu_relax(); /* spin until granted */
374 +       }
375 +}
376 +
377 +static int ext4_htree_node_locked(struct htree_lock *lck, unsigned lmask)
378 +{
379 +       return ext4_htree_safe_locked(lck) ||
380 +              htree_node_is_granted(lck, ffz(~lmask));
381 +}
382 +
383 +static void ext4_htree_node_unlock(struct htree_lock *lck,
384 +                                  unsigned lmask, void *buf)
385 +{
386 +       /* NB: it's safe to call multiple times or even if it's not locked */
387 +       if (!ext4_htree_safe_locked(lck) &&
388 +            htree_node_is_granted(lck, ffz(~lmask)))
389 +               htree_node_unlock(lck, ffz(~lmask), buf);
390 +}
391 +
392 +#define ext4_htree_dx_lock(lck, key)           \
393 +       ext4_htree_node_lock(lck, key, EXT4_LB_DX, 1, NULL)
394 +#define ext4_htree_dx_lock_try(lck, key)       \
395 +       ext4_htree_node_lock(lck, key, EXT4_LB_DX, 0, NULL)
396 +#define ext4_htree_dx_unlock(lck)              \
397 +       ext4_htree_node_unlock(lck, EXT4_LB_DX, NULL)
398 +#define ext4_htree_dx_locked(lck)              \
399 +       ext4_htree_node_locked(lck, EXT4_LB_DX)
400 +
401 +static void ext4_htree_dx_need_lock(struct htree_lock *lck)
402 +{
403 +       struct ext4_dir_lock_data *ld;
404 +
405 +       if (ext4_htree_safe_locked(lck))
406 +               return;
407 +
408 +       ld = ext4_htree_lock_data(lck);
409 +       switch (ld->ld_flags) {
410 +       default:
411 +               return;
412 +       case EXT4_HLOCK_LOOKUP:
413 +               ld->ld_flags = EXT4_HLOCK_LOOKUP_SAFE;
414 +               return;
415 +       case EXT4_HLOCK_DEL:
416 +               ld->ld_flags = EXT4_HLOCK_DEL_SAFE;
417 +               return;
418 +       case EXT4_HLOCK_ADD:
419 +               ld->ld_flags = EXT4_HLOCK_SPLIT;
420 +               return;
421 +       }
422 +}
423 +
424 +#define ext4_htree_de_lock(lck, key)           \
425 +       ext4_htree_node_lock(lck, key, EXT4_LB_DE, 1, NULL)
426 +#define ext4_htree_de_unlock(lck)              \
427 +       ext4_htree_node_unlock(lck, EXT4_LB_DE, NULL)
428 +
429 +#define ext4_htree_spin_lock(lck, key, event)  \
430 +       ext4_htree_node_lock(lck, key, EXT4_LB_SPIN, 0, event)
431 +#define ext4_htree_spin_unlock(lck)            \
432 +       ext4_htree_node_unlock(lck, EXT4_LB_SPIN, NULL)
433 +#define ext4_htree_spin_unlock_listen(lck, p)  \
434 +       ext4_htree_node_unlock(lck, EXT4_LB_SPIN, p)
435 +
436 +static void ext4_htree_spin_stop_listen(struct htree_lock *lck)
437 +{
438 +       if (!ext4_htree_safe_locked(lck) &&
439 +           htree_node_is_listening(lck, ffz(~EXT4_LB_SPIN)))
440 +               htree_node_stop_listen(lck, ffz(~EXT4_LB_SPIN));
441 +}
442 +
443 +enum {
444 +       DX_HASH_COL_IGNORE,     /* ignore collision while probing frames */
445 +       DX_HASH_COL_YES,        /* there is collision and it does matter */
446 +       DX_HASH_COL_NO,         /* there is no collision */
447 +};
448 +
449 +static int dx_probe_hash_collision(struct htree_lock *lck,
450 +                                  struct dx_entry *entries,
451 +                                  struct dx_entry *at, u32 hash)
452 +{
453 +       if (!(lck && ext4_htree_lock_data(lck)->ld_flags & EXT4_LB_EXACT)) {
454 +               return DX_HASH_COL_IGNORE; /* don't care about collision */
455 +
456 +       } else if (at == entries + dx_get_count(entries) - 1) {
457 +               return DX_HASH_COL_IGNORE; /* not in any leaf of this DX */
458 +
459 +       } else { /* hash collision? */
460 +               return ((dx_get_hash(at + 1) & ~1) == hash) ?
461 +                       DX_HASH_COL_YES : DX_HASH_COL_NO;
462 +       }
463 +}
464 +
465  /*
466   * Probe for a directory leaf block to search.
467   *
468 @@ -813,10 +1047,11 @@ static inline void htree_rep_invariant_check(struct dx_entry *at,
469   */
470  static struct dx_frame *
471  dx_probe(struct ext4_filename *fname, struct inode *dir,
472 -        struct dx_hash_info *hinfo, struct dx_frame *frame_in)
473 +        struct dx_hash_info *hinfo, struct dx_frame *frame_in,
474 +        struct htree_lock *lck)
475  {
476         unsigned count, indirect, level, i;
477 -       struct dx_entry *at, *entries, *p, *q, *m;
478 +       struct dx_entry *at, *entries, *p, *q, *m, *dx = NULL;
479         struct dx_root_info *info;
480         struct dx_frame *frame = frame_in;
481         struct dx_frame *ret_err = ERR_PTR(ERR_BAD_DX_DIR);
482 @@ -900,8 +1135,16 @@ dx_probe(struct ext4_filename *fname, struct inode *dir,
483         level = 0;
484         blocks[0] = 0;
485         while (1) {
486 +               if (indirect == level) { /* the last index level */
487 +                       /* NB: ext4_htree_dx_lock() could be noop if
488 +                        * DX-lock flag is not set for current operation
489 +                        */
490 +                       ext4_htree_dx_lock(lck, dx);
491 +                       ext4_htree_spin_lock(lck, dx, NULL);
492 +               }
493                 count = dx_get_count(entries);
494                 if (!count || count > dx_get_limit(entries)) {
495 +                       ext4_htree_spin_unlock(lck); /* release spin */
496                         ext4_warning_inode(dir,
497                                            "dx entry: count %u beyond limit %u",
498                                            count, dx_get_limit(entries));
499 @@ -928,6 +1171,74 @@ dx_probe(struct ext4_filename *fname, struct inode *dir,
500                 frame->entries = entries;
501                 frame->at = at;
502  
503 +               if (indirect == level) { /* the last index level */
504 +                       struct ext4_dir_lock_data *ld;
505 +                       u64 myblock;
506 +
507 +                       /* By default we only lock DE-block, however, we will
508 +                        * also lock the last level DX-block if:
509 +                        * a) there is hash collision
510 +                        *    we will set DX-lock flag (a few lines below)
511 +                        *    and redo to lock DX-block
512 +                        *    see detail in dx_probe_hash_collision()
513 +                        * b) it's a retry from splitting
514 +                        *    we need to lock the last level DX-block so nobody
515 +                        *    else can split any leaf blocks under the same
516 +                        *    DX-block, see detail in ext4_dx_add_entry()
517 +                        */
518 +                       if (ext4_htree_dx_locked(lck)) {
519 +                               /* DX-block is locked, just lock DE-block
520 +                                * and return
521 +                                */
522 +                               ext4_htree_spin_unlock(lck);
523 +                               if (!ext4_htree_safe_locked(lck))
524 +                                       ext4_htree_de_lock(lck, frame->at);
525 +                               return frame;
526 +                       }
527 +                       /* it's pdirop and no DX lock */
528 +                       if (dx_probe_hash_collision(lck, entries, at, hash) ==
529 +                           DX_HASH_COL_YES) {
530 +                               /* found hash collision, set DX-lock flag
531 +                                * and retry to obtain DX-lock
532 +                                */
533 +                               ext4_htree_spin_unlock(lck);
534 +                               ext4_htree_dx_need_lock(lck);
535 +                               continue;
536 +                       }
537 +                       ld = ext4_htree_lock_data(lck);
538 +                       /* because I don't lock DX, so @at can't be trusted
539 +                        * after I release spinlock so I have to save it
540 +                        */
541 +                       ld->ld_at = at;
542 +                       ld->ld_at_entry = *at;
543 +                       ld->ld_count = dx_get_count(entries);
544 +
545 +                       frame->at = &ld->ld_at_entry;
546 +                       myblock = dx_get_block(at);
547 +
548 +                       /* NB: ordering locking */
549 +                       ext4_htree_spin_unlock_listen(lck, &myblock);
550 +                       /* other thread can split this DE-block because:
551 +                        * a) I don't have lock for the DE-block yet
552 +                        * b) I released spinlock on DX-block
553 +                        * if it happened I can detect it by listening
554 +                        * splitting event on this DE-block
555 +                        */
556 +                       ext4_htree_de_lock(lck, frame->at);
557 +                       ext4_htree_spin_stop_listen(lck);
558 +
559 +                       if (myblock == EXT4_HTREE_NODE_CHANGED) {
560 +                               /* someone split this DE-block before
561 +                                * I locked it, I need to retry and lock
562 +                                * valid DE-block
563 +                                */
564 +                               ext4_htree_de_unlock(lck);
565 +                               continue;
566 +                       }
567 +                       return frame;
568 +               }
569 +               dx = at;
570 +
571                 block = dx_get_block(at);
572                 for (i = 0; i <= level; i++) {
573                         if (blocks[i] == block) {
574 @@ -937,8 +1248,7 @@ dx_probe(struct ext4_filename *fname, struct inode *dir,
575                                 goto fail;
576                         }
577                 }
578 -               if (++level > indirect)
579 -                       return frame;
580 +               ++level;
581                 blocks[level] = block;
582                 frame++;
583                 frame->bh = ext4_read_dirblock(dir, block, INDEX);
584 @@ -1009,7 +1319,7 @@ static void dx_release(struct dx_frame *frames, struct inode *dir)
585  static int ext4_htree_next_block(struct inode *dir, __u32 hash,
586                                  struct dx_frame *frame,
587                                  struct dx_frame *frames,
588 -                                __u32 *start_hash)
589 +                                __u32 *start_hash, struct htree_lock *lck)
590  {
591         struct dx_frame *p;
592         struct buffer_head *bh;
593 @@ -1024,12 +1334,22 @@ static int ext4_htree_next_block(struct inode *dir, __u32 hash,
594          * this loop, num_frames indicates the number of interior
595          * nodes need to be read.
596          */
597 +       ext4_htree_de_unlock(lck);
598         while (1) {
599 -               if (++(p->at) < p->entries + dx_get_count(p->entries))
600 -                       break;
601 +               if (num_frames > 0 || ext4_htree_dx_locked(lck)) {
602 +                       /* num_frames > 0 :
603 +                        *   DX block
604 +                        * ext4_htree_dx_locked:
605 +                        *   frame->at is reliable pointer returned by dx_probe,
606 +                        *   otherwise dx_probe already knew no collision */
607 +                       if (++(p->at) < p->entries + dx_get_count(p->entries))
608 +                               break;
609 +               }
610                 if (p == frames)
611                         return 0;
612                 num_frames++;
613 +               if (num_frames == 1)
614 +                       ext4_htree_dx_unlock(lck);
615                 p--;
616         }
617  
618 @@ -1052,6 +1372,13 @@ static int ext4_htree_next_block(struct inode *dir, __u32 hash,
619          * block so no check is necessary
620          */
621         while (num_frames--) {
622 +               if (num_frames == 0) {
623 +                       /* it's not always necessary, we just don't want to
624 +                        * detect hash collision again */
625 +                       ext4_htree_dx_need_lock(lck);
626 +                       ext4_htree_dx_lock(lck, p->at);
627 +               }
628 +
629                 bh = ext4_read_dirblock(dir, dx_get_block(p->at), INDEX);
630                 if (IS_ERR(bh))
631                         return PTR_ERR(bh);
632 @@ -1060,6 +1387,7 @@ static int ext4_htree_next_block(struct inode *dir, __u32 hash,
633                 p->bh = bh;
634                 p->at = p->entries = ((struct dx_node *) bh->b_data)->entries;
635         }
636 +       ext4_htree_de_lock(lck, p->at);
637         return 1;
638  }
639  
640 @@ -1221,10 +1549,10 @@ int ext4_htree_fill_tree(struct file *dir_file, __u32 start_hash,
641         }
642         hinfo.hash = start_hash;
643         hinfo.minor_hash = 0;
644 -       frame = dx_probe(NULL, dir, &hinfo, frames);
645 +       /* assume it's PR locked */
646 +       frame = dx_probe(NULL, dir, &hinfo, frames, NULL);
647         if (IS_ERR(frame))
648                 return PTR_ERR(frame);
649 -
650         /* Add '.' and '..' from the htree header */
651         if (!start_hash && !start_minor_hash) {
652                 de = (struct ext4_dir_entry_2 *) frames[0].bh->b_data;
653 @@ -1264,7 +1592,7 @@ int ext4_htree_fill_tree(struct file *dir_file, __u32 start_hash,
654                 count += ret;
655                 hashval = ~0;
656                 ret = ext4_htree_next_block(dir, HASH_NB_ALWAYS,
657 -                                           frame, frames, &hashval);
658 +                                           frame, frames, &hashval, NULL);
659                 *next_hash = hashval;
660                 if (ret < 0) {
661                         err = ret;
662 @@ -1584,7 +1912,7 @@ static int is_dx_internal_node(struct inode *dir, ext4_lblk_t block,
663  static struct buffer_head *__ext4_find_entry(struct inode *dir,
664                                              struct ext4_filename *fname,
665                                              struct ext4_dir_entry_2 **res_dir,
666 -                                            int *inlined)
667 +                                            int *inlined, struct htree_lock *lck)
668  {
669         struct super_block *sb;
670         struct buffer_head *bh_use[NAMEI_RA_SIZE];
671 @@ -1626,7 +1954,7 @@ static struct buffer_head *__ext4_find_entry(struct inode *dir,
672                 goto restart;
673         }
674         if (is_dx(dir)) {
675 -               ret = ext4_dx_find_entry(dir, fname, res_dir);
676 +               ret = ext4_dx_find_entry(dir, fname, res_dir, lck);
677                 /*
678                  * On success, or if the error was file not found,
679                  * return.  Otherwise, fall back to doing a search the
680 @@ -1636,6 +1964,7 @@ static struct buffer_head *__ext4_find_entry(struct inode *dir,
681                         goto cleanup_and_exit;
682                 dxtrace(printk(KERN_DEBUG "ext4_find_entry: dx failed, "
683                                "falling back\n"));
684 +               ext4_htree_safe_relock(lck);
685                 ret = NULL;
686         }
687         nblocks = dir->i_size >> EXT4_BLOCK_SIZE_BITS(sb);
688 @@ -1726,10 +2055,10 @@ cleanup_and_exit:
689         return ret;
690  }
691  
692 -static struct buffer_head *ext4_find_entry(struct inode *dir,
693 +struct buffer_head *ext4_find_entry_locked(struct inode *dir,
694                                            const struct qstr *d_name,
695                                            struct ext4_dir_entry_2 **res_dir,
696 -                                          int *inlined)
697 +                                          int *inlined, struct htree_lock *lck)
698  {
699         int err;
700         struct ext4_filename fname;
701 @@ -1741,12 +2070,14 @@ static struct buffer_head *ext4_find_entry(struct inode *dir,
702         if (err)
703                 return ERR_PTR(err);
704  
705 -       bh = __ext4_find_entry(dir, &fname, res_dir, inlined);
706 +       bh = __ext4_find_entry(dir, &fname, res_dir, inlined, lck);
707  
708         ext4_fname_free_filename(&fname);
709         return bh;
710  }
711  
712 +EXPORT_SYMBOL(ext4_find_entry_locked);
713 +
714  static struct buffer_head *ext4_lookup_entry(struct inode *dir,
715                                              struct dentry *dentry,
716                                              struct ext4_dir_entry_2 **res_dir)
717 @@ -1762,7 +2093,7 @@ static struct buffer_head *ext4_lookup_entry(struct inode *dir,
718         if (err)
719                 return ERR_PTR(err);
720  
721 -       bh = __ext4_find_entry(dir, &fname, res_dir, NULL);
722 +       bh = __ext4_find_entry(dir, &fname, res_dir, NULL, NULL);
723  
724         ext4_fname_free_filename(&fname);
725         return bh;
726 @@ -1770,7 +2101,8 @@ static struct buffer_head *ext4_lookup_entry(struct inode *dir,
727  
728  static struct buffer_head * ext4_dx_find_entry(struct inode *dir,
729                         struct ext4_filename *fname,
730 -                       struct ext4_dir_entry_2 **res_dir)
731 +                       struct ext4_dir_entry_2 **res_dir,
732 +                       struct htree_lock *lck)
733  {
734         struct super_block * sb = dir->i_sb;
735         struct dx_frame frames[EXT4_HTREE_LEVEL], *frame;
736 @@ -1781,7 +2113,7 @@ static struct buffer_head * ext4_dx_find_entry(struct inode *dir,
737  #ifdef CONFIG_FS_ENCRYPTION
738         *res_dir = NULL;
739  #endif
740 -       frame = dx_probe(fname, dir, NULL, frames);
741 +       frame = dx_probe(fname, dir, NULL, frames, lck);
742         if (IS_ERR(frame))
743                 return (struct buffer_head *) frame;
744         do {
745 @@ -1803,7 +2135,7 @@ static struct buffer_head * ext4_dx_find_entry(struct inode *dir,
746  
747                 /* Check to see if we should continue to search */
748                 retval = ext4_htree_next_block(dir, fname->hinfo.hash, frame,
749 -                                              frames, NULL);
750 +                                              frames, NULL, lck);
751                 if (retval < 0) {
752                         ext4_warning_inode(dir,
753                                 "error %d reading directory index block",
754 @@ -1992,8 +2324,9 @@ static struct ext4_dir_entry_2 *dx_pack_dirents(struct inode *dir, char *base,
755   * Returns pointer to de in block into which the new entry will be inserted.
756   */
757  static struct ext4_dir_entry_2 *do_split(handle_t *handle, struct inode *dir,
758 -                       struct buffer_head **bh,struct dx_frame *frame,
759 -                       struct dx_hash_info *hinfo)
760 +                       struct buffer_head **bh, struct dx_frame *frames,
761 +                       struct dx_frame *frame, struct dx_hash_info *hinfo,
762 +                       struct htree_lock *lck)
763  {
764         unsigned blocksize = dir->i_sb->s_blocksize;
765         unsigned continued;
766 @@ -2070,8 +2403,14 @@ static struct ext4_dir_entry_2 *do_split(handle_t *handle, struct inode *dir,
767                                         hash2, split, count-split));
768  
769         /* Fancy dance to stay within two buffers */
770 -       de2 = dx_move_dirents(dir, data1, data2, map + split, count - split,
771 -                             blocksize);
772 +       if (hinfo->hash < hash2) {
773 +               de2 = dx_move_dirents(dir, data1, data2, map + split,
774 +                                     count - split, blocksize);
775 +       } else {
776 +               /* make sure we will add the entry to the same block
777 +                * that we have already locked */
778 +               de2 = dx_move_dirents(dir, data1, data2, map, split, blocksize);
779 +       }
780         de = dx_pack_dirents(dir, data1, blocksize);
781         de->rec_len = ext4_rec_len_to_disk(data1 + (blocksize - csum_size) -
782                                            (char *) de,
783 @@ -2089,12 +2428,21 @@ static struct ext4_dir_entry_2 *do_split(handle_t *handle, struct inode *dir,
784         dxtrace(dx_show_leaf(dir, hinfo, (struct ext4_dir_entry_2 *) data2,
785                         blocksize, 1));
786  
787 -       /* Which block gets the new entry? */
788 -       if (hinfo->hash >= hash2) {
789 -               swap(*bh, bh2);
790 -               de = de2;
791 +       ext4_htree_spin_lock(lck, frame > frames ? (frame - 1)->at : NULL,
792 +                            frame->at); /* notify block is being split */
793 +       if (hinfo->hash < hash2) {
794 +               dx_insert_block(frame, hash2 + continued, newblock);
795 +
796 +       } else {
797 +               /* switch block number */
798 +               dx_insert_block(frame, hash2 + continued,
799 +                               dx_get_block(frame->at));
800 +               dx_set_block(frame->at, newblock);
801 +               (frame->at)++;
802         }
803 -       dx_insert_block(frame, hash2 + continued, newblock);
804 +       ext4_htree_spin_unlock(lck);
805 +       ext4_htree_dx_unlock(lck);
806 +
807         err = ext4_handle_dirty_dirblock(handle, dir, bh2);
808         if (err)
809                 goto journal_error;
810 @@ -2402,7 +2750,7 @@ static int make_indexed_dir(handle_t *handle, struct ext4_filename *fname,
811         if (retval)
812                 goto out_frames;
813  
814 -       de = do_split(handle,dir, &bh2, frame, &fname->hinfo);
815 +       de = do_split(handle, dir, &bh2, frames, frame, &fname->hinfo, NULL);
816         if (IS_ERR(de)) {
817                 retval = PTR_ERR(de);
818                 goto out_frames;
819 @@ -2512,8 +2860,8 @@ out:
820   * may not sleep between calling this and putting something into
821   * the entry, as someone else might have used it while you slept.
822   */
823 -static int ext4_add_entry(handle_t *handle, struct dentry *dentry,
824 -                         struct inode *inode)
825 +int ext4_add_entry_locked(handle_t *handle, struct dentry *dentry,
826 +                         struct inode *inode, struct htree_lock *lck)
827  {
828         struct inode *dir = d_inode(dentry->d_parent);
829         struct buffer_head *bh = NULL;
830 @@ -2562,9 +2910,10 @@ static int ext4_add_entry(handle_t *handle, struct dentry *dentry,
831                 if (dentry->d_name.len == 2 &&
832                     memcmp(dentry->d_name.name, "..", 2) == 0)
833                         return ext4_update_dotdot(handle, dentry, inode);
834 -               retval = ext4_dx_add_entry(handle, &fname, dir, inode);
835 +               retval = ext4_dx_add_entry(handle, &fname, dir, inode, lck);
836                 if (!retval || (retval != ERR_BAD_DX_DIR))
837                         goto out;
838 +               ext4_htree_safe_relock(lck);
839                 /* Can we just ignore htree data? */
840                 if (ext4_has_metadata_csum(sb)) {
841                         EXT4_ERROR_INODE(dir,
842 @@ -2627,12 +2976,14 @@ out:
843                 ext4_set_inode_state(inode, EXT4_STATE_NEWENTRY);
844         return retval;
845  }
846 +EXPORT_SYMBOL(ext4_add_entry_locked);
847  
848  /*
849   * Returns 0 for success, or a negative error value
850   */
851  static int ext4_dx_add_entry(handle_t *handle, struct ext4_filename *fname,
852 -                            struct inode *dir, struct inode *inode)
853 +                            struct inode *dir, struct inode *inode,
854 +                            struct htree_lock *lck)
855  {
856         struct dx_frame frames[EXT4_HTREE_LEVEL], *frame;
857         struct dx_entry *entries, *at;
858 @@ -2644,7 +2995,7 @@ static int ext4_dx_add_entry(handle_t *handle, struct ext4_filename *fname,
859  
860  again:
861         restart = 0;
862 -       frame = dx_probe(fname, dir, NULL, frames);
863 +       frame = dx_probe(fname, dir, NULL, frames, lck);
864         if (IS_ERR(frame))
865                 return PTR_ERR(frame);
866         entries = frame->entries;
867 @@ -2679,6 +3030,12 @@ again:
868                 struct dx_node *node2;
869                 struct buffer_head *bh2;
870  
871 +               if (!ext4_htree_safe_locked(lck)) { /* retry with EX lock */
872 +                       ext4_htree_safe_relock(lck);
873 +                       restart = 1;
874 +                       goto cleanup;
875 +               }
876 +
877                 while (frame > frames) {
878                         if (dx_get_count((frame - 1)->entries) <
879                             dx_get_limit((frame - 1)->entries)) {
880 @@ -2782,8 +3139,32 @@ again:
881                         restart = 1;
882                         goto journal_error;
883                 }
884 +       } else if (!ext4_htree_dx_locked(lck)) {
885 +               struct ext4_dir_lock_data *ld = ext4_htree_lock_data(lck);
886 +
887 +               /* not well protected, require DX lock */
888 +               ext4_htree_dx_need_lock(lck);
889 +               at = frame > frames ? (frame - 1)->at : NULL;
890 +
891 +               /* NB: no risk of deadlock because it's just a try.
892 +                *
893 +                * NB: we check ld_count twice, the first time before
894 +                * taking the DX lock, the second time after holding it.
895 +                *
896 +                * NB: We never free blocks for directory so far, which
897 +                * means the value returned by dx_get_count() should equal
898 +                * ld->ld_count if nobody split any DE-block under @at,
899 +                * and ld->ld_at still points to valid dx_entry. */
900 +               if ((ld->ld_count != dx_get_count(entries)) ||
901 +                   !ext4_htree_dx_lock_try(lck, at) ||
902 +                   (ld->ld_count != dx_get_count(entries))) {
903 +                       restart = 1;
904 +                       goto cleanup;
905 +               }
906 +               /* OK, I've got DX lock and nothing changed */
907 +               frame->at = ld->ld_at;
908         }
909 -       de = do_split(handle, dir, &bh, frame, &fname->hinfo);
910 +       de = do_split(handle, dir, &bh, frames, frame, &fname->hinfo, lck);
911         if (IS_ERR(de)) {
912                 err = PTR_ERR(de);
913                 goto cleanup;
914 @@ -2794,6 +3175,8 @@ again:
915  journal_error:
916         ext4_std_error(dir->i_sb, err); /* this is a no-op if err == 0 */
917  cleanup:
918 +       ext4_htree_dx_unlock(lck);
919 +       ext4_htree_de_unlock(lck);
920         brelse(bh);
921         dx_release(frames, dir);
922         /* @restart is true means htree-path has been changed, we need to
923 diff --git a/fs/ext4/super.c b/fs/ext4/super.c
924 index 5a4b03a..8083662 100644
925 --- a/fs/ext4/super.c
926 +++ b/fs/ext4/super.c
927 @@ -1324,6 +1324,7 @@ static struct inode *ext4_alloc_inode(struct super_block *sb)
928  
929         inode_set_iversion(&ei->vfs_inode, 1);
930         spin_lock_init(&ei->i_raw_lock);
931 +       sema_init(&ei->i_append_sem, 1);
932         INIT_LIST_HEAD(&ei->i_prealloc_list);
933         atomic_set(&ei->i_prealloc_active, 0);
934         spin_lock_init(&ei->i_prealloc_lock);
935 -- 
936 2.34.1
937