/*
- * linux/fs/ext2_obd/ext2_obd.c
+ * linux/mds/handler.c
+ *
+ * Lustre Metadata Server (mds) request handler
+ *
+ * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
*
- * Copyright (C) 2001 Cluster File Systems, Inc.
+ * This code is issued under the GNU General Public License.
+ * See the file COPYING in this distribution
*
- * This code is issued under the GNU General Public License.
- * See the file COPYING in this distribution
- *
- * by Peter Braam <braam@clusterfs.com>
+ * by Peter Braam <braam@clusterfs.com>
+ *
+ * This server is single threaded at present (but can easily be multi threaded).
+ *
*/
+
#define EXPORT_SYMTAB
#include <linux/version.h>
#include <linux/ext2_fs.h>
#include <linux/quotaops.h>
#include <asm/unistd.h>
+#include <asm/uaccess.h>
#include <linux/obd_support.h>
#include <linux/obd.h>
#include <linux/lustre_lib.h>
#include <linux/lustre_idl.h>
#include <linux/lustre_mds.h>
+#include <linux/lustre_net.h>
#include <linux/obd_class.h>
+/*
+ * File-wide pointer to one MDS device, used by the in-kernel request path
+ * (mds_queue_req) to reach it without going over the network.
+ * NULL means "no MDS available": mds_queue_req refuses requests in that case.
+ */
+// XXX for testing
+static struct mds_obd *MDS;
-static struct dentry *mds_fid2dentry(struct mds_obd *mds, struct lustre_fid *fid)
+/*
+ * mds_queue_req - hand a request to the MDS service thread in this kernel.
+ *
+ * Allocates a zeroed server-side ptlrpc_request, points it at the caller's
+ * request buffer, records the caller's request in rq_reply_handle so the
+ * reply can be routed back, appends the new request to the tail of
+ * MDS->mds_reqs and wakes MDS->mds_waitq.
+ *
+ * Returns 0 on success, -1 if no MDS is set up, or -ENOMEM if the
+ * server-side request cannot be allocated.
+ *
+ * NOTE(review): mds_reqs is modified here without any lock; confirm that
+ * producers and the service thread are serialised some other way.
+ */
+// XXX make this networked!
+static int mds_queue_req(struct ptlrpc_request *req)
{
- struct dentry *de;
- struct inode *inode;
+        struct ptlrpc_request *srv_req;
+
+        if (!MDS) {
+                EXIT;
+                return -1;
+        }
- inode = iget(mds->mds_sb, fid->id);
- if (!inode) {
+        srv_req = kmalloc(sizeof(*srv_req), GFP_KERNEL);
+        if (!srv_req) {
EXIT;
+                return -ENOMEM;
}
- de = d_alloc_root(inode);
- if (!de) {
- iput(inode);
+        printk("---> MDS at %d %p, incoming req %p, srv_req %p\n",
+               __LINE__, MDS, req, srv_req);
+
+        /* size the clear from the object being cleared, not from req */
+        memset(srv_req, 0, sizeof(*srv_req));
+
+        /* the server request works on the client's request buffer */
+        srv_req->rq_reqbuf = req->rq_reqbuf;
+        srv_req->rq_reqlen = req->rq_reqlen;
+        srv_req->rq_obd = MDS;
+
+        /* remember where it came from */
+        srv_req->rq_reply_handle = req;
+
+        /*
+         * Append at the tail: the consumer takes mds_reqs.next, so adding
+         * at the head would serve the newest request first.
+         */
+        list_add_tail(&srv_req->rq_list, &MDS->mds_reqs);
+        wake_up(&MDS->mds_waitq);
+        return 0;
+}
+
+/*
+ * mds_sendpage - read one page from @file at @offset and deliver it to the
+ * requester.
+ *
+ * If the request's peer nid is 0 the page is read straight into the address
+ * held in @dst->addr.  Otherwise the page is read into a temporary kernel
+ * buffer, sent to the peer on MDS_BULK_PORTAL, and the caller sleeps on
+ * rq_wait_for_bulk until woken.
+ *
+ * Returns 0 on success, -EIO if fewer than PAGE_SIZE bytes were read, or
+ * -ENOMEM if the temporary buffer cannot be allocated.
+ *
+ * NOTE(review): the result of ptl_send_buf() is not acted upon; if the send
+ * can fail without ever waking rq_wait_for_bulk this will sleep forever.
+ * sleep_on() can also miss a wake-up that arrives before it runs.
+ */
+/* XXX do this over the net */
+int mds_sendpage(struct ptlrpc_request *req, struct file *file,
+                 __u64 offset, struct niobuf *dst)
+{
+        int rc;
+        mm_segment_t oldfs = get_fs();
+
+        if (req->rq_peer.peer_nid == 0) {
+                /* dst->addr is a user address, but in a different task! */
+                set_fs(KERNEL_DS);
+                rc = generic_file_read(file, (char *)(long)dst->addr,
+                                       PAGE_SIZE, &offset);
+                set_fs(oldfs);
+
+                if (rc != PAGE_SIZE)
+                        return -EIO;
+        } else {
+                char *buf;
+
+                buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
+                if (!buf)
+                        return -ENOMEM;
+
+                set_fs(KERNEL_DS);
+                rc = generic_file_read(file, buf, PAGE_SIZE, &offset);
+                set_fs(oldfs);
+
+                if (rc != PAGE_SIZE) {
+                        /* do not leak the bounce buffer on a short read */
+                        kfree(buf);
+                        return -EIO;
+                }
+
+                req->rq_bulkbuf = buf;
+                req->rq_bulklen = PAGE_SIZE;
+
+                /*
+                 * The wait queue must be usable before the send is started,
+                 * otherwise a fast completion could touch it uninitialised.
+                 */
+                init_waitqueue_head(&req->rq_wait_for_bulk);
+                rc = ptl_send_buf(req, &req->rq_peer, MDS_BULK_PORTAL, 0);
+                sleep_on(&req->rq_wait_for_bulk);
+
+                kfree(buf);
+                /* leave no pointer to the freed page in the request */
+                req->rq_bulkbuf = NULL;
+                req->rq_bulklen = 0; /* FIXME: eek. */
+        }
+
+        return 0;
+}
+
+/*
+ * mds_reply - deliver the reply held in @req to whoever sent the request.
+ *
+ * When the MDS has no network service registered, the request came from
+ * another thread in this kernel: ownership of the reply buffer passes to
+ * the originating request (rq_reply_handle), the request buffer is freed
+ * and the originator is woken.  Otherwise the reply is sent to the peer on
+ * MDS_REPLY_PORTAL.
+ *
+ * Always returns 0.
+ */
+/* XXX replace with networking code */
+int mds_reply(struct ptlrpc_request *req)
+{
+        struct ptlrpc_request *origin = req->rq_reply_handle;
+
+        ENTRY;
+
+        if (req->rq_obd->mds_service == NULL) {
+                /* Local request from another thread. */
+
+                /* the reply buffer now belongs to the originator */
+                origin->rq_replen = req->rq_replen;
+                origin->rq_repbuf = req->rq_repbuf;
+                req->rq_repbuf = NULL;
+                req->rq_replen = 0;
+
+                /* the request buffer is no longer needed */
+                kfree(req->rq_reqbuf);
+                req->rq_reqbuf = NULL;
+
+                /* let the originator pick up its reply */
+                wake_up_interruptible(&origin->rq_wait_for_rep);
+        } else {
+                /* Request arrived from the network via portals. */
+
+                /* FIXME: we need to increment the count of handled events */
+                ptl_send_buf(req, &req->rq_peer, MDS_REPLY_PORTAL, 0);
+        }
+
+        EXIT;
+        return 0;
+}
+
+/*
+ * mds_error - send a header-only error reply for @req.
+ *
+ * Allocates a zeroed ptlrep_hdr, copies the sequence number from the request
+ * header and the status from req->rq_status, marks it MDS_TYPE_ERR, installs
+ * it as the reply buffer and passes the request to mds_reply().
+ *
+ * Returns -ENOMEM if the header cannot be allocated, otherwise whatever
+ * mds_reply() returns.
+ *
+ * NOTE(review): the header fields are stored without byte-order conversion;
+ * confirm whether the reply header is expected in network order.
+ */
+int mds_error(struct ptlrpc_request *req)
+{
+ struct ptlrep_hdr *hdr;
+
+ ENTRY;
+
+ hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
+ if (!hdr) {
EXIT;
- return NULL;
+ return -ENOMEM;
+ }
+
+ memset(hdr, 0, sizeof(*hdr));
+
+ hdr->seqno = req->rq_reqhdr->seqno;
+ hdr->status = req->rq_status;
+ hdr->type = MDS_TYPE_ERR;
+
+ /* the reply consists of the header alone */
+ req->rq_repbuf = (char *)hdr;
+ req->rq_replen = sizeof(*hdr);
+
+ EXIT;
+ return mds_reply(req);
+}
+
+/*
+ * mds_fid2dentry - turn a fid into a referenced dentry on the MDS filesystem.
+ *
+ * Looks up inode fid->id on mds->mds_sb.  If the inode already has an alias
+ * that is not flagged DCACHE_NFSD_DISCONNECTED, that alias is returned with
+ * an extra reference; otherwise a new root-style dentry is allocated for the
+ * inode and flagged DCACHE_NFSD_DISCONNECTED.
+ *
+ * @mnt may be NULL.  When it is not, *mnt receives a new reference to
+ * mds->mds_vfsmnt on success and is set to NULL on failure, so a failed
+ * lookup never leaves a vfsmount reference behind.
+ *
+ * Returns the dentry, ERR_PTR(-ESTALE) for inode number 0 or a bad inode,
+ * or ERR_PTR(-ENOMEM) if the inode or dentry cannot be obtained.
+ *
+ * The generation check is currently disabled (generation is forced to 0).
+ */
+struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt)
+{
+        /* stolen from NFS */
+        struct super_block *sb = mds->mds_sb;
+        unsigned long ino = fid->id;
+        //__u32 generation = fid->generation;
+        __u32 generation = 0;
+        struct inode *inode;
+        struct list_head *lp;
+        struct dentry *result;
+
+        /* nothing is handed out until the lookup has succeeded */
+        if (mnt)
+                *mnt = NULL;
+
+        if (ino == 0)
+                return ERR_PTR(-ESTALE);
+
+        inode = iget(sb, ino);
+        if (inode == NULL)
+                return ERR_PTR(-ENOMEM);
+
+        printk("--> mds_fid2dentry: sb %p\n", inode->i_sb);
+
+        if (is_bad_inode(inode)
+            || (generation && inode->i_generation != generation)
+           ) {
+                /* we didn't find the right inode.. */
+                printk(__FUNCTION__
+                       ": bad inode %lu, link: %d ct: %d or version %u/%u\n",
+                       inode->i_ino,
+                       inode->i_nlink, atomic_read(&inode->i_count),
+                       inode->i_generation,
+                       generation);
+                iput(inode);
+                return ERR_PTR(-ESTALE);
}
- de->d_inode = inode;
- return de;
+        /* now to find a dentry.
+         * If possible, get a well-connected one
+         */
+        spin_lock(&dcache_lock);
+        for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
+                result = list_entry(lp,struct dentry, d_alias);
+                if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
+                        dget_locked(result);
+                        result->d_vfs_flags |= DCACHE_REFERENCED;
+                        spin_unlock(&dcache_lock);
+                        /* the alias holds the inode; drop the iget ref */
+                        iput(inode);
+                        goto found;
+                }
+        }
+        spin_unlock(&dcache_lock);
+        result = d_alloc_root(inode);
+        if (result == NULL) {
+                iput(inode);
+                return ERR_PTR(-ENOMEM);
+        }
+        result->d_flags |= DCACHE_NFSD_DISCONNECTED;
+found:
+        if (mnt)
+                *mnt = mntget(mds->mds_vfsmnt);
+        return result;
}
-int mds_getattr(struct mds_request *req)
+/*
+ * Copy sizeof(*id) bytes from the start of the inode's ext2 i_data area
+ * into *id.  This reads the ext2-specific part of the in-core inode, so it
+ * is only meaningful for inodes that live on ext2.
+ * NOTE(review): presumably the object id is stored in i_data by whoever
+ * creates the inode - confirm against the code that writes it.
+ */
+static inline void mds_get_objid(struct inode *inode, __u64 *id)
{
- struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req->fid1);
+ memcpy(id, &inode->u.ext2_i.i_data, sizeof(*id));
+}
+
+int mds_getattr(struct ptlrpc_request *req)
+{
+ struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1,
+ NULL);
struct inode *inode;
- struct mds_rep *rep = req->rq_rep;
+ struct mds_rep *rep;
int rc;
- rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
+ rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.mds,
&req->rq_replen, &req->rq_repbuf);
if (rc) {
EXIT;
printk("mds: out of memory\n");
req->rq_status = -ENOMEM;
- return -ENOMEM;
+ return 0;
}
req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
+ rep = req->rq_rep.mds;
if (!de) {
EXIT;
}
inode = de->d_inode;
+ rep->ino = inode->i_ino;
rep->atime = inode->i_atime;
rep->ctime = inode->i_ctime;
rep->mtime = inode->i_mtime;
rep->gid = inode->i_gid;
rep->size = inode->i_size;
rep->mode = inode->i_mode;
-
+ rep->nlink = inode->i_nlink;
+ rep->valid = ~0;
+ mds_get_objid(inode, &rep->objid);
+ dput(de);
return 0;
}
-int mds_reply(struct mds_request *req)
+/*
+ * mds_readpage - serve one page of the file named by fid1 in the request.
+ *
+ * Packs an empty reply, resolves fid1 to a dentry, opens it read-only and
+ * hands the page at offset req->rq_req.mds->size to mds_sendpage(), using
+ * the niobuf carried in the request as the destination.  The outcome is
+ * reported in the reply header's status field.
+ *
+ * Always returns 0.  If the reply cannot be packed, req->rq_status is set
+ * to -ENOMEM instead.
+ *
+ * The dentry is not dereferenced until it is known to be valid, and the
+ * dentry and vfsmount references are dropped on every early-exit path that
+ * does not pass them on to dentry_open().
+ */
+int mds_readpage(struct ptlrpc_request *req)
{
- ENTRY;
- kfree(req->rq_reqbuf);
- req->rq_reqbuf = NULL;
- wake_up_interruptible(&req->rq_wait_for_mds_rep);
- EXIT;
- return 0;
-}
+        struct vfsmount *mnt;
+        struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1,
+                                           &mnt);
+        struct file *file;
+        struct niobuf *niobuf;
+        int rc;
+
+        rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.mds,
+                          &req->rq_replen, &req->rq_repbuf);
+        if (rc) {
+                /* give back what the lookup handed us (mntput accepts NULL) */
+                if (!IS_ERR(de))
+                        dput(de);
+                mntput(mnt);
+                EXIT;
+                printk("mds: out of memory\n");
+                req->rq_status = -ENOMEM;
+                return 0;
+        }
-int mds_error(struct mds_request *req)
-{
- struct mds_rep_hdr *hdr;
+        req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
- ENTRY;
- hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
- if (!hdr) {
+        if (IS_ERR(de)) {
EXIT;
- return -ENOMEM;
+                /* drop any vfsmount reference returned with the error */
+                mntput(mnt);
+                req->rq_rephdr->status = PTR_ERR(de);
+                return 0;
}
- memset(hdr, 0, sizeof(*hdr));
-
- hdr->seqno = req->rq_reqhdr->seqno;
- hdr->status = req->rq_status;
- hdr->type = MDS_TYPE_ERR;
- req->rq_repbuf = (char *)hdr;
+        /* only now is it safe to look inside the dentry */
+        printk("mds_readpage: ino %ld\n", de->d_inode->i_ino);
+
+        file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
+        /* note: in case of an error, dentry_open puts dentry */
+        if (IS_ERR(file)) {
+                EXIT;
+                req->rq_rephdr->status = PTR_ERR(file);
+                return 0;
+        }
+
+        niobuf = mds_req_tgt(req->rq_req.mds);
+
+        /* to make this asynchronous make sure that the handling function
+           doesn't send a reply when this function completes. Instead a
+           callback function would send the reply */
+        rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf);
+        filp_close(file, 0);
+        req->rq_rephdr->status = rc;
EXIT;
- return mds_reply(req);
+        return 0;
+}
+
+/*
+ * mds_reint - unpack the update record carried in the request and apply it.
+ *
+ * The record is taken from the request's target area (mds_req_tgt, tgtlen
+ * bytes).  If it cannot be unpacked, req->rq_status is set to -EINVAL;
+ * otherwise the record is passed to mds_reint_rec().
+ *
+ * Always returns 0; the result of mds_reint_rec() is not propagated.
+ */
+int mds_reint(struct ptlrpc_request *req)
+{
+        struct mds_update_record rec;
+        char *record_buf = mds_req_tgt(req->rq_req.mds);
+        int record_len = req->rq_req.mds->tgtlen;
+        int rc;
+
+        rc = mds_update_unpack(record_buf, record_len, &rec);
+        if (rc == 0) {
+                /* rc will be used to interrupt a for loop over multiple
+                 * records */
+                rc = mds_reint_rec(&rec, req);
+        } else {
+                printk(__FUNCTION__ ": invalid record\n");
+                req->rq_status = -EINVAL;
+        }
+
+        return 0;
}
//int mds_handle(struct mds_conn *conn, int len, char *buf)
-int mds_handle(struct mds_request *req)
+int mds_handle(struct ptlrpc_request *req)
{
int rc;
- struct mds_req_hdr *hdr;
+ struct ptlreq_hdr *hdr;
ENTRY;
- hdr = (struct mds_req_hdr *)req->rq_reqbuf;
+ hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
- if (NTOH_u32(hdr->type) != MDS_TYPE_REQ) {
+ if (NTOH__u32(hdr->type) != MDS_TYPE_REQ) {
printk("lustre_mds: wrong packet type sent %d\n",
- NTOH_u32(hdr->type));
+ NTOH__u32(hdr->type));
rc = -EINVAL;
goto out;
}
rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen,
- &req->rq_reqhdr, &req->rq_req);
+ &req->rq_reqhdr, &req->rq_req.mds);
if (rc) {
printk("lustre_mds: Invalid request\n");
EXIT;
rc = mds_getattr(req);
break;
- case MDS_OPEN:
- return mds_getattr(req);
-
- case MDS_SETATTR:
- return mds_getattr(req);
-
- case MDS_CREATE:
- return mds_getattr(req);
-
- case MDS_MKDIR:
- return mds_getattr(req);
-
- case MDS_RMDIR:
- return mds_getattr(req);
-
- case MDS_SYMLINK:
- return mds_getattr(req);
-
- case MDS_LINK:
- return mds_getattr(req);
-
- case MDS_MKNOD:
- return mds_getattr(req);
-
- case MDS_UNLINK:
- return mds_getattr(req);
+ case MDS_READPAGE:
+ CDEBUG(D_INODE, "readpage\n");
+ rc = mds_readpage(req);
+ break;
- case MDS_RENAME:
- return mds_getattr(req);
+ case MDS_REINT:
+ CDEBUG(D_INODE, "reint\n");
+ rc = mds_reint(req);
+ break;
default:
return mds_error(req);
out:
if (rc) {
- printk("mds: processing error %d\n", rc);
+ printk(__FUNCTION__ ": no header\n");
+ return 0;
+ }
+
+ if( req->rq_status) {
mds_error(req);
} else {
CDEBUG(D_INODE, "sending reply\n");
mds->mds_thread = current;
wake_up(&mds->mds_done_waitq);
- printk(KERN_INFO "lustre_mds starting. Commit interval %ld seconds\n",
+ printk(KERN_INFO "lustre_mds starting. Commit interval %d seconds\n",
mds->mds_interval / HZ);
/* XXX maintain a list of all managed devices: insert here */
/* And now, wait forever for commit wakeup events. */
while (1) {
+ int rc;
+
if (mds->mds_flags & MDS_UNMOUNT)
break;
-
wake_up(&mds->mds_done_waitq);
interruptible_sleep_on(&mds->mds_waitq);
CDEBUG(D_INODE, "lustre_mds wakes\n");
CDEBUG(D_INODE, "pick up req here and continue\n");
- if (list_empty(&mds->mds_reqs)) {
- CDEBUG(D_INODE, "woke because of timer\n");
+ if (mds->mds_service != NULL) {
+ ptl_event_t ev;
+
+ while (1) {
+ struct ptlrpc_request request;
+
+ rc = PtlEQGet(mds->mds_service->srv_eq, &ev);
+ if (rc != PTL_OK && rc != PTL_EQ_DROPPED)
+ break;
+ /* FIXME: If we move to an event-driven model,
+ * we should put the request on the stack of
+ * mds_handle instead. */
+ memset(&request, 0, sizeof(request));
+ request.rq_reqbuf = ev.mem_desc.start +
+ ev.offset;
+ request.rq_reqlen = ev.mem_desc.length;
+ request.rq_obd = MDS;
+ request.rq_xid = ev.match_bits;
+
+ request.rq_peer.peer_nid = ev.initiator.nid;
+ /* FIXME: this NI should be the incoming NI.
+ * We don't know how to find that from here. */
+ request.rq_peer.peer_ni =
+ mds->mds_service->srv_self.peer_ni;
+ rc = mds_handle(&request);
+ }
+ } else {
+ struct ptlrpc_request *request;
+
+ if (list_empty(&mds->mds_reqs)) {
+ CDEBUG(D_INODE, "woke because of timer\n");
+ } else {
+ request = list_entry(mds->mds_reqs.next,
+ struct ptlrpc_request,
+ rq_list);
+ list_del(&request->rq_list);
+ rc = mds_handle(request);
+ }
}
}
struct obd_ioctl_data* data = buf;
struct mds_obd *mds = &obddev->u.mds;
struct vfsmount *mnt;
+ struct lustre_peer peer;
int err;
ENTRY;
-
- mnt = do_kern_mount(data->ioc_inlbuf2, 0,
- data->ioc_inlbuf1, NULL);
+
+
+ mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL);
err = PTR_ERR(mnt);
if (IS_ERR(mnt)) {
EXIT;
}
mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
- if (!obddev->u.mds.mds_sb) {
- EXIT;
- return -ENODEV;
- }
+ if (!obddev->u.mds.mds_sb) {
+ EXIT;
+ return -ENODEV;
+ }
- INIT_LIST_HEAD(&mds->mds_reqs);
- mds->mds_thread = NULL;
- mds->mds_flags = 0;
- mds->mds_interval = 3 * HZ;
mds->mds_vfsmnt = mnt;
obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2);
mds->mds_ctxt.pwdmnt = mnt;
mds->mds_ctxt.pwd = mnt->mnt_root;
mds->mds_ctxt.fs = KERNEL_DS;
+ mds->mds_remote_nid = 0;
- spin_lock_init(&obddev->u.mds.fo_lock);
+ INIT_LIST_HEAD(&mds->mds_reqs);
+ mds->mds_thread = NULL;
+ mds->mds_flags = 0;
+ mds->mds_interval = 3 * HZ;
+ MDS = mds;
+
+ spin_lock_init(&obddev->u.mds.mds_lock);
+
+ err = kportal_uuid_to_peer("self", &peer);
+ if (err == 0) {
+ mds->mds_service = kmalloc(sizeof(*mds->mds_service),
+ GFP_KERNEL);
+ if (mds->mds_service == NULL)
+ return -ENOMEM;
+ mds->mds_service->srv_buf_size = 64 * 1024;
+ mds->mds_service->srv_portal = MDS_REQUEST_PORTAL;
+ memcpy(&mds->mds_service->srv_self, &peer, sizeof(peer));
+ mds->mds_service->srv_wait_queue = &mds->mds_waitq;
+
+ rpc_register_service(mds->mds_service, "self");
+ }
mds_start_srv_thread(mds);
return -EBUSY;
}
+ MDS = NULL;
mds_stop_srv_thread(mds);
sb = mds->mds_sb;
if (!mds->mds_sb){
+/*
+ * Module entry point: registers the MDS operations table under
+ * LUSTRE_MDS_NAME with the obd class layer and reports success.
+ * NOTE(review): the result of obd_register_type() is discarded, so a failed
+ * registration still loads the module - confirm whether it can fail.
+ */
static int __init mds_init(void)
{
- printk(KERN_INFO "Lustre MDS v0.01, braam@clusterfs.com\n");
obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
+ return 0;
}
static void __exit mds_exit(void)
}
MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
-MODULE_DESCRIPTION("Lustre Metadata server module");
+MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
MODULE_LICENSE("GPL");
+
+// for testing (maybe this stays)
+EXPORT_SYMBOL(mds_queue_req);
+
module_init(mds_init);
module_exit(mds_exit);