From 038aef0711a9588994fbceee26bf2c6a74e0ee60 Mon Sep 17 00:00:00 2001 From: shaver Date: Wed, 30 Oct 2002 03:31:47 +0000 Subject: [PATCH] - Bugzilla 214: Have LOV do its own timeout management, so that all OSCs get removed correctly, _and_ we don't crash as soon as we try to kick off recovery. What a deal! - Fix removal of OSC from LOV on OST failure (silly & and type-smashing list_head macros). - Lots of comments in lov_brw, so I don't have to figure it out from first principles next time. (I think they're correct, but Andreas will review to make sure.) - Skip inactive OSCs in lov_statfs, because otherwise we return -EIO and the caller will ignore all of the data we get from the active/working OSCs. - A bit more bulletproofing of the sync_io_timeout case -- if you see that message, please reopen the bug. - Be a little smarter about double-failed connections. Might make failure-in-recovery work better, and will definitely make multiply-detected connection failure work better. --- lustre/include/linux/lustre_lib.h | 4 +- lustre/llite/recover.c | 2 +- lustre/lov/lov_obd.c | 154 +++++++++++++++++++++++++++++++------- lustre/obdclass/genops.c | 12 ++- lustre/osc/osc_request.c | 2 + lustre/ptlrpc/recovd.c | 12 ++- 6 files changed, 149 insertions(+), 37 deletions(-) diff --git a/lustre/include/linux/lustre_lib.h b/lustre/include/linux/lustre_lib.h index 923260d..f3f59b9 100644 --- a/lustre/include/linux/lustre_lib.h +++ b/lustre/include/linux/lustre_lib.h @@ -487,8 +487,8 @@ static inline int obd_ioctl_getdata(char **buf, int *len, void *arg) * waitq or some similar mechanism, or an interrupt occurs (if the caller has * asked for interrupts to be detected). The timeout will only fire once, so * callers should take care that a timeout_handler which returns zero will take - * future steps to awaken the process. N.B. that these steps must include making - * the provided condition become true. + * future steps to awaken the process. N.B. that these steps must include + * making the provided condition become true. * * If the interrupt flag (lwi_signals) is non-zero, then the process will be * interruptible, and will be awakened by any "killable" signal (SIGTERM, diff --git a/lustre/llite/recover.c b/lustre/llite/recover.c index 3692042..04df215 100644 --- a/lustre/llite/recover.c +++ b/lustre/llite/recover.c @@ -90,7 +90,7 @@ static void prepare_osc(struct obd_import *imp) struct lustre_handle fakeconn; struct obd_ioctl_data ioc_data; struct obd_export *exp = - list_entry(¬ify_obd->obd_exports.next, + list_entry(notify_obd->obd_exports.next, struct obd_export, exp_obd_chain); fakeconn.addr = (__u64)(unsigned long)exp; fakeconn.cookie = exp->exp_cookie; diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 471c50d..0621574 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -978,19 +978,41 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa, RETURN(rc); } +struct lov_brw_cb_data { + atomic_t lbc_remaining; + wait_queue_head_t lbc_waitq; +}; + static int lov_osc_brw_callback(struct io_cb_data *cbd, int err, int phase) { int ret = 0; + struct lov_brw_cb_data *lbc = cbd->data; ENTRY; - if (phase == CB_PHASE_START) + + if (phase == CB_PHASE_START) { + /* We raise the reference count here, so that it's still + * around when we go to inspect in case of failure. + * Balanced in the loop at the bottom of lov_brw. 
+ */ + atomic_inc(&cbd->desc->bd_refcount); RETURN(0); + } if (phase == CB_PHASE_FINISH) { - if (err) + if (err) { + CDEBUG(D_HA, "err %d on BRW to %s\n", err, + cbd->desc->bd_connection->c_remote_uuid); cbd->err = err; - if (atomic_dec_and_test(&cbd->refcount)) - ret = cbd->cb(cbd->data, cbd->err, phase); + cbd->complete = 0; + } else { + CDEBUG(D_HA, "BRW to %s complete\n", + cbd->desc->bd_connection->c_remote_uuid); + cbd->err = 0; + cbd->complete = 1; + } + if (atomic_dec_and_test(&lbc->lbc_remaining)) + wake_up(&lbc->lbc_waitq); RETURN(ret); } @@ -998,10 +1020,10 @@ static int lov_osc_brw_callback(struct io_cb_data *cbd, int err, int phase) return 0; } -static inline int lov_brw(int cmd, struct lustre_handle *conn, - struct lov_stripe_md *lsm, obd_count oa_bufs, - struct brw_page *pga, - brw_callback_t callback, struct io_cb_data *cbd) +static int lov_brw(int cmd, struct lustre_handle *conn, + struct lov_stripe_md *lsm, obd_count oa_bufs, + struct brw_page *pga, brw_callback_t callback, + struct io_cb_data *cbd) { int stripe_count = lsm->lsm_stripe_count; struct obd_export *export = class_conn2export(conn); @@ -1015,8 +1037,10 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn, } *stripeinfo, *si, *si_last; struct brw_page *ioarr; int rc, i; - struct io_cb_data *our_cb; + struct io_cb_data *cb_data; struct lov_oinfo *loi; + struct lov_brw_cb_data lbc; + struct l_wait_info lwi; int *where; ENTRY; @@ -1033,13 +1057,9 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn, lov = &export->exp_obd->u.lov; - our_cb = ll_init_cb(); - if (!our_cb) - RETURN(-ENOMEM); - - OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo)); + OBD_ALLOC(stripeinfo, sizeof(*stripeinfo) * stripe_count); if (!stripeinfo) - GOTO(out_cbdata, rc = -ENOMEM); + RETURN(-ENOMEM); OBD_ALLOC(where, sizeof(*where) * oa_bufs); if (!where) @@ -1049,18 +1069,25 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn, if (!ioarr) GOTO(out_where, rc = -ENOMEM); - /* This is the only race-free way I can think of to get the refcount - * correct. -phil */ - atomic_set(&our_cb->refcount, 0); - our_cb->cb = callback; - our_cb->data = cbd; + OBD_ALLOC(cb_data, sizeof(*cb_data) * stripe_count); + if (!cb_data) + GOTO(out_ioarr, rc = -ENOMEM); + + init_waitqueue_head(&lbc.lbc_waitq); + atomic_set(&lbc.lbc_remaining, 0); + /* Compute the page count per stripe, and set where[i] to be the + * stripe number for this brw_page. + */ for (i = 0; i < oa_bufs; i++) { where[i] = lov_stripe_number(lsm, pga[i].off); if (stripeinfo[where[i]].bufct++ == 0) - atomic_inc(&our_cb->refcount); + atomic_inc(&lbc.lbc_remaining); } + /* Find the starting offset within the page array for each stripeinfo, + * and the index within this LOV's vector of possible OSCs. + */ for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo; i < stripe_count; i++, loi++, si_last = si, si++) { if (i > 0) @@ -1069,6 +1096,10 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn, si->ost_idx = loi->loi_ost_idx; } + /* Repack the requests densely into ioarr, with each target's pages in + * order, and then grouped by stripe order (A1A2A3B1B2B3C1C2, for a + * write with striping pattern of ABCABCAB)). + */ for (i = 0; i < oa_bufs; i++) { int which = where[i]; int shift; @@ -1080,27 +1111,93 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn, stripeinfo[which].subcount++; } + /* For each target to which we are writing -- some stripes might have + * zero pages to write, e.g. 
the write is < stripe_count *stripe_width + * -- call obd_brw for the range of brw_pages sent to that target. + * ([offset, count] will be A:[0, 3], B:[3, 3], C:[6, 2] for the + * example above.) + */ for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) { int shift = si->index; if (si->bufct) { + struct io_cb_data *data = &cb_data[i]; LASSERT(shift < oa_bufs); + + /* This looks like ll_init_cb, except in-place. */ + init_waitqueue_head(&data->waitq); + atomic_set(&data->refcount, 2); + data->data = &lbc; + data->cb = callback; + /* XXX handle error returns here */ - obd_brw(cmd, &lov->tgts[si->ost_idx].conn, - &si->lsm, si->bufct, &ioarr[shift], - lov_osc_brw_callback, our_cb); + rc = obd_brw(cmd, &lov->tgts[si->ost_idx].conn, + &si->lsm, si->bufct, &ioarr[shift], + lov_osc_brw_callback, data); + + /* On error, pretend this didn't exist, because we won't + * have seen a START call to add a ref to this OBD's + * desc, and so we don't want to muddle with the + * likely-deleted desc below. + */ + if (rc) + si->bufct = 0; + } } - rc = callback(cbd, 0, CB_PHASE_START); + /* A brief note on the recovery story here: + * + * Each obd_brw gets its own io_cb_data, and they're all fused into a + * single allocation (cb_data). The lov_osc_brw_callback invocation + * that results from each obd_brw's underlying bulk send/recv completing + * will mark that io_cb_data as complete, and decrement the + * lbc_remaining count in the LOV's "master" callback data. + * + * The LOV will go to sleep as soon as all the (async) obd_brws have + * been started. lov_osc_brw_callback will wake it up iff all OSCs have + * completed (lbc_remaining has reached zero). If the timeout expires, + * the LOV will walk the cb_data vector and initiate recovery on any + * connection associated with an as-yet-incomplete desc. + */ + + /* XXX Make sure that the callback doesn't block here, by faking + * XXX "completion". This is very very gross, and we might be + * XXX better off just not calling the callback at all. + */ + cbd->complete = 1; + (void)callback(cbd, 0, CB_PHASE_START); + /* XXX Watch us ignore the return code! */ + + lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, NULL, NULL, NULL); + rc = l_wait_event(lbc.lbc_waitq, atomic_read(&lbc.lbc_remaining) == 0, + &lwi); + + for (i = 0; i < oa_bufs; i++) { + if (stripeinfo[i].bufct == 0) + continue; + + if (!cb_data[i].complete) { + CERROR("invoking recovery for OSC %s: %d\n", + lov->tgts[stripeinfo[i].ost_idx].uuid, rc); + recovd_conn_fail(cb_data[i].desc->bd_connection); + } + ptlrpc_bulk_decref(cb_data[i].desc); + } + + (void)callback(cbd, 0, CB_PHASE_FINISH); + /* XXX We need an error reporting/bytes-written story here, statim. 
*/ + + rc = 0; + OBD_FREE(cb_data, sizeof(*cb_data) * oa_bufs); + out_ioarr: OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs); out_where: OBD_FREE(where, sizeof(*where) * oa_bufs); out_sinfo: OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo)); - out_cbdata: - OBD_FREE(our_cb, sizeof(*our_cb)); + RETURN(rc); } @@ -1256,6 +1353,9 @@ static int lov_statfs(struct lustre_handle *conn, struct obd_statfs *osfs) for (i = 0; i < lov->desc.ld_tgt_count; i++) { int err; + if (!lov->tgts[i].active) + continue; + err = obd_statfs(&lov->tgts[i].conn, &lov_sfs); if (err) { CERROR("Error statfs OSC %s idx %d: err = %d\n", diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index aaa0316..9279dc1 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -37,7 +37,11 @@ static int sync_io_timeout(void *data) LASSERT(cbd); desc = cbd->desc; - LASSERT(desc); + if (!desc) { + CERROR("no desc for timed-out BRW, reopen Bugzilla 214!\n"); + RETURN(0); /* back to sleep -- someone had better wake us up! */ + } + LASSERT(desc->bd_connection); CERROR("IO of %d pages to/from %s:%d (conn %p) timed out\n", @@ -79,15 +83,15 @@ int ll_sync_io_cb(struct io_cb_data *data, int err, int phase) if (atomic_dec_and_test(&data->refcount)) OBD_FREE(data, sizeof(*data)); if (ret == -EINTR) - return ret; + RETURN(ret); } else if (phase == CB_PHASE_FINISH) { data->err = err; data->complete = 1; wake_up(&data->waitq); if (atomic_dec_and_test(&data->refcount)) OBD_FREE(data, sizeof(*data)); - return err; - } else + RETURN(err); + } else LBUG(); EXIT; return 0; diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 9e7d257..2bc9093 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -391,6 +391,7 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm, cb_data->callback = callback; cb_data->cb_data = data; + CDEBUG(D_PAGE, "data(%p)->desc = %p\n", data, desc); data->desc = desc; desc->bd_cb_data = cb_data; @@ -509,6 +510,7 @@ static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md, cb_data->callback = callback; cb_data->cb_data = data; + CDEBUG(D_PAGE, "data(%p)->desc = %p\n", data, desc); data->desc = desc; desc->bd_cb_data = cb_data; diff --git a/lustre/ptlrpc/recovd.c b/lustre/ptlrpc/recovd.c index 180c339..5edef0c 100644 --- a/lustre/ptlrpc/recovd.c +++ b/lustre/ptlrpc/recovd.c @@ -116,10 +116,9 @@ void recovd_conn_fail(struct ptlrpc_connection *conn) } spin_lock(&recovd->recovd_lock); - if (rd->rd_phase != RD_IDLE) { - CERROR("connection %p to %s already in recovery\n", + if (rd->rd_phase == RD_TROUBLED || rd->rd_phase == RD_PREPARING) { + CDEBUG(D_HA, "connection %p to %s already in recovery\n", conn, conn->c_remote_uuid); - /* XXX need to distinguish from failure-in-recovery */ spin_unlock(&recovd->recovd_lock); EXIT; return; @@ -128,6 +127,13 @@ void recovd_conn_fail(struct ptlrpc_connection *conn) CERROR("connection %p to %s failed\n", conn, conn->c_remote_uuid); list_del(&rd->rd_managed_chain); list_add_tail(&rd->rd_managed_chain, &recovd->recovd_troubled_items); + if (rd->rd_phase != RD_IDLE) { + CDEBUG(D_HA, + "connection %p to %s failed in recovery: restarting\n", + conn, conn->c_remote_uuid); + /* XXX call callback with PHASE_FAILED? */ + rd->rd_next_phase = RD_TROUBLED; + } rd->rd_phase = RD_TROUBLED; dump_lists(recovd); spin_unlock(&recovd->recovd_lock); -- 1.8.3.1
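
A note on the scheme the lov_brw comments above describe: it reduces to a scatter/gather-with-timeout pattern -- issue one asynchronous brw per stripe target, have each completion mark its own slot and decrement a shared "remaining" counter, put the issuer to sleep until the counter hits zero or the timeout fires, then walk the per-target slots and kick recovery for anything still outstanding. The sketch below is a userspace model of that pattern only, not Lustre code: a pthread condition variable stands in for l_wait_event()/lbc_waitq, a deliberately hung thread stands in for a dead OST, a printf stands in for recovd_conn_fail(), and every identifier in it is illustrative rather than taken from the tree.

/*
 * Userspace model of the lov_brw completion/timeout pattern described in
 * the patch above.  N per-target I/Os complete asynchronously; each marks
 * its slot complete and decrements a shared counter, waking the issuer
 * when the count reaches zero.  The issuer waits with a timeout and then
 * walks the slots, reporting any target that never completed (which, in
 * the real code, is where recovery would be initiated).
 */
#include <pthread.h>
#include <stdio.h>
#include <time.h>
#include <unistd.h>

#define NTARGETS     3
#define TIMEOUT_SECS 2

struct cb_slot {                    /* stands in for one io_cb_data */
        int complete;
        int target_idx;
};

struct master_cb {                  /* stands in for lov_brw_cb_data */
        pthread_mutex_t lock;
        pthread_cond_t  waitq;      /* stands in for lbc_waitq */
        int             remaining;  /* stands in for lbc_remaining */
        struct cb_slot  slots[NTARGETS];
};

static struct master_cb lbc;

/* Per-target "I/O": target 2 never completes, to exercise the timeout path. */
static void *target_io(void *arg)
{
        struct cb_slot *slot = arg;

        if (slot->target_idx == 2)
                return NULL;        /* simulate a hung OST: no completion */

        usleep(100 * 1000);         /* pretend the bulk transfer took a while */

        pthread_mutex_lock(&lbc.lock);
        slot->complete = 1;
        if (--lbc.remaining == 0)
                pthread_cond_signal(&lbc.waitq);  /* wake the waiting issuer */
        pthread_mutex_unlock(&lbc.lock);
        return NULL;
}

int main(void)
{
        pthread_t tids[NTARGETS];
        struct timespec deadline;
        int i;

        pthread_mutex_init(&lbc.lock, NULL);
        pthread_cond_init(&lbc.waitq, NULL);
        lbc.remaining = NTARGETS;

        for (i = 0; i < NTARGETS; i++) {
                lbc.slots[i].complete = 0;
                lbc.slots[i].target_idx = i;
                pthread_create(&tids[i], NULL, target_io, &lbc.slots[i]);
        }

        /* Wait for all targets, but only up to the timeout (the analogue of
         * l_wait_event with LWI_TIMEOUT_INTR). */
        clock_gettime(CLOCK_REALTIME, &deadline);
        deadline.tv_sec += TIMEOUT_SECS;

        pthread_mutex_lock(&lbc.lock);
        while (lbc.remaining > 0) {
                if (pthread_cond_timedwait(&lbc.waitq, &lbc.lock, &deadline))
                        break;      /* timed out: stop waiting, inspect slots */
        }
        pthread_mutex_unlock(&lbc.lock);

        /* Walk the per-target slots; anything incomplete is the connection
         * that would be handed to the recovery daemon. */
        for (i = 0; i < NTARGETS; i++) {
                if (!lbc.slots[i].complete)
                        printf("target %d incomplete: would call recovd_conn_fail()\n", i);
                pthread_join(tids[i], NULL);
        }
        return 0;
}

Build with "cc -pthread"; target 2 never completes, so after the two-second timeout the final walk reports it, which is the moral equivalent of the cb_data sweep at the bottom of lov_brw that hands the stalled connection to recovd.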