/*
- * linux/mds/handler.c
+ * ost/ost_handler.c
+ * Storage Target Handling functions
*
* Lustre Object Server Module (OST)
*
- * Copyright (C) 2001 Cluster File Systems, Inc.
+ * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
*
* This code is issued under the GNU General Public License.
* See the file COPYING in this distribution
*
* by Peter Braam <braam@clusterfs.com>
*
- * This server is single threaded at present (but can easily be multi threaded).
- * For testing and management it is treated as an obd_device, although it does
- * not export a full OBD method table (the requests are coming in over the wire,
- * so object target modules do not have a full method table.)
+ * This server is single threaded at present (but can easily be multi
+ * threaded). For testing and management it is treated as an
+ * obd_device, although it does not export a full OBD method table
+ * (the requests are coming in over the wire, so object target
+ * modules do not have a full method table.)
*
*/
-
#define EXPORT_SYMTAB
#include <linux/version.h>
#include <linux/ext2_fs.h>
#include <linux/quotaops.h>
#include <asm/unistd.h>
+
+#define DEBUG_SUBSYSTEM S_OST
+
#include <linux/obd_support.h>
#include <linux/obd.h>
#include <linux/obd_class.h>
#include <linux/obd_class.h>
// for testing
-static int ost_queue_req(struct obd_device *obddev, struct ost_request *req)
+static int ost_queue_req(struct obd_device *obddev, struct ptlrpc_request *req)
{
- struct ost_request *srv_req;
+ struct ptlrpc_request *srv_req;
struct ost_obd *ost = &obddev->u.ost;
if (!ost) {
return -1;
}
- srv_req = kmalloc(sizeof(*srv_req), GFP_KERNEL);
+ OBD_ALLOC(srv_req, sizeof(*srv_req));
if (!srv_req) {
EXIT;
return -ENOMEM;
}
- printk("---> OST at %d %p, incoming req %p, srv_req %p\n",
+ CDEBUG(0, "---> OST at %d %p, incoming req %p, srv_req %p\n",
__LINE__, ost, req, srv_req);
memset(srv_req, 0, sizeof(*req));
+
+ /* move the request buffer */
srv_req->rq_reqbuf = req->rq_reqbuf;
srv_req->rq_reqlen = req->rq_reqlen;
- srv_req->rq_obd = ost;
+ srv_req->rq_ost = ost;
+
+ /* remember where it came from */
srv_req->rq_reply_handle = req;
list_add(&srv_req->rq_list, &ost->ost_reqs);
return 0;
}
-
-/* XXX replace with networking code */
-int ost_reply(struct obd_device *obddev, struct ost_request *req)
+int ost_reply(struct obd_device *obddev, struct ptlrpc_request *req)
{
- struct ost_request *clnt_req = req->rq_reply_handle;
+ struct ptlrpc_request *clnt_req = req->rq_reply_handle;
ENTRY;
- printk("ost_reply: req %p clnt_req at %p\n", req, clnt_req);
- /* free the request buffer */
- kfree(req->rq_reqbuf);
- req->rq_reqbuf = NULL;
-
- /* move the reply to the client */
- clnt_req->rq_replen = req->rq_replen;
- clnt_req->rq_repbuf = req->rq_repbuf;
+ if (req->rq_ost->ost_service != NULL) {
+ /* This is a request that came from the network via portals. */
- printk("---> client req %p repbuf %p len %d status %d\n",
- clnt_req, clnt_req->rq_repbuf, clnt_req->rq_replen,
- req->rq_rephdr->status);
+ /* FIXME: we need to increment the count of handled events */
+ ptl_send_buf(req, &req->rq_peer, OST_REPLY_PORTAL, 0);
+ } else {
+ /* This is a local request that came from another thread. */
+
+ /* move the reply to the client */
+ clnt_req->rq_replen = req->rq_replen;
+ clnt_req->rq_repbuf = req->rq_repbuf;
+ req->rq_repbuf = NULL;
+ req->rq_replen = 0;
+
+ /* free the request buffer */
+ OBD_FREE(req->rq_reqbuf, req->rq_reqlen);
+ req->rq_reqbuf = NULL;
+
+ /* wake up the client */
+ wake_up_interruptible(&clnt_req->rq_wait_for_rep);
+ }
- req->rq_repbuf = NULL;
- req->rq_replen = 0;
-
- /* free the server request */
- kfree(req);
- /* wake up the client */
- wake_up_interruptible(&clnt_req->rq_wait_for_rep);
EXIT;
return 0;
}
-int ost_error(struct obd_device *obddev, struct ost_request *req)
+int ost_error(struct obd_device *obddev, struct ptlrpc_request *req)
{
- struct ost_rep_hdr *hdr;
+ struct ptlrep_hdr *hdr;
ENTRY;
- hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
+
+ OBD_ALLOC(hdr, sizeof(*hdr));
if (!hdr) {
EXIT;
return -ENOMEM;
hdr->seqno = req->rq_reqhdr->seqno;
hdr->status = req->rq_status;
hdr->type = OST_TYPE_ERR;
+
req->rq_repbuf = (char *)hdr;
+ req->rq_replen = sizeof(*hdr);
EXIT;
return ost_reply(obddev, req);
}
-static int ost_destroy(struct ost_obd *ost, struct ost_request *req)
+static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
{
struct obd_conn conn;
int rc;
ENTRY;
- conn.oc_id = req->rq_req->connid;
+ conn.oc_id = req->rq_req.ost->connid;
conn.oc_dev = ost->ost_tgt;
- rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
+ rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
&req->rq_replen, &req->rq_repbuf);
if (rc) {
printk("ost_destroy: cannot pack reply\n");
return rc;
}
- req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_destroy
- (&conn, &req->rq_req->oa);
+ req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_destroy
+ (&conn, &req->rq_req.ost->oa);
EXIT;
return 0;
}
-static int ost_getattr(struct ost_obd *ost, struct ost_request *req)
+static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
{
struct obd_conn conn;
int rc;
ENTRY;
- printk("ost getattr entered\n");
- conn.oc_id = req->rq_req->connid;
+ conn.oc_id = req->rq_req.ost->connid;
conn.oc_dev = ost->ost_tgt;
- rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
+ rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
&req->rq_replen, &req->rq_repbuf);
if (rc) {
printk("ost_getattr: cannot pack reply\n");
return rc;
}
- req->rq_rep->oa.o_id = req->rq_req->oa.o_id;
- req->rq_rep->oa.o_valid = req->rq_req->oa.o_valid;
+ req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id;
+ req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid;
- req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_getattr
- (&conn, &req->rq_rep->oa);
+ req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_getattr
+ (&conn, &req->rq_rep.ost->oa);
EXIT;
return 0;
}
-static int ost_create(struct ost_obd *ost, struct ost_request *req)
+static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
{
struct obd_conn conn;
int rc;
ENTRY;
- conn.oc_id = req->rq_req->connid;
+ conn.oc_id = req->rq_req.ost->connid;
conn.oc_dev = ost->ost_tgt;
- rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
+ rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
&req->rq_replen, &req->rq_repbuf);
if (rc) {
printk("ost_create: cannot pack reply\n");
return rc;
}
- memcpy(&req->rq_rep->oa, &req->rq_req->oa, sizeof(req->rq_req->oa));
+ memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
- req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_create
- (&conn, &req->rq_rep->oa);
+ req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_create
+ (&conn, &req->rq_rep.ost->oa);
EXIT;
return 0;
}
-static int ost_setattr(struct ost_obd *ost, struct ost_request *req)
+static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
{
struct obd_conn conn;
int rc;
ENTRY;
- conn.oc_id = req->rq_req->connid;
+ conn.oc_id = req->rq_req.ost->connid;
conn.oc_dev = ost->ost_tgt;
- rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
+ rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
&req->rq_replen, &req->rq_repbuf);
if (rc) {
printk("ost_setattr: cannot pack reply\n");
return rc;
}
- memcpy(&req->rq_rep->oa, &req->rq_req->oa, sizeof(req->rq_req->oa));
+ memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
+ sizeof(req->rq_req.ost->oa));
- req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_setattr
- (&conn, &req->rq_rep->oa);
+ req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_setattr
+ (&conn, &req->rq_rep.ost->oa);
EXIT;
return 0;
}
-static int ost_connect(struct ost_obd *ost, struct ost_request *req)
+static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req)
{
struct obd_conn conn;
int rc;
conn.oc_dev = ost->ost_tgt;
- rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
+ rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
&req->rq_replen, &req->rq_repbuf);
if (rc) {
printk("ost_setattr: cannot pack reply\n");
return rc;
}
- req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_connect(&conn);
+ req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_connect(&conn);
- printk("ost_connect: rep buffer %p, id %d\n", req->rq_repbuf,
+ CDEBUG(0, "ost_connect: rep buffer %p, id %d\n", req->rq_repbuf,
conn.oc_id);
- req->rq_rep->connid = conn.oc_id;
+ req->rq_rep.ost->connid = conn.oc_id;
EXIT;
return 0;
}
-
-static int ost_disconnect(struct ost_obd *ost, struct ost_request *req)
+static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
{
struct obd_conn conn;
int rc;
ENTRY;
conn.oc_dev = ost->ost_tgt;
- conn.oc_id = req->rq_req->connid;
+ conn.oc_id = req->rq_req.ost->connid;
- rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
+ rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
&req->rq_replen, &req->rq_repbuf);
if (rc) {
printk("ost_setattr: cannot pack reply\n");
return rc;
}
- req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_disconnect(&conn);
+ req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_disconnect(&conn);
EXIT;
return 0;
}
-static int ost_get_info(struct ost_obd *ost, struct ost_request *req)
+static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
{
struct obd_conn conn;
int rc;
int vallen;
void *val;
+ char *ptr;
ENTRY;
- conn.oc_id = req->rq_req->connid;
+ conn.oc_id = req->rq_req.ost->connid;
conn.oc_dev = ost->ost_tgt;
- req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_get_info
- (&conn, req->rq_req->buflen1, req->rq_req->buf1, &vallen, &val);
-
+ ptr = ost_req_buf1(req->rq_req.ost);
+ req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_get_info
+ (&conn, req->rq_req.ost->buflen1, ptr, &vallen, &val);
- rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr, &req->rq_rep,
+ rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
&req->rq_replen, &req->rq_repbuf);
if (rc) {
printk("ost_setattr: cannot pack reply\n");
return 0;
}
+int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
+{
+ struct obd_conn conn;
+ int rc;
+ int i, j;
+ int objcount, niocount;
+ char *tmp1, *tmp2, *end2;
+ char *res;
+ int cmd;
+ struct niobuf *nb, *src, *dst;
+ struct obd_ioobj *ioo;
+ struct ost_req *r = req->rq_req.ost;
+ ENTRY;
+
+ tmp1 = ost_req_buf1(r);
+ tmp2 = ost_req_buf2(r);
+ end2 = tmp2 + req->rq_req.ost->buflen2;
+ objcount = r->buflen1 / sizeof(*ioo);
+ niocount = r->buflen2 / sizeof(*nb);
+ cmd = r->cmd;
+
+ conn.oc_id = req->rq_req.ost->connid;
+ conn.oc_dev = req->rq_ost->ost_tgt;
+
+ rc = ost_pack_rep(NULL, niocount, NULL, 0,
+ &req->rq_rephdr, &req->rq_rep.ost,
+ &req->rq_replen, &req->rq_repbuf);
+ if (rc) {
+ printk("ost_create: cannot pack reply\n");
+ return rc;
+ }
+ res = ost_rep_buf1(req->rq_rep.ost);
+
+ for (i=0; i < objcount; i++) {
+ ost_unpack_ioo((void *)&tmp1, &ioo);
+ if (tmp2 + ioo->ioo_bufcnt > end2) {
+ rc = -EFAULT;
+ break;
+ }
+ for (j = 0 ; j < ioo->ioo_bufcnt ; j++) {
+ ost_unpack_niobuf((void *)&tmp2, &nb);
+ }
+ }
+
+ /* The unpackers move tmp1 and tmp2, so reset them before using */
+ tmp1 = ost_req_buf1(r);
+ tmp2 = ost_req_buf2(r);
+ req->rq_rep.ost->result =
+ req->rq_ost->ost_tgt->obd_type->typ_ops->o_preprw
+ (cmd, &conn, objcount, (struct obd_ioobj *)tmp1,
+ niocount, (struct niobuf *)tmp2, (struct niobuf *)res);
+
+ if (cmd == OBD_BRW_WRITE) {
+ for (i=0; i<niocount; i++) {
+ src = &((struct niobuf *)tmp2)[i];
+ dst = &((struct niobuf *)res)[i];
+ memcpy((void *)(unsigned long)dst->addr,
+ (void *)(unsigned long)src->addr,
+ src->len);
+ }
+ } else {
+ for (i=0; i<niocount; i++) {
+ dst = &((struct niobuf *)tmp2)[i];
+ src = &((struct niobuf *)res)[i];
+ memcpy((void *)(unsigned long)dst->addr,
+ (void *)(unsigned long)src->addr,
+ PAGE_SIZE);
+ }
+ }
+
+ req->rq_rep.ost->result =
+ req->rq_ost->ost_tgt->obd_type->typ_ops->o_commitrw
+ (cmd, &conn, objcount, (struct obd_ioobj *)tmp1,
+ niocount, (struct niobuf *)res);
+
+ EXIT;
+ return 0;
+}
-//int ost_handle(struct ost_conn *conn, int len, char *buf)
-int ost_handle(struct obd_device *obddev, struct ost_request *req)
+int ost_handle(struct obd_device *obddev, struct ptlrpc_request *req)
{
int rc;
struct ost_obd *ost = &obddev->u.ost;
- struct ost_req_hdr *hdr;
+ struct ptlreq_hdr *hdr;
ENTRY;
- printk("ost_handle: req at %p\n", req);
+ CDEBUG(0, "req at %p\n", req);
- hdr = (struct ost_req_hdr *)req->rq_reqbuf;
+ hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
printk("lustre_ost: wrong packet type sent %d\n",
NTOH__u32(hdr->type));
}
rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen,
- &req->rq_reqhdr, &req->rq_req);
+ &req->rq_reqhdr, &req->rq_req.ost);
if (rc) {
printk("lustre_ost: Invalid request\n");
EXIT;
case OST_CONNECT:
CDEBUG(D_INODE, "connect\n");
- printk("----> connect \n");
rc = ost_connect(ost, req);
break;
case OST_DISCONNECT:
CDEBUG(D_INODE, "setattr\n");
rc = ost_setattr(ost, req);
break;
-
+ case OST_BRW:
+ CDEBUG(D_INODE, "brw\n");
+ rc = ost_brw(ost, req);
+ break;
default:
+ req->rq_status = -ENOTSUPP;
return ost_error(obddev, req);
}
out:
- req->rq_rephdr->status = rc;
+ req->rq_status = rc;
if (rc) {
printk("ost: processing error %d\n", rc);
ost_error(obddev, req);
struct obd_device *obddev = (struct obd_device *) arg;
struct ost_obd *ost = &obddev->u.ost;
ENTRY;
- printk("---> %d\n", __LINE__);
-
lock_kernel();
- printk("---> %d\n", __LINE__);
daemonize();
- printk("---> %d\n", __LINE__);
spin_lock_irq(¤t->sigmask_lock);
- printk("---> %d\n", __LINE__);
sigfillset(¤t->blocked);
- printk("---> %d\n", __LINE__);
recalc_sigpending(current);
- printk("---> %d\n", __LINE__);
spin_unlock_irq(¤t->sigmask_lock);
- printk("---> %d\n", __LINE__);
- printk("---> %d\n", __LINE__);
sprintf(current->comm, "lustre_ost");
- printk("---> %d\n", __LINE__);
/* Record that the thread is running */
ost->ost_thread = current;
- printk("---> %d\n", __LINE__);
wake_up(&ost->ost_done_waitq);
- printk("---> %d\n", __LINE__);
/* XXX maintain a list of all managed devices: insert here */
/* And now, wait forever for commit wakeup events. */
while (1) {
- struct ost_request *request;
int rc;
if (ost->ost_flags & OST_EXIT)
break;
-
wake_up(&ost->ost_done_waitq);
interruptible_sleep_on(&ost->ost_waitq);
CDEBUG(D_INODE, "lustre_ost wakes\n");
CDEBUG(D_INODE, "pick up req here and continue\n");
- if (list_empty(&ost->ost_reqs)) {
- CDEBUG(D_INODE, "woke because of timer\n");
- } else {
- printk("---> %d\n", __LINE__);
- request = list_entry(ost->ost_reqs.next,
- struct ost_request, rq_list);
- printk("---> %d\n", __LINE__);
- list_del(&request->rq_list);
- rc = ost_handle(obddev, request);
+
+ if (ost->ost_service != NULL) {
+ ptl_event_t ev;
+
+ while (1) {
+ struct ptlrpc_request request;
+ struct ptlrpc_service *service;
+
+ rc = PtlEQGet(ost->ost_service->srv_eq_h, &ev);
+ if (rc != PTL_OK && rc != PTL_EQ_DROPPED)
+ break;
+
+ service = (struct ptlrpc_service *)ev.mem_desc.user_ptr;
+
+ /* FIXME: If we move to an event-driven model,
+ * we should put the request on the stack of
+ * mds_handle instead. */
+ memset(&request, 0, sizeof(request));
+ request.rq_reqbuf = ev.mem_desc.start +
+ ev.offset;
+ request.rq_reqlen = ev.mem_desc.length;
+ request.rq_ost = ost;
+ request.rq_xid = ev.match_bits;
+
+ request.rq_peer.peer_nid = ev.initiator.nid;
+ /* FIXME: this NI should be the incoming NI.
+ * We don't know how to find that from here. */
+ request.rq_peer.peer_ni =
+ ost->ost_service->srv_self.peer_ni;
+ rc = ost_handle(obddev, &request);
+
+ /* Inform the rpc layer the event has been handled */
+ ptl_received_rpc(service);
+ }
+ } else {
+ struct ptlrpc_request *request;
+
+ if (list_empty(&ost->ost_reqs)) {
+ CDEBUG(D_INODE, "woke because of timer\n");
+ } else {
+ request = list_entry(ost->ost_reqs.next,
+ struct ptlrpc_request,
+ rq_list);
+ list_del(&request->rq_list);
+ rc = ost_handle(obddev, request);
+ }
}
}
/* XXX maintain a list of all managed devices: cleanup here */
- printk("---> %d\n", __LINE__);
+
ost->ost_thread = NULL;
- printk("---> %d\n", __LINE__);
wake_up(&ost->ost_done_waitq);
printk("lustre_ost: exiting\n");
return 0;
ENTRY;
init_waitqueue_head(&ost->ost_waitq);
- printk("---> %d\n", __LINE__);
init_waitqueue_head(&ost->ost_done_waitq);
- printk("---> %d\n", __LINE__);
kernel_thread(ost_main, (void *)obd,
CLONE_VM | CLONE_FS | CLONE_FILES);
- printk("---> %d\n", __LINE__);
while (!ost->ost_thread)
sleep_on(&ost->ost_done_waitq);
- printk("---> %d\n", __LINE__);
EXIT;
}
struct obd_ioctl_data* data = buf;
struct ost_obd *ost = &obddev->u.ost;
struct obd_device *tgt;
+ struct lustre_peer peer;
int err;
ENTRY;
ost->ost_thread = NULL;
ost->ost_flags = 0;
- spin_lock_init(&obddev->u.ost.fo_lock);
+ spin_lock_init(&obddev->u.ost.ost_lock);
+
+ err = kportal_uuid_to_peer("self", &peer);
+ if (err == 0) {
+ OBD_ALLOC(ost->ost_service, sizeof(*ost->ost_service));
+ if (ost->ost_service == NULL)
+ return -ENOMEM;
+ ost->ost_service->srv_buf_size = 64 * 1024;
+ ost->ost_service->srv_portal = OST_REQUEST_PORTAL;
+ memcpy(&ost->ost_service->srv_self, &peer, sizeof(peer));
+ ost->ost_service->srv_wait_queue = &ost->ost_waitq;
+
+ rpc_register_service(ost->ost_service, "self");
+ }
ost_start_srv_thread(obddev);
}
ost_stop_srv_thread(ost);
+ rpc_unregister_service(ost->ost_service);
+ OBD_FREE(ost->ost_service, sizeof(*ost->ost_service));
if (!list_empty(&ost->ost_reqs)) {
// XXX reply with errors and clean up