ENTRY;
osc_con2cl(conn, &cl, &connection);
- request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size, NULL);
+ request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size,
+ NULL);
if (!request)
RETURN(-ENOMEM);
return 0;
}
-static int osc_sendpage(struct ptlrpc_bulk_desc *desc,
- struct niobuf_remote *dst, struct niobuf_local *src)
+struct osc_brw_cb_data {
+ struct page **buf;
+ struct ptlrpc_request *req;
+ bulk_callback_t callback;
+ void *cb_data;
+};
+
+static void brw_read_finish(struct ptlrpc_bulk_desc *desc, void *data)
{
- struct ptlrpc_bulk_page *page;
- ENTRY;
+ struct osc_brw_cb_data *cb_data = data;
- page = ptlrpc_prep_bulk_page(desc);
- if (page == NULL)
- RETURN(-ENOMEM);
+ if (desc->b_flags & PTL_RPC_FL_INTR)
+ CERROR("got signal\n");
- page->b_buf = (void *)(unsigned long)src->addr;
- page->b_buflen = src->len;
- page->b_xid = dst->xid;
+ (cb_data->callback)(desc, cb_data->cb_data);
- RETURN(0);
+ ptlrpc_free_bulk(desc);
+ ptlrpc_free_req(cb_data->req);
+
+ OBD_FREE(cb_data, sizeof(*cb_data));
}
static int osc_brw_read(struct obd_conn *conn, obd_count num_oa,
struct obdo **oa, obd_count *oa_bufs, struct page **buf,
- obd_size *count, obd_off *offset, obd_flag *flags)
+ obd_size *count, obd_off *offset, obd_flag *flags,
+ bulk_callback_t callback)
{
struct ptlrpc_client *cl;
struct ptlrpc_connection *connection;
ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
for (pages = 0, i = 0; i < num_oa; i++) {
ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
+ /* FIXME: this inner loop is wrong for multiple OAs */
for (j = 0; j < oa_bufs[i]; j++, pages++) {
struct ptlrpc_bulk_page *bulk;
bulk = ptlrpc_prep_bulk_page(desc);
return rc;
}
+static void brw_write_finish(struct ptlrpc_bulk_desc *desc, void *data)
+{
+ struct osc_brw_cb_data *cb_data = data;
+ int i;
+
+ if (desc->b_flags & PTL_RPC_FL_INTR)
+ CERROR("got signal\n");
+
+ for (i = 0; i < desc->b_page_count; i++)
+ kunmap(cb_data->buf[i]);
+
+ (cb_data->callback)(desc, cb_data->cb_data);
+
+ ptlrpc_free_bulk(desc);
+ ptlrpc_free_req(cb_data->req);
+
+ OBD_FREE(cb_data, sizeof(*cb_data));
+}
+
static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
struct obdo **oa, obd_count *oa_bufs,
- struct page **pagearray, obd_size *count, obd_off *offset,
- obd_flag *flags)
+ struct page **pagearray, obd_size *count,
+ obd_off *offset, obd_flag *flags,
+ bulk_callback_t callback)
{
struct ptlrpc_client *cl;
struct ptlrpc_connection *connection;
struct ost_body *body;
struct niobuf_local *local;
struct niobuf_remote *remote;
+ struct osc_brw_cb_data *cb_data;
long pages;
int rc, i, j, size[3] = {sizeof(*body)};
void *ptr1, *ptr2;
desc = ptlrpc_prep_bulk(connection);
desc->b_portal = OSC_BULK_PORTAL;
+ if (callback) {
+ desc->b_cb = brw_write_finish;
+ OBD_ALLOC(cb_data, sizeof(*cb_data));
+ cb_data->buf = pagearray;
+ cb_data->callback = callback;
+ desc->b_cb_data = cb_data;
+ }
for (pages = 0, i = 0; i < num_oa; i++) {
for (j = 0; j < oa_bufs[i]; j++, pages++) {
+ struct ptlrpc_bulk_page *page;
+
ost_unpack_niobuf(&ptr2, &remote);
- rc = osc_sendpage(desc, remote, &local[pages]);
- if (rc)
- GOTO(out3, rc);
+
+ page = ptlrpc_prep_bulk_page(desc);
+ if (page == NULL)
+ GOTO(out3, rc = -ENOMEM);
+
+ page->b_buf = (void *)(unsigned long)local[pages].addr;
+ page->b_buflen = local[pages].len;
+ page->b_xid = remote->xid;
}
}
+ if (desc->b_page_count != pages)
+ LBUG();
+
rc = ptlrpc_send_bulk(desc);
+ if (rc)
+ GOTO(out3, rc);
+ if (callback)
+ GOTO(out, rc);
+
+ /* If there's no callback function, sleep here until complete. */
+ wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
+ if (desc->b_flags & PTL_RPC_FL_INTR)
+ rc = -EINTR;
+
GOTO(out3, rc);
out3:
static int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
struct obdo **oa, obd_count *oa_bufs, struct page **buf,
- obd_size *count, obd_off *offset, obd_flag *flags)
+ obd_size *count, obd_off *offset, obd_flag *flags,
+ void *callback)
{
+ if (num_oa != 1)
+ LBUG();
+
if (rw == OBD_BRW_READ)
return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
- offset, flags);
+ offset, flags, (bulk_callback_t)callback);
else
return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
- offset, flags);
+ offset, flags, (bulk_callback_t)callback);
}
static int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns,