From 0fff249f58467410011444548f7652665301d22d Mon Sep 17 00:00:00 2001 From: eeb Date: Thu, 5 Sep 2002 16:33:51 +0000 Subject: [PATCH] Fixed service request buffer race --- lustre/include/linux/lustre_net.h | 25 ++++++--- lustre/ldlm/ldlm_lockd.c | 2 +- lustre/lib/debug.c | 16 +++--- lustre/mds/handler.c | 2 +- lustre/ost/ost_handler.c | 2 +- lustre/ptlrpc/events.c | 3 +- lustre/ptlrpc/niobuf.c | 56 ++++--------------- lustre/ptlrpc/service.c | 113 +++++++++++++++++--------------------- lustre/utils/obd.c | 7 ++- 9 files changed, 95 insertions(+), 131 deletions(-) diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h index 19c9be9..79c7d02 100644 --- a/lustre/include/linux/lustre_net.h +++ b/lustre/include/linux/lustre_net.h @@ -32,7 +32,8 @@ #include /* default rpc ring length */ -#define RPC_RING_LENGTH 10 +//#define RPC_RING_LENGTH 10 +#define RPC_REQUEST_QUEUE_DEPTH 1024 struct ptlrpc_connection { struct list_head c_link; @@ -178,17 +179,23 @@ struct ptlrpc_thread { wait_queue_head_t t_ctl_waitq; }; +struct ptlrpc_request_buffer_desc { + struct ptlrpc_service *rqbd_service; + ptl_handle_me_t rqbd_me_h; + char *rqbd_buffer; +}; + struct ptlrpc_service { time_t srv_time; time_t srv_timeout; /* incoming request buffers */ /* FIXME: perhaps a list of EQs, if multiple NIs are used? */ - char *srv_buf[RPC_RING_LENGTH]; - __u32 srv_ref_count[RPC_RING_LENGTH]; - ptl_handle_me_t srv_me_h[RPC_RING_LENGTH]; - __u32 srv_buf_size; - __u32 srv_ring_length; + + struct ptlrpc_request_buffer_desc *srv_rqbds; /* all the request buffer descriptors */ + + __u32 srv_buf_size; /* # bytes in a request buffer */ + __u32 srv_nbuffs; /* # request buffers */ __u32 srv_req_portal; __u32 srv_rep_portal; @@ -237,7 +244,7 @@ int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req); int ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req); void ptlrpc_resend_req(struct ptlrpc_request *request); int ptl_send_rpc(struct ptlrpc_request *request); -void ptlrpc_link_svc_me(struct ptlrpc_service *service, int i); +void ptlrpc_link_svc_me(struct ptlrpc_request_buffer_desc *rqbd); /* rpc/client.c */ void ptlrpc_init_client(int req_portal, int rep_portal, char *name, @@ -263,8 +270,8 @@ int ptlrpc_check_status(struct ptlrpc_request *req, int err); /* rpc/service.c */ struct ptlrpc_service * -ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid, - svc_handler_t, char *name); +ptlrpc_init_svc(__u32 bufsize, int nbuffs, int req_portal, int rep_portal, + char *uuid, svc_handler_t, char *name); void ptlrpc_stop_all_threads(struct ptlrpc_service *svc); int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc, char *name); diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 0f36c0a..86e4eae 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -550,7 +550,7 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf) if (rc != 0) GOTO(out_dec, rc); - ldlm->ldlm_service = ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL, + ldlm->ldlm_service = ptlrpc_init_svc(1024, 640, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, "self", ldlm_callback_handler, "ldlm"); if (!ldlm->ldlm_service) diff --git a/lustre/lib/debug.c b/lustre/lib/debug.c index 9f6d9c5..18ade8b 100644 --- a/lustre/lib/debug.c +++ b/lustre/lib/debug.c @@ -117,25 +117,25 @@ int page_debug_check(char *who, void *addr, int end, __u64 off, __u64 id) ne_off = HTON__u64(off); id = HTON__u64(id); if (memcmp(addr, (char *)&ne_off, LPDS)) { - CERROR("%s: offset "LPU64" off: "LPX64" != "LPX64"\n", - who, off, *(__u64 *)addr, ne_off); + CERROR("%s: id "LPU64" offset "LPU64" off: "LPX64" != "LPX64"\n", + who, id, off, *(__u64 *)addr, ne_off); err = -EINVAL; } if (memcmp(addr + LPDS, (char *)&id, LPDS)) { - CERROR("%s: offset "LPU64" id: "LPX64" != "LPX64"\n", - who, off, *(__u64 *)(addr + LPDS), id); + CERROR("%s: id "LPU64" offset "LPU64" id: "LPX64" != "LPX64"\n", + who, id, off, *(__u64 *)(addr + LPDS), id); err = -EINVAL; } addr += end - LPDS - LPDS; if (memcmp(addr, (char *)&ne_off, LPDS)) { - CERROR("%s: offset "LPU64" end off: "LPX64" != "LPX64"\n", - who, off, *(__u64 *)addr, ne_off); + CERROR("%s: id "LPU64" offset "LPU64" end off: "LPX64" != "LPX64"\n", + who, id, off, *(__u64 *)addr, ne_off); err = -EINVAL; } if (memcmp(addr + LPDS, (char *)&id, LPDS)) { - CERROR("%s: offset "LPU64" end id: "LPX64" != "LPX64"\n", - who, off, *(__u64 *)(addr + LPDS), id); + CERROR("%s: id "LPU64" offset "LPU64" end id: "LPX64" != "LPX64"\n", + who, id, off, *(__u64 *)(addr + LPDS), id); err = -EINVAL; } diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index e5a4724..b97bc88 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -1081,7 +1081,7 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf) GOTO(err_put, rc); } - mds->mds_service = ptlrpc_init_svc(64 * 1024, MDS_REQUEST_PORTAL, + mds->mds_service = ptlrpc_init_svc(1024, 640, MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, "self",mds_handle, "mds"); if (!mds->mds_service) { diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 1102e4e..2ac42ef 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -597,7 +597,7 @@ static int ost_setup(struct obd_device *obddev, obd_count len, void *buf) GOTO(error_dec, err = -EINVAL); } - ost->ost_service = ptlrpc_init_svc(64 * 1024, OST_REQUEST_PORTAL, + ost->ost_service = ptlrpc_init_svc(1024, 640, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL, "self", ost_handle, "ost"); if (!ost->ost_service) { diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c index b52e1b0..23718b8 100644 --- a/lustre/ptlrpc/events.c +++ b/lustre/ptlrpc/events.c @@ -105,7 +105,8 @@ static int reply_in_callback(ptl_event_t *ev) int request_in_callback(ptl_event_t *ev) { - struct ptlrpc_service *service = ev->mem_desc.user_ptr; + struct ptlrpc_request_buffer_desc *rqbd = ev->mem_desc.user_ptr; + struct ptlrpc_service *service = rqbd->rqbd_service; LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0); /* requests always contiguous */ diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 41987e2..6445db0 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -28,7 +28,6 @@ extern ptl_handle_eq_t request_out_eq, reply_in_eq, reply_out_eq, bulk_source_eq, bulk_sink_eq; -static ptl_process_id_t local_id = {PTL_NID_ANY, PTL_PID_ANY}; static int ptl_send_buf(struct ptlrpc_request *request, struct ptlrpc_connection *conn, int portal) @@ -380,70 +379,35 @@ int ptl_send_rpc(struct ptlrpc_request *request) return rc; } -void ptlrpc_link_svc_me(struct ptlrpc_service *service, int i) +void ptlrpc_link_svc_me(struct ptlrpc_request_buffer_desc *rqbd) { + struct ptlrpc_service *service = rqbd->rqbd_service; + static ptl_process_id_t match_id = {PTL_NID_ANY, PTL_PID_ANY}; int rc; ptl_md_t dummy; ptl_handle_md_t md_h; /* Attach the leading ME on which we build the ring */ rc = PtlMEAttach(service->srv_self.peer_ni, service->srv_req_portal, - local_id, 0, ~0, PTL_RETAIN, PTL_INS_BEFORE, - &(service->srv_me_h[i])); + match_id, 0, ~0, + PTL_UNLINK, PTL_INS_AFTER, &rqbd->rqbd_me_h); if (rc != PTL_OK) { CERROR("PtlMEAttach failed: %d\n", rc); LBUG(); } - if (service->srv_ref_count[i]) - LBUG(); - - dummy.start = service->srv_buf[i]; + dummy.start = rqbd->rqbd_buffer; dummy.length = service->srv_buf_size; dummy.max_offset = service->srv_buf_size; - dummy.threshold = PTL_MD_THRESH_INF; - dummy.options = PTL_MD_OP_PUT | PTL_MD_AUTO_UNLINK; - dummy.user_ptr = service; + dummy.threshold = 1; + dummy.options = PTL_MD_OP_PUT; + dummy.user_ptr = rqbd; dummy.eventq = service->srv_eq_h; - dummy.max_offset = service->srv_buf_size; - rc = PtlMDAttach(service->srv_me_h[i], dummy, PTL_UNLINK, &md_h); + rc = PtlMDAttach(rqbd->rqbd_me_h, dummy, PTL_UNLINK, &md_h); if (rc != PTL_OK) { /* cleanup */ CERROR("PtlMDAttach failed: %d\n", rc); LBUG(); } } - -/* ptl_handled_rpc() should be called by the sleeping process once - * it finishes processing an event. This ensures the ref count is - * decremented and that the rpc ring buffer cycles properly. - */ -int ptl_handled_rpc(struct ptlrpc_service *service, void *start) -{ - int index; - - spin_lock(&service->srv_lock); - for (index = 0; index < service->srv_ring_length; index++) - if (service->srv_buf[index] == start) - break; - - if (index == service->srv_ring_length) - LBUG(); - - CDEBUG(D_INFO, "MD index=%d Ref Count=%d\n", index, - service->srv_ref_count[index]); - service->srv_ref_count[index]--; - - if (service->srv_ref_count[index] < 0) - LBUG(); - - if (service->srv_ref_count[index] == 0 && - !ptl_is_valid_handle(&(service->srv_me_h[index]))) { - CDEBUG(D_NET, "relinking %d\n", index); - ptlrpc_link_svc_me(service, index); - } - - spin_unlock(&service->srv_lock); - return 0; -} diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index 7bba07c..9c69af8 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -27,7 +27,6 @@ #include extern int request_in_callback(ptl_event_t *ev); -extern int ptl_handled_rpc(struct ptlrpc_service *service, void *start); static int ptlrpc_check_event(struct ptlrpc_service *svc, struct ptlrpc_thread *thread, ptl_event_t *event) @@ -39,6 +38,8 @@ static int ptlrpc_check_event(struct ptlrpc_service *svc, if (thread->t_flags & SVC_STOPPING) GOTO(out, rc = 1); + LASSERT ((thread->t_flags & SVC_EVENT) == 0); + if (ptl_is_valid_handle(&svc->srv_eq_h)) { int err; err = PtlEQGet(svc->srv_eq_h, event); @@ -63,7 +64,7 @@ static int ptlrpc_check_event(struct ptlrpc_service *svc, } struct ptlrpc_service * -ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid, +ptlrpc_init_svc(__u32 bufsize, int nbuffs, int req_portal, int rep_portal, char *uuid, svc_handler_t handler, char *name) { int err; @@ -84,6 +85,7 @@ ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid, init_waitqueue_head(&service->srv_waitq); service->srv_buf_size = bufsize; + service->srv_nbuffs = nbuffs; service->srv_rep_portal = rep_portal; service->srv_req_portal = req_portal; service->srv_handler = handler; @@ -95,10 +97,9 @@ ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid, RETURN(NULL); } - service->srv_ring_length = RPC_RING_LENGTH; - - rc = PtlEQAlloc(service->srv_self.peer_ni, 1024, request_in_callback, - &(service->srv_eq_h)); + /* NB We need exactly 1 event for each buffer we queue */ + rc = PtlEQAlloc(service->srv_self.peer_ni, service->srv_nbuffs, + request_in_callback, &(service->srv_eq_h)); if (rc != PTL_OK) { CERROR("PtlEQAlloc failed: %d\n", rc); @@ -107,23 +108,33 @@ ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid, RETURN(NULL); } - for (i = 0; i < service->srv_ring_length; i++) { - OBD_ALLOC(service->srv_buf[i], service->srv_buf_size); - if (service->srv_buf[i] == NULL) { + OBD_ALLOC(service->srv_rqbds, + service->srv_nbuffs * sizeof (struct ptlrpc_request_buffer_desc)); + if (service->srv_rqbds == NULL) { + CERROR("no memory\n"); + LBUG(); + GOTO(failed, NULL); + } + + for (i = 0; i < service->srv_nbuffs; i++) { + struct ptlrpc_request_buffer_desc *rqbd = &service->srv_rqbds[i]; + + rqbd->rqbd_service = service; + ptl_set_inv_handle (&rqbd->rqbd_me_h); + OBD_ALLOC(rqbd->rqbd_buffer, service->srv_buf_size); + if (rqbd->rqbd_buffer == NULL) { CERROR("no memory\n"); LBUG(); - GOTO(err_ring, NULL); + GOTO(failed, NULL); } - service->srv_ref_count[i] = 0; - ptlrpc_link_svc_me(service, i); + ptlrpc_link_svc_me(rqbd); } CDEBUG(D_NET, "Starting service listening on portal %d\n", service->srv_req_portal); RETURN(service); -err_ring: - service->srv_ring_length = i; +failed: ptlrpc_unregister_service(service); return NULL; } @@ -132,15 +143,17 @@ static int handle_incoming_request(struct obd_device *obddev, struct ptlrpc_service *svc, ptl_event_t *event) { + struct ptlrpc_request_buffer_desc *rqbd = event->mem_desc.user_ptr; struct ptlrpc_request request; - void *start; int rc; /* FIXME: If we move to an event-driven model, we should put the request * on the stack of mds_handle instead. */ LASSERT ((event->mem_desc.options & PTL_MD_IOV) == 0); - start = event->mem_desc.start; - + LASSERT (rqbd->rqbd_service == svc); + LASSERT (rqbd->rqbd_buffer == event->mem_desc.start); + LASSERT (event->offset == 0); + memset(&request, 0, sizeof(request)); request.rq_svc = svc; request.rq_obd = obddev; @@ -160,6 +173,7 @@ static int handle_incoming_request(struct obd_device *obddev, CERROR("wrong packet type received (type=%u)\n", request.rq_reqmsg->type); LBUG(); + spin_unlock(&svc->srv_lock); RETURN(-EINVAL); } @@ -194,44 +208,12 @@ static int handle_incoming_request(struct obd_device *obddev, } spin_unlock(&svc->srv_lock); + rc = svc->srv_handler(&request); ptlrpc_put_connection(request.rq_connection); - ptl_handled_rpc(svc, start); - return rc; -} - -void ptlrpc_rotate_reqbufs(struct ptlrpc_service *service, - ptl_event_t *ev) -{ - int index; - - LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0); - for (index = 0; index < service->srv_ring_length; index++) - if (service->srv_buf[index] == ev->mem_desc.start) - break; - - if (index == service->srv_ring_length) - LBUG(); - - service->srv_ref_count[index]++; - - if (ptl_is_valid_handle(&ev->unlinked_me)) { - int idx; - - for (idx = 0; idx < service->srv_ring_length; idx++) - if (service->srv_me_h[idx].handle_idx == - ev->unlinked_me.handle_idx) - break; - if (idx == service->srv_ring_length) - LBUG(); - - CDEBUG(D_NET, "unlinked %d\n", idx); - ptl_set_inv_handle(&(service->srv_me_h[idx])); - - if (service->srv_ref_count[idx] == 0) - ptlrpc_link_svc_me(service, idx); - } + ptlrpc_link_svc_me (rqbd); + return rc; } static int ptlrpc_main(void *arg) @@ -277,9 +259,7 @@ static int ptlrpc_main(void *arg) } if (thread->t_flags & SVC_EVENT) { - thread->t_flags &= ~SVC_EVENT; - ptlrpc_rotate_reqbufs(svc, &event); - + LASSERT (event.sequence != 0); rc = handle_incoming_request(obddev, svc, &event); thread->t_flags &= ~SVC_EVENT; continue; @@ -364,17 +344,26 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service) { int rc, i; - for (i = 0; i < service->srv_ring_length; i++) { - if (ptl_is_valid_handle(&(service->srv_me_h[i]))) { - rc = PtlMEUnlink(service->srv_me_h[i]); + if (service->srv_rqbds != NULL) + { + for (i = 0; i < service->srv_nbuffs; i++) { + struct ptlrpc_request_buffer_desc *rqbd = &service->srv_rqbds[i]; + + if (rqbd->rqbd_buffer == NULL) /* no buffer allocated */ + continue; /* => never initialised */ + + /* Buffer allocated => got linked */ + LASSERT (ptl_is_valid_handle (&rqbd->rqbd_me_h)); + + rc = PtlMEUnlink(rqbd->rqbd_me_h); if (rc) CERROR("PtlMEUnlink failed: %d\n", rc); - ptl_set_inv_handle(&(service->srv_me_h[i])); + + OBD_FREE(rqbd->rqbd_buffer, service->srv_buf_size); } - if (service->srv_buf[i] != NULL) - OBD_FREE(service->srv_buf[i], service->srv_buf_size); - service->srv_buf[i] = NULL; + OBD_FREE(service->srv_rqbds, + service->srv_nbuffs * sizeof (struct ptlrpc_request_buffer_desc)); } rc = PtlEQFree(service->srv_eq_h); diff --git a/lustre/utils/obd.c b/lustre/utils/obd.c index ce2b83e..6a22992 100644 --- a/lustre/utils/obd.c +++ b/lustre/utils/obd.c @@ -1044,7 +1044,10 @@ int jt_obd_test_brw(int argc, char **argv) } } if (argc >= 6) { - objid = strtoull(argv[5], &end, 0); + if (argv[5][0] == 't') + objid = strtoul(argv[5] + 1, &end, 0) + thread; + else + objid = strtoull(argv[5], &end, 0); if (*end) { fprintf(stderr, "error: %s: bad objid '%s'\n", cmdname(argv[0]), argv[5]); @@ -1069,7 +1072,7 @@ int jt_obd_test_brw(int argc, char **argv) count, pages, ctime(&start.tv_sec)); rw = write ? OBD_IOC_BRW_WRITE : OBD_IOC_BRW_READ; - for (i = 1, next_count = verbose, offset = 0; i <= count; i++) { + for (i = 1, next_count = verbose; i <= count; i++) { rc = ioctl(fd, rw, &data); SHMEM_BUMP(); if (rc) { -- 1.8.3.1