/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
- * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
+ * Copyright (C) 2001-2003 Cluster File Systems, Inc.
* Author: Peter J. Braam <braam@clusterfs.com>
* Author: Phil Schwan <phil@clusterfs.com>
*
#include <linux/init.h>
#include <linux/lprocfs_status.h>
-extern struct lprocfs_vars status_var_nm_1[];
-extern struct lprocfs_vars status_class_var[];
-static int ost_destroy(struct ptlrpc_request *req)
+static int ost_destroy(struct ptlrpc_request *req, struct obd_trans_info *oti)
{
struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
struct ost_body *body;
if (rc)
RETURN(rc);
- req->rq_status = obd_destroy(conn, &body->oa, NULL);
+ req->rq_status = obd_destroy(conn, &body->oa, NULL, oti);
RETURN(0);
}
RETURN(0);
}
-static int ost_open(struct ptlrpc_request *req)
+static int ost_open(struct ptlrpc_request *req, struct obd_trans_info *oti)
{
struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
struct ost_body *body, *repbody;
repbody = lustre_msg_buf(req->rq_repmsg, 0);
/* FIXME: unpack only valid fields instead of memcpy, endianness */
memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
- req->rq_status = obd_open(conn, &repbody->oa, NULL);
+ req->rq_status = obd_open(conn, &repbody->oa, NULL, oti);
RETURN(0);
}
-static int ost_close(struct ptlrpc_request *req)
+static int ost_close(struct ptlrpc_request *req, struct obd_trans_info *oti)
{
struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
struct ost_body *body, *repbody;
repbody = lustre_msg_buf(req->rq_repmsg, 0);
/* FIXME: unpack only valid fields instead of memcpy, endianness */
memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
- req->rq_status = obd_close(conn, &repbody->oa, NULL);
+ req->rq_status = obd_close(conn, &repbody->oa, NULL, oti);
RETURN(0);
}
-static int ost_create(struct ptlrpc_request *req)
+static int ost_create(struct ptlrpc_request *req, struct obd_trans_info *oti)
{
struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
struct ost_body *body, *repbody;
repbody = lustre_msg_buf(req->rq_repmsg, 0);
/* FIXME: unpack only valid fields instead of memcpy, endianness */
memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
- req->rq_status = obd_create(conn, &repbody->oa, NULL);
+ req->rq_status = obd_create(conn, &repbody->oa, NULL, oti);
RETURN(0);
}
-static int ost_punch(struct ptlrpc_request *req)
+static int ost_punch(struct ptlrpc_request *req, struct obd_trans_info *oti)
{
struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
struct ost_body *body, *repbody;
/* FIXME: unpack only valid fields instead of memcpy, endianness */
memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
req->rq_status = obd_punch(conn, &repbody->oa, NULL,
- repbody->oa.o_size, repbody->oa.o_blocks);
+ repbody->oa.o_size, repbody->oa.o_blocks, oti);
RETURN(0);
}
-static int ost_setattr(struct ptlrpc_request *req)
+static int ost_setattr(struct ptlrpc_request *req, struct obd_trans_info *oti)
{
struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
struct ost_body *body, *repbody;
repbody = lustre_msg_buf(req->rq_repmsg, 0);
/* FIXME: unpack only valid fields instead of memcpy, endianness */
memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
- req->rq_status = obd_setattr(conn, &repbody->oa, NULL);
+ req->rq_status = obd_setattr(conn, &repbody->oa, NULL, oti);
RETURN(0);
}
struct ost_body *body;
struct l_wait_info lwi;
void *desc_priv = NULL;
- int rc, cmd, i, j, objcount, niocount, size = sizeof(*body);
+ int cmd, i, j, objcount, niocount, size = sizeof(*body);
+ int rc = 0;
ENTRY;
body = lustre_msg_buf(req->rq_reqmsg, 0);
cmd = OBD_BRW_READ;
if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
- GOTO(out, rc = 0);
+ GOTO(out, req->rq_status = -EIO);
for (i = 0; i < objcount; i++) {
ost_unpack_ioo(&tmp1, &ioo);
LBUG();
GOTO(out, rc = -EFAULT);
}
- for (j = 0; j < ioo->ioo_bufcnt; j++)
+ for (j = 0; j < ioo->ioo_bufcnt; j++) {
+ /* XXX verify niobuf[j].offset > niobuf[j-1].offset */
ost_unpack_niobuf(&tmp2, &remote_nb);
+ }
}
OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
ioo = lustre_msg_buf(req->rq_reqmsg, 1);
remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount,
- remote_nb, local_nb, &desc_priv);
+ remote_nb, local_nb, &desc_priv, NULL);
if (req->rq_status)
- GOTO(out, rc = 0);
+ GOTO(out, req->rq_status);
desc = ptlrpc_prep_bulk(req->rq_connection);
if (desc == NULL)
bulk->bp_buflen = remote_nb[i].len;
}
- rc = ptlrpc_send_bulk(desc);
+ rc = ptlrpc_bulk_put(desc);
if (rc)
GOTO(out_bulk, rc);
}
req->rq_status = obd_commitrw(cmd, conn, objcount, ioo, niocount,
- local_nb, desc_priv);
-
- rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+ local_nb, desc_priv, NULL);
out_bulk:
ptlrpc_bulk_decref(desc);
out_local:
OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
out:
+ if (!rc)
+ /* Hmm, we don't return anything in this reply buffer?
+ * We should be returning per-page status codes and also
+ * per-object size, blocks count, mtime, ctime. (bug 593) */
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
+ &req->rq_repmsg);
if (rc)
ptlrpc_error(req->rq_svc, req);
else
RETURN(rc);
}
-static int ost_brw_write(struct ptlrpc_request *req)
+static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
{
struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
struct ptlrpc_bulk_desc *desc;
void *tmp2, *end2;
struct niobuf_remote *remote_nb;
struct niobuf_local *local_nb = NULL;
- struct niobuf_local *lnb;
struct obd_ioobj *ioo;
struct ost_body *body;
struct l_wait_info lwi;
- int rc, cmd, i, j, objcount, niocount;
- int size[2] = {sizeof(*body)};
void *desc_priv = NULL;
- int reply_sent = 0;
- struct ptlrpc_service *srv;
- __u32 xid;
+ int cmd, i, j, objcount, niocount, size = sizeof(*body);
+ int rc = 0;
ENTRY;
body = lustre_msg_buf(req->rq_reqmsg, 0);
niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
cmd = OBD_BRW_WRITE;
+ if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
+ GOTO(out, req->rq_status = -EIO);
+
for (i = 0; i < objcount; i++) {
- ost_unpack_ioo((void *)&tmp1, &ioo);
+ ost_unpack_ioo(&tmp1, &ioo);
if (tmp2 + ioo->ioo_bufcnt > end2) {
- rc = -EFAULT;
- break;
+ LBUG();
+ GOTO(out, rc = -EFAULT);
+ }
+ for (j = 0; j < ioo->ioo_bufcnt; j++) {
+ /* XXX verify niobuf[j].offset > niobuf[j-1].offset */
+ ost_unpack_niobuf(&tmp2, &remote_nb);
}
- for (j = 0; j < ioo->ioo_bufcnt; j++)
- ost_unpack_niobuf((void *)&tmp2, &remote_nb);
}
- size[1] = niocount * sizeof(*remote_nb);
- rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
- if (rc)
- GOTO(out, rc);
- remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
-
- OBD_ALLOC(local_nb, niocount * sizeof(*local_nb));
+ OBD_ALLOC(local_nb, sizeof(*local_nb)* niocount);
if (local_nb == NULL)
GOTO(out, rc = -ENOMEM);
/* The unpackers move tmp1 and tmp2, so reset them before using */
- tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
- tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
- req->rq_status = obd_preprw(cmd, conn, objcount, tmp1, niocount, tmp2,
- local_nb, &desc_priv);
- if (req->rq_status)
- GOTO(out_free, rc = 0); /* XXX is this correct? */
+ ioo = lustre_msg_buf(req->rq_reqmsg, 1);
+ remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
+ req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount,
+ remote_nb, local_nb, &desc_priv, oti);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
- GOTO(fail_preprw, rc = 0);
+ if (req->rq_status)
+ GOTO(out, rc = 0);
desc = ptlrpc_prep_bulk(req->rq_connection);
if (desc == NULL)
- GOTO(fail_preprw, rc = -ENOMEM);
+ GOTO(out_local, rc = -ENOMEM);
desc->bd_ptl_ev_hdlr = NULL;
desc->bd_portal = OSC_BULK_PORTAL;
- desc->bd_desc_private = desc_priv;
- memcpy(&(desc->bd_conn), &conn, sizeof(conn));
-
- srv = req->rq_obd->u.ost.ost_service;
- spin_lock(&srv->srv_lock);
- xid = srv->srv_xid++; /* single xid for all pages */
- spin_unlock(&srv->srv_lock);
- for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
- struct ptlrpc_bulk_page *bulk;
+ for (i = 0; i < niocount; i++) {
+ struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
- bulk = ptlrpc_prep_bulk_page(desc);
if (bulk == NULL)
- GOTO(fail_bulk, rc = -ENOMEM);
-
- bulk->bp_xid = xid; /* single xid for all pages */
-
- bulk->bp_buf = lnb->addr;
- bulk->bp_page = lnb->page;
- bulk->bp_flags = lnb->flags;
- bulk->bp_dentry = lnb->dentry;
- bulk->bp_buflen = lnb->len;
- bulk->bp_cb = NULL;
-
- /* this advances remote_nb */
- ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0,
- bulk->bp_xid);
+ GOTO(out_bulk, rc = -ENOMEM);
+ bulk->bp_xid = remote_nb[i].xid;
+ bulk->bp_buf = local_nb[i].addr;
+ bulk->bp_buflen = remote_nb[i].len;
}
- rc = ptlrpc_register_bulk(desc);
+ rc = ptlrpc_bulk_get(desc);
if (rc)
- GOTO(fail_bulk, rc);
-
- reply_sent = 1;
- ptlrpc_reply(req->rq_svc, req);
+ GOTO(out_bulk, rc);
lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_RCVD,
&lwi);
if (rc) {
- if (rc != -ETIMEDOUT)
- LBUG();
+ LASSERT(rc == -ETIMEDOUT);
ptlrpc_abort_bulk(desc);
recovd_conn_fail(desc->bd_connection);
- obd_commitrw(cmd, conn, objcount, tmp1, niocount, local_nb,
- desc->bd_desc_private);
- } else {
- rc = obd_commitrw(cmd, conn, objcount, tmp1, niocount, local_nb,
- desc->bd_desc_private);
+ obd_commitrw(cmd, conn, objcount, ioo, niocount, local_nb,
+ desc_priv, oti);
+ GOTO(out_bulk, rc);
}
+ req->rq_status = obd_commitrw(cmd, conn, objcount, ioo, niocount,
+ local_nb, desc_priv, oti);
+
+ out_bulk:
ptlrpc_bulk_decref(desc);
- EXIT;
-out_free:
- OBD_FREE(local_nb, niocount * sizeof(*local_nb));
-out:
- if (!reply_sent) {
- if (rc) {
- OBD_FREE(req->rq_repmsg, req->rq_replen);
- req->rq_repmsg = NULL;
- ptlrpc_error(req->rq_svc, req);
- } else
- ptlrpc_reply(req->rq_svc, req);
- }
- return rc;
+ out_local:
+ OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
+ out:
+ if (!rc)
+ /* Hmm, we don't return anything in this reply buffer?
+ * We should be returning per-page status codes and also
+ * per-object size, blocks count, mtime, ctime. (bug 593) */
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
+ &req->rq_repmsg);
+ if (rc)
+ ptlrpc_error(req->rq_svc, req);
+ else
+ rc = ptlrpc_reply(req->rq_svc, req);
+ RETURN(rc);
+}
-fail_bulk:
- ptlrpc_free_bulk(desc);
-fail_preprw:
- /* FIXME: how do we undo the preprw? - answer = call commitrw */
- goto out_free;
+inline void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
+{
+ if (oti && req->rq_repmsg)
+ req->rq_repmsg->transno = HTON__u64(oti->oti_transno);
+ EXIT;
}
static int ost_handle(struct ptlrpc_request *req)
{
+ struct obd_trans_info trans_info = { 0, }, *oti = &trans_info;
int rc;
ENTRY;
GOTO(out, rc);
}
- if (req->rq_reqmsg->opc != OST_CONNECT &&
- req->rq_export == NULL) {
+ if (req->rq_reqmsg->opc != OST_CONNECT && req->rq_export == NULL) {
CERROR("lustre_ost: operation %d on unconnected OST\n",
req->rq_reqmsg->opc);
req->rq_status = -ENOTCONN;
case OST_CREATE:
CDEBUG(D_INODE, "create\n");
OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
- rc = ost_create(req);
+ rc = ost_create(req, oti);
break;
case OST_DESTROY:
CDEBUG(D_INODE, "destroy\n");
OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
- rc = ost_destroy(req);
+ rc = ost_destroy(req, oti);
break;
case OST_GETATTR:
CDEBUG(D_INODE, "getattr\n");
case OST_SETATTR:
CDEBUG(D_INODE, "setattr\n");
OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
- rc = ost_setattr(req);
+ rc = ost_setattr(req, oti);
break;
case OST_OPEN:
CDEBUG(D_INODE, "open\n");
OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
- rc = ost_open(req);
+ rc = ost_open(req, oti);
break;
case OST_CLOSE:
CDEBUG(D_INODE, "close\n");
OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
- rc = ost_close(req);
+ rc = ost_close(req, oti);
break;
case OST_WRITE:
CDEBUG(D_INODE, "write\n");
OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
- rc = ost_brw_write(req);
+ rc = ost_brw_write(req, oti);
/* ost_brw sends its own replies */
RETURN(rc);
case OST_READ:
case OST_PUNCH:
CDEBUG(D_INODE, "punch\n");
OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
- rc = ost_punch(req);
+ rc = ost_punch(req, oti);
break;
case OST_STATFS:
CDEBUG(D_INODE, "statfs\n");
case LDLM_ENQUEUE:
CDEBUG(D_INODE, "enqueue\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
- rc = ldlm_handle_enqueue(req);
+ rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
+ ldlm_server_blocking_ast);
break;
case LDLM_CONVERT:
CDEBUG(D_INODE, "convert\n");
}
EXIT;
+ /* If we're DISCONNECTing, the export_data is already freed */
+ if (!rc && req->rq_reqmsg->opc != OST_DISCONNECT) {
+ struct obd_device *obd = req->rq_export->exp_obd;
+ if ((obd->obd_flags & OBD_NO_TRANSNO) == 0) {
+ req->rq_repmsg->last_committed =
+ HTON__u64(obd->obd_last_committed);
+ } else {
+ DEBUG_REQ(D_IOCTL, req,
+ "not sending last_committed update");
+ }
+ CDEBUG(D_INFO, "last_committed "LPU64", xid "LPX64"\n",
+ obd->obd_last_committed, HTON__u64(req->rq_xid));
+ }
+
out:
//req->rq_status = rc;
if (rc) {
CDEBUG(D_INODE, "sending reply\n");
if (req->rq_repmsg == NULL)
CERROR("handler for opcode %d returned rc=0 without "
- "creating rq_repmsg; needs to return rc != "
- "0!\n", req->rq_reqmsg->opc);
+ "creating rq_repmsg; needs to return rc != 0!\n",
+ req->rq_reqmsg->opc);
+ else
+ oti_to_request(oti, req);
ptlrpc_reply(req->rq_svc, req);
}
return 0;
}
-/* mount the file system (secretly) */
static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
{
- struct obd_ioctl_data* data = buf;
struct ost_obd *ost = &obddev->u.ost;
- struct obd_device *tgt;
+ struct obd_uuid self = { "self" };
int err;
int i;
ENTRY;
- if (data->ioc_inllen1 < 1) {
- CERROR("requires a TARGET OBD UUID\n");
- RETURN(-EINVAL);
- }
- if (data->ioc_inllen1 > 37) {
- CERROR("OBD UUID must be less than 38 characters\n");
- RETURN(-EINVAL);
- }
-
- tgt = class_uuid2obd(data->ioc_inlbuf1);
- if (!tgt || !(tgt->obd_flags & OBD_ATTACHED) ||
- !(tgt->obd_flags & OBD_SET_UP)) {
- CERROR("device not attached or not set up (%d)\n",
- data->ioc_dev);
- RETURN(err = -EINVAL);
- }
-
- err = obd_connect(&ost->ost_conn, tgt, NULL, NULL, NULL);
- if (err) {
- CERROR("fail to connect to device %d\n", data->ioc_dev);
- RETURN(err);
- }
-
ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
OST_BUFSIZE, OST_MAXREQSIZE,
OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
- "self", ost_handle, "ost");
+ &self, ost_handle, "ost");
if (!ost->ost_service) {
CERROR("failed to start service\n");
GOTO(error_disc, err = -ENOMEM);
RETURN(0);
error_disc:
- obd_disconnect(&ost->ost_conn);
RETURN(err);
}
static int ost_cleanup(struct obd_device * obddev)
{
struct ost_obd *ost = &obddev->u.ost;
- int err;
+ int err = 0;
ENTRY;
- if ( !list_empty(&obddev->obd_exports) ) {
- CERROR("still has clients!\n");
- RETURN(-EBUSY);
- }
-
ptlrpc_stop_all_threads(ost->ost_service);
ptlrpc_unregister_service(ost->ost_service);
- err = obd_disconnect(&ost->ost_conn);
- if (err)
- CERROR("lustre ost: fail to disconnect device\n");
-
RETURN(err);
}
int ost_attach(struct obd_device *dev, obd_count len, void *data)
{
- return lprocfs_reg_obd(dev, status_var_nm_1, dev);
+ struct lprocfs_static_vars lvars;
+
+ lprocfs_init_vars(&lvars);
+ return lprocfs_obd_attach(dev, lvars.obd_vars);
}
int ost_detach(struct obd_device *dev)
{
- return lprocfs_dereg_obd(dev);
+ return lprocfs_obd_detach(dev);
}
/* This is so similar to mds_connect that it makes my heart weep: we should
* target_handle_connect.
*/
static int ost_connect(struct lustre_handle *conn,
- struct obd_device *obd, obd_uuid_t cluuid,
+ struct obd_device *obd, struct obd_uuid *cluuid,
struct recovd_obd *recovd,
ptlrpc_recovery_cb_t recover)
{
RETURN(-EINVAL);
/* lctl gets a backstage, all-access pass. */
- if (!strcmp(cluuid, "OBD_CLASS_UUID"))
+ if (!strcmp(cluuid->uuid, "OBD_CLASS_UUID"))
goto dont_check_exports;
spin_lock(&obd->obd_dev_lock);
list_for_each(p, &obd->obd_exports) {
exp = list_entry(p, struct obd_export, exp_obd_chain);
oed = &exp->exp_ost_data;
- if (!memcmp(cluuid, oed->oed_uuid, sizeof oed->oed_uuid)) {
+ if (!memcmp(cluuid->uuid, oed->oed_uuid.uuid,
+ sizeof(oed->oed_uuid.uuid))) {
spin_unlock(&obd->obd_dev_lock);
LASSERT(exp->exp_obd == obd);
LASSERT(exp);
oed = &exp->exp_ost_data;
- memcpy(oed->oed_uuid, cluuid, sizeof oed->oed_uuid);
+ memcpy(oed->oed_uuid.uuid, cluuid->uuid, sizeof(oed->oed_uuid.uuid));
RETURN(0);
}
-
/* use obd ops to offer management infrastructure */
static struct obd_ops ost_obd_ops = {
o_owner: THIS_MODULE,
static int __init ost_init(void)
{
- int rc;
-
- rc = class_register_type(&ost_obd_ops, status_class_var,
- LUSTRE_OST_NAME);
- RETURN(rc);
+ struct lprocfs_static_vars lvars;
+ ENTRY;
+ lprocfs_init_vars(&lvars);
+ RETURN(class_register_type(&ost_obd_ops, lvars.module_vars,
+ LUSTRE_OST_NAME));
}
static void __exit ost_exit(void)