*/
#define DEBUG_SUBSYSTEM S_RPC
-
+#ifndef __KERNEL__
+#include <liblustre.h>
+#include <portals/lib-types.h>
+#endif
#include <linux/obd_support.h>
#include <linux/lustre_net.h>
#include <linux/lustre_lib.h>
#include <linux/obd.h>
-extern ptl_handle_eq_t request_out_eq, reply_in_eq, reply_out_eq,
- bulk_put_source_eq, bulk_put_sink_eq,
- bulk_get_source_eq, bulk_get_sink_eq;
-
static int ptl_send_buf(struct ptlrpc_request *request,
struct ptlrpc_connection *conn, int portal)
{
int rc;
ptl_process_id_t remote_id;
ptl_handle_md_t md_h;
+ ptl_ack_req_t ack_req;
LASSERT(conn);
+ CDEBUG (D_INFO, "conn=%p ni %s nid "LPX64" on %s\n",
+ conn, conn->c_peer.peer_ni->pni_name,
+ conn->c_peer.peer_nid, conn->c_peer.peer_ni->pni_name);
request->rq_req_md.user_ptr = request;
request->rq_reqmsg->type = HTON__u32(request->rq_type);
request->rq_req_md.start = request->rq_reqmsg;
request->rq_req_md.length = request->rq_reqlen;
- request->rq_req_md.eventq = request_out_eq;
+ request->rq_req_md.eventq = conn->c_peer.peer_ni->pni_request_out_eq_h;
break;
case PTL_RPC_MSG_ERR:
case PTL_RPC_MSG_REPLY:
request->rq_repmsg->type = HTON__u32(request->rq_type);
request->rq_req_md.start = request->rq_repmsg;
request->rq_req_md.length = request->rq_replen;
- request->rq_req_md.eventq = reply_out_eq;
+ request->rq_req_md.eventq = conn->c_peer.peer_ni->pni_reply_out_eq_h;
break;
default:
LBUG();
return -1; /* notreached */
}
- request->rq_req_md.threshold = 1;
+ if (request->rq_flags & PTL_RPC_FL_WANT_ACK) {
+ request->rq_req_md.threshold = 2; /* SENT and ACK */
+ ack_req = PTL_ACK_REQ;
+ } else {
+ request->rq_req_md.threshold = 1;
+ ack_req = PTL_NOACK_REQ;
+ }
request->rq_req_md.options = PTL_MD_OP_PUT;
request->rq_req_md.user_ptr = request;
- rc = PtlMDBind(conn->c_peer.peer_ni, request->rq_req_md, &md_h);
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_ACK | OBD_FAIL_ONCE)) {
+ request->rq_req_md.options |= PTL_MD_ACK_DISABLE;
+ obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
+ }
+
+ rc = PtlMDBind(conn->c_peer.peer_ni->pni_ni_h, request->rq_req_md, &md_h);
if (rc != 0) {
CERROR("PtlMDBind failed: %d\n", rc);
LBUG();
if (!portal)
LBUG();
- rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0, request->rq_xid,
- 0, 0);
+ rc = PtlPut(md_h, ack_req, remote_id, portal, 0, request->rq_xid, 0, 0);
if (rc != PTL_OK) {
CERROR("PtlPut("LPU64", %d, "LPD64") failed: %d\n",
remote_id.nid, portal, request->rq_xid, rc);
int ptlrpc_bulk_put(struct ptlrpc_bulk_desc *desc)
{
int rc;
+ struct ptlrpc_peer *peer;
struct list_head *tmp, *next;
ptl_process_id_t remote_id;
__u32 xid = 0;
if (iov == NULL)
RETURN (-ENOMEM);
+ peer = &desc->bd_connection->c_peer;
+
desc->bd_md.start = iov;
desc->bd_md.niov = 0;
desc->bd_md.length = 0;
- desc->bd_md.eventq = bulk_put_source_eq;
+ desc->bd_md.eventq = peer->peer_ni->pni_bulk_put_source_eq_h;
desc->bd_md.threshold = 2; /* SENT and ACK */
desc->bd_md.options = PTL_MD_OP_PUT | PTL_MD_IOV;
desc->bd_md.user_ptr = desc;
LASSERT(desc->bd_md.niov == desc->bd_page_count);
LASSERT(desc->bd_md.niov != 0);
- rc = PtlMDBind(desc->bd_connection->c_peer.peer_ni, desc->bd_md,
+ rc = PtlMDBind(peer->peer_ni->pni_ni_h, desc->bd_md,
&desc->bd_md_h);
ptlrpc_put_bulk_iov (desc, iov); /*move down to reduce latency to send*/
RETURN(rc);
}
- remote_id.nid = desc->bd_connection->c_peer.peer_nid;
+ remote_id.nid = peer->peer_nid;
remote_id.pid = 0;
- CDEBUG(D_NET, "Sending %u pages %u bytes to portal %d nid "LPX64" pid "
- "%d xid %d\n", desc->bd_md.niov, desc->bd_md.length,
- desc->bd_portal, remote_id.nid, remote_id.pid, xid);
+ CDEBUG(D_NET, "Sending %u pages %u bytes to portal %d on %s "
+ "nid "LPX64" pid %d xid %d\n",
+ desc->bd_md.niov, desc->bd_md.length,
+ desc->bd_portal, peer->peer_ni->pni_name,
+ remote_id.nid, remote_id.pid, xid);
rc = PtlPut(desc->bd_md_h, PTL_ACK_REQ, remote_id,
desc->bd_portal, 0, xid, 0, 0);
int ptlrpc_bulk_get(struct ptlrpc_bulk_desc *desc)
{
int rc;
+ struct ptlrpc_peer *peer;
struct list_head *tmp, *next;
ptl_process_id_t remote_id;
__u32 xid = 0;
if (iov == NULL)
RETURN (-ENOMEM);
+ peer = &desc->bd_connection->c_peer;
+
desc->bd_md.start = iov;
desc->bd_md.niov = 0;
desc->bd_md.length = 0;
- desc->bd_md.eventq = bulk_get_sink_eq;
+ desc->bd_md.eventq = peer->peer_ni->pni_bulk_get_sink_eq_h;
desc->bd_md.threshold = 2; /* SENT and REPLY */
desc->bd_md.options = PTL_MD_OP_GET | PTL_MD_IOV;
desc->bd_md.user_ptr = desc;
iov[desc->bd_md.niov].iov_base = bulk->bp_buf;
iov[desc->bd_md.niov].iov_len = bulk->bp_buflen;
if (iov[desc->bd_md.niov].iov_len <= 0) {
- CERROR("bad bp_buflen[%d] @ %p: %d\n", desc->bd_md.niov,
- bulk->bp_buf, bulk->bp_buflen);
- CERROR("desc: xid %u, pages %d, ptl %d, ref %d\n",
- xid, desc->bd_page_count, desc->bd_portal,
+ CERROR("bad bulk %p bp_buflen[%d] @ %p: %d\n", bulk,
+ desc->bd_md.niov, bulk->bp_buf, bulk->bp_buflen);
+ CERROR("desc %p: xid %u, pages %d, ptl %d, ref %d\n",
+ desc, xid, desc->bd_page_count, desc->bd_portal,
atomic_read(&desc->bd_refcount));
LBUG();
}
LASSERT(desc->bd_md.niov == desc->bd_page_count);
LASSERT(desc->bd_md.niov != 0);
- rc = PtlMDBind(desc->bd_connection->c_peer.peer_ni, desc->bd_md,
+ rc = PtlMDBind(peer->peer_ni->pni_ni_h, desc->bd_md,
&desc->bd_md_h);
ptlrpc_put_bulk_iov (desc, iov); /*move down to reduce latency to send*/
remote_id.nid = desc->bd_connection->c_peer.peer_nid;
remote_id.pid = 0;
- CDEBUG(D_NET, "Sending %u pages %u bytes to portal %d nid "LPX64" pid "
- "%d xid %d\n", desc->bd_md.niov, desc->bd_md.length,
- desc->bd_portal, remote_id.nid, remote_id.pid, xid);
+ CDEBUG(D_NET, "Sending %u pages %u bytes to portal %d on %s "
+ "nid "LPX64" pid %d xid %d\n",
+ desc->bd_md.niov, desc->bd_md.length,
+ desc->bd_portal, peer->peer_ni->pni_name,
+ remote_id.nid, remote_id.pid, xid);
rc = PtlGet(desc->bd_md_h, remote_id, desc->bd_portal, 0, xid, 0);
if (rc != PTL_OK) {
static int ptlrpc_register_bulk_shared(struct ptlrpc_bulk_desc *desc)
{
+ struct ptlrpc_peer *peer;
struct list_head *tmp, *next;
int rc;
__u32 xid = 0;
if (iov == NULL)
return (-ENOMEM);
+ peer = &desc->bd_connection->c_peer;
+
desc->bd_md.start = iov;
desc->bd_md.niov = 0;
desc->bd_md.length = 0;
source_id.nid = desc->bd_connection->c_peer.peer_nid;
source_id.pid = PTL_PID_ANY;
- rc = PtlMEAttach(desc->bd_connection->c_peer.peer_ni,
+ rc = PtlMEAttach(peer->peer_ni->pni_ni_h,
desc->bd_portal, source_id, xid, 0,
PTL_UNLINK, PTL_INS_AFTER, &desc->bd_me_h);
ptlrpc_put_bulk_iov (desc, iov);
CDEBUG(D_NET, "Setup bulk sink buffers: %u pages %u bytes, xid %u, "
- "portal %u\n", desc->bd_md.niov, desc->bd_md.length,
- xid, desc->bd_portal);
+ "portal %u on %s\n", desc->bd_md.niov, desc->bd_md.length,
+ xid, desc->bd_portal, peer->peer_ni->pni_name);
RETURN(0);
int ptlrpc_register_bulk_get(struct ptlrpc_bulk_desc *desc)
{
desc->bd_md.options = PTL_MD_OP_GET | PTL_MD_IOV;
- desc->bd_md.eventq = bulk_get_source_eq;
+ desc->bd_md.eventq =
+ desc->bd_connection->c_peer.peer_ni->pni_bulk_get_source_eq_h; /* GET registration: the event queue handle is now read from the peer NI of the descriptor's connection instead of the removed global bulk_get_source_eq; options and eventq are set before desc is handed to ptlrpc_register_bulk_shared() */
return ptlrpc_register_bulk_shared(desc);
}
int ptlrpc_register_bulk_put(struct ptlrpc_bulk_desc *desc)
{
desc->bd_md.options = PTL_MD_OP_PUT | PTL_MD_IOV;
- desc->bd_md.eventq = bulk_put_sink_eq;
+ desc->bd_md.eventq =
+ desc->bd_connection->c_peer.peer_ni->pni_bulk_put_sink_eq_h; /* PUT registration: the event queue handle is now read from the peer NI of the descriptor's connection instead of the removed global bulk_put_sink_eq; options and eventq are set before desc is handed to ptlrpc_register_bulk_shared() */
return ptlrpc_register_bulk_shared(desc);
}
list_add(&desc->bd_set_chain, &set->brw_desc_head);
}
+void obd_brw_set_del(struct ptlrpc_bulk_desc *desc) /* Remove a bulk descriptor from the brw set it points to via desc->bd_brw_set. */
+{
+ atomic_dec(&desc->bd_brw_set->brw_refcount); /* decrement the owning set's refcount */
+ list_del_init(&desc->bd_set_chain); /* unlink desc from the set's descriptor list and re-initialise its chain node */
+ ptlrpc_bulk_decref(desc); /* NOTE(review): presumably releases the reference taken when desc was added to the set -- confirm against the add path */
+}
+
struct obd_brw_set *obd_brw_set_new(void)
{
struct obd_brw_set *set;
struct list_head *tmp, *next;
ENTRY;
- if (!list_empty(&set->brw_desc_head)) {
- EXIT;
- return;
- }
-
list_for_each_safe(tmp, next, &set->brw_desc_head) {
struct ptlrpc_bulk_desc *desc =
list_entry(tmp, struct ptlrpc_bulk_desc, bd_set_chain);
}
}
- rc = PtlMEAttach(request->rq_connection->c_peer.peer_ni,
+ rc = PtlMEAttach(request->rq_connection->c_peer.peer_ni->pni_ni_h,
request->rq_reply_portal,/* XXX FIXME bug 625069 */
source_id, request->rq_xid, 0, PTL_UNLINK,
PTL_INS_AFTER, &request->rq_reply_me_h);
request->rq_reply_md.threshold = 1;
request->rq_reply_md.options = PTL_MD_OP_PUT;
request->rq_reply_md.user_ptr = request;
- request->rq_reply_md.eventq = reply_in_eq;
+ request->rq_reply_md.eventq =
+ request->rq_connection->c_peer.peer_ni->pni_reply_in_eq_h;
rc = PtlMDAttach(request->rq_reply_me_h, request->rq_reply_md,
PTL_UNLINK, NULL);
}
CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid "LPU64
- ", portal %u\n",
+ ", portal %u on %s\n",
request->rq_replen, request->rq_xid,
- request->rq_reply_portal);
+ request->rq_reply_portal,
+ request->rq_connection->c_peer.peer_ni->pni_name);
}
/* Clear any flags that may be present from previous sends,
- * except for REPLAY. */
- request->rq_flags &= PTL_RPC_FL_REPLAY;
+ * except for REPLAY, NO_RESEND and WANT_ACK. */
+ request->rq_flags &= (PTL_RPC_FL_REPLAY | PTL_RPC_FL_NO_RESEND |
+ PTL_RPC_FL_WANT_ACK);
rc = ptl_send_buf(request, request->rq_connection,
request->rq_request_portal);
RETURN(rc);
void ptlrpc_link_svc_me(struct ptlrpc_request_buffer_desc *rqbd)
{
- struct ptlrpc_service *service = rqbd->rqbd_service;
+ struct ptlrpc_srv_ni *srv_ni = rqbd->rqbd_srv_ni;
+ struct ptlrpc_service *service = srv_ni->sni_service;
static ptl_process_id_t match_id = {PTL_NID_ANY, PTL_PID_ANY};
int rc;
ptl_md_t dummy;
LASSERT(atomic_read(&rqbd->rqbd_refcount) == 0);
+ CDEBUG(D_NET, "PtlMEAttach: portal %d on %s h %lx.%lx\n",
+ service->srv_req_portal, srv_ni->sni_ni->pni_name,
+ srv_ni->sni_ni->pni_ni_h.nal_idx,
+ srv_ni->sni_ni->pni_ni_h.handle_idx);
+
/* Attach the leading ME on which we build the ring */
- rc = PtlMEAttach(service->srv_self.peer_ni, service->srv_req_portal,
+ rc = PtlMEAttach(srv_ni->sni_ni->pni_ni_h, service->srv_req_portal,
match_id, 0, ~0,
PTL_UNLINK, PTL_INS_AFTER, &rqbd->rqbd_me_h);
if (rc != PTL_OK) {
dummy.threshold = PTL_MD_THRESH_INF;
dummy.options = PTL_MD_OP_PUT | PTL_MD_MAX_SIZE | PTL_MD_AUTO_UNLINK;
dummy.user_ptr = rqbd;
- dummy.eventq = service->srv_eq_h;
+ dummy.eventq = srv_ni->sni_eq_h;
- atomic_inc(&service->srv_nrqbds_receiving);
+ atomic_inc(&srv_ni->sni_nrqbds_receiving);
atomic_set(&rqbd->rqbd_refcount, 1); /* 1 ref for portals */
rc = PtlMDAttach(rqbd->rqbd_me_h, dummy, PTL_UNLINK, &md_h);
#warning proper cleanup required
PtlMEUnlink (rqbd->rqbd_me_h);
atomic_set(&rqbd->rqbd_refcount, 0);
- atomic_dec(&service->srv_nrqbds_receiving);
+ atomic_dec(&srv_ni->sni_nrqbds_receiving);
}
}