1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * Lustre Metadata Server (mds) request handler
8 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
10 * This code is issued under the GNU General Public License.
11 * See the file COPYING in this distribution
13 * by Peter Braam <braam@clusterfs.com>
15 * This server is single threaded at present (but can easily be multi threaded).
21 #include <linux/version.h>
22 #include <linux/module.h>
24 #include <linux/stat.h>
25 #include <linux/locks.h>
26 #include <linux/ext2_fs.h>
27 #include <linux/quotaops.h>
28 #include <asm/unistd.h>
29 #include <asm/uaccess.h>
31 #define DEBUG_SUBSYSTEM S_MDS
33 #include <linux/obd_support.h>
34 #include <linux/obd.h>
35 #include <linux/lustre_lib.h>
36 #include <linux/lustre_idl.h>
37 #include <linux/lustre_mds.h>
38 #include <linux/lustre_net.h>
39 #include <linux/obd_class.h>
42 static struct mds_obd *MDS;
44 // XXX make this networked!
/* Queue a locally-generated request onto the single global MDS request
 * list and wake the server thread.  The request buffer is handed over
 * by pointer (not copied), and the originating request is remembered in
 * rq_reply_handle so mds_reply() can route the answer back to the
 * waiting client thread.
 * NOTE(review): this is a numbered listing with interior lines absent —
 * the OBD_ALLOC failure check and the return statement are not visible
 * here; confirm against the full file. */
45 static int mds_queue_req(struct ptlrpc_request *req)
47 struct ptlrpc_request *srv_req;
54 OBD_ALLOC(srv_req, sizeof(*srv_req));
60 CDEBUG(0, "---> MDS at %d %p, incoming req %p, srv_req %p\n",
61 __LINE__, MDS, req, srv_req);
/* sizeof(*req) is textually inconsistent with the allocation above, but
 * both are struct ptlrpc_request, so the size is identical in effect. */
63 memset(srv_req, 0, sizeof(*req));
65 /* move the request buffer */
66 srv_req->rq_reqbuf = req->rq_reqbuf;
67 srv_req->rq_reqlen = req->rq_reqlen;
68 srv_req->rq_obd = MDS;
70 /* remember where it came from */
71 srv_req->rq_reply_handle = req;
/* No locking visible around the list insertion — presumably single
 * producer at this stage; TODO confirm mds_lock usage in full file. */
73 list_add(&srv_req->rq_list, &MDS->mds_reqs);
74 wake_up(&MDS->mds_waitq);
/* Ship one PAGE_SIZE chunk of `file` at `offset` back to the peer that
 * issued `req`.  Two paths: a local request (peer_nid == 0) reads
 * straight into the caller-supplied niobuf address, while a remote
 * request bounces through a freshly allocated kernel buffer pushed over
 * the bulk portal.
 * NOTE(review): numbered listing with interior lines absent — the
 * get_fs/set_fs pairing implied by `oldfs`, the error-path returns and
 * the final return are not visible here. */
78 int mds_sendpage(struct ptlrpc_request *req, struct file *file,
79 __u64 offset, struct niobuf *dst)
82 mm_segment_t oldfs = get_fs();
84 if (req->rq_peer.peer_nid == 0) {
85 /* dst->addr is a user address, but in a different task! */
87 rc = generic_file_read(file, (char *)(long)dst->addr,
96 OBD_ALLOC(buf, PAGE_SIZE);
101 rc = generic_file_read(file, buf, PAGE_SIZE, &offset);
/* A short read is treated as failure: anything other than a full page
 * frees the bounce buffer (error return presumably follows — absent). */
104 if (rc != PAGE_SIZE) {
105 OBD_FREE(buf, PAGE_SIZE);
/* Hand the bounce buffer to the RPC layer and wait for bulk completion.
 * NOTE(review): sleep_on() without a condition re-check is the classic
 * 2.4 lost-wakeup race — confirm the completion event cannot fire
 * before we sleep. */
109 req->rq_bulkbuf = buf;
110 req->rq_bulklen = PAGE_SIZE;
111 init_waitqueue_head(&req->rq_wait_for_bulk);
112 rc = ptl_send_buf(req, &req->rq_peer, MDS_BULK_PORTAL, 0);
113 sleep_on(&req->rq_wait_for_bulk);
114 OBD_FREE(buf, PAGE_SIZE);
115 req->rq_bulklen = 0; /* FIXME: eek. */
/* Deliver the reply attached to `req`.  Network-originated requests
 * (rq_obd->mds_service set) are answered over the reply portal; local
 * requests hand the reply buffer to the originating thread's request
 * (recorded in rq_reply_handle by mds_queue_req) and wake it.
 * NOTE(review): numbered listing — the else keyword, braces and return
 * are not visible in this view. */
121 int mds_reply(struct ptlrpc_request *req)
123 struct ptlrpc_request *clnt_req = req->rq_reply_handle;
127 if (req->rq_obd->mds_service != NULL) {
128 /* This is a request that came from the network via portals. */
130 /* FIXME: we need to increment the count of handled events */
131 ptl_send_buf(req, &req->rq_peer, MDS_REPLY_PORTAL, 0);
133 /* This is a local request that came from another thread. */
135 /* move the reply to the client */
/* Ownership of rq_repbuf transfers to the client request; clearing the
 * server-side pointer prevents a double free. */
136 clnt_req->rq_replen = req->rq_replen;
137 clnt_req->rq_repbuf = req->rq_repbuf;
138 req->rq_repbuf = NULL;
141 /* free the request buffer */
142 OBD_FREE(req->rq_reqbuf, req->rq_reqlen);
143 req->rq_reqbuf = NULL;
145 /* wake up the client */
146 wake_up_interruptible(&clnt_req->rq_wait_for_rep);
/* Build a minimal error reply (header only, type MDS_TYPE_ERR carrying
 * req->rq_status) and send it via mds_reply().
 * NOTE(review): numbered listing — the OBD_ALLOC failure branch between
 * lines 159 and 165 is not visible here. */
153 int mds_error(struct ptlrpc_request *req)
155 struct ptlrep_hdr *hdr;
159 OBD_ALLOC(hdr, sizeof(*hdr));
165 memset(hdr, 0, sizeof(*hdr));
/* Echo the sequence number so the client can match the reply. */
167 hdr->seqno = req->rq_reqhdr->seqno;
168 hdr->status = req->rq_status;
169 hdr->type = MDS_TYPE_ERR;
/* The bare header doubles as the whole reply buffer; mds_reply() takes
 * ownership and frees it on the local path. */
171 req->rq_repbuf = (char *)hdr;
172 req->rq_replen = sizeof(*hdr);
175 return mds_reply(req);
/* Resolve a Lustre file identifier to a dentry, NFS-export style:
 * iget() the inode, sanity-check it, then prefer an existing connected
 * alias from inode->i_dentry; otherwise fabricate a disconnected root
 * dentry with d_alloc_root().  On success *mnt (if non-NULL — TODO
 * confirm, the NULL check is not visible) is set to the MDS vfsmount.
 * Returns ERR_PTR(-ESTALE) for a bad fid/inode, ERR_PTR(-ENOMEM) on
 * iget or d_alloc_root failure.
 * NOTE(review): numbered listing — several guard lines (the ino == 0
 * check, iget NULL check, iput on the error paths) are absent from this
 * view. */
178 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
179 struct vfsmount **mnt)
181 /* stolen from NFS */
182 struct super_block *sb = mds->mds_sb;
183 unsigned long ino = fid->id;
/* Generation checking is disabled for now; the fid's generation field
 * is deliberately ignored. */
184 //__u32 generation = fid->generation;
185 __u32 generation = 0;
187 struct list_head *lp;
188 struct dentry *result;
191 return ERR_PTR(-ESTALE);
193 inode = iget(sb, ino);
195 return ERR_PTR(-ENOMEM);
197 CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
199 if (is_bad_inode(inode)
200 || (generation && inode->i_generation != generation)
202 /* we didn't find the right inode.. */
203 CERROR("bad inode %lu, link: %d ct: %d or version %u/%u\n",
205 inode->i_nlink, atomic_read(&inode->i_count),
209 return ERR_PTR(-ESTALE);
212 /* now to find a dentry.
213 * If possible, get a well-connected one
216 *mnt = mds->mds_vfsmnt;
/* Walk the inode's alias list under dcache_lock looking for a dentry
 * that is already connected to the tree. */
217 spin_lock(&dcache_lock);
218 for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
219 result = list_entry(lp,struct dentry, d_alias);
220 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
222 result->d_vfs_flags |= DCACHE_REFERENCED;
223 spin_unlock(&dcache_lock);
230 spin_unlock(&dcache_lock);
/* No connected alias: make an anonymous (disconnected) dentry, as NFSD
 * does for filehandle-only lookups. */
231 result = d_alloc_root(inode);
232 if (result == NULL) {
234 return ERR_PTR(-ENOMEM);
238 result->d_flags |= DCACHE_NFSD_DISCONNECTED;
/* Extract the object id stored in the ext2 inode's block-pointer area
 * (i_data) — Lustre stashes the data-object id there for MDS inodes.
 * Copies sizeof(__u64) bytes; presumably only the first slot is used —
 * TODO confirm against the object-creation path. */
242 static inline void mds_get_objid(struct inode *inode, __u64 *id)
244 memcpy(id, &inode->u.ext2_i.i_data, sizeof(*id));
/* MDS_GETATTR handler: allocate/pack a reply, resolve fid1 from the
 * request to a dentry, and copy the inode's standard attributes plus
 * the stashed object id into the reply body.  Errors are reported via
 * rq_status / rephdr->status rather than a direct error return.
 * NOTE(review): numbered listing — declarations of `de`, `inode`, the
 * dput/return tail and the IS_ERR(de) test are not visible here. */
247 int mds_getattr(struct ptlrpc_request *req)
254 rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
255 &req->rq_replen, &req->rq_repbuf);
258 CERROR("mds: out of memory\n");
259 req->rq_status = -ENOMEM;
/* Echo the client's sequence number in the reply header. */
263 req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
264 rep = req->rq_rep.mds;
266 de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, NULL);
269 req->rq_rephdr->status = -ENOENT;
/* Copy attributes straight out of the in-core inode. */
274 rep->ino = inode->i_ino;
275 rep->atime = inode->i_atime;
276 rep->ctime = inode->i_ctime;
277 rep->mtime = inode->i_mtime;
278 rep->uid = inode->i_uid;
279 rep->gid = inode->i_gid;
280 rep->size = inode->i_size;
281 rep->mode = inode->i_mode;
282 rep->nlink = inode->i_nlink;
284 mds_get_objid(inode, &rep->objid);
/* MDS_OPEN handler: pack a reply, resolve fid1 to a dentry + vfsmount,
 * and dentry_open() it with the client-requested flags.  The resulting
 * struct file pointer is returned to the client as an opaque handle in
 * rep->objid.
 * NOTE(review): rep->objid is assigned the file pointer and then
 * apparently overwritten by mds_get_objid() on the next visible line —
 * intervening listing lines (326-330) are missing, so confirm which
 * value actually wins in the full file. */
289 int mds_open(struct ptlrpc_request *req)
295 struct vfsmount *mnt;
299 rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
300 &req->rq_replen, &req->rq_repbuf);
303 CERROR("mds: out of memory\n");
304 req->rq_status = -ENOMEM;
308 req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
309 rep = req->rq_rep.mds;
311 de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, &mnt);
314 req->rq_rephdr->status = -ENOENT;
317 flags = req->rq_req.mds->flags;
/* dentry_open consumes the dentry reference even on failure. */
318 file = dentry_open(de, mnt, flags);
319 if (!file || IS_ERR(file)) {
320 req->rq_rephdr->status = -EINVAL;
324 rep->objid = (__u64) (unsigned long)file;
325 mds_get_objid(inode, &rep->objid);
/* MDS_READPAGE handler: pack a reply, resolve fid1, open the file
 * read-only, and push one page at the requested offset (carried in the
 * request's `size` field) back to the client via mds_sendpage().
 * NOTE(review): numbered listing — declarations of `de`, `file`, `rep`,
 * `rc`, the IS_ERR checks' framing, filp_close and the return tail are
 * not visible here. */
331 int mds_readpage(struct ptlrpc_request *req)
333 struct vfsmount *mnt;
336 struct niobuf *niobuf;
340 rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
341 &req->rq_replen, &req->rq_repbuf);
344 CERROR("mds: out of memory\n");
345 req->rq_status = -ENOMEM;
349 req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
350 rep = req->rq_rep.mds;
352 de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, &mnt);
355 req->rq_rephdr->status = PTR_ERR(de);
359 CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
361 file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
362 /* note: in case of an error, dentry_open puts dentry */
365 req->rq_rephdr->status = PTR_ERR(file);
/* The target niobuf (the client-side destination descriptor) rides in
 * the request's target buffer. */
369 niobuf = mds_req_tgt(req->rq_req.mds);
371 /* to make this asynchronous make sure that the handling function
372 doesn't send a reply when this function completes. Instead a
373 callback function would send the reply */
374 rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf);
377 req->rq_rephdr->status = rc;
/* MDS_REINT handler: unpack the metadata-update record carried in the
 * request's target buffer and dispatch it to mds_reint_rec().  A bad
 * record sets rq_status = -EINVAL (the early return is in a missing
 * listing line). */
382 int mds_reint(struct ptlrpc_request *req)
385 char *buf = mds_req_tgt(req->rq_req.mds);
386 int len = req->rq_req.mds->tgtlen;
387 struct mds_update_record rec;
389 rc = mds_update_unpack(buf, len, &rec);
391 CERROR("invalid record\n");
392 req->rq_status = -EINVAL;
395 /* rc will be used to interrupt a for loop over multiple records */
396 rc = mds_reint_rec(&rec, req);
400 //int mds_handle(struct mds_conn *conn, int len, char *buf)
/* Top-level request dispatcher: validate the packet type, unpack the
 * request header/body, then switch on the opcode to the per-operation
 * handlers (getattr / readpage / reint visible here; open and the
 * default/error tail fall in missing listing lines).  On failure the
 * error path replies via mds_error().
 * NOTE(review): numbered listing — the opcode case labels, the `out:`
 * style error tail and the reply-send call are not visible here. */
401 int mds_handle(struct ptlrpc_request *req)
404 struct ptlreq_hdr *hdr;
408 hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
/* Wire fields arrive in network byte order; convert before comparing. */
410 if (NTOH__u32(hdr->type) != MDS_TYPE_REQ) {
411 CERROR("lustre_mds: wrong packet type sent %d\n",
412 NTOH__u32(hdr->type));
417 rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen,
418 &req->rq_reqhdr, &req->rq_req);
420 CERROR("lustre_mds: Invalid request\n");
425 switch (req->rq_reqhdr->opc) {
428 CDEBUG(D_INODE, "getattr\n");
429 rc = mds_getattr(req);
433 CDEBUG(D_INODE, "readpage\n");
434 rc = mds_readpage(req);
438 CDEBUG(D_INODE, "reint\n");
443 return mds_error(req);
448 CERROR("no header\n");
452 if( req->rq_status) {
455 CDEBUG(D_INODE, "sending reply\n");
/* Interval-timer callback; __data carries the MDS thread's task_struct
 * (set up in mds_main).  The body (presumably a wake_up_process of `p`
 * — TODO confirm) falls in missing listing lines. */
463 static void mds_timer_run(unsigned long __data)
465 struct task_struct * p = (struct task_struct *) __data;
/* Main loop of the kernel MDS service thread (spawned by
 * mds_start_srv_thread via kernel_thread).  Setup: block all signals,
 * name the thread "lustre_mds", arm the interval timer, publish
 * mds_thread/MDS_RUNNING and wake anyone waiting in mds_done_waitq.
 * The loop then serves requests from one of two sources: the portals
 * event queue (when mds_service is registered) or the local mds_reqs
 * list fed by mds_queue_req().  On MDS_STOPPING it tears down the
 * timer, clears mds_thread and signals mds_done_waitq so
 * mds_stop_srv_thread can return.
 * FIX(review): the four `&current` expressions below had been mangled
 * into the HTML entity `¤t` (`&curren` + `t`); restored to the
 * obvious `&current` so the code matches every other use of `current`
 * in this function.
 * NOTE(review): this is a numbered listing with interior lines absent —
 * loop keywords, braces, `break`/`goto` targets, the EQGet event
 * variable declaration and several signal names are not visible here. */
470 int mds_main(void *arg)
472 struct mds_obd *mds = (struct mds_obd *) arg;
473 struct timer_list timer;
474 DECLARE_WAITQUEUE(wait, current);
/* Run with every signal blocked; the loop polls pending.signal itself
 * to decide when to die. */
478 spin_lock_irq(&current->sigmask_lock);
479 sigfillset(&current->blocked);
480 recalc_sigpending(current);
481 spin_unlock_irq(&current->sigmask_lock);
483 sprintf(current->comm, "lustre_mds");
485 /* Set up an interval timer which can be used to trigger a
486 wakeup after the interval expires */
488 timer.data = (unsigned long) current;
489 timer.function = mds_timer_run;
/* The timer lives on this thread's stack; it must be deleted before
 * mds_main returns (see del_timer_sync below). */
490 mds->mds_timer = &timer;
492 /* Record that the thread is running */
493 mds->mds_thread = current;
494 mds->mds_flags = MDS_RUNNING;
495 wake_up(&mds->mds_done_waitq);
497 /* And now, wait forever for commit wakeup events. */
502 wake_up(&mds->mds_done_waitq);
503 CDEBUG(D_INODE, "mds_wakes pick up req here and continue\n");
/* --- Path 1: requests arriving over portals (networked service) --- */
505 if (mds->mds_service != NULL) {
507 struct ptlrpc_request request;
508 struct ptlrpc_service *service;
510 CDEBUG(D_IOCTL, "-- sleeping\n");
512 add_wait_queue(&mds->mds_waitq, &wait);
514 set_current_state(TASK_INTERRUPTIBLE);
/* Poll the event queue; PTL_OK / PTL_EQ_DROPPED both mean an event
 * is available (DROPPED additionally means some were lost). */
515 rc = PtlEQGet(mds->mds_service->srv_eq_h, &ev);
516 if (rc == PTL_OK || rc == PTL_EQ_DROPPED)
518 CERROR("EQGet rc %d\n", rc);
519 if (mds->mds_flags & MDS_STOPPING)
522 /* if this process really wants to die,
524 if (sigismember(&(current->pending.signal),
526 sigismember(&(current->pending.signal),
534 remove_wait_queue(&mds->mds_waitq, &wait);
535 set_current_state(TASK_RUNNING);
536 CDEBUG(D_IOCTL, "-- done\n");
539 /* We broke out because of a signal */
543 if (mds->mds_flags & MDS_STOPPING) {
547 service = (struct ptlrpc_service *)ev.mem_desc.user_ptr;
549 /* FIXME: If we move to an event-driven model,
550 * we should put the request on the stack of
551 * mds_handle instead. */
552 memset(&request, 0, sizeof(request));
553 request.rq_reqbuf = ev.mem_desc.start + ev.offset;
554 request.rq_reqlen = ev.mem_desc.length;
555 request.rq_obd = MDS;
556 request.rq_xid = ev.match_bits;
/* NOTE(review): rq_xid comes from 64-bit match_bits; "%d" may truncate
 * on some configurations — confirm rq_xid's declared type. */
557 CERROR("got req %d\n", request.rq_xid);
559 request.rq_peer.peer_nid = ev.initiator.nid;
560 /* FIXME: this NI should be the incoming NI.
561 * We don't know how to find that from here. */
562 request.rq_peer.peer_ni =
563 mds->mds_service->srv_self.peer_ni;
564 rc = mds_handle(&request);
566 /* Inform the rpc layer the event has been handled */
567 ptl_received_rpc(service);
/* --- Path 2: local requests queued by mds_queue_req --- */
569 struct ptlrpc_request *request;
571 CDEBUG(D_IOCTL, "-- sleeping\n");
572 add_wait_queue(&mds->mds_waitq, &wait);
574 spin_lock(&mds->mds_lock);
575 if (!list_empty(&mds->mds_reqs))
578 set_current_state(TASK_INTERRUPTIBLE);
580 /* if this process really wants to die,
582 if (sigismember(&(current->pending.signal),
584 sigismember(&(current->pending.signal),
/* Drop mds_lock across the schedule; reacquired after wakeup (the
 * schedule() call itself sits in a missing listing line). */
588 spin_unlock(&mds->mds_lock);
592 remove_wait_queue(&mds->mds_waitq, &wait);
593 set_current_state(TASK_RUNNING);
594 CDEBUG(D_IOCTL, "-- done\n");
596 if (list_empty(&mds->mds_reqs)) {
597 CDEBUG(D_INODE, "woke because of signal\n");
598 spin_unlock(&mds->mds_lock);
/* Pop the oldest request off the list under the lock, then handle it
 * outside the lock. */
600 request = list_entry(mds->mds_reqs.next,
601 struct ptlrpc_request,
603 list_del(&request->rq_list);
604 spin_unlock(&mds->mds_lock);
605 rc = mds_handle(request);
/* Teardown: the timer references this stack frame, so it must be gone
 * before we return. */
610 del_timer_sync(mds->mds_timer);
612 /* XXX maintain a list of all managed devices: cleanup here */
614 mds->mds_thread = NULL;
615 wake_up(&mds->mds_done_waitq);
616 CERROR("lustre_mds: exiting\n");
/* Ask the service thread to exit and wait until it has: set
 * MDS_STOPPING, then repeatedly wake the thread and sleep on
 * mds_done_waitq until mds_main clears mds_thread.
 * NOTE(review): sleep_on() here has the usual 2.4 lost-wakeup window;
 * acceptable only because the loop re-checks mds_thread — confirm. */
620 static void mds_stop_srv_thread(struct mds_obd *mds)
622 mds->mds_flags |= MDS_STOPPING;
624 while (mds->mds_thread) {
625 wake_up(&mds->mds_waitq);
626 sleep_on(&mds->mds_done_waitq);
/* Spawn the MDS service thread and block until mds_main has published
 * mds_thread (the startup handshake mirrors the shutdown handshake in
 * mds_stop_srv_thread).  CLONE_VM|CLONE_FS|CLONE_FILES shares the
 * parent's address space and fs context, as is usual for 2.4 kernel
 * threads. */
630 static void mds_start_srv_thread(struct mds_obd *mds)
632 init_waitqueue_head(&mds->mds_waitq);
633 init_waitqueue_head(&mds->mds_done_waitq);
634 kernel_thread(mds_main, (void *)mds, CLONE_VM | CLONE_FS | CLONE_FILES);
635 while (!mds->mds_thread)
636 sleep_on(&mds->mds_done_waitq);
639 /* mount the file system (secretly) */
/* OBD setup entry point: kern-mount the backing filesystem named by the
 * ioctl data (inlbuf1 = device, inlbuf2 = fstype — TODO confirm the
 * argument order against do_kern_mount's signature), record the
 * superblock/vfsmount and a KERNEL_DS fs context for server-side file
 * ops, initialize the request list and lock, register the portals
 * service on MDS_REQUEST_PORTAL, and start the service thread.
 * NOTE(review): numbered listing — error-path labels, the return
 * statements and several declarations are not visible here. */
640 static int mds_setup(struct obd_device *obddev, obd_count len,
644 struct obd_ioctl_data* data = buf;
645 struct mds_obd *mds = &obddev->u.mds;
646 struct vfsmount *mnt;
647 struct lustre_peer peer;
652 mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL);
659 mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
660 if (!obddev->u.mds.mds_sb) {
665 mds->mds_vfsmnt = mnt;
666 obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2);
/* File operations issued by the server thread run with this context:
 * cwd at the mount root and KERNEL_DS segment limits. */
668 mds->mds_ctxt.pwdmnt = mnt;
669 mds->mds_ctxt.pwd = mnt->mnt_root;
670 mds->mds_ctxt.fs = KERNEL_DS;
671 mds->mds_remote_nid = 0;
673 INIT_LIST_HEAD(&mds->mds_reqs);
674 mds->mds_thread = NULL;
676 mds->mds_interval = 3 * HZ;
679 spin_lock_init(&obddev->u.mds.mds_lock);
681 err = kportal_uuid_to_peer("self", &peer);
683 OBD_ALLOC(mds->mds_service, sizeof(*mds->mds_service));
684 if (mds->mds_service == NULL)
686 mds->mds_service->srv_buf_size = 64 * 1024;
687 //mds->mds_service->srv_buf_size = 1024;
688 mds->mds_service->srv_portal = MDS_REQUEST_PORTAL;
689 memcpy(&mds->mds_service->srv_self, &peer, sizeof(peer));
/* The service wakes the same waitqueue the thread sleeps on, unifying
 * the network and local wakeup paths. */
690 mds->mds_service->srv_wait_queue = &mds->mds_waitq;
692 rpc_register_service(mds->mds_service, "self");
695 mds_start_srv_thread(mds);
/* OBD cleanup entry point: refuse if the device was never set up or
 * still has clients, then stop the service thread, unregister and free
 * the portals service, warn about any still-queued requests, and
 * release the mount and fstype string acquired in mds_setup.
 * NOTE(review): numbered listing — the return statements, `sb` usage
 * and the braces closing each guard are not visible here. */
702 static int mds_cleanup(struct obd_device * obddev)
704 struct super_block *sb;
705 struct mds_obd *mds = &obddev->u.mds;
709 if ( !(obddev->obd_flags & OBD_SET_UP) ) {
714 if ( !list_empty(&obddev->obd_gen_clients) ) {
715 CERROR("still has clients!\n");
721 mds_stop_srv_thread(mds);
722 rpc_unregister_service(mds->mds_service);
723 OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
731 if (!list_empty(&mds->mds_reqs)) {
732 // XXX reply with errors and clean up
733 CDEBUG(D_INODE, "Request list not empty!\n");
737 mntput(mds->mds_vfsmnt);
/* mds_fstype came from strdup() in mds_setup; kfree is its matching
 * release. */
739 kfree(mds->mds_fstype);
747 /* use obd ops to offer management infrastructure */
/* OBD method table registered under LUSTRE_MDS_NAME; the o_setup slot
 * (presumably mds_setup) falls in a missing listing line. */
748 static struct obd_ops mds_obd_ops = {
750 o_cleanup: mds_cleanup,
/* Module init: register the MDS obd type so devices of this type can be
 * configured; the return statement is in a missing listing line. */
753 static int __init mds_init(void)
755 obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
/* Module exit: unregister the MDS obd type (mirror of mds_init). */
759 static void __exit mds_exit(void)
761 obd_unregister_type(LUSTRE_MDS_NAME);
764 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
765 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
766 MODULE_LICENSE("GPL");
769 // for testing (maybe this stays)
770 EXPORT_SYMBOL(mds_queue_req);
772 module_init(mds_init);
773 module_exit(mds_exit);