Whamcloud - gitweb
4f5f6e3e4d556340af75e764bf57106d815232eb
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Server (mds) request handler
6  *
7  *  Copyright (c) 2001, 2002 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *
13  *   This file is part of Lustre, http://www.lustre.org.
14  *
15  *   Lustre is free software; you can redistribute it and/or
16  *   modify it under the terms of version 2 of the GNU General Public
17  *   License as published by the Free Software Foundation.
18  *
19  *   Lustre is distributed in the hope that it will be useful,
20  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
21  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   GNU General Public License for more details.
23  *
24  *   You should have received a copy of the GNU General Public License
25  *   along with Lustre; if not, write to the Free Software
26  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27  */
28
29 #define EXPORT_SYMTAB
30 #define DEBUG_SUBSYSTEM S_MDS
31
32 #include <linux/module.h>
33 #include <linux/lustre_mds.h>
34 #include <linux/lustre_dlm.h>
35 #include <linux/init.h>
36 #include <linux/obd_class.h>
37 #include <linux/random.h>
38 #include <linux/locks.h>
39 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
40 #include <linux/buffer_head.h>
41 #endif
42 #include <linux/obd_lov.h>
43 #include <linux/lprocfs_status.h>
44
/* Slab cache used in this file to free and validate struct mds_file_data
 * objects (see mds_close_mfd() and mds_handle2mfd()). */
static kmem_cache_t *mds_file_cache;

/* Helpers defined outside this file. */
extern int mds_get_lovtgts(struct mds_obd *obd, int tgt_count,
                           obd_uuid_t *uuidarray);
extern int mds_get_lovdesc(struct mds_obd  *obd, struct lov_desc *desc);
extern void mds_start_transno(struct mds_obd *mds);
extern int mds_finish_transno(struct mds_obd *mds, void *handle,
                              struct ptlrpc_request *req, int rc);
/* Forward declaration; the definition is not in this part of the file. */
static int mds_cleanup(struct obd_device * obddev);

/* lprocfs variable tables defined elsewhere. */
extern struct lprocfs_vars status_var_nm_1[];
extern struct lprocfs_vars status_class_var[];
57
58 inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req)
59 {
60         return &req->rq_export->exp_obd->u.mds;
61 }
62
63 static int mds_bulk_timeout(void *data)
64 {
65         struct ptlrpc_bulk_desc *desc = data;
66
67         ENTRY;
68         recovd_conn_fail(desc->bd_connection);
69         RETURN(1);
70 }
71
72 /* Assumes caller has already pushed into the kernel filesystem context */
73 static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
74                         __u64 offset)
75 {
76         int rc = 0;
77         struct mds_obd *mds = mds_req2mds(req);
78         struct ptlrpc_bulk_desc *desc;
79         struct ptlrpc_bulk_page *bulk;
80         struct l_wait_info lwi;
81         char *buf;
82         ENTRY;
83
84         desc = ptlrpc_prep_bulk(req->rq_connection);
85         if (desc == NULL)
86                 GOTO(out, rc = -ENOMEM);
87
88         bulk = ptlrpc_prep_bulk_page(desc);
89         if (bulk == NULL)
90                 GOTO(cleanup_bulk, rc = -ENOMEM);
91
92         OBD_ALLOC(buf, PAGE_SIZE);
93         if (buf == NULL)
94                 GOTO(cleanup_bulk, rc = -ENOMEM);
95
96         rc = mds_fs_readpage(mds, file, buf, PAGE_SIZE, (loff_t *)&offset);
97
98         if (rc != PAGE_SIZE)
99                 GOTO(cleanup_buf, rc = -EIO);
100
101         bulk->bp_xid = req->rq_xid;
102         bulk->bp_buf = buf;
103         bulk->bp_buflen = PAGE_SIZE;
104         desc->bd_portal = MDS_BULK_PORTAL;
105
106         rc = ptlrpc_send_bulk(desc);
107         if (rc)
108                 GOTO(cleanup_buf, rc);
109
110         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
111                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
112                        OBD_FAIL_MDS_SENDPAGE, rc);
113                 ptlrpc_abort_bulk(desc);
114                 GOTO(cleanup_buf, rc);
115         }
116
117         lwi = LWI_TIMEOUT(obd_timeout * HZ, mds_bulk_timeout, desc);
118         rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT,
119                           &lwi);
120         if (rc) {
121                 if (rc != -ETIMEDOUT)
122                         LBUG();
123                 GOTO(cleanup_buf, rc);
124         }
125
126         EXIT;
127  cleanup_buf:
128         OBD_FREE(buf, PAGE_SIZE);
129  cleanup_bulk:
130         ptlrpc_free_bulk(desc);
131  out:
132         return rc;
133 }
134
135 /*
136  * Look up a named entry in a directory, and get an LDLM lock on it.
137  * 'dir' is a inode for which an LDLM lock has already been taken.
138  *
139  * If we do not need an exclusive or write lock on this entry (e.g.
140  * a read lock for attribute lookup only) then we do not hold the
141  * directory on return.  It is up to the caller to know what type
142  * of lock it is getting, and clean up appropriately.
143  */
144 struct dentry *mds_name2locked_dentry(struct obd_device *obd,
145                                       struct dentry *dir, struct vfsmount **mnt,
146                                       char *name, int namelen, int lock_mode,
147                                       struct lustre_handle *lockh,
148                                       int dir_lock_mode)
149 {
150         struct dentry *dchild;
151         int flags = 0, rc;
152         __u64 res_id[3] = {0};
153         ENTRY;
154
155         down(&dir->d_inode->i_sem);
156         dchild = lookup_one_len(name, dir, namelen);
157         if (IS_ERR(dchild)) {
158                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
159                 up(&dir->d_inode->i_sem);
160                 LBUG();
161                 RETURN(dchild);
162         }
163         if (dir_lock_mode != LCK_EX && dir_lock_mode != LCK_PW) {
164                 up(&dir->d_inode->i_sem);
165                 ldlm_lock_decref(lockh, dir_lock_mode);
166         }
167
168         if (lock_mode == 0 || !dchild->d_inode)
169                 RETURN(dchild);
170
171         res_id[0] = dchild->d_inode->i_ino;
172         res_id[1] = dchild->d_inode->i_generation;
173         rc = ldlm_match_or_enqueue(NULL, NULL, obd->obd_namespace, NULL,
174                                    res_id, LDLM_PLAIN, NULL, 0, lock_mode,
175                                    &flags, ldlm_completion_ast,
176                                    mds_blocking_ast, NULL, 0, lockh);
177         if (rc != ELDLM_OK) {
178                 l_dput(dchild);
179                 up(&dir->d_inode->i_sem);
180                 RETURN(ERR_PTR(-ENOLCK)); /* XXX translate ldlm code */
181         }
182
183         RETURN(dchild);
184 }
185
186 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
187                                      struct vfsmount **mnt, int lock_mode,
188                                      struct lustre_handle *lockh)
189 {
190         struct mds_obd *mds = &obd->u.mds;
191         struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
192         int flags = 0, rc;
193         __u64 res_id[3] = {0};
194         ENTRY;
195
196         if (IS_ERR(de))
197                 RETURN(de);
198
199         res_id[0] = de->d_inode->i_ino;
200         res_id[1] = de->d_inode->i_generation;
201         rc = ldlm_match_or_enqueue(NULL, NULL, obd->obd_namespace, NULL,
202                                    res_id, LDLM_PLAIN, NULL, 0, lock_mode,
203                                    &flags, ldlm_completion_ast,
204                                    mds_blocking_ast, NULL, 0, lockh);
205         if (rc != ELDLM_OK) {
206                 l_dput(de);
207                 retval = ERR_PTR(-ENOLCK); /* XXX translate ldlm code */
208         }
209
210         RETURN(retval);
211 }
212
213 #ifndef DCACHE_DISCONNECTED
214 #define DCACHE_DISCONNECTED DCACHE_NFSD_DISCONNECTED
215 #endif
216
217 /* Look up an entry by inode number. */
218 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
219                               struct vfsmount **mnt)
220 {
221         /* stolen from NFS */
222         struct super_block *sb = mds->mds_sb;
223         unsigned long ino = fid->id;
224         __u32 generation = fid->generation;
225         struct inode *inode;
226         struct list_head *lp;
227         struct dentry *result;
228
229         if (ino == 0)
230                 RETURN(ERR_PTR(-ESTALE));
231
232         inode = iget(sb, ino);
233         if (inode == NULL)
234                 RETURN(ERR_PTR(-ENOMEM));
235
236         CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
237
238         if (is_bad_inode(inode) ||
239             (generation && inode->i_generation != generation)) {
240                 /* we didn't find the right inode.. */
241                 CERROR("bad inode %lu, link: %d ct: %d or version  %u/%u\n",
242                        inode->i_ino, inode->i_nlink,
243                        atomic_read(&inode->i_count), inode->i_generation,
244                        generation);
245                 iput(inode);
246                 RETURN(ERR_PTR(-ENOENT));
247         }
248
249         /* now to find a dentry. If possible, get a well-connected one */
250         if (mnt)
251                 *mnt = mds->mds_vfsmnt;
252         spin_lock(&dcache_lock);
253         list_for_each(lp, &inode->i_dentry) {
254                 result = list_entry(lp, struct dentry, d_alias);
255                 if (!(result->d_flags & DCACHE_DISCONNECTED)) {
256                         dget_locked(result);
257                         result->d_vfs_flags |= DCACHE_REFERENCED;
258                         spin_unlock(&dcache_lock);
259                         iput(inode);
260                         if (mnt)
261                                 mntget(*mnt);
262                         return result;
263                 }
264         }
265         spin_unlock(&dcache_lock);
266         result = d_alloc_root(inode);
267         if (result == NULL) {
268                 iput(inode);
269                 return ERR_PTR(-ENOMEM);
270         }
271         if (mnt)
272                 mntget(*mnt);
273         result->d_flags |= DCACHE_DISCONNECTED;
274         return result;
275 }
276
277 /* Establish a connection to the MDS.
278  *
279  * This will set up an export structure for the client to hold state data
280  * about that client, like open files, the last operation number it did
281  * on the server, etc.
282  */
283 static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
284                        obd_uuid_t cluuid, struct recovd_obd *recovd,
285                        ptlrpc_recovery_cb_t recover)
286 {
287         struct obd_export *exp;
288         struct mds_export_data *med;
289         struct mds_client_data *mcd;
290         struct list_head *p;
291         int rc;
292         ENTRY;
293
294         if (!conn || !obd || !cluuid)
295                 RETURN(-EINVAL);
296
297         MOD_INC_USE_COUNT;
298
299         spin_lock(&obd->obd_dev_lock);
300         list_for_each(p, &obd->obd_exports) {
301                 exp = list_entry(p, struct obd_export, exp_obd_chain);
302                 mcd = exp->exp_mds_data.med_mcd;
303                 if (!mcd) {
304                         CERROR("FYI: NULL mcd - simultaneous connects\n");
305                         continue;
306                 }
307                 if (!memcmp(cluuid, mcd->mcd_uuid, sizeof mcd->mcd_uuid)) {
308                         /* XXX make handle-found-export a subroutine */
309                         LASSERT(exp->exp_obd == obd);
310
311                         spin_unlock(&obd->obd_dev_lock);
312                         if (exp->exp_connection) {
313                                 struct lustre_handle *hdl;
314                                 hdl = &exp->exp_ldlm_data.led_import.imp_handle;
315                                 /* Might be a re-connect after a partition. */
316                                 if (!memcmp(conn, hdl, sizeof *conn)) {
317                                         CERROR("%s reconnecting\n", cluuid);
318                                         conn->addr = (__u64) (unsigned long)exp;
319                                         conn->cookie = exp->exp_cookie;
320                                         rc = EALREADY;
321                                 } else {
322                                         CERROR("%s reconnecting from %s, "
323                                                "handle mismatch (ours %Lx/%Lx, "
324                                                "theirs %Lx/%Lx)\n", cluuid,
325                                                exp->exp_connection->
326                                                c_remote_uuid, hdl->addr,
327                                                hdl->cookie, conn->addr,
328                                                conn->cookie);
329                                         /* XXX disconnect them here? */
330                                         memset(conn, 0, sizeof *conn);
331                                         rc = -EALREADY;
332                                 }
333                                 MOD_DEC_USE_COUNT;
334                                 RETURN(rc);
335                         }
336                         conn->addr = (__u64) (unsigned long)exp;
337                         conn->cookie = exp->exp_cookie;
338                         CDEBUG(D_INFO, "existing export for UUID '%s' at %p\n",
339                                cluuid, exp);
340                         CDEBUG(D_IOCTL,"connect: addr %Lx cookie %Lx\n",
341                                (long long)conn->addr, (long long)conn->cookie);
342                         RETURN(0);
343                 }
344         }
345         spin_unlock(&obd->obd_dev_lock);
346
347         if (obd->u.mds.mds_recoverable_clients != 0) {
348                 CERROR("denying connection for new client %s: in recovery\n",
349                        cluuid);
350                 MOD_DEC_USE_COUNT;
351                 RETURN(-EBUSY);
352         }
353
354         /* XXX There is a small race between checking the list and adding a
355          * new connection for the same UUID, but the real threat (list
356          * corruption when multiple different clients connect) is solved.
357          *
358          * There is a second race between adding the export to the list,
359          * and filling in the client data below.  Hence skipping the case
360          * of NULL mcd above.  We should already be controlling multiple
361          * connects at the client, and we can't hold the spinlock over
362          * memory allocations without risk of deadlocking.
363          */
364         rc = class_connect(conn, obd, cluuid);
365         if (rc)
366                 GOTO(out_dec, rc);
367         exp = class_conn2export(conn);
368         LASSERT(exp);
369         med = &exp->exp_mds_data;
370
371         OBD_ALLOC(mcd, sizeof(*mcd));
372         if (!mcd) {
373                 CERROR("mds: out of memory for client data\n");
374                 GOTO(out_export, rc = -ENOMEM);
375         }
376
377         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
378         med->med_mcd = mcd;
379
380         INIT_LIST_HEAD(&med->med_open_head);
381         spin_lock_init(&med->med_open_lock);
382
383         rc = mds_client_add(&obd->u.mds, med, -1);
384         if (rc)
385                 GOTO(out_mcd, rc);
386
387         RETURN(0);
388
389 out_mcd:
390         OBD_FREE(mcd, sizeof(*mcd));
391 out_export:
392         class_disconnect(conn);
393 out_dec:
394         MOD_DEC_USE_COUNT;
395
396         return rc;
397 }
398
399 /* Call with med->med_open_lock held, please. */
400 inline int mds_close_mfd(struct mds_file_data *mfd, struct mds_export_data *med)
401 {
402         struct file *file = mfd->mfd_file;
403         LASSERT(file->private_data == mfd);
404
405         list_del(&mfd->mfd_list);
406         mfd->mfd_servercookie = DEAD_HANDLE_MAGIC;
407         kmem_cache_free(mds_file_cache, mfd);
408
409         return filp_close(file, 0);
410 }
411
412 static int mds_disconnect(struct lustre_handle *conn)
413 {
414         struct obd_export *export = class_conn2export(conn);
415         struct list_head *tmp, *n;
416         struct mds_export_data *med = &export->exp_mds_data;
417         int rc;
418         ENTRY;
419
420         /*
421          * Close any open files.
422          */
423         spin_lock(&med->med_open_lock);
424         list_for_each_safe(tmp, n, &med->med_open_head) {
425                 struct mds_file_data *mfd =
426                         list_entry(tmp, struct mds_file_data, mfd_list);
427                 rc = mds_close_mfd(mfd, med);
428                 if (rc) {
429                         /* XXX better diagnostics, with file path and stuff */
430                         CDEBUG(D_INODE, "Error %d closing mfd %p\n", rc, mfd);
431                 }
432         }
433         spin_unlock(&med->med_open_lock);
434
435         ldlm_cancel_locks_for_export(export);
436         mds_client_free(export);
437
438         rc = class_disconnect(conn);
439         if (!rc)
440                 MOD_DEC_USE_COUNT;
441
442         RETURN(rc);
443 }
444
445 /*
446  * XXX This is NOT guaranteed to flush all transactions to disk (even though
447  *     it is equivalent to calling sync()) because it only _starts_ the flush
448  *     and does not wait for completion.  It's better than nothing though.
449  *     What we really want is a mild form of fsync_dev_lockfs(), but it is
450  *     non-standard, or enabling do_sync_supers in ext3, just for this call.
451  */
452 static void mds_fsync_super(struct super_block *sb)
453 {
454         lock_kernel();
455         lock_super(sb);
456         if (sb->s_dirt && sb->s_op && sb->s_op->write_super)
457                 sb->s_op->write_super(sb);
458         unlock_super(sb);
459         unlock_kernel();
460 }
461
462 static int mds_getstatus(struct ptlrpc_request *req)
463 {
464         struct mds_obd *mds = mds_req2mds(req);
465         struct mds_body *body;
466         int rc, size = sizeof(*body);
467         ENTRY;
468
469         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
470         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
471                 CERROR("mds: out of memory for message: size=%d\n", size);
472                 req->rq_status = -ENOMEM;
473                 RETURN(0);
474         }
475
476         /* Flush any outstanding transactions to disk so the client will
477          * get the latest last_committed value and can drop their local
478          * requests if they have any.  This would be fsync_super() if it
479          * was exported.
480          */
481         mds_fsync_super(mds->mds_sb);
482
483         body = lustre_msg_buf(req->rq_repmsg, 0);
484         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
485
486         /* the last_committed and last_xid fields are filled in for all
487          * replies already - no need to do so here also.
488          */
489         RETURN(0);
490 }
491
492 static int mds_getlovinfo(struct ptlrpc_request *req)
493 {
494         struct mds_obd *mds = mds_req2mds(req);
495         struct mds_status_req *streq;
496         struct lov_desc *desc;
497         int tgt_count;
498         int rc, size[2] = {sizeof(*desc)};
499         ENTRY;
500
501         streq = lustre_msg_buf(req->rq_reqmsg, 0);
502         streq->flags = NTOH__u32(streq->flags);
503         streq->repbuf = NTOH__u32(streq->repbuf);
504         size[1] = streq->repbuf;
505
506         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
507         if (rc) {
508                 CERROR("mds: out of memory for message: size=%d\n", size[1]);
509                 req->rq_status = -ENOMEM;
510                 RETURN(0);
511         }
512
513         desc = lustre_msg_buf(req->rq_repmsg, 0);
514         rc = mds_get_lovdesc(mds, desc);
515         if (rc) {
516                 CERROR("mds_get_lovdesc error %d", rc);
517                 req->rq_status = rc;
518                 RETURN(0);
519         }
520
521         tgt_count = le32_to_cpu(desc->ld_tgt_count);
522         if (tgt_count * sizeof(obd_uuid_t) > streq->repbuf) {
523                 CERROR("too many targets, enlarge client buffers\n");
524                 req->rq_status = -ENOSPC;
525                 RETURN(0);
526         }
527
528         /* XXX the MDS should not really know about this */
529         mds->mds_max_mdsize = lov_mds_md_size(tgt_count);
530         rc = mds_get_lovtgts(mds, tgt_count,
531                              lustre_msg_buf(req->rq_repmsg, 1));
532         if (rc) {
533                 CERROR("get_lovtgts error %d\n", rc);
534                 req->rq_status = rc;
535                 RETURN(0);
536         }
537         RETURN(0);
538 }
539
540 int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
541                      void *data, __u32 data_len, int flag)
542 {
543         int do_ast;
544         ENTRY;
545
546         if (flag == LDLM_CB_CANCELING) {
547                 /* Don't need to do anything here. */
548                 RETURN(0);
549         }
550
551         /* XXX layering violation!  -phil */
552         l_lock(&lock->l_resource->lr_namespace->ns_lock);
553         lock->l_flags |= LDLM_FL_CBPENDING;
554         do_ast = (!lock->l_readers && !lock->l_writers);
555         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
556
557         if (do_ast) {
558                 struct lustre_handle lockh;
559                 int rc;
560
561                 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
562                 ldlm_lock2handle(lock, &lockh);
563                 rc = ldlm_cli_cancel(&lockh);
564                 if (rc < 0)
565                         CERROR("ldlm_cli_cancel: %d\n", rc);
566         } else
567                 LDLM_DEBUG(lock, "Lock still has references, will be"
568                            "cancelled later");
569         RETURN(0);
570 }
571
572 int mds_pack_md(struct mds_obd *mds, struct ptlrpc_request *req,
573                 int offset, struct mds_body *body, struct inode *inode)
574 {
575         struct lov_mds_md *lmm;
576         int lmm_size = req->rq_repmsg->buflens[offset];
577         int rc;
578
579         if (lmm_size == 0) {
580                 CDEBUG(D_INFO, "no space reserved for inode %u MD\n", inode->i_ino);
581                 RETURN(0);
582         }
583
584         lmm = lustre_msg_buf(req->rq_repmsg, offset);
585
586         /* I don't really like this, but it is a sanity check on the client
587          * MD request.  However, if the client doesn't know how much space
588          * to reserve for the MD, this shouldn't be fatal either...
589          */
590         if (lmm_size > mds->mds_max_mdsize) {
591                 CERROR("Reading MD for inode %u of %d bytes > max %d\n",
592                        inode->i_ino, lmm_size, mds->mds_max_mdsize);
593                 // RETURN(-EINVAL);
594         }
595
596         /* We don't need to store the reply size, because this buffer is
597          * discarded right after unpacking, and the LOV can figure out the
598          * size itself from the ost count.
599          */
600         if ((rc = mds_fs_get_md(mds, inode, lmm, lmm_size)) < 0) {
601                 CDEBUG(D_INFO, "No md for ino %u: rc = %d\n", inode->i_ino, rc);
602         } else if (rc > 0) {
603                 body->valid |= OBD_MD_FLEASIZE;
604                 rc = 0;
605         }
606
607         return rc;
608 }
609
610 static int mds_getattr_internal(struct mds_obd *mds, struct dentry *dentry,
611                                 struct ptlrpc_request *req,
612                                 struct mds_body *reqbody, int reply_off)
613 {
614         struct mds_body *body;
615         struct inode *inode = dentry->d_inode;
616         int rc = 0;
617         ENTRY;
618
619         if (inode == NULL)
620                 RETURN(-ENOENT);
621
622         body = lustre_msg_buf(req->rq_repmsg, reply_off);
623
624         mds_pack_inode2fid(&body->fid1, inode);
625         mds_pack_inode2body(body, inode);
626
627         if (S_ISREG(inode->i_mode)) {
628                 rc = mds_pack_md(mds, req, reply_off + 1, body, inode);
629         } else if (S_ISLNK(inode->i_mode) && reqbody->valid & OBD_MD_LINKNAME) {
630                 char *symname = lustre_msg_buf(req->rq_repmsg, reply_off + 1);
631                 int len = req->rq_repmsg->buflens[reply_off + 1];
632
633                 rc = inode->i_op->readlink(dentry, symname, len);
634                 if (rc < 0) {
635                         CERROR("readlink failed: %d\n", rc);
636                 } else {
637                         CDEBUG(D_INODE, "read symlink dest %s\n", symname);
638                         body->valid |= OBD_MD_LINKNAME;
639                 }
640         }
641         RETURN(rc);
642 }
643
644 static int mds_getattr_name(int offset, struct ptlrpc_request *req)
645 {
646         struct mds_obd *mds = mds_req2mds(req);
647         struct obd_device *obd = req->rq_export->exp_obd;
648         struct obd_run_ctxt saved;
649         struct mds_body *body;
650         struct dentry *de = NULL, *dchild = NULL;
651         struct inode *dir;
652         struct lustre_handle lockh;
653         char *name;
654         int namelen, flags = 0, lock_mode, rc = 0;
655         struct obd_ucred uc;
656         __u64 res_id[3] = {0, 0, 0};
657         ENTRY;
658
659         LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds"));
660
661         if (req->rq_reqmsg->bufcount <= offset + 1) {
662                 LBUG();
663                 GOTO(out_pre_de, rc = -EINVAL);
664         }
665
666         body = lustre_msg_buf(req->rq_reqmsg, offset);
667         name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
668         namelen = req->rq_reqmsg->buflens[offset + 1];
669         /* requests were at offset 2, replies go back at 1 */
670         if (offset)
671                 offset = 1;
672
673         uc.ouc_fsuid = body->fsuid;
674         uc.ouc_fsgid = body->fsgid;
675         uc.ouc_cap = body->capability;
676         push_ctxt(&saved, &mds->mds_ctxt, &uc);
677         de = mds_fid2dentry(mds, &body->fid1, NULL);
678         if (IS_ERR(de)) {
679                 GOTO(out_pre_de, rc = -ENOENT);
680         }
681
682         dir = de->d_inode;
683         CDEBUG(D_INODE, "parent ino %ld, name %*s\n", dir->i_ino,namelen,name);
684
685         lock_mode = LCK_PR;
686         res_id[0] = dir->i_ino;
687         res_id[1] = dir->i_generation;
688
689         rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
690                              NULL, 0, lock_mode, &lockh);
691         if (rc == 0) {
692                 LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]);
693                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
694                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
695                                       &flags, ldlm_completion_ast,
696                                       mds_blocking_ast, NULL, 0, &lockh);
697                 if (rc != ELDLM_OK) {
698                         CERROR("lock enqueue: err: %d\n", rc);
699                         GOTO(out_create_de, rc = -EIO);
700                 }
701         }
702         ldlm_lock_dump((void *)(unsigned long)lockh.addr);
703
704         down(&dir->i_sem);
705         dchild = lookup_one_len(name, de, namelen - 1);
706         if (IS_ERR(dchild)) {
707                 CDEBUG(D_INODE, "child lookup error %ld\n", PTR_ERR(dchild));
708                 up(&dir->i_sem);
709                 GOTO(out_create_dchild, rc = PTR_ERR(dchild));
710         }
711
712         rc = mds_getattr_internal(mds, dchild, req, body, offset);
713
714         EXIT;
715 out_create_dchild:
716         l_dput(dchild);
717         up(&dir->i_sem);
718         ldlm_lock_decref(&lockh, lock_mode);
719 out_create_de:
720         l_dput(de);
721 out_pre_de:
722         req->rq_status = rc;
723         pop_ctxt(&saved);
724         return 0;
725 }
726
727 static int mds_getattr(int offset, struct ptlrpc_request *req)
728 {
729         struct mds_obd *mds = mds_req2mds(req);
730         struct obd_run_ctxt saved;
731         struct dentry *de;
732         struct inode *inode;
733         struct mds_body *body;
734         struct obd_ucred uc;
735         int rc = 0, size[2] = {sizeof(*body)}, bufcount = 1;
736         ENTRY;
737
738         body = lustre_msg_buf(req->rq_reqmsg, offset);
739         uc.ouc_fsuid = body->fsuid;
740         uc.ouc_fsgid = body->fsgid;
741         uc.ouc_cap = body->capability;
742         push_ctxt(&saved, &mds->mds_ctxt, &uc);
743         de = mds_fid2dentry(mds, &body->fid1, NULL);
744         if (IS_ERR(de)) {
745                 rc = req->rq_status = -ENOENT;
746                 GOTO(out_pop, PTR_ERR(de));
747         }
748
749         inode = de->d_inode;
750         if (S_ISREG(body->fid1.f_type)) {
751                 int rc = mds_fs_get_md(mds, inode, NULL, 0);
752                 CDEBUG(D_INODE, "got %d bytes MD data for inode %u\n",
753                        rc, inode->i_ino);
754                 if (rc < 0) {
755                         if (rc != -ENODATA)
756                                 CERROR("error getting inode %u MD: rc = %d\n",
757                                        inode->i_ino, rc);
758                         size[bufcount] = 0;
759                 } else if (rc > mds->mds_max_mdsize) {
760                         size[bufcount] = 0;
761                         CERROR("MD size %d larger than maximum possible %u\n",
762                                rc, mds->mds_max_mdsize);
763                 } else
764                         size[bufcount] = rc;
765                 bufcount++;
766         } else if (body->valid & OBD_MD_LINKNAME) {
767                 size[bufcount] = MIN(inode->i_size + 1, body->size);
768                 bufcount++;
769                 CDEBUG(D_INODE, "symlink size: %d, reply space: %d\n",
770                        inode->i_size + 1, body->size);
771         }
772
773         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
774                 CERROR("failed MDS_GETATTR_PACK test\n");
775                 req->rq_status = -ENOMEM;
776                 GOTO(out, rc = -ENOMEM);
777         }
778
779         rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
780                              &req->rq_repmsg);
781         if (rc) {
782                 CERROR("out of memoryK\n");
783                 req->rq_status = rc;
784                 GOTO(out, rc);
785         }
786
787         req->rq_status = mds_getattr_internal(mds, de, req, body, 0);
788
789 out:
790         l_dput(de);
791 out_pop:
792         pop_ctxt(&saved);
793         RETURN(rc);
794 }
795
796 static int mds_statfs(struct ptlrpc_request *req)
797 {
798         struct mds_obd *mds = mds_req2mds(req);
799         struct obd_statfs *osfs;
800         struct statfs sfs;
801         int rc, size = sizeof(*osfs);
802         ENTRY;
803
804         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
805         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
806                 CERROR("mds: statfs lustre_pack_msg failed: rc = %d\n", rc);
807                 GOTO(out, rc);
808         }
809
810         rc = mds_fs_statfs(mds, &sfs);
811         if (rc) {
812                 CERROR("mds: statfs failed: rc %d\n", rc);
813                 GOTO(out, rc);
814         }
815         osfs = lustre_msg_buf(req->rq_repmsg, 0);
816         memset(osfs, 0, size);
817         statfs_pack(osfs, &sfs);
818         obd_statfs_pack(osfs, osfs);
819
820 out:
821         req->rq_status = rc;
822         RETURN(0);
823 }
824
825 static struct mds_file_data *mds_handle2mfd(struct lustre_handle *handle)
826 {
827         struct mds_file_data *mfd = NULL;
828
829         if (!handle || !handle->addr)
830                 RETURN(NULL);
831
832         mfd = (struct mds_file_data *)(unsigned long)(handle->addr);
833         if (!kmem_cache_validate(mds_file_cache, mfd))
834                 RETURN(NULL);
835
836         if (mfd->mfd_servercookie != handle->cookie)
837                 RETURN(NULL);
838
839         return mfd;
840 }
841
842 static int mds_store_md(struct mds_obd *mds, struct ptlrpc_request *req,
843                         int offset, struct mds_body *body, struct inode *inode)
844 {
845         struct lov_mds_md *lmm = lustre_msg_buf(req->rq_reqmsg, offset);
846         int lmm_size = req->rq_reqmsg->buflens[offset];
847         struct obd_run_ctxt saved;
848         struct obd_ucred uc;
849         void *handle;
850         int rc, rc2;
851         ENTRY;
852
853         /* I don't really like this, but it is a sanity check on the client
854          * MD request.
855          */
856         if (lmm_size > mds->mds_max_mdsize) {
857                 CERROR("Saving MD for inode %u of %d bytes > max %d\n",
858                        inode->i_ino, lmm_size, mds->mds_max_mdsize);
859                 //RETURN(-EINVAL);
860         }
861
862         CDEBUG(D_INODE, "storing %d bytes MD for inode %u\n",
863                lmm_size, inode->i_ino);
864         uc.ouc_fsuid = body->fsuid;
865         uc.ouc_fsgid = body->fsgid;
866         uc.ouc_cap = body->capability;
867         push_ctxt(&saved, &mds->mds_ctxt, &uc);
868         mds_start_transno(mds);
869         handle = mds_fs_start(mds, inode, MDS_FSOP_SETATTR);
870         if (IS_ERR(handle)) {
871                 rc = PTR_ERR(handle);
872                 mds_finish_transno(mds, handle, req, rc);
873                 GOTO(out_ea, rc);
874         }
875
876         rc = mds_fs_set_md(mds, inode, handle, lmm, lmm_size);
877         rc = mds_finish_transno(mds, handle, req, rc);
878
879         rc2 = mds_fs_commit(mds, inode, handle);
880         if (rc2 && !rc)
881                 rc = rc2;
882 out_ea:
883         pop_ctxt(&saved);
884
885         RETURN(rc);
886 }
887
888 static int mds_open(struct ptlrpc_request *req)
889 {
890         struct mds_obd *mds = mds_req2mds(req);
891         struct mds_body *body;
892         struct mds_export_data *med;
893         struct mds_file_data *mfd;
894         struct dentry *de;
895         struct file *file;
896         struct vfsmount *mnt;
897         __u32 flags;
898         struct list_head *tmp;
899         int rc, size = sizeof(*body);
900         ENTRY;
901
902         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
903                 CERROR("test case OBD_FAIL_MDS_OPEN_PACK\n");
904                 req->rq_status = -ENOMEM;
905                 RETURN(-ENOMEM);
906         }
907
908         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
909         if (rc) {
910                 CERROR("mds: pack error: rc = %d\n", rc);
911                 req->rq_status = rc;
912                 RETURN(rc);
913         }
914
915         body = lustre_msg_buf(req->rq_reqmsg, 0);
916
917         /* was this animal open already and the client lost the reply? */
918         /* XXX need some way to detect a reopen, to avoid locked list walks */
919         med = &req->rq_export->exp_mds_data;
920         spin_lock(&med->med_open_lock);
921         list_for_each(tmp, &med->med_open_head) {
922                 mfd = list_entry(tmp, typeof(*mfd), mfd_list);
923                 if (!memcmp(&mfd->mfd_clienthandle, &body->handle,
924                             sizeof(mfd->mfd_clienthandle)) &&
925                     body->fid1.id == mfd->mfd_file->f_dentry->d_inode->i_ino) {
926                         de = mfd->mfd_file->f_dentry;
927                         spin_unlock(&med->med_open_lock);
928                         CERROR("Re opening "LPD64"\n", body->fid1.id);
929                         GOTO(out_pack, rc = 0);
930                 }
931         }
932         spin_unlock(&med->med_open_lock);
933
934         mfd = kmem_cache_alloc(mds_file_cache, GFP_KERNEL);
935         if (!mfd) {
936                 CERROR("mds: out of memory\n");
937                 req->rq_status = -ENOMEM;
938                 RETURN(0);
939         }
940
941         de = mds_fid2dentry(mds, &body->fid1, &mnt);
942         if (IS_ERR(de))
943                 GOTO(out_free, rc = PTR_ERR(de));
944
945         /* check if this inode has seen a delayed object creation */
946         if (lustre_msg_get_op_flags(req->rq_reqmsg) & MDS_OPEN_HAS_EA) {
947                 rc = mds_store_md(mds, req, 1, body, de->d_inode);
948                 if (rc) {
949                         l_dput(de);
950                         mntput(mnt);
951                         GOTO(out_free, rc);
952                 }
953         }
954
955         flags = body->flags;
956         /* dentry_open does a dput(de) and mntput(mnt) on error */
957         file = dentry_open(de, mnt, flags & ~O_DIRECT);
958         if (IS_ERR(file)) {
959                 rc = PTR_ERR(file);
960                 GOTO(out_free, 0);
961         }
962
963         file->private_data = mfd;
964         mfd->mfd_file = file;
965         memcpy(&mfd->mfd_clienthandle, &body->handle, sizeof(body->handle));
966         get_random_bytes(&mfd->mfd_servercookie, sizeof(mfd->mfd_servercookie));
967         spin_lock(&med->med_open_lock);
968         list_add(&mfd->mfd_list, &med->med_open_head);
969         spin_unlock(&med->med_open_lock);
970
971 out_pack:
972         body = lustre_msg_buf(req->rq_repmsg, 0);
973         mds_pack_inode2fid(&body->fid1, de->d_inode);
974         mds_pack_inode2body(body, de->d_inode);
975         body->handle.addr = (__u64)(unsigned long)mfd;
976         body->handle.cookie = mfd->mfd_servercookie;
977         CDEBUG(D_INODE, "llite file "LPX64": addr %p, cookie "LPX64"\n",
978                mfd->mfd_clienthandle.addr, mfd, mfd->mfd_servercookie);
979         RETURN(0);
980
981 out_free:
982         mfd->mfd_servercookie = DEAD_HANDLE_MAGIC;
983         kmem_cache_free(mds_file_cache, mfd);
984         req->rq_status = rc;
985         RETURN(0);
986 }
987
988 static int mds_close(struct ptlrpc_request *req)
989 {
990         struct mds_export_data *med = &req->rq_export->exp_mds_data;
991         struct mds_body *body;
992         struct mds_file_data *mfd;
993         int rc;
994         ENTRY;
995
996         body = lustre_msg_buf(req->rq_reqmsg, 0);
997
998         mfd = mds_handle2mfd(&body->handle);
999         if (!mfd) {
1000                 CERROR("no handle for file close "LPD64
1001                        ": addr "LPX64", cookie "LPX64"\n",
1002                        body->fid1.id, body->handle.addr, body->handle.cookie);
1003                 RETURN(-ESTALE);
1004         }
1005
1006         spin_lock(&med->med_open_lock);
1007         req->rq_status = mds_close_mfd(mfd, med);
1008         spin_unlock(&med->med_open_lock);
1009
1010         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
1011                 CERROR("test case OBD_FAIL_MDS_CLOSE_PACK\n");
1012                 req->rq_status = -ENOMEM;
1013                 RETURN(-ENOMEM);
1014         }
1015
1016         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
1017         if (rc) {
1018                 CERROR("mds: lustre_pack_msg: rc = %d\n", rc);
1019                 req->rq_status = rc;
1020         }
1021
1022         RETURN(0);
1023 }
1024
1025 static int mds_readpage(struct ptlrpc_request *req)
1026 {
1027         struct mds_obd *mds = mds_req2mds(req);
1028         struct vfsmount *mnt;
1029         struct dentry *de;
1030         struct file *file;
1031         struct mds_body *body, *repbody;
1032         struct obd_run_ctxt saved;
1033         int rc, size = sizeof(*body);
1034         struct obd_ucred uc;
1035         ENTRY;
1036
1037         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
1038         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
1039                 CERROR("mds: out of memory\n");
1040                 GOTO(out, rc = -ENOMEM);
1041         }
1042
1043         body = lustre_msg_buf(req->rq_reqmsg, 0);
1044         uc.ouc_fsuid = body->fsuid;
1045         uc.ouc_fsgid = body->fsgid;
1046         uc.ouc_cap = body->capability;
1047         push_ctxt(&saved, &mds->mds_ctxt, &uc);
1048         de = mds_fid2dentry(mds, &body->fid1, &mnt);
1049         if (IS_ERR(de))
1050                 GOTO(out_pop, rc = PTR_ERR(de));
1051
1052         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
1053
1054         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
1055         /* note: in case of an error, dentry_open puts dentry */
1056         if (IS_ERR(file))
1057                 GOTO(out_pop, rc = PTR_ERR(file));
1058
1059         repbody = lustre_msg_buf(req->rq_repmsg, 0);
1060         repbody->size = file->f_dentry->d_inode->i_size;
1061         repbody->valid = OBD_MD_FLSIZE;
1062
1063         /* to make this asynchronous make sure that the handling function
1064            doesn't send a reply when this function completes. Instead a
1065            callback function would send the reply */
1066         /* note: in case of an error, dentry_open puts dentry */
1067         rc = mds_sendpage(req, file, body->size);
1068
1069         filp_close(file, 0);
1070 out_pop:
1071         pop_ctxt(&saved);
1072 out:
1073         req->rq_status = rc;
1074         RETURN(0);
1075 }
1076
1077 int mds_reint(struct ptlrpc_request *req, int offset)
1078 {
1079         int rc;
1080         struct mds_update_record rec;
1081
1082         rc = mds_update_unpack(req, offset, &rec);
1083         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
1084                 CERROR("invalid record\n");
1085                 req->rq_status = -EINVAL;
1086                 RETURN(0);
1087         }
1088         /* rc will be used to interrupt a for loop over multiple records */
1089         rc = mds_reint_rec(&rec, offset, req);
1090         return rc;
1091 }
1092
1093 /* forward declaration */
1094 int mds_handle(struct ptlrpc_request *req);
1095
1096 static int check_for_next_transno(struct mds_obd *mds)
1097 {
1098         struct ptlrpc_request *req;
1099         req = list_entry(mds->mds_recovery_queue.next, 
1100                          struct ptlrpc_request, rq_list);
1101         return req->rq_reqmsg->transno == mds->mds_next_recovery_transno;
1102 }
1103
1104 static void process_recovery_queue(struct mds_obd *mds)
1105 {
1106         struct ptlrpc_request *req;
1107         
1108         for (;;) {
1109                 spin_lock(&mds->mds_processing_task_lock);
1110                 req = list_entry(mds->mds_recovery_queue.next, 
1111                                  struct ptlrpc_request, rq_list);
1112
1113                 if (req->rq_reqmsg->transno != mds->mds_next_recovery_transno) {
1114                         spin_unlock(&mds->mds_processing_task_lock);
1115                         wait_event(mds->mds_next_transno_waitq,
1116                                    check_for_next_transno(mds));
1117                         continue;
1118                 }
1119                 list_del(&req->rq_list);
1120                 spin_unlock(&mds->mds_processing_task_lock);
1121
1122                 DEBUG_REQ(D_HA, req, "");
1123                 mds_handle(req);
1124                 
1125                 if (list_empty(&mds->mds_recovery_queue))
1126                         break;
1127         }
1128 }
1129
1130 static int queue_recovery_request(struct ptlrpc_request *req,
1131                                   struct mds_obd *mds)
1132 {
1133         struct list_head *tmp;
1134         int inserted = 0, transno = req->rq_reqmsg->transno;
1135
1136         if (!transno) {
1137                 DEBUG_REQ(D_HA, req, "not queueing");
1138                 return 1;
1139         }
1140
1141         spin_lock(&mds->mds_processing_task_lock);
1142
1143         if (mds->mds_processing_task == current->pid) {
1144                 /* Processing the queue right now, don't re-add. */
1145                 spin_unlock(&mds->mds_processing_task_lock);
1146                 return 1;
1147         }
1148
1149         /* XXX O(n^2) */
1150         list_for_each(tmp, &mds->mds_recovery_queue) {
1151                 struct ptlrpc_request *reqiter = 
1152                         list_entry(tmp, struct ptlrpc_request, rq_list);
1153                 if (reqiter->rq_reqmsg->transno > transno) {
1154                         list_add_tail(&req->rq_list, &reqiter->rq_list);
1155                         inserted = 1;
1156                         break;
1157                 }
1158         }
1159
1160         if (!inserted)
1161                 list_add_tail(&req->rq_list, &mds->mds_recovery_queue);
1162
1163         if (mds->mds_processing_task != 0) {
1164                 /* Someone else is processing this queue, we'll leave it to
1165                  * them.
1166                  */
1167                 spin_unlock(&mds->mds_processing_task_lock);
1168                 if (transno == mds->mds_next_recovery_transno)
1169                         wake_up(&mds->mds_next_transno_waitq);
1170                 return 0;
1171         }
1172
1173         /* Nobody is processing, and we know there's (at least) one to process
1174          * now, so we'll do the honours.
1175          */
1176         mds->mds_processing_task = current->pid;
1177         spin_unlock(&mds->mds_processing_task_lock);
1178
1179         process_recovery_queue(mds);
1180         return 0;
1181 }
1182
1183 static int filter_recovery_request(struct ptlrpc_request *req, 
1184                                    struct mds_obd *mds, int *process)
1185 {
1186         switch (req->rq_reqmsg->opc) {
1187         case MDS_CONNECT:
1188         case MDS_DISCONNECT:
1189         case MDS_OPEN:
1190                *process = 1;
1191                RETURN(0);
1192             
1193         case MDS_GETSTATUS: /* used in unmounting */
1194         case MDS_REINT:
1195         case LDLM_ENQUEUE:
1196                 *process = queue_recovery_request(req, mds);
1197                 RETURN(0);
1198                 
1199         default:
1200                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1201                 *process = 0;
1202                 RETURN(ptlrpc_error(req->rq_svc, req));
1203         }
1204 }
1205
1206 static int mds_queue_final_reply(struct ptlrpc_request *req, int rc)
1207 {
1208         struct mds_obd *mds = mds_req2mds(req);
1209
1210         if (rc) {
1211                 /* Just like ptlrpc_error, but without the sending. */
1212                 lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
1213                                 &req->rq_repmsg);
1214                 req->rq_type = PTL_RPC_MSG_ERR;
1215         }
1216
1217         list_add(&req->rq_list, &mds->mds_delayed_reply_queue);
1218         if (--mds->mds_recoverable_clients == 0) {
1219                 struct list_head *tmp, *n;
1220
1221                 CDEBUG(D_HA,
1222                        "all clients recovered, sending delayed replies\n");
1223                 list_for_each_safe(tmp, n, &mds->mds_delayed_reply_queue) {
1224                         req = list_entry(tmp, struct ptlrpc_request, rq_list);
1225                         DEBUG_REQ(D_HA, req, "delayed:");
1226                         ptlrpc_reply(req->rq_svc, req);
1227                 }
1228         } else {
1229                 CDEBUG(D_HA, "%d recoverable clients remain\n",
1230                        mds->mds_recoverable_clients);
1231         }
1232
1233         return 1;
1234 }
1235
1236 static char *reint_names[] = {
1237         [REINT_SETATTR] "setattr",
1238         [REINT_CREATE]  "create",
1239         [REINT_LINK]    "link",
1240         [REINT_UNLINK]  "unlink",
1241         [REINT_RENAME]  "rename"
1242 };
1243
1244 int mds_handle(struct ptlrpc_request *req)
1245 {
1246         int rc;
1247         int should_process;
1248         struct mds_obd *mds = NULL; /* quell gcc overwarning */
1249         ENTRY;
1250
1251         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
1252         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
1253                 CERROR("lustre_mds: Invalid request\n");
1254                 GOTO(out, rc);
1255         }
1256
1257         LASSERT(!strcmp(req->rq_obd->obd_type->typ_name, LUSTRE_MDT_NAME));
1258
1259         if (req->rq_reqmsg->opc != MDS_CONNECT) {
1260                 if (req->rq_export == NULL)
1261                         GOTO(out, rc = -ENOTCONN);
1262
1263                 mds = mds_req2mds(req);
1264                 if (mds->mds_recoverable_clients != 0) {
1265                         rc = filter_recovery_request(req, mds, &should_process);
1266                         if (rc || !should_process)
1267                                 RETURN(rc);
1268                 }
1269         }
1270
1271         switch (req->rq_reqmsg->opc) {
1272         case MDS_CONNECT:
1273                 DEBUG_REQ(D_INODE, req, "connect");
1274                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
1275                 rc = target_handle_connect(req);
1276                 /* Make sure that last_rcvd is correct. */
1277                 if (!rc) {
1278                         /* Now that we have an export, set mds. */
1279                         mds = mds_req2mds(req);
1280                         mds_fsync_super(mds->mds_sb);
1281                 }
1282                 break;
1283
1284         case MDS_DISCONNECT:
1285                 DEBUG_REQ(D_INODE, req, "disconnect");
1286                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
1287                 rc = target_handle_disconnect(req);
1288                 /* Make sure that last_rcvd is correct. */
1289                 if (!rc)
1290                         mds_fsync_super(mds->mds_sb);
1291                 goto out;
1292
1293         case MDS_GETSTATUS:
1294                 DEBUG_REQ(D_INODE, req, "getstatus");
1295                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
1296                 rc = mds_getstatus(req);
1297                 break;
1298
1299         case MDS_GETLOVINFO:
1300                 DEBUG_REQ(D_INODE, req, "getlovinfo");
1301                 rc = mds_getlovinfo(req);
1302                 break;
1303
1304         case MDS_GETATTR:
1305                 DEBUG_REQ(D_INODE, req, "getattr");
1306                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
1307                 rc = mds_getattr(0, req);
1308                 break;
1309
1310         case MDS_STATFS:
1311                 DEBUG_REQ(D_INODE, req, "statfs");
1312                 OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0);
1313                 rc = mds_statfs(req);
1314                 break;
1315
1316         case MDS_READPAGE:
1317                 DEBUG_REQ(D_INODE, req, "readpage\n");
1318                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
1319                 rc = mds_readpage(req);
1320
1321                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
1322                         return 0;
1323                 break;
1324
1325         case MDS_REINT: {
1326                 int size = sizeof(struct mds_body);
1327                 int opc = *(u32 *)lustre_msg_buf(req->rq_reqmsg, 0), 
1328                         realopc = opc & REINT_OPCODE_MASK;
1329                         
1330                 DEBUG_REQ(D_INODE, req, "reint (%s%s)",
1331                           reint_names[realopc],
1332                           opc & REINT_REPLAYING ? "|REPLAYING" : "");
1333                           
1334                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
1335
1336                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
1337                                      &req->rq_repmsg);
1338                 if (rc) {
1339                         req->rq_status = rc;
1340                         break;
1341                 }
1342                 rc = mds_reint(req, 0);
1343                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET_REP, 0);
1344                 break;
1345                 }
1346
1347         case MDS_OPEN:
1348                 DEBUG_REQ(D_INODE, req, "open");
1349                 OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
1350                 rc = mds_open(req);
1351                 break;
1352
1353         case MDS_CLOSE:
1354                 DEBUG_REQ(D_INODE, req, "close");
1355                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
1356                 rc = mds_close(req);
1357                 break;
1358
1359         case LDLM_ENQUEUE:
1360                 DEBUG_REQ(D_INODE, req, "enqueue");
1361                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1362                 rc = ldlm_handle_enqueue(req);
1363                 break;
1364         case LDLM_CONVERT:
1365                 DEBUG_REQ(D_INODE, req, "convert");
1366                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1367                 rc = ldlm_handle_convert(req);
1368                 break;
1369         case LDLM_BL_CALLBACK:
1370         case LDLM_CP_CALLBACK:
1371                 DEBUG_REQ(D_INODE, req, "callback");
1372                 CERROR("callbacks should not happen on MDS\n");
1373                 LBUG();
1374                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
1375                 break;
1376         default:
1377                 rc = ptlrpc_error(req->rq_svc, req);
1378                 RETURN(rc);
1379         }
1380
1381         EXIT;
1382
1383         if (!rc) {
1384                 struct mds_export_data *med = &req->rq_export->exp_mds_data;
1385
1386                 req->rq_repmsg->last_xid =
1387                         HTON__u64(le64_to_cpu(med->med_mcd->mcd_last_xid));
1388                 req->rq_repmsg->last_committed =
1389                         HTON__u64(mds->mds_last_committed);
1390                 CDEBUG(D_INFO, "last_rcvd ~%Lu, last_committed %Lu, xid %d\n",
1391                        (unsigned long long)mds->mds_last_rcvd,
1392                        (unsigned long long)mds->mds_last_committed,
1393                        cpu_to_le32(req->rq_xid));
1394         }
1395  out:
1396
1397         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1398                 struct mds_obd *mds = mds_req2mds(req);
1399                 LASSERT(mds->mds_recoverable_clients);
1400                 DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1401                 return mds_queue_final_reply(req, rc);
1402         }
1403         
1404         /* MDS_CONNECT / EALREADY (note: not -EALREADY!) isn't an error */
1405         if (rc && (req->rq_reqmsg->opc != MDS_CONNECT ||
1406                    rc != EALREADY)) {
1407                 CERROR("mds: processing error (opcode %d): %d\n",
1408                        req->rq_reqmsg->opc, rc);
1409                 ptlrpc_error(req->rq_svc, req);
1410         } else {
1411                 CDEBUG(D_NET, "sending reply\n");
1412                 ptlrpc_reply(req->rq_svc, req);
1413         }
1414         return 0;
1415 }
1416
1417 /* Update the server data on disk.  This stores the new mount_count and
1418  * also the last_rcvd value to disk.  If we don't have a clean shutdown,
1419  * then the server last_rcvd value may be less than that of the clients.
1420  * This will alert us that we may need to do client recovery.
1421  *
1422  * Assumes we are already in the server filesystem context.
1423  *
1424  * Also assumes for mds_last_rcvd that we are not modifying it (no locking).
1425  */
1426 int mds_update_server_data(struct mds_obd *mds)
1427 {
1428         struct mds_server_data *msd = mds->mds_server_data;
1429         struct file *filp = mds->mds_rcvd_filp;
1430         loff_t off = 0;
1431         int rc;
1432
1433         msd->msd_last_rcvd = cpu_to_le64(mds->mds_last_rcvd);
1434         msd->msd_mount_count = cpu_to_le64(mds->mds_mount_count);
1435
1436         CDEBUG(D_SUPER, "MDS mount_count is %Lu, last_rcvd is %Lu\n",
1437                (unsigned long long)mds->mds_mount_count,
1438                (unsigned long long)mds->mds_last_rcvd);
1439         rc = lustre_fwrite(filp, (char *)msd, sizeof(*msd), &off);
1440         if (rc != sizeof(*msd)) {
1441                 CERROR("error writing MDS server data: rc = %d\n", rc);
1442                 if (rc > 0)
1443                         RETURN(-EIO);
1444                 RETURN(rc);
1445         }
1446 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1447         rc = fsync_dev(filp->f_dentry->d_inode->i_rdev);
1448 #else
1449         rc = file_fsync(filp,  filp->f_dentry, 1);
1450 #endif
1451         if (rc)
1452                 CERROR("error flushing MDS server data: rc = %d\n", rc);
1453
1454         return 0;
1455 }
1456
1457 /* Do recovery actions for the MDS */
1458 static int mds_recovery_complete(struct obd_device *obddev)
1459 {
1460         struct mds_obd *mds = &obddev->u.mds;
1461         struct obd_run_ctxt saved;
1462         int rc;
1463
1464         LASSERT(mds->mds_recoverable_clients == 0);
1465
1466         /* This happens at the end when recovery is complete */
1467         ++mds->mds_mount_count;
1468         push_ctxt(&saved, &mds->mds_ctxt, NULL);
1469         rc = mds_update_server_data(mds);
1470         pop_ctxt(&saved);
1471
1472         return rc;
1473 }
1474
1475 /* mount the file system (secretly) */
1476 static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
1477 {
1478         struct obd_ioctl_data* data = buf;
1479         struct mds_obd *mds = &obddev->u.mds;
1480         struct vfsmount *mnt;
1481         int rc = 0;
1482         ENTRY;
1483
1484         MOD_INC_USE_COUNT;
1485 #ifdef CONFIG_DEV_RDONLY
1486         dev_clear_rdonly(2);
1487 #endif
1488         if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
1489                 GOTO(err_dec, rc = -EINVAL);
1490
1491         mds->mds_fstype = strdup(data->ioc_inlbuf2);
1492
1493         mnt = do_kern_mount(mds->mds_fstype, 0, data->ioc_inlbuf1, NULL);
1494         if (IS_ERR(mnt)) {
1495                 rc = PTR_ERR(mnt);
1496                 CERROR("do_kern_mount failed: rc = %d\n", rc);
1497                 GOTO(err_kfree, rc);
1498         }
1499
1500         CERROR("%s: mnt is %p\n", data->ioc_inlbuf1, mnt);
1501         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
1502         if (!mds->mds_sb)
1503                 GOTO(err_put, rc = -ENODEV);
1504
1505         init_MUTEX(&mds->mds_transno_sem);
1506         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
1507         rc = mds_fs_setup(obddev, mnt);
1508         if (rc) {
1509                 CERROR("MDS filesystem method init failed: rc = %d\n", rc);
1510                 GOTO(err_put, rc);
1511         }
1512
1513         obddev->obd_namespace =
1514                 ldlm_namespace_new("mds_server", LDLM_NAMESPACE_SERVER);
1515         if (obddev->obd_namespace == NULL) {
1516                 mds_cleanup(obddev);
1517                 GOTO(err_fs, rc = -ENOMEM);
1518         }
1519
1520         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1521                            "mds_ldlm_client", &obddev->obd_ldlm_client);
1522
1523         spin_lock_init(&mds->mds_processing_task_lock);
1524         mds->mds_processing_task = 0;
1525         INIT_LIST_HEAD(&mds->mds_recovery_queue);
1526         INIT_LIST_HEAD(&mds->mds_delayed_reply_queue);
1527         
1528         RETURN(0);
1529
1530 err_fs:
1531         mds_fs_cleanup(obddev);
1532 err_put:
1533         unlock_kernel();
1534         mntput(mds->mds_vfsmnt);
1535         mds->mds_sb = 0;
1536         lock_kernel();
1537 err_kfree:
1538         kfree(mds->mds_fstype);
1539 err_dec:
1540         MOD_DEC_USE_COUNT;
1541         RETURN(rc);
1542 }
1543
1544 static int mds_cleanup(struct obd_device *obddev)
1545 {
1546         struct super_block *sb;
1547         struct mds_obd *mds = &obddev->u.mds;
1548         struct obd_run_ctxt saved;
1549         ENTRY;
1550
1551         sb = mds->mds_sb;
1552         if (!mds->mds_sb)
1553                 RETURN(0);
1554
1555         push_ctxt(&saved, &mds->mds_ctxt, NULL);
1556         mds_update_server_data(mds);
1557
1558         if (mds->mds_rcvd_filp) {
1559                 int rc = filp_close(mds->mds_rcvd_filp, 0);
1560                 mds->mds_rcvd_filp = NULL;
1561
1562                 if (rc)
1563                         CERROR("last_rcvd file won't close, rc=%d\n", rc);
1564         }
1565         pop_ctxt(&saved);
1566
1567         unlock_kernel();
1568         mntput(mds->mds_vfsmnt);
1569         mds->mds_sb = 0;
1570         kfree(mds->mds_fstype);
1571
1572         ldlm_namespace_free(obddev->obd_namespace);
1573
1574         lock_kernel();
1575 #ifdef CONFIG_DEV_RDONLY
1576         dev_clear_rdonly(2);
1577 #endif
1578         mds_fs_cleanup(obddev);
1579
1580         MOD_DEC_USE_COUNT;
1581         RETURN(0);
1582 }
1583
1584 static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
1585                               ldlm_mode_t mode, int flags, void *data)
1586 {
1587         struct ptlrpc_request *req = req_cookie;
1588         int rc = 0;
1589         ENTRY;
1590
1591         if (!req_cookie)
1592                 RETURN(0);
1593
1594         if (req->rq_reqmsg->bufcount > 1) {
1595                 /* an intent needs to be considered */
1596                 struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1);
1597                 struct mds_obd *mds= &req->rq_export->exp_obd->u.mds;
1598                 struct mds_body *mds_rep;
1599                 struct ldlm_reply *rep;
1600                 __u64 new_resid[3] = {0, 0, 0}, old_res;
1601                 int rc, size[3] = {sizeof(struct ldlm_reply),
1602                                                   sizeof(struct mds_body),
1603                                                   mds->mds_max_mdsize};
1604
1605                 it->opc = NTOH__u64(it->opc);
1606
1607                 LDLM_DEBUG(lock, "intent policy, opc: %s",
1608                            ldlm_it2str(it->opc));
1609
1610                 rc = lustre_pack_msg(3, size, NULL, &req->rq_replen,
1611                                      &req->rq_repmsg);
1612                 if (rc) {
1613                         rc = req->rq_status = -ENOMEM;
1614                         RETURN(rc);
1615                 }
1616
1617                 rep = lustre_msg_buf(req->rq_repmsg, 0);
1618                 rep->lock_policy_res1 = 1;
1619
1620                 /* execute policy */
1621                 switch ((long)it->opc) {
1622                 case IT_CREAT|IT_OPEN:
1623                         rc = mds_reint(req, 2);
1624                         if (rc || (req->rq_status != 0 &&
1625                                    req->rq_status != -EEXIST)) {
1626                                 rep->lock_policy_res2 = req->rq_status;
1627                                 RETURN(ELDLM_LOCK_ABORTED);
1628                         }
1629                         break;
1630                 case IT_CREAT:
1631                 case IT_MKDIR:
1632                 case IT_MKNOD:
1633                 case IT_RENAME2:
1634                 case IT_LINK2:
1635                 case IT_RMDIR:
1636                 case IT_SYMLINK:
1637                 case IT_UNLINK:
1638                         rc = mds_reint(req, 2);
1639                         if (rc || (req->rq_status != 0 &&
1640                                    req->rq_status != -EISDIR &&
1641                                    req->rq_status != -ENOTDIR)) {
1642                                 rep->lock_policy_res2 = req->rq_status;
1643                                 RETURN(ELDLM_LOCK_ABORTED);
1644                         }
1645                         break;
1646                 case IT_GETATTR:
1647                 case IT_LOOKUP:
1648                 case IT_OPEN:
1649                 case IT_READDIR:
1650                 case IT_READLINK:
1651                 case IT_RENAME:
1652                 case IT_LINK:
1653                 case IT_SETATTR:
1654                         rc = mds_getattr_name(2, req);
1655                         /* FIXME: we need to sit down and decide on who should
1656                          * set req->rq_status, who should return negative and
1657                          * positive return values, and what they all mean. */
1658                         if (rc || req->rq_status != 0) {
1659                                 rep->lock_policy_res2 = req->rq_status;
1660                                 RETURN(ELDLM_LOCK_ABORTED);
1661                         }
1662                         break;
1663                 case IT_READDIR|IT_OPEN:
1664                         LBUG();
1665                         break;
1666                 default:
1667                         CERROR("Unhandled intent "LPD64"\n", it->opc);
1668                         LBUG();
1669                 }
1670
1671                 /* We don't bother returning a lock to the client for a file
1672                  * or directory we are removing.
1673                  *
1674                  * As for link and rename, there is no reason for the client
1675                  * to get a lock on the target at this point.  If they are
1676                  * going to modify the file/directory later they will get a
1677                  * lock at that time.
1678                  */
1679                 if (it->opc & (IT_UNLINK | IT_RMDIR | IT_LINK | IT_LINK2 |
1680                                IT_RENAME | IT_RENAME2))
1681                         RETURN(ELDLM_LOCK_ABORTED);
1682
1683                 rep->lock_policy_res2 = req->rq_status;
1684                 mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
1685
1686                 /* If the client is about to open a file that doesn't have an MD
1687                  * stripe record, it's going to need a write lock. */
1688                 if (it->opc & IT_OPEN &&
1689                     !(lustre_msg_get_op_flags(req->rq_reqmsg)&MDS_OPEN_HAS_EA)){
1690                         LDLM_DEBUG(lock, "open with no EA; returning PW lock");
1691                         lock->l_req_mode = LCK_PW;
1692                 }
1693
1694                 if (flags & LDLM_FL_INTENT_ONLY) {
1695                         LDLM_DEBUG(lock, "INTENT_ONLY, aborting lock");
1696                         RETURN(ELDLM_LOCK_ABORTED);
1697                 }
1698                 /* Give the client a lock on the child object, instead of the
1699                  * parent that it requested. */
1700                 new_resid[0] = NTOH__u32(mds_rep->ino);
1701                 new_resid[1] = NTOH__u32(mds_rep->generation);
1702                 if (new_resid[0] == 0)
1703                         LBUG();
1704                 old_res = lock->l_resource->lr_name[0];
1705
1706                 ldlm_lock_change_resource(lock, new_resid);
1707                 if (lock->l_resource == NULL) {
1708                         LBUG();
1709                         RETURN(-ENOMEM);
1710                 }
1711                 LDLM_DEBUG(lock, "intent policy, old res %ld",
1712                            (long)old_res);
1713                 RETURN(ELDLM_LOCK_CHANGED);
1714         } else {
1715                 int size = sizeof(struct ldlm_reply);
1716                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
1717                                      &req->rq_repmsg);
1718                 if (rc) {
1719                         LBUG();
1720                         RETURN(-ENOMEM);
1721                 }
1722         }
1723         RETURN(rc);
1724 }
1725
1726 int mds_attach(struct obd_device *dev, obd_count len, void *data)
1727 {
1728         return lprocfs_reg_obd(dev, status_var_nm_1, dev);
1729 }
1730
/*
 * Detach hook for the MDS obd type (wired up as o_detach in mds_obd_ops).
 *
 * Undoes mds_attach() by deregistering @dev from lprocfs.
 *
 * Returns whatever lprocfs_dereg_obd() returns.
 */
int mds_detach(struct obd_device *dev)
{
        int rc;

        rc = lprocfs_dereg_obd(dev);

        return rc;
}
1735
1736 static int mdt_setup(struct obd_device *obddev, obd_count len, void *buf)
1737 {
1738         int i;
1739         //        struct obd_ioctl_data* data = buf;
1740         struct mds_obd *mds = &obddev->u.mds;
1741         int rc = 0;
1742         ENTRY;
1743
1744         MOD_INC_USE_COUNT;
1745
1746         mds->mds_service = ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
1747                                            MDS_BUFSIZE, MDS_MAXREQSIZE,
1748                                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
1749                                            "self", mds_handle, "mds");
1750         if (!mds->mds_service) {
1751                 CERROR("failed to start service\n");
1752                 GOTO(err_dec, rc = -EINVAL);
1753         }
1754
1755         for (i = 0; i < MDT_NUM_THREADS; i++) {
1756                 char name[32];
1757                 sprintf(name, "ll_mdt_%02d", i);
1758                 rc = ptlrpc_start_thread(obddev, mds->mds_service, name);
1759                 if (rc) {
1760                         CERROR("cannot start MDT thread #%d: rc %d\n", i, rc);
1761                         GOTO(err_thread, rc);
1762                 }
1763         }
1764
1765         RETURN(0);
1766
1767 err_thread:
1768         ptlrpc_stop_all_threads(mds->mds_service);
1769         ptlrpc_unregister_service(mds->mds_service);
1770 err_dec:
1771         MOD_DEC_USE_COUNT;
1772         RETURN(rc);
1773 }
1774
1775
1776 static int mdt_cleanup(struct obd_device *obddev)
1777 {
1778         struct mds_obd *mds = &obddev->u.mds;
1779         ENTRY;
1780
1781         ptlrpc_stop_all_threads(mds->mds_service);
1782         ptlrpc_unregister_service(mds->mds_service);
1783
1784         MOD_DEC_USE_COUNT;
1785         RETURN(0);
1786 }
1787
1788 extern int mds_iocontrol(long cmd, struct lustre_handle *conn,
1789                          int len, void *karg, void *uarg);
1790
1791 /* use obd ops to offer management infrastructure */
1792 static struct obd_ops mds_obd_ops = {
1793         o_attach:      mds_attach,
1794         o_detach:      mds_detach,
1795         o_connect:     mds_connect,
1796         o_disconnect:  mds_disconnect,
1797         o_setup:       mds_setup,
1798         o_cleanup:     mds_cleanup,
1799         o_iocontrol:   mds_iocontrol
1800 };
1801
1802 static struct obd_ops mdt_obd_ops = {
1803         o_setup:       mdt_setup,
1804         o_cleanup:     mdt_cleanup,
1805 };
1806
1807
1808 static int __init mds_init(void)
1809 {
1810
1811         mds_file_cache = kmem_cache_create("ll_mds_file_data",
1812                                            sizeof(struct mds_file_data),
1813                                            0, 0, NULL, NULL);
1814         if (mds_file_cache == NULL)
1815                 return -ENOMEM;
1816
1817         class_register_type(&mds_obd_ops, status_class_var, LUSTRE_MDS_NAME);
1818         class_register_type(&mdt_obd_ops, 0, LUSTRE_MDT_NAME);
1819         ldlm_register_intent(ldlm_intent_policy);
1820         return 0;
1821
1822 }
1823
1824 static void __exit mds_exit(void)
1825 {
1826
1827
1828         ldlm_unregister_intent();
1829         class_unregister_type(LUSTRE_MDS_NAME);
1830         class_unregister_type(LUSTRE_MDT_NAME);
1831         if (kmem_cache_destroy(mds_file_cache))
1832                 CERROR("couldn't free MDS file cache\n");
1833
1834 }
1835
1836 MODULE_AUTHOR("Cluster File Systems <info@clusterfs.com>");
1837 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
1838 MODULE_LICENSE("GPL");
1839
1840 module_init(mds_init);
1841 module_exit(mds_exit);