Whamcloud - gitweb
WARNING - if an RPC times out you will crash older UML's.
[fs/lustre-release.git] / lustre / ptlrpc / client.c
index a1322ac..23a3b9f 100644 (file)
 
 #define EXPORT_SYMTAB
 
-#include <linux/config.h>
-#include <linux/module.h>
-#include <linux/kernel.h>
-#include <linux/list.h>
-
 #define DEBUG_SUBSYSTEM S_RPC
 
-#include <linux/obd_support.h>
-#include <linux/obd_class.h>
 #include <linux/lustre_net.h>
 
-void ptlrpc_init_client(int dev, int req_portal, int rep_portal,
-                          struct ptlrpc_client *cl)
+
+void ptlrpc_init_client(struct connmgr_obd *mgr, int req_portal,
+                        int rep_portal, struct ptlrpc_client *cl)
 {
         memset(cl, 0, sizeof(*cl));
-        spin_lock_init(&cl->cli_lock);
-        cl->cli_xid = 1;
-        cl->cli_generation = 1;
-        cl->cli_epoch = 1;
-        cl->cli_bootcount = 0;
+        cl->cli_ha_mgr = mgr;
+        if (mgr)
+                connmgr_cli_manage(mgr, cl);
         cl->cli_obd = NULL;
         cl->cli_request_portal = req_portal;
         cl->cli_reply_portal = rep_portal;
@@ -50,33 +42,42 @@ void ptlrpc_init_client(int dev, int req_portal, int rep_portal,
         sema_init(&cl->cli_rpc_sem, 32);
 }
 
-int ptlrpc_connect_client(int dev, char *uuid, struct ptlrpc_client *cl)
+struct ptlrpc_connection *ptlrpc_uuid_to_connection(char *uuid)
 {
+        struct ptlrpc_connection *c;
+        struct lustre_peer peer;
         int err;
 
-        cl->cli_epoch++;
-        err = kportal_uuid_to_peer(uuid, &cl->cli_server);
-        if (err != 0)
+        err = kportal_uuid_to_peer(uuid, &peer);
+        if (err != 0) {
                 CERROR("cannot find peer %s!\n", uuid);
+                return NULL;
+        }
 
-        return err;
+        c = ptlrpc_get_connection(&peer);
+        if (c)
+                c->c_epoch++;
+
+        return c;
 }
 
-struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct lustre_peer *peer)
+struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *conn)
 {
         struct ptlrpc_bulk_desc *bulk;
 
         OBD_ALLOC(bulk, sizeof(*bulk));
         if (bulk != NULL) {
-                memcpy(&bulk->b_peer, peer, sizeof(*peer));
+                bulk->b_connection = ptlrpc_connection_addref(conn);
                 init_waitqueue_head(&bulk->b_waitq);
         }
 
         return bulk;
 }
 
-struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, int opcode,
-                                       int count, int *lengths, char **bufs)
+struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
+                                       struct ptlrpc_connection *conn,
+                                       int opcode, int count, int *lengths,
+                                       char **bufs)
 {
         struct ptlrpc_request *request;
         int rc;
@@ -88,22 +89,28 @@ struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, int opcode,
                 RETURN(NULL);
         }
 
-        spin_lock(&cl->cli_lock);
-        request->rq_xid = cl->cli_xid++;
-        spin_unlock(&cl->cli_lock);
-
         rc = lustre_pack_msg(count, lengths, bufs,
-                             &request->rq_reqlen, &request->rq_reqbuf);
+                             &request->rq_reqlen, &request->rq_reqmsg);
         if (rc) {
                 CERROR("cannot pack request %d\n", rc);
                 RETURN(NULL);
         }
+
+        request->rq_time = CURRENT_TIME;
         request->rq_type = PTL_RPC_REQUEST;
-        request->rq_reqmsg = (struct lustre_msg *)request->rq_reqbuf;
+        request->rq_connection = ptlrpc_connection_addref(conn);
+
+        request->rq_reqmsg->conn = (__u64)(unsigned long)conn->c_remote_conn;
+        request->rq_reqmsg->token = conn->c_remote_token;
         request->rq_reqmsg->opc = HTON__u32(opcode);
-        request->rq_reqmsg->xid = HTON__u32(request->rq_xid);
         request->rq_reqmsg->type = HTON__u32(request->rq_type);
 
+        spin_lock(&conn->c_lock);
+        request->rq_reqmsg->xid = HTON__u32(++conn->c_xid_out);
+        spin_unlock(&conn->c_lock);
+
+        request->rq_client = cl;
+
         RETURN(request);
 }
 
@@ -112,8 +119,11 @@ void ptlrpc_free_req(struct ptlrpc_request *request)
         if (request == NULL)
                 return;
 
-        if (request->rq_repbuf != NULL)
-                OBD_FREE(request->rq_repbuf, request->rq_replen);
+        if (request->rq_repmsg != NULL)
+                OBD_FREE(request->rq_repmsg, request->rq_replen);
+
+        ptlrpc_put_connection(request->rq_connection);
+
         OBD_FREE(request, sizeof(*request));
 }
 
@@ -121,11 +131,19 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req)
 {
         int rc = 0;
 
-        if (req->rq_repbuf != NULL) {
+        schedule_timeout(3 * HZ);  /* 3 second timeout */
+        if (req->rq_repmsg != NULL) {
                 req->rq_flags = PTL_RPC_REPLY;
                 GOTO(out, rc = 1);
         }
 
+        if (CURRENT_TIME - req->rq_time >= 3) { 
+                CERROR("-- REQ TIMEOUT --\n"); 
+                if (req->rq_client && req->rq_client->cli_ha_mgr)
+                        connmgr_cli_fail(req->rq_client); 
+                return 0;
+        }
+
         if (sigismember(&(current->pending.signal), SIGKILL) ||
             sigismember(&(current->pending.signal), SIGTERM) ||
             sigismember(&(current->pending.signal), SIGINT)) {
@@ -168,8 +186,8 @@ int ptlrpc_check_status(struct ptlrpc_request *req, int err)
 
 static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request)
 {
-        OBD_FREE(request->rq_reqbuf, request->rq_reqlen);
-        request->rq_reqbuf = NULL;
+        OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
+        request->rq_reqmsg = NULL;
         request->rq_reqlen = 0;
 }
 
@@ -180,26 +198,23 @@ static int ptlrpc_abort(struct ptlrpc_request *request)
          * that we can tear down the buffer safely. */
         PtlMEUnlink(request->rq_reply_me_h);
         OBD_FREE(request->rq_reply_md.start, request->rq_replen);
-        request->rq_repbuf = NULL;
+        request->rq_repmsg = NULL;
         request->rq_replen = 0;
         return 0;
 }
 
-int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
+int ptlrpc_queue_wait(struct ptlrpc_request *req)
 {
         int rc = 0;
         ENTRY;
 
         init_waitqueue_head(&req->rq_wait_for_rep);
 
-        req->rq_client = cl;
-        req->rq_req_portal = cl->cli_request_portal;
-        req->rq_reply_portal = cl->cli_reply_portal;
-        rc = ptl_send_rpc(req, cl);
+        rc = ptl_send_rpc(req);
         if (rc) {
                 CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
                 ptlrpc_cleanup_request_buf(req);
-                up(&cl->cli_rpc_sem);
+                up(&req->rq_client->cli_rpc_sem);
                 RETURN(-rc);
         }
 
@@ -207,7 +222,7 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
         wait_event_interruptible(req->rq_wait_for_rep, ptlrpc_check_reply(req));
         CDEBUG(D_OTHER, "-- done\n");
         ptlrpc_cleanup_request_buf(req);
-        up(&cl->cli_rpc_sem);
+        up(&req->rq_client->cli_rpc_sem);
         if (req->rq_flags == PTL_RPC_INTR) {
                 /* Clean up the dangling reply buffers */
                 ptlrpc_abort(req);
@@ -221,8 +236,7 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
                 GOTO(out, rc = -EINTR);
         }
 
-        rc = lustre_unpack_msg(req->rq_repbuf, req->rq_replen);
-        req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+        rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
         if (rc) {
                 CERROR("unpack_rep failed: %d\n", rc);
                 GOTO(out, rc);
@@ -230,7 +244,7 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
         CDEBUG(D_NET, "got rep %d\n", req->rq_repmsg->xid);
 
         if (req->rq_repmsg->status == 0)
-                CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repbuf,
+                CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg,
                        req->rq_replen, req->rq_repmsg->status);
 
         EXIT;