Whamcloud - gitweb
- added rq_type field to ptlrpc_request
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/handler.c
5  *  
6  *  Lustre Metadata Server (mds) request handler
7  * 
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com>
14  * 
15  *  This server is single threaded at present (but can easily be multi threaded). 
16  * 
17  */
18
19 #define EXPORT_SYMTAB
20
21 #include <linux/version.h>
22 #include <linux/module.h>
23 #include <linux/fs.h>
24 #include <linux/stat.h>
25 #include <linux/locks.h>
26 #include <linux/ext2_fs.h>
27 #include <linux/quotaops.h>
28 #include <asm/unistd.h>
29 #include <asm/uaccess.h>
30
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/obd_support.h>
34 #include <linux/obd.h>
35 #include <linux/lustre_lib.h>
36 #include <linux/lustre_idl.h>
37 #include <linux/lustre_mds.h>
38 #include <linux/lustre_net.h>
39 #include <linux/obd_class.h>
40
41 // XXX for testing
42 static struct mds_obd *MDS;
43
44 // XXX make this networked!  
45 static int mds_queue_req(struct ptlrpc_request *req)
46 {
47         struct ptlrpc_request *srv_req;
48         
49         if (!MDS) { 
50                 EXIT;
51                 return -1;
52         }
53
54         OBD_ALLOC(srv_req, sizeof(*srv_req));
55         if (!srv_req) { 
56                 EXIT;
57                 return -ENOMEM;
58         }
59
60         CDEBUG(0, "---> MDS at %d %p, incoming req %p, srv_req %p\n",
61                __LINE__, MDS, req, srv_req);
62
63         memset(srv_req, 0, sizeof(*req)); 
64
65         /* move the request buffer */
66         srv_req->rq_reqbuf = req->rq_reqbuf;
67         srv_req->rq_reqlen = req->rq_reqlen;
68         srv_req->rq_obd = MDS;
69
70         /* remember where it came from */
71         srv_req->rq_reply_handle = req;
72
73         list_add(&srv_req->rq_list, &MDS->mds_reqs); 
74         wake_up(&MDS->mds_waitq);
75         return 0;
76 }
77
78 int mds_sendpage(struct ptlrpc_request *req, struct file *file, 
79                  __u64 offset, struct niobuf *dst)
80 {
81         int rc; 
82         mm_segment_t oldfs = get_fs();
83
84         if (req->rq_peer.peer_nid == 0) {
85                 /* dst->addr is a user address, but in a different task! */
86                 set_fs(KERNEL_DS); 
87                 rc = generic_file_read(file, (char *)(long)dst->addr, 
88                                        PAGE_SIZE, &offset); 
89                 set_fs(oldfs);
90
91                 if (rc != PAGE_SIZE) 
92                         return -EIO;
93         } else {
94                 char *buf;
95                 DECLARE_WAITQUEUE(wait, current);
96
97                 OBD_ALLOC(buf, PAGE_SIZE);
98                 if (!buf)
99                         return -ENOMEM;
100
101                 set_fs(KERNEL_DS); 
102                 rc = generic_file_read(file, buf, PAGE_SIZE, &offset); 
103                 set_fs(oldfs);
104
105                 if (rc != PAGE_SIZE) {
106                         OBD_FREE(buf, PAGE_SIZE);
107                         return -EIO;
108                 }
109
110                 req->rq_type = PTLRPC_BULK;
111                 req->rq_bulkbuf = buf;
112                 req->rq_bulklen = PAGE_SIZE;
113
114                 init_waitqueue_head(&req->rq_wait_for_bulk);
115                 rc = ptl_send_buf(req, &req->rq_peer, MDS_BULK_PORTAL);
116                 add_wait_queue(&req->rq_wait_for_bulk, &wait);
117                 /* The bulk callback will set rq->bulkbuf to NULL when it's
118                  * been ACKed and it's finished using it. */
119                 while (req->rq_bulkbuf != NULL) {
120                         set_current_state(TASK_INTERRUPTIBLE);
121
122                         /* if this process really wants to die, let it go */
123                         if (sigismember(&(current->pending.signal), SIGKILL) ||
124                             sigismember(&(current->pending.signal), SIGINT))
125                                 break;
126
127                         schedule();
128                 }
129                 remove_wait_queue(&req->rq_wait_for_bulk, &wait);
130                 set_current_state(TASK_RUNNING);
131
132                 if (req->rq_bulkbuf != NULL) {
133                         EXIT;
134                         return -EINTR;
135                 }
136
137                 OBD_FREE(buf, PAGE_SIZE);
138                 req->rq_bulklen = 0; /* FIXME: eek. */
139         }
140
141         return 0;
142 }
143
144 int mds_reply(struct ptlrpc_request *req)
145 {
146         struct ptlrpc_request *clnt_req = req->rq_reply_handle;
147
148         ENTRY;
149         
150         if (req->rq_obd->mds_service != NULL) {
151                 /* This is a request that came from the network via portals. */
152
153                 /* FIXME: we need to increment the count of handled events */
154                 req->rq_type = PTLRPC_REPLY;
155                 ptl_send_buf(req, &req->rq_peer, MDS_REPLY_PORTAL);
156         } else {
157                 /* This is a local request that came from another thread. */
158
159                 /* move the reply to the client */ 
160                 clnt_req->rq_replen = req->rq_replen;
161                 clnt_req->rq_repbuf = req->rq_repbuf;
162                 req->rq_repbuf = NULL;
163                 req->rq_replen = 0;
164
165                 /* free the request buffer */
166                 OBD_FREE(req->rq_reqbuf, req->rq_reqlen);
167                 req->rq_reqbuf = NULL;
168
169                 /* wake up the client */ 
170                 wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
171         }
172
173         EXIT;
174         return 0;
175 }
176
177 int mds_error(struct ptlrpc_request *req)
178 {
179         struct ptlrep_hdr *hdr;
180
181         ENTRY;
182
183         OBD_ALLOC(hdr, sizeof(*hdr));
184         if (!hdr) { 
185                 EXIT;
186                 return -ENOMEM;
187         }
188
189         memset(hdr, 0, sizeof(*hdr));
190         
191         hdr->seqno = req->rq_reqhdr->seqno;
192         hdr->status = req->rq_status; 
193         hdr->type = MDS_TYPE_ERR;
194
195         req->rq_repbuf = (char *)hdr;
196         req->rq_replen = sizeof(*hdr); 
197
198         EXIT;
199         return mds_reply(req);
200 }
201
202 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
203                               struct vfsmount **mnt)
204 {
205         /* stolen from NFS */ 
206         struct super_block *sb = mds->mds_sb; 
207         unsigned long ino = fid->id;
208         //__u32 generation = fid->generation;
209         __u32 generation = 0;
210         struct inode *inode;
211         struct list_head *lp;
212         struct dentry *result;
213
214         if (ino == 0)
215                 return ERR_PTR(-ESTALE);
216
217         inode = iget(sb, ino);
218         if (inode == NULL)
219                 return ERR_PTR(-ENOMEM);
220
221         CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb); 
222
223         if (is_bad_inode(inode)
224             || (generation && inode->i_generation != generation)
225                 ) {
226                 /* we didn't find the right inode.. */
227                 CERROR("bad inode %lu, link: %d ct: %d or version  %u/%u\n",
228                         inode->i_ino,
229                         inode->i_nlink, atomic_read(&inode->i_count),
230                         inode->i_generation,
231                         generation);
232                 iput(inode);
233                 return ERR_PTR(-ESTALE);
234         }
235
236         /* now to find a dentry.
237          * If possible, get a well-connected one
238          */
239         if (mnt)
240                 *mnt = mds->mds_vfsmnt;
241         spin_lock(&dcache_lock);
242         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
243                 result = list_entry(lp,struct dentry, d_alias);
244                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
245                         dget_locked(result);
246                         result->d_vfs_flags |= DCACHE_REFERENCED;
247                         spin_unlock(&dcache_lock);
248                         iput(inode);
249                         if (mnt)
250                                 mntget(*mnt);
251                         return result;
252                 }
253         }
254         spin_unlock(&dcache_lock);
255         result = d_alloc_root(inode);
256         if (result == NULL) {
257                 iput(inode);
258                 return ERR_PTR(-ENOMEM);
259         }
260         if (mnt)
261                 mntget(*mnt);
262         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
263         return result;
264 }
265
266 static inline void mds_get_objid(struct inode *inode, __u64 *id)
267 {
268         memcpy(id, &inode->u.ext2_i.i_data, sizeof(*id));
269 }
270
271 int mds_getattr(struct ptlrpc_request *req)
272 {
273         struct dentry *de;
274         struct inode *inode;
275         struct mds_rep *rep;
276         int rc;
277         
278         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
279                           &req->rq_replen, &req->rq_repbuf);
280         if (rc) { 
281                 EXIT;
282                 CERROR("mds: out of memory\n");
283                 req->rq_status = -ENOMEM;
284                 return 0;
285         }
286
287         req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
288         rep = req->rq_rep.mds;
289
290         de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, NULL);
291         if (IS_ERR(de)) { 
292                 EXIT;
293                 req->rq_rephdr->status = -ENOENT;
294                 return 0;
295         }
296
297         inode = de->d_inode;
298         rep->ino = inode->i_ino;
299         rep->atime = inode->i_atime;
300         rep->ctime = inode->i_ctime;
301         rep->mtime = inode->i_mtime;
302         rep->uid = inode->i_uid;
303         rep->gid = inode->i_gid;
304         rep->size = inode->i_size;
305         rep->mode = inode->i_mode;
306         rep->nlink = inode->i_nlink;
307         rep->valid = ~0;
308         mds_get_objid(inode, &rep->objid);
309         dput(de); 
310         return 0;
311 }
312
313 int mds_open(struct ptlrpc_request *req)
314 {
315         struct dentry *de;
316         struct inode *inode;
317         struct mds_rep *rep;
318         struct file *file;
319         struct vfsmount *mnt;
320         __u32 flags;
321         int rc;
322         
323         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
324                           &req->rq_replen, &req->rq_repbuf);
325         if (rc) { 
326                 EXIT;
327                 CERROR("mds: out of memory\n");
328                 req->rq_status = -ENOMEM;
329                 return 0;
330         }
331
332         req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
333         rep = req->rq_rep.mds;
334
335         de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, &mnt);
336         if (IS_ERR(de)) { 
337                 EXIT;
338                 req->rq_rephdr->status = -ENOENT;
339                 return 0;
340         }
341         flags = req->rq_req.mds->flags;
342         file = dentry_open(de, mnt, flags);
343         if (!file || IS_ERR(file)) { 
344                 req->rq_rephdr->status = -EINVAL;
345                 return 0;
346         }               
347         
348         rep->objid = (__u64) (unsigned long)file; 
349         mds_get_objid(inode, &rep->objid);
350         dput(de); 
351         return 0;
352 }
353
354
355 int mds_readpage(struct ptlrpc_request *req)
356 {
357         struct vfsmount *mnt;
358         struct dentry *de;
359         struct file *file; 
360         struct niobuf *niobuf; 
361         struct mds_rep *rep;
362         int rc;
363         
364         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
365                           &req->rq_replen, &req->rq_repbuf);
366         if (rc) { 
367                 EXIT;
368                 CERROR("mds: out of memory\n");
369                 req->rq_status = -ENOMEM;
370                 return 0;
371         }
372
373         req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
374         rep = req->rq_rep.mds;
375
376         de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, &mnt);
377         if (IS_ERR(de)) { 
378                 EXIT;
379                 req->rq_rephdr->status = PTR_ERR(de); 
380                 return 0;
381         }
382
383         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
384
385         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE); 
386         /* note: in case of an error, dentry_open puts dentry */
387         if (IS_ERR(file)) { 
388                 EXIT;
389                 req->rq_rephdr->status = PTR_ERR(file);
390                 return 0;
391         }
392
393         niobuf = mds_req_tgt(req->rq_req.mds);
394
395         /* to make this asynchronous make sure that the handling function 
396            doesn't send a reply when this function completes. Instead a 
397            callback function would send the reply */ 
398         rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf); 
399
400         filp_close(file, 0);
401         req->rq_rephdr->status = rc;
402         EXIT;
403         return 0;
404 }
405
406 int mds_reint(struct ptlrpc_request *req)
407 {
408         int rc;
409         char *buf = mds_req_tgt(req->rq_req.mds);
410         int len = req->rq_req.mds->tgtlen;
411         struct mds_update_record rec;
412         
413         rc = mds_update_unpack(buf, len, &rec);
414         if (rc) { 
415                 CERROR("invalid record\n");
416                 req->rq_status = -EINVAL;
417                 return 0;
418         }
419         /* rc will be used to interrupt a for loop over multiple records */
420         rc = mds_reint_rec(&rec, req); 
421         return 0; 
422 }
423
424 //int mds_handle(struct mds_conn *conn, int len, char *buf)
425 int mds_handle(struct ptlrpc_request *req)
426 {
427         int rc;
428         struct ptlreq_hdr *hdr;
429
430         ENTRY;
431
432         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
433
434         if (NTOH__u32(hdr->type) != MDS_TYPE_REQ) {
435                 CERROR("lustre_mds: wrong packet type sent %d\n",
436                        NTOH__u32(hdr->type));
437                 rc = -EINVAL;
438                 goto out;
439         }
440
441         rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
442                             &req->rq_reqhdr, &req->rq_req);
443         if (rc) { 
444                 CERROR("lustre_mds: Invalid request\n");
445                 EXIT; 
446                 goto out;
447         }
448
449         switch (req->rq_reqhdr->opc) { 
450
451         case MDS_GETATTR:
452                 CDEBUG(D_INODE, "getattr\n");
453                 rc = mds_getattr(req);
454                 break;
455
456         case MDS_READPAGE:
457                 CDEBUG(D_INODE, "readpage\n");
458                 rc = mds_readpage(req);
459                 break;
460
461         case MDS_REINT:
462                 CDEBUG(D_INODE, "reint\n");
463                 rc = mds_reint(req);
464                 break;
465
466         default:
467                 return mds_error(req);
468         }
469
470 out:
471         if (rc) { 
472                 CERROR("no header\n");
473                 return 0;
474         }
475
476         if( req->rq_status) { 
477                 mds_error(req);
478         } else { 
479                 CDEBUG(D_INODE, "sending reply\n"); 
480                 mds_reply(req); 
481         }
482
483         return 0;
484 }
485
486
/* Timer callback: @__data is the task_struct pointer stored in timer.data
 * by mds_main(); wake that task. */
static void mds_timer_run(unsigned long __data)
{
        wake_up_process((struct task_struct *)__data);
}
493
494 int mds_main(void *arg)
495 {
496         struct mds_obd *mds = (struct mds_obd *) arg;
497         struct timer_list timer;
498         DECLARE_WAITQUEUE(wait, current);
499
500         lock_kernel();
501         daemonize();
502         spin_lock_irq(&current->sigmask_lock);
503         sigfillset(&current->blocked);
504         recalc_sigpending(current);
505         spin_unlock_irq(&current->sigmask_lock);
506
507         sprintf(current->comm, "lustre_mds");
508
509         /* Set up an interval timer which can be used to trigger a
510            wakeup after the interval expires */
511         init_timer(&timer);
512         timer.data = (unsigned long) current;
513         timer.function = mds_timer_run;
514         mds->mds_timer = &timer;
515
516         /* Record that the  thread is running */
517         mds->mds_thread = current;
518         mds->mds_flags = MDS_RUNNING;
519         wake_up(&mds->mds_done_waitq); 
520
521         /* And now, wait forever for commit wakeup events. */
522         while (1) {
523                 int signal;
524                 int rc;
525
526                 wake_up(&mds->mds_done_waitq);
527                 CDEBUG(D_INODE, "mds_wakes pick up req here and continue\n"); 
528
529                 if (mds->mds_service != NULL) {
530                         ptl_event_t ev;
531                         struct ptlrpc_request request;
532                         struct ptlrpc_service *service;
533
534                         CDEBUG(D_IOCTL, "-- sleeping\n");
535                         signal = 0;
536                         add_wait_queue(&mds->mds_waitq, &wait);
537                         while (1) {
538                                 set_current_state(TASK_INTERRUPTIBLE);
539                                 rc = PtlEQGet(mds->mds_service->srv_eq_h, &ev);
540                                 if (rc == PTL_OK || rc == PTL_EQ_DROPPED)
541                                         break;
542                                 CERROR("EQGet rc %d\n", rc); 
543                                 if (mds->mds_flags & MDS_STOPPING)
544                                         break;
545
546                                 /* if this process really wants to die,
547                                  * let it go */
548                                 if (sigismember(&(current->pending.signal),
549                                                 SIGKILL) ||
550                                     sigismember(&(current->pending.signal),
551                                                 SIGINT)) {
552                                         signal = 1;
553                                         break;
554                                 }
555
556                                 schedule();
557                         }
558                         remove_wait_queue(&mds->mds_waitq, &wait);
559                         set_current_state(TASK_RUNNING);
560                         CDEBUG(D_IOCTL, "-- done\n");
561
562                         if (signal == 1) {
563                                 /* We broke out because of a signal */
564                                 EXIT;
565                                 break;
566                         }
567                         if (mds->mds_flags & MDS_STOPPING) { 
568                                 break;
569                         }
570
571                         service = (struct ptlrpc_service *)ev.mem_desc.user_ptr;
572
573                         /* FIXME: If we move to an event-driven model,
574                          * we should put the request on the stack of
575                          * mds_handle instead. */
576                         memset(&request, 0, sizeof(request));
577                         request.rq_reqbuf = ev.mem_desc.start + ev.offset;
578                         request.rq_reqlen = ev.mem_desc.length;
579                         request.rq_obd = MDS;
580                         request.rq_xid = ev.match_bits;
581                         CERROR("got req %d\n", request.rq_xid);
582
583                         request.rq_peer.peer_nid = ev.initiator.nid;
584                         /* FIXME: this NI should be the incoming NI.
585                          * We don't know how to find that from here. */
586                         request.rq_peer.peer_ni =
587                                 mds->mds_service->srv_self.peer_ni;
588                         rc = mds_handle(&request);
589
590                         /* Inform the rpc layer the event has been handled */ 
591                         ptl_received_rpc(service);
592                 } else {
593                         struct ptlrpc_request *request;
594
595                         CDEBUG(D_IOCTL, "-- sleeping\n");
596                         add_wait_queue(&mds->mds_waitq, &wait);
597                         while (1) {
598                                 spin_lock(&mds->mds_lock);
599                                 if (!list_empty(&mds->mds_reqs))
600                                         break;
601
602                                 set_current_state(TASK_INTERRUPTIBLE);
603
604                                 /* if this process really wants to die,
605                                  * let it go */
606                                 if (sigismember(&(current->pending.signal),
607                                                 SIGKILL) ||
608                                     sigismember(&(current->pending.signal),
609                                                 SIGINT))
610                                         break;
611
612                                 spin_unlock(&mds->mds_lock);
613
614                                 schedule();
615                         }
616                         remove_wait_queue(&mds->mds_waitq, &wait);
617                         set_current_state(TASK_RUNNING);
618                         CDEBUG(D_IOCTL, "-- done\n");
619
620                         if (list_empty(&mds->mds_reqs)) {
621                                 CDEBUG(D_INODE, "woke because of signal\n");
622                                 spin_unlock(&mds->mds_lock);
623                         } else {
624                                 request = list_entry(mds->mds_reqs.next,
625                                                      struct ptlrpc_request,
626                                                      rq_list);
627                                 list_del(&request->rq_list);
628                                 spin_unlock(&mds->mds_lock);
629                                 rc = mds_handle(request);
630                         }
631                 }
632         }
633
634         del_timer_sync(mds->mds_timer);
635
636         /* XXX maintain a list of all managed devices: cleanup here */
637
638         mds->mds_thread = NULL;
639         wake_up(&mds->mds_done_waitq);
640         CERROR("lustre_mds: exiting\n");
641         return 0;
642 }
643
644 static void mds_stop_srv_thread(struct mds_obd *mds)
645 {
646         mds->mds_flags |= MDS_STOPPING;
647
648         while (mds->mds_thread) {
649                 wake_up(&mds->mds_waitq);
650                 sleep_on(&mds->mds_done_waitq);
651         }
652 }
653
654 static void mds_start_srv_thread(struct mds_obd *mds)
655 {
656         init_waitqueue_head(&mds->mds_waitq);
657         init_waitqueue_head(&mds->mds_done_waitq);
658         kernel_thread(mds_main, (void *)mds, CLONE_VM | CLONE_FS | CLONE_FILES);
659         while (!mds->mds_thread) 
660                 sleep_on(&mds->mds_done_waitq);
661 }
662
663 /* mount the file system (secretly) */
664 static int mds_setup(struct obd_device *obddev, obd_count len,
665                         void *buf)
666                         
667 {
668         struct obd_ioctl_data* data = buf;
669         struct mds_obd *mds = &obddev->u.mds;
670         struct vfsmount *mnt;
671         struct lustre_peer peer;
672         int err; 
673         ENTRY;
674
675
676         mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL); 
677         err = PTR_ERR(mnt);
678         if (IS_ERR(mnt)) { 
679                 EXIT;
680                 return err;
681         }
682
683         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
684         if (!obddev->u.mds.mds_sb) {
685                 EXIT;
686                 return -ENODEV;
687         }
688
689         mds->mds_vfsmnt = mnt;
690         obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2);
691
692         mds->mds_ctxt.pwdmnt = mnt;
693         mds->mds_ctxt.pwd = mnt->mnt_root;
694         mds->mds_ctxt.fs = KERNEL_DS;
695         mds->mds_remote_nid = 0;
696
697         INIT_LIST_HEAD(&mds->mds_reqs);
698         mds->mds_thread = NULL;
699         mds->mds_flags = 0;
700         mds->mds_interval = 3 * HZ;
701         MDS = mds;
702
703         spin_lock_init(&obddev->u.mds.mds_lock);
704
705         err = kportal_uuid_to_peer("self", &peer);
706         if (err == 0) {
707                 OBD_ALLOC(mds->mds_service, sizeof(*mds->mds_service));
708                 if (mds->mds_service == NULL)
709                         return -ENOMEM;
710                 mds->mds_service->srv_buf_size = 64 * 1024;
711                 //mds->mds_service->srv_buf_size = 1024;
712                 mds->mds_service->srv_portal = MDS_REQUEST_PORTAL;
713                 memcpy(&mds->mds_service->srv_self, &peer, sizeof(peer));
714                 mds->mds_service->srv_wait_queue = &mds->mds_waitq;
715
716                 rpc_register_service(mds->mds_service, "self");
717         }
718
719         mds_start_srv_thread(mds);
720
721         MOD_INC_USE_COUNT;
722         EXIT; 
723         return 0;
724
725
726 static int mds_cleanup(struct obd_device * obddev)
727 {
728         struct super_block *sb;
729         struct mds_obd *mds = &obddev->u.mds;
730
731         ENTRY;
732
733         if ( !(obddev->obd_flags & OBD_SET_UP) ) {
734                 EXIT;
735                 return 0;
736         }
737
738         if ( !list_empty(&obddev->obd_gen_clients) ) {
739                 CERROR("still has clients!\n");
740                 EXIT;
741                 return -EBUSY;
742         }
743
744         MDS = NULL;
745         mds_stop_srv_thread(mds);
746         rpc_unregister_service(mds->mds_service);
747         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
748
749         sb = mds->mds_sb;
750         if (!mds->mds_sb){
751                 EXIT;
752                 return 0;
753         }
754
755         if (!list_empty(&mds->mds_reqs)) {
756                 // XXX reply with errors and clean up
757                 CDEBUG(D_INODE, "Request list not empty!\n");
758         }
759
760         unlock_kernel();
761         mntput(mds->mds_vfsmnt); 
762         mds->mds_sb = 0;
763         kfree(mds->mds_fstype);
764         lock_kernel();
765
766         MOD_DEC_USE_COUNT;
767         EXIT;
768         return 0;
769 }
770
771 /* use obd ops to offer management infrastructure */
772 static struct obd_ops mds_obd_ops = {
773         o_setup:       mds_setup,
774         o_cleanup:     mds_cleanup,
775 };
776
777 static int __init mds_init(void)
778 {
779         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
780         return 0;
781 }
782
783 static void __exit mds_exit(void)
784 {
785         obd_unregister_type(LUSTRE_MDS_NAME);
786 }
787
/* Module metadata. */
MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
MODULE_LICENSE("GPL");


// for testing (maybe this stays)
/* NOTE(review): mds_queue_req is declared static in this file, which is at
 * odds with exporting it to other modules - confirm the intended linkage. */
EXPORT_SYMBOL(mds_queue_req);

/* Module load/unload entry points. */
module_init(mds_init);
module_exit(mds_exit);