-/*
- * ost/ost_handler.c
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
+ * Author: Peter J. Braam <braam@clusterfs.com>
+ * Author: Phil Schwan <phil@clusterfs.com>
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
* Storage Target Handling functions
- *
* Lustre Object Server Module (OST)
- *
- * Copyright (C) 2001 Cluster File Systems, Inc.
*
- * This code is issued under the GNU General Public License.
- * See the file COPYING in this distribution
- *
- * by Peter Braam <braam@clusterfs.com>
- *
- * This server is single threaded at present (but can easily be multi threaded).
- * For testing and management it is treated as an obd_device, although it does
- * not export a full OBD method table (the requests are coming in over the wire,
- * so object target modules do not have a full method table.)
- *
+ * This server is single threaded at present (but can easily be multi
+ * threaded). For testing and management it is treated as an
+ * obd_device, although it does not export a full OBD method table
+ * (the requests are coming in over the wire, so object target
+ * modules do not have a full method table.)
*/
-
#define EXPORT_SYMTAB
+#define DEBUG_SUBSYSTEM S_OST
-#include <linux/version.h>
#include <linux/module.h>
-#include <linux/fs.h>
-#include <linux/stat.h>
-#include <linux/locks.h>
-#include <linux/ext2_fs.h>
-#include <linux/quotaops.h>
-#include <asm/unistd.h>
-#include <linux/obd_support.h>
-#include <linux/obd.h>
-#include <linux/obd_class.h>
-#include <linux/lustre_lib.h>
-#include <linux/lustre_idl.h>
-#include <linux/lustre_mds.h>
-#include <linux/obd_class.h>
-
-// for testing
-static int ost_queue_req(struct obd_device *obddev, struct ptlrpc_request *req)
+#include <linux/obd_ost.h>
+#include <linux/lustre_net.h>
+
+static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
{
- struct ptlrpc_request *srv_req;
- struct ost_obd *ost = &obddev->u.ost;
-
- if (!ost) {
- EXIT;
- return -1;
- }
-
- srv_req = kmalloc(sizeof(*srv_req), GFP_KERNEL);
- if (!srv_req) {
- EXIT;
- return -ENOMEM;
- }
-
- printk("---> OST at %d %p, incoming req %p, srv_req %p\n",
- __LINE__, ost, req, srv_req);
-
- memset(srv_req, 0, sizeof(*req));
-
- /* move the request buffer */
- srv_req->rq_reqbuf = req->rq_reqbuf;
- srv_req->rq_reqlen = req->rq_reqlen;
- srv_req->rq_ost = ost;
-
- /* remember where it came from */
- srv_req->rq_reply_handle = req;
-
- list_add(&srv_req->rq_list, &ost->ost_reqs);
- wake_up(&ost->ost_waitq);
- return 0;
-}
+ struct obd_conn conn;
+ struct ost_body *body;
+ int rc, size = sizeof(*body);
+ ENTRY;
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ conn.oc_id = body->connid;
+ conn.oc_dev = ost->ost_tgt;
-/* XXX replace with networking code */
-int ost_reply(struct obd_device *obddev, struct ptlrpc_request *req)
-{
- struct ptlrpc_request *clnt_req = req->rq_reply_handle;
-
- ENTRY;
- printk("ost_reply: req %p clnt_req at %p\n", req, clnt_req);
-
- /* free the request buffer */
- kfree(req->rq_reqbuf);
- req->rq_reqbuf = NULL;
-
- /* move the reply to the client */
- clnt_req->rq_replen = req->rq_replen;
- clnt_req->rq_repbuf = req->rq_repbuf;
-
- printk("---> client req %p repbuf %p len %d status %d\n",
- clnt_req, clnt_req->rq_repbuf, clnt_req->rq_replen,
- req->rq_rephdr->status);
-
- req->rq_repbuf = NULL;
- req->rq_replen = 0;
-
- /* free the server request */
- kfree(req);
- /* wake up the client */
- wake_up_interruptible(&clnt_req->rq_wait_for_rep);
- EXIT;
- return 0;
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+ if (rc)
+ RETURN(rc);
+
+ req->rq_status = obd_destroy(&conn, &body->oa);
+ RETURN(0);
}
-int ost_error(struct obd_device *obddev, struct ptlrpc_request *req)
+static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
{
- struct ptlrep_hdr *hdr;
-
- ENTRY;
- hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
- if (!hdr) {
- EXIT;
- return -ENOMEM;
- }
-
- memset(hdr, 0, sizeof(*hdr));
-
- hdr->seqno = req->rq_reqhdr->seqno;
- hdr->status = req->rq_status;
- hdr->type = OST_TYPE_ERR;
-
- req->rq_repbuf = (char *)hdr;
- req->rq_replen = sizeof(*hdr);
-
- EXIT;
- return ost_reply(obddev, req);
+ struct obd_conn conn;
+ struct ost_body *body, *repbody;
+ int rc, size = sizeof(*body);
+ ENTRY;
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ conn.oc_id = body->connid;
+ conn.oc_dev = ost->ost_tgt;
+
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+ if (rc)
+ RETURN(rc);
+
+ repbody = lustre_msg_buf(req->rq_repmsg, 0);
+ memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
+ req->rq_status = obd_getattr(&conn, &repbody->oa);
+ RETURN(0);
}
-static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
+static int ost_open(struct ost_obd *ost, struct ptlrpc_request *req)
{
- struct obd_conn conn;
- int rc;
-
- ENTRY;
-
- conn.oc_id = req->rq_req.ost->connid;
- conn.oc_dev = ost->ost_tgt;
-
- rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
- &req->rq_replen, &req->rq_repbuf);
- if (rc) {
- printk("ost_destroy: cannot pack reply\n");
- return rc;
- }
-
- req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_destroy
- (&conn, &req->rq_req.ost->oa);
-
- EXIT;
- return 0;
+ struct obd_conn conn;
+ struct ost_body *body, *repbody;
+ int rc, size = sizeof(*body);
+ ENTRY;
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ conn.oc_id = body->connid;
+ conn.oc_dev = ost->ost_tgt;
+
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+ if (rc)
+ RETURN(rc);
+
+ repbody = lustre_msg_buf(req->rq_repmsg, 0);
+ memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
+ req->rq_status = obd_open(&conn, &repbody->oa);
+ RETURN(0);
}
-static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
+static int ost_close(struct ost_obd *ost, struct ptlrpc_request *req)
{
- struct obd_conn conn;
- int rc;
-
- ENTRY;
- printk("ost getattr entered\n");
-
- conn.oc_id = req->rq_req.ost->connid;
- conn.oc_dev = ost->ost_tgt;
-
- rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
- &req->rq_replen, &req->rq_repbuf);
- if (rc) {
- printk("ost_getattr: cannot pack reply\n");
- return rc;
- }
- req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id;
- req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid;
-
- req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_getattr
- (&conn, &req->rq_rep.ost->oa);
-
- EXIT;
- return 0;
+ struct obd_conn conn;
+ struct ost_body *body, *repbody;
+ int rc, size = sizeof(*body);
+ ENTRY;
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ conn.oc_id = body->connid;
+ conn.oc_dev = ost->ost_tgt;
+
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+ if (rc)
+ RETURN(rc);
+
+ repbody = lustre_msg_buf(req->rq_repmsg, 0);
+ memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
+ req->rq_status = obd_close(&conn, &repbody->oa);
+ RETURN(0);
}
static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
{
- struct obd_conn conn;
- int rc;
+ struct obd_conn conn;
+ struct ost_body *body, *repbody;
+ int rc, size = sizeof(*body);
+ ENTRY;
- ENTRY;
-
- conn.oc_id = req->rq_req.ost->connid;
- conn.oc_dev = ost->ost_tgt;
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ conn.oc_id = body->connid;
+ conn.oc_dev = ost->ost_tgt;
- rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
- &req->rq_replen, &req->rq_repbuf);
- if (rc) {
- printk("ost_create: cannot pack reply\n");
- return rc;
- }
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+ if (rc)
+ RETURN(rc);
- memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
+ repbody = lustre_msg_buf(req->rq_repmsg, 0);
+ memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
+ req->rq_status = obd_create(&conn, &repbody->oa);
+ RETURN(0);
+}
- req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_create
- (&conn, &req->rq_rep.ost->oa);
+static int ost_punch(struct ost_obd *ost, struct ptlrpc_request *req)
+{
+ struct obd_conn conn;
+ struct ost_body *body, *repbody;
+ int rc, size = sizeof(*body);
+ ENTRY;
- EXIT;
- return 0;
-}
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ conn.oc_id = body->connid;
+ conn.oc_dev = ost->ost_tgt;
+
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+ if (rc)
+ RETURN(rc);
+ repbody = lustre_msg_buf(req->rq_repmsg, 0);
+ memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
+ req->rq_status = obd_punch(&conn, &repbody->oa,
+ repbody->oa.o_size, repbody->oa.o_blocks);
+ RETURN(0);
+}
static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
{
- struct obd_conn conn;
- int rc;
-
- ENTRY;
-
- conn.oc_id = req->rq_req.ost->connid;
- conn.oc_dev = ost->ost_tgt;
-
- rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
- &req->rq_replen, &req->rq_repbuf);
- if (rc) {
- printk("ost_setattr: cannot pack reply\n");
- return rc;
- }
+ struct obd_conn conn;
+ struct ost_body *body, *repbody;
+ int rc, size = sizeof(*body);
+ ENTRY;
- memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ conn.oc_id = body->connid;
+ conn.oc_dev = ost->ost_tgt;
- req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_setattr
- (&conn, &req->rq_rep.ost->oa);
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+ if (rc)
+ RETURN(rc);
- EXIT;
- return 0;
+ repbody = lustre_msg_buf(req->rq_repmsg, 0);
+ memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
+ req->rq_status = obd_setattr(&conn, &repbody->oa);
+ RETURN(0);
}
static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req)
{
- struct obd_conn conn;
- int rc;
-
- ENTRY;
-
- conn.oc_dev = ost->ost_tgt;
-
- rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
- &req->rq_replen, &req->rq_repbuf);
- if (rc) {
- printk("ost_setattr: cannot pack reply\n");
- return rc;
- }
-
- req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_connect(&conn);
-
- printk("ost_connect: rep buffer %p, id %d\n", req->rq_repbuf,
- conn.oc_id);
- req->rq_rep.ost->connid = conn.oc_id;
- EXIT;
- return 0;
-}
+ struct obd_conn conn;
+ struct ost_body *body;
+ int rc, size = sizeof(*body);
+ ENTRY;
+
+ conn.oc_dev = ost->ost_tgt;
+
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+ if (rc)
+ RETURN(rc);
+ req->rq_status = obd_connect(&conn);
+
+ CDEBUG(D_IOCTL, "rep buffer %p, id %d\n", req->rq_repmsg, conn.oc_id);
+ body = lustre_msg_buf(req->rq_repmsg, 0);
+ body->connid = conn.oc_id;
+ RETURN(0);
+}
static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
{
- struct obd_conn conn;
- int rc;
-
- ENTRY;
-
- conn.oc_dev = ost->ost_tgt;
- conn.oc_id = req->rq_req.ost->connid;
+ struct obd_conn conn;
+ struct ost_body *body;
+ int rc, size = sizeof(*body);
+ ENTRY;
- rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
- &req->rq_replen, &req->rq_repbuf);
- if (rc) {
- printk("ost_setattr: cannot pack reply\n");
- return rc;
- }
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ conn.oc_id = body->connid;
+ conn.oc_dev = ost->ost_tgt;
- req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_disconnect(&conn);
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+ if (rc)
+ RETURN(rc);
- EXIT;
- return 0;
+ CDEBUG(D_IOCTL, "Disconnecting %d\n", conn.oc_id);
+ req->rq_status = obd_disconnect(&conn);
+ RETURN(0);
}
static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
{
- struct obd_conn conn;
- int rc;
- int vallen;
- void *val;
- char *ptr;
-
- ENTRY;
-
- conn.oc_id = req->rq_req.ost->connid;
- conn.oc_dev = ost->ost_tgt;
-
- ptr = ost_req_buf1(req->rq_req.ost);
- req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_get_info
- (&conn, req->rq_req.ost->buflen1, ptr, &vallen, &val);
-
- rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
- &req->rq_replen, &req->rq_repbuf);
- if (rc) {
- printk("ost_setattr: cannot pack reply\n");
- return rc;
- }
-
- EXIT;
- return 0;
-}
+ struct obd_conn conn;
+ struct ost_body *body;
+ int rc, size[2] = {sizeof(*body)};
+ char *bufs[2] = {NULL, NULL}, *ptr;
+ ENTRY;
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ conn.oc_id = body->connid;
+ conn.oc_dev = ost->ost_tgt;
-#if 0
-static struct page * ext2_get_page(struct inode *dir, unsigned long n)
-{
- struct address_space *mapping = dir->i_mapping;
- struct page *page = read_cache_page(mapping, n,
- (filler_t*)mapping->a_ops->readpage, NULL);
- if (!IS_ERR(page)) {
- wait_on_page(page);
- kmap(page);
- if (!Page_Uptodate(page))
- goto fail;
- if (!PageChecked(page))
- ext2_check_page(page);
- if (PageError(page))
- goto fail;
- }
- return page;
-
-fail:
- ext2_put_page(page);
- return ERR_PTR(-EIO);
-}
+ ptr = lustre_msg_buf(req->rq_reqmsg, 1);
+ if (!ptr)
+ RETURN(-EINVAL);
-static inline void ext2_put_page(struct page *page)
-{
- kunmap(page);
- page_cache_release(page);
+ req->rq_status = obd_get_info(&conn, req->rq_reqmsg->buflens[1], ptr,
+ &(size[1]), (void **)&(bufs[1]));
+
+ rc = lustre_pack_msg(2, size, bufs, &req->rq_replen, &req->rq_repmsg);
+ if (rc)
+ CERROR("cannot pack reply\n");
+
+ RETURN(rc);
}
-/* Releases the page */
-void ext2_set_link(struct inode *dir, struct ext2_dir_entry_2 *de,
- struct page *page, struct inode *inode)
+static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
{
- unsigned from = (char *) de - (char *) page_address(page);
- unsigned to = from + le16_to_cpu(de->rec_len);
- int err;
-
- lock_page(page);
- err = page->mapping->a_ops->prepare_write(NULL, page, from, to);
- if (err)
- BUG();
- de->inode = cpu_to_le32(inode->i_ino);
- ext2_set_de_type (de, inode);
- dir->i_mtime = dir->i_ctime = CURRENT_TIME;
- err = ext2_commit_chunk(page, from, to);
- UnlockPage(page);
- ext2_put_page(page);
+ struct ptlrpc_bulk_desc *desc;
+ struct obd_conn conn;
+ void *tmp1, *tmp2, *end2;
+ struct niobuf_remote *remote_nb;
+ struct niobuf_local *local_nb = NULL;
+ struct obd_ioobj *ioo;
+ struct ost_body *body;
+ int rc, cmd, i, j, objcount, niocount, size = sizeof(*body);
+ ENTRY;
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
+ tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
+ end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
+ objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
+ niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
+ cmd = body->data;
+
+ conn.oc_id = body->connid;
+ conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
+
+ for (i = 0; i < objcount; i++) {
+ ost_unpack_ioo(&tmp1, &ioo);
+ if (tmp2 + ioo->ioo_bufcnt > end2) {
+ LBUG();
+ GOTO(out, rc = -EFAULT);
+ }
+ for (j = 0; j < ioo->ioo_bufcnt; j++)
+ ost_unpack_niobuf(&tmp2, &remote_nb);
+ }
+
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+ if (rc)
+ RETURN(rc);
+ OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
+ if (local_nb == NULL)
+ RETURN(-ENOMEM);
+
+ /* The unpackers move tmp1 and tmp2, so reset them before using */
+ tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
+ tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
+ req->rq_status = obd_preprw(cmd, &conn, objcount,
+ tmp1, niocount, tmp2, local_nb);
+
+ if (req->rq_status)
+ GOTO(out_local, 0);
+
+ desc = ptlrpc_prep_bulk(req->rq_connection);
+ if (desc == NULL)
+ GOTO(out_local, rc = -ENOMEM);
+ desc->b_portal = OST_BULK_PORTAL;
+
+ for (i = 0; i < niocount; i++) {
+ struct ptlrpc_bulk_page *bulk;
+ bulk = ptlrpc_prep_bulk_page(desc);
+ if (bulk == NULL)
+ GOTO(out_bulk, rc = -ENOMEM);
+ remote_nb = &(((struct niobuf_remote *)tmp2)[i]);
+ bulk->b_xid = remote_nb->xid;
+ bulk->b_buf = (void *)(unsigned long)local_nb[i].addr;
+ bulk->b_buflen = PAGE_SIZE;
+ }
+
+ rc = ptlrpc_send_bulk(desc);
+ if (rc)
+ GOTO(out_bulk, rc);
+
+ ptlrpc_free_bulk(desc);
+
+ /* The unpackers move tmp1 and tmp2, so reset them before using */
+ tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
+ tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
+ req->rq_status = obd_commitrw(cmd, &conn, objcount,
+ tmp1, niocount, local_nb);
+
+ RETURN(rc);
+
+ out_bulk:
+ ptlrpc_free_bulk(desc);
+ out_local:
+ if (local_nb != NULL)
+ OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
+ out:
+ return 0;
}
-static int ext2_commit_chunk(struct page *page, unsigned from, unsigned to)
+static int ost_commit_page(struct obd_conn *conn, struct page *page)
{
- struct inode *dir = page->mapping->host;
- int err = 0;
- dir->i_version = ++event;
- SetPageUptodate(page);
- set_page_clean(page);
-
- //page->mapping->a_ops->commit_write(NULL, page, from, to);
- //if (IS_SYNC(dir))
- // err = waitfor_one_page(page);
- return err;
-}
+ struct obd_ioobj obj;
+ struct niobuf_local buf;
+ int rc;
+ ENTRY;
-#endif
+ memset(&buf, 0, sizeof(buf));
+ memset(&obj, 0, sizeof(obj));
-int ost_prepw(struct ost_obd *obddev, struct ptlrpc_request *req)
-{
-#if 0
- struct obd_conn conn;
- int rc;
- int i, j, n;
- int objcount;
- void *tmp;
- struct niobuf **nb;
- struct obd_ioo **ioo;
-
- ENTRY;
-
- tmp1 = ost_req_buf1(req);
- tmp2 = ost_req_buf2(req);
- objcount = req->buflen1 / sizeof(**ioo);
-
- n = 0;
- for (i=0 ; i<objcount ; i++) {
- obd_unpack_ioo
-
- conn.oc_id = req->rq_req.ost->connid;
- conn.oc_dev = ost->ost_tgt;
-
- rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
- &req->rq_replen, &req->rq_repbuf);
- if (rc) {
- printk("ost_create: cannot pack reply\n");
- return rc;
- }
-
- memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
-
- req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_create
- (&conn, &req->rq_rep.ost->oa);
-
- EXIT;
- return 0;
-#endif
- return -ENOTSUPP;
+ buf.page = page;
+ obj.ioo_bufcnt = 1;
+ rc = obd_commitrw(OBD_BRW_WRITE, conn, 1, &obj, 1, &buf);
+ RETURN(rc);
}
+static int ost_brw_write_cb(struct ptlrpc_bulk_page *bulk)
+{
+ void *journal_save;
+ int rc;
+ ENTRY;
+
+ /* Restore the filesystem journal context when we do the commit.
+ * This is needed for ext3 and reiserfs, but can't really hurt
+ * other filesystems.
+ */
+ journal_save = current->journal_info;
+ current->journal_info = bulk->b_desc->b_journal_info;
+ CDEBUG(D_BUFFS, "journal_info: saved %p->%p, restored %p\n", current,
+ journal_save, bulk->b_desc->b_journal_info);
+ rc = ost_commit_page(&bulk->b_desc->b_conn, bulk->b_page);
+ current->journal_info = journal_save;
+ CDEBUG(D_BUFFS, "journal_info: restored %p->%p\n", current,
+ journal_save);
+ if (rc)
+ CERROR("ost_commit_page failed: %d\n", rc);
+
+ RETURN(rc);
+}
-int ost_handle(struct obd_device *obddev, struct ptlrpc_request *req)
+static int ost_brw_write_finished_cb(struct ptlrpc_bulk_desc *desc)
{
- int rc;
- struct ost_obd *ost = &obddev->u.ost;
- struct ptlreq_hdr *hdr;
-
- ENTRY;
- printk("ost_handle: req at %p\n", req);
-
- hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
- if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
- printk("lustre_ost: wrong packet type sent %d\n",
- NTOH__u32(hdr->type));
- rc = -EINVAL;
- goto out;
- }
-
- rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen,
- &req->rq_reqhdr, &req->rq_req.ost);
- if (rc) {
- printk("lustre_ost: Invalid request\n");
- EXIT;
- goto out;
- }
-
- switch (req->rq_reqhdr->opc) {
-
- case OST_CONNECT:
- CDEBUG(D_INODE, "connect\n");
- printk("----> connect \n");
- rc = ost_connect(ost, req);
- break;
- case OST_DISCONNECT:
- CDEBUG(D_INODE, "disconnect\n");
- rc = ost_disconnect(ost, req);
- break;
- case OST_GET_INFO:
- CDEBUG(D_INODE, "get_info\n");
- rc = ost_get_info(ost, req);
- break;
- case OST_CREATE:
- CDEBUG(D_INODE, "create\n");
- rc = ost_create(ost, req);
- break;
- case OST_DESTROY:
- CDEBUG(D_INODE, "destroy\n");
- rc = ost_destroy(ost, req);
- break;
- case OST_GETATTR:
- CDEBUG(D_INODE, "getattr\n");
- rc = ost_getattr(ost, req);
- break;
- case OST_SETATTR:
- CDEBUG(D_INODE, "setattr\n");
- rc = ost_setattr(ost, req);
- break;
- case OST_PREPW:
- CDEBUG(D_INODE, "prepw\n");
- rc = ost_prepw(ost, req);
- break;
- default:
- req->rq_status = -ENOTSUPP;
- return ost_error(obddev, req);
- }
+ ptlrpc_free_bulk(desc);
-out:
- req->rq_status = rc;
- if (rc) {
- printk("ost: processing error %d\n", rc);
- ost_error(obddev, req);
- } else {
- CDEBUG(D_INODE, "sending reply\n");
- ost_reply(obddev, req);
- }
-
- return 0;
+ return 0;
}
-int ost_main(void *arg)
+static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
{
- struct obd_device *obddev = (struct obd_device *) arg;
- struct ost_obd *ost = &obddev->u.ost;
- ENTRY;
- printk("---> %d\n", __LINE__);
-
-
- lock_kernel();
- printk("---> %d\n", __LINE__);
- daemonize();
- printk("---> %d\n", __LINE__);
- spin_lock_irq(¤t->sigmask_lock);
- printk("---> %d\n", __LINE__);
- sigfillset(¤t->blocked);
- printk("---> %d\n", __LINE__);
- recalc_sigpending(current);
- printk("---> %d\n", __LINE__);
- spin_unlock_irq(¤t->sigmask_lock);
- printk("---> %d\n", __LINE__);
-
- printk("---> %d\n", __LINE__);
- sprintf(current->comm, "lustre_ost");
- printk("---> %d\n", __LINE__);
-
- /* Record that the thread is running */
- ost->ost_thread = current;
- printk("---> %d\n", __LINE__);
- wake_up(&ost->ost_done_waitq);
- printk("---> %d\n", __LINE__);
-
- /* XXX maintain a list of all managed devices: insert here */
-
- /* And now, wait forever for commit wakeup events. */
- while (1) {
- struct ptlrpc_request *request;
- int rc;
-
- if (ost->ost_flags & OST_EXIT)
- break;
-
-
- wake_up(&ost->ost_done_waitq);
- interruptible_sleep_on(&ost->ost_waitq);
-
- CDEBUG(D_INODE, "lustre_ost wakes\n");
- CDEBUG(D_INODE, "pick up req here and continue\n");
-
- if (list_empty(&ost->ost_reqs)) {
- CDEBUG(D_INODE, "woke because of timer\n");
- } else {
- printk("---> %d\n", __LINE__);
- request = list_entry(ost->ost_reqs.next,
- struct ptlrpc_request, rq_list);
- printk("---> %d\n", __LINE__);
- list_del(&request->rq_list);
- rc = ost_handle(obddev, request);
- }
- }
-
- /* XXX maintain a list of all managed devices: cleanup here */
- printk("---> %d\n", __LINE__);
- ost->ost_thread = NULL;
- printk("---> %d\n", __LINE__);
- wake_up(&ost->ost_done_waitq);
- printk("lustre_ost: exiting\n");
- return 0;
+ struct ptlrpc_bulk_desc *desc;
+ struct obd_conn conn;
+ struct niobuf_remote *remote_nb;
+ struct niobuf_local *local_nb, *lnb;
+ struct obd_ioobj *ioo;
+ struct ost_body *body;
+ int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
+ void *tmp1, *tmp2, *end2;
+ ENTRY;
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
+ tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
+ end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
+ objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
+ niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
+ cmd = body->data;
+
+ conn.oc_id = body->connid;
+ conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
+
+ for (i = 0; i < objcount; i++) {
+ ost_unpack_ioo((void *)&tmp1, &ioo);
+ if (tmp2 + ioo->ioo_bufcnt > end2) {
+ rc = -EFAULT;
+ break;
+ }
+ for (j = 0; j < ioo->ioo_bufcnt; j++)
+ ost_unpack_niobuf((void *)&tmp2, &remote_nb);
+ }
+
+ size[1] = niocount * sizeof(*remote_nb);
+ rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
+ if (rc)
+ GOTO(fail, rc);
+ remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
+
+ OBD_ALLOC(local_nb, niocount * sizeof(*local_nb));
+ if (local_nb == NULL)
+ GOTO(fail, rc = -ENOMEM);
+
+ /* The unpackers move tmp1 and tmp2, so reset them before using */
+ tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
+ tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
+ req->rq_status = obd_preprw(cmd, &conn, objcount,
+ tmp1, niocount, tmp2, local_nb);
+ if (req->rq_status)
+ GOTO(success, 0);
+
+ desc = ptlrpc_prep_bulk(req->rq_connection);
+ if (desc == NULL)
+ GOTO(fail_preprw, rc = -ENOMEM);
+ desc->b_cb = ost_brw_write_finished_cb;
+ desc->b_portal = OSC_BULK_PORTAL;
+ memcpy(&(desc->b_conn), &conn, sizeof(conn));
+
+ /* Save journal context for commit callbacks */
+ CDEBUG(D_BUFFS, "journal_info: saved %p->%p\n", current,
+ current->journal_info);
+ desc->b_journal_info = current->journal_info;
+
+ for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
+ struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
+ struct ptlrpc_bulk_page *bulk;
+
+ bulk = ptlrpc_prep_bulk_page(desc);
+ if (bulk == NULL)
+ GOTO(fail_bulk, rc = -ENOMEM);
+
+ spin_lock(&srv->srv_lock);
+ bulk->b_xid = srv->srv_xid++;
+ spin_unlock(&srv->srv_lock);
+
+ bulk->b_buf = (void *)(unsigned long)lnb->addr;
+ bulk->b_page = lnb->page;
+ bulk->b_buflen = PAGE_SIZE;
+ bulk->b_cb = ost_brw_write_cb;
+
+ /* this advances remote_nb */
+ ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0,
+ bulk->b_xid);
+ }
+
+ rc = ptlrpc_register_bulk(desc);
+ current->journal_info = NULL; /* kind of scary */
+ if (rc)
+ GOTO(fail_bulk, rc);
+
+ EXIT;
+ success:
+ OBD_FREE(local_nb, niocount * sizeof(*local_nb));
+ return 0;
+
+ fail_bulk:
+ ptlrpc_free_bulk(desc);
+ fail_preprw:
+ OBD_FREE(local_nb, niocount * sizeof(*local_nb));
+ /* FIXME: how do we undo the preprw? */
+ fail:
+ return rc;
}
-static void ost_stop_srv_thread(struct ost_obd *ost)
+static int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
{
- ost->ost_flags |= OST_EXIT;
+ struct ost_body *body = lustre_msg_buf(req->rq_reqmsg, 0);
- while (ost->ost_thread) {
- wake_up(&ost->ost_waitq);
- sleep_on(&ost->ost_done_waitq);
- }
+ if (body->data == OBD_BRW_READ)
+ return ost_brw_read(obddev, req);
+ else
+ return ost_brw_write(obddev, req);
}
-static void ost_start_srv_thread(struct obd_device *obd)
+static int ost_handle(struct obd_device *obddev, struct ptlrpc_service *svc,
+ struct ptlrpc_request *req)
{
- struct ost_obd *ost = &obd->u.ost;
- ENTRY;
-
- init_waitqueue_head(&ost->ost_waitq);
- printk("---> %d\n", __LINE__);
- init_waitqueue_head(&ost->ost_done_waitq);
- printk("---> %d\n", __LINE__);
- kernel_thread(ost_main, (void *)obd,
- CLONE_VM | CLONE_FS | CLONE_FILES);
- printk("---> %d\n", __LINE__);
- while (!ost->ost_thread)
- sleep_on(&ost->ost_done_waitq);
- printk("---> %d\n", __LINE__);
- EXIT;
+ int rc;
+ struct ost_obd *ost = &obddev->u.ost;
+ ENTRY;
+
+ rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
+ if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
+ CERROR("lustre_mds: Invalid request\n");
+ GOTO(out, rc);
+ }
+
+ if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
+ CERROR("lustre_mds: wrong packet type sent %d\n",
+ req->rq_reqmsg->type);
+ GOTO(out, rc = -EINVAL);
+ }
+
+ switch (req->rq_reqmsg->opc) {
+ case OST_CONNECT:
+ CDEBUG(D_INODE, "connect\n");
+ OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
+ rc = ost_connect(ost, req);
+ break;
+ case OST_DISCONNECT:
+ CDEBUG(D_INODE, "disconnect\n");
+ OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
+ rc = ost_disconnect(ost, req);
+ break;
+ case OST_GET_INFO:
+ CDEBUG(D_INODE, "get_info\n");
+ OBD_FAIL_RETURN(OBD_FAIL_OST_GET_INFO_NET, 0);
+ rc = ost_get_info(ost, req);
+ break;
+ case OST_CREATE:
+ CDEBUG(D_INODE, "create\n");
+ OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
+ rc = ost_create(ost, req);
+ break;
+ case OST_DESTROY:
+ CDEBUG(D_INODE, "destroy\n");
+ OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
+ rc = ost_destroy(ost, req);
+ break;
+ case OST_GETATTR:
+ CDEBUG(D_INODE, "getattr\n");
+ OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
+ rc = ost_getattr(ost, req);
+ break;
+ case OST_SETATTR:
+ CDEBUG(D_INODE, "setattr\n");
+ OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
+ rc = ost_setattr(ost, req);
+ break;
+ case OST_OPEN:
+ CDEBUG(D_INODE, "setattr\n");
+ OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
+ rc = ost_open(ost, req);
+ break;
+ case OST_CLOSE:
+ CDEBUG(D_INODE, "setattr\n");
+ OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
+ rc = ost_close(ost, req);
+ break;
+ case OST_BRW:
+ CDEBUG(D_INODE, "brw\n");
+ OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
+ rc = ost_brw(ost, req);
+ break;
+ case OST_PUNCH:
+ CDEBUG(D_INODE, "punch\n");
+ OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
+ rc = ost_punch(ost, req);
+ break;
+ default:
+ req->rq_status = -ENOTSUPP;
+ rc = ptlrpc_error(svc, req);
+ RETURN(rc);
+ }
+
+ EXIT;
+out:
+ //req->rq_status = rc;
+ if (rc) {
+ CERROR("ost: processing error %d\n", rc);
+ ptlrpc_error(svc, req);
+ } else {
+ CDEBUG(D_INODE, "sending reply\n");
+ ptlrpc_reply(svc, req);
+ }
+
+ return 0;
}
/* mount the file system (secretly) */
-static int ost_setup(struct obd_device *obddev, obd_count len,
- void *buf)
-
+static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
{
- struct obd_ioctl_data* data = buf;
- struct ost_obd *ost = &obddev->u.ost;
- struct obd_device *tgt;
- int err;
+ struct obd_ioctl_data* data = buf;
+ struct ost_obd *ost = &obddev->u.ost;
+ struct obd_device *tgt;
+ int err;
ENTRY;
- if (data->ioc_dev < 0 || data->ioc_dev > MAX_OBD_DEVICES) {
- EXIT;
- return -ENODEV;
- }
+ if (data->ioc_dev < 0 || data->ioc_dev > MAX_OBD_DEVICES)
+ RETURN(-ENODEV);
+ MOD_INC_USE_COUNT;
tgt = &obd_dev[data->ioc_dev];
- ost->ost_tgt = tgt;
- if ( ! (tgt->obd_flags & OBD_ATTACHED) ||
- ! (tgt->obd_flags & OBD_SET_UP) ){
- printk("device not attached or not set up (%d)\n",
+ ost->ost_tgt = tgt;
+ if (!(tgt->obd_flags & OBD_ATTACHED) ||
+ !(tgt->obd_flags & OBD_SET_UP)) {
+ CERROR("device not attached or not set up (%d)\n",
data->ioc_dev);
- EXIT;
- return -EINVAL;
- }
+ GOTO(error_dec, err = -EINVAL);
+ }
- ost->ost_conn.oc_dev = tgt;
- err = tgt->obd_type->typ_ops->o_connect(&ost->ost_conn);
- if (err) {
- printk("lustre ost: fail to connect to device %d\n",
- data->ioc_dev);
- return -EINVAL;
- }
+ ost->ost_conn.oc_dev = tgt;
+ err = obd_connect(&ost->ost_conn);
+ if (err) {
+ CERROR("fail to connect to device %d\n", data->ioc_dev);
+ GOTO(error_dec, err = -EINVAL);
+ }
- INIT_LIST_HEAD(&ost->ost_reqs);
- ost->ost_thread = NULL;
- ost->ost_flags = 0;
+ ost->ost_service = ptlrpc_init_svc(128 * 1024,
+ OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
+ "self", ost_handle);
+ if (!ost->ost_service) {
+ CERROR("failed to start service\n");
+ GOTO(error_disc, err = -EINVAL);
+ }
- spin_lock_init(&obddev->u.ost.ost_lock);
+ err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
+ if (err)
+ GOTO(error_disc, err = -EINVAL);
+ err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
+ if (err)
+ GOTO(error_disc, err = -EINVAL);
- ost_start_srv_thread(obddev);
+ RETURN(0);
- MOD_INC_USE_COUNT;
- EXIT;
- return 0;
-}
+error_disc:
+ obd_disconnect(&ost->ost_conn);
+error_dec:
+ MOD_DEC_USE_COUNT;
+ RETURN(err);
+}
static int ost_cleanup(struct obd_device * obddev)
{
- struct ost_obd *ost = &obddev->u.ost;
- struct obd_device *tgt;
- int err;
+ struct ost_obd *ost = &obddev->u.ost;
+ int err;
ENTRY;
- if ( !(obddev->obd_flags & OBD_SET_UP) ) {
- EXIT;
- return 0;
- }
-
if ( !list_empty(&obddev->obd_gen_clients) ) {
- printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");
- EXIT;
- return -EBUSY;
+ CERROR("still has clients!\n");
+ RETURN(-EBUSY);
}
- ost_stop_srv_thread(ost);
+ ptlrpc_stop_all_threads(ost->ost_service);
+ rpc_unregister_service(ost->ost_service);
- if (!list_empty(&ost->ost_reqs)) {
- // XXX reply with errors and clean up
- CDEBUG(D_INODE, "Request list not empty!\n");
- }
+ if (!list_empty(&ost->ost_service->srv_reqs)) {
+ // XXX reply with errors and clean up
+ CERROR("Request list not empty!\n");
+ }
+ OBD_FREE(ost->ost_service, sizeof(*ost->ost_service));
- tgt = ost->ost_tgt;
- err = tgt->obd_type->typ_ops->o_disconnect(&ost->ost_conn);
- if (err) {
- printk("lustre ost: fail to disconnect device\n");
- return -EINVAL;
- }
-
+ err = obd_disconnect(&ost->ost_conn);
+ if (err) {
+ CERROR("lustre ost: fail to disconnect device\n");
+ RETURN(-EINVAL);
+ }
MOD_DEC_USE_COUNT;
- EXIT;
- return 0;
+ RETURN(0);
}
/* use obd ops to offer management infrastructure */
static int __init ost_init(void)
{
obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
- return 0;
+ return 0;
}
static void __exit ost_exit(void)
{
- obd_unregister_type(LUSTRE_OST_NAME);
+ obd_unregister_type(LUSTRE_OST_NAME);
}
MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
MODULE_LICENSE("GPL");
-// for testing (maybe this stays)
-EXPORT_SYMBOL(ost_queue_req);
-
module_init(ost_init);
module_exit(ost_exit);