1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * Lustre Metadata Server (mds) request handler
8 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
10 * This code is issued under the GNU General Public License.
11 * See the file COPYING in this distribution
13 * by Peter Braam <braam@clusterfs.com>
15 * This server is single threaded at present (but can easily be multi threaded)
20 #define DEBUG_SUBSYSTEM S_MDS
22 #include <linux/module.h>
23 #include <linux/lustre_mds.h>
// Reads one page from `file` and ships it to the requesting client as a bulk
// transfer.
//
// Visible steps: prepare a bulk descriptor on req->rq_connection, prepare one
// bulk page for it, allocate a PAGE_SIZE buffer, fill it via
// mds_fs_readpage(), tag the page with the request xid and a PAGE_SIZE
// length, target MDS_BULK_PORTAL, then call ptlrpc_send_bulk().
//
// req:    request whose connection, obd and xid are used.
// file:   open file handed to mds_fs_readpage().
// offset: presumably the read position; the part of the read call that would
//         consume it is not visible in this excerpt - TODO confirm.
//
// The visible error exits set rc to -ENOMEM (descriptor, page or buffer) or
// -EIO (after the read); the final return is not visible in this excerpt.
25 int mds_sendpage(struct ptlrpc_request *req, struct file *file, __u64 offset)
// Saves the current get_fs() value; the matching set_fs() calls are not
// visible in this excerpt.
28 mm_segment_t oldfs = get_fs();
29 struct ptlrpc_bulk_desc *desc;
30 struct ptlrpc_bulk_page *bulk;
34 desc = ptlrpc_prep_bulk(req->rq_connection);
36 GOTO(out, rc = -ENOMEM);
38 bulk = ptlrpc_prep_bulk_page(desc);
40 GOTO(cleanup_bulk, rc = -ENOMEM);
42 OBD_ALLOC(buf, PAGE_SIZE);
44 GOTO(cleanup_bulk, rc = -ENOMEM);
47 rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
52 GOTO(cleanup_buf, rc = -EIO);
// The bulk page carries the xid of the request it answers.
54 bulk->b_xid = req->rq_reqmsg->xid;
56 bulk->b_buflen = PAGE_SIZE;
57 desc->b_portal = MDS_BULK_PORTAL;
59 rc = ptlrpc_send_bulk(desc);
// Fault injection: when OBD_FAIL_MDS_SENDPAGE fires, log it, abort the bulk
// transfer and unwind through the cleanup path.
60 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
61 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
62 OBD_FAIL_MDS_SENDPAGE, rc);
63 ptlrpc_abort_bulk(desc);
64 GOTO(cleanup_buf, rc);
// Cleanup: release the page buffer, then the bulk descriptor (the labels
// themselves are not visible in this excerpt).
69 OBD_FREE(buf, PAGE_SIZE);
71 ptlrpc_free_bulk(desc);
// Maps a fid (inode number plus generation) to a dentry on the MDS
// superblock.
//
// mds: supplies the superblock (mds_sb) and the vfsmount (mds_vfsmnt).
// fid: fid->id is used as the inode number and fid->generation as the
//      expected i_generation; a generation of 0 skips the comparison.
// mnt: out-parameter that receives mds->mds_vfsmnt.
//      NOTE(review): mds_getattr passes NULL here; any NULL guard in front
//      of the store through mnt is not visible in this excerpt - confirm.
//
// Visible failure returns are ERR_PTR(-ESTALE) and ERR_PTR(-ENOMEM). The
// success-path return statements are not visible here; presumably the alias
// that was found, or the newly allocated dentry, is returned.
76 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
77 struct vfsmount **mnt)
80 struct super_block *sb = mds->mds_sb;
81 unsigned long ino = fid->id;
82 __u32 generation = fid->generation;
85 struct dentry *result;
88 RETURN(ERR_PTR(-ESTALE));
90 inode = iget(sb, ino);
92 RETURN(ERR_PTR(-ENOMEM));
94 CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
// Reject bad inodes and, when a non-zero generation was requested, inodes
// whose generation no longer matches.
96 if (is_bad_inode(inode) ||
97 (generation && inode->i_generation != generation)) {
98 /* we didn't find the right inode.. */
99 CERROR("bad inode %lu, link: %d ct: %d or version %u/%u\n",
100 inode->i_ino, inode->i_nlink,
101 atomic_read(&inode->i_count), inode->i_generation,
105 RETURN(ERR_PTR(-ESTALE));
108 /* now to find a dentry.
109 * If possible, get a well-connected one
112 *mnt = mds->mds_vfsmnt;
// Walk the inode's alias list under dcache_lock, looking for a dentry that
// is not flagged DCACHE_NFSD_DISCONNECTED.
113 spin_lock(&dcache_lock);
114 for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
115 result = list_entry(lp,struct dentry, d_alias);
116 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
118 result->d_vfs_flags |= DCACHE_REFERENCED;
119 spin_unlock(&dcache_lock);
126 spin_unlock(&dcache_lock);
// No connected alias was found: allocate a fresh dentry for the inode and
// flag it as disconnected.
127 result = d_alloc_root(inode);
128 if (result == NULL) {
// NOTE(review): bare return here, whereas the other visible exits use the
// RETURN() macro - confirm this is intentional.
130 return ERR_PTR(-ENOMEM);
134 result->d_flags |= DCACHE_NFSD_DISCONNECTED;
// Number of client slots the last_rcvd bitmap is meant to track.
138 #define MDS_MAX_CLIENTS 1024
// NOTE(review): this divides by bytes-per-long rather than bits-per-long, so
// the array below holds 8 * MDS_MAX_CLIENTS bits. mds_client_del indexes
// words using the same sizeof() divisor, so shrinking this without also
// changing mds_client_del would push its word index out of bounds.
139 #define MDS_MAX_CLIENT_WORDS (MDS_MAX_CLIENTS / sizeof(unsigned long))
// Bitmap of occupied client slots; shared by mds_client_add and
// mds_client_del.
141 static unsigned long last_rcvd_slots[MDS_MAX_CLIENT_WORDS];
143 /* Add client data to the MDS. The in-memory storage will be a hash at some
144 * point. We use a bitmap to locate a free space in the last_rcvd file if
145 * cl_off is -1 (i.e. a new client). Otherwise, we have just read the data
146 * from the last_rcvd file and we know its offset.
// mds:    server state; the new entry is appended to mds->mds_client_info
//         and mds->mds_client_count is incremented.
// mcd:    client data whose mcd_uuid is logged. Presumably it is stored in
//         the new entry as mci_mcd (mds_client_del frees mci->mci_mcd); the
//         assignment is not visible in this excerpt.
// cl_off: slot of this client, or -1 to pick a free slot from the bitmap
//         (per the comment preceding this function).
// The return statements are not visible in this excerpt.
148 int mds_client_add(struct mds_obd *mds, struct mds_client_data *mcd, int cl_off)
150 struct mds_client_info *mci;
152 OBD_ALLOC(mci, sizeof(*mci));
154 CERROR("no memory for MDS client info\n");
157 INIT_LIST_HEAD(&mci->mci_open_head);
159 CDEBUG(D_INFO, "client at offset %d with UUID '%s' added\n",
160 cl_off, mcd->mcd_uuid);
// Presumably the cl_off == -1 branch: scan last_rcvd_slots for a word that
// still has a free bit (the scan loop itself is not visible here).
167 word = last_rcvd_slots;
170 if (word - last_rcvd_slots >= MDS_MAX_CLIENT_WORDS) {
171 CERROR("no room in client MDS bitmap - fix code\n");
175 if (test_and_set_bit(bit, word)) {
176 CERROR("found bit %d set for word %d - fix code\n",
177 bit, word - last_rcvd_slots);
// NOTE(review): adds a word index to a bit index without scaling the word
// index by the number of bits per word, so slots found in any word other
// than the first do not map to a unique offset - confirm.
180 cl_off = word - last_rcvd_slots + bit;
// Presumably the known-offset branch: mark bit cl_off of the bitmap as used.
182 if (test_and_set_bit(cl_off, last_rcvd_slots)) {
183 CERROR("bit %d already set in bitmap - bad bad\n",
190 mci->mci_off = cl_off;
192 /* For now we just put the clients in a list, not a hashed list */
193 list_add_tail(&mci->mci_list, &mds->mds_client_info);
195 mds->mds_client_count++;
/* Releases one client entry: clears its slot bit in last_rcvd_slots,
 * decrements mds->mds_client_count, unlinks the entry from the client list
 * and frees both mci->mci_mcd and mci itself.
 *
 * NOTE(review): the word/bit split below divides mci_off by
 * sizeof(unsigned long) (bytes per word), whereas mds_client_add marks a
 * known offset with test_and_set_bit(cl_off, last_rcvd_slots), which
 * addresses the bitmap by bit number. The two do not appear to select the
 * same bit once mci_off >= sizeof(unsigned long) - confirm.
 */
200 void mds_client_del(struct mds_obd *mds, struct mds_client_info *mci)
205 word = last_rcvd_slots + mci->mci_off / sizeof(unsigned long);
206 bit = mci->mci_off % sizeof(unsigned long);
/* A bit that is already clear means the bitmap and the list disagree. */
208 if (!test_and_clear_bit(bit, word)) {
209 CERROR("bit %d already clear in word %d - bad bad\n",
210 bit, word - last_rcvd_slots);
214 --mds->mds_client_count;
215 list_del(&mci->mci_list);
216 OBD_FREE(mci->mci_mcd, sizeof(*mci->mci_mcd));
217 OBD_FREE(mci, sizeof (*mci));
/* Deletes every entry on mds->mds_client_info via mds_client_del().
 * The return statement is not visible in this excerpt.
 */
220 int mds_client_free_all(struct mds_obd *mds)
222 struct list_head *p, *n;
/* The _safe iterator is required: mds_client_del() unlinks and frees the
 * entry that p points into. */
224 list_for_each_safe(p, n, &mds->mds_client_info) {
225 struct mds_client_info *mci;
227 mci = list_entry(p, struct mds_client_info, mci_list);
228 mds_client_del(mds, mci);
/* Frees mds->mds_server_data and resets the pointer to NULL so it cannot be
 * used after the free. The return statement is not visible in this excerpt.
 */
234 int mds_server_free_data(struct mds_obd *mds)
236 OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
237 mds->mds_server_data = NULL;
/* Handles a client connect request.
 *
 * Packs a one-buffer reply sized for a struct mds_body, copies the MDS root
 * fid into it, then looks the client up by UUID. Judging by the inline
 * comments, an unknown client gets freshly allocated client data registered
 * with mds_client_add(..., -1), while a known client reuses its stored data;
 * the branch conditions themselves are not visible in this excerpt. The
 * client's last xid is then placed in the reply body.
 *
 * Failures are reported through req->rq_status (-ENOMEM on the visible
 * paths); the function's own return statements are not visible here.
 */
242 int mds_connect(struct ptlrpc_request *req)
244 struct mds_body *body;
245 struct mds_obd *mds = &req->rq_obd->u.mds;
246 struct mds_client_info *mci;
247 struct mds_client_data *mcd;
248 int rc, size = sizeof(*body);
251 CDEBUG(D_INFO, "MDS connect from UUID '%s'\n", ptlrpc_req_to_uuid(req));
252 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
/* OBD_FAIL_MDS_CONNECT_PACK lets tests force this failure path. */
253 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CONNECT_PACK)) {
254 CERROR("mds: out of memory for message: size=%d\n", size);
255 req->rq_status = -ENOMEM;
259 body = lustre_msg_buf(req->rq_reqmsg, 0);
260 mds_unpack_req_body(req);
261 /* Anything we need to do here with the client's trans no or so? */
/* From here on, body refers to the reply buffer, not the request. */
263 body = lustre_msg_buf(req->rq_repmsg, 0);
264 memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
266 mci = mds_uuid_to_mci(mds, ptlrpc_req_to_uuid(req));
268 /* We don't have any old connection data for this client */
/* NOTE(review): unlike the other CDEBUG calls in this function, this format
 * string has no trailing newline - confirm whether that is intended. */
271 CDEBUG(D_INFO, "allocating new client data for UUID '%s'",
272 ptlrpc_req_to_uuid(req));
274 OBD_ALLOC(mcd, sizeof(*mcd));
276 CERROR("mds: out of memory for client data\n");
277 req->rq_status = -ENOMEM;
/* -1 asks mds_client_add() to choose a free slot. */
280 rc = mds_client_add(mds, mcd, -1);
286 /* We have old connection data for this client... */
288 CDEBUG(D_INFO, "found existing data for UUID '%s' at #%d\n",
289 mcd->mcd_uuid, mci->mci_off);
291 /* mcd_last_xid is stored in little endian on the disk and
292 mds_pack_rep_body converts it to network order */
293 body->last_xid = le32_to_cpu(mcd->mcd_last_xid);
294 mds_pack_rep_body(req);
/* Handles a getattr request: resolves body->fid1 to a dentry, packs a reply
 * and fills the reply mds_body from the inode's attributes.
 *
 * When the request sets OBD_MD_LINKNAME in body->valid, a second reply
 * buffer of inode->i_size bytes is sized and filled with the inode's
 * readlink() output. `count` presumably becomes 2 in that case; the
 * assignment is not visible in this excerpt.
 *
 * Failures are reported through req->rq_status (-ENOENT, -ENOMEM on the
 * visible paths); the function's return statements are not visible here.
 */
298 int mds_getattr(struct ptlrpc_request *req)
302 struct mds_body *body;
303 struct mds_obd *mds = &req->rq_obd->u.mds;
/* size[1] starts at 0 through the partial initializer. */
304 int rc, size[2] = {sizeof(*body)}, count = 1;
307 body = lustre_msg_buf(req->rq_reqmsg, 0);
/* No vfsmount is needed here, so NULL is passed for the out-parameter. */
308 de = mds_fid2dentry(mds, &body->fid1, NULL);
310 req->rq_status = -ENOENT;
314 if (body->valid & OBD_MD_LINKNAME) {
316 size[1] = inode->i_size;
319 rc = lustre_pack_msg(count, size, NULL, &req->rq_replen,
321 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
322 CERROR("mds: out of memory\n");
323 req->rq_status = -ENOMEM;
327 if (body->valid & OBD_MD_LINKNAME) {
328 char *tmp = lustre_msg_buf(req->rq_repmsg, 1);
333 rc = inode->i_op->readlink(de, tmp, size[1]);
/* NOTE(review): this logs req->rq_status while the readlink result is in
 * rc; whether rq_status is assigned first is not visible here - confirm. */
338 CERROR("readlink failed: %d\n", req->rq_status);
/* body is repointed at the reply buffer before the attributes are copied. */
343 body = lustre_msg_buf(req->rq_repmsg, 0);
344 body->ino = inode->i_ino;
345 body->generation = inode->i_generation;
346 body->atime = inode->i_atime;
347 body->ctime = inode->i_ctime;
348 body->mtime = inode->i_mtime;
349 body->uid = inode->i_uid;
350 body->gid = inode->i_gid;
351 body->size = inode->i_size;
352 body->mode = inode->i_mode;
353 body->nlink = inode->i_nlink;
355 mds_fs_get_objid(mds, inode, &body->objid);
/* Handles an open request: packs a one-buffer reply, finds the client's
 * mds_client_info by UUID, resolves body->fid1 to a dentry, opens it with
 * dentry_open() and records the open in a new mds_file_data linked on the
 * client's mci_open_head list.
 *
 * Failures are reported through req->rq_status (-ENOMEM, -ENOTCONN,
 * -ENOENT, -EINVAL on the visible paths); the function's return statements,
 * and the declaration and source of `flags`, are not visible in this
 * excerpt.
 */
361 int mds_open(struct ptlrpc_request *req)
363 struct mds_obd *mds = &req->rq_obd->u.mds;
365 struct mds_body *body;
367 struct vfsmount *mnt;
368 struct mds_client_info *mci;
370 struct list_head *tmp;
371 struct mds_file_data *mfd;
372 int rc, size = sizeof(*body);
376 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
377 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
378 CERROR("mds: out of memory\n");
379 req->rq_status = -ENOMEM;
384 mci = mds_uuid_to_mci(mds, ptlrpc_req_to_uuid(req));
386 CERROR("mds: no mci!\n");
387 req->rq_status = -ENOTCONN;
391 body = lustre_msg_buf(req->rq_reqmsg, 0);
393 /* was this animal open already? */
/* A match needs both the client's file handle and the inode number; what
 * happens after the log line is not visible in this excerpt. */
394 list_for_each(tmp, &mci->mci_open_head) {
395 struct mds_file_data *fd;
396 fd = list_entry(tmp, struct mds_file_data, mfd_list);
397 if (body->objid == fd->mfd_clientfd &&
398 body->fid1.id == fd->mfd_file->f_dentry->d_inode->i_ino) {
399 CERROR("Re opening %Ld\n", body->fid1.id);
404 OBD_ALLOC(mfd, sizeof(*mfd));
406 CERROR("mds: out of memory\n");
407 req->rq_status = -ENOMEM;
411 de = mds_fid2dentry(mds, &body->fid1, &mnt);
/* NOTE(review): mfd is already allocated at this point; whether this error
 * path frees it is not visible in this excerpt - confirm. */
413 req->rq_status = -ENOENT;
418 file = dentry_open(de, mnt, flags);
419 if (!file || IS_ERR(file)) {
420 req->rq_status = -EINVAL;
421 OBD_FREE(mfd, sizeof(*mfd));
/* Cross-link the struct file and its tracking record, remembering the
 * client-side handle from the request. */
425 file->private_data = mfd;
426 mfd->mfd_file = file;
427 mfd->mfd_clientfd = body->objid;
428 list_add(&mfd->mfd_list, &mci->mci_open_head);
430 body = lustre_msg_buf(req->rq_repmsg, 0);
/* NOTE(review): the reply's objid carries the kernel address of the struct
 * file; mds_close() casts it back into a pointer. */
431 body->objid = (__u64) (unsigned long)file;
/* Handles a close request: packs an empty reply, resolves body->fid1 to a
 * dentry, recovers the struct file from body->objid, drops its
 * mds_file_data record and closes the file. req->rq_status receives the
 * filp_close() result.
 *
 * The function's return statements and what is done with `de` are not
 * visible in this excerpt.
 */
435 int mds_close(struct ptlrpc_request *req)
438 struct mds_body *body;
440 struct vfsmount *mnt;
441 struct mds_file_data *mfd;
/* The reply carries no buffers (count 0). */
445 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
446 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
447 CERROR("mds: out of memory\n");
448 req->rq_status = -ENOMEM;
452 body = lustre_msg_buf(req->rq_reqmsg, 0);
453 de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
455 req->rq_status = -ENOENT;
/* NOTE(review): objid comes from the request and is cast straight to a
 * struct file pointer, then dereferenced; no validation against the
 * client's open list is visible in this excerpt - confirm. */
459 file = (struct file *)(unsigned long)body->objid;
462 mfd = (struct mds_file_data *)file->private_data;
463 list_del(&mfd->mfd_list);
464 OBD_FREE(mfd, sizeof(*mfd));
466 req->rq_status = filp_close(file, 0);
/* Handles a readpage request: packs a one-buffer reply, resolves
 * body->fid1 to a dentry, opens it read-only (O_RDONLY | O_LARGEFILE) and
 * hands the open file to mds_sendpage().
 *
 * Lookup and open failures are stored in req->rq_status as PTR_ERR values;
 * the guarding conditions, what happens to `file` after mds_sendpage() and
 * the return statements are not visible in this excerpt.
 */
473 int mds_readpage(struct ptlrpc_request *req)
475 struct vfsmount *mnt;
478 struct mds_body *body;
479 int rc, size = sizeof(*body);
482 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
483 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
484 CERROR("mds: out of memory\n");
485 req->rq_status = -ENOMEM;
489 body = lustre_msg_buf(req->rq_reqmsg, 0);
490 de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
492 req->rq_status = PTR_ERR(de);
496 CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
498 file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
499 /* note: in case of an error, dentry_open puts dentry */
501 req->rq_status = PTR_ERR(file);
505 /* to make this asynchronous make sure that the handling function
506 doesn't send a reply when this function completes. Instead a
507 callback function would send the reply */
/* The request's body->size field is passed as mds_sendpage()'s offset
 * argument. */
508 rc = mds_sendpage(req, file, body->size);
/* Handles a reint request: unpacks the update record from the request and
 * passes it to mds_reint_rec(). An unpack failure (or the
 * OBD_FAIL_MDS_REINT_UNPACK fault injection) sets req->rq_status to
 * -EINVAL. The return statements are not visible in this excerpt.
 */
515 int mds_reint(struct ptlrpc_request *req)
518 struct mds_update_record rec;
520 rc = mds_update_unpack(req, &rec);
521 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
522 CERROR("invalid record\n");
523 req->rq_status = -EINVAL;
526 /* rc will be used to interrupt a for loop over multiple records */
527 rc = mds_reint_rec(&rec, req);
/* Service entry point for MDS requests: unpacks and validates the message,
 * dispatches on req->rq_reqmsg->opc to the per-operation handlers, stamps
 * the reply with the server's last_rcvd / last_committed values and sends
 * either an error or a reply.
 *
 * The case labels, several handler calls and the return statements are not
 * visible in this excerpt. Each visible case begins with an
 * OBD_FAIL_RETURN(..., 0) fault-injection hook.
 */
531 int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
532 struct ptlrpc_request *req)
534 struct mds_obd *mds = &req->rq_obd->u.mds;
538 rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
539 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
540 CERROR("lustre_mds: Invalid request\n");
/* Only request-type messages are accepted. */
544 if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
545 CERROR("lustre_mds: wrong packet type sent %d\n",
546 req->rq_reqmsg->type);
547 GOTO(out, rc = -EINVAL);
550 switch (req->rq_reqmsg->opc) {
/* NOTE(review): this branch calls mds_connect() but logs "getattr", the
 * same text as the getattr branch below - looks like a copy/paste slip. */
552 CDEBUG(D_INODE, "getattr\n");
553 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
554 rc = mds_connect(req);
558 CDEBUG(D_INODE, "getattr\n");
559 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
560 rc = mds_getattr(req);
564 CDEBUG(D_INODE, "readpage\n");
565 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
566 rc = mds_readpage(req);
568 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
573 CDEBUG(D_INODE, "reint\n");
574 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
576 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET_REP, 0);
580 CDEBUG(D_INODE, "open\n");
581 OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
586 CDEBUG(D_INODE, "close\n");
587 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
/* Presumably the default case for unknown opcodes (label not visible). */
592 rc = ptlrpc_error(svc, req);
598 /* Still not 100% sure whether we should reply with the server
599 * last_rcvd or that of this client. I'm not sure it even makes
600 * a difference on a per-client basis, because last_rcvd is global
601 * and we are not supposed to allow transactions while in recovery.
// Both counters are converted with HTON__u64 before being stored in the
// reply message.
603 req->rq_repmsg->last_rcvd = HTON__u64(mds->mds_last_rcvd);
604 req->rq_repmsg->last_committed = HTON__u64(mds->mds_last_committed);
// NOTE(review): xid is passed through cpu_to_le32() only for this log line;
// confirm that is the intended conversion.
605 CDEBUG(D_INFO, "last_rcvd %Lu, last_committed %Lu, xid %d\n",
606 (unsigned long long)mds->mds_last_rcvd,
607 (unsigned long long)mds->mds_last_committed,
608 cpu_to_le32(req->rq_reqmsg->xid));
// The condition choosing between the error and the reply path is not
// visible in this excerpt.
610 ptlrpc_error(svc, req);
612 CDEBUG(D_NET, "sending reply\n");
613 ptlrpc_reply(svc, req);
619 /* This will be a hash table at some point. */
/* Initializes mds->mds_client_info as an empty list head. The return
 * statement is not visible in this excerpt. */
620 int mds_init_client_data(struct mds_obd *mds)
622 INIT_LIST_HEAD(&mds->mds_client_info);
/* Name of the file that stores the server record and per-client records. */
626 #define LAST_RCVD "last_rcvd"
/* Loads the server record and every per-client record from the last_rcvd
 * file `f` into `mds`.
 *
 * The server record (struct mds_server_data) is read first and kept in
 * mds->mds_server_data; its last_rcvd and mount count are decoded from
 * little endian. Client records are then read starting at MDS_LR_CLIENT
 * with a stride of MDS_LR_SIZE. Clients that pass the recovery-window test
 * are registered with mds_client_add() at their slot index, the others are
 * logged as ignored. The highest last_rcvd seen becomes both
 * mds->mds_last_rcvd and mds->mds_last_committed.
 *
 * Several conditions, the declarations of off / cl_off / last_rcvd /
 * last_mount and the return statements are not visible in this excerpt.
 */
628 int mds_read_last_rcvd(struct mds_obd *mds, struct file *f)
630 struct mds_server_data *msd;
631 struct mds_client_data *mcd = NULL;
632 loff_t fsize = f->f_dentry->d_inode->i_size;
639 OBD_ALLOC(msd, sizeof(*msd));
642 rc = lustre_fread(f, (char *)msd, sizeof(*msd), &off);
644 mds->mds_server_data = msd;
/* Presumably the rc == 0 case (condition not visible): an empty file is
 * treated as a brand-new MDS rather than as an error. */
646 CERROR("empty MDS %s, new MDS?\n", LAST_RCVD);
648 } else if (rc != sizeof(*msd)) {
649 CERROR("error reading MDS %s: rc = %d\n", LAST_RCVD, rc);
657 * When we do a clean MDS shutdown, we save the last_rcvd into
658 * the header. If we find clients with higher last_rcvd values
659 * then those clients may need recovery done.
661 last_rcvd = le64_to_cpu(msd->msd_last_rcvd);
662 mds->mds_last_rcvd = last_rcvd;
663 CDEBUG(D_INODE, "got %Lu for server last_rcvd value\n",
664 (unsigned long long)last_rcvd);
666 last_mount = le64_to_cpu(msd->msd_mount_count);
667 mds->mds_mount_count = last_mount;
668 CDEBUG(D_INODE, "got %Lu for server last_mount value\n",
669 (unsigned long long)last_mount);
/* Iterate while a whole record still fits in the file and the previous
 * read returned a full record; cl_off is the slot index. */
671 for (off = MDS_LR_CLIENT, cl_off = 0, rc = sizeof(*mcd);
672 off <= fsize - sizeof(*mcd) && rc == sizeof(*mcd);
673 off = MDS_LR_CLIENT + ++cl_off * MDS_LR_SIZE) {
675 OBD_ALLOC(mcd, sizeof(*mcd));
677 GOTO(err_msd, rc = -ENOMEM);
679 rc = lustre_fread(f, (char *)mcd, sizeof(*mcd), &off);
680 if (rc != sizeof(*mcd)) {
681 CERROR("error reading MDS %s offset %d: rc = %d\n",
682 LAST_RCVD, cl_off, rc);
688 last_rcvd = le64_to_cpu(mcd->mcd_last_rcvd);
/* NOTE(review): this overwrites last_mount, which held the server's mount
 * count, with the client's. The test below then subtracts the raw on-disk
 * mcd_mount_count from it, so it compares the client's value with itself
 * (always 0 on a little-endian host) - confirm the intended operands. */
689 last_mount = le64_to_cpu(mcd->mcd_mount_count);
692 last_mount - mcd->mcd_mount_count < MDS_MOUNT_RECOV) {
693 rc = mds_client_add(mds, mcd, cl_off);
701 "client at offset %d with UUID '%s' ignored\n",
702 cl_off, mcd->mcd_uuid);
/* Track the highest last_rcvd seen across all clients. */
705 if (last_rcvd > mds->mds_last_rcvd) {
707 "client at offset %d has last_rcvd = %Lu\n",
708 cl_off, (unsigned long long)last_rcvd);
709 mds->mds_last_rcvd = last_rcvd;
712 CDEBUG(D_INODE, "got %Lu for highest last_rcvd value, %d clients\n",
713 (unsigned long long)mds->mds_last_rcvd, mds->mds_client_count);
715 /* After recovery, there can be no local uncommitted transactions */
716 mds->mds_last_committed = mds->mds_last_rcvd;
/* Error unwinding: release the server record (label not visible). */
721 mds_server_free_data(mds);
/* Prepares the mounted MDS filesystem and starts the request service.
 *
 * Visible steps, in order: switch to the MDS run context, create the ROOT
 * directory and record its inode number and generation as the root fid,
 * create the FH directory, initialize the client list, open or create the
 * last_rcvd file and load it, install a private copy of the superblock
 * operations with delete_inode redirected, then create the ptlrpc service
 * and start its thread.
 *
 * The error labels, the context pop and the return statements are not
 * visible in this excerpt.
 */
725 static int mds_prep(struct obd_device *obddev)
727 struct obd_run_ctxt saved;
728 struct mds_obd *mds = &obddev->u.mds;
729 struct super_operations *s_ops;
730 struct dentry *dentry;
734 push_ctxt(&saved, &mds->mds_ctxt);
735 dentry = simple_mkdir(current->fs->pwd, "ROOT", 0700);
736 if (IS_ERR(dentry)) {
737 rc = PTR_ERR(dentry);
738 CERROR("cannot create ROOT directory: rc = %d\n", rc);
741 /* XXX probably want to hold on to this later... */
/* ROOT is opened only long enough to read its inode identity. */
743 f = filp_open("ROOT", O_RDONLY, 0);
746 CERROR("cannot open ROOT: rc = %d\n", rc);
751 mds->mds_rootfid.id = f->f_dentry->d_inode->i_ino;
752 mds->mds_rootfid.generation = f->f_dentry->d_inode->i_generation;
753 mds->mds_rootfid.f_type = S_IFDIR;
755 rc = filp_close(f, 0);
757 CERROR("cannot close ROOT: rc = %d\n", rc);
761 dentry = simple_mkdir(current->fs->pwd, "FH", 0700);
762 if (IS_ERR(dentry)) {
763 rc = PTR_ERR(dentry);
764 CERROR("cannot create FH directory: rc = %d\n", rc);
767 /* XXX probably want to hold on to this later... */
770 rc = mds_init_client_data(mds);
774 f = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
777 CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc);
778 GOTO(err_pop, rc = PTR_ERR(f));
/* NOTE(review): f is open at this point, yet this exit targets err_pop
 * while a later failure targets err_filp; the label layout is not visible
 * in this excerpt - confirm f is not leaked here. */
780 if (!S_ISREG(f->f_dentry->d_inode->i_mode)) {
781 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
782 f->f_dentry->d_inode->i_mode);
783 GOTO(err_pop, rc = -ENOENT);
786 rc = mds_fs_journal_data(mds, f);
788 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
792 rc = mds_read_last_rcvd(mds, f);
794 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
795 GOTO(err_client, rc);
/* The file stays open; mds_update_server_data() writes through it. */
797 mds->mds_rcvd_filp = f;
801 * Replace the client filesystem delete_inode method with our own,
802 * so that we can clear the object ID before the inode is deleted.
803 * The fs_delete_inode method will call cl_delete_inode for us.
805 * We need to do this for the MDS superblock only, hence we install
806 * a modified copy of the original superblock method table.
808 * We still assume that there is only a single MDS client filesystem
809 * type, as we don't have access to the mds struct in delete_inode
810 * and store the client delete_inode method in a global table. This
811 * will only become a problem when multiple MDSs are running on a
812 * single host with different client filesystems.
814 OBD_ALLOC(s_ops, sizeof(*s_ops));
816 GOTO(err_filp, rc = -ENOMEM);
/* Copy the table, save the original delete_inode for chaining, install
 * ours, then point the superblock at the copy. */
818 memcpy(s_ops, mds->mds_sb->s_op, sizeof(*s_ops));
819 mds->mds_fsops->cl_delete_inode = s_ops->delete_inode;
820 s_ops->delete_inode = mds->mds_fsops->fs_delete_inode;
821 mds->mds_sb->s_op = s_ops;
823 mds->mds_service = ptlrpc_init_svc(128 * 1024,
824 MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
827 if (!mds->mds_service) {
828 CERROR("failed to start service\n");
829 GOTO(err_filp, rc = -EINVAL);
832 rc = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
834 CERROR("cannot start thread: rc = %d\n", rc);
/* Error unwinding, in reverse order of setup (labels not visible):
 * service, client list, last_rcvd file. */
841 rpc_unregister_service(mds->mds_service);
842 OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
844 mds_client_free_all(mds);
846 if (filp_close(f, 0))
847 CERROR("can't close %s after error\n", LAST_RCVD);
854 /* Update the server data on disk. This stores the new mount_count and
855 * also the last_rcvd value to disk. If we don't have a clean shutdown,
856 * then the server last_rcvd value may be less than that of the clients.
857 * This will alert us that we may need to do client recovery.
// Encodes mds->mds_last_rcvd and mds->mds_mount_count as little endian into
// the in-memory server record and writes that record to the last_rcvd file
// through mds->mds_rcvd_filp, inside the MDS run context.
//
// The declaration and initial value of `off`, the context pop and the return
// statements are not visible in this excerpt.
859 int mds_update_server_data(struct mds_obd *mds)
861 struct obd_run_ctxt saved;
862 struct mds_server_data *msd = mds->mds_server_data;
863 struct file *filp = mds->mds_rcvd_filp;
867 msd->msd_last_rcvd = cpu_to_le64(mds->mds_last_rcvd);
868 msd->msd_mount_count = cpu_to_le64(mds->mds_mount_count);
870 CDEBUG(D_SUPER, "MDS mount_count is %Lu, last_rcvd is %Lu\n",
871 (unsigned long long)mds->mds_mount_count,
872 (unsigned long long)mds->mds_last_rcvd);
873 push_ctxt(&saved, &mds->mds_ctxt);
874 rc = lustre_fwrite(filp, (char *)msd, sizeof(*msd), &off);
// A short write is treated as an error.
875 if (rc != sizeof(*msd)) {
876 CERROR("error writing MDS server data: rc = %d\n", rc);
// NOTE(review): this flushes by the i_rdev of the last_rcvd inode; for a
// regular file the backing device is normally identified through the
// superblock device rather than i_rdev - confirm.
881 rc = fsync_dev(filp->f_dentry->d_inode->i_rdev);
884 CERROR("error flushing MDS server data: rc = %d\n", rc);
889 /* Do recovery actions for the MDS */
/* Currently only bumps mds->mds_mount_count and persists it via
 * mds_update_server_data(). The return statement is not visible in this
 * excerpt. */
890 static int mds_recover(struct obd_device *obddev)
892 struct mds_obd *mds = &obddev->u.mds;
895 /* This happens at the end when recovery is complete */
896 ++mds->mds_mount_count;
897 rc = mds_update_server_data(mds);
/* Forward declaration of the cleanup routine defined after mds_setup(). */
902 static int mds_cleanup(struct obd_device *obddev);
904 /* mount the file system (secretly) */
/* obddev: device whose u.mds state is initialized.
 * len:    length of buf; not used on any visible line.
 * buf:    struct obd_ioctl_data. ioc_inlbuf2 names the filesystem type;
 *         ioc_inlbuf1 is passed to do_kern_mount() as its third argument
 *         (presumably the device name).
 *
 * Selects the fs-ops table for "extN", "ext3" or "ext2", mounts the
 * filesystem, fills in the MDS run context from the mount, then calls
 * mds_prep() and mds_recover(). The error labels and return statements are
 * not visible in this excerpt.
 */
905 static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
907 struct obd_ioctl_data* data = buf;
908 struct mds_obd *mds = &obddev->u.mds;
909 struct vfsmount *mnt;
913 #ifdef CONFIG_DEV_RDONLY
916 if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
/* The copy is released with kfree() on the error path below and in
 * mds_cleanup(). NOTE(review): no NULL check of the strdup() result is
 * visible before the strcmp() calls - confirm. */
919 mds->mds_fstype = strdup(data->ioc_inlbuf2);
921 if (!strcmp(mds->mds_fstype, "extN"))
922 mds->mds_fsops = &mds_extN_fs_ops;
923 else if (!strcmp(mds->mds_fstype, "ext3"))
924 mds->mds_fsops = &mds_ext3_fs_ops;
925 else if (!strcmp(mds->mds_fstype, "ext2"))
926 mds->mds_fsops = &mds_ext2_fs_ops;
928 CERROR("unsupported MDS filesystem type %s\n", mds->mds_fstype);
929 GOTO(err_kfree, rc = -EPERM);
933 mnt = do_kern_mount(mds->mds_fstype, 0, data->ioc_inlbuf1, NULL);
936 CERROR("do_kern_mount failed: rc = %d\n", rc);
940 mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
942 GOTO(err_put, rc = -ENODEV);
/* Run context used with push_ctxt(): the mount and its root become the
 * working directory, and fs is set to KERNEL_DS. */
944 mds->mds_vfsmnt = mnt;
945 mds->mds_ctxt.pwdmnt = mnt;
946 mds->mds_ctxt.pwd = mnt->mnt_root;
947 mds->mds_ctxt.fs = KERNEL_DS;
949 rc = mds_prep(obddev);
953 rc = mds_recover(obddev);
/* Error unwinding (labels not visible): drop the mount, then the fstype
 * string. */
963 mntput(mds->mds_vfsmnt);
969 kfree(mds->mds_fstype);
/* Tears the MDS down in roughly the reverse order of mds_setup() and
 * mds_prep(): stops and frees the service, frees the client list, writes
 * and frees the server record, closes the last_rcvd file, drops the mount
 * and frees the fstype string and the private super_operations copy.
 *
 * The early-exit handling, the assignments to sb and s_ops and the return
 * statements are not visible in this excerpt.
 */
973 static int mds_cleanup(struct obd_device * obddev)
975 struct super_operations *s_ops = NULL;
976 struct super_block *sb;
977 struct mds_obd *mds = &obddev->u.mds;
/* Refuse to clean up while generic clients are still attached (the action
 * taken is not visible in this excerpt). */
981 if ( !list_empty(&obddev->obd_gen_clients) ) {
982 CERROR("still has clients!\n");
986 ptlrpc_stop_all_threads(mds->mds_service);
987 rpc_unregister_service(mds->mds_service);
988 if (!list_empty(&mds->mds_service->srv_reqs)) {
989 // XXX reply with errors and clean up
990 CERROR("Request list not empty!\n");
992 OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
/* The server record is written out before it is freed; the result of
 * mds_update_server_data() is not checked here. */
998 mds_client_free_all(mds);
999 mds_update_server_data(mds);
1000 mds_server_free_data(mds);
1002 if (mds->mds_rcvd_filp) {
1003 int rc = filp_close(mds->mds_rcvd_filp, 0);
1004 mds->mds_rcvd_filp = NULL;
1007 CERROR("last_rcvd file won't close, rc=%d\n", rc);
1012 mntput(mds->mds_vfsmnt);
1014 kfree(mds->mds_fstype);
1016 #ifdef CONFIG_DEV_RDONLY
1017 dev_clear_rdonly(2);
/* Presumably s_ops was set to the copy installed by mds_prep(); the
 * assignment is not visible here - TODO confirm. */
1019 OBD_FREE(s_ops, sizeof(*s_ops));
1025 /* use obd ops to offer management infrastructure */
/* Method table registered for the MDS obd type in mds_init(). Only the
 * o_cleanup entry is visible in this excerpt; it uses the GNU `field:`
 * initializer syntax. */
1026 static struct obd_ops mds_obd_ops = {
1028 o_cleanup: mds_cleanup,
/* Module init: registers the MDS obd type under LUSTRE_MDS_NAME with
 * mds_obd_ops. The return statement is not visible in this excerpt. */
1031 static int __init mds_init(void)
1033 obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
/* Module exit: unregisters the obd type registered by mds_init(). */
1037 static void __exit mds_exit(void)
1039 obd_unregister_type(LUSTRE_MDS_NAME);
/* Kernel module metadata and the init/exit entry points. */
1042 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
1043 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
1044 MODULE_LICENSE("GPL");
1046 module_init(mds_init);
1047 module_exit(mds_exit);