Whamcloud - gitweb
LU-3321 clio: add pages into writeback cache in batch
[fs/lustre-release.git] / lustre / lov / lov_io.c
index c163cfa..b6608d8 100644 (file)
@@ -556,14 +556,6 @@ static void lov_io_unlock(const struct lu_env *env,
         EXIT;
 }
 
-
-static struct cl_page_list *lov_io_submit_qin(struct lov_device *ld,
-                                              struct cl_page_list *qin,
-                                              int idx, int alloc)
-{
-        return alloc ? &qin[idx] : &ld->ld_emrg[idx]->emrg_page_list;
-}
-
 /**
  * lov implementation of cl_operations::cio_submit() method. It takes a list
  * of pages in \a queue, splits it into per-stripe sub-lists, invokes
@@ -580,32 +572,20 @@ static struct cl_page_list *lov_io_submit_qin(struct lov_device *ld,
  * lov_device::ld_mutex mutex.
  */
 static int lov_io_submit(const struct lu_env *env,
-                         const struct cl_io_slice *ios,
+                        const struct cl_io_slice *ios,
                         enum cl_req_type crt, struct cl_2queue *queue)
 {
-        struct lov_io          *lio = cl2lov_io(env, ios);
-        struct lov_object      *obj = lio->lis_object;
-        struct lov_device       *ld = lu2lov_dev(lov2cl(obj)->co_lu.lo_dev);
-        struct cl_page_list    *qin = &queue->c2_qin;
-        struct cl_2queue      *cl2q = &lov_env_info(env)->lti_cl2q;
-        struct cl_page_list *stripes_qin = NULL;
-        struct cl_page *page;
-        struct cl_page *tmp;
-        int stripe;
-
-#define QIN(stripe) lov_io_submit_qin(ld, stripes_qin, stripe, alloc)
+       struct cl_page_list     *qin = &queue->c2_qin;
+       struct lov_io           *lio = cl2lov_io(env, ios);
+       struct lov_io_sub       *sub;
+       struct cl_page_list     *plist = &lov_env_info(env)->lti_plist;
+       struct cl_page          *page;
+       int stripe;
+       int rc = 0;
+       ENTRY;
 
-        int rc = 0;
-        int alloc =
-#if defined(__KERNEL__) && defined(__linux__)
-                !(current->flags & PF_MEMALLOC);
-#else
-                1;
-#endif
-        ENTRY;
         if (lio->lis_active_subios == 1) {
                 int idx = lio->lis_single_subio_index;
-                struct lov_io_sub *sub;
 
                 LASSERT(idx < lio->lis_nr_subios);
                 sub = lov_sub_get(env, lio, idx);
@@ -618,120 +598,121 @@ static int lov_io_submit(const struct lu_env *env,
         }
 
         LASSERT(lio->lis_subs != NULL);
-        if (alloc) {
-                OBD_ALLOC_LARGE(stripes_qin,
-                                sizeof(*stripes_qin) * lio->lis_nr_subios);
-                if (stripes_qin == NULL)
-                        RETURN(-ENOMEM);
-
-                for (stripe = 0; stripe < lio->lis_nr_subios; stripe++)
-                        cl_page_list_init(&stripes_qin[stripe]);
-        } else {
-                /*
-                 * If we get here, it means pageout & swap doesn't help.
-                 * In order to not make things worse, even don't try to
-                 * allocate the memory with __GFP_NOWARN. -jay
-                 */
-               mutex_lock(&ld->ld_mutex);
-                lio->lis_mem_frozen = 1;
-        }
 
-        cl_2queue_init(cl2q);
-        cl_page_list_for_each_safe(page, tmp, qin) {
-                stripe = lov_page_stripe(page);
-                cl_page_list_move(QIN(stripe), qin, page);
-        }
+       cl_page_list_init(plist);
+       while (qin->pl_nr > 0) {
+               struct cl_2queue  *cl2q = &lov_env_info(env)->lti_cl2q;
 
-        for (stripe = 0; stripe < lio->lis_nr_subios; stripe++) {
-                struct lov_io_sub   *sub;
-                struct cl_page_list *sub_qin = QIN(stripe);
+               cl_2queue_init(cl2q);
 
-                if (cfs_list_empty(&sub_qin->pl_pages))
-                        continue;
+               page = cl_page_list_first(qin);
+               cl_page_list_move(&cl2q->c2_qin, qin, page);
 
-                cl_page_list_splice(sub_qin, &cl2q->c2_qin);
-                sub = lov_sub_get(env, lio, stripe);
-                if (!IS_ERR(sub)) {
+               stripe = lov_page_stripe(page);
+               while (qin->pl_nr > 0) {
+                       page = cl_page_list_first(qin);
+                       if (stripe != lov_page_stripe(page))
+                               break;
+
+                       cl_page_list_move(&cl2q->c2_qin, qin, page);
+               }
+
+               sub = lov_sub_get(env, lio, stripe);
+               if (!IS_ERR(sub)) {
                         rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
                                             crt, cl2q);
-                        lov_sub_put(sub);
-                } else
-                        rc = PTR_ERR(sub);
-                cl_page_list_splice(&cl2q->c2_qin,  &queue->c2_qin);
-                cl_page_list_splice(&cl2q->c2_qout, &queue->c2_qout);
-                if (rc != 0)
-                        break;
-        }
+                       lov_sub_put(sub);
+               } else {
+                       rc = PTR_ERR(sub);
+               }
 
-        for (stripe = 0; stripe < lio->lis_nr_subios; stripe++) {
-                struct cl_page_list *sub_qin = QIN(stripe);
+               cl_page_list_splice(&cl2q->c2_qin, plist);
+               cl_page_list_splice(&cl2q->c2_qout, &queue->c2_qout);
+               cl_2queue_fini(env, cl2q);
 
-                if (cfs_list_empty(&sub_qin->pl_pages))
-                        continue;
+               if (rc != 0)
+                       break;
+       }
 
-                cl_page_list_splice(sub_qin, qin);
-        }
+       cl_page_list_splice(plist, qin);
+       cl_page_list_fini(env, plist);
 
-        if (alloc) {
-                OBD_FREE_LARGE(stripes_qin,
-                         sizeof(*stripes_qin) * lio->lis_nr_subios);
-        } else {
-                int i;
+       RETURN(rc);
+}
 
-                for (i = 0; i < lio->lis_nr_subios; i++) {
-                        struct cl_io *cio = lio->lis_subs[i].sub_io;
+static int lov_io_commit_async(const struct lu_env *env,
+                              const struct cl_io_slice *ios,
+                              struct cl_page_list *queue, int from, int to,
+                              cl_commit_cbt cb)
+{
+       struct cl_page_list *plist = &lov_env_info(env)->lti_plist;
+       struct lov_io     *lio = cl2lov_io(env, ios);
+       struct lov_io_sub *sub;
+       struct cl_page *page;
+       int rc = 0;
+       ENTRY;
 
-                        if (cio && cio == &ld->ld_emrg[i]->emrg_subio)
-                                lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
-                }
-                lio->lis_mem_frozen = 0;
-               mutex_unlock(&ld->ld_mutex);
-        }
+       if (lio->lis_active_subios == 1) {
+               int idx = lio->lis_single_subio_index;
+
+               LASSERT(idx < lio->lis_nr_subios);
+               sub = lov_sub_get(env, lio, idx);
+               LASSERT(!IS_ERR(sub));
+               LASSERT(sub->sub_io == &lio->lis_single_subio);
+               rc = cl_io_commit_async(sub->sub_env, sub->sub_io, queue,
+                                       from, to, cb);
+               lov_sub_put(sub);
+               RETURN(rc);
+       }
 
-        RETURN(rc);
-#undef QIN
-}
+       LASSERT(lio->lis_subs != NULL);
 
-static int lov_io_prepare_write(const struct lu_env *env,
-                                const struct cl_io_slice *ios,
-                                const struct cl_page_slice *slice,
-                                unsigned from, unsigned to)
-{
-        struct lov_io     *lio      = cl2lov_io(env, ios);
-        struct cl_page    *sub_page = lov_sub_page(slice);
-        struct lov_io_sub *sub;
-        int result;
+       cl_page_list_init(plist);
+       while (queue->pl_nr > 0) {
+               int stripe_to = to;
+               int stripe;
 
-        ENTRY;
-        sub = lov_page_subio(env, lio, slice);
-        if (!IS_ERR(sub)) {
-                result = cl_io_prepare_write(sub->sub_env, sub->sub_io,
-                                             sub_page, from, to);
-                lov_sub_put(sub);
-        } else
-                result = PTR_ERR(sub);
-        RETURN(result);
-}
+               LASSERT(plist->pl_nr == 0);
+               page = cl_page_list_first(queue);
+               cl_page_list_move(plist, queue, page);
 
-static int lov_io_commit_write(const struct lu_env *env,
-                               const struct cl_io_slice *ios,
-                               const struct cl_page_slice *slice,
-                               unsigned from, unsigned to)
-{
-        struct lov_io     *lio      = cl2lov_io(env, ios);
-        struct cl_page    *sub_page = lov_sub_page(slice);
-        struct lov_io_sub *sub;
-        int result;
+               stripe = lov_page_stripe(page);
+               while (queue->pl_nr > 0) {
+                       page = cl_page_list_first(queue);
+                       if (stripe != lov_page_stripe(page))
+                               break;
 
-        ENTRY;
-        sub = lov_page_subio(env, lio, slice);
-        if (!IS_ERR(sub)) {
-                result = cl_io_commit_write(sub->sub_env, sub->sub_io,
-                                            sub_page, from, to);
-                lov_sub_put(sub);
-        } else
-                result = PTR_ERR(sub);
-        RETURN(result);
+                       cl_page_list_move(plist, queue, page);
+               }
+
+               if (queue->pl_nr > 0) /* still has more pages */
+                       stripe_to = PAGE_SIZE;
+
+               sub = lov_sub_get(env, lio, stripe);
+               if (!IS_ERR(sub)) {
+                       rc = cl_io_commit_async(sub->sub_env, sub->sub_io,
+                                               plist, from, stripe_to, cb);
+                       lov_sub_put(sub);
+               } else {
+                       rc = PTR_ERR(sub);
+                       break;
+               }
+
+               if (plist->pl_nr > 0) /* short write */
+                       break;
+
+               from = 0;
+       }
+
+       /* for error case, add the page back into the qin list */
+       LASSERT(ergo(rc == 0, plist->pl_nr == 0));
+       while (plist->pl_nr > 0) {
+               /* error occurred, add the uncommitted pages back into queue */
+               page = cl_page_list_last(plist);
+               cl_page_list_move_head(queue, plist, page);
+       }
+
+       RETURN(rc);
 }
 
 static int lov_io_fault_start(const struct lu_env *env,
@@ -819,20 +800,12 @@ static const struct cl_io_operations lov_io_ops = {
                        .cio_start     = lov_io_start,
                        .cio_end       = lov_io_fsync_end
                },
-                [CIT_MISC] = {
-                        .cio_fini   = lov_io_fini
-                }
-        },
-        .req_op = {
-                 [CRT_READ] = {
-                         .cio_submit    = lov_io_submit
-                 },
-                 [CRT_WRITE] = {
-                         .cio_submit    = lov_io_submit
-                 }
-         },
-        .cio_prepare_write = lov_io_prepare_write,
-        .cio_commit_write  = lov_io_commit_write
+               [CIT_MISC] = {
+                       .cio_fini      = lov_io_fini
+               }
+       },
+       .cio_submit                    = lov_io_submit,
+       .cio_commit_async              = lov_io_commit_async,
 };
 
 /*****************************************************************************
@@ -845,11 +818,11 @@ static void lov_empty_io_fini(const struct lu_env *env,
                               const struct cl_io_slice *ios)
 {
        struct lov_object *lov = cl2lov(ios->cis_obj);
-        ENTRY;
+       ENTRY;
 
        if (cfs_atomic_dec_and_test(&lov->lo_active_ios))
                wake_up_all(&lov->lo_waitq);
-        EXIT;
+       EXIT;
 }
 
 static void lov_empty_impossible(const struct lu_env *env,
@@ -896,21 +869,14 @@ static const struct cl_io_operations lov_empty_io_ops = {
                         .cio_end       = LOV_EMPTY_IMPOSSIBLE
                 },
                [CIT_FSYNC] = {
-                       .cio_fini   = lov_empty_io_fini
+                       .cio_fini      = lov_empty_io_fini
                },
-                [CIT_MISC] = {
-                        .cio_fini   = lov_empty_io_fini
-                }
-        },
-        .req_op = {
-                 [CRT_READ] = {
-                         .cio_submit    = LOV_EMPTY_IMPOSSIBLE
-                 },
-                 [CRT_WRITE] = {
-                         .cio_submit    = LOV_EMPTY_IMPOSSIBLE
-                 }
-         },
-        .cio_commit_write = LOV_EMPTY_IMPOSSIBLE
+               [CIT_MISC] = {
+                       .cio_fini      = lov_empty_io_fini
+               }
+       },
+       .cio_submit                    = LOV_EMPTY_IMPOSSIBLE,
+       .cio_commit_async              = LOV_EMPTY_IMPOSSIBLE
 };
 
 int lov_io_init_raid0(const struct lu_env *env, struct cl_object *obj,