1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * Lustre Metadata Server (mds) request handler
8 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
10 * This code is issued under the GNU General Public License.
11 * See the file COPYING in this distribution
13 * by Peter Braam <braam@clusterfs.com>
15 * This server is single threaded at present (but can easily be multi threaded).
21 #include <linux/version.h>
22 #include <linux/module.h>
24 #include <linux/stat.h>
25 #include <linux/locks.h>
26 #include <linux/ext2_fs.h>
27 #include <linux/quotaops.h>
28 #include <asm/unistd.h>
29 #include <asm/uaccess.h>
31 #define DEBUG_SUBSYSTEM S_MDS
33 #include <linux/lustre_mds.h>
34 #include <linux/lustre_lib.h>
// mds_sendpage: read data from `file` at `offset` and deliver it to the
// target described by `dst`, on behalf of request `req`.
// Two paths are visible: when req->rq_peer.peer_nid is 0 the data is read
// straight into dst->addr; the remaining statements (presumably the else
// branch) read PAGE_SIZE bytes into a temporary buffer and push it to the
// peer with ptlrpc_send_bulk().
// NOTE(review): braces, some declarations and the return statements of this
// function are not visible in this view; the notes below cover only the
// statements that are shown.
36 int mds_sendpage(struct ptlrpc_request *req, struct file *file,
37 __u64 offset, struct niobuf *dst)
// Save the current get_fs() value; presumably restored once the direct read
// is done (the restore is not visible here -- confirm).
40 mm_segment_t oldfs = get_fs();
// peer_nid == 0: read directly into the address supplied in the niobuf.
42 if (req->rq_peer.peer_nid == 0) {
43 /* dst->addr is a user address, but in a different task! */
45 rc = generic_file_read(file, (char *)(long)dst->addr,
// Bulk path: build a bulk descriptor for the requesting peer and tag it with
// the request's xid.
52 struct ptlrpc_bulk_desc *bulk;
55 bulk = ptlrpc_prep_bulk(&req->rq_peer);
59 bulk->b_xid = req->rq_xid;
// Temporary buffer for the data to be sent.
61 OBD_ALLOC(buf, PAGE_SIZE);
// Presumably the failure path of the allocation above: the descriptor is
// released (the surrounding condition is not visible).
63 OBD_FREE(bulk, sizeof(*bulk));
68 rc = generic_file_read(file, buf, PAGE_SIZE, &offset);
// Anything other than a full PAGE_SIZE read releases the buffer; what else
// happens on this path is not visible here.
71 if (rc != PAGE_SIZE) {
72 OBD_FREE(buf, PAGE_SIZE);
77 bulk->b_buflen = PAGE_SIZE;
// Start the transfer on MDS_BULK_PORTAL, then sleep on the descriptor's wait
// queue until ptlrpc_check_bulk_sent() is true or the sleep is interrupted.
79 rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL);
80 wait_event_interruptible(bulk->b_waitq,
81 ptlrpc_check_bulk_sent(bulk));
// Interrupted transfer: per the existing FIXME, resources are leaked on this
// path.
83 if (bulk->b_flags == PTL_RPC_INTR) {
85 /* FIXME: hey hey, we leak here. */
// Release the descriptor and the temporary buffer.
89 OBD_FREE(bulk, sizeof(*bulk));
90 OBD_FREE(buf, PAGE_SIZE);
// mds_fid2dentry: map a fid to a dentry on the MDS filesystem.
// Looks up the inode numbered fid->id on mds->mds_sb, then returns an
// existing alias dentry that is not flagged DCACHE_NFSD_DISCONNECTED, or
// failing that a newly allocated dentry flagged as disconnected.
// Stores mds->mds_vfsmnt through `mnt`.
// Error results visible here are ERR_PTR(-ESTALE) and ERR_PTR(-ENOMEM).
// NOTE(review): braces, the `inode` declaration, the successful returns and
// some conditions are not visible in this view.
96 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
97 struct vfsmount **mnt)
100 struct super_block *sb = mds->mds_sb;
101 unsigned long ino = fid->id;
// The fid's generation is ignored: with generation fixed at 0 the
// i_generation comparison further down can never trigger.
102 //__u32 generation = fid->generation;
103 __u32 generation = 0;
105 struct list_head *lp;
106 struct dentry *result;
// NOTE(review): the condition guarding this early -ESTALE return is not
// visible in this view.
109 return ERR_PTR(-ESTALE);
111 inode = iget(sb, ino);
// Presumably taken when iget() returned NULL -- the check is not visible.
113 return ERR_PTR(-ENOMEM);
115 CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
// Reject bad inodes (and generation mismatches, currently disabled because
// generation is 0).
117 if (is_bad_inode(inode)
118 || (generation && inode->i_generation != generation)
120 /* we didn't find the right inode.. */
121 CERROR("bad inode %lu, link: %d ct: %d or version %u/%u\n",
123 inode->i_nlink, atomic_read(&inode->i_count),
127 return ERR_PTR(-ESTALE);
130 /* now to find a dentry.
131 * If possible, get a well-connected one
// NOTE(review): mds_getattr passes NULL for `mnt`; any guard before this
// store is not visible in this view -- confirm that `mnt` is checked.
134 *mnt = mds->mds_vfsmnt;
// Walk the inode's alias list under dcache_lock and take the first dentry
// that is not marked disconnected, marking it as referenced.
135 spin_lock(&dcache_lock);
136 for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
137 result = list_entry(lp,struct dentry, d_alias);
138 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
140 result->d_vfs_flags |= DCACHE_REFERENCED;
141 spin_unlock(&dcache_lock);
// No suitable alias: drop the lock and allocate a dentry with d_alloc_root(),
// flagging it as disconnected.
148 spin_unlock(&dcache_lock);
149 result = d_alloc_root(inode);
150 if (result == NULL) {
152 return ERR_PTR(-ENOMEM);
156 result->d_flags |= DCACHE_NFSD_DISCONNECTED;
// mds_get_objid: copy sizeof(*id) bytes from the start of the inode's
// ext2-specific i_data area into *id.
// Reads inode->u.ext2_i, so it relies on the inode carrying ext2 private data.
160 static inline void mds_get_objid(struct inode *inode, __u64 *id)
162 memcpy(id, &inode->u.ext2_i.i_data, sizeof(*id));
// mds_getattr: build a reply carrying the attributes of the inode named by
// the request's fid1.
// NOTE(review): local declarations, braces, error checks and the return
// statements are not visible in this view.
165 int mds_getattr(struct ptlrpc_request *req)
// Allocate and lay out the reply buffers for this request.
172 rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
173 &req->rq_replen, &req->rq_repbuf);
// Presumably the failure branch of mds_pack_rep (the check is not visible).
176 CERROR("mds: out of memory\n");
177 req->rq_status = -ENOMEM;
// Echo the request xid in the reply header.
181 req->rq_rephdr->xid = req->rq_reqhdr->xid;
182 rep = req->rq_rep.mds;
// Resolve the fid; NULL is passed for the vfsmount out-parameter.
184 de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, NULL);
// Presumably reported when the lookup above failed (check not visible).
187 req->rq_rephdr->status = -ENOENT;
// Copy inode attributes into the reply.
// NOTE(review): the assignment of `inode` is not visible; presumably
// de->d_inode -- confirm.
192 rep->ino = inode->i_ino;
193 rep->atime = inode->i_atime;
194 rep->ctime = inode->i_ctime;
195 rep->mtime = inode->i_mtime;
196 rep->uid = inode->i_uid;
197 rep->gid = inode->i_gid;
198 rep->size = inode->i_size;
199 rep->mode = inode->i_mode;
200 rep->nlink = inode->i_nlink;
// objid is taken from the inode's ext2 i_data area via mds_get_objid().
202 mds_get_objid(inode, &rep->objid);
// mds_open: open the file named by the request's fid1 with the flags carried
// in the request, and return a handle for it in rep->objid.
// NOTE(review): local declarations, braces, error checks and the return
// statements are not visible in this view.
207 int mds_open(struct ptlrpc_request *req)
212 struct vfsmount *mnt;
// Allocate and lay out the reply buffers for this request.
216 rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
217 &req->rq_replen, &req->rq_repbuf);
// Presumably the failure branch of mds_pack_rep (the check is not visible).
220 CERROR("mds: out of memory\n");
221 req->rq_status = -ENOMEM;
// Echo the request xid in the reply header.
225 req->rq_rephdr->xid = req->rq_reqhdr->xid;
226 rep = req->rq_rep.mds;
// Resolve the fid to a dentry and obtain the vfsmount to open it on.
228 de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
// Presumably reported when the lookup above failed (check not visible).
231 req->rq_rephdr->status = -ENOENT;
234 flags = req->rq_req.mds->flags;
235 file = dentry_open(de, mnt, flags);
// Any open failure (NULL or error pointer) is reported as -EINVAL rather
// than the underlying error code.
236 if (!file || IS_ERR(file)) {
237 req->rq_rephdr->status = -EINVAL;
// The handle returned is the kernel address of the struct file, widened to
// 64 bits; mds_close casts the request's objid back to a struct file pointer.
// NOTE(review): this exposes a raw kernel pointer to the peer.
241 rep->objid = (__u64) (unsigned long)file;
242 //mds_get_objid(inode, &rep->objid);
// mds_close: close the struct file whose address is carried in the request's
// objid field and report filp_close()'s result as the reply status.
// NOTE(review): local declarations, braces, error checks and the return
// statements are not visible in this view.
247 int mds_close(struct ptlrpc_request *req)
252 struct vfsmount *mnt;
// Allocate and lay out the reply buffers for this request.
255 rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
256 &req->rq_replen, &req->rq_repbuf);
// Presumably the failure branch of mds_pack_rep (the check is not visible).
259 CERROR("mds: out of memory\n");
260 req->rq_status = -ENOMEM;
// Echo the request xid in the reply header.
264 req->rq_rephdr->xid = req->rq_reqhdr->xid;
265 rep = req->rq_rep.mds;
// NOTE(review): no use or release of `de` after this lookup is visible in
// this view -- confirm the reference is dropped.
267 de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
// Presumably reported when the lookup above failed (check not visible).
270 req->rq_rephdr->status = -ENOENT;
// The objid from the request is converted straight back to a kernel pointer.
// NOTE(review): no validation of this value is visible here.
274 file = (struct file *)(unsigned long) req->rq_req.mds->objid;
275 req->rq_rephdr->status = filp_close(file, 0);
// mds_readpage: open the object named by the request's fid1 read-only and
// send data from it to the requester via mds_sendpage().
// NOTE(review): local declarations, braces, error checks and the return
// statements are not visible in this view.
281 int mds_readpage(struct ptlrpc_request *req)
283 struct vfsmount *mnt;
286 struct niobuf *niobuf;
// Allocate and lay out the reply buffers for this request.
290 rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
291 &req->rq_replen, &req->rq_repbuf);
// Presumably the failure branch of mds_pack_rep (the check is not visible).
294 CERROR("mds: out of memory\n");
295 req->rq_status = -ENOMEM;
// Echo the request xid in the reply header.
299 req->rq_rephdr->xid = req->rq_reqhdr->xid;
300 rep = req->rq_rep.mds;
302 de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
// Lookup failure: the error encoded in the returned pointer becomes the
// reply status (the IS_ERR check itself is not visible).
305 req->rq_rephdr->status = PTR_ERR(de);
309 CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
311 file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
312 /* note: in case of an error, dentry_open puts dentry */
// Open failure: propagate the encoded error as the reply status.
315 req->rq_rephdr->status = PTR_ERR(file);
// The destination niobuf is taken from the request's target area.
319 niobuf = mds_req_tgt(req->rq_req.mds);
321 /* to make this asynchronous make sure that the handling function
322 doesn't send a reply when this function completes. Instead a
323 callback function would send the reply */
// The request's `size` field is passed as mds_sendpage's offset argument.
324 rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf);
327 req->rq_rephdr->status = rc;
// mds_reint: unpack the update record carried in the request's target area
// and hand it to mds_reint_rec().
// NOTE(review): the `rc` declaration, braces, the failure check and the
// return statements are not visible in this view.
332 int mds_reint(struct ptlrpc_request *req)
// Raw record bytes and their length, as supplied in the request.
335 char *buf = mds_req_tgt(req->rq_req.mds);
336 int len = req->rq_req.mds->tgtlen;
337 struct mds_update_record rec;
339 rc = mds_update_unpack(buf, len, &rec);
// Presumably the failure branch of mds_update_unpack (check not visible).
341 CERROR("invalid record\n");
342 req->rq_status = -EINVAL;
345 /* rc will be used to interrupt a for loop over multiple records */
346 rc = mds_reint_rec(&rec, req);
// mds_handle: entry point for an incoming MDS request.
// Checks the packet type, unpacks the request, dispatches on the opcode to
// the matching handler, then sends either an error or a normal reply.
// NOTE(review): the case labels, braces, several calls and the return
// statements are not visible in this view.
350 int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
351 struct ptlrpc_request *req)
354 struct ptlreq_hdr *hdr;
// The raw request buffer starts with the generic request header.
358 hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
// Only MDS_TYPE_REQ packets are accepted; the type is converted with
// NTOH__u32 before comparing.
360 if (NTOH__u32(hdr->type) != MDS_TYPE_REQ) {
361 CERROR("lustre_mds: wrong packet type sent %d\n",
362 NTOH__u32(hdr->type));
// Decode the request into rq_reqhdr and rq_req.
367 rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen,
368 &req->rq_reqhdr, &req->rq_req);
// Presumably the failure branch of mds_unpack_req (check not visible).
370 CERROR("lustre_mds: Invalid request\n");
// Dispatch on the opcode; the case labels are not visible in this view.
375 switch (req->rq_reqhdr->opc) {
378 CDEBUG(D_INODE, "getattr\n");
379 rc = mds_getattr(req);
383 CDEBUG(D_INODE, "readpage\n");
384 rc = mds_readpage(req);
// NOTE(review): the call made for the reint case is not visible here;
// presumably mds_reint(req).
388 CDEBUG(D_INODE, "reint\n");
// Presumably the default case for unknown opcodes -- confirm.
393 return ptlrpc_error(dev, svc, req);
// NOTE(review): the condition leading to this message is not visible.
398 CERROR("no header\n");
// A non-zero rq_status is answered with an error; otherwise the prepared
// reply is sent.
402 if( req->rq_status) {
403 ptlrpc_error(dev, svc, req);
405 CDEBUG(D_INODE, "sending reply\n");
406 ptlrpc_reply(dev, svc, req);
413 /* mount the file system (secretly) */
// mds_setup: mount the backing filesystem named in the ioctl data, record it
// in the device's mds_obd, then create, register and start the MDS service.
// NOTE(review): the rest of the parameter list, braces, error checks and the
// return statements are not visible in this view.
414 static int mds_setup(struct obd_device *obddev, obd_count len,
418 struct obd_ioctl_data* data = buf;
419 struct mds_obd *mds = &obddev->u.mds;
420 struct vfsmount *mnt;
// ioc_inlbuf2 is used as the filesystem type (it is also saved as mds_fstype
// below); ioc_inlbuf1 is presumably the device name -- confirm.
424 mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL);
// Record the superblock of the mounted root; obddev->u.mds is the same
// object as `mds`, so the check below tests the value just stored.
431 mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
432 if (!obddev->u.mds.mds_sb) {
437 mds->mds_vfsmnt = mnt;
438 obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2);
// mds_ctxt records the mount, its root dentry and KERNEL_DS; presumably the
// context used when the MDS performs filesystem operations -- confirm.
440 mds->mds_ctxt.pwdmnt = mnt;
441 mds->mds_ctxt.pwd = mnt->mnt_root;
442 mds->mds_ctxt.fs = KERNEL_DS;
// Create the request service; only the first argument (64 KiB) is visible.
444 mds->mds_service = ptlrpc_init_svc( 64 * 1024,
452 rpc_register_service(mds->mds_service, "self");
// Start the service thread, named "lustre_mds".
454 err = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
// Presumably the failure branch of ptlrpc_start_thread (check not visible).
456 CERROR("cannot start thread\n");
// mds_cleanup: tear down what mds_setup created: stop and unregister the
// service, free it, drop the mount reference and free the saved fstype.
// NOTE(review): braces, the return statements and any use of `sb` are not
// visible in this view.
465 static int mds_cleanup(struct obd_device * obddev)
467 struct super_block *sb;
468 struct mds_obd *mds = &obddev->u.mds;
// Device was never set up; what is returned here is not visible.
472 if ( !(obddev->obd_flags & OBD_SET_UP) ) {
// Cleanup is refused (with a log message) while clients remain attached;
// the value returned is not visible.
477 if ( !list_empty(&obddev->obd_gen_clients) ) {
478 CERROR("still has clients!\n");
483 ptlrpc_stop_thread(mds->mds_service);
484 rpc_unregister_service(mds->mds_service);
// Pending requests are only reported, not answered or freed (see XXX).
486 if (!list_empty(&mds->mds_service->srv_reqs)) {
487 // XXX reply with errors and clean up
488 CERROR("Request list not empty!\n");
// NOTE(review): rpc_unregister_service is called a second time on the same
// service here -- confirm whether this is intentional.
491 rpc_unregister_service(mds->mds_service);
492 OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
// Drop the reference on the mount recorded by mds_setup.
501 mntput(mds->mds_vfsmnt);
// Free the fstype string duplicated in mds_setup.
503 kfree(mds->mds_fstype);
511 /* use obd ops to offer management infrastructure */
// Operations table passed to obd_register_type() in mds_init.
// Uses the GNU `field: value` initializer form.
// NOTE(review): only the o_cleanup entry is visible in this view.
512 static struct obd_ops mds_obd_ops = {
514 o_cleanup: mds_cleanup,
// mds_init: module load hook; registers mds_obd_ops under LUSTRE_MDS_NAME.
// NOTE(review): the return statement is not visible in this view, so how the
// result of obd_register_type is handled cannot be told from here.
517 static int __init mds_init(void)
519 obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
// mds_exit: module unload hook; unregisters the LUSTRE_MDS_NAME obd type.
523 static void __exit mds_exit(void)
525 obd_unregister_type(LUSTRE_MDS_NAME);
// Module metadata, followed by the load and unload hooks for this module.
528 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
529 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
530 MODULE_LICENSE("GPL");
532 module_init(mds_init);
533 module_exit(mds_exit);