Whamcloud - gitweb
Add separate file for MDS filesystem interaction routines.
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/handler.c
5  *
6  *  Lustre Metadata Server (mds) request handler
7  *
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com>
14  *
15  *  This server is single threaded at present (but can easily be multi threaded)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_MDS
21
22 #include <linux/module.h>
23 #include <linux/lustre_mds.h>
24
25 int mds_sendpage(struct ptlrpc_request *req, struct file *file, __u64 offset)
26 {
27         int rc = 0;
28         mm_segment_t oldfs = get_fs();
29         struct ptlrpc_bulk_desc *desc;
30         struct ptlrpc_bulk_page *bulk;
31         char *buf;
32         ENTRY;
33
34         desc = ptlrpc_prep_bulk(req->rq_connection);
35         if (desc == NULL)
36                 GOTO(out, rc = -ENOMEM);
37
38         bulk = ptlrpc_prep_bulk_page(desc);
39         if (bulk == NULL)
40                 GOTO(cleanup_bulk, rc = -ENOMEM);
41
42         OBD_ALLOC(buf, PAGE_SIZE);
43         if (buf == NULL)
44                 GOTO(cleanup_bulk, rc = -ENOMEM);
45
46         set_fs(KERNEL_DS);
47         rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
48                              (loff_t *)&offset);
49         set_fs(oldfs);
50
51         if (rc != PAGE_SIZE)
52                 GOTO(cleanup_buf, rc = -EIO);
53
54         bulk->b_xid = req->rq_reqmsg->xid;
55         bulk->b_buf = buf;
56         bulk->b_buflen = PAGE_SIZE;
57         desc->b_portal = MDS_BULK_PORTAL;
58
59         rc = ptlrpc_send_bulk(desc);
60         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
61                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
62                        OBD_FAIL_MDS_SENDPAGE, rc);
63                 ptlrpc_abort_bulk(desc);
64                 GOTO(cleanup_buf, rc);
65         }
66
67         EXIT;
68  cleanup_buf:
69         OBD_FREE(buf, PAGE_SIZE);
70  cleanup_bulk:
71         ptlrpc_free_bulk(desc);
72  out:
73         return rc;
74 }
75
76 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
77                               struct vfsmount **mnt)
78 {
79         /* stolen from NFS */
80         struct super_block *sb = mds->mds_sb;
81         unsigned long ino = fid->id;
82         __u32 generation = fid->generation;
83         struct inode *inode;
84         struct list_head *lp;
85         struct dentry *result;
86
87         if (ino == 0)
88                 RETURN(ERR_PTR(-ESTALE));
89
90         inode = iget(sb, ino);
91         if (inode == NULL)
92                 RETURN(ERR_PTR(-ENOMEM));
93
94         CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
95
96         if (is_bad_inode(inode) ||
97             (generation && inode->i_generation != generation)) {
98                 /* we didn't find the right inode.. */
99                 CERROR("bad inode %lu, link: %d ct: %d or version  %u/%u\n",
100                        inode->i_ino, inode->i_nlink,
101                        atomic_read(&inode->i_count), inode->i_generation,
102                        generation);
103                 LBUG();
104                 iput(inode);
105                 RETURN(ERR_PTR(-ESTALE));
106         }
107
108         /* now to find a dentry.
109          * If possible, get a well-connected one
110          */
111         if (mnt)
112                 *mnt = mds->mds_vfsmnt;
113         spin_lock(&dcache_lock);
114         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
115                 result = list_entry(lp,struct dentry, d_alias);
116                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
117                         dget_locked(result);
118                         result->d_vfs_flags |= DCACHE_REFERENCED;
119                         spin_unlock(&dcache_lock);
120                         iput(inode);
121                         if (mnt)
122                                 mntget(*mnt);
123                         return result;
124                 }
125         }
126         spin_unlock(&dcache_lock);
127         result = d_alloc_root(inode);
128         if (result == NULL) {
129                 iput(inode);
130                 return ERR_PTR(-ENOMEM);
131         }
132         if (mnt)
133                 mntget(*mnt);
134         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
135         return result;
136 }
137
138 #define MDS_MAX_CLIENTS 1024
139 #define MDS_MAX_CLIENT_WORDS (MDS_MAX_CLIENTS / sizeof(unsigned long))
140
141 static unsigned long last_rcvd_slots[MDS_MAX_CLIENT_WORDS];
142
143 /* Add client data to the MDS.  The in-memory storage will be a hash at some
144  * point.  We use a bitmap to locate a free space in the last_rcvd file if
145  * cl_off is -1 (i.e. a new client).  Otherwise, we have just read the data
146  * from the last_rcvd file and we know its offset.
147  */
148 int mds_client_add(struct mds_obd *mds, struct mds_client_data *mcd, int cl_off)
149 {
150         struct mds_client_info *mci;
151
152         OBD_ALLOC(mci, sizeof(*mci));
153         if (!mci) {
154                 CERROR("no memory for MDS client info\n");
155                 RETURN(-ENOMEM);
156         }
157         INIT_LIST_HEAD(&mci->mci_open_head);
158
159         CDEBUG(D_INFO, "client at offset %d with UUID '%s' added\n",
160                cl_off, mcd->mcd_uuid);
161
162         if (cl_off == -1) {
163                 unsigned long *word;
164                 int bit;
165
166         repeat:
167                 word = last_rcvd_slots;
168                 while(*word == ~0UL)
169                         ++word;
170                 if (word - last_rcvd_slots >= MDS_MAX_CLIENT_WORDS) {
171                         CERROR("no room in client MDS bitmap - fix code\n");
172                         return -ENOMEM;
173                 }
174                 bit = ffz(*word);
175                 if (test_and_set_bit(bit, word)) {
176                         CERROR("found bit %d set for word %d - fix code\n",
177                                bit, word - last_rcvd_slots);
178                         goto repeat;
179                 }
180                 cl_off = word - last_rcvd_slots + bit;
181         } else {
182                 if (test_and_set_bit(cl_off, last_rcvd_slots)) {
183                         CERROR("bit %d already set in bitmap - bad bad\n",
184                                cl_off);
185                         LBUG();
186                 }
187         }
188
189         mci->mci_mcd = mcd;
190         mci->mci_off = cl_off;
191
192         /* For now we just put the clients in a list, not a hashed list */
193         list_add_tail(&mci->mci_list, &mds->mds_client_info);
194
195         mds->mds_client_count++;
196
197         return 0;
198 }
199
200 void mds_client_del(struct mds_obd *mds, struct mds_client_info *mci)
201 {
202         unsigned long *word;
203         int bit;
204
205         word = last_rcvd_slots + mci->mci_off / sizeof(unsigned long);
206         bit = mci->mci_off % sizeof(unsigned long);
207
208         if (!test_and_clear_bit(bit, word)) {
209                 CERROR("bit %d already clear in word %d - bad bad\n",
210                        bit, word - last_rcvd_slots);
211                 LBUG();
212         }
213
214         --mds->mds_client_count;
215         list_del(&mci->mci_list);
216         OBD_FREE(mci->mci_mcd, sizeof(*mci->mci_mcd));
217         OBD_FREE(mci, sizeof (*mci));
218 }
219
220 int mds_client_free_all(struct mds_obd *mds)
221 {
222         struct list_head *p, *n;
223
224         list_for_each_safe(p, n, &mds->mds_client_info) {
225                 struct mds_client_info *mci;
226
227                 mci = list_entry(p, struct mds_client_info, mci_list);
228                 mds_client_del(mds, mci);
229         }
230
231         return 0;
232 }
233
234 int mds_server_free_data(struct mds_obd *mds)
235 {
236         OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
237         mds->mds_server_data = NULL;
238
239         return 0;
240 }
241
242 int mds_connect(struct ptlrpc_request *req)
243 {
244         struct mds_body *body;
245         struct mds_obd *mds = &req->rq_obd->u.mds;
246         struct mds_client_info *mci;
247         struct mds_client_data *mcd;
248         int rc, size = sizeof(*body);
249         ENTRY;
250
251         CDEBUG(D_INFO, "MDS connect from UUID '%s'\n", ptlrpc_req_to_uuid(req));
252         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
253         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CONNECT_PACK)) {
254                 CERROR("mds: out of memory for message: size=%d\n", size);
255                 req->rq_status = -ENOMEM;
256                 RETURN(0);
257         }
258
259         body = lustre_msg_buf(req->rq_reqmsg, 0);
260         mds_unpack_req_body(req);
261         /* Anything we need to do here with the client's trans no or so? */
262
263         body = lustre_msg_buf(req->rq_repmsg, 0);
264         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
265
266         mci = mds_uuid_to_mci(mds, ptlrpc_req_to_uuid(req));
267         if (!mci) {
268                 /* We don't have any old connection data for this client */
269                 int rc;
270
271                 CDEBUG(D_INFO, "allocating new client data for UUID '%s'",
272                        ptlrpc_req_to_uuid(req));
273
274                 OBD_ALLOC(mcd, sizeof(*mcd));
275                 if (!mcd) {
276                         CERROR("mds: out of memory for client data\n");
277                         req->rq_status = -ENOMEM;
278                         RETURN(0);
279                 }
280                 rc = mds_client_add(mds, mcd, -1);
281                 if (rc) {
282                         req->rq_status = rc;
283                         RETURN(0);
284                 }
285         } else {
286                 /* We have old connection data for this client... */
287                 mcd = mci->mci_mcd;
288                 CDEBUG(D_INFO, "found existing data for UUID '%s' at #%d\n",
289                        mcd->mcd_uuid, mci->mci_off);
290         }
291         /* mcd_last_xid is is stored in little endian on the disk and 
292            mds_pack_rep_body converts it to network order */
293         body->last_xid = le32_to_cpu(mcd->mcd_last_xid);
294         mds_pack_rep_body(req);
295         RETURN(0);
296 }
297
298 int mds_getattr(struct ptlrpc_request *req)
299 {
300         struct dentry *de;
301         struct inode *inode;
302         struct mds_body *body;
303         struct mds_obd *mds = &req->rq_obd->u.mds;
304         int rc, size[2] = {sizeof(*body)}, count = 1;
305         ENTRY;
306
307         body = lustre_msg_buf(req->rq_reqmsg, 0);
308         de = mds_fid2dentry(mds, &body->fid1, NULL);
309         if (IS_ERR(de)) {
310                 req->rq_status = -ENOENT;
311                 RETURN(0);
312         }
313         inode = de->d_inode;
314         if (body->valid & OBD_MD_LINKNAME) {
315                 count = 2;
316                 size[1] = inode->i_size;
317         }
318
319         rc = lustre_pack_msg(count, size, NULL, &req->rq_replen,
320                              &req->rq_repmsg);
321         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
322                 CERROR("mds: out of memory\n");
323                 req->rq_status = -ENOMEM;
324                 GOTO(out, 0);
325         }
326
327         if (body->valid & OBD_MD_LINKNAME) {
328                 char *tmp = lustre_msg_buf(req->rq_repmsg, 1);
329                 mm_segment_t oldfs;
330
331                 oldfs = get_fs();
332                 set_fs(KERNEL_DS);
333                 rc = inode->i_op->readlink(de, tmp, size[1]);
334                 set_fs(oldfs);
335
336                 if (rc < 0) {
337                         req->rq_status = rc;
338                         CERROR("readlink failed: %d\n", req->rq_status);
339                         GOTO(out, 0);
340                 }
341         }
342
343         body = lustre_msg_buf(req->rq_repmsg, 0);
344         body->ino = inode->i_ino;
345         body->generation = inode->i_generation;
346         body->atime = inode->i_atime;
347         body->ctime = inode->i_ctime;
348         body->mtime = inode->i_mtime;
349         body->uid = inode->i_uid;
350         body->gid = inode->i_gid;
351         body->size = inode->i_size;
352         body->mode = inode->i_mode;
353         body->nlink = inode->i_nlink;
354         body->valid = ~0;
355         mds_fs_get_objid(mds, inode, &body->objid);
356  out:
357         l_dput(de);
358         RETURN(0);
359 }
360
361 int mds_open(struct ptlrpc_request *req)
362 {
363         struct mds_obd *mds = &req->rq_obd->u.mds;
364         struct dentry *de;
365         struct mds_body *body;
366         struct file *file;
367         struct vfsmount *mnt;
368         struct mds_client_info *mci;
369         __u32 flags;
370         struct list_head *tmp;
371         struct mds_file_data *mfd;
372         int rc, size = sizeof(*body);
373         ENTRY;
374
375
376         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
377         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
378                 CERROR("mds: out of memory\n");
379                 req->rq_status = -ENOMEM;
380                 RETURN(0);
381         }
382
383
384         mci = mds_uuid_to_mci(mds, ptlrpc_req_to_uuid(req));
385         if (!mci) { 
386                 CERROR("mds: no mci!\n");
387                 req->rq_status = -ENOTCONN;
388                 RETURN(0);
389         }
390
391         body = lustre_msg_buf(req->rq_reqmsg, 0);
392
393         /* was this animal open already? */
394         list_for_each(tmp, &mci->mci_open_head) { 
395                 struct mds_file_data *fd;
396                 fd = list_entry(tmp, struct mds_file_data, mfd_list);
397                 if (body->objid == fd->mfd_clientfd && 
398                     body->fid1.id == fd->mfd_file->f_dentry->d_inode->i_ino) { 
399                         CERROR("Re opening %Ld\n", body->fid1.id);
400                         RETURN(0);
401                 }
402         }
403
404         OBD_ALLOC(mfd, sizeof(*mfd));
405         if (!mfd) { 
406                 CERROR("mds: out of memory\n");
407                 req->rq_status = -ENOMEM;
408                 RETURN(0);
409         }
410
411         de = mds_fid2dentry(mds, &body->fid1, &mnt);
412         if (IS_ERR(de)) {
413                 req->rq_status = -ENOENT;
414                 RETURN(0);
415         }
416
417         flags = body->flags;
418         file = dentry_open(de, mnt, flags);
419         if (!file || IS_ERR(file)) {
420                 req->rq_status = -EINVAL;
421                 OBD_FREE(mfd, sizeof(*mfd));
422                 RETURN(0);
423         }
424
425         file->private_data = mfd;
426         mfd->mfd_file = file;
427         mfd->mfd_clientfd = body->objid;
428         list_add(&mfd->mfd_list, &mci->mci_open_head); 
429
430         body = lustre_msg_buf(req->rq_repmsg, 0);
431         body->objid = (__u64) (unsigned long)file;
432         RETURN(0);
433 }
434
435 int mds_close(struct ptlrpc_request *req)
436 {
437         struct dentry *de;
438         struct mds_body *body;
439         struct file *file;
440         struct vfsmount *mnt;
441         struct mds_file_data *mfd;
442         int rc;
443         ENTRY;
444
445         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
446         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
447                 CERROR("mds: out of memory\n");
448                 req->rq_status = -ENOMEM;
449                 RETURN(0);
450         }
451
452         body = lustre_msg_buf(req->rq_reqmsg, 0);
453         de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
454         if (IS_ERR(de)) {
455                 req->rq_status = -ENOENT;
456                 RETURN(0);
457         }
458
459         file = (struct file *)(unsigned long)body->objid;
460         if (!file->f_dentry) 
461                 LBUG();
462         mfd = (struct mds_file_data *)file->private_data;
463         list_del(&mfd->mfd_list);
464         OBD_FREE(mfd, sizeof(*mfd));
465
466         req->rq_status = filp_close(file, 0);
467         l_dput(de);
468         mntput(mnt);
469
470         RETURN(0);
471 }
472
473 int mds_readpage(struct ptlrpc_request *req)
474 {
475         struct vfsmount *mnt;
476         struct dentry *de;
477         struct file *file;
478         struct mds_body *body;
479         int rc, size = sizeof(*body);
480         ENTRY;
481
482         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
483         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
484                 CERROR("mds: out of memory\n");
485                 req->rq_status = -ENOMEM;
486                 RETURN(0);
487         }
488
489         body = lustre_msg_buf(req->rq_reqmsg, 0);
490         de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
491         if (IS_ERR(de)) {
492                 req->rq_status = PTR_ERR(de);
493                 RETURN(0);
494         }
495
496         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
497
498         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
499         /* note: in case of an error, dentry_open puts dentry */
500         if (IS_ERR(file)) {
501                 req->rq_status = PTR_ERR(file);
502                 RETURN(0);
503         }
504
505         /* to make this asynchronous make sure that the handling function
506            doesn't send a reply when this function completes. Instead a
507            callback function would send the reply */
508         rc = mds_sendpage(req, file, body->size);
509
510         filp_close(file, 0);
511         req->rq_status = rc;
512         RETURN(0);
513 }
514
515 int mds_reint(struct ptlrpc_request *req)
516 {
517         int rc;
518         struct mds_update_record rec;
519
520         rc = mds_update_unpack(req, &rec);
521         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
522                 CERROR("invalid record\n");
523                 req->rq_status = -EINVAL;
524                 RETURN(0);
525         }
526         /* rc will be used to interrupt a for loop over multiple records */
527         rc = mds_reint_rec(&rec, req);
528         return 0;
529 }
530
531 int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
532                struct ptlrpc_request *req)
533 {
534         struct mds_obd *mds = &req->rq_obd->u.mds;
535         int rc;
536         ENTRY;
537
538         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
539         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
540                 CERROR("lustre_mds: Invalid request\n");
541                 GOTO(out, rc);
542         }
543
544         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
545                 CERROR("lustre_mds: wrong packet type sent %d\n",
546                        req->rq_reqmsg->type);
547                 GOTO(out, rc = -EINVAL);
548         }
549
550         switch (req->rq_reqmsg->opc) {
551         case MDS_CONNECT:
552                 CDEBUG(D_INODE, "getattr\n");
553                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
554                 rc = mds_connect(req);
555                 break;
556
557         case MDS_GETATTR:
558                 CDEBUG(D_INODE, "getattr\n");
559                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
560                 rc = mds_getattr(req);
561                 break;
562
563         case MDS_READPAGE:
564                 CDEBUG(D_INODE, "readpage\n");
565                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
566                 rc = mds_readpage(req);
567
568                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
569                         return 0;
570                 break;
571
572         case MDS_REINT:
573                 CDEBUG(D_INODE, "reint\n");
574                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
575                 rc = mds_reint(req);
576                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET_REP, 0);
577                 break;
578
579         case MDS_OPEN:
580                 CDEBUG(D_INODE, "open\n");
581                 OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
582                 rc = mds_open(req);
583                 break;
584
585         case MDS_CLOSE:
586                 CDEBUG(D_INODE, "close\n");
587                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
588                 rc = mds_close(req);
589                 break;
590
591         default:
592                 rc = ptlrpc_error(svc, req);
593                 RETURN(rc);
594         }
595
596         EXIT;
597 out:
598         /* Still not 100% sure whether we should reply with the server
599          * last_rcvd or that of this client.  I'm not sure it even makes
600          * a difference on a per-client basis, because last_rcvd is global
601          * and we are not supposed to allow transactions while in recovery.
602          */
603         req->rq_repmsg->last_rcvd = HTON__u64(mds->mds_last_rcvd);
604         req->rq_repmsg->last_committed = HTON__u64(mds->mds_last_committed);
605         CDEBUG(D_INFO, "last_rcvd %Lu, last_committed %Lu, xid %d\n",
606                (unsigned long long)mds->mds_last_rcvd,
607                (unsigned long long)mds->mds_last_committed, 
608                cpu_to_le32(req->rq_reqmsg->xid));
609         if (rc) {
610                 ptlrpc_error(svc, req);
611         } else {
612                 CDEBUG(D_NET, "sending reply\n");
613                 ptlrpc_reply(svc, req);
614         }
615
616         return 0;
617 }
618
619 /* This will be a hash table at some point. */
620 int mds_init_client_data(struct mds_obd *mds)
621 {
622         INIT_LIST_HEAD(&mds->mds_client_info);
623         return 0;
624 }
625
626 #define LAST_RCVD "last_rcvd"
627
628 int mds_read_last_rcvd(struct mds_obd *mds, struct file *f)
629 {
630         struct mds_server_data *msd;
631         struct mds_client_data *mcd = NULL;
632         loff_t fsize = f->f_dentry->d_inode->i_size;
633         loff_t off = 0;
634         int cl_off;
635         __u64 last_rcvd = 0;
636         __u64 last_mount;
637         int rc = 0;
638
639         OBD_ALLOC(msd, sizeof(*msd));
640         if (!msd)
641                 RETURN(-ENOMEM);
642         rc = lustre_fread(f, (char *)msd, sizeof(*msd), &off);
643
644         mds->mds_server_data = msd;
645         if (rc == 0) {
646                 CERROR("empty MDS %s, new MDS?\n", LAST_RCVD);
647                 RETURN(0);
648         } else if (rc != sizeof(*msd)) {
649                 CERROR("error reading MDS %s: rc = %d\n", LAST_RCVD, rc);
650                 if (rc > 0) {
651                         rc = -EIO;
652                 }
653                 GOTO(err_msd, rc);
654         }
655
656         /*
657          * When we do a clean MDS shutdown, we save the last_rcvd into
658          * the header.  If we find clients with higher last_rcvd values
659          * then those clients may need recovery done.
660          */
661         last_rcvd = le64_to_cpu(msd->msd_last_rcvd);
662         mds->mds_last_rcvd = last_rcvd;
663         CDEBUG(D_INODE, "got %Lu for server last_rcvd value\n",
664                (unsigned long long)last_rcvd);
665
666         last_mount = le64_to_cpu(msd->msd_mount_count);
667         mds->mds_mount_count = last_mount;
668         CDEBUG(D_INODE, "got %Lu for server last_mount value\n",
669                (unsigned long long)last_mount);
670
671         for (off = MDS_LR_CLIENT, cl_off = 0, rc = sizeof(*mcd);
672              off <= fsize - sizeof(*mcd) && rc == sizeof(*mcd);
673              off = MDS_LR_CLIENT + ++cl_off * MDS_LR_SIZE) {
674                 if (!mcd)
675                         OBD_ALLOC(mcd, sizeof(*mcd));
676                 if (!mcd)
677                         GOTO(err_msd, rc = -ENOMEM);
678
679                 rc = lustre_fread(f, (char *)mcd, sizeof(*mcd), &off);
680                 if (rc != sizeof(*mcd)) {
681                         CERROR("error reading MDS %s offset %d: rc = %d\n",
682                                LAST_RCVD, cl_off, rc);
683                         if (rc > 0)
684                                 rc = -EIO;
685                         break;
686                 }
687
688                 last_rcvd = le64_to_cpu(mcd->mcd_last_rcvd);
689                 last_mount = le64_to_cpu(mcd->mcd_mount_count);
690
691                 if (last_rcvd &&
692                     last_mount - mcd->mcd_mount_count < MDS_MOUNT_RECOV) {
693                         rc = mds_client_add(mds, mcd, cl_off);
694                         if (rc) {
695                                 rc = 0;
696                                 break;
697                         }
698                         mcd = NULL;
699                 } else {
700                         CDEBUG(D_INFO,
701                                "client at offset %d with UUID '%s' ignored\n",
702                                cl_off, mcd->mcd_uuid);
703                 }
704
705                 if (last_rcvd > mds->mds_last_rcvd) {
706                         CDEBUG(D_OTHER,
707                                "client at offset %d has last_rcvd = %Lu\n",
708                                cl_off, (unsigned long long)last_rcvd);
709                         mds->mds_last_rcvd = last_rcvd;
710                 }
711         }
712         CDEBUG(D_INODE, "got %Lu for highest last_rcvd value, %d clients\n",
713                (unsigned long long)mds->mds_last_rcvd, mds->mds_client_count);
714
715         /* After recovery, there can be no local uncommitted transactions */
716         mds->mds_last_committed = mds->mds_last_rcvd;
717
718         return 0;
719
720 err_msd:
721         mds_server_free_data(mds);
722         return rc;
723 }
724
725 static int mds_prep(struct obd_device *obddev)
726 {
727         struct obd_run_ctxt saved;
728         struct mds_obd *mds = &obddev->u.mds;
729         struct super_operations *s_ops;
730         struct dentry *dentry;
731         struct file *f;
732         int rc;
733
734         push_ctxt(&saved, &mds->mds_ctxt);
735         dentry = simple_mkdir(current->fs->pwd, "ROOT", 0700);
736         if (IS_ERR(dentry)) {
737                 rc = PTR_ERR(dentry);
738                 CERROR("cannot create ROOT directory: rc = %d\n", rc);
739                 GOTO(err_pop, rc);
740         }
741         /* XXX probably want to hold on to this later... */
742         dput(dentry);
743         f = filp_open("ROOT", O_RDONLY, 0);
744         if (IS_ERR(f)) {
745                 rc = PTR_ERR(f);
746                 CERROR("cannot open ROOT: rc = %d\n", rc);
747                 LBUG();
748                 GOTO(err_pop, rc);
749         }
750
751         mds->mds_rootfid.id = f->f_dentry->d_inode->i_ino;
752         mds->mds_rootfid.generation = f->f_dentry->d_inode->i_generation;
753         mds->mds_rootfid.f_type = S_IFDIR;
754
755         rc = filp_close(f, 0);
756         if (rc) {
757                 CERROR("cannot close ROOT: rc = %d\n", rc);
758                 LBUG();
759         }
760
761         dentry = simple_mkdir(current->fs->pwd, "FH", 0700);
762         if (IS_ERR(dentry)) {
763                 rc = PTR_ERR(dentry);
764                 CERROR("cannot create FH directory: rc = %d\n", rc);
765                 GOTO(err_pop, rc);
766         }
767         /* XXX probably want to hold on to this later... */
768         dput(dentry);
769
770         rc = mds_init_client_data(mds);
771         if (rc)
772                 GOTO(err_pop, rc);
773
774         f = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
775         if (IS_ERR(f)) {
776                 rc = PTR_ERR(f);
777                 CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc);
778                 GOTO(err_pop, rc = PTR_ERR(f));
779         }
780         if (!S_ISREG(f->f_dentry->d_inode->i_mode)) {
781                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
782                        f->f_dentry->d_inode->i_mode);
783                 GOTO(err_pop, rc = -ENOENT);
784         }
785
786         rc = mds_fs_journal_data(mds, f);
787         if (rc) {
788                 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
789                 GOTO(err_filp, rc);
790         }
791
792         rc = mds_read_last_rcvd(mds, f);
793         if (rc) {
794                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
795                 GOTO(err_client, rc);
796         }
797         mds->mds_rcvd_filp = f;
798         pop_ctxt(&saved);
799
800         /*
801          * Replace the client filesystem delete_inode method with our own,
802          * so that we can clear the object ID before the inode is deleted.
803          * The fs_delete_inode method will call cl_delete_inode for us.
804          *
805          * We need to do this for the MDS superblock only, hence we install
806          * a modified copy of the original superblock method table.
807          *
808          * We still assume that there is only a single MDS client filesystem
809          * type, as we don't have access to the mds struct in delete_inode
810          * and store the client delete_inode method in a global table.  This
811          * will only become a problem when multiple MDSs are running on a
812          * single host with different client filesystems.
813          */
814         OBD_ALLOC(s_ops, sizeof(*s_ops));
815         if (!s_ops)
816                 GOTO(err_filp, rc = -ENOMEM);
817
818         memcpy(s_ops, mds->mds_sb->s_op, sizeof(*s_ops));
819         mds->mds_fsops->cl_delete_inode = s_ops->delete_inode;
820         s_ops->delete_inode = mds->mds_fsops->fs_delete_inode;
821         mds->mds_sb->s_op = s_ops;
822
823         mds->mds_service = ptlrpc_init_svc(128 * 1024,
824                                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
825                                            "self", mds_handle);
826
827         if (!mds->mds_service) {
828                 CERROR("failed to start service\n");
829                 GOTO(err_filp, rc = -EINVAL);
830         }
831
832         rc = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
833         if (rc) {
834                 CERROR("cannot start thread: rc = %d\n", rc);
835                 GOTO(err_svc, rc);
836         }
837
838         RETURN(0);
839
840 err_svc:
841         rpc_unregister_service(mds->mds_service);
842         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
843 err_client:
844         mds_client_free_all(mds);
845 err_filp:
846         if (filp_close(f, 0))
847                 CERROR("can't close %s after error\n", LAST_RCVD);
848 err_pop:
849         pop_ctxt(&saved);
850
851         return rc;
852 }
853
854 /* Update the server data on disk.  This stores the new mount_count and
855  * also the last_rcvd value to disk.  If we don't have a clean shutdown,
856  * then the server last_rcvd value may be less than that of the clients.
857  * This will alert us that we may need to do client recovery.
858  */
859 int mds_update_server_data(struct mds_obd *mds)
860 {
861         struct obd_run_ctxt saved;
862         struct mds_server_data *msd = mds->mds_server_data;
863         struct file *filp = mds->mds_rcvd_filp;
864         loff_t off = 0;
865         int rc;
866
867         msd->msd_last_rcvd = cpu_to_le64(mds->mds_last_rcvd);
868         msd->msd_mount_count = cpu_to_le64(mds->mds_mount_count);
869
870         CDEBUG(D_SUPER, "MDS mount_count is %Lu, last_rcvd is %Lu\n",
871                (unsigned long long)mds->mds_mount_count,
872                (unsigned long long)mds->mds_last_rcvd);
873         push_ctxt(&saved, &mds->mds_ctxt);
874         rc = lustre_fwrite(filp, (char *)msd, sizeof(*msd), &off);
875         if (rc != sizeof(*msd)) {
876                 CERROR("error writing MDS server data: rc = %d\n", rc);
877                 if (rc > 0)
878                         RETURN(-EIO);
879                 RETURN(rc);
880         }
881         rc = fsync_dev(filp->f_dentry->d_inode->i_rdev);
882         pop_ctxt(&saved);
883         if (rc)
884                 CERROR("error flushing MDS server data: rc = %d\n", rc);
885
886         return 0;
887 }
888
889 /* Do recovery actions for the MDS */
890 static int mds_recover(struct obd_device *obddev)
891 {
892         struct mds_obd *mds = &obddev->u.mds;
893         int rc;
894
895         /* This happens at the end when recovery is complete */
896         ++mds->mds_mount_count;
897         rc = mds_update_server_data(mds);
898
899         return rc;
900 }
901
902 static int mds_cleanup(struct obd_device *obddev);
903
904 /* mount the file system (secretly) */
905 static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
906 {
907         struct obd_ioctl_data* data = buf;
908         struct mds_obd *mds = &obddev->u.mds;
909         struct vfsmount *mnt;
910         int rc = 0;
911         ENTRY;
912
913 #ifdef CONFIG_DEV_RDONLY
914         dev_clear_rdonly(2);
915 #endif
916         if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
917                 RETURN(-EINVAL);
918
919         mds->mds_fstype = strdup(data->ioc_inlbuf2);
920
921         if (!strcmp(mds->mds_fstype, "extN"))
922                 mds->mds_fsops = &mds_extN_fs_ops;
923         else if (!strcmp(mds->mds_fstype, "ext3"))
924                 mds->mds_fsops = &mds_ext3_fs_ops;
925         else if (!strcmp(mds->mds_fstype, "ext2"))
926                 mds->mds_fsops = &mds_ext2_fs_ops;
927         else {
928                 CERROR("unsupported MDS filesystem type %s\n", mds->mds_fstype);
929                 GOTO(err_kfree, rc = -EPERM);
930         }
931
932         MOD_INC_USE_COUNT;
933         mnt = do_kern_mount(mds->mds_fstype, 0, data->ioc_inlbuf1, NULL);
934         if (IS_ERR(mnt)) {
935                 rc = PTR_ERR(mnt);
936                 CERROR("do_kern_mount failed: rc = %d\n", rc);
937                 GOTO(err_dec, rc);
938         }
939
940         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
941         if (!mds->mds_sb)
942                 GOTO(err_put, rc = -ENODEV);
943
944         mds->mds_vfsmnt = mnt;
945         mds->mds_ctxt.pwdmnt = mnt;
946         mds->mds_ctxt.pwd = mnt->mnt_root;
947         mds->mds_ctxt.fs = KERNEL_DS;
948
949         rc = mds_prep(obddev);
950         if (rc)
951                 GOTO(err_put, rc);
952
953         rc = mds_recover(obddev);
954         if (rc) {
955                 mds_cleanup(obddev);
956                 RETURN(rc);
957         }
958
959         RETURN(0);
960
961 err_put:
962         unlock_kernel();
963         mntput(mds->mds_vfsmnt);
964         mds->mds_sb = 0;
965         lock_kernel();
966 err_dec:
967         MOD_DEC_USE_COUNT;
968 err_kfree:
969         kfree(mds->mds_fstype);
970         return rc;
971 }
972
973 static int mds_cleanup(struct obd_device * obddev)
974 {
975         struct super_operations *s_ops = NULL;
976         struct super_block *sb;
977         struct mds_obd *mds = &obddev->u.mds;
978
979         ENTRY;
980
981         if ( !list_empty(&obddev->obd_gen_clients) ) {
982                 CERROR("still has clients!\n");
983                 RETURN(-EBUSY);
984         }
985
986         ptlrpc_stop_all_threads(mds->mds_service);
987         rpc_unregister_service(mds->mds_service);
988         if (!list_empty(&mds->mds_service->srv_reqs)) {
989                 // XXX reply with errors and clean up
990                 CERROR("Request list not empty!\n");
991         }
992         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
993
994         sb = mds->mds_sb;
995         if (!mds->mds_sb)
996                 RETURN(0);
997
998         mds_client_free_all(mds);
999         mds_update_server_data(mds);
1000         mds_server_free_data(mds);
1001
1002         if (mds->mds_rcvd_filp) {
1003                 int rc = filp_close(mds->mds_rcvd_filp, 0);
1004                 mds->mds_rcvd_filp = NULL;
1005
1006                 if (rc)
1007                         CERROR("last_rcvd file won't close, rc=%d\n", rc);
1008         }
1009         s_ops = sb->s_op;
1010
1011         unlock_kernel();
1012         mntput(mds->mds_vfsmnt);
1013         mds->mds_sb = 0;
1014         kfree(mds->mds_fstype);
1015         lock_kernel();
1016 #ifdef CONFIG_DEV_RDONLY
1017         dev_clear_rdonly(2);
1018 #endif
1019         OBD_FREE(s_ops, sizeof(*s_ops));
1020
1021         MOD_DEC_USE_COUNT;
1022         RETURN(0);
1023 }
1024
1025 /* use obd ops to offer management infrastructure */
1026 static struct obd_ops mds_obd_ops = {
1027         o_setup:       mds_setup,
1028         o_cleanup:     mds_cleanup,
1029 };
1030
1031 static int __init mds_init(void)
1032 {
1033         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
1034         return 0;
1035 }
1036
1037 static void __exit mds_exit(void)
1038 {
1039         obd_unregister_type(LUSTRE_MDS_NAME);
1040 }
1041
1042 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
1043 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
1044 MODULE_LICENSE("GPL");
1045
1046 module_init(mds_init);
1047 module_exit(mds_exit);