Whamcloud - gitweb
LU-7890 lov: Ensure correct operation for large object sizes
[fs/lustre-release.git] / lustre / lov / lov_ea.c
index 2e2357c..e39acc3 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2014, Intel Corporation.
+ * Copyright (c) 2011, 2015, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -79,18 +79,20 @@ static int lsm_lmm_verify_common(struct lov_mds_md *lmm, int lmm_bytes,
        return 0;
 }
 
-struct lov_stripe_md *lsm_alloc_plain(__u16 stripe_count, int *size)
+struct lov_stripe_md *lsm_alloc_plain(u16 stripe_count)
 {
        struct lov_stripe_md *lsm;
        struct lov_oinfo     *loi;
-       int                   i, oinfo_ptrs_size;
+       size_t lsm_size;
+       size_t oinfo_ptrs_size;
+       int i;
 
        LASSERT(stripe_count <= LOV_MAX_STRIPE_COUNT);
 
        oinfo_ptrs_size = sizeof(struct lov_oinfo *) * stripe_count;
-       *size = sizeof(struct lov_stripe_md) + oinfo_ptrs_size;
+       lsm_size = sizeof(*lsm) + oinfo_ptrs_size;
 
-       OBD_ALLOC_LARGE(lsm, *size);
+       OBD_ALLOC_LARGE(lsm, lsm_size);
        if (!lsm)
                return NULL;
 
@@ -106,25 +108,60 @@ struct lov_stripe_md *lsm_alloc_plain(__u16 stripe_count, int *size)
 err:
        while (--i >= 0)
                OBD_SLAB_FREE(lsm->lsm_oinfo[i], lov_oinfo_slab, sizeof(*loi));
-       OBD_FREE_LARGE(lsm, *size);
+
+       OBD_FREE_LARGE(lsm, lsm_size);
+
        return NULL;
 }
 
 void lsm_free_plain(struct lov_stripe_md *lsm)
 {
-        __u16 stripe_count = lsm->lsm_stripe_count;
-        int i;
-
-        for (i = 0; i < stripe_count; i++)
-                OBD_SLAB_FREE(lsm->lsm_oinfo[i], lov_oinfo_slab,
-                              sizeof(struct lov_oinfo));
-        OBD_FREE_LARGE(lsm, sizeof(struct lov_stripe_md) +
-                       stripe_count * sizeof(struct lov_oinfo *));
+       __u16 stripe_count = lsm->lsm_stripe_count;
+       int i;
+
+       for (i = 0; i < stripe_count; i++)
+               OBD_SLAB_FREE(lsm->lsm_oinfo[i], lov_oinfo_slab,
+                             sizeof(struct lov_oinfo));
+       OBD_FREE_LARGE(lsm, sizeof(struct lov_stripe_md) +
+                      stripe_count * sizeof(struct lov_oinfo *));
+}
+
+/* Find minimum stripe maxbytes value.  For inactive or
+ * reconnecting targets use LUSTRE_EXT3_STRIPE_MAXBYTES. */
+static loff_t lov_tgt_maxbytes(struct lov_tgt_desc *tgt)
+{
+       struct obd_import *imp;
+       loff_t maxbytes = LUSTRE_EXT3_STRIPE_MAXBYTES;
+
+       if (!tgt->ltd_active)
+               return maxbytes;
+
+       imp = tgt->ltd_obd->u.cli.cl_import;
+       if (imp == NULL)
+               return maxbytes;
+
+       spin_lock(&imp->imp_lock);
+       if (imp->imp_state == LUSTRE_IMP_FULL &&
+           (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES) &&
+           imp->imp_connect_data.ocd_maxbytes > 0)
+               maxbytes = imp->imp_connect_data.ocd_maxbytes;
+
+       spin_unlock(&imp->imp_lock);
+
+       return maxbytes;
 }
 
-static void lsm_unpackmd_common(struct lov_stripe_md *lsm,
-                                struct lov_mds_md *lmm)
+static int lsm_unpackmd_common(struct lov_obd *lov,
+                              struct lov_stripe_md *lsm,
+                              struct lov_mds_md *lmm,
+                              struct lov_ost_data_v1 *objects)
 {
+       struct lov_oinfo *loi;
+       loff_t min_stripe_maxbytes = 0;
+       loff_t lov_bytes;
+       unsigned int stripe_count;
+       unsigned int i;
+
        /*
         * This supposes lov_mds_md_v1/v3 first fields are
         * are the same
@@ -134,6 +171,50 @@ static void lsm_unpackmd_common(struct lov_stripe_md *lsm,
        lsm->lsm_pattern = le32_to_cpu(lmm->lmm_pattern);
        lsm->lsm_layout_gen = le16_to_cpu(lmm->lmm_layout_gen);
        lsm->lsm_pool_name[0] = '\0';
+
+       stripe_count = lsm_is_released(lsm) ? 0 : lsm->lsm_stripe_count;
+
+       for (i = 0; i < stripe_count; i++) {
+               loi = lsm->lsm_oinfo[i];
+               ostid_le_to_cpu(&objects[i].l_ost_oi, &loi->loi_oi);
+               loi->loi_ost_idx = le32_to_cpu(objects[i].l_ost_idx);
+               loi->loi_ost_gen = le32_to_cpu(objects[i].l_ost_gen);
+               if (lov_oinfo_is_dummy(loi))
+                       continue;
+
+               if (loi->loi_ost_idx >= lov->desc.ld_tgt_count &&
+                   !lov2obd(lov)->obd_process_conf) {
+                       CERROR("%s: OST index %d more than OST count %d\n",
+                              (char*)lov->desc.ld_uuid.uuid,
+                              loi->loi_ost_idx, lov->desc.ld_tgt_count);
+                       lov_dump_lmm_v1(D_WARNING, lmm);
+                       return -EINVAL;
+               }
+
+               if (lov->lov_tgts[loi->loi_ost_idx] == NULL) {
+                       CERROR("%s: OST index %d missing\n",
+                              (char*)lov->desc.ld_uuid.uuid, loi->loi_ost_idx);
+                       lov_dump_lmm_v1(D_WARNING, lmm);
+                       continue;
+               }
+
+               lov_bytes = lov_tgt_maxbytes(lov->lov_tgts[loi->loi_ost_idx]);
+               if (min_stripe_maxbytes == 0 || lov_bytes < min_stripe_maxbytes)
+                       min_stripe_maxbytes = lov_bytes;
+       }
+
+       if (min_stripe_maxbytes == 0)
+               min_stripe_maxbytes = LUSTRE_EXT3_STRIPE_MAXBYTES;
+
+       stripe_count = lsm->lsm_stripe_count ?: lov->desc.ld_tgt_count;
+       lov_bytes = min_stripe_maxbytes * stripe_count;
+
+       if (lov_bytes < min_stripe_maxbytes) /* handle overflow */
+               lsm->lsm_maxbytes = MAX_LFS_FILESIZE;
+       else
+               lsm->lsm_maxbytes = lov_bytes;
+
+       return 0;
 }
 
 static void
@@ -152,29 +233,6 @@ lsm_stripe_by_offset_plain(struct lov_stripe_md *lsm, int *stripeno,
                *swidth = (loff_t)lsm->lsm_stripe_size * lsm->lsm_stripe_count;
 }
 
-/* Find minimum stripe maxbytes value.  For inactive or
- * reconnecting targets use LUSTRE_EXT3_STRIPE_MAXBYTES. */
-static void lov_tgt_maxbytes(struct lov_tgt_desc *tgt, __u64 *stripe_maxbytes)
-{
-        struct obd_import *imp = tgt->ltd_obd->u.cli.cl_import;
-
-        if (imp == NULL || !tgt->ltd_active) {
-               *stripe_maxbytes = LUSTRE_EXT3_STRIPE_MAXBYTES;
-                return;
-        }
-
-       spin_lock(&imp->imp_lock);
-       if (imp->imp_state == LUSTRE_IMP_FULL &&
-           (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES) &&
-           imp->imp_connect_data.ocd_maxbytes > 0) {
-               if (*stripe_maxbytes > imp->imp_connect_data.ocd_maxbytes)
-                       *stripe_maxbytes = imp->imp_connect_data.ocd_maxbytes;
-       } else {
-               *stripe_maxbytes = LUSTRE_EXT3_STRIPE_MAXBYTES;
-       }
-       spin_unlock(&imp->imp_lock);
-}
-
 static int lsm_lmm_verify_v1(struct lov_mds_md_v1 *lmm, int lmm_bytes,
                              __u16 *stripe_count)
 {
@@ -201,45 +259,7 @@ static int lsm_lmm_verify_v1(struct lov_mds_md_v1 *lmm, int lmm_bytes,
 static int lsm_unpackmd_v1(struct lov_obd *lov, struct lov_stripe_md *lsm,
                           struct lov_mds_md_v1 *lmm)
 {
-        struct lov_oinfo *loi;
-        int i;
-       int stripe_count;
-        __u64 stripe_maxbytes = OBD_OBJECT_EOF;
-
-        lsm_unpackmd_common(lsm, lmm);
-
-       stripe_count = lsm_is_released(lsm) ? 0 : lsm->lsm_stripe_count;
-
-       for (i = 0; i < stripe_count; i++) {
-               /* XXX LOV STACKING call down to osc_unpackmd() */
-               loi = lsm->lsm_oinfo[i];
-               ostid_le_to_cpu(&lmm->lmm_objects[i].l_ost_oi, &loi->loi_oi);
-               loi->loi_ost_idx = le32_to_cpu(lmm->lmm_objects[i].l_ost_idx);
-               loi->loi_ost_gen = le32_to_cpu(lmm->lmm_objects[i].l_ost_gen);
-               if (lov_oinfo_is_dummy(loi))
-                       continue;
-
-               if (loi->loi_ost_idx >= lov->desc.ld_tgt_count) {
-                        CERROR("OST index %d more than OST count %d\n",
-                               loi->loi_ost_idx, lov->desc.ld_tgt_count);
-                        lov_dump_lmm_v1(D_WARNING, lmm);
-                        return -EINVAL;
-                }
-                if (!lov->lov_tgts[loi->loi_ost_idx]) {
-                        CERROR("OST index %d missing\n", loi->loi_ost_idx);
-                        lov_dump_lmm_v1(D_WARNING, lmm);
-                        return -EINVAL;
-                }
-                /* calculate the minimum stripe max bytes */
-                lov_tgt_maxbytes(lov->lov_tgts[loi->loi_ost_idx],
-                                 &stripe_maxbytes);
-        }
-
-       lsm->lsm_maxbytes = stripe_maxbytes * lsm->lsm_stripe_count;
-       if (lsm->lsm_stripe_count == 0)
-               lsm->lsm_maxbytes = stripe_maxbytes * lov->desc.ld_tgt_count;
-
-       return 0;
+       return lsm_unpackmd_common(lov, lsm, lmm, lmm->lmm_objects);
 }
 
 const struct lsm_operations lsm_v1_ops = {
@@ -279,55 +299,21 @@ static int lsm_lmm_verify_v3(struct lov_mds_md *lmmv1, int lmm_bytes,
 }
 
 static int lsm_unpackmd_v3(struct lov_obd *lov, struct lov_stripe_md *lsm,
-                          struct lov_mds_md *lmmv1)
+                          struct lov_mds_md *lmm)
 {
-        struct lov_mds_md_v3 *lmm;
-        struct lov_oinfo *loi;
-        int i;
-       int stripe_count;
-        __u64 stripe_maxbytes = OBD_OBJECT_EOF;
-       int cplen = 0;
+       struct lov_mds_md_v3 *lmm_v3 = (struct lov_mds_md_v3 *)lmm;
+       size_t cplen;
+       int rc;
 
-        lmm = (struct lov_mds_md_v3 *)lmmv1;
+       rc = lsm_unpackmd_common(lov, lsm, lmm, lmm_v3->lmm_objects);
+       if (rc != 0)
+               return rc;
 
-        lsm_unpackmd_common(lsm, (struct lov_mds_md_v1 *)lmm);
-
-       stripe_count = lsm_is_released(lsm) ? 0 : lsm->lsm_stripe_count;
-
-       cplen = strlcpy(lsm->lsm_pool_name, lmm->lmm_pool_name,
+       cplen = strlcpy(lsm->lsm_pool_name, lmm_v3->lmm_pool_name,
                        sizeof(lsm->lsm_pool_name));
        if (cplen >= sizeof(lsm->lsm_pool_name))
                return -E2BIG;
 
-        for (i = 0; i < stripe_count; i++) {
-               /* XXX LOV STACKING call down to osc_unpackmd() */
-               loi = lsm->lsm_oinfo[i];
-               ostid_le_to_cpu(&lmm->lmm_objects[i].l_ost_oi, &loi->loi_oi);
-               loi->loi_ost_idx = le32_to_cpu(lmm->lmm_objects[i].l_ost_idx);
-               loi->loi_ost_gen = le32_to_cpu(lmm->lmm_objects[i].l_ost_gen);
-               if (lov_oinfo_is_dummy(loi))
-                       continue;
-
-                if (loi->loi_ost_idx >= lov->desc.ld_tgt_count) {
-                        CERROR("OST index %d more than OST count %d\n",
-                               loi->loi_ost_idx, lov->desc.ld_tgt_count);
-                        lov_dump_lmm_v3(D_WARNING, lmm);
-                        return -EINVAL;
-                }
-                if (!lov->lov_tgts[loi->loi_ost_idx]) {
-                        CERROR("OST index %d missing\n", loi->loi_ost_idx);
-                        lov_dump_lmm_v3(D_WARNING, lmm);
-                        return -EINVAL;
-                }
-                /* calculate the minimum stripe max bytes */
-                lov_tgt_maxbytes(lov->lov_tgts[loi->loi_ost_idx],
-                                 &stripe_maxbytes);
-        }
-
-       lsm->lsm_maxbytes = stripe_maxbytes * lsm->lsm_stripe_count;
-       if (lsm->lsm_stripe_count == 0)
-               lsm->lsm_maxbytes = stripe_maxbytes * lov->desc.ld_tgt_count;
-
        return 0;
 }