From 6572c3c5869c3ef692212b36444b70d7d7a30d58 Mon Sep 17 00:00:00 2001 From: adilger Date: Fri, 1 Nov 2002 22:33:08 +0000 Subject: [PATCH] Revert Mike's recovery changes from LOV, for the greater good of being able to use LOV again. --- lustre/lov/lov_obd.c | 151 +++++++++------------------------------------------ 1 file changed, 27 insertions(+), 124 deletions(-) diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 7e1d1cb..97429ee 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -982,41 +982,19 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa, RETURN(rc); } -struct lov_brw_cb_data { - atomic_t lbc_remaining; - wait_queue_head_t lbc_waitq; -}; - static int lov_osc_brw_callback(struct io_cb_data *cbd, int err, int phase) { int ret = 0; - struct lov_brw_cb_data *lbc = cbd->data; ENTRY; - - if (phase == CB_PHASE_START) { - /* We raise the reference count here, so that it's still - * around when we go to inspect in case of failure. - * Balanced in the loop at the bottom of lov_brw. - */ - atomic_inc(&cbd->desc->bd_refcount); + if (phase == CB_PHASE_START) RETURN(0); - } if (phase == CB_PHASE_FINISH) { - if (err) { - CDEBUG(D_HA, "err %d on BRW to %s\n", err, - cbd->desc->bd_connection->c_remote_uuid); + if (err) cbd->err = err; - cbd->complete = 0; - } else { - CDEBUG(D_HA, "BRW to %s complete\n", - cbd->desc->bd_connection->c_remote_uuid); - cbd->err = 0; - cbd->complete = 1; - } - if (atomic_dec_and_test(&lbc->lbc_remaining)) - wake_up(&lbc->lbc_waitq); + if (atomic_dec_and_test(&cbd->refcount)) + ret = cbd->cb(cbd->data, cbd->err, phase); RETURN(ret); } @@ -1024,10 +1002,10 @@ static int lov_osc_brw_callback(struct io_cb_data *cbd, int err, int phase) return 0; } -static int lov_brw(int cmd, struct lustre_handle *conn, - struct lov_stripe_md *lsm, obd_count oa_bufs, - struct brw_page *pga, brw_callback_t callback, - struct io_cb_data *cbd) +static inline int lov_brw(int cmd, struct lustre_handle *conn, + struct lov_stripe_md *lsm, obd_count oa_bufs, + struct brw_page *pga, + brw_callback_t callback, struct io_cb_data *cbd) { int stripe_count = lsm->lsm_stripe_count; struct obd_export *export = class_conn2export(conn); @@ -1041,10 +1019,8 @@ static int lov_brw(int cmd, struct lustre_handle *conn, } *stripeinfo, *si, *si_last; struct brw_page *ioarr; int rc, i; - struct io_cb_data *cb_data; + struct io_cb_data *our_cb; struct lov_oinfo *loi; - struct lov_brw_cb_data lbc; - struct l_wait_info lwi; int *where; ENTRY; @@ -1061,10 +1037,14 @@ static int lov_brw(int cmd, struct lustre_handle *conn, lov = &export->exp_obd->u.lov; - OBD_ALLOC(stripeinfo, sizeof(*stripeinfo) * stripe_count); - if (!stripeinfo) + our_cb = ll_init_cb(); + if (!our_cb) RETURN(-ENOMEM); + OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo)); + if (!stripeinfo) + GOTO(out_cbdata, rc = -ENOMEM); + OBD_ALLOC(where, sizeof(*where) * oa_bufs); if (!where) GOTO(out_sinfo, rc = -ENOMEM); @@ -1073,25 +1053,18 @@ static int lov_brw(int cmd, struct lustre_handle *conn, if (!ioarr) GOTO(out_where, rc = -ENOMEM); - OBD_ALLOC(cb_data, sizeof(*cb_data) * stripe_count); - if (!cb_data) - GOTO(out_ioarr, rc = -ENOMEM); - - init_waitqueue_head(&lbc.lbc_waitq); - atomic_set(&lbc.lbc_remaining, 0); + /* This is the only race-free way I can think of to get the refcount + * correct. -phil */ + atomic_set(&our_cb->refcount, 0); + our_cb->cb = callback; + our_cb->data = cbd; - /* Compute the page count per stripe, and set where[i] to be the - * stripe number for this brw_page. - */ for (i = 0; i < oa_bufs; i++) { where[i] = lov_stripe_number(lsm, pga[i].off); if (stripeinfo[where[i]].bufct++ == 0) - atomic_inc(&lbc.lbc_remaining); + atomic_inc(&our_cb->refcount); } - /* Find the starting offset within the page array for each stripeinfo, - * and the index within this LOV's vector of possible OSCs. - */ for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo; i < stripe_count; i++, loi++, si_last = si, si++) { if (i > 0) @@ -1100,10 +1073,6 @@ static int lov_brw(int cmd, struct lustre_handle *conn, si->ost_idx = loi->loi_ost_idx; } - /* Repack the requests densely into ioarr, with each target's pages in - * order, and then grouped by stripe order (A1A2A3B1B2B3C1C2, for a - * write with striping pattern of ABCABCAB)). - */ for (i = 0; i < oa_bufs; i++) { int which = where[i]; int shift; @@ -1115,93 +1084,27 @@ static int lov_brw(int cmd, struct lustre_handle *conn, stripeinfo[which].subcount++; } - /* For each target to which we are writing -- some stripes might have - * zero pages to write, e.g. the write is < stripe_count *stripe_width - * -- call obd_brw for the range of brw_pages sent to that target. - * ([offset, count] will be A:[0, 3], B:[3, 3], C:[6, 2] for the - * example above.) - */ for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) { int shift = si->index; if (si->bufct) { - struct io_cb_data *data = &cb_data[i]; LASSERT(shift < oa_bufs); - - /* This looks like ll_init_cb, except in-place. */ - init_waitqueue_head(&data->waitq); - atomic_set(&data->refcount, 2); - data->data = &lbc; - data->cb = callback; - /* XXX handle error returns here */ - rc = obd_brw(cmd, &lov->tgts[si->ost_idx].conn, - &si->lsm, si->bufct, &ioarr[shift], - lov_osc_brw_callback, data); - - /* On error, pretend this didn't exist, because we won't - * have seen a START call to add a ref to this OBD's - * desc, and so we don't want to muddle with the - * likely-deleted desc below. - */ - if (rc) - si->bufct = 0; - + obd_brw(cmd, &lov->tgts[si->ost_idx].conn, + &si->lsm, si->bufct, &ioarr[shift], + lov_osc_brw_callback, our_cb); } } - /* A brief note on the recovery story here: - * - * Each obd_brw gets its own io_cb_data, and they're all fused into a - * single allocation (cb_data). The lov_osc_brw_callback invocation - * that results from each obd_brw's underlying bulk send/recv completing - * will mark that io_cb_data as complete, and decrement the - * lbc_remaining count in the LOV's "master" callback data. - * - * The LOV will go to sleep as soon as all the (async) obd_brws have - * been started. lov_osc_brw_callback will wake it up iff all OSCs have - * completed (lbc_remaining has reached zero). If the timeout expires, - * the LOV will walk the cb_data vector and initiate recovery on any - * connection associated with an as-yet-incomplete desc. - */ - - /* XXX Make sure that the callback doesn't block here, by faking - * XXX "completion". This is very very gross, and we might be - * XXX better off just not calling the callback at all. - */ - cbd->complete = 1; - (void)callback(cbd, 0, CB_PHASE_START); - /* XXX Watch us ignore the return code! */ - - lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, NULL, NULL, NULL); - rc = l_wait_event(lbc.lbc_waitq, atomic_read(&lbc.lbc_remaining) == 0, - &lwi); - - for (i = 0; i < oa_bufs; i++) { - if (stripeinfo[i].bufct == 0) - continue; - - if (!cb_data[i].complete) { - CERROR("invoking recovery for OSC %s: %d\n", - lov->tgts[stripeinfo[i].ost_idx].uuid, rc); - recovd_conn_fail(cb_data[i].desc->bd_connection); - } - ptlrpc_bulk_decref(cb_data[i].desc); - } - - (void)callback(cbd, 0, CB_PHASE_FINISH); - /* XXX We need an error reporting/bytes-written story here, statim. */ - - rc = 0; + rc = callback(cbd, 0, CB_PHASE_START); - OBD_FREE(cb_data, sizeof(*cb_data) * oa_bufs); - out_ioarr: OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs); out_where: OBD_FREE(where, sizeof(*where) * oa_bufs); out_sinfo: OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo)); - + out_cbdata: + OBD_FREE(our_cb, sizeof(*our_cb)); RETURN(rc); } -- 1.8.3.1