Whamcloud - gitweb
- fixed some NTOH/HTON mixups
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/handler.c
5  *  
6  *  Lustre Metadata Server (mds) request handler
7  * 
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com>
14  * 
15  *  This server is single threaded at present (but can easily be multi threaded). 
16  * 
17  */
18
19 #define EXPORT_SYMTAB
20
21 #include <linux/version.h>
22 #include <linux/module.h>
23 #include <linux/fs.h>
24 #include <linux/stat.h>
25 #include <linux/locks.h>
26 #include <linux/ext2_fs.h>
27 #include <linux/quotaops.h>
28 #include <asm/unistd.h>
29 #include <asm/uaccess.h>
30
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/lustre_mds.h>
34 #include <linux/lustre_lib.h>
35
36 int mds_sendpage(struct ptlrpc_request *req, struct file *file, 
37                  __u64 offset, struct niobuf *dst)
38 {
39         int rc = 0;
40         mm_segment_t oldfs = get_fs();
41
42         if (req->rq_peer.peer_nid == 0) {
43                 /* dst->addr is a user address, but in a different task! */
44                 set_fs(KERNEL_DS); 
45                 /* FIXME: Can't use file->f_op->read() on a directory */
46                 rc = generic_file_read(file, (char *)(long)dst->addr, 
47                                        PAGE_SIZE, &offset); 
48                 set_fs(oldfs);
49
50                 if (rc != PAGE_SIZE) {
51                         rc = -EIO;
52                         EXIT;
53                         goto out;
54                 }
55                 EXIT;
56         } else {
57                 struct ptlrpc_bulk_desc *bulk;
58                 char *buf = NULL;
59
60                 bulk = ptlrpc_prep_bulk(&req->rq_peer);
61                 if (bulk == NULL) {
62                         rc = -ENOMEM;
63                         EXIT;
64                         goto out;
65                 }
66
67                 bulk->b_xid = req->rq_xid;
68
69                 OBD_ALLOC(buf, PAGE_SIZE);
70                 if (!buf) {
71                         rc = -ENOMEM;
72                         EXIT;
73                         goto cleanup_bulk;
74                 }
75
76                 set_fs(KERNEL_DS); 
77                 /* FIXME: Can't use file->f_op->read() on a directory */
78                 //rc = file->f_op->read(file, buf, PAGE_SIZE, &offset);
79                 rc = generic_file_read(file, buf, PAGE_SIZE, &offset);
80                 set_fs(oldfs);
81
82                 if (rc != PAGE_SIZE) {
83                         rc = -EIO;
84                         EXIT;
85                         goto cleanup_buf;
86                 }
87
88                 bulk->b_buf = buf;
89                 bulk->b_buflen = PAGE_SIZE;
90
91                 rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL);
92                 wait_event_interruptible(bulk->b_waitq,
93                                          ptlrpc_check_bulk_sent(bulk));
94
95                 if (bulk->b_flags == PTL_RPC_INTR) {
96                         EXIT;
97                         rc = -EINTR;
98                         goto cleanup_buf;
99                 }
100
101                 EXIT;
102         cleanup_buf:
103                 OBD_FREE(buf, PAGE_SIZE);
104         cleanup_bulk:
105                 OBD_FREE(bulk, sizeof(*bulk));
106         }
107 out:
108         return rc;
109 }
110
111 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
112                               struct vfsmount **mnt)
113 {
114         /* stolen from NFS */ 
115         struct super_block *sb = mds->mds_sb; 
116         unsigned long ino = fid->id;
117         //__u32 generation = fid->generation;
118         __u32 generation = 0;
119         struct inode *inode;
120         struct list_head *lp;
121         struct dentry *result;
122
123         if (ino == 0)
124                 return ERR_PTR(-ESTALE);
125
126         inode = iget(sb, ino);
127         if (inode == NULL)
128                 return ERR_PTR(-ENOMEM);
129
130         CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb); 
131
132         if (is_bad_inode(inode)
133             || (generation && inode->i_generation != generation)
134                 ) {
135                 /* we didn't find the right inode.. */
136                 CERROR("bad inode %lu, link: %d ct: %d or version  %u/%u\n",
137                         inode->i_ino,
138                         inode->i_nlink, atomic_read(&inode->i_count),
139                         inode->i_generation,
140                         generation);
141                 BUG();
142                 iput(inode);
143                 return ERR_PTR(-ESTALE);
144         }
145
146         /* now to find a dentry.
147          * If possible, get a well-connected one
148          */
149         if (mnt)
150                 *mnt = mds->mds_vfsmnt;
151         spin_lock(&dcache_lock);
152         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
153                 result = list_entry(lp,struct dentry, d_alias);
154                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
155                         dget_locked(result);
156                         result->d_vfs_flags |= DCACHE_REFERENCED;
157                         spin_unlock(&dcache_lock);
158                         iput(inode);
159                         if (mnt)
160                                 mntget(*mnt);
161                         return result;
162                 }
163         }
164         spin_unlock(&dcache_lock);
165         result = d_alloc_root(inode);
166         if (result == NULL) {
167                 iput(inode);
168                 return ERR_PTR(-ENOMEM);
169         }
170         if (mnt)
171                 mntget(*mnt);
172         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
173         return result;
174 }
175
176 static inline void mds_get_objid(struct inode *inode, __u64 *id)
177 {
178         /* FIXME: it is only by luck that this works on ext3 */
179         memcpy(id, &inode->u.ext2_i.i_data, sizeof(*id));
180 }
181
182 int mds_getattr(struct ptlrpc_request *req)
183 {
184         struct dentry *de;
185         struct inode *inode;
186         struct mds_rep *rep;
187         int rc;
188         
189         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
190                           &req->rq_replen, &req->rq_repbuf);
191         if (rc) { 
192                 EXIT;
193                 CERROR("mds: out of memory\n");
194                 req->rq_status = -ENOMEM;
195                 return 0;
196         }
197
198         req->rq_rephdr->xid = req->rq_reqhdr->xid;
199         rep = req->rq_rep.mds;
200
201         de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, NULL);
202         if (IS_ERR(de)) { 
203                 EXIT;
204                 req->rq_rephdr->status = -ENOENT;
205                 return 0;
206         }
207
208         inode = de->d_inode;
209         rep->ino = inode->i_ino;
210         rep->atime = inode->i_atime;
211         rep->ctime = inode->i_ctime;
212         rep->mtime = inode->i_mtime;
213         rep->uid = inode->i_uid;
214         rep->gid = inode->i_gid;
215         rep->size = inode->i_size;
216         rep->mode = inode->i_mode;
217         rep->nlink = inode->i_nlink;
218         rep->valid = ~0;
219         mds_get_objid(inode, &rep->objid);
220         dput(de); 
221         return 0;
222 }
223
224 int mds_open(struct ptlrpc_request *req)
225 {
226         struct dentry *de;
227         struct mds_rep *rep;
228         struct file *file;
229         struct vfsmount *mnt;
230         __u32 flags;
231         int rc;
232         
233         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
234                           &req->rq_replen, &req->rq_repbuf);
235         if (rc) { 
236                 EXIT;
237                 CERROR("mds: out of memory\n");
238                 req->rq_status = -ENOMEM;
239                 return 0;
240         }
241
242         req->rq_rephdr->xid = req->rq_reqhdr->xid;
243         rep = req->rq_rep.mds;
244
245         de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
246         if (IS_ERR(de)) { 
247                 EXIT;
248                 req->rq_rephdr->status = -ENOENT;
249                 return 0;
250         }
251         flags = req->rq_req.mds->flags;
252         file = dentry_open(de, mnt, flags);
253         if (!file || IS_ERR(file)) { 
254                 req->rq_rephdr->status = -EINVAL;
255                 return 0;
256         }               
257         
258         rep->objid = (__u64) (unsigned long)file; 
259         return 0;
260 }
261
262 int mds_close(struct ptlrpc_request *req)
263 {
264         struct dentry *de;
265         struct mds_rep *rep;
266         struct file *file;
267         struct vfsmount *mnt;
268         int rc;
269         
270         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
271                           &req->rq_replen, &req->rq_repbuf);
272         if (rc) { 
273                 EXIT;
274                 CERROR("mds: out of memory\n");
275                 req->rq_status = -ENOMEM;
276                 return 0;
277         }
278
279         req->rq_rephdr->xid = req->rq_reqhdr->xid;
280         rep = req->rq_rep.mds;
281
282         de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
283         if (IS_ERR(de)) { 
284                 EXIT;
285                 req->rq_rephdr->status = -ENOENT;
286                 return 0;
287         }
288
289         file = (struct file *)(unsigned long) req->rq_req.mds->objid;
290
291         req->rq_rephdr->status = filp_close(file, 0); 
292         dput(de); 
293         return 0;
294 }
295
296
297 int mds_readpage(struct ptlrpc_request *req)
298 {
299         struct vfsmount *mnt;
300         struct dentry *de;
301         struct file *file; 
302         struct niobuf *niobuf; 
303         struct mds_rep *rep;
304         int rc;
305         
306         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
307                           &req->rq_replen, &req->rq_repbuf);
308         if (rc) { 
309                 EXIT;
310                 CERROR("mds: out of memory\n");
311                 req->rq_status = -ENOMEM;
312                 return 0;
313         }
314
315         req->rq_rephdr->xid = req->rq_reqhdr->xid;
316         rep = req->rq_rep.mds;
317
318         de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
319         if (IS_ERR(de)) { 
320                 EXIT;
321                 req->rq_rephdr->status = PTR_ERR(de); 
322                 return 0;
323         }
324
325         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
326
327         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE); 
328         /* note: in case of an error, dentry_open puts dentry */
329         if (IS_ERR(file)) { 
330                 EXIT;
331                 req->rq_rephdr->status = PTR_ERR(file);
332                 return 0;
333         }
334
335         niobuf = mds_req_tgt(req->rq_req.mds);
336
337         /* to make this asynchronous make sure that the handling function 
338            doesn't send a reply when this function completes. Instead a 
339            callback function would send the reply */ 
340         rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf); 
341
342         filp_close(file, 0);
343         req->rq_rephdr->status = rc;
344         EXIT;
345         return 0;
346 }
347
348 int mds_reint(struct ptlrpc_request *req)
349 {
350         int rc;
351         char *buf = mds_req_tgt(req->rq_req.mds);
352         int len = req->rq_req.mds->tgtlen;
353         struct mds_update_record rec;
354         
355         rc = mds_update_unpack(buf, len, &rec);
356         if (rc) { 
357                 CERROR("invalid record\n");
358                 req->rq_status = -EINVAL;
359                 return 0;
360         }
361         /* rc will be used to interrupt a for loop over multiple records */
362         rc = mds_reint_rec(&rec, req); 
363         return 0; 
364 }
365
366 int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
367                struct ptlrpc_request *req)
368 {
369         int rc;
370         struct ptlreq_hdr *hdr;
371
372         ENTRY;
373
374         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
375
376         if (NTOH__u32(hdr->type) != MDS_TYPE_REQ) {
377                 CERROR("lustre_mds: wrong packet type sent %d\n",
378                        NTOH__u32(hdr->type));
379                 rc = -EINVAL;
380                 goto out;
381         }
382
383         rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
384                             &req->rq_reqhdr, &req->rq_req);
385         if (rc) { 
386                 CERROR("lustre_mds: Invalid request\n");
387                 EXIT; 
388                 goto out;
389         }
390
391         switch (req->rq_reqhdr->opc) { 
392
393         case MDS_GETATTR:
394                 CDEBUG(D_INODE, "getattr\n");
395                 rc = mds_getattr(req);
396                 break;
397
398         case MDS_READPAGE:
399                 CDEBUG(D_INODE, "readpage\n");
400                 rc = mds_readpage(req);
401                 break;
402
403         case MDS_REINT:
404                 CDEBUG(D_INODE, "reint\n");
405                 rc = mds_reint(req);
406                 break;
407
408         case MDS_OPEN:
409                 CDEBUG(D_INODE, "open\n");
410                 rc = mds_open(req);
411                 break;
412
413         case MDS_CLOSE:
414                 CDEBUG(D_INODE, "close\n");
415                 rc = mds_close(req);
416                 break;
417
418         default:
419                 return ptlrpc_error(dev, svc, req);
420         }
421
422 out:
423         if (rc) { 
424                 CERROR("no header\n");
425                 return 0;
426         }
427
428         if( req->rq_status) { 
429                 ptlrpc_error(dev, svc, req);
430         } else { 
431                 CDEBUG(D_INODE, "sending reply\n"); 
432                 ptlrpc_reply(dev, svc, req); 
433         }
434
435         return 0;
436 }
437
438
439 /* mount the file system (secretly) */
440 static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
441 {
442         struct obd_ioctl_data* data = buf;
443         struct mds_obd *mds = &obddev->u.mds;
444         struct vfsmount *mnt;
445         int err; 
446         ENTRY;
447
448         mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL); 
449         err = PTR_ERR(mnt);
450         if (IS_ERR(mnt)) {
451                 CERROR("do_kern_mount failed: %d\n", err);
452                 EXIT;
453                 return err;
454         }
455
456         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
457         if (!mds->mds_sb) {
458                 EXIT;
459                 return -ENODEV;
460         }
461
462         mds->mds_vfsmnt = mnt;
463         mds->mds_fstype = strdup(data->ioc_inlbuf2);
464
465         mds->mds_ctxt.pwdmnt = mnt;
466         mds->mds_ctxt.pwd = mnt->mnt_root;
467         mds->mds_ctxt.fs = KERNEL_DS;
468
469         mds->mds_service = ptlrpc_init_svc( 64 * 1024, 
470                                             MDS_REQUEST_PORTAL,
471                                             MDC_REPLY_PORTAL,
472                                             "self", 
473                                             mds_unpack_req,
474                                             mds_pack_rep,
475                                             mds_handle);
476
477         rpc_register_service(mds->mds_service, "self");
478
479         err = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds"); 
480         if (err) { 
481                 CERROR("cannot start thread\n");
482         }
483
484         MOD_INC_USE_COUNT;
485         EXIT; 
486         return 0;
487
488
489 static int mds_cleanup(struct obd_device * obddev)
490 {
491         struct super_block *sb;
492         struct mds_obd *mds = &obddev->u.mds;
493
494         ENTRY;
495
496         if ( !(obddev->obd_flags & OBD_SET_UP) ) {
497                 EXIT;
498                 return 0;
499         }
500
501         if ( !list_empty(&obddev->obd_gen_clients) ) {
502                 CERROR("still has clients!\n");
503                 EXIT;
504                 return -EBUSY;
505         }
506
507         ptlrpc_stop_thread(mds->mds_service);
508         rpc_unregister_service(mds->mds_service);
509
510         if (!list_empty(&mds->mds_service->srv_reqs)) {
511                 // XXX reply with errors and clean up
512                 CERROR("Request list not empty!\n");
513         }
514
515         rpc_unregister_service(mds->mds_service);
516         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
517
518         sb = mds->mds_sb;
519         if (!mds->mds_sb){
520                 EXIT;
521                 return 0;
522         }
523
524         unlock_kernel();
525         mntput(mds->mds_vfsmnt); 
526         mds->mds_sb = 0;
527         kfree(mds->mds_fstype);
528         lock_kernel();
529
530         MOD_DEC_USE_COUNT;
531         EXIT;
532         return 0;
533 }
534
535 /* use obd ops to offer management infrastructure */
536 static struct obd_ops mds_obd_ops = {
537         o_setup:       mds_setup,
538         o_cleanup:     mds_cleanup,
539 };
540
541 static int __init mds_init(void)
542 {
543         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
544         return 0;
545 }
546
547 static void __exit mds_exit(void)
548 {
549         obd_unregister_type(LUSTRE_MDS_NAME);
550 }
551
552 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
553 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
554 MODULE_LICENSE("GPL");
555
556 module_init(mds_init);
557 module_exit(mds_exit);