Whamcloud - gitweb
First steps at getting recovery back off the ground:
authorshaver <shaver>
Mon, 12 Aug 2002 19:53:27 +0000 (19:53 +0000)
committershaver <shaver>
Mon, 12 Aug 2002 19:53:27 +0000 (19:53 +0000)
* make the callback data parameter to brw functions be strongly typed as
  cb_io_data.  LOV and other non-OSC users of these facilities should
  "inherit" from this struct, see lov_callback_data for an example.
* replace l_wait_event_killable and some wait_event calls with
  l_wait_event, an all-singing, all-dancing timeout- and interrupt-handling
  event waiting macro.  More such replacement to come.
* interrupt and timeout handling of bulk data will probably crash at present,
  but it didn't really work before either -- I'll fix it up ASAP.

lustre/include/linux/lustre_lib.h
lustre/include/linux/obd.h
lustre/lib/page.c
lustre/lov/lov_obd.c
lustre/osc/osc_request.c
lustre/ptlrpc/client.c

index 4e0f03f..2cdaddb 100644 (file)
@@ -67,8 +67,9 @@ struct io_cb_data {
         atomic_t refcount;
         int complete;
         int err;
+        struct ptlrpc_bulk_desc *desc;
 };
-int ll_sync_io_cb(void *data, int err, int phase);
+int ll_sync_io_cb(struct io_cb_data *data, int err, int phase);
 struct  io_cb_data *ll_init_cb(void);
 inline void lustre_put_page(struct page *page);
 struct page *lustre_get_page_read(struct inode *dir, unsigned long index);
@@ -407,77 +408,94 @@ static inline int obd_ioctl_getdata(char **buf, int *len, void *arg)
 
 #define OBD_IOC_DEC_FS_USE_COUNT       _IO  ('f', 133      )
 
+struct l_wait_info {
+        long   lwi_timeout;
+        int  (*lwi_on_timeout)(void *);
+        long   lwi_signals;
+        int  (*lwi_on_signal)(void *); /* XXX return is ignored for now */
+        void  *lwi_cb_data;
+};
 
+#define LWI_TIMEOUT(time, cb, data)                                             \
+((struct l_wait_info) {                                                         \
+        lwi_timeout:    time,                                                   \
+        lwi_on_timeout: cb,                                                     \
+        lwi_cb_data:    data                                                    \
+})
 
+#define LWI_INTR(signals, cb, data)                                             \
+((struct l_wait_info) {                                                         \
+        lwi_signals:   signals,                                                 \
+        lwi_on_signal: cb,                                                      \
+        lwi_cb_data:   data                                                     \
+})
 
+#define LWI_TIMEOUT_INTR(time, time_cb, signals, sig_cb, data)                  \
+((struct l_wait_info) {                                                         \
+        lwi_timeout:    time,                                                   \
+        lwi_on_timeout: time_cb,                                                \
+        lwi_signals:    signals,                                                \
+        lwi_on_signal:  sig_cb,                                                 \
+        lwi_cb_data:    data                                                    \
+})
 
 /* XXX this should be one mask-check */
-#define l_killable_pending(task)                                               \
-(sigismember(&(task->pending.signal), SIGKILL) ||                              \
- sigismember(&(task->pending.signal), SIGINT) ||                               \
+#define l_killable_pending(task)                                                \
+(sigismember(&(task->pending.signal), SIGKILL) ||                               \
+ sigismember(&(task->pending.signal), SIGINT) ||                                \
  sigismember(&(task->pending.signal), SIGTERM))
 
-/*
- * Like wait_event_interruptible, but we're only interruptible by
- * KILL, INT, or TERM.
- *
- * XXXshaver These are going away soon, I hope.
- */
-#define __l_wait_event_killable(wq, condition, ret)                          \
-do {                                                                         \
-        wait_queue_t __wait;                                                 \
-        init_waitqueue_entry(&__wait, current);                              \
-                                                                             \
-        add_wait_queue(&wq, &__wait);                                        \
-        for (;;) {                                                           \
-                set_current_state(TASK_INTERRUPTIBLE);                       \
-                if (condition)                                               \
-                        break;                                               \
-                if (!signal_pending(current) ||                              \
-                    !l_killable_pending(current)) {                          \
-                        schedule();                                          \
-                        continue;                                            \
-                }                                                            \
-                ret = -ERESTARTSYS;                                          \
-                break;                                                       \
-        }                                                                    \
-        current->state = TASK_RUNNING;                                       \
-        remove_wait_queue(&wq, &__wait);                                     \
-} while(0)
-
-#define l_wait_event_killable(wq, condition)                            \
-({                                                                      \
-        int __ret = 0;                                                  \
-        if (!(condition))                                               \
-                __l_wait_event_killable(wq, condition, __ret);          \
-        __ret;                                                          \
-})
-
-#define __l_wait_event_timeout(wq, condition, timeout, ret)                   \
-do {                                                                          \
-        wait_queue_t __wait;                                                  \
-        init_waitqueue_entry(&__wait, current);                               \
-                                                                              \
-        add_wait_queue(&wq, &__wait);                                         \
-        for (;;) {                                                            \
-                set_current_state(TASK_INTERRUPTIBLE);                        \
-                if (condition)                                                \
-                        break;                                                \
-                if (timeout)                                                  \
-                        schedule_timeout(timeout);                            \
-                else                                                          \
-                        schedule();                                           \
-        }                                                                     \
-        current->state = TASK_RUNNING;                                        \
-        remove_wait_queue(&wq, &__wait);                                      \
+#define __l_wait_event(wq, condition, info, ret)                                \
+do {                                                                            \
+        wait_queue_t __wait;                                                    \
+        long __state;                                                           \
+        init_waitqueue_entry(&__wait, current);                                 \
+                                                                                \
+        add_wait_queue(&wq, &__wait);                                           \
+        __state = TASK_UNINTERRUPTIBLE;                                         \
+        for (;;) {                                                              \
+                set_current_state(__state);                                     \
+                if (condition)                                                  \
+                        break;                                                  \
+                /* We only become INTERRUPTIBLE if a timeout has fired, and     \
+                 * the caller has given us some signals to care about.          \
+                 *                                                              \
+                 * XXXshaver we should check against info->wli_signals here,    \
+                 * XXXshaver instead of just using l_killable_pending, perhaps. \
+                 */                                                             \
+                if (__state == TASK_INTERRUPTIBLE &&                            \
+                    l_killable_pending(current)) {                              \
+                        if (info->lwi_on_signal)                                \
+                                info->lwi_on_signal(info->lwi_cb_data);         \
+                        ret = -EINTR;                                           \
+                        break;                                                  \
+                }                                                               \
+                if (info->lwi_timeout) {                                        \
+                        if (schedule_timeout(info->lwi_timeout) == 0) {         \
+                                /* We'll take signals only after a timeout. */  \
+                                if (info->lwi_signals)                          \
+                                        __state = TASK_INTERRUPTIBLE;           \
+                                if (info->lwi_on_timeout &&                     \
+                                    info->lwi_on_timeout(info->lwi_cb_data)) {  \
+                                        ret = -ETIMEDOUT;                       \
+                                        break;                                  \
+                                }                                               \
+                        }                                                       \
+                } else {                                                        \
+                        schedule();                                             \
+                }                                                               \
+        }                                                                       \
+        current->state = TASK_RUNNING;                                          \
+        remove_wait_queue(&wq, &__wait);                                        \
 } while(0)
 
-#define l_wait_event_timeout(wq, condition, timeout)                          \
-({                                                                            \
-        int __ret = 0;                                                        \
-        if (!(condition))                                                     \
-                __l_wait_event_timeout(wq, condition, timeout, __ret);        \
-        __ret;                                                                \
+#define l_wait_event(wq, condition, info)                                       \
+({                                                                              \
+        int __ret = 0;                                                          \
+        struct l_wait_info *__info = (info);                                    \
+        if (!(condition))                                                       \
+                __l_wait_event(wq, condition, __info, __ret);                   \
+        __ret;                                                                  \
 })
 
 #endif /* _LUSTRE_LIB_H */
index 6156266..27aed58 100644 (file)
@@ -23,7 +23,8 @@ struct obd_type {
         int  typ_refcnt;
 };
 
-typedef int (*brw_callback_t)(void *, int err, int phase);
+struct io_cb_data;
+typedef int (*brw_callback_t)(struct io_cb_data *, int err, int phase);
 struct brw_page { 
         struct page *pg;
         obd_size count;
@@ -206,6 +207,7 @@ struct obd_device {
         } u;
 };
 
+struct io_cb_data;
 
 struct obd_ops {
         int (*o_iocontrol)(long cmd, struct lustre_handle *, int len,
@@ -241,7 +243,7 @@ struct obd_ops {
         int (*o_brw)(int rw, struct lustre_handle *conn,
                      struct lov_stripe_md *md, obd_count oa_bufs,
                      struct brw_page *pgarr, brw_callback_t callback, 
-                     void * data);
+                     struct io_cb_data *data);
         int (*o_punch)(struct lustre_handle *conn, struct obdo *tgt,
                        struct lov_stripe_md *md, obd_size count,
                        obd_off offset);
index 51bb5f5..54ed7db 100644 (file)
 #include <linux/obd_class.h>
 #include <linux/lustre_net.h>
 #include <linux/lustre_lib.h>
+#include <linux/lustre_ha.h>
 
+static int sync_io_timeout(void *data)
+{
+        struct io_cb_data *cbd = data;
+        struct ptlrpc_bulk_desc *desc = cbd->desc;
+
+        ENTRY;
+        desc->b_connection->c_level = LUSTRE_CONN_RECOVD;
+        desc->b_flags |= PTL_RPC_FL_TIMEOUT;
+        if (desc->b_client && desc->b_client->cli_recovd) {
+                /* XXXshaver Do we need a resend strategy, or do we just
+                 * XXXshaver return -ERESTARTSYS and punt it?
+                 */
+#if 0
+                recovd_cli_fail(desc->b_client);
+#endif
+        }
+
+        /* We go back to sleep, until we're resumed or interrupted. */
+        RETURN(0);
+}
+
+static int sync_io_intr(void *data)
+{
+        struct io_cb_data *cbd = data;
+        struct ptlrpc_bulk_desc *desc = cbd->desc;
+
+        ENTRY;
+        desc->b_flags |= PTL_RPC_FL_INTR;
+        RETURN(1); /* ignored, as of this writing */
+}
 
-int ll_sync_io_cb(void *data, int err, int phase)
+int ll_sync_io_cb(struct io_cb_data *data, int err, int phase)
 {
-        struct io_cb_data *d = data;
         int ret;
         ENTRY; 
 
         if (phase == CB_PHASE_START) { 
-                ret = l_wait_event_killable(d->waitq, d->complete);
-                if (atomic_dec_and_test(&d->refcount))
-                        OBD_FREE(d, sizeof(*d));
+#warning shaver hardcoded timeout
+                struct l_wait_info lwi;
+                lwi = LWI_TIMEOUT_INTR(100, sync_io_timeout,
+                                       SIGTERM | SIGKILL | SIGINT, sync_io_intr,
+                                       data);
+                ret = l_wait_event(data->waitq, data->complete, &lwi);
+                if (atomic_dec_and_test(&data->refcount))
+                        OBD_FREE(data, sizeof(*data));
                 if (ret == -ERESTARTSYS)
                         return ret;
         } else if (phase == CB_PHASE_FINISH) { 
-                d->err = err;
-                d->complete = 1;
-                wake_up(&d->waitq); 
-                if (atomic_dec_and_test(&d->refcount))
-                        OBD_FREE(d, sizeof(*d));
+                data->err = err;
+                data->complete = 1;
+                wake_up(&data->waitq); 
+                if (atomic_dec_and_test(&data->refcount))
+                        OBD_FREE(data, sizeof(*data));
                 return err;
         } else 
                 LBUG();
@@ -75,7 +110,7 @@ int ll_sync_io_cb(void *data, int err, int phase)
         return 0;
 }
 
-struct  io_cb_data *ll_init_cb(void)
+struct io_cb_data *ll_init_cb(void)
 {
         struct io_cb_data *d;
 
index d00296e..ebb225c 100644 (file)
@@ -511,30 +511,28 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
 }
 
 struct lov_callback_data {
-        atomic_t count;
-        struct io_cb_data *cbd;
-        brw_callback_t cb;
-        int err;
+        struct io_cb_data cbd;
+        brw_callback_t    cb;
 };
 
-int lov_osc_brw_callback(void *data, int err, int phase)
+int lov_osc_brw_callback(struct io_cb_data *data, int err, int phase)
 {
-        struct lov_callback_data *d = data;
+        struct lov_callback_data *lovcbd = (struct lov_callback_data *)data;
         int ret = 0;
         ENTRY; 
 
-        if (phase == CB_PHASE_START) { 
+        if (phase == CB_PHASE_START)
                 RETURN(0);
-        } else if (phase == CB_PHASE_FINISH) { 
+
+        if (phase == CB_PHASE_FINISH) { 
                 if (err) 
-                        d->err = err;
-                if (atomic_dec_and_test(&d->count)) { 
-                        ret = d->cb(d->cbd, 0, d->err); 
-                }
+                        lovcbd->cbd.err = err;
+                if (atomic_dec_and_test(&lovcbd->cbd.refcount))
+                        ret = lovcbd->cb(&lovcbd->cbd, 0, lovcbd->cbd.err); 
                 RETURN(ret);
-        } else 
-                LBUG();
-        EXIT;
+        }
+
+        LBUG();
         return 0;
 }
 
@@ -542,7 +540,7 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn,
                           struct lov_stripe_md *md, 
                           obd_count oa_bufs,
                           struct brw_page *pga,
-                          brw_callback_t callback, void *data)
+                          brw_callback_t callback, struct io_cb_data *data)
 {
         int stripe_count = md->lmd_stripe_count;
         struct obd_export *export = class_conn2export(conn);
@@ -598,18 +596,18 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn,
                 stripeinfo[which].subcount++;
         }
         
+        lov_cb_data->cbd = *data;
         lov_cb_data->cb = callback;
-        lov_cb_data->cbd = data;
-        atomic_set(&lov_cb_data->count, oa_bufs);
+        atomic_set(&lov_cb_data->cbd.refcount, oa_bufs);
         for (i=0 ; i < stripe_count ; i++) { 
                 int shift = stripeinfo[i].index;
 
                 obd_brw(cmd, &lov->tgts[i].conn, &stripeinfo[i].md, 
                         stripeinfo[i].bufct, &ioarr[shift], 
-                        lov_osc_brw_callback,  &lov_cb_data);
+                        lov_osc_brw_callback, (struct io_cb_data *)lov_cb_data);
         }
 
-        rc = callback(lov_cb_data, 0, CB_PHASE_START);
+        rc = callback((struct io_cb_data *)lov_cb_data, 0, CB_PHASE_START);
 
         RETURN(rc);
 }
index 3f960a4..4bb1dc7 100644 (file)
@@ -25,8 +25,7 @@
 #include <linux/obd_ost.h>
 #include <linux/obd_lov.h>
 #include <linux/init.h>
-
-
+#include <linux/lustre_ha.h>
 
 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa, 
                        struct lov_stripe_md *md)
@@ -319,9 +318,9 @@ static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
         int err = 0;
         ENTRY;
 
-        if (desc->b_flags & PTL_RPC_FL_INTR) {
-                err = -ERESTARTSYS;
-                CERROR("got signal\n");
+        if (desc->b_flags & PTL_RPC_FL_TIMEOUT) {
+                err = (desc->b_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS : 
+                       -ETIMEDOUT);
         }
 
         if (cb_data->callback)
@@ -339,7 +338,9 @@ static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
         EXIT;
 }
 
-static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md, obd_count page_count, struct brw_page *pga, brw_callback_t callback, void *data)
+static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
+                        obd_count page_count, struct brw_page *pga,
+                        brw_callback_t callback, struct io_cb_data *data)
 {
         struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
         struct ptlrpc_request *request = NULL;
@@ -453,7 +454,7 @@ out_unmap:
 static int osc_brw_write(struct lustre_handle *conn,
                          struct lov_stripe_md *md, obd_count page_count,
                          struct brw_page *pga,
-                         brw_callback_t callback, void *data)
+                         brw_callback_t callback, struct io_cb_data *data)
 {
         struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
         struct ptlrpc_request *request = NULL;
@@ -581,7 +582,7 @@ out_cb:
 static int osc_brw(int cmd, struct lustre_handle *conn,
                    struct lov_stripe_md *md, obd_count page_count,
                    struct brw_page *pagear, brw_callback_t callback, 
-                   void *data) 
+                   struct io_cb_data *data) 
 {
         if (cmd & OBD_BRW_WRITE)
                 return osc_brw_write(conn, md, page_count, pagear, callback, data);
index d830e82..31d4e91 100644 (file)
@@ -269,6 +269,7 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req)
                 GOTO(out, rc = 1);
         }
 
+#if 0
         if (req->rq_flags & PTL_RPC_FL_RESEND) { 
                 if (l_killable_pending(current)) {
                         CERROR("-- INTR --\n");
@@ -278,6 +279,7 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req)
                 CERROR("-- RESEND --\n");
                 GOTO(out, rc = 1);
         }
+#endif
 
         if (req->rq_flags & PTL_RPC_FL_RECOVERY) { 
                 CERROR("-- RESTART --\n");
@@ -470,9 +472,38 @@ void ptlrpc_restart_req(struct ptlrpc_request *req)
         EXIT;
 }
 
+static int expired_request(void *data)
+{
+        struct ptlrpc_request *req = data;
+        
+        ENTRY;
+        req->rq_timeout = 0;
+        req->rq_connection->c_level = LUSTRE_CONN_RECOVD;
+        req->rq_flags |= PTL_RPC_FL_TIMEOUT;
+        /* Activate the recovd for this client, if there is one. */
+        if (req->rq_client && req->rq_client->cli_recovd)
+                recovd_cli_fail(req->rq_client);
+
+        /* If this request is for recovery or other primordial tasks,
+         * don't go back to sleep.
+         */
+        if (req->rq_level < LUSTRE_CONN_FULL)
+                RETURN(1);
+        RETURN(0);
+}
+
+static int interrupted_request(void *data)
+{
+        struct ptlrpc_request *req = data;
+        ENTRY;
+        req->rq_flags |= PTL_RPC_FL_INTR;
+        RETURN(1); /* ignored, as of this writing */
+}
+
 int ptlrpc_queue_wait(struct ptlrpc_request *req)
 {
-        int rc = 0, timeout;
+        int rc = 0;
+        struct l_wait_info lwi;
         struct ptlrpc_client *cli = req->rq_client;
         ENTRY;
 
@@ -485,16 +516,22 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
         if (req->rq_level > req->rq_connection->c_level) { 
                 CERROR("process %d waiting for recovery (%d > %d)\n", 
                        current->pid, req->rq_level, req->rq_connection->c_level);
+
                 spin_lock(&cli->cli_lock);
                 list_del_init(&req->rq_list);
                 list_add_tail(&req->rq_list, &cli->cli_delayed_head);
                 spin_unlock(&cli->cli_lock);
-                l_wait_event_killable
-                        (req->rq_wait_for_rep, 
-                         req->rq_level <= req->rq_connection->c_level);
+
+#warning shaver: what happens when we get interrupted during this wait?
+                lwi = LWI_INTR(SIGTERM | SIGKILL | SIGINT, NULL, NULL);
+                l_wait_event(req->rq_wait_for_rep,
+                             req->rq_level <= req->rq_connection->c_level,
+                             &lwi);
+
                 spin_lock(&cli->cli_lock);
                 list_del_init(&req->rq_list);
                 spin_unlock(&cli->cli_lock);
+
                 CERROR("process %d resumed\n", current->pid);
         }
  resend:
@@ -516,22 +553,23 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
         spin_unlock(&cli->cli_lock);
 
         CDEBUG(D_OTHER, "-- sleeping\n");
-        /*
-         * req->rq_timeout gets reset in the timeout case, and
-         * l_wait_event_timeout is a macro, so save the timeout value here.
-         */
-        timeout = req->rq_timeout * HZ;
-        l_wait_event_timeout(req->rq_wait_for_rep, ptlrpc_check_reply(req),
-                             timeout);
+        lwi = LWI_TIMEOUT_INTR(req->rq_timeout * HZ, expired_request,
+                               SIGKILL | SIGTERM | SIGINT, interrupted_request,
+                               req);
+        l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi);
         CDEBUG(D_OTHER, "-- done\n");
 
-        if (req->rq_flags & PTL_RPC_FL_RESEND) {
+        /* Don't resend if we were interrupted. */
+        if ((req->rq_flags & (PTL_RPC_FL_RESEND | PTL_RPC_FL_INTR)) ==
+            PTL_RPC_FL_RESEND) {
                 req->rq_flags &= ~PTL_RPC_FL_RESEND;
                 goto resend;
         }
 
         up(&cli->cli_rpc_sem);
         if (req->rq_flags & PTL_RPC_FL_INTR) {
+                if (!(req->rq_flags & PTL_RPC_FL_TIMEOUT))
+                        LBUG(); /* should only be interrupted if we timed out. */
                 /* Clean up the dangling reply buffers */
                 ptlrpc_abort(req);
                 GOTO(out, rc = -EINTR);
@@ -568,6 +606,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
 {
         int rc = 0;
         struct ptlrpc_client *cli = req->rq_client;
+        struct l_wait_info lwi = LWI_INTR(SIGKILL|SIGTERM|SIGINT, NULL, NULL);
         ENTRY;
 
         init_waitqueue_head(&req->rq_wait_for_rep);
@@ -586,7 +625,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         }
 
         CDEBUG(D_OTHER, "-- sleeping\n");
-        l_wait_event_killable(req->rq_wait_for_rep, ptlrpc_check_reply(req));
+        l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi);
         CDEBUG(D_OTHER, "-- done\n");
 
         up(&cli->cli_rpc_sem);