1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * Lustre Metadata Server (mds) request handler
8 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
10 * This code is issued under the GNU General Public License.
11 * See the file COPYING in this distribution
13 * by Peter Braam <braam@clusterfs.com>
15 * This server is single-threaded at present (but can easily be made multi-threaded)
21 #include <linux/version.h>
22 #include <linux/module.h>
24 #include <linux/stat.h>
25 #include <linux/locks.h>
26 #include <linux/ext2_fs.h>
27 #include <linux/quotaops.h>
28 #include <asm/unistd.h>
29 #include <asm/uaccess.h>
31 #define DEBUG_SUBSYSTEM S_MDS
33 #include <linux/lustre_mds.h>
34 #include <linux/lustre_lib.h>
35 #include <linux/lustre_net.h>
// Local prototype for ext3_bread(); mds_sendpage() below calls it for
// non-regular files when the super block's filesystem type is "ext3".
37 struct buffer_head *ext3_bread(void *handle, struct inode *inode,
38 int block, int create, int *err);
// mds_sendpage: read data from `file` at `offset` and deliver it to the peer
// that issued `req`.
//
// Two delivery paths are visible:
//   - req->rq_peer.peer_nid == 0: the data is written directly to the
//     address carried in dst->addr (presumably a same-node peer -- TODO
//     confirm what nid 0 means).
//   - otherwise: a PAGE_SIZE buffer is allocated, filled, and pushed to the
//     peer with ptlrpc_send_bulk() on MDS_BULK_PORTAL.
// Both paths pick the reader the same way: file->f_op->read() for regular
// files, ext3_bread() for other inodes on an "ext3" super block, and
// generic_file_read() for everything else. Both compare the result against
// PAGE_SIZE afterwards.
//
// NOTE(review): bh returned by ext3_bread() is dereferenced via bh->b_data;
// confirm a failed/NULL bh is rejected first and that bh is released.
// NOTE(review): the ext3 branches set rc to inode->i_blksize, so the
// rc != PAGE_SIZE test only passes when the block size equals PAGE_SIZE --
// confirm that is intended.
40 int mds_sendpage(struct ptlrpc_request *req, struct file *file,
41 __u64 offset, struct niobuf *dst)
// The current address limit is captured; presumably it is changed with
// set_fs() and restored from oldfs further down -- confirm.
44 mm_segment_t oldfs = get_fs();
// Fault-injection point keyed on OBD_FAIL_MDS_SENDPAGE, with -EIO as the
// value (presumably returned when the fail point is armed).
46 OBD_FAIL_RETURN(OBD_FAIL_MDS_SENDPAGE, -EIO);
// Direct-copy path.
48 if (req->rq_peer.peer_nid == 0) {
49 struct inode *inode = file->f_dentry->d_inode;
50 char *buf = (char *)(long)dst->addr;
52 /* dst->addr is a user address, but in a different task! */
54 /* FIXME: we need to use ext3_bread because ext3 does not
55 * have the directories in page cache yet. If we
56 * just use generic_file_read() then the pages we
57 * get are in a different address space than those
58 * used by the filesystem == cache incoherency.
60 if (S_ISREG(inode->i_mode))
61 rc = file->f_op->read(file, buf, PAGE_SIZE, &offset);
62 else if (!strcmp(inode->i_sb->s_type->name, "ext3")) {
63 struct buffer_head *bh;
65 bh = ext3_bread(NULL, inode,
66 offset >> inode->i_sb->s_blocksize_bits,
70 memcpy(buf, bh->b_data, inode->i_blksize);
72 rc = inode->i_blksize;
75 rc = generic_file_read(file, buf, PAGE_SIZE, &offset);
79 if (rc != PAGE_SIZE) {
// Bulk-transfer path.
85 struct inode *inode = file->f_dentry->d_inode;
86 struct ptlrpc_bulk_desc *bulk;
89 bulk = ptlrpc_prep_bulk(&req->rq_peer);
// The bulk descriptor carries the xid of the request it answers.
95 bulk->b_xid = req->rq_xid;
97 OBD_ALLOC(buf, PAGE_SIZE);
100 GOTO(cleanup_bulk, rc);
104 /* FIXME: see comments above */
105 if (S_ISREG(inode->i_mode))
106 rc = file->f_op->read(file, buf, PAGE_SIZE, &offset);
107 else if (!strcmp(inode->i_sb->s_type->name, "ext3")) {
108 struct buffer_head *bh;
// NOTE(review): this call shifts by inode->i_blkbits while the direct-copy
// path shifts by inode->i_sb->s_blocksize_bits -- confirm both are intended.
110 bh = ext3_bread(NULL, inode, offset >> inode->i_blkbits,
114 memcpy(buf, bh->b_data, inode->i_blksize);
116 rc = inode->i_blksize;
119 rc = generic_file_read(file, buf, PAGE_SIZE, &offset);
123 if (rc != PAGE_SIZE) {
125 GOTO(cleanup_buf, rc);
129 bulk->b_buflen = PAGE_SIZE;
131 rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL);
// NOTE(review): rc from ptlrpc_send_bulk() is not examined before sleeping
// on b_waitq -- confirm a failed send cannot leave this wait stuck.
132 wait_event_interruptible(bulk->b_waitq,
133 ptlrpc_check_bulk_sent(bulk));
// An interrupted transfer is recognised by b_flags == PTL_RPC_INTR and
// jumps to cleanup_buf.
135 if (bulk->b_flags == PTL_RPC_INTR) {
137 GOTO(cleanup_buf, rc);
// Cleanup: the page buffer is freed with its PAGE_SIZE length, the bulk
// descriptor with sizeof(*bulk).
142 OBD_FREE(buf, PAGE_SIZE);
144 OBD_FREE(bulk, sizeof(*bulk));
// mds_fid2dentry: turn a fid (inode number in fid->id plus fid->generation)
// into a dentry on the MDS super block (mds->mds_sb).
//
// Returns, as far as visible here:
//   - ERR_PTR(-ESTALE) from an early check, and when the inode is bad or its
//     i_generation differs from a non-zero requested generation;
//   - ERR_PTR(-ENOMEM) on the iget() and d_alloc_root() failure paths;
//   - otherwise a dentry for the inode.
// *mnt receives mds->mds_vfsmnt.
// NOTE(review): mds_getattr() passes NULL for mnt; confirm the store to
// *mnt is guarded against a NULL pointer.
150 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
151 struct vfsmount **mnt)
153 /* stolen from NFS */
154 struct super_block *sb = mds->mds_sb;
155 unsigned long ino = fid->id;
156 __u32 generation = fid->generation;
158 struct list_head *lp;
159 struct dentry *result;
162 return ERR_PTR(-ESTALE);
// Look the inode up by number on the MDS super block.
164 inode = iget(sb, ino);
166 return ERR_PTR(-ENOMEM);
168 CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
// A generation of 0 in the fid skips the generation comparison.
170 if (is_bad_inode(inode) ||
171 (generation && inode->i_generation != generation)) {
172 /* we didn't find the right inode.. */
173 CERROR("bad inode %lu, link: %d ct: %d or version %u/%u\n",
175 inode->i_nlink, atomic_read(&inode->i_count),
180 return ERR_PTR(-ESTALE);
183 /* now to find a dentry.
184 * If possible, get a well-connected one
187 *mnt = mds->mds_vfsmnt;
// Scan the inode's aliases under dcache_lock for one that is not flagged
// DCACHE_NFSD_DISCONNECTED; a match is marked DCACHE_REFERENCED.
188 spin_lock(&dcache_lock);
189 for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
190 result = list_entry(lp,struct dentry, d_alias);
191 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
193 result->d_vfs_flags |= DCACHE_REFERENCED;
194 spin_unlock(&dcache_lock);
201 spin_unlock(&dcache_lock);
// No suitable alias: allocate a fresh dentry with d_alloc_root() and flag
// it DCACHE_NFSD_DISCONNECTED.
202 result = d_alloc_root(inode);
203 if (result == NULL) {
205 return ERR_PTR(-ENOMEM);
209 result->d_flags |= DCACHE_NFSD_DISCONNECTED;
// mds_get_objid: copy sizeof(*id) bytes (one __u64) from the start of
// inode->u.ext2_i.i_data into *id. The object id is thus read out of the
// ext2-specific part of the in-core inode.
213 static inline void mds_get_objid(struct inode *inode, __u64 *id)
215 /* FIXME: it is only by luck that this works on ext3 */
216 memcpy(id, &inode->u.ext2_i.i_data, sizeof(*id));
// mds_getattr: build a reply holding the attributes of the inode named by
// req->rq_req.mds->fid1.
//
// Steps visible here: allocate/pack the reply with mds_pack_rep(), echo the
// request xid into the reply header, resolve fid1 with mds_fid2dentry()
// (no vfsmount requested), then copy inode fields and the object id into
// the reply body.
// NOTE(review): the lookup-failure branch stores a fixed -ENOENT rather
// than the error carried by de -- confirm that is intended.
// NOTE(review): `inode` is presumably de->d_inode; its assignment and the
// release of de are not visible here -- confirm.
219 int mds_getattr(struct ptlrpc_request *req)
226 rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
227 &req->rq_replen, &req->rq_repbuf);
// Any pack failure (or the OBD_FAIL_MDS_GETATTR_PACK fail check) is logged
// as "out of memory" and recorded as -ENOMEM in rq_status.
228 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
229 CERROR("mds: out of memory\n");
230 req->rq_status = -ENOMEM;
234 req->rq_rephdr->xid = req->rq_reqhdr->xid;
235 rep = req->rq_rep.mds;
237 de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, NULL);
239 req->rq_rephdr->status = -ENOENT;
// Copy the inode's identity, timestamps, ownership, size, mode and link
// count into the reply.
244 rep->ino = inode->i_ino;
245 rep->generation = inode->i_generation;
246 rep->atime = inode->i_atime;
247 rep->ctime = inode->i_ctime;
248 rep->mtime = inode->i_mtime;
249 rep->uid = inode->i_uid;
250 rep->gid = inode->i_gid;
251 rep->size = inode->i_size;
252 rep->mode = inode->i_mode;
253 rep->nlink = inode->i_nlink;
255 mds_get_objid(inode, &rep->objid);
// mds_open: open the file named by req->rq_req.mds->fid1 with the flags
// supplied in the request and return a handle in the reply.
//
// Steps visible here: pack the reply, echo the xid, resolve fid1 to a
// dentry and vfsmount, call dentry_open(), and store the resulting
// struct file pointer in rep->objid.
// NOTE(review): the handle given to the client is the raw kernel address
// of the struct file; mds_close() casts the client-supplied value back to
// a pointer. Confirm this exposure is acceptable.
260 int mds_open(struct ptlrpc_request *req)
265 struct vfsmount *mnt;
269 rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
270 &req->rq_replen, &req->rq_repbuf);
// Any pack failure (or the OBD_FAIL_MDS_OPEN_PACK fail check) is logged as
// "out of memory" and recorded as -ENOMEM in rq_status.
271 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
272 CERROR("mds: out of memory\n");
273 req->rq_status = -ENOMEM;
277 req->rq_rephdr->xid = req->rq_reqhdr->xid;
278 rep = req->rq_rep.mds;
280 de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
282 req->rq_rephdr->status = -ENOENT;
// Open flags come straight from the request.
285 flags = req->rq_req.mds->flags;
286 file = dentry_open(de, mnt, flags);
// Both a NULL and an ERR_PTR result are reported to the client as -EINVAL.
287 if (!file || IS_ERR(file)) {
288 req->rq_rephdr->status = -EINVAL;
292 rep->objid = (__u64) (unsigned long)file;
// mds_close: close the file handle supplied in req->rq_req.mds->objid and
// report the result of filp_close() in the reply status.
//
// Steps visible here: pack the reply, echo the xid, resolve fid1 with
// mds_fid2dentry(), cast objid to a struct file pointer, and close it.
// NOTE(review): objid arrives in the request and is cast to a kernel
// pointer with no validation visible here -- confirm it is checked against
// the set of files this server actually opened.
// NOTE(review): the dentry obtained from mds_fid2dentry() is not visibly
// used or released -- confirm.
296 int mds_close(struct ptlrpc_request *req)
301 struct vfsmount *mnt;
304 rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
305 &req->rq_replen, &req->rq_repbuf);
// Any pack failure (or the OBD_FAIL_MDS_CLOSE_PACK fail check) is logged
// as "out of memory" and recorded as -ENOMEM in rq_status.
306 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
307 CERROR("mds: out of memory\n");
308 req->rq_status = -ENOMEM;
312 req->rq_rephdr->xid = req->rq_reqhdr->xid;
313 rep = req->rq_rep.mds;
315 de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
317 req->rq_rephdr->status = -ENOENT;
// The handle is the value mds_open() stored in rep->objid.
321 file = (struct file *)(unsigned long) req->rq_req.mds->objid;
323 req->rq_rephdr->status = filp_close(file, 0);
// mds_readpage: open the object named by req->rq_req.mds->fid1 read-only
// and ship data from it to the client through mds_sendpage().
//
// Steps visible here: pack the reply, echo the xid, resolve fid1, open the
// dentry with O_RDONLY | O_LARGEFILE, take the destination niobuf from the
// request via mds_req_tgt(), and call mds_sendpage() with
// req->rq_req.mds->size as the file offset. The return code of
// mds_sendpage() becomes the reply status.
// Unlike the other handlers, lookup and open failures here are reported
// with the actual PTR_ERR() value.
// NOTE(review): no close of `file` is visible after mds_sendpage() --
// confirm the file is released.
330 int mds_readpage(struct ptlrpc_request *req)
332 struct vfsmount *mnt;
335 struct niobuf *niobuf;
341 rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
342 &req->rq_replen, &req->rq_repbuf);
// Any pack failure (or the OBD_FAIL_MDS_READPAGE_PACK fail check) is
// logged as "out of memory" and recorded as -ENOMEM in rq_status.
343 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
344 CERROR("mds: out of memory\n");
345 req->rq_status = -ENOMEM;
349 req->rq_rephdr->xid = req->rq_reqhdr->xid;
350 rep = req->rq_rep.mds;
352 de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
354 req->rq_rephdr->status = PTR_ERR(de);
358 CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
360 file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
361 /* note: in case of an error, dentry_open puts dentry */
363 req->rq_rephdr->status = PTR_ERR(file);
367 niobuf = mds_req_tgt(req->rq_req.mds);
369 /* to make this asynchronous make sure that the handling function
370 doesn't send a reply when this function completes. Instead a
371 callback function would send the reply */
// The request's `size` field is passed as the offset argument.
372 rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf);
375 req->rq_rephdr->status = rc;
// mds_reint: unpack the update record carried in the request and apply it
// with mds_reint_rec().
//
// The record bytes are taken from mds_req_tgt(req->rq_req.mds) with length
// req->rq_req.mds->tgtlen. An unpack failure (or the
// OBD_FAIL_MDS_REINT_UNPACK fail check) is logged as "invalid record" and
// recorded as -EINVAL in rq_status.
379 int mds_reint(struct ptlrpc_request *req)
383 struct mds_update_record rec;
385 buf = mds_req_tgt(req->rq_req.mds);
386 len = req->rq_req.mds->tgtlen;
388 rc = mds_update_unpack(buf, len, &rec);
389 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
390 CERROR("invalid record\n");
391 req->rq_status = -EINVAL;
394 /* rc will be used to interrupt a for loop over multiple records */
395 rc = mds_reint_rec(&rec, req);
// mds_handle: service entry point for one incoming MDS request.
//
// Flow visible here:
//   1. Reject packets whose header type is not PTL_RPC_REQUEST.
//   2. Unpack the request with mds_unpack_req().
//   3. Dispatch on req->rq_reqhdr->opc. Each visible case logs its name,
//      passes an OBD_FAIL_RETURN(..., 0) fault-injection point, and calls
//      its handler (the calls for reint, open and close are presumably
//      analogous to getattr and readpage -- confirm).
//   4. Send ptlrpc_error() when req->rq_status is set, otherwise
//      ptlrpc_reply().
399 int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
400 struct ptlrpc_request *req)
403 struct ptlreq_hdr *hdr;
407 hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
// The type field is converted with NTOH__u32() before comparison.
409 if (NTOH__u32(hdr->type) != PTL_RPC_REQUEST) {
410 CERROR("lustre_mds: wrong packet type sent %d\n",
411 NTOH__u32(hdr->type));
416 rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen,
417 &req->rq_reqhdr, &req->rq_req);
418 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
419 CERROR("lustre_mds: Invalid request\n");
423 switch (req->rq_reqhdr->opc) {
426 CDEBUG(D_INODE, "getattr\n");
427 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
428 rc = mds_getattr(req);
432 CDEBUG(D_INODE, "readpage\n");
433 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
434 rc = mds_readpage(req);
438 CDEBUG(D_INODE, "reint\n");
439 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
444 CDEBUG(D_INODE, "open\n");
445 OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
450 CDEBUG(D_INODE, "close\n");
451 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
// Presumably the branch for unrecognised opcodes -- confirm.
456 rc = ptlrpc_error(dev, svc, req);
463 CERROR("no header\n");
// Final step: an error reply when rq_status is non-zero, a normal reply
// otherwise.
468 if( req->rq_status) {
469 ptlrpc_error(dev, svc, req);
471 CDEBUG(D_NET, "sending reply\n");
472 ptlrpc_reply(dev, svc, req);
479 /* mount the file system (secretly) */
// mds_setup: mount the backing filesystem and start the MDS request
// service.
//
// buf is treated as a struct obd_ioctl_data. ioc_inlbuf2 is passed as the
// first do_kern_mount() argument and is also duplicated into
// mds->mds_fstype; ioc_inlbuf1 is passed as the third argument (presumably
// the device name -- TODO confirm).
// The mount, its super block and a context (pwdmnt, pwd, fs = KERNEL_DS)
// are recorded in obddev->u.mds; a ptlrpc service is then created and a
// thread named "lustre_mds" is started on it.
// NOTE(review): the strdup() result is not visibly checked, and the
// unwinding of the mount and service on later failures is not visible
// here -- confirm both.
480 static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
482 struct obd_ioctl_data* data = buf;
483 struct mds_obd *mds = &obddev->u.mds;
484 struct vfsmount *mnt;
488 mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL);
491 CERROR("do_kern_mount failed: %d\n", err);
// The super block is taken from the root dentry's inode of the new mount.
495 mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
499 mds->mds_vfsmnt = mnt;
// Freed with kfree() in mds_cleanup().
500 mds->mds_fstype = strdup(data->ioc_inlbuf2);
502 mds->mds_ctxt.pwdmnt = mnt;
503 mds->mds_ctxt.pwd = mnt->mnt_root;
504 mds->mds_ctxt.fs = KERNEL_DS;
// First argument is 128 KiB (presumably a buffer size -- confirm).
506 mds->mds_service = ptlrpc_init_svc(128 * 1024,
507 MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
509 if (!mds->mds_service) {
510 CERROR("failed to start service\n");
514 err = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
516 CERROR("cannot start thread\n");
// mds_cleanup: tear down what mds_setup() created. It stops the service
// thread, unregisters and frees the service, drops the mount reference
// and frees the saved filesystem-type string.
//
// A non-empty client list is logged as "still has clients!" (presumably
// the function bails out at that point -- confirm).
// NOTE(review): rpc_unregister_service() is called twice on the same
// service, once right after ptlrpc_stop_thread() and again just before
// OBD_FREE(). This looks like a duplicate; confirm whether the second call
// is safe or should be removed.
522 static int mds_cleanup(struct obd_device * obddev)
524 struct super_block *sb;
525 struct mds_obd *mds = &obddev->u.mds;
529 if ( !list_empty(&obddev->obd_gen_clients) ) {
530 CERROR("still has clients!\n");
534 ptlrpc_stop_thread(mds->mds_service);
535 rpc_unregister_service(mds->mds_service);
// Outstanding requests are only reported, not drained.
537 if (!list_empty(&mds->mds_service->srv_reqs)) {
538 // XXX reply with errors and clean up
539 CERROR("Request list not empty!\n");
542 rpc_unregister_service(mds->mds_service);
543 OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
550 mntput(mds->mds_vfsmnt);
// Releases the string duplicated in mds_setup().
552 kfree(mds->mds_fstype);
559 /* use obd ops to offer management infrastructure */
// Operation table handed to obd_register_type() in mds_init().
// NOTE(review): only the o_cleanup entry is visible here; mds_setup() is
// presumably wired in through an o_setup entry -- confirm.
560 static struct obd_ops mds_obd_ops = {
562 o_cleanup: mds_cleanup,
// mds_init: module entry point; registers mds_obd_ops under the type name
// LUSTRE_MDS_NAME.
// NOTE(review): the result of obd_register_type() is not visibly
// propagated -- confirm.
565 static int __init mds_init(void)
567 obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
// mds_exit: module exit point; removes the LUSTRE_MDS_NAME type registered
// by mds_init().
571 static void __exit mds_exit(void)
573 obd_unregister_type(LUSTRE_MDS_NAME);
// Module metadata (author, description, license) and the hooks that name
// mds_init() and mds_exit() as the module's init and exit functions.
576 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
577 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
578 MODULE_LICENSE("GPL");
580 module_init(mds_init);
581 module_exit(mds_exit);