Whamcloud - gitweb
- helper lov_stripe_size() calculates apparent file size from each stripe
authoradilger <adilger>
Wed, 16 Oct 2002 20:13:39 +0000 (20:13 +0000)
committeradilger <adilger>
Wed, 16 Oct 2002 20:13:39 +0000 (20:13 +0000)
- helper lov_merge_attrs() uses lov_stripe_size() and also merges other attrs
- lov_getattr() and lov_open() use lov_merge_attrs() to return size+blocks
- fix lov_stripe_offset() to work properly for truncate up operations
- don't allocate obdos on the stack if we can avoid it
- lov_setattr() is not being used - add LBUG() to ensure we notice when/if
  it does start being used and do a code audit for correctness

lustre/lov/lov_obd.c

index 946afc0..09f842c 100644 (file)
@@ -220,29 +220,33 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa,
         struct lov_obd *lov;
         struct lov_stripe_md *lsm;
         struct lov_oinfo *loi;
+        struct obdo *tmp;
         int sub_offset, stripe_offset;
         int ost_count;
         int rc = 0, i;
         ENTRY;
 
-        if (!ea) {
-                CERROR("lov_create needs ea\n");
-                RETURN(-EINVAL);
-        }
+        LASSERT(ea);
 
         if (!export)
                 RETURN(-EINVAL);
 
+        tmp = obdo_alloc();
+        if (!tmp)
+                RETURN(-ENOMEM);
+
+
         lov = &export->exp_obd->u.lov;
         ost_count = lov->desc.ld_tgt_count;
         oa->o_easize = lov_stripe_md_size(ost_count);
-        if (!*ea) {
-                OBD_ALLOC(*ea, oa->o_easize);
-                if (!*ea)
-                        RETURN(-ENOMEM);
-        }
 
         lsm = *ea;
+        if (!lsm) {
+                OBD_ALLOC(lsm, oa->o_easize);
+                if (!lsm)
+                        GOTO(out_tmp, rc = -ENOMEM);
+        }
+
         LASSERT(oa->o_valid & OBD_MD_FLID);
         lsm->lsm_magic = LOV_MAGIC;
         lsm->lsm_mds_easize = lov_mds_md_size(ost_count);
@@ -266,46 +270,48 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa,
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
                 struct lov_stripe_md obj_md;
                 struct lov_stripe_md *obj_mdp = &obj_md;
-                struct obdo tmp;
                 int ost_idx = (((sub_offset + i) % lsm->lsm_stripe_count) +
                                stripe_offset) % ost_count;
 
                 /* create data objects with "parent" OA */
-                memcpy(&tmp, oa, sizeof(tmp));
-                tmp.o_easize = sizeof(struct lov_stripe_md);
-                rc = obd_create(&lov->tgts[ost_idx].conn, &tmp, &obj_mdp);
+                memcpy(tmp, oa, sizeof(*tmp));
+                tmp->o_easize = sizeof(struct lov_stripe_md);
+                rc = obd_create(&lov->tgts[ost_idx].conn, tmp, &obj_mdp);
                 if (rc) {
                         CERROR("error creating objid "LPX64" sub-object on "
                                "OST idx %d: rc = %d\n", oa->o_id, ost_idx, rc);
                         GOTO(out_cleanup, rc);
                 }
-                loi->loi_id = tmp.o_id;
-                loi->loi_size = tmp.o_size;
+                loi->loi_id = tmp->o_id;
+                loi->loi_size = tmp->o_size;
                 loi->loi_ost_idx = ost_idx;
                 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64" at idx %d\n",
                        lsm->lsm_object_id, loi->loi_id, ost_idx);
         }
 
+        *ea = lsm;
+
+ out_tmp:
+        obdo_free(tmp);
+        return rc;
+
  out_cleanup:
-        if (rc) {
-                while (i-- > 0) {
-                        struct obdo tmp;
-                        int err;
-
-                        --loi;
-                        /* destroy already created objects here */
-                        memcpy(&tmp, oa, sizeof(tmp));
-                        tmp.o_id = loi->loi_id;
-                        err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn,
-                                          &tmp, NULL);
-                        if (err)
-                                CERROR("Failed to remove objid "LPX64" subobj "
-                                       LPX64" on OST idx %d: rc = %d\n",
-                                       oa->o_id, loi->loi_id, loi->loi_ost_idx,
-                                       err);
-                }
+        while (i-- > 0) {
+                int err;
+
+                --loi;
+                /* destroy already created objects here */
+                memcpy(tmp, oa, sizeof(*tmp));
+                tmp->o_id = loi->loi_id;
+                err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
+                if (err)
+                        CERROR("Failed to uncreate objid "LPX64" subobj "
+                               LPX64" on OST idx %d: rc = %d\n",
+                               oa->o_id, loi->loi_id, loi->loi_ost_idx,
+                               err);
         }
-        return rc;
+        OBD_FREE(lsm, oa->o_easize);
+        goto out_tmp;
 }
 
 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
@@ -346,6 +352,55 @@ static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
         RETURN(rc);
 }
 
+/* compute object size given "stripeno" and the ost size */
+static obd_size lov_stripe_size(struct lov_stripe_md *lsm, obd_size ost_size,
+                                int stripeno)
+{
+        unsigned long ssize  = lsm->lsm_stripe_size;
+        unsigned long swidth = ssize * lsm->lsm_stripe_count;
+        unsigned long stripe_size;
+        obd_size lov_size;
+
+        if (ost_size == 0)
+                return 0;
+
+        /* do_div(a, b) returns a % b, and a = a / b */
+        stripe_size = do_div(ost_size, ssize);
+
+        if (stripe_size)
+                lov_size = ost_size * swidth + stripeno * ssize + stripe_size;
+        else
+                lov_size = (ost_size - 1) * swidth + (stripeno + 1) * ssize;
+
+        return lov_size;
+}
+
+static void lov_merge_attrs(struct obdo *tgt, struct obdo *src, obd_flag valid,
+                            struct lov_stripe_md *lsm, int stripeno, int *new)
+{
+        if (*new) {
+                obdo_cpy_md(tgt, src, valid);
+                if (valid & OBD_MD_FLSIZE)
+                        tgt->o_size = lov_stripe_size(lsm,src->o_size,stripeno);
+                *new = 0;
+        } else {
+                if (valid & OBD_MD_FLSIZE) {
+                        /* this handles sparse files properly */
+                        obd_size lov_size;
+
+                        lov_size = lov_stripe_size(lsm, src->o_size, stripeno);
+                        if (lov_size > tgt->o_size)
+                                tgt->o_size = lov_size;
+                }
+                if (valid & OBD_MD_FLBLOCKS)
+                        tgt->o_blocks += src->o_blocks;
+                if (valid & OBD_MD_FLCTIME && tgt->o_ctime < src->o_ctime)
+                        tgt->o_ctime = src->o_ctime;
+                if (valid & OBD_MD_FLMTIME && tgt->o_mtime < src->o_mtime)
+                        tgt->o_mtime = src->o_mtime;
+        }
+}
+
 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
                        struct lov_stripe_md *lsm)
 {
@@ -354,7 +409,7 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
         struct lov_obd *lov;
         struct lov_oinfo *loi;
         int rc = 0, i;
-        int set = 0;
+        int new = 1;
         ENTRY;
 
         if (!lsm) {
@@ -393,22 +448,7 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
                                 rc = err;
                         continue; /* XXX or break? */
                 }
-                if (!set) {
-                        obdo_cpy_md(oa, &tmp, tmp.o_valid);
-                        set = 1;
-                } else {
-#warning FIXME: the size needs to be fixed for sparse files
-                        if (tmp.o_valid & OBD_MD_FLSIZE)
-                                oa->o_size += tmp.o_size;
-                        if (tmp.o_valid & OBD_MD_FLBLOCKS)
-                                oa->o_blocks += tmp.o_blocks;
-                        if (tmp.o_valid & OBD_MD_FLCTIME &&
-                            oa->o_ctime < tmp.o_ctime)
-                                oa->o_ctime = tmp.o_ctime;
-                        if (tmp.o_valid & OBD_MD_FLMTIME &&
-                            oa->o_mtime < tmp.o_mtime)
-                                oa->o_mtime = tmp.o_mtime;
-                }
+                lov_merge_attrs(oa, &tmp, tmp.o_valid, lsm, i, &new);
         }
         RETURN(rc);
 }
@@ -416,13 +456,18 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
                        struct lov_stripe_md *lsm)
 {
-        int rc = 0, i;
         struct obdo tmp;
         struct obd_export *export = class_conn2export(conn);
         struct lov_obd *lov;
         struct lov_oinfo *loi;
+        int rc = 0, i;
         ENTRY;
 
+        /* Note that this code is currently unused, hence LBUG(), just
+         * to know when/if it is ever revived that it needs cleanups.
+         */
+        LBUG();
+
         if (!lsm) {
                 CERROR("LOV requires striping ea\n");
                 RETURN(-EINVAL);
@@ -437,8 +482,8 @@ static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
         if (!export || !export->exp_obd)
                 RETURN(-ENODEV);
 
-        if (oa->o_valid & OBD_MD_FLSIZE)
-                CERROR("setting size on an LOV object is totally broken\n");
+        /* size changes should go through punch and not setattr */
+        LASSERT(!(oa->o_valid & OBD_MD_FLSIZE));
 
         lov = &export->exp_obd->u.lov;
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
@@ -463,10 +508,11 @@ static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
 static int lov_open(struct lustre_handle *conn, struct obdo *oa,
                     struct lov_stripe_md *lsm)
 {
-        struct obdo tmp;
+        struct obdo *tmp;
         struct obd_export *export = class_conn2export(conn);
         struct lov_obd *lov;
         struct lov_oinfo *loi;
+        int new = 1;
         int rc = 0, i;
         ENTRY;
 
@@ -484,15 +530,21 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa,
         if (!export || !export->exp_obd)
                 RETURN(-ENODEV);
 
+        tmp = obdo_alloc();
+        if (!tmp)
+                RETURN(-ENOMEM);
+
         lov = &export->exp_obd->u.lov;
+        oa->o_size = 0;
+        oa->o_blocks = 0;
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
                 int err;
 
                 /* create data objects with "parent" OA */
-                memcpy(&tmp, oa, sizeof(tmp));
-                tmp.o_id = loi->loi_id;
+                memcpy(tmp, oa, sizeof(*tmp));
+                tmp->o_id = loi->loi_id;
 
-                err = obd_open(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL);
+                err = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
                 if (err) {
                         CERROR("Error open objid "LPX64" subobj "LPX64
                                " on OST idx %d: rc = %d\n",
@@ -501,14 +553,17 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa,
                         if (!rc)
                                 rc = err;
                 }
+
+                lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &new);
         }
         /* FIXME: returning an error, but having opened some objects is a bad
          *        idea, since they will likely never be closed.  We either
          *        need to not return an error if _some_ objects could be
          *        opened, and leave it to read/write to return -EIO (with
          *        hopefully partial error status) or close all opened objects
-         *        and return an error.
+         *        and return an error.  I think the former is preferred.
          */
+        obdo_free(tmp);
         RETURN(rc);
 }
 
@@ -563,28 +618,35 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa,
 #warning FIXME: merge these two functions now that they are nearly the same
 
 /* compute ost offset in stripe "stripeno" corresponding to offset "lov_off" */
-static __u64 lov_offset(struct lov_stripe_md *lsm, __u64 lov_off, int stripeno)
+static obd_off lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off,
+                                 int stripeno)
 {
         unsigned long ssize  = lsm->lsm_stripe_size;
         unsigned long swidth = ssize * lsm->lsm_stripe_count;
-        unsigned long stripe_off;
+        unsigned long stripe_off, this_stripe;
 
-        if (lov_off == OBD_OBJECT_EOF)
-                return OBD_OBJECT_EOF;
+        if (lov_off == OBD_OBJECT_EOF || lov_off == 0)
+                return lov_off;
 
         /* do_div(a, b) returns a % b, and a = a / b */
         stripe_off = do_div(lov_off, swidth);
 
-        if (stripe_off < stripeno * ssize)
+        this_stripe = stripeno * ssize;
+        if (stripe_off <= this_stripe)
                 stripe_off = 0;
-        else
-                stripe_off -= stripeno * ssize;
+        else {
+                stripe_off -= this_stripe;
+
+                if (stripe_off > ssize)
+                        stripe_off = ssize;
+        }
+
 
         return lov_off * ssize + stripe_off;
 }
 
-/* compute which stripe offset "lov_off" will be written into */
-static int lov_stripe_which(struct lov_stripe_md *lsm, __u64 lov_off)
+/* compute which stripe number "lov_off" will be written into */
+static int lov_stripe_number(struct lov_stripe_md *lsm, obd_off lov_off)
 {
         unsigned long ssize  = lsm->lsm_stripe_size;
         unsigned long swidth = ssize * lsm->lsm_stripe_count;
@@ -626,8 +688,8 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
 
         lov = &export->exp_obd->u.lov;
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
-                __u64 starti = lov_offset(lsm, start, i);
-                __u64 endi = lov_offset(lsm, end, i);
+                obd_off starti = lov_stripe_offset(lsm, start, i);
+                obd_off endi = lov_stripe_offset(lsm, end, i);
                 int err;
 
                 if (starti == endi)
@@ -727,7 +789,7 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn,
         our_cb->data = cbd;
 
         for (i = 0; i < oa_bufs; i++) {
-                where[i] = lov_stripe_which(lsm, pga[i].off);
+                where[i] = lov_stripe_number(lsm, pga[i].off);
                 if (stripeinfo[where[i]].bufct++ == 0)
                         atomic_inc(&our_cb->refcount);
         }
@@ -747,7 +809,7 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn,
                 shift = stripeinfo[which].index + stripeinfo[which].subcount;
                 LASSERT(shift < oa_bufs);
                 ioarr[shift] = pga[i];
-                ioarr[shift].off = lov_offset(lsm, pga[i].off, which);
+                ioarr[shift].off = lov_stripe_offset(lsm, pga[i].off, which);
                 stripeinfo[which].subcount++;
         }
 
@@ -807,8 +869,8 @@ static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                 struct ldlm_extent sub_ext;
                 struct lov_stripe_md submd;
 
-                sub_ext.start = lov_offset(lsm, extent->start, i);
-                sub_ext.end = lov_offset(lsm, extent->end, i);
+                sub_ext.start = lov_stripe_offset(lsm, extent->start, i);
+                sub_ext.end = lov_stripe_offset(lsm, extent->end, i);
                 if (sub_ext.start == sub_ext.end)
                         continue;