Whamcloud - gitweb
LU-523: No prepare_write for lockless IO
authorJinshan Xiong <jay@whamcloud.com>
Sat, 23 Jul 2011 03:52:46 +0000 (20:52 -0700)
committerOleg Drokin <green@whamcloud.com>
Tue, 26 Jul 2011 02:34:31 +0000 (22:34 -0400)
For page unaligned write, CLIO does prepare_write even for lockless IO
and then write full page back, this will cause data corruption since
data is not covered by lock.

In this patch, we don't do prepare_write for lockless IO, and then submit
exact bytes in the page to OST.

Change-Id: I4aa5afeb82cb717de499c8a8c004078b279302c7
Signed-off-by: Jinshan Xiong <jay@whamcloud.com>
Reviewed-on: http://review.whamcloud.com/1130
Tested-by: Hudson
Reviewed-by: Oleg Drokin <green@whamcloud.com>
Reviewed-by: Niu Yawei <niu@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
lustre/obdclass/cl_io.c
lustre/osc/osc_internal.h
lustre/osc/osc_io.c
lustre/osc/osc_page.c
lustre/osc/osc_request.c

index 3b9b876..d5d52d9 100644 (file)
@@ -1547,6 +1547,9 @@ void cl_req_page_add(const struct lu_env *env,
         LASSERT(cfs_list_empty(&page->cp_flight));
         LASSERT(page->cp_req == NULL);
 
+        CL_PAGE_DEBUG(D_PAGE, env, page, "req %p, %d, %u\n",
+                      req, req->crq_type, req->crq_nrpages);
+
         cfs_list_add_tail(&page->cp_flight, &req->crq_pages);
         ++req->crq_nrpages;
         page->cp_req = req;
index 305ddd3..2a94ebb 100644 (file)
@@ -159,9 +159,8 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli);
 
 int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
                        struct lov_stripe_md *lsm, struct lov_oinfo *loi,
-                       struct osc_async_page *oap, int cmd, obd_off off,
-                       int count, obd_flag brw_flags,
-                       enum async_flags async_flags);
+                       struct osc_async_page *oap, int cmd, int off,
+                       int count,  obd_flag brw_flags, enum async_flags async_flags);
 int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
                             struct lov_oinfo *loi, struct osc_async_page *oap);
 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *cfg);
index 9e77fc6..4cae4a2 100644 (file)
@@ -299,14 +299,24 @@ static int osc_io_prepare_write(const struct lu_env *env,
 {
         struct osc_device *dev = lu2osc_dev(slice->cpl_obj->co_lu.lo_dev);
         struct obd_import *imp = class_exp2cliimp(dev->od_exp);
-
+        struct osc_io     *oio = cl2osc_io(env, ios);
+        int result = 0;
         ENTRY;
 
         /*
          * This implements OBD_BRW_CHECK logic from old client.
          */
 
-        RETURN(imp == NULL || imp->imp_invalid ? -EIO : 0);
+        if (imp == NULL || imp->imp_invalid)
+                result = -EIO;
+        if (result == 0 && oio->oi_lockless)
+                /* this page contains `invalid' data, but who cares?
+                 * nobody can access the invalid data.
+                 * in osc_io_commit_write(), we're going to write exact
+                 * [from, to) bytes of this page to OST. -jay */
+                cl_page_export(env, slice->cpl_page, 1);
+
+        RETURN(result);
 }
 
 static int osc_io_commit_write(const struct lu_env *env,
@@ -314,6 +324,7 @@ static int osc_io_commit_write(const struct lu_env *env,
                                const struct cl_page_slice *slice,
                                unsigned from, unsigned to)
 {
+        struct osc_io         *oio = cl2osc_io(env, ios);
         struct osc_page       *opg = cl2osc_page(slice);
         struct osc_object     *obj = cl2osc(opg->ops_cl.cpl_obj);
         struct osc_async_page *oap = &opg->ops_oap;
@@ -331,6 +342,10 @@ static int osc_io_commit_write(const struct lu_env *env,
             cfs_capable(CFS_CAP_SYS_RESOURCE))
                 oap->oap_brw_flags |= OBD_BRW_NOQUOTA;
 
+        if (oio->oi_lockless)
+                /* see osc_io_prepare_write() for lockless io handling. */
+                cl_page_clip(env, slice->cpl_page, from, to);
+
         RETURN(0);
 }
 
index d7f7772..d5e24e2 100644 (file)
@@ -232,7 +232,8 @@ static int osc_page_cache_add(const struct lu_env *env,
         osc_page_transfer_get(opg, "transfer\0cache");
         result = osc_queue_async_io(env, osc_export(obj), NULL, obj->oo_oinfo,
                                     &opg->ops_oap, OBD_BRW_WRITE | noquota,
-                                    0, 0, brw_flags, 0);
+                                    opg->ops_from, opg->ops_to - opg->ops_from,
+                                    brw_flags, 0);
         if (result != 0)
                 osc_page_transfer_put(env, opg);
         else
@@ -365,7 +366,7 @@ static int osc_page_print(const struct lu_env *env,
 
         return (*printer)(env, cookie, LUSTRE_OSC_NAME"-page@%p: "
                           "1< %#x %d %u %s %s %s > "
-                          "2< "LPU64" %u %#x %#x | %p %p %p %p %p > "
+                          "2< "LPU64" %u %u %#x %#x | %p %p %p %p %p > "
                           "3< %s %p %d %lu %d > "
                           "4< %d %d %d %lu %s | %s %s %s %s > "
                           "5< %s %s %s %s | %d %s %s | %d %s %s>\n",
@@ -377,7 +378,7 @@ static int osc_page_print(const struct lu_env *env,
                           osc_list(&oap->oap_urgent_item),
                           osc_list(&oap->oap_rpc_item),
                           /* 2 */
-                          oap->oap_obj_off, oap->oap_page_off,
+                          oap->oap_obj_off, oap->oap_page_off, oap->oap_count,
                           oap->oap_async_flags, oap->oap_brw_flags,
                           oap->oap_request,
                           oap->oap_cli, oap->oap_loi, oap->oap_caller_ops,
index a7cc433..b4abf57 100644 (file)
@@ -2436,7 +2436,8 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
         CFS_LIST_HEAD(rpc_list);
         CFS_LIST_HEAD(tmp_list);
         unsigned int ending_offset;
-        unsigned  starting_offset = 0;
+        obd_off starting_offset = OBD_OBJECT_EOF;
+        int starting_page_off = 0;
         int srvlock = 0, mem_tight = 0;
         struct cl_object *clob = NULL;
         ENTRY;
@@ -2482,7 +2483,13 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
                 /* If there is a gap at the start of this page, it can't merge
                  * with any previous page, so we'll hand the network a
                  * "fragmented" page array that it can't transfer in 1 RDMA */
-                if (page_count != 0 && oap->oap_page_off != 0)
+                if (oap->oap_obj_off < starting_offset) {
+                        if (starting_page_off != 0)
+                                break;
+
+                        starting_page_off = oap->oap_page_off;
+                        starting_offset = oap->oap_obj_off + starting_page_off;
+                } else if (oap->oap_page_off != 0)
                         break;
 
                 /* in llite being 'ready' equates to the page being locked
@@ -2560,10 +2567,6 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
                 lop_update_pending(cli, lop, cmd, -1);
                 cfs_list_del_init(&oap->oap_urgent_item);
 
-                if (page_count == 0)
-                        starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
-                                          (PTLRPC_MAX_BRW_SIZE - 1);
-
                 /* ask the caller for the size of the io as the rpc leaves. */
                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
                         oap->oap_count =
@@ -2628,6 +2631,7 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
 
         aa = ptlrpc_req_async_args(req);
 
+        starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
         if (cmd == OBD_BRW_READ) {
                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
@@ -2968,9 +2972,8 @@ int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
 
 int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
                        struct lov_stripe_md *lsm, struct lov_oinfo *loi,
-                       struct osc_async_page *oap, int cmd, obd_off off,
-                       int count, obd_flag brw_flags,
-                       enum async_flags async_flags)
+                       struct osc_async_page *oap, int cmd, int off,
+                       int count, obd_flag brw_flags, enum async_flags async_flags)
 {
         struct client_obd *cli = &exp->exp_obd->u.cli;
         int rc = 0;