Whamcloud - gitweb
Simple journal abstractions for the no-journal (ext2) and ext3 cases.
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/handler.c
5  *
6  *  Lustre Metadata Server (mds) request handler
7  *
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com>
14  *
15  *  This server is single threaded at present (but can easily be multi threaded)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20
21 #include <linux/version.h>
22 #include <linux/module.h>
23 #include <linux/fs.h>
24 #include <linux/stat.h>
25 #include <linux/locks.h>
26 #include <linux/ext2_fs.h>
27 #include <linux/quotaops.h>
28 #include <asm/unistd.h>
29 #include <asm/uaccess.h>
30
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/lustre_mds.h>
34 #include <linux/lustre_lib.h>
35 #include <linux/lustre_net.h>
36
/*
 * Local declaration of the ext3 block reader that mds_sendpage() calls to
 * fetch directory blocks.
 */
extern struct buffer_head *ext3_bread(void *handle, struct inode *inode,
                                      int block, int create, int *err);
39
40 int mds_sendpage(struct ptlrpc_request *req, struct file *file,
41                  __u64 offset, struct niobuf *dst)
42 {
43         int rc = 0;
44         mm_segment_t oldfs = get_fs();
45
46         OBD_FAIL_RETURN(OBD_FAIL_MDS_SENDPAGE, -EIO);
47
48         if (req->rq_peer.peer_nid == 0) {
49                 struct inode *inode = file->f_dentry->d_inode;
50                 char *buf = (char *)(long)dst->addr;
51
52                 /* dst->addr is a user address, but in a different task! */
53                 set_fs(KERNEL_DS);
54                 /* FIXME: we need to use ext3_bread because ext3 does not
55                  *        have the directories in page cache yet.  If we
56                  *        just use generic_file_read() then the pages we
57                  *        get are in a different address space than those
58                  *        used by the filesystem == cache incoherency.
59                  */
60                 if (S_ISREG(inode->i_mode))
61                         rc = file->f_op->read(file, buf, PAGE_SIZE, &offset);
62                 else if (!strcmp(inode->i_sb->s_type->name, "ext3")) {
63                         struct buffer_head *bh;
64
65                         bh = ext3_bread(NULL, inode,
66                                         offset >> inode->i_sb->s_blocksize_bits,
67                                         0, &rc);
68
69                         if (bh) {
70                                 memcpy(buf, bh->b_data, inode->i_blksize);
71                                 brelse(bh);
72                                 rc = inode->i_blksize;
73                         }
74                 } else
75                         rc = generic_file_read(file, buf, PAGE_SIZE, &offset);
76
77                 set_fs(oldfs);
78
79                 if (rc != PAGE_SIZE) {
80                         rc = -EIO;
81                         GOTO(out, rc);
82                 }
83                 EXIT;
84         } else {
85                 struct inode *inode = file->f_dentry->d_inode;
86                 struct ptlrpc_bulk_desc *bulk;
87                 char *buf;
88
89                 bulk = ptlrpc_prep_bulk(&req->rq_peer);
90                 if (bulk == NULL) {
91                         rc = -ENOMEM;
92                         GOTO(out, rc);
93                 }
94
95                 bulk->b_xid = req->rq_xid;
96
97                 OBD_ALLOC(buf, PAGE_SIZE);
98                 if (!buf) {
99                         rc = -ENOMEM;
100                         GOTO(cleanup_bulk, rc);
101                 }
102
103                 set_fs(KERNEL_DS);
104                 /* FIXME: see comments above */
105                 if (S_ISREG(inode->i_mode))
106                         rc = file->f_op->read(file, buf, PAGE_SIZE, &offset);
107                 else if (!strcmp(inode->i_sb->s_type->name, "ext3")) {
108                         struct buffer_head *bh;
109
110                         bh = ext3_bread(NULL, inode, offset >> inode->i_blkbits,
111                                         0, &rc);
112
113                         if (bh) {
114                                 memcpy(buf, bh->b_data, inode->i_blksize);
115                                 brelse(bh);
116                                 rc = inode->i_blksize;
117                         }
118                 } else
119                         rc = generic_file_read(file, buf, PAGE_SIZE, &offset);
120
121                 set_fs(oldfs);
122
123                 if (rc != PAGE_SIZE) {
124                         rc = -EIO;
125                         GOTO(cleanup_buf, rc);
126                 }
127
128                 bulk->b_buf = buf;
129                 bulk->b_buflen = PAGE_SIZE;
130
131                 rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL);
132                 wait_event_interruptible(bulk->b_waitq,
133                                          ptlrpc_check_bulk_sent(bulk));
134
135                 if (bulk->b_flags == PTL_RPC_INTR) {
136                         rc = -EINTR;
137                         GOTO(cleanup_buf, rc);
138                 }
139
140                 EXIT;
141         cleanup_buf:
142                 OBD_FREE(buf, PAGE_SIZE);
143         cleanup_bulk:
144                 OBD_FREE(bulk, sizeof(*bulk));
145         }
146 out:
147         return rc;
148 }
149
150 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
151                               struct vfsmount **mnt)
152 {
153         /* stolen from NFS */
154         struct super_block *sb = mds->mds_sb;
155         unsigned long ino = fid->id;
156         //__u32 generation = fid->generation;
157         __u32 generation = 0;
158         struct inode *inode;
159         struct list_head *lp;
160         struct dentry *result;
161
162         if (ino == 0)
163                 return ERR_PTR(-ESTALE);
164
165         inode = iget(sb, ino);
166         if (inode == NULL)
167                 return ERR_PTR(-ENOMEM);
168
169         CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
170
171         if (is_bad_inode(inode) ||
172             (generation && inode->i_generation != generation)) {
173                 /* we didn't find the right inode.. */
174                 CERROR("bad inode %lu, link: %d ct: %d or version  %u/%u\n",
175                         inode->i_ino,
176                         inode->i_nlink, atomic_read(&inode->i_count),
177                         inode->i_generation,
178                         generation);
179                 LBUG();
180                 iput(inode);
181                 return ERR_PTR(-ESTALE);
182         }
183
184         /* now to find a dentry.
185          * If possible, get a well-connected one
186          */
187         if (mnt)
188                 *mnt = mds->mds_vfsmnt;
189         spin_lock(&dcache_lock);
190         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
191                 result = list_entry(lp,struct dentry, d_alias);
192                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
193                         dget_locked(result);
194                         result->d_vfs_flags |= DCACHE_REFERENCED;
195                         spin_unlock(&dcache_lock);
196                         iput(inode);
197                         if (mnt)
198                                 mntget(*mnt);
199                         return result;
200                 }
201         }
202         spin_unlock(&dcache_lock);
203         result = d_alloc_root(inode);
204         if (result == NULL) {
205                 iput(inode);
206                 return ERR_PTR(-ENOMEM);
207         }
208         if (mnt)
209                 mntget(*mnt);
210         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
211         return result;
212 }
213
214 static inline void mds_get_objid(struct inode *inode, __u64 *id)
215 {
216         /* FIXME: it is only by luck that this works on ext3 */
217         memcpy(id, &inode->u.ext2_i.i_data, sizeof(*id));
218 }
219
220 int mds_getattr(struct ptlrpc_request *req)
221 {
222         struct dentry *de;
223         struct inode *inode;
224         struct mds_rep *rep;
225         int rc;
226
227         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
228                           &req->rq_replen, &req->rq_repbuf);
229         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
230                 CERROR("mds: out of memory\n");
231                 req->rq_status = -ENOMEM;
232                 RETURN(0);
233         }
234
235         req->rq_rephdr->xid = req->rq_reqhdr->xid;
236         rep = req->rq_rep.mds;
237
238         de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, NULL);
239         if (IS_ERR(de)) {
240                 req->rq_rephdr->status = -ENOENT;
241                 RETURN(0);
242         }
243
244         inode = de->d_inode;
245         rep->ino = inode->i_ino;
246         rep->atime = inode->i_atime;
247         rep->ctime = inode->i_ctime;
248         rep->mtime = inode->i_mtime;
249         rep->uid = inode->i_uid;
250         rep->gid = inode->i_gid;
251         rep->size = inode->i_size;
252         rep->mode = inode->i_mode;
253         rep->nlink = inode->i_nlink;
254         rep->valid = ~0;
255         mds_get_objid(inode, &rep->objid);
256         dput(de);
257         return 0;
258 }
259
260 int mds_open(struct ptlrpc_request *req)
261 {
262         struct dentry *de;
263         struct mds_rep *rep;
264         struct file *file;
265         struct vfsmount *mnt;
266         __u32 flags;
267         int rc;
268
269         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
270                           &req->rq_replen, &req->rq_repbuf);
271         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
272                 CERROR("mds: out of memory\n");
273                 req->rq_status = -ENOMEM;
274                 RETURN(0);
275         }
276
277         req->rq_rephdr->xid = req->rq_reqhdr->xid;
278         rep = req->rq_rep.mds;
279
280         de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
281         if (IS_ERR(de)) {
282                 req->rq_rephdr->status = -ENOENT;
283                 RETURN(0);
284         }
285         flags = req->rq_req.mds->flags;
286         file = dentry_open(de, mnt, flags);
287         if (!file || IS_ERR(file)) {
288                 req->rq_rephdr->status = -EINVAL;
289                 RETURN(0);
290         }
291
292         rep->objid = (__u64) (unsigned long)file;
293         return 0;
294 }
295
296 int mds_close(struct ptlrpc_request *req)
297 {
298         struct dentry *de;
299         struct mds_rep *rep;
300         struct file *file;
301         struct vfsmount *mnt;
302         int rc;
303
304         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
305                           &req->rq_replen, &req->rq_repbuf);
306         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
307                 CERROR("mds: out of memory\n");
308                 req->rq_status = -ENOMEM;
309                 RETURN(0);
310         }
311
312         req->rq_rephdr->xid = req->rq_reqhdr->xid;
313         rep = req->rq_rep.mds;
314
315         de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
316         if (IS_ERR(de)) {
317                 req->rq_rephdr->status = -ENOENT;
318                 RETURN(0);
319         }
320
321         file = (struct file *)(unsigned long) req->rq_req.mds->objid;
322
323         req->rq_rephdr->status = filp_close(file, 0);
324         dput(de);
325         mntput(mnt);
326         return 0;
327 }
328
329
330 int mds_readpage(struct ptlrpc_request *req)
331 {
332         struct vfsmount *mnt;
333         struct dentry *de;
334         struct file *file;
335         struct niobuf *niobuf;
336         struct mds_rep *rep;
337         int rc;
338
339         ENTRY;
340
341         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
342                           &req->rq_replen, &req->rq_repbuf);
343         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
344                 CERROR("mds: out of memory\n");
345                 req->rq_status = -ENOMEM;
346                 RETURN(0);
347         }
348
349         req->rq_rephdr->xid = req->rq_reqhdr->xid;
350         rep = req->rq_rep.mds;
351
352         de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
353         if (IS_ERR(de)) {
354                 req->rq_rephdr->status = PTR_ERR(de);
355                 RETURN(0);
356         }
357
358         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
359
360         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
361         /* note: in case of an error, dentry_open puts dentry */
362         if (IS_ERR(file)) {
363                 req->rq_rephdr->status = PTR_ERR(file);
364                 RETURN(0);
365         }
366
367         niobuf = mds_req_tgt(req->rq_req.mds);
368
369         /* to make this asynchronous make sure that the handling function
370            doesn't send a reply when this function completes. Instead a
371            callback function would send the reply */
372         rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf);
373
374         filp_close(file, 0);
375         req->rq_rephdr->status = rc;
376         RETURN(0);
377 }
378
379 int mds_reint(struct ptlrpc_request *req)
380 {
381         char *buf;
382         int rc, len;
383         struct mds_update_record rec;
384
385         buf = mds_req_tgt(req->rq_req.mds);
386         len = req->rq_req.mds->tgtlen;
387
388         rc = mds_update_unpack(buf, len, &rec);
389         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
390                 CERROR("invalid record\n");
391                 req->rq_status = -EINVAL;
392                 RETURN(0);
393         }
394         /* rc will be used to interrupt a for loop over multiple records */
395         rc = mds_reint_rec(&rec, req);
396         return 0;
397 }
398
399 int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
400                struct ptlrpc_request *req)
401 {
402         int rc;
403         struct ptlreq_hdr *hdr;
404
405         ENTRY;
406
407         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
408
409         if (NTOH__u32(hdr->type) != PTL_RPC_REQUEST) {
410                 CERROR("lustre_mds: wrong packet type sent %d\n",
411                        NTOH__u32(hdr->type));
412                 rc = -EINVAL;
413                 GOTO(out, rc);
414         }
415
416         rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen,
417                             &req->rq_reqhdr, &req->rq_req);
418         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
419                 CERROR("lustre_mds: Invalid request\n");
420                 GOTO(out, rc);
421         }
422
423         switch (req->rq_reqhdr->opc) {
424
425         case MDS_GETATTR:
426                 CDEBUG(D_INODE, "getattr\n");
427                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
428                 rc = mds_getattr(req);
429                 break;
430
431         case MDS_READPAGE:
432                 CDEBUG(D_INODE, "readpage\n");
433                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
434                 rc = mds_readpage(req);
435                 break;
436
437         case MDS_REINT:
438                 CDEBUG(D_INODE, "reint\n");
439                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
440                 rc = mds_reint(req);
441                 break;
442
443         case MDS_OPEN:
444                 CDEBUG(D_INODE, "open\n");
445                 OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
446                 rc = mds_open(req);
447                 break;
448
449         case MDS_CLOSE:
450                 CDEBUG(D_INODE, "close\n");
451                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
452                 rc = mds_close(req);
453                 break;
454
455         default:
456                 rc = ptlrpc_error(dev, svc, req);
457                 RETURN(rc);
458         }
459
460         EXIT;
461 out:
462         if (rc) {
463                 CERROR("no header\n");
464                 return 0;
465         }
466
467         if( req->rq_status) {
468                 ptlrpc_error(dev, svc, req);
469         } else {
470                 CDEBUG(D_NET, "sending reply\n");
471                 ptlrpc_reply(dev, svc, req);
472         }
473
474         return 0;
475 }
476
477
478 /* mount the file system (secretly) */
479 static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
480 {
481         struct obd_ioctl_data* data = buf;
482         struct mds_obd *mds = &obddev->u.mds;
483         struct vfsmount *mnt;
484         int err;
485         ENTRY;
486
487         mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL);
488         err = PTR_ERR(mnt);
489         if (IS_ERR(mnt)) {
490                 CERROR("do_kern_mount failed: %d\n", err);
491                 RETURN(err);
492         }
493
494         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
495         if (!mds->mds_sb)
496                 RETURN(-ENODEV);
497
498         mds->mds_vfsmnt = mnt;
499         mds->mds_fstype = strdup(data->ioc_inlbuf2);
500
501         mds->mds_ctxt.pwdmnt = mnt;
502         mds->mds_ctxt.pwd = mnt->mnt_root;
503         mds->mds_ctxt.fs = KERNEL_DS;
504
505         mds->mds_service = ptlrpc_init_svc(64 * 1024,
506                                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
507                                            "self", mds_handle);
508
509         rpc_register_service(mds->mds_service, "self");
510
511         err = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
512         if (err)
513                 CERROR("cannot start thread\n");
514
515         MOD_INC_USE_COUNT;
516         RETURN(0);
517 }
518
519 static int mds_cleanup(struct obd_device * obddev)
520 {
521         struct super_block *sb;
522         struct mds_obd *mds = &obddev->u.mds;
523
524         ENTRY;
525
526         if ( !list_empty(&obddev->obd_gen_clients) ) {
527                 CERROR("still has clients!\n");
528                 RETURN(-EBUSY);
529         }
530
531         ptlrpc_stop_thread(mds->mds_service);
532         rpc_unregister_service(mds->mds_service);
533
534         if (!list_empty(&mds->mds_service->srv_reqs)) {
535                 // XXX reply with errors and clean up
536                 CERROR("Request list not empty!\n");
537         }
538
539         rpc_unregister_service(mds->mds_service);
540         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
541
542         sb = mds->mds_sb;
543         if (!mds->mds_sb)
544                 RETURN(0);
545
546         unlock_kernel();
547         mntput(mds->mds_vfsmnt);
548         mds->mds_sb = 0;
549         kfree(mds->mds_fstype);
550         lock_kernel();
551
552         MOD_DEC_USE_COUNT;
553         RETURN(0);
554 }
555
556 /* use obd ops to offer management infrastructure */
557 static struct obd_ops mds_obd_ops = {
558         o_setup:       mds_setup,
559         o_cleanup:     mds_cleanup,
560 };
561
562 static int __init mds_init(void)
563 {
564         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
565         return 0;
566 }
567
568 static void __exit mds_exit(void)
569 {
570         obd_unregister_type(LUSTRE_MDS_NAME);
571 }
572
573 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
574 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
575 MODULE_LICENSE("GPL");
576
577 module_init(mds_init);
578 module_exit(mds_exit);