Whamcloud - gitweb
Remove no-longer-needed inode operations (they previously had extN EA VFS
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/handler.c
5  *
6  *  Lustre Metadata Server (mds) request handler
7  *
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com>
14  *
15  *  This server is single threaded at present (but can easily be multi threaded)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_MDS
21
22 #include <linux/module.h>
23 #include <linux/lustre_mds.h>
24
25 static
26 int mds_sendpage(struct ptlrpc_request *req, struct file *file, __u64 offset)
27 {
28         int rc = 0;
29         mm_segment_t oldfs = get_fs();
30         struct ptlrpc_bulk_desc *desc;
31         struct ptlrpc_bulk_page *bulk;
32         char *buf;
33         ENTRY;
34
35         desc = ptlrpc_prep_bulk(req->rq_connection);
36         if (desc == NULL)
37                 GOTO(out, rc = -ENOMEM);
38
39         bulk = ptlrpc_prep_bulk_page(desc);
40         if (bulk == NULL)
41                 GOTO(cleanup_bulk, rc = -ENOMEM);
42
43         OBD_ALLOC(buf, PAGE_SIZE);
44         if (buf == NULL)
45                 GOTO(cleanup_bulk, rc = -ENOMEM);
46
47         set_fs(KERNEL_DS);
48         rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
49                              (loff_t *)&offset);
50         set_fs(oldfs);
51
52         if (rc != PAGE_SIZE)
53                 GOTO(cleanup_buf, rc = -EIO);
54
55         bulk->b_xid = req->rq_reqmsg->xid;
56         bulk->b_buf = buf;
57         bulk->b_buflen = PAGE_SIZE;
58         desc->b_portal = MDS_BULK_PORTAL;
59
60         rc = ptlrpc_send_bulk(desc);
61         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
62                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
63                        OBD_FAIL_MDS_SENDPAGE, rc);
64                 ptlrpc_abort_bulk(desc);
65                 GOTO(cleanup_buf, rc);
66         }
67
68         EXIT;
69  cleanup_buf:
70         OBD_FREE(buf, PAGE_SIZE);
71  cleanup_bulk:
72         ptlrpc_free_bulk(desc);
73  out:
74         return rc;
75 }
76
77 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
78                               struct vfsmount **mnt)
79 {
80         /* stolen from NFS */
81         struct super_block *sb = mds->mds_sb;
82         unsigned long ino = fid->id;
83         __u32 generation = fid->generation;
84         struct inode *inode;
85         struct list_head *lp;
86         struct dentry *result;
87
88         if (ino == 0)
89                 RETURN(ERR_PTR(-ESTALE));
90
91         inode = iget(sb, ino);
92         if (inode == NULL)
93                 RETURN(ERR_PTR(-ENOMEM));
94
95         CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
96
97         if (is_bad_inode(inode) ||
98             (generation && inode->i_generation != generation)) {
99                 /* we didn't find the right inode.. */
100                 CERROR("bad inode %lu, link: %d ct: %d or version  %u/%u\n",
101                        inode->i_ino, inode->i_nlink,
102                        atomic_read(&inode->i_count), inode->i_generation,
103                        generation);
104                 LBUG();
105                 iput(inode);
106                 RETURN(ERR_PTR(-ESTALE));
107         }
108
109         /* now to find a dentry.
110          * If possible, get a well-connected one
111          */
112         if (mnt)
113                 *mnt = mds->mds_vfsmnt;
114         spin_lock(&dcache_lock);
115         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
116                 result = list_entry(lp,struct dentry, d_alias);
117                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
118                         dget_locked(result);
119                         result->d_vfs_flags |= DCACHE_REFERENCED;
120                         spin_unlock(&dcache_lock);
121                         iput(inode);
122                         if (mnt)
123                                 mntget(*mnt);
124                         return result;
125                 }
126         }
127         spin_unlock(&dcache_lock);
128         result = d_alloc_root(inode);
129         if (result == NULL) {
130                 iput(inode);
131                 return ERR_PTR(-ENOMEM);
132         }
133         if (mnt)
134                 mntget(*mnt);
135         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
136         return result;
137 }
138
139 static
140 int mds_connect(struct ptlrpc_request *req)
141 {
142         struct mds_body *body;
143         struct mds_obd *mds = &req->rq_obd->u.mds;
144         struct mds_client_info *mci;
145         struct mds_client_data *mcd;
146         int rc, size = sizeof(*body);
147         ENTRY;
148
149         CDEBUG(D_INFO, "MDS connect from UUID '%s'\n", ptlrpc_req_to_uuid(req));
150         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
151         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CONNECT_PACK)) {
152                 CERROR("mds: out of memory for message: size=%d\n", size);
153                 req->rq_status = -ENOMEM;
154                 RETURN(0);
155         }
156
157         body = lustre_msg_buf(req->rq_reqmsg, 0);
158         mds_unpack_req_body(req);
159         /* Anything we need to do here with the client's trans no or so? */
160
161         body = lustre_msg_buf(req->rq_repmsg, 0);
162         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
163
164         mci = mds_uuid_to_mci(mds, ptlrpc_req_to_uuid(req));
165         if (!mci) {
166                 /* We don't have any old connection data for this client */
167                 int rc;
168
169                 CDEBUG(D_INFO, "allocating new client data for UUID '%s'",
170                        ptlrpc_req_to_uuid(req));
171
172                 OBD_ALLOC(mcd, sizeof(*mcd));
173                 if (!mcd) {
174                         CERROR("mds: out of memory for client data\n");
175                         req->rq_status = -ENOMEM;
176                         RETURN(0);
177                 }
178                 rc = mds_client_add(mds, mcd, -1);
179                 if (rc) {
180                         req->rq_status = rc;
181                         RETURN(0);
182                 }
183         } else {
184                 /* We have old connection data for this client... */
185                 mcd = mci->mci_mcd;
186                 CDEBUG(D_INFO, "found existing data for UUID '%s' at #%d\n",
187                        mcd->mcd_uuid, mci->mci_off);
188         }
189         /* mcd_last_xid is is stored in little endian on the disk and 
190            mds_pack_rep_body converts it to network order */
191         body->last_xid = le32_to_cpu(mcd->mcd_last_xid);
192         mds_pack_rep_body(req);
193         RETURN(0);
194 }
195
196 static
197 int mds_getattr(struct ptlrpc_request *req)
198 {
199         struct dentry *de;
200         struct inode *inode;
201         struct mds_body *body;
202         struct mds_obd *mds = &req->rq_obd->u.mds;
203         int rc, size[2] = {sizeof(*body)}, count = 1;
204         ENTRY;
205
206         body = lustre_msg_buf(req->rq_reqmsg, 0);
207         de = mds_fid2dentry(mds, &body->fid1, NULL);
208         if (IS_ERR(de)) {
209                 req->rq_status = -ENOENT;
210                 RETURN(0);
211         }
212         inode = de->d_inode;
213         if (body->valid & OBD_MD_LINKNAME) {
214                 count = 2;
215                 size[1] = inode->i_size;
216         }
217
218         rc = lustre_pack_msg(count, size, NULL, &req->rq_replen,
219                              &req->rq_repmsg);
220         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
221                 CERROR("mds: out of memory\n");
222                 req->rq_status = -ENOMEM;
223                 GOTO(out, 0);
224         }
225
226         if (body->valid & OBD_MD_LINKNAME) {
227                 char *tmp = lustre_msg_buf(req->rq_repmsg, 1);
228                 mm_segment_t oldfs;
229
230                 oldfs = get_fs();
231                 set_fs(KERNEL_DS);
232                 rc = inode->i_op->readlink(de, tmp, size[1]);
233                 set_fs(oldfs);
234
235                 if (rc < 0) {
236                         req->rq_status = rc;
237                         CERROR("readlink failed: %d\n", rc);
238                         GOTO(out, 0);
239                 }
240         }
241
242         body = lustre_msg_buf(req->rq_repmsg, 0);
243         body->ino = inode->i_ino;
244         body->generation = inode->i_generation;
245         body->atime = inode->i_atime;
246         body->ctime = inode->i_ctime;
247         body->mtime = inode->i_mtime;
248         body->uid = inode->i_uid;
249         body->gid = inode->i_gid;
250         body->size = inode->i_size;
251         body->mode = inode->i_mode;
252         body->nlink = inode->i_nlink;
253         if (S_ISREG(inode->i_mode)) {
254                 rc = mds_fs_get_objid(mds, inode, &body->objid);
255                 if (rc < 0) {
256                         req->rq_status = rc;
257                         CERROR("readlink failed: %d\n", rc);
258                         GOTO(out, 0);
259                 }
260         }
261         body->valid = ~0; /* FIXME: should be more selective */
262  out:
263         l_dput(de);
264         RETURN(0);
265 }
266
267 static
268 int mds_open(struct ptlrpc_request *req)
269 {
270         struct mds_obd *mds = &req->rq_obd->u.mds;
271         struct dentry *de;
272         struct mds_body *body;
273         struct file *file;
274         struct vfsmount *mnt;
275         struct mds_client_info *mci;
276         __u32 flags;
277         struct list_head *tmp;
278         struct mds_file_data *mfd;
279         int rc, size = sizeof(*body);
280         ENTRY;
281
282
283         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
284         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
285                 CERROR("mds: out of memory\n");
286                 req->rq_status = -ENOMEM;
287                 RETURN(0);
288         }
289
290
291         mci = mds_uuid_to_mci(mds, ptlrpc_req_to_uuid(req));
292         if (!mci) { 
293                 CERROR("mds: no mci!\n");
294                 req->rq_status = -ENOTCONN;
295                 RETURN(0);
296         }
297
298         body = lustre_msg_buf(req->rq_reqmsg, 0);
299
300         /* was this animal open already? */
301         list_for_each(tmp, &mci->mci_open_head) { 
302                 struct mds_file_data *fd;
303                 fd = list_entry(tmp, struct mds_file_data, mfd_list);
304                 if (body->objid == fd->mfd_clientfd && 
305                     body->fid1.id == fd->mfd_file->f_dentry->d_inode->i_ino) { 
306                         CERROR("Re opening %Ld\n", body->fid1.id);
307                         RETURN(0);
308                 }
309         }
310
311         OBD_ALLOC(mfd, sizeof(*mfd));
312         if (!mfd) { 
313                 CERROR("mds: out of memory\n");
314                 req->rq_status = -ENOMEM;
315                 RETURN(0);
316         }
317
318         de = mds_fid2dentry(mds, &body->fid1, &mnt);
319         if (IS_ERR(de)) {
320                 req->rq_status = -ENOENT;
321                 RETURN(0);
322         }
323
324         flags = body->flags;
325         file = dentry_open(de, mnt, flags);
326         if (!file || IS_ERR(file)) {
327                 req->rq_status = -EINVAL;
328                 OBD_FREE(mfd, sizeof(*mfd));
329                 RETURN(0);
330         }
331
332         file->private_data = mfd;
333         mfd->mfd_file = file;
334         mfd->mfd_clientfd = body->objid;
335         list_add(&mfd->mfd_list, &mci->mci_open_head); 
336
337         body = lustre_msg_buf(req->rq_repmsg, 0);
338         body->objid = (__u64) (unsigned long)file;
339         RETURN(0);
340 }
341
342 static
343 int mds_close(struct ptlrpc_request *req)
344 {
345         struct dentry *de;
346         struct mds_body *body;
347         struct file *file;
348         struct vfsmount *mnt;
349         struct mds_file_data *mfd;
350         int rc;
351         ENTRY;
352
353         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
354         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
355                 CERROR("mds: out of memory\n");
356                 req->rq_status = -ENOMEM;
357                 RETURN(0);
358         }
359
360         body = lustre_msg_buf(req->rq_reqmsg, 0);
361         de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
362         if (IS_ERR(de)) {
363                 req->rq_status = -ENOENT;
364                 RETURN(0);
365         }
366
367         file = (struct file *)(unsigned long)body->objid;
368         if (!file->f_dentry) 
369                 LBUG();
370         mfd = (struct mds_file_data *)file->private_data;
371         list_del(&mfd->mfd_list);
372         OBD_FREE(mfd, sizeof(*mfd));
373
374         req->rq_status = filp_close(file, 0);
375         l_dput(de);
376         mntput(mnt);
377
378         RETURN(0);
379 }
380
381 int mds_readpage(struct ptlrpc_request *req)
382 {
383         struct vfsmount *mnt;
384         struct dentry *de;
385         struct file *file;
386         struct mds_body *body;
387         int rc, size = sizeof(*body);
388         ENTRY;
389
390         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
391         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
392                 CERROR("mds: out of memory\n");
393                 req->rq_status = -ENOMEM;
394                 RETURN(0);
395         }
396
397         body = lustre_msg_buf(req->rq_reqmsg, 0);
398         de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
399         if (IS_ERR(de)) {
400                 req->rq_status = PTR_ERR(de);
401                 RETURN(0);
402         }
403
404         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
405
406         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
407         /* note: in case of an error, dentry_open puts dentry */
408         if (IS_ERR(file)) {
409                 req->rq_status = PTR_ERR(file);
410                 RETURN(0);
411         }
412
413         /* to make this asynchronous make sure that the handling function
414            doesn't send a reply when this function completes. Instead a
415            callback function would send the reply */
416         rc = mds_sendpage(req, file, body->size);
417
418         filp_close(file, 0);
419         req->rq_status = rc;
420         RETURN(0);
421 }
422
423 int mds_reint(struct ptlrpc_request *req)
424 {
425         int rc;
426         struct mds_update_record rec;
427
428         rc = mds_update_unpack(req, &rec);
429         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
430                 CERROR("invalid record\n");
431                 req->rq_status = -EINVAL;
432                 RETURN(0);
433         }
434         /* rc will be used to interrupt a for loop over multiple records */
435         rc = mds_reint_rec(&rec, req);
436         return 0;
437 }
438
439 int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
440                struct ptlrpc_request *req)
441 {
442         struct mds_obd *mds = &req->rq_obd->u.mds;
443         int rc;
444         ENTRY;
445
446         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
447         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
448                 CERROR("lustre_mds: Invalid request\n");
449                 GOTO(out, rc);
450         }
451
452         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
453                 CERROR("lustre_mds: wrong packet type sent %d\n",
454                        req->rq_reqmsg->type);
455                 GOTO(out, rc = -EINVAL);
456         }
457
458         switch (req->rq_reqmsg->opc) {
459         case MDS_CONNECT:
460                 CDEBUG(D_INODE, "getattr\n");
461                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
462                 rc = mds_connect(req);
463                 break;
464
465         case MDS_GETATTR:
466                 CDEBUG(D_INODE, "getattr\n");
467                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
468                 rc = mds_getattr(req);
469                 break;
470
471         case MDS_READPAGE:
472                 CDEBUG(D_INODE, "readpage\n");
473                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
474                 rc = mds_readpage(req);
475
476                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
477                         return 0;
478                 break;
479
480         case MDS_REINT:
481                 CDEBUG(D_INODE, "reint\n");
482                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
483                 rc = mds_reint(req);
484                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET_REP, 0);
485                 break;
486
487         case MDS_OPEN:
488                 CDEBUG(D_INODE, "open\n");
489                 OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
490                 rc = mds_open(req);
491                 break;
492
493         case MDS_CLOSE:
494                 CDEBUG(D_INODE, "close\n");
495                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
496                 rc = mds_close(req);
497                 break;
498
499         default:
500                 rc = ptlrpc_error(svc, req);
501                 RETURN(rc);
502         }
503
504         EXIT;
505 out:
506         /* Still not 100% sure whether we should reply with the server
507          * last_rcvd or that of this client.  I'm not sure it even makes
508          * a difference on a per-client basis, because last_rcvd is global
509          * and we are not supposed to allow transactions while in recovery.
510          */
511         req->rq_repmsg->last_rcvd = HTON__u64(mds->mds_last_rcvd);
512         req->rq_repmsg->last_committed = HTON__u64(mds->mds_last_committed);
513         CDEBUG(D_INFO, "last_rcvd %Lu, last_committed %Lu, xid %d\n",
514                (unsigned long long)mds->mds_last_rcvd,
515                (unsigned long long)mds->mds_last_committed, 
516                cpu_to_le32(req->rq_reqmsg->xid));
517         if (rc) {
518                 ptlrpc_error(svc, req);
519         } else {
520                 CDEBUG(D_NET, "sending reply\n");
521                 ptlrpc_reply(svc, req);
522         }
523
524         return 0;
525 }
526
527 /* Update the server data on disk.  This stores the new mount_count and
528  * also the last_rcvd value to disk.  If we don't have a clean shutdown,
529  * then the server last_rcvd value may be less than that of the clients.
530  * This will alert us that we may need to do client recovery.
531  */
532 static
533 int mds_update_server_data(struct mds_obd *mds)
534 {
535         struct obd_run_ctxt saved;
536         struct mds_server_data *msd = mds->mds_server_data;
537         struct file *filp = mds->mds_rcvd_filp;
538         loff_t off = 0;
539         int rc;
540
541         msd->msd_last_rcvd = cpu_to_le64(mds->mds_last_rcvd);
542         msd->msd_mount_count = cpu_to_le64(mds->mds_mount_count);
543
544         CDEBUG(D_SUPER, "MDS mount_count is %Lu, last_rcvd is %Lu\n",
545                (unsigned long long)mds->mds_mount_count,
546                (unsigned long long)mds->mds_last_rcvd);
547         push_ctxt(&saved, &mds->mds_ctxt);
548         rc = lustre_fwrite(filp, (char *)msd, sizeof(*msd), &off);
549         if (rc != sizeof(*msd)) {
550                 CERROR("error writing MDS server data: rc = %d\n", rc);
551                 if (rc > 0)
552                         RETURN(-EIO);
553                 RETURN(rc);
554         }
555         rc = fsync_dev(filp->f_dentry->d_inode->i_rdev);
556         pop_ctxt(&saved);
557         if (rc)
558                 CERROR("error flushing MDS server data: rc = %d\n", rc);
559
560         return 0;
561 }
562
563 /* Do recovery actions for the MDS */
564 static int mds_recover(struct obd_device *obddev)
565 {
566         struct mds_obd *mds = &obddev->u.mds;
567         int rc;
568
569         /* This happens at the end when recovery is complete */
570         ++mds->mds_mount_count;
571         rc = mds_update_server_data(mds);
572
573         return rc;
574 }
575
576 /* mount the file system (secretly) */
577 static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
578 {
579         struct obd_ioctl_data* data = buf;
580         struct mds_obd *mds = &obddev->u.mds;
581         struct vfsmount *mnt;
582         int rc = 0;
583         ENTRY;
584
585         MOD_INC_USE_COUNT;
586 #ifdef CONFIG_DEV_RDONLY
587         dev_clear_rdonly(2);
588 #endif
589         if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
590                 GOTO(err_dec, rc = -EINVAL);
591
592         mds->mds_fstype = strdup(data->ioc_inlbuf2);
593
594         mnt = do_kern_mount(mds->mds_fstype, 0, data->ioc_inlbuf1, NULL);
595         if (IS_ERR(mnt)) {
596                 rc = PTR_ERR(mnt);
597                 CERROR("do_kern_mount failed: rc = %d\n", rc);
598                 GOTO(err_kfree, rc);
599         }
600
601         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
602         if (!mds->mds_sb)
603                 GOTO(err_put, rc = -ENODEV);
604
605         rc = mds_fs_setup(mds, mnt);
606         if (rc) {
607                 CERROR("MDS filesystem method init failed: rc = %d\n", rc);
608                 GOTO(err_put, rc);
609         }
610
611         mds->mds_service = ptlrpc_init_svc(128 * 1024,
612                                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,                                           "self", mds_handle);
613         if (!mds->mds_service) {
614                 CERROR("failed to start service\n");
615                 GOTO(err_fs, rc = -EINVAL);
616         }
617
618         rc = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
619         if (rc) {
620                 CERROR("cannot start thread: rc = %d\n", rc);
621                 GOTO(err_svc, rc);
622         }
623
624         rc = mds_recover(obddev);
625         if (rc)
626                 GOTO(err_thread, rc);
627
628         RETURN(0);
629
630 err_thread:
631         ptlrpc_stop_all_threads(mds->mds_service);
632 err_svc:
633         rpc_unregister_service(mds->mds_service);
634         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
635 err_fs:
636         mds_fs_cleanup(mds);
637 err_put:
638         unlock_kernel();
639         mntput(mds->mds_vfsmnt);
640         mds->mds_sb = 0;
641         lock_kernel();
642 err_kfree:
643         kfree(mds->mds_fstype);
644 err_dec:
645         MOD_DEC_USE_COUNT;
646         return rc;
647 }
648
649 static int mds_cleanup(struct obd_device * obddev)
650 {
651         struct super_block *sb;
652         struct mds_obd *mds = &obddev->u.mds;
653
654         ENTRY;
655
656         if ( !list_empty(&obddev->obd_gen_clients) ) {
657                 CERROR("still has clients!\n");
658                 RETURN(-EBUSY);
659         }
660
661         ptlrpc_stop_all_threads(mds->mds_service);
662         rpc_unregister_service(mds->mds_service);
663         if (!list_empty(&mds->mds_service->srv_reqs)) {
664                 // XXX reply with errors and clean up
665                 CERROR("Request list not empty!\n");
666         }
667         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
668
669         sb = mds->mds_sb;
670         if (!mds->mds_sb)
671                 RETURN(0);
672
673         mds_update_server_data(mds);
674
675         if (mds->mds_rcvd_filp) {
676                 int rc = filp_close(mds->mds_rcvd_filp, 0);
677                 mds->mds_rcvd_filp = NULL;
678
679                 if (rc)
680                         CERROR("last_rcvd file won't close, rc=%d\n", rc);
681         }
682
683         unlock_kernel();
684         mntput(mds->mds_vfsmnt);
685         mds->mds_sb = 0;
686         kfree(mds->mds_fstype);
687         lock_kernel();
688 #ifdef CONFIG_DEV_RDONLY
689         dev_clear_rdonly(2);
690 #endif
691         mds_fs_cleanup(mds);
692
693         MOD_DEC_USE_COUNT;
694         RETURN(0);
695 }
696
697 /* use obd ops to offer management infrastructure */
698 static struct obd_ops mds_obd_ops = {
699         o_setup:       mds_setup,
700         o_cleanup:     mds_cleanup,
701 };
702
703 static int __init mds_init(void)
704 {
705         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
706         return 0;
707 }
708
709 static void __exit mds_exit(void)
710 {
711         obd_unregister_type(LUSTRE_MDS_NAME);
712 }
713
714 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
715 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
716 MODULE_LICENSE("GPL");
717
718 module_init(mds_init);
719 module_exit(mds_exit);