Whamcloud - gitweb
merge b_devel into HEAD (20030703)
[fs/lustre-release.git] / lustre / ptlrpc / client.c
index c79329c..a98af3e 100644 (file)
@@ -109,7 +109,6 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
                                                int type, int portal)
 {
         struct obd_import       *imp = req->rq_import;
-        unsigned long            flags;
         struct ptlrpc_bulk_desc *desc;
 
         LASSERT (type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
@@ -118,13 +117,7 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
         if (desc == NULL)
                 RETURN(NULL);
 
-        /* Is this sampled at the right place?  Do we want to get the import
-         * generation just before we send?  Should it match the generation of
-         * the request? */
-        spin_lock_irqsave(&imp->imp_lock, flags);
-        desc->bd_import_generation = imp->imp_generation;
-        spin_unlock_irqrestore(&imp->imp_lock, flags);
-
+        desc->bd_import_generation = req->rq_import_generation;
         desc->bd_import = class_import_get(imp);
         desc->bd_req = req;
         desc->bd_type = type;
@@ -449,13 +442,7 @@ static int after_reply(struct ptlrpc_request *req, int *restartp)
                         RETURN(-ENOTCONN);
                 }
 
-                rc = ptlrpc_request_handle_eviction(req);
-                if (rc)
-                        CERROR("can't reconnect to %s@%s: %d\n",
-                               imp->imp_target_uuid.uuid,
-                               imp->imp_connection->c_remote_uuid.uuid, rc);
-                else
-                        ptlrpc_wake_delayed(imp);
+                ptlrpc_request_handle_eviction(req);
 
                 if (req->rq_err)
                         RETURN(-EIO);
@@ -486,15 +473,6 @@ static int after_reply(struct ptlrpc_request *req, int *restartp)
 
                 /* Replay-enabled imports return commit-status information. */
                 if (req->rq_repmsg->last_committed) {
-                        if (req->rq_repmsg->last_committed <
-                            imp->imp_peer_committed_transno) {
-                                CERROR("%s went back in time (transno "LPD64
-                                       " was committed, server claims "LPD64
-                                       ")! is shared storage not coherent?\n",
-                                       imp->imp_target_uuid.uuid,
-                                       imp->imp_peer_committed_transno,
-                                       req->rq_repmsg->last_committed);
-                        }
                         imp->imp_peer_committed_transno =
                                 req->rq_repmsg->last_committed;
                 }
@@ -505,7 +483,7 @@ static int after_reply(struct ptlrpc_request *req, int *restartp)
         RETURN(rc);
 }
 
-static int check_set(struct ptlrpc_request_set *set)
+int ptlrpc_check_set(struct ptlrpc_request_set *set)
 {
         unsigned long flags;
         struct list_head *tmp;
@@ -574,6 +552,15 @@ static int check_set(struct ptlrpc_request_set *set)
                                 list_del(&req->rq_list);
                                 list_add_tail(&req->rq_list,
                                               &imp->imp_sending_list);
+
+                                if (req->rq_import_generation < 
+                                    imp->imp_generation) {
+                                        req->rq_status = -EIO;
+                                        req->rq_phase = RQ_PHASE_INTERPRET;
+                                        spin_unlock_irqrestore(&imp->imp_lock, 
+                                                               flags);
+                                        GOTO (interpret, req->rq_status);
+                                }
                                 spin_unlock_irqrestore(&imp->imp_lock, flags);
 
                                 req->rq_waiting = 0;
@@ -641,6 +628,7 @@ static int check_set(struct ptlrpc_request_set *set)
                 LASSERT (req->rq_phase == RQ_PHASE_INTERPRET);
                 LASSERT (!req->rq_receiving_reply);
 
+                ptlrpc_unregister_reply(req);
                 if (req->rq_bulk != NULL)
                         ptlrpc_unregister_bulk (req);
 
@@ -666,7 +654,7 @@ static int check_set(struct ptlrpc_request_set *set)
         RETURN (set->set_remaining == 0);
 }
 
-static int expire_one_request(struct ptlrpc_request *req)
+int ptlrpc_expire_one_request(struct ptlrpc_request *req)
 {
         unsigned long      flags;
         struct obd_import *imp = req->rq_import;
@@ -724,7 +712,7 @@ static int expired_set(void *data)
                         continue;
 
                 /* deal with this guy */
-                expire_one_request (req);
+                ptlrpc_expire_one_request (req);
         }
 
         /* When waiting for a whole set, we always to break out of the
@@ -787,6 +775,8 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
                         continue;
                 }
 
+                req->rq_import_generation = imp->imp_generation;
+
                 if (req->rq_level > imp->imp_level) {
                         if (req->rq_no_recov || imp->imp_obd->obd_no_recov ||
                             imp->imp_dlm_fake) {
@@ -809,7 +799,6 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
                 /* XXX this is the same as ptlrpc_queue_wait */
                 LASSERT(list_empty(&req->rq_list));
                 list_add_tail(&req->rq_list, &imp->imp_sending_list);
-                req->rq_import_generation = imp->imp_generation;
                 spin_unlock_irqrestore(&imp->imp_lock, flags);
 
                 CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc"
@@ -853,9 +842,9 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
                  * req times out */
                 CDEBUG(D_HA, "set %p going to sleep for %d seconds\n",
                        set, timeout);
-                lwi = LWI_TIMEOUT_INTR(timeout ? timeout * HZ : 1,
+                lwi = LWI_TIMEOUT_INTR((timeout ? timeout : 1) * HZ,
                                        expired_set, interrupted_set, set);
-                rc = l_wait_event(set->set_waitq, check_set(set), &lwi);
+                rc = l_wait_event(set->set_waitq, ptlrpc_check_set(set), &lwi);
 
                 LASSERT (rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
 
@@ -951,7 +940,8 @@ static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
         if (request == NULL)
                 RETURN(1);
 
-        if (request == (void *)(long)(0x5a5a5a5a5a5a5a5a)) {
+        if (request == (void *)(long)(0x5a5a5a5a5a5a5a5a) || 
+            request->rq_obd == (void *)(long)(0x5a5a5a5a5a5a5a5a)) {
                 CERROR("dereferencing freed request (bug 575)\n");
                 LBUG();
                 RETURN(1);
@@ -1074,6 +1064,11 @@ void ptlrpc_free_committed(struct obd_import *imp)
                 LASSERT (req != last_req);
                 last_req = req;
 
+                if (req->rq_import_generation < imp->imp_generation) {
+                        DEBUG_REQ(D_HA, req, "freeing request with old gen");
+                        GOTO(free_req, 0);
+                }
+
                 if (req->rq_replay) {
                         DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY)");
                         continue;
@@ -1087,6 +1082,7 @@ void ptlrpc_free_committed(struct obd_import *imp)
 
                 DEBUG_REQ(D_HA, req, "committing (last_committed "LPU64")",
                           imp->imp_peer_committed_transno);
+free_req:
                 list_del_init(&req->rq_list);
                 __ptlrpc_req_finished(req, 1);
         }
@@ -1146,7 +1142,7 @@ static int expired_request(void *data)
         struct ptlrpc_request *req = data;
         ENTRY;
 
-        RETURN(expire_one_request(req));
+        RETURN(ptlrpc_expire_one_request(req));
 }
 
 static void interrupted_request(void *data)
@@ -1234,6 +1230,8 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
         /* Mark phase here for a little debug help */
         req->rq_phase = RQ_PHASE_RPC;
 
+        spin_lock_irqsave(&imp->imp_lock, flags);
+        req->rq_import_generation = imp->imp_generation;
 restart:
         /*
          * If the import has been invalidated (such as by an OST failure), the
@@ -1241,13 +1239,18 @@ restart:
          * through, though, so that they have a chance to revalidate the
          * import.
          */
-        spin_lock_irqsave(&imp->imp_lock, flags);
         if (req->rq_import->imp_invalid && req->rq_level == LUSTRE_CONN_FULL) {
                 DEBUG_REQ(D_ERROR, req, "IMP_INVALID:");
                 spin_unlock_irqrestore(&imp->imp_lock, flags);
                 GOTO (out, rc = -EIO);
         }
 
+        if (req->rq_import_generation < imp->imp_generation) {
+                DEBUG_REQ(D_ERROR, req, "req old gen:");
+                spin_unlock_irqrestore(&imp->imp_lock, flags);
+                GOTO (out, rc = -EIO);
+        }
+
         if (req->rq_level > imp->imp_level) {
                 list_del(&req->rq_list);
                 if (req->rq_no_recov || obd->obd_no_recov ||
@@ -1272,9 +1275,11 @@ restart:
                 spin_lock_irqsave(&imp->imp_lock, flags);
                 list_del_init(&req->rq_list);
 
-                if (req->rq_err)
+                if (req->rq_err || 
+                    req->rq_import_generation < imp->imp_generation)
                         rc = -EIO;
 
+
                 if (rc) {
                         spin_unlock_irqrestore(&imp->imp_lock, flags);
                         GOTO (out, rc);
@@ -1286,7 +1291,6 @@ restart:
         /* XXX this is the same as ptlrpc_set_wait */
         LASSERT(list_empty(&req->rq_list));
         list_add_tail(&req->rq_list, &imp->imp_sending_list);
-        req->rq_import_generation = imp->imp_generation;
         spin_unlock_irqrestore(&imp->imp_lock, flags);
 
         rc = ptl_send_rpc(req);
@@ -1376,6 +1380,7 @@ restart:
                         ptlrpc_unregister_bulk (req);
 
                 DEBUG_REQ(D_HA, req, "resending: ");
+                spin_lock_irqsave(&imp->imp_lock, flags);
                 goto restart;
         }
 
@@ -1404,6 +1409,7 @@ restart:
                 if (req->rq_bulk != NULL)
                         ptlrpc_unregister_bulk (req);
                 DEBUG_REQ(D_HA, req, "resending: ");
+                spin_lock_irqsave(&imp->imp_lock, flags);
                 goto restart;
         }
 
@@ -1457,7 +1463,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         old_level = req->rq_level;
         if (req->rq_replied)
                 old_status = req->rq_repmsg->status;
-        req->rq_level = LUSTRE_CONN_RECOVD;
+        req->rq_level = LUSTRE_CONN_RECOVER;
         rc = ptl_send_rpc(req);
         if (rc) {
                 CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
@@ -1535,13 +1541,6 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
          * this flag and then putting requests on sending_list or delayed_list.
          */
         spin_lock_irqsave(&imp->imp_lock, flags);
-        if (!imp->imp_replayable)
-                /* on b_devel, I moved this line to
-                   ptlrpc_set_import_active because I thought it made
-                   more sense there and possibly not all callers of
-                   this function expect this. I'll leave it here until
-                   I can figure out if it's correct or not. - rread 5/12/03  */
-                imp->imp_invalid = 1;
 
         /* XXX locking?  Maybe we should remove each request with the list
          * locked?  Also, how do we know if the requests on the list are
@@ -1554,11 +1553,13 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
                 DEBUG_REQ(D_HA, req, "inflight");
 
                 spin_lock (&req->rq_lock);
-                req->rq_err = 1;
-                if (req->rq_set != NULL)
-                        wake_up(&req->rq_set->set_waitq);
-                else
-                        wake_up(&req->rq_wait_for_rep);
+                if (req->rq_import_generation < imp->imp_generation) {
+                        req->rq_err = 1;
+                        if (req->rq_set != NULL)
+                                wake_up(&req->rq_set->set_waitq);
+                        else
+                                wake_up(&req->rq_wait_for_rep);
+                }
                 spin_unlock (&req->rq_lock);
         }
 
@@ -1569,12 +1570,14 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
                 DEBUG_REQ(D_HA, req, "aborting waiting req");
 
                 spin_lock (&req->rq_lock);
-                req->rq_err = 1;
-                if (req->rq_set != NULL)
-                        wake_up(&req->rq_set->set_waitq);
-                else
-                        wake_up(&req->rq_wait_for_rep);
-                spin_unlock (&req->rq_lock);
+                if (req->rq_import_generation < imp->imp_generation) {
+                        req->rq_err = 1;
+                        if (req->rq_set != NULL)
+                                wake_up(&req->rq_set->set_waitq);
+                        else
+                                wake_up(&req->rq_wait_for_rep);
+                        spin_unlock (&req->rq_lock);
+                }
         }
 
         /* Last chance to free reqs left on the replay list, but we