Whamcloud - gitweb
Add mount_count file to hold the current MDS generation number. Note that
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/handler.c
5  *
6  *  Lustre Metadata Server (mds) request handler
7  *
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com>
14  *
15  *  This server is single threaded at present (but can easily be multi threaded)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20
21 #include <linux/version.h>
22 #include <linux/module.h>
23 #include <linux/fs.h>
24 #include <linux/stat.h>
25 #include <linux/locks.h>
26 #include <linux/quotaops.h>
27 #include <asm/unistd.h>
28 #include <asm/uaccess.h>
29
30 #define DEBUG_SUBSYSTEM S_MDS
31
32 #include <linux/lustre_mds.h>
33 #include <linux/lustre_lib.h>
34 #include <linux/lustre_net.h>
35
36 int mds_sendpage(struct ptlrpc_request *req, struct file *file,
37                  __u64 offset, struct niobuf *dst)
38 {
39         int rc = 0;
40         mm_segment_t oldfs = get_fs();
41         struct ptlrpc_bulk_desc *bulk;
42         char *buf;
43
44         bulk = ptlrpc_prep_bulk(req->rq_connection);
45         if (bulk == NULL) {
46                 rc = -ENOMEM;
47                 GOTO(out, rc);
48         }
49
50         bulk->b_xid = req->rq_reqmsg->xid;
51
52         OBD_ALLOC(buf, PAGE_SIZE);
53         if (!buf) {
54                 rc = -ENOMEM;
55                 GOTO(cleanup_bulk, rc);
56         }
57
58         set_fs(KERNEL_DS);
59         rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
60                              &offset);
61         set_fs(oldfs);
62
63         if (rc != PAGE_SIZE) {
64                 rc = -EIO;
65                 GOTO(cleanup_buf, rc);
66         }
67
68         bulk->b_buf = buf;
69         bulk->b_buflen = PAGE_SIZE;
70
71         rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL);
72         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
73                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
74                        OBD_FAIL_MDS_SENDPAGE, rc);
75                 PtlMDUnlink(bulk->b_md_h);
76                 GOTO(cleanup_buf, rc);
77         }
78         wait_event_interruptible(bulk->b_waitq,
79                                  ptlrpc_check_bulk_sent(bulk));
80
81         if (bulk->b_flags == PTL_RPC_INTR) {
82                 rc = -EINTR;
83                 GOTO(cleanup_buf, rc);
84         }
85
86         EXIT;
87  cleanup_buf:
88         OBD_FREE(buf, PAGE_SIZE);
89  cleanup_bulk:
90         OBD_FREE(bulk, sizeof(*bulk));
91  out:
92         return rc;
93 }
94
95 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
96                               struct vfsmount **mnt)
97 {
98         /* stolen from NFS */
99         struct super_block *sb = mds->mds_sb;
100         unsigned long ino = fid->id;
101         __u32 generation = fid->generation;
102         struct inode *inode;
103         struct list_head *lp;
104         struct dentry *result;
105
106         if (ino == 0)
107                 return ERR_PTR(-ESTALE);
108
109         inode = iget(sb, ino);
110         if (inode == NULL)
111                 return ERR_PTR(-ENOMEM);
112
113         CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
114
115         if (is_bad_inode(inode) ||
116             (generation && inode->i_generation != generation)) {
117                 /* we didn't find the right inode.. */
118                 CERROR("bad inode %lu, link: %d ct: %d or version  %u/%u\n",
119                         inode->i_ino,
120                         inode->i_nlink, atomic_read(&inode->i_count),
121                         inode->i_generation,
122                         generation);
123                 LBUG();
124                 iput(inode);
125                 return ERR_PTR(-ESTALE);
126         }
127
128         /* now to find a dentry.
129          * If possible, get a well-connected one
130          */
131         if (mnt)
132                 *mnt = mds->mds_vfsmnt;
133         spin_lock(&dcache_lock);
134         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
135                 result = list_entry(lp,struct dentry, d_alias);
136                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
137                         dget_locked(result);
138                         result->d_vfs_flags |= DCACHE_REFERENCED;
139                         spin_unlock(&dcache_lock);
140                         iput(inode);
141                         if (mnt)
142                                 mntget(*mnt);
143                         return result;
144                 }
145         }
146         spin_unlock(&dcache_lock);
147         result = d_alloc_root(inode);
148         if (result == NULL) {
149                 iput(inode);
150                 return ERR_PTR(-ENOMEM);
151         }
152         if (mnt)
153                 mntget(*mnt);
154         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
155         return result;
156 }
157
158 int mds_getattr(struct ptlrpc_request *req)
159 {
160         struct dentry *de;
161         struct inode *inode;
162         struct mds_body *body;
163         struct mds_obd *mds = &req->rq_obd->u.mds;
164         int rc, size = sizeof(*body);
165         ENTRY;
166
167         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
168         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
169                 CERROR("mds: out of memory\n");
170                 req->rq_status = -ENOMEM;
171                 RETURN(0);
172         }
173
174         body = lustre_msg_buf(req->rq_reqmsg, 0);
175         de = mds_fid2dentry(mds, &body->fid1, NULL);
176         if (IS_ERR(de)) {
177                 req->rq_status = -ENOENT;
178                 RETURN(0);
179         }
180
181         body = lustre_msg_buf(req->rq_repmsg, 0);
182         inode = de->d_inode;
183         body->ino = inode->i_ino;
184         body->generation = inode->i_generation;
185         body->atime = inode->i_atime;
186         body->ctime = inode->i_ctime;
187         body->mtime = inode->i_mtime;
188         body->uid = inode->i_uid;
189         body->gid = inode->i_gid;
190         body->size = inode->i_size;
191         body->mode = inode->i_mode;
192         body->nlink = inode->i_nlink;
193         body->valid = ~0;
194         mds_fs_get_objid(mds, inode, &body->objid);
195         l_dput(de);
196         RETURN(0);
197 }
198
199 int mds_open(struct ptlrpc_request *req)
200 {
201         struct dentry *de;
202         struct mds_body *body;
203         struct file *file;
204         struct vfsmount *mnt;
205         __u32 flags;
206         int rc, size = sizeof(*body);
207         ENTRY;
208
209         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
210         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
211                 CERROR("mds: out of memory\n");
212                 req->rq_status = -ENOMEM;
213                 RETURN(0);
214         }
215
216         body = lustre_msg_buf(req->rq_reqmsg, 0);
217         de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
218         if (IS_ERR(de)) {
219                 req->rq_status = -ENOENT;
220                 RETURN(0);
221         }
222         flags = body->flags;
223         file = dentry_open(de, mnt, flags);
224         if (!file || IS_ERR(file)) {
225                 req->rq_status = -EINVAL;
226                 RETURN(0);
227         }
228
229         body = lustre_msg_buf(req->rq_repmsg, 0);
230         body->objid = (__u64) (unsigned long)file;
231         RETURN(0);
232 }
233
234 int mds_close(struct ptlrpc_request *req)
235 {
236         struct dentry *de;
237         struct mds_body *body;
238         struct file *file;
239         struct vfsmount *mnt;
240         int rc;
241         ENTRY;
242
243         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
244         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
245                 CERROR("mds: out of memory\n");
246                 req->rq_status = -ENOMEM;
247                 RETURN(0);
248         }
249
250         body = lustre_msg_buf(req->rq_reqmsg, 0);
251         de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
252         if (IS_ERR(de)) {
253                 req->rq_status = -ENOENT;
254                 RETURN(0);
255         }
256
257         file = (struct file *)(unsigned long)body->objid;
258         req->rq_status = filp_close(file, 0);
259         l_dput(de);
260         mntput(mnt);
261
262         RETURN(0);
263 }
264
265 int mds_readpage(struct ptlrpc_request *req)
266 {
267         struct vfsmount *mnt;
268         struct dentry *de;
269         struct file *file;
270         struct niobuf *niobuf;
271         struct mds_body *body;
272         int rc, size = sizeof(*body);
273         ENTRY;
274
275         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
276         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
277                 CERROR("mds: out of memory\n");
278                 req->rq_status = -ENOMEM;
279                 RETURN(0);
280         }
281
282         body = lustre_msg_buf(req->rq_reqmsg, 0);
283         de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
284         if (IS_ERR(de)) {
285                 req->rq_status = PTR_ERR(de);
286                 RETURN(0);
287         }
288
289         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
290
291         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
292         /* note: in case of an error, dentry_open puts dentry */
293         if (IS_ERR(file)) {
294                 req->rq_status = PTR_ERR(file);
295                 RETURN(0);
296         }
297
298         niobuf = lustre_msg_buf(req->rq_reqmsg, 1);
299         if (!niobuf) {
300                 req->rq_status = -EINVAL;
301                 LBUG();
302                 RETURN(0);
303         }
304
305         /* to make this asynchronous make sure that the handling function
306            doesn't send a reply when this function completes. Instead a
307            callback function would send the reply */
308         rc = mds_sendpage(req, file, body->size, niobuf);
309
310         filp_close(file, 0);
311         req->rq_status = rc;
312         RETURN(0);
313 }
314
315 int mds_reint(struct ptlrpc_request *req)
316 {
317         int rc;
318         struct mds_update_record rec;
319
320         rc = mds_update_unpack(req, &rec);
321         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
322                 CERROR("invalid record\n");
323                 req->rq_status = -EINVAL;
324                 RETURN(0);
325         }
326         /* rc will be used to interrupt a for loop over multiple records */
327         rc = mds_reint_rec(&rec, req);
328         return 0;
329 }
330
331 int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
332                struct ptlrpc_request *req)
333 {
334         int rc;
335         ENTRY;
336
337         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
338         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
339                 CERROR("lustre_mds: Invalid request\n");
340                 GOTO(out, rc);
341         }
342
343         if (req->rq_reqmsg->type != PTL_RPC_REQUEST) {
344                 CERROR("lustre_mds: wrong packet type sent %d\n",
345                        req->rq_reqmsg->type);
346                 GOTO(out, rc = -EINVAL);
347         }
348
349         switch (req->rq_reqmsg->opc) {
350         case MDS_GETATTR:
351                 CDEBUG(D_INODE, "getattr\n");
352                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
353                 rc = mds_getattr(req);
354                 break;
355
356         case MDS_READPAGE:
357                 CDEBUG(D_INODE, "readpage\n");
358                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
359                 rc = mds_readpage(req);
360
361                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
362                         return 0;
363                 break;
364
365         case MDS_REINT:
366                 CDEBUG(D_INODE, "reint\n");
367                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
368                 rc = mds_reint(req);
369                 break;
370
371         case MDS_OPEN:
372                 CDEBUG(D_INODE, "open\n");
373                 OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
374                 rc = mds_open(req);
375                 break;
376
377         case MDS_CLOSE:
378                 CDEBUG(D_INODE, "close\n");
379                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
380                 rc = mds_close(req);
381                 break;
382
383         default:
384                 rc = ptlrpc_error(svc, req);
385                 RETURN(rc);
386         }
387
388         EXIT;
389 out:
390         if (rc) {
391                 ptlrpc_error(svc, req);
392         } else {
393                 CDEBUG(D_NET, "sending reply\n");
394                 ptlrpc_reply(svc, req);
395         }
396
397         return 0;
398 }
399
400 static int mds_prep(struct obd_device *obddev)
401 {
402         struct obd_run_ctxt saved;
403         struct mds_obd *mds = &obddev->u.mds;
404         struct super_operations *s_ops;
405         struct file *f;
406         loff_t off = 0;
407         __u64 mount_count;
408         int rc;
409
410         mds->mds_service = ptlrpc_init_svc(128 * 1024,
411                                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
412                                            "self", mds_handle);
413
414         if (!mds->mds_service) {
415                 CERROR("failed to start service\n");
416                 RETURN(-EINVAL);
417         }
418
419         rc = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
420         if (rc) {
421                 CERROR("cannot start thread\n");
422                 GOTO(err_svc, rc);
423         }
424
425         push_ctxt(&saved, &mds->mds_ctxt);
426         rc = simple_mkdir(current->fs->pwd, "ROOT", 0700);
427         if (rc && rc != -EEXIST) {
428                 CERROR("cannot create ROOT directory\n");
429                 GOTO(err_svc, rc);
430         }
431         rc = simple_mkdir(current->fs->pwd, "FH", 0700);
432         if (rc && rc != -EEXIST) {
433                 CERROR("cannot create FH directory\n");
434                 GOTO(err_svc, rc);
435         }
436
437         f = filp_open("mount_count", O_RDWR | O_CREAT, 0644);
438         if (IS_ERR(f)) {
439                 CERROR("cannot open/create mount_count file, rc = %ld\n",
440                        PTR_ERR(f));
441                 GOTO(err_svc, rc = PTR_ERR(f));
442         }
443         rc = lustre_fread(f, (char *)&mount_count, sizeof(mount_count), &off);
444         if (rc == 0) {
445                 CERROR("empty MDS mount_count, new MDS?\n");
446                 /* XXX maybe this should just be a random number? */
447                 mds->mds_mount_count = 0;
448         } else if (rc != sizeof(mount_count)) {
449                 CERROR("error reading mount_count: rc = %d\n", rc);
450                 /* XXX maybe this should just be a random number? */
451                 mds->mds_mount_count = 0;
452         } else {
453                 mds->mds_mount_count = le64_to_cpu(mount_count);
454         }
455
456         mds->mds_mount_count++;
457         CDEBUG(D_SUPER, "MDS mount_count is %Ld\n", mds->mds_mount_count);
458         off = 0;
459         mount_count = cpu_to_le64(mds->mds_mount_count);
460         rc = lustre_fwrite(f, (char *)&mount_count, sizeof(mount_count), &off);
461         if (rc != sizeof(mount_count))
462                 CERROR("error writing mount_count: rc = %d\n", rc);
463         rc = filp_close(f, 0);
464         if (rc)
465                 CERROR("error closing mount_count: rc = %d\n", rc);
466
467         f = filp_open("last_rcvd", O_RDWR | O_CREAT, 0644);
468         if (IS_ERR(f)) {
469                 CERROR("cannot open/create last_rcvd file\n");
470                 GOTO(err_svc, rc = PTR_ERR(f));
471         }
472         mds->mds_last_rcvd = f;
473         pop_ctxt(&saved);
474
475         /*
476          * Replace the client filesystem delete_inode method with our own,
477          * so that we can clear the object ID before the inode is deleted.
478          * The fs_delete_inode method will call cl_delete_inode for us.
479          *
480          * We need to do this for the MDS superblock only, hence we install
481          * a modified copy of the original superblock method table.
482          *
483          * We still assume that there is only a single MDS client filesystem
484          * type, as we don't have access to the mds struct in * delete_inode.
485          */
486         OBD_ALLOC(s_ops, sizeof(*s_ops));
487         memcpy(s_ops, mds->mds_sb->s_op, sizeof(*s_ops));
488         mds->mds_fsops->cl_delete_inode = s_ops->delete_inode;
489         s_ops->delete_inode = mds->mds_fsops->fs_delete_inode;
490         mds->mds_sb->s_op = s_ops;
491
492         RETURN(0);
493
494 err_svc:
495         rpc_unregister_service(mds->mds_service);
496         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
497
498         return rc;
499 }
500
501 /* mount the file system (secretly) */
502 static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
503 {
504         struct obd_ioctl_data* data = buf;
505         struct mds_obd *mds = &obddev->u.mds;
506         struct vfsmount *mnt;
507         int err = 0;
508         ENTRY;
509
510 #ifdef CONFIG_DEV_RDONLY
511         dev_clear_rdonly(2);
512 #endif
513         mds->mds_fstype = strdup(data->ioc_inlbuf2);
514
515         if (!strcmp(mds->mds_fstype, "ext3"))
516                 mds->mds_fsops = &mds_ext3_fs_ops;
517         else if (!strcmp(mds->mds_fstype, "ext2"))
518                 mds->mds_fsops = &mds_ext2_fs_ops;
519         else {
520                 CERROR("unsupported MDS filesystem type %s\n", mds->mds_fstype);
521                 GOTO(err_kfree, (err = -EPERM));
522         }
523
524         MOD_INC_USE_COUNT;
525         mnt = do_kern_mount(mds->mds_fstype, 0, data->ioc_inlbuf1, NULL);
526         if (IS_ERR(mnt)) {
527                 CERROR("do_kern_mount failed: %d\n", err);
528                 GOTO(err_dec, err = PTR_ERR(mnt));
529         }
530
531         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
532         if (!mds->mds_sb)
533                 GOTO(err_put, (err = -ENODEV));
534
535         mds->mds_vfsmnt = mnt;
536         mds->mds_ctxt.pwdmnt = mnt;
537         mds->mds_ctxt.pwd = mnt->mnt_root;
538         mds->mds_ctxt.fs = KERNEL_DS;
539
540         err = mds_prep(obddev);
541         if (err)
542                 GOTO(err_put, err);
543
544         RETURN(0);
545
546 err_put:
547         unlock_kernel();
548         mntput(mds->mds_vfsmnt);
549         mds->mds_sb = 0;
550         lock_kernel();
551 err_dec:
552         MOD_DEC_USE_COUNT;
553 err_kfree:
554         kfree(mds->mds_fstype);
555         return err;
556 }
557
558 static int mds_cleanup(struct obd_device * obddev)
559 {
560         struct super_operations *s_ops = NULL;
561         struct super_block *sb;
562         struct mds_obd *mds = &obddev->u.mds;
563
564         ENTRY;
565
566         if ( !list_empty(&obddev->obd_gen_clients) ) {
567                 CERROR("still has clients!\n");
568                 RETURN(-EBUSY);
569         }
570
571         ptlrpc_stop_thread(mds->mds_service);
572         rpc_unregister_service(mds->mds_service);
573         if (!list_empty(&mds->mds_service->srv_reqs)) {
574                 // XXX reply with errors and clean up
575                 CERROR("Request list not empty!\n");
576         }
577         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
578
579         sb = mds->mds_sb;
580         if (!mds->mds_sb)
581                 RETURN(0);
582
583         if (mds->mds_last_rcvd) {
584                 int rc = filp_close(mds->mds_last_rcvd, 0);
585                 mds->mds_last_rcvd = NULL;
586
587                 if (rc)
588                         CERROR("last_rcvd file won't close, rc=%d\n", rc);
589         }
590         s_ops = sb->s_op;
591
592         unlock_kernel();
593         mntput(mds->mds_vfsmnt);
594         mds->mds_sb = 0;
595         kfree(mds->mds_fstype);
596         lock_kernel();
597 #ifdef CONFIG_DEV_RDONLY
598         dev_clear_rdonly(2);
599 #endif
600         OBD_FREE(s_ops, sizeof(*s_ops));
601
602         MOD_DEC_USE_COUNT;
603         RETURN(0);
604 }
605
606 /* use obd ops to offer management infrastructure */
607 static struct obd_ops mds_obd_ops = {
608         o_setup:       mds_setup,
609         o_cleanup:     mds_cleanup,
610 };
611
612 static int __init mds_init(void)
613 {
614         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
615         return 0;
616 }
617
618 static void __exit mds_exit(void)
619 {
620         obd_unregister_type(LUSTRE_MDS_NAME);
621 }
622
623 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
624 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
625 MODULE_LICENSE("GPL");
626
627 module_init(mds_init);
628 module_exit(mds_exit);