#include <linux/module.h>
#include <linux/obd_ost.h>
#include <linux/lustre_net.h>
+#include <linux/lustre_dlm.h>
+#include <linux/init.h>
+#include <linux/lprocfs_status.h>
-static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
+extern lprocfs_vars_t status_var_nm_1[];
+extern lprocfs_vars_t status_class_var[];
+
+static int ost_destroy(struct ptlrpc_request *req)
{
- struct obd_conn conn;
+ struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
struct ost_body *body;
int rc, size = sizeof(*body);
ENTRY;
body = lustre_msg_buf(req->rq_reqmsg, 0);
- conn.oc_id = body->connid;
- conn.oc_dev = ost->ost_tgt;
rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
RETURN(rc);
- req->rq_status = obd_destroy(&conn, &body->oa);
+ req->rq_status = obd_destroy(conn, &body->oa, NULL);
RETURN(0);
}
-static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
+static int ost_getattr(struct ptlrpc_request *req)
{
- struct obd_conn conn;
+ struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
struct ost_body *body, *repbody;
int rc, size = sizeof(*body);
ENTRY;
body = lustre_msg_buf(req->rq_reqmsg, 0);
- conn.oc_id = body->connid;
- conn.oc_dev = ost->ost_tgt;
rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
RETURN(rc);
repbody = lustre_msg_buf(req->rq_repmsg, 0);
+ /* FIXME: unpack only valid fields instead of memcpy, endianness */
memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
- req->rq_status = obd_getattr(&conn, &repbody->oa);
+ req->rq_status = obd_getattr(conn, &repbody->oa, NULL);
RETURN(0);
}
-static int ost_open(struct ost_obd *ost, struct ptlrpc_request *req)
+static int ost_statfs(struct ptlrpc_request *req)
{
- struct obd_conn conn;
- struct ost_body *body, *repbody;
- int rc, size = sizeof(*body);
+ struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
+ struct obd_statfs *osfs;
+ int rc, size = sizeof(*osfs);
ENTRY;
- body = lustre_msg_buf(req->rq_reqmsg, 0);
- conn.oc_id = body->connid;
- conn.oc_dev = ost->ost_tgt;
-
rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
RETURN(rc);
- repbody = lustre_msg_buf(req->rq_repmsg, 0);
- memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
- req->rq_status = obd_open(&conn, &repbody->oa);
+ osfs = lustre_msg_buf(req->rq_repmsg, 0);
+ memset(osfs, 0, size);
+
+ rc = obd_statfs(conn, osfs);
+ if (rc) {
+ CERROR("ost: statfs failed: rc %d\n", rc);
+ req->rq_status = rc;
+ RETURN(rc);
+ }
+ obd_statfs_pack(osfs, osfs);
+
RETURN(0);
}
-static int ost_close(struct ost_obd *ost, struct ptlrpc_request *req)
+static int ost_open(struct ptlrpc_request *req)
{
- struct obd_conn conn;
+ struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
struct ost_body *body, *repbody;
int rc, size = sizeof(*body);
ENTRY;
body = lustre_msg_buf(req->rq_reqmsg, 0);
- conn.oc_id = body->connid;
- conn.oc_dev = ost->ost_tgt;
rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
RETURN(rc);
repbody = lustre_msg_buf(req->rq_repmsg, 0);
+ /* FIXME: unpack only valid fields instead of memcpy, endianness */
memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
- req->rq_status = obd_close(&conn, &repbody->oa);
+ req->rq_status = obd_open(conn, &repbody->oa, NULL);
RETURN(0);
}
-static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
+static int ost_close(struct ptlrpc_request *req)
{
- struct obd_conn conn;
+ struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
struct ost_body *body, *repbody;
int rc, size = sizeof(*body);
ENTRY;
body = lustre_msg_buf(req->rq_reqmsg, 0);
- conn.oc_id = body->connid;
- conn.oc_dev = ost->ost_tgt;
rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
RETURN(rc);
repbody = lustre_msg_buf(req->rq_repmsg, 0);
+ /* FIXME: unpack only valid fields instead of memcpy, endianness */
memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
- req->rq_status = obd_create(&conn, &repbody->oa);
+ req->rq_status = obd_close(conn, &repbody->oa, NULL);
RETURN(0);
}
-static int ost_punch(struct ost_obd *ost, struct ptlrpc_request *req)
+static int ost_create(struct ptlrpc_request *req)
{
- struct obd_conn conn;
+ struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
struct ost_body *body, *repbody;
int rc, size = sizeof(*body);
ENTRY;
body = lustre_msg_buf(req->rq_reqmsg, 0);
- conn.oc_id = body->connid;
- conn.oc_dev = ost->ost_tgt;
rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
RETURN(rc);
repbody = lustre_msg_buf(req->rq_repmsg, 0);
+ /* FIXME: unpack only valid fields instead of memcpy, endianness */
memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
- req->rq_status = obd_punch(&conn, &repbody->oa,
- repbody->oa.o_size, repbody->oa.o_blocks);
+ req->rq_status = obd_create(conn, &repbody->oa, NULL);
RETURN(0);
}
-static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
+static int ost_punch(struct ptlrpc_request *req)
{
- struct obd_conn conn;
+ struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
struct ost_body *body, *repbody;
int rc, size = sizeof(*body);
ENTRY;
body = lustre_msg_buf(req->rq_reqmsg, 0);
- conn.oc_id = body->connid;
- conn.oc_dev = ost->ost_tgt;
+
+ if ((NTOH__u32(body->oa.o_valid) & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))!=
+ (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
+ RETURN(-EINVAL);
rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
RETURN(rc);
repbody = lustre_msg_buf(req->rq_repmsg, 0);
+ /* FIXME: unpack only valid fields instead of memcpy, endianness */
memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
- req->rq_status = obd_setattr(&conn, &repbody->oa);
- RETURN(0);
-}
-
-static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req)
-{
- struct obd_conn conn;
- struct ost_body *body;
- int rc, size = sizeof(*body);
- ENTRY;
-
- conn.oc_dev = ost->ost_tgt;
-
- rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
- if (rc)
- RETURN(rc);
-
- req->rq_status = obd_connect(&conn);
-
- CDEBUG(D_IOCTL, "rep buffer %p, id %d\n", req->rq_repmsg, conn.oc_id);
- body = lustre_msg_buf(req->rq_repmsg, 0);
- body->connid = conn.oc_id;
+ req->rq_status = obd_punch(conn, &repbody->oa, NULL,
+ repbody->oa.o_size, repbody->oa.o_blocks);
RETURN(0);
}
-static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
+static int ost_setattr(struct ptlrpc_request *req)
{
- struct obd_conn conn;
- struct ost_body *body;
+ struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
+ struct ost_body *body, *repbody;
int rc, size = sizeof(*body);
ENTRY;
body = lustre_msg_buf(req->rq_reqmsg, 0);
- conn.oc_id = body->connid;
- conn.oc_dev = ost->ost_tgt;
rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
RETURN(rc);
- CDEBUG(D_IOCTL, "Disconnecting %d\n", conn.oc_id);
- req->rq_status = obd_disconnect(&conn);
+ repbody = lustre_msg_buf(req->rq_repmsg, 0);
+ /* FIXME: unpack only valid fields instead of memcpy, endianness */
+ memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
+ req->rq_status = obd_setattr(conn, &repbody->oa, NULL);
RETURN(0);
}
-static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
+static int ost_bulk_timeout(void *data)
{
- struct obd_conn conn;
- struct ost_body *body;
- int rc, size[2] = {sizeof(*body)};
- char *bufs[2] = {NULL, NULL}, *ptr;
- ENTRY;
-
- body = lustre_msg_buf(req->rq_reqmsg, 0);
- conn.oc_id = body->connid;
- conn.oc_dev = ost->ost_tgt;
-
- ptr = lustre_msg_buf(req->rq_reqmsg, 1);
- if (!ptr)
- RETURN(-EINVAL);
-
- req->rq_status = obd_get_info(&conn, req->rq_reqmsg->buflens[1], ptr,
- &(size[1]), (void **)&(bufs[1]));
-
- rc = lustre_pack_msg(2, size, bufs, &req->rq_replen, &req->rq_repmsg);
- if (rc)
- CERROR("cannot pack reply\n");
+ struct ptlrpc_bulk_desc *desc = data;
- RETURN(rc);
+ ENTRY;
+ CERROR("(not yet) starting recovery of client %p\n", desc->bd_client);
+ RETURN(1);
}
-static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
+static int ost_brw_read(struct ptlrpc_request *req)
{
+ struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
struct ptlrpc_bulk_desc *desc;
- struct obd_conn conn;
void *tmp1, *tmp2, *end2;
struct niobuf_remote *remote_nb;
struct niobuf_local *local_nb = NULL;
struct obd_ioobj *ioo;
struct ost_body *body;
+ struct l_wait_info lwi;
+ void *desc_priv = NULL;
int rc, cmd, i, j, objcount, niocount, size = sizeof(*body);
ENTRY;
end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
- cmd = body->data;
+ cmd = OBD_BRW_READ;
- conn.oc_id = body->connid;
- conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
+ if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
+ GOTO(out, rc = 0);
for (i = 0; i < objcount; i++) {
ost_unpack_ioo(&tmp1, &ioo);
ost_unpack_niobuf(&tmp2, &remote_nb);
}
- rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
- if (rc)
- RETURN(rc);
OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
if (local_nb == NULL)
- RETURN(-ENOMEM);
+ GOTO(out, rc = -ENOMEM);
/* The unpackers move tmp1 and tmp2, so reset them before using */
- tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
- tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
- req->rq_status = obd_preprw(cmd, &conn, objcount,
- tmp1, niocount, tmp2, local_nb);
+ ioo = lustre_msg_buf(req->rq_reqmsg, 1);
+ remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
+ req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount,
+ remote_nb, local_nb, &desc_priv);
if (req->rq_status)
- GOTO(out_local, 0);
+ GOTO(out, rc = 0);
desc = ptlrpc_prep_bulk(req->rq_connection);
if (desc == NULL)
GOTO(out_local, rc = -ENOMEM);
- desc->b_portal = OST_BULK_PORTAL;
+ desc->bd_portal = OST_BULK_PORTAL;
for (i = 0; i < niocount; i++) {
- struct ptlrpc_bulk_page *bulk;
- bulk = ptlrpc_prep_bulk_page(desc);
+ struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
+
if (bulk == NULL)
GOTO(out_bulk, rc = -ENOMEM);
- remote_nb = &(((struct niobuf_remote *)tmp2)[i]);
- bulk->b_xid = remote_nb->xid;
- bulk->b_buf = (void *)(unsigned long)local_nb[i].addr;
- bulk->b_buflen = PAGE_SIZE;
+ bulk->bp_xid = remote_nb[i].xid;
+ bulk->bp_buf = local_nb[i].addr;
+ bulk->bp_buflen = remote_nb[i].len;
}
rc = ptlrpc_send_bulk(desc);
if (rc)
GOTO(out_bulk, rc);
- ptlrpc_free_bulk(desc);
+ lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
+ rc = l_wait_event(desc->bd_waitq, desc->bd_flags &PTL_BULK_FL_SENT, &lwi);
+ if (rc) {
+ LASSERT(rc == -ETIMEDOUT);
+ GOTO(out_bulk, rc);
+ }
- /* The unpackers move tmp1 and tmp2, so reset them before using */
- tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
- tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
- req->rq_status = obd_commitrw(cmd, &conn, objcount,
- tmp1, niocount, local_nb);
+ req->rq_status = obd_commitrw(cmd, conn, objcount, ioo, niocount,
+ local_nb, desc_priv);
- RETURN(rc);
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
- out_bulk:
+out_bulk:
ptlrpc_free_bulk(desc);
- out_local:
- if (local_nb != NULL)
- OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
- out:
- return 0;
-}
-
-static int ost_commit_page(struct obd_conn *conn, struct page *page)
-{
- struct obd_ioobj obj;
- struct niobuf_local buf;
- int rc;
- ENTRY;
-
- memset(&buf, 0, sizeof(buf));
- memset(&obj, 0, sizeof(obj));
-
- buf.page = page;
- obj.ioo_bufcnt = 1;
-
- rc = obd_commitrw(OBD_BRW_WRITE, conn, 1, &obj, 1, &buf);
- RETURN(rc);
-}
-
-static int ost_brw_write_cb(struct ptlrpc_bulk_page *bulk)
-{
- void *journal_save;
- int rc;
- ENTRY;
-
- /* Restore the filesystem journal context when we do the commit.
- * This is needed for ext3 and reiserfs, but can't really hurt
- * other filesystems.
- */
- journal_save = current->journal_info;
- current->journal_info = bulk->b_desc->b_journal_info;
- CDEBUG(D_BUFFS, "journal_info: saved %p->%p, restored %p\n", current,
- journal_save, bulk->b_desc->b_journal_info);
- rc = ost_commit_page(&bulk->b_desc->b_conn, bulk->b_page);
- current->journal_info = journal_save;
- CDEBUG(D_BUFFS, "journal_info: restored %p->%p\n", current,
- journal_save);
- if (rc)
- CERROR("ost_commit_page failed: %d\n", rc);
-
+out_local:
+ OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
+out:
+ if (rc) {
+ /* It's a lot of work to delay allocating the reply, and a lot
+ * less work to just free it here. */
+ OBD_FREE(req->rq_repmsg, req->rq_replen);
+ req->rq_repmsg = NULL;
+ ptlrpc_error(req->rq_svc, req);
+ } else
+ ptlrpc_reply(req->rq_svc, req);
RETURN(rc);
}
-static int ost_brw_write_finished_cb(struct ptlrpc_bulk_desc *desc)
-{
- ptlrpc_free_bulk(desc);
-
- return 0;
-}
-
-static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
+static int ost_brw_write(struct ptlrpc_request *req)
{
+ struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
struct ptlrpc_bulk_desc *desc;
- struct obd_conn conn;
struct niobuf_remote *remote_nb;
struct niobuf_local *local_nb, *lnb;
struct obd_ioobj *ioo;
struct ost_body *body;
int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
void *tmp1, *tmp2, *end2;
+ void *desc_priv = NULL;
+ int reply_sent = 0;
+ struct ptlrpc_service *srv;
+ struct l_wait_info lwi;
+ __u32 xid;
ENTRY;
body = lustre_msg_buf(req->rq_reqmsg, 0);
end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
- cmd = body->data;
-
- conn.oc_id = body->connid;
- conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
+ cmd = OBD_BRW_WRITE;
for (i = 0; i < objcount; i++) {
ost_unpack_ioo((void *)&tmp1, &ioo);
size[1] = niocount * sizeof(*remote_nb);
rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc)
- GOTO(fail, rc);
+ GOTO(out, rc);
remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
OBD_ALLOC(local_nb, niocount * sizeof(*local_nb));
if (local_nb == NULL)
- GOTO(fail, rc = -ENOMEM);
+ GOTO(out, rc = -ENOMEM);
/* The unpackers move tmp1 and tmp2, so reset them before using */
tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
- req->rq_status = obd_preprw(cmd, &conn, objcount,
- tmp1, niocount, tmp2, local_nb);
+ req->rq_status = obd_preprw(cmd, conn, objcount, tmp1, niocount, tmp2,
+ local_nb, &desc_priv);
if (req->rq_status)
- GOTO(success, 0);
+ GOTO(out_free, rc = 0); /* XXX is this correct? */
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
+ GOTO(fail_preprw, rc = 0);
desc = ptlrpc_prep_bulk(req->rq_connection);
if (desc == NULL)
GOTO(fail_preprw, rc = -ENOMEM);
- desc->b_cb = ost_brw_write_finished_cb;
- desc->b_portal = OSC_BULK_PORTAL;
- memcpy(&(desc->b_conn), &conn, sizeof(conn));
+ desc->bd_cb = NULL;
+ desc->bd_portal = OSC_BULK_PORTAL;
+ desc->bd_desc_private = desc_priv;
+ memcpy(&(desc->bd_conn), &conn, sizeof(conn));
- /* Save journal context for commit callbacks */
- CERROR("journal_info: saved %p->%p\n", current, current->journal_info);
- desc->b_journal_info = current->journal_info;
+ srv = req->rq_obd->u.ost.ost_service;
+ spin_lock(&srv->srv_lock);
+ xid = srv->srv_xid++; /* single xid for all pages */
+ spin_unlock(&srv->srv_lock);
for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
- struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
struct ptlrpc_bulk_page *bulk;
bulk = ptlrpc_prep_bulk_page(desc);
if (bulk == NULL)
GOTO(fail_bulk, rc = -ENOMEM);
- spin_lock(&srv->srv_lock);
- bulk->b_xid = srv->srv_xid++;
- spin_unlock(&srv->srv_lock);
+ bulk->bp_xid = xid; /* single xid for all pages */
- bulk->b_buf = (void *)(unsigned long)lnb->addr;
- bulk->b_page = lnb->page;
- bulk->b_buflen = PAGE_SIZE;
- bulk->b_cb = ost_brw_write_cb;
+ bulk->bp_buf = lnb->addr;
+ bulk->bp_page = lnb->page;
+ bulk->bp_flags = lnb->flags;
+ bulk->bp_dentry = lnb->dentry;
+ bulk->bp_buflen = lnb->len;
+ bulk->bp_cb = NULL;
/* this advances remote_nb */
ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0,
- bulk->b_xid);
+ bulk->bp_xid);
}
rc = ptlrpc_register_bulk(desc);
- current->journal_info = NULL; /* kind of scary */
if (rc)
GOTO(fail_bulk, rc);
- EXIT;
- success:
- OBD_FREE(local_nb, niocount * sizeof(*local_nb));
- return 0;
+ reply_sent = 1;
+ ptlrpc_reply(req->rq_svc, req);
- fail_bulk:
+ lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
+ rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_RCVD,
+ &lwi);
+ if (rc) {
+ if (rc != -ETIMEDOUT)
+ LBUG();
+ GOTO(fail_bulk, rc);
+ }
+
+ rc = obd_commitrw(cmd, conn, objcount, tmp1, niocount, local_nb,
+ desc->bd_desc_private);
ptlrpc_free_bulk(desc);
- fail_preprw:
+ EXIT;
+out_free:
OBD_FREE(local_nb, niocount * sizeof(*local_nb));
- /* FIXME: how do we undo the preprw? */
- fail:
+out:
+ if (!reply_sent) {
+ if (rc) {
+ OBD_FREE(req->rq_repmsg, req->rq_replen);
+ req->rq_repmsg = NULL;
+ ptlrpc_error(req->rq_svc, req);
+ } else
+ ptlrpc_reply(req->rq_svc, req);
+ }
return rc;
-}
-static int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
-{
- struct ost_body *body = lustre_msg_buf(req->rq_reqmsg, 0);
-
- if (body->data == OBD_BRW_READ)
- return ost_brw_read(obddev, req);
- else
- return ost_brw_write(obddev, req);
+fail_bulk:
+ ptlrpc_free_bulk(desc);
+fail_preprw:
+ /* FIXME: how do we undo the preprw? */
+ goto out_free;
}
-static int ost_handle(struct obd_device *obddev, struct ptlrpc_service *svc,
- struct ptlrpc_request *req)
+static int ost_handle(struct ptlrpc_request *req)
{
int rc;
- struct ost_obd *ost = &obddev->u.ost;
ENTRY;
rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
- if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
- CERROR("lustre_mds: Invalid request\n");
+ if (rc || OBD_FAIL_CHECK(OBD_FAIL_OST_HANDLE_UNPACK)) {
+ CERROR("lustre_ost: Invalid request\n");
GOTO(out, rc);
}
- if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
- CERROR("lustre_mds: wrong packet type sent %d\n",
- req->rq_reqmsg->type);
- GOTO(out, rc = -EINVAL);
+ if (req->rq_reqmsg->opc != OST_CONNECT &&
+ req->rq_export == NULL) {
+ CERROR("lustre_ost: operation %d on unconnected OST\n",
+ req->rq_reqmsg->opc);
+ GOTO(out, rc = -ENOTCONN);
}
+ if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
+ GOTO(out, rc = -EINVAL);
+
switch (req->rq_reqmsg->opc) {
case OST_CONNECT:
CDEBUG(D_INODE, "connect\n");
OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
- rc = ost_connect(ost, req);
+ rc = target_handle_connect(req);
break;
case OST_DISCONNECT:
CDEBUG(D_INODE, "disconnect\n");
OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
- rc = ost_disconnect(ost, req);
- break;
- case OST_GET_INFO:
- CDEBUG(D_INODE, "get_info\n");
- OBD_FAIL_RETURN(OBD_FAIL_OST_GET_INFO_NET, 0);
- rc = ost_get_info(ost, req);
+ rc = target_handle_disconnect(req);
break;
case OST_CREATE:
CDEBUG(D_INODE, "create\n");
OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
- rc = ost_create(ost, req);
+ rc = ost_create(req);
break;
case OST_DESTROY:
CDEBUG(D_INODE, "destroy\n");
OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
- rc = ost_destroy(ost, req);
+ rc = ost_destroy(req);
break;
case OST_GETATTR:
CDEBUG(D_INODE, "getattr\n");
OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
- rc = ost_getattr(ost, req);
+ rc = ost_getattr(req);
break;
case OST_SETATTR:
CDEBUG(D_INODE, "setattr\n");
OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
- rc = ost_setattr(ost, req);
+ rc = ost_setattr(req);
break;
case OST_OPEN:
- CDEBUG(D_INODE, "setattr\n");
+ CDEBUG(D_INODE, "open\n");
OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
- rc = ost_open(ost, req);
+ rc = ost_open(req);
break;
case OST_CLOSE:
- CDEBUG(D_INODE, "setattr\n");
+ CDEBUG(D_INODE, "close\n");
OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
- rc = ost_close(ost, req);
+ rc = ost_close(req);
break;
- case OST_BRW:
- CDEBUG(D_INODE, "brw\n");
+ case OST_WRITE:
+ CDEBUG(D_INODE, "write\n");
OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
- rc = ost_brw(ost, req);
- break;
+ rc = ost_brw_write(req);
+ /* ost_brw sends its own replies */
+ RETURN(rc);
+ case OST_READ:
+ CDEBUG(D_INODE, "read\n");
+ OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
+ rc = ost_brw_read(req);
+ /* ost_brw sends its own replies */
+ RETURN(rc);
case OST_PUNCH:
CDEBUG(D_INODE, "punch\n");
OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
- rc = ost_punch(ost, req);
+ rc = ost_punch(req);
+ break;
+ case OST_STATFS:
+ CDEBUG(D_INODE, "statfs\n");
+ OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
+ rc = ost_statfs(req);
+ break;
+ case LDLM_ENQUEUE:
+ CDEBUG(D_INODE, "enqueue\n");
+ OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
+ rc = ldlm_handle_enqueue(req);
+ if (rc)
+ break;
+ RETURN(0);
+ case LDLM_CONVERT:
+ CDEBUG(D_INODE, "convert\n");
+ OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
+ rc = ldlm_handle_convert(req);
+ if (rc)
+ break;
+ RETURN(0);
+ case LDLM_CANCEL:
+ CDEBUG(D_INODE, "cancel\n");
+ OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
+ rc = ldlm_handle_cancel(req);
+ if (rc)
+ break;
+ RETURN(0);
+ case LDLM_BL_CALLBACK:
+ case LDLM_CP_CALLBACK:
+ CDEBUG(D_INODE, "callback\n");
+ CERROR("callbacks should not happen on OST\n");
+ LBUG();
+ OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
break;
default:
req->rq_status = -ENOTSUPP;
- rc = ptlrpc_error(svc, req);
+ rc = ptlrpc_error(req->rq_svc, req);
RETURN(rc);
}
out:
//req->rq_status = rc;
if (rc) {
- CERROR("ost: processing error %d\n", rc);
- ptlrpc_error(svc, req);
+ CERROR("ost: processing error (opcode=%d): %d\n",
+ req->rq_reqmsg->opc, rc);
+ ptlrpc_error(req->rq_svc, req);
} else {
CDEBUG(D_INODE, "sending reply\n");
- ptlrpc_reply(svc, req);
+ if (req->rq_repmsg == NULL)
+ CERROR("handler for opcode %d returned rc=0 without "
+ "creating rq_repmsg; needs to return rc != "
+ "0!\n", req->rq_reqmsg->opc);
+ ptlrpc_reply(req->rq_svc, req);
}
return 0;
}
+#define OST_NUM_THREADS 6
+
/* mount the file system (secretly) */
static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
{
struct ost_obd *ost = &obddev->u.ost;
struct obd_device *tgt;
int err;
+ int i;
ENTRY;
- if (data->ioc_dev < 0 || data->ioc_dev > MAX_OBD_DEVICES)
- RETURN(-ENODEV);
+ if (data->ioc_inllen1 < 1) {
+ CERROR("requires a TARGET OBD UUID\n");
+ RETURN(-EINVAL);
+ }
+ if (data->ioc_inllen1 > 37) {
+ CERROR("OBD UUID must be less than 38 characters\n");
+ RETURN(-EINVAL);
+ }
MOD_INC_USE_COUNT;
- tgt = &obd_dev[data->ioc_dev];
- ost->ost_tgt = tgt;
- if (!(tgt->obd_flags & OBD_ATTACHED) ||
+ tgt = class_uuid2obd(data->ioc_inlbuf1);
+ if (!tgt || !(tgt->obd_flags & OBD_ATTACHED) ||
!(tgt->obd_flags & OBD_SET_UP)) {
CERROR("device not attached or not set up (%d)\n",
data->ioc_dev);
GOTO(error_dec, err = -EINVAL);
}
- ost->ost_conn.oc_dev = tgt;
- err = obd_connect(&ost->ost_conn);
+ err = obd_connect(&ost->ost_conn, tgt, NULL, NULL, NULL);
if (err) {
CERROR("fail to connect to device %d\n", data->ioc_dev);
GOTO(error_dec, err = -EINVAL);
}
- ost->ost_service = ptlrpc_init_svc(128 * 1024,
- OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
- "self", ost_handle);
+ ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
+ OST_BUFSIZE, OST_MAXREQSIZE,
+ OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
+ "self", ost_handle, "ost");
if (!ost->ost_service) {
CERROR("failed to start service\n");
GOTO(error_disc, err = -EINVAL);
}
- err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
- if (err)
- GOTO(error_disc, err = -EINVAL);
- err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
- if (err)
- GOTO(error_disc, err = -EINVAL);
+ for (i = 0; i < OST_NUM_THREADS; i++) {
+ char name[32];
+ sprintf(name, "lustre_ost_%02d", i);
+ err = ptlrpc_start_thread(obddev, ost->ost_service, name);
+ if (err) {
+ CERROR("error starting thread #%d: rc %d\n", i, err);
+ GOTO(error_disc, err = -EINVAL);
+ }
+ }
RETURN(0);
ENTRY;
- if ( !list_empty(&obddev->obd_gen_clients) ) {
+ if ( !list_empty(&obddev->obd_exports) ) {
CERROR("still has clients!\n");
RETURN(-EBUSY);
}
ptlrpc_stop_all_threads(ost->ost_service);
- rpc_unregister_service(ost->ost_service);
-
- if (!list_empty(&ost->ost_service->srv_reqs)) {
- // XXX reply with errors and clean up
- CERROR("Request list not empty!\n");
- }
- OBD_FREE(ost->ost_service, sizeof(*ost->ost_service));
+ ptlrpc_unregister_service(ost->ost_service);
err = obd_disconnect(&ost->ost_conn);
if (err) {
MOD_DEC_USE_COUNT;
RETURN(0);
}
+int ost_attach(struct obd_device *dev,
+ obd_count len, void *data)
+{
+ /* lprocfs_reg_dev(dev, (lprocfs_group_t*)lprocfs_ptlrpc_nm,
+ sizeof(struct lprofiler_ptlrpc));
+ */
+ lprocfs_reg_obd(dev, (lprocfs_vars_t*)status_var_nm_1, (void*)dev);
+ return 0;
+}
+
+int ost_detach(struct obd_device *dev)
+{
+ /* lprocfs_dereg_dev(dev); */
+ lprocfs_dereg_obd(dev);
+ return 0;
+
+}
+
+
/* use obd ops to offer management infrastructure */
static struct obd_ops ost_obd_ops = {
+ o_attach: ost_attach,
+ o_detach: ost_detach,
o_setup: ost_setup,
o_cleanup: ost_cleanup,
};
static int __init ost_init(void)
{
- obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
+ int rc;
+
+ rc = class_register_type(&ost_obd_ops,
+ (lprocfs_vars_t*)status_class_var,
+ LUSTRE_OST_NAME);
+ if (rc) RETURN(rc);
+
return 0;
+
}
static void __exit ost_exit(void)
{
- obd_unregister_type(LUSTRE_OST_NAME);
+
+ class_unregister_type(LUSTRE_OST_NAME);
}
-MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
+MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
MODULE_LICENSE("GPL");