Whamcloud - gitweb
Merge of b_md to HEAD:
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Server (mds) request handler
6  *
7  *  Copyright (c) 2001, 2002 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *
13  *   This file is part of Lustre, http://www.lustre.org.
14  *
15  *   Lustre is free software; you can redistribute it and/or
16  *   modify it under the terms of version 2 of the GNU General Public
17  *   License as published by the Free Software Foundation.
18  *
19  *   Lustre is distributed in the hope that it will be useful,
20  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
21  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   GNU General Public License for more details.
23  *
24  *   You should have received a copy of the GNU General Public License
25  *   along with Lustre; if not, write to the Free Software
26  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27  */
28
29 #define EXPORT_SYMTAB
30 #define DEBUG_SUBSYSTEM S_MDS
31
32 #include <linux/module.h>
33 #include <linux/lustre_mds.h>
34 #include <linux/lustre_dlm.h>
35 #include <linux/init.h>
36 #include <linux/obd_class.h>
37 #include <linux/random.h>
38 #include <linux/locks.h>
39 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
40 #include <linux/buffer_head.h>
41 #endif
42 #include <linux/obd_lov.h>
43 #include <linux/lprocfs_status.h>
44
45 static kmem_cache_t *mds_file_cache;
46
47 extern int mds_get_lovtgts(struct mds_obd *obd, int tgt_count,
48                            obd_uuid_t *uuidarray);
49 extern int mds_get_lovdesc(struct mds_obd  *obd, struct lov_desc *desc);
50 extern void mds_start_transno(struct mds_obd *mds);
51 extern int mds_finish_transno(struct mds_obd *mds, void *handle,
52                               struct ptlrpc_request *req, int rc);
53 static int mds_cleanup(struct obd_device * obddev);
54
55 extern struct lprocfs_vars status_var_nm_1[];
56 extern struct lprocfs_vars status_class_var[];
57
58 inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req)
59 {
60         return &req->rq_export->exp_obd->u.mds;
61 }
62
63 static int mds_bulk_timeout(void *data)
64 {
65         struct ptlrpc_bulk_desc *desc = data;
66
67         ENTRY;
68         recovd_conn_fail(desc->bd_connection);
69         RETURN(1);
70 }
71
72 /* Assumes caller has already pushed into the kernel filesystem context */
73 static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
74                         __u64 offset)
75 {
76         int rc = 0;
77         struct mds_obd *mds = mds_req2mds(req);
78         struct ptlrpc_bulk_desc *desc;
79         struct ptlrpc_bulk_page *bulk;
80         struct l_wait_info lwi;
81         char *buf;
82         ENTRY;
83
84         desc = ptlrpc_prep_bulk(req->rq_connection);
85         if (desc == NULL)
86                 GOTO(out, rc = -ENOMEM);
87
88         bulk = ptlrpc_prep_bulk_page(desc);
89         if (bulk == NULL)
90                 GOTO(cleanup_bulk, rc = -ENOMEM);
91
92         OBD_ALLOC(buf, PAGE_SIZE);
93         if (buf == NULL)
94                 GOTO(cleanup_bulk, rc = -ENOMEM);
95
96         rc = mds_fs_readpage(mds, file, buf, PAGE_SIZE, (loff_t *)&offset);
97
98         if (rc != PAGE_SIZE)
99                 GOTO(cleanup_buf, rc = -EIO);
100
101         bulk->bp_xid = req->rq_xid;
102         bulk->bp_buf = buf;
103         bulk->bp_buflen = PAGE_SIZE;
104         desc->bd_portal = MDS_BULK_PORTAL;
105
106         rc = ptlrpc_send_bulk(desc);
107         if (rc)
108                 GOTO(cleanup_buf, rc);
109
110         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
111                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
112                        OBD_FAIL_MDS_SENDPAGE, rc);
113                 ptlrpc_abort_bulk(desc);
114                 GOTO(cleanup_buf, rc);
115         }
116
117         lwi = LWI_TIMEOUT(obd_timeout * HZ, mds_bulk_timeout, desc);
118         rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT,
119                           &lwi);
120         if (rc) {
121                 if (rc != -ETIMEDOUT)
122                         LBUG();
123                 GOTO(cleanup_buf, rc);
124         }
125
126         EXIT;
127  cleanup_buf:
128         OBD_FREE(buf, PAGE_SIZE);
129  cleanup_bulk:
130         ptlrpc_free_bulk(desc);
131  out:
132         return rc;
133 }
134
135 /*
136  * Look up a named entry in a directory, and get an LDLM lock on it.
137  * 'dir' is a inode for which an LDLM lock has already been taken.
138  *
139  * If we do not need an exclusive or write lock on this entry (e.g.
140  * a read lock for attribute lookup only) then we do not hold the
141  * directory semaphore on return.  It is up to the caller to know what
142  * type of lock it is getting, and clean up appropriately.
143  */
144 struct dentry *mds_name2locked_dentry(struct obd_device *obd,
145                                       struct dentry *dir, struct vfsmount **mnt,
146                                       char *name, int namelen, int lock_mode,
147                                       struct lustre_handle *lockh,
148                                       int dir_lock_mode)
149 {
150         struct dentry *dchild;
151         int flags = 0, rc;
152         __u64 res_id[3] = {0};
153         ENTRY;
154
155         down(&dir->d_inode->i_sem);
156         dchild = lookup_one_len(name, dir, namelen);
157         if (IS_ERR(dchild)) {
158                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
159                 up(&dir->d_inode->i_sem);
160                 LBUG();
161                 RETURN(dchild);
162         }
163         if (dir_lock_mode != LCK_EX && dir_lock_mode != LCK_PW) {
164                 up(&dir->d_inode->i_sem);
165                 ldlm_lock_decref(lockh, dir_lock_mode);
166         }
167
168         if (lock_mode == 0 || !dchild->d_inode)
169                 RETURN(dchild);
170
171         res_id[0] = dchild->d_inode->i_ino;
172         res_id[1] = dchild->d_inode->i_generation;
173         rc = ldlm_match_or_enqueue(NULL, NULL, obd->obd_namespace, NULL,
174                                    res_id, LDLM_PLAIN, NULL, 0, lock_mode,
175                                    &flags, ldlm_completion_ast,
176                                    mds_blocking_ast, NULL, 0, lockh);
177         if (rc != ELDLM_OK) {
178                 l_dput(dchild);
179                 up(&dir->d_inode->i_sem);
180                 RETURN(ERR_PTR(-ENOLCK)); /* XXX translate ldlm code */
181         }
182
183         RETURN(dchild);
184 }
185
186 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
187                                      struct vfsmount **mnt, int lock_mode,
188                                      struct lustre_handle *lockh)
189 {
190         struct mds_obd *mds = &obd->u.mds;
191         struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
192         int flags = 0, rc;
193         __u64 res_id[3] = {0};
194         ENTRY;
195
196         if (IS_ERR(de))
197                 RETURN(de);
198
199         res_id[0] = de->d_inode->i_ino;
200         res_id[1] = de->d_inode->i_generation;
201         rc = ldlm_match_or_enqueue(NULL, NULL, obd->obd_namespace, NULL,
202                                    res_id, LDLM_PLAIN, NULL, 0, lock_mode,
203                                    &flags, ldlm_completion_ast,
204                                    mds_blocking_ast, NULL, 0, lockh);
205         if (rc != ELDLM_OK) {
206                 l_dput(de);
207                 retval = ERR_PTR(-ENOLCK); /* XXX translate ldlm code */
208         }
209
210         RETURN(retval);
211 }
212
213 #ifndef DCACHE_DISCONNECTED
214 #define DCACHE_DISCONNECTED DCACHE_NFSD_DISCONNECTED
215 #endif
216
217 /* Look up an entry by inode number. */
218 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
219                               struct vfsmount **mnt)
220 {
221         /* stolen from NFS */
222         struct super_block *sb = mds->mds_sb;
223         unsigned long ino = fid->id;
224         __u32 generation = fid->generation;
225         struct inode *inode;
226         struct list_head *lp;
227         struct dentry *result;
228
229         if (ino == 0)
230                 RETURN(ERR_PTR(-ESTALE));
231
232         inode = iget(sb, ino);
233         if (inode == NULL)
234                 RETURN(ERR_PTR(-ENOMEM));
235
236         CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
237
238         if (is_bad_inode(inode) ||
239             (generation && inode->i_generation != generation)) {
240                 /* we didn't find the right inode.. */
241                 CERROR("bad inode %lu, link: %d ct: %d or version  %u/%u\n",
242                        inode->i_ino, inode->i_nlink,
243                        atomic_read(&inode->i_count), inode->i_generation,
244                        generation);
245                 iput(inode);
246                 RETURN(ERR_PTR(-ENOENT));
247         }
248
249         /* now to find a dentry. If possible, get a well-connected one */
250         if (mnt)
251                 *mnt = mds->mds_vfsmnt;
252         spin_lock(&dcache_lock);
253         list_for_each(lp, &inode->i_dentry) {
254                 result = list_entry(lp, struct dentry, d_alias);
255                 if (!(result->d_flags & DCACHE_DISCONNECTED)) {
256                         dget_locked(result);
257                         result->d_vfs_flags |= DCACHE_REFERENCED;
258                         spin_unlock(&dcache_lock);
259                         iput(inode);
260                         if (mnt)
261                                 mntget(*mnt);
262                         return result;
263                 }
264         }
265         spin_unlock(&dcache_lock);
266         result = d_alloc_root(inode);
267         if (result == NULL) {
268                 iput(inode);
269                 return ERR_PTR(-ENOMEM);
270         }
271         if (mnt)
272                 mntget(*mnt);
273         result->d_flags |= DCACHE_DISCONNECTED;
274         return result;
275 }
276
277 /* Establish a connection to the MDS.
278  *
279  * This will set up an export structure for the client to hold state data
280  * about that client, like open files, the last operation number it did
281  * on the server, etc.
282  */
283 static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
284                        obd_uuid_t cluuid, struct recovd_obd *recovd,
285                        ptlrpc_recovery_cb_t recover)
286 {
287         struct obd_export *exp;
288         struct mds_export_data *med;
289         struct mds_client_data *mcd;
290         struct list_head *p;
291         int rc;
292         ENTRY;
293
294         if (!conn || !obd || !cluuid)
295                 RETURN(-EINVAL);
296
297         MOD_INC_USE_COUNT;
298
299         spin_lock(&obd->obd_dev_lock);
300         list_for_each(p, &obd->obd_exports) {
301                 exp = list_entry(p, struct obd_export, exp_obd_chain);
302                 mcd = exp->exp_mds_data.med_mcd;
303                 if (!mcd) {
304                         CERROR("FYI: NULL mcd - simultaneous connects\n");
305                         continue;
306                 }
307                 if (!memcmp(cluuid, mcd->mcd_uuid, sizeof mcd->mcd_uuid)) {
308                         /* XXX make handle-found-export a subroutine */
309                         LASSERT(exp->exp_obd == obd);
310
311                         spin_unlock(&obd->obd_dev_lock);
312                         if (exp->exp_connection) {
313                                 struct lustre_handle *hdl;
314                                 hdl = &exp->exp_ldlm_data.led_import.imp_handle;
315                                 /* Might be a re-connect after a partition. */
316                                 if (!memcmp(conn, hdl, sizeof *conn)) {
317                                         CERROR("%s reconnecting\n", cluuid);
318                                         conn->addr = (__u64) (unsigned long)exp;
319                                         conn->cookie = exp->exp_cookie;
320                                         rc = EALREADY;
321                                 } else {
322                                         CERROR("%s reconnecting from %s, "
323                                                "handle mismatch (ours %Lx/%Lx, "
324                                                "theirs %Lx/%Lx)\n", cluuid,
325                                                exp->exp_connection->
326                                                c_remote_uuid, hdl->addr,
327                                                hdl->cookie, conn->addr,
328                                                conn->cookie);
329                                         /* XXX disconnect them here? */
330                                         memset(conn, 0, sizeof *conn);
331                                         rc = -EALREADY;
332                                 }
333                                 MOD_DEC_USE_COUNT;
334                                 RETURN(rc);
335                         }
336                         conn->addr = (__u64) (unsigned long)exp;
337                         conn->cookie = exp->exp_cookie;
338                         CDEBUG(D_INFO, "existing export for UUID '%s' at %p\n",
339                                cluuid, exp);
340                         CDEBUG(D_IOCTL,"connect: addr %Lx cookie %Lx\n",
341                                (long long)conn->addr, (long long)conn->cookie);
342                         RETURN(0);
343                 }
344         }
345         spin_unlock(&obd->obd_dev_lock);
346
347         if (obd->u.mds.mds_recoverable_clients != 0) {
348                 CERROR("denying connection for new client %s: in recovery\n",
349                        cluuid);
350                 MOD_DEC_USE_COUNT;
351                 RETURN(-EBUSY);
352         }
353
354         /* XXX There is a small race between checking the list and adding a
355          * new connection for the same UUID, but the real threat (list
356          * corruption when multiple different clients connect) is solved.
357          *
358          * There is a second race between adding the export to the list,
359          * and filling in the client data below.  Hence skipping the case
360          * of NULL mcd above.  We should already be controlling multiple
361          * connects at the client, and we can't hold the spinlock over
362          * memory allocations without risk of deadlocking.
363          */
364         rc = class_connect(conn, obd, cluuid);
365         if (rc)
366                 GOTO(out_dec, rc);
367         exp = class_conn2export(conn);
368         LASSERT(exp);
369         med = &exp->exp_mds_data;
370
371         OBD_ALLOC(mcd, sizeof(*mcd));
372         if (!mcd) {
373                 CERROR("mds: out of memory for client data\n");
374                 GOTO(out_export, rc = -ENOMEM);
375         }
376
377         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
378         med->med_mcd = mcd;
379
380         INIT_LIST_HEAD(&med->med_open_head);
381         spin_lock_init(&med->med_open_lock);
382
383         rc = mds_client_add(&obd->u.mds, med, -1);
384         if (rc)
385                 GOTO(out_mcd, rc);
386
387         RETURN(0);
388
389 out_mcd:
390         OBD_FREE(mcd, sizeof(*mcd));
391 out_export:
392         class_disconnect(conn);
393 out_dec:
394         MOD_DEC_USE_COUNT;
395
396         return rc;
397 }
398
399 /* Call with med->med_open_lock held, please. */
400 inline int mds_close_mfd(struct mds_file_data *mfd, struct mds_export_data *med)
401 {
402         struct file *file = mfd->mfd_file;
403         LASSERT(file->private_data == mfd);
404
405         list_del(&mfd->mfd_list);
406         mfd->mfd_servercookie = DEAD_HANDLE_MAGIC;
407         kmem_cache_free(mds_file_cache, mfd);
408
409         return filp_close(file, 0);
410 }
411
412 static int mds_disconnect(struct lustre_handle *conn)
413 {
414         struct obd_export *export = class_conn2export(conn);
415         struct list_head *tmp, *n;
416         struct mds_export_data *med = &export->exp_mds_data;
417         int rc;
418         ENTRY;
419
420         /*
421          * Close any open files.
422          */
423         spin_lock(&med->med_open_lock);
424         list_for_each_safe(tmp, n, &med->med_open_head) {
425                 struct mds_file_data *mfd =
426                         list_entry(tmp, struct mds_file_data, mfd_list);
427                 rc = mds_close_mfd(mfd, med);
428                 if (rc) {
429                         /* XXX better diagnostics, with file path and stuff */
430                         CDEBUG(D_INODE, "Error %d closing mfd %p\n", rc, mfd);
431                 }
432         }
433         spin_unlock(&med->med_open_lock);
434
435         ldlm_cancel_locks_for_export(export);
436         mds_client_free(export);
437
438         rc = class_disconnect(conn);
439         if (!rc)
440                 MOD_DEC_USE_COUNT;
441
442         RETURN(rc);
443 }
444
445 /*
446  * XXX This is NOT guaranteed to flush all transactions to disk (even though
447  *     it is equivalent to calling sync()) because it only _starts_ the flush
448  *     and does not wait for completion.  It's better than nothing though.
449  *     What we really want is a mild form of fsync_dev_lockfs(), but it is
450  *     non-standard, or enabling do_sync_supers in ext3, just for this call.
451  */
452 static void mds_fsync_super(struct super_block *sb)
453 {
454         lock_kernel();
455         lock_super(sb);
456         if (sb->s_dirt && sb->s_op && sb->s_op->write_super)
457                 sb->s_op->write_super(sb);
458         unlock_super(sb);
459         unlock_kernel();
460 }
461
462 static int mds_getstatus(struct ptlrpc_request *req)
463 {
464         struct mds_obd *mds = mds_req2mds(req);
465         struct mds_body *body;
466         int rc, size = sizeof(*body);
467         ENTRY;
468
469         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
470         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
471                 CERROR("mds: out of memory for message: size=%d\n", size);
472                 req->rq_status = -ENOMEM;
473                 RETURN(0);
474         }
475
476         /* Flush any outstanding transactions to disk so the client will
477          * get the latest last_committed value and can drop their local
478          * requests if they have any.  This would be fsync_super() if it
479          * was exported.
480          */
481         mds_fsync_super(mds->mds_sb);
482
483         body = lustre_msg_buf(req->rq_repmsg, 0);
484         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
485
486         /* the last_committed and last_xid fields are filled in for all
487          * replies already - no need to do so here also.
488          */
489         RETURN(0);
490 }
491
492 static int mds_getlovinfo(struct ptlrpc_request *req)
493 {
494         struct mds_obd *mds = mds_req2mds(req);
495         struct mds_status_req *streq;
496         struct lov_desc *desc;
497         int tgt_count;
498         int rc, size[2] = {sizeof(*desc)};
499         ENTRY;
500
501         streq = lustre_msg_buf(req->rq_reqmsg, 0);
502         streq->flags = NTOH__u32(streq->flags);
503         streq->repbuf = NTOH__u32(streq->repbuf);
504         size[1] = streq->repbuf;
505
506         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
507         if (rc) {
508                 CERROR("mds: out of memory for message: size=%d\n", size[1]);
509                 req->rq_status = -ENOMEM;
510                 RETURN(0);
511         }
512
513         desc = lustre_msg_buf(req->rq_repmsg, 0);
514         rc = mds_get_lovdesc(mds, desc);
515         if (rc) {
516                 req->rq_status = rc;
517                 RETURN(0);
518         }
519
520         tgt_count = le32_to_cpu(desc->ld_tgt_count);
521         if (tgt_count * sizeof(obd_uuid_t) > streq->repbuf) {
522                 CERROR("too many targets, enlarge client buffers\n");
523                 req->rq_status = -ENOSPC;
524                 RETURN(0);
525         }
526
527         /* XXX the MDS should not really know about this */
528         mds->mds_max_mdsize = lov_mds_md_size(tgt_count);
529         rc = mds_get_lovtgts(mds, tgt_count,
530                              lustre_msg_buf(req->rq_repmsg, 1));
531         if (rc) {
532                 CERROR("get_lovtgts error %d\n", rc);
533                 req->rq_status = rc;
534                 RETURN(0);
535         }
536         RETURN(0);
537 }
538
539 int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
540                      void *data, __u32 data_len, int flag)
541 {
542         int do_ast;
543         ENTRY;
544
545         if (flag == LDLM_CB_CANCELING) {
546                 /* Don't need to do anything here. */
547                 RETURN(0);
548         }
549
550         /* XXX layering violation!  -phil */
551         l_lock(&lock->l_resource->lr_namespace->ns_lock);
552         lock->l_flags |= LDLM_FL_CBPENDING;
553         do_ast = (!lock->l_readers && !lock->l_writers);
554         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
555
556         if (do_ast) {
557                 struct lustre_handle lockh;
558                 int rc;
559
560                 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
561                 ldlm_lock2handle(lock, &lockh);
562                 rc = ldlm_cli_cancel(&lockh);
563                 if (rc < 0)
564                         CERROR("ldlm_cli_cancel: %d\n", rc);
565         } else
566                 LDLM_DEBUG(lock, "Lock still has references, will be"
567                            "cancelled later");
568         RETURN(0);
569 }
570
571 int mds_pack_md(struct mds_obd *mds, struct ptlrpc_request *req,
572                 int offset, struct mds_body *body, struct inode *inode)
573 {
574         struct lov_mds_md *lmm;
575         int lmm_size = req->rq_repmsg->buflens[offset];
576         int rc;
577
578         if (lmm_size == 0) {
579                 CDEBUG(D_INFO, "no space reserved for inode %u MD\n", inode->i_ino);
580                 RETURN(0);
581         }
582
583         lmm = lustre_msg_buf(req->rq_repmsg, offset);
584
585         /* I don't really like this, but it is a sanity check on the client
586          * MD request.  However, if the client doesn't know how much space
587          * to reserve for the MD, this shouldn't be fatal either...
588          */
589         if (lmm_size > mds->mds_max_mdsize) {
590                 CERROR("Reading MD for inode %u of %d bytes > max %d\n",
591                        inode->i_ino, lmm_size, mds->mds_max_mdsize);
592                 // RETURN(-EINVAL);
593         }
594
595         /* We don't need to store the reply size, because this buffer is
596          * discarded right after unpacking, and the LOV can figure out the
597          * size itself from the ost count.
598          */
599         if ((rc = mds_fs_get_md(mds, inode, lmm, lmm_size)) < 0) {
600                 CDEBUG(D_INFO, "No md for ino %u: rc = %d\n", inode->i_ino, rc);
601         } else if (rc > 0) {
602                 body->valid |= OBD_MD_FLEASIZE;
603                 rc = 0;
604         }
605
606         return rc;
607 }
608
609 static int mds_getattr_internal(struct mds_obd *mds, struct dentry *dentry,
610                                 struct ptlrpc_request *req,
611                                 struct mds_body *reqbody, int reply_off)
612 {
613         struct mds_body *body;
614         struct inode *inode = dentry->d_inode;
615         int rc = 0;
616         ENTRY;
617
618         if (inode == NULL)
619                 RETURN(-ENOENT);
620
621         body = lustre_msg_buf(req->rq_repmsg, reply_off);
622
623         mds_pack_inode2fid(&body->fid1, inode);
624         mds_pack_inode2body(body, inode);
625
626         if (S_ISREG(inode->i_mode)) {
627                 rc = mds_pack_md(mds, req, reply_off + 1, body, inode);
628         } else if (S_ISLNK(inode->i_mode) && reqbody->valid & OBD_MD_LINKNAME) {
629                 char *symname = lustre_msg_buf(req->rq_repmsg, reply_off + 1);
630                 int len = req->rq_repmsg->buflens[reply_off + 1];
631
632                 rc = inode->i_op->readlink(dentry, symname, len);
633                 if (rc < 0) {
634                         CERROR("readlink failed: %d\n", rc);
635                 } else {
636                         CDEBUG(D_INODE, "read symlink dest %s\n", symname);
637                         body->valid |= OBD_MD_LINKNAME;
638                 }
639         }
640         RETURN(rc);
641 }
642
643 static int mds_getattr_name(int offset, struct ptlrpc_request *req)
644 {
645         struct mds_obd *mds = mds_req2mds(req);
646         struct obd_device *obd = req->rq_export->exp_obd;
647         struct obd_run_ctxt saved;
648         struct mds_body *body;
649         struct dentry *de = NULL, *dchild = NULL;
650         struct inode *dir;
651         struct lustre_handle lockh;
652         char *name;
653         int namelen, flags = 0, lock_mode, rc = 0;
654         struct obd_ucred uc;
655         __u64 res_id[3] = {0, 0, 0};
656         ENTRY;
657
658         LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds"));
659
660         if (req->rq_reqmsg->bufcount <= offset + 1) {
661                 LBUG();
662                 GOTO(out_pre_de, rc = -EINVAL);
663         }
664
665         body = lustre_msg_buf(req->rq_reqmsg, offset);
666         name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
667         namelen = req->rq_reqmsg->buflens[offset + 1];
668         /* requests were at offset 2, replies go back at 1 */
669         if (offset)
670                 offset = 1;
671
672         uc.ouc_fsuid = body->fsuid;
673         uc.ouc_fsgid = body->fsgid;
674         uc.ouc_cap = body->capability;
675         push_ctxt(&saved, &mds->mds_ctxt, &uc);
676         de = mds_fid2dentry(mds, &body->fid1, NULL);
677         if (IS_ERR(de)) {
678                 GOTO(out_pre_de, rc = -ENOENT);
679         }
680
681         dir = de->d_inode;
682         CDEBUG(D_INODE, "parent ino %ld, name %*s\n", dir->i_ino,namelen,name);
683
684         lock_mode = LCK_PR;
685         res_id[0] = dir->i_ino;
686         res_id[1] = dir->i_generation;
687
688         rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
689                              NULL, 0, lock_mode, &lockh);
690         if (rc == 0) {
691                 LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]);
692                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
693                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
694                                       &flags, ldlm_completion_ast,
695                                       mds_blocking_ast, NULL, 0, &lockh);
696                 if (rc != ELDLM_OK) {
697                         CERROR("lock enqueue: err: %d\n", rc);
698                         GOTO(out_create_de, rc = -EIO);
699                 }
700         }
701         ldlm_lock_dump((void *)(unsigned long)lockh.addr);
702
703         down(&dir->i_sem);
704         dchild = lookup_one_len(name, de, namelen - 1);
705         if (IS_ERR(dchild)) {
706                 CDEBUG(D_INODE, "child lookup error %ld\n", PTR_ERR(dchild));
707                 up(&dir->i_sem);
708                 GOTO(out_create_dchild, rc = PTR_ERR(dchild));
709         }
710
711         rc = mds_getattr_internal(mds, dchild, req, body, offset);
712
713         EXIT;
714 out_create_dchild:
715         l_dput(dchild);
716         up(&dir->i_sem);
717         ldlm_lock_decref(&lockh, lock_mode);
718 out_create_de:
719         l_dput(de);
720 out_pre_de:
721         req->rq_status = rc;
722         pop_ctxt(&saved, &mds->mds_ctxt, &uc);
723         return 0;
724 }
725
726 static int mds_getattr(int offset, struct ptlrpc_request *req)
727 {
728         struct mds_obd *mds = mds_req2mds(req);
729         struct obd_run_ctxt saved;
730         struct dentry *de;
731         struct inode *inode;
732         struct mds_body *body;
733         struct obd_ucred uc;
734         int rc = 0, size[2] = {sizeof(*body)}, bufcount = 1;
735         ENTRY;
736
737         body = lustre_msg_buf(req->rq_reqmsg, offset);
738         uc.ouc_fsuid = body->fsuid;
739         uc.ouc_fsgid = body->fsgid;
740         uc.ouc_cap = body->capability;
741         push_ctxt(&saved, &mds->mds_ctxt, &uc);
742         de = mds_fid2dentry(mds, &body->fid1, NULL);
743         if (IS_ERR(de)) {
744                 rc = req->rq_status = -ENOENT;
745                 GOTO(out_pop, PTR_ERR(de));
746         }
747
748         inode = de->d_inode;
749         if (S_ISREG(body->fid1.f_type)) {
750                 int rc = mds_fs_get_md(mds, inode, NULL, 0);
751                 CDEBUG(D_INODE, "got %d bytes MD data for inode %u\n",
752                        rc, inode->i_ino);
753                 if (rc < 0) {
754                         if (rc != -ENODATA)
755                                 CERROR("error getting inode %u MD: rc = %d\n",
756                                        inode->i_ino, rc);
757                         size[bufcount] = 0;
758                 } else if (rc > mds->mds_max_mdsize) {
759                         size[bufcount] = 0;
760                         CERROR("MD size %d larger than maximum possible %u\n",
761                                rc, mds->mds_max_mdsize);
762                 } else
763                         size[bufcount] = rc;
764                 bufcount++;
765         } else if (body->valid & OBD_MD_LINKNAME) {
766                 size[bufcount] = MIN(inode->i_size + 1, body->size);
767                 bufcount++;
768                 CDEBUG(D_INODE, "symlink size: %d, reply space: %d\n",
769                        inode->i_size + 1, body->size);
770         }
771
772         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
773                 CERROR("failed MDS_GETATTR_PACK test\n");
774                 req->rq_status = -ENOMEM;
775                 GOTO(out, rc = -ENOMEM);
776         }
777
778         rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
779                              &req->rq_repmsg);
780         if (rc) {
781                 CERROR("out of memoryK\n");
782                 req->rq_status = rc;
783                 GOTO(out, rc);
784         }
785
786         req->rq_status = mds_getattr_internal(mds, de, req, body, 0);
787
788 out:
789         l_dput(de);
790 out_pop:
791         pop_ctxt(&saved, &mds->mds_ctxt, &uc);
792         RETURN(rc);
793 }
794
795 static int mds_statfs(struct ptlrpc_request *req)
796 {
797         struct mds_obd *mds = mds_req2mds(req);
798         struct obd_statfs *osfs;
799         struct statfs sfs;
800         int rc, size = sizeof(*osfs);
801         ENTRY;
802
803         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
804         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
805                 CERROR("mds: statfs lustre_pack_msg failed: rc = %d\n", rc);
806                 GOTO(out, rc);
807         }
808
809         rc = mds_fs_statfs(mds, &sfs);
810         if (rc) {
811                 CERROR("mds: statfs failed: rc %d\n", rc);
812                 GOTO(out, rc);
813         }
814         osfs = lustre_msg_buf(req->rq_repmsg, 0);
815         memset(osfs, 0, size);
816         statfs_pack(osfs, &sfs);
817         obd_statfs_pack(osfs, osfs);
818
819 out:
820         req->rq_status = rc;
821         RETURN(0);
822 }
823
824 static struct mds_file_data *mds_handle2mfd(struct lustre_handle *handle)
825 {
826         struct mds_file_data *mfd = NULL;
827
828         if (!handle || !handle->addr)
829                 RETURN(NULL);
830
831         mfd = (struct mds_file_data *)(unsigned long)(handle->addr);
832         if (!kmem_cache_validate(mds_file_cache, mfd))
833                 RETURN(NULL);
834
835         if (mfd->mfd_servercookie != handle->cookie)
836                 RETURN(NULL);
837
838         return mfd;
839 }
840
841 static int mds_store_md(struct mds_obd *mds, struct ptlrpc_request *req,
842                         int offset, struct mds_body *body, struct inode *inode)
843 {
844         struct lov_mds_md *lmm = lustre_msg_buf(req->rq_reqmsg, offset);
845         int lmm_size = req->rq_reqmsg->buflens[offset];
846         struct obd_run_ctxt saved;
847         struct obd_ucred uc;
848         void *handle;
849         int rc, rc2;
850         ENTRY;
851
852         /* I don't really like this, but it is a sanity check on the client
853          * MD request.
854          */
855         if (lmm_size > mds->mds_max_mdsize) {
856                 CERROR("Saving MD for inode %u of %d bytes > max %d\n",
857                        inode->i_ino, lmm_size, mds->mds_max_mdsize);
858                 //RETURN(-EINVAL);
859         }
860
861         CDEBUG(D_INODE, "storing %d bytes MD for inode %u\n",
862                lmm_size, inode->i_ino);
863         uc.ouc_fsuid = body->fsuid;
864         uc.ouc_fsgid = body->fsgid;
865         uc.ouc_cap = body->capability;
866         push_ctxt(&saved, &mds->mds_ctxt, &uc);
867         mds_start_transno(mds);
868         handle = mds_fs_start(mds, inode, MDS_FSOP_SETATTR);
869         if (IS_ERR(handle)) {
870                 rc = PTR_ERR(handle);
871                 mds_finish_transno(mds, handle, req, rc);
872                 GOTO(out_ea, rc);
873         }
874
875         rc = mds_fs_set_md(mds, inode, handle, lmm, lmm_size);
876         rc = mds_finish_transno(mds, handle, req, rc);
877
878         rc2 = mds_fs_commit(mds, inode, handle);
879         if (rc2 && !rc)
880                 rc = rc2;
881 out_ea:
882         pop_ctxt(&saved, &mds->mds_ctxt, &uc);
883
884         RETURN(rc);
885 }
886
887 static int mds_open(struct ptlrpc_request *req)
888 {
889         struct mds_obd *mds = mds_req2mds(req);
890         struct mds_body *body;
891         struct mds_export_data *med;
892         struct mds_file_data *mfd;
893         struct dentry *de;
894         struct file *file;
895         struct vfsmount *mnt;
896         __u32 flags;
897         struct list_head *tmp;
898         int rc, size = sizeof(*body);
899         ENTRY;
900
901         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
902                 CERROR("test case OBD_FAIL_MDS_OPEN_PACK\n");
903                 req->rq_status = -ENOMEM;
904                 RETURN(-ENOMEM);
905         }
906
907         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
908         if (rc) {
909                 CERROR("mds: pack error: rc = %d\n", rc);
910                 req->rq_status = rc;
911                 RETURN(rc);
912         }
913
914         body = lustre_msg_buf(req->rq_reqmsg, 0);
915
916         /* was this animal open already and the client lost the reply? */
917         /* XXX need some way to detect a reopen, to avoid locked list walks */
918         med = &req->rq_export->exp_mds_data;
919         spin_lock(&med->med_open_lock);
920         list_for_each(tmp, &med->med_open_head) {
921                 mfd = list_entry(tmp, typeof(*mfd), mfd_list);
922                 if (!memcmp(&mfd->mfd_clienthandle, &body->handle,
923                             sizeof(mfd->mfd_clienthandle)) &&
924                     body->fid1.id == mfd->mfd_file->f_dentry->d_inode->i_ino) {
925                         de = mfd->mfd_file->f_dentry;
926                         spin_unlock(&med->med_open_lock);
927                         CERROR("Re opening "LPD64"\n", body->fid1.id);
928                         GOTO(out_pack, rc = 0);
929                 }
930         }
931         spin_unlock(&med->med_open_lock);
932
933         mfd = kmem_cache_alloc(mds_file_cache, GFP_KERNEL);
934         if (!mfd) {
935                 CERROR("mds: out of memory\n");
936                 req->rq_status = -ENOMEM;
937                 RETURN(0);
938         }
939
940         de = mds_fid2dentry(mds, &body->fid1, &mnt);
941         if (IS_ERR(de))
942                 GOTO(out_free, rc = PTR_ERR(de));
943
944         /* check if this inode has seen a delayed object creation */
945         if (lustre_msg_get_op_flags(req->rq_reqmsg) & MDS_OPEN_HAS_EA) {
946                 rc = mds_store_md(mds, req, 1, body, de->d_inode);
947                 if (rc) {
948                         l_dput(de);
949                         mntput(mnt);
950                         GOTO(out_free, rc);
951                 }
952         }
953
954         flags = body->flags;
955         /* dentry_open does a dput(de) and mntput(mnt) on error */
956         file = dentry_open(de, mnt, flags & ~O_DIRECT);
957         if (IS_ERR(file)) {
958                 rc = PTR_ERR(file);
959                 GOTO(out_free, 0);
960         }
961
962         file->private_data = mfd;
963         mfd->mfd_file = file;
964         memcpy(&mfd->mfd_clienthandle, &body->handle, sizeof(body->handle));
965         get_random_bytes(&mfd->mfd_servercookie, sizeof(mfd->mfd_servercookie));
966         spin_lock(&med->med_open_lock);
967         list_add(&mfd->mfd_list, &med->med_open_head);
968         spin_unlock(&med->med_open_lock);
969
970 out_pack:
971         body = lustre_msg_buf(req->rq_repmsg, 0);
972         mds_pack_inode2fid(&body->fid1, de->d_inode);
973         mds_pack_inode2body(body, de->d_inode);
974         body->handle.addr = (__u64)(unsigned long)mfd;
975         body->handle.cookie = mfd->mfd_servercookie;
976         CDEBUG(D_INODE, "llite file "LPX64": addr %p, cookie "LPX64"\n",
977                mfd->mfd_clienthandle.addr, mfd, mfd->mfd_servercookie);
978         RETURN(0);
979
980 out_free:
981         mfd->mfd_servercookie = DEAD_HANDLE_MAGIC;
982         kmem_cache_free(mds_file_cache, mfd);
983         req->rq_status = rc;
984         RETURN(0);
985 }
986
987 static int mds_close(struct ptlrpc_request *req)
988 {
989         struct mds_export_data *med = &req->rq_export->exp_mds_data;
990         struct mds_body *body;
991         struct mds_file_data *mfd;
992         int rc;
993         ENTRY;
994
995         body = lustre_msg_buf(req->rq_reqmsg, 0);
996
997         mfd = mds_handle2mfd(&body->handle);
998         if (!mfd) {
999                 CERROR("no handle for file close "LPD64
1000                        ": addr "LPX64", cookie "LPX64"\n",
1001                        body->fid1.id, body->handle.addr, body->handle.cookie);
1002                 RETURN(-ESTALE);
1003         }
1004
1005         spin_lock(&med->med_open_lock);
1006         req->rq_status = mds_close_mfd(mfd, med);
1007         spin_unlock(&med->med_open_lock);
1008
1009         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
1010                 CERROR("test case OBD_FAIL_MDS_CLOSE_PACK\n");
1011                 req->rq_status = -ENOMEM;
1012                 RETURN(-ENOMEM);
1013         }
1014
1015         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
1016         if (rc) {
1017                 CERROR("mds: lustre_pack_msg: rc = %d\n", rc);
1018                 req->rq_status = rc;
1019         }
1020
1021         RETURN(0);
1022 }
1023
1024 static int mds_readpage(struct ptlrpc_request *req)
1025 {
1026         struct mds_obd *mds = mds_req2mds(req);
1027         struct vfsmount *mnt;
1028         struct dentry *de;
1029         struct file *file;
1030         struct mds_body *body, *repbody;
1031         struct obd_run_ctxt saved;
1032         int rc, size = sizeof(*body);
1033         struct obd_ucred uc;
1034         ENTRY;
1035
1036         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
1037         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
1038                 CERROR("mds: out of memory\n");
1039                 GOTO(out, rc = -ENOMEM);
1040         }
1041
1042         body = lustre_msg_buf(req->rq_reqmsg, 0);
1043         uc.ouc_fsuid = body->fsuid;
1044         uc.ouc_fsgid = body->fsgid;
1045         uc.ouc_cap = body->capability;
1046         push_ctxt(&saved, &mds->mds_ctxt, &uc);
1047         de = mds_fid2dentry(mds, &body->fid1, &mnt);
1048         if (IS_ERR(de))
1049                 GOTO(out_pop, rc = PTR_ERR(de));
1050
1051         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
1052
1053         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
1054         /* note: in case of an error, dentry_open puts dentry */
1055         if (IS_ERR(file))
1056                 GOTO(out_pop, rc = PTR_ERR(file));
1057
1058         repbody = lustre_msg_buf(req->rq_repmsg, 0);
1059         repbody->size = file->f_dentry->d_inode->i_size;
1060         repbody->valid = OBD_MD_FLSIZE;
1061
1062         /* to make this asynchronous make sure that the handling function
1063            doesn't send a reply when this function completes. Instead a
1064            callback function would send the reply */
1065         /* note: in case of an error, dentry_open puts dentry */
1066         rc = mds_sendpage(req, file, body->size);
1067
1068         filp_close(file, 0);
1069 out_pop:
1070         pop_ctxt(&saved, &mds->mds_ctxt, &uc);
1071 out:
1072         req->rq_status = rc;
1073         RETURN(0);
1074 }
1075
1076 int mds_reint(struct ptlrpc_request *req, int offset)
1077 {
1078         int rc;
1079         struct mds_update_record rec;
1080
1081         rc = mds_update_unpack(req, offset, &rec);
1082         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
1083                 CERROR("invalid record\n");
1084                 req->rq_status = -EINVAL;
1085                 RETURN(0);
1086         }
1087         /* rc will be used to interrupt a for loop over multiple records */
1088         rc = mds_reint_rec(&rec, offset, req);
1089         return rc;
1090 }
1091
1092 /* forward declaration */
1093 int mds_handle(struct ptlrpc_request *req);
1094
1095 static int check_for_next_transno(struct mds_obd *mds)
1096 {
1097         struct ptlrpc_request *req;
1098         req = list_entry(mds->mds_recovery_queue.next, 
1099                          struct ptlrpc_request, rq_list);
1100         return req->rq_reqmsg->transno == mds->mds_next_recovery_transno;
1101 }
1102
1103 static void process_recovery_queue(struct mds_obd *mds)
1104 {
1105         struct ptlrpc_request *req;
1106         
1107         for (;;) {
1108                 spin_lock(&mds->mds_processing_task_lock);
1109                 req = list_entry(mds->mds_recovery_queue.next, 
1110                                  struct ptlrpc_request, rq_list);
1111
1112                 if (req->rq_reqmsg->transno != mds->mds_next_recovery_transno) {
1113                         spin_unlock(&mds->mds_processing_task_lock);
1114                         wait_event(mds->mds_next_transno_waitq,
1115                                    check_for_next_transno(mds));
1116                         continue;
1117                 }
1118                 list_del(&req->rq_list);
1119                 spin_unlock(&mds->mds_processing_task_lock);
1120
1121                 DEBUG_REQ(D_HA, req, "");
1122                 mds_handle(req);
1123                 
1124                 if (list_empty(&mds->mds_recovery_queue))
1125                         break;
1126         }
1127 }
1128
1129 static int queue_recovery_request(struct ptlrpc_request *req,
1130                                   struct mds_obd *mds)
1131 {
1132         struct list_head *tmp;
1133         int inserted = 0, transno = req->rq_reqmsg->transno;
1134
1135         if (!transno) {
1136                 DEBUG_REQ(D_HA, req, "not queueing");
1137                 return 1;
1138         }
1139
1140         spin_lock(&mds->mds_processing_task_lock);
1141
1142         if (mds->mds_processing_task == current->pid) {
1143                 /* Processing the queue right now, don't re-add. */
1144                 spin_unlock(&mds->mds_processing_task_lock);
1145                 return 1;
1146         }
1147
1148         /* XXX O(n^2) */
1149         list_for_each(tmp, &mds->mds_recovery_queue) {
1150                 struct ptlrpc_request *reqiter = 
1151                         list_entry(tmp, struct ptlrpc_request, rq_list);
1152                 if (reqiter->rq_reqmsg->transno > transno) {
1153                         list_add_tail(&req->rq_list, &reqiter->rq_list);
1154                         inserted = 1;
1155                         break;
1156                 }
1157         }
1158
1159         if (!inserted)
1160                 list_add_tail(&req->rq_list, &mds->mds_recovery_queue);
1161
1162         if (mds->mds_processing_task != 0) {
1163                 /* Someone else is processing this queue, we'll leave it to
1164                  * them.
1165                  */
1166                 spin_unlock(&mds->mds_processing_task_lock);
1167                 if (transno == mds->mds_next_recovery_transno)
1168                         wake_up(&mds->mds_next_transno_waitq);
1169                 return 0;
1170         }
1171
1172         /* Nobody is processing, and we know there's (at least) one to process
1173          * now, so we'll do the honours.
1174          */
1175         mds->mds_processing_task = current->pid;
1176         spin_unlock(&mds->mds_processing_task_lock);
1177
1178         process_recovery_queue(mds);
1179         return 0;
1180 }
1181
1182 static int filter_recovery_request(struct ptlrpc_request *req, 
1183                                    struct mds_obd *mds, int *process)
1184 {
1185         switch (req->rq_reqmsg->opc) {
1186         case MDS_CONNECT:
1187         case MDS_DISCONNECT:
1188         case MDS_OPEN:
1189                *process = 1;
1190                RETURN(0);
1191             
1192         case MDS_GETSTATUS: /* used in unmounting */
1193         case MDS_REINT:
1194         case LDLM_ENQUEUE:
1195                 *process = queue_recovery_request(req, mds);
1196                 RETURN(0);
1197                 
1198         default:
1199                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1200                 *process = 0;
1201                 RETURN(ptlrpc_error(req->rq_svc, req));
1202         }
1203 }
1204
1205 static int mds_queue_final_reply(struct ptlrpc_request *req, int rc)
1206 {
1207         struct mds_obd *mds = mds_req2mds(req);
1208
1209         if (rc) {
1210                 /* Just like ptlrpc_error, but without the sending. */
1211                 lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
1212                                 &req->rq_repmsg);
1213                 req->rq_type = PTL_RPC_MSG_ERR;
1214         }
1215
1216         list_add(&req->rq_list, &mds->mds_delayed_reply_queue);
1217         if (--mds->mds_recoverable_clients == 0) {
1218                 struct list_head *tmp, *n;
1219
1220                 CDEBUG(D_HA,
1221                        "all clients recovered, sending delayed replies\n");
1222                 list_for_each_safe(tmp, n, &mds->mds_delayed_reply_queue) {
1223                         req = list_entry(tmp, struct ptlrpc_request, rq_list);
1224                         DEBUG_REQ(D_HA, req, "delayed:");
1225                         ptlrpc_reply(req->rq_svc, req);
1226                 }
1227         } else {
1228                 CDEBUG(D_HA, "%d recoverable clients remain\n",
1229                        mds->mds_recoverable_clients);
1230         }
1231
1232         return 1;
1233 }
1234
1235 static char *reint_names[] = {
1236         [REINT_SETATTR] "setattr",
1237         [REINT_CREATE]  "create",
1238         [REINT_LINK]    "link",
1239         [REINT_UNLINK]  "unlink",
1240         [REINT_RENAME]  "rename"
1241 };
1242
1243 int mds_handle(struct ptlrpc_request *req)
1244 {
1245         int rc;
1246         int should_process;
1247         struct mds_obd *mds = NULL; /* quell gcc overwarning */
1248         ENTRY;
1249
1250         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
1251         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
1252                 CERROR("lustre_mds: Invalid request\n");
1253                 GOTO(out, rc);
1254         }
1255
1256         LASSERT(!strcmp(req->rq_obd->obd_type->typ_name, LUSTRE_MDT_NAME));
1257
1258         if (req->rq_reqmsg->opc != MDS_CONNECT) {
1259                 if (req->rq_export == NULL)
1260                         GOTO(out, rc = -ENOTCONN);
1261
1262                 mds = mds_req2mds(req);
1263                 if (mds->mds_recoverable_clients != 0) {
1264                         rc = filter_recovery_request(req, mds, &should_process);
1265                         if (rc || !should_process)
1266                                 RETURN(rc);
1267                 }
1268         }
1269
1270         switch (req->rq_reqmsg->opc) {
1271         case MDS_CONNECT:
1272                 DEBUG_REQ(D_INODE, req, "connect");
1273                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
1274                 rc = target_handle_connect(req);
1275                 /* Make sure that last_rcvd is correct. */
1276                 if (!rc) {
1277                         /* Now that we have an export, set mds. */
1278                         mds = mds_req2mds(req);
1279                         mds_fsync_super(mds->mds_sb);
1280                 }
1281                 break;
1282
1283         case MDS_DISCONNECT:
1284                 DEBUG_REQ(D_INODE, req, "disconnect");
1285                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
1286                 rc = target_handle_disconnect(req);
1287                 /* Make sure that last_rcvd is correct. */
1288                 if (!rc)
1289                         mds_fsync_super(mds->mds_sb);
1290                 goto out;
1291
1292         case MDS_GETSTATUS:
1293                 DEBUG_REQ(D_INODE, req, "getstatus");
1294                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
1295                 rc = mds_getstatus(req);
1296                 break;
1297
1298         case MDS_GETLOVINFO:
1299                 DEBUG_REQ(D_INODE, req, "getlovinfo");
1300                 rc = mds_getlovinfo(req);
1301                 break;
1302
1303         case MDS_GETATTR:
1304                 DEBUG_REQ(D_INODE, req, "getattr");
1305                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
1306                 rc = mds_getattr(0, req);
1307                 break;
1308
1309         case MDS_STATFS:
1310                 DEBUG_REQ(D_INODE, req, "statfs");
1311                 OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0);
1312                 rc = mds_statfs(req);
1313                 break;
1314
1315         case MDS_READPAGE:
1316                 DEBUG_REQ(D_INODE, req, "readpage\n");
1317                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
1318                 rc = mds_readpage(req);
1319
1320                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
1321                         return 0;
1322                 break;
1323
1324         case MDS_REINT: {
1325                 int size = sizeof(struct mds_body);
1326                 int opc = *(u32 *)lustre_msg_buf(req->rq_reqmsg, 0), 
1327                         realopc = opc & REINT_OPCODE_MASK;
1328                         
1329                 DEBUG_REQ(D_INODE, req, "reint (%s%s)",
1330                           reint_names[realopc],
1331                           opc & REINT_REPLAYING ? "|REPLAYING" : "");
1332                           
1333                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
1334
1335                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
1336                                      &req->rq_repmsg);
1337                 if (rc) {
1338                         req->rq_status = rc;
1339                         break;
1340                 }
1341                 rc = mds_reint(req, 0);
1342                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET_REP, 0);
1343                 break;
1344                 }
1345
1346         case MDS_OPEN:
1347                 DEBUG_REQ(D_INODE, req, "open");
1348                 OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
1349                 rc = mds_open(req);
1350                 break;
1351
1352         case MDS_CLOSE:
1353                 DEBUG_REQ(D_INODE, req, "close");
1354                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
1355                 rc = mds_close(req);
1356                 break;
1357
1358         case LDLM_ENQUEUE:
1359                 DEBUG_REQ(D_INODE, req, "enqueue");
1360                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1361                 rc = ldlm_handle_enqueue(req);
1362                 break;
1363         case LDLM_CONVERT:
1364                 DEBUG_REQ(D_INODE, req, "convert");
1365                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1366                 rc = ldlm_handle_convert(req);
1367                 break;
1368         case LDLM_BL_CALLBACK:
1369         case LDLM_CP_CALLBACK:
1370                 DEBUG_REQ(D_INODE, req, "callback");
1371                 CERROR("callbacks should not happen on MDS\n");
1372                 LBUG();
1373                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
1374                 break;
1375         default:
1376                 rc = ptlrpc_error(req->rq_svc, req);
1377                 RETURN(rc);
1378         }
1379
1380         EXIT;
1381
1382         if (!rc) {
1383                 struct mds_export_data *med = &req->rq_export->exp_mds_data;
1384
1385                 req->rq_repmsg->last_xid =
1386                         HTON__u64(le64_to_cpu(med->med_mcd->mcd_last_xid));
1387                 req->rq_repmsg->last_committed =
1388                         HTON__u64(mds->mds_last_committed);
1389                 CDEBUG(D_INFO, "last_rcvd ~%Lu, last_committed %Lu, xid %d\n",
1390                        (unsigned long long)mds->mds_last_rcvd,
1391                        (unsigned long long)mds->mds_last_committed,
1392                        cpu_to_le32(req->rq_xid));
1393         }
1394  out:
1395
1396         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1397                 struct mds_obd *mds = mds_req2mds(req);
1398                 LASSERT(mds->mds_recoverable_clients);
1399                 DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1400                 return mds_queue_final_reply(req, rc);
1401         }
1402         
1403         /* MDS_CONNECT / EALREADY (note: not -EALREADY!) isn't an error */
1404         if (rc && (req->rq_reqmsg->opc != MDS_CONNECT ||
1405                    rc != EALREADY)) {
1406                 CERROR("mds: processing error (opcode %d): %d\n",
1407                        req->rq_reqmsg->opc, rc);
1408                 ptlrpc_error(req->rq_svc, req);
1409         } else {
1410                 CDEBUG(D_NET, "sending reply\n");
1411                 ptlrpc_reply(req->rq_svc, req);
1412         }
1413         return 0;
1414 }
1415
1416 /* Update the server data on disk.  This stores the new mount_count and
1417  * also the last_rcvd value to disk.  If we don't have a clean shutdown,
1418  * then the server last_rcvd value may be less than that of the clients.
1419  * This will alert us that we may need to do client recovery.
1420  *
1421  * Assumes we are already in the server filesystem context.
1422  *
1423  * Also assumes for mds_last_rcvd that we are not modifying it (no locking).
1424  */
1425 int mds_update_server_data(struct mds_obd *mds)
1426 {
1427         struct mds_server_data *msd = mds->mds_server_data;
1428         struct file *filp = mds->mds_rcvd_filp;
1429         loff_t off = 0;
1430         int rc;
1431
1432         msd->msd_last_rcvd = cpu_to_le64(mds->mds_last_rcvd);
1433         msd->msd_mount_count = cpu_to_le64(mds->mds_mount_count);
1434
1435         CDEBUG(D_SUPER, "MDS mount_count is %Lu, last_rcvd is %Lu\n",
1436                (unsigned long long)mds->mds_mount_count,
1437                (unsigned long long)mds->mds_last_rcvd);
1438         rc = lustre_fwrite(filp, (char *)msd, sizeof(*msd), &off);
1439         if (rc != sizeof(*msd)) {
1440                 CERROR("error writing MDS server data: rc = %d\n", rc);
1441                 if (rc > 0)
1442                         RETURN(-EIO);
1443                 RETURN(rc);
1444         }
1445 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1446         rc = fsync_dev(filp->f_dentry->d_inode->i_rdev);
1447 #else
1448         rc = file_fsync(filp,  filp->f_dentry, 1);
1449 #endif
1450         if (rc)
1451                 CERROR("error flushing MDS server data: rc = %d\n", rc);
1452
1453         return 0;
1454 }
1455
1456 /* Do recovery actions for the MDS */
1457 static int mds_recovery_complete(struct obd_device *obddev)
1458 {
1459         struct mds_obd *mds = &obddev->u.mds;
1460         struct obd_run_ctxt saved;
1461         int rc;
1462
1463         LASSERT(mds->mds_recoverable_clients == 0);
1464
1465         /* This happens at the end when recovery is complete */
1466         ++mds->mds_mount_count;
1467         push_ctxt(&saved, &mds->mds_ctxt, NULL);
1468         rc = mds_update_server_data(mds);
1469         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
1470
1471         return rc;
1472 }
1473
1474 /* mount the file system (secretly) */
1475 static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
1476 {
1477         struct obd_ioctl_data* data = buf;
1478         struct mds_obd *mds = &obddev->u.mds;
1479         struct vfsmount *mnt;
1480         int rc = 0;
1481         ENTRY;
1482
1483         MOD_INC_USE_COUNT;
1484 #ifdef CONFIG_DEV_RDONLY
1485         dev_clear_rdonly(2);
1486 #endif
1487         if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
1488                 GOTO(err_dec, rc = -EINVAL);
1489
1490         mds->mds_fstype = strdup(data->ioc_inlbuf2);
1491
1492         mnt = do_kern_mount(mds->mds_fstype, 0, data->ioc_inlbuf1, NULL);
1493         if (IS_ERR(mnt)) {
1494                 rc = PTR_ERR(mnt);
1495                 CERROR("do_kern_mount failed: rc = %d\n", rc);
1496                 GOTO(err_kfree, rc);
1497         }
1498
1499         CERROR("%s: mnt is %p\n", data->ioc_inlbuf1, mnt);
1500         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
1501         if (!mds->mds_sb)
1502                 GOTO(err_put, rc = -ENODEV);
1503
1504         init_MUTEX(&mds->mds_transno_sem);
1505         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
1506         rc = mds_fs_setup(obddev, mnt);
1507         if (rc) {
1508                 CERROR("MDS filesystem method init failed: rc = %d\n", rc);
1509                 GOTO(err_put, rc);
1510         }
1511
1512         obddev->obd_namespace =
1513                 ldlm_namespace_new("mds_server", LDLM_NAMESPACE_SERVER);
1514         if (obddev->obd_namespace == NULL) {
1515                 mds_cleanup(obddev);
1516                 GOTO(err_fs, rc = -ENOMEM);
1517         }
1518
1519         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1520                            "mds_ldlm_client", &obddev->obd_ldlm_client);
1521
1522         spin_lock_init(&mds->mds_processing_task_lock);
1523         mds->mds_processing_task = 0;
1524         INIT_LIST_HEAD(&mds->mds_recovery_queue);
1525         INIT_LIST_HEAD(&mds->mds_delayed_reply_queue);
1526         
1527         RETURN(0);
1528
1529 err_fs:
1530         mds_fs_cleanup(obddev);
1531 err_put:
1532         unlock_kernel();
1533         mntput(mds->mds_vfsmnt);
1534         mds->mds_sb = 0;
1535         lock_kernel();
1536 err_kfree:
1537         kfree(mds->mds_fstype);
1538 err_dec:
1539         MOD_DEC_USE_COUNT;
1540         RETURN(rc);
1541 }
1542
1543 static int mds_cleanup(struct obd_device *obddev)
1544 {
1545         struct super_block *sb;
1546         struct mds_obd *mds = &obddev->u.mds;
1547         struct obd_run_ctxt saved;
1548         ENTRY;
1549
1550         sb = mds->mds_sb;
1551         if (!mds->mds_sb)
1552                 RETURN(0);
1553
1554         push_ctxt(&saved, &mds->mds_ctxt, NULL);
1555         mds_update_server_data(mds);
1556
1557         if (mds->mds_rcvd_filp) {
1558                 int rc = filp_close(mds->mds_rcvd_filp, 0);
1559                 mds->mds_rcvd_filp = NULL;
1560
1561                 if (rc)
1562                         CERROR("last_rcvd file won't close, rc=%d\n", rc);
1563         }
1564         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
1565
1566         unlock_kernel();
1567         mntput(mds->mds_vfsmnt);
1568         mds->mds_sb = 0;
1569         kfree(mds->mds_fstype);
1570
1571         ldlm_namespace_free(obddev->obd_namespace);
1572
1573         lock_kernel();
1574 #ifdef CONFIG_DEV_RDONLY
1575         dev_clear_rdonly(2);
1576 #endif
1577         mds_fs_cleanup(obddev);
1578
1579         MOD_DEC_USE_COUNT;
1580         RETURN(0);
1581 }
1582
1583 static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
1584                               ldlm_mode_t mode, int flags, void *data)
1585 {
1586         struct ptlrpc_request *req = req_cookie;
1587         int rc = 0;
1588         ENTRY;
1589
1590         if (!req_cookie)
1591                 RETURN(0);
1592
1593         if (req->rq_reqmsg->bufcount > 1) {
1594                 /* an intent needs to be considered */
1595                 struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1);
1596                 struct mds_obd *mds= &req->rq_export->exp_obd->u.mds;
1597                 struct mds_body *mds_rep;
1598                 struct ldlm_reply *rep;
1599                 __u64 new_resid[3] = {0, 0, 0}, old_res;
1600                 int rc, size[3] = {sizeof(struct ldlm_reply),
1601                                                   sizeof(struct mds_body),
1602                                                   mds->mds_max_mdsize};
1603
1604                 it->opc = NTOH__u64(it->opc);
1605
1606                 LDLM_DEBUG(lock, "intent policy, opc: %s",
1607                            ldlm_it2str(it->opc));
1608
1609                 rc = lustre_pack_msg(3, size, NULL, &req->rq_replen,
1610                                      &req->rq_repmsg);
1611                 if (rc) {
1612                         rc = req->rq_status = -ENOMEM;
1613                         RETURN(rc);
1614                 }
1615
1616                 rep = lustre_msg_buf(req->rq_repmsg, 0);
1617                 rep->lock_policy_res1 = 1;
1618
1619                 /* execute policy */
1620                 switch ((long)it->opc) {
1621                 case IT_CREAT|IT_OPEN:
1622                         rc = mds_reint(req, 2);
1623                         if (rc || (req->rq_status != 0 &&
1624                                    req->rq_status != -EEXIST)) {
1625                                 rep->lock_policy_res2 = req->rq_status;
1626                                 RETURN(ELDLM_LOCK_ABORTED);
1627                         }
1628                         break;
1629                 case IT_CREAT:
1630                 case IT_MKDIR:
1631                 case IT_MKNOD:
1632                 case IT_RENAME2:
1633                 case IT_LINK2:
1634                 case IT_RMDIR:
1635                 case IT_SYMLINK:
1636                 case IT_UNLINK:
1637                         rc = mds_reint(req, 2);
1638                         if (rc || (req->rq_status != 0 &&
1639                                    req->rq_status != -EISDIR &&
1640                                    req->rq_status != -ENOTDIR)) {
1641                                 rep->lock_policy_res2 = req->rq_status;
1642                                 RETURN(ELDLM_LOCK_ABORTED);
1643                         }
1644                         break;
1645                 case IT_GETATTR:
1646                 case IT_LOOKUP:
1647                 case IT_OPEN:
1648                 case IT_READDIR:
1649                 case IT_READLINK:
1650                 case IT_RENAME:
1651                 case IT_LINK:
1652                 case IT_SETATTR:
1653                         rc = mds_getattr_name(2, req);
1654                         /* FIXME: we need to sit down and decide on who should
1655                          * set req->rq_status, who should return negative and
1656                          * positive return values, and what they all mean. */
1657                         if (rc || req->rq_status != 0) {
1658                                 rep->lock_policy_res2 = req->rq_status;
1659                                 RETURN(ELDLM_LOCK_ABORTED);
1660                         }
1661                         break;
1662                 case IT_READDIR|IT_OPEN:
1663                         LBUG();
1664                         break;
1665                 default:
1666                         CERROR("Unhandled intent "LPD64"\n", it->opc);
1667                         LBUG();
1668                 }
1669
1670                 /* We don't bother returning a lock to the client for a file
1671                  * or directory we are removing.
1672                  *
1673                  * As for link and rename, there is no reason for the client
1674                  * to get a lock on the target at this point.  If they are
1675                  * going to modify the file/directory later they will get a
1676                  * lock at that time.
1677                  */
1678                 if (it->opc & (IT_UNLINK | IT_RMDIR | IT_LINK | IT_LINK2 |
1679                                IT_RENAME | IT_RENAME2))
1680                         RETURN(ELDLM_LOCK_ABORTED);
1681
1682                 rep->lock_policy_res2 = req->rq_status;
1683                 mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
1684
1685                 /* If the client is about to open a file that doesn't have an MD
1686                  * stripe record, it's going to need a write lock. */
1687                 if (it->opc & IT_OPEN && !(mds_rep->valid & OBD_MD_FLEASIZE)) {
1688                         LDLM_DEBUG(lock, "open with no EA; returning PW lock");
1689                         lock->l_req_mode = LCK_PW;
1690                 }
1691
1692                 if (flags & LDLM_FL_INTENT_ONLY) {
1693                         LDLM_DEBUG(lock, "INTENT_ONLY, aborting lock");
1694                         RETURN(ELDLM_LOCK_ABORTED);
1695                 }
1696                 /* Give the client a lock on the child object, instead of the
1697                  * parent that it requested. */
1698                 new_resid[0] = NTOH__u32(mds_rep->ino);
1699                 new_resid[1] = NTOH__u32(mds_rep->generation);
1700                 if (new_resid[0] == 0)
1701                         LBUG();
1702                 old_res = lock->l_resource->lr_name[0];
1703
1704                 ldlm_lock_change_resource(lock, new_resid);
1705                 if (lock->l_resource == NULL) {
1706                         LBUG();
1707                         RETURN(-ENOMEM);
1708                 }
1709                 LDLM_DEBUG(lock, "intent policy, old res %ld",
1710                            (long)old_res);
1711                 RETURN(ELDLM_LOCK_CHANGED);
1712         } else {
1713                 int size = sizeof(struct ldlm_reply);
1714                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
1715                                      &req->rq_repmsg);
1716                 if (rc) {
1717                         LBUG();
1718                         RETURN(-ENOMEM);
1719                 }
1720         }
1721         RETURN(rc);
1722 }
1723
1724 int mds_attach(struct obd_device *dev, obd_count len, void *data)
1725 {
1726         return lprocfs_reg_obd(dev, status_var_nm_1, dev);
1727 }
1728
/* Detach method for the MDS obd type.
 *
 * Passes the device to lprocfs_dereg_obd() and returns that call's result
 * unchanged. */
int mds_detach(struct obd_device *dev)
{
        int rc;

        rc = lprocfs_dereg_obd(dev);
        return rc;
}
1733
1734 static int mdt_setup(struct obd_device *obddev, obd_count len, void *buf)
1735 {
1736         int i;
1737         //        struct obd_ioctl_data* data = buf;
1738         struct mds_obd *mds = &obddev->u.mds;
1739         int rc = 0;
1740         ENTRY;
1741
1742         MOD_INC_USE_COUNT;
1743
1744         mds->mds_service = ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
1745                                            MDS_BUFSIZE, MDS_MAXREQSIZE,
1746                                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
1747                                            "self", mds_handle, "mds");
1748         if (!mds->mds_service) {
1749                 CERROR("failed to start service\n");
1750                 GOTO(err_dec, rc = -ENOMEM);
1751         }
1752
1753         for (i = 0; i < MDT_NUM_THREADS; i++) {
1754                 char name[32];
1755                 sprintf(name, "ll_mdt_%02d", i);
1756                 rc = ptlrpc_start_thread(obddev, mds->mds_service, name);
1757                 if (rc) {
1758                         CERROR("cannot start MDT thread #%d: rc %d\n", i, rc);
1759                         GOTO(err_thread, rc);
1760                 }
1761         }
1762
1763         RETURN(0);
1764
1765 err_thread:
1766         ptlrpc_stop_all_threads(mds->mds_service);
1767         ptlrpc_unregister_service(mds->mds_service);
1768 err_dec:
1769         MOD_DEC_USE_COUNT;
1770         RETURN(rc);
1771 }
1772
1773
1774 static int mdt_cleanup(struct obd_device *obddev)
1775 {
1776         struct mds_obd *mds = &obddev->u.mds;
1777         ENTRY;
1778
1779         ptlrpc_stop_all_threads(mds->mds_service);
1780         ptlrpc_unregister_service(mds->mds_service);
1781
1782         MOD_DEC_USE_COUNT;
1783         RETURN(0);
1784 }
1785
1786 extern int mds_iocontrol(long cmd, struct lustre_handle *conn,
1787                          int len, void *karg, void *uarg);
1788
1789 /* use obd ops to offer management infrastructure */
1790 static struct obd_ops mds_obd_ops = {
1791         o_attach:      mds_attach,
1792         o_detach:      mds_detach,
1793         o_connect:     mds_connect,
1794         o_disconnect:  mds_disconnect,
1795         o_setup:       mds_setup,
1796         o_cleanup:     mds_cleanup,
1797         o_iocontrol:   mds_iocontrol
1798 };
1799
1800 static struct obd_ops mdt_obd_ops = {
1801         o_setup:       mdt_setup,
1802         o_cleanup:     mdt_cleanup,
1803 };
1804
1805
1806 static int __init mds_init(void)
1807 {
1808
1809         mds_file_cache = kmem_cache_create("ll_mds_file_data",
1810                                            sizeof(struct mds_file_data),
1811                                            0, 0, NULL, NULL);
1812         if (mds_file_cache == NULL)
1813                 return -ENOMEM;
1814
1815         class_register_type(&mds_obd_ops, status_class_var, LUSTRE_MDS_NAME);
1816         class_register_type(&mdt_obd_ops, 0, LUSTRE_MDT_NAME);
1817         ldlm_register_intent(ldlm_intent_policy);
1818         return 0;
1819
1820 }
1821
1822 static void __exit mds_exit(void)
1823 {
1824
1825
1826         ldlm_unregister_intent();
1827         class_unregister_type(LUSTRE_MDS_NAME);
1828         class_unregister_type(LUSTRE_MDT_NAME);
1829         if (kmem_cache_destroy(mds_file_cache))
1830                 CERROR("couldn't free MDS file cache\n");
1831
1832 }
1833
1834 MODULE_AUTHOR("Cluster File Systems <info@clusterfs.com>");
1835 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
1836 MODULE_LICENSE("GPL");
1837
1838 module_init(mds_init);
1839 module_exit(mds_exit);