Whamcloud - gitweb
land 0.5.20.3 b_devel onto HEAD (b_devel will remain)
[fs/lustre-release.git] / lustre / ptlrpc / niobuf.c
index ef3a215..62a76c4 100644 (file)
  */
 
 #define DEBUG_SUBSYSTEM S_RPC
-
+#ifndef __KERNEL__
+#include <liblustre.h>
+#include <portals/lib-types.h>
+#endif
 #include <linux/obd_support.h>
 #include <linux/lustre_net.h>
 #include <linux/lustre_lib.h>
 #include <linux/obd.h>
 
-extern ptl_handle_eq_t request_out_eq, reply_in_eq, reply_out_eq,
-        bulk_put_source_eq, bulk_put_sink_eq, 
-        bulk_get_source_eq, bulk_get_sink_eq;
-
 static int ptl_send_buf(struct ptlrpc_request *request,
                         struct ptlrpc_connection *conn, int portal)
 {
         int rc;
         ptl_process_id_t remote_id;
         ptl_handle_md_t md_h;
+        ptl_ack_req_t ack_req;
 
         LASSERT(conn);
+        CDEBUG (D_INFO, "conn=%p ni %s nid "LPX64" on %s\n", 
+                conn, conn->c_peer.peer_ni->pni_name,
+                conn->c_peer.peer_nid, conn->c_peer.peer_ni->pni_name);
 
         request->rq_req_md.user_ptr = request;
 
@@ -47,24 +50,35 @@ static int ptl_send_buf(struct ptlrpc_request *request,
                 request->rq_reqmsg->type = HTON__u32(request->rq_type);
                 request->rq_req_md.start = request->rq_reqmsg;
                 request->rq_req_md.length = request->rq_reqlen;
-                request->rq_req_md.eventq = request_out_eq;
+                request->rq_req_md.eventq = conn->c_peer.peer_ni->pni_request_out_eq_h;
                 break;
         case PTL_RPC_MSG_ERR:
         case PTL_RPC_MSG_REPLY:
                 request->rq_repmsg->type = HTON__u32(request->rq_type);
                 request->rq_req_md.start = request->rq_repmsg;
                 request->rq_req_md.length = request->rq_replen;
-                request->rq_req_md.eventq = reply_out_eq;
+                request->rq_req_md.eventq = conn->c_peer.peer_ni->pni_reply_out_eq_h;
                 break;
         default:
                 LBUG();
                 return -1; /* notreached */
         }
-        request->rq_req_md.threshold = 1;
+        if (request->rq_flags & PTL_RPC_FL_WANT_ACK) {
+                request->rq_req_md.threshold = 2; /* SENT and ACK */
+                ack_req = PTL_ACK_REQ;
+        } else {
+                request->rq_req_md.threshold = 1;
+                ack_req = PTL_NOACK_REQ;
+        }
         request->rq_req_md.options = PTL_MD_OP_PUT;
         request->rq_req_md.user_ptr = request;
 
-        rc = PtlMDBind(conn->c_peer.peer_ni, request->rq_req_md, &md_h);
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_ACK | OBD_FAIL_ONCE)) {
+                request->rq_req_md.options |= PTL_MD_ACK_DISABLE;
+                obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
+        }
+
+        rc = PtlMDBind(conn->c_peer.peer_ni->pni_ni_h, request->rq_req_md, &md_h);
         if (rc != 0) {
                 CERROR("PtlMDBind failed: %d\n", rc);
                 LBUG();
@@ -79,8 +93,7 @@ static int ptl_send_buf(struct ptlrpc_request *request,
 
         if (!portal)
                 LBUG();
-        rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0, request->rq_xid,
-                    0, 0);
+        rc = PtlPut(md_h, ack_req, remote_id, portal, 0, request->rq_xid, 0, 0);
         if (rc != PTL_OK) {
                 CERROR("PtlPut("LPU64", %d, "LPD64") failed: %d\n",
                        remote_id.nid, portal, request->rq_xid, rc);
@@ -117,6 +130,7 @@ ptlrpc_put_bulk_iov (struct ptlrpc_bulk_desc *desc, struct iovec *iov)
 int ptlrpc_bulk_put(struct ptlrpc_bulk_desc *desc)
 {
         int rc;
+        struct ptlrpc_peer *peer;
         struct list_head *tmp, *next;
         ptl_process_id_t remote_id;
         __u32 xid = 0;
@@ -127,10 +141,12 @@ int ptlrpc_bulk_put(struct ptlrpc_bulk_desc *desc)
         if (iov == NULL)
                 RETURN (-ENOMEM);
 
+        peer = &desc->bd_connection->c_peer;
+
         desc->bd_md.start = iov;
         desc->bd_md.niov = 0;
         desc->bd_md.length = 0;
-        desc->bd_md.eventq = bulk_put_source_eq;
+        desc->bd_md.eventq = peer->peer_ni->pni_bulk_put_source_eq_h;
         desc->bd_md.threshold = 2; /* SENT and ACK */
         desc->bd_md.options = PTL_MD_OP_PUT | PTL_MD_IOV;
         desc->bd_md.user_ptr = desc;
@@ -164,7 +180,7 @@ int ptlrpc_bulk_put(struct ptlrpc_bulk_desc *desc)
         LASSERT(desc->bd_md.niov == desc->bd_page_count);
         LASSERT(desc->bd_md.niov != 0);
 
-        rc = PtlMDBind(desc->bd_connection->c_peer.peer_ni, desc->bd_md,
+        rc = PtlMDBind(peer->peer_ni->pni_ni_h, desc->bd_md,
                        &desc->bd_md_h);
 
         ptlrpc_put_bulk_iov (desc, iov); /*move down to reduce latency to send*/
@@ -175,12 +191,14 @@ int ptlrpc_bulk_put(struct ptlrpc_bulk_desc *desc)
                 RETURN(rc);
         }
 
-        remote_id.nid = desc->bd_connection->c_peer.peer_nid;
+        remote_id.nid = peer->peer_nid;
         remote_id.pid = 0;
 
-        CDEBUG(D_NET, "Sending %u pages %u bytes to portal %d nid "LPX64" pid "
-               "%d xid %d\n", desc->bd_md.niov, desc->bd_md.length,
-               desc->bd_portal, remote_id.nid, remote_id.pid, xid);
+        CDEBUG(D_NET, "Sending %u pages %u bytes to portal %d on %s "
+               "nid "LPX64" pid %d xid %d\n", 
+               desc->bd_md.niov, desc->bd_md.length,
+               desc->bd_portal, peer->peer_ni->pni_name,
+               remote_id.nid, remote_id.pid, xid);
 
         rc = PtlPut(desc->bd_md_h, PTL_ACK_REQ, remote_id,
                     desc->bd_portal, 0, xid, 0, 0);
@@ -198,6 +216,7 @@ int ptlrpc_bulk_put(struct ptlrpc_bulk_desc *desc)
 int ptlrpc_bulk_get(struct ptlrpc_bulk_desc *desc)
 {
         int rc;
+        struct ptlrpc_peer *peer;
         struct list_head *tmp, *next;
         ptl_process_id_t remote_id;
         __u32 xid = 0;
@@ -208,10 +227,12 @@ int ptlrpc_bulk_get(struct ptlrpc_bulk_desc *desc)
         if (iov == NULL)
                 RETURN (-ENOMEM);
 
+        peer = &desc->bd_connection->c_peer;
+
         desc->bd_md.start = iov;
         desc->bd_md.niov = 0;
         desc->bd_md.length = 0;
-        desc->bd_md.eventq = bulk_get_sink_eq;
+        desc->bd_md.eventq = peer->peer_ni->pni_bulk_get_sink_eq_h;
         desc->bd_md.threshold = 2; /* SENT and REPLY */
         desc->bd_md.options = PTL_MD_OP_GET | PTL_MD_IOV;
         desc->bd_md.user_ptr = desc;
@@ -231,10 +252,10 @@ int ptlrpc_bulk_get(struct ptlrpc_bulk_desc *desc)
                 iov[desc->bd_md.niov].iov_base = bulk->bp_buf;
                 iov[desc->bd_md.niov].iov_len = bulk->bp_buflen;
                 if (iov[desc->bd_md.niov].iov_len <= 0) {
-                        CERROR("bad bp_buflen[%d] @ %p: %d\n", desc->bd_md.niov,
-                               bulk->bp_buf, bulk->bp_buflen);
-                        CERROR("desc: xid %u, pages %d, ptl %d, ref %d\n",
-                               xid, desc->bd_page_count, desc->bd_portal,
+                        CERROR("bad bulk %p bp_buflen[%d] @ %p: %d\n", bulk,
+                               desc->bd_md.niov, bulk->bp_buf, bulk->bp_buflen);
+                        CERROR("desc %p: xid %u, pages %d, ptl %d, ref %d\n",
+                               desc, xid, desc->bd_page_count, desc->bd_portal,
                                atomic_read(&desc->bd_refcount));
                         LBUG();
                 }
@@ -245,7 +266,7 @@ int ptlrpc_bulk_get(struct ptlrpc_bulk_desc *desc)
         LASSERT(desc->bd_md.niov == desc->bd_page_count);
         LASSERT(desc->bd_md.niov != 0);
 
-        rc = PtlMDBind(desc->bd_connection->c_peer.peer_ni, desc->bd_md,
+        rc = PtlMDBind(peer->peer_ni->pni_ni_h, desc->bd_md,
                        &desc->bd_md_h);
 
         ptlrpc_put_bulk_iov (desc, iov); /*move down to reduce latency to send*/
@@ -259,9 +280,11 @@ int ptlrpc_bulk_get(struct ptlrpc_bulk_desc *desc)
         remote_id.nid = desc->bd_connection->c_peer.peer_nid;
         remote_id.pid = 0;
 
-        CDEBUG(D_NET, "Sending %u pages %u bytes to portal %d nid "LPX64" pid "
-               "%d xid %d\n", desc->bd_md.niov, desc->bd_md.length,
-               desc->bd_portal, remote_id.nid, remote_id.pid, xid);
+        CDEBUG(D_NET, "Sending %u pages %u bytes to portal %d on %s "
+               "nid "LPX64" pid %d xid %d\n", 
+               desc->bd_md.niov, desc->bd_md.length,
+               desc->bd_portal, peer->peer_ni->pni_name,
+               remote_id.nid, remote_id.pid, xid);
 
         rc = PtlGet(desc->bd_md_h, remote_id, desc->bd_portal, 0, xid, 0);
         if (rc != PTL_OK) {
@@ -277,6 +300,7 @@ int ptlrpc_bulk_get(struct ptlrpc_bulk_desc *desc)
 
 static int ptlrpc_register_bulk_shared(struct ptlrpc_bulk_desc *desc)
 {
+        struct ptlrpc_peer *peer;
         struct list_head *tmp, *next;
         int rc;
         __u32 xid = 0;
@@ -294,6 +318,8 @@ static int ptlrpc_register_bulk_shared(struct ptlrpc_bulk_desc *desc)
         if (iov == NULL)
                 return (-ENOMEM);
 
+        peer = &desc->bd_connection->c_peer;
+        
         desc->bd_md.start = iov;
         desc->bd_md.niov = 0;
         desc->bd_md.length = 0;
@@ -322,7 +348,7 @@ static int ptlrpc_register_bulk_shared(struct ptlrpc_bulk_desc *desc)
         source_id.nid = desc->bd_connection->c_peer.peer_nid;
         source_id.pid = PTL_PID_ANY;
 
-        rc = PtlMEAttach(desc->bd_connection->c_peer.peer_ni,
+        rc = PtlMEAttach(peer->peer_ni->pni_ni_h,
                          desc->bd_portal, source_id, xid, 0,
                          PTL_UNLINK, PTL_INS_AFTER, &desc->bd_me_h);
 
@@ -343,8 +369,8 @@ static int ptlrpc_register_bulk_shared(struct ptlrpc_bulk_desc *desc)
         ptlrpc_put_bulk_iov (desc, iov);
 
         CDEBUG(D_NET, "Setup bulk sink buffers: %u pages %u bytes, xid %u, "
-               "portal %u\n", desc->bd_md.niov, desc->bd_md.length,
-               xid, desc->bd_portal);
+               "portal %u on %s\n", desc->bd_md.niov, desc->bd_md.length,
+               xid, desc->bd_portal, peer->peer_ni->pni_name);
 
         RETURN(0);
 
@@ -358,7 +384,8 @@ static int ptlrpc_register_bulk_shared(struct ptlrpc_bulk_desc *desc)
 int ptlrpc_register_bulk_get(struct ptlrpc_bulk_desc *desc)
 {
         desc->bd_md.options = PTL_MD_OP_GET | PTL_MD_IOV;
-        desc->bd_md.eventq = bulk_get_source_eq;
+        desc->bd_md.eventq = 
+                desc->bd_connection->c_peer.peer_ni->pni_bulk_get_source_eq_h;
 
         return ptlrpc_register_bulk_shared(desc);
 }
@@ -366,7 +393,8 @@ int ptlrpc_register_bulk_get(struct ptlrpc_bulk_desc *desc)
 int ptlrpc_register_bulk_put(struct ptlrpc_bulk_desc *desc)
 {
         desc->bd_md.options = PTL_MD_OP_PUT | PTL_MD_IOV;
-        desc->bd_md.eventq = bulk_put_sink_eq;
+        desc->bd_md.eventq = 
+                desc->bd_connection->c_peer.peer_ni->pni_bulk_put_sink_eq_h;
 
         return ptlrpc_register_bulk_shared(desc);
 }
@@ -391,6 +419,13 @@ void obd_brw_set_add(struct obd_brw_set *set, struct ptlrpc_bulk_desc *desc)
         list_add(&desc->bd_set_chain, &set->brw_desc_head);
 }
 
+void obd_brw_set_del(struct ptlrpc_bulk_desc *desc)
+{
+        atomic_dec(&desc->bd_brw_set->brw_refcount);
+        list_del_init(&desc->bd_set_chain);
+        ptlrpc_bulk_decref(desc);
+}
+
 struct obd_brw_set *obd_brw_set_new(void)
 {
         struct obd_brw_set *set;
@@ -411,11 +446,6 @@ void obd_brw_set_free(struct obd_brw_set *set)
         struct list_head *tmp, *next;
         ENTRY;
 
-        if (!list_empty(&set->brw_desc_head)) {
-                EXIT;
-                return;
-        }
-
         list_for_each_safe(tmp, next, &set->brw_desc_head) {
                 struct ptlrpc_bulk_desc *desc =
                         list_entry(tmp, struct ptlrpc_bulk_desc, bd_set_chain);
@@ -502,7 +532,7 @@ int ptl_send_rpc(struct ptlrpc_request *request)
                         }
                 }
 
-                rc = PtlMEAttach(request->rq_connection->c_peer.peer_ni,
+                rc = PtlMEAttach(request->rq_connection->c_peer.peer_ni->pni_ni_h,
                              request->rq_reply_portal,/* XXX FIXME bug 625069 */
                                  source_id, request->rq_xid, 0, PTL_UNLINK,
                                  PTL_INS_AFTER, &request->rq_reply_me_h);
@@ -517,7 +547,8 @@ int ptl_send_rpc(struct ptlrpc_request *request)
                 request->rq_reply_md.threshold = 1;
                 request->rq_reply_md.options = PTL_MD_OP_PUT;
                 request->rq_reply_md.user_ptr = request;
-                request->rq_reply_md.eventq = reply_in_eq;
+                request->rq_reply_md.eventq =
+                        request->rq_connection->c_peer.peer_ni->pni_reply_in_eq_h;
 
                 rc = PtlMDAttach(request->rq_reply_me_h, request->rq_reply_md,
                                  PTL_UNLINK, NULL);
@@ -528,14 +559,16 @@ int ptl_send_rpc(struct ptlrpc_request *request)
                 }
 
                 CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid "LPU64
-                       ", portal %u\n",
+                       ", portal %u on %s\n",
                        request->rq_replen, request->rq_xid,
-                       request->rq_reply_portal);
+                       request->rq_reply_portal,
+                       request->rq_connection->c_peer.peer_ni->pni_name);
         }
 
         /* Clear any flags that may be present from previous sends,
-         * except for REPLAY. */
-        request->rq_flags &= PTL_RPC_FL_REPLAY;
+         * except for REPLAY, NO_RESEND and WANT_ACK. */
+        request->rq_flags &= (PTL_RPC_FL_REPLAY | PTL_RPC_FL_NO_RESEND |
+                              PTL_RPC_FL_WANT_ACK);
         rc = ptl_send_buf(request, request->rq_connection,
                           request->rq_request_portal);
         RETURN(rc);
@@ -551,7 +584,8 @@ int ptl_send_rpc(struct ptlrpc_request *request)
 
 void ptlrpc_link_svc_me(struct ptlrpc_request_buffer_desc *rqbd)
 {
-        struct ptlrpc_service *service = rqbd->rqbd_service;
+        struct ptlrpc_srv_ni *srv_ni = rqbd->rqbd_srv_ni;
+        struct ptlrpc_service *service = srv_ni->sni_service;
         static ptl_process_id_t match_id = {PTL_NID_ANY, PTL_PID_ANY};
         int rc;
         ptl_md_t dummy;
@@ -559,8 +593,13 @@ void ptlrpc_link_svc_me(struct ptlrpc_request_buffer_desc *rqbd)
 
         LASSERT(atomic_read(&rqbd->rqbd_refcount) == 0);
 
+        CDEBUG(D_NET, "PtlMEAttach: portal %d on %s h %lx.%lx\n",
+               service->srv_req_portal, srv_ni->sni_ni->pni_name,
+               srv_ni->sni_ni->pni_ni_h.nal_idx,
+               srv_ni->sni_ni->pni_ni_h.handle_idx);
+
         /* Attach the leading ME on which we build the ring */
-        rc = PtlMEAttach(service->srv_self.peer_ni, service->srv_req_portal,
+        rc = PtlMEAttach(srv_ni->sni_ni->pni_ni_h, service->srv_req_portal,
                          match_id, 0, ~0,
                          PTL_UNLINK, PTL_INS_AFTER, &rqbd->rqbd_me_h);
         if (rc != PTL_OK) {
@@ -574,9 +613,9 @@ void ptlrpc_link_svc_me(struct ptlrpc_request_buffer_desc *rqbd)
         dummy.threshold  = PTL_MD_THRESH_INF;
         dummy.options    = PTL_MD_OP_PUT | PTL_MD_MAX_SIZE | PTL_MD_AUTO_UNLINK;
         dummy.user_ptr   = rqbd;
-        dummy.eventq     = service->srv_eq_h;
+        dummy.eventq     = srv_ni->sni_eq_h;
 
-        atomic_inc(&service->srv_nrqbds_receiving);
+        atomic_inc(&srv_ni->sni_nrqbds_receiving);
         atomic_set(&rqbd->rqbd_refcount, 1);   /* 1 ref for portals */
 
         rc = PtlMDAttach(rqbd->rqbd_me_h, dummy, PTL_UNLINK, &md_h);
@@ -586,6 +625,6 @@ void ptlrpc_link_svc_me(struct ptlrpc_request_buffer_desc *rqbd)
 #warning proper cleanup required
                 PtlMEUnlink (rqbd->rqbd_me_h);
                 atomic_set(&rqbd->rqbd_refcount, 0);
-                atomic_dec(&service->srv_nrqbds_receiving);
+                atomic_dec(&srv_ni->sni_nrqbds_receiving);
         }
 }