int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
struct niobuf *dst, struct niobuf *src)
{
- if (conn->oc_id != -1) {
+ struct ptlrpc_client *cl = osc_con2cl(conn);
+
+ if (cl->cli_obd) {
/* local sendpage */
memcpy((char *)(unsigned long)dst->addr,
(char *)(unsigned long)src->addr, src->len);
} else {
- struct ptlrpc_client *cl = osc_con2cl(conn);
struct ptlrpc_bulk_desc *bulk;
- char *buf;
int rc;
bulk = ptlrpc_prep_bulk(&cl->cli_server);
if (bulk == NULL)
return -ENOMEM;
- spin_lock(&cl->cli_lock);
- bulk->b_xid = cl->cli_xid++;
- spin_unlock(&cl->cli_lock);
-
- OBD_ALLOC(buf, src->len);
- if (!buf) {
- OBD_FREE(bulk, sizeof(*bulk));
- return -ENOMEM;
- }
-
- memcpy(buf, (char *)(unsigned long)src->addr, src->len);
-
- bulk->b_buf = buf;
+ bulk->b_buf = (void *)(unsigned long)src->addr;
bulk->b_buflen = src->len;
- /* FIXME: maybe we should add an XID to struct niobuf? */
- bulk->b_xid = (__u32)(unsigned long)src->page;
-
+ bulk->b_xid = dst->xid;
rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
if (rc != 0) {
CERROR("send_bulk failed: %d\n", rc);
}
OBD_FREE(bulk, sizeof(*bulk));
- OBD_FREE(buf, src->len);
}
return 0;
obd_off *offset, obd_flag *flags)
{
struct ptlrpc_client *cl = osc_con2cl(conn);
- struct ptlrpc_request *request;
+ struct ptlrpc_request *request, *req2 = NULL;
struct obd_ioobj ioo;
struct niobuf src;
int pages, rc, i, j, n, size1, size2 = 0;
- void *ptr1, *ptr2;
+ void *ptr1, *ptr2, *reqbuf;
size1 = num_oa * sizeof(ioo);
pages = 0;
CERROR("cannot pack req!\n");
return -ENOMEM;
}
+ OBD_ALLOC(reqbuf, request->rq_reqlen);
+ if (reqbuf == NULL) {
+ CERROR("cannot make duplicate buffer\n");
+ return -ENOMEM;
+ }
request->rq_req.ost->cmd = OBD_BRW_WRITE;
n = 0;
}
}
+ memcpy(reqbuf, request->rq_reqbuf, request->rq_reqlen);
request->rq_replen = sizeof(struct ptlrep_hdr) +
sizeof(struct ost_rep) + pages * sizeof(struct niobuf);
rc = ptlrpc_queue_wait(cl, request);
goto out;
}
+ n = 0;
for (i = 0; i < num_oa; i++) {
for (j = 0; j < oa_bufs[i]; j++) {
struct niobuf *dst;
}
}
- /* Reuse the request structure for the completion request. */
- OBD_FREE(request->rq_rephdr, request->rq_replen);
- request->rq_rephdr = NULL;
- request->rq_repbuf = NULL;
- request->rq_reqhdr->opc = OST_BRW_COMPLETE;
- request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
- rc = ptlrpc_queue_wait(cl, request);
+ ptr2 = ost_rep_buf2(request->rq_rep.ost);
+ req2 = ptlrpc_prep_req(cl, OST_BRW_COMPLETE, size1, ptr1,
+ request->rq_rep.ost->buflen2, ptr2);
+ if (!req2) {
+ CERROR("cannot pack second request!\n");
+ return -ENOMEM;
+ }
+
+ req2->rq_reqhdr->opc = OST_BRW_COMPLETE;
+ req2->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
+ rc = ptlrpc_queue_wait(cl, req2);
if (rc) {
EXIT;
goto out;
out:
if (request->rq_rephdr)
OBD_FREE(request->rq_rephdr, request->rq_replen);
+ if (req2 && req2->rq_rephdr)
+ OBD_FREE(req2->rq_rephdr, req2->rq_replen);
n = 0;
for (i = 0; i < num_oa; i++) {
for (j = 0; j < oa_bufs[i]; j++) {
}
}
+ if (req2)
+ ptlrpc_free_req(req2);
ptlrpc_free_req(request);
return 0;
}
return 0;
}
-int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
+static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
{
struct ptlrpc_bulk_desc **bulk_vec = NULL;
struct ptlrpc_bulk_desc *bulk = NULL;
int i, j;
int objcount, niocount;
char *tmp1, *tmp2, *end2;
- char *res;
+ char *res = NULL;
int cmd;
- struct niobuf *nb, *src, *dst;
+ struct niobuf *nb, *src;
struct obd_ioobj *ioo;
struct ost_req *r = req->rq_req.ost;
}
}
- rc = ost_pack_rep(NULL, 0, NULL, niocount * sizeof(*nb),
+ rc = ost_pack_rep(NULL, 0, NULL, 0,
&req->rq_rephdr, &req->rq_rep,
&req->rq_replen, &req->rq_repbuf);
- if (rc) {
+ if (rc) {
CERROR("cannot pack reply\n");
return rc;
}
goto out;
}
- if (cmd == OBD_BRW_WRITE) {
- /* Setup buffers for the incoming pages, then send the niobufs
- * describing those buffers to the OSC. */
- OBD_ALLOC(bulk_vec,
- niocount * sizeof(struct ptlrpc_bulk_desc *));
- if (bulk_vec == NULL) {
- CERROR("cannot alloc bulk desc vector\n");
- return -ENOMEM;
+ for (i = 0; i < niocount; i++) {
+ bulk = ptlrpc_prep_bulk(&req->rq_peer);
+ if (bulk == NULL) {
+ CERROR("cannot alloc bulk desc\n");
+ rc = -ENOMEM;
+ goto out;
}
- memset(bulk_vec, 0,
- niocount * sizeof(struct ptlrpc_bulk_desc *));
- for (i = 0; i < niocount; i++) {
- struct ptlrpc_service *srv =
- req->rq_obd->u.ost.ost_service;
-
- bulk_vec[i] = ptlrpc_prep_bulk(&req->rq_peer);
- if (bulk_vec[i] == NULL) {
- CERROR("cannot alloc bulk desc\n");
- rc = -ENOMEM;
- goto out;
- }
-
- spin_lock(&srv->srv_lock);
- bulk_vec[i]->b_xid = srv->srv_xid++;
- spin_unlock(&srv->srv_lock);
-
- dst = &((struct niobuf *)res)[i];
- /* FIXME: we overload ->page with the xid of this buffer
- * for the benefit of the remote client */
- dst->page =
- (void *)(unsigned long)HTON__u64(bulk_vec[i]->b_xid);
-
- bulk_vec[i]->b_buf = (void *)(unsigned long)dst->addr;
- bulk_vec[i]->b_buflen = PAGE_SIZE;
- bulk_vec[i]->b_portal = OSC_BULK_PORTAL;
- rc = ptlrpc_register_bulk(bulk_vec[i]);
- if (rc)
- goto out;
+ src = &((struct niobuf *)tmp2)[i];
+
+ bulk->b_xid = src->xid;
+ bulk->b_buf = (void *)(unsigned long)src->addr;
+ bulk->b_buflen = PAGE_SIZE;
+ rc = ptlrpc_send_bulk(bulk, OST_BULK_PORTAL);
+ if (rc) {
+ EXIT;
+ goto out;
+ }
+ wait_event_interruptible(bulk->b_waitq,
+ ptlrpc_check_bulk_sent(bulk));
+
+ if (bulk->b_flags == PTL_RPC_INTR) {
+ EXIT;
+ goto out;
+ }
+
+ OBD_FREE(bulk, sizeof(*bulk));
+ bulk = NULL;
+ }
#if 0
- /* Local delivery */
- src = &((struct niobuf *)tmp2)[i];
- memcpy((void *)(unsigned long)dst->addr,
- (void *)(unsigned long)src->addr, src->len);
+ /* Local delivery */
+ dst = &((struct niobuf *)tmp2)[i];
+ memcpy((void *)(unsigned long)dst->addr,
+ (void *)(unsigned long)src->addr, PAGE_SIZE);
#endif
- }
- barrier();
- } else {
+ barrier();
+
+ out:
+ if (res != NULL)
+ OBD_FREE(res, sizeof(struct niobuf) * niocount);
+ if (bulk != NULL)
+ OBD_FREE(bulk, sizeof(*bulk));
+ if (bulk_vec != NULL) {
for (i = 0; i < niocount; i++) {
- bulk = ptlrpc_prep_bulk(&req->rq_peer);
- if (bulk == NULL) {
- CERROR("cannot alloc bulk desc\n");
- rc = -ENOMEM;
- goto out;
- }
-
- src = &((struct niobuf *)tmp2)[i];
-
- bulk->b_xid = src->xid;
- bulk->b_buf = (void *)(unsigned long)src->addr;
- bulk->b_buflen = PAGE_SIZE;
- rc = ptlrpc_send_bulk(bulk, OST_BULK_PORTAL);
- if (rc) {
- EXIT;
- goto out;
- }
- wait_event_interruptible(bulk->b_waitq,
- ptlrpc_check_bulk_sent(bulk));
-
- if (bulk->b_flags == PTL_RPC_INTR) {
- EXIT;
- goto out;
- }
-
- OBD_FREE(bulk, sizeof(*bulk));
- bulk = NULL;
+ if (bulk_vec[i] != NULL)
+ OBD_FREE(bulk_vec[i], sizeof(*bulk));
}
+ OBD_FREE(bulk_vec,
+ niocount * sizeof(struct ptlrpc_bulk_desc *));
+ }
+
+ EXIT;
+ return 0;
+}
+
+int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
+{
+ struct ptlrpc_bulk_desc **bulk_vec = NULL;
+ struct ptlrpc_bulk_desc *bulk = NULL;
+ struct obd_conn conn;
+ int rc;
+ int i, j;
+ int objcount, niocount;
+ char *tmp1, *tmp2, *end2;
+ char *res;
+ int cmd;
+ struct niobuf *nb, *dst;
+ struct obd_ioobj *ioo;
+ struct ost_req *r = req->rq_req.ost;
+
+ ENTRY;
+
+ tmp1 = ost_req_buf1(r);
+ tmp2 = ost_req_buf2(r);
+ end2 = tmp2 + req->rq_req.ost->buflen2;
+ objcount = r->buflen1 / sizeof(*ioo);
+ niocount = r->buflen2 / sizeof(*nb);
+ cmd = r->cmd;
+
+ conn.oc_id = req->rq_req.ost->connid;
+ conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
+
+ for (i = 0; i < objcount; i++) {
+ ost_unpack_ioo((void *)&tmp1, &ioo);
+ if (tmp2 + ioo->ioo_bufcnt > end2) {
+ rc = -EFAULT;
+ break;
+ }
+ for (j = 0; j < ioo->ioo_bufcnt; j++) {
+ ost_unpack_niobuf((void *)&tmp2, &nb);
+ }
+ }
+
+ rc = ost_pack_rep(NULL, 0, NULL, niocount * sizeof(*nb),
+ &req->rq_rephdr, &req->rq_rep,
+ &req->rq_replen, &req->rq_repbuf);
+ if (rc) {
+ CERROR("cannot pack reply\n");
+ return rc;
+ }
+ res = ost_rep_buf2(req->rq_rep.ost);
+
+ /* The unpackers move tmp1 and tmp2, so reset them before using */
+ tmp1 = ost_req_buf1(r);
+ tmp2 = ost_req_buf2(r);
+ req->rq_rep.ost->result = obd_preprw
+ (cmd, &conn, objcount, (struct obd_ioobj *)tmp1,
+ niocount, (struct niobuf *)tmp2, (struct niobuf *)res);
+
+ if (req->rq_rep.ost->result) {
+ EXIT;
+ goto out;
+ }
+
+ /* Setup buffers for the incoming pages, then send the niobufs
+ * describing those buffers to the OSC. */
+ OBD_ALLOC(bulk_vec, niocount * sizeof(struct ptlrpc_bulk_desc *));
+ if (bulk_vec == NULL) {
+ CERROR("cannot alloc bulk desc vector\n");
+ return -ENOMEM;
+ }
+ memset(bulk_vec, 0, niocount * sizeof(struct ptlrpc_bulk_desc *));
+
+ for (i = 0; i < niocount; i++) {
+ struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
+
+ bulk_vec[i] = ptlrpc_prep_bulk(&req->rq_peer);
+ if (bulk_vec[i] == NULL) {
+ CERROR("cannot alloc bulk desc\n");
+ rc = -ENOMEM;
+ goto out;
+ }
+
+ spin_lock(&srv->srv_lock);
+ bulk_vec[i]->b_xid = srv->srv_xid++;
+ spin_unlock(&srv->srv_lock);
+
+ dst = &((struct niobuf *)res)[i];
+ dst->xid = HTON__u32(bulk_vec[i]->b_xid);
+
+ bulk_vec[i]->b_buf = (void *)(unsigned long)dst->addr;
+ bulk_vec[i]->b_buflen = PAGE_SIZE;
+ bulk_vec[i]->b_portal = OSC_BULK_PORTAL;
+ rc = ptlrpc_register_bulk(bulk_vec[i]);
+ if (rc)
+ goto out;
#if 0
/* Local delivery */
- dst = &((struct niobuf *)tmp2)[i];
- memcpy((void *)(unsigned long)dst->addr,
- (void *)(unsigned long)src->addr, PAGE_SIZE);
+ src = &((struct niobuf *)tmp2)[i];
+ memcpy((void *)(unsigned long)dst->addr,
+ (void *)(unsigned long)src->addr, src->len);
#endif
- barrier();
- }
+ }
+ barrier();
out:
if (bulk != NULL)
return 0;
}
+int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
+{
+ struct ost_req *r = req->rq_req.ost;
+ int cmd = r->cmd;
+
+ if (cmd == OBD_BRW_READ)
+ return ost_brw_read(obddev, req);
+ else
+ return ost_brw_write(obddev, req);
+}
+
int ost_brw_complete(struct ost_obd *obddev, struct ptlrpc_request *req)
{
struct obd_conn conn;