From 51571a86a14d094502ab18e28f1812aa91ff5334 Mon Sep 17 00:00:00 2001 From: pschwan Date: Sun, 3 Mar 2002 05:46:50 +0000 Subject: [PATCH] *** empty log message *** --- lustre/osc/osc_request.c | 58 ++++++------ lustre/ost/ost_handler.c | 242 +++++++++++++++++++++++++++++++---------------- 2 files changed, 190 insertions(+), 110 deletions(-) diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 824d36a..1b93efe 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -273,37 +273,23 @@ static int osc_destroy(struct obd_conn *conn, struct obdo *oa) int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req, struct niobuf *dst, struct niobuf *src) { - if (conn->oc_id != -1) { + struct ptlrpc_client *cl = osc_con2cl(conn); + + if (cl->cli_obd) { /* local sendpage */ memcpy((char *)(unsigned long)dst->addr, (char *)(unsigned long)src->addr, src->len); } else { - struct ptlrpc_client *cl = osc_con2cl(conn); struct ptlrpc_bulk_desc *bulk; - char *buf; int rc; bulk = ptlrpc_prep_bulk(&cl->cli_server); if (bulk == NULL) return -ENOMEM; - spin_lock(&cl->cli_lock); - bulk->b_xid = cl->cli_xid++; - spin_unlock(&cl->cli_lock); - - OBD_ALLOC(buf, src->len); - if (!buf) { - OBD_FREE(bulk, sizeof(*bulk)); - return -ENOMEM; - } - - memcpy(buf, (char *)(unsigned long)src->addr, src->len); - - bulk->b_buf = buf; + bulk->b_buf = (void *)(unsigned long)src->addr; bulk->b_buflen = src->len; - /* FIXME: maybe we should add an XID to struct niobuf? */ - bulk->b_xid = (__u32)(unsigned long)src->page; - + bulk->b_xid = dst->xid; rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL); if (rc != 0) { CERROR("send_bulk failed: %d\n", rc); @@ -320,7 +306,6 @@ int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req, } OBD_FREE(bulk, sizeof(*bulk)); - OBD_FREE(buf, src->len); } return 0; @@ -429,11 +414,11 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, obd_off *offset, obd_flag *flags) { struct ptlrpc_client *cl = osc_con2cl(conn); - struct ptlrpc_request *request; + struct ptlrpc_request *request, *req2 = NULL; struct obd_ioobj ioo; struct niobuf src; int pages, rc, i, j, n, size1, size2 = 0; - void *ptr1, *ptr2; + void *ptr1, *ptr2, *reqbuf; size1 = num_oa * sizeof(ioo); pages = 0; @@ -447,6 +432,11 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, CERROR("cannot pack req!\n"); return -ENOMEM; } + OBD_ALLOC(reqbuf, request->rq_reqlen); + if (reqbuf == NULL) { + CERROR("cannot make duplicate buffer\n"); + return -ENOMEM; + } request->rq_req.ost->cmd = OBD_BRW_WRITE; n = 0; @@ -461,6 +451,7 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, } } + memcpy(reqbuf, request->rq_reqbuf, request->rq_reqlen); request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep) + pages * sizeof(struct niobuf); rc = ptlrpc_queue_wait(cl, request); @@ -477,6 +468,7 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, goto out; } + n = 0; for (i = 0; i < num_oa; i++) { for (j = 0; j < oa_bufs[i]; j++) { struct niobuf *dst; @@ -488,13 +480,17 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, } } - /* Reuse the request structure for the completion request. */ - OBD_FREE(request->rq_rephdr, request->rq_replen); - request->rq_rephdr = NULL; - request->rq_repbuf = NULL; - request->rq_reqhdr->opc = OST_BRW_COMPLETE; - request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep); - rc = ptlrpc_queue_wait(cl, request); + ptr2 = ost_rep_buf2(request->rq_rep.ost); + req2 = ptlrpc_prep_req(cl, OST_BRW_COMPLETE, size1, ptr1, + request->rq_rep.ost->buflen2, ptr2); + if (!req2) { + CERROR("cannot pack second request!\n"); + return -ENOMEM; + } + + req2->rq_reqhdr->opc = OST_BRW_COMPLETE; + req2->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep); + rc = ptlrpc_queue_wait(cl, req2); if (rc) { EXIT; goto out; @@ -503,6 +499,8 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, out: if (request->rq_rephdr) OBD_FREE(request->rq_rephdr, request->rq_replen); + if (req2 && req2->rq_rephdr) + OBD_FREE(req2->rq_rephdr, req2->rq_replen); n = 0; for (i = 0; i < num_oa; i++) { for (j = 0; j < oa_bufs[i]; j++) { @@ -511,6 +509,8 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, } } + if (req2) + ptlrpc_free_req(req2); ptlrpc_free_req(request); return 0; } diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 4e128bf..46b2913 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -258,7 +258,7 @@ static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req) return 0; } -int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req) +static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req) { struct ptlrpc_bulk_desc **bulk_vec = NULL; struct ptlrpc_bulk_desc *bulk = NULL; @@ -267,9 +267,9 @@ int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req) int i, j; int objcount, niocount; char *tmp1, *tmp2, *end2; - char *res; + char *res = NULL; int cmd; - struct niobuf *nb, *src, *dst; + struct niobuf *nb, *src; struct obd_ioobj *ioo; struct ost_req *r = req->rq_req.ost; @@ -296,10 +296,10 @@ int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req) } } - rc = ost_pack_rep(NULL, 0, NULL, niocount * sizeof(*nb), + rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, &req->rq_replen, &req->rq_repbuf); - if (rc) { + if (rc) { CERROR("cannot pack reply\n"); return rc; } @@ -321,93 +321,162 @@ int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req) goto out; } - if (cmd == OBD_BRW_WRITE) { - /* Setup buffers for the incoming pages, then send the niobufs - * describing those buffers to the OSC. */ - OBD_ALLOC(bulk_vec, - niocount * sizeof(struct ptlrpc_bulk_desc *)); - if (bulk_vec == NULL) { - CERROR("cannot alloc bulk desc vector\n"); - return -ENOMEM; + for (i = 0; i < niocount; i++) { + bulk = ptlrpc_prep_bulk(&req->rq_peer); + if (bulk == NULL) { + CERROR("cannot alloc bulk desc\n"); + rc = -ENOMEM; + goto out; } - memset(bulk_vec, 0, - niocount * sizeof(struct ptlrpc_bulk_desc *)); - for (i = 0; i < niocount; i++) { - struct ptlrpc_service *srv = - req->rq_obd->u.ost.ost_service; - - bulk_vec[i] = ptlrpc_prep_bulk(&req->rq_peer); - if (bulk_vec[i] == NULL) { - CERROR("cannot alloc bulk desc\n"); - rc = -ENOMEM; - goto out; - } - - spin_lock(&srv->srv_lock); - bulk_vec[i]->b_xid = srv->srv_xid++; - spin_unlock(&srv->srv_lock); - - dst = &((struct niobuf *)res)[i]; - /* FIXME: we overload ->page with the xid of this buffer - * for the benefit of the remote client */ - dst->page = - (void *)(unsigned long)HTON__u64(bulk_vec[i]->b_xid); - - bulk_vec[i]->b_buf = (void *)(unsigned long)dst->addr; - bulk_vec[i]->b_buflen = PAGE_SIZE; - bulk_vec[i]->b_portal = OSC_BULK_PORTAL; - rc = ptlrpc_register_bulk(bulk_vec[i]); - if (rc) - goto out; + src = &((struct niobuf *)tmp2)[i]; + + bulk->b_xid = src->xid; + bulk->b_buf = (void *)(unsigned long)src->addr; + bulk->b_buflen = PAGE_SIZE; + rc = ptlrpc_send_bulk(bulk, OST_BULK_PORTAL); + if (rc) { + EXIT; + goto out; + } + wait_event_interruptible(bulk->b_waitq, + ptlrpc_check_bulk_sent(bulk)); + + if (bulk->b_flags == PTL_RPC_INTR) { + EXIT; + goto out; + } + + OBD_FREE(bulk, sizeof(*bulk)); + bulk = NULL; + } #if 0 - /* Local delivery */ - src = &((struct niobuf *)tmp2)[i]; - memcpy((void *)(unsigned long)dst->addr, - (void *)(unsigned long)src->addr, src->len); + /* Local delivery */ + dst = &((struct niobuf *)tmp2)[i]; + memcpy((void *)(unsigned long)dst->addr, + (void *)(unsigned long)src->addr, PAGE_SIZE); #endif - } - barrier(); - } else { + barrier(); + + out: + if (res != NULL) + OBD_FREE(res, sizeof(struct niobuf) * niocount); + if (bulk != NULL) + OBD_FREE(bulk, sizeof(*bulk)); + if (bulk_vec != NULL) { for (i = 0; i < niocount; i++) { - bulk = ptlrpc_prep_bulk(&req->rq_peer); - if (bulk == NULL) { - CERROR("cannot alloc bulk desc\n"); - rc = -ENOMEM; - goto out; - } - - src = &((struct niobuf *)tmp2)[i]; - - bulk->b_xid = src->xid; - bulk->b_buf = (void *)(unsigned long)src->addr; - bulk->b_buflen = PAGE_SIZE; - rc = ptlrpc_send_bulk(bulk, OST_BULK_PORTAL); - if (rc) { - EXIT; - goto out; - } - wait_event_interruptible(bulk->b_waitq, - ptlrpc_check_bulk_sent(bulk)); - - if (bulk->b_flags == PTL_RPC_INTR) { - EXIT; - goto out; - } - - OBD_FREE(bulk, sizeof(*bulk)); - bulk = NULL; + if (bulk_vec[i] != NULL) + OBD_FREE(bulk_vec[i], sizeof(*bulk)); } + OBD_FREE(bulk_vec, + niocount * sizeof(struct ptlrpc_bulk_desc *)); + } + + EXIT; + return 0; +} + +int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req) +{ + struct ptlrpc_bulk_desc **bulk_vec = NULL; + struct ptlrpc_bulk_desc *bulk = NULL; + struct obd_conn conn; + int rc; + int i, j; + int objcount, niocount; + char *tmp1, *tmp2, *end2; + char *res; + int cmd; + struct niobuf *nb, *dst; + struct obd_ioobj *ioo; + struct ost_req *r = req->rq_req.ost; + + ENTRY; + + tmp1 = ost_req_buf1(r); + tmp2 = ost_req_buf2(r); + end2 = tmp2 + req->rq_req.ost->buflen2; + objcount = r->buflen1 / sizeof(*ioo); + niocount = r->buflen2 / sizeof(*nb); + cmd = r->cmd; + + conn.oc_id = req->rq_req.ost->connid; + conn.oc_dev = req->rq_obd->u.ost.ost_tgt; + + for (i = 0; i < objcount; i++) { + ost_unpack_ioo((void *)&tmp1, &ioo); + if (tmp2 + ioo->ioo_bufcnt > end2) { + rc = -EFAULT; + break; + } + for (j = 0; j < ioo->ioo_bufcnt; j++) { + ost_unpack_niobuf((void *)&tmp2, &nb); + } + } + + rc = ost_pack_rep(NULL, 0, NULL, niocount * sizeof(*nb), + &req->rq_rephdr, &req->rq_rep, + &req->rq_replen, &req->rq_repbuf); + if (rc) { + CERROR("cannot pack reply\n"); + return rc; + } + res = ost_rep_buf2(req->rq_rep.ost); + + /* The unpackers move tmp1 and tmp2, so reset them before using */ + tmp1 = ost_req_buf1(r); + tmp2 = ost_req_buf2(r); + req->rq_rep.ost->result = obd_preprw + (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, + niocount, (struct niobuf *)tmp2, (struct niobuf *)res); + + if (req->rq_rep.ost->result) { + EXIT; + goto out; + } + + /* Setup buffers for the incoming pages, then send the niobufs + * describing those buffers to the OSC. */ + OBD_ALLOC(bulk_vec, niocount * sizeof(struct ptlrpc_bulk_desc *)); + if (bulk_vec == NULL) { + CERROR("cannot alloc bulk desc vector\n"); + return -ENOMEM; + } + memset(bulk_vec, 0, niocount * sizeof(struct ptlrpc_bulk_desc *)); + + for (i = 0; i < niocount; i++) { + struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service; + + bulk_vec[i] = ptlrpc_prep_bulk(&req->rq_peer); + if (bulk_vec[i] == NULL) { + CERROR("cannot alloc bulk desc\n"); + rc = -ENOMEM; + goto out; + } + + spin_lock(&srv->srv_lock); + bulk_vec[i]->b_xid = srv->srv_xid++; + spin_unlock(&srv->srv_lock); + + dst = &((struct niobuf *)res)[i]; + dst->xid = HTON__u32(bulk_vec[i]->b_xid); + + bulk_vec[i]->b_buf = (void *)(unsigned long)dst->addr; + bulk_vec[i]->b_buflen = PAGE_SIZE; + bulk_vec[i]->b_portal = OSC_BULK_PORTAL; + rc = ptlrpc_register_bulk(bulk_vec[i]); + if (rc) + goto out; #if 0 /* Local delivery */ - dst = &((struct niobuf *)tmp2)[i]; - memcpy((void *)(unsigned long)dst->addr, - (void *)(unsigned long)src->addr, PAGE_SIZE); + src = &((struct niobuf *)tmp2)[i]; + memcpy((void *)(unsigned long)dst->addr, + (void *)(unsigned long)src->addr, src->len); #endif - barrier(); - } + } + barrier(); out: if (bulk != NULL) @@ -425,6 +494,17 @@ int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req) return 0; } +int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req) +{ + struct ost_req *r = req->rq_req.ost; + int cmd = r->cmd; + + if (cmd == OBD_BRW_READ) + return ost_brw_read(obddev, req); + else + return ost_brw_write(obddev, req); +} + int ost_brw_complete(struct ost_obd *obddev, struct ptlrpc_request *req) { struct obd_conn conn; -- 1.8.3.1