Whamcloud - gitweb
da72b6475c1b8292c506f66392e16d0a0de3a32d
[fs/lustre-release.git] / lustre / mds / handler.c
1 /*
2  *  linux/mds/handler.c
3  *  
4  *  Lustre Metadata Server (mds) request handler
5  * 
6  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
7  *
8  *  This code is issued under the GNU General Public License.
9  *  See the file COPYING in this distribution
10  *
11  *  by Peter Braam <braam@clusterfs.com>
12  * 
13  *  This server is single threaded at present (but can easily be multi threaded). 
14  * 
15  */
16
17
18 #define EXPORT_SYMTAB
19
20 #include <linux/version.h>
21 #include <linux/module.h>
22 #include <linux/fs.h>
23 #include <linux/stat.h>
24 #include <linux/locks.h>
25 #include <linux/ext2_fs.h>
26 #include <linux/quotaops.h>
27 #include <asm/unistd.h>
28 #include <asm/uaccess.h>
29 #include <linux/obd_support.h>
30 #include <linux/obd.h>
31 #include <linux/lustre_lib.h>
32 #include <linux/lustre_idl.h>
33 #include <linux/lustre_mds.h>
34 #include <linux/lustre_net.h>
35 #include <linux/obd_class.h>
36
37 // XXX for testing
38 static struct mds_obd *MDS;
39
40 // XXX make this networked!  
41 static int mds_queue_req(struct ptlrpc_request *req)
42 {
43         struct ptlrpc_request *srv_req;
44         
45         if (!MDS) { 
46                 EXIT;
47                 return -1;
48         }
49
50         srv_req = kmalloc(sizeof(*srv_req), GFP_KERNEL);
51         if (!srv_req) { 
52                 EXIT;
53                 return -ENOMEM;
54         }
55
56         printk("---> MDS at %d %p, incoming req %p, srv_req %p\n", 
57                __LINE__, MDS, req, srv_req);
58
59         memset(srv_req, 0, sizeof(*req)); 
60
61         /* move the request buffer */
62         srv_req->rq_reqbuf = req->rq_reqbuf;
63         srv_req->rq_reqlen    = req->rq_reqlen;
64         srv_req->rq_obd = MDS;
65
66         /* remember where it came from */
67         srv_req->rq_reply_handle = req;
68
69         list_add(&srv_req->rq_list, &MDS->mds_reqs); 
70         wake_up(&MDS->mds_waitq);
71         return 0;
72 }
73
74 /* XXX do this over the net */
75 int mds_sendpage(struct ptlrpc_request *req, struct file *file, 
76                     __u64 offset, struct niobuf *dst)
77 {
78         int rc; 
79         mm_segment_t oldfs = get_fs();
80         /* dst->addr is a user address, but in a different task! */
81         set_fs(KERNEL_DS); 
82         rc = generic_file_read(file, (char *)(long)dst->addr, 
83                               PAGE_SIZE, &offset); 
84         set_fs(oldfs);
85
86         if (rc != PAGE_SIZE) 
87                 return -EIO;
88         return 0;
89 }
90
91 /* XXX replace with networking code */
92 int mds_reply(struct ptlrpc_request *req)
93 {
94         struct ptlrpc_request *clnt_req = req->rq_reply_handle;
95
96         ENTRY;
97         
98         /* move the reply to the client */ 
99         clnt_req->rq_replen = req->rq_replen;
100         clnt_req->rq_repbuf = req->rq_repbuf;
101         req->rq_repbuf = NULL;
102         req->rq_replen = 0;
103
104         /* free the request buffer */
105         kfree(req->rq_reqbuf);
106         req->rq_reqbuf = NULL; 
107
108         /* wake up the client */ 
109         wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
110         EXIT;
111         return 0;
112 }
113
114 int mds_error(struct ptlrpc_request *req)
115 {
116         struct ptlrep_hdr *hdr;
117
118         ENTRY;
119
120         hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
121         if (!hdr) { 
122                 EXIT;
123                 return -ENOMEM;
124         }
125
126         memset(hdr, 0, sizeof(*hdr));
127         
128         hdr->seqno = req->rq_reqhdr->seqno;
129         hdr->status = req->rq_status; 
130         hdr->type = MDS_TYPE_ERR;
131
132         req->rq_repbuf = (char *)hdr;
133         req->rq_replen = sizeof(*hdr); 
134
135         EXIT;
136         return mds_reply(req);
137 }
138
139 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt)
140 {
141         /* stolen from NFS */ 
142         struct super_block *sb = mds->mds_sb; 
143         unsigned long ino = fid->id;
144         //__u32 generation = fid->generation;
145         __u32 generation = 0;
146         struct inode *inode;
147         struct list_head *lp;
148         struct dentry *result;
149
150         if (mnt) { 
151                 *mnt = mntget(mds->mds_vfsmnt);
152         }
153
154         if (ino == 0)
155                 return ERR_PTR(-ESTALE);
156
157         inode = iget(sb, ino);
158         if (inode == NULL)
159                 return ERR_PTR(-ENOMEM);
160
161         printk("--> mds_fid2dentry: sb %p\n", inode->i_sb); 
162
163         if (is_bad_inode(inode)
164             || (generation && inode->i_generation != generation)
165                 ) {
166                 /* we didn't find the right inode.. */
167                 printk(__FUNCTION__ 
168                        "bad inode %lu, link: %d ct: %d or version  %u/%u\n",
169                         inode->i_ino,
170                         inode->i_nlink, atomic_read(&inode->i_count),
171                         inode->i_generation,
172                         generation);
173                 iput(inode);
174                 return ERR_PTR(-ESTALE);
175         }
176
177         /* now to find a dentry.
178          * If possible, get a well-connected one
179          */
180         spin_lock(&dcache_lock);
181         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
182                 result = list_entry(lp,struct dentry, d_alias);
183                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
184                         dget_locked(result);
185                         result->d_vfs_flags |= DCACHE_REFERENCED;
186                         spin_unlock(&dcache_lock);
187                         iput(inode);
188                         return result;
189                 }
190         }
191         spin_unlock(&dcache_lock);
192         result = d_alloc_root(inode);
193         if (result == NULL) {
194                 iput(inode);
195                 return ERR_PTR(-ENOMEM);
196         }
197         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
198         return result;
199 }
200
201 static inline void mds_get_objid(struct inode *inode, __u64 *id)
202 {
203         memcpy(id, &inode->u.ext2_i.i_data, sizeof(*id));
204 }
205
206 int mds_getattr(struct ptlrpc_request *req)
207 {
208         struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, 
209                                            NULL);
210         struct inode *inode;
211         struct mds_rep *rep;
212         int rc;
213         
214         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.mds, 
215                           &req->rq_replen, &req->rq_repbuf);
216         if (rc) { 
217                 EXIT;
218                 printk("mds: out of memory\n");
219                 req->rq_status = -ENOMEM;
220                 return 0;
221         }
222
223         req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
224         rep = req->rq_rep.mds;
225
226         if (!de) { 
227                 EXIT;
228                 req->rq_rephdr->status = -ENOENT;
229                 return 0;
230         }
231
232         inode = de->d_inode;
233         rep->ino = inode->i_ino;
234         rep->atime = inode->i_atime;
235         rep->ctime = inode->i_ctime;
236         rep->mtime = inode->i_mtime;
237         rep->uid = inode->i_uid;
238         rep->gid = inode->i_gid;
239         rep->size = inode->i_size;
240         rep->mode = inode->i_mode;
241         rep->nlink = inode->i_nlink;
242         rep->valid = ~0;
243         mds_get_objid(inode, &rep->objid);
244         dput(de); 
245         return 0;
246 }
247
248 int mds_readpage(struct ptlrpc_request *req)
249 {
250         struct vfsmount *mnt;
251         struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, 
252                                            &mnt);
253         struct file *file; 
254         struct niobuf *niobuf; 
255         struct mds_rep *rep;
256         int rc;
257         
258         printk("mds_readpage: ino %ld\n", de->d_inode->i_ino);
259         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.mds, 
260                           &req->rq_replen, &req->rq_repbuf);
261         if (rc) { 
262                 EXIT;
263                 printk("mds: out of memory\n");
264                 req->rq_status = -ENOMEM;
265                 return 0;
266         }
267
268         req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
269         rep = req->rq_rep.mds;
270
271         if (IS_ERR(de)) { 
272                 EXIT;
273                 req->rq_rephdr->status = PTR_ERR(de); 
274                 return 0;
275         }
276
277         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE); 
278         /* note: in case of an error, dentry_open puts dentry */
279         if (IS_ERR(file)) { 
280                 EXIT;
281                 req->rq_rephdr->status = PTR_ERR(file);
282                 return 0;
283         }
284                 
285         niobuf = mds_req_tgt(req->rq_req.mds);
286
287         /* to make this asynchronous make sure that the handling function 
288            doesn't send a reply when this function completes. Instead a 
289            callback function would send the reply */ 
290         rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf); 
291
292         filp_close(file, 0);
293         req->rq_rephdr->status = rc;
294         EXIT;
295         return 0;
296 }
297
298 int mds_reint(struct ptlrpc_request *req)
299 {
300         int rc;
301         char *buf = mds_req_tgt(req->rq_req.mds);
302         int len = req->rq_req.mds->tgtlen;
303         struct mds_update_record rec;
304         
305         rc = mds_update_unpack(buf, len, &rec);
306         if (rc) { 
307                 printk(__FUNCTION__ ": invalid record\n");
308                 req->rq_status = -EINVAL;
309                 return 0;
310         }
311         /* rc will be used to interrupt a for loop over multiple records */
312         rc = mds_reint_rec(&rec, req); 
313         return 0; 
314 }
315
316 //int mds_handle(struct mds_conn *conn, int len, char *buf)
317 int mds_handle(struct ptlrpc_request *req)
318 {
319         int rc;
320         struct ptlreq_hdr *hdr;
321
322         ENTRY;
323
324         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
325
326         if (NTOH__u32(hdr->type) != MDS_TYPE_REQ) {
327                 printk("lustre_mds: wrong packet type sent %d\n",
328                        NTOH__u32(hdr->type));
329                 rc = -EINVAL;
330                 goto out;
331         }
332
333         rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
334                             &req->rq_reqhdr, &req->rq_req.mds);
335         if (rc) { 
336                 printk("lustre_mds: Invalid request\n");
337                 EXIT; 
338                 goto out;
339         }
340
341         switch (req->rq_reqhdr->opc) { 
342
343         case MDS_GETATTR:
344                 CDEBUG(D_INODE, "getattr\n");
345                 rc = mds_getattr(req);
346                 break;
347
348         case MDS_READPAGE:
349                 CDEBUG(D_INODE, "readpage\n");
350                 rc = mds_readpage(req);
351                 break;
352
353         case MDS_REINT:
354                 CDEBUG(D_INODE, "reint\n");
355                 rc = mds_reint(req);
356                 break;
357
358         default:
359                 return mds_error(req);
360         }
361
362 out:
363         if (rc) { 
364                 printk(__FUNCTION__ ": no header\n");
365                 return 0;
366         }
367
368         if( req->rq_status) { 
369                 mds_error(req);
370         } else { 
371                 CDEBUG(D_INODE, "sending reply\n"); 
372                 mds_reply(req); 
373         }
374
375         return 0;
376 }
377
378
379 static void mds_timer_run(unsigned long __data)
380 {
381         struct task_struct * p = (struct task_struct *) __data;
382
383         wake_up_process(p);
384 }
385
386 int mds_main(void *arg)
387 {
388         struct mds_obd *mds = (struct mds_obd *) arg;
389         struct timer_list timer;
390
391         lock_kernel();
392         daemonize();
393         spin_lock_irq(&current->sigmask_lock);
394         sigfillset(&current->blocked);
395         recalc_sigpending(current);
396         spin_unlock_irq(&current->sigmask_lock);
397
398         sprintf(current->comm, "lustre_mds");
399
400         /* Set up an interval timer which can be used to trigger a
401            wakeup after the interval expires */
402         init_timer(&timer);
403         timer.data = (unsigned long) current;
404         timer.function = mds_timer_run;
405         mds->mds_timer = &timer;
406
407         /* Record that the  thread is running */
408         mds->mds_thread = current;
409         wake_up(&mds->mds_done_waitq); 
410
411         printk(KERN_INFO "lustre_mds starting.  Commit interval %d seconds\n",
412                         mds->mds_interval / HZ);
413
414         /* XXX maintain a list of all managed devices: insert here */
415
416         /* And now, wait forever for commit wakeup events. */
417         while (1) {
418                 struct ptlrpc_request *request;
419                 int rc; 
420
421                 if (mds->mds_flags & MDS_UNMOUNT)
422                         break;
423
424
425                 wake_up(&mds->mds_done_waitq);
426                 interruptible_sleep_on(&mds->mds_waitq);
427
428                 CDEBUG(D_INODE, "lustre_mds wakes\n");
429                 CDEBUG(D_INODE, "pick up req here and continue\n"); 
430
431                 if (list_empty(&mds->mds_reqs)) { 
432                         CDEBUG(D_INODE, "woke because of timer\n"); 
433                 } else { 
434                         request = list_entry(mds->mds_reqs.next, 
435                                              struct ptlrpc_request, rq_list);
436                         list_del(&request->rq_list);
437                         rc = mds_handle(request); 
438                 }
439         }
440
441         del_timer_sync(mds->mds_timer);
442
443         /* XXX maintain a list of all managed devices: cleanup here */
444
445         mds->mds_thread = NULL;
446         wake_up(&mds->mds_done_waitq);
447         printk("lustre_mds: exiting\n");
448         return 0;
449 }
450
451 static void mds_stop_srv_thread(struct mds_obd *mds)
452 {
453         mds->mds_flags |= MDS_UNMOUNT;
454
455         while (mds->mds_thread) {
456                 wake_up(&mds->mds_waitq);
457                 sleep_on(&mds->mds_done_waitq);
458         }
459 }
460
461 static void mds_start_srv_thread(struct mds_obd *mds)
462 {
463         init_waitqueue_head(&mds->mds_waitq);
464         init_waitqueue_head(&mds->mds_done_waitq);
465         kernel_thread(mds_main, (void *)mds, 
466                       CLONE_VM | CLONE_FS | CLONE_FILES);
467         while (!mds->mds_thread) 
468                 sleep_on(&mds->mds_done_waitq);
469 }
470
471 /* mount the file system (secretly) */
472 static int mds_setup(struct obd_device *obddev, obd_count len,
473                         void *buf)
474                         
475 {
476         struct obd_ioctl_data* data = buf;
477         struct mds_obd *mds = &obddev->u.mds;
478         ENTRY;
479
480         /* If the ioctl data contains two inline buffers, this is a request to
481          * mount a local filesystem for metadata storage.  If the ioctl data
482          * contains one buffer, however, it is the UUID of the remote node that
483          * we will contact for metadata. */
484         if (data->ioc_inllen2 == 0) {
485                 __u32 nid;
486
487                 nid = kportal_uuid_to_nid(data->ioc_inlbuf1);
488                 if (nid == 0) {
489                         printk("Lustre: uuid_to_nid failed; use ptlctl to "
490                                "associate this uuid with a NID\n");
491                         EXIT;
492                         return -EINVAL;
493                 }
494                 printk("Lustre MDS: remote nid is %u\n", nid);
495                 mds->mds_remote_nid = nid;
496         } else {
497                 struct vfsmount *mnt;
498                 int err; 
499
500                 mnt = do_kern_mount(data->ioc_inlbuf2, 0, 
501                                     data->ioc_inlbuf1, NULL); 
502                 err = PTR_ERR(mnt);
503                 if (IS_ERR(mnt)) { 
504                         EXIT;
505                         return err;
506                 }
507
508                 mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
509                 if (!obddev->u.mds.mds_sb) {
510                         EXIT;
511                         return -ENODEV;
512                 }
513
514                 mds->mds_vfsmnt = mnt;
515                 obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2);
516
517                 mds->mds_ctxt.pwdmnt = mnt;
518                 mds->mds_ctxt.pwd = mnt->mnt_root;
519                 mds->mds_ctxt.fs = KERNEL_DS;
520                 mds->mds_remote_nid = 0;
521         }
522
523         INIT_LIST_HEAD(&mds->mds_reqs);
524         mds->mds_thread = NULL;
525         mds->mds_flags = 0;
526         mds->mds_interval = 3 * HZ;
527         MDS = mds;
528
529         spin_lock_init(&obddev->u.mds.mds_lock);
530
531         mds_start_srv_thread(mds);
532
533         MOD_INC_USE_COUNT;
534         EXIT; 
535         return 0;
536
537
538 static int mds_cleanup(struct obd_device * obddev)
539 {
540         struct super_block *sb;
541         struct mds_obd *mds = &obddev->u.mds;
542
543         ENTRY;
544
545         if ( !(obddev->obd_flags & OBD_SET_UP) ) {
546                 EXIT;
547                 return 0;
548         }
549
550         if ( !list_empty(&obddev->obd_gen_clients) ) {
551                 printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");
552                 EXIT;
553                 return -EBUSY;
554         }
555
556         MDS = NULL;
557         mds_stop_srv_thread(mds);
558         sb = mds->mds_sb;
559         if (!mds->mds_sb){
560                 EXIT;
561                 return 0;
562         }
563
564         if (!list_empty(&mds->mds_reqs)) {
565                 // XXX reply with errors and clean up
566                 CDEBUG(D_INODE, "Request list not empty!\n");
567         }
568
569         unlock_kernel();
570         mntput(mds->mds_vfsmnt); 
571         mds->mds_sb = 0;
572         kfree(mds->mds_fstype);
573         lock_kernel();
574         
575
576         MOD_DEC_USE_COUNT;
577         EXIT;
578         return 0;
579 }
580
581 /* use obd ops to offer management infrastructure */
582 static struct obd_ops mds_obd_ops = {
583         o_setup:       mds_setup,
584         o_cleanup:     mds_cleanup,
585 };
586
587 static int __init mds_init(void)
588 {
589         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
590         return 0;
591 }
592
593 static void __exit mds_exit(void)
594 {
595         obd_unregister_type(LUSTRE_MDS_NAME);
596 }
597
598 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
599 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
600 MODULE_LICENSE("GPL");
601
602
603 // for testing (maybe this stays)
604 EXPORT_SYMBOL(mds_queue_req);
605
606 module_init(mds_init);
607 module_exit(mds_exit);