struct ptlrpc_request {
int rq_type; /* one of PTL_RPC_REQUEST, PTL_RPC_REPLY, PTL_RPC_BULK */
spinlock_t rq_lock;
- struct list_head rq_list;
- struct obd_device *rq_obd;
- int rq_status;
+ struct list_head rq_list;
+ struct obd_device *rq_obd;
+ int rq_status;
int rq_flags;
__u32 rq_connid;
__u32 rq_xid;
- char *rq_reqbuf;
- int rq_reqlen;
- struct ptlreq_hdr *rq_reqhdr;
- union ptl_req rq_req;
+ char *rq_reqbuf;
+ int rq_reqlen;
+ struct ptlreq_hdr *rq_reqhdr;
+ union ptl_req rq_req;
- char *rq_repbuf;
- int rq_replen;
- struct ptlrep_hdr *rq_rephdr;
- union ptl_rep rq_rep;
+ char *rq_repbuf;
+ int rq_replen;
+ struct ptlrep_hdr *rq_rephdr;
+ union ptl_rep rq_rep;
char *rq_bulkbuf;
int rq_bulklen;
void * rq_reply_handle;
- wait_queue_head_t rq_wait_for_rep;
+ wait_queue_head_t rq_wait_for_rep;
/* incoming reply */
ptl_md_t rq_reply_md;
struct obd_conn b_conn;
__u32 b_xid;
- wait_queue_head_t b_waitq;
+ wait_queue_head_t b_waitq;
ptl_md_t b_md;
ptl_handle_md_t b_md_h;
/* incoming request buffers */
/* FIXME: perhaps a list of EQs, if multiple NIs are used? */
char *srv_buf[RPC_RING_LENGTH];
+ __u32 srv_ref_count[RPC_RING_LENGTH];
+ ptl_handle_me_t srv_me_h[RPC_RING_LENGTH];
__u32 srv_buf_size;
- __u32 srv_me_tail;
- __u32 srv_md_active;
__u32 srv_ring_length;
__u32 srv_req_portal;
__u32 srv_rep_portal;
- __u32 srv_ref_count[RPC_RING_LENGTH];
- ptl_handle_me_t srv_me_h[RPC_RING_LENGTH];
- ptl_process_id_t srv_id;
- ptl_md_t srv_md[RPC_RING_LENGTH];
- ptl_handle_md_t srv_md_h[RPC_RING_LENGTH];
+
__u32 srv_xid;
/* event queue */
__u32 srv_flags;
struct lustre_peer srv_self;
- struct task_struct *srv_thread;
- wait_queue_head_t srv_waitq;
- wait_queue_head_t srv_ctl_waitq;
- int ost_flags;
+ ptl_process_id_t srv_id;
+
+ struct task_struct *srv_thread;
+ wait_queue_head_t srv_waitq;
+ wait_queue_head_t srv_ctl_waitq;
- spinlock_t srv_lock;
- struct list_head srv_reqs;
+ spinlock_t srv_lock;
+ struct list_head srv_reqs;
ptl_event_t srv_ev;
req_unpack_t srv_req_unpack;
rep_pack_t srv_rep_pack;
int ptlrpc_error(struct obd_device *obddev, struct ptlrpc_service *svc,
struct ptlrpc_request *req);
int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer);
+void ptlrpc_link_svc_me(struct ptlrpc_service *service, int i);
/* rpc/client.c */
int ptlrpc_connect_client(int dev, char *uuid, int req_portal, int rep_portal,
mds->mds_service = ptlrpc_init_svc(64 * 1024,
MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
"self", mds_handle);
-
- rpc_register_service(mds->mds_service, "self");
+ if (!mds->mds_service) {
+ CERROR("failed to start service\n");
+ RETURN(-EINVAL);
+ }
err = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
if (err)
OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
"self", ost_handle);
if (!ost->ost_service) {
+ CERROR("failed to start service\n");
obd_disconnect(&ost->ost_conn);
RETURN(-EINVAL);
}
- rpc_register_service(ost->ost_service, "self");
-
err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
if (err) {
obd_disconnect(&ost->ost_conn);
int server_request_callback(ptl_event_t *ev, void *data)
{
struct ptlrpc_service *service = data;
- int rc;
+ int index;
if (ev->rlength != ev->mlength)
CERROR("Warning: Possibly truncated rpc (%d/%d)\n",
ev->mlength, ev->rlength);
- /* The ME is unlinked when there is less than 1024 bytes free
- * on its MD. This ensures we are always able to handle the rpc,
- * although the 1024 value is a guess as to the size of a
- * large rpc (the known safe margin should be determined).
- *
- * NOTE: The portals API by default unlinks all MD's associated
- * with an ME when it's unlinked. For now, this behavior
- * has been commented out of the portals library so the
- * MD can be unlinked when its ref count drops to zero.
- * A new MD and ME will then be created that use the same
- * kmalloc()'ed memory and inserted at the ring tail.
- */
-
spin_lock(&service->srv_lock);
- if ( ev->mem_desc.start !=
- service->srv_md[service->srv_md_active].start ) {
+ for (index = 0; index < service->srv_ring_length; index++)
+ if ( service->srv_buf[index] == ev->mem_desc.start)
+ break;
+
+ if (index == service->srv_ring_length)
LBUG();
- }
- service->srv_ref_count[service->srv_md_active]++;
- CDEBUG(D_INODE, "event offset %d buf size %d\n",
- ev->offset, service->srv_buf_size);
- if (ev->offset >= (service->srv_buf_size - 1024)) {
- CDEBUG(D_INODE, "Unlinking ME %d\n", service->srv_md_active);
+ service->srv_ref_count[index]++;
- rc = PtlMEUnlink(service->srv_me_h[service->srv_md_active]);
- service->srv_me_h[service->srv_md_active] = 0;
+ if (ev->unlinked_me != -1) {
+ int idx;
- if (rc != PTL_OK) {
- CERROR("PtlMEUnlink failed - DROPPING soon: %d\n", rc);
+ for (idx = 0; idx < service->srv_ring_length; idx++)
+ if (service->srv_me_h[idx] == ev->unlinked_me)
+ break;
+ if (idx == service->srv_ring_length)
LBUG();
- spin_unlock(&service->srv_lock);
- return rc;
- }
- service->srv_md_active = (service->srv_md_active + 1) %
- service->srv_ring_length;
+ CDEBUG(D_NET, "unlinked %d\n", idx);
+ service->srv_me_h[idx] = 0;
- if (service->srv_me_h[service->srv_md_active] == 0) {
- CERROR("All %d ring ME's are unlinked!\n",
- service->srv_ring_length);
- LBUG();
- }
+ if (service->srv_ref_count[idx] == 0)
+ ptlrpc_link_svc_me(service, idx);
}
spin_unlock(&service->srv_lock);
- if (ev->type == PTL_EVENT_PUT) {
+ if (ev->type == PTL_EVENT_PUT)
wake_up(&service->srv_waitq);
- } else {
+ else
CERROR("Unexpected event type: %d\n", ev->type);
- }
return 0;
}
}
/* FIXME: This should happen unconditionally */
- if (bulk->b_cb != NULL) {
+ if (bulk->b_cb != NULL)
OBD_FREE(bulk, sizeof(*bulk));
- }
EXIT;
return 1;
ENTRY;
- rc = PtlMEPrepend(bulk->b_peer.peer_ni, bulk->b_portal, local_id,
- bulk->b_xid, 0, PTL_UNLINK, &bulk->b_me_h);
+ rc = PtlMEAttach(bulk->b_peer.peer_ni, bulk->b_portal, local_id,
+ bulk->b_xid, 0, PTL_UNLINK, PTL_INS_AFTER,
+ &bulk->b_me_h);
if (rc != PTL_OK) {
CERROR("PtlMEAttach failed: %d\n", rc);
LBUG();
local_id.pid = PTL_ID_ANY;
//CERROR("sending req %d\n", request->rq_xid);
- rc = PtlMEPrepend(peer->peer_ni, request->rq_reply_portal, local_id,
- request->rq_xid, 0, PTL_UNLINK,
+ rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id,
+ request->rq_xid, 0, PTL_UNLINK, PTL_INS_AFTER,
&request->rq_reply_me_h);
if (rc != PTL_OK) {
CERROR("PtlMEAttach failed: %d\n", rc);
return rc;
}
+void ptlrpc_link_svc_me(struct ptlrpc_service *service, int i)
+{
+ int rc;
+ ptl_md_t dummy;
+ ptl_handle_md_t md_h;
+
+ /* Attach the leading ME on which we build the ring */
+ rc = PtlMEAttach(service->srv_self.peer_ni, service->srv_req_portal,
+ service->srv_id, 0, ~0, PTL_RETAIN, PTL_INS_BEFORE,
+ &(service->srv_me_h[i]));
+ if (rc != PTL_OK) {
+ CERROR("PtlMEAttach failed: %d\n", rc);
+ LBUG();
+ }
+
+ if (service->srv_ref_count[i])
+ LBUG();
+
+ dummy.start = service->srv_buf[i];
+ dummy.length = service->srv_buf_size;
+ dummy.max_offset = service->srv_buf_size;
+ dummy.threshold = PTL_MD_THRESH_INF;
+ dummy.options = PTL_MD_OP_PUT | PTL_MD_AUTO_UNLINK;
+ dummy.user_ptr = service;
+ dummy.eventq = service->srv_eq_h;
+ dummy.max_offset = service->srv_buf_size;
+
+ rc = PtlMDAttach(service->srv_me_h[i], dummy, PTL_UNLINK, &md_h);
+ if (rc != PTL_OK) {
+ /* cleanup */
+ CERROR("PtlMDAttach failed: %d\n", rc);
+ LBUG();
+ }
+}
+
/* ptl_handled_rpc() should be called by the sleeping process once
* it finishes processing an event. This ensures the ref count is
* decremented and that the rpc ring buffer cycles properly.
*/
int ptl_handled_rpc(struct ptlrpc_service *service, void *start)
{
- int rc, index = 0;
+ int index;
spin_lock(&service->srv_lock);
- while (index < service->srv_ring_length) {
- if ( service->srv_md[index].start == start)
+ for (index = 0; index < service->srv_ring_length; index++)
+ if (service->srv_buf[index] == start)
break;
- index++;
- }
+
if (index == service->srv_ring_length)
LBUG();
LBUG();
if (service->srv_ref_count[index] == 0 &&
- service->srv_me_h[index] == 0) {
-
- /* Replace the unlinked ME and MD */
- rc = PtlMEInsert(service->srv_me_h[service->srv_me_tail],
- service->srv_id, 0, ~0, PTL_RETAIN,
- PTL_INS_AFTER, &(service->srv_me_h[index]));
- if (rc != PTL_OK) {
- CERROR("PtlMEInsert failed: %d\n", rc);
- LBUG();
- spin_unlock(&service->srv_lock);
- return rc;
- }
- CDEBUG(D_NET, "Inserting new ME and MD in ring, rc %d\n", rc);
-
- service->srv_me_tail = index;
-
- service->srv_md[index].start = service->srv_buf[index];
- service->srv_md[index].length = service->srv_buf_size;
- service->srv_md[index].threshold = PTL_MD_THRESH_INF;
- service->srv_md[index].options = PTL_MD_OP_PUT;
- service->srv_md[index].user_ptr = service;
- service->srv_md[index].eventq = service->srv_eq_h;
-
- rc = PtlMDAttach(service->srv_me_h[index],
- service->srv_md[index],
- PTL_RETAIN, &(service->srv_md_h[index]));
- //CERROR("MDAttach (request MDs): %Lu\n",
- //(__u64)(service->srv_md_h[index]));
-
- CDEBUG(D_INFO, "Attach MD in ring, rc %d\n", rc);
- if (rc != PTL_OK) {
- /* XXX cleanup */
- CERROR("PtlMDAttach failed: %d\n", rc);
- LBUG();
- spin_unlock(&service->srv_lock);
- return rc;
- }
- }
+ service->srv_me_h[index] == 0) {
+ CDEBUG(D_NET, "relinking %d\n", index);
+ ptlrpc_link_svc_me(service, index);
+ }
spin_unlock(&service->srv_lock);
return 0;
svc_handler_t handler)
{
int err;
- struct ptlrpc_service *svc;
+ int rc, i;
+ struct ptlrpc_service *service;
- OBD_ALLOC(svc, sizeof(*svc));
- if ( !svc ) {
+ OBD_ALLOC(service, sizeof(*service));
+ if ( !service ) {
CERROR("no memory\n");
+ LBUG();
return NULL;
}
- memset(svc, 0, sizeof(*svc));
+ memset(service, 0, sizeof(*service));
- spin_lock_init(&svc->srv_lock);
- INIT_LIST_HEAD(&svc->srv_reqs);
- init_waitqueue_head(&svc->srv_ctl_waitq);
- init_waitqueue_head(&svc->srv_waitq);
+ spin_lock_init(&service->srv_lock);
+ INIT_LIST_HEAD(&service->srv_reqs);
+ init_waitqueue_head(&service->srv_ctl_waitq);
+ init_waitqueue_head(&service->srv_waitq);
- svc->srv_thread = NULL;
- svc->srv_flags = 0;
+ service->srv_thread = NULL;
+ service->srv_flags = 0;
- svc->srv_buf_size = bufsize;
- svc->srv_rep_portal = rep_portal;
- svc->srv_req_portal = req_portal;
+ service->srv_buf_size = bufsize;
+ service->srv_rep_portal = rep_portal;
+ service->srv_req_portal = req_portal;
+ service->srv_handler = handler;
- svc->srv_handler = handler;
- err = kportal_uuid_to_peer(uuid, &svc->srv_self);
+ err = kportal_uuid_to_peer(uuid, &service->srv_self);
if (err) {
CERROR("cannot get peer for uuid %s", uuid);
- OBD_FREE(svc, sizeof(*svc));
+ OBD_FREE(service, sizeof(*service));
return NULL;
}
- return svc;
+
+ service->srv_ring_length = RPC_RING_LENGTH;
+ service->srv_id.nid = PTL_ID_ANY;
+ service->srv_id.pid = PTL_ID_ANY;
+
+ rc = PtlEQAlloc(service->srv_self.peer_ni, 128,
+ server_request_callback,
+ service, &(service->srv_eq_h));
+
+ if (rc != PTL_OK) {
+ CERROR("PtlEQAlloc failed: %d\n", rc);
+ LBUG();
+ return NULL;
+ }
+
+ for (i = 0; i < service->srv_ring_length; i++) {
+ OBD_ALLOC(service->srv_buf[i], service->srv_buf_size);
+ if (service->srv_buf[i] == NULL) {
+ CERROR("no memory\n");
+ LBUG();
+ return NULL;
+ }
+ service->srv_ref_count[i] = 0;
+ ptlrpc_link_svc_me(service, i);
+ }
+
+ CDEBUG(D_NET, "Starting service listening on portal %d\n",
+ service->srv_req_portal);
+
+ return service;
}
static int ptlrpc_main(void *arg)
}
-int rpc_register_service(struct ptlrpc_service *service, char *uuid)
-{
- struct lustre_peer peer;
- int rc, i;
-
- rc = kportal_uuid_to_peer(uuid, &peer);
- if (rc != 0) {
- CERROR("Invalid uuid \"%s\"\n", uuid);
- return -EINVAL;
- }
-
- service->srv_ring_length = RPC_RING_LENGTH;
- service->srv_md_active = 0;
-
- service->srv_id.nid = PTL_ID_ANY;
- service->srv_id.pid = PTL_ID_ANY;
-
- rc = PtlEQAlloc(peer.peer_ni, 128, server_request_callback,
- service, &(service->srv_eq_h));
-
- if (rc != PTL_OK) {
- CERROR("PtlEQAlloc failed: %d\n", rc);
- return rc;
- }
-
- CDEBUG(D_NET, "Starting service listening on portal %d\n",
- service->srv_req_portal);
-
- /* Attach the leading ME on which we build the ring */
- rc = PtlMEAttach(peer.peer_ni, service->srv_req_portal,
- service->srv_id, 0, ~0, PTL_RETAIN,
- &(service->srv_me_h[0]));
-
- if (rc != PTL_OK) {
- CERROR("PtlMEAttach failed: %d\n", rc);
- return rc;
- }
-
- for (i = 0; i < service->srv_ring_length; i++) {
- OBD_ALLOC(service->srv_buf[i], service->srv_buf_size);
-
- if (service->srv_buf[i] == NULL) {
- CERROR("no memory\n");
- return -ENOMEM;
- }
-
- /* Insert additional ME's to the ring */
- if (i > 0) {
- rc = PtlMEInsert(service->srv_me_h[i - 1],
- service->srv_id, 0, ~0, PTL_RETAIN,
- PTL_INS_AFTER,
- &(service->srv_me_h[i]));
- service->srv_me_tail = i;
-
- if (rc != PTL_OK) {
- CERROR("PtlMEInsert failed: %d\n", rc);
- return rc;
- }
- }
-
- service->srv_ref_count[i] = 0;
- service->srv_md[i].start = service->srv_buf[i];
- service->srv_md[i].length = service->srv_buf_size;
- service->srv_md[i].max_offset = service->srv_buf_size;
- service->srv_md[i].threshold = PTL_MD_THRESH_INF;
- service->srv_md[i].options = PTL_MD_OP_PUT;
- service->srv_md[i].user_ptr = service;
- service->srv_md[i].eventq = service->srv_eq_h;
- service->srv_md[i].max_offset = service->srv_buf_size;
-
- rc = PtlMDAttach(service->srv_me_h[i], service->srv_md[i],
- PTL_RETAIN, &(service->srv_md_h[i]));
-
- if (rc != PTL_OK) {
- /* cleanup */
- CERROR("PtlMDAttach failed: %d\n", rc);
- return rc;
- }
- }
-
- return 0;
-}
int rpc_unregister_service(struct ptlrpc_service *service)
{
int rc, i;
for (i = 0; i < service->srv_ring_length; i++) {
- rc = PtlMEUnlink(service->srv_me_h[i]);
- if (rc)
- CERROR("PtlMEUnlink failed: %d\n", rc);
+ if (service->srv_me_h[i]) {
+ rc = PtlMEUnlink(service->srv_me_h[i]);
+ if (rc)
+ CERROR("PtlMEUnlink failed: %d\n", rc);
+ service->srv_me_h[i] = 0;
+ }
if (service->srv_buf[i] != NULL)
OBD_FREE(service->srv_buf[i], service->srv_buf_size);