Whamcloud - gitweb
Add client RPC xid to per-client last_rcvd data. If the MDS dies but the
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/handler.c
5  *
6  *  Lustre Metadata Server (mds) request handler
7  *
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com>
14  *
15  *  This server is single threaded at present (but can easily be multi threaded)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20
21 #include <linux/version.h>
22 #include <linux/module.h>
23 #include <linux/fs.h>
24 #include <linux/stat.h>
25 #include <linux/locks.h>
26 #include <linux/quotaops.h>
27 #include <asm/unistd.h>
28 #include <asm/uaccess.h>
29
30 #define DEBUG_SUBSYSTEM S_MDS
31
32 #include <linux/lustre_mds.h>
33 #include <linux/lustre_lib.h>
34 #include <linux/lustre_net.h>
35
36 int mds_sendpage(struct ptlrpc_request *req, struct file *file,
37                  __u64 offset, struct niobuf *dst)
38 {
39         int rc = 0;
40         mm_segment_t oldfs = get_fs();
41         struct ptlrpc_bulk_desc *bulk;
42         char *buf;
43
44         bulk = ptlrpc_prep_bulk(req->rq_connection);
45         if (bulk == NULL) {
46                 rc = -ENOMEM;
47                 GOTO(out, rc);
48         }
49
50         bulk->b_xid = req->rq_reqmsg->xid;
51
52         OBD_ALLOC(buf, PAGE_SIZE);
53         if (!buf) {
54                 rc = -ENOMEM;
55                 GOTO(cleanup_bulk, rc);
56         }
57
58         set_fs(KERNEL_DS);
59         rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
60                              (loff_t *)&offset);
61         set_fs(oldfs);
62
63         if (rc != PAGE_SIZE) {
64                 rc = -EIO;
65                 GOTO(cleanup_buf, rc);
66         }
67
68         bulk->b_buf = buf;
69         bulk->b_buflen = PAGE_SIZE;
70
71         rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL);
72         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
73                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
74                        OBD_FAIL_MDS_SENDPAGE, rc);
75                 PtlMDUnlink(bulk->b_md_h);
76                 GOTO(cleanup_buf, rc);
77         }
78         wait_event_interruptible(bulk->b_waitq,
79                                  ptlrpc_check_bulk_sent(bulk));
80
81         if (bulk->b_flags & PTL_RPC_FL_INTR) {
82                 rc = -EINTR;
83                 GOTO(cleanup_buf, rc);
84         }
85
86         EXIT;
87  cleanup_buf:
88         OBD_FREE(buf, PAGE_SIZE);
89  cleanup_bulk:
90         ptlrpc_free_bulk(bulk);
91  out:
92         return rc;
93 }
94
95 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
96                               struct vfsmount **mnt)
97 {
98         /* stolen from NFS */
99         struct super_block *sb = mds->mds_sb;
100         unsigned long ino = fid->id;
101         __u32 generation = fid->generation;
102         struct inode *inode;
103         struct list_head *lp;
104         struct dentry *result;
105
106         if (ino == 0)
107                 return ERR_PTR(-ESTALE);
108
109         inode = iget(sb, ino);
110         if (inode == NULL)
111                 return ERR_PTR(-ENOMEM);
112
113         CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
114
115         if (is_bad_inode(inode) ||
116             (generation && inode->i_generation != generation)) {
117                 /* we didn't find the right inode.. */
118                 CERROR("bad inode %lu, link: %d ct: %d or version  %u/%u\n",
119                         inode->i_ino,
120                         inode->i_nlink, atomic_read(&inode->i_count),
121                         inode->i_generation,
122                         generation);
123                 LBUG();
124                 iput(inode);
125                 return ERR_PTR(-ESTALE);
126         }
127
128         /* now to find a dentry.
129          * If possible, get a well-connected one
130          */
131         if (mnt)
132                 *mnt = mds->mds_vfsmnt;
133         spin_lock(&dcache_lock);
134         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
135                 result = list_entry(lp,struct dentry, d_alias);
136                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
137                         dget_locked(result);
138                         result->d_vfs_flags |= DCACHE_REFERENCED;
139                         spin_unlock(&dcache_lock);
140                         iput(inode);
141                         if (mnt)
142                                 mntget(*mnt);
143                         return result;
144                 }
145         }
146         spin_unlock(&dcache_lock);
147         result = d_alloc_root(inode);
148         if (result == NULL) {
149                 iput(inode);
150                 return ERR_PTR(-ENOMEM);
151         }
152         if (mnt)
153                 mntget(*mnt);
154         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
155         return result;
156 }
157
158 int mds_connect(struct ptlrpc_request *req)
159 {
160         struct mds_body *body;
161         struct mds_obd *mds = &req->rq_obd->u.mds;
162         int rc, size = sizeof(*body);
163         ENTRY;
164
165         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
166         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CONNECT_PACK)) {
167                 CERROR("mds: out of memory\n");
168                 req->rq_status = -ENOMEM;
169                 RETURN(0);
170         }
171
172         body = lustre_msg_buf(req->rq_reqmsg, 0);
173         mds_unpack_req_body(req); 
174         /* Anything we need to do here with the client's trans no or so? */
175
176         body = lustre_msg_buf(req->rq_repmsg, 0);
177         memcpy(&body->fid1, &mds->mds_rootfid  , sizeof(body->fid1)); 
178         mds_pack_rep_body(req);
179         RETURN(0);
180 }
181
182 int mds_getattr(struct ptlrpc_request *req)
183 {
184         struct dentry *de;
185         struct inode *inode;
186         struct mds_body *body;
187         struct mds_obd *mds = &req->rq_obd->u.mds;
188         int rc, size = sizeof(*body);
189         ENTRY;
190
191         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
192         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
193                 CERROR("mds: out of memory\n");
194                 req->rq_status = -ENOMEM;
195                 RETURN(0);
196         }
197
198         body = lustre_msg_buf(req->rq_reqmsg, 0);
199         de = mds_fid2dentry(mds, &body->fid1, NULL);
200         if (IS_ERR(de)) {
201                 req->rq_status = -ENOENT;
202                 RETURN(0);
203         }
204
205         body = lustre_msg_buf(req->rq_repmsg, 0);
206         inode = de->d_inode;
207         body->ino = inode->i_ino;
208         body->generation = inode->i_generation;
209         body->atime = inode->i_atime;
210         body->ctime = inode->i_ctime;
211         body->mtime = inode->i_mtime;
212         body->uid = inode->i_uid;
213         body->gid = inode->i_gid;
214         body->size = inode->i_size;
215         body->mode = inode->i_mode;
216         body->nlink = inode->i_nlink;
217         body->valid = ~0;
218         mds_fs_get_objid(mds, inode, &body->objid);
219         l_dput(de);
220         RETURN(0);
221 }
222
223 int mds_open(struct ptlrpc_request *req)
224 {
225         struct dentry *de;
226         struct mds_body *body;
227         struct file *file;
228         struct vfsmount *mnt;
229         __u32 flags;
230         int rc, size = sizeof(*body);
231         ENTRY;
232
233         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
234         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
235                 CERROR("mds: out of memory\n");
236                 req->rq_status = -ENOMEM;
237                 RETURN(0);
238         }
239
240         body = lustre_msg_buf(req->rq_reqmsg, 0);
241         de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
242         if (IS_ERR(de)) {
243                 req->rq_status = -ENOENT;
244                 RETURN(0);
245         }
246         flags = body->flags;
247         file = dentry_open(de, mnt, flags);
248         if (!file || IS_ERR(file)) {
249                 req->rq_status = -EINVAL;
250                 RETURN(0);
251         }
252
253         body = lustre_msg_buf(req->rq_repmsg, 0);
254         body->objid = (__u64) (unsigned long)file;
255         RETURN(0);
256 }
257
258 int mds_close(struct ptlrpc_request *req)
259 {
260         struct dentry *de;
261         struct mds_body *body;
262         struct file *file;
263         struct vfsmount *mnt;
264         int rc;
265         ENTRY;
266
267         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
268         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
269                 CERROR("mds: out of memory\n");
270                 req->rq_status = -ENOMEM;
271                 RETURN(0);
272         }
273
274         body = lustre_msg_buf(req->rq_reqmsg, 0);
275         de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
276         if (IS_ERR(de)) {
277                 req->rq_status = -ENOENT;
278                 RETURN(0);
279         }
280
281         file = (struct file *)(unsigned long)body->objid;
282         req->rq_status = filp_close(file, 0);
283         l_dput(de);
284         mntput(mnt);
285
286         RETURN(0);
287 }
288
289 int mds_readpage(struct ptlrpc_request *req)
290 {
291         struct vfsmount *mnt;
292         struct dentry *de;
293         struct file *file;
294         struct niobuf *niobuf;
295         struct mds_body *body;
296         int rc, size = sizeof(*body);
297         ENTRY;
298
299         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
300         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
301                 CERROR("mds: out of memory\n");
302                 req->rq_status = -ENOMEM;
303                 RETURN(0);
304         }
305
306         body = lustre_msg_buf(req->rq_reqmsg, 0);
307         de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
308         if (IS_ERR(de)) {
309                 req->rq_status = PTR_ERR(de);
310                 RETURN(0);
311         }
312
313         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
314
315         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
316         /* note: in case of an error, dentry_open puts dentry */
317         if (IS_ERR(file)) {
318                 req->rq_status = PTR_ERR(file);
319                 RETURN(0);
320         }
321
322         niobuf = lustre_msg_buf(req->rq_reqmsg, 1);
323         if (!niobuf) {
324                 req->rq_status = -EINVAL;
325                 LBUG();
326                 RETURN(0);
327         }
328
329         /* to make this asynchronous make sure that the handling function
330            doesn't send a reply when this function completes. Instead a
331            callback function would send the reply */
332         rc = mds_sendpage(req, file, body->size, niobuf);
333
334         filp_close(file, 0);
335         req->rq_status = rc;
336         RETURN(0);
337 }
338
339 int mds_reint(struct ptlrpc_request *req)
340 {
341         int rc;
342         struct mds_update_record rec;
343
344         rc = mds_update_unpack(req, &rec);
345         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
346                 CERROR("invalid record\n");
347                 req->rq_status = -EINVAL;
348                 RETURN(0);
349         }
350         /* rc will be used to interrupt a for loop over multiple records */
351         rc = mds_reint_rec(&rec, req);
352         return 0;
353 }
354
355 int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
356                struct ptlrpc_request *req)
357 {
358         int rc;
359         ENTRY;
360
361         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
362         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
363                 CERROR("lustre_mds: Invalid request\n");
364                 GOTO(out, rc);
365         }
366
367         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
368                 CERROR("lustre_mds: wrong packet type sent %d\n",
369                        req->rq_reqmsg->type);
370                 GOTO(out, rc = -EINVAL);
371         }
372
373         switch (req->rq_reqmsg->opc) {
374         case MDS_CONNECT:
375                 CDEBUG(D_INODE, "getattr\n");
376                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
377                 rc = mds_connect(req);
378                 break;
379
380         case MDS_GETATTR:
381                 CDEBUG(D_INODE, "getattr\n");
382                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
383                 rc = mds_getattr(req);
384                 break;
385
386         case MDS_READPAGE:
387                 CDEBUG(D_INODE, "readpage\n");
388                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
389                 rc = mds_readpage(req);
390
391                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
392                         return 0;
393                 break;
394
395         case MDS_REINT:
396                 CDEBUG(D_INODE, "reint\n");
397                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
398                 rc = mds_reint(req);
399                 break;
400
401         case MDS_OPEN:
402                 CDEBUG(D_INODE, "open\n");
403                 OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
404                 rc = mds_open(req);
405                 break;
406
407         case MDS_CLOSE:
408                 CDEBUG(D_INODE, "close\n");
409                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
410                 rc = mds_close(req);
411                 break;
412
413         default:
414                 rc = ptlrpc_error(svc, req);
415                 RETURN(rc);
416         }
417
418         EXIT;
419 out:
420         if (rc) {
421                 ptlrpc_error(svc, req);
422         } else {
423                 CDEBUG(D_NET, "sending reply\n");
424                 ptlrpc_reply(svc, req);
425         }
426
427         return 0;
428 }
429
430 /* This will be a hash table at some point. */
431 int mds_init_client_data(struct mds_obd *mds)
432 {
433         if (mds->mds_client_info)
434                 LBUG();
435
436         OBD_ALLOC(mds->mds_client_info,
437                   MDS_CLIENT_SLOTS * sizeof(struct mds_client_info));
438
439         if (!mds->mds_client_info)
440                 return -ENOMEM;
441
442         return 0;
443 }
444
445 /* Add client data to the MDS.  This will be a hash at some point. */
446 int mds_add_client(struct mds_obd *mds, struct mds_client_data *mcd, loff_t off)
447 {
448         int num = mds->mds_client_count;
449
450         if (num >= MDS_CLIENT_SLOTS) {
451                 CERROR("too many clients for current MDS config - fix code\n");
452                 return -ENOMEM;
453         }
454
455         /* For now we cop-out and just put the clients in a list */
456         mds->mds_client_info[num].mci_mcd = mcd;
457         mds->mds_client_info[num].mci_off = off; /* in last_rcvd on disk */
458
459         mds->mds_client_count++;
460
461         return 0;
462 }
463
464 int mds_free_client_data(struct mds_obd *mds)
465 {
466         struct mds_client_info *mci = mds->mds_client_info;
467         int i;
468
469         if (!mci)
470                 RETURN(0);
471
472         for (i = 0; i < mds->mds_client_count; i++, mci++) {
473                 OBD_FREE(mci->mci_mcd, sizeof(*mci->mci_mcd));
474                 mci->mci_mcd = NULL;
475         }
476
477         OBD_FREE(mds->mds_client_info, MDS_CLIENT_SLOTS * sizeof(*mci));
478         mds->mds_client_info = NULL;
479
480         return 0;
481 }
482
483 int mds_free_server_data(struct mds_obd *mds)
484 {
485         OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
486         mds->mds_server_data = NULL;
487
488         return 0;
489 }
490
491 #define LAST_RCVD "last_rcvd"
492
493 int mds_read_last_rcvd(struct mds_obd *mds, struct file *f)
494 {
495         struct mds_server_data *msd;
496         struct mds_client_data *mcd = NULL;
497         loff_t fsize = f->f_dentry->d_inode->i_size;
498         loff_t off = 0, cl_off;
499         __u64 last_rcvd = 0;
500         __u64 last_mount;
501         int rc = 0;
502
503         OBD_ALLOC(msd, sizeof(*msd));
504         if (!msd)
505                 RETURN(-ENOMEM);
506         rc = lustre_fread(f, (char *)msd, sizeof(*msd), &off);
507
508         mds->mds_server_data = msd;
509         if (rc == 0) {
510                 CERROR("empty MDS %s, new MDS?\n", LAST_RCVD);
511                 RETURN(0);
512         } else if (rc != sizeof(*msd)) {
513                 CERROR("error reading MDS %s: rc = %d\n", LAST_RCVD, rc);
514                 if (rc > 0) {
515                         rc = -EIO;
516                 }
517                 GOTO(err_msd, rc);
518         }
519
520         last_rcvd = le64_to_cpu(msd->msd_last_rcvd);
521         mds->mds_last_rcvd = last_rcvd;
522         CDEBUG(D_INODE, "got %Ld for server last_rcvd value\n",
523                (unsigned long long)last_rcvd);
524
525         last_mount = le64_to_cpu(msd->msd_mount_count);
526         mds->mds_mount_count = last_mount;
527         CDEBUG(D_INODE, "got %Ld for server last_mount value\n",
528                (unsigned long long)last_rcvd);
529
530         for (off = cl_off = MDS_LR_CLIENT, rc = sizeof(*mcd);
531              off <= fsize - sizeof(*mcd) && rc == sizeof(*mcd);
532              off = cl_off + MDS_LR_SIZE) {
533                 if (!mcd)
534                         OBD_ALLOC(mcd, sizeof(*mcd));
535                 if (!mcd)
536                         GOTO(err_msd, rc = -ENOMEM);
537
538                 rc = lustre_fread(f, (char *)mcd, sizeof(*mcd), &off);
539                 if (rc != sizeof(*mcd)) {
540                         CERROR("error reading MDS %s offset %Ld: rc = %d\n",
541                                LAST_RCVD, (unsigned long long)cl_off, rc);
542                         if (rc > 0)
543                                 rc = -EIO;
544                         break;
545                 }
546
547                 last_rcvd = le64_to_cpu(mcd->mcd_last_rcvd);
548                 last_mount = le64_to_cpu(mcd->mcd_mount_count);
549
550                 if (last_rcvd &&
551                     last_mount - mcd->mcd_mount_count < MDS_MOUNT_RECOV) {
552                         rc = mds_add_client(mds, mcd, cl_off);
553                         if (rc) {
554                                 rc = 0;
555                                 break;
556                         }
557                         mcd = NULL;
558                 }
559
560                 if (last_rcvd > mds->mds_last_rcvd) {
561                         CDEBUG(D_OTHER,
562                                "client at offset %Ld has last_rcvd = %Ld\n",
563                                (unsigned long long)cl_off,
564                                (unsigned long long)last_rcvd);
565                         mds->mds_last_rcvd = last_rcvd;
566                 }
567         }
568         CDEBUG(D_INODE, "got %Ld for highest last_rcvd value, %d clients\n",
569                (unsigned long long)mds->mds_last_rcvd, mds->mds_client_count);
570
571         return 0;
572
573 err_msd:
574         mds_free_server_data(mds);
575         return rc;
576 }
577
578 static int mds_prep(struct obd_device *obddev)
579 {
580         struct obd_run_ctxt saved;
581         struct mds_obd *mds = &obddev->u.mds;
582         struct super_operations *s_ops;
583         struct file *f;
584         int rc;
585
586         push_ctxt(&saved, &mds->mds_ctxt);
587         rc = simple_mkdir(current->fs->pwd, "ROOT", 0700);
588         if (rc && rc != -EEXIST) {
589                 CERROR("cannot create ROOT directory: rc = %d\n", rc);
590                 GOTO(err_pop, rc);
591         }
592         f = filp_open("ROOT", O_RDONLY, 0);
593         if (IS_ERR(f)) {
594                 rc = PTR_ERR(f);
595                 CERROR("cannot open ROOT: rc = %d\n", rc);
596                 LBUG();
597                 GOTO(err_pop, rc);
598         }
599
600         mds->mds_rootfid.id = f->f_dentry->d_inode->i_ino;
601         mds->mds_rootfid.generation = f->f_dentry->d_inode->i_generation;
602         mds->mds_rootfid.f_type = S_IFDIR;
603
604         rc = filp_close(f, 0);
605         if (rc) {
606                 CERROR("cannot close ROOT: rc = %d\n", rc);
607                 LBUG();
608         }
609
610         rc = simple_mkdir(current->fs->pwd, "FH", 0700);
611         if (rc && rc != -EEXIST) {
612                 CERROR("cannot create FH directory: rc = %d\n", rc);
613                 GOTO(err_pop, rc);
614         }
615
616         rc = mds_init_client_data(mds);
617         if (rc)
618                 GOTO(err_pop, rc);
619
620         f = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
621         if (IS_ERR(f)) {
622                 rc = PTR_ERR(f);
623                 CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc);
624                 GOTO(err_pop, rc = PTR_ERR(f));
625         }
626         if (!S_ISREG(f->f_dentry->d_inode->i_mode)) {
627                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
628                        f->f_dentry->d_inode->i_mode);
629                 GOTO(err_pop, rc = -ENOENT);
630         }
631
632         rc = mds_fs_journal_data(mds, f->f_dentry->d_inode, f);
633         if (rc) {
634                 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
635                 GOTO(err_filp, rc);
636         }
637
638         rc = mds_read_last_rcvd(mds, f);
639         if (rc) {
640                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
641                 GOTO(err_client, rc);
642         }
643         mds->mds_rcvd_filp = f;
644         pop_ctxt(&saved);
645
646         /*
647          * Replace the client filesystem delete_inode method with our own,
648          * so that we can clear the object ID before the inode is deleted.
649          * The fs_delete_inode method will call cl_delete_inode for us.
650          *
651          * We need to do this for the MDS superblock only, hence we install
652          * a modified copy of the original superblock method table.
653          *
654          * We still assume that there is only a single MDS client filesystem
655          * type, as we don't have access to the mds struct in delete_inode
656          * and store the client delete_inode method in a global table.  This
657          * will only become a problem when multiple MDSs are running on a
658          * single host with different client filesystems.
659          */
660         OBD_ALLOC(s_ops, sizeof(*s_ops));
661         if (!s_ops)
662                 GOTO(err_filp, rc = -ENOMEM);
663
664         memcpy(s_ops, mds->mds_sb->s_op, sizeof(*s_ops));
665         mds->mds_fsops->cl_delete_inode = s_ops->delete_inode;
666         s_ops->delete_inode = mds->mds_fsops->fs_delete_inode;
667         mds->mds_sb->s_op = s_ops;
668
669         mds->mds_service = ptlrpc_init_svc(128 * 1024,
670                                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
671                                            "self", mds_handle);
672
673         if (!mds->mds_service) {
674                 CERROR("failed to start service\n");
675                 GOTO(err_filp, rc = -EINVAL);
676         }
677
678         rc = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
679         if (rc) {
680                 CERROR("cannot start thread: rc = %d\n", rc);
681                 GOTO(err_svc, rc);
682         }
683
684         RETURN(0);
685
686 err_svc:
687         rpc_unregister_service(mds->mds_service);
688         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
689 err_client:
690         mds_free_client_data(mds);
691 err_filp:
692         if (filp_close(f, 0))
693                 CERROR("can't close %s after error\n", LAST_RCVD);
694 err_pop:
695         pop_ctxt(&saved);
696
697         return rc;
698 }
699
700 /* Update the server data on disk. */
701 int mds_update_server_data(struct mds_obd *mds)
702 {
703         struct obd_run_ctxt saved;
704         struct mds_server_data *msd = mds->mds_server_data;
705         loff_t off = 0;
706         int rc;
707
708         msd->msd_last_rcvd = cpu_to_le64(mds->mds_last_rcvd);
709         msd->msd_mount_count = cpu_to_le64(mds->mds_mount_count);
710
711         CDEBUG(D_SUPER, "MDS mount_count is %Ld\n",
712                (unsigned long long)mds->mds_mount_count);
713         push_ctxt(&saved, &mds->mds_ctxt);
714         rc = lustre_fwrite(mds->mds_rcvd_filp, (char *)msd, sizeof(*msd), &off);
715         pop_ctxt(&saved);
716         if (rc != sizeof(*msd)) {
717                 CERROR("error writing MDS server data: rc = %d\n", rc);
718                 if (rc > 0)
719                         RETURN(-EIO);
720                 RETURN(rc);
721         }
722
723         return 0;
724 }
725
726 /* Do recovery actions for the MDS */
727 static int mds_recover(struct obd_device *obddev)
728 {
729         struct mds_obd *mds = &obddev->u.mds;
730         int rc;
731
732         /* This happens at the end when recovery is complete */
733         ++mds->mds_mount_count;
734         rc = mds_update_server_data(mds);
735
736         return rc;
737 }
738
739 static int mds_cleanup(struct obd_device *obddev);
740
741 /* mount the file system (secretly) */
742 static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
743 {
744         struct obd_ioctl_data* data = buf;
745         struct mds_obd *mds = &obddev->u.mds;
746         struct vfsmount *mnt;
747         int rc = 0;
748         ENTRY;
749
750 #ifdef CONFIG_DEV_RDONLY
751         dev_clear_rdonly(2);
752 #endif
753         if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
754                 RETURN(-EINVAL);
755
756         mds->mds_fstype = strdup(data->ioc_inlbuf2);
757
758         if (!strcmp(mds->mds_fstype, "ext3"))
759                 mds->mds_fsops = &mds_ext3_fs_ops;
760         else if (!strcmp(mds->mds_fstype, "ext2"))
761                 mds->mds_fsops = &mds_ext2_fs_ops;
762         else {
763                 CERROR("unsupported MDS filesystem type %s\n", mds->mds_fstype);
764                 GOTO(err_kfree, rc = -EPERM);
765         }
766
767         MOD_INC_USE_COUNT;
768         mnt = do_kern_mount(mds->mds_fstype, 0, data->ioc_inlbuf1, NULL);
769         if (IS_ERR(mnt)) {
770                 rc = PTR_ERR(mnt);
771                 CERROR("do_kern_mount failed: rc = %d\n", rc);
772                 GOTO(err_dec, rc);
773         }
774
775         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
776         if (!mds->mds_sb)
777                 GOTO(err_put, rc = -ENODEV);
778
779         mds->mds_vfsmnt = mnt;
780         mds->mds_ctxt.pwdmnt = mnt;
781         mds->mds_ctxt.pwd = mnt->mnt_root;
782         mds->mds_ctxt.fs = KERNEL_DS;
783
784         rc = mds_prep(obddev);
785         if (rc)
786                 GOTO(err_put, rc);
787
788         rc = mds_recover(obddev);
789         if (rc) {
790                 mds_cleanup(obddev);
791                 RETURN(rc);
792         }
793
794         RETURN(0);
795
796 err_put:
797         unlock_kernel();
798         mntput(mds->mds_vfsmnt);
799         mds->mds_sb = 0;
800         lock_kernel();
801 err_dec:
802         MOD_DEC_USE_COUNT;
803 err_kfree:
804         kfree(mds->mds_fstype);
805         return rc;
806 }
807
808 static int mds_cleanup(struct obd_device * obddev)
809 {
810         struct super_operations *s_ops = NULL;
811         struct super_block *sb;
812         struct mds_obd *mds = &obddev->u.mds;
813
814         ENTRY;
815
816         if ( !list_empty(&obddev->obd_gen_clients) ) {
817                 CERROR("still has clients!\n");
818                 RETURN(-EBUSY);
819         }
820
821         ptlrpc_stop_thread(mds->mds_service);
822         rpc_unregister_service(mds->mds_service);
823         if (!list_empty(&mds->mds_service->srv_reqs)) {
824                 // XXX reply with errors and clean up
825                 CERROR("Request list not empty!\n");
826         }
827         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
828
829         sb = mds->mds_sb;
830         if (!mds->mds_sb)
831                 RETURN(0);
832
833         mds_free_client_data(mds);
834         mds_update_server_data(mds);
835         mds_free_server_data(mds);
836
837         if (mds->mds_rcvd_filp) {
838                 int rc = filp_close(mds->mds_rcvd_filp, 0);
839                 mds->mds_rcvd_filp = NULL;
840
841                 if (rc)
842                         CERROR("last_rcvd file won't close, rc=%d\n", rc);
843         }
844         s_ops = sb->s_op;
845
846         unlock_kernel();
847         mntput(mds->mds_vfsmnt);
848         mds->mds_sb = 0;
849         kfree(mds->mds_fstype);
850         lock_kernel();
851 #ifdef CONFIG_DEV_RDONLY
852         dev_clear_rdonly(2);
853 #endif
854         OBD_FREE(s_ops, sizeof(*s_ops));
855
856         MOD_DEC_USE_COUNT;
857         RETURN(0);
858 }
859
860 /* use obd ops to offer management infrastructure */
861 static struct obd_ops mds_obd_ops = {
862         o_setup:       mds_setup,
863         o_cleanup:     mds_cleanup,
864 };
865
866 static int __init mds_init(void)
867 {
868         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
869         return 0;
870 }
871
872 static void __exit mds_exit(void)
873 {
874         obd_unregister_type(LUSTRE_MDS_NAME);
875 }
876
877 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
878 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
879 MODULE_LICENSE("GPL");
880
881 module_init(mds_init);
882 module_exit(mds_exit);