#include <linux/sched.h>
#include <linux/list.h>
+#include <linux/list_sort.h>
#include <obd_class.h>
#include <obd_support.h>
#include <lustre_fid.h>
}
EXPORT_SYMBOL(cl_io_rw_init);
-static int cl_lock_descr_sort(const struct cl_lock_descr *d0,
- const struct cl_lock_descr *d1)
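+/*
+ * list_sort() comparator: @a and @b are the cill_linkage list_heads
+ * embedded in two cl_io_lock_link entries.  Order them by the fid of
+ * the object their descriptors cover; return <0, 0, or >0 as
+ * list_sort() expects.  @priv is unused.
+ */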
+static int cl_lock_descr_cmp(void *priv,
+ struct list_head *a, struct list_head *b)
{
+ const struct cl_io_lock_link *l0 = list_entry(a, struct cl_io_lock_link,
+ cill_linkage);
+ const struct cl_io_lock_link *l1 = list_entry(b, struct cl_io_lock_link,
+ cill_linkage);
+ const struct cl_lock_descr *d0 = &l0->cill_descr;
+ const struct cl_lock_descr *d1 = &l1->cill_descr;
+
return lu_fid_cmp(lu_object_fid(&d0->cld_obj->co_lu),
lu_object_fid(&d1->cld_obj->co_lu));
}
-/*
- * Sort locks in lexicographical order of their (fid, start-offset) pairs.
- */
-static void cl_io_locks_sort(struct cl_io *io)
-{
- int done = 0;
-
- ENTRY;
- /* hidden treasure: bubble sort for now. */
- do {
- struct cl_io_lock_link *curr;
- struct cl_io_lock_link *prev;
- struct cl_io_lock_link *temp;
-
- done = 1;
- prev = NULL;
-
- list_for_each_entry_safe(curr, temp, &io->ci_lockset.cls_todo,
- cill_linkage) {
- if (prev != NULL) {
- switch (cl_lock_descr_sort(&prev->cill_descr,
- &curr->cill_descr)) {
- case 0:
- /*
- * IMPOSSIBLE: Identical locks are
- * already removed at
- * this point.
- */
- default:
- LBUG();
- case +1:
- list_move_tail(&curr->cill_linkage,
- &prev->cill_linkage);
- done = 0;
- continue; /* don't change prev: it's
- * still "previous" */
- case -1: /* already in order */
- break;
- }
- }
- prev = curr;
- }
- } while (!done);
- EXIT;
-}
-
static void cl_lock_descr_merge(struct cl_lock_descr *d0,
const struct cl_lock_descr *d1)
{
break;
}
if (result == 0) {
- cl_io_locks_sort(io);
+ /*
+ * Sort locks by object fid so that all threads enqueue
+ * them in one global order and cannot deadlock.  Locks on
+ * the same object were already merged, so each fid occurs
+ * at most once and no start-offset tie-break is needed.
+ */
+ list_sort(NULL, &io->ci_lockset.cls_todo, cl_lock_descr_cmp);
result = cl_lockset_lock(env, io, &io->ci_lockset);
}
if (result != 0)
const struct cl_io_slice *scan;
int result = 0;
- LINVRNT(io->ci_type == CIT_READ || io->ci_type == CIT_FAULT);
+ LINVRNT(io->ci_type == CIT_READ ||
+ io->ci_type == CIT_FAULT ||
+ io->ci_type == CIT_WRITE);
LINVRNT(io->ci_state == CIS_IO_GOING || io->ci_state == CIS_LOCKED);
LINVRNT(cl_io_invariant(io));
ENTRY;
* \see cl_io_operations::cio_commit_async()
*/
int cl_io_commit_async(const struct lu_env *env, struct cl_io *io,
- struct cl_page_list *queue, int from, int to,
- cl_commit_cbt cb)
+ struct cl_page_list *queue, int from, int to,
+ cl_commit_cbt cb)
{
const struct cl_io_slice *scan;
int result = 0;
pg->cp_sync_io = anchor;
}
- cl_sync_io_init(anchor, queue->c2_qin.pl_nr, &cl_sync_io_end);
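+ /* plain waiter: no completion callback is registered here,
+ * cl_sync_io_note() will just wake up the waiting thread */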
+ cl_sync_io_init(anchor, queue->c2_qin.pl_nr);
rc = cl_io_submit_rw(env, io, iot, queue);
if (rc == 0) {
/*
}
EXPORT_SYMBOL(cl_req_attr_set);
-/*
- * cl_sync_io_end callback is issued as cl_sync_io completes and before
- * control of anchor reverts to model used by the caller of cl_sync_io_init()
- *
- * NOTE: called with spinlock on anchor->csi_waitq.lock
- */
-void cl_sync_io_end(const struct lu_env *env, struct cl_sync_io *anchor)
-{
- /* deprecated pending future removal */
-}
-EXPORT_SYMBOL(cl_sync_io_end);
-
/**
- * Initialize synchronous io wait anchor
+ * Initialize synchronous io wait \a anchor for \a nr pages with an
+ * optional \a end handler.
+ * \param anchor owned by the caller, initialized here.
+ * \param nr number of pages initially pending in sync.
+ * \param end optional callback invoked on sync_io completion; can be
+ * used to trigger erasure coding, integrity, dedupe, or a similar
+ * operation.
+ * \a end is called with anchor->csi_waitq.lock held.
*/
-void cl_sync_io_init(struct cl_sync_io *anchor, int nr,
- void (*end)(const struct lu_env *, struct cl_sync_io *))
+void cl_sync_io_init_notify(struct cl_sync_io *anchor, int nr,
+ cl_sync_io_end_t *end)
{
ENTRY;
memset(anchor, 0, sizeof(*anchor));
atomic_set(&anchor->csi_sync_nr, nr);
anchor->csi_sync_rc = 0;
anchor->csi_end_io = end;
- LASSERT(end != NULL);
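+ /* end may be NULL: cl_sync_io_note() only invokes it when set */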
EXIT;
}
-EXPORT_SYMBOL(cl_sync_io_init);
+EXPORT_SYMBOL(cl_sync_io_init_notify);
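+/*
+ * Usage sketch (my_end is a hypothetical handler):
+ *
+ *	static void my_end(const struct lu_env *env,
+ *			   struct cl_sync_io *anchor)
+ *	{
+ *		// called under anchor->csi_waitq.lock once
+ *		// csi_sync_nr drops to zero
+ *	}
+ *
+ *	cl_sync_io_init_notify(anchor, nr_pages, my_end);
+ *	// ... submit pages referencing anchor ...
+ *	rc = cl_sync_io_wait(env, anchor, timeout);
+ */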
/**
* Wait until all IO completes. Transfer completion routine has to call
int cl_sync_io_wait(const struct lu_env *env, struct cl_sync_io *anchor,
long timeout)
{
- struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
- NULL, NULL, NULL);
- int rc;
+ int rc = 0;
ENTRY;
LASSERT(timeout >= 0);
- rc = l_wait_event(anchor->csi_waitq,
- atomic_read(&anchor->csi_sync_nr) == 0,
- &lwi);
- if (rc < 0) {
+ if (timeout > 0 &&
+ wait_event_idle_timeout(anchor->csi_waitq,
+ atomic_read(&anchor->csi_sync_nr) == 0,
+ cfs_time_seconds(timeout)) == 0) {
+ rc = -ETIMEDOUT;
CERROR("IO failed: %d, still wait for %d remaining entries\n",
rc, atomic_read(&anchor->csi_sync_nr));
+ }
- lwi = (struct l_wait_info) { 0 };
- (void)l_wait_event(anchor->csi_waitq,
- atomic_read(&anchor->csi_sync_nr) == 0,
- &lwi);
- } else {
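+ /*
+ * Wait for the remaining entries even after a timeout: the
+ * anchor is typically embedded in a caller-owned structure
+ * or stack frame and must stay valid until the last
+ * cl_sync_io_note() has run.
+ */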
+ wait_event_idle(anchor->csi_waitq,
+ atomic_read(&anchor->csi_sync_nr) == 0);
+ if (!rc)
rc = anchor->csi_sync_rc;
- }
+
/* We take the lock to ensure that cl_sync_io_note() has finished */
spin_lock(&anchor->csi_waitq.lock);
LASSERT(atomic_read(&anchor->csi_sync_nr) == 0);
LASSERT(atomic_read(&anchor->csi_sync_nr) > 0);
if (atomic_dec_and_lock(&anchor->csi_sync_nr,
&anchor->csi_waitq.lock)) {
+ cl_sync_io_end_t *end_io = anchor->csi_end_io;
+
/*
* Holding the lock across both the decrement and
* the wakeup ensures cl_sync_io_wait() doesn't return before
* this cl_sync_io_note() call, end_io included, completes.
*/
wake_up_all_locked(&anchor->csi_waitq);
- anchor->csi_end_io(env, anchor);
+ if (end_io)
+ end_io(env, anchor);
spin_unlock(&anchor->csi_waitq.lock);
/* Can't access anchor any more */