X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;ds=sidebyside;f=lustre%2Fmds%2Fhandler.c;h=a2cb3b6427cca6c78ad6ae8e5f24d653036609ea;hb=4196dc4a74f9aeb8ed4ff1bc20bd6d78decf8308;hp=72bfa8bd120f5304417d108457aef2ffb54a42d0;hpb=5bb3ae0f4161ba10a043a526a456889f6e44f39b;p=fs%2Flustre-release.git diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 72bfa8b..a2cb3b6 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -1,20 +1,21 @@ -/* +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * * linux/mds/handler.c - * + * * Lustre Metadata Server (mds) request handler - * + * * Copyright (C) 2001, 2002 Cluster File Systems, Inc. * * This code is issued under the GNU General Public License. * See the file COPYING in this distribution * * by Peter Braam - * - * This server is single threaded at present (but can easily be multi threaded). - * + * + * This server is single threaded at present (but can easily be multi threaded) + * */ - #define EXPORT_SYMTAB #include @@ -22,595 +23,551 @@ #include #include #include -#include #include #include #include -#include -#include -#include -#include + +#define DEBUG_SUBSYSTEM S_MDS + #include +#include #include -#include -// XXX for testing -static struct mds_obd *MDS; - -// XXX make this networked! -static int mds_queue_req(struct ptlrpc_request *req) +int mds_sendpage(struct ptlrpc_request *req, struct file *file, + __u64 offset, struct niobuf *dst) { - struct ptlrpc_request *srv_req; - - if (!MDS) { - EXIT; - return -1; - } - - srv_req = kmalloc(sizeof(*srv_req), GFP_KERNEL); - if (!srv_req) { - EXIT; - return -ENOMEM; - } - - printk("---> MDS at %d %p, incoming req %p, srv_req %p\n", - __LINE__, MDS, req, srv_req); - - memset(srv_req, 0, sizeof(*req)); - - /* move the request buffer */ - srv_req->rq_reqbuf = req->rq_reqbuf; - srv_req->rq_reqlen = req->rq_reqlen; - srv_req->rq_obd = MDS; - - /* remember where it came from */ - srv_req->rq_reply_handle = req; - - list_add(&srv_req->rq_list, &MDS->mds_reqs); - wake_up(&MDS->mds_waitq); - return 0; -} + int rc = 0; + mm_segment_t oldfs = get_fs(); + struct ptlrpc_bulk_desc *bulk; + char *buf; + + bulk = ptlrpc_prep_bulk(req->rq_connection); + if (bulk == NULL) { + rc = -ENOMEM; + GOTO(out, rc); + } -/* XXX do this over the net */ -int mds_sendpage(struct ptlrpc_request *req, struct file *file, - __u64 offset, struct niobuf *dst) -{ - int rc; - mm_segment_t oldfs = get_fs(); - /* dst->addr is a user address, but in a different task! */ - set_fs(KERNEL_DS); - rc = generic_file_read(file, (char *)(long)dst->addr, - PAGE_SIZE, &offset); - set_fs(oldfs); - - if (rc != PAGE_SIZE) - return -EIO; - return 0; -} + bulk->b_xid = req->rq_reqmsg->xid; -/* XXX replace with networking code */ -int mds_reply(struct ptlrpc_request *req) -{ - struct ptlrpc_request *clnt_req = req->rq_reply_handle; - - ENTRY; - - if (req->rq_obd->mds_service != NULL) { - /* This is a request that came from the network via portals. */ - - /* FIXME: we need to increment the count of handled events */ - ptl_send_buf(req, &req->rq_peer, MDS_REPLY_PORTAL, 0); - } else { - /* This is a local request that came from another thread. */ - - /* move the reply to the client */ - clnt_req->rq_replen = req->rq_replen; - clnt_req->rq_repbuf = req->rq_repbuf; - req->rq_repbuf = NULL; - req->rq_replen = 0; - - /* free the request buffer */ - kfree(req->rq_reqbuf); - req->rq_reqbuf = NULL; - - /* wake up the client */ - wake_up_interruptible(&clnt_req->rq_wait_for_rep); - } - - EXIT; - return 0; -} + OBD_ALLOC(buf, PAGE_SIZE); + if (!buf) { + rc = -ENOMEM; + GOTO(cleanup_bulk, rc); + } -int mds_error(struct ptlrpc_request *req) -{ - struct ptlrep_hdr *hdr; + set_fs(KERNEL_DS); + rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE, + &offset); + set_fs(oldfs); - ENTRY; + if (rc != PAGE_SIZE) { + rc = -EIO; + GOTO(cleanup_buf, rc); + } - hdr = kmalloc(sizeof(*hdr), GFP_KERNEL); - if (!hdr) { - EXIT; - return -ENOMEM; - } + bulk->b_buf = buf; + bulk->b_buflen = PAGE_SIZE; - memset(hdr, 0, sizeof(*hdr)); - - hdr->seqno = req->rq_reqhdr->seqno; - hdr->status = req->rq_status; - hdr->type = MDS_TYPE_ERR; + rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) { + CERROR("obd_fail_loc=%x, fail operation rc=%d\n", + OBD_FAIL_MDS_SENDPAGE, rc); + PtlMDUnlink(bulk->b_md_h); + GOTO(cleanup_buf, rc); + } + wait_event_interruptible(bulk->b_waitq, + ptlrpc_check_bulk_sent(bulk)); - req->rq_repbuf = (char *)hdr; - req->rq_replen = sizeof(*hdr); + if (bulk->b_flags == PTL_RPC_INTR) { + rc = -EINTR; + GOTO(cleanup_buf, rc); + } - EXIT; - return mds_reply(req); + EXIT; + cleanup_buf: + OBD_FREE(buf, PAGE_SIZE); + cleanup_bulk: + OBD_FREE(bulk, sizeof(*bulk)); + out: + return rc; } -struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt) +struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, + struct vfsmount **mnt) { - /* stolen from NFS */ - struct super_block *sb = mds->mds_sb; - unsigned long ino = fid->id; - //__u32 generation = fid->generation; - __u32 generation = 0; - struct inode *inode; - struct list_head *lp; - struct dentry *result; - - if (mnt) { - *mnt = mntget(mds->mds_vfsmnt); - } - - if (ino == 0) - return ERR_PTR(-ESTALE); - - inode = iget(sb, ino); - if (inode == NULL) - return ERR_PTR(-ENOMEM); - - printk("--> mds_fid2dentry: sb %p\n", inode->i_sb); - - if (is_bad_inode(inode) - || (generation && inode->i_generation != generation) - ) { - /* we didn't find the right inode.. */ - printk(__FUNCTION__ - "bad inode %lu, link: %d ct: %d or version %u/%u\n", - inode->i_ino, - inode->i_nlink, atomic_read(&inode->i_count), - inode->i_generation, - generation); - iput(inode); - return ERR_PTR(-ESTALE); - } - - /* now to find a dentry. - * If possible, get a well-connected one - */ - spin_lock(&dcache_lock); - for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) { - result = list_entry(lp,struct dentry, d_alias); - if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) { - dget_locked(result); - result->d_vfs_flags |= DCACHE_REFERENCED; - spin_unlock(&dcache_lock); - iput(inode); - return result; - } - } - spin_unlock(&dcache_lock); - result = d_alloc_root(inode); - if (result == NULL) { - iput(inode); - return ERR_PTR(-ENOMEM); - } - result->d_flags |= DCACHE_NFSD_DISCONNECTED; - return result; -} + /* stolen from NFS */ + struct super_block *sb = mds->mds_sb; + unsigned long ino = fid->id; + __u32 generation = fid->generation; + struct inode *inode; + struct list_head *lp; + struct dentry *result; + + if (ino == 0) + return ERR_PTR(-ESTALE); + + inode = iget(sb, ino); + if (inode == NULL) + return ERR_PTR(-ENOMEM); + + CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb); + + if (is_bad_inode(inode) || + (generation && inode->i_generation != generation)) { + /* we didn't find the right inode.. */ + CERROR("bad inode %lu, link: %d ct: %d or version %u/%u\n", + inode->i_ino, + inode->i_nlink, atomic_read(&inode->i_count), + inode->i_generation, + generation); + LBUG(); + iput(inode); + return ERR_PTR(-ESTALE); + } -static inline void mds_get_objid(struct inode *inode, __u64 *id) -{ - memcpy(id, &inode->u.ext2_i.i_data, sizeof(*id)); + /* now to find a dentry. + * If possible, get a well-connected one + */ + if (mnt) + *mnt = mds->mds_vfsmnt; + spin_lock(&dcache_lock); + for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) { + result = list_entry(lp,struct dentry, d_alias); + if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) { + dget_locked(result); + result->d_vfs_flags |= DCACHE_REFERENCED; + spin_unlock(&dcache_lock); + iput(inode); + if (mnt) + mntget(*mnt); + return result; + } + } + spin_unlock(&dcache_lock); + result = d_alloc_root(inode); + if (result == NULL) { + iput(inode); + return ERR_PTR(-ENOMEM); + } + if (mnt) + mntget(*mnt); + result->d_flags |= DCACHE_NFSD_DISCONNECTED; + return result; } int mds_getattr(struct ptlrpc_request *req) { - struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, - NULL); - struct inode *inode; - struct mds_rep *rep; - int rc; - - rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.mds, - &req->rq_replen, &req->rq_repbuf); - if (rc) { - EXIT; - printk("mds: out of memory\n"); - req->rq_status = -ENOMEM; - return 0; - } - - req->rq_rephdr->seqno = req->rq_reqhdr->seqno; - rep = req->rq_rep.mds; - - if (!de) { - EXIT; - req->rq_rephdr->status = -ENOENT; - return 0; - } - - inode = de->d_inode; - rep->ino = inode->i_ino; - rep->atime = inode->i_atime; - rep->ctime = inode->i_ctime; - rep->mtime = inode->i_mtime; - rep->uid = inode->i_uid; - rep->gid = inode->i_gid; - rep->size = inode->i_size; - rep->mode = inode->i_mode; - rep->nlink = inode->i_nlink; - rep->valid = ~0; - mds_get_objid(inode, &rep->objid); - dput(de); - return 0; -} + struct dentry *de; + struct inode *inode; + struct mds_body *body; + struct mds_obd *mds = &req->rq_obd->u.mds; + int rc, size = sizeof(*body); + ENTRY; -int mds_readpage(struct ptlrpc_request *req) -{ - struct vfsmount *mnt; - struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, - &mnt); - struct file *file; - struct niobuf *niobuf; - struct mds_rep *rep; - int rc; - - printk("mds_readpage: ino %ld\n", de->d_inode->i_ino); - rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.mds, - &req->rq_replen, &req->rq_repbuf); - if (rc) { - EXIT; - printk("mds: out of memory\n"); - req->rq_status = -ENOMEM; - return 0; - } - - req->rq_rephdr->seqno = req->rq_reqhdr->seqno; - rep = req->rq_rep.mds; - - if (IS_ERR(de)) { - EXIT; - req->rq_rephdr->status = PTR_ERR(de); - return 0; - } - - file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE); - /* note: in case of an error, dentry_open puts dentry */ - if (IS_ERR(file)) { - EXIT; - req->rq_rephdr->status = PTR_ERR(file); - return 0; - } - - niobuf = mds_req_tgt(req->rq_req.mds); - - /* to make this asynchronous make sure that the handling function - doesn't send a reply when this function completes. Instead a - callback function would send the reply */ - rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf); - - filp_close(file, 0); - req->rq_rephdr->status = rc; - EXIT; - return 0; + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) { + CERROR("mds: out of memory\n"); + req->rq_status = -ENOMEM; + RETURN(0); + } + + body = lustre_msg_buf(req->rq_reqmsg, 0); + de = mds_fid2dentry(mds, &body->fid1, NULL); + if (IS_ERR(de)) { + req->rq_status = -ENOENT; + RETURN(0); + } + + body = lustre_msg_buf(req->rq_repmsg, 0); + inode = de->d_inode; + body->ino = inode->i_ino; + body->generation = inode->i_generation; + body->atime = inode->i_atime; + body->ctime = inode->i_ctime; + body->mtime = inode->i_mtime; + body->uid = inode->i_uid; + body->gid = inode->i_gid; + body->size = inode->i_size; + body->mode = inode->i_mode; + body->nlink = inode->i_nlink; + body->valid = ~0; + mds_fs_get_objid(mds, inode, &body->objid); + l_dput(de); + RETURN(0); } -int mds_reint(struct ptlrpc_request *req) +int mds_open(struct ptlrpc_request *req) { - int rc; - char *buf = mds_req_tgt(req->rq_req.mds); - int len = req->rq_req.mds->tgtlen; - struct mds_update_record rec; - - rc = mds_update_unpack(buf, len, &rec); - if (rc) { - printk(__FUNCTION__ ": invalid record\n"); - req->rq_status = -EINVAL; - return 0; - } - /* rc will be used to interrupt a for loop over multiple records */ - rc = mds_reint_rec(&rec, req); - return 0; + struct dentry *de; + struct mds_body *body; + struct file *file; + struct vfsmount *mnt; + __u32 flags; + int rc, size = sizeof(*body); + ENTRY; + + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) { + CERROR("mds: out of memory\n"); + req->rq_status = -ENOMEM; + RETURN(0); + } + + body = lustre_msg_buf(req->rq_reqmsg, 0); + de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt); + if (IS_ERR(de)) { + req->rq_status = -ENOENT; + RETURN(0); + } + flags = body->flags; + file = dentry_open(de, mnt, flags); + if (!file || IS_ERR(file)) { + req->rq_status = -EINVAL; + RETURN(0); + } + + body = lustre_msg_buf(req->rq_repmsg, 0); + body->objid = (__u64) (unsigned long)file; + RETURN(0); } -//int mds_handle(struct mds_conn *conn, int len, char *buf) -int mds_handle(struct ptlrpc_request *req) +int mds_close(struct ptlrpc_request *req) { - int rc; - struct ptlreq_hdr *hdr; - - ENTRY; + struct dentry *de; + struct mds_body *body; + struct file *file; + struct vfsmount *mnt; + int rc; + ENTRY; - hdr = (struct ptlreq_hdr *)req->rq_reqbuf; + rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) { + CERROR("mds: out of memory\n"); + req->rq_status = -ENOMEM; + RETURN(0); + } - if (NTOH__u32(hdr->type) != MDS_TYPE_REQ) { - printk("lustre_mds: wrong packet type sent %d\n", - NTOH__u32(hdr->type)); - rc = -EINVAL; - goto out; - } + body = lustre_msg_buf(req->rq_reqmsg, 0); + de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt); + if (IS_ERR(de)) { + req->rq_status = -ENOENT; + RETURN(0); + } - rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen, - &req->rq_reqhdr, &req->rq_req.mds); - if (rc) { - printk("lustre_mds: Invalid request\n"); - EXIT; - goto out; - } + file = (struct file *)(unsigned long)body->objid; + req->rq_status = filp_close(file, 0); + l_dput(de); + mntput(mnt); - switch (req->rq_reqhdr->opc) { + RETURN(0); +} - case MDS_GETATTR: - CDEBUG(D_INODE, "getattr\n"); - rc = mds_getattr(req); - break; +int mds_readpage(struct ptlrpc_request *req) +{ + struct vfsmount *mnt; + struct dentry *de; + struct file *file; + struct niobuf *niobuf; + struct mds_body *body; + int rc, size = sizeof(*body); + ENTRY; - case MDS_READPAGE: - CDEBUG(D_INODE, "readpage\n"); - rc = mds_readpage(req); - break; + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) { + CERROR("mds: out of memory\n"); + req->rq_status = -ENOMEM; + RETURN(0); + } - case MDS_REINT: - CDEBUG(D_INODE, "reint\n"); - rc = mds_reint(req); - break; + body = lustre_msg_buf(req->rq_reqmsg, 0); + de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt); + if (IS_ERR(de)) { + req->rq_status = PTR_ERR(de); + RETURN(0); + } - default: - return mds_error(req); - } + CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino); -out: - if (rc) { - printk(__FUNCTION__ ": no header\n"); - return 0; - } - - if( req->rq_status) { - mds_error(req); - } else { - CDEBUG(D_INODE, "sending reply\n"); - mds_reply(req); - } - - return 0; -} + file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE); + /* note: in case of an error, dentry_open puts dentry */ + if (IS_ERR(file)) { + req->rq_status = PTR_ERR(file); + RETURN(0); + } + niobuf = lustre_msg_buf(req->rq_reqmsg, 1); + if (!niobuf) { + req->rq_status = -EINVAL; + LBUG(); + RETURN(0); + } -static void mds_timer_run(unsigned long __data) -{ - struct task_struct * p = (struct task_struct *) __data; + /* to make this asynchronous make sure that the handling function + doesn't send a reply when this function completes. Instead a + callback function would send the reply */ + rc = mds_sendpage(req, file, body->size, niobuf); - wake_up_process(p); + filp_close(file, 0); + req->rq_status = rc; + RETURN(0); } -int mds_main(void *arg) +int mds_reint(struct ptlrpc_request *req) { - struct mds_obd *mds = (struct mds_obd *) arg; - struct timer_list timer; - - lock_kernel(); - daemonize(); - spin_lock_irq(¤t->sigmask_lock); - sigfillset(¤t->blocked); - recalc_sigpending(current); - spin_unlock_irq(¤t->sigmask_lock); - - sprintf(current->comm, "lustre_mds"); - - /* Set up an interval timer which can be used to trigger a - wakeup after the interval expires */ - init_timer(&timer); - timer.data = (unsigned long) current; - timer.function = mds_timer_run; - mds->mds_timer = &timer; - - /* Record that the thread is running */ - mds->mds_thread = current; - wake_up(&mds->mds_done_waitq); - - printk(KERN_INFO "lustre_mds starting. Commit interval %d seconds\n", - mds->mds_interval / HZ); - - /* XXX maintain a list of all managed devices: insert here */ - - /* And now, wait forever for commit wakeup events. */ - while (1) { - int rc; - - if (mds->mds_flags & MDS_UNMOUNT) - break; - - wake_up(&mds->mds_done_waitq); - interruptible_sleep_on(&mds->mds_waitq); - - CDEBUG(D_INODE, "lustre_mds wakes\n"); - CDEBUG(D_INODE, "pick up req here and continue\n"); - - if (mds->mds_service != NULL) { - ptl_event_t ev; - - while (1) { - struct ptlrpc_request request; - - rc = PtlEQGet(mds->mds_service->srv_eq, &ev); - if (rc != PTL_OK && rc != PTL_EQ_DROPPED) - break; - /* FIXME: If we move to an event-driven model, - * we should put the request on the stack of - * mds_handle instead. */ - memset(&request, 0, sizeof(request)); - request.rq_reqbuf = ev.mem_desc.start + - ev.offset; - request.rq_reqlen = ev.mem_desc.length; - request.rq_obd = MDS; - request.rq_xid = ev.match_bits; - - request.rq_peer.peer_nid = ev.initiator.nid; - /* FIXME: this NI should be the incoming NI. - * We don't know how to find that from here. */ - request.rq_peer.peer_ni = - mds->mds_service->srv_self.peer_ni; - rc = mds_handle(&request); - } - } else { - struct ptlrpc_request *request; - - if (list_empty(&mds->mds_reqs)) { - CDEBUG(D_INODE, "woke because of timer\n"); - } else { - request = list_entry(mds->mds_reqs.next, - struct ptlrpc_request, - rq_list); - list_del(&request->rq_list); - rc = mds_handle(request); - } - } - } - - del_timer_sync(mds->mds_timer); - - /* XXX maintain a list of all managed devices: cleanup here */ - - mds->mds_thread = NULL; - wake_up(&mds->mds_done_waitq); - printk("lustre_mds: exiting\n"); - return 0; + int rc; + struct mds_update_record rec; + + rc = mds_update_unpack(req, &rec); + if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) { + CERROR("invalid record\n"); + req->rq_status = -EINVAL; + RETURN(0); + } + /* rc will be used to interrupt a for loop over multiple records */ + rc = mds_reint_rec(&rec, req); + return 0; } -static void mds_stop_srv_thread(struct mds_obd *mds) +int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc, + struct ptlrpc_request *req) { - mds->mds_flags |= MDS_UNMOUNT; + int rc; + ENTRY; + + rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen); + if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) { + CERROR("lustre_mds: Invalid request\n"); + GOTO(out, rc); + } - while (mds->mds_thread) { - wake_up(&mds->mds_waitq); - sleep_on(&mds->mds_done_waitq); - } + if (req->rq_reqmsg->type != PTL_RPC_REQUEST) { + CERROR("lustre_mds: wrong packet type sent %d\n", + req->rq_reqmsg->type); + GOTO(out, rc = -EINVAL); + } + + switch (req->rq_reqmsg->opc) { + case MDS_GETATTR: + CDEBUG(D_INODE, "getattr\n"); + OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0); + rc = mds_getattr(req); + break; + + case MDS_READPAGE: + CDEBUG(D_INODE, "readpage\n"); + OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0); + rc = mds_readpage(req); + + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) + return 0; + break; + + case MDS_REINT: + CDEBUG(D_INODE, "reint\n"); + OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0); + rc = mds_reint(req); + break; + + case MDS_OPEN: + CDEBUG(D_INODE, "open\n"); + OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0); + rc = mds_open(req); + break; + + case MDS_CLOSE: + CDEBUG(D_INODE, "close\n"); + OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0); + rc = mds_close(req); + break; + + default: + rc = ptlrpc_error(svc, req); + RETURN(rc); + } + + EXIT; +out: + if (rc) { + ptlrpc_error(svc, req); + } else { + CDEBUG(D_NET, "sending reply\n"); + ptlrpc_reply(svc, req); + } + + return 0; } -static void mds_start_srv_thread(struct mds_obd *mds) +static int mds_prep(struct obd_device *obddev) { - init_waitqueue_head(&mds->mds_waitq); - init_waitqueue_head(&mds->mds_done_waitq); - kernel_thread(mds_main, (void *)mds, - CLONE_VM | CLONE_FS | CLONE_FILES); - while (!mds->mds_thread) - sleep_on(&mds->mds_done_waitq); + struct obd_run_ctxt saved; + struct mds_obd *mds = &obddev->u.mds; + struct super_operations *s_ops; + struct file *f; + int err; + + mds->mds_service = ptlrpc_init_svc(128 * 1024, + MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, + "self", mds_handle); + + if (!mds->mds_service) { + CERROR("failed to start service\n"); + RETURN(-EINVAL); + } + + err = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds"); + if (err) { + CERROR("cannot start thread\n"); + GOTO(err_svc, err); + } + + push_ctxt(&saved, &mds->mds_ctxt); + err = simple_mkdir(current->fs->pwd, "ROOT", 0700); + if (err && err != -EEXIST) { + CERROR("cannot create ROOT directory\n"); + GOTO(err_svc, err); + } + err = simple_mkdir(current->fs->pwd, "FH", 0700); + if (err && err != -EEXIST) { + CERROR("cannot create FH directory\n"); + GOTO(err_svc, err); + } + f = filp_open("last_rcvd", O_RDWR | O_CREAT, 0644); + if (IS_ERR(f)) { + CERROR("cannot open/create last_rcvd file\n"); + GOTO(err_svc, err = PTR_ERR(f)); + } + mds->last_rcvd = f; + pop_ctxt(&saved); + + /* + * Replace the client filesystem delete_inode method with our own, + * so that we can clear the object ID before the inode is deleted. + * The fs_delete_inode method will call cl_delete_inode for us. + * + * We need to do this for the MDS superblock only, hence we install + * a modified copy of the original superblock method table. + * + * We still assume that there is only a single MDS client filesystem + * type, as we don't have access to the mds struct in * delete_inode. + */ + OBD_ALLOC(s_ops, sizeof(*s_ops)); + memcpy(s_ops, mds->mds_sb->s_op, sizeof(*s_ops)); + mds->mds_fsops->cl_delete_inode = s_ops->delete_inode; + s_ops->delete_inode = mds->mds_fsops->fs_delete_inode; + mds->mds_sb->s_op = s_ops; + + RETURN(0); + +err_svc: + rpc_unregister_service(mds->mds_service); + OBD_FREE(mds->mds_service, sizeof(*mds->mds_service)); + + return(err); } /* mount the file system (secretly) */ -static int mds_setup(struct obd_device *obddev, obd_count len, - void *buf) - +static int mds_setup(struct obd_device *obddev, obd_count len, void *buf) { - struct obd_ioctl_data* data = buf; - struct mds_obd *mds = &obddev->u.mds; - struct vfsmount *mnt; - struct lustre_peer peer; - int err; + struct obd_ioctl_data* data = buf; + struct mds_obd *mds = &obddev->u.mds; + struct vfsmount *mnt; + int err = 0; ENTRY; +#ifdef CONFIG_DEV_RDONLY + dev_clear_rdonly(2); +#endif + mds->mds_fstype = strdup(data->ioc_inlbuf2); + + if (!strcmp(mds->mds_fstype, "ext3")) + mds->mds_fsops = &mds_ext3_fs_ops; + else if (!strcmp(mds->mds_fstype, "ext2")) + mds->mds_fsops = &mds_ext2_fs_ops; + else { + CERROR("unsupported MDS filesystem type %s\n", mds->mds_fstype); + GOTO(err_kfree, (err = -EPERM)); + } - mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL); - err = PTR_ERR(mnt); - if (IS_ERR(mnt)) { - EXIT; - return err; - } - - mds->mds_sb = mnt->mnt_root->d_inode->i_sb; - if (!obddev->u.mds.mds_sb) { - EXIT; - return -ENODEV; - } - - mds->mds_vfsmnt = mnt; - obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2); - - mds->mds_ctxt.pwdmnt = mnt; - mds->mds_ctxt.pwd = mnt->mnt_root; - mds->mds_ctxt.fs = KERNEL_DS; - mds->mds_remote_nid = 0; - - INIT_LIST_HEAD(&mds->mds_reqs); - mds->mds_thread = NULL; - mds->mds_flags = 0; - mds->mds_interval = 3 * HZ; - MDS = mds; + MOD_INC_USE_COUNT; + mnt = do_kern_mount(mds->mds_fstype, 0, data->ioc_inlbuf1, NULL); + if (IS_ERR(mnt)) { + CERROR("do_kern_mount failed: %d\n", err); + GOTO(err_dec, err = PTR_ERR(mnt)); + } - spin_lock_init(&obddev->u.mds.mds_lock); + mds->mds_sb = mnt->mnt_root->d_inode->i_sb; + if (!mds->mds_sb) + GOTO(err_put, (err = -ENODEV)); - err = kportal_uuid_to_peer("self", &peer); - if (err == 0) { - mds->mds_service = kmalloc(sizeof(*mds->mds_service), - GFP_KERNEL); - if (mds->mds_service == NULL) - return -ENOMEM; - mds->mds_service->srv_buf_size = 64 * 1024; - mds->mds_service->srv_portal = MDS_REQUEST_PORTAL; - memcpy(&mds->mds_service->srv_self, &peer, sizeof(peer)); - mds->mds_service->srv_wait_queue = &mds->mds_waitq; + mds->mds_vfsmnt = mnt; + mds->mds_ctxt.pwdmnt = mnt; + mds->mds_ctxt.pwd = mnt->mnt_root; + mds->mds_ctxt.fs = KERNEL_DS; - rpc_register_service(mds->mds_service, "self"); - } + err = mds_prep(obddev); + if (err) + GOTO(err_put, err); - mds_start_srv_thread(mds); + RETURN(0); - MOD_INC_USE_COUNT; - EXIT; - return 0; -} +err_put: + unlock_kernel(); + mntput(mds->mds_vfsmnt); + mds->mds_sb = 0; + lock_kernel(); +err_dec: + MOD_DEC_USE_COUNT; +err_kfree: + kfree(mds->mds_fstype); + return err; +} static int mds_cleanup(struct obd_device * obddev) { + struct super_operations *s_ops = NULL; struct super_block *sb; - struct mds_obd *mds = &obddev->u.mds; + struct mds_obd *mds = &obddev->u.mds; ENTRY; - if ( !(obddev->obd_flags & OBD_SET_UP) ) { - EXIT; - return 0; + if ( !list_empty(&obddev->obd_gen_clients) ) { + CERROR("still has clients!\n"); + RETURN(-EBUSY); } - if ( !list_empty(&obddev->obd_gen_clients) ) { - printk(KERN_WARNING __FUNCTION__ ": still has clients!\n"); - EXIT; - return -EBUSY; + ptlrpc_stop_thread(mds->mds_service); + rpc_unregister_service(mds->mds_service); + if (!list_empty(&mds->mds_service->srv_reqs)) { + // XXX reply with errors and clean up + CERROR("Request list not empty!\n"); } + OBD_FREE(mds->mds_service, sizeof(*mds->mds_service)); - MDS = NULL; - mds_stop_srv_thread(mds); sb = mds->mds_sb; - if (!mds->mds_sb){ - EXIT; - return 0; - } + if (!mds->mds_sb) + RETURN(0); + + if (mds->last_rcvd) { + int rc = filp_close(mds->last_rcvd, 0); + mds->last_rcvd = NULL; - if (!list_empty(&mds->mds_reqs)) { - // XXX reply with errors and clean up - CDEBUG(D_INODE, "Request list not empty!\n"); - } + if (rc) + CERROR("last_rcvd file won't close, rc=%d\n", rc); + } + s_ops = sb->s_op; - unlock_kernel(); - mntput(mds->mds_vfsmnt); + unlock_kernel(); + mntput(mds->mds_vfsmnt); mds->mds_sb = 0; - kfree(mds->mds_fstype); - lock_kernel(); - + kfree(mds->mds_fstype); + lock_kernel(); +#ifdef CONFIG_DEV_RDONLY + dev_clear_rdonly(2); +#endif + OBD_FREE(s_ops, sizeof(*s_ops)); MOD_DEC_USE_COUNT; - EXIT; - return 0; + RETURN(0); } /* use obd ops to offer management infrastructure */ @@ -622,21 +579,17 @@ static struct obd_ops mds_obd_ops = { static int __init mds_init(void) { obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME); - return 0; + return 0; } static void __exit mds_exit(void) { - obd_unregister_type(LUSTRE_MDS_NAME); + obd_unregister_type(LUSTRE_MDS_NAME); } MODULE_AUTHOR("Peter J. Braam "); MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01"); MODULE_LICENSE("GPL"); - -// for testing (maybe this stays) -EXPORT_SYMBOL(mds_queue_req); - module_init(mds_init); module_exit(mds_exit);