X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fosc%2Fosc_request.c;h=7b33e6c9e7b093e38ec44f5b5b998c498af1becc;hb=2ad1df3c62663ce61e82856ebe4f576f751d617d;hp=9fe2c3053094d87549ce0f07857a55125886a110;hpb=5a041e92e58f516516912a1dda4c3e8f4f624e15;p=fs%2Flustre-release.git diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 9fe2c30..7b33e6c 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -7,345 +7,346 @@ * See the file COPYING in this distribution * * Author Peter Braam - * + * * This server is single threaded at present (but can easily be multi * threaded). For testing and management it is treated as an * obd_device, although it does not export a full OBD method table * (the requests are coming in over the wire, so object target * modules do not have a full method table.) - * + * */ #define EXPORT_SYMTAB - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -#include -#include -#include -#include -#include - #define DEBUG_SUBSYSTEM S_OSC -#include -#include -#include +#include +#include #include -struct ptlrpc_client *osc_con2cl(struct obd_conn *conn) +static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl, + struct ptlrpc_connection **connection) { - struct osc_obd *osc = &conn->oc_dev->u.osc; - return osc->osc_peer; + struct osc_obd *osc = &conn->oc_dev->u.osc; + *cl = osc->osc_client; + *connection = osc->osc_conn; } static int osc_connect(struct obd_conn *conn) { - struct ptlrpc_request *request; - struct ptlrpc_client *peer = osc_con2cl(conn); - int rc; - ENTRY; - - request = ptlrpc_prep_req(peer, OST_CONNECT, 0, NULL, 0, NULL); - if (!request) { - CERROR("cannot pack req!\n"); - return -ENOMEM; - } - - request->rq_replen = - sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep); - - rc = ptlrpc_queue_wait(peer, request); - if (rc) { - EXIT; - goto out; - } - - CDEBUG(D_INODE, "received connid %d\n", request->rq_rep.ost->connid); - - conn->oc_id = request->rq_rep.ost->connid; + struct ptlrpc_request *request; + struct ptlrpc_client *cl; + struct ptlrpc_connection *connection; + struct ost_body *body; + int rc, size = sizeof(*body); + ENTRY; + + osc_con2cl(conn, &cl, &connection); + request = ptlrpc_prep_req(cl, connection, OST_CONNECT, 0, NULL, NULL); + if (!request) + RETURN(-ENOMEM); + + request->rq_replen = lustre_msg_size(1, &size); + + rc = ptlrpc_queue_wait(request); + if (rc) + GOTO(out, rc); + + body = lustre_msg_buf(request->rq_repmsg, 0); + CDEBUG(D_INODE, "received connid %d\n", body->connid); + + conn->oc_id = body->connid; + EXIT; out: - ptlrpc_free_req(request); - EXIT; - return rc; + ptlrpc_free_req(request); + return rc; } static int osc_disconnect(struct obd_conn *conn) { - struct ptlrpc_request *request; - struct ptlrpc_client *peer = osc_con2cl(conn); - int rc; - ENTRY; - - request = ptlrpc_prep_req(peer, OST_DISCONNECT, 0, NULL, 0, NULL); - if (!request) { - CERROR("cannot pack req!\n"); - return -ENOMEM; - } - request->rq_req.ost->connid = conn->oc_id; - request->rq_replen = - sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep); - - rc = ptlrpc_queue_wait(peer, request); - if (rc) { - EXIT; - goto out; - } + struct ptlrpc_request *request; + struct ptlrpc_client *cl; + struct ptlrpc_connection *connection; + struct ost_body *body; + int rc, size = sizeof(*body); + ENTRY; + + osc_con2cl(conn, &cl, &connection); + request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size, NULL); + if (!request) + RETURN(-ENOMEM); + + body = lustre_msg_buf(request->rq_reqmsg, 0); + body->connid = conn->oc_id; + + request->rq_replen = lustre_msg_size(1, &size); + + rc = ptlrpc_queue_wait(request); + GOTO(out, rc); out: - ptlrpc_free_req(request); - EXIT; - return rc; + ptlrpc_free_req(request); + return rc; } - static int osc_getattr(struct obd_conn *conn, struct obdo *oa) { - struct ptlrpc_request *request; - struct ptlrpc_client *peer = osc_con2cl(conn); - int rc; - - request = ptlrpc_prep_req(peer, OST_GETATTR, 0, NULL, 0, NULL); - if (!request) { - CERROR("cannot pack req!\n"); - return -ENOMEM; - } - - memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa)); - request->rq_req.ost->connid = conn->oc_id; - request->rq_req.ost->oa.o_valid = ~0; - request->rq_replen = - sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep); - - rc = ptlrpc_queue_wait(peer, request); - if (rc) { - EXIT; - goto out; - } - - CDEBUG(D_INODE, "mode: %o\n", request->rq_rep.ost->oa.o_mode); - if (oa) { - memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa)); - } + struct ptlrpc_request *request; + struct ptlrpc_client *cl; + struct ptlrpc_connection *connection; + struct ost_body *body; + int rc, size = sizeof(*body); + ENTRY; + + osc_con2cl(conn, &cl, &connection); + request = ptlrpc_prep_req(cl, connection, OST_GETATTR, 1, &size, NULL); + if (!request) + RETURN(-ENOMEM); + + body = lustre_msg_buf(request->rq_reqmsg, 0); + memcpy(&body->oa, oa, sizeof(*oa)); + body->connid = conn->oc_id; + body->oa.o_valid = ~0; + + request->rq_replen = lustre_msg_size(1, &size); + rc = ptlrpc_queue_wait(request); + if (rc) + GOTO(out, rc); + + body = lustre_msg_buf(request->rq_repmsg, 0); + CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); + if (oa) + memcpy(oa, &body->oa, sizeof(*oa)); + + EXIT; out: - ptlrpc_free_req(request); - return 0; + ptlrpc_free_req(request); + return 0; } static int osc_open(struct obd_conn *conn, struct obdo *oa) { - struct ptlrpc_request *request; - struct ptlrpc_client *peer = osc_con2cl(conn); - int rc; - - request = ptlrpc_prep_req(peer, OST_OPEN, 0, NULL, 0, NULL); - if (!request) { - CERROR("cannot pack req!\n"); - return -ENOMEM; - } - - memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa)); - request->rq_req.ost->connid = conn->oc_id; - if (request->rq_req.ost->oa.o_valid != (OBD_MD_FLMODE | OBD_MD_FLID)) - BUG(); - request->rq_replen = - sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep); - - rc = ptlrpc_queue_wait(peer, request); - if (rc) { - EXIT; - goto out; - } - - CDEBUG(D_INODE, "mode: %o\n", request->rq_rep.ost->oa.o_mode); - if (oa) { - memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa)); - } + struct ptlrpc_request *request; + struct ptlrpc_client *cl; + struct ptlrpc_connection *connection; + struct ost_body *body; + int rc, size = sizeof(*body); + ENTRY; + + osc_con2cl(conn, &cl, &connection); + request = ptlrpc_prep_req(cl, connection, OST_OPEN, 1, &size, NULL); + if (!request) + RETURN(-ENOMEM); + body = lustre_msg_buf(request->rq_reqmsg, 0); + memcpy(&body->oa, oa, sizeof(*oa)); + body->connid = conn->oc_id; + if (body->oa.o_valid != (OBD_MD_FLMODE | OBD_MD_FLID)) + LBUG(); + + request->rq_replen = lustre_msg_size(1, &size); + + rc = ptlrpc_queue_wait(request); + if (rc) + GOTO(out, rc); + + body = lustre_msg_buf(request->rq_repmsg, 0); + CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); + if (oa) + memcpy(oa, &body->oa, sizeof(*oa)); + + EXIT; out: - ptlrpc_free_req(request); - return 0; + ptlrpc_free_req(request); + return 0; } static int osc_close(struct obd_conn *conn, struct obdo *oa) { - struct ptlrpc_request *request; - struct ptlrpc_client *peer = osc_con2cl(conn); - int rc; - - request = ptlrpc_prep_req(peer, OST_CLOSE, 0, NULL, 0, NULL); - if (!request) { - CERROR("cannot pack req!\n"); - return -ENOMEM; - } - - memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa)); - request->rq_req.ost->connid = conn->oc_id; - - request->rq_replen = - sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep); - - rc = ptlrpc_queue_wait(peer, request); - if (rc) { - EXIT; - goto out; - } - - CDEBUG(D_INODE, "mode: %o\n", request->rq_rep.ost->oa.o_mode); - if (oa) { - memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa)); - } + struct ptlrpc_request *request; + struct ptlrpc_client *cl; + struct ptlrpc_connection *connection; + struct ost_body *body; + int rc, size = sizeof(*body); + ENTRY; + osc_con2cl(conn, &cl, &connection); + request = ptlrpc_prep_req(cl, connection, OST_CLOSE, 1, &size, NULL); + if (!request) + RETURN(-ENOMEM); + + body = lustre_msg_buf(request->rq_reqmsg, 0); + memcpy(&body->oa, oa, sizeof(*oa)); + body->connid = conn->oc_id; + + request->rq_replen = lustre_msg_size(1, &size); + + rc = ptlrpc_queue_wait(request); + if (rc) + GOTO(out, rc); + + body = lustre_msg_buf(request->rq_repmsg, 0); + CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); + if (oa) + memcpy(oa, &body->oa, sizeof(*oa)); + + EXIT; out: - ptlrpc_free_req(request); - return 0; + ptlrpc_free_req(request); + return 0; } static int osc_setattr(struct obd_conn *conn, struct obdo *oa) { - struct ptlrpc_request *request; - struct ptlrpc_client *peer = osc_con2cl(conn); - int rc; - - request = ptlrpc_prep_req(peer, OST_SETATTR, 0, NULL, 0, NULL); - if (!request) { - CERROR("cannot pack req!\n"); - return -ENOMEM; - } - - memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa)); - request->rq_req.ost->connid = conn->oc_id; - request->rq_replen = - sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep); - - rc = ptlrpc_queue_wait(peer, request); - if (rc) { - EXIT; - goto out; - } + struct ptlrpc_request *request; + struct ptlrpc_client *cl; + struct ptlrpc_connection *connection; + struct ost_body *body; + int rc, size = sizeof(*body); + ENTRY; + + osc_con2cl(conn, &cl, &connection); + request = ptlrpc_prep_req(cl, connection, OST_SETATTR, 1, &size, NULL); + if (!request) + RETURN(-ENOMEM); + + body = lustre_msg_buf(request->rq_reqmsg, 0); + memcpy(&body->oa, oa, sizeof(*oa)); + body->connid = conn->oc_id; + + request->rq_replen = lustre_msg_size(1, &size); + + rc = ptlrpc_queue_wait(request); + GOTO(out, rc); out: - ptlrpc_free_req(request); - return 0; + ptlrpc_free_req(request); + return 0; } static int osc_create(struct obd_conn *conn, struct obdo *oa) { - struct ptlrpc_request *request; - struct ptlrpc_client *peer = osc_con2cl(conn); - int rc; - - if (!oa) { - CERROR("oa NULL\n"); - } - request = ptlrpc_prep_req(peer, OST_CREATE, 0, NULL, 0, NULL); - if (!request) { - CERROR("cannot pack req!\n"); - return -ENOMEM; - } - - memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa)); - request->rq_req.ost->connid = conn->oc_id; - request->rq_req.ost->oa.o_valid = ~0; - request->rq_replen = - sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep); - - rc = ptlrpc_queue_wait(peer, request); - if (rc) { - EXIT; - goto out; - } - memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa)); + struct ptlrpc_request *request; + struct ptlrpc_client *cl; + struct ptlrpc_connection *connection; + struct ost_body *body; + int rc, size = sizeof(*body); + ENTRY; + + if (!oa) { + CERROR("oa NULL\n"); + RETURN(-EINVAL); + } + osc_con2cl(conn, &cl, &connection); + request = ptlrpc_prep_req(cl, connection, OST_CREATE, 1, &size, NULL); + if (!request) + RETURN(-ENOMEM); + + body = lustre_msg_buf(request->rq_reqmsg, 0); + memcpy(&body->oa, oa, sizeof(*oa)); + body->oa.o_valid = ~0; + body->connid = conn->oc_id; + + request->rq_replen = lustre_msg_size(1, &size); + rc = ptlrpc_queue_wait(request); + if (rc) + GOTO(out, rc); + + body = lustre_msg_buf(request->rq_repmsg, 0); + memcpy(oa, &body->oa, sizeof(*oa)); + + EXIT; out: - ptlrpc_free_req(request); - return 0; + ptlrpc_free_req(request); + return 0; } static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count, obd_off offset) { - struct ptlrpc_request *request; - struct ptlrpc_client *peer = osc_con2cl(conn); - int rc; - - if (!oa) { - CERROR("oa NULL\n"); - } - request = ptlrpc_prep_req(peer, OST_PUNCH, 0, NULL, 0, NULL); - if (!request) { - CERROR("cannot pack req!\n"); - return -ENOMEM; - } - - memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa)); - request->rq_req.ost->connid = conn->oc_id; - request->rq_req.ost->oa.o_valid = ~0; - request->rq_req.ost->oa.o_size = offset; - request->rq_req.ost->oa.o_blocks = count; - request->rq_replen = - sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep); - - rc = ptlrpc_queue_wait(peer, request); - if (rc) { - EXIT; - goto out; - } - memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa)); + struct ptlrpc_request *request; + struct ptlrpc_client *cl; + struct ptlrpc_connection *connection; + struct ost_body *body; + int rc, size = sizeof(*body); + ENTRY; + + if (!oa) { + CERROR("oa NULL\n"); + RETURN(-EINVAL); + } + osc_con2cl(conn, &cl, &connection); + request = ptlrpc_prep_req(cl, connection, OST_PUNCH, 1, &size, NULL); + if (!request) + RETURN(-ENOMEM); + + body = lustre_msg_buf(request->rq_reqmsg, 0); + memcpy(&body->oa, oa, sizeof(*oa)); + body->connid = conn->oc_id; + body->oa.o_valid = ~0; + body->oa.o_size = offset; + body->oa.o_blocks = count; + + request->rq_replen = lustre_msg_size(1, &size); + + rc = ptlrpc_queue_wait(request); + if (rc) + GOTO(out, rc); + body = lustre_msg_buf(request->rq_repmsg, 0); + memcpy(oa, &body->oa, sizeof(*oa)); + + EXIT; out: - ptlrpc_free_req(request); - return 0; + ptlrpc_free_req(request); + return 0; } static int osc_destroy(struct obd_conn *conn, struct obdo *oa) { - struct ptlrpc_request *request; - struct ptlrpc_client *peer = osc_con2cl(conn); - int rc; - - if (!oa) { - CERROR("oa NULL\n"); - } - request = ptlrpc_prep_req(peer, OST_DESTROY, 0, NULL, 0, NULL); - if (!request) { - CERROR("cannot pack req!\n"); - return -ENOMEM; - } - - memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa)); - request->rq_req.ost->connid = conn->oc_id; - request->rq_req.ost->oa.o_valid = ~0; - request->rq_replen = - sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep); - - rc = ptlrpc_queue_wait(peer, request); - if (rc) { - EXIT; - goto out; - } - memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa)); + struct ptlrpc_request *request; + struct ptlrpc_client *cl; + struct ptlrpc_connection *connection; + struct ost_body *body; + int rc, size = sizeof(*body); + ENTRY; + + if (!oa) { + CERROR("oa NULL\n"); + RETURN(-EINVAL); + } + osc_con2cl(conn, &cl, &connection); + request = ptlrpc_prep_req(cl, connection, OST_DESTROY, 1, &size, NULL); + if (!request) + RETURN(-ENOMEM); + + body = lustre_msg_buf(request->rq_reqmsg, 0); + memcpy(&body->oa, oa, sizeof(*oa)); + body->connid = conn->oc_id; + body->oa.o_valid = ~0; + + request->rq_replen = lustre_msg_size(1, &size); + + rc = ptlrpc_queue_wait(request); + if (rc) + GOTO(out, rc); + + body = lustre_msg_buf(request->rq_repmsg, 0); + memcpy(oa, &body->oa, sizeof(*oa)); + EXIT; out: - ptlrpc_free_req(request); - return 0; + ptlrpc_free_req(request); + return 0; } int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req, struct niobuf *dst, struct niobuf *src) { - struct ptlrpc_client *cl = osc_con2cl(conn); + struct ptlrpc_client *cl; + struct ptlrpc_connection *connection; + + osc_con2cl(conn, &cl, &connection); if (cl->cli_obd) { /* local sendpage */ @@ -355,29 +356,29 @@ int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req, struct ptlrpc_bulk_desc *bulk; int rc; - bulk = ptlrpc_prep_bulk(&cl->cli_server); + bulk = ptlrpc_prep_bulk(connection); if (bulk == NULL) - return -ENOMEM; + RETURN(-ENOMEM); bulk->b_buf = (void *)(unsigned long)src->addr; bulk->b_buflen = src->len; bulk->b_xid = dst->xid; - rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL); + rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL); if (rc != 0) { CERROR("send_bulk failed: %d\n", rc); - BUG(); - return rc; + ptlrpc_free_bulk(bulk); + LBUG(); + RETURN(rc); } wait_event_interruptible(bulk->b_waitq, ptlrpc_check_bulk_sent(bulk)); - if (bulk->b_flags == PTL_RPC_INTR) { - EXIT; - /* FIXME: hey hey, we leak here. */ - return -EINTR; + if (bulk->b_flags & PTL_RPC_FL_INTR) { + ptlrpc_free_bulk(bulk); + RETURN(-EINTR); } - OBD_FREE(bulk, sizeof(*bulk)); + ptlrpc_free_bulk(bulk); } return 0; @@ -387,85 +388,79 @@ int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, obd_count *oa_bufs, struct page **buf, obd_size *count, obd_off *offset, obd_flag *flags) { - struct ptlrpc_client *cl = osc_con2cl(conn); + struct ptlrpc_client *cl; + struct ptlrpc_connection *connection; struct ptlrpc_request *request; - int pages; - int rc; - struct obd_ioobj ioo; - struct niobuf src; - int size1, size2 = 0; - void *ptr1, *ptr2; - int i, j, n; + struct ost_body *body; + struct obd_ioobj ioo; + struct niobuf src; + int pages, rc, i, j, size[3] = {sizeof(*body)}; + void *ptr1, *ptr2; struct ptlrpc_bulk_desc **bulk; + ENTRY; - size1 = num_oa * sizeof(ioo); + size[1] = num_oa * sizeof(ioo); pages = 0; for (i = 0; i < num_oa; i++) pages += oa_bufs[i]; - size2 = pages * sizeof(src); - - request = ptlrpc_prep_req(cl, OST_BRW, size1, NULL, size2, NULL); - if (!request) { - CERROR("cannot pack req!\n"); - return -ENOMEM; - } - request->rq_req.ost->cmd = OBD_BRW_READ; - - OBD_ALLOC(bulk, pages * sizeof(struct ptlrpc_bulk_desc *)); - if (bulk == NULL) { - CERROR("cannot alloc bulk desc vector\n"); - return -ENOMEM; - } - memset(bulk, 0, pages * sizeof(struct ptlrpc_bulk_desc *)); - - n = 0; - ptr1 = ost_req_buf1(request->rq_req.ost); - ptr2 = ost_req_buf2(request->rq_req.ost); - for (i = 0; i < num_oa; i++) { - ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); - for (j = 0; j < oa_bufs[i]; j++) { - bulk[n] = ptlrpc_prep_bulk(&cl->cli_server); - if (bulk[n] == NULL) { - CERROR("cannot alloc bulk desc\n"); - rc = -ENOMEM; - goto out; - } - - spin_lock(&cl->cli_lock); - bulk[n]->b_xid = cl->cli_xid++; - spin_unlock(&cl->cli_lock); - bulk[n]->b_buf = kmap(buf[n]); - bulk[n]->b_buflen = PAGE_SIZE; - bulk[n]->b_portal = OST_BULK_PORTAL; - ost_pack_niobuf(&ptr2, bulk[n]->b_buf, offset[n], - count[n], flags[n], bulk[n]->b_xid); - - rc = ptlrpc_register_bulk(bulk[n]); + size[2] = pages * sizeof(src); + + OBD_ALLOC(bulk, pages * sizeof(*bulk)); + if (bulk == NULL) + RETURN(-ENOMEM); + + osc_con2cl(conn, &cl, &connection); + request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL); + if (!request) + GOTO(out, rc = -ENOMEM); + + body = lustre_msg_buf(request->rq_reqmsg, 0); + body->data = OBD_BRW_READ; + + ptr1 = lustre_msg_buf(request->rq_reqmsg, 1); + ptr2 = lustre_msg_buf(request->rq_reqmsg, 2); + for (pages = 0, i = 0; i < num_oa; i++) { + ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); + for (j = 0; j < oa_bufs[i]; j++, pages++) { + bulk[pages] = ptlrpc_prep_bulk(connection); + if (bulk[pages] == NULL) + GOTO(out, rc = -ENOMEM); + + spin_lock(&connection->c_lock); + bulk[pages]->b_xid = ++connection->c_xid_out; + spin_unlock(&connection->c_lock); + + bulk[pages]->b_buf = kmap(buf[pages]); + bulk[pages]->b_buflen = PAGE_SIZE; + bulk[pages]->b_portal = OST_BULK_PORTAL; + ost_pack_niobuf(&ptr2, bulk[pages]->b_buf, + offset[pages], count[pages], + flags[pages], bulk[pages]->b_xid); + + rc = ptlrpc_register_bulk(bulk[pages]); if (rc) - goto out; - n++; + GOTO(out, rc); } } - request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep); - rc = ptlrpc_queue_wait(cl, request); + request->rq_replen = lustre_msg_size(1, size); + rc = ptlrpc_queue_wait(request); + GOTO(out, rc); out: /* FIXME: if we've called ptlrpc_wait_bulk but rc != 0, we need to * abort those bulk listeners. */ - n = 0; - for (i = 0; i < num_oa; i++) { - for (j = 0; j < oa_bufs[i]; j++) { - if (bulk[n] == NULL) + for (pages = 0, i = 0; i < num_oa; i++) { + for (j = 0; j < oa_bufs[i]; j++, pages++) { + if (bulk[pages] == NULL) continue; - kunmap(buf[n]); - OBD_FREE(bulk[n], sizeof(struct ptlrpc_bulk_desc)); - n++; + kunmap(buf[pages]); + ptlrpc_free_bulk(bulk[pages]); } } - OBD_FREE(bulk, pages * sizeof(struct ptlrpc_bulk_desc *)); + OBD_FREE(bulk, pages * sizeof(*bulk)); ptlrpc_free_req(request); return rc; } @@ -474,165 +469,167 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, obd_count *oa_bufs, struct page **buf, obd_size *count, obd_off *offset, obd_flag *flags) { - struct ptlrpc_client *cl = osc_con2cl(conn); + struct ptlrpc_client *cl; + struct ptlrpc_connection *connection; struct ptlrpc_request *request; - struct obd_ioobj ioo; - struct niobuf *src; - int pages, rc, i, j, n, size1, size2 = 0; - void *ptr1, *ptr2; + struct obd_ioobj ioo; + struct ost_body *body; + struct niobuf *src; + int pages, rc, i, j, size[3] = {sizeof(*body)}; + void *ptr1, *ptr2; + ENTRY; - size1 = num_oa * sizeof(ioo); + size[1] = num_oa * sizeof(ioo); pages = 0; for (i = 0; i < num_oa; i++) pages += oa_bufs[i]; - size2 = pages * sizeof(*src); + size[2] = pages * sizeof(*src); + + OBD_ALLOC(src, size[2]); + if (!src) + RETURN(-ENOMEM); - OBD_ALLOC(src, size2); - if (!src) { - CERROR("no src memory\n"); - return -ENOMEM; + osc_con2cl(conn, &cl, &connection); + request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL); + if (!request) + RETURN(-ENOMEM); + body = lustre_msg_buf(request->rq_reqmsg, 0); + body->data = OBD_BRW_WRITE; + + ptr1 = lustre_msg_buf(request->rq_reqmsg, 1); + ptr2 = lustre_msg_buf(request->rq_reqmsg, 2); + for (pages = 0, i = 0; i < num_oa; i++) { + ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); + for (j = 0; j < oa_bufs[i]; j++, pages++) { + ost_pack_niobuf(&ptr2, kmap(buf[pages]), offset[pages], + count[pages], flags[pages], 0); + } } - memset((char *)src, 0, size2); - - request = ptlrpc_prep_req(cl, OST_BRW, size1, NULL, size2, NULL); - if (!request) { - CERROR("cannot pack req!\n"); - return -ENOMEM; - } - request->rq_req.ost->cmd = OBD_BRW_WRITE; - - n = 0; - ptr1 = ost_req_buf1(request->rq_req.ost); - ptr2 = ost_req_buf2(request->rq_req.ost); - for (i = 0; i < num_oa; i++) { - ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); - for (j = 0; j < oa_bufs[i]; j++) { - ost_pack_niobuf(&ptr2, kmap(buf[n]), offset[n], - count[n], flags[n], 0); - n++; - } - } - memcpy((char *)src, (char *)ost_req_buf2(request->rq_req.ost), size2); - - request->rq_replen = sizeof(struct ptlrep_hdr) + - sizeof(struct ost_rep) + pages * sizeof(struct niobuf); - rc = ptlrpc_queue_wait(cl, request); - if (rc) { - EXIT; - goto out; - } - - ptr2 = ost_rep_buf2(request->rq_rep.ost); - if (request->rq_rep.ost->buflen2 != n * sizeof(struct niobuf)) { + memcpy(src, lustre_msg_buf(request->rq_reqmsg, 2), size[2]); + + size[1] = pages * sizeof(struct niobuf); + request->rq_replen = lustre_msg_size(2, size); + + rc = ptlrpc_queue_wait(request); + if (rc) + GOTO(out, rc); + + ptr2 = lustre_msg_buf(request->rq_repmsg, 1); + if (ptr2 == NULL) + GOTO(out, rc = -EINVAL); + + if (request->rq_repmsg->buflens[1] != pages * sizeof(struct niobuf)) { CERROR("buffer length wrong (%d vs. %d)\n", - request->rq_rep.ost->buflen2, n * sizeof(struct niobuf)); - EXIT; - goto out; + request->rq_repmsg->buflens[1], + pages * sizeof(struct niobuf)); + GOTO(out, rc = -EINVAL); } - n = 0; - for (i = 0; i < num_oa; i++) { - for (j = 0; j < oa_bufs[i]; j++) { - struct niobuf *dst; - ost_unpack_niobuf(&ptr2, &dst); - osc_sendpage(conn, request, dst, &src[n]); - n++; - } - } - OBD_FREE(src, size2); + for (pages = 0, i = 0; i < num_oa; i++) { + for (j = 0; j < oa_bufs[i]; j++, pages++) { + struct niobuf *dst; + ost_unpack_niobuf(&ptr2, &dst); + osc_sendpage(conn, request, dst, &src[pages]); + } + } + OBD_FREE(src, size[2]); out: - n = 0; - for (i = 0; i < num_oa; i++) { - for (j = 0; j < oa_bufs[i]; j++) { - kunmap(buf[n]); - n++; - } - } - - ptlrpc_free_req(request); - return 0; + for (pages = 0, i = 0; i < num_oa; i++) + for (j = 0; j < oa_bufs[i]; j++, pages++) + kunmap(buf[pages]); + + ptlrpc_free_req(request); + return 0; } int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa, - struct obdo **oa, obd_count *oa_bufs, struct page **buf, - obd_size *count, obd_off *offset, obd_flag *flags) + struct obdo **oa, obd_count *oa_bufs, struct page **buf, + obd_size *count, obd_off *offset, obd_flag *flags) { - if (rw == OBD_BRW_READ) { + if (rw == OBD_BRW_READ) return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count, offset, flags); - } else { + else return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count, offset, flags); - } } -/* mount the file system (secretly) */ -static int osc_setup(struct obd_device *obddev, obd_count len, - void *buf) - +static int osc_setup(struct obd_device *obddev, obd_count len, void *buf) { - struct osc_obd *osc = &obddev->u.osc; - struct obd_ioctl_data *data = (struct obd_ioctl_data *)buf; - int rc; - int dev = data->ioc_dev; + struct osc_obd *osc = &obddev->u.osc; + int rc; ENTRY; - OBD_ALLOC(osc->osc_peer, sizeof(*osc->osc_peer)); - if (osc->osc_peer == NULL) - RETURN(-ENOMEM); + osc->osc_conn = ptlrpc_uuid_to_connection("ost"); + if (!osc->osc_conn) + RETURN(-EINVAL); + + OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client)); + if (osc->osc_client == NULL) + GOTO(out_conn, rc = -ENOMEM); - rc = ptlrpc_connect_client(dev, "ost", - OST_REQUEST_PORTAL, - OSC_REPLY_PORTAL, - ost_pack_req, - ost_unpack_rep, - osc->osc_peer); + OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client)); + if (osc->osc_ldlm_client == NULL) + GOTO(out_client, rc = -ENOMEM); - if (rc == 0) - MOD_INC_USE_COUNT; - RETURN(rc); -} + ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL, + osc->osc_client); + ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, + osc->osc_ldlm_client); + + MOD_INC_USE_COUNT; + RETURN(0); + + out_client: + OBD_FREE(osc->osc_client, sizeof(*osc->osc_client)); + out_conn: + ptlrpc_put_connection(osc->osc_conn); + return rc; +} static int osc_cleanup(struct obd_device * obddev) { - struct osc_obd *osc = &obddev->u.osc; + struct osc_obd *osc = &obddev->u.osc; - if (osc->osc_peer != NULL) - OBD_FREE(osc->osc_peer, sizeof(*osc->osc_peer)); + ptlrpc_cleanup_client(osc->osc_client); + OBD_FREE(osc->osc_client, sizeof(*osc->osc_client)); + ptlrpc_cleanup_client(osc->osc_ldlm_client); + OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client)); + ptlrpc_put_connection(osc->osc_conn); MOD_DEC_USE_COUNT; return 0; } -struct obd_ops osc_obd_ops = { - o_setup: osc_setup, - o_cleanup: osc_cleanup, - o_create: osc_create, - o_destroy: osc_destroy, - o_getattr: osc_getattr, - o_setattr: osc_setattr, - o_open: osc_open, - o_close: osc_close, - o_connect: osc_connect, - o_disconnect: osc_disconnect, - o_brw: osc_brw, - o_punch: osc_punch +struct obd_ops osc_obd_ops = { + o_setup: osc_setup, + o_cleanup: osc_cleanup, + o_create: osc_create, + o_destroy: osc_destroy, + o_getattr: osc_getattr, + o_setattr: osc_setattr, + o_open: osc_open, + o_close: osc_close, + o_connect: osc_connect, + o_disconnect: osc_disconnect, + o_brw: osc_brw, + o_punch: osc_punch }; static int __init osc_init(void) { obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME); - return 0; + return 0; } static void __exit osc_exit(void) { - obd_unregister_type(LUSTRE_OSC_NAME); + obd_unregister_type(LUSTRE_OSC_NAME); } MODULE_AUTHOR("Peter J. Braam "); MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0"); -MODULE_LICENSE("GPL"); +MODULE_LICENSE("GPL"); module_init(osc_init); module_exit(osc_exit);