Whamcloud - gitweb
- zero out the request structure after allocation
[fs/lustre-release.git] / lustre / mds / handler.c
1 /*
2  *  linux/mds/handler.c
3  *  
4  *  Lustre Metadata Server (mds) request handler
5  * 
6  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
7  *
8  *  This code is issued under the GNU General Public License.
9  *  See the file COPYING in this distribution
10  *
11  *  by Peter Braam <braam@clusterfs.com>
12  * 
13  *  This server is single threaded at present (but can easily be multi threaded). 
14  * 
15  */
16
17
18 #define EXPORT_SYMTAB
19
20 #include <linux/version.h>
21 #include <linux/module.h>
22 #include <linux/fs.h>
23 #include <linux/stat.h>
24 #include <linux/locks.h>
25 #include <linux/ext2_fs.h>
26 #include <linux/quotaops.h>
27 #include <asm/unistd.h>
28 #include <asm/uaccess.h>
29 #include <linux/obd_support.h>
30 #include <linux/obd.h>
31 #include <linux/lustre_lib.h>
32 #include <linux/lustre_idl.h>
33 #include <linux/lustre_mds.h>
34 #include <linux/lustre_net.h>
35 #include <linux/obd_class.h>
36
/*
 * XXX for testing: the one MDS instance this module serves.
 * Assigned by mds_setup() and reset to NULL by mds_cleanup().
 */
static struct mds_obd *MDS;
39
40 // XXX make this networked!  
41 static int mds_queue_req(struct ptlrpc_request *req)
42 {
43         struct ptlrpc_request *srv_req;
44         
45         if (!MDS) { 
46                 EXIT;
47                 return -1;
48         }
49
50         srv_req = kmalloc(sizeof(*srv_req), GFP_KERNEL);
51         if (!srv_req) { 
52                 EXIT;
53                 return -ENOMEM;
54         }
55
56         printk("---> MDS at %d %p, incoming req %p, srv_req %p\n", 
57                __LINE__, MDS, req, srv_req);
58
59         memset(srv_req, 0, sizeof(*req)); 
60
61         /* move the request buffer */
62         srv_req->rq_reqbuf = req->rq_reqbuf;
63         srv_req->rq_reqlen    = req->rq_reqlen;
64         srv_req->rq_obd = MDS;
65
66         /* remember where it came from */
67         srv_req->rq_reply_handle = req;
68
69         list_add(&srv_req->rq_list, &MDS->mds_reqs); 
70         wake_up(&MDS->mds_waitq);
71         return 0;
72 }
73
74 /* XXX do this over the net */
75 int mds_sendpage(struct ptlrpc_request *req, struct file *file, 
76                     __u64 offset, struct niobuf *dst)
77 {
78         int rc; 
79         mm_segment_t oldfs = get_fs();
80
81         if (req->rq_peer.peer_nid == 0) {
82                 /* dst->addr is a user address, but in a different task! */
83                 set_fs(KERNEL_DS); 
84                 rc = generic_file_read(file, (char *)(long)dst->addr, 
85                                        PAGE_SIZE, &offset); 
86                 set_fs(oldfs);
87
88                 if (rc != PAGE_SIZE) 
89                         return -EIO;
90         } else {
91                 char *buf;
92
93                 buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
94                 if (!buf) {
95                         return -ENOMEM;
96                 }
97
98                 set_fs(KERNEL_DS); 
99                 rc = generic_file_read(file, buf, PAGE_SIZE, &offset); 
100                 set_fs(oldfs);
101
102                 if (rc != PAGE_SIZE) 
103                         return -EIO;
104
105                 req->rq_bulkbuf = buf;
106                 req->rq_bulklen = PAGE_SIZE;
107                 rc = ptl_send_buf(req, &req->rq_peer, MDS_BULK_PORTAL, 0);
108                 init_waitqueue_head(&req->rq_wait_for_bulk);
109                 sleep_on(&req->rq_wait_for_bulk);
110                 kfree(buf);
111                 req->rq_bulklen = 0; /* FIXME: eek. */
112         }
113
114         return 0;
115 }
116
117 /* XXX replace with networking code */
118 int mds_reply(struct ptlrpc_request *req)
119 {
120         struct ptlrpc_request *clnt_req = req->rq_reply_handle;
121
122         ENTRY;
123         
124         if (req->rq_obd->mds_service != NULL) {
125                 /* This is a request that came from the network via portals. */
126
127                 /* FIXME: we need to increment the count of handled events */
128                 ptl_send_buf(req, &req->rq_peer, MDS_REPLY_PORTAL, 0);
129         } else {
130                 /* This is a local request that came from another thread. */
131
132                 /* move the reply to the client */ 
133                 clnt_req->rq_replen = req->rq_replen;
134                 clnt_req->rq_repbuf = req->rq_repbuf;
135                 req->rq_repbuf = NULL;
136                 req->rq_replen = 0;
137
138                 /* free the request buffer */
139                 kfree(req->rq_reqbuf);
140                 req->rq_reqbuf = NULL;
141
142                 /* wake up the client */ 
143                 wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
144         }
145
146         EXIT;
147         return 0;
148 }
149
150 int mds_error(struct ptlrpc_request *req)
151 {
152         struct ptlrep_hdr *hdr;
153
154         ENTRY;
155
156         hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
157         if (!hdr) { 
158                 EXIT;
159                 return -ENOMEM;
160         }
161
162         memset(hdr, 0, sizeof(*hdr));
163         
164         hdr->seqno = req->rq_reqhdr->seqno;
165         hdr->status = req->rq_status; 
166         hdr->type = MDS_TYPE_ERR;
167
168         req->rq_repbuf = (char *)hdr;
169         req->rq_replen = sizeof(*hdr); 
170
171         EXIT;
172         return mds_reply(req);
173 }
174
175 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt)
176 {
177         /* stolen from NFS */ 
178         struct super_block *sb = mds->mds_sb; 
179         unsigned long ino = fid->id;
180         //__u32 generation = fid->generation;
181         __u32 generation = 0;
182         struct inode *inode;
183         struct list_head *lp;
184         struct dentry *result;
185
186         if (mnt) { 
187                 *mnt = mntget(mds->mds_vfsmnt);
188         }
189
190         if (ino == 0)
191                 return ERR_PTR(-ESTALE);
192
193         inode = iget(sb, ino);
194         if (inode == NULL)
195                 return ERR_PTR(-ENOMEM);
196
197         printk("--> mds_fid2dentry: sb %p\n", inode->i_sb); 
198
199         if (is_bad_inode(inode)
200             || (generation && inode->i_generation != generation)
201                 ) {
202                 /* we didn't find the right inode.. */
203                 printk(__FUNCTION__ 
204                        "bad inode %lu, link: %d ct: %d or version  %u/%u\n",
205                         inode->i_ino,
206                         inode->i_nlink, atomic_read(&inode->i_count),
207                         inode->i_generation,
208                         generation);
209                 iput(inode);
210                 return ERR_PTR(-ESTALE);
211         }
212
213         /* now to find a dentry.
214          * If possible, get a well-connected one
215          */
216         spin_lock(&dcache_lock);
217         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
218                 result = list_entry(lp,struct dentry, d_alias);
219                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
220                         dget_locked(result);
221                         result->d_vfs_flags |= DCACHE_REFERENCED;
222                         spin_unlock(&dcache_lock);
223                         iput(inode);
224                         return result;
225                 }
226         }
227         spin_unlock(&dcache_lock);
228         result = d_alloc_root(inode);
229         if (result == NULL) {
230                 iput(inode);
231                 return ERR_PTR(-ENOMEM);
232         }
233         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
234         return result;
235 }
236
237 static inline void mds_get_objid(struct inode *inode, __u64 *id)
238 {
239         memcpy(id, &inode->u.ext2_i.i_data, sizeof(*id));
240 }
241
242 int mds_getattr(struct ptlrpc_request *req)
243 {
244         struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, 
245                                            NULL);
246         struct inode *inode;
247         struct mds_rep *rep;
248         int rc;
249         
250         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.mds, 
251                           &req->rq_replen, &req->rq_repbuf);
252         if (rc) { 
253                 EXIT;
254                 printk("mds: out of memory\n");
255                 req->rq_status = -ENOMEM;
256                 return 0;
257         }
258
259         req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
260         rep = req->rq_rep.mds;
261
262         if (!de) { 
263                 EXIT;
264                 req->rq_rephdr->status = -ENOENT;
265                 return 0;
266         }
267
268         inode = de->d_inode;
269         rep->ino = inode->i_ino;
270         rep->atime = inode->i_atime;
271         rep->ctime = inode->i_ctime;
272         rep->mtime = inode->i_mtime;
273         rep->uid = inode->i_uid;
274         rep->gid = inode->i_gid;
275         rep->size = inode->i_size;
276         rep->mode = inode->i_mode;
277         rep->nlink = inode->i_nlink;
278         rep->valid = ~0;
279         mds_get_objid(inode, &rep->objid);
280         dput(de); 
281         return 0;
282 }
283
284 int mds_readpage(struct ptlrpc_request *req)
285 {
286         struct vfsmount *mnt;
287         struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, 
288                                            &mnt);
289         struct file *file; 
290         struct niobuf *niobuf; 
291         struct mds_rep *rep;
292         int rc;
293         
294         printk("mds_readpage: ino %ld\n", de->d_inode->i_ino);
295         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.mds, 
296                           &req->rq_replen, &req->rq_repbuf);
297         if (rc) { 
298                 EXIT;
299                 printk("mds: out of memory\n");
300                 req->rq_status = -ENOMEM;
301                 return 0;
302         }
303
304         req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
305         rep = req->rq_rep.mds;
306
307         if (IS_ERR(de)) { 
308                 EXIT;
309                 req->rq_rephdr->status = PTR_ERR(de); 
310                 return 0;
311         }
312
313         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE); 
314         /* note: in case of an error, dentry_open puts dentry */
315         if (IS_ERR(file)) { 
316                 EXIT;
317                 req->rq_rephdr->status = PTR_ERR(file);
318                 return 0;
319         }
320                 
321         niobuf = mds_req_tgt(req->rq_req.mds);
322
323         /* to make this asynchronous make sure that the handling function 
324            doesn't send a reply when this function completes. Instead a 
325            callback function would send the reply */ 
326         rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf); 
327
328         filp_close(file, 0);
329         req->rq_rephdr->status = rc;
330         EXIT;
331         return 0;
332 }
333
334 int mds_reint(struct ptlrpc_request *req)
335 {
336         int rc;
337         char *buf = mds_req_tgt(req->rq_req.mds);
338         int len = req->rq_req.mds->tgtlen;
339         struct mds_update_record rec;
340         
341         rc = mds_update_unpack(buf, len, &rec);
342         if (rc) { 
343                 printk(__FUNCTION__ ": invalid record\n");
344                 req->rq_status = -EINVAL;
345                 return 0;
346         }
347         /* rc will be used to interrupt a for loop over multiple records */
348         rc = mds_reint_rec(&rec, req); 
349         return 0; 
350 }
351
352 //int mds_handle(struct mds_conn *conn, int len, char *buf)
353 int mds_handle(struct ptlrpc_request *req)
354 {
355         int rc;
356         struct ptlreq_hdr *hdr;
357
358         ENTRY;
359
360         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
361
362         if (NTOH__u32(hdr->type) != MDS_TYPE_REQ) {
363                 printk("lustre_mds: wrong packet type sent %d\n",
364                        NTOH__u32(hdr->type));
365                 rc = -EINVAL;
366                 goto out;
367         }
368
369         rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
370                             &req->rq_reqhdr, &req->rq_req.mds);
371         if (rc) { 
372                 printk("lustre_mds: Invalid request\n");
373                 EXIT; 
374                 goto out;
375         }
376
377         switch (req->rq_reqhdr->opc) { 
378
379         case MDS_GETATTR:
380                 CDEBUG(D_INODE, "getattr\n");
381                 rc = mds_getattr(req);
382                 break;
383
384         case MDS_READPAGE:
385                 CDEBUG(D_INODE, "readpage\n");
386                 rc = mds_readpage(req);
387                 break;
388
389         case MDS_REINT:
390                 CDEBUG(D_INODE, "reint\n");
391                 rc = mds_reint(req);
392                 break;
393
394         default:
395                 return mds_error(req);
396         }
397
398 out:
399         if (rc) { 
400                 printk(__FUNCTION__ ": no header\n");
401                 return 0;
402         }
403
404         if( req->rq_status) { 
405                 mds_error(req);
406         } else { 
407                 CDEBUG(D_INODE, "sending reply\n"); 
408                 mds_reply(req); 
409         }
410
411         return 0;
412 }
413
414
/*
 * Timer callback: '__data' carries the task_struct pointer of the task to
 * wake.
 */
static void mds_timer_run(unsigned long __data)
{
        wake_up_process((struct task_struct *)__data);
}
421
422 int mds_main(void *arg)
423 {
424         struct mds_obd *mds = (struct mds_obd *) arg;
425         struct timer_list timer;
426
427         lock_kernel();
428         daemonize();
429         spin_lock_irq(&current->sigmask_lock);
430         sigfillset(&current->blocked);
431         recalc_sigpending(current);
432         spin_unlock_irq(&current->sigmask_lock);
433
434         sprintf(current->comm, "lustre_mds");
435
436         /* Set up an interval timer which can be used to trigger a
437            wakeup after the interval expires */
438         init_timer(&timer);
439         timer.data = (unsigned long) current;
440         timer.function = mds_timer_run;
441         mds->mds_timer = &timer;
442
443         /* Record that the  thread is running */
444         mds->mds_thread = current;
445         wake_up(&mds->mds_done_waitq); 
446
447         printk(KERN_INFO "lustre_mds starting.  Commit interval %d seconds\n",
448                         mds->mds_interval / HZ);
449
450         /* XXX maintain a list of all managed devices: insert here */
451
452         /* And now, wait forever for commit wakeup events. */
453         while (1) {
454                 int rc;
455
456                 if (mds->mds_flags & MDS_UNMOUNT)
457                         break;
458
459                 wake_up(&mds->mds_done_waitq);
460                 interruptible_sleep_on(&mds->mds_waitq);
461
462                 CDEBUG(D_INODE, "lustre_mds wakes\n");
463                 CDEBUG(D_INODE, "pick up req here and continue\n"); 
464
465                 if (mds->mds_service != NULL) {
466                         ptl_event_t ev;
467
468                         while (1) {
469                                 struct ptlrpc_request request;
470
471                                 rc = PtlEQGet(mds->mds_service->srv_eq, &ev);
472                                 if (rc != PTL_OK && rc != PTL_EQ_DROPPED)
473                                         break;
474                                 /* FIXME: If we move to an event-driven model,
475                                  * we should put the request on the stack of
476                                  * mds_handle instead. */
477                                 memset(&request, 0, sizeof(request));
478                                 request.rq_reqbuf = ev.mem_desc.start +
479                                         ev.offset;
480                                 request.rq_reqlen = ev.mem_desc.length;
481                                 request.rq_obd = MDS;
482                                 request.rq_xid = ev.match_bits;
483
484                                 request.rq_peer.peer_nid = ev.initiator.nid;
485                                 /* FIXME: this NI should be the incoming NI.
486                                  * We don't know how to find that from here. */
487                                 request.rq_peer.peer_ni =
488                                         mds->mds_service->srv_self.peer_ni;
489                                 rc = mds_handle(&request);
490                         }
491                 } else {
492                         struct ptlrpc_request *request;
493
494                         if (list_empty(&mds->mds_reqs)) {
495                                 CDEBUG(D_INODE, "woke because of timer\n");
496                         } else {
497                                 request = list_entry(mds->mds_reqs.next,
498                                                      struct ptlrpc_request,
499                                                      rq_list);
500                                 list_del(&request->rq_list);
501                                 rc = mds_handle(request);
502                         }
503                 }
504         }
505
506         del_timer_sync(mds->mds_timer);
507
508         /* XXX maintain a list of all managed devices: cleanup here */
509
510         mds->mds_thread = NULL;
511         wake_up(&mds->mds_done_waitq);
512         printk("lustre_mds: exiting\n");
513         return 0;
514 }
515
516 static void mds_stop_srv_thread(struct mds_obd *mds)
517 {
518         mds->mds_flags |= MDS_UNMOUNT;
519
520         while (mds->mds_thread) {
521                 wake_up(&mds->mds_waitq);
522                 sleep_on(&mds->mds_done_waitq);
523         }
524 }
525
526 static void mds_start_srv_thread(struct mds_obd *mds)
527 {
528         init_waitqueue_head(&mds->mds_waitq);
529         init_waitqueue_head(&mds->mds_done_waitq);
530         kernel_thread(mds_main, (void *)mds, 
531                       CLONE_VM | CLONE_FS | CLONE_FILES);
532         while (!mds->mds_thread) 
533                 sleep_on(&mds->mds_done_waitq);
534 }
535
536 /* mount the file system (secretly) */
537 static int mds_setup(struct obd_device *obddev, obd_count len,
538                         void *buf)
539                         
540 {
541         struct obd_ioctl_data* data = buf;
542         struct mds_obd *mds = &obddev->u.mds;
543         struct vfsmount *mnt;
544         struct lustre_peer peer;
545         int err; 
546         ENTRY;
547
548
549         mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL); 
550         err = PTR_ERR(mnt);
551         if (IS_ERR(mnt)) { 
552                 EXIT;
553                 return err;
554         }
555
556         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
557         if (!obddev->u.mds.mds_sb) {
558                 EXIT;
559                 return -ENODEV;
560         }
561
562         mds->mds_vfsmnt = mnt;
563         obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2);
564
565         mds->mds_ctxt.pwdmnt = mnt;
566         mds->mds_ctxt.pwd = mnt->mnt_root;
567         mds->mds_ctxt.fs = KERNEL_DS;
568         mds->mds_remote_nid = 0;
569
570         INIT_LIST_HEAD(&mds->mds_reqs);
571         mds->mds_thread = NULL;
572         mds->mds_flags = 0;
573         mds->mds_interval = 3 * HZ;
574         MDS = mds;
575
576         spin_lock_init(&obddev->u.mds.mds_lock);
577
578         err = kportal_uuid_to_peer("self", &peer);
579         if (err == 0) {
580                 mds->mds_service = kmalloc(sizeof(*mds->mds_service),
581                                                   GFP_KERNEL);
582                 if (mds->mds_service == NULL)
583                         return -ENOMEM;
584                 mds->mds_service->srv_buf_size = 64 * 1024;
585                 mds->mds_service->srv_portal = MDS_REQUEST_PORTAL;
586                 memcpy(&mds->mds_service->srv_self, &peer, sizeof(peer));
587                 mds->mds_service->srv_wait_queue = &mds->mds_waitq;
588
589                 rpc_register_service(mds->mds_service, "self");
590         }
591
592         mds_start_srv_thread(mds);
593
594         MOD_INC_USE_COUNT;
595         EXIT; 
596         return 0;
597
598
599 static int mds_cleanup(struct obd_device * obddev)
600 {
601         struct super_block *sb;
602         struct mds_obd *mds = &obddev->u.mds;
603
604         ENTRY;
605
606         if ( !(obddev->obd_flags & OBD_SET_UP) ) {
607                 EXIT;
608                 return 0;
609         }
610
611         if ( !list_empty(&obddev->obd_gen_clients) ) {
612                 printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");
613                 EXIT;
614                 return -EBUSY;
615         }
616
617         MDS = NULL;
618         mds_stop_srv_thread(mds);
619         sb = mds->mds_sb;
620         if (!mds->mds_sb){
621                 EXIT;
622                 return 0;
623         }
624
625         if (!list_empty(&mds->mds_reqs)) {
626                 // XXX reply with errors and clean up
627                 CDEBUG(D_INODE, "Request list not empty!\n");
628         }
629
630         unlock_kernel();
631         mntput(mds->mds_vfsmnt); 
632         mds->mds_sb = 0;
633         kfree(mds->mds_fstype);
634         lock_kernel();
635         
636
637         MOD_DEC_USE_COUNT;
638         EXIT;
639         return 0;
640 }
641
642 /* use obd ops to offer management infrastructure */
643 static struct obd_ops mds_obd_ops = {
644         o_setup:       mds_setup,
645         o_cleanup:     mds_cleanup,
646 };
647
648 static int __init mds_init(void)
649 {
650         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
651         return 0;
652 }
653
654 static void __exit mds_exit(void)
655 {
656         obd_unregister_type(LUSTRE_MDS_NAME);
657 }
658
659 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
660 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
661 MODULE_LICENSE("GPL");
662
663
664 // for testing (maybe this stays)
665 EXPORT_SYMBOL(mds_queue_req);
666
667 module_init(mds_init);
668 module_exit(mds_exit);