From 32a56b4b86bf4c55b3cd0c98c91a3d4ba21c7954 Mon Sep 17 00:00:00 2001 From: adilger Date: Wed, 24 Jul 2002 20:05:29 +0000 Subject: [PATCH] Fix RPC request leak and potential refcounting problems on bulk descriptor. --- lustre/osc/osc_request.c | 124 ++++++++++++++++++++++++++++++----------------- 1 file changed, 79 insertions(+), 45 deletions(-) diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 8594c94..e7e6d54 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -501,23 +501,24 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md, desc = ptlrpc_prep_bulk(connection); if (!desc) - GOTO(out_free, rc = -ENOMEM); + GOTO(out_req, rc = -ENOMEM); desc->b_portal = OST_BULK_PORTAL; desc->b_cb = brw_finish; OBD_ALLOC(cb_data, sizeof(*cb_data)); if (!cb_data) - GOTO(out_free, rc = -ENOMEM); + GOTO(out_desc, rc = -ENOMEM); + cb_data->callback = callback; cb_data->cb_data = data; desc->b_cb_data = cb_data; - /* end almost identical to brw_write case */ iooptr = lustre_msg_buf(request->rq_reqmsg, 1); nioptr = lustre_msg_buf(request->rq_reqmsg, 2); ost_pack_ioo(&iooptr, md, page_count); + /* end almost identical to brw_write case */ + for (mapped = 0; mapped < page_count; mapped++) { - struct ptlrpc_bulk_page *bulk; - bulk = ptlrpc_prep_bulk_page(desc); + struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc); if (bulk == NULL) GOTO(out_unmap, rc = -ENOMEM); @@ -536,41 +537,64 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md, * Register the bulk first, because the reply could arrive out of order, * and we want to be ready for the bulk data. * - * One reference is released by the bulk callback, the other when - * we finish sleeping on it (if we don't have a callback). + * One reference is released when brw_finish is complete, the + * other here when we finish waiting on it if we don't have a callback. + * + * We don't reference the bulk descriptor again here if there is a + * callback, so we don't need an additional refcount on it. + * + * On error, we never do the brw_finish, so we handle all decrefs. */ - atomic_set(&desc->b_refcount, callback ? 1 : 2); + if (!callback) + ptlrpc_bulk_addref(desc); rc = ptlrpc_register_bulk(desc); if (rc) - GOTO(out_unmap, rc); + GOTO(out_desc2, rc); request->rq_replen = lustre_msg_size(1, size); rc = ptlrpc_queue_wait(request); rc = ptlrpc_check_status(request, rc); - if (rc) { - ptlrpc_bulk_decref(desc); - GOTO(out_unmap, rc); - } + + /* XXX: Mike, this is the only place I'm not sure of. If we had + * an error here, will we always call brw_finish? If yes, then + * out_desc_2 will do too much and we should jump to out_desc. + * If maybe, then we are screwed, and we need to set things up + * so that bulk_sink_callback is called for each bulk page, + * even on error so brw_finish is always called. It would need + * to be passed an error code as a parameter to know what to do. + * + * That would also help with the partial completion case, so + * we could say in brw_finish "these pages are done, don't + * restart them" and osc_brw callers can know this. + */ + if (rc) + GOTO(out_desc2, rc); /* Callbacks cause asynchronous handling. */ if (callback) - RETURN(0); + GOTO(out_req, rc = 0); + /* If there's no callback function, sleep here until complete. */ l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_received(desc)); - rc = desc->b_flags & PTL_RPC_FL_INTR ? -EINTR : 0; + if (desc->b_flags & PTL_RPC_FL_INTR) + GOTO(out_desc, rc = -EINTR); + + EXIT; +out_desc: ptlrpc_bulk_decref(desc); +out_req: + ptlrpc_req_finished(request); RETURN(rc); /* Clean up on error. */ - out_unmap: +out_desc2: + if (!callback) + ptlrpc_bulk_decref(desc); +out_unmap: while (mapped-- > 0) kunmap(page_array[mapped]); - out_free: - if (cb_data) - OBD_FREE(cb_data, sizeof(*cb_data)); - ptlrpc_free_bulk(desc); - ptlrpc_free_req(request); - return rc; + OBD_FREE(cb_data, sizeof(*cb_data)); + goto out_desc; } static int osc_brw_write(struct lustre_handle *conn, @@ -605,18 +629,15 @@ static int osc_brw_write(struct lustre_handle *conn, body = lustre_msg_buf(request->rq_reqmsg, 0); body->data = OBD_BRW_WRITE; - OBD_ALLOC(local, page_count * sizeof(*local)); - if (!local) - GOTO(out_free, rc = -ENOMEM); - desc = ptlrpc_prep_bulk(connection); if (!desc) - GOTO(out_free, rc = -ENOMEM); + GOTO(out_req, rc = -ENOMEM); desc->b_portal = OSC_BULK_PORTAL; desc->b_cb = brw_finish; OBD_ALLOC(cb_data, sizeof(*cb_data)); if (!cb_data) - GOTO(out_free, rc = -ENOMEM); + GOTO(out_desc, rc = -ENOMEM); + cb_data->callback = callback; cb_data->cb_data = data; desc->b_cb_data = cb_data; @@ -626,6 +647,10 @@ static int osc_brw_write(struct lustre_handle *conn, ost_pack_ioo(&iooptr, md, page_count); /* end almost identical to brw_read case */ + OBD_ALLOC(local, page_count * sizeof(*local)); + if (!local) + GOTO(out_cb, rc = -ENOMEM); + cb_data->obd_data = local; cb_data->obd_size = page_count * sizeof(*local); @@ -672,39 +697,48 @@ static int osc_brw_write(struct lustre_handle *conn, LBUG(); /* - * One is released when the bulk is complete, the other when we finish - * waiting on it. (Callback cases don't sleep, so only one ref for - * them.) + * One reference is released when brw_finish is complete, the + * other here when we finish waiting on it if we don't have a callback. + * + * We don't reference the bulk descriptor again here if there is a + * callback, so we don't need an additional refcount on it. */ - atomic_set(&desc->b_refcount, callback ? 1 : 2); - CDEBUG(D_PAGE, "Set refcount of %p to %d\n", desc, - atomic_read(&desc->b_refcount)); + if (!callback) + ptlrpc_bulk_addref(desc); rc = ptlrpc_send_bulk(desc); + + /* XXX: Mike, same question as in osc_brw_read. */ if (rc) - GOTO(out_unmap, rc); + GOTO(out_desc2, rc); /* Callbacks cause asynchronous handling. */ if (callback) - RETURN(0); + GOTO(out_req, rc = 0); /* If there's no callback function, sleep here until complete. */ l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_sent(desc)); - ptlrpc_bulk_decref(desc); if (desc->b_flags & PTL_RPC_FL_INTR) - RETURN(-EINTR); - RETURN(0); + GOTO(out_desc, rc = -EINTR); + + EXIT; +out_desc: + ptlrpc_bulk_decref(desc); +out_req: + ptlrpc_req_finished(request); + return rc; /* Clean up on error. */ - out_unmap: +out_desc2: + if (!callback) + ptlrpc_bulk_decref(desc); +out_unmap: while (mapped-- > 0) kunmap(pagearray[mapped]); - out_free: - OBD_FREE(cb_data, sizeof(*cb_data)); OBD_FREE(local, page_count * sizeof(*local)); - ptlrpc_free_bulk(desc); - ptlrpc_req_finished(request); - return rc; +out_cb: + OBD_FREE(cb_data, sizeof(*cb_data)); + goto out_desc; } static int osc_brw(int cmd, struct lustre_handle *conn, -- 1.8.3.1