#include <linux/lprocfs_status.h>
#include <linux/obd_ptlbd.h>
-#define RSP_OK 0
-#define RSP_NOTOK -1
-#define RQ_OK 0
-
-int ptlbd_send_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd,
- struct request *blkreq)
+int ptlbd_send_rw_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd,
+ struct buffer_head *first_bh)
{
- struct buffer_head *first_bh = blkreq->bh;
- struct obd_import *imp = &ptlbd->bd_import;
+ struct obd_import *imp = ptlbd->bd_import;
struct ptlbd_op *op;
struct ptlbd_niob *niob, *niobs;
struct ptlbd_rsp *rsp;
struct buffer_head *bh;
unsigned int page_count;
int rc, rep_size, size[2];
- __u32 xid;
ENTRY;
LASSERT(cmd == PTLBD_READ || cmd == PTLBD_WRITE);
- for ( page_count = 0, bh = first_bh ; bh ; bh = bh->b_next )
+ for ( page_count = 0, bh = first_bh ; bh ; bh = bh->b_reqnext )
page_count++;
size[0] = sizeof(struct ptlbd_op);
size[1] = page_count * sizeof(struct ptlbd_niob);
- req = ptlrpc_prep_req(imp, cmd, 2, size, NULL);
+ req = ptlrpc_prep_req(imp, LUSTRE_PBD_VERSION, cmd, 2, size, NULL);
if (!req)
- RETURN(-ENOMEM);
+ RETURN(rc = 1); /* need to return error cnt */
- op = lustre_msg_buf(req->rq_reqmsg, 0);
- niobs = lustre_msg_buf(req->rq_reqmsg, 1);
+ op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
+ niobs = lustre_msg_buf(req->rq_reqmsg, 1, size[1]);
/* XXX pack */
op->op_cmd = cmd;
op->op__padding = 0;
op->op_block_cnt = page_count;
- desc = ptlrpc_prep_bulk(imp->imp_connection);
+ if (cmd == PTLBD_READ)
+ desc = ptlrpc_prep_bulk_imp (req, page_count,
+ BULK_PUT_SINK, PTLBD_BULK_PORTAL);
+ else
+ desc = ptlrpc_prep_bulk_imp (req, page_count,
+ BULK_GET_SOURCE, PTLBD_BULK_PORTAL);
if ( desc == NULL )
- GOTO(out_req, rc = -ENOMEM);
- desc->bd_portal = PTLBD_BULK_PORTAL;
- desc->bd_ptl_ev_hdlr = NULL;
-
- xid = ptlrpc_next_xid();
-
- for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_next, niob++ ) {
- struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
- if (bulk == NULL)
- GOTO(out_req, rc = -ENOMEM);
+ GOTO(out, rc = 1); /* need to return error cnt */
+ /* NB req now owns desc, and frees it when she frees herself */
+
+ for ( niob = niobs, bh = first_bh ; bh ; bh = bh->b_reqnext, niob++ ) {
+ ptlrpc_prep_bulk_page(desc, bh->b_page,
+ bh_offset (bh) & (PAGE_SIZE - 1),
+ bh->b_size);
- niob->n_xid = xid;
niob->n_block_nr = bh->b_blocknr;
niob->n_offset = bh_offset(bh);
niob->n_length = bh->b_size;
-
- bulk->bp_xid = xid;
- bulk->bp_buf = bh->b_data;
- bulk->bp_page = bh->b_page;
- bulk->bp_buflen = bh->b_size;
}
- if ( cmd == PTLBD_READ )
- rc = ptlrpc_register_bulk_put(desc);
- else
- rc = ptlrpc_register_bulk_get(desc);
-
- if (rc)
- GOTO(out_desc, rc);
-
rep_size = sizeof(struct ptlbd_rsp);
req->rq_replen = lustre_msg_size(1, &rep_size);
/* XXX find out how we're really supposed to manage levels */
- req->rq_level = imp->imp_level;
+ req->rq_send_state = imp->imp_state;
rc = ptlrpc_queue_wait(req);
- if ( rc != 0 ) {
- blkreq->errors++;
- GOTO(out_desc, rc);
+ if ( rc != 0 )
+ GOTO(out, rc = 1); /* need to return error count */
+
+ rsp = lustre_swab_repbuf(req, 0, sizeof (*rsp),
+ lustre_swab_ptlbd_rsp);
+ if (rsp == NULL) {
+ CERROR ("can't unpack response\n");
+ GOTO (out, rc = 1); /* need to return error count */
}
- rsp = lustre_msg_buf(req->rq_repmsg, 0);
- if (rsp->r_status != RSP_OK) {
- blkreq->errors += rsp->r_error_cnt;
+ else if (rsp->r_status != 0) {
+ rc = rsp->r_error_cnt;
}
-out_desc:
- ptlrpc_bulk_decref(desc);
-out_req:
+out:
ptlrpc_req_finished(req);
RETURN(rc);
}
-static int ptlbd_bulk_timeout(void *data)
+
+int ptlbd_send_flush_req(struct ptlbd_obd *ptlbd, ptlbd_cmd_t cmd)
{
-/* struct ptlrpc_bulk_desc *desc = data;*/
+ struct obd_import *imp = ptlbd->bd_import;
+ struct ptlbd_op *op;
+ struct ptlbd_rsp *rsp;
+ struct ptlrpc_request *req;
+ int rc, rep_size, size[1];
ENTRY;
- CERROR("ugh, timed out\n");
+ LASSERT(cmd == PTLBD_FLUSH);
+
+ size[0] = sizeof(struct ptlbd_op);
+
+ req = ptlrpc_prep_req(imp, LUSTRE_PBD_VERSION, cmd, 1, size, NULL);
+ if (!req)
+ RETURN(-ENOMEM);
+
+ op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
+
+ /* XXX pack */
+ op->op_cmd = cmd;
+ op->op_lun = 0;
+ op->op_niob_cnt = 0;
+ op->op__padding = 0;
+ op->op_block_cnt = 0;
+
+ rep_size = sizeof(struct ptlbd_rsp);
+ req->rq_replen = lustre_msg_size(1, &rep_size);
+
+ /* XXX find out how we're really supposed to manage levels */
+ req->rq_send_state = imp->imp_state;
+
+ rc = ptlrpc_queue_wait(req);
+ if ( rc != 0 )
+ GOTO(out_req, rc = 1);
+ rsp = lustre_swab_repbuf(req, 0, sizeof (*rsp),
+ lustre_swab_ptlbd_rsp);
+ if (rsp->r_status != 0)
+ rc = rsp->r_status;
- RETURN(1);
+out_req:
+ ptlrpc_req_finished(req);
+ RETURN(rc);
}
+
int ptlbd_do_filp(struct file *filp, int op, struct ptlbd_niob *niobs,
int page_count, struct list_head *page_list)
{
mm_segment_t old_fs;
struct list_head *pos;
- int status = RSP_OK;
+ int status = 0;
ENTRY;
old_fs = get_fs();
struct page *page = list_entry(pos, struct page, list);
loff_t offset = (niobs->n_block_nr << PAGE_SHIFT) +
niobs->n_offset;
- if ( op == PTLBD_READ ) {
- if ((ret = filp->f_op->read(filp, page_address(page),
- niobs->n_length, &offset)) != niobs->n_length)
- status = ret;
- goto out;
- } else {
- if ((ret = filp->f_op->write(filp, page_address(page),
- niobs->n_length, &offset)) != niobs->n_length)
- status = ret;
- goto out;
- }
-
+ if ( op == PTLBD_READ )
+ ret = filp->f_op->read(filp, page_address(page),
+ niobs->n_length, &offset);
+ else
+ ret = filp->f_op->write(filp, page_address(page),
+ niobs->n_length, &offset);
+ if (ret != niobs->n_length) {
+ status = ret;
+ break;
+ }
niobs++;
}
-out:
set_fs(old_fs);
RETURN(status);
}
-int ptlbd_parse_req(struct ptlrpc_request *req)
+
+int ptlbd_srv_rw_req(ptlbd_cmd_t cmd, __u16 index,
+ struct ptlrpc_request *req, int swab)
{
- struct ptlbd_op *op;
struct ptlbd_niob *niob, *niobs;
struct ptlbd_rsp *rsp;
- struct ptlrpc_bulk_desc *desc;
- struct file *filp = req->rq_obd->u.ptlbd.filp;
+ struct ptlrpc_bulk_desc *desc = NULL;
+ struct file *filp = req->rq_export->exp_obd->u.ptlbd.filp;
struct l_wait_info lwi;
- int size[1], wait_flag, i, page_count, rc, error_cnt = 0,
- status = RSP_OK;
+ int size[1], i, page_count, rc = 0, error_cnt = 0;
struct list_head *pos, *n;
+ struct page *page;
LIST_HEAD(tmp_pages);
ENTRY;
- rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
+ niobs = lustre_swab_reqbuf (req, 1, sizeof (*niobs),
+ lustre_swab_ptlbd_niob);
+ if (niobs == NULL)
+ GOTO (out, rc = -EFAULT);
+
+ size[0] = sizeof(struct ptlbd_rsp);
+ rc = lustre_pack_reply(req, 1, size, NULL);
if ( rc )
- RETURN(rc);
+ GOTO(out, rc);
- op = lustre_msg_buf(req->rq_reqmsg, 0);
- LASSERT(op->op_cmd == PTLBD_READ || op->op_cmd == PTLBD_WRITE);
+ rsp = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rsp));
+ if ( rsp == NULL )
+ GOTO (out, rc = -EFAULT);
- niobs = lustre_msg_buf(req->rq_reqmsg, 1);
+ /* FIXME: assumes each niobuf fits in 1 page */
page_count = req->rq_reqmsg->buflens[1] / sizeof(struct ptlbd_niob);
-
- desc = ptlrpc_prep_bulk(req->rq_connection);
- if (desc == NULL)
- GOTO(out, rc = -ENOMEM);
- desc->bd_ptl_ev_hdlr = NULL;
+ if (swab) { /* swab remaining niobs */
+ for (i = 1; i < page_count; i++)
+ lustre_swab_ptlbd_niob(&niobs[i]);
+ }
+ if (req->rq_export == NULL) {
+ error_cnt++;
+ GOTO(out_reply, rc = -EFAULT);
+ }
+
+ if (cmd == PTLBD_READ)
+ desc = ptlrpc_prep_bulk_exp (req, page_count,
+ BULK_PUT_SOURCE, PTLBD_BULK_PORTAL);
+ else
+ desc = ptlrpc_prep_bulk_exp (req, page_count,
+ BULK_GET_SINK, PTLBD_BULK_PORTAL);
+ if (desc == NULL) {
+ error_cnt++;
+ GOTO(out_reply, rc = -ENOMEM);
+ }
desc->bd_portal = PTLBD_BULK_PORTAL;
+ LASSERT (page_count > 0);
for ( i = 0, niob = niobs ; i < page_count; niob++, i++) {
- struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
- if (bulk == NULL)
- GOTO(out_bulk, rc = -ENOMEM);
-
- bulk->bp_page = alloc_page(GFP_KERNEL);
- if (bulk->bp_page == NULL)
- GOTO(out_bulk, rc = -ENOMEM);
- list_add(&bulk->bp_page->list, &tmp_pages);
-
- bulk->bp_xid = niob->n_xid;
- bulk->bp_buf = page_address(bulk->bp_page);
- bulk->bp_buflen = niob->n_length;
+ page = alloc_page(GFP_KERNEL);
+ if (page == NULL) {
+ error_cnt++;
+ GOTO(out_reply, rc = -ENOMEM);
+ }
+ list_add_tail(&page->list, &tmp_pages);
+
+ ptlrpc_prep_bulk_page(desc, page,
+ niob->n_offset & (PAGE_SIZE - 1),
+ niob->n_length);
}
- if ( op->op_cmd == PTLBD_READ ) {
- if ((status = ptlbd_do_filp(filp, PTLBD_READ, niobs,
- page_count, &tmp_pages)) < 0) {
+ if ( cmd == PTLBD_READ ) {
+ rc = ptlbd_do_filp(filp, PTLBD_READ, niobs,
+ page_count, &tmp_pages);
+ if (rc < 0) {
error_cnt++;
+ GOTO(out_reply, rc);
}
- rc = ptlrpc_bulk_put(desc);
- wait_flag = PTL_BULK_FL_SENT;
- } else {
- rc = ptlrpc_bulk_get(desc);
- wait_flag = PTL_BULK_FL_RCVD;
}
+ rc = ptlrpc_start_bulk_transfer(desc);
- if ( rc )
- GOTO(out_bulk, rc);
-
- /* this synchronization probably isn't good enough */
- lwi = LWI_TIMEOUT(obd_timeout * HZ, ptlbd_bulk_timeout, desc);
- rc = l_wait_event(desc->bd_waitq, desc->bd_flags & wait_flag, &lwi);
+ if ( rc ) {
+ error_cnt++;
+ GOTO(out_reply, rc);
+ }
- size[0] = sizeof(struct ptlbd_rsp);
- rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg);
- if ( rc )
- GOTO(out, rc);
+ lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, desc);
+ rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi);
+ if (rc != 0) {
+ LASSERT(rc == -ETIMEDOUT);
+ ptlrpc_abort_bulk(desc);
+ error_cnt++;
+ GOTO(out_reply, rc);
+ }
- rsp = lustre_msg_buf(req->rq_repmsg, 0);
- if ( rsp == NULL )
- GOTO(out, rc = -EINVAL);
+ /* XXX do some error handling */
+ LASSERT(desc->bd_success && desc->bd_nob_transferred == desc->bd_nob);
- if ( op->op_cmd == PTLBD_WRITE ) {
- if ((status = ptlbd_do_filp(filp, PTLBD_WRITE, niobs,
+ if ( cmd == PTLBD_WRITE ) {
+ if ((rc = ptlbd_do_filp(filp, PTLBD_WRITE, niobs,
page_count, &tmp_pages)) < 0) {
error_cnt++;
}
}
+out_reply:
rsp->r_error_cnt = error_cnt;
- rsp->r_status = status; /* I/O status */
- req->rq_status = RQ_OK ; /* XXX */ /* ptlbd req status */
+ rsp->r_status = rc;
+ req->rq_status = rc;
- ptlrpc_reply(req->rq_svc, req);
+ ptlrpc_reply(req);
-out_bulk:
list_for_each_safe(pos, n, &tmp_pages) {
struct page *page = list_entry(pos, struct page, list);
list_del(&page->list);
__free_page(page);
}
- ptlrpc_bulk_decref(desc);
+ if (desc)
+ ptlrpc_free_bulk(desc);
out:
RETURN(rc);
}
+
+
+int ptlbd_srv_flush_req(ptlbd_cmd_t cmd, __u16 index,
+ struct ptlrpc_request *req)
+{
+ struct ptlbd_rsp *rsp;
+ struct file *filp = req->rq_export->exp_obd->u.ptlbd.filp;
+ int size[1], rc, status;
+ ENTRY;
+
+ size[0] = sizeof(struct ptlbd_rsp);
+ rc = lustre_pack_reply(req, 1, size, NULL);
+ if ( rc )
+ RETURN(rc);
+
+ rsp = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rsp));
+ if ( rsp == NULL )
+ RETURN(-EINVAL);
+
+ if (! (filp) && (filp->f_op) && (filp->f_op->fsync) &&
+ (filp->f_dentry))
+ GOTO(out_reply, status = -EINVAL);
+
+ status = filp->f_op->fsync(filp, filp->f_dentry, 1);
+
+out_reply:
+ rsp->r_error_cnt = 0;
+ rsp->r_status = status;
+ req->rq_status = 0;
+
+ ptlrpc_reply(req);
+ RETURN(0);
+}
+
+
+int ptlbd_handle(struct ptlrpc_request *req)
+{
+ struct ptlbd_op *op;
+ int swab;
+ int rc;
+ ENTRY;
+
+ swab = lustre_msg_swabbed (req->rq_reqmsg);
+
+ if (req->rq_reqmsg->opc == PTLBD_CONNECT) {
+ rc = target_handle_connect(req);
+ target_send_reply(req, rc, OBD_FAIL_PTLRPC);
+ RETURN(0);
+ }
+ if (req->rq_reqmsg->opc == PTLBD_DISCONNECT) {
+ rc = target_handle_disconnect(req);
+ target_send_reply(req, rc, OBD_FAIL_PTLRPC);
+ RETURN(0);
+ }
+ op = lustre_swab_reqbuf (req, 0, sizeof (*op),
+ lustre_swab_ptlbd_op);
+ if (op == NULL)
+ RETURN(-EFAULT);
+
+ switch (op->op_cmd) {
+ case PTLBD_READ:
+ case PTLBD_WRITE:
+ rc = ptlbd_srv_rw_req(op->op_cmd, op->op_lun, req,
+ swab);
+ break;
+
+ case PTLBD_FLUSH:
+ rc = ptlbd_srv_flush_req(op->op_cmd, op->op_lun, req);
+ break;
+ default:
+ rc = -EINVAL;
+ }
+
+ RETURN(rc);
+}