return 0;
}
-int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
+static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
{
struct ptlrpc_bulk_desc **bulk_vec = NULL;
struct ptlrpc_bulk_desc *bulk = NULL;
int i, j;
int objcount, niocount;
char *tmp1, *tmp2, *end2;
- char *res;
+ char *res = NULL;
int cmd;
- struct niobuf *nb, *src, *dst;
+ struct niobuf *nb, *src;
struct obd_ioobj *ioo;
struct ost_req *r = req->rq_req.ost;
}
}
- rc = ost_pack_rep(NULL, 0, NULL, niocount * sizeof(*nb),
+ rc = ost_pack_rep(NULL, 0, NULL, 0,
&req->rq_rephdr, &req->rq_rep,
&req->rq_replen, &req->rq_repbuf);
- if (rc) {
+ if (rc) {
CERROR("cannot pack reply\n");
return rc;
}
goto out;
}
- if (cmd == OBD_BRW_WRITE) {
- /* Setup buffers for the incoming pages, then send the niobufs
- * describing those buffers to the OSC. */
- OBD_ALLOC(bulk_vec,
- niocount * sizeof(struct ptlrpc_bulk_desc *));
- if (bulk_vec == NULL) {
- CERROR("cannot alloc bulk desc vector\n");
- return -ENOMEM;
+ for (i = 0; i < niocount; i++) {
+ bulk = ptlrpc_prep_bulk(&req->rq_peer);
+ if (bulk == NULL) {
+ CERROR("cannot alloc bulk desc\n");
+ rc = -ENOMEM;
+ goto out;
}
- memset(bulk_vec, 0,
- niocount * sizeof(struct ptlrpc_bulk_desc *));
- for (i = 0; i < niocount; i++) {
- struct ptlrpc_service *srv =
- req->rq_obd->u.ost.ost_service;
-
- bulk_vec[i] = ptlrpc_prep_bulk(&req->rq_peer);
- if (bulk_vec[i] == NULL) {
- CERROR("cannot alloc bulk desc\n");
- rc = -ENOMEM;
- goto out;
- }
-
- spin_lock(&srv->srv_lock);
- bulk_vec[i]->b_xid = srv->srv_xid++;
- spin_unlock(&srv->srv_lock);
-
- dst = &((struct niobuf *)res)[i];
- /* FIXME: we overload ->page with the xid of this buffer
- * for the benefit of the remote client */
- dst->page =
- (void *)(unsigned long)HTON__u64(bulk_vec[i]->b_xid);
-
- bulk_vec[i]->b_buf = (void *)(unsigned long)dst->addr;
- bulk_vec[i]->b_buflen = PAGE_SIZE;
- bulk_vec[i]->b_portal = OSC_BULK_PORTAL;
- rc = ptlrpc_register_bulk(bulk_vec[i]);
- if (rc)
- goto out;
+ src = &((struct niobuf *)tmp2)[i];
+
+ bulk->b_xid = src->xid;
+ bulk->b_buf = (void *)(unsigned long)src->addr;
+ bulk->b_buflen = PAGE_SIZE;
+ rc = ptlrpc_send_bulk(bulk, OST_BULK_PORTAL);
+ if (rc) {
+ EXIT;
+ goto out;
+ }
+ wait_event_interruptible(bulk->b_waitq,
+ ptlrpc_check_bulk_sent(bulk));
+
+ if (bulk->b_flags == PTL_RPC_INTR) {
+ EXIT;
+ goto out;
+ }
+
+ OBD_FREE(bulk, sizeof(*bulk));
+ bulk = NULL;
+ }
#if 0
- /* Local delivery */
- src = &((struct niobuf *)tmp2)[i];
- memcpy((void *)(unsigned long)dst->addr,
- (void *)(unsigned long)src->addr, src->len);
+ /* Local delivery */
+ dst = &((struct niobuf *)tmp2)[i];
+ memcpy((void *)(unsigned long)dst->addr,
+ (void *)(unsigned long)src->addr, PAGE_SIZE);
#endif
- }
- barrier();
- } else {
+ barrier();
+
+ out:
+ if (res != NULL)
+ OBD_FREE(res, sizeof(struct niobuf) * niocount);
+ if (bulk != NULL)
+ OBD_FREE(bulk, sizeof(*bulk));
+ if (bulk_vec != NULL) {
for (i = 0; i < niocount; i++) {
- bulk = ptlrpc_prep_bulk(&req->rq_peer);
- if (bulk == NULL) {
- CERROR("cannot alloc bulk desc\n");
- rc = -ENOMEM;
- goto out;
- }
-
- src = &((struct niobuf *)tmp2)[i];
-
- bulk->b_xid = src->xid;
- bulk->b_buf = (void *)(unsigned long)src->addr;
- bulk->b_buflen = PAGE_SIZE;
- rc = ptlrpc_send_bulk(bulk, OST_BULK_PORTAL);
- if (rc) {
- EXIT;
- goto out;
- }
- wait_event_interruptible(bulk->b_waitq,
- ptlrpc_check_bulk_sent(bulk));
-
- if (bulk->b_flags == PTL_RPC_INTR) {
- EXIT;
- goto out;
- }
-
- OBD_FREE(bulk, sizeof(*bulk));
- bulk = NULL;
+ if (bulk_vec[i] != NULL)
+ OBD_FREE(bulk_vec[i], sizeof(*bulk));
}
+ OBD_FREE(bulk_vec,
+ niocount * sizeof(struct ptlrpc_bulk_desc *));
+ }
+
+ EXIT;
+ return 0;
+}
+
+int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
+{
+ struct ptlrpc_bulk_desc **bulk_vec = NULL;
+ struct ptlrpc_bulk_desc *bulk = NULL;
+ struct obd_conn conn;
+ int rc;
+ int i, j;
+ int objcount, niocount;
+ char *tmp1, *tmp2, *end2;
+ char *res;
+ int cmd;
+ struct niobuf *nb, *dst;
+ struct obd_ioobj *ioo;
+ struct ost_req *r = req->rq_req.ost;
+
+ ENTRY;
+
+ tmp1 = ost_req_buf1(r);
+ tmp2 = ost_req_buf2(r);
+ end2 = tmp2 + req->rq_req.ost->buflen2;
+ objcount = r->buflen1 / sizeof(*ioo);
+ niocount = r->buflen2 / sizeof(*nb);
+ cmd = r->cmd;
+
+ conn.oc_id = req->rq_req.ost->connid;
+ conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
+
+ for (i = 0; i < objcount; i++) {
+ ost_unpack_ioo((void *)&tmp1, &ioo);
+ if (tmp2 + ioo->ioo_bufcnt > end2) {
+ rc = -EFAULT;
+ break;
+ }
+ for (j = 0; j < ioo->ioo_bufcnt; j++) {
+ ost_unpack_niobuf((void *)&tmp2, &nb);
+ }
+ }
+
+ rc = ost_pack_rep(NULL, 0, NULL, niocount * sizeof(*nb),
+ &req->rq_rephdr, &req->rq_rep,
+ &req->rq_replen, &req->rq_repbuf);
+ if (rc) {
+ CERROR("cannot pack reply\n");
+ return rc;
+ }
+ res = ost_rep_buf2(req->rq_rep.ost);
+
+ /* The unpackers move tmp1 and tmp2, so reset them before using */
+ tmp1 = ost_req_buf1(r);
+ tmp2 = ost_req_buf2(r);
+ req->rq_rep.ost->result = obd_preprw
+ (cmd, &conn, objcount, (struct obd_ioobj *)tmp1,
+ niocount, (struct niobuf *)tmp2, (struct niobuf *)res);
+
+ if (req->rq_rep.ost->result) {
+ EXIT;
+ goto out;
+ }
+
+ /* Setup buffers for the incoming pages, then send the niobufs
+ * describing those buffers to the OSC. */
+ OBD_ALLOC(bulk_vec, niocount * sizeof(struct ptlrpc_bulk_desc *));
+ if (bulk_vec == NULL) {
+ CERROR("cannot alloc bulk desc vector\n");
+ return -ENOMEM;
+ }
+ memset(bulk_vec, 0, niocount * sizeof(struct ptlrpc_bulk_desc *));
+
+ for (i = 0; i < niocount; i++) {
+ struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
+
+ bulk_vec[i] = ptlrpc_prep_bulk(&req->rq_peer);
+ if (bulk_vec[i] == NULL) {
+ CERROR("cannot alloc bulk desc\n");
+ rc = -ENOMEM;
+ goto out;
+ }
+
+ spin_lock(&srv->srv_lock);
+ bulk_vec[i]->b_xid = srv->srv_xid++;
+ spin_unlock(&srv->srv_lock);
+
+ dst = &((struct niobuf *)res)[i];
+ dst->xid = HTON__u32(bulk_vec[i]->b_xid);
+
+ bulk_vec[i]->b_buf = (void *)(unsigned long)dst->addr;
+ bulk_vec[i]->b_buflen = PAGE_SIZE;
+ bulk_vec[i]->b_portal = OSC_BULK_PORTAL;
+ rc = ptlrpc_register_bulk(bulk_vec[i]);
+ if (rc)
+ goto out;
#if 0
/* Local delivery */
- dst = &((struct niobuf *)tmp2)[i];
- memcpy((void *)(unsigned long)dst->addr,
- (void *)(unsigned long)src->addr, PAGE_SIZE);
+ src = &((struct niobuf *)tmp2)[i];
+ memcpy((void *)(unsigned long)dst->addr,
+ (void *)(unsigned long)src->addr, src->len);
#endif
- barrier();
- }
+ }
+ barrier();
out:
if (bulk != NULL)
return 0;
}
+int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
+{
+ struct ost_req *r = req->rq_req.ost;
+ int cmd = r->cmd;
+
+ if (cmd == OBD_BRW_READ)
+ return ost_brw_read(obddev, req);
+ else
+ return ost_brw_write(obddev, req);
+}
+
int ost_brw_complete(struct ost_obd *obddev, struct ptlrpc_request *req)
{
struct obd_conn conn;