From: eeb
Date: Fri, 20 Sep 2002 21:05:50 +0000 (+0000)
Subject: Modifications for "circulating" request buffers (sized in lustre_net.h)
X-Git-Tag: 0.5.12~46
X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=30c8ce5c470a4e7e473188ebd0923a56b3d8c109;p=fs%2Flustre-release.git

Modifications for "circulating" request buffers (sized in lustre_net.h)
---

diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h
index a5626e5..69c6732 100644
--- a/lustre/include/linux/lustre_net.h
+++ b/lustre/include/linux/lustre_net.h
@@ -31,9 +31,42 @@
 #include 
 #include 
 
-/* default rpc ring length */
-//#define RPC_RING_LENGTH 10
-#define RPC_REQUEST_QUEUE_DEPTH 1024
+/* The following constants determine how much memory is devoted to
+ * buffering in the lustre services.
+ *
+ * ?_NEVENTS            # event queue entries
+ *
+ * ?_NBUFS              # request buffers
+ * ?_BUFSIZE            # bytes in a single request buffer
+ * total memory = ?_NBUFS * ?_BUFSIZE
+ *
+ * ?_MAXREQSIZE         # maximum request service will receive
+ * larger messages will get dropped.
+ * request buffers are auto-unlinked when less than ?_MAXREQSIZE
+ * is left in them.
+ */
+
+#define LDLM_NEVENTS    1024
+#define LDLM_NBUFS      10
+#define LDLM_BUFSIZE    (64 * 1024)
+#define LDLM_MAXREQSIZE 1024
+
+#define MDS_NEVENTS     1024
+#define MDS_NBUFS       10
+#define MDS_BUFSIZE     (64 * 1024)
+#define MDS_MAXREQSIZE  1024
+
+#ifdef __arch_um__
+#define OST_NEVENTS     1024
+#define OST_NBUFS       10
+#define OST_BUFSIZE     (64 * 1024)
+#define OST_MAXREQSIZE  (8 * 1024)
+#else
+#define OST_NEVENTS     4096
+#define OST_NBUFS       40
+#define OST_BUFSIZE     (128 * 1024)
+#define OST_MAXREQSIZE  (8 * 1024)
+#endif
 
 struct ptlrpc_connection {
         struct list_head c_link;
@@ -184,8 +217,10 @@ struct ptlrpc_thread {
 };
 
 struct ptlrpc_request_buffer_desc {
+        struct list_head rqbd_list;
         struct ptlrpc_service *rqbd_service;
         ptl_handle_me_t rqbd_me_h;
+        atomic_t rqbd_refcount;
         char *rqbd_buffer;
 };
 
@@ -196,10 +231,12 @@ struct ptlrpc_service {
         /* incoming request buffers */
         /* FIXME: perhaps a list of EQs, if multiple NIs are used? */
-        struct ptlrpc_request_buffer_desc *srv_rqbds; /* all the request buffer descriptors */
+        __u32 srv_max_req_size;        /* biggest request to receive */
+        __u32 srv_buf_size;            /* # bytes in a request buffer */
+        struct list_head srv_rqbds;    /* all the request buffer descriptors */
+        __u32 srv_nrqbds;              /* # request buffers */
+        atomic_t srv_nrqbds_receiving; /* # request buffers posted for input */
 
-        __u32 srv_buf_size; /* # bytes in a request buffer */
-        __u32 srv_nbuffs; /* # request buffers */
         __u32 srv_req_portal;
         __u32 srv_rep_portal;
 
@@ -213,7 +250,6 @@ struct ptlrpc_service {
         wait_queue_head_t srv_waitq; /* all threads sleep on this */
 
         spinlock_t srv_lock;
-        struct list_head srv_reqs;
         struct list_head srv_threads;
         int (*srv_handler)(struct ptlrpc_request *req);
         char *srv_name; /* only statically allocated strings here; we don't clean them */
@@ -274,7 +310,8 @@ int ptlrpc_check_status(struct ptlrpc_request *req, int err);
 
 /* rpc/service.c */
 struct ptlrpc_service *
-ptlrpc_init_svc(__u32 bufsize, int nbuffs, int req_portal, int rep_portal,
+ptlrpc_init_svc(__u32 nevents, __u32 nbufs, __u32 bufsize, __u32 max_req_size,
+                int req_portal, int rep_portal,
                 obd_uuid_t uuid, svc_handler_t, char *name);
 void ptlrpc_stop_all_threads(struct ptlrpc_service *svc);
 int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c
index d7cb958..a6ede2f 100644
--- a/lustre/ldlm/ldlm_lockd.c
+++ b/lustre/ldlm/ldlm_lockd.c
@@ -559,9 +559,10 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
         if (rc != 0)
                 GOTO(out_dec, rc);
         */
-        ldlm->ldlm_service = ptlrpc_init_svc(1024, 640, LDLM_REQUEST_PORTAL,
-                                             LDLM_REPLY_PORTAL, "self",
-                                             ldlm_callback_handler, "ldlm");
+        ldlm->ldlm_service = ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS,
+                                             LDLM_BUFSIZE, LDLM_MAXREQSIZE,
+                                             LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
+                                             "self", ldlm_callback_handler, "ldlm");
         if (!ldlm->ldlm_service)
                 GOTO(out_dec, rc = -ENOMEM);
diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c
index 113094a..539e50b 100644
--- a/lustre/mds/handler.c
+++ b/lustre/mds/handler.c
@@ -1098,9 +1098,10 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
                 GOTO(err_put, rc);
         }
 
-        mds->mds_service = ptlrpc_init_svc(1024, 640, MDS_REQUEST_PORTAL,
-                                           MDC_REPLY_PORTAL, "self",mds_handle,
-                                           "mds");
+        mds->mds_service = ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
+                                           MDS_BUFSIZE, MDS_MAXREQSIZE,
+                                           MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
+                                           "self", mds_handle, "mds");
         if (!mds->mds_service) {
                 CERROR("failed to start service\n");
                 GOTO(err_fs, rc = -EINVAL);
diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c
index 03a58ea..2389655 100644
--- a/lustre/ost/ost_handler.c
+++ b/lustre/ost/ost_handler.c
@@ -597,9 +597,10 @@ static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
                 GOTO(error_dec, err = -EINVAL);
         }
 
-        ost->ost_service = ptlrpc_init_svc(1024, 4096, OST_REQUEST_PORTAL,
-                                           OSC_REPLY_PORTAL, "self", ost_handle,
-                                           "ost");
+        ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
+                                           OST_BUFSIZE, OST_MAXREQSIZE,
+                                           OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
+                                           "self", ost_handle, "ost");
         if (!ost->ost_service) {
                 CERROR("failed to start service\n");
                 GOTO(error_disc, err = -EINVAL);
diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c
index bff7466..4e9b29c 100644
--- a/lustre/ptlrpc/events.c
+++ b/lustre/ptlrpc/events.c
@@ -109,15 +109,34 @@ int request_in_callback(ptl_event_t *ev)
         struct ptlrpc_service *service = rqbd->rqbd_service;
 
         LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0); /* requests always contiguous */
-
+        LASSERT (ev->type == PTL_EVENT_PUT);    /* we only enable puts */
+        LASSERT (atomic_read (&service->srv_nrqbds_receiving) > 0);
+        LASSERT (atomic_read (&rqbd->rqbd_refcount) > 0);
+
         if (ev->rlength != ev->mlength)
                 CERROR("Warning: Possibly truncated rpc (%d/%d)\n",
                        ev->mlength, ev->rlength);
 
-        if (ev->type == PTL_EVENT_PUT)
-                wake_up(&service->srv_waitq);
+        if (ptl_is_valid_handle (&ev->unlinked_me))
+        {
+                /* This is the last request to be received into this
+                 * request buffer.  We don't bump the refcount, since the
+                 * thread servicing this event is effectively taking over
+                 * portals' reference.
+                 */
+                LASSERT (!memcmp (&ev->unlinked_me, &rqbd->rqbd_me_h,
+                                  sizeof (ev->unlinked_me)));
+
+                if (atomic_dec_and_test (&service->srv_nrqbds_receiving)) /* we're off-air */
+                {
+                        CERROR ("All request buffers busy\n");
+                        LBUG();
+                }
+        }
         else
-                CERROR("Unexpected event type: %d\n", ev->type);
+                atomic_inc (&rqbd->rqbd_refcount); /* +1 ref for service thread */
+
+        wake_up(&service->srv_waitq);
 
         return 0;
 }
diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c
index 2351bdd..d7884f9 100644
--- a/lustre/ptlrpc/niobuf.c
+++ b/lustre/ptlrpc/niobuf.c
@@ -398,6 +398,8 @@ void ptlrpc_link_svc_me(struct ptlrpc_request_buffer_desc *rqbd)
         ptl_md_t dummy;
         ptl_handle_md_t md_h;
 
+        LASSERT (atomic_read (&rqbd->rqbd_refcount) == 0);
+
         /* Attach the leading ME on which we build the ring */
         rc = PtlMEAttach(service->srv_self.peer_ni, service->srv_req_portal,
                          match_id, 0, ~0,
@@ -409,16 +411,22 @@
         dummy.start = rqbd->rqbd_buffer;
         dummy.length = service->srv_buf_size;
-        dummy.max_offset = service->srv_buf_size;
-        dummy.threshold = 1;
-        dummy.options = PTL_MD_OP_PUT;
+        dummy.max_size = service->srv_max_req_size;
+        dummy.threshold = PTL_MD_THRESH_INF;
+        dummy.options = PTL_MD_OP_PUT | PTL_MD_MAX_SIZE | PTL_MD_AUTO_UNLINK;
         dummy.user_ptr = rqbd;
         dummy.eventq = service->srv_eq_h;
 
+        atomic_inc (&service->srv_nrqbds_receiving);
+        atomic_set (&rqbd->rqbd_refcount, 1);   /* 1 ref for portals */
+
         rc = PtlMDAttach(rqbd->rqbd_me_h, dummy, PTL_UNLINK, &md_h);
         if (rc != PTL_OK) {
-                /* cleanup */
                 CERROR("PtlMDAttach failed: %d\n", rc);
                 LBUG();
+#warning proper cleanup required
+                PtlMEUnlink (rqbd->rqbd_me_h);
+                atomic_set (&rqbd->rqbd_refcount, 0);
+                atomic_dec (&service->srv_nrqbds_receiving);
         }
 }
diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c
index 81abcd7..cac9932 100644
--- a/lustre/ptlrpc/service.c
+++ b/lustre/ptlrpc/service.c
@@ -64,7 +64,9 @@ static int ptlrpc_check_event(struct ptlrpc_service *svc,
 }
 
 struct ptlrpc_service *
-ptlrpc_init_svc(__u32 bufsize, int nbuffs, int req_portal, int rep_portal,
+ptlrpc_init_svc(__u32 nevents, __u32 nbufs,
+                __u32 bufsize, __u32 max_req_size,
+                int req_portal, int rep_portal,
                 obd_uuid_t uuid, svc_handler_t handler, char *name)
 {
         int err;
@@ -80,12 +82,15 @@ ptlrpc_init_svc(__u32 bufsize, int nbuffs, int req_portal, int rep_portal,
 
         service->srv_name = name;
         spin_lock_init(&service->srv_lock);
-        INIT_LIST_HEAD(&service->srv_reqs);
         INIT_LIST_HEAD(&service->srv_threads);
         init_waitqueue_head(&service->srv_waitq);
 
+        service->srv_max_req_size = max_req_size;
         service->srv_buf_size = bufsize;
-        service->srv_nbuffs = nbuffs;
+
+        INIT_LIST_HEAD (&service->srv_rqbds);
+        service->srv_nrqbds = 0;
+        atomic_set (&service->srv_nrqbds_receiving, 0);
+
         service->srv_rep_portal = rep_portal;
         service->srv_req_portal = req_portal;
         service->srv_handler = handler;
@@ -97,36 +102,37 @@ ptlrpc_init_svc(__u32 bufsize, int nbuffs, int req_portal, int rep_portal,
                 RETURN(NULL);
         }
 
-        /* NB We need exactly 1 event for each buffer we queue */
-        rc = PtlEQAlloc(service->srv_self.peer_ni, service->srv_nbuffs,
+        rc = PtlEQAlloc(service->srv_self.peer_ni, nevents,
                         request_in_callback, &(service->srv_eq_h));
         if (rc != PTL_OK) {
                 CERROR("PtlEQAlloc failed: %d\n", rc);
-                LBUG();
                 OBD_FREE(service, sizeof(*service));
                 RETURN(NULL);
         }
 
-        OBD_ALLOC(service->srv_rqbds, service->srv_nbuffs *
-                  sizeof(struct ptlrpc_request_buffer_desc));
-        if (service->srv_rqbds == NULL) {
-                CERROR("no memory\n");
-                LBUG();
-                GOTO(failed, NULL);
-        }
+        for (i = 0; i < nbufs; i++) {
+                struct ptlrpc_request_buffer_desc *rqbd;
 
-        for (i = 0; i < service->srv_nbuffs; i++) {
-                struct ptlrpc_request_buffer_desc *rqbd =&service->srv_rqbds[i];
+                OBD_ALLOC (rqbd, sizeof (*rqbd));
+                if (rqbd == NULL)
+                {
+                        CERROR ("no memory\n");
+                        GOTO (failed, NULL);
+                }
 
                 rqbd->rqbd_service = service;
                 ptl_set_inv_handle (&rqbd->rqbd_me_h);
+                atomic_set (&rqbd->rqbd_refcount, 0);
 
                 OBD_ALLOC(rqbd->rqbd_buffer, service->srv_buf_size);
                 if (rqbd->rqbd_buffer == NULL) {
+                        OBD_FREE (rqbd, sizeof (*rqbd));
                         CERROR("no memory\n");
-                        LBUG();
                         GOTO(failed, NULL);
                 }
+
+                list_add (&rqbd->rqbd_list, &service->srv_rqbds);
+                service->srv_nrqbds++;
+
                 ptlrpc_link_svc_me(rqbd);
         }
@@ -149,48 +155,47 @@ static int handle_incoming_request(struct obd_device *obddev,
 
         /* FIXME: If we move to an event-driven model, we should put the request
          * on the stack of mds_handle instead. */
+
+        LASSERT (atomic_read (&rqbd->rqbd_refcount) > 0);
         LASSERT ((event->mem_desc.options & PTL_MD_IOV) == 0);
         LASSERT (rqbd->rqbd_service == svc);
         LASSERT (rqbd->rqbd_buffer == event->mem_desc.start);
-        LASSERT (event->offset == 0);
+        LASSERT (event->offset + event->mlength <= svc->srv_buf_size);
 
         memset(request, 0, sizeof(*request));
         request->rq_svc = svc;
         request->rq_obd = obddev;
         request->rq_xid = event->match_bits;
         request->rq_reqmsg = event->mem_desc.start + event->offset;
-        request->rq_reqlen = event->mem_desc.length;
+        request->rq_reqlen = event->mlength;
+
+        rc = -EINVAL;
 
         if (request->rq_reqlen < sizeof(struct lustre_msg)) {
                 CERROR("incomplete request (%d): ptl %d from "LPX64" xid "LPD64"\n",
                        request->rq_reqlen, svc->srv_req_portal,
                        event->initiator.nid, request->rq_xid);
-                spin_unlock(&svc->srv_lock);
-                RETURN(-EINVAL);
+                goto out;
         }
 
         if (NTOH__u32(request->rq_reqmsg->type) != PTL_RPC_MSG_REQUEST) {
                 CERROR("wrong packet type received (type=%u)\n",
                        request->rq_reqmsg->type);
-                LBUG();
-                spin_unlock(&svc->srv_lock);
-                RETURN(-EINVAL);
+                goto out;
         }
 
         if (request->rq_reqmsg->magic != PTLRPC_MSG_MAGIC) {
                 CERROR("wrong lustre_msg magic %d: ptl %d from "LPX64" xid "LPD64"\n",
                        request->rq_reqmsg->magic, svc->srv_req_portal,
                        event->initiator.nid, request->rq_xid);
-                spin_unlock(&svc->srv_lock);
-                RETURN(-EINVAL);
+                goto out;
         }
 
         if (request->rq_reqmsg->version != PTLRPC_MSG_VERSION) {
                 CERROR("wrong lustre_msg version %d: ptl %d from "LPX64" xid "LPD64"\n",
                        request->rq_reqmsg->version, svc->srv_req_portal,
                        event->initiator.nid, request->rq_xid);
-                spin_unlock(&svc->srv_lock);
-                RETURN(-EINVAL);
+                goto out;
         }
 
         CDEBUG(D_NET, "got req "LPD64"\n", request->rq_xid);
@@ -208,12 +213,13 @@ static int handle_incoming_request(struct obd_device *obddev,
                 ptlrpc_connection_addref(request->rq_connection);
         }
 
-        spin_unlock(&svc->srv_lock);
-
         rc = svc->srv_handler(request);
         ptlrpc_put_connection(request->rq_connection);
 
-        ptlrpc_link_svc_me (rqbd);
+ out:
+        if (atomic_dec_and_test (&rqbd->rqbd_refcount)) /* last reference? */
+                ptlrpc_link_svc_me (rqbd);
+
         return rc;
 }
@@ -257,25 +263,27 @@ static int ptlrpc_main(void *arg)
                 wait_event(svc->srv_waitq,
                            ptlrpc_check_event(svc, thread, event));
 
-                spin_lock(&svc->srv_lock);
-
                 if (thread->t_flags & SVC_STOPPING) {
+                        spin_lock(&svc->srv_lock);
                         thread->t_flags &= ~SVC_STOPPING;
                         spin_unlock(&svc->srv_lock);
+
                         EXIT;
                         break;
                 }
 
                 if (thread->t_flags & SVC_EVENT) {
-                        LASSERT (event->sequence != 0);
+                        spin_lock(&svc->srv_lock);
+                        thread->t_flags &= ~SVC_EVENT;
+                        spin_unlock(&svc->srv_lock);
+
                         rc = handle_incoming_request(obddev, svc, event,
                                                      request);
-                        thread->t_flags &= ~SVC_EVENT;
                         continue;
                 }
 
                 CERROR("unknown break in service");
-                spin_unlock(&svc->srv_lock);
+                LBUG();
                 EXIT;
                 break;
         }
@@ -358,43 +366,47 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
 
 int ptlrpc_unregister_service(struct ptlrpc_service *service)
 {
-        int rc, i;
-
-        /* NB service->srv_nbuffs gets set before we attempt (and possibly
-         * fail) to allocate srv_rqbds.
-         */
-        if (service->srv_rqbds != NULL) {
-                for (i = 0; i < service->srv_nbuffs; i++) {
-                        struct ptlrpc_request_buffer_desc *rqbd =
-                                &service->srv_rqbds[i];
-
-                        if (rqbd->rqbd_buffer == NULL) /* no buffer allocated */
-                                continue;              /* => never initialised */
-
-                        /* Buffer allocated => got linked */
-                        LASSERT (ptl_is_valid_handle (&rqbd->rqbd_me_h));
+        int rc;
 
-                        rc = PtlMEUnlink(rqbd->rqbd_me_h);
-                        if (rc)
-                                CERROR("PtlMEUnlink failed: %d\n", rc);
+        LASSERT (list_empty (&service->srv_threads));
 
-                        OBD_FREE(rqbd->rqbd_buffer, service->srv_buf_size);
-                }
-
-                OBD_FREE(service->srv_rqbds, service->srv_nbuffs *
-                         sizeof (struct ptlrpc_request_buffer_desc));
+        /* XXX We could reply (with failure) to all buffered requests
+         * _after_ unlinking _all_ the request buffers, but _before_
+         * freeing them.
+         */
+
+        while (!list_empty (&service->srv_rqbds))
+        {
+                struct ptlrpc_request_buffer_desc *rqbd =
+                        list_entry (service->srv_rqbds.next,
+                                    struct ptlrpc_request_buffer_desc,
+                                    rqbd_list);
+
+                list_del (&rqbd->rqbd_list);
+
+                LASSERT (atomic_read (&rqbd->rqbd_refcount) > 0);
+                /* refcount could be anything; it's possible for the
+                 * buffers to continued to get filled after all the server
+                 * threads exited.  But we know they _have_ exited.
+                 */
+
+                (void) PtlMEUnlink(rqbd->rqbd_me_h);
+                /* The callback handler could have unlinked this ME already
+                 * (we're racing with her) but it's safe to ensure it _has_
+                 * been unlinked.
+                 */
+
+                OBD_FREE (rqbd->rqbd_buffer, service->srv_buf_size);
+                OBD_FREE (rqbd, sizeof (*rqbd));
+                service->srv_nrqbds--;
         }
 
+        LASSERT (service->srv_nrqbds == 0);
+
         rc = PtlEQFree(service->srv_eq_h);
         if (rc)
                 CERROR("PtlEQFree failed: %d\n", rc);
 
-        if (!list_empty(&service->srv_reqs)) {
-                // XXX reply with errors and clean up
-                CERROR("Request list not empty!\n");
-                rc = -EBUSY;
-        }
-
         OBD_FREE(service, sizeof(*service));
         if (rc)
                 LBUG();
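
The refcount protocol above is the heart of the change: ptlrpc_link_svc_me() posts a buffer with one reference held on behalf of portals, request_in_callback() takes an extra reference for the service thread on every incoming request except the one that auto-unlinks the MD (there the thread inherits portals' reference), and handle_incoming_request() drops its reference at "out:", re-posting the buffer only when it was the last holder. The standalone sketch below is not part of the patch; it only models that lifecycle, using C11 <stdatomic.h> in place of the kernel's atomic_t and a plain "posted" flag in place of the Portals MD, and every name in it (rqbd_sketch, post_buffer, request_arrived, request_handled) is invented for illustration.

/* Illustrative sketch only -- NOT part of the patch.  C11 atomics stand in
 * for the kernel's atomic_t; the Portals MD is reduced to a "posted" flag. */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

struct rqbd_sketch {
        atomic_int refcount;    /* plays the role of rqbd_refcount */
        bool       posted;      /* stands in for the attached MD   */
};

/* ptlrpc_link_svc_me() analogue: post the buffer, one ref for "portals" */
static void post_buffer(struct rqbd_sketch *rqbd)
{
        atomic_store(&rqbd->refcount, 1);
        rqbd->posted = true;
}

/* request_in_callback() analogue.  'unlinked' mirrors the auto-unlink case:
 * too little room is left for another max-sized request, so the thread
 * servicing this event inherits portals' reference instead of adding one. */
static void request_arrived(struct rqbd_sketch *rqbd, bool unlinked)
{
        if (unlinked)
                rqbd->posted = false;                   /* ref handed over  */
        else
                atomic_fetch_add(&rqbd->refcount, 1);   /* +1 for the thread */
}

/* handle_incoming_request() tail: drop the thread's reference; whoever
 * drops the last one re-posts the buffer so it circulates back into service */
static void request_handled(struct rqbd_sketch *rqbd)
{
        if (atomic_fetch_sub(&rqbd->refcount, 1) == 1)
                post_buffer(rqbd);
}

int main(void)
{
        struct rqbd_sketch rqbd;

        post_buffer(&rqbd);
        request_arrived(&rqbd, false);  /* plenty of room left          */
        request_arrived(&rqbd, true);   /* last request: auto-unlinked  */
        request_handled(&rqbd);         /* first thread finishes        */
        request_handled(&rqbd);         /* last ref -> buffer re-posted */

        printf("re-posted: %s, refcount: %d\n",
               rqbd.posted ? "yes" : "no", atomic_load(&rqbd.refcount));
        return 0;
}

The property the patch relies on is visible in the sketch: a buffer can only circulate back into service once both portals (via auto-unlink) and every thread still handling a request held in it have dropped their references, so a partially filled buffer keeps receiving new requests while earlier ones are being served.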