Whamcloud - gitweb
- added connection structure which encompasses some (but perhaps not enough yet)
[fs/lustre-release.git] / lustre / ptlrpc / client.c
index 5a6a5c1..e2bceff 100644 (file)
 
 #define EXPORT_SYMTAB
 
-#include <linux/config.h>
-#include <linux/module.h>
-#include <linux/kernel.h>
-#include <linux/list.h>
-
 #define DEBUG_SUBSYSTEM S_RPC
 
-#include <linux/obd_support.h>
-#include <linux/obd_class.h>
 #include <linux/lustre_net.h>
 
 void llite_ha_conn_manage(struct lustre_ha_mgr *mgr, struct ptlrpc_client *cli)
@@ -54,18 +47,14 @@ void llite_ha_conn_fail(struct ptlrpc_client *cli)
         EXIT;
 }
 
-void ptlrpc_init_client(struct lustre_ha_mgr *mgr, int req_portal, int rep_portal,
-                          struct ptlrpc_client *cl)
+void ptlrpc_init_client(struct lustre_ha_mgr *mgr, int req_portal,
+                        int rep_portal, struct ptlrpc_client *cl)
 {
         memset(cl, 0, sizeof(*cl));
         spin_lock_init(&cl->cli_lock);
         cl->cli_ha_mgr = mgr;
         if (mgr)
                 llite_ha_conn_manage(mgr, cl);
-        cl->cli_xid = 1;
-        cl->cli_generation = 1;
-        cl->cli_epoch = 1;
-        cl->cli_bootcount = 0;
         cl->cli_obd = NULL;
         cl->cli_request_portal = req_portal;
         cl->cli_reply_portal = rep_portal;
@@ -74,26 +63,32 @@ void ptlrpc_init_client(struct lustre_ha_mgr *mgr, int req_portal, int rep_porta
         sema_init(&cl->cli_rpc_sem, 32);
 }
 
-int ptlrpc_connect_client(char *uuid, struct ptlrpc_client *cl,
-                          struct lustre_peer *peer)
+struct ptlrpc_connection *ptlrpc_connect_client(char *uuid)
 {
+        struct ptlrpc_connection *c;
+        struct lustre_peer peer;
         int err;
 
-        cl->cli_epoch++;
-        err = kportal_uuid_to_peer(uuid, peer);
-        if (err != 0)
+        err = kportal_uuid_to_peer(uuid, &peer);
+        if (err != 0) {
                 CERROR("cannot find peer %s!\n", uuid);
+                return NULL;
+        }
 
-        return err;
+        c = ptlrpc_get_connection(&peer);
+        if (c)
+                c->c_epoch++;
+
+        return c;
 }
 
-struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct lustre_peer *peer)
+struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *conn)
 {
         struct ptlrpc_bulk_desc *bulk;
 
         OBD_ALLOC(bulk, sizeof(*bulk));
         if (bulk != NULL) {
-                memcpy(&bulk->b_peer, peer, sizeof(*peer));
+                bulk->b_connection = ptlrpc_connection_addref(conn);
                 init_waitqueue_head(&bulk->b_waitq);
         }
 
@@ -101,8 +96,9 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct lustre_peer *peer)
 }
 
 struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
-                                       struct lustre_peer *peer, int opcode,
-                                       int count, int *lengths, char **bufs)
+                                       struct ptlrpc_connection *conn,
+                                       int opcode, int count, int *lengths,
+                                       char **bufs)
 {
         struct ptlrpc_request *request;
         int rc;
@@ -114,26 +110,27 @@ struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
                 RETURN(NULL);
         }
 
-        spin_lock(&cl->cli_lock);
-        request->rq_xid = cl->cli_xid++;
-        spin_unlock(&cl->cli_lock);
-
         rc = lustre_pack_msg(count, lengths, bufs,
-                             &request->rq_reqlen, &request->rq_reqbuf);
+                             &request->rq_reqlen, &request->rq_reqmsg);
         if (rc) {
                 CERROR("cannot pack request %d\n", rc);
                 RETURN(NULL);
         }
+
         request->rq_time = CURRENT_TIME;
         request->rq_type = PTL_RPC_REQUEST;
-        memcpy(&request->rq_peer, peer, sizeof(*peer));
-        request->rq_reqmsg = (struct lustre_msg *)request->rq_reqbuf;
+        request->rq_connection = ptlrpc_connection_addref(conn);
+
+        request->rq_reqmsg->conn = (__u64)(unsigned long)conn;
+        request->rq_reqmsg->token = conn->c_token;
         request->rq_reqmsg->opc = HTON__u32(opcode);
-        request->rq_reqmsg->xid = HTON__u32(request->rq_xid);
         request->rq_reqmsg->type = HTON__u32(request->rq_type);
+
+        spin_lock(&conn->c_lock);
+        request->rq_reqmsg->xid = HTON__u32(++conn->c_xid_out);
+        spin_unlock(&c->c_lock);
+
         request->rq_client = cl;
-        request->rq_req_portal = cl->cli_request_portal;
-        request->rq_reply_portal = cl->cli_reply_portal;
 
         RETURN(request);
 }
@@ -143,8 +140,11 @@ void ptlrpc_free_req(struct ptlrpc_request *request)
         if (request == NULL)
                 return;
 
-        if (request->rq_repbuf != NULL)
-                OBD_FREE(request->rq_repbuf, request->rq_replen);
+        if (request->rq_repmsg != NULL)
+                OBD_FREE(request->rq_repmsg, request->rq_replen);
+
+        ptlrpc_put_connection(request->rq_connection);
+
         OBD_FREE(request, sizeof(*request));
 }
 
@@ -153,7 +153,7 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req)
         int rc = 0;
 
         schedule_timeout(3 * HZ);  /* 3 second timeout */
-        if (req->rq_repbuf != NULL) {
+        if (req->rq_repmsg != NULL) {
                 req->rq_flags = PTL_RPC_REPLY;
                 GOTO(out, rc = 1);
         }
@@ -207,8 +207,8 @@ int ptlrpc_check_status(struct ptlrpc_request *req, int err)
 
 static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request)
 {
-        OBD_FREE(request->rq_reqbuf, request->rq_reqlen);
-        request->rq_reqbuf = NULL;
+        OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
+        request->rq_reqmsg = NULL;
         request->rq_reqlen = 0;
 }
 
@@ -219,23 +219,23 @@ static int ptlrpc_abort(struct ptlrpc_request *request)
          * that we can tear down the buffer safely. */
         PtlMEUnlink(request->rq_reply_me_h);
         OBD_FREE(request->rq_reply_md.start, request->rq_replen);
-        request->rq_repbuf = NULL;
+        request->rq_repmsg = NULL;
         request->rq_replen = 0;
         return 0;
 }
 
-int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
+int ptlrpc_queue_wait(struct ptlrpc_request *req)
 {
         int rc = 0;
         ENTRY;
 
         init_waitqueue_head(&req->rq_wait_for_rep);
 
-        rc = ptl_send_rpc(req, cl);
+        rc = ptl_send_rpc(req);
         if (rc) {
                 CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
                 ptlrpc_cleanup_request_buf(req);
-                up(&cl->cli_rpc_sem);
+                up(&req->rq_client->cli_rpc_sem);
                 RETURN(-rc);
         }
 
@@ -243,7 +243,7 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
         wait_event_interruptible(req->rq_wait_for_rep, ptlrpc_check_reply(req));
         CDEBUG(D_OTHER, "-- done\n");
         ptlrpc_cleanup_request_buf(req);
-        up(&cl->cli_rpc_sem);
+        up(&req->rq_client->cli_rpc_sem);
         if (req->rq_flags == PTL_RPC_INTR) {
                 /* Clean up the dangling reply buffers */
                 ptlrpc_abort(req);
@@ -257,8 +257,7 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
                 GOTO(out, rc = -EINTR);
         }
 
-        rc = lustre_unpack_msg(req->rq_repbuf, req->rq_replen);
-        req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+        rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
         if (rc) {
                 CERROR("unpack_rep failed: %d\n", rc);
                 GOTO(out, rc);
@@ -266,7 +265,7 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
         CDEBUG(D_NET, "got rep %d\n", req->rq_repmsg->xid);
 
         if (req->rq_repmsg->status == 0)
-                CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repbuf,
+                CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg,
                        req->rq_replen, req->rq_repmsg->status);
 
         EXIT;