Whamcloud - gitweb
class/class_obd.c: small OBD_ATTACHED sanity cleanup; OBD_SET_UP fix.
[fs/lustre-release.git] / lustre / mds / handler.c
1 /*
2  *  linux/mds/handler.c
3  *  
4  *  Lustre Metadata Server (mds) request handler
5  * 
6  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
7  *
8  *  This code is issued under the GNU General Public License.
9  *  See the file COPYING in this distribution
10  *
11  *  by Peter Braam <braam@clusterfs.com>
12  * 
13  *  This server is single threaded at present (but can easily be multi threaded). 
14  * 
15  */
16
17
18 #define EXPORT_SYMTAB
19
20 #include <linux/version.h>
21 #include <linux/module.h>
22 #include <linux/fs.h>
23 #include <linux/stat.h>
24 #include <linux/locks.h>
25 #include <linux/ext2_fs.h>
26 #include <linux/quotaops.h>
27 #include <asm/unistd.h>
28 #include <asm/uaccess.h>
29 #include <linux/obd_support.h>
30 #include <linux/obd.h>
31 #include <linux/lustre_lib.h>
32 #include <linux/lustre_idl.h>
33 #include <linux/lustre_mds.h>
34 #include <linux/lustre_net.h>
35 #include <linux/obd_class.h>
36
/* XXX for testing: the single MDS instance served by this module.
 * mds_setup() points it at the device being set up, mds_cleanup() resets
 * it to NULL, and mds_queue_req() rejects requests while it is NULL. */
static struct mds_obd *MDS;
39
40 // XXX make this networked!  
41 static int mds_queue_req(struct ptlrpc_request *req)
42 {
43         struct ptlrpc_request *srv_req;
44         
45         if (!MDS) { 
46                 EXIT;
47                 return -1;
48         }
49
50         srv_req = kmalloc(sizeof(*srv_req), GFP_KERNEL);
51         if (!srv_req) { 
52                 EXIT;
53                 return -ENOMEM;
54         }
55
56         printk("---> MDS at %d %p, incoming req %p, srv_req %p\n", 
57                __LINE__, MDS, req, srv_req);
58
59         memset(srv_req, 0, sizeof(*req)); 
60
61         /* move the request buffer */
62         srv_req->rq_reqbuf = req->rq_reqbuf;
63         srv_req->rq_reqlen    = req->rq_reqlen;
64         srv_req->rq_obd = MDS;
65
66         /* remember where it came from */
67         srv_req->rq_reply_handle = req;
68
69         list_add(&srv_req->rq_list, &MDS->mds_reqs); 
70         wake_up(&MDS->mds_waitq);
71         return 0;
72 }
73
74 int mds_sendpage(struct ptlrpc_request *req, struct file *file, 
75                     __u64 offset, struct niobuf *dst)
76 {
77         int rc; 
78         mm_segment_t oldfs = get_fs();
79
80         if (req->rq_peer.peer_nid == 0) {
81                 /* dst->addr is a user address, but in a different task! */
82                 set_fs(KERNEL_DS); 
83                 rc = generic_file_read(file, (char *)(long)dst->addr, 
84                                        PAGE_SIZE, &offset); 
85                 set_fs(oldfs);
86
87                 if (rc != PAGE_SIZE) 
88                         return -EIO;
89         } else {
90                 char *buf;
91
92                 buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
93                 if (!buf) {
94                         return -ENOMEM;
95                 }
96
97                 set_fs(KERNEL_DS); 
98                 rc = generic_file_read(file, buf, PAGE_SIZE, &offset); 
99                 set_fs(oldfs);
100
101                 if (rc != PAGE_SIZE) 
102                         return -EIO;
103
104                 req->rq_bulkbuf = buf;
105                 req->rq_bulklen = PAGE_SIZE;
106                 rc = ptl_send_buf(req, &req->rq_peer, MDS_BULK_PORTAL, 0);
107                 init_waitqueue_head(&req->rq_wait_for_bulk);
108                 sleep_on(&req->rq_wait_for_bulk);
109                 kfree(buf);
110                 req->rq_bulklen = 0; /* FIXME: eek. */
111         }
112
113         return 0;
114 }
115
116 int mds_reply(struct ptlrpc_request *req)
117 {
118         struct ptlrpc_request *clnt_req = req->rq_reply_handle;
119
120         ENTRY;
121         
122         if (req->rq_obd->mds_service != NULL) {
123                 /* This is a request that came from the network via portals. */
124
125                 /* FIXME: we need to increment the count of handled events */
126                 ptl_send_buf(req, &req->rq_peer, MDS_REPLY_PORTAL, 0);
127         } else {
128                 /* This is a local request that came from another thread. */
129
130                 /* move the reply to the client */ 
131                 clnt_req->rq_replen = req->rq_replen;
132                 clnt_req->rq_repbuf = req->rq_repbuf;
133                 req->rq_repbuf = NULL;
134                 req->rq_replen = 0;
135
136                 /* free the request buffer */
137                 kfree(req->rq_reqbuf);
138                 req->rq_reqbuf = NULL;
139
140                 /* wake up the client */ 
141                 wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
142         }
143
144         EXIT;
145         return 0;
146 }
147
148 int mds_error(struct ptlrpc_request *req)
149 {
150         struct ptlrep_hdr *hdr;
151
152         ENTRY;
153
154         hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
155         if (!hdr) { 
156                 EXIT;
157                 return -ENOMEM;
158         }
159
160         memset(hdr, 0, sizeof(*hdr));
161         
162         hdr->seqno = req->rq_reqhdr->seqno;
163         hdr->status = req->rq_status; 
164         hdr->type = MDS_TYPE_ERR;
165
166         req->rq_repbuf = (char *)hdr;
167         req->rq_replen = sizeof(*hdr); 
168
169         EXIT;
170         return mds_reply(req);
171 }
172
173 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt)
174 {
175         /* stolen from NFS */ 
176         struct super_block *sb = mds->mds_sb; 
177         unsigned long ino = fid->id;
178         //__u32 generation = fid->generation;
179         __u32 generation = 0;
180         struct inode *inode;
181         struct list_head *lp;
182         struct dentry *result;
183
184         if (mnt) { 
185                 *mnt = mntget(mds->mds_vfsmnt);
186         }
187
188         if (ino == 0)
189                 return ERR_PTR(-ESTALE);
190
191         inode = iget(sb, ino);
192         if (inode == NULL)
193                 return ERR_PTR(-ENOMEM);
194
195         printk("--> mds_fid2dentry: sb %p\n", inode->i_sb); 
196
197         if (is_bad_inode(inode)
198             || (generation && inode->i_generation != generation)
199                 ) {
200                 /* we didn't find the right inode.. */
201                 printk(__FUNCTION__ 
202                        "bad inode %lu, link: %d ct: %d or version  %u/%u\n",
203                         inode->i_ino,
204                         inode->i_nlink, atomic_read(&inode->i_count),
205                         inode->i_generation,
206                         generation);
207                 iput(inode);
208                 return ERR_PTR(-ESTALE);
209         }
210
211         /* now to find a dentry.
212          * If possible, get a well-connected one
213          */
214         spin_lock(&dcache_lock);
215         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
216                 result = list_entry(lp,struct dentry, d_alias);
217                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
218                         dget_locked(result);
219                         result->d_vfs_flags |= DCACHE_REFERENCED;
220                         spin_unlock(&dcache_lock);
221                         iput(inode);
222                         return result;
223                 }
224         }
225         spin_unlock(&dcache_lock);
226         result = d_alloc_root(inode);
227         if (result == NULL) {
228                 iput(inode);
229                 return ERR_PTR(-ENOMEM);
230         }
231         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
232         return result;
233 }
234
235 static inline void mds_get_objid(struct inode *inode, __u64 *id)
236 {
237         memcpy(id, &inode->u.ext2_i.i_data, sizeof(*id));
238 }
239
240 int mds_getattr(struct ptlrpc_request *req)
241 {
242         struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, 
243                                            NULL);
244         struct inode *inode;
245         struct mds_rep *rep;
246         int rc;
247         
248         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.mds, 
249                           &req->rq_replen, &req->rq_repbuf);
250         if (rc) { 
251                 EXIT;
252                 printk("mds: out of memory\n");
253                 req->rq_status = -ENOMEM;
254                 return 0;
255         }
256
257         req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
258         rep = req->rq_rep.mds;
259
260         if (!de) { 
261                 EXIT;
262                 req->rq_rephdr->status = -ENOENT;
263                 return 0;
264         }
265
266         inode = de->d_inode;
267         rep->ino = inode->i_ino;
268         rep->atime = inode->i_atime;
269         rep->ctime = inode->i_ctime;
270         rep->mtime = inode->i_mtime;
271         rep->uid = inode->i_uid;
272         rep->gid = inode->i_gid;
273         rep->size = inode->i_size;
274         rep->mode = inode->i_mode;
275         rep->nlink = inode->i_nlink;
276         rep->valid = ~0;
277         mds_get_objid(inode, &rep->objid);
278         dput(de); 
279         return 0;
280 }
281
282 int mds_readpage(struct ptlrpc_request *req)
283 {
284         struct vfsmount *mnt;
285         struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, 
286                                            &mnt);
287         struct file *file; 
288         struct niobuf *niobuf; 
289         struct mds_rep *rep;
290         int rc;
291         
292         printk("mds_readpage: ino %ld\n", de->d_inode->i_ino);
293         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.mds, 
294                           &req->rq_replen, &req->rq_repbuf);
295         if (rc) { 
296                 EXIT;
297                 printk("mds: out of memory\n");
298                 req->rq_status = -ENOMEM;
299                 return 0;
300         }
301
302         req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
303         rep = req->rq_rep.mds;
304
305         if (IS_ERR(de)) { 
306                 EXIT;
307                 req->rq_rephdr->status = PTR_ERR(de); 
308                 return 0;
309         }
310
311         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE); 
312         /* note: in case of an error, dentry_open puts dentry */
313         if (IS_ERR(file)) { 
314                 EXIT;
315                 req->rq_rephdr->status = PTR_ERR(file);
316                 return 0;
317         }
318                 
319         niobuf = mds_req_tgt(req->rq_req.mds);
320
321         /* to make this asynchronous make sure that the handling function 
322            doesn't send a reply when this function completes. Instead a 
323            callback function would send the reply */ 
324         rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf); 
325
326         filp_close(file, 0);
327         req->rq_rephdr->status = rc;
328         EXIT;
329         return 0;
330 }
331
332 int mds_reint(struct ptlrpc_request *req)
333 {
334         int rc;
335         char *buf = mds_req_tgt(req->rq_req.mds);
336         int len = req->rq_req.mds->tgtlen;
337         struct mds_update_record rec;
338         
339         rc = mds_update_unpack(buf, len, &rec);
340         if (rc) { 
341                 printk(__FUNCTION__ ": invalid record\n");
342                 req->rq_status = -EINVAL;
343                 return 0;
344         }
345         /* rc will be used to interrupt a for loop over multiple records */
346         rc = mds_reint_rec(&rec, req); 
347         return 0; 
348 }
349
350 //int mds_handle(struct mds_conn *conn, int len, char *buf)
351 int mds_handle(struct ptlrpc_request *req)
352 {
353         int rc;
354         struct ptlreq_hdr *hdr;
355
356         ENTRY;
357
358         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
359
360         if (NTOH__u32(hdr->type) != MDS_TYPE_REQ) {
361                 printk("lustre_mds: wrong packet type sent %d\n",
362                        NTOH__u32(hdr->type));
363                 rc = -EINVAL;
364                 goto out;
365         }
366
367         rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
368                             &req->rq_reqhdr, &req->rq_req.mds);
369         if (rc) { 
370                 printk("lustre_mds: Invalid request\n");
371                 EXIT; 
372                 goto out;
373         }
374
375         switch (req->rq_reqhdr->opc) { 
376
377         case MDS_GETATTR:
378                 CDEBUG(D_INODE, "getattr\n");
379                 rc = mds_getattr(req);
380                 break;
381
382         case MDS_READPAGE:
383                 CDEBUG(D_INODE, "readpage\n");
384                 rc = mds_readpage(req);
385                 break;
386
387         case MDS_REINT:
388                 CDEBUG(D_INODE, "reint\n");
389                 rc = mds_reint(req);
390                 break;
391
392         default:
393                 return mds_error(req);
394         }
395
396 out:
397         if (rc) { 
398                 printk(__FUNCTION__ ": no header\n");
399                 return 0;
400         }
401
402         if( req->rq_status) { 
403                 mds_error(req);
404         } else { 
405                 CDEBUG(D_INODE, "sending reply\n"); 
406                 mds_reply(req); 
407         }
408
409         return 0;
410 }
411
412
/* Timer callback: __data is the task_struct pointer of the task to wake. */
static void mds_timer_run(unsigned long __data)
{
        wake_up_process((struct task_struct *)__data);
}
419
420 int mds_main(void *arg)
421 {
422         struct mds_obd *mds = (struct mds_obd *) arg;
423         struct timer_list timer;
424
425         lock_kernel();
426         daemonize();
427         spin_lock_irq(&current->sigmask_lock);
428         sigfillset(&current->blocked);
429         recalc_sigpending(current);
430         spin_unlock_irq(&current->sigmask_lock);
431
432         sprintf(current->comm, "lustre_mds");
433
434         /* Set up an interval timer which can be used to trigger a
435            wakeup after the interval expires */
436         init_timer(&timer);
437         timer.data = (unsigned long) current;
438         timer.function = mds_timer_run;
439         mds->mds_timer = &timer;
440
441         /* Record that the  thread is running */
442         mds->mds_thread = current;
443         wake_up(&mds->mds_done_waitq); 
444
445         printk(KERN_INFO "lustre_mds starting.  Commit interval %d seconds\n",
446                         mds->mds_interval / HZ);
447
448         /* XXX maintain a list of all managed devices: insert here */
449
450         /* And now, wait forever for commit wakeup events. */
451         while (1) {
452                 int rc;
453
454                 if (mds->mds_flags & MDS_UNMOUNT)
455                         break;
456
457                 wake_up(&mds->mds_done_waitq);
458                 interruptible_sleep_on(&mds->mds_waitq);
459
460                 CDEBUG(D_INODE, "lustre_mds wakes\n");
461                 CDEBUG(D_INODE, "pick up req here and continue\n"); 
462
463                 if (mds->mds_service != NULL) {
464                         ptl_event_t ev;
465
466                         while (1) {
467                                 struct ptlrpc_request request;
468
469                                 rc = PtlEQGet(mds->mds_service->srv_eq_h, &ev);
470                                 if (rc != PTL_OK && rc != PTL_EQ_DROPPED)
471                                         break;
472                                 /* FIXME: If we move to an event-driven model,
473                                  * we should put the request on the stack of
474                                  * mds_handle instead. */
475                                 memset(&request, 0, sizeof(request));
476                                 request.rq_reqbuf = ev.mem_desc.start +
477                                         ev.offset;
478                                 request.rq_reqlen = ev.mem_desc.length;
479                                 request.rq_obd = MDS;
480                                 request.rq_xid = ev.match_bits;
481
482                                 request.rq_peer.peer_nid = ev.initiator.nid;
483                                 /* FIXME: this NI should be the incoming NI.
484                                  * We don't know how to find that from here. */
485                                 request.rq_peer.peer_ni =
486                                         mds->mds_service->srv_self.peer_ni;
487                                 rc = mds_handle(&request);
488                         }
489                 } else {
490                         struct ptlrpc_request *request;
491
492                         if (list_empty(&mds->mds_reqs)) {
493                                 CDEBUG(D_INODE, "woke because of timer\n");
494                         } else {
495                                 request = list_entry(mds->mds_reqs.next,
496                                                      struct ptlrpc_request,
497                                                      rq_list);
498                                 list_del(&request->rq_list);
499                                 rc = mds_handle(request);
500                         }
501                 }
502         }
503
504         del_timer_sync(mds->mds_timer);
505
506         /* XXX maintain a list of all managed devices: cleanup here */
507
508         mds->mds_thread = NULL;
509         wake_up(&mds->mds_done_waitq);
510         printk("lustre_mds: exiting\n");
511         return 0;
512 }
513
514 static void mds_stop_srv_thread(struct mds_obd *mds)
515 {
516         mds->mds_flags |= MDS_UNMOUNT;
517
518         while (mds->mds_thread) {
519                 wake_up(&mds->mds_waitq);
520                 sleep_on(&mds->mds_done_waitq);
521         }
522 }
523
524 static void mds_start_srv_thread(struct mds_obd *mds)
525 {
526         init_waitqueue_head(&mds->mds_waitq);
527         init_waitqueue_head(&mds->mds_done_waitq);
528         kernel_thread(mds_main, (void *)mds, 
529                       CLONE_VM | CLONE_FS | CLONE_FILES);
530         while (!mds->mds_thread) 
531                 sleep_on(&mds->mds_done_waitq);
532 }
533
534 /* mount the file system (secretly) */
535 static int mds_setup(struct obd_device *obddev, obd_count len,
536                         void *buf)
537                         
538 {
539         struct obd_ioctl_data* data = buf;
540         struct mds_obd *mds = &obddev->u.mds;
541         struct vfsmount *mnt;
542         struct lustre_peer peer;
543         int err; 
544         ENTRY;
545
546
547         mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL); 
548         err = PTR_ERR(mnt);
549         if (IS_ERR(mnt)) { 
550                 EXIT;
551                 return err;
552         }
553
554         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
555         if (!obddev->u.mds.mds_sb) {
556                 EXIT;
557                 return -ENODEV;
558         }
559
560         mds->mds_vfsmnt = mnt;
561         obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2);
562
563         mds->mds_ctxt.pwdmnt = mnt;
564         mds->mds_ctxt.pwd = mnt->mnt_root;
565         mds->mds_ctxt.fs = KERNEL_DS;
566         mds->mds_remote_nid = 0;
567
568         INIT_LIST_HEAD(&mds->mds_reqs);
569         mds->mds_thread = NULL;
570         mds->mds_flags = 0;
571         mds->mds_interval = 3 * HZ;
572         MDS = mds;
573
574         spin_lock_init(&obddev->u.mds.mds_lock);
575
576         err = kportal_uuid_to_peer("self", &peer);
577         if (err == 0) {
578                 mds->mds_service = kmalloc(sizeof(*mds->mds_service),
579                                                   GFP_KERNEL);
580                 if (mds->mds_service == NULL)
581                         return -ENOMEM;
582                 mds->mds_service->srv_buf_size = 64 * 1024;
583                 mds->mds_service->srv_portal = MDS_REQUEST_PORTAL;
584                 memcpy(&mds->mds_service->srv_self, &peer, sizeof(peer));
585                 mds->mds_service->srv_wait_queue = &mds->mds_waitq;
586
587                 rpc_register_service(mds->mds_service, "self");
588         }
589
590         mds_start_srv_thread(mds);
591
592         MOD_INC_USE_COUNT;
593         EXIT; 
594         return 0;
595
596
597 static int mds_cleanup(struct obd_device * obddev)
598 {
599         struct super_block *sb;
600         struct mds_obd *mds = &obddev->u.mds;
601
602         ENTRY;
603
604         if ( !(obddev->obd_flags & OBD_SET_UP) ) {
605                 EXIT;
606                 return 0;
607         }
608
609         if ( !list_empty(&obddev->obd_gen_clients) ) {
610                 printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");
611                 EXIT;
612                 return -EBUSY;
613         }
614
615         MDS = NULL;
616         mds_stop_srv_thread(mds);
617         sb = mds->mds_sb;
618         if (!mds->mds_sb){
619                 EXIT;
620                 return 0;
621         }
622
623         if (!list_empty(&mds->mds_reqs)) {
624                 // XXX reply with errors and clean up
625                 CDEBUG(D_INODE, "Request list not empty!\n");
626         }
627
628         unlock_kernel();
629         mntput(mds->mds_vfsmnt); 
630         mds->mds_sb = 0;
631         kfree(mds->mds_fstype);
632         lock_kernel();
633
634         MOD_DEC_USE_COUNT;
635         EXIT;
636         return 0;
637 }
638
639 /* use obd ops to offer management infrastructure */
640 static struct obd_ops mds_obd_ops = {
641         o_setup:       mds_setup,
642         o_cleanup:     mds_cleanup,
643 };
644
645 static int __init mds_init(void)
646 {
647         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
648         return 0;
649 }
650
651 static void __exit mds_exit(void)
652 {
653         obd_unregister_type(LUSTRE_MDS_NAME);
654 }
655
656 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
657 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
658 MODULE_LICENSE("GPL");
659
660
661 // for testing (maybe this stays)
662 EXPORT_SYMBOL(mds_queue_req);
663
664 module_init(mds_init);
665 module_exit(mds_exit);