Whamcloud - gitweb
e0df185441b121767ad5281688ac76128e0f3bea
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/handler.c
5  *
6  *  Lustre Metadata Server (mds) request handler
7  *
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com>
14  *
15  *  This server is single threaded at present (but can easily be multi threaded)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20
21 #include <linux/version.h>
22 #include <linux/module.h>
23 #include <linux/fs.h>
24 #include <linux/stat.h>
25 #include <linux/locks.h>
26 #include <linux/quotaops.h>
27 #include <asm/unistd.h>
28 #include <asm/uaccess.h>
29
30 #define DEBUG_SUBSYSTEM S_MDS
31
32 #include <linux/lustre_mds.h>
33 #include <linux/lustre_lib.h>
34 #include <linux/lustre_net.h>
35
36 int mds_sendpage(struct ptlrpc_request *req, struct file *file,
37                  __u64 offset, struct niobuf *dst)
38 {
39         int rc = 0;
40         mm_segment_t oldfs = get_fs();
41
42         if (req->rq_peer.peer_nid == 0) {
43                 /* dst->addr is a user address, but in a different task! */
44                 char *buf = (char *)(long)dst->addr;
45
46                 set_fs(KERNEL_DS);
47                 rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
48                                      &offset);
49                 set_fs(oldfs);
50
51                 if (rc != PAGE_SIZE) {
52                         rc = -EIO;
53                         GOTO(out, rc);
54                 }
55                 EXIT;
56         } else {
57                 struct ptlrpc_bulk_desc *bulk;
58                 char *buf;
59
60                 bulk = ptlrpc_prep_bulk(&req->rq_peer);
61                 if (bulk == NULL) {
62                         rc = -ENOMEM;
63                         GOTO(out, rc);
64                 }
65
66                 bulk->b_xid = req->rq_xid;
67
68                 OBD_ALLOC(buf, PAGE_SIZE);
69                 if (!buf) {
70                         rc = -ENOMEM;
71                         GOTO(cleanup_bulk, rc);
72                 }
73
74                 set_fs(KERNEL_DS);
75                 rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
76                                      &offset);
77                 set_fs(oldfs);
78
79                 if (rc != PAGE_SIZE) {
80                         rc = -EIO;
81                         GOTO(cleanup_buf, rc);
82                 }
83
84                 bulk->b_buf = buf;
85                 bulk->b_buflen = PAGE_SIZE;
86
87                 rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL);
88                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
89                         CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
90                                OBD_FAIL_MDS_SENDPAGE, rc);
91                         PtlMDUnlink(bulk->b_md_h);
92                         GOTO(cleanup_buf, rc);
93                 }
94                 wait_event_interruptible(bulk->b_waitq,
95                                          ptlrpc_check_bulk_sent(bulk));
96
97                 if (bulk->b_flags == PTL_RPC_INTR) {
98                         rc = -EINTR;
99                         GOTO(cleanup_buf, rc);
100                 }
101
102                 EXIT;
103         cleanup_buf:
104                 OBD_FREE(buf, PAGE_SIZE);
105         cleanup_bulk:
106                 OBD_FREE(bulk, sizeof(*bulk));
107         }
108 out:
109         return rc;
110 }
111
112 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
113                               struct vfsmount **mnt)
114 {
115         /* stolen from NFS */
116         struct super_block *sb = mds->mds_sb;
117         unsigned long ino = fid->id;
118         __u32 generation = fid->generation;
119         struct inode *inode;
120         struct list_head *lp;
121         struct dentry *result;
122
123         if (ino == 0)
124                 return ERR_PTR(-ESTALE);
125
126         inode = iget(sb, ino);
127         if (inode == NULL)
128                 return ERR_PTR(-ENOMEM);
129
130         CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
131
132         if (is_bad_inode(inode) ||
133             (generation && inode->i_generation != generation)) {
134                 /* we didn't find the right inode.. */
135                 CERROR("bad inode %lu, link: %d ct: %d or version  %u/%u\n",
136                         inode->i_ino,
137                         inode->i_nlink, atomic_read(&inode->i_count),
138                         inode->i_generation,
139                         generation);
140                 LBUG();
141                 iput(inode);
142                 return ERR_PTR(-ESTALE);
143         }
144
145         /* now to find a dentry.
146          * If possible, get a well-connected one
147          */
148         if (mnt)
149                 *mnt = mds->mds_vfsmnt;
150         spin_lock(&dcache_lock);
151         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
152                 result = list_entry(lp,struct dentry, d_alias);
153                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
154                         dget_locked(result);
155                         result->d_vfs_flags |= DCACHE_REFERENCED;
156                         spin_unlock(&dcache_lock);
157                         iput(inode);
158                         if (mnt)
159                                 mntget(*mnt);
160                         return result;
161                 }
162         }
163         spin_unlock(&dcache_lock);
164         result = d_alloc_root(inode);
165         if (result == NULL) {
166                 iput(inode);
167                 return ERR_PTR(-ENOMEM);
168         }
169         if (mnt)
170                 mntget(*mnt);
171         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
172         return result;
173 }
174
175 int mds_getattr(struct ptlrpc_request *req)
176 {
177         struct dentry *de;
178         struct inode *inode;
179         struct mds_rep *rep;
180         struct mds_obd *mds = &req->rq_obd->u.mds;
181         int rc;
182
183         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
184                           &req->rq_replen, &req->rq_repbuf);
185         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
186                 CERROR("mds: out of memory\n");
187                 req->rq_status = -ENOMEM;
188                 RETURN(0);
189         }
190
191         req->rq_rephdr->xid = req->rq_reqhdr->xid;
192         rep = req->rq_rep.mds;
193
194         de = mds_fid2dentry(mds, &req->rq_req.mds->fid1, NULL);
195         if (IS_ERR(de)) {
196                 req->rq_rephdr->status = -ENOENT;
197                 RETURN(0);
198         }
199
200         inode = de->d_inode;
201         rep->ino = inode->i_ino;
202         rep->generation = inode->i_generation;
203         rep->atime = inode->i_atime;
204         rep->ctime = inode->i_ctime;
205         rep->mtime = inode->i_mtime;
206         rep->uid = inode->i_uid;
207         rep->gid = inode->i_gid;
208         rep->size = inode->i_size;
209         rep->mode = inode->i_mode;
210         rep->nlink = inode->i_nlink;
211         rep->valid = ~0;
212         mds_fs_get_objid(mds, inode, &rep->objid);
213         l_dput(de);
214         return 0;
215 }
216
217 int mds_open(struct ptlrpc_request *req)
218 {
219         struct dentry *de;
220         struct mds_rep *rep;
221         struct file *file;
222         struct vfsmount *mnt;
223         __u32 flags;
224         int rc;
225
226         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
227                           &req->rq_replen, &req->rq_repbuf);
228         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
229                 CERROR("mds: out of memory\n");
230                 req->rq_status = -ENOMEM;
231                 RETURN(0);
232         }
233
234         req->rq_rephdr->xid = req->rq_reqhdr->xid;
235         rep = req->rq_rep.mds;
236
237         de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
238         if (IS_ERR(de)) {
239                 req->rq_rephdr->status = -ENOENT;
240                 RETURN(0);
241         }
242         flags = req->rq_req.mds->flags;
243         file = dentry_open(de, mnt, flags);
244         if (!file || IS_ERR(file)) {
245                 req->rq_rephdr->status = -EINVAL;
246                 RETURN(0);
247         }
248
249         rep->objid = (__u64) (unsigned long)file;
250         return 0;
251 }
252
253 int mds_close(struct ptlrpc_request *req)
254 {
255         struct dentry *de;
256         struct mds_rep *rep;
257         struct file *file;
258         struct vfsmount *mnt;
259         int rc;
260
261         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
262                           &req->rq_replen, &req->rq_repbuf);
263         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
264                 CERROR("mds: out of memory\n");
265                 req->rq_status = -ENOMEM;
266                 RETURN(0);
267         }
268
269         req->rq_rephdr->xid = req->rq_reqhdr->xid;
270         rep = req->rq_rep.mds;
271
272         de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
273         if (IS_ERR(de)) {
274                 req->rq_rephdr->status = -ENOENT;
275                 RETURN(0);
276         }
277
278         file = (struct file *)(unsigned long) req->rq_req.mds->objid;
279
280         req->rq_rephdr->status = filp_close(file, 0);
281         l_dput(de);
282         mntput(mnt);
283         return 0;
284 }
285
286 int mds_readpage(struct ptlrpc_request *req)
287 {
288         struct vfsmount *mnt;
289         struct dentry *de;
290         struct file *file;
291         struct niobuf *niobuf;
292         struct mds_rep *rep;
293         int rc;
294
295         ENTRY;
296
297         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
298                           &req->rq_replen, &req->rq_repbuf);
299         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
300                 CERROR("mds: out of memory\n");
301                 req->rq_status = -ENOMEM;
302                 RETURN(0);
303         }
304
305         req->rq_rephdr->xid = req->rq_reqhdr->xid;
306         rep = req->rq_rep.mds;
307
308         de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
309         if (IS_ERR(de)) {
310                 req->rq_rephdr->status = PTR_ERR(de);
311                 RETURN(0);
312         }
313
314         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
315
316         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
317         /* note: in case of an error, dentry_open puts dentry */
318         if (IS_ERR(file)) {
319                 req->rq_rephdr->status = PTR_ERR(file);
320                 RETURN(0);
321         }
322
323         niobuf = mds_req_tgt(req->rq_req.mds);
324
325         /* to make this asynchronous make sure that the handling function
326            doesn't send a reply when this function completes. Instead a
327            callback function would send the reply */
328         rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf);
329
330         filp_close(file, 0);
331         req->rq_rephdr->status = rc;
332         RETURN(0);
333 }
334
335 int mds_reint(struct ptlrpc_request *req)
336 {
337         char *buf;
338         int rc, len;
339         struct mds_update_record rec;
340
341         buf = mds_req_tgt(req->rq_req.mds);
342         len = req->rq_req.mds->tgtlen;
343
344         rc = mds_update_unpack(buf, len, &rec);
345         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
346                 CERROR("invalid record\n");
347                 req->rq_status = -EINVAL;
348                 RETURN(0);
349         }
350         /* rc will be used to interrupt a for loop over multiple records */
351         rc = mds_reint_rec(&rec, req);
352         return 0;
353 }
354
355 int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
356                struct ptlrpc_request *req)
357 {
358         int rc;
359         struct ptlreq_hdr *hdr;
360
361         ENTRY;
362
363         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
364
365         if (NTOH__u32(hdr->type) != PTL_RPC_REQUEST) {
366                 CERROR("lustre_mds: wrong packet type sent %d\n",
367                        NTOH__u32(hdr->type));
368                 rc = -EINVAL;
369                 GOTO(out, rc);
370         }
371
372         rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen,
373                             &req->rq_reqhdr, &req->rq_req);
374         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
375                 CERROR("lustre_mds: Invalid request\n");
376                 GOTO(out, rc);
377         }
378
379         switch (req->rq_reqhdr->opc) {
380
381         case MDS_GETATTR:
382                 CDEBUG(D_INODE, "getattr\n");
383                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
384                 rc = mds_getattr(req);
385                 break;
386
387         case MDS_READPAGE:
388                 CDEBUG(D_INODE, "readpage\n");
389                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
390                 rc = mds_readpage(req);
391
392                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
393                         return 0;
394                 break;
395
396         case MDS_REINT:
397                 CDEBUG(D_INODE, "reint\n");
398                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
399                 rc = mds_reint(req);
400                 break;
401
402         case MDS_OPEN:
403                 CDEBUG(D_INODE, "open\n");
404                 OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
405                 rc = mds_open(req);
406                 break;
407
408         case MDS_CLOSE:
409                 CDEBUG(D_INODE, "close\n");
410                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
411                 rc = mds_close(req);
412                 break;
413
414         default:
415                 rc = ptlrpc_error(dev, svc, req);
416                 RETURN(rc);
417         }
418
419         EXIT;
420 out:
421         if (rc) {
422                 CERROR("no header\n");
423                 LBUG();
424                 return 0;
425         }
426
427         if( req->rq_status) {
428                 ptlrpc_error(dev, svc, req);
429         } else {
430                 CDEBUG(D_NET, "sending reply\n");
431                 ptlrpc_reply(dev, svc, req);
432         }
433
434         return 0;
435 }
436
437 static int mds_prep(struct obd_device *obddev)
438 {
439         struct obd_run_ctxt saved;
440         struct mds_obd *mds = &obddev->u.mds;
441         struct super_operations *s_ops;
442         int err;
443
444         mds->mds_service = ptlrpc_init_svc(128 * 1024,
445                                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
446                                            "self", mds_handle);
447
448         if (!mds->mds_service) {
449                 CERROR("failed to start service\n");
450                 RETURN(-EINVAL);
451         }
452
453         err = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
454         if (err) {
455                 CERROR("cannot start thread\n");
456                 GOTO(err_svc, err);
457         }
458
459         push_ctxt(&saved, &mds->mds_ctxt);
460         err = simple_mkdir(current->fs->pwd, "ROOT", 0700);
461         err = simple_mkdir(current->fs->pwd, "FH", 0700);
462         pop_ctxt(&saved);
463
464         /*
465          * Replace the client filesystem delete_inode method with our own,
466          * so that we can clear the object ID before the inode is deleted.
467          * The fs_delete_inode method will call cl_delete_inode for us.
468          *
469          * We need to do this for the MDS superblock only, hence we install
470          * a modified copy of the original superblock method table.
471          *
472          * We still assume that there is only a single MDS client filesystem
473          * type, as we don't have access to the mds struct in * delete_inode.
474          */
475         OBD_ALLOC(s_ops, sizeof(*s_ops));
476         memcpy(s_ops, mds->mds_sb->s_op, sizeof(*s_ops));
477         mds->mds_fsops->cl_delete_inode = s_ops->delete_inode;
478         s_ops->delete_inode = mds->mds_fsops->fs_delete_inode;
479         mds->mds_sb->s_op = s_ops;
480
481         RETURN(0);
482
483 err_svc:
484         rpc_unregister_service(mds->mds_service);
485         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
486
487         return(err);
488 }
489
490 /* mount the file system (secretly) */
491 static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
492 {
493         struct obd_ioctl_data* data = buf;
494         struct mds_obd *mds = &obddev->u.mds;
495         struct vfsmount *mnt;
496         int err = 0;
497         ENTRY;
498
499 #ifdef CONFIG_DEV_RDONLY
500         dev_clear_rdonly(2);
501 #endif
502         mds->mds_fstype = strdup(data->ioc_inlbuf2);
503
504         if (!strcmp(mds->mds_fstype, "ext3"))
505                 mds->mds_fsops = &mds_ext3_fs_ops;
506         else if (!strcmp(mds->mds_fstype, "ext2"))
507                 mds->mds_fsops = &mds_ext2_fs_ops;
508         else {
509                 CERROR("unsupported MDS filesystem type %s\n", mds->mds_fstype);
510                 GOTO(err_kfree, (err = -EPERM));
511         }
512
513         MOD_INC_USE_COUNT;
514         mnt = do_kern_mount(mds->mds_fstype, 0, data->ioc_inlbuf1, NULL);
515         if (IS_ERR(mnt)) {
516                 CERROR("do_kern_mount failed: %d\n", err);
517                 GOTO(err_dec, err = PTR_ERR(mnt));
518         }
519
520         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
521         if (!mds->mds_sb)
522                 GOTO(err_put, (err = -ENODEV));
523
524         mds->mds_vfsmnt = mnt;
525         mds->mds_ctxt.pwdmnt = mnt;
526         mds->mds_ctxt.pwd = mnt->mnt_root;
527         mds->mds_ctxt.fs = KERNEL_DS;
528
529         err = mds_prep(obddev);
530         if (err)
531                 GOTO(err_put, err);
532
533         RETURN(0);
534
535 err_put:
536         unlock_kernel();
537         mntput(mds->mds_vfsmnt);
538         mds->mds_sb = 0;
539         lock_kernel();
540 err_dec:
541         MOD_DEC_USE_COUNT;
542 err_kfree:
543         kfree(mds->mds_fstype);
544         return err;
545 }
546
547 static int mds_cleanup(struct obd_device * obddev)
548 {
549         struct super_operations *s_ops = NULL;
550         struct super_block *sb;
551         struct mds_obd *mds = &obddev->u.mds;
552
553         ENTRY;
554
555         if ( !list_empty(&obddev->obd_gen_clients) ) {
556                 CERROR("still has clients!\n");
557                 RETURN(-EBUSY);
558         }
559
560         ptlrpc_stop_thread(mds->mds_service);
561         rpc_unregister_service(mds->mds_service);
562         if (!list_empty(&mds->mds_service->srv_reqs)) {
563                 // XXX reply with errors and clean up
564                 CERROR("Request list not empty!\n");
565         }
566         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
567
568         sb = mds->mds_sb;
569         if (!mds->mds_sb)
570                 RETURN(0);
571
572         s_ops = sb->s_op;
573
574         unlock_kernel();
575         mntput(mds->mds_vfsmnt);
576         mds->mds_sb = 0;
577         kfree(mds->mds_fstype);
578         lock_kernel();
579 #ifdef CONFIG_DEV_RDONLY
580         dev_clear_rdonly(2);
581 #endif
582         OBD_FREE(s_ops, sizeof(*s_ops));
583
584         MOD_DEC_USE_COUNT;
585         RETURN(0);
586 }
587
588 /* use obd ops to offer management infrastructure */
589 static struct obd_ops mds_obd_ops = {
590         o_setup:       mds_setup,
591         o_cleanup:     mds_cleanup,
592 };
593
594 static int __init mds_init(void)
595 {
596         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
597         return 0;
598 }
599
600 static void __exit mds_exit(void)
601 {
602         obd_unregister_type(LUSTRE_MDS_NAME);
603 }
604
605 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
606 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
607 MODULE_LICENSE("GPL");
608
609 module_init(mds_init);
610 module_exit(mds_exit);