Whamcloud - gitweb
assorted bug fixes and a working directory readpage routine
[fs/lustre-release.git] / lustre / mds / handler.c
1 /*
2  *  linux/mds/handler.c
3  *  
4  *  Lustre Metadata Server (mds) request handler
5  * 
6  *  Copyright (C) 2001  Cluster File Systems, Inc.
7  *
8  *  This code is issued under the GNU General Public License.
9  *  See the file COPYING in this distribution
10  *
11  *  by Peter Braam <braam@clusterfs.com>
12  * 
13  *  This server is single threaded at present (but can easily be multi threaded). 
14  * 
15  */
16
17
18 #define EXPORT_SYMTAB
19
20 #include <linux/version.h>
21 #include <linux/module.h>
22 #include <linux/fs.h>
23 #include <linux/stat.h>
24 #include <linux/locks.h>
25 #include <linux/ext2_fs.h>
26 #include <linux/quotaops.h>
27 #include <asm/unistd.h>
28 #include <asm/uaccess.h>
29 #include <linux/obd_support.h>
30 #include <linux/obd.h>
31 #include <linux/lustre_lib.h>
32 #include <linux/lustre_idl.h>
33 #include <linux/lustre_mds.h>
34 #include <linux/obd_class.h>
35
36 // for testing
37 static struct mds_obd *MDS;
38
39 // XXX make this networked!  
40 static int mds_queue_req(struct mds_request *req)
41 {
42         struct mds_request *srv_req;
43         
44         if (!MDS) { 
45                 EXIT;
46                 return -1;
47         }
48
49         srv_req = kmalloc(sizeof(*srv_req), GFP_KERNEL);
50         if (!srv_req) { 
51                 EXIT;
52                 return -ENOMEM;
53         }
54
55         printk("---> MDS at %d %p, incoming req %p, srv_req %p\n", 
56                __LINE__, MDS, req, srv_req);
57
58         memset(srv_req, 0, sizeof(*req)); 
59
60         /* move the request buffer */
61         srv_req->rq_reqbuf = req->rq_reqbuf;
62         srv_req->rq_reqlen    = req->rq_reqlen;
63         srv_req->rq_obd = MDS;
64
65         /* remember where it came from */
66         srv_req->rq_reply_handle = req;
67
68         list_add(&srv_req->rq_list, &MDS->mds_reqs); 
69         wake_up(&MDS->mds_waitq);
70         return 0;
71 }
72
73 /* XXX do this over the net */
74 int mds_sendpage(struct mds_request *req, struct file *file, 
75                     __u64 offset, struct niobuf *dst)
76 {
77         int rc; 
78
79         rc = generic_file_read(file, (char *)(long)dst->addr, 
80                               PAGE_SIZE, &offset); 
81
82         if (rc != PAGE_SIZE) 
83                 return -EIO;
84         return 0;
85 }
86
87 /* XXX replace with networking code */
88 int mds_reply(struct mds_request *req)
89 {
90         struct mds_request *clnt_req = req->rq_reply_handle;
91
92         ENTRY;
93
94         /* free the request buffer */
95         kfree(req->rq_reqbuf);
96         req->rq_reqbuf = NULL; 
97         
98         /* move the reply to the client */ 
99         clnt_req->rq_replen = req->rq_replen;
100         clnt_req->rq_repbuf = req->rq_repbuf;
101         req->rq_repbuf = NULL;
102         req->rq_replen = 0;
103
104         /* wake up the client */ 
105         wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
106         EXIT;
107         return 0;
108 }
109
110 int mds_error(struct mds_request *req)
111 {
112         struct mds_rep_hdr *hdr;
113
114         ENTRY;
115         hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
116         if (!hdr) { 
117                 EXIT;
118                 return -ENOMEM;
119         }
120
121         memset(hdr, 0, sizeof(*hdr));
122         
123         hdr->seqno = req->rq_reqhdr->seqno;
124         hdr->status = req->rq_status; 
125         hdr->type = MDS_TYPE_ERR;
126
127         req->rq_repbuf = (char *)hdr;
128         req->rq_replen = sizeof(*hdr); 
129
130         EXIT;
131         return mds_reply(req);
132 }
133
134 static struct dentry *mds_fid2dentry(struct mds_obd *mds, struct lustre_fid *fid, struct vfsmount **mnt)
135 {
136
137         /* iget isn't really right if the inode is currently unallocated!!
138          * This should really all be done inside each filesystem
139          *
140          * ext2fs' read_inode has been strengthed to return a bad_inode if the inode
141          *   had been deleted.
142          *
143          * Currently we don't know the generation for parent directory, so a generation
144          * of 0 means "accept any"
145          */
146         struct super_block *sb = mds->mds_sb; 
147         unsigned long ino = fid->id;
148         __u32 generation = fid->generation;
149         struct inode *inode;
150         struct list_head *lp;
151         struct dentry *result;
152
153         if (mnt) { 
154                 *mnt = mntget(mds->mds_vfsmnt);
155         }
156
157         if (ino == 0)
158                 return ERR_PTR(-ESTALE);
159         inode = iget(sb, ino);
160         if (inode == NULL)
161                 return ERR_PTR(-ENOMEM);
162
163         printk("--> mds_fid2dentry: sb %p\n", inode->i_sb); 
164
165         if (is_bad_inode(inode)
166             || (generation && inode->i_generation != generation)
167                 ) {
168                 /* we didn't find the right inode.. */
169                 printk("mds_fid2dentry: Inode %lu, Bad count: %d %d or version  %u %u\n",
170                         inode->i_ino,
171                         inode->i_nlink, atomic_read(&inode->i_count),
172                         inode->i_generation,
173                         generation);
174
175                 iput(inode);
176                 return ERR_PTR(-ESTALE);
177         }
178         /* now to find a dentry.
179          * If possible, get a well-connected one
180          */
181         spin_lock(&dcache_lock);
182         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
183                 result = list_entry(lp,struct dentry, d_alias);
184                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
185                         dget_locked(result);
186                         result->d_vfs_flags |= DCACHE_REFERENCED;
187                         spin_unlock(&dcache_lock);
188                         iput(inode);
189                         return result;
190                 }
191         }
192         spin_unlock(&dcache_lock);
193         result = d_alloc_root(inode);
194         if (result == NULL) {
195                 iput(inode);
196                 return ERR_PTR(-ENOMEM);
197         }
198         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
199         return result;
200 }
201
202 int mds_getattr(struct mds_request *req)
203 {
204         struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req->fid1, 
205                                            NULL);
206         struct inode *inode;
207         struct mds_rep *rep;
208         int rc;
209         
210         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
211                           &req->rq_replen, &req->rq_repbuf);
212         if (rc) { 
213                 EXIT;
214                 printk("mds: out of memory\n");
215                 req->rq_status = -ENOMEM;
216                 return -ENOMEM;
217         }
218
219         req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
220         rep = req->rq_rep;
221
222         if (!de) { 
223                 EXIT;
224                 req->rq_rephdr->status = -ENOENT;
225                 return 0;
226         }
227
228         inode = de->d_inode;
229         rep->ino = inode->i_ino;
230         rep->atime = inode->i_atime;
231         rep->ctime = inode->i_ctime;
232         rep->mtime = inode->i_mtime;
233         rep->uid = inode->i_uid;
234         rep->gid = inode->i_gid;
235         rep->size = inode->i_size;
236         rep->mode = inode->i_mode;
237         rep->valid = ~0;
238
239         dput(de); 
240         return 0;
241 }
242
243 int mds_readpage(struct mds_request *req)
244 {
245         struct vfsmount *mnt;
246         struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req->fid1, 
247                                            &mnt);
248         struct file *file; 
249         struct niobuf *niobuf; 
250         struct mds_rep *rep;
251         int rc;
252         
253         printk("mds_readpage: ino %ld\n", de->d_inode->i_ino);
254         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
255                           &req->rq_replen, &req->rq_repbuf);
256         if (rc) { 
257                 EXIT;
258                 printk("mds: out of memory\n");
259                 req->rq_status = -ENOMEM;
260                 return -ENOMEM;
261         }
262
263         req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
264         rep = req->rq_rep;
265
266         if (IS_ERR(de)) { 
267                 EXIT;
268                 req->rq_rephdr->status = PTR_ERR(de); 
269                 return 0;
270         }
271
272         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE); 
273         /* note: in case of an error, dentry_open puts dentry */
274         if (IS_ERR(file)) { 
275                 EXIT;
276                 req->rq_rephdr->status = PTR_ERR(file);
277                 return 0;
278         }
279                 
280         niobuf = (struct niobuf *)req->rq_req->tgt;
281
282         /* to make this asynchronous make sure that the handling function 
283            doesn't send a reply when this function completes. Instead a 
284            callback function would send the reply */ 
285         rc = mds_sendpage(req, file, req->rq_req->size, niobuf); 
286
287         filp_close(file, 0);
288         req->rq_rephdr->status = rc;
289         EXIT;
290         return 0;
291 }
292
293
294
295 //int mds_handle(struct mds_conn *conn, int len, char *buf)
296 int mds_handle(struct mds_request *req)
297 {
298         int rc;
299         struct mds_req_hdr *hdr;
300
301         ENTRY;
302
303         hdr = (struct mds_req_hdr *)req->rq_reqbuf;
304
305         if (NTOH__u32(hdr->type) != MDS_TYPE_REQ) {
306                 printk("lustre_mds: wrong packet type sent %d\n",
307                        NTOH__u32(hdr->type));
308                 rc = -EINVAL;
309                 goto out;
310         }
311
312         rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
313                             &req->rq_reqhdr, &req->rq_req);
314         if (rc) { 
315                 printk("lustre_mds: Invalid request\n");
316                 EXIT; 
317                 goto out;
318         }
319
320         switch (req->rq_reqhdr->opc) { 
321
322         case MDS_GETATTR:
323                 CDEBUG(D_INODE, "getattr\n");
324                 rc = mds_getattr(req);
325                 break;
326
327         case MDS_READPAGE:
328                 CDEBUG(D_INODE, "readpage\n");
329                 rc = mds_readpage(req);
330                 break;
331
332         case MDS_SETATTR:
333                 return mds_getattr(req);
334
335         case MDS_CREATE:
336                 return mds_getattr(req);
337
338         case MDS_MKDIR:
339                 return mds_getattr(req);
340
341         case MDS_RMDIR:
342                 return mds_getattr(req);
343
344         case MDS_SYMLINK:
345                 return mds_getattr(req);
346  
347         case MDS_LINK:
348                 return mds_getattr(req);
349   
350         case MDS_MKNOD:
351                 return mds_getattr(req);
352
353         case MDS_UNLINK:
354                 return mds_getattr(req);
355
356         case MDS_RENAME:
357                 return mds_getattr(req);
358
359         default:
360                 return mds_error(req);
361         }
362
363 out:
364         if (rc) { 
365                 printk("mds: processing error %d\n", rc);
366                 mds_error(req);
367         } else { 
368                 CDEBUG(D_INODE, "sending reply\n"); 
369                 mds_reply(req); 
370         }
371
372         return 0;
373 }
374
375
376 static void mds_timer_run(unsigned long __data)
377 {
378         struct task_struct * p = (struct task_struct *) __data;
379
380         wake_up_process(p);
381 }
382
383 int mds_main(void *arg)
384 {
385         struct mds_obd *mds = (struct mds_obd *) arg;
386         struct timer_list timer;
387
388         lock_kernel();
389         daemonize();
390         spin_lock_irq(&current->sigmask_lock);
391         sigfillset(&current->blocked);
392         recalc_sigpending(current);
393         spin_unlock_irq(&current->sigmask_lock);
394
395         sprintf(current->comm, "lustre_mds");
396
397         /* Set up an interval timer which can be used to trigger a
398            wakeup after the interval expires */
399         init_timer(&timer);
400         timer.data = (unsigned long) current;
401         timer.function = mds_timer_run;
402         mds->mds_timer = &timer;
403
404         /* Record that the  thread is running */
405         mds->mds_thread = current;
406         wake_up(&mds->mds_done_waitq); 
407
408         printk(KERN_INFO "lustre_mds starting.  Commit interval %d seconds\n",
409                         mds->mds_interval / HZ);
410
411         /* XXX maintain a list of all managed devices: insert here */
412
413         /* And now, wait forever for commit wakeup events. */
414         while (1) {
415                 struct mds_request *request;
416                 int rc; 
417
418                 if (mds->mds_flags & MDS_UNMOUNT)
419                         break;
420
421
422                 wake_up(&mds->mds_done_waitq);
423                 interruptible_sleep_on(&mds->mds_waitq);
424
425                 CDEBUG(D_INODE, "lustre_mds wakes\n");
426                 CDEBUG(D_INODE, "pick up req here and continue\n"); 
427
428                 if (list_empty(&mds->mds_reqs)) { 
429                         CDEBUG(D_INODE, "woke because of timer\n"); 
430                 } else { 
431                         request = list_entry(mds->mds_reqs.next, 
432                                              struct mds_request, rq_list);
433                         list_del(&request->rq_list);
434                         rc = mds_handle(request); 
435                 }
436         }
437
438         del_timer_sync(mds->mds_timer);
439
440         /* XXX maintain a list of all managed devices: cleanup here */
441
442         mds->mds_thread = NULL;
443         wake_up(&mds->mds_done_waitq);
444         printk("lustre_mds: exiting\n");
445         return 0;
446 }
447
448 static void mds_stop_srv_thread(struct mds_obd *mds)
449 {
450         mds->mds_flags |= MDS_UNMOUNT;
451
452         while (mds->mds_thread) {
453                 wake_up(&mds->mds_waitq);
454                 sleep_on(&mds->mds_done_waitq);
455         }
456 }
457
458 static void mds_start_srv_thread(struct mds_obd *mds)
459 {
460         init_waitqueue_head(&mds->mds_waitq);
461         init_waitqueue_head(&mds->mds_done_waitq);
462         kernel_thread(mds_main, (void *)mds, 
463                       CLONE_VM | CLONE_FS | CLONE_FILES);
464         while (!mds->mds_thread) 
465                 sleep_on(&mds->mds_done_waitq);
466 }
467
468 /* mount the file system (secretly) */
469 static int mds_setup(struct obd_device *obddev, obd_count len,
470                         void *buf)
471                         
472 {
473         struct obd_ioctl_data* data = buf;
474         struct mds_obd *mds = &obddev->u.mds;
475         struct vfsmount *mnt;
476         int err; 
477         ENTRY;
478         
479         mnt = do_kern_mount(data->ioc_inlbuf2, 0, 
480                             data->ioc_inlbuf1, NULL); 
481         err = PTR_ERR(mnt);
482         if (IS_ERR(mnt)) { 
483                 EXIT;
484                 return err;
485         }
486
487         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
488         if (!obddev->u.mds.mds_sb) {
489                 EXIT;
490                 return -ENODEV;
491         }
492
493         INIT_LIST_HEAD(&mds->mds_reqs);
494         mds->mds_thread = NULL;
495         mds->mds_flags = 0;
496         mds->mds_interval = 3 * HZ;
497         mds->mds_vfsmnt = mnt;
498         obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2);
499
500         mds->mds_ctxt.pwdmnt = mnt;
501         mds->mds_ctxt.pwd = mnt->mnt_root;
502         mds->mds_ctxt.fs = KERNEL_DS;
503         MDS = mds;
504
505         spin_lock_init(&obddev->u.mds.mds_lock);
506
507         mds_start_srv_thread(mds);
508
509         MOD_INC_USE_COUNT;
510         EXIT; 
511         return 0;
512
513
514 static int mds_cleanup(struct obd_device * obddev)
515 {
516         struct super_block *sb;
517         struct mds_obd *mds = &obddev->u.mds;
518
519         ENTRY;
520
521         if ( !(obddev->obd_flags & OBD_SET_UP) ) {
522                 EXIT;
523                 return 0;
524         }
525
526         if ( !list_empty(&obddev->obd_gen_clients) ) {
527                 printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");
528                 EXIT;
529                 return -EBUSY;
530         }
531
532         MDS = NULL;
533         mds_stop_srv_thread(mds);
534         sb = mds->mds_sb;
535         if (!mds->mds_sb){
536                 EXIT;
537                 return 0;
538         }
539
540         if (!list_empty(&mds->mds_reqs)) {
541                 // XXX reply with errors and clean up
542                 CDEBUG(D_INODE, "Request list not empty!\n");
543         }
544
545         unlock_kernel();
546         mntput(mds->mds_vfsmnt); 
547         mds->mds_sb = 0;
548         kfree(mds->mds_fstype);
549         lock_kernel();
550         
551
552         MOD_DEC_USE_COUNT;
553         EXIT;
554         return 0;
555 }
556
557 /* use obd ops to offer management infrastructure */
558 static struct obd_ops mds_obd_ops = {
559         o_setup:       mds_setup,
560         o_cleanup:     mds_cleanup,
561 };
562
563 static int __init mds_init(void)
564 {
565         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
566         return 0;
567 }
568
569 static void __exit mds_exit(void)
570 {
571         obd_unregister_type(LUSTRE_MDS_NAME);
572 }
573
574 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
575 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
576 MODULE_LICENSE("GPL");
577
578
579 // for testing (maybe this stays)
580 EXPORT_SYMBOL(mds_queue_req);
581
582 module_init(mds_init);
583 module_exit(mds_exit);