Whamcloud - gitweb
Landing b_recovery
[fs/lustre-release.git] / lustre / ptlrpc / client.c
index 6f3ae1b..12cf867 100644 (file)
@@ -43,11 +43,6 @@ void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
         cl->cli_name           = name;
 }
 
-struct obd_uuid *ptlrpc_req_to_uuid(struct ptlrpc_request *req)
-{
-        return &req->rq_connection->c_remote_uuid;
-}
-
 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
 {
         struct ptlrpc_connection *c;
@@ -246,10 +241,9 @@ struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
         request->rq_request_portal = imp->imp_client->cli_request_portal;
         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
 
-        request->rq_connection = ptlrpc_connection_addref(imp->imp_connection);
-
         spin_lock_init(&request->rq_lock);
         INIT_LIST_HEAD(&request->rq_list);
+        INIT_LIST_HEAD(&request->rq_replay_list);
         init_waitqueue_head(&request->rq_reply_waitq);
         request->rq_xid = ptlrpc_next_xid();
         atomic_set(&request->rq_refcount, 1);
@@ -372,11 +366,14 @@ static int ptlrpc_import_delay_req(struct obd_import *imp,
         LASSERT (status != NULL);
         *status = 0;
 
-        /* A new import, or one that has been cleaned up.
-         */
         if (imp->imp_state == LUSTRE_IMP_NEW) {
                 DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
                 *status = -EIO;
+                LBUG();
+        }
+        else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
+                DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
+                *status = -EIO;
         }
         /*
          * If the import has been invalidated (such as by an OST failure), the
@@ -442,7 +439,8 @@ static int ptlrpc_check_status(struct ptlrpc_request *req)
 
         err = req->rq_repmsg->status;
         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
-                DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR");
+                DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err == %d", 
+                          err);
                 RETURN(err < 0 ? err : -EINVAL);
         }
 
@@ -456,7 +454,7 @@ static int ptlrpc_check_status(struct ptlrpc_request *req)
         RETURN(err);
 }
 
-static int after_reply(struct ptlrpc_request *req, int *restartp)
+static int after_reply(struct ptlrpc_request *req)
 {
         unsigned long flags;
         struct obd_import *imp = req->rq_import;
@@ -466,9 +464,6 @@ static int after_reply(struct ptlrpc_request *req, int *restartp)
         LASSERT(!req->rq_receiving_reply);
         LASSERT(req->rq_replied);
 
-        if (restartp != NULL)
-                *restartp = 0;
-
         /* NB Until this point, the whole of the incoming message,
          * including buflens, status etc is in the sender's byte order. */
 
@@ -505,26 +500,7 @@ static int after_reply(struct ptlrpc_request *req, int *restartp)
 
                 ptlrpc_request_handle_notconn(req);
 
-                if (req->rq_err)
-                        RETURN(-EIO);
-
-                if (req->rq_no_resend)
-                        RETURN(rc); /* -ENOTCONN */
-
-                if (req->rq_resend) {
-                        if (restartp == NULL)
-                                LBUG(); /* async resend not supported yet */
-                        spin_lock_irqsave (&req->rq_lock, flags);
-                        req->rq_resend = 0;
-                        spin_unlock_irqrestore (&req->rq_lock, flags);
-                        *restartp = 1;
-                        lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
-                        DEBUG_REQ(D_HA, req, "resending: ");
-                        RETURN(0);
-                }
-
-                CERROR("request should be err or resend: %p\n", req);
-                LBUG();
+                RETURN(rc);
         }
 
         if (req->rq_import->imp_replayable) {
@@ -555,7 +531,6 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
         int rc;
         ENTRY;
 
-        LASSERT(req->rq_send_state == LUSTRE_IMP_FULL);
         LASSERT(req->rq_phase == RQ_PHASE_NEW);
         req->rq_phase = RQ_PHASE_RPC;
 
@@ -681,7 +656,6 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                 }
 
                 if (req->rq_phase == RQ_PHASE_RPC) {
-                        int do_restart = 0;
                         if (req->rq_waiting || req->rq_resend) {
                                 int status;
                                 spin_lock_irqsave(&imp->imp_lock, flags);
@@ -709,11 +683,6 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                                 if (req->rq_resend) {
                                         lustre_msg_add_flags(req->rq_reqmsg,
                                                              MSG_RESENT);
-                                        spin_lock_irqsave(&req->rq_lock, flags);
-                                        req->rq_resend = 0;
-                                        spin_unlock_irqrestore(&req->rq_lock,
-                                                               flags);
-
                                         ptlrpc_unregister_reply(req);
                                         if (req->rq_bulk) {
                                                 __u64 old_xid = req->rq_xid;
@@ -750,11 +719,15 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                         list_del_init(&req->rq_list);
                         spin_unlock_irqrestore(&imp->imp_lock, flags);
 
-                        req->rq_status = after_reply(req, &do_restart);
-                        if (do_restart) {
+                        req->rq_status = after_reply(req);
+                        if (req->rq_resend) {
+                                /* Add this req to the delayed list so
+                                   it can be errored if the import is
+                                   evicted after recovery. */
                                 spin_lock_irqsave (&req->rq_lock, flags);
-                                req->rq_resend = 1; /* ugh */
-                                spin_unlock_irqrestore (&req->rq_lock, flags);
+                                list_add_tail(&req->rq_list, 
+                                              &imp->imp_delayed_list);
+                                spin_unlock_irqrestore(&req->rq_lock, flags);
                                 continue;
                         }
 
@@ -785,6 +758,8 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                 if (req->rq_bulk != NULL)
                         ptlrpc_unregister_bulk (req);
 
+                req->rq_phase = RQ_PHASE_COMPLETE;
+
                 if (req->rq_interpret_reply != NULL) {
                         int (*interpreter)(struct ptlrpc_request *,void *,int) =
                                 req->rq_interpret_reply;
@@ -800,7 +775,6 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                        imp->imp_connection->c_peer.peer_nid,
                        req->rq_reqmsg->opc);
 
-                req->rq_phase = RQ_PHASE_COMPLETE;
                 set->set_remaining--;
         }
 
@@ -832,9 +806,15 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req)
                 RETURN(1);
 
         /* If this request is for recovery or other primordial tasks,
-         * don't go back to sleep, and don't start recovery again.. */
-        if (req->rq_send_state != LUSTRE_IMP_FULL || imp->imp_obd->obd_no_recov)
+         * then error it out here. */
+        if (req->rq_send_state != LUSTRE_IMP_FULL || 
+            imp->imp_obd->obd_no_recov) {
+                spin_lock_irqsave (&req->rq_lock, flags);
+                req->rq_status = -ETIMEDOUT;
+                req->rq_err = 1;
+                spin_unlock_irqrestore (&req->rq_lock, flags);
                 RETURN(1);
+        }
 
         ptlrpc_fail_import(imp, req->rq_import_generation);
 
@@ -856,7 +836,8 @@ int ptlrpc_expired_set(void *data)
                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
 
                 /* request in-flight? */
-                if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
+                if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting 
+                       && !req->rq_resend) ||
                       (req->rq_phase == RQ_PHASE_BULK)))
                         continue;
 
@@ -1007,7 +988,7 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
                 unsigned long flags = 0;
                 if (!locked)
                         spin_lock_irqsave(&request->rq_import->imp_lock, flags);
-                list_del_init(&request->rq_list);
+                list_del_init(&request->rq_replay_list);
                 if (!locked)
                         spin_unlock_irqrestore(&request->rq_import->imp_lock,
                                                flags);
@@ -1038,7 +1019,6 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
         if (request->rq_bulk != NULL)
                 ptlrpc_free_bulk(request->rq_bulk);
 
-        ptlrpc_put_connection(request->rq_connection);
         OBD_FREE(request, sizeof(*request));
         EXIT;
 }
@@ -1086,13 +1066,6 @@ void ptlrpc_req_finished(struct ptlrpc_request *request)
         __ptlrpc_req_finished(request, 0);
 }
 
-static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request)
-{
-        OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
-        request->rq_reqmsg = NULL;
-        request->rq_reqlen = 0;
-}
-
 /* Disengage the client's reply buffer from the network
  * NB does _NOT_ unregister any client-side bulk.
  * IDEMPOTENT, but _not_ safe against concurrent callers.
@@ -1181,7 +1154,7 @@ void ptlrpc_free_committed(struct obd_import *imp)
                imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
 
         list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
-                req = list_entry(tmp, struct ptlrpc_request, rq_list);
+                req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
 
                 /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
                 LASSERT(req != last_req);
@@ -1208,7 +1181,7 @@ void ptlrpc_free_committed(struct obd_import *imp)
 free_req:
                 if (req->rq_commit_cb != NULL)
                         req->rq_commit_cb(req);
-                list_del_init(&req->rq_list);
+                list_del_init(&req->rq_replay_list);
                 __ptlrpc_req_finished(req, 1);
         }
 
@@ -1227,11 +1200,8 @@ void ptlrpc_resend_req(struct ptlrpc_request *req)
 {
         unsigned long flags;
 
-        DEBUG_REQ(D_HA, req, "resending");
+        DEBUG_REQ(D_HA, req, "going to resend");
         req->rq_reqmsg->handle.cookie = 0;
-        ptlrpc_put_connection(req->rq_connection);
-        req->rq_connection =
-                ptlrpc_connection_addref(req->rq_import->imp_connection);
         req->rq_status = -EAGAIN;
 
         spin_lock_irqsave (&req->rq_lock, flags);
@@ -1297,12 +1267,16 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
         LASSERT(spin_is_locked(&imp->imp_lock));
 #endif
 
+        /* don't re-add requests that have been replayed */
+        if (!list_empty(&req->rq_replay_list))
+                return;
+
         LASSERT(imp->imp_replayable);
         /* Balanced in ptlrpc_free_committed, usually. */
         ptlrpc_request_addref(req);
         list_for_each_prev(tmp, &imp->imp_replay_list) {
                 struct ptlrpc_request *iter =
-                        list_entry(tmp, struct ptlrpc_request, rq_list);
+                        list_entry(tmp, struct ptlrpc_request, rq_replay_list);
 
                 /* We may have duplicate transnos if we create and then
                  * open a file, or for closes retained if to match creating
@@ -1319,11 +1293,11 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
                                 continue;
                 }
 
-                list_add(&req->rq_list, &iter->rq_list);
+                list_add(&req->rq_replay_list, &iter->rq_replay_list);
                 return;
         }
 
-        list_add_tail(&req->rq_list, &imp->imp_replay_list);
+        list_add_tail(&req->rq_replay_list, &imp->imp_replay_list);
 }
 
 int ptlrpc_queue_wait(struct ptlrpc_request *req)
@@ -1333,7 +1307,6 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
         struct l_wait_info lwi;
         struct obd_import *imp = req->rq_import;
         unsigned long flags;
-        int do_restart = 0;
         int timeout = 0;
         ENTRY;
 
@@ -1363,15 +1336,19 @@ restart:
                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
                 spin_unlock_irqrestore(&imp->imp_lock, flags);
 
-                DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%d > %d)",
-                          current->comm, req->rq_send_state, imp->imp_state);
+                DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
+                          current->comm, 
+                          ptlrpc_import_state_name(req->rq_send_state), 
+                          ptlrpc_import_state_name(imp->imp_state));
                 lwi = LWI_INTR(interrupted_request, req);
                 rc = l_wait_event(req->rq_reply_waitq,
                                   (req->rq_send_state == imp->imp_state ||
                                    req->rq_err),
                                   &lwi);
-                DEBUG_REQ(D_HA, req, "\"%s\" awake: (%d > %d or %d == 1)",
-                          current->comm, imp->imp_state, req->rq_send_state,
+                DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d == 1)",
+                          current->comm, 
+                          ptlrpc_import_state_name(imp->imp_state), 
+                          ptlrpc_import_state_name(req->rq_send_state),
                           req->rq_err);
 
                 spin_lock_irqsave(&imp->imp_lock, flags);
@@ -1395,6 +1372,15 @@ restart:
                 GOTO(out, rc);
         }
 
+        if (req->rq_resend) {
+                lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
+
+                if (req->rq_bulk != NULL)
+                        ptlrpc_unregister_bulk (req);
+
+                DEBUG_REQ(D_HA, req, "resending: ");
+        }
+
         /* XXX this is the same as ptlrpc_set_wait */
         LASSERT(list_empty(&req->rq_list));
         list_add_tail(&req->rq_list, &imp->imp_sending_list);
@@ -1438,15 +1424,6 @@ restart:
                 /* ...unless we were specifically told otherwise. */
                 if (req->rq_no_resend)
                         GOTO(out, rc = -ETIMEDOUT);
-                spin_lock_irqsave (&req->rq_lock, flags);
-                req->rq_resend = 0;
-                spin_unlock_irqrestore (&req->rq_lock, flags);
-                lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
-
-                if (req->rq_bulk != NULL)
-                        ptlrpc_unregister_bulk (req);
-
-                DEBUG_REQ(D_HA, req, "resending: ");
                 spin_lock_irqsave(&imp->imp_lock, flags);
                 goto restart;
         }
@@ -1470,12 +1447,9 @@ restart:
                 GOTO(out, rc = req->rq_status);
         }
 
-        rc = after_reply (req, &do_restart);
+        rc = after_reply (req);
         /* NB may return +ve success rc */
-        if (do_restart) {
-                if (req->rq_bulk != NULL)
-                        ptlrpc_unregister_bulk (req);
-                DEBUG_REQ(D_HA, req, "resending: ");
+        if (req->rq_resend) {
                 spin_lock_irqsave(&imp->imp_lock, flags);
                 goto restart;
         }
@@ -1502,68 +1476,19 @@ restart:
         RETURN(rc);
 }
 
-int ptlrpc_replay_req(struct ptlrpc_request *req)
-{
-        int rc = 0, old_state, old_status = 0;
-        // struct ptlrpc_client *cli = req->rq_import->imp_client;
-        struct l_wait_info lwi;
-        ENTRY;
-
-        LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
-
-        /* I don't touch rq_phase here, so the debug log can show what
-         * state it was left in */
-
-        /* Not handling automatic bulk replay yet (or ever?) */
-        LASSERT(req->rq_bulk == NULL);
-
-        DEBUG_REQ(D_NET, req, "about to replay");
-
-        /* Update request's state, since we might have a new connection. */
-        ptlrpc_put_connection(req->rq_connection);
-        req->rq_connection =
-                ptlrpc_connection_addref(req->rq_import->imp_connection);
-
-        /* temporarily set request to REPLAY level---not strictly
-         * necessary since ptl_send_rpc doesn't check state, but let's
-         * be consistent.*/
-        old_state = req->rq_send_state;
-
-        /*
-         * Q: "How can a req get on the replay list if it wasn't replied?"
-         * A: "If we failed during the replay of this request, it will still
-         *     be on the list, but rq_replied will have been reset to 0."
-         */
-        if (req->rq_replied)
-                old_status = req->rq_repmsg->status;
-        req->rq_send_state = LUSTRE_IMP_REPLAY;
-        rc = ptl_send_rpc(req);
-        if (rc) {
-                CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
-                ptlrpc_cleanup_request_buf(req);
-                // up(&cli->cli_rpc_sem);
-                GOTO(out, rc = -rc);
-        }
-
-        CDEBUG(D_OTHER, "-- sleeping\n");
-        lwi = LWI_INTR(NULL, NULL); /* XXX needs timeout, nested recovery */
-        l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
-        CDEBUG(D_OTHER, "-- done\n");
-
-        // up(&cli->cli_rpc_sem);
+struct ptlrpc_replay_async_args { /* per-request replay state, kept in req->rq_async_args */
+        int praa_old_state;  /* rq_send_state before replay; restored by the interpret callback */
+        int praa_old_status; /* rq_repmsg->status of the earlier reply; 0 if never replied */
+};
 
-        /* If the reply was received normally, this just grabs the spinlock
-         * (ensuring the reply callback has returned), sees that
-         * req->rq_receiving_reply is clear and returns. */
-        ptlrpc_unregister_reply (req);
+static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
+                                    void * data, int rc)
+{
+        struct ptlrpc_replay_async_args *aa = data;
+        struct obd_import *imp = req->rq_import;
+        unsigned long flags;
 
-        if (!req->rq_replied) {
-                CERROR("Unknown reason for wakeup\n");
-                /* XXX Phil - I end up here when I kill obdctl */
-                /* ...that's because signals aren't all masked in
-                 * l_wait_event() -eeb */
-                GOTO(out, rc = -EINTR);
-        }
+        atomic_dec(&imp->imp_replay_inflight);
 
 #if SWAB_PARANOIA
         /* Clear reply swab mask; this is a new reply in sender's byte order */
@@ -1574,15 +1499,6 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
                 CERROR("unpack_rep failed: %d\n", rc);
                 GOTO(out, rc = -EPROTO);
         }
-#if 0
-        /* FIXME: Enable when BlueArc makes new release */
-        if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
-            req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
-                CERROR("invalid packet type received (type=%u)\n",
-                       req->rq_repmsg->type);
-                GOTO(out, rc = -EPROTO);
-        }
-#endif
 
         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR && 
             req->rq_repmsg->status == -ENOTCONN) 
@@ -1591,25 +1507,76 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         /* The transno had better not change over replay. */
         LASSERT(req->rq_reqmsg->transno == req->rq_repmsg->transno);
 
-        CDEBUG(D_NET, "got rep "LPD64"\n", req->rq_xid);
+        DEBUG_REQ(D_HA, req, "got rep");
 
         /* let the callback do fixups, possibly including in the request */
         if (req->rq_replay_cb)
                 req->rq_replay_cb(req);
 
-        if (req->rq_replied && req->rq_repmsg->status != old_status) {
+        if (req->rq_replied && req->rq_repmsg->status != aa->praa_old_status) {
                 DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
-                          req->rq_repmsg->status, old_status);
+                          req->rq_repmsg->status, aa->praa_old_status);
         } else {
                 /* Put it back for re-replay. */
-                req->rq_status = old_status;
+                req->rq_repmsg->status = aa->praa_old_status;
         }
 
+        spin_lock_irqsave(&imp->imp_lock, flags);
+        imp->imp_last_replay_transno = req->rq_transno;
+        spin_unlock_irqrestore(&imp->imp_lock, flags);
+
+        /* continue with recovery */
+        rc = ptlrpc_import_recovery_state_machine(imp);
  out:
-        req->rq_send_state = old_state;
+        req->rq_send_state = aa->praa_old_state;
+        
+        if (rc != 0)
+                /* this replay failed, so restart recovery */
+                ptlrpc_connect_import(imp, NULL);
+
         RETURN(rc);
 }
 
+/* Queue @req for asynchronous replay through ptlrpcd; returns 0. */
+int ptlrpc_replay_req(struct ptlrpc_request *req)
+{
+        struct ptlrpc_replay_async_args *aa;
+        ENTRY;
+
+        LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
+
+        /* Not handling automatic bulk replay yet (or ever?) */
+        LASSERT(req->rq_bulk == NULL);
+
+        DEBUG_REQ(D_HA, req, "REPLAY");
+
+        LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args)); /* args live inside the request */
+        aa = (struct ptlrpc_replay_async_args *)&req->rq_async_args;
+        memset(aa, 0, sizeof *aa); /* praa_old_status stays 0 unless set below */
+
+        /* Prepare request to be resent with ptlrpcd */
+        aa->praa_old_state = req->rq_send_state; /* restored in ptlrpc_replay_interpret */
+        req->rq_send_state = LUSTRE_IMP_REPLAY;
+        req->rq_phase = RQ_PHASE_NEW;
+        /*
+         * Q: "How can a req get on the replay list if it wasn't replied?"
+         * A: "If we failed during the replay of this request, it will still
+         *     be on the list, but rq_replied will have been reset to 0."
+         */
+        if (req->rq_replied) {
+                aa->praa_old_status = req->rq_repmsg->status; /* compared with the replay reply later */
+                req->rq_status = 0;
+                req->rq_replied = 0;
+        }
+
+        req->rq_interpret_reply = ptlrpc_replay_interpret;
+        atomic_inc(&req->rq_import->imp_replay_inflight); /* decremented in ptlrpc_replay_interpret */
+        ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
+
+        ptlrpcd_add_req(req);
+        RETURN(0);
+}
+
 void ptlrpc_abort_inflight(struct obd_import *imp)
 {
         unsigned long flags;