4 * Lustre Metadata Server (mds) request handler
6 * Copyright (C) 2001 Cluster File Systems, Inc.
8 * This code is issued under the GNU General Public License.
9 * See the file COPYING in this distribution
11 * by Peter Braam <braam@clusterfs.com>
13 * This server is single threaded at present (but can easily be multi threaded).
20 #include <linux/version.h>
21 #include <linux/module.h>
23 #include <linux/stat.h>
24 #include <linux/locks.h>
25 #include <linux/ext2_fs.h>
26 #include <linux/quotaops.h>
27 #include <asm/unistd.h>
28 #include <asm/uaccess.h>
29 #include <linux/obd_support.h>
30 #include <linux/obd.h>
31 #include <linux/lustre_lib.h>
32 #include <linux/lustre_idl.h>
33 #include <linux/lustre_mds.h>
34 #include <linux/obd_class.h>
/* The single, node-local MDS instance served by the request thread.
 * NOTE(review): never assigned in the visible code -- presumably set
 * during device setup; TODO confirm against the full file. */
static struct mds_obd *MDS;
39 // XXX make this networked!
40 static int mds_queue_req(struct mds_request *req)
42 struct mds_request *srv_req;
49 srv_req = kmalloc(sizeof(*srv_req), GFP_KERNEL);
55 printk("---> MDS at %d %p, incoming req %p, srv_req %p\n",
56 __LINE__, MDS, req, srv_req);
58 memset(srv_req, 0, sizeof(*req));
60 /* move the request buffer */
61 srv_req->rq_reqbuf = req->rq_reqbuf;
62 srv_req->rq_reqlen = req->rq_reqlen;
63 srv_req->rq_obd = MDS;
65 /* remember where it came from */
66 srv_req->rq_reply_handle = req;
68 list_add(&srv_req->rq_list, &MDS->mds_reqs);
69 wake_up(&MDS->mds_waitq);
73 /* XXX do this over the net */
74 int mds_sendpage(struct mds_request *req, struct file *file,
75 __u64 offset, struct niobuf *dst)
79 rc = generic_file_read(file, (char *)(long)dst->addr,
87 /* XXX replace with networking code */
88 int mds_reply(struct mds_request *req)
90 struct mds_request *clnt_req = req->rq_reply_handle;
94 /* free the request buffer */
95 kfree(req->rq_reqbuf);
96 req->rq_reqbuf = NULL;
98 /* move the reply to the client */
99 clnt_req->rq_replen = req->rq_replen;
100 clnt_req->rq_repbuf = req->rq_repbuf;
101 req->rq_repbuf = NULL;
104 /* wake up the client */
105 wake_up_interruptible(&clnt_req->rq_wait_for_rep);
110 int mds_error(struct mds_request *req)
112 struct mds_rep_hdr *hdr;
115 hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
121 memset(hdr, 0, sizeof(*hdr));
123 hdr->seqno = req->rq_reqhdr->seqno;
124 hdr->status = req->rq_status;
125 hdr->type = MDS_TYPE_ERR;
127 req->rq_repbuf = (char *)hdr;
128 req->rq_replen = sizeof(*hdr);
131 return mds_reply(req);
134 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt)
137 /* iget isn't really right if the inode is currently unallocated!!
138 * This should really all be done inside each filesystem
140 * ext2fs' read_inode has been strengthed to return a bad_inode if the inode
143 * Currently we don't know the generation for parent directory, so a generation
144 * of 0 means "accept any"
146 struct super_block *sb = mds->mds_sb;
147 unsigned long ino = fid->id;
148 //__u32 generation = fid->generation;
149 __u32 generation = 0;
151 struct list_head *lp;
152 struct dentry *result;
155 *mnt = mntget(mds->mds_vfsmnt);
159 return ERR_PTR(-ESTALE);
160 inode = iget(sb, ino);
162 return ERR_PTR(-ENOMEM);
164 printk("--> mds_fid2dentry: sb %p\n", inode->i_sb);
166 if (is_bad_inode(inode)
167 || (generation && inode->i_generation != generation)
169 /* we didn't find the right inode.. */
170 printk("mds_fid2dentry: Inode %lu, Bad count: %d %d or version %u %u\n",
172 inode->i_nlink, atomic_read(&inode->i_count),
177 return ERR_PTR(-ESTALE);
179 /* now to find a dentry.
180 * If possible, get a well-connected one
182 spin_lock(&dcache_lock);
183 for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
184 result = list_entry(lp,struct dentry, d_alias);
185 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
187 result->d_vfs_flags |= DCACHE_REFERENCED;
188 spin_unlock(&dcache_lock);
193 spin_unlock(&dcache_lock);
194 result = d_alloc_root(inode);
195 if (result == NULL) {
197 return ERR_PTR(-ENOMEM);
199 result->d_flags |= DCACHE_NFSD_DISCONNECTED;
203 int mds_getattr(struct mds_request *req)
205 struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req->fid1,
211 rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
212 &req->rq_replen, &req->rq_repbuf);
215 printk("mds: out of memory\n");
216 req->rq_status = -ENOMEM;
220 req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
225 req->rq_rephdr->status = -ENOENT;
230 rep->ino = inode->i_ino;
231 rep->atime = inode->i_atime;
232 rep->ctime = inode->i_ctime;
233 rep->mtime = inode->i_mtime;
234 rep->uid = inode->i_uid;
235 rep->gid = inode->i_gid;
236 rep->size = inode->i_size;
237 rep->mode = inode->i_mode;
244 int mds_readpage(struct mds_request *req)
246 struct vfsmount *mnt;
247 struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req->fid1,
250 struct niobuf *niobuf;
254 printk("mds_readpage: ino %ld\n", de->d_inode->i_ino);
255 rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
256 &req->rq_replen, &req->rq_repbuf);
259 printk("mds: out of memory\n");
260 req->rq_status = -ENOMEM;
264 req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
269 req->rq_rephdr->status = PTR_ERR(de);
273 file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
274 /* note: in case of an error, dentry_open puts dentry */
277 req->rq_rephdr->status = PTR_ERR(file);
281 niobuf = mds_req_tgt(req->rq_req);
283 /* to make this asynchronous make sure that the handling function
284 doesn't send a reply when this function completes. Instead a
285 callback function would send the reply */
286 rc = mds_sendpage(req, file, req->rq_req->size, niobuf);
289 req->rq_rephdr->status = rc;
294 int mds_reint(struct mds_request *req)
296 int opc = NTOH__u32(req->rq_req->opcode);
300 return mds_reint_setattr(req);
302 printk(__FUNCTION__ "opcode %d not handled.\n", opc);
309 //int mds_handle(struct mds_conn *conn, int len, char *buf)
310 int mds_handle(struct mds_request *req)
313 struct mds_req_hdr *hdr;
317 hdr = (struct mds_req_hdr *)req->rq_reqbuf;
319 if (NTOH__u32(hdr->type) != MDS_TYPE_REQ) {
320 printk("lustre_mds: wrong packet type sent %d\n",
321 NTOH__u32(hdr->type));
326 rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen,
327 &req->rq_reqhdr, &req->rq_req);
329 printk("lustre_mds: Invalid request\n");
334 switch (req->rq_reqhdr->opc) {
337 CDEBUG(D_INODE, "getattr\n");
338 rc = mds_getattr(req);
342 CDEBUG(D_INODE, "readpage\n");
343 rc = mds_readpage(req);
347 CDEBUG(D_INODE, "reint\n");
352 return mds_error(req);
357 printk("mds: processing error %d\n", rc);
360 CDEBUG(D_INODE, "sending reply\n");
/* interval-timer callback: kick the MDS service thread awake so it
 * re-checks its request list */
static void mds_timer_run(unsigned long __data)
{
        struct task_struct *thread = (struct task_struct *)__data;

        wake_up_process(thread);
}
375 int mds_main(void *arg)
377 struct mds_obd *mds = (struct mds_obd *) arg;
378 struct timer_list timer;
382 spin_lock_irq(¤t->sigmask_lock);
383 sigfillset(¤t->blocked);
384 recalc_sigpending(current);
385 spin_unlock_irq(¤t->sigmask_lock);
387 sprintf(current->comm, "lustre_mds");
389 /* Set up an interval timer which can be used to trigger a
390 wakeup after the interval expires */
392 timer.data = (unsigned long) current;
393 timer.function = mds_timer_run;
394 mds->mds_timer = &timer;
396 /* Record that the thread is running */
397 mds->mds_thread = current;
398 wake_up(&mds->mds_done_waitq);
400 printk(KERN_INFO "lustre_mds starting. Commit interval %d seconds\n",
401 mds->mds_interval / HZ);
403 /* XXX maintain a list of all managed devices: insert here */
405 /* And now, wait forever for commit wakeup events. */
407 struct mds_request *request;
410 if (mds->mds_flags & MDS_UNMOUNT)
414 wake_up(&mds->mds_done_waitq);
415 interruptible_sleep_on(&mds->mds_waitq);
417 CDEBUG(D_INODE, "lustre_mds wakes\n");
418 CDEBUG(D_INODE, "pick up req here and continue\n");
420 if (list_empty(&mds->mds_reqs)) {
421 CDEBUG(D_INODE, "woke because of timer\n");
423 request = list_entry(mds->mds_reqs.next,
424 struct mds_request, rq_list);
425 list_del(&request->rq_list);
426 rc = mds_handle(request);
430 del_timer_sync(mds->mds_timer);
432 /* XXX maintain a list of all managed devices: cleanup here */
434 mds->mds_thread = NULL;
435 wake_up(&mds->mds_done_waitq);
436 printk("lustre_mds: exiting\n");
440 static void mds_stop_srv_thread(struct mds_obd *mds)
442 mds->mds_flags |= MDS_UNMOUNT;
444 while (mds->mds_thread) {
445 wake_up(&mds->mds_waitq);
446 sleep_on(&mds->mds_done_waitq);
450 static void mds_start_srv_thread(struct mds_obd *mds)
452 init_waitqueue_head(&mds->mds_waitq);
453 init_waitqueue_head(&mds->mds_done_waitq);
454 kernel_thread(mds_main, (void *)mds,
455 CLONE_VM | CLONE_FS | CLONE_FILES);
456 while (!mds->mds_thread)
457 sleep_on(&mds->mds_done_waitq);
460 /* mount the file system (secretly) */
461 static int mds_setup(struct obd_device *obddev, obd_count len,
465 struct obd_ioctl_data* data = buf;
466 struct mds_obd *mds = &obddev->u.mds;
467 struct vfsmount *mnt;
471 mnt = do_kern_mount(data->ioc_inlbuf2, 0,
472 data->ioc_inlbuf1, NULL);
479 mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
480 if (!obddev->u.mds.mds_sb) {
485 INIT_LIST_HEAD(&mds->mds_reqs);
486 mds->mds_thread = NULL;
488 mds->mds_interval = 3 * HZ;
489 mds->mds_vfsmnt = mnt;
490 obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2);
492 mds->mds_ctxt.pwdmnt = mnt;
493 mds->mds_ctxt.pwd = mnt->mnt_root;
494 mds->mds_ctxt.fs = KERNEL_DS;
497 spin_lock_init(&obddev->u.mds.mds_lock);
499 mds_start_srv_thread(mds);
506 static int mds_cleanup(struct obd_device * obddev)
508 struct super_block *sb;
509 struct mds_obd *mds = &obddev->u.mds;
513 if ( !(obddev->obd_flags & OBD_SET_UP) ) {
518 if ( !list_empty(&obddev->obd_gen_clients) ) {
519 printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");
525 mds_stop_srv_thread(mds);
532 if (!list_empty(&mds->mds_reqs)) {
533 // XXX reply with errors and clean up
534 CDEBUG(D_INODE, "Request list not empty!\n");
538 mntput(mds->mds_vfsmnt);
540 kfree(mds->mds_fstype);
549 /* use obd ops to offer management infrastructure */
550 static struct obd_ops mds_obd_ops = {
552 o_cleanup: mds_cleanup,
555 static int __init mds_init(void)
557 obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
561 static void __exit mds_exit(void)
563 obd_unregister_type(LUSTRE_MDS_NAME);
566 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
567 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
568 MODULE_LICENSE("GPL");
571 // for testing (maybe this stays)
572 EXPORT_SYMBOL(mds_queue_req);
574 module_init(mds_init);
575 module_exit(mds_exit);