atomic_t refcount;
int complete;
int err;
+ struct ptlrpc_bulk_desc *desc;
};
-int ll_sync_io_cb(void *data, int err, int phase);
+int ll_sync_io_cb(struct io_cb_data *data, int err, int phase);
struct io_cb_data *ll_init_cb(void);
inline void lustre_put_page(struct page *page);
struct page *lustre_get_page_read(struct inode *dir, unsigned long index);
#define OBD_IOC_DEC_FS_USE_COUNT _IO ('f', 133 )
+/* Parameters for l_wait_event(): sleep bound, interrupting signals,
+ * and callbacks run when the timeout fires or a signal arrives. */
+struct l_wait_info {
+        long lwi_timeout;               /* jiffies; 0 = no timeout, plain schedule() */
+        int (*lwi_on_timeout)(void *);  /* non-zero return aborts the wait (-ETIMEDOUT) */
+        long lwi_signals;               /* signals honoured -- only after a timeout */
+        int (*lwi_on_signal)(void *); /* XXX return is ignored for now */
+        void *lwi_cb_data;              /* opaque argument passed to both callbacks */
+};
+/* Constructors for struct l_wait_info.  These use the old GCC-specific
+ * "tag:" designated-initializer syntax; members not named default to 0,
+ * so LWI_TIMEOUT is never interruptible and LWI_INTR never times out
+ * (and, with no timeout, __l_wait_event never switches to
+ * TASK_INTERRUPTIBLE -- see the NOTE there). */
+#define LWI_TIMEOUT(time, cb, data) \
+((struct l_wait_info) { \
+        lwi_timeout: time, \
+        lwi_on_timeout: cb, \
+        lwi_cb_data: data \
+})
+#define LWI_INTR(signals, cb, data) \
+((struct l_wait_info) { \
+        lwi_signals: signals, \
+        lwi_on_signal: cb, \
+        lwi_cb_data: data \
+})
+#define LWI_TIMEOUT_INTR(time, time_cb, signals, sig_cb, data) \
+((struct l_wait_info) { \
+        lwi_timeout: time, \
+        lwi_on_timeout: time_cb, \
+        lwi_signals: signals, \
+        lwi_on_signal: sig_cb, \
+        lwi_cb_data: data \
+})
/* XXX this should be one mask-check */
-#define l_killable_pending(task) \
-(sigismember(&(task->pending.signal), SIGKILL) || \
- sigismember(&(task->pending.signal), SIGINT) || \
+#define l_killable_pending(task) \
+(sigismember(&(task->pending.signal), SIGKILL) || \
+ sigismember(&(task->pending.signal), SIGINT) || \
sigismember(&(task->pending.signal), SIGTERM))
-/*
- * Like wait_event_interruptible, but we're only interruptible by
- * KILL, INT, or TERM.
- *
- * XXXshaver These are going away soon, I hope.
- */
-#define __l_wait_event_killable(wq, condition, ret) \
-do { \
- wait_queue_t __wait; \
- init_waitqueue_entry(&__wait, current); \
- \
- add_wait_queue(&wq, &__wait); \
- for (;;) { \
- set_current_state(TASK_INTERRUPTIBLE); \
- if (condition) \
- break; \
- if (!signal_pending(current) || \
- !l_killable_pending(current)) { \
- schedule(); \
- continue; \
- } \
- ret = -ERESTARTSYS; \
- break; \
- } \
- current->state = TASK_RUNNING; \
- remove_wait_queue(&wq, &__wait); \
-} while(0)
-
-#define l_wait_event_killable(wq, condition) \
-({ \
- int __ret = 0; \
- if (!(condition)) \
- __l_wait_event_killable(wq, condition, __ret); \
- __ret; \
-})
-
-#define __l_wait_event_timeout(wq, condition, timeout, ret) \
-do { \
- wait_queue_t __wait; \
- init_waitqueue_entry(&__wait, current); \
- \
- add_wait_queue(&wq, &__wait); \
- for (;;) { \
- set_current_state(TASK_INTERRUPTIBLE); \
- if (condition) \
- break; \
- if (timeout) \
- schedule_timeout(timeout); \
- else \
- schedule(); \
- } \
- current->state = TASK_RUNNING; \
- remove_wait_queue(&wq, &__wait); \
+/*
+ * Sleep on @wq until @condition holds.  The task waits
+ * TASK_UNINTERRUPTIBLE; only after schedule_timeout() expires -- and
+ * only if info->lwi_signals is non-zero -- does it switch to
+ * TASK_INTERRUPTIBLE, so signals are honoured strictly post-timeout.
+ * @ret becomes -EINTR on interruption, -ETIMEDOUT when
+ * info->lwi_on_timeout asks to abort, and is left untouched otherwise.
+ * NOTE(review): @info and @ret are expanded unparenthesized; callers
+ * must pass simple lvalues/pointer expressions.
+ */
+#define __l_wait_event(wq, condition, info, ret) \
+do { \
+        wait_queue_t __wait; \
+        long __state; \
+        init_waitqueue_entry(&__wait, current); \
+ \
+        add_wait_queue(&wq, &__wait); \
+        __state = TASK_UNINTERRUPTIBLE; \
+        for (;;) { \
+                set_current_state(__state); \
+                if (condition) \
+                        break; \
+                /* We only become INTERRUPTIBLE if a timeout has fired, and \
+                 * the caller has given us some signals to care about. \
+                 * \
+                 * XXXshaver we should check against info->lwi_signals here, \
+                 * XXXshaver instead of just using l_killable_pending, perhaps. \
+                 */ \
+                if (__state == TASK_INTERRUPTIBLE && \
+                    l_killable_pending(current)) { \
+                        if (info->lwi_on_signal) \
+                                info->lwi_on_signal(info->lwi_cb_data); \
+                        ret = -EINTR; \
+                        break; \
+                } \
+                if (info->lwi_timeout) { \
+                        if (schedule_timeout(info->lwi_timeout) == 0) { \
+                                /* We'll take signals only after a timeout. */ \
+                                if (info->lwi_signals) \
+                                        __state = TASK_INTERRUPTIBLE; \
+                                if (info->lwi_on_timeout && \
+                                    info->lwi_on_timeout(info->lwi_cb_data)) { \
+                                        ret = -ETIMEDOUT; \
+                                        break; \
+                                } \
+                        } \
+                } else { \
+                        schedule(); \
+                } \
+        } \
+        current->state = TASK_RUNNING; \
+        remove_wait_queue(&wq, &__wait); \
+} while(0)
-#define l_wait_event_timeout(wq, condition, timeout) \
-({ \
- int __ret = 0; \
- if (!(condition)) \
- __l_wait_event_timeout(wq, condition, timeout, __ret); \
- __ret; \
+/* Wait for @condition per @info.  Evaluates to 0 on success, -EINTR if
+ * interrupted, -ETIMEDOUT if the timeout callback aborted the wait;
+ * skips the sleep entirely when @condition already holds. */
+#define l_wait_event(wq, condition, info) \
+({ \
+        int __ret = 0; \
+        struct l_wait_info *__info = (info); \
+        if (!(condition)) \
+                __l_wait_event(wq, condition, __info, __ret); \
+        __ret; \
+})
#endif /* _LUSTRE_LIB_H */
int typ_refcnt;
};
-typedef int (*brw_callback_t)(void *, int err, int phase);
+struct io_cb_data;
+typedef int (*brw_callback_t)(struct io_cb_data *, int err, int phase);
struct brw_page {
struct page *pg;
obd_size count;
} u;
};
+struct io_cb_data;
struct obd_ops {
int (*o_iocontrol)(long cmd, struct lustre_handle *, int len,
int (*o_brw)(int rw, struct lustre_handle *conn,
struct lov_stripe_md *md, obd_count oa_bufs,
struct brw_page *pgarr, brw_callback_t callback,
- void * data);
+ struct io_cb_data *data);
int (*o_punch)(struct lustre_handle *conn, struct obdo *tgt,
struct lov_stripe_md *md, obd_size count,
obd_off offset);
#include <linux/obd_class.h>
#include <linux/lustre_net.h>
#include <linux/lustre_lib.h>
+#include <linux/lustre_ha.h>
+/* Timeout hook for synchronous bulk I/O waits: mark the descriptor and
+ * its connection for recovery.  Returning 0 sends the waiter back to
+ * sleep (now interruptible -- see __l_wait_event). */
+static int sync_io_timeout(void *data)
+{
+        struct io_cb_data *cbd = data;
+        struct ptlrpc_bulk_desc *desc = cbd->desc;
+
+        ENTRY;
+        desc->b_connection->c_level = LUSTRE_CONN_RECOVD;
+        desc->b_flags |= PTL_RPC_FL_TIMEOUT;
+        /* NOTE(review): recovd notification is compiled out below, so this
+         * branch currently has no effect -- confirm intent. */
+        if (desc->b_client && desc->b_client->cli_recovd) {
+                /* XXXshaver Do we need a resend strategy, or do we just
+                 * XXXshaver return -ERESTARTSYS and punt it?
+                 */
+#if 0
+                recovd_cli_fail(desc->b_client);
+#endif
+        }
+
+        /* We go back to sleep, until we're resumed or interrupted. */
+        RETURN(0);
+}
+
+/* Signal hook for synchronous bulk I/O waits: record the interruption
+ * on the bulk descriptor. */
+static int sync_io_intr(void *data)
+{
+        struct io_cb_data *cbd = data;
+        struct ptlrpc_bulk_desc *desc = cbd->desc;
+
+        ENTRY;
+        desc->b_flags |= PTL_RPC_FL_INTR;
+        RETURN(1); /* ignored, as of this writing */
+}
-int ll_sync_io_cb(void *data, int err, int phase)
+/* Completion callback for synchronous I/O.  CB_PHASE_START blocks the
+ * caller until CB_PHASE_FINISH records the result and wakes it; the
+ * last reference dropped (in either phase) frees @data. */
+int ll_sync_io_cb(struct io_cb_data *data, int err, int phase)
 {
-        struct io_cb_data *d = data;
         int ret;
         ENTRY;
         if (phase == CB_PHASE_START) {
-                ret = l_wait_event_killable(d->waitq, d->complete);
-                if (atomic_dec_and_test(&d->refcount))
-                        OBD_FREE(d, sizeof(*d));
+#warning shaver hardcoded timeout
+                struct l_wait_info lwi;
+                lwi = LWI_TIMEOUT_INTR(100, sync_io_timeout,
+                                       SIGTERM | SIGKILL | SIGINT, sync_io_intr,
+                                       data);
+                ret = l_wait_event(data->waitq, data->complete, &lwi);
+                if (atomic_dec_and_test(&data->refcount))
+                        OBD_FREE(data, sizeof(*data));
-                if (ret == -ERESTARTSYS)
+                /* l_wait_event reports interruption as -EINTR (and timeout
+                 * as -ETIMEDOUT), not -ERESTARTSYS; checking only the old
+                 * value would silently swallow the interrupted case. */
+                if (ret == -EINTR || ret == -ERESTARTSYS)
                         return ret;
         } else if (phase == CB_PHASE_FINISH) {
-                d->err = err;
-                d->complete = 1;
-                wake_up(&d->waitq);
-                if (atomic_dec_and_test(&d->refcount))
-                        OBD_FREE(d, sizeof(*d));
+                data->err = err;
+                data->complete = 1;
+                wake_up(&data->waitq);
+                if (atomic_dec_and_test(&data->refcount))
+                        OBD_FREE(data, sizeof(*data));
                 return err;
         } else
                 LBUG();
         return 0;
 }
-struct io_cb_data *ll_init_cb(void)
+struct io_cb_data *ll_init_cb(void)
{
struct io_cb_data *d;
}
struct lov_callback_data {
- atomic_t count;
- struct io_cb_data *cbd;
- brw_callback_t cb;
- int err;
+ struct io_cb_data cbd;
+ brw_callback_t cb;
};
-int lov_osc_brw_callback(void *data, int err, int phase)
+/* Per-stripe completion fan-in for LOV brw.  The cast below relies on
+ * struct io_cb_data being the first member of struct lov_callback_data;
+ * the last stripe to finish (refcount reaches 0) fires the caller's
+ * original callback with the accumulated error. */
+int lov_osc_brw_callback(struct io_cb_data *data, int err, int phase)
 {
-        struct lov_callback_data *d = data;
+        struct lov_callback_data *lovcbd = (struct lov_callback_data *)data;
         int ret = 0;
         ENTRY;
-        if (phase == CB_PHASE_START) {
+        if (phase == CB_PHASE_START)
                 RETURN(0);
-        } else if (phase == CB_PHASE_FINISH) {
+
+        if (phase == CB_PHASE_FINISH) {
                 if (err)
-                        d->err = err;
-                if (atomic_dec_and_test(&d->count)) {
-                        ret = d->cb(d->cbd, 0, d->err);
-                }
+                        lovcbd->cbd.err = err;
+                if (atomic_dec_and_test(&lovcbd->cbd.refcount))
+                        ret = lovcbd->cb(&lovcbd->cbd, 0, lovcbd->cbd.err);
                 RETURN(ret);
-        } else
-                LBUG();
-        EXIT;
+        }
+
+        LBUG();
         return 0;
 }
struct lov_stripe_md *md,
obd_count oa_bufs,
struct brw_page *pga,
- brw_callback_t callback, void *data)
+ brw_callback_t callback, struct io_cb_data *data)
{
int stripe_count = md->lmd_stripe_count;
struct obd_export *export = class_conn2export(conn);
stripeinfo[which].subcount++;
}
+ lov_cb_data->cbd = *data;
lov_cb_data->cb = callback;
- lov_cb_data->cbd = data;
- atomic_set(&lov_cb_data->count, oa_bufs);
+ atomic_set(&lov_cb_data->cbd.refcount, oa_bufs);
for (i=0 ; i < stripe_count ; i++) {
int shift = stripeinfo[i].index;
obd_brw(cmd, &lov->tgts[i].conn, &stripeinfo[i].md,
stripeinfo[i].bufct, &ioarr[shift],
- lov_osc_brw_callback, &lov_cb_data);
+ lov_osc_brw_callback, (struct io_cb_data *)lov_cb_data);
}
- rc = callback(lov_cb_data, 0, CB_PHASE_START);
+ rc = callback((struct io_cb_data *)lov_cb_data, 0, CB_PHASE_START);
RETURN(rc);
}
#include <linux/obd_ost.h>
#include <linux/obd_lov.h>
#include <linux/init.h>
-
-
+#include <linux/lustre_ha.h>
static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
struct lov_stripe_md *md)
int err = 0;
ENTRY;
- if (desc->b_flags & PTL_RPC_FL_INTR) {
- err = -ERESTARTSYS;
- CERROR("got signal\n");
+ if (desc->b_flags & PTL_RPC_FL_TIMEOUT) {
+ err = (desc->b_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS :
+ -ETIMEDOUT);
}
if (cb_data->callback)
EXIT;
}
-static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md, obd_count page_count, struct brw_page *pga, brw_callback_t callback, void *data)
+static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
+ obd_count page_count, struct brw_page *pga,
+ brw_callback_t callback, struct io_cb_data *data)
{
struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
struct ptlrpc_request *request = NULL;
static int osc_brw_write(struct lustre_handle *conn,
struct lov_stripe_md *md, obd_count page_count,
struct brw_page *pga,
- brw_callback_t callback, void *data)
+ brw_callback_t callback, struct io_cb_data *data)
{
struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
struct ptlrpc_request *request = NULL;
static int osc_brw(int cmd, struct lustre_handle *conn,
struct lov_stripe_md *md, obd_count page_count,
struct brw_page *pagear, brw_callback_t callback,
- void *data)
+ struct io_cb_data *data)
{
if (cmd & OBD_BRW_WRITE)
return osc_brw_write(conn, md, page_count, pagear, callback, data);
GOTO(out, rc = 1);
}
+#if 0
if (req->rq_flags & PTL_RPC_FL_RESEND) {
if (l_killable_pending(current)) {
CERROR("-- INTR --\n");
CERROR("-- RESEND --\n");
GOTO(out, rc = 1);
}
+#endif
if (req->rq_flags & PTL_RPC_FL_RECOVERY) {
CERROR("-- RESTART --\n");
EXIT;
}
+/* Timeout hook for ptlrpc_queue_wait: push the connection into
+ * recovery and notify recovd if the client has one.  Returns non-zero
+ * (abort the wait with -ETIMEDOUT) for requests below LUSTRE_CONN_FULL,
+ * which must not sleep through recovery; 0 puts the waiter back to
+ * sleep. */
+static int expired_request(void *data)
+{
+        struct ptlrpc_request *req = data;
+
+        ENTRY;
+        req->rq_timeout = 0;
+        req->rq_connection->c_level = LUSTRE_CONN_RECOVD;
+        req->rq_flags |= PTL_RPC_FL_TIMEOUT;
+        /* Activate the recovd for this client, if there is one. */
+        if (req->rq_client && req->rq_client->cli_recovd)
+                recovd_cli_fail(req->rq_client);
+
+        /* If this request is for recovery or other primordial tasks,
+         * don't go back to sleep.
+         */
+        if (req->rq_level < LUSTRE_CONN_FULL)
+                RETURN(1);
+        RETURN(0);
+}
+
+/* Signal hook for ptlrpc_queue_wait: record the interruption on the
+ * request so the caller can abort instead of resending. */
+static int interrupted_request(void *data)
+{
+        struct ptlrpc_request *req = data;
+        ENTRY;
+        req->rq_flags |= PTL_RPC_FL_INTR;
+        RETURN(1); /* ignored, as of this writing */
+}
+
int ptlrpc_queue_wait(struct ptlrpc_request *req)
{
- int rc = 0, timeout;
+ int rc = 0;
+ struct l_wait_info lwi;
struct ptlrpc_client *cli = req->rq_client;
ENTRY;
if (req->rq_level > req->rq_connection->c_level) {
CERROR("process %d waiting for recovery (%d > %d)\n",
current->pid, req->rq_level, req->rq_connection->c_level);
+
spin_lock(&cli->cli_lock);
list_del_init(&req->rq_list);
list_add_tail(&req->rq_list, &cli->cli_delayed_head);
spin_unlock(&cli->cli_lock);
- l_wait_event_killable
- (req->rq_wait_for_rep,
- req->rq_level <= req->rq_connection->c_level);
+
+#warning shaver: what happens when we get interrupted during this wait?
+ lwi = LWI_INTR(SIGTERM | SIGKILL | SIGINT, NULL, NULL);
+ l_wait_event(req->rq_wait_for_rep,
+ req->rq_level <= req->rq_connection->c_level,
+ &lwi);
+
spin_lock(&cli->cli_lock);
list_del_init(&req->rq_list);
spin_unlock(&cli->cli_lock);
+
CERROR("process %d resumed\n", current->pid);
}
resend:
spin_unlock(&cli->cli_lock);
CDEBUG(D_OTHER, "-- sleeping\n");
- /*
- * req->rq_timeout gets reset in the timeout case, and
- * l_wait_event_timeout is a macro, so save the timeout value here.
- */
- timeout = req->rq_timeout * HZ;
- l_wait_event_timeout(req->rq_wait_for_rep, ptlrpc_check_reply(req),
- timeout);
+ lwi = LWI_TIMEOUT_INTR(req->rq_timeout * HZ, expired_request,
+ SIGKILL | SIGTERM | SIGINT, interrupted_request,
+ req);
+ l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi);
CDEBUG(D_OTHER, "-- done\n");
- if (req->rq_flags & PTL_RPC_FL_RESEND) {
+ /* Don't resend if we were interrupted. */
+ if ((req->rq_flags & (PTL_RPC_FL_RESEND | PTL_RPC_FL_INTR)) ==
+ PTL_RPC_FL_RESEND) {
req->rq_flags &= ~PTL_RPC_FL_RESEND;
goto resend;
}
up(&cli->cli_rpc_sem);
if (req->rq_flags & PTL_RPC_FL_INTR) {
+ if (!(req->rq_flags & PTL_RPC_FL_TIMEOUT))
+ LBUG(); /* should only be interrupted if we timed out. */
/* Clean up the dangling reply buffers */
ptlrpc_abort(req);
GOTO(out, rc = -EINTR);
{
int rc = 0;
struct ptlrpc_client *cli = req->rq_client;
+ struct l_wait_info lwi = LWI_INTR(SIGKILL|SIGTERM|SIGINT, NULL, NULL);
ENTRY;
init_waitqueue_head(&req->rq_wait_for_rep);
}
CDEBUG(D_OTHER, "-- sleeping\n");
- l_wait_event_killable(req->rq_wait_for_rep, ptlrpc_check_reply(req));
+ l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi);
CDEBUG(D_OTHER, "-- done\n");
up(&cli->cli_rpc_sem);