repbody = lustre_msg_buf(req->rq_repmsg, 0);
memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
- req->rq_status = obd_punch(conn, &repbody->oa, NULL,
+ req->rq_status = obd_punch(conn, &repbody->oa, NULL,
repbody->oa.o_blocks, repbody->oa.o_size);
RETURN(0);
}
ptlrpc_reply(req->rq_svc, req);
lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
- rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_RCVD, &lwi);
+ rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_RCVD,
+ &lwi);
if (rc) {
if (rc != -ETIMEDOUT)
LBUG();
GOTO(out, rc);
}
- if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
- CERROR("lustre_ost: wrong packet type sent %d\n",
- req->rq_reqmsg->type);
- GOTO(out, rc = -EINVAL);
- }
-
if (req->rq_reqmsg->opc != OST_CONNECT &&
req->rq_export == NULL) {
CERROR("lustre_ost: operation %d on unconnected OST\n",
}
ost->ost_service = ptlrpc_init_svc(64 * 1024, OST_REQUEST_PORTAL,
- OSC_REPLY_PORTAL, "self",ost_handle,
+ OSC_REPLY_PORTAL, "self", ost_handle,
"ost");
if (!ost->ost_service) {
CERROR("failed to start service\n");
}
request->rq_level = LUSTRE_CONN_FULL;
- request->rq_type = PTL_RPC_TYPE_REQUEST;
+ request->rq_type = PTL_RPC_MSG_REQUEST;
request->rq_import = imp;
request->rq_connection = ptlrpc_connection_addref(conn);
request->rq_reqmsg->magic = PTLRPC_MSG_MAGIC;
request->rq_reqmsg->version = PTLRPC_MSG_VERSION;
request->rq_reqmsg->opc = HTON__u32(opcode);
- request->rq_reqmsg->type = HTON__u32(PTL_RPC_MSG_REQUEST);
ptlrpc_hdl2req(request, &imp->imp_handle);
RETURN(request);
// up(&cli->cli_rpc_sem);
if (req->rq_flags & PTL_RPC_FL_INTR) {
if (!(req->rq_flags & PTL_RPC_FL_TIMEOUT))
- LBUG(); /* should only be interrupted if we timed out. */
+ LBUG(); /* should only be interrupted if we timed out */
/* Clean up the dangling reply buffers */
ptlrpc_abort(req);
GOTO(out, rc = -EINTR);
CERROR("unpack_rep failed: %d\n", rc);
GOTO(out, rc);
}
+ if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
+ req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
+ CERROR("invalid packet type received (type=%u)\n",
+ req->rq_repmsg->type);
+ LBUG();
+ GOTO(out, rc = -EINVAL);
+ }
+
CDEBUG(D_NET, "got rep %Ld\n", req->rq_xid);
if (req->rq_repmsg->status == 0)
CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg,
int rc;
ptl_process_id_t remote_id;
ptl_handle_md_t md_h;
- ptl_ack_req_t ack;
request->rq_req_md.user_ptr = request;
switch (request->rq_type) {
- case PTL_RPC_TYPE_REQUEST:
+ case PTL_RPC_MSG_REQUEST:
+ request->rq_reqmsg->type = HTON__u32(request->rq_type);
request->rq_req_md.start = request->rq_reqmsg;
request->rq_req_md.length = request->rq_reqlen;
request->rq_req_md.eventq = request_out_eq;
- request->rq_req_md.threshold = 1;
- ack = PTL_NOACK_REQ;
break;
- case PTL_RPC_TYPE_REPLY:
+ case PTL_RPC_MSG_REPLY:
+ request->rq_repmsg->type = HTON__u32(request->rq_type);
request->rq_req_md.start = request->rq_repmsg;
request->rq_req_md.length = request->rq_replen;
request->rq_req_md.eventq = reply_out_eq;
- request->rq_req_md.threshold = 1;
- ack = PTL_NOACK_REQ;
break;
default:
LBUG();
return -1; /* notreached */
}
+ request->rq_req_md.threshold = 1;
request->rq_req_md.options = PTL_MD_OP_PUT;
request->rq_req_md.user_ptr = request;
rc = PtlMDBind(conn->c_peer.peer_ni, request->rq_req_md, &md_h);
- //CERROR("MDBind (outgoing req/rep/bulk): %Lu\n", (__u64)md_h);
if (rc != 0) {
CERROR("PtlMDBind failed: %d\n", rc);
LBUG();
CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %Ld\n",
request->rq_req_md.length, portal, request->rq_xid);
- rc = PtlPut(md_h, ack, remote_id, portal, 0, request->rq_xid,
+ rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0, request->rq_xid,
0, 0);
if (rc != PTL_OK) {
CERROR("PtlPut(%Lu, %d, %Ld) failed: %d\n", remote_id.nid,
ptlrpc_get_bulk_iov (struct ptlrpc_bulk_desc *desc)
{
struct iovec *iov;
-
+
if (desc->bd_page_count <= sizeof (desc->bd_iov)/sizeof (struct iovec))
return (desc->bd_iov);
-
+
OBD_ALLOC (iov, desc->bd_page_count * sizeof (struct iovec));
if (iov == NULL)
LBUG();
-
+
return (iov);
}
if (desc->bd_md.niov == 0)
xid = bulk->bp_xid;
LASSERT (xid == bulk->bp_xid); /* should all be the same */
-
+
iov[desc->bd_md.niov].iov_base = bulk->bp_buf;
iov[desc->bd_md.niov].iov_len = bulk->bp_buflen;
desc->bd_md.niov++;
desc->bd_md.length += bulk->bp_buflen;
}
-
+
LASSERT (desc->bd_md.niov == desc->bd_page_count);
LASSERT (desc->bd_md.niov != 0);
-
+
rc = PtlMDBind(desc->bd_connection->c_peer.peer_ni, desc->bd_md,
&desc->bd_md_h);
LBUG();
RETURN(rc);
}
-
+
remote_id.nid = desc->bd_connection->c_peer.peer_nid;
remote_id.pid = 0;
-
+
CDEBUG(D_NET, "Sending %u pages %u bytes to portal %d nid %Lx pid %d xid %d\n",
- desc->bd_md.niov, desc->bd_md.length,
+ desc->bd_md.niov, desc->bd_md.length,
desc->bd_portal, remote_id.nid, remote_id.pid, xid);
-
+
rc = PtlPut(desc->bd_md_h, PTL_ACK_REQ, remote_id,
desc->bd_portal, 0, xid, 0, 0);
if (rc != PTL_OK) {
if (desc->bd_md.niov == 0)
xid = bulk->bp_xid;
LASSERT (xid == bulk->bp_xid); /* should all be the same */
-
+
iov[desc->bd_md.niov].iov_base = bulk->bp_buf;
iov[desc->bd_md.niov].iov_len = bulk->bp_buflen;
desc->bd_md.niov++;
LASSERT (desc->bd_md.niov == desc->bd_page_count);
LASSERT (desc->bd_md.niov != 0);
-
+
rc = PtlMEAttach(desc->bd_connection->c_peer.peer_ni,
desc->bd_portal, local_id, xid, 0,
PTL_UNLINK, PTL_INS_AFTER, &desc->bd_me_h);
LBUG();
GOTO(cleanup, rc);
}
-
+
rc = PtlMDAttach(desc->bd_me_h, desc->bd_md, PTL_UNLINK,
&desc->bd_md_h);
if (rc != PTL_OK) {
LBUG();
GOTO(cleanup, rc);
}
-
+
CDEBUG(D_NET, "Setup bulk sink buffers: %u pages %u bytes, xid %u, "
- "portal %u\n", desc->bd_md.niov, desc->bd_md.length,
+ "portal %u\n", desc->bd_md.niov, desc->bd_md.length,
xid, desc->bd_portal);
RETURN(0);
}
/* FIXME: we need to increment the count of handled events */
- req->rq_type = PTL_RPC_TYPE_REPLY;
+ if (req->rq_type != PTL_RPC_MSG_ERR)
+ req->rq_type = PTL_RPC_MSG_REPLY;
//req->rq_repmsg->conn = req->rq_connection->c_remote_conn;
//req->rq_repmsg->token = req->rq_connection->c_remote_token;
req->rq_repmsg->status = HTON__u32(req->rq_status);
- req->rq_repmsg->type = HTON__u32(req->rq_type);
return ptl_send_buf(req, req->rq_connection, svc->srv_rep_portal);
}
RETURN(rc);
}
-
int ptl_send_rpc(struct ptlrpc_request *request)
{
int rc;
ENTRY;
- if (NTOH__u32(request->rq_reqmsg->type) != PTL_RPC_MSG_REQUEST) {
+ if (request->rq_type != PTL_RPC_MSG_REQUEST) {
CERROR("wrong packet type sent %d\n",
NTOH__u32(request->rq_reqmsg->type));
LBUG();
GOTO(cleanup, rc);
}
- request->rq_type = PTL_RPC_TYPE_REQUEST;
request->rq_reply_md.start = repbuf;
request->rq_reply_md.length = request->rq_replen;
request->rq_reply_md.threshold = 1;
CERROR("PtlMEAttach failed: %d\n", rc);
LBUG();
}
-
+
if (service->srv_ref_count[i])
LBUG();
dummy.user_ptr = service;
dummy.eventq = service->srv_eq_h;
dummy.max_offset = service->srv_buf_size;
-
+
rc = PtlMDAttach(service->srv_me_h[i], dummy, PTL_UNLINK, &md_h);
if (rc != PTL_OK) {
/* cleanup */
CERROR("PtlMDAttach failed: %d\n", rc);
LBUG();
}
-}
+}
/* ptl_handled_rpc() should be called by the sleeping process once
* it finishes processing an event. This ensures the ref count is
* decremented and that the rpc ring buffer cycles properly.
- */
-int ptl_handled_rpc(struct ptlrpc_service *service, void *start)
+ */
+int ptl_handled_rpc(struct ptlrpc_service *service, void *start)
{
int index;
spin_lock(&service->srv_lock);
for (index = 0; index < service->srv_ring_length; index++)
- if (service->srv_buf[index] == start)
+ if (service->srv_buf[index] == start)
break;
if (index == service->srv_ring_length)
if (service->srv_ref_count[index] == 0 &&
!ptl_is_valid_handle(&(service->srv_me_h[index]))) {
- CDEBUG(D_NET, "relinking %d\n", index);
- ptlrpc_link_svc_me(service, index);
+ CDEBUG(D_NET, "relinking %d\n", index);
+ ptlrpc_link_svc_me(service, index);
}
-
+
spin_unlock(&service->srv_lock);
return 0;
}