Whamcloud - gitweb
- Added match_or_enqueue helper function
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/handler.c
5  *
6  *  Lustre Metadata Server (mds) request handler
7  *
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com> &
14  *     Andreas Dilger <braam@clusterfs.com>
15  *
16  *  This server is single threaded at present (but can easily be multi threaded)
17  *
18  */
19
20 #define EXPORT_SYMTAB
21 #define DEBUG_SUBSYSTEM S_MDS
22
23 #include <linux/module.h>
24 #include <linux/lustre_mds.h>
25 #include <linux/lustre_dlm.h>
26 extern int mds_get_lovtgts(struct obd_device *obd, uuid_t *uuidarray);
27 extern int mds_get_lovdesc(struct obd_device *obd, struct lov_desc *desc);
28 extern int mds_update_last_rcvd(struct mds_obd *mds, void *handle,
29                                 struct ptlrpc_request *req);
30 static int mds_cleanup(struct obd_device * obddev);
31
32 inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req)
33 {
34         return &req->rq_export->export_obd->u.mds;
35 }
36
37 /* Assumes caller has already pushed into the kernel filesystem context */
38 static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
39                         __u64 offset)
40 {
41         int rc = 0;
42         struct mds_obd *mds = mds_req2mds(req); 
43         struct ptlrpc_bulk_desc *desc;
44         struct ptlrpc_bulk_page *bulk;
45         char *buf;
46         ENTRY;
47
48         desc = ptlrpc_prep_bulk(req->rq_connection);
49         if (desc == NULL)
50                 GOTO(out, rc = -ENOMEM);
51
52         bulk = ptlrpc_prep_bulk_page(desc);
53         if (bulk == NULL)
54                 GOTO(cleanup_bulk, rc = -ENOMEM);
55
56         OBD_ALLOC(buf, PAGE_SIZE);
57         if (buf == NULL)
58                 GOTO(cleanup_bulk, rc = -ENOMEM);
59
60         rc = mds_fs_readpage(mds, file, buf, PAGE_SIZE, (loff_t *)&offset);
61
62         if (rc != PAGE_SIZE)
63                 GOTO(cleanup_buf, rc = -EIO);
64
65         bulk->b_xid = req->rq_xid;
66         bulk->b_buf = buf;
67         bulk->b_buflen = PAGE_SIZE;
68         desc->b_portal = MDS_BULK_PORTAL;
69
70         rc = ptlrpc_send_bulk(desc);
71         if (rc)
72                 GOTO(cleanup_buf, rc);
73
74         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
75                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
76                        OBD_FAIL_MDS_SENDPAGE, rc);
77                 ptlrpc_abort_bulk(desc);
78                 GOTO(cleanup_buf, rc);
79         }
80
81         wait_event(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
82         if (desc->b_flags & PTL_RPC_FL_INTR)
83                 GOTO(cleanup_buf, rc = -EINTR);
84
85         EXIT;
86  cleanup_buf:
87         OBD_FREE(buf, PAGE_SIZE);
88  cleanup_bulk:
89         ptlrpc_free_bulk(desc);
90  out:
91         return rc;
92 }
93
94 /* 'dir' is a inode for which a lock has already been taken */
95 struct dentry *mds_name2locked_dentry(struct mds_obd *mds, struct dentry *dir,
96                                       struct vfsmount **mnt, char *name,
97                                       int namelen, int lock_mode,
98                                       struct lustre_handle *lockh)
99 {
100         struct dentry *dchild;
101         int flags, rc;
102         __u64 res_id[3] = {0};
103         ENTRY;
104
105         down(&dir->d_inode->i_sem);
106         dchild = lookup_one_len(name, dir, namelen);
107         if (IS_ERR(dchild)) {
108                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
109                 up(&dir->d_inode->i_sem);
110                 LBUG();
111         }
112         up(&dir->d_inode->i_sem);
113
114         if (lock_mode == 0)
115                 RETURN(dchild);
116
117         res_id[0] = dchild->d_inode->i_ino;
118         rc = ldlm_match_or_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
119                                    (struct lustre_handle *)&mds->mds_connh,
120                                    NULL, mds->mds_local_namespace, NULL,
121                                    res_id, LDLM_PLAIN, NULL, 0, lock_mode,
122                                    &flags, (void *)mds_lock_callback, NULL,
123                                    0, lockh);
124         if (rc != ELDLM_OK) {
125                 l_dput(dchild);
126                 RETURN(NULL);
127         }
128
129         RETURN(dchild);
130 }
131
132 struct dentry *mds_fid2locked_dentry(struct mds_obd *mds, struct ll_fid *fid,
133                                      struct vfsmount **mnt, int lock_mode,
134                                      struct lustre_handle *lockh)
135 {
136         struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
137         int flags, rc;
138         __u64 res_id[3] = {0};
139         ENTRY;
140
141         if (IS_ERR(de))
142                 RETURN(de);
143
144         res_id[0] = de->d_inode->i_ino;
145         rc = ldlm_match_or_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
146                                    (struct lustre_handle *)&mds->mds_connh,
147                                    NULL, mds->mds_local_namespace, NULL,
148                                    res_id, LDLM_PLAIN, NULL, 0, lock_mode,
149                                    &flags, (void *)mds_lock_callback, NULL,
150                                    0, lockh);
151         if (rc != ELDLM_OK) {
152                 l_dput(de);
153                 retval = NULL;
154         }
155
156         RETURN(retval);
157 }
158
159 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
160                               struct vfsmount **mnt)
161 {
162         /* stolen from NFS */
163         struct super_block *sb = mds->mds_sb;
164         unsigned long ino = fid->id;
165         __u32 generation = fid->generation;
166         struct inode *inode;
167         struct list_head *lp;
168         struct dentry *result;
169
170         if (ino == 0)
171                 RETURN(ERR_PTR(-ESTALE));
172
173         inode = iget(sb, ino);
174         if (inode == NULL)
175                 RETURN(ERR_PTR(-ENOMEM));
176
177         CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
178
179         if (is_bad_inode(inode) ||
180             (generation && inode->i_generation != generation)) {
181                 /* we didn't find the right inode.. */
182                 CERROR("bad inode %lu, link: %d ct: %d or version  %u/%u\n",
183                        inode->i_ino, inode->i_nlink,
184                        atomic_read(&inode->i_count), inode->i_generation,
185                        generation);
186                 LBUG();
187                 iput(inode);
188                 RETURN(ERR_PTR(-ESTALE));
189         }
190
191         /* now to find a dentry.
192          * If possible, get a well-connected one
193          */
194         if (mnt)
195                 *mnt = mds->mds_vfsmnt;
196         spin_lock(&dcache_lock);
197         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
198                 result = list_entry(lp,struct dentry, d_alias);
199                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
200                         dget_locked(result);
201                         result->d_vfs_flags |= DCACHE_REFERENCED;
202                         spin_unlock(&dcache_lock);
203                         iput(inode);
204                         if (mnt)
205                                 mntget(*mnt);
206                         return result;
207                 }
208         }
209         spin_unlock(&dcache_lock);
210         result = d_alloc_root(inode);
211         if (result == NULL) {
212                 iput(inode);
213                 return ERR_PTR(-ENOMEM);
214         }
215         if (mnt)
216                 mntget(*mnt);
217         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
218         return result;
219 }
220
221 static int mds_connect(struct lustre_handle *conn, struct obd_device *obd)
222 {
223         int rc;
224
225         MOD_INC_USE_COUNT;
226         rc = class_connect(conn, obd);
227
228         if (rc)
229                 MOD_DEC_USE_COUNT;
230
231         return rc;
232 }
233
234 static int mds_disconnect(struct lustre_handle *conn)
235 {
236         int rc;
237
238         rc = class_disconnect(conn);
239         if (!rc)
240                 MOD_DEC_USE_COUNT;
241
242         return rc;
243 }
244
245 /* FIXME: the error cases need fixing to avoid leaks */
246 static int mds_getstatus(struct ptlrpc_request *req)
247 {
248         struct mds_obd *mds = mds_req2mds(req);
249         struct mds_body *body;
250         struct mds_client_info *mci;
251         struct mds_client_data *mcd;
252         int rc, size = sizeof(*body);
253         ENTRY;
254
255         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
256         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
257                 CERROR("mds: out of memory for message: size=%d\n", size);
258                 req->rq_status = -ENOMEM;
259                 RETURN(0);
260         }
261
262         body = lustre_msg_buf(req->rq_reqmsg, 0);
263         mds_unpack_body(body);
264
265         /* Anything we need to do here with the client's trans no or so? */
266         body = lustre_msg_buf(req->rq_repmsg, 0);
267         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
268
269         mci = mds_uuid_to_mci(mds, ptlrpc_req_to_uuid(req));
270         if (!mci) {
271                 /* We don't have any old connection data for this client */
272                 int rc;
273
274                 CDEBUG(D_INFO, "allocating new client data for UUID '%s'",
275                        ptlrpc_req_to_uuid(req));
276
277                 OBD_ALLOC(mcd, sizeof(*mcd));
278                 if (!mcd) {
279                         CERROR("mds: out of memory for client data\n");
280                         req->rq_status = -ENOMEM;
281                         RETURN(0);
282                 }
283                 memcpy(mcd->mcd_uuid, ptlrpc_req_to_uuid(req),
284                        sizeof(mcd->mcd_uuid));
285                 rc = mds_client_add(mds, mcd, -1);
286                 if (rc) {
287                         req->rq_status = rc;
288                         RETURN(0);
289                 }
290         } else {
291                 /* We have old connection data for this client... */
292                 mcd = mci->mci_mcd;
293                 CDEBUG(D_INFO, "found existing data for UUID '%s' at #%d\n",
294                        mcd->mcd_uuid, mci->mci_off);
295         }
296         /* mcd_last_xid is is stored in little endian on the disk and
297            mds_pack_rep_body converts it to network order */
298         body->last_xid = le32_to_cpu(mcd->mcd_last_xid);
299         mds_pack_rep_body(req);
300         RETURN(0);
301 }
302
303 static int mds_lovinfo(struct ptlrpc_request *req)
304 {
305         struct mds_status_req *streq;
306         struct lov_desc *desc; 
307         int rc, size[2] = {sizeof(*desc)};
308         ENTRY;
309
310         streq = lustre_msg_buf(req->rq_reqmsg, 0); 
311         streq->flags = NTOH__u32(streq->flags); 
312         streq->repbuf = NTOH__u32(streq->repbuf); 
313         size[1] = streq->repbuf;
314
315         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
316         if (rc) { 
317                 CERROR("mds: out of memory for message: size=%d\n", size[1]);
318                 req->rq_status = -ENOMEM;
319                 RETURN(0);
320         }
321
322         desc = lustre_msg_buf(req->rq_repmsg, 0); 
323         rc = mds_get_lovdesc(req->rq_obd, desc);
324         if (rc != 0 ) { 
325                 CERROR("get_lovdesc error %d", rc);
326                 req->rq_status = rc;
327                 RETURN(0);
328         }
329
330         if (desc->ld_tgt_count * sizeof(uuid_t) > streq->repbuf) { 
331                 CERROR("too many targets, enlarge client buffers\n");
332                 req->rq_status = -ENOSPC;
333                 RETURN(0);
334         }
335
336         rc = mds_get_lovtgts(req->rq_obd, lustre_msg_buf(req->rq_repmsg, 1));
337         if (rc) { 
338                 CERROR("get_lovtgts error %d", rc);
339                 req->rq_status = rc;
340                 RETURN(0);
341         }
342         RETURN(0);
343 }
344
345 int mds_lock_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
346                       void *data, int data_len, struct ptlrpc_request **reqp)
347 {
348         ENTRY;
349
350         if (desc == NULL) {
351                 /* Completion AST.  Do nothing */
352                 RETURN(0);
353         }
354
355         if (ldlm_cli_cancel(lockh) < 0)
356                 LBUG();
357         RETURN(0);
358 }
359
360 static int mds_getattr_name(int offset, struct ptlrpc_request *req)
361 {
362         struct mds_obd *mds = mds_req2mds(req);
363         struct obd_run_ctxt saved;
364         struct mds_body *body;
365         struct dentry *de = NULL, *dchild = NULL;
366         struct inode *dir;
367         struct lustre_handle lockh;
368         char *name;
369         int namelen, flags, lock_mode, rc = 0;
370         __u64 res_id[3] = {0, 0, 0};
371         ENTRY;
372
373         if (strcmp(req->rq_export->export_obd->obd_type->typ_name, "mds") != 0)
374                 LBUG();
375
376         if (req->rq_reqmsg->bufcount <= offset + 1) {
377                 LBUG();
378                 GOTO(out_pre_de, rc = -EINVAL);
379         }
380
381         body = lustre_msg_buf(req->rq_reqmsg, offset);
382         name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
383         namelen = req->rq_reqmsg->buflens[offset + 1];
384         /* requests were at offset 2, replies go back at 1 */
385         if (offset)
386                 offset = 1;
387
388         push_ctxt(&saved, &mds->mds_ctxt);
389         de = mds_fid2dentry(mds, &body->fid1, NULL);
390         if (IS_ERR(de)) {
391                 LBUG();
392                 GOTO(out_pre_de, rc = -ESTALE);
393         }
394
395         dir = de->d_inode;
396         CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino);
397
398         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
399         res_id[0] = dir->i_ino;
400
401         rc = ldlm_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN,
402                              NULL, 0, lock_mode, &lockh);
403         if (rc == 0) {
404                 LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]);
405                 rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
406                                       (struct lustre_handle *)&mds->mds_connh, 
407                                       NULL, mds->mds_local_namespace, NULL,
408                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
409                                       &flags, (void *)mds_lock_callback,
410                                       NULL, 0, &lockh);
411                 if (rc != ELDLM_OK) {
412                         CERROR("lock enqueue: err: %d\n", rc);
413                         GOTO(out_create_de, rc = -EIO);
414                 }
415         }
416         ldlm_lock_dump((void *)(unsigned long)lockh.addr);
417
418         down(&dir->i_sem);
419         dchild = lookup_one_len(name, de, namelen - 1);
420         if (IS_ERR(dchild)) {
421                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
422                 up(&dir->i_sem);
423                 LBUG();
424                 GOTO(out_create_dchild, rc = -ESTALE);
425         }
426
427         if (dchild->d_inode) {
428                 struct mds_body *body;
429                 struct obdo *obdo;
430                 struct inode *inode = dchild->d_inode;
431                 CERROR("child exists (dir %ld, name %s, ino %ld)\n",
432                        dir->i_ino, name, dchild->d_inode->i_ino);
433
434                 body = lustre_msg_buf(req->rq_repmsg, offset);
435                 mds_pack_inode2fid(&body->fid1, inode);
436                 mds_pack_inode2body(body, inode);
437                 if (S_ISREG(inode->i_mode)) {
438                         obdo = lustre_msg_buf(req->rq_repmsg, offset + 1);
439                         mds_fs_get_obdo(mds, inode, obdo);
440                 }
441                 /* now a normal case for intent locking */
442                 rc = 0;
443         } else
444                 rc = -ENOENT;
445
446         EXIT;
447 out_create_dchild:
448         l_dput(dchild);
449         up(&dir->i_sem);
450         ldlm_lock_decref(&lockh, lock_mode);
451 out_create_de:
452         l_dput(de);
453 out_pre_de:
454         req->rq_status = rc;
455         pop_ctxt(&saved);
456         return 0;
457 }
458
459
460 static int mds_getattr(int offset, struct ptlrpc_request *req)
461 {
462         struct mds_obd *mds = mds_req2mds(req);
463         struct obd_run_ctxt saved;
464         struct dentry *de;
465         struct inode *inode;
466         struct mds_body *body;
467         int rc, size[2] = {sizeof(*body)}, bufcount = 1;
468         ENTRY;
469
470         body = lustre_msg_buf(req->rq_reqmsg, offset);
471         push_ctxt(&saved, &mds->mds_ctxt);
472         de = mds_fid2dentry(mds, &body->fid1, NULL);
473         if (IS_ERR(de)) {
474                 GOTO(out_pop, rc = -ENOENT);
475         }
476
477         inode = de->d_inode;
478         if (S_ISREG(body->fid1.f_type)) {
479                 bufcount = 2;
480                 size[1] = sizeof(struct obdo);
481         } else if (body->valid & OBD_MD_LINKNAME) {
482                 bufcount = 2;
483                 size[1] = inode->i_size;
484         }
485
486         rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
487                              &req->rq_repmsg);
488         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
489                 CERROR("mds: out of memory\n");
490                 GOTO(out, rc);
491         }
492
493         if (body->valid & OBD_MD_LINKNAME) {
494                 char *tmp = lustre_msg_buf(req->rq_repmsg, 1);
495
496                 rc = inode->i_op->readlink(de, tmp, size[1]);
497
498                 if (rc < 0) {
499                         CERROR("readlink failed: %d\n", rc);
500                         GOTO(out, rc);
501                 }
502         }
503
504         body = lustre_msg_buf(req->rq_repmsg, 0);
505         body->ino = inode->i_ino;
506         body->generation = inode->i_generation;
507         body->atime = inode->i_atime;
508         body->ctime = inode->i_ctime;
509         body->mtime = inode->i_mtime;
510         body->uid = inode->i_uid;
511         body->gid = inode->i_gid;
512         body->size = inode->i_size;
513         body->mode = inode->i_mode;
514         body->nlink = inode->i_nlink;
515         body->valid = ~0; /* FIXME: should be more selective */
516
517         if (S_ISREG(inode->i_mode)) {
518                 rc = mds_fs_get_obdo(mds, inode,
519                                      lustre_msg_buf(req->rq_repmsg, 1));
520                 if (rc < 0) {
521                         CERROR("mds_fs_get_obdo failed: %d\n", rc);
522                         GOTO(out, rc);
523                 }
524         }
525 out:
526         l_dput(de);
527 out_pop:
528         pop_ctxt(&saved);
529         req->rq_status = rc;
530         RETURN(0);
531 }
532
533 static int mds_statfs(struct ptlrpc_request *req)
534 {
535         struct mds_obd *mds = mds_req2mds(req);
536         struct obd_statfs *osfs;
537         struct statfs sfs;
538         int rc, size = sizeof(*osfs);
539         ENTRY;
540
541         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
542                              &req->rq_repmsg);
543         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
544                 CERROR("mds: statfs lustre_pack_msg failed: rc = %d\n", rc);
545                 GOTO(out, rc);
546         }
547
548         rc = vfs_statfs(mds->mds_sb, &sfs);
549         if (rc) {
550                 CERROR("mds: statfs failed: rc %d\n", rc);
551                 GOTO(out, rc);
552         }
553         osfs = lustre_msg_buf(req->rq_repmsg, 0);
554         memset(osfs, 0, size);
555         obd_statfs_pack(osfs, &sfs);
556
557 out:
558         req->rq_status = rc;
559         RETURN(0);
560 }
561
562 static int mds_open(struct ptlrpc_request *req)
563 {
564         struct dentry *de;
565         struct mds_body *body;
566         struct file *file;
567         struct vfsmount *mnt;
568         struct mds_obd *mds = mds_req2mds(req);
569         struct mds_client_info *mci;
570         __u32 flags;
571         struct list_head *tmp;
572         struct mds_file_data *mfd;
573         int rc, size = sizeof(*body);
574         ENTRY;
575
576         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
577         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
578                 CERROR("mds: out of memory\n");
579                 req->rq_status = -ENOMEM;
580                 RETURN(0);
581         }
582
583         mci = mds_uuid_to_mci(mds, ptlrpc_req_to_uuid(req));
584         if (!mci) {
585                 CERROR("mds: no mci!\n");
586                 req->rq_status = -ENOTCONN;
587                 RETURN(0);
588         }
589
590         body = lustre_msg_buf(req->rq_reqmsg, 0);
591
592         /* was this animal open already? */
593         /* XXX we should only check on re-open, or do a refcount... */
594         list_for_each(tmp, &mci->mci_open_head) {
595                 struct mds_file_data *fd;
596                 fd = list_entry(tmp, struct mds_file_data, mfd_list);
597                 if (body->extra == fd->mfd_clientfd &&
598                     body->fid1.id == fd->mfd_file->f_dentry->d_inode->i_ino) {
599                         CERROR("Re opening %Ld\n", body->fid1.id);
600                         RETURN(0);
601                 }
602         }
603
604         OBD_ALLOC(mfd, sizeof(*mfd));
605         if (!mfd) {
606                 CERROR("mds: out of memory\n");
607                 req->rq_status = -ENOMEM;
608                 RETURN(0);
609         }
610
611         de = mds_fid2dentry(mds, &body->fid1, &mnt);
612         if (IS_ERR(de)) {
613                 req->rq_status = -ENOENT;
614                 RETURN(0);
615         }
616
617         /* check if this inode has seen a delayed object creation */
618         if (req->rq_reqmsg->bufcount > 1) {
619                 void *handle;
620                 struct inode *inode = de->d_inode;
621                 //struct iattr iattr;
622                 struct obdo *obdo;
623                 int rc;
624
625                 obdo = lustre_msg_buf(req->rq_reqmsg, 1);
626                 //iattr.ia_valid = ATTR_MODE;
627                 //iattr.ia_mode = inode->i_mode;
628
629                 handle = mds_fs_start(mds, de->d_inode, MDS_FSOP_SETATTR);
630                 if (!handle) {
631                         req->rq_status = -ENOMEM;
632                         RETURN(0);
633                 }
634
635                 /* XXX error handling */
636                 rc = mds_fs_set_obdo(mds, inode, handle, obdo);
637                 //                rc = mds_fs_setattr(mds, de, handle, &iattr);
638                 if (!rc) {
639                         struct obd_run_ctxt saved;
640                         push_ctxt(&saved, &mds->mds_ctxt);
641                         rc = mds_update_last_rcvd(mds, handle, req);
642                         pop_ctxt(&saved);
643                 } else {
644                         req->rq_status = rc;
645                         RETURN(0);
646                 }
647                 /* FIXME: need to return last_rcvd, last_committed */
648
649                 /* FIXME: keep rc intact */
650                 rc = mds_fs_commit(mds, de->d_inode, handle);
651                 if (rc) {
652                         req->rq_status = rc;
653                         RETURN(0);
654                 }
655         }
656
657         flags = body->flags;
658         file = dentry_open(de, mnt, flags & ~O_DIRECT);
659         if (!file || IS_ERR(file)) {
660                 req->rq_status = -EINVAL;
661                 OBD_FREE(mfd, sizeof(*mfd));
662                 RETURN(0);
663         }
664
665         file->private_data = mfd;
666         mfd->mfd_file = file;
667         mfd->mfd_clientfd = body->extra;
668         list_add(&mfd->mfd_list, &mci->mci_open_head);
669
670         body = lustre_msg_buf(req->rq_repmsg, 0);
671         body->extra = (__u64) (unsigned long)file;
672         RETURN(0);
673 }
674
675 static int mds_close(struct ptlrpc_request *req)
676 {
677         struct dentry *de;
678         struct mds_body *body;
679         struct file *file;
680         struct mds_obd *mds = mds_req2mds(req);
681         struct vfsmount *mnt;
682         struct mds_file_data *mfd;
683         int rc;
684         ENTRY;
685
686         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
687         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
688                 CERROR("mds: out of memory\n");
689                 req->rq_status = -ENOMEM;
690                 RETURN(0);
691         }
692
693         body = lustre_msg_buf(req->rq_reqmsg, 0);
694         de = mds_fid2dentry(mds, &body->fid1, &mnt);
695         if (IS_ERR(de)) {
696                 req->rq_status = -ENOENT;
697                 RETURN(0);
698         }
699
700         file = (struct file *)(unsigned long)body->extra;
701         if (!file->f_dentry)
702                 LBUG();
703         mfd = (struct mds_file_data *)file->private_data;
704         list_del(&mfd->mfd_list);
705         OBD_FREE(mfd, sizeof(*mfd));
706
707         req->rq_status = filp_close(file, 0);
708         l_dput(de);
709         mntput(mnt);
710
711         RETURN(0);
712 }
713
714 static int mds_readpage(struct ptlrpc_request *req)
715 {
716         struct mds_obd *mds = mds_req2mds(req);
717         struct vfsmount *mnt;
718         struct dentry *de;
719         struct file *file;
720         struct mds_body *body;
721         struct obd_run_ctxt saved;
722         int rc, size = sizeof(*body);
723         ENTRY;
724
725         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
726         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
727                 CERROR("mds: out of memory\n");
728                 GOTO(out, rc = -ENOMEM);
729         }
730
731         body = lustre_msg_buf(req->rq_reqmsg, 0);
732         push_ctxt(&saved, &mds->mds_ctxt);
733         de = mds_fid2dentry(mds, &body->fid1, &mnt);
734         if (IS_ERR(de))
735                 GOTO(out_pop, rc = PTR_ERR(de));
736
737         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
738
739         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
740         /* note: in case of an error, dentry_open puts dentry */
741         if (IS_ERR(file))
742                 GOTO(out_pop, rc = PTR_ERR(file));
743
744         /* to make this asynchronous make sure that the handling function
745            doesn't send a reply when this function completes. Instead a
746            callback function would send the reply */
747         rc = mds_sendpage(req, file, body->size);
748
749         filp_close(file, 0);
750 out_pop:
751         pop_ctxt(&saved);
752 out:
753         req->rq_status = rc;
754         RETURN(0);
755 }
756
757 int mds_reint(int offset, struct ptlrpc_request *req)
758 {
759         int rc;
760         struct mds_update_record rec;
761
762         rc = mds_update_unpack(req, offset, &rec);
763         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
764                 CERROR("invalid record\n");
765                 req->rq_status = -EINVAL;
766                 RETURN(0);
767         }
768         /* rc will be used to interrupt a for loop over multiple records */
769         rc = mds_reint_rec(&rec, offset, req);
770         return rc;
771 }
772
773 int mds_handle(struct ptlrpc_request *req)
774 {
775         int rc;
776         ENTRY;
777
778         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
779         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
780                 CERROR("lustre_mds: Invalid request\n");
781                 GOTO(out, rc);
782         }
783
784         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
785                 CERROR("lustre_mds: wrong packet type sent %d\n",
786                        req->rq_reqmsg->type);
787                 GOTO(out, rc = -EINVAL);
788         }
789
790         if (req->rq_reqmsg->opc != MDS_CONNECT &&
791             req->rq_export == NULL)
792                 GOTO(out, rc = -ENOTCONN);
793
794         if (strcmp(req->rq_obd->obd_type->typ_name, "mds") != 0)
795                 GOTO(out, rc = -EINVAL);
796
797         switch (req->rq_reqmsg->opc) {
798         case MDS_CONNECT:
799                 CDEBUG(D_INODE, "connect\n");
800                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
801                 rc = target_handle_connect(req);
802                 break;
803
804         case MDS_DISCONNECT:
805                 CDEBUG(D_INODE, "disconnect\n");
806                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
807                 rc = target_handle_disconnect(req);
808                 goto out;
809
810         case MDS_GETSTATUS:
811                 CDEBUG(D_INODE, "getstatus\n");
812                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
813                 rc = mds_getstatus(req);
814                 break;
815
816         case MDS_LOVINFO:
817                 CDEBUG(D_INODE, "lovinfo\n");
818                 rc = mds_lovinfo(req);
819                 break;
820
821         case MDS_GETATTR:
822                 CDEBUG(D_INODE, "getattr\n");
823                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
824                 rc = mds_getattr(0, req);
825                 break;
826
827         case MDS_STATFS:
828                 CDEBUG(D_INODE, "statfs\n");
829                 OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0);
830                 rc = mds_statfs(req);
831                 break;
832
833         case MDS_READPAGE:
834                 CDEBUG(D_INODE, "readpage\n");
835                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
836                 rc = mds_readpage(req);
837
838                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
839                         return 0;
840                 break;
841
842         case MDS_REINT: {
843                 int size = sizeof(struct mds_body);
844                 CDEBUG(D_INODE, "reint\n");
845                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
846
847                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
848                                      &req->rq_repmsg);
849                 if (rc) {
850                         rc = req->rq_status = -ENOMEM;
851                         break;
852                 }
853                 rc = mds_reint(0, req);
854                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET_REP, 0);
855                 break;
856         }
857
858         case MDS_OPEN:
859                 CDEBUG(D_INODE, "open\n");
860                 OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
861                 rc = mds_open(req);
862                 break;
863
864         case MDS_CLOSE:
865                 CDEBUG(D_INODE, "close\n");
866                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
867                 rc = mds_close(req);
868                 break;
869
870         default:
871                 rc = ptlrpc_error(req->rq_svc, req);
872                 RETURN(rc);
873         }
874
875         EXIT;
876
877         if (!rc) { 
878                 struct mds_obd *mds = mds_req2mds(req);
879                 req->rq_repmsg->last_rcvd = HTON__u64(mds->mds_last_rcvd);
880                 req->rq_repmsg->last_committed =
881                         HTON__u64(mds->mds_last_committed);
882                 CDEBUG(D_INFO, "last_rcvd %Lu, last_committed %Lu, xid %d\n",
883                        (unsigned long long)mds->mds_last_rcvd,
884                        (unsigned long long)mds->mds_last_committed,
885                        cpu_to_le32(req->rq_xid));
886         }
887  out:
888         /* Still not 100% sure whether we should reply with the server
889          * last_rcvd or that of this client.  I'm not sure it even makes
890          * a difference on a per-client basis, because last_rcvd is global
891          * and we are not supposed to allow transactions while in recovery.
892          */
893         if (rc) {
894                 CERROR("mds: processing error %d\n", rc);
895                 ptlrpc_error(req->rq_svc, req);
896         } else {
897                 CDEBUG(D_NET, "sending reply\n");
898                 ptlrpc_reply(req->rq_svc, req);
899         }
900         return 0;
901 }
902
903 /* Update the server data on disk.  This stores the new mount_count and
904  * also the last_rcvd value to disk.  If we don't have a clean shutdown,
905  * then the server last_rcvd value may be less than that of the clients.
906  * This will alert us that we may need to do client recovery.
907  */
908 static
909 int mds_update_server_data(struct mds_obd *mds)
910 {
911         struct obd_run_ctxt saved;
912         struct mds_server_data *msd = mds->mds_server_data;
913         struct file *filp = mds->mds_rcvd_filp;
914         loff_t off = 0;
915         int rc;
916
917         msd->msd_last_rcvd = cpu_to_le64(mds->mds_last_rcvd);
918         msd->msd_mount_count = cpu_to_le64(mds->mds_mount_count);
919
920         CDEBUG(D_SUPER, "MDS mount_count is %Lu, last_rcvd is %Lu\n",
921                (unsigned long long)mds->mds_mount_count,
922                (unsigned long long)mds->mds_last_rcvd);
923         push_ctxt(&saved, &mds->mds_ctxt);
924         rc = lustre_fwrite(filp, (char *)msd, sizeof(*msd), &off);
925         if (rc != sizeof(*msd)) {
926                 CERROR("error writing MDS server data: rc = %d\n", rc);
927                 if (rc > 0)
928                         RETURN(-EIO);
929                 RETURN(rc);
930         }
931         rc = fsync_dev(filp->f_dentry->d_inode->i_rdev);
932         pop_ctxt(&saved);
933         if (rc)
934                 CERROR("error flushing MDS server data: rc = %d\n", rc);
935
936         return 0;
937 }
938
939 /* Do recovery actions for the MDS */
940 static int mds_recover(struct obd_device *obddev)
941 {
942         struct mds_obd *mds = &obddev->u.mds;
943         int rc;
944
945         /* This happens at the end when recovery is complete */
946         ++mds->mds_mount_count;
947         rc = mds_update_server_data(mds);
948
949         return rc;
950 }
951
952
953 /* mount the file system (secretly) */
954 static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
955 {
956         struct obd_ioctl_data* data = buf;
957         struct obd_export *export;
958         struct mds_obd *mds = &obddev->u.mds;
959         struct vfsmount *mnt;
960         int rc = 0;
961         ENTRY;
962
963         MOD_INC_USE_COUNT;
964 #ifdef CONFIG_DEV_RDONLY
965         dev_clear_rdonly(2);
966 #endif
967         if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
968                 GOTO(err_dec, rc = -EINVAL);
969
970         mds->mds_fstype = strdup(data->ioc_inlbuf2);
971
972         mnt = do_kern_mount(mds->mds_fstype, 0, data->ioc_inlbuf1, NULL);
973         if (IS_ERR(mnt)) {
974                 rc = PTR_ERR(mnt);
975                 CERROR("do_kern_mount failed: rc = %d\n", rc);
976                 GOTO(err_kfree, rc);
977         }
978
979         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
980         if (!mds->mds_sb)
981                 GOTO(err_put, rc = -ENODEV);
982
983         rc = mds_fs_setup(mds, mnt);
984         if (rc) {
985                 CERROR("MDS filesystem method init failed: rc = %d\n", rc);
986                 GOTO(err_put, rc);
987         }
988
989         mds->mds_service = ptlrpc_init_svc(64 * 1024, MDS_REQUEST_PORTAL,
990                                            MDC_REPLY_PORTAL, "self",mds_handle);
991         if (!mds->mds_service) {
992                 CERROR("failed to start service\n");
993                 GOTO(err_fs, rc = -EINVAL);
994         }
995
996         rc = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
997         if (rc) {
998                 CERROR("cannot start thread: rc = %d\n", rc);
999                 GOTO(err_svc, rc);
1000         }
1001
1002         rc = -ENOENT;
1003         mds->mds_ldlm_conn = ptlrpc_uuid_to_connection("self");
1004         if (!mds->mds_ldlm_conn) {
1005                 mds_cleanup(obddev);
1006                 GOTO(err_thread, rc);
1007         }
1008
1009         obddev->obd_namespace =
1010                 ldlm_namespace_new("mds_server", LDLM_NAMESPACE_SERVER);
1011         if (obddev->obd_namespace == NULL) {
1012                 LBUG();
1013                 mds_cleanup(obddev);
1014                 GOTO(err_thread, rc);
1015         }
1016
1017         mds->mds_local_namespace =
1018                 ldlm_namespace_new("mds_client", LDLM_NAMESPACE_CLIENT);
1019         if (mds->mds_local_namespace == NULL) {
1020                 LBUG();
1021                 mds_cleanup(obddev);
1022                 GOTO(err_thread, rc);
1023         }
1024
1025         OBD_ALLOC(mds->mds_ldlm_client, sizeof(*mds->mds_ldlm_client));
1026         if (mds->mds_ldlm_client == NULL) {
1027                 LBUG();
1028                 mds_cleanup(obddev);
1029                 GOTO(err_thread, rc);
1030         }
1031         ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
1032                            mds->mds_ldlm_client);
1033         mds->mds_ldlm_client->cli_target_devno = obddev->obd_minor;
1034         mds->mds_ldlm_client->cli_name = "mds ldlm";
1035
1036         rc = mds_recover(obddev);
1037         if (rc)
1038                 GOTO(err_thread, rc);
1039
1040         rc = class_connect(&mds->mds_connh, obddev);
1041         if (rc)
1042                 GOTO(err_thread, rc);
1043         export = class_conn2export(&mds->mds_connh);
1044         if (!export)
1045                 LBUG();
1046         export->export_connection = mds->mds_ldlm_conn;
1047
1048         RETURN(0);
1049
1050         
1051
1052 err_thread:
1053         ptlrpc_stop_all_threads(mds->mds_service);
1054 err_svc:
1055         ptlrpc_unregister_service(mds->mds_service);
1056 err_fs:
1057         mds_fs_cleanup(mds);
1058 err_put:
1059         unlock_kernel();
1060         mntput(mds->mds_vfsmnt);
1061         mds->mds_sb = 0;
1062         lock_kernel();
1063 err_kfree:
1064         kfree(mds->mds_fstype);
1065 err_dec:
1066         MOD_DEC_USE_COUNT;
1067         return rc;
1068 }
1069
1070 static int mds_cleanup(struct obd_device * obddev)
1071 {
1072         struct super_block *sb;
1073         struct mds_obd *mds = &obddev->u.mds;
1074
1075         ENTRY;
1076         class_disconnect(&mds->mds_connh);
1077
1078
1079         if ( !list_empty(&obddev->obd_exports) ) {
1080                 CERROR("still has exports!\n");
1081                 RETURN(-EBUSY);
1082         }
1083
1084         ptlrpc_stop_all_threads(mds->mds_service);
1085         ptlrpc_unregister_service(mds->mds_service);
1086
1087         sb = mds->mds_sb;
1088         if (!mds->mds_sb)
1089                 RETURN(0);
1090
1091         mds_update_server_data(mds);
1092
1093         if (mds->mds_rcvd_filp) {
1094                 int rc = filp_close(mds->mds_rcvd_filp, 0);
1095                 mds->mds_rcvd_filp = NULL;
1096
1097                 if (rc)
1098                         CERROR("last_rcvd file won't close, rc=%d\n", rc);
1099         }
1100
1101         unlock_kernel();
1102         mntput(mds->mds_vfsmnt);
1103         mds->mds_sb = 0;
1104         kfree(mds->mds_fstype);
1105
1106         ldlm_namespace_free(mds->mds_local_namespace);
1107         ldlm_namespace_free(obddev->obd_namespace);
1108
1109         if (mds->mds_ldlm_conn != NULL)
1110                 ptlrpc_put_connection(mds->mds_ldlm_conn);
1111
1112         OBD_FREE(mds->mds_ldlm_client, sizeof(*mds->mds_ldlm_client));
1113
1114         lock_kernel();
1115 #ifdef CONFIG_DEV_RDONLY
1116         dev_clear_rdonly(2);
1117 #endif
1118         mds_fs_cleanup(mds);
1119
1120         MOD_DEC_USE_COUNT;
1121         RETURN(0);
1122 }
1123
1124 extern int mds_iocontrol(long cmd, struct lustre_handle *conn, 
1125                           int len, void *karg, void *uarg);
1126
1127 /* use obd ops to offer management infrastructure */
1128 static struct obd_ops mds_obd_ops = {
1129         o_connect:     mds_connect,
1130         o_disconnect:  mds_disconnect,
1131         o_setup:       mds_setup,
1132         o_cleanup:     mds_cleanup,
1133         o_iocontrol:   mds_iocontrol
1134 };
1135
1136 static int __init mds_init(void)
1137 {
1138         inter_module_register("mds_reint", THIS_MODULE, &mds_reint);
1139         inter_module_register("mds_getattr_name", THIS_MODULE,
1140                               &mds_getattr_name);
1141         class_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
1142         return 0;
1143 }
1144
1145 static void __exit mds_exit(void)
1146 {
1147         inter_module_unregister("mds_reint");
1148         inter_module_unregister("mds_getattr_name");
1149         class_unregister_type(LUSTRE_MDS_NAME);
1150 }
1151
1152 MODULE_AUTHOR("Cluster File Systems <info@clusterfs.com>");
1153 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
1154 MODULE_LICENSE("GPL");
1155
1156 module_init(mds_init);
1157 module_exit(mds_exit);