/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
- * Copryright (C) 2001, 2002 Cluster File Systems, Inc.
+ * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
*
* This code is issued under the GNU General Public License.
* See the file COPYING in this distribution
struct osc_brw_cb_data {
struct page **buf;
- struct ptlrpc_request *req;
bulk_callback_t callback;
void *cb_data;
+ void *obd_data; /* optional extra buffer; brw_finish() releases it with OBD_FREE() when non-NULL */
+ size_t obd_size; /* size passed to OBD_FREE() when obd_data is released */
};
-static void brw_read_finish(struct ptlrpc_bulk_desc *desc, void *data)
+static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data) /* bulk handler installed as desc->b_cb by both the read and the write path; data is the struct osc_brw_cb_data */
{
struct osc_brw_cb_data *cb_data = data;
+ int i;
+ ENTRY;
if (desc->b_flags & PTL_RPC_FL_INTR)
CERROR("got signal\n");
- (cb_data->callback)(desc, cb_data->cb_data);
+ for (i = 0; i < desc->b_page_count; i++)
+ kunmap(cb_data->buf[i]); /* NOTE(review): osc_brw_read stores buf = NULL in its cb_data; confirm this loop cannot dereference NULL on the read path */
- ptlrpc_free_bulk(desc);
- ptlrpc_free_req(cb_data->req);
+ if (cb_data->callback) /* the caller-supplied callback is optional */
+ (cb_data->callback)(desc, cb_data->cb_data);
+ ptlrpc_bulk_decref(desc); /* drop the reference that belongs to this handler */
+ if (cb_data->obd_data) /* only set by paths that attach an extra buffer */
+ OBD_FREE(cb_data->obd_data, cb_data->obd_size);
OBD_FREE(cb_data, sizeof(*cb_data));
+ EXIT;
}
static int osc_brw_read(struct lustre_handle *conn, obd_count num_oa,
{
struct ptlrpc_client *cl;
struct ptlrpc_connection *connection;
+ struct ptlrpc_request *request = NULL;
+ struct ptlrpc_bulk_desc *desc = NULL;
struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
- struct ptlrpc_request *request;
struct ost_body *body;
- struct list_head *tmp;
- int pages, rc, i, j, size[3] = {sizeof(*body)};
- void *ptr1, *ptr2;
- struct ptlrpc_bulk_desc *desc;
+ struct osc_brw_cb_data *cb_data = NULL;
+ long pages;
+ int rc, i, j, size[3] = {sizeof(*body)};
+ void *iooptr, *nioptr;
ENTRY;
+ /* XXX almost identical to brw_write case */
size[1] = num_oa * sizeof(struct obd_ioobj);
pages = 0;
for (i = 0; i < num_oa; i++)
osc_con2cl(conn, &cl, &connection);
request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
- OST_BRW, 3, size, NULL);
+ OST_BRW, 3, size, NULL);
if (!request)
- GOTO(out, rc = -ENOMEM);
+ RETURN(-ENOMEM);
body = lustre_msg_buf(request->rq_reqmsg, 0);
body->data = OBD_BRW_READ;
desc = ptlrpc_prep_bulk(connection);
if (!desc)
- GOTO(out2, rc = -ENOMEM);
+ GOTO(out_free, rc = -ENOMEM);
desc->b_portal = OST_BULK_PORTAL;
-
- ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
- ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
+ desc->b_cb = brw_finish;
+ OBD_ALLOC(cb_data, sizeof(*cb_data));
+ if (!cb_data)
+ GOTO(out_free, rc = -ENOMEM);
+ cb_data->buf = NULL; /* NOTE(review): brw_finish() indexes cb_data->buf[i] for every bulk page; confirm NULL is safe here */
+ cb_data->callback = callback;
+ desc->b_cb_data = cb_data;
+ /* XXX end almost identical to brw_write case */
+
+ iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
+ nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
for (pages = 0, i = 0; i < num_oa; i++) {
- ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
+ ost_pack_ioo(&iooptr, oa[i], oa_bufs[i]);
/* FIXME: this inner loop is wrong for multiple OAs */
for (j = 0; j < oa_bufs[i]; j++, pages++) {
struct ptlrpc_bulk_page *bulk;
bulk = ptlrpc_prep_bulk_page(desc);
if (bulk == NULL)
- GOTO(out3, rc = -ENOMEM);
+ GOTO(out_unmap, rc = -ENOMEM);
spin_lock(&connection->c_lock);
bulk->b_xid = ++connection->c_xid_out;
bulk->b_buf = kmap(buf[pages]);
bulk->b_page = buf[pages];
bulk->b_buflen = PAGE_SIZE;
- ost_pack_niobuf(&ptr2, offset[pages], count[pages],
+ ost_pack_niobuf(&nioptr, offset[pages], count[pages],
flags[pages], bulk->b_xid);
}
}
+ /*
+ * Register the bulk first, because the reply could arrive out of order,
+ * and we want to be ready for the bulk data.
+ *
+ * The bulk callback releases one reference. When the caller supplied no
+ * callback, a second reference is taken and released below once we finish sleeping on the bulk.
+ */
+ atomic_set(&desc->b_refcount, callback ? 1 : 2);
rc = ptlrpc_register_bulk(desc);
if (rc)
- GOTO(out3, rc);
+ GOTO(out_unmap, rc);
request->rq_replen = lustre_msg_size(1, size);
rc = ptlrpc_queue_wait(request);
rc = ptlrpc_check_status(request, rc);
- if (rc)
- ptlrpc_abort_bulk(desc);
- GOTO(out3, rc);
-
- out3:
- list_for_each(tmp, &desc->b_page_list) {
- struct ptlrpc_bulk_page *bulk;
- bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
- if (bulk->b_page != NULL)
- kunmap(bulk->b_page);
+ if (rc) {
+ ptlrpc_bulk_decref(desc); /* NOTE(review): the cleanup path below also calls ptlrpc_free_bulk(desc); confirm the descriptor cannot be released twice */
+ GOTO(out_unmap, rc);
}
- ptlrpc_free_bulk(desc);
- out2:
- ptlrpc_free_req(request);
- out:
- return rc;
-}
-static void brw_write_finish(struct ptlrpc_bulk_desc *desc, void *data)
-{
- struct osc_brw_cb_data *cb_data = data;
- int i;
- ENTRY;
+ /* Callbacks cause asynchronous handling. */
+ if (callback)
+ RETURN(0);
+ l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_received(desc));
+ ptlrpc_bulk_decref(desc); /* NOTE(review): desc->b_flags is read immediately after this decref; confirm desc is still valid at that point */
if (desc->b_flags & PTL_RPC_FL_INTR)
- CERROR("got signal\n");
-
- for (i = 0; i < desc->b_page_count; i++)
- kunmap(cb_data->buf[i]);
-
- (cb_data->callback)(desc, cb_data->cb_data);
-
+ RETURN(-EINTR);
+ RETURN(0);
+
+ /* Clean up on error. */
+ out_unmap:
+ for (pages = 0, i = 0; i < num_oa; i++)
+ for (j = 0; j < oa_bufs[i]; j++, pages++)
+ kunmap(pagearray[pages]); /* NOTE(review): the pages were mapped through buf[] above; confirm pagearray is the correct name in this function */
+ out_free:
+ if (cb_data)
+ OBD_FREE(cb_data, sizeof(*cb_data));
ptlrpc_free_bulk(desc);
- ptlrpc_free_req(cb_data->req);
-
- OBD_FREE(cb_data, sizeof(*cb_data));
- EXIT;
+ ptlrpc_free_req(request);
+ return rc;
}
static int osc_brw_write(struct lustre_handle *conn, obd_count num_oa,
{
struct ptlrpc_client *cl;
struct ptlrpc_connection *connection;
- struct ptlrpc_request *request;
- struct ptlrpc_bulk_desc *desc;
+ struct ptlrpc_request *request = NULL;
+ struct ptlrpc_bulk_desc *desc = NULL;
struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
- struct obd_ioobj ioo;
struct ost_body *body;
- struct niobuf_local *local;
+ struct niobuf_local *local = NULL;
struct niobuf_remote *remote;
- struct osc_brw_cb_data *cb_data;
+ struct osc_brw_cb_data *cb_data = NULL;
long pages;
int rc, i, j, size[3] = {sizeof(*body)};
- void *ptr1, *ptr2;
+ void *iooptr, *nioptr;
ENTRY;
- size[1] = num_oa * sizeof(ioo);
+ /* XXX almost identical to brw_read case */
+ size[1] = num_oa * sizeof(struct obd_ioobj);
pages = 0;
for (i = 0; i < num_oa; i++)
pages += oa_bufs[i];
- size[2] = pages * sizeof(*remote);
-
- OBD_ALLOC(local, pages * sizeof(*local));
- if (local == NULL)
- RETURN(-ENOMEM);
+ size[2] = pages * sizeof(struct niobuf_remote);
osc_con2cl(conn, &cl, &connection);
request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
- OST_BRW, 3, size, NULL);
+ OST_BRW, 3, size, NULL);
if (!request)
- GOTO(out, rc = -ENOMEM);
+ RETURN(-ENOMEM);
+
body = lustre_msg_buf(request->rq_reqmsg, 0);
body->data = OBD_BRW_WRITE;
- ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
- ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
+ OBD_ALLOC(local, pages * sizeof(*local));
+ if (!local)
+ GOTO(out_free, rc = -ENOMEM);
+
+ desc = ptlrpc_prep_bulk(connection);
+ if (!desc)
+ GOTO(out_free, rc = -ENOMEM);
+ desc->b_portal = OSC_BULK_PORTAL;
+ desc->b_cb = brw_finish;
+ OBD_ALLOC(cb_data, sizeof(*cb_data));
+ if (!cb_data)
+ GOTO(out_free, rc = -ENOMEM);
+ cb_data->buf = pagearray;
+ cb_data->callback = callback;
+ desc->b_cb_data = cb_data;
+ /* XXX end almost identical to brw_read case */
+ cb_data->obd_data = local; /* brw_finish() frees local through obd_data/obd_size once the bulk completes */
+ cb_data->obd_size = pages * sizeof(*local);
+
+ iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
+ nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
for (pages = 0, i = 0; i < num_oa; i++) {
- ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
+ ost_pack_ioo(&iooptr, oa[i], oa_bufs[i]);
for (j = 0; j < oa_bufs[i]; j++, pages++) {
local[pages].addr = kmap(pagearray[pages]);
local[pages].offset = offset[pages];
local[pages].len = count[pages];
- ost_pack_niobuf(&ptr2, offset[pages], count[pages],
+ ost_pack_niobuf(&nioptr, offset[pages], count[pages],
flags[pages], 0);
}
}
size[1] = pages * sizeof(struct niobuf_remote);
request->rq_replen = lustre_msg_size(2, size);
-
rc = ptlrpc_queue_wait(request);
rc = ptlrpc_check_status(request, rc);
if (rc)
- GOTO(out2, rc);
+ GOTO(out_unmap, rc);
- ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
- if (ptr2 == NULL)
- GOTO(out2, rc = -EINVAL);
+ nioptr = lustre_msg_buf(request->rq_repmsg, 1);
+ if (!nioptr)
+ GOTO(out_unmap, rc = -EINVAL);
if (request->rq_repmsg->buflens[1] !=
pages * sizeof(struct niobuf_remote)) {
CERROR("buffer length wrong (%d vs. %ld)\n",
request->rq_repmsg->buflens[1],
pages * sizeof(struct niobuf_remote));
- GOTO(out2, rc = -EINVAL);
- }
-
- desc = ptlrpc_prep_bulk(connection);
- desc->b_portal = OSC_BULK_PORTAL;
- if (callback) {
- desc->b_cb = brw_write_finish;
- OBD_ALLOC(cb_data, sizeof(*cb_data));
- cb_data->buf = pagearray;
- cb_data->callback = callback;
- desc->b_cb_data = cb_data;
+ GOTO(out_unmap, rc = -EINVAL);
}
for (pages = 0, i = 0; i < num_oa; i++) {
for (j = 0; j < oa_bufs[i]; j++, pages++) {
struct ptlrpc_bulk_page *page;
- ost_unpack_niobuf(&ptr2, &remote);
+ ost_unpack_niobuf(&nioptr, &remote);
page = ptlrpc_prep_bulk_page(desc);
- if (page == NULL)
- GOTO(out3, rc = -ENOMEM);
+ if (!page)
+ GOTO(out_unmap, rc = -ENOMEM);
page->b_buf = (void *)(unsigned long)local[pages].addr;
page->b_buflen = local[pages].len;
if (desc->b_page_count != pages)
LBUG();
+ /*
+ * One reference is released when the bulk is complete, the other when
+ * we finish waiting on it. (Callback cases don't sleep, so they take
+ * only one reference.)
+ */
+ atomic_set(&desc->b_refcount, callback ? 1 : 2);
+ CDEBUG(D_PAGE, "Set refcount of %p to %d\n", desc,
+ atomic_read(&desc->b_refcount));
rc = ptlrpc_send_bulk(desc);
if (rc)
- GOTO(out3, rc);
+ GOTO(out_unmap, rc);
+
+ /* Callbacks cause asynchronous handling. */
if (callback)
- GOTO(out, rc);
+ RETURN(0);
/* If there's no callback function, sleep here until complete. */
l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
+ ptlrpc_bulk_decref(desc); /* NOTE(review): desc->b_flags is read immediately after this decref; confirm desc is still valid at that point */
if (desc->b_flags & PTL_RPC_FL_INTR)
- rc = -EINTR;
-
- GOTO(out3, rc);
+ RETURN(-EINTR);
+ RETURN(0);
- out3:
- ptlrpc_free_bulk(desc);
- out2:
- ptlrpc_free_req(request);
+ /* Clean up on error. */
+ out_unmap:
for (pages = 0, i = 0; i < num_oa; i++)
for (j = 0; j < oa_bufs[i]; j++, pages++)
kunmap(pagearray[pages]);
- out:
- OBD_FREE(local, pages * sizeof(*local));
+ out_free:
+ if (cb_data)
+ OBD_FREE(cb_data, sizeof(*cb_data));
+ if (local)
+ OBD_FREE(local, pages * sizeof(*local));
+ ptlrpc_free_bulk(desc);
+ ptlrpc_req_finished(request); /* NOTE(review): the read path's cleanup calls ptlrpc_free_req() instead; confirm which release call is intended */
return rc;
}