Whamcloud - gitweb
- Bugzilla 214: Have LOV do its own timeout management, so that all OSCs get
authorshaver <shaver>
Wed, 30 Oct 2002 03:31:47 +0000 (03:31 +0000)
committershaver <shaver>
Wed, 30 Oct 2002 03:31:47 +0000 (03:31 +0000)
  removed correctly, _and_ we don't crash as soon as we try to kick off
  recovery.  What a deal!

- Fix removal of OSC from LOV on OST failure (silly & and type-smashing
  list_head macros).

- Lots of comments in lov_brw, so I don't have to figure it out from first
  principles next time.  (I think they're correct, but Andreas will review to
  make sure.)

- Skip inactive OSCs in lov_statfs, because otherwise we return -EIO and
  the caller will ignore all of the data we get from the active/working OSCs.

- A bit more bulletproofing of the sync_io_timeout case -- if you see that
  message, please reopen the bug.

- Be a little smarter about double-failed connections.  Might make
  failure-in-recovery work better, and will definitely make multiply-detected
  connection failure work better.

lustre/include/linux/lustre_lib.h
lustre/llite/recover.c
lustre/lov/lov_obd.c
lustre/obdclass/genops.c
lustre/osc/osc_request.c
lustre/ptlrpc/recovd.c

index 923260d..f3f59b9 100644 (file)
@@ -487,8 +487,8 @@ static inline int obd_ioctl_getdata(char **buf, int *len, void *arg)
  * waitq or some similar mechanism, or an interrupt occurs (if the caller has
  * asked for interrupts to be detected).  The timeout will only fire once, so
  * callers should take care that a timeout_handler which returns zero will take
- * future steps to awaken the process.  N.B. that these steps must include making
- * the provided condition become true.
+ * future steps to awaken the process.  N.B. that these steps must include
+ * making the provided condition become true.
  *
  * If the interrupt flag (lwi_signals) is non-zero, then the process will be
  * interruptible, and will be awakened by any "killable" signal (SIGTERM,
index 3692042..04df215 100644 (file)
@@ -90,7 +90,7 @@ static void prepare_osc(struct obd_import *imp)
                 struct lustre_handle fakeconn;
                 struct obd_ioctl_data ioc_data;
                 struct obd_export *exp =
-                        list_entry(&notify_obd->obd_exports.next,
+                        list_entry(notify_obd->obd_exports.next,
                                    struct obd_export, exp_obd_chain);
                 fakeconn.addr = (__u64)(unsigned long)exp;
                 fakeconn.cookie = exp->exp_cookie;
index 471c50d..0621574 100644 (file)
@@ -978,19 +978,41 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
         RETURN(rc);
 }
 
+struct lov_brw_cb_data {
+        atomic_t           lbc_remaining;
+        wait_queue_head_t  lbc_waitq;
+};
+
 static int lov_osc_brw_callback(struct io_cb_data *cbd, int err, int phase)
 {
         int ret = 0;
+        struct lov_brw_cb_data *lbc = cbd->data;
         ENTRY;
 
-        if (phase == CB_PHASE_START)
+
+        if (phase == CB_PHASE_START) {
+                /* We raise the reference count here, so that it's still
+                 * around when we go to inspect in case of failure.
+                 * Balanced in the loop at the bottom of lov_brw.
+                 */
+                atomic_inc(&cbd->desc->bd_refcount);
                 RETURN(0);
+        }                
 
         if (phase == CB_PHASE_FINISH) {
-                if (err)
+                if (err) {
+                        CDEBUG(D_HA, "err %d on BRW to %s\n", err,
+                               cbd->desc->bd_connection->c_remote_uuid);
                         cbd->err = err;
-                if (atomic_dec_and_test(&cbd->refcount))
-                        ret = cbd->cb(cbd->data, cbd->err, phase);
+                        cbd->complete = 0;
+                } else {
+                        CDEBUG(D_HA, "BRW to %s complete\n",
+                               cbd->desc->bd_connection->c_remote_uuid);
+                        cbd->err = 0;
+                        cbd->complete = 1;
+                }
+                if (atomic_dec_and_test(&lbc->lbc_remaining))
+                        wake_up(&lbc->lbc_waitq);
                 RETURN(ret);
         }
 
@@ -998,10 +1020,10 @@ static int lov_osc_brw_callback(struct io_cb_data *cbd, int err, int phase)
         return 0;
 }
 
-static inline int lov_brw(int cmd, struct lustre_handle *conn,
-                          struct lov_stripe_md *lsm, obd_count oa_bufs,
-                          struct brw_page *pga,
-                          brw_callback_t callback, struct io_cb_data *cbd)
+static int lov_brw(int cmd, struct lustre_handle *conn,
+                   struct lov_stripe_md *lsm, obd_count oa_bufs,
+                   struct brw_page *pga, brw_callback_t callback,
+                   struct io_cb_data *cbd)
 {
         int stripe_count = lsm->lsm_stripe_count;
         struct obd_export *export = class_conn2export(conn);
@@ -1015,8 +1037,10 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn,
         } *stripeinfo, *si, *si_last;
         struct brw_page *ioarr;
         int rc, i;
-        struct io_cb_data *our_cb;
+        struct io_cb_data *cb_data;
         struct lov_oinfo *loi;
+        struct lov_brw_cb_data lbc;
+        struct l_wait_info lwi;
         int *where;
         ENTRY;
 
@@ -1033,13 +1057,9 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn,
 
         lov = &export->exp_obd->u.lov;
 
-        our_cb = ll_init_cb();
-        if (!our_cb)
-                RETURN(-ENOMEM);
-
-        OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
+        OBD_ALLOC(stripeinfo, sizeof(*stripeinfo) * stripe_count);
         if (!stripeinfo)
-                GOTO(out_cbdata, rc = -ENOMEM);
+                RETURN(-ENOMEM);
 
         OBD_ALLOC(where, sizeof(*where) * oa_bufs);
         if (!where)
@@ -1049,18 +1069,25 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn,
         if (!ioarr)
                 GOTO(out_where, rc = -ENOMEM);
 
-        /* This is the only race-free way I can think of to get the refcount
-         * correct. -phil */
-        atomic_set(&our_cb->refcount, 0);
-        our_cb->cb = callback;
-        our_cb->data = cbd;
+        OBD_ALLOC(cb_data, sizeof(*cb_data) * stripe_count);
+        if (!cb_data)
+                GOTO(out_ioarr, rc = -ENOMEM);
+
+        init_waitqueue_head(&lbc.lbc_waitq);
+        atomic_set(&lbc.lbc_remaining, 0);
 
+        /* Compute the page count per stripe, and set where[i] to be the 
+         * stripe number for this brw_page.
+         */
         for (i = 0; i < oa_bufs; i++) {
                 where[i] = lov_stripe_number(lsm, pga[i].off);
                 if (stripeinfo[where[i]].bufct++ == 0)
-                        atomic_inc(&our_cb->refcount);
+                        atomic_inc(&lbc.lbc_remaining);
         }
 
+        /* Find the starting offset within the page array for each stripeinfo,
+         * and the index within this LOV's vector of possible OSCs.
+         */
         for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
              i < stripe_count; i++, loi++, si_last = si, si++) {
                 if (i > 0)
@@ -1069,6 +1096,10 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn,
                 si->ost_idx = loi->loi_ost_idx;
         }
 
+        /* Repack the requests densely into ioarr, with each target's pages in
+         * order, and then grouped by stripe order (A1A2A3B1B2B3C1C2, for a
+         * write with striping pattern of ABCABCAB)).
+         */
         for (i = 0; i < oa_bufs; i++) {
                 int which = where[i];
                 int shift;
@@ -1080,27 +1111,93 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn,
                 stripeinfo[which].subcount++;
         }
 
+        /* For each target to which we are writing -- some stripes might have
+         * zero pages to write, e.g. the write is < stripe_count *stripe_width
+         * -- call obd_brw for the range of brw_pages sent to that target.
+         * ([offset, count] will be A:[0, 3], B:[3, 3], C:[6, 2] for the
+         * example above.)
+         */
         for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
                 int shift = si->index;
 
                 if (si->bufct) {
+                        struct io_cb_data *data = &cb_data[i];
                         LASSERT(shift < oa_bufs);
+
+                        /* This looks like ll_init_cb, except in-place. */
+                        init_waitqueue_head(&data->waitq);
+                        atomic_set(&data->refcount, 2);
+                        data->data = &lbc;
+                        data->cb = callback;
+
                         /* XXX handle error returns here */
-                        obd_brw(cmd, &lov->tgts[si->ost_idx].conn,
-                                &si->lsm, si->bufct, &ioarr[shift],
-                                lov_osc_brw_callback, our_cb);
+                        rc = obd_brw(cmd, &lov->tgts[si->ost_idx].conn,
+                                     &si->lsm, si->bufct, &ioarr[shift],
+                                     lov_osc_brw_callback, data);
+
+                        /* On error, pretend this didn't exist, because we won't
+                         * have seen a START call to add a ref to this OBD's
+                         * desc, and so we don't want to muddle with the
+                         * likely-deleted desc below.
+                         */
+                        if (rc)
+                                si->bufct = 0;
+                                
                 }
         }
 
-        rc = callback(cbd, 0, CB_PHASE_START);
+        /* A brief note on the recovery story here:
+         *
+         * Each obd_brw gets its own io_cb_data, and they're all fused into a
+         * single allocation (cb_data).  The lov_osc_brw_callback invocation
+         * that results from each obd_brw's underlying bulk send/recv completing
+         * will mark that io_cb_data as complete, and decrement the
+         * lbc_remaining count in the LOV's "master" callback data.
+         *
+         * The LOV will go to sleep as soon as all the (async) obd_brws have
+         * been started.  lov_osc_brw_callback will wake it up iff all OSCs have
+         * completed (lbc_remaining has reached zero).  If the timeout expires,
+         * the LOV will walk the cb_data vector and initiate recovery on any
+         * connection associated with an as-yet-incomplete desc.
+         */
+
+        /* XXX Make sure that the callback doesn't block here, by faking
+         * XXX "completion".  This is very very gross, and we might be
+         * XXX better off just not calling the callback at all.
+         */
+        cbd->complete = 1;
+        (void)callback(cbd, 0, CB_PHASE_START);
+        /* XXX Watch us ignore the return code! */
+
+        lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, NULL, NULL, NULL);
+        rc = l_wait_event(lbc.lbc_waitq, atomic_read(&lbc.lbc_remaining) == 0, 
+                          &lwi);
+
+        for (i = 0; i < oa_bufs; i++) {
+                if (stripeinfo[i].bufct == 0)
+                        continue;
+
+                if (!cb_data[i].complete) {
+                        CERROR("invoking recovery for OSC %s: %d\n",
+                               lov->tgts[stripeinfo[i].ost_idx].uuid, rc);
+                        recovd_conn_fail(cb_data[i].desc->bd_connection);
+                }
+                ptlrpc_bulk_decref(cb_data[i].desc);
+        }
+        
+        (void)callback(cbd, 0, CB_PHASE_FINISH);
+        /* XXX We need an error reporting/bytes-written story here, statim. */
+        
+        rc = 0;
 
+        OBD_FREE(cb_data, sizeof(*cb_data) * oa_bufs);
+ out_ioarr:
         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
  out_where:
         OBD_FREE(where, sizeof(*where) * oa_bufs);
  out_sinfo:
         OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
- out_cbdata:
-        OBD_FREE(our_cb, sizeof(*our_cb));
+
         RETURN(rc);
 }
 
@@ -1256,6 +1353,9 @@ static int lov_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
                 int err;
 
+                if (!lov->tgts[i].active)
+                        continue;
+
                 err = obd_statfs(&lov->tgts[i].conn, &lov_sfs);
                 if (err) {
                         CERROR("Error statfs OSC %s idx %d: err = %d\n",
index aaa0316..9279dc1 100644 (file)
@@ -37,7 +37,11 @@ static int sync_io_timeout(void *data)
         LASSERT(cbd);
         desc = cbd->desc;
 
-        LASSERT(desc);
+        if (!desc) {
+                CERROR("no desc for timed-out BRW, reopen Bugzilla 214!\n");
+                RETURN(0); /* back to sleep -- someone had better wake us up! */
+        }
+
         LASSERT(desc->bd_connection);
 
         CERROR("IO of %d pages to/from %s:%d (conn %p) timed out\n",
@@ -79,15 +83,15 @@ int ll_sync_io_cb(struct io_cb_data *data, int err, int phase)
                 if (atomic_dec_and_test(&data->refcount))
                         OBD_FREE(data, sizeof(*data));
                 if (ret == -EINTR)
-                        return ret;
+                        RETURN(ret);
         } else if (phase == CB_PHASE_FINISH) {
                 data->err = err;
                 data->complete = 1;
                 wake_up(&data->waitq);
                 if (atomic_dec_and_test(&data->refcount))
                         OBD_FREE(data, sizeof(*data));
-                return err;
-        } else
+                RETURN(err);
+        } else                
                 LBUG();
         EXIT;
         return 0;
index 9e7d257..2bc9093 100644 (file)
@@ -391,6 +391,7 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
 
         cb_data->callback = callback;
         cb_data->cb_data = data;
+        CDEBUG(D_PAGE, "data(%p)->desc = %p\n", data, desc);
         data->desc = desc;
         desc->bd_cb_data = cb_data;
 
@@ -509,6 +510,7 @@ static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
 
         cb_data->callback = callback;
         cb_data->cb_data = data;
+        CDEBUG(D_PAGE, "data(%p)->desc = %p\n", data, desc);
         data->desc = desc;
         desc->bd_cb_data = cb_data;
 
index 180c339..5edef0c 100644 (file)
@@ -116,10 +116,9 @@ void recovd_conn_fail(struct ptlrpc_connection *conn)
         }
 
         spin_lock(&recovd->recovd_lock);
-        if (rd->rd_phase != RD_IDLE) {
-                CERROR("connection %p to %s already in recovery\n",
+        if (rd->rd_phase == RD_TROUBLED || rd->rd_phase == RD_PREPARING) {
+                CDEBUG(D_HA, "connection %p to %s already in recovery\n",
                        conn, conn->c_remote_uuid);
-                /* XXX need to distinguish from failure-in-recovery */
                 spin_unlock(&recovd->recovd_lock);
                 EXIT;
                 return;
@@ -128,6 +127,13 @@ void recovd_conn_fail(struct ptlrpc_connection *conn)
         CERROR("connection %p to %s failed\n", conn, conn->c_remote_uuid);
         list_del(&rd->rd_managed_chain);
         list_add_tail(&rd->rd_managed_chain, &recovd->recovd_troubled_items);
+        if (rd->rd_phase != RD_IDLE) {
+                CDEBUG(D_HA,
+                       "connection %p to %s failed in recovery: restarting\n",
+                       conn, conn->c_remote_uuid);
+                /* XXX call callback with PHASE_FAILED? */
+                rd->rd_next_phase = RD_TROUBLED;
+        }
         rd->rd_phase = RD_TROUBLED;
         dump_lists(recovd);
         spin_unlock(&recovd->recovd_lock);