Whamcloud - gitweb
LU-12353 ldiskfs: speedup quota journalling
[fs/lustre-release.git] / ldiskfs / kernel_patches / patches / rhel7.6 / ext4-pdirop.patch
1 Single directory performance is critical for HPC workloads. In a
2 typical use case an application creates a separate output file for
3 each node and task in a job. As nodes and tasks increase, hundreds
4 of thousands of files may be created in a single directory within
5 a short window of time.
6 Today, both filename lookup and file system modifying operations
7 (such as create and unlink) are protected with a single lock for
8 an entire ldiskfs directory. PDO project will remove this
9 bottleneck by introducing a parallel locking mechanism for entire
10 ldiskfs directories. This work will enable multiple application
11 threads to simultaneously lookup, create and unlink in parallel.
12
13 This patch contains:
14  - pdirops support for ldiskfs
15  - integrate with osd-ldiskfs
16
17 Index: linux-3.10.0-229.1.2.fc21.x86_64/fs/ext4/Makefile
18 ===================================================================
19 --- linux-3.10.0-229.1.2.fc21.x86_64.orig/fs/ext4/Makefile
20 +++ linux-3.10.0-229.1.2.fc21.x86_64/fs/ext4/Makefile
21 @@ -6,6 +6,7 @@ obj-$(CONFIG_EXT4_FS) += ext4.o
22  
23  ext4-y := balloc.o bitmap.o dir.o file.o fsync.o ialloc.o inode.o page-io.o \
24                 ioctl.o namei.o super.o symlink.o hash.o resize.o extents.o \
25 +               htree_lock.o \
26                 ext4_jbd2.o migrate.o mballoc.o block_validity.o move_extent.o \
27                 mmp.o indirect.o extents_status.o xattr.o xattr_user.o \
28                 xattr_trusted.o inline.o
29 Index: linux-3.10.0-229.1.2.fc21.x86_64/fs/ext4/ext4.h
30 ===================================================================
31 --- linux-3.10.0-229.1.2.fc21.x86_64.orig/fs/ext4/ext4.h
32 +++ linux-3.10.0-229.1.2.fc21.x86_64/fs/ext4/ext4.h
33 @@ -27,6 +27,7 @@
34  #include <linux/mutex.h>
35  #include <linux/timer.h>
36  #include <linux/wait.h>
37 +#include <linux/htree_lock.h>
38  #include <linux/blockgroup_lock.h>
39  #include <linux/percpu_counter.h>
40  #include <linux/ratelimit.h>
41 @@ -821,6 +822,9 @@ struct ext4_inode_info {
42         __u32   i_dtime;
43         ext4_fsblk_t    i_file_acl;
44  
45 +       /* following fields for parallel directory operations -bzzz */
46 +       struct semaphore i_append_sem;
47 +
48         /*
49          * i_block_group is the number of the block group which contains
50          * this file's inode.  Constant across the lifetime of the inode,
51 @@ -1846,6 +1850,71 @@ struct dx_hash_info
52   */
53  #define HASH_NB_ALWAYS         1
54  
55 +/* assume name-hash is protected by upper layer */
56 +#define EXT4_HTREE_LOCK_HASH   0
57 +
58 +enum ext4_pdo_lk_types {
59 +#if EXT4_HTREE_LOCK_HASH
60 +       EXT4_LK_HASH,
61 +#endif
62 +       EXT4_LK_DX,             /* index block */
63 +       EXT4_LK_DE,             /* directory entry block */
64 +       EXT4_LK_SPIN,           /* spinlock */
65 +       EXT4_LK_MAX,
66 +};
67 +
68 +/* read-only bit */
69 +#define EXT4_LB_RO(b)          (1 << (b))
70 +/* read + write, high bits for writer */
71 +#define EXT4_LB_RW(b)          ((1 << (b)) | (1 << (EXT4_LK_MAX + (b))))
72 +
73 +enum ext4_pdo_lock_bits {
74 +       /* DX lock bits */
75 +       EXT4_LB_DX_RO           = EXT4_LB_RO(EXT4_LK_DX),
76 +       EXT4_LB_DX              = EXT4_LB_RW(EXT4_LK_DX),
77 +       /* DE lock bits */
78 +       EXT4_LB_DE_RO           = EXT4_LB_RO(EXT4_LK_DE),
79 +       EXT4_LB_DE              = EXT4_LB_RW(EXT4_LK_DE),
80 +       /* DX spinlock bits */
81 +       EXT4_LB_SPIN_RO         = EXT4_LB_RO(EXT4_LK_SPIN),
82 +       EXT4_LB_SPIN            = EXT4_LB_RW(EXT4_LK_SPIN),
83 +       /* accurate searching */
84 +       EXT4_LB_EXACT           = EXT4_LB_RO(EXT4_LK_MAX << 1),
85 +};
86 +
87 +enum ext4_pdo_lock_opc {
88 +       /* external */
89 +       EXT4_HLOCK_READDIR      = (EXT4_LB_DE_RO | EXT4_LB_DX_RO),
90 +       EXT4_HLOCK_LOOKUP       = (EXT4_LB_DE_RO | EXT4_LB_SPIN_RO |
91 +                                  EXT4_LB_EXACT),
92 +       EXT4_HLOCK_DEL          = (EXT4_LB_DE | EXT4_LB_SPIN_RO |
93 +                                  EXT4_LB_EXACT),
94 +       EXT4_HLOCK_ADD          = (EXT4_LB_DE | EXT4_LB_SPIN_RO),
95 +
96 +       /* internal */
97 +       EXT4_HLOCK_LOOKUP_SAFE  = (EXT4_LB_DE_RO | EXT4_LB_DX_RO |
98 +                                  EXT4_LB_EXACT),
99 +       EXT4_HLOCK_DEL_SAFE     = (EXT4_LB_DE | EXT4_LB_DX_RO | EXT4_LB_EXACT),
100 +       EXT4_HLOCK_SPLIT        = (EXT4_LB_DE | EXT4_LB_DX | EXT4_LB_SPIN),
101 +};
102 +
103 +extern struct htree_lock_head *ext4_htree_lock_head_alloc(unsigned hbits);
104 +#define ext4_htree_lock_head_free(lhead)       htree_lock_head_free(lhead)
105 +
106 +extern struct htree_lock *ext4_htree_lock_alloc(void);
107 +#define ext4_htree_lock_free(lck)              htree_lock_free(lck)
108 +
109 +extern void ext4_htree_lock(struct htree_lock *lck,
110 +                           struct htree_lock_head *lhead,
111 +                           struct inode *dir, unsigned flags);
112 +#define ext4_htree_unlock(lck)                  htree_unlock(lck)
113 +
114 +extern struct buffer_head *__ext4_find_entry(struct inode *dir,
115 +                                       const struct qstr *d_name,
116 +                                       struct ext4_dir_entry_2 **res_dir,
117 +                                       int *inlined, struct htree_lock *lck);
118 +extern int __ext4_add_entry(handle_t *handle, struct dentry *dentry,
119 +                     struct inode *inode, struct htree_lock *lck);
120  
121  /*
122   * Describe an inode's exact location on disk and in memory
123 @@ -2088,9 +2157,17 @@ void ext4_insert_dentry(struct inode *in
124                         const char *name, int namelen, void *data);
125  static inline void ext4_update_dx_flag(struct inode *inode)
126  {
127 +       /* Disable it for ldiskfs, because going from a DX directory to
128 +        * a non-DX directory while it is in use will completely break
129 +        * the htree-locking.
130 +        * If we really want to support this operation in the future,
131 +        * we need to exclusively lock the directory at here which will
132 +        * increase complexity of code */
133 +#if 0
134         if (!EXT4_HAS_COMPAT_FEATURE(inode->i_sb,
135                                      EXT4_FEATURE_COMPAT_DIR_INDEX))
136                 ext4_clear_inode_flag(inode, EXT4_INODE_INDEX);
137 +#endif
138  }
139  static unsigned char ext4_filetype_table[] = {
140         DT_UNKNOWN, DT_REG, DT_DIR, DT_CHR, DT_BLK, DT_FIFO, DT_SOCK, DT_LNK
141 Index: linux-3.10.0-229.1.2.fc21.x86_64/fs/ext4/namei.c
142 ===================================================================
143 --- linux-3.10.0-229.1.2.fc21.x86_64.orig/fs/ext4/namei.c
144 +++ linux-3.10.0-229.1.2.fc21.x86_64/fs/ext4/namei.c
145 @@ -53,6 +53,7 @@ struct buffer_head *ext4_append(handle_t
146                                         ext4_lblk_t *block)
147  {
148         struct buffer_head *bh;
149 +       struct ext4_inode_info *ei = EXT4_I(inode);
150         int err = 0;
151  
152         if (unlikely(EXT4_SB(inode->i_sb)->s_max_dir_size_kb &&
153 @@ -60,15 +61,22 @@ struct buffer_head *ext4_append(handle_t
154                       EXT4_SB(inode->i_sb)->s_max_dir_size_kb)))
155                 return ERR_PTR(-ENOSPC);
156  
157 +       /* with parallel dir operations all appends
158 +       * have to be serialized -bzzz */
159 +       down(&ei->i_append_sem);
160 +
161         *block = inode->i_size >> inode->i_sb->s_blocksize_bits;
162  
163         bh = ext4_bread(handle, inode, *block, 1, &err);
164 -       if (!bh)
165 +       if (!bh) {
166 +               up(&ei->i_append_sem);
167                 return ERR_PTR(err);
168 +       }
169         inode->i_size += inode->i_sb->s_blocksize;
170         EXT4_I(inode)->i_disksize = inode->i_size;
171         BUFFER_TRACE(bh, "get_write_access");
172         err = ext4_journal_get_write_access(handle, bh);
173 +       up(&ei->i_append_sem);
174         if (err) {
175                 brelse(bh);
176                 ext4_std_error(inode->i_sb, err);
177 @@ -246,7 +254,7 @@ static struct dx_frame *dx_probe(const s
178                                  struct inode *dir,
179                                  struct dx_hash_info *hinfo,
180                                  struct dx_frame *frame,
181 -                                int *err);
182 +                                struct htree_lock *lck, int *err);
183  static void dx_release(struct dx_frame *frames);
184  static int dx_make_map(struct ext4_dir_entry_2 *de, unsigned blocksize,
185                        struct dx_hash_info *hinfo, struct dx_map_entry map[]);
186 @@ -259,13 +267,13 @@ static void dx_insert_block(struct dx_fr
187  static int ext4_htree_next_block(struct inode *dir, __u32 hash,
188                                  struct dx_frame *frame,
189                                  struct dx_frame *frames,
190 -                                __u32 *start_hash);
191 +                                __u32 *start_hash, struct htree_lock *lck);
192  static struct buffer_head * ext4_dx_find_entry(struct inode *dir,
193                 const struct qstr *d_name,
194                 struct ext4_dir_entry_2 **res_dir,
195 -               int *err);
196 +               struct htree_lock *lck, int *err);
197  static int ext4_dx_add_entry(handle_t *handle, struct dentry *dentry,
198 -                            struct inode *inode);
199 +                            struct inode *inode, struct htree_lock *lck);
200  
201  /* checksumming functions */
202  void initialize_dirent_tail(struct ext4_dir_entry_tail *t,
203 @@ -668,6 +676,227 @@ struct stats dx_show_entries(struct dx_h
204  }
205  #endif /* DX_DEBUG */
206  
207 +/* private data for htree_lock */
208 +struct ext4_dir_lock_data {
209 +       unsigned                ld_flags;  /* bits-map for lock types */
210 +       unsigned                ld_count;  /* # entries of the last DX block */
211 +       struct dx_entry         ld_at_entry; /* copy of leaf dx_entry */
212 +       struct dx_entry         *ld_at;    /* position of leaf dx_entry */
213 +};
214 +
215 +#define ext4_htree_lock_data(l)        ((struct ext4_dir_lock_data *)(l)->lk_private)
216 +#define ext4_find_entry(dir, name, dirent, inline) \
217 +                       __ext4_find_entry(dir, name, dirent, inline, NULL)
218 +#define ext4_add_entry(handle, dentry, inode) \
219 +                       __ext4_add_entry(handle, dentry, inode, NULL)
220 +
221 +/* NB: ext4_lblk_t is 32 bits so we use high bits to identify invalid blk */
222 +#define EXT4_HTREE_NODE_CHANGED        (0xcafeULL << 32)
223 +
224 +static void ext4_htree_event_cb(void *target, void *event)
225 +{
226 +       u64 *block = (u64 *)target;
227 +
228 +       if (*block == dx_get_block((struct dx_entry *)event))
229 +               *block = EXT4_HTREE_NODE_CHANGED;
230 +}
231 +
232 +struct htree_lock_head *ext4_htree_lock_head_alloc(unsigned hbits)
233 +{
234 +       struct htree_lock_head *lhead;
235 +
236 +       lhead = htree_lock_head_alloc(EXT4_LK_MAX, hbits, 0);
237 +       if (lhead != NULL) {
238 +               htree_lock_event_attach(lhead, EXT4_LK_SPIN, HTREE_EVENT_WR,
239 +                                       ext4_htree_event_cb);
240 +       }
241 +       return lhead;
242 +}
243 +EXPORT_SYMBOL(ext4_htree_lock_head_alloc);
244 +
245 +struct htree_lock *ext4_htree_lock_alloc(void)
246 +{
247 +       return htree_lock_alloc(EXT4_LK_MAX,
248 +                               sizeof(struct ext4_dir_lock_data));
249 +}
250 +EXPORT_SYMBOL(ext4_htree_lock_alloc);
251 +
252 +static htree_lock_mode_t ext4_htree_mode(unsigned flags)
253 +{
254 +       switch (flags) {
255 +       default: /* 0 or unknown flags require EX lock */
256 +               return HTREE_LOCK_EX;
257 +       case EXT4_HLOCK_READDIR:
258 +               return HTREE_LOCK_PR;
259 +       case EXT4_HLOCK_LOOKUP:
260 +               return HTREE_LOCK_CR;
261 +       case EXT4_HLOCK_DEL:
262 +       case EXT4_HLOCK_ADD:
263 +               return HTREE_LOCK_CW;
264 +       }
265 +}
266 +
267 +/* return PR for read-only operations, otherwise return EX */
268 +static inline htree_lock_mode_t ext4_htree_safe_mode(unsigned flags)
269 +{
270 +       int writer = (flags & EXT4_LB_DE) == EXT4_LB_DE;
271 +
272 +       /* 0 requires EX lock */
273 +       return (flags == 0 || writer) ? HTREE_LOCK_EX : HTREE_LOCK_PR;
274 +}
275 +
276 +static int ext4_htree_safe_locked(struct htree_lock *lck)
277 +{
278 +       int writer;
279 +
280 +       if (lck == NULL || lck->lk_mode == HTREE_LOCK_EX)
281 +               return 1;
282 +
283 +       writer = (ext4_htree_lock_data(lck)->ld_flags & EXT4_LB_DE) ==
284 +                EXT4_LB_DE;
285 +       if (writer) /* all readers & writers are excluded? */
286 +               return lck->lk_mode == HTREE_LOCK_EX;
287 +
288 +       /* all writers are excluded? */
289 +       return lck->lk_mode == HTREE_LOCK_PR ||
290 +              lck->lk_mode == HTREE_LOCK_PW ||
291 +              lck->lk_mode == HTREE_LOCK_EX;
292 +}
293 +
294 +/* relock htree_lock with EX mode if it's change operation, otherwise
295 + * relock it with PR mode. It's noop if PDO is disabled. */
296 +static void ext4_htree_safe_relock(struct htree_lock *lck)
297 +{
298 +       if (!ext4_htree_safe_locked(lck)) {
299 +               unsigned flags = ext4_htree_lock_data(lck)->ld_flags;
300 +
301 +               htree_change_lock(lck, ext4_htree_safe_mode(flags));
302 +       }
303 +}
304 +
305 +void ext4_htree_lock(struct htree_lock *lck, struct htree_lock_head *lhead,
306 +                    struct inode *dir, unsigned flags)
307 +{
308 +       htree_lock_mode_t mode = is_dx(dir) ? ext4_htree_mode(flags) :
309 +                                             ext4_htree_safe_mode(flags);
310 +
311 +       ext4_htree_lock_data(lck)->ld_flags = flags;
312 +       htree_lock(lck, lhead, mode);
313 +       if (!is_dx(dir))
314 +               ext4_htree_safe_relock(lck); /* make sure it's safe locked */
315 +}
316 +EXPORT_SYMBOL(ext4_htree_lock);
317 +
318 +static int ext4_htree_node_lock(struct htree_lock *lck, struct dx_entry *at,
319 +                               unsigned lmask, int wait, void *ev)
320 +{
321 +       u32     key = (at == NULL) ? 0 : dx_get_block(at);
322 +       u32     mode;
323 +
324 +       /* NOOP if htree is well protected or caller doesn't require the lock */
325 +       if (ext4_htree_safe_locked(lck) ||
326 +          !(ext4_htree_lock_data(lck)->ld_flags & lmask))
327 +               return 1;
328 +
329 +       mode = (ext4_htree_lock_data(lck)->ld_flags & lmask) == lmask ?
330 +               HTREE_LOCK_PW : HTREE_LOCK_PR;
331 +       while (1) {
332 +               if (htree_node_lock_try(lck, mode, key, ffz(~lmask), wait, ev))
333 +                       return 1;
334 +               if (!(lmask & EXT4_LB_SPIN)) /* not a spinlock */
335 +                       return 0;
336 +               cpu_relax(); /* spin until granted */
337 +       }
338 +}
339 +
340 +static int ext4_htree_node_locked(struct htree_lock *lck, unsigned lmask)
341 +{
342 +       return ext4_htree_safe_locked(lck) ||
343 +              htree_node_is_granted(lck, ffz(~lmask));
344 +}
345 +
346 +static void ext4_htree_node_unlock(struct htree_lock *lck,
347 +                                  unsigned lmask, void *buf)
348 +{
349 +       /* NB: it's safe to call multiple times even if it's not locked */
350 +       if (!ext4_htree_safe_locked(lck) &&
351 +            htree_node_is_granted(lck, ffz(~lmask)))
352 +               htree_node_unlock(lck, ffz(~lmask), buf);
353 +}
354 +
355 +#define ext4_htree_dx_lock(lck, key)           \
356 +       ext4_htree_node_lock(lck, key, EXT4_LB_DX, 1, NULL)
357 +#define ext4_htree_dx_lock_try(lck, key)       \
358 +       ext4_htree_node_lock(lck, key, EXT4_LB_DX, 0, NULL)
359 +#define ext4_htree_dx_unlock(lck)              \
360 +       ext4_htree_node_unlock(lck, EXT4_LB_DX, NULL)
361 +#define ext4_htree_dx_locked(lck)              \
362 +       ext4_htree_node_locked(lck, EXT4_LB_DX)
363 +
364 +static void ext4_htree_dx_need_lock(struct htree_lock *lck)
365 +{
366 +       struct ext4_dir_lock_data *ld;
367 +
368 +       if (ext4_htree_safe_locked(lck))
369 +               return;
370 +
371 +       ld = ext4_htree_lock_data(lck);
372 +       switch (ld->ld_flags) {
373 +       default:
374 +               return;
375 +       case EXT4_HLOCK_LOOKUP:
376 +               ld->ld_flags = EXT4_HLOCK_LOOKUP_SAFE;
377 +               return;
378 +       case EXT4_HLOCK_DEL:
379 +               ld->ld_flags = EXT4_HLOCK_DEL_SAFE;
380 +               return;
381 +       case EXT4_HLOCK_ADD:
382 +               ld->ld_flags = EXT4_HLOCK_SPLIT;
383 +               return;
384 +       }
385 +}
386 +
387 +#define ext4_htree_de_lock(lck, key)           \
388 +       ext4_htree_node_lock(lck, key, EXT4_LB_DE, 1, NULL)
389 +#define ext4_htree_de_unlock(lck)              \
390 +       ext4_htree_node_unlock(lck, EXT4_LB_DE, NULL)
391 +
392 +#define ext4_htree_spin_lock(lck, key, event)  \
393 +       ext4_htree_node_lock(lck, key, EXT4_LB_SPIN, 0, event)
394 +#define ext4_htree_spin_unlock(lck)            \
395 +       ext4_htree_node_unlock(lck, EXT4_LB_SPIN, NULL)
396 +#define ext4_htree_spin_unlock_listen(lck, p)  \
397 +       ext4_htree_node_unlock(lck, EXT4_LB_SPIN, p)
398 +
399 +static void ext4_htree_spin_stop_listen(struct htree_lock *lck)
400 +{
401 +       if (!ext4_htree_safe_locked(lck) &&
402 +           htree_node_is_listening(lck, ffz(~EXT4_LB_SPIN)))
403 +               htree_node_stop_listen(lck, ffz(~EXT4_LB_SPIN));
404 +}
405 +
406 +enum {
407 +       DX_HASH_COL_IGNORE,     /* ignore collision while probing frames */
408 +       DX_HASH_COL_YES,        /* there is collision and it does matter */
409 +       DX_HASH_COL_NO,         /* there is no collision */
410 +};
411 +
412 +static int dx_probe_hash_collision(struct htree_lock *lck,
413 +                                  struct dx_entry *entries,
414 +                                  struct dx_entry *at, u32 hash)
415 +{
416 +       if (!(ext4_htree_lock_data(lck)->ld_flags & EXT4_LB_EXACT)) {
417 +               return DX_HASH_COL_IGNORE; /* don't care about collision */
418 +
419 +       } else if (at == entries + dx_get_count(entries) - 1) {
420 +               return DX_HASH_COL_IGNORE; /* not in any leaf of this DX */
421 +
422 +       } else { /* hash collision? */
423 +               return ((dx_get_hash(at + 1) & ~1) == hash) ?
424 +                       DX_HASH_COL_YES : DX_HASH_COL_NO;
425 +       }
426 +}
427 +
428  /*
429   * Probe for a directory leaf block to search.
430   *
431 @@ -679,10 +908,11 @@ struct stats dx_show_entries(struct dx_h
432   */
433  static struct dx_frame *
434  dx_probe(const struct qstr *d_name, struct inode *dir,
435 -        struct dx_hash_info *hinfo, struct dx_frame *frame_in, int *err)
436 +        struct dx_hash_info *hinfo, struct dx_frame *frame_in,
437 +        struct htree_lock *lck, int *err)
438  {
439         unsigned count, indirect;
440 -       struct dx_entry *at, *entries, *p, *q, *m;
441 +       struct dx_entry *at, *entries, *p, *q, *m, *dx = NULL;
442         struct dx_root_info *info;
443         struct buffer_head *bh;
444         struct dx_frame *frame = frame_in;
445 @@ -750,8 +980,15 @@ dx_probe(const struct qstr *d_name, stru
446         dxtrace(printk("Look up %x", hash));
447         while (1)
448         {
449 +               if (indirect == 0) { /* the last index level */
450 +                       /* NB: ext4_htree_dx_lock() could be noop if
451 +                        * DX-lock flag is not set for current operation */
452 +                       ext4_htree_dx_lock(lck, dx);
453 +                       ext4_htree_spin_lock(lck, dx, NULL);
454 +               }
455                 count = dx_get_count(entries);
456 -               if (!count || count > dx_get_limit(entries)) {
457 +               if (count == 0 || count > dx_get_limit(entries)) {
458 +                       ext4_htree_spin_unlock(lck); /* release spin */
459                         ext4_warning(dir->i_sb,
460                                      "dx entry: no count or count > limit");
461                         brelse(bh);
462 @@ -792,7 +1029,70 @@ dx_probe(const struct qstr *d_name, stru
463                 frame->bh = bh;
464                 frame->entries = entries;
465                 frame->at = at;
466 -               if (!indirect--) return frame;
467 +
468 +               if (indirect == 0) { /* the last index level */
469 +                       struct ext4_dir_lock_data *ld;
470 +                       u64 myblock;
471 +
472 +                       /* By default we only lock DE-block, however, we will
473 +                        * also lock the last level DX-block if:
474 +                        * a) there is hash collision
475 +                        *    we will set DX-lock flag (a few lines below)
476 +                        *    and redo to lock DX-block
477 +                        *    see detail in dx_probe_hash_collision()
478 +                        * b) it's a retry from splitting
479 +                        *    we need to lock the last level DX-block so nobody
480 +                        *    else can split any leaf blocks under the same
481 +                        *    DX-block, see detail in ext4_dx_add_entry()
482 +                        */
483 +                       if (ext4_htree_dx_locked(lck)) {
484 +                               /* DX-block is locked, just lock DE-block
485 +                                * and return */
486 +                               ext4_htree_spin_unlock(lck);
487 +                               if (!ext4_htree_safe_locked(lck))
488 +                                       ext4_htree_de_lock(lck, frame->at);
489 +                               return frame;
490 +                       }
491 +                       /* it's pdirop and no DX lock */
492 +                       if (dx_probe_hash_collision(lck, entries, at, hash) ==
493 +                           DX_HASH_COL_YES) {
494 +                               /* found hash collision, set DX-lock flag
495 +                                * and retry to obtain DX-lock */
496 +                               ext4_htree_spin_unlock(lck);
497 +                               ext4_htree_dx_need_lock(lck);
498 +                               continue;
499 +                       }
500 +                       ld = ext4_htree_lock_data(lck);
501 +                       /* because I don't lock DX, so @at can't be trusted
502 +                        * after I release spinlock so I have to save it */
503 +                       ld->ld_at = at;
504 +                       ld->ld_at_entry = *at;
505 +                       ld->ld_count = dx_get_count(entries);
506 +
507 +                       frame->at = &ld->ld_at_entry;
508 +                       myblock = dx_get_block(at);
509 +
510 +                       /* NB: ordering locking */
511 +                       ext4_htree_spin_unlock_listen(lck, &myblock);
512 +                       /* other thread can split this DE-block because:
513 +                        * a) I don't have lock for the DE-block yet
514 +                        * b) I released spinlock on DX-block
515 +                        * if it happened I can detect it by listening
516 +                        * splitting event on this DE-block */
517 +                       ext4_htree_de_lock(lck, frame->at);
518 +                       ext4_htree_spin_stop_listen(lck);
519 +
520 +                       if (myblock == EXT4_HTREE_NODE_CHANGED) {
521 +                               /* someone split this DE-block before
522 +                                * I locked it, I need to retry and lock
523 +                                * valid DE-block */
524 +                               ext4_htree_de_unlock(lck);
525 +                               continue;
526 +                       }
527 +                       return frame;
528 +               }
529 +               dx = at;
530 +               indirect--;
531                 bh = ext4_read_dirblock(dir, dx_get_block(at), INDEX);
532                 if (IS_ERR(bh)) {
533                         *err = PTR_ERR(bh);
534 @@ -860,7 +1160,7 @@ static void dx_release (struct dx_frame
535  static int ext4_htree_next_block(struct inode *dir, __u32 hash,
536                                  struct dx_frame *frame,
537                                  struct dx_frame *frames,
538 -                                __u32 *start_hash)
539 +                                __u32 *start_hash, struct htree_lock *lck)
540  {
541         struct dx_frame *p;
542         struct buffer_head *bh;
543 @@ -875,12 +1175,22 @@ static int ext4_htree_next_block(struct
544          * this loop, num_frames indicates the number of interior
545          * nodes need to be read.
546          */
547 +       ext4_htree_de_unlock(lck);
548         while (1) {
549 -               if (++(p->at) < p->entries + dx_get_count(p->entries))
550 -                       break;
551 +               if (num_frames > 0 || ext4_htree_dx_locked(lck)) {
552 +                       /* num_frames > 0 :
553 +                        *   DX block
554 +                        * ext4_htree_dx_locked:
555 +                        *   frame->at is reliable pointer returned by dx_probe,
556 +                        *   otherwise dx_probe already knew no collision */
557 +                       if (++(p->at) < p->entries + dx_get_count(p->entries))
558 +                               break;
559 +               }
560                 if (p == frames)
561                         return 0;
562                 num_frames++;
563 +               if (num_frames == 1)
564 +                       ext4_htree_dx_unlock(lck);
565                 p--;
566         }
567  
568 @@ -903,6 +1213,13 @@ static int ext4_htree_next_block(struct
569          * block so no check is necessary
570          */
571         while (num_frames--) {
572 +               if (num_frames == 0) {
573 +                       /* it's not always necessary, we just don't want to
574 +                        * detect hash collision again */
575 +                       ext4_htree_dx_need_lock(lck);
576 +                       ext4_htree_dx_lock(lck, p->at);
577 +               }
578 +
579                 bh = ext4_read_dirblock(dir, dx_get_block(p->at), INDEX);
580                 if (IS_ERR(bh))
581                         return PTR_ERR(bh);
582 @@ -911,6 +1228,7 @@ static int ext4_htree_next_block(struct
583                 p->bh = bh;
584                 p->at = p->entries = ((struct dx_node *) bh->b_data)->entries;
585         }
586 +       ext4_htree_de_lock(lck, p->at);
587         return 1;
588  }
589  
590 @@ -1013,10 +1331,10 @@ int ext4_htree_fill_tree(struct file *di
591         }
592         hinfo.hash = start_hash;
593         hinfo.minor_hash = 0;
594 -       frame = dx_probe(NULL, dir, &hinfo, frames, &err);
595 +       /* assume it's PR locked */
596 +       frame = dx_probe(NULL, dir, &hinfo, frames, NULL, &err);
597         if (!frame)
598                 return err;
599 -
600         /* Add '.' and '..' from the htree header */
601         if (!start_hash && !start_minor_hash) {
602                 de = (struct ext4_dir_entry_2 *) frames[0].bh->b_data;
603 @@ -1043,7 +1361,7 @@ int ext4_htree_fill_tree(struct file *di
604                 count += ret;
605                 hashval = ~0;
606                 ret = ext4_htree_next_block(dir, HASH_NB_ALWAYS,
607 -                                           frame, frames, &hashval);
608 +                                           frame, frames, &hashval, NULL);
609                 *next_hash = hashval;
610                 if (ret < 0) {
611                         err = ret;
612 @@ -1236,10 +1554,10 @@ static int is_dx_internal_node(struct in
613   * The returned buffer_head has ->b_count elevated.  The caller is expected
614   * to brelse() it when appropriate.
615   */
616 -static struct buffer_head * ext4_find_entry (struct inode *dir,
617 +struct buffer_head *__ext4_find_entry(struct inode *dir,
618                                         const struct qstr *d_name,
619                                         struct ext4_dir_entry_2 **res_dir,
620 -                                       int *inlined)
621 +                                       int *inlined, struct htree_lock *lck)
622  {
623         struct super_block *sb;
624         struct buffer_head *bh_use[NAMEI_RA_SIZE];
625 @@ -1283,7 +1601,7 @@ static struct buffer_head * ext4_find_en
626                 goto restart;
627         }
628         if (is_dx(dir)) {
629 -               bh = ext4_dx_find_entry(dir, d_name, res_dir, &err);
630 +               bh = ext4_dx_find_entry(dir, d_name, res_dir, lck, &err);
631                 /*
632                  * On success, or if the error was file not found,
633                  * return.  Otherwise, fall back to doing a search the
634 @@ -1297,6 +1615,7 @@ static struct buffer_head * ext4_find_en
635                         return bh;
636                 dxtrace(printk(KERN_DEBUG "ext4_find_entry: dx failed, "
637                                "falling back\n"));
638 +               ext4_htree_safe_relock(lck);
639         }
640         nblocks = dir->i_size >> EXT4_BLOCK_SIZE_BITS(sb);
641         start = EXT4_I(dir)->i_dir_start_lookup;
642 @@ -1389,9 +1708,12 @@ cleanup_and_exit:
643                 brelse(bh_use[ra_ptr]);
644         return ret;
645  }
646 +EXPORT_SYMBOL(__ext4_find_entry);
647  
648 -static struct buffer_head * ext4_dx_find_entry(struct inode *dir, const struct qstr *d_name,
649 -                      struct ext4_dir_entry_2 **res_dir, int *err)
650 +static struct buffer_head *ext4_dx_find_entry(struct inode *dir,
651 +                               const struct qstr *d_name,
652 +                               struct ext4_dir_entry_2 **res_dir,
653 +                               struct htree_lock *lck, int *err)
654  {
655         struct super_block * sb = dir->i_sb;
656         struct dx_hash_info     hinfo;
657 @@ -1400,7 +1722,7 @@ static struct buffer_head * ext4_dx_find
658         ext4_lblk_t block;
659         int retval;
660  
661 -       if (!(frame = dx_probe(d_name, dir, &hinfo, frames, err)))
662 +       if (!(frame = dx_probe(d_name, dir, &hinfo, frames, lck, err)))
663                 return NULL;
664         do {
665                 block = dx_get_block(frame->at);
666 @@ -1424,7 +1746,7 @@ static struct buffer_head * ext4_dx_find
667  
668                 /* Check to see if we should continue to search */
669                 retval = ext4_htree_next_block(dir, hinfo.hash, frame,
670 -                                              frames, NULL);
671 +                                              frames, NULL, lck);
672                 if (retval < 0) {
673                         ext4_warning(sb,
674                              "error reading index page in directory #%lu",
675 @@ -1583,8 +1905,9 @@ static struct ext4_dir_entry_2* dx_pack_
676   * Returns pointer to de in block into which the new entry will be inserted.
677   */
678  static struct ext4_dir_entry_2 *do_split(handle_t *handle, struct inode *dir,
679 -                       struct buffer_head **bh,struct dx_frame *frame,
680 -                       struct dx_hash_info *hinfo, int *error)
681 +                       struct buffer_head **bh, struct dx_frame *frames,
682 +                       struct dx_frame *frame, struct dx_hash_info *hinfo,
683 +                       struct htree_lock *lck, int *error)
684  {
685         unsigned blocksize = dir->i_sb->s_blocksize;
686         unsigned count, continued;
687 @@ -1647,7 +1970,14 @@ static struct ext4_dir_entry_2 *do_split
688                                         hash2, split, count-split));
689  
690         /* Fancy dance to stay within two buffers */
691 -       de2 = dx_move_dirents(data1, data2, map + split, count - split, blocksize);
692 +       if (hinfo->hash < hash2) {
693 +               de2 = dx_move_dirents(data1, data2, map + split,
694 +                                     count - split, blocksize);
695 +       } else {
696 +               /* make sure we will add the entry to the same block
697 +                * that we have already locked */
698 +               de2 = dx_move_dirents(data1, data2, map, split, blocksize);
699 +       }
700         de = dx_pack_dirents(data1, blocksize);
701         de->rec_len = ext4_rec_len_to_disk(data1 + (blocksize - csum_size) -
702                                            (char *) de,
703 @@ -1666,13 +1996,21 @@ static struct ext4_dir_entry_2 *do_split
704         dxtrace(dx_show_leaf (hinfo, (struct ext4_dir_entry_2 *) data1, blocksize, 1));
705         dxtrace(dx_show_leaf (hinfo, (struct ext4_dir_entry_2 *) data2, blocksize, 1));
706  
707 -       /* Which block gets the new entry? */
708 -       if (hinfo->hash >= hash2)
709 -       {
710 -               swap(*bh, bh2);
711 -               de = de2;
712 +       ext4_htree_spin_lock(lck, frame > frames ? (frame - 1)->at : NULL,
713 +                            frame->at); /* notify block is being split */
714 +       if (hinfo->hash < hash2) {
715 +               dx_insert_block(frame, hash2 + continued, newblock);
716 +
717 +       } else {
718 +               /* switch block number */
719 +               dx_insert_block(frame, hash2 + continued,
720 +                               dx_get_block(frame->at));
721 +               dx_set_block(frame->at, newblock);
722 +               (frame->at)++;
723         }
724 -       dx_insert_block(frame, hash2 + continued, newblock);
725 +       ext4_htree_spin_unlock(lck);
726 +       ext4_htree_dx_unlock(lck);
727 +
728         err = ext4_handle_dirty_dirent_node(handle, dir, bh2);
729         if (err)
730                 goto journal_error;
731 @@ -1945,7 +2283,7 @@ static int make_indexed_dir(handle_t *ha
732         if (retval)
733                 goto out_frames;
734  
735 -       de = do_split(handle,dir, &bh2, frame, &hinfo, &retval);
736 +       de = do_split(handle, dir, &bh2, frames, frame, &hinfo, NULL, &retval);
737         if (!de) {
738                 goto out_frames;
739         }
740 @@ -2051,8 +2389,8 @@ out:
741   * may not sleep between calling this and putting something into
742   * the entry, as someone else might have used it while you slept.
743   */
744 -static int ext4_add_entry(handle_t *handle, struct dentry *dentry,
745 -                         struct inode *inode)
746 +int __ext4_add_entry(handle_t *handle, struct dentry *dentry,
747 +                     struct inode *inode, struct htree_lock *lck)
748  {
749         struct inode *dir = dentry->d_parent->d_inode;
750         struct buffer_head *bh = NULL;
751 @@ -2087,9 +2425,10 @@ static int ext4_add_entry(handle_t *hand
752                 if (dentry->d_name.len == 2 &&
753                     memcmp(dentry->d_name.name, "..", 2) == 0)
754                         return ext4_update_dotdot(handle, dentry, inode);
755 -               retval = ext4_dx_add_entry(handle, dentry, inode);
756 +               retval = ext4_dx_add_entry(handle, dentry, inode, lck);
757                 if (!retval || (retval != ERR_BAD_DX_DIR))
758                         goto out;
759 +               ext4_htree_safe_relock(lck);
760                 ext4_clear_inode_flag(dir, EXT4_INODE_INDEX);
761                 dx_fallback++;
762                 ext4_mark_inode_dirty(handle, dir);
763 @@ -2129,12 +2468,13 @@ static int ext4_add_entry(handle_t *hand
764                 ext4_set_inode_state(inode, EXT4_STATE_NEWENTRY);
765         return retval;
766  }
767 +EXPORT_SYMBOL(__ext4_add_entry);
768  
769  /*
770   * Returns 0 for success, or a negative error value
771   */
772  static int ext4_dx_add_entry(handle_t *handle, struct dentry *dentry,
773 -                            struct inode *inode)
774 +                            struct inode *inode, struct htree_lock *lck)
775  {
776         struct dx_frame frames[EXT4_HTREE_LEVEL], *frame;
777         struct dx_entry *entries, *at;
778 @@ -2148,7 +2488,7 @@ static int ext4_dx_add_entry(handle_t *h
779  
780  again:
781         restart = 0;
782 -       frame = dx_probe(&dentry->d_name, dir, &hinfo, frames, &err);
783 +       frame = dx_probe(&dentry->d_name, dir, &hinfo, frames, lck, &err);
784         if (!frame)
785                 return err;
786         entries = frame->entries;
787 @@ -2178,6 +2518,11 @@ again:
788                 struct dx_node *node2;
789                 struct buffer_head *bh2;
790  
791 +               if (!ext4_htree_safe_locked(lck)) { /* retry with EX lock */
792 +                       ext4_htree_safe_relock(lck);
793 +                       restart = 1;
794 +                       goto cleanup;
795 +               }
796                 while (frame > frames) {
797                         if (dx_get_count((frame - 1)->entries) <
798                             dx_get_limit((frame - 1)->entries)) {
799 @@ -2277,16 +2622,43 @@ again:
800                         restart = 1;
801                         goto journal_error;
802                 }
803 +       } else if (!ext4_htree_dx_locked(lck)) {
804 +               struct ext4_dir_lock_data *ld = ext4_htree_lock_data(lck);
805 +
806 +               /* not well protected, require DX lock */
807 +               ext4_htree_dx_need_lock(lck);
808 +               at = frame > frames ? (frame - 1)->at : NULL;
809 +
810 +               /* NB: no risk of deadlock because it's just a try.
811 +                *
811 +                * NB: we check ld_count twice, the first time before
812 +                * taking the DX lock, the second time after holding it.
814 +                *
815 +                * NB: We never free directory blocks so far, which
816 +                * means the value returned by dx_get_count() should equal
817 +                * ld->ld_count if nobody split any DE-block under @at,
818 +                * and ld->ld_at still points to valid dx_entry. */
819 +               if ((ld->ld_count != dx_get_count(entries)) ||
820 +                   !ext4_htree_dx_lock_try(lck, at) ||
821 +                   (ld->ld_count != dx_get_count(entries))) {
822 +                       restart = 1;
823 +                       goto cleanup;
824 +               }
825 +               /* OK, I've got DX lock and nothing changed */
826 +               frame->at = ld->ld_at;
827         }
828 -       de = do_split(handle, dir, &bh, frame, &hinfo, &err);
829 +       de = do_split(handle, dir, &bh, frames, frame, &hinfo, lck, &err);
830         if (!de)
831                 goto cleanup;
832 +
833         err = add_dirent_to_buf(handle, dentry, inode, de, bh);
834         goto cleanup;
835  
836  journal_error:
837         ext4_std_error(dir->i_sb, err); /* this is a no-op if err == 0 */
838  cleanup:
839 +       ext4_htree_dx_unlock(lck);
840 +       ext4_htree_de_unlock(lck);
841         brelse(bh);
842         dx_release(frames);
843         /* @restart is true means htree-path has been changed, we need to
844 Index: linux-3.10.0-229.1.2.fc21.x86_64/fs/ext4/super.c
845 ===================================================================
846 --- linux-3.10.0-229.1.2.fc21.x86_64.orig/fs/ext4/super.c
847 +++ linux-3.10.0-229.1.2.fc21.x86_64/fs/ext4/super.c
848 @@ -875,6 +875,7 @@ static struct inode *ext4_alloc_inode(st
849  
850         ei->vfs_inode.i_version = 1;
851         spin_lock_init(&ei->i_raw_lock);
852 +       sema_init(&ei->i_append_sem, 1);
853         INIT_LIST_HEAD(&ei->i_prealloc_list);
854         spin_lock_init(&ei->i_prealloc_lock);
855         ext4_es_init_tree(&ei->i_es_tree);