size[0] = sizeof(struct ptlbd_op);
size[1] = page_count * sizeof(struct ptlbd_niob);
- req = ptlrpc_prep_req(imp, cmd, 2, size, NULL);
+ req = ptlrpc_prep_req(imp, LUSTRE_PBD_VERSION, cmd, 2, size, NULL);
if (!req)
RETURN(rc = 1); /* need to return error cnt */
op->op_block_cnt = page_count;
if (cmd == PTLBD_READ)
- desc = ptlrpc_prep_bulk_imp (req, BULK_PUT_SINK, PTLBD_BULK_PORTAL);
+ desc = ptlrpc_prep_bulk_imp (req, page_count,
+ BULK_PUT_SINK, PTLBD_BULK_PORTAL);
else
- desc = ptlrpc_prep_bulk_imp (req, BULK_GET_SOURCE, PTLBD_BULK_PORTAL);
+ desc = ptlrpc_prep_bulk_imp (req, page_count,
+ BULK_GET_SOURCE, PTLBD_BULK_PORTAL);
if ( desc == NULL )
GOTO(out, rc = 1); /* need to return error cnt */
/* NB req now owns desc, and frees it when she frees herself */
for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_reqnext, niob++ ) {
- rc = ptlrpc_prep_bulk_page(desc, bh->b_page,
- bh_offset (bh) & (PAGE_SIZE - 1),
- bh->b_size);
- if (rc != 0)
- GOTO(out, rc = 1); /* need to return error cnt */
+ ptlrpc_prep_bulk_page(desc, bh->b_page,
+ bh_offset (bh) & (PAGE_SIZE - 1),
+ bh->b_size);
niob->n_block_nr = bh->b_blocknr;
niob->n_offset = bh_offset(bh);
req->rq_replen = lustre_msg_size(1, &rep_size);
/* XXX find out how we're really supposed to manage levels */
- req->rq_level = imp->imp_level;
+ req->rq_send_state = imp->imp_state;
rc = ptlrpc_queue_wait(req);
if ( rc != 0 )
size[0] = sizeof(struct ptlbd_op);
- req = ptlrpc_prep_req(imp, cmd, 1, size, NULL);
+ req = ptlrpc_prep_req(imp, LUSTRE_PBD_VERSION, cmd, 1, size, NULL);
if (!req)
RETURN(-ENOMEM);
req->rq_replen = lustre_msg_size(1, &rep_size);
/* XXX find out how we're really supposed to manage levels */
- req->rq_level = imp->imp_level;
+ req->rq_send_state = imp->imp_state;
rc = ptlrpc_queue_wait(req);
if ( rc != 0 )
struct ptlbd_niob *niob, *niobs;
struct ptlbd_rsp *rsp;
struct ptlrpc_bulk_desc *desc = NULL;
- struct file *filp = req->rq_obd->u.ptlbd.filp;
+ struct file *filp = req->rq_export->exp_obd->u.ptlbd.filp;
struct l_wait_info lwi;
int size[1], i, page_count, rc = 0, error_cnt = 0;
struct list_head *pos, *n;
GOTO (out, rc = -EFAULT);
size[0] = sizeof(struct ptlbd_rsp);
- rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg);
+ rc = lustre_pack_reply(req, 1, size, NULL);
if ( rc )
GOTO(out, rc);
if ( rsp == NULL )
GOTO (out, rc = -EFAULT);
+ /* FIXME: assumes each niobuf fits in 1 page */
page_count = req->rq_reqmsg->buflens[1] / sizeof(struct ptlbd_niob);
if (swab) { /* swab remaining niobs */
for (i = 1; i < page_count; i++)
}
if (cmd == PTLBD_READ)
- desc = ptlrpc_prep_bulk_exp (req, BULK_PUT_SOURCE, PTLBD_BULK_PORTAL);
+ desc = ptlrpc_prep_bulk_exp (req, page_count,
+ BULK_PUT_SOURCE, PTLBD_BULK_PORTAL);
else
- desc = ptlrpc_prep_bulk_exp (req, BULK_GET_SINK, PTLBD_BULK_PORTAL);
+ desc = ptlrpc_prep_bulk_exp (req, page_count,
+ BULK_GET_SINK, PTLBD_BULK_PORTAL);
if (desc == NULL) {
error_cnt++;
GOTO(out_reply, rc = -ENOMEM);
}
list_add_tail(&page->list, &tmp_pages);
- rc = ptlrpc_prep_bulk_page(desc, page,
- niob->n_offset & (PAGE_SIZE - 1),
- niob->n_length);
- if (rc != 0) {
- error_cnt++;
- GOTO(out_reply, rc);
- }
+ ptlrpc_prep_bulk_page(desc, page,
+ niob->n_offset & (PAGE_SIZE - 1),
+ niob->n_length);
}
if ( cmd == PTLBD_READ ) {
- if ((rc = ptlbd_do_filp(filp, PTLBD_READ, niobs,
- page_count, &tmp_pages)) < 0) {
+ rc = ptlbd_do_filp(filp, PTLBD_READ, niobs,
+ page_count, &tmp_pages);
+ if (rc < 0) {
error_cnt++;
GOTO(out_reply, rc);
}
- rc = ptlrpc_bulk_put(desc);
- } else {
- rc = ptlrpc_bulk_get(desc);
}
+ rc = ptlrpc_start_bulk_transfer(desc);
if ( rc ) {
error_cnt++;
GOTO(out_reply, rc);
}
- lwi = LWI_TIMEOUT(obd_timeout * HZ, NULL, desc);
- rc = l_wait_event(desc->bd_waitq, ptlrpc_bulk_complete(desc), &lwi);
+ lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, desc);
+ rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi);
if (rc != 0) {
LASSERT(rc == -ETIMEDOUT);
ptlrpc_abort_bulk(desc);
error_cnt++;
GOTO(out_reply, rc);
}
+
+ /* XXX do some error handling */
+ LASSERT(desc->bd_success && desc->bd_nob_transferred == desc->bd_nob);
if ( cmd == PTLBD_WRITE ) {
if ((rc = ptlbd_do_filp(filp, PTLBD_WRITE, niobs,
struct ptlrpc_request *req)
{
struct ptlbd_rsp *rsp;
- struct file *filp = req->rq_obd->u.ptlbd.filp;
+ struct file *filp = req->rq_export->exp_obd->u.ptlbd.filp;
int size[1], rc, status;
ENTRY;
size[0] = sizeof(struct ptlbd_rsp);
- rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg);
+ rc = lustre_pack_reply(req, 1, size, NULL);
if ( rc )
RETURN(rc);
swab = lustre_msg_swabbed (req->rq_reqmsg);
if (req->rq_reqmsg->opc == PTLBD_CONNECT) {
- rc = target_handle_connect(req, ptlbd_handle);
+ rc = target_handle_connect(req);
target_send_reply(req, rc, OBD_FAIL_PTLRPC);
RETURN(0);
}