Whamcloud - gitweb
a6ad4f69e8b23da1ec0631c4810bb4cb1738475d
[fs/lustre-release.git] / lustre / mds / handler.c
1 /*
2  *  linux/mds/handler.c
3  *  
4  *  Lustre Metadata Server (mds) request handler
5  * 
6  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
7  *
8  *  This code is issued under the GNU General Public License.
9  *  See the file COPYING in this distribution
10  *
11  *  by Peter Braam <braam@clusterfs.com>
12  * 
13  *  This server is single threaded at present (but can easily be multi threaded). 
14  * 
15  */
16
17
18 #define EXPORT_SYMTAB
19
20 #include <linux/version.h>
21 #include <linux/module.h>
22 #include <linux/fs.h>
23 #include <linux/stat.h>
24 #include <linux/locks.h>
25 #include <linux/ext2_fs.h>
26 #include <linux/quotaops.h>
27 #include <asm/unistd.h>
28 #include <asm/uaccess.h>
29 #include <linux/obd_support.h>
30 #include <linux/obd.h>
31 #include <linux/lustre_lib.h>
32 #include <linux/lustre_idl.h>
33 #include <linux/lustre_mds.h>
34 #include <linux/lustre_net.h>
35 #include <linux/obd_class.h>
36
37 // XXX for testing
38 static struct mds_obd *MDS;
39
40 // XXX make this networked!  
41 static int mds_queue_req(struct ptlrpc_request *req)
42 {
43         struct ptlrpc_request *srv_req;
44         
45         if (!MDS) { 
46                 EXIT;
47                 return -1;
48         }
49
50         OBD_ALLOC(srv_req, sizeof(*srv_req));
51         if (!srv_req) { 
52                 EXIT;
53                 return -ENOMEM;
54         }
55
56         printk("---> MDS at %d %p, incoming req %p, srv_req %p\n", 
57                __LINE__, MDS, req, srv_req);
58
59         memset(srv_req, 0, sizeof(*req)); 
60
61         /* move the request buffer */
62         srv_req->rq_reqbuf = req->rq_reqbuf;
63         srv_req->rq_reqlen = req->rq_reqlen;
64         srv_req->rq_obd = MDS;
65
66         /* remember where it came from */
67         srv_req->rq_reply_handle = req;
68
69         list_add(&srv_req->rq_list, &MDS->mds_reqs); 
70         wake_up(&MDS->mds_waitq);
71         return 0;
72 }
73
74 int mds_sendpage(struct ptlrpc_request *req, struct file *file, 
75                     __u64 offset, struct niobuf *dst)
76 {
77         int rc; 
78         mm_segment_t oldfs = get_fs();
79
80         if (req->rq_peer.peer_nid == 0) {
81                 /* dst->addr is a user address, but in a different task! */
82                 set_fs(KERNEL_DS); 
83                 rc = generic_file_read(file, (char *)(long)dst->addr, 
84                                        PAGE_SIZE, &offset); 
85                 set_fs(oldfs);
86
87                 if (rc != PAGE_SIZE) 
88                         return -EIO;
89         } else {
90                 char *buf;
91
92                 OBD_ALLOC(buf, PAGE_SIZE);
93                 if (!buf)
94                         return -ENOMEM;
95
96                 set_fs(KERNEL_DS); 
97                 rc = generic_file_read(file, buf, PAGE_SIZE, &offset); 
98                 set_fs(oldfs);
99
100                 if (rc != PAGE_SIZE) {
101                         OBD_FREE(buf, PAGE_SIZE);
102                         return -EIO;
103                 }
104
105                 req->rq_bulkbuf = buf;
106                 req->rq_bulklen = PAGE_SIZE;
107                 rc = ptl_send_buf(req, &req->rq_peer, MDS_BULK_PORTAL, 0);
108                 init_waitqueue_head(&req->rq_wait_for_bulk);
109                 sleep_on(&req->rq_wait_for_bulk);
110                 OBD_FREE(buf, PAGE_SIZE);
111                 req->rq_bulklen = 0; /* FIXME: eek. */
112         }
113
114         return 0;
115 }
116
117 int mds_reply(struct ptlrpc_request *req)
118 {
119         struct ptlrpc_request *clnt_req = req->rq_reply_handle;
120
121         ENTRY;
122         
123         if (req->rq_obd->mds_service != NULL) {
124                 /* This is a request that came from the network via portals. */
125
126                 /* FIXME: we need to increment the count of handled events */
127                 ptl_send_buf(req, &req->rq_peer, MDS_REPLY_PORTAL, 0);
128         } else {
129                 /* This is a local request that came from another thread. */
130
131                 /* move the reply to the client */ 
132                 clnt_req->rq_replen = req->rq_replen;
133                 clnt_req->rq_repbuf = req->rq_repbuf;
134                 req->rq_repbuf = NULL;
135                 req->rq_replen = 0;
136
137                 /* free the request buffer */
138                 OBD_FREE(req->rq_reqbuf, req->rq_reqlen);
139                 req->rq_reqbuf = NULL;
140
141                 /* wake up the client */ 
142                 wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
143         }
144
145         EXIT;
146         return 0;
147 }
148
149 int mds_error(struct ptlrpc_request *req)
150 {
151         struct ptlrep_hdr *hdr;
152
153         ENTRY;
154
155         OBD_ALLOC(hdr, sizeof(*hdr));
156         if (!hdr) { 
157                 EXIT;
158                 return -ENOMEM;
159         }
160
161         memset(hdr, 0, sizeof(*hdr));
162         
163         hdr->seqno = req->rq_reqhdr->seqno;
164         hdr->status = req->rq_status; 
165         hdr->type = MDS_TYPE_ERR;
166
167         req->rq_repbuf = (char *)hdr;
168         req->rq_replen = sizeof(*hdr); 
169
170         EXIT;
171         return mds_reply(req);
172 }
173
174 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
175                               struct vfsmount **mnt)
176 {
177         /* stolen from NFS */ 
178         struct super_block *sb = mds->mds_sb; 
179         unsigned long ino = fid->id;
180         //__u32 generation = fid->generation;
181         __u32 generation = 0;
182         struct inode *inode;
183         struct list_head *lp;
184         struct dentry *result;
185
186         if (ino == 0)
187                 return ERR_PTR(-ESTALE);
188
189         inode = iget(sb, ino);
190         if (inode == NULL)
191                 return ERR_PTR(-ENOMEM);
192
193         printk("--> mds_fid2dentry: sb %p\n", inode->i_sb); 
194
195         if (is_bad_inode(inode)
196             || (generation && inode->i_generation != generation)
197                 ) {
198                 /* we didn't find the right inode.. */
199                 printk(__FUNCTION__ 
200                        "bad inode %lu, link: %d ct: %d or version  %u/%u\n",
201                         inode->i_ino,
202                         inode->i_nlink, atomic_read(&inode->i_count),
203                         inode->i_generation,
204                         generation);
205                 iput(inode);
206                 return ERR_PTR(-ESTALE);
207         }
208
209         /* now to find a dentry.
210          * If possible, get a well-connected one
211          */
212         spin_lock(&dcache_lock);
213         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
214                 result = list_entry(lp,struct dentry, d_alias);
215                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
216                         dget_locked(result);
217                         result->d_vfs_flags |= DCACHE_REFERENCED;
218                         spin_unlock(&dcache_lock);
219                         iput(inode);
220                         return result;
221                 }
222         }
223         spin_unlock(&dcache_lock);
224         result = d_alloc_root(inode);
225         if (result == NULL) {
226                 iput(inode);
227                 return ERR_PTR(-ENOMEM);
228         }
229         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
230         return result;
231 }
232
233 static inline void mds_get_objid(struct inode *inode, __u64 *id)
234 {
235         memcpy(id, &inode->u.ext2_i.i_data, sizeof(*id));
236 }
237
238 int mds_getattr(struct ptlrpc_request *req)
239 {
240         struct dentry *de;
241         struct inode *inode;
242         struct mds_rep *rep;
243         int rc;
244         
245         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.mds, 
246                           &req->rq_replen, &req->rq_repbuf);
247         if (rc) { 
248                 EXIT;
249                 printk("mds: out of memory\n");
250                 req->rq_status = -ENOMEM;
251                 return 0;
252         }
253
254         req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
255         rep = req->rq_rep.mds;
256
257         de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, NULL);
258         if (IS_ERR(de)) { 
259                 EXIT;
260                 req->rq_rephdr->status = -ENOENT;
261                 return 0;
262         }
263
264         inode = de->d_inode;
265         rep->ino = inode->i_ino;
266         rep->atime = inode->i_atime;
267         rep->ctime = inode->i_ctime;
268         rep->mtime = inode->i_mtime;
269         rep->uid = inode->i_uid;
270         rep->gid = inode->i_gid;
271         rep->size = inode->i_size;
272         rep->mode = inode->i_mode;
273         rep->nlink = inode->i_nlink;
274         rep->valid = ~0;
275         mds_get_objid(inode, &rep->objid);
276         dput(de); 
277         return 0;
278 }
279
280 int mds_readpage(struct ptlrpc_request *req)
281 {
282         struct vfsmount *mnt;
283         struct dentry *de;
284         struct file *file; 
285         struct niobuf *niobuf; 
286         struct mds_rep *rep;
287         int rc;
288         
289         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.mds, 
290                           &req->rq_replen, &req->rq_repbuf);
291         if (rc) { 
292                 EXIT;
293                 printk("mds: out of memory\n");
294                 req->rq_status = -ENOMEM;
295                 return 0;
296         }
297
298         req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
299         rep = req->rq_rep.mds;
300
301         de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, &mnt);
302         if (IS_ERR(de)) { 
303                 EXIT;
304                 req->rq_rephdr->status = PTR_ERR(de); 
305                 return 0;
306         }
307
308         printk("mds_readpage: ino %ld\n", de->d_inode->i_ino);
309
310         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE); 
311         /* note: in case of an error, dentry_open puts dentry */
312         if (IS_ERR(file)) { 
313                 EXIT;
314                 req->rq_rephdr->status = PTR_ERR(file);
315                 return 0;
316         }
317                 
318         niobuf = mds_req_tgt(req->rq_req.mds);
319
320         /* to make this asynchronous make sure that the handling function 
321            doesn't send a reply when this function completes. Instead a 
322            callback function would send the reply */ 
323         rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf); 
324
325         filp_close(file, 0);
326         req->rq_rephdr->status = rc;
327         EXIT;
328         return 0;
329 }
330
331 int mds_reint(struct ptlrpc_request *req)
332 {
333         int rc;
334         char *buf = mds_req_tgt(req->rq_req.mds);
335         int len = req->rq_req.mds->tgtlen;
336         struct mds_update_record rec;
337         
338         rc = mds_update_unpack(buf, len, &rec);
339         if (rc) { 
340                 printk(__FUNCTION__ ": invalid record\n");
341                 req->rq_status = -EINVAL;
342                 return 0;
343         }
344         /* rc will be used to interrupt a for loop over multiple records */
345         rc = mds_reint_rec(&rec, req); 
346         return 0; 
347 }
348
349 //int mds_handle(struct mds_conn *conn, int len, char *buf)
350 int mds_handle(struct ptlrpc_request *req)
351 {
352         int rc;
353         struct ptlreq_hdr *hdr;
354
355         ENTRY;
356
357         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
358
359         if (NTOH__u32(hdr->type) != MDS_TYPE_REQ) {
360                 printk("lustre_mds: wrong packet type sent %d\n",
361                        NTOH__u32(hdr->type));
362                 rc = -EINVAL;
363                 goto out;
364         }
365
366         rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
367                             &req->rq_reqhdr, &req->rq_req.mds);
368         if (rc) { 
369                 printk("lustre_mds: Invalid request\n");
370                 EXIT; 
371                 goto out;
372         }
373
374         switch (req->rq_reqhdr->opc) { 
375
376         case MDS_GETATTR:
377                 CDEBUG(D_INODE, "getattr\n");
378                 rc = mds_getattr(req);
379                 break;
380
381         case MDS_READPAGE:
382                 CDEBUG(D_INODE, "readpage\n");
383                 rc = mds_readpage(req);
384                 break;
385
386         case MDS_REINT:
387                 CDEBUG(D_INODE, "reint\n");
388                 rc = mds_reint(req);
389                 break;
390
391         default:
392                 return mds_error(req);
393         }
394
395 out:
396         if (rc) { 
397                 printk(__FUNCTION__ ": no header\n");
398                 return 0;
399         }
400
401         if( req->rq_status) { 
402                 mds_error(req);
403         } else { 
404                 CDEBUG(D_INODE, "sending reply\n"); 
405                 mds_reply(req); 
406         }
407
408         return 0;
409 }
410
411
/*
 * Timer callback: `__data` carries a task_struct pointer cast to
 * unsigned long; that task is woken.
 */
static void mds_timer_run(unsigned long __data)
{
        wake_up_process((struct task_struct *) __data);
}
418
419 int mds_main(void *arg)
420 {
421         struct mds_obd *mds = (struct mds_obd *) arg;
422         struct timer_list timer;
423
424         lock_kernel();
425         daemonize();
426         spin_lock_irq(&current->sigmask_lock);
427         sigfillset(&current->blocked);
428         recalc_sigpending(current);
429         spin_unlock_irq(&current->sigmask_lock);
430
431         sprintf(current->comm, "lustre_mds");
432
433         /* Set up an interval timer which can be used to trigger a
434            wakeup after the interval expires */
435         init_timer(&timer);
436         timer.data = (unsigned long) current;
437         timer.function = mds_timer_run;
438         mds->mds_timer = &timer;
439
440         /* Record that the  thread is running */
441         mds->mds_thread = current;
442         wake_up(&mds->mds_done_waitq); 
443
444         printk(KERN_INFO "lustre_mds starting.  Commit interval %d seconds\n",
445                         mds->mds_interval / HZ);
446
447         /* XXX maintain a list of all managed devices: insert here */
448
449         /* And now, wait forever for commit wakeup events. */
450         while (1) {
451                 int rc;
452
453                 if (mds->mds_flags & MDS_UNMOUNT)
454                         break;
455
456                 wake_up(&mds->mds_done_waitq);
457                 interruptible_sleep_on(&mds->mds_waitq);
458
459                 CDEBUG(D_INODE, "lustre_mds wakes\n");
460                 CDEBUG(D_INODE, "pick up req here and continue\n"); 
461
462                 if (mds->mds_service != NULL) {
463                         ptl_event_t ev;
464
465                         while (1) {
466                                 struct ptlrpc_request request;
467                                 struct ptlrpc_service *service;
468
469                                 rc = PtlEQGet(mds->mds_service->srv_eq_h, &ev);
470                                 if (rc != PTL_OK && rc != PTL_EQ_DROPPED)
471                                         break;
472                                 
473                                 service = (struct ptlrpc_service *)ev.mem_desc.user_ptr;        
474
475                                 /* FIXME: If we move to an event-driven model,
476                                  * we should put the request on the stack of
477                                  * mds_handle instead. */
478                                 memset(&request, 0, sizeof(request));
479                                 request.rq_reqbuf = ev.mem_desc.start +
480                                         ev.offset;
481                                 request.rq_reqlen = ev.mem_desc.length;
482                                 request.rq_obd = MDS;
483                                 request.rq_xid = ev.match_bits;
484
485                                 request.rq_peer.peer_nid = ev.initiator.nid;
486                                 /* FIXME: this NI should be the incoming NI.
487                                  * We don't know how to find that from here. */
488                                 request.rq_peer.peer_ni =
489                                         mds->mds_service->srv_self.peer_ni;
490                                 rc = mds_handle(&request);
491
492                                 /* Inform the rpc layer the event has been handled */ 
493                                 ptl_received_rpc(service);
494                         }
495                 } else {
496                         struct ptlrpc_request *request;
497
498                         if (list_empty(&mds->mds_reqs)) {
499                                 CDEBUG(D_INODE, "woke because of timer\n");
500                         } else {
501                                 request = list_entry(mds->mds_reqs.next,
502                                                      struct ptlrpc_request,
503                                                      rq_list);
504                                 list_del(&request->rq_list);
505                                 rc = mds_handle(request);
506                         }
507                 }
508         }
509
510         del_timer_sync(mds->mds_timer);
511
512         /* XXX maintain a list of all managed devices: cleanup here */
513
514         mds->mds_thread = NULL;
515         wake_up(&mds->mds_done_waitq);
516         printk("lustre_mds: exiting\n");
517         return 0;
518 }
519
520 static void mds_stop_srv_thread(struct mds_obd *mds)
521 {
522         mds->mds_flags |= MDS_UNMOUNT;
523
524         while (mds->mds_thread) {
525                 wake_up(&mds->mds_waitq);
526                 sleep_on(&mds->mds_done_waitq);
527         }
528 }
529
530 static void mds_start_srv_thread(struct mds_obd *mds)
531 {
532         init_waitqueue_head(&mds->mds_waitq);
533         init_waitqueue_head(&mds->mds_done_waitq);
534         kernel_thread(mds_main, (void *)mds, 
535                       CLONE_VM | CLONE_FS | CLONE_FILES);
536         while (!mds->mds_thread) 
537                 sleep_on(&mds->mds_done_waitq);
538 }
539
540 /* mount the file system (secretly) */
541 static int mds_setup(struct obd_device *obddev, obd_count len,
542                         void *buf)
543                         
544 {
545         struct obd_ioctl_data* data = buf;
546         struct mds_obd *mds = &obddev->u.mds;
547         struct vfsmount *mnt;
548         struct lustre_peer peer;
549         int err; 
550         ENTRY;
551
552
553         mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL); 
554         err = PTR_ERR(mnt);
555         if (IS_ERR(mnt)) { 
556                 EXIT;
557                 return err;
558         }
559
560         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
561         if (!obddev->u.mds.mds_sb) {
562                 EXIT;
563                 return -ENODEV;
564         }
565
566         mds->mds_vfsmnt = mnt;
567         obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2);
568
569         mds->mds_ctxt.pwdmnt = mnt;
570         mds->mds_ctxt.pwd = mnt->mnt_root;
571         mds->mds_ctxt.fs = KERNEL_DS;
572         mds->mds_remote_nid = 0;
573
574         INIT_LIST_HEAD(&mds->mds_reqs);
575         mds->mds_thread = NULL;
576         mds->mds_flags = 0;
577         mds->mds_interval = 3 * HZ;
578         MDS = mds;
579
580         spin_lock_init(&obddev->u.mds.mds_lock);
581
582         err = kportal_uuid_to_peer("self", &peer);
583         if (err == 0) {
584                 OBD_ALLOC(mds->mds_service, sizeof(*mds->mds_service));
585                 if (mds->mds_service == NULL)
586                         return -ENOMEM;
587                 mds->mds_service->srv_buf_size = 64 * 1024;
588                 mds->mds_service->srv_portal = MDS_REQUEST_PORTAL;
589                 memcpy(&mds->mds_service->srv_self, &peer, sizeof(peer));
590                 mds->mds_service->srv_wait_queue = &mds->mds_waitq;
591
592                 rpc_register_service(mds->mds_service, "self");
593         }
594
595         mds_start_srv_thread(mds);
596
597         MOD_INC_USE_COUNT;
598         EXIT; 
599         return 0;
600
601
602 static int mds_cleanup(struct obd_device * obddev)
603 {
604         struct super_block *sb;
605         struct mds_obd *mds = &obddev->u.mds;
606
607         ENTRY;
608
609         if ( !(obddev->obd_flags & OBD_SET_UP) ) {
610                 EXIT;
611                 return 0;
612         }
613
614         if ( !list_empty(&obddev->obd_gen_clients) ) {
615                 printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");
616                 EXIT;
617                 return -EBUSY;
618         }
619
620         MDS = NULL;
621         mds_stop_srv_thread(mds);
622         rpc_unregister_service(mds->mds_service);
623         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
624
625         sb = mds->mds_sb;
626         if (!mds->mds_sb){
627                 EXIT;
628                 return 0;
629         }
630
631         if (!list_empty(&mds->mds_reqs)) {
632                 // XXX reply with errors and clean up
633                 CDEBUG(D_INODE, "Request list not empty!\n");
634         }
635
636         unlock_kernel();
637         mntput(mds->mds_vfsmnt); 
638         mds->mds_sb = 0;
639         kfree(mds->mds_fstype);
640         lock_kernel();
641
642         MOD_DEC_USE_COUNT;
643         EXIT;
644         return 0;
645 }
646
647 /* use obd ops to offer management infrastructure */
648 static struct obd_ops mds_obd_ops = {
649         o_setup:       mds_setup,
650         o_cleanup:     mds_cleanup,
651 };
652
653 static int __init mds_init(void)
654 {
655         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
656         return 0;
657 }
658
659 static void __exit mds_exit(void)
660 {
661         obd_unregister_type(LUSTRE_MDS_NAME);
662 }
663
664 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
665 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
666 MODULE_LICENSE("GPL");
667
668
669 // for testing (maybe this stays)
670 EXPORT_SYMBOL(mds_queue_req);
671
672 module_init(mds_init);
673 module_exit(mds_exit);