Whamcloud - gitweb
Only set up the MDS service after the filesystem-specific stuff is set up.
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/handler.c
5  *
6  *  Lustre Metadata Server (mds) request handler
7  *
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com>
14  *
15  *  This server is single threaded at present (but can easily be multi threaded)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20
21 #include <linux/version.h>
22 #include <linux/module.h>
23 #include <linux/fs.h>
24 #include <linux/stat.h>
25 #include <linux/locks.h>
26 #include <linux/quotaops.h>
27 #include <asm/unistd.h>
28 #include <asm/uaccess.h>
29
30 #define DEBUG_SUBSYSTEM S_MDS
31
32 #include <linux/lustre_mds.h>
33 #include <linux/lustre_lib.h>
34 #include <linux/lustre_net.h>
35
36 int mds_sendpage(struct ptlrpc_request *req, struct file *file,
37                  __u64 offset, struct niobuf *dst)
38 {
39         int rc = 0;
40         mm_segment_t oldfs = get_fs();
41         struct ptlrpc_bulk_desc *bulk;
42         char *buf;
43
44         bulk = ptlrpc_prep_bulk(req->rq_connection);
45         if (bulk == NULL) {
46                 rc = -ENOMEM;
47                 GOTO(out, rc);
48         }
49
50         bulk->b_xid = req->rq_reqmsg->xid;
51
52         OBD_ALLOC(buf, PAGE_SIZE);
53         if (!buf) {
54                 rc = -ENOMEM;
55                 GOTO(cleanup_bulk, rc);
56         }
57
58         set_fs(KERNEL_DS);
59         rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
60                              (loff_t *)&offset);
61         set_fs(oldfs);
62
63         if (rc != PAGE_SIZE) {
64                 rc = -EIO;
65                 GOTO(cleanup_buf, rc);
66         }
67
68         bulk->b_buf = buf;
69         bulk->b_buflen = PAGE_SIZE;
70
71         rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL);
72         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
73                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
74                        OBD_FAIL_MDS_SENDPAGE, rc);
75                 PtlMDUnlink(bulk->b_md_h);
76                 GOTO(cleanup_buf, rc);
77         }
78         wait_event_interruptible(bulk->b_waitq,
79                                  ptlrpc_check_bulk_sent(bulk));
80
81         if (bulk->b_flags & PTL_RPC_FL_INTR) {
82                 rc = -EINTR;
83                 GOTO(cleanup_buf, rc);
84         }
85
86         EXIT;
87  cleanup_buf:
88         OBD_FREE(buf, PAGE_SIZE);
89  cleanup_bulk:
90         ptlrpc_free_bulk(bulk);
91  out:
92         return rc;
93 }
94
95 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
96                               struct vfsmount **mnt)
97 {
98         /* stolen from NFS */
99         struct super_block *sb = mds->mds_sb;
100         unsigned long ino = fid->id;
101         __u32 generation = fid->generation;
102         struct inode *inode;
103         struct list_head *lp;
104         struct dentry *result;
105
106         if (ino == 0)
107                 return ERR_PTR(-ESTALE);
108
109         inode = iget(sb, ino);
110         if (inode == NULL)
111                 return ERR_PTR(-ENOMEM);
112
113         CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
114
115         if (is_bad_inode(inode) ||
116             (generation && inode->i_generation != generation)) {
117                 /* we didn't find the right inode.. */
118                 CERROR("bad inode %lu, link: %d ct: %d or version  %u/%u\n",
119                         inode->i_ino,
120                         inode->i_nlink, atomic_read(&inode->i_count),
121                         inode->i_generation,
122                         generation);
123                 LBUG();
124                 iput(inode);
125                 return ERR_PTR(-ESTALE);
126         }
127
128         /* now to find a dentry.
129          * If possible, get a well-connected one
130          */
131         if (mnt)
132                 *mnt = mds->mds_vfsmnt;
133         spin_lock(&dcache_lock);
134         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
135                 result = list_entry(lp,struct dentry, d_alias);
136                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
137                         dget_locked(result);
138                         result->d_vfs_flags |= DCACHE_REFERENCED;
139                         spin_unlock(&dcache_lock);
140                         iput(inode);
141                         if (mnt)
142                                 mntget(*mnt);
143                         return result;
144                 }
145         }
146         spin_unlock(&dcache_lock);
147         result = d_alloc_root(inode);
148         if (result == NULL) {
149                 iput(inode);
150                 return ERR_PTR(-ENOMEM);
151         }
152         if (mnt)
153                 mntget(*mnt);
154         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
155         return result;
156 }
157
158 int mds_connect(struct ptlrpc_request *req)
159 {
160         struct mds_body *body;
161         struct mds_obd *mds = &req->rq_obd->u.mds;
162         int rc, size = sizeof(*body);
163         ENTRY;
164
165         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
166         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CONNECT_PACK)) {
167                 CERROR("mds: out of memory\n");
168                 req->rq_status = -ENOMEM;
169                 RETURN(0);
170         }
171
172         body = lustre_msg_buf(req->rq_reqmsg, 0);
173         mds_unpack_req_body(req); 
174         /* Anything we need to do here with the client's trans no or so? */
175
176         body = lustre_msg_buf(req->rq_repmsg, 0);
177         memcpy(&body->fid1, &mds->mds_rootfid  , sizeof(body->fid1)); 
178         mds_pack_rep_body(req);
179         RETURN(0);
180 }
181
182 int mds_getattr(struct ptlrpc_request *req)
183 {
184         struct dentry *de;
185         struct inode *inode;
186         struct mds_body *body;
187         struct mds_obd *mds = &req->rq_obd->u.mds;
188         int rc, size = sizeof(*body);
189         ENTRY;
190
191         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
192         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
193                 CERROR("mds: out of memory\n");
194                 req->rq_status = -ENOMEM;
195                 RETURN(0);
196         }
197
198         body = lustre_msg_buf(req->rq_reqmsg, 0);
199         de = mds_fid2dentry(mds, &body->fid1, NULL);
200         if (IS_ERR(de)) {
201                 req->rq_status = -ENOENT;
202                 RETURN(0);
203         }
204
205         body = lustre_msg_buf(req->rq_repmsg, 0);
206         inode = de->d_inode;
207         body->ino = inode->i_ino;
208         body->generation = inode->i_generation;
209         body->atime = inode->i_atime;
210         body->ctime = inode->i_ctime;
211         body->mtime = inode->i_mtime;
212         body->uid = inode->i_uid;
213         body->gid = inode->i_gid;
214         body->size = inode->i_size;
215         body->mode = inode->i_mode;
216         body->nlink = inode->i_nlink;
217         body->valid = ~0;
218         mds_fs_get_objid(mds, inode, &body->objid);
219         l_dput(de);
220         RETURN(0);
221 }
222
223 int mds_open(struct ptlrpc_request *req)
224 {
225         struct dentry *de;
226         struct mds_body *body;
227         struct file *file;
228         struct vfsmount *mnt;
229         __u32 flags;
230         int rc, size = sizeof(*body);
231         ENTRY;
232
233         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
234         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
235                 CERROR("mds: out of memory\n");
236                 req->rq_status = -ENOMEM;
237                 RETURN(0);
238         }
239
240         body = lustre_msg_buf(req->rq_reqmsg, 0);
241         de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
242         if (IS_ERR(de)) {
243                 req->rq_status = -ENOENT;
244                 RETURN(0);
245         }
246         flags = body->flags;
247         file = dentry_open(de, mnt, flags);
248         if (!file || IS_ERR(file)) {
249                 req->rq_status = -EINVAL;
250                 RETURN(0);
251         }
252
253         body = lustre_msg_buf(req->rq_repmsg, 0);
254         body->objid = (__u64) (unsigned long)file;
255         RETURN(0);
256 }
257
258 int mds_close(struct ptlrpc_request *req)
259 {
260         struct dentry *de;
261         struct mds_body *body;
262         struct file *file;
263         struct vfsmount *mnt;
264         int rc;
265         ENTRY;
266
267         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
268         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
269                 CERROR("mds: out of memory\n");
270                 req->rq_status = -ENOMEM;
271                 RETURN(0);
272         }
273
274         body = lustre_msg_buf(req->rq_reqmsg, 0);
275         de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
276         if (IS_ERR(de)) {
277                 req->rq_status = -ENOENT;
278                 RETURN(0);
279         }
280
281         file = (struct file *)(unsigned long)body->objid;
282         req->rq_status = filp_close(file, 0);
283         l_dput(de);
284         mntput(mnt);
285
286         RETURN(0);
287 }
288
289 int mds_readpage(struct ptlrpc_request *req)
290 {
291         struct vfsmount *mnt;
292         struct dentry *de;
293         struct file *file;
294         struct niobuf *niobuf;
295         struct mds_body *body;
296         int rc, size = sizeof(*body);
297         ENTRY;
298
299         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
300         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
301                 CERROR("mds: out of memory\n");
302                 req->rq_status = -ENOMEM;
303                 RETURN(0);
304         }
305
306         body = lustre_msg_buf(req->rq_reqmsg, 0);
307         de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
308         if (IS_ERR(de)) {
309                 req->rq_status = PTR_ERR(de);
310                 RETURN(0);
311         }
312
313         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
314
315         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
316         /* note: in case of an error, dentry_open puts dentry */
317         if (IS_ERR(file)) {
318                 req->rq_status = PTR_ERR(file);
319                 RETURN(0);
320         }
321
322         niobuf = lustre_msg_buf(req->rq_reqmsg, 1);
323         if (!niobuf) {
324                 req->rq_status = -EINVAL;
325                 LBUG();
326                 RETURN(0);
327         }
328
329         /* to make this asynchronous make sure that the handling function
330            doesn't send a reply when this function completes. Instead a
331            callback function would send the reply */
332         rc = mds_sendpage(req, file, body->size, niobuf);
333
334         filp_close(file, 0);
335         req->rq_status = rc;
336         RETURN(0);
337 }
338
339 int mds_reint(struct ptlrpc_request *req)
340 {
341         int rc;
342         struct mds_update_record rec;
343
344         rc = mds_update_unpack(req, &rec);
345         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
346                 CERROR("invalid record\n");
347                 req->rq_status = -EINVAL;
348                 RETURN(0);
349         }
350         /* rc will be used to interrupt a for loop over multiple records */
351         rc = mds_reint_rec(&rec, req);
352         return 0;
353 }
354
355 int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
356                struct ptlrpc_request *req)
357 {
358         int rc;
359         ENTRY;
360
361         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
362         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
363                 CERROR("lustre_mds: Invalid request\n");
364                 GOTO(out, rc);
365         }
366
367         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
368                 CERROR("lustre_mds: wrong packet type sent %d\n",
369                        req->rq_reqmsg->type);
370                 GOTO(out, rc = -EINVAL);
371         }
372
373         switch (req->rq_reqmsg->opc) {
374         case MDS_CONNECT:
375                 CDEBUG(D_INODE, "getattr\n");
376                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
377                 rc = mds_connect(req);
378                 break;
379
380         case MDS_GETATTR:
381                 CDEBUG(D_INODE, "getattr\n");
382                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
383                 rc = mds_getattr(req);
384                 break;
385
386         case MDS_READPAGE:
387                 CDEBUG(D_INODE, "readpage\n");
388                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
389                 rc = mds_readpage(req);
390
391                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
392                         return 0;
393                 break;
394
395         case MDS_REINT:
396                 CDEBUG(D_INODE, "reint\n");
397                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
398                 rc = mds_reint(req);
399                 break;
400
401         case MDS_OPEN:
402                 CDEBUG(D_INODE, "open\n");
403                 OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
404                 rc = mds_open(req);
405                 break;
406
407         case MDS_CLOSE:
408                 CDEBUG(D_INODE, "close\n");
409                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
410                 rc = mds_close(req);
411                 break;
412
413         default:
414                 rc = ptlrpc_error(svc, req);
415                 RETURN(rc);
416         }
417
418         EXIT;
419 out:
420         if (rc) {
421                 ptlrpc_error(svc, req);
422         } else {
423                 CDEBUG(D_NET, "sending reply\n");
424                 ptlrpc_reply(svc, req);
425         }
426
427         return 0;
428 }
429
430 #define LAST_RCVD "last_rcvd"
431
432 static int mds_prep(struct obd_device *obddev)
433 {
434         struct obd_run_ctxt saved;
435         struct mds_obd *mds = &obddev->u.mds;
436         struct super_operations *s_ops;
437         struct file *f;
438         loff_t off = 0;
439         __u64 mount_count;
440         int rc;
441
442         push_ctxt(&saved, &mds->mds_ctxt);
443         rc = simple_mkdir(current->fs->pwd, "ROOT", 0700);
444         if (rc && rc != -EEXIST) {
445                 CERROR("cannot create ROOT directory: rc = %d\n", rc);
446                 GOTO(err_pop, rc);
447         }
448         f = filp_open("ROOT", O_RDONLY, 0);
449         if (IS_ERR(f)) {
450                 rc = PTR_ERR(f);
451                 CERROR("cannot open ROOT: rc = %d\n", rc);
452                 LBUG();
453                 GOTO(err_pop, rc);
454         }
455
456         mds->mds_rootfid.id = f->f_dentry->d_inode->i_ino;
457         mds->mds_rootfid.generation = f->f_dentry->d_inode->i_generation;
458         mds->mds_rootfid.f_type = S_IFDIR;
459
460         rc = filp_close(f, 0);
461         if (rc) {
462                 CERROR("cannot close ROOT: rc = %d\n", rc);
463                 LBUG();
464         }
465
466         rc = simple_mkdir(current->fs->pwd, "FH", 0700);
467         if (rc && rc != -EEXIST) {
468                 CERROR("cannot create FH directory: rc = %d\n", rc);
469                 GOTO(err_pop, rc);
470         }
471
472         f = filp_open("mount_count", O_RDWR | O_CREAT, 0644);
473         if (IS_ERR(f)) {
474                 rc = PTR_ERR(f);
475                 CERROR("cannot open/create mount_count file, rc = %d\n", rc);
476                 GOTO(err_pop, rc = PTR_ERR(f));
477         }
478         rc = lustre_fread(f, (char *)&mount_count, sizeof(mount_count), &off);
479         if (rc == 0) {
480                 CERROR("empty MDS mount_count, new MDS?\n");
481                 /* XXX maybe this should just be a random number? */
482                 mds->mds_mount_count = 0;
483         } else if (rc != sizeof(mount_count)) {
484                 CERROR("error reading mount_count: rc = %d\n", rc);
485                 /* XXX maybe this should just be a random number? */
486                 mds->mds_mount_count = 0;
487         } else {
488                 mds->mds_mount_count = le64_to_cpu(mount_count);
489         }
490
491         mds->mds_mount_count++;
492         CDEBUG(D_SUPER, "MDS mount_count is %Ld\n",
493                (unsigned long long)mds->mds_mount_count);
494         off = 0;
495         mount_count = cpu_to_le64(mds->mds_mount_count);
496         rc = lustre_fwrite(f, (char *)&mount_count, sizeof(mount_count), &off);
497         if (rc != sizeof(mount_count))
498                 CERROR("error writing mount_count: rc = %d\n", rc);
499         rc = filp_close(f, 0);
500         if (rc)
501                 CERROR("error closing mount_count: rc = %d\n", rc);
502
503         f = filp_open("last_rcvd", O_RDWR | O_CREAT, 0644);
504         if (IS_ERR(f)) {
505                 rc = PTR_ERR(f);
506                 CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc);
507                 GOTO(err_pop, rc);
508         }
509         mds->mds_rcvd_filp = f;
510         pop_ctxt(&saved);
511
512         /*
513          * Replace the client filesystem delete_inode method with our own,
514          * so that we can clear the object ID before the inode is deleted.
515          * The fs_delete_inode method will call cl_delete_inode for us.
516          *
517          * We need to do this for the MDS superblock only, hence we install
518          * a modified copy of the original superblock method table.
519          *
520          * We still assume that there is only a single MDS client filesystem
521          * type, as we don't have access to the mds struct in delete_inode
522          * and store the client delete_inode method in a global table.  This
523          * will only become a problem when multiple MDSs are running on a
524          * single host with different client filesystems.
525          */
526         OBD_ALLOC(s_ops, sizeof(*s_ops));
527         if (!s_ops)
528                 GOTO(err_filp, rc = -ENOMEM);
529
530         memcpy(s_ops, mds->mds_sb->s_op, sizeof(*s_ops));
531         mds->mds_fsops->cl_delete_inode = s_ops->delete_inode;
532         s_ops->delete_inode = mds->mds_fsops->fs_delete_inode;
533         mds->mds_sb->s_op = s_ops;
534
535         mds->mds_service = ptlrpc_init_svc(128 * 1024,
536                                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
537                                            "self", mds_handle);
538
539         if (!mds->mds_service) {
540                 CERROR("failed to start service\n");
541                 GOTO(err_filp, rc = -EINVAL);
542         }
543
544         rc = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
545         if (rc) {
546                 CERROR("cannot start thread\n");
547                 GOTO(err_svc, rc);
548         }
549
550         RETURN(0);
551
552 err_svc:
553         rpc_unregister_service(mds->mds_service);
554         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
555 err_filp:
556         if (filp_close(f, 0))
557                 CERROR("can't close %s after error\n", LAST_RCVD);
558 err_pop:
559         pop_ctxt(&saved);
560
561         return rc;
562 }
563
564 /* mount the file system (secretly) */
565 static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
566 {
567         struct obd_ioctl_data* data = buf;
568         struct mds_obd *mds = &obddev->u.mds;
569         struct vfsmount *mnt;
570         int rc = 0;
571         ENTRY;
572
573 #ifdef CONFIG_DEV_RDONLY
574         dev_clear_rdonly(2);
575 #endif
576         if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
577                 RETURN(-EINVAL);
578
579         mds->mds_fstype = strdup(data->ioc_inlbuf2);
580
581         if (!strcmp(mds->mds_fstype, "ext3"))
582                 mds->mds_fsops = &mds_ext3_fs_ops;
583         else if (!strcmp(mds->mds_fstype, "ext2"))
584                 mds->mds_fsops = &mds_ext2_fs_ops;
585         else {
586                 CERROR("unsupported MDS filesystem type %s\n", mds->mds_fstype);
587                 GOTO(err_kfree, rc = -EPERM);
588         }
589
590         MOD_INC_USE_COUNT;
591         mnt = do_kern_mount(mds->mds_fstype, 0, data->ioc_inlbuf1, NULL);
592         if (IS_ERR(mnt)) {
593                 rc = PTR_ERR(mnt);
594                 CERROR("do_kern_mount failed: rc = %d\n", rc);
595                 GOTO(err_dec, rc);
596         }
597
598         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
599         if (!mds->mds_sb)
600                 GOTO(err_put, rc = -ENODEV);
601
602         mds->mds_vfsmnt = mnt;
603         mds->mds_ctxt.pwdmnt = mnt;
604         mds->mds_ctxt.pwd = mnt->mnt_root;
605         mds->mds_ctxt.fs = KERNEL_DS;
606
607         rc = mds_prep(obddev);
608         if (rc)
609                 GOTO(err_put, rc);
610
611         RETURN(0);
612
613 err_put:
614         unlock_kernel();
615         mntput(mds->mds_vfsmnt);
616         mds->mds_sb = 0;
617         lock_kernel();
618 err_dec:
619         MOD_DEC_USE_COUNT;
620 err_kfree:
621         kfree(mds->mds_fstype);
622         return rc;
623 }
624
625 static int mds_cleanup(struct obd_device * obddev)
626 {
627         struct super_operations *s_ops = NULL;
628         struct super_block *sb;
629         struct mds_obd *mds = &obddev->u.mds;
630
631         ENTRY;
632
633         if ( !list_empty(&obddev->obd_gen_clients) ) {
634                 CERROR("still has clients!\n");
635                 RETURN(-EBUSY);
636         }
637
638         ptlrpc_stop_thread(mds->mds_service);
639         rpc_unregister_service(mds->mds_service);
640         if (!list_empty(&mds->mds_service->srv_reqs)) {
641                 // XXX reply with errors and clean up
642                 CERROR("Request list not empty!\n");
643         }
644         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
645
646         sb = mds->mds_sb;
647         if (!mds->mds_sb)
648                 RETURN(0);
649
650         if (mds->mds_rcvd_filp) {
651                 int rc = filp_close(mds->mds_rcvd_filp, 0);
652                 mds->mds_rcvd_filp = NULL;
653
654                 if (rc)
655                         CERROR("last_rcvd file won't close, rc=%d\n", rc);
656         }
657         s_ops = sb->s_op;
658
659         unlock_kernel();
660         mntput(mds->mds_vfsmnt);
661         mds->mds_sb = 0;
662         kfree(mds->mds_fstype);
663         lock_kernel();
664 #ifdef CONFIG_DEV_RDONLY
665         dev_clear_rdonly(2);
666 #endif
667         OBD_FREE(s_ops, sizeof(*s_ops));
668
669         MOD_DEC_USE_COUNT;
670         RETURN(0);
671 }
672
673 /* use obd ops to offer management infrastructure */
674 static struct obd_ops mds_obd_ops = {
675         o_setup:       mds_setup,
676         o_cleanup:     mds_cleanup,
677 };
678
679 static int __init mds_init(void)
680 {
681         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
682         return 0;
683 }
684
685 static void __exit mds_exit(void)
686 {
687         obd_unregister_type(LUSTRE_MDS_NAME);
688 }
689
690 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
691 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
692 MODULE_LICENSE("GPL");
693
694 module_init(mds_init);
695 module_exit(mds_exit);