Whamcloud - gitweb
* l_wait_event can now do interrupts without a timeout, if we're feeling brave.
authorshaver <shaver>
Sat, 17 Aug 2002 22:06:32 +0000 (22:06 +0000)
committershaver <shaver>
Sat, 17 Aug 2002 22:06:32 +0000 (22:06 +0000)
* Big doc comment for l_wait_event.
* Only fire the timeout once from l_wait_event.
* Made timeout and the recovery-upcall path configurable via sysctl.
* Added OBD_FAIL_OSC codes for simulating simple client failure.
* Tentative rewiring of recovd into client connections, needs more thought
  and then more typing. We do fire the upcall, at least.
* Use the provided cluuid instead of NULL wherever it's handy already.
* Protect (feebly) against waiting for recovery that will never happen,
  in sync_io_timeout.
* Add timeouts to bulk operations in MDS and OST -- a recovery stub is now
  triggered, but nothing else.
* Document the unpleasant business in osc_brw_{read,write} as pertains to
  errors in the callbacks and cleanup of descriptors.
* Remove now-unused ptlrpc_check_bulk_{sent,received}.

15 files changed:
lustre/include/linux/lustre_lib.h
lustre/include/linux/lustre_mds.h
lustre/include/linux/obd_support.h
lustre/lib/l_net.c
lustre/lib/page.c
lustre/llite/recover.c
lustre/mds/handler.c
lustre/obdclass/class_obd.c
lustre/obdclass/sysctl.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/client.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/recovd.c
lustre/ptlrpc/rpc.c

index 47f3fdc..2b3ff7a 100644 (file)
@@ -413,6 +413,42 @@ static inline int obd_ioctl_getdata(char **buf, int *len, void *arg)
 
 #define OBD_IOC_DEC_FS_USE_COUNT       _IO  ('f', 133      )
 
+/*
+ * l_wait_event is a flexible sleeping function, permitting simple caller
+ * configuration of interrupt and timeout sensitivity along with actions to
+ * be performed in the event of either exception.
+ *
+ * Common usage looks like this:
+ * 
+ * struct l_wait_info lwi = LWI_TIMEOUT_INTR(timeout, timeout_handler,
+ *                                           intr_handler, callback_data);
+ * rc = l_wait_event(waitq, condition, &lwi);
+ *
+ * (LWI_TIMEOUT and LWI_INTR macros are available for timeout- and
+ * interrupt-only variants, respectively.)
+ *
+ * If a timeout is specified, the timeout_handler will be invoked in the event
+ * that the timeout expires before the process is awakened.  (Note that any
+ * waking of the process will restart the timeout, even if the condition is
+ * not satisfied and the process immediately returns to sleep.  This might be
+ * considered a bug.)  If the timeout_handler returns non-zero, l_wait_event
+ * will return -ETIMEDOUT and the caller will continue.  If the handler returns
+ * zero instead, the process will go back to sleep until it is awakened by the
+ * waitq or some similar mechanism, or an interrupt occurs (if the caller has
+ * asked for interrupts to be detected).  The timeout will only fire once, so
+ * callers should take care that a timeout_handler which returns zero will take
+ * future steps to awaken the process.  N.B. that these steps must include making
+ * the provided condition become true.
+ *
+ * If the interrupt flag (lwi_signals) is non-zero, then the process will be
+ * interruptible, and will be awakened by any "killable" signal (SIGTERM,
+ * SIGKILL or SIGINT).  If a timeout is also specified, then the process will
+ * only become interruptible _after_ the timeout has expired, though it can be
+ * awakened by a signal that was delivered before the timeout and is still
+ * pending when the timeout expires.  If a timeout is not specified, the process
+ * will be interruptible at all times during l_wait_event.
+ */
+
 struct l_wait_info {
         long   lwi_timeout;
         int  (*lwi_on_timeout)(void *);
@@ -428,18 +464,18 @@ struct l_wait_info {
         lwi_cb_data:    data                                                    \
 })
 
-#define LWI_INTR(signals, cb, data)                                             \
+#define LWI_INTR(cb, data)                                                      \
 ((struct l_wait_info) {                                                         \
-        lwi_signals:   signals,                                                 \
+        lwi_signals:   1,                                                       \
         lwi_on_signal: cb,                                                      \
         lwi_cb_data:   data                                                     \
 })
 
-#define LWI_TIMEOUT_INTR(time, time_cb, signals, sig_cb, data)                  \
+#define LWI_TIMEOUT_INTR(time, time_cb, sig_cb, data)                           \
 ((struct l_wait_info) {                                                         \
         lwi_timeout:    time,                                                   \
         lwi_on_timeout: time_cb,                                                \
-        lwi_signals:    signals,                                                \
+        lwi_signals:    1,                                                      \
         lwi_on_signal:  sig_cb,                                                 \
         lwi_cb_data:    data                                                    \
 })
@@ -454,48 +490,44 @@ struct l_wait_info {
 do {                                                                            \
         wait_queue_t __wait;                                                    \
         long __state;                                                           \
+        int __timed_out = 0;                                                    \
         init_waitqueue_entry(&__wait, current);                                 \
                                                                                 \
         add_wait_queue(&wq, &__wait);                                           \
-        __state = TASK_UNINTERRUPTIBLE;                                         \
+        if (info->lwi_signals && !info->lwi_timeout)                            \
+            __state = TASK_INTERRUPTIBLE;                                       \
+        else                                                                    \
+            __state = TASK_UNINTERRUPTIBLE;                                     \
         for (;;) {                                                              \
             set_current_state(__state);                                         \
             if (condition)                                                      \
                     break;                                                      \
-            /* We only become INTERRUPTIBLE if a timeout has fired, and         \
-             * the caller has given us some signals to care about.              \
-             *                                                                  \
-             * XXXshaver we should check against info->wli_signals here,        \
-             * XXXshaver instead of just using l_killable_pending, perhaps.     \
-             */                                                                 \
-            if (__state == TASK_INTERRUPTIBLE &&                                \
-                l_killable_pending(current)) {                                  \
-                    CERROR("lwe: interrupt for %d\n", current->pid);            \
-                    if (info->lwi_on_signal)                                    \
-                            info->lwi_on_signal(info->lwi_cb_data);             \
-                    ret = -EINTR;                                               \
-                    break;                                                      \
+            if (__state == TASK_INTERRUPTIBLE && l_killable_pending(current)) { \
+                CERROR("lwe: interrupt\n");                                     \
+                if (info->lwi_on_signal)                                        \
+                        info->lwi_on_signal(info->lwi_cb_data);                 \
+                ret = -EINTR;                                                   \
+                break;                                                          \
             }                                                                   \
-            if (info->lwi_timeout) {                                            \
+            if (info->lwi_timeout && !__timed_out) {                            \
                 if (schedule_timeout(info->lwi_timeout) == 0) {                 \
-                    CERROR("lwe: timeout for %d\n", current->pid);              \
+                    CERROR("lwe: timeout\n");                                   \
+                    __timed_out = 1;                                            \
                     if (!info->lwi_on_timeout ||                                \
                         info->lwi_on_timeout(info->lwi_cb_data)) {              \
                         ret = -ETIMEDOUT;                                       \
                         break;                                                  \
                     }                                                           \
-                    /* We'll take signals only after a timeout. */              \
+                    /* We'll take signals after a timeout. */                   \
                     if (info->lwi_signals) {                                    \
                         __state = TASK_INTERRUPTIBLE;                           \
                         /* Check for a pending interrupt. */                    \
-                        if (info->lwi_signals &&                                \
-                            l_killable_pending(current)) {                      \
-                             CERROR("lwe: pending interrupt for %d\n",          \
-                                    current->pid);                              \
-                             if (info->lwi_on_signal)                           \
-                                 info->lwi_on_signal(info->lwi_cb_data);        \
-                             ret = -EINTR;                                      \
-                             break;                                             \
+                        if (info->lwi_signals && l_killable_pending(current)) { \
+                            CERROR("lwe: pending interrupt\n");                 \
+                            if (info->lwi_on_signal)                            \
+                                info->lwi_on_signal(info->lwi_cb_data);         \
+                            ret = -EINTR;                                       \
+                            break;                                              \
                         }                                                       \
                     }                                                           \
                 }                                                               \
index 3177cdf..45ec453 100644 (file)
@@ -88,8 +88,8 @@ struct mds_export_data {
 
 /* file data for open files on MDS */
 struct mds_file_data {
-        struct list_head mfd_list;
-        struct file mfd_file;
+        struct list_head  mfd_list;
+        struct file      *mfd_file;
         __u64             mfd_clientfd;
         __u32             mfd_clientcookie;
 };
index 72f3a94..e9cd118 100644 (file)
@@ -30,6 +30,8 @@
 /* global variables */
 extern unsigned long obd_memory;
 extern unsigned long obd_fail_loc;
+extern unsigned long obd_timeout;
+extern char obd_recovery_upcall[128];
 
 #define OBD_FAIL_MDS                     0x100
 #define OBD_FAIL_MDS_HANDLE_UNPACK       0x101
@@ -80,7 +82,7 @@ extern unsigned long obd_fail_loc;
 #define OBD_FAIL_OST_BRW_WRITE_BULK      0x20e
 #define OBD_FAIL_OST_BRW_READ_BULK       0x20f
 
-#define OBB_FAIL_LDLM                    0x300
+#define OBD_FAIL_LDLM                    0x300
 #define OBD_FAIL_LDLM_NAMESPACE_NEW      0x301
 #define OBD_FAIL_LDLM_ENQUEUE            0x302
 #define OBD_FAIL_LDLM_CONVERT            0x303
@@ -88,6 +90,10 @@ extern unsigned long obd_fail_loc;
 #define OBD_FAIL_LDLM_BL_CALLBACK        0x305
 #define OBD_FAIL_LDLM_CP_CALLBACK        0x306
 
+#define OBD_FAIL_OSC                     0x400
+#define OBD_FAIL_OSC_BRW_READ_BULK       0x401
+#define OBD_FAIL_OSC_BRW_WRITE_BULK      0x402
+
 /* preparation for a more advanced failure testbed (not functional yet) */
 #define OBD_FAIL_MASK_SYS    0x0000FF00
 #define OBD_FAIL_MASK_LOC    (0x000000FF | OBD_FAIL_MASK_SYS)
index a1a687c..239f8c0 100644 (file)
@@ -98,9 +98,11 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
         /* XXX get recovery hooked in here again */
         //ptlrpc_init_client(ptlrpc_connmgr, ll_recover,...
 
-        ptlrpc_init_client(NULL, NULL, rq_portal, rp_portal, mdc->cl_client);
-        ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
-                           mdc->cl_ldlm_client);
+        ptlrpc_init_client(ptlrpc_connmgr, NULL, rq_portal, rp_portal,
+                           mdc->cl_client);
+        /* XXXshaver Should the LDLM have its own recover function? Probably. */
+        ptlrpc_init_client(ptlrpc_connmgr, NULL, LDLM_REQUEST_PORTAL,
+                           LDLM_REPLY_PORTAL, mdc->cl_ldlm_client);
         mdc->cl_client->cli_name = "mdc";
         mdc->cl_ldlm_client->cli_name = "ldlm";
         mdc->cl_max_mdsize = sizeof(struct lov_stripe_md);
@@ -142,8 +144,7 @@ int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
         ENTRY;
         down(&cli->cl_sem);
         MOD_INC_USE_COUNT;
-#warning shaver: we might need a real cluuid here
-        rc = class_connect(conn, obd, NULL);
+        rc = class_connect(conn, obd, cluuid);
         if (rc) {
                 MOD_DEC_USE_COUNT;
                 GOTO(out_sem, rc);
index ddcd5de..ac7660e 100644 (file)
@@ -58,16 +58,20 @@ static int sync_io_timeout(void *data)
         ENTRY;
         desc->b_connection->c_level = LUSTRE_CONN_RECOVD;
         desc->b_flags |= PTL_RPC_FL_TIMEOUT;
-        if (desc->b_client && desc->b_client->cli_recovd) {
+        if (desc->b_client && desc->b_client->cli_recovd &&
+            class_signal_client_failure) {
                 /* XXXshaver Do we need a resend strategy, or do we just
                  * XXXshaver return -ERESTARTSYS and punt it?
                  */
                 CERROR("signalling failure of client %p\n", desc->b_client);
                 class_signal_client_failure(desc->b_client);
-        }
 
-        /* We go back to sleep, until we're resumed or interrupted. */
-        RETURN(0);
+                /* We go back to sleep, until we're resumed or interrupted. */
+                RETURN(0);
+        }
+        
+        /* If we can't be recovered, just abort the syscall with -ETIMEDOUT. */
+        RETURN(1);
 }
 
 static int sync_io_intr(void *data)
@@ -86,11 +90,9 @@ int ll_sync_io_cb(struct io_cb_data *data, int err, int phase)
         ENTRY; 
 
         if (phase == CB_PHASE_START) { 
-#warning shaver hardcoded timeout (/proc/sys/lustre/timeout)
                 struct l_wait_info lwi;
-                lwi = LWI_TIMEOUT_INTR(100 * HZ, sync_io_timeout,
-                                       SIGTERM | SIGKILL | SIGINT, sync_io_intr,
-                                       data);
+                lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, sync_io_timeout,
+                                       sync_io_intr, data);
                 ret = l_wait_event(data->waitq, data->complete, &lwi);
                 if (atomic_dec_and_test(&data->refcount))
                         OBD_FREE(data, sizeof(*data));
index f2e4719..e282daf 100644 (file)
@@ -57,7 +57,6 @@ static int ll_reconnect(struct ll_sb_info *sbi)
         return err;
 }
 
-
 int ll_recover(struct ptlrpc_client *cli)
 {
         struct ptlrpc_request *req;
index 9a4c151..dff14c1 100644 (file)
@@ -45,6 +45,14 @@ inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req)
         return &req->rq_export->exp_obd->u.mds;
 }
 
+static int mds_bulk_timeout(void *data)
+{
+        struct ptlrpc_bulk_desc *desc = data;
+        
+        ENTRY;
+        CERROR("(not yet) starting recovery of client %p\n", desc->b_client);
+        RETURN(1);
+}
 
 /* Assumes caller has already pushed into the kernel filesystem context */
 static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
@@ -54,6 +62,7 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
         struct mds_obd *mds = mds_req2mds(req);
         struct ptlrpc_bulk_desc *desc;
         struct ptlrpc_bulk_page *bulk;
+        struct l_wait_info lwi;
         char *buf;
         ENTRY;
 
@@ -90,9 +99,13 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
                 GOTO(cleanup_buf, rc);
         }
 
-        wait_event(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
-        if (desc->b_flags & PTL_RPC_FL_INTR)
-                GOTO(cleanup_buf, rc = -EINTR);
+        lwi = LWI_TIMEOUT(obd_timeout * HZ, mds_bulk_timeout, desc);
+        rc = l_wait_event(desc->b_waitq, desc->b_flags & PTL_BULK_FL_SENT, &lwi);
+        if (rc) {
+                if (rc != -ETIMEDOUT)
+                        LBUG();
+                GOTO(cleanup_buf, rc);
+        }
 
         EXIT;
  cleanup_buf:
@@ -277,9 +290,7 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
                         RETURN(0);
                 }
         }
-
-#warning shaver: we might need a real cluuid here
-        rc = class_connect(conn, obd, NULL);
+        rc = class_connect(conn, obd, cluuid);
         if (rc)
                 GOTO(out_dec, rc);
         exp = class_conn2export(conn);
index 9da3866..d140511 100644 (file)
@@ -46,7 +46,11 @@ struct semaphore obd_conf_sem;   /* serialize configuration commands */
 struct obd_device obd_dev[MAX_OBD_DEVICES];
 struct list_head obd_types;
 unsigned long obd_memory = 0;
+
+/* The following are visible and mutable through /proc/sys/lustre/. */
 unsigned long obd_fail_loc = 0;
+unsigned long obd_timeout = 100;
+char obd_recovery_upcall[128] = "/usr/lib/lustre/ha_assist";
 
 extern struct obd_type *class_nm_to_type(char *nm);
 
@@ -573,6 +577,8 @@ EXPORT_SYMBOL(obd_dev);
 EXPORT_SYMBOL(obdo_cachep);
 EXPORT_SYMBOL(obd_memory);
 EXPORT_SYMBOL(obd_fail_loc);
+EXPORT_SYMBOL(obd_timeout);
+EXPORT_SYMBOL(obd_recovery_upcall);
 
 EXPORT_SYMBOL(class_register_type);
 EXPORT_SYMBOL(class_unregister_type);
index 57ae735..8e74aab 100644 (file)
@@ -54,11 +54,12 @@ static int obd_sctl_reset( ctl_table * table, int write, struct file
 
 #define OBD_FAIL_LOC        1       /* control test failures instrumentation */
 #define OBD_ENTRY           2       /* control enter/leave pattern */
-#define OBD_TIMEOUT         3       /* timeout on upcalls to become intrble */
-#define OBD_HARD            4       /* mount type "hard" or "soft" */
-#define OBD_VARS            5
-#define OBD_INDEX           6
-#define OBD_RESET           7
+#define OBD_VARS            3
+#define OBD_INDEX           4
+#define OBD_RESET           5
+#define OBD_TIMEOUT         6       /* RPC timeout before recovery/intr */
+/* XXX move to /proc/sys/lustre/recovery? */
+#define OBD_UPCALL          7       /* path to recovery upcall */
 
 #define OBD_VARS_SLOT       2
 
@@ -67,6 +68,10 @@ static ctl_table obd_table[] = {
         {OBD_VARS, "vars", &vars[0], sizeof(int), 0644, NULL, &proc_dointvec},
         {OBD_INDEX, "index", &index, sizeof(int), 0644, NULL, &obd_sctl_vars},
         {OBD_RESET, "reset", NULL, 0, 0644, NULL, &obd_sctl_reset},
+        {OBD_TIMEOUT, "timeout", &obd_timeout, sizeof(int), 0644, NULL, &proc_dointvec},
+        /* XXX need to lock so we avoid update races with the recovery upcall! */
+        {OBD_UPCALL, "recovery_upcall", obd_recovery_upcall, 128, 0644, NULL,
+         &proc_dostring, &sysctl_string },
        { 0 }
 };
 
index 9bb2c04..152b103 100644 (file)
@@ -26,6 +26,7 @@
 #include <linux/obd_lov.h>
 #include <linux/init.h>
 #include <linux/lustre_ha.h>
+#include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
 
 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa, 
                        struct lov_stripe_md *md)
@@ -407,25 +408,30 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
          *
          * On error, we never do the brw_finish, so we handle all decrefs.
          */
-        rc = ptlrpc_register_bulk(desc);
-        if (rc)
-                GOTO(out_unmap, rc);
+        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
+                CERROR("obd_fail_loc=%x, skipping register_bulk\n",
+                       OBD_FAIL_OSC_BRW_READ_BULK);
+        } else {
+                rc = ptlrpc_register_bulk(desc);
+                if (rc)
+                        GOTO(out_unmap, rc);
+        }
 
         request->rq_replen = lustre_msg_size(1, size);
         rc = ptlrpc_queue_wait(request);
         rc = ptlrpc_check_status(request, rc);
 
-        /* XXX: Mike, this is the only place I'm not sure of.  If we have
-         *      an error here, will we have always called brw_finish?  If no,
-         *      then out_req will not clean up and we should go to out_desc.
-         *      If maybe, then we are screwed, and we need to set things up
-         *      so that bulk_sink_callback is called for each bulk page,
-         *      even on error so brw_finish is always called.  It would need
-         *      to be passed an error code as a parameter to know what to do.
-         *
-         *      That would also help with the partial completion case, so
-         *      we could say in brw_finish "these pages are done, don't
-         *      restart them" and osc_brw callers can know this.
+        /*
+         * XXX: If there is an error during the processing of the callback,
+         *      such as a timeout in a sleep that it performs, brw_finish
+         *      will never get called, and we'll leak the desc, fail to kunmap
+         *      things, cats will live with dogs.  One solution would be to
+         *      export brw_finish as osc_brw_finish, so that the timeout case and
+         *      its kin could call it for proper cleanup.  An alternative would
+         *      be for an error return from the callback to cause us to clean up,
+         *      but that doesn't help the truly async cases (like LOV), which
+         *      will immediately return from their PHASE_START callback, before
+         *      any such cleanup-requiring error condition can be detected.
          */
         if (rc)
                 GOTO(out_req, rc);
@@ -541,6 +547,9 @@ static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
         if (desc->b_page_count != page_count)
                 LBUG();
 
+        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
+                GOTO(out_unmap, rc = 0);
+
         /* Our reference is released when brw_finish is complete. */
         rc = ptlrpc_send_bulk(desc);
 
index ccbf640..9792e6d 100644 (file)
@@ -196,6 +196,15 @@ static int ost_setattr(struct ptlrpc_request *req)
         RETURN(0);
 }
 
+static int ost_bulk_timeout(void *data)
+{
+        struct ptlrpc_bulk_desc *desc = data;
+
+        ENTRY;
+        CERROR("(not yet) starting recovery of client %p\n", desc->b_client);
+        RETURN(1);
+}
+
 static int ost_brw_read(struct ptlrpc_request *req)
 {
         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
@@ -205,6 +214,7 @@ static int ost_brw_read(struct ptlrpc_request *req)
         struct niobuf_local *local_nb = NULL;
         struct obd_ioobj *ioo;
         struct ost_body *body;
+        struct l_wait_info lwi;
         int rc, cmd, i, j, objcount, niocount, size = sizeof(*body);
         ENTRY;
 
@@ -216,6 +226,9 @@ static int ost_brw_read(struct ptlrpc_request *req)
         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
         cmd = OBD_BRW_READ;
 
+        if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
+                GOTO(out, rc = 0);
+
         for (i = 0; i < objcount; i++) {
                 ost_unpack_ioo(&tmp1, &ioo);
                 if (tmp2 + ioo->ioo_bufcnt > end2) {
@@ -226,12 +239,9 @@ static int ost_brw_read(struct ptlrpc_request *req)
                         ost_unpack_niobuf(&tmp2, &remote_nb);
         }
 
-        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
-        if (rc)
-                RETURN(rc);
         OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
         if (local_nb == NULL)
-                RETURN(-ENOMEM);
+                GOTO(out, rc = -ENOMEM);
 
         /* The unpackers move tmp1 and tmp2, so reset them before using */
         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
@@ -240,7 +250,7 @@ static int ost_brw_read(struct ptlrpc_request *req)
                                     tmp1, niocount, tmp2, local_nb, NULL);
 
         if (req->rq_status)
-                GOTO(out_local, 0);
+                GOTO(out, 0);
 
         desc = ptlrpc_prep_bulk(req->rq_connection);
         if (desc == NULL)
@@ -262,10 +272,12 @@ static int ost_brw_read(struct ptlrpc_request *req)
         if (rc)
                 GOTO(out_bulk, rc);
 
-#warning OST must time out here.
-        wait_event(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
-        if (desc->b_flags & PTL_RPC_FL_INTR)
-                rc = -EINTR;
+        lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
+        rc = l_wait_event(desc->b_waitq, desc->b_flags & PTL_BULK_FL_SENT, &lwi);
+        if (rc) {
+                LASSERT(rc == -ETIMEDOUT);
+                GOTO(out_bulk, rc);
+        }
 
         /* The unpackers move tmp1 and tmp2, so reset them before using */
         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
@@ -273,6 +285,8 @@ static int ost_brw_read(struct ptlrpc_request *req)
         req->rq_status = obd_commitrw(cmd, conn, objcount,
                                       tmp1, niocount, local_nb, NULL);
 
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+
 out_bulk:
         ptlrpc_free_bulk(desc);
 out_local:
@@ -298,6 +312,7 @@ static int ost_brw_write(struct ptlrpc_request *req)
         void *desc_priv = NULL;
         int reply_sent = 0;
         struct ptlrpc_service *srv;
+        struct l_wait_info lwi;
         __u32 xid;
         ENTRY;
 
@@ -381,8 +396,13 @@ static int ost_brw_write(struct ptlrpc_request *req)
         reply_sent = 1;
         ptlrpc_reply(req->rq_svc, req);
 
-#warning OST must time out here.
-        wait_event(desc->b_waitq, desc->b_flags & PTL_BULK_FL_RCVD);
+        lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
+        rc = l_wait_event(desc->b_waitq, desc->b_flags & PTL_BULK_FL_RCVD, &lwi);
+        if (rc) {
+                if (rc != -ETIMEDOUT)
+                        LBUG();
+                GOTO(fail_bulk, rc);
+        }
 
         rc = obd_commitrw(cmd, conn, objcount, tmp1, niocount, local_nb,
                           desc->b_desc_private);
index f276238..c775a5e 100644 (file)
@@ -33,7 +33,6 @@ void ptlrpc_init_client(struct recovd_obd *recovd,
                         int rep_portal, struct ptlrpc_client *cl)
 {
         memset(cl, 0, sizeof(*cl));
-        cl->cli_recovd = recovd;
         cl->cli_recover = recover;
         if (recovd)
                 recovd_cli_manage(recovd, cl);
@@ -486,21 +485,23 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                 list_add_tail(&req->rq_list, &cli->cli_delayed_head);
                 spin_unlock(&cli->cli_lock);
 
-#warning shaver: what happens when we get interrupted during this wait?
-                lwi = LWI_INTR(SIGTERM | SIGKILL | SIGINT, NULL, NULL);
-                l_wait_event(req->rq_wait_for_rep,
-                             req->rq_level <= req->rq_connection->c_level,
-                             &lwi);
+                lwi = LWI_INTR(NULL, NULL);
+                rc = l_wait_event(req->rq_wait_for_rep,
+                                  req->rq_level <= req->rq_connection->c_level,
+                                  &lwi);
 
                 spin_lock(&cli->cli_lock);
                 list_del_init(&req->rq_list);
                 spin_unlock(&cli->cli_lock);
+                
+                if (rc)
+                        RETURN(rc);
 
                 CERROR("process %d resumed\n", current->pid);
         }
  resend:
         req->rq_time = CURRENT_TIME;
-        req->rq_timeout = 100;
+        req->rq_timeout = obd_timeout;
         rc = ptl_send_rpc(req);
         if (rc) {
                 CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
@@ -518,8 +519,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
 
         CDEBUG(D_OTHER, "-- sleeping\n");
         lwi = LWI_TIMEOUT_INTR(req->rq_timeout * HZ, expired_request,
-                               SIGKILL | SIGTERM | SIGINT, interrupted_request,
-                               req);
+                               interrupted_request,req);
         l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi);
         CDEBUG(D_OTHER, "-- done\n");
 
@@ -570,7 +570,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
 {
         int rc = 0;
         struct ptlrpc_client *cli = req->rq_client;
-        struct l_wait_info lwi = LWI_INTR(SIGKILL|SIGTERM|SIGINT, NULL, NULL);
+        struct l_wait_info lwi;
         ENTRY;
 
         init_waitqueue_head(&req->rq_wait_for_rep);
@@ -579,7 +579,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
                req->rq_connection->c_level);
 
         req->rq_time = CURRENT_TIME;
-        req->rq_timeout = 100;
+        req->rq_timeout = obd_timeout;
         rc = ptl_send_rpc(req);
         if (rc) {
                 CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
@@ -589,6 +589,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         }
 
         CDEBUG(D_OTHER, "-- sleeping\n");
+        lwi = LWI_INTR(NULL, NULL);
         l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi);
         CDEBUG(D_OTHER, "-- done\n");
 
index 0f7c955..3933160 100644 (file)
@@ -30,38 +30,6 @@ extern ptl_handle_eq_t request_out_eq, reply_in_eq, reply_out_eq,
         bulk_source_eq, bulk_sink_eq;
 static ptl_process_id_t local_id = {PTL_NID_ANY, PTL_PID_ANY};
 
-int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *desc)
-{
-        ENTRY;
-
-        if (desc->b_flags & PTL_BULK_FL_SENT)
-                RETURN(1);
-
-        if (l_killable_pending(current)) {
-                desc->b_flags |= PTL_RPC_FL_INTR;
-                RETURN(1);
-        }
-
-        CDEBUG(D_NET, "no event yet\n");
-        RETURN(0);
-}
-
-int ptlrpc_check_bulk_received(struct ptlrpc_bulk_desc *desc)
-{
-        ENTRY;
-
-        if (desc->b_flags & PTL_BULK_FL_RCVD)
-                RETURN(1);
-
-        if (l_killable_pending(current)) {
-                desc->b_flags |= PTL_RPC_FL_INTR;
-                RETURN(1);
-        }
-
-        CDEBUG(D_NET, "no event yet\n");
-        RETURN(0);
-}
-
 static int ptl_send_buf(struct ptlrpc_request *request,
                         struct ptlrpc_connection *conn, int portal)
 {
index 1c8037c..194e2b4 100644 (file)
@@ -19,6 +19,7 @@
 #include <linux/kmod.h>
 #include <linux/lustre_lite.h>
 #include <linux/lustre_ha.h>
+#include <linux/obd_support.h>
 
 struct recovd_obd *ptlrpc_connmgr;
 
@@ -60,7 +61,7 @@ static int recovd_upcall(void)
         char *argv[2];
         char *envp[3];
 
-        argv[0] = "/usr/src/obd/utils/ha_assist.sh";
+        argv[0] = obd_recovery_upcall;
         argv[1] = NULL;
 
         envp [0] = "HOME=/";
index e0c9414..2459760 100644 (file)
@@ -146,10 +146,6 @@ static void __exit ptlrpc_exit(void)
         ptlrpc_cleanup_connection();
 }
 
-/* events.c */
-EXPORT_SYMBOL(ptlrpc_check_bulk_sent);
-EXPORT_SYMBOL(ptlrpc_check_bulk_received);
-
 /* connmgr.c */
 EXPORT_SYMBOL(ptlrpc_connmgr);
 EXPORT_SYMBOL(connmgr_connect);