Whamcloud - gitweb
LU-11310 ldiskfs: Repair support for SUSE 15 again
[fs/lustre-release.git] / ldiskfs / kernel_patches / patches / sles15sp1 / ext4-pdirop.patch
1 Single directory performance is critical for HPC workloads. In a
2 typical use case an application creates a separate output file for
3 each node and task in a job. As nodes and tasks increase, hundreds
4 of thousands of files may be created in a single directory within
5 a short window of time.
6 Today, both filename lookup and file system modifying operations
7 (such as create and unlink) are protected with a single lock for
8 an entire ldiskfs directory. The PDO project will remove this
9 bottleneck by introducing a parallel locking mechanism for entire
10 ldiskfs directories. This work will enable multiple application
11 threads to simultaneously lookup, create and unlink in parallel.
12
13 This patch contains:
14  - pdirops support for ldiskfs
15  - integrate with osd-ldiskfs
16
17 ---
18  fs/ext4/Makefile           |    1 
19  fs/ext4/ext4.h             |   78 +++
20  fs/ext4/namei.c            |  446 ++++++++++++++++++++--
21  fs/ext4/super.c            |    1 
22  6 files changed, 1567 insertions(+), 37 deletions(-)
23
24 --- a/fs/ext4/Makefile
25 +++ b/fs/ext4/Makefile
26 @@ -6,6 +6,7 @@ obj-$(CONFIG_EXT4_FS) += ext4.o
27  
28  ext4-y := balloc.o bitmap.o block_validity.o dir.o ext4_jbd2.o extents.o \
29                 extents_status.o file.o fsmap.o fsync.o hash.o ialloc.o \
30 +               htree_lock.o \
31                 indirect.o inline.o inode.o ioctl.o mballoc.o migrate.o \
32                 mmp.o move_extent.o namei.o page-io.o readpage.o resize.o \
33                 super.o symlink.o sysfs.o xattr.o xattr_trusted.o xattr_user.o
34 --- a/fs/ext4/ext4.h
35 +++ b/fs/ext4/ext4.h
36 @@ -28,6 +28,7 @@
37  #include <linux/timer.h>
38  #include <linux/version.h>
39  #include <linux/wait.h>
40 +#include <linux/htree_lock.h>
41  #include <linux/sched/signal.h>
42  #include <linux/blockgroup_lock.h>
43  #include <linux/percpu_counter.h>
44 @@ -980,6 +981,9 @@ struct ext4_inode_info {
45         __u32   i_dtime;
46         ext4_fsblk_t    i_file_acl;
47  
48 +       /* following fields for parallel directory operations -bzzz */
49 +       struct semaphore i_append_sem;
50 +
51         /*
52          * i_block_group is the number of the block group which contains
53          * this file's inode.  Constant across the lifetime of the inode,
54 @@ -2162,6 +2166,72 @@ struct dx_hash_info
55   */
56  #define HASH_NB_ALWAYS         1
57  
58 +/* assume name-hash is protected by upper layer */
59 +#define EXT4_HTREE_LOCK_HASH   0
60 +
61 +enum ext4_pdo_lk_types {
62 +#if EXT4_HTREE_LOCK_HASH
63 +       EXT4_LK_HASH,
64 +#endif
65 +       EXT4_LK_DX,             /* index block */
66 +       EXT4_LK_DE,             /* directory entry block */
67 +       EXT4_LK_SPIN,           /* spinlock */
68 +       EXT4_LK_MAX,
69 +};
70 +
71 +/* read-only bit */
72 +#define EXT4_LB_RO(b)          (1 << (b))
73 +/* read + write, high bits for writer */
74 +#define EXT4_LB_RW(b)          ((1 << (b)) | (1 << (EXT4_LK_MAX + (b))))
75 +
76 +enum ext4_pdo_lock_bits {
77 +       /* DX lock bits */
78 +       EXT4_LB_DX_RO           = EXT4_LB_RO(EXT4_LK_DX),
79 +       EXT4_LB_DX              = EXT4_LB_RW(EXT4_LK_DX),
80 +       /* DE lock bits */
81 +       EXT4_LB_DE_RO           = EXT4_LB_RO(EXT4_LK_DE),
82 +       EXT4_LB_DE              = EXT4_LB_RW(EXT4_LK_DE),
83 +       /* DX spinlock bits */
84 +       EXT4_LB_SPIN_RO         = EXT4_LB_RO(EXT4_LK_SPIN),
85 +       EXT4_LB_SPIN            = EXT4_LB_RW(EXT4_LK_SPIN),
86 +       /* accurate searching */
87 +       EXT4_LB_EXACT           = EXT4_LB_RO(EXT4_LK_MAX << 1),
88 +};
89 +
90 +enum ext4_pdo_lock_opc {
91 +       /* external */
92 +       EXT4_HLOCK_READDIR      = (EXT4_LB_DE_RO | EXT4_LB_DX_RO),
93 +       EXT4_HLOCK_LOOKUP       = (EXT4_LB_DE_RO | EXT4_LB_SPIN_RO |
94 +                                  EXT4_LB_EXACT),
95 +       EXT4_HLOCK_DEL          = (EXT4_LB_DE | EXT4_LB_SPIN_RO |
96 +                                  EXT4_LB_EXACT),
97 +       EXT4_HLOCK_ADD          = (EXT4_LB_DE | EXT4_LB_SPIN_RO),
98 +
99 +       /* internal */
100 +       EXT4_HLOCK_LOOKUP_SAFE  = (EXT4_LB_DE_RO | EXT4_LB_DX_RO |
101 +                                  EXT4_LB_EXACT),
102 +       EXT4_HLOCK_DEL_SAFE     = (EXT4_LB_DE | EXT4_LB_DX_RO | EXT4_LB_EXACT),
103 +       EXT4_HLOCK_SPLIT        = (EXT4_LB_DE | EXT4_LB_DX | EXT4_LB_SPIN),
104 +};
105 +
106 +extern struct htree_lock_head *ext4_htree_lock_head_alloc(unsigned hbits);
107 +#define ext4_htree_lock_head_free(lhead)       htree_lock_head_free(lhead)
108 +
109 +extern struct htree_lock *ext4_htree_lock_alloc(void);
110 +#define ext4_htree_lock_free(lck)              htree_lock_free(lck)
111 +
112 +extern void ext4_htree_lock(struct htree_lock *lck,
113 +                           struct htree_lock_head *lhead,
114 +                           struct inode *dir, unsigned flags);
115 +#define ext4_htree_unlock(lck)                  htree_unlock(lck)
116 +
117 +extern struct buffer_head *__ext4_find_entry(struct inode *dir,
118 +                                       const struct qstr *d_name,
119 +                                       struct ext4_dir_entry_2 **res_dir,
120 +                                       int *inlined, struct htree_lock *lck);
121 +extern int __ext4_add_entry(handle_t *handle, struct dentry *dentry,
122 +                     struct inode *inode, struct htree_lock *lck);
123 +
124  struct ext4_filename {
125         const struct qstr *usr_fname;
126         struct fscrypt_str disk_name;
127 @@ -2473,11 +2543,19 @@ void ext4_insert_dentry(struct inode *in
128                         struct ext4_filename *fname, void *data);
129  static inline void ext4_update_dx_flag(struct inode *inode)
130  {
131 +       /* Disable it for ldiskfs, because going from a DX directory to
132 +        * a non-DX directory while it is in use will completely break
133 +        * the htree-locking.
134 +        * If we really want to support this operation in the future,
135 +        * we need to exclusively lock the directory at here which will
136 +        * increase complexity of code */
137 +#if 0
138         if (!ext4_has_feature_dir_index(inode->i_sb)) {
139                 /* ext4_iget() should have caught this... */
140                 WARN_ON_ONCE(ext4_has_feature_metadata_csum(inode->i_sb));
141                 ext4_clear_inode_flag(inode, EXT4_INODE_INDEX);
142         }
143 +#endif
144  }
145  static const unsigned char ext4_filetype_table[] = {
146         DT_UNKNOWN, DT_REG, DT_DIR, DT_CHR, DT_BLK, DT_FIFO, DT_SOCK, DT_LNK
147 --- a/fs/ext4/namei.c
148 +++ b/fs/ext4/namei.c
149 @@ -53,6 +53,7 @@ struct buffer_head *ext4_append(handle_t
150                                         ext4_lblk_t *block)
151  {
152         struct buffer_head *bh;
153 +       struct ext4_inode_info *ei = EXT4_I(inode);
154         int err;
155  
156         if (unlikely(EXT4_SB(inode->i_sb)->s_max_dir_size_kb &&
157 @@ -60,15 +61,22 @@ struct buffer_head *ext4_append(handle_t
158                       EXT4_SB(inode->i_sb)->s_max_dir_size_kb)))
159                 return ERR_PTR(-ENOSPC);
160  
161 +       /* with parallel dir operations all appends
162 +       * have to be serialized -bzzz */
163 +       down(&ei->i_append_sem);
164 +
165         *block = inode->i_size >> inode->i_sb->s_blocksize_bits;
166  
167         bh = ext4_bread(handle, inode, *block, EXT4_GET_BLOCKS_CREATE);
168 -       if (IS_ERR(bh))
169 +       if (IS_ERR(bh)) {
170 +               up(&ei->i_append_sem);
171                 return bh;
172 +       }
173         inode->i_size += inode->i_sb->s_blocksize;
174         EXT4_I(inode)->i_disksize = inode->i_size;
175         BUFFER_TRACE(bh, "get_write_access");
176         err = ext4_journal_get_write_access(handle, bh);
177 +       up(&ei->i_append_sem);
178         if (err) {
179                 brelse(bh);
180                 ext4_std_error(inode->i_sb, err);
181 @@ -249,7 +257,8 @@ static unsigned dx_node_limit(struct ino
182  static struct dx_frame *dx_probe(struct ext4_filename *fname,
183                                  struct inode *dir,
184                                  struct dx_hash_info *hinfo,
185 -                                struct dx_frame *frame);
186 +                                struct dx_frame *frame,
187 +                                struct htree_lock *lck);
188  static void dx_release(struct dx_frame *frames);
189  static int dx_make_map(struct inode *dir, struct ext4_dir_entry_2 *de,
190                        unsigned blocksize, struct dx_hash_info *hinfo,
191 @@ -263,12 +272,13 @@ static void dx_insert_block(struct dx_fr
192  static int ext4_htree_next_block(struct inode *dir, __u32 hash,
193                                  struct dx_frame *frame,
194                                  struct dx_frame *frames,
195 -                                __u32 *start_hash);
196 +                                __u32 *start_hash, struct htree_lock *lck);
197  static struct buffer_head * ext4_dx_find_entry(struct inode *dir,
198                 struct ext4_filename *fname,
199 -               struct ext4_dir_entry_2 **res_dir);
200 +               struct ext4_dir_entry_2 **res_dir, struct htree_lock *lck);
201  static int ext4_dx_add_entry(handle_t *handle, struct ext4_filename *fname,
202 -                            struct inode *dir, struct inode *inode);
203 +                            struct inode *dir, struct inode *inode,
204 +                            struct htree_lock *lck);
205  
206  /* checksumming functions */
207  void initialize_dirent_tail(struct ext4_dir_entry_tail *t,
208 @@ -732,6 +742,227 @@ struct stats dx_show_entries(struct dx_h
209  }
210  #endif /* DX_DEBUG */
211  
212 +/* private data for htree_lock */
213 +struct ext4_dir_lock_data {
214 +       unsigned                ld_flags;  /* bits-map for lock types */
215 +       unsigned                ld_count;  /* # entries of the last DX block */
216 +       struct dx_entry         ld_at_entry; /* copy of leaf dx_entry */
217 +       struct dx_entry         *ld_at;    /* position of leaf dx_entry */
218 +};
219 +
220 +#define ext4_htree_lock_data(l)        ((struct ext4_dir_lock_data *)(l)->lk_private)
221 +#define ext4_find_entry(dir, name, dirent, inline) \
222 +                       __ext4_find_entry(dir, name, dirent, inline, NULL)
223 +#define ext4_add_entry(handle, dentry, inode) \
224 +                       __ext4_add_entry(handle, dentry, inode, NULL)
225 +
226 +/* NB: ext4_lblk_t is 32 bits so we use high bits to identify invalid blk */
227 +#define EXT4_HTREE_NODE_CHANGED        (0xcafeULL << 32)
228 +
229 +static void ext4_htree_event_cb(void *target, void *event)
230 +{
231 +       u64 *block = (u64 *)target;
232 +
233 +       if (*block == dx_get_block((struct dx_entry *)event))
234 +               *block = EXT4_HTREE_NODE_CHANGED;
235 +}
236 +
237 +struct htree_lock_head *ext4_htree_lock_head_alloc(unsigned hbits)
238 +{
239 +       struct htree_lock_head *lhead;
240 +
241 +       lhead = htree_lock_head_alloc(EXT4_LK_MAX, hbits, 0);
242 +       if (lhead != NULL) {
243 +               htree_lock_event_attach(lhead, EXT4_LK_SPIN, HTREE_EVENT_WR,
244 +                                       ext4_htree_event_cb);
245 +       }
246 +       return lhead;
247 +}
248 +EXPORT_SYMBOL(ext4_htree_lock_head_alloc);
249 +
250 +struct htree_lock *ext4_htree_lock_alloc(void)
251 +{
252 +       return htree_lock_alloc(EXT4_LK_MAX,
253 +                               sizeof(struct ext4_dir_lock_data));
254 +}
255 +EXPORT_SYMBOL(ext4_htree_lock_alloc);
256 +
257 +static htree_lock_mode_t ext4_htree_mode(unsigned flags)
258 +{
259 +       switch (flags) {
260 +       default: /* 0 or unknown flags require EX lock */
261 +               return HTREE_LOCK_EX;
262 +       case EXT4_HLOCK_READDIR:
263 +               return HTREE_LOCK_PR;
264 +       case EXT4_HLOCK_LOOKUP:
265 +               return HTREE_LOCK_CR;
266 +       case EXT4_HLOCK_DEL:
267 +       case EXT4_HLOCK_ADD:
268 +               return HTREE_LOCK_CW;
269 +       }
270 +}
271 +
272 +/* return PR for read-only operations, otherwise return EX */
273 +static inline htree_lock_mode_t ext4_htree_safe_mode(unsigned flags)
274 +{
275 +       int writer = (flags & EXT4_LB_DE) == EXT4_LB_DE;
276 +
277 +       /* 0 requires EX lock */
278 +       return (flags == 0 || writer) ? HTREE_LOCK_EX : HTREE_LOCK_PR;
279 +}
280 +
281 +static int ext4_htree_safe_locked(struct htree_lock *lck)
282 +{
283 +       int writer;
284 +
285 +       if (lck == NULL || lck->lk_mode == HTREE_LOCK_EX)
286 +               return 1;
287 +
288 +       writer = (ext4_htree_lock_data(lck)->ld_flags & EXT4_LB_DE) ==
289 +                EXT4_LB_DE;
290 +       if (writer) /* all readers & writers are excluded? */
291 +               return lck->lk_mode == HTREE_LOCK_EX;
292 +
293 +       /* all writers are excluded? */
294 +       return lck->lk_mode == HTREE_LOCK_PR ||
295 +              lck->lk_mode == HTREE_LOCK_PW ||
296 +              lck->lk_mode == HTREE_LOCK_EX;
297 +}
298 +
299 +/* relock htree_lock with EX mode if it's change operation, otherwise
300 + * relock it with PR mode. It's noop if PDO is disabled. */
301 +static void ext4_htree_safe_relock(struct htree_lock *lck)
302 +{
303 +       if (!ext4_htree_safe_locked(lck)) {
304 +               unsigned flags = ext4_htree_lock_data(lck)->ld_flags;
305 +
306 +               htree_change_lock(lck, ext4_htree_safe_mode(flags));
307 +       }
308 +}
309 +
310 +void ext4_htree_lock(struct htree_lock *lck, struct htree_lock_head *lhead,
311 +                    struct inode *dir, unsigned flags)
312 +{
313 +       htree_lock_mode_t mode = is_dx(dir) ? ext4_htree_mode(flags) :
314 +                                             ext4_htree_safe_mode(flags);
315 +
316 +       ext4_htree_lock_data(lck)->ld_flags = flags;
317 +       htree_lock(lck, lhead, mode);
318 +       if (!is_dx(dir))
319 +               ext4_htree_safe_relock(lck); /* make sure it's safe locked */
320 +}
321 +EXPORT_SYMBOL(ext4_htree_lock);
322 +
323 +static int ext4_htree_node_lock(struct htree_lock *lck, struct dx_entry *at,
324 +                               unsigned lmask, int wait, void *ev)
325 +{
326 +       u32     key = (at == NULL) ? 0 : dx_get_block(at);
327 +       u32     mode;
328 +
329 +       /* NOOP if htree is well protected or caller doesn't require the lock */
330 +       if (ext4_htree_safe_locked(lck) ||
331 +          !(ext4_htree_lock_data(lck)->ld_flags & lmask))
332 +               return 1;
333 +
334 +       mode = (ext4_htree_lock_data(lck)->ld_flags & lmask) == lmask ?
335 +               HTREE_LOCK_PW : HTREE_LOCK_PR;
336 +       while (1) {
337 +               if (htree_node_lock_try(lck, mode, key, ffz(~lmask), wait, ev))
338 +                       return 1;
339 +               if (!(lmask & EXT4_LB_SPIN)) /* not a spinlock */
340 +                       return 0;
341 +               cpu_relax(); /* spin until granted */
342 +       }
343 +}
344 +
345 +static int ext4_htree_node_locked(struct htree_lock *lck, unsigned lmask)
346 +{
347 +       return ext4_htree_safe_locked(lck) ||
348 +              htree_node_is_granted(lck, ffz(~lmask));
349 +}
350 +
351 +static void ext4_htree_node_unlock(struct htree_lock *lck,
352 +                                  unsigned lmask, void *buf)
353 +{
354 +       /* NB: it's safe to call multiple times or even if it's not locked */
355 +       if (!ext4_htree_safe_locked(lck) &&
356 +            htree_node_is_granted(lck, ffz(~lmask)))
357 +               htree_node_unlock(lck, ffz(~lmask), buf);
358 +}
359 +
360 +#define ext4_htree_dx_lock(lck, key)           \
361 +       ext4_htree_node_lock(lck, key, EXT4_LB_DX, 1, NULL)
362 +#define ext4_htree_dx_lock_try(lck, key)       \
363 +       ext4_htree_node_lock(lck, key, EXT4_LB_DX, 0, NULL)
364 +#define ext4_htree_dx_unlock(lck)              \
365 +       ext4_htree_node_unlock(lck, EXT4_LB_DX, NULL)
366 +#define ext4_htree_dx_locked(lck)              \
367 +       ext4_htree_node_locked(lck, EXT4_LB_DX)
368 +
369 +static void ext4_htree_dx_need_lock(struct htree_lock *lck)
370 +{
371 +       struct ext4_dir_lock_data *ld;
372 +
373 +       if (ext4_htree_safe_locked(lck))
374 +               return;
375 +
376 +       ld = ext4_htree_lock_data(lck);
377 +       switch (ld->ld_flags) {
378 +       default:
379 +               return;
380 +       case EXT4_HLOCK_LOOKUP:
381 +               ld->ld_flags = EXT4_HLOCK_LOOKUP_SAFE;
382 +               return;
383 +       case EXT4_HLOCK_DEL:
384 +               ld->ld_flags = EXT4_HLOCK_DEL_SAFE;
385 +               return;
386 +       case EXT4_HLOCK_ADD:
387 +               ld->ld_flags = EXT4_HLOCK_SPLIT;
388 +               return;
389 +       }
390 +}
391 +
392 +#define ext4_htree_de_lock(lck, key)           \
393 +       ext4_htree_node_lock(lck, key, EXT4_LB_DE, 1, NULL)
394 +#define ext4_htree_de_unlock(lck)              \
395 +       ext4_htree_node_unlock(lck, EXT4_LB_DE, NULL)
396 +
397 +#define ext4_htree_spin_lock(lck, key, event)  \
398 +       ext4_htree_node_lock(lck, key, EXT4_LB_SPIN, 0, event)
399 +#define ext4_htree_spin_unlock(lck)            \
400 +       ext4_htree_node_unlock(lck, EXT4_LB_SPIN, NULL)
401 +#define ext4_htree_spin_unlock_listen(lck, p)  \
402 +       ext4_htree_node_unlock(lck, EXT4_LB_SPIN, p)
403 +
404 +static void ext4_htree_spin_stop_listen(struct htree_lock *lck)
405 +{
406 +       if (!ext4_htree_safe_locked(lck) &&
407 +           htree_node_is_listening(lck, ffz(~EXT4_LB_SPIN)))
408 +               htree_node_stop_listen(lck, ffz(~EXT4_LB_SPIN));
409 +}
410 +
411 +enum {
412 +       DX_HASH_COL_IGNORE,     /* ignore collision while probing frames */
413 +       DX_HASH_COL_YES,        /* there is collision and it does matter */
414 +       DX_HASH_COL_NO,         /* there is no collision */
415 +};
416 +
417 +static int dx_probe_hash_collision(struct htree_lock *lck,
418 +                                  struct dx_entry *entries,
419 +                                  struct dx_entry *at, u32 hash)
420 +{
421 +       if (!(lck && ext4_htree_lock_data(lck)->ld_flags & EXT4_LB_EXACT)) {
422 +               return DX_HASH_COL_IGNORE; /* don't care about collision */
423 +
424 +       } else if (at == entries + dx_get_count(entries) - 1) {
425 +               return DX_HASH_COL_IGNORE; /* not in any leaf of this DX */
426 +
427 +       } else { /* hash collision? */
428 +               return ((dx_get_hash(at + 1) & ~1) == hash) ?
429 +                       DX_HASH_COL_YES : DX_HASH_COL_NO;
430 +       }
431 +}
432 +
433  /*
434   * Probe for a directory leaf block to search.
435   *
436 @@ -743,10 +974,11 @@ struct stats dx_show_entries(struct dx_h
437   */
438  static struct dx_frame *
439  dx_probe(struct ext4_filename *fname, struct inode *dir,
440 -        struct dx_hash_info *hinfo, struct dx_frame *frame_in)
441 +        struct dx_hash_info *hinfo, struct dx_frame *frame_in,
442 +        struct htree_lock *lck)
443  {
444         unsigned count, indirect;
445 -       struct dx_entry *at, *entries, *p, *q, *m;
446 +       struct dx_entry *at, *entries, *p, *q, *m, *dx = NULL;
447         struct dx_root_info *info;
448         struct dx_frame *frame = frame_in;
449         struct dx_frame *ret_err = ERR_PTR(ERR_BAD_DX_DIR);
450 @@ -809,8 +1041,15 @@ dx_probe(struct ext4_filename *fname, st
451  
452         dxtrace(printk("Look up %x", hash));
453         while (1) {
454 +               if (indirect == 0) { /* the last index level */
455 +                       /* NB: ext4_htree_dx_lock() could be noop if
456 +                        * DX-lock flag is not set for current operation */
457 +                       ext4_htree_dx_lock(lck, dx);
458 +                       ext4_htree_spin_lock(lck, dx, NULL);
459 +               }
460                 count = dx_get_count(entries);
461 -               if (!count || count > dx_get_limit(entries)) {
462 +               if (count == 0 || count > dx_get_limit(entries)) {
463 +                       ext4_htree_spin_unlock(lck); /* release spin */
464                         ext4_warning_inode(dir,
465                                            "dx entry: count %u beyond limit %u",
466                                            count, dx_get_limit(entries));
467 @@ -849,8 +1088,70 @@ dx_probe(struct ext4_filename *fname, st
468                                dx_get_block(at)));
469                 frame->entries = entries;
470                 frame->at = at;
471 -               if (!indirect--)
472 +
473 +               if (indirect == 0) { /* the last index level */
474 +                       struct ext4_dir_lock_data *ld;
475 +                       u64 myblock;
476 +
477 +                       /* By default we only lock DE-block, however, we will
478 +                        * also lock the last level DX-block if:
479 +                        * a) there is hash collision
480 +                        *    we will set DX-lock flag (a few lines below)
481 +                        *    and redo to lock DX-block
482 +                        *    see detail in dx_probe_hash_collision()
483 +                        * b) it's a retry from splitting
484 +                        *    we need to lock the last level DX-block so nobody
485 +                        *    else can split any leaf blocks under the same
486 +                        *    DX-block, see detail in ext4_dx_add_entry()
487 +                        */
488 +                       if (ext4_htree_dx_locked(lck)) {
489 +                               /* DX-block is locked, just lock DE-block
490 +                                * and return */
491 +                               ext4_htree_spin_unlock(lck);
492 +                               if (!ext4_htree_safe_locked(lck))
493 +                                       ext4_htree_de_lock(lck, frame->at);
494 +                               return frame;
495 +                       }
496 +                       /* it's pdirop and no DX lock */
497 +                       if (dx_probe_hash_collision(lck, entries, at, hash) ==
498 +                           DX_HASH_COL_YES) {
499 +                               /* found hash collision, set DX-lock flag
500 +                                * and retry to obtain DX-lock */
501 +                               ext4_htree_spin_unlock(lck);
502 +                               ext4_htree_dx_need_lock(lck);
503 +                               continue;
504 +                       }
505 +                       ld = ext4_htree_lock_data(lck);
506 +                       /* because I don't lock DX, so @at can't be trusted
507 +                        * after I release spinlock so I have to save it */
508 +                       ld->ld_at = at;
509 +                       ld->ld_at_entry = *at;
510 +                       ld->ld_count = dx_get_count(entries);
511 +
512 +                       frame->at = &ld->ld_at_entry;
513 +                       myblock = dx_get_block(at);
514 +
515 +                       /* NB: ordering locking */
516 +                       ext4_htree_spin_unlock_listen(lck, &myblock);
517 +                       /* other thread can split this DE-block because:
518 +                        * a) I don't have lock for the DE-block yet
519 +                        * b) I released spinlock on DX-block
520 +                        * if it happened I can detect it by listening
521 +                        * splitting event on this DE-block */
522 +                       ext4_htree_de_lock(lck, frame->at);
523 +                       ext4_htree_spin_stop_listen(lck);
524 +
525 +                       if (myblock == EXT4_HTREE_NODE_CHANGED) {
526 +                               /* someone split this DE-block before
527 +                                * I locked it, I need to retry and lock
528 +                                * valid DE-block */
529 +                               ext4_htree_de_unlock(lck);
530 +                               continue;
531 +                       }
532                         return frame;
533 +               }
534 +               dx = at;
535 +               indirect--;
536                 frame++;
537                 frame->bh = ext4_read_dirblock(dir, dx_get_block(at), INDEX);
538                 if (IS_ERR(frame->bh)) {
539 @@ -917,7 +1218,7 @@ static void dx_release(struct dx_frame *
540  static int ext4_htree_next_block(struct inode *dir, __u32 hash,
541                                  struct dx_frame *frame,
542                                  struct dx_frame *frames,
543 -                                __u32 *start_hash)
544 +                                __u32 *start_hash, struct htree_lock *lck)
545  {
546         struct dx_frame *p;
547         struct buffer_head *bh;
548 @@ -932,12 +1233,22 @@ static int ext4_htree_next_block(struct
549          * this loop, num_frames indicates the number of interior
550          * nodes need to be read.
551          */
552 +       ext4_htree_de_unlock(lck);
553         while (1) {
554 -               if (++(p->at) < p->entries + dx_get_count(p->entries))
555 -                       break;
556 +               if (num_frames > 0 || ext4_htree_dx_locked(lck)) {
557 +                       /* num_frames > 0 :
558 +                        *   DX block
559 +                        * ext4_htree_dx_locked:
560 +                        *   frame->at is reliable pointer returned by dx_probe,
561 +                        *   otherwise dx_probe already knew no collision */
562 +                       if (++(p->at) < p->entries + dx_get_count(p->entries))
563 +                               break;
564 +               }
565                 if (p == frames)
566                         return 0;
567                 num_frames++;
568 +               if (num_frames == 1)
569 +                       ext4_htree_dx_unlock(lck);
570                 p--;
571         }
572  
573 @@ -960,6 +1271,13 @@ static int ext4_htree_next_block(struct
574          * block so no check is necessary
575          */
576         while (num_frames--) {
577 +               if (num_frames == 0) {
578 +                       /* it's not always necessary, we just don't want to
579 +                        * detect hash collision again */
580 +                       ext4_htree_dx_need_lock(lck);
581 +                       ext4_htree_dx_lock(lck, p->at);
582 +               }
583 +
584                 bh = ext4_read_dirblock(dir, dx_get_block(p->at), INDEX);
585                 if (IS_ERR(bh))
586                         return PTR_ERR(bh);
587 @@ -968,6 +1286,7 @@ static int ext4_htree_next_block(struct
588                 p->bh = bh;
589                 p->at = p->entries = ((struct dx_node *) bh->b_data)->entries;
590         }
591 +       ext4_htree_de_lock(lck, p->at);
592         return 1;
593  }
594  
595 @@ -1115,10 +1434,10 @@ int ext4_htree_fill_tree(struct file *di
596         }
597         hinfo.hash = start_hash;
598         hinfo.minor_hash = 0;
599 -       frame = dx_probe(NULL, dir, &hinfo, frames);
600 +       /* assume it's PR locked */
601 +       frame = dx_probe(NULL, dir, &hinfo, frames, NULL);
602         if (IS_ERR(frame))
603                 return PTR_ERR(frame);
604 -
605         /* Add '.' and '..' from the htree header */
606         if (!start_hash && !start_minor_hash) {
607                 de = (struct ext4_dir_entry_2 *) frames[0].bh->b_data;
608 @@ -1158,7 +1477,7 @@ int ext4_htree_fill_tree(struct file *di
609                 count += ret;
610                 hashval = ~0;
611                 ret = ext4_htree_next_block(dir, HASH_NB_ALWAYS,
612 -                                           frame, frames, &hashval);
613 +                                           frame, frames, &hashval, NULL);
614                 *next_hash = hashval;
615                 if (ret < 0) {
616                         err = ret;
617 @@ -1350,10 +1669,10 @@ static int is_dx_internal_node(struct in
618   * The returned buffer_head has ->b_count elevated.  The caller is expected
619   * to brelse() it when appropriate.
620   */
621 -static struct buffer_head * ext4_find_entry (struct inode *dir,
622 +struct buffer_head *__ext4_find_entry(struct inode *dir,
623                                         const struct qstr *d_name,
624                                         struct ext4_dir_entry_2 **res_dir,
625 -                                       int *inlined)
626 +                                       int *inlined, struct htree_lock *lck)
627  {
628         struct super_block *sb;
629         struct buffer_head *bh_use[NAMEI_RA_SIZE];
630 @@ -1403,7 +1722,7 @@ static struct buffer_head * ext4_find_en
631                 goto restart;
632         }
633         if (is_dx(dir)) {
634 -               ret = ext4_dx_find_entry(dir, &fname, res_dir);
635 +               ret = ext4_dx_find_entry(dir, &fname, res_dir, lck);
636                 /*
637                  * On success, or if the error was file not found,
638                  * return.  Otherwise, fall back to doing a search the
639 @@ -1413,6 +1732,7 @@ static struct buffer_head * ext4_find_en
640                         goto cleanup_and_exit;
641                 dxtrace(printk(KERN_DEBUG "ext4_find_entry: dx failed, "
642                                "falling back\n"));
643 +               ext4_htree_safe_relock(lck);
644                 ret = NULL;
645         }
646         nblocks = dir->i_size >> EXT4_BLOCK_SIZE_BITS(sb);
647 @@ -1514,10 +1834,12 @@ cleanup_and_exit:
648         ext4_fname_free_filename(&fname);
649         return ret;
650  }
651 +EXPORT_SYMBOL(__ext4_find_entry);
652  
653  static struct buffer_head * ext4_dx_find_entry(struct inode *dir,
654                         struct ext4_filename *fname,
655 -                       struct ext4_dir_entry_2 **res_dir)
656 +                       struct ext4_dir_entry_2 **res_dir,
657 +                       struct htree_lock *lck)
658  {
659         struct super_block * sb = dir->i_sb;
660         struct dx_frame frames[EXT4_HTREE_LEVEL], *frame;
661 @@ -1528,7 +1850,7 @@ static struct buffer_head * ext4_dx_find
662  #ifdef CONFIG_EXT4_FS_ENCRYPTION
663         *res_dir = NULL;
664  #endif
665 -       frame = dx_probe(fname, dir, NULL, frames);
666 +       frame = dx_probe(fname, dir, NULL, frames, lck);
667         if (IS_ERR(frame))
668                 return (struct buffer_head *) frame;
669         do {
670 @@ -1550,7 +1872,7 @@ static struct buffer_head * ext4_dx_find
671  
672                 /* Check to see if we should continue to search */
673                 retval = ext4_htree_next_block(dir, fname->hinfo.hash, frame,
674 -                                              frames, NULL);
675 +                                              frames, NULL, lck);
676                 if (retval < 0) {
677                         ext4_warning_inode(dir,
678                                 "error %d reading directory index block",
679 @@ -1735,8 +2057,9 @@ static struct ext4_dir_entry_2* dx_pack_
680   * Returns pointer to de in block into which the new entry will be inserted.
681   */
682  static struct ext4_dir_entry_2 *do_split(handle_t *handle, struct inode *dir,
683 -                       struct buffer_head **bh,struct dx_frame *frame,
684 -                       struct dx_hash_info *hinfo)
685 +                       struct buffer_head **bh, struct dx_frame *frames,
686 +                       struct dx_frame *frame, struct dx_hash_info *hinfo,
687 +                       struct htree_lock *lck)
688  {
689         unsigned blocksize = dir->i_sb->s_blocksize;
690         unsigned count, continued;
691 @@ -1798,8 +2121,14 @@ static struct ext4_dir_entry_2 *do_split
692                                         hash2, split, count-split));
693  
694         /* Fancy dance to stay within two buffers */
695 -       de2 = dx_move_dirents(data1, data2, map + split, count - split,
696 -                             blocksize);
697 +       if (hinfo->hash < hash2) {
698 +               de2 = dx_move_dirents(data1, data2, map + split,
699 +                                     count - split, blocksize);
700 +       } else {
701 +               /* make sure we add the entry to the same block which
702 +                * we have already locked */
703 +               de2 = dx_move_dirents(data1, data2, map, split, blocksize);
704 +       }
705         de = dx_pack_dirents(data1, blocksize);
706         de->rec_len = ext4_rec_len_to_disk(data1 + (blocksize - csum_size) -
707                                            (char *) de,
708 @@ -1820,12 +2149,21 @@ static struct ext4_dir_entry_2 *do_split
709         dxtrace(dx_show_leaf(dir, hinfo, (struct ext4_dir_entry_2 *) data2,
710                         blocksize, 1));
711  
712 -       /* Which block gets the new entry? */
713 -       if (hinfo->hash >= hash2) {
714 -               swap(*bh, bh2);
715 -               de = de2;
716 +       ext4_htree_spin_lock(lck, frame > frames ? (frame - 1)->at : NULL,
717 +                            frame->at); /* notify block is being split */
718 +       if (hinfo->hash < hash2) {
719 +               dx_insert_block(frame, hash2 + continued, newblock);
720 +
721 +       } else {
722 +               /* switch block number */
723 +               dx_insert_block(frame, hash2 + continued,
724 +                               dx_get_block(frame->at));
725 +               dx_set_block(frame->at, newblock);
726 +               (frame->at)++;
727         }
728 -       dx_insert_block(frame, hash2 + continued, newblock);
729 +       ext4_htree_spin_unlock(lck);
730 +       ext4_htree_dx_unlock(lck);
731 +
732         err = ext4_handle_dirty_dirent_node(handle, dir, bh2);
733         if (err)
734                 goto journal_error;
735 @@ -2099,7 +2437,7 @@ static int make_indexed_dir(handle_t *ha
736         if (retval)
737                 goto out_frames;        
738  
739 -       de = do_split(handle,dir, &bh2, frame, &fname->hinfo);
740 +       de = do_split(handle, dir, &bh2, frames, frame, &fname->hinfo, NULL);
741         if (IS_ERR(de)) {
742                 retval = PTR_ERR(de);
743                 goto out_frames;
744 @@ -2209,8 +2547,8 @@ out:
745   * may not sleep between calling this and putting something into
746   * the entry, as someone else might have used it while you slept.
747   */
748 -static int ext4_add_entry(handle_t *handle, struct dentry *dentry,
749 -                         struct inode *inode)
750 +int __ext4_add_entry(handle_t *handle, struct dentry *dentry,
751 +                     struct inode *inode, struct htree_lock *lck)
752  {
753         struct inode *dir = d_inode(dentry->d_parent);
754         struct buffer_head *bh = NULL;
755 @@ -2251,7 +2589,7 @@ static int ext4_add_entry(handle_t *hand
756                 if (dentry->d_name.len == 2 &&
757                     memcmp(dentry->d_name.name, "..", 2) == 0)
758                         return ext4_update_dotdot(handle, dentry, inode);
759 -               retval = ext4_dx_add_entry(handle, &fname, dir, inode);
760 +               retval = ext4_dx_add_entry(handle, &fname, dir, inode, lck);
761                 if (!retval || (retval != ERR_BAD_DX_DIR))
762                         goto out;
763                 /* Can we just ignore htree data? */
764 @@ -2261,6 +2599,7 @@ static int ext4_add_entry(handle_t *hand
765                         retval = -EFSCORRUPTED;
766                         goto out;
767                 }
768 +               ext4_htree_safe_relock(lck);
769                 ext4_clear_inode_flag(dir, EXT4_INODE_INDEX);
770                 dx_fallback++;
771                 ext4_mark_inode_dirty(handle, dir);
772 @@ -2310,12 +2649,14 @@ out:
773                 ext4_set_inode_state(inode, EXT4_STATE_NEWENTRY);
774         return retval;
775  }
776 +EXPORT_SYMBOL(__ext4_add_entry);
777  
778  /*
779   * Returns 0 for success, or a negative error value
780   */
781  static int ext4_dx_add_entry(handle_t *handle, struct ext4_filename *fname,
782 -                            struct inode *dir, struct inode *inode)
783 +                            struct inode *dir, struct inode *inode,
784 +                            struct htree_lock *lck)
785  {
786         struct dx_frame frames[EXT4_HTREE_LEVEL], *frame;
787         struct dx_entry *entries, *at;
788 @@ -2327,7 +2668,7 @@ static int ext4_dx_add_entry(handle_t *h
789  
790  again:
791         restart = 0;
792 -       frame = dx_probe(fname, dir, NULL, frames);
793 +       frame = dx_probe(fname, dir, NULL, frames, lck);
794         if (IS_ERR(frame))
795                 return PTR_ERR(frame);
796         entries = frame->entries;
797 @@ -2362,6 +2703,11 @@ again:
798                 struct dx_node *node2;
799                 struct buffer_head *bh2;
800  
801 +               if (!ext4_htree_safe_locked(lck)) { /* retry with EX lock */
802 +                       ext4_htree_safe_relock(lck);
803 +                       restart = 1;
804 +                       goto cleanup;
805 +               }
806                 while (frame > frames) {
807                         if (dx_get_count((frame - 1)->entries) <
808                             dx_get_limit((frame - 1)->entries)) {
809 @@ -2465,8 +2811,32 @@ again:
810                         restart = 1;
811                         goto journal_error;
812                 }
813 +       } else if (!ext4_htree_dx_locked(lck)) {
814 +               struct ext4_dir_lock_data *ld = ext4_htree_lock_data(lck);
815 +
816 +               /* not well protected, require DX lock */
817 +               ext4_htree_dx_need_lock(lck);
818 +               at = frame > frames ? (frame - 1)->at : NULL;
819 +
820 +               /* NB: no risk of deadlock because it's just a try.
821 +                *
822 +                * NB: we check ld_count twice, the first time before
823 +                * having DX lock, the second time after holding DX lock.
824 +                *
825 +                * NB: We never free blocks for directory so far, which
826 +                * means the value returned by dx_get_count() should equal
827 +                * ld->ld_count if nobody split any DE-block under @at,
828 +                * and ld->ld_at still points to valid dx_entry. */
829 +               if ((ld->ld_count != dx_get_count(entries)) ||
830 +                   !ext4_htree_dx_lock_try(lck, at) ||
831 +                   (ld->ld_count != dx_get_count(entries))) {
832 +                       restart = 1;
833 +                       goto cleanup;
834 +               }
835 +               /* OK, I've got DX lock and nothing changed */
836 +               frame->at = ld->ld_at;
837         }
838 -       de = do_split(handle, dir, &bh, frame, &fname->hinfo);
839 +       de = do_split(handle, dir, &bh, frames, frame, &fname->hinfo, lck);
840         if (IS_ERR(de)) {
841                 err = PTR_ERR(de);
842                 goto cleanup;
843 @@ -2477,6 +2847,8 @@ again:
844  journal_error:
845         ext4_std_error(dir->i_sb, err); /* this is a no-op if err == 0 */
846  cleanup:
847 +       ext4_htree_dx_unlock(lck);
848 +       ext4_htree_de_unlock(lck);
849         brelse(bh);
850         dx_release(frames);
851         /* @restart is true means htree-path has been changed, we need to
852 --- a/fs/ext4/super.c
853 +++ b/fs/ext4/super.c
854 @@ -989,6 +989,7 @@ static struct inode *ext4_alloc_inode(st
855  
856         inode_set_iversion(&ei->vfs_inode, 1);
857         spin_lock_init(&ei->i_raw_lock);
858 +       sema_init(&ei->i_append_sem, 1);
859         INIT_LIST_HEAD(&ei->i_prealloc_list);
860         spin_lock_init(&ei->i_prealloc_lock);
861         ext4_es_init_tree(&ei->i_es_tree);