Whamcloud - gitweb
3b0f96550558c19c5207c62df3a58be76ca69e9a
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/handler.c
5  *  
6  *  Lustre Metadata Server (mds) request handler
7  * 
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com>
14  * 
15  *  This server is single threaded at present (but can easily be multi threaded). 
16  * 
17  */
18
19 #define EXPORT_SYMTAB
20
21 #include <linux/version.h>
22 #include <linux/module.h>
23 #include <linux/fs.h>
24 #include <linux/stat.h>
25 #include <linux/locks.h>
26 #include <linux/ext2_fs.h>
27 #include <linux/quotaops.h>
28 #include <asm/unistd.h>
29 #include <asm/uaccess.h>
30
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/obd_support.h>
34 #include <linux/obd.h>
35 #include <linux/lustre_lib.h>
36 #include <linux/lustre_idl.h>
37 #include <linux/lustre_mds.h>
38 #include <linux/lustre_net.h>
39 #include <linux/obd_class.h>
40
41 // XXX for testing
42 static struct mds_obd *MDS;
43
44 // XXX make this networked!  
45 static int mds_queue_req(struct ptlrpc_request *req)
46 {
47         struct ptlrpc_request *srv_req;
48         
49         if (!MDS) { 
50                 EXIT;
51                 return -1;
52         }
53
54         OBD_ALLOC(srv_req, sizeof(*srv_req));
55         if (!srv_req) { 
56                 EXIT;
57                 return -ENOMEM;
58         }
59
60         CDEBUG(0, "---> MDS at %d %p, incoming req %p, srv_req %p\n",
61                __LINE__, MDS, req, srv_req);
62
63         memset(srv_req, 0, sizeof(*req)); 
64
65         /* move the request buffer */
66         srv_req->rq_reqbuf = req->rq_reqbuf;
67         srv_req->rq_reqlen = req->rq_reqlen;
68         srv_req->rq_obd = MDS;
69
70         /* remember where it came from */
71         srv_req->rq_reply_handle = req;
72
73         list_add(&srv_req->rq_list, &MDS->mds_reqs); 
74         wake_up(&MDS->mds_waitq);
75         return 0;
76 }
77
78 int mds_sendpage(struct ptlrpc_request *req, struct file *file, 
79                  __u64 offset, struct niobuf *dst)
80 {
81         int rc; 
82         mm_segment_t oldfs = get_fs();
83
84         if (req->rq_peer.peer_nid == 0) {
85                 /* dst->addr is a user address, but in a different task! */
86                 set_fs(KERNEL_DS); 
87                 rc = generic_file_read(file, (char *)(long)dst->addr, 
88                                        PAGE_SIZE, &offset); 
89                 set_fs(oldfs);
90
91                 if (rc != PAGE_SIZE) 
92                         return -EIO;
93         } else {
94                 char *buf;
95
96                 OBD_ALLOC(buf, PAGE_SIZE);
97                 if (!buf)
98                         return -ENOMEM;
99
100                 set_fs(KERNEL_DS); 
101                 rc = generic_file_read(file, buf, PAGE_SIZE, &offset); 
102                 set_fs(oldfs);
103
104                 if (rc != PAGE_SIZE) {
105                         OBD_FREE(buf, PAGE_SIZE);
106                         return -EIO;
107                 }
108
109                 req->rq_bulkbuf = buf;
110                 req->rq_bulklen = PAGE_SIZE;
111                 init_waitqueue_head(&req->rq_wait_for_bulk);
112                 rc = ptl_send_buf(req, &req->rq_peer, MDS_BULK_PORTAL, 0);
113                 sleep_on(&req->rq_wait_for_bulk);
114                 OBD_FREE(buf, PAGE_SIZE);
115                 req->rq_bulklen = 0; /* FIXME: eek. */
116         }
117
118         return 0;
119 }
120
121 int mds_reply(struct ptlrpc_request *req)
122 {
123         struct ptlrpc_request *clnt_req = req->rq_reply_handle;
124
125         ENTRY;
126         
127         if (req->rq_obd->mds_service != NULL) {
128                 /* This is a request that came from the network via portals. */
129
130                 /* FIXME: we need to increment the count of handled events */
131                 ptl_send_buf(req, &req->rq_peer, MDS_REPLY_PORTAL, 0);
132         } else {
133                 /* This is a local request that came from another thread. */
134
135                 /* move the reply to the client */ 
136                 clnt_req->rq_replen = req->rq_replen;
137                 clnt_req->rq_repbuf = req->rq_repbuf;
138                 req->rq_repbuf = NULL;
139                 req->rq_replen = 0;
140
141                 /* free the request buffer */
142                 OBD_FREE(req->rq_reqbuf, req->rq_reqlen);
143                 req->rq_reqbuf = NULL;
144
145                 /* wake up the client */ 
146                 wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
147         }
148
149         EXIT;
150         return 0;
151 }
152
153 int mds_error(struct ptlrpc_request *req)
154 {
155         struct ptlrep_hdr *hdr;
156
157         ENTRY;
158
159         OBD_ALLOC(hdr, sizeof(*hdr));
160         if (!hdr) { 
161                 EXIT;
162                 return -ENOMEM;
163         }
164
165         memset(hdr, 0, sizeof(*hdr));
166         
167         hdr->seqno = req->rq_reqhdr->seqno;
168         hdr->status = req->rq_status; 
169         hdr->type = MDS_TYPE_ERR;
170
171         req->rq_repbuf = (char *)hdr;
172         req->rq_replen = sizeof(*hdr); 
173
174         EXIT;
175         return mds_reply(req);
176 }
177
178 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
179                               struct vfsmount **mnt)
180 {
181         /* stolen from NFS */ 
182         struct super_block *sb = mds->mds_sb; 
183         unsigned long ino = fid->id;
184         //__u32 generation = fid->generation;
185         __u32 generation = 0;
186         struct inode *inode;
187         struct list_head *lp;
188         struct dentry *result;
189
190         if (ino == 0)
191                 return ERR_PTR(-ESTALE);
192
193         inode = iget(sb, ino);
194         if (inode == NULL)
195                 return ERR_PTR(-ENOMEM);
196
197         CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb); 
198
199         if (is_bad_inode(inode)
200             || (generation && inode->i_generation != generation)
201                 ) {
202                 /* we didn't find the right inode.. */
203                 CERROR("bad inode %lu, link: %d ct: %d or version  %u/%u\n",
204                         inode->i_ino,
205                         inode->i_nlink, atomic_read(&inode->i_count),
206                         inode->i_generation,
207                         generation);
208                 iput(inode);
209                 return ERR_PTR(-ESTALE);
210         }
211
212         /* now to find a dentry.
213          * If possible, get a well-connected one
214          */
215         if (mnt)
216                 *mnt = mds->mds_vfsmnt;
217         spin_lock(&dcache_lock);
218         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
219                 result = list_entry(lp,struct dentry, d_alias);
220                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
221                         dget_locked(result);
222                         result->d_vfs_flags |= DCACHE_REFERENCED;
223                         spin_unlock(&dcache_lock);
224                         iput(inode);
225                         if (mnt)
226                                 mntget(*mnt);
227                         return result;
228                 }
229         }
230         spin_unlock(&dcache_lock);
231         result = d_alloc_root(inode);
232         if (result == NULL) {
233                 iput(inode);
234                 return ERR_PTR(-ENOMEM);
235         }
236         if (mnt)
237                 mntget(*mnt);
238         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
239         return result;
240 }
241
242 static inline void mds_get_objid(struct inode *inode, __u64 *id)
243 {
244         memcpy(id, &inode->u.ext2_i.i_data, sizeof(*id));
245 }
246
247 int mds_getattr(struct ptlrpc_request *req)
248 {
249         struct dentry *de;
250         struct inode *inode;
251         struct mds_rep *rep;
252         int rc;
253         
254         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
255                           &req->rq_replen, &req->rq_repbuf);
256         if (rc) { 
257                 EXIT;
258                 CERROR("mds: out of memory\n");
259                 req->rq_status = -ENOMEM;
260                 return 0;
261         }
262
263         req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
264         rep = req->rq_rep.mds;
265
266         de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, NULL);
267         if (IS_ERR(de)) { 
268                 EXIT;
269                 req->rq_rephdr->status = -ENOENT;
270                 return 0;
271         }
272
273         inode = de->d_inode;
274         rep->ino = inode->i_ino;
275         rep->atime = inode->i_atime;
276         rep->ctime = inode->i_ctime;
277         rep->mtime = inode->i_mtime;
278         rep->uid = inode->i_uid;
279         rep->gid = inode->i_gid;
280         rep->size = inode->i_size;
281         rep->mode = inode->i_mode;
282         rep->nlink = inode->i_nlink;
283         rep->valid = ~0;
284         mds_get_objid(inode, &rep->objid);
285         dput(de); 
286         return 0;
287 }
288
289 int mds_open(struct ptlrpc_request *req)
290 {
291         struct dentry *de;
292         struct inode *inode;
293         struct mds_rep *rep;
294         struct file *file;
295         struct vfsmount *mnt;
296         __u32 flags;
297         int rc;
298         
299         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
300                           &req->rq_replen, &req->rq_repbuf);
301         if (rc) { 
302                 EXIT;
303                 CERROR("mds: out of memory\n");
304                 req->rq_status = -ENOMEM;
305                 return 0;
306         }
307
308         req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
309         rep = req->rq_rep.mds;
310
311         de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, &mnt);
312         if (IS_ERR(de)) { 
313                 EXIT;
314                 req->rq_rephdr->status = -ENOENT;
315                 return 0;
316         }
317         flags = req->rq_req.mds->flags;
318         file = dentry_open(de, mnt, flags);
319         if (!file || IS_ERR(file)) { 
320                 req->rq_rephdr->status = -EINVAL;
321                 return 0;
322         }               
323         
324         rep->objid = (__u64) (unsigned long)file; 
325         mds_get_objid(inode, &rep->objid);
326         dput(de); 
327         return 0;
328 }
329
330
331 int mds_readpage(struct ptlrpc_request *req)
332 {
333         struct vfsmount *mnt;
334         struct dentry *de;
335         struct file *file; 
336         struct niobuf *niobuf; 
337         struct mds_rep *rep;
338         int rc;
339         
340         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
341                           &req->rq_replen, &req->rq_repbuf);
342         if (rc) { 
343                 EXIT;
344                 CERROR("mds: out of memory\n");
345                 req->rq_status = -ENOMEM;
346                 return 0;
347         }
348
349         req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
350         rep = req->rq_rep.mds;
351
352         de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, &mnt);
353         if (IS_ERR(de)) { 
354                 EXIT;
355                 req->rq_rephdr->status = PTR_ERR(de); 
356                 return 0;
357         }
358
359         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
360
361         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE); 
362         /* note: in case of an error, dentry_open puts dentry */
363         if (IS_ERR(file)) { 
364                 EXIT;
365                 req->rq_rephdr->status = PTR_ERR(file);
366                 return 0;
367         }
368
369         niobuf = mds_req_tgt(req->rq_req.mds);
370
371         /* to make this asynchronous make sure that the handling function 
372            doesn't send a reply when this function completes. Instead a 
373            callback function would send the reply */ 
374         rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf); 
375
376         filp_close(file, 0);
377         req->rq_rephdr->status = rc;
378         EXIT;
379         return 0;
380 }
381
382 int mds_reint(struct ptlrpc_request *req)
383 {
384         int rc;
385         char *buf = mds_req_tgt(req->rq_req.mds);
386         int len = req->rq_req.mds->tgtlen;
387         struct mds_update_record rec;
388         
389         rc = mds_update_unpack(buf, len, &rec);
390         if (rc) { 
391                 CERROR("invalid record\n");
392                 req->rq_status = -EINVAL;
393                 return 0;
394         }
395         /* rc will be used to interrupt a for loop over multiple records */
396         rc = mds_reint_rec(&rec, req); 
397         return 0; 
398 }
399
400 //int mds_handle(struct mds_conn *conn, int len, char *buf)
401 int mds_handle(struct ptlrpc_request *req)
402 {
403         int rc;
404         struct ptlreq_hdr *hdr;
405
406         ENTRY;
407
408         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
409
410         if (NTOH__u32(hdr->type) != MDS_TYPE_REQ) {
411                 CERROR("lustre_mds: wrong packet type sent %d\n",
412                        NTOH__u32(hdr->type));
413                 rc = -EINVAL;
414                 goto out;
415         }
416
417         rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
418                             &req->rq_reqhdr, &req->rq_req);
419         if (rc) { 
420                 CERROR("lustre_mds: Invalid request\n");
421                 EXIT; 
422                 goto out;
423         }
424
425         switch (req->rq_reqhdr->opc) { 
426
427         case MDS_GETATTR:
428                 CDEBUG(D_INODE, "getattr\n");
429                 rc = mds_getattr(req);
430                 break;
431
432         case MDS_READPAGE:
433                 CDEBUG(D_INODE, "readpage\n");
434                 rc = mds_readpage(req);
435                 break;
436
437         case MDS_REINT:
438                 CDEBUG(D_INODE, "reint\n");
439                 rc = mds_reint(req);
440                 break;
441
442         default:
443                 return mds_error(req);
444         }
445
446 out:
447         if (rc) { 
448                 CERROR("no header\n");
449                 return 0;
450         }
451
452         if( req->rq_status) { 
453                 mds_error(req);
454         } else { 
455                 CDEBUG(D_INODE, "sending reply\n"); 
456                 mds_reply(req); 
457         }
458
459         return 0;
460 }
461
462
463 static void mds_timer_run(unsigned long __data)
464 {
465         struct task_struct * p = (struct task_struct *) __data;
466
467         wake_up_process(p);
468 }
469
470 int mds_main(void *arg)
471 {
472         struct mds_obd *mds = (struct mds_obd *) arg;
473         struct timer_list timer;
474         DECLARE_WAITQUEUE(wait, current);
475
476         lock_kernel();
477         daemonize();
478         spin_lock_irq(&current->sigmask_lock);
479         sigfillset(&current->blocked);
480         recalc_sigpending(current);
481         spin_unlock_irq(&current->sigmask_lock);
482
483         sprintf(current->comm, "lustre_mds");
484
485         /* Set up an interval timer which can be used to trigger a
486            wakeup after the interval expires */
487         init_timer(&timer);
488         timer.data = (unsigned long) current;
489         timer.function = mds_timer_run;
490         mds->mds_timer = &timer;
491
492         /* Record that the  thread is running */
493         mds->mds_thread = current;
494         mds->mds_flags = MDS_RUNNING;
495         wake_up(&mds->mds_done_waitq); 
496
497         /* And now, wait forever for commit wakeup events. */
498         while (1) {
499                 int signal;
500                 int rc;
501
502                 wake_up(&mds->mds_done_waitq);
503                 CDEBUG(D_INODE, "mds_wakes pick up req here and continue\n"); 
504
505                 if (mds->mds_service != NULL) {
506                         ptl_event_t ev;
507                         struct ptlrpc_request request;
508                         struct ptlrpc_service *service;
509
510                         CDEBUG(D_IOCTL, "-- sleeping\n");
511                         signal = 0;
512                         add_wait_queue(&mds->mds_waitq, &wait);
513                         while (1) {
514                                 set_current_state(TASK_INTERRUPTIBLE);
515                                 rc = PtlEQGet(mds->mds_service->srv_eq_h, &ev);
516                                 if (rc == PTL_OK || rc == PTL_EQ_DROPPED)
517                                         break;
518                                 CERROR("EQGet rc %d\n", rc); 
519                                 if (mds->mds_flags & MDS_STOPPING)
520                                         break;
521
522                                 /* if this process really wants to die,
523                                  * let it go */
524                                 if (sigismember(&(current->pending.signal),
525                                                 SIGKILL) ||
526                                     sigismember(&(current->pending.signal),
527                                                 SIGINT)) {
528                                         signal = 1;
529                                         break;
530                                 }
531
532                                 schedule();
533                         }
534                         remove_wait_queue(&mds->mds_waitq, &wait);
535                         set_current_state(TASK_RUNNING);
536                         CDEBUG(D_IOCTL, "-- done\n");
537
538                         if (signal == 1) {
539                                 /* We broke out because of a signal */
540                                 EXIT;
541                                 break;
542                         }
543                         if (mds->mds_flags & MDS_STOPPING) { 
544                                 break;
545                         }
546
547                         service = (struct ptlrpc_service *)ev.mem_desc.user_ptr;
548
549                         /* FIXME: If we move to an event-driven model,
550                          * we should put the request on the stack of
551                          * mds_handle instead. */
552                         memset(&request, 0, sizeof(request));
553                         request.rq_reqbuf = ev.mem_desc.start + ev.offset;
554                         request.rq_reqlen = ev.mem_desc.length;
555                         request.rq_obd = MDS;
556                         request.rq_xid = ev.match_bits;
557                         CERROR("got req %d\n", request.rq_xid);
558
559                         request.rq_peer.peer_nid = ev.initiator.nid;
560                         /* FIXME: this NI should be the incoming NI.
561                          * We don't know how to find that from here. */
562                         request.rq_peer.peer_ni =
563                                 mds->mds_service->srv_self.peer_ni;
564                         rc = mds_handle(&request);
565
566                         /* Inform the rpc layer the event has been handled */ 
567                         ptl_received_rpc(service);
568                 } else {
569                         struct ptlrpc_request *request;
570
571                         CDEBUG(D_IOCTL, "-- sleeping\n");
572                         add_wait_queue(&mds->mds_waitq, &wait);
573                         while (1) {
574                                 spin_lock(&mds->mds_lock);
575                                 if (!list_empty(&mds->mds_reqs))
576                                         break;
577
578                                 set_current_state(TASK_INTERRUPTIBLE);
579
580                                 /* if this process really wants to die,
581                                  * let it go */
582                                 if (sigismember(&(current->pending.signal),
583                                                 SIGKILL) ||
584                                     sigismember(&(current->pending.signal),
585                                                 SIGINT))
586                                         break;
587
588                                 spin_unlock(&mds->mds_lock);
589
590                                 schedule();
591                         }
592                         remove_wait_queue(&mds->mds_waitq, &wait);
593                         set_current_state(TASK_RUNNING);
594                         CDEBUG(D_IOCTL, "-- done\n");
595
596                         if (list_empty(&mds->mds_reqs)) {
597                                 CDEBUG(D_INODE, "woke because of signal\n");
598                                 spin_unlock(&mds->mds_lock);
599                         } else {
600                                 request = list_entry(mds->mds_reqs.next,
601                                                      struct ptlrpc_request,
602                                                      rq_list);
603                                 list_del(&request->rq_list);
604                                 spin_unlock(&mds->mds_lock);
605                                 rc = mds_handle(request);
606                         }
607                 }
608         }
609
610         del_timer_sync(mds->mds_timer);
611
612         /* XXX maintain a list of all managed devices: cleanup here */
613
614         mds->mds_thread = NULL;
615         wake_up(&mds->mds_done_waitq);
616         CERROR("lustre_mds: exiting\n");
617         return 0;
618 }
619
620 static void mds_stop_srv_thread(struct mds_obd *mds)
621 {
622         mds->mds_flags |= MDS_STOPPING;
623
624         while (mds->mds_thread) {
625                 wake_up(&mds->mds_waitq);
626                 sleep_on(&mds->mds_done_waitq);
627         }
628 }
629
630 static void mds_start_srv_thread(struct mds_obd *mds)
631 {
632         init_waitqueue_head(&mds->mds_waitq);
633         init_waitqueue_head(&mds->mds_done_waitq);
634         kernel_thread(mds_main, (void *)mds, CLONE_VM | CLONE_FS | CLONE_FILES);
635         while (!mds->mds_thread) 
636                 sleep_on(&mds->mds_done_waitq);
637 }
638
639 /* mount the file system (secretly) */
640 static int mds_setup(struct obd_device *obddev, obd_count len,
641                         void *buf)
642                         
643 {
644         struct obd_ioctl_data* data = buf;
645         struct mds_obd *mds = &obddev->u.mds;
646         struct vfsmount *mnt;
647         struct lustre_peer peer;
648         int err; 
649         ENTRY;
650
651
652         mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL); 
653         err = PTR_ERR(mnt);
654         if (IS_ERR(mnt)) { 
655                 EXIT;
656                 return err;
657         }
658
659         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
660         if (!obddev->u.mds.mds_sb) {
661                 EXIT;
662                 return -ENODEV;
663         }
664
665         mds->mds_vfsmnt = mnt;
666         obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2);
667
668         mds->mds_ctxt.pwdmnt = mnt;
669         mds->mds_ctxt.pwd = mnt->mnt_root;
670         mds->mds_ctxt.fs = KERNEL_DS;
671         mds->mds_remote_nid = 0;
672
673         INIT_LIST_HEAD(&mds->mds_reqs);
674         mds->mds_thread = NULL;
675         mds->mds_flags = 0;
676         mds->mds_interval = 3 * HZ;
677         MDS = mds;
678
679         spin_lock_init(&obddev->u.mds.mds_lock);
680
681         err = kportal_uuid_to_peer("self", &peer);
682         if (err == 0) {
683                 OBD_ALLOC(mds->mds_service, sizeof(*mds->mds_service));
684                 if (mds->mds_service == NULL)
685                         return -ENOMEM;
686                 mds->mds_service->srv_buf_size = 64 * 1024;
687                 //mds->mds_service->srv_buf_size = 1024;
688                 mds->mds_service->srv_portal = MDS_REQUEST_PORTAL;
689                 memcpy(&mds->mds_service->srv_self, &peer, sizeof(peer));
690                 mds->mds_service->srv_wait_queue = &mds->mds_waitq;
691
692                 rpc_register_service(mds->mds_service, "self");
693         }
694
695         mds_start_srv_thread(mds);
696
697         MOD_INC_USE_COUNT;
698         EXIT; 
699         return 0;
700
701
702 static int mds_cleanup(struct obd_device * obddev)
703 {
704         struct super_block *sb;
705         struct mds_obd *mds = &obddev->u.mds;
706
707         ENTRY;
708
709         if ( !(obddev->obd_flags & OBD_SET_UP) ) {
710                 EXIT;
711                 return 0;
712         }
713
714         if ( !list_empty(&obddev->obd_gen_clients) ) {
715                 CERROR("still has clients!\n");
716                 EXIT;
717                 return -EBUSY;
718         }
719
720         MDS = NULL;
721         mds_stop_srv_thread(mds);
722         rpc_unregister_service(mds->mds_service);
723         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
724
725         sb = mds->mds_sb;
726         if (!mds->mds_sb){
727                 EXIT;
728                 return 0;
729         }
730
731         if (!list_empty(&mds->mds_reqs)) {
732                 // XXX reply with errors and clean up
733                 CDEBUG(D_INODE, "Request list not empty!\n");
734         }
735
736         unlock_kernel();
737         mntput(mds->mds_vfsmnt); 
738         mds->mds_sb = 0;
739         kfree(mds->mds_fstype);
740         lock_kernel();
741
742         MOD_DEC_USE_COUNT;
743         EXIT;
744         return 0;
745 }
746
747 /* use obd ops to offer management infrastructure */
748 static struct obd_ops mds_obd_ops = {
749         o_setup:       mds_setup,
750         o_cleanup:     mds_cleanup,
751 };
752
753 static int __init mds_init(void)
754 {
755         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
756         return 0;
757 }
758
759 static void __exit mds_exit(void)
760 {
761         obd_unregister_type(LUSTRE_MDS_NAME);
762 }
763
764 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
765 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
766 MODULE_LICENSE("GPL");
767
768
769 // for testing (maybe this stays)
770 EXPORT_SYMBOL(mds_queue_req);
771
772 module_init(mds_init);
773 module_exit(mds_exit);