From: shaver Date: Mon, 12 Aug 2002 19:53:27 +0000 (+0000) Subject: First steps at getting recovery back off the ground: X-Git-Tag: 0.5.5~167 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=e726878d613b55e9ff413d0710ecde0fda58edc0;p=fs%2Flustre-release.git First steps at getting recovery back off the ground: * make the callback data parameter to brw functions be strongly typed as cb_io_data. LOV and other non-OSC users of these facilities should "inherit" from this struct, see lov_callback_data for an example. * replace l_wait_event_killable and some wait_event calls with l_wait_event, an all-singing, all-dancing timeout- and interrupt-handling event waiting macro. More such replacement to come. * interrupt and timeout handling of bulk data will probably crash at present, but it didn't really work before either -- I'll fix it up ASAP. --- diff --git a/lustre/include/linux/lustre_lib.h b/lustre/include/linux/lustre_lib.h index 4e0f03f..2cdaddb 100644 --- a/lustre/include/linux/lustre_lib.h +++ b/lustre/include/linux/lustre_lib.h @@ -67,8 +67,9 @@ struct io_cb_data { atomic_t refcount; int complete; int err; + struct ptlrpc_bulk_desc *desc; }; -int ll_sync_io_cb(void *data, int err, int phase); +int ll_sync_io_cb(struct io_cb_data *data, int err, int phase); struct io_cb_data *ll_init_cb(void); inline void lustre_put_page(struct page *page); struct page *lustre_get_page_read(struct inode *dir, unsigned long index); @@ -407,77 +408,94 @@ static inline int obd_ioctl_getdata(char **buf, int *len, void *arg) #define OBD_IOC_DEC_FS_USE_COUNT _IO ('f', 133 ) +struct l_wait_info { + long lwi_timeout; + int (*lwi_on_timeout)(void *); + long lwi_signals; + int (*lwi_on_signal)(void *); /* XXX return is ignored for now */ + void *lwi_cb_data; +}; +#define LWI_TIMEOUT(time, cb, data) \ +((struct l_wait_info) { \ + lwi_timeout: time, \ + lwi_on_timeout: cb, \ + lwi_cb_data: data \ +}) +#define LWI_INTR(signals, cb, data) \ +((struct l_wait_info) { \ + lwi_signals: signals, \ + lwi_on_signal: cb, \ + lwi_cb_data: data \ +}) +#define LWI_TIMEOUT_INTR(time, time_cb, signals, sig_cb, data) \ +((struct l_wait_info) { \ + lwi_timeout: time, \ + lwi_on_timeout: time_cb, \ + lwi_signals: signals, \ + lwi_on_signal: sig_cb, \ + lwi_cb_data: data \ +}) /* XXX this should be one mask-check */ -#define l_killable_pending(task) \ -(sigismember(&(task->pending.signal), SIGKILL) || \ - sigismember(&(task->pending.signal), SIGINT) || \ +#define l_killable_pending(task) \ +(sigismember(&(task->pending.signal), SIGKILL) || \ + sigismember(&(task->pending.signal), SIGINT) || \ sigismember(&(task->pending.signal), SIGTERM)) -/* - * Like wait_event_interruptible, but we're only interruptible by - * KILL, INT, or TERM. - * - * XXXshaver These are going away soon, I hope. - */ -#define __l_wait_event_killable(wq, condition, ret) \ -do { \ - wait_queue_t __wait; \ - init_waitqueue_entry(&__wait, current); \ - \ - add_wait_queue(&wq, &__wait); \ - for (;;) { \ - set_current_state(TASK_INTERRUPTIBLE); \ - if (condition) \ - break; \ - if (!signal_pending(current) || \ - !l_killable_pending(current)) { \ - schedule(); \ - continue; \ - } \ - ret = -ERESTARTSYS; \ - break; \ - } \ - current->state = TASK_RUNNING; \ - remove_wait_queue(&wq, &__wait); \ -} while(0) - -#define l_wait_event_killable(wq, condition) \ -({ \ - int __ret = 0; \ - if (!(condition)) \ - __l_wait_event_killable(wq, condition, __ret); \ - __ret; \ -}) - -#define __l_wait_event_timeout(wq, condition, timeout, ret) \ -do { \ - wait_queue_t __wait; \ - init_waitqueue_entry(&__wait, current); \ - \ - add_wait_queue(&wq, &__wait); \ - for (;;) { \ - set_current_state(TASK_INTERRUPTIBLE); \ - if (condition) \ - break; \ - if (timeout) \ - schedule_timeout(timeout); \ - else \ - schedule(); \ - } \ - current->state = TASK_RUNNING; \ - remove_wait_queue(&wq, &__wait); \ +#define __l_wait_event(wq, condition, info, ret) \ +do { \ + wait_queue_t __wait; \ + long __state; \ + init_waitqueue_entry(&__wait, current); \ + \ + add_wait_queue(&wq, &__wait); \ + __state = TASK_UNINTERRUPTIBLE; \ + for (;;) { \ + set_current_state(__state); \ + if (condition) \ + break; \ + /* We only become INTERRUPTIBLE if a timeout has fired, and \ + * the caller has given us some signals to care about. \ + * \ + * XXXshaver we should check against info->wli_signals here, \ + * XXXshaver instead of just using l_killable_pending, perhaps. \ + */ \ + if (__state == TASK_INTERRUPTIBLE && \ + l_killable_pending(current)) { \ + if (info->lwi_on_signal) \ + info->lwi_on_signal(info->lwi_cb_data); \ + ret = -EINTR; \ + break; \ + } \ + if (info->lwi_timeout) { \ + if (schedule_timeout(info->lwi_timeout) == 0) { \ + /* We'll take signals only after a timeout. */ \ + if (info->lwi_signals) \ + __state = TASK_INTERRUPTIBLE; \ + if (info->lwi_on_timeout && \ + info->lwi_on_timeout(info->lwi_cb_data)) { \ + ret = -ETIMEDOUT; \ + break; \ + } \ + } \ + } else { \ + schedule(); \ + } \ + } \ + current->state = TASK_RUNNING; \ + remove_wait_queue(&wq, &__wait); \ } while(0) -#define l_wait_event_timeout(wq, condition, timeout) \ -({ \ - int __ret = 0; \ - if (!(condition)) \ - __l_wait_event_timeout(wq, condition, timeout, __ret); \ - __ret; \ +#define l_wait_event(wq, condition, info) \ +({ \ + int __ret = 0; \ + struct l_wait_info *__info = (info); \ + if (!(condition)) \ + __l_wait_event(wq, condition, __info, __ret); \ + __ret; \ }) #endif /* _LUSTRE_LIB_H */ diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h index 6156266..27aed58 100644 --- a/lustre/include/linux/obd.h +++ b/lustre/include/linux/obd.h @@ -23,7 +23,8 @@ struct obd_type { int typ_refcnt; }; -typedef int (*brw_callback_t)(void *, int err, int phase); +struct io_cb_data; +typedef int (*brw_callback_t)(struct io_cb_data *, int err, int phase); struct brw_page { struct page *pg; obd_size count; @@ -206,6 +207,7 @@ struct obd_device { } u; }; +struct io_cb_data; struct obd_ops { int (*o_iocontrol)(long cmd, struct lustre_handle *, int len, @@ -241,7 +243,7 @@ struct obd_ops { int (*o_brw)(int rw, struct lustre_handle *conn, struct lov_stripe_md *md, obd_count oa_bufs, struct brw_page *pgarr, brw_callback_t callback, - void * data); + struct io_cb_data *data); int (*o_punch)(struct lustre_handle *conn, struct obdo *tgt, struct lov_stripe_md *md, obd_size count, obd_off offset); diff --git a/lustre/lib/page.c b/lustre/lib/page.c index 51bb5f5..54ed7db 100644 --- a/lustre/lib/page.c +++ b/lustre/lib/page.c @@ -48,26 +48,61 @@ #include #include #include +#include +static int sync_io_timeout(void *data) +{ + struct io_cb_data *cbd = data; + struct ptlrpc_bulk_desc *desc = cbd->desc; + + ENTRY; + desc->b_connection->c_level = LUSTRE_CONN_RECOVD; + desc->b_flags |= PTL_RPC_FL_TIMEOUT; + if (desc->b_client && desc->b_client->cli_recovd) { + /* XXXshaver Do we need a resend strategy, or do we just + * XXXshaver return -ERESTARTSYS and punt it? + */ +#if 0 + recovd_cli_fail(desc->b_client); +#endif + } + + /* We go back to sleep, until we're resumed or interrupted. */ + RETURN(0); +} + +static int sync_io_intr(void *data) +{ + struct io_cb_data *cbd = data; + struct ptlrpc_bulk_desc *desc = cbd->desc; + + ENTRY; + desc->b_flags |= PTL_RPC_FL_INTR; + RETURN(1); /* ignored, as of this writing */ +} -int ll_sync_io_cb(void *data, int err, int phase) +int ll_sync_io_cb(struct io_cb_data *data, int err, int phase) { - struct io_cb_data *d = data; int ret; ENTRY; if (phase == CB_PHASE_START) { - ret = l_wait_event_killable(d->waitq, d->complete); - if (atomic_dec_and_test(&d->refcount)) - OBD_FREE(d, sizeof(*d)); +#warning shaver hardcoded timeout + struct l_wait_info lwi; + lwi = LWI_TIMEOUT_INTR(100, sync_io_timeout, + SIGTERM | SIGKILL | SIGINT, sync_io_intr, + data); + ret = l_wait_event(data->waitq, data->complete, &lwi); + if (atomic_dec_and_test(&data->refcount)) + OBD_FREE(data, sizeof(*data)); if (ret == -ERESTARTSYS) return ret; } else if (phase == CB_PHASE_FINISH) { - d->err = err; - d->complete = 1; - wake_up(&d->waitq); - if (atomic_dec_and_test(&d->refcount)) - OBD_FREE(d, sizeof(*d)); + data->err = err; + data->complete = 1; + wake_up(&data->waitq); + if (atomic_dec_and_test(&data->refcount)) + OBD_FREE(data, sizeof(*data)); return err; } else LBUG(); @@ -75,7 +110,7 @@ int ll_sync_io_cb(void *data, int err, int phase) return 0; } -struct io_cb_data *ll_init_cb(void) +struct io_cb_data *ll_init_cb(void) { struct io_cb_data *d; diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index d00296e..ebb225c 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -511,30 +511,28 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa, } struct lov_callback_data { - atomic_t count; - struct io_cb_data *cbd; - brw_callback_t cb; - int err; + struct io_cb_data cbd; + brw_callback_t cb; }; -int lov_osc_brw_callback(void *data, int err, int phase) +int lov_osc_brw_callback(struct io_cb_data *data, int err, int phase) { - struct lov_callback_data *d = data; + struct lov_callback_data *lovcbd = (struct lov_callback_data *)data; int ret = 0; ENTRY; - if (phase == CB_PHASE_START) { + if (phase == CB_PHASE_START) RETURN(0); - } else if (phase == CB_PHASE_FINISH) { + + if (phase == CB_PHASE_FINISH) { if (err) - d->err = err; - if (atomic_dec_and_test(&d->count)) { - ret = d->cb(d->cbd, 0, d->err); - } + lovcbd->cbd.err = err; + if (atomic_dec_and_test(&lovcbd->cbd.refcount)) + ret = lovcbd->cb(&lovcbd->cbd, 0, lovcbd->cbd.err); RETURN(ret); - } else - LBUG(); - EXIT; + } + + LBUG(); return 0; } @@ -542,7 +540,7 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn, struct lov_stripe_md *md, obd_count oa_bufs, struct brw_page *pga, - brw_callback_t callback, void *data) + brw_callback_t callback, struct io_cb_data *data) { int stripe_count = md->lmd_stripe_count; struct obd_export *export = class_conn2export(conn); @@ -598,18 +596,18 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn, stripeinfo[which].subcount++; } + lov_cb_data->cbd = *data; lov_cb_data->cb = callback; - lov_cb_data->cbd = data; - atomic_set(&lov_cb_data->count, oa_bufs); + atomic_set(&lov_cb_data->cbd.refcount, oa_bufs); for (i=0 ; i < stripe_count ; i++) { int shift = stripeinfo[i].index; obd_brw(cmd, &lov->tgts[i].conn, &stripeinfo[i].md, stripeinfo[i].bufct, &ioarr[shift], - lov_osc_brw_callback, &lov_cb_data); + lov_osc_brw_callback, (struct io_cb_data *)lov_cb_data); } - rc = callback(lov_cb_data, 0, CB_PHASE_START); + rc = callback((struct io_cb_data *)lov_cb_data, 0, CB_PHASE_START); RETURN(rc); } diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 3f960a4..4bb1dc7 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -25,8 +25,7 @@ #include #include #include - - +#include static int osc_getattr(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md *md) @@ -319,9 +318,9 @@ static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data) int err = 0; ENTRY; - if (desc->b_flags & PTL_RPC_FL_INTR) { - err = -ERESTARTSYS; - CERROR("got signal\n"); + if (desc->b_flags & PTL_RPC_FL_TIMEOUT) { + err = (desc->b_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS : + -ETIMEDOUT); } if (cb_data->callback) @@ -339,7 +338,9 @@ static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data) EXIT; } -static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md, obd_count page_count, struct brw_page *pga, brw_callback_t callback, void *data) +static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md, + obd_count page_count, struct brw_page *pga, + brw_callback_t callback, struct io_cb_data *data) { struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn; struct ptlrpc_request *request = NULL; @@ -453,7 +454,7 @@ out_unmap: static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md, obd_count page_count, struct brw_page *pga, - brw_callback_t callback, void *data) + brw_callback_t callback, struct io_cb_data *data) { struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn; struct ptlrpc_request *request = NULL; @@ -581,7 +582,7 @@ out_cb: static int osc_brw(int cmd, struct lustre_handle *conn, struct lov_stripe_md *md, obd_count page_count, struct brw_page *pagear, brw_callback_t callback, - void *data) + struct io_cb_data *data) { if (cmd & OBD_BRW_WRITE) return osc_brw_write(conn, md, page_count, pagear, callback, data); diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index d830e82a..31d4e91 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -269,6 +269,7 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req) GOTO(out, rc = 1); } +#if 0 if (req->rq_flags & PTL_RPC_FL_RESEND) { if (l_killable_pending(current)) { CERROR("-- INTR --\n"); @@ -278,6 +279,7 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req) CERROR("-- RESEND --\n"); GOTO(out, rc = 1); } +#endif if (req->rq_flags & PTL_RPC_FL_RECOVERY) { CERROR("-- RESTART --\n"); @@ -470,9 +472,38 @@ void ptlrpc_restart_req(struct ptlrpc_request *req) EXIT; } +static int expired_request(void *data) +{ + struct ptlrpc_request *req = data; + + ENTRY; + req->rq_timeout = 0; + req->rq_connection->c_level = LUSTRE_CONN_RECOVD; + req->rq_flags |= PTL_RPC_FL_TIMEOUT; + /* Activate the recovd for this client, if there is one. */ + if (req->rq_client && req->rq_client->cli_recovd) + recovd_cli_fail(req->rq_client); + + /* If this request is for recovery or other primordial tasks, + * don't go back to sleep. + */ + if (req->rq_level < LUSTRE_CONN_FULL) + RETURN(1); + RETURN(0); +} + +static int interrupted_request(void *data) +{ + struct ptlrpc_request *req = data; + ENTRY; + req->rq_flags |= PTL_RPC_FL_INTR; + RETURN(1); /* ignored, as of this writing */ +} + int ptlrpc_queue_wait(struct ptlrpc_request *req) { - int rc = 0, timeout; + int rc = 0; + struct l_wait_info lwi; struct ptlrpc_client *cli = req->rq_client; ENTRY; @@ -485,16 +516,22 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) if (req->rq_level > req->rq_connection->c_level) { CERROR("process %d waiting for recovery (%d > %d)\n", current->pid, req->rq_level, req->rq_connection->c_level); + spin_lock(&cli->cli_lock); list_del_init(&req->rq_list); list_add_tail(&req->rq_list, &cli->cli_delayed_head); spin_unlock(&cli->cli_lock); - l_wait_event_killable - (req->rq_wait_for_rep, - req->rq_level <= req->rq_connection->c_level); + +#warning shaver: what happens when we get interrupted during this wait? + lwi = LWI_INTR(SIGTERM | SIGKILL | SIGINT, NULL, NULL); + l_wait_event(req->rq_wait_for_rep, + req->rq_level <= req->rq_connection->c_level, + &lwi); + spin_lock(&cli->cli_lock); list_del_init(&req->rq_list); spin_unlock(&cli->cli_lock); + CERROR("process %d resumed\n", current->pid); } resend: @@ -516,22 +553,23 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) spin_unlock(&cli->cli_lock); CDEBUG(D_OTHER, "-- sleeping\n"); - /* - * req->rq_timeout gets reset in the timeout case, and - * l_wait_event_timeout is a macro, so save the timeout value here. - */ - timeout = req->rq_timeout * HZ; - l_wait_event_timeout(req->rq_wait_for_rep, ptlrpc_check_reply(req), - timeout); + lwi = LWI_TIMEOUT_INTR(req->rq_timeout * HZ, expired_request, + SIGKILL | SIGTERM | SIGINT, interrupted_request, + req); + l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi); CDEBUG(D_OTHER, "-- done\n"); - if (req->rq_flags & PTL_RPC_FL_RESEND) { + /* Don't resend if we were interrupted. */ + if ((req->rq_flags & (PTL_RPC_FL_RESEND | PTL_RPC_FL_INTR)) == + PTL_RPC_FL_RESEND) { req->rq_flags &= ~PTL_RPC_FL_RESEND; goto resend; } up(&cli->cli_rpc_sem); if (req->rq_flags & PTL_RPC_FL_INTR) { + if (!(req->rq_flags & PTL_RPC_FL_TIMEOUT)) + LBUG(); /* should only be interrupted if we timed out. */ /* Clean up the dangling reply buffers */ ptlrpc_abort(req); GOTO(out, rc = -EINTR); @@ -568,6 +606,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req) { int rc = 0; struct ptlrpc_client *cli = req->rq_client; + struct l_wait_info lwi = LWI_INTR(SIGKILL|SIGTERM|SIGINT, NULL, NULL); ENTRY; init_waitqueue_head(&req->rq_wait_for_rep); @@ -586,7 +625,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req) } CDEBUG(D_OTHER, "-- sleeping\n"); - l_wait_event_killable(req->rq_wait_for_rep, ptlrpc_check_reply(req)); + l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi); CDEBUG(D_OTHER, "-- done\n"); up(&cli->cli_rpc_sem);