Whamcloud - gitweb
b=17103,i=nikita,i=green,i=isaac:
authorisaac <isaac>
Thu, 15 Oct 2009 04:05:36 +0000 (04:05 +0000)
committerisaac <isaac>
Thu, 15 Oct 2009 04:05:36 +0000 (04:05 +0000)
- Rewrite ptlrpc_queue_wait() using sets (from rread).

lustre/include/lustre_lib.h
lustre/ptlrpc/client.c
lustre/tests/conf-sanity.sh

index d5547ce..e0e26fa 100644 (file)
@@ -606,6 +606,7 @@ static inline void obd_ioctl_freedata(char *buf, int len)
 struct l_wait_info {
         cfs_duration_t lwi_timeout;
         cfs_duration_t lwi_interval;
+        int            lwi_allow_intr;
         int  (*lwi_on_timeout)(void *);
         void (*lwi_on_signal)(void *);
         void  *lwi_cb_data;
@@ -617,7 +618,8 @@ struct l_wait_info {
         .lwi_timeout    = time,                 \
         .lwi_on_timeout = cb,                   \
         .lwi_cb_data    = data,                 \
-        .lwi_interval   = 0                     \
+        .lwi_interval   = 0,                    \
+        .lwi_allow_intr = 0                     \
 })
 
 #define LWI_TIMEOUT_INTERVAL(time, interval, cb, data)  \
@@ -625,16 +627,28 @@ struct l_wait_info {
         .lwi_timeout    = time,                         \
         .lwi_on_timeout = cb,                           \
         .lwi_cb_data    = data,                         \
-        .lwi_interval   = interval                      \
+        .lwi_interval   = interval,                     \
+        .lwi_allow_intr = 0                             \
 })
 
 #define LWI_TIMEOUT_INTR(time, time_cb, sig_cb, data)   \
 ((struct l_wait_info) {                                 \
         .lwi_timeout    = time,                         \
         .lwi_on_timeout = time_cb,                      \
-        .lwi_on_signal = sig_cb,                        \
+        .lwi_on_signal  = sig_cb,                       \
         .lwi_cb_data    = data,                         \
-        .lwi_interval    = 0                            \
+        .lwi_interval   = 0,                            \
+        .lwi_allow_intr = 0                             \
+})
+
+#define LWI_TIMEOUT_INTR_ALL(time, time_cb, sig_cb, data)       \
+((struct l_wait_info) {                                         \
+        .lwi_timeout    = time,                                 \
+        .lwi_on_timeout = time_cb,                              \
+        .lwi_on_signal  = sig_cb,                               \
+        .lwi_cb_data    = data,                                 \
+        .lwi_interval   = 0,                                    \
+        .lwi_allow_intr = 1                                     \
 })
 
 #define LWI_INTR(cb, data)  LWI_TIMEOUT_INTR(0, NULL, cb, data)
@@ -650,6 +664,7 @@ do {                                                                           \
         cfs_waitlink_t __wait;                                                 \
         cfs_duration_t __timeout = info->lwi_timeout;                          \
         cfs_sigset_t   __blocked;                                              \
+        int   __allow_intr = info->lwi_allow_intr;                             \
                                                                                \
         ret = 0;                                                               \
         if (condition)                                                         \
@@ -662,7 +677,7 @@ do {                                                                           \
                 cfs_waitq_add(&wq, &__wait);                                   \
                                                                                \
         /* Block all signals (just the non-fatal ones if no timeout). */       \
-        if (info->lwi_on_signal != NULL && __timeout == 0)                     \
+        if (info->lwi_on_signal != NULL && (__timeout == 0 || __allow_intr))   \
                 __blocked = l_w_e_set_sigs(LUSTRE_FATAL_SIGS);                 \
         else                                                                   \
                 __blocked = l_w_e_set_sigs(0);                                 \
@@ -700,7 +715,8 @@ do {                                                                           \
                 if (condition)                                                 \
                         break;                                                 \
                 if (cfs_signal_pending()) {                                    \
-                        if (info->lwi_on_signal != NULL && __timeout == 0) {   \
+                        if (info->lwi_on_signal != NULL &&                     \
+                            (__timeout == 0 || __allow_intr)) {                \
                                 if (info->lwi_on_signal != LWI_ON_SIGNAL_NOOP) \
                                         info->lwi_on_signal(info->lwi_cb_data);\
                                 ret = -EINTR;                                  \
index 5b28d32..20b575e 100644 (file)
@@ -915,44 +915,6 @@ static int ptlrpc_import_delay_req(struct obd_import *imp,
         RETURN(delay);
 }
 
-static int ptlrpc_check_reply(struct ptlrpc_request *req)
-{
-        int rc = 0;
-        ENTRY;
-
-        /* serialise with network callback */
-        spin_lock(&req->rq_lock);
-
-        if (ptlrpc_client_replied(req))
-                GOTO(out, rc = 1);
-
-        if (req->rq_net_err && !req->rq_timedout) {
-                spin_unlock(&req->rq_lock);
-                rc = ptlrpc_expire_one_request(req, 0);
-                spin_lock(&req->rq_lock);
-                GOTO(out, rc);
-        }
-
-        if (req->rq_err)
-                GOTO(out, rc = 1);
-
-        if (req->rq_resend)
-                GOTO(out, rc = 1);
-
-        if (req->rq_restart)
-                GOTO(out, rc = 1);
-
-        if (ptlrpc_client_early(req)) {
-                ptlrpc_at_recv_early_reply(req);
-                GOTO(out, rc = 0); /* keep waiting */
-        }
-
-        EXIT;
- out:
-        spin_unlock(&req->rq_lock);
-        DEBUG_REQ(D_NET, req, "rc = %d for", rc);
-        return rc;
-}
 
 /* Conditionally suppress specific console messages */
 static int ptlrpc_console_allow(struct ptlrpc_request *req)
@@ -978,7 +940,6 @@ static int ptlrpc_console_allow(struct ptlrpc_request *req)
         return 1;
 }
 
-
 static int ptlrpc_check_status(struct ptlrpc_request *req)
 {
         int err;
@@ -1342,11 +1303,15 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                         GOTO(interpret, req->rq_status);
                 }
 
-                /* ptlrpc_queue_wait->l_wait_event guarantees that rq_intr
-                 * will only be set after rq_timedout, but the synchronous IO
-                 * waiting path sets rq_intr irrespective of whether ptlrpcd
-                 * has seen a timeout.  our policy is to only interpret
-                 * interrupted rpcs after they have timed out */
+                /* ptlrpc_set_wait->l_wait_event sets lwi_allow_intr
+                 * so it sets rq_intr regardless of individual rpc
+                 * timeouts. The synchronous IO waiting path sets 
+                 * rq_intr irrespective of whether ptlrpcd
+                 * has seen a timeout.  Our policy is to only interpret
+                 * interrupted rpcs after they have timed out, so we
+                 * need to enforce that here.
+                 */
+
                 if (req->rq_intr && (req->rq_timedout || req->rq_waiting ||
                                      req->rq_wait_ctx)) {
                         req->rq_status = -EINTR;
@@ -1757,11 +1722,27 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
                  * req times out */
                 CDEBUG(D_RPCTRACE, "set %p going to sleep for %d seconds\n",
                        set, timeout);
-                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout ? timeout : 1),
-                                       ptlrpc_expired_set,
-                                       ptlrpc_interrupted_set, set);
-                rc = l_wait_event(set->set_waitq,
-                                  ptlrpc_check_set(NULL, set), &lwi);
+
+                if (timeout == 0 && !cfs_signal_pending())
+                        /*
+                         * No requests are in-flight (ether timed out
+                         * or delayed), so we can allow interrupts.
+                         * We still want to block for a limited time,
+                         * so we allow interrupts during the timeout.
+                         */
+                        lwi = LWI_TIMEOUT_INTR_ALL(cfs_time_seconds(1), 
+                                                   ptlrpc_expired_set,
+                                                   ptlrpc_interrupted_set, set);
+                else
+                        /*
+                         * At least one request is in flight, so no
+                         * interrupts are allowed. Wait until all
+                         * complete, or an in-flight req times out. 
+                         */
+                        lwi = LWI_TIMEOUT(cfs_time_seconds(timeout? timeout : 1),
+                                          ptlrpc_expired_set, set);
+
+                rc = l_wait_event(set->set_waitq, ptlrpc_check_set(NULL, set), &lwi);
 
                 LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
 
@@ -2098,35 +2079,6 @@ void ptlrpc_restart_req(struct ptlrpc_request *req)
         spin_unlock(&req->rq_lock);
 }
 
-static int expired_request(void *data)
-{
-        struct ptlrpc_request *req = data;
-        ENTRY;
-
-        /*
-         * Some failure can suspend regular timeouts.
-         */
-        if (ptlrpc_check_suspend())
-                RETURN(1);
-
-        /*
-         * Deadline may have changed with an early reply.
-         */
-        if (req->rq_deadline > cfs_time_current_sec())
-                RETURN(1);
-
-        RETURN(ptlrpc_expire_one_request(req, 0));
-}
-
-static void interrupted_request(void *data)
-{
-        struct ptlrpc_request *req = data;
-        DEBUG_REQ(D_HA, req, "request interrupted");
-        spin_lock(&req->rq_lock);
-        req->rq_intr = 1;
-        spin_unlock(&req->rq_lock);
-}
-
 struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
 {
         ENTRY;
@@ -2187,225 +2139,28 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
 
 int ptlrpc_queue_wait(struct ptlrpc_request *req)
 {
-        int rc = 0;
-        int brc;
-        struct l_wait_info lwi;
-        struct obd_import *imp = req->rq_import;
-        cfs_duration_t timeout = CFS_TICK;
-        long timeoutl;
+        struct ptlrpc_request_set *set;
+        int rc;
         ENTRY;
 
         LASSERT(req->rq_set == NULL);
         LASSERT(!req->rq_receiving_reply);
 
-        /* for distributed debugging */
-        lustre_msg_set_status(req->rq_reqmsg, cfs_curproc_pid());
-        LASSERT(imp->imp_obd != NULL);
-        CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:nid:opc "
-               "%s:%s:%d:"LPU64":%s:%d\n", cfs_curproc_comm(),
-               imp->imp_obd->obd_uuid.uuid,
-               lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
-               libcfs_nid2str(imp->imp_connection->c_peer.nid),
-               lustre_msg_get_opc(req->rq_reqmsg));
-
-        /* Mark phase here for a little debug help */
-        ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
-
-        spin_lock(&imp->imp_lock);
-        req->rq_import_generation = imp->imp_generation;
-restart:
-        if (ptlrpc_import_delay_req(imp, req, &rc)) {
-                list_del_init(&req->rq_list);
-                list_add_tail(&req->rq_list, &imp->imp_delayed_list);
-                atomic_inc(&imp->imp_inflight);
-                spin_unlock(&imp->imp_lock);
-
-                DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
-                          cfs_curproc_comm(),
-                          ptlrpc_import_state_name(req->rq_send_state),
-                          ptlrpc_import_state_name(imp->imp_state));
-                lwi = LWI_INTR(interrupted_request, req);
-                rc = l_wait_event(req->rq_reply_waitq,
-                                  (req->rq_send_state == imp->imp_state ||
-                                   req->rq_err || req->rq_intr),
-                                  &lwi);
-                DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d/%d == 1)",
-                          cfs_curproc_comm(),
-                          ptlrpc_import_state_name(imp->imp_state),
-                          ptlrpc_import_state_name(req->rq_send_state),
-                          req->rq_err, req->rq_intr);
-
-                spin_lock(&imp->imp_lock);
-                list_del_init(&req->rq_list);
-                atomic_dec(&imp->imp_inflight);
-
-                if (req->rq_err) {
-                        /* rq_status was set locally */
-                        rc = req->rq_status ? req->rq_status : -EIO;
-                }
-                else if (req->rq_intr) {
-                        rc = -EINTR;
-                }
-                else if (req->rq_no_resend) {
-                        rc = -ETIMEDOUT;
-                } else {
-                        GOTO(restart, rc);
-                }
-        }
-
-        if (rc != 0) {
-                spin_unlock(&imp->imp_lock);
-                req->rq_status = rc; // XXX this ok?
-                GOTO(out, rc);
-        }
-
-        if (req->rq_resend) {
-                if (req->rq_bulk != NULL) {
-                        ptlrpc_unregister_bulk(req, 0);
-
-                        /* bulk requests are supposed to be
-                         * idempotent, so we are free to bump the xid
-                         * here, which we need to do before
-                         * registering the bulk again (bug 6371).
-                         * print the old xid first for sanity.
-                         */
-                        DEBUG_REQ(D_HA, req, "bumping xid for bulk: ");
-                        req->rq_xid = ptlrpc_next_xid();
-                }
-
-                DEBUG_REQ(D_HA, req, "resending: ");
-        }
-
-        /* XXX this is the same as ptlrpc_set_wait */
-        LASSERT(list_empty(&req->rq_list));
-        list_add_tail(&req->rq_list, &imp->imp_sending_list);
-        atomic_inc(&imp->imp_inflight);
-        spin_unlock(&imp->imp_lock);
-
-        rc = sptlrpc_req_refresh_ctx(req, 0);
-        if (rc) {
-                if (req->rq_err) {
-                        /* we got fatal ctx refresh error, directly jump out
-                         * thus we can pass back the actual error code.
-                         */
-                        spin_lock(&imp->imp_lock);
-                        list_del_init(&req->rq_list);
-                        atomic_dec(&imp->imp_inflight);
-                        spin_unlock(&imp->imp_lock);
-
-                        CERROR("Failed to refresh ctx of req %p: %d\n",
-                               req, rc);
-                        GOTO(out, rc);
-                }
-                /* simulating we got error during send rpc */
-                goto after_send;
-        }
-
-        rc = ptl_send_rpc(req, 0);
-        if (rc)
-                DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc);
-repeat:
-        timeoutl = req->rq_deadline - cfs_time_current_sec();
-        timeout = (timeoutl <= 0 || rc) ? CFS_TICK :
-                cfs_time_seconds(timeoutl);
-        DEBUG_REQ(D_NET, req,
-                  "-- sleeping for "CFS_DURATION_T" ticks", timeout);
-        lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request,
-                               req);
-        brc = l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
-        if (brc == -ETIMEDOUT && ((req->rq_deadline > cfs_time_current_sec()) ||
-                                 ptlrpc_check_and_wait_suspend(req)))
-                goto repeat;
-
-after_send:
-        CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:nid:opc "
-               "%s:%s:%d:"LPU64":%s:%d\n", cfs_curproc_comm(),
-               imp->imp_obd->obd_uuid.uuid,
-               lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
-               libcfs_nid2str(imp->imp_connection->c_peer.nid),
-               lustre_msg_get_opc(req->rq_reqmsg));
-
-        /* If the reply was received normally, this just grabs the spinlock
-         * (ensuring the reply callback has returned), sees that
-         * req->rq_receiving_reply is clear and returns. */
-        ptlrpc_unregister_reply(req, 0);
-
-        spin_lock(&imp->imp_lock);
-        list_del_init(&req->rq_list);
-        atomic_dec(&imp->imp_inflight);
-        spin_unlock(&imp->imp_lock);
-
-        if (req->rq_err) {
-                DEBUG_REQ(D_RPCTRACE, req, "err rc=%d status=%d",
-                          rc, req->rq_status);
-                rc = rc ? rc : req->rq_status;
-                GOTO(out, rc = rc ? rc : -EIO);
-        }
-
-        if (req->rq_intr) {
-                /* Should only be interrupted if we timed out. */
-                if (!req->rq_timedout)
-                        DEBUG_REQ(D_ERROR, req,
-                                  "rq_intr set but rq_timedout not");
-                GOTO(out, rc = -EINTR);
-        }
-
-        /* Resend if we need to */
-        if (req->rq_resend||req->rq_timedout) {
-                /* ...unless we were specifically told otherwise. */
-                if (req->rq_no_resend)
-                        GOTO(out, rc = -ETIMEDOUT);
-                spin_lock(&imp->imp_lock);
-                /* we can have rq_timeout on dlm fake import which not support
-                 * recovery - but me need resend request on this import instead
-                 * of return error */
-                req->rq_resend = 1;
-                goto restart;
-        }
-
-        if (!ptlrpc_client_replied(req)) {
-                /* How can this be? -eeb */
-                DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
-                LBUG();
-                GOTO(out, rc = req->rq_status);
+        set = ptlrpc_prep_set();
+        if (set == NULL) {
+                CERROR("Unable to allocate ptlrpc set.");
+                RETURN(-ENOMEM);
         }
 
-        rc = after_reply(req);
-        /* NB may return +ve success rc */
-        if (req->rq_resend) {
-                spin_lock(&imp->imp_lock);
-                goto restart;
-        }
+        /* for distributed debugging */
+        lustre_msg_set_status(req->rq_reqmsg, cfs_curproc_pid());
 
- out:
-        if (req->rq_bulk != NULL) {
-                if (rc >= 0) {
-                        /* success so far.  Note that anything going wrong
-                         * with bulk now, is EXTREMELY strange, since the
-                         * server must have believed that the bulk
-                         * transferred OK before she replied with success to
-                         * me. */
-                        lwi = LWI_TIMEOUT(timeout, NULL, NULL);
-                        brc = l_wait_event(req->rq_reply_waitq,
-                                           !ptlrpc_client_bulk_active(req),
-                                           &lwi);
-                        LASSERT(brc == 0 || brc == -ETIMEDOUT);
-                        if (brc != 0) {
-                                LASSERT(brc == -ETIMEDOUT);
-                                DEBUG_REQ(D_ERROR, req, "bulk timed out");
-                                rc = brc;
-                        } else if (!req->rq_bulk->bd_success) {
-                                DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
-                                rc = -EIO;
-                        }
-                }
-                if (rc < 0)
-                        ptlrpc_unregister_bulk(req, 0);
-        }
+        /* add a ref for the set (see comment in ptlrpc_set_add_req) */
+        ptlrpc_request_addref(req);
+        ptlrpc_set_add_req(set, req);
+        rc = ptlrpc_set_wait(set);
+        ptlrpc_set_destroy(set);
 
-        LASSERT(!req->rq_receiving_reply);
-        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
-        cfs_waitq_broadcast(&imp->imp_recovery_waitq);
         RETURN(rc);
 }
 
index cb2f716..e300c28 100644 (file)
@@ -656,8 +656,8 @@ test_23a() {        # was test_23
                echo "waiting for mount to finish ... "
                WAIT=$(( WAIT + sleep))
        done
-       [ "$WAIT" -eq "$MAX_WAIT" ] && error "MOUNT_PID $MOUNT_PID and \
-               MOUNT__LUSTRE_PID $MOUNT__LUSTRE_PID still not killed in $WAIT secs"
+       [ "$WAIT" -eq "$MAX_WAIT" ] && error "MOUNT_PID $MOUNT_PID and "\
+               "MOUNT_LUSTRE_PID $MOUNT_LUSTRE_PID still not killed in $WAIT secs"
        ps -ef | grep mount
        stop_mds || error
        stop_ost || error