Whamcloud - gitweb
Changes for file creation and small fixes.
[fs/lustre-release.git] / lustre / mds / handler.c
1 /*
2  *  linux/mds/handler.c
3  *  
4  *  Lustre Metadata Server (mds) request handler
5  * 
6  *  Copyright (C) 2001  Cluster File Systems, Inc.
7  *
8  *  This code is issued under the GNU General Public License.
9  *  See the file COPYING in this distribution
10  *
11  *  by Peter Braam <braam@clusterfs.com>
12  * 
13  *  This server is single threaded at present (but can easily be multi threaded). 
14  * 
15  */
16
17
18 #define EXPORT_SYMTAB
19
20 #include <linux/version.h>
21 #include <linux/module.h>
22 #include <linux/fs.h>
23 #include <linux/stat.h>
24 #include <linux/locks.h>
25 #include <linux/ext2_fs.h>
26 #include <linux/quotaops.h>
27 #include <asm/unistd.h>
28 #include <asm/uaccess.h>
29 #include <linux/obd_support.h>
30 #include <linux/obd.h>
31 #include <linux/lustre_lib.h>
32 #include <linux/lustre_idl.h>
33 #include <linux/lustre_mds.h>
34 #include <linux/obd_class.h>
35
36 // XXX for testing
37 static struct mds_obd *MDS;
38
39 // XXX make this networked!  
40 static int mds_queue_req(struct mds_request *req)
41 {
42         struct mds_request *srv_req;
43         
44         if (!MDS) { 
45                 EXIT;
46                 return -1;
47         }
48
49         srv_req = kmalloc(sizeof(*srv_req), GFP_KERNEL);
50         if (!srv_req) { 
51                 EXIT;
52                 return -ENOMEM;
53         }
54
55         printk("---> MDS at %d %p, incoming req %p, srv_req %p\n", 
56                __LINE__, MDS, req, srv_req);
57
58         memset(srv_req, 0, sizeof(*req)); 
59
60         /* move the request buffer */
61         srv_req->rq_reqbuf = req->rq_reqbuf;
62         srv_req->rq_reqlen    = req->rq_reqlen;
63         srv_req->rq_obd = MDS;
64
65         /* remember where it came from */
66         srv_req->rq_reply_handle = req;
67
68         list_add(&srv_req->rq_list, &MDS->mds_reqs); 
69         wake_up(&MDS->mds_waitq);
70         return 0;
71 }
72
73 /* XXX do this over the net */
74 int mds_sendpage(struct mds_request *req, struct file *file, 
75                     __u64 offset, struct niobuf *dst)
76 {
77         int rc; 
78         mm_segment_t oldfs = get_fs();
79         /* dst->addr is a user address, but in a different task! */
80         set_fs(KERNEL_DS); 
81         rc = generic_file_read(file, (char *)(long)dst->addr, 
82                               PAGE_SIZE, &offset); 
83         set_fs(oldfs);
84
85         if (rc != PAGE_SIZE) 
86                 return -EIO;
87         return 0;
88 }
89
90 /* XXX replace with networking code */
91 int mds_reply(struct mds_request *req)
92 {
93         struct mds_request *clnt_req = req->rq_reply_handle;
94
95         ENTRY;
96
97         /* free the request buffer */
98         kfree(req->rq_reqbuf);
99         req->rq_reqbuf = NULL; 
100         
101         /* move the reply to the client */ 
102         clnt_req->rq_replen = req->rq_replen;
103         clnt_req->rq_repbuf = req->rq_repbuf;
104         req->rq_repbuf = NULL;
105         req->rq_replen = 0;
106
107         /* wake up the client */ 
108         wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
109         EXIT;
110         return 0;
111 }
112
113 int mds_error(struct mds_request *req)
114 {
115         struct mds_rep_hdr *hdr;
116
117         ENTRY;
118         hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
119         if (!hdr) { 
120                 EXIT;
121                 return -ENOMEM;
122         }
123
124         memset(hdr, 0, sizeof(*hdr));
125         
126         hdr->seqno = req->rq_reqhdr->seqno;
127         hdr->status = req->rq_status; 
128         hdr->type = MDS_TYPE_ERR;
129
130         req->rq_repbuf = (char *)hdr;
131         req->rq_replen = sizeof(*hdr); 
132
133         EXIT;
134         return mds_reply(req);
135 }
136
137 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt)
138 {
139         /* stolen from NFS */ 
140         struct super_block *sb = mds->mds_sb; 
141         unsigned long ino = fid->id;
142         //__u32 generation = fid->generation;
143         __u32 generation = 0;
144         struct inode *inode;
145         struct list_head *lp;
146         struct dentry *result;
147
148         if (mnt) { 
149                 *mnt = mntget(mds->mds_vfsmnt);
150         }
151
152         if (ino == 0)
153                 return ERR_PTR(-ESTALE);
154
155         inode = iget(sb, ino);
156         if (inode == NULL)
157                 return ERR_PTR(-ENOMEM);
158
159         printk("--> mds_fid2dentry: sb %p\n", inode->i_sb); 
160
161         if (is_bad_inode(inode)
162             || (generation && inode->i_generation != generation)
163                 ) {
164                 /* we didn't find the right inode.. */
165                 printk(__FUNCTION__ 
166                        "bad inode %lu, link: %d ct: %d or version  %u/%u\n",
167                         inode->i_ino,
168                         inode->i_nlink, atomic_read(&inode->i_count),
169                         inode->i_generation,
170                         generation);
171                 iput(inode);
172                 return ERR_PTR(-ESTALE);
173         }
174
175         /* now to find a dentry.
176          * If possible, get a well-connected one
177          */
178         spin_lock(&dcache_lock);
179         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
180                 result = list_entry(lp,struct dentry, d_alias);
181                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
182                         dget_locked(result);
183                         result->d_vfs_flags |= DCACHE_REFERENCED;
184                         spin_unlock(&dcache_lock);
185                         iput(inode);
186                         return result;
187                 }
188         }
189         spin_unlock(&dcache_lock);
190         result = d_alloc_root(inode);
191         if (result == NULL) {
192                 iput(inode);
193                 return ERR_PTR(-ENOMEM);
194         }
195         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
196         return result;
197 }
198
199 static inline void mds_get_objid(struct inode *inode, __u64 *id)
200 {
201         memcpy(id, &inode->u.ext2_i.i_data, sizeof(*id));
202 }
203
204 int mds_getattr(struct mds_request *req)
205 {
206         struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req->fid1, 
207                                            NULL);
208         struct inode *inode;
209         struct mds_rep *rep;
210         int rc;
211         
212         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
213                           &req->rq_replen, &req->rq_repbuf);
214         if (rc) { 
215                 EXIT;
216                 printk("mds: out of memory\n");
217                 req->rq_status = -ENOMEM;
218                 return -ENOMEM;
219         }
220
221         req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
222         rep = req->rq_rep;
223
224         if (!de) { 
225                 EXIT;
226                 req->rq_rephdr->status = -ENOENT;
227                 return 0;
228         }
229
230         inode = de->d_inode;
231         rep->ino = inode->i_ino;
232         rep->atime = inode->i_atime;
233         rep->ctime = inode->i_ctime;
234         rep->mtime = inode->i_mtime;
235         rep->uid = inode->i_uid;
236         rep->gid = inode->i_gid;
237         rep->size = inode->i_size;
238         rep->mode = inode->i_mode;
239         rep->nlink = inode->i_nlink;
240         rep->valid = ~0;
241         mds_get_objid(inode, &rep->objid);
242         dput(de); 
243         return 0;
244 }
245
246 int mds_readpage(struct mds_request *req)
247 {
248         struct vfsmount *mnt;
249         struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req->fid1, 
250                                            &mnt);
251         struct file *file; 
252         struct niobuf *niobuf; 
253         struct mds_rep *rep;
254         int rc;
255         
256         printk("mds_readpage: ino %ld\n", de->d_inode->i_ino);
257         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
258                           &req->rq_replen, &req->rq_repbuf);
259         if (rc) { 
260                 EXIT;
261                 printk("mds: out of memory\n");
262                 req->rq_status = -ENOMEM;
263                 return -ENOMEM;
264         }
265
266         req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
267         rep = req->rq_rep;
268
269         if (IS_ERR(de)) { 
270                 EXIT;
271                 req->rq_rephdr->status = PTR_ERR(de); 
272                 return 0;
273         }
274
275         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE); 
276         /* note: in case of an error, dentry_open puts dentry */
277         if (IS_ERR(file)) { 
278                 EXIT;
279                 req->rq_rephdr->status = PTR_ERR(file);
280                 return 0;
281         }
282                 
283         niobuf = mds_req_tgt(req->rq_req);
284
285         /* to make this asynchronous make sure that the handling function 
286            doesn't send a reply when this function completes. Instead a 
287            callback function would send the reply */ 
288         rc = mds_sendpage(req, file, req->rq_req->size, niobuf); 
289
290         filp_close(file, 0);
291         req->rq_rephdr->status = rc;
292         EXIT;
293         return 0;
294 }
295
296 int mds_reint(struct mds_request *req)
297 {
298         int rc;
299         char *buf = mds_req_tgt(req->rq_req);
300         int len = req->rq_req->tgtlen;
301         struct mds_update_record rec;
302         
303         rc = mds_update_unpack(buf, len, &rec);
304         if (rc) { 
305                 printk(__FUNCTION__ ": invalid record\n");
306                 return -EINVAL;
307         }
308
309         rc = mds_reint_rec(&rec, req); 
310         return 0; 
311 }
312
313 //int mds_handle(struct mds_conn *conn, int len, char *buf)
314 int mds_handle(struct mds_request *req)
315 {
316         int rc;
317         struct mds_req_hdr *hdr;
318
319         ENTRY;
320
321         hdr = (struct mds_req_hdr *)req->rq_reqbuf;
322
323         if (NTOH__u32(hdr->type) != MDS_TYPE_REQ) {
324                 printk("lustre_mds: wrong packet type sent %d\n",
325                        NTOH__u32(hdr->type));
326                 rc = -EINVAL;
327                 goto out;
328         }
329
330         rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
331                             &req->rq_reqhdr, &req->rq_req);
332         if (rc) { 
333                 printk("lustre_mds: Invalid request\n");
334                 EXIT; 
335                 goto out;
336         }
337
338         switch (req->rq_reqhdr->opc) { 
339
340         case MDS_GETATTR:
341                 CDEBUG(D_INODE, "getattr\n");
342                 rc = mds_getattr(req);
343                 break;
344
345         case MDS_READPAGE:
346                 CDEBUG(D_INODE, "readpage\n");
347                 rc = mds_readpage(req);
348                 break;
349
350         case MDS_REINT:
351                 CDEBUG(D_INODE, "reint\n");
352                 rc = mds_reint(req);
353                 break;
354
355         default:
356                 return mds_error(req);
357         }
358
359 out:
360         if (rc) { 
361                 printk("mds: processing error %d\n", rc);
362                 mds_error(req);
363         } else { 
364                 CDEBUG(D_INODE, "sending reply\n"); 
365                 mds_reply(req); 
366         }
367
368         return 0;
369 }
370
371
372 static void mds_timer_run(unsigned long __data)
373 {
374         struct task_struct * p = (struct task_struct *) __data;
375
376         wake_up_process(p);
377 }
378
379 int mds_main(void *arg)
380 {
381         struct mds_obd *mds = (struct mds_obd *) arg;
382         struct timer_list timer;
383
384         lock_kernel();
385         daemonize();
386         spin_lock_irq(&current->sigmask_lock);
387         sigfillset(&current->blocked);
388         recalc_sigpending(current);
389         spin_unlock_irq(&current->sigmask_lock);
390
391         sprintf(current->comm, "lustre_mds");
392
393         /* Set up an interval timer which can be used to trigger a
394            wakeup after the interval expires */
395         init_timer(&timer);
396         timer.data = (unsigned long) current;
397         timer.function = mds_timer_run;
398         mds->mds_timer = &timer;
399
400         /* Record that the  thread is running */
401         mds->mds_thread = current;
402         wake_up(&mds->mds_done_waitq); 
403
404         printk(KERN_INFO "lustre_mds starting.  Commit interval %d seconds\n",
405                         mds->mds_interval / HZ);
406
407         /* XXX maintain a list of all managed devices: insert here */
408
409         /* And now, wait forever for commit wakeup events. */
410         while (1) {
411                 struct mds_request *request;
412                 int rc; 
413
414                 if (mds->mds_flags & MDS_UNMOUNT)
415                         break;
416
417
418                 wake_up(&mds->mds_done_waitq);
419                 interruptible_sleep_on(&mds->mds_waitq);
420
421                 CDEBUG(D_INODE, "lustre_mds wakes\n");
422                 CDEBUG(D_INODE, "pick up req here and continue\n"); 
423
424                 if (list_empty(&mds->mds_reqs)) { 
425                         CDEBUG(D_INODE, "woke because of timer\n"); 
426                 } else { 
427                         request = list_entry(mds->mds_reqs.next, 
428                                              struct mds_request, rq_list);
429                         list_del(&request->rq_list);
430                         rc = mds_handle(request); 
431                 }
432         }
433
434         del_timer_sync(mds->mds_timer);
435
436         /* XXX maintain a list of all managed devices: cleanup here */
437
438         mds->mds_thread = NULL;
439         wake_up(&mds->mds_done_waitq);
440         printk("lustre_mds: exiting\n");
441         return 0;
442 }
443
444 static void mds_stop_srv_thread(struct mds_obd *mds)
445 {
446         mds->mds_flags |= MDS_UNMOUNT;
447
448         while (mds->mds_thread) {
449                 wake_up(&mds->mds_waitq);
450                 sleep_on(&mds->mds_done_waitq);
451         }
452 }
453
454 static void mds_start_srv_thread(struct mds_obd *mds)
455 {
456         init_waitqueue_head(&mds->mds_waitq);
457         init_waitqueue_head(&mds->mds_done_waitq);
458         kernel_thread(mds_main, (void *)mds, 
459                       CLONE_VM | CLONE_FS | CLONE_FILES);
460         while (!mds->mds_thread) 
461                 sleep_on(&mds->mds_done_waitq);
462 }
463
464 /* mount the file system (secretly) */
465 static int mds_setup(struct obd_device *obddev, obd_count len,
466                         void *buf)
467                         
468 {
469         struct obd_ioctl_data* data = buf;
470         struct mds_obd *mds = &obddev->u.mds;
471         struct vfsmount *mnt;
472         int err; 
473         ENTRY;
474         
475         mnt = do_kern_mount(data->ioc_inlbuf2, 0, 
476                             data->ioc_inlbuf1, NULL); 
477         err = PTR_ERR(mnt);
478         if (IS_ERR(mnt)) { 
479                 EXIT;
480                 return err;
481         }
482
483         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
484         if (!obddev->u.mds.mds_sb) {
485                 EXIT;
486                 return -ENODEV;
487         }
488
489         INIT_LIST_HEAD(&mds->mds_reqs);
490         mds->mds_thread = NULL;
491         mds->mds_flags = 0;
492         mds->mds_interval = 3 * HZ;
493         mds->mds_vfsmnt = mnt;
494         obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2);
495
496         mds->mds_ctxt.pwdmnt = mnt;
497         mds->mds_ctxt.pwd = mnt->mnt_root;
498         mds->mds_ctxt.fs = KERNEL_DS;
499         MDS = mds;
500
501         spin_lock_init(&obddev->u.mds.mds_lock);
502
503         mds_start_srv_thread(mds);
504
505         MOD_INC_USE_COUNT;
506         EXIT; 
507         return 0;
508
509
510 static int mds_cleanup(struct obd_device * obddev)
511 {
512         struct super_block *sb;
513         struct mds_obd *mds = &obddev->u.mds;
514
515         ENTRY;
516
517         if ( !(obddev->obd_flags & OBD_SET_UP) ) {
518                 EXIT;
519                 return 0;
520         }
521
522         if ( !list_empty(&obddev->obd_gen_clients) ) {
523                 printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");
524                 EXIT;
525                 return -EBUSY;
526         }
527
528         MDS = NULL;
529         mds_stop_srv_thread(mds);
530         sb = mds->mds_sb;
531         if (!mds->mds_sb){
532                 EXIT;
533                 return 0;
534         }
535
536         if (!list_empty(&mds->mds_reqs)) {
537                 // XXX reply with errors and clean up
538                 CDEBUG(D_INODE, "Request list not empty!\n");
539         }
540
541         unlock_kernel();
542         mntput(mds->mds_vfsmnt); 
543         mds->mds_sb = 0;
544         kfree(mds->mds_fstype);
545         lock_kernel();
546         
547
548         MOD_DEC_USE_COUNT;
549         EXIT;
550         return 0;
551 }
552
553 /* use obd ops to offer management infrastructure */
554 static struct obd_ops mds_obd_ops = {
555         o_setup:       mds_setup,
556         o_cleanup:     mds_cleanup,
557 };
558
559 static int __init mds_init(void)
560 {
561         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
562         return 0;
563 }
564
565 static void __exit mds_exit(void)
566 {
567         obd_unregister_type(LUSTRE_MDS_NAME);
568 }
569
570 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
571 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
572 MODULE_LICENSE("GPL");
573
574
575 // for testing (maybe this stays)
576 EXPORT_SYMBOL(mds_queue_req);
577
578 module_init(mds_init);
579 module_exit(mds_exit);