struct ll_dio_pages csd_dio_pages;
struct iov_iter csd_iter;
struct cl_iter_dup csd_dup;
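+ /* protects csd_write_copied, so the unaligned write data copy is
+ * done only once even when one sdio spans multiple RPCs/extents
+ */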
+ spinlock_t csd_lock;
unsigned csd_creator_free:1,
csd_write:1,
- csd_unaligned:1;
+ csd_unaligned:1,
+ csd_write_copied:1;
};
static inline u64 cl_io_nob_aligned(u64 off, u32 nob, u32 pgsz)
struct list_head oo_hp_exts; /* list of hp extents */
struct list_head oo_urgent_exts; /* list of writeback extents */
struct list_head oo_full_exts;
+ struct list_head oo_dio_exts; /* list of dio extents */
struct list_head oo_reading_exts;
unsigned int oe_nr_pages;
/** list of pending oap pages. Pages in this list are NOT sorted. */
struct list_head oe_pages;
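+ /** sub dio for a DIO extent; unaligned write data is copied from
+ * userspace at RPC assembly time rather than at submission
+ */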
+ struct cl_sub_dio *oe_csd;
/** start and end index of this extent, include start and end
* themselves. Page offset here is the page index of osc_pages.
* oe_start is used as keyword for red-black tree.
GOTO(out, result);
}
- if (unaligned && rw == WRITE) {
- result = ll_dio_user_copy(sdio);
- if (unlikely(result <= 0)) {
- cl_sync_io_note(env, &sdio->csd_sync, result);
- if (sync_submit) {
- LASSERT(sdio->csd_creator_free);
- cl_sub_dio_free(sdio);
- }
- GOTO(out, result);
- }
- }
-
result = ll_direct_rw_pages(env, io, count, rw, inode, sdio);
/* if the i/o was unsuccessful, we zero the number of bytes to
* copy back. Note that partial I/O completion isn't possible
sdio->csd_creator_free = sync;
sdio->csd_write = write;
sdio->csd_unaligned = unaligned;
+ spin_lock_init(&sdio->csd_lock);
atomic_add(1, &ll_aio->cda_sync.csi_sync_nr);
#endif
/* copy IO data to/from internal buffer and userspace iovec */
-ssize_t ll_dio_user_copy(struct cl_sub_dio *sdio)
+ssize_t __ll_dio_user_copy(struct cl_sub_dio *sdio)
{
struct iov_iter *iter = &sdio->csd_iter;
struct ll_dio_pages *pvec = &sdio->csd_dio_pages;
* Also, if mm == current->mm, that means this is being handled in the
* thread which created it, and not in a separate kthread - so it is
* unnecessary (and incorrect) to do a use_mm here
+ *
+ * assert that if we have an mm and it's not ours, we're doing this
+ * copying from a kernel thread - otherwise kthread_use_mm will happily
+ * trash memory and crash later
*/
if (mm && mm != current->mm) {
+ LASSERT(current->flags & PF_KTHREAD);
kthread_use_mm(mm);
mm_used = true;
}
/* the total bytes copied, or status */
RETURN(original_count - count ? original_count - count : status);
}
+
+struct dio_user_copy_data {
+ struct cl_sub_dio *ducd_sdio;
+ struct completion ducd_completion;
+ ssize_t ducd_result;
+};
+
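+/* runs in a spawned kthread, so kthread_use_mm can attach the submitting
+ * thread's mm for the userspace copy
+ */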
+int ll_dio_user_copy_helper(void *data)
+{
+ struct dio_user_copy_data *ducd = data;
+ struct cl_sub_dio *sdio = ducd->ducd_sdio;
+
+ ducd->ducd_result = __ll_dio_user_copy(sdio);
+ complete(&ducd->ducd_completion);
+
+ return 0;
+}
+
+ssize_t ll_dio_user_copy(struct cl_sub_dio *sdio)
+{
+ struct dio_user_copy_data ducd;
+ struct task_struct *kthread;
+
+ /* normal case - copy is being done by ptlrpcd */
+ if ((current->flags & PF_KTHREAD) ||
+ /* for non-parallel DIO, the submitting thread does the copy */
+ sdio->csd_ll_aio->cda_mm == current->mm)
+ return __ll_dio_user_copy(sdio);
+
+ /* this is a slightly unfortunate workaround; when doing an fsync, a
+ * user thread may pick up a DIO extent which is about to be written
+ * out. we can't just ignore these, but we also can't handle them from
+ * the user thread, since kthread_use_mm only works from kernel
+ * threads, so a user thread cannot copy to/from another mm.
+ *
+ * so we spawn a kthread to handle this case.
+ * this will be rare and is not a 'hot path', so the performance
+ * cost doesn't matter
+ */
+ init_completion(&ducd.ducd_completion);
+ ducd.ducd_sdio = sdio;
+
+ kthread = kthread_run(ll_dio_user_copy_helper, &ducd,
+ "ll_ucp_%u", current->pid);
+ /* kthread_run() returns a task or an ERR_PTR, never NULL */
+ if (IS_ERR(kthread))
+ return PTR_ERR(kthread);
+ wait_for_completion(&ducd.ducd_completion);
+
+ return ducd.ducd_result;
+}
EXPORT_SYMBOL(ll_dio_user_copy);
/**
const char *const lustre_reserved[] = { "ll_ping", "ptlrpc",
"ldlm", "ll_sa", "kworker",
"kswapd", "writeback", "irq",
- "ksoftirq", NULL };
+ "ksoftirq", "ll_ucp", NULL };
int i;
if (jobid[0] == '\0')
static int jobid_print_current_comm(char *jobid, ssize_t joblen)
{
const char *const names[] = {"kworker", "kswapd", "ll_sa", "ll_agl",
- "ldlm_bl", NULL};
+ "ldlm_bl", "ll_ucp", NULL};
int i;
if (current->flags & PF_KTHREAD) {
ext->oe_srvlock = !!(brw_flags & OBD_BRW_SRVLOCK);
ext->oe_ndelay = !!(brw_flags & OBD_BRW_NDELAY);
ext->oe_dio = !!(brw_flags & OBD_BRW_NOCACHE);
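+ /* for DIO extents, stash the sdio from the page's sync anchor so
+ * the unaligned write data copy can happen at RPC assembly time
+ */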
+ if (ext->oe_dio) {
+ struct cl_sync_io *anchor;
+ struct cl_page *clpage;
+
+ oap = list_first_entry(list, struct osc_async_page,
+ oap_pending_item);
+ clpage = oap2cl_page(oap);
+ LASSERT(clpage->cp_type == CPT_TRANSIENT);
+ anchor = clpage->cp_sync_io;
+ ext->oe_csd = anchor->csi_dio_aio;
+ }
oscl = oio->oi_write_osclock ? : oio->oi_read_osclock;
if (oscl && oscl->ols_dlmlock != NULL) {
ext->oe_dlmlock = LDLM_LOCK_GET(oscl->ols_dlmlock);
/* add pages into rpc_list to build BRW rpc */
list_for_each_entry(ext, ext_list, oe_link) {
+ struct cl_sub_dio *sdio = ext->oe_csd;
+
LASSERT(ext->oe_state == OES_RPC);
mem_tight |= ext->oe_memalloc;
grant += ext->oe_grants;
layout_version = max(layout_version, ext->oe_layout_version);
if (obj == NULL)
obj = ext->oe_obj;
+
+ /* for unaligned writes, we do the data copying here */
+ if (sdio && sdio->csd_unaligned && sdio->csd_write &&
+ !sdio->csd_write_copied) {
+ /* note a single sdio can correspond to multiple RPCs,
+ * so we use this lock to ensure the data copy is done
+ * only once (an sdio can also correspond to multiple
+ * extents, which this lock handles as well)
+ */
+ spin_lock(&sdio->csd_lock);
+ if (!sdio->csd_write_copied) {
+ rc = ll_dio_user_copy(sdio);
+ if (rc <= 0) {
+ spin_unlock(&sdio->csd_lock);
+ GOTO(out, rc);
+ }
+ sdio->csd_write_copied = true;
+ }
+ spin_unlock(&sdio->csd_lock);
+ }
}
soft_sync = osc_over_unstable_soft_limit(cli);