+ size[1] = num_oa * sizeof(ioo);
+ pages = 0;
+ for (i = 0; i < num_oa; i++)
+ pages += oa_bufs[i];
+ size[2] = pages * sizeof(src);
+
+ OBD_ALLOC(bulk, pages * sizeof(*bulk));
+ if (bulk == NULL)
+ RETURN(-ENOMEM);
+
+ osc_con2cl(conn, &cl, &connection);
+ request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
+ if (!request)
+ GOTO(out, rc = -ENOMEM);
+
+ body = lustre_msg_buf(request->rq_reqmsg, 0);
+ body->data = OBD_BRW_READ;
+
+ ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
+ ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
+ for (pages = 0, i = 0; i < num_oa; i++) {
+ ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
+ for (j = 0; j < oa_bufs[i]; j++, pages++) {
+ bulk[pages] = ptlrpc_prep_bulk(connection);
+ if (bulk[pages] == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ spin_lock(&connection->c_lock);
+ bulk[pages]->b_xid = ++connection->c_xid_out;
+ spin_unlock(&connection->c_lock);
+
+ bulk[pages]->b_buf = kmap(buf[pages]);
+ bulk[pages]->b_buflen = PAGE_SIZE;
+ bulk[pages]->b_portal = OST_BULK_PORTAL;
+ ost_pack_niobuf(&ptr2, bulk[pages]->b_buf,
+ offset[pages], count[pages],
+ flags[pages], bulk[pages]->b_xid);
+
+ rc = ptlrpc_register_bulk(bulk[pages]);
+ if (rc)
+ GOTO(out, rc);
+ }
+ }
+
+ request->rq_replen = lustre_msg_size(1, size);
+ rc = ptlrpc_queue_wait(request);
+ GOTO(out, rc);
+
+ out:
+ /* FIXME: if we've called ptlrpc_wait_bulk but rc != 0, we need to
+ * abort those bulk listeners. */
+
+ for (pages = 0, i = 0; i < num_oa; i++) {
+ for (j = 0; j < oa_bufs[i]; j++, pages++) {
+ if (bulk[pages] == NULL)
+ continue;
+ kunmap(buf[pages]);
+ ptlrpc_free_bulk(bulk[pages]);
+ }
+ }
+
+ OBD_FREE(bulk, pages * sizeof(*bulk));
+ ptlrpc_free_req(request);
+ return rc;
+}
+
+int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
+ obd_count *oa_bufs, struct page **buf, obd_size *count,
+ obd_off *offset, obd_flag *flags)