1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * Lustre Metadata Server (mds) request handler
8 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
10 * This code is issued under the GNU General Public License.
11 * See the file COPYING in this distribution
13 * by Peter Braam <braam@clusterfs.com>
15 * This server is single threaded at present (but can easily be multi threaded)
20 #define DEBUG_SUBSYSTEM S_MDS
22 #include <linux/module.h>
23 #include <linux/lustre_mds.h>
/*
 * mds_sendpage - send one PAGE_SIZE buffer of file data to the requester as
 * a bulk transfer.
 *
 * Visible steps: prepare a bulk descriptor on req->rq_connection, attach one
 * bulk page to it, allocate a PAGE_SIZE buffer, fill it with
 * mds_fs_readpage(), tag the bulk page with the request xid and buffer
 * length, select MDS_BULK_PORTAL on the descriptor and call
 * ptlrpc_send_bulk().
 *
 * @req:    request being served; supplies the connection, the obd and the xid.
 * @file:   open file handed to mds_fs_readpage().
 * @offset: presumably the read offset -- the tail of the mds_fs_readpage()
 *          call is not part of this listing, TODO confirm.
 *
 * The GOTO() calls set rc to -ENOMEM (descriptor, bulk page or buffer setup)
 * or -EIO (read) and name the out / cleanup_bulk / cleanup_buf exits.  The
 * conditions guarding them, the labels themselves and the final return are
 * on source lines that this listing does not contain.
 */
26 int mds_sendpage(struct ptlrpc_request *req, struct file *file, __u64 offset)
/* Current address limit is captured here; where it is restored is not
 * visible in this listing. */
29 mm_segment_t oldfs = get_fs();
30 struct ptlrpc_bulk_desc *desc;
31 struct ptlrpc_bulk_page *bulk;
35 desc = ptlrpc_prep_bulk(req->rq_connection);
37 GOTO(out, rc = -ENOMEM);
39 bulk = ptlrpc_prep_bulk_page(desc);
41 GOTO(cleanup_bulk, rc = -ENOMEM);
43 OBD_ALLOC(buf, PAGE_SIZE);
45 GOTO(cleanup_bulk, rc = -ENOMEM);
48 rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
53 GOTO(cleanup_buf, rc = -EIO);
/* The bulk page carries the xid of the request it answers. */
55 bulk->b_xid = req->rq_reqmsg->xid;
57 bulk->b_buflen = PAGE_SIZE;
58 desc->b_portal = MDS_BULK_PORTAL;
60 rc = ptlrpc_send_bulk(desc);
/* Fault-injection hook: when OBD_FAIL_MDS_SENDPAGE is armed, log the
 * send result, abort the bulk and take the cleanup_buf exit. */
61 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
62 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
63 OBD_FAIL_MDS_SENDPAGE, rc);
64 ptlrpc_abort_bulk(desc);
65 GOTO(cleanup_buf, rc);
/* Cleanup: release the page buffer, then the bulk descriptor. */
70 OBD_FREE(buf, PAGE_SIZE);
72 ptlrpc_free_bulk(desc);
/*
 * mds_fid2dentry - translate a fid (inode number + generation) into a dentry
 * on the MDS backing filesystem.
 *
 * @mds: MDS device state; supplies the superblock (mds_sb) and the vfsmount.
 * @fid: fid->id is used as the inode number, fid->generation as the expected
 *       inode generation.
 * @mnt: out parameter that receives mds->mds_vfsmnt.  mds_getattr() in this
 *       file passes NULL here; whether the store is guarded is decided on a
 *       line missing from this listing -- TODO confirm.
 *
 * Flow visible here:
 *   - the inode is fetched with iget(); -ESTALE and -ENOMEM are returned as
 *     ERR_PTR values on early failures (their conditions are not shown);
 *   - a bad inode, or a generation mismatch when the fid generation is
 *     non-zero, is logged and rejected with ERR_PTR(-ESTALE);
 *   - under dcache_lock the inode's alias list is walked and the first alias
 *     without DCACHE_NFSD_DISCONNECTED is marked DCACHE_REFERENCED;
 *   - otherwise a new dentry is created with d_alloc_root() and flagged
 *     DCACHE_NFSD_DISCONNECTED; allocation failure yields ERR_PTR(-ENOMEM).
 *
 * Reference counting of the inode/dentry (iput/dget) is not visible in this
 * listing and is therefore not described.
 */
77 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
78 struct vfsmount **mnt)
81 struct super_block *sb = mds->mds_sb;
82 unsigned long ino = fid->id;
83 __u32 generation = fid->generation;
86 struct dentry *result;
89 RETURN(ERR_PTR(-ESTALE));
91 inode = iget(sb, ino);
93 RETURN(ERR_PTR(-ENOMEM));
95 CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
/* A fid generation of 0 disables the generation comparison. */
97 if (is_bad_inode(inode) ||
98 (generation && inode->i_generation != generation)) {
99 /* we didn't find the right inode.. */
100 CERROR("bad inode %lu, link: %d ct: %d or version %u/%u\n",
101 inode->i_ino, inode->i_nlink,
102 atomic_read(&inode->i_count), inode->i_generation,
106 RETURN(ERR_PTR(-ESTALE));
109 /* now to find a dentry.
110 * If possible, get a well-connected one
113 *mnt = mds->mds_vfsmnt;
114 spin_lock(&dcache_lock);
115 for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
116 result = list_entry(lp,struct dentry, d_alias);
117 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
119 result->d_vfs_flags |= DCACHE_REFERENCED;
120 spin_unlock(&dcache_lock);
127 spin_unlock(&dcache_lock);
128 result = d_alloc_root(inode);
129 if (result == NULL) {
131 return ERR_PTR(-ENOMEM);
135 result->d_flags |= DCACHE_NFSD_DISCONNECTED;
/*
 * mds_connect - handle a client connect request.
 *
 * Packs a reply holding one struct mds_body, copies the MDS root fid into
 * the reply's fid1 and looks up per-client state by the requester's UUID
 * with mds_uuid_to_mci().  When no state exists a new struct mds_client_data
 * is allocated and registered through mds_client_add(mds, mcd, -1);
 * otherwise the existing record is reported.  The reply's last_xid is filled
 * from mcd->mcd_last_xid before mds_pack_rep_body() is called.
 *
 * @req: the connect request; req->rq_status is set to -ENOMEM when the reply
 *       cannot be packed or the client data cannot be allocated.
 *
 * The branch conditions and return statements are on lines missing from
 * this listing.
 */
140 int mds_connect(struct ptlrpc_request *req)
142 struct mds_body *body;
143 struct mds_obd *mds = &req->rq_obd->u.mds;
144 struct mds_client_info *mci;
145 struct mds_client_data *mcd;
146 int rc, size = sizeof(*body);
149 CDEBUG(D_INFO, "MDS connect from UUID '%s'\n", ptlrpc_req_to_uuid(req));
150 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
/* OBD_FAIL_MDS_CONNECT_PACK lets tests force this failure path. */
151 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CONNECT_PACK)) {
152 CERROR("mds: out of memory for message: size=%d\n", size);
153 req->rq_status = -ENOMEM;
/* body first points at request buffer 0; it is re-pointed at the reply
 * buffer a few lines further down. */
157 body = lustre_msg_buf(req->rq_reqmsg, 0);
158 mds_unpack_req_body(req);
159 /* Anything we need to do here with the client's trans no or so? */
161 body = lustre_msg_buf(req->rq_repmsg, 0);
/* Tell the client which fid is the filesystem root. */
162 memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
164 mci = mds_uuid_to_mci(mds, ptlrpc_req_to_uuid(req));
166 /* We don't have any old connection data for this client */
169 CDEBUG(D_INFO, "allocating new client data for UUID '%s'",
170 ptlrpc_req_to_uuid(req));
172 OBD_ALLOC(mcd, sizeof(*mcd));
174 CERROR("mds: out of memory for client data\n");
175 req->rq_status = -ENOMEM;
178 rc = mds_client_add(mds, mcd, -1);
184 /* We have old connection data for this client... */
186 CDEBUG(D_INFO, "found existing data for UUID '%s' at #%d\n",
187 mcd->mcd_uuid, mci->mci_off);
189 /* mcd_last_xid is stored in little endian on the disk and
190 mds_pack_rep_body converts it to network order */
191 body->last_xid = le32_to_cpu(mcd->mcd_last_xid);
192 mds_pack_rep_body(req);
/*
 * mds_getattr - reply with the attributes of the inode named by the
 * request's fid1.
 *
 * The dentry is resolved with mds_fid2dentry() (mnt argument NULL);
 * req->rq_status becomes -ENOENT when that fails.  The reply always carries
 * one struct mds_body.  When the request's valid mask has OBD_MD_LINKNAME
 * set, a second reply buffer of inode->i_size bytes is sized and filled
 * through the inode's readlink method.  The body is then populated from the
 * inode (ino, generation, times, uid/gid, size, mode, nlink) and, for
 * regular files, the object id is fetched with mds_fs_get_objid().
 *
 * @req: the getattr request; req->rq_status is set to -ENOMEM when the
 *       reply cannot be packed.
 *
 * Branch conditions, the assignment of `de`/`inode` and the return
 * statements are on lines missing from this listing.
 *
 * Fix in this revision: the error logged after mds_fs_get_objid() used to
 * read "readlink failed", a copy of the message used for the readlink call
 * above; it now names the call that actually failed.
 */
197 int mds_getattr(struct ptlrpc_request *req)
201 struct mds_body *body;
202 struct mds_obd *mds = &req->rq_obd->u.mds;
/* size[0] is the body; size[1] stays 0 unless a link name is requested. */
203 int rc, size[2] = {sizeof(*body)}, count = 1;
206 body = lustre_msg_buf(req->rq_reqmsg, 0);
207 de = mds_fid2dentry(mds, &body->fid1, NULL);
209 req->rq_status = -ENOENT;
213 if (body->valid & OBD_MD_LINKNAME) {
215 size[1] = inode->i_size;
218 rc = lustre_pack_msg(count, size, NULL, &req->rq_replen,
220 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
221 CERROR("mds: out of memory\n");
222 req->rq_status = -ENOMEM;
/* body still refers to the request here, so this tests the client's mask. */
226 if (body->valid & OBD_MD_LINKNAME) {
227 char *tmp = lustre_msg_buf(req->rq_repmsg, 1);
232 rc = inode->i_op->readlink(de, tmp, size[1]);
237 CERROR("readlink failed: %d\n", rc);
/* From here on body refers to the reply. */
242 body = lustre_msg_buf(req->rq_repmsg, 0);
243 body->ino = inode->i_ino;
244 body->generation = inode->i_generation;
245 body->atime = inode->i_atime;
246 body->ctime = inode->i_ctime;
247 body->mtime = inode->i_mtime;
248 body->uid = inode->i_uid;
249 body->gid = inode->i_gid;
250 body->size = inode->i_size;
251 body->mode = inode->i_mode;
252 body->nlink = inode->i_nlink;
253 if (S_ISREG(inode->i_mode)) {
254 rc = mds_fs_get_objid(mds, inode, &body->objid);
257 CERROR("mds_fs_get_objid failed: %d\n", rc);
261 body->valid = ~0; /* FIXME: should be more selective */
/*
 * mds_open - open the file named by the request's fid1 on behalf of a
 * client.
 *
 * Visible steps: pack a one-body reply; find the client's state with
 * mds_uuid_to_mci() (req->rq_status = -ENOTCONN when the "no mci" error is
 * logged); scan the client's open list for an entry whose client fd and
 * inode number match the request and log "Re opening" for it; allocate a
 * struct mds_file_data; resolve the dentry and open it with dentry_open();
 * link the file and the mds_file_data to each other, remember the client's
 * objid as mfd_clientfd and add the record to mci->mci_open_head.
 *
 * The reply's objid is the address of the struct file converted to __u64;
 * mds_close() converts that value back into a pointer.
 *
 * @req: the open request.  req->rq_status is set to -ENOMEM (reply or mfd
 *       allocation), -ENOTCONN, -ENOENT (fid lookup) or -EINVAL (open).
 *
 * NOTE(review): on the mds_fid2dentry() failure path no OBD_FREE(mfd) is
 * visible, unlike the dentry_open() failure path -- possible leak; the
 * surrounding lines are missing from this listing, confirm against the full
 * source.
 * NOTE(review): handing a kernel address to the client as a handle exposes
 * a pointer value to the network -- confirm this is intended.
 */
268 int mds_open(struct ptlrpc_request *req)
270 struct mds_obd *mds = &req->rq_obd->u.mds;
272 struct mds_body *body;
274 struct vfsmount *mnt;
275 struct mds_client_info *mci;
277 struct list_head *tmp;
278 struct mds_file_data *mfd;
279 int rc, size = sizeof(*body);
283 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
284 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
285 CERROR("mds: out of memory\n");
286 req->rq_status = -ENOMEM;
291 mci = mds_uuid_to_mci(mds, ptlrpc_req_to_uuid(req));
293 CERROR("mds: no mci!\n");
294 req->rq_status = -ENOTCONN;
298 body = lustre_msg_buf(req->rq_reqmsg, 0);
300 /* was this animal open already? */
301 list_for_each(tmp, &mci->mci_open_head) {
302 struct mds_file_data *fd;
303 fd = list_entry(tmp, struct mds_file_data, mfd_list);
/* Match on both the client-side handle and the inode number. */
304 if (body->objid == fd->mfd_clientfd &&
305 body->fid1.id == fd->mfd_file->f_dentry->d_inode->i_ino) {
306 CERROR("Re opening %Ld\n", body->fid1.id);
311 OBD_ALLOC(mfd, sizeof(*mfd));
313 CERROR("mds: out of memory\n");
314 req->rq_status = -ENOMEM;
318 de = mds_fid2dentry(mds, &body->fid1, &mnt);
320 req->rq_status = -ENOENT;
/* `flags` is set on a line that is not part of this listing. */
325 file = dentry_open(de, mnt, flags);
326 if (!file || IS_ERR(file)) {
327 req->rq_status = -EINVAL;
328 OBD_FREE(mfd, sizeof(*mfd));
/* Cross-link the open file and its bookkeeping record. */
332 file->private_data = mfd;
333 mfd->mfd_file = file;
334 mfd->mfd_clientfd = body->objid;
335 list_add(&mfd->mfd_list, &mci->mci_open_head);
/* The server-side handle returned to the client is the file pointer. */
337 body = lustre_msg_buf(req->rq_repmsg, 0);
338 body->objid = (__u64) (unsigned long)file;
/*
 * mds_close - close a file previously opened through mds_open().
 *
 * Packs an empty reply (zero buffers), resolves the request's fid1 with
 * mds_fid2dentry() (req->rq_status = -ENOENT on failure), converts the
 * request's objid back into a struct file pointer, unlinks and frees the
 * struct mds_file_data stored in file->private_data, and stores the result
 * of filp_close() in req->rq_status.
 *
 * @req: the close request; req->rq_status is -ENOMEM when the reply cannot
 *       be packed.
 *
 * NOTE(review): body->objid comes from the request message and is cast
 * straight to a kernel pointer and dereferenced; no validation against the
 * client's open list is visible in this listing (two lines between the cast
 * and the dereference are missing) -- confirm against the full source.
 */
343 int mds_close(struct ptlrpc_request *req)
346 struct mds_body *body;
348 struct vfsmount *mnt;
349 struct mds_file_data *mfd;
353 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
354 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
355 CERROR("mds: out of memory\n");
356 req->rq_status = -ENOMEM;
360 body = lustre_msg_buf(req->rq_reqmsg, 0);
361 de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
363 req->rq_status = -ENOENT;
/* Inverse of the pointer-to-objid conversion done in mds_open(). */
367 file = (struct file *)(unsigned long)body->objid;
370 mfd = (struct mds_file_data *)file->private_data;
371 list_del(&mfd->mfd_list);
372 OBD_FREE(mfd, sizeof(*mfd));
374 req->rq_status = filp_close(file, 0);
/*
 * mds_readpage - serve a readpage request by bulk-sending one page.
 *
 * Packs a one-body reply, resolves the request's fid1 to a dentry, opens it
 * read-only with O_LARGEFILE and passes the open file to mds_sendpage(),
 * using the request body's `size` field as the offset argument.
 *
 * @req: the readpage request.  req->rq_status is set to -ENOMEM when the
 *       reply cannot be packed, to PTR_ERR(de) when the fid lookup fails and
 *       to PTR_ERR(file) when the open fails.
 *
 * The failure conditions, the handling of mds_sendpage()'s result and the
 * closing of `file` are on lines missing from this listing.
 */
381 int mds_readpage(struct ptlrpc_request *req)
383 struct vfsmount *mnt;
386 struct mds_body *body;
387 int rc, size = sizeof(*body);
390 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
391 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
392 CERROR("mds: out of memory\n");
393 req->rq_status = -ENOMEM;
397 body = lustre_msg_buf(req->rq_reqmsg, 0);
398 de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
400 req->rq_status = PTR_ERR(de);
404 CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
406 file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
407 /* note: in case of an error, dentry_open puts dentry */
409 req->rq_status = PTR_ERR(file);
413 /* to make this asynchronous make sure that the handling function
414 doesn't send a reply when this function completes. Instead a
415 callback function would send the reply */
/* The request's size field doubles as the offset to read from. */
416 rc = mds_sendpage(req, file, body->size);
/*
 * mds_reint - handle a reintegration (update) request.
 *
 * Unpacks the request into a struct mds_update_record with
 * mds_update_unpack() and hands the record to mds_reint_rec().
 *
 * @req: the reint request; req->rq_status is set to -EINVAL when unpacking
 *       fails or the OBD_FAIL_MDS_REINT_UNPACK fault point is armed.
 *
 * The return statements are on lines missing from this listing.
 */
423 int mds_reint(struct ptlrpc_request *req)
426 struct mds_update_record rec;
428 rc = mds_update_unpack(req, &rec);
429 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
430 CERROR("invalid record\n");
431 req->rq_status = -EINVAL;
434 /* rc will be used to interrupt a for loop over multiple records */
435 rc = mds_reint_rec(&rec, req);
/*
 * mds_handle - top-level request handler registered with the MDS service
 * (it is the handler passed to ptlrpc_init_svc() in mds_setup()).
 *
 * Unpacks the incoming message, rejects anything whose type is not
 * PTL_RPC_MSG_REQUEST with -EINVAL, then switches on the request opcode.
 * Each visible case logs a D_INODE trace, passes an OBD_FAIL_*_NET fault
 * point and calls the matching handler (mds_connect, mds_getattr,
 * mds_readpage are visible; the reint, open and close calls are on lines
 * missing from this listing).  An unhandled opcode is answered with
 * ptlrpc_error().
 *
 * After the switch the reply message is stamped with the server's last_rcvd
 * and last_committed values in network byte order, and the reply is sent
 * with ptlrpc_reply() -- or ptlrpc_error() on the error branch, whose
 * condition is not visible here.
 *
 * @dev: obd device the service belongs to (not referenced on visible lines).
 * @svc: service used for sending the reply or error.
 * @req: the request to dispatch.
 *
 * Fix in this revision: the debug trace for the connect opcode printed
 * "getattr" (copy-paste from the getattr case); it now prints "connect" so
 * traces identify the operation that is actually running.
 */
439 int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
440 struct ptlrpc_request *req)
442 struct mds_obd *mds = &req->rq_obd->u.mds;
446 rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
447 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
448 CERROR("lustre_mds: Invalid request\n");
/* Only request-type messages are dispatched. */
452 if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
453 CERROR("lustre_mds: wrong packet type sent %d\n",
454 req->rq_reqmsg->type);
455 GOTO(out, rc = -EINVAL);
/* Opcode dispatch; the case labels are on lines missing from this listing. */
458 switch (req->rq_reqmsg->opc) {
460 CDEBUG(D_INODE, "connect\n");
461 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
462 rc = mds_connect(req);
466 CDEBUG(D_INODE, "getattr\n");
467 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
468 rc = mds_getattr(req);
472 CDEBUG(D_INODE, "readpage\n");
473 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
474 rc = mds_readpage(req);
476 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
481 CDEBUG(D_INODE, "reint\n");
482 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
484 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET_REP, 0);
488 CDEBUG(D_INODE, "open\n");
489 OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
494 CDEBUG(D_INODE, "close\n");
495 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
500 rc = ptlrpc_error(svc, req);
506 /* Still not 100% sure whether we should reply with the server
507 * last_rcvd or that of this client. I'm not sure it even makes
508 * a difference on a per-client basis, because last_rcvd is global
509 * and we are not supposed to allow transactions while in recovery.
511 req->rq_repmsg->last_rcvd = HTON__u64(mds->mds_last_rcvd);
512 req->rq_repmsg->last_committed = HTON__u64(mds->mds_last_committed);
513 CDEBUG(D_INFO, "last_rcvd %Lu, last_committed %Lu, xid %d\n",
514 (unsigned long long)mds->mds_last_rcvd,
515 (unsigned long long)mds->mds_last_committed,
516 cpu_to_le32(req->rq_reqmsg->xid));
518 ptlrpc_error(svc, req);
520 CDEBUG(D_NET, "sending reply\n");
521 ptlrpc_reply(svc, req);
527 /* Update the server data on disk. This stores the new mount_count and
528 * also the last_rcvd value to disk. If we don't have a clean shutdown,
529 * then the server last_rcvd value may be less than that of the clients.
530 * This will alert us that we may need to do client recovery.
/*
 * mds_update_server_data - write the server record to mds->mds_rcvd_filp
 * and flush it.
 *
 * Copies mds_last_rcvd and mds_mount_count into mds->mds_server_data as
 * little-endian 64-bit values, writes the whole record with lustre_fwrite()
 * inside the MDS run context (push_ctxt), and then calls fsync_dev() on the
 * device of the file's inode.  A short write or a flush failure is logged.
 *
 * @mds: MDS device state holding the record, the file and the counters.
 *
 * The declaration and initial value of `off`, the matching context pop and
 * the return statements are on lines missing from this listing.
 */
533 int mds_update_server_data(struct mds_obd *mds)
535 struct obd_run_ctxt saved;
536 struct mds_server_data *msd = mds->mds_server_data;
537 struct file *filp = mds->mds_rcvd_filp;
/* The on-disk record is kept little-endian. */
541 msd->msd_last_rcvd = cpu_to_le64(mds->mds_last_rcvd);
542 msd->msd_mount_count = cpu_to_le64(mds->mds_mount_count);
544 CDEBUG(D_SUPER, "MDS mount_count is %Lu, last_rcvd is %Lu\n",
545 (unsigned long long)mds->mds_mount_count,
546 (unsigned long long)mds->mds_last_rcvd);
547 push_ctxt(&saved, &mds->mds_ctxt);
548 rc = lustre_fwrite(filp, (char *)msd, sizeof(*msd), &off);
/* Anything other than a full-record write is treated as an error. */
549 if (rc != sizeof(*msd)) {
550 CERROR("error writing MDS server data: rc = %d\n", rc);
555 rc = fsync_dev(filp->f_dentry->d_inode->i_rdev);
558 CERROR("error flushing MDS server data: rc = %d\n", rc);
563 /* Do recovery actions for the MDS */
/*
 * Visible behaviour: bump mds_mount_count and persist it through
 * mds_update_server_data().  Called from mds_setup() after the service
 * thread has been started.  The return statement is on a line missing from
 * this listing.
 *
 * @obddev: device whose embedded MDS state (u.mds) is updated.
 */
564 static int mds_recover(struct obd_device *obddev)
566 struct mds_obd *mds = &obddev->u.mds;
569 /* This happens at the end when recovery is complete */
570 ++mds->mds_mount_count;
571 rc = mds_update_server_data(mds);
576 /* mount the file system (secretly) */
/*
 * mds_setup - bring up an MDS device.
 *
 * @obddev: device being configured; its u.mds member is filled in.
 * @len:    length argument of the setup call (not referenced on visible
 *          lines).
 * @buf:    interpreted as a struct obd_ioctl_data; ioc_inlbuf1 is passed to
 *          do_kern_mount() as the device name and ioc_inlbuf2 as the
 *          filesystem type.  Both must be non-NULL, otherwise -EINVAL.
 *
 * Visible sequence: duplicate the filesystem type name, mount the backing
 * filesystem with do_kern_mount(), record its superblock, run
 * mds_fs_setup(), create the request service with mds_handle as handler,
 * start the "lustre_mds" thread and finally run mds_recover().
 *
 * The tail of the function is the unwind path: stop the service threads,
 * unregister and free the service, drop the mount and free the type name.
 * The err_* labels and the conditions guarding each GOTO are on lines
 * missing from this listing.
 *
 * NOTE(review): mds_fstype is obtained from strdup() but released with
 * kfree() -- confirm that the project's strdup() allocates with kmalloc().
 */
577 static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
579 struct obd_ioctl_data* data = buf;
580 struct mds_obd *mds = &obddev->u.mds;
581 struct vfsmount *mnt;
586 #ifdef CONFIG_DEV_RDONLY
589 if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
590 GOTO(err_dec, rc = -EINVAL);
592 mds->mds_fstype = strdup(data->ioc_inlbuf2);
594 mnt = do_kern_mount(mds->mds_fstype, 0, data->ioc_inlbuf1, NULL);
597 CERROR("do_kern_mount failed: rc = %d\n", rc);
601 mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
603 GOTO(err_put, rc = -ENODEV);
605 rc = mds_fs_setup(mds, mnt);
607 CERROR("MDS filesystem method init failed: rc = %d\n", rc);
/* Request service: 128 KiB first argument, MDS request portal in, MDC
 * reply portal out, mds_handle as the dispatcher. */
611 mds->mds_service = ptlrpc_init_svc(128 * 1024,
612 MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, "self", mds_handle);
613 if (!mds->mds_service) {
614 CERROR("failed to start service\n");
615 GOTO(err_fs, rc = -EINVAL);
618 rc = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
620 CERROR("cannot start thread: rc = %d\n", rc);
624 rc = mds_recover(obddev);
626 GOTO(err_thread, rc);
/* Unwind path, in reverse order of setup. */
631 ptlrpc_stop_all_threads(mds->mds_service);
633 rpc_unregister_service(mds->mds_service);
634 OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
639 mntput(mds->mds_vfsmnt);
643 kfree(mds->mds_fstype);
/*
 * mds_cleanup - tear down an MDS device (installed as o_cleanup in
 * mds_obd_ops).
 *
 * Logs "still has clients!" when obd_gen_clients is not empty (what happens
 * next is on a line missing from this listing).  Otherwise the visible
 * sequence is: stop the service threads, unregister the service, complain
 * if requests are still queued, free the service, write the server data one
 * last time, close the mds_rcvd_filp file if it is open, drop the mount and
 * free the filesystem type name.
 *
 * @obddev: device being torn down.
 *
 * The result of the final mds_update_server_data() call is not checked on
 * the visible line.
 */
649 static int mds_cleanup(struct obd_device * obddev)
651 struct super_block *sb;
652 struct mds_obd *mds = &obddev->u.mds;
656 if ( !list_empty(&obddev->obd_gen_clients) ) {
657 CERROR("still has clients!\n");
661 ptlrpc_stop_all_threads(mds->mds_service);
662 rpc_unregister_service(mds->mds_service);
663 if (!list_empty(&mds->mds_service->srv_reqs)) {
664 // XXX reply with errors and clean up
665 CERROR("Request list not empty!\n");
667 OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
/* Persist last_rcvd/mount_count before the backing file is closed. */
673 mds_update_server_data(mds);
675 if (mds->mds_rcvd_filp) {
676 int rc = filp_close(mds->mds_rcvd_filp, 0);
/* Clear the pointer so nothing reuses the closed file. */
677 mds->mds_rcvd_filp = NULL;
680 CERROR("last_rcvd file won't close, rc=%d\n", rc);
684 mntput(mds->mds_vfsmnt);
686 kfree(mds->mds_fstype);
688 #ifdef CONFIG_DEV_RDONLY
697 /* use obd ops to offer management infrastructure */
/*
 * Method table handed to obd_register_type() in mds_init().  Members use
 * the GNU "field: value" initializer syntax.  Only o_cleanup is visible in
 * this listing; the remaining members and the closing brace are on lines
 * that are missing.
 */
698 static struct obd_ops mds_obd_ops = {
700 o_cleanup: mds_cleanup,
/*
 * Module entry point: registers the MDS obd type (mds_obd_ops) under
 * LUSTRE_MDS_NAME.  The result of obd_register_type() is not used on the
 * visible line; the return statement is on a line missing from this listing.
 */
703 static int __init mds_init(void)
705 obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
/* Module exit point: unregisters the obd type registered by mds_init(). */
709 static void __exit mds_exit(void)
711 obd_unregister_type(LUSTRE_MDS_NAME);
/* Module metadata (author, description, license). */
714 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
715 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
716 MODULE_LICENSE("GPL");
/* Hook mds_init()/mds_exit() up as the module's load and unload handlers. */
718 module_init(mds_init);
719 module_exit(mds_exit);