Whamcloud - gitweb
Revert Mike's recovery changes from LOV, for the greater good of being
authoradilger <adilger>
Fri, 1 Nov 2002 22:33:08 +0000 (22:33 +0000)
committeradilger <adilger>
Fri, 1 Nov 2002 22:33:08 +0000 (22:33 +0000)
able to use LOV again.

lustre/lov/lov_obd.c

index 7e1d1cb..97429ee 100644 (file)
@@ -982,41 +982,19 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
         RETURN(rc);
 }
 
-struct lov_brw_cb_data {
-        atomic_t           lbc_remaining;
-        wait_queue_head_t  lbc_waitq;
-};
-
 static int lov_osc_brw_callback(struct io_cb_data *cbd, int err, int phase)
 {
         int ret = 0;
-        struct lov_brw_cb_data *lbc = cbd->data;
         ENTRY;
 
-
-        if (phase == CB_PHASE_START) {
-                /* We raise the reference count here, so that it's still
-                 * around when we go to inspect in case of failure.
-                 * Balanced in the loop at the bottom of lov_brw.
-                 */
-                atomic_inc(&cbd->desc->bd_refcount);
+        if (phase == CB_PHASE_START)
                 RETURN(0);
-        }                
 
         if (phase == CB_PHASE_FINISH) {
-                if (err) {
-                        CDEBUG(D_HA, "err %d on BRW to %s\n", err,
-                               cbd->desc->bd_connection->c_remote_uuid);
+                if (err)
                         cbd->err = err;
-                        cbd->complete = 0;
-                } else {
-                        CDEBUG(D_HA, "BRW to %s complete\n",
-                               cbd->desc->bd_connection->c_remote_uuid);
-                        cbd->err = 0;
-                        cbd->complete = 1;
-                }
-                if (atomic_dec_and_test(&lbc->lbc_remaining))
-                        wake_up(&lbc->lbc_waitq);
+                if (atomic_dec_and_test(&cbd->refcount))
+                        ret = cbd->cb(cbd->data, cbd->err, phase);
                 RETURN(ret);
         }
 
@@ -1024,10 +1002,10 @@ static int lov_osc_brw_callback(struct io_cb_data *cbd, int err, int phase)
         return 0;
 }
 
-static int lov_brw(int cmd, struct lustre_handle *conn,
-                   struct lov_stripe_md *lsm, obd_count oa_bufs,
-                   struct brw_page *pga, brw_callback_t callback,
-                   struct io_cb_data *cbd)
+static inline int lov_brw(int cmd, struct lustre_handle *conn,
+                          struct lov_stripe_md *lsm, obd_count oa_bufs,
+                          struct brw_page *pga,
+                          brw_callback_t callback, struct io_cb_data *cbd)
 {
         int stripe_count = lsm->lsm_stripe_count;
         struct obd_export *export = class_conn2export(conn);
@@ -1041,10 +1019,8 @@ static int lov_brw(int cmd, struct lustre_handle *conn,
         } *stripeinfo, *si, *si_last;
         struct brw_page *ioarr;
         int rc, i;
-        struct io_cb_data *cb_data;
+        struct io_cb_data *our_cb;
         struct lov_oinfo *loi;
-        struct lov_brw_cb_data lbc;
-        struct l_wait_info lwi;
         int *where;
         ENTRY;
 
@@ -1061,10 +1037,14 @@ static int lov_brw(int cmd, struct lustre_handle *conn,
 
         lov = &export->exp_obd->u.lov;
 
-        OBD_ALLOC(stripeinfo, sizeof(*stripeinfo) * stripe_count);
-        if (!stripeinfo)
+        our_cb = ll_init_cb();
+        if (!our_cb)
                 RETURN(-ENOMEM);
 
+        OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
+        if (!stripeinfo)
+                GOTO(out_cbdata, rc = -ENOMEM);
+
         OBD_ALLOC(where, sizeof(*where) * oa_bufs);
         if (!where)
                 GOTO(out_sinfo, rc = -ENOMEM);
@@ -1073,25 +1053,18 @@ static int lov_brw(int cmd, struct lustre_handle *conn,
         if (!ioarr)
                 GOTO(out_where, rc = -ENOMEM);
 
-        OBD_ALLOC(cb_data, sizeof(*cb_data) * stripe_count);
-        if (!cb_data)
-                GOTO(out_ioarr, rc = -ENOMEM);
-
-        init_waitqueue_head(&lbc.lbc_waitq);
-        atomic_set(&lbc.lbc_remaining, 0);
+        /* This is the only race-free way I can think of to get the refcount
+         * correct. -phil */
+        atomic_set(&our_cb->refcount, 0);
+        our_cb->cb = callback;
+        our_cb->data = cbd;
 
-        /* Compute the page count per stripe, and set where[i] to be the 
-         * stripe number for this brw_page.
-         */
         for (i = 0; i < oa_bufs; i++) {
                 where[i] = lov_stripe_number(lsm, pga[i].off);
                 if (stripeinfo[where[i]].bufct++ == 0)
-                        atomic_inc(&lbc.lbc_remaining);
+                        atomic_inc(&our_cb->refcount);
         }
 
-        /* Find the starting offset within the page array for each stripeinfo,
-         * and the index within this LOV's vector of possible OSCs.
-         */
         for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
              i < stripe_count; i++, loi++, si_last = si, si++) {
                 if (i > 0)
@@ -1100,10 +1073,6 @@ static int lov_brw(int cmd, struct lustre_handle *conn,
                 si->ost_idx = loi->loi_ost_idx;
         }
 
-        /* Repack the requests densely into ioarr, with each target's pages in
-         * order, and then grouped by stripe order (A1A2A3B1B2B3C1C2, for a
-         * write with striping pattern of ABCABCAB)).
-         */
         for (i = 0; i < oa_bufs; i++) {
                 int which = where[i];
                 int shift;
@@ -1115,93 +1084,27 @@ static int lov_brw(int cmd, struct lustre_handle *conn,
                 stripeinfo[which].subcount++;
         }
 
-        /* For each target to which we are writing -- some stripes might have
-         * zero pages to write, e.g. the write is < stripe_count *stripe_width
-         * -- call obd_brw for the range of brw_pages sent to that target.
-         * ([offset, count] will be A:[0, 3], B:[3, 3], C:[6, 2] for the
-         * example above.)
-         */
         for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
                 int shift = si->index;
 
                 if (si->bufct) {
-                        struct io_cb_data *data = &cb_data[i];
                         LASSERT(shift < oa_bufs);
-
-                        /* This looks like ll_init_cb, except in-place. */
-                        init_waitqueue_head(&data->waitq);
-                        atomic_set(&data->refcount, 2);
-                        data->data = &lbc;
-                        data->cb = callback;
-
                         /* XXX handle error returns here */
-                        rc = obd_brw(cmd, &lov->tgts[si->ost_idx].conn,
-                                     &si->lsm, si->bufct, &ioarr[shift],
-                                     lov_osc_brw_callback, data);
-
-                        /* On error, pretend this didn't exist, because we won't
-                         * have seen a START call to add a ref to this OBD's
-                         * desc, and so we don't want to muddle with the
-                         * likely-deleted desc below.
-                         */
-                        if (rc)
-                                si->bufct = 0;
-                                
+                        obd_brw(cmd, &lov->tgts[si->ost_idx].conn,
+                                &si->lsm, si->bufct, &ioarr[shift],
+                                lov_osc_brw_callback, our_cb);
                 }
         }
 
-        /* A brief note on the recovery story here:
-         *
-         * Each obd_brw gets its own io_cb_data, and they're all fused into a
-         * single allocation (cb_data).  The lov_osc_brw_callback invocation
-         * that results from each obd_brw's underlying bulk send/recv completing
-         * will mark that io_cb_data as complete, and decrement the
-         * lbc_remaining count in the LOV's "master" callback data.
-         *
-         * The LOV will go to sleep as soon as all the (async) obd_brws have
-         * been started.  lov_osc_brw_callback will wake it up iff all OSCs have
-         * completed (lbc_remaining has reached zero).  If the timeout expires,
-         * the LOV will walk the cb_data vector and initiate recovery on any
-         * connection associated with an as-yet-incomplete desc.
-         */
-
-        /* XXX Make sure that the callback doesn't block here, by faking
-         * XXX "completion".  This is very very gross, and we might be
-         * XXX better off just not calling the callback at all.
-         */
-        cbd->complete = 1;
-        (void)callback(cbd, 0, CB_PHASE_START);
-        /* XXX Watch us ignore the return code! */
-
-        lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, NULL, NULL, NULL);
-        rc = l_wait_event(lbc.lbc_waitq, atomic_read(&lbc.lbc_remaining) == 0, 
-                          &lwi);
-
-        for (i = 0; i < oa_bufs; i++) {
-                if (stripeinfo[i].bufct == 0)
-                        continue;
-
-                if (!cb_data[i].complete) {
-                        CERROR("invoking recovery for OSC %s: %d\n",
-                               lov->tgts[stripeinfo[i].ost_idx].uuid, rc);
-                        recovd_conn_fail(cb_data[i].desc->bd_connection);
-                }
-                ptlrpc_bulk_decref(cb_data[i].desc);
-        }
-        
-        (void)callback(cbd, 0, CB_PHASE_FINISH);
-        /* XXX We need an error reporting/bytes-written story here, statim. */
-        
-        rc = 0;
+        rc = callback(cbd, 0, CB_PHASE_START);
 
-        OBD_FREE(cb_data, sizeof(*cb_data) * oa_bufs);
- out_ioarr:
         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
  out_where:
         OBD_FREE(where, sizeof(*where) * oa_bufs);
  out_sinfo:
         OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
-
+ out_cbdata:
+        OBD_FREE(our_cb, sizeof(*our_cb));
         RETURN(rc);
 }