* Register the bulk first, because the reply could arrive out of order,
* and we want to be ready for the bulk data.
*
- * One reference is released when brw_finish is complete, the
- * other here when we finish waiting on it if we don't have a callback.
- *
- * We don't reference the bulk descriptor again here if there is a
- * callback, so we don't need an additional refcount on it.
+ * The reference is released when brw_finish is complete.
*
* On error, we never do the brw_finish, so we handle all decrefs.
*/
rc = ptlrpc_register_bulk(desc);
if (rc)
- GOTO(out_unmap, rc);
+ GOTO(out_desc, rc);
request->rq_replen = lustre_msg_size(1, size);
rc = ptlrpc_queue_wait(request);
rc = ptlrpc_check_status(request, rc);
- /* XXX: Mike, this is the only place I'm not sure of. If we had
- * an error here, will we always call brw_finish? If yes, then
- * out_desc_2 will do too much and we should jump to out_desc.
+ /* XXX: Mike, this is the only place I'm not sure of. If we have
+ * an error here, will we have always called brw_finish? If no,
+ * then out_req will not clean up and we should go to out_desc.
* If maybe, then we are screwed, and we need to set things up
* so that bulk_sink_callback is called for each bulk page,
* even on error so brw_finish is always called. It would need
* restart them" and osc_brw callers can know this.
*/
if (rc)
- GOTO(out_unmap, rc);
+ GOTO(out_req, rc);
/* Callbacks cause asynchronous handling. */
rc = callback(data, 0, CB_PHASE_START);
EXIT;
-out_desc:
- ptlrpc_bulk_decref(desc);
out_req:
ptlrpc_req_finished(request);
RETURN(rc);
/* Clean up on error. */
+out_desc:
+ ptlrpc_bulk_decref(desc);
out_unmap:
while (mapped-- > 0)
kunmap(page_array[mapped]);
OBD_FREE(cb_data, sizeof(*cb_data));
- goto out_desc;
+ goto out_req;
}
-static int osc_brw_write(struct lustre_handle *conn,
- struct lov_stripe_md *md, obd_count page_count,
- struct brw_page *pga,
+static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
+ obd_count page_count, struct brw_page *pga,
brw_callback_t callback, struct io_cb_data *data)
{
struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
if (desc->b_page_count != page_count)
LBUG();
- /*
- * One reference is released when brw_finish is complete, the
- * other here when we finish waiting on it if we don't have a callback.
- */
+ /* Our reference is released when brw_finish is complete. */
rc = ptlrpc_send_bulk(desc);
/* XXX: Mike, same question as in osc_brw_read. */
if (rc)
- GOTO(out_desc2, rc);
+ GOTO(out_req, rc);
/* Callbacks cause asynchronous handling. */
rc = callback(data, 0, CB_PHASE_START);
EXIT;
-out_desc:
- ptlrpc_bulk_decref(desc);
out_req:
ptlrpc_req_finished(request);
return rc;
/* Clean up on error. */
-out_desc2:
- if (!callback)
- ptlrpc_bulk_decref(desc);
out_unmap:
while (mapped-- > 0)
kunmap(pagearray[mapped]);
OBD_FREE(local, page_count * sizeof(*local));
out_cb:
OBD_FREE(cb_data, sizeof(*cb_data));
- goto out_desc;
+out_desc:
+ ptlrpc_bulk_decref(desc);
+ goto out_req;
}
static int osc_brw(int cmd, struct lustre_handle *conn,
struct lov_stripe_md *md, obd_count page_count,
- struct brw_page *pagear, brw_callback_t callback,
- struct io_cb_data *data)
+ struct brw_page *pga, brw_callback_t callback,
+ struct io_cb_data *data)
{
if (cmd & OBD_BRW_WRITE)
- return osc_brw_write(conn, md, page_count, pagear, callback, data);
+ return osc_brw_write(conn, md, page_count, pga, callback, data);
else
- return osc_brw_read(conn, md, page_count, pagear, callback, data);
+ return osc_brw_read(conn, md, page_count, pga, callback, data);
}
static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *md,