desc = ptlrpc_prep_bulk(connection);
if (!desc)
- GOTO(out_free, rc = -ENOMEM);
+ GOTO(out_req, rc = -ENOMEM);
desc->b_portal = OST_BULK_PORTAL;
desc->b_cb = brw_finish;
OBD_ALLOC(cb_data, sizeof(*cb_data));
if (!cb_data)
- GOTO(out_free, rc = -ENOMEM);
+ GOTO(out_desc, rc = -ENOMEM);
+
cb_data->callback = callback;
cb_data->cb_data = data;
desc->b_cb_data = cb_data;
- /* end almost identical to brw_write case */
iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
ost_pack_ioo(&iooptr, md, page_count);
+ /* end almost identical to brw_write case */
+
for (mapped = 0; mapped < page_count; mapped++) {
- struct ptlrpc_bulk_page *bulk;
- bulk = ptlrpc_prep_bulk_page(desc);
+ struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
if (bulk == NULL)
GOTO(out_unmap, rc = -ENOMEM);
/*
* Register the bulk first, because the reply could arrive out of order,
* and we want to be ready for the bulk data.
*
- * One reference is released by the bulk callback, the other when
- * we finish sleeping on it (if we don't have a callback).
+ * One reference is released when brw_finish completes; the other is
+ * released here, after we finish waiting, if we don't have a callback.
+ *
+ * We don't reference the bulk descriptor again here if there is a
+ * callback, so we don't need an additional refcount on it.
+ *
+ * On error, brw_finish is never called, so we do all the decrefs here.
*/
- atomic_set(&desc->b_refcount, callback ? 1 : 2);
+ if (!callback)
+ ptlrpc_bulk_addref(desc);
rc = ptlrpc_register_bulk(desc);
if (rc)
- GOTO(out_unmap, rc);
+ GOTO(out_desc2, rc);
request->rq_replen = lustre_msg_size(1, size);
rc = ptlrpc_queue_wait(request);
rc = ptlrpc_check_status(request, rc);
- if (rc) {
- ptlrpc_bulk_decref(desc);
- GOTO(out_unmap, rc);
- }
+
+ /* XXX: Mike, this is the only place I'm not sure of. If we get
+ * an error here, will we always call brw_finish? If yes, then
+ * out_desc2 will do too much and we should jump to out_desc.
+ * If maybe, then we are screwed, and we need to set things up
+ * so that bulk_sink_callback is called for each bulk page,
+ * even on error so brw_finish is always called. It would need
+ * to be passed an error code as a parameter to know what to do.
+ *
+ * That would also help with the partial completion case, so
+ * we could say in brw_finish "these pages are done, don't
+ * restart them" and osc_brw callers can know this.
+ */
+ if (rc)
+ GOTO(out_desc2, rc);
/* Callbacks cause asynchronous handling. */
if (callback)
- RETURN(0);
+ GOTO(out_req, rc = 0);
+ /* If there's no callback function, sleep here until complete. */
l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_received(desc));
- rc = desc->b_flags & PTL_RPC_FL_INTR ? -EINTR : 0;
+ if (desc->b_flags & PTL_RPC_FL_INTR)
+ GOTO(out_desc, rc = -EINTR);
+
+ EXIT;
+out_desc:
ptlrpc_bulk_decref(desc);
+out_req:
+ ptlrpc_req_finished(request);
- RETURN(rc);
+ return rc;
/* Clean up on error. */
- out_unmap:
+out_desc2:
+ if (!callback)
+ ptlrpc_bulk_decref(desc);
+out_unmap:
while (mapped-- > 0)
kunmap(page_array[mapped]);
- out_free:
- if (cb_data)
- OBD_FREE(cb_data, sizeof(*cb_data));
- ptlrpc_free_bulk(desc);
- ptlrpc_free_req(request);
- return rc;
+ OBD_FREE(cb_data, sizeof(*cb_data));
+ goto out_desc;
}
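For reviewers tracing the new scheme: the patch replaces the explicit atomic_set() of b_refcount with addref/decref pairs. A minimal sketch of what the two helpers are assumed to look like, based only on the b_refcount field and ptlrpc_free_bulk() visible in the removed lines (the real implementations may differ):

void ptlrpc_bulk_addref(struct ptlrpc_bulk_desc *desc)
{
        /* Extra reference for the waiter, so the descriptor survives
         * until both the bulk callback and the sleeper are done. */
        atomic_inc(&desc->b_refcount);
}

void ptlrpc_bulk_decref(struct ptlrpc_bulk_desc *desc)
{
        /* The last reference out frees the descriptor. */
        if (atomic_dec_and_test(&desc->b_refcount))
                ptlrpc_free_bulk(desc);
}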
static int osc_brw_write(struct lustre_handle *conn,
body = lustre_msg_buf(request->rq_reqmsg, 0);
body->data = OBD_BRW_WRITE;
- OBD_ALLOC(local, page_count * sizeof(*local));
- if (!local)
- GOTO(out_free, rc = -ENOMEM);
-
desc = ptlrpc_prep_bulk(connection);
if (!desc)
- GOTO(out_free, rc = -ENOMEM);
+ GOTO(out_req, rc = -ENOMEM);
desc->b_portal = OSC_BULK_PORTAL;
desc->b_cb = brw_finish;
OBD_ALLOC(cb_data, sizeof(*cb_data));
if (!cb_data)
- GOTO(out_free, rc = -ENOMEM);
+ GOTO(out_desc, rc = -ENOMEM);
+
cb_data->callback = callback;
cb_data->cb_data = data;
desc->b_cb_data = cb_data;
ost_pack_ioo(&iooptr, md, page_count);
/* end almost identical to brw_read case */
+ OBD_ALLOC(local, page_count * sizeof(*local));
+ if (!local)
+ GOTO(out_cb, rc = -ENOMEM);
+
cb_data->obd_data = local;
cb_data->obd_size = page_count * sizeof(*local);
LBUG();
/*
- * One is released when the bulk is complete, the other when we finish
- * waiting on it. (Callback cases don't sleep, so only one ref for
- * them.)
+ * One reference is released when brw_finish completes; the other is
+ * released here, after we finish waiting, if we don't have a callback.
+ *
+ * We don't reference the bulk descriptor again here if there is a
+ * callback, so we don't need an additional refcount on it.
*/
- atomic_set(&desc->b_refcount, callback ? 1 : 2);
- CDEBUG(D_PAGE, "Set refcount of %p to %d\n", desc,
- atomic_read(&desc->b_refcount));
+ if (!callback)
+ ptlrpc_bulk_addref(desc);
rc = ptlrpc_send_bulk(desc);
+
+ /* XXX: Mike, same question as in osc_brw_read. */
if (rc)
- GOTO(out_unmap, rc);
+ GOTO(out_desc2, rc);
/* Callbacks cause asynchronous handling. */
if (callback)
- RETURN(0);
+ GOTO(out_req, rc = 0);
/* If there's no callback function, sleep here until complete. */
l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
- ptlrpc_bulk_decref(desc);
if (desc->b_flags & PTL_RPC_FL_INTR)
- RETURN(-EINTR);
- RETURN(0);
+ GOTO(out_desc, rc = -EINTR);
+
+ EXIT;
+out_desc:
+ ptlrpc_bulk_decref(desc);
+out_req:
+ ptlrpc_req_finished(request);
+ return rc;
/* Clean up on error. */
- out_unmap:
+out_desc2:
+ if (!callback)
+ ptlrpc_bulk_decref(desc);
+out_unmap:
while (mapped-- > 0)
kunmap(pagearray[mapped]);
- out_free:
- OBD_FREE(cb_data, sizeof(*cb_data));
OBD_FREE(local, page_count * sizeof(*local));
- ptlrpc_free_bulk(desc);
- ptlrpc_req_finished(request);
- return rc;
+out_cb:
+ OBD_FREE(cb_data, sizeof(*cb_data));
+ goto out_desc;
}
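brw_finish itself is not part of this excerpt. Both functions hand it a cb_data carrying the caller's callback plus, on the write side, the obd_data/obd_size pair to free. A hypothetical shape consistent with those fields (the struct tag and argument order are assumptions, not the real code):

static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
{
        struct brw_cb_data *cb_data = data;

        /* Run the caller's completion callback, if one was given. */
        if (cb_data->callback)
                cb_data->callback(cb_data->cb_data);

        /* Free the per-call state set up by osc_brw_read/osc_brw_write;
         * obd_data is only set on the write path. */
        if (cb_data->obd_data)
                OBD_FREE(cb_data->obd_data, cb_data->obd_size);
        OBD_FREE(cb_data, sizeof(*cb_data));

        /* Drop the completion reference; without a callback this leaves
         * the waiter's reference from ptlrpc_bulk_addref(). */
        ptlrpc_bulk_decref(desc);
}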
static int osc_brw(int cmd, struct lustre_handle *conn,
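The excerpt cuts off at osc_brw's signature; presumably it just dispatches on cmd, as the write path's body->data = OBD_BRW_WRITE suggests. A sketch under that assumption — every parameter after cmd and conn, and the bulk_callback_t type, are hypothetical, since the real list is elided above:

static int osc_brw(int cmd, struct lustre_handle *conn,
                   struct lov_stripe_md *md, obd_count page_count,
                   struct page **page_array, bulk_callback_t callback,
                   void *data)
{
        if (cmd & OBD_BRW_WRITE)
                return osc_brw_write(conn, md, page_count, page_array,
                                     callback, data);
        return osc_brw_read(conn, md, page_count, page_array,
                            callback, data);
}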