X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fost%2Fost_handler.c;h=92827527ef77faa61749bb35e01e3e89aa6114e1;hp=b81caf12e31199cd0854f00da40af0963edf756c;hb=400b0681017091fab9cef9bd00e0f536e1793dcc;hpb=ec417e23307b411a0a8bc9bf9f78c34d35e42071 diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index b81caf1..9282752 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -31,752 +31,692 @@ */ #define EXPORT_SYMTAB +#define DEBUG_SUBSYSTEM S_OST -#include #include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include -#define DEBUG_SUBSYSTEM S_OST +extern lprocfs_vars_t status_var_nm_1[]; +extern lprocfs_vars_t status_class_var[]; -#include -#include -#include -#include -#include -#include -#include +static int ost_destroy(struct ptlrpc_request *req) +{ + struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg; + struct ost_body *body; + int rc, size = sizeof(*body); + ENTRY; + body = lustre_msg_buf(req->rq_reqmsg, 0); + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc) + RETURN(rc); -static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req) -{ - struct obd_conn conn; - int rc; + req->rq_status = obd_destroy(conn, &body->oa, NULL); + RETURN(0); +} - ENTRY; - - conn.oc_id = req->rq_req.ost->connid; - conn.oc_dev = ost->ost_tgt; +static int ost_getattr(struct ptlrpc_request *req) +{ + struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg; + struct ost_body *body, *repbody; + int rc, size = sizeof(*body); + ENTRY; - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, - &req->rq_replen, &req->rq_repbuf); - if (rc) { - CERROR("cannot pack reply\n"); - return rc; - } + body = lustre_msg_buf(req->rq_reqmsg, 0); - req->rq_rep.ost->result = obd_destroy(&conn, &req->rq_req.ost->oa); + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc) + RETURN(rc); - EXIT; - return 0; + repbody = lustre_msg_buf(req->rq_repmsg, 0); + /* FIXME: unpack only valid fields instead of memcpy, endianness */ + memcpy(&repbody->oa, &body->oa, sizeof(body->oa)); + req->rq_status = obd_getattr(conn, &repbody->oa, NULL); + RETURN(0); } -static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req) +static int ost_statfs(struct ptlrpc_request *req) { - struct obd_conn conn; - int rc; - - ENTRY; - - conn.oc_id = req->rq_req.ost->connid; - conn.oc_dev = ost->ost_tgt; - - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, - &req->rq_replen, &req->rq_repbuf); - if (rc) { - CERROR("cannot pack reply\n"); - return rc; - } - req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id; - req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid; - - req->rq_rep.ost->result = obd_getattr(&conn, &req->rq_rep.ost->oa); - - EXIT; - return 0; -} + struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg; + struct obd_statfs *osfs; + int rc, size = sizeof(*osfs); + ENTRY; -static int ost_open(struct ost_obd *ost, struct ptlrpc_request *req) -{ - struct obd_conn conn; - int rc; - - ENTRY; - - conn.oc_id = req->rq_req.ost->connid; - conn.oc_dev = ost->ost_tgt; - - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, - &req->rq_replen, &req->rq_repbuf); - if (rc) { - CERROR("cannot pack reply\n"); - return rc; - } - req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id; - req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid; - - req->rq_rep.ost->result = obd_open(&conn, &req->rq_rep.ost->oa); - - EXIT; - return 0; + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc) + RETURN(rc); + + osfs = lustre_msg_buf(req->rq_repmsg, 0); + memset(osfs, 0, size); + + rc = obd_statfs(conn, osfs); + if (rc) { + CERROR("ost: statfs failed: rc %d\n", rc); + req->rq_status = rc; + RETURN(rc); + } + obd_statfs_pack(osfs, osfs); + + RETURN(0); } -static int ost_close(struct ost_obd *ost, struct ptlrpc_request *req) +static int ost_open(struct ptlrpc_request *req) { - struct obd_conn conn; - int rc; - - ENTRY; - - conn.oc_id = req->rq_req.ost->connid; - conn.oc_dev = ost->ost_tgt; - - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, - &req->rq_replen, &req->rq_repbuf); - if (rc) { - CERROR("cannot pack reply\n"); - return rc; - } - req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id; - req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid; - - req->rq_rep.ost->result = obd_close(&conn, &req->rq_rep.ost->oa); - - EXIT; - return 0; -} + struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg; + struct ost_body *body, *repbody; + int rc, size = sizeof(*body); + ENTRY; + body = lustre_msg_buf(req->rq_reqmsg, 0); -static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req) -{ - struct obd_conn conn; - int rc; + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc) + RETURN(rc); - ENTRY; - - conn.oc_id = req->rq_req.ost->connid; - conn.oc_dev = ost->ost_tgt; + repbody = lustre_msg_buf(req->rq_repmsg, 0); + /* FIXME: unpack only valid fields instead of memcpy, endianness */ + memcpy(&repbody->oa, &body->oa, sizeof(body->oa)); + req->rq_status = obd_open(conn, &repbody->oa, NULL); + RETURN(0); +} - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, - &req->rq_replen, &req->rq_repbuf); - if (rc) { - CERROR("cannot pack reply\n"); - return rc; - } +static int ost_close(struct ptlrpc_request *req) +{ + struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg; + struct ost_body *body, *repbody; + int rc, size = sizeof(*body); + ENTRY; - memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, - sizeof(req->rq_req.ost->oa)); + body = lustre_msg_buf(req->rq_reqmsg, 0); - req->rq_rep.ost->result =obd_create(&conn, &req->rq_rep.ost->oa); + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc) + RETURN(rc); - EXIT; - return 0; + repbody = lustre_msg_buf(req->rq_repmsg, 0); + /* FIXME: unpack only valid fields instead of memcpy, endianness */ + memcpy(&repbody->oa, &body->oa, sizeof(body->oa)); + req->rq_status = obd_close(conn, &repbody->oa, NULL); + RETURN(0); } -static int ost_punch(struct ost_obd *ost, struct ptlrpc_request *req) +static int ost_create(struct ptlrpc_request *req) { - struct obd_conn conn; - int rc; - - ENTRY; - - conn.oc_id = req->rq_req.ost->connid; - conn.oc_dev = ost->ost_tgt; - - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, - &req->rq_replen, &req->rq_repbuf); - if (rc) { - CERROR("cannot pack reply\n"); - return rc; - } - - memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, - sizeof(req->rq_req.ost->oa)); - - req->rq_rep.ost->result = obd_punch(&conn, &req->rq_rep.ost->oa, - req->rq_rep.ost->oa.o_size, - req->rq_rep.ost->oa.o_blocks); - - EXIT; - return 0; -} + struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg; + struct ost_body *body, *repbody; + int rc, size = sizeof(*body); + ENTRY; + body = lustre_msg_buf(req->rq_reqmsg, 0); -static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req) -{ - struct obd_conn conn; - int rc; + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc) + RETURN(rc); - ENTRY; - - conn.oc_id = req->rq_req.ost->connid; - conn.oc_dev = ost->ost_tgt; + repbody = lustre_msg_buf(req->rq_repmsg, 0); + /* FIXME: unpack only valid fields instead of memcpy, endianness */ + memcpy(&repbody->oa, &body->oa, sizeof(body->oa)); + req->rq_status = obd_create(conn, &repbody->oa, NULL); + RETURN(0); +} - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, - &req->rq_replen, &req->rq_repbuf); - if (rc) { - CERROR("cannot pack reply\n"); - return rc; - } +static int ost_punch(struct ptlrpc_request *req) +{ + struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg; + struct ost_body *body, *repbody; + int rc, size = sizeof(*body); + ENTRY; - memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, - sizeof(req->rq_req.ost->oa)); + body = lustre_msg_buf(req->rq_reqmsg, 0); - req->rq_rep.ost->result = obd_setattr(&conn, &req->rq_rep.ost->oa); + if ((NTOH__u32(body->oa.o_valid) & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))!= + (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) + RETURN(-EINVAL); - EXIT; - return 0; + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc) + RETURN(rc); + + repbody = lustre_msg_buf(req->rq_repmsg, 0); + /* FIXME: unpack only valid fields instead of memcpy, endianness */ + memcpy(&repbody->oa, &body->oa, sizeof(body->oa)); + req->rq_status = obd_punch(conn, &repbody->oa, NULL, + repbody->oa.o_size, repbody->oa.o_blocks); + RETURN(0); } -static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req) +static int ost_setattr(struct ptlrpc_request *req) { - struct obd_conn conn; - int rc; - - ENTRY; - - conn.oc_dev = ost->ost_tgt; - - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, - &req->rq_replen, &req->rq_repbuf); - if (rc) { - CERROR("cannot pack reply\n"); - return rc; - } - - req->rq_rep.ost->result = obd_connect(&conn); - - CDEBUG(D_IOCTL, "rep buffer %p, id %d\n", req->rq_repbuf, conn.oc_id); - req->rq_rep.ost->connid = conn.oc_id; - EXIT; - return 0; -} + struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg; + struct ost_body *body, *repbody; + int rc, size = sizeof(*body); + ENTRY; -static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req) -{ - struct obd_conn conn; - int rc; - - ENTRY; - - conn.oc_dev = ost->ost_tgt; - conn.oc_id = req->rq_req.ost->connid; - - rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, - &req->rq_replen, &req->rq_repbuf); - if (rc) { - CERROR("cannot pack reply\n"); - return rc; - } - CDEBUG(D_IOCTL, "Disconnecting %d\n", conn.oc_id); - req->rq_rep.ost->result = obd_disconnect(&conn); - - EXIT; - return 0; + body = lustre_msg_buf(req->rq_reqmsg, 0); + + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc) + RETURN(rc); + + repbody = lustre_msg_buf(req->rq_repmsg, 0); + /* FIXME: unpack only valid fields instead of memcpy, endianness */ + memcpy(&repbody->oa, &body->oa, sizeof(body->oa)); + req->rq_status = obd_setattr(conn, &repbody->oa, NULL); + RETURN(0); } -static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req) +static int ost_bulk_timeout(void *data) { - struct obd_conn conn; - int rc; - int vallen; - void *val; - char *ptr; - - ENTRY; - - conn.oc_id = req->rq_req.ost->connid; - conn.oc_dev = ost->ost_tgt; - - ptr = ost_req_buf1(req->rq_req.ost); - req->rq_rep.ost->result = obd_get_info(&conn, - req->rq_req.ost->buflen1, ptr, - &vallen, &val); - - rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr, - &req->rq_rep, &req->rq_replen, &req->rq_repbuf); - if (rc) { - CERROR("cannot pack reply\n"); - return rc; - } - - EXIT; - return 0; + struct ptlrpc_bulk_desc *desc = data; + + ENTRY; + CERROR("(not yet) starting recovery of client %p\n", desc->bd_client); + RETURN(1); } -static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req) +static int ost_brw_read(struct ptlrpc_request *req) { - struct ptlrpc_bulk_desc **bulk_vec = NULL; - struct ptlrpc_bulk_desc *bulk = NULL; - struct obd_conn conn; - int rc; - int i, j; - int objcount, niocount; - char *tmp1, *tmp2, *end2; - char *res = NULL; - int cmd; - struct niobuf *nb, *src, *dst; - struct obd_ioobj *ioo; - struct ost_req *r = req->rq_req.ost; - - ENTRY; - - tmp1 = ost_req_buf1(r); - tmp2 = ost_req_buf2(r); - end2 = tmp2 + req->rq_req.ost->buflen2; - objcount = r->buflen1 / sizeof(*ioo); - niocount = r->buflen2 / sizeof(*nb); - cmd = r->cmd; - - conn.oc_id = req->rq_req.ost->connid; - conn.oc_dev = req->rq_obd->u.ost.ost_tgt; + struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg; + struct ptlrpc_bulk_desc *desc; + void *tmp1, *tmp2, *end2; + struct niobuf_remote *remote_nb; + struct niobuf_local *local_nb = NULL; + struct obd_ioobj *ioo; + struct ost_body *body; + struct l_wait_info lwi; + void *desc_priv = NULL; + int rc, cmd, i, j, objcount, niocount, size = sizeof(*body); + ENTRY; + + body = lustre_msg_buf(req->rq_reqmsg, 0); + tmp1 = lustre_msg_buf(req->rq_reqmsg, 1); + tmp2 = lustre_msg_buf(req->rq_reqmsg, 2); + end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2]; + objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo); + niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb); + cmd = OBD_BRW_READ; + + if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK)) + GOTO(out, rc = 0); for (i = 0; i < objcount; i++) { - ost_unpack_ioo((void *)&tmp1, &ioo); - if (tmp2 + ioo->ioo_bufcnt > end2) { - rc = -EFAULT; - break; - } - for (j = 0; j < ioo->ioo_bufcnt; j++) { - ost_unpack_niobuf((void *)&tmp2, &nb); - } - } - - rc = ost_pack_rep(NULL, 0, NULL, 0, - &req->rq_rephdr, &req->rq_rep, - &req->rq_replen, &req->rq_repbuf); - if (rc) { - CERROR("cannot pack reply\n"); - return rc; - } - OBD_ALLOC(res, sizeof(struct niobuf) * niocount); - if (res == NULL) { - EXIT; - return -ENOMEM; + ost_unpack_ioo(&tmp1, &ioo); + if (tmp2 + ioo->ioo_bufcnt > end2) { + LBUG(); + GOTO(out, rc = -EFAULT); + } + for (j = 0; j < ioo->ioo_bufcnt; j++) + ost_unpack_niobuf(&tmp2, &remote_nb); } - /* The unpackers move tmp1 and tmp2, so reset them before using */ - tmp1 = ost_req_buf1(r); - tmp2 = ost_req_buf2(r); - req->rq_rep.ost->result = obd_preprw - (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, - niocount, (struct niobuf *)tmp2, (struct niobuf *)res); + OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount); + if (local_nb == NULL) + GOTO(out, rc = -ENOMEM); - if (req->rq_rep.ost->result) { - EXIT; - goto out; - } + /* The unpackers move tmp1 and tmp2, so reset them before using */ + ioo = lustre_msg_buf(req->rq_reqmsg, 1); + remote_nb = lustre_msg_buf(req->rq_reqmsg, 2); + req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount, + remote_nb, local_nb, &desc_priv); - for (i = 0; i < niocount; i++) { - bulk = ptlrpc_prep_bulk(&req->rq_peer); - if (bulk == NULL) { - CERROR("cannot alloc bulk desc\n"); - rc = -ENOMEM; - goto out; - } + if (req->rq_status) + GOTO(out, rc = 0); - src = &((struct niobuf *)res)[i]; - dst = &((struct niobuf *)tmp2)[i]; - bulk->b_xid = dst->xid; - bulk->b_buf = (void *)(unsigned long)src->addr; - bulk->b_buflen = PAGE_SIZE; - rc = ptlrpc_send_bulk(bulk, OST_BULK_PORTAL); - if (rc) { - EXIT; - goto out; - } - wait_event_interruptible(bulk->b_waitq, - ptlrpc_check_bulk_sent(bulk)); + desc = ptlrpc_prep_bulk(req->rq_connection); + if (desc == NULL) + GOTO(out_local, rc = -ENOMEM); + desc->bd_portal = OST_BULK_PORTAL; - if (bulk->b_flags == PTL_RPC_INTR) { - EXIT; - goto out; - } + for (i = 0; i < niocount; i++) { + struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc); - OBD_FREE(bulk, sizeof(*bulk)); - bulk = NULL; + if (bulk == NULL) + GOTO(out_bulk, rc = -ENOMEM); + bulk->bp_xid = remote_nb[i].xid; + bulk->bp_buf = local_nb[i].addr; + bulk->bp_buflen = remote_nb[i].len; } -#if 0 - /* Local delivery */ - dst = &((struct niobuf *)tmp2)[i]; - memcpy((void *)(unsigned long)dst->addr, - (void *)(unsigned long)src->addr, PAGE_SIZE); -#endif - barrier(); - - /* The unpackers move tmp1 and tmp2, so reset them before using */ - tmp1 = ost_req_buf1(r); - tmp2 = ost_req_buf2(r); - req->rq_rep.ost->result = obd_commitrw - (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, - niocount, (struct niobuf *)res); - - out: - if (res != NULL) - OBD_FREE(res, sizeof(struct niobuf) * niocount); - if (bulk != NULL) - OBD_FREE(bulk, sizeof(*bulk)); - if (bulk_vec != NULL) { - for (i = 0; i < niocount; i++) { - if (bulk_vec[i] != NULL) - OBD_FREE(bulk_vec[i], sizeof(*bulk)); - } - OBD_FREE(bulk_vec, - niocount * sizeof(struct ptlrpc_bulk_desc *)); - } + rc = ptlrpc_send_bulk(desc); + if (rc) + GOTO(out_bulk, rc); - EXIT; - return 0; -} + lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc); + rc = l_wait_event(desc->bd_waitq, desc->bd_flags &PTL_BULK_FL_SENT, &lwi); + if (rc) { + LASSERT(rc == -ETIMEDOUT); + GOTO(out_bulk, rc); + } -static int ost_commit_page(struct obd_conn *conn, struct page *page) -{ - struct obd_ioobj obj; - struct niobuf buf; - int rc; - ENTRY; + req->rq_status = obd_commitrw(cmd, conn, objcount, ioo, niocount, + local_nb, desc_priv); - memset(&buf, 0, sizeof(buf)); - memset(&obj, 0, sizeof(obj)); + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); - buf.page = page; - obj.ioo_bufcnt = 1; - - rc = obd_commitrw(OBD_BRW_WRITE, conn, 1, &obj, 1, &buf); - EXIT; - return rc; +out_bulk: + ptlrpc_free_bulk(desc); +out_local: + OBD_FREE(local_nb, sizeof(*local_nb) * niocount); +out: + if (rc) { + /* It's a lot of work to delay allocating the reply, and a lot + * less work to just free it here. */ + OBD_FREE(req->rq_repmsg, req->rq_replen); + req->rq_repmsg = NULL; + ptlrpc_error(req->rq_svc, req); + } else + ptlrpc_reply(req->rq_svc, req); + RETURN(rc); } -static int ost_brw_write_cb(struct ptlrpc_bulk_desc *bulk, void *data) +static int ost_brw_write(struct ptlrpc_request *req) { - int rc; - + struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg; + struct ptlrpc_bulk_desc *desc; + struct niobuf_remote *remote_nb; + struct niobuf_local *local_nb, *lnb; + struct obd_ioobj *ioo; + struct ost_body *body; + int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)}; + void *tmp1, *tmp2, *end2; + void *desc_priv = NULL; + int reply_sent = 0; + struct ptlrpc_service *srv; + struct l_wait_info lwi; + __u32 xid; ENTRY; - rc = ost_commit_page(&bulk->b_conn, bulk->b_page); - if (rc) - CERROR("ost_commit_page failed: %d\n", rc); - - EXIT; - return rc; -} - -int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req) -{ - struct obd_conn conn; - int rc; - int i, j; - int objcount, niocount; - char *tmp1, *tmp2, *end2; - char *res; - int cmd; - struct niobuf *nb, *dst; - struct obd_ioobj *ioo; - struct ost_req *r = req->rq_req.ost; - - ENTRY; - - tmp1 = ost_req_buf1(r); - tmp2 = ost_req_buf2(r); - end2 = tmp2 + req->rq_req.ost->buflen2; - objcount = r->buflen1 / sizeof(*ioo); - niocount = r->buflen2 / sizeof(*nb); - cmd = r->cmd; - - conn.oc_id = req->rq_req.ost->connid; - conn.oc_dev = req->rq_obd->u.ost.ost_tgt; + body = lustre_msg_buf(req->rq_reqmsg, 0); + tmp1 = lustre_msg_buf(req->rq_reqmsg, 1); + tmp2 = lustre_msg_buf(req->rq_reqmsg, 2); + end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2]; + objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo); + niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb); + cmd = OBD_BRW_WRITE; for (i = 0; i < objcount; i++) { - ost_unpack_ioo((void *)&tmp1, &ioo); - if (tmp2 + ioo->ioo_bufcnt > end2) { - rc = -EFAULT; - break; - } - for (j = 0; j < ioo->ioo_bufcnt; j++) { - ost_unpack_niobuf((void *)&tmp2, &nb); - } - } - - rc = ost_pack_rep(NULL, 0, NULL, niocount * sizeof(*nb), - &req->rq_rephdr, &req->rq_rep, - &req->rq_replen, &req->rq_repbuf); - if (rc) { - CERROR("cannot pack reply\n"); - return rc; - } - res = ost_rep_buf2(req->rq_rep.ost); - - /* The unpackers move tmp1 and tmp2, so reset them before using */ - tmp1 = ost_req_buf1(r); - tmp2 = ost_req_buf2(r); - req->rq_rep.ost->result = obd_preprw - (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, - niocount, (struct niobuf *)tmp2, (struct niobuf *)res); - - if (req->rq_rep.ost->result) { - EXIT; - goto out; - } - - for (i = 0; i < niocount; i++) { - struct ptlrpc_bulk_desc *bulk; - struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service; - - bulk = ptlrpc_prep_bulk(&req->rq_peer); - if (bulk == NULL) { - CERROR("cannot alloc bulk desc\n"); - rc = -ENOMEM; - goto out; + ost_unpack_ioo((void *)&tmp1, &ioo); + if (tmp2 + ioo->ioo_bufcnt > end2) { + rc = -EFAULT; + break; } + for (j = 0; j < ioo->ioo_bufcnt; j++) + ost_unpack_niobuf((void *)&tmp2, &remote_nb); + } - spin_lock(&srv->srv_lock); - bulk->b_xid = srv->srv_xid++; - spin_unlock(&srv->srv_lock); + size[1] = niocount * sizeof(*remote_nb); + rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc) + GOTO(out, rc); + remote_nb = lustre_msg_buf(req->rq_repmsg, 1); + + OBD_ALLOC(local_nb, niocount * sizeof(*local_nb)); + if (local_nb == NULL) + GOTO(out, rc = -ENOMEM); + + /* The unpackers move tmp1 and tmp2, so reset them before using */ + tmp1 = lustre_msg_buf(req->rq_reqmsg, 1); + tmp2 = lustre_msg_buf(req->rq_reqmsg, 2); + req->rq_status = obd_preprw(cmd, conn, objcount, tmp1, niocount, tmp2, + local_nb, &desc_priv); + if (req->rq_status) + GOTO(out_free, rc = 0); /* XXX is this correct? */ + + if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK)) + GOTO(fail_preprw, rc = 0); + + desc = ptlrpc_prep_bulk(req->rq_connection); + if (desc == NULL) + GOTO(fail_preprw, rc = -ENOMEM); + desc->bd_cb = NULL; + desc->bd_portal = OSC_BULK_PORTAL; + desc->bd_desc_private = desc_priv; + memcpy(&(desc->bd_conn), &conn, sizeof(conn)); + + srv = req->rq_obd->u.ost.ost_service; + spin_lock(&srv->srv_lock); + xid = srv->srv_xid++; /* single xid for all pages */ + spin_unlock(&srv->srv_lock); + + for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) { + struct ptlrpc_bulk_page *bulk; + + bulk = ptlrpc_prep_bulk_page(desc); + if (bulk == NULL) + GOTO(fail_bulk, rc = -ENOMEM); + + bulk->bp_xid = xid; /* single xid for all pages */ + + bulk->bp_buf = lnb->addr; + bulk->bp_page = lnb->page; + bulk->bp_flags = lnb->flags; + bulk->bp_dentry = lnb->dentry; + bulk->bp_buflen = lnb->len; + bulk->bp_cb = NULL; + + /* this advances remote_nb */ + ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0, + bulk->bp_xid); + } - dst = &((struct niobuf *)res)[i]; - dst->xid = HTON__u32(bulk->b_xid); + rc = ptlrpc_register_bulk(desc); + if (rc) + GOTO(fail_bulk, rc); + + reply_sent = 1; + ptlrpc_reply(req->rq_svc, req); + + lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc); + rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_RCVD, + &lwi); + if (rc) { + if (rc != -ETIMEDOUT) + LBUG(); + GOTO(fail_bulk, rc); + } - bulk->b_buf = (void *)(unsigned long)dst->addr; - bulk->b_cb = ost_brw_write_cb; - bulk->b_page = dst->page; - memcpy(&(bulk->b_conn), &conn, sizeof(conn)); - bulk->b_buflen = PAGE_SIZE; - bulk->b_portal = OSC_BULK_PORTAL; - rc = ptlrpc_register_bulk(bulk); - if (rc) - goto out; - -#if 0 - /* Local delivery */ - src = &((struct niobuf *)tmp2)[i]; - memcpy((void *)(unsigned long)dst->addr, - (void *)(unsigned long)src->addr, src->len); -#endif + rc = obd_commitrw(cmd, conn, objcount, tmp1, niocount, local_nb, + desc->bd_desc_private); + ptlrpc_free_bulk(desc); + EXIT; +out_free: + OBD_FREE(local_nb, niocount * sizeof(*local_nb)); +out: + if (!reply_sent) { + if (rc) { + OBD_FREE(req->rq_repmsg, req->rq_replen); + req->rq_repmsg = NULL; + ptlrpc_error(req->rq_svc, req); + } else + ptlrpc_reply(req->rq_svc, req); } - barrier(); + return rc; - out: - EXIT; - return 0; +fail_bulk: + ptlrpc_free_bulk(desc); +fail_preprw: + /* FIXME: how do we undo the preprw? */ + goto out_free; } -int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req) +static int ost_handle(struct ptlrpc_request *req) { - struct ost_req *r = req->rq_req.ost; - int cmd = r->cmd; + int rc; + ENTRY; - if (cmd == OBD_BRW_READ) - return ost_brw_read(obddev, req); - else - return ost_brw_write(obddev, req); -} + rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen); + if (rc || OBD_FAIL_CHECK(OBD_FAIL_OST_HANDLE_UNPACK)) { + CERROR("lustre_ost: Invalid request\n"); + GOTO(out, rc); + } -static int ost_handle(struct obd_device *obddev, - struct ptlrpc_service *svc, - struct ptlrpc_request *req) -{ - int rc; - struct ost_obd *ost = &obddev->u.ost; - struct ptlreq_hdr *hdr; - - ENTRY; - - hdr = (struct ptlreq_hdr *)req->rq_reqbuf; - if (NTOH__u32(hdr->type) != OST_TYPE_REQ) { - CERROR("lustre_ost: wrong packet type sent %d\n", - NTOH__u32(hdr->type)); - rc = -EINVAL; - goto out; - } - - rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen, - &req->rq_reqhdr, &req->rq_req); - if (rc) { - CERROR("lustre_ost: Invalid request\n"); - EXIT; - goto out; - } - - switch (req->rq_reqhdr->opc) { - - case OST_CONNECT: - CDEBUG(D_INODE, "connect\n"); - rc = ost_connect(ost, req); - break; - case OST_DISCONNECT: - CDEBUG(D_INODE, "disconnect\n"); - rc = ost_disconnect(ost, req); - break; - case OST_GET_INFO: - CDEBUG(D_INODE, "get_info\n"); - rc = ost_get_info(ost, req); - break; - case OST_CREATE: - CDEBUG(D_INODE, "create\n"); - rc = ost_create(ost, req); - break; - case OST_DESTROY: - CDEBUG(D_INODE, "destroy\n"); - rc = ost_destroy(ost, req); - break; - case OST_GETATTR: - CDEBUG(D_INODE, "getattr\n"); - rc = ost_getattr(ost, req); - break; - case OST_SETATTR: - CDEBUG(D_INODE, "setattr\n"); - rc = ost_setattr(ost, req); - break; - case OST_OPEN: - CDEBUG(D_INODE, "setattr\n"); - rc = ost_open(ost, req); - break; - case OST_CLOSE: - CDEBUG(D_INODE, "setattr\n"); - rc = ost_close(ost, req); - break; - case OST_BRW: - CDEBUG(D_INODE, "brw\n"); - rc = ost_brw(ost, req); - break; - case OST_PUNCH: - CDEBUG(D_INODE, "punch\n"); - rc = ost_punch(ost, req); - break; - default: - req->rq_status = -ENOTSUPP; - return ptlrpc_error(obddev, svc, req); - } + if (req->rq_reqmsg->opc != OST_CONNECT && + req->rq_export == NULL) { + CERROR("lustre_ost: operation %d on unconnected OST\n", + req->rq_reqmsg->opc); + GOTO(out, rc = -ENOTCONN); + } + if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0) + GOTO(out, rc = -EINVAL); + + switch (req->rq_reqmsg->opc) { + case OST_CONNECT: + CDEBUG(D_INODE, "connect\n"); + OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0); + rc = target_handle_connect(req); + break; + case OST_DISCONNECT: + CDEBUG(D_INODE, "disconnect\n"); + OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0); + rc = target_handle_disconnect(req); + break; + case OST_CREATE: + CDEBUG(D_INODE, "create\n"); + OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0); + rc = ost_create(req); + break; + case OST_DESTROY: + CDEBUG(D_INODE, "destroy\n"); + OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0); + rc = ost_destroy(req); + break; + case OST_GETATTR: + CDEBUG(D_INODE, "getattr\n"); + OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0); + rc = ost_getattr(req); + break; + case OST_SETATTR: + CDEBUG(D_INODE, "setattr\n"); + OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0); + rc = ost_setattr(req); + break; + case OST_OPEN: + CDEBUG(D_INODE, "open\n"); + OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0); + rc = ost_open(req); + break; + case OST_CLOSE: + CDEBUG(D_INODE, "close\n"); + OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0); + rc = ost_close(req); + break; + case OST_WRITE: + CDEBUG(D_INODE, "write\n"); + OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0); + rc = ost_brw_write(req); + /* ost_brw sends its own replies */ + RETURN(rc); + case OST_READ: + CDEBUG(D_INODE, "read\n"); + OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0); + rc = ost_brw_read(req); + /* ost_brw sends its own replies */ + RETURN(rc); + case OST_PUNCH: + CDEBUG(D_INODE, "punch\n"); + OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0); + rc = ost_punch(req); + break; + case OST_STATFS: + CDEBUG(D_INODE, "statfs\n"); + OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0); + rc = ost_statfs(req); + break; + case LDLM_ENQUEUE: + CDEBUG(D_INODE, "enqueue\n"); + OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0); + rc = ldlm_handle_enqueue(req); + if (rc) + break; + RETURN(0); + case LDLM_CONVERT: + CDEBUG(D_INODE, "convert\n"); + OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0); + rc = ldlm_handle_convert(req); + if (rc) + break; + RETURN(0); + case LDLM_CANCEL: + CDEBUG(D_INODE, "cancel\n"); + OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0); + rc = ldlm_handle_cancel(req); + if (rc) + break; + RETURN(0); + case LDLM_BL_CALLBACK: + case LDLM_CP_CALLBACK: + CDEBUG(D_INODE, "callback\n"); + CERROR("callbacks should not happen on OST\n"); + LBUG(); + OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0); + break; + default: + req->rq_status = -ENOTSUPP; + rc = ptlrpc_error(req->rq_svc, req); + RETURN(rc); + } + + EXIT; out: - req->rq_status = rc; - if (rc) { - CERROR("ost: processing error %d\n", rc); - ptlrpc_error(obddev, svc, req); - } else { - CDEBUG(D_INODE, "sending reply\n"); - ptlrpc_reply(obddev, svc, req); - } - - return 0; + //req->rq_status = rc; + if (rc) { + CERROR("ost: processing error (opcode=%d): %d\n", + req->rq_reqmsg->opc, rc); + ptlrpc_error(req->rq_svc, req); + } else { + CDEBUG(D_INODE, "sending reply\n"); + if (req->rq_repmsg == NULL) + CERROR("handler for opcode %d returned rc=0 without " + "creating rq_repmsg; needs to return rc != " + "0!\n", req->rq_reqmsg->opc); + ptlrpc_reply(req->rq_svc, req); + } + + return 0; } +#define OST_NUM_THREADS 6 /* mount the file system (secretly) */ -static int ost_setup(struct obd_device *obddev, obd_count len, - void *buf) - +static int ost_setup(struct obd_device *obddev, obd_count len, void *buf) { - struct obd_ioctl_data* data = buf; - struct ost_obd *ost = &obddev->u.ost; - struct obd_device *tgt; - int err; + struct obd_ioctl_data* data = buf; + struct ost_obd *ost = &obddev->u.ost; + struct obd_device *tgt; + int err; + int i; ENTRY; - if (data->ioc_dev < 0 || data->ioc_dev > MAX_OBD_DEVICES) { - EXIT; - return -ENODEV; - } + if (data->ioc_inllen1 < 1) { + CERROR("requires a TARGET OBD UUID\n"); + RETURN(-EINVAL); + } + if (data->ioc_inllen1 > 37) { + CERROR("OBD UUID must be less than 38 characters\n"); + RETURN(-EINVAL); + } - tgt = &obd_dev[data->ioc_dev]; - ost->ost_tgt = tgt; - if ( ! (tgt->obd_flags & OBD_ATTACHED) || - ! (tgt->obd_flags & OBD_SET_UP) ){ - CERROR("device not attached or not set up (%d)\n", + MOD_INC_USE_COUNT; + tgt = class_uuid2obd(data->ioc_inlbuf1); + if (!tgt || !(tgt->obd_flags & OBD_ATTACHED) || + !(tgt->obd_flags & OBD_SET_UP)) { + CERROR("device not attached or not set up (%d)\n", data->ioc_dev); - EXIT; - return -EINVAL; - } - - ost->ost_conn.oc_dev = tgt; - err = obd_connect(&ost->ost_conn); - if (err) { - CERROR("fail to connect to device %d\n", data->ioc_dev); - return -EINVAL; - } - - ost->ost_service = ptlrpc_init_svc( 64 * 1024, - OST_REQUEST_PORTAL, - OSC_REPLY_PORTAL, - "self", - ost_unpack_req, - ost_pack_rep, - ost_handle); - if (!ost->ost_service) { - obd_disconnect(&ost->ost_conn); - return -EINVAL; + GOTO(error_dec, err = -EINVAL); } - - rpc_register_service(ost->ost_service, "self"); - err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost"); - if (err) { - obd_disconnect(&ost->ost_conn); - return -EINVAL; + err = obd_connect(&ost->ost_conn, tgt, NULL, NULL, NULL); + if (err) { + CERROR("fail to connect to device %d\n", data->ioc_dev); + GOTO(error_dec, err = -EINVAL); + } + + ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS, + OST_BUFSIZE, OST_MAXREQSIZE, + OST_REQUEST_PORTAL, OSC_REPLY_PORTAL, + "self", ost_handle, "ost"); + if (!ost->ost_service) { + CERROR("failed to start service\n"); + GOTO(error_disc, err = -EINVAL); } - - MOD_INC_USE_COUNT; - EXIT; - return 0; -} + + for (i = 0; i < OST_NUM_THREADS; i++) { + char name[32]; + sprintf(name, "lustre_ost_%02d", i); + err = ptlrpc_start_thread(obddev, ost->ost_service, name); + if (err) { + CERROR("error starting thread #%d: rc %d\n", i, err); + GOTO(error_disc, err = -EINVAL); + } + } + + RETURN(0); + +error_disc: + obd_disconnect(&ost->ost_conn); +error_dec: + MOD_DEC_USE_COUNT; + RETURN(err); +} static int ost_cleanup(struct obd_device * obddev) { - struct ost_obd *ost = &obddev->u.ost; - int err; + struct ost_obd *ost = &obddev->u.ost; + int err; ENTRY; - if ( !list_empty(&obddev->obd_gen_clients) ) { + if ( !list_empty(&obddev->obd_exports) ) { CERROR("still has clients!\n"); - EXIT; - return -EBUSY; + RETURN(-EBUSY); } - ptlrpc_stop_thread(ost->ost_service); - rpc_unregister_service(ost->ost_service); - - if (!list_empty(&ost->ost_service->srv_reqs)) { - // XXX reply with errors and clean up - CERROR("Request list not empty!\n"); - } - OBD_FREE(ost->ost_service, sizeof(*ost->ost_service)); + ptlrpc_stop_all_threads(ost->ost_service); + ptlrpc_unregister_service(ost->ost_service); - err = obd_disconnect(&ost->ost_conn); - if (err) { - CERROR("lustre ost: fail to disconnect device\n"); - return -EINVAL; - } + err = obd_disconnect(&ost->ost_conn); + if (err) { + CERROR("lustre ost: fail to disconnect device\n"); + RETURN(-EINVAL); + } MOD_DEC_USE_COUNT; - EXIT; + RETURN(0); +} +int ost_attach(struct obd_device *dev, + obd_count len, void *data) +{ + /* lprocfs_reg_dev(dev, (lprocfs_group_t*)lprocfs_ptlrpc_nm, + sizeof(struct lprofiler_ptlrpc)); + */ + lprocfs_reg_obd(dev, (lprocfs_vars_t*)status_var_nm_1, (void*)dev); + return 0; +} + +int ost_detach(struct obd_device *dev) +{ + /* lprocfs_dereg_dev(dev); */ + lprocfs_dereg_obd(dev); return 0; + } + + /* use obd ops to offer management infrastructure */ static struct obd_ops ost_obd_ops = { + o_attach: ost_attach, + o_detach: ost_detach, o_setup: ost_setup, o_cleanup: ost_cleanup, }; static int __init ost_init(void) { - obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME); - return 0; + int rc; + + rc = class_register_type(&ost_obd_ops, + (lprocfs_vars_t*)status_class_var, + LUSTRE_OST_NAME); + if (rc) RETURN(rc); + + return 0; + } static void __exit ost_exit(void) { - obd_unregister_type(LUSTRE_OST_NAME); + + class_unregister_type(LUSTRE_OST_NAME); } -MODULE_AUTHOR("Peter J. Braam "); +MODULE_AUTHOR("Cluster File Systems, Inc. "); MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01"); MODULE_LICENSE("GPL");