X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fptlrpc%2Fservice.c;h=38bc4abebccf9b6e7f4cd8a39f59d620b5ac4a98;hb=0935858b52b14bca0fa8ea7d15aff207dbc8fc99;hp=7bba07c1f5764758d7c40466ad01749954669c77;hpb=d1c5e58516b8fb39f3b2fd188f53d752d4ad8668;p=fs%2Flustre-release.git diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index 7bba07c..38bc4ab 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -27,7 +27,6 @@ #include extern int request_in_callback(ptl_event_t *ev); -extern int ptl_handled_rpc(struct ptlrpc_service *service, void *start); static int ptlrpc_check_event(struct ptlrpc_service *svc, struct ptlrpc_thread *thread, ptl_event_t *event) @@ -39,6 +38,8 @@ static int ptlrpc_check_event(struct ptlrpc_service *svc, if (thread->t_flags & SVC_STOPPING) GOTO(out, rc = 1); + LASSERT ((thread->t_flags & SVC_EVENT) == 0); + if (ptl_is_valid_handle(&svc->srv_eq_h)) { int err; err = PtlEQGet(svc->srv_eq_h, event); @@ -63,8 +64,8 @@ static int ptlrpc_check_event(struct ptlrpc_service *svc, } struct ptlrpc_service * -ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid, - svc_handler_t handler, char *name) +ptlrpc_init_svc(__u32 bufsize, int nbuffs, int req_portal, int rep_portal, + char *uuid, svc_handler_t handler, char *name) { int err; int rc, i; @@ -84,6 +85,7 @@ ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid, init_waitqueue_head(&service->srv_waitq); service->srv_buf_size = bufsize; + service->srv_nbuffs = nbuffs; service->srv_rep_portal = rep_portal; service->srv_req_portal = req_portal; service->srv_handler = handler; @@ -95,10 +97,9 @@ ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid, RETURN(NULL); } - service->srv_ring_length = RPC_RING_LENGTH; - - rc = PtlEQAlloc(service->srv_self.peer_ni, 1024, request_in_callback, - &(service->srv_eq_h)); + /* NB We need exactly 1 event for each buffer we queue */ + rc = PtlEQAlloc(service->srv_self.peer_ni, service->srv_nbuffs, + request_in_callback, &(service->srv_eq_h)); if (rc != PTL_OK) { CERROR("PtlEQAlloc failed: %d\n", rc); @@ -107,23 +108,33 @@ ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid, RETURN(NULL); } - for (i = 0; i < service->srv_ring_length; i++) { - OBD_ALLOC(service->srv_buf[i], service->srv_buf_size); - if (service->srv_buf[i] == NULL) { + OBD_ALLOC(service->srv_rqbds, service->srv_nbuffs * + sizeof(struct ptlrpc_request_buffer_desc)); + if (service->srv_rqbds == NULL) { + CERROR("no memory\n"); + LBUG(); + GOTO(failed, NULL); + } + + for (i = 0; i < service->srv_nbuffs; i++) { + struct ptlrpc_request_buffer_desc *rqbd =&service->srv_rqbds[i]; + + rqbd->rqbd_service = service; + ptl_set_inv_handle (&rqbd->rqbd_me_h); + OBD_ALLOC(rqbd->rqbd_buffer, service->srv_buf_size); + if (rqbd->rqbd_buffer == NULL) { CERROR("no memory\n"); LBUG(); - GOTO(err_ring, NULL); + GOTO(failed, NULL); } - service->srv_ref_count[i] = 0; - ptlrpc_link_svc_me(service, i); + ptlrpc_link_svc_me(rqbd); } CDEBUG(D_NET, "Starting service listening on portal %d\n", service->srv_req_portal); RETURN(service); -err_ring: - service->srv_ring_length = i; +failed: ptlrpc_unregister_service(service); return NULL; } @@ -132,14 +143,16 @@ static int handle_incoming_request(struct obd_device *obddev, struct ptlrpc_service *svc, ptl_event_t *event) { + struct ptlrpc_request_buffer_desc *rqbd = event->mem_desc.user_ptr; struct ptlrpc_request request; - void *start; int rc; /* FIXME: If we move to an event-driven model, we should put the request * on the stack of mds_handle instead. */ LASSERT ((event->mem_desc.options & PTL_MD_IOV) == 0); - start = event->mem_desc.start; + LASSERT (rqbd->rqbd_service == svc); + LASSERT (rqbd->rqbd_buffer == event->mem_desc.start); + LASSERT (event->offset == 0); memset(&request, 0, sizeof(request)); request.rq_svc = svc; @@ -149,7 +162,7 @@ static int handle_incoming_request(struct obd_device *obddev, request.rq_reqlen = event->mem_desc.length; if (request.rq_reqlen < sizeof(struct lustre_msg)) { - CERROR("incomplete request (%d): ptl %d from %Lx xid %Ld\n", + CERROR("incomplete request (%d): ptl %d from "LPX64" xid "LPD64"\n", request.rq_reqlen, svc->srv_req_portal, event->initiator.nid, request.rq_xid); spin_unlock(&svc->srv_lock); @@ -160,11 +173,12 @@ static int handle_incoming_request(struct obd_device *obddev, CERROR("wrong packet type received (type=%u)\n", request.rq_reqmsg->type); LBUG(); + spin_unlock(&svc->srv_lock); RETURN(-EINVAL); } if (request.rq_reqmsg->magic != PTLRPC_MSG_MAGIC) { - CERROR("wrong lustre_msg magic %d: ptl %d from %Lx xid %Ld\n", + CERROR("wrong lustre_msg magic %d: ptl %d from "LPX64" xid "LPD64"\n", request.rq_reqmsg->magic, svc->srv_req_portal, event->initiator.nid, request.rq_xid); spin_unlock(&svc->srv_lock); @@ -172,21 +186,22 @@ static int handle_incoming_request(struct obd_device *obddev, } if (request.rq_reqmsg->version != PTLRPC_MSG_VERSION) { - CERROR("wrong lustre_msg version %d: ptl %d from %Lx xid %Ld\n", + CERROR("wrong lustre_msg version %d: ptl %d from "LPX64" xid "LPD64"\n", request.rq_reqmsg->version, svc->srv_req_portal, event->initiator.nid, request.rq_xid); spin_unlock(&svc->srv_lock); RETURN(-EINVAL); } - CDEBUG(D_NET, "got req %Ld\n", request.rq_xid); + CDEBUG(D_NET, "got req "LPD64"\n", request.rq_xid); request.rq_peer.peer_nid = event->initiator.nid; /* FIXME: this NI should be the incoming NI. * We don't know how to find that from here. */ request.rq_peer.peer_ni = svc->srv_self.peer_ni; - request.rq_export = class_conn2export((struct lustre_handle *) request.rq_reqmsg); + request.rq_export = class_conn2export((struct lustre_handle *) + request.rq_reqmsg); if (request.rq_export) { request.rq_connection = request.rq_export->exp_connection; @@ -194,44 +209,12 @@ static int handle_incoming_request(struct obd_device *obddev, } spin_unlock(&svc->srv_lock); + rc = svc->srv_handler(&request); ptlrpc_put_connection(request.rq_connection); - ptl_handled_rpc(svc, start); - return rc; -} - -void ptlrpc_rotate_reqbufs(struct ptlrpc_service *service, - ptl_event_t *ev) -{ - int index; - LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0); - - for (index = 0; index < service->srv_ring_length; index++) - if (service->srv_buf[index] == ev->mem_desc.start) - break; - - if (index == service->srv_ring_length) - LBUG(); - - service->srv_ref_count[index]++; - - if (ptl_is_valid_handle(&ev->unlinked_me)) { - int idx; - - for (idx = 0; idx < service->srv_ring_length; idx++) - if (service->srv_me_h[idx].handle_idx == - ev->unlinked_me.handle_idx) - break; - if (idx == service->srv_ring_length) - LBUG(); - - CDEBUG(D_NET, "unlinked %d\n", idx); - ptl_set_inv_handle(&(service->srv_me_h[idx])); - - if (service->srv_ref_count[idx] == 0) - ptlrpc_link_svc_me(service, idx); - } + ptlrpc_link_svc_me (rqbd); + return rc; } static int ptlrpc_main(void *arg) @@ -277,9 +260,7 @@ static int ptlrpc_main(void *arg) } if (thread->t_flags & SVC_EVENT) { - thread->t_flags &= ~SVC_EVENT; - ptlrpc_rotate_reqbufs(svc, &event); - + LASSERT (event.sequence != 0); rc = handle_incoming_request(obddev, svc, &event); thread->t_flags &= ~SVC_EVENT; continue; @@ -364,17 +345,29 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service) { int rc, i; - for (i = 0; i < service->srv_ring_length; i++) { - if (ptl_is_valid_handle(&(service->srv_me_h[i]))) { - rc = PtlMEUnlink(service->srv_me_h[i]); + /* NB service->srv_nbuffs gets set before we attempt (and possibly + * fail) to allocate srv_rqbds. + */ + if (service->srv_rqbds != NULL) { + for (i = 0; i < service->srv_nbuffs; i++) { + struct ptlrpc_request_buffer_desc *rqbd = + &service->srv_rqbds[i]; + + if (rqbd->rqbd_buffer == NULL) /* no buffer allocated */ + continue; /* => never initialised */ + + /* Buffer allocated => got linked */ + LASSERT (ptl_is_valid_handle (&rqbd->rqbd_me_h)); + + rc = PtlMEUnlink(rqbd->rqbd_me_h); if (rc) CERROR("PtlMEUnlink failed: %d\n", rc); - ptl_set_inv_handle(&(service->srv_me_h[i])); + + OBD_FREE(rqbd->rqbd_buffer, service->srv_buf_size); } - if (service->srv_buf[i] != NULL) - OBD_FREE(service->srv_buf[i], service->srv_buf_size); - service->srv_buf[i] = NULL; + OBD_FREE(service->srv_rqbds, service->srv_nbuffs * + sizeof (struct ptlrpc_request_buffer_desc)); } rc = PtlEQFree(service->srv_eq_h); @@ -388,7 +381,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service) } OBD_FREE(service, sizeof(*service)); - if (rc) + if (rc) LBUG(); return rc; }