From 9140bd3dae4ac7e447da4e7ab56e9d49c9fd29c3 Mon Sep 17 00:00:00 2001
From: shaver
Date: Sat, 13 Jul 2002 23:38:45 +0000
Subject: [PATCH] - Use refcounting to control lifetime of bulk descriptors.

- Add some LDLM timeout #warnings for future reference.
- Make all bulk-desc cleanup happen asynchronously, for cleaner interrupt
  (and later timeout) handling.
- Have OSC wait for brw_write reply before beginning bulk operations, for
  symmetry with brw_read and general niceness.
- Fix reply-for-freed-request check to use proper XID width.
- Use l_killable_pending in ptlrpc_check_bulk_sent, for consistency.
  (Also in the new ptlrpc_check_bulk_received sibling.)
---
 lustre/doc/.cvsignore             |   1 +
 lustre/include/linux/lustre_net.h |  14 +++
 lustre/ldlm/ldlm_request.c        |   1 +
 lustre/osc/osc_request.c          | 235 ++++++++++++++++++++++----------------
 lustre/ptlrpc/client.c            |   3 +-
 lustre/ptlrpc/events.c            |   2 +-
 lustre/ptlrpc/niobuf.c            |  20 +++-
 lustre/ptlrpc/rpc.c               |   1 +
 8 files changed, 172 insertions(+), 105 deletions(-)

diff --git a/lustre/doc/.cvsignore b/lustre/doc/.cvsignore
index 249de7d..43841ad 100644
--- a/lustre/doc/.cvsignore
+++ b/lustre/doc/.cvsignore
@@ -12,3 +12,4 @@ OBD-HOWTO.txt
 lustre-HOWTO.txt
 *.eps
 master.pdf
+lustre.lyx
diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h
index dc07883..65dd815 100644
--- a/lustre/include/linux/lustre_net.h
+++ b/lustre/include/linux/lustre_net.h
@@ -162,6 +162,7 @@ struct ptlrpc_bulk_desc {
         struct list_head b_page_list;
         __u32 b_page_count;
         atomic_t b_pages_remaining;
+        atomic_t b_refcount;
         void *b_desc_private;
 };
 
@@ -226,6 +227,7 @@ void ptlrpc_cleanup_connection(void);
 
 /* rpc/niobuf.c */
 int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk);
+int ptlrpc_check_bulk_received(struct ptlrpc_bulk_desc *bulk);
 int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *);
 int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *);
 int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk);
@@ -282,4 +284,16 @@ int lustre_pack_msg(int count, int *lens, char **bufs, int *len,
 int lustre_msg_size(int count, int *lengths);
 int lustre_unpack_msg(struct lustre_msg *m, int len);
 void *lustre_msg_buf(struct lustre_msg *m, int n);
+
+static inline void ptlrpc_bulk_decref(struct ptlrpc_bulk_desc *desc)
+{
+        if (atomic_dec_and_test(&desc->b_refcount)) {
+                CDEBUG(D_PAGE, "Released last ref on %p, freeing\n", desc);
+                ptlrpc_free_bulk(desc);
+        } else {
+                CDEBUG(D_PAGE, "%p -> %d\n", desc,
+                       atomic_read(&desc->b_refcount));
+        }
+}
+
 #endif
diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c
index f632587..eddf484 100644
--- a/lustre/ldlm/ldlm_request.c
+++ b/lustre/ldlm/ldlm_request.c
@@ -134,6 +134,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
                 LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock,"
                            " sleeping");
                 ldlm_lock_dump(lock);
+#warning ldlm needs to time out
                 wait_event(lock->l_waitq,
                            lock->l_req_mode == lock->l_granted_mode);
                 LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index c8a7e39..70d40c2 100644
--- a/lustre/osc/osc_request.c
+++ b/lustre/osc/osc_request.c
@@ -1,7 +1,7 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * Copryright (C) 2001, 2002 Cluster File Systems, Inc.
+ * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
  *
  * This code is issued under the GNU General Public License.
  * See the file COPYING in this distribution
@@ -395,24 +395,32 @@ static int osc_destroy(struct lustre_handle *conn, struct obdo *oa)
 
 struct osc_brw_cb_data {
         struct page **buf;
-        struct ptlrpc_request *req;
         bulk_callback_t callback;
         void *cb_data;
+        void *obd_data;
+        size_t obd_size;
 };
 
-static void brw_read_finish(struct ptlrpc_bulk_desc *desc, void *data)
+static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
 {
         struct osc_brw_cb_data *cb_data = data;
+        int i;
+        ENTRY;
 
         if (desc->b_flags & PTL_RPC_FL_INTR)
                 CERROR("got signal\n");
 
-        (cb_data->callback)(desc, cb_data->cb_data);
+        for (i = 0; i < desc->b_page_count; i++)
+                kunmap(cb_data->buf[i]);
 
-        ptlrpc_free_bulk(desc);
-        ptlrpc_free_req(cb_data->req);
+        if (cb_data->callback)
+                (cb_data->callback)(desc, cb_data->cb_data);
 
+        ptlrpc_bulk_decref(desc);
+        if (cb_data->obd_data)
+                OBD_FREE(cb_data->obd_data, cb_data->obd_size);
         OBD_FREE(cb_data, sizeof(*cb_data));
+        EXIT;
 }
 
@@ -422,15 +430,17 @@ static int osc_brw_read(struct lustre_handle *conn, obd_count num_oa,
 {
         struct ptlrpc_client *cl;
         struct ptlrpc_connection *connection;
+        struct ptlrpc_request *request = NULL;
+        struct ptlrpc_bulk_desc *desc = NULL;
         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
-        struct ptlrpc_request *request;
         struct ost_body *body;
-        struct list_head *tmp;
-        int pages, rc, i, j, size[3] = {sizeof(*body)};
-        void *ptr1, *ptr2;
-        struct ptlrpc_bulk_desc *desc;
+        struct osc_brw_cb_data *cb_data = NULL;
+        long pages;
+        int rc, i, j, size[3] = {sizeof(*body)};
+        void *iooptr, *nioptr;
         ENTRY;
 
+        /* XXX almost identical to brw_write case */
         size[1] = num_oa * sizeof(struct obd_ioobj);
         pages = 0;
         for (i = 0; i < num_oa; i++)
@@ -439,28 +449,36 @@ static int osc_brw_read(struct lustre_handle *conn, obd_count num_oa,
 
         osc_con2cl(conn, &cl, &connection);
         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
-                                  OST_BRW, 3, size, NULL);
+                                   OST_BRW, 3, size, NULL);
         if (!request)
-                GOTO(out, rc = -ENOMEM);
+                RETURN(-ENOMEM);
 
         body = lustre_msg_buf(request->rq_reqmsg, 0);
         body->data = OBD_BRW_READ;
 
         desc = ptlrpc_prep_bulk(connection);
         if (!desc)
-                GOTO(out2, rc = -ENOMEM);
+                GOTO(out_free, rc = -ENOMEM);
         desc->b_portal = OST_BULK_PORTAL;
-
-        ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
-        ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
+        desc->b_cb = brw_finish;
+        OBD_ALLOC(cb_data, sizeof(*cb_data));
+        if (!cb_data)
+                GOTO(out_free, rc = -ENOMEM);
+        cb_data->buf = NULL;
+        cb_data->callback = callback;
+        desc->b_cb_data = cb_data;
+        /* XXX end almost identical to brw_write case */
+
+        iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
+        nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
         for (pages = 0, i = 0; i < num_oa; i++) {
-                ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
+                ost_pack_ioo(&iooptr, oa[i], oa_bufs[i]);
                 /* FIXME: this inner loop is wrong for multiple OAs */
                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
                         struct ptlrpc_bulk_page *bulk;
                         bulk = ptlrpc_prep_bulk_page(desc);
                         if (bulk == NULL)
-                                GOTO(out3, rc = -ENOMEM);
+                                GOTO(out_unmap, rc = -ENOMEM);
 
                         spin_lock(&connection->c_lock);
                         bulk->b_xid = ++connection->c_xid_out;
@@ -469,55 +487,52 @@ static int osc_brw_read(struct lustre_handle *conn, obd_count num_oa,
                         bulk->b_buf = kmap(buf[pages]);
                         bulk->b_page = buf[pages];
                         bulk->b_buflen = PAGE_SIZE;
-                        ost_pack_niobuf(&ptr2, offset[pages], count[pages],
+                        ost_pack_niobuf(&nioptr, offset[pages], count[pages],
                                         flags[pages], bulk->b_xid);
                 }
         }
 
+        /*
+         * Register the bulk first, because the reply could arrive out of order,
+         * and we want to be ready for the bulk data.
+         *
+         * One reference is released by the bulk callback, the other when
+         * we finish sleeping on it (if we don't have a callback).
+         */
+        atomic_set(&desc->b_refcount, callback ? 1 : 2);
         rc = ptlrpc_register_bulk(desc);
         if (rc)
-                GOTO(out3, rc);
+                GOTO(out_unmap, rc);
 
         request->rq_replen = lustre_msg_size(1, size);
         rc = ptlrpc_queue_wait(request);
         rc = ptlrpc_check_status(request, rc);
-        if (rc)
-                ptlrpc_abort_bulk(desc);
-        GOTO(out3, rc);
-
- out3:
-        list_for_each(tmp, &desc->b_page_list) {
-                struct ptlrpc_bulk_page *bulk;
-                bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
-                if (bulk->b_page != NULL)
-                        kunmap(bulk->b_page);
+        if (rc) {
+                ptlrpc_bulk_decref(desc);
+                GOTO(out_unmap, rc);
         }
-        ptlrpc_free_bulk(desc);
- out2:
-        ptlrpc_free_req(request);
- out:
-        return rc;
-}
 
-static void brw_write_finish(struct ptlrpc_bulk_desc *desc, void *data)
-{
-        struct osc_brw_cb_data *cb_data = data;
-        int i;
-        ENTRY;
+        /* Callbacks cause asynchronous handling. */
+        if (callback)
+                RETURN(0);
 
+        l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_received(desc));
+        ptlrpc_bulk_decref(desc);
         if (desc->b_flags & PTL_RPC_FL_INTR)
-                CERROR("got signal\n");
-
-        for (i = 0; i < desc->b_page_count; i++)
-                kunmap(cb_data->buf[i]);
-
-        (cb_data->callback)(desc, cb_data->cb_data);
-
+                RETURN(-EINTR);
+        RETURN(0);
+
+        /* Clean up on error. */
+ out_unmap:
+        for (pages = 0, i = 0; i < num_oa; i++)
+                for (j = 0; j < oa_bufs[i]; j++, pages++)
+                        kunmap(pagearray[pages]);
+ out_free:
+        if (cb_data)
+                OBD_FREE(cb_data, sizeof(*cb_data));
         ptlrpc_free_bulk(desc);
-        ptlrpc_free_req(cb_data->req);
-
-        OBD_FREE(cb_data, sizeof(*cb_data));
-        EXIT;
+        ptlrpc_free_req(request);
+        return rc;
 }
 
 static int osc_brw_write(struct lustre_handle *conn, obd_count num_oa,
@@ -528,89 +543,94 @@ static int osc_brw_write(struct lustre_handle *conn, obd_count num_oa,
 {
         struct ptlrpc_client *cl;
         struct ptlrpc_connection *connection;
-        struct ptlrpc_request *request;
-        struct ptlrpc_bulk_desc *desc;
+        struct ptlrpc_request *request = NULL;
+        struct ptlrpc_bulk_desc *desc = NULL;
         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
-        struct obd_ioobj ioo;
         struct ost_body *body;
-        struct niobuf_local *local;
+        struct niobuf_local *local = NULL;
         struct niobuf_remote *remote;
-        struct osc_brw_cb_data *cb_data;
+        struct osc_brw_cb_data *cb_data = NULL;
         long pages;
         int rc, i, j, size[3] = {sizeof(*body)};
-        void *ptr1, *ptr2;
+        void *iooptr, *nioptr;
         ENTRY;
 
-        size[1] = num_oa * sizeof(ioo);
+        /* XXX almost identical to brw_read case */
+        size[1] = num_oa * sizeof(struct obd_ioobj);
         pages = 0;
         for (i = 0; i < num_oa; i++)
                 pages += oa_bufs[i];
-        size[2] = pages * sizeof(*remote);
-
-        OBD_ALLOC(local, pages * sizeof(*local));
-        if (local == NULL)
-                RETURN(-ENOMEM);
+        size[2] = pages * sizeof(struct niobuf_remote);
 
         osc_con2cl(conn, &cl, &connection);
         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
-                                  OST_BRW, 3, size, NULL);
+                                   OST_BRW, 3, size, NULL);
         if (!request)
-                GOTO(out, rc = -ENOMEM);
+                RETURN(-ENOMEM);
+
         body = lustre_msg_buf(request->rq_reqmsg, 0);
         body->data = OBD_BRW_WRITE;
 
-        ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
-        ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
+        OBD_ALLOC(local, pages * sizeof(*local));
+        if (!local)
+                GOTO(out_free, rc = -ENOMEM);
+
+        desc = ptlrpc_prep_bulk(connection);
+        if (!desc)
+                GOTO(out_free, rc = -ENOMEM);
+        desc->b_portal = OSC_BULK_PORTAL;
+        desc->b_cb = brw_finish;
+        OBD_ALLOC(cb_data, sizeof(*cb_data));
+        if (!cb_data)
+                GOTO(out_free, rc = -ENOMEM);
+        cb_data->buf = pagearray;
+        cb_data->callback = callback;
+        desc->b_cb_data = cb_data;
+        /* XXX end almost identical to brw_read case */
+        cb_data->obd_data = local;
+        cb_data->obd_size = pages * sizeof(*local);
+
+        iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
+        nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
         for (pages = 0, i = 0; i < num_oa; i++) {
-                ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
+                ost_pack_ioo(&iooptr, oa[i], oa_bufs[i]);
                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
                         local[pages].addr = kmap(pagearray[pages]);
                         local[pages].offset = offset[pages];
                         local[pages].len = count[pages];
-                        ost_pack_niobuf(&ptr2, offset[pages], count[pages],
+                        ost_pack_niobuf(&nioptr, offset[pages], count[pages],
                                         flags[pages], 0);
                 }
         }
 
         size[1] = pages * sizeof(struct niobuf_remote);
         request->rq_replen = lustre_msg_size(2, size);
-
         rc = ptlrpc_queue_wait(request);
         rc = ptlrpc_check_status(request, rc);
         if (rc)
-                GOTO(out2, rc);
+                GOTO(out_unmap, rc);
 
-        ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
-        if (ptr2 == NULL)
-                GOTO(out2, rc = -EINVAL);
+        nioptr = lustre_msg_buf(request->rq_repmsg, 1);
+        if (!nioptr)
+                GOTO(out_unmap, rc = -EINVAL);
 
         if (request->rq_repmsg->buflens[1] !=
             pages * sizeof(struct niobuf_remote)) {
                 CERROR("buffer length wrong (%d vs. %ld)\n",
                        request->rq_repmsg->buflens[1],
                        pages * sizeof(struct niobuf_remote));
-                GOTO(out2, rc = -EINVAL);
-        }
-
-        desc = ptlrpc_prep_bulk(connection);
-        desc->b_portal = OSC_BULK_PORTAL;
-        if (callback) {
-                desc->b_cb = brw_write_finish;
-                OBD_ALLOC(cb_data, sizeof(*cb_data));
-                cb_data->buf = pagearray;
-                cb_data->callback = callback;
-                desc->b_cb_data = cb_data;
+                GOTO(out_unmap, rc = -EINVAL);
         }
 
         for (pages = 0, i = 0; i < num_oa; i++) {
                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
                         struct ptlrpc_bulk_page *page;
 
-                        ost_unpack_niobuf(&ptr2, &remote);
+                        ost_unpack_niobuf(&nioptr, &remote);
                         page = ptlrpc_prep_bulk_page(desc);
-                        if (page == NULL)
-                                GOTO(out3, rc = -ENOMEM);
+                        if (!page)
+                                GOTO(out_unmap, rc = -ENOMEM);
 
                         page->b_buf = (void *)(unsigned long)local[pages].addr;
                         page->b_buflen = local[pages].len;
@@ -621,29 +641,42 @@ static int osc_brw_write(struct lustre_handle *conn, obd_count num_oa,
 
         if (desc->b_page_count != pages)
                 LBUG();
 
+        /*
+         * One is released when the bulk is complete, the other when we finish
+         * waiting on it. (Callback cases don't sleep, so only one ref for
+         * them.)
+         */
+        atomic_set(&desc->b_refcount, callback ? 1 : 2);
+        CDEBUG(D_PAGE, "Set refcount of %p to %d\n", desc,
+               atomic_read(&desc->b_refcount));
         rc = ptlrpc_send_bulk(desc);
         if (rc)
-                GOTO(out3, rc);
+                GOTO(out_unmap, rc);
+
+        /* Callbacks cause asynchronous handling. */
         if (callback)
-                GOTO(out, rc);
+                RETURN(0);
 
         /* If there's no callback function, sleep here until complete. */
         l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
+        ptlrpc_bulk_decref(desc);
         if (desc->b_flags & PTL_RPC_FL_INTR)
-                rc = -EINTR;
-
-        GOTO(out3, rc);
+                RETURN(-EINTR);
+        RETURN(0);
 
- out3:
-        ptlrpc_free_bulk(desc);
- out2:
-        ptlrpc_free_req(request);
+        /* Clean up on error. */
+ out_unmap:
         for (pages = 0, i = 0; i < num_oa; i++)
                 for (j = 0; j < oa_bufs[i]; j++, pages++)
                         kunmap(pagearray[pages]);
- out:
-        OBD_FREE(local, pages * sizeof(*local));
+ out_free:
+        if (cb_data)
+                OBD_FREE(cb_data, sizeof(*cb_data));
+        if (local)
+                OBD_FREE(local, pages * sizeof(*local));
+        ptlrpc_free_bulk(desc);
+        ptlrpc_req_finished(request);
         return rc;
 }
 
diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index ce69521..2f8c1f4 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -469,7 +469,8 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
 
         /* XXX probably both an import and connection level are needed */
         if (req->rq_level > req->rq_connection->c_level) {
-                CERROR("process %d waiting for recovery\n", current->pid);
+                CERROR("process %d waiting for recovery (%d > %d)\n",
+                       current->pid, req->rq_level, req->rq_connection->c_level);
                 spin_lock(&cli->cli_lock);
                 list_del_init(&req->rq_list);
                 list_add(&req->rq_list, cli->cli_delayed_head.prev);
diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c
index ca4e566d..b819c29 100644
--- a/lustre/ptlrpc/events.c
+++ b/lustre/ptlrpc/events.c
@@ -73,7 +73,7 @@ static int reply_in_callback(ptl_event_t *ev)
         struct ptlrpc_request *req = ev->mem_desc.user_ptr;
         ENTRY;
 
-        if (req->rq_xid == 0x5a5a5a5a) {
+        if (req->rq_xid == 0x5a5a5a5a5a5a5a5a) {
                 CERROR("Reply received for freed request! Probably a missing "
                        "ptlrpc_abort()\n");
                 LBUG();
diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c
index a72d14e..072592f 100644
--- a/lustre/ptlrpc/niobuf.c
+++ b/lustre/ptlrpc/niobuf.c
@@ -24,6 +24,7 @@
 #include
 #include
+#include
 
 extern ptl_handle_eq_t request_out_eq, reply_in_eq, reply_out_eq,
         bulk_source_eq, bulk_sink_eq;
@@ -36,8 +37,23 @@ int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk)
         if (bulk->b_flags & PTL_BULK_FL_SENT)
                 RETURN(1);
 
-        if (sigismember(&(current->pending.signal), SIGKILL) ||
-            sigismember(&(current->pending.signal), SIGINT)) {
+        if (l_killable_pending(current)) {
+                bulk->b_flags |= PTL_RPC_FL_INTR;
+                RETURN(1);
+        }
+
+        CDEBUG(D_NET, "no event yet\n");
+        RETURN(0);
+}
+
+int ptlrpc_check_bulk_received(struct ptlrpc_bulk_desc *bulk)
+{
+        ENTRY;
+
+        if (bulk->b_flags & PTL_BULK_FL_RCVD)
+                RETURN(1);
+
+        if (l_killable_pending(current)) {
                 bulk->b_flags |= PTL_RPC_FL_INTR;
                 RETURN(1);
         }
diff --git a/lustre/ptlrpc/rpc.c b/lustre/ptlrpc/rpc.c
index 578b99e..c8e268a 100644
--- a/lustre/ptlrpc/rpc.c
+++ b/lustre/ptlrpc/rpc.c
@@ -147,6 +147,7 @@ static void __exit ptlrpc_exit(void)
 
 /* events.c */
 EXPORT_SYMBOL(ptlrpc_check_bulk_sent);
+EXPORT_SYMBOL(ptlrpc_check_bulk_received);
 
 /* connmgr.c */
 EXPORT_SYMBOL(ptlrpc_connmgr);
-- 
1.8.3.1
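
The descriptor lifetime rule in this patch boils down to: start with one
reference for the completion path, add a second one only when a caller will
also sleep on the descriptor afterwards, and let whichever side drops the last
reference do the freeing. Below is a minimal, self-contained userspace sketch
of just that pattern, under assumed names -- bulk_desc, bulk_prep,
bulk_complete and bulk_put are illustrative stand-ins, not the kernel-side
ptlrpc_bulk_desc, ptlrpc_prep_bulk, the bulk event callback and
ptlrpc_bulk_decref -- and it deliberately leaves out the wait queue, Portals
events and signal handling that the real code uses.

/*
 * Sketch only: two-reference lifetime for a bulk-style descriptor.
 */
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

struct bulk_desc {
        atomic_int refcount;
        void (*callback)(struct bulk_desc *);
};

static void bulk_put(struct bulk_desc *d)
{
        /* Whoever drops the last reference frees the descriptor. */
        if (atomic_fetch_sub(&d->refcount, 1) == 1) {
                printf("released last ref on %p, freeing\n", (void *)d);
                free(d);
        }
}

static void bulk_complete(struct bulk_desc *d)
{
        /* Runs when the transfer finishes (the completion event path). */
        if (d->callback)
                d->callback(d);
        bulk_put(d);            /* drop the completion path's reference */
}

static struct bulk_desc *bulk_prep(void (*callback)(struct bulk_desc *))
{
        struct bulk_desc *d = calloc(1, sizeof(*d));

        if (!d)
                return NULL;
        d->callback = callback;
        /*
         * One reference for the completion path; a second one only if the
         * caller has no callback and will sleep on the descriptor instead.
         */
        atomic_init(&d->refcount, callback ? 1 : 2);
        return d;
}

int main(void)
{
        /* Synchronous caller: holds the extra reference until it is done. */
        struct bulk_desc *d = bulk_prep(NULL);

        if (!d)
                return 1;
        bulk_complete(d);       /* in the sketch, fired by hand */
        bulk_put(d);            /* waiter's reference; freed here */
        return 0;
}

The point of keeping the waiter's reference separate is that an interrupted
caller can drop its reference and return immediately while the still-in-flight
bulk keeps a valid descriptor, which is the "cleaner interrupt handling" the
commit message describes.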