From 118d37e4b981234db7fc35bdf10b6adaca34cc8f Mon Sep 17 00:00:00 2001 From: pschwan Date: Thu, 5 Sep 2002 07:05:41 +0000 Subject: [PATCH] - Fixed some error messages in the LOV - lov_brw was using the same callback structure for two sets of callbacks, thus overloading the refcount and causing serious problems (sometimes failing to free the callback data, and I could believe sometimes freeing it too soon). Alas, this does not appear to completely fix the iozone bugs we're seeing. --- lustre/include/linux/lustre_lib.h | 3 +- lustre/llite/rw.c | 27 ++++++------ lustre/lov/lov_obd.c | 93 ++++++++++++++++++++------------------- 3 files changed, 63 insertions(+), 60 deletions(-) diff --git a/lustre/include/linux/lustre_lib.h b/lustre/include/linux/lustre_lib.h index a166ee9..72f4c5c 100644 --- a/lustre/include/linux/lustre_lib.h +++ b/lustre/include/linux/lustre_lib.h @@ -81,7 +81,8 @@ struct io_cb_data { int complete; int err; struct ptlrpc_bulk_desc *desc; - brw_callback_t cb; + brw_callback_t cb; + void *data; }; int ll_sync_io_cb(struct io_cb_data *data, int err, int phase); diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index 3e1d53f..bbdfad3 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -162,13 +162,13 @@ static int ll_commit_write(struct file *file, struct page *page, int err; loff_t size; struct io_cb_data *cbd = ll_init_cb(); + ENTRY; pg.pg = page; pg.count = to; pg.off = (((obd_off)page->index) << PAGE_SHIFT); pg.flag = create ? OBD_BRW_CREATE : 0; - ENTRY; if (!cbd) RETURN(-ENOMEM); @@ -240,26 +240,31 @@ void ll_truncate(struct inode *inode) int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf, unsigned long blocknr, int blocksize) { - obd_count bufs_per_obdo = iobuf->nr_pages; + obd_count bufs_per_obdo = iobuf->nr_pages; struct ll_inode_info *lli = ll_i2info(inode); struct lov_stripe_md *md = lli->lli_smd; struct brw_page *pga; - int rc = 0; - int i; - struct io_cb_data *cbd = ll_init_cb(); + int i, rc = 0; + struct io_cb_data *cbd; ENTRY; - if (!cbd) + if (!md || !md->lmd_object_id) RETURN(-ENOMEM); if (blocksize != PAGE_SIZE) { CERROR("direct_IO blocksize != PAGE_SIZE\n"); - return -EINVAL; + RETURN(-EINVAL); } + cbd = ll_init_cb(); + if (!cbd) + RETURN(-ENOMEM); + OBD_ALLOC(pga, sizeof(*pga) * bufs_per_obdo); - if (!pga) - GOTO(out, rc = -ENOMEM); + if (!pga) { + OBD_FREE(cbd, sizeof(*cbd)); + RETURN(-ENOMEM); + } /* NB: we can't use iobuf->maplist[i]->index for the offset * instead of "blocknr" because ->index contains garbage. @@ -273,16 +278,12 @@ int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf, pga[i].flag = OBD_BRW_CREATE; } - if (!md || !md->lmd_object_id) - GOTO(out, rc = -ENOMEM); - rc = obd_brw(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ, ll_i2obdconn(inode), md, bufs_per_obdo, pga, ll_sync_io_cb, cbd); if (rc == 0) rc = bufs_per_obdo * PAGE_SIZE; -out: OBD_FREE(pga, sizeof(*pga) * bufs_per_obdo); RETURN(rc); } diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 2fc0400..bbd8bd1 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -199,7 +199,8 @@ static inline int lov_mds_md_size(struct obd_device *obd) } /* the LOV counts on oa->o_id to be set as the LOV object id */ -static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md **ea) +static int lov_create(struct lustre_handle *conn, struct obdo *oa, + struct lov_stripe_md **ea) { int rc = 0, i; struct obdo tmp; @@ -216,7 +217,7 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_st RETURN(-EINVAL); lov = &export->exp_obd->u.lov; - oa->o_easize = lov_stripe_md_size(export->exp_obd); + oa->o_easize = lov_stripe_md_size(export->exp_obd); if (!*ea) { OBD_ALLOC(*ea, oa->o_easize); if (! *ea) @@ -226,9 +227,8 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_st md = *ea; md->lmd_easize = lov_mds_md_size(export->exp_obd); md->lmd_object_id = oa->o_id; - if (!md->lmd_stripe_count) { + if (!md->lmd_stripe_count) md->lmd_stripe_count = lov->desc.ld_default_stripe_count; - } if (!md->lmd_stripe_size) md->lmd_stripe_size = lov->desc.ld_default_stripe_size; @@ -251,14 +251,13 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_st out_cleanup: if (rc) { int i2, rc2; - for (i2 = 0 ; i2 < i ; i2++) { + for (i2 = 0; i2 < i; i2++) { /* destroy already created objects here */ tmp.o_id = md->lmd_oinfo[i].loi_id; rc2 = obd_destroy(&lov->tgts[i].conn, &tmp, NULL); - if (rc2) { - CERROR("Failed to remove object from target %d\n", - i2); - } + if (rc2) + CERROR("Failed to remove object from target " + "%d\n", i2); } } return rc; @@ -274,7 +273,7 @@ static int lov_destroy(struct lustre_handle *conn, struct obdo *oa, ENTRY; if (!md) { - CERROR("LOV requires striping ea for desctruction\n"); + CERROR("LOV requires striping ea for destruction\n"); RETURN(-EINVAL); } @@ -287,10 +286,9 @@ static int lov_destroy(struct lustre_handle *conn, struct obdo *oa, memcpy(&tmp, oa, sizeof(tmp)); tmp.o_id = md->lmd_oinfo[i].loi_id; rc = obd_destroy(&lov->tgts[i].conn, &tmp, NULL); - if (!rc) { + if (rc) CERROR("Error destroying object %Ld on %d\n", - tmp.o_id, i); - } + md->lmd_oinfo[i].loi_id, i); } RETURN(rc); } @@ -329,7 +327,7 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa, err = obd_getattr(&lov->tgts[i].conn, &tmp, NULL); if (err) { CERROR("Error getattr object %Ld on %d: err = %d\n", - tmp.o_id, i, err); + md->lmd_oinfo[i].loi_id, i, err); if (!rc) rc = err; continue; /* XXX or break? */ @@ -377,10 +375,9 @@ static int lov_setattr(struct lustre_handle *conn, struct obdo *oa, tmp.o_id = md->lmd_oinfo[i].loi_id; rc = obd_setattr(&lov->tgts[i].conn, &tmp, NULL); - if (!rc) { + if (rc) CERROR("Error setattr object %Ld on %d\n", tmp.o_id, i); - } } RETURN(rc); } @@ -412,13 +409,12 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa, if (rc) { rc2 = rc; CERROR("Error open object %Ld on %d\n", - tmp.o_id, i); + md->lmd_oinfo[i].loi_id, i); } } RETURN(rc2); } - static int lov_close(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md *md) { @@ -443,10 +439,9 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa, tmp.o_id = md->lmd_oinfo[i].loi_id; rc = obd_close(&lov->tgts[i].conn, &tmp, NULL); - if (rc) { + if (rc) CERROR("Error close object %Ld on %d\n", - tmp.o_id, i); - } + md->lmd_oinfo[i].loi_id, i); } RETURN(rc); } @@ -495,7 +490,7 @@ int lov_stripe_which(struct lov_stripe_md *md, __u64 in) { __u32 ssz = md->lmd_stripe_size; int j; - j = (((__u32) in)/ssz) % md->lmd_stripe_count; + j = (((__u32) in) / ssz) % md->lmd_stripe_count; return j; } @@ -534,15 +529,14 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa, rc = obd_punch(&lov->tgts[i].conn, &tmp, NULL, starti, endi); - if (!rc) { + if (rc) CERROR("Error punch object %Ld on %d\n", - tmp.o_id, i); - } + md->lmd_oinfo[i].loi_id, i); } RETURN(rc); } -int lov_osc_brw_callback(struct io_cb_data *cbd, int err, int phase) +static int lov_osc_brw_callback(struct io_cb_data *cbd, int err, int phase) { int ret = 0; ENTRY; @@ -554,7 +548,7 @@ int lov_osc_brw_callback(struct io_cb_data *cbd, int err, int phase) if (err) cbd->err = err; if (atomic_dec_and_test(&cbd->refcount)) - ret = cbd->cb(cbd, cbd->err, phase); + ret = cbd->cb(cbd->data, cbd->err, phase); RETURN(ret); } @@ -579,21 +573,24 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn, } *stripeinfo; struct brw_page *ioarr; int rc, i; + struct io_cb_data *our_cb; ENTRY; lov = &export->exp_obd->u.lov; + our_cb = ll_init_cb(); + if (!our_cb) + RETURN(-ENOMEM); + OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo)); if (!stripeinfo) - RETURN(-ENOMEM); + GOTO(out_cbdata, rc = -ENOMEM); OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs); - if (!ioarr) { - OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo)); - RETURN(-ENOMEM); - } + if (!ioarr) + GOTO(out_sinfo, rc = -ENOMEM); - for (i = 0; i < oa_bufs; i++ ) { + for (i = 0; i < oa_bufs; i++) { int which; which = lov_stripe_which(md, pga[i].off); stripeinfo[which].bufct++; @@ -603,42 +600,48 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn, if (i > 0) stripeinfo[i].index = stripeinfo[i - 1].index + stripeinfo[i - 1].bufct; - stripeinfo[i].md.lmd_object_id = - md->lmd_oinfo[i].loi_id; + stripeinfo[i].md.lmd_object_id = md->lmd_oinfo[i].loi_id; } - for (i = 0; i < oa_bufs; i++ ) { + for (i = 0; i < oa_bufs; i++) { int which, shift; which = lov_stripe_which(md, pga[i].off); shift = stripeinfo[which].index; + LASSERT(shift + stripeinfo[which].subcount < oa_bufs); ioarr[shift + stripeinfo[which].subcount] = pga[i]; ioarr[shift + stripeinfo[which].subcount].off = lov_offset(md, pga[i].off, which); stripeinfo[which].subcount++; } - cbd->cb = callback; + our_cb->cb = callback; + our_cb->data = cbd; /* This is the only race-free way I can think of to get the refcount * correct. -phil */ - atomic_set(&cbd->refcount, 0); + atomic_set(&our_cb->refcount, 0); for (i = 0; i < stripe_count; i++) if (stripeinfo[i].bufct) - atomic_inc(&cbd->refcount); + atomic_inc(&our_cb->refcount); for (i = 0; i < stripe_count; i++) { int shift = stripeinfo[i].index; - if (stripeinfo[i].bufct) + if (stripeinfo[i].bufct) { + LASSERT(shift < oa_bufs); obd_brw(cmd, &lov->tgts[i].conn, &stripeinfo[i].md, stripeinfo[i].bufct, &ioarr[shift], - lov_osc_brw_callback, cbd); + lov_osc_brw_callback, our_cb); + } } rc = callback(cbd, 0, CB_PHASE_START); - OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo)); OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs); + out_sinfo: + OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo)); + out_cbdata: + OBD_FREE(our_cb, sizeof(*our_cb)); RETURN(rc); } @@ -680,10 +683,9 @@ static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *md, type, &sub_ext, sizeof(sub_ext), mode, flags, cb, data, datalen, &(lockhs[i])); // XXX add a lock debug statement here - if (rc) { + if (rc) CERROR("Error obd_enqueue object %Ld subobj %Ld\n", md->lmd_object_id, md->lmd_oinfo[i].loi_id); - } } RETURN(rc); } @@ -714,10 +716,9 @@ static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *md, submd.lmd_object_id = md->lmd_oinfo[i].loi_id; submd.lmd_easize = sizeof(struct lov_mds_md); rc = obd_cancel(&lov->tgts[i].conn, &submd, mode, &lockhs[i]); - if (rc) { + if (rc) CERROR("Error cancel object %Ld subobj %Ld\n", md->lmd_object_id, md->lmd_oinfo[i].loi_id); - } } RETURN(rc); } -- 1.8.3.1