From b91e8a108cffb78db983d3be0238fef7043eb4b4 Mon Sep 17 00:00:00 2001 From: adilger Date: Wed, 16 Oct 2002 20:13:39 +0000 Subject: [PATCH] - helper lov_stripe_size() calculates apparent file size from each stripe - helper lov_merge_attrs() uses lov_stripe_size() and also merges other attrs - lov_getattr() and lov_open() use lov_merge_attrs() to return size+blocks - fix lov_stripe_offset() to work properly for truncate up operations - don't allocate obdos on the stack if we can avoid it - lov_setattr() is not being used - add LBUG() to ensure we notice when/if it does start being used and do a code audit for correctness --- lustre/lov/lov_obd.c | 208 +++++++++++++++++++++++++++++++++------------------ 1 file changed, 135 insertions(+), 73 deletions(-) diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 946afc0..09f842c 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -220,29 +220,33 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_obd *lov; struct lov_stripe_md *lsm; struct lov_oinfo *loi; + struct obdo *tmp; int sub_offset, stripe_offset; int ost_count; int rc = 0, i; ENTRY; - if (!ea) { - CERROR("lov_create needs ea\n"); - RETURN(-EINVAL); - } + LASSERT(ea); if (!export) RETURN(-EINVAL); + tmp = obdo_alloc(); + if (!tmp) + RETURN(-ENOMEM); + + lov = &export->exp_obd->u.lov; ost_count = lov->desc.ld_tgt_count; oa->o_easize = lov_stripe_md_size(ost_count); - if (!*ea) { - OBD_ALLOC(*ea, oa->o_easize); - if (!*ea) - RETURN(-ENOMEM); - } lsm = *ea; + if (!lsm) { + OBD_ALLOC(lsm, oa->o_easize); + if (!lsm) + GOTO(out_tmp, rc = -ENOMEM); + } + LASSERT(oa->o_valid & OBD_MD_FLID); lsm->lsm_magic = LOV_MAGIC; lsm->lsm_mds_easize = lov_mds_md_size(ost_count); @@ -266,46 +270,48 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa, for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { struct lov_stripe_md obj_md; struct lov_stripe_md *obj_mdp = &obj_md; - struct obdo tmp; int ost_idx = (((sub_offset + i) % lsm->lsm_stripe_count) + stripe_offset) % ost_count; /* create data objects with "parent" OA */ - memcpy(&tmp, oa, sizeof(tmp)); - tmp.o_easize = sizeof(struct lov_stripe_md); - rc = obd_create(&lov->tgts[ost_idx].conn, &tmp, &obj_mdp); + memcpy(tmp, oa, sizeof(*tmp)); + tmp->o_easize = sizeof(struct lov_stripe_md); + rc = obd_create(&lov->tgts[ost_idx].conn, tmp, &obj_mdp); if (rc) { CERROR("error creating objid "LPX64" sub-object on " "OST idx %d: rc = %d\n", oa->o_id, ost_idx, rc); GOTO(out_cleanup, rc); } - loi->loi_id = tmp.o_id; - loi->loi_size = tmp.o_size; + loi->loi_id = tmp->o_id; + loi->loi_size = tmp->o_size; loi->loi_ost_idx = ost_idx; CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64" at idx %d\n", lsm->lsm_object_id, loi->loi_id, ost_idx); } + *ea = lsm; + + out_tmp: + obdo_free(tmp); + return rc; + out_cleanup: - if (rc) { - while (i-- > 0) { - struct obdo tmp; - int err; - - --loi; - /* destroy already created objects here */ - memcpy(&tmp, oa, sizeof(tmp)); - tmp.o_id = loi->loi_id; - err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, - &tmp, NULL); - if (err) - CERROR("Failed to remove objid "LPX64" subobj " - LPX64" on OST idx %d: rc = %d\n", - oa->o_id, loi->loi_id, loi->loi_ost_idx, - err); - } + while (i-- > 0) { + int err; + + --loi; + /* destroy already created objects here */ + memcpy(tmp, oa, sizeof(*tmp)); + tmp->o_id = loi->loi_id; + err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL); + if (err) + CERROR("Failed to uncreate objid "LPX64" subobj " + LPX64" on OST idx %d: rc = %d\n", + oa->o_id, loi->loi_id, loi->loi_ost_idx, + err); } - return rc; + OBD_FREE(lsm, oa->o_easize); + goto out_tmp; } static int lov_destroy(struct lustre_handle *conn, struct obdo *oa, @@ -346,6 +352,55 @@ static int lov_destroy(struct lustre_handle *conn, struct obdo *oa, RETURN(rc); } +/* compute object size given "stripeno" and the ost size */ +static obd_size lov_stripe_size(struct lov_stripe_md *lsm, obd_size ost_size, + int stripeno) +{ + unsigned long ssize = lsm->lsm_stripe_size; + unsigned long swidth = ssize * lsm->lsm_stripe_count; + unsigned long stripe_size; + obd_size lov_size; + + if (ost_size == 0) + return 0; + + /* do_div(a, b) returns a % b, and a = a / b */ + stripe_size = do_div(ost_size, ssize); + + if (stripe_size) + lov_size = ost_size * swidth + stripeno * ssize + stripe_size; + else + lov_size = (ost_size - 1) * swidth + (stripeno + 1) * ssize; + + return lov_size; +} + +static void lov_merge_attrs(struct obdo *tgt, struct obdo *src, obd_flag valid, + struct lov_stripe_md *lsm, int stripeno, int *new) +{ + if (*new) { + obdo_cpy_md(tgt, src, valid); + if (valid & OBD_MD_FLSIZE) + tgt->o_size = lov_stripe_size(lsm,src->o_size,stripeno); + *new = 0; + } else { + if (valid & OBD_MD_FLSIZE) { + /* this handles sparse files properly */ + obd_size lov_size; + + lov_size = lov_stripe_size(lsm, src->o_size, stripeno); + if (lov_size > tgt->o_size) + tgt->o_size = lov_size; + } + if (valid & OBD_MD_FLBLOCKS) + tgt->o_blocks += src->o_blocks; + if (valid & OBD_MD_FLCTIME && tgt->o_ctime < src->o_ctime) + tgt->o_ctime = src->o_ctime; + if (valid & OBD_MD_FLMTIME && tgt->o_mtime < src->o_mtime) + tgt->o_mtime = src->o_mtime; + } +} + static int lov_getattr(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md *lsm) { @@ -354,7 +409,7 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa, struct lov_obd *lov; struct lov_oinfo *loi; int rc = 0, i; - int set = 0; + int new = 1; ENTRY; if (!lsm) { @@ -393,22 +448,7 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa, rc = err; continue; /* XXX or break? */ } - if (!set) { - obdo_cpy_md(oa, &tmp, tmp.o_valid); - set = 1; - } else { -#warning FIXME: the size needs to be fixed for sparse files - if (tmp.o_valid & OBD_MD_FLSIZE) - oa->o_size += tmp.o_size; - if (tmp.o_valid & OBD_MD_FLBLOCKS) - oa->o_blocks += tmp.o_blocks; - if (tmp.o_valid & OBD_MD_FLCTIME && - oa->o_ctime < tmp.o_ctime) - oa->o_ctime = tmp.o_ctime; - if (tmp.o_valid & OBD_MD_FLMTIME && - oa->o_mtime < tmp.o_mtime) - oa->o_mtime = tmp.o_mtime; - } + lov_merge_attrs(oa, &tmp, tmp.o_valid, lsm, i, &new); } RETURN(rc); } @@ -416,13 +456,18 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa, static int lov_setattr(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md *lsm) { - int rc = 0, i; struct obdo tmp; struct obd_export *export = class_conn2export(conn); struct lov_obd *lov; struct lov_oinfo *loi; + int rc = 0, i; ENTRY; + /* Note that this code is currently unused, hence LBUG(), just + * to know when/if it is ever revived that it needs cleanups. + */ + LBUG(); + if (!lsm) { CERROR("LOV requires striping ea\n"); RETURN(-EINVAL); @@ -437,8 +482,8 @@ static int lov_setattr(struct lustre_handle *conn, struct obdo *oa, if (!export || !export->exp_obd) RETURN(-ENODEV); - if (oa->o_valid & OBD_MD_FLSIZE) - CERROR("setting size on an LOV object is totally broken\n"); + /* size changes should go through punch and not setattr */ + LASSERT(!(oa->o_valid & OBD_MD_FLSIZE)); lov = &export->exp_obd->u.lov; for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { @@ -463,10 +508,11 @@ static int lov_setattr(struct lustre_handle *conn, struct obdo *oa, static int lov_open(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md *lsm) { - struct obdo tmp; + struct obdo *tmp; struct obd_export *export = class_conn2export(conn); struct lov_obd *lov; struct lov_oinfo *loi; + int new = 1; int rc = 0, i; ENTRY; @@ -484,15 +530,21 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa, if (!export || !export->exp_obd) RETURN(-ENODEV); + tmp = obdo_alloc(); + if (!tmp) + RETURN(-ENOMEM); + lov = &export->exp_obd->u.lov; + oa->o_size = 0; + oa->o_blocks = 0; for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { int err; /* create data objects with "parent" OA */ - memcpy(&tmp, oa, sizeof(tmp)); - tmp.o_id = loi->loi_id; + memcpy(tmp, oa, sizeof(*tmp)); + tmp->o_id = loi->loi_id; - err = obd_open(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL); + err = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL); if (err) { CERROR("Error open objid "LPX64" subobj "LPX64 " on OST idx %d: rc = %d\n", @@ -501,14 +553,17 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa, if (!rc) rc = err; } + + lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &new); } /* FIXME: returning an error, but having opened some objects is a bad * idea, since they will likely never be closed. We either * need to not return an error if _some_ objects could be * opened, and leave it to read/write to return -EIO (with * hopefully partial error status) or close all opened objects - * and return an error. + * and return an error. I think the former is preferred. */ + obdo_free(tmp); RETURN(rc); } @@ -563,28 +618,35 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa, #warning FIXME: merge these two functions now that they are nearly the same /* compute ost offset in stripe "stripeno" corresponding to offset "lov_off" */ -static __u64 lov_offset(struct lov_stripe_md *lsm, __u64 lov_off, int stripeno) +static obd_off lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off, + int stripeno) { unsigned long ssize = lsm->lsm_stripe_size; unsigned long swidth = ssize * lsm->lsm_stripe_count; - unsigned long stripe_off; + unsigned long stripe_off, this_stripe; - if (lov_off == OBD_OBJECT_EOF) - return OBD_OBJECT_EOF; + if (lov_off == OBD_OBJECT_EOF || lov_off == 0) + return lov_off; /* do_div(a, b) returns a % b, and a = a / b */ stripe_off = do_div(lov_off, swidth); - if (stripe_off < stripeno * ssize) + this_stripe = stripeno * ssize; + if (stripe_off <= this_stripe) stripe_off = 0; - else - stripe_off -= stripeno * ssize; + else { + stripe_off -= this_stripe; + + if (stripe_off > ssize) + stripe_off = ssize; + } + return lov_off * ssize + stripe_off; } -/* compute which stripe offset "lov_off" will be written into */ -static int lov_stripe_which(struct lov_stripe_md *lsm, __u64 lov_off) +/* compute which stripe number "lov_off" will be written into */ +static int lov_stripe_number(struct lov_stripe_md *lsm, obd_off lov_off) { unsigned long ssize = lsm->lsm_stripe_size; unsigned long swidth = ssize * lsm->lsm_stripe_count; @@ -626,8 +688,8 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa, lov = &export->exp_obd->u.lov; for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { - __u64 starti = lov_offset(lsm, start, i); - __u64 endi = lov_offset(lsm, end, i); + obd_off starti = lov_stripe_offset(lsm, start, i); + obd_off endi = lov_stripe_offset(lsm, end, i); int err; if (starti == endi) @@ -727,7 +789,7 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn, our_cb->data = cbd; for (i = 0; i < oa_bufs; i++) { - where[i] = lov_stripe_which(lsm, pga[i].off); + where[i] = lov_stripe_number(lsm, pga[i].off); if (stripeinfo[where[i]].bufct++ == 0) atomic_inc(&our_cb->refcount); } @@ -747,7 +809,7 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn, shift = stripeinfo[which].index + stripeinfo[which].subcount; LASSERT(shift < oa_bufs); ioarr[shift] = pga[i]; - ioarr[shift].off = lov_offset(lsm, pga[i].off, which); + ioarr[shift].off = lov_stripe_offset(lsm, pga[i].off, which); stripeinfo[which].subcount++; } @@ -807,8 +869,8 @@ static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm, struct ldlm_extent sub_ext; struct lov_stripe_md submd; - sub_ext.start = lov_offset(lsm, extent->start, i); - sub_ext.end = lov_offset(lsm, extent->end, i); + sub_ext.start = lov_stripe_offset(lsm, extent->start, i); + sub_ext.end = lov_stripe_offset(lsm, extent->end, i); if (sub_ext.start == sub_ext.end) continue; -- 1.8.3.1