RETURN(rc);
}
+struct lov_brw_cb_data {
+ atomic_t lbc_remaining;
+ wait_queue_head_t lbc_waitq;
+};
+
static int lov_osc_brw_callback(struct io_cb_data *cbd, int err, int phase)
{
int ret = 0;
+ struct lov_brw_cb_data *lbc = cbd->data;
ENTRY;
- if (phase == CB_PHASE_START)
+
+ if (phase == CB_PHASE_START) {
+ /* We raise the reference count here, so that it's still
+ * around when we go to inspect in case of failure.
+ * Balanced in the loop at the bottom of lov_brw.
+ */
+ atomic_inc(&cbd->desc->bd_refcount);
RETURN(0);
+ }
if (phase == CB_PHASE_FINISH) {
- if (err)
+ if (err) {
+ CDEBUG(D_HA, "err %d on BRW to %s\n", err,
+ cbd->desc->bd_connection->c_remote_uuid);
cbd->err = err;
- if (atomic_dec_and_test(&cbd->refcount))
- ret = cbd->cb(cbd->data, cbd->err, phase);
+ cbd->complete = 0;
+ } else {
+ CDEBUG(D_HA, "BRW to %s complete\n",
+ cbd->desc->bd_connection->c_remote_uuid);
+ cbd->err = 0;
+ cbd->complete = 1;
+ }
+ if (atomic_dec_and_test(&lbc->lbc_remaining))
+ wake_up(&lbc->lbc_waitq);
RETURN(ret);
}
return 0;
}
-static inline int lov_brw(int cmd, struct lustre_handle *conn,
- struct lov_stripe_md *lsm, obd_count oa_bufs,
- struct brw_page *pga,
- brw_callback_t callback, struct io_cb_data *cbd)
+static int lov_brw(int cmd, struct lustre_handle *conn,
+ struct lov_stripe_md *lsm, obd_count oa_bufs,
+ struct brw_page *pga, brw_callback_t callback,
+ struct io_cb_data *cbd)
{
int stripe_count = lsm->lsm_stripe_count;
struct obd_export *export = class_conn2export(conn);
} *stripeinfo, *si, *si_last;
struct brw_page *ioarr;
int rc, i;
- struct io_cb_data *our_cb;
+ struct io_cb_data *cb_data;
struct lov_oinfo *loi;
+ struct lov_brw_cb_data lbc;
+ struct l_wait_info lwi;
int *where;
ENTRY;
lov = &export->exp_obd->u.lov;
- our_cb = ll_init_cb();
- if (!our_cb)
- RETURN(-ENOMEM);
-
- OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
+ OBD_ALLOC(stripeinfo, sizeof(*stripeinfo) * stripe_count);
if (!stripeinfo)
- GOTO(out_cbdata, rc = -ENOMEM);
+ RETURN(-ENOMEM);
OBD_ALLOC(where, sizeof(*where) * oa_bufs);
if (!where)
if (!ioarr)
GOTO(out_where, rc = -ENOMEM);
- /* This is the only race-free way I can think of to get the refcount
- * correct. -phil */
- atomic_set(&our_cb->refcount, 0);
- our_cb->cb = callback;
- our_cb->data = cbd;
+ OBD_ALLOC(cb_data, sizeof(*cb_data) * stripe_count);
+ if (!cb_data)
+ GOTO(out_ioarr, rc = -ENOMEM);
+
+ init_waitqueue_head(&lbc.lbc_waitq);
+ atomic_set(&lbc.lbc_remaining, 0);
+ /* Compute the page count per stripe, and set where[i] to be the
+ * stripe number for this brw_page.
+ */
for (i = 0; i < oa_bufs; i++) {
where[i] = lov_stripe_number(lsm, pga[i].off);
if (stripeinfo[where[i]].bufct++ == 0)
- atomic_inc(&our_cb->refcount);
+ atomic_inc(&lbc.lbc_remaining);
}
+ /* Find the starting offset within the page array for each stripeinfo,
+ * and the index within this LOV's vector of possible OSCs.
+ */
for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
i < stripe_count; i++, loi++, si_last = si, si++) {
if (i > 0)
si->ost_idx = loi->loi_ost_idx;
}
+ /* Repack the requests densely into ioarr, with each target's pages in
+ * order, and then grouped by stripe order (A1A2A3B1B2B3C1C2, for a
+ * write with striping pattern of ABCABCAB)).
+ */
for (i = 0; i < oa_bufs; i++) {
int which = where[i];
int shift;
stripeinfo[which].subcount++;
}
+ /* For each target to which we are writing -- some stripes might have
+ * zero pages to write, e.g. the write is < stripe_count *stripe_width
+ * -- call obd_brw for the range of brw_pages sent to that target.
+ * ([offset, count] will be A:[0, 3], B:[3, 3], C:[6, 2] for the
+ * example above.)
+ */
for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
int shift = si->index;
if (si->bufct) {
+ struct io_cb_data *data = &cb_data[i];
LASSERT(shift < oa_bufs);
+
+ /* This looks like ll_init_cb, except in-place. */
+ init_waitqueue_head(&data->waitq);
+ atomic_set(&data->refcount, 2);
+ data->data = &lbc;
+ data->cb = callback;
+
/* XXX handle error returns here */
- obd_brw(cmd, &lov->tgts[si->ost_idx].conn,
- &si->lsm, si->bufct, &ioarr[shift],
- lov_osc_brw_callback, our_cb);
+ rc = obd_brw(cmd, &lov->tgts[si->ost_idx].conn,
+ &si->lsm, si->bufct, &ioarr[shift],
+ lov_osc_brw_callback, data);
+
+ /* On error, pretend this didn't exist, because we won't
+ * have seen a START call to add a ref to this OBD's
+ * desc, and so we don't want to muddle with the
+ * likely-deleted desc below.
+ */
+ if (rc)
+ si->bufct = 0;
+
}
}
- rc = callback(cbd, 0, CB_PHASE_START);
+ /* A brief note on the recovery story here:
+ *
+ * Each obd_brw gets its own io_cb_data, and they're all fused into a
+ * single allocation (cb_data). The lov_osc_brw_callback invocation
+ * that results from each obd_brw's underlying bulk send/recv completing
+ * will mark that io_cb_data as complete, and decrement the
+ * lbc_remaining count in the LOV's "master" callback data.
+ *
+ * The LOV will go to sleep as soon as all the (async) obd_brws have
+ * been started. lov_osc_brw_callback will wake it up iff all OSCs have
+ * completed (lbc_remaining has reached zero). If the timeout expires,
+ * the LOV will walk the cb_data vector and initiate recovery on any
+ * connection associated with an as-yet-incomplete desc.
+ */
+
+ /* XXX Make sure that the callback doesn't block here, by faking
+ * XXX "completion". This is very very gross, and we might be
+ * XXX better off just not calling the callback at all.
+ */
+ cbd->complete = 1;
+ (void)callback(cbd, 0, CB_PHASE_START);
+ /* XXX Watch us ignore the return code! */
+
+ lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, NULL, NULL, NULL);
+ rc = l_wait_event(lbc.lbc_waitq, atomic_read(&lbc.lbc_remaining) == 0,
+ &lwi);
+
+ for (i = 0; i < oa_bufs; i++) {
+ if (stripeinfo[i].bufct == 0)
+ continue;
+
+ if (!cb_data[i].complete) {
+ CERROR("invoking recovery for OSC %s: %d\n",
+ lov->tgts[stripeinfo[i].ost_idx].uuid, rc);
+ recovd_conn_fail(cb_data[i].desc->bd_connection);
+ }
+ ptlrpc_bulk_decref(cb_data[i].desc);
+ }
+
+ (void)callback(cbd, 0, CB_PHASE_FINISH);
+ /* XXX We need an error reporting/bytes-written story here, statim. */
+
+ rc = 0;
+ OBD_FREE(cb_data, sizeof(*cb_data) * oa_bufs);
+ out_ioarr:
OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
out_where:
OBD_FREE(where, sizeof(*where) * oa_bufs);
out_sinfo:
OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
- out_cbdata:
- OBD_FREE(our_cb, sizeof(*our_cb));
+
RETURN(rc);
}
for (i = 0; i < lov->desc.ld_tgt_count; i++) {
int err;
+ if (!lov->tgts[i].active)
+ continue;
+
err = obd_statfs(&lov->tgts[i].conn, &lov_sfs);
if (err) {
CERROR("Error statfs OSC %s idx %d: err = %d\n",