1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * Lustre Metadata Server (mds) request handler
8 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
10 * This code is issued under the GNU General Public License.
11 * See the file COPYING in this distribution
13 * by Peter Braam <braam@clusterfs.com>
15 * This server is single threaded at present (but can easily be multi threaded)
21 #include <linux/version.h>
22 #include <linux/module.h>
24 #include <linux/stat.h>
25 #include <linux/locks.h>
26 #include <linux/quotaops.h>
27 #include <asm/unistd.h>
28 #include <asm/uaccess.h>
30 #define DEBUG_SUBSYSTEM S_MDS
32 #include <linux/lustre_mds.h>
33 #include <linux/lustre_lib.h>
34 #include <linux/lustre_net.h>
// mds_sendpage: read one PAGE_SIZE chunk of file through mds_fs_readpage()
// and deliver it to the destination described by dst.
//
// Two delivery paths are visible, selected by req->rq_peer.peer_nid:
//   - peer_nid == 0: dst->addr is cast to a pointer and used directly as
//     the read buffer.
//   - the remaining code (presumably the else branch; the brace lines are
//     not visible in this view): a bulk descriptor is prepared for the
//     peer, a PAGE_SIZE buffer is allocated and filled, the page is sent
//     on MDS_BULK_PORTAL, and the caller sleeps until
//     ptlrpc_check_bulk_sent() holds.
// In both paths a read that does not yield exactly PAGE_SIZE bytes is
// treated as a failure.
//
// NOTE(review): declarations, labels and the return statement are missing
// from this view, so the exact return contract is not documented here.
// The offset argument is presumably forwarded to mds_fs_readpage(); the
// continuation lines of those calls are not visible. TODO confirm.
36 int mds_sendpage(struct ptlrpc_request *req, struct file *file,
37 __u64 offset, struct niobuf *dst)
// Remember the current address limit; the matching set_fs() calls are not
// visible in this view.
40 mm_segment_t oldfs = get_fs();
// Presumably a fault-injection hook that returns -EIO when
// OBD_FAIL_MDS_SENDPAGE is armed (macro defined elsewhere). TODO confirm.
42 OBD_FAIL_RETURN(OBD_FAIL_MDS_SENDPAGE, -EIO);
44 if (req->rq_peer.peer_nid == 0) {
45 /* dst->addr is a user address, but in a different task! */
46 char *buf = (char *)(long)dst->addr;
49 rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
// Anything other than a full page counts as an error on this path.
53 if (rc != PAGE_SIZE) {
59 struct ptlrpc_bulk_desc *bulk;
62 bulk = ptlrpc_prep_bulk(&req->rq_peer);
// Tag the bulk transfer with the transaction id of the request.
68 bulk->b_xid = req->rq_xid;
70 OBD_ALLOC(buf, PAGE_SIZE);
// Presumably taken when the allocation above failed; the condition line is
// not visible. Only the bulk descriptor needs releasing at this point.
73 GOTO(cleanup_bulk, rc);
77 rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
81 if (rc != PAGE_SIZE) {
83 GOTO(cleanup_buf, rc);
87 bulk->b_buflen = PAGE_SIZE;
// NOTE(review): no check of rc is visible between the send and the wait
// below; confirm that a failed send cannot leave this wait pending.
89 rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL);
// The return value of wait_event_interruptible() is not used; interruption
// is detected through bulk->b_flags on the next visible line instead.
90 wait_event_interruptible(bulk->b_waitq,
91 ptlrpc_check_bulk_sent(bulk));
93 if (bulk->b_flags == PTL_RPC_INTR) {
95 GOTO(cleanup_buf, rc);
// Release the page buffer, then the bulk descriptor. The cleanup_buf and
// cleanup_bulk labels presumably sit immediately before these two lines.
100 OBD_FREE(buf, PAGE_SIZE);
102 OBD_FREE(bulk, sizeof(*bulk));
// mds_fid2dentry: translate a Lustre fid (inode number plus generation)
// into a dentry on the MDS backing filesystem.
//
// Steps visible in this view:
//   1. iget() the inode fid->id from mds->mds_sb.
//   2. Reject it with ERR_PTR(-ESTALE) if is_bad_inode() holds or if a
//      non-zero fid generation differs from inode->i_generation. A fid
//      generation of 0 skips the generation comparison.
//   3. Store mds->mds_vfsmnt through mnt.
//   4. Walk the alias list of the inode under dcache_lock and pick the
//      first dentry that is not flagged DCACHE_NFSD_DISCONNECTED.
//   5. If no such alias exists, allocate one with d_alloc_root() and flag
//      it DCACHE_NFSD_DISCONNECTED.
//
// Returns a dentry pointer or an ERR_PTR() value; -ESTALE and -ENOMEM are
// the error codes visible here.
//
// NOTE(review): several condition lines, the iput() calls (if any) and the
// final return are not visible in this view.
108 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
109 struct vfsmount **mnt)
111 /* stolen from NFS */
112 struct super_block *sb = mds->mds_sb;
113 unsigned long ino = fid->id;
114 __u32 generation = fid->generation;
116 struct list_head *lp;
117 struct dentry *result;
// The guarding condition for this early return is not visible.
120 return ERR_PTR(-ESTALE);
122 inode = iget(sb, ino);
// Presumably taken when iget() returned NULL; condition not visible.
124 return ERR_PTR(-ENOMEM);
126 CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
128 if (is_bad_inode(inode) ||
129 (generation && inode->i_generation != generation)) {
130 /* we didn't find the right inode.. */
131 CERROR("bad inode %lu, link: %d ct: %d or version %u/%u\n",
133 inode->i_nlink, atomic_read(&inode->i_count),
138 return ERR_PTR(-ESTALE);
141 /* now to find a dentry.
142 * If possible, get a well-connected one
// NOTE(review): mds_getattr() passes NULL for mnt. No guard around this
// store is visible here; confirm one exists on the missing line before it.
145 *mnt = mds->mds_vfsmnt;
146 spin_lock(&dcache_lock);
147 for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
148 result = list_entry(lp,struct dentry, d_alias);
149 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
// Mark the chosen alias as referenced and drop the lock; the statements
// that take a reference and return it are not visible in this view.
151 result->d_vfs_flags |= DCACHE_REFERENCED;
152 spin_unlock(&dcache_lock);
// Reached when the loop found no connected alias.
159 spin_unlock(&dcache_lock);
160 result = d_alloc_root(inode);
161 if (result == NULL) {
163 return ERR_PTR(-ENOMEM);
// A freshly allocated dentry is flagged as disconnected.
167 result->d_flags |= DCACHE_NFSD_DISCONNECTED;
// mds_getattr: handle a getattr request by copying the attributes of the
// inode named by fid1 of the request into the reply body.
//
// Flow visible in this view:
//   - mds_pack_rep() builds the reply buffers; on failure the error is
//     logged and req->rq_status is set to -ENOMEM.
//   - The reply header xid is copied from the request header.
//   - mds_fid2dentry() resolves fid1 (mnt argument is NULL); a failed
//     lookup is reported to the client as -ENOENT in the reply header.
//   - ino, generation, times, ownership, size, mode and link count are
//     copied from the inode, and mds_fs_get_objid() fills rep->objid.
//
// NOTE(review): local declarations, the assignment of inode, the release
// of the dentry and the return value are not visible in this view.
171 int mds_getattr(struct ptlrpc_request *req)
176 struct mds_obd *mds = &req->rq_obd->u.mds;
179 rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
180 &req->rq_replen, &req->rq_repbuf);
// OBD_FAIL_CHECK is presumably a fault-injection test; when it fires the
// same out-of-memory path is taken as for a real packing failure.
181 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
182 CERROR("mds: out of memory\n");
183 req->rq_status = -ENOMEM;
187 req->rq_rephdr->xid = req->rq_reqhdr->xid;
188 rep = req->rq_rep.mds;
190 de = mds_fid2dentry(mds, &req->rq_req.mds->fid1, NULL);
// Lookup failures are flattened to -ENOENT regardless of the actual
// ERR_PTR value; the guarding condition is not visible.
192 req->rq_rephdr->status = -ENOENT;
197 rep->ino = inode->i_ino;
198 rep->generation = inode->i_generation;
199 rep->atime = inode->i_atime;
200 rep->ctime = inode->i_ctime;
201 rep->mtime = inode->i_mtime;
202 rep->uid = inode->i_uid;
203 rep->gid = inode->i_gid;
204 rep->size = inode->i_size;
205 rep->mode = inode->i_mode;
206 rep->nlink = inode->i_nlink;
208 mds_fs_get_objid(mds, inode, &rep->objid);
// mds_open: handle an open request for the inode named by fid1.
//
// Flow visible in this view:
//   - mds_pack_rep() builds the reply; on failure req->rq_status is set
//     to -ENOMEM.
//   - mds_fid2dentry() resolves fid1 and yields the vfsmount in mnt; a
//     failed lookup is reported as -ENOENT in the reply header.
//   - dentry_open() opens the dentry with the flags taken from the
//     request; a NULL or error result is reported as -EINVAL.
//   - On success the struct file pointer itself, widened to __u64, is
//     stored in rep->objid as the handle.
//
// NOTE(review): the handle is a raw kernel pointer handed to the client;
// mds_close() later casts the client-supplied value back to a pointer.
// Local declarations and the return value are not visible in this view.
213 int mds_open(struct ptlrpc_request *req)
218 struct vfsmount *mnt;
222 rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
223 &req->rq_replen, &req->rq_repbuf);
224 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
225 CERROR("mds: out of memory\n");
226 req->rq_status = -ENOMEM;
230 req->rq_rephdr->xid = req->rq_reqhdr->xid;
231 rep = req->rq_rep.mds;
233 de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
// Guarding condition not visible; presumably taken when de is an error.
235 req->rq_rephdr->status = -ENOENT;
238 flags = req->rq_req.mds->flags;
239 file = dentry_open(de, mnt, flags);
// The specific error carried by an ERR_PTR result is replaced by -EINVAL.
240 if (!file || IS_ERR(file)) {
241 req->rq_rephdr->status = -EINVAL;
// The double cast goes through unsigned long before widening to __u64.
245 rep->objid = (__u64) (unsigned long)file;
// mds_close: handle a close request for a file previously opened through
// mds_open().
//
// Flow visible in this view:
//   - mds_pack_rep() builds the reply; on failure req->rq_status is set
//     to -ENOMEM.
//   - mds_fid2dentry() resolves fid1; a failed lookup is reported as
//     -ENOENT in the reply header.
//   - The objid field of the request is cast back to a struct file
//     pointer and passed to filp_close(); its result becomes the reply
//     status.
//
// NOTE(review): no validation of the client-supplied objid is visible
// before it is used as a kernel pointer; confirm against the full file.
// Local declarations, any release of de and the return value are not
// visible in this view.
249 int mds_close(struct ptlrpc_request *req)
254 struct vfsmount *mnt;
257 rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
258 &req->rq_replen, &req->rq_repbuf);
259 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
260 CERROR("mds: out of memory\n");
261 req->rq_status = -ENOMEM;
265 req->rq_rephdr->xid = req->rq_reqhdr->xid;
266 rep = req->rq_rep.mds;
268 de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
// Guarding condition not visible; presumably taken when de is an error.
270 req->rq_rephdr->status = -ENOENT;
// Inverse of the cast performed in mds_open() when the handle was issued.
274 file = (struct file *)(unsigned long) req->rq_req.mds->objid;
276 req->rq_rephdr->status = filp_close(file, 0);
// mds_readpage: handle a readpage request by opening the inode named by
// fid1 read-only and sending one page of it with mds_sendpage().
//
// Flow visible in this view:
//   - mds_pack_rep() builds the reply; on failure req->rq_status is set
//     to -ENOMEM.
//   - mds_fid2dentry() resolves fid1; unlike the getattr/open/close
//     handlers, the actual PTR_ERR() value is reported in the reply.
//   - dentry_open() opens the dentry with O_RDONLY | O_LARGEFILE; an
//     error is likewise reported through PTR_ERR().
//   - mds_req_tgt() yields the niobuf that describes the destination.
//   - mds_sendpage() is called with the size field of the request in the
//     offset position, and its result becomes the reply status.
//
// NOTE(review): local declarations, the guarding conditions, the closing
// of file and the return value are not visible in this view.
283 int mds_readpage(struct ptlrpc_request *req)
285 struct vfsmount *mnt;
288 struct niobuf *niobuf;
294 rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
295 &req->rq_replen, &req->rq_repbuf);
296 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
297 CERROR("mds: out of memory\n");
298 req->rq_status = -ENOMEM;
302 req->rq_rephdr->xid = req->rq_reqhdr->xid;
303 rep = req->rq_rep.mds;
305 de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
307 req->rq_rephdr->status = PTR_ERR(de);
311 CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
313 file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
314 /* note: in case of an error, dentry_open puts dentry */
316 req->rq_rephdr->status = PTR_ERR(file);
320 niobuf = mds_req_tgt(req->rq_req.mds);
322 /* to make this asynchronous make sure that the handling function
323 doesn't send a reply when this function completes. Instead a
324 callback function would send the reply */
// The size field of the request carries the file offset to read from.
325 rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf);
328 req->rq_rephdr->status = rc;
// mds_reint: handle a reintegration request.
//
// The update record is taken from the target buffer of the request
// (mds_req_tgt() with length tgtlen), decoded by mds_update_unpack() into
// a local mds_update_record, and then applied by mds_reint_rec(). A record
// that fails to unpack is logged and sets req->rq_status to -EINVAL.
//
// NOTE(review): local declarations and the return statement are not
// visible in this view.
332 int mds_reint(struct ptlrpc_request *req)
336 struct mds_update_record rec;
338 buf = mds_req_tgt(req->rq_req.mds);
339 len = req->rq_req.mds->tgtlen;
341 rc = mds_update_unpack(buf, len, &rec);
342 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
343 CERROR("invalid record\n");
344 req->rq_status = -EINVAL;
347 /* rc will be used to interrupt a for loop over multiple records */
348 rc = mds_reint_rec(&rec, req);
// mds_handle: top-level dispatcher for an incoming MDS request.
//
// Flow visible in this view:
//   - The raw request buffer is viewed as a ptlreq_hdr and its type must
//     be PTL_RPC_REQUEST; anything else is logged.
//   - mds_unpack_req() decodes the request header and body; a failure is
//     logged as an invalid request.
//   - A switch on req->rq_reqhdr->opc selects the handler. The visible
//     arms log getattr, readpage, reint, open and close. Each arm first
//     runs an OBD_FAIL_RETURN(..., 0) check, presumably a fault-injection
//     hook that returns 0 without replying. TODO confirm.
//   - Finally, a non-zero req->rq_status sends an error through
//     ptlrpc_error(); otherwise ptlrpc_reply() sends the packed reply.
//
// NOTE(review): the case labels, the calls for reint, open and close, the
// labels for the error paths and the return statement are not visible in
// this view.
352 int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
353 struct ptlrpc_request *req)
356 struct ptlreq_hdr *hdr;
360 hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
// The type field is converted with NTOH__u32 before comparison.
362 if (NTOH__u32(hdr->type) != PTL_RPC_REQUEST) {
363 CERROR("lustre_mds: wrong packet type sent %d\n",
364 NTOH__u32(hdr->type));
369 rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen,
370 &req->rq_reqhdr, &req->rq_req);
371 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
372 CERROR("lustre_mds: Invalid request\n");
376 switch (req->rq_reqhdr->opc) {
379 CDEBUG(D_INODE, "getattr\n");
380 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
381 rc = mds_getattr(req);
385 CDEBUG(D_INODE, "readpage\n");
386 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
387 rc = mds_readpage(req);
391 CDEBUG(D_INODE, "reint\n");
392 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
397 CDEBUG(D_INODE, "open\n");
398 OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
403 CDEBUG(D_INODE, "close\n");
404 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
// Presumably the default arm for an unrecognised opcode. TODO confirm.
409 rc = ptlrpc_error(dev, svc, req);
416 CERROR("no header\n");
// Choose between an error reply and a normal reply based on rq_status.
421 if( req->rq_status) {
422 ptlrpc_error(dev, svc, req);
424 CDEBUG(D_NET, "sending reply\n");
425 ptlrpc_reply(dev, svc, req);
432 /* mount the file system (secretly) */
// mds_setup: OBD setup method for the MDS device.
//
// buf is interpreted as an obd_ioctl_data. Its ioc_inlbuf2 is passed as the
// first argument of do_kern_mount() and is also duplicated into
// mds->mds_fstype; ioc_inlbuf1 is passed as the third argument of
// do_kern_mount().
//
// Steps visible in this view:
//   1. Mount the backing filesystem and record its superblock and
//      vfsmount in the mds structure.
//   2. Select mds_fsops by filesystem type name: ext3 or ext2. Any other
//      type is logged and fails with -EPERM.
//   3. Fill mds_ctxt with the mount, its root dentry and KERNEL_DS.
//   4. Create the request service with ptlrpc_init_svc() and start the
//      service thread named lustre_mds.
//   5. Install a private copy of the superblock operations whose
//      delete_inode is replaced by the fs_delete_inode of mds_fsops; the
//      original method is saved in cl_delete_inode.
// The tail of the function is the error unwinding: unregister and free
// the service, free mds_fstype, then mntput() the mount.
//
// NOTE(review): local declarations, several conditions, the unwinding
// labels and the return statements are not visible in this view.
433 static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
435 struct obd_ioctl_data* data = buf;
436 struct mds_obd *mds = &obddev->u.mds;
437 struct super_operations *s_ops;
438 struct super_block *sb;
439 struct vfsmount *mnt;
444 mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL);
447 CERROR("do_kern_mount failed: %d\n", err);
451 sb = mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
// Guarding condition not visible; presumably taken when sb is NULL.
453 GOTO(err_put, (err = -ENODEV));
455 mds->mds_vfsmnt = mnt;
// NOTE(review): no check of the strdup() result is visible before it is
// passed to strcmp() below; confirm against the full file.
456 mds->mds_fstype = strdup(data->ioc_inlbuf2);
458 if (!strcmp(mds->mds_fstype, "ext3"))
459 mds->mds_fsops = &mds_ext3_fs_ops;
460 else if (!strcmp(mds->mds_fstype, "ext2"))
461 mds->mds_fsops = &mds_ext2_fs_ops;
463 CERROR("unsupported MDS filesystem type %s\n", mds->mds_fstype);
464 GOTO(err_kfree, (err = -EPERM));
467 mds->mds_ctxt.pwdmnt = mnt;
468 mds->mds_ctxt.pwd = mnt->mnt_root;
469 mds->mds_ctxt.fs = KERNEL_DS;
// The first argument is 128 KiB; its meaning is defined by
// ptlrpc_init_svc(), which is not visible here.
471 mds->mds_service = ptlrpc_init_svc(128 * 1024,
472 MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
474 if (!mds->mds_service) {
475 CERROR("failed to start service\n");
476 GOTO(err_kfree, (err = -EINVAL));
479 err = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
481 CERROR("cannot start thread\n");
486 * Replace the client filesystem delete_inode method with our own,
487 * so that we can clear the object ID before the inode is deleted.
488 * The fs_delete_inode method will call cl_delete_inode for us.
490 * We need to do this for the MDS superblock only, hence we install
491 * a modified copy of the original superblock method table.
493 * We still assume that there is only a single MDS client filesystem
494 * type, as we don't have access to the mds struct in * delete_inode.
// NOTE(review): no allocation-failure check is visible between OBD_ALLOC
// and the memcpy() that writes through s_ops.
496 OBD_ALLOC(s_ops, sizeof(*s_ops));
497 memcpy(s_ops, sb->s_op, sizeof(*s_ops));
// Save the original method first, then redirect the copy to ours.
498 mds->mds_fsops->cl_delete_inode = s_ops->delete_inode;
499 s_ops->delete_inode = mds->mds_fsops->fs_delete_inode;
// Error unwinding starts here; the labels themselves are not visible.
505 rpc_unregister_service(mds->mds_service);
506 OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
508 kfree(mds->mds_fstype);
// The kernel lock is dropped around mntput() and retaken afterwards.
510 unlock_kernel(); // XXX do we want/need this?
511 mntput(mds->mds_vfsmnt);
513 lock_kernel(); // XXX do we want/need this?
// mds_cleanup: OBD cleanup method for the MDS device; tears down what
// mds_setup() created.
//
// Steps visible in this view:
//   - Log an error if the device still has entries on obd_gen_clients.
//   - Stop the service thread, log an error if requests are still queued,
//     then unregister and free the service.
//   - Drop the mount reference, free the filesystem type string and free
//     the private superblock operations table.
//
// NOTE(review): the lines between freeing the service and mntput() are
// not visible; they presumably fetch the superblock and assign s_ops,
// which is initialised to NULL here. The consequences of the two error
// checks (early return or continue) and the return value are likewise not
// visible.
519 static int mds_cleanup(struct obd_device * obddev)
521 struct super_operations *s_ops = NULL;
522 struct super_block *sb;
523 struct mds_obd *mds = &obddev->u.mds;
527 if ( !list_empty(&obddev->obd_gen_clients) ) {
528 CERROR("still has clients!\n");
532 ptlrpc_stop_thread(mds->mds_service);
534 if (!list_empty(&mds->mds_service->srv_reqs)) {
535 // XXX reply with errors and clean up
536 CERROR("Request list not empty!\n");
539 rpc_unregister_service(mds->mds_service);
540 OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
549 mntput(mds->mds_vfsmnt);
551 kfree(mds->mds_fstype);
// Size matches the OBD_ALLOC(s_ops, sizeof(*s_ops)) done in mds_setup().
554 OBD_FREE(s_ops, sizeof(*s_ops));
560 /* use obd ops to offer management infrastructure */
// Method table registered for the MDS type by mds_init(). Only the
// o_cleanup entry is visible in this view; the other initialisers and the
// closing brace are on lines that are not shown.
561 static struct obd_ops mds_obd_ops = {
563 o_cleanup: mds_cleanup,
// mds_init: module entry point; registers the MDS method table under the
// name LUSTRE_MDS_NAME. The return statement is not visible in this view,
// and the result of obd_register_type() is not captured on the visible
// line.
566 static int __init mds_init(void)
568 obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
// mds_exit: module exit point; removes the registration made by
// mds_init() for LUSTRE_MDS_NAME.
572 static void __exit mds_exit(void)
574 obd_unregister_type(LUSTRE_MDS_NAME);
// Module metadata, followed by the hooks that bind mds_init() and
// mds_exit() as the load and unload entry points of this module.
577 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
578 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
579 MODULE_LICENSE("GPL");
581 module_init(mds_init);
582 module_exit(mds_exit);