RETURN(rc);
}
-struct lov_brw_cb_data {
- atomic_t lbc_remaining;
- wait_queue_head_t lbc_waitq;
-};
-
static int lov_osc_brw_callback(struct io_cb_data *cbd, int err, int phase)
{
int ret = 0;
- struct lov_brw_cb_data *lbc = cbd->data;
ENTRY;
-
- if (phase == CB_PHASE_START) {
- /* We raise the reference count here, so that it's still
- * around when we go to inspect in case of failure.
- * Balanced in the loop at the bottom of lov_brw.
- */
- atomic_inc(&cbd->desc->bd_refcount);
+ if (phase == CB_PHASE_START)
RETURN(0);
- }
if (phase == CB_PHASE_FINISH) {
- if (err) {
- CDEBUG(D_HA, "err %d on BRW to %s\n", err,
- cbd->desc->bd_connection->c_remote_uuid);
+ if (err)
cbd->err = err;
- cbd->complete = 0;
- } else {
- CDEBUG(D_HA, "BRW to %s complete\n",
- cbd->desc->bd_connection->c_remote_uuid);
- cbd->err = 0;
- cbd->complete = 1;
- }
- if (atomic_dec_and_test(&lbc->lbc_remaining))
- wake_up(&lbc->lbc_waitq);
+ if (atomic_dec_and_test(&cbd->refcount))
+ ret = cbd->cb(cbd->data, cbd->err, phase);
RETURN(ret);
}
return 0;
}
-static int lov_brw(int cmd, struct lustre_handle *conn,
- struct lov_stripe_md *lsm, obd_count oa_bufs,
- struct brw_page *pga, brw_callback_t callback,
- struct io_cb_data *cbd)
+static inline int lov_brw(int cmd, struct lustre_handle *conn,
+ struct lov_stripe_md *lsm, obd_count oa_bufs,
+ struct brw_page *pga,
+ brw_callback_t callback, struct io_cb_data *cbd)
{
int stripe_count = lsm->lsm_stripe_count;
struct obd_export *export = class_conn2export(conn);
} *stripeinfo, *si, *si_last;
struct brw_page *ioarr;
int rc, i;
- struct io_cb_data *cb_data;
+ struct io_cb_data *our_cb;
struct lov_oinfo *loi;
- struct lov_brw_cb_data lbc;
- struct l_wait_info lwi;
int *where;
ENTRY;
lov = &export->exp_obd->u.lov;
- OBD_ALLOC(stripeinfo, sizeof(*stripeinfo) * stripe_count);
- if (!stripeinfo)
+ our_cb = ll_init_cb();
+ if (!our_cb)
RETURN(-ENOMEM);
+ OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
+ if (!stripeinfo)
+ GOTO(out_cbdata, rc = -ENOMEM);
+
OBD_ALLOC(where, sizeof(*where) * oa_bufs);
if (!where)
GOTO(out_sinfo, rc = -ENOMEM);
if (!ioarr)
GOTO(out_where, rc = -ENOMEM);
- OBD_ALLOC(cb_data, sizeof(*cb_data) * stripe_count);
- if (!cb_data)
- GOTO(out_ioarr, rc = -ENOMEM);
-
- init_waitqueue_head(&lbc.lbc_waitq);
- atomic_set(&lbc.lbc_remaining, 0);
+ /* This is the only race-free way I can think of to get the refcount
+ * correct. -phil */
+ atomic_set(&our_cb->refcount, 0);
+ our_cb->cb = callback;
+ our_cb->data = cbd;
- /* Compute the page count per stripe, and set where[i] to be the
- * stripe number for this brw_page.
- */
for (i = 0; i < oa_bufs; i++) {
where[i] = lov_stripe_number(lsm, pga[i].off);
if (stripeinfo[where[i]].bufct++ == 0)
- atomic_inc(&lbc.lbc_remaining);
+ atomic_inc(&our_cb->refcount);
}
- /* Find the starting offset within the page array for each stripeinfo,
- * and the index within this LOV's vector of possible OSCs.
- */
for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
i < stripe_count; i++, loi++, si_last = si, si++) {
if (i > 0)
si->ost_idx = loi->loi_ost_idx;
}
- /* Repack the requests densely into ioarr, with each target's pages in
- * order, and then grouped by stripe order (A1A2A3B1B2B3C1C2, for a
- * write with striping pattern of ABCABCAB)).
- */
for (i = 0; i < oa_bufs; i++) {
int which = where[i];
int shift;
stripeinfo[which].subcount++;
}
- /* For each target to which we are writing -- some stripes might have
- * zero pages to write, e.g. the write is < stripe_count *stripe_width
- * -- call obd_brw for the range of brw_pages sent to that target.
- * ([offset, count] will be A:[0, 3], B:[3, 3], C:[6, 2] for the
- * example above.)
- */
for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
int shift = si->index;
if (si->bufct) {
- struct io_cb_data *data = &cb_data[i];
LASSERT(shift < oa_bufs);
-
- /* This looks like ll_init_cb, except in-place. */
- init_waitqueue_head(&data->waitq);
- atomic_set(&data->refcount, 2);
- data->data = &lbc;
- data->cb = callback;
-
/* XXX handle error returns here */
- rc = obd_brw(cmd, &lov->tgts[si->ost_idx].conn,
- &si->lsm, si->bufct, &ioarr[shift],
- lov_osc_brw_callback, data);
-
- /* On error, pretend this didn't exist, because we won't
- * have seen a START call to add a ref to this OBD's
- * desc, and so we don't want to muddle with the
- * likely-deleted desc below.
- */
- if (rc)
- si->bufct = 0;
-
+ obd_brw(cmd, &lov->tgts[si->ost_idx].conn,
+ &si->lsm, si->bufct, &ioarr[shift],
+ lov_osc_brw_callback, our_cb);
}
}
- /* A brief note on the recovery story here:
- *
- * Each obd_brw gets its own io_cb_data, and they're all fused into a
- * single allocation (cb_data). The lov_osc_brw_callback invocation
- * that results from each obd_brw's underlying bulk send/recv completing
- * will mark that io_cb_data as complete, and decrement the
- * lbc_remaining count in the LOV's "master" callback data.
- *
- * The LOV will go to sleep as soon as all the (async) obd_brws have
- * been started. lov_osc_brw_callback will wake it up iff all OSCs have
- * completed (lbc_remaining has reached zero). If the timeout expires,
- * the LOV will walk the cb_data vector and initiate recovery on any
- * connection associated with an as-yet-incomplete desc.
- */
-
- /* XXX Make sure that the callback doesn't block here, by faking
- * XXX "completion". This is very very gross, and we might be
- * XXX better off just not calling the callback at all.
- */
- cbd->complete = 1;
- (void)callback(cbd, 0, CB_PHASE_START);
- /* XXX Watch us ignore the return code! */
-
- lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, NULL, NULL, NULL);
- rc = l_wait_event(lbc.lbc_waitq, atomic_read(&lbc.lbc_remaining) == 0,
- &lwi);
-
- for (i = 0; i < oa_bufs; i++) {
- if (stripeinfo[i].bufct == 0)
- continue;
-
- if (!cb_data[i].complete) {
- CERROR("invoking recovery for OSC %s: %d\n",
- lov->tgts[stripeinfo[i].ost_idx].uuid, rc);
- recovd_conn_fail(cb_data[i].desc->bd_connection);
- }
- ptlrpc_bulk_decref(cb_data[i].desc);
- }
-
- (void)callback(cbd, 0, CB_PHASE_FINISH);
- /* XXX We need an error reporting/bytes-written story here, statim. */
-
- rc = 0;
+ rc = callback(cbd, 0, CB_PHASE_START);
- OBD_FREE(cb_data, sizeof(*cb_data) * oa_bufs);
- out_ioarr:
OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
out_where:
OBD_FREE(where, sizeof(*where) * oa_bufs);
out_sinfo:
OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
-
+ out_cbdata:
+ OBD_FREE(our_cb, sizeof(*our_cb));
RETURN(rc);
}