Whamcloud - gitweb
- add open and close calls: initial purpose is to support I/O to open,
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/handler.c
5  *  
6  *  Lustre Metadata Server (mds) request handler
7  * 
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com>
14  * 
15  *  This server is single threaded at present (but can easily be multi threaded). 
16  * 
17  */
18
19 #define EXPORT_SYMTAB
20
21 #include <linux/version.h>
22 #include <linux/module.h>
23 #include <linux/fs.h>
24 #include <linux/stat.h>
25 #include <linux/locks.h>
26 #include <linux/ext2_fs.h>
27 #include <linux/quotaops.h>
28 #include <asm/unistd.h>
29 #include <asm/uaccess.h>
30
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/obd_support.h>
34 #include <linux/obd_class.h>
35 #include <linux/obd.h>
36 #include <linux/lustre_lib.h>
37 #include <linux/lustre_idl.h>
38 #include <linux/lustre_mds.h>
39 #include <linux/lustre_net.h>
40 #include <linux/obd_class.h>
41
42 int mds_sendpage(struct ptlrpc_request *req, struct file *file, 
43                  __u64 offset, struct niobuf *dst)
44 {
45         int rc; 
46         mm_segment_t oldfs = get_fs();
47
48         if (req->rq_peer.peer_nid == 0) {
49                 /* dst->addr is a user address, but in a different task! */
50                 set_fs(KERNEL_DS); 
51                 rc = generic_file_read(file, (char *)(long)dst->addr, 
52                                        PAGE_SIZE, &offset); 
53                 set_fs(oldfs);
54
55                 if (rc != PAGE_SIZE) 
56                         return -EIO;
57         } else {
58                 struct ptlrpc_bulk_desc *bulk;
59                 char *buf;
60
61                 bulk = ptlrpc_prep_bulk(&req->rq_peer);
62                 if (bulk == NULL)
63                         return -ENOMEM;
64
65                 bulk->b_xid = req->rq_xid;
66
67                 OBD_ALLOC(buf, PAGE_SIZE);
68                 if (!buf) {
69                         OBD_FREE(bulk, sizeof(*bulk));
70                         return -ENOMEM;
71                 }
72
73                 set_fs(KERNEL_DS); 
74                 rc = generic_file_read(file, buf, PAGE_SIZE, &offset); 
75                 set_fs(oldfs);
76
77                 if (rc != PAGE_SIZE) {
78                         OBD_FREE(buf, PAGE_SIZE);
79                         return -EIO;
80                 }
81
82                 bulk->b_buf = buf;
83                 bulk->b_buflen = PAGE_SIZE;
84
85                 rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL);
86                 wait_event_interruptible(bulk->b_waitq,
87                                          ptlrpc_check_bulk_sent(bulk));
88
89                 if (bulk->b_flags == PTL_RPC_INTR) {
90                         EXIT;
91                         /* FIXME: hey hey, we leak here. */
92                         return -EINTR;
93                 }
94
95                 OBD_FREE(bulk, sizeof(*bulk));
96                 OBD_FREE(buf, PAGE_SIZE);
97         }
98
99         return 0;
100 }
101
102 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
103                               struct vfsmount **mnt)
104 {
105         /* stolen from NFS */ 
106         struct super_block *sb = mds->mds_sb; 
107         unsigned long ino = fid->id;
108         //__u32 generation = fid->generation;
109         __u32 generation = 0;
110         struct inode *inode;
111         struct list_head *lp;
112         struct dentry *result;
113
114         if (ino == 0)
115                 return ERR_PTR(-ESTALE);
116
117         inode = iget(sb, ino);
118         if (inode == NULL)
119                 return ERR_PTR(-ENOMEM);
120
121         CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb); 
122
123         if (is_bad_inode(inode)
124             || (generation && inode->i_generation != generation)
125                 ) {
126                 /* we didn't find the right inode.. */
127                 CERROR("bad inode %lu, link: %d ct: %d or version  %u/%u\n",
128                         inode->i_ino,
129                         inode->i_nlink, atomic_read(&inode->i_count),
130                         inode->i_generation,
131                         generation);
132                 iput(inode);
133                 return ERR_PTR(-ESTALE);
134         }
135
136         /* now to find a dentry.
137          * If possible, get a well-connected one
138          */
139         if (mnt)
140                 *mnt = mds->mds_vfsmnt;
141         spin_lock(&dcache_lock);
142         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
143                 result = list_entry(lp,struct dentry, d_alias);
144                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
145                         dget_locked(result);
146                         result->d_vfs_flags |= DCACHE_REFERENCED;
147                         spin_unlock(&dcache_lock);
148                         iput(inode);
149                         if (mnt)
150                                 mntget(*mnt);
151                         return result;
152                 }
153         }
154         spin_unlock(&dcache_lock);
155         result = d_alloc_root(inode);
156         if (result == NULL) {
157                 iput(inode);
158                 return ERR_PTR(-ENOMEM);
159         }
160         if (mnt)
161                 mntget(*mnt);
162         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
163         return result;
164 }
165
166 static inline void mds_get_objid(struct inode *inode, __u64 *id)
167 {
168         memcpy(id, &inode->u.ext2_i.i_data, sizeof(*id));
169 }
170
171 int mds_getattr(struct ptlrpc_request *req)
172 {
173         struct dentry *de;
174         struct inode *inode;
175         struct mds_rep *rep;
176         int rc;
177         
178         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
179                           &req->rq_replen, &req->rq_repbuf);
180         if (rc) { 
181                 EXIT;
182                 CERROR("mds: out of memory\n");
183                 req->rq_status = -ENOMEM;
184                 return 0;
185         }
186
187         req->rq_rephdr->xid = req->rq_reqhdr->xid;
188         rep = req->rq_rep.mds;
189
190         de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, NULL);
191         if (IS_ERR(de)) { 
192                 EXIT;
193                 req->rq_rephdr->status = -ENOENT;
194                 return 0;
195         }
196
197         inode = de->d_inode;
198         rep->ino = inode->i_ino;
199         rep->atime = inode->i_atime;
200         rep->ctime = inode->i_ctime;
201         rep->mtime = inode->i_mtime;
202         rep->uid = inode->i_uid;
203         rep->gid = inode->i_gid;
204         rep->size = inode->i_size;
205         rep->mode = inode->i_mode;
206         rep->nlink = inode->i_nlink;
207         rep->valid = ~0;
208         mds_get_objid(inode, &rep->objid);
209         dput(de); 
210         return 0;
211 }
212
213 int mds_open(struct ptlrpc_request *req)
214 {
215         struct dentry *de;
216         struct mds_rep *rep;
217         struct file *file;
218         struct vfsmount *mnt;
219         __u32 flags;
220         int rc;
221         
222         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
223                           &req->rq_replen, &req->rq_repbuf);
224         if (rc) { 
225                 EXIT;
226                 CERROR("mds: out of memory\n");
227                 req->rq_status = -ENOMEM;
228                 return 0;
229         }
230
231         req->rq_rephdr->xid = req->rq_reqhdr->xid;
232         rep = req->rq_rep.mds;
233
234         de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
235         if (IS_ERR(de)) { 
236                 EXIT;
237                 req->rq_rephdr->status = -ENOENT;
238                 return 0;
239         }
240         flags = req->rq_req.mds->flags;
241         file = dentry_open(de, mnt, flags);
242         if (!file || IS_ERR(file)) { 
243                 req->rq_rephdr->status = -EINVAL;
244                 return 0;
245         }               
246         
247         rep->objid = (__u64) (unsigned long)file; 
248         //mds_get_objid(inode, &rep->objid);
249         dput(de); 
250         return 0;
251 }
252
253 int mds_close(struct ptlrpc_request *req)
254 {
255         struct dentry *de;
256         struct mds_rep *rep;
257         struct file *file;
258         struct vfsmount *mnt;
259         int rc;
260         
261         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
262                           &req->rq_replen, &req->rq_repbuf);
263         if (rc) { 
264                 EXIT;
265                 CERROR("mds: out of memory\n");
266                 req->rq_status = -ENOMEM;
267                 return 0;
268         }
269
270         req->rq_rephdr->xid = req->rq_reqhdr->xid;
271         rep = req->rq_rep.mds;
272
273         de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
274         if (IS_ERR(de)) { 
275                 EXIT;
276                 req->rq_rephdr->status = -ENOENT;
277                 return 0;
278         }
279
280         file = (struct file *)(unsigned long) req->rq_req.mds->objid;
281         req->rq_rephdr->status = filp_close(file, 0); 
282         dput(de); 
283         return 0;
284 }
285
286
287 int mds_readpage(struct ptlrpc_request *req)
288 {
289         struct vfsmount *mnt;
290         struct dentry *de;
291         struct file *file; 
292         struct niobuf *niobuf; 
293         struct mds_rep *rep;
294         int rc;
295         
296         rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
297                           &req->rq_replen, &req->rq_repbuf);
298         if (rc) { 
299                 EXIT;
300                 CERROR("mds: out of memory\n");
301                 req->rq_status = -ENOMEM;
302                 return 0;
303         }
304
305         req->rq_rephdr->xid = req->rq_reqhdr->xid;
306         rep = req->rq_rep.mds;
307
308         de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
309         if (IS_ERR(de)) { 
310                 EXIT;
311                 req->rq_rephdr->status = PTR_ERR(de); 
312                 return 0;
313         }
314
315         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
316
317         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE); 
318         /* note: in case of an error, dentry_open puts dentry */
319         if (IS_ERR(file)) { 
320                 EXIT;
321                 req->rq_rephdr->status = PTR_ERR(file);
322                 return 0;
323         }
324
325         niobuf = mds_req_tgt(req->rq_req.mds);
326
327         /* to make this asynchronous make sure that the handling function 
328            doesn't send a reply when this function completes. Instead a 
329            callback function would send the reply */ 
330         rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf); 
331
332         filp_close(file, 0);
333         req->rq_rephdr->status = rc;
334         EXIT;
335         return 0;
336 }
337
338 int mds_reint(struct ptlrpc_request *req)
339 {
340         int rc;
341         char *buf = mds_req_tgt(req->rq_req.mds);
342         int len = req->rq_req.mds->tgtlen;
343         struct mds_update_record rec;
344         
345         rc = mds_update_unpack(buf, len, &rec);
346         if (rc) { 
347                 CERROR("invalid record\n");
348                 req->rq_status = -EINVAL;
349                 return 0;
350         }
351         /* rc will be used to interrupt a for loop over multiple records */
352         rc = mds_reint_rec(&rec, req); 
353         return 0; 
354 }
355
356 int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
357                struct ptlrpc_request *req)
358 {
359         int rc;
360         struct ptlreq_hdr *hdr;
361
362         ENTRY;
363
364         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
365
366         if (NTOH__u32(hdr->type) != MDS_TYPE_REQ) {
367                 CERROR("lustre_mds: wrong packet type sent %d\n",
368                        NTOH__u32(hdr->type));
369                 rc = -EINVAL;
370                 goto out;
371         }
372
373         rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
374                             &req->rq_reqhdr, &req->rq_req);
375         if (rc) { 
376                 CERROR("lustre_mds: Invalid request\n");
377                 EXIT; 
378                 goto out;
379         }
380
381         switch (req->rq_reqhdr->opc) { 
382
383         case MDS_GETATTR:
384                 CDEBUG(D_INODE, "getattr\n");
385                 rc = mds_getattr(req);
386                 break;
387
388         case MDS_READPAGE:
389                 CDEBUG(D_INODE, "readpage\n");
390                 rc = mds_readpage(req);
391                 break;
392
393         case MDS_REINT:
394                 CDEBUG(D_INODE, "reint\n");
395                 rc = mds_reint(req);
396                 break;
397
398         default:
399                 return ptlrpc_error(dev, svc, req);
400         }
401
402 out:
403         if (rc) { 
404                 CERROR("no header\n");
405                 return 0;
406         }
407
408         if( req->rq_status) { 
409                 ptlrpc_error(dev, svc, req);
410         } else { 
411                 CDEBUG(D_INODE, "sending reply\n"); 
412                 ptlrpc_reply(dev, svc, req); 
413         }
414
415         return 0;
416 }
417
418
419 /* mount the file system (secretly) */
420 static int mds_setup(struct obd_device *obddev, obd_count len,
421                         void *buf)
422                         
423 {
424         struct obd_ioctl_data* data = buf;
425         struct mds_obd *mds = &obddev->u.mds;
426         struct vfsmount *mnt;
427         int err; 
428         ENTRY;
429
430         mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL); 
431         err = PTR_ERR(mnt);
432         if (IS_ERR(mnt)) { 
433                 EXIT;
434                 return err;
435         }
436
437         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
438         if (!obddev->u.mds.mds_sb) {
439                 EXIT;
440                 return -ENODEV;
441         }
442
443         mds->mds_vfsmnt = mnt;
444         obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2);
445
446         mds->mds_ctxt.pwdmnt = mnt;
447         mds->mds_ctxt.pwd = mnt->mnt_root;
448         mds->mds_ctxt.fs = KERNEL_DS;
449
450         mds->mds_service = ptlrpc_init_svc( 64 * 1024, 
451                                             MDS_REQUEST_PORTAL,
452                                             MDC_REPLY_PORTAL,
453                                             "self", 
454                                             mds_unpack_req,
455                                             mds_pack_rep,
456                                             mds_handle);
457
458         rpc_register_service(mds->mds_service, "self");
459
460         err = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds"); 
461         if (err) { 
462                 CERROR("cannot start thread\n");
463         }
464                 
465
466         MOD_INC_USE_COUNT;
467         EXIT; 
468         return 0;
469
470
471 static int mds_cleanup(struct obd_device * obddev)
472 {
473         struct super_block *sb;
474         struct mds_obd *mds = &obddev->u.mds;
475
476         ENTRY;
477
478         if ( !(obddev->obd_flags & OBD_SET_UP) ) {
479                 EXIT;
480                 return 0;
481         }
482
483         if ( !list_empty(&obddev->obd_gen_clients) ) {
484                 CERROR("still has clients!\n");
485                 EXIT;
486                 return -EBUSY;
487         }
488
489         ptlrpc_stop_thread(mds->mds_service);
490         rpc_unregister_service(mds->mds_service);
491
492         if (!list_empty(&mds->mds_service->srv_reqs)) {
493                 // XXX reply with errors and clean up
494                 CERROR("Request list not empty!\n");
495         }
496
497         rpc_unregister_service(mds->mds_service);
498         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
499
500         sb = mds->mds_sb;
501         if (!mds->mds_sb){
502                 EXIT;
503                 return 0;
504         }
505
506         unlock_kernel();
507         mntput(mds->mds_vfsmnt); 
508         mds->mds_sb = 0;
509         kfree(mds->mds_fstype);
510         lock_kernel();
511
512         MOD_DEC_USE_COUNT;
513         EXIT;
514         return 0;
515 }
516
517 /* use obd ops to offer management infrastructure */
518 static struct obd_ops mds_obd_ops = {
519         o_setup:       mds_setup,
520         o_cleanup:     mds_cleanup,
521 };
522
523 static int __init mds_init(void)
524 {
525         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
526         return 0;
527 }
528
529 static void __exit mds_exit(void)
530 {
531         obd_unregister_type(LUSTRE_MDS_NAME);
532 }
533
534 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
535 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
536 MODULE_LICENSE("GPL");
537
538 module_init(mds_init);
539 module_exit(mds_exit);