* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2016, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#include <linux/sched.h>
#include <linux/list.h>
+#include <linux/list_sort.h>
#include <obd_class.h>
#include <obd_support.h>
#include <lustre_fid.h>
#include <cl_object.h>
#include "cl_internal.h"
-#include <lustre_compat.h>
/*****************************************************************************
*
* \pre iot == CIT_READ || iot == CIT_WRITE
*/
int cl_io_rw_init(const struct lu_env *env, struct cl_io *io,
- enum cl_io_type iot, loff_t pos, size_t count)
+ enum cl_io_type iot, loff_t pos, size_t count)
{
LINVRNT(iot == CIT_READ || iot == CIT_WRITE);
LINVRNT(io->ci_obj != NULL);
ENTRY;
- if (cfs_ptengine_weight(cl_io_engine) < 2)
- io->ci_pio = 0;
-
LU_OBJECT_HEADER(D_VFSTRACE, env, &io->ci_obj->co_lu,
- "io %s range: [%llu, %llu) %s %s %s %s\n",
- iot == CIT_READ ? "read" : "write",
- pos, pos + count,
- io->u.ci_rw.rw_nonblock ? "nonblock" : "block",
- io->u.ci_rw.rw_append ? "append" : "-",
- io->u.ci_rw.rw_sync ? "sync" : "-",
- io->ci_pio ? "pio" : "-");
-
- io->u.ci_rw.rw_range.cir_pos = pos;
- io->u.ci_rw.rw_range.cir_count = count;
-
+ "io range: %u [%llu, %llu) %u %u\n",
+ iot, (__u64)pos, (__u64)pos + count,
+ io->u.ci_rw.crw_nonblock, io->u.ci_wr.wr_append);
+ io->u.ci_rw.crw_pos = pos;
+ io->u.ci_rw.crw_count = count;
RETURN(cl_io_init(env, io, iot, io->ci_obj));
}
EXPORT_SYMBOL(cl_io_rw_init);
-static int cl_lock_descr_sort(const struct cl_lock_descr *d0,
- const struct cl_lock_descr *d1)
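+/*
+ * list_sort() comparator for the cls_todo lock list: order lock links by the
+ * FID of the object they cover.  The priv argument required by list_sort()
+ * is unused here.
+ */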
+static int cl_lock_descr_cmp(void *priv,
+ struct list_head *a, struct list_head *b)
{
+ const struct cl_io_lock_link *l0 = list_entry(a, struct cl_io_lock_link,
+ cill_linkage);
+ const struct cl_io_lock_link *l1 = list_entry(b, struct cl_io_lock_link,
+ cill_linkage);
+ const struct cl_lock_descr *d0 = &l0->cill_descr;
+ const struct cl_lock_descr *d1 = &l1->cill_descr;
+
return lu_fid_cmp(lu_object_fid(&d0->cld_obj->co_lu),
lu_object_fid(&d1->cld_obj->co_lu));
}
-/*
- * Sort locks in lexicographical order of their (fid, start-offset) pairs.
- */
-static void cl_io_locks_sort(struct cl_io *io)
-{
- int done = 0;
-
- ENTRY;
- /* hidden treasure: bubble sort for now. */
- do {
- struct cl_io_lock_link *curr;
- struct cl_io_lock_link *prev;
- struct cl_io_lock_link *temp;
-
- done = 1;
- prev = NULL;
-
- list_for_each_entry_safe(curr, temp, &io->ci_lockset.cls_todo,
- cill_linkage) {
- if (prev != NULL) {
- switch (cl_lock_descr_sort(&prev->cill_descr,
- &curr->cill_descr)) {
- case 0:
- /*
- * IMPOSSIBLE: Identical locks are
- * already removed at
- * this point.
- */
- default:
- LBUG();
- case +1:
- list_move_tail(&curr->cill_linkage,
- &prev->cill_linkage);
- done = 0;
- continue; /* don't change prev: it's
- * still "previous" */
- case -1: /* already in order */
- break;
- }
- }
- prev = curr;
- }
- } while (!done);
- EXIT;
-}
-
static void cl_lock_descr_merge(struct cl_lock_descr *d0,
const struct cl_lock_descr *d1)
{
break;
}
if (result == 0) {
- cl_io_locks_sort(io);
+ /*
+ * Sort locks in lexicographical order of their (fid,
+ * start-offset) pairs to avoid deadlocks.
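+		 * list_sort() runs a stable merge sort, so entries that
+		 * compare equal keep their relative order in cls_todo.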
+ */
+ list_sort(NULL, &io->ci_lockset.cls_todo, cl_lock_descr_cmp);
result = cl_lockset_lock(env, io, &io->ci_lockset);
}
if (result != 0)
*/
void cl_io_iter_fini(const struct lu_env *env, struct cl_io *io)
{
- const struct cl_io_slice *scan;
+ const struct cl_io_slice *scan;
- LINVRNT(cl_io_is_loopable(io));
- LINVRNT(io->ci_state == CIS_UNLOCKED);
- LINVRNT(cl_io_invariant(io));
+ LINVRNT(cl_io_is_loopable(io));
+ LINVRNT(io->ci_state <= CIS_IT_STARTED ||
+ io->ci_state > CIS_IO_FINISHED);
+ LINVRNT(cl_io_invariant(io));
- ENTRY;
+ ENTRY;
list_for_each_entry_reverse(scan, &io->ci_layers, cis_linkage) {
if (scan->cis_iop->op[io->ci_type].cio_iter_fini != NULL)
scan->cis_iop->op[io->ci_type].cio_iter_fini(env, scan);
}
- io->ci_state = CIS_IT_ENDED;
- EXIT;
+ io->ci_state = CIS_IT_ENDED;
+ EXIT;
}
EXPORT_SYMBOL(cl_io_iter_fini);
*/
void cl_io_rw_advance(const struct lu_env *env, struct cl_io *io, size_t nob)
{
- const struct cl_io_slice *scan;
+ const struct cl_io_slice *scan;
- LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE ||
- nob == 0);
- LINVRNT(cl_io_is_loopable(io));
- LINVRNT(cl_io_invariant(io));
+ ENTRY;
- ENTRY;
+ LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE ||
+ nob == 0);
+ LINVRNT(cl_io_is_loopable(io));
+ LINVRNT(cl_io_invariant(io));
- io->u.ci_rw.rw_range.cir_pos += nob;
- io->u.ci_rw.rw_range.cir_count -= nob;
+ io->u.ci_rw.crw_pos += nob;
+ io->u.ci_rw.crw_count -= nob;
- /* layers have to be notified. */
+ /* layers have to be notified. */
list_for_each_entry_reverse(scan, &io->ci_layers, cis_linkage) {
if (scan->cis_iop->op[io->ci_type].cio_advance != NULL)
scan->cis_iop->op[io->ci_type].cio_advance(env, scan,
nob);
}
- EXIT;
+ EXIT;
}
/**
const struct cl_io_slice *scan;
int result = 0;
- LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_FAULT);
+ LINVRNT(io->ci_type == CIT_READ ||
+ io->ci_type == CIT_FAULT ||
+ io->ci_type == CIT_WRITE);
LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
LINVRNT(cl_io_invariant(io));
ENTRY;
* \see cl_io_operations::cio_commit_async()
*/
int cl_io_commit_async(const struct lu_env *env, struct cl_io *io,
- struct cl_page_list *queue, int from, int to,
- cl_commit_cbt cb)
+ struct cl_page_list *queue, int from, int to,
+ cl_commit_cbt cb)
{
const struct cl_io_slice *scan;
int result = 0;
pg->cp_sync_io = anchor;
}
- cl_sync_io_init(anchor, queue->c2_qin.pl_nr, &cl_sync_io_end);
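+	/*
+	 * No private completion callback here: cl_sync_io_note() just wakes
+	 * the waiter once every queued page has completed (assuming
+	 * cl_sync_io_init() is the wrapper that passes NULL for the aio and
+	 * end arguments of cl_sync_io_init_notify()).
+	 */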
+ cl_sync_io_init(anchor, queue->c2_qin.pl_nr);
rc = cl_io_submit_rw(env, io, iot, queue);
if (rc == 0) {
/*
return result;
}
-static
-struct cl_io_pt *cl_io_submit_pt(struct cl_io *io, loff_t pos, size_t count)
-{
- struct cl_io_pt *pt;
- int rc;
-
- OBD_ALLOC(pt, sizeof(*pt));
- if (pt == NULL)
- RETURN(ERR_PTR(-ENOMEM));
-
- pt->cip_next = NULL;
- init_sync_kiocb(&pt->cip_iocb, io->u.ci_rw.rw_file);
- pt->cip_iocb.ki_pos = pos;
-#ifdef HAVE_KIOCB_KI_LEFT
- pt->cip_iocb.ki_left = count;
-#elif defined(HAVE_KI_NBYTES)
- pt->cip_iocb.ki_nbytes = count;
-#endif
- pt->cip_iter = io->u.ci_rw.rw_iter;
- iov_iter_truncate(&pt->cip_iter, count);
- pt->cip_file = io->u.ci_rw.rw_file;
- pt->cip_iot = io->ci_type;
- pt->cip_pos = pos;
- pt->cip_count = count;
- pt->cip_result = 0;
-
- rc = cfs_ptask_init(&pt->cip_task, io->u.ci_rw.rw_ptask, pt,
- PTF_ORDERED | PTF_COMPLETE |
- PTF_USER_MM | PTF_RETRY, smp_processor_id());
- if (rc)
- GOTO(out_error, rc);
-
- CDEBUG(D_VFSTRACE, "submit %s range: [%llu, %llu)\n",
- io->ci_type == CIT_READ ? "read" : "write",
- pos, pos + count);
-
- rc = cfs_ptask_submit(&pt->cip_task, cl_io_engine);
- if (rc)
- GOTO(out_error, rc);
-
- RETURN(pt);
-
-out_error:
- OBD_FREE(pt, sizeof(*pt));
- RETURN(ERR_PTR(rc));
-}
-
/**
* Main io loop.
*
*/
int cl_io_loop(const struct lu_env *env, struct cl_io *io)
{
- struct cl_io_pt *pt = NULL, *head = NULL;
- struct cl_io_pt **tail = &head;
- loff_t pos;
- size_t count;
- size_t last_chunk_count = 0;
- bool short_io = false;
- int rc = 0;
- ENTRY;
+ int result = 0;
LINVRNT(cl_io_is_loopable(io));
+ ENTRY;
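+	/*
+	 * Each pass handles as much of the range as the layers are willing
+	 * to take in one go; a layer sets ci_continue during the iteration
+	 * when more of the range remains (for example when the IO spans
+	 * several stripes).
+	 */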
do {
- io->ci_continue = 0;
-
- rc = cl_io_iter_init(env, io);
- if (rc) {
- cl_io_iter_fini(env, io);
- break;
- }
-
- pos = io->u.ci_rw.rw_range.cir_pos;
- count = io->u.ci_rw.rw_range.cir_count;
-
- if (io->ci_pio) {
- /* submit this range for parallel execution */
- pt = cl_io_submit_pt(io, pos, count);
- if (IS_ERR(pt)) {
- cl_io_iter_fini(env, io);
- rc = PTR_ERR(pt);
- break;
- }
-
- *tail = pt;
- tail = &pt->cip_next;
- } else {
- size_t nob = io->ci_nob;
-
- CDEBUG(D_VFSTRACE,
- "execute type %u range: [%llu, %llu) nob: %zu %s\n",
- io->ci_type, pos, pos + count, nob,
- io->ci_continue ? "continue" : "stop");
+ size_t nob;
- rc = cl_io_lock(env, io);
- if (rc) {
- cl_io_iter_fini(env, io);
- break;
+ io->ci_continue = 0;
+ result = cl_io_iter_init(env, io);
+ if (result == 0) {
+ nob = io->ci_nob;
+ result = cl_io_lock(env, io);
+ if (result == 0) {
+ /*
+				 * Notify layers that locks have been taken,
+ * and do actual i/o.
+ *
+ * - llite: kms, short read;
+ * - llite: generic_file_read();
+ */
+ result = cl_io_start(env, io);
+ /*
+				 * Send any remaining pending io, etc.
+				 *
+				 * - llite: ll_rw_stats_tally.
+ */
+ cl_io_end(env, io);
+ cl_io_unlock(env, io);
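+				/*
+				 * Advance crw_pos/crw_count past the bytes
+				 * actually transferred in this iteration so
+				 * a continued loop resumes where this one
+				 * stopped.
+				 */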
+ cl_io_rw_advance(env, io, io->ci_nob - nob);
}
-
- /*
- * Notify layers that locks has been taken,
- * and do actual i/o.
- *
- * - llite: kms, short read;
- * - llite: generic_file_read();
- */
- rc = cl_io_start(env, io);
-
- /*
- * Send any remaining pending
- * io, etc.
- *
- * - llite: ll_rw_stats_tally.
- */
- cl_io_end(env, io);
- cl_io_unlock(env, io);
-
- count = io->ci_nob - nob;
- last_chunk_count = count;
}
-
- cl_io_rw_advance(env, io, count);
cl_io_iter_fini(env, io);
- } while (!rc && io->ci_continue);
+ } while (result == 0 && io->ci_continue);
- if (rc == -EWOULDBLOCK && io->ci_ndelay) {
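+	/*
+	 * A non-blocking (ci_ndelay) attempt could not take its locks
+	 * without waiting; ask the caller to restart the IO instead of
+	 * returning the error.
+	 */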
+ if (result == -EWOULDBLOCK && io->ci_ndelay) {
io->ci_need_restart = 1;
- rc = 0;
- }
-
- CDEBUG(D_VFSTRACE, "loop type %u done: nob: %zu, rc: %d %s\n",
- io->ci_type, io->ci_nob, rc,
- io->ci_continue ? "continue" : "stop");
-
- while (head != NULL) {
- int rc2;
-
- pt = head;
- head = head->cip_next;
-
- rc2 = cfs_ptask_wait_for(&pt->cip_task);
- LASSERTF(!rc2, "wait for task error: %d\n", rc2);
-
- rc2 = cfs_ptask_result(&pt->cip_task);
- CDEBUG(D_VFSTRACE,
- "done %s range: [%llu, %llu) ret: %zd, rc: %d\n",
- pt->cip_iot == CIT_READ ? "read" : "write",
- pt->cip_pos, pt->cip_pos + pt->cip_count,
- pt->cip_result, rc2);
-
- /* save the result of ptask */
- rc = rc ? : rc2;
- io->ci_need_restart |= pt->cip_need_restart;
-
- if (!short_io) {
- if (!rc2) /* IO is done by this task successfully */
- io->ci_nob += pt->cip_result;
- if (pt->cip_result < pt->cip_count) {
- /* short IO happened.
- * Not necessary to be an error */
- CDEBUG(D_VFSTRACE,
- "incomplete range: [%llu, %llu) "
- "last_chunk_count: %zu\n",
- pt->cip_pos,
- pt->cip_pos + pt->cip_count,
- last_chunk_count);
- io->ci_nob -= last_chunk_count;
- short_io = true;
- }
- }
- OBD_FREE(pt, sizeof(*pt));
+ result = 0;
}
- CDEBUG(D_VFSTRACE, "return nob: %zu (%s io), rc: %d\n",
- io->ci_nob, short_io ? "short" : "full", rc);
-
- RETURN(rc < 0 ? rc : io->ci_result);
+ if (result == 0)
+ result = io->ci_result;
+ RETURN(result < 0 ? result : 0);
}
EXPORT_SYMBOL(cl_io_loop);
* \see cl_lock_slice_add(), cl_req_slice_add(), cl_page_slice_add()
*/
void cl_io_slice_add(struct cl_io *io, struct cl_io_slice *slice,
- struct cl_object *obj,
- const struct cl_io_operations *ops)
+ struct cl_object *obj,
+ const struct cl_io_operations *ops)
{
struct list_head *linkage = &slice->cis_linkage;
- LASSERT((linkage->prev == NULL && linkage->next == NULL) ||
+ LASSERT((linkage->prev == NULL && linkage->next == NULL) ||
list_empty(linkage));
- ENTRY;
+ ENTRY;
list_add_tail(linkage, &io->ci_layers);
- slice->cis_io = io;
- slice->cis_obj = obj;
- slice->cis_iop = ops;
- EXIT;
+ slice->cis_io = io;
+ slice->cis_obj = obj;
+ slice->cis_iop = ops;
+ EXIT;
}
EXPORT_SYMBOL(cl_io_slice_add);
}
EXPORT_SYMBOL(cl_req_attr_set);
-/* cl_sync_io_callback assumes the caller must call cl_sync_io_wait() to
- * wait for the IO to finish. */
-void cl_sync_io_end(const struct lu_env *env, struct cl_sync_io *anchor)
-{
- wake_up_all(&anchor->csi_waitq);
-
- /* it's safe to nuke or reuse anchor now */
- atomic_set(&anchor->csi_barrier, 0);
-}
-EXPORT_SYMBOL(cl_sync_io_end);
-
/**
- * Initialize synchronous io wait anchor
+ * Initialize synchronous io wait \a anchor for \a nr pages with optional
+ * \a end handler.
+ *
+ * \param anchor owned by caller, initialized here.
+ * \param nr number of pages initially pending in sync.
+ * \param aio optional aio descriptor; when set, cl_sync_io_note() frees it
+ *            once the last pending item completes.
+ * \param end optional callback called on sync_io completion, can be used to
+ *            trigger erasure coding, integrity, dedupe, or similar operations.
+ *
+ * \a end is called with the anchor->csi_waitq.lock spinlock held.
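+ *
+ * A typical caller pattern (sketch; assumes cl_sync_io_init() is the wrapper
+ * that passes NULL for \a aio and \a end):
+ *
+ *	cl_sync_io_init(anchor, nr_pages);
+ *	... submit pages; each completion calls cl_sync_io_note() ...
+ *	rc = cl_sync_io_wait(env, anchor, timeout);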
*/
-void cl_sync_io_init(struct cl_sync_io *anchor, int nr,
- void (*end)(const struct lu_env *, struct cl_sync_io *))
+void cl_sync_io_init_notify(struct cl_sync_io *anchor, int nr,
+ struct cl_dio_aio *aio, cl_sync_io_end_t *end)
{
ENTRY;
memset(anchor, 0, sizeof(*anchor));
init_waitqueue_head(&anchor->csi_waitq);
atomic_set(&anchor->csi_sync_nr, nr);
- atomic_set(&anchor->csi_barrier, nr > 0);
anchor->csi_sync_rc = 0;
anchor->csi_end_io = end;
- LASSERT(end != NULL);
+ anchor->csi_aio = aio;
EXIT;
}
-EXPORT_SYMBOL(cl_sync_io_init);
+EXPORT_SYMBOL(cl_sync_io_init_notify);
/**
* Wait until all IO completes. Transfer completion routine has to call
int cl_sync_io_wait(const struct lu_env *env, struct cl_sync_io *anchor,
long timeout)
{
- struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
- NULL, NULL, NULL);
- int rc;
+ int rc = 0;
ENTRY;
LASSERT(timeout >= 0);
- rc = l_wait_event(anchor->csi_waitq,
- atomic_read(&anchor->csi_sync_nr) == 0,
- &lwi);
- if (rc < 0) {
+ if (timeout > 0 &&
+ wait_event_idle_timeout(anchor->csi_waitq,
+ atomic_read(&anchor->csi_sync_nr) == 0,
+ cfs_time_seconds(timeout)) == 0) {
+ rc = -ETIMEDOUT;
CERROR("IO failed: %d, still wait for %d remaining entries\n",
rc, atomic_read(&anchor->csi_sync_nr));
+ }
- lwi = (struct l_wait_info) { 0 };
- (void)l_wait_event(anchor->csi_waitq,
- atomic_read(&anchor->csi_sync_nr) == 0,
- &lwi);
- } else {
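+	/*
+	 * Even after a timeout we still wait for every outstanding
+	 * completion: cl_sync_io_note() references anchor, which typically
+	 * lives on the caller's stack.
+	 */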
+ wait_event_idle(anchor->csi_waitq,
+ atomic_read(&anchor->csi_sync_nr) == 0);
+ if (!rc)
rc = anchor->csi_sync_rc;
- }
+
+ /* We take the lock to ensure that cl_sync_io_note() has finished */
+ spin_lock(&anchor->csi_waitq.lock);
LASSERT(atomic_read(&anchor->csi_sync_nr) == 0);
+ spin_unlock(&anchor->csi_waitq.lock);
- /* wait until cl_sync_io_note() has done wakeup */
- while (unlikely(atomic_read(&anchor->csi_barrier) != 0)) {
- cpu_relax();
- }
RETURN(rc);
}
EXPORT_SYMBOL(cl_sync_io_wait);
* IO.
*/
LASSERT(atomic_read(&anchor->csi_sync_nr) > 0);
- if (atomic_dec_and_test(&anchor->csi_sync_nr)) {
- LASSERT(anchor->csi_end_io != NULL);
- anchor->csi_end_io(env, anchor);
- /* Can't access anchor any more */
+ if (atomic_dec_and_lock(&anchor->csi_sync_nr,
+ &anchor->csi_waitq.lock)) {
+ struct cl_dio_aio *aio = NULL;
+
+ cl_sync_io_end_t *end_io = anchor->csi_end_io;
+
+ /*
+		 * Holding the lock across both the decrement and the wakeup
+		 * ensures cl_sync_io_wait() doesn't complete before the
+		 * wakeup has finished touching anchor: the owner is free to
+		 * reclaim anchor as soon as cl_sync_io_wait() returns, so
+		 * its contents become unsafe to access after that.
+ */
+ wake_up_all_locked(&anchor->csi_waitq);
+ if (end_io)
+ end_io(env, anchor);
+ if (anchor->csi_aio)
+ aio = anchor->csi_aio;
+
+ spin_unlock(&anchor->csi_waitq.lock);
+
+		/*
+ * If anchor->csi_aio is set, we are responsible for freeing
+ * memory here rather than when cl_sync_io_wait() completes.
+ */
+ if (aio)
+ OBD_FREE_PTR(aio);
}
EXIT;
}