From: braam Date: Mon, 18 Mar 2002 15:00:07 +0000 (+0000) Subject: - add a figure X-Git-Tag: 0.4.2~519 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=215c76813ea378e4937aae905516364ddcbdb9b9;p=fs%2Flustre-release.git - add a figure - fix the problem that the refcount is reduced on the wrong md in the ring buffer --- diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h index 609f2e1..f30f0b8 100644 --- a/lustre/include/linux/lustre_net.h +++ b/lustre/include/linux/lustre_net.h @@ -97,6 +97,7 @@ struct ptlrpc_client { struct ptlrpc_request { int rq_type; /* one of PTL_RPC_REQUEST, PTL_RPC_REPLY, PTL_RPC_BULK */ + spinlock_t rq_lock; struct list_head rq_list; struct obd_device *rq_obd; int rq_status; @@ -207,7 +208,6 @@ int ptlrpc_reply(struct obd_device *obddev, struct ptlrpc_service *svc, int ptlrpc_error(struct obd_device *obddev, struct ptlrpc_service *svc, struct ptlrpc_request *req); int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer); -int ptl_received_rpc(struct ptlrpc_service *service); /* rpc/client.c */ int ptlrpc_connect_client(int dev, char *uuid, int req_portal, int rep_portal, diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 4755dda..4393f45 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -60,7 +60,9 @@ int ptlrpc_enqueue(struct ptlrpc_client *peer, struct ptlrpc_request *req) /* remember where it came from */ srv_req->rq_reply_handle = req; + spin_lock(&peer->cli_lock); list_add(&srv_req->rq_list, &peer->cli_obd->obd_req_list); + spin_unlock(&peer->cli_lock); wake_up(&peer->cli_obd->obd_req_waitq); return 0; } @@ -136,6 +138,7 @@ struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, } memset(request, 0, sizeof(*request)); + spin_lock_init(&request->rq_lock); spin_lock(&cl->cli_lock); request->rq_xid = cl->cli_xid++; @@ -167,10 +170,14 @@ void ptlrpc_free_req(struct ptlrpc_request *request) static int ptlrpc_check_reply(struct ptlrpc_request *req) { + int rc = 0; + + spin_lock(&req->rq_lock); if (req->rq_repbuf != NULL) { req->rq_flags = PTL_RPC_REPLY; EXIT; - return 1; + rc = 1; + goto out; } if (sigismember(&(current->pending.signal), SIGKILL) || @@ -178,10 +185,13 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req) sigismember(&(current->pending.signal), SIGINT)) { req->rq_flags = PTL_RPC_INTR; EXIT; - return 1; + rc = 1; + goto out; } - return 0; + out: + spin_unlock(&req->rq_lock); + return rc; } int ptlrpc_check_status(struct ptlrpc_request *req, int err) @@ -223,19 +233,22 @@ int ptlrpc_abort(struct ptlrpc_request *request) { /* First remove the MD for the reply; in theory, this means * that we can tear down the buffer safely. */ + spin_lock(&request->rq_lock); PtlMEUnlink(request->rq_reply_me_h); PtlMDUnlink(request->rq_reply_md_h); OBD_FREE(request->rq_repbuf, request->rq_replen); request->rq_repbuf = NULL; request->rq_replen = 0; + spin_unlock(&request->rq_lock); return 0; } + int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req) { - int rc; + int rc = 0; ENTRY; init_waitqueue_head(&req->rq_wait_for_rep); @@ -258,12 +271,13 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req) CDEBUG(D_OTHER, "-- sleeping\n"); wait_event_interruptible(req->rq_wait_for_rep, ptlrpc_check_reply(req)); CDEBUG(D_OTHER, "-- done\n"); - + spin_lock(&req->rq_lock); if (req->rq_flags == PTL_RPC_INTR) { /* Clean up the dangling reply buffers */ ptlrpc_abort(req); EXIT; - return -EINTR; + rc = -EINTR; + goto out; } if (req->rq_flags != PTL_RPC_REPLY) { @@ -272,14 +286,15 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req) ptlrpc_abort(req); //BUG(); EXIT; - return -EINTR; + rc = -EINTR; + goto out; } rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, &req->rq_rep); if (rc) { CERROR("unpack_rep failed: %d\n", rc); - return rc; + goto out; } CDEBUG(D_NET, "got rep %d\n", req->rq_rephdr->xid); @@ -288,5 +303,7 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req) req->rq_replen, req->rq_rephdr->status); EXIT; - return 0; + out: + spin_unlock(&req->rq_lock); + return rc; } diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index e734aac..e941587 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -339,17 +339,28 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) * it finishes processing an event. This ensures the ref count is * decremented and that the rpc ring buffer cycles properly. */ -int ptl_received_rpc(struct ptlrpc_service *service) +int ptl_handled_rpc(struct ptlrpc_service *service, void *start) { - int rc, index; + int rc, index = 0; spin_lock(&service->srv_lock); - index = service->srv_md_active; + /* XXX this is wrong must find index on which request arrived!!!*/ + while (index < service->srv_ring_length) { + if ( service->srv_md[index].start == start) + break; + index++; + } + if (index == service->srv_ring_length) + BUG(); + CDEBUG(D_INFO, "MD index=%d Ref Count=%d\n", index, service->srv_ref_count[index]); service->srv_ref_count[index]--; - if ((service->srv_ref_count[index] <= 0) && + if (service->srv_ref_count[index] < 0) + BUG(); + + if ((service->srv_ref_count[index] == 0) && (service->srv_me_h[index] == 0)) { /* Replace the unlinked ME and MD */ diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index f59b267..652da36 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -33,6 +33,7 @@ #include extern int server_request_callback(ptl_event_t *ev, void *data); +extern int ptl_handled_rpc(struct ptlrpc_service *service, void *start); static int ptlrpc_check_event(struct ptlrpc_service *svc) { @@ -174,11 +175,13 @@ static int ptlrpc_main(void *arg) if (svc->srv_flags & SVC_EVENT) { struct ptlrpc_request request; + void *start; svc->srv_flags = SVC_RUNNING; /* FIXME: If we move to an event-driven model, * we should put the request on the stack of * mds_handle instead. */ + start = svc->srv_ev.mem_desc.start; memset(&request, 0, sizeof(request)); request.rq_obd = obddev; request.rq_reqbuf = (svc->srv_ev.mem_desc.start + @@ -195,7 +198,7 @@ static int ptlrpc_main(void *arg) spin_unlock(&svc->srv_lock); rc = svc->srv_handler(obddev, svc, &request); - ptl_received_rpc(svc); + ptl_handled_rpc(svc, start); continue; }