Whamcloud - gitweb
3585f8c3d424bc150b5bbf7352c2fc997d7f3970
[fs/lustre-release.git] / lustre / mds / handler.c
1 /*
2  *  linux/mds/handler.c
3  *  
4  *  Lustre Metadata Server (mds) request handler
5  * 
6  *  Copyright (C) 2001  Cluster File Systems, Inc.
7  *
8  *  This code is issued under the GNU General Public License.
9  *  See the file COPYING in this distribution
10  *
11  *  by Peter Braam <braam@clusterfs.com>
12  * 
13  *  This server is single threaded at present (but can easily be multi threaded). 
14  * 
15  */
16
17
18 #define EXPORT_SYMTAB
19
20 #include <linux/version.h>
21 #include <linux/module.h>
22 #include <linux/fs.h>
23 #include <linux/stat.h>
24 #include <linux/locks.h>
25 #include <linux/ext2_fs.h>
26 #include <linux/quotaops.h>
27 #include <asm/unistd.h>
28 #include <asm/uaccess.h>
29 #include <linux/obd_support.h>
30 #include <linux/obd.h>
31 #include <linux/lustre_lib.h>
32 #include <linux/lustre_idl.h>
33 #include <linux/lustre_mds.h>
34 #include <linux/obd_class.h>
35
36 // for testing
37 static struct mds_obd *MDS;
38
39 // XXX make this networked!  
40 static int mds_queue_req(struct mds_request *req)
41 {
42         struct mds_request *srv_req;
43         
44         if (!MDS) { 
45                 EXIT;
46                 return -1;
47         }
48
49         srv_req = kmalloc(sizeof(*srv_req), GFP_KERNEL);
50         if (!srv_req) { 
51                 EXIT;
52                 return -ENOMEM;
53         }
54
55         printk("---> MDS at %d %p, incoming req %p, srv_req %p\n", 
56                __LINE__, MDS, req, srv_req);
57
58         memset(srv_req, 0, sizeof(*req)); 
59
60         /* move the request buffer */
61         srv_req->rq_reqbuf = req->rq_reqbuf;
62         srv_req->rq_reqlen    = req->rq_reqlen;
63         srv_req->rq_obd = MDS;
64
65         /* remember where it came from */
66         srv_req->rq_reply_handle = req;
67
68         list_add(&srv_req->rq_list, &MDS->mds_reqs); 
69         wake_up(&MDS->mds_waitq);
70         return 0;
71 }
72
73 /* XXX do this over the net */
74 int mds_sendpage(struct mds_request *req, struct file *file, 
75                     __u64 offset, struct niobuf *dst)
76 {
77         int rc; 
78
79         rc = generic_file_read(file, (char *)(long)dst->addr, 
80                               PAGE_SIZE, &offset); 
81
82         if (rc != PAGE_SIZE) 
83                 return -EIO;
84         return 0;
85 }
86
87 /* XXX replace with networking code */
88 int mds_reply(struct mds_request *req)
89 {
90         struct mds_request *clnt_req = req->rq_reply_handle;
91
92         ENTRY;
93
94         /* free the request buffer */
95         kfree(req->rq_reqbuf);
96         req->rq_reqbuf = NULL; 
97         
98         /* move the reply to the client */ 
99         clnt_req->rq_replen = req->rq_replen;
100         clnt_req->rq_repbuf = req->rq_repbuf;
101         req->rq_repbuf = NULL;
102         req->rq_replen = 0;
103
104         /* wake up the client */ 
105         wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
106         EXIT;
107         return 0;
108 }
109
110 int mds_error(struct mds_request *req)
111 {
112         struct mds_rep_hdr *hdr;
113
114         ENTRY;
115         hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
116         if (!hdr) { 
117                 EXIT;
118                 return -ENOMEM;
119         }
120
121         memset(hdr, 0, sizeof(*hdr));
122         
123         hdr->seqno = req->rq_reqhdr->seqno;
124         hdr->status = req->rq_status; 
125         hdr->type = MDS_TYPE_ERR;
126
127         req->rq_repbuf = (char *)hdr;
128         req->rq_replen = sizeof(*hdr); 
129
130         EXIT;
131         return mds_reply(req);
132 }
133
134 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt)
135 {
136
137         /* iget isn't really right if the inode is currently unallocated!!
138          * This should really all be done inside each filesystem
139          *
140          * ext2fs' read_inode has been strengthed to return a bad_inode if the inode
141          *   had been deleted.
142          *
143          * Currently we don't know the generation for parent directory, so a generation
144          * of 0 means "accept any"
145          */
146         struct super_block *sb = mds->mds_sb; 
147         unsigned long ino = fid->id;
148         //__u32 generation = fid->generation;
149         __u32 generation = 0;
150         struct inode *inode;
151         struct list_head *lp;
152         struct dentry *result;
153
154         if (mnt) { 
155                 *mnt = mntget(mds->mds_vfsmnt);
156         }
157
158         if (ino == 0)
159                 return ERR_PTR(-ESTALE);
160         inode = iget(sb, ino);
161         if (inode == NULL)
162                 return ERR_PTR(-ENOMEM);
163
164         printk("--> mds_fid2dentry: sb %p\n", inode->i_sb); 
165
166         if (is_bad_inode(inode)
167             || (generation && inode->i_generation != generation)
168                 ) {
169                 /* we didn't find the right inode.. */
170                 printk("mds_fid2dentry: Inode %lu, Bad count: %d %d or version  %u %u\n",
171                         inode->i_ino,
172                         inode->i_nlink, atomic_read(&inode->i_count),
173                         inode->i_generation,
174                         generation);
175
176                 iput(inode);
177                 return ERR_PTR(-ESTALE);
178         }
179         /* now to find a dentry.
180          * If possible, get a well-connected one
181          */
182         spin_lock(&dcache_lock);
183         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
184                 result = list_entry(lp,struct dentry, d_alias);
185                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
186                         dget_locked(result);
187                         result->d_vfs_flags |= DCACHE_REFERENCED;
188                         spin_unlock(&dcache_lock);
189                         iput(inode);
190                         return result;
191                 }
192         }
193         spin_unlock(&dcache_lock);
194         result = d_alloc_root(inode);
195         if (result == NULL) {
196                 iput(inode);
197                 return ERR_PTR(-ENOMEM);
198         }
199         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
200         return result;
201 }
202
203 int mds_getattr(struct mds_request *req)
204 {
205         struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req->fid1, 
206                                            NULL);
207         struct inode *inode;
208         struct mds_rep *rep;
209         int rc;
210         
211         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
212                           &req->rq_replen, &req->rq_repbuf);
213         if (rc) { 
214                 EXIT;
215                 printk("mds: out of memory\n");
216                 req->rq_status = -ENOMEM;
217                 return -ENOMEM;
218         }
219
220         req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
221         rep = req->rq_rep;
222
223         if (!de) { 
224                 EXIT;
225                 req->rq_rephdr->status = -ENOENT;
226                 return 0;
227         }
228
229         inode = de->d_inode;
230         rep->ino = inode->i_ino;
231         rep->atime = inode->i_atime;
232         rep->ctime = inode->i_ctime;
233         rep->mtime = inode->i_mtime;
234         rep->uid = inode->i_uid;
235         rep->gid = inode->i_gid;
236         rep->size = inode->i_size;
237         rep->mode = inode->i_mode;
238         rep->valid = ~0;
239
240         dput(de); 
241         return 0;
242 }
243
244 int mds_readpage(struct mds_request *req)
245 {
246         struct vfsmount *mnt;
247         struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req->fid1, 
248                                            &mnt);
249         struct file *file; 
250         struct niobuf *niobuf; 
251         struct mds_rep *rep;
252         int rc;
253         
254         printk("mds_readpage: ino %ld\n", de->d_inode->i_ino);
255         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
256                           &req->rq_replen, &req->rq_repbuf);
257         if (rc) { 
258                 EXIT;
259                 printk("mds: out of memory\n");
260                 req->rq_status = -ENOMEM;
261                 return -ENOMEM;
262         }
263
264         req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
265         rep = req->rq_rep;
266
267         if (IS_ERR(de)) { 
268                 EXIT;
269                 req->rq_rephdr->status = PTR_ERR(de); 
270                 return 0;
271         }
272
273         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE); 
274         /* note: in case of an error, dentry_open puts dentry */
275         if (IS_ERR(file)) { 
276                 EXIT;
277                 req->rq_rephdr->status = PTR_ERR(file);
278                 return 0;
279         }
280                 
281         niobuf = mds_req_tgt(req->rq_req);
282
283         /* to make this asynchronous make sure that the handling function 
284            doesn't send a reply when this function completes. Instead a 
285            callback function would send the reply */ 
286         rc = mds_sendpage(req, file, req->rq_req->size, niobuf); 
287
288         filp_close(file, 0);
289         req->rq_rephdr->status = rc;
290         EXIT;
291         return 0;
292 }
293
294 int mds_reint(struct mds_request *req)
295 {
296         int opc = NTOH__u32(req->rq_req->opcode);
297         
298         switch (opc) { 
299         case REINT_SETATTR: 
300                 return mds_reint_setattr(req);
301         default: 
302                 printk(__FUNCTION__ "opcode %d not handled.\n", opc); 
303                 return -EINVAL;
304         }
305
306         return 0; 
307 }
308
309 //int mds_handle(struct mds_conn *conn, int len, char *buf)
310 int mds_handle(struct mds_request *req)
311 {
312         int rc;
313         struct mds_req_hdr *hdr;
314
315         ENTRY;
316
317         hdr = (struct mds_req_hdr *)req->rq_reqbuf;
318
319         if (NTOH__u32(hdr->type) != MDS_TYPE_REQ) {
320                 printk("lustre_mds: wrong packet type sent %d\n",
321                        NTOH__u32(hdr->type));
322                 rc = -EINVAL;
323                 goto out;
324         }
325
326         rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
327                             &req->rq_reqhdr, &req->rq_req);
328         if (rc) { 
329                 printk("lustre_mds: Invalid request\n");
330                 EXIT; 
331                 goto out;
332         }
333
334         switch (req->rq_reqhdr->opc) { 
335
336         case MDS_GETATTR:
337                 CDEBUG(D_INODE, "getattr\n");
338                 rc = mds_getattr(req);
339                 break;
340
341         case MDS_READPAGE:
342                 CDEBUG(D_INODE, "readpage\n");
343                 rc = mds_readpage(req);
344                 break;
345
346         case MDS_REINT:
347                 CDEBUG(D_INODE, "reint\n");
348                 rc = mds_reint(req);
349                 break;
350
351         default:
352                 return mds_error(req);
353         }
354
355 out:
356         if (rc) { 
357                 printk("mds: processing error %d\n", rc);
358                 mds_error(req);
359         } else { 
360                 CDEBUG(D_INODE, "sending reply\n"); 
361                 mds_reply(req); 
362         }
363
364         return 0;
365 }
366
367
368 static void mds_timer_run(unsigned long __data)
369 {
370         struct task_struct * p = (struct task_struct *) __data;
371
372         wake_up_process(p);
373 }
374
375 int mds_main(void *arg)
376 {
377         struct mds_obd *mds = (struct mds_obd *) arg;
378         struct timer_list timer;
379
380         lock_kernel();
381         daemonize();
382         spin_lock_irq(&current->sigmask_lock);
383         sigfillset(&current->blocked);
384         recalc_sigpending(current);
385         spin_unlock_irq(&current->sigmask_lock);
386
387         sprintf(current->comm, "lustre_mds");
388
389         /* Set up an interval timer which can be used to trigger a
390            wakeup after the interval expires */
391         init_timer(&timer);
392         timer.data = (unsigned long) current;
393         timer.function = mds_timer_run;
394         mds->mds_timer = &timer;
395
396         /* Record that the  thread is running */
397         mds->mds_thread = current;
398         wake_up(&mds->mds_done_waitq); 
399
400         printk(KERN_INFO "lustre_mds starting.  Commit interval %d seconds\n",
401                         mds->mds_interval / HZ);
402
403         /* XXX maintain a list of all managed devices: insert here */
404
405         /* And now, wait forever for commit wakeup events. */
406         while (1) {
407                 struct mds_request *request;
408                 int rc; 
409
410                 if (mds->mds_flags & MDS_UNMOUNT)
411                         break;
412
413
414                 wake_up(&mds->mds_done_waitq);
415                 interruptible_sleep_on(&mds->mds_waitq);
416
417                 CDEBUG(D_INODE, "lustre_mds wakes\n");
418                 CDEBUG(D_INODE, "pick up req here and continue\n"); 
419
420                 if (list_empty(&mds->mds_reqs)) { 
421                         CDEBUG(D_INODE, "woke because of timer\n"); 
422                 } else { 
423                         request = list_entry(mds->mds_reqs.next, 
424                                              struct mds_request, rq_list);
425                         list_del(&request->rq_list);
426                         rc = mds_handle(request); 
427                 }
428         }
429
430         del_timer_sync(mds->mds_timer);
431
432         /* XXX maintain a list of all managed devices: cleanup here */
433
434         mds->mds_thread = NULL;
435         wake_up(&mds->mds_done_waitq);
436         printk("lustre_mds: exiting\n");
437         return 0;
438 }
439
440 static void mds_stop_srv_thread(struct mds_obd *mds)
441 {
442         mds->mds_flags |= MDS_UNMOUNT;
443
444         while (mds->mds_thread) {
445                 wake_up(&mds->mds_waitq);
446                 sleep_on(&mds->mds_done_waitq);
447         }
448 }
449
450 static void mds_start_srv_thread(struct mds_obd *mds)
451 {
452         init_waitqueue_head(&mds->mds_waitq);
453         init_waitqueue_head(&mds->mds_done_waitq);
454         kernel_thread(mds_main, (void *)mds, 
455                       CLONE_VM | CLONE_FS | CLONE_FILES);
456         while (!mds->mds_thread) 
457                 sleep_on(&mds->mds_done_waitq);
458 }
459
460 /* mount the file system (secretly) */
461 static int mds_setup(struct obd_device *obddev, obd_count len,
462                         void *buf)
463                         
464 {
465         struct obd_ioctl_data* data = buf;
466         struct mds_obd *mds = &obddev->u.mds;
467         struct vfsmount *mnt;
468         int err; 
469         ENTRY;
470         
471         mnt = do_kern_mount(data->ioc_inlbuf2, 0, 
472                             data->ioc_inlbuf1, NULL); 
473         err = PTR_ERR(mnt);
474         if (IS_ERR(mnt)) { 
475                 EXIT;
476                 return err;
477         }
478
479         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
480         if (!obddev->u.mds.mds_sb) {
481                 EXIT;
482                 return -ENODEV;
483         }
484
485         INIT_LIST_HEAD(&mds->mds_reqs);
486         mds->mds_thread = NULL;
487         mds->mds_flags = 0;
488         mds->mds_interval = 3 * HZ;
489         mds->mds_vfsmnt = mnt;
490         obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2);
491
492         mds->mds_ctxt.pwdmnt = mnt;
493         mds->mds_ctxt.pwd = mnt->mnt_root;
494         mds->mds_ctxt.fs = KERNEL_DS;
495         MDS = mds;
496
497         spin_lock_init(&obddev->u.mds.mds_lock);
498
499         mds_start_srv_thread(mds);
500
501         MOD_INC_USE_COUNT;
502         EXIT; 
503         return 0;
504
505
506 static int mds_cleanup(struct obd_device * obddev)
507 {
508         struct super_block *sb;
509         struct mds_obd *mds = &obddev->u.mds;
510
511         ENTRY;
512
513         if ( !(obddev->obd_flags & OBD_SET_UP) ) {
514                 EXIT;
515                 return 0;
516         }
517
518         if ( !list_empty(&obddev->obd_gen_clients) ) {
519                 printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");
520                 EXIT;
521                 return -EBUSY;
522         }
523
524         MDS = NULL;
525         mds_stop_srv_thread(mds);
526         sb = mds->mds_sb;
527         if (!mds->mds_sb){
528                 EXIT;
529                 return 0;
530         }
531
532         if (!list_empty(&mds->mds_reqs)) {
533                 // XXX reply with errors and clean up
534                 CDEBUG(D_INODE, "Request list not empty!\n");
535         }
536
537         unlock_kernel();
538         mntput(mds->mds_vfsmnt); 
539         mds->mds_sb = 0;
540         kfree(mds->mds_fstype);
541         lock_kernel();
542         
543
544         MOD_DEC_USE_COUNT;
545         EXIT;
546         return 0;
547 }
548
549 /* use obd ops to offer management infrastructure */
550 static struct obd_ops mds_obd_ops = {
551         o_setup:       mds_setup,
552         o_cleanup:     mds_cleanup,
553 };
554
555 static int __init mds_init(void)
556 {
557         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
558         return 0;
559 }
560
561 static void __exit mds_exit(void)
562 {
563         obd_unregister_type(LUSTRE_MDS_NAME);
564 }
565
566 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
567 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
568 MODULE_LICENSE("GPL");
569
570
571 // for testing (maybe this stays)
572 EXPORT_SYMBOL(mds_queue_req);
573
574 module_init(mds_init);
575 module_exit(mds_exit);