Whamcloud - gitweb
4af1f85f48272fdd9f82367629a7b0b54aecec95
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/handler.c
5  *
6  *  Lustre Metadata Server (mds) request handler
7  *
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com>
14  *
15  *  This server is single threaded at present (but can easily be multi threaded)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20
21 #include <linux/version.h>
22 #include <linux/module.h>
23 #include <linux/fs.h>
24 #include <linux/stat.h>
25 #include <linux/locks.h>
26 #include <linux/quotaops.h>
27 #include <asm/unistd.h>
28 #include <asm/uaccess.h>
29
30 #define DEBUG_SUBSYSTEM S_MDS
31
32 #include <linux/lustre_mds.h>
33 #include <linux/lustre_lib.h>
34 #include <linux/lustre_net.h>
35
36 int mds_sendpage(struct ptlrpc_request *req, struct file *file,
37                  __u64 offset, struct niobuf *dst)
38 {
39         int rc = 0;
40         mm_segment_t oldfs = get_fs();
41         struct ptlrpc_bulk_desc *bulk;
42         char *buf;
43
44         bulk = ptlrpc_prep_bulk(req->rq_connection);
45         if (bulk == NULL) {
46                 rc = -ENOMEM;
47                 GOTO(out, rc);
48         }
49
50         bulk->b_xid = req->rq_reqmsg->xid;
51
52         OBD_ALLOC(buf, PAGE_SIZE);
53         if (!buf) {
54                 rc = -ENOMEM;
55                 GOTO(cleanup_bulk, rc);
56         }
57
58         set_fs(KERNEL_DS);
59         rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
60                              (loff_t *)&offset);
61         set_fs(oldfs);
62
63         if (rc != PAGE_SIZE) {
64                 rc = -EIO;
65                 GOTO(cleanup_buf, rc);
66         }
67
68         bulk->b_buf = buf;
69         bulk->b_buflen = PAGE_SIZE;
70
71         rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL);
72         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
73                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
74                        OBD_FAIL_MDS_SENDPAGE, rc);
75                 PtlMDUnlink(bulk->b_md_h);
76                 GOTO(cleanup_buf, rc);
77         }
78         wait_event_interruptible(bulk->b_waitq,
79                                  ptlrpc_check_bulk_sent(bulk));
80
81         if (bulk->b_flags & PTL_RPC_FL_INTR) {
82                 rc = -EINTR;
83                 GOTO(cleanup_buf, rc);
84         }
85
86         EXIT;
87  cleanup_buf:
88         OBD_FREE(buf, PAGE_SIZE);
89  cleanup_bulk:
90         ptlrpc_free_bulk(bulk);
91  out:
92         return rc;
93 }
94
95 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
96                               struct vfsmount **mnt)
97 {
98         /* stolen from NFS */
99         struct super_block *sb = mds->mds_sb;
100         unsigned long ino = fid->id;
101         __u32 generation = fid->generation;
102         struct inode *inode;
103         struct list_head *lp;
104         struct dentry *result;
105
106         if (ino == 0)
107                 return ERR_PTR(-ESTALE);
108
109         inode = iget(sb, ino);
110         if (inode == NULL)
111                 return ERR_PTR(-ENOMEM);
112
113         CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
114
115         if (is_bad_inode(inode) ||
116             (generation && inode->i_generation != generation)) {
117                 /* we didn't find the right inode.. */
118                 CERROR("bad inode %lu, link: %d ct: %d or version  %u/%u\n",
119                         inode->i_ino,
120                         inode->i_nlink, atomic_read(&inode->i_count),
121                         inode->i_generation,
122                         generation);
123                 LBUG();
124                 iput(inode);
125                 return ERR_PTR(-ESTALE);
126         }
127
128         /* now to find a dentry.
129          * If possible, get a well-connected one
130          */
131         if (mnt)
132                 *mnt = mds->mds_vfsmnt;
133         spin_lock(&dcache_lock);
134         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
135                 result = list_entry(lp,struct dentry, d_alias);
136                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
137                         dget_locked(result);
138                         result->d_vfs_flags |= DCACHE_REFERENCED;
139                         spin_unlock(&dcache_lock);
140                         iput(inode);
141                         if (mnt)
142                                 mntget(*mnt);
143                         return result;
144                 }
145         }
146         spin_unlock(&dcache_lock);
147         result = d_alloc_root(inode);
148         if (result == NULL) {
149                 iput(inode);
150                 return ERR_PTR(-ENOMEM);
151         }
152         if (mnt)
153                 mntget(*mnt);
154         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
155         return result;
156 }
157
158 #define MDS_MAX_CLIENTS 1024
159 #define MDS_MAX_CLIENT_WORDS (MDS_MAX_CLIENTS / sizeof(unsigned long))
160
161 static unsigned long last_rcvd_slots[MDS_MAX_CLIENT_WORDS];
162
163 /* Add client data to the MDS.  The in-memory storage will be a hash at some
164  * point.  We use a bitmap to locate a free space in the last_rcvd file if
165  * cl_off is -1 (i.e. a new client).  Otherwise, we have just read the data
166  * from the last_rcvd file and we know its offset.
167  */
168 int mds_client_add(struct mds_obd *mds, struct mds_client_data *mcd, int cl_off)
169 {
170         struct mds_client_info *mci;
171
172         OBD_ALLOC(mci, sizeof(*mci));
173         if (!mci) {
174                 CERROR("no memory for MDS client info\n");
175                 RETURN(-ENOMEM);
176         }
177
178         CDEBUG(D_INFO, "client at offset %d with UUID '%s' added\n",
179                cl_off, mcd->mcd_uuid);
180
181         if (cl_off == -1) {
182                 unsigned long *word;
183                 int bit;
184
185         repeat:
186                 word = last_rcvd_slots;
187                 while(*word == ~0UL)
188                         ++word;
189                 if (word - last_rcvd_slots >= MDS_MAX_CLIENT_WORDS) {
190                         CERROR("no room in client MDS bitmap - fix code\n");
191                         return -ENOMEM;
192                 }
193                 bit = ffz(*word);
194                 if (test_and_set_bit(bit, word)) {
195                         CERROR("found bit %d set for word %d - fix code\n",
196                                bit, word - last_rcvd_slots);
197                         goto repeat;
198                 }
199                 cl_off = word - last_rcvd_slots + bit;
200         } else {
201                 unsigned long *word;
202                 int bit;
203
204                 word = last_rcvd_slots + cl_off / sizeof(unsigned long);
205                 bit = cl_off % sizeof(unsigned long);
206
207                 if (test_and_set_bit(bit, word)) {
208                         CERROR("bit %d already set in word %d - bad bad\n",
209                                bit, word - last_rcvd_slots);
210                         LBUG();
211                 }
212         }
213
214         mci->mci_mcd = mcd;
215         mci->mci_off = cl_off;
216
217         /* For now we just put the clients in a list, not a hashed list */
218         list_add_tail(&mci->mci_list, &mds->mds_client_info);
219
220         mds->mds_client_count++;
221
222         return 0;
223 }
224
225 void mds_client_del(struct mds_obd *mds, struct mds_client_info *mci)
226 {
227         unsigned long *word;
228         int bit;
229
230         word = last_rcvd_slots + mci->mci_off / sizeof(unsigned long);
231         bit = mci->mci_off % sizeof(unsigned long);
232
233         if (!test_and_clear_bit(bit, word)) {
234                 CERROR("bit %d already clear in word %d - bad bad\n",
235                        bit, word - last_rcvd_slots);
236                 LBUG();
237         }
238
239         --mds->mds_client_count;
240         list_del(&mci->mci_list);
241         OBD_FREE(mci->mci_mcd, sizeof(*mci->mci_mcd));
242         OBD_FREE(mci, sizeof (*mci));
243 }
244
245 int mds_client_free_all(struct mds_obd *mds)
246 {
247         struct list_head *p, *n;
248
249         list_for_each_safe(p, n, &mds->mds_client_info) {
250                 struct mds_client_info *mci;
251
252                 mci = list_entry(p, struct mds_client_info, mci_list);
253                 mds_client_del(mds, mci);
254         }
255
256         return 0;
257 }
258
259 int mds_server_free_data(struct mds_obd *mds)
260 {
261         OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
262         mds->mds_server_data = NULL;
263
264         return 0;
265 }
266
267 int mds_connect(struct ptlrpc_request *req)
268 {
269         struct mds_body *body;
270         struct mds_obd *mds = &req->rq_obd->u.mds;
271         struct mds_client_info *mci;
272         struct mds_client_data *mcd;
273         int rc, size = sizeof(*body);
274         ENTRY;
275
276         CDEBUG(D_INFO, "MDS connect from UUID '%s'\n", ptlrpc_req_to_uuid(req));
277         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
278         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CONNECT_PACK)) {
279                 CERROR("mds: out of memory for message: size=%d\n", size);
280                 req->rq_status = -ENOMEM;
281                 RETURN(0);
282         }
283
284         body = lustre_msg_buf(req->rq_reqmsg, 0);
285         mds_unpack_req_body(req);
286         /* Anything we need to do here with the client's trans no or so? */
287
288         body = lustre_msg_buf(req->rq_repmsg, 0);
289         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
290
291         mci = mds_uuid_to_mci(mds, ptlrpc_req_to_uuid(req));
292         if (!mci) {
293                 /* We don't have any old connection data for this client */
294                 int rc;
295
296                 CDEBUG(D_INFO, "allocating new client data for UUID '%s'",
297                        ptlrpc_req_to_uuid(req));
298
299                 OBD_ALLOC(mcd, sizeof(*mcd));
300                 if (!mcd) {
301                         CERROR("mds: out of memory for client data\n");
302                         req->rq_status = -ENOMEM;
303                         RETURN(0);
304                 }
305                 rc = mds_client_add(mds, mcd, -1);
306                 if (rc) {
307                         req->rq_status = rc;
308                         RETURN(0);
309                 }
310         } else {
311                 /* We have old connection data for this client... */
312                 mcd = mci->mci_mcd;
313                 CDEBUG(D_INFO, "found existing data for UUID '%s' at #%d\n",
314                        mcd->mcd_uuid, mci->mci_off);
315         }
316         /* Still not 100% sure whether we should reply with the server
317          * last_rcvd or that of this client.  I'm not sure it even makes
318          * a difference on a per-client basis, because last_rcvd is global
319          * and we are not supposed to allow transactions while in recovery.
320          */
321         body->last_xid = le32_to_cpu(mcd->mcd_last_xid);
322         body->last_rcvd = le64_to_cpu(mcd->mcd_last_rcvd);
323         //body->last_rcvd = mds->mds_last_rcvd;
324         body->last_committed = mds->mds_last_committed;
325         CDEBUG(D_INFO, "last_rcvd %ld, last_committed %ld, last_xid %d\n",
326                (unsigned long)body->last_rcvd,
327                (unsigned long)body->last_committed, body->last_xid);
328         mds_pack_rep_body(req);
329         RETURN(0);
330 }
331
332 int mds_getattr(struct ptlrpc_request *req)
333 {
334         struct dentry *de;
335         struct inode *inode;
336         struct mds_body *body;
337         struct mds_obd *mds = &req->rq_obd->u.mds;
338         int rc, size = sizeof(*body);
339         ENTRY;
340
341         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
342         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
343                 CERROR("mds: out of memory\n");
344                 req->rq_status = -ENOMEM;
345                 RETURN(0);
346         }
347
348         body = lustre_msg_buf(req->rq_reqmsg, 0);
349         de = mds_fid2dentry(mds, &body->fid1, NULL);
350         if (IS_ERR(de)) {
351                 req->rq_status = -ENOENT;
352                 RETURN(0);
353         }
354
355         body = lustre_msg_buf(req->rq_repmsg, 0);
356         inode = de->d_inode;
357         body->ino = inode->i_ino;
358         body->generation = inode->i_generation;
359         body->atime = inode->i_atime;
360         body->ctime = inode->i_ctime;
361         body->mtime = inode->i_mtime;
362         body->uid = inode->i_uid;
363         body->gid = inode->i_gid;
364         body->size = inode->i_size;
365         body->mode = inode->i_mode;
366         body->nlink = inode->i_nlink;
367         body->valid = ~0;
368         body->last_committed = mds->mds_last_committed;
369         mds_fs_get_objid(mds, inode, &body->objid);
370         l_dput(de);
371         RETURN(0);
372 }
373
374 int mds_open(struct ptlrpc_request *req)
375 {
376         struct mds_obd *mds = &req->rq_obd->u.mds;
377         struct dentry *de;
378         struct mds_body *body;
379         struct file *file;
380         struct vfsmount *mnt;
381         __u32 flags;
382         int rc, size = sizeof(*body);
383         ENTRY;
384
385         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
386         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
387                 CERROR("mds: out of memory\n");
388                 req->rq_status = -ENOMEM;
389                 RETURN(0);
390         }
391
392         body = lustre_msg_buf(req->rq_reqmsg, 0);
393         de = mds_fid2dentry(mds, &body->fid1, &mnt);
394         if (IS_ERR(de)) {
395                 req->rq_status = -ENOENT;
396                 RETURN(0);
397         }
398         flags = body->flags;
399         file = dentry_open(de, mnt, flags);
400         if (!file || IS_ERR(file)) {
401                 req->rq_status = -EINVAL;
402                 RETURN(0);
403         }
404
405         body = lustre_msg_buf(req->rq_repmsg, 0);
406         body->objid = (__u64) (unsigned long)file;
407         body->last_committed = mds->mds_last_committed;
408         RETURN(0);
409 }
410
411 int mds_close(struct ptlrpc_request *req)
412 {
413         struct dentry *de;
414         struct mds_body *body;
415         struct file *file;
416         struct vfsmount *mnt;
417         int rc;
418         ENTRY;
419
420         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
421         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
422                 CERROR("mds: out of memory\n");
423                 req->rq_status = -ENOMEM;
424                 RETURN(0);
425         }
426
427         body = lustre_msg_buf(req->rq_reqmsg, 0);
428         de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
429         if (IS_ERR(de)) {
430                 req->rq_status = -ENOENT;
431                 RETURN(0);
432         }
433
434         file = (struct file *)(unsigned long)body->objid;
435         req->rq_status = filp_close(file, 0);
436         l_dput(de);
437         mntput(mnt);
438
439         RETURN(0);
440 }
441
442 int mds_readpage(struct ptlrpc_request *req)
443 {
444         struct vfsmount *mnt;
445         struct dentry *de;
446         struct file *file;
447         struct niobuf *niobuf;
448         struct mds_body *body;
449         int rc, size = sizeof(*body);
450         ENTRY;
451
452         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
453         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
454                 CERROR("mds: out of memory\n");
455                 req->rq_status = -ENOMEM;
456                 RETURN(0);
457         }
458
459         body = lustre_msg_buf(req->rq_reqmsg, 0);
460         de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
461         if (IS_ERR(de)) {
462                 req->rq_status = PTR_ERR(de);
463                 RETURN(0);
464         }
465
466         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
467
468         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
469         /* note: in case of an error, dentry_open puts dentry */
470         if (IS_ERR(file)) {
471                 req->rq_status = PTR_ERR(file);
472                 RETURN(0);
473         }
474
475         niobuf = lustre_msg_buf(req->rq_reqmsg, 1);
476         if (!niobuf) {
477                 req->rq_status = -EINVAL;
478                 LBUG();
479                 RETURN(0);
480         }
481
482         /* to make this asynchronous make sure that the handling function
483            doesn't send a reply when this function completes. Instead a
484            callback function would send the reply */
485         rc = mds_sendpage(req, file, body->size, niobuf);
486
487         filp_close(file, 0);
488         req->rq_status = rc;
489         RETURN(0);
490 }
491
492 int mds_reint(struct ptlrpc_request *req)
493 {
494         int rc;
495         struct mds_update_record rec;
496
497         rc = mds_update_unpack(req, &rec);
498         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
499                 CERROR("invalid record\n");
500                 req->rq_status = -EINVAL;
501                 RETURN(0);
502         }
503         /* rc will be used to interrupt a for loop over multiple records */
504         rc = mds_reint_rec(&rec, req);
505         return 0;
506 }
507
508 int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
509                struct ptlrpc_request *req)
510 {
511         int rc;
512         ENTRY;
513
514         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
515         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
516                 CERROR("lustre_mds: Invalid request\n");
517                 GOTO(out, rc);
518         }
519
520         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
521                 CERROR("lustre_mds: wrong packet type sent %d\n",
522                        req->rq_reqmsg->type);
523                 GOTO(out, rc = -EINVAL);
524         }
525
526         switch (req->rq_reqmsg->opc) {
527         case MDS_CONNECT:
528                 CDEBUG(D_INODE, "getattr\n");
529                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
530                 rc = mds_connect(req);
531                 break;
532
533         case MDS_GETATTR:
534                 CDEBUG(D_INODE, "getattr\n");
535                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
536                 rc = mds_getattr(req);
537                 break;
538
539         case MDS_READPAGE:
540                 CDEBUG(D_INODE, "readpage\n");
541                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
542                 rc = mds_readpage(req);
543
544                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
545                         return 0;
546                 break;
547
548         case MDS_REINT:
549                 CDEBUG(D_INODE, "reint\n");
550                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
551                 rc = mds_reint(req);
552                 break;
553
554         case MDS_OPEN:
555                 CDEBUG(D_INODE, "open\n");
556                 OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
557                 rc = mds_open(req);
558                 break;
559
560         case MDS_CLOSE:
561                 CDEBUG(D_INODE, "close\n");
562                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
563                 rc = mds_close(req);
564                 break;
565
566         default:
567                 rc = ptlrpc_error(svc, req);
568                 RETURN(rc);
569         }
570
571         EXIT;
572 out:
573         if (rc) {
574                 ptlrpc_error(svc, req);
575         } else {
576                 CDEBUG(D_NET, "sending reply\n");
577                 ptlrpc_reply(svc, req);
578         }
579
580         return 0;
581 }
582
583 /* This will be a hash table at some point. */
584 int mds_init_client_data(struct mds_obd *mds)
585 {
586         INIT_LIST_HEAD(&mds->mds_client_info);
587         return 0;
588 }
589
590 #define LAST_RCVD "last_rcvd"
591
592 int mds_read_last_rcvd(struct mds_obd *mds, struct file *f)
593 {
594         struct mds_server_data *msd;
595         struct mds_client_data *mcd = NULL;
596         loff_t fsize = f->f_dentry->d_inode->i_size;
597         loff_t off = 0;
598         int cl_off;
599         __u64 last_rcvd = 0;
600         __u64 last_mount;
601         int rc = 0;
602
603         OBD_ALLOC(msd, sizeof(*msd));
604         if (!msd)
605                 RETURN(-ENOMEM);
606         rc = lustre_fread(f, (char *)msd, sizeof(*msd), &off);
607
608         mds->mds_server_data = msd;
609         if (rc == 0) {
610                 CERROR("empty MDS %s, new MDS?\n", LAST_RCVD);
611                 RETURN(0);
612         } else if (rc != sizeof(*msd)) {
613                 CERROR("error reading MDS %s: rc = %d\n", LAST_RCVD, rc);
614                 if (rc > 0) {
615                         rc = -EIO;
616                 }
617                 GOTO(err_msd, rc);
618         }
619
620         /*
621          * When we do a clean MDS shutdown, we save the last_rcvd into
622          * the header.  If we find clients with higher last_rcvd values
623          * then those clients may need recovery done.
624          */
625         last_rcvd = le64_to_cpu(msd->msd_last_rcvd);
626         mds->mds_last_rcvd = last_rcvd;
627         CDEBUG(D_INODE, "got %Ld for server last_rcvd value\n",
628                (unsigned long long)last_rcvd);
629
630         last_mount = le64_to_cpu(msd->msd_mount_count);
631         mds->mds_mount_count = last_mount;
632         CDEBUG(D_INODE, "got %Ld for server last_mount value\n",
633                (unsigned long long)last_mount);
634
635         for (off = MDS_LR_CLIENT, cl_off = 0, rc = sizeof(*mcd);
636              off <= fsize - sizeof(*mcd) && rc == sizeof(*mcd);
637              off = MDS_LR_CLIENT + ++cl_off * MDS_LR_SIZE) {
638                 if (!mcd)
639                         OBD_ALLOC(mcd, sizeof(*mcd));
640                 if (!mcd)
641                         GOTO(err_msd, rc = -ENOMEM);
642
643                 rc = lustre_fread(f, (char *)mcd, sizeof(*mcd), &off);
644                 if (rc != sizeof(*mcd)) {
645                         CERROR("error reading MDS %s offset %d: rc = %d\n",
646                                LAST_RCVD, cl_off, rc);
647                         if (rc > 0)
648                                 rc = -EIO;
649                         break;
650                 }
651
652                 last_rcvd = le64_to_cpu(mcd->mcd_last_rcvd);
653                 last_mount = le64_to_cpu(mcd->mcd_mount_count);
654
655                 if (last_rcvd &&
656                     last_mount - mcd->mcd_mount_count < MDS_MOUNT_RECOV) {
657                         rc = mds_client_add(mds, mcd, cl_off);
658                         if (rc) {
659                                 rc = 0;
660                                 break;
661                         }
662                         mcd = NULL;
663                 } else {
664                         CDEBUG(D_INFO,
665                                "client at offset %d with UUID '%s' ignored\n",
666                                cl_off, mcd->mcd_uuid);
667                 }
668
669                 if (last_rcvd > mds->mds_last_rcvd) {
670                         CDEBUG(D_OTHER,
671                                "client at offset %d has last_rcvd = %Ld\n",
672                                cl_off, (unsigned long long)last_rcvd);
673                         mds->mds_last_rcvd = last_rcvd;
674                 }
675         }
676         CDEBUG(D_INODE, "got %Ld for highest last_rcvd value, %d clients\n",
677                (unsigned long long)mds->mds_last_rcvd, mds->mds_client_count);
678
679         /* After recovery, there can be no local uncommitted transactions */
680         mds->mds_last_committed = mds->mds_last_rcvd;
681
682         return 0;
683
684 err_msd:
685         mds_server_free_data(mds);
686         return rc;
687 }
688
689 static int mds_prep(struct obd_device *obddev)
690 {
691         struct obd_run_ctxt saved;
692         struct mds_obd *mds = &obddev->u.mds;
693         struct super_operations *s_ops;
694         struct file *f;
695         int rc;
696
697         push_ctxt(&saved, &mds->mds_ctxt);
698         rc = simple_mkdir(current->fs->pwd, "ROOT", 0700);
699         if (rc && rc != -EEXIST) {
700                 CERROR("cannot create ROOT directory: rc = %d\n", rc);
701                 GOTO(err_pop, rc);
702         }
703         f = filp_open("ROOT", O_RDONLY, 0);
704         if (IS_ERR(f)) {
705                 rc = PTR_ERR(f);
706                 CERROR("cannot open ROOT: rc = %d\n", rc);
707                 LBUG();
708                 GOTO(err_pop, rc);
709         }
710
711         mds->mds_rootfid.id = f->f_dentry->d_inode->i_ino;
712         mds->mds_rootfid.generation = f->f_dentry->d_inode->i_generation;
713         mds->mds_rootfid.f_type = S_IFDIR;
714
715         rc = filp_close(f, 0);
716         if (rc) {
717                 CERROR("cannot close ROOT: rc = %d\n", rc);
718                 LBUG();
719         }
720
721         rc = simple_mkdir(current->fs->pwd, "FH", 0700);
722         if (rc && rc != -EEXIST) {
723                 CERROR("cannot create FH directory: rc = %d\n", rc);
724                 GOTO(err_pop, rc);
725         }
726
727         rc = mds_init_client_data(mds);
728         if (rc)
729                 GOTO(err_pop, rc);
730
731         f = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
732         if (IS_ERR(f)) {
733                 rc = PTR_ERR(f);
734                 CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc);
735                 GOTO(err_pop, rc = PTR_ERR(f));
736         }
737         if (!S_ISREG(f->f_dentry->d_inode->i_mode)) {
738                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
739                        f->f_dentry->d_inode->i_mode);
740                 GOTO(err_pop, rc = -ENOENT);
741         }
742
743         rc = mds_fs_journal_data(mds, f);
744         if (rc) {
745                 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
746                 GOTO(err_filp, rc);
747         }
748
749         rc = mds_read_last_rcvd(mds, f);
750         if (rc) {
751                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
752                 GOTO(err_client, rc);
753         }
754         mds->mds_rcvd_filp = f;
755         pop_ctxt(&saved);
756
757         /*
758          * Replace the client filesystem delete_inode method with our own,
759          * so that we can clear the object ID before the inode is deleted.
760          * The fs_delete_inode method will call cl_delete_inode for us.
761          *
762          * We need to do this for the MDS superblock only, hence we install
763          * a modified copy of the original superblock method table.
764          *
765          * We still assume that there is only a single MDS client filesystem
766          * type, as we don't have access to the mds struct in delete_inode
767          * and store the client delete_inode method in a global table.  This
768          * will only become a problem when multiple MDSs are running on a
769          * single host with different client filesystems.
770          */
771         OBD_ALLOC(s_ops, sizeof(*s_ops));
772         if (!s_ops)
773                 GOTO(err_filp, rc = -ENOMEM);
774
775         memcpy(s_ops, mds->mds_sb->s_op, sizeof(*s_ops));
776         mds->mds_fsops->cl_delete_inode = s_ops->delete_inode;
777         s_ops->delete_inode = mds->mds_fsops->fs_delete_inode;
778         mds->mds_sb->s_op = s_ops;
779
780         mds->mds_service = ptlrpc_init_svc(128 * 1024,
781                                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
782                                            "self", mds_handle);
783
784         if (!mds->mds_service) {
785                 CERROR("failed to start service\n");
786                 GOTO(err_filp, rc = -EINVAL);
787         }
788
789         rc = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
790         if (rc) {
791                 CERROR("cannot start thread: rc = %d\n", rc);
792                 GOTO(err_svc, rc);
793         }
794
795         RETURN(0);
796
797 err_svc:
798         rpc_unregister_service(mds->mds_service);
799         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
800 err_client:
801         mds_client_free_all(mds);
802 err_filp:
803         if (filp_close(f, 0))
804                 CERROR("can't close %s after error\n", LAST_RCVD);
805 err_pop:
806         pop_ctxt(&saved);
807
808         return rc;
809 }
810
811 /* Update the server data on disk.  This stores the new mount_count and
812  * also the last_rcvd value to disk.  If we don't have a clean shutdown,
813  * then the server last_rcvd value may be less than that of the clients.
814  * This will alert us that we may need to do client recovery.
815  */
816 int mds_update_server_data(struct mds_obd *mds)
817 {
818         struct obd_run_ctxt saved;
819         struct mds_server_data *msd = mds->mds_server_data;
820         struct file *filp = mds->mds_rcvd_filp;
821         loff_t off = 0;
822         int rc;
823
824         msd->msd_last_rcvd = cpu_to_le64(mds->mds_last_rcvd);
825         msd->msd_mount_count = cpu_to_le64(mds->mds_mount_count);
826
827         CDEBUG(D_SUPER, "MDS mount_count is %Ld, last_rcvd is %Ld\n",
828                (unsigned long long)mds->mds_mount_count,
829                (unsigned long long)mds->mds_last_rcvd);
830         push_ctxt(&saved, &mds->mds_ctxt);
831         rc = lustre_fwrite(filp, (char *)msd, sizeof(*msd), &off);
832         if (rc != sizeof(*msd)) {
833                 CERROR("error writing MDS server data: rc = %d\n", rc);
834                 if (rc > 0)
835                         RETURN(-EIO);
836                 RETURN(rc);
837         }
838         rc = fsync_dev(filp->f_dentry->d_inode->i_rdev);
839         pop_ctxt(&saved);
840         if (rc)
841                 CERROR("error flushing MDS server data: rc = %d\n", rc);
842
843         return 0;
844 }
845
846 /* Do recovery actions for the MDS */
847 static int mds_recover(struct obd_device *obddev)
848 {
849         struct mds_obd *mds = &obddev->u.mds;
850         int rc;
851
852         /* This happens at the end when recovery is complete */
853         ++mds->mds_mount_count;
854         rc = mds_update_server_data(mds);
855
856         return rc;
857 }
858
859 static int mds_cleanup(struct obd_device *obddev);
860
861 /* mount the file system (secretly) */
862 static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
863 {
864         struct obd_ioctl_data* data = buf;
865         struct mds_obd *mds = &obddev->u.mds;
866         struct vfsmount *mnt;
867         int rc = 0;
868         ENTRY;
869
870 #ifdef CONFIG_DEV_RDONLY
871         dev_clear_rdonly(2);
872 #endif
873         if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
874                 RETURN(-EINVAL);
875
876         mds->mds_fstype = strdup(data->ioc_inlbuf2);
877
878         if (!strcmp(mds->mds_fstype, "ext3"))
879                 mds->mds_fsops = &mds_ext3_fs_ops;
880         else if (!strcmp(mds->mds_fstype, "ext2"))
881                 mds->mds_fsops = &mds_ext2_fs_ops;
882         else {
883                 CERROR("unsupported MDS filesystem type %s\n", mds->mds_fstype);
884                 GOTO(err_kfree, rc = -EPERM);
885         }
886
887         MOD_INC_USE_COUNT;
888         mnt = do_kern_mount(mds->mds_fstype, 0, data->ioc_inlbuf1, NULL);
889         if (IS_ERR(mnt)) {
890                 rc = PTR_ERR(mnt);
891                 CERROR("do_kern_mount failed: rc = %d\n", rc);
892                 GOTO(err_dec, rc);
893         }
894
895         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
896         if (!mds->mds_sb)
897                 GOTO(err_put, rc = -ENODEV);
898
899         mds->mds_vfsmnt = mnt;
900         mds->mds_ctxt.pwdmnt = mnt;
901         mds->mds_ctxt.pwd = mnt->mnt_root;
902         mds->mds_ctxt.fs = KERNEL_DS;
903
904         rc = mds_prep(obddev);
905         if (rc)
906                 GOTO(err_put, rc);
907
908         rc = mds_recover(obddev);
909         if (rc) {
910                 mds_cleanup(obddev);
911                 RETURN(rc);
912         }
913
914         RETURN(0);
915
916 err_put:
917         unlock_kernel();
918         mntput(mds->mds_vfsmnt);
919         mds->mds_sb = 0;
920         lock_kernel();
921 err_dec:
922         MOD_DEC_USE_COUNT;
923 err_kfree:
924         kfree(mds->mds_fstype);
925         return rc;
926 }
927
928 static int mds_cleanup(struct obd_device * obddev)
929 {
930         struct super_operations *s_ops = NULL;
931         struct super_block *sb;
932         struct mds_obd *mds = &obddev->u.mds;
933
934         ENTRY;
935
936         if ( !list_empty(&obddev->obd_gen_clients) ) {
937                 CERROR("still has clients!\n");
938                 RETURN(-EBUSY);
939         }
940
941         ptlrpc_stop_thread(mds->mds_service);
942         rpc_unregister_service(mds->mds_service);
943         if (!list_empty(&mds->mds_service->srv_reqs)) {
944                 // XXX reply with errors and clean up
945                 CERROR("Request list not empty!\n");
946         }
947         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
948
949         sb = mds->mds_sb;
950         if (!mds->mds_sb)
951                 RETURN(0);
952
953         mds_client_free_all(mds);
954         mds_update_server_data(mds);
955         mds_server_free_data(mds);
956
957         if (mds->mds_rcvd_filp) {
958                 int rc = filp_close(mds->mds_rcvd_filp, 0);
959                 mds->mds_rcvd_filp = NULL;
960
961                 if (rc)
962                         CERROR("last_rcvd file won't close, rc=%d\n", rc);
963         }
964         s_ops = sb->s_op;
965
966         unlock_kernel();
967         mntput(mds->mds_vfsmnt);
968         mds->mds_sb = 0;
969         kfree(mds->mds_fstype);
970         lock_kernel();
971 #ifdef CONFIG_DEV_RDONLY
972         dev_clear_rdonly(2);
973 #endif
974         OBD_FREE(s_ops, sizeof(*s_ops));
975
976         MOD_DEC_USE_COUNT;
977         RETURN(0);
978 }
979
980 /* use obd ops to offer management infrastructure */
981 static struct obd_ops mds_obd_ops = {
982         o_setup:       mds_setup,
983         o_cleanup:     mds_cleanup,
984 };
985
986 static int __init mds_init(void)
987 {
988         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
989         return 0;
990 }
991
992 static void __exit mds_exit(void)
993 {
994         obd_unregister_type(LUSTRE_MDS_NAME);
995 }
996
997 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
998 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
999 MODULE_LICENSE("GPL");
1000
1001 module_init(mds_init);
1002 module_exit(mds_exit);