Whamcloud - gitweb
LU-1757 brw: add short io osc/ost transfer.
[fs/lustre-release.git] / lustre / osc / osc_request.c
index 4052b54..63ae9b0 100644 (file)
@@ -58,18 +58,6 @@ struct ptlrpc_request_pool *osc_rq_pool;
 static unsigned int osc_reqpool_mem_max = 5;
 module_param(osc_reqpool_mem_max, uint, 0444);
 
-struct osc_brw_async_args {
-       struct obdo              *aa_oa;
-       int                       aa_requested_nob;
-       int                       aa_nio_count;
-       u32                       aa_page_count;
-       int                       aa_resends;
-       struct brw_page **aa_ppga;
-       struct client_obd        *aa_cli;
-       struct list_head          aa_oaps;
-       struct list_head          aa_exts;
-};
-
 #define osc_grant_args osc_brw_async_args
 
 struct osc_setattr_args {
@@ -1025,8 +1013,8 @@ static int check_write_rcs(struct ptlrpc_request *req,
                         return(-EPROTO);
                 }
         }
-
-        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
+       if (req->rq_bulk != NULL &&
+           req->rq_bulk->bd_nob_transferred != requested_nob) {
                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                        req->rq_bulk->bd_nob_transferred, requested_nob);
                 return(-EPROTO);
@@ -1119,10 +1107,11 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         struct ost_body         *body;
         struct obd_ioobj        *ioobj;
         struct niobuf_remote    *niobuf;
-        int niocount, i, requested_nob, opc, rc;
+       int niocount, i, requested_nob, opc, rc, short_io_size;
         struct osc_brw_async_args *aa;
         struct req_capsule      *pill;
         struct brw_page *pg_prev;
+       void *short_io_buf;
 
         ENTRY;
         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
@@ -1153,6 +1142,20 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                              niocount * sizeof(*niobuf));
 
+       for (i = 0; i < page_count; i++)
+               short_io_size += pga[i]->count;
+
+       /* Check if we can do a short io. */
+       if (!(short_io_size <= cli->cl_short_io_bytes && niocount == 1 &&
+           imp_connect_shortio(cli->cl_import)))
+               short_io_size = 0;
+
+       req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
+                            opc == OST_READ ? 0 : short_io_size);
+       if (opc == OST_READ)
+               req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
+                                    short_io_size);
+
         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
         if (rc) {
                 ptlrpc_request_free(req);
@@ -1160,10 +1163,17 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         }
         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
         ptlrpc_at_set_req_timeout(req);
+
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;
 
+       if (short_io_size != 0) {
+               desc = NULL;
+               short_io_buf = NULL;
+               goto no_bulk;
+       }
+
        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
@@ -1175,7 +1185,7 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         if (desc == NULL)
                 GOTO(out, rc = -ENOMEM);
         /* NB request now owns desc and will free it when it gets freed */
-
+no_bulk:
         body = req_capsule_client_get(pill, &RMF_OST_BODY);
         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
@@ -1190,7 +1200,26 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
-       ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
+       if (desc != NULL)
+               ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
+       else /* short io */
+               ioobj_max_brw_set(ioobj, 0);
+
+       if (short_io_size != 0) {
+               if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
+                       body->oa.o_valid |= OBD_MD_FLFLAGS;
+                       body->oa.o_flags = 0;
+               }
+               body->oa.o_flags |= OBD_FL_SHORT_IO;
+               CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
+                      short_io_size);
+               if (opc == OST_WRITE) {
+                       short_io_buf = req_capsule_client_get(pill,
+                                                             &RMF_SHORT_IO);
+                       LASSERT(short_io_buf != NULL);
+               }
+       }
+
        LASSERT(page_count > 0);
        pg_prev = pga[0];
         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
@@ -1215,9 +1244,19 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                          pg_prev->pg->index, pg_prev->off);
                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                         (pg->flag & OBD_BRW_SRVLOCK));
-
-               desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
-                requested_nob += pg->count;
+               if (short_io_size != 0 && opc == OST_WRITE) {
+                       unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
+
+                       LASSERT(short_io_size >= requested_nob + pg->count);
+                       memcpy(short_io_buf + requested_nob,
+                              ptr + poff,
+                              pg->count);
+                       ll_kunmap_atomic(ptr, KM_USER0);
+               } else if (short_io_size == 0) {
+                       desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
+                                                        pg->count);
+               }
+               requested_nob += pg->count;
 
                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
                         niobuf--;
@@ -1486,9 +1525,9 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                         CERROR("Unexpected +ve rc %d\n", rc);
                         RETURN(-EPROTO);
                 }
-                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
 
-                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
+               if (req->rq_bulk != NULL &&
+                   sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                         RETURN(-EAGAIN);
 
                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
@@ -1503,8 +1542,14 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
 
         /* The rest of this function executes only for OST_READs */
 
-        /* if unwrap_bulk failed, return -EAGAIN to retry */
-        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
+       if (req->rq_bulk == NULL) {
+               rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
+                                         RCL_SERVER);
+               LASSERT(rc == req->rq_status);
+       } else {
+               /* if unwrap_bulk failed, return -EAGAIN to retry */
+               rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
+       }
         if (rc < 0)
                 GOTO(out, rc = -EAGAIN);
 
@@ -1514,12 +1559,41 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                 RETURN(-EPROTO);
         }
 
-        if (rc != req->rq_bulk->bd_nob_transferred) {
+       if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
                 CERROR ("Unexpected rc %d (%d transferred)\n",
                         rc, req->rq_bulk->bd_nob_transferred);
                 return (-EPROTO);
         }
 
+       if (req->rq_bulk == NULL) {
+               /* short io */
+               int nob, pg_count, i = 0;
+               unsigned char *buf;
+
+               CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
+               pg_count = aa->aa_page_count;
+               buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
+                                                  rc);
+               nob = rc;
+               while (nob > 0 && pg_count > 0) {
+                       unsigned char *ptr;
+                       int count = aa->aa_ppga[i]->count > nob ?
+                                   nob : aa->aa_ppga[i]->count;
+
+                       CDEBUG(D_CACHE, "page %p count %d\n",
+                              aa->aa_ppga[i]->pg, count);
+                       ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
+                       memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
+                              count);
+                       ll_kunmap_atomic((void *) ptr, KM_USER0);
+
+                       buf += count;
+                       nob -= count;
+                       i++;
+                       pg_count--;
+               }
+       }
+
         if (rc < aa->aa_requested_nob)
                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
 
@@ -1536,7 +1610,8 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                                                  aa->aa_ppga, OST_READ,
                                                  cksum_type);
 
-               if (peer->nid != req->rq_bulk->bd_sender) {
+               if (req->rq_bulk != NULL &&
+                   peer->nid != req->rq_bulk->bd_sender) {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }
@@ -1710,6 +1785,7 @@ static int brw_interpret(const struct lu_env *env,
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;
+       unsigned long           transferred = 0;
         ENTRY;
 
         rc = osc_brw_fini_request(req, rc);
@@ -1802,8 +1878,12 @@ static int brw_interpret(const struct lu_env *env,
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));
 
+       transferred = (req->rq_bulk == NULL ? /* short io */
+                      aa->aa_requested_nob :
+                      req->rq_bulk->bd_nob_transferred);
+
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
-       ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
+       ptlrpc_lprocfs_brw(req, transferred);
 
        spin_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters