unsigned long lg_gid;
};
+/* See comment on trunc_sem_down_read_nowait */
+struct ll_trunc_sem {
+ /* when positive, this is a count of readers, when -1, it indicates
+ * the semaphore is held for write, and 0 is unlocked
+ */
+ atomic_t ll_trunc_readers;
+ /* this tracks a count of waiting writers */
+ atomic_t ll_trunc_waiters;
+};
+
struct ll_inode_info {
__u32 lli_inode_magic;
spinlock_t lli_lock;
struct {
struct mutex lli_size_mutex;
char *lli_symlink_name;
- /*
- * struct rw_semaphore {
- * signed long count; // align d.d_def_acl
- * spinlock_t wait_lock; // align d.d_sa_lock
- * struct list_head wait_list;
- * }
- */
- struct rw_semaphore lli_trunc_sem;
+ struct ll_trunc_sem lli_trunc_sem;
struct range_lock_tree lli_write_tree;
struct rw_semaphore lli_glimpse_sem;
struct list_head lli_xattrs; /* ll_xattr_entry->xe_list */
};
+static inline void ll_trunc_sem_init(struct ll_trunc_sem *sem)
+{
+ atomic_set(&sem->ll_trunc_readers, 0);
+ atomic_set(&sem->ll_trunc_waiters, 0);
+}
+
+/* This version of down read ignores waiting writers, meaning if the semaphore
+ * is already held for read, this down_read will 'join' that reader and also
+ * take the semaphore.
+ *
+ * This lets us avoid an unusual deadlock.
+ *
+ * We must take lli_trunc_sem in read mode on entry in to various i/o paths
+ * in Lustre, in order to exclude truncates. Some of these paths then need to
+ * take the mmap_sem, while still holding the trunc_sem. The problem is that
+ * page faults hold the mmap_sem when calling in to Lustre, and then must also
+ * take the trunc_sem to exclude truncate.
+ *
+ * This means the locking order for trunc_sem and mmap_sem is sometimes AB,
+ * sometimes BA. This is almost OK because in both cases, we take the trunc
+ * sem for read, so it doesn't block.
+ *
+ * However, if a write mode user (truncate, a setattr op) arrives in the
+ * middle of this, the second reader on the truncate_sem will wait behind that
+ * writer.
+ *
+ * So we have, on our truncate sem, in order (where 'reader' and 'writer' refer
+ * to the mode in which they take the semaphore):
+ * reader (holding mmap_sem, needs truncate_sem)
+ * writer
+ * reader (holding truncate sem, waiting for mmap_sem)
+ *
+ * And so the readers deadlock.
+ *
+ * The solution is this modified semaphore, where this down_read ignores
+ * waiting write operations, and all waiters are woken up at once, so readers
+ * using down_read_nowait cannot get stuck behind waiting writers, regardless
+ * of the order they arrived in.
+ *
+ * down_read_nowait is only used in the page fault case, where we already hold
+ * the mmap_sem. This is because otherwise repeated read and write operations
+ * (which take the truncate sem) could prevent a truncate from ever starting.
+ * This could still happen with page faults, but without an even more complex
+ * mechanism, this is unavoidable.
+ *
+ * LU-12460
+ */
+static inline void trunc_sem_down_read_nowait(struct ll_trunc_sem *sem)
+{
+ wait_var_event(&sem->ll_trunc_readers,
+ atomic_inc_unless_negative(&sem->ll_trunc_readers));
+}
+
+static inline void trunc_sem_down_read(struct ll_trunc_sem *sem)
+{
+ wait_var_event(&sem->ll_trunc_readers,
+ atomic_read(&sem->ll_trunc_waiters) == 0 &&
+ atomic_inc_unless_negative(&sem->ll_trunc_readers));
+}
+
+static inline void trunc_sem_up_read(struct ll_trunc_sem *sem)
+{
+ if (atomic_dec_return(&sem->ll_trunc_readers) == 0 &&
+ atomic_read(&sem->ll_trunc_waiters))
+ wake_up_var(&sem->ll_trunc_readers);
+}
+
+static inline void trunc_sem_down_write(struct ll_trunc_sem *sem)
+{
+ atomic_inc(&sem->ll_trunc_waiters);
+ wait_var_event(&sem->ll_trunc_readers,
+ atomic_cmpxchg(&sem->ll_trunc_readers, 0, -1) == 0);
+ atomic_dec(&sem->ll_trunc_waiters);
+}
+
+static inline void trunc_sem_up_write(struct ll_trunc_sem *sem)
+{
+ atomic_set(&sem->ll_trunc_readers, 0);
+ wake_up_var(&sem->ll_trunc_readers);
+}
+
static inline __u32 ll_layout_version_get(struct ll_inode_info *lli)
{
__u32 gen;
struct ll_inode_info *lli = ll_i2info(inode);
if (cl_io_is_trunc(io)) {
- down_write(&lli->lli_trunc_sem);
+ trunc_sem_down_write(&lli->lli_trunc_sem);
inode_lock(inode);
inode_dio_wait(inode);
} else {
vvp_do_vmtruncate(inode, io->u.ci_setattr.sa_attr.lvb_size);
inode_dio_write_done(inode);
inode_unlock(inode);
- up_write(&lli->lli_trunc_sem);
+ trunc_sem_up_write(&lli->lli_trunc_sem);
} else {
inode_unlock(inode);
}
pos, pos + cnt);
if (vio->vui_io_subtype == IO_NORMAL)
- down_read(&lli->lli_trunc_sem);
+ trunc_sem_down_read(&lli->lli_trunc_sem);
if (io->ci_async_readahead) {
file_accessed(file);
ENTRY;
if (vio->vui_io_subtype == IO_NORMAL)
- down_read(&lli->lli_trunc_sem);
+ trunc_sem_down_read(&lli->lli_trunc_sem);
if (!can_populate_pages(env, io, inode))
RETURN(0);
struct ll_inode_info *lli = ll_i2info(inode);
if (vio->vui_io_subtype == IO_NORMAL)
- up_read(&lli->lli_trunc_sem);
+ trunc_sem_up_read(&lli->lli_trunc_sem);
}
static int vvp_io_kernel_fault(struct vvp_fault_io *cfio)
pgoff_t last_index;
ENTRY;
- down_read(&lli->lli_trunc_sem);
+ trunc_sem_down_read_nowait(&lli->lli_trunc_sem);
/* offset of the last byte on the page */
offset = cl_offset(obj, fio->ft_index + 1) - 1;
CLOBINVRNT(env, ios->cis_io->ci_obj,
vvp_object_invariant(ios->cis_io->ci_obj));
- up_read(&lli->lli_trunc_sem);
+ trunc_sem_up_read(&lli->lli_trunc_sem);
}
static int vvp_io_fsync_start(const struct lu_env *env,