Whamcloud - gitweb
72bfa8bd120f5304417d108457aef2ffb54a42d0
[fs/lustre-release.git] / lustre / mds / handler.c
1 /*
2  *  linux/mds/handler.c
3  *  
4  *  Lustre Metadata Server (mds) request handler
5  * 
6  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
7  *
8  *  This code is issued under the GNU General Public License.
9  *  See the file COPYING in this distribution
10  *
11  *  by Peter Braam <braam@clusterfs.com>
12  * 
13  *  This server is single threaded at present (but can easily be multi threaded). 
14  * 
15  */
16
17
18 #define EXPORT_SYMTAB
19
20 #include <linux/version.h>
21 #include <linux/module.h>
22 #include <linux/fs.h>
23 #include <linux/stat.h>
24 #include <linux/locks.h>
25 #include <linux/ext2_fs.h>
26 #include <linux/quotaops.h>
27 #include <asm/unistd.h>
28 #include <asm/uaccess.h>
29 #include <linux/obd_support.h>
30 #include <linux/obd.h>
31 #include <linux/lustre_lib.h>
32 #include <linux/lustre_idl.h>
33 #include <linux/lustre_mds.h>
34 #include <linux/lustre_net.h>
35 #include <linux/obd_class.h>
36
37 // XXX for testing
38 static struct mds_obd *MDS;
39
40 // XXX make this networked!  
41 static int mds_queue_req(struct ptlrpc_request *req)
42 {
43         struct ptlrpc_request *srv_req;
44         
45         if (!MDS) { 
46                 EXIT;
47                 return -1;
48         }
49
50         srv_req = kmalloc(sizeof(*srv_req), GFP_KERNEL);
51         if (!srv_req) { 
52                 EXIT;
53                 return -ENOMEM;
54         }
55
56         printk("---> MDS at %d %p, incoming req %p, srv_req %p\n", 
57                __LINE__, MDS, req, srv_req);
58
59         memset(srv_req, 0, sizeof(*req)); 
60
61         /* move the request buffer */
62         srv_req->rq_reqbuf = req->rq_reqbuf;
63         srv_req->rq_reqlen    = req->rq_reqlen;
64         srv_req->rq_obd = MDS;
65
66         /* remember where it came from */
67         srv_req->rq_reply_handle = req;
68
69         list_add(&srv_req->rq_list, &MDS->mds_reqs); 
70         wake_up(&MDS->mds_waitq);
71         return 0;
72 }
73
74 /* XXX do this over the net */
75 int mds_sendpage(struct ptlrpc_request *req, struct file *file, 
76                     __u64 offset, struct niobuf *dst)
77 {
78         int rc; 
79         mm_segment_t oldfs = get_fs();
80         /* dst->addr is a user address, but in a different task! */
81         set_fs(KERNEL_DS); 
82         rc = generic_file_read(file, (char *)(long)dst->addr, 
83                               PAGE_SIZE, &offset); 
84         set_fs(oldfs);
85
86         if (rc != PAGE_SIZE) 
87                 return -EIO;
88         return 0;
89 }
90
91 /* XXX replace with networking code */
92 int mds_reply(struct ptlrpc_request *req)
93 {
94         struct ptlrpc_request *clnt_req = req->rq_reply_handle;
95
96         ENTRY;
97         
98         if (req->rq_obd->mds_service != NULL) {
99                 /* This is a request that came from the network via portals. */
100
101                 /* FIXME: we need to increment the count of handled events */
102                 ptl_send_buf(req, &req->rq_peer, MDS_REPLY_PORTAL, 0);
103         } else {
104                 /* This is a local request that came from another thread. */
105
106                 /* move the reply to the client */ 
107                 clnt_req->rq_replen = req->rq_replen;
108                 clnt_req->rq_repbuf = req->rq_repbuf;
109                 req->rq_repbuf = NULL;
110                 req->rq_replen = 0;
111
112                 /* free the request buffer */
113                 kfree(req->rq_reqbuf);
114                 req->rq_reqbuf = NULL;
115
116                 /* wake up the client */ 
117                 wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
118         }
119
120         EXIT;
121         return 0;
122 }
123
124 int mds_error(struct ptlrpc_request *req)
125 {
126         struct ptlrep_hdr *hdr;
127
128         ENTRY;
129
130         hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
131         if (!hdr) { 
132                 EXIT;
133                 return -ENOMEM;
134         }
135
136         memset(hdr, 0, sizeof(*hdr));
137         
138         hdr->seqno = req->rq_reqhdr->seqno;
139         hdr->status = req->rq_status; 
140         hdr->type = MDS_TYPE_ERR;
141
142         req->rq_repbuf = (char *)hdr;
143         req->rq_replen = sizeof(*hdr); 
144
145         EXIT;
146         return mds_reply(req);
147 }
148
149 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt)
150 {
151         /* stolen from NFS */ 
152         struct super_block *sb = mds->mds_sb; 
153         unsigned long ino = fid->id;
154         //__u32 generation = fid->generation;
155         __u32 generation = 0;
156         struct inode *inode;
157         struct list_head *lp;
158         struct dentry *result;
159
160         if (mnt) { 
161                 *mnt = mntget(mds->mds_vfsmnt);
162         }
163
164         if (ino == 0)
165                 return ERR_PTR(-ESTALE);
166
167         inode = iget(sb, ino);
168         if (inode == NULL)
169                 return ERR_PTR(-ENOMEM);
170
171         printk("--> mds_fid2dentry: sb %p\n", inode->i_sb); 
172
173         if (is_bad_inode(inode)
174             || (generation && inode->i_generation != generation)
175                 ) {
176                 /* we didn't find the right inode.. */
177                 printk(__FUNCTION__ 
178                        "bad inode %lu, link: %d ct: %d or version  %u/%u\n",
179                         inode->i_ino,
180                         inode->i_nlink, atomic_read(&inode->i_count),
181                         inode->i_generation,
182                         generation);
183                 iput(inode);
184                 return ERR_PTR(-ESTALE);
185         }
186
187         /* now to find a dentry.
188          * If possible, get a well-connected one
189          */
190         spin_lock(&dcache_lock);
191         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
192                 result = list_entry(lp,struct dentry, d_alias);
193                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
194                         dget_locked(result);
195                         result->d_vfs_flags |= DCACHE_REFERENCED;
196                         spin_unlock(&dcache_lock);
197                         iput(inode);
198                         return result;
199                 }
200         }
201         spin_unlock(&dcache_lock);
202         result = d_alloc_root(inode);
203         if (result == NULL) {
204                 iput(inode);
205                 return ERR_PTR(-ENOMEM);
206         }
207         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
208         return result;
209 }
210
211 static inline void mds_get_objid(struct inode *inode, __u64 *id)
212 {
213         memcpy(id, &inode->u.ext2_i.i_data, sizeof(*id));
214 }
215
216 int mds_getattr(struct ptlrpc_request *req)
217 {
218         struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, 
219                                            NULL);
220         struct inode *inode;
221         struct mds_rep *rep;
222         int rc;
223         
224         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.mds, 
225                           &req->rq_replen, &req->rq_repbuf);
226         if (rc) { 
227                 EXIT;
228                 printk("mds: out of memory\n");
229                 req->rq_status = -ENOMEM;
230                 return 0;
231         }
232
233         req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
234         rep = req->rq_rep.mds;
235
236         if (!de) { 
237                 EXIT;
238                 req->rq_rephdr->status = -ENOENT;
239                 return 0;
240         }
241
242         inode = de->d_inode;
243         rep->ino = inode->i_ino;
244         rep->atime = inode->i_atime;
245         rep->ctime = inode->i_ctime;
246         rep->mtime = inode->i_mtime;
247         rep->uid = inode->i_uid;
248         rep->gid = inode->i_gid;
249         rep->size = inode->i_size;
250         rep->mode = inode->i_mode;
251         rep->nlink = inode->i_nlink;
252         rep->valid = ~0;
253         mds_get_objid(inode, &rep->objid);
254         dput(de); 
255         return 0;
256 }
257
258 int mds_readpage(struct ptlrpc_request *req)
259 {
260         struct vfsmount *mnt;
261         struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, 
262                                            &mnt);
263         struct file *file; 
264         struct niobuf *niobuf; 
265         struct mds_rep *rep;
266         int rc;
267         
268         printk("mds_readpage: ino %ld\n", de->d_inode->i_ino);
269         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.mds, 
270                           &req->rq_replen, &req->rq_repbuf);
271         if (rc) { 
272                 EXIT;
273                 printk("mds: out of memory\n");
274                 req->rq_status = -ENOMEM;
275                 return 0;
276         }
277
278         req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
279         rep = req->rq_rep.mds;
280
281         if (IS_ERR(de)) { 
282                 EXIT;
283                 req->rq_rephdr->status = PTR_ERR(de); 
284                 return 0;
285         }
286
287         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE); 
288         /* note: in case of an error, dentry_open puts dentry */
289         if (IS_ERR(file)) { 
290                 EXIT;
291                 req->rq_rephdr->status = PTR_ERR(file);
292                 return 0;
293         }
294                 
295         niobuf = mds_req_tgt(req->rq_req.mds);
296
297         /* to make this asynchronous make sure that the handling function 
298            doesn't send a reply when this function completes. Instead a 
299            callback function would send the reply */ 
300         rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf); 
301
302         filp_close(file, 0);
303         req->rq_rephdr->status = rc;
304         EXIT;
305         return 0;
306 }
307
308 int mds_reint(struct ptlrpc_request *req)
309 {
310         int rc;
311         char *buf = mds_req_tgt(req->rq_req.mds);
312         int len = req->rq_req.mds->tgtlen;
313         struct mds_update_record rec;
314         
315         rc = mds_update_unpack(buf, len, &rec);
316         if (rc) { 
317                 printk(__FUNCTION__ ": invalid record\n");
318                 req->rq_status = -EINVAL;
319                 return 0;
320         }
321         /* rc will be used to interrupt a for loop over multiple records */
322         rc = mds_reint_rec(&rec, req); 
323         return 0; 
324 }
325
326 //int mds_handle(struct mds_conn *conn, int len, char *buf)
327 int mds_handle(struct ptlrpc_request *req)
328 {
329         int rc;
330         struct ptlreq_hdr *hdr;
331
332         ENTRY;
333
334         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
335
336         if (NTOH__u32(hdr->type) != MDS_TYPE_REQ) {
337                 printk("lustre_mds: wrong packet type sent %d\n",
338                        NTOH__u32(hdr->type));
339                 rc = -EINVAL;
340                 goto out;
341         }
342
343         rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
344                             &req->rq_reqhdr, &req->rq_req.mds);
345         if (rc) { 
346                 printk("lustre_mds: Invalid request\n");
347                 EXIT; 
348                 goto out;
349         }
350
351         switch (req->rq_reqhdr->opc) { 
352
353         case MDS_GETATTR:
354                 CDEBUG(D_INODE, "getattr\n");
355                 rc = mds_getattr(req);
356                 break;
357
358         case MDS_READPAGE:
359                 CDEBUG(D_INODE, "readpage\n");
360                 rc = mds_readpage(req);
361                 break;
362
363         case MDS_REINT:
364                 CDEBUG(D_INODE, "reint\n");
365                 rc = mds_reint(req);
366                 break;
367
368         default:
369                 return mds_error(req);
370         }
371
372 out:
373         if (rc) { 
374                 printk(__FUNCTION__ ": no header\n");
375                 return 0;
376         }
377
378         if( req->rq_status) { 
379                 mds_error(req);
380         } else { 
381                 CDEBUG(D_INODE, "sending reply\n"); 
382                 mds_reply(req); 
383         }
384
385         return 0;
386 }
387
388
389 static void mds_timer_run(unsigned long __data)
390 {
391         struct task_struct * p = (struct task_struct *) __data;
392
393         wake_up_process(p);
394 }
395
396 int mds_main(void *arg)
397 {
398         struct mds_obd *mds = (struct mds_obd *) arg;
399         struct timer_list timer;
400
401         lock_kernel();
402         daemonize();
403         spin_lock_irq(&current->sigmask_lock);
404         sigfillset(&current->blocked);
405         recalc_sigpending(current);
406         spin_unlock_irq(&current->sigmask_lock);
407
408         sprintf(current->comm, "lustre_mds");
409
410         /* Set up an interval timer which can be used to trigger a
411            wakeup after the interval expires */
412         init_timer(&timer);
413         timer.data = (unsigned long) current;
414         timer.function = mds_timer_run;
415         mds->mds_timer = &timer;
416
417         /* Record that the  thread is running */
418         mds->mds_thread = current;
419         wake_up(&mds->mds_done_waitq); 
420
421         printk(KERN_INFO "lustre_mds starting.  Commit interval %d seconds\n",
422                         mds->mds_interval / HZ);
423
424         /* XXX maintain a list of all managed devices: insert here */
425
426         /* And now, wait forever for commit wakeup events. */
427         while (1) {
428                 int rc;
429
430                 if (mds->mds_flags & MDS_UNMOUNT)
431                         break;
432
433                 wake_up(&mds->mds_done_waitq);
434                 interruptible_sleep_on(&mds->mds_waitq);
435
436                 CDEBUG(D_INODE, "lustre_mds wakes\n");
437                 CDEBUG(D_INODE, "pick up req here and continue\n"); 
438
439                 if (mds->mds_service != NULL) {
440                         ptl_event_t ev;
441
442                         while (1) {
443                                 struct ptlrpc_request request;
444
445                                 rc = PtlEQGet(mds->mds_service->srv_eq, &ev);
446                                 if (rc != PTL_OK && rc != PTL_EQ_DROPPED)
447                                         break;
448                                 /* FIXME: If we move to an event-driven model,
449                                  * we should put the request on the stack of
450                                  * mds_handle instead. */
451                                 memset(&request, 0, sizeof(request));
452                                 request.rq_reqbuf = ev.mem_desc.start +
453                                         ev.offset;
454                                 request.rq_reqlen = ev.mem_desc.length;
455                                 request.rq_obd = MDS;
456                                 request.rq_xid = ev.match_bits;
457
458                                 request.rq_peer.peer_nid = ev.initiator.nid;
459                                 /* FIXME: this NI should be the incoming NI.
460                                  * We don't know how to find that from here. */
461                                 request.rq_peer.peer_ni =
462                                         mds->mds_service->srv_self.peer_ni;
463                                 rc = mds_handle(&request);
464                         }
465                 } else {
466                         struct ptlrpc_request *request;
467
468                         if (list_empty(&mds->mds_reqs)) {
469                                 CDEBUG(D_INODE, "woke because of timer\n");
470                         } else {
471                                 request = list_entry(mds->mds_reqs.next,
472                                                      struct ptlrpc_request,
473                                                      rq_list);
474                                 list_del(&request->rq_list);
475                                 rc = mds_handle(request);
476                         }
477                 }
478         }
479
480         del_timer_sync(mds->mds_timer);
481
482         /* XXX maintain a list of all managed devices: cleanup here */
483
484         mds->mds_thread = NULL;
485         wake_up(&mds->mds_done_waitq);
486         printk("lustre_mds: exiting\n");
487         return 0;
488 }
489
490 static void mds_stop_srv_thread(struct mds_obd *mds)
491 {
492         mds->mds_flags |= MDS_UNMOUNT;
493
494         while (mds->mds_thread) {
495                 wake_up(&mds->mds_waitq);
496                 sleep_on(&mds->mds_done_waitq);
497         }
498 }
499
500 static void mds_start_srv_thread(struct mds_obd *mds)
501 {
502         init_waitqueue_head(&mds->mds_waitq);
503         init_waitqueue_head(&mds->mds_done_waitq);
504         kernel_thread(mds_main, (void *)mds, 
505                       CLONE_VM | CLONE_FS | CLONE_FILES);
506         while (!mds->mds_thread) 
507                 sleep_on(&mds->mds_done_waitq);
508 }
509
510 /* mount the file system (secretly) */
511 static int mds_setup(struct obd_device *obddev, obd_count len,
512                         void *buf)
513                         
514 {
515         struct obd_ioctl_data* data = buf;
516         struct mds_obd *mds = &obddev->u.mds;
517         struct vfsmount *mnt;
518         struct lustre_peer peer;
519         int err; 
520         ENTRY;
521
522
523         mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL); 
524         err = PTR_ERR(mnt);
525         if (IS_ERR(mnt)) { 
526                 EXIT;
527                 return err;
528         }
529
530         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
531         if (!obddev->u.mds.mds_sb) {
532                 EXIT;
533                 return -ENODEV;
534         }
535
536         mds->mds_vfsmnt = mnt;
537         obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2);
538
539         mds->mds_ctxt.pwdmnt = mnt;
540         mds->mds_ctxt.pwd = mnt->mnt_root;
541         mds->mds_ctxt.fs = KERNEL_DS;
542         mds->mds_remote_nid = 0;
543
544         INIT_LIST_HEAD(&mds->mds_reqs);
545         mds->mds_thread = NULL;
546         mds->mds_flags = 0;
547         mds->mds_interval = 3 * HZ;
548         MDS = mds;
549
550         spin_lock_init(&obddev->u.mds.mds_lock);
551
552         err = kportal_uuid_to_peer("self", &peer);
553         if (err == 0) {
554                 mds->mds_service = kmalloc(sizeof(*mds->mds_service),
555                                                   GFP_KERNEL);
556                 if (mds->mds_service == NULL)
557                         return -ENOMEM;
558                 mds->mds_service->srv_buf_size = 64 * 1024;
559                 mds->mds_service->srv_portal = MDS_REQUEST_PORTAL;
560                 memcpy(&mds->mds_service->srv_self, &peer, sizeof(peer));
561                 mds->mds_service->srv_wait_queue = &mds->mds_waitq;
562
563                 rpc_register_service(mds->mds_service, "self");
564         }
565
566         mds_start_srv_thread(mds);
567
568         MOD_INC_USE_COUNT;
569         EXIT; 
570         return 0;
571
572
573 static int mds_cleanup(struct obd_device * obddev)
574 {
575         struct super_block *sb;
576         struct mds_obd *mds = &obddev->u.mds;
577
578         ENTRY;
579
580         if ( !(obddev->obd_flags & OBD_SET_UP) ) {
581                 EXIT;
582                 return 0;
583         }
584
585         if ( !list_empty(&obddev->obd_gen_clients) ) {
586                 printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");
587                 EXIT;
588                 return -EBUSY;
589         }
590
591         MDS = NULL;
592         mds_stop_srv_thread(mds);
593         sb = mds->mds_sb;
594         if (!mds->mds_sb){
595                 EXIT;
596                 return 0;
597         }
598
599         if (!list_empty(&mds->mds_reqs)) {
600                 // XXX reply with errors and clean up
601                 CDEBUG(D_INODE, "Request list not empty!\n");
602         }
603
604         unlock_kernel();
605         mntput(mds->mds_vfsmnt); 
606         mds->mds_sb = 0;
607         kfree(mds->mds_fstype);
608         lock_kernel();
609         
610
611         MOD_DEC_USE_COUNT;
612         EXIT;
613         return 0;
614 }
615
616 /* use obd ops to offer management infrastructure */
617 static struct obd_ops mds_obd_ops = {
618         o_setup:       mds_setup,
619         o_cleanup:     mds_cleanup,
620 };
621
622 static int __init mds_init(void)
623 {
624         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
625         return 0;
626 }
627
628 static void __exit mds_exit(void)
629 {
630         obd_unregister_type(LUSTRE_MDS_NAME);
631 }
632
633 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
634 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
635 MODULE_LICENSE("GPL");
636
637
638 // for testing (maybe this stays)
639 EXPORT_SYMBOL(mds_queue_req);
640
641 module_init(mds_init);
642 module_exit(mds_exit);