4 * Lustre Metadata Server (mds) request handler
6 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
8 * This code is issued under the GNU General Public License.
9 * See the file COPYING in this distribution
11 * by Peter Braam <braam@clusterfs.com>
13 * This server is single threaded at present (but can easily be multi threaded).
20 #include <linux/version.h>
21 #include <linux/module.h>
23 #include <linux/stat.h>
24 #include <linux/locks.h>
25 #include <linux/ext2_fs.h>
26 #include <linux/quotaops.h>
27 #include <asm/unistd.h>
28 #include <asm/uaccess.h>
29 #include <linux/obd_support.h>
30 #include <linux/obd.h>
31 #include <linux/lustre_lib.h>
32 #include <linux/lustre_idl.h>
33 #include <linux/lustre_mds.h>
34 #include <linux/lustre_net.h>
35 #include <linux/obd_class.h>
/* The single local MDS instance used by the in-node request path.
 * NOTE(review): the assignment site is not visible in this excerpt —
 * presumably set during device setup; confirm against the full file. */
static struct mds_obd *MDS;
// XXX make this networked!
/*
 * Hand a locally generated request to the MDS server thread: allocate a
 * server-side copy of the request descriptor, take over the request
 * buffer, link it on MDS->mds_reqs and wake the server thread.
 * (This excerpt elides several original lines — braces, the kmalloc()
 * failure check and the final return — so error handling is not
 * visible here.)
 */
static int mds_queue_req(struct ptlrpc_request *req)
        struct ptlrpc_request *srv_req;

        srv_req = kmalloc(sizeof(*srv_req), GFP_KERNEL);

        printk("---> MDS at %d %p, incoming req %p, srv_req %p\n",
               __LINE__, MDS, req, srv_req);

        /* NOTE(review): sizeof(*req) happens to equal sizeof(*srv_req)
         * (same type), but sizeof(*srv_req) is the safer idiom — tie the
         * size to the object being zeroed. */
        memset(srv_req, 0, sizeof(*req));

        /* move the request buffer (ownership stays with the client req;
         * mds_reply() frees rq_reqbuf once, via the server copy) */
        srv_req->rq_reqbuf = req->rq_reqbuf;
        srv_req->rq_reqlen = req->rq_reqlen;
        srv_req->rq_obd = MDS;

        /* remember where it came from */
        srv_req->rq_reply_handle = req;

        list_add(&srv_req->rq_list, &MDS->mds_reqs);
        wake_up(&MDS->mds_waitq);
/* XXX do this over the net */
/*
 * Ship one page of 'file' starting at 'offset' to the requesting client.
 * Two paths: a local peer (peer_nid == 0) reads straight into the
 * client's buffer with generic_file_read(); a remote peer gets a
 * PAGE_SIZE bulk send over the MDS bulk portal.  (Excerpt elides the
 * set_fs()/oldfs restore, error checks and the kfree/return tail.)
 */
int mds_sendpage(struct ptlrpc_request *req, struct file *file,
                 __u64 offset, struct niobuf *dst)
        mm_segment_t oldfs = get_fs();

        if (req->rq_peer.peer_nid == 0) {
                /* dst->addr is a user address, but in a different task! */
                rc = generic_file_read(file, (char *)(long)dst->addr,

        buf = kmalloc(PAGE_SIZE, GFP_KERNEL);

        rc = generic_file_read(file, buf, PAGE_SIZE, &offset);

        req->rq_bulkbuf = buf;
        req->rq_bulklen = PAGE_SIZE;
        rc = ptl_send_buf(req, &req->rq_peer, MDS_BULK_PORTAL, 0);
        /* NOTE(review): the wait queue is initialised only AFTER the bulk
         * send is posted, and sleep_on() re-checks no condition: if the
         * bulk completion fires before this thread sleeps, the wakeup is
         * lost and the thread sleeps forever.  Initialise the queue before
         * ptl_send_buf() and use a condition-checked wait — TODO confirm
         * against the completion path. */
        init_waitqueue_head(&req->rq_wait_for_bulk);
        sleep_on(&req->rq_wait_for_bulk);

        req->rq_bulklen = 0; /* FIXME: eek. */
/* XXX replace with networking code */
/*
 * Deliver the reply for 'req'.  Network-originated requests (the obd has
 * a Portals service) are answered with a send on the reply portal;
 * locally queued requests hand the reply buffer across to the
 * originating thread's request and wake it.  (Excerpt elides braces and
 * the else/return scaffolding.)
 */
int mds_reply(struct ptlrpc_request *req)
        struct ptlrpc_request *clnt_req = req->rq_reply_handle;

        if (req->rq_obd->mds_service != NULL) {
                /* This is a request that came from the network via portals. */

                /* FIXME: we need to increment the count of handled events */
                ptl_send_buf(req, &req->rq_peer, MDS_REPLY_PORTAL, 0);

        /* This is a local request that came from another thread. */

        /* move the reply to the client — ownership of rq_repbuf transfers,
         * hence the NULL below so it is not freed twice */
        clnt_req->rq_replen = req->rq_replen;
        clnt_req->rq_repbuf = req->rq_repbuf;
        req->rq_repbuf = NULL;

        /* free the request buffer */
        kfree(req->rq_reqbuf);
        req->rq_reqbuf = NULL;

        /* wake up the client */
        wake_up_interruptible(&clnt_req->rq_wait_for_rep);
/*
 * Build a minimal MDS_TYPE_ERR reply carrying req->rq_status and send it
 * through mds_reply().  (Excerpt elides the kmalloc() failure branch.)
 */
int mds_error(struct ptlrpc_request *req)
        struct ptlrep_hdr *hdr;

        hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);

        memset(hdr, 0, sizeof(*hdr));

        hdr->seqno = req->rq_reqhdr->seqno;   /* echo request sequence no. */
        hdr->status = req->rq_status;
        hdr->type = MDS_TYPE_ERR;

        /* reply buffer ownership passes to the reply path */
        req->rq_repbuf = (char *)hdr;
        req->rq_replen = sizeof(*hdr);

        return mds_reply(req);
/*
 * Map a Lustre file id to a dentry, NFS-export style: iget() the inode,
 * scan its alias list for a well-connected dentry, otherwise fabricate a
 * DCACHE_NFSD_DISCONNECTED root alias with d_alloc_root().  On success
 * *mnt holds a reference on the MDS vfsmount; failures return ERR_PTR.
 * (Excerpt elides several guard conditions — the bare return statements
 * below belong to elided if-bodies.)
 */
struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt)
        /* stolen from NFS */
        struct super_block *sb = mds->mds_sb;
        unsigned long ino = fid->id;
        //__u32 generation = fid->generation;
        __u32 generation = 0;   /* NOTE(review): generation check disabled */
        struct list_head *lp;
        struct dentry *result;

        *mnt = mntget(mds->mds_vfsmnt);

        return ERR_PTR(-ESTALE);

        inode = iget(sb, ino);

        return ERR_PTR(-ENOMEM);

        printk("--> mds_fid2dentry: sb %p\n", inode->i_sb);

        /* stale if the inode is bad or the generation mismatches
         * (generation is forced to 0 above, so the check is inert) */
        if (is_bad_inode(inode)
            || (generation && inode->i_generation != generation)
                /* we didn't find the right inode.. */
                "bad inode %lu, link: %d ct: %d or version %u/%u\n",
                inode->i_nlink, atomic_read(&inode->i_count),

        return ERR_PTR(-ESTALE);

        /* now to find a dentry.
         * If possible, get a well-connected one
         */
        spin_lock(&dcache_lock);
        for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
                result = list_entry(lp,struct dentry, d_alias);
                if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
                        result->d_vfs_flags |= DCACHE_REFERENCED;
                        spin_unlock(&dcache_lock);

        spin_unlock(&dcache_lock);

        /* no connected alias: build a disconnected root dentry
         * (consumes the inode reference on success) */
        result = d_alloc_root(inode);
        if (result == NULL) {
                return ERR_PTR(-ENOMEM);

        result->d_flags |= DCACHE_NFSD_DISCONNECTED;
/* Extract the 64-bit object id stored in the first 8 bytes of the ext2
 * inode's i_data block array (ext2-specific layout). */
static inline void mds_get_objid(struct inode *inode, __u64 *id)
        memcpy(id, &inode->u.ext2_i.i_data, sizeof(*id));
/*
 * MDS_GETATTR handler: resolve fid1 to an inode and pack its attributes
 * into the reply message.  (Excerpt elides braces, the dentry error
 * check that yields -ENOENT, and the function tail.)
 */
int mds_getattr(struct ptlrpc_request *req)
        struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1,

        /* allocate and lay out the reply buffer */
        rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.mds,
                          &req->rq_replen, &req->rq_repbuf);

        printk("mds: out of memory\n");
        req->rq_status = -ENOMEM;

        req->rq_rephdr->seqno = req->rq_reqhdr->seqno;   /* echo seqno */
        rep = req->rq_rep.mds;

        req->rq_rephdr->status = -ENOENT;   /* fid did not resolve */

        /* copy the inode attributes into the wire reply */
        rep->ino = inode->i_ino;
        rep->atime = inode->i_atime;
        rep->ctime = inode->i_ctime;
        rep->mtime = inode->i_mtime;
        rep->uid = inode->i_uid;
        rep->gid = inode->i_gid;
        rep->size = inode->i_size;
        rep->mode = inode->i_mode;
        rep->nlink = inode->i_nlink;

        mds_get_objid(inode, &rep->objid);
/*
 * MDS_READPAGE handler: resolve fid1 to a dentry, open the file
 * read-only and ship the requested page to the client via
 * mds_sendpage().  (Excerpt elides braces, the IS_ERR guards whose
 * bodies set rq_rephdr->status below, and the filp_close/put tail.)
 */
int mds_readpage(struct ptlrpc_request *req)
        struct vfsmount *mnt;
        struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1,
        struct niobuf *niobuf;

        printk("mds_readpage: ino %ld\n", de->d_inode->i_ino);
        rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.mds,
                          &req->rq_replen, &req->rq_repbuf);

        printk("mds: out of memory\n");
        req->rq_status = -ENOMEM;

        req->rq_rephdr->seqno = req->rq_reqhdr->seqno;   /* echo seqno */
        rep = req->rq_rep.mds;

        req->rq_rephdr->status = PTR_ERR(de);   /* fid lookup failed */

        file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
        /* note: in case of an error, dentry_open puts dentry */

        req->rq_rephdr->status = PTR_ERR(file);   /* open failed */

        /* destination buffer described by the request's niobuf */
        niobuf = mds_req_tgt(req->rq_req.mds);

        /* to make this asynchronous make sure that the handling function
           doesn't send a reply when this function completes. Instead a
           callback function would send the reply */
        rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf);

        req->rq_rephdr->status = rc;
/*
 * MDS_REINT handler: unpack the metadata-update record from the request
 * target buffer and apply it via mds_reint_rec().  (Excerpt elides the
 * unpack-failure guard whose body is the printk/-EINVAL pair below.)
 */
int mds_reint(struct ptlrpc_request *req)
        char *buf = mds_req_tgt(req->rq_req.mds);
        int len = req->rq_req.mds->tgtlen;
        struct mds_update_record rec;

        rc = mds_update_unpack(buf, len, &rec);
        printk(__FUNCTION__ ": invalid record\n");
        req->rq_status = -EINVAL;

        /* rc will be used to interrupt a for loop over multiple records */
        rc = mds_reint_rec(&rec, req);
//int mds_handle(struct mds_conn *conn, int len, char *buf)
/*
 * Top-level request dispatcher: validate the wire packet type, unpack
 * the header, then switch on the opcode to getattr / readpage / reint.
 * Failures fall through to mds_error().  (Excerpt elides braces, the
 * case labels, goto targets and the reply-sending tail.)
 */
int mds_handle(struct ptlrpc_request *req)
        struct ptlreq_hdr *hdr;

        hdr = (struct ptlreq_hdr *)req->rq_reqbuf;

        /* wire fields arrive in network byte order */
        if (NTOH__u32(hdr->type) != MDS_TYPE_REQ) {
                printk("lustre_mds: wrong packet type sent %d\n",
                       NTOH__u32(hdr->type));

        rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen,
                            &req->rq_reqhdr, &req->rq_req.mds);
        printk("lustre_mds: Invalid request\n");

        switch (req->rq_reqhdr->opc) {
        CDEBUG(D_INODE, "getattr\n");
        rc = mds_getattr(req);
        CDEBUG(D_INODE, "readpage\n");
        rc = mds_readpage(req);
        CDEBUG(D_INODE, "reint\n");
        /* unknown opcode / unpack failure: send an error reply */
        return mds_error(req);
        printk(__FUNCTION__ ": no header\n");
        if( req->rq_status) {
        CDEBUG(D_INODE, "sending reply\n");
/*
 * Interval-timer callback for the MDS thread; __data carries the server
 * thread's task_struct.  NOTE(review): the body is elided from this
 * excerpt — presumably it wakes 'p' so the main loop re-runs on timer
 * expiry; confirm against the full file.
 */
static void mds_timer_run(unsigned long __data)
        struct task_struct * p = (struct task_struct *) __data;
/*
 * Body of the lustre_mds kernel thread: block all signals, arm an
 * interval timer, announce itself via mds_thread/mds_done_waitq, then
 * loop handling requests — from the Portals event queue when a network
 * service exists, otherwise from the local mds_reqs list — until
 * MDS_UNMOUNT is set.
 *
 * NOTE(review): the "¤t" tokens below are mojibake for "&current"
 * (HTML-entity corruption of the source); restore before compiling.
 * (Excerpt elides braces, error branches and loop scaffolding.)
 */
int mds_main(void *arg)
        struct mds_obd *mds = (struct mds_obd *) arg;
        struct timer_list timer;

        /* block every signal: this thread exits only via MDS_UNMOUNT */
        spin_lock_irq(¤t->sigmask_lock);
        sigfillset(¤t->blocked);
        recalc_sigpending(current);
        spin_unlock_irq(¤t->sigmask_lock);

        sprintf(current->comm, "lustre_mds");

        /* Set up an interval timer which can be used to trigger a
           wakeup after the interval expires */
        timer.data = (unsigned long) current;
        timer.function = mds_timer_run;
        /* NOTE(review): publishes a stack-allocated timer; safe only
         * because del_timer_sync() below runs before this frame dies */
        mds->mds_timer = &timer;

        /* Record that the thread is running */
        mds->mds_thread = current;
        wake_up(&mds->mds_done_waitq);

        printk(KERN_INFO "lustre_mds starting. Commit interval %d seconds\n",
               mds->mds_interval / HZ);

        /* XXX maintain a list of all managed devices: insert here */

        /* And now, wait forever for commit wakeup events. */
        if (mds->mds_flags & MDS_UNMOUNT)

        wake_up(&mds->mds_done_waitq);
        interruptible_sleep_on(&mds->mds_waitq);

        CDEBUG(D_INODE, "lustre_mds wakes\n");
        CDEBUG(D_INODE, "pick up req here and continue\n");

        if (mds->mds_service != NULL) {
                /* network path: drain one event from the service EQ and
                 * synthesise an on-stack request from it */
                struct ptlrpc_request request;

                rc = PtlEQGet(mds->mds_service->srv_eq, &ev);
                if (rc != PTL_OK && rc != PTL_EQ_DROPPED)

                /* FIXME: If we move to an event-driven model,
                 * we should put the request on the stack of
                 * mds_handle instead. */
                memset(&request, 0, sizeof(request));
                request.rq_reqbuf = ev.mem_desc.start +
                request.rq_reqlen = ev.mem_desc.length;
                request.rq_obd = MDS;
                request.rq_xid = ev.match_bits;

                request.rq_peer.peer_nid = ev.initiator.nid;
                /* FIXME: this NI should be the incoming NI.
                 * We don't know how to find that from here. */
                request.rq_peer.peer_ni =
                        mds->mds_service->srv_self.peer_ni;
                rc = mds_handle(&request);

                /* local path: pop one queued request off mds_reqs */
                struct ptlrpc_request *request;

                if (list_empty(&mds->mds_reqs)) {
                        CDEBUG(D_INODE, "woke because of timer\n");

                request = list_entry(mds->mds_reqs.next,
                                     struct ptlrpc_request,
                list_del(&request->rq_list);
                rc = mds_handle(request);

        del_timer_sync(mds->mds_timer);

        /* XXX maintain a list of all managed devices: cleanup here */

        /* signal mds_stop_srv_thread() that we are gone */
        mds->mds_thread = NULL;
        wake_up(&mds->mds_done_waitq);
        printk("lustre_mds: exiting\n");
/*
 * Ask the server thread to exit (MDS_UNMOUNT flag) and spin-wait,
 * sleeping uninterruptibly, until it clears mds->mds_thread.
 */
static void mds_stop_srv_thread(struct mds_obd *mds)
        mds->mds_flags |= MDS_UNMOUNT;

        while (mds->mds_thread) {
                wake_up(&mds->mds_waitq);
                sleep_on(&mds->mds_done_waitq);
/*
 * Spawn the lustre_mds kernel thread and block until it has registered
 * itself in mds->mds_thread.  NOTE(review): the kernel_thread() return
 * value is unchecked — if the spawn fails this loop never terminates.
 */
static void mds_start_srv_thread(struct mds_obd *mds)
        init_waitqueue_head(&mds->mds_waitq);
        init_waitqueue_head(&mds->mds_done_waitq);
        kernel_thread(mds_main, (void *)mds,
                      CLONE_VM | CLONE_FS | CLONE_FILES);
        while (!mds->mds_thread)
                sleep_on(&mds->mds_done_waitq);
/* mount the file system (secretly) */
/*
 * o_setup obd method: mount the backing filesystem named in the ioctl
 * data, record superblock / vfsmount / fs context in the mds_obd,
 * create and register the Portals service, and start the server thread.
 * (Excerpt elides the parameter list tail, error branches and return.)
 */
static int mds_setup(struct obd_device *obddev, obd_count len,
        struct obd_ioctl_data* data = buf;
        struct mds_obd *mds = &obddev->u.mds;
        struct vfsmount *mnt;
        struct lustre_peer peer;

        /* ioc_inlbuf2 = fstype, ioc_inlbuf1 = device name */
        mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL);

        mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
        if (!obddev->u.mds.mds_sb) {

        mds->mds_vfsmnt = mnt;
        obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2);

        /* fs context the server thread adopts for its VFS calls */
        mds->mds_ctxt.pwdmnt = mnt;
        mds->mds_ctxt.pwd = mnt->mnt_root;
        mds->mds_ctxt.fs = KERNEL_DS;
        mds->mds_remote_nid = 0;

        INIT_LIST_HEAD(&mds->mds_reqs);
        mds->mds_thread = NULL;
        mds->mds_interval = 3 * HZ;   /* timer/commit interval */
        spin_lock_init(&obddev->u.mds.mds_lock);

        err = kportal_uuid_to_peer("self", &peer);
        mds->mds_service = kmalloc(sizeof(*mds->mds_service),
        if (mds->mds_service == NULL)
        mds->mds_service->srv_buf_size = 64 * 1024;
        mds->mds_service->srv_portal = MDS_REQUEST_PORTAL;
        memcpy(&mds->mds_service->srv_self, &peer, sizeof(peer));
        /* NOTE(review): points at mds_waitq, which is only initialised
         * later in mds_start_srv_thread() — verify ordering vs. the
         * service registration below */
        mds->mds_service->srv_wait_queue = &mds->mds_waitq;

        rpc_register_service(mds->mds_service, "self");

        mds_start_srv_thread(mds);
/*
 * o_cleanup obd method: refuse if the device was never set up or still
 * has clients, stop the server thread, then release the mount and the
 * fstype string.  (Excerpt elides braces, locking and error returns.)
 */
static int mds_cleanup(struct obd_device * obddev)
        struct super_block *sb;
        struct mds_obd *mds = &obddev->u.mds;

        if ( !(obddev->obd_flags & OBD_SET_UP) ) {

        if ( !list_empty(&obddev->obd_gen_clients) ) {
                printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");

        mds_stop_srv_thread(mds);

        if (!list_empty(&mds->mds_reqs)) {
                // XXX reply with errors and clean up
                CDEBUG(D_INODE, "Request list not empty!\n");

        /* drop the hidden mount taken in mds_setup() */
        mntput(mds->mds_vfsmnt);
        kfree(mds->mds_fstype);
/* use obd ops to offer management infrastructure */
/* obd method table (GNU-style designated initializers); the o_setup
 * entry is elided from this excerpt. */
static struct obd_ops mds_obd_ops = {
        o_cleanup: mds_cleanup,
/* Module init: register the MDS obd type with the obd class layer. */
static int __init mds_init(void)
        obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
/* Module exit: unregister the MDS obd type. */
static void __exit mds_exit(void)
        obd_unregister_type(LUSTRE_MDS_NAME);
659 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
660 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
661 MODULE_LICENSE("GPL");
664 // for testing (maybe this stays)
665 EXPORT_SYMBOL(mds_queue_req);
667 module_init(mds_init);
668 module_exit(mds_exit);