Whamcloud - gitweb
LU-3285 merge: 'dom' branch merging
[fs/lustre-release.git] / lustre / osc / osc_request.c
index e4c6a04..5c6438c 100644 (file)
@@ -58,18 +58,6 @@ struct ptlrpc_request_pool *osc_rq_pool;
 static unsigned int osc_reqpool_mem_max = 5;
 module_param(osc_reqpool_mem_max, uint, 0444);
 
-struct osc_brw_async_args {
-       struct obdo              *aa_oa;
-       int                       aa_requested_nob;
-       int                       aa_nio_count;
-       u32                       aa_page_count;
-       int                       aa_resends;
-       struct brw_page **aa_ppga;
-       struct client_obd        *aa_cli;
-       struct list_head          aa_oaps;
-       struct list_head          aa_exts;
-};
-
 #define osc_grant_args osc_brw_async_args
 
 struct osc_setattr_args {
@@ -711,10 +699,11 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
 
 void osc_update_next_shrink(struct client_obd *cli)
 {
-        cli->cl_next_shrink_grant =
-                cfs_time_shift(cli->cl_grant_shrink_interval);
-        CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
-               cli->cl_next_shrink_grant);
+       cli->cl_next_shrink_grant = ktime_get_seconds() +
+                                   cli->cl_grant_shrink_interval;
+
+       CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
+              cli->cl_next_shrink_grant);
 }
 
 static void __osc_update_grant(struct client_obd *cli, u64 grant)
@@ -831,14 +820,13 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
 
 static int osc_should_shrink_grant(struct client_obd *client)
 {
-        cfs_time_t time = cfs_time_current();
-        cfs_time_t next_shrink = client->cl_next_shrink_grant;
+       time64_t next_shrink = client->cl_next_shrink_grant;
 
         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
              OBD_CONNECT_GRANT_SHRINK) == 0)
                 return 0;
 
-       if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
+       if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
@@ -1011,8 +999,8 @@ static int check_write_rcs(struct ptlrpc_request *req,
                         return(-EPROTO);
                 }
         }
-
-        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
+       if (req->rq_bulk != NULL &&
+           req->rq_bulk->bd_nob_transferred != requested_nob) {
                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                        req->rq_bulk->bd_nob_transferred, requested_nob);
                 return(-EPROTO);
@@ -1105,10 +1093,11 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         struct ost_body         *body;
         struct obd_ioobj        *ioobj;
         struct niobuf_remote    *niobuf;
-        int niocount, i, requested_nob, opc, rc;
+       int niocount, i, requested_nob, opc, rc, short_io_size = 0;
         struct osc_brw_async_args *aa;
         struct req_capsule      *pill;
         struct brw_page *pg_prev;
+       void *short_io_buf;
 
         ENTRY;
         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
@@ -1139,6 +1128,20 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                              niocount * sizeof(*niobuf));
 
+       for (i = 0; i < page_count; i++)
+               short_io_size += pga[i]->count;
+
+       /* Check if we can do a short io. */
+       if (!(short_io_size <= cli->cl_short_io_bytes && niocount == 1 &&
+           imp_connect_shortio(cli->cl_import)))
+               short_io_size = 0;
+
+       req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
+                            opc == OST_READ ? 0 : short_io_size);
+       if (opc == OST_READ)
+               req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
+                                    short_io_size);
+
         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
         if (rc) {
                 ptlrpc_request_free(req);
@@ -1151,6 +1154,12 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         * retry logic */
        req->rq_no_retry_einprogress = 1;
 
+       if (short_io_size != 0) {
+               desc = NULL;
+               short_io_buf = NULL;
+               goto no_bulk;
+       }
+
        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
@@ -1162,7 +1171,7 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         if (desc == NULL)
                 GOTO(out, rc = -ENOMEM);
         /* NB request now owns desc and will free it when it gets freed */
-
+no_bulk:
         body = req_capsule_client_get(pill, &RMF_OST_BODY);
         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
@@ -1177,7 +1186,26 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
-       ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
+       if (desc != NULL)
+               ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
+       else /* short io */
+               ioobj_max_brw_set(ioobj, 0);
+
+       if (short_io_size != 0) {
+               if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
+                       body->oa.o_valid |= OBD_MD_FLFLAGS;
+                       body->oa.o_flags = 0;
+               }
+               body->oa.o_flags |= OBD_FL_SHORT_IO;
+               CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
+                      short_io_size);
+               if (opc == OST_WRITE) {
+                       short_io_buf = req_capsule_client_get(pill,
+                                                             &RMF_SHORT_IO);
+                       LASSERT(short_io_buf != NULL);
+               }
+       }
+
        LASSERT(page_count > 0);
        pg_prev = pga[0];
         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
@@ -1202,9 +1230,19 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                          pg_prev->pg->index, pg_prev->off);
                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                         (pg->flag & OBD_BRW_SRVLOCK));
-
-               desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
-                requested_nob += pg->count;
+               if (short_io_size != 0 && opc == OST_WRITE) {
+                       unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
+
+                       LASSERT(short_io_size >= requested_nob + pg->count);
+                       memcpy(short_io_buf + requested_nob,
+                              ptr + poff,
+                              pg->count);
+                       ll_kunmap_atomic(ptr, KM_USER0);
+               } else if (short_io_size == 0) {
+                       desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
+                                                        pg->count);
+               }
+               requested_nob += pg->count;
 
                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
                         niobuf--;
@@ -1279,17 +1317,17 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                 * resent due to cksum error, this will allow Server to
                 * check+dump pages on its side */
        }
-        ptlrpc_request_set_replen(req);
+       ptlrpc_request_set_replen(req);
 
-        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-        aa = ptlrpc_req_async_args(req);
-        aa->aa_oa = oa;
-        aa->aa_requested_nob = requested_nob;
-        aa->aa_nio_count = niocount;
-        aa->aa_page_count = page_count;
-        aa->aa_resends = 0;
-        aa->aa_ppga = pga;
-        aa->aa_cli = cli;
+       CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+       aa = ptlrpc_req_async_args(req);
+       aa->aa_oa = oa;
+       aa->aa_requested_nob = requested_nob;
+       aa->aa_nio_count = niocount;
+       aa->aa_page_count = page_count;
+       aa->aa_resends = 0;
+       aa->aa_ppga = pga;
+       aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);
 
        *reqp = req;
@@ -1473,9 +1511,9 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                         CERROR("Unexpected +ve rc %d\n", rc);
                         RETURN(-EPROTO);
                 }
-                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
 
-                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
+               if (req->rq_bulk != NULL &&
+                   sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                         RETURN(-EAGAIN);
 
                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
@@ -1490,8 +1528,14 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
 
         /* The rest of this function executes only for OST_READs */
 
-        /* if unwrap_bulk failed, return -EAGAIN to retry */
-        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
+       if (req->rq_bulk == NULL) {
+               rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
+                                         RCL_SERVER);
+               LASSERT(rc == req->rq_status);
+       } else {
+               /* if unwrap_bulk failed, return -EAGAIN to retry */
+               rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
+       }
         if (rc < 0)
                 GOTO(out, rc = -EAGAIN);
 
@@ -1501,12 +1545,41 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                 RETURN(-EPROTO);
         }
 
-        if (rc != req->rq_bulk->bd_nob_transferred) {
+       if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
                 CERROR ("Unexpected rc %d (%d transferred)\n",
                         rc, req->rq_bulk->bd_nob_transferred);
                 return (-EPROTO);
         }
 
+       if (req->rq_bulk == NULL) {
+               /* short io */
+               int nob, pg_count, i = 0;
+               unsigned char *buf;
+
+               CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
+               pg_count = aa->aa_page_count;
+               buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
+                                                  rc);
+               nob = rc;
+               while (nob > 0 && pg_count > 0) {
+                       unsigned char *ptr;
+                       int count = aa->aa_ppga[i]->count > nob ?
+                                   nob : aa->aa_ppga[i]->count;
+
+                       CDEBUG(D_CACHE, "page %p count %d\n",
+                              aa->aa_ppga[i]->pg, count);
+                       ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
+                       memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
+                              count);
+                       ll_kunmap_atomic((void *) ptr, KM_USER0);
+
+                       buf += count;
+                       nob -= count;
+                       i++;
+                       pg_count--;
+               }
+       }
+
         if (rc < aa->aa_requested_nob)
                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
 
@@ -1523,7 +1596,8 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                                                  aa->aa_ppga, OST_READ,
                                                  cksum_type);
 
-               if (peer->nid != req->rq_bulk->bd_sender) {
+               if (req->rq_bulk != NULL &&
+                   peer->nid != req->rq_bulk->bd_sender) {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }
@@ -1697,6 +1771,7 @@ static int brw_interpret(const struct lu_env *env,
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;
+       unsigned long           transferred = 0;
         ENTRY;
 
         rc = osc_brw_fini_request(req, rc);
@@ -1789,8 +1864,12 @@ static int brw_interpret(const struct lu_env *env,
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));
 
+       transferred = (req->rq_bulk == NULL ? /* short io */
+                      aa->aa_requested_nob :
+                      req->rq_bulk->bd_nob_transferred);
+
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
-       ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
+       ptlrpc_lprocfs_brw(req, transferred);
 
        spin_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
@@ -2391,13 +2470,13 @@ static int osc_statfs_async(struct obd_export *exp,
                 req->rq_no_delay = 1;
         }
 
-        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
-        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
-        aa = ptlrpc_req_async_args(req);
-        aa->aa_oi = oinfo;
+       req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
+       CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+       aa = ptlrpc_req_async_args(req);
+       aa->aa_oi = oinfo;
 
-        ptlrpc_set_add_req(rqset, req);
-        RETURN(0);
+       ptlrpc_set_add_req(rqset, req);
+       RETURN(0);
 }
 
 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
@@ -2590,23 +2669,23 @@ int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
-        memcpy(tmp, val, vallen);
+       memcpy(tmp, val, vallen);
 
        if (KEY_IS(KEY_GRANT_SHRINK)) {
-                struct osc_grant_args *aa;
-                struct obdo *oa;
-
-                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-                aa = ptlrpc_req_async_args(req);
-                OBDO_ALLOC(oa);
-                if (!oa) {
-                        ptlrpc_req_finished(req);
-                        RETURN(-ENOMEM);
-                }
-                *oa = ((struct ost_body *)val)->oa;
-                aa->aa_oa = oa;
-                req->rq_interpret_reply = osc_shrink_grant_interpret;
-        }
+               struct osc_grant_args *aa;
+               struct obdo *oa;
+
+               CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+               aa = ptlrpc_req_async_args(req);
+               OBDO_ALLOC(oa);
+               if (!oa) {
+                       ptlrpc_req_finished(req);
+                       RETURN(-ENOMEM);
+               }
+               *oa = ((struct ost_body *)val)->oa;
+               aa->aa_oa = oa;
+               req->rq_interpret_reply = osc_shrink_grant_interpret;
+       }
 
        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {