Whamcloud - gitweb
Add module refcounts for filesystem interface modules.
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/handler.c
5  *
6  *  Lustre Metadata Server (mds) request handler
7  *
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com> &
14  *     Andreas Dilger <adilger@clusterfs.com>
15  *
16  *  This server is single threaded at present (but can easily be multi threaded)
17  *
18  */
19
20 #define EXPORT_SYMTAB
21 #define DEBUG_SUBSYSTEM S_MDS
22
23 #include <linux/module.h>
24 #include <linux/lustre_mds.h>
25 #include <linux/lustre_dlm.h>
26 extern int mds_get_lovtgts(struct obd_device *obd, int tgt_count,
27                            uuid_t *uuidarray);
28 extern int mds_get_lovdesc(struct obd_device *obd, struct lov_desc *desc);
29 extern int mds_update_last_rcvd(struct mds_obd *mds, void *handle,
30                                 struct ptlrpc_request *req);
31 static int mds_cleanup(struct obd_device * obddev);
32
33 inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req)
34 {
35         return &req->rq_export->exp_obd->u.mds;
36 }
37
38 /* Assumes caller has already pushed into the kernel filesystem context */
39 static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
40                         __u64 offset)
41 {
42         int rc = 0;
43         struct mds_obd *mds = mds_req2mds(req); 
44         struct ptlrpc_bulk_desc *desc;
45         struct ptlrpc_bulk_page *bulk;
46         char *buf;
47         ENTRY;
48
49         desc = ptlrpc_prep_bulk(req->rq_connection);
50         if (desc == NULL)
51                 GOTO(out, rc = -ENOMEM);
52
53         bulk = ptlrpc_prep_bulk_page(desc);
54         if (bulk == NULL)
55                 GOTO(cleanup_bulk, rc = -ENOMEM);
56
57         OBD_ALLOC(buf, PAGE_SIZE);
58         if (buf == NULL)
59                 GOTO(cleanup_bulk, rc = -ENOMEM);
60
61         rc = mds_fs_readpage(mds, file, buf, PAGE_SIZE, (loff_t *)&offset);
62
63         if (rc != PAGE_SIZE)
64                 GOTO(cleanup_buf, rc = -EIO);
65
66         bulk->b_xid = req->rq_xid;
67         bulk->b_buf = buf;
68         bulk->b_buflen = PAGE_SIZE;
69         desc->b_portal = MDS_BULK_PORTAL;
70
71         rc = ptlrpc_send_bulk(desc);
72         if (rc)
73                 GOTO(cleanup_buf, rc);
74
75         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
76                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
77                        OBD_FAIL_MDS_SENDPAGE, rc);
78                 ptlrpc_abort_bulk(desc);
79                 GOTO(cleanup_buf, rc);
80         }
81
82         wait_event(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
83         if (desc->b_flags & PTL_RPC_FL_INTR)
84                 GOTO(cleanup_buf, rc = -EINTR);
85
86         EXIT;
87  cleanup_buf:
88         OBD_FREE(buf, PAGE_SIZE);
89  cleanup_bulk:
90         ptlrpc_free_bulk(desc);
91  out:
92         return rc;
93 }
94
95 /* 'dir' is a inode for which a lock has already been taken */
96 struct dentry *mds_name2locked_dentry(struct mds_obd *mds, struct dentry *dir,
97                                       struct vfsmount **mnt, char *name,
98                                       int namelen, int lock_mode,
99                                       struct lustre_handle *lockh, 
100                                       int dir_lock_mode)
101 {
102         struct dentry *dchild;
103         int flags, rc;
104         __u64 res_id[3] = {0};
105         ENTRY;
106
107         down(&dir->d_inode->i_sem);
108         dchild = lookup_one_len(name, dir, namelen);
109         if (IS_ERR(dchild)) {
110                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
111                 up(&dir->d_inode->i_sem);
112                 LBUG();
113         }
114         if (dir_lock_mode != LCK_EX && dir_lock_mode != LCK_PW) { 
115                 up(&dir->d_inode->i_sem);
116                 ldlm_lock_decref(lockh, dir_lock_mode); 
117         }
118
119         if (lock_mode == 0 || !dchild->d_inode)
120                 RETURN(dchild);
121
122         res_id[0] = dchild->d_inode->i_ino;
123         rc = ldlm_match_or_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
124                                    (struct lustre_handle *)&mds->mds_connh,
125                                    NULL, mds->mds_local_namespace, NULL,
126                                    res_id, LDLM_PLAIN, NULL, 0, lock_mode,
127                                    &flags, (void *)mds_lock_callback, NULL,
128                                    0, lockh);
129         if (rc != ELDLM_OK) {
130                 l_dput(dchild);
131                 RETURN(NULL);
132         }
133
134         RETURN(dchild);
135 }
136
137 struct dentry *mds_fid2locked_dentry(struct mds_obd *mds, struct ll_fid *fid,
138                                      struct vfsmount **mnt, int lock_mode,
139                                      struct lustre_handle *lockh)
140 {
141         struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
142         int flags, rc;
143         __u64 res_id[3] = {0};
144         ENTRY;
145
146         if (IS_ERR(de))
147                 RETURN(de);
148
149         res_id[0] = de->d_inode->i_ino;
150         rc = ldlm_match_or_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
151                                    (struct lustre_handle *)&mds->mds_connh,
152                                    NULL, mds->mds_local_namespace, NULL,
153                                    res_id, LDLM_PLAIN, NULL, 0, lock_mode,
154                                    &flags, (void *)mds_lock_callback, NULL,
155                                    0, lockh);
156         if (rc != ELDLM_OK) {
157                 l_dput(de);
158                 retval = NULL;
159         }
160
161         RETURN(retval);
162 }
163
164 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
165                               struct vfsmount **mnt)
166 {
167         /* stolen from NFS */
168         struct super_block *sb = mds->mds_sb;
169         unsigned long ino = fid->id;
170         __u32 generation = fid->generation;
171         struct inode *inode;
172         struct list_head *lp;
173         struct dentry *result;
174
175         if (ino == 0)
176                 RETURN(ERR_PTR(-ESTALE));
177
178         inode = iget(sb, ino);
179         if (inode == NULL)
180                 RETURN(ERR_PTR(-ENOMEM));
181
182         CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
183
184         if (is_bad_inode(inode) ||
185             (generation && inode->i_generation != generation)) {
186                 /* we didn't find the right inode.. */
187                 CERROR("bad inode %lu, link: %d ct: %d or version  %u/%u\n",
188                        inode->i_ino, inode->i_nlink,
189                        atomic_read(&inode->i_count), inode->i_generation,
190                        generation);
191                 LBUG();
192                 iput(inode);
193                 RETURN(ERR_PTR(-ESTALE));
194         }
195
196         /* now to find a dentry.
197          * If possible, get a well-connected one
198          */
199         if (mnt)
200                 *mnt = mds->mds_vfsmnt;
201         spin_lock(&dcache_lock);
202         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
203                 result = list_entry(lp,struct dentry, d_alias);
204                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
205                         dget_locked(result);
206                         result->d_vfs_flags |= DCACHE_REFERENCED;
207                         spin_unlock(&dcache_lock);
208                         iput(inode);
209                         if (mnt)
210                                 mntget(*mnt);
211                         return result;
212                 }
213         }
214         spin_unlock(&dcache_lock);
215         result = d_alloc_root(inode);
216         if (result == NULL) {
217                 iput(inode);
218                 return ERR_PTR(-ENOMEM);
219         }
220         if (mnt)
221                 mntget(*mnt);
222         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
223         return result;
224 }
225
226 static int mds_connect(struct lustre_handle *conn, struct obd_device *obd)
227 {
228         int rc;
229
230         MOD_INC_USE_COUNT;
231         rc = class_connect(conn, obd);
232
233         if (rc)
234                 MOD_DEC_USE_COUNT;
235
236         return rc;
237 }
238
239 static int mds_disconnect(struct lustre_handle *conn)
240 {
241         int rc;
242
243         rc = class_disconnect(conn);
244         if (!rc)
245                 MOD_DEC_USE_COUNT;
246
247         return rc;
248 }
249
250 /* FIXME: the error cases need fixing to avoid leaks */
251 static int mds_getstatus(struct ptlrpc_request *req)
252 {
253         struct mds_obd *mds = mds_req2mds(req);
254         struct mds_body *body;
255         struct mds_client_info *mci;
256         struct mds_client_data *mcd;
257         int rc, size = sizeof(*body);
258         ENTRY;
259
260         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
261         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
262                 CERROR("mds: out of memory for message: size=%d\n", size);
263                 req->rq_status = -ENOMEM;
264                 RETURN(0);
265         }
266
267         body = lustre_msg_buf(req->rq_reqmsg, 0);
268         mds_unpack_body(body);
269
270         /* Anything we need to do here with the client's trans no or so? */
271         body = lustre_msg_buf(req->rq_repmsg, 0);
272         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
273
274         mci = mds_uuid_to_mci(mds, ptlrpc_req_to_uuid(req));
275         if (!mci) {
276                 /* We don't have any old connection data for this client */
277                 int rc;
278
279                 CDEBUG(D_INFO, "allocating new client data for UUID '%s'",
280                        ptlrpc_req_to_uuid(req));
281
282                 OBD_ALLOC(mcd, sizeof(*mcd));
283                 if (!mcd) {
284                         CERROR("mds: out of memory for client data\n");
285                         req->rq_status = -ENOMEM;
286                         RETURN(0);
287                 }
288                 memcpy(mcd->mcd_uuid, ptlrpc_req_to_uuid(req),
289                        sizeof(mcd->mcd_uuid));
290                 rc = mds_client_add(mds, mcd, -1);
291                 if (rc) {
292                         req->rq_status = rc;
293                         RETURN(0);
294                 }
295         } else {
296                 /* We have old connection data for this client... */
297                 mcd = mci->mci_mcd;
298                 CDEBUG(D_INFO, "found existing data for UUID '%s' at #%d\n",
299                        mcd->mcd_uuid, mci->mci_off);
300         }
301         /* mcd_last_xid is is stored in little endian on the disk and
302            mds_pack_rep_body converts it to network order */
303         body->last_xid = le32_to_cpu(mcd->mcd_last_xid);
304         mds_pack_rep_body(req);
305         RETURN(0);
306 }
307
308 static int mds_getlovinfo(struct ptlrpc_request *req)
309 {
310         struct mds_obd *mds = mds_req2mds(req);
311         struct mds_status_req *streq;
312         struct lov_desc *desc; 
313         int tgt_count;
314         int rc, size[2] = {sizeof(*desc)};
315         ENTRY;
316
317         streq = lustre_msg_buf(req->rq_reqmsg, 0); 
318         streq->flags = NTOH__u32(streq->flags); 
319         streq->repbuf = NTOH__u32(streq->repbuf); 
320         size[1] = streq->repbuf;
321
322         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
323         if (rc) { 
324                 CERROR("mds: out of memory for message: size=%d\n", size[1]);
325                 req->rq_status = -ENOMEM;
326                 RETURN(0);
327         }
328
329         desc = lustre_msg_buf(req->rq_repmsg, 0); 
330         rc = mds_get_lovdesc(req->rq_obd, desc);
331         if (rc != 0 ) { 
332                 CERROR("mds_get_lovdesc error %d", rc);
333                 req->rq_status = rc;
334                 RETURN(0);
335         }
336
337         tgt_count = NTOH__u32(desc->ld_tgt_count);
338         if (tgt_count * sizeof(uuid_t) > streq->repbuf) {
339                 CERROR("too many targets, enlarge client buffers\n");
340                 req->rq_status = -ENOSPC;
341                 RETURN(0);
342         }
343
344         mds->mds_max_mdsize = sizeof(desc) + tgt_count * sizeof(uuid_t);
345         rc = mds_get_lovtgts(req->rq_obd, tgt_count,
346                              lustre_msg_buf(req->rq_repmsg, 1));
347         if (rc) {
348                 CERROR("get_lovtgts error %d", rc);
349                 req->rq_status = rc;
350                 RETURN(0);
351         }
352         RETURN(0);
353 }
354
355 int mds_lock_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
356                       void *data, int data_len, struct ptlrpc_request **reqp)
357 {
358         ENTRY;
359
360         if (desc == NULL) {
361                 /* Completion AST.  Do nothing */
362                 RETURN(0);
363         }
364
365         if (ldlm_cli_cancel(lockh) < 0)
366                 LBUG();
367         RETURN(0);
368 }
369
370 static int mds_getattr_name(int offset, struct ptlrpc_request *req)
371 {
372         struct mds_obd *mds = mds_req2mds(req);
373         struct obd_run_ctxt saved;
374         struct mds_body *body;
375         struct dentry *de = NULL, *dchild = NULL;
376         struct inode *dir;
377         struct lustre_handle lockh;
378         char *name;
379         int namelen, flags, lock_mode, rc = 0;
380         __u64 res_id[3] = {0, 0, 0};
381         ENTRY;
382
383         if (strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds") != 0)
384                 LBUG();
385
386         if (req->rq_reqmsg->bufcount <= offset + 1) {
387                 LBUG();
388                 GOTO(out_pre_de, rc = -EINVAL);
389         }
390
391         body = lustre_msg_buf(req->rq_reqmsg, offset);
392         name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
393         namelen = req->rq_reqmsg->buflens[offset + 1];
394         /* requests were at offset 2, replies go back at 1 */
395         if (offset)
396                 offset = 1;
397
398         push_ctxt(&saved, &mds->mds_ctxt);
399         de = mds_fid2dentry(mds, &body->fid1, NULL);
400         if (IS_ERR(de)) {
401                 LBUG();
402                 GOTO(out_pre_de, rc = -ESTALE);
403         }
404
405         dir = de->d_inode;
406         CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino);
407
408         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
409         res_id[0] = dir->i_ino;
410
411         rc = ldlm_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN,
412                              NULL, 0, lock_mode, &lockh);
413         if (rc == 0) {
414                 LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]);
415                 rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
416                                       (struct lustre_handle *)&mds->mds_connh, 
417                                       NULL, mds->mds_local_namespace, NULL,
418                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
419                                       &flags, (void *)mds_lock_callback,
420                                       NULL, 0, &lockh);
421                 if (rc != ELDLM_OK) {
422                         CERROR("lock enqueue: err: %d\n", rc);
423                         GOTO(out_create_de, rc = -EIO);
424                 }
425         }
426         ldlm_lock_dump((void *)(unsigned long)lockh.addr);
427
428         down(&dir->i_sem);
429         dchild = lookup_one_len(name, de, namelen - 1);
430         if (IS_ERR(dchild)) {
431                 CDEBUG(D_INODE, "child lookup error %ld\n", PTR_ERR(dchild));
432                 up(&dir->i_sem);
433                 LBUG();
434                 GOTO(out_create_dchild, rc = -ESTALE);
435         }
436
437         if (dchild->d_inode) {
438                 struct mds_body *body;
439                 struct inode *inode = dchild->d_inode;
440                 CDEBUG(D_INODE, "child exists (dir %ld, name %s, ino %ld)\n",
441                        dir->i_ino, name, dchild->d_inode->i_ino);
442
443                 body = lustre_msg_buf(req->rq_repmsg, offset);
444                 mds_pack_inode2fid(&body->fid1, inode);
445                 mds_pack_inode2body(body, inode);
446                 if (S_ISREG(inode->i_mode)) {
447                         struct lov_stripe_md *md;
448                         md = lustre_msg_buf(req->rq_repmsg, offset + 1);
449                         md->lmd_size = mds->mds_max_mdsize;
450                         mds_fs_get_md(mds, inode, md);
451                 }
452                 /* now a normal case for intent locking */
453                 rc = 0;
454         } else
455                 rc = -ENOENT;
456
457         EXIT;
458 out_create_dchild:
459         l_dput(dchild);
460         up(&dir->i_sem);
461         ldlm_lock_decref(&lockh, lock_mode);
462 out_create_de:
463         l_dput(de);
464 out_pre_de:
465         req->rq_status = rc;
466         pop_ctxt(&saved);
467         return 0;
468 }
469
470
471 static int mds_getattr(int offset, struct ptlrpc_request *req)
472 {
473         struct mds_obd *mds = mds_req2mds(req);
474         struct obd_run_ctxt saved;
475         struct dentry *de;
476         struct inode *inode;
477         struct mds_body *body;
478         int rc, size[2] = {sizeof(*body)}, bufcount = 1;
479         ENTRY;
480
481         body = lustre_msg_buf(req->rq_reqmsg, offset);
482         push_ctxt(&saved, &mds->mds_ctxt);
483         de = mds_fid2dentry(mds, &body->fid1, NULL);
484         if (IS_ERR(de)) {
485                 GOTO(out_pop, rc = -ENOENT);
486         }
487
488         inode = de->d_inode;
489         if (S_ISREG(body->fid1.f_type)) {
490                 bufcount = 2;
491                 size[1] = mds->mds_max_mdsize;
492         } else if (body->valid & OBD_MD_LINKNAME) {
493                 bufcount = 2;
494                 size[1] = inode->i_size;
495         }
496
497         rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
498                              &req->rq_repmsg);
499         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
500                 CERROR("mds: out of memory\n");
501                 GOTO(out, rc);
502         }
503
504         if (body->valid & OBD_MD_LINKNAME) {
505                 char *tmp = lustre_msg_buf(req->rq_repmsg, 1);
506
507                 rc = inode->i_op->readlink(de, tmp, size[1]);
508
509                 if (rc < 0) {
510                         CERROR("readlink failed: %d\n", rc);
511                         GOTO(out, rc);
512                 }
513         }
514
515         body = lustre_msg_buf(req->rq_repmsg, 0);
516         body->ino = inode->i_ino;
517         body->generation = inode->i_generation;
518         body->atime = inode->i_atime;
519         body->ctime = inode->i_ctime;
520         body->mtime = inode->i_mtime;
521         body->uid = inode->i_uid;
522         body->gid = inode->i_gid;
523         body->size = inode->i_size;
524         body->mode = inode->i_mode;
525         body->nlink = inode->i_nlink;
526         body->valid = ~0; /* FIXME: should be more selective */
527
528         if (S_ISREG(inode->i_mode)) {
529                 rc = mds_fs_get_md(mds, inode,
530                                      lustre_msg_buf(req->rq_repmsg, 1));
531                 if (rc < 0) {
532                         CERROR("mds_fs_get_md failed: %d\n", rc);
533                         GOTO(out, rc);
534                 }
535         }
536 out:
537         l_dput(de);
538 out_pop:
539         pop_ctxt(&saved);
540         req->rq_status = rc;
541         RETURN(0);
542 }
543
544 static int mds_statfs(struct ptlrpc_request *req)
545 {
546         struct mds_obd *mds = mds_req2mds(req);
547         struct obd_statfs *osfs;
548         struct statfs sfs;
549         int rc, size = sizeof(*osfs);
550         ENTRY;
551
552         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
553                              &req->rq_repmsg);
554         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
555                 CERROR("mds: statfs lustre_pack_msg failed: rc = %d\n", rc);
556                 GOTO(out, rc);
557         }
558
559         rc = vfs_statfs(mds->mds_sb, &sfs);
560         if (rc) {
561                 CERROR("mds: statfs failed: rc %d\n", rc);
562                 GOTO(out, rc);
563         }
564         osfs = lustre_msg_buf(req->rq_repmsg, 0);
565         memset(osfs, 0, size);
566         obd_statfs_pack(osfs, &sfs);
567
568 out:
569         req->rq_status = rc;
570         RETURN(0);
571 }
572
573 static int mds_open(struct ptlrpc_request *req)
574 {
575         struct dentry *de;
576         struct mds_body *body;
577         struct file *file;
578         struct vfsmount *mnt;
579         struct mds_obd *mds = mds_req2mds(req);
580         struct mds_client_info *mci;
581         __u32 flags;
582         struct list_head *tmp;
583         struct mds_file_data *mfd;
584         int rc, size = sizeof(*body);
585         ENTRY;
586
587         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
588         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
589                 CERROR("mds: out of memory\n");
590                 req->rq_status = -ENOMEM;
591                 RETURN(0);
592         }
593
594         mci = mds_uuid_to_mci(mds, ptlrpc_req_to_uuid(req));
595         if (!mci) {
596                 CERROR("mds: no mci!\n");
597                 req->rq_status = -ENOTCONN;
598                 RETURN(0);
599         }
600
601         body = lustre_msg_buf(req->rq_reqmsg, 0);
602
603         /* was this animal open already? */
604         /* XXX we should only check on re-open, or do a refcount... */
605         list_for_each(tmp, &mci->mci_open_head) {
606                 struct mds_file_data *fd;
607                 fd = list_entry(tmp, struct mds_file_data, mfd_list);
608                 if (body->extra == fd->mfd_clientfd &&
609                     body->fid1.id == fd->mfd_file->f_dentry->d_inode->i_ino) {
610                         CERROR("Re opening %Ld\n", body->fid1.id);
611                         RETURN(0);
612                 }
613         }
614
615         OBD_ALLOC(mfd, sizeof(*mfd));
616         if (!mfd) {
617                 CERROR("mds: out of memory\n");
618                 req->rq_status = -ENOMEM;
619                 RETURN(0);
620         }
621
622         de = mds_fid2dentry(mds, &body->fid1, &mnt);
623         if (IS_ERR(de)) {
624                 req->rq_status = -ENOENT;
625                 RETURN(0);
626         }
627
628         /* check if this inode has seen a delayed object creation */
629         if (req->rq_reqmsg->bufcount > 1) {
630                 void *handle;
631                 struct lov_stripe_md *md;
632                 struct inode *inode = de->d_inode;
633                 //struct iattr iattr;
634                 int rc;
635
636                 md = lustre_msg_buf(req->rq_reqmsg, 1);
637                 //iattr.ia_mode = inode->i_mode;
638
639                 handle = mds_fs_start(mds, de->d_inode, MDS_FSOP_SETATTR);
640                 if (!handle) {
641                         req->rq_status = -ENOMEM;
642                         RETURN(0);
643                 }
644
645                 /* XXX error handling */
646                 rc = mds_fs_set_md(mds, inode, handle, md);
647                 //                rc = mds_fs_setattr(mds, de, handle, &iattr);
648                 if (!rc) {
649                         struct obd_run_ctxt saved;
650                         push_ctxt(&saved, &mds->mds_ctxt);
651                         rc = mds_update_last_rcvd(mds, handle, req);
652                         pop_ctxt(&saved);
653                 } else {
654                         req->rq_status = rc;
655                         RETURN(0);
656                 }
657                 /* FIXME: need to return last_rcvd, last_committed */
658
659                 /* FIXME: keep rc intact */
660                 rc = mds_fs_commit(mds, de->d_inode, handle);
661                 if (rc) {
662                         req->rq_status = rc;
663                         RETURN(0);
664                 }
665         }
666
667         flags = body->flags;
668         file = dentry_open(de, mnt, flags & ~O_DIRECT);
669         if (!file || IS_ERR(file)) {
670                 req->rq_status = -EINVAL;
671                 OBD_FREE(mfd, sizeof(*mfd));
672                 RETURN(0);
673         }
674
675         file->private_data = mfd;
676         mfd->mfd_file = file;
677         mfd->mfd_clientfd = body->extra;
678         list_add(&mfd->mfd_list, &mci->mci_open_head);
679
680         body = lustre_msg_buf(req->rq_repmsg, 0);
681         body->extra = (__u64) (unsigned long)file;
682         RETURN(0);
683 }
684
685 static int mds_close(struct ptlrpc_request *req)
686 {
687         struct dentry *de;
688         struct mds_body *body;
689         struct file *file;
690         struct mds_obd *mds = mds_req2mds(req);
691         struct vfsmount *mnt;
692         struct mds_file_data *mfd;
693         int rc;
694         ENTRY;
695
696         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
697         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
698                 CERROR("mds: out of memory\n");
699                 req->rq_status = -ENOMEM;
700                 RETURN(0);
701         }
702
703         body = lustre_msg_buf(req->rq_reqmsg, 0);
704         de = mds_fid2dentry(mds, &body->fid1, &mnt);
705         if (IS_ERR(de)) {
706                 req->rq_status = -ENOENT;
707                 RETURN(0);
708         }
709
710         file = (struct file *)(unsigned long)body->extra;
711         if (!file->f_dentry)
712                 LBUG();
713         mfd = (struct mds_file_data *)file->private_data;
714         list_del(&mfd->mfd_list);
715         OBD_FREE(mfd, sizeof(*mfd));
716
717         req->rq_status = filp_close(file, 0);
718         l_dput(de);
719         mntput(mnt);
720
721         RETURN(0);
722 }
723
724 static int mds_readpage(struct ptlrpc_request *req)
725 {
726         struct mds_obd *mds = mds_req2mds(req);
727         struct vfsmount *mnt;
728         struct dentry *de;
729         struct file *file;
730         struct mds_body *body;
731         struct obd_run_ctxt saved;
732         int rc, size = sizeof(*body);
733         ENTRY;
734
735         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
736         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
737                 CERROR("mds: out of memory\n");
738                 GOTO(out, rc = -ENOMEM);
739         }
740
741         body = lustre_msg_buf(req->rq_reqmsg, 0);
742         push_ctxt(&saved, &mds->mds_ctxt);
743         de = mds_fid2dentry(mds, &body->fid1, &mnt);
744         if (IS_ERR(de))
745                 GOTO(out_pop, rc = PTR_ERR(de));
746
747         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
748
749         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
750         /* note: in case of an error, dentry_open puts dentry */
751         if (IS_ERR(file))
752                 GOTO(out_pop, rc = PTR_ERR(file));
753
754         /* to make this asynchronous make sure that the handling function
755            doesn't send a reply when this function completes. Instead a
756            callback function would send the reply */
757         rc = mds_sendpage(req, file, body->size);
758
759         filp_close(file, 0);
760 out_pop:
761         pop_ctxt(&saved);
762 out:
763         req->rq_status = rc;
764         RETURN(0);
765 }
766
767 int mds_reint(int offset, struct ptlrpc_request *req)
768 {
769         int rc;
770         struct mds_update_record rec;
771
772         rc = mds_update_unpack(req, offset, &rec);
773         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
774                 CERROR("invalid record\n");
775                 req->rq_status = -EINVAL;
776                 RETURN(0);
777         }
778         /* rc will be used to interrupt a for loop over multiple records */
779         rc = mds_reint_rec(&rec, offset, req);
780         return rc;
781 }
782
783 int mds_handle(struct ptlrpc_request *req)
784 {
785         int rc;
786         ENTRY;
787
788         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
789         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
790                 CERROR("lustre_mds: Invalid request\n");
791                 GOTO(out, rc);
792         }
793
794         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
795                 CERROR("lustre_mds: wrong packet type sent %d\n",
796                        req->rq_reqmsg->type);
797                 GOTO(out, rc = -EINVAL);
798         }
799
800         if (req->rq_reqmsg->opc != MDS_CONNECT &&
801             req->rq_export == NULL)
802                 GOTO(out, rc = -ENOTCONN);
803
804         if (strcmp(req->rq_obd->obd_type->typ_name, "mds") != 0)
805                 GOTO(out, rc = -EINVAL);
806
807         switch (req->rq_reqmsg->opc) {
808         case MDS_CONNECT:
809                 CDEBUG(D_INODE, "connect\n");
810                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
811                 rc = target_handle_connect(req);
812                 break;
813
814         case MDS_DISCONNECT:
815                 CDEBUG(D_INODE, "disconnect\n");
816                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
817                 rc = target_handle_disconnect(req);
818                 goto out;
819
820         case MDS_GETSTATUS:
821                 CDEBUG(D_INODE, "getstatus\n");
822                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
823                 rc = mds_getstatus(req);
824                 break;
825
826         case MDS_GETLOVINFO:
827                 CDEBUG(D_INODE, "getlovinfo\n");
828                 rc = mds_getlovinfo(req);
829                 break;
830
831         case MDS_GETATTR:
832                 CDEBUG(D_INODE, "getattr\n");
833                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
834                 rc = mds_getattr(0, req);
835                 break;
836
837         case MDS_STATFS:
838                 CDEBUG(D_INODE, "statfs\n");
839                 OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0);
840                 rc = mds_statfs(req);
841                 break;
842
843         case MDS_READPAGE:
844                 CDEBUG(D_INODE, "readpage\n");
845                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
846                 rc = mds_readpage(req);
847
848                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
849                         return 0;
850                 break;
851
852         case MDS_REINT: {
853                 int size = sizeof(struct mds_body);
854                 CDEBUG(D_INODE, "reint\n");
855                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
856
857                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
858                                      &req->rq_repmsg);
859                 if (rc) {
860                         rc = req->rq_status = -ENOMEM;
861                         break;
862                 }
863                 rc = mds_reint(0, req);
864                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET_REP, 0);
865                 break;
866         }
867
868         case MDS_OPEN:
869                 CDEBUG(D_INODE, "open\n");
870                 OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
871                 rc = mds_open(req);
872                 break;
873
874         case MDS_CLOSE:
875                 CDEBUG(D_INODE, "close\n");
876                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
877                 rc = mds_close(req);
878                 break;
879
880         default:
881                 rc = ptlrpc_error(req->rq_svc, req);
882                 RETURN(rc);
883         }
884
885         EXIT;
886
887         if (!rc) { 
888                 struct mds_obd *mds = mds_req2mds(req);
889                 req->rq_repmsg->last_rcvd = HTON__u64(mds->mds_last_rcvd);
890                 req->rq_repmsg->last_committed =
891                         HTON__u64(mds->mds_last_committed);
892                 CDEBUG(D_INFO, "last_rcvd %Lu, last_committed %Lu, xid %d\n",
893                        (unsigned long long)mds->mds_last_rcvd,
894                        (unsigned long long)mds->mds_last_committed,
895                        cpu_to_le32(req->rq_xid));
896         }
897  out:
898         /* Still not 100% sure whether we should reply with the server
899          * last_rcvd or that of this client.  I'm not sure it even makes
900          * a difference on a per-client basis, because last_rcvd is global
901          * and we are not supposed to allow transactions while in recovery.
902          */
903         if (rc) {
904                 CERROR("mds: processing error %d\n", rc);
905                 ptlrpc_error(req->rq_svc, req);
906         } else {
907                 CDEBUG(D_NET, "sending reply\n");
908                 ptlrpc_reply(req->rq_svc, req);
909         }
910         return 0;
911 }
912
913 /* Update the server data on disk.  This stores the new mount_count and
914  * also the last_rcvd value to disk.  If we don't have a clean shutdown,
915  * then the server last_rcvd value may be less than that of the clients.
916  * This will alert us that we may need to do client recovery.
917  */
918 static
919 int mds_update_server_data(struct mds_obd *mds)
920 {
921         struct obd_run_ctxt saved;
922         struct mds_server_data *msd = mds->mds_server_data;
923         struct file *filp = mds->mds_rcvd_filp;
924         loff_t off = 0;
925         int rc;
926
927         msd->msd_last_rcvd = cpu_to_le64(mds->mds_last_rcvd);
928         msd->msd_mount_count = cpu_to_le64(mds->mds_mount_count);
929
930         CDEBUG(D_SUPER, "MDS mount_count is %Lu, last_rcvd is %Lu\n",
931                (unsigned long long)mds->mds_mount_count,
932                (unsigned long long)mds->mds_last_rcvd);
933         push_ctxt(&saved, &mds->mds_ctxt);
934         rc = lustre_fwrite(filp, (char *)msd, sizeof(*msd), &off);
935         if (rc != sizeof(*msd)) {
936                 CERROR("error writing MDS server data: rc = %d\n", rc);
937                 if (rc > 0)
938                         RETURN(-EIO);
939                 RETURN(rc);
940         }
941         rc = fsync_dev(filp->f_dentry->d_inode->i_rdev);
942         pop_ctxt(&saved);
943         if (rc)
944                 CERROR("error flushing MDS server data: rc = %d\n", rc);
945
946         return 0;
947 }
948
949 /* Do recovery actions for the MDS */
950 static int mds_recover(struct obd_device *obddev)
951 {
952         struct mds_obd *mds = &obddev->u.mds;
953         int rc;
954
955         /* This happens at the end when recovery is complete */
956         ++mds->mds_mount_count;
957         rc = mds_update_server_data(mds);
958
959         return rc;
960 }
961
962
963 /* mount the file system (secretly) */
964 static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
965 {
966         struct obd_ioctl_data* data = buf;
967         struct obd_export *export;
968         struct mds_obd *mds = &obddev->u.mds;
969         struct vfsmount *mnt;
970         int rc = 0;
971         ENTRY;
972
973         MOD_INC_USE_COUNT;
974 #ifdef CONFIG_DEV_RDONLY
975         dev_clear_rdonly(2);
976 #endif
977         if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
978                 GOTO(err_dec, rc = -EINVAL);
979
980         mds->mds_fstype = strdup(data->ioc_inlbuf2);
981
982         mnt = do_kern_mount(mds->mds_fstype, 0, data->ioc_inlbuf1, NULL);
983         if (IS_ERR(mnt)) {
984                 rc = PTR_ERR(mnt);
985                 CERROR("do_kern_mount failed: rc = %d\n", rc);
986                 GOTO(err_kfree, rc);
987         }
988
989         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
990         if (!mds->mds_sb)
991                 GOTO(err_put, rc = -ENODEV);
992
993         mds->mds_max_mdsize = sizeof(struct lov_stripe_md);
994         rc = mds_fs_setup(mds, mnt);
995         if (rc) {
996                 CERROR("MDS filesystem method init failed: rc = %d\n", rc);
997                 GOTO(err_put, rc);
998         }
999
1000         mds->mds_service = ptlrpc_init_svc(64 * 1024, MDS_REQUEST_PORTAL,
1001                                            MDC_REPLY_PORTAL, "self",mds_handle);
1002         if (!mds->mds_service) {
1003                 CERROR("failed to start service\n");
1004                 GOTO(err_fs, rc = -EINVAL);
1005         }
1006
1007         rc = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
1008         if (rc) {
1009                 CERROR("cannot start thread: rc = %d\n", rc);
1010                 GOTO(err_svc, rc);
1011         }
1012
1013         rc = -ENOENT;
1014         mds->mds_ldlm_conn = ptlrpc_uuid_to_connection("self");
1015         if (!mds->mds_ldlm_conn) {
1016                 mds_cleanup(obddev);
1017                 GOTO(err_thread, rc);
1018         }
1019
1020         obddev->obd_namespace =
1021                 ldlm_namespace_new("mds_server", LDLM_NAMESPACE_SERVER);
1022         if (obddev->obd_namespace == NULL) {
1023                 LBUG();
1024                 mds_cleanup(obddev);
1025                 GOTO(err_thread, rc);
1026         }
1027
1028         mds->mds_local_namespace =
1029                 ldlm_namespace_new("mds_client", LDLM_NAMESPACE_CLIENT);
1030         if (mds->mds_local_namespace == NULL) {
1031                 LBUG();
1032                 mds_cleanup(obddev);
1033                 GOTO(err_thread, rc);
1034         }
1035
1036         OBD_ALLOC(mds->mds_ldlm_client, sizeof(*mds->mds_ldlm_client));
1037         if (mds->mds_ldlm_client == NULL) {
1038                 LBUG();
1039                 mds_cleanup(obddev);
1040                 GOTO(err_thread, rc);
1041         }
1042         ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
1043                            mds->mds_ldlm_client);
1044         mds->mds_ldlm_client->cli_target_devno = obddev->obd_minor;
1045         mds->mds_ldlm_client->cli_name = "mds ldlm";
1046
1047         rc = mds_recover(obddev);
1048         if (rc)
1049                 GOTO(err_thread, rc);
1050
1051         rc = class_connect(&mds->mds_connh, obddev);
1052         if (rc)
1053                 GOTO(err_thread, rc);
1054         export = class_conn2export(&mds->mds_connh);
1055         if (!export)
1056                 LBUG();
1057         export->exp_connection = mds->mds_ldlm_conn;
1058
1059         RETURN(0);
1060
1061         
1062
1063 err_thread:
1064         ptlrpc_stop_all_threads(mds->mds_service);
1065 err_svc:
1066         ptlrpc_unregister_service(mds->mds_service);
1067 err_fs:
1068         mds_fs_cleanup(mds);
1069 err_put:
1070         unlock_kernel();
1071         mntput(mds->mds_vfsmnt);
1072         mds->mds_sb = 0;
1073         lock_kernel();
1074 err_kfree:
1075         kfree(mds->mds_fstype);
1076 err_dec:
1077         MOD_DEC_USE_COUNT;
1078         return rc;
1079 }
1080
1081 static int mds_cleanup(struct obd_device * obddev)
1082 {
1083         struct super_block *sb;
1084         struct mds_obd *mds = &obddev->u.mds;
1085
1086         ENTRY;
1087         class_disconnect(&mds->mds_connh);
1088
1089
1090         if ( !list_empty(&obddev->obd_exports) ) {
1091                 CERROR("still has exports!\n");
1092                 RETURN(-EBUSY);
1093         }
1094
1095         ptlrpc_stop_all_threads(mds->mds_service);
1096         ptlrpc_unregister_service(mds->mds_service);
1097
1098         sb = mds->mds_sb;
1099         if (!mds->mds_sb)
1100                 RETURN(0);
1101
1102         mds_update_server_data(mds);
1103
1104         if (mds->mds_rcvd_filp) {
1105                 int rc = filp_close(mds->mds_rcvd_filp, 0);
1106                 mds->mds_rcvd_filp = NULL;
1107
1108                 if (rc)
1109                         CERROR("last_rcvd file won't close, rc=%d\n", rc);
1110         }
1111
1112         unlock_kernel();
1113         mntput(mds->mds_vfsmnt);
1114         mds->mds_sb = 0;
1115         kfree(mds->mds_fstype);
1116
1117         ldlm_namespace_free(mds->mds_local_namespace);
1118         ldlm_namespace_free(obddev->obd_namespace);
1119
1120         if (mds->mds_ldlm_conn != NULL)
1121                 ptlrpc_put_connection(mds->mds_ldlm_conn);
1122
1123         OBD_FREE(mds->mds_ldlm_client, sizeof(*mds->mds_ldlm_client));
1124
1125         lock_kernel();
1126 #ifdef CONFIG_DEV_RDONLY
1127         dev_clear_rdonly(2);
1128 #endif
1129         mds_fs_cleanup(mds);
1130
1131         MOD_DEC_USE_COUNT;
1132         RETURN(0);
1133 }
1134
1135 extern int mds_iocontrol(long cmd, struct lustre_handle *conn, 
1136                           int len, void *karg, void *uarg);
1137
1138 /* use obd ops to offer management infrastructure */
1139 static struct obd_ops mds_obd_ops = {
1140         o_connect:     mds_connect,
1141         o_disconnect:  mds_disconnect,
1142         o_setup:       mds_setup,
1143         o_cleanup:     mds_cleanup,
1144         o_iocontrol:   mds_iocontrol
1145 };
1146
1147 static int __init mds_init(void)
1148 {
1149         inter_module_register("mds_reint", THIS_MODULE, &mds_reint);
1150         inter_module_register("mds_getattr_name", THIS_MODULE,
1151                               &mds_getattr_name);
1152         class_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
1153         return 0;
1154 }
1155
1156 static void __exit mds_exit(void)
1157 {
1158         inter_module_unregister("mds_reint");
1159         inter_module_unregister("mds_getattr_name");
1160         class_unregister_type(LUSTRE_MDS_NAME);
1161 }
1162
1163 MODULE_AUTHOR("Cluster File Systems <info@clusterfs.com>");
1164 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
1165 MODULE_LICENSE("GPL");
1166
1167 module_init(mds_init);
1168 module_exit(mds_exit);