Whamcloud - gitweb
Disable the delete_inode overloading for now. It overloads the methods
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/handler.c
5  *
6  *  Lustre Metadata Server (mds) request handler
7  *
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com>
14  *
15  *  This server is single threaded at present (but can easily be multi threaded)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20
21 #include <linux/version.h>
22 #include <linux/module.h>
23 #include <linux/fs.h>
24 #include <linux/stat.h>
25 #include <linux/locks.h>
26 #include <linux/quotaops.h>
27 #include <asm/unistd.h>
28 #include <asm/uaccess.h>
29
30 #define DEBUG_SUBSYSTEM S_MDS
31
32 #include <linux/lustre_mds.h>
33 #include <linux/lustre_lib.h>
34 #include <linux/lustre_net.h>
35
36 int mds_sendpage(struct ptlrpc_request *req, struct file *file,
37                  __u64 offset, struct niobuf *dst)
38 {
39         int rc = 0;
40         mm_segment_t oldfs = get_fs();
41
42         OBD_FAIL_RETURN(OBD_FAIL_MDS_SENDPAGE, -EIO);
43
44         if (req->rq_peer.peer_nid == 0) {
45                 /* dst->addr is a user address, but in a different task! */
46                 char *buf = (char *)(long)dst->addr;
47
48                 set_fs(KERNEL_DS);
49                 rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
50                                      &offset);
51                 set_fs(oldfs);
52
53                 if (rc != PAGE_SIZE) {
54                         rc = -EIO;
55                         GOTO(out, rc);
56                 }
57                 EXIT;
58         } else {
59                 struct ptlrpc_bulk_desc *bulk;
60                 char *buf;
61
62                 bulk = ptlrpc_prep_bulk(&req->rq_peer);
63                 if (bulk == NULL) {
64                         rc = -ENOMEM;
65                         GOTO(out, rc);
66                 }
67
68                 bulk->b_xid = req->rq_xid;
69
70                 OBD_ALLOC(buf, PAGE_SIZE);
71                 if (!buf) {
72                         rc = -ENOMEM;
73                         GOTO(cleanup_bulk, rc);
74                 }
75
76                 set_fs(KERNEL_DS);
77                 rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
78                                      &offset);
79                 set_fs(oldfs);
80
81                 if (rc != PAGE_SIZE) {
82                         rc = -EIO;
83                         GOTO(cleanup_buf, rc);
84                 }
85
86                 bulk->b_buf = buf;
87                 bulk->b_buflen = PAGE_SIZE;
88
89                 rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL);
90                 wait_event_interruptible(bulk->b_waitq,
91                                          ptlrpc_check_bulk_sent(bulk));
92
93                 if (bulk->b_flags == PTL_RPC_INTR) {
94                         rc = -EINTR;
95                         GOTO(cleanup_buf, rc);
96                 }
97
98                 EXIT;
99         cleanup_buf:
100                 OBD_FREE(buf, PAGE_SIZE);
101         cleanup_bulk:
102                 OBD_FREE(bulk, sizeof(*bulk));
103         }
104 out:
105         return rc;
106 }
107
108 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
109                               struct vfsmount **mnt)
110 {
111         /* stolen from NFS */
112         struct super_block *sb = mds->mds_sb;
113         unsigned long ino = fid->id;
114         __u32 generation = fid->generation;
115         struct inode *inode;
116         struct list_head *lp;
117         struct dentry *result;
118
119         if (ino == 0)
120                 return ERR_PTR(-ESTALE);
121
122         inode = iget(sb, ino);
123         if (inode == NULL)
124                 return ERR_PTR(-ENOMEM);
125
126         CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
127
128         if (is_bad_inode(inode) ||
129             (generation && inode->i_generation != generation)) {
130                 /* we didn't find the right inode.. */
131                 CERROR("bad inode %lu, link: %d ct: %d or version  %u/%u\n",
132                         inode->i_ino,
133                         inode->i_nlink, atomic_read(&inode->i_count),
134                         inode->i_generation,
135                         generation);
136                 LBUG();
137                 iput(inode);
138                 return ERR_PTR(-ESTALE);
139         }
140
141         /* now to find a dentry.
142          * If possible, get a well-connected one
143          */
144         if (mnt)
145                 *mnt = mds->mds_vfsmnt;
146         spin_lock(&dcache_lock);
147         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
148                 result = list_entry(lp,struct dentry, d_alias);
149                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
150                         dget_locked(result);
151                         result->d_vfs_flags |= DCACHE_REFERENCED;
152                         spin_unlock(&dcache_lock);
153                         iput(inode);
154                         if (mnt)
155                                 mntget(*mnt);
156                         return result;
157                 }
158         }
159         spin_unlock(&dcache_lock);
160         result = d_alloc_root(inode);
161         if (result == NULL) {
162                 iput(inode);
163                 return ERR_PTR(-ENOMEM);
164         }
165         if (mnt)
166                 mntget(*mnt);
167         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
168         return result;
169 }
170
171 int mds_getattr(struct ptlrpc_request *req)
172 {
173         struct dentry *de;
174         struct inode *inode;
175         struct mds_rep *rep;
176         struct mds_obd *mds = &req->rq_obd->u.mds;
177         int rc;
178
179         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
180                           &req->rq_replen, &req->rq_repbuf);
181         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
182                 CERROR("mds: out of memory\n");
183                 req->rq_status = -ENOMEM;
184                 RETURN(0);
185         }
186
187         req->rq_rephdr->xid = req->rq_reqhdr->xid;
188         rep = req->rq_rep.mds;
189
190         de = mds_fid2dentry(mds, &req->rq_req.mds->fid1, NULL);
191         if (IS_ERR(de)) {
192                 req->rq_rephdr->status = -ENOENT;
193                 RETURN(0);
194         }
195
196         inode = de->d_inode;
197         rep->ino = inode->i_ino;
198         rep->generation = inode->i_generation;
199         rep->atime = inode->i_atime;
200         rep->ctime = inode->i_ctime;
201         rep->mtime = inode->i_mtime;
202         rep->uid = inode->i_uid;
203         rep->gid = inode->i_gid;
204         rep->size = inode->i_size;
205         rep->mode = inode->i_mode;
206         rep->nlink = inode->i_nlink;
207         rep->valid = ~0;
208         mds_fs_get_objid(mds, inode, &rep->objid);
209         dput(de);
210         return 0;
211 }
212
213 int mds_open(struct ptlrpc_request *req)
214 {
215         struct dentry *de;
216         struct mds_rep *rep;
217         struct file *file;
218         struct vfsmount *mnt;
219         __u32 flags;
220         int rc;
221
222         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
223                           &req->rq_replen, &req->rq_repbuf);
224         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
225                 CERROR("mds: out of memory\n");
226                 req->rq_status = -ENOMEM;
227                 RETURN(0);
228         }
229
230         req->rq_rephdr->xid = req->rq_reqhdr->xid;
231         rep = req->rq_rep.mds;
232
233         de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
234         if (IS_ERR(de)) {
235                 req->rq_rephdr->status = -ENOENT;
236                 RETURN(0);
237         }
238         flags = req->rq_req.mds->flags;
239         file = dentry_open(de, mnt, flags);
240         if (!file || IS_ERR(file)) {
241                 req->rq_rephdr->status = -EINVAL;
242                 RETURN(0);
243         }
244
245         rep->objid = (__u64) (unsigned long)file;
246         return 0;
247 }
248
249 int mds_close(struct ptlrpc_request *req)
250 {
251         struct dentry *de;
252         struct mds_rep *rep;
253         struct file *file;
254         struct vfsmount *mnt;
255         int rc;
256
257         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
258                           &req->rq_replen, &req->rq_repbuf);
259         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
260                 CERROR("mds: out of memory\n");
261                 req->rq_status = -ENOMEM;
262                 RETURN(0);
263         }
264
265         req->rq_rephdr->xid = req->rq_reqhdr->xid;
266         rep = req->rq_rep.mds;
267
268         de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
269         if (IS_ERR(de)) {
270                 req->rq_rephdr->status = -ENOENT;
271                 RETURN(0);
272         }
273
274         file = (struct file *)(unsigned long) req->rq_req.mds->objid;
275
276         req->rq_rephdr->status = filp_close(file, 0);
277         dput(de);
278         mntput(mnt);
279         return 0;
280 }
281
282
283 int mds_readpage(struct ptlrpc_request *req)
284 {
285         struct vfsmount *mnt;
286         struct dentry *de;
287         struct file *file;
288         struct niobuf *niobuf;
289         struct mds_rep *rep;
290         int rc;
291
292         ENTRY;
293
294         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
295                           &req->rq_replen, &req->rq_repbuf);
296         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
297                 CERROR("mds: out of memory\n");
298                 req->rq_status = -ENOMEM;
299                 RETURN(0);
300         }
301
302         req->rq_rephdr->xid = req->rq_reqhdr->xid;
303         rep = req->rq_rep.mds;
304
305         de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
306         if (IS_ERR(de)) {
307                 req->rq_rephdr->status = PTR_ERR(de);
308                 RETURN(0);
309         }
310
311         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
312
313         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
314         /* note: in case of an error, dentry_open puts dentry */
315         if (IS_ERR(file)) {
316                 req->rq_rephdr->status = PTR_ERR(file);
317                 RETURN(0);
318         }
319
320         niobuf = mds_req_tgt(req->rq_req.mds);
321
322         /* to make this asynchronous make sure that the handling function
323            doesn't send a reply when this function completes. Instead a
324            callback function would send the reply */
325         rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf);
326
327         filp_close(file, 0);
328         req->rq_rephdr->status = rc;
329         RETURN(0);
330 }
331
332 int mds_reint(struct ptlrpc_request *req)
333 {
334         char *buf;
335         int rc, len;
336         struct mds_update_record rec;
337
338         buf = mds_req_tgt(req->rq_req.mds);
339         len = req->rq_req.mds->tgtlen;
340
341         rc = mds_update_unpack(buf, len, &rec);
342         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
343                 CERROR("invalid record\n");
344                 req->rq_status = -EINVAL;
345                 RETURN(0);
346         }
347         /* rc will be used to interrupt a for loop over multiple records */
348         rc = mds_reint_rec(&rec, req);
349         return 0;
350 }
351
352 int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
353                struct ptlrpc_request *req)
354 {
355         int rc;
356         struct ptlreq_hdr *hdr;
357
358         ENTRY;
359
360         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
361
362         if (NTOH__u32(hdr->type) != PTL_RPC_REQUEST) {
363                 CERROR("lustre_mds: wrong packet type sent %d\n",
364                        NTOH__u32(hdr->type));
365                 rc = -EINVAL;
366                 GOTO(out, rc);
367         }
368
369         rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen,
370                             &req->rq_reqhdr, &req->rq_req);
371         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
372                 CERROR("lustre_mds: Invalid request\n");
373                 GOTO(out, rc);
374         }
375
376         switch (req->rq_reqhdr->opc) {
377
378         case MDS_GETATTR:
379                 CDEBUG(D_INODE, "getattr\n");
380                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
381                 rc = mds_getattr(req);
382                 break;
383
384         case MDS_READPAGE:
385                 CDEBUG(D_INODE, "readpage\n");
386                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
387                 rc = mds_readpage(req);
388                 break;
389
390         case MDS_REINT:
391                 CDEBUG(D_INODE, "reint\n");
392                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
393                 rc = mds_reint(req);
394                 break;
395
396         case MDS_OPEN:
397                 CDEBUG(D_INODE, "open\n");
398                 OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
399                 rc = mds_open(req);
400                 break;
401
402         case MDS_CLOSE:
403                 CDEBUG(D_INODE, "close\n");
404                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
405                 rc = mds_close(req);
406                 break;
407
408         default:
409                 rc = ptlrpc_error(dev, svc, req);
410                 RETURN(rc);
411         }
412
413         EXIT;
414 out:
415         if (rc) {
416                 CERROR("no header\n");
417                 LBUG();
418                 return 0;
419         }
420
421         if( req->rq_status) {
422                 ptlrpc_error(dev, svc, req);
423         } else {
424                 CDEBUG(D_NET, "sending reply\n");
425                 ptlrpc_reply(dev, svc, req);
426         }
427
428         return 0;
429 }
430
431
432 /* mount the file system (secretly) */
433 static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
434 {
435         struct obd_ioctl_data* data = buf;
436         struct mds_obd *mds = &obddev->u.mds;
437         struct vfsmount *mnt;
438         int err;
439         ENTRY;
440
441         MOD_INC_USE_COUNT;
442         mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL);
443         err = PTR_ERR(mnt);
444         if (IS_ERR(mnt)) {
445                 CERROR("do_kern_mount failed: %d\n", err);
446                 MOD_DEC_USE_COUNT;
447                 RETURN(err);
448         }
449
450         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
451         if (!mds->mds_sb) {
452                 MOD_DEC_USE_COUNT;
453                 RETURN(-ENODEV);
454         }
455
456         mds->mds_vfsmnt = mnt;
457         mds->mds_fstype = strdup(data->ioc_inlbuf2);
458
459         if (!strcmp(mds->mds_fstype, "ext3"))
460                 mds->mds_fsops = &mds_ext3_fs_ops;
461         else if (!strcmp(mds->mds_fstype, "ext2"))
462                 mds->mds_fsops = &mds_ext2_fs_ops;
463         else {
464                 CERROR("unsupported MDS filesystem type %s\n", mds->mds_fstype);
465                 kfree(mds->mds_fstype);
466                 MOD_DEC_USE_COUNT;
467                 RETURN(-EPERM);
468         }
469
470         /*
471          * Replace the client filesystem delete_inode method with our own,
472          * so that we can clear the object ID before the inode is deleted.
473          * The fs_delete_inode method will call cl_delete_inode for us.
474         mds->mds_fsops->cl_delete_inode = mds->mds_sb->s_op->delete_inode;
475         mds->mds_sb->s_op->delete_inode = mds->mds_fsops->fs_delete_inode;
476          */
477
478         mds->mds_ctxt.pwdmnt = mnt;
479         mds->mds_ctxt.pwd = mnt->mnt_root;
480         mds->mds_ctxt.fs = KERNEL_DS;
481
482         mds->mds_service = ptlrpc_init_svc(128 * 1024,
483                                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
484                                            "self", mds_handle);
485         if (!mds->mds_service) {
486                 CERROR("failed to start service\n");
487                 RETURN(-EINVAL);
488         }
489
490         err = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
491         if (err)
492                 CERROR("cannot start thread\n");
493                 /* FIXME: do we need to MOD_DEC_USE_COUNT here? */
494
495         RETURN(0);
496 }
497
498 static int mds_cleanup(struct obd_device * obddev)
499 {
500         struct super_block *sb;
501         struct mds_obd *mds = &obddev->u.mds;
502
503         ENTRY;
504
505         if ( !list_empty(&obddev->obd_gen_clients) ) {
506                 CERROR("still has clients!\n");
507                 RETURN(-EBUSY);
508         }
509
510         ptlrpc_stop_thread(mds->mds_service);
511         rpc_unregister_service(mds->mds_service);
512
513         if (!list_empty(&mds->mds_service->srv_reqs)) {
514                 // XXX reply with errors and clean up
515                 CERROR("Request list not empty!\n");
516         }
517
518         rpc_unregister_service(mds->mds_service);
519         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
520
521         sb = mds->mds_sb;
522         if (!mds->mds_sb)
523                 RETURN(0);
524
525         unlock_kernel();
526         mntput(mds->mds_vfsmnt);
527         mds->mds_sb = 0;
528         kfree(mds->mds_fstype);
529         lock_kernel();
530
531         MOD_DEC_USE_COUNT;
532         RETURN(0);
533 }
534
535 /* use obd ops to offer management infrastructure */
536 static struct obd_ops mds_obd_ops = {
537         o_setup:       mds_setup,
538         o_cleanup:     mds_cleanup,
539 };
540
541 static int __init mds_init(void)
542 {
543         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
544         return 0;
545 }
546
547 static void __exit mds_exit(void)
548 {
549         obd_unregister_type(LUSTRE_MDS_NAME);
550 }
551
552 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
553 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
554 MODULE_LICENSE("GPL");
555
556 module_init(mds_init);
557 module_exit(mds_exit);