From b4993f8e4aed32f23f6ff6ad901daa5fa9d23eae Mon Sep 17 00:00:00 2001
From: behlendo
Date: Fri, 22 Feb 2002 02:05:30 +0000
Subject: [PATCH 1/1] - Added some fields for rpc double buffering - Added a
 NEXT_INDEX() macro - Adjusted mds/handler.c and ost/ost_handler.c to notify
 the rpc layer when finished processing an event; this ensures the buffers
 are recycled. - Added support for double buffering

---
 lustre/include/linux/lustre_net.h |  20 +++-
 lustre/include/linux/obd_rpc.h    |   4 +
 lustre/mds/handler.c              |   9 +-
 lustre/ost/ost_handler.c          |   9 +-
 lustre/ptlrpc/rpc.c               | 204 ++++++++++++++++++++++++++++++--------
 5 files changed, 200 insertions(+), 46 deletions(-)

diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h
index 6f25cd9..f312a80 100644
--- a/lustre/include/linux/lustre_net.h
+++ b/lustre/include/linux/lustre_net.h
@@ -40,21 +40,32 @@
 #define OSC_BULK_PORTAL 9
 #define OST_BULK_PORTAL 10
 
+/* default rpc ring length */
+#define RPC_RING_LENGTH 2
+
+/* generic wrappable next */
+#define NEXT_INDEX(index, max) (((index+1) >= max) ? 0 : (index+1))
+
+
 struct ptlrpc_service {
-        char *srv_buf;
+        char *srv_buf[RPC_RING_LENGTH];
         __u32 srv_buf_size;
+        __u32 srv_me_active;
+        __u32 srv_me_tail;
+        __u32 srv_md_active;
         __u32 srv_ring_length;
         __u32 srv_portal;
+        __u32 srv_ref_count[RPC_RING_LENGTH];
         struct lustre_peer srv_self;
 
         /* FIXME: perhaps a list of EQs, if multiple NIs are used? */
         ptl_handle_eq_t srv_eq_h;
-        ptl_handle_me_t srv_me_h;
+        ptl_handle_me_t srv_me_h[RPC_RING_LENGTH];
         ptl_process_id_t srv_id;
-        ptl_md_t srv_md;
-        ptl_handle_md_t srv_md_h;
+        ptl_md_t srv_md[RPC_RING_LENGTH];
+        ptl_handle_md_t srv_md_h[RPC_RING_LENGTH];
         wait_queue_head_t *srv_wait_queue;
 };
 
@@ -99,6 +110,7 @@ struct ptlrpc_request {
 int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
                  int portal, int is_request);
 int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer);
+int ptl_received_rpc(struct ptlrpc_service *service);
 int rpc_register_service(struct ptlrpc_service *service, char *uuid);
 int rpc_unregister_service(struct ptlrpc_service *service);
 
diff --git a/lustre/include/linux/obd_rpc.h b/lustre/include/linux/obd_rpc.h
index af0f878..e0847bc 100644
--- a/lustre/include/linux/obd_rpc.h
+++ b/lustre/include/linux/obd_rpc.h
@@ -15,6 +15,10 @@
 
 #define OBD_TGT_SOFT 0x4
 
+/* generic wrappable next */
+
+#define NEXT_INDEX(index, max) \
+        (((index + 1) >= max) ? 0 : (index + 1))
 
 /* error codes */
 
diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c
index 2308ffb..33e1649 100644
--- a/lustre/mds/handler.c
+++ b/lustre/mds/handler.c
@@ -464,10 +464,14 @@ int mds_main(void *arg)
                 while (1) {
                         struct ptlrpc_request request;
+                        struct ptlrpc_service *service;
 
                         rc = PtlEQGet(mds->mds_service->srv_eq_h, &ev);
                         if (rc != PTL_OK && rc != PTL_EQ_DROPPED)
                                 break;
+
+                        service = (struct ptlrpc_service *)ev.mem_desc.user_ptr;
+
                         /* FIXME: If we move to an event-driven model,
                          * we should put the request on the stack of
                          * mds_handle instead. */
@@ -484,6 +488,9 @@
                         request.rq_peer.peer_ni = mds->mds_service->srv_self.peer_ni;
 
                         rc = mds_handle(&request);
+
+                        /* Inform the rpc layer the event has been handled */
+                        ptl_received_rpc(service);
                 }
         } else {
                 struct ptlrpc_request *request;
@@ -577,7 +584,7 @@ static int mds_setup(struct obd_device *obddev, obd_count len,
         OBD_ALLOC(mds->mds_service, sizeof(*mds->mds_service));
         if (mds->mds_service == NULL)
                 return -ENOMEM;
-        mds->mds_service->srv_buf_size = 64 * 1024;
+        mds->mds_service->srv_buf_size = 4 * 1024;
         mds->mds_service->srv_portal = MDS_REQUEST_PORTAL;
         memcpy(&mds->mds_service->srv_self, &peer, sizeof(peer));
         mds->mds_service->srv_wait_queue = &mds->mds_waitq;
diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c
index dc034b9..0359a00 100644
--- a/lustre/ost/ost_handler.c
+++ b/lustre/ost/ost_handler.c
@@ -508,10 +508,14 @@ int ost_main(void *arg)
                 while (1) {
                         struct ptlrpc_request request;
+                        struct ptlrpc_service *service;
 
                         rc = PtlEQGet(ost->ost_service->srv_eq_h, &ev);
                         if (rc != PTL_OK && rc != PTL_EQ_DROPPED)
                                 break;
+
+                        service = (struct ptlrpc_service *)ev.mem_desc.user_ptr;
+
                         /* FIXME: If we move to an event-driven model,
                          * we should put the request on the stack of
                          * mds_handle instead. */
@@ -528,6 +532,9 @@
                         request.rq_peer.peer_ni = ost->ost_service->srv_self.peer_ni;
 
                         rc = ost_handle(obddev, &request);
+
+                        /* Inform the rpc layer the event has been handled */
+                        ptl_received_rpc(service);
                 }
         } else {
                 struct ptlrpc_request *request;
@@ -622,7 +629,7 @@ static int ost_setup(struct obd_device *obddev, obd_count len,
         OBD_ALLOC(ost->ost_service, sizeof(*ost->ost_service));
         if (ost->ost_service == NULL)
                 return -ENOMEM;
-        ost->ost_service->srv_buf_size = 64 * 1024;
+        ost->ost_service->srv_buf_size = 4 * 1024;
         ost->ost_service->srv_portal = OST_REQUEST_PORTAL;
         memcpy(&ost->ost_service->srv_self, &peer, sizeof(peer));
         ost->ost_service->srv_wait_queue = &ost->ost_waitq;
diff --git a/lustre/ptlrpc/rpc.c b/lustre/ptlrpc/rpc.c
index df6e1bb..b61b8ce 100644
--- a/lustre/ptlrpc/rpc.c
+++ b/lustre/ptlrpc/rpc.c
@@ -30,9 +30,10 @@
 #include 
 
 static ptl_handle_eq_t req_eq, bulk_source_eq, bulk_sink_eq;
+int obd_debug_level;
+int obd_print_entry;
 
-/* This callback performs two functions:
- *
+/*
  * 1. Free the request buffer after it has gone out on the wire
  * 2. Wake up the thread waiting for the reply once it comes in.
  */
@@ -56,8 +57,46 @@ static int request_callback(ptl_event_t *ev, void *data)
 static int incoming_callback(ptl_event_t *ev, void *data)
 {
         struct ptlrpc_service *service = data;
+        int rc;
 
-        ENTRY;
+        if (ev->rlength != ev->mlength)
+                printk("Warning: Possibly truncated rpc (%d/%d)\n",
+                       ev->mlength, ev->rlength);
+
+        /* The ME is unlinked when there is less than 1024 bytes free
+         * on its MD.  This ensures we are always able to handle the rpc,
+         * although the 1024 value is a guess as to the size of a
+         * large rpc (the known safe margin should be determined).
+         *
+         * NOTE: The portals API by default unlinks all MD's associated
+         *       with an ME when it's unlinked.  For now, this behavior
+         *       has been commented out of the portals library so the
+         *       MD can be unlinked when its ref count drops to zero.
+         *       A new MD and ME will then be created that use the same
+         *       kmalloc()'ed memory and inserted at the ring tail.
+         */
+
+        service->srv_ref_count[service->srv_md_active]++;
+
+        if (ev->offset >= (service->srv_buf_size - 1024)) {
+                printk("Unlinking ME %d\n", service->srv_me_active);
+
+                rc = PtlMEUnlink(service->srv_me_h[service->srv_me_active]);
+                service->srv_me_h[service->srv_me_active] = 0;
+
+                if (rc != PTL_OK) {
+                        printk("PtlMEUnlink failed: %d\n", rc);
+                        return rc;
+                }
+
+                service->srv_me_active = NEXT_INDEX(service->srv_me_active,
+                                                    service->srv_ring_length);
+
+                if (service->srv_me_h[service->srv_me_active] == 0)
+                        printk("All %d ring ME's are unlinked!\n",
+                               service->srv_ring_length);
+
+        }
 
         if (ev->type == PTL_EVENT_PUT) {
                 wake_up(service->srv_wait_queue);
@@ -65,7 +104,6 @@ static int incoming_callback(ptl_event_t *ev, void *data)
                 printk("Unexpected event type: %d\n", ev->type);
         }
 
-        EXIT;
         return 0;
 }
 
@@ -228,10 +266,68 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
         return ptl_send_buf(request, peer, request->rq_req_portal, 1);
 }
 
+/* ptl_received_rpc() should be called by the sleeping process once
+ * it finishes processing an event.  This ensures the ref count is
+ * decremented and that the rpc ring buffer cycles properly.
+ */
+int ptl_received_rpc(struct ptlrpc_service *service) {
+        int rc, index;
+
+        index = service->srv_md_active;
+        CDEBUG(D_INFO, "MD index=%d Ref Count=%d\n", index,
+               service->srv_ref_count[index]);
+        service->srv_ref_count[index]--;
+
+        if ((service->srv_ref_count[index] <= 0) &&
+            (service->srv_me_h[index] == 0)) {
+
+                CDEBUG(D_INFO, "Removing MD at index %d\n", index);
+                rc = PtlMDUnlink(service->srv_md_h[index]);
+
+                if (rc)
+                        printk(__FUNCTION__ ": PtlMDUnlink failed: %d\n", rc);
+
+                /* Replace the unlinked ME and MD */
+                CDEBUG(D_INFO, "Inserting new ME and MD in ring\n");
+
+                rc = PtlMEInsert(service->srv_me_h[service->srv_me_tail],
+                                 service->srv_id, 0, ~0, PTL_RETAIN,
+                                 PTL_INS_AFTER, &(service->srv_me_h[index]));
+                service->srv_me_tail = index;
+                service->srv_ref_count[index] = 0;
+
+                if (rc != PTL_OK) {
+                        printk("PtlMEInsert failed: %d\n", rc);
+                        return rc;
+                }
+
+                service->srv_md[index].start = service->srv_buf[index];
+                service->srv_md[index].length = service->srv_buf_size;
+                service->srv_md[index].threshold = PTL_MD_THRESH_INF;
+                service->srv_md[index].options = PTL_MD_OP_PUT;
+                service->srv_md[index].user_ptr = service;
+                service->srv_md[index].eventq = service->srv_eq_h;
+
+                rc = PtlMDAttach(service->srv_me_h[index], service->srv_md[index],
+                                 PTL_RETAIN, &(service->srv_md_h[index]));
+
+                if (rc != PTL_OK) {
+                        /* cleanup */
+                        printk("PtlMDAttach failed: %d\n", rc);
+                        return rc;
+                }
+
+                service->srv_md_active = NEXT_INDEX(index,
+                                                    service->srv_ring_length);
+        }
+
+        return 0;
+}
+
 int rpc_register_service(struct ptlrpc_service *service, char *uuid)
 {
         struct lustre_peer peer;
-        int rc;
+        int rc, i;
 
         rc = kportal_uuid_to_peer(uuid, &peer);
         if (rc != 0) {
@@ -239,66 +335,94 @@ int rpc_register_service(struct ptlrpc_service *service, char *uuid)
                 return -EINVAL;
         }
 
-        OBD_ALLOC(service->srv_buf, service->srv_buf_size);
-        if (service->srv_buf == NULL) {
-                printk(__FUNCTION__ ": no memory\n");
-                return -ENOMEM;
-        }
+        service->srv_ring_length = RPC_RING_LENGTH;
+        service->srv_me_active = 0;
+        service->srv_md_active = 0;
 
         service->srv_id.addr_kind = PTL_ADDR_GID;
         service->srv_id.gid = PTL_ID_ANY;
         service->srv_id.rid = PTL_ID_ANY;
 
-        rc = PtlMEAttach(peer.peer_ni, service->srv_portal, service->srv_id,
-                         0, ~0, PTL_RETAIN, &service->srv_me_h);
-        if (rc != PTL_OK) {
-                printk("PtlMEAttach failed: %d\n", rc);
-                return rc;
-        }
+        rc = PtlEQAlloc(peer.peer_ni, 128, incoming_callback,
+                        service, &(service->srv_eq_h));
-        rc = PtlEQAlloc(peer.peer_ni, 128, incoming_callback, service,
-                        &service->srv_eq_h);
 
         if (rc != PTL_OK) {
                 printk("PtlEQAlloc failed: %d\n", rc);
                 return rc;
         }
 
-        /* FIXME: Build an auto-unlinking MD and build a ring. */
-        /* FIXME: Make sure that these are reachable by DMA on well-known
-         * addresses. */
-        service->srv_md.start = service->srv_buf;
-        service->srv_md.length = service->srv_buf_size;
-        service->srv_md.threshold = PTL_MD_THRESH_INF;
-        service->srv_md.options = PTL_MD_OP_PUT;
-        service->srv_md.user_ptr = service;
-        service->srv_md.eventq = service->srv_eq_h;
-
-        rc = PtlMDAttach(service->srv_me_h, service->srv_md,
-                         PTL_RETAIN, &service->srv_md_h);
+        /* Attach the leading ME on which we build the ring */
+        rc = PtlMEAttach(peer.peer_ni, service->srv_portal,
+                         service->srv_id, 0, ~0, PTL_RETAIN,
+                         &(service->srv_me_h[0]));
+
         if (rc != PTL_OK) {
-                printk("PtlMDAttach failed: %d\n", rc);
-                /* FIXME: wow, we need to clean up. */
+                printk("PtlMEAttach failed: %d\n", rc);
                 return rc;
         }
 
+        for (i = 0; i < service->srv_ring_length; i++) {
+                OBD_ALLOC(service->srv_buf[i], service->srv_buf_size);
+
+                if (service->srv_buf[i] == NULL) {
+                        printk(__FUNCTION__ ": no memory\n");
+                        return -ENOMEM;
+                }
+
+                /* Insert additional ME's to the ring */
+                if (i > 0) {
+                        rc = PtlMEInsert(service->srv_me_h[i-1],
+                                         service->srv_id, 0, ~0, PTL_RETAIN,
+                                         PTL_INS_AFTER, &(service->srv_me_h[i]));
+                        service->srv_me_tail = i;
+
+                        if (rc != PTL_OK) {
+                                printk("PtlMEInsert failed: %d\n", rc);
+                                return rc;
+                        }
+                }
+
+                service->srv_ref_count[i] = 0;
+                service->srv_md[i].start = service->srv_buf[i];
+                service->srv_md[i].length = service->srv_buf_size;
+                service->srv_md[i].threshold = PTL_MD_THRESH_INF;
+                service->srv_md[i].options = PTL_MD_OP_PUT;
+                service->srv_md[i].user_ptr = service;
+                service->srv_md[i].eventq = service->srv_eq_h;
+
+                rc = PtlMDAttach(service->srv_me_h[i], service->srv_md[i],
+                                 PTL_RETAIN, &(service->srv_md_h[i]));
+
+                if (rc != PTL_OK) {
+                        /* cleanup */
+                        printk("PtlMDAttach failed: %d\n", rc);
+                        return rc;
+                }
+        }
+
         return 0;
 }
 
 int rpc_unregister_service(struct ptlrpc_service *service)
 {
-        int rc;
+        int rc, i;
+
+        for (i = 0; i < service->srv_ring_length; i++) {
+                rc = PtlMDUnlink(service->srv_md_h[i]);
+                if (rc)
+                        printk(__FUNCTION__ ": PtlMDUnlink failed: %d\n", rc);
+
+                rc = PtlMEUnlink(service->srv_me_h[i]);
+                if (rc)
+                        printk(__FUNCTION__ ": PtlMEUnlink failed: %d\n", rc);
+
+                OBD_FREE(service->srv_buf[i], service->srv_buf_size);
+        }
 
-        rc = PtlMDUnlink(service->srv_md_h);
-        if (rc)
-                printk(__FUNCTION__ ": PtlMDUnlink failed: %d\n", rc);
         rc = PtlEQFree(service->srv_eq_h);
         if (rc)
                 printk(__FUNCTION__ ": PtlEQFree failed: %d\n", rc);
-        rc = PtlMEUnlink(service->srv_me_h);
-        if (rc)
-                printk(__FUNCTION__ ": PtlMEUnlink failed: %d\n", rc);
-        OBD_FREE(service->srv_buf, service->srv_buf_size);
 
         return 0;
 }
-- 
1.8.3.1
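
The following standalone sketch is not part of the patch above; it only illustrates
the ring arithmetic the patch relies on, under the RPC_RING_LENGTH and NEXT_INDEX()
definitions it introduces. The simulate_received() helper and the bare counters are
hypothetical stand-ins for the Portals ME/MD state, and the real ptl_received_rpc()
additionally requires the ME to have been unlinked before it recycles a buffer.

#include <stdio.h>

#define RPC_RING_LENGTH 2

/* generic wrappable next, as defined in the patch */
#define NEXT_INDEX(index, max) (((index + 1) >= max) ? 0 : (index + 1))

/* hypothetical stand-ins for the per-buffer state in ptlrpc_service */
static int ref_count[RPC_RING_LENGTH];
static int md_active;

/* Roughly mirrors the bookkeeping in ptl_received_rpc(): drop the reference
 * on the active buffer and advance the ring only once it is no longer in use. */
static void simulate_received(void)
{
        ref_count[md_active]--;
        if (ref_count[md_active] <= 0) {
                ref_count[md_active] = 0;
                md_active = NEXT_INDEX(md_active, RPC_RING_LENGTH);
        }
}

int main(void)
{
        int i;

        for (i = 0; i < 4; i++) {
                ref_count[md_active]++;        /* an incoming rpc lands here */
                printf("rpc %d handled in buffer %d\n", i, md_active);
                simulate_received();           /* handler done; maybe advance */
        }
        return 0;
}

With a ring length of 2 this prints the active buffer index alternating 0, 1, 0, 1,
which is the double-buffering behaviour the commit message describes.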