1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * Lustre Metadata Server (mds) request handler
8 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
10 * This code is issued under the GNU General Public License.
11 * See the file COPYING in this distribution
13 * by Peter Braam <braam@clusterfs.com>
15 * This server is single threaded at present (but can easily be multi threaded)
 */
21 #include <linux/version.h>
22 #include <linux/module.h>
24 #include <linux/stat.h>
25 #include <linux/locks.h>
26 #include <linux/quotaops.h>
27 #include <asm/unistd.h>
28 #include <asm/uaccess.h>
30 #define DEBUG_SUBSYSTEM S_MDS
32 #include <linux/lustre_mds.h>
33 #include <linux/lustre_lib.h>
34 #include <linux/lustre_net.h>
// mds_sendpage - read one page of `file` and deliver it to the requester.
//
// Two delivery paths are visible, selected by req->rq_peer.peer_nid:
//   - peer_nid == 0: mds_fs_readpage() reads PAGE_SIZE bytes straight into
//     the address carried in dst->addr.
//   - the remaining lines (presumably the else branch): a bulk descriptor is
//     prepared for req->rq_peer, a PAGE_SIZE buffer is allocated and filled
//     by mds_fs_readpage(), the descriptor is sent with ptlrpc_send_bulk()
//     on MDS_BULK_PORTAL, and the caller sleeps on bulk->b_waitq until
//     ptlrpc_check_bulk_sent() is true or the wait is interrupted.
//
// Parameters:
//   req    - request being served; supplies the peer, the xid copied into
//            the bulk descriptor, and the mds object given to
//            mds_fs_readpage().
//   file   - open file to read from.
//   offset - not referenced on any line visible here; presumably part of the
//            mds_fs_readpage() argument list, which is cut off - TODO confirm.
//   dst    - niobuf whose addr field is the destination when peer_nid == 0.
//
// Returns: the return statement is not visible here; presumably rc - TODO
// confirm.
36 int mds_sendpage(struct ptlrpc_request *req, struct file *file,
37 __u64 offset, struct niobuf *dst)
// Value of get_fs() on entry; the lines that use oldfs are not visible here.
40 mm_segment_t oldfs = get_fs();
42 if (req->rq_peer.peer_nid == 0) {
43 /* dst->addr is a user address, but in a different task! */
44 char *buf = (char *)(long)dst->addr;
// A full page is requested; any other return value enters the branch below.
47 rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
51 if (rc != PAGE_SIZE) {
57 struct ptlrpc_bulk_desc *bulk;
60 bulk = ptlrpc_prep_bulk(&req->rq_peer);
// Tag the bulk descriptor with the xid of the request it answers.
66 bulk->b_xid = req->rq_xid;
68 OBD_ALLOC(buf, PAGE_SIZE);
// Condition not visible here; presumably the allocation-failure exit.
71 GOTO(cleanup_bulk, rc);
75 rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
// A short read leaves through cleanup_buf.
79 if (rc != PAGE_SIZE) {
81 GOTO(cleanup_buf, rc);
85 bulk->b_buflen = PAGE_SIZE;
// NOTE(review): rc from ptlrpc_send_bulk() is only consulted inside the
// fault-injection branch that follows; confirm that a real send failure
// cannot reach the wait below.
87 rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL);
// Fault-injection hook: log, unlink bulk->b_md_h and leave through
// cleanup_buf.
88 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
89 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
90 OBD_FAIL_MDS_SENDPAGE, rc);
91 PtlMDUnlink(bulk->b_md_h);
92 GOTO(cleanup_buf, rc);
// Sleep until the bulk transfer is reported sent, or a signal arrives.
94 wait_event_interruptible(bulk->b_waitq,
95 ptlrpc_check_bulk_sent(bulk));
// PTL_RPC_INTR presumably marks an interrupted wait - TODO confirm.
97 if (bulk->b_flags == PTL_RPC_INTR) {
99 GOTO(cleanup_buf, rc);
// Release the page buffer, then the descriptor (presumably the cleanup_buf
// and cleanup_bulk targets; the labels are not visible here).
104 OBD_FREE(buf, PAGE_SIZE);
106 OBD_FREE(bulk, sizeof(*bulk));
// mds_fid2dentry - map a fid to a dentry on the MDS filesystem.
//
// The inode is fetched with iget() on mds->mds_sb, using fid->id as the inode
// number. It is rejected with ERR_PTR(-ESTALE) when is_bad_inode() is true, or
// when fid->generation is non-zero and differs from inode->i_generation; a
// zero generation therefore skips the generation check. For an accepted inode
// the alias list (inode->i_dentry) is scanned under dcache_lock for a dentry
// without DCACHE_NFSD_DISCONNECTED. After the scan a dentry is created with
// d_alloc_root() and flagged DCACHE_NFSD_DISCONNECTED.
//
// Parameters:
//   mds - MDS state; supplies the superblock and the vfsmount.
//   fid - identifier to resolve (id and generation are read).
//   mnt - out parameter that receives mds->mds_vfsmnt.
//         NOTE(review): mds_getattr passes NULL here while the store below
//         has no guard on a visible line - confirm a NULL check exists.
//
// Returns: a dentry, or ERR_PTR(-ESTALE) / ERR_PTR(-ENOMEM). The success
// return statements and the inode reference handling on error are not
// visible here.
112 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
113 struct vfsmount **mnt)
115 /* stolen from NFS */
116 struct super_block *sb = mds->mds_sb;
117 unsigned long ino = fid->id;
118 __u32 generation = fid->generation;
120 struct list_head *lp;
121 struct dentry *result;
// Condition not visible here; presumably rejects an invalid inode number -
// TODO confirm.
124 return ERR_PTR(-ESTALE);
126 inode = iget(sb, ino);
// Presumably taken when iget() returns NULL - TODO confirm.
128 return ERR_PTR(-ENOMEM);
130 CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
// Stale-handle check: bad inode, or a non-zero generation that does not match.
132 if (is_bad_inode(inode) ||
133 (generation && inode->i_generation != generation)) {
134 /* we didn't find the right inode.. */
135 CERROR("bad inode %lu, link: %d ct: %d or version %u/%u\n",
137 inode->i_nlink, atomic_read(&inode->i_count),
142 return ERR_PTR(-ESTALE);
145 /* now to find a dentry.
146 * If possible, get a well-connected one
149 *mnt = mds->mds_vfsmnt;
// The alias list is walked with dcache_lock held.
150 spin_lock(&dcache_lock);
151 for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
152 result = list_entry(lp,struct dentry, d_alias);
// Take an alias that is not marked disconnected and mark it referenced; the
// lock is dropped before leaving (the return itself is not visible here).
153 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
155 result->d_vfs_flags |= DCACHE_REFERENCED;
156 spin_unlock(&dcache_lock);
163 spin_unlock(&dcache_lock);
// Reached after the scan: build a new dentry for the inode and mark it
// disconnected.
164 result = d_alloc_root(inode);
165 if (result == NULL) {
167 return ERR_PTR(-ENOMEM);
171 result->d_flags |= DCACHE_NFSD_DISCONNECTED;
// mds_getattr - answer a getattr request with the attributes of one inode.
//
// A reply with a single mds_body buffer is packed, the inode named by fid1 in
// the request body is looked up with mds_fid2dentry(), and the reply body is
// filled from the inode fields. The object id is obtained through
// mds_fs_get_objid().
//
// Parameters:
//   req - request being served; its reply buffer and rq_status are written.
//
// Returns: the return statements are not visible here. Errors seen on visible
// lines are reported through req->rq_status (-ENOMEM, -ENOENT).
175 int mds_getattr(struct ptlrpc_request *req)
179 struct mds_body *body;
180 struct mds_obd *mds = &req->rq_obd->u.mds;
181 int rc, size = sizeof(*body);
// Build the reply: one buffer, sized for an mds_body.
184 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf);
185 req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
// NOTE(review): every pack failure (and the fault-injection hook) is reported
// as -ENOMEM regardless of the value of rc.
186 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
187 CERROR("mds: out of memory\n");
188 req->rq_status = -ENOMEM;
// body points at the request here; it is re-pointed at the reply further down.
192 body = lustre_msg_buf(req->rq_reqmsg, 0);
// No vfsmount is requested (NULL is passed as the mnt argument).
193 de = mds_fid2dentry(mds, &body->fid1, NULL);
// The guarding condition is not visible here; the status is a fixed -ENOENT
// rather than the error carried by de.
195 req->rq_status = -ENOENT;
199 body = lustre_msg_buf(req->rq_repmsg, 0);
// The assignment of inode is not visible here; presumably de->d_inode - TODO
// confirm.
201 body->ino = inode->i_ino;
202 body->generation = inode->i_generation;
203 body->atime = inode->i_atime;
204 body->ctime = inode->i_ctime;
205 body->mtime = inode->i_mtime;
206 body->uid = inode->i_uid;
207 body->gid = inode->i_gid;
208 body->size = inode->i_size;
209 body->mode = inode->i_mode;
210 body->nlink = inode->i_nlink;
// The object id comes from the filesystem-specific helper.
212 mds_fs_get_objid(mds, inode, &body->objid);
// mds_open - open the file named by fid1 in the request and return a handle.
//
// A reply with a single mds_body buffer is packed, the dentry and vfsmount are
// obtained with mds_fid2dentry(), the file is opened with dentry_open(), and
// the resulting struct file pointer is stored in the objid field of the reply.
//
// Parameters:
//   req - request being served; its reply buffer and rq_status are written.
//
// Returns: the return statements are not visible here. Errors seen on visible
// lines are reported through req->rq_status (-ENOMEM, -ENOENT, -EINVAL).
217 int mds_open(struct ptlrpc_request *req)
220 struct mds_body *body;
222 struct vfsmount *mnt;
224 int rc, size = sizeof(*body);
// Build the reply: one buffer, sized for an mds_body.
227 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf);
228 req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
229 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
230 CERROR("mds: out of memory\n");
231 req->rq_status = -ENOMEM;
235 body = lustre_msg_buf(req->rq_reqmsg, 0);
236 de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
// The guarding condition is not visible here; the status is a fixed -ENOENT.
238 req->rq_status = -ENOENT;
// The origin of flags is not visible here - TODO confirm where it is set.
242 file = dentry_open(de, mnt, flags);
// Both a NULL result and an error pointer are reported as -EINVAL.
243 if (!file || IS_ERR(file)) {
244 req->rq_status = -EINVAL;
248 body = lustre_msg_buf(req->rq_repmsg, 0);
// NOTE(review): the handle returned to the client is the raw kernel address
// of the struct file; mds_close casts the client-supplied objid back to a
// pointer. Confirm this is acceptable for the trust model.
249 body->objid = (__u64) (unsigned long)file;
// mds_close - close a file using the handle carried in the request.
//
// An empty reply (zero buffers) is packed, fid1 from the request body is
// looked up with mds_fid2dentry(), and the objid field of the request body is
// cast to a struct file pointer and passed to filp_close(). The result of
// filp_close() becomes req->rq_status.
//
// Parameters:
//   req - request being served; its reply buffer and rq_status are written.
//
// Returns: the return statements are not visible here.
253 int mds_close(struct ptlrpc_request *req)
256 struct mds_body *body;
258 struct vfsmount *mnt;
// The reply carries no buffers, only the message header.
262 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
263 req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
264 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
265 CERROR("mds: out of memory\n");
266 req->rq_status = -ENOMEM;
270 body = lustre_msg_buf(req->rq_reqmsg, 0);
// The release of de and mnt obtained here is not visible - TODO confirm they
// are put on every path.
271 de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
273 req->rq_status = -ENOENT;
// NOTE(review): the client-supplied objid is used as a kernel pointer with no
// validation on any visible line - confirm it cannot be forged or stale.
277 file = (struct file *)(unsigned long)body->objid;
278 req->rq_status = filp_close(file, 0);
// mds_readpage - serve a readpage request.
//
// A reply with a single mds_body buffer is packed, fid1 from the request body
// is resolved with mds_fid2dentry(), the file is opened read-only with
// O_LARGEFILE, and mds_sendpage() is called with the niobuf taken from request
// buffer 1. The size field of the request body is passed as the offset
// argument of mds_sendpage().
//
// Parameters:
//   req - request being served; its reply buffer and rq_status are written.
//
// Returns: the return statements are not visible here. Errors seen on visible
// lines are reported through req->rq_status.
285 int mds_readpage(struct ptlrpc_request *req)
287 struct vfsmount *mnt;
290 struct niobuf *niobuf;
291 struct mds_body *body;
292 int rc, size = sizeof(*body);
// Build the reply: one buffer, sized for an mds_body.
295 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf);
296 req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
297 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
298 CERROR("mds: out of memory\n");
299 req->rq_status = -ENOMEM;
303 body = lustre_msg_buf(req->rq_reqmsg, 0);
304 de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
// Here the error carried by de is passed on as the status.
306 req->rq_status = PTR_ERR(de);
310 CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
312 file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
313 /* note: in case of an error, dentry_open puts dentry */
315 req->rq_status = PTR_ERR(file);
// The second request buffer holds the niobuf describing the destination.
319 niobuf = lustre_msg_buf(req->rq_reqmsg, 1);
// Condition not visible here; presumably a missing niobuf - TODO confirm.
321 req->rq_status = -EINVAL;
326 /* to make this asynchronous make sure that the handling function
327 doesn't send a reply when this function completes. Instead a
328 callback function would send the reply */
// body still refers to the request body at this point, so body->size is a
// value supplied by the client.
329 rc = mds_sendpage(req, file, body->size, niobuf);
// mds_reint - unpack an update record from the request and apply it.
//
// The record is unpacked with mds_update_unpack(); on failure (or when the
// OBD_FAIL_MDS_REINT_UNPACK hook fires) the status is set to -EINVAL.
// Otherwise the record is handed to mds_reint_rec().
//
// Parameters:
//   req - request being served; rq_status is written on unpack failure.
//
// Returns: the return statements are not visible here.
336 int mds_reint(struct ptlrpc_request *req)
339 struct mds_update_record rec;
341 rc = mds_update_unpack(req, &rec);
342 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
343 CERROR("invalid record\n");
344 req->rq_status = -EINVAL;
347 /* rc will be used to interrupt a for loop over multiple records */
348 rc = mds_reint_rec(&rec, req);
// mds_handle - entry point for one incoming MDS request.
//
// The request message is unpacked and its type checked against
// PTL_RPC_REQUEST, then the opcode selects a handler. The case labels are not
// visible here; the debug strings identify getattr, readpage, reint, open and
// close. Each visible case runs an OBD_FAIL_RETURN fault-injection hook before
// its handler. At the end an error is sent with ptlrpc_error() or a reply with
// ptlrpc_reply(); the condition choosing between them is not visible here.
//
// Parameters:
//   dev - not referenced on any line visible here.
//   svc - service used to send the reply or the error.
//   req - request to unpack and dispatch.
//
// Returns: the return statements are not visible here.
352 int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
353 struct ptlrpc_request *req)
358 rc = lustre_unpack_msg(req->rq_reqbuf, req->rq_reqlen);
359 req->rq_reqmsg = (struct lustre_msg *)req->rq_reqbuf;
360 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
361 CERROR("lustre_mds: Invalid request\n");
// Only request-type packets are accepted.
365 if (req->rq_reqmsg->type != PTL_RPC_REQUEST) {
366 CERROR("lustre_mds: wrong packet type sent %d\n",
367 req->rq_reqmsg->type);
368 GOTO(out, rc = -EINVAL);
// Dispatch on the opcode.
371 switch (req->rq_reqmsg->opc) {
373 CDEBUG(D_INODE, "getattr\n");
374 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
375 rc = mds_getattr(req);
379 CDEBUG(D_INODE, "readpage\n");
380 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
381 rc = mds_readpage(req);
// The statement controlled by this check is not visible here.
383 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
388 CDEBUG(D_INODE, "reint\n");
389 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
394 CDEBUG(D_INODE, "open\n");
395 OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
400 CDEBUG(D_INODE, "close\n");
401 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
// Presumably the default case for an unknown opcode - TODO confirm.
406 rc = ptlrpc_error(svc, req);
413 ptlrpc_error(svc, req);
415 CDEBUG(D_NET, "sending reply\n");
416 ptlrpc_reply(svc, req);
// mds_prep - start the MDS service and prepare the backing filesystem.
//
// Steps visible here:
//   1. Create the request service with ptlrpc_init_svc() and start a thread
//      named lustre_mds.
//   2. Inside the MDS run context (push_ctxt), create the ROOT and FH
//      directories with mode 0700.
//   3. Install a private copy of the superblock method table whose
//      delete_inode slot is mds_fsops->fs_delete_inode; the previous
//      delete_inode is saved in mds_fsops->cl_delete_inode.
// The trailing lines unregister and free the service; presumably this is the
// error path, but the labels are not visible here.
//
// Parameters:
//   obddev - device whose u.mds state is initialised.
//
// Returns: the return statements are not visible here.
422 static int mds_prep(struct obd_device *obddev)
424 struct obd_run_ctxt saved;
425 struct mds_obd *mds = &obddev->u.mds;
426 struct super_operations *s_ops;
429 mds->mds_service = ptlrpc_init_svc(128 * 1024,
430 MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
433 if (!mds->mds_service) {
434 CERROR("failed to start service\n");
438 err = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
440 CERROR("cannot start thread\n");
// Directory creation runs in the MDS filesystem context.
444 push_ctxt(&saved, &mds->mds_ctxt);
// NOTE(review): the result of the first simple_mkdir() is overwritten by the
// second before it is examined - confirm whether failures here (other than
// the directory already existing) should be reported.
445 err = simple_mkdir(current->fs->pwd, "ROOT", 0700);
446 err = simple_mkdir(current->fs->pwd, "FH", 0700);
450 * Replace the client filesystem delete_inode method with our own,
451 * so that we can clear the object ID before the inode is deleted.
452 * The fs_delete_inode method will call cl_delete_inode for us.
454 * We need to do this for the MDS superblock only, hence we install
455 * a modified copy of the original superblock method table.
457 * We still assume that there is only a single MDS client filesystem
458 * type, as we don't have access to the mds struct in * delete_inode.
// NOTE(review): s_ops is passed to memcpy() directly after OBD_ALLOC with no
// NULL check in between - confirm that the allocation cannot fail here, or
// add a check.
460 OBD_ALLOC(s_ops, sizeof(*s_ops));
461 memcpy(s_ops, mds->mds_sb->s_op, sizeof(*s_ops));
// Save the original method, then redirect the copy to the MDS hook.
462 mds->mds_fsops->cl_delete_inode = s_ops->delete_inode;
463 s_ops->delete_inode = mds->mds_fsops->fs_delete_inode;
464 mds->mds_sb->s_op = s_ops;
469 rpc_unregister_service(mds->mds_service);
470 OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
475 /* mount the file system (secretly) */
// mds_setup - configure the MDS device from an ioctl data block.
//
// The filesystem type name is copied from data->ioc_inlbuf2 and must be ext3
// or ext2; it selects the matching mds_fs operations table. The filesystem is
// then mounted with do_kern_mount() using data->ioc_inlbuf1 as the device
// name, the superblock and run context are recorded, and mds_prep() is called.
// The trailing lines release the mount and free the type string; presumably
// these are the err_put / err_dec / err_kfree targets, but the labels are not
// visible here.
//
// Parameters:
//   obddev - device being set up.
//   len    - not referenced on any line visible here.
//   buf    - pointer to a struct obd_ioctl_data.
//
// Returns: the return statements are not visible here; presumably 0 or a
// negative errno in err - TODO confirm.
476 static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
478 struct obd_ioctl_data* data = buf;
479 struct mds_obd *mds = &obddev->u.mds;
480 struct vfsmount *mnt;
484 #ifdef CONFIG_DEV_RDONLY
// NOTE(review): no check of the strdup() result is visible before it is
// passed to strcmp() - confirm.
487 mds->mds_fstype = strdup(data->ioc_inlbuf2);
// Pick the filesystem-specific operations by type name.
489 if (!strcmp(mds->mds_fstype, "ext3"))
490 mds->mds_fsops = &mds_ext3_fs_ops;
491 else if (!strcmp(mds->mds_fstype, "ext2"))
492 mds->mds_fsops = &mds_ext2_fs_ops;
494 CERROR("unsupported MDS filesystem type %s\n", mds->mds_fstype);
495 GOTO(err_kfree, (err = -EPERM));
499 mnt = do_kern_mount(mds->mds_fstype, 0, data->ioc_inlbuf1, NULL);
// NOTE(review): err is printed here before it is assigned from PTR_ERR(mnt)
// on the next line, so the logged value is not the mount error.
501 CERROR("do_kern_mount failed: %d\n", err);
502 GOTO(err_dec, err = PTR_ERR(mnt));
505 mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
507 GOTO(err_put, (err = -ENODEV));
// Record the mount and set up the run context used by push_ctxt().
509 mds->mds_vfsmnt = mnt;
510 mds->mds_ctxt.pwdmnt = mnt;
511 mds->mds_ctxt.pwd = mnt->mnt_root;
512 mds->mds_ctxt.fs = KERNEL_DS;
514 err = mds_prep(obddev);
522 mntput(mds->mds_vfsmnt);
528 kfree(mds->mds_fstype);
// mds_cleanup - tear down the MDS device.
//
// If the device still has clients an error is logged (what happens next is not
// visible here). Otherwise the service thread is stopped, the service is
// unregistered and freed, the mount is released, the filesystem type string is
// freed, and the private superblock method table is freed.
//
// Parameters:
//   obddev - device being cleaned up.
//
// Returns: the return statements are not visible here.
532 static int mds_cleanup(struct obd_device * obddev)
// NOTE(review): s_ops starts as NULL and is freed at the end; its assignment
// is not visible here - confirm it is set before the OBD_FREE.
534 struct super_operations *s_ops = NULL;
535 struct super_block *sb;
536 struct mds_obd *mds = &obddev->u.mds;
540 if ( !list_empty(&obddev->obd_gen_clients) ) {
541 CERROR("still has clients!\n");
// Stop the service before releasing it.
545 ptlrpc_stop_thread(mds->mds_service);
546 rpc_unregister_service(mds->mds_service);
// Requests still queued are only reported; the service is freed regardless.
547 if (!list_empty(&mds->mds_service->srv_reqs)) {
548 // XXX reply with errors and clean up
549 CERROR("Request list not empty!\n");
551 OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
560 mntput(mds->mds_vfsmnt);
562 kfree(mds->mds_fstype);
564 #ifdef CONFIG_DEV_RDONLY
567 OBD_FREE(s_ops, sizeof(*s_ops));
573 /* use obd ops to offer management infrastructure */
// Method table passed to obd_register_type() in mds_init(). Only the
// o_cleanup slot is visible here; other slots may be set on lines not shown.
574 static struct obd_ops mds_obd_ops = {
576 o_cleanup: mds_cleanup,
// mds_init - module entry point: registers the MDS obd type under
// LUSTRE_MDS_NAME with the mds_obd_ops method table. The return statement is
// not visible here.
579 static int __init mds_init(void)
581 obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
// mds_exit - module exit point: unregisters the MDS obd type that mds_init
// registered under LUSTRE_MDS_NAME.
585 static void __exit mds_exit(void)
587 obd_unregister_type(LUSTRE_MDS_NAME);
// Module metadata, followed by the hooks that bind mds_init and mds_exit to
// module load and unload.
590 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
591 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
592 MODULE_LICENSE("GPL");
594 module_init(mds_init);
595 module_exit(mds_exit);