X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmds%2Fhandler.c;h=9f3001663f698e935a0eb38c0b868fbc560608cf;hb=5ac94a75953124d934cef2788a3dddd676c49bb3;hp=e57472da3755860473fedd5895c6f518e8e4695e;hpb=4c819fc310c1f6fe135fc7b1f35dac40e23ae076;p=fs%2Flustre-release.git diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index e57472d..9f30016 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -2,18 +2,18 @@ * vim:expandtab:shiftwidth=8:tabstop=8: * * linux/mds/handler.c - * + * * Lustre Metadata Server (mds) request handler - * + * * Copyright (C) 2001, 2002 Cluster File Systems, Inc. * * This code is issued under the GNU General Public License. * See the file COPYING in this distribution * * by Peter Braam - * - * This server is single threaded at present (but can easily be multi threaded). - * + * + * This server is single threaded at present (but can easily be multi threaded) + * */ #define EXPORT_SYMTAB @@ -23,728 +23,542 @@ #include #include #include -#include #include #include #include #define DEBUG_SUBSYSTEM S_MDS -#include -#include -#include -#include #include +#include #include -#include - -// XXX for testing -static struct mds_obd *MDS; - -// XXX make this networked! -static int mds_queue_req(struct ptlrpc_request *req) -{ - struct ptlrpc_request *srv_req; - - if (!MDS) { - EXIT; - return -1; - } - - OBD_ALLOC(srv_req, sizeof(*srv_req)); - if (!srv_req) { - EXIT; - return -ENOMEM; - } - - CDEBUG(0, "---> MDS at %d %p, incoming req %p, srv_req %p\n", - __LINE__, MDS, req, srv_req); - - memset(srv_req, 0, sizeof(*req)); - - /* move the request buffer */ - srv_req->rq_reqbuf = req->rq_reqbuf; - srv_req->rq_reqlen = req->rq_reqlen; - srv_req->rq_obd = MDS; - - /* remember where it came from */ - srv_req->rq_reply_handle = req; - - list_add(&srv_req->rq_list, &MDS->mds_reqs); - wake_up(&MDS->mds_waitq); - return 0; -} -int mds_sendpage(struct ptlrpc_request *req, struct file *file, +int mds_sendpage(struct ptlrpc_request *req, struct file *file, __u64 offset, struct niobuf *dst) { - int rc; - mm_segment_t oldfs = get_fs(); - - if (req->rq_peer.peer_nid == 0) { - /* dst->addr is a user address, but in a different task! */ - set_fs(KERNEL_DS); - rc = generic_file_read(file, (char *)(long)dst->addr, - PAGE_SIZE, &offset); - set_fs(oldfs); - - if (rc != PAGE_SIZE) - return -EIO; - } else { - char *buf; - - OBD_ALLOC(buf, PAGE_SIZE); - if (!buf) - return -ENOMEM; - - set_fs(KERNEL_DS); - rc = generic_file_read(file, buf, PAGE_SIZE, &offset); - set_fs(oldfs); - - if (rc != PAGE_SIZE) { - OBD_FREE(buf, PAGE_SIZE); - return -EIO; - } + int rc = 0; + mm_segment_t oldfs = get_fs(); - req->rq_bulkbuf = buf; - req->rq_bulklen = PAGE_SIZE; - rc = ptl_send_buf(req, &req->rq_peer, MDS_BULK_PORTAL, 0); - init_waitqueue_head(&req->rq_wait_for_bulk); - sleep_on(&req->rq_wait_for_bulk); - OBD_FREE(buf, PAGE_SIZE); - req->rq_bulklen = 0; /* FIXME: eek. */ - } + OBD_FAIL_RETURN(OBD_FAIL_MDS_SENDPAGE, -EIO); - return 0; -} + if (req->rq_peer.peer_nid == 0) { + /* dst->addr is a user address, but in a different task! */ + char *buf = (char *)(long)dst->addr; -int mds_reply(struct ptlrpc_request *req) -{ - struct ptlrpc_request *clnt_req = req->rq_reply_handle; - - ENTRY; - - if (req->rq_obd->mds_service != NULL) { - /* This is a request that came from the network via portals. */ - - /* FIXME: we need to increment the count of handled events */ - ptl_send_buf(req, &req->rq_peer, MDS_REPLY_PORTAL, 0); - } else { - /* This is a local request that came from another thread. */ - - /* move the reply to the client */ - clnt_req->rq_replen = req->rq_replen; - clnt_req->rq_repbuf = req->rq_repbuf; - req->rq_repbuf = NULL; - req->rq_replen = 0; - - /* free the request buffer */ - OBD_FREE(req->rq_reqbuf, req->rq_reqlen); - req->rq_reqbuf = NULL; - - /* wake up the client */ - wake_up_interruptible(&clnt_req->rq_wait_for_rep); - } - - EXIT; - return 0; -} + set_fs(KERNEL_DS); + rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE, + &offset); + set_fs(oldfs); -int mds_error(struct ptlrpc_request *req) -{ - struct ptlrep_hdr *hdr; + if (rc != PAGE_SIZE) { + rc = -EIO; + GOTO(out, rc); + } + EXIT; + } else { + struct ptlrpc_bulk_desc *bulk; + char *buf; + + bulk = ptlrpc_prep_bulk(&req->rq_peer); + if (bulk == NULL) { + rc = -ENOMEM; + GOTO(out, rc); + } + + bulk->b_xid = req->rq_xid; - ENTRY; + OBD_ALLOC(buf, PAGE_SIZE); + if (!buf) { + rc = -ENOMEM; + GOTO(cleanup_bulk, rc); + } + + set_fs(KERNEL_DS); + rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE, + &offset); + set_fs(oldfs); - OBD_ALLOC(hdr, sizeof(*hdr)); - if (!hdr) { - EXIT; - return -ENOMEM; - } + if (rc != PAGE_SIZE) { + rc = -EIO; + GOTO(cleanup_buf, rc); + } - memset(hdr, 0, sizeof(*hdr)); - - hdr->seqno = req->rq_reqhdr->seqno; - hdr->status = req->rq_status; - hdr->type = MDS_TYPE_ERR; + bulk->b_buf = buf; + bulk->b_buflen = PAGE_SIZE; - req->rq_repbuf = (char *)hdr; - req->rq_replen = sizeof(*hdr); + rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL); + wait_event_interruptible(bulk->b_waitq, + ptlrpc_check_bulk_sent(bulk)); - EXIT; - return mds_reply(req); + if (bulk->b_flags == PTL_RPC_INTR) { + rc = -EINTR; + GOTO(cleanup_buf, rc); + } + + EXIT; + cleanup_buf: + OBD_FREE(buf, PAGE_SIZE); + cleanup_bulk: + OBD_FREE(bulk, sizeof(*bulk)); + } +out: + return rc; } struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt) { - /* stolen from NFS */ - struct super_block *sb = mds->mds_sb; - unsigned long ino = fid->id; - //__u32 generation = fid->generation; - __u32 generation = 0; - struct inode *inode; - struct list_head *lp; - struct dentry *result; - - if (ino == 0) - return ERR_PTR(-ESTALE); - - inode = iget(sb, ino); - if (inode == NULL) - return ERR_PTR(-ENOMEM); - - CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb); - - if (is_bad_inode(inode) - || (generation && inode->i_generation != generation) - ) { - /* we didn't find the right inode.. */ - CERROR("bad inode %lu, link: %d ct: %d or version %u/%u\n", - inode->i_ino, - inode->i_nlink, atomic_read(&inode->i_count), - inode->i_generation, - generation); - iput(inode); - return ERR_PTR(-ESTALE); - } - - /* now to find a dentry. - * If possible, get a well-connected one - */ - if (mnt) - *mnt = mds->mds_vfsmnt; - spin_lock(&dcache_lock); - for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) { - result = list_entry(lp,struct dentry, d_alias); - if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) { - dget_locked(result); - result->d_vfs_flags |= DCACHE_REFERENCED; - spin_unlock(&dcache_lock); - iput(inode); - if (mnt) - mntget(*mnt); - return result; - } - } - spin_unlock(&dcache_lock); - result = d_alloc_root(inode); - if (result == NULL) { - iput(inode); - return ERR_PTR(-ENOMEM); - } - if (mnt) - mntget(*mnt); - result->d_flags |= DCACHE_NFSD_DISCONNECTED; - return result; -} + /* stolen from NFS */ + struct super_block *sb = mds->mds_sb; + unsigned long ino = fid->id; + __u32 generation = fid->generation; + struct inode *inode; + struct list_head *lp; + struct dentry *result; + + if (ino == 0) + return ERR_PTR(-ESTALE); + + inode = iget(sb, ino); + if (inode == NULL) + return ERR_PTR(-ENOMEM); + + CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb); + + if (is_bad_inode(inode) || + (generation && inode->i_generation != generation)) { + /* we didn't find the right inode.. */ + CERROR("bad inode %lu, link: %d ct: %d or version %u/%u\n", + inode->i_ino, + inode->i_nlink, atomic_read(&inode->i_count), + inode->i_generation, + generation); + LBUG(); + iput(inode); + return ERR_PTR(-ESTALE); + } -static inline void mds_get_objid(struct inode *inode, __u64 *id) -{ - memcpy(id, &inode->u.ext2_i.i_data, sizeof(*id)); + /* now to find a dentry. + * If possible, get a well-connected one + */ + if (mnt) + *mnt = mds->mds_vfsmnt; + spin_lock(&dcache_lock); + for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) { + result = list_entry(lp,struct dentry, d_alias); + if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) { + dget_locked(result); + result->d_vfs_flags |= DCACHE_REFERENCED; + spin_unlock(&dcache_lock); + iput(inode); + if (mnt) + mntget(*mnt); + return result; + } + } + spin_unlock(&dcache_lock); + result = d_alloc_root(inode); + if (result == NULL) { + iput(inode); + return ERR_PTR(-ENOMEM); + } + if (mnt) + mntget(*mnt); + result->d_flags |= DCACHE_NFSD_DISCONNECTED; + return result; } int mds_getattr(struct ptlrpc_request *req) { - struct dentry *de; - struct inode *inode; - struct mds_rep *rep; - int rc; - - rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, - &req->rq_replen, &req->rq_repbuf); - if (rc) { - EXIT; - CERROR("mds: out of memory\n"); - req->rq_status = -ENOMEM; - return 0; - } - - req->rq_rephdr->seqno = req->rq_reqhdr->seqno; - rep = req->rq_rep.mds; - - de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, NULL); - if (IS_ERR(de)) { - EXIT; - req->rq_rephdr->status = -ENOENT; - return 0; - } - - inode = de->d_inode; - rep->ino = inode->i_ino; - rep->atime = inode->i_atime; - rep->ctime = inode->i_ctime; - rep->mtime = inode->i_mtime; - rep->uid = inode->i_uid; - rep->gid = inode->i_gid; - rep->size = inode->i_size; - rep->mode = inode->i_mode; - rep->nlink = inode->i_nlink; - rep->valid = ~0; - mds_get_objid(inode, &rep->objid); - dput(de); - return 0; + struct dentry *de; + struct inode *inode; + struct mds_rep *rep; + struct mds_obd *mds = &req->rq_obd->u.mds; + int rc; + + rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, + &req->rq_replen, &req->rq_repbuf); + if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) { + CERROR("mds: out of memory\n"); + req->rq_status = -ENOMEM; + RETURN(0); + } + + req->rq_rephdr->xid = req->rq_reqhdr->xid; + rep = req->rq_rep.mds; + + de = mds_fid2dentry(mds, &req->rq_req.mds->fid1, NULL); + if (IS_ERR(de)) { + req->rq_rephdr->status = -ENOENT; + RETURN(0); + } + + inode = de->d_inode; + rep->ino = inode->i_ino; + rep->generation = inode->i_generation; + rep->atime = inode->i_atime; + rep->ctime = inode->i_ctime; + rep->mtime = inode->i_mtime; + rep->uid = inode->i_uid; + rep->gid = inode->i_gid; + rep->size = inode->i_size; + rep->mode = inode->i_mode; + rep->nlink = inode->i_nlink; + rep->valid = ~0; + mds_fs_get_objid(mds, inode, &rep->objid); + dput(de); + return 0; } int mds_open(struct ptlrpc_request *req) { - struct dentry *de; - struct inode *inode; - struct mds_rep *rep; - struct file *file; - struct vfsmount *mnt; - __u32 flags; - int rc; - - rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, - &req->rq_replen, &req->rq_repbuf); - if (rc) { - EXIT; - CERROR("mds: out of memory\n"); - req->rq_status = -ENOMEM; - return 0; - } - - req->rq_rephdr->seqno = req->rq_reqhdr->seqno; - rep = req->rq_rep.mds; - - de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, &mnt); - if (IS_ERR(de)) { - EXIT; - req->rq_rephdr->status = -ENOENT; - return 0; - } - flags = req->rq_req.mds->flags; - file = dentry_open(de, mnt, flags); - if (!file || IS_ERR(file)) { - req->rq_rephdr->status = -EINVAL; - return 0; - } - - rep->objid = (__u64) (unsigned long)file; - mds_get_objid(inode, &rep->objid); - dput(de); - return 0; -} - + struct dentry *de; + struct mds_rep *rep; + struct file *file; + struct vfsmount *mnt; + __u32 flags; + int rc; + + rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, + &req->rq_replen, &req->rq_repbuf); + if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) { + CERROR("mds: out of memory\n"); + req->rq_status = -ENOMEM; + RETURN(0); + } -int mds_readpage(struct ptlrpc_request *req) -{ - struct vfsmount *mnt; - struct dentry *de; - struct file *file; - struct niobuf *niobuf; - struct mds_rep *rep; - int rc; - - rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, - &req->rq_replen, &req->rq_repbuf); - if (rc) { - EXIT; - CERROR("mds: out of memory\n"); - req->rq_status = -ENOMEM; - return 0; - } - - req->rq_rephdr->seqno = req->rq_reqhdr->seqno; - rep = req->rq_rep.mds; - - de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, &mnt); - if (IS_ERR(de)) { - EXIT; - req->rq_rephdr->status = PTR_ERR(de); - return 0; - } + req->rq_rephdr->xid = req->rq_reqhdr->xid; + rep = req->rq_rep.mds; - CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino); + de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt); + if (IS_ERR(de)) { + req->rq_rephdr->status = -ENOENT; + RETURN(0); + } + flags = req->rq_req.mds->flags; + file = dentry_open(de, mnt, flags); + if (!file || IS_ERR(file)) { + req->rq_rephdr->status = -EINVAL; + RETURN(0); + } - file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE); - /* note: in case of an error, dentry_open puts dentry */ - if (IS_ERR(file)) { - EXIT; - req->rq_rephdr->status = PTR_ERR(file); - return 0; - } - - niobuf = mds_req_tgt(req->rq_req.mds); - - /* to make this asynchronous make sure that the handling function - doesn't send a reply when this function completes. Instead a - callback function would send the reply */ - rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf); - - filp_close(file, 0); - req->rq_rephdr->status = rc; - EXIT; - return 0; + rep->objid = (__u64) (unsigned long)file; + return 0; } -int mds_reint(struct ptlrpc_request *req) +int mds_close(struct ptlrpc_request *req) { - int rc; - char *buf = mds_req_tgt(req->rq_req.mds); - int len = req->rq_req.mds->tgtlen; - struct mds_update_record rec; - - rc = mds_update_unpack(buf, len, &rec); - if (rc) { - CERROR("invalid record\n"); - req->rq_status = -EINVAL; - return 0; - } - /* rc will be used to interrupt a for loop over multiple records */ - rc = mds_reint_rec(&rec, req); - return 0; -} + struct dentry *de; + struct mds_rep *rep; + struct file *file; + struct vfsmount *mnt; + int rc; + + rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, + &req->rq_replen, &req->rq_repbuf); + if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) { + CERROR("mds: out of memory\n"); + req->rq_status = -ENOMEM; + RETURN(0); + } -//int mds_handle(struct mds_conn *conn, int len, char *buf) -int mds_handle(struct ptlrpc_request *req) -{ - int rc; - struct ptlreq_hdr *hdr; + req->rq_rephdr->xid = req->rq_reqhdr->xid; + rep = req->rq_rep.mds; - ENTRY; + de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt); + if (IS_ERR(de)) { + req->rq_rephdr->status = -ENOENT; + RETURN(0); + } - hdr = (struct ptlreq_hdr *)req->rq_reqbuf; + file = (struct file *)(unsigned long) req->rq_req.mds->objid; - if (NTOH__u32(hdr->type) != MDS_TYPE_REQ) { - CERROR("lustre_mds: wrong packet type sent %d\n", - NTOH__u32(hdr->type)); - rc = -EINVAL; - goto out; - } + req->rq_rephdr->status = filp_close(file, 0); + dput(de); + mntput(mnt); + return 0; +} - rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen, - &req->rq_reqhdr, &req->rq_req); - if (rc) { - CERROR("lustre_mds: Invalid request\n"); - EXIT; - goto out; - } - switch (req->rq_reqhdr->opc) { +int mds_readpage(struct ptlrpc_request *req) +{ + struct vfsmount *mnt; + struct dentry *de; + struct file *file; + struct niobuf *niobuf; + struct mds_rep *rep; + int rc; - case MDS_GETATTR: - CDEBUG(D_INODE, "getattr\n"); - rc = mds_getattr(req); - break; + ENTRY; - case MDS_READPAGE: - CDEBUG(D_INODE, "readpage\n"); - rc = mds_readpage(req); - break; + rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, + &req->rq_replen, &req->rq_repbuf); + if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) { + CERROR("mds: out of memory\n"); + req->rq_status = -ENOMEM; + RETURN(0); + } - case MDS_REINT: - CDEBUG(D_INODE, "reint\n"); - rc = mds_reint(req); - break; + req->rq_rephdr->xid = req->rq_reqhdr->xid; + rep = req->rq_rep.mds; - default: - return mds_error(req); - } + de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt); + if (IS_ERR(de)) { + req->rq_rephdr->status = PTR_ERR(de); + RETURN(0); + } -out: - if (rc) { - CERROR("no header\n"); - return 0; - } - - if( req->rq_status) { - mds_error(req); - } else { - CDEBUG(D_INODE, "sending reply\n"); - mds_reply(req); - } - - return 0; -} + CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino); + file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE); + /* note: in case of an error, dentry_open puts dentry */ + if (IS_ERR(file)) { + req->rq_rephdr->status = PTR_ERR(file); + RETURN(0); + } -static void mds_timer_run(unsigned long __data) -{ - struct task_struct * p = (struct task_struct *) __data; + niobuf = mds_req_tgt(req->rq_req.mds); - wake_up_process(p); + /* to make this asynchronous make sure that the handling function + doesn't send a reply when this function completes. Instead a + callback function would send the reply */ + rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf); + + filp_close(file, 0); + req->rq_rephdr->status = rc; + RETURN(0); } -int mds_main(void *arg) +int mds_reint(struct ptlrpc_request *req) { - struct mds_obd *mds = (struct mds_obd *) arg; - struct timer_list timer; - DECLARE_WAITQUEUE(wait, current); - - lock_kernel(); - daemonize(); - spin_lock_irq(¤t->sigmask_lock); - sigfillset(¤t->blocked); - recalc_sigpending(current); - spin_unlock_irq(¤t->sigmask_lock); - - sprintf(current->comm, "lustre_mds"); - - /* Set up an interval timer which can be used to trigger a - wakeup after the interval expires */ - init_timer(&timer); - timer.data = (unsigned long) current; - timer.function = mds_timer_run; - mds->mds_timer = &timer; - - /* Record that the thread is running */ - mds->mds_thread = current; - mds->mds_flags = MDS_RUNNING; - wake_up(&mds->mds_done_waitq); - - /* And now, wait forever for commit wakeup events. */ - while (1) { - int signal; - int rc; - - - wake_up(&mds->mds_done_waitq); - CDEBUG(D_INODE, "mds_wakes pick up req here and continue\n"); - - if (mds->mds_service != NULL) { - ptl_event_t ev; - struct ptlrpc_request request; - struct ptlrpc_service *service; - - CDEBUG(D_IOCTL, "-- sleeping\n"); - signal = 0; - add_wait_queue(&mds->mds_waitq, &wait); - while (1) { - set_current_state(TASK_INTERRUPTIBLE); - rc = PtlEQGet(mds->mds_service->srv_eq_h, &ev); - if (rc == PTL_OK || rc == PTL_EQ_DROPPED) - break; - CERROR("EQGet rc %d\n", rc); - if (mds->mds_flags & MDS_STOPPING) - break; - - - /* if this process really wants to die, - * let it go */ - if (sigismember(&(current->pending.signal), - SIGKILL) || - sigismember(&(current->pending.signal), - SIGINT)) { - signal = 1; - break; - } - - schedule(); - } - remove_wait_queue(&mds->mds_waitq, &wait); - set_current_state(TASK_RUNNING); - CDEBUG(D_IOCTL, "-- done\n"); - - if (signal == 1) { - /* We broke out because of a signal */ - EXIT; - break; - } - if (mds->mds_flags & MDS_STOPPING) { - break; - } - - service = (struct ptlrpc_service *)ev.mem_desc.user_ptr; - - /* FIXME: If we move to an event-driven model, - * we should put the request on the stack of - * mds_handle instead. */ - memset(&request, 0, sizeof(request)); - request.rq_reqbuf = ev.mem_desc.start + ev.offset; - request.rq_reqlen = ev.mem_desc.length; - request.rq_obd = MDS; - request.rq_xid = ev.match_bits; - CERROR("got req %d\n", request.rq_xid); - - request.rq_peer.peer_nid = ev.initiator.nid; - /* FIXME: this NI should be the incoming NI. - * We don't know how to find that from here. */ - request.rq_peer.peer_ni = - mds->mds_service->srv_self.peer_ni; - rc = mds_handle(&request); - - /* Inform the rpc layer the event has been handled */ - ptl_received_rpc(service); - } else { - struct ptlrpc_request *request; - - CDEBUG(D_IOCTL, "-- sleeping\n"); - add_wait_queue(&mds->mds_waitq, &wait); - while (1) { - spin_lock(&mds->mds_lock); - if (!list_empty(&mds->mds_reqs)) - break; - - set_current_state(TASK_INTERRUPTIBLE); - - /* if this process really wants to die, - * let it go */ - if (sigismember(&(current->pending.signal), - SIGKILL) || - sigismember(&(current->pending.signal), - SIGINT)) - break; - - spin_unlock(&mds->mds_lock); - - schedule(); - } - remove_wait_queue(&mds->mds_waitq, &wait); - set_current_state(TASK_RUNNING); - CDEBUG(D_IOCTL, "-- done\n"); - - if (list_empty(&mds->mds_reqs)) { - CDEBUG(D_INODE, "woke because of signal\n"); - spin_unlock(&mds->mds_lock); - } else { - request = list_entry(mds->mds_reqs.next, - struct ptlrpc_request, - rq_list); - list_del(&request->rq_list); - spin_unlock(&mds->mds_lock); - rc = mds_handle(request); - } - } - } - - del_timer_sync(mds->mds_timer); - - /* XXX maintain a list of all managed devices: cleanup here */ - - mds->mds_thread = NULL; - wake_up(&mds->mds_done_waitq); - CERROR("lustre_mds: exiting\n"); - return 0; + char *buf; + int rc, len; + struct mds_update_record rec; + + buf = mds_req_tgt(req->rq_req.mds); + len = req->rq_req.mds->tgtlen; + + rc = mds_update_unpack(buf, len, &rec); + if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) { + CERROR("invalid record\n"); + req->rq_status = -EINVAL; + RETURN(0); + } + /* rc will be used to interrupt a for loop over multiple records */ + rc = mds_reint_rec(&rec, req); + return 0; } -static void mds_stop_srv_thread(struct mds_obd *mds) +int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc, + struct ptlrpc_request *req) { - mds->mds_flags |= MDS_STOPPING; + int rc; + struct ptlreq_hdr *hdr; - while (mds->mds_thread) { - wake_up(&mds->mds_waitq); - sleep_on(&mds->mds_done_waitq); - } -} + ENTRY; -static void mds_start_srv_thread(struct mds_obd *mds) -{ - init_waitqueue_head(&mds->mds_waitq); - init_waitqueue_head(&mds->mds_done_waitq); - kernel_thread(mds_main, (void *)mds, - CLONE_VM | CLONE_FS | CLONE_FILES); - while (!mds->mds_thread) - sleep_on(&mds->mds_done_waitq); + hdr = (struct ptlreq_hdr *)req->rq_reqbuf; + + if (NTOH__u32(hdr->type) != PTL_RPC_REQUEST) { + CERROR("lustre_mds: wrong packet type sent %d\n", + NTOH__u32(hdr->type)); + rc = -EINVAL; + GOTO(out, rc); + } + + rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen, + &req->rq_reqhdr, &req->rq_req); + if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) { + CERROR("lustre_mds: Invalid request\n"); + GOTO(out, rc); + } + + switch (req->rq_reqhdr->opc) { + + case MDS_GETATTR: + CDEBUG(D_INODE, "getattr\n"); + OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0); + rc = mds_getattr(req); + break; + + case MDS_READPAGE: + CDEBUG(D_INODE, "readpage\n"); + OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0); + rc = mds_readpage(req); + break; + + case MDS_REINT: + CDEBUG(D_INODE, "reint\n"); + OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0); + rc = mds_reint(req); + break; + + case MDS_OPEN: + CDEBUG(D_INODE, "open\n"); + OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0); + rc = mds_open(req); + break; + + case MDS_CLOSE: + CDEBUG(D_INODE, "close\n"); + OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0); + rc = mds_close(req); + break; + + default: + rc = ptlrpc_error(dev, svc, req); + RETURN(rc); + } + + EXIT; +out: + if (rc) { + CERROR("no header\n"); + LBUG(); + return 0; + } + + if( req->rq_status) { + ptlrpc_error(dev, svc, req); + } else { + CDEBUG(D_NET, "sending reply\n"); + ptlrpc_reply(dev, svc, req); + } + + return 0; } + /* mount the file system (secretly) */ -static int mds_setup(struct obd_device *obddev, obd_count len, - void *buf) - +static int mds_setup(struct obd_device *obddev, obd_count len, void *buf) { - struct obd_ioctl_data* data = buf; - struct mds_obd *mds = &obddev->u.mds; - struct vfsmount *mnt; - struct lustre_peer peer; - int err; + struct obd_ioctl_data* data = buf; + struct mds_obd *mds = &obddev->u.mds; + struct super_operations *s_ops; + struct super_block *sb; + struct vfsmount *mnt; + int err; ENTRY; + MOD_INC_USE_COUNT; +#ifdef CONFIG_DEV_RDONLY + dev_clear_rdonly(2); +#endif + mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL); + err = PTR_ERR(mnt); + if (IS_ERR(mnt)) { + CERROR("do_kern_mount failed: %d\n", err); + GOTO(err_dec, err); + } - mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL); - err = PTR_ERR(mnt); - if (IS_ERR(mnt)) { - EXIT; - return err; - } - - mds->mds_sb = mnt->mnt_root->d_inode->i_sb; - if (!obddev->u.mds.mds_sb) { - EXIT; - return -ENODEV; - } - - mds->mds_vfsmnt = mnt; - obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2); - - mds->mds_ctxt.pwdmnt = mnt; - mds->mds_ctxt.pwd = mnt->mnt_root; - mds->mds_ctxt.fs = KERNEL_DS; - mds->mds_remote_nid = 0; + sb = mds->mds_sb = mnt->mnt_root->d_inode->i_sb; + if (!sb) + GOTO(err_put, (err = -ENODEV)); - INIT_LIST_HEAD(&mds->mds_reqs); - mds->mds_thread = NULL; - mds->mds_flags = 0; - mds->mds_interval = 3 * HZ; - MDS = mds; + mds->mds_vfsmnt = mnt; + mds->mds_fstype = strdup(data->ioc_inlbuf2); - spin_lock_init(&obddev->u.mds.mds_lock); + if (!strcmp(mds->mds_fstype, "ext3")) + mds->mds_fsops = &mds_ext3_fs_ops; + else if (!strcmp(mds->mds_fstype, "ext2")) + mds->mds_fsops = &mds_ext2_fs_ops; + else { + CERROR("unsupported MDS filesystem type %s\n", mds->mds_fstype); + GOTO(err_kfree, (err = -EPERM)); + } - err = kportal_uuid_to_peer("self", &peer); - if (err == 0) { - OBD_ALLOC(mds->mds_service, sizeof(*mds->mds_service)); - if (mds->mds_service == NULL) - return -ENOMEM; - mds->mds_service->srv_buf_size = 64 * 1024; - //mds->mds_service->srv_buf_size = 1024; - mds->mds_service->srv_portal = MDS_REQUEST_PORTAL; - memcpy(&mds->mds_service->srv_self, &peer, sizeof(peer)); - mds->mds_service->srv_wait_queue = &mds->mds_waitq; + mds->mds_ctxt.pwdmnt = mnt; + mds->mds_ctxt.pwd = mnt->mnt_root; + mds->mds_ctxt.fs = KERNEL_DS; - rpc_register_service(mds->mds_service, "self"); - } + mds->mds_service = ptlrpc_init_svc(128 * 1024, + MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, + "self", mds_handle); + if (!mds->mds_service) { + CERROR("failed to start service\n"); + GOTO(err_kfree, (err = -EINVAL)); + } - mds_start_srv_thread(mds); + err = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds"); + if (err) { + CERROR("cannot start thread\n"); + GOTO(err_svc, err); + } - MOD_INC_USE_COUNT; - EXIT; - return 0; -} + /* + * Replace the client filesystem delete_inode method with our own, + * so that we can clear the object ID before the inode is deleted. + * The fs_delete_inode method will call cl_delete_inode for us. + * + * We need to do this for the MDS superblock only, hence we install + * a modified copy of the original superblock method table. + * + * We still assume that there is only a single MDS client filesystem + * type, as we don't have access to the mds struct in * delete_inode. + */ + OBD_ALLOC(s_ops, sizeof(*s_ops)); + memcpy(s_ops, sb->s_op, sizeof(*s_ops)); + mds->mds_fsops->cl_delete_inode = s_ops->delete_inode; + s_ops->delete_inode = mds->mds_fsops->fs_delete_inode; + sb->s_op = s_ops; + + RETURN(0); + +err_svc: + rpc_unregister_service(mds->mds_service); + OBD_FREE(mds->mds_service, sizeof(*mds->mds_service)); +err_kfree: + kfree(mds->mds_fstype); +err_put: + unlock_kernel(); // XXX do we want/need this? + mntput(mds->mds_vfsmnt); + mds->mds_sb = 0; + lock_kernel(); // XXX do we want/need this? +err_dec: + MOD_DEC_USE_COUNT; + return err; +} static int mds_cleanup(struct obd_device * obddev) { + struct super_operations *s_ops = NULL; struct super_block *sb; - struct mds_obd *mds = &obddev->u.mds; + struct mds_obd *mds = &obddev->u.mds; ENTRY; - if ( !(obddev->obd_flags & OBD_SET_UP) ) { - EXIT; - return 0; - } - if ( !list_empty(&obddev->obd_gen_clients) ) { CERROR("still has clients!\n"); - EXIT; - return -EBUSY; + RETURN(-EBUSY); } - MDS = NULL; - mds_stop_srv_thread(mds); + ptlrpc_stop_thread(mds->mds_service); rpc_unregister_service(mds->mds_service); + if (!list_empty(&mds->mds_service->srv_reqs)) { + // XXX reply with errors and clean up + CERROR("Request list not empty!\n"); + } OBD_FREE(mds->mds_service, sizeof(*mds->mds_service)); sb = mds->mds_sb; - if (!mds->mds_sb){ - EXIT; - return 0; - } + if (!mds->mds_sb) + RETURN(0); - if (!list_empty(&mds->mds_reqs)) { - // XXX reply with errors and clean up - CDEBUG(D_INODE, "Request list not empty!\n"); - } + s_ops = sb->s_op; - unlock_kernel(); - mntput(mds->mds_vfsmnt); + unlock_kernel(); + mntput(mds->mds_vfsmnt); mds->mds_sb = 0; - kfree(mds->mds_fstype); - lock_kernel(); + kfree(mds->mds_fstype); + lock_kernel(); +#ifdef CONFIG_DEV_RDONLY + dev_clear_rdonly(2); +#endif + + OBD_FREE(s_ops, sizeof(*s_ops)); MOD_DEC_USE_COUNT; - EXIT; - return 0; + RETURN(0); } /* use obd ops to offer management infrastructure */ @@ -756,21 +570,17 @@ static struct obd_ops mds_obd_ops = { static int __init mds_init(void) { obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME); - return 0; + return 0; } static void __exit mds_exit(void) { - obd_unregister_type(LUSTRE_MDS_NAME); + obd_unregister_type(LUSTRE_MDS_NAME); } MODULE_AUTHOR("Peter J. Braam "); MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01"); MODULE_LICENSE("GPL"); - -// for testing (maybe this stays) -EXPORT_SYMBOL(mds_queue_req); - module_init(mds_init); module_exit(mds_exit);