From 75f05cdc561f3758427463a8b6dfce9ea3ab91b5 Mon Sep 17 00:00:00 2001 From: braam Date: Wed, 27 Mar 2002 20:31:22 +0000 Subject: [PATCH] - simplified ptlrpc_service by using ME auto-unlinking - merged ptlrpc_init_srv and rpc_register_service --- lustre/include/linux/lustre_net.h | 50 ++++++------ lustre/mds/handler.c | 6 +- lustre/ost/ost_handler.c | 3 +- lustre/ptlrpc/events.c | 61 +++++---------- lustre/ptlrpc/niobuf.c | 95 +++++++++++------------ lustre/ptlrpc/service.c | 156 +++++++++++++------------------------- 6 files changed, 151 insertions(+), 220 deletions(-) diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h index 330cb81..bd5e1fd 100644 --- a/lustre/include/linux/lustre_net.h +++ b/lustre/include/linux/lustre_net.h @@ -99,28 +99,28 @@ struct ptlrpc_client { struct ptlrpc_request { int rq_type; /* one of PTL_RPC_REQUEST, PTL_RPC_REPLY, PTL_RPC_BULK */ spinlock_t rq_lock; - struct list_head rq_list; - struct obd_device *rq_obd; - int rq_status; + struct list_head rq_list; + struct obd_device *rq_obd; + int rq_status; int rq_flags; __u32 rq_connid; __u32 rq_xid; - char *rq_reqbuf; - int rq_reqlen; - struct ptlreq_hdr *rq_reqhdr; - union ptl_req rq_req; + char *rq_reqbuf; + int rq_reqlen; + struct ptlreq_hdr *rq_reqhdr; + union ptl_req rq_req; - char *rq_repbuf; - int rq_replen; - struct ptlrep_hdr *rq_rephdr; - union ptl_rep rq_rep; + char *rq_repbuf; + int rq_replen; + struct ptlrep_hdr *rq_rephdr; + union ptl_rep rq_rep; char *rq_bulkbuf; int rq_bulklen; void * rq_reply_handle; - wait_queue_head_t rq_wait_for_rep; + wait_queue_head_t rq_wait_for_rep; /* incoming reply */ ptl_md_t rq_reply_md; @@ -148,7 +148,7 @@ struct ptlrpc_bulk_desc { struct obd_conn b_conn; __u32 b_xid; - wait_queue_head_t b_waitq; + wait_queue_head_t b_waitq; ptl_md_t b_md; ptl_handle_md_t b_md_h; @@ -159,17 +159,13 @@ struct ptlrpc_service { /* incoming request buffers */ /* FIXME: perhaps a list of EQs, if multiple NIs are used? */ char *srv_buf[RPC_RING_LENGTH]; + __u32 srv_ref_count[RPC_RING_LENGTH]; + ptl_handle_me_t srv_me_h[RPC_RING_LENGTH]; __u32 srv_buf_size; - __u32 srv_me_tail; - __u32 srv_md_active; __u32 srv_ring_length; __u32 srv_req_portal; __u32 srv_rep_portal; - __u32 srv_ref_count[RPC_RING_LENGTH]; - ptl_handle_me_t srv_me_h[RPC_RING_LENGTH]; - ptl_process_id_t srv_id; - ptl_md_t srv_md[RPC_RING_LENGTH]; - ptl_handle_md_t srv_md_h[RPC_RING_LENGTH]; + __u32 srv_xid; /* event queue */ @@ -177,13 +173,14 @@ struct ptlrpc_service { __u32 srv_flags; struct lustre_peer srv_self; - struct task_struct *srv_thread; - wait_queue_head_t srv_waitq; - wait_queue_head_t srv_ctl_waitq; - int ost_flags; + ptl_process_id_t srv_id; + + struct task_struct *srv_thread; + wait_queue_head_t srv_waitq; + wait_queue_head_t srv_ctl_waitq; - spinlock_t srv_lock; - struct list_head srv_reqs; + spinlock_t srv_lock; + struct list_head srv_reqs; ptl_event_t srv_ev; req_unpack_t srv_req_unpack; rep_pack_t srv_rep_pack; @@ -208,6 +205,7 @@ int ptlrpc_reply(struct obd_device *obddev, struct ptlrpc_service *svc, int ptlrpc_error(struct obd_device *obddev, struct ptlrpc_service *svc, struct ptlrpc_request *req); int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer); +void ptlrpc_link_svc_me(struct ptlrpc_service *service, int i); /* rpc/client.c */ int ptlrpc_connect_client(int dev, char *uuid, int req_portal, int rep_portal, diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 92f4216..c589d2d 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -505,8 +505,10 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf) mds->mds_service = ptlrpc_init_svc(64 * 1024, MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, "self", mds_handle); - - rpc_register_service(mds->mds_service, "self"); + if (!mds->mds_service) { + CERROR("failed to start service\n"); + RETURN(-EINVAL); + } err = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds"); if (err) diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index b8b3bdf..6d818e0 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -682,12 +682,11 @@ static int ost_setup(struct obd_device *obddev, obd_count len, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL, "self", ost_handle); if (!ost->ost_service) { + CERROR("failed to start service\n"); obd_disconnect(&ost->ost_conn); RETURN(-EINVAL); } - rpc_register_service(ost->ost_service, "self"); - err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost"); if (err) { obd_disconnect(&ost->ost_conn); diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c index d07acd7..fc97e3a 100644 --- a/lustre/ptlrpc/events.c +++ b/lustre/ptlrpc/events.c @@ -79,63 +79,43 @@ static int rcvd_reply_callback(ptl_event_t *ev, void *data) int server_request_callback(ptl_event_t *ev, void *data) { struct ptlrpc_service *service = data; - int rc; + int index; if (ev->rlength != ev->mlength) CERROR("Warning: Possibly truncated rpc (%d/%d)\n", ev->mlength, ev->rlength); - /* The ME is unlinked when there is less than 1024 bytes free - * on its MD. This ensures we are always able to handle the rpc, - * although the 1024 value is a guess as to the size of a - * large rpc (the known safe margin should be determined). - * - * NOTE: The portals API by default unlinks all MD's associated - * with an ME when it's unlinked. For now, this behavior - * has been commented out of the portals library so the - * MD can be unlinked when its ref count drops to zero. - * A new MD and ME will then be created that use the same - * kmalloc()'ed memory and inserted at the ring tail. - */ - spin_lock(&service->srv_lock); - if ( ev->mem_desc.start != - service->srv_md[service->srv_md_active].start ) { + for (index = 0; index < service->srv_ring_length; index++) + if ( service->srv_buf[index] == ev->mem_desc.start) + break; + + if (index == service->srv_ring_length) LBUG(); - } - service->srv_ref_count[service->srv_md_active]++; - CDEBUG(D_INODE, "event offset %d buf size %d\n", - ev->offset, service->srv_buf_size); - if (ev->offset >= (service->srv_buf_size - 1024)) { - CDEBUG(D_INODE, "Unlinking ME %d\n", service->srv_md_active); + service->srv_ref_count[index]++; - rc = PtlMEUnlink(service->srv_me_h[service->srv_md_active]); - service->srv_me_h[service->srv_md_active] = 0; + if (ev->unlinked_me != -1) { + int idx; - if (rc != PTL_OK) { - CERROR("PtlMEUnlink failed - DROPPING soon: %d\n", rc); + for (idx = 0; idx < service->srv_ring_length; idx++) + if (service->srv_me_h[idx] == ev->unlinked_me) + break; + if (idx == service->srv_ring_length) LBUG(); - spin_unlock(&service->srv_lock); - return rc; - } - service->srv_md_active = (service->srv_md_active + 1) % - service->srv_ring_length; + CDEBUG(D_NET, "unlinked %d\n", idx); + service->srv_me_h[idx] = 0; - if (service->srv_me_h[service->srv_md_active] == 0) { - CERROR("All %d ring ME's are unlinked!\n", - service->srv_ring_length); - LBUG(); - } + if (service->srv_ref_count[idx] == 0) + ptlrpc_link_svc_me(service, idx); } spin_unlock(&service->srv_lock); - if (ev->type == PTL_EVENT_PUT) { + if (ev->type == PTL_EVENT_PUT) wake_up(&service->srv_waitq); - } else { + else CERROR("Unexpected event type: %d\n", ev->type); - } return 0; } @@ -180,9 +160,8 @@ static int bulk_sink_callback(ptl_event_t *ev, void *data) } /* FIXME: This should happen unconditionally */ - if (bulk->b_cb != NULL) { + if (bulk->b_cb != NULL) OBD_FREE(bulk, sizeof(*bulk)); - } EXIT; return 1; diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 10dd72d..9738531 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -158,8 +158,9 @@ int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *bulk) ENTRY; - rc = PtlMEPrepend(bulk->b_peer.peer_ni, bulk->b_portal, local_id, - bulk->b_xid, 0, PTL_UNLINK, &bulk->b_me_h); + rc = PtlMEAttach(bulk->b_peer.peer_ni, bulk->b_portal, local_id, + bulk->b_xid, 0, PTL_UNLINK, PTL_INS_AFTER, + &bulk->b_me_h); if (rc != PTL_OK) { CERROR("PtlMEAttach failed: %d\n", rc); LBUG(); @@ -293,8 +294,8 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) local_id.pid = PTL_ID_ANY; //CERROR("sending req %d\n", request->rq_xid); - rc = PtlMEPrepend(peer->peer_ni, request->rq_reply_portal, local_id, - request->rq_xid, 0, PTL_UNLINK, + rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id, + request->rq_xid, 0, PTL_UNLINK, PTL_INS_AFTER, &request->rq_reply_me_h); if (rc != PTL_OK) { CERROR("PtlMEAttach failed: %d\n", rc); @@ -334,21 +335,55 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) return rc; } +void ptlrpc_link_svc_me(struct ptlrpc_service *service, int i) +{ + int rc; + ptl_md_t dummy; + ptl_handle_md_t md_h; + + /* Attach the leading ME on which we build the ring */ + rc = PtlMEAttach(service->srv_self.peer_ni, service->srv_req_portal, + service->srv_id, 0, ~0, PTL_RETAIN, PTL_INS_BEFORE, + &(service->srv_me_h[i])); + if (rc != PTL_OK) { + CERROR("PtlMEAttach failed: %d\n", rc); + LBUG(); + } + + if (service->srv_ref_count[i]) + LBUG(); + + dummy.start = service->srv_buf[i]; + dummy.length = service->srv_buf_size; + dummy.max_offset = service->srv_buf_size; + dummy.threshold = PTL_MD_THRESH_INF; + dummy.options = PTL_MD_OP_PUT | PTL_MD_AUTO_UNLINK; + dummy.user_ptr = service; + dummy.eventq = service->srv_eq_h; + dummy.max_offset = service->srv_buf_size; + + rc = PtlMDAttach(service->srv_me_h[i], dummy, PTL_UNLINK, &md_h); + if (rc != PTL_OK) { + /* cleanup */ + CERROR("PtlMDAttach failed: %d\n", rc); + LBUG(); + } +} + /* ptl_handled_rpc() should be called by the sleeping process once * it finishes processing an event. This ensures the ref count is * decremented and that the rpc ring buffer cycles properly. */ int ptl_handled_rpc(struct ptlrpc_service *service, void *start) { - int rc, index = 0; + int index; spin_lock(&service->srv_lock); - while (index < service->srv_ring_length) { - if ( service->srv_md[index].start == start) + for (index = 0; index < service->srv_ring_length; index++) + if (service->srv_buf[index] == start) break; - index++; - } + if (index == service->srv_ring_length) LBUG(); @@ -360,44 +395,10 @@ int ptl_handled_rpc(struct ptlrpc_service *service, void *start) LBUG(); if (service->srv_ref_count[index] == 0 && - service->srv_me_h[index] == 0) { - - /* Replace the unlinked ME and MD */ - rc = PtlMEInsert(service->srv_me_h[service->srv_me_tail], - service->srv_id, 0, ~0, PTL_RETAIN, - PTL_INS_AFTER, &(service->srv_me_h[index])); - if (rc != PTL_OK) { - CERROR("PtlMEInsert failed: %d\n", rc); - LBUG(); - spin_unlock(&service->srv_lock); - return rc; - } - CDEBUG(D_NET, "Inserting new ME and MD in ring, rc %d\n", rc); - - service->srv_me_tail = index; - - service->srv_md[index].start = service->srv_buf[index]; - service->srv_md[index].length = service->srv_buf_size; - service->srv_md[index].threshold = PTL_MD_THRESH_INF; - service->srv_md[index].options = PTL_MD_OP_PUT; - service->srv_md[index].user_ptr = service; - service->srv_md[index].eventq = service->srv_eq_h; - - rc = PtlMDAttach(service->srv_me_h[index], - service->srv_md[index], - PTL_RETAIN, &(service->srv_md_h[index])); - //CERROR("MDAttach (request MDs): %Lu\n", - //(__u64)(service->srv_md_h[index])); - - CDEBUG(D_INFO, "Attach MD in ring, rc %d\n", rc); - if (rc != PTL_OK) { - /* XXX cleanup */ - CERROR("PtlMDAttach failed: %d\n", rc); - LBUG(); - spin_unlock(&service->srv_lock); - return rc; - } - } + service->srv_me_h[index] == 0) { + CDEBUG(D_NET, "relinking %d\n", index); + ptlrpc_link_svc_me(service, index); + } spin_unlock(&service->srv_lock); return 0; diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index 83538e0..1612597 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -98,36 +98,67 @@ ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid, svc_handler_t handler) { int err; - struct ptlrpc_service *svc; + int rc, i; + struct ptlrpc_service *service; - OBD_ALLOC(svc, sizeof(*svc)); - if ( !svc ) { + OBD_ALLOC(service, sizeof(*service)); + if ( !service ) { CERROR("no memory\n"); + LBUG(); return NULL; } - memset(svc, 0, sizeof(*svc)); + memset(service, 0, sizeof(*service)); - spin_lock_init(&svc->srv_lock); - INIT_LIST_HEAD(&svc->srv_reqs); - init_waitqueue_head(&svc->srv_ctl_waitq); - init_waitqueue_head(&svc->srv_waitq); + spin_lock_init(&service->srv_lock); + INIT_LIST_HEAD(&service->srv_reqs); + init_waitqueue_head(&service->srv_ctl_waitq); + init_waitqueue_head(&service->srv_waitq); - svc->srv_thread = NULL; - svc->srv_flags = 0; + service->srv_thread = NULL; + service->srv_flags = 0; - svc->srv_buf_size = bufsize; - svc->srv_rep_portal = rep_portal; - svc->srv_req_portal = req_portal; + service->srv_buf_size = bufsize; + service->srv_rep_portal = rep_portal; + service->srv_req_portal = req_portal; + service->srv_handler = handler; - svc->srv_handler = handler; - err = kportal_uuid_to_peer(uuid, &svc->srv_self); + err = kportal_uuid_to_peer(uuid, &service->srv_self); if (err) { CERROR("cannot get peer for uuid %s", uuid); - OBD_FREE(svc, sizeof(*svc)); + OBD_FREE(service, sizeof(*service)); return NULL; } - return svc; + + service->srv_ring_length = RPC_RING_LENGTH; + service->srv_id.nid = PTL_ID_ANY; + service->srv_id.pid = PTL_ID_ANY; + + rc = PtlEQAlloc(service->srv_self.peer_ni, 128, + server_request_callback, + service, &(service->srv_eq_h)); + + if (rc != PTL_OK) { + CERROR("PtlEQAlloc failed: %d\n", rc); + LBUG(); + return NULL; + } + + for (i = 0; i < service->srv_ring_length; i++) { + OBD_ALLOC(service->srv_buf[i], service->srv_buf_size); + if (service->srv_buf[i] == NULL) { + CERROR("no memory\n"); + LBUG(); + return NULL; + } + service->srv_ref_count[i] = 0; + ptlrpc_link_svc_me(service, i); + } + + CDEBUG(D_NET, "Starting service listening on portal %d\n", + service->srv_req_portal); + + return service; } static int ptlrpc_main(void *arg) @@ -261,97 +292,18 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc, } -int rpc_register_service(struct ptlrpc_service *service, char *uuid) -{ - struct lustre_peer peer; - int rc, i; - - rc = kportal_uuid_to_peer(uuid, &peer); - if (rc != 0) { - CERROR("Invalid uuid \"%s\"\n", uuid); - return -EINVAL; - } - - service->srv_ring_length = RPC_RING_LENGTH; - service->srv_md_active = 0; - - service->srv_id.nid = PTL_ID_ANY; - service->srv_id.pid = PTL_ID_ANY; - - rc = PtlEQAlloc(peer.peer_ni, 128, server_request_callback, - service, &(service->srv_eq_h)); - - if (rc != PTL_OK) { - CERROR("PtlEQAlloc failed: %d\n", rc); - return rc; - } - - CDEBUG(D_NET, "Starting service listening on portal %d\n", - service->srv_req_portal); - - /* Attach the leading ME on which we build the ring */ - rc = PtlMEAttach(peer.peer_ni, service->srv_req_portal, - service->srv_id, 0, ~0, PTL_RETAIN, - &(service->srv_me_h[0])); - - if (rc != PTL_OK) { - CERROR("PtlMEAttach failed: %d\n", rc); - return rc; - } - - for (i = 0; i < service->srv_ring_length; i++) { - OBD_ALLOC(service->srv_buf[i], service->srv_buf_size); - - if (service->srv_buf[i] == NULL) { - CERROR("no memory\n"); - return -ENOMEM; - } - - /* Insert additional ME's to the ring */ - if (i > 0) { - rc = PtlMEInsert(service->srv_me_h[i - 1], - service->srv_id, 0, ~0, PTL_RETAIN, - PTL_INS_AFTER, - &(service->srv_me_h[i])); - service->srv_me_tail = i; - - if (rc != PTL_OK) { - CERROR("PtlMEInsert failed: %d\n", rc); - return rc; - } - } - - service->srv_ref_count[i] = 0; - service->srv_md[i].start = service->srv_buf[i]; - service->srv_md[i].length = service->srv_buf_size; - service->srv_md[i].max_offset = service->srv_buf_size; - service->srv_md[i].threshold = PTL_MD_THRESH_INF; - service->srv_md[i].options = PTL_MD_OP_PUT; - service->srv_md[i].user_ptr = service; - service->srv_md[i].eventq = service->srv_eq_h; - service->srv_md[i].max_offset = service->srv_buf_size; - - rc = PtlMDAttach(service->srv_me_h[i], service->srv_md[i], - PTL_RETAIN, &(service->srv_md_h[i])); - - if (rc != PTL_OK) { - /* cleanup */ - CERROR("PtlMDAttach failed: %d\n", rc); - return rc; - } - } - - return 0; -} int rpc_unregister_service(struct ptlrpc_service *service) { int rc, i; for (i = 0; i < service->srv_ring_length; i++) { - rc = PtlMEUnlink(service->srv_me_h[i]); - if (rc) - CERROR("PtlMEUnlink failed: %d\n", rc); + if (service->srv_me_h[i]) { + rc = PtlMEUnlink(service->srv_me_h[i]); + if (rc) + CERROR("PtlMEUnlink failed: %d\n", rc); + service->srv_me_h[i] = 0; + } if (service->srv_buf[i] != NULL) OBD_FREE(service->srv_buf[i], service->srv_buf_size); -- 1.8.3.1