Whamcloud - gitweb
LU-2513 osc: compute grant targets in bytes
[fs/lustre-release.git] / lustre / osc / osc_request.c
index c6cd621..64d5e3c 100644 (file)
@@ -69,97 +69,99 @@ int osc_cleanup(struct obd_device *obd);
 
 /* Pack OSC object metadata for disk storage (LE byte order). */
 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
-                      struct lov_stripe_md *lsm)
+                     struct lov_stripe_md *lsm)
 {
-        int lmm_size;
-        ENTRY;
+       int lmm_size;
+       ENTRY;
 
-        lmm_size = sizeof(**lmmp);
-        if (!lmmp)
-                RETURN(lmm_size);
+       lmm_size = sizeof(**lmmp);
+       if (lmmp == NULL)
+               RETURN(lmm_size);
 
-        if (*lmmp && !lsm) {
-                OBD_FREE(*lmmp, lmm_size);
-                *lmmp = NULL;
-                RETURN(0);
-        }
+       if (*lmmp != NULL && lsm == NULL) {
+               OBD_FREE(*lmmp, lmm_size);
+               *lmmp = NULL;
+               RETURN(0);
+       } else if (unlikely(lsm != NULL && lsm->lsm_object_id == 0)) {
+               RETURN(-EBADF);
+       }
 
-        if (!*lmmp) {
-                OBD_ALLOC(*lmmp, lmm_size);
-                if (!*lmmp)
-                        RETURN(-ENOMEM);
-        }
+       if (*lmmp == NULL) {
+               OBD_ALLOC(*lmmp, lmm_size);
+               if (*lmmp == NULL)
+                       RETURN(-ENOMEM);
+       }
 
-        if (lsm) {
-                LASSERT(lsm->lsm_object_id);
-                LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
-                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
-                (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
-        }
+       if (lsm != NULL) {
+               (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
+               (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
+       }
 
-        RETURN(lmm_size);
+       RETURN(lmm_size);
 }
 
 /* Unpack OSC object metadata from disk storage (LE byte order). */
 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
-                        struct lov_mds_md *lmm, int lmm_bytes)
+                       struct lov_mds_md *lmm, int lmm_bytes)
 {
-        int lsm_size;
-        struct obd_import *imp = class_exp2cliimp(exp);
-        ENTRY;
+       int lsm_size;
+       struct obd_import *imp = class_exp2cliimp(exp);
+       ENTRY;
 
-        if (lmm != NULL) {
-                if (lmm_bytes < sizeof (*lmm)) {
-                        CERROR("lov_mds_md too small: %d, need %d\n",
-                               lmm_bytes, (int)sizeof(*lmm));
-                        RETURN(-EINVAL);
-                }
-                /* XXX LOV_MAGIC etc check? */
+       if (lmm != NULL) {
+               if (lmm_bytes < sizeof(*lmm)) {
+                       CERROR("%s: lov_mds_md too small: %d, need %d\n",
+                              exp->exp_obd->obd_name, lmm_bytes,
+                              (int)sizeof(*lmm));
+                       RETURN(-EINVAL);
+               }
+               /* XXX LOV_MAGIC etc check? */
 
-                if (lmm->lmm_object_id == 0) {
-                        CERROR("lov_mds_md: zero lmm_object_id\n");
-                        RETURN(-EINVAL);
-                }
-        }
+               if (unlikely(lmm->lmm_object_id == 0)) {
+                       CERROR("%s: zero lmm_object_id\n",
+                              exp->exp_obd->obd_name);
+                       RETURN(-EINVAL);
+               }
+       }
 
-        lsm_size = lov_stripe_md_size(1);
-        if (lsmp == NULL)
-                RETURN(lsm_size);
+       lsm_size = lov_stripe_md_size(1);
+       if (lsmp == NULL)
+               RETURN(lsm_size);
 
-        if (*lsmp != NULL && lmm == NULL) {
-                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
-                OBD_FREE(*lsmp, lsm_size);
-                *lsmp = NULL;
-                RETURN(0);
-        }
+       if (*lsmp != NULL && lmm == NULL) {
+               OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
+               OBD_FREE(*lsmp, lsm_size);
+               *lsmp = NULL;
+               RETURN(0);
+       }
 
-        if (*lsmp == NULL) {
-                OBD_ALLOC(*lsmp, lsm_size);
-                if (*lsmp == NULL)
-                        RETURN(-ENOMEM);
-                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
-                if ((*lsmp)->lsm_oinfo[0] == NULL) {
-                        OBD_FREE(*lsmp, lsm_size);
-                        RETURN(-ENOMEM);
-                }
-                loi_init((*lsmp)->lsm_oinfo[0]);
-        }
+       if (*lsmp == NULL) {
+               OBD_ALLOC(*lsmp, lsm_size);
+               if (unlikely(*lsmp == NULL))
+                       RETURN(-ENOMEM);
+               OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
+               if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
+                       OBD_FREE(*lsmp, lsm_size);
+                       RETURN(-ENOMEM);
+               }
+               loi_init((*lsmp)->lsm_oinfo[0]);
+       } else if (unlikely((*lsmp)->lsm_object_id == 0)) {
+               RETURN(-EBADF);
+       }
 
-        if (lmm != NULL) {
-                /* XXX zero *lsmp? */
-                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
-                (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
-                LASSERT((*lsmp)->lsm_object_id);
-                LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
-        }
+       if (lmm != NULL) {
+               /* XXX zero *lsmp? */
+               (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
+               (*lsmp)->lsm_object_seq = le64_to_cpu(lmm->lmm_object_seq);
+       }
 
-        if (imp != NULL &&
-            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
-                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
-        else
-                (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
+       if (imp != NULL &&
+           (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
+               (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
+       else
+               (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
 
-        RETURN(lsm_size);
+       RETURN(lsm_size);
 }
 
 static inline void osc_pack_capa(struct ptlrpc_request *req,
@@ -216,9 +218,9 @@ static int osc_getattr_interpret(const struct lu_env *env,
                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
 
-                /* This should really be sent by the OST */
-                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
-                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
+               /* This should really be sent by the OST */
+               aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
+               aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
         } else {
                 CDEBUG(D_INFO, "can't unpack ost_body\n");
                 rc = -EPROTO;
@@ -295,9 +297,8 @@ static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
 
-        /* This should really be sent by the OST */
-        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
-        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
+       oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
+       oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
 
         EXIT;
  out:
@@ -478,9 +479,8 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa,
 
         lustre_get_wire_obdo(oa, &body->oa);
 
-        /* This should really be sent by the OST */
-        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
-        oa->o_valid |= OBD_MD_FLBLKSZ;
+       oa->o_blksize = cli_brw_size(exp->exp_obd);
+       oa->o_valid |= OBD_MD_FLBLKSZ;
 
         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
          * have valid lsm_oinfo data structs, so don't go touching that.
@@ -932,45 +932,45 @@ static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
  * needed, and avoids shrinking the grant piecemeal. */
 static int osc_shrink_grant(struct client_obd *cli)
 {
-        long target = (cli->cl_max_rpcs_in_flight + 1) *
-                      cli->cl_max_pages_per_rpc;
+       __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
+                            (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT);
 
-        client_obd_list_lock(&cli->cl_loi_list_lock);
-        if (cli->cl_avail_grant <= target)
-                target = cli->cl_max_pages_per_rpc;
-        client_obd_list_unlock(&cli->cl_loi_list_lock);
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       if (cli->cl_avail_grant <= target_bytes)
+               target_bytes = cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
 
-        return osc_shrink_grant_to_target(cli, target);
+       return osc_shrink_grant_to_target(cli, target_bytes);
 }
 
-int osc_shrink_grant_to_target(struct client_obd *cli, long target)
+int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
 {
-        int    rc = 0;
-        struct ost_body     *body;
-        ENTRY;
-
-        client_obd_list_lock(&cli->cl_loi_list_lock);
-        /* Don't shrink if we are already above or below the desired limit
-         * We don't want to shrink below a single RPC, as that will negatively
-         * impact block allocation and long-term performance. */
-        if (target < cli->cl_max_pages_per_rpc)
-                target = cli->cl_max_pages_per_rpc;
+       int                     rc = 0;
+       struct ost_body        *body;
+       ENTRY;
 
-        if (target >= cli->cl_avail_grant) {
-                client_obd_list_unlock(&cli->cl_loi_list_lock);
-                RETURN(0);
-        }
-        client_obd_list_unlock(&cli->cl_loi_list_lock);
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       /* Don't shrink if we are already above or below the desired limit
+        * We don't want to shrink below a single RPC, as that will negatively
+        * impact block allocation and long-term performance. */
+       if (target_bytes < cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)
+               target_bytes = cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
+
+       if (target_bytes >= cli->cl_avail_grant) {
+               client_obd_list_unlock(&cli->cl_loi_list_lock);
+               RETURN(0);
+       }
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
 
-        OBD_ALLOC_PTR(body);
-        if (!body)
-                RETURN(-ENOMEM);
+       OBD_ALLOC_PTR(body);
+       if (!body)
+               RETURN(-ENOMEM);
 
-        osc_announce_cached(cli, &body->oa, 0);
+       osc_announce_cached(cli, &body->oa, 0);
 
-        client_obd_list_lock(&cli->cl_loi_list_lock);
-        body->oa.o_grant = cli->cl_avail_grant - target;
-        cli->cl_avail_grant = target;
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       body->oa.o_grant = cli->cl_avail_grant - target_bytes;
+       cli->cl_avail_grant = target_bytes;
         client_obd_list_unlock(&cli->cl_loi_list_lock);
         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                 body->oa.o_valid |= OBD_MD_FLFLAGS;
@@ -988,7 +988,6 @@ int osc_shrink_grant_to_target(struct client_obd *cli, long target)
         RETURN(rc);
 }
 
-#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
 static int osc_should_shrink_grant(struct client_obd *client)
 {
         cfs_time_t time = cfs_time_current();
@@ -998,13 +997,18 @@ static int osc_should_shrink_grant(struct client_obd *client)
              OBD_CONNECT_GRANT_SHRINK) == 0)
                 return 0;
 
-        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
-                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
-                    client->cl_avail_grant > GRANT_SHRINK_LIMIT)
-                        return 1;
-                else
-                        osc_update_next_shrink(client);
-        }
+       if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
+               /* Get the current RPC size directly, instead of going via:
+                * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
+                * Keep comment here so that it can be found by searching. */
+               int brw_size = client->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
+
+               if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
+                   client->cl_avail_grant > brw_size)
+                       return 1;
+               else
+                       osc_update_next_shrink(client);
+       }
         return 0;
 }
 
@@ -1292,12 +1296,10 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
         * retry logic */
        req->rq_no_retry_einprogress = 1;
 
-        if (opc == OST_WRITE)
-                desc = ptlrpc_prep_bulk_imp(req, page_count,
-                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
-        else
-                desc = ptlrpc_prep_bulk_imp(req, page_count,
-                                            BULK_PUT_SINK, OST_BULK_PORTAL);
+       desc = ptlrpc_prep_bulk_imp(req, page_count,
+               cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
+               opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
+               OST_BULK_PORTAL);
 
         if (desc == NULL)
                 GOTO(out, rc = -ENOMEM);
@@ -1310,11 +1312,17 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
 
         lustre_set_wire_obdo(&body->oa, oa);
 
-        obdo_to_ioobj(oa, ioobj);
-        ioobj->ioo_bufcnt = niocount;
-        osc_pack_capa(req, body, ocapa);
-        LASSERT (page_count > 0);
-        pg_prev = pga[0];
+       obdo_to_ioobj(oa, ioobj);
+       ioobj->ioo_bufcnt = niocount;
+       /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
+        * that might be send for this request.  The actual number is decided
+        * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
+        * "max - 1" for old client compatibility sending "0", and also so the
+        * the actual maximum is a power-of-two number, not one less. LU-1431 */
+       ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
+       osc_pack_capa(req, body, ocapa);
+       LASSERT(page_count > 0);
+       pg_prev = pga[0];
         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                 struct brw_page *pg = pga[i];
                 int poff = pg->off & ~CFS_PAGE_MASK;
@@ -3257,7 +3265,7 @@ static int osc_reconnect(const struct lu_env *env,
 
                 client_obd_list_lock(&cli->cl_loi_list_lock);
                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
-                                2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
+                               2 * cli_brw_size(obd);
                 lost_grant = cli->cl_lost_grant;
                 cli->cl_lost_grant = 0;
                 client_obd_list_unlock(&cli->cl_loi_list_lock);