Whamcloud - gitweb
We touched everything, but it's not as scary as it looks.
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/handler.c
5  *  
6  *  Lustre Metadata Server (mds) request handler
7  * 
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com>
14  * 
15  *  This server is single threaded at present (but can easily be multi threaded). 
16  * 
17  */
18
19 #define EXPORT_SYMTAB
20
21 #include <linux/version.h>
22 #include <linux/module.h>
23 #include <linux/fs.h>
24 #include <linux/stat.h>
25 #include <linux/locks.h>
26 #include <linux/ext2_fs.h>
27 #include <linux/quotaops.h>
28 #include <asm/unistd.h>
29 #include <asm/uaccess.h>
30
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/lustre_mds.h>
34 #include <linux/lustre_lib.h>
35
36 int mds_sendpage(struct ptlrpc_request *req, struct file *file, 
37                  __u64 offset, struct niobuf *dst)
38 {
39         int rc; 
40         mm_segment_t oldfs = get_fs();
41
42         if (req->rq_peer.peer_nid == 0) {
43                 /* dst->addr is a user address, but in a different task! */
44                 set_fs(KERNEL_DS); 
45                 rc = generic_file_read(file, (char *)(long)dst->addr, 
46                                        PAGE_SIZE, &offset); 
47                 set_fs(oldfs);
48
49                 if (rc != PAGE_SIZE) 
50                         return -EIO;
51         } else {
52                 struct ptlrpc_bulk_desc *bulk;
53                 char *buf;
54
55                 bulk = ptlrpc_prep_bulk(&req->rq_peer);
56                 if (bulk == NULL)
57                         return -ENOMEM;
58
59                 bulk->b_xid = req->rq_xid;
60
61                 OBD_ALLOC(buf, PAGE_SIZE);
62                 if (!buf) {
63                         OBD_FREE(bulk, sizeof(*bulk));
64                         return -ENOMEM;
65                 }
66
67                 set_fs(KERNEL_DS); 
68                 rc = generic_file_read(file, buf, PAGE_SIZE, &offset); 
69                 set_fs(oldfs);
70
71                 if (rc != PAGE_SIZE) {
72                         OBD_FREE(buf, PAGE_SIZE);
73                         return -EIO;
74                 }
75
76                 bulk->b_buf = buf;
77                 bulk->b_buflen = PAGE_SIZE;
78
79                 rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL);
80                 wait_event_interruptible(bulk->b_waitq,
81                                          ptlrpc_check_bulk_sent(bulk));
82
83                 if (bulk->b_flags == PTL_RPC_INTR) {
84                         EXIT;
85                         /* FIXME: hey hey, we leak here. */
86                         return -EINTR;
87                 }
88
89                 OBD_FREE(bulk, sizeof(*bulk));
90                 OBD_FREE(buf, PAGE_SIZE);
91         }
92
93         return 0;
94 }
95
96 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
97                               struct vfsmount **mnt)
98 {
99         /* stolen from NFS */ 
100         struct super_block *sb = mds->mds_sb; 
101         unsigned long ino = fid->id;
102         //__u32 generation = fid->generation;
103         __u32 generation = 0;
104         struct inode *inode;
105         struct list_head *lp;
106         struct dentry *result;
107
108         if (ino == 0)
109                 return ERR_PTR(-ESTALE);
110
111         inode = iget(sb, ino);
112         if (inode == NULL)
113                 return ERR_PTR(-ENOMEM);
114
115         CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb); 
116
117         if (is_bad_inode(inode)
118             || (generation && inode->i_generation != generation)
119                 ) {
120                 /* we didn't find the right inode.. */
121                 CERROR("bad inode %lu, link: %d ct: %d or version  %u/%u\n",
122                         inode->i_ino,
123                         inode->i_nlink, atomic_read(&inode->i_count),
124                         inode->i_generation,
125                         generation);
126                 iput(inode);
127                 return ERR_PTR(-ESTALE);
128         }
129
130         /* now to find a dentry.
131          * If possible, get a well-connected one
132          */
133         if (mnt)
134                 *mnt = mds->mds_vfsmnt;
135         spin_lock(&dcache_lock);
136         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
137                 result = list_entry(lp,struct dentry, d_alias);
138                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
139                         dget_locked(result);
140                         result->d_vfs_flags |= DCACHE_REFERENCED;
141                         spin_unlock(&dcache_lock);
142                         iput(inode);
143                         if (mnt)
144                                 mntget(*mnt);
145                         return result;
146                 }
147         }
148         spin_unlock(&dcache_lock);
149         result = d_alloc_root(inode);
150         if (result == NULL) {
151                 iput(inode);
152                 return ERR_PTR(-ENOMEM);
153         }
154         if (mnt)
155                 mntget(*mnt);
156         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
157         return result;
158 }
159
160 static inline void mds_get_objid(struct inode *inode, __u64 *id)
161 {
162         memcpy(id, &inode->u.ext2_i.i_data, sizeof(*id));
163 }
164
165 int mds_getattr(struct ptlrpc_request *req)
166 {
167         struct dentry *de;
168         struct inode *inode;
169         struct mds_rep *rep;
170         int rc;
171         
172         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
173                           &req->rq_replen, &req->rq_repbuf);
174         if (rc) { 
175                 EXIT;
176                 CERROR("mds: out of memory\n");
177                 req->rq_status = -ENOMEM;
178                 return 0;
179         }
180
181         req->rq_rephdr->xid = req->rq_reqhdr->xid;
182         rep = req->rq_rep.mds;
183
184         de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, NULL);
185         if (IS_ERR(de)) { 
186                 EXIT;
187                 req->rq_rephdr->status = -ENOENT;
188                 return 0;
189         }
190
191         inode = de->d_inode;
192         rep->ino = inode->i_ino;
193         rep->atime = inode->i_atime;
194         rep->ctime = inode->i_ctime;
195         rep->mtime = inode->i_mtime;
196         rep->uid = inode->i_uid;
197         rep->gid = inode->i_gid;
198         rep->size = inode->i_size;
199         rep->mode = inode->i_mode;
200         rep->nlink = inode->i_nlink;
201         rep->valid = ~0;
202         mds_get_objid(inode, &rep->objid);
203         dput(de); 
204         return 0;
205 }
206
207 int mds_open(struct ptlrpc_request *req)
208 {
209         struct dentry *de;
210         struct mds_rep *rep;
211         struct file *file;
212         struct vfsmount *mnt;
213         __u32 flags;
214         int rc;
215         
216         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
217                           &req->rq_replen, &req->rq_repbuf);
218         if (rc) { 
219                 EXIT;
220                 CERROR("mds: out of memory\n");
221                 req->rq_status = -ENOMEM;
222                 return 0;
223         }
224
225         req->rq_rephdr->xid = req->rq_reqhdr->xid;
226         rep = req->rq_rep.mds;
227
228         de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
229         if (IS_ERR(de)) { 
230                 EXIT;
231                 req->rq_rephdr->status = -ENOENT;
232                 return 0;
233         }
234         flags = req->rq_req.mds->flags;
235         file = dentry_open(de, mnt, flags);
236         if (!file || IS_ERR(file)) { 
237                 req->rq_rephdr->status = -EINVAL;
238                 return 0;
239         }               
240         
241         rep->objid = (__u64) (unsigned long)file; 
242         //mds_get_objid(inode, &rep->objid);
243         dput(de); 
244         return 0;
245 }
246
247 int mds_close(struct ptlrpc_request *req)
248 {
249         struct dentry *de;
250         struct mds_rep *rep;
251         struct file *file;
252         struct vfsmount *mnt;
253         int rc;
254         
255         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
256                           &req->rq_replen, &req->rq_repbuf);
257         if (rc) { 
258                 EXIT;
259                 CERROR("mds: out of memory\n");
260                 req->rq_status = -ENOMEM;
261                 return 0;
262         }
263
264         req->rq_rephdr->xid = req->rq_reqhdr->xid;
265         rep = req->rq_rep.mds;
266
267         de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
268         if (IS_ERR(de)) { 
269                 EXIT;
270                 req->rq_rephdr->status = -ENOENT;
271                 return 0;
272         }
273
274         file = (struct file *)(unsigned long) req->rq_req.mds->objid;
275         req->rq_rephdr->status = filp_close(file, 0); 
276         dput(de); 
277         return 0;
278 }
279
280
281 int mds_readpage(struct ptlrpc_request *req)
282 {
283         struct vfsmount *mnt;
284         struct dentry *de;
285         struct file *file; 
286         struct niobuf *niobuf; 
287         struct mds_rep *rep;
288         int rc;
289         
290         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
291                           &req->rq_replen, &req->rq_repbuf);
292         if (rc) { 
293                 EXIT;
294                 CERROR("mds: out of memory\n");
295                 req->rq_status = -ENOMEM;
296                 return 0;
297         }
298
299         req->rq_rephdr->xid = req->rq_reqhdr->xid;
300         rep = req->rq_rep.mds;
301
302         de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
303         if (IS_ERR(de)) { 
304                 EXIT;
305                 req->rq_rephdr->status = PTR_ERR(de); 
306                 return 0;
307         }
308
309         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
310
311         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE); 
312         /* note: in case of an error, dentry_open puts dentry */
313         if (IS_ERR(file)) { 
314                 EXIT;
315                 req->rq_rephdr->status = PTR_ERR(file);
316                 return 0;
317         }
318
319         niobuf = mds_req_tgt(req->rq_req.mds);
320
321         /* to make this asynchronous make sure that the handling function 
322            doesn't send a reply when this function completes. Instead a 
323            callback function would send the reply */ 
324         rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf); 
325
326         filp_close(file, 0);
327         req->rq_rephdr->status = rc;
328         EXIT;
329         return 0;
330 }
331
332 int mds_reint(struct ptlrpc_request *req)
333 {
334         int rc;
335         char *buf = mds_req_tgt(req->rq_req.mds);
336         int len = req->rq_req.mds->tgtlen;
337         struct mds_update_record rec;
338         
339         rc = mds_update_unpack(buf, len, &rec);
340         if (rc) { 
341                 CERROR("invalid record\n");
342                 req->rq_status = -EINVAL;
343                 return 0;
344         }
345         /* rc will be used to interrupt a for loop over multiple records */
346         rc = mds_reint_rec(&rec, req); 
347         return 0; 
348 }
349
350 int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
351                struct ptlrpc_request *req)
352 {
353         int rc;
354         struct ptlreq_hdr *hdr;
355
356         ENTRY;
357
358         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
359
360         if (NTOH__u32(hdr->type) != MDS_TYPE_REQ) {
361                 CERROR("lustre_mds: wrong packet type sent %d\n",
362                        NTOH__u32(hdr->type));
363                 rc = -EINVAL;
364                 goto out;
365         }
366
367         rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
368                             &req->rq_reqhdr, &req->rq_req);
369         if (rc) { 
370                 CERROR("lustre_mds: Invalid request\n");
371                 EXIT; 
372                 goto out;
373         }
374
375         switch (req->rq_reqhdr->opc) { 
376
377         case MDS_GETATTR:
378                 CDEBUG(D_INODE, "getattr\n");
379                 rc = mds_getattr(req);
380                 break;
381
382         case MDS_READPAGE:
383                 CDEBUG(D_INODE, "readpage\n");
384                 rc = mds_readpage(req);
385                 break;
386
387         case MDS_REINT:
388                 CDEBUG(D_INODE, "reint\n");
389                 rc = mds_reint(req);
390                 break;
391
392         default:
393                 return ptlrpc_error(dev, svc, req);
394         }
395
396 out:
397         if (rc) { 
398                 CERROR("no header\n");
399                 return 0;
400         }
401
402         if( req->rq_status) { 
403                 ptlrpc_error(dev, svc, req);
404         } else { 
405                 CDEBUG(D_INODE, "sending reply\n"); 
406                 ptlrpc_reply(dev, svc, req); 
407         }
408
409         return 0;
410 }
411
412
413 /* mount the file system (secretly) */
414 static int mds_setup(struct obd_device *obddev, obd_count len,
415                         void *buf)
416                         
417 {
418         struct obd_ioctl_data* data = buf;
419         struct mds_obd *mds = &obddev->u.mds;
420         struct vfsmount *mnt;
421         int err; 
422         ENTRY;
423
424         mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL); 
425         err = PTR_ERR(mnt);
426         if (IS_ERR(mnt)) { 
427                 EXIT;
428                 return err;
429         }
430
431         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
432         if (!obddev->u.mds.mds_sb) {
433                 EXIT;
434                 return -ENODEV;
435         }
436
437         mds->mds_vfsmnt = mnt;
438         obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2);
439
440         mds->mds_ctxt.pwdmnt = mnt;
441         mds->mds_ctxt.pwd = mnt->mnt_root;
442         mds->mds_ctxt.fs = KERNEL_DS;
443
444         mds->mds_service = ptlrpc_init_svc( 64 * 1024, 
445                                             MDS_REQUEST_PORTAL,
446                                             MDC_REPLY_PORTAL,
447                                             "self", 
448                                             mds_unpack_req,
449                                             mds_pack_rep,
450                                             mds_handle);
451
452         rpc_register_service(mds->mds_service, "self");
453
454         err = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds"); 
455         if (err) { 
456                 CERROR("cannot start thread\n");
457         }
458                 
459
460         MOD_INC_USE_COUNT;
461         EXIT; 
462         return 0;
463
464
465 static int mds_cleanup(struct obd_device * obddev)
466 {
467         struct super_block *sb;
468         struct mds_obd *mds = &obddev->u.mds;
469
470         ENTRY;
471
472         if ( !(obddev->obd_flags & OBD_SET_UP) ) {
473                 EXIT;
474                 return 0;
475         }
476
477         if ( !list_empty(&obddev->obd_gen_clients) ) {
478                 CERROR("still has clients!\n");
479                 EXIT;
480                 return -EBUSY;
481         }
482
483         ptlrpc_stop_thread(mds->mds_service);
484         rpc_unregister_service(mds->mds_service);
485
486         if (!list_empty(&mds->mds_service->srv_reqs)) {
487                 // XXX reply with errors and clean up
488                 CERROR("Request list not empty!\n");
489         }
490
491         rpc_unregister_service(mds->mds_service);
492         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
493
494         sb = mds->mds_sb;
495         if (!mds->mds_sb){
496                 EXIT;
497                 return 0;
498         }
499
500         unlock_kernel();
501         mntput(mds->mds_vfsmnt); 
502         mds->mds_sb = 0;
503         kfree(mds->mds_fstype);
504         lock_kernel();
505
506         MOD_DEC_USE_COUNT;
507         EXIT;
508         return 0;
509 }
510
511 /* use obd ops to offer management infrastructure */
512 static struct obd_ops mds_obd_ops = {
513         o_setup:       mds_setup,
514         o_cleanup:     mds_cleanup,
515 };
516
517 static int __init mds_init(void)
518 {
519         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
520         return 0;
521 }
522
523 static void __exit mds_exit(void)
524 {
525         obd_unregister_type(LUSTRE_MDS_NAME);
526 }
527
528 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
529 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
530 MODULE_LICENSE("GPL");
531
532 module_init(mds_init);
533 module_exit(mds_exit);