b=21454 Disable cl_page_in_io check for append write.
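
With an O_APPEND write the file offset at which the pages will land is not
known until the IO is actually performed, so cl_page_in_io() cannot apply its
[pos, pos + count) overlap check to such writes; the check is now skipped for
append writes and the page is assumed to belong to the IO. The patch also
threads an enum cl_req_priority argument through cl_io_submit_rw() and adds a
synchronous helper, cl_io_submit_sync(). Below is a caller-side sketch
(illustrative only, not part of the patch) of how the new helper might be used
to read a single page synchronously; the cl_2queue helpers and io->ci_queue
are assumed to be available as in the rest of the cl_io API:

/* Illustrative caller of the new cl_io_submit_sync() helper. */
static int example_sync_read_page(const struct lu_env *env,
                                  struct cl_io *io, struct cl_page *page)
{
        struct cl_2queue *queue = &io->ci_queue;
        int rc;

        cl_2queue_init_page(queue, page);
        /* Normal priority; timeout == 0 waits unconditionally. */
        rc = cl_io_submit_sync(env, io, CRT_READ, queue, CRP_NORMAL, 0);
        /* Drop ownership of both the queued and the completed lists. */
        cl_2queue_disown(env, io, queue);
        cl_2queue_fini(env, queue);
        return rc;
}
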
diff --git a/lustre/obdclass/cl_io.c b/lustre/obdclass/cl_io.c
index 62357e7..5a6b8b0 100644
--- a/lustre/obdclass/cl_io.c
+++ b/lustre/obdclass/cl_io.c
@@ -75,15 +75,6 @@ static inline int cl_io_is_loopable(const struct cl_io *io)
 }
 
 /**
- * True, iff \a io is a sendfile().
- */
-int cl_io_is_sendfile(const struct cl_io *io)
-{
-        return io->ci_type == CIT_READ && io->u.ci_rd.rd_is_sendfile;
-}
-EXPORT_SYMBOL(cl_io_is_sendfile);
-
-/**
  * Returns true iff there is an IO ongoing in the given environment.
  */
 int cl_io_is_going(const struct lu_env *env)
@@ -227,7 +218,7 @@ int cl_io_rw_init(const struct lu_env *env, struct cl_io *io,
         ENTRY;
 
         LU_OBJECT_HEADER(D_VFSTRACE, env, &io->ci_obj->co_lu,
-                         "io range: %i [%llu, %llu) %i %i\n",
+                         "io range: %u ["LPU64", "LPU64") %u %u\n",
                          iot, (__u64)pos, (__u64)pos + count,
                          io->u.ci_rw.crw_nonblock, io->u.ci_wr.wr_append);
         io->u.ci_rw.crw_pos    = pos;
@@ -332,12 +323,11 @@ static int cl_lockset_lock_one(const struct lu_env *env,
 
         ENTRY;
 
-        lock = cl_lock_request(env, io, &link->cill_descr, link->cill_enq_flags,
-                               "io", io);
+        lock = cl_lock_request(env, io, &link->cill_descr, "io", io);
         if (!IS_ERR(lock)) {
                 link->cill_lock = lock;
                 list_move(&link->cill_linkage, &set->cls_curr);
-                if (!(link->cill_enq_flags & CEF_ASYNC)) {
+                if (!(link->cill_descr.cld_enq_flags & CEF_ASYNC)) {
                         result = cl_wait(env, lock);
                         if (result == 0)
                                 list_move(&link->cill_linkage, &set->cls_done);
@@ -466,7 +456,7 @@ void cl_io_unlock(const struct lu_env *env, struct cl_io *io)
                         scan->cis_iop->op[io->ci_type].cio_unlock(env, scan);
         }
         io->ci_state = CIS_UNLOCKED;
-        LASSERT(cl_env_info(env)->clt_nr_locks_acquired == 0);
+        LASSERT(!cl_env_info(env)->clt_counters[CNL_TOP].ctc_nr_locks_acquired);
         EXIT;
 }
 EXPORT_SYMBOL(cl_io_unlock);
@@ -590,8 +580,8 @@ int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
         ENTRY;
         OBD_ALLOC_PTR(link);
         if (link != NULL) {
-                link->cill_descr = *descr;
-                link->cill_fini = cl_free_io_lock_link;
+                link->cill_descr     = *descr;
+                link->cill_fini      = cl_free_io_lock_link;
                 result = cl_io_lock_add(env, io, link);
                 if (result) /* lock match */
                         link->cill_fini(env, link);
@@ -667,7 +657,7 @@ cl_io_slice_page(const struct cl_io_slice *ios, struct cl_page *page)
  */
 static int cl_page_in_io(const struct cl_page *page, const struct cl_io *io)
 {
-        int     result;
+        int     result = 1;
         loff_t  start;
         loff_t  end;
         pgoff_t idx;
@@ -680,10 +670,13 @@ static int cl_page_in_io(const struct cl_page *page, const struct cl_io *io)
                  * check that [start, end) and [pos, pos + count) extents
                  * overlap.
                  */
-                start = cl_offset(page->cp_obj, idx);
-                end   = cl_offset(page->cp_obj, idx + 1);
-                result = io->u.ci_rw.crw_pos < end &&
-                        start < io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
+                if (!cl_io_is_append(io)) {
+                        const struct cl_io_rw_common *crw = &(io->u.ci_rw);
+                        start = cl_offset(page->cp_obj, idx);
+                        end   = cl_offset(page->cp_obj, idx + 1);
+                        result = crw->crw_pos < end &&
+                                 start < crw->crw_pos + crw->crw_count;
+                }
                 break;
         case CIT_FAULT:
                 result = io->u.ci_fault.ft_index == idx;
@@ -738,7 +731,7 @@ int cl_io_read_page(const struct lu_env *env, struct cl_io *io,
                 }
         }
         if (result == 0)
-                result = cl_io_submit_rw(env, io, CRT_READ, queue);
+                result = cl_io_submit_rw(env, io, CRT_READ, queue, CRP_NORMAL);
         /*
          * Unlock unsent pages in case of error.
          */
@@ -834,7 +827,8 @@ EXPORT_SYMBOL(cl_io_commit_write);
  * \see cl_io_operations::cio_submit()
  */
 int cl_io_submit_rw(const struct lu_env *env, struct cl_io *io,
-                    enum cl_req_type crt, struct cl_2queue *queue)
+                    enum cl_req_type crt, struct cl_2queue *queue,
+                    enum cl_req_priority priority)
 {
         const struct cl_io_slice *scan;
         int result = 0;
@@ -846,7 +840,7 @@ int cl_io_submit_rw(const struct lu_env *env, struct cl_io *io,
                 if (scan->cis_iop->req_op[crt].cio_submit == NULL)
                         continue;
                 result = scan->cis_iop->req_op[crt].cio_submit(env, scan, crt,
-                                                               queue);
+                                                               queue, priority);
                 if (result != 0)
                         break;
         }
@@ -859,6 +853,51 @@ int cl_io_submit_rw(const struct lu_env *env, struct cl_io *io,
 EXPORT_SYMBOL(cl_io_submit_rw);
 
 /**
+ * Submit a sync_io and wait for the IO to finish, or until an error occurs.
+ * If \a timeout is zero, wait for the IO unconditionally.
+ */
+int cl_io_submit_sync(const struct lu_env *env, struct cl_io *io,
+                      enum cl_req_type iot, struct cl_2queue *queue,
+                      enum cl_req_priority prio, long timeout)
+{
+        struct cl_sync_io *anchor = &cl_env_info(env)->clt_anchor;
+        struct cl_page *pg;
+        int rc;
+
+        LASSERT(prio == CRP_NORMAL || prio == CRP_CANCEL);
+
+        cl_page_list_for_each(pg, &queue->c2_qin) {
+                LASSERT(pg->cp_sync_io == NULL);
+                pg->cp_sync_io = anchor;
+        }
+
+        cl_sync_io_init(anchor, queue->c2_qin.pl_nr);
+        rc = cl_io_submit_rw(env, io, iot, queue, prio);
+        if (rc == 0) {
+                /*
+                 * If some pages weren't sent for any reason (e.g.,
+                 * read found up-to-date pages in the cache, or write found
+                 * clean pages), count them as completed to avoid infinite
+                 * wait.
+                 */
+                 cl_page_list_for_each(pg, &queue->c2_qin) {
+                        pg->cp_sync_io = NULL;
+                        cl_sync_io_note(anchor, +1);
+                 }
+
+                 /* wait for the IO to be finished. */
+                 rc = cl_sync_io_wait(env, io, &queue->c2_qout,
+                                      anchor, timeout);
+        } else {
+                LASSERT(list_empty(&queue->c2_qout.pl_pages));
+                cl_page_list_for_each(pg, &queue->c2_qin)
+                        pg->cp_sync_io = NULL;
+        }
+        return rc;
+}
+EXPORT_SYMBOL(cl_io_submit_sync);
+
+/**
  * Cancel an IO which has been submitted by cl_io_submit_rw.
  */
 int cl_io_cancel(const struct lu_env *env, struct cl_io *io,
@@ -1127,6 +1166,7 @@ int cl_page_list_own(const struct lu_env *env,
 {
         struct cl_page *page;
         struct cl_page *temp;
+        pgoff_t index = 0;
         int result;
 
         LINVRNT(plist->pl_owner == cfs_current());
@@ -1134,8 +1174,10 @@ int cl_page_list_own(const struct lu_env *env,
         ENTRY;
         result = 0;
         cl_page_list_for_each_safe(page, temp, plist) {
+                LASSERT(index <= page->cp_index);
+                index = page->cp_index;
                 if (cl_page_own(env, io, page) == 0)
-                result = result ?: page->cp_error;
+                        result = result ?: page->cp_error;
                 else
                         cl_page_list_del(env, plist, page);
         }
@@ -1568,7 +1610,7 @@ EXPORT_SYMBOL(cl_req_attr_set);
 void cl_sync_io_init(struct cl_sync_io *anchor, int nrpages)
 {
         ENTRY;
-        init_completion(&anchor->csi_sync_completion);
+        cfs_waitq_init(&anchor->csi_waitq);
         atomic_set(&anchor->csi_sync_nr, nrpages);
         anchor->csi_sync_rc  = 0;
         EXIT;
@@ -1580,24 +1622,33 @@ EXPORT_SYMBOL(cl_sync_io_init);
  * cl_sync_io_note() for every page.
  */
 int cl_sync_io_wait(const struct lu_env *env, struct cl_io *io,
-                    struct cl_page_list *queue, struct cl_sync_io *anchor)
+                    struct cl_page_list *queue, struct cl_sync_io *anchor,
+                    long timeout)
 {
+        struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
+                                                  NULL, NULL, NULL);
         int rc;
         ENTRY;
 
-        rc = wait_for_completion_interruptible(&anchor->csi_sync_completion);
+        LASSERT(timeout >= 0);
+
+        rc = l_wait_event(anchor->csi_waitq,
+                          atomic_read(&anchor->csi_sync_nr) == 0,
+                          &lwi);
         if (rc < 0) {
-                int rc2;
-                rc2 = cl_io_cancel(env, io, queue);
-                if (rc2 < 0) {
-                        /* Too bad, some pages are still in IO. */
-                        CDEBUG(D_VFSTRACE, "Failed to cancel transfer (%i). "
-                               "Waiting for %i pages\n",
-                               rc2, atomic_read(&anchor->csi_sync_nr));
-                        wait_for_completion(&anchor->csi_sync_completion);
-                }
-        } else
+                CERROR("SYNC IO failed with error: %d, try to cancel "
+                       "%d remaining pages\n",
+                       rc, atomic_read(&anchor->csi_sync_nr));
+
+                (void)cl_io_cancel(env, io, queue);
+
+                lwi = (struct l_wait_info) { 0 };
+                (void)l_wait_event(anchor->csi_waitq,
+                                   atomic_read(&anchor->csi_sync_nr) == 0,
+                                   &lwi);
+        } else {
                 rc = anchor->csi_sync_rc;
+        }
         LASSERT(atomic_read(&anchor->csi_sync_nr) == 0);
         cl_page_list_assume(env, io, queue);
         POISON(anchor, 0x5a, sizeof *anchor);
@@ -1618,8 +1669,9 @@ void cl_sync_io_note(struct cl_sync_io *anchor, int ioret)
          * ->{prepare,commit}_write(). Completion is used to signal the end of
          * IO.
          */
+        LASSERT(atomic_read(&anchor->csi_sync_nr) > 0);
         if (atomic_dec_and_test(&anchor->csi_sync_nr))
-                complete(&anchor->csi_sync_completion);
+                cfs_waitq_broadcast(&anchor->csi_waitq);
         EXIT;
 }
 EXPORT_SYMBOL(cl_sync_io_note);
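
For context, the completion side of the new cl_sync_io protocol is not
visible in this hunk: each page that was handed an anchor via cp_sync_io is
expected to report its result exactly once when its transfer completes,
roughly as in the sketch below (illustrative only; the helper name is
invented, the field and function names are those used by the patch):

/* Hypothetical per-page completion hook handing the result to the waiter. */
static void example_page_completed(struct cl_page *pg, int ioret)
{
        struct cl_sync_io *anchor = pg->cp_sync_io;

        if (anchor != NULL) {
                pg->cp_sync_io = NULL;
                /* Wakes cl_sync_io_wait() once the last page is noted. */
                cl_sync_io_note(anchor, ioret);
        }
}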