Whamcloud - gitweb
- Added DEBUG_SUBSYSTEMs
[fs/lustre-release.git] / lustre / mds / handler.c
1 /*
2  *  linux/mds/handler.c
3  *  
4  *  Lustre Metadata Server (mds) request handler
5  * 
6  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
7  *
8  *  This code is issued under the GNU General Public License.
9  *  See the file COPYING in this distribution
10  *
11  *  by Peter Braam <braam@clusterfs.com>
12  * 
13  *  This server is single threaded at present (but can easily be multi threaded). 
14  * 
15  */
16
17 #define EXPORT_SYMTAB
18
19 #include <linux/version.h>
20 #include <linux/module.h>
21 #include <linux/fs.h>
22 #include <linux/stat.h>
23 #include <linux/locks.h>
24 #include <linux/ext2_fs.h>
25 #include <linux/quotaops.h>
26 #include <asm/unistd.h>
27 #include <asm/uaccess.h>
28
29 #define DEBUG_SUBSYSTEM S_MDS
30
31 #include <linux/obd_support.h>
32 #include <linux/obd.h>
33 #include <linux/lustre_lib.h>
34 #include <linux/lustre_idl.h>
35 #include <linux/lustre_mds.h>
36 #include <linux/lustre_net.h>
37 #include <linux/obd_class.h>
38
// XXX for testing
/* File-wide pointer to the active MDS device.  mds_setup() stores the
 * device here and mds_cleanup() resets it to NULL; mds_queue_req() uses it
 * to find the request list and wait queue for locally queued requests. */
static struct mds_obd *MDS;
41
42 // XXX make this networked!  
43 static int mds_queue_req(struct ptlrpc_request *req)
44 {
45         struct ptlrpc_request *srv_req;
46         
47         if (!MDS) { 
48                 EXIT;
49                 return -1;
50         }
51
52         OBD_ALLOC(srv_req, sizeof(*srv_req));
53         if (!srv_req) { 
54                 EXIT;
55                 return -ENOMEM;
56         }
57
58         CDEBUG(0, "---> MDS at %d %p, incoming req %p, srv_req %p\n",
59                __LINE__, MDS, req, srv_req);
60
61         memset(srv_req, 0, sizeof(*req)); 
62
63         /* move the request buffer */
64         srv_req->rq_reqbuf = req->rq_reqbuf;
65         srv_req->rq_reqlen = req->rq_reqlen;
66         srv_req->rq_obd = MDS;
67
68         /* remember where it came from */
69         srv_req->rq_reply_handle = req;
70
71         list_add(&srv_req->rq_list, &MDS->mds_reqs); 
72         wake_up(&MDS->mds_waitq);
73         return 0;
74 }
75
76 int mds_sendpage(struct ptlrpc_request *req, struct file *file, 
77                     __u64 offset, struct niobuf *dst)
78 {
79         int rc; 
80         mm_segment_t oldfs = get_fs();
81
82         if (req->rq_peer.peer_nid == 0) {
83                 /* dst->addr is a user address, but in a different task! */
84                 set_fs(KERNEL_DS); 
85                 rc = generic_file_read(file, (char *)(long)dst->addr, 
86                                        PAGE_SIZE, &offset); 
87                 set_fs(oldfs);
88
89                 if (rc != PAGE_SIZE) 
90                         return -EIO;
91         } else {
92                 char *buf;
93
94                 OBD_ALLOC(buf, PAGE_SIZE);
95                 if (!buf)
96                         return -ENOMEM;
97
98                 set_fs(KERNEL_DS); 
99                 rc = generic_file_read(file, buf, PAGE_SIZE, &offset); 
100                 set_fs(oldfs);
101
102                 if (rc != PAGE_SIZE) {
103                         OBD_FREE(buf, PAGE_SIZE);
104                         return -EIO;
105                 }
106
107                 req->rq_bulkbuf = buf;
108                 req->rq_bulklen = PAGE_SIZE;
109                 rc = ptl_send_buf(req, &req->rq_peer, MDS_BULK_PORTAL, 0);
110                 init_waitqueue_head(&req->rq_wait_for_bulk);
111                 sleep_on(&req->rq_wait_for_bulk);
112                 OBD_FREE(buf, PAGE_SIZE);
113                 req->rq_bulklen = 0; /* FIXME: eek. */
114         }
115
116         return 0;
117 }
118
119 int mds_reply(struct ptlrpc_request *req)
120 {
121         struct ptlrpc_request *clnt_req = req->rq_reply_handle;
122
123         ENTRY;
124         
125         if (req->rq_obd->mds_service != NULL) {
126                 /* This is a request that came from the network via portals. */
127
128                 /* FIXME: we need to increment the count of handled events */
129                 ptl_send_buf(req, &req->rq_peer, MDS_REPLY_PORTAL, 0);
130         } else {
131                 /* This is a local request that came from another thread. */
132
133                 /* move the reply to the client */ 
134                 clnt_req->rq_replen = req->rq_replen;
135                 clnt_req->rq_repbuf = req->rq_repbuf;
136                 req->rq_repbuf = NULL;
137                 req->rq_replen = 0;
138
139                 /* free the request buffer */
140                 OBD_FREE(req->rq_reqbuf, req->rq_reqlen);
141                 req->rq_reqbuf = NULL;
142
143                 /* wake up the client */ 
144                 wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
145         }
146
147         EXIT;
148         return 0;
149 }
150
151 int mds_error(struct ptlrpc_request *req)
152 {
153         struct ptlrep_hdr *hdr;
154
155         ENTRY;
156
157         OBD_ALLOC(hdr, sizeof(*hdr));
158         if (!hdr) { 
159                 EXIT;
160                 return -ENOMEM;
161         }
162
163         memset(hdr, 0, sizeof(*hdr));
164         
165         hdr->seqno = req->rq_reqhdr->seqno;
166         hdr->status = req->rq_status; 
167         hdr->type = MDS_TYPE_ERR;
168
169         req->rq_repbuf = (char *)hdr;
170         req->rq_replen = sizeof(*hdr); 
171
172         EXIT;
173         return mds_reply(req);
174 }
175
/*
 * Translate a file identifier into a referenced dentry on the MDS file system.
 *
 * @mds: MDS device; supplies the super block (mds_sb) and mount (mds_vfsmnt).
 * @fid: identifier; only fid->id (the inode number) is used here.
 * @mnt: optional; when non-NULL it receives mds->mds_vfsmnt, and a reference
 *       is taken on it with mntget() on every path that returns a dentry.
 *
 * Returns a dentry on success or an ERR_PTR():
 *   -ESTALE  the inode number is 0 or the inode is bad,
 *   -ENOMEM  iget() or d_alloc_root() returned NULL.
 * On the error returns *mnt may already have been written, but no mount
 * reference has been taken.
 */
struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
                              struct vfsmount **mnt)
{
        /* stolen from NFS */ 
        struct super_block *sb = mds->mds_sb; 
        unsigned long ino = fid->id;
        //__u32 generation = fid->generation;
        /* generation is fixed at 0, so the generation comparison below
         * never rejects an inode */
        __u32 generation = 0;
        struct inode *inode;
        struct list_head *lp;
        struct dentry *result;

        if (ino == 0)
                return ERR_PTR(-ESTALE);

        inode = iget(sb, ino);
        if (inode == NULL)
                return ERR_PTR(-ENOMEM);

        CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb); 

        if (is_bad_inode(inode)
            || (generation && inode->i_generation != generation)
                ) {
                /* we didn't find the right inode.. */
                printk(__FUNCTION__ 
                       "bad inode %lu, link: %d ct: %d or version  %u/%u\n",
                        inode->i_ino,
                        inode->i_nlink, atomic_read(&inode->i_count),
                        inode->i_generation,
                        generation);
                /* drop the reference obtained from iget() */
                iput(inode);
                return ERR_PTR(-ESTALE);
        }

        /* now to find a dentry.
         * If possible, get a well-connected one
         */
        if (mnt)
                *mnt = mds->mds_vfsmnt;
        /* walk the inode's alias list under dcache_lock and take the first
         * dentry that is not flagged DCACHE_NFSD_DISCONNECTED */
        spin_lock(&dcache_lock);
        for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
                result = list_entry(lp,struct dentry, d_alias);
                if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
                        dget_locked(result);
                        result->d_vfs_flags |= DCACHE_REFERENCED;
                        spin_unlock(&dcache_lock);
                        /* the lock is released before the iget() reference
                         * is dropped; the dentry reference taken above is
                         * what gets returned */
                        iput(inode);
                        if (mnt)
                                mntget(*mnt);
                        return result;
                }
        }
        spin_unlock(&dcache_lock);
        /* no suitable alias: allocate a new dentry for the inode */
        result = d_alloc_root(inode);
        if (result == NULL) {
                iput(inode);
                return ERR_PTR(-ENOMEM);
        }
        if (mnt)
                mntget(*mnt);
        /* mark the new dentry as disconnected */
        result->d_flags |= DCACHE_NFSD_DISCONNECTED;
        return result;
}
240
/*
 * Copy sizeof(*id) bytes from the start of the ext2-private i_data area of
 * @inode into *@id.
 * NOTE(review): presumably the object id is stored in that area by whoever
 * creates the inode -- confirm against the writer.
 */
static inline void mds_get_objid(struct inode *inode, __u64 *id)
{
        memcpy(id, &inode->u.ext2_i.i_data, sizeof(*id));
}
245
246 int mds_getattr(struct ptlrpc_request *req)
247 {
248         struct dentry *de;
249         struct inode *inode;
250         struct mds_rep *rep;
251         int rc;
252         
253         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.mds, 
254                           &req->rq_replen, &req->rq_repbuf);
255         if (rc) { 
256                 EXIT;
257                 printk("mds: out of memory\n");
258                 req->rq_status = -ENOMEM;
259                 return 0;
260         }
261
262         req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
263         rep = req->rq_rep.mds;
264
265         de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, NULL);
266         if (IS_ERR(de)) { 
267                 EXIT;
268                 req->rq_rephdr->status = -ENOENT;
269                 return 0;
270         }
271
272         inode = de->d_inode;
273         rep->ino = inode->i_ino;
274         rep->atime = inode->i_atime;
275         rep->ctime = inode->i_ctime;
276         rep->mtime = inode->i_mtime;
277         rep->uid = inode->i_uid;
278         rep->gid = inode->i_gid;
279         rep->size = inode->i_size;
280         rep->mode = inode->i_mode;
281         rep->nlink = inode->i_nlink;
282         rep->valid = ~0;
283         mds_get_objid(inode, &rep->objid);
284         dput(de); 
285         return 0;
286 }
287
288 int mds_readpage(struct ptlrpc_request *req)
289 {
290         struct vfsmount *mnt;
291         struct dentry *de;
292         struct file *file; 
293         struct niobuf *niobuf; 
294         struct mds_rep *rep;
295         int rc;
296         
297         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.mds, 
298                           &req->rq_replen, &req->rq_repbuf);
299         if (rc) { 
300                 EXIT;
301                 printk("mds: out of memory\n");
302                 req->rq_status = -ENOMEM;
303                 return 0;
304         }
305
306         req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
307         rep = req->rq_rep.mds;
308
309         de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, &mnt);
310         if (IS_ERR(de)) { 
311                 EXIT;
312                 req->rq_rephdr->status = PTR_ERR(de); 
313                 return 0;
314         }
315
316         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
317
318         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE); 
319         /* note: in case of an error, dentry_open puts dentry */
320         if (IS_ERR(file)) { 
321                 EXIT;
322                 req->rq_rephdr->status = PTR_ERR(file);
323                 return 0;
324         }
325                 
326         niobuf = mds_req_tgt(req->rq_req.mds);
327
328         /* to make this asynchronous make sure that the handling function 
329            doesn't send a reply when this function completes. Instead a 
330            callback function would send the reply */ 
331         rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf); 
332
333         filp_close(file, 0);
334         req->rq_rephdr->status = rc;
335         EXIT;
336         return 0;
337 }
338
339 int mds_reint(struct ptlrpc_request *req)
340 {
341         int rc;
342         char *buf = mds_req_tgt(req->rq_req.mds);
343         int len = req->rq_req.mds->tgtlen;
344         struct mds_update_record rec;
345         
346         rc = mds_update_unpack(buf, len, &rec);
347         if (rc) { 
348                 printk(__FUNCTION__ ": invalid record\n");
349                 req->rq_status = -EINVAL;
350                 return 0;
351         }
352         /* rc will be used to interrupt a for loop over multiple records */
353         rc = mds_reint_rec(&rec, req); 
354         return 0; 
355 }
356
357 //int mds_handle(struct mds_conn *conn, int len, char *buf)
358 int mds_handle(struct ptlrpc_request *req)
359 {
360         int rc;
361         struct ptlreq_hdr *hdr;
362
363         ENTRY;
364
365         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
366
367         if (NTOH__u32(hdr->type) != MDS_TYPE_REQ) {
368                 printk("lustre_mds: wrong packet type sent %d\n",
369                        NTOH__u32(hdr->type));
370                 rc = -EINVAL;
371                 goto out;
372         }
373
374         rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
375                             &req->rq_reqhdr, &req->rq_req.mds);
376         if (rc) { 
377                 printk("lustre_mds: Invalid request\n");
378                 EXIT; 
379                 goto out;
380         }
381
382         switch (req->rq_reqhdr->opc) { 
383
384         case MDS_GETATTR:
385                 CDEBUG(D_INODE, "getattr\n");
386                 rc = mds_getattr(req);
387                 break;
388
389         case MDS_READPAGE:
390                 CDEBUG(D_INODE, "readpage\n");
391                 rc = mds_readpage(req);
392                 break;
393
394         case MDS_REINT:
395                 CDEBUG(D_INODE, "reint\n");
396                 rc = mds_reint(req);
397                 break;
398
399         default:
400                 return mds_error(req);
401         }
402
403 out:
404         if (rc) { 
405                 printk(__FUNCTION__ ": no header\n");
406                 return 0;
407         }
408
409         if( req->rq_status) { 
410                 mds_error(req);
411         } else { 
412                 CDEBUG(D_INODE, "sending reply\n"); 
413                 mds_reply(req); 
414         }
415
416         return 0;
417 }
418
419
/*
 * Timer callback: @__data carries the task_struct pointer of the task to
 * wake (mds_main() stores its own task there).
 */
static void mds_timer_run(unsigned long __data)
{
        wake_up_process((struct task_struct *)__data);
}
426
427 int mds_main(void *arg)
428 {
429         struct mds_obd *mds = (struct mds_obd *) arg;
430         struct timer_list timer;
431
432         lock_kernel();
433         daemonize();
434         spin_lock_irq(&current->sigmask_lock);
435         sigfillset(&current->blocked);
436         recalc_sigpending(current);
437         spin_unlock_irq(&current->sigmask_lock);
438
439         sprintf(current->comm, "lustre_mds");
440
441         /* Set up an interval timer which can be used to trigger a
442            wakeup after the interval expires */
443         init_timer(&timer);
444         timer.data = (unsigned long) current;
445         timer.function = mds_timer_run;
446         mds->mds_timer = &timer;
447
448         /* Record that the  thread is running */
449         mds->mds_thread = current;
450         wake_up(&mds->mds_done_waitq); 
451
452         /* And now, wait forever for commit wakeup events. */
453         while (1) {
454                 int rc;
455
456                 if (mds->mds_flags & MDS_UNMOUNT)
457                         break;
458
459                 wake_up(&mds->mds_done_waitq);
460                 interruptible_sleep_on(&mds->mds_waitq);
461
462                 CDEBUG(D_INODE, "lustre_mds wakes\n");
463                 CDEBUG(D_INODE, "pick up req here and continue\n"); 
464
465                 if (mds->mds_service != NULL) {
466                         ptl_event_t ev;
467
468                         while (1) {
469                                 struct ptlrpc_request request;
470                                 struct ptlrpc_service *service;
471
472                                 rc = PtlEQGet(mds->mds_service->srv_eq_h, &ev);
473                                 if (rc != PTL_OK && rc != PTL_EQ_DROPPED)
474                                         break;
475                                 
476                                 service = (struct ptlrpc_service *)ev.mem_desc.user_ptr;        
477
478                                 /* FIXME: If we move to an event-driven model,
479                                  * we should put the request on the stack of
480                                  * mds_handle instead. */
481                                 memset(&request, 0, sizeof(request));
482                                 request.rq_reqbuf = ev.mem_desc.start +
483                                         ev.offset;
484                                 request.rq_reqlen = ev.mem_desc.length;
485                                 request.rq_obd = MDS;
486                                 request.rq_xid = ev.match_bits;
487
488                                 request.rq_peer.peer_nid = ev.initiator.nid;
489                                 /* FIXME: this NI should be the incoming NI.
490                                  * We don't know how to find that from here. */
491                                 request.rq_peer.peer_ni =
492                                         mds->mds_service->srv_self.peer_ni;
493                                 rc = mds_handle(&request);
494
495                                 /* Inform the rpc layer the event has been handled */ 
496                                 ptl_received_rpc(service);
497                         }
498                 } else {
499                         struct ptlrpc_request *request;
500
501                         if (list_empty(&mds->mds_reqs)) {
502                                 CDEBUG(D_INODE, "woke because of timer\n");
503                         } else {
504                                 request = list_entry(mds->mds_reqs.next,
505                                                      struct ptlrpc_request,
506                                                      rq_list);
507                                 list_del(&request->rq_list);
508                                 rc = mds_handle(request);
509                         }
510                 }
511         }
512
513         del_timer_sync(mds->mds_timer);
514
515         /* XXX maintain a list of all managed devices: cleanup here */
516
517         mds->mds_thread = NULL;
518         wake_up(&mds->mds_done_waitq);
519         printk("lustre_mds: exiting\n");
520         return 0;
521 }
522
523 static void mds_stop_srv_thread(struct mds_obd *mds)
524 {
525         mds->mds_flags |= MDS_UNMOUNT;
526
527         while (mds->mds_thread) {
528                 wake_up(&mds->mds_waitq);
529                 sleep_on(&mds->mds_done_waitq);
530         }
531 }
532
533 static void mds_start_srv_thread(struct mds_obd *mds)
534 {
535         init_waitqueue_head(&mds->mds_waitq);
536         init_waitqueue_head(&mds->mds_done_waitq);
537         kernel_thread(mds_main, (void *)mds, 
538                       CLONE_VM | CLONE_FS | CLONE_FILES);
539         while (!mds->mds_thread) 
540                 sleep_on(&mds->mds_done_waitq);
541 }
542
543 /* mount the file system (secretly) */
544 static int mds_setup(struct obd_device *obddev, obd_count len,
545                         void *buf)
546                         
547 {
548         struct obd_ioctl_data* data = buf;
549         struct mds_obd *mds = &obddev->u.mds;
550         struct vfsmount *mnt;
551         struct lustre_peer peer;
552         int err; 
553         ENTRY;
554
555
556         mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL); 
557         err = PTR_ERR(mnt);
558         if (IS_ERR(mnt)) { 
559                 EXIT;
560                 return err;
561         }
562
563         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
564         if (!obddev->u.mds.mds_sb) {
565                 EXIT;
566                 return -ENODEV;
567         }
568
569         mds->mds_vfsmnt = mnt;
570         obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2);
571
572         mds->mds_ctxt.pwdmnt = mnt;
573         mds->mds_ctxt.pwd = mnt->mnt_root;
574         mds->mds_ctxt.fs = KERNEL_DS;
575         mds->mds_remote_nid = 0;
576
577         INIT_LIST_HEAD(&mds->mds_reqs);
578         mds->mds_thread = NULL;
579         mds->mds_flags = 0;
580         mds->mds_interval = 3 * HZ;
581         MDS = mds;
582
583         spin_lock_init(&obddev->u.mds.mds_lock);
584
585         err = kportal_uuid_to_peer("self", &peer);
586         if (err == 0) {
587                 OBD_ALLOC(mds->mds_service, sizeof(*mds->mds_service));
588                 if (mds->mds_service == NULL)
589                         return -ENOMEM;
590                 mds->mds_service->srv_buf_size = 64 * 1024;
591                 mds->mds_service->srv_portal = MDS_REQUEST_PORTAL;
592                 memcpy(&mds->mds_service->srv_self, &peer, sizeof(peer));
593                 mds->mds_service->srv_wait_queue = &mds->mds_waitq;
594
595                 rpc_register_service(mds->mds_service, "self");
596         }
597
598         mds_start_srv_thread(mds);
599
600         MOD_INC_USE_COUNT;
601         EXIT; 
602         return 0;
603
604
605 static int mds_cleanup(struct obd_device * obddev)
606 {
607         struct super_block *sb;
608         struct mds_obd *mds = &obddev->u.mds;
609
610         ENTRY;
611
612         if ( !(obddev->obd_flags & OBD_SET_UP) ) {
613                 EXIT;
614                 return 0;
615         }
616
617         if ( !list_empty(&obddev->obd_gen_clients) ) {
618                 printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");
619                 EXIT;
620                 return -EBUSY;
621         }
622
623         MDS = NULL;
624         mds_stop_srv_thread(mds);
625         rpc_unregister_service(mds->mds_service);
626         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
627
628         sb = mds->mds_sb;
629         if (!mds->mds_sb){
630                 EXIT;
631                 return 0;
632         }
633
634         if (!list_empty(&mds->mds_reqs)) {
635                 // XXX reply with errors and clean up
636                 CDEBUG(D_INODE, "Request list not empty!\n");
637         }
638
639         unlock_kernel();
640         mntput(mds->mds_vfsmnt); 
641         mds->mds_sb = 0;
642         kfree(mds->mds_fstype);
643         lock_kernel();
644
645         MOD_DEC_USE_COUNT;
646         EXIT;
647         return 0;
648 }
649
/* use obd ops to offer management infrastructure */
/* Only the setup and cleanup methods are provided; this table is passed to
 * obd_register_type() in mds_init(). */
static struct obd_ops mds_obd_ops = {
        o_setup:       mds_setup,
        o_cleanup:     mds_cleanup,
};
655
656 static int __init mds_init(void)
657 {
658         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
659         return 0;
660 }
661
/* Module exit point: remove the type registered under LUSTRE_MDS_NAME. */
static void __exit mds_exit(void)
{
        obd_unregister_type(LUSTRE_MDS_NAME);
}
666
/* Module metadata. */
MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
MODULE_LICENSE("GPL");


// for testing (maybe this stays)
/* NOTE(review): mds_queue_req() is defined static in this file yet exported
 * here -- confirm that the export works as intended for other modules. */
EXPORT_SYMBOL(mds_queue_req);

/* Entry and exit hooks called on module load and unload. */
module_init(mds_init);
module_exit(mds_exit);