Whamcloud - gitweb
b=17310
[fs/lustre-release.git] / lustre / include / lustre_net.h
index 363a0b8..34b5831 100644 (file)
@@ -58,6 +58,8 @@
 #include <lu_object.h>
 #include <lustre_req_layout.h>
 
+#include <obd_support.h>
+
 /* MD flags we _always_ use */
 #define PTLRPC_MD_OPTIONS  0
 
@@ -291,11 +293,13 @@ struct ptlrpc_reply_state {
 struct ptlrpc_thread;
 
 enum rq_phase {
-        RQ_PHASE_NEW         = 0xebc0de00,
-        RQ_PHASE_RPC         = 0xebc0de01,
-        RQ_PHASE_BULK        = 0xebc0de02,
-        RQ_PHASE_INTERPRET   = 0xebc0de03,
-        RQ_PHASE_COMPLETE    = 0xebc0de04,
+        RQ_PHASE_NEW            = 0xebc0de00,
+        RQ_PHASE_RPC            = 0xebc0de01,
+        RQ_PHASE_BULK           = 0xebc0de02,
+        RQ_PHASE_INTERPRET      = 0xebc0de03,
+        RQ_PHASE_COMPLETE       = 0xebc0de04,
+        RQ_PHASE_UNREGISTERING  = 0xebc0de05,
+        RQ_PHASE_UNDEFINED      = 0xebc0de06
 };
 
 /** Type of request interpreter call-back */
@@ -347,6 +351,7 @@ struct ptlrpc_request {
                 rq_sent_final:1;    /* stop sending early replies */
 
         enum rq_phase rq_phase; /* one of RQ_PHASE_* */
+        enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */
         atomic_t rq_refcount;   /* client-side refcount for SENT race,
                                    server-side refcounf for multiple replies */
 
@@ -454,6 +459,7 @@ struct ptlrpc_request {
         volatile time_t rq_deadline;     /* when request must finish. volatile
                so that servers' early reply updates to the deadline aren't
                kept in per-cpu cache */
+        time_t rq_reply_deadline;        /* when req reply unlink must finish. */
         int    rq_timeout;               /* service time estimate (secs) */
 
         /* Multi-rpc bits */
@@ -505,9 +511,9 @@ static inline int lustre_rep_swabbed(struct ptlrpc_request *req, int index)
 }
 
 static inline const char *
-ptlrpc_rqphase2str(const struct ptlrpc_request *req)
+ptlrpc_phase2str(enum rq_phase phase)
 {
-        switch (req->rq_phase) {
+        switch (phase) {
         case RQ_PHASE_NEW:
                 return "New";
         case RQ_PHASE_RPC:
@@ -518,11 +524,19 @@ ptlrpc_rqphase2str(const struct ptlrpc_request *req)
                 return "Interpret";
         case RQ_PHASE_COMPLETE:
                 return "Complete";
+        case RQ_PHASE_UNREGISTERING:
+                return "Unregistering";
         default:
                 return "?Phase?";
         }
 }
 
+static inline const char *
+ptlrpc_rqphase2str(struct ptlrpc_request *req)
+{
+        return ptlrpc_phase2str(req->rq_phase);
+}
+
 /* Spare the preprocessor, spoil the bugs. */
 #define FLAG(field, str) (field ? str : "")
 
@@ -846,29 +860,9 @@ void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
 void ptlrpc_cleanup_client(struct obd_import *imp);
 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid);
 
-static inline int
-ptlrpc_client_recv_or_unlink (struct ptlrpc_request *req)
-{
-        int           rc;
-
-        spin_lock(&req->rq_lock);
-        rc = req->rq_receiving_reply || req->rq_must_unlink;
-        spin_unlock(&req->rq_lock);
-        return (rc);
-}
-
-static inline void
-ptlrpc_wake_client_req (struct ptlrpc_request *req)
-{
-        if (req->rq_set == NULL)
-                cfs_waitq_signal(&req->rq_reply_waitq);
-        else
-                cfs_waitq_signal(&req->rq_set->set_waitq);
-}
-
 int ptlrpc_queue_wait(struct ptlrpc_request *req);
 int ptlrpc_replay_req(struct ptlrpc_request *req);
-void ptlrpc_unregister_reply(struct ptlrpc_request *req);
+int ptlrpc_unregister_reply(struct ptlrpc_request *req, int async);
 void ptlrpc_restart_req(struct ptlrpc_request *req);
 void ptlrpc_abort_inflight(struct obd_import *imp);
 void ptlrpc_abort_set(struct ptlrpc_request_set *set);
@@ -1077,6 +1071,81 @@ lustre_shrink_reply(struct ptlrpc_request *req, int segment,
 }
 
 static inline void
+ptlrpc_rqphase_move(struct ptlrpc_request *req, enum rq_phase new_phase)
+{
+        if (req->rq_phase == new_phase)
+                return;
+        
+        if (new_phase == RQ_PHASE_UNREGISTERING) {
+                req->rq_next_phase = req->rq_phase;
+                if (req->rq_import)
+                        atomic_inc(&req->rq_import->imp_unregistering);
+        }
+        
+        if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
+                if (req->rq_import)
+                        atomic_dec(&req->rq_import->imp_unregistering);
+        }
+
+        DEBUG_REQ(D_RPCTRACE, req, "move req \"%s\" -> \"%s\"", 
+                  ptlrpc_rqphase2str(req), ptlrpc_phase2str(new_phase));
+
+        req->rq_phase = new_phase;
+}
+
+static inline int
+ptlrpc_client_early(struct ptlrpc_request *req)
+{
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+            req->rq_reply_deadline > cfs_time_current_sec())
+                return 0;
+        return req->rq_early;
+}
+
+static inline int
+ptlrpc_client_replied(struct ptlrpc_request *req)
+{
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+            req->rq_reply_deadline > cfs_time_current_sec())
+                return 0;
+        return req->rq_replied;
+}
+
+static inline int
+ptlrpc_client_recv(struct ptlrpc_request *req)
+{
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+            req->rq_reply_deadline > cfs_time_current_sec())
+                return 1;
+        return req->rq_receiving_reply;
+}
+
+static inline int
+ptlrpc_client_recv_or_unlink(struct ptlrpc_request *req)
+{
+        int rc;
+
+        spin_lock(&req->rq_lock);
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+            req->rq_reply_deadline > cfs_time_current_sec()) {
+                spin_unlock(&req->rq_lock);
+                return 1;
+        }
+        rc = req->rq_receiving_reply || req->rq_must_unlink;
+        spin_unlock(&req->rq_lock);
+        return rc;
+}
+
+static inline void
+ptlrpc_client_wake_req(struct ptlrpc_request *req)
+{
+        if (req->rq_set == NULL)
+                cfs_waitq_signal(&req->rq_reply_waitq);
+        else
+                cfs_waitq_signal(&req->rq_set->set_waitq);
+}
+
+static inline void
 ptlrpc_rs_addref(struct ptlrpc_reply_state *rs)
 {
         LASSERT(atomic_read(&rs->rs_refcount) > 0);