1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * Lustre Metadata Server (mds) request handler
8 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
10 * This code is issued under the GNU General Public License.
11 * See the file COPYING in this distribution
13 * by Peter Braam <braam@clusterfs.com>
15 * This server is single-threaded at present (but can easily be made multi-threaded).
21 #include <linux/version.h>
22 #include <linux/module.h>
24 #include <linux/stat.h>
25 #include <linux/locks.h>
26 #include <linux/ext2_fs.h>
27 #include <linux/quotaops.h>
28 #include <asm/unistd.h>
29 #include <asm/uaccess.h>
31 #define DEBUG_SUBSYSTEM S_MDS
33 #include <linux/obd_support.h>
34 #include <linux/obd_class.h>
35 #include <linux/obd.h>
36 #include <linux/lustre_lib.h>
37 #include <linux/lustre_idl.h>
38 #include <linux/lustre_mds.h>
39 #include <linux/lustre_net.h>
40 #include <linux/obd_class.h>
// mds_sendpage: read data from `file` at `offset` and deliver it to the peer
// that issued `req`.
// Two delivery paths are visible:
//   - req->rq_peer.peer_nid == 0: generic_file_read writes straight to the
//     address held in dst->addr.
//   - otherwise: PAGE_SIZE bytes are read into a temporary buffer and pushed
//     to the peer with ptlrpc_send_bulk on MDS_BULK_PORTAL.
// NOTE(review): several short lines (braces, returns, set_fs calls) are not
// visible in this listing; comments below describe only what is shown.
42 int mds_sendpage(struct ptlrpc_request *req, struct file *file,
43 __u64 offset, struct niobuf *dst)
// Remember the current address limit; presumably restored before returning
// (the restore is not visible here) - confirm.
46 mm_segment_t oldfs = get_fs();
// Direct path: the peer nid is 0, so read into the address carried by dst.
48 if (req->rq_peer.peer_nid == 0) {
49 /* dst->addr is a user address, but in a different task! */
51 rc = generic_file_read(file, (char *)(long)dst->addr,
// Bulk path: build a bulk descriptor addressed to the requesting peer.
58 struct ptlrpc_bulk_desc *bulk;
61 bulk = ptlrpc_prep_bulk(&req->rq_peer);
// Copy the xid of the request into the bulk descriptor.
65 bulk->b_xid = req->rq_xid;
// One page of scratch space for the file data.
67 OBD_ALLOC(buf, PAGE_SIZE);
// Presumably the failure path of the allocation above: the descriptor is
// released - confirm against the full source.
69 OBD_FREE(bulk, sizeof(*bulk));
74 rc = generic_file_read(file, buf, PAGE_SIZE, &offset);
// Anything other than a full page read releases the scratch buffer here.
77 if (rc != PAGE_SIZE) {
78 OBD_FREE(buf, PAGE_SIZE);
83 bulk->b_buflen = PAGE_SIZE;
// Start the bulk send, then sleep interruptibly until
// ptlrpc_check_bulk_sent(bulk) becomes true.
85 rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL);
86 wait_event_interruptible(bulk->b_waitq,
87 ptlrpc_check_bulk_sent(bulk));
// Interrupted wait: the existing FIXME records that resources leak on this
// path.
89 if (bulk->b_flags == PTL_RPC_INTR) {
91 /* FIXME: hey hey, we leak here. */
// Release the bulk descriptor and the page buffer.
95 OBD_FREE(bulk, sizeof(*bulk));
96 OBD_FREE(buf, PAGE_SIZE);
// mds_fid2dentry: translate a fid into a dentry on the filesystem of the MDS.
// The inode numbered fid->id is fetched with iget from mds->mds_sb. Bad
// inodes are rejected with ERR_PTR(-ESTALE). The alias list of the inode is
// then scanned under dcache_lock for a dentry that is not flagged
// DCACHE_NFSD_DISCONNECTED; if none qualifies, a new dentry is created with
// d_alloc_root and flagged DCACHE_NFSD_DISCONNECTED.
// mds->mds_vfsmnt is stored through mnt.
// Returns a dentry, or an ERR_PTR value (-ESTALE or -ENOMEM are visible).
// NOTE(review): mds_getattr passes NULL for mnt; confirm that the store
// through mnt is guarded (the guard, if any, is not visible in this listing).
102 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
103 struct vfsmount **mnt)
105 /* stolen from NFS */
106 struct super_block *sb = mds->mds_sb;
107 unsigned long ino = fid->id;
// Generation checking is switched off: generation is forced to 0, so the
// generation comparison further down can never reject an inode.
108 //__u32 generation = fid->generation;
109 __u32 generation = 0;
111 struct list_head *lp;
112 struct dentry *result;
115 return ERR_PTR(-ESTALE);
117 inode = iget(sb, ino);
// Presumably taken when iget yields no inode - confirm.
119 return ERR_PTR(-ENOMEM);
121 CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
// Reject inodes marked bad, or (when generation is nonzero) inodes whose
// i_generation differs from the requested one.
123 if (is_bad_inode(inode)
124 || (generation && inode->i_generation != generation)
126 /* we didn't find the right inode.. */
127 CERROR("bad inode %lu, link: %d ct: %d or version %u/%u\n",
129 inode->i_nlink, atomic_read(&inode->i_count),
133 return ERR_PTR(-ESTALE);
136 /* now to find a dentry.
137 * If possible, get a well-connected one
140 *mnt = mds->mds_vfsmnt;
// dcache_lock is held while the alias list of the inode is walked.
141 spin_lock(&dcache_lock);
142 for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
143 result = list_entry(lp,struct dentry, d_alias);
// A connected alias: mark it referenced and drop the lock (presumably it is
// then returned; the return is not visible here).
144 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
146 result->d_vfs_flags |= DCACHE_REFERENCED;
147 spin_unlock(&dcache_lock);
// No connected alias was found: allocate a fresh dentry for the inode.
154 spin_unlock(&dcache_lock);
155 result = d_alloc_root(inode);
156 if (result == NULL) {
158 return ERR_PTR(-ENOMEM);
// The freshly allocated dentry is marked as disconnected.
162 result->d_flags |= DCACHE_NFSD_DISCONNECTED;
// mds_get_objid: copy sizeof(*id) bytes, starting at the ext2-private i_data
// area of the inode, into *id.
// NOTE(review): this reads inode->u.ext2_i directly, so it assumes the inode
// belongs to an ext2 filesystem - confirm against callers.
166 static inline void mds_get_objid(struct inode *inode, __u64 *id)
168 memcpy(id, &inode->u.ext2_i.i_data, sizeof(*id));
// mds_getattr: serve a getattr request.
// Packs an empty reply with mds_pack_rep, resolves fid1 of the request to a
// dentry with mds_fid2dentry, and copies the attributes of the inode into the
// reply body.
171 int mds_getattr(struct ptlrpc_request *req)
// Build the reply header, body and buffer inside req.
178 rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
179 &req->rq_replen, &req->rq_repbuf);
// Packing failure is reported to the caller through req->rq_status.
182 CERROR("mds: out of memory\n");
183 req->rq_status = -ENOMEM;
// The reply carries the same xid as the request.
187 req->rq_rephdr->xid = req->rq_reqhdr->xid;
188 rep = req->rq_rep.mds;
// NULL is passed for mnt: no vfsmount is wanted here.
190 de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, NULL);
// Presumably the lookup-failure path: the reply status becomes -ENOENT.
193 req->rq_rephdr->status = -ENOENT;
// Copy inode attributes field by field into the reply.
198 rep->ino = inode->i_ino;
199 rep->atime = inode->i_atime;
200 rep->ctime = inode->i_ctime;
201 rep->mtime = inode->i_mtime;
202 rep->uid = inode->i_uid;
203 rep->gid = inode->i_gid;
204 rep->size = inode->i_size;
205 rep->mode = inode->i_mode;
206 rep->nlink = inode->i_nlink;
// The object id is taken from the ext2-private data of the inode.
208 mds_get_objid(inode, &rep->objid);
// mds_open: serve an open request.
// Packs a reply, resolves fid1 of the request to a dentry, opens it with
// dentry_open using the flags supplied in the request, and stores the
// resulting struct file pointer in rep->objid.
213 int mds_open(struct ptlrpc_request *req)
218 struct vfsmount *mnt;
// Build the reply header, body and buffer inside req.
222 rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
223 &req->rq_replen, &req->rq_repbuf);
// Packing failure is reported to the caller through req->rq_status.
226 CERROR("mds: out of memory\n");
227 req->rq_status = -ENOMEM;
// The reply carries the same xid as the request.
231 req->rq_rephdr->xid = req->rq_reqhdr->xid;
232 rep = req->rq_rep.mds;
234 de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
// Presumably the lookup-failure path: the reply status becomes -ENOENT.
237 req->rq_rephdr->status = -ENOENT;
// Open flags come straight from the request.
240 flags = req->rq_req.mds->flags;
241 file = dentry_open(de, mnt, flags);
// Both a NULL result and an ERR_PTR result are reported as -EINVAL.
242 if (!file || IS_ERR(file)) {
243 req->rq_rephdr->status = -EINVAL;
// The kernel address of the struct file is returned to the client as its
// handle; mds_close casts the value back to a struct file pointer.
247 rep->objid = (__u64) (unsigned long)file;
248 //mds_get_objid(inode, &rep->objid);
// mds_close: serve a close request.
// Packs a reply, resolves fid1 of the request to a dentry, then interprets
// the objid field of the request as a struct file pointer and closes it with
// filp_close. The filp_close result becomes the reply status.
// NOTE(review): objid arrives in the request and is cast to a pointer with no
// validation visible in this listing - confirm it cannot be forged.
253 int mds_close(struct ptlrpc_request *req)
258 struct vfsmount *mnt;
// Build the reply header, body and buffer inside req.
261 rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
262 &req->rq_replen, &req->rq_repbuf);
// Packing failure is reported to the caller through req->rq_status.
265 CERROR("mds: out of memory\n");
266 req->rq_status = -ENOMEM;
// The reply carries the same xid as the request.
270 req->rq_rephdr->xid = req->rq_reqhdr->xid;
271 rep = req->rq_rep.mds;
273 de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
// Presumably the lookup-failure path: the reply status becomes -ENOENT.
276 req->rq_rephdr->status = -ENOENT;
// Recover the struct file pointer that mds_open stored in objid.
280 file = (struct file *)(unsigned long) req->rq_req.mds->objid;
281 req->rq_rephdr->status = filp_close(file, 0);
// mds_readpage: serve a readpage request.
// Packs a reply, resolves fid1 of the request to a dentry, opens it read-only
// with O_LARGEFILE, and hands the file to mds_sendpage together with the
// niobuf found in the target area of the request. The result of mds_sendpage
// is stored as the reply status.
287 int mds_readpage(struct ptlrpc_request *req)
289 struct vfsmount *mnt;
292 struct niobuf *niobuf;
// Build the reply header, body and buffer inside req.
296 rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
297 &req->rq_replen, &req->rq_repbuf);
// Packing failure is reported to the caller through req->rq_status.
300 CERROR("mds: out of memory\n");
301 req->rq_status = -ENOMEM;
// The reply carries the same xid as the request.
305 req->rq_rephdr->xid = req->rq_reqhdr->xid;
306 rep = req->rq_rep.mds;
308 de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
// Presumably the lookup-failure path: the error encoded in de is forwarded.
311 req->rq_rephdr->status = PTR_ERR(de);
315 CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
317 file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
318 /* note: in case of an error, dentry_open puts dentry */
// Presumably the open-failure path: the error encoded in file is forwarded.
321 req->rq_rephdr->status = PTR_ERR(file);
// The destination buffer descriptor lives in the target area of the request.
325 niobuf = mds_req_tgt(req->rq_req.mds);
327 /* to make this asynchronous make sure that the handling function
328 doesn't send a reply when this function completes. Instead a
329 callback function would send the reply */
// The size field of the request is passed as the file offset to read from.
330 rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf);
333 req->rq_rephdr->status = rc;
// mds_reint: serve a reint request.
// Unpacks one update record from the target area of the request (tgtlen
// bytes) with mds_update_unpack and passes it to mds_reint_rec.
338 int mds_reint(struct ptlrpc_request *req)
341 char *buf = mds_req_tgt(req->rq_req.mds);
342 int len = req->rq_req.mds->tgtlen;
343 struct mds_update_record rec;
345 rc = mds_update_unpack(buf, len, &rec);
// An unpack failure is logged and reported as -EINVAL via req->rq_status.
347 CERROR("invalid record\n");
348 req->rq_status = -EINVAL;
351 /* rc will be used to interrupt a for loop over multiple records */
352 rc = mds_reint_rec(&rec, req);
// mds_handle: entry point for an incoming MDS request.
// Checks the packet type in the raw request header, unpacks the request with
// mds_unpack_req, dispatches on the opcode, and finally answers with either
// ptlrpc_error or ptlrpc_reply.
// NOTE(review): case labels and several short lines are not visible in this
// listing; opcode names are therefore not stated here.
356 int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
357 struct ptlrpc_request *req)
360 struct ptlreq_hdr *hdr;
// The header is read from the start of the raw request buffer.
364 hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
// Only packets whose type (after NTOH__u32) is MDS_TYPE_REQ are accepted.
366 if (NTOH__u32(hdr->type) != MDS_TYPE_REQ) {
367 CERROR("lustre_mds: wrong packet type sent %d\n",
368 NTOH__u32(hdr->type));
// Decode the request header and body into req.
373 rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen,
374 &req->rq_reqhdr, &req->rq_req);
376 CERROR("lustre_mds: Invalid request\n");
// Dispatch on the opcode of the unpacked request.
381 switch (req->rq_reqhdr->opc) {
384 CDEBUG(D_INODE, "getattr\n");
385 rc = mds_getattr(req);
389 CDEBUG(D_INODE, "readpage\n");
390 rc = mds_readpage(req);
// The handler call for this branch is not visible here (presumably mds_reint).
394 CDEBUG(D_INODE, "reint\n");
// Presumably the branch for unrecognised opcodes - confirm.
399 return ptlrpc_error(dev, svc, req);
404 CERROR("no header\n");
// A nonzero rq_status is answered with an error; the reply call below is
// presumably the alternative branch.
408 if( req->rq_status) {
409 ptlrpc_error(dev, svc, req);
411 CDEBUG(D_INODE, "sending reply\n");
412 ptlrpc_reply(dev, svc, req);
419 /* mount the file system (secretly) */
// mds_setup: bring up the MDS device.
// Mounts the backing filesystem with do_kern_mount (filesystem type taken
// from ioc_inlbuf2; ioc_inlbuf1 is passed as the name argument, presumably
// the device), records the superblock, vfsmount and a copy of the type name
// in the mds_obd, fills in mds_ctxt, creates the RPC service, registers it
// and starts the service thread.
420 static int mds_setup(struct obd_device *obddev, obd_count len,
424 struct obd_ioctl_data* data = buf;
425 struct mds_obd *mds = &obddev->u.mds;
426 struct vfsmount *mnt;
430 mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL);
// The superblock is taken from the root inode of the new mount.
437 mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
// Same field as the one assigned just above (mds is &obddev->u.mds).
438 if (!obddev->u.mds.mds_sb) {
443 mds->mds_vfsmnt = mnt;
// Private copy of the filesystem type name; mds_cleanup releases it with
// kfree.
444 obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2);
// Context rooted at the new mount, with the KERNEL_DS address limit.
446 mds->mds_ctxt.pwdmnt = mnt;
447 mds->mds_ctxt.pwd = mnt->mnt_root;
448 mds->mds_ctxt.fs = KERNEL_DS;
// Create the service; only the first argument (64 KiB) is visible here.
450 mds->mds_service = ptlrpc_init_svc( 64 * 1024,
458 rpc_register_service(mds->mds_service, "self");
// Start the thread that serves requests for this device.
460 err = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
462 CERROR("cannot start thread\n");
// mds_cleanup: tear down what mds_setup created.
// Visible steps: check that the device is set up and has no clients, stop the
// service thread, unregister and free the service, drop the mount reference
// with mntput, and free the stored filesystem type name.
// NOTE(review): rpc_unregister_service is called twice on the same service
// (once right after ptlrpc_stop_thread and once before OBD_FREE); confirm the
// second call is intended.
471 static int mds_cleanup(struct obd_device * obddev)
473 struct super_block *sb;
474 struct mds_obd *mds = &obddev->u.mds;
// Device was never set up; the action taken is not visible in this listing.
478 if ( !(obddev->obd_flags & OBD_SET_UP) ) {
// Remaining clients are reported; the return taken here is not visible.
483 if ( !list_empty(&obddev->obd_gen_clients) ) {
484 CERROR("still has clients!\n");
489 ptlrpc_stop_thread(mds->mds_service);
490 rpc_unregister_service(mds->mds_service);
// Requests still queued on the service are only logged, not answered.
492 if (!list_empty(&mds->mds_service->srv_reqs)) {
493 // XXX reply with errors and clean up
494 CERROR("Request list not empty!\n");
497 rpc_unregister_service(mds->mds_service);
498 OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
// Drop the reference on the mount recorded by mds_setup.
507 mntput(mds->mds_vfsmnt);
// Releases the string that mds_setup created with strdup.
509 kfree(mds->mds_fstype);
517 /* use obd ops to offer management infrastructure */
// Operations table passed to obd_register_type by mds_init.
// Only the o_cleanup entry is visible in this listing; it uses the GNU
// label-colon initializer form.
518 static struct obd_ops mds_obd_ops = {
520 o_cleanup: mds_cleanup,
// mds_init: module load hook; registers mds_obd_ops as an obd type under the
// name LUSTRE_MDS_NAME.
523 static int __init mds_init(void)
525 obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
// mds_exit: module unload hook; unregisters the obd type named
// LUSTRE_MDS_NAME.
529 static void __exit mds_exit(void)
531 obd_unregister_type(LUSTRE_MDS_NAME);
// Module metadata (author, description, licence).
534 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
535 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
536 MODULE_LICENSE("GPL");
// Wire mds_init and mds_exit in as the module load and unload entry points.
538 module_init(mds_init);
539 module_exit(mds_exit);