Whamcloud - gitweb
LU-1757 brw: add short io osc/ost transfer. 67/27767/12
authorPatrick Farrell <paf@cray.com>
Mon, 16 Oct 2017 10:22:22 +0000 (05:22 -0500)
committerOleg Drokin <oleg.drokin@intel.com>
Thu, 9 Nov 2017 20:06:50 +0000 (20:06 +0000)
There's no need to do target bulk i/o for a small amount of
data, since the bulk transfer requires extra network operations.

For this case we add short i/o.  When the i/o size is less
than or equal to some number of pages (default 3), we
encapsulate the data in the ptlrpc request.

With this patch, 4k direct i/o read latency on a Cray Aries
network (data is on flash on another node on the Aries)
drops from ~280 microseconds to ~200 microseconds.  Write
latency drops from ~370 microseconds to ~350 microseconds
(much more of write latency is waiting for write commit).

This translates to about a 25-30% performance improvement
on 4k direct i/o reads and 4k random reads.  (Write
performance improvement was small to non-existent.)

Improvement was similar with 8k i/o.

Buffered sequential i/o sees no improvement, because it
does not perform small i/os.

Performance data:
        access             = file-per-process
        pattern            = segmented (1 segment)
        ordering in a file = random offsets
        ordering inter file= no tasks offsets
        xfersize           = 4096 bytes
        blocksize          = 100 MiB

nprocs  xfsize  shortio dio     random  Read (MB/s)
1       4k      no      yes     no      15.0
8       4k      no      yes     no      73.4
16      4k      no      yes     no      81.1
1       4k      yes     yes     no      16.5
8       4k      yes     yes     no      95.2
16      4k      yes     yes     no      107.3
1       4k      no      no      yes     15.5
8       4k      no      no      yes     73.4
16      4k      no      no      yes     81.2
1       4k      yes     no      yes     16.8
8       4k      yes     no      yes     95.0
16      4k      yes     no      yes     106.5

Note even when individual i/o performance is not improved,
this change reduces the # of network operations required
for small i/o, which can help on large systems.

Signed-off-by: Patrick Farrell <paf@cray.com>
Change-Id: I70050935eaa0a5e98ca437e18e730be4aa0e4700
Reviewed-on: https://review.whamcloud.com/27767
Tested-by: Jenkins
Reviewed-by: Alexey Lyashkov <c17817@cray.com>
Reviewed-by: Alexandr Boyko <c17825@cray.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
14 files changed:
lustre/include/lprocfs_status.h
lustre/include/lustre_export.h
lustre/include/lustre_net.h
lustre/include/lustre_osc.h
lustre/include/lustre_req_layout.h
lustre/include/obd.h
lustre/ldlm/ldlm_lib.c
lustre/llite/llite_lib.c
lustre/obdclass/lprocfs_status.c
lustre/osc/lproc_osc.c
lustre/osc/osc_page.c
lustre/osc/osc_request.c
lustre/ptlrpc/layout.c
lustre/target/tgt_handler.c

index 8656a08..2afd68a 100644 (file)
@@ -776,6 +776,10 @@ int lprocfs_obd_max_pages_per_rpc_seq_show(struct seq_file *m, void *data);
 ssize_t lprocfs_obd_max_pages_per_rpc_seq_write(struct file *file,
                                                const char __user *buffer,
                                                size_t count, loff_t *off);
 ssize_t lprocfs_obd_max_pages_per_rpc_seq_write(struct file *file,
                                                const char __user *buffer,
                                                size_t count, loff_t *off);
+int lprocfs_obd_short_io_bytes_seq_show(struct seq_file *m, void *data);
+ssize_t lprocfs_obd_short_io_bytes_seq_write(struct file *file,
+                                            const char __user *buffer,
+                                            size_t count, loff_t *off);
 
 struct root_squash_info;
 int lprocfs_wr_root_squash(const char __user *buffer, unsigned long count,
 
 struct root_squash_info;
 int lprocfs_wr_root_squash(const char __user *buffer, unsigned long count,
index c9ae7ea..ed511c8 100644 (file)
@@ -409,6 +409,13 @@ static inline bool imp_connect_disp_stripe(struct obd_import *imp)
        return ocd->ocd_connect_flags & OBD_CONNECT_DISP_STRIPE;
 }
 
        return ocd->ocd_connect_flags & OBD_CONNECT_DISP_STRIPE;
 }
 
+/* Report whether OBD_CONNECT_SHORTIO is set in the connect flags kept in
+ * the connect data of import @imp. */
+static inline bool imp_connect_shortio(struct obd_import *imp)
+{
+       return (imp->imp_connect_data.ocd_connect_flags &
+               OBD_CONNECT_SHORTIO) != 0;
+}
+
 static inline __u64 exp_connect_ibits(struct obd_export *exp)
 {
        struct obd_connect_data *ocd;
 static inline __u64 exp_connect_ibits(struct obd_export *exp)
 {
        struct obd_connect_data *ocd;
index b10ac91..08207fb 100644 (file)
  * - single object with 16 pages is 512 bytes
  * - OST_IO_MAXREQSIZE must be at least 1 page of cookies plus some spillover
  * - Must be a multiple of 1024
  * - single object with 16 pages is 512 bytes
  * - OST_IO_MAXREQSIZE must be at least 1 page of cookies plus some spillover
  * - Must be a multiple of 1024
- * - actual size is about 18K
  */
  */
-#define _OST_MAXREQSIZE_SUM (sizeof(struct lustre_msg) + \
-                            sizeof(struct ptlrpc_body) + \
-                            sizeof(struct obdo) + \
-                            sizeof(struct obd_ioobj) + \
-                            sizeof(struct niobuf_remote) * DT_MAX_BRW_PAGES)
+#define _OST_MAXREQSIZE_BASE (sizeof(struct lustre_msg) + \
+                             sizeof(struct ptlrpc_body) + \
+                             sizeof(struct obdo) + \
+                             sizeof(struct obd_ioobj) + \
+                             sizeof(struct niobuf_remote))
+#define _OST_MAXREQSIZE_SUM (_OST_MAXREQSIZE_BASE + \
+                            sizeof(struct niobuf_remote) * \
+                            (DT_MAX_BRW_PAGES - 1))
 /**
  * FIEMAP request can be 4K+ for now
  */
 #define OST_MAXREQSIZE         (16 * 1024)
 #define OST_IO_MAXREQSIZE      max_t(int, OST_MAXREQSIZE, \
                                (((_OST_MAXREQSIZE_SUM - 1) | (1024 - 1)) + 1))
 /**
  * FIEMAP request can be 4K+ for now
  */
 #define OST_MAXREQSIZE         (16 * 1024)
 #define OST_IO_MAXREQSIZE      max_t(int, OST_MAXREQSIZE, \
                                (((_OST_MAXREQSIZE_SUM - 1) | (1024 - 1)) + 1))
+/* Safe estimate of free space in standard RPC, provides upper limit for # of
+ * bytes of i/o to pack in RPC (skipping bulk transfer). */
+#define OST_SHORT_IO_SPACE     (OST_IO_MAXREQSIZE - _OST_MAXREQSIZE_BASE)
+
+/* Actual size used for short i/o buffer.  Calculation means this:
+ * The larger of one page (for large PAGE_SIZE) or 16 KiB, but not more
+ * than the available space rounded down to a page boundary. */
+#define OBD_MAX_SHORT_IO_BYTES (min(max(PAGE_SIZE, 16UL * 1024UL), \
+                                   OST_SHORT_IO_SPACE & PAGE_MASK))
 
 #define OST_MAXREPSIZE         (9 * 1024)
 #define OST_IO_MAXREPSIZE      OST_MAXREPSIZE
 
 #define OST_MAXREPSIZE         (9 * 1024)
 #define OST_IO_MAXREPSIZE      OST_MAXREPSIZE
  */
 #define OST_IO_BUFSIZE         max_t(int, OST_IO_MAXREQSIZE + 1024, 64 * 1024)
 
  */
 #define OST_IO_BUFSIZE         max_t(int, OST_IO_MAXREQSIZE + 1024, 64 * 1024)
 
+
 /* Macro to hide a typecast. */
 #define ptlrpc_req_async_args(req) ((void *)&req->rq_async_args)
 
 /* Macro to hide a typecast. */
 #define ptlrpc_req_async_args(req) ((void *)&req->rq_async_args)
 
index 124300e..df36985 100644 (file)
@@ -445,6 +445,18 @@ struct osc_page {
        cfs_time_t              ops_submit_time;
 };
 
        cfs_time_t              ops_submit_time;
 };
 
+struct osc_brw_async_args { /* BRW state overlaid on req->rq_async_args */
+       struct obdo             *aa_oa; /* obdo whose o_valid is checked on completion */
+       int                      aa_requested_nob; /* bytes requested; a smaller read result is a short read */
+       int                      aa_nio_count; /* presumably the niobuf count - TODO confirm */
+       u32                      aa_page_count; /* number of entries in aa_ppga */
+       int                      aa_resends; /* presumably a resend counter - TODO confirm */
+       struct brw_page         **aa_ppga; /* pages of this request, released in brw_interpret() */
+       struct client_obd       *aa_cli; /* client_obd used by brw_interpret() */
+       struct list_head         aa_oaps; /* asserted empty near the end of brw_interpret() */
+       struct list_head         aa_exts; /* asserted empty near the end of brw_interpret() */
+};
+
 extern struct kmem_cache *osc_lock_kmem;
 extern struct kmem_cache *osc_object_kmem;
 extern struct kmem_cache *osc_thread_kmem;
 extern struct kmem_cache *osc_lock_kmem;
 extern struct kmem_cache *osc_object_kmem;
 extern struct kmem_cache *osc_thread_kmem;
index 46e6fa8..d2f3c52 100644 (file)
@@ -317,6 +317,7 @@ extern struct req_msg_field RMF_RCS;
 extern struct req_msg_field RMF_FIEMAP_KEY;
 extern struct req_msg_field RMF_FIEMAP_VAL;
 extern struct req_msg_field RMF_OST_ID;
 extern struct req_msg_field RMF_FIEMAP_KEY;
 extern struct req_msg_field RMF_FIEMAP_VAL;
 extern struct req_msg_field RMF_OST_ID;
+extern struct req_msg_field RMF_SHORT_IO;
 
 /* MGS config read message format */
 extern struct req_msg_field RMF_MGS_CONFIG_BODY;
 
 /* MGS config read message format */
 extern struct req_msg_field RMF_MGS_CONFIG_BODY;
index be2bec4..a19500a 100644 (file)
@@ -246,6 +246,7 @@ struct client_obd {
        atomic_t                cl_pending_r_pages;
        __u32                   cl_max_pages_per_rpc;
        __u32                   cl_max_rpcs_in_flight;
        atomic_t                cl_pending_r_pages;
        __u32                   cl_max_pages_per_rpc;
        __u32                   cl_max_rpcs_in_flight;
+       __u32                   cl_short_io_bytes;
        struct obd_histogram    cl_read_rpc_hist;
        struct obd_histogram    cl_write_rpc_hist;
        struct obd_histogram    cl_read_page_hist;
        struct obd_histogram    cl_read_rpc_hist;
        struct obd_histogram    cl_write_rpc_hist;
        struct obd_histogram    cl_read_page_hist;
index 0b0d778..c4865f9 100644 (file)
@@ -409,6 +409,8 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
         * from OFD after connecting. */
        cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES;
 
         * from OFD after connecting. */
        cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES;
 
+       cli->cl_short_io_bytes = OBD_MAX_SHORT_IO_BYTES;
+
        /* set cl_chunkbits default value to PAGE_SHIFT,
         * it will be updated at OSC connection time. */
        cli->cl_chunkbits = PAGE_SHIFT;
        /* set cl_chunkbits default value to PAGE_SHIFT,
         * it will be updated at OSC connection time. */
        cli->cl_chunkbits = PAGE_SHIFT;
index da145a4..6071d49 100644 (file)
@@ -399,7 +399,7 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
                                  OBD_CONNECT_JOBSTATS | OBD_CONNECT_LVB_TYPE |
                                  OBD_CONNECT_LAYOUTLOCK |
                                  OBD_CONNECT_PINGLESS | OBD_CONNECT_LFSCK |
                                  OBD_CONNECT_JOBSTATS | OBD_CONNECT_LVB_TYPE |
                                  OBD_CONNECT_LAYOUTLOCK |
                                  OBD_CONNECT_PINGLESS | OBD_CONNECT_LFSCK |
-                                 OBD_CONNECT_BULK_MBITS |
+                                 OBD_CONNECT_BULK_MBITS | OBD_CONNECT_SHORTIO |
                                  OBD_CONNECT_FLAGS2;
 
 /* The client currently advertises support for OBD_CONNECT_LOCKAHEAD_OLD so it
                                  OBD_CONNECT_FLAGS2;
 
 /* The client currently advertises support for OBD_CONNECT_LOCKAHEAD_OLD so it
index 37cb29d..d2505ef 100644 (file)
@@ -2432,6 +2432,56 @@ ssize_t lprocfs_obd_max_pages_per_rpc_seq_write(struct file *file,
 }
 EXPORT_SYMBOL(lprocfs_obd_max_pages_per_rpc_seq_write);
 
 }
 EXPORT_SYMBOL(lprocfs_obd_max_pages_per_rpc_seq_write);
 
+/**
+ * Print the client's short i/o threshold, in bytes.
+ *
+ * \param[in] m     seq_file to print into
+ * \param[in] data  the obd_device whose client_obd is reported
+ *
+ * \retval 0 always
+ */
+int lprocfs_obd_short_io_bytes_seq_show(struct seq_file *m, void *data)
+{
+       struct obd_device *dev = data;
+       struct client_obd *cli = &dev->u.cli;
+
+       spin_lock(&cli->cl_loi_list_lock);
+       /* cl_short_io_bytes is a __u32, so it must be printed unsigned */
+       seq_printf(m, "%u\n", cli->cl_short_io_bytes);
+       spin_unlock(&cli->cl_loi_list_lock);
+       return 0;
+}
+EXPORT_SYMBOL(lprocfs_obd_short_io_bytes_seq_show);
+
+
+/* Used to catch people who think they're specifying pages. */
+#define MIN_SHORT_IO_BYTES 64
+
+/**
+ * Set the client's short i/o threshold, in bytes.
+ *
+ * The value is parsed with lprocfs_str_to_s64() and accepted only when it
+ * lies in [MIN_SHORT_IO_BYTES, OBD_MAX_SHORT_IO_BYTES] and does not exceed
+ * cl_max_pages_per_rpc pages worth of bytes.
+ *
+ * \param[in] file    proc file; its seq_file private data is the obd_device
+ * \param[in] buffer  user buffer holding the new value
+ * \param[in] count   length of \a buffer
+ * \param[in] off     unused
+ *
+ * \retval count    on success
+ * \retval -ERANGE  if the value is outside the accepted range
+ * \retval rc       the error returned by lprocfs_str_to_s64() on parse failure
+ */
+ssize_t lprocfs_obd_short_io_bytes_seq_write(struct file *file,
+                                            const char __user *buffer,
+                                            size_t count, loff_t *off)
+{
+       struct obd_device *dev = ((struct seq_file *)
+                                               file->private_data)->private;
+       struct client_obd *cli = &dev->u.cli;
+       int rc;
+       /* signed, to match the pointer type lprocfs_str_to_s64() fills in */
+       __s64 val;
+
+       LPROCFS_CLIMP_CHECK(dev);
+
+       rc = lprocfs_str_to_s64(buffer, count, &val);
+       if (rc)
+               GOTO(out, rc);
+
+       /* Lower bound first: it rejects negative input with a signed
+        * comparison before val meets the unsigned upper bound. */
+       if (val < MIN_SHORT_IO_BYTES || val > OBD_MAX_SHORT_IO_BYTES)
+               GOTO(out, rc = -ERANGE);
+
+       rc = count;
+
+       spin_lock(&cli->cl_loi_list_lock);
+       /* a short i/o can never carry more than one full RPC of pages */
+       if (val > (cli->cl_max_pages_per_rpc << PAGE_SHIFT))
+               rc = -ERANGE;
+       else
+               cli->cl_short_io_bytes = val;
+       spin_unlock(&cli->cl_loi_list_lock);
+
+out:
+       LPROCFS_CLIMP_EXIT(dev);
+       return rc;
+}
+EXPORT_SYMBOL(lprocfs_obd_short_io_bytes_seq_write);
+
 int lprocfs_wr_root_squash(const char __user *buffer, unsigned long count,
                           struct root_squash_info *squash, char *name)
 {
 int lprocfs_wr_root_squash(const char __user *buffer, unsigned long count,
                           struct root_squash_info *squash, char *name)
 {
index 84ba911..40b2e5f 100644 (file)
@@ -574,6 +574,8 @@ LPROC_SEQ_FOPS_RO(osc_destroys_in_flight);
 
 LPROC_SEQ_FOPS_RW_TYPE(osc, obd_max_pages_per_rpc);
 
 
 LPROC_SEQ_FOPS_RW_TYPE(osc, obd_max_pages_per_rpc);
 
+LPROC_SEQ_FOPS_RW_TYPE(osc, obd_short_io_bytes);
+
 static int osc_unstable_stats_seq_show(struct seq_file *m, void *v)
 {
        struct obd_device *dev = m->private;
 static int osc_unstable_stats_seq_show(struct seq_file *m, void *v)
 {
        struct obd_device *dev = m->private;
@@ -616,6 +618,8 @@ struct lprocfs_vars lprocfs_osc_obd_vars[] = {
          .fops =       &osc_active_fops                },
        { .name =       "max_pages_per_rpc",
          .fops =       &osc_obd_max_pages_per_rpc_fops },
          .fops =       &osc_active_fops                },
        { .name =       "max_pages_per_rpc",
          .fops =       &osc_obd_max_pages_per_rpc_fops },
+       { .name =       "short_io_bytes",
+         .fops =       &osc_obd_short_io_bytes_fops    },
        { .name =       "max_rpcs_in_flight",
          .fops =       &osc_max_rpcs_in_flight_fops    },
        { .name =       "destroys_in_flight",
        { .name =       "max_rpcs_in_flight",
          .fops =       &osc_max_rpcs_in_flight_fops    },
        { .name =       "destroys_in_flight",
index c0aa4be..e929211 100644 (file)
@@ -874,17 +874,27 @@ void osc_lru_unreserve(struct client_obd *cli, unsigned long npages)
  * are likely from the same page zone.
  */
 static inline void unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
  * are likely from the same page zone.
  */
 static inline void unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
+                                           struct osc_brw_async_args *aa,
                                            int factor)
 {
                                            int factor)
 {
-       int page_count = desc->bd_iov_count;
+       int page_count;
        void *zone = NULL;
        int count = 0;
        int i;
 
        void *zone = NULL;
        int count = 0;
        int i;
 
-       LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
+       if (desc != NULL) {
+               LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
+               page_count = desc->bd_iov_count;
+       } else {
+               page_count = aa->aa_page_count;
+       }
 
        for (i = 0; i < page_count; i++) {
 
        for (i = 0; i < page_count; i++) {
-               void *pz = page_zone(BD_GET_KIOV(desc, i).kiov_page);
+               void *pz;
+               if (desc)
+                       pz = page_zone(BD_GET_KIOV(desc, i).kiov_page);
+               else
+                       pz = page_zone(aa->aa_ppga[i]->pg);
 
                if (likely(pz == zone)) {
                        ++count;
 
                if (likely(pz == zone)) {
                        ++count;
@@ -903,14 +913,16 @@ static inline void unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
                mod_zone_page_state(zone, NR_UNSTABLE_NFS, factor * count);
 }
 
                mod_zone_page_state(zone, NR_UNSTABLE_NFS, factor * count);
 }
 
-static inline void add_unstable_page_accounting(struct ptlrpc_bulk_desc *desc)
+static inline void add_unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
+                                               struct osc_brw_async_args *aa)
 {
 {
-       unstable_page_accounting(desc, 1);
+       unstable_page_accounting(desc, aa, 1);
 }
 
 }
 
-static inline void dec_unstable_page_accounting(struct ptlrpc_bulk_desc *desc)
+static inline void dec_unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
+                                               struct osc_brw_async_args *aa)
 {
 {
-       unstable_page_accounting(desc, -1);
+       unstable_page_accounting(desc, aa, -1);
 }
 
 /**
 }
 
 /**
@@ -927,12 +939,19 @@ static inline void dec_unstable_page_accounting(struct ptlrpc_bulk_desc *desc)
 void osc_dec_unstable_pages(struct ptlrpc_request *req)
 {
        struct ptlrpc_bulk_desc *desc       = req->rq_bulk;
 void osc_dec_unstable_pages(struct ptlrpc_request *req)
 {
        struct ptlrpc_bulk_desc *desc       = req->rq_bulk;
+       struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        struct client_obd       *cli        = &req->rq_import->imp_obd->u.cli;
        struct client_obd       *cli        = &req->rq_import->imp_obd->u.cli;
-       int                      page_count = desc->bd_iov_count;
+       int                      page_count;
        long                     unstable_count;
 
        long                     unstable_count;
 
+       if (desc)
+               page_count = desc->bd_iov_count;
+       else
+               page_count = aa->aa_page_count;
+
        LASSERT(page_count >= 0);
        LASSERT(page_count >= 0);
-       dec_unstable_page_accounting(desc);
+
+       dec_unstable_page_accounting(desc, aa);
 
        unstable_count = atomic_long_sub_return(page_count,
                                                &cli->cl_unstable_count);
 
        unstable_count = atomic_long_sub_return(page_count,
                                                &cli->cl_unstable_count);
@@ -954,14 +973,20 @@ void osc_dec_unstable_pages(struct ptlrpc_request *req)
 void osc_inc_unstable_pages(struct ptlrpc_request *req)
 {
        struct ptlrpc_bulk_desc *desc = req->rq_bulk;
 void osc_inc_unstable_pages(struct ptlrpc_request *req)
 {
        struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+       struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        struct client_obd       *cli  = &req->rq_import->imp_obd->u.cli;
        struct client_obd       *cli  = &req->rq_import->imp_obd->u.cli;
-       long                     page_count = desc->bd_iov_count;
+       long                     page_count;
 
        /* No unstable page tracking */
        if (cli->cl_cache == NULL || !cli->cl_cache->ccc_unstable_check)
                return;
 
 
        /* No unstable page tracking */
        if (cli->cl_cache == NULL || !cli->cl_cache->ccc_unstable_check)
                return;
 
-       add_unstable_page_accounting(desc);
+       if (desc)
+               page_count = desc->bd_iov_count;
+       else
+               page_count = aa->aa_page_count;
+
+       add_unstable_page_accounting(desc, aa);
        atomic_long_add(page_count, &cli->cl_unstable_count);
        atomic_long_add(page_count, &cli->cl_cache->ccc_unstable_nr);
 
        atomic_long_add(page_count, &cli->cl_unstable_count);
        atomic_long_add(page_count, &cli->cl_cache->ccc_unstable_nr);
 
index 4052b54..63ae9b0 100644 (file)
@@ -58,18 +58,6 @@ struct ptlrpc_request_pool *osc_rq_pool;
 static unsigned int osc_reqpool_mem_max = 5;
 module_param(osc_reqpool_mem_max, uint, 0444);
 
 static unsigned int osc_reqpool_mem_max = 5;
 module_param(osc_reqpool_mem_max, uint, 0444);
 
-struct osc_brw_async_args {
-       struct obdo              *aa_oa;
-       int                       aa_requested_nob;
-       int                       aa_nio_count;
-       u32                       aa_page_count;
-       int                       aa_resends;
-       struct brw_page **aa_ppga;
-       struct client_obd        *aa_cli;
-       struct list_head          aa_oaps;
-       struct list_head          aa_exts;
-};
-
 #define osc_grant_args osc_brw_async_args
 
 struct osc_setattr_args {
 #define osc_grant_args osc_brw_async_args
 
 struct osc_setattr_args {
@@ -1025,8 +1013,8 @@ static int check_write_rcs(struct ptlrpc_request *req,
                         return(-EPROTO);
                 }
         }
                         return(-EPROTO);
                 }
         }
-
-        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
+       if (req->rq_bulk != NULL &&
+           req->rq_bulk->bd_nob_transferred != requested_nob) {
                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                        req->rq_bulk->bd_nob_transferred, requested_nob);
                 return(-EPROTO);
                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                        req->rq_bulk->bd_nob_transferred, requested_nob);
                 return(-EPROTO);
@@ -1119,10 +1107,11 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         struct ost_body         *body;
         struct obd_ioobj        *ioobj;
         struct niobuf_remote    *niobuf;
         struct ost_body         *body;
         struct obd_ioobj        *ioobj;
         struct niobuf_remote    *niobuf;
-        int niocount, i, requested_nob, opc, rc;
+       int niocount, i, requested_nob, opc, rc, short_io_size;
         struct osc_brw_async_args *aa;
         struct req_capsule      *pill;
         struct brw_page *pg_prev;
         struct osc_brw_async_args *aa;
         struct req_capsule      *pill;
         struct brw_page *pg_prev;
+       void *short_io_buf;
 
         ENTRY;
         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
 
         ENTRY;
         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
@@ -1153,6 +1142,20 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                              niocount * sizeof(*niobuf));
 
         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                              niocount * sizeof(*niobuf));
 
+       /* Total payload is the sum of the byte counts of all pages.  The
+        * accumulator is declared without an initializer, so it must be
+        * zeroed here before being added to. */
+       short_io_size = 0;
+       for (i = 0; i < page_count; i++)
+               short_io_size += pga[i]->count;
+
+       /* Check if we can do a short io: the payload must fit within the
+        * client's short i/o limit, be described by a single niobuf, and
+        * the import must have the SHORTIO connect flag. */
+       if (!(short_io_size <= cli->cl_short_io_bytes && niocount == 1 &&
+           imp_connect_shortio(cli->cl_import)))
+               short_io_size = 0;
+
+       /* Writes carry the data in the request buffer; reads reserve the
+        * space in the reply buffer instead. */
+       req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
+                            opc == OST_READ ? 0 : short_io_size);
+       if (opc == OST_READ)
+               req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
+                                    short_io_size);
+
         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
         if (rc) {
                 ptlrpc_request_free(req);
         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
         if (rc) {
                 ptlrpc_request_free(req);
@@ -1160,10 +1163,17 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         }
         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
         ptlrpc_at_set_req_timeout(req);
         }
         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
         ptlrpc_at_set_req_timeout(req);
+
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;
 
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;
 
+       if (short_io_size != 0) {
+               desc = NULL;
+               short_io_buf = NULL;
+               goto no_bulk;
+       }
+
        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
@@ -1175,7 +1185,7 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         if (desc == NULL)
                 GOTO(out, rc = -ENOMEM);
         /* NB request now owns desc and will free it when it gets freed */
         if (desc == NULL)
                 GOTO(out, rc = -ENOMEM);
         /* NB request now owns desc and will free it when it gets freed */
-
+no_bulk:
         body = req_capsule_client_get(pill, &RMF_OST_BODY);
         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
         body = req_capsule_client_get(pill, &RMF_OST_BODY);
         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
@@ -1190,7 +1200,26 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
-       ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
+       if (desc != NULL)
+               ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
+       else /* short io */
+               ioobj_max_brw_set(ioobj, 0);
+
+       if (short_io_size != 0) {
+               if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
+                       body->oa.o_valid |= OBD_MD_FLFLAGS;
+                       body->oa.o_flags = 0;
+               }
+               body->oa.o_flags |= OBD_FL_SHORT_IO;
+               CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
+                      short_io_size);
+               if (opc == OST_WRITE) {
+                       short_io_buf = req_capsule_client_get(pill,
+                                                             &RMF_SHORT_IO);
+                       LASSERT(short_io_buf != NULL);
+               }
+       }
+
        LASSERT(page_count > 0);
        pg_prev = pga[0];
         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
        LASSERT(page_count > 0);
        pg_prev = pga[0];
         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
@@ -1215,9 +1244,19 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                          pg_prev->pg->index, pg_prev->off);
                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                         (pg->flag & OBD_BRW_SRVLOCK));
                          pg_prev->pg->index, pg_prev->off);
                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                         (pg->flag & OBD_BRW_SRVLOCK));
-
-               desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
-                requested_nob += pg->count;
+               if (short_io_size != 0 && opc == OST_WRITE) {
+                       unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
+
+                       LASSERT(short_io_size >= requested_nob + pg->count);
+                       memcpy(short_io_buf + requested_nob,
+                              ptr + poff,
+                              pg->count);
+                       ll_kunmap_atomic(ptr, KM_USER0);
+               } else if (short_io_size == 0) {
+                       desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
+                                                        pg->count);
+               }
+               requested_nob += pg->count;
 
                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
                         niobuf--;
 
                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
                         niobuf--;
@@ -1486,9 +1525,9 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                         CERROR("Unexpected +ve rc %d\n", rc);
                         RETURN(-EPROTO);
                 }
                         CERROR("Unexpected +ve rc %d\n", rc);
                         RETURN(-EPROTO);
                 }
-                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
 
 
-                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
+               if (req->rq_bulk != NULL &&
+                   sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                         RETURN(-EAGAIN);
 
                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                         RETURN(-EAGAIN);
 
                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
@@ -1503,8 +1542,14 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
 
         /* The rest of this function executes only for OST_READs */
 
 
         /* The rest of this function executes only for OST_READs */
 
-        /* if unwrap_bulk failed, return -EAGAIN to retry */
-        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
+       if (req->rq_bulk == NULL) {
+               rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
+                                         RCL_SERVER);
+               LASSERT(rc == req->rq_status);
+       } else {
+               /* if unwrap_bulk failed, return -EAGAIN to retry */
+               rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
+       }
         if (rc < 0)
                 GOTO(out, rc = -EAGAIN);
 
         if (rc < 0)
                 GOTO(out, rc = -EAGAIN);
 
@@ -1514,12 +1559,41 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                 RETURN(-EPROTO);
         }
 
                 RETURN(-EPROTO);
         }
 
-        if (rc != req->rq_bulk->bd_nob_transferred) {
+       if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
                 CERROR ("Unexpected rc %d (%d transferred)\n",
                         rc, req->rq_bulk->bd_nob_transferred);
                 return (-EPROTO);
         }
 
                 CERROR ("Unexpected rc %d (%d transferred)\n",
                         rc, req->rq_bulk->bd_nob_transferred);
                 return (-EPROTO);
         }
 
+       if (req->rq_bulk == NULL) {
+               /* short io */
+               int nob, pg_count, i = 0;
+               unsigned char *buf;
+
+               CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
+               pg_count = aa->aa_page_count;
+               buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
+                                                  rc);
+               nob = rc;
+               while (nob > 0 && pg_count > 0) {
+                       unsigned char *ptr;
+                       int count = aa->aa_ppga[i]->count > nob ?
+                                   nob : aa->aa_ppga[i]->count;
+
+                       CDEBUG(D_CACHE, "page %p count %d\n",
+                              aa->aa_ppga[i]->pg, count);
+                       ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
+                       memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
+                              count);
+                       ll_kunmap_atomic((void *) ptr, KM_USER0);
+
+                       buf += count;
+                       nob -= count;
+                       i++;
+                       pg_count--;
+               }
+       }
+
         if (rc < aa->aa_requested_nob)
                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
 
         if (rc < aa->aa_requested_nob)
                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
 
@@ -1536,7 +1610,8 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                                                  aa->aa_ppga, OST_READ,
                                                  cksum_type);
 
                                                  aa->aa_ppga, OST_READ,
                                                  cksum_type);
 
-               if (peer->nid != req->rq_bulk->bd_sender) {
+               if (req->rq_bulk != NULL &&
+                   peer->nid != req->rq_bulk->bd_sender) {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }
@@ -1710,6 +1785,7 @@ static int brw_interpret(const struct lu_env *env,
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;
+       unsigned long           transferred = 0;
         ENTRY;
 
         rc = osc_brw_fini_request(req, rc);
         ENTRY;
 
         rc = osc_brw_fini_request(req, rc);
@@ -1802,8 +1878,12 @@ static int brw_interpret(const struct lu_env *env,
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));
 
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));
 
+       transferred = (req->rq_bulk == NULL ? /* short io */
+                      aa->aa_requested_nob :
+                      req->rq_bulk->bd_nob_transferred);
+
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
-       ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
+       ptlrpc_lprocfs_brw(req, transferred);
 
        spin_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
 
        spin_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
index 9ffb860..1b645c4 100644 (file)
@@ -609,16 +609,18 @@ static const struct req_msg_field *ost_destroy_client[] = {
 
 
 static const struct req_msg_field *ost_brw_client[] = {
 
 
 static const struct req_msg_field *ost_brw_client[] = {
-        &RMF_PTLRPC_BODY,
-        &RMF_OST_BODY,
-        &RMF_OBD_IOOBJ,
-        &RMF_NIOBUF_REMOTE,
-        &RMF_CAPA1
+       &RMF_PTLRPC_BODY,
+       &RMF_OST_BODY,
+       &RMF_OBD_IOOBJ,
+       &RMF_NIOBUF_REMOTE,
+       &RMF_CAPA1,
+       &RMF_SHORT_IO
 };
 
 static const struct req_msg_field *ost_brw_read_server[] = {
 };
 
 static const struct req_msg_field *ost_brw_read_server[] = {
-        &RMF_PTLRPC_BODY,
-        &RMF_OST_BODY
+       &RMF_PTLRPC_BODY,
+       &RMF_OST_BODY,
+       &RMF_SHORT_IO
 };
 
 static const struct req_msg_field *ost_brw_write_server[] = {
 };
 
 static const struct req_msg_field *ost_brw_write_server[] = {
@@ -1168,6 +1170,9 @@ struct req_msg_field RMF_IDX_INFO =
        DEFINE_MSGF("idx_info", 0, sizeof(struct idx_info),
                    lustre_swab_idx_info, NULL);
 EXPORT_SYMBOL(RMF_IDX_INFO);
        DEFINE_MSGF("idx_info", 0, sizeof(struct idx_info),
                    lustre_swab_idx_info, NULL);
 EXPORT_SYMBOL(RMF_IDX_INFO);
+/* Message field that carries short io data inline in the RPC message
+ * instead of a bulk transfer; it is listed in both ost_brw_client and
+ * ost_brw_read_server.
+ * NOTE(review): the -1 size presumably marks the field as variable-length
+ * (the size is set per request via req_capsule_set_size()) - confirm
+ * against DEFINE_MSGF. */
+struct req_msg_field RMF_SHORT_IO =
+       DEFINE_MSGF("short_io", 0, -1, NULL, NULL);
+EXPORT_SYMBOL(RMF_SHORT_IO);
 struct req_msg_field RMF_HSM_USER_STATE =
        DEFINE_MSGF("hsm_user_state", 0, sizeof(struct hsm_user_state),
                    lustre_swab_hsm_user_state, NULL);
 struct req_msg_field RMF_HSM_USER_STATE =
        DEFINE_MSGF("hsm_user_state", 0, sizeof(struct hsm_user_state),
                    lustre_swab_hsm_user_state, NULL);
index 8460774..e359462 100644 (file)
@@ -434,6 +434,19 @@ static int tgt_handle_request0(struct tgt_session_info *tsi,
                                             &RMF_ACL, RCL_SERVER,
                                             LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
 
                                             &RMF_ACL, RCL_SERVER,
                                             LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
 
+               if (req_capsule_has_field(tsi->tsi_pill, &RMF_SHORT_IO,
+                                         RCL_SERVER)) {
+                       struct niobuf_remote *remote_nb =
+                               req_capsule_client_get(tsi->tsi_pill,
+                                                      &RMF_NIOBUF_REMOTE);
+                       struct ost_body *body = tsi->tsi_ost_body;
+
+                       req_capsule_set_size(tsi->tsi_pill, &RMF_SHORT_IO,
+                                        RCL_SERVER,
+                                        (body->oa.o_flags & OBD_FL_SHORT_IO) ?
+                                        remote_nb[0].rnb_len : 0);
+               }
+
                rc = req_capsule_server_pack(tsi->tsi_pill);
        }
 
                rc = req_capsule_server_pack(tsi->tsi_pill);
        }
 
@@ -1657,10 +1670,9 @@ void tgt_brw_unlock(struct obd_ioobj *obj, struct niobuf_remote *niob,
                tgt_extent_unlock(lh, mode);
        EXIT;
 }
                tgt_extent_unlock(lh, mode);
        EXIT;
 }
-
-static __u32 tgt_checksum_bulk(struct lu_target *tgt,
-                              struct ptlrpc_bulk_desc *desc, int opc,
-                              enum cksum_types cksum_type)
+static __u32 tgt_checksum_niobuf(struct lu_target *tgt,
+                                struct niobuf_local *local_nb, int npages,
+                                int opc, enum cksum_types cksum_type)
 {
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
 {
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
@@ -1668,8 +1680,6 @@ static __u32 tgt_checksum_bulk(struct lu_target *tgt,
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
        __u32                           cksum;
 
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
        __u32                           cksum;
 
-       LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
-
        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("%s: unable to initialize checksum hash %s\n",
        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("%s: unable to initialize checksum hash %s\n",
@@ -1678,65 +1688,64 @@ static __u32 tgt_checksum_bulk(struct lu_target *tgt,
        }
 
        CDEBUG(D_INFO, "Checksum for algo %s\n", cfs_crypto_hash_name(cfs_alg));
        }
 
        CDEBUG(D_INFO, "Checksum for algo %s\n", cfs_crypto_hash_name(cfs_alg));
-       for (i = 0; i < desc->bd_iov_count; i++) {
+       for (i = 0; i < npages; i++) {
                /* corrupt the data before we compute the checksum, to
                 * simulate a client->OST data error */
                if (i == 0 && opc == OST_WRITE &&
                    OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_RECEIVE)) {
                /* corrupt the data before we compute the checksum, to
                 * simulate a client->OST data error */
                if (i == 0 && opc == OST_WRITE &&
                    OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_RECEIVE)) {
-                       int off = BD_GET_KIOV(desc, i).kiov_offset &
-                               ~PAGE_MASK;
-                       int len = BD_GET_KIOV(desc, i).kiov_len;
+                       int off = local_nb[i].lnb_page_offset & ~PAGE_MASK;
+                       int len = local_nb[i].lnb_len;
                        struct page *np = tgt_page_to_corrupt;
                        struct page *np = tgt_page_to_corrupt;
-                       char *ptr = kmap(BD_GET_KIOV(desc, i).kiov_page) + off;
 
                        if (np) {
 
                        if (np) {
-                               char *ptr2 = kmap(np) + off;
+                               char *ptr = ll_kmap_atomic(local_nb[i].lnb_page,
+                                                       KM_USER0);
+                               char *ptr2 = page_address(np);
 
 
-                               memcpy(ptr2, ptr, len);
-                               memcpy(ptr2, "bad3", min(4, len));
-                               kunmap(np);
+                               memcpy(ptr2 + off, ptr + off, len);
+                               memcpy(ptr2 + off, "bad3", min(4, len));
+                               ll_kunmap_atomic(ptr, KM_USER0);
 
                                /* LU-8376 to preserve original index for
                                 * display in dump_all_bulk_pages() */
 
                                /* LU-8376 to preserve original index for
                                 * display in dump_all_bulk_pages() */
-                               np->index = BD_GET_KIOV(desc,
-                                                       i).kiov_page->index;
+                               np->index = i;
 
 
-                               BD_GET_KIOV(desc, i).kiov_page = np;
+                               cfs_crypto_hash_update_page(hdesc, np, off,
+                                                           len);
+                               continue;
                        } else {
                                CERROR("%s: can't alloc page for corruption\n",
                                       tgt_name(tgt));
                        }
                }
                        } else {
                                CERROR("%s: can't alloc page for corruption\n",
                                       tgt_name(tgt));
                        }
                }
-               cfs_crypto_hash_update_page(hdesc,
-                                 BD_GET_KIOV(desc, i).kiov_page,
-                                 BD_GET_KIOV(desc, i).kiov_offset &
-                                       ~PAGE_MASK,
-                                 BD_GET_KIOV(desc, i).kiov_len);
+               cfs_crypto_hash_update_page(hdesc, local_nb[i].lnb_page,
+                                 local_nb[i].lnb_page_offset & ~PAGE_MASK,
+                                 local_nb[i].lnb_len);
 
                 /* corrupt the data after we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND)) {
 
                 /* corrupt the data after we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND)) {
-                       int off = BD_GET_KIOV(desc, i).kiov_offset
-                         & ~PAGE_MASK;
-                       int len = BD_GET_KIOV(desc, i).kiov_len;
+                       int off = local_nb[i].lnb_page_offset & ~PAGE_MASK;
+                       int len = local_nb[i].lnb_len;
                        struct page *np = tgt_page_to_corrupt;
                        struct page *np = tgt_page_to_corrupt;
-                       char *ptr =
-                         kmap(BD_GET_KIOV(desc, i).kiov_page) + off;
 
                        if (np) {
 
                        if (np) {
-                               char *ptr2 = kmap(np) + off;
+                               char *ptr = ll_kmap_atomic(local_nb[i].lnb_page,
+                                                       KM_USER0);
+                               char *ptr2 = page_address(np);
 
 
-                               memcpy(ptr2, ptr, len);
-                               memcpy(ptr2, "bad4", min(4, len));
-                               kunmap(np);
+                               memcpy(ptr2 + off, ptr + off, len);
+                               memcpy(ptr2 + off, "bad4", min(4, len));
+                               ll_kunmap_atomic(ptr, KM_USER0);
 
                                /* LU-8376 to preserve original index for
                                 * display in dump_all_bulk_pages() */
 
                                /* LU-8376 to preserve original index for
                                 * display in dump_all_bulk_pages() */
-                               np->index = BD_GET_KIOV(desc,
-                                                       i).kiov_page->index;
+                               np->index = i;
 
 
-                               BD_GET_KIOV(desc, i).kiov_page = np;
+                               cfs_crypto_hash_update_page(hdesc, np, off,
+                                                           len);
+                               continue;
                        } else {
                                CERROR("%s: can't alloc page for corruption\n",
                                       tgt_name(tgt));
                        } else {
                                CERROR("%s: can't alloc page for corruption\n",
                                       tgt_name(tgt));
@@ -1753,8 +1762,8 @@ static __u32 tgt_checksum_bulk(struct lu_target *tgt,
 char dbgcksum_file_name[PATH_MAX];
 
 static void dump_all_bulk_pages(struct obdo *oa, int count,
 char dbgcksum_file_name[PATH_MAX];
 
 static void dump_all_bulk_pages(struct obdo *oa, int count,
-                                   lnet_kiov_t *iov, __u32 server_cksum,
-                                   __u32 client_cksum)
+                               struct niobuf_local *local_nb,
+                               __u32 server_cksum, __u32 client_cksum)
 {
        struct file *filp;
        int rc, i;
 {
        struct file *filp;
        int rc, i;
@@ -1772,9 +1781,9 @@ static void dump_all_bulk_pages(struct obdo *oa, int count,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
-                (__u64)iov[0].kiov_page->index << PAGE_SHIFT,
-                ((__u64)iov[count - 1].kiov_page->index << PAGE_SHIFT) +
-                iov[count - 1].kiov_len - 1, client_cksum, server_cksum);
+                local_nb[0].lnb_file_offset,
+                local_nb[count-1].lnb_file_offset +
+                local_nb[count-1].lnb_len - 1, client_cksum, server_cksum);
        filp = filp_open(dbgcksum_file_name,
                         O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
        if (IS_ERR(filp)) {
        filp = filp_open(dbgcksum_file_name,
                         O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
        if (IS_ERR(filp)) {
@@ -1792,8 +1801,8 @@ static void dump_all_bulk_pages(struct obdo *oa, int count,
        oldfs = get_fs();
        set_fs(KERNEL_DS);
        for (i = 0; i < count; i++) {
        oldfs = get_fs();
        set_fs(KERNEL_DS);
        for (i = 0; i < count; i++) {
-               len = iov[i].kiov_len;
-               buf = kmap(iov[i].kiov_page);
+               len = local_nb[i].lnb_len;
+               buf = kmap(local_nb[i].lnb_page);
                while (len != 0) {
                        rc = vfs_write(filp, (__force const char __user *)buf,
                                       len, &filp->f_pos);
                while (len != 0) {
                        rc = vfs_write(filp, (__force const char __user *)buf,
                                       len, &filp->f_pos);
@@ -1807,7 +1816,7 @@ static void dump_all_bulk_pages(struct obdo *oa, int count,
                        CDEBUG(D_INFO, "%s: wrote %d bytes\n",
                               dbgcksum_file_name, rc);
                }
                        CDEBUG(D_INFO, "%s: wrote %d bytes\n",
                               dbgcksum_file_name, rc);
                }
-               kunmap(iov[i].kiov_page);
+               kunmap(local_nb[i].lnb_page);
        }
        set_fs(oldfs);
 
        }
        set_fs(oldfs);
 
@@ -1818,13 +1827,15 @@ static void dump_all_bulk_pages(struct obdo *oa, int count,
        return;
 }
 
        return;
 }
 
-static int check_read_checksum(struct ptlrpc_bulk_desc *desc, struct obdo *oa,
+static int check_read_checksum(struct niobuf_local *local_nb, int npages,
+                              struct obd_export *exp, struct obdo *oa,
                               const lnet_process_id_t *peer,
                               __u32 client_cksum, __u32 server_cksum,
                               enum cksum_types server_cksum_type)
 {
        char *msg;
        enum cksum_types cksum_type;
                               const lnet_process_id_t *peer,
                               __u32 client_cksum, __u32 server_cksum,
                               enum cksum_types server_cksum_type)
 {
        char *msg;
        enum cksum_types cksum_type;
+       loff_t start, end;
 
        /* unlikely to happen and only if resend does not occur due to cksum
         * control failure on Client */
 
        /* unlikely to happen and only if resend does not occur due to cksum
         * control failure on Client */
@@ -1834,9 +1845,8 @@ static int check_read_checksum(struct ptlrpc_bulk_desc *desc, struct obdo *oa,
                return 0;
        }
 
                return 0;
        }
 
-       if (desc->bd_export->exp_obd->obd_checksum_dump)
-               dump_all_bulk_pages(oa, desc->bd_iov_count,
-                                   &BD_GET_KIOV(desc, 0), server_cksum,
+       if (exp->exp_obd->obd_checksum_dump)
+               dump_all_bulk_pages(oa, npages, local_nb, server_cksum,
                                    client_cksum);
 
        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                    client_cksum);
 
        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
@@ -1848,24 +1858,49 @@ static int check_read_checksum(struct ptlrpc_bulk_desc *desc, struct obdo *oa,
        else
                msg = "should have changed on the client or in transit";
 
        else
                msg = "should have changed on the client or in transit";
 
+       start = local_nb[0].lnb_file_offset;
+       end = local_nb[npages-1].lnb_file_offset +
+                                       local_nb[npages-1].lnb_len - 1;
+
        LCONSOLE_ERROR_MSG(0x132, "%s: BAD READ CHECKSUM: %s: from %s inode "
                DFID " object "DOSTID" extent [%llu-%llu], client returned csum"
                " %x (type %x), server csum %x (type %x)\n",
        LCONSOLE_ERROR_MSG(0x132, "%s: BAD READ CHECKSUM: %s: from %s inode "
                DFID " object "DOSTID" extent [%llu-%llu], client returned csum"
                " %x (type %x), server csum %x (type %x)\n",
-               desc->bd_export->exp_obd->obd_name,
+               exp->exp_obd->obd_name,
                msg, libcfs_nid2str(peer->nid),
                oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
                oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                POSTID(&oa->o_oi),
                msg, libcfs_nid2str(peer->nid),
                oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
                oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                POSTID(&oa->o_oi),
-               (__u64)BD_GET_KIOV(desc, 0).kiov_page->index << PAGE_SHIFT,
-               ((__u64)BD_GET_KIOV(desc,
-                                   desc->bd_iov_count - 1).kiov_page->index
-                       << PAGE_SHIFT) +
-                       BD_GET_KIOV(desc, desc->bd_iov_count - 1).kiov_len - 1,
-               client_cksum, cksum_type, server_cksum, server_cksum_type);
+               start, end, client_cksum, cksum_type, server_cksum,
+               server_cksum_type);
+
        return 1;
 }
 
        return 1;
 }
 
+/**
+ * Pack the data of the local pages into the short io reply buffer.
+ *
+ * The data of each page is appended to \a buf back to back, which is the
+ * layout the client expects: it walks the buffer sequentially and applies
+ * the in-page offset on the page side only.
+ *
+ * \param[in]  local   niobufs describing the pages to copy from
+ * \param[in]  npages  number of entries in \a local to copy
+ * \param[out] buf     short io buffer to fill
+ * \param[in]  size    number of bytes available in \a buf
+ *
+ * \retval number of bytes copied into \a buf
+ * \retval -EINVAL if the pages hold more data than fits in \a buf
+ */
+static int tgt_pages2shortio(struct niobuf_local *local, int npages,
+                            unsigned char *buf, int size)
+{
+       int     i, off, len, copied = size;
+       char    *ptr;
+
+       for (i = 0; i < npages; i++) {
+               off = local[i].lnb_page_offset & ~PAGE_MASK;
+               len = local[i].lnb_len;
+
+               CDEBUG(D_PAGE, "index %d offset = %d len = %d left = %d\n",
+                      i, off, len, size);
+               if (len > size)
+                       return -EINVAL;
+
+               ptr = ll_kmap_atomic(local[i].lnb_page, KM_USER0);
+               /* the data sits at 'off' inside the page, while the reply
+                * buffer is packed: offset the source, not the destination,
+                * so we never write past the 'size' bytes checked above */
+               memcpy(buf, ptr + off, len);
+               ll_kunmap_atomic(ptr, KM_USER0);
+               buf += len;
+               size -= len;
+       }
+       return copied - size;
+}
+
 int tgt_brw_read(struct tgt_session_info *tsi)
 {
        struct ptlrpc_request   *req = tgt_ses_req(tsi);
 int tgt_brw_read(struct tgt_session_info *tsi)
 {
        struct ptlrpc_request   *req = tgt_ses_req(tsi);
@@ -1877,7 +1912,8 @@ int tgt_brw_read(struct tgt_session_info *tsi)
        struct ost_body         *body, *repbody;
        struct l_wait_info       lwi;
        struct lustre_handle     lockh = { 0 };
        struct ost_body         *body, *repbody;
        struct l_wait_info       lwi;
        struct lustre_handle     lockh = { 0 };
-       int                      npages, nob = 0, rc, i, no_reply = 0;
+       int                      npages, nob = 0, rc, i, no_reply = 0,
+                                npages_read;
        struct tgt_thread_big_cache *tbc = req->rq_svc_thread->t_data;
 
        ENTRY;
        struct tgt_thread_big_cache *tbc = req->rq_svc_thread->t_data;
 
        ENTRY;
@@ -1953,33 +1989,41 @@ int tgt_brw_read(struct tgt_session_info *tsi)
        if (rc != 0)
                GOTO(out_lock, rc);
 
        if (rc != 0)
                GOTO(out_lock, rc);
 
-       desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
-                                   PTLRPC_BULK_PUT_SOURCE |
-                                       PTLRPC_BULK_BUF_KIOV,
-                                   OST_BULK_PORTAL,
-                                   &ptlrpc_bulk_kiov_nopin_ops);
-       if (desc == NULL)
-               GOTO(out_commitrw, rc = -ENOMEM);
+       if (body->oa.o_flags & OBD_FL_SHORT_IO) {
+               desc = NULL;
+       } else {
+               desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
+                                           PTLRPC_BULK_PUT_SOURCE |
+                                               PTLRPC_BULK_BUF_KIOV,
+                                           OST_BULK_PORTAL,
+                                           &ptlrpc_bulk_kiov_nopin_ops);
+               if (desc == NULL)
+                       GOTO(out_commitrw, rc = -ENOMEM);
+       }
 
        nob = 0;
 
        nob = 0;
+       npages_read = npages;
        for (i = 0; i < npages; i++) {
                int page_rc = local_nb[i].lnb_rc;
 
                if (page_rc < 0) {
                        rc = page_rc;
        for (i = 0; i < npages; i++) {
                int page_rc = local_nb[i].lnb_rc;
 
                if (page_rc < 0) {
                        rc = page_rc;
+                       npages_read = i;
                        break;
                }
 
                nob += page_rc;
                        break;
                }
 
                nob += page_rc;
-               if (page_rc != 0) { /* some data! */
+               if (page_rc != 0 && desc != NULL) { /* some data! */
                        LASSERT(local_nb[i].lnb_page != NULL);
                        desc->bd_frag_ops->add_kiov_frag
                          (desc, local_nb[i].lnb_page,
                        LASSERT(local_nb[i].lnb_page != NULL);
                        desc->bd_frag_ops->add_kiov_frag
                          (desc, local_nb[i].lnb_page,
-                          local_nb[i].lnb_page_offset,
+                          local_nb[i].lnb_page_offset & ~PAGE_MASK,
                           page_rc);
                }
 
                if (page_rc != local_nb[i].lnb_len) { /* short read */
                           page_rc);
                }
 
                if (page_rc != local_nb[i].lnb_len) { /* short read */
+                       local_nb[i].lnb_len = page_rc;
+                       npages_read = i + (page_rc != 0 ? 1 : 0);
                        /* All subsequent pages should be 0 */
                        while (++i < npages)
                                LASSERT(local_nb[i].lnb_rc == 0);
                        /* All subsequent pages should be 0 */
                        while (++i < npages)
                                LASSERT(local_nb[i].lnb_rc == 0);
@@ -1997,8 +2041,9 @@ int tgt_brw_read(struct tgt_session_info *tsi)
 
                repbody->oa.o_flags = cksum_type_pack(cksum_type);
                repbody->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
 
                repbody->oa.o_flags = cksum_type_pack(cksum_type);
                repbody->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
-               repbody->oa.o_cksum = tgt_checksum_bulk(tsi->tsi_tgt, desc,
-                                                       OST_READ, cksum_type);
+               repbody->oa.o_cksum = tgt_checksum_niobuf(tsi->tsi_tgt,
+                                                        local_nb, npages_read,
+                                                        OST_READ, cksum_type);
                CDEBUG(D_PAGE, "checksum at read origin: %x\n",
                       repbody->oa.o_cksum);
 
                CDEBUG(D_PAGE, "checksum at read origin: %x\n",
                       repbody->oa.o_cksum);
 
@@ -2007,7 +2052,8 @@ int tgt_brw_read(struct tgt_session_info *tsi)
                 * zero-cksum case) */
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) &&
                    (body->oa.o_flags & OBD_FL_RECOV_RESEND))
                 * zero-cksum case) */
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) &&
                    (body->oa.o_flags & OBD_FL_RECOV_RESEND))
-                       check_read_checksum(desc, &body->oa, &req->rq_peer,
+                       check_read_checksum(local_nb, npages_read, exp,
+                                           &body->oa, &req->rq_peer,
                                            body->oa.o_cksum,
                                            repbody->oa.o_cksum, cksum_type);
        } else {
                                            body->oa.o_cksum,
                                            repbody->oa.o_cksum, cksum_type);
        } else {
@@ -2017,11 +2063,31 @@ int tgt_brw_read(struct tgt_session_info *tsi)
 
        /* Check if client was evicted while we were doing i/o before touching
         * network */
 
        /* Check if client was evicted while we were doing i/o before touching
         * network */
-       if (likely(rc == 0 &&
-                  !CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2) &&
-                  !CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_DROP_BULK))) {
-               rc = target_bulk_io(exp, desc, &lwi);
+       if (rc == 0) {
+               if (body->oa.o_flags & OBD_FL_SHORT_IO) {
+                       unsigned char *short_io_buf;
+                       int short_io_size;
+
+                       short_io_buf = req_capsule_server_get(&req->rq_pill,
+                                                             &RMF_SHORT_IO);
+                       short_io_size = req_capsule_get_size(&req->rq_pill,
+                                                            &RMF_SHORT_IO,
+                                                            RCL_SERVER);
+                       rc = tgt_pages2shortio(local_nb, npages_read,
+                                              short_io_buf, short_io_size);
+                       if (rc >= 0)
+                               req_capsule_shrink(&req->rq_pill,
+                                                  &RMF_SHORT_IO, rc,
+                                                  RCL_SERVER);
+                       rc = rc > 0 ? 0 : rc;
+               } else if (!CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2)) {
+                       rc = target_bulk_io(exp, desc, &lwi);
+               }
                no_reply = rc != 0;
                no_reply = rc != 0;
+       } else {
+               if (body->oa.o_flags & OBD_FL_SHORT_IO)
+                       req_capsule_shrink(&req->rq_pill, &RMF_SHORT_IO, 0,
+                                          RCL_SERVER);
        }
 
 out_commitrw:
        }
 
 out_commitrw:
@@ -2049,8 +2115,10 @@ out_lock:
                              obd_export_nid2str(exp), rc);
        }
        /* send a bulk after reply to simulate a network delay or reordering
                              obd_export_nid2str(exp), rc);
        }
        /* send a bulk after reply to simulate a network delay or reordering
-        * by a router */
-       if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))) {
+        * by a router - Note that !desc implies short io, so there is no bulk
+        * to reorder. */
+       if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2)) &&
+           desc) {
                wait_queue_head_t        waitq;
                struct l_wait_info       lwi1;
 
                wait_queue_head_t        waitq;
                struct l_wait_info       lwi1;
 
@@ -2067,6 +2135,32 @@ out_lock:
 }
 EXPORT_SYMBOL(tgt_brw_read);
 
 }
 EXPORT_SYMBOL(tgt_brw_read);
 
+/**
+ * Unpack the short io request buffer into the local pages.
+ *
+ * The buffer is consumed sequentially; each page receives its data at the
+ * page's own in-page offset.  If the buffer holds fewer bytes than the pages
+ * describe, copying stops once the buffer is exhausted.
+ *
+ * \param[in] local   niobufs describing the pages to copy into
+ * \param[in] npages  number of entries in \a local
+ * \param[in] buf     short io buffer received from the client
+ * \param[in] size    number of valid bytes in \a buf
+ *
+ * \retval 0 on success
+ * \retval -EINVAL if a page could not be mapped
+ */
+static int tgt_shortio2pages(struct niobuf_local *local, int npages,
+                            unsigned char *buf, int size)
+{
+       int     i, off, len, count;
+       char    *ptr;
+
+       /* stop as soon as the buffer is drained: letting 'size' go negative
+        * would turn into a huge size_t length in memcpy() */
+       for (i = 0; i < npages && size > 0; i++) {
+               off = local[i].lnb_page_offset & ~PAGE_MASK;
+               len = local[i].lnb_len;
+
+               if (len == 0)
+                       continue;
+
+               CDEBUG(D_PAGE, "index %d offset = %d len = %d left = %d\n",
+                      i, off, len, size);
+               ptr = ll_kmap_atomic(local[i].lnb_page, KM_USER0);
+               if (ptr == NULL)
+                       return -EINVAL;
+               count = len < size ? len : size;
+               memcpy(ptr + off, buf, count);
+               ll_kunmap_atomic(ptr, KM_USER0);
+               /* advance by what was really copied so 'buf' never points
+                * beyond the received data */
+               buf += count;
+               size -= count;
+       }
+       return 0;
+}
+
 static void tgt_warn_on_cksum(struct ptlrpc_request *req,
                              struct ptlrpc_bulk_desc *desc,
                              struct niobuf_local *local_nb, int npages,
 static void tgt_warn_on_cksum(struct ptlrpc_request *req,
                              struct ptlrpc_bulk_desc *desc,
                              struct niobuf_local *local_nb, int npages,
@@ -2081,14 +2175,13 @@ static void tgt_warn_on_cksum(struct ptlrpc_request *req,
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body != NULL);
 
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body != NULL);
 
-       if (req->rq_peer.nid != desc->bd_sender) {
+       if (desc && req->rq_peer.nid != desc->bd_sender) {
                via = " via ";
                router = libcfs_nid2str(desc->bd_sender);
        }
 
        if (exp->exp_obd->obd_checksum_dump)
                via = " via ";
                router = libcfs_nid2str(desc->bd_sender);
        }
 
        if (exp->exp_obd->obd_checksum_dump)
-               dump_all_bulk_pages(&body->oa, desc->bd_iov_count,
-                                   &BD_GET_KIOV(desc, 0), server_cksum,
+               dump_all_bulk_pages(&body->oa, npages, local_nb, server_cksum,
                                    client_cksum);
 
        if (mmap) {
                                    client_cksum);
 
        if (mmap) {
@@ -2238,26 +2331,45 @@ int tgt_brw_write(struct tgt_session_info *tsi)
                        objcount, ioo, remote_nb, &npages, local_nb);
        if (rc < 0)
                GOTO(out_lock, rc);
                        objcount, ioo, remote_nb, &npages, local_nb);
        if (rc < 0)
                GOTO(out_lock, rc);
+       if (body->oa.o_flags & OBD_FL_SHORT_IO) {
+               int short_io_size;
+               unsigned char *short_io_buf;
+
+               short_io_size = req_capsule_get_size(&req->rq_pill,
+                                                    &RMF_SHORT_IO,
+                                                    RCL_CLIENT);
+               short_io_buf = req_capsule_client_get(&req->rq_pill,
+                                                     &RMF_SHORT_IO);
+               CDEBUG(D_INFO, "Client use short io for data transfer,"
+                              " size = %d\n", short_io_size);
+
+               /* Copy short io buf to pages */
+               rc = tgt_shortio2pages(local_nb, npages, short_io_buf,
+                                      short_io_size);
+               desc = NULL;
+       } else {
+               desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
+                                           PTLRPC_BULK_GET_SINK |
+                                           PTLRPC_BULK_BUF_KIOV,
+                                           OST_BULK_PORTAL,
+                                           &ptlrpc_bulk_kiov_nopin_ops);
+               if (desc == NULL)
+                       GOTO(skip_transfer, rc = -ENOMEM);
+
+               /* NB Having prepped, we must commit... */
+               for (i = 0; i < npages; i++)
+                       desc->bd_frag_ops->add_kiov_frag(desc,
+                                       local_nb[i].lnb_page,
+                                       local_nb[i].lnb_page_offset & ~PAGE_MASK,
+                                       local_nb[i].lnb_len);
+
+               rc = sptlrpc_svc_prep_bulk(req, desc);
+               if (rc != 0)
+                       GOTO(skip_transfer, rc);
 
 
-       desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
-                                   PTLRPC_BULK_GET_SINK | PTLRPC_BULK_BUF_KIOV,
-                                   OST_BULK_PORTAL,
-                                   &ptlrpc_bulk_kiov_nopin_ops);
-       if (desc == NULL)
-               GOTO(skip_transfer, rc = -ENOMEM);
-
-       /* NB Having prepped, we must commit... */
-       for (i = 0; i < npages; i++)
-               desc->bd_frag_ops->add_kiov_frag(desc,
-                                                local_nb[i].lnb_page,
-                                                local_nb[i].lnb_page_offset,
-                                                local_nb[i].lnb_len);
-
-       rc = sptlrpc_svc_prep_bulk(req, desc);
-       if (rc != 0)
-               GOTO(skip_transfer, rc);
+               rc = target_bulk_io(exp, desc, &lwi);
+       }
 
 
-       rc = target_bulk_io(exp, desc, &lwi);
        no_reply = rc != 0;
 
 skip_transfer:
        no_reply = rc != 0;
 
 skip_transfer:
@@ -2270,8 +2382,10 @@ skip_transfer:
                repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                repbody->oa.o_flags &= ~OBD_FL_CKSUM_ALL;
                repbody->oa.o_flags |= cksum_type_pack(cksum_type);
                repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                repbody->oa.o_flags &= ~OBD_FL_CKSUM_ALL;
                repbody->oa.o_flags |= cksum_type_pack(cksum_type);
-               repbody->oa.o_cksum = tgt_checksum_bulk(tsi->tsi_tgt, desc,
-                                                       OST_WRITE, cksum_type);
+               repbody->oa.o_cksum = tgt_checksum_niobuf(tsi->tsi_tgt,
+                                                         local_nb, npages,
+                                                         OST_WRITE,
+                                                         cksum_type);
                cksum_counter++;
 
                if (unlikely(body->oa.o_cksum != repbody->oa.o_cksum)) {
                cksum_counter++;
 
                if (unlikely(body->oa.o_cksum != repbody->oa.o_cksum)) {