to the tip, also, which is going to break the tip. You have been warned.
int err;
memset(cl, 0, sizeof(*cl));
+ spin_lock_init(&cl->cli_lock);
cl->cli_xid = 1;
cl->cli_obd = NULL;
cl->cli_request_portal = req_portal;
return err;
}
+struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct lustre_peer *peer)
+{
+ struct ptlrpc_bulk_desc *bulk;
+
+ OBD_ALLOC(bulk, sizeof(*bulk));
+ if (bulk != NULL) {
+ memset(bulk, 0, sizeof(*bulk));
+ memcpy(&bulk->b_peer, peer, sizeof(*peer));
+ init_waitqueue_head(&bulk->b_waitq);
+ }
+
+ return bulk;
+}
+
struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
int opcode, int namelen, char *name,
int tgtlen, char *tgt)
}
memset(request, 0, sizeof(*request));
+
+ spin_lock(&cl->cli_lock);
request->rq_xid = cl->cli_xid++;
+ spin_unlock(&cl->cli_lock);
rc = cl->cli_req_pack(name, namelen, tgt, tgtlen,
&request->rq_reqhdr, &request->rq_req,
return 1;
}
- CERROR("no event yet\n");
return 0;
}
request->rq_repbuf = NULL;
request->rq_replen = 0;
- if (request->rq_bulklen != 0) {
- PtlMEUnlink(request->rq_bulk_me_h);
- PtlMDUnlink(request->rq_bulk_md_h);
- /* FIXME: wake whoever's sleeping on this bulk sending to let
- * -them- clean it up. */
- }
-
return 0;
}
rc = ptl_send_rpc(req, &cl->cli_server);
}
if (rc) {
- CERROR("error %d, opcode %d\n", rc,
- req->rq_reqhdr->opc);
+ CERROR("error %d, opcode %d\n", rc, req->rq_reqhdr->opc);
return -rc;
}
CDEBUG(0, "-- sleeping\n");
- wait_event_interruptible(req->rq_wait_for_rep,
- ptlrpc_check_reply(req));
+ wait_event_interruptible(req->rq_wait_for_rep, ptlrpc_check_reply(req));
CDEBUG(0, "-- done\n");
if (req->rq_flags == PTL_RPC_INTR) {
return -EINTR;
}
- rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, &req->rq_rep);
+ rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen,
+ &req->rq_rephdr, &req->rq_rep);
if (rc) {
CERROR("unpack_rep failed: %d\n", rc);
return rc;
CERROR("got rep %d\n", req->rq_rephdr->xid);
if ( req->rq_rephdr->status == 0 )
- CDEBUG(0, "--> buf %p len %d status %d\n",
- req->rq_repbuf, req->rq_replen,
- req->rq_rephdr->status);
+ CDEBUG(0, "--> buf %p len %d status %d\n", req->rq_repbuf,
+ req->rq_replen, req->rq_rephdr->status);
EXIT;
return 0;
#include <linux/lustre_net.h>
ptl_handle_eq_t sent_pkt_eq, rcvd_rep_eq, bulk_source_eq, bulk_sink_eq;
+static const ptl_handle_ni_t *socknal_nip = NULL, *qswnal_nip = NULL;
/*
* Free the packet when it has gone out
static int bulk_source_callback(ptl_event_t *ev, void *data)
{
- struct ptlrpc_request *rpc = ev->mem_desc.user_ptr;
+ struct ptlrpc_bulk_desc *bulk = ev->mem_desc.user_ptr;
ENTRY;
CDEBUG(D_NET, "got SENT event\n");
} else if (ev->type == PTL_EVENT_ACK) {
CDEBUG(D_NET, "got ACK event\n");
- rpc->rq_bulkbuf = NULL;
- wake_up_interruptible(&rpc->rq_wait_for_bulk);
+ bulk->b_flags = PTL_BULK_SENT;
+ wake_up_interruptible(&bulk->b_waitq);
} else {
CERROR("Unexpected event type!\n");
BUG();
static int bulk_sink_callback(ptl_event_t *ev, void *data)
{
- struct ptlrpc_request *rpc = ev->mem_desc.user_ptr;
+ struct ptlrpc_bulk_desc *bulk = ev->mem_desc.user_ptr;
ENTRY;
if (ev->type == PTL_EVENT_PUT) {
- if (rpc->rq_bulkbuf != ev->mem_desc.start + ev->offset)
+ if (bulk->b_buf != ev->mem_desc.start + ev->offset)
CERROR("bulkbuf != mem_desc -- why?\n");
- //wake_up_interruptible(&rpc->rq_wait_for_bulk);
+ bulk->b_flags = PTL_BULK_RCVD;
+ wake_up_interruptible(&bulk->b_waitq);
} else {
CERROR("Unexpected event type!\n");
BUG();
int ptlrpc_init_portals(void)
{
int rc;
- const ptl_handle_ni_t *nip;
ptl_handle_ni_t ni;
- nip = inter_module_get_request(LUSTRE_NAL "_ni", LUSTRE_NAL);
- if (nip == NULL) {
- CERROR("get_ni failed: is the NAL module loaded?\n");
+ socknal_nip = inter_module_get_request("ksocknal_ni", "ksocknal");
+ qswnal_nip = inter_module_get_request("kqswnal_ni", "kqswnal");
+ if (socknal_nip == NULL && qswnal_nip == NULL) {
+ CERROR("get_ni failed: is a NAL module loaded?\n");
return -EIO;
}
- ni = *nip;
+
+ /* Use the qswnal if it's there */
+ if (qswnal_nip != NULL)
+ ni = *qswnal_nip;
+ else
+ ni = *socknal_nip;
rc = PtlEQAlloc(ni, 128, sent_packet_callback, NULL, &sent_pkt_eq);
if (rc != PTL_OK)
PtlEQFree(bulk_source_eq);
PtlEQFree(bulk_sink_eq);
- inter_module_put(LUSTRE_NAL "_ni");
+ if (qswnal_nip != NULL)
+ inter_module_put("kqswnal_ni");
+ if (socknal_nip != NULL)
+ inter_module_put("ksocknal_ni");
}
#include <linux/lustre_net.h>
extern ptl_handle_eq_t bulk_source_eq, sent_pkt_eq, rcvd_rep_eq, bulk_sink_eq;
+static ptl_process_id_t local_id = {PTL_ADDR_GID, PTL_ID_ANY, PTL_ID_ANY};
+int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk)
+{
+ if (bulk->b_flags == PTL_BULK_SENT) {
+ EXIT;
+ return 1;
+ }
+
+ if (sigismember(&(current->pending.signal), SIGKILL) ||
+ sigismember(&(current->pending.signal), SIGINT)) {
+ bulk->b_flags = PTL_RPC_INTR;
+ EXIT;
+ return 1;
+ }
+
+ CERROR("no event yet\n");
+ return 0;
+}
int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
int portal)
break;
default:
BUG();
+ return -1; /* notreached */
}
request->rq_req_md.options = PTL_MD_OP_PUT;
request->rq_req_md.user_ptr = request;
return rc;
}
+int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *bulk, int portal)
+{
+ int rc;
+ ptl_process_id_t remote_id;
+ ptl_handle_md_t md_h;
+
+ bulk->b_md.start = bulk->b_buf;
+ bulk->b_md.length = bulk->b_buflen;
+ bulk->b_md.eventq = bulk_source_eq;
+ bulk->b_md.threshold = 2; /* SENT and ACK events */
+ bulk->b_md.options = PTL_MD_OP_PUT;
+ bulk->b_md.user_ptr = bulk;
+
+ rc = PtlMDBind(bulk->b_peer.peer_ni, bulk->b_md, &md_h);
+ if (rc != 0) {
+ BUG();
+ CERROR("PtlMDBind failed: %d\n", rc);
+ return rc;
+ }
+
+ remote_id.addr_kind = PTL_ADDR_NID;
+ remote_id.nid = bulk->b_peer.peer_nid;
+ remote_id.pid = 0;
+
+ CERROR("Sending %d bytes to portal %d, xid %d\n",
+ bulk->b_md.length, portal, bulk->b_xid);
+
+ rc = PtlPut(md_h, PTL_ACK_REQ, remote_id, portal, 0, bulk->b_xid, 0, 0);
+ if (rc != PTL_OK) {
+ BUG();
+ CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid,
+ portal, bulk->b_xid, rc);
+ /* FIXME: tear down md */
+ }
+
+ return rc;
+}
+
+int ptlrpc_wait_bulk(struct ptlrpc_bulk_desc *bulk)
+{
+ int rc;
+
+ ENTRY;
+
+ rc = PtlMEPrepend(bulk->b_peer.peer_ni, bulk->b_portal, local_id,
+ bulk->b_xid, 0, PTL_UNLINK, &bulk->b_me_h);
+ if (rc != PTL_OK) {
+ CERROR("PtlMEAttach failed: %d\n", rc);
+ BUG();
+ EXIT;
+ goto cleanup1;
+ }
+
+ bulk->b_md.start = bulk->b_buf;
+ bulk->b_md.length = bulk->b_buflen;
+ bulk->b_md.threshold = 1;
+ bulk->b_md.options = PTL_MD_OP_PUT;
+ bulk->b_md.user_ptr = bulk;
+ bulk->b_md.eventq = bulk_sink_eq;
+
+ rc = PtlMDAttach(bulk->b_me_h, bulk->b_md, PTL_UNLINK, &bulk->b_md_h);
+ if (rc != PTL_OK) {
+ CERROR("PtlMDAttach failed: %d\n", rc);
+ BUG();
+ EXIT;
+ goto cleanup2;
+ }
+
+ CDEBUG(D_NET, "Setup bulk sink buffer: %u bytes, xid %u, portal %u\n",
+ bulk->b_buflen, bulk->b_xid, bulk->b_portal);
+
+ cleanup2:
+ PtlMEUnlink(bulk->b_me_h);
+ cleanup1:
+ PtlMDUnlink(bulk->b_md_h);
+
+ return rc;
+}
+
int ptlrpc_reply(struct obd_device *obddev, struct ptlrpc_service *svc,
struct ptlrpc_request *req)
{
return ptlrpc_reply(obddev, svc, req);
}
-
int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
{
ptl_process_id_t local_id;
local_id.rid = PTL_ID_ANY;
//CERROR("sending req %d\n", request->rq_xid);
- rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id,
- request->rq_xid, 0, PTL_UNLINK,
- &request->rq_reply_me_h);
+ rc = PtlMEPrepend(peer->peer_ni, request->rq_reply_portal, local_id,
+ request->rq_xid, 0, PTL_UNLINK,
+ &request->rq_reply_me_h);
if (rc != PTL_OK) {
CERROR("PtlMEAttach failed: %d\n", rc);
BUG();
goto cleanup2;
}
- if (request->rq_bulklen != 0) {
- rc = PtlMEAttach(peer->peer_ni, request->rq_bulk_portal,
- local_id, request->rq_xid, 0, PTL_UNLINK,
- &request->rq_bulk_me_h);
- if (rc != PTL_OK) {
- CERROR("PtlMEAttach failed: %d\n", rc);
- BUG();
- EXIT;
- goto cleanup3;
- }
-
- request->rq_bulk_md.start = request->rq_bulkbuf;
- request->rq_bulk_md.length = request->rq_bulklen;
- request->rq_bulk_md.threshold = 1;
- request->rq_bulk_md.options = PTL_MD_OP_PUT;
- request->rq_bulk_md.user_ptr = request;
- request->rq_bulk_md.eventq = bulk_sink_eq;
-
- rc = PtlMDAttach(request->rq_bulk_me_h, request->rq_bulk_md,
- PTL_UNLINK, &request->rq_bulk_md_h);
- if (rc != PTL_OK) {
- CERROR("PtlMDAttach failed: %d\n", rc);
- BUG();
- EXIT;
- goto cleanup4;
- }
- }
+ CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid %u, portal %u\n",
+ request->rq_replen, request->rq_xid, request->rq_reply_portal);
return ptl_send_buf(request, peer, request->rq_req_portal);
- cleanup4:
- PtlMEUnlink(request->rq_bulk_me_h);
- cleanup3:
- PtlMDUnlink(request->rq_reply_md_h);
cleanup2:
PtlMEUnlink(request->rq_reply_me_h);
cleanup:
static int ptlrpc_check_event(struct ptlrpc_service *svc)
{
- if (sigismember(&(current->pending.signal),
- SIGKILL) ||
- sigismember(&(current->pending.signal),
- SIGINT)) {
+ if (sigismember(&(current->pending.signal), SIGKILL) ||
+ sigismember(&(current->pending.signal), SIGINT)) {
svc->srv_flags |= SVC_KILLED;
EXIT;
return 1;
return 0;
}
-struct ptlrpc_service *ptlrpc_init_svc(__u32 bufsize,
- int req_portal,
- int rep_portal,
- char *uuid,
- req_unpack_t unpack,
- rep_pack_t pack,
- svc_handler_t handler
- )
+struct ptlrpc_service *
+ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid,
+ req_unpack_t unpack, rep_pack_t pack, svc_handler_t handler)
{
int err;
struct ptlrpc_service *svc;
svc->srv_flags = SVC_STOPPING;
wake_up(&svc->srv_waitq);
- wait_event_interruptible(svc->srv_ctl_waitq, (svc->srv_flags & SVC_STOPPED));
+ wait_event_interruptible(svc->srv_ctl_waitq,
+ (svc->srv_flags & SVC_STOPPED));
}
int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
rc = PtlMEUnlink(service->srv_me_h[i]);
if (rc)
CERROR("PtlMEUnlink failed: %d\n", rc);
-
- OBD_FREE(service->srv_buf[i], service->srv_buf_size);
+
+ if (service->srv_buf[i] != NULL)
+ OBD_FREE(service->srv_buf[i], service->srv_buf_size);
+ service->srv_buf[i] = NULL;
}
rc = PtlEQFree(service->srv_eq_h);