1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * Lustre Metadata Server (mds) request handler
8 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
10 * This code is issued under the GNU General Public License.
11 * See the file COPYING in this distribution
13 * by Peter Braam <braam@clusterfs.com>
15 * This server is single threaded at present (but can easily be multi threaded)
21 #include <linux/version.h>
22 #include <linux/module.h>
24 #include <linux/stat.h>
25 #include <linux/locks.h>
26 #include <linux/quotaops.h>
27 #include <asm/unistd.h>
28 #include <asm/uaccess.h>
30 #define DEBUG_SUBSYSTEM S_MDS
32 #include <linux/lustre_mds.h>
33 #include <linux/lustre_lib.h>
34 #include <linux/lustre_net.h>
// mds_sendpage: read one page from `file` into a temporary buffer and ship it
// to the client as a bulk transfer on MDS_BULK_PORTAL.
//
//   req    - request being served; provides the connection for the bulk
//            descriptor, the xid copied into it, and the mds device used for
//            the read.
//   file   - open file the page is read from.
//   offset - NOTE(review): presumably the file position handed to
//            mds_fs_readpage; confirm against the full call.
//   dst    - NOTE(review): presumably describes the client-side buffer the
//            bulk descriptor targets; confirm.
//
// Failure paths GOTO cleanup_buf or cleanup_bulk with rc. The page buffer is
// released with OBD_FREE and the descriptor with ptlrpc_free_bulk.
36 int mds_sendpage(struct ptlrpc_request *req, struct file *file,
37 __u64 offset, struct niobuf *dst)
// Remember the current address limit.
// NOTE(review): presumably restored once the read is done; confirm.
40 mm_segment_t oldfs = get_fs();
41 struct ptlrpc_bulk_desc *bulk;
44 bulk = ptlrpc_prep_bulk(req->rq_connection);
// Tag the bulk transfer with the xid of the incoming request.
50 bulk->b_xid = req->rq_reqmsg->xid;
// One page of scratch space for the data that will be sent.
52 OBD_ALLOC(buf, PAGE_SIZE);
55 GOTO(cleanup_bulk, rc);
59 rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
// Anything other than a full PAGE_SIZE read takes the failure path.
63 if (rc != PAGE_SIZE) {
65 GOTO(cleanup_buf, rc);
69 bulk->b_buflen = PAGE_SIZE;
71 rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL);
// Fault-injection hook: when OBD_FAIL_MDS_SENDPAGE fires, log it, unlink the
// bulk memory descriptor and bail out.
// NOTE(review): this condition does not test rc from ptlrpc_send_bulk;
// confirm a real send failure is handled.
72 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
73 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
74 OBD_FAIL_MDS_SENDPAGE, rc);
75 PtlMDUnlink(bulk->b_md_h);
76 GOTO(cleanup_buf, rc);
// Sleep (interruptibly) on the descriptor's wait queue until
// ptlrpc_check_bulk_sent() is true for it.
78 wait_event_interruptible(bulk->b_waitq,
79 ptlrpc_check_bulk_sent(bulk));
// PTL_RPC_FL_INTR set on the descriptor is handled as a failure path.
81 if (bulk->b_flags & PTL_RPC_FL_INTR) {
83 GOTO(cleanup_buf, rc);
88 OBD_FREE(buf, PAGE_SIZE);
90 ptlrpc_free_bulk(bulk);
// mds_fid2dentry: map a fid (inode number + generation) to a dentry on the
// MDS backing filesystem.
//
//   mds - MDS device; supplies the superblock and the vfsmount.
//   fid - fid->id is used as the inode number and fid->generation as the
//         expected inode generation; a generation of 0 skips that check.
//   mnt - out parameter that receives mds->mds_vfsmnt.
//         NOTE(review): mds_getattr passes NULL here; confirm the store
//         is guarded against a NULL mnt.
//
// Returns a dentry, or ERR_PTR(-ESTALE) / ERR_PTR(-ENOMEM) on failure.
95 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
96 struct vfsmount **mnt)
99 struct super_block *sb = mds->mds_sb;
100 unsigned long ino = fid->id;
101 __u32 generation = fid->generation;
103 struct list_head *lp;
104 struct dentry *result;
107 return ERR_PTR(-ESTALE);
109 inode = iget(sb, ino);
111 return ERR_PTR(-ENOMEM);
113 CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
// Reject bad inodes and, when a generation was supplied, generation
// mismatches; both are reported as -ESTALE.
115 if (is_bad_inode(inode) ||
116 (generation && inode->i_generation != generation)) {
117 /* we didn't find the right inode.. */
118 CERROR("bad inode %lu, link: %d ct: %d or version %u/%u\n",
120 inode->i_nlink, atomic_read(&inode->i_count),
125 return ERR_PTR(-ESTALE);
128 /* now to find a dentry.
129 * If possible, get a well-connected one
132 *mnt = mds->mds_vfsmnt;
// Walk the inode's alias list under dcache_lock, looking for a dentry that is
// not flagged DCACHE_NFSD_DISCONNECTED; such a dentry is marked referenced.
133 spin_lock(&dcache_lock);
134 for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
135 result = list_entry(lp,struct dentry, d_alias);
136 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
138 result->d_vfs_flags |= DCACHE_REFERENCED;
139 spin_unlock(&dcache_lock);
146 spin_unlock(&dcache_lock);
// Fallback path after the alias walk: allocate a fresh dentry with
// d_alloc_root() and flag it as disconnected.
147 result = d_alloc_root(inode);
148 if (result == NULL) {
150 return ERR_PTR(-ENOMEM);
154 result->d_flags |= DCACHE_NFSD_DISCONNECTED;
// mds_connect: answer a connect request by returning the MDS root fid.
//
// Packs a one-buffer reply sized for struct mds_body. If packing fails, or
// the OBD_FAIL_MDS_CONNECT_PACK fault hook fires, the error is logged and
// req->rq_status is set to -ENOMEM.
158 int mds_connect(struct ptlrpc_request *req)
160 struct mds_body *body;
161 struct mds_obd *mds = &req->rq_obd->u.mds;
162 int rc, size = sizeof(*body);
165 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
166 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CONNECT_PACK)) {
167 CERROR("mds: out of memory\n");
168 req->rq_status = -ENOMEM;
// NOTE(review): this request-side body pointer is overwritten below without
// an intervening read; confirm whether the assignment is still needed.
172 body = lustre_msg_buf(req->rq_reqmsg, 0);
173 mds_unpack_req_body(req);
174 /* Anything we need to do here with the client's trans no or so? */
// Switch to the reply body, copy the root fid into fid1, then run
// mds_pack_rep_body() on the request.
176 body = lustre_msg_buf(req->rq_repmsg, 0);
177 memcpy(&body->fid1, &mds->mds_rootfid , sizeof(body->fid1));
178 mds_pack_rep_body(req);
// mds_getattr: look up the object named by fid1 in the request body and copy
// its inode attributes into the reply body.
//
// Packs a one-buffer reply sized for struct mds_body; a pack failure (or the
// OBD_FAIL_MDS_GETATTR_PACK fault hook) sets req->rq_status to -ENOMEM.
// The lookup-failure path sets req->rq_status to -ENOENT.
182 int mds_getattr(struct ptlrpc_request *req)
186 struct mds_body *body;
187 struct mds_obd *mds = &req->rq_obd->u.mds;
188 int rc, size = sizeof(*body);
191 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
192 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
193 CERROR("mds: out of memory\n");
194 req->rq_status = -ENOMEM;
// The vfsmount is not needed here, so NULL is passed for mnt.
198 body = lustre_msg_buf(req->rq_reqmsg, 0);
199 de = mds_fid2dentry(mds, &body->fid1, NULL);
201 req->rq_status = -ENOENT;
// From here on `body` is the reply body.
// NOTE(review): `inode` is presumably de->d_inode; confirm.
205 body = lustre_msg_buf(req->rq_repmsg, 0);
207 body->ino = inode->i_ino;
208 body->generation = inode->i_generation;
209 body->atime = inode->i_atime;
210 body->ctime = inode->i_ctime;
211 body->mtime = inode->i_mtime;
212 body->uid = inode->i_uid;
213 body->gid = inode->i_gid;
214 body->size = inode->i_size;
215 body->mode = inode->i_mode;
216 body->nlink = inode->i_nlink;
// objid is filled in by the filesystem-specific helper.
218 mds_fs_get_objid(mds, inode, &body->objid);
// mds_open: open the object named by fid1 in the request body and hand a
// handle for it back to the client in the reply body's objid.
//
// req->rq_status is set to -ENOMEM when the reply cannot be packed (or the
// OBD_FAIL_MDS_OPEN_PACK fault hook fires), to -ENOENT on the lookup-failure
// path, and to -EINVAL when dentry_open() yields NULL or an error pointer.
223 int mds_open(struct ptlrpc_request *req)
226 struct mds_body *body;
228 struct vfsmount *mnt;
230 int rc, size = sizeof(*body);
233 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
234 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
235 CERROR("mds: out of memory\n");
236 req->rq_status = -ENOMEM;
240 body = lustre_msg_buf(req->rq_reqmsg, 0);
241 de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
243 req->rq_status = -ENOENT;
// NOTE(review): the origin of `flags` is not evident here; confirm it is
// taken from the request and sanitised.
247 file = dentry_open(de, mnt, flags);
248 if (!file || IS_ERR(file)) {
249 req->rq_status = -EINVAL;
// The reply's objid carries the raw struct file pointer widened to 64 bits.
// NOTE(review): this exposes a kernel address to the client, and mds_close
// casts the client-supplied value straight back into a pointer; consider an
// opaque, validated handle instead.
253 body = lustre_msg_buf(req->rq_repmsg, 0);
254 body->objid = (__u64) (unsigned long)file;
// mds_close: close the file whose handle the client sends back in objid.
//
// The reply carries no buffers (lustre_pack_msg is called with a count of 0).
// req->rq_status is -ENOMEM on pack failure (or the OBD_FAIL_MDS_CLOSE_PACK
// fault hook), -ENOENT on the lookup-failure path, and otherwise the return
// value of filp_close().
258 int mds_close(struct ptlrpc_request *req)
261 struct mds_body *body;
263 struct vfsmount *mnt;
267 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
268 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
269 CERROR("mds: out of memory\n");
270 req->rq_status = -ENOMEM;
// NOTE(review): confirm that whatever reference mds_fid2dentry returns is
// dropped again on every path of this function.
274 body = lustre_msg_buf(req->rq_reqmsg, 0);
275 de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
277 req->rq_status = -ENOENT;
// NOTE(review): objid comes from the client and is cast directly to a
// struct file pointer with no validation before filp_close(); a bogus
// value would be dereferenced in kernel context.
281 file = (struct file *)(unsigned long)body->objid;
282 req->rq_status = filp_close(file, 0);
// mds_readpage: open the object named by fid1 read-only and send one page of
// it to the client through mds_sendpage().
//
// Request buffer 0 is the mds_body (fid1, and size which is passed on as the
// offset argument of mds_sendpage); request buffer 1 is the niobuf passed as
// its destination argument.
//
// req->rq_status is -ENOMEM on pack failure (or the
// OBD_FAIL_MDS_READPAGE_PACK fault hook), PTR_ERR(de) / PTR_ERR(file) on the
// lookup and open failure paths, and -EINVAL on the niobuf failure path.
289 int mds_readpage(struct ptlrpc_request *req)
291 struct vfsmount *mnt;
294 struct niobuf *niobuf;
295 struct mds_body *body;
296 int rc, size = sizeof(*body);
299 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
300 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
301 CERROR("mds: out of memory\n");
302 req->rq_status = -ENOMEM;
306 body = lustre_msg_buf(req->rq_reqmsg, 0);
307 de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
309 req->rq_status = PTR_ERR(de);
313 CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
315 file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
316 /* note: in case of an error, dentry_open puts dentry */
318 req->rq_status = PTR_ERR(file);
// Second request buffer: the niobuf handed to mds_sendpage below.
322 niobuf = lustre_msg_buf(req->rq_reqmsg, 1);
324 req->rq_status = -EINVAL;
329 /* to make this asynchronous make sure that the handling function
330 doesn't send a reply when this function completes. Instead a
331 callback function would send the reply */
332 rc = mds_sendpage(req, file, body->size, niobuf);
// mds_reint: unpack an update record from the request and pass it to
// mds_reint_rec().
//
// If unpacking fails, or the OBD_FAIL_MDS_REINT_UNPACK fault hook fires, the
// error is logged and req->rq_status is set to -EINVAL.
339 int mds_reint(struct ptlrpc_request *req)
342 struct mds_update_record rec;
344 rc = mds_update_unpack(req, &rec);
345 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
346 CERROR("invalid record\n");
347 req->rq_status = -EINVAL;
350 /* rc will be used to interrupt a for loop over multiple records */
351 rc = mds_reint_rec(&rec, req);
// mds_handle: service entry point for MDS requests.
//
// Unpacks the request message, rejects anything that is not a
// PTL_RPC_MSG_REQUEST with -EINVAL, dispatches on the opcode to the
// connect / getattr / readpage / reint / open / close handlers, and finishes
// with ptlrpc_error() or ptlrpc_reply().
//
//   dev - obd device the request arrived on.
//   svc - service used for sending the reply or error.
//   req - the request being handled.
//
// Each opcode branch starts with an OBD_FAIL_RETURN(..., 0) fault hook.
// NOTE(review): presumably this returns 0 without replying when the fault is
// armed; confirm against the macro definition.
355 int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
356 struct ptlrpc_request *req)
361 rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
362 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
363 CERROR("lustre_mds: Invalid request\n");
367 if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
368 CERROR("lustre_mds: wrong packet type sent %d\n",
369 req->rq_reqmsg->type);
370 GOTO(out, rc = -EINVAL);
373 switch (req->rq_reqmsg->opc) {
// NOTE(review): the message below says "getattr", but the statements that
// follow are the connect path (OBD_FAIL_MDS_CONNECT_NET, mds_connect); this
// looks like a copy-paste slip in the debug string.
375 CDEBUG(D_INODE, "getattr\n");
376 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
377 rc = mds_connect(req);
381 CDEBUG(D_INODE, "getattr\n");
382 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
383 rc = mds_getattr(req);
387 CDEBUG(D_INODE, "readpage\n");
388 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
389 rc = mds_readpage(req);
// The same fault hook that mds_sendpage tests is re-checked after readpage.
391 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
396 CDEBUG(D_INODE, "reint\n");
397 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
402 CDEBUG(D_INODE, "open\n");
403 OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
408 CDEBUG(D_INODE, "close\n");
409 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
// NOTE(review): presumably the branch for unrecognised opcodes; confirm.
414 rc = ptlrpc_error(svc, req);
// Final step: either an error goes back via ptlrpc_error() or the packed
// reply is sent via ptlrpc_reply().
421 ptlrpc_error(svc, req);
423 CDEBUG(D_NET, "sending reply\n");
424 ptlrpc_reply(svc, req);
// Name of the last_rcvd file, used in the log messages of mds_prep.
430 #define LAST_RCVD "last_rcvd"
// mds_prep: prepare the on-disk state of the MDS and start its service.
//
// Working inside the MDS filesystem context (push_ctxt on mds->mds_ctxt), it
//   1. creates the ROOT directory if needed and records its inode number and
//      generation in mds->mds_rootfid,
//   2. creates the FH directory if needed,
//   3. reads, increments and rewrites the mount_count file,
//   4. opens last_rcvd and keeps the file in mds->mds_rcvd_filp,
//   5. installs a private copy of the superblock method table whose
//      delete_inode is replaced by the MDS one,
//   6. creates the ptlrpc service and starts a "lustre_mds" thread.
//
// Failures unwind through the err_* labels.
432 static int mds_prep(struct obd_device *obddev)
434 struct obd_run_ctxt saved;
435 struct mds_obd *mds = &obddev->u.mds;
436 struct super_operations *s_ops;
442 push_ctxt(&saved, &mds->mds_ctxt);
// -EEXIST is fine: the directory survives from a previous run.
443 rc = simple_mkdir(current->fs->pwd, "ROOT", 0700);
444 if (rc && rc != -EEXIST) {
445 CERROR("cannot create ROOT directory: rc = %d\n", rc);
448 f = filp_open("ROOT", O_RDONLY, 0);
451 CERROR("cannot open ROOT: rc = %d\n", rc);
// The root fid is the inode number and generation of ROOT, typed S_IFDIR.
456 mds->mds_rootfid.id = f->f_dentry->d_inode->i_ino;
457 mds->mds_rootfid.generation = f->f_dentry->d_inode->i_generation;
458 mds->mds_rootfid.f_type = S_IFDIR;
460 rc = filp_close(f, 0);
462 CERROR("cannot close ROOT: rc = %d\n", rc);
466 rc = simple_mkdir(current->fs->pwd, "FH", 0700);
467 if (rc && rc != -EEXIST) {
468 CERROR("cannot create FH directory: rc = %d\n", rc);
472 f = filp_open("mount_count", O_RDWR | O_CREAT, 0644);
// NOTE(review): rc is logged here before the GOTO below assigns PTR_ERR(f),
// so the value printed may be stale.
475 CERROR("cannot open/create mount_count file, rc = %d\n", rc);
476 GOTO(err_pop, rc = PTR_ERR(f));
// mount_count is stored little-endian on disk; an empty or short read
// resets the in-memory counter to 0 before it is incremented.
478 rc = lustre_fread(f, (char *)&mount_count, sizeof(mount_count), &off);
480 CERROR("empty MDS mount_count, new MDS?\n");
481 /* XXX maybe this should just be a random number? */
482 mds->mds_mount_count = 0;
483 } else if (rc != sizeof(mount_count)) {
484 CERROR("error reading mount_count: rc = %d\n", rc);
485 /* XXX maybe this should just be a random number? */
486 mds->mds_mount_count = 0;
488 mds->mds_mount_count = le64_to_cpu(mount_count);
491 mds->mds_mount_count++;
492 CDEBUG(D_SUPER, "MDS mount_count is %Ld\n",
493 (unsigned long long)mds->mds_mount_count);
// Write the incremented counter back; a short write is only logged.
495 mount_count = cpu_to_le64(mds->mds_mount_count);
496 rc = lustre_fwrite(f, (char *)&mount_count, sizeof(mount_count), &off);
497 if (rc != sizeof(mount_count))
498 CERROR("error writing mount_count: rc = %d\n", rc);
499 rc = filp_close(f, 0);
501 CERROR("error closing mount_count: rc = %d\n", rc);
// NOTE(review): the literal "last_rcvd" duplicates the LAST_RCVD macro that
// the error messages use; consider opening LAST_RCVD directly.
503 f = filp_open("last_rcvd", O_RDWR | O_CREAT, 0644);
506 CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc);
509 mds->mds_rcvd_filp = f;
513 * Replace the client filesystem delete_inode method with our own,
514 * so that we can clear the object ID before the inode is deleted.
515 * The fs_delete_inode method will call cl_delete_inode for us.
517 * We need to do this for the MDS superblock only, hence we install
518 * a modified copy of the original superblock method table.
520 * We still assume that there is only a single MDS client filesystem
521 * type, as we don't have access to the mds struct in delete_inode
522 * and store the client delete_inode method in a global table. This
523 * will only become a problem when multiple MDSs are running on a
524 * single host with different client filesystems.
526 OBD_ALLOC(s_ops, sizeof(*s_ops));
528 GOTO(err_filp, rc = -ENOMEM);
// Copy the table, stash the original delete_inode in the fsops table, swap
// in the MDS version, then point the superblock at the copy.
// NOTE(review): confirm that later failure paths free s_ops and restore the
// superblock's original s_op.
530 memcpy(s_ops, mds->mds_sb->s_op, sizeof(*s_ops));
531 mds->mds_fsops->cl_delete_inode = s_ops->delete_inode;
532 s_ops->delete_inode = mds->mds_fsops->fs_delete_inode;
533 mds->mds_sb->s_op = s_ops;
535 mds->mds_service = ptlrpc_init_svc(128 * 1024,
536 MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
539 if (!mds->mds_service) {
540 CERROR("failed to start service\n");
541 GOTO(err_filp, rc = -EINVAL);
544 rc = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
546 CERROR("cannot start thread: rc = %d\n", rc);
// Error unwinding: tear down the service, then close last_rcvd.
553 rpc_unregister_service(mds->mds_service);
554 OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
556 if (filp_close(f, 0))
557 CERROR("can't close %s after error\n", LAST_RCVD);
// mds_setup: mount the MDS backing filesystem and run mds_prep() on it.
//
//   obddev - device being set up; its u.mds member is filled in.
//   len    - NOTE(review): presumably the size of buf; its use is not
//            evident from the statements below.
//   buf    - struct obd_ioctl_data; ioc_inlbuf2 names the filesystem type
//            and ioc_inlbuf1 is passed as the third argument of
//            do_kern_mount(). Both must be non-NULL.
//
// Only "ext3" and "ext2" are accepted as filesystem types; anything else is
// logged and fails with -EPERM. A mount without a superblock fails with
// -ENODEV.
564 /* mount the file system (secretly) */
565 static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
567 struct obd_ioctl_data* data = buf;
568 struct mds_obd *mds = &obddev->u.mds;
569 struct vfsmount *mnt;
573 #ifdef CONFIG_DEV_RDONLY
576 if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
// Private copy of the type name; released with kfree() on the error path
// here and in mds_cleanup.
// NOTE(review): confirm the strdup result is checked for NULL before the
// strcmp calls.
579 mds->mds_fstype = strdup(data->ioc_inlbuf2);
// Select the filesystem-specific operations table.
581 if (!strcmp(mds->mds_fstype, "ext3"))
582 mds->mds_fsops = &mds_ext3_fs_ops;
583 else if (!strcmp(mds->mds_fstype, "ext2"))
584 mds->mds_fsops = &mds_ext2_fs_ops;
586 CERROR("unsupported MDS filesystem type %s\n", mds->mds_fstype);
587 GOTO(err_kfree, rc = -EPERM);
591 mnt = do_kern_mount(mds->mds_fstype, 0, data->ioc_inlbuf1, NULL);
594 CERROR("do_kern_mount failed: rc = %d\n", rc);
// The superblock is taken from the inode of the mount's root dentry.
598 mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
600 GOTO(err_put, rc = -ENODEV);
// Context that mds_prep enters with push_ctxt: the mount, its root dentry
// as working directory, and KERNEL_DS as the address limit.
602 mds->mds_vfsmnt = mnt;
603 mds->mds_ctxt.pwdmnt = mnt;
604 mds->mds_ctxt.pwd = mnt->mnt_root;
605 mds->mds_ctxt.fs = KERNEL_DS;
607 rc = mds_prep(obddev);
// Error unwinding: drop the mount, then free the type name.
615 mntput(mds->mds_vfsmnt);
621 kfree(mds->mds_fstype);
// mds_cleanup: tear down what mds_setup and mds_prep created.
//
// Logs an error if the device still has entries on obd_gen_clients. Then it
// stops the service thread, unregisters and frees the service, closes the
// last_rcvd file, drops the mount, frees the filesystem type name and frees
// the private super_operations copy.
625 static int mds_cleanup(struct obd_device * obddev)
// NOTE(review): s_ops starts as NULL; confirm it is assigned (presumably
// from the superblock's s_op) before the OBD_FREE at the end.
627 struct super_operations *s_ops = NULL;
628 struct super_block *sb;
629 struct mds_obd *mds = &obddev->u.mds;
633 if ( !list_empty(&obddev->obd_gen_clients) ) {
634 CERROR("still has clients!\n");
638 ptlrpc_stop_thread(mds->mds_service);
639 rpc_unregister_service(mds->mds_service);
// Requests still queued on the service are only reported, not answered,
// before the service structure is freed.
640 if (!list_empty(&mds->mds_service->srv_reqs)) {
641 // XXX reply with errors and clean up
642 CERROR("Request list not empty!\n");
644 OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
// Clear the pointer right after closing so it cannot be reused.
650 if (mds->mds_rcvd_filp) {
651 int rc = filp_close(mds->mds_rcvd_filp, 0);
652 mds->mds_rcvd_filp = NULL;
655 CERROR("last_rcvd file won't close, rc=%d\n", rc);
660 mntput(mds->mds_vfsmnt);
662 kfree(mds->mds_fstype);
664 #ifdef CONFIG_DEV_RDONLY
667 OBD_FREE(s_ops, sizeof(*s_ops));
// Method table that mds_init() registers for the MDS obd type. It is written
// with the older GCC "field: value" initializer syntax.
673 /* use obd ops to offer management infrastructure */
674 static struct obd_ops mds_obd_ops = {
676 o_cleanup: mds_cleanup,
// mds_init: module entry point; registers mds_obd_ops under LUSTRE_MDS_NAME.
// NOTE(review): confirm the result of obd_register_type() is propagated to
// the caller.
679 static int __init mds_init(void)
681 obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
// mds_exit: module exit point; unregisters the LUSTRE_MDS_NAME obd type.
685 static void __exit mds_exit(void)
687 obd_unregister_type(LUSTRE_MDS_NAME);
// Module metadata, followed by the registration of mds_init / mds_exit as
// the module's load and unload entry points.
690 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
691 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
692 MODULE_LICENSE("GPL");
694 module_init(mds_init);
695 module_exit(mds_exit);